// SPDX-License-Identifier: GPL-2.0
#include <linux/ceph/ceph_debug.h>

#include <linux/fs.h>
#include <linux/wait.h>
#include <linux/slab.h>
#include <linux/gfp.h>
#include <linux/sched.h>
#include <linux/debugfs.h>
#include <linux/seq_file.h>
#include <linux/ratelimit.h>
#include <linux/bits.h>
#include <linux/ktime.h>
#include <linux/bitmap.h>
#include <linux/mnt_idmapping.h>

#include "super.h"
#include "mds_client.h"
#include "crypto.h"

#include <linux/ceph/ceph_features.h>
#include <linux/ceph/messenger.h>
#include <linux/ceph/decode.h>
#include <linux/ceph/pagelist.h>
#include <linux/ceph/auth.h>
#include <linux/ceph/debugfs.h>

#define RECONNECT_MAX_SIZE (INT_MAX - PAGE_SIZE)

/*
 * A cluster of MDS (metadata server) daemons is responsible for
 * managing the file system namespace (the directory hierarchy and
 * inodes) and for coordinating shared access to storage.  Metadata is
 * partitioned hierarchically across a number of servers, and that
 * partition varies over time as the cluster adjusts the distribution
 * in order to balance load.
 *
 * The MDS client is primarily responsible for managing synchronous
 * metadata requests for operations like open, unlink, and so forth.
 * If there is an MDS failure, we find out about it when we (possibly
 * request and) receive a new MDS map, and can resubmit affected
 * requests.
 *
 * For the most part, though, we take advantage of a lossless
 * communications channel to the MDS, and do not need to worry about
 * timing out or resubmitting requests.
 *
 * We maintain a stateful "session" with each MDS we interact with.
 * Within each session, we send periodic heartbeat messages to ensure
 * any capabilities or leases we have been issued remain valid.  If
 * the session times out and goes stale, our leases and capabilities
 * are no longer valid.
 */

struct ceph_reconnect_state {
        struct ceph_mds_session *session;
        int nr_caps, nr_realms;
        struct ceph_pagelist *pagelist;
        unsigned msg_version;
        bool allow_multi;
};

static void __wake_requests(struct ceph_mds_client *mdsc,
                            struct list_head *head);
static void ceph_cap_release_work(struct work_struct *work);
static void ceph_cap_reclaim_work(struct work_struct *work);

static const struct ceph_connection_operations mds_con_ops;


/*
 * mds reply parsing
 */

static int parse_reply_info_quota(void **p, void *end,
                                  struct ceph_mds_reply_info_in *info)
{
        u8 struct_v, struct_compat;
        u32 struct_len;

        ceph_decode_8_safe(p, end, struct_v, bad);
        ceph_decode_8_safe(p, end, struct_compat, bad);
        /* struct_v is expected to be >= 1. we only
         * understand encoding with struct_compat == 1.
*/ 85 if (!struct_v || struct_compat != 1) 86 goto bad; 87 ceph_decode_32_safe(p, end, struct_len, bad); 88 ceph_decode_need(p, end, struct_len, bad); 89 end = *p + struct_len; 90 ceph_decode_64_safe(p, end, info->max_bytes, bad); 91 ceph_decode_64_safe(p, end, info->max_files, bad); 92 *p = end; 93 return 0; 94 bad: 95 return -EIO; 96 } 97 98 /* 99 * parse individual inode info 100 */ 101 static int parse_reply_info_in(void **p, void *end, 102 struct ceph_mds_reply_info_in *info, 103 u64 features) 104 { 105 int err = 0; 106 u8 struct_v = 0; 107 108 if (features == (u64)-1) { 109 u32 struct_len; 110 u8 struct_compat; 111 ceph_decode_8_safe(p, end, struct_v, bad); 112 ceph_decode_8_safe(p, end, struct_compat, bad); 113 /* struct_v is expected to be >= 1. we only understand 114 * encoding with struct_compat == 1. */ 115 if (!struct_v || struct_compat != 1) 116 goto bad; 117 ceph_decode_32_safe(p, end, struct_len, bad); 118 ceph_decode_need(p, end, struct_len, bad); 119 end = *p + struct_len; 120 } 121 122 ceph_decode_need(p, end, sizeof(struct ceph_mds_reply_inode), bad); 123 info->in = *p; 124 *p += sizeof(struct ceph_mds_reply_inode) + 125 sizeof(*info->in->fragtree.splits) * 126 le32_to_cpu(info->in->fragtree.nsplits); 127 128 ceph_decode_32_safe(p, end, info->symlink_len, bad); 129 ceph_decode_need(p, end, info->symlink_len, bad); 130 info->symlink = *p; 131 *p += info->symlink_len; 132 133 ceph_decode_copy_safe(p, end, &info->dir_layout, 134 sizeof(info->dir_layout), bad); 135 ceph_decode_32_safe(p, end, info->xattr_len, bad); 136 ceph_decode_need(p, end, info->xattr_len, bad); 137 info->xattr_data = *p; 138 *p += info->xattr_len; 139 140 if (features == (u64)-1) { 141 /* inline data */ 142 ceph_decode_64_safe(p, end, info->inline_version, bad); 143 ceph_decode_32_safe(p, end, info->inline_len, bad); 144 ceph_decode_need(p, end, info->inline_len, bad); 145 info->inline_data = *p; 146 *p += info->inline_len; 147 /* quota */ 148 err = parse_reply_info_quota(p, end, info); 149 if (err < 0) 150 goto out_bad; 151 /* pool namespace */ 152 ceph_decode_32_safe(p, end, info->pool_ns_len, bad); 153 if (info->pool_ns_len > 0) { 154 ceph_decode_need(p, end, info->pool_ns_len, bad); 155 info->pool_ns_data = *p; 156 *p += info->pool_ns_len; 157 } 158 159 /* btime */ 160 ceph_decode_need(p, end, sizeof(info->btime), bad); 161 ceph_decode_copy(p, &info->btime, sizeof(info->btime)); 162 163 /* change attribute */ 164 ceph_decode_64_safe(p, end, info->change_attr, bad); 165 166 /* dir pin */ 167 if (struct_v >= 2) { 168 ceph_decode_32_safe(p, end, info->dir_pin, bad); 169 } else { 170 info->dir_pin = -ENODATA; 171 } 172 173 /* snapshot birth time, remains zero for v<=2 */ 174 if (struct_v >= 3) { 175 ceph_decode_need(p, end, sizeof(info->snap_btime), bad); 176 ceph_decode_copy(p, &info->snap_btime, 177 sizeof(info->snap_btime)); 178 } else { 179 memset(&info->snap_btime, 0, sizeof(info->snap_btime)); 180 } 181 182 /* snapshot count, remains zero for v<=3 */ 183 if (struct_v >= 4) { 184 ceph_decode_64_safe(p, end, info->rsnaps, bad); 185 } else { 186 info->rsnaps = 0; 187 } 188 189 if (struct_v >= 5) { 190 u32 alen; 191 192 ceph_decode_32_safe(p, end, alen, bad); 193 194 while (alen--) { 195 u32 len; 196 197 /* key */ 198 ceph_decode_32_safe(p, end, len, bad); 199 ceph_decode_skip_n(p, end, len, bad); 200 /* value */ 201 ceph_decode_32_safe(p, end, len, bad); 202 ceph_decode_skip_n(p, end, len, bad); 203 } 204 } 205 206 /* fscrypt flag -- ignore */ 207 if (struct_v >= 6) 208 ceph_decode_skip_8(p, end, 
bad); 209 210 info->fscrypt_auth = NULL; 211 info->fscrypt_auth_len = 0; 212 info->fscrypt_file = NULL; 213 info->fscrypt_file_len = 0; 214 if (struct_v >= 7) { 215 ceph_decode_32_safe(p, end, info->fscrypt_auth_len, bad); 216 if (info->fscrypt_auth_len) { 217 info->fscrypt_auth = kmalloc(info->fscrypt_auth_len, 218 GFP_KERNEL); 219 if (!info->fscrypt_auth) 220 return -ENOMEM; 221 ceph_decode_copy_safe(p, end, info->fscrypt_auth, 222 info->fscrypt_auth_len, bad); 223 } 224 ceph_decode_32_safe(p, end, info->fscrypt_file_len, bad); 225 if (info->fscrypt_file_len) { 226 info->fscrypt_file = kmalloc(info->fscrypt_file_len, 227 GFP_KERNEL); 228 if (!info->fscrypt_file) 229 return -ENOMEM; 230 ceph_decode_copy_safe(p, end, info->fscrypt_file, 231 info->fscrypt_file_len, bad); 232 } 233 } 234 *p = end; 235 } else { 236 /* legacy (unversioned) struct */ 237 if (features & CEPH_FEATURE_MDS_INLINE_DATA) { 238 ceph_decode_64_safe(p, end, info->inline_version, bad); 239 ceph_decode_32_safe(p, end, info->inline_len, bad); 240 ceph_decode_need(p, end, info->inline_len, bad); 241 info->inline_data = *p; 242 *p += info->inline_len; 243 } else 244 info->inline_version = CEPH_INLINE_NONE; 245 246 if (features & CEPH_FEATURE_MDS_QUOTA) { 247 err = parse_reply_info_quota(p, end, info); 248 if (err < 0) 249 goto out_bad; 250 } else { 251 info->max_bytes = 0; 252 info->max_files = 0; 253 } 254 255 info->pool_ns_len = 0; 256 info->pool_ns_data = NULL; 257 if (features & CEPH_FEATURE_FS_FILE_LAYOUT_V2) { 258 ceph_decode_32_safe(p, end, info->pool_ns_len, bad); 259 if (info->pool_ns_len > 0) { 260 ceph_decode_need(p, end, info->pool_ns_len, bad); 261 info->pool_ns_data = *p; 262 *p += info->pool_ns_len; 263 } 264 } 265 266 if (features & CEPH_FEATURE_FS_BTIME) { 267 ceph_decode_need(p, end, sizeof(info->btime), bad); 268 ceph_decode_copy(p, &info->btime, sizeof(info->btime)); 269 ceph_decode_64_safe(p, end, info->change_attr, bad); 270 } 271 272 info->dir_pin = -ENODATA; 273 /* info->snap_btime and info->rsnaps remain zero */ 274 } 275 return 0; 276 bad: 277 err = -EIO; 278 out_bad: 279 return err; 280 } 281 282 static int parse_reply_info_dir(void **p, void *end, 283 struct ceph_mds_reply_dirfrag **dirfrag, 284 u64 features) 285 { 286 if (features == (u64)-1) { 287 u8 struct_v, struct_compat; 288 u32 struct_len; 289 ceph_decode_8_safe(p, end, struct_v, bad); 290 ceph_decode_8_safe(p, end, struct_compat, bad); 291 /* struct_v is expected to be >= 1. we only understand 292 * encoding whose struct_compat == 1. */ 293 if (!struct_v || struct_compat != 1) 294 goto bad; 295 ceph_decode_32_safe(p, end, struct_len, bad); 296 ceph_decode_need(p, end, struct_len, bad); 297 end = *p + struct_len; 298 } 299 300 ceph_decode_need(p, end, sizeof(**dirfrag), bad); 301 *dirfrag = *p; 302 *p += sizeof(**dirfrag) + sizeof(u32) * le32_to_cpu((*dirfrag)->ndist); 303 if (unlikely(*p > end)) 304 goto bad; 305 if (features == (u64)-1) 306 *p = end; 307 return 0; 308 bad: 309 return -EIO; 310 } 311 312 static int parse_reply_info_lease(void **p, void *end, 313 struct ceph_mds_reply_lease **lease, 314 u64 features, u32 *altname_len, u8 **altname) 315 { 316 u8 struct_v; 317 u32 struct_len; 318 void *lend; 319 320 if (features == (u64)-1) { 321 u8 struct_compat; 322 323 ceph_decode_8_safe(p, end, struct_v, bad); 324 ceph_decode_8_safe(p, end, struct_compat, bad); 325 326 /* struct_v is expected to be >= 1. we only understand 327 * encoding whose struct_compat == 1. 
*/ 328 if (!struct_v || struct_compat != 1) 329 goto bad; 330 331 ceph_decode_32_safe(p, end, struct_len, bad); 332 } else { 333 struct_len = sizeof(**lease); 334 *altname_len = 0; 335 *altname = NULL; 336 } 337 338 lend = *p + struct_len; 339 ceph_decode_need(p, end, struct_len, bad); 340 *lease = *p; 341 *p += sizeof(**lease); 342 343 if (features == (u64)-1) { 344 if (struct_v >= 2) { 345 ceph_decode_32_safe(p, end, *altname_len, bad); 346 ceph_decode_need(p, end, *altname_len, bad); 347 *altname = *p; 348 *p += *altname_len; 349 } else { 350 *altname = NULL; 351 *altname_len = 0; 352 } 353 } 354 *p = lend; 355 return 0; 356 bad: 357 return -EIO; 358 } 359 360 /* 361 * parse a normal reply, which may contain a (dir+)dentry and/or a 362 * target inode. 363 */ 364 static int parse_reply_info_trace(void **p, void *end, 365 struct ceph_mds_reply_info_parsed *info, 366 u64 features) 367 { 368 int err; 369 370 if (info->head->is_dentry) { 371 err = parse_reply_info_in(p, end, &info->diri, features); 372 if (err < 0) 373 goto out_bad; 374 375 err = parse_reply_info_dir(p, end, &info->dirfrag, features); 376 if (err < 0) 377 goto out_bad; 378 379 ceph_decode_32_safe(p, end, info->dname_len, bad); 380 ceph_decode_need(p, end, info->dname_len, bad); 381 info->dname = *p; 382 *p += info->dname_len; 383 384 err = parse_reply_info_lease(p, end, &info->dlease, features, 385 &info->altname_len, &info->altname); 386 if (err < 0) 387 goto out_bad; 388 } 389 390 if (info->head->is_target) { 391 err = parse_reply_info_in(p, end, &info->targeti, features); 392 if (err < 0) 393 goto out_bad; 394 } 395 396 if (unlikely(*p != end)) 397 goto bad; 398 return 0; 399 400 bad: 401 err = -EIO; 402 out_bad: 403 pr_err("problem parsing mds trace %d\n", err); 404 return err; 405 } 406 407 /* 408 * parse readdir results 409 */ 410 static int parse_reply_info_readdir(void **p, void *end, 411 struct ceph_mds_request *req, 412 u64 features) 413 { 414 struct ceph_mds_reply_info_parsed *info = &req->r_reply_info; 415 struct ceph_client *cl = req->r_mdsc->fsc->client; 416 u32 num, i = 0; 417 int err; 418 419 err = parse_reply_info_dir(p, end, &info->dir_dir, features); 420 if (err < 0) 421 goto out_bad; 422 423 ceph_decode_need(p, end, sizeof(num) + 2, bad); 424 num = ceph_decode_32(p); 425 { 426 u16 flags = ceph_decode_16(p); 427 info->dir_end = !!(flags & CEPH_READDIR_FRAG_END); 428 info->dir_complete = !!(flags & CEPH_READDIR_FRAG_COMPLETE); 429 info->hash_order = !!(flags & CEPH_READDIR_HASH_ORDER); 430 info->offset_hash = !!(flags & CEPH_READDIR_OFFSET_HASH); 431 } 432 if (num == 0) 433 goto done; 434 435 BUG_ON(!info->dir_entries); 436 if ((unsigned long)(info->dir_entries + num) > 437 (unsigned long)info->dir_entries + info->dir_buf_size) { 438 pr_err_client(cl, "dir contents are larger than expected\n"); 439 WARN_ON(1); 440 goto bad; 441 } 442 443 info->dir_nr = num; 444 while (num) { 445 struct inode *inode = d_inode(req->r_dentry); 446 struct ceph_inode_info *ci = ceph_inode(inode); 447 struct ceph_mds_reply_dir_entry *rde = info->dir_entries + i; 448 struct fscrypt_str tname = FSTR_INIT(NULL, 0); 449 struct fscrypt_str oname = FSTR_INIT(NULL, 0); 450 struct ceph_fname fname; 451 u32 altname_len, _name_len; 452 u8 *altname, *_name; 453 454 /* dentry */ 455 ceph_decode_32_safe(p, end, _name_len, bad); 456 ceph_decode_need(p, end, _name_len, bad); 457 _name = *p; 458 *p += _name_len; 459 doutc(cl, "parsed dir dname '%.*s'\n", _name_len, _name); 460 461 if (info->hash_order) 462 rde->raw_hash = 
ceph_str_hash(ci->i_dir_layout.dl_dir_hash,
                                                      _name, _name_len);

                /* dentry lease */
                err = parse_reply_info_lease(p, end, &rde->lease, features,
                                             &altname_len, &altname);
                if (err)
                        goto out_bad;

                /*
                 * Try to decrypt the dentry names and update them
                 * in the ceph_mds_reply_dir_entry struct.
                 */
                fname.dir = inode;
                fname.name = _name;
                fname.name_len = _name_len;
                fname.ctext = altname;
                fname.ctext_len = altname_len;
                /*
                 * The _name_len may be larger than altname_len, for example
                 * when the human readable name length is in the range
                 * (CEPH_NOHASH_NAME_MAX, CEPH_NOHASH_NAME_MAX + SHA256_DIGEST_SIZE),
                 * in which case the copy in ceph_fname_to_usr would corrupt
                 * the data if there is no encryption key.
                 *
                 * Just set the no_copy flag; then, if there is no encryption
                 * key, oname.name will always be assigned to _name.
                 */
                fname.no_copy = true;
                if (altname_len == 0) {
                        /*
                         * Set tname to _name, and this will be used
                         * to do the base64_decode in-place.  It's
                         * safe because the decoded string is always
                         * shorter, at most 3/4 of the original string.
                         */
                        tname.name = _name;

                        /*
                         * Set oname to _name too, and this will be
                         * used to do the decryption in-place.
                         */
                        oname.name = _name;
                        oname.len = _name_len;
                } else {
                        /*
                         * This will do the decryption only in-place
                         * from the altname ciphertext directly.
                         */
                        oname.name = altname;
                        oname.len = altname_len;
                }
                rde->is_nokey = false;
                err = ceph_fname_to_usr(&fname, &tname, &oname, &rde->is_nokey);
                if (err) {
                        pr_err_client(cl, "unable to decode %.*s, got %d\n",
                                      _name_len, _name, err);
                        goto out_bad;
                }
                rde->name = oname.name;
                rde->name_len = oname.len;

                /* inode */
                err = parse_reply_info_in(p, end, &rde->inode, features);
                if (err < 0)
                        goto out_bad;
                /* ceph_readdir_prepopulate() will update it */
                rde->offset = 0;
                i++;
                num--;
        }

done:
        /* Skip over any unrecognized fields */
        *p = end;
        return 0;

bad:
        err = -EIO;
out_bad:
        pr_err_client(cl, "problem parsing dir contents %d\n", err);
        return err;
}

/*
 * parse fcntl F_GETLK results
 */
static int parse_reply_info_filelock(void **p, void *end,
                                     struct ceph_mds_reply_info_parsed *info,
                                     u64 features)
{
        if (*p + sizeof(*info->filelock_reply) > end)
                goto bad;

        info->filelock_reply = *p;

        /* Skip over any unrecognized fields */
        *p = end;
        return 0;
bad:
        return -EIO;
}


#if BITS_PER_LONG == 64

#define DELEGATED_INO_AVAILABLE xa_mk_value(1)

static int ceph_parse_deleg_inos(void **p, void *end,
                                 struct ceph_mds_session *s)
{
        struct ceph_client *cl = s->s_mdsc->fsc->client;
        u32 sets;

        ceph_decode_32_safe(p, end, sets, bad);
        doutc(cl, "got %u sets of delegated inodes\n", sets);
        while (sets--) {
                u64 start, len;

                ceph_decode_64_safe(p, end, start, bad);
                ceph_decode_64_safe(p, end, len, bad);

                /* Don't accept a delegation of system inodes */
                if (start < CEPH_INO_SYSTEM_BASE) {
                        pr_warn_ratelimited_client(cl,
                                "ignoring reserved inode range delegation (start=0x%llx len=0x%llx)\n",
                                start, len);
                        continue;
                }
                while (len--) {
                        int err = xa_insert(&s->s_delegated_inos, start++,
                                            DELEGATED_INO_AVAILABLE,
                                            GFP_KERNEL);
                        if (!err) {
                                doutc(cl, "added 
delegated inode 0x%llx\n", start - 1); 599 } else if (err == -EBUSY) { 600 pr_warn_client(cl, 601 "MDS delegated inode 0x%llx more than once.\n", 602 start - 1); 603 } else { 604 return err; 605 } 606 } 607 } 608 return 0; 609 bad: 610 return -EIO; 611 } 612 613 u64 ceph_get_deleg_ino(struct ceph_mds_session *s) 614 { 615 unsigned long ino; 616 void *val; 617 618 xa_for_each(&s->s_delegated_inos, ino, val) { 619 val = xa_erase(&s->s_delegated_inos, ino); 620 if (val == DELEGATED_INO_AVAILABLE) 621 return ino; 622 } 623 return 0; 624 } 625 626 int ceph_restore_deleg_ino(struct ceph_mds_session *s, u64 ino) 627 { 628 return xa_insert(&s->s_delegated_inos, ino, DELEGATED_INO_AVAILABLE, 629 GFP_KERNEL); 630 } 631 #else /* BITS_PER_LONG == 64 */ 632 /* 633 * FIXME: xarrays can't handle 64-bit indexes on a 32-bit arch. For now, just 634 * ignore delegated_inos on 32 bit arch. Maybe eventually add xarrays for top 635 * and bottom words? 636 */ 637 static int ceph_parse_deleg_inos(void **p, void *end, 638 struct ceph_mds_session *s) 639 { 640 u32 sets; 641 642 ceph_decode_32_safe(p, end, sets, bad); 643 if (sets) 644 ceph_decode_skip_n(p, end, sets * 2 * sizeof(__le64), bad); 645 return 0; 646 bad: 647 return -EIO; 648 } 649 650 u64 ceph_get_deleg_ino(struct ceph_mds_session *s) 651 { 652 return 0; 653 } 654 655 int ceph_restore_deleg_ino(struct ceph_mds_session *s, u64 ino) 656 { 657 return 0; 658 } 659 #endif /* BITS_PER_LONG == 64 */ 660 661 /* 662 * parse create results 663 */ 664 static int parse_reply_info_create(void **p, void *end, 665 struct ceph_mds_reply_info_parsed *info, 666 u64 features, struct ceph_mds_session *s) 667 { 668 int ret; 669 670 if (features == (u64)-1 || 671 (features & CEPH_FEATURE_REPLY_CREATE_INODE)) { 672 if (*p == end) { 673 /* Malformed reply? 
*/ 674 info->has_create_ino = false; 675 } else if (test_bit(CEPHFS_FEATURE_DELEG_INO, &s->s_features)) { 676 info->has_create_ino = true; 677 /* struct_v, struct_compat, and len */ 678 ceph_decode_skip_n(p, end, 2 + sizeof(u32), bad); 679 ceph_decode_64_safe(p, end, info->ino, bad); 680 ret = ceph_parse_deleg_inos(p, end, s); 681 if (ret) 682 return ret; 683 } else { 684 /* legacy */ 685 ceph_decode_64_safe(p, end, info->ino, bad); 686 info->has_create_ino = true; 687 } 688 } else { 689 if (*p != end) 690 goto bad; 691 } 692 693 /* Skip over any unrecognized fields */ 694 *p = end; 695 return 0; 696 bad: 697 return -EIO; 698 } 699 700 static int parse_reply_info_getvxattr(void **p, void *end, 701 struct ceph_mds_reply_info_parsed *info, 702 u64 features) 703 { 704 u32 value_len; 705 706 ceph_decode_skip_8(p, end, bad); /* skip current version: 1 */ 707 ceph_decode_skip_8(p, end, bad); /* skip first version: 1 */ 708 ceph_decode_skip_32(p, end, bad); /* skip payload length */ 709 710 ceph_decode_32_safe(p, end, value_len, bad); 711 712 if (value_len == end - *p) { 713 info->xattr_info.xattr_value = *p; 714 info->xattr_info.xattr_value_len = value_len; 715 *p = end; 716 return value_len; 717 } 718 bad: 719 return -EIO; 720 } 721 722 /* 723 * parse extra results 724 */ 725 static int parse_reply_info_extra(void **p, void *end, 726 struct ceph_mds_request *req, 727 u64 features, struct ceph_mds_session *s) 728 { 729 struct ceph_mds_reply_info_parsed *info = &req->r_reply_info; 730 u32 op = le32_to_cpu(info->head->op); 731 732 if (op == CEPH_MDS_OP_GETFILELOCK) 733 return parse_reply_info_filelock(p, end, info, features); 734 else if (op == CEPH_MDS_OP_READDIR || op == CEPH_MDS_OP_LSSNAP) 735 return parse_reply_info_readdir(p, end, req, features); 736 else if (op == CEPH_MDS_OP_CREATE) 737 return parse_reply_info_create(p, end, info, features, s); 738 else if (op == CEPH_MDS_OP_GETVXATTR) 739 return parse_reply_info_getvxattr(p, end, info, features); 740 else 741 return -EIO; 742 } 743 744 /* 745 * parse entire mds reply 746 */ 747 static int parse_reply_info(struct ceph_mds_session *s, struct ceph_msg *msg, 748 struct ceph_mds_request *req, u64 features) 749 { 750 struct ceph_mds_reply_info_parsed *info = &req->r_reply_info; 751 struct ceph_client *cl = s->s_mdsc->fsc->client; 752 void *p, *end; 753 u32 len; 754 int err; 755 756 info->head = msg->front.iov_base; 757 p = msg->front.iov_base + sizeof(struct ceph_mds_reply_head); 758 end = p + msg->front.iov_len - sizeof(struct ceph_mds_reply_head); 759 760 /* trace */ 761 ceph_decode_32_safe(&p, end, len, bad); 762 if (len > 0) { 763 ceph_decode_need(&p, end, len, bad); 764 err = parse_reply_info_trace(&p, p+len, info, features); 765 if (err < 0) 766 goto out_bad; 767 } 768 769 /* extra */ 770 ceph_decode_32_safe(&p, end, len, bad); 771 if (len > 0) { 772 ceph_decode_need(&p, end, len, bad); 773 err = parse_reply_info_extra(&p, p+len, req, features, s); 774 if (err < 0) 775 goto out_bad; 776 } 777 778 /* snap blob */ 779 ceph_decode_32_safe(&p, end, len, bad); 780 info->snapblob_len = len; 781 info->snapblob = p; 782 p += len; 783 784 if (p != end) 785 goto bad; 786 return 0; 787 788 bad: 789 err = -EIO; 790 out_bad: 791 pr_err_client(cl, "mds parse_reply err %d\n", err); 792 ceph_msg_dump(msg); 793 return err; 794 } 795 796 static void destroy_reply_info(struct ceph_mds_reply_info_parsed *info) 797 { 798 int i; 799 800 kfree(info->diri.fscrypt_auth); 801 kfree(info->diri.fscrypt_file); 802 kfree(info->targeti.fscrypt_auth); 803 
        kfree(info->targeti.fscrypt_file);
        if (!info->dir_entries)
                return;

        for (i = 0; i < info->dir_nr; i++) {
                struct ceph_mds_reply_dir_entry *rde = info->dir_entries + i;

                kfree(rde->inode.fscrypt_auth);
                kfree(rde->inode.fscrypt_file);
        }
        free_pages((unsigned long)info->dir_entries, get_order(info->dir_buf_size));
}

/*
 * In the async unlink case the kclient won't wait for the first reply
 * from the MDS; it just drops all the links, unhashes the dentry, and
 * then succeeds immediately.
 *
 * For any new create/link/rename, etc. requests that reuse the same
 * file names, we must wait for the first reply of the inflight unlink
 * request, or the MDS may fail these following requests with -EEXIST
 * if the inflight async unlink request was delayed for some reason.
 *
 * The worst case is that a non-async openc request will successfully
 * open the file if the CDentry hasn't been unlinked yet, but later the
 * previously delayed async unlink request will remove the CDentry.
 * That means the just-created file may be deleted later by accident.
 *
 * We need to wait for the inflight async unlink requests to finish
 * when creating new files/directories with the same file names.
 */
int ceph_wait_on_conflict_unlink(struct dentry *dentry)
{
        struct ceph_fs_client *fsc = ceph_sb_to_fs_client(dentry->d_sb);
        struct ceph_client *cl = fsc->client;
        struct dentry *pdentry = dentry->d_parent;
        struct dentry *udentry, *found = NULL;
        struct ceph_dentry_info *di;
        struct qstr dname;
        u32 hash = dentry->d_name.hash;
        int err;

        dname.name = dentry->d_name.name;
        dname.len = dentry->d_name.len;

        rcu_read_lock();
        hash_for_each_possible_rcu(fsc->async_unlink_conflict, di,
                                   hnode, hash) {
                udentry = di->dentry;

                spin_lock(&udentry->d_lock);
                if (udentry->d_name.hash != hash)
                        goto next;
                if (unlikely(udentry->d_parent != pdentry))
                        goto next;
                if (!hash_hashed(&di->hnode))
                        goto next;

                if (!test_bit(CEPH_DENTRY_ASYNC_UNLINK_BIT, &di->flags))
                        pr_warn_client(cl, "dentry %p:%pd async unlink bit is not set\n",
                                       dentry, dentry);

                if (!d_same_name(udentry, pdentry, &dname))
                        goto next;

                found = dget_dlock(udentry);
                spin_unlock(&udentry->d_lock);
                break;
next:
                spin_unlock(&udentry->d_lock);
        }
        rcu_read_unlock();

        if (likely(!found))
                return 0;

        doutc(cl, "dentry %p:%pd conflict with old %p:%pd\n", dentry, dentry,
              found, found);

        err = wait_on_bit(&di->flags, CEPH_DENTRY_ASYNC_UNLINK_BIT,
                          TASK_KILLABLE);
        dput(found);
        return err;
}


/*
 * sessions
 */
const char *ceph_session_state_name(int s)
{
        switch (s) {
        case CEPH_MDS_SESSION_NEW: return "new";
        case CEPH_MDS_SESSION_OPENING: return "opening";
        case CEPH_MDS_SESSION_OPEN: return "open";
        case CEPH_MDS_SESSION_HUNG: return "hung";
        case CEPH_MDS_SESSION_CLOSING: return "closing";
        case CEPH_MDS_SESSION_CLOSED: return "closed";
        case CEPH_MDS_SESSION_RESTARTING: return "restarting";
        case CEPH_MDS_SESSION_RECONNECTING: return "reconnecting";
        case CEPH_MDS_SESSION_REJECTED: return "rejected";
        default: return "???";
        }
}

struct ceph_mds_session *ceph_get_mds_session(struct ceph_mds_session *s)
{
        if (refcount_inc_not_zero(&s->s_ref))
                return s;
        return NULL;
}

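/*
 * Drop a reference on a session.  When the last reference is put, the
 * authorizer is destroyed and the session structure is freed.
 */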
917 void ceph_put_mds_session(struct ceph_mds_session *s) 918 { 919 if (IS_ERR_OR_NULL(s)) 920 return; 921 922 if (refcount_dec_and_test(&s->s_ref)) { 923 if (s->s_auth.authorizer) 924 ceph_auth_destroy_authorizer(s->s_auth.authorizer); 925 WARN_ON(mutex_is_locked(&s->s_mutex)); 926 xa_destroy(&s->s_delegated_inos); 927 kfree(s); 928 } 929 } 930 931 /* 932 * called under mdsc->mutex 933 */ 934 struct ceph_mds_session *__ceph_lookup_mds_session(struct ceph_mds_client *mdsc, 935 int mds) 936 { 937 if (mds >= mdsc->max_sessions || !mdsc->sessions[mds]) 938 return NULL; 939 return ceph_get_mds_session(mdsc->sessions[mds]); 940 } 941 942 static bool __have_session(struct ceph_mds_client *mdsc, int mds) 943 { 944 if (mds >= mdsc->max_sessions || !mdsc->sessions[mds]) 945 return false; 946 else 947 return true; 948 } 949 950 static int __verify_registered_session(struct ceph_mds_client *mdsc, 951 struct ceph_mds_session *s) 952 { 953 if (s->s_mds >= mdsc->max_sessions || 954 mdsc->sessions[s->s_mds] != s) 955 return -ENOENT; 956 return 0; 957 } 958 959 /* 960 * create+register a new session for given mds. 961 * called under mdsc->mutex. 962 */ 963 static struct ceph_mds_session *register_session(struct ceph_mds_client *mdsc, 964 int mds) 965 { 966 struct ceph_client *cl = mdsc->fsc->client; 967 struct ceph_mds_session *s; 968 969 if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_FENCE_IO) 970 return ERR_PTR(-EIO); 971 972 if (mds >= mdsc->mdsmap->possible_max_rank) 973 return ERR_PTR(-EINVAL); 974 975 s = kzalloc(sizeof(*s), GFP_NOFS); 976 if (!s) 977 return ERR_PTR(-ENOMEM); 978 979 if (mds >= mdsc->max_sessions) { 980 int newmax = 1 << get_count_order(mds + 1); 981 struct ceph_mds_session **sa; 982 983 doutc(cl, "realloc to %d\n", newmax); 984 sa = kcalloc(newmax, sizeof(void *), GFP_NOFS); 985 if (!sa) 986 goto fail_realloc; 987 if (mdsc->sessions) { 988 memcpy(sa, mdsc->sessions, 989 mdsc->max_sessions * sizeof(void *)); 990 kfree(mdsc->sessions); 991 } 992 mdsc->sessions = sa; 993 mdsc->max_sessions = newmax; 994 } 995 996 doutc(cl, "mds%d\n", mds); 997 s->s_mdsc = mdsc; 998 s->s_mds = mds; 999 s->s_state = CEPH_MDS_SESSION_NEW; 1000 mutex_init(&s->s_mutex); 1001 1002 ceph_con_init(&s->s_con, s, &mds_con_ops, &mdsc->fsc->client->msgr); 1003 1004 atomic_set(&s->s_cap_gen, 1); 1005 s->s_cap_ttl = jiffies - 1; 1006 1007 spin_lock_init(&s->s_cap_lock); 1008 INIT_LIST_HEAD(&s->s_caps); 1009 refcount_set(&s->s_ref, 1); 1010 INIT_LIST_HEAD(&s->s_waiting); 1011 INIT_LIST_HEAD(&s->s_unsafe); 1012 xa_init(&s->s_delegated_inos); 1013 INIT_LIST_HEAD(&s->s_cap_releases); 1014 INIT_WORK(&s->s_cap_release_work, ceph_cap_release_work); 1015 1016 INIT_LIST_HEAD(&s->s_cap_dirty); 1017 INIT_LIST_HEAD(&s->s_cap_flushing); 1018 1019 mdsc->sessions[mds] = s; 1020 atomic_inc(&mdsc->num_sessions); 1021 refcount_inc(&s->s_ref); /* one ref to sessions[], one to caller */ 1022 1023 ceph_con_open(&s->s_con, CEPH_ENTITY_TYPE_MDS, mds, 1024 ceph_mdsmap_get_addr(mdsc->mdsmap, mds)); 1025 1026 return s; 1027 1028 fail_realloc: 1029 kfree(s); 1030 return ERR_PTR(-ENOMEM); 1031 } 1032 1033 /* 1034 * called under mdsc->mutex 1035 */ 1036 static void __unregister_session(struct ceph_mds_client *mdsc, 1037 struct ceph_mds_session *s) 1038 { 1039 doutc(mdsc->fsc->client, "mds%d %p\n", s->s_mds, s); 1040 BUG_ON(mdsc->sessions[s->s_mds] != s); 1041 mdsc->sessions[s->s_mds] = NULL; 1042 ceph_con_close(&s->s_con); 1043 ceph_put_mds_session(s); 1044 atomic_dec(&mdsc->num_sessions); 1045 } 1046 1047 /* 1048 * drop session refs in 
request.
 *
 * should be last request ref, or hold mdsc->mutex
 */
static void put_request_session(struct ceph_mds_request *req)
{
        if (req->r_session) {
                ceph_put_mds_session(req->r_session);
                req->r_session = NULL;
        }
}

void ceph_mdsc_iterate_sessions(struct ceph_mds_client *mdsc,
                                void (*cb)(struct ceph_mds_session *),
                                bool check_state)
{
        int mds;

        mutex_lock(&mdsc->mutex);
        for (mds = 0; mds < mdsc->max_sessions; ++mds) {
                struct ceph_mds_session *s;

                s = __ceph_lookup_mds_session(mdsc, mds);
                if (!s)
                        continue;

                if (check_state && !check_session_state(s)) {
                        ceph_put_mds_session(s);
                        continue;
                }

                mutex_unlock(&mdsc->mutex);
                cb(s);
                ceph_put_mds_session(s);
                mutex_lock(&mdsc->mutex);
        }
        mutex_unlock(&mdsc->mutex);
}

void ceph_mdsc_release_request(struct kref *kref)
{
        struct ceph_mds_request *req = container_of(kref,
                                                    struct ceph_mds_request,
                                                    r_kref);
        ceph_mdsc_release_dir_caps_no_check(req);
        destroy_reply_info(&req->r_reply_info);
        if (req->r_request)
                ceph_msg_put(req->r_request);
        if (req->r_reply)
                ceph_msg_put(req->r_reply);
        if (req->r_inode) {
                ceph_put_cap_refs(ceph_inode(req->r_inode), CEPH_CAP_PIN);
                iput(req->r_inode);
        }
        if (req->r_parent) {
                ceph_put_cap_refs(ceph_inode(req->r_parent), CEPH_CAP_PIN);
                iput(req->r_parent);
        }
        iput(req->r_target_inode);
        iput(req->r_new_inode);
        if (req->r_dentry)
                dput(req->r_dentry);
        if (req->r_old_dentry)
                dput(req->r_old_dentry);
        if (req->r_old_dentry_dir) {
                /*
                 * track (and drop pins for) r_old_dentry_dir
                 * separately, since r_old_dentry's d_parent may have
                 * changed between the dir mutex being dropped and
                 * this request being freed.
                 */
                ceph_put_cap_refs(ceph_inode(req->r_old_dentry_dir),
                                  CEPH_CAP_PIN);
                iput(req->r_old_dentry_dir);
        }
        kfree(req->r_path1);
        kfree(req->r_path2);
        put_cred(req->r_cred);
        if (req->r_mnt_idmap)
                mnt_idmap_put(req->r_mnt_idmap);
        if (req->r_pagelist)
                ceph_pagelist_release(req->r_pagelist);
        kfree(req->r_fscrypt_auth);
        kfree(req->r_altname);
        put_request_session(req);
        ceph_unreserve_caps(req->r_mdsc, &req->r_caps_reservation);
        WARN_ON_ONCE(!list_empty(&req->r_wait));
        kmem_cache_free(ceph_mds_request_cachep, req);
}

DEFINE_RB_FUNCS(request, struct ceph_mds_request, r_tid, r_node)

/*
 * lookup request, bump ref if found.
 *
 * called under mdsc->mutex.
 */
static struct ceph_mds_request *
lookup_get_request(struct ceph_mds_client *mdsc, u64 tid)
{
        struct ceph_mds_request *req;

        req = lookup_request(&mdsc->request_tree, tid);
        if (req)
                ceph_mdsc_get_request(req);

        return req;
}

/*
 * Register an in-flight request, and assign a tid.  Link to the
 * directory we are modifying (if any).
 *
 * Called under mdsc->mutex.
1162 */ 1163 static void __register_request(struct ceph_mds_client *mdsc, 1164 struct ceph_mds_request *req, 1165 struct inode *dir) 1166 { 1167 struct ceph_client *cl = mdsc->fsc->client; 1168 int ret = 0; 1169 1170 req->r_tid = ++mdsc->last_tid; 1171 if (req->r_num_caps) { 1172 ret = ceph_reserve_caps(mdsc, &req->r_caps_reservation, 1173 req->r_num_caps); 1174 if (ret < 0) { 1175 pr_err_client(cl, "%p failed to reserve caps: %d\n", 1176 req, ret); 1177 /* set req->r_err to fail early from __do_request */ 1178 req->r_err = ret; 1179 return; 1180 } 1181 } 1182 doutc(cl, "%p tid %lld\n", req, req->r_tid); 1183 ceph_mdsc_get_request(req); 1184 insert_request(&mdsc->request_tree, req); 1185 1186 req->r_cred = get_current_cred(); 1187 if (!req->r_mnt_idmap) 1188 req->r_mnt_idmap = &nop_mnt_idmap; 1189 1190 if (mdsc->oldest_tid == 0 && req->r_op != CEPH_MDS_OP_SETFILELOCK) 1191 mdsc->oldest_tid = req->r_tid; 1192 1193 if (dir) { 1194 struct ceph_inode_info *ci = ceph_inode(dir); 1195 1196 ihold(dir); 1197 req->r_unsafe_dir = dir; 1198 spin_lock(&ci->i_unsafe_lock); 1199 list_add_tail(&req->r_unsafe_dir_item, &ci->i_unsafe_dirops); 1200 spin_unlock(&ci->i_unsafe_lock); 1201 } 1202 } 1203 1204 static void __unregister_request(struct ceph_mds_client *mdsc, 1205 struct ceph_mds_request *req) 1206 { 1207 doutc(mdsc->fsc->client, "%p tid %lld\n", req, req->r_tid); 1208 1209 /* Never leave an unregistered request on an unsafe list! */ 1210 list_del_init(&req->r_unsafe_item); 1211 1212 if (req->r_tid == mdsc->oldest_tid) { 1213 struct rb_node *p = rb_next(&req->r_node); 1214 mdsc->oldest_tid = 0; 1215 while (p) { 1216 struct ceph_mds_request *next_req = 1217 rb_entry(p, struct ceph_mds_request, r_node); 1218 if (next_req->r_op != CEPH_MDS_OP_SETFILELOCK) { 1219 mdsc->oldest_tid = next_req->r_tid; 1220 break; 1221 } 1222 p = rb_next(p); 1223 } 1224 } 1225 1226 erase_request(&mdsc->request_tree, req); 1227 1228 if (req->r_unsafe_dir) { 1229 struct ceph_inode_info *ci = ceph_inode(req->r_unsafe_dir); 1230 spin_lock(&ci->i_unsafe_lock); 1231 list_del_init(&req->r_unsafe_dir_item); 1232 spin_unlock(&ci->i_unsafe_lock); 1233 } 1234 if (req->r_target_inode && 1235 test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) { 1236 struct ceph_inode_info *ci = ceph_inode(req->r_target_inode); 1237 spin_lock(&ci->i_unsafe_lock); 1238 list_del_init(&req->r_unsafe_target_item); 1239 spin_unlock(&ci->i_unsafe_lock); 1240 } 1241 1242 if (req->r_unsafe_dir) { 1243 iput(req->r_unsafe_dir); 1244 req->r_unsafe_dir = NULL; 1245 } 1246 1247 complete_all(&req->r_safe_completion); 1248 1249 ceph_mdsc_put_request(req); 1250 } 1251 1252 /* 1253 * Walk back up the dentry tree until we hit a dentry representing a 1254 * non-snapshot inode. We do this using the rcu_read_lock (which must be held 1255 * when calling this) to ensure that the objects won't disappear while we're 1256 * working with them. Once we hit a candidate dentry, we attempt to take a 1257 * reference to it, and return that as the result. 1258 */ 1259 static struct inode *get_nonsnap_parent(struct dentry *dentry) 1260 { 1261 struct inode *inode = NULL; 1262 1263 while (dentry && !IS_ROOT(dentry)) { 1264 inode = d_inode_rcu(dentry); 1265 if (!inode || ceph_snap(inode) == CEPH_NOSNAP) 1266 break; 1267 dentry = dentry->d_parent; 1268 } 1269 if (inode) 1270 inode = igrab(inode); 1271 return inode; 1272 } 1273 1274 /* 1275 * Choose mds to send request to next. If there is a hint set in the 1276 * request (e.g., due to a prior forward hint from the mds), use that. 
1277 * Otherwise, consult frag tree and/or caps to identify the 1278 * appropriate mds. If all else fails, choose randomly. 1279 * 1280 * Called under mdsc->mutex. 1281 */ 1282 static int __choose_mds(struct ceph_mds_client *mdsc, 1283 struct ceph_mds_request *req, 1284 bool *random) 1285 { 1286 struct inode *inode; 1287 struct ceph_inode_info *ci; 1288 struct ceph_cap *cap; 1289 int mode = req->r_direct_mode; 1290 int mds = -1; 1291 u32 hash = req->r_direct_hash; 1292 bool is_hash = test_bit(CEPH_MDS_R_DIRECT_IS_HASH, &req->r_req_flags); 1293 struct ceph_client *cl = mdsc->fsc->client; 1294 1295 if (random) 1296 *random = false; 1297 1298 /* 1299 * is there a specific mds we should try? ignore hint if we have 1300 * no session and the mds is not up (active or recovering). 1301 */ 1302 if (req->r_resend_mds >= 0 && 1303 (__have_session(mdsc, req->r_resend_mds) || 1304 ceph_mdsmap_get_state(mdsc->mdsmap, req->r_resend_mds) > 0)) { 1305 doutc(cl, "using resend_mds mds%d\n", req->r_resend_mds); 1306 return req->r_resend_mds; 1307 } 1308 1309 if (mode == USE_RANDOM_MDS) 1310 goto random; 1311 1312 inode = NULL; 1313 if (req->r_inode) { 1314 if (ceph_snap(req->r_inode) != CEPH_SNAPDIR) { 1315 inode = req->r_inode; 1316 ihold(inode); 1317 } else { 1318 /* req->r_dentry is non-null for LSSNAP request */ 1319 rcu_read_lock(); 1320 inode = get_nonsnap_parent(req->r_dentry); 1321 rcu_read_unlock(); 1322 doutc(cl, "using snapdir's parent %p %llx.%llx\n", 1323 inode, ceph_vinop(inode)); 1324 } 1325 } else if (req->r_dentry) { 1326 /* ignore race with rename; old or new d_parent is okay */ 1327 struct dentry *parent; 1328 struct inode *dir; 1329 1330 rcu_read_lock(); 1331 parent = READ_ONCE(req->r_dentry->d_parent); 1332 dir = req->r_parent ? : d_inode_rcu(parent); 1333 1334 if (!dir || dir->i_sb != mdsc->fsc->sb) { 1335 /* not this fs or parent went negative */ 1336 inode = d_inode(req->r_dentry); 1337 if (inode) 1338 ihold(inode); 1339 } else if (ceph_snap(dir) != CEPH_NOSNAP) { 1340 /* direct snapped/virtual snapdir requests 1341 * based on parent dir inode */ 1342 inode = get_nonsnap_parent(parent); 1343 doutc(cl, "using nonsnap parent %p %llx.%llx\n", 1344 inode, ceph_vinop(inode)); 1345 } else { 1346 /* dentry target */ 1347 inode = d_inode(req->r_dentry); 1348 if (!inode || mode == USE_AUTH_MDS) { 1349 /* dir + name */ 1350 inode = igrab(dir); 1351 hash = ceph_dentry_hash(dir, req->r_dentry); 1352 is_hash = true; 1353 } else { 1354 ihold(inode); 1355 } 1356 } 1357 rcu_read_unlock(); 1358 } 1359 1360 if (!inode) 1361 goto random; 1362 1363 doutc(cl, "%p %llx.%llx is_hash=%d (0x%x) mode %d\n", inode, 1364 ceph_vinop(inode), (int)is_hash, hash, mode); 1365 ci = ceph_inode(inode); 1366 1367 if (is_hash && S_ISDIR(inode->i_mode)) { 1368 struct ceph_inode_frag frag; 1369 int found; 1370 1371 ceph_choose_frag(ci, hash, &frag, &found); 1372 if (found) { 1373 if (mode == USE_ANY_MDS && frag.ndist > 0) { 1374 u8 r; 1375 1376 /* choose a random replica */ 1377 get_random_bytes(&r, 1); 1378 r %= frag.ndist; 1379 mds = frag.dist[r]; 1380 doutc(cl, "%p %llx.%llx frag %u mds%d (%d/%d)\n", 1381 inode, ceph_vinop(inode), frag.frag, 1382 mds, (int)r, frag.ndist); 1383 if (ceph_mdsmap_get_state(mdsc->mdsmap, mds) >= 1384 CEPH_MDS_STATE_ACTIVE && 1385 !ceph_mdsmap_is_laggy(mdsc->mdsmap, mds)) 1386 goto out; 1387 } 1388 1389 /* since this file/dir wasn't known to be 1390 * replicated, then we want to look for the 1391 * authoritative mds. 
*/ 1392 if (frag.mds >= 0) { 1393 /* choose auth mds */ 1394 mds = frag.mds; 1395 doutc(cl, "%p %llx.%llx frag %u mds%d (auth)\n", 1396 inode, ceph_vinop(inode), frag.frag, mds); 1397 if (ceph_mdsmap_get_state(mdsc->mdsmap, mds) >= 1398 CEPH_MDS_STATE_ACTIVE) { 1399 if (!ceph_mdsmap_is_laggy(mdsc->mdsmap, 1400 mds)) 1401 goto out; 1402 } 1403 } 1404 mode = USE_AUTH_MDS; 1405 } 1406 } 1407 1408 spin_lock(&ci->i_ceph_lock); 1409 cap = NULL; 1410 if (mode == USE_AUTH_MDS) 1411 cap = ci->i_auth_cap; 1412 if (!cap && !RB_EMPTY_ROOT(&ci->i_caps)) 1413 cap = rb_entry(rb_first(&ci->i_caps), struct ceph_cap, ci_node); 1414 if (!cap) { 1415 spin_unlock(&ci->i_ceph_lock); 1416 iput(inode); 1417 goto random; 1418 } 1419 mds = cap->session->s_mds; 1420 doutc(cl, "%p %llx.%llx mds%d (%scap %p)\n", inode, 1421 ceph_vinop(inode), mds, 1422 cap == ci->i_auth_cap ? "auth " : "", cap); 1423 spin_unlock(&ci->i_ceph_lock); 1424 out: 1425 iput(inode); 1426 return mds; 1427 1428 random: 1429 if (random) 1430 *random = true; 1431 1432 mds = ceph_mdsmap_get_random_mds(mdsc->mdsmap); 1433 doutc(cl, "chose random mds%d\n", mds); 1434 return mds; 1435 } 1436 1437 1438 /* 1439 * session messages 1440 */ 1441 struct ceph_msg *ceph_create_session_msg(u32 op, u64 seq) 1442 { 1443 struct ceph_msg *msg; 1444 struct ceph_mds_session_head *h; 1445 1446 msg = ceph_msg_new(CEPH_MSG_CLIENT_SESSION, sizeof(*h), GFP_NOFS, 1447 false); 1448 if (!msg) { 1449 pr_err("ENOMEM creating session %s msg\n", 1450 ceph_session_op_name(op)); 1451 return NULL; 1452 } 1453 h = msg->front.iov_base; 1454 h->op = cpu_to_le32(op); 1455 h->seq = cpu_to_le64(seq); 1456 1457 return msg; 1458 } 1459 1460 static const unsigned char feature_bits[] = CEPHFS_FEATURES_CLIENT_SUPPORTED; 1461 #define FEATURE_BYTES(c) (DIV_ROUND_UP((size_t)feature_bits[c - 1] + 1, 64) * 8) 1462 static int encode_supported_features(void **p, void *end) 1463 { 1464 static const size_t count = ARRAY_SIZE(feature_bits); 1465 1466 if (count > 0) { 1467 size_t i; 1468 size_t size = FEATURE_BYTES(count); 1469 unsigned long bit; 1470 1471 if (WARN_ON_ONCE(*p + 4 + size > end)) 1472 return -ERANGE; 1473 1474 ceph_encode_32(p, size); 1475 memset(*p, 0, size); 1476 for (i = 0; i < count; i++) { 1477 bit = feature_bits[i]; 1478 ((unsigned char *)(*p))[bit / 8] |= BIT(bit % 8); 1479 } 1480 *p += size; 1481 } else { 1482 if (WARN_ON_ONCE(*p + 4 > end)) 1483 return -ERANGE; 1484 1485 ceph_encode_32(p, 0); 1486 } 1487 1488 return 0; 1489 } 1490 1491 static const unsigned char metric_bits[] = CEPHFS_METRIC_SPEC_CLIENT_SUPPORTED; 1492 #define METRIC_BYTES(cnt) (DIV_ROUND_UP((size_t)metric_bits[cnt - 1] + 1, 64) * 8) 1493 static int encode_metric_spec(void **p, void *end) 1494 { 1495 static const size_t count = ARRAY_SIZE(metric_bits); 1496 1497 /* header */ 1498 if (WARN_ON_ONCE(*p + 2 > end)) 1499 return -ERANGE; 1500 1501 ceph_encode_8(p, 1); /* version */ 1502 ceph_encode_8(p, 1); /* compat */ 1503 1504 if (count > 0) { 1505 size_t i; 1506 size_t size = METRIC_BYTES(count); 1507 1508 if (WARN_ON_ONCE(*p + 4 + 4 + size > end)) 1509 return -ERANGE; 1510 1511 /* metric spec info length */ 1512 ceph_encode_32(p, 4 + size); 1513 1514 /* metric spec */ 1515 ceph_encode_32(p, size); 1516 memset(*p, 0, size); 1517 for (i = 0; i < count; i++) 1518 ((unsigned char *)(*p))[i / 8] |= BIT(metric_bits[i] % 8); 1519 *p += size; 1520 } else { 1521 if (WARN_ON_ONCE(*p + 4 + 4 > end)) 1522 return -ERANGE; 1523 1524 /* metric spec info length */ 1525 ceph_encode_32(p, 4); 1526 /* metric spec */ 1527 
ceph_encode_32(p, 0); 1528 } 1529 1530 return 0; 1531 } 1532 1533 /* 1534 * session message, specialization for CEPH_SESSION_REQUEST_OPEN 1535 * to include additional client metadata fields. 1536 */ 1537 static struct ceph_msg *create_session_open_msg(struct ceph_mds_client *mdsc, u64 seq) 1538 { 1539 struct ceph_msg *msg; 1540 struct ceph_mds_session_head *h; 1541 int i; 1542 int extra_bytes = 0; 1543 int metadata_key_count = 0; 1544 struct ceph_options *opt = mdsc->fsc->client->options; 1545 struct ceph_mount_options *fsopt = mdsc->fsc->mount_options; 1546 struct ceph_client *cl = mdsc->fsc->client; 1547 size_t size, count; 1548 void *p, *end; 1549 int ret; 1550 1551 const char* metadata[][2] = { 1552 {"hostname", mdsc->nodename}, 1553 {"kernel_version", init_utsname()->release}, 1554 {"entity_id", opt->name ? : ""}, 1555 {"root", fsopt->server_path ? : "/"}, 1556 {NULL, NULL} 1557 }; 1558 1559 /* Calculate serialized length of metadata */ 1560 extra_bytes = 4; /* map length */ 1561 for (i = 0; metadata[i][0]; ++i) { 1562 extra_bytes += 8 + strlen(metadata[i][0]) + 1563 strlen(metadata[i][1]); 1564 metadata_key_count++; 1565 } 1566 1567 /* supported feature */ 1568 size = 0; 1569 count = ARRAY_SIZE(feature_bits); 1570 if (count > 0) 1571 size = FEATURE_BYTES(count); 1572 extra_bytes += 4 + size; 1573 1574 /* metric spec */ 1575 size = 0; 1576 count = ARRAY_SIZE(metric_bits); 1577 if (count > 0) 1578 size = METRIC_BYTES(count); 1579 extra_bytes += 2 + 4 + 4 + size; 1580 1581 /* Allocate the message */ 1582 msg = ceph_msg_new(CEPH_MSG_CLIENT_SESSION, sizeof(*h) + extra_bytes, 1583 GFP_NOFS, false); 1584 if (!msg) { 1585 pr_err_client(cl, "ENOMEM creating session open msg\n"); 1586 return ERR_PTR(-ENOMEM); 1587 } 1588 p = msg->front.iov_base; 1589 end = p + msg->front.iov_len; 1590 1591 h = p; 1592 h->op = cpu_to_le32(CEPH_SESSION_REQUEST_OPEN); 1593 h->seq = cpu_to_le64(seq); 1594 1595 /* 1596 * Serialize client metadata into waiting buffer space, using 1597 * the format that userspace expects for map<string, string> 1598 * 1599 * ClientSession messages with metadata are v4 1600 */ 1601 msg->hdr.version = cpu_to_le16(4); 1602 msg->hdr.compat_version = cpu_to_le16(1); 1603 1604 /* The write pointer, following the session_head structure */ 1605 p += sizeof(*h); 1606 1607 /* Number of entries in the map */ 1608 ceph_encode_32(&p, metadata_key_count); 1609 1610 /* Two length-prefixed strings for each entry in the map */ 1611 for (i = 0; metadata[i][0]; ++i) { 1612 size_t const key_len = strlen(metadata[i][0]); 1613 size_t const val_len = strlen(metadata[i][1]); 1614 1615 ceph_encode_32(&p, key_len); 1616 memcpy(p, metadata[i][0], key_len); 1617 p += key_len; 1618 ceph_encode_32(&p, val_len); 1619 memcpy(p, metadata[i][1], val_len); 1620 p += val_len; 1621 } 1622 1623 ret = encode_supported_features(&p, end); 1624 if (ret) { 1625 pr_err_client(cl, "encode_supported_features failed!\n"); 1626 ceph_msg_put(msg); 1627 return ERR_PTR(ret); 1628 } 1629 1630 ret = encode_metric_spec(&p, end); 1631 if (ret) { 1632 pr_err_client(cl, "encode_metric_spec failed!\n"); 1633 ceph_msg_put(msg); 1634 return ERR_PTR(ret); 1635 } 1636 1637 msg->front.iov_len = p - msg->front.iov_base; 1638 msg->hdr.front_len = cpu_to_le32(msg->front.iov_len); 1639 1640 return msg; 1641 } 1642 1643 /* 1644 * send session open request. 
1645 * 1646 * called under mdsc->mutex 1647 */ 1648 static int __open_session(struct ceph_mds_client *mdsc, 1649 struct ceph_mds_session *session) 1650 { 1651 struct ceph_msg *msg; 1652 int mstate; 1653 int mds = session->s_mds; 1654 1655 if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_FENCE_IO) 1656 return -EIO; 1657 1658 /* wait for mds to go active? */ 1659 mstate = ceph_mdsmap_get_state(mdsc->mdsmap, mds); 1660 doutc(mdsc->fsc->client, "open_session to mds%d (%s)\n", mds, 1661 ceph_mds_state_name(mstate)); 1662 session->s_state = CEPH_MDS_SESSION_OPENING; 1663 session->s_renew_requested = jiffies; 1664 1665 /* send connect message */ 1666 msg = create_session_open_msg(mdsc, session->s_seq); 1667 if (IS_ERR(msg)) 1668 return PTR_ERR(msg); 1669 ceph_con_send(&session->s_con, msg); 1670 return 0; 1671 } 1672 1673 /* 1674 * open sessions for any export targets for the given mds 1675 * 1676 * called under mdsc->mutex 1677 */ 1678 static struct ceph_mds_session * 1679 __open_export_target_session(struct ceph_mds_client *mdsc, int target) 1680 { 1681 struct ceph_mds_session *session; 1682 int ret; 1683 1684 session = __ceph_lookup_mds_session(mdsc, target); 1685 if (!session) { 1686 session = register_session(mdsc, target); 1687 if (IS_ERR(session)) 1688 return session; 1689 } 1690 if (session->s_state == CEPH_MDS_SESSION_NEW || 1691 session->s_state == CEPH_MDS_SESSION_CLOSING) { 1692 ret = __open_session(mdsc, session); 1693 if (ret) 1694 return ERR_PTR(ret); 1695 } 1696 1697 return session; 1698 } 1699 1700 struct ceph_mds_session * 1701 ceph_mdsc_open_export_target_session(struct ceph_mds_client *mdsc, int target) 1702 { 1703 struct ceph_mds_session *session; 1704 struct ceph_client *cl = mdsc->fsc->client; 1705 1706 doutc(cl, "to mds%d\n", target); 1707 1708 mutex_lock(&mdsc->mutex); 1709 session = __open_export_target_session(mdsc, target); 1710 mutex_unlock(&mdsc->mutex); 1711 1712 return session; 1713 } 1714 1715 static void __open_export_target_sessions(struct ceph_mds_client *mdsc, 1716 struct ceph_mds_session *session) 1717 { 1718 struct ceph_mds_info *mi; 1719 struct ceph_mds_session *ts; 1720 int i, mds = session->s_mds; 1721 struct ceph_client *cl = mdsc->fsc->client; 1722 1723 if (mds >= mdsc->mdsmap->possible_max_rank) 1724 return; 1725 1726 mi = &mdsc->mdsmap->m_info[mds]; 1727 doutc(cl, "for mds%d (%d targets)\n", session->s_mds, 1728 mi->num_export_targets); 1729 1730 for (i = 0; i < mi->num_export_targets; i++) { 1731 ts = __open_export_target_session(mdsc, mi->export_targets[i]); 1732 ceph_put_mds_session(ts); 1733 } 1734 } 1735 1736 void ceph_mdsc_open_export_target_sessions(struct ceph_mds_client *mdsc, 1737 struct ceph_mds_session *session) 1738 { 1739 mutex_lock(&mdsc->mutex); 1740 __open_export_target_sessions(mdsc, session); 1741 mutex_unlock(&mdsc->mutex); 1742 } 1743 1744 /* 1745 * session caps 1746 */ 1747 1748 static void detach_cap_releases(struct ceph_mds_session *session, 1749 struct list_head *target) 1750 { 1751 struct ceph_client *cl = session->s_mdsc->fsc->client; 1752 1753 lockdep_assert_held(&session->s_cap_lock); 1754 1755 list_splice_init(&session->s_cap_releases, target); 1756 session->s_num_cap_releases = 0; 1757 doutc(cl, "mds%d\n", session->s_mds); 1758 } 1759 1760 static void dispose_cap_releases(struct ceph_mds_client *mdsc, 1761 struct list_head *dispose) 1762 { 1763 while (!list_empty(dispose)) { 1764 struct ceph_cap *cap; 1765 /* zero out the in-progress message */ 1766 cap = list_first_entry(dispose, struct ceph_cap, session_caps); 1767 
list_del(&cap->session_caps); 1768 ceph_put_cap(mdsc, cap); 1769 } 1770 } 1771 1772 static void cleanup_session_requests(struct ceph_mds_client *mdsc, 1773 struct ceph_mds_session *session) 1774 { 1775 struct ceph_client *cl = mdsc->fsc->client; 1776 struct ceph_mds_request *req; 1777 struct rb_node *p; 1778 1779 doutc(cl, "mds%d\n", session->s_mds); 1780 mutex_lock(&mdsc->mutex); 1781 while (!list_empty(&session->s_unsafe)) { 1782 req = list_first_entry(&session->s_unsafe, 1783 struct ceph_mds_request, r_unsafe_item); 1784 pr_warn_ratelimited_client(cl, " dropping unsafe request %llu\n", 1785 req->r_tid); 1786 if (req->r_target_inode) 1787 mapping_set_error(req->r_target_inode->i_mapping, -EIO); 1788 if (req->r_unsafe_dir) 1789 mapping_set_error(req->r_unsafe_dir->i_mapping, -EIO); 1790 __unregister_request(mdsc, req); 1791 } 1792 /* zero r_attempts, so kick_requests() will re-send requests */ 1793 p = rb_first(&mdsc->request_tree); 1794 while (p) { 1795 req = rb_entry(p, struct ceph_mds_request, r_node); 1796 p = rb_next(p); 1797 if (req->r_session && 1798 req->r_session->s_mds == session->s_mds) 1799 req->r_attempts = 0; 1800 } 1801 mutex_unlock(&mdsc->mutex); 1802 } 1803 1804 /* 1805 * Helper to safely iterate over all caps associated with a session, with 1806 * special care taken to handle a racing __ceph_remove_cap(). 1807 * 1808 * Caller must hold session s_mutex. 1809 */ 1810 int ceph_iterate_session_caps(struct ceph_mds_session *session, 1811 int (*cb)(struct inode *, int mds, void *), 1812 void *arg) 1813 { 1814 struct ceph_client *cl = session->s_mdsc->fsc->client; 1815 struct list_head *p; 1816 struct ceph_cap *cap; 1817 struct inode *inode, *last_inode = NULL; 1818 struct ceph_cap *old_cap = NULL; 1819 int ret; 1820 1821 doutc(cl, "%p mds%d\n", session, session->s_mds); 1822 spin_lock(&session->s_cap_lock); 1823 p = session->s_caps.next; 1824 while (p != &session->s_caps) { 1825 int mds; 1826 1827 cap = list_entry(p, struct ceph_cap, session_caps); 1828 inode = igrab(&cap->ci->netfs.inode); 1829 if (!inode) { 1830 p = p->next; 1831 continue; 1832 } 1833 session->s_cap_iterator = cap; 1834 mds = cap->mds; 1835 spin_unlock(&session->s_cap_lock); 1836 1837 if (last_inode) { 1838 iput(last_inode); 1839 last_inode = NULL; 1840 } 1841 if (old_cap) { 1842 ceph_put_cap(session->s_mdsc, old_cap); 1843 old_cap = NULL; 1844 } 1845 1846 ret = cb(inode, mds, arg); 1847 last_inode = inode; 1848 1849 spin_lock(&session->s_cap_lock); 1850 p = p->next; 1851 if (!cap->ci) { 1852 doutc(cl, "finishing cap %p removal\n", cap); 1853 BUG_ON(cap->session != session); 1854 cap->session = NULL; 1855 list_del_init(&cap->session_caps); 1856 session->s_nr_caps--; 1857 atomic64_dec(&session->s_mdsc->metric.total_caps); 1858 if (cap->queue_release) 1859 __ceph_queue_cap_release(session, cap); 1860 else 1861 old_cap = cap; /* put_cap it w/o locks held */ 1862 } 1863 if (ret < 0) 1864 goto out; 1865 } 1866 ret = 0; 1867 out: 1868 session->s_cap_iterator = NULL; 1869 spin_unlock(&session->s_cap_lock); 1870 1871 iput(last_inode); 1872 if (old_cap) 1873 ceph_put_cap(session->s_mdsc, old_cap); 1874 1875 return ret; 1876 } 1877 1878 static int remove_session_caps_cb(struct inode *inode, int mds, void *arg) 1879 { 1880 struct ceph_inode_info *ci = ceph_inode(inode); 1881 struct ceph_client *cl = ceph_inode_to_client(inode); 1882 bool invalidate = false; 1883 struct ceph_cap *cap; 1884 int iputs = 0; 1885 1886 spin_lock(&ci->i_ceph_lock); 1887 cap = __get_cap_for_mds(ci, mds); 1888 if (cap) { 1889 doutc(cl, " 
removing cap %p, ci is %p, inode is %p\n", 1890 cap, ci, &ci->netfs.inode); 1891 1892 iputs = ceph_purge_inode_cap(inode, cap, &invalidate); 1893 } 1894 spin_unlock(&ci->i_ceph_lock); 1895 1896 if (cap) 1897 wake_up_all(&ci->i_cap_wq); 1898 if (invalidate) 1899 ceph_queue_invalidate(inode); 1900 while (iputs--) 1901 iput(inode); 1902 return 0; 1903 } 1904 1905 /* 1906 * caller must hold session s_mutex 1907 */ 1908 static void remove_session_caps(struct ceph_mds_session *session) 1909 { 1910 struct ceph_fs_client *fsc = session->s_mdsc->fsc; 1911 struct super_block *sb = fsc->sb; 1912 LIST_HEAD(dispose); 1913 1914 doutc(fsc->client, "on %p\n", session); 1915 ceph_iterate_session_caps(session, remove_session_caps_cb, fsc); 1916 1917 wake_up_all(&fsc->mdsc->cap_flushing_wq); 1918 1919 spin_lock(&session->s_cap_lock); 1920 if (session->s_nr_caps > 0) { 1921 struct inode *inode; 1922 struct ceph_cap *cap, *prev = NULL; 1923 struct ceph_vino vino; 1924 /* 1925 * iterate_session_caps() skips inodes that are being 1926 * deleted, we need to wait until deletions are complete. 1927 * __wait_on_freeing_inode() is designed for the job, 1928 * but it is not exported, so use lookup inode function 1929 * to access it. 1930 */ 1931 while (!list_empty(&session->s_caps)) { 1932 cap = list_entry(session->s_caps.next, 1933 struct ceph_cap, session_caps); 1934 if (cap == prev) 1935 break; 1936 prev = cap; 1937 vino = cap->ci->i_vino; 1938 spin_unlock(&session->s_cap_lock); 1939 1940 inode = ceph_find_inode(sb, vino); 1941 iput(inode); 1942 1943 spin_lock(&session->s_cap_lock); 1944 } 1945 } 1946 1947 // drop cap expires and unlock s_cap_lock 1948 detach_cap_releases(session, &dispose); 1949 1950 BUG_ON(session->s_nr_caps > 0); 1951 BUG_ON(!list_empty(&session->s_cap_flushing)); 1952 spin_unlock(&session->s_cap_lock); 1953 dispose_cap_releases(session->s_mdsc, &dispose); 1954 } 1955 1956 enum { 1957 RECONNECT, 1958 RENEWCAPS, 1959 FORCE_RO, 1960 }; 1961 1962 /* 1963 * wake up any threads waiting on this session's caps. if the cap is 1964 * old (didn't get renewed on the client reconnect), remove it now. 1965 * 1966 * caller must hold s_mutex. 1967 */ 1968 static int wake_up_session_cb(struct inode *inode, int mds, void *arg) 1969 { 1970 struct ceph_inode_info *ci = ceph_inode(inode); 1971 unsigned long ev = (unsigned long)arg; 1972 1973 if (ev == RECONNECT) { 1974 spin_lock(&ci->i_ceph_lock); 1975 ci->i_wanted_max_size = 0; 1976 ci->i_requested_max_size = 0; 1977 spin_unlock(&ci->i_ceph_lock); 1978 } else if (ev == RENEWCAPS) { 1979 struct ceph_cap *cap; 1980 1981 spin_lock(&ci->i_ceph_lock); 1982 cap = __get_cap_for_mds(ci, mds); 1983 /* mds did not re-issue stale cap */ 1984 if (cap && cap->cap_gen < atomic_read(&cap->session->s_cap_gen)) 1985 cap->issued = cap->implemented = CEPH_CAP_PIN; 1986 spin_unlock(&ci->i_ceph_lock); 1987 } else if (ev == FORCE_RO) { 1988 } 1989 wake_up_all(&ci->i_cap_wq); 1990 return 0; 1991 } 1992 1993 static void wake_up_session_caps(struct ceph_mds_session *session, int ev) 1994 { 1995 struct ceph_client *cl = session->s_mdsc->fsc->client; 1996 1997 doutc(cl, "session %p mds%d\n", session, session->s_mds); 1998 ceph_iterate_session_caps(session, wake_up_session_cb, 1999 (void *)(unsigned long)ev); 2000 } 2001 2002 /* 2003 * Send periodic message to MDS renewing all currently held caps. The 2004 * ack will reset the expiration for all caps from this session. 
2005 * 2006 * caller holds s_mutex 2007 */ 2008 static int send_renew_caps(struct ceph_mds_client *mdsc, 2009 struct ceph_mds_session *session) 2010 { 2011 struct ceph_client *cl = mdsc->fsc->client; 2012 struct ceph_msg *msg; 2013 int state; 2014 2015 if (time_after_eq(jiffies, session->s_cap_ttl) && 2016 time_after_eq(session->s_cap_ttl, session->s_renew_requested)) 2017 pr_info_client(cl, "mds%d caps stale\n", session->s_mds); 2018 session->s_renew_requested = jiffies; 2019 2020 /* do not try to renew caps until a recovering mds has reconnected 2021 * with its clients. */ 2022 state = ceph_mdsmap_get_state(mdsc->mdsmap, session->s_mds); 2023 if (state < CEPH_MDS_STATE_RECONNECT) { 2024 doutc(cl, "ignoring mds%d (%s)\n", session->s_mds, 2025 ceph_mds_state_name(state)); 2026 return 0; 2027 } 2028 2029 doutc(cl, "to mds%d (%s)\n", session->s_mds, 2030 ceph_mds_state_name(state)); 2031 msg = ceph_create_session_msg(CEPH_SESSION_REQUEST_RENEWCAPS, 2032 ++session->s_renew_seq); 2033 if (!msg) 2034 return -ENOMEM; 2035 ceph_con_send(&session->s_con, msg); 2036 return 0; 2037 } 2038 2039 static int send_flushmsg_ack(struct ceph_mds_client *mdsc, 2040 struct ceph_mds_session *session, u64 seq) 2041 { 2042 struct ceph_client *cl = mdsc->fsc->client; 2043 struct ceph_msg *msg; 2044 2045 doutc(cl, "to mds%d (%s)s seq %lld\n", session->s_mds, 2046 ceph_session_state_name(session->s_state), seq); 2047 msg = ceph_create_session_msg(CEPH_SESSION_FLUSHMSG_ACK, seq); 2048 if (!msg) 2049 return -ENOMEM; 2050 ceph_con_send(&session->s_con, msg); 2051 return 0; 2052 } 2053 2054 2055 /* 2056 * Note new cap ttl, and any transition from stale -> not stale (fresh?). 2057 * 2058 * Called under session->s_mutex 2059 */ 2060 static void renewed_caps(struct ceph_mds_client *mdsc, 2061 struct ceph_mds_session *session, int is_renew) 2062 { 2063 struct ceph_client *cl = mdsc->fsc->client; 2064 int was_stale; 2065 int wake = 0; 2066 2067 spin_lock(&session->s_cap_lock); 2068 was_stale = is_renew && time_after_eq(jiffies, session->s_cap_ttl); 2069 2070 session->s_cap_ttl = session->s_renew_requested + 2071 mdsc->mdsmap->m_session_timeout*HZ; 2072 2073 if (was_stale) { 2074 if (time_before(jiffies, session->s_cap_ttl)) { 2075 pr_info_client(cl, "mds%d caps renewed\n", 2076 session->s_mds); 2077 wake = 1; 2078 } else { 2079 pr_info_client(cl, "mds%d caps still stale\n", 2080 session->s_mds); 2081 } 2082 } 2083 doutc(cl, "mds%d ttl now %lu, was %s, now %s\n", session->s_mds, 2084 session->s_cap_ttl, was_stale ? "stale" : "fresh", 2085 time_before(jiffies, session->s_cap_ttl) ? "stale" : "fresh"); 2086 spin_unlock(&session->s_cap_lock); 2087 2088 if (wake) 2089 wake_up_session_caps(session, RENEWCAPS); 2090 } 2091 2092 /* 2093 * send a session close request 2094 */ 2095 static int request_close_session(struct ceph_mds_session *session) 2096 { 2097 struct ceph_client *cl = session->s_mdsc->fsc->client; 2098 struct ceph_msg *msg; 2099 2100 doutc(cl, "mds%d state %s seq %lld\n", session->s_mds, 2101 ceph_session_state_name(session->s_state), session->s_seq); 2102 msg = ceph_create_session_msg(CEPH_SESSION_REQUEST_CLOSE, 2103 session->s_seq); 2104 if (!msg) 2105 return -ENOMEM; 2106 ceph_con_send(&session->s_con, msg); 2107 return 1; 2108 } 2109 2110 /* 2111 * Called with s_mutex held. 
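 * Returns 0 if the session is already closing (or closed); otherwise it
 * marks the session CLOSING and returns the result of
 * request_close_session() (1 once the CLOSE message has been queued on
 * the connection, or -ENOMEM on allocation failure).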
2112 */ 2113 static int __close_session(struct ceph_mds_client *mdsc, 2114 struct ceph_mds_session *session) 2115 { 2116 if (session->s_state >= CEPH_MDS_SESSION_CLOSING) 2117 return 0; 2118 session->s_state = CEPH_MDS_SESSION_CLOSING; 2119 return request_close_session(session); 2120 } 2121 2122 static bool drop_negative_children(struct dentry *dentry) 2123 { 2124 struct dentry *child; 2125 bool all_negative = true; 2126 2127 if (!d_is_dir(dentry)) 2128 goto out; 2129 2130 spin_lock(&dentry->d_lock); 2131 list_for_each_entry(child, &dentry->d_subdirs, d_child) { 2132 if (d_really_is_positive(child)) { 2133 all_negative = false; 2134 break; 2135 } 2136 } 2137 spin_unlock(&dentry->d_lock); 2138 2139 if (all_negative) 2140 shrink_dcache_parent(dentry); 2141 out: 2142 return all_negative; 2143 } 2144 2145 /* 2146 * Trim old(er) caps. 2147 * 2148 * Because we can't cache an inode without one or more caps, we do 2149 * this indirectly: if a cap is unused, we prune its aliases, at which 2150 * point the inode will hopefully get dropped to. 2151 * 2152 * Yes, this is a bit sloppy. Our only real goal here is to respond to 2153 * memory pressure from the MDS, though, so it needn't be perfect. 2154 */ 2155 static int trim_caps_cb(struct inode *inode, int mds, void *arg) 2156 { 2157 struct ceph_mds_client *mdsc = ceph_sb_to_mdsc(inode->i_sb); 2158 struct ceph_client *cl = mdsc->fsc->client; 2159 int *remaining = arg; 2160 struct ceph_inode_info *ci = ceph_inode(inode); 2161 int used, wanted, oissued, mine; 2162 struct ceph_cap *cap; 2163 2164 if (*remaining <= 0) 2165 return -1; 2166 2167 spin_lock(&ci->i_ceph_lock); 2168 cap = __get_cap_for_mds(ci, mds); 2169 if (!cap) { 2170 spin_unlock(&ci->i_ceph_lock); 2171 return 0; 2172 } 2173 mine = cap->issued | cap->implemented; 2174 used = __ceph_caps_used(ci); 2175 wanted = __ceph_caps_file_wanted(ci); 2176 oissued = __ceph_caps_issued_other(ci, cap); 2177 2178 doutc(cl, "%p %llx.%llx cap %p mine %s oissued %s used %s wanted %s\n", 2179 inode, ceph_vinop(inode), cap, ceph_cap_string(mine), 2180 ceph_cap_string(oissued), ceph_cap_string(used), 2181 ceph_cap_string(wanted)); 2182 if (cap == ci->i_auth_cap) { 2183 if (ci->i_dirty_caps || ci->i_flushing_caps || 2184 !list_empty(&ci->i_cap_snaps)) 2185 goto out; 2186 if ((used | wanted) & CEPH_CAP_ANY_WR) 2187 goto out; 2188 /* Note: it's possible that i_filelock_ref becomes non-zero 2189 * after dropping auth caps. It doesn't hurt because reply 2190 * of lock mds request will re-add auth caps. */ 2191 if (atomic_read(&ci->i_filelock_ref) > 0) 2192 goto out; 2193 } 2194 /* The inode has cached pages, but it's no longer used. 2195 * we can safely drop it */ 2196 if (S_ISREG(inode->i_mode) && 2197 wanted == 0 && used == CEPH_CAP_FILE_CACHE && 2198 !(oissued & CEPH_CAP_FILE_CACHE)) { 2199 used = 0; 2200 oissued = 0; 2201 } 2202 if ((used | wanted) & ~oissued & mine) 2203 goto out; /* we need these caps */ 2204 2205 if (oissued) { 2206 /* we aren't the only cap.. 
just remove us */ 2207 ceph_remove_cap(mdsc, cap, true); 2208 (*remaining)--; 2209 } else { 2210 struct dentry *dentry; 2211 /* try dropping referring dentries */ 2212 spin_unlock(&ci->i_ceph_lock); 2213 dentry = d_find_any_alias(inode); 2214 if (dentry && drop_negative_children(dentry)) { 2215 int count; 2216 dput(dentry); 2217 d_prune_aliases(inode); 2218 count = atomic_read(&inode->i_count); 2219 if (count == 1) 2220 (*remaining)--; 2221 doutc(cl, "%p %llx.%llx cap %p pruned, count now %d\n", 2222 inode, ceph_vinop(inode), cap, count); 2223 } else { 2224 dput(dentry); 2225 } 2226 return 0; 2227 } 2228 2229 out: 2230 spin_unlock(&ci->i_ceph_lock); 2231 return 0; 2232 } 2233 2234 /* 2235 * Trim session cap count down to some max number. 2236 */ 2237 int ceph_trim_caps(struct ceph_mds_client *mdsc, 2238 struct ceph_mds_session *session, 2239 int max_caps) 2240 { 2241 struct ceph_client *cl = mdsc->fsc->client; 2242 int trim_caps = session->s_nr_caps - max_caps; 2243 2244 doutc(cl, "mds%d start: %d / %d, trim %d\n", session->s_mds, 2245 session->s_nr_caps, max_caps, trim_caps); 2246 if (trim_caps > 0) { 2247 int remaining = trim_caps; 2248 2249 ceph_iterate_session_caps(session, trim_caps_cb, &remaining); 2250 doutc(cl, "mds%d done: %d / %d, trimmed %d\n", 2251 session->s_mds, session->s_nr_caps, max_caps, 2252 trim_caps - remaining); 2253 } 2254 2255 ceph_flush_cap_releases(mdsc, session); 2256 return 0; 2257 } 2258 2259 static int check_caps_flush(struct ceph_mds_client *mdsc, 2260 u64 want_flush_tid) 2261 { 2262 struct ceph_client *cl = mdsc->fsc->client; 2263 int ret = 1; 2264 2265 spin_lock(&mdsc->cap_dirty_lock); 2266 if (!list_empty(&mdsc->cap_flush_list)) { 2267 struct ceph_cap_flush *cf = 2268 list_first_entry(&mdsc->cap_flush_list, 2269 struct ceph_cap_flush, g_list); 2270 if (cf->tid <= want_flush_tid) { 2271 doutc(cl, "still flushing tid %llu <= %llu\n", 2272 cf->tid, want_flush_tid); 2273 ret = 0; 2274 } 2275 } 2276 spin_unlock(&mdsc->cap_dirty_lock); 2277 return ret; 2278 } 2279 2280 /* 2281 * flush all dirty inode data to disk. 
2282 * 2283 * returns true if we've flushed through want_flush_tid 2284 */ 2285 static void wait_caps_flush(struct ceph_mds_client *mdsc, 2286 u64 want_flush_tid) 2287 { 2288 struct ceph_client *cl = mdsc->fsc->client; 2289 2290 doutc(cl, "want %llu\n", want_flush_tid); 2291 2292 wait_event(mdsc->cap_flushing_wq, 2293 check_caps_flush(mdsc, want_flush_tid)); 2294 2295 doutc(cl, "ok, flushed thru %llu\n", want_flush_tid); 2296 } 2297 2298 /* 2299 * called under s_mutex 2300 */ 2301 static void ceph_send_cap_releases(struct ceph_mds_client *mdsc, 2302 struct ceph_mds_session *session) 2303 { 2304 struct ceph_client *cl = mdsc->fsc->client; 2305 struct ceph_msg *msg = NULL; 2306 struct ceph_mds_cap_release *head; 2307 struct ceph_mds_cap_item *item; 2308 struct ceph_osd_client *osdc = &mdsc->fsc->client->osdc; 2309 struct ceph_cap *cap; 2310 LIST_HEAD(tmp_list); 2311 int num_cap_releases; 2312 __le32 barrier, *cap_barrier; 2313 2314 down_read(&osdc->lock); 2315 barrier = cpu_to_le32(osdc->epoch_barrier); 2316 up_read(&osdc->lock); 2317 2318 spin_lock(&session->s_cap_lock); 2319 again: 2320 list_splice_init(&session->s_cap_releases, &tmp_list); 2321 num_cap_releases = session->s_num_cap_releases; 2322 session->s_num_cap_releases = 0; 2323 spin_unlock(&session->s_cap_lock); 2324 2325 while (!list_empty(&tmp_list)) { 2326 if (!msg) { 2327 msg = ceph_msg_new(CEPH_MSG_CLIENT_CAPRELEASE, 2328 PAGE_SIZE, GFP_NOFS, false); 2329 if (!msg) 2330 goto out_err; 2331 head = msg->front.iov_base; 2332 head->num = cpu_to_le32(0); 2333 msg->front.iov_len = sizeof(*head); 2334 2335 msg->hdr.version = cpu_to_le16(2); 2336 msg->hdr.compat_version = cpu_to_le16(1); 2337 } 2338 2339 cap = list_first_entry(&tmp_list, struct ceph_cap, 2340 session_caps); 2341 list_del(&cap->session_caps); 2342 num_cap_releases--; 2343 2344 head = msg->front.iov_base; 2345 put_unaligned_le32(get_unaligned_le32(&head->num) + 1, 2346 &head->num); 2347 item = msg->front.iov_base + msg->front.iov_len; 2348 item->ino = cpu_to_le64(cap->cap_ino); 2349 item->cap_id = cpu_to_le64(cap->cap_id); 2350 item->migrate_seq = cpu_to_le32(cap->mseq); 2351 item->seq = cpu_to_le32(cap->issue_seq); 2352 msg->front.iov_len += sizeof(*item); 2353 2354 ceph_put_cap(mdsc, cap); 2355 2356 if (le32_to_cpu(head->num) == CEPH_CAPS_PER_RELEASE) { 2357 // Append cap_barrier field 2358 cap_barrier = msg->front.iov_base + msg->front.iov_len; 2359 *cap_barrier = barrier; 2360 msg->front.iov_len += sizeof(*cap_barrier); 2361 2362 msg->hdr.front_len = cpu_to_le32(msg->front.iov_len); 2363 doutc(cl, "mds%d %p\n", session->s_mds, msg); 2364 ceph_con_send(&session->s_con, msg); 2365 msg = NULL; 2366 } 2367 } 2368 2369 BUG_ON(num_cap_releases != 0); 2370 2371 spin_lock(&session->s_cap_lock); 2372 if (!list_empty(&session->s_cap_releases)) 2373 goto again; 2374 spin_unlock(&session->s_cap_lock); 2375 2376 if (msg) { 2377 // Append cap_barrier field 2378 cap_barrier = msg->front.iov_base + msg->front.iov_len; 2379 *cap_barrier = barrier; 2380 msg->front.iov_len += sizeof(*cap_barrier); 2381 2382 msg->hdr.front_len = cpu_to_le32(msg->front.iov_len); 2383 doutc(cl, "mds%d %p\n", session->s_mds, msg); 2384 ceph_con_send(&session->s_con, msg); 2385 } 2386 return; 2387 out_err: 2388 pr_err_client(cl, "mds%d, failed to allocate message\n", 2389 session->s_mds); 2390 spin_lock(&session->s_cap_lock); 2391 list_splice(&tmp_list, &session->s_cap_releases); 2392 session->s_num_cap_releases += num_cap_releases; 2393 spin_unlock(&session->s_cap_lock); 2394 } 2395 2396 static void 
ceph_cap_release_work(struct work_struct *work) 2397 { 2398 struct ceph_mds_session *session = 2399 container_of(work, struct ceph_mds_session, s_cap_release_work); 2400 2401 mutex_lock(&session->s_mutex); 2402 if (session->s_state == CEPH_MDS_SESSION_OPEN || 2403 session->s_state == CEPH_MDS_SESSION_HUNG) 2404 ceph_send_cap_releases(session->s_mdsc, session); 2405 mutex_unlock(&session->s_mutex); 2406 ceph_put_mds_session(session); 2407 } 2408 2409 void ceph_flush_cap_releases(struct ceph_mds_client *mdsc, 2410 struct ceph_mds_session *session) 2411 { 2412 struct ceph_client *cl = mdsc->fsc->client; 2413 if (mdsc->stopping) 2414 return; 2415 2416 ceph_get_mds_session(session); 2417 if (queue_work(mdsc->fsc->cap_wq, 2418 &session->s_cap_release_work)) { 2419 doutc(cl, "cap release work queued\n"); 2420 } else { 2421 ceph_put_mds_session(session); 2422 doutc(cl, "failed to queue cap release work\n"); 2423 } 2424 } 2425 2426 /* 2427 * caller holds session->s_cap_lock 2428 */ 2429 void __ceph_queue_cap_release(struct ceph_mds_session *session, 2430 struct ceph_cap *cap) 2431 { 2432 list_add_tail(&cap->session_caps, &session->s_cap_releases); 2433 session->s_num_cap_releases++; 2434 2435 if (!(session->s_num_cap_releases % CEPH_CAPS_PER_RELEASE)) 2436 ceph_flush_cap_releases(session->s_mdsc, session); 2437 } 2438 2439 static void ceph_cap_reclaim_work(struct work_struct *work) 2440 { 2441 struct ceph_mds_client *mdsc = 2442 container_of(work, struct ceph_mds_client, cap_reclaim_work); 2443 int ret = ceph_trim_dentries(mdsc); 2444 if (ret == -EAGAIN) 2445 ceph_queue_cap_reclaim_work(mdsc); 2446 } 2447 2448 void ceph_queue_cap_reclaim_work(struct ceph_mds_client *mdsc) 2449 { 2450 struct ceph_client *cl = mdsc->fsc->client; 2451 if (mdsc->stopping) 2452 return; 2453 2454 if (queue_work(mdsc->fsc->cap_wq, &mdsc->cap_reclaim_work)) { 2455 doutc(cl, "caps reclaim work queued\n"); 2456 } else { 2457 doutc(cl, "failed to queue caps release work\n"); 2458 } 2459 } 2460 2461 void ceph_reclaim_caps_nr(struct ceph_mds_client *mdsc, int nr) 2462 { 2463 int val; 2464 if (!nr) 2465 return; 2466 val = atomic_add_return(nr, &mdsc->cap_reclaim_pending); 2467 if ((val % CEPH_CAPS_PER_RELEASE) < nr) { 2468 atomic_set(&mdsc->cap_reclaim_pending, 0); 2469 ceph_queue_cap_reclaim_work(mdsc); 2470 } 2471 } 2472 2473 /* 2474 * requests 2475 */ 2476 2477 int ceph_alloc_readdir_reply_buffer(struct ceph_mds_request *req, 2478 struct inode *dir) 2479 { 2480 struct ceph_inode_info *ci = ceph_inode(dir); 2481 struct ceph_mds_reply_info_parsed *rinfo = &req->r_reply_info; 2482 struct ceph_mount_options *opt = req->r_mdsc->fsc->mount_options; 2483 size_t size = sizeof(struct ceph_mds_reply_dir_entry); 2484 unsigned int num_entries; 2485 int order; 2486 2487 spin_lock(&ci->i_ceph_lock); 2488 num_entries = ci->i_files + ci->i_subdirs; 2489 spin_unlock(&ci->i_ceph_lock); 2490 num_entries = max(num_entries, 1U); 2491 num_entries = min(num_entries, opt->max_readdir); 2492 2493 order = get_order(size * num_entries); 2494 while (order >= 0) { 2495 rinfo->dir_entries = (void*)__get_free_pages(GFP_KERNEL | 2496 __GFP_NOWARN | 2497 __GFP_ZERO, 2498 order); 2499 if (rinfo->dir_entries) 2500 break; 2501 order--; 2502 } 2503 if (!rinfo->dir_entries) 2504 return -ENOMEM; 2505 2506 num_entries = (PAGE_SIZE << order) / size; 2507 num_entries = min(num_entries, opt->max_readdir); 2508 2509 rinfo->dir_buf_size = PAGE_SIZE << order; 2510 req->r_num_caps = num_entries + 1; 2511 req->r_args.readdir.max_entries = cpu_to_le32(num_entries); 2512 
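/*
 * In addition to the entry-count cap computed above, pass along the byte
 * budget from the mount options so the MDS can also bound the reply by
 * size (presumably whichever limit is reached first applies).
 */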
req->r_args.readdir.max_bytes = cpu_to_le32(opt->max_readdir_bytes); 2513 return 0; 2514 } 2515 2516 /* 2517 * Create an mds request. 2518 */ 2519 struct ceph_mds_request * 2520 ceph_mdsc_create_request(struct ceph_mds_client *mdsc, int op, int mode) 2521 { 2522 struct ceph_mds_request *req; 2523 2524 req = kmem_cache_zalloc(ceph_mds_request_cachep, GFP_NOFS); 2525 if (!req) 2526 return ERR_PTR(-ENOMEM); 2527 2528 mutex_init(&req->r_fill_mutex); 2529 req->r_mdsc = mdsc; 2530 req->r_started = jiffies; 2531 req->r_start_latency = ktime_get(); 2532 req->r_resend_mds = -1; 2533 INIT_LIST_HEAD(&req->r_unsafe_dir_item); 2534 INIT_LIST_HEAD(&req->r_unsafe_target_item); 2535 req->r_fmode = -1; 2536 req->r_feature_needed = -1; 2537 kref_init(&req->r_kref); 2538 RB_CLEAR_NODE(&req->r_node); 2539 INIT_LIST_HEAD(&req->r_wait); 2540 init_completion(&req->r_completion); 2541 init_completion(&req->r_safe_completion); 2542 INIT_LIST_HEAD(&req->r_unsafe_item); 2543 2544 ktime_get_coarse_real_ts64(&req->r_stamp); 2545 2546 req->r_op = op; 2547 req->r_direct_mode = mode; 2548 return req; 2549 } 2550 2551 /* 2552 * return oldest (lowest) request, tid in request tree, 0 if none. 2553 * 2554 * called under mdsc->mutex. 2555 */ 2556 static struct ceph_mds_request *__get_oldest_req(struct ceph_mds_client *mdsc) 2557 { 2558 if (RB_EMPTY_ROOT(&mdsc->request_tree)) 2559 return NULL; 2560 return rb_entry(rb_first(&mdsc->request_tree), 2561 struct ceph_mds_request, r_node); 2562 } 2563 2564 static inline u64 __get_oldest_tid(struct ceph_mds_client *mdsc) 2565 { 2566 return mdsc->oldest_tid; 2567 } 2568 2569 #if IS_ENABLED(CONFIG_FS_ENCRYPTION) 2570 static u8 *get_fscrypt_altname(const struct ceph_mds_request *req, u32 *plen) 2571 { 2572 struct inode *dir = req->r_parent; 2573 struct dentry *dentry = req->r_dentry; 2574 u8 *cryptbuf = NULL; 2575 u32 len = 0; 2576 int ret = 0; 2577 2578 /* only encode if we have parent and dentry */ 2579 if (!dir || !dentry) 2580 goto success; 2581 2582 /* No-op unless this is encrypted */ 2583 if (!IS_ENCRYPTED(dir)) 2584 goto success; 2585 2586 ret = ceph_fscrypt_prepare_readdir(dir); 2587 if (ret < 0) 2588 return ERR_PTR(ret); 2589 2590 /* No key? Just ignore it. */ 2591 if (!fscrypt_has_encryption_key(dir)) 2592 goto success; 2593 2594 if (!fscrypt_fname_encrypted_size(dir, dentry->d_name.len, NAME_MAX, 2595 &len)) { 2596 WARN_ON_ONCE(1); 2597 return ERR_PTR(-ENAMETOOLONG); 2598 } 2599 2600 /* No need to append altname if name is short enough */ 2601 if (len <= CEPH_NOHASH_NAME_MAX) { 2602 len = 0; 2603 goto success; 2604 } 2605 2606 cryptbuf = kmalloc(len, GFP_KERNEL); 2607 if (!cryptbuf) 2608 return ERR_PTR(-ENOMEM); 2609 2610 ret = fscrypt_fname_encrypt(dir, &dentry->d_name, cryptbuf, len); 2611 if (ret) { 2612 kfree(cryptbuf); 2613 return ERR_PTR(ret); 2614 } 2615 success: 2616 *plen = len; 2617 return cryptbuf; 2618 } 2619 #else 2620 static u8 *get_fscrypt_altname(const struct ceph_mds_request *req, u32 *plen) 2621 { 2622 *plen = 0; 2623 return NULL; 2624 } 2625 #endif 2626 2627 /** 2628 * ceph_mdsc_build_path - build a path string to a given dentry 2629 * @mdsc: mds client 2630 * @dentry: dentry to which path should be built 2631 * @plen: returned length of string 2632 * @pbase: returned base inode number 2633 * @for_wire: is this path going to be sent to the MDS? 2634 * 2635 * Build a string that represents the path to the dentry. 
This is mostly called 2636 * for two different purposes: 2637 * 2638 * 1) we need to build a path string to send to the MDS (for_wire == true) 2639 * 2) we need a path string for local presentation (e.g. debugfs) 2640 * (for_wire == false) 2641 * 2642 * The path is built in reverse, starting with the dentry. Walk back up toward 2643 * the root, building the path until the first non-snapped inode is reached 2644 * (for_wire) or the root inode is reached (!for_wire). 2645 * 2646 * Encode hidden .snap dirs as a double /, i.e. 2647 * foo/.snap/bar -> foo//bar 2648 */ 2649 char *ceph_mdsc_build_path(struct ceph_mds_client *mdsc, struct dentry *dentry, 2650 int *plen, u64 *pbase, int for_wire) 2651 { 2652 struct ceph_client *cl = mdsc->fsc->client; 2653 struct dentry *cur; 2654 struct inode *inode; 2655 char *path; 2656 int pos; 2657 unsigned seq; 2658 u64 base; 2659 2660 if (!dentry) 2661 return ERR_PTR(-EINVAL); 2662 2663 path = __getname(); 2664 if (!path) 2665 return ERR_PTR(-ENOMEM); 2666 retry: 2667 pos = PATH_MAX - 1; 2668 path[pos] = '\0'; 2669 2670 seq = read_seqbegin(&rename_lock); 2671 cur = dget(dentry); 2672 for (;;) { 2673 struct dentry *parent; 2674 2675 spin_lock(&cur->d_lock); 2676 inode = d_inode(cur); 2677 if (inode && ceph_snap(inode) == CEPH_SNAPDIR) { 2678 doutc(cl, "path+%d: %p SNAPDIR\n", pos, cur); 2679 spin_unlock(&cur->d_lock); 2680 parent = dget_parent(cur); 2681 } else if (for_wire && inode && dentry != cur && 2682 ceph_snap(inode) == CEPH_NOSNAP) { 2683 spin_unlock(&cur->d_lock); 2684 pos++; /* get rid of any prepended '/' */ 2685 break; 2686 } else if (!for_wire || !IS_ENCRYPTED(d_inode(cur->d_parent))) { 2687 pos -= cur->d_name.len; 2688 if (pos < 0) { 2689 spin_unlock(&cur->d_lock); 2690 break; 2691 } 2692 memcpy(path + pos, cur->d_name.name, cur->d_name.len); 2693 spin_unlock(&cur->d_lock); 2694 parent = dget_parent(cur); 2695 } else { 2696 int len, ret; 2697 char buf[NAME_MAX]; 2698 2699 /* 2700 * Proactively copy name into buf, in case we need to 2701 * present it as-is. 2702 */ 2703 memcpy(buf, cur->d_name.name, cur->d_name.len); 2704 len = cur->d_name.len; 2705 spin_unlock(&cur->d_lock); 2706 parent = dget_parent(cur); 2707 2708 ret = ceph_fscrypt_prepare_readdir(d_inode(parent)); 2709 if (ret < 0) { 2710 dput(parent); 2711 dput(cur); 2712 return ERR_PTR(ret); 2713 } 2714 2715 if (fscrypt_has_encryption_key(d_inode(parent))) { 2716 len = ceph_encode_encrypted_fname(d_inode(parent), 2717 cur, buf); 2718 if (len < 0) { 2719 dput(parent); 2720 dput(cur); 2721 return ERR_PTR(len); 2722 } 2723 } 2724 pos -= len; 2725 if (pos < 0) { 2726 dput(parent); 2727 break; 2728 } 2729 memcpy(path + pos, buf, len); 2730 } 2731 dput(cur); 2732 cur = parent; 2733 2734 /* Are we at the root? */ 2735 if (IS_ROOT(cur)) 2736 break; 2737 2738 /* Are we out of buffer? */ 2739 if (--pos < 0) 2740 break; 2741 2742 path[pos] = '/'; 2743 } 2744 inode = d_inode(cur); 2745 base = inode ? ceph_ino(inode) : 0; 2746 dput(cur); 2747 2748 if (read_seqretry(&rename_lock, seq)) 2749 goto retry; 2750 2751 if (pos < 0) { 2752 /* 2753 * A rename didn't occur, but somehow we didn't end up where 2754 * we thought we would. Throw a warning and try again. 
2755 */ 2756 pr_warn_client(cl, "did not end path lookup where expected (pos = %d)\n", 2757 pos); 2758 goto retry; 2759 } 2760 2761 *pbase = base; 2762 *plen = PATH_MAX - 1 - pos; 2763 doutc(cl, "on %p %d built %llx '%.*s'\n", dentry, d_count(dentry), 2764 base, *plen, path + pos); 2765 return path + pos; 2766 } 2767 2768 static int build_dentry_path(struct ceph_mds_client *mdsc, struct dentry *dentry, 2769 struct inode *dir, const char **ppath, int *ppathlen, 2770 u64 *pino, bool *pfreepath, bool parent_locked) 2771 { 2772 char *path; 2773 2774 rcu_read_lock(); 2775 if (!dir) 2776 dir = d_inode_rcu(dentry->d_parent); 2777 if (dir && parent_locked && ceph_snap(dir) == CEPH_NOSNAP && 2778 !IS_ENCRYPTED(dir)) { 2779 *pino = ceph_ino(dir); 2780 rcu_read_unlock(); 2781 *ppath = dentry->d_name.name; 2782 *ppathlen = dentry->d_name.len; 2783 return 0; 2784 } 2785 rcu_read_unlock(); 2786 path = ceph_mdsc_build_path(mdsc, dentry, ppathlen, pino, 1); 2787 if (IS_ERR(path)) 2788 return PTR_ERR(path); 2789 *ppath = path; 2790 *pfreepath = true; 2791 return 0; 2792 } 2793 2794 static int build_inode_path(struct inode *inode, 2795 const char **ppath, int *ppathlen, u64 *pino, 2796 bool *pfreepath) 2797 { 2798 struct ceph_mds_client *mdsc = ceph_sb_to_mdsc(inode->i_sb); 2799 struct dentry *dentry; 2800 char *path; 2801 2802 if (ceph_snap(inode) == CEPH_NOSNAP) { 2803 *pino = ceph_ino(inode); 2804 *ppathlen = 0; 2805 return 0; 2806 } 2807 dentry = d_find_alias(inode); 2808 path = ceph_mdsc_build_path(mdsc, dentry, ppathlen, pino, 1); 2809 dput(dentry); 2810 if (IS_ERR(path)) 2811 return PTR_ERR(path); 2812 *ppath = path; 2813 *pfreepath = true; 2814 return 0; 2815 } 2816 2817 /* 2818 * request arguments may be specified via an inode *, a dentry *, or 2819 * an explicit ino+path. 2820 */ 2821 static int set_request_path_attr(struct ceph_mds_client *mdsc, struct inode *rinode, 2822 struct dentry *rdentry, struct inode *rdiri, 2823 const char *rpath, u64 rino, const char **ppath, 2824 int *pathlen, u64 *ino, bool *freepath, 2825 bool parent_locked) 2826 { 2827 struct ceph_client *cl = mdsc->fsc->client; 2828 int r = 0; 2829 2830 if (rinode) { 2831 r = build_inode_path(rinode, ppath, pathlen, ino, freepath); 2832 doutc(cl, " inode %p %llx.%llx\n", rinode, ceph_ino(rinode), 2833 ceph_snap(rinode)); 2834 } else if (rdentry) { 2835 r = build_dentry_path(mdsc, rdentry, rdiri, ppath, pathlen, ino, 2836 freepath, parent_locked); 2837 doutc(cl, " dentry %p %llx/%.*s\n", rdentry, *ino, *pathlen, *ppath); 2838 } else if (rpath || rino) { 2839 *ino = rino; 2840 *ppath = rpath; 2841 *pathlen = rpath ? 
strlen(rpath) : 0; 2842 doutc(cl, " path %.*s\n", *pathlen, rpath); 2843 } 2844 2845 return r; 2846 } 2847 2848 static void encode_mclientrequest_tail(void **p, 2849 const struct ceph_mds_request *req) 2850 { 2851 struct ceph_timespec ts; 2852 int i; 2853 2854 ceph_encode_timespec64(&ts, &req->r_stamp); 2855 ceph_encode_copy(p, &ts, sizeof(ts)); 2856 2857 /* v4: gid_list */ 2858 ceph_encode_32(p, req->r_cred->group_info->ngroups); 2859 for (i = 0; i < req->r_cred->group_info->ngroups; i++) 2860 ceph_encode_64(p, from_kgid(&init_user_ns, 2861 req->r_cred->group_info->gid[i])); 2862 2863 /* v5: altname */ 2864 ceph_encode_32(p, req->r_altname_len); 2865 ceph_encode_copy(p, req->r_altname, req->r_altname_len); 2866 2867 /* v6: fscrypt_auth and fscrypt_file */ 2868 if (req->r_fscrypt_auth) { 2869 u32 authlen = ceph_fscrypt_auth_len(req->r_fscrypt_auth); 2870 2871 ceph_encode_32(p, authlen); 2872 ceph_encode_copy(p, req->r_fscrypt_auth, authlen); 2873 } else { 2874 ceph_encode_32(p, 0); 2875 } 2876 if (test_bit(CEPH_MDS_R_FSCRYPT_FILE, &req->r_req_flags)) { 2877 ceph_encode_32(p, sizeof(__le64)); 2878 ceph_encode_64(p, req->r_fscrypt_file); 2879 } else { 2880 ceph_encode_32(p, 0); 2881 } 2882 } 2883 2884 static inline u16 mds_supported_head_version(struct ceph_mds_session *session) 2885 { 2886 if (!test_bit(CEPHFS_FEATURE_32BITS_RETRY_FWD, &session->s_features)) 2887 return 1; 2888 2889 if (!test_bit(CEPHFS_FEATURE_HAS_OWNER_UIDGID, &session->s_features)) 2890 return 2; 2891 2892 return CEPH_MDS_REQUEST_HEAD_VERSION; 2893 } 2894 2895 static struct ceph_mds_request_head_legacy * 2896 find_legacy_request_head(void *p, u64 features) 2897 { 2898 bool legacy = !(features & CEPH_FEATURE_FS_BTIME); 2899 struct ceph_mds_request_head_old *ohead; 2900 2901 if (legacy) 2902 return (struct ceph_mds_request_head_legacy *)p; 2903 ohead = (struct ceph_mds_request_head_old *)p; 2904 return (struct ceph_mds_request_head_legacy *)&ohead->oldest_client_tid; 2905 } 2906 2907 /* 2908 * called under mdsc->mutex 2909 */ 2910 static struct ceph_msg *create_request_message(struct ceph_mds_session *session, 2911 struct ceph_mds_request *req, 2912 bool drop_cap_releases) 2913 { 2914 int mds = session->s_mds; 2915 struct ceph_mds_client *mdsc = session->s_mdsc; 2916 struct ceph_client *cl = mdsc->fsc->client; 2917 struct ceph_msg *msg; 2918 struct ceph_mds_request_head_legacy *lhead; 2919 const char *path1 = NULL; 2920 const char *path2 = NULL; 2921 u64 ino1 = 0, ino2 = 0; 2922 int pathlen1 = 0, pathlen2 = 0; 2923 bool freepath1 = false, freepath2 = false; 2924 struct dentry *old_dentry = NULL; 2925 int len; 2926 u16 releases; 2927 void *p, *end; 2928 int ret; 2929 bool legacy = !(session->s_con.peer_features & CEPH_FEATURE_FS_BTIME); 2930 u16 request_head_version = mds_supported_head_version(session); 2931 kuid_t caller_fsuid = req->r_cred->fsuid; 2932 kgid_t caller_fsgid = req->r_cred->fsgid; 2933 2934 ret = set_request_path_attr(mdsc, req->r_inode, req->r_dentry, 2935 req->r_parent, req->r_path1, req->r_ino1.ino, 2936 &path1, &pathlen1, &ino1, &freepath1, 2937 test_bit(CEPH_MDS_R_PARENT_LOCKED, 2938 &req->r_req_flags)); 2939 if (ret < 0) { 2940 msg = ERR_PTR(ret); 2941 goto out; 2942 } 2943 2944 /* If r_old_dentry is set, then assume that its parent is locked */ 2945 if (req->r_old_dentry && 2946 !(req->r_old_dentry->d_flags & DCACHE_DISCONNECTED)) 2947 old_dentry = req->r_old_dentry; 2948 ret = set_request_path_attr(mdsc, NULL, old_dentry, 2949 req->r_old_dentry_dir, 2950 req->r_path2, req->r_ino2.ino, 2951 &path2, 
&pathlen2, &ino2, &freepath2, true); 2952 if (ret < 0) { 2953 msg = ERR_PTR(ret); 2954 goto out_free1; 2955 } 2956 2957 req->r_altname = get_fscrypt_altname(req, &req->r_altname_len); 2958 if (IS_ERR(req->r_altname)) { 2959 msg = ERR_CAST(req->r_altname); 2960 req->r_altname = NULL; 2961 goto out_free2; 2962 } 2963 2964 /* 2965 * For old cephs without supporting the 32bit retry/fwd feature 2966 * it will copy the raw memories directly when decoding the 2967 * requests. While new cephs will decode the head depending the 2968 * version member, so we need to make sure it will be compatible 2969 * with them both. 2970 */ 2971 if (legacy) 2972 len = sizeof(struct ceph_mds_request_head_legacy); 2973 else if (request_head_version == 1) 2974 len = sizeof(struct ceph_mds_request_head_old); 2975 else if (request_head_version == 2) 2976 len = offsetofend(struct ceph_mds_request_head, ext_num_fwd); 2977 else 2978 len = sizeof(struct ceph_mds_request_head); 2979 2980 /* filepaths */ 2981 len += 2 * (1 + sizeof(u32) + sizeof(u64)); 2982 len += pathlen1 + pathlen2; 2983 2984 /* cap releases */ 2985 len += sizeof(struct ceph_mds_request_release) * 2986 (!!req->r_inode_drop + !!req->r_dentry_drop + 2987 !!req->r_old_inode_drop + !!req->r_old_dentry_drop); 2988 2989 if (req->r_dentry_drop) 2990 len += pathlen1; 2991 if (req->r_old_dentry_drop) 2992 len += pathlen2; 2993 2994 /* MClientRequest tail */ 2995 2996 /* req->r_stamp */ 2997 len += sizeof(struct ceph_timespec); 2998 2999 /* gid list */ 3000 len += sizeof(u32) + (sizeof(u64) * req->r_cred->group_info->ngroups); 3001 3002 /* alternate name */ 3003 len += sizeof(u32) + req->r_altname_len; 3004 3005 /* fscrypt_auth */ 3006 len += sizeof(u32); // fscrypt_auth 3007 if (req->r_fscrypt_auth) 3008 len += ceph_fscrypt_auth_len(req->r_fscrypt_auth); 3009 3010 /* fscrypt_file */ 3011 len += sizeof(u32); 3012 if (test_bit(CEPH_MDS_R_FSCRYPT_FILE, &req->r_req_flags)) 3013 len += sizeof(__le64); 3014 3015 msg = ceph_msg_new2(CEPH_MSG_CLIENT_REQUEST, len, 1, GFP_NOFS, false); 3016 if (!msg) { 3017 msg = ERR_PTR(-ENOMEM); 3018 goto out_free2; 3019 } 3020 3021 msg->hdr.tid = cpu_to_le64(req->r_tid); 3022 3023 lhead = find_legacy_request_head(msg->front.iov_base, 3024 session->s_con.peer_features); 3025 3026 if ((req->r_mnt_idmap != &nop_mnt_idmap) && 3027 !test_bit(CEPHFS_FEATURE_HAS_OWNER_UIDGID, &session->s_features)) { 3028 WARN_ON_ONCE(!IS_CEPH_MDS_OP_NEWINODE(req->r_op)); 3029 3030 if (enable_unsafe_idmap) { 3031 pr_warn_once_client(cl, 3032 "idmapped mount is used and CEPHFS_FEATURE_HAS_OWNER_UIDGID" 3033 " is not supported by MDS. UID/GID-based restrictions may" 3034 " not work properly.\n"); 3035 3036 caller_fsuid = from_vfsuid(req->r_mnt_idmap, &init_user_ns, 3037 VFSUIDT_INIT(req->r_cred->fsuid)); 3038 caller_fsgid = from_vfsgid(req->r_mnt_idmap, &init_user_ns, 3039 VFSGIDT_INIT(req->r_cred->fsgid)); 3040 } else { 3041 pr_err_ratelimited_client(cl, 3042 "idmapped mount is used and CEPHFS_FEATURE_HAS_OWNER_UIDGID" 3043 " is not supported by MDS. Fail request with -EIO.\n"); 3044 3045 ret = -EIO; 3046 goto out_err; 3047 } 3048 } 3049 3050 /* 3051 * The ceph_mds_request_head_legacy didn't contain a version field, and 3052 * one was added when we moved the message version from 3->4. 
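 *
 * Below, the on-wire header is chosen to match the peer: legacy peers
 * (no CEPH_FEATURE_FS_BTIME) get message version 3 with the legacy head;
 * peers without CEPHFS_FEATURE_32BITS_RETRY_FWD get version 4 with the
 * old head; newer peers get version 6 with the extended head, truncated
 * just past ext_num_fwd when CEPHFS_FEATURE_HAS_OWNER_UIDGID is missing.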
3053 */ 3054 if (legacy) { 3055 msg->hdr.version = cpu_to_le16(3); 3056 p = msg->front.iov_base + sizeof(*lhead); 3057 } else if (request_head_version == 1) { 3058 struct ceph_mds_request_head_old *ohead = msg->front.iov_base; 3059 3060 msg->hdr.version = cpu_to_le16(4); 3061 ohead->version = cpu_to_le16(1); 3062 p = msg->front.iov_base + sizeof(*ohead); 3063 } else if (request_head_version == 2) { 3064 struct ceph_mds_request_head *nhead = msg->front.iov_base; 3065 3066 msg->hdr.version = cpu_to_le16(6); 3067 nhead->version = cpu_to_le16(2); 3068 3069 p = msg->front.iov_base + offsetofend(struct ceph_mds_request_head, ext_num_fwd); 3070 } else { 3071 struct ceph_mds_request_head *nhead = msg->front.iov_base; 3072 kuid_t owner_fsuid; 3073 kgid_t owner_fsgid; 3074 3075 msg->hdr.version = cpu_to_le16(6); 3076 nhead->version = cpu_to_le16(CEPH_MDS_REQUEST_HEAD_VERSION); 3077 nhead->struct_len = cpu_to_le32(sizeof(struct ceph_mds_request_head)); 3078 3079 if (IS_CEPH_MDS_OP_NEWINODE(req->r_op)) { 3080 owner_fsuid = from_vfsuid(req->r_mnt_idmap, &init_user_ns, 3081 VFSUIDT_INIT(req->r_cred->fsuid)); 3082 owner_fsgid = from_vfsgid(req->r_mnt_idmap, &init_user_ns, 3083 VFSGIDT_INIT(req->r_cred->fsgid)); 3084 nhead->owner_uid = cpu_to_le32(from_kuid(&init_user_ns, owner_fsuid)); 3085 nhead->owner_gid = cpu_to_le32(from_kgid(&init_user_ns, owner_fsgid)); 3086 } else { 3087 nhead->owner_uid = cpu_to_le32(-1); 3088 nhead->owner_gid = cpu_to_le32(-1); 3089 } 3090 3091 p = msg->front.iov_base + sizeof(*nhead); 3092 } 3093 3094 end = msg->front.iov_base + msg->front.iov_len; 3095 3096 lhead->mdsmap_epoch = cpu_to_le32(mdsc->mdsmap->m_epoch); 3097 lhead->op = cpu_to_le32(req->r_op); 3098 lhead->caller_uid = cpu_to_le32(from_kuid(&init_user_ns, 3099 caller_fsuid)); 3100 lhead->caller_gid = cpu_to_le32(from_kgid(&init_user_ns, 3101 caller_fsgid)); 3102 lhead->ino = cpu_to_le64(req->r_deleg_ino); 3103 lhead->args = req->r_args; 3104 3105 ceph_encode_filepath(&p, end, ino1, path1); 3106 ceph_encode_filepath(&p, end, ino2, path2); 3107 3108 /* make note of release offset, in case we need to replay */ 3109 req->r_request_release_offset = p - msg->front.iov_base; 3110 3111 /* cap releases */ 3112 releases = 0; 3113 if (req->r_inode_drop) 3114 releases += ceph_encode_inode_release(&p, 3115 req->r_inode ? 
req->r_inode : d_inode(req->r_dentry), 3116 mds, req->r_inode_drop, req->r_inode_unless, 3117 req->r_op == CEPH_MDS_OP_READDIR); 3118 if (req->r_dentry_drop) { 3119 ret = ceph_encode_dentry_release(&p, req->r_dentry, 3120 req->r_parent, mds, req->r_dentry_drop, 3121 req->r_dentry_unless); 3122 if (ret < 0) 3123 goto out_err; 3124 releases += ret; 3125 } 3126 if (req->r_old_dentry_drop) { 3127 ret = ceph_encode_dentry_release(&p, req->r_old_dentry, 3128 req->r_old_dentry_dir, mds, 3129 req->r_old_dentry_drop, 3130 req->r_old_dentry_unless); 3131 if (ret < 0) 3132 goto out_err; 3133 releases += ret; 3134 } 3135 if (req->r_old_inode_drop) 3136 releases += ceph_encode_inode_release(&p, 3137 d_inode(req->r_old_dentry), 3138 mds, req->r_old_inode_drop, req->r_old_inode_unless, 0); 3139 3140 if (drop_cap_releases) { 3141 releases = 0; 3142 p = msg->front.iov_base + req->r_request_release_offset; 3143 } 3144 3145 lhead->num_releases = cpu_to_le16(releases); 3146 3147 encode_mclientrequest_tail(&p, req); 3148 3149 if (WARN_ON_ONCE(p > end)) { 3150 ceph_msg_put(msg); 3151 msg = ERR_PTR(-ERANGE); 3152 goto out_free2; 3153 } 3154 3155 msg->front.iov_len = p - msg->front.iov_base; 3156 msg->hdr.front_len = cpu_to_le32(msg->front.iov_len); 3157 3158 if (req->r_pagelist) { 3159 struct ceph_pagelist *pagelist = req->r_pagelist; 3160 ceph_msg_data_add_pagelist(msg, pagelist); 3161 msg->hdr.data_len = cpu_to_le32(pagelist->length); 3162 } else { 3163 msg->hdr.data_len = 0; 3164 } 3165 3166 msg->hdr.data_off = cpu_to_le16(0); 3167 3168 out_free2: 3169 if (freepath2) 3170 ceph_mdsc_free_path((char *)path2, pathlen2); 3171 out_free1: 3172 if (freepath1) 3173 ceph_mdsc_free_path((char *)path1, pathlen1); 3174 out: 3175 return msg; 3176 out_err: 3177 ceph_msg_put(msg); 3178 msg = ERR_PTR(ret); 3179 goto out_free2; 3180 } 3181 3182 /* 3183 * called under mdsc->mutex if error, under no mutex if 3184 * success. 3185 */ 3186 static void complete_request(struct ceph_mds_client *mdsc, 3187 struct ceph_mds_request *req) 3188 { 3189 req->r_end_latency = ktime_get(); 3190 3191 if (req->r_callback) 3192 req->r_callback(mdsc, req); 3193 complete_all(&req->r_completion); 3194 } 3195 3196 /* 3197 * called under mdsc->mutex 3198 */ 3199 static int __prepare_send_request(struct ceph_mds_session *session, 3200 struct ceph_mds_request *req, 3201 bool drop_cap_releases) 3202 { 3203 int mds = session->s_mds; 3204 struct ceph_mds_client *mdsc = session->s_mdsc; 3205 struct ceph_client *cl = mdsc->fsc->client; 3206 struct ceph_mds_request_head_legacy *lhead; 3207 struct ceph_mds_request_head *nhead; 3208 struct ceph_msg *msg; 3209 int flags = 0, old_max_retry; 3210 bool old_version = !test_bit(CEPHFS_FEATURE_32BITS_RETRY_FWD, 3211 &session->s_features); 3212 3213 /* 3214 * Avoid inifinite retrying after overflow. The client will 3215 * increase the retry count and if the MDS is old version, 3216 * so we limit to retry at most 256 times. 
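 * (The old request head carries num_retry in a single byte, which is
 * where the 256 computed below comes from; newer MDSes use the 32-bit
 * ext_num_retry field instead.)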
3217 */ 3218 if (req->r_attempts) { 3219 old_max_retry = sizeof_field(struct ceph_mds_request_head_old, 3220 num_retry); 3221 old_max_retry = 1 << (old_max_retry * BITS_PER_BYTE); 3222 if ((old_version && req->r_attempts >= old_max_retry) || 3223 ((uint32_t)req->r_attempts >= U32_MAX)) { 3224 pr_warn_ratelimited_client(cl, "request tid %llu seq overflow\n", 3225 req->r_tid); 3226 return -EMULTIHOP; 3227 } 3228 } 3229 3230 req->r_attempts++; 3231 if (req->r_inode) { 3232 struct ceph_cap *cap = 3233 ceph_get_cap_for_mds(ceph_inode(req->r_inode), mds); 3234 3235 if (cap) 3236 req->r_sent_on_mseq = cap->mseq; 3237 else 3238 req->r_sent_on_mseq = -1; 3239 } 3240 doutc(cl, "%p tid %lld %s (attempt %d)\n", req, req->r_tid, 3241 ceph_mds_op_name(req->r_op), req->r_attempts); 3242 3243 if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) { 3244 void *p; 3245 3246 /* 3247 * Replay. Do not regenerate message (and rebuild 3248 * paths, etc.); just use the original message. 3249 * Rebuilding paths will break for renames because 3250 * d_move mangles the src name. 3251 */ 3252 msg = req->r_request; 3253 lhead = find_legacy_request_head(msg->front.iov_base, 3254 session->s_con.peer_features); 3255 3256 flags = le32_to_cpu(lhead->flags); 3257 flags |= CEPH_MDS_FLAG_REPLAY; 3258 lhead->flags = cpu_to_le32(flags); 3259 3260 if (req->r_target_inode) 3261 lhead->ino = cpu_to_le64(ceph_ino(req->r_target_inode)); 3262 3263 lhead->num_retry = req->r_attempts - 1; 3264 if (!old_version) { 3265 nhead = (struct ceph_mds_request_head*)msg->front.iov_base; 3266 nhead->ext_num_retry = cpu_to_le32(req->r_attempts - 1); 3267 } 3268 3269 /* remove cap/dentry releases from message */ 3270 lhead->num_releases = 0; 3271 3272 p = msg->front.iov_base + req->r_request_release_offset; 3273 encode_mclientrequest_tail(&p, req); 3274 3275 msg->front.iov_len = p - msg->front.iov_base; 3276 msg->hdr.front_len = cpu_to_le32(msg->front.iov_len); 3277 return 0; 3278 } 3279 3280 if (req->r_request) { 3281 ceph_msg_put(req->r_request); 3282 req->r_request = NULL; 3283 } 3284 msg = create_request_message(session, req, drop_cap_releases); 3285 if (IS_ERR(msg)) { 3286 req->r_err = PTR_ERR(msg); 3287 return PTR_ERR(msg); 3288 } 3289 req->r_request = msg; 3290 3291 lhead = find_legacy_request_head(msg->front.iov_base, 3292 session->s_con.peer_features); 3293 lhead->oldest_client_tid = cpu_to_le64(__get_oldest_tid(mdsc)); 3294 if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) 3295 flags |= CEPH_MDS_FLAG_REPLAY; 3296 if (test_bit(CEPH_MDS_R_ASYNC, &req->r_req_flags)) 3297 flags |= CEPH_MDS_FLAG_ASYNC; 3298 if (req->r_parent) 3299 flags |= CEPH_MDS_FLAG_WANT_DENTRY; 3300 lhead->flags = cpu_to_le32(flags); 3301 lhead->num_fwd = req->r_num_fwd; 3302 lhead->num_retry = req->r_attempts - 1; 3303 if (!old_version) { 3304 nhead = (struct ceph_mds_request_head*)msg->front.iov_base; 3305 nhead->ext_num_fwd = cpu_to_le32(req->r_num_fwd); 3306 nhead->ext_num_retry = cpu_to_le32(req->r_attempts - 1); 3307 } 3308 3309 doutc(cl, " r_parent = %p\n", req->r_parent); 3310 return 0; 3311 } 3312 3313 /* 3314 * called under mdsc->mutex 3315 */ 3316 static int __send_request(struct ceph_mds_session *session, 3317 struct ceph_mds_request *req, 3318 bool drop_cap_releases) 3319 { 3320 int err; 3321 3322 err = __prepare_send_request(session, req, drop_cap_releases); 3323 if (!err) { 3324 ceph_msg_get(req->r_request); 3325 ceph_con_send(&session->s_con, req->r_request); 3326 } 3327 3328 return err; 3329 } 3330 3331 /* 3332 * send request, or put it on the 
appropriate wait list. 3333 */ 3334 static void __do_request(struct ceph_mds_client *mdsc, 3335 struct ceph_mds_request *req) 3336 { 3337 struct ceph_client *cl = mdsc->fsc->client; 3338 struct ceph_mds_session *session = NULL; 3339 int mds = -1; 3340 int err = 0; 3341 bool random; 3342 3343 if (req->r_err || test_bit(CEPH_MDS_R_GOT_RESULT, &req->r_req_flags)) { 3344 if (test_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags)) 3345 __unregister_request(mdsc, req); 3346 return; 3347 } 3348 3349 if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_FENCE_IO) { 3350 doutc(cl, "metadata corrupted\n"); 3351 err = -EIO; 3352 goto finish; 3353 } 3354 if (req->r_timeout && 3355 time_after_eq(jiffies, req->r_started + req->r_timeout)) { 3356 doutc(cl, "timed out\n"); 3357 err = -ETIMEDOUT; 3358 goto finish; 3359 } 3360 if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_SHUTDOWN) { 3361 doutc(cl, "forced umount\n"); 3362 err = -EIO; 3363 goto finish; 3364 } 3365 if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_MOUNTING) { 3366 if (mdsc->mdsmap_err) { 3367 err = mdsc->mdsmap_err; 3368 doutc(cl, "mdsmap err %d\n", err); 3369 goto finish; 3370 } 3371 if (mdsc->mdsmap->m_epoch == 0) { 3372 doutc(cl, "no mdsmap, waiting for map\n"); 3373 list_add(&req->r_wait, &mdsc->waiting_for_map); 3374 return; 3375 } 3376 if (!(mdsc->fsc->mount_options->flags & 3377 CEPH_MOUNT_OPT_MOUNTWAIT) && 3378 !ceph_mdsmap_is_cluster_available(mdsc->mdsmap)) { 3379 err = -EHOSTUNREACH; 3380 goto finish; 3381 } 3382 } 3383 3384 put_request_session(req); 3385 3386 mds = __choose_mds(mdsc, req, &random); 3387 if (mds < 0 || 3388 ceph_mdsmap_get_state(mdsc->mdsmap, mds) < CEPH_MDS_STATE_ACTIVE) { 3389 if (test_bit(CEPH_MDS_R_ASYNC, &req->r_req_flags)) { 3390 err = -EJUKEBOX; 3391 goto finish; 3392 } 3393 doutc(cl, "no mds or not active, waiting for map\n"); 3394 list_add(&req->r_wait, &mdsc->waiting_for_map); 3395 return; 3396 } 3397 3398 /* get, open session */ 3399 session = __ceph_lookup_mds_session(mdsc, mds); 3400 if (!session) { 3401 session = register_session(mdsc, mds); 3402 if (IS_ERR(session)) { 3403 err = PTR_ERR(session); 3404 goto finish; 3405 } 3406 } 3407 req->r_session = ceph_get_mds_session(session); 3408 3409 doutc(cl, "mds%d session %p state %s\n", mds, session, 3410 ceph_session_state_name(session->s_state)); 3411 3412 /* 3413 * The old ceph will crash the MDSs when see unknown OPs 3414 */ 3415 if (req->r_feature_needed > 0 && 3416 !test_bit(req->r_feature_needed, &session->s_features)) { 3417 err = -EOPNOTSUPP; 3418 goto out_session; 3419 } 3420 3421 if (session->s_state != CEPH_MDS_SESSION_OPEN && 3422 session->s_state != CEPH_MDS_SESSION_HUNG) { 3423 /* 3424 * We cannot queue async requests since the caps and delegated 3425 * inodes are bound to the session. Just return -EJUKEBOX and 3426 * let the caller retry a sync request in that case. 3427 */ 3428 if (test_bit(CEPH_MDS_R_ASYNC, &req->r_req_flags)) { 3429 err = -EJUKEBOX; 3430 goto out_session; 3431 } 3432 3433 /* 3434 * If the session has been REJECTED, then return a hard error, 3435 * unless it's a CLEANRECOVER mount, in which case we'll queue 3436 * it to the mdsc queue. 
3437 */
3438 if (session->s_state == CEPH_MDS_SESSION_REJECTED) {
3439 if (ceph_test_mount_opt(mdsc->fsc, CLEANRECOVER))
3440 list_add(&req->r_wait, &mdsc->waiting_for_map);
3441 else
3442 err = -EACCES;
3443 goto out_session;
3444 }
3445
3446 if (session->s_state == CEPH_MDS_SESSION_NEW ||
3447 session->s_state == CEPH_MDS_SESSION_CLOSING) {
3448 err = __open_session(mdsc, session);
3449 if (err)
3450 goto out_session;
3451 /* retry the same mds later */
3452 if (random)
3453 req->r_resend_mds = mds;
3454 }
3455 list_add(&req->r_wait, &session->s_waiting);
3456 goto out_session;
3457 }
3458
3459 /* send request */
3460 req->r_resend_mds = -1;   /* forget any previous mds hint */
3461
3462 if (req->r_request_started == 0)   /* note request start time */
3463 req->r_request_started = jiffies;
3464
3465 /*
3466 * For an async create we choose the auth MDS of the frag in the parent
3467 * directory to send the request, and usually this works fine. But if
3468 * the directory has been migrated to another MDS before it could handle
3469 * the request, the request will be forwarded.
3470 *
3471 * And then the auth cap will be changed.
3472 */
3473 if (test_bit(CEPH_MDS_R_ASYNC, &req->r_req_flags) && req->r_num_fwd) {
3474 struct ceph_dentry_info *di = ceph_dentry(req->r_dentry);
3475 struct ceph_inode_info *ci;
3476 struct ceph_cap *cap;
3477
3478 /*
3479 * The request may be handled very fast and the new inode
3480 * hasn't been linked to the dentry yet. We need to wait
3481 * for ceph_finish_async_create(), which shouldn't be
3482 * stuck too long or fail in theory, to finish when forwarding
3483 * the request.
3484 */
3485 if (!d_inode(req->r_dentry)) {
3486 err = wait_on_bit(&di->flags, CEPH_DENTRY_ASYNC_CREATE_BIT,
3487 TASK_KILLABLE);
3488 if (err) {
3489 mutex_lock(&req->r_fill_mutex);
3490 set_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags);
3491 mutex_unlock(&req->r_fill_mutex);
3492 goto out_session;
3493 }
3494 }
3495
3496 ci = ceph_inode(d_inode(req->r_dentry));
3497
3498 spin_lock(&ci->i_ceph_lock);
3499 cap = ci->i_auth_cap;
3500 if (ci->i_ceph_flags & CEPH_I_ASYNC_CREATE && mds != cap->mds) {
3501 doutc(cl, "session changed for auth cap %d -> %d\n",
3502 cap->session->s_mds, session->s_mds);
3503
3504 /* Remove the auth cap from old session */
3505 spin_lock(&cap->session->s_cap_lock);
3506 cap->session->s_nr_caps--;
3507 list_del_init(&cap->session_caps);
3508 spin_unlock(&cap->session->s_cap_lock);
3509
3510 /* Add the auth cap to the new session */
3511 cap->mds = mds;
3512 cap->session = session;
3513 spin_lock(&session->s_cap_lock);
3514 session->s_nr_caps++;
3515 list_add_tail(&cap->session_caps, &session->s_caps);
3516 spin_unlock(&session->s_cap_lock);
3517
3518 change_auth_cap_ses(ci, session);
3519 }
3520 spin_unlock(&ci->i_ceph_lock);
3521 }
3522
3523 err = __send_request(session, req, false);
3524
3525 out_session:
3526 ceph_put_mds_session(session);
3527 finish:
3528 if (err) {
3529 doutc(cl, "early error %d\n", err);
3530 req->r_err = err;
3531 complete_request(mdsc, req);
3532 __unregister_request(mdsc, req);
3533 }
3534 return;
3535 }
3536
3537 /*
3538 * called under mdsc->mutex
3539 */
3540 static void __wake_requests(struct ceph_mds_client *mdsc,
3541 struct list_head *head)
3542 {
3543 struct ceph_client *cl = mdsc->fsc->client;
3544 struct ceph_mds_request *req;
3545 LIST_HEAD(tmp_list);
3546
3547 list_splice_init(head, &tmp_list);
3548
3549 while (!list_empty(&tmp_list)) {
3550 req = list_entry(tmp_list.next,
3551 struct ceph_mds_request, r_wait);
3552 list_del_init(&req->r_wait);
3553
doutc(cl, " wake request %p tid %llu\n", req, 3554 req->r_tid); 3555 __do_request(mdsc, req); 3556 } 3557 } 3558 3559 /* 3560 * Wake up threads with requests pending for @mds, so that they can 3561 * resubmit their requests to a possibly different mds. 3562 */ 3563 static void kick_requests(struct ceph_mds_client *mdsc, int mds) 3564 { 3565 struct ceph_client *cl = mdsc->fsc->client; 3566 struct ceph_mds_request *req; 3567 struct rb_node *p = rb_first(&mdsc->request_tree); 3568 3569 doutc(cl, "kick_requests mds%d\n", mds); 3570 while (p) { 3571 req = rb_entry(p, struct ceph_mds_request, r_node); 3572 p = rb_next(p); 3573 if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) 3574 continue; 3575 if (req->r_attempts > 0) 3576 continue; /* only new requests */ 3577 if (req->r_session && 3578 req->r_session->s_mds == mds) { 3579 doutc(cl, " kicking tid %llu\n", req->r_tid); 3580 list_del_init(&req->r_wait); 3581 __do_request(mdsc, req); 3582 } 3583 } 3584 } 3585 3586 int ceph_mdsc_submit_request(struct ceph_mds_client *mdsc, struct inode *dir, 3587 struct ceph_mds_request *req) 3588 { 3589 struct ceph_client *cl = mdsc->fsc->client; 3590 int err = 0; 3591 3592 /* take CAP_PIN refs for r_inode, r_parent, r_old_dentry */ 3593 if (req->r_inode) 3594 ceph_get_cap_refs(ceph_inode(req->r_inode), CEPH_CAP_PIN); 3595 if (req->r_parent) { 3596 struct ceph_inode_info *ci = ceph_inode(req->r_parent); 3597 int fmode = (req->r_op & CEPH_MDS_OP_WRITE) ? 3598 CEPH_FILE_MODE_WR : CEPH_FILE_MODE_RD; 3599 spin_lock(&ci->i_ceph_lock); 3600 ceph_take_cap_refs(ci, CEPH_CAP_PIN, false); 3601 __ceph_touch_fmode(ci, mdsc, fmode); 3602 spin_unlock(&ci->i_ceph_lock); 3603 } 3604 if (req->r_old_dentry_dir) 3605 ceph_get_cap_refs(ceph_inode(req->r_old_dentry_dir), 3606 CEPH_CAP_PIN); 3607 3608 if (req->r_inode) { 3609 err = ceph_wait_on_async_create(req->r_inode); 3610 if (err) { 3611 doutc(cl, "wait for async create returned: %d\n", err); 3612 return err; 3613 } 3614 } 3615 3616 if (!err && req->r_old_inode) { 3617 err = ceph_wait_on_async_create(req->r_old_inode); 3618 if (err) { 3619 doutc(cl, "wait for async create returned: %d\n", err); 3620 return err; 3621 } 3622 } 3623 3624 doutc(cl, "submit_request on %p for inode %p\n", req, dir); 3625 mutex_lock(&mdsc->mutex); 3626 __register_request(mdsc, req, dir); 3627 __do_request(mdsc, req); 3628 err = req->r_err; 3629 mutex_unlock(&mdsc->mutex); 3630 return err; 3631 } 3632 3633 int ceph_mdsc_wait_request(struct ceph_mds_client *mdsc, 3634 struct ceph_mds_request *req, 3635 ceph_mds_request_wait_callback_t wait_func) 3636 { 3637 struct ceph_client *cl = mdsc->fsc->client; 3638 int err; 3639 3640 /* wait */ 3641 doutc(cl, "do_request waiting\n"); 3642 if (wait_func) { 3643 err = wait_func(mdsc, req); 3644 } else { 3645 long timeleft = wait_for_completion_killable_timeout( 3646 &req->r_completion, 3647 ceph_timeout_jiffies(req->r_timeout)); 3648 if (timeleft > 0) 3649 err = 0; 3650 else if (!timeleft) 3651 err = -ETIMEDOUT; /* timed out */ 3652 else 3653 err = timeleft; /* killed */ 3654 } 3655 doutc(cl, "do_request waited, got %d\n", err); 3656 mutex_lock(&mdsc->mutex); 3657 3658 /* only abort if we didn't race with a real reply */ 3659 if (test_bit(CEPH_MDS_R_GOT_RESULT, &req->r_req_flags)) { 3660 err = le32_to_cpu(req->r_reply_info.head->result); 3661 } else if (err < 0) { 3662 doutc(cl, "aborted request %lld with %d\n", req->r_tid, err); 3663 3664 /* 3665 * ensure we aren't running concurrently with 3666 * ceph_fill_trace or ceph_readdir_prepopulate, which 3667 * rely 
on locks (dir mutex) held by our caller. 3668 */ 3669 mutex_lock(&req->r_fill_mutex); 3670 req->r_err = err; 3671 set_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags); 3672 mutex_unlock(&req->r_fill_mutex); 3673 3674 if (req->r_parent && 3675 (req->r_op & CEPH_MDS_OP_WRITE)) 3676 ceph_invalidate_dir_request(req); 3677 } else { 3678 err = req->r_err; 3679 } 3680 3681 mutex_unlock(&mdsc->mutex); 3682 return err; 3683 } 3684 3685 /* 3686 * Synchrously perform an mds request. Take care of all of the 3687 * session setup, forwarding, retry details. 3688 */ 3689 int ceph_mdsc_do_request(struct ceph_mds_client *mdsc, 3690 struct inode *dir, 3691 struct ceph_mds_request *req) 3692 { 3693 struct ceph_client *cl = mdsc->fsc->client; 3694 int err; 3695 3696 doutc(cl, "do_request on %p\n", req); 3697 3698 /* issue */ 3699 err = ceph_mdsc_submit_request(mdsc, dir, req); 3700 if (!err) 3701 err = ceph_mdsc_wait_request(mdsc, req, NULL); 3702 doutc(cl, "do_request %p done, result %d\n", req, err); 3703 return err; 3704 } 3705 3706 /* 3707 * Invalidate dir's completeness, dentry lease state on an aborted MDS 3708 * namespace request. 3709 */ 3710 void ceph_invalidate_dir_request(struct ceph_mds_request *req) 3711 { 3712 struct inode *dir = req->r_parent; 3713 struct inode *old_dir = req->r_old_dentry_dir; 3714 struct ceph_client *cl = req->r_mdsc->fsc->client; 3715 3716 doutc(cl, "invalidate_dir_request %p %p (complete, lease(s))\n", 3717 dir, old_dir); 3718 3719 ceph_dir_clear_complete(dir); 3720 if (old_dir) 3721 ceph_dir_clear_complete(old_dir); 3722 if (req->r_dentry) 3723 ceph_invalidate_dentry_lease(req->r_dentry); 3724 if (req->r_old_dentry) 3725 ceph_invalidate_dentry_lease(req->r_old_dentry); 3726 } 3727 3728 /* 3729 * Handle mds reply. 3730 * 3731 * We take the session mutex and parse and process the reply immediately. 3732 * This preserves the logical ordering of replies, capabilities, etc., sent 3733 * by the MDS as they are applied to our local cache. 3734 */ 3735 static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg) 3736 { 3737 struct ceph_mds_client *mdsc = session->s_mdsc; 3738 struct ceph_client *cl = mdsc->fsc->client; 3739 struct ceph_mds_request *req; 3740 struct ceph_mds_reply_head *head = msg->front.iov_base; 3741 struct ceph_mds_reply_info_parsed *rinfo; /* parsed reply info */ 3742 struct ceph_snap_realm *realm; 3743 u64 tid; 3744 int err, result; 3745 int mds = session->s_mds; 3746 bool close_sessions = false; 3747 3748 if (msg->front.iov_len < sizeof(*head)) { 3749 pr_err_client(cl, "got corrupt (short) reply\n"); 3750 ceph_msg_dump(msg); 3751 return; 3752 } 3753 3754 /* get request, session */ 3755 tid = le64_to_cpu(msg->hdr.tid); 3756 mutex_lock(&mdsc->mutex); 3757 req = lookup_get_request(mdsc, tid); 3758 if (!req) { 3759 doutc(cl, "on unknown tid %llu\n", tid); 3760 mutex_unlock(&mdsc->mutex); 3761 return; 3762 } 3763 doutc(cl, "handle_reply %p\n", req); 3764 3765 /* correct session? */ 3766 if (req->r_session != session) { 3767 pr_err_client(cl, "got %llu on session mds%d not mds%d\n", 3768 tid, session->s_mds, 3769 req->r_session ? req->r_session->s_mds : -1); 3770 mutex_unlock(&mdsc->mutex); 3771 goto out; 3772 } 3773 3774 /* dup? */ 3775 if ((test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags) && !head->safe) || 3776 (test_bit(CEPH_MDS_R_GOT_SAFE, &req->r_req_flags) && head->safe)) { 3777 pr_warn_client(cl, "got a dup %s reply on %llu from mds%d\n", 3778 head->safe ? 
"safe" : "unsafe", tid, mds); 3779 mutex_unlock(&mdsc->mutex); 3780 goto out; 3781 } 3782 if (test_bit(CEPH_MDS_R_GOT_SAFE, &req->r_req_flags)) { 3783 pr_warn_client(cl, "got unsafe after safe on %llu from mds%d\n", 3784 tid, mds); 3785 mutex_unlock(&mdsc->mutex); 3786 goto out; 3787 } 3788 3789 result = le32_to_cpu(head->result); 3790 3791 if (head->safe) { 3792 set_bit(CEPH_MDS_R_GOT_SAFE, &req->r_req_flags); 3793 __unregister_request(mdsc, req); 3794 3795 /* last request during umount? */ 3796 if (mdsc->stopping && !__get_oldest_req(mdsc)) 3797 complete_all(&mdsc->safe_umount_waiters); 3798 3799 if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) { 3800 /* 3801 * We already handled the unsafe response, now do the 3802 * cleanup. No need to examine the response; the MDS 3803 * doesn't include any result info in the safe 3804 * response. And even if it did, there is nothing 3805 * useful we could do with a revised return value. 3806 */ 3807 doutc(cl, "got safe reply %llu, mds%d\n", tid, mds); 3808 3809 mutex_unlock(&mdsc->mutex); 3810 goto out; 3811 } 3812 } else { 3813 set_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags); 3814 list_add_tail(&req->r_unsafe_item, &req->r_session->s_unsafe); 3815 } 3816 3817 doutc(cl, "tid %lld result %d\n", tid, result); 3818 if (test_bit(CEPHFS_FEATURE_REPLY_ENCODING, &session->s_features)) 3819 err = parse_reply_info(session, msg, req, (u64)-1); 3820 else 3821 err = parse_reply_info(session, msg, req, 3822 session->s_con.peer_features); 3823 mutex_unlock(&mdsc->mutex); 3824 3825 /* Must find target inode outside of mutexes to avoid deadlocks */ 3826 rinfo = &req->r_reply_info; 3827 if ((err >= 0) && rinfo->head->is_target) { 3828 struct inode *in = xchg(&req->r_new_inode, NULL); 3829 struct ceph_vino tvino = { 3830 .ino = le64_to_cpu(rinfo->targeti.in->ino), 3831 .snap = le64_to_cpu(rinfo->targeti.in->snapid) 3832 }; 3833 3834 /* 3835 * If we ended up opening an existing inode, discard 3836 * r_new_inode 3837 */ 3838 if (req->r_op == CEPH_MDS_OP_CREATE && 3839 !req->r_reply_info.has_create_ino) { 3840 /* This should never happen on an async create */ 3841 WARN_ON_ONCE(req->r_deleg_ino); 3842 iput(in); 3843 in = NULL; 3844 } 3845 3846 in = ceph_get_inode(mdsc->fsc->sb, tvino, in); 3847 if (IS_ERR(in)) { 3848 err = PTR_ERR(in); 3849 mutex_lock(&session->s_mutex); 3850 goto out_err; 3851 } 3852 req->r_target_inode = in; 3853 } 3854 3855 mutex_lock(&session->s_mutex); 3856 if (err < 0) { 3857 pr_err_client(cl, "got corrupt reply mds%d(tid:%lld)\n", 3858 mds, tid); 3859 ceph_msg_dump(msg); 3860 goto out_err; 3861 } 3862 3863 /* snap trace */ 3864 realm = NULL; 3865 if (rinfo->snapblob_len) { 3866 down_write(&mdsc->snap_rwsem); 3867 err = ceph_update_snap_trace(mdsc, rinfo->snapblob, 3868 rinfo->snapblob + rinfo->snapblob_len, 3869 le32_to_cpu(head->op) == CEPH_MDS_OP_RMSNAP, 3870 &realm); 3871 if (err) { 3872 up_write(&mdsc->snap_rwsem); 3873 close_sessions = true; 3874 if (err == -EIO) 3875 ceph_msg_dump(msg); 3876 goto out_err; 3877 } 3878 downgrade_write(&mdsc->snap_rwsem); 3879 } else { 3880 down_read(&mdsc->snap_rwsem); 3881 } 3882 3883 /* insert trace into our cache */ 3884 mutex_lock(&req->r_fill_mutex); 3885 current->journal_info = req; 3886 err = ceph_fill_trace(mdsc->fsc->sb, req); 3887 if (err == 0) { 3888 if (result == 0 && (req->r_op == CEPH_MDS_OP_READDIR || 3889 req->r_op == CEPH_MDS_OP_LSSNAP)) 3890 err = ceph_readdir_prepopulate(req, req->r_session); 3891 } 3892 current->journal_info = NULL; 3893 mutex_unlock(&req->r_fill_mutex); 3894 3895 
	up_read(&mdsc->snap_rwsem);
	if (realm)
		ceph_put_snap_realm(mdsc, realm);

	if (err == 0) {
		if (req->r_target_inode &&
		    test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) {
			struct ceph_inode_info *ci =
				ceph_inode(req->r_target_inode);
			spin_lock(&ci->i_unsafe_lock);
			list_add_tail(&req->r_unsafe_target_item,
				      &ci->i_unsafe_iops);
			spin_unlock(&ci->i_unsafe_lock);
		}

		ceph_unreserve_caps(mdsc, &req->r_caps_reservation);
	}
out_err:
	mutex_lock(&mdsc->mutex);
	if (!test_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags)) {
		if (err) {
			req->r_err = err;
		} else {
			req->r_reply = ceph_msg_get(msg);
			set_bit(CEPH_MDS_R_GOT_RESULT, &req->r_req_flags);
		}
	} else {
		doutc(cl, "reply arrived after request %lld was aborted\n", tid);
	}
	mutex_unlock(&mdsc->mutex);

	mutex_unlock(&session->s_mutex);

	/* kick calling process */
	complete_request(mdsc, req);

	ceph_update_metadata_metrics(&mdsc->metric, req->r_start_latency,
				     req->r_end_latency, err);
out:
	ceph_mdsc_put_request(req);

	/* Defer closing the sessions until after s_mutex has been released */
	if (close_sessions)
		ceph_mdsc_close_sessions(mdsc);
	return;
}



/*
 * handle mds notification that our request has been forwarded.
 */
static void handle_forward(struct ceph_mds_client *mdsc,
			   struct ceph_mds_session *session,
			   struct ceph_msg *msg)
{
	struct ceph_client *cl = mdsc->fsc->client;
	struct ceph_mds_request *req;
	u64 tid = le64_to_cpu(msg->hdr.tid);
	u32 next_mds;
	u32 fwd_seq;
	int err = -EINVAL;
	void *p = msg->front.iov_base;
	void *end = p + msg->front.iov_len;
	bool aborted = false;

	ceph_decode_need(&p, end, 2*sizeof(u32), bad);
	next_mds = ceph_decode_32(&p);
	fwd_seq = ceph_decode_32(&p);

	mutex_lock(&mdsc->mutex);
	req = lookup_get_request(mdsc, tid);
	if (!req) {
		mutex_unlock(&mdsc->mutex);
		doutc(cl, "forward tid %llu to mds%d - req dne\n", tid, next_mds);
		return;  /* dup reply? */
	}

	if (test_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags)) {
		doutc(cl, "forward tid %llu aborted, unregistering\n", tid);
		__unregister_request(mdsc, req);
	} else if (fwd_seq <= req->r_num_fwd || (uint32_t)fwd_seq >= U32_MAX) {
		/*
		 * Avoid infinite retrying after overflow.
		 *
		 * The MDS will increase the fwd count each time it forwards
		 * the request.  On the client side, if the received num_fwd
		 * is less than the one already saved in the request, the MDS
		 * must be an old version whose 8-bit fwd counter overflowed.
		 */
		mutex_lock(&req->r_fill_mutex);
		req->r_err = -EMULTIHOP;
		set_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags);
		mutex_unlock(&req->r_fill_mutex);
		aborted = true;
		pr_warn_ratelimited_client(cl, "forward tid %llu seq overflow\n",
					   tid);
	} else {
		/* resend.
forward race not possible; mds would drop */ 3994 doutc(cl, "forward tid %llu to mds%d (we resend)\n", tid, next_mds); 3995 BUG_ON(req->r_err); 3996 BUG_ON(test_bit(CEPH_MDS_R_GOT_RESULT, &req->r_req_flags)); 3997 req->r_attempts = 0; 3998 req->r_num_fwd = fwd_seq; 3999 req->r_resend_mds = next_mds; 4000 put_request_session(req); 4001 __do_request(mdsc, req); 4002 } 4003 mutex_unlock(&mdsc->mutex); 4004 4005 /* kick calling process */ 4006 if (aborted) 4007 complete_request(mdsc, req); 4008 ceph_mdsc_put_request(req); 4009 return; 4010 4011 bad: 4012 pr_err_client(cl, "decode error err=%d\n", err); 4013 ceph_msg_dump(msg); 4014 } 4015 4016 static int __decode_session_metadata(void **p, void *end, 4017 bool *blocklisted) 4018 { 4019 /* map<string,string> */ 4020 u32 n; 4021 bool err_str; 4022 ceph_decode_32_safe(p, end, n, bad); 4023 while (n-- > 0) { 4024 u32 len; 4025 ceph_decode_32_safe(p, end, len, bad); 4026 ceph_decode_need(p, end, len, bad); 4027 err_str = !strncmp(*p, "error_string", len); 4028 *p += len; 4029 ceph_decode_32_safe(p, end, len, bad); 4030 ceph_decode_need(p, end, len, bad); 4031 /* 4032 * Match "blocklisted (blacklisted)" from newer MDSes, 4033 * or "blacklisted" from older MDSes. 4034 */ 4035 if (err_str && strnstr(*p, "blacklisted", len)) 4036 *blocklisted = true; 4037 *p += len; 4038 } 4039 return 0; 4040 bad: 4041 return -1; 4042 } 4043 4044 /* 4045 * handle a mds session control message 4046 */ 4047 static void handle_session(struct ceph_mds_session *session, 4048 struct ceph_msg *msg) 4049 { 4050 struct ceph_mds_client *mdsc = session->s_mdsc; 4051 struct ceph_client *cl = mdsc->fsc->client; 4052 int mds = session->s_mds; 4053 int msg_version = le16_to_cpu(msg->hdr.version); 4054 void *p = msg->front.iov_base; 4055 void *end = p + msg->front.iov_len; 4056 struct ceph_mds_session_head *h; 4057 u32 op; 4058 u64 seq, features = 0; 4059 int wake = 0; 4060 bool blocklisted = false; 4061 4062 /* decode */ 4063 ceph_decode_need(&p, end, sizeof(*h), bad); 4064 h = p; 4065 p += sizeof(*h); 4066 4067 op = le32_to_cpu(h->op); 4068 seq = le64_to_cpu(h->seq); 4069 4070 if (msg_version >= 3) { 4071 u32 len; 4072 /* version >= 2 and < 5, decode metadata, skip otherwise 4073 * as it's handled via flags. 
		 */
		if (msg_version >= 5)
			ceph_decode_skip_map(&p, end, string, string, bad);
		else if (__decode_session_metadata(&p, end, &blocklisted) < 0)
			goto bad;

		/* version >= 3, feature bits */
		ceph_decode_32_safe(&p, end, len, bad);
		if (len) {
			ceph_decode_64_safe(&p, end, features, bad);
			p += len - sizeof(features);
		}
	}

	if (msg_version >= 5) {
		u32 flags, len;

		/* version >= 4 */
		ceph_decode_skip_16(&p, end, bad); /* struct_v, struct_cv */
		ceph_decode_32_safe(&p, end, len, bad); /* len */
		ceph_decode_skip_n(&p, end, len, bad); /* metric_spec */

		/* version >= 5, flags */
		ceph_decode_32_safe(&p, end, flags, bad);
		if (flags & CEPH_SESSION_BLOCKLISTED) {
			pr_warn_client(cl, "mds%d session blocklisted\n",
				       session->s_mds);
			blocklisted = true;
		}
	}

	mutex_lock(&mdsc->mutex);
	if (op == CEPH_SESSION_CLOSE) {
		ceph_get_mds_session(session);
		__unregister_session(mdsc, session);
	}
	/* FIXME: this ttl calculation is generous */
	session->s_ttl = jiffies + HZ*mdsc->mdsmap->m_session_autoclose;
	mutex_unlock(&mdsc->mutex);

	mutex_lock(&session->s_mutex);

	doutc(cl, "mds%d %s %p state %s seq %llu\n", mds,
	      ceph_session_op_name(op), session,
	      ceph_session_state_name(session->s_state), seq);

	if (session->s_state == CEPH_MDS_SESSION_HUNG) {
		session->s_state = CEPH_MDS_SESSION_OPEN;
		pr_info_client(cl, "mds%d came back\n", session->s_mds);
	}

	switch (op) {
	case CEPH_SESSION_OPEN:
		if (session->s_state == CEPH_MDS_SESSION_RECONNECTING)
			pr_info_client(cl, "mds%d reconnect success\n",
				       session->s_mds);

		if (session->s_state == CEPH_MDS_SESSION_OPEN) {
			pr_notice_client(cl, "mds%d is already opened\n",
					 session->s_mds);
		} else {
			session->s_state = CEPH_MDS_SESSION_OPEN;
			session->s_features = features;
			renewed_caps(mdsc, session, 0);
			if (test_bit(CEPHFS_FEATURE_METRIC_COLLECT,
				     &session->s_features))
				metric_schedule_delayed(&mdsc->metric);
		}

		/*
		 * The connection may have broken and the session on the
		 * client side may have been reinitialized, so the seq
		 * needs to be updated anyway.
4147 */ 4148 if (!session->s_seq && seq) 4149 session->s_seq = seq; 4150 4151 wake = 1; 4152 if (mdsc->stopping) 4153 __close_session(mdsc, session); 4154 break; 4155 4156 case CEPH_SESSION_RENEWCAPS: 4157 if (session->s_renew_seq == seq) 4158 renewed_caps(mdsc, session, 1); 4159 break; 4160 4161 case CEPH_SESSION_CLOSE: 4162 if (session->s_state == CEPH_MDS_SESSION_RECONNECTING) 4163 pr_info_client(cl, "mds%d reconnect denied\n", 4164 session->s_mds); 4165 session->s_state = CEPH_MDS_SESSION_CLOSED; 4166 cleanup_session_requests(mdsc, session); 4167 remove_session_caps(session); 4168 wake = 2; /* for good measure */ 4169 wake_up_all(&mdsc->session_close_wq); 4170 break; 4171 4172 case CEPH_SESSION_STALE: 4173 pr_info_client(cl, "mds%d caps went stale, renewing\n", 4174 session->s_mds); 4175 atomic_inc(&session->s_cap_gen); 4176 session->s_cap_ttl = jiffies - 1; 4177 send_renew_caps(mdsc, session); 4178 break; 4179 4180 case CEPH_SESSION_RECALL_STATE: 4181 ceph_trim_caps(mdsc, session, le32_to_cpu(h->max_caps)); 4182 break; 4183 4184 case CEPH_SESSION_FLUSHMSG: 4185 /* flush cap releases */ 4186 spin_lock(&session->s_cap_lock); 4187 if (session->s_num_cap_releases) 4188 ceph_flush_cap_releases(mdsc, session); 4189 spin_unlock(&session->s_cap_lock); 4190 4191 send_flushmsg_ack(mdsc, session, seq); 4192 break; 4193 4194 case CEPH_SESSION_FORCE_RO: 4195 doutc(cl, "force_session_readonly %p\n", session); 4196 spin_lock(&session->s_cap_lock); 4197 session->s_readonly = true; 4198 spin_unlock(&session->s_cap_lock); 4199 wake_up_session_caps(session, FORCE_RO); 4200 break; 4201 4202 case CEPH_SESSION_REJECT: 4203 WARN_ON(session->s_state != CEPH_MDS_SESSION_OPENING); 4204 pr_info_client(cl, "mds%d rejected session\n", 4205 session->s_mds); 4206 session->s_state = CEPH_MDS_SESSION_REJECTED; 4207 cleanup_session_requests(mdsc, session); 4208 remove_session_caps(session); 4209 if (blocklisted) 4210 mdsc->fsc->blocklisted = true; 4211 wake = 2; /* for good measure */ 4212 break; 4213 4214 default: 4215 pr_err_client(cl, "bad op %d mds%d\n", op, mds); 4216 WARN_ON(1); 4217 } 4218 4219 mutex_unlock(&session->s_mutex); 4220 if (wake) { 4221 mutex_lock(&mdsc->mutex); 4222 __wake_requests(mdsc, &session->s_waiting); 4223 if (wake == 2) 4224 kick_requests(mdsc, mds); 4225 mutex_unlock(&mdsc->mutex); 4226 } 4227 if (op == CEPH_SESSION_CLOSE) 4228 ceph_put_mds_session(session); 4229 return; 4230 4231 bad: 4232 pr_err_client(cl, "corrupt message mds%d len %d\n", mds, 4233 (int)msg->front.iov_len); 4234 ceph_msg_dump(msg); 4235 return; 4236 } 4237 4238 void ceph_mdsc_release_dir_caps(struct ceph_mds_request *req) 4239 { 4240 struct ceph_client *cl = req->r_mdsc->fsc->client; 4241 int dcaps; 4242 4243 dcaps = xchg(&req->r_dir_caps, 0); 4244 if (dcaps) { 4245 doutc(cl, "releasing r_dir_caps=%s\n", ceph_cap_string(dcaps)); 4246 ceph_put_cap_refs(ceph_inode(req->r_parent), dcaps); 4247 } 4248 } 4249 4250 void ceph_mdsc_release_dir_caps_no_check(struct ceph_mds_request *req) 4251 { 4252 struct ceph_client *cl = req->r_mdsc->fsc->client; 4253 int dcaps; 4254 4255 dcaps = xchg(&req->r_dir_caps, 0); 4256 if (dcaps) { 4257 doutc(cl, "releasing r_dir_caps=%s\n", ceph_cap_string(dcaps)); 4258 ceph_put_cap_refs_no_check_caps(ceph_inode(req->r_parent), 4259 dcaps); 4260 } 4261 } 4262 4263 /* 4264 * called under session->mutex. 
4265 */ 4266 static void replay_unsafe_requests(struct ceph_mds_client *mdsc, 4267 struct ceph_mds_session *session) 4268 { 4269 struct ceph_mds_request *req, *nreq; 4270 struct rb_node *p; 4271 4272 doutc(mdsc->fsc->client, "mds%d\n", session->s_mds); 4273 4274 mutex_lock(&mdsc->mutex); 4275 list_for_each_entry_safe(req, nreq, &session->s_unsafe, r_unsafe_item) 4276 __send_request(session, req, true); 4277 4278 /* 4279 * also re-send old requests when MDS enters reconnect stage. So that MDS 4280 * can process completed request in clientreplay stage. 4281 */ 4282 p = rb_first(&mdsc->request_tree); 4283 while (p) { 4284 req = rb_entry(p, struct ceph_mds_request, r_node); 4285 p = rb_next(p); 4286 if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) 4287 continue; 4288 if (req->r_attempts == 0) 4289 continue; /* only old requests */ 4290 if (!req->r_session) 4291 continue; 4292 if (req->r_session->s_mds != session->s_mds) 4293 continue; 4294 4295 ceph_mdsc_release_dir_caps_no_check(req); 4296 4297 __send_request(session, req, true); 4298 } 4299 mutex_unlock(&mdsc->mutex); 4300 } 4301 4302 static int send_reconnect_partial(struct ceph_reconnect_state *recon_state) 4303 { 4304 struct ceph_msg *reply; 4305 struct ceph_pagelist *_pagelist; 4306 struct page *page; 4307 __le32 *addr; 4308 int err = -ENOMEM; 4309 4310 if (!recon_state->allow_multi) 4311 return -ENOSPC; 4312 4313 /* can't handle message that contains both caps and realm */ 4314 BUG_ON(!recon_state->nr_caps == !recon_state->nr_realms); 4315 4316 /* pre-allocate new pagelist */ 4317 _pagelist = ceph_pagelist_alloc(GFP_NOFS); 4318 if (!_pagelist) 4319 return -ENOMEM; 4320 4321 reply = ceph_msg_new2(CEPH_MSG_CLIENT_RECONNECT, 0, 1, GFP_NOFS, false); 4322 if (!reply) 4323 goto fail_msg; 4324 4325 /* placeholder for nr_caps */ 4326 err = ceph_pagelist_encode_32(_pagelist, 0); 4327 if (err < 0) 4328 goto fail; 4329 4330 if (recon_state->nr_caps) { 4331 /* currently encoding caps */ 4332 err = ceph_pagelist_encode_32(recon_state->pagelist, 0); 4333 if (err) 4334 goto fail; 4335 } else { 4336 /* placeholder for nr_realms (currently encoding relams) */ 4337 err = ceph_pagelist_encode_32(_pagelist, 0); 4338 if (err < 0) 4339 goto fail; 4340 } 4341 4342 err = ceph_pagelist_encode_8(recon_state->pagelist, 1); 4343 if (err) 4344 goto fail; 4345 4346 page = list_first_entry(&recon_state->pagelist->head, struct page, lru); 4347 addr = kmap_atomic(page); 4348 if (recon_state->nr_caps) { 4349 /* currently encoding caps */ 4350 *addr = cpu_to_le32(recon_state->nr_caps); 4351 } else { 4352 /* currently encoding relams */ 4353 *(addr + 1) = cpu_to_le32(recon_state->nr_realms); 4354 } 4355 kunmap_atomic(addr); 4356 4357 reply->hdr.version = cpu_to_le16(5); 4358 reply->hdr.compat_version = cpu_to_le16(4); 4359 4360 reply->hdr.data_len = cpu_to_le32(recon_state->pagelist->length); 4361 ceph_msg_data_add_pagelist(reply, recon_state->pagelist); 4362 4363 ceph_con_send(&recon_state->session->s_con, reply); 4364 ceph_pagelist_release(recon_state->pagelist); 4365 4366 recon_state->pagelist = _pagelist; 4367 recon_state->nr_caps = 0; 4368 recon_state->nr_realms = 0; 4369 recon_state->msg_version = 5; 4370 return 0; 4371 fail: 4372 ceph_msg_put(reply); 4373 fail_msg: 4374 ceph_pagelist_release(_pagelist); 4375 return err; 4376 } 4377 4378 static struct dentry* d_find_primary(struct inode *inode) 4379 { 4380 struct dentry *alias, *dn = NULL; 4381 4382 if (hlist_empty(&inode->i_dentry)) 4383 return NULL; 4384 4385 spin_lock(&inode->i_lock); 4386 if 
(hlist_empty(&inode->i_dentry)) 4387 goto out_unlock; 4388 4389 if (S_ISDIR(inode->i_mode)) { 4390 alias = hlist_entry(inode->i_dentry.first, struct dentry, d_u.d_alias); 4391 if (!IS_ROOT(alias)) 4392 dn = dget(alias); 4393 goto out_unlock; 4394 } 4395 4396 hlist_for_each_entry(alias, &inode->i_dentry, d_u.d_alias) { 4397 spin_lock(&alias->d_lock); 4398 if (!d_unhashed(alias) && 4399 (ceph_dentry(alias)->flags & CEPH_DENTRY_PRIMARY_LINK)) { 4400 dn = dget_dlock(alias); 4401 } 4402 spin_unlock(&alias->d_lock); 4403 if (dn) 4404 break; 4405 } 4406 out_unlock: 4407 spin_unlock(&inode->i_lock); 4408 return dn; 4409 } 4410 4411 /* 4412 * Encode information about a cap for a reconnect with the MDS. 4413 */ 4414 static int reconnect_caps_cb(struct inode *inode, int mds, void *arg) 4415 { 4416 struct ceph_mds_client *mdsc = ceph_sb_to_mdsc(inode->i_sb); 4417 struct ceph_client *cl = ceph_inode_to_client(inode); 4418 union { 4419 struct ceph_mds_cap_reconnect v2; 4420 struct ceph_mds_cap_reconnect_v1 v1; 4421 } rec; 4422 struct ceph_inode_info *ci = ceph_inode(inode); 4423 struct ceph_reconnect_state *recon_state = arg; 4424 struct ceph_pagelist *pagelist = recon_state->pagelist; 4425 struct dentry *dentry; 4426 struct ceph_cap *cap; 4427 char *path; 4428 int pathlen = 0, err; 4429 u64 pathbase; 4430 u64 snap_follows; 4431 4432 dentry = d_find_primary(inode); 4433 if (dentry) { 4434 /* set pathbase to parent dir when msg_version >= 2 */ 4435 path = ceph_mdsc_build_path(mdsc, dentry, &pathlen, &pathbase, 4436 recon_state->msg_version >= 2); 4437 dput(dentry); 4438 if (IS_ERR(path)) { 4439 err = PTR_ERR(path); 4440 goto out_err; 4441 } 4442 } else { 4443 path = NULL; 4444 pathbase = 0; 4445 } 4446 4447 spin_lock(&ci->i_ceph_lock); 4448 cap = __get_cap_for_mds(ci, mds); 4449 if (!cap) { 4450 spin_unlock(&ci->i_ceph_lock); 4451 err = 0; 4452 goto out_err; 4453 } 4454 doutc(cl, " adding %p ino %llx.%llx cap %p %lld %s\n", inode, 4455 ceph_vinop(inode), cap, cap->cap_id, 4456 ceph_cap_string(cap->issued)); 4457 4458 cap->seq = 0; /* reset cap seq */ 4459 cap->issue_seq = 0; /* and issue_seq */ 4460 cap->mseq = 0; /* and migrate_seq */ 4461 cap->cap_gen = atomic_read(&cap->session->s_cap_gen); 4462 4463 /* These are lost when the session goes away */ 4464 if (S_ISDIR(inode->i_mode)) { 4465 if (cap->issued & CEPH_CAP_DIR_CREATE) { 4466 ceph_put_string(rcu_dereference_raw(ci->i_cached_layout.pool_ns)); 4467 memset(&ci->i_cached_layout, 0, sizeof(ci->i_cached_layout)); 4468 } 4469 cap->issued &= ~CEPH_CAP_ANY_DIR_OPS; 4470 } 4471 4472 if (recon_state->msg_version >= 2) { 4473 rec.v2.cap_id = cpu_to_le64(cap->cap_id); 4474 rec.v2.wanted = cpu_to_le32(__ceph_caps_wanted(ci)); 4475 rec.v2.issued = cpu_to_le32(cap->issued); 4476 rec.v2.snaprealm = cpu_to_le64(ci->i_snap_realm->ino); 4477 rec.v2.pathbase = cpu_to_le64(pathbase); 4478 rec.v2.flock_len = (__force __le32) 4479 ((ci->i_ceph_flags & CEPH_I_ERROR_FILELOCK) ? 
0 : 1); 4480 } else { 4481 struct timespec64 ts; 4482 4483 rec.v1.cap_id = cpu_to_le64(cap->cap_id); 4484 rec.v1.wanted = cpu_to_le32(__ceph_caps_wanted(ci)); 4485 rec.v1.issued = cpu_to_le32(cap->issued); 4486 rec.v1.size = cpu_to_le64(i_size_read(inode)); 4487 ts = inode_get_mtime(inode); 4488 ceph_encode_timespec64(&rec.v1.mtime, &ts); 4489 ts = inode_get_atime(inode); 4490 ceph_encode_timespec64(&rec.v1.atime, &ts); 4491 rec.v1.snaprealm = cpu_to_le64(ci->i_snap_realm->ino); 4492 rec.v1.pathbase = cpu_to_le64(pathbase); 4493 } 4494 4495 if (list_empty(&ci->i_cap_snaps)) { 4496 snap_follows = ci->i_head_snapc ? ci->i_head_snapc->seq : 0; 4497 } else { 4498 struct ceph_cap_snap *capsnap = 4499 list_first_entry(&ci->i_cap_snaps, 4500 struct ceph_cap_snap, ci_item); 4501 snap_follows = capsnap->follows; 4502 } 4503 spin_unlock(&ci->i_ceph_lock); 4504 4505 if (recon_state->msg_version >= 2) { 4506 int num_fcntl_locks, num_flock_locks; 4507 struct ceph_filelock *flocks = NULL; 4508 size_t struct_len, total_len = sizeof(u64); 4509 u8 struct_v = 0; 4510 4511 encode_again: 4512 if (rec.v2.flock_len) { 4513 ceph_count_locks(inode, &num_fcntl_locks, &num_flock_locks); 4514 } else { 4515 num_fcntl_locks = 0; 4516 num_flock_locks = 0; 4517 } 4518 if (num_fcntl_locks + num_flock_locks > 0) { 4519 flocks = kmalloc_array(num_fcntl_locks + num_flock_locks, 4520 sizeof(struct ceph_filelock), 4521 GFP_NOFS); 4522 if (!flocks) { 4523 err = -ENOMEM; 4524 goto out_err; 4525 } 4526 err = ceph_encode_locks_to_buffer(inode, flocks, 4527 num_fcntl_locks, 4528 num_flock_locks); 4529 if (err) { 4530 kfree(flocks); 4531 flocks = NULL; 4532 if (err == -ENOSPC) 4533 goto encode_again; 4534 goto out_err; 4535 } 4536 } else { 4537 kfree(flocks); 4538 flocks = NULL; 4539 } 4540 4541 if (recon_state->msg_version >= 3) { 4542 /* version, compat_version and struct_len */ 4543 total_len += 2 * sizeof(u8) + sizeof(u32); 4544 struct_v = 2; 4545 } 4546 /* 4547 * number of encoded locks is stable, so copy to pagelist 4548 */ 4549 struct_len = 2 * sizeof(u32) + 4550 (num_fcntl_locks + num_flock_locks) * 4551 sizeof(struct ceph_filelock); 4552 rec.v2.flock_len = cpu_to_le32(struct_len); 4553 4554 struct_len += sizeof(u32) + pathlen + sizeof(rec.v2); 4555 4556 if (struct_v >= 2) 4557 struct_len += sizeof(u64); /* snap_follows */ 4558 4559 total_len += struct_len; 4560 4561 if (pagelist->length + total_len > RECONNECT_MAX_SIZE) { 4562 err = send_reconnect_partial(recon_state); 4563 if (err) 4564 goto out_freeflocks; 4565 pagelist = recon_state->pagelist; 4566 } 4567 4568 err = ceph_pagelist_reserve(pagelist, total_len); 4569 if (err) 4570 goto out_freeflocks; 4571 4572 ceph_pagelist_encode_64(pagelist, ceph_ino(inode)); 4573 if (recon_state->msg_version >= 3) { 4574 ceph_pagelist_encode_8(pagelist, struct_v); 4575 ceph_pagelist_encode_8(pagelist, 1); 4576 ceph_pagelist_encode_32(pagelist, struct_len); 4577 } 4578 ceph_pagelist_encode_string(pagelist, path, pathlen); 4579 ceph_pagelist_append(pagelist, &rec, sizeof(rec.v2)); 4580 ceph_locks_to_pagelist(flocks, pagelist, 4581 num_fcntl_locks, num_flock_locks); 4582 if (struct_v >= 2) 4583 ceph_pagelist_encode_64(pagelist, snap_follows); 4584 out_freeflocks: 4585 kfree(flocks); 4586 } else { 4587 err = ceph_pagelist_reserve(pagelist, 4588 sizeof(u64) + sizeof(u32) + 4589 pathlen + sizeof(rec.v1)); 4590 if (err) 4591 goto out_err; 4592 4593 ceph_pagelist_encode_64(pagelist, ceph_ino(inode)); 4594 ceph_pagelist_encode_string(pagelist, path, pathlen); 4595 ceph_pagelist_append(pagelist, 
&rec, sizeof(rec.v1)); 4596 } 4597 4598 out_err: 4599 ceph_mdsc_free_path(path, pathlen); 4600 if (!err) 4601 recon_state->nr_caps++; 4602 return err; 4603 } 4604 4605 static int encode_snap_realms(struct ceph_mds_client *mdsc, 4606 struct ceph_reconnect_state *recon_state) 4607 { 4608 struct rb_node *p; 4609 struct ceph_pagelist *pagelist = recon_state->pagelist; 4610 struct ceph_client *cl = mdsc->fsc->client; 4611 int err = 0; 4612 4613 if (recon_state->msg_version >= 4) { 4614 err = ceph_pagelist_encode_32(pagelist, mdsc->num_snap_realms); 4615 if (err < 0) 4616 goto fail; 4617 } 4618 4619 /* 4620 * snaprealms. we provide mds with the ino, seq (version), and 4621 * parent for all of our realms. If the mds has any newer info, 4622 * it will tell us. 4623 */ 4624 for (p = rb_first(&mdsc->snap_realms); p; p = rb_next(p)) { 4625 struct ceph_snap_realm *realm = 4626 rb_entry(p, struct ceph_snap_realm, node); 4627 struct ceph_mds_snaprealm_reconnect sr_rec; 4628 4629 if (recon_state->msg_version >= 4) { 4630 size_t need = sizeof(u8) * 2 + sizeof(u32) + 4631 sizeof(sr_rec); 4632 4633 if (pagelist->length + need > RECONNECT_MAX_SIZE) { 4634 err = send_reconnect_partial(recon_state); 4635 if (err) 4636 goto fail; 4637 pagelist = recon_state->pagelist; 4638 } 4639 4640 err = ceph_pagelist_reserve(pagelist, need); 4641 if (err) 4642 goto fail; 4643 4644 ceph_pagelist_encode_8(pagelist, 1); 4645 ceph_pagelist_encode_8(pagelist, 1); 4646 ceph_pagelist_encode_32(pagelist, sizeof(sr_rec)); 4647 } 4648 4649 doutc(cl, " adding snap realm %llx seq %lld parent %llx\n", 4650 realm->ino, realm->seq, realm->parent_ino); 4651 sr_rec.ino = cpu_to_le64(realm->ino); 4652 sr_rec.seq = cpu_to_le64(realm->seq); 4653 sr_rec.parent = cpu_to_le64(realm->parent_ino); 4654 4655 err = ceph_pagelist_append(pagelist, &sr_rec, sizeof(sr_rec)); 4656 if (err) 4657 goto fail; 4658 4659 recon_state->nr_realms++; 4660 } 4661 fail: 4662 return err; 4663 } 4664 4665 4666 /* 4667 * If an MDS fails and recovers, clients need to reconnect in order to 4668 * reestablish shared state. This includes all caps issued through 4669 * this session _and_ the snap_realm hierarchy. Because it's not 4670 * clear which snap realms the mds cares about, we send everything we 4671 * know about.. that ensures we'll then get any new info the 4672 * recovering MDS might have. 4673 * 4674 * This is a relatively heavyweight operation, but it's rare. 
4675 */ 4676 static void send_mds_reconnect(struct ceph_mds_client *mdsc, 4677 struct ceph_mds_session *session) 4678 { 4679 struct ceph_client *cl = mdsc->fsc->client; 4680 struct ceph_msg *reply; 4681 int mds = session->s_mds; 4682 int err = -ENOMEM; 4683 struct ceph_reconnect_state recon_state = { 4684 .session = session, 4685 }; 4686 LIST_HEAD(dispose); 4687 4688 pr_info_client(cl, "mds%d reconnect start\n", mds); 4689 4690 recon_state.pagelist = ceph_pagelist_alloc(GFP_NOFS); 4691 if (!recon_state.pagelist) 4692 goto fail_nopagelist; 4693 4694 reply = ceph_msg_new2(CEPH_MSG_CLIENT_RECONNECT, 0, 1, GFP_NOFS, false); 4695 if (!reply) 4696 goto fail_nomsg; 4697 4698 xa_destroy(&session->s_delegated_inos); 4699 4700 mutex_lock(&session->s_mutex); 4701 session->s_state = CEPH_MDS_SESSION_RECONNECTING; 4702 session->s_seq = 0; 4703 4704 doutc(cl, "session %p state %s\n", session, 4705 ceph_session_state_name(session->s_state)); 4706 4707 atomic_inc(&session->s_cap_gen); 4708 4709 spin_lock(&session->s_cap_lock); 4710 /* don't know if session is readonly */ 4711 session->s_readonly = 0; 4712 /* 4713 * notify __ceph_remove_cap() that we are composing cap reconnect. 4714 * If a cap get released before being added to the cap reconnect, 4715 * __ceph_remove_cap() should skip queuing cap release. 4716 */ 4717 session->s_cap_reconnect = 1; 4718 /* drop old cap expires; we're about to reestablish that state */ 4719 detach_cap_releases(session, &dispose); 4720 spin_unlock(&session->s_cap_lock); 4721 dispose_cap_releases(mdsc, &dispose); 4722 4723 /* trim unused caps to reduce MDS's cache rejoin time */ 4724 if (mdsc->fsc->sb->s_root) 4725 shrink_dcache_parent(mdsc->fsc->sb->s_root); 4726 4727 ceph_con_close(&session->s_con); 4728 ceph_con_open(&session->s_con, 4729 CEPH_ENTITY_TYPE_MDS, mds, 4730 ceph_mdsmap_get_addr(mdsc->mdsmap, mds)); 4731 4732 /* replay unsafe requests */ 4733 replay_unsafe_requests(mdsc, session); 4734 4735 ceph_early_kick_flushing_caps(mdsc, session); 4736 4737 down_read(&mdsc->snap_rwsem); 4738 4739 /* placeholder for nr_caps */ 4740 err = ceph_pagelist_encode_32(recon_state.pagelist, 0); 4741 if (err) 4742 goto fail; 4743 4744 if (test_bit(CEPHFS_FEATURE_MULTI_RECONNECT, &session->s_features)) { 4745 recon_state.msg_version = 3; 4746 recon_state.allow_multi = true; 4747 } else if (session->s_con.peer_features & CEPH_FEATURE_MDSENC) { 4748 recon_state.msg_version = 3; 4749 } else { 4750 recon_state.msg_version = 2; 4751 } 4752 /* trsaverse this session's caps */ 4753 err = ceph_iterate_session_caps(session, reconnect_caps_cb, &recon_state); 4754 4755 spin_lock(&session->s_cap_lock); 4756 session->s_cap_reconnect = 0; 4757 spin_unlock(&session->s_cap_lock); 4758 4759 if (err < 0) 4760 goto fail; 4761 4762 /* check if all realms can be encoded into current message */ 4763 if (mdsc->num_snap_realms) { 4764 size_t total_len = 4765 recon_state.pagelist->length + 4766 mdsc->num_snap_realms * 4767 sizeof(struct ceph_mds_snaprealm_reconnect); 4768 if (recon_state.msg_version >= 4) { 4769 /* number of realms */ 4770 total_len += sizeof(u32); 4771 /* version, compat_version and struct_len */ 4772 total_len += mdsc->num_snap_realms * 4773 (2 * sizeof(u8) + sizeof(u32)); 4774 } 4775 if (total_len > RECONNECT_MAX_SIZE) { 4776 if (!recon_state.allow_multi) { 4777 err = -ENOSPC; 4778 goto fail; 4779 } 4780 if (recon_state.nr_caps) { 4781 err = send_reconnect_partial(&recon_state); 4782 if (err) 4783 goto fail; 4784 } 4785 recon_state.msg_version = 5; 4786 } 4787 } 4788 4789 err = 
encode_snap_realms(mdsc, &recon_state); 4790 if (err < 0) 4791 goto fail; 4792 4793 if (recon_state.msg_version >= 5) { 4794 err = ceph_pagelist_encode_8(recon_state.pagelist, 0); 4795 if (err < 0) 4796 goto fail; 4797 } 4798 4799 if (recon_state.nr_caps || recon_state.nr_realms) { 4800 struct page *page = 4801 list_first_entry(&recon_state.pagelist->head, 4802 struct page, lru); 4803 __le32 *addr = kmap_atomic(page); 4804 if (recon_state.nr_caps) { 4805 WARN_ON(recon_state.nr_realms != mdsc->num_snap_realms); 4806 *addr = cpu_to_le32(recon_state.nr_caps); 4807 } else if (recon_state.msg_version >= 4) { 4808 *(addr + 1) = cpu_to_le32(recon_state.nr_realms); 4809 } 4810 kunmap_atomic(addr); 4811 } 4812 4813 reply->hdr.version = cpu_to_le16(recon_state.msg_version); 4814 if (recon_state.msg_version >= 4) 4815 reply->hdr.compat_version = cpu_to_le16(4); 4816 4817 reply->hdr.data_len = cpu_to_le32(recon_state.pagelist->length); 4818 ceph_msg_data_add_pagelist(reply, recon_state.pagelist); 4819 4820 ceph_con_send(&session->s_con, reply); 4821 4822 mutex_unlock(&session->s_mutex); 4823 4824 mutex_lock(&mdsc->mutex); 4825 __wake_requests(mdsc, &session->s_waiting); 4826 mutex_unlock(&mdsc->mutex); 4827 4828 up_read(&mdsc->snap_rwsem); 4829 ceph_pagelist_release(recon_state.pagelist); 4830 return; 4831 4832 fail: 4833 ceph_msg_put(reply); 4834 up_read(&mdsc->snap_rwsem); 4835 mutex_unlock(&session->s_mutex); 4836 fail_nomsg: 4837 ceph_pagelist_release(recon_state.pagelist); 4838 fail_nopagelist: 4839 pr_err_client(cl, "error %d preparing reconnect for mds%d\n", 4840 err, mds); 4841 return; 4842 } 4843 4844 4845 /* 4846 * compare old and new mdsmaps, kicking requests 4847 * and closing out old connections as necessary 4848 * 4849 * called under mdsc->mutex. 4850 */ 4851 static void check_new_map(struct ceph_mds_client *mdsc, 4852 struct ceph_mdsmap *newmap, 4853 struct ceph_mdsmap *oldmap) 4854 { 4855 int i, j, err; 4856 int oldstate, newstate; 4857 struct ceph_mds_session *s; 4858 unsigned long targets[DIV_ROUND_UP(CEPH_MAX_MDS, sizeof(unsigned long))] = {0}; 4859 struct ceph_client *cl = mdsc->fsc->client; 4860 4861 doutc(cl, "new %u old %u\n", newmap->m_epoch, oldmap->m_epoch); 4862 4863 if (newmap->m_info) { 4864 for (i = 0; i < newmap->possible_max_rank; i++) { 4865 for (j = 0; j < newmap->m_info[i].num_export_targets; j++) 4866 set_bit(newmap->m_info[i].export_targets[j], targets); 4867 } 4868 } 4869 4870 for (i = 0; i < oldmap->possible_max_rank && i < mdsc->max_sessions; i++) { 4871 if (!mdsc->sessions[i]) 4872 continue; 4873 s = mdsc->sessions[i]; 4874 oldstate = ceph_mdsmap_get_state(oldmap, i); 4875 newstate = ceph_mdsmap_get_state(newmap, i); 4876 4877 doutc(cl, "mds%d state %s%s -> %s%s (session %s)\n", 4878 i, ceph_mds_state_name(oldstate), 4879 ceph_mdsmap_is_laggy(oldmap, i) ? " (laggy)" : "", 4880 ceph_mds_state_name(newstate), 4881 ceph_mdsmap_is_laggy(newmap, i) ? 
" (laggy)" : "", 4882 ceph_session_state_name(s->s_state)); 4883 4884 if (i >= newmap->possible_max_rank) { 4885 /* force close session for stopped mds */ 4886 ceph_get_mds_session(s); 4887 __unregister_session(mdsc, s); 4888 __wake_requests(mdsc, &s->s_waiting); 4889 mutex_unlock(&mdsc->mutex); 4890 4891 mutex_lock(&s->s_mutex); 4892 cleanup_session_requests(mdsc, s); 4893 remove_session_caps(s); 4894 mutex_unlock(&s->s_mutex); 4895 4896 ceph_put_mds_session(s); 4897 4898 mutex_lock(&mdsc->mutex); 4899 kick_requests(mdsc, i); 4900 continue; 4901 } 4902 4903 if (memcmp(ceph_mdsmap_get_addr(oldmap, i), 4904 ceph_mdsmap_get_addr(newmap, i), 4905 sizeof(struct ceph_entity_addr))) { 4906 /* just close it */ 4907 mutex_unlock(&mdsc->mutex); 4908 mutex_lock(&s->s_mutex); 4909 mutex_lock(&mdsc->mutex); 4910 ceph_con_close(&s->s_con); 4911 mutex_unlock(&s->s_mutex); 4912 s->s_state = CEPH_MDS_SESSION_RESTARTING; 4913 } else if (oldstate == newstate) { 4914 continue; /* nothing new with this mds */ 4915 } 4916 4917 /* 4918 * send reconnect? 4919 */ 4920 if (s->s_state == CEPH_MDS_SESSION_RESTARTING && 4921 newstate >= CEPH_MDS_STATE_RECONNECT) { 4922 mutex_unlock(&mdsc->mutex); 4923 clear_bit(i, targets); 4924 send_mds_reconnect(mdsc, s); 4925 mutex_lock(&mdsc->mutex); 4926 } 4927 4928 /* 4929 * kick request on any mds that has gone active. 4930 */ 4931 if (oldstate < CEPH_MDS_STATE_ACTIVE && 4932 newstate >= CEPH_MDS_STATE_ACTIVE) { 4933 if (oldstate != CEPH_MDS_STATE_CREATING && 4934 oldstate != CEPH_MDS_STATE_STARTING) 4935 pr_info_client(cl, "mds%d recovery completed\n", 4936 s->s_mds); 4937 kick_requests(mdsc, i); 4938 mutex_unlock(&mdsc->mutex); 4939 mutex_lock(&s->s_mutex); 4940 mutex_lock(&mdsc->mutex); 4941 ceph_kick_flushing_caps(mdsc, s); 4942 mutex_unlock(&s->s_mutex); 4943 wake_up_session_caps(s, RECONNECT); 4944 } 4945 } 4946 4947 /* 4948 * Only open and reconnect sessions that don't exist yet. 4949 */ 4950 for (i = 0; i < newmap->possible_max_rank; i++) { 4951 /* 4952 * In case the import MDS is crashed just after 4953 * the EImportStart journal is flushed, so when 4954 * a standby MDS takes over it and is replaying 4955 * the EImportStart journal the new MDS daemon 4956 * will wait the client to reconnect it, but the 4957 * client may never register/open the session yet. 4958 * 4959 * Will try to reconnect that MDS daemon if the 4960 * rank number is in the export targets array and 4961 * is the up:reconnect state. 4962 */ 4963 newstate = ceph_mdsmap_get_state(newmap, i); 4964 if (!test_bit(i, targets) || newstate != CEPH_MDS_STATE_RECONNECT) 4965 continue; 4966 4967 /* 4968 * The session maybe registered and opened by some 4969 * requests which were choosing random MDSes during 4970 * the mdsc->mutex's unlock/lock gap below in rare 4971 * case. But the related MDS daemon will just queue 4972 * that requests and be still waiting for the client's 4973 * reconnection request in up:reconnect state. 
4974 */ 4975 s = __ceph_lookup_mds_session(mdsc, i); 4976 if (likely(!s)) { 4977 s = __open_export_target_session(mdsc, i); 4978 if (IS_ERR(s)) { 4979 err = PTR_ERR(s); 4980 pr_err_client(cl, 4981 "failed to open export target session, err %d\n", 4982 err); 4983 continue; 4984 } 4985 } 4986 doutc(cl, "send reconnect to export target mds.%d\n", i); 4987 mutex_unlock(&mdsc->mutex); 4988 send_mds_reconnect(mdsc, s); 4989 ceph_put_mds_session(s); 4990 mutex_lock(&mdsc->mutex); 4991 } 4992 4993 for (i = 0; i < newmap->possible_max_rank && i < mdsc->max_sessions; i++) { 4994 s = mdsc->sessions[i]; 4995 if (!s) 4996 continue; 4997 if (!ceph_mdsmap_is_laggy(newmap, i)) 4998 continue; 4999 if (s->s_state == CEPH_MDS_SESSION_OPEN || 5000 s->s_state == CEPH_MDS_SESSION_HUNG || 5001 s->s_state == CEPH_MDS_SESSION_CLOSING) { 5002 doutc(cl, " connecting to export targets of laggy mds%d\n", i); 5003 __open_export_target_sessions(mdsc, s); 5004 } 5005 } 5006 } 5007 5008 5009 5010 /* 5011 * leases 5012 */ 5013 5014 /* 5015 * caller must hold session s_mutex, dentry->d_lock 5016 */ 5017 void __ceph_mdsc_drop_dentry_lease(struct dentry *dentry) 5018 { 5019 struct ceph_dentry_info *di = ceph_dentry(dentry); 5020 5021 ceph_put_mds_session(di->lease_session); 5022 di->lease_session = NULL; 5023 } 5024 5025 static void handle_lease(struct ceph_mds_client *mdsc, 5026 struct ceph_mds_session *session, 5027 struct ceph_msg *msg) 5028 { 5029 struct ceph_client *cl = mdsc->fsc->client; 5030 struct super_block *sb = mdsc->fsc->sb; 5031 struct inode *inode; 5032 struct dentry *parent, *dentry; 5033 struct ceph_dentry_info *di; 5034 int mds = session->s_mds; 5035 struct ceph_mds_lease *h = msg->front.iov_base; 5036 u32 seq; 5037 struct ceph_vino vino; 5038 struct qstr dname; 5039 int release = 0; 5040 5041 doutc(cl, "from mds%d\n", mds); 5042 5043 if (!ceph_inc_mds_stopping_blocker(mdsc, session)) 5044 return; 5045 5046 /* decode */ 5047 if (msg->front.iov_len < sizeof(*h) + sizeof(u32)) 5048 goto bad; 5049 vino.ino = le64_to_cpu(h->ino); 5050 vino.snap = CEPH_NOSNAP; 5051 seq = le32_to_cpu(h->seq); 5052 dname.len = get_unaligned_le32(h + 1); 5053 if (msg->front.iov_len < sizeof(*h) + sizeof(u32) + dname.len) 5054 goto bad; 5055 dname.name = (void *)(h + 1) + sizeof(u32); 5056 5057 /* lookup inode */ 5058 inode = ceph_find_inode(sb, vino); 5059 doutc(cl, "%s, ino %llx %p %.*s\n", ceph_lease_op_name(h->action), 5060 vino.ino, inode, dname.len, dname.name); 5061 5062 mutex_lock(&session->s_mutex); 5063 if (!inode) { 5064 doutc(cl, "no inode %llx\n", vino.ino); 5065 goto release; 5066 } 5067 5068 /* dentry */ 5069 parent = d_find_alias(inode); 5070 if (!parent) { 5071 doutc(cl, "no parent dentry on inode %p\n", inode); 5072 WARN_ON(1); 5073 goto release; /* hrm... 
*/ 5074 } 5075 dname.hash = full_name_hash(parent, dname.name, dname.len); 5076 dentry = d_lookup(parent, &dname); 5077 dput(parent); 5078 if (!dentry) 5079 goto release; 5080 5081 spin_lock(&dentry->d_lock); 5082 di = ceph_dentry(dentry); 5083 switch (h->action) { 5084 case CEPH_MDS_LEASE_REVOKE: 5085 if (di->lease_session == session) { 5086 if (ceph_seq_cmp(di->lease_seq, seq) > 0) 5087 h->seq = cpu_to_le32(di->lease_seq); 5088 __ceph_mdsc_drop_dentry_lease(dentry); 5089 } 5090 release = 1; 5091 break; 5092 5093 case CEPH_MDS_LEASE_RENEW: 5094 if (di->lease_session == session && 5095 di->lease_gen == atomic_read(&session->s_cap_gen) && 5096 di->lease_renew_from && 5097 di->lease_renew_after == 0) { 5098 unsigned long duration = 5099 msecs_to_jiffies(le32_to_cpu(h->duration_ms)); 5100 5101 di->lease_seq = seq; 5102 di->time = di->lease_renew_from + duration; 5103 di->lease_renew_after = di->lease_renew_from + 5104 (duration >> 1); 5105 di->lease_renew_from = 0; 5106 } 5107 break; 5108 } 5109 spin_unlock(&dentry->d_lock); 5110 dput(dentry); 5111 5112 if (!release) 5113 goto out; 5114 5115 release: 5116 /* let's just reuse the same message */ 5117 h->action = CEPH_MDS_LEASE_REVOKE_ACK; 5118 ceph_msg_get(msg); 5119 ceph_con_send(&session->s_con, msg); 5120 5121 out: 5122 mutex_unlock(&session->s_mutex); 5123 iput(inode); 5124 5125 ceph_dec_mds_stopping_blocker(mdsc); 5126 return; 5127 5128 bad: 5129 ceph_dec_mds_stopping_blocker(mdsc); 5130 5131 pr_err_client(cl, "corrupt lease message\n"); 5132 ceph_msg_dump(msg); 5133 } 5134 5135 void ceph_mdsc_lease_send_msg(struct ceph_mds_session *session, 5136 struct dentry *dentry, char action, 5137 u32 seq) 5138 { 5139 struct ceph_client *cl = session->s_mdsc->fsc->client; 5140 struct ceph_msg *msg; 5141 struct ceph_mds_lease *lease; 5142 struct inode *dir; 5143 int len = sizeof(*lease) + sizeof(u32) + NAME_MAX; 5144 5145 doutc(cl, "identry %p %s to mds%d\n", dentry, ceph_lease_op_name(action), 5146 session->s_mds); 5147 5148 msg = ceph_msg_new(CEPH_MSG_CLIENT_LEASE, len, GFP_NOFS, false); 5149 if (!msg) 5150 return; 5151 lease = msg->front.iov_base; 5152 lease->action = action; 5153 lease->seq = cpu_to_le32(seq); 5154 5155 spin_lock(&dentry->d_lock); 5156 dir = d_inode(dentry->d_parent); 5157 lease->ino = cpu_to_le64(ceph_ino(dir)); 5158 lease->first = lease->last = cpu_to_le64(ceph_snap(dir)); 5159 5160 put_unaligned_le32(dentry->d_name.len, lease + 1); 5161 memcpy((void *)(lease + 1) + 4, 5162 dentry->d_name.name, dentry->d_name.len); 5163 spin_unlock(&dentry->d_lock); 5164 5165 ceph_con_send(&session->s_con, msg); 5166 } 5167 5168 /* 5169 * lock unlock the session, to wait ongoing session activities 5170 */ 5171 static void lock_unlock_session(struct ceph_mds_session *s) 5172 { 5173 mutex_lock(&s->s_mutex); 5174 mutex_unlock(&s->s_mutex); 5175 } 5176 5177 static void maybe_recover_session(struct ceph_mds_client *mdsc) 5178 { 5179 struct ceph_client *cl = mdsc->fsc->client; 5180 struct ceph_fs_client *fsc = mdsc->fsc; 5181 5182 if (!ceph_test_mount_opt(fsc, CLEANRECOVER)) 5183 return; 5184 5185 if (READ_ONCE(fsc->mount_state) != CEPH_MOUNT_MOUNTED) 5186 return; 5187 5188 if (!READ_ONCE(fsc->blocklisted)) 5189 return; 5190 5191 pr_info_client(cl, "auto reconnect after blocklisted\n"); 5192 ceph_force_reconnect(fsc->sb); 5193 } 5194 5195 bool check_session_state(struct ceph_mds_session *s) 5196 { 5197 struct ceph_client *cl = s->s_mdsc->fsc->client; 5198 5199 switch (s->s_state) { 5200 case CEPH_MDS_SESSION_OPEN: 5201 if (s->s_ttl && 
time_after(jiffies, s->s_ttl)) { 5202 s->s_state = CEPH_MDS_SESSION_HUNG; 5203 pr_info_client(cl, "mds%d hung\n", s->s_mds); 5204 } 5205 break; 5206 case CEPH_MDS_SESSION_CLOSING: 5207 case CEPH_MDS_SESSION_NEW: 5208 case CEPH_MDS_SESSION_RESTARTING: 5209 case CEPH_MDS_SESSION_CLOSED: 5210 case CEPH_MDS_SESSION_REJECTED: 5211 return false; 5212 } 5213 5214 return true; 5215 } 5216 5217 /* 5218 * If the sequence is incremented while we're waiting on a REQUEST_CLOSE reply, 5219 * then we need to retransmit that request. 5220 */ 5221 void inc_session_sequence(struct ceph_mds_session *s) 5222 { 5223 struct ceph_client *cl = s->s_mdsc->fsc->client; 5224 5225 lockdep_assert_held(&s->s_mutex); 5226 5227 s->s_seq++; 5228 5229 if (s->s_state == CEPH_MDS_SESSION_CLOSING) { 5230 int ret; 5231 5232 doutc(cl, "resending session close request for mds%d\n", s->s_mds); 5233 ret = request_close_session(s); 5234 if (ret < 0) 5235 pr_err_client(cl, "unable to close session to mds%d: %d\n", 5236 s->s_mds, ret); 5237 } 5238 } 5239 5240 /* 5241 * delayed work -- periodically trim expired leases, renew caps with mds. If 5242 * the @delay parameter is set to 0 or if it's more than 5 secs, the default 5243 * workqueue delay value of 5 secs will be used. 5244 */ 5245 static void schedule_delayed(struct ceph_mds_client *mdsc, unsigned long delay) 5246 { 5247 unsigned long max_delay = HZ * 5; 5248 5249 /* 5 secs default delay */ 5250 if (!delay || (delay > max_delay)) 5251 delay = max_delay; 5252 schedule_delayed_work(&mdsc->delayed_work, 5253 round_jiffies_relative(delay)); 5254 } 5255 5256 static void delayed_work(struct work_struct *work) 5257 { 5258 struct ceph_mds_client *mdsc = 5259 container_of(work, struct ceph_mds_client, delayed_work.work); 5260 unsigned long delay; 5261 int renew_interval; 5262 int renew_caps; 5263 int i; 5264 5265 doutc(mdsc->fsc->client, "mdsc delayed_work\n"); 5266 5267 if (mdsc->stopping >= CEPH_MDSC_STOPPING_FLUSHED) 5268 return; 5269 5270 mutex_lock(&mdsc->mutex); 5271 renew_interval = mdsc->mdsmap->m_session_timeout >> 2; 5272 renew_caps = time_after_eq(jiffies, HZ*renew_interval + 5273 mdsc->last_renew_caps); 5274 if (renew_caps) 5275 mdsc->last_renew_caps = jiffies; 5276 5277 for (i = 0; i < mdsc->max_sessions; i++) { 5278 struct ceph_mds_session *s = __ceph_lookup_mds_session(mdsc, i); 5279 if (!s) 5280 continue; 5281 5282 if (!check_session_state(s)) { 5283 ceph_put_mds_session(s); 5284 continue; 5285 } 5286 mutex_unlock(&mdsc->mutex); 5287 5288 mutex_lock(&s->s_mutex); 5289 if (renew_caps) 5290 send_renew_caps(mdsc, s); 5291 else 5292 ceph_con_keepalive(&s->s_con); 5293 if (s->s_state == CEPH_MDS_SESSION_OPEN || 5294 s->s_state == CEPH_MDS_SESSION_HUNG) 5295 ceph_send_cap_releases(mdsc, s); 5296 mutex_unlock(&s->s_mutex); 5297 ceph_put_mds_session(s); 5298 5299 mutex_lock(&mdsc->mutex); 5300 } 5301 mutex_unlock(&mdsc->mutex); 5302 5303 delay = ceph_check_delayed_caps(mdsc); 5304 5305 ceph_queue_cap_reclaim_work(mdsc); 5306 5307 ceph_trim_snapid_map(mdsc); 5308 5309 maybe_recover_session(mdsc); 5310 5311 schedule_delayed(mdsc, delay); 5312 } 5313 5314 int ceph_mdsc_init(struct ceph_fs_client *fsc) 5315 5316 { 5317 struct ceph_mds_client *mdsc; 5318 int err; 5319 5320 mdsc = kzalloc(sizeof(struct ceph_mds_client), GFP_NOFS); 5321 if (!mdsc) 5322 return -ENOMEM; 5323 mdsc->fsc = fsc; 5324 mutex_init(&mdsc->mutex); 5325 mdsc->mdsmap = kzalloc(sizeof(*mdsc->mdsmap), GFP_NOFS); 5326 if (!mdsc->mdsmap) { 5327 err = -ENOMEM; 5328 goto err_mdsc; 5329 } 5330 5331 
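	/*
	 * Set up the per-mount client state used throughout this file:
	 * umount/stopping completions and wait queues, the request and
	 * snap-realm trees, cap bookkeeping lists and locks, and the
	 * delayed work that periodically renews caps and trims expired
	 * leases.
	 */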
init_completion(&mdsc->safe_umount_waiters); 5332 spin_lock_init(&mdsc->stopping_lock); 5333 atomic_set(&mdsc->stopping_blockers, 0); 5334 init_completion(&mdsc->stopping_waiter); 5335 init_waitqueue_head(&mdsc->session_close_wq); 5336 INIT_LIST_HEAD(&mdsc->waiting_for_map); 5337 mdsc->quotarealms_inodes = RB_ROOT; 5338 mutex_init(&mdsc->quotarealms_inodes_mutex); 5339 init_rwsem(&mdsc->snap_rwsem); 5340 mdsc->snap_realms = RB_ROOT; 5341 INIT_LIST_HEAD(&mdsc->snap_empty); 5342 spin_lock_init(&mdsc->snap_empty_lock); 5343 mdsc->request_tree = RB_ROOT; 5344 INIT_DELAYED_WORK(&mdsc->delayed_work, delayed_work); 5345 mdsc->last_renew_caps = jiffies; 5346 INIT_LIST_HEAD(&mdsc->cap_delay_list); 5347 INIT_LIST_HEAD(&mdsc->cap_wait_list); 5348 spin_lock_init(&mdsc->cap_delay_lock); 5349 INIT_LIST_HEAD(&mdsc->snap_flush_list); 5350 spin_lock_init(&mdsc->snap_flush_lock); 5351 mdsc->last_cap_flush_tid = 1; 5352 INIT_LIST_HEAD(&mdsc->cap_flush_list); 5353 INIT_LIST_HEAD(&mdsc->cap_dirty_migrating); 5354 spin_lock_init(&mdsc->cap_dirty_lock); 5355 init_waitqueue_head(&mdsc->cap_flushing_wq); 5356 INIT_WORK(&mdsc->cap_reclaim_work, ceph_cap_reclaim_work); 5357 err = ceph_metric_init(&mdsc->metric); 5358 if (err) 5359 goto err_mdsmap; 5360 5361 spin_lock_init(&mdsc->dentry_list_lock); 5362 INIT_LIST_HEAD(&mdsc->dentry_leases); 5363 INIT_LIST_HEAD(&mdsc->dentry_dir_leases); 5364 5365 ceph_caps_init(mdsc); 5366 ceph_adjust_caps_max_min(mdsc, fsc->mount_options); 5367 5368 spin_lock_init(&mdsc->snapid_map_lock); 5369 mdsc->snapid_map_tree = RB_ROOT; 5370 INIT_LIST_HEAD(&mdsc->snapid_map_lru); 5371 5372 init_rwsem(&mdsc->pool_perm_rwsem); 5373 mdsc->pool_perm_tree = RB_ROOT; 5374 5375 strscpy(mdsc->nodename, utsname()->nodename, 5376 sizeof(mdsc->nodename)); 5377 5378 fsc->mdsc = mdsc; 5379 return 0; 5380 5381 err_mdsmap: 5382 kfree(mdsc->mdsmap); 5383 err_mdsc: 5384 kfree(mdsc); 5385 return err; 5386 } 5387 5388 /* 5389 * Wait for safe replies on open mds requests. If we time out, drop 5390 * all requests from the tree to avoid dangling dentry refs. 
5391 */ 5392 static void wait_requests(struct ceph_mds_client *mdsc) 5393 { 5394 struct ceph_client *cl = mdsc->fsc->client; 5395 struct ceph_options *opts = mdsc->fsc->client->options; 5396 struct ceph_mds_request *req; 5397 5398 mutex_lock(&mdsc->mutex); 5399 if (__get_oldest_req(mdsc)) { 5400 mutex_unlock(&mdsc->mutex); 5401 5402 doutc(cl, "waiting for requests\n"); 5403 wait_for_completion_timeout(&mdsc->safe_umount_waiters, 5404 ceph_timeout_jiffies(opts->mount_timeout)); 5405 5406 /* tear down remaining requests */ 5407 mutex_lock(&mdsc->mutex); 5408 while ((req = __get_oldest_req(mdsc))) { 5409 doutc(cl, "timed out on tid %llu\n", req->r_tid); 5410 list_del_init(&req->r_wait); 5411 __unregister_request(mdsc, req); 5412 } 5413 } 5414 mutex_unlock(&mdsc->mutex); 5415 doutc(cl, "done\n"); 5416 } 5417 5418 void send_flush_mdlog(struct ceph_mds_session *s) 5419 { 5420 struct ceph_client *cl = s->s_mdsc->fsc->client; 5421 struct ceph_msg *msg; 5422 5423 /* 5424 * Pre-luminous MDS crashes when it sees an unknown session request 5425 */ 5426 if (!CEPH_HAVE_FEATURE(s->s_con.peer_features, SERVER_LUMINOUS)) 5427 return; 5428 5429 mutex_lock(&s->s_mutex); 5430 doutc(cl, "request mdlog flush to mds%d (%s)s seq %lld\n", 5431 s->s_mds, ceph_session_state_name(s->s_state), s->s_seq); 5432 msg = ceph_create_session_msg(CEPH_SESSION_REQUEST_FLUSH_MDLOG, 5433 s->s_seq); 5434 if (!msg) { 5435 pr_err_client(cl, "failed to request mdlog flush to mds%d (%s) seq %lld\n", 5436 s->s_mds, ceph_session_state_name(s->s_state), s->s_seq); 5437 } else { 5438 ceph_con_send(&s->s_con, msg); 5439 } 5440 mutex_unlock(&s->s_mutex); 5441 } 5442 5443 /* 5444 * called before mount is ro, and before dentries are torn down. 5445 * (hmm, does this still race with new lookups?) 5446 */ 5447 void ceph_mdsc_pre_umount(struct ceph_mds_client *mdsc) 5448 { 5449 doutc(mdsc->fsc->client, "begin\n"); 5450 mdsc->stopping = CEPH_MDSC_STOPPING_BEGIN; 5451 5452 ceph_mdsc_iterate_sessions(mdsc, send_flush_mdlog, true); 5453 ceph_mdsc_iterate_sessions(mdsc, lock_unlock_session, false); 5454 ceph_flush_dirty_caps(mdsc); 5455 wait_requests(mdsc); 5456 5457 /* 5458 * wait for reply handlers to drop their request refs and 5459 * their inode/dcache refs 5460 */ 5461 ceph_msgr_flush(); 5462 5463 ceph_cleanup_quotarealms_inodes(mdsc); 5464 doutc(mdsc->fsc->client, "done\n"); 5465 } 5466 5467 /* 5468 * flush the mdlog and wait for all write mds requests to flush. 
5469 */ 5470 static void flush_mdlog_and_wait_mdsc_unsafe_requests(struct ceph_mds_client *mdsc, 5471 u64 want_tid) 5472 { 5473 struct ceph_client *cl = mdsc->fsc->client; 5474 struct ceph_mds_request *req = NULL, *nextreq; 5475 struct ceph_mds_session *last_session = NULL; 5476 struct rb_node *n; 5477 5478 mutex_lock(&mdsc->mutex); 5479 doutc(cl, "want %lld\n", want_tid); 5480 restart: 5481 req = __get_oldest_req(mdsc); 5482 while (req && req->r_tid <= want_tid) { 5483 /* find next request */ 5484 n = rb_next(&req->r_node); 5485 if (n) 5486 nextreq = rb_entry(n, struct ceph_mds_request, r_node); 5487 else 5488 nextreq = NULL; 5489 if (req->r_op != CEPH_MDS_OP_SETFILELOCK && 5490 (req->r_op & CEPH_MDS_OP_WRITE)) { 5491 struct ceph_mds_session *s = req->r_session; 5492 5493 if (!s) { 5494 req = nextreq; 5495 continue; 5496 } 5497 5498 /* write op */ 5499 ceph_mdsc_get_request(req); 5500 if (nextreq) 5501 ceph_mdsc_get_request(nextreq); 5502 s = ceph_get_mds_session(s); 5503 mutex_unlock(&mdsc->mutex); 5504 5505 /* send flush mdlog request to MDS */ 5506 if (last_session != s) { 5507 send_flush_mdlog(s); 5508 ceph_put_mds_session(last_session); 5509 last_session = s; 5510 } else { 5511 ceph_put_mds_session(s); 5512 } 5513 doutc(cl, "wait on %llu (want %llu)\n", 5514 req->r_tid, want_tid); 5515 wait_for_completion(&req->r_safe_completion); 5516 5517 mutex_lock(&mdsc->mutex); 5518 ceph_mdsc_put_request(req); 5519 if (!nextreq) 5520 break; /* next dne before, so we're done! */ 5521 if (RB_EMPTY_NODE(&nextreq->r_node)) { 5522 /* next request was removed from tree */ 5523 ceph_mdsc_put_request(nextreq); 5524 goto restart; 5525 } 5526 ceph_mdsc_put_request(nextreq); /* won't go away */ 5527 } 5528 req = nextreq; 5529 } 5530 mutex_unlock(&mdsc->mutex); 5531 ceph_put_mds_session(last_session); 5532 doutc(cl, "done\n"); 5533 } 5534 5535 void ceph_mdsc_sync(struct ceph_mds_client *mdsc) 5536 { 5537 struct ceph_client *cl = mdsc->fsc->client; 5538 u64 want_tid, want_flush; 5539 5540 if (READ_ONCE(mdsc->fsc->mount_state) >= CEPH_MOUNT_SHUTDOWN) 5541 return; 5542 5543 doutc(cl, "sync\n"); 5544 mutex_lock(&mdsc->mutex); 5545 want_tid = mdsc->last_tid; 5546 mutex_unlock(&mdsc->mutex); 5547 5548 ceph_flush_dirty_caps(mdsc); 5549 spin_lock(&mdsc->cap_dirty_lock); 5550 want_flush = mdsc->last_cap_flush_tid; 5551 if (!list_empty(&mdsc->cap_flush_list)) { 5552 struct ceph_cap_flush *cf = 5553 list_last_entry(&mdsc->cap_flush_list, 5554 struct ceph_cap_flush, g_list); 5555 cf->wake = true; 5556 } 5557 spin_unlock(&mdsc->cap_dirty_lock); 5558 5559 doutc(cl, "sync want tid %lld flush_seq %lld\n", want_tid, want_flush); 5560 5561 flush_mdlog_and_wait_mdsc_unsafe_requests(mdsc, want_tid); 5562 wait_caps_flush(mdsc, want_flush); 5563 } 5564 5565 /* 5566 * true if all sessions are closed, or we force unmount 5567 */ 5568 static bool done_closing_sessions(struct ceph_mds_client *mdsc, int skipped) 5569 { 5570 if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_SHUTDOWN) 5571 return true; 5572 return atomic_read(&mdsc->num_sessions) <= skipped; 5573 } 5574 5575 /* 5576 * called after sb is ro or when metadata corrupted. 
5577 */ 5578 void ceph_mdsc_close_sessions(struct ceph_mds_client *mdsc) 5579 { 5580 struct ceph_options *opts = mdsc->fsc->client->options; 5581 struct ceph_client *cl = mdsc->fsc->client; 5582 struct ceph_mds_session *session; 5583 int i; 5584 int skipped = 0; 5585 5586 doutc(cl, "begin\n"); 5587 5588 /* close sessions */ 5589 mutex_lock(&mdsc->mutex); 5590 for (i = 0; i < mdsc->max_sessions; i++) { 5591 session = __ceph_lookup_mds_session(mdsc, i); 5592 if (!session) 5593 continue; 5594 mutex_unlock(&mdsc->mutex); 5595 mutex_lock(&session->s_mutex); 5596 if (__close_session(mdsc, session) <= 0) 5597 skipped++; 5598 mutex_unlock(&session->s_mutex); 5599 ceph_put_mds_session(session); 5600 mutex_lock(&mdsc->mutex); 5601 } 5602 mutex_unlock(&mdsc->mutex); 5603 5604 doutc(cl, "waiting for sessions to close\n"); 5605 wait_event_timeout(mdsc->session_close_wq, 5606 done_closing_sessions(mdsc, skipped), 5607 ceph_timeout_jiffies(opts->mount_timeout)); 5608 5609 /* tear down remaining sessions */ 5610 mutex_lock(&mdsc->mutex); 5611 for (i = 0; i < mdsc->max_sessions; i++) { 5612 if (mdsc->sessions[i]) { 5613 session = ceph_get_mds_session(mdsc->sessions[i]); 5614 __unregister_session(mdsc, session); 5615 mutex_unlock(&mdsc->mutex); 5616 mutex_lock(&session->s_mutex); 5617 remove_session_caps(session); 5618 mutex_unlock(&session->s_mutex); 5619 ceph_put_mds_session(session); 5620 mutex_lock(&mdsc->mutex); 5621 } 5622 } 5623 WARN_ON(!list_empty(&mdsc->cap_delay_list)); 5624 mutex_unlock(&mdsc->mutex); 5625 5626 ceph_cleanup_snapid_map(mdsc); 5627 ceph_cleanup_global_and_empty_realms(mdsc); 5628 5629 cancel_work_sync(&mdsc->cap_reclaim_work); 5630 cancel_delayed_work_sync(&mdsc->delayed_work); /* cancel timer */ 5631 5632 doutc(cl, "done\n"); 5633 } 5634 5635 void ceph_mdsc_force_umount(struct ceph_mds_client *mdsc) 5636 { 5637 struct ceph_mds_session *session; 5638 int mds; 5639 5640 doutc(mdsc->fsc->client, "force umount\n"); 5641 5642 mutex_lock(&mdsc->mutex); 5643 for (mds = 0; mds < mdsc->max_sessions; mds++) { 5644 session = __ceph_lookup_mds_session(mdsc, mds); 5645 if (!session) 5646 continue; 5647 5648 if (session->s_state == CEPH_MDS_SESSION_REJECTED) 5649 __unregister_session(mdsc, session); 5650 __wake_requests(mdsc, &session->s_waiting); 5651 mutex_unlock(&mdsc->mutex); 5652 5653 mutex_lock(&session->s_mutex); 5654 __close_session(mdsc, session); 5655 if (session->s_state == CEPH_MDS_SESSION_CLOSING) { 5656 cleanup_session_requests(mdsc, session); 5657 remove_session_caps(session); 5658 } 5659 mutex_unlock(&session->s_mutex); 5660 ceph_put_mds_session(session); 5661 5662 mutex_lock(&mdsc->mutex); 5663 kick_requests(mdsc, mds); 5664 } 5665 __wake_requests(mdsc, &mdsc->waiting_for_map); 5666 mutex_unlock(&mdsc->mutex); 5667 } 5668 5669 static void ceph_mdsc_stop(struct ceph_mds_client *mdsc) 5670 { 5671 doutc(mdsc->fsc->client, "stop\n"); 5672 /* 5673 * Make sure the delayed work stopped before releasing 5674 * the resources. 5675 * 5676 * Because the cancel_delayed_work_sync() will only 5677 * guarantee that the work finishes executing. But the 5678 * delayed work will re-arm itself again after that. 
5679 */ 5680 flush_delayed_work(&mdsc->delayed_work); 5681 5682 if (mdsc->mdsmap) 5683 ceph_mdsmap_destroy(mdsc->mdsmap); 5684 kfree(mdsc->sessions); 5685 ceph_caps_finalize(mdsc); 5686 ceph_pool_perm_destroy(mdsc); 5687 } 5688 5689 void ceph_mdsc_destroy(struct ceph_fs_client *fsc) 5690 { 5691 struct ceph_mds_client *mdsc = fsc->mdsc; 5692 doutc(fsc->client, "%p\n", mdsc); 5693 5694 if (!mdsc) 5695 return; 5696 5697 /* flush out any connection work with references to us */ 5698 ceph_msgr_flush(); 5699 5700 ceph_mdsc_stop(mdsc); 5701 5702 ceph_metric_destroy(&mdsc->metric); 5703 5704 fsc->mdsc = NULL; 5705 kfree(mdsc); 5706 doutc(fsc->client, "%p done\n", mdsc); 5707 } 5708 5709 void ceph_mdsc_handle_fsmap(struct ceph_mds_client *mdsc, struct ceph_msg *msg) 5710 { 5711 struct ceph_fs_client *fsc = mdsc->fsc; 5712 struct ceph_client *cl = fsc->client; 5713 const char *mds_namespace = fsc->mount_options->mds_namespace; 5714 void *p = msg->front.iov_base; 5715 void *end = p + msg->front.iov_len; 5716 u32 epoch; 5717 u32 num_fs; 5718 u32 mount_fscid = (u32)-1; 5719 int err = -EINVAL; 5720 5721 ceph_decode_need(&p, end, sizeof(u32), bad); 5722 epoch = ceph_decode_32(&p); 5723 5724 doutc(cl, "epoch %u\n", epoch); 5725 5726 /* struct_v, struct_cv, map_len, epoch, legacy_client_fscid */ 5727 ceph_decode_skip_n(&p, end, 2 + sizeof(u32) * 3, bad); 5728 5729 ceph_decode_32_safe(&p, end, num_fs, bad); 5730 while (num_fs-- > 0) { 5731 void *info_p, *info_end; 5732 u32 info_len; 5733 u32 fscid, namelen; 5734 5735 ceph_decode_need(&p, end, 2 + sizeof(u32), bad); 5736 p += 2; // info_v, info_cv 5737 info_len = ceph_decode_32(&p); 5738 ceph_decode_need(&p, end, info_len, bad); 5739 info_p = p; 5740 info_end = p + info_len; 5741 p = info_end; 5742 5743 ceph_decode_need(&info_p, info_end, sizeof(u32) * 2, bad); 5744 fscid = ceph_decode_32(&info_p); 5745 namelen = ceph_decode_32(&info_p); 5746 ceph_decode_need(&info_p, info_end, namelen, bad); 5747 5748 if (mds_namespace && 5749 strlen(mds_namespace) == namelen && 5750 !strncmp(mds_namespace, (char *)info_p, namelen)) { 5751 mount_fscid = fscid; 5752 break; 5753 } 5754 } 5755 5756 ceph_monc_got_map(&fsc->client->monc, CEPH_SUB_FSMAP, epoch); 5757 if (mount_fscid != (u32)-1) { 5758 fsc->client->monc.fs_cluster_id = mount_fscid; 5759 ceph_monc_want_map(&fsc->client->monc, CEPH_SUB_MDSMAP, 5760 0, true); 5761 ceph_monc_renew_subs(&fsc->client->monc); 5762 } else { 5763 err = -ENOENT; 5764 goto err_out; 5765 } 5766 return; 5767 5768 bad: 5769 pr_err_client(cl, "error decoding fsmap %d. Shutting down mount.\n", 5770 err); 5771 ceph_umount_begin(mdsc->fsc->sb); 5772 ceph_msg_dump(msg); 5773 err_out: 5774 mutex_lock(&mdsc->mutex); 5775 mdsc->mdsmap_err = err; 5776 __wake_requests(mdsc, &mdsc->waiting_for_map); 5777 mutex_unlock(&mdsc->mutex); 5778 } 5779 5780 /* 5781 * handle mds map update. 
5782 */ 5783 void ceph_mdsc_handle_mdsmap(struct ceph_mds_client *mdsc, struct ceph_msg *msg) 5784 { 5785 struct ceph_client *cl = mdsc->fsc->client; 5786 u32 epoch; 5787 u32 maplen; 5788 void *p = msg->front.iov_base; 5789 void *end = p + msg->front.iov_len; 5790 struct ceph_mdsmap *newmap, *oldmap; 5791 struct ceph_fsid fsid; 5792 int err = -EINVAL; 5793 5794 ceph_decode_need(&p, end, sizeof(fsid)+2*sizeof(u32), bad); 5795 ceph_decode_copy(&p, &fsid, sizeof(fsid)); 5796 if (ceph_check_fsid(mdsc->fsc->client, &fsid) < 0) 5797 return; 5798 epoch = ceph_decode_32(&p); 5799 maplen = ceph_decode_32(&p); 5800 doutc(cl, "epoch %u len %d\n", epoch, (int)maplen); 5801 5802 /* do we need it? */ 5803 mutex_lock(&mdsc->mutex); 5804 if (mdsc->mdsmap && epoch <= mdsc->mdsmap->m_epoch) { 5805 doutc(cl, "epoch %u <= our %u\n", epoch, mdsc->mdsmap->m_epoch); 5806 mutex_unlock(&mdsc->mutex); 5807 return; 5808 } 5809 5810 newmap = ceph_mdsmap_decode(mdsc, &p, end, ceph_msgr2(mdsc->fsc->client)); 5811 if (IS_ERR(newmap)) { 5812 err = PTR_ERR(newmap); 5813 goto bad_unlock; 5814 } 5815 5816 /* swap into place */ 5817 if (mdsc->mdsmap) { 5818 oldmap = mdsc->mdsmap; 5819 mdsc->mdsmap = newmap; 5820 check_new_map(mdsc, newmap, oldmap); 5821 ceph_mdsmap_destroy(oldmap); 5822 } else { 5823 mdsc->mdsmap = newmap; /* first mds map */ 5824 } 5825 mdsc->fsc->max_file_size = min((loff_t)mdsc->mdsmap->m_max_file_size, 5826 MAX_LFS_FILESIZE); 5827 5828 __wake_requests(mdsc, &mdsc->waiting_for_map); 5829 ceph_monc_got_map(&mdsc->fsc->client->monc, CEPH_SUB_MDSMAP, 5830 mdsc->mdsmap->m_epoch); 5831 5832 mutex_unlock(&mdsc->mutex); 5833 schedule_delayed(mdsc, 0); 5834 return; 5835 5836 bad_unlock: 5837 mutex_unlock(&mdsc->mutex); 5838 bad: 5839 pr_err_client(cl, "error decoding mdsmap %d. Shutting down mount.\n", 5840 err); 5841 ceph_umount_begin(mdsc->fsc->sb); 5842 ceph_msg_dump(msg); 5843 return; 5844 } 5845 5846 static struct ceph_connection *mds_get_con(struct ceph_connection *con) 5847 { 5848 struct ceph_mds_session *s = con->private; 5849 5850 if (ceph_get_mds_session(s)) 5851 return con; 5852 return NULL; 5853 } 5854 5855 static void mds_put_con(struct ceph_connection *con) 5856 { 5857 struct ceph_mds_session *s = con->private; 5858 5859 ceph_put_mds_session(s); 5860 } 5861 5862 /* 5863 * if the client is unresponsive for long enough, the mds will kill 5864 * the session entirely. 

/*
 * if the client is unresponsive for long enough, the mds will kill
 * the session entirely.
 */
static void mds_peer_reset(struct ceph_connection *con)
{
	struct ceph_mds_session *s = con->private;
	struct ceph_mds_client *mdsc = s->s_mdsc;

	pr_warn_client(mdsc->fsc->client, "mds%d closed our session\n",
		       s->s_mds);
	if (READ_ONCE(mdsc->fsc->mount_state) != CEPH_MOUNT_FENCE_IO)
		send_mds_reconnect(mdsc, s);
}

static void mds_dispatch(struct ceph_connection *con, struct ceph_msg *msg)
{
	struct ceph_mds_session *s = con->private;
	struct ceph_mds_client *mdsc = s->s_mdsc;
	struct ceph_client *cl = mdsc->fsc->client;
	int type = le16_to_cpu(msg->hdr.type);

	mutex_lock(&mdsc->mutex);
	if (__verify_registered_session(mdsc, s) < 0) {
		mutex_unlock(&mdsc->mutex);
		goto out;
	}
	mutex_unlock(&mdsc->mutex);

	switch (type) {
	case CEPH_MSG_MDS_MAP:
		ceph_mdsc_handle_mdsmap(mdsc, msg);
		break;
	case CEPH_MSG_FS_MAP_USER:
		ceph_mdsc_handle_fsmap(mdsc, msg);
		break;
	case CEPH_MSG_CLIENT_SESSION:
		handle_session(s, msg);
		break;
	case CEPH_MSG_CLIENT_REPLY:
		handle_reply(s, msg);
		break;
	case CEPH_MSG_CLIENT_REQUEST_FORWARD:
		handle_forward(mdsc, s, msg);
		break;
	case CEPH_MSG_CLIENT_CAPS:
		ceph_handle_caps(s, msg);
		break;
	case CEPH_MSG_CLIENT_SNAP:
		ceph_handle_snap(mdsc, s, msg);
		break;
	case CEPH_MSG_CLIENT_LEASE:
		handle_lease(mdsc, s, msg);
		break;
	case CEPH_MSG_CLIENT_QUOTA:
		ceph_handle_quota(mdsc, s, msg);
		break;

	default:
		pr_err_client(cl, "received unknown message type %d %s\n",
			      type, ceph_msg_type_name(type));
	}
out:
	ceph_msg_put(msg);
}

/*
 * authentication
 */
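
/*
 * Two families of callbacks follow.  Roughly: mds_get_authorizer(),
 * mds_add_authorizer_challenge() and mds_verify_authorizer_reply()
 * serve the original (v1) messenger handshake, while
 * mds_get_auth_request(), mds_handle_auth_reply_more() and
 * mds_handle_auth_done() serve msgr2.  Both paths obtain their tickets
 * from the monitor client's auth handle (monc.auth).
 */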

/*
 * Note: returned pointer is the address of a structure that's
 * managed separately. Caller must *not* attempt to free it.
 */
static struct ceph_auth_handshake *
mds_get_authorizer(struct ceph_connection *con, int *proto, int force_new)
{
	struct ceph_mds_session *s = con->private;
	struct ceph_mds_client *mdsc = s->s_mdsc;
	struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth;
	struct ceph_auth_handshake *auth = &s->s_auth;
	int ret;

	ret = __ceph_auth_get_authorizer(ac, auth, CEPH_ENTITY_TYPE_MDS,
					 force_new, proto, NULL, NULL);
	if (ret)
		return ERR_PTR(ret);

	return auth;
}

static int mds_add_authorizer_challenge(struct ceph_connection *con,
				    void *challenge_buf, int challenge_buf_len)
{
	struct ceph_mds_session *s = con->private;
	struct ceph_mds_client *mdsc = s->s_mdsc;
	struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth;

	return ceph_auth_add_authorizer_challenge(ac, s->s_auth.authorizer,
					    challenge_buf, challenge_buf_len);
}

static int mds_verify_authorizer_reply(struct ceph_connection *con)
{
	struct ceph_mds_session *s = con->private;
	struct ceph_mds_client *mdsc = s->s_mdsc;
	struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth;
	struct ceph_auth_handshake *auth = &s->s_auth;

	return ceph_auth_verify_authorizer_reply(ac, auth->authorizer,
		auth->authorizer_reply_buf, auth->authorizer_reply_buf_len,
		NULL, NULL, NULL, NULL);
}

static int mds_invalidate_authorizer(struct ceph_connection *con)
{
	struct ceph_mds_session *s = con->private;
	struct ceph_mds_client *mdsc = s->s_mdsc;
	struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth;

	ceph_auth_invalidate_authorizer(ac, CEPH_ENTITY_TYPE_MDS);

	return ceph_monc_validate_auth(&mdsc->fsc->client->monc);
}

static int mds_get_auth_request(struct ceph_connection *con,
				void *buf, int *buf_len,
				void **authorizer, int *authorizer_len)
{
	struct ceph_mds_session *s = con->private;
	struct ceph_auth_client *ac = s->s_mdsc->fsc->client->monc.auth;
	struct ceph_auth_handshake *auth = &s->s_auth;
	int ret;

	ret = ceph_auth_get_authorizer(ac, auth, CEPH_ENTITY_TYPE_MDS,
				       buf, buf_len);
	if (ret)
		return ret;

	*authorizer = auth->authorizer_buf;
	*authorizer_len = auth->authorizer_buf_len;
	return 0;
}

static int mds_handle_auth_reply_more(struct ceph_connection *con,
				      void *reply, int reply_len,
				      void *buf, int *buf_len,
				      void **authorizer, int *authorizer_len)
{
	struct ceph_mds_session *s = con->private;
	struct ceph_auth_client *ac = s->s_mdsc->fsc->client->monc.auth;
	struct ceph_auth_handshake *auth = &s->s_auth;
	int ret;

	ret = ceph_auth_handle_svc_reply_more(ac, auth, reply, reply_len,
					      buf, buf_len);
	if (ret)
		return ret;

	*authorizer = auth->authorizer_buf;
	*authorizer_len = auth->authorizer_buf_len;
	return 0;
}

static int mds_handle_auth_done(struct ceph_connection *con,
				u64 global_id, void *reply, int reply_len,
				u8 *session_key, int *session_key_len,
				u8 *con_secret, int *con_secret_len)
{
	struct ceph_mds_session *s = con->private;
	struct ceph_auth_client *ac = s->s_mdsc->fsc->client->monc.auth;
	struct ceph_auth_handshake *auth = &s->s_auth;

	return ceph_auth_handle_svc_reply_done(ac, auth, reply, reply_len,
					       session_key, session_key_len,
					       con_secret, con_secret_len);
}
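
/*
 * The peer rejected our auth method or authorizer.  Give the auth
 * layer a chance to react (e.g. by revalidating our tickets with the
 * monitors); the connection attempt itself still fails with -EACCES
 * unless monitor revalidation returns an error of its own.
 */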
static int mds_handle_auth_bad_method(struct ceph_connection *con,
				      int used_proto, int result,
				      const int *allowed_protos, int proto_cnt,
				      const int *allowed_modes, int mode_cnt)
{
	struct ceph_mds_session *s = con->private;
	struct ceph_mon_client *monc = &s->s_mdsc->fsc->client->monc;
	int ret;

	if (ceph_auth_handle_bad_authorizer(monc->auth, CEPH_ENTITY_TYPE_MDS,
					    used_proto, result,
					    allowed_protos, proto_cnt,
					    allowed_modes, mode_cnt)) {
		ret = ceph_monc_validate_auth(monc);
		if (ret)
			return ret;
	}

	return -EACCES;
}

static struct ceph_msg *mds_alloc_msg(struct ceph_connection *con,
				struct ceph_msg_header *hdr, int *skip)
{
	struct ceph_msg *msg;
	int type = (int) le16_to_cpu(hdr->type);
	int front_len = (int) le32_to_cpu(hdr->front_len);

	if (con->in_msg)
		return con->in_msg;

	*skip = 0;
	msg = ceph_msg_new(type, front_len, GFP_NOFS, false);
	if (!msg) {
		pr_err("unable to allocate msg type %d len %d\n",
		       type, front_len);
		return NULL;
	}

	return msg;
}

static int mds_sign_message(struct ceph_msg *msg)
{
	struct ceph_mds_session *s = msg->con->private;
	struct ceph_auth_handshake *auth = &s->s_auth;

	return ceph_auth_sign_message(auth, msg);
}

static int mds_check_message_signature(struct ceph_msg *msg)
{
	struct ceph_mds_session *s = msg->con->private;
	struct ceph_auth_handshake *auth = &s->s_auth;

	return ceph_auth_check_message_signature(auth, msg);
}

static const struct ceph_connection_operations mds_con_ops = {
	.get = mds_get_con,
	.put = mds_put_con,
	.alloc_msg = mds_alloc_msg,
	.dispatch = mds_dispatch,
	.peer_reset = mds_peer_reset,
	.get_authorizer = mds_get_authorizer,
	.add_authorizer_challenge = mds_add_authorizer_challenge,
	.verify_authorizer_reply = mds_verify_authorizer_reply,
	.invalidate_authorizer = mds_invalidate_authorizer,
	.sign_message = mds_sign_message,
	.check_message_signature = mds_check_message_signature,
	.get_auth_request = mds_get_auth_request,
	.handle_auth_reply_more = mds_handle_auth_reply_more,
	.handle_auth_done = mds_handle_auth_done,
	.handle_auth_bad_method = mds_handle_auth_bad_method,
};

/* eof */