// SPDX-License-Identifier: GPL-2.0
#include <linux/ceph/ceph_debug.h>

#include <linux/fs.h>
#include <linux/wait.h>
#include <linux/slab.h>
#include <linux/gfp.h>
#include <linux/sched.h>
#include <linux/debugfs.h>
#include <linux/seq_file.h>
#include <linux/ratelimit.h>
#include <linux/bits.h>
#include <linux/ktime.h>
#include <linux/bitmap.h>
#include <linux/mnt_idmapping.h>

#include "super.h"
#include "mds_client.h"
#include "crypto.h"

#include <linux/ceph/ceph_features.h>
#include <linux/ceph/messenger.h>
#include <linux/ceph/decode.h>
#include <linux/ceph/pagelist.h>
#include <linux/ceph/auth.h>
#include <linux/ceph/debugfs.h>

#define RECONNECT_MAX_SIZE (INT_MAX - PAGE_SIZE)

/*
 * A cluster of MDS (metadata server) daemons is responsible for
 * managing the file system namespace (the directory hierarchy and
 * inodes) and for coordinating shared access to storage.  Metadata is
 * partitioned hierarchically across a number of servers, and that
 * partition varies over time as the cluster adjusts the distribution
 * in order to balance load.
 *
 * The MDS client is primarily responsible for managing synchronous
 * metadata requests for operations like open, unlink, and so forth.
 * If there is an MDS failure, we find out about it when we (possibly
 * request and) receive a new MDS map, and can resubmit affected
 * requests.
 *
 * For the most part, though, we take advantage of a lossless
 * communications channel to the MDS, and do not need to worry about
 * timing out or resubmitting requests.
 *
 * We maintain a stateful "session" with each MDS we interact with.
 * Within each session, we send periodic heartbeat messages to ensure
 * any capabilities or leases we have been issued remain valid.  If
 * the session times out and goes stale, our leases and capabilities
 * are no longer valid.
 */

struct ceph_reconnect_state {
	struct ceph_mds_session *session;
	int nr_caps, nr_realms;
	struct ceph_pagelist *pagelist;
	unsigned msg_version;
	bool allow_multi;
};

static void __wake_requests(struct ceph_mds_client *mdsc,
			    struct list_head *head);
static void ceph_cap_release_work(struct work_struct *work);
static void ceph_cap_reclaim_work(struct work_struct *work);

static const struct ceph_connection_operations mds_con_ops;


/*
 * mds reply parsing
 */

static int parse_reply_info_quota(void **p, void *end,
				  struct ceph_mds_reply_info_in *info)
{
	u8 struct_v, struct_compat;
	u32 struct_len;

	ceph_decode_8_safe(p, end, struct_v, bad);
	ceph_decode_8_safe(p, end, struct_compat, bad);
	/* struct_v is expected to be >= 1. we only
	 * understand encoding with struct_compat == 1. */
	if (!struct_v || struct_compat != 1)
		goto bad;
	ceph_decode_32_safe(p, end, struct_len, bad);
	ceph_decode_need(p, end, struct_len, bad);
	end = *p + struct_len;
	ceph_decode_64_safe(p, end, info->max_bytes, bad);
	ceph_decode_64_safe(p, end, info->max_files, bad);
	*p = end;
	return 0;
bad:
	return -EIO;
}
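/*
 * Note on the decoding convention above: newer MDS encodings frame each
 * struct as (struct_v, struct_compat, struct_len) followed by struct_len
 * bytes of payload, so unknown trailing fields can be skipped by jumping
 * to the recorded end.  The parse_reply_info_*() helpers below follow
 * the same pattern whenever features == (u64)-1, i.e. when the peer
 * speaks the feature-complete protocol.
 */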
/*
 * parse individual inode info
 */
static int parse_reply_info_in(void **p, void *end,
			       struct ceph_mds_reply_info_in *info,
			       u64 features)
{
	int err = 0;
	u8 struct_v = 0;

	if (features == (u64)-1) {
		u32 struct_len;
		u8 struct_compat;
		ceph_decode_8_safe(p, end, struct_v, bad);
		ceph_decode_8_safe(p, end, struct_compat, bad);
		/* struct_v is expected to be >= 1. we only understand
		 * encoding with struct_compat == 1. */
		if (!struct_v || struct_compat != 1)
			goto bad;
		ceph_decode_32_safe(p, end, struct_len, bad);
		ceph_decode_need(p, end, struct_len, bad);
		end = *p + struct_len;
	}

	ceph_decode_need(p, end, sizeof(struct ceph_mds_reply_inode), bad);
	info->in = *p;
	*p += sizeof(struct ceph_mds_reply_inode) +
		sizeof(*info->in->fragtree.splits) *
		le32_to_cpu(info->in->fragtree.nsplits);

	ceph_decode_32_safe(p, end, info->symlink_len, bad);
	ceph_decode_need(p, end, info->symlink_len, bad);
	info->symlink = *p;
	*p += info->symlink_len;

	ceph_decode_copy_safe(p, end, &info->dir_layout,
			      sizeof(info->dir_layout), bad);
	ceph_decode_32_safe(p, end, info->xattr_len, bad);
	ceph_decode_need(p, end, info->xattr_len, bad);
	info->xattr_data = *p;
	*p += info->xattr_len;

	if (features == (u64)-1) {
		/* inline data */
		ceph_decode_64_safe(p, end, info->inline_version, bad);
		ceph_decode_32_safe(p, end, info->inline_len, bad);
		ceph_decode_need(p, end, info->inline_len, bad);
		info->inline_data = *p;
		*p += info->inline_len;
		/* quota */
		err = parse_reply_info_quota(p, end, info);
		if (err < 0)
			goto out_bad;
		/* pool namespace */
		ceph_decode_32_safe(p, end, info->pool_ns_len, bad);
		if (info->pool_ns_len > 0) {
			ceph_decode_need(p, end, info->pool_ns_len, bad);
			info->pool_ns_data = *p;
			*p += info->pool_ns_len;
		}

		/* btime */
		ceph_decode_need(p, end, sizeof(info->btime), bad);
		ceph_decode_copy(p, &info->btime, sizeof(info->btime));

		/* change attribute */
		ceph_decode_64_safe(p, end, info->change_attr, bad);

		/* dir pin */
		if (struct_v >= 2) {
			ceph_decode_32_safe(p, end, info->dir_pin, bad);
		} else {
			info->dir_pin = -ENODATA;
		}

		/* snapshot birth time, remains zero for v<=2 */
		if (struct_v >= 3) {
			ceph_decode_need(p, end, sizeof(info->snap_btime), bad);
			ceph_decode_copy(p, &info->snap_btime,
					 sizeof(info->snap_btime));
		} else {
			memset(&info->snap_btime, 0, sizeof(info->snap_btime));
		}

		/* snapshot count, remains zero for v<=3 */
		if (struct_v >= 4) {
			ceph_decode_64_safe(p, end, info->rsnaps, bad);
		} else {
			info->rsnaps = 0;
		}

		if (struct_v >= 5) {
			u32 alen;

			ceph_decode_32_safe(p, end, alen, bad);

			while (alen--) {
				u32 len;

				/* key */
				ceph_decode_32_safe(p, end, len, bad);
				ceph_decode_skip_n(p, end, len, bad);
				/* value */
				ceph_decode_32_safe(p, end, len, bad);
				ceph_decode_skip_n(p, end, len, bad);
			}
		}
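		/*
		 * fscrypt metadata: v6 adds a flag, v7 adds the auth and
		 * file blobs below.  The blobs are kmalloc'ed here and
		 * later freed in destroy_reply_info().
		 */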
		/* fscrypt flag -- ignore */
		if (struct_v >= 6)
			ceph_decode_skip_8(p, end, bad);

		info->fscrypt_auth = NULL;
		info->fscrypt_auth_len = 0;
		info->fscrypt_file = NULL;
		info->fscrypt_file_len = 0;
		if (struct_v >= 7) {
			ceph_decode_32_safe(p, end, info->fscrypt_auth_len, bad);
			if (info->fscrypt_auth_len) {
				info->fscrypt_auth = kmalloc(info->fscrypt_auth_len,
							     GFP_KERNEL);
				if (!info->fscrypt_auth)
					return -ENOMEM;
				ceph_decode_copy_safe(p, end, info->fscrypt_auth,
						      info->fscrypt_auth_len, bad);
			}
			ceph_decode_32_safe(p, end, info->fscrypt_file_len, bad);
			if (info->fscrypt_file_len) {
				info->fscrypt_file = kmalloc(info->fscrypt_file_len,
							     GFP_KERNEL);
				if (!info->fscrypt_file)
					return -ENOMEM;
				ceph_decode_copy_safe(p, end, info->fscrypt_file,
						      info->fscrypt_file_len, bad);
			}
		}
		*p = end;
	} else {
		/* legacy (unversioned) struct */
		if (features & CEPH_FEATURE_MDS_INLINE_DATA) {
			ceph_decode_64_safe(p, end, info->inline_version, bad);
			ceph_decode_32_safe(p, end, info->inline_len, bad);
			ceph_decode_need(p, end, info->inline_len, bad);
			info->inline_data = *p;
			*p += info->inline_len;
		} else
			info->inline_version = CEPH_INLINE_NONE;

		if (features & CEPH_FEATURE_MDS_QUOTA) {
			err = parse_reply_info_quota(p, end, info);
			if (err < 0)
				goto out_bad;
		} else {
			info->max_bytes = 0;
			info->max_files = 0;
		}

		info->pool_ns_len = 0;
		info->pool_ns_data = NULL;
		if (features & CEPH_FEATURE_FS_FILE_LAYOUT_V2) {
			ceph_decode_32_safe(p, end, info->pool_ns_len, bad);
			if (info->pool_ns_len > 0) {
				ceph_decode_need(p, end, info->pool_ns_len, bad);
				info->pool_ns_data = *p;
				*p += info->pool_ns_len;
			}
		}

		if (features & CEPH_FEATURE_FS_BTIME) {
			ceph_decode_need(p, end, sizeof(info->btime), bad);
			ceph_decode_copy(p, &info->btime, sizeof(info->btime));
			ceph_decode_64_safe(p, end, info->change_attr, bad);
		}

		info->dir_pin = -ENODATA;
		/* info->snap_btime and info->rsnaps remain zero */
	}
	return 0;
bad:
	err = -EIO;
out_bad:
	return err;
}

static int parse_reply_info_dir(void **p, void *end,
				struct ceph_mds_reply_dirfrag **dirfrag,
				u64 features)
{
	if (features == (u64)-1) {
		u8 struct_v, struct_compat;
		u32 struct_len;
		ceph_decode_8_safe(p, end, struct_v, bad);
		ceph_decode_8_safe(p, end, struct_compat, bad);
		/* struct_v is expected to be >= 1. we only understand
		 * encoding whose struct_compat == 1. */
		if (!struct_v || struct_compat != 1)
			goto bad;
		ceph_decode_32_safe(p, end, struct_len, bad);
		ceph_decode_need(p, end, struct_len, bad);
		end = *p + struct_len;
	}

	ceph_decode_need(p, end, sizeof(**dirfrag), bad);
	*dirfrag = *p;
	*p += sizeof(**dirfrag) + sizeof(u32) * le32_to_cpu((*dirfrag)->ndist);
	if (unlikely(*p > end))
		goto bad;
	if (features == (u64)-1)
		*p = end;
	return 0;
bad:
	return -EIO;
}
static int parse_reply_info_lease(void **p, void *end,
				  struct ceph_mds_reply_lease **lease,
				  u64 features, u32 *altname_len, u8 **altname)
{
	u8 struct_v;
	u32 struct_len;
	void *lend;

	if (features == (u64)-1) {
		u8 struct_compat;

		ceph_decode_8_safe(p, end, struct_v, bad);
		ceph_decode_8_safe(p, end, struct_compat, bad);

		/* struct_v is expected to be >= 1. we only understand
		 * encoding whose struct_compat == 1. */
		if (!struct_v || struct_compat != 1)
			goto bad;

		ceph_decode_32_safe(p, end, struct_len, bad);
	} else {
		struct_len = sizeof(**lease);
		*altname_len = 0;
		*altname = NULL;
	}

	lend = *p + struct_len;
	ceph_decode_need(p, end, struct_len, bad);
	*lease = *p;
	*p += sizeof(**lease);

	if (features == (u64)-1) {
		if (struct_v >= 2) {
			ceph_decode_32_safe(p, end, *altname_len, bad);
			ceph_decode_need(p, end, *altname_len, bad);
			*altname = *p;
			*p += *altname_len;
		} else {
			*altname = NULL;
			*altname_len = 0;
		}
	}
	*p = lend;
	return 0;
bad:
	return -EIO;
}

/*
 * parse a normal reply, which may contain a (dir+)dentry and/or a
 * target inode.
 */
static int parse_reply_info_trace(void **p, void *end,
				  struct ceph_mds_reply_info_parsed *info,
				  u64 features)
{
	int err;

	if (info->head->is_dentry) {
		err = parse_reply_info_in(p, end, &info->diri, features);
		if (err < 0)
			goto out_bad;

		err = parse_reply_info_dir(p, end, &info->dirfrag, features);
		if (err < 0)
			goto out_bad;

		ceph_decode_32_safe(p, end, info->dname_len, bad);
		ceph_decode_need(p, end, info->dname_len, bad);
		info->dname = *p;
		*p += info->dname_len;

		err = parse_reply_info_lease(p, end, &info->dlease, features,
					     &info->altname_len, &info->altname);
		if (err < 0)
			goto out_bad;
	}

	if (info->head->is_target) {
		err = parse_reply_info_in(p, end, &info->targeti, features);
		if (err < 0)
			goto out_bad;
	}

	if (unlikely(*p != end))
		goto bad;
	return 0;

bad:
	err = -EIO;
out_bad:
	pr_err("problem parsing mds trace %d\n", err);
	return err;
}

/*
 * parse readdir results
 */
static int parse_reply_info_readdir(void **p, void *end,
				    struct ceph_mds_request *req,
				    u64 features)
{
	struct ceph_mds_reply_info_parsed *info = &req->r_reply_info;
	struct ceph_client *cl = req->r_mdsc->fsc->client;
	u32 num, i = 0;
	int err;

	err = parse_reply_info_dir(p, end, &info->dir_dir, features);
	if (err < 0)
		goto out_bad;

	ceph_decode_need(p, end, sizeof(num) + 2, bad);
	num = ceph_decode_32(p);
	{
		u16 flags = ceph_decode_16(p);
		info->dir_end = !!(flags & CEPH_READDIR_FRAG_END);
		info->dir_complete = !!(flags & CEPH_READDIR_FRAG_COMPLETE);
		info->hash_order = !!(flags & CEPH_READDIR_HASH_ORDER);
		info->offset_hash = !!(flags & CEPH_READDIR_OFFSET_HASH);
	}
	if (num == 0)
		goto done;

	BUG_ON(!info->dir_entries);
	if ((unsigned long)(info->dir_entries + num) >
	    (unsigned long)info->dir_entries + info->dir_buf_size) {
		pr_err_client(cl, "dir contents are larger than expected\n");
		WARN_ON(1);
		goto bad;
	}

	info->dir_nr = num;
	while (num) {
		struct inode *inode = d_inode(req->r_dentry);
		struct ceph_inode_info *ci = ceph_inode(inode);
		struct ceph_mds_reply_dir_entry *rde = info->dir_entries + i;
		struct fscrypt_str tname = FSTR_INIT(NULL, 0);
		struct fscrypt_str oname = FSTR_INIT(NULL, 0);
		struct ceph_fname fname;
		u32 altname_len, _name_len;
		u8 *altname, *_name;

		/* dentry */
		ceph_decode_32_safe(p, end, _name_len, bad);
		ceph_decode_need(p, end, _name_len, bad);
		_name = *p;
		*p += _name_len;
		doutc(cl, "parsed dir dname '%.*s'\n", _name_len, _name);
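		/*
		 * In hash-order readdir the MDS returns entries ordered by
		 * the hash of their names; remember the raw hash so readdir
		 * positions can be derived from it later.
		 */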
		if (info->hash_order)
			rde->raw_hash = ceph_str_hash(ci->i_dir_layout.dl_dir_hash,
						      _name, _name_len);

		/* dentry lease */
		err = parse_reply_info_lease(p, end, &rde->lease, features,
					     &altname_len, &altname);
		if (err)
			goto out_bad;

		/*
		 * Try to decrypt the dentry names and update them
		 * in the ceph_mds_reply_dir_entry struct.
		 */
		fname.dir = inode;
		fname.name = _name;
		fname.name_len = _name_len;
		fname.ctext = altname;
		fname.ctext_len = altname_len;
		/*
		 * The _name_len may be larger than altname_len, such as
		 * when the human readable name length is in range of
		 * (CEPH_NOHASH_NAME_MAX, CEPH_NOHASH_NAME_MAX + SHA256_DIGEST_SIZE),
		 * then the copy in ceph_fname_to_usr will corrupt the
		 * data if there is no encryption key.
		 *
		 * Just set the no_copy flag and then if there is no
		 * encryption key the oname.name will be assigned to
		 * _name always.
		 */
		fname.no_copy = true;
		if (altname_len == 0) {
			/*
			 * Set tname to _name, and this will be used
			 * to do the base64_decode in-place. It's
			 * safe because the decoded string is always
			 * shorter, at 3/4 of the original string's
			 * length.
			 */
			tname.name = _name;

			/*
			 * Set oname to _name too, and this will be
			 * used to do the decryption in-place.
			 */
			oname.name = _name;
			oname.len = _name_len;
		} else {
			/*
			 * This will do the decryption only in-place
			 * from altname cryptext directly.
			 */
			oname.name = altname;
			oname.len = altname_len;
		}
		rde->is_nokey = false;
		err = ceph_fname_to_usr(&fname, &tname, &oname, &rde->is_nokey);
		if (err) {
			pr_err_client(cl, "unable to decode %.*s, got %d\n",
				      _name_len, _name, err);
			goto out_bad;
		}
		rde->name = oname.name;
		rde->name_len = oname.len;

		/* inode */
		err = parse_reply_info_in(p, end, &rde->inode, features);
		if (err < 0)
			goto out_bad;
		/* ceph_readdir_prepopulate() will update it */
		rde->offset = 0;
		i++;
		num--;
	}

done:
	/* Skip over any unrecognized fields */
	*p = end;
	return 0;

bad:
	err = -EIO;
out_bad:
	pr_err_client(cl, "problem parsing dir contents %d\n", err);
	return err;
}

/*
 * parse fcntl F_GETLK results
 */
static int parse_reply_info_filelock(void **p, void *end,
				     struct ceph_mds_reply_info_parsed *info,
				     u64 features)
{
	if (*p + sizeof(*info->filelock_reply) > end)
		goto bad;

	info->filelock_reply = *p;

	/* Skip over any unrecognized fields */
	*p = end;
	return 0;
bad:
	return -EIO;
}
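/*
 * Delegated inode numbers: a CREATE reply may hand the client ranges of
 * inode numbers that can be used for subsequent async creates without a
 * round trip to the MDS.  Available numbers are tracked per session in
 * s_delegated_inos.
 */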
#if BITS_PER_LONG == 64

#define DELEGATED_INO_AVAILABLE xa_mk_value(1)

static int ceph_parse_deleg_inos(void **p, void *end,
				 struct ceph_mds_session *s)
{
	struct ceph_client *cl = s->s_mdsc->fsc->client;
	u32 sets;

	ceph_decode_32_safe(p, end, sets, bad);
	doutc(cl, "got %u sets of delegated inodes\n", sets);
	while (sets--) {
		u64 start, len;

		ceph_decode_64_safe(p, end, start, bad);
		ceph_decode_64_safe(p, end, len, bad);

		/* Don't accept a delegation of system inodes */
		if (start < CEPH_INO_SYSTEM_BASE) {
			pr_warn_ratelimited_client(cl,
				"ignoring reserved inode range delegation (start=0x%llx len=0x%llx)\n",
				start, len);
			continue;
		}
		while (len--) {
			int err = xa_insert(&s->s_delegated_inos, start++,
					    DELEGATED_INO_AVAILABLE,
					    GFP_KERNEL);
			if (!err) {
				doutc(cl, "added delegated inode 0x%llx\n", start - 1);
			} else if (err == -EBUSY) {
				pr_warn_client(cl,
					"MDS delegated inode 0x%llx more than once.\n",
					start - 1);
			} else {
				return err;
			}
		}
	}
	return 0;
bad:
	return -EIO;
}

u64 ceph_get_deleg_ino(struct ceph_mds_session *s)
{
	unsigned long ino;
	void *val;

	xa_for_each(&s->s_delegated_inos, ino, val) {
		val = xa_erase(&s->s_delegated_inos, ino);
		if (val == DELEGATED_INO_AVAILABLE)
			return ino;
	}
	return 0;
}

int ceph_restore_deleg_ino(struct ceph_mds_session *s, u64 ino)
{
	return xa_insert(&s->s_delegated_inos, ino, DELEGATED_INO_AVAILABLE,
			 GFP_KERNEL);
}
#else /* BITS_PER_LONG == 64 */
/*
 * FIXME: xarrays can't handle 64-bit indexes on a 32-bit arch. For now, just
 * ignore delegated_inos on 32 bit arch. Maybe eventually add xarrays for top
 * and bottom words?
 */
static int ceph_parse_deleg_inos(void **p, void *end,
				 struct ceph_mds_session *s)
{
	u32 sets;

	ceph_decode_32_safe(p, end, sets, bad);
	if (sets)
		ceph_decode_skip_n(p, end, sets * 2 * sizeof(__le64), bad);
	return 0;
bad:
	return -EIO;
}

u64 ceph_get_deleg_ino(struct ceph_mds_session *s)
{
	return 0;
}

int ceph_restore_deleg_ino(struct ceph_mds_session *s, u64 ino)
{
	return 0;
}
#endif /* BITS_PER_LONG == 64 */
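/*
 * A sketch of the intended usage by the async create path (in file.c):
 * take a delegated number before submitting the request, and hand it
 * back if the create is aborted:
 *
 *	u64 ino = ceph_get_deleg_ino(session);
 *	if (!ino)
 *		...		// fall back to a synchronous create
 *	...
 *	ceph_restore_deleg_ino(session, ino);	// on abort/failure
 */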
/*
 * parse create results
 */
static int parse_reply_info_create(void **p, void *end,
				   struct ceph_mds_reply_info_parsed *info,
				   u64 features, struct ceph_mds_session *s)
{
	int ret;

	if (features == (u64)-1 ||
	    (features & CEPH_FEATURE_REPLY_CREATE_INODE)) {
		if (*p == end) {
			/* Malformed reply? */
			info->has_create_ino = false;
		} else if (test_bit(CEPHFS_FEATURE_DELEG_INO, &s->s_features)) {
			info->has_create_ino = true;
			/* struct_v, struct_compat, and len */
			ceph_decode_skip_n(p, end, 2 + sizeof(u32), bad);
			ceph_decode_64_safe(p, end, info->ino, bad);
			ret = ceph_parse_deleg_inos(p, end, s);
			if (ret)
				return ret;
		} else {
			/* legacy */
			ceph_decode_64_safe(p, end, info->ino, bad);
			info->has_create_ino = true;
		}
	} else {
		if (*p != end)
			goto bad;
	}

	/* Skip over any unrecognized fields */
	*p = end;
	return 0;
bad:
	return -EIO;
}

static int parse_reply_info_getvxattr(void **p, void *end,
				      struct ceph_mds_reply_info_parsed *info,
				      u64 features)
{
	u32 value_len;

	ceph_decode_skip_8(p, end, bad); /* skip current version: 1 */
	ceph_decode_skip_8(p, end, bad); /* skip first version: 1 */
	ceph_decode_skip_32(p, end, bad); /* skip payload length */

	ceph_decode_32_safe(p, end, value_len, bad);

	if (value_len == end - *p) {
		info->xattr_info.xattr_value = *p;
		info->xattr_info.xattr_value_len = value_len;
		*p = end;
		return value_len;
	}
bad:
	return -EIO;
}

/*
 * parse extra results
 */
static int parse_reply_info_extra(void **p, void *end,
				  struct ceph_mds_request *req,
				  u64 features, struct ceph_mds_session *s)
{
	struct ceph_mds_reply_info_parsed *info = &req->r_reply_info;
	u32 op = le32_to_cpu(info->head->op);

	if (op == CEPH_MDS_OP_GETFILELOCK)
		return parse_reply_info_filelock(p, end, info, features);
	else if (op == CEPH_MDS_OP_READDIR || op == CEPH_MDS_OP_LSSNAP)
		return parse_reply_info_readdir(p, end, req, features);
	else if (op == CEPH_MDS_OP_CREATE)
		return parse_reply_info_create(p, end, info, features, s);
	else if (op == CEPH_MDS_OP_GETVXATTR)
		return parse_reply_info_getvxattr(p, end, info, features);
	else
		return -EIO;
}

/*
 * parse entire mds reply
 */
static int parse_reply_info(struct ceph_mds_session *s, struct ceph_msg *msg,
			    struct ceph_mds_request *req, u64 features)
{
	struct ceph_mds_reply_info_parsed *info = &req->r_reply_info;
	struct ceph_client *cl = s->s_mdsc->fsc->client;
	void *p, *end;
	u32 len;
	int err;

	info->head = msg->front.iov_base;
	p = msg->front.iov_base + sizeof(struct ceph_mds_reply_head);
	end = p + msg->front.iov_len - sizeof(struct ceph_mds_reply_head);

	/* trace */
	ceph_decode_32_safe(&p, end, len, bad);
	if (len > 0) {
		ceph_decode_need(&p, end, len, bad);
		err = parse_reply_info_trace(&p, p+len, info, features);
		if (err < 0)
			goto out_bad;
	}

	/* extra */
	ceph_decode_32_safe(&p, end, len, bad);
	if (len > 0) {
		ceph_decode_need(&p, end, len, bad);
		err = parse_reply_info_extra(&p, p+len, req, features, s);
		if (err < 0)
			goto out_bad;
	}

	/* snap blob */
	ceph_decode_32_safe(&p, end, len, bad);
	info->snapblob_len = len;
	info->snapblob = p;
	p += len;

	if (p != end)
		goto bad;
	return 0;

bad:
	err = -EIO;
out_bad:
	pr_err_client(cl, "mds parse_reply err %d\n", err);
	ceph_msg_dump(msg);
	return err;
}
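/*
 * Free everything parse_reply_info() and its helpers allocated: the
 * kmalloc'ed fscrypt blobs and the page-allocated dir_entries buffer.
 */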
static void destroy_reply_info(struct ceph_mds_reply_info_parsed *info)
{
	int i;

	kfree(info->diri.fscrypt_auth);
	kfree(info->diri.fscrypt_file);
	kfree(info->targeti.fscrypt_auth);
	kfree(info->targeti.fscrypt_file);
	if (!info->dir_entries)
		return;

	for (i = 0; i < info->dir_nr; i++) {
		struct ceph_mds_reply_dir_entry *rde = info->dir_entries + i;

		kfree(rde->inode.fscrypt_auth);
		kfree(rde->inode.fscrypt_file);
	}
	free_pages((unsigned long)info->dir_entries, get_order(info->dir_buf_size));
}

/*
 * In the async unlink case the kclient won't wait for the first reply
 * from the MDS: it drops all the links, unhashes the dentry and then
 * succeeds immediately.
 *
 * Any new create/link/rename/etc. request that reuses the same file
 * name must wait for the first reply of the in-flight unlink request,
 * or the MDS may fail these following requests with -EEXIST if the
 * in-flight async unlink request was delayed for some reason.
 *
 * The worst case is that a non-async open(O_CREAT) request will
 * successfully open the file if the CDentry hasn't been unlinked yet,
 * but the previously delayed async unlink request will later remove
 * the CDentry. That means the just-created file may be deleted later
 * by accident.
 *
 * We therefore need to wait for in-flight async unlink requests to
 * finish before creating new files/directories with the same names.
 */
int ceph_wait_on_conflict_unlink(struct dentry *dentry)
{
	struct ceph_fs_client *fsc = ceph_sb_to_fs_client(dentry->d_sb);
	struct ceph_client *cl = fsc->client;
	struct dentry *pdentry = dentry->d_parent;
	struct dentry *udentry, *found = NULL;
	struct ceph_dentry_info *di;
	struct qstr dname;
	u32 hash = dentry->d_name.hash;
	int err;

	dname.name = dentry->d_name.name;
	dname.len = dentry->d_name.len;

	rcu_read_lock();
	hash_for_each_possible_rcu(fsc->async_unlink_conflict, di,
				   hnode, hash) {
		udentry = di->dentry;

		spin_lock(&udentry->d_lock);
		if (udentry->d_name.hash != hash)
			goto next;
		if (unlikely(udentry->d_parent != pdentry))
			goto next;
		if (!hash_hashed(&di->hnode))
			goto next;

		if (!test_bit(CEPH_DENTRY_ASYNC_UNLINK_BIT, &di->flags))
			pr_warn_client(cl, "dentry %p:%pd async unlink bit is not set\n",
				       dentry, dentry);

		if (!d_same_name(udentry, pdentry, &dname))
			goto next;

		found = dget_dlock(udentry);
		spin_unlock(&udentry->d_lock);
		break;
next:
		spin_unlock(&udentry->d_lock);
	}
	rcu_read_unlock();

	if (likely(!found))
		return 0;

	doutc(cl, "dentry %p:%pd conflict with old %p:%pd\n", dentry, dentry,
	      found, found);

	err = wait_on_bit(&di->flags, CEPH_DENTRY_ASYNC_UNLINK_BIT,
			  TASK_KILLABLE);
	dput(found);
	return err;
}
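/*
 * A sketch of the expected caller pattern for the above (the create and
 * link paths check for a conflicting in-flight async unlink before
 * registering a request for the same name):
 *
 *	err = ceph_wait_on_conflict_unlink(dentry);
 *	if (err)
 *		return err;
 */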
/*
 * sessions
 */
const char *ceph_session_state_name(int s)
{
	switch (s) {
	case CEPH_MDS_SESSION_NEW: return "new";
	case CEPH_MDS_SESSION_OPENING: return "opening";
	case CEPH_MDS_SESSION_OPEN: return "open";
	case CEPH_MDS_SESSION_HUNG: return "hung";
	case CEPH_MDS_SESSION_CLOSING: return "closing";
	case CEPH_MDS_SESSION_CLOSED: return "closed";
	case CEPH_MDS_SESSION_RESTARTING: return "restarting";
	case CEPH_MDS_SESSION_RECONNECTING: return "reconnecting";
	case CEPH_MDS_SESSION_REJECTED: return "rejected";
	default: return "???";
	}
}

struct ceph_mds_session *ceph_get_mds_session(struct ceph_mds_session *s)
{
	if (refcount_inc_not_zero(&s->s_ref))
		return s;
	return NULL;
}

void ceph_put_mds_session(struct ceph_mds_session *s)
{
	if (IS_ERR_OR_NULL(s))
		return;

	if (refcount_dec_and_test(&s->s_ref)) {
		if (s->s_auth.authorizer)
			ceph_auth_destroy_authorizer(s->s_auth.authorizer);
		WARN_ON(mutex_is_locked(&s->s_mutex));
		xa_destroy(&s->s_delegated_inos);
		kfree(s);
	}
}

/*
 * called under mdsc->mutex
 */
struct ceph_mds_session *__ceph_lookup_mds_session(struct ceph_mds_client *mdsc,
						   int mds)
{
	if (mds >= mdsc->max_sessions || !mdsc->sessions[mds])
		return NULL;
	return ceph_get_mds_session(mdsc->sessions[mds]);
}

static bool __have_session(struct ceph_mds_client *mdsc, int mds)
{
	if (mds >= mdsc->max_sessions || !mdsc->sessions[mds])
		return false;
	else
		return true;
}

static int __verify_registered_session(struct ceph_mds_client *mdsc,
				       struct ceph_mds_session *s)
{
	if (s->s_mds >= mdsc->max_sessions ||
	    mdsc->sessions[s->s_mds] != s)
		return -ENOENT;
	return 0;
}

/*
 * create+register a new session for given mds.
 * called under mdsc->mutex.
 */
static struct ceph_mds_session *register_session(struct ceph_mds_client *mdsc,
						 int mds)
{
	struct ceph_client *cl = mdsc->fsc->client;
	struct ceph_mds_session *s;

	if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_FENCE_IO)
		return ERR_PTR(-EIO);

	if (mds >= mdsc->mdsmap->possible_max_rank)
		return ERR_PTR(-EINVAL);

	s = kzalloc(sizeof(*s), GFP_NOFS);
	if (!s)
		return ERR_PTR(-ENOMEM);

	if (mds >= mdsc->max_sessions) {
		int newmax = 1 << get_count_order(mds + 1);
		struct ceph_mds_session **sa;
		size_t ptr_size = sizeof(struct ceph_mds_session *);

		doutc(cl, "realloc to %d\n", newmax);
		sa = kcalloc(newmax, ptr_size, GFP_NOFS);
		if (!sa)
			goto fail_realloc;
		if (mdsc->sessions) {
			memcpy(sa, mdsc->sessions,
			       mdsc->max_sessions * ptr_size);
			kfree(mdsc->sessions);
		}
		mdsc->sessions = sa;
		mdsc->max_sessions = newmax;
	}

	doutc(cl, "mds%d\n", mds);
	s->s_mdsc = mdsc;
	s->s_mds = mds;
	s->s_state = CEPH_MDS_SESSION_NEW;
	mutex_init(&s->s_mutex);

	ceph_con_init(&s->s_con, s, &mds_con_ops, &mdsc->fsc->client->msgr);

	atomic_set(&s->s_cap_gen, 1);
	s->s_cap_ttl = jiffies - 1;

	spin_lock_init(&s->s_cap_lock);
	INIT_LIST_HEAD(&s->s_caps);
	refcount_set(&s->s_ref, 1);
	INIT_LIST_HEAD(&s->s_waiting);
	INIT_LIST_HEAD(&s->s_unsafe);
	xa_init(&s->s_delegated_inos);
	INIT_LIST_HEAD(&s->s_cap_releases);
	INIT_WORK(&s->s_cap_release_work, ceph_cap_release_work);

	INIT_LIST_HEAD(&s->s_cap_dirty);
	INIT_LIST_HEAD(&s->s_cap_flushing);

	mdsc->sessions[mds] = s;
	atomic_inc(&mdsc->num_sessions);
	refcount_inc(&s->s_ref);  /* one ref to sessions[], one to caller */

	ceph_con_open(&s->s_con, CEPH_ENTITY_TYPE_MDS, mds,
		      ceph_mdsmap_get_addr(mdsc->mdsmap, mds));

	return s;

fail_realloc:
	kfree(s);
	return ERR_PTR(-ENOMEM);
}

/*
 * called under mdsc->mutex
 */
static void __unregister_session(struct ceph_mds_client *mdsc,
				 struct ceph_mds_session *s)
{
	doutc(mdsc->fsc->client, "mds%d %p\n", s->s_mds, s);
	BUG_ON(mdsc->sessions[s->s_mds] != s);
	mdsc->sessions[s->s_mds] = NULL;
	ceph_con_close(&s->s_con);
	ceph_put_mds_session(s);
	atomic_dec(&mdsc->num_sessions);
}
/*
 * drop session refs in request.
 *
 * should be last request ref, or hold mdsc->mutex
 */
static void put_request_session(struct ceph_mds_request *req)
{
	if (req->r_session) {
		ceph_put_mds_session(req->r_session);
		req->r_session = NULL;
	}
}

void ceph_mdsc_iterate_sessions(struct ceph_mds_client *mdsc,
				void (*cb)(struct ceph_mds_session *),
				bool check_state)
{
	int mds;

	mutex_lock(&mdsc->mutex);
	for (mds = 0; mds < mdsc->max_sessions; ++mds) {
		struct ceph_mds_session *s;

		s = __ceph_lookup_mds_session(mdsc, mds);
		if (!s)
			continue;

		if (check_state && !check_session_state(s)) {
			ceph_put_mds_session(s);
			continue;
		}

		mutex_unlock(&mdsc->mutex);
		cb(s);
		ceph_put_mds_session(s);
		mutex_lock(&mdsc->mutex);
	}
	mutex_unlock(&mdsc->mutex);
}

void ceph_mdsc_release_request(struct kref *kref)
{
	struct ceph_mds_request *req = container_of(kref,
						    struct ceph_mds_request,
						    r_kref);
	ceph_mdsc_release_dir_caps_async(req);
	destroy_reply_info(&req->r_reply_info);
	if (req->r_request)
		ceph_msg_put(req->r_request);
	if (req->r_reply)
		ceph_msg_put(req->r_reply);
	if (req->r_inode) {
		ceph_put_cap_refs(ceph_inode(req->r_inode), CEPH_CAP_PIN);
		iput(req->r_inode);
	}
	if (req->r_parent) {
		ceph_put_cap_refs(ceph_inode(req->r_parent), CEPH_CAP_PIN);
		iput(req->r_parent);
	}
	iput(req->r_target_inode);
	iput(req->r_new_inode);
	if (req->r_dentry)
		dput(req->r_dentry);
	if (req->r_old_dentry)
		dput(req->r_old_dentry);
	if (req->r_old_dentry_dir) {
		/*
		 * track (and drop pins for) r_old_dentry_dir
		 * separately, since r_old_dentry's d_parent may have
		 * changed between the dir mutex being dropped and
		 * this request being freed.
		 */
		ceph_put_cap_refs(ceph_inode(req->r_old_dentry_dir),
				  CEPH_CAP_PIN);
		iput(req->r_old_dentry_dir);
	}
	kfree(req->r_path1);
	kfree(req->r_path2);
	put_cred(req->r_cred);
	if (req->r_mnt_idmap)
		mnt_idmap_put(req->r_mnt_idmap);
	if (req->r_pagelist)
		ceph_pagelist_release(req->r_pagelist);
	kfree(req->r_fscrypt_auth);
	kfree(req->r_altname);
	put_request_session(req);
	ceph_unreserve_caps(req->r_mdsc, &req->r_caps_reservation);
	WARN_ON_ONCE(!list_empty(&req->r_wait));
	kmem_cache_free(ceph_mds_request_cachep, req);
}

DEFINE_RB_FUNCS(request, struct ceph_mds_request, r_tid, r_node)

/*
 * lookup request, bump ref if found.
 *
 * called under mdsc->mutex.
 */
static struct ceph_mds_request *
lookup_get_request(struct ceph_mds_client *mdsc, u64 tid)
{
	struct ceph_mds_request *req;

	req = lookup_request(&mdsc->request_tree, tid);
	if (req)
		ceph_mdsc_get_request(req);

	return req;
}
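/*
 * Request TIDs are allocated from mdsc->last_tid; mdsc->oldest_tid
 * tracks the oldest incomplete non-filelock request and is reported to
 * the MDS as oldest_client_tid in session messages (see
 * create_session_full_msg()) so the MDS can trim completed requests.
 */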
/*
 * Register an in-flight request, and assign a tid.  Link to the
 * directory we are modifying (if any).
 *
 * Called under mdsc->mutex.
 */
static void __register_request(struct ceph_mds_client *mdsc,
			       struct ceph_mds_request *req,
			       struct inode *dir)
{
	struct ceph_client *cl = mdsc->fsc->client;
	int ret = 0;

	req->r_tid = ++mdsc->last_tid;
	if (req->r_num_caps) {
		ret = ceph_reserve_caps(mdsc, &req->r_caps_reservation,
					req->r_num_caps);
		if (ret < 0) {
			pr_err_client(cl, "%p failed to reserve caps: %d\n",
				      req, ret);
			/* set req->r_err to fail early from __do_request */
			req->r_err = ret;
			return;
		}
	}
	doutc(cl, "%p tid %lld\n", req, req->r_tid);
	ceph_mdsc_get_request(req);
	insert_request(&mdsc->request_tree, req);

	req->r_cred = get_current_cred();
	if (!req->r_mnt_idmap)
		req->r_mnt_idmap = &nop_mnt_idmap;

	if (mdsc->oldest_tid == 0 && req->r_op != CEPH_MDS_OP_SETFILELOCK)
		mdsc->oldest_tid = req->r_tid;

	if (dir) {
		struct ceph_inode_info *ci = ceph_inode(dir);

		ihold(dir);
		req->r_unsafe_dir = dir;
		spin_lock(&ci->i_unsafe_lock);
		list_add_tail(&req->r_unsafe_dir_item, &ci->i_unsafe_dirops);
		spin_unlock(&ci->i_unsafe_lock);
	}
}

static void __unregister_request(struct ceph_mds_client *mdsc,
				 struct ceph_mds_request *req)
{
	doutc(mdsc->fsc->client, "%p tid %lld\n", req, req->r_tid);

	/* Never leave an unregistered request on an unsafe list! */
	list_del_init(&req->r_unsafe_item);

	if (req->r_tid == mdsc->oldest_tid) {
		struct rb_node *p = rb_next(&req->r_node);
		mdsc->oldest_tid = 0;
		while (p) {
			struct ceph_mds_request *next_req =
				rb_entry(p, struct ceph_mds_request, r_node);
			if (next_req->r_op != CEPH_MDS_OP_SETFILELOCK) {
				mdsc->oldest_tid = next_req->r_tid;
				break;
			}
			p = rb_next(p);
		}
	}

	erase_request(&mdsc->request_tree, req);

	if (req->r_unsafe_dir) {
		struct ceph_inode_info *ci = ceph_inode(req->r_unsafe_dir);
		spin_lock(&ci->i_unsafe_lock);
		list_del_init(&req->r_unsafe_dir_item);
		spin_unlock(&ci->i_unsafe_lock);
	}
	if (req->r_target_inode &&
	    test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) {
		struct ceph_inode_info *ci = ceph_inode(req->r_target_inode);
		spin_lock(&ci->i_unsafe_lock);
		list_del_init(&req->r_unsafe_target_item);
		spin_unlock(&ci->i_unsafe_lock);
	}

	if (req->r_unsafe_dir) {
		iput(req->r_unsafe_dir);
		req->r_unsafe_dir = NULL;
	}

	complete_all(&req->r_safe_completion);

	ceph_mdsc_put_request(req);
}

/*
 * Walk back up the dentry tree until we hit a dentry representing a
 * non-snapshot inode. We do this using the rcu_read_lock (which must be held
 * when calling this) to ensure that the objects won't disappear while we're
 * working with them. Once we hit a candidate dentry, we attempt to take a
 * reference to it, and return that as the result.
 */
static struct inode *get_nonsnap_parent(struct dentry *dentry)
{
	struct inode *inode = NULL;

	while (dentry && !IS_ROOT(dentry)) {
		inode = d_inode_rcu(dentry);
		if (!inode || ceph_snap(inode) == CEPH_NOSNAP)
			break;
		dentry = dentry->d_parent;
	}
	if (inode)
		inode = igrab(inode);
	return inode;
}
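/*
 * r_direct_mode (see mds_client.h) steers the choice below:
 * USE_ANY_MDS may pick any replica, USE_AUTH_MDS insists on the
 * authoritative MDS for the inode, and USE_RANDOM_MDS skips the
 * lookup entirely.
 */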
/*
 * Choose mds to send request to next.  If there is a hint set in the
 * request (e.g., due to a prior forward hint from the mds), use that.
 * Otherwise, consult frag tree and/or caps to identify the
 * appropriate mds.  If all else fails, choose randomly.
 *
 * Called under mdsc->mutex.
 */
static int __choose_mds(struct ceph_mds_client *mdsc,
			struct ceph_mds_request *req,
			bool *random)
{
	struct inode *inode;
	struct ceph_inode_info *ci;
	struct ceph_cap *cap;
	int mode = req->r_direct_mode;
	int mds = -1;
	u32 hash = req->r_direct_hash;
	bool is_hash = test_bit(CEPH_MDS_R_DIRECT_IS_HASH, &req->r_req_flags);
	struct ceph_client *cl = mdsc->fsc->client;

	if (random)
		*random = false;

	/*
	 * is there a specific mds we should try?  ignore hint if we have
	 * no session and the mds is not up (active or recovering).
	 */
	if (req->r_resend_mds >= 0 &&
	    (__have_session(mdsc, req->r_resend_mds) ||
	     ceph_mdsmap_get_state(mdsc->mdsmap, req->r_resend_mds) > 0)) {
		doutc(cl, "using resend_mds mds%d\n", req->r_resend_mds);
		return req->r_resend_mds;
	}

	if (mode == USE_RANDOM_MDS)
		goto random;

	inode = NULL;
	if (req->r_inode) {
		if (ceph_snap(req->r_inode) != CEPH_SNAPDIR) {
			inode = req->r_inode;
			ihold(inode);
		} else {
			/* req->r_dentry is non-null for LSSNAP request */
			rcu_read_lock();
			inode = get_nonsnap_parent(req->r_dentry);
			rcu_read_unlock();
			doutc(cl, "using snapdir's parent %p %llx.%llx\n",
			      inode, ceph_vinop(inode));
		}
	} else if (req->r_dentry) {
		/* ignore race with rename; old or new d_parent is okay */
		struct dentry *parent;
		struct inode *dir;

		rcu_read_lock();
		parent = READ_ONCE(req->r_dentry->d_parent);
		dir = req->r_parent ? : d_inode_rcu(parent);

		if (!dir || dir->i_sb != mdsc->fsc->sb) {
			/* not this fs or parent went negative */
			inode = d_inode(req->r_dentry);
			if (inode)
				ihold(inode);
		} else if (ceph_snap(dir) != CEPH_NOSNAP) {
			/* direct snapped/virtual snapdir requests
			 * based on parent dir inode */
			inode = get_nonsnap_parent(parent);
			doutc(cl, "using nonsnap parent %p %llx.%llx\n",
			      inode, ceph_vinop(inode));
		} else {
			/* dentry target */
			inode = d_inode(req->r_dentry);
			if (!inode || mode == USE_AUTH_MDS) {
				/* dir + name */
				inode = igrab(dir);
				hash = ceph_dentry_hash(dir, req->r_dentry);
				is_hash = true;
			} else {
				ihold(inode);
			}
		}
		rcu_read_unlock();
	}

	if (!inode)
		goto random;

	doutc(cl, "%p %llx.%llx is_hash=%d (0x%x) mode %d\n", inode,
	      ceph_vinop(inode), (int)is_hash, hash, mode);
	ci = ceph_inode(inode);

	if (is_hash && S_ISDIR(inode->i_mode)) {
		struct ceph_inode_frag frag;
		int found;

		ceph_choose_frag(ci, hash, &frag, &found);
		if (found) {
			if (mode == USE_ANY_MDS && frag.ndist > 0) {
				u8 r;

				/* choose a random replica */
				get_random_bytes(&r, 1);
				r %= frag.ndist;
				mds = frag.dist[r];
				doutc(cl, "%p %llx.%llx frag %u mds%d (%d/%d)\n",
				      inode, ceph_vinop(inode), frag.frag,
				      mds, (int)r, frag.ndist);
				if (ceph_mdsmap_get_state(mdsc->mdsmap, mds) >=
				    CEPH_MDS_STATE_ACTIVE &&
				    !ceph_mdsmap_is_laggy(mdsc->mdsmap, mds))
					goto out;
			}

			/* since this file/dir wasn't known to be
			 * replicated, then we want to look for the
			 * authoritative mds. */
			if (frag.mds >= 0) {
				/* choose auth mds */
				mds = frag.mds;
				doutc(cl, "%p %llx.%llx frag %u mds%d (auth)\n",
				      inode, ceph_vinop(inode), frag.frag, mds);
				if (ceph_mdsmap_get_state(mdsc->mdsmap, mds) >=
				    CEPH_MDS_STATE_ACTIVE) {
					if (!ceph_mdsmap_is_laggy(mdsc->mdsmap,
								  mds))
						goto out;
				}
			}
			mode = USE_AUTH_MDS;
		}
	}

	spin_lock(&ci->i_ceph_lock);
	cap = NULL;
	if (mode == USE_AUTH_MDS)
		cap = ci->i_auth_cap;
	if (!cap && !RB_EMPTY_ROOT(&ci->i_caps))
		cap = rb_entry(rb_first(&ci->i_caps), struct ceph_cap, ci_node);
	if (!cap) {
		spin_unlock(&ci->i_ceph_lock);
		iput(inode);
		goto random;
	}
	mds = cap->session->s_mds;
	doutc(cl, "%p %llx.%llx mds%d (%scap %p)\n", inode,
	      ceph_vinop(inode), mds,
	      cap == ci->i_auth_cap ? "auth " : "", cap);
	spin_unlock(&ci->i_ceph_lock);
out:
	iput(inode);
	return mds;

random:
	if (random)
		*random = true;

	mds = ceph_mdsmap_get_random_mds(mdsc->mdsmap);
	doutc(cl, "chose random mds%d\n", mds);
	return mds;
}


/*
 * session messages
 */
struct ceph_msg *ceph_create_session_msg(u32 op, u64 seq)
{
	struct ceph_msg *msg;
	struct ceph_mds_session_head *h;

	msg = ceph_msg_new(CEPH_MSG_CLIENT_SESSION, sizeof(*h), GFP_NOFS,
			   false);
	if (!msg) {
		pr_err("ENOMEM creating session %s msg\n",
		       ceph_session_op_name(op));
		return NULL;
	}
	h = msg->front.iov_base;
	h->op = cpu_to_le32(op);
	h->seq = cpu_to_le64(seq);

	return msg;
}
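/*
 * The supported-features and metric-spec blobs below are length-prefixed
 * bitmaps.  The FEATURE_BYTES/METRIC_BYTES macros size each bitmap from
 * the highest supported bit, rounded up to a multiple of 64 bits (8
 * bytes) to match the encoding the MDS expects.
 */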
static const unsigned char feature_bits[] = CEPHFS_FEATURES_CLIENT_SUPPORTED;
#define FEATURE_BYTES(c) (DIV_ROUND_UP((size_t)feature_bits[c - 1] + 1, 64) * 8)
static int encode_supported_features(void **p, void *end)
{
	static const size_t count = ARRAY_SIZE(feature_bits);

	if (count > 0) {
		size_t i;
		size_t size = FEATURE_BYTES(count);
		unsigned long bit;

		if (WARN_ON_ONCE(*p + 4 + size > end))
			return -ERANGE;

		ceph_encode_32(p, size);
		memset(*p, 0, size);
		for (i = 0; i < count; i++) {
			bit = feature_bits[i];
			((unsigned char *)(*p))[bit / 8] |= BIT(bit % 8);
		}
		*p += size;
	} else {
		if (WARN_ON_ONCE(*p + 4 > end))
			return -ERANGE;

		ceph_encode_32(p, 0);
	}

	return 0;
}

static const unsigned char metric_bits[] = CEPHFS_METRIC_SPEC_CLIENT_SUPPORTED;
#define METRIC_BYTES(cnt) (DIV_ROUND_UP((size_t)metric_bits[cnt - 1] + 1, 64) * 8)
static int encode_metric_spec(void **p, void *end)
{
	static const size_t count = ARRAY_SIZE(metric_bits);

	/* header */
	if (WARN_ON_ONCE(*p + 2 > end))
		return -ERANGE;

	ceph_encode_8(p, 1); /* version */
	ceph_encode_8(p, 1); /* compat */

	if (count > 0) {
		size_t i;
		size_t size = METRIC_BYTES(count);

		if (WARN_ON_ONCE(*p + 4 + 4 + size > end))
			return -ERANGE;

		/* metric spec info length */
		ceph_encode_32(p, 4 + size);

		/* metric spec */
		ceph_encode_32(p, size);
		memset(*p, 0, size);
		for (i = 0; i < count; i++)
			((unsigned char *)(*p))[i / 8] |= BIT(metric_bits[i] % 8);
		*p += size;
	} else {
		if (WARN_ON_ONCE(*p + 4 + 4 > end))
			return -ERANGE;

		/* metric spec info length */
		ceph_encode_32(p, 4);
		/* metric spec */
		ceph_encode_32(p, 0);
	}

	return 0;
}
/*
 * session message, specialization for CEPH_SESSION_REQUEST_OPEN
 * to include additional client metadata fields.
 */
static struct ceph_msg *
create_session_full_msg(struct ceph_mds_client *mdsc, int op, u64 seq)
{
	struct ceph_msg *msg;
	struct ceph_mds_session_head *h;
	int i;
	int extra_bytes = 0;
	int metadata_key_count = 0;
	struct ceph_options *opt = mdsc->fsc->client->options;
	struct ceph_mount_options *fsopt = mdsc->fsc->mount_options;
	struct ceph_client *cl = mdsc->fsc->client;
	size_t size, count;
	void *p, *end;
	int ret;

	const char* metadata[][2] = {
		{"hostname", mdsc->nodename},
		{"kernel_version", init_utsname()->release},
		{"entity_id", opt->name ? : ""},
		{"root", fsopt->server_path ? : "/"},
		{NULL, NULL}
	};

	/* Calculate serialized length of metadata */
	extra_bytes = 4;  /* map length */
	for (i = 0; metadata[i][0]; ++i) {
		extra_bytes += 8 + strlen(metadata[i][0]) +
			strlen(metadata[i][1]);
		metadata_key_count++;
	}

	/* supported feature */
	size = 0;
	count = ARRAY_SIZE(feature_bits);
	if (count > 0)
		size = FEATURE_BYTES(count);
	extra_bytes += 4 + size;

	/* metric spec */
	size = 0;
	count = ARRAY_SIZE(metric_bits);
	if (count > 0)
		size = METRIC_BYTES(count);
	extra_bytes += 2 + 4 + 4 + size;

	/* flags, mds auth caps and oldest_client_tid */
	extra_bytes += 4 + 4 + 8;

	/* Allocate the message */
	msg = ceph_msg_new(CEPH_MSG_CLIENT_SESSION, sizeof(*h) + extra_bytes,
			   GFP_NOFS, false);
	if (!msg) {
		pr_err_client(cl, "ENOMEM creating session open msg\n");
		return ERR_PTR(-ENOMEM);
	}
	p = msg->front.iov_base;
	end = p + msg->front.iov_len;

	h = p;
	h->op = cpu_to_le32(op);
	h->seq = cpu_to_le64(seq);

	/*
	 * Serialize client metadata into waiting buffer space, using
	 * the format that userspace expects for map<string, string>
	 *
	 * ClientSession messages with metadata are v7
	 */
	msg->hdr.version = cpu_to_le16(7);
	msg->hdr.compat_version = cpu_to_le16(1);

	/* The write pointer, following the session_head structure */
	p += sizeof(*h);

	/* Number of entries in the map */
	ceph_encode_32(&p, metadata_key_count);

	/* Two length-prefixed strings for each entry in the map */
	for (i = 0; metadata[i][0]; ++i) {
		size_t const key_len = strlen(metadata[i][0]);
		size_t const val_len = strlen(metadata[i][1]);

		ceph_encode_32(&p, key_len);
		memcpy(p, metadata[i][0], key_len);
		p += key_len;
		ceph_encode_32(&p, val_len);
		memcpy(p, metadata[i][1], val_len);
		p += val_len;
	}

	ret = encode_supported_features(&p, end);
	if (ret) {
		pr_err_client(cl, "encode_supported_features failed!\n");
		ceph_msg_put(msg);
		return ERR_PTR(ret);
	}

	ret = encode_metric_spec(&p, end);
	if (ret) {
		pr_err_client(cl, "encode_metric_spec failed!\n");
		ceph_msg_put(msg);
		return ERR_PTR(ret);
	}

	/* version == 5, flags */
	ceph_encode_32(&p, 0);

	/* version == 6, mds auth caps */
	ceph_encode_32(&p, 0);

	/* version == 7, oldest_client_tid */
	ceph_encode_64(&p, mdsc->oldest_tid);

	msg->front.iov_len = p - msg->front.iov_base;
	msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);

	return msg;
}

/*
 * send session open request.
 *
 * called under mdsc->mutex
 */
static int __open_session(struct ceph_mds_client *mdsc,
			  struct ceph_mds_session *session)
{
	struct ceph_msg *msg;
	int mstate;
	int mds = session->s_mds;

	if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_FENCE_IO)
		return -EIO;

	/* wait for mds to go active? */
	mstate = ceph_mdsmap_get_state(mdsc->mdsmap, mds);
	doutc(mdsc->fsc->client, "open_session to mds%d (%s)\n", mds,
	      ceph_mds_state_name(mstate));
	session->s_state = CEPH_MDS_SESSION_OPENING;
	session->s_renew_requested = jiffies;

	/* send connect message */
	msg = create_session_full_msg(mdsc, CEPH_SESSION_REQUEST_OPEN,
				      session->s_seq);
	if (IS_ERR(msg))
		return PTR_ERR(msg);
	ceph_con_send(&session->s_con, msg);
	return 0;
}

/*
 * open sessions for any export targets for the given mds
 *
 * called under mdsc->mutex
 */
static struct ceph_mds_session *
__open_export_target_session(struct ceph_mds_client *mdsc, int target)
{
	struct ceph_mds_session *session;
	int ret;

	session = __ceph_lookup_mds_session(mdsc, target);
	if (!session) {
		session = register_session(mdsc, target);
		if (IS_ERR(session))
			return session;
	}
	if (session->s_state == CEPH_MDS_SESSION_NEW ||
	    session->s_state == CEPH_MDS_SESSION_CLOSING) {
		ret = __open_session(mdsc, session);
		if (ret)
			return ERR_PTR(ret);
	}

	return session;
}

struct ceph_mds_session *
ceph_mdsc_open_export_target_session(struct ceph_mds_client *mdsc, int target)
{
	struct ceph_mds_session *session;
	struct ceph_client *cl = mdsc->fsc->client;

	doutc(cl, "to mds%d\n", target);

	mutex_lock(&mdsc->mutex);
	session = __open_export_target_session(mdsc, target);
	mutex_unlock(&mdsc->mutex);

	return session;
}

static void __open_export_target_sessions(struct ceph_mds_client *mdsc,
					  struct ceph_mds_session *session)
{
	struct ceph_mds_info *mi;
	struct ceph_mds_session *ts;
	int i, mds = session->s_mds;
	struct ceph_client *cl = mdsc->fsc->client;

	if (mds >= mdsc->mdsmap->possible_max_rank)
		return;

	mi = &mdsc->mdsmap->m_info[mds];
	doutc(cl, "for mds%d (%d targets)\n", session->s_mds,
	      mi->num_export_targets);

	for (i = 0; i < mi->num_export_targets; i++) {
		ts = __open_export_target_session(mdsc, mi->export_targets[i]);
		ceph_put_mds_session(ts);
	}
}

/*
 * session caps
 */

static void detach_cap_releases(struct ceph_mds_session *session,
				struct list_head *target)
{
	struct ceph_client *cl = session->s_mdsc->fsc->client;

	lockdep_assert_held(&session->s_cap_lock);

	list_splice_init(&session->s_cap_releases, target);
	session->s_num_cap_releases = 0;
	doutc(cl, "mds%d\n", session->s_mds);
}

static void dispose_cap_releases(struct ceph_mds_client *mdsc,
				 struct list_head *dispose)
{
	while (!list_empty(dispose)) {
		struct ceph_cap *cap;
		/* zero out the in-progress message */
		cap = list_first_entry(dispose, struct ceph_cap, session_caps);
		list_del(&cap->session_caps);
		ceph_put_cap(mdsc, cap);
	}
}
static void cleanup_session_requests(struct ceph_mds_client *mdsc,
				     struct ceph_mds_session *session)
{
	struct ceph_client *cl = mdsc->fsc->client;
	struct ceph_mds_request *req;
	struct rb_node *p;

	doutc(cl, "mds%d\n", session->s_mds);
	mutex_lock(&mdsc->mutex);
	while (!list_empty(&session->s_unsafe)) {
		req = list_first_entry(&session->s_unsafe,
				       struct ceph_mds_request, r_unsafe_item);
		pr_warn_ratelimited_client(cl, " dropping unsafe request %llu\n",
					   req->r_tid);
		if (req->r_target_inode)
			mapping_set_error(req->r_target_inode->i_mapping, -EIO);
		if (req->r_unsafe_dir)
			mapping_set_error(req->r_unsafe_dir->i_mapping, -EIO);
		__unregister_request(mdsc, req);
	}
	/* zero r_attempts, so kick_requests() will re-send requests */
	p = rb_first(&mdsc->request_tree);
	while (p) {
		req = rb_entry(p, struct ceph_mds_request, r_node);
		p = rb_next(p);
		if (req->r_session &&
		    req->r_session->s_mds == session->s_mds)
			req->r_attempts = 0;
	}
	mutex_unlock(&mdsc->mutex);
}

/*
 * Helper to safely iterate over all caps associated with a session, with
 * special care taken to handle a racing __ceph_remove_cap().
 *
 * Caller must hold session s_mutex.
 */
int ceph_iterate_session_caps(struct ceph_mds_session *session,
			      int (*cb)(struct inode *, int mds, void *),
			      void *arg)
{
	struct ceph_client *cl = session->s_mdsc->fsc->client;
	struct list_head *p;
	struct ceph_cap *cap;
	struct inode *inode, *last_inode = NULL;
	struct ceph_cap *old_cap = NULL;
	int ret;

	doutc(cl, "%p mds%d\n", session, session->s_mds);
	spin_lock(&session->s_cap_lock);
	p = session->s_caps.next;
	while (p != &session->s_caps) {
		int mds;

		cap = list_entry(p, struct ceph_cap, session_caps);
		inode = igrab(&cap->ci->netfs.inode);
		if (!inode) {
			p = p->next;
			continue;
		}
		session->s_cap_iterator = cap;
		mds = cap->mds;
		spin_unlock(&session->s_cap_lock);

		if (last_inode) {
			iput(last_inode);
			last_inode = NULL;
		}
		if (old_cap) {
			ceph_put_cap(session->s_mdsc, old_cap);
			old_cap = NULL;
		}

		ret = cb(inode, mds, arg);
		last_inode = inode;

		spin_lock(&session->s_cap_lock);
		p = p->next;
		if (!cap->ci) {
			doutc(cl, "finishing cap %p removal\n", cap);
			BUG_ON(cap->session != session);
			cap->session = NULL;
			list_del_init(&cap->session_caps);
			session->s_nr_caps--;
			atomic64_dec(&session->s_mdsc->metric.total_caps);
			if (cap->queue_release)
				__ceph_queue_cap_release(session, cap);
			else
				old_cap = cap;  /* put_cap it w/o locks held */
		}
		if (ret < 0)
			goto out;
	}
	ret = 0;
out:
	session->s_cap_iterator = NULL;
	spin_unlock(&session->s_cap_lock);

	iput(last_inode);
	if (old_cap)
		ceph_put_cap(session->s_mdsc, old_cap);

	return ret;
}
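/*
 * Note for the ceph_iterate_session_caps() callbacks below
 * (remove_session_caps_cb() and friends): they run without s_cap_lock
 * held, with a reference held on the inode, and a negative return value
 * stops the iteration.
 */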
static int remove_session_caps_cb(struct inode *inode, int mds, void *arg)
{
	struct ceph_inode_info *ci = ceph_inode(inode);
	struct ceph_client *cl = ceph_inode_to_client(inode);
	bool invalidate = false;
	struct ceph_cap *cap;
	int iputs = 0;

	spin_lock(&ci->i_ceph_lock);
	cap = __get_cap_for_mds(ci, mds);
	if (cap) {
		doutc(cl, " removing cap %p, ci is %p, inode is %p\n",
		      cap, ci, &ci->netfs.inode);

		iputs = ceph_purge_inode_cap(inode, cap, &invalidate);
	}
	spin_unlock(&ci->i_ceph_lock);

	if (cap)
		wake_up_all(&ci->i_cap_wq);
	if (invalidate)
		ceph_queue_invalidate(inode);
	while (iputs--)
		iput(inode);
	return 0;
}

/*
 * caller must hold session s_mutex
 */
static void remove_session_caps(struct ceph_mds_session *session)
{
	struct ceph_fs_client *fsc = session->s_mdsc->fsc;
	struct super_block *sb = fsc->sb;
	LIST_HEAD(dispose);

	doutc(fsc->client, "on %p\n", session);
	ceph_iterate_session_caps(session, remove_session_caps_cb, fsc);

	wake_up_all(&fsc->mdsc->cap_flushing_wq);

	spin_lock(&session->s_cap_lock);
	if (session->s_nr_caps > 0) {
		struct inode *inode;
		struct ceph_cap *cap, *prev = NULL;
		struct ceph_vino vino;
		/*
		 * iterate_session_caps() skips inodes that are being
		 * deleted, we need to wait until deletions are complete.
		 * __wait_on_freeing_inode() is designed for the job,
		 * but it is not exported, so use lookup inode function
		 * to access it.
		 */
		while (!list_empty(&session->s_caps)) {
			cap = list_entry(session->s_caps.next,
					 struct ceph_cap, session_caps);
			if (cap == prev)
				break;
			prev = cap;
			vino = cap->ci->i_vino;
			spin_unlock(&session->s_cap_lock);

			inode = ceph_find_inode(sb, vino);
			iput(inode);

			spin_lock(&session->s_cap_lock);
		}
	}

	// drop cap expires and unlock s_cap_lock
	detach_cap_releases(session, &dispose);

	BUG_ON(session->s_nr_caps > 0);
	BUG_ON(!list_empty(&session->s_cap_flushing));
	spin_unlock(&session->s_cap_lock);
	dispose_cap_releases(session->s_mdsc, &dispose);
}

enum {
	RECONNECT,
	RENEWCAPS,
	FORCE_RO,
};

/*
 * wake up any threads waiting on this session's caps.  if the cap is
 * old (didn't get renewed on the client reconnect), remove it now.
 *
 * caller must hold s_mutex.
 */
static int wake_up_session_cb(struct inode *inode, int mds, void *arg)
{
	struct ceph_inode_info *ci = ceph_inode(inode);
	unsigned long ev = (unsigned long)arg;

	if (ev == RECONNECT) {
		spin_lock(&ci->i_ceph_lock);
		ci->i_wanted_max_size = 0;
		ci->i_requested_max_size = 0;
		spin_unlock(&ci->i_ceph_lock);
	} else if (ev == RENEWCAPS) {
		struct ceph_cap *cap;

		spin_lock(&ci->i_ceph_lock);
		cap = __get_cap_for_mds(ci, mds);
		/* mds did not re-issue stale cap */
		if (cap && cap->cap_gen < atomic_read(&cap->session->s_cap_gen))
			cap->issued = cap->implemented = CEPH_CAP_PIN;
		spin_unlock(&ci->i_ceph_lock);
	} else if (ev == FORCE_RO) {
	}
	wake_up_all(&ci->i_cap_wq);
	return 0;
}

static void wake_up_session_caps(struct ceph_mds_session *session, int ev)
{
	struct ceph_client *cl = session->s_mdsc->fsc->client;

	doutc(cl, "session %p mds%d\n", session, session->s_mds);
	ceph_iterate_session_caps(session, wake_up_session_cb,
				  (void *)(unsigned long)ev);
}
 *
 * caller holds s_mutex
 */
static int send_renew_caps(struct ceph_mds_client *mdsc,
			   struct ceph_mds_session *session)
{
	struct ceph_client *cl = mdsc->fsc->client;
	struct ceph_msg *msg;
	int state;

	if (time_after_eq(jiffies, session->s_cap_ttl) &&
	    time_after_eq(session->s_cap_ttl, session->s_renew_requested))
		pr_info_client(cl, "mds%d caps stale\n", session->s_mds);
	session->s_renew_requested = jiffies;

	/* do not try to renew caps until a recovering mds has reconnected
	 * with its clients. */
	state = ceph_mdsmap_get_state(mdsc->mdsmap, session->s_mds);
	if (state < CEPH_MDS_STATE_RECONNECT) {
		doutc(cl, "ignoring mds%d (%s)\n", session->s_mds,
		      ceph_mds_state_name(state));
		return 0;
	}

	doutc(cl, "to mds%d (%s)\n", session->s_mds,
	      ceph_mds_state_name(state));
	msg = create_session_full_msg(mdsc, CEPH_SESSION_REQUEST_RENEWCAPS,
				      ++session->s_renew_seq);
	if (IS_ERR(msg))
		return PTR_ERR(msg);
	ceph_con_send(&session->s_con, msg);
	return 0;
}

static int send_flushmsg_ack(struct ceph_mds_client *mdsc,
			     struct ceph_mds_session *session, u64 seq)
{
	struct ceph_client *cl = mdsc->fsc->client;
	struct ceph_msg *msg;

	doutc(cl, "to mds%d (%s) seq %lld\n", session->s_mds,
	      ceph_session_state_name(session->s_state), seq);
	msg = ceph_create_session_msg(CEPH_SESSION_FLUSHMSG_ACK, seq);
	if (!msg)
		return -ENOMEM;
	ceph_con_send(&session->s_con, msg);
	return 0;
}


/*
 * Note new cap ttl, and any transition from stale -> not stale (fresh?).
 *
 * Called under session->s_mutex
 */
static void renewed_caps(struct ceph_mds_client *mdsc,
			 struct ceph_mds_session *session, int is_renew)
{
	struct ceph_client *cl = mdsc->fsc->client;
	int was_stale;
	int wake = 0;

	spin_lock(&session->s_cap_lock);
	was_stale = is_renew && time_after_eq(jiffies, session->s_cap_ttl);

	session->s_cap_ttl = session->s_renew_requested +
		mdsc->mdsmap->m_session_timeout*HZ;

	if (was_stale) {
		if (time_before(jiffies, session->s_cap_ttl)) {
			pr_info_client(cl, "mds%d caps renewed\n",
				       session->s_mds);
			wake = 1;
		} else {
			pr_info_client(cl, "mds%d caps still stale\n",
				       session->s_mds);
		}
	}
	doutc(cl, "mds%d ttl now %lu, was %s, now %s\n", session->s_mds,
	      session->s_cap_ttl, was_stale ? "stale" : "fresh",
	      time_before(jiffies, session->s_cap_ttl) ? "fresh" : "stale");
	spin_unlock(&session->s_cap_lock);

	if (wake)
		wake_up_session_caps(session, RENEWCAPS);
}

/*
 * send a session close request
 */
static int request_close_session(struct ceph_mds_session *session)
{
	struct ceph_client *cl = session->s_mdsc->fsc->client;
	struct ceph_msg *msg;

	doutc(cl, "mds%d state %s seq %lld\n", session->s_mds,
	      ceph_session_state_name(session->s_state), session->s_seq);
	msg = ceph_create_session_msg(CEPH_SESSION_REQUEST_CLOSE,
				      session->s_seq);
	if (!msg)
		return -ENOMEM;
	ceph_con_send(&session->s_con, msg);
	return 1;
}

/*
 * Called with s_mutex held.
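 *
 * (Hedged usage sketch: the real callers in this file take s_mutex
 * before calling __close_session(); the helper below is invented for
 * illustration and is not a real symbol.)
 */

static int __close_session(struct ceph_mds_client *mdsc,
			   struct ceph_mds_session *session);

static int example_close_session(struct ceph_mds_client *mdsc,
				 struct ceph_mds_session *s)
{
	int ret;

	mutex_lock(&s->s_mutex);
	ret = __close_session(mdsc, s);	/* no-op if already CLOSING */
	mutex_unlock(&s->s_mutex);
	return ret;
}

/*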
2119 */ 2120 static int __close_session(struct ceph_mds_client *mdsc, 2121 struct ceph_mds_session *session) 2122 { 2123 if (session->s_state >= CEPH_MDS_SESSION_CLOSING) 2124 return 0; 2125 session->s_state = CEPH_MDS_SESSION_CLOSING; 2126 return request_close_session(session); 2127 } 2128 2129 static bool drop_negative_children(struct dentry *dentry) 2130 { 2131 struct dentry *child; 2132 bool all_negative = true; 2133 2134 if (!d_is_dir(dentry)) 2135 goto out; 2136 2137 spin_lock(&dentry->d_lock); 2138 hlist_for_each_entry(child, &dentry->d_children, d_sib) { 2139 if (d_really_is_positive(child)) { 2140 all_negative = false; 2141 break; 2142 } 2143 } 2144 spin_unlock(&dentry->d_lock); 2145 2146 if (all_negative) 2147 shrink_dcache_parent(dentry); 2148 out: 2149 return all_negative; 2150 } 2151 2152 /* 2153 * Trim old(er) caps. 2154 * 2155 * Because we can't cache an inode without one or more caps, we do 2156 * this indirectly: if a cap is unused, we prune its aliases, at which 2157 * point the inode will hopefully get dropped to. 2158 * 2159 * Yes, this is a bit sloppy. Our only real goal here is to respond to 2160 * memory pressure from the MDS, though, so it needn't be perfect. 2161 */ 2162 static int trim_caps_cb(struct inode *inode, int mds, void *arg) 2163 { 2164 struct ceph_mds_client *mdsc = ceph_sb_to_mdsc(inode->i_sb); 2165 struct ceph_client *cl = mdsc->fsc->client; 2166 int *remaining = arg; 2167 struct ceph_inode_info *ci = ceph_inode(inode); 2168 int used, wanted, oissued, mine; 2169 struct ceph_cap *cap; 2170 2171 if (*remaining <= 0) 2172 return -1; 2173 2174 spin_lock(&ci->i_ceph_lock); 2175 cap = __get_cap_for_mds(ci, mds); 2176 if (!cap) { 2177 spin_unlock(&ci->i_ceph_lock); 2178 return 0; 2179 } 2180 mine = cap->issued | cap->implemented; 2181 used = __ceph_caps_used(ci); 2182 wanted = __ceph_caps_file_wanted(ci); 2183 oissued = __ceph_caps_issued_other(ci, cap); 2184 2185 doutc(cl, "%p %llx.%llx cap %p mine %s oissued %s used %s wanted %s\n", 2186 inode, ceph_vinop(inode), cap, ceph_cap_string(mine), 2187 ceph_cap_string(oissued), ceph_cap_string(used), 2188 ceph_cap_string(wanted)); 2189 if (cap == ci->i_auth_cap) { 2190 if (ci->i_dirty_caps || ci->i_flushing_caps || 2191 !list_empty(&ci->i_cap_snaps)) 2192 goto out; 2193 if ((used | wanted) & CEPH_CAP_ANY_WR) 2194 goto out; 2195 /* Note: it's possible that i_filelock_ref becomes non-zero 2196 * after dropping auth caps. It doesn't hurt because reply 2197 * of lock mds request will re-add auth caps. */ 2198 if (atomic_read(&ci->i_filelock_ref) > 0) 2199 goto out; 2200 } 2201 /* The inode has cached pages, but it's no longer used. 2202 * we can safely drop it */ 2203 if (S_ISREG(inode->i_mode) && 2204 wanted == 0 && used == CEPH_CAP_FILE_CACHE && 2205 !(oissued & CEPH_CAP_FILE_CACHE)) { 2206 used = 0; 2207 oissued = 0; 2208 } 2209 if ((used | wanted) & ~oissued & mine) 2210 goto out; /* we need these caps */ 2211 2212 if (oissued) { 2213 /* we aren't the only cap.. 
just remove us */ 2214 ceph_remove_cap(mdsc, cap, true); 2215 (*remaining)--; 2216 } else { 2217 struct dentry *dentry; 2218 /* try dropping referring dentries */ 2219 spin_unlock(&ci->i_ceph_lock); 2220 dentry = d_find_any_alias(inode); 2221 if (dentry && drop_negative_children(dentry)) { 2222 int count; 2223 dput(dentry); 2224 d_prune_aliases(inode); 2225 count = icount_read(inode); 2226 if (count == 1) 2227 (*remaining)--; 2228 doutc(cl, "%p %llx.%llx cap %p pruned, count now %d\n", 2229 inode, ceph_vinop(inode), cap, count); 2230 } else { 2231 dput(dentry); 2232 } 2233 return 0; 2234 } 2235 2236 out: 2237 spin_unlock(&ci->i_ceph_lock); 2238 return 0; 2239 } 2240 2241 /* 2242 * Trim session cap count down to some max number. 2243 */ 2244 int ceph_trim_caps(struct ceph_mds_client *mdsc, 2245 struct ceph_mds_session *session, 2246 int max_caps) 2247 { 2248 struct ceph_client *cl = mdsc->fsc->client; 2249 int trim_caps = session->s_nr_caps - max_caps; 2250 2251 doutc(cl, "mds%d start: %d / %d, trim %d\n", session->s_mds, 2252 session->s_nr_caps, max_caps, trim_caps); 2253 if (trim_caps > 0) { 2254 int remaining = trim_caps; 2255 2256 ceph_iterate_session_caps(session, trim_caps_cb, &remaining); 2257 doutc(cl, "mds%d done: %d / %d, trimmed %d\n", 2258 session->s_mds, session->s_nr_caps, max_caps, 2259 trim_caps - remaining); 2260 } 2261 2262 ceph_flush_session_cap_releases(mdsc, session); 2263 return 0; 2264 } 2265 2266 static int check_caps_flush(struct ceph_mds_client *mdsc, 2267 u64 want_flush_tid) 2268 { 2269 struct ceph_client *cl = mdsc->fsc->client; 2270 int ret = 1; 2271 2272 spin_lock(&mdsc->cap_dirty_lock); 2273 if (!list_empty(&mdsc->cap_flush_list)) { 2274 struct ceph_cap_flush *cf = 2275 list_first_entry(&mdsc->cap_flush_list, 2276 struct ceph_cap_flush, g_list); 2277 if (cf->tid <= want_flush_tid) { 2278 doutc(cl, "still flushing tid %llu <= %llu\n", 2279 cf->tid, want_flush_tid); 2280 ret = 0; 2281 } 2282 } 2283 spin_unlock(&mdsc->cap_dirty_lock); 2284 return ret; 2285 } 2286 2287 /* 2288 * flush all dirty inode data to disk. 
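 *
 * (Usage context, as an assumption-labeled sketch: a sync path would
 * typically capture the newest flush tid and then block on it, e.g.
 *
 *	u64 want_tid = READ_ONCE(mdsc->last_cap_flush_tid);
 *	wait_caps_flush(mdsc, want_tid);
 *
 * the field name last_cap_flush_tid is quoted from memory here and may
 * differ.)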
2289 * 2290 * returns true if we've flushed through want_flush_tid 2291 */ 2292 static void wait_caps_flush(struct ceph_mds_client *mdsc, 2293 u64 want_flush_tid) 2294 { 2295 struct ceph_client *cl = mdsc->fsc->client; 2296 2297 doutc(cl, "want %llu\n", want_flush_tid); 2298 2299 wait_event(mdsc->cap_flushing_wq, 2300 check_caps_flush(mdsc, want_flush_tid)); 2301 2302 doutc(cl, "ok, flushed thru %llu\n", want_flush_tid); 2303 } 2304 2305 /* 2306 * called under s_mutex 2307 */ 2308 static void ceph_send_cap_releases(struct ceph_mds_client *mdsc, 2309 struct ceph_mds_session *session) 2310 { 2311 struct ceph_client *cl = mdsc->fsc->client; 2312 struct ceph_msg *msg = NULL; 2313 struct ceph_mds_cap_release *head; 2314 struct ceph_mds_cap_item *item; 2315 struct ceph_osd_client *osdc = &mdsc->fsc->client->osdc; 2316 struct ceph_cap *cap; 2317 LIST_HEAD(tmp_list); 2318 int num_cap_releases; 2319 __le32 barrier, *cap_barrier; 2320 2321 down_read(&osdc->lock); 2322 barrier = cpu_to_le32(osdc->epoch_barrier); 2323 up_read(&osdc->lock); 2324 2325 spin_lock(&session->s_cap_lock); 2326 again: 2327 list_splice_init(&session->s_cap_releases, &tmp_list); 2328 num_cap_releases = session->s_num_cap_releases; 2329 session->s_num_cap_releases = 0; 2330 spin_unlock(&session->s_cap_lock); 2331 2332 while (!list_empty(&tmp_list)) { 2333 if (!msg) { 2334 msg = ceph_msg_new(CEPH_MSG_CLIENT_CAPRELEASE, 2335 PAGE_SIZE, GFP_NOFS, false); 2336 if (!msg) 2337 goto out_err; 2338 head = msg->front.iov_base; 2339 head->num = cpu_to_le32(0); 2340 msg->front.iov_len = sizeof(*head); 2341 2342 msg->hdr.version = cpu_to_le16(2); 2343 msg->hdr.compat_version = cpu_to_le16(1); 2344 } 2345 2346 cap = list_first_entry(&tmp_list, struct ceph_cap, 2347 session_caps); 2348 list_del(&cap->session_caps); 2349 num_cap_releases--; 2350 2351 head = msg->front.iov_base; 2352 put_unaligned_le32(get_unaligned_le32(&head->num) + 1, 2353 &head->num); 2354 item = msg->front.iov_base + msg->front.iov_len; 2355 item->ino = cpu_to_le64(cap->cap_ino); 2356 item->cap_id = cpu_to_le64(cap->cap_id); 2357 item->migrate_seq = cpu_to_le32(cap->mseq); 2358 item->issue_seq = cpu_to_le32(cap->issue_seq); 2359 msg->front.iov_len += sizeof(*item); 2360 2361 ceph_put_cap(mdsc, cap); 2362 2363 if (le32_to_cpu(head->num) == CEPH_CAPS_PER_RELEASE) { 2364 // Append cap_barrier field 2365 cap_barrier = msg->front.iov_base + msg->front.iov_len; 2366 *cap_barrier = barrier; 2367 msg->front.iov_len += sizeof(*cap_barrier); 2368 2369 msg->hdr.front_len = cpu_to_le32(msg->front.iov_len); 2370 doutc(cl, "mds%d %p\n", session->s_mds, msg); 2371 ceph_con_send(&session->s_con, msg); 2372 msg = NULL; 2373 } 2374 } 2375 2376 BUG_ON(num_cap_releases != 0); 2377 2378 spin_lock(&session->s_cap_lock); 2379 if (!list_empty(&session->s_cap_releases)) 2380 goto again; 2381 spin_unlock(&session->s_cap_lock); 2382 2383 if (msg) { 2384 // Append cap_barrier field 2385 cap_barrier = msg->front.iov_base + msg->front.iov_len; 2386 *cap_barrier = barrier; 2387 msg->front.iov_len += sizeof(*cap_barrier); 2388 2389 msg->hdr.front_len = cpu_to_le32(msg->front.iov_len); 2390 doutc(cl, "mds%d %p\n", session->s_mds, msg); 2391 ceph_con_send(&session->s_con, msg); 2392 } 2393 return; 2394 out_err: 2395 pr_err_client(cl, "mds%d, failed to allocate message\n", 2396 session->s_mds); 2397 spin_lock(&session->s_cap_lock); 2398 list_splice(&tmp_list, &session->s_cap_releases); 2399 session->s_num_cap_releases += num_cap_releases; 2400 spin_unlock(&session->s_cap_lock); 2401 } 2402 2403 static void 
ceph_cap_release_work(struct work_struct *work) 2404 { 2405 struct ceph_mds_session *session = 2406 container_of(work, struct ceph_mds_session, s_cap_release_work); 2407 2408 mutex_lock(&session->s_mutex); 2409 if (session->s_state == CEPH_MDS_SESSION_OPEN || 2410 session->s_state == CEPH_MDS_SESSION_HUNG) 2411 ceph_send_cap_releases(session->s_mdsc, session); 2412 mutex_unlock(&session->s_mutex); 2413 ceph_put_mds_session(session); 2414 } 2415 2416 void ceph_flush_session_cap_releases(struct ceph_mds_client *mdsc, 2417 struct ceph_mds_session *session) 2418 { 2419 struct ceph_client *cl = mdsc->fsc->client; 2420 if (mdsc->stopping) 2421 return; 2422 2423 ceph_get_mds_session(session); 2424 if (queue_work(mdsc->fsc->cap_wq, 2425 &session->s_cap_release_work)) { 2426 doutc(cl, "cap release work queued\n"); 2427 } else { 2428 ceph_put_mds_session(session); 2429 doutc(cl, "failed to queue cap release work\n"); 2430 } 2431 } 2432 2433 /* 2434 * caller holds session->s_cap_lock 2435 */ 2436 void __ceph_queue_cap_release(struct ceph_mds_session *session, 2437 struct ceph_cap *cap) 2438 { 2439 list_add_tail(&cap->session_caps, &session->s_cap_releases); 2440 session->s_num_cap_releases++; 2441 2442 if (!(session->s_num_cap_releases % CEPH_CAPS_PER_RELEASE)) 2443 ceph_flush_session_cap_releases(session->s_mdsc, session); 2444 } 2445 2446 static void ceph_cap_reclaim_work(struct work_struct *work) 2447 { 2448 struct ceph_mds_client *mdsc = 2449 container_of(work, struct ceph_mds_client, cap_reclaim_work); 2450 int ret = ceph_trim_dentries(mdsc); 2451 if (ret == -EAGAIN) 2452 ceph_queue_cap_reclaim_work(mdsc); 2453 } 2454 2455 void ceph_queue_cap_reclaim_work(struct ceph_mds_client *mdsc) 2456 { 2457 struct ceph_client *cl = mdsc->fsc->client; 2458 if (mdsc->stopping) 2459 return; 2460 2461 if (queue_work(mdsc->fsc->cap_wq, &mdsc->cap_reclaim_work)) { 2462 doutc(cl, "caps reclaim work queued\n"); 2463 } else { 2464 doutc(cl, "failed to queue caps release work\n"); 2465 } 2466 } 2467 2468 void ceph_reclaim_caps_nr(struct ceph_mds_client *mdsc, int nr) 2469 { 2470 int val; 2471 if (!nr) 2472 return; 2473 val = atomic_add_return(nr, &mdsc->cap_reclaim_pending); 2474 if ((val % CEPH_CAPS_PER_RELEASE) < nr) { 2475 atomic_set(&mdsc->cap_reclaim_pending, 0); 2476 ceph_queue_cap_reclaim_work(mdsc); 2477 } 2478 } 2479 2480 void ceph_queue_cap_unlink_work(struct ceph_mds_client *mdsc) 2481 { 2482 struct ceph_client *cl = mdsc->fsc->client; 2483 if (mdsc->stopping) 2484 return; 2485 2486 if (queue_work(mdsc->fsc->cap_wq, &mdsc->cap_unlink_work)) { 2487 doutc(cl, "caps unlink work queued\n"); 2488 } else { 2489 doutc(cl, "failed to queue caps unlink work\n"); 2490 } 2491 } 2492 2493 static void ceph_cap_unlink_work(struct work_struct *work) 2494 { 2495 struct ceph_mds_client *mdsc = 2496 container_of(work, struct ceph_mds_client, cap_unlink_work); 2497 struct ceph_client *cl = mdsc->fsc->client; 2498 2499 doutc(cl, "begin\n"); 2500 spin_lock(&mdsc->cap_delay_lock); 2501 while (!list_empty(&mdsc->cap_unlink_delay_list)) { 2502 struct ceph_inode_info *ci; 2503 struct inode *inode; 2504 2505 ci = list_first_entry(&mdsc->cap_unlink_delay_list, 2506 struct ceph_inode_info, 2507 i_cap_delay_list); 2508 list_del_init(&ci->i_cap_delay_list); 2509 2510 inode = igrab(&ci->netfs.inode); 2511 if (inode) { 2512 spin_unlock(&mdsc->cap_delay_lock); 2513 doutc(cl, "on %p %llx.%llx\n", inode, 2514 ceph_vinop(inode)); 2515 ceph_check_caps(ci, CHECK_CAPS_FLUSH); 2516 iput(inode); 2517 spin_lock(&mdsc->cap_delay_lock); 2518 } 2519 
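		/*
		 * Note the locking pattern above: cap_delay_lock is dropped
		 * around ceph_check_caps(), which may sleep and takes
		 * i_ceph_lock; the igrab() reference keeps the inode alive
		 * across that unlocked window, and iput() likewise runs
		 * only while cap_delay_lock is not held.
		 */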
} 2520 spin_unlock(&mdsc->cap_delay_lock); 2521 doutc(cl, "done\n"); 2522 } 2523 2524 /* 2525 * requests 2526 */ 2527 2528 int ceph_alloc_readdir_reply_buffer(struct ceph_mds_request *req, 2529 struct inode *dir) 2530 { 2531 struct ceph_inode_info *ci = ceph_inode(dir); 2532 struct ceph_mds_reply_info_parsed *rinfo = &req->r_reply_info; 2533 struct ceph_mount_options *opt = req->r_mdsc->fsc->mount_options; 2534 size_t size = sizeof(struct ceph_mds_reply_dir_entry); 2535 unsigned int num_entries; 2536 u64 bytes_count; 2537 int order; 2538 2539 spin_lock(&ci->i_ceph_lock); 2540 num_entries = ci->i_files + ci->i_subdirs; 2541 spin_unlock(&ci->i_ceph_lock); 2542 num_entries = max(num_entries, 1U); 2543 num_entries = min(num_entries, opt->max_readdir); 2544 2545 bytes_count = (u64)size * num_entries; 2546 if (unlikely(bytes_count > ULONG_MAX)) 2547 bytes_count = ULONG_MAX; 2548 2549 order = get_order((unsigned long)bytes_count); 2550 while (order >= 0) { 2551 rinfo->dir_entries = (void*)__get_free_pages(GFP_KERNEL | 2552 __GFP_NOWARN | 2553 __GFP_ZERO, 2554 order); 2555 if (rinfo->dir_entries) 2556 break; 2557 order--; 2558 } 2559 if (!rinfo->dir_entries || unlikely(order < 0)) 2560 return -ENOMEM; 2561 2562 num_entries = (PAGE_SIZE << order) / size; 2563 num_entries = min(num_entries, opt->max_readdir); 2564 2565 rinfo->dir_buf_size = PAGE_SIZE << order; 2566 req->r_num_caps = num_entries + 1; 2567 req->r_args.readdir.max_entries = cpu_to_le32(num_entries); 2568 req->r_args.readdir.max_bytes = cpu_to_le32(opt->max_readdir_bytes); 2569 return 0; 2570 } 2571 2572 /* 2573 * Create an mds request. 2574 */ 2575 struct ceph_mds_request * 2576 ceph_mdsc_create_request(struct ceph_mds_client *mdsc, int op, int mode) 2577 { 2578 struct ceph_mds_request *req; 2579 2580 req = kmem_cache_zalloc(ceph_mds_request_cachep, GFP_NOFS); 2581 if (!req) 2582 return ERR_PTR(-ENOMEM); 2583 2584 mutex_init(&req->r_fill_mutex); 2585 req->r_mdsc = mdsc; 2586 req->r_started = jiffies; 2587 req->r_start_latency = ktime_get(); 2588 req->r_resend_mds = -1; 2589 INIT_LIST_HEAD(&req->r_unsafe_dir_item); 2590 INIT_LIST_HEAD(&req->r_unsafe_target_item); 2591 req->r_fmode = -1; 2592 req->r_feature_needed = -1; 2593 kref_init(&req->r_kref); 2594 RB_CLEAR_NODE(&req->r_node); 2595 INIT_LIST_HEAD(&req->r_wait); 2596 init_completion(&req->r_completion); 2597 init_completion(&req->r_safe_completion); 2598 INIT_LIST_HEAD(&req->r_unsafe_item); 2599 2600 ktime_get_coarse_real_ts64(&req->r_stamp); 2601 2602 req->r_op = op; 2603 req->r_direct_mode = mode; 2604 return req; 2605 } 2606 2607 /* 2608 * return oldest (lowest) request, tid in request tree, 0 if none. 2609 * 2610 * called under mdsc->mutex. 
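 */

/*
 * Hedged usage sketch of the request lifecycle handled by the helpers
 * above (illustrative only; modeled on the getattr path, which sets more
 * fields than shown, and example_send_getattr is an invented name):
 */
static int example_send_getattr(struct ceph_mds_client *mdsc,
				struct inode *inode)
{
	struct ceph_mds_request *req;
	int err;

	req = ceph_mdsc_create_request(mdsc, CEPH_MDS_OP_GETATTR,
				       USE_ANY_MDS);
	if (IS_ERR(req))
		return PTR_ERR(req);
	req->r_inode = inode;
	ihold(inode);			/* the request owns an inode ref */
	req->r_num_caps = 1;
	err = ceph_mdsc_do_request(mdsc, NULL, req);
	ceph_mdsc_put_request(req);	/* drops the request and its refs */
	return err;
}

/*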
2611 */ 2612 static struct ceph_mds_request *__get_oldest_req(struct ceph_mds_client *mdsc) 2613 { 2614 if (RB_EMPTY_ROOT(&mdsc->request_tree)) 2615 return NULL; 2616 return rb_entry(rb_first(&mdsc->request_tree), 2617 struct ceph_mds_request, r_node); 2618 } 2619 2620 static inline u64 __get_oldest_tid(struct ceph_mds_client *mdsc) 2621 { 2622 return mdsc->oldest_tid; 2623 } 2624 2625 #if IS_ENABLED(CONFIG_FS_ENCRYPTION) 2626 static u8 *get_fscrypt_altname(const struct ceph_mds_request *req, u32 *plen) 2627 { 2628 struct inode *dir = req->r_parent; 2629 struct dentry *dentry = req->r_dentry; 2630 const struct qstr *name = req->r_dname; 2631 u8 *cryptbuf = NULL; 2632 u32 len = 0; 2633 int ret = 0; 2634 2635 /* only encode if we have parent and dentry */ 2636 if (!dir || !dentry) 2637 goto success; 2638 2639 /* No-op unless this is encrypted */ 2640 if (!IS_ENCRYPTED(dir)) 2641 goto success; 2642 2643 ret = ceph_fscrypt_prepare_readdir(dir); 2644 if (ret < 0) 2645 return ERR_PTR(ret); 2646 2647 /* No key? Just ignore it. */ 2648 if (!fscrypt_has_encryption_key(dir)) 2649 goto success; 2650 2651 if (!name) 2652 name = &dentry->d_name; 2653 2654 if (!fscrypt_fname_encrypted_size(dir, name->len, NAME_MAX, &len)) { 2655 WARN_ON_ONCE(1); 2656 return ERR_PTR(-ENAMETOOLONG); 2657 } 2658 2659 /* No need to append altname if name is short enough */ 2660 if (len <= CEPH_NOHASH_NAME_MAX) { 2661 len = 0; 2662 goto success; 2663 } 2664 2665 cryptbuf = kmalloc(len, GFP_KERNEL); 2666 if (!cryptbuf) 2667 return ERR_PTR(-ENOMEM); 2668 2669 ret = fscrypt_fname_encrypt(dir, name, cryptbuf, len); 2670 if (ret) { 2671 kfree(cryptbuf); 2672 return ERR_PTR(ret); 2673 } 2674 success: 2675 *plen = len; 2676 return cryptbuf; 2677 } 2678 #else 2679 static u8 *get_fscrypt_altname(const struct ceph_mds_request *req, u32 *plen) 2680 { 2681 *plen = 0; 2682 return NULL; 2683 } 2684 #endif 2685 2686 /** 2687 * ceph_mdsc_build_path - build a path string to a given dentry 2688 * @mdsc: mds client 2689 * @dentry: dentry to which path should be built 2690 * @path_info: output path, length, base ino+snap, and freepath ownership flag 2691 * @for_wire: is this path going to be sent to the MDS? 2692 * 2693 * Build a string that represents the path to the dentry. This is mostly called 2694 * for two different purposes: 2695 * 2696 * 1) we need to build a path string to send to the MDS (for_wire == true) 2697 * 2) we need a path string for local presentation (e.g. debugfs) 2698 * (for_wire == false) 2699 * 2700 * The path is built in reverse, starting with the dentry. Walk back up toward 2701 * the root, building the path until the first non-snapped inode is reached 2702 * (for_wire) or the root inode is reached (!for_wire). 2703 * 2704 * Encode hidden .snap dirs as a double /, i.e. 
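 * (A hedged usage sketch, kept as comment text since we are mid-docblock;
 * a typical wire caller does roughly:
 *
 *	struct ceph_path_info info;
 *	char *path = ceph_mdsc_build_path(mdsc, dentry, &info, 1);
 *	if (IS_ERR(path))
 *		return PTR_ERR(path);
 *	// ...encode info.path / info.pathlen / info.vino.ino...
 *	ceph_mdsc_free_path_info(&info);
 *
 * ceph_mdsc_free_path_info() being the release helper used later in this
 * file.)  The snapdir encoding maps, for example,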
2705 * foo/.snap/bar -> foo//bar 2706 */ 2707 char *ceph_mdsc_build_path(struct ceph_mds_client *mdsc, struct dentry *dentry, 2708 struct ceph_path_info *path_info, int for_wire) 2709 { 2710 struct ceph_client *cl = mdsc->fsc->client; 2711 struct dentry *cur; 2712 struct inode *inode; 2713 char *path; 2714 int pos; 2715 unsigned seq; 2716 u64 base; 2717 2718 if (!dentry) 2719 return ERR_PTR(-EINVAL); 2720 2721 path = __getname(); 2722 if (!path) 2723 return ERR_PTR(-ENOMEM); 2724 retry: 2725 pos = PATH_MAX - 1; 2726 path[pos] = '\0'; 2727 2728 seq = read_seqbegin(&rename_lock); 2729 cur = dget(dentry); 2730 for (;;) { 2731 struct dentry *parent; 2732 2733 spin_lock(&cur->d_lock); 2734 inode = d_inode(cur); 2735 if (inode && ceph_snap(inode) == CEPH_SNAPDIR) { 2736 doutc(cl, "path+%d: %p SNAPDIR\n", pos, cur); 2737 spin_unlock(&cur->d_lock); 2738 parent = dget_parent(cur); 2739 } else if (for_wire && inode && dentry != cur && 2740 ceph_snap(inode) == CEPH_NOSNAP) { 2741 spin_unlock(&cur->d_lock); 2742 pos++; /* get rid of any prepended '/' */ 2743 break; 2744 } else if (!for_wire || !IS_ENCRYPTED(d_inode(cur->d_parent))) { 2745 pos -= cur->d_name.len; 2746 if (pos < 0) { 2747 spin_unlock(&cur->d_lock); 2748 break; 2749 } 2750 memcpy(path + pos, cur->d_name.name, cur->d_name.len); 2751 spin_unlock(&cur->d_lock); 2752 parent = dget_parent(cur); 2753 } else { 2754 int len, ret; 2755 char buf[NAME_MAX]; 2756 2757 /* 2758 * Proactively copy name into buf, in case we need to 2759 * present it as-is. 2760 */ 2761 memcpy(buf, cur->d_name.name, cur->d_name.len); 2762 len = cur->d_name.len; 2763 spin_unlock(&cur->d_lock); 2764 parent = dget_parent(cur); 2765 2766 ret = ceph_fscrypt_prepare_readdir(d_inode(parent)); 2767 if (ret < 0) { 2768 dput(parent); 2769 dput(cur); 2770 return ERR_PTR(ret); 2771 } 2772 2773 if (fscrypt_has_encryption_key(d_inode(parent))) { 2774 len = ceph_encode_encrypted_dname(d_inode(parent), 2775 buf, len); 2776 if (len < 0) { 2777 dput(parent); 2778 dput(cur); 2779 return ERR_PTR(len); 2780 } 2781 } 2782 pos -= len; 2783 if (pos < 0) { 2784 dput(parent); 2785 break; 2786 } 2787 memcpy(path + pos, buf, len); 2788 } 2789 dput(cur); 2790 cur = parent; 2791 2792 /* Are we at the root? */ 2793 if (IS_ROOT(cur)) 2794 break; 2795 2796 /* Are we out of buffer? */ 2797 if (--pos < 0) 2798 break; 2799 2800 path[pos] = '/'; 2801 } 2802 inode = d_inode(cur); 2803 base = inode ? ceph_ino(inode) : 0; 2804 dput(cur); 2805 2806 if (read_seqretry(&rename_lock, seq)) 2807 goto retry; 2808 2809 if (pos < 0) { 2810 /* 2811 * The path is longer than PATH_MAX and this function 2812 * cannot ever succeed. Creating paths that long is 2813 * possible with Ceph, but Linux cannot use them. 
2814 */ 2815 return ERR_PTR(-ENAMETOOLONG); 2816 } 2817 2818 /* Initialize the output structure */ 2819 memset(path_info, 0, sizeof(*path_info)); 2820 2821 path_info->vino.ino = base; 2822 path_info->pathlen = PATH_MAX - 1 - pos; 2823 path_info->path = path + pos; 2824 path_info->freepath = true; 2825 2826 /* Set snap from dentry if available */ 2827 if (d_inode(dentry)) 2828 path_info->vino.snap = ceph_snap(d_inode(dentry)); 2829 else 2830 path_info->vino.snap = CEPH_NOSNAP; 2831 2832 doutc(cl, "on %p %d built %llx '%.*s'\n", dentry, d_count(dentry), 2833 base, PATH_MAX - 1 - pos, path + pos); 2834 return path + pos; 2835 } 2836 2837 static int build_dentry_path(struct ceph_mds_client *mdsc, struct dentry *dentry, 2838 struct inode *dir, struct ceph_path_info *path_info, 2839 bool parent_locked) 2840 { 2841 char *path; 2842 2843 rcu_read_lock(); 2844 if (!dir) 2845 dir = d_inode_rcu(dentry->d_parent); 2846 if (dir && parent_locked && ceph_snap(dir) == CEPH_NOSNAP && 2847 !IS_ENCRYPTED(dir)) { 2848 path_info->vino.ino = ceph_ino(dir); 2849 path_info->vino.snap = ceph_snap(dir); 2850 rcu_read_unlock(); 2851 path_info->path = dentry->d_name.name; 2852 path_info->pathlen = dentry->d_name.len; 2853 path_info->freepath = false; 2854 return 0; 2855 } 2856 rcu_read_unlock(); 2857 path = ceph_mdsc_build_path(mdsc, dentry, path_info, 1); 2858 if (IS_ERR(path)) 2859 return PTR_ERR(path); 2860 /* 2861 * ceph_mdsc_build_path already fills path_info, including snap handling. 2862 */ 2863 return 0; 2864 } 2865 2866 static int build_inode_path(struct inode *inode, struct ceph_path_info *path_info) 2867 { 2868 struct ceph_mds_client *mdsc = ceph_sb_to_mdsc(inode->i_sb); 2869 struct dentry *dentry; 2870 char *path; 2871 2872 if (ceph_snap(inode) == CEPH_NOSNAP) { 2873 path_info->vino.ino = ceph_ino(inode); 2874 path_info->vino.snap = ceph_snap(inode); 2875 path_info->pathlen = 0; 2876 path_info->freepath = false; 2877 return 0; 2878 } 2879 dentry = d_find_alias(inode); 2880 path = ceph_mdsc_build_path(mdsc, dentry, path_info, 1); 2881 dput(dentry); 2882 if (IS_ERR(path)) 2883 return PTR_ERR(path); 2884 /* 2885 * ceph_mdsc_build_path already fills path_info, including snap from dentry. 2886 * Override with inode's snap since that's what this function is for. 2887 */ 2888 path_info->vino.snap = ceph_snap(inode); 2889 return 0; 2890 } 2891 2892 /* 2893 * request arguments may be specified via an inode *, a dentry *, or 2894 * an explicit ino+path. 2895 */ 2896 static int set_request_path_attr(struct ceph_mds_client *mdsc, struct inode *rinode, 2897 struct dentry *rdentry, struct inode *rdiri, 2898 const char *rpath, u64 rino, 2899 struct ceph_path_info *path_info, 2900 bool parent_locked) 2901 { 2902 struct ceph_client *cl = mdsc->fsc->client; 2903 int r = 0; 2904 2905 /* Initialize the output structure */ 2906 memset(path_info, 0, sizeof(*path_info)); 2907 2908 if (rinode) { 2909 r = build_inode_path(rinode, path_info); 2910 doutc(cl, " inode %p %llx.%llx\n", rinode, ceph_ino(rinode), 2911 ceph_snap(rinode)); 2912 } else if (rdentry) { 2913 r = build_dentry_path(mdsc, rdentry, rdiri, path_info, parent_locked); 2914 doutc(cl, " dentry %p %llx/%.*s\n", rdentry, path_info->vino.ino, 2915 path_info->pathlen, path_info->path); 2916 } else if (rpath || rino) { 2917 path_info->vino.ino = rino; 2918 path_info->vino.snap = CEPH_NOSNAP; 2919 path_info->path = rpath; 2920 path_info->pathlen = rpath ? 
strlen(rpath) : 0; 2921 path_info->freepath = false; 2922 2923 doutc(cl, " path %.*s\n", path_info->pathlen, rpath); 2924 } 2925 2926 return r; 2927 } 2928 2929 static void encode_mclientrequest_tail(void **p, 2930 const struct ceph_mds_request *req) 2931 { 2932 struct ceph_timespec ts; 2933 int i; 2934 2935 ceph_encode_timespec64(&ts, &req->r_stamp); 2936 ceph_encode_copy(p, &ts, sizeof(ts)); 2937 2938 /* v4: gid_list */ 2939 ceph_encode_32(p, req->r_cred->group_info->ngroups); 2940 for (i = 0; i < req->r_cred->group_info->ngroups; i++) 2941 ceph_encode_64(p, from_kgid(&init_user_ns, 2942 req->r_cred->group_info->gid[i])); 2943 2944 /* v5: altname */ 2945 ceph_encode_32(p, req->r_altname_len); 2946 ceph_encode_copy(p, req->r_altname, req->r_altname_len); 2947 2948 /* v6: fscrypt_auth and fscrypt_file */ 2949 if (req->r_fscrypt_auth) { 2950 u32 authlen = ceph_fscrypt_auth_len(req->r_fscrypt_auth); 2951 2952 ceph_encode_32(p, authlen); 2953 ceph_encode_copy(p, req->r_fscrypt_auth, authlen); 2954 } else { 2955 ceph_encode_32(p, 0); 2956 } 2957 if (test_bit(CEPH_MDS_R_FSCRYPT_FILE, &req->r_req_flags)) { 2958 ceph_encode_32(p, sizeof(__le64)); 2959 ceph_encode_64(p, req->r_fscrypt_file); 2960 } else { 2961 ceph_encode_32(p, 0); 2962 } 2963 } 2964 2965 static inline u16 mds_supported_head_version(struct ceph_mds_session *session) 2966 { 2967 if (!test_bit(CEPHFS_FEATURE_32BITS_RETRY_FWD, &session->s_features)) 2968 return 1; 2969 2970 if (!test_bit(CEPHFS_FEATURE_HAS_OWNER_UIDGID, &session->s_features)) 2971 return 2; 2972 2973 return CEPH_MDS_REQUEST_HEAD_VERSION; 2974 } 2975 2976 static struct ceph_mds_request_head_legacy * 2977 find_legacy_request_head(void *p, u64 features) 2978 { 2979 bool legacy = !(features & CEPH_FEATURE_FS_BTIME); 2980 struct ceph_mds_request_head *head; 2981 2982 if (legacy) 2983 return (struct ceph_mds_request_head_legacy *)p; 2984 head = (struct ceph_mds_request_head *)p; 2985 return (struct ceph_mds_request_head_legacy *)&head->oldest_client_tid; 2986 } 2987 2988 /* 2989 * called under mdsc->mutex 2990 */ 2991 static struct ceph_msg *create_request_message(struct ceph_mds_session *session, 2992 struct ceph_mds_request *req, 2993 bool drop_cap_releases) 2994 { 2995 int mds = session->s_mds; 2996 struct ceph_mds_client *mdsc = session->s_mdsc; 2997 struct ceph_client *cl = mdsc->fsc->client; 2998 struct ceph_msg *msg; 2999 struct ceph_mds_request_head_legacy *lhead; 3000 struct ceph_path_info path_info1 = {0}; 3001 struct ceph_path_info path_info2 = {0}; 3002 struct dentry *old_dentry = NULL; 3003 int len; 3004 u16 releases; 3005 void *p, *end; 3006 int ret; 3007 bool legacy = !(session->s_con.peer_features & CEPH_FEATURE_FS_BTIME); 3008 u16 request_head_version = mds_supported_head_version(session); 3009 kuid_t caller_fsuid = req->r_cred->fsuid; 3010 kgid_t caller_fsgid = req->r_cred->fsgid; 3011 bool parent_locked = test_bit(CEPH_MDS_R_PARENT_LOCKED, &req->r_req_flags); 3012 3013 ret = set_request_path_attr(mdsc, req->r_inode, req->r_dentry, 3014 req->r_parent, req->r_path1, req->r_ino1.ino, 3015 &path_info1, parent_locked); 3016 if (ret < 0) { 3017 msg = ERR_PTR(ret); 3018 goto out; 3019 } 3020 3021 /* 3022 * When the parent directory's i_rwsem is *not* locked, req->r_parent may 3023 * have become stale (e.g. after a concurrent rename) between the time the 3024 * dentry was looked up and now. 
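	 * (Concrete illustration with hypothetical inode numbers: the
	 * dentry was found under directory A, ino 0x100; a concurrent
	 * rename then moved it under directory B, ino 0x200, before the
	 * path was encoded, so the encoded parent is B while r_parent
	 * still points at A.)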
	 * If we detect that the stored r_parent does not match the inode
	 * number we just encoded for the request, switch to the correct
	 * inode so that the MDS receives a valid parent reference.
	 */
	if (!parent_locked && req->r_parent && path_info1.vino.ino &&
	    ceph_ino(req->r_parent) != path_info1.vino.ino) {
		struct inode *old_parent = req->r_parent;
		struct inode *correct_dir = ceph_get_inode(mdsc->fsc->sb, path_info1.vino, NULL);
		if (!IS_ERR(correct_dir)) {
			WARN_ONCE(1, "ceph: r_parent mismatch (had %llx wanted %llx) - updating\n",
				  ceph_ino(old_parent), path_info1.vino.ino);
			/*
			 * Transfer CEPH_CAP_PIN from the old parent to the new one.
			 * The pin was taken earlier in ceph_mdsc_submit_request().
			 */
			ceph_put_cap_refs(ceph_inode(old_parent), CEPH_CAP_PIN);
			iput(old_parent);
			req->r_parent = correct_dir;
			ceph_get_cap_refs(ceph_inode(req->r_parent), CEPH_CAP_PIN);
		}
	}

	/* If r_old_dentry is set, then assume that its parent is locked */
	if (req->r_old_dentry &&
	    !(req->r_old_dentry->d_flags & DCACHE_DISCONNECTED))
		old_dentry = req->r_old_dentry;
	ret = set_request_path_attr(mdsc, NULL, old_dentry,
				    req->r_old_dentry_dir,
				    req->r_path2, req->r_ino2.ino,
				    &path_info2, true);
	if (ret < 0) {
		msg = ERR_PTR(ret);
		goto out_free1;
	}

	req->r_altname = get_fscrypt_altname(req, &req->r_altname_len);
	if (IS_ERR(req->r_altname)) {
		msg = ERR_CAST(req->r_altname);
		req->r_altname = NULL;
		goto out_free2;
	}

	/*
	 * Old MDSs, which lack the 32-bit retry/fwd feature, copy the raw
	 * bytes of the head directly when decoding a request, while new
	 * MDSs decode the head according to its version member, so the
	 * encoding below must stay compatible with both.
3072 */ 3073 if (legacy) 3074 len = sizeof(struct ceph_mds_request_head_legacy); 3075 else if (request_head_version == 1) 3076 len = offsetofend(struct ceph_mds_request_head, args); 3077 else if (request_head_version == 2) 3078 len = offsetofend(struct ceph_mds_request_head, ext_num_fwd); 3079 else 3080 len = sizeof(struct ceph_mds_request_head); 3081 3082 /* filepaths */ 3083 len += 2 * (1 + sizeof(u32) + sizeof(u64)); 3084 len += path_info1.pathlen + path_info2.pathlen; 3085 3086 /* cap releases */ 3087 len += sizeof(struct ceph_mds_request_release) * 3088 (!!req->r_inode_drop + !!req->r_dentry_drop + 3089 !!req->r_old_inode_drop + !!req->r_old_dentry_drop); 3090 3091 if (req->r_dentry_drop) 3092 len += path_info1.pathlen; 3093 if (req->r_old_dentry_drop) 3094 len += path_info2.pathlen; 3095 3096 /* MClientRequest tail */ 3097 3098 /* req->r_stamp */ 3099 len += sizeof(struct ceph_timespec); 3100 3101 /* gid list */ 3102 len += sizeof(u32) + (sizeof(u64) * req->r_cred->group_info->ngroups); 3103 3104 /* alternate name */ 3105 len += sizeof(u32) + req->r_altname_len; 3106 3107 /* fscrypt_auth */ 3108 len += sizeof(u32); // fscrypt_auth 3109 if (req->r_fscrypt_auth) 3110 len += ceph_fscrypt_auth_len(req->r_fscrypt_auth); 3111 3112 /* fscrypt_file */ 3113 len += sizeof(u32); 3114 if (test_bit(CEPH_MDS_R_FSCRYPT_FILE, &req->r_req_flags)) 3115 len += sizeof(__le64); 3116 3117 msg = ceph_msg_new2(CEPH_MSG_CLIENT_REQUEST, len, 1, GFP_NOFS, false); 3118 if (!msg) { 3119 msg = ERR_PTR(-ENOMEM); 3120 goto out_free2; 3121 } 3122 3123 msg->hdr.tid = cpu_to_le64(req->r_tid); 3124 3125 lhead = find_legacy_request_head(msg->front.iov_base, 3126 session->s_con.peer_features); 3127 3128 if ((req->r_mnt_idmap != &nop_mnt_idmap) && 3129 !test_bit(CEPHFS_FEATURE_HAS_OWNER_UIDGID, &session->s_features)) { 3130 WARN_ON_ONCE(!IS_CEPH_MDS_OP_NEWINODE(req->r_op)); 3131 3132 if (enable_unsafe_idmap) { 3133 pr_warn_once_client(cl, 3134 "idmapped mount is used and CEPHFS_FEATURE_HAS_OWNER_UIDGID" 3135 " is not supported by MDS. UID/GID-based restrictions may" 3136 " not work properly.\n"); 3137 3138 caller_fsuid = from_vfsuid(req->r_mnt_idmap, &init_user_ns, 3139 VFSUIDT_INIT(req->r_cred->fsuid)); 3140 caller_fsgid = from_vfsgid(req->r_mnt_idmap, &init_user_ns, 3141 VFSGIDT_INIT(req->r_cred->fsgid)); 3142 } else { 3143 pr_err_ratelimited_client(cl, 3144 "idmapped mount is used and CEPHFS_FEATURE_HAS_OWNER_UIDGID" 3145 " is not supported by MDS. Fail request with -EIO.\n"); 3146 3147 ret = -EIO; 3148 goto out_err; 3149 } 3150 } 3151 3152 /* 3153 * The ceph_mds_request_head_legacy didn't contain a version field, and 3154 * one was added when we moved the message version from 3->4. 
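	 *
	 * Summarizing the branches below: msg v3 -> legacy head (no
	 * version field); msg v4 -> head v1 (ends at args); msg v6 ->
	 * head v2 (ends at ext_num_fwd) or, when the peer supports owner
	 * uid/gid, the full current head.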
3155 */ 3156 if (legacy) { 3157 msg->hdr.version = cpu_to_le16(3); 3158 p = msg->front.iov_base + sizeof(*lhead); 3159 } else if (request_head_version == 1) { 3160 struct ceph_mds_request_head *nhead = msg->front.iov_base; 3161 3162 msg->hdr.version = cpu_to_le16(4); 3163 nhead->version = cpu_to_le16(1); 3164 p = msg->front.iov_base + offsetofend(struct ceph_mds_request_head, args); 3165 } else if (request_head_version == 2) { 3166 struct ceph_mds_request_head *nhead = msg->front.iov_base; 3167 3168 msg->hdr.version = cpu_to_le16(6); 3169 nhead->version = cpu_to_le16(2); 3170 3171 p = msg->front.iov_base + offsetofend(struct ceph_mds_request_head, ext_num_fwd); 3172 } else { 3173 struct ceph_mds_request_head *nhead = msg->front.iov_base; 3174 kuid_t owner_fsuid; 3175 kgid_t owner_fsgid; 3176 3177 msg->hdr.version = cpu_to_le16(6); 3178 nhead->version = cpu_to_le16(CEPH_MDS_REQUEST_HEAD_VERSION); 3179 nhead->struct_len = cpu_to_le32(sizeof(struct ceph_mds_request_head)); 3180 3181 if (IS_CEPH_MDS_OP_NEWINODE(req->r_op)) { 3182 owner_fsuid = from_vfsuid(req->r_mnt_idmap, &init_user_ns, 3183 VFSUIDT_INIT(req->r_cred->fsuid)); 3184 owner_fsgid = from_vfsgid(req->r_mnt_idmap, &init_user_ns, 3185 VFSGIDT_INIT(req->r_cred->fsgid)); 3186 nhead->owner_uid = cpu_to_le32(from_kuid(&init_user_ns, owner_fsuid)); 3187 nhead->owner_gid = cpu_to_le32(from_kgid(&init_user_ns, owner_fsgid)); 3188 } else { 3189 nhead->owner_uid = cpu_to_le32(-1); 3190 nhead->owner_gid = cpu_to_le32(-1); 3191 } 3192 3193 p = msg->front.iov_base + sizeof(*nhead); 3194 } 3195 3196 end = msg->front.iov_base + msg->front.iov_len; 3197 3198 lhead->mdsmap_epoch = cpu_to_le32(mdsc->mdsmap->m_epoch); 3199 lhead->op = cpu_to_le32(req->r_op); 3200 lhead->caller_uid = cpu_to_le32(from_kuid(&init_user_ns, 3201 caller_fsuid)); 3202 lhead->caller_gid = cpu_to_le32(from_kgid(&init_user_ns, 3203 caller_fsgid)); 3204 lhead->ino = cpu_to_le64(req->r_deleg_ino); 3205 lhead->args = req->r_args; 3206 3207 ceph_encode_filepath(&p, end, path_info1.vino.ino, path_info1.path); 3208 ceph_encode_filepath(&p, end, path_info2.vino.ino, path_info2.path); 3209 3210 /* make note of release offset, in case we need to replay */ 3211 req->r_request_release_offset = p - msg->front.iov_base; 3212 3213 /* cap releases */ 3214 releases = 0; 3215 if (req->r_inode_drop) 3216 releases += ceph_encode_inode_release(&p, 3217 req->r_inode ? 
					req->r_inode : d_inode(req->r_dentry),
				mds, req->r_inode_drop, req->r_inode_unless,
				req->r_op == CEPH_MDS_OP_READDIR);
	if (req->r_dentry_drop) {
		ret = ceph_encode_dentry_release(&p, req->r_dentry,
				req->r_parent, mds, req->r_dentry_drop,
				req->r_dentry_unless);
		if (ret < 0)
			goto out_err;
		releases += ret;
	}
	if (req->r_old_dentry_drop) {
		ret = ceph_encode_dentry_release(&p, req->r_old_dentry,
				req->r_old_dentry_dir, mds,
				req->r_old_dentry_drop,
				req->r_old_dentry_unless);
		if (ret < 0)
			goto out_err;
		releases += ret;
	}
	if (req->r_old_inode_drop)
		releases += ceph_encode_inode_release(&p,
				d_inode(req->r_old_dentry),
				mds, req->r_old_inode_drop, req->r_old_inode_unless, 0);

	if (drop_cap_releases) {
		releases = 0;
		p = msg->front.iov_base + req->r_request_release_offset;
	}

	lhead->num_releases = cpu_to_le16(releases);

	encode_mclientrequest_tail(&p, req);

	if (WARN_ON_ONCE(p > end)) {
		ceph_msg_put(msg);
		msg = ERR_PTR(-ERANGE);
		goto out_free2;
	}

	msg->front.iov_len = p - msg->front.iov_base;
	msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);

	if (req->r_pagelist) {
		struct ceph_pagelist *pagelist = req->r_pagelist;
		ceph_msg_data_add_pagelist(msg, pagelist);
		msg->hdr.data_len = cpu_to_le32(pagelist->length);
	} else {
		msg->hdr.data_len = 0;
	}

	msg->hdr.data_off = cpu_to_le16(0);

out_free2:
	ceph_mdsc_free_path_info(&path_info2);
out_free1:
	ceph_mdsc_free_path_info(&path_info1);
out:
	return msg;
out_err:
	ceph_msg_put(msg);
	msg = ERR_PTR(ret);
	goto out_free2;
}

/*
 * called under mdsc->mutex if error, under no mutex if
 * success.
 */
static void complete_request(struct ceph_mds_client *mdsc,
			     struct ceph_mds_request *req)
{
	req->r_end_latency = ktime_get();

	if (req->r_callback)
		req->r_callback(mdsc, req);
	complete_all(&req->r_completion);
}

/*
 * called under mdsc->mutex
 */
static int __prepare_send_request(struct ceph_mds_session *session,
				  struct ceph_mds_request *req,
				  bool drop_cap_releases)
{
	int mds = session->s_mds;
	struct ceph_mds_client *mdsc = session->s_mdsc;
	struct ceph_client *cl = mdsc->fsc->client;
	struct ceph_mds_request_head_legacy *lhead;
	struct ceph_mds_request_head *nhead;
	struct ceph_msg *msg;
	int flags = 0, old_max_retry;
	bool old_version = !test_bit(CEPHFS_FEATURE_32BITS_RETRY_FWD,
				     &session->s_features);

	/*
	 * Avoid infinite retrying after the retry counter overflows. The
	 * client increments the retry count on every resend; if the MDS is
	 * an old version, the on-wire num_retry field is only 8 bits wide,
	 * so we limit ourselves to at most 256 attempts.
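	 * (Worked example matching the computation below: num_retry is a
	 * one-byte field for old MDSs, so sizeof_field() == 1 and the cap
	 * works out to 1 << (1 * BITS_PER_BYTE) == 256.)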
3317 */ 3318 if (req->r_attempts) { 3319 old_max_retry = sizeof_field(struct ceph_mds_request_head, 3320 num_retry); 3321 old_max_retry = 1 << (old_max_retry * BITS_PER_BYTE); 3322 if ((old_version && req->r_attempts >= old_max_retry) || 3323 ((uint32_t)req->r_attempts >= U32_MAX)) { 3324 pr_warn_ratelimited_client(cl, "request tid %llu seq overflow\n", 3325 req->r_tid); 3326 return -EMULTIHOP; 3327 } 3328 } 3329 3330 req->r_attempts++; 3331 if (req->r_inode) { 3332 struct ceph_cap *cap = 3333 ceph_get_cap_for_mds(ceph_inode(req->r_inode), mds); 3334 3335 if (cap) 3336 req->r_sent_on_mseq = cap->mseq; 3337 else 3338 req->r_sent_on_mseq = -1; 3339 } 3340 doutc(cl, "%p tid %lld %s (attempt %d)\n", req, req->r_tid, 3341 ceph_mds_op_name(req->r_op), req->r_attempts); 3342 3343 if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) { 3344 void *p; 3345 3346 /* 3347 * Replay. Do not regenerate message (and rebuild 3348 * paths, etc.); just use the original message. 3349 * Rebuilding paths will break for renames because 3350 * d_move mangles the src name. 3351 */ 3352 msg = req->r_request; 3353 lhead = find_legacy_request_head(msg->front.iov_base, 3354 session->s_con.peer_features); 3355 3356 flags = le32_to_cpu(lhead->flags); 3357 flags |= CEPH_MDS_FLAG_REPLAY; 3358 lhead->flags = cpu_to_le32(flags); 3359 3360 if (req->r_target_inode) 3361 lhead->ino = cpu_to_le64(ceph_ino(req->r_target_inode)); 3362 3363 lhead->num_retry = req->r_attempts - 1; 3364 if (!old_version) { 3365 nhead = (struct ceph_mds_request_head*)msg->front.iov_base; 3366 nhead->ext_num_retry = cpu_to_le32(req->r_attempts - 1); 3367 } 3368 3369 /* remove cap/dentry releases from message */ 3370 lhead->num_releases = 0; 3371 3372 p = msg->front.iov_base + req->r_request_release_offset; 3373 encode_mclientrequest_tail(&p, req); 3374 3375 msg->front.iov_len = p - msg->front.iov_base; 3376 msg->hdr.front_len = cpu_to_le32(msg->front.iov_len); 3377 return 0; 3378 } 3379 3380 if (req->r_request) { 3381 ceph_msg_put(req->r_request); 3382 req->r_request = NULL; 3383 } 3384 msg = create_request_message(session, req, drop_cap_releases); 3385 if (IS_ERR(msg)) { 3386 req->r_err = PTR_ERR(msg); 3387 return PTR_ERR(msg); 3388 } 3389 req->r_request = msg; 3390 3391 lhead = find_legacy_request_head(msg->front.iov_base, 3392 session->s_con.peer_features); 3393 lhead->oldest_client_tid = cpu_to_le64(__get_oldest_tid(mdsc)); 3394 if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) 3395 flags |= CEPH_MDS_FLAG_REPLAY; 3396 if (test_bit(CEPH_MDS_R_ASYNC, &req->r_req_flags)) 3397 flags |= CEPH_MDS_FLAG_ASYNC; 3398 if (req->r_parent) 3399 flags |= CEPH_MDS_FLAG_WANT_DENTRY; 3400 lhead->flags = cpu_to_le32(flags); 3401 lhead->num_fwd = req->r_num_fwd; 3402 lhead->num_retry = req->r_attempts - 1; 3403 if (!old_version) { 3404 nhead = (struct ceph_mds_request_head*)msg->front.iov_base; 3405 nhead->ext_num_fwd = cpu_to_le32(req->r_num_fwd); 3406 nhead->ext_num_retry = cpu_to_le32(req->r_attempts - 1); 3407 } 3408 3409 doutc(cl, " r_parent = %p\n", req->r_parent); 3410 return 0; 3411 } 3412 3413 /* 3414 * called under mdsc->mutex 3415 */ 3416 static int __send_request(struct ceph_mds_session *session, 3417 struct ceph_mds_request *req, 3418 bool drop_cap_releases) 3419 { 3420 int err; 3421 3422 err = __prepare_send_request(session, req, drop_cap_releases); 3423 if (!err) { 3424 ceph_msg_get(req->r_request); 3425 ceph_con_send(&session->s_con, req->r_request); 3426 } 3427 3428 return err; 3429 } 3430 3431 /* 3432 * send request, or put it on the 
appropriate wait list. 3433 */ 3434 static void __do_request(struct ceph_mds_client *mdsc, 3435 struct ceph_mds_request *req) 3436 { 3437 struct ceph_client *cl = mdsc->fsc->client; 3438 struct ceph_mds_session *session = NULL; 3439 int mds = -1; 3440 int err = 0; 3441 bool random; 3442 3443 if (req->r_err || test_bit(CEPH_MDS_R_GOT_RESULT, &req->r_req_flags)) { 3444 if (test_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags)) 3445 __unregister_request(mdsc, req); 3446 return; 3447 } 3448 3449 if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_FENCE_IO) { 3450 doutc(cl, "metadata corrupted\n"); 3451 err = -EIO; 3452 goto finish; 3453 } 3454 if (req->r_timeout && 3455 time_after_eq(jiffies, req->r_started + req->r_timeout)) { 3456 doutc(cl, "timed out\n"); 3457 err = -ETIMEDOUT; 3458 goto finish; 3459 } 3460 if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_SHUTDOWN) { 3461 doutc(cl, "forced umount\n"); 3462 err = -EIO; 3463 goto finish; 3464 } 3465 if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_MOUNTING) { 3466 if (mdsc->mdsmap_err) { 3467 err = mdsc->mdsmap_err; 3468 doutc(cl, "mdsmap err %d\n", err); 3469 goto finish; 3470 } 3471 if (mdsc->mdsmap->m_epoch == 0) { 3472 doutc(cl, "no mdsmap, waiting for map\n"); 3473 list_add(&req->r_wait, &mdsc->waiting_for_map); 3474 return; 3475 } 3476 if (!(mdsc->fsc->mount_options->flags & 3477 CEPH_MOUNT_OPT_MOUNTWAIT) && 3478 !ceph_mdsmap_is_cluster_available(mdsc->mdsmap)) { 3479 err = -EHOSTUNREACH; 3480 goto finish; 3481 } 3482 } 3483 3484 put_request_session(req); 3485 3486 mds = __choose_mds(mdsc, req, &random); 3487 if (mds < 0 || 3488 ceph_mdsmap_get_state(mdsc->mdsmap, mds) < CEPH_MDS_STATE_ACTIVE) { 3489 if (test_bit(CEPH_MDS_R_ASYNC, &req->r_req_flags)) { 3490 err = -EJUKEBOX; 3491 goto finish; 3492 } 3493 doutc(cl, "no mds or not active, waiting for map\n"); 3494 list_add(&req->r_wait, &mdsc->waiting_for_map); 3495 return; 3496 } 3497 3498 /* get, open session */ 3499 session = __ceph_lookup_mds_session(mdsc, mds); 3500 if (!session) { 3501 session = register_session(mdsc, mds); 3502 if (IS_ERR(session)) { 3503 err = PTR_ERR(session); 3504 goto finish; 3505 } 3506 } 3507 req->r_session = ceph_get_mds_session(session); 3508 3509 doutc(cl, "mds%d session %p state %s\n", mds, session, 3510 ceph_session_state_name(session->s_state)); 3511 3512 /* 3513 * The old ceph will crash the MDSs when see unknown OPs 3514 */ 3515 if (req->r_feature_needed > 0 && 3516 !test_bit(req->r_feature_needed, &session->s_features)) { 3517 err = -EOPNOTSUPP; 3518 goto out_session; 3519 } 3520 3521 if (session->s_state != CEPH_MDS_SESSION_OPEN && 3522 session->s_state != CEPH_MDS_SESSION_HUNG) { 3523 /* 3524 * We cannot queue async requests since the caps and delegated 3525 * inodes are bound to the session. Just return -EJUKEBOX and 3526 * let the caller retry a sync request in that case. 3527 */ 3528 if (test_bit(CEPH_MDS_R_ASYNC, &req->r_req_flags)) { 3529 err = -EJUKEBOX; 3530 goto out_session; 3531 } 3532 3533 /* 3534 * If the session has been REJECTED, then return a hard error, 3535 * unless it's a CLEANRECOVER mount, in which case we'll queue 3536 * it to the mdsc queue. 
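		 * (Context note: CLEANRECOVER corresponds to the
		 * recover_session=clean mount option, where the client
		 * prefers to wait for a new mdsmap and recover rather than
		 * fail the request hard.)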
		 */
		if (session->s_state == CEPH_MDS_SESSION_REJECTED) {
			if (ceph_test_mount_opt(mdsc->fsc, CLEANRECOVER))
				list_add(&req->r_wait, &mdsc->waiting_for_map);
			else
				err = -EACCES;
			goto out_session;
		}

		if (session->s_state == CEPH_MDS_SESSION_NEW ||
		    session->s_state == CEPH_MDS_SESSION_CLOSING) {
			err = __open_session(mdsc, session);
			if (err)
				goto out_session;
			/* retry the same mds later */
			if (random)
				req->r_resend_mds = mds;
		}
		list_add(&req->r_wait, &session->s_waiting);
		goto out_session;
	}

	/* send request */
	req->r_resend_mds = -1;   /* forget any previous mds hint */

	if (req->r_request_started == 0)   /* note request start time */
		req->r_request_started = jiffies;

	/*
	 * For an async create we choose the auth MDS of the frag in the
	 * parent directory to send the request, and usually this works
	 * fine. But if the directory is migrated to another MDS before
	 * that MDS can handle the request, it will be forwarded.
	 *
	 * And then the auth cap will have changed.
	 */
	if (test_bit(CEPH_MDS_R_ASYNC, &req->r_req_flags) && req->r_num_fwd) {
		struct ceph_dentry_info *di = ceph_dentry(req->r_dentry);
		struct ceph_inode_info *ci;
		struct ceph_cap *cap;

		/*
		 * The request may be handled very quickly, before the new
		 * inode has been linked to the dentry. When forwarding the
		 * request we need to wait for ceph_finish_async_create() to
		 * finish; in theory it shouldn't be stuck for long or fail.
		 */
		if (!d_inode(req->r_dentry)) {
			err = wait_on_bit(&di->flags, CEPH_DENTRY_ASYNC_CREATE_BIT,
					  TASK_KILLABLE);
			if (err) {
				mutex_lock(&req->r_fill_mutex);
				set_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags);
				mutex_unlock(&req->r_fill_mutex);
				goto out_session;
			}
		}

		ci = ceph_inode(d_inode(req->r_dentry));

		spin_lock(&ci->i_ceph_lock);
		cap = ci->i_auth_cap;
		if (ci->i_ceph_flags & CEPH_I_ASYNC_CREATE && mds != cap->mds) {
			doutc(cl, "session changed for auth cap %d -> %d\n",
			      cap->session->s_mds, session->s_mds);

			/* Remove the auth cap from old session */
			spin_lock(&cap->session->s_cap_lock);
			cap->session->s_nr_caps--;
			list_del_init(&cap->session_caps);
			spin_unlock(&cap->session->s_cap_lock);

			/* Add the auth cap to the new session */
			cap->mds = mds;
			cap->session = session;
			spin_lock(&session->s_cap_lock);
			session->s_nr_caps++;
			list_add_tail(&cap->session_caps, &session->s_caps);
			spin_unlock(&session->s_cap_lock);

			change_auth_cap_ses(ci, session);
		}
		spin_unlock(&ci->i_ceph_lock);
	}

	err = __send_request(session, req, false);

out_session:
	ceph_put_mds_session(session);
finish:
	if (err) {
		doutc(cl, "early error %d\n", err);
		req->r_err = err;
		complete_request(mdsc, req);
		__unregister_request(mdsc, req);
	}
	return;
}

/*
 * called under mdsc->mutex
 */
static void __wake_requests(struct ceph_mds_client *mdsc,
			    struct list_head *head)
{
	struct ceph_client *cl = mdsc->fsc->client;
	struct ceph_mds_request *req;
	LIST_HEAD(tmp_list);

	list_splice_init(head, &tmp_list);

	while (!list_empty(&tmp_list)) {
		req = list_entry(tmp_list.next,
				 struct ceph_mds_request, r_wait);
		list_del_init(&req->r_wait);
doutc(cl, " wake request %p tid %llu\n", req, 3654 req->r_tid); 3655 __do_request(mdsc, req); 3656 } 3657 } 3658 3659 /* 3660 * Wake up threads with requests pending for @mds, so that they can 3661 * resubmit their requests to a possibly different mds. 3662 */ 3663 static void kick_requests(struct ceph_mds_client *mdsc, int mds) 3664 { 3665 struct ceph_client *cl = mdsc->fsc->client; 3666 struct ceph_mds_request *req; 3667 struct rb_node *p = rb_first(&mdsc->request_tree); 3668 3669 doutc(cl, "kick_requests mds%d\n", mds); 3670 while (p) { 3671 req = rb_entry(p, struct ceph_mds_request, r_node); 3672 p = rb_next(p); 3673 if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) 3674 continue; 3675 if (req->r_attempts > 0) 3676 continue; /* only new requests */ 3677 if (req->r_session && 3678 req->r_session->s_mds == mds) { 3679 doutc(cl, " kicking tid %llu\n", req->r_tid); 3680 list_del_init(&req->r_wait); 3681 __do_request(mdsc, req); 3682 } 3683 } 3684 } 3685 3686 int ceph_mdsc_submit_request(struct ceph_mds_client *mdsc, struct inode *dir, 3687 struct ceph_mds_request *req) 3688 { 3689 struct ceph_client *cl = mdsc->fsc->client; 3690 int err = 0; 3691 3692 /* take CAP_PIN refs for r_inode, r_parent, r_old_dentry */ 3693 if (req->r_inode) 3694 ceph_get_cap_refs(ceph_inode(req->r_inode), CEPH_CAP_PIN); 3695 if (req->r_parent) { 3696 struct ceph_inode_info *ci = ceph_inode(req->r_parent); 3697 int fmode = (req->r_op & CEPH_MDS_OP_WRITE) ? 3698 CEPH_FILE_MODE_WR : CEPH_FILE_MODE_RD; 3699 spin_lock(&ci->i_ceph_lock); 3700 ceph_take_cap_refs(ci, CEPH_CAP_PIN, false); 3701 __ceph_touch_fmode(ci, mdsc, fmode); 3702 spin_unlock(&ci->i_ceph_lock); 3703 } 3704 if (req->r_old_dentry_dir) 3705 ceph_get_cap_refs(ceph_inode(req->r_old_dentry_dir), 3706 CEPH_CAP_PIN); 3707 3708 if (req->r_inode) { 3709 err = ceph_wait_on_async_create(req->r_inode); 3710 if (err) { 3711 doutc(cl, "wait for async create returned: %d\n", err); 3712 return err; 3713 } 3714 } 3715 3716 if (!err && req->r_old_inode) { 3717 err = ceph_wait_on_async_create(req->r_old_inode); 3718 if (err) { 3719 doutc(cl, "wait for async create returned: %d\n", err); 3720 return err; 3721 } 3722 } 3723 3724 doutc(cl, "submit_request on %p for inode %p\n", req, dir); 3725 mutex_lock(&mdsc->mutex); 3726 __register_request(mdsc, req, dir); 3727 __do_request(mdsc, req); 3728 err = req->r_err; 3729 mutex_unlock(&mdsc->mutex); 3730 return err; 3731 } 3732 3733 int ceph_mdsc_wait_request(struct ceph_mds_client *mdsc, 3734 struct ceph_mds_request *req, 3735 ceph_mds_request_wait_callback_t wait_func) 3736 { 3737 struct ceph_client *cl = mdsc->fsc->client; 3738 int err; 3739 3740 /* wait */ 3741 doutc(cl, "do_request waiting\n"); 3742 if (wait_func) { 3743 err = wait_func(mdsc, req); 3744 } else { 3745 long timeleft = wait_for_completion_killable_timeout( 3746 &req->r_completion, 3747 ceph_timeout_jiffies(req->r_timeout)); 3748 if (timeleft > 0) 3749 err = 0; 3750 else if (!timeleft) 3751 err = -ETIMEDOUT; /* timed out */ 3752 else 3753 err = timeleft; /* killed */ 3754 } 3755 doutc(cl, "do_request waited, got %d\n", err); 3756 mutex_lock(&mdsc->mutex); 3757 3758 /* only abort if we didn't race with a real reply */ 3759 if (test_bit(CEPH_MDS_R_GOT_RESULT, &req->r_req_flags)) { 3760 err = le32_to_cpu(req->r_reply_info.head->result); 3761 } else if (err < 0) { 3762 doutc(cl, "aborted request %lld with %d\n", req->r_tid, err); 3763 3764 /* 3765 * ensure we aren't running concurrently with 3766 * ceph_fill_trace or ceph_readdir_prepopulate, which 3767 * rely 
		 * on locks (dir mutex) held by our caller.
		 */
		mutex_lock(&req->r_fill_mutex);
		req->r_err = err;
		set_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags);
		mutex_unlock(&req->r_fill_mutex);

		if (req->r_parent &&
		    (req->r_op & CEPH_MDS_OP_WRITE))
			ceph_invalidate_dir_request(req);
	} else {
		err = req->r_err;
	}

	mutex_unlock(&mdsc->mutex);
	return err;
}

/*
 * Synchronously perform an mds request.  Take care of all of the
 * session setup, forwarding, retry details.
 */
int ceph_mdsc_do_request(struct ceph_mds_client *mdsc,
			 struct inode *dir,
			 struct ceph_mds_request *req)
{
	struct ceph_client *cl = mdsc->fsc->client;
	int err;

	doutc(cl, "do_request on %p\n", req);

	/* issue */
	err = ceph_mdsc_submit_request(mdsc, dir, req);
	if (!err)
		err = ceph_mdsc_wait_request(mdsc, req, NULL);
	doutc(cl, "do_request %p done, result %d\n", req, err);
	return err;
}

/*
 * Invalidate dir's completeness, dentry lease state on an aborted MDS
 * namespace request.
 */
void ceph_invalidate_dir_request(struct ceph_mds_request *req)
{
	struct inode *dir = req->r_parent;
	struct inode *old_dir = req->r_old_dentry_dir;
	struct ceph_client *cl = req->r_mdsc->fsc->client;

	doutc(cl, "invalidate_dir_request %p %p (complete, lease(s))\n",
	      dir, old_dir);

	ceph_dir_clear_complete(dir);
	if (old_dir)
		ceph_dir_clear_complete(old_dir);
	if (req->r_dentry)
		ceph_invalidate_dentry_lease(req->r_dentry);
	if (req->r_old_dentry)
		ceph_invalidate_dentry_lease(req->r_old_dentry);
}

/*
 * Handle mds reply.
 *
 * We take the session mutex and parse and process the reply immediately.
 * This preserves the logical ordering of replies, capabilities, etc., sent
 * by the MDS as they are applied to our local cache.
 */
static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg)
{
	struct ceph_mds_client *mdsc = session->s_mdsc;
	struct ceph_client *cl = mdsc->fsc->client;
	struct ceph_mds_request *req;
	struct ceph_mds_reply_head *head = msg->front.iov_base;
	struct ceph_mds_reply_info_parsed *rinfo;  /* parsed reply info */
	struct ceph_snap_realm *realm;
	u64 tid;
	int err, result;
	int mds = session->s_mds;
	bool close_sessions = false;

	if (msg->front.iov_len < sizeof(*head)) {
		pr_err_client(cl, "got corrupt (short) reply\n");
		ceph_msg_dump(msg);
		return;
	}

	/* get request, session */
	tid = le64_to_cpu(msg->hdr.tid);
	mutex_lock(&mdsc->mutex);
	req = lookup_get_request(mdsc, tid);
	if (!req) {
		doutc(cl, "on unknown tid %llu\n", tid);
		mutex_unlock(&mdsc->mutex);
		return;
	}
	doutc(cl, "handle_reply %p\n", req);

	/* correct session? */
	if (req->r_session != session) {
		pr_err_client(cl, "got %llu on session mds%d not mds%d\n",
			      tid, session->s_mds,
			      req->r_session ? req->r_session->s_mds : -1);
		mutex_unlock(&mdsc->mutex);
		goto out;
	}

	/* dup? */
	if ((test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags) && !head->safe) ||
	    (test_bit(CEPH_MDS_R_GOT_SAFE, &req->r_req_flags) && head->safe)) {
		pr_warn_client(cl, "got a dup %s reply on %llu from mds%d\n",
			       head->safe ?
"safe" : "unsafe", tid, mds); 3879 mutex_unlock(&mdsc->mutex); 3880 goto out; 3881 } 3882 if (test_bit(CEPH_MDS_R_GOT_SAFE, &req->r_req_flags)) { 3883 pr_warn_client(cl, "got unsafe after safe on %llu from mds%d\n", 3884 tid, mds); 3885 mutex_unlock(&mdsc->mutex); 3886 goto out; 3887 } 3888 3889 result = le32_to_cpu(head->result); 3890 3891 if (head->safe) { 3892 set_bit(CEPH_MDS_R_GOT_SAFE, &req->r_req_flags); 3893 __unregister_request(mdsc, req); 3894 3895 /* last request during umount? */ 3896 if (mdsc->stopping && !__get_oldest_req(mdsc)) 3897 complete_all(&mdsc->safe_umount_waiters); 3898 3899 if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) { 3900 /* 3901 * We already handled the unsafe response, now do the 3902 * cleanup. No need to examine the response; the MDS 3903 * doesn't include any result info in the safe 3904 * response. And even if it did, there is nothing 3905 * useful we could do with a revised return value. 3906 */ 3907 doutc(cl, "got safe reply %llu, mds%d\n", tid, mds); 3908 3909 mutex_unlock(&mdsc->mutex); 3910 goto out; 3911 } 3912 } else { 3913 set_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags); 3914 list_add_tail(&req->r_unsafe_item, &req->r_session->s_unsafe); 3915 } 3916 3917 doutc(cl, "tid %lld result %d\n", tid, result); 3918 if (test_bit(CEPHFS_FEATURE_REPLY_ENCODING, &session->s_features)) 3919 err = parse_reply_info(session, msg, req, (u64)-1); 3920 else 3921 err = parse_reply_info(session, msg, req, 3922 session->s_con.peer_features); 3923 mutex_unlock(&mdsc->mutex); 3924 3925 /* Must find target inode outside of mutexes to avoid deadlocks */ 3926 rinfo = &req->r_reply_info; 3927 if ((err >= 0) && rinfo->head->is_target) { 3928 struct inode *in = xchg(&req->r_new_inode, NULL); 3929 struct ceph_vino tvino = { 3930 .ino = le64_to_cpu(rinfo->targeti.in->ino), 3931 .snap = le64_to_cpu(rinfo->targeti.in->snapid) 3932 }; 3933 3934 /* 3935 * If we ended up opening an existing inode, discard 3936 * r_new_inode 3937 */ 3938 if (req->r_op == CEPH_MDS_OP_CREATE && 3939 !req->r_reply_info.has_create_ino) { 3940 /* This should never happen on an async create */ 3941 WARN_ON_ONCE(req->r_deleg_ino); 3942 iput(in); 3943 in = NULL; 3944 } 3945 3946 in = ceph_get_inode(mdsc->fsc->sb, tvino, in); 3947 if (IS_ERR(in)) { 3948 err = PTR_ERR(in); 3949 mutex_lock(&session->s_mutex); 3950 goto out_err; 3951 } 3952 req->r_target_inode = in; 3953 } 3954 3955 mutex_lock(&session->s_mutex); 3956 if (err < 0) { 3957 pr_err_client(cl, "got corrupt reply mds%d(tid:%lld)\n", 3958 mds, tid); 3959 ceph_msg_dump(msg); 3960 goto out_err; 3961 } 3962 3963 /* snap trace */ 3964 realm = NULL; 3965 if (rinfo->snapblob_len) { 3966 down_write(&mdsc->snap_rwsem); 3967 err = ceph_update_snap_trace(mdsc, rinfo->snapblob, 3968 rinfo->snapblob + rinfo->snapblob_len, 3969 le32_to_cpu(head->op) == CEPH_MDS_OP_RMSNAP, 3970 &realm); 3971 if (err) { 3972 up_write(&mdsc->snap_rwsem); 3973 close_sessions = true; 3974 if (err == -EIO) 3975 ceph_msg_dump(msg); 3976 goto out_err; 3977 } 3978 downgrade_write(&mdsc->snap_rwsem); 3979 } else { 3980 down_read(&mdsc->snap_rwsem); 3981 } 3982 3983 /* insert trace into our cache */ 3984 mutex_lock(&req->r_fill_mutex); 3985 current->journal_info = req; 3986 err = ceph_fill_trace(mdsc->fsc->sb, req); 3987 if (err == 0) { 3988 if (result == 0 && (req->r_op == CEPH_MDS_OP_READDIR || 3989 req->r_op == CEPH_MDS_OP_LSSNAP)) 3990 err = ceph_readdir_prepopulate(req, req->r_session); 3991 } 3992 current->journal_info = NULL; 3993 mutex_unlock(&req->r_fill_mutex); 3994 3995 
up_read(&mdsc->snap_rwsem);
3996 if (realm)
3997 ceph_put_snap_realm(mdsc, realm);
3998
3999 if (err == 0) {
4000 if (req->r_target_inode &&
4001 test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) {
4002 struct ceph_inode_info *ci =
4003 ceph_inode(req->r_target_inode);
4004 spin_lock(&ci->i_unsafe_lock);
4005 list_add_tail(&req->r_unsafe_target_item,
4006 &ci->i_unsafe_iops);
4007 spin_unlock(&ci->i_unsafe_lock);
4008 }
4009
4010 ceph_unreserve_caps(mdsc, &req->r_caps_reservation);
4011 }
4012 out_err:
4013 mutex_lock(&mdsc->mutex);
4014 if (!test_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags)) {
4015 if (err) {
4016 req->r_err = err;
4017 } else {
4018 req->r_reply = ceph_msg_get(msg);
4019 set_bit(CEPH_MDS_R_GOT_RESULT, &req->r_req_flags);
4020 }
4021 } else {
4022 doutc(cl, "reply arrived after request %lld was aborted\n", tid);
4023 }
4024 mutex_unlock(&mdsc->mutex);
4025
4026 mutex_unlock(&session->s_mutex);
4027
4028 /* kick calling process */
4029 complete_request(mdsc, req);
4030
4031 ceph_update_metadata_metrics(&mdsc->metric, req->r_start_latency,
4032 req->r_end_latency, err);
4033 out:
4034 ceph_mdsc_put_request(req);
4035
4036 /* Defer closing the sessions until after the s_mutex lock is released */
4037 if (close_sessions)
4038 ceph_mdsc_close_sessions(mdsc);
4039 return;
4040 }
4041
4042
4043
4044 /*
4045 * handle mds notification that our request has been forwarded.
4046 */
4047 static void handle_forward(struct ceph_mds_client *mdsc,
4048 struct ceph_mds_session *session,
4049 struct ceph_msg *msg)
4050 {
4051 struct ceph_client *cl = mdsc->fsc->client;
4052 struct ceph_mds_request *req;
4053 u64 tid = le64_to_cpu(msg->hdr.tid);
4054 u32 next_mds;
4055 u32 fwd_seq;
4056 int err = -EINVAL;
4057 void *p = msg->front.iov_base;
4058 void *end = p + msg->front.iov_len;
4059 bool aborted = false;
4060
4061 ceph_decode_need(&p, end, 2*sizeof(u32), bad);
4062 next_mds = ceph_decode_32(&p);
4063 fwd_seq = ceph_decode_32(&p);
4064
4065 mutex_lock(&mdsc->mutex);
4066 req = lookup_get_request(mdsc, tid);
4067 if (!req) {
4068 mutex_unlock(&mdsc->mutex);
4069 doutc(cl, "forward tid %llu to mds%d - req dne\n", tid, next_mds);
4070 return; /* dup reply? */
4071 }
4072
4073 if (test_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags)) {
4074 doutc(cl, "forward tid %llu aborted, unregistering\n", tid);
4075 __unregister_request(mdsc, req);
4076 } else if (fwd_seq <= req->r_num_fwd || (uint32_t)fwd_seq >= U32_MAX) {
4077 /*
4078 * Avoid infinite retrying after overflow.
4079 *
4080 * The MDS increments the fwd count on each forward. On the
4081 * client side, if the received num_fwd is less than the one
4082 * saved in the request, the MDS must be an old version whose
4083 * 8-bit counter has overflowed.
4084 */
4085 mutex_lock(&req->r_fill_mutex);
4086 req->r_err = -EMULTIHOP;
4087 set_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags);
4088 mutex_unlock(&req->r_fill_mutex);
4089 aborted = true;
4090 pr_warn_ratelimited_client(cl, "forward tid %llu seq overflow\n",
4091 tid);
4092 } else {
4093 /* resend.
forward race not possible; mds would drop */ 4094 doutc(cl, "forward tid %llu to mds%d (we resend)\n", tid, next_mds); 4095 BUG_ON(req->r_err); 4096 BUG_ON(test_bit(CEPH_MDS_R_GOT_RESULT, &req->r_req_flags)); 4097 req->r_attempts = 0; 4098 req->r_num_fwd = fwd_seq; 4099 req->r_resend_mds = next_mds; 4100 put_request_session(req); 4101 __do_request(mdsc, req); 4102 } 4103 mutex_unlock(&mdsc->mutex); 4104 4105 /* kick calling process */ 4106 if (aborted) 4107 complete_request(mdsc, req); 4108 ceph_mdsc_put_request(req); 4109 return; 4110 4111 bad: 4112 pr_err_client(cl, "decode error err=%d\n", err); 4113 ceph_msg_dump(msg); 4114 } 4115 4116 static int __decode_session_metadata(void **p, void *end, 4117 bool *blocklisted) 4118 { 4119 /* map<string,string> */ 4120 u32 n; 4121 bool err_str; 4122 ceph_decode_32_safe(p, end, n, bad); 4123 while (n-- > 0) { 4124 u32 len; 4125 ceph_decode_32_safe(p, end, len, bad); 4126 ceph_decode_need(p, end, len, bad); 4127 err_str = !strncmp(*p, "error_string", len); 4128 *p += len; 4129 ceph_decode_32_safe(p, end, len, bad); 4130 ceph_decode_need(p, end, len, bad); 4131 /* 4132 * Match "blocklisted (blacklisted)" from newer MDSes, 4133 * or "blacklisted" from older MDSes. 4134 */ 4135 if (err_str && strnstr(*p, "blacklisted", len)) 4136 *blocklisted = true; 4137 *p += len; 4138 } 4139 return 0; 4140 bad: 4141 return -1; 4142 } 4143 4144 /* 4145 * handle a mds session control message 4146 */ 4147 static void handle_session(struct ceph_mds_session *session, 4148 struct ceph_msg *msg) 4149 { 4150 struct ceph_mds_client *mdsc = session->s_mdsc; 4151 struct ceph_client *cl = mdsc->fsc->client; 4152 int mds = session->s_mds; 4153 int msg_version = le16_to_cpu(msg->hdr.version); 4154 void *p = msg->front.iov_base; 4155 void *end = p + msg->front.iov_len; 4156 struct ceph_mds_session_head *h; 4157 struct ceph_mds_cap_auth *cap_auths = NULL; 4158 u32 op, cap_auths_num = 0; 4159 u64 seq, features = 0; 4160 int wake = 0; 4161 bool blocklisted = false; 4162 u32 i; 4163 4164 4165 /* decode */ 4166 ceph_decode_need(&p, end, sizeof(*h), bad); 4167 h = p; 4168 p += sizeof(*h); 4169 4170 op = le32_to_cpu(h->op); 4171 seq = le64_to_cpu(h->seq); 4172 4173 if (msg_version >= 3) { 4174 u32 len; 4175 /* version >= 2 and < 5, decode metadata, skip otherwise 4176 * as it's handled via flags. 
4177 */
4178 if (msg_version >= 5)
4179 ceph_decode_skip_map(&p, end, string, string, bad);
4180 else if (__decode_session_metadata(&p, end, &blocklisted) < 0)
4181 goto bad;
4182
4183 /* version >= 3, feature bits */
4184 ceph_decode_32_safe(&p, end, len, bad);
4185 if (len) {
4186 ceph_decode_64_safe(&p, end, features, bad);
4187 p += len - sizeof(features);
4188 }
4189 }
4190
4191 if (msg_version >= 5) {
4192 u32 flags, len;
4193
4194 /* version >= 4 */
4195 ceph_decode_skip_16(&p, end, bad); /* struct_v, struct_cv */
4196 ceph_decode_32_safe(&p, end, len, bad); /* len */
4197 ceph_decode_skip_n(&p, end, len, bad); /* metric_spec */
4198
4199 /* version >= 5, flags */
4200 ceph_decode_32_safe(&p, end, flags, bad);
4201 if (flags & CEPH_SESSION_BLOCKLISTED) {
4202 pr_warn_client(cl, "mds%d session blocklisted\n",
4203 session->s_mds);
4204 blocklisted = true;
4205 }
4206 }
4207
4208 if (msg_version >= 6) {
4209 ceph_decode_32_safe(&p, end, cap_auths_num, bad);
4210 doutc(cl, "cap_auths_num %d\n", cap_auths_num);
4211
4212 if (cap_auths_num && op != CEPH_SESSION_OPEN) {
4213 WARN_ON_ONCE(op != CEPH_SESSION_OPEN);
4214 goto skip_cap_auths;
4215 }
4216
4217 cap_auths = kcalloc(cap_auths_num,
4218 sizeof(struct ceph_mds_cap_auth),
4219 GFP_KERNEL);
4220 if (!cap_auths) {
4221 pr_err_client(cl, "No memory for cap_auths\n");
4222 return;
4223 }
4224
4225 for (i = 0; i < cap_auths_num; i++) {
4226 u32 _len, j;
4227
4228 /* struct_v, struct_compat, and struct_len in MDSCapAuth */
4229 ceph_decode_skip_n(&p, end, 2 + sizeof(u32), bad);
4230
4231 /* struct_v, struct_compat, and struct_len in MDSCapMatch */
4232 ceph_decode_skip_n(&p, end, 2 + sizeof(u32), bad);
4233 ceph_decode_64_safe(&p, end, cap_auths[i].match.uid, bad);
4234 ceph_decode_32_safe(&p, end, _len, bad);
4235 if (_len) {
4236 cap_auths[i].match.gids = kcalloc(_len, sizeof(u32),
4237 GFP_KERNEL);
4238 if (!cap_auths[i].match.gids) {
4239 pr_err_client(cl, "No memory for gids\n");
4240 goto fail;
4241 }
4242
4243 cap_auths[i].match.num_gids = _len;
4244 for (j = 0; j < _len; j++)
4245 ceph_decode_32_safe(&p, end,
4246 cap_auths[i].match.gids[j],
4247 bad);
4248 }
4249
4250 ceph_decode_32_safe(&p, end, _len, bad);
4251 if (_len) {
4252 cap_auths[i].match.path = kcalloc(_len + 1, sizeof(char),
4253 GFP_KERNEL);
4254 if (!cap_auths[i].match.path) {
4255 pr_err_client(cl, "No memory for path\n");
4256 goto fail;
4257 }
4258 ceph_decode_copy(&p, cap_auths[i].match.path, _len);
4259
4260 /* Remove the trailing '/' */
4261 while (_len && cap_auths[i].match.path[_len - 1] == '/') {
4262 cap_auths[i].match.path[_len - 1] = '\0';
4263 _len -= 1;
4264 }
4265 }
4266
4267 ceph_decode_32_safe(&p, end, _len, bad);
4268 if (_len) {
4269 cap_auths[i].match.fs_name = kcalloc(_len + 1, sizeof(char),
4270 GFP_KERNEL);
4271 if (!cap_auths[i].match.fs_name) {
4272 pr_err_client(cl, "No memory for fs_name\n");
4273 goto fail;
4274 }
4275 ceph_decode_copy(&p, cap_auths[i].match.fs_name, _len);
4276 }
4277
4278 ceph_decode_8_safe(&p, end, cap_auths[i].match.root_squash, bad);
4279 ceph_decode_8_safe(&p, end, cap_auths[i].readable, bad);
4280 ceph_decode_8_safe(&p, end, cap_auths[i].writeable, bad);
4281 doutc(cl, "uid %lld, num_gids %u, path %s, fs_name %s, root_squash %d, readable %d, writeable %d\n",
4282 cap_auths[i].match.uid, cap_auths[i].match.num_gids,
4283 cap_auths[i].match.path, cap_auths[i].match.fs_name,
4284 cap_auths[i].match.root_squash,
4285 cap_auths[i].readable, cap_auths[i].writeable);
4286 }
4287 }
4288
4289 skip_cap_auths:
4290
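/*
 * Decoding is done. Under mdsc->mutex below, a session OPEN installs
 * the freshly decoded cap auths and frees any previously cached set
 * (the gids, path and fs_name of every entry, then the array itself).
 */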
mutex_lock(&mdsc->mutex);
4291 if (op == CEPH_SESSION_OPEN) {
4292 if (mdsc->s_cap_auths) {
4293 for (i = 0; i < mdsc->s_cap_auths_num; i++) {
4294 kfree(mdsc->s_cap_auths[i].match.gids);
4295 kfree(mdsc->s_cap_auths[i].match.path);
4296 kfree(mdsc->s_cap_auths[i].match.fs_name);
4297 }
4298 kfree(mdsc->s_cap_auths);
4299 }
4300 mdsc->s_cap_auths_num = cap_auths_num;
4301 mdsc->s_cap_auths = cap_auths;
4302 }
4303 if (op == CEPH_SESSION_CLOSE) {
4304 ceph_get_mds_session(session);
4305 __unregister_session(mdsc, session);
4306 }
4307 /* FIXME: this ttl calculation is generous */
4308 session->s_ttl = jiffies + HZ*mdsc->mdsmap->m_session_autoclose;
4309 mutex_unlock(&mdsc->mutex);
4310
4311 mutex_lock(&session->s_mutex);
4312
4313 doutc(cl, "mds%d %s %p state %s seq %llu\n", mds,
4314 ceph_session_op_name(op), session,
4315 ceph_session_state_name(session->s_state), seq);
4316
4317 if (session->s_state == CEPH_MDS_SESSION_HUNG) {
4318 session->s_state = CEPH_MDS_SESSION_OPEN;
4319 pr_info_client(cl, "mds%d came back\n", session->s_mds);
4320 }
4321
4322 switch (op) {
4323 case CEPH_SESSION_OPEN:
4324 if (session->s_state == CEPH_MDS_SESSION_RECONNECTING)
4325 pr_info_client(cl, "mds%d reconnect success\n",
4326 session->s_mds);
4327
4328 session->s_features = features;
4329 if (session->s_state == CEPH_MDS_SESSION_OPEN) {
4330 pr_notice_client(cl, "mds%d is already open\n",
4331 session->s_mds);
4332 } else {
4333 session->s_state = CEPH_MDS_SESSION_OPEN;
4334 renewed_caps(mdsc, session, 0);
4335 if (test_bit(CEPHFS_FEATURE_METRIC_COLLECT,
4336 &session->s_features))
4337 metric_schedule_delayed(&mdsc->metric);
4338 }
4339
4340 /*
4341 * The connection may be broken and the session on the client
4342 * side may have been reinitialized, so we need to update the
4343 * seq anyway.
4344 */ 4345 if (!session->s_seq && seq) 4346 session->s_seq = seq; 4347 4348 wake = 1; 4349 if (mdsc->stopping) 4350 __close_session(mdsc, session); 4351 break; 4352 4353 case CEPH_SESSION_RENEWCAPS: 4354 if (session->s_renew_seq == seq) 4355 renewed_caps(mdsc, session, 1); 4356 break; 4357 4358 case CEPH_SESSION_CLOSE: 4359 if (session->s_state == CEPH_MDS_SESSION_RECONNECTING) 4360 pr_info_client(cl, "mds%d reconnect denied\n", 4361 session->s_mds); 4362 session->s_state = CEPH_MDS_SESSION_CLOSED; 4363 cleanup_session_requests(mdsc, session); 4364 remove_session_caps(session); 4365 wake = 2; /* for good measure */ 4366 wake_up_all(&mdsc->session_close_wq); 4367 break; 4368 4369 case CEPH_SESSION_STALE: 4370 pr_info_client(cl, "mds%d caps went stale, renewing\n", 4371 session->s_mds); 4372 atomic_inc(&session->s_cap_gen); 4373 session->s_cap_ttl = jiffies - 1; 4374 send_renew_caps(mdsc, session); 4375 break; 4376 4377 case CEPH_SESSION_RECALL_STATE: 4378 ceph_trim_caps(mdsc, session, le32_to_cpu(h->max_caps)); 4379 break; 4380 4381 case CEPH_SESSION_FLUSHMSG: 4382 /* flush cap releases */ 4383 spin_lock(&session->s_cap_lock); 4384 if (session->s_num_cap_releases) 4385 ceph_flush_session_cap_releases(mdsc, session); 4386 spin_unlock(&session->s_cap_lock); 4387 4388 send_flushmsg_ack(mdsc, session, seq); 4389 break; 4390 4391 case CEPH_SESSION_FORCE_RO: 4392 doutc(cl, "force_session_readonly %p\n", session); 4393 spin_lock(&session->s_cap_lock); 4394 session->s_readonly = true; 4395 spin_unlock(&session->s_cap_lock); 4396 wake_up_session_caps(session, FORCE_RO); 4397 break; 4398 4399 case CEPH_SESSION_REJECT: 4400 WARN_ON(session->s_state != CEPH_MDS_SESSION_OPENING); 4401 pr_info_client(cl, "mds%d rejected session\n", 4402 session->s_mds); 4403 session->s_state = CEPH_MDS_SESSION_REJECTED; 4404 cleanup_session_requests(mdsc, session); 4405 remove_session_caps(session); 4406 if (blocklisted) 4407 mdsc->fsc->blocklisted = true; 4408 wake = 2; /* for good measure */ 4409 break; 4410 4411 default: 4412 pr_err_client(cl, "bad op %d mds%d\n", op, mds); 4413 WARN_ON(1); 4414 } 4415 4416 mutex_unlock(&session->s_mutex); 4417 if (wake) { 4418 mutex_lock(&mdsc->mutex); 4419 __wake_requests(mdsc, &session->s_waiting); 4420 if (wake == 2) 4421 kick_requests(mdsc, mds); 4422 mutex_unlock(&mdsc->mutex); 4423 } 4424 if (op == CEPH_SESSION_CLOSE) 4425 ceph_put_mds_session(session); 4426 return; 4427 4428 bad: 4429 pr_err_client(cl, "corrupt message mds%d len %d\n", mds, 4430 (int)msg->front.iov_len); 4431 ceph_msg_dump(msg); 4432 fail: 4433 for (i = 0; i < cap_auths_num; i++) { 4434 kfree(cap_auths[i].match.gids); 4435 kfree(cap_auths[i].match.path); 4436 kfree(cap_auths[i].match.fs_name); 4437 } 4438 kfree(cap_auths); 4439 return; 4440 } 4441 4442 void ceph_mdsc_release_dir_caps(struct ceph_mds_request *req) 4443 { 4444 struct ceph_client *cl = req->r_mdsc->fsc->client; 4445 int dcaps; 4446 4447 dcaps = xchg(&req->r_dir_caps, 0); 4448 if (dcaps) { 4449 doutc(cl, "releasing r_dir_caps=%s\n", ceph_cap_string(dcaps)); 4450 ceph_put_cap_refs(ceph_inode(req->r_parent), dcaps); 4451 } 4452 } 4453 4454 void ceph_mdsc_release_dir_caps_async(struct ceph_mds_request *req) 4455 { 4456 struct ceph_client *cl = req->r_mdsc->fsc->client; 4457 int dcaps; 4458 4459 dcaps = xchg(&req->r_dir_caps, 0); 4460 if (dcaps) { 4461 doutc(cl, "releasing r_dir_caps=%s\n", ceph_cap_string(dcaps)); 4462 ceph_put_cap_refs_async(ceph_inode(req->r_parent), dcaps); 4463 } 4464 } 4465 4466 /* 4467 * called under session->mutex. 
4468 */
4469 static void replay_unsafe_requests(struct ceph_mds_client *mdsc,
4470 struct ceph_mds_session *session)
4471 {
4472 struct ceph_mds_request *req, *nreq;
4473 struct rb_node *p;
4474
4475 doutc(mdsc->fsc->client, "mds%d\n", session->s_mds);
4476
4477 mutex_lock(&mdsc->mutex);
4478 list_for_each_entry_safe(req, nreq, &session->s_unsafe, r_unsafe_item)
4479 __send_request(session, req, true);
4480
4481 /*
4482 * Also re-send old requests when the MDS enters the reconnect stage,
4483 * so that it can process completed requests in its clientreplay stage.
4484 */
4485 p = rb_first(&mdsc->request_tree);
4486 while (p) {
4487 req = rb_entry(p, struct ceph_mds_request, r_node);
4488 p = rb_next(p);
4489 if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags))
4490 continue;
4491 if (req->r_attempts == 0)
4492 continue; /* only old requests */
4493 if (!req->r_session)
4494 continue;
4495 if (req->r_session->s_mds != session->s_mds)
4496 continue;
4497
4498 ceph_mdsc_release_dir_caps_async(req);
4499
4500 __send_request(session, req, true);
4501 }
4502 mutex_unlock(&mdsc->mutex);
4503 }
4504
4505 static int send_reconnect_partial(struct ceph_reconnect_state *recon_state)
4506 {
4507 struct ceph_msg *reply;
4508 struct ceph_pagelist *_pagelist;
4509 struct page *page;
4510 __le32 *addr;
4511 int err = -ENOMEM;
4512
4513 if (!recon_state->allow_multi)
4514 return -ENOSPC;
4515
4516 /* can't handle message that contains both caps and realm */
4517 BUG_ON(!recon_state->nr_caps == !recon_state->nr_realms);
4518
4519 /* pre-allocate new pagelist */
4520 _pagelist = ceph_pagelist_alloc(GFP_NOFS);
4521 if (!_pagelist)
4522 return -ENOMEM;
4523
4524 reply = ceph_msg_new2(CEPH_MSG_CLIENT_RECONNECT, 0, 1, GFP_NOFS, false);
4525 if (!reply)
4526 goto fail_msg;
4527
4528 /* placeholder for nr_caps */
4529 err = ceph_pagelist_encode_32(_pagelist, 0);
4530 if (err < 0)
4531 goto fail;
4532
4533 if (recon_state->nr_caps) {
4534 /* currently encoding caps */
4535 err = ceph_pagelist_encode_32(recon_state->pagelist, 0);
4536 if (err)
4537 goto fail;
4538 } else {
4539 /* placeholder for nr_realms (currently encoding realms) */
4540 err = ceph_pagelist_encode_32(_pagelist, 0);
4541 if (err < 0)
4542 goto fail;
4543 }
4544
4545 err = ceph_pagelist_encode_8(recon_state->pagelist, 1);
4546 if (err)
4547 goto fail;
4548
4549 page = list_first_entry(&recon_state->pagelist->head, struct page, lru);
4550 addr = kmap_atomic(page);
4551 if (recon_state->nr_caps) {
4552 /* currently encoding caps */
4553 *addr = cpu_to_le32(recon_state->nr_caps);
4554 } else {
4555 /* currently encoding realms */
4556 *(addr + 1) = cpu_to_le32(recon_state->nr_realms);
4557 }
4558 kunmap_atomic(addr);
4559
4560 reply->hdr.version = cpu_to_le16(5);
4561 reply->hdr.compat_version = cpu_to_le16(4);
4562
4563 reply->hdr.data_len = cpu_to_le32(recon_state->pagelist->length);
4564 ceph_msg_data_add_pagelist(reply, recon_state->pagelist);
4565
4566 ceph_con_send(&recon_state->session->s_con, reply);
4567 ceph_pagelist_release(recon_state->pagelist);
4568
4569 recon_state->pagelist = _pagelist;
4570 recon_state->nr_caps = 0;
4571 recon_state->nr_realms = 0;
4572 recon_state->msg_version = 5;
4573 return 0;
4574 fail:
4575 ceph_msg_put(reply);
4576 fail_msg:
4577 ceph_pagelist_release(_pagelist);
4578 return err;
4579 }
4580
4581 static struct dentry* d_find_primary(struct inode *inode)
4582 {
4583 struct dentry *alias, *dn = NULL;
4584
4585 if (hlist_empty(&inode->i_dentry))
4586 return NULL;
4587
4588 spin_lock(&inode->i_lock);
4589 if
(hlist_empty(&inode->i_dentry)) 4590 goto out_unlock; 4591 4592 if (S_ISDIR(inode->i_mode)) { 4593 alias = hlist_entry(inode->i_dentry.first, struct dentry, d_u.d_alias); 4594 if (!IS_ROOT(alias)) 4595 dn = dget(alias); 4596 goto out_unlock; 4597 } 4598 4599 hlist_for_each_entry(alias, &inode->i_dentry, d_u.d_alias) { 4600 spin_lock(&alias->d_lock); 4601 if (!d_unhashed(alias) && 4602 (ceph_dentry(alias)->flags & CEPH_DENTRY_PRIMARY_LINK)) { 4603 dn = dget_dlock(alias); 4604 } 4605 spin_unlock(&alias->d_lock); 4606 if (dn) 4607 break; 4608 } 4609 out_unlock: 4610 spin_unlock(&inode->i_lock); 4611 return dn; 4612 } 4613 4614 /* 4615 * Encode information about a cap for a reconnect with the MDS. 4616 */ 4617 static int reconnect_caps_cb(struct inode *inode, int mds, void *arg) 4618 { 4619 struct ceph_mds_client *mdsc = ceph_sb_to_mdsc(inode->i_sb); 4620 struct ceph_client *cl = ceph_inode_to_client(inode); 4621 union { 4622 struct ceph_mds_cap_reconnect v2; 4623 struct ceph_mds_cap_reconnect_v1 v1; 4624 } rec; 4625 struct ceph_inode_info *ci = ceph_inode(inode); 4626 struct ceph_reconnect_state *recon_state = arg; 4627 struct ceph_pagelist *pagelist = recon_state->pagelist; 4628 struct dentry *dentry; 4629 struct ceph_cap *cap; 4630 struct ceph_path_info path_info = {0}; 4631 int err; 4632 u64 snap_follows; 4633 4634 dentry = d_find_primary(inode); 4635 if (dentry) { 4636 /* set pathbase to parent dir when msg_version >= 2 */ 4637 char *path = ceph_mdsc_build_path(mdsc, dentry, &path_info, 4638 recon_state->msg_version >= 2); 4639 dput(dentry); 4640 if (IS_ERR(path)) { 4641 err = PTR_ERR(path); 4642 goto out_err; 4643 } 4644 } 4645 4646 spin_lock(&ci->i_ceph_lock); 4647 cap = __get_cap_for_mds(ci, mds); 4648 if (!cap) { 4649 spin_unlock(&ci->i_ceph_lock); 4650 err = 0; 4651 goto out_err; 4652 } 4653 doutc(cl, " adding %p ino %llx.%llx cap %p %lld %s\n", inode, 4654 ceph_vinop(inode), cap, cap->cap_id, 4655 ceph_cap_string(cap->issued)); 4656 4657 cap->seq = 0; /* reset cap seq */ 4658 cap->issue_seq = 0; /* and issue_seq */ 4659 cap->mseq = 0; /* and migrate_seq */ 4660 cap->cap_gen = atomic_read(&cap->session->s_cap_gen); 4661 4662 /* These are lost when the session goes away */ 4663 if (S_ISDIR(inode->i_mode)) { 4664 if (cap->issued & CEPH_CAP_DIR_CREATE) { 4665 ceph_put_string(rcu_dereference_raw(ci->i_cached_layout.pool_ns)); 4666 memset(&ci->i_cached_layout, 0, sizeof(ci->i_cached_layout)); 4667 } 4668 cap->issued &= ~CEPH_CAP_ANY_DIR_OPS; 4669 } 4670 4671 if (recon_state->msg_version >= 2) { 4672 rec.v2.cap_id = cpu_to_le64(cap->cap_id); 4673 rec.v2.wanted = cpu_to_le32(__ceph_caps_wanted(ci)); 4674 rec.v2.issued = cpu_to_le32(cap->issued); 4675 rec.v2.snaprealm = cpu_to_le64(ci->i_snap_realm->ino); 4676 rec.v2.pathbase = cpu_to_le64(path_info.vino.ino); 4677 rec.v2.flock_len = (__force __le32) 4678 ((ci->i_ceph_flags & CEPH_I_ERROR_FILELOCK) ? 0 : 1); 4679 } else { 4680 struct timespec64 ts; 4681 4682 rec.v1.cap_id = cpu_to_le64(cap->cap_id); 4683 rec.v1.wanted = cpu_to_le32(__ceph_caps_wanted(ci)); 4684 rec.v1.issued = cpu_to_le32(cap->issued); 4685 rec.v1.size = cpu_to_le64(i_size_read(inode)); 4686 ts = inode_get_mtime(inode); 4687 ceph_encode_timespec64(&rec.v1.mtime, &ts); 4688 ts = inode_get_atime(inode); 4689 ceph_encode_timespec64(&rec.v1.atime, &ts); 4690 rec.v1.snaprealm = cpu_to_le64(ci->i_snap_realm->ino); 4691 rec.v1.pathbase = cpu_to_le64(path_info.vino.ino); 4692 } 4693 4694 if (list_empty(&ci->i_cap_snaps)) { 4695 snap_follows = ci->i_head_snapc ? 
ci->i_head_snapc->seq : 0; 4696 } else { 4697 struct ceph_cap_snap *capsnap = 4698 list_first_entry(&ci->i_cap_snaps, 4699 struct ceph_cap_snap, ci_item); 4700 snap_follows = capsnap->follows; 4701 } 4702 spin_unlock(&ci->i_ceph_lock); 4703 4704 if (recon_state->msg_version >= 2) { 4705 int num_fcntl_locks, num_flock_locks; 4706 struct ceph_filelock *flocks = NULL; 4707 size_t struct_len, total_len = sizeof(u64); 4708 u8 struct_v = 0; 4709 4710 encode_again: 4711 if (rec.v2.flock_len) { 4712 ceph_count_locks(inode, &num_fcntl_locks, &num_flock_locks); 4713 } else { 4714 num_fcntl_locks = 0; 4715 num_flock_locks = 0; 4716 } 4717 if (num_fcntl_locks + num_flock_locks > 0) { 4718 flocks = kmalloc_array(num_fcntl_locks + num_flock_locks, 4719 sizeof(struct ceph_filelock), 4720 GFP_NOFS); 4721 if (!flocks) { 4722 err = -ENOMEM; 4723 goto out_err; 4724 } 4725 err = ceph_encode_locks_to_buffer(inode, flocks, 4726 num_fcntl_locks, 4727 num_flock_locks); 4728 if (err) { 4729 kfree(flocks); 4730 flocks = NULL; 4731 if (err == -ENOSPC) 4732 goto encode_again; 4733 goto out_err; 4734 } 4735 } else { 4736 kfree(flocks); 4737 flocks = NULL; 4738 } 4739 4740 if (recon_state->msg_version >= 3) { 4741 /* version, compat_version and struct_len */ 4742 total_len += 2 * sizeof(u8) + sizeof(u32); 4743 struct_v = 2; 4744 } 4745 /* 4746 * number of encoded locks is stable, so copy to pagelist 4747 */ 4748 struct_len = 2 * sizeof(u32) + 4749 (num_fcntl_locks + num_flock_locks) * 4750 sizeof(struct ceph_filelock); 4751 rec.v2.flock_len = cpu_to_le32(struct_len); 4752 4753 struct_len += sizeof(u32) + path_info.pathlen + sizeof(rec.v2); 4754 4755 if (struct_v >= 2) 4756 struct_len += sizeof(u64); /* snap_follows */ 4757 4758 total_len += struct_len; 4759 4760 if (pagelist->length + total_len > RECONNECT_MAX_SIZE) { 4761 err = send_reconnect_partial(recon_state); 4762 if (err) 4763 goto out_freeflocks; 4764 pagelist = recon_state->pagelist; 4765 } 4766 4767 err = ceph_pagelist_reserve(pagelist, total_len); 4768 if (err) 4769 goto out_freeflocks; 4770 4771 ceph_pagelist_encode_64(pagelist, ceph_ino(inode)); 4772 if (recon_state->msg_version >= 3) { 4773 ceph_pagelist_encode_8(pagelist, struct_v); 4774 ceph_pagelist_encode_8(pagelist, 1); 4775 ceph_pagelist_encode_32(pagelist, struct_len); 4776 } 4777 ceph_pagelist_encode_string(pagelist, (char *)path_info.path, path_info.pathlen); 4778 ceph_pagelist_append(pagelist, &rec, sizeof(rec.v2)); 4779 ceph_locks_to_pagelist(flocks, pagelist, 4780 num_fcntl_locks, num_flock_locks); 4781 if (struct_v >= 2) 4782 ceph_pagelist_encode_64(pagelist, snap_follows); 4783 out_freeflocks: 4784 kfree(flocks); 4785 } else { 4786 err = ceph_pagelist_reserve(pagelist, 4787 sizeof(u64) + sizeof(u32) + 4788 path_info.pathlen + sizeof(rec.v1)); 4789 if (err) 4790 goto out_err; 4791 4792 ceph_pagelist_encode_64(pagelist, ceph_ino(inode)); 4793 ceph_pagelist_encode_string(pagelist, (char *)path_info.path, path_info.pathlen); 4794 ceph_pagelist_append(pagelist, &rec, sizeof(rec.v1)); 4795 } 4796 4797 out_err: 4798 ceph_mdsc_free_path_info(&path_info); 4799 if (!err) 4800 recon_state->nr_caps++; 4801 return err; 4802 } 4803 4804 static int encode_snap_realms(struct ceph_mds_client *mdsc, 4805 struct ceph_reconnect_state *recon_state) 4806 { 4807 struct rb_node *p; 4808 struct ceph_pagelist *pagelist = recon_state->pagelist; 4809 struct ceph_client *cl = mdsc->fsc->client; 4810 int err = 0; 4811 4812 if (recon_state->msg_version >= 4) { 4813 err = ceph_pagelist_encode_32(pagelist, 
mdsc->num_snap_realms); 4814 if (err < 0) 4815 goto fail; 4816 } 4817 4818 /* 4819 * snaprealms. we provide mds with the ino, seq (version), and 4820 * parent for all of our realms. If the mds has any newer info, 4821 * it will tell us. 4822 */ 4823 for (p = rb_first(&mdsc->snap_realms); p; p = rb_next(p)) { 4824 struct ceph_snap_realm *realm = 4825 rb_entry(p, struct ceph_snap_realm, node); 4826 struct ceph_mds_snaprealm_reconnect sr_rec; 4827 4828 if (recon_state->msg_version >= 4) { 4829 size_t need = sizeof(u8) * 2 + sizeof(u32) + 4830 sizeof(sr_rec); 4831 4832 if (pagelist->length + need > RECONNECT_MAX_SIZE) { 4833 err = send_reconnect_partial(recon_state); 4834 if (err) 4835 goto fail; 4836 pagelist = recon_state->pagelist; 4837 } 4838 4839 err = ceph_pagelist_reserve(pagelist, need); 4840 if (err) 4841 goto fail; 4842 4843 ceph_pagelist_encode_8(pagelist, 1); 4844 ceph_pagelist_encode_8(pagelist, 1); 4845 ceph_pagelist_encode_32(pagelist, sizeof(sr_rec)); 4846 } 4847 4848 doutc(cl, " adding snap realm %llx seq %lld parent %llx\n", 4849 realm->ino, realm->seq, realm->parent_ino); 4850 sr_rec.ino = cpu_to_le64(realm->ino); 4851 sr_rec.seq = cpu_to_le64(realm->seq); 4852 sr_rec.parent = cpu_to_le64(realm->parent_ino); 4853 4854 err = ceph_pagelist_append(pagelist, &sr_rec, sizeof(sr_rec)); 4855 if (err) 4856 goto fail; 4857 4858 recon_state->nr_realms++; 4859 } 4860 fail: 4861 return err; 4862 } 4863 4864 4865 /* 4866 * If an MDS fails and recovers, clients need to reconnect in order to 4867 * reestablish shared state. This includes all caps issued through 4868 * this session _and_ the snap_realm hierarchy. Because it's not 4869 * clear which snap realms the mds cares about, we send everything we 4870 * know about.. that ensures we'll then get any new info the 4871 * recovering MDS might have. 4872 * 4873 * This is a relatively heavyweight operation, but it's rare. 4874 */ 4875 static void send_mds_reconnect(struct ceph_mds_client *mdsc, 4876 struct ceph_mds_session *session) 4877 { 4878 struct ceph_client *cl = mdsc->fsc->client; 4879 struct ceph_msg *reply; 4880 int mds = session->s_mds; 4881 int err = -ENOMEM; 4882 struct ceph_reconnect_state recon_state = { 4883 .session = session, 4884 }; 4885 LIST_HEAD(dispose); 4886 4887 pr_info_client(cl, "mds%d reconnect start\n", mds); 4888 4889 recon_state.pagelist = ceph_pagelist_alloc(GFP_NOFS); 4890 if (!recon_state.pagelist) 4891 goto fail_nopagelist; 4892 4893 reply = ceph_msg_new2(CEPH_MSG_CLIENT_RECONNECT, 0, 1, GFP_NOFS, false); 4894 if (!reply) 4895 goto fail_nomsg; 4896 4897 xa_destroy(&session->s_delegated_inos); 4898 4899 mutex_lock(&session->s_mutex); 4900 session->s_state = CEPH_MDS_SESSION_RECONNECTING; 4901 session->s_seq = 0; 4902 4903 doutc(cl, "session %p state %s\n", session, 4904 ceph_session_state_name(session->s_state)); 4905 4906 atomic_inc(&session->s_cap_gen); 4907 4908 spin_lock(&session->s_cap_lock); 4909 /* don't know if session is readonly */ 4910 session->s_readonly = 0; 4911 /* 4912 * notify __ceph_remove_cap() that we are composing cap reconnect. 4913 * If a cap get released before being added to the cap reconnect, 4914 * __ceph_remove_cap() should skip queuing cap release. 
4915 */ 4916 session->s_cap_reconnect = 1; 4917 /* drop old cap expires; we're about to reestablish that state */ 4918 detach_cap_releases(session, &dispose); 4919 spin_unlock(&session->s_cap_lock); 4920 dispose_cap_releases(mdsc, &dispose); 4921 4922 /* trim unused caps to reduce MDS's cache rejoin time */ 4923 if (mdsc->fsc->sb->s_root) 4924 shrink_dcache_parent(mdsc->fsc->sb->s_root); 4925 4926 ceph_con_close(&session->s_con); 4927 ceph_con_open(&session->s_con, 4928 CEPH_ENTITY_TYPE_MDS, mds, 4929 ceph_mdsmap_get_addr(mdsc->mdsmap, mds)); 4930 4931 /* replay unsafe requests */ 4932 replay_unsafe_requests(mdsc, session); 4933 4934 ceph_early_kick_flushing_caps(mdsc, session); 4935 4936 down_read(&mdsc->snap_rwsem); 4937 4938 /* placeholder for nr_caps */ 4939 err = ceph_pagelist_encode_32(recon_state.pagelist, 0); 4940 if (err) 4941 goto fail; 4942 4943 if (test_bit(CEPHFS_FEATURE_MULTI_RECONNECT, &session->s_features)) { 4944 recon_state.msg_version = 3; 4945 recon_state.allow_multi = true; 4946 } else if (session->s_con.peer_features & CEPH_FEATURE_MDSENC) { 4947 recon_state.msg_version = 3; 4948 } else { 4949 recon_state.msg_version = 2; 4950 } 4951 /* traverse this session's caps */ 4952 err = ceph_iterate_session_caps(session, reconnect_caps_cb, &recon_state); 4953 4954 spin_lock(&session->s_cap_lock); 4955 session->s_cap_reconnect = 0; 4956 spin_unlock(&session->s_cap_lock); 4957 4958 if (err < 0) 4959 goto fail; 4960 4961 /* check if all realms can be encoded into current message */ 4962 if (mdsc->num_snap_realms) { 4963 size_t total_len = 4964 recon_state.pagelist->length + 4965 mdsc->num_snap_realms * 4966 sizeof(struct ceph_mds_snaprealm_reconnect); 4967 if (recon_state.msg_version >= 4) { 4968 /* number of realms */ 4969 total_len += sizeof(u32); 4970 /* version, compat_version and struct_len */ 4971 total_len += mdsc->num_snap_realms * 4972 (2 * sizeof(u8) + sizeof(u32)); 4973 } 4974 if (total_len > RECONNECT_MAX_SIZE) { 4975 if (!recon_state.allow_multi) { 4976 err = -ENOSPC; 4977 goto fail; 4978 } 4979 if (recon_state.nr_caps) { 4980 err = send_reconnect_partial(&recon_state); 4981 if (err) 4982 goto fail; 4983 } 4984 recon_state.msg_version = 5; 4985 } 4986 } 4987 4988 err = encode_snap_realms(mdsc, &recon_state); 4989 if (err < 0) 4990 goto fail; 4991 4992 if (recon_state.msg_version >= 5) { 4993 err = ceph_pagelist_encode_8(recon_state.pagelist, 0); 4994 if (err < 0) 4995 goto fail; 4996 } 4997 4998 if (recon_state.nr_caps || recon_state.nr_realms) { 4999 struct page *page = 5000 list_first_entry(&recon_state.pagelist->head, 5001 struct page, lru); 5002 __le32 *addr = kmap_atomic(page); 5003 if (recon_state.nr_caps) { 5004 WARN_ON(recon_state.nr_realms != mdsc->num_snap_realms); 5005 *addr = cpu_to_le32(recon_state.nr_caps); 5006 } else if (recon_state.msg_version >= 4) { 5007 *(addr + 1) = cpu_to_le32(recon_state.nr_realms); 5008 } 5009 kunmap_atomic(addr); 5010 } 5011 5012 reply->hdr.version = cpu_to_le16(recon_state.msg_version); 5013 if (recon_state.msg_version >= 4) 5014 reply->hdr.compat_version = cpu_to_le16(4); 5015 5016 reply->hdr.data_len = cpu_to_le32(recon_state.pagelist->length); 5017 ceph_msg_data_add_pagelist(reply, recon_state.pagelist); 5018 5019 ceph_con_send(&session->s_con, reply); 5020 5021 mutex_unlock(&session->s_mutex); 5022 5023 mutex_lock(&mdsc->mutex); 5024 __wake_requests(mdsc, &session->s_waiting); 5025 mutex_unlock(&mdsc->mutex); 5026 5027 up_read(&mdsc->snap_rwsem); 5028 ceph_pagelist_release(recon_state.pagelist); 5029 return; 5030 5031 
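/*
 * Error labels mirror how far setup got: "fail" drops the reply
 * message, snap_rwsem and the session mutex; "fail_nomsg" releases
 * the pagelist; "fail_nopagelist" just reports the error.
 */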
fail: 5032 ceph_msg_put(reply); 5033 up_read(&mdsc->snap_rwsem); 5034 mutex_unlock(&session->s_mutex); 5035 fail_nomsg: 5036 ceph_pagelist_release(recon_state.pagelist); 5037 fail_nopagelist: 5038 pr_err_client(cl, "error %d preparing reconnect for mds%d\n", 5039 err, mds); 5040 return; 5041 } 5042 5043 5044 /* 5045 * compare old and new mdsmaps, kicking requests 5046 * and closing out old connections as necessary 5047 * 5048 * called under mdsc->mutex. 5049 */ 5050 static void check_new_map(struct ceph_mds_client *mdsc, 5051 struct ceph_mdsmap *newmap, 5052 struct ceph_mdsmap *oldmap) 5053 { 5054 int i, j, err; 5055 int oldstate, newstate; 5056 struct ceph_mds_session *s; 5057 unsigned long targets[DIV_ROUND_UP(CEPH_MAX_MDS, sizeof(unsigned long))] = {0}; 5058 struct ceph_client *cl = mdsc->fsc->client; 5059 5060 doutc(cl, "new %u old %u\n", newmap->m_epoch, oldmap->m_epoch); 5061 5062 if (newmap->m_info) { 5063 for (i = 0; i < newmap->possible_max_rank; i++) { 5064 for (j = 0; j < newmap->m_info[i].num_export_targets; j++) 5065 set_bit(newmap->m_info[i].export_targets[j], targets); 5066 } 5067 } 5068 5069 for (i = 0; i < oldmap->possible_max_rank && i < mdsc->max_sessions; i++) { 5070 if (!mdsc->sessions[i]) 5071 continue; 5072 s = mdsc->sessions[i]; 5073 oldstate = ceph_mdsmap_get_state(oldmap, i); 5074 newstate = ceph_mdsmap_get_state(newmap, i); 5075 5076 doutc(cl, "mds%d state %s%s -> %s%s (session %s)\n", 5077 i, ceph_mds_state_name(oldstate), 5078 ceph_mdsmap_is_laggy(oldmap, i) ? " (laggy)" : "", 5079 ceph_mds_state_name(newstate), 5080 ceph_mdsmap_is_laggy(newmap, i) ? " (laggy)" : "", 5081 ceph_session_state_name(s->s_state)); 5082 5083 if (i >= newmap->possible_max_rank) { 5084 /* force close session for stopped mds */ 5085 ceph_get_mds_session(s); 5086 __unregister_session(mdsc, s); 5087 __wake_requests(mdsc, &s->s_waiting); 5088 mutex_unlock(&mdsc->mutex); 5089 5090 mutex_lock(&s->s_mutex); 5091 cleanup_session_requests(mdsc, s); 5092 remove_session_caps(s); 5093 mutex_unlock(&s->s_mutex); 5094 5095 ceph_put_mds_session(s); 5096 5097 mutex_lock(&mdsc->mutex); 5098 kick_requests(mdsc, i); 5099 continue; 5100 } 5101 5102 if (memcmp(ceph_mdsmap_get_addr(oldmap, i), 5103 ceph_mdsmap_get_addr(newmap, i), 5104 sizeof(struct ceph_entity_addr))) { 5105 /* just close it */ 5106 mutex_unlock(&mdsc->mutex); 5107 mutex_lock(&s->s_mutex); 5108 mutex_lock(&mdsc->mutex); 5109 ceph_con_close(&s->s_con); 5110 mutex_unlock(&s->s_mutex); 5111 s->s_state = CEPH_MDS_SESSION_RESTARTING; 5112 } else if (oldstate == newstate) { 5113 continue; /* nothing new with this mds */ 5114 } 5115 5116 /* 5117 * send reconnect? 5118 */ 5119 if (s->s_state == CEPH_MDS_SESSION_RESTARTING && 5120 newstate >= CEPH_MDS_STATE_RECONNECT) { 5121 mutex_unlock(&mdsc->mutex); 5122 clear_bit(i, targets); 5123 send_mds_reconnect(mdsc, s); 5124 mutex_lock(&mdsc->mutex); 5125 } 5126 5127 /* 5128 * kick request on any mds that has gone active. 5129 */ 5130 if (oldstate < CEPH_MDS_STATE_ACTIVE && 5131 newstate >= CEPH_MDS_STATE_ACTIVE) { 5132 if (oldstate != CEPH_MDS_STATE_CREATING && 5133 oldstate != CEPH_MDS_STATE_STARTING) 5134 pr_info_client(cl, "mds%d recovery completed\n", 5135 s->s_mds); 5136 kick_requests(mdsc, i); 5137 mutex_unlock(&mdsc->mutex); 5138 mutex_lock(&s->s_mutex); 5139 mutex_lock(&mdsc->mutex); 5140 ceph_kick_flushing_caps(mdsc, s); 5141 mutex_unlock(&s->s_mutex); 5142 wake_up_session_caps(s, RECONNECT); 5143 } 5144 } 5145 5146 /* 5147 * Only open and reconnect sessions that don't exist yet. 
5148 */
5149 for (i = 0; i < newmap->possible_max_rank; i++) {
5150 /*
5151 * In case the import MDS crashes just after the EImportStart
5152 * journal is flushed: when a standby MDS takes over and is
5153 * replaying the EImportStart journal, the new MDS daemon
5154 * will wait for the client to reconnect to it, but the
5155 * client may not have registered/opened the session yet.
5156 *
5157 * Try to reconnect to that MDS daemon if its rank number
5158 * is in the export targets array and it is in the
5159 * up:reconnect state.
5160 *
5161 */
5162 newstate = ceph_mdsmap_get_state(newmap, i);
5163 if (!test_bit(i, targets) || newstate != CEPH_MDS_STATE_RECONNECT)
5164 continue;
5165
5166 /*
5167 * In rare cases the session may already have been registered
5168 * and opened by requests that picked random MDSes during the
5169 * mdsc->mutex unlock/lock gap below. The related MDS daemon
5170 * will just queue those requests and keep waiting for the
5171 * client's reconnection request while in the up:reconnect
5172 * state.
5173 */
5174 s = __ceph_lookup_mds_session(mdsc, i);
5175 if (likely(!s)) {
5176 s = __open_export_target_session(mdsc, i);
5177 if (IS_ERR(s)) {
5178 err = PTR_ERR(s);
5179 pr_err_client(cl,
5180 "failed to open export target session, err %d\n",
5181 err);
5182 continue;
5183 }
5184 }
5185 doutc(cl, "send reconnect to export target mds.%d\n", i);
5186 mutex_unlock(&mdsc->mutex);
5187 send_mds_reconnect(mdsc, s);
5188 ceph_put_mds_session(s);
5189 mutex_lock(&mdsc->mutex);
5190 }
5191
5192 for (i = 0; i < newmap->possible_max_rank && i < mdsc->max_sessions; i++) {
5193 s = mdsc->sessions[i];
5194 if (!s)
5195 continue;
5196 if (!ceph_mdsmap_is_laggy(newmap, i))
5197 continue;
5198 if (s->s_state == CEPH_MDS_SESSION_OPEN ||
5199 s->s_state == CEPH_MDS_SESSION_HUNG ||
5200 s->s_state == CEPH_MDS_SESSION_CLOSING) {
5201 doutc(cl, " connecting to export targets of laggy mds%d\n", i);
5202 __open_export_target_sessions(mdsc, s);
5203 }
5204 }
5205 }
5206
5207
5208
5209 /*
5210 * leases
5211 */
5212
5213 /*
5214 * caller must hold session s_mutex, dentry->d_lock
5215 */
5216 void __ceph_mdsc_drop_dentry_lease(struct dentry *dentry)
5217 {
5218 struct ceph_dentry_info *di = ceph_dentry(dentry);
5219
5220 ceph_put_mds_session(di->lease_session);
5221 di->lease_session = NULL;
5222 }
5223
5224 static void handle_lease(struct ceph_mds_client *mdsc,
5225 struct ceph_mds_session *session,
5226 struct ceph_msg *msg)
5227 {
5228 struct ceph_client *cl = mdsc->fsc->client;
5229 struct super_block *sb = mdsc->fsc->sb;
5230 struct inode *inode;
5231 struct dentry *parent, *dentry;
5232 struct ceph_dentry_info *di;
5233 int mds = session->s_mds;
5234 struct ceph_mds_lease *h = msg->front.iov_base;
5235 u32 seq;
5236 struct ceph_vino vino;
5237 struct qstr dname;
5238 int release = 0;
5239
5240 doutc(cl, "from mds%d\n", mds);
5241
5242 if (!ceph_inc_mds_stopping_blocker(mdsc, session))
5243 return;
5244
5245 /* decode */
5246 if (msg->front.iov_len < sizeof(*h) + sizeof(u32))
5247 goto bad;
5248 vino.ino = le64_to_cpu(h->ino);
5249 vino.snap = CEPH_NOSNAP;
5250 seq = le32_to_cpu(h->seq);
5251 dname.len = get_unaligned_le32(h + 1);
5252 if (msg->front.iov_len < sizeof(*h) + sizeof(u32) + dname.len)
5253 goto bad;
5254 dname.name = (void *)(h + 1) + sizeof(u32);
5255
5256 /* lookup inode */
5257 inode = ceph_find_inode(sb, vino);
5258 doutc(cl, "%s, ino %llx %p %.*s\n", ceph_lease_op_name(h->action),
5259 vino.ino, inode, dname.len, dname.name);
5260
5261 mutex_lock(&session->s_mutex);
5262 if (!inode) {
5263 doutc(cl, "no inode %llx\n", vino.ino);
5264 goto release;
5265 }
5266
5267 /* dentry */
5268 parent = d_find_alias(inode);
5269 if (!parent) {
5270 doutc(cl, "no parent dentry on inode %p\n", inode);
5271 WARN_ON(1);
5272 goto release; /* hrm... */
5273 }
5274 dname.hash = full_name_hash(parent, dname.name, dname.len);
5275 dentry = d_lookup(parent, &dname);
5276 dput(parent);
5277 if (!dentry)
5278 goto release;
5279
5280 spin_lock(&dentry->d_lock);
5281 di = ceph_dentry(dentry);
5282 switch (h->action) {
5283 case CEPH_MDS_LEASE_REVOKE:
5284 if (di->lease_session == session) {
5285 if (ceph_seq_cmp(di->lease_seq, seq) > 0)
5286 h->seq = cpu_to_le32(di->lease_seq);
5287 __ceph_mdsc_drop_dentry_lease(dentry);
5288 }
5289 release = 1;
5290 break;
5291
5292 case CEPH_MDS_LEASE_RENEW:
5293 if (di->lease_session == session &&
5294 di->lease_gen == atomic_read(&session->s_cap_gen) &&
5295 di->lease_renew_from &&
5296 di->lease_renew_after == 0) {
5297 unsigned long duration =
5298 msecs_to_jiffies(le32_to_cpu(h->duration_ms));
5299
5300 di->lease_seq = seq;
5301 di->time = di->lease_renew_from + duration;
5302 di->lease_renew_after = di->lease_renew_from +
5303 (duration >> 1);
5304 di->lease_renew_from = 0;
5305 }
5306 break;
5307 }
5308 spin_unlock(&dentry->d_lock);
5309 dput(dentry);
5310
5311 if (!release)
5312 goto out;
5313
5314 release:
5315 /* let's just reuse the same message */
5316 h->action = CEPH_MDS_LEASE_REVOKE_ACK;
5317 ceph_msg_get(msg);
5318 ceph_con_send(&session->s_con, msg);
5319
5320 out:
5321 mutex_unlock(&session->s_mutex);
5322 iput(inode);
5323
5324 ceph_dec_mds_stopping_blocker(mdsc);
5325 return;
5326
5327 bad:
5328 ceph_dec_mds_stopping_blocker(mdsc);
5329
5330 pr_err_client(cl, "corrupt lease message\n");
5331 ceph_msg_dump(msg);
5332 }
5333
5334 void ceph_mdsc_lease_send_msg(struct ceph_mds_session *session,
5335 struct dentry *dentry, char action,
5336 u32 seq)
5337 {
5338 struct ceph_client *cl = session->s_mdsc->fsc->client;
5339 struct ceph_msg *msg;
5340 struct ceph_mds_lease *lease;
5341 struct inode *dir;
5342 int len = sizeof(*lease) + sizeof(u32) + NAME_MAX;
5343
5344 doutc(cl, "dentry %p %s to mds%d\n", dentry, ceph_lease_op_name(action),
5345 session->s_mds);
5346
5347 msg = ceph_msg_new(CEPH_MSG_CLIENT_LEASE, len, GFP_NOFS, false);
5348 if (!msg)
5349 return;
5350 lease = msg->front.iov_base;
5351 lease->action = action;
5352 lease->seq = cpu_to_le32(seq);
5353
5354 spin_lock(&dentry->d_lock);
5355 dir = d_inode(dentry->d_parent);
5356 lease->ino = cpu_to_le64(ceph_ino(dir));
5357 lease->first = lease->last = cpu_to_le64(ceph_snap(dir));
5358
5359 put_unaligned_le32(dentry->d_name.len, lease + 1);
5360 memcpy((void *)(lease + 1) + 4,
5361 dentry->d_name.name, dentry->d_name.len);
5362 spin_unlock(&dentry->d_lock);
5363
5364 ceph_con_send(&session->s_con, msg);
5365 }
5366
5367 /*
5368 * lock and unlock the session to wait for ongoing session activities
5369 */
5370 static void lock_unlock_session(struct ceph_mds_session *s)
5371 {
5372 mutex_lock(&s->s_mutex);
5373 mutex_unlock(&s->s_mutex);
5374 }
5375
5376 static void maybe_recover_session(struct ceph_mds_client *mdsc)
5377 {
5378 struct ceph_client *cl = mdsc->fsc->client;
5379 struct ceph_fs_client *fsc = mdsc->fsc;
5380
5381 if (!ceph_test_mount_opt(fsc, CLEANRECOVER))
5382 return;
5383
5384 if (READ_ONCE(fsc->mount_state) != CEPH_MOUNT_MOUNTED)
5385 return;
5386
5387 if (!READ_ONCE(fsc->blocklisted))
5388 return;
5389
5390 pr_info_client(cl, "auto reconnect after blocklisted\n");
5391
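/*
 * All three checks above passed (CLEANRECOVER mount option set, fs
 * still mounted, client blocklisted), so force a reconnect of this
 * superblock.
 */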
ceph_force_reconnect(fsc->sb); 5392 } 5393 5394 bool check_session_state(struct ceph_mds_session *s) 5395 { 5396 struct ceph_client *cl = s->s_mdsc->fsc->client; 5397 5398 switch (s->s_state) { 5399 case CEPH_MDS_SESSION_OPEN: 5400 if (s->s_ttl && time_after(jiffies, s->s_ttl)) { 5401 s->s_state = CEPH_MDS_SESSION_HUNG; 5402 pr_info_client(cl, "mds%d hung\n", s->s_mds); 5403 } 5404 break; 5405 case CEPH_MDS_SESSION_CLOSING: 5406 case CEPH_MDS_SESSION_NEW: 5407 case CEPH_MDS_SESSION_RESTARTING: 5408 case CEPH_MDS_SESSION_CLOSED: 5409 case CEPH_MDS_SESSION_REJECTED: 5410 return false; 5411 } 5412 5413 return true; 5414 } 5415 5416 /* 5417 * If the sequence is incremented while we're waiting on a REQUEST_CLOSE reply, 5418 * then we need to retransmit that request. 5419 */ 5420 void inc_session_sequence(struct ceph_mds_session *s) 5421 { 5422 struct ceph_client *cl = s->s_mdsc->fsc->client; 5423 5424 lockdep_assert_held(&s->s_mutex); 5425 5426 s->s_seq++; 5427 5428 if (s->s_state == CEPH_MDS_SESSION_CLOSING) { 5429 int ret; 5430 5431 doutc(cl, "resending session close request for mds%d\n", s->s_mds); 5432 ret = request_close_session(s); 5433 if (ret < 0) 5434 pr_err_client(cl, "unable to close session to mds%d: %d\n", 5435 s->s_mds, ret); 5436 } 5437 } 5438 5439 /* 5440 * delayed work -- periodically trim expired leases, renew caps with mds. If 5441 * the @delay parameter is set to 0 or if it's more than 5 secs, the default 5442 * workqueue delay value of 5 secs will be used. 5443 */ 5444 static void schedule_delayed(struct ceph_mds_client *mdsc, unsigned long delay) 5445 { 5446 unsigned long max_delay = HZ * 5; 5447 5448 /* 5 secs default delay */ 5449 if (!delay || (delay > max_delay)) 5450 delay = max_delay; 5451 schedule_delayed_work(&mdsc->delayed_work, 5452 round_jiffies_relative(delay)); 5453 } 5454 5455 static void delayed_work(struct work_struct *work) 5456 { 5457 struct ceph_mds_client *mdsc = 5458 container_of(work, struct ceph_mds_client, delayed_work.work); 5459 unsigned long delay; 5460 int renew_interval; 5461 int renew_caps; 5462 int i; 5463 5464 doutc(mdsc->fsc->client, "mdsc delayed_work\n"); 5465 5466 if (mdsc->stopping >= CEPH_MDSC_STOPPING_FLUSHED) 5467 return; 5468 5469 mutex_lock(&mdsc->mutex); 5470 renew_interval = mdsc->mdsmap->m_session_timeout >> 2; 5471 renew_caps = time_after_eq(jiffies, HZ*renew_interval + 5472 mdsc->last_renew_caps); 5473 if (renew_caps) 5474 mdsc->last_renew_caps = jiffies; 5475 5476 for (i = 0; i < mdsc->max_sessions; i++) { 5477 struct ceph_mds_session *s = __ceph_lookup_mds_session(mdsc, i); 5478 if (!s) 5479 continue; 5480 5481 if (!check_session_state(s)) { 5482 ceph_put_mds_session(s); 5483 continue; 5484 } 5485 mutex_unlock(&mdsc->mutex); 5486 5487 ceph_flush_session_cap_releases(mdsc, s); 5488 5489 mutex_lock(&s->s_mutex); 5490 if (renew_caps) 5491 send_renew_caps(mdsc, s); 5492 else 5493 ceph_con_keepalive(&s->s_con); 5494 if (s->s_state == CEPH_MDS_SESSION_OPEN || 5495 s->s_state == CEPH_MDS_SESSION_HUNG) 5496 ceph_send_cap_releases(mdsc, s); 5497 mutex_unlock(&s->s_mutex); 5498 ceph_put_mds_session(s); 5499 5500 mutex_lock(&mdsc->mutex); 5501 } 5502 mutex_unlock(&mdsc->mutex); 5503 5504 delay = ceph_check_delayed_caps(mdsc); 5505 5506 ceph_queue_cap_reclaim_work(mdsc); 5507 5508 ceph_trim_snapid_map(mdsc); 5509 5510 maybe_recover_session(mdsc); 5511 5512 schedule_delayed(mdsc, delay); 5513 } 5514 5515 int ceph_mdsc_init(struct ceph_fs_client *fsc) 5516 5517 { 5518 struct ceph_mds_client *mdsc; 5519 int err; 5520 5521 mdsc = 
kzalloc(sizeof(struct ceph_mds_client), GFP_NOFS); 5522 if (!mdsc) 5523 return -ENOMEM; 5524 mdsc->fsc = fsc; 5525 mutex_init(&mdsc->mutex); 5526 mdsc->mdsmap = kzalloc(sizeof(*mdsc->mdsmap), GFP_NOFS); 5527 if (!mdsc->mdsmap) { 5528 err = -ENOMEM; 5529 goto err_mdsc; 5530 } 5531 5532 init_completion(&mdsc->safe_umount_waiters); 5533 spin_lock_init(&mdsc->stopping_lock); 5534 atomic_set(&mdsc->stopping_blockers, 0); 5535 init_completion(&mdsc->stopping_waiter); 5536 atomic64_set(&mdsc->dirty_folios, 0); 5537 init_waitqueue_head(&mdsc->flush_end_wq); 5538 init_waitqueue_head(&mdsc->session_close_wq); 5539 INIT_LIST_HEAD(&mdsc->waiting_for_map); 5540 mdsc->quotarealms_inodes = RB_ROOT; 5541 mutex_init(&mdsc->quotarealms_inodes_mutex); 5542 init_rwsem(&mdsc->snap_rwsem); 5543 mdsc->snap_realms = RB_ROOT; 5544 INIT_LIST_HEAD(&mdsc->snap_empty); 5545 spin_lock_init(&mdsc->snap_empty_lock); 5546 mdsc->request_tree = RB_ROOT; 5547 INIT_DELAYED_WORK(&mdsc->delayed_work, delayed_work); 5548 mdsc->last_renew_caps = jiffies; 5549 INIT_LIST_HEAD(&mdsc->cap_delay_list); 5550 #ifdef CONFIG_DEBUG_FS 5551 INIT_LIST_HEAD(&mdsc->cap_wait_list); 5552 #endif 5553 spin_lock_init(&mdsc->cap_delay_lock); 5554 INIT_LIST_HEAD(&mdsc->cap_unlink_delay_list); 5555 INIT_LIST_HEAD(&mdsc->snap_flush_list); 5556 spin_lock_init(&mdsc->snap_flush_lock); 5557 mdsc->last_cap_flush_tid = 1; 5558 INIT_LIST_HEAD(&mdsc->cap_flush_list); 5559 INIT_LIST_HEAD(&mdsc->cap_dirty_migrating); 5560 spin_lock_init(&mdsc->cap_dirty_lock); 5561 init_waitqueue_head(&mdsc->cap_flushing_wq); 5562 INIT_WORK(&mdsc->cap_reclaim_work, ceph_cap_reclaim_work); 5563 INIT_WORK(&mdsc->cap_unlink_work, ceph_cap_unlink_work); 5564 err = ceph_metric_init(&mdsc->metric); 5565 if (err) 5566 goto err_mdsmap; 5567 5568 spin_lock_init(&mdsc->dentry_list_lock); 5569 INIT_LIST_HEAD(&mdsc->dentry_leases); 5570 INIT_LIST_HEAD(&mdsc->dentry_dir_leases); 5571 5572 ceph_caps_init(mdsc); 5573 ceph_adjust_caps_max_min(mdsc, fsc->mount_options); 5574 5575 spin_lock_init(&mdsc->snapid_map_lock); 5576 mdsc->snapid_map_tree = RB_ROOT; 5577 INIT_LIST_HEAD(&mdsc->snapid_map_lru); 5578 5579 init_rwsem(&mdsc->pool_perm_rwsem); 5580 mdsc->pool_perm_tree = RB_ROOT; 5581 5582 strscpy(mdsc->nodename, utsname()->nodename, 5583 sizeof(mdsc->nodename)); 5584 5585 fsc->mdsc = mdsc; 5586 return 0; 5587 5588 err_mdsmap: 5589 kfree(mdsc->mdsmap); 5590 err_mdsc: 5591 kfree(mdsc); 5592 return err; 5593 } 5594 5595 /* 5596 * Wait for safe replies on open mds requests. If we time out, drop 5597 * all requests from the tree to avoid dangling dentry refs. 
5598 */
5599 static void wait_requests(struct ceph_mds_client *mdsc)
5600 {
5601 struct ceph_client *cl = mdsc->fsc->client;
5602 struct ceph_options *opts = mdsc->fsc->client->options;
5603 struct ceph_mds_request *req;
5604
5605 mutex_lock(&mdsc->mutex);
5606 if (__get_oldest_req(mdsc)) {
5607 mutex_unlock(&mdsc->mutex);
5608
5609 doutc(cl, "waiting for requests\n");
5610 wait_for_completion_timeout(&mdsc->safe_umount_waiters,
5611 ceph_timeout_jiffies(opts->mount_timeout));
5612
5613 /* tear down remaining requests */
5614 mutex_lock(&mdsc->mutex);
5615 while ((req = __get_oldest_req(mdsc))) {
5616 doutc(cl, "timed out on tid %llu\n", req->r_tid);
5617 list_del_init(&req->r_wait);
5618 __unregister_request(mdsc, req);
5619 }
5620 }
5621 mutex_unlock(&mdsc->mutex);
5622 doutc(cl, "done\n");
5623 }
5624
5625 void send_flush_mdlog(struct ceph_mds_session *s)
5626 {
5627 struct ceph_client *cl = s->s_mdsc->fsc->client;
5628 struct ceph_msg *msg;
5629
5630 /*
5631 * Pre-luminous MDS crashes when it sees an unknown session request
5632 */
5633 if (!CEPH_HAVE_FEATURE(s->s_con.peer_features, SERVER_LUMINOUS))
5634 return;
5635
5636 mutex_lock(&s->s_mutex);
5637 doutc(cl, "request mdlog flush to mds%d (%s) seq %lld\n",
5638 s->s_mds, ceph_session_state_name(s->s_state), s->s_seq);
5639 msg = ceph_create_session_msg(CEPH_SESSION_REQUEST_FLUSH_MDLOG,
5640 s->s_seq);
5641 if (!msg) {
5642 pr_err_client(cl, "failed to request mdlog flush to mds%d (%s) seq %lld\n",
5643 s->s_mds, ceph_session_state_name(s->s_state), s->s_seq);
5644 } else {
5645 ceph_con_send(&s->s_con, msg);
5646 }
5647 mutex_unlock(&s->s_mutex);
5648 }
5649
5650 static int ceph_mds_auth_match(struct ceph_mds_client *mdsc,
5651 struct ceph_mds_cap_auth *auth,
5652 const struct cred *cred,
5653 char *tpath)
5654 {
5655 u32 caller_uid = from_kuid(&init_user_ns, cred->fsuid);
5656 u32 caller_gid = from_kgid(&init_user_ns, cred->fsgid);
5657 struct ceph_client *cl = mdsc->fsc->client;
5658 const char *fs_name = mdsc->fsc->mount_options->mds_namespace;
5659 const char *spath = mdsc->fsc->mount_options->server_path;
5660 bool gid_matched = false;
5661 u32 gid, tlen, len;
5662 int i, j;
5663
5664 doutc(cl, "fsname check fs_name=%s match.fs_name=%s\n",
5665 fs_name, auth->match.fs_name ?
auth->match.fs_name : "");
5666 if (auth->match.fs_name && strcmp(auth->match.fs_name, fs_name)) {
5667 /* fsname mismatch, try next one */
5668 return 0;
5669 }
5670
5671 doutc(cl, "match.uid %lld\n", auth->match.uid);
5672 if (auth->match.uid != MDS_AUTH_UID_ANY) {
5673 if (auth->match.uid != caller_uid)
5674 return 0;
5675 if (auth->match.num_gids) {
5676 for (i = 0; i < auth->match.num_gids; i++) {
5677 if (caller_gid == auth->match.gids[i])
5678 gid_matched = true;
5679 }
5680 if (!gid_matched && cred->group_info->ngroups) {
5681 for (i = 0; i < cred->group_info->ngroups; i++) {
5682 gid = from_kgid(&init_user_ns,
5683 cred->group_info->gid[i]);
5684 for (j = 0; j < auth->match.num_gids; j++) {
5685 if (gid == auth->match.gids[j]) {
5686 gid_matched = true;
5687 break;
5688 }
5689 }
5690 if (gid_matched)
5691 break;
5692 }
5693 }
5694 if (!gid_matched)
5695 return 0;
5696 }
5697 }
5698
5699 /* path match */
5700 if (auth->match.path) {
5701 if (!tpath)
5702 return 0;
5703
5704 tlen = strlen(tpath);
5705 len = strlen(auth->match.path);
5706 if (len) {
5707 char *_tpath = tpath;
5708 bool free_tpath = false;
5709 int m, n;
5710
5711 doutc(cl, "server path %s, tpath %s, match.path %s\n",
5712 spath, tpath, auth->match.path);
5713 if (spath && (m = strlen(spath)) != 1) {
5714 /* mount path + '/' + tpath + the terminating NUL */
5715 n = m + 1 + tlen + 1;
5716 _tpath = kmalloc(n, GFP_NOFS);
5717 if (!_tpath)
5718 return -ENOMEM;
5719 /* remove the leading '/' */
5720 snprintf(_tpath, n, "%s/%s", spath + 1, tpath);
5721 free_tpath = true;
5722 tlen = strlen(_tpath);
5723 }
5724
5725 /*
5726 * Please note the trailing '/' for match.path has already
5727 * been removed when parsing.
5728 *
5729 * Remove the trailing '/' for the target path.
5730 */
5731 while (tlen && _tpath[tlen - 1] == '/') {
5732 _tpath[tlen - 1] = '\0';
5733 tlen -= 1;
5734 }
5735 doutc(cl, "_tpath %s\n", _tpath);
5736
5737 /*
5738 * In case first == _tpath && tlen == len:
5739 * match.path=/foo --> /foo _tpath=/foo --> match
5740 * match.path=/foo/ --> /foo _tpath=/foo --> match
5741 *
5742 * In case first == _tpath && tlen > len:
5743 * match.path=/foo/ --> /foo _tpath=/foo/ --> match
5744 * match.path=/foo --> /foo _tpath=/foo/ --> match
5745 * match.path=/foo/ --> /foo _tpath=/foo/d --> match
5746 * match.path=/foo --> /foo _tpath=/food --> mismatch
5747 *
5748 * All the other cases --> mismatch
5749 */
5750 bool path_matched = true;
5751 char *first = strstr(_tpath, auth->match.path);
5752 if (first != _tpath ||
5753 (tlen > len && _tpath[len] != '/')) {
5754 path_matched = false;
5755 }
5756
5757 if (free_tpath)
5758 kfree(_tpath);
5759
5760 if (!path_matched)
5761 return 0;
5762 }
5763 }
5764
5765 doutc(cl, "matched\n");
5766 return 1;
5767 }
5768
5769 int ceph_mds_check_access(struct ceph_mds_client *mdsc, char *tpath, int mask)
5770 {
5771 const struct cred *cred = get_current_cred();
5772 u32 caller_uid = from_kuid(&init_user_ns, cred->fsuid);
5773 u32 caller_gid = from_kgid(&init_user_ns, cred->fsgid);
5774 struct ceph_mds_cap_auth *rw_perms_s = NULL;
5775 struct ceph_client *cl = mdsc->fsc->client;
5776 bool root_squash_perms = true;
5777 int i, err;
5778
5779 doutc(cl, "tpath '%s', mask %d, caller_uid %d, caller_gid %d\n",
5780 tpath, mask, caller_uid, caller_gid);
5781
5782 for (i = 0; i < mdsc->s_cap_auths_num; i++) {
5783 struct ceph_mds_cap_auth *s = &mdsc->s_cap_auths[i];
5784
5785 err = ceph_mds_auth_match(mdsc, s, cred, tpath);
5786 if (err < 0) {
5787 put_cred(cred);
5788 return err;
5789 } else if (err > 0)

int ceph_mds_check_access(struct ceph_mds_client *mdsc, char *tpath, int mask)
{
	const struct cred *cred = get_current_cred();
	u32 caller_uid = from_kuid(&init_user_ns, cred->fsuid);
	u32 caller_gid = from_kgid(&init_user_ns, cred->fsgid);
	struct ceph_mds_cap_auth *rw_perms_s = NULL;
	struct ceph_client *cl = mdsc->fsc->client;
	bool root_squash_perms = true;
	int i, err;

	doutc(cl, "tpath '%s', mask %d, caller_uid %d, caller_gid %d\n",
	      tpath, mask, caller_uid, caller_gid);

	for (i = 0; i < mdsc->s_cap_auths_num; i++) {
		struct ceph_mds_cap_auth *s = &mdsc->s_cap_auths[i];

		err = ceph_mds_auth_match(mdsc, s, cred, tpath);
		if (err < 0) {
			put_cred(cred);
			return err;
		} else if (err > 0) {
			/* always follow the last auth caps' permission */
			root_squash_perms = true;
			rw_perms_s = NULL;
			if ((mask & MAY_WRITE) && s->writeable &&
			    s->match.root_squash && (!caller_uid || !caller_gid))
				root_squash_perms = false;

			if (((mask & MAY_WRITE) && !s->writeable) ||
			    ((mask & MAY_READ) && !s->readable))
				rw_perms_s = s;
		}
	}

	put_cred(cred);

	doutc(cl, "root_squash_perms %d, rw_perms_s %p\n", root_squash_perms,
	      rw_perms_s);
	if (root_squash_perms && rw_perms_s == NULL) {
		doutc(cl, "access allowed\n");
		return 0;
	}

	if (!root_squash_perms) {
		doutc(cl, "root_squash is enabled and user(%d %d) isn't allowed to write",
		      caller_uid, caller_gid);
	}
	if (rw_perms_s) {
		doutc(cl, "mds auth caps readable/writeable %d/%d while request r/w %d/%d",
		      rw_perms_s->readable, rw_perms_s->writeable,
		      !!(mask & MAY_READ), !!(mask & MAY_WRITE));
	}
	doutc(cl, "access denied\n");
	return -EACCES;
}
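/*
 * Usage sketch (hedged; the call site shown is illustrative, not a
 * quote of an actual caller): path-based MDS permission checks are
 * meant to be done up front, before issuing the metadata request, e.g.
 *
 *	err = ceph_mds_check_access(mdsc, path, MAY_WRITE);
 *	if (err)
 *		return err;	(-EACCES, or -ENOMEM from the matcher)
 *
 * A zero return means at least one auth cap granted the requested
 * access and root_squash did not forbid it.
 */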

/*
 * called before mount is ro, and before dentries are torn down.
 * (hmm, does this still race with new lookups?)
 */
void ceph_mdsc_pre_umount(struct ceph_mds_client *mdsc)
{
	doutc(mdsc->fsc->client, "begin\n");
	mdsc->stopping = CEPH_MDSC_STOPPING_BEGIN;

	ceph_mdsc_iterate_sessions(mdsc, send_flush_mdlog, true);
	ceph_mdsc_iterate_sessions(mdsc, lock_unlock_session, false);
	ceph_flush_dirty_caps(mdsc);
	wait_requests(mdsc);

	/*
	 * wait for reply handlers to drop their request refs and
	 * their inode/dcache refs
	 */
	ceph_msgr_flush();

	ceph_cleanup_quotarealms_inodes(mdsc);
	doutc(mdsc->fsc->client, "done\n");
}

/*
 * flush the mdlog and wait for all write mds requests to flush.
 */
static void flush_mdlog_and_wait_mdsc_unsafe_requests(struct ceph_mds_client *mdsc,
						      u64 want_tid)
{
	struct ceph_client *cl = mdsc->fsc->client;
	struct ceph_mds_request *req = NULL, *nextreq;
	struct ceph_mds_session *last_session = NULL;
	struct rb_node *n;

	mutex_lock(&mdsc->mutex);
	doutc(cl, "want %lld\n", want_tid);
restart:
	req = __get_oldest_req(mdsc);
	while (req && req->r_tid <= want_tid) {
		/* find next request */
		n = rb_next(&req->r_node);
		if (n)
			nextreq = rb_entry(n, struct ceph_mds_request, r_node);
		else
			nextreq = NULL;
		if (req->r_op != CEPH_MDS_OP_SETFILELOCK &&
		    (req->r_op & CEPH_MDS_OP_WRITE)) {
			struct ceph_mds_session *s = req->r_session;

			if (!s) {
				req = nextreq;
				continue;
			}

			/* write op */
			ceph_mdsc_get_request(req);
			if (nextreq)
				ceph_mdsc_get_request(nextreq);
			s = ceph_get_mds_session(s);
			mutex_unlock(&mdsc->mutex);

			/* send flush mdlog request to MDS */
			if (last_session != s) {
				send_flush_mdlog(s);
				ceph_put_mds_session(last_session);
				last_session = s;
			} else {
				ceph_put_mds_session(s);
			}
			doutc(cl, "wait on %llu (want %llu)\n",
			      req->r_tid, want_tid);
			wait_for_completion(&req->r_safe_completion);

			mutex_lock(&mdsc->mutex);
			ceph_mdsc_put_request(req);
			if (!nextreq)
				break; /* no next request existed, so we're done! */
			if (RB_EMPTY_NODE(&nextreq->r_node)) {
				/* next request was removed from tree */
				ceph_mdsc_put_request(nextreq);
				goto restart;
			}
			ceph_mdsc_put_request(nextreq); /* won't go away */
		}
		req = nextreq;
	}
	mutex_unlock(&mdsc->mutex);
	ceph_put_mds_session(last_session);
	doutc(cl, "done\n");
}

void ceph_mdsc_sync(struct ceph_mds_client *mdsc)
{
	struct ceph_client *cl = mdsc->fsc->client;
	u64 want_tid, want_flush;

	if (READ_ONCE(mdsc->fsc->mount_state) >= CEPH_MOUNT_SHUTDOWN)
		return;

	doutc(cl, "sync\n");
	mutex_lock(&mdsc->mutex);
	want_tid = mdsc->last_tid;
	mutex_unlock(&mdsc->mutex);

	ceph_flush_dirty_caps(mdsc);
	ceph_flush_cap_releases(mdsc);
	spin_lock(&mdsc->cap_dirty_lock);
	want_flush = mdsc->last_cap_flush_tid;
	if (!list_empty(&mdsc->cap_flush_list)) {
		struct ceph_cap_flush *cf =
			list_last_entry(&mdsc->cap_flush_list,
					struct ceph_cap_flush, g_list);
		cf->wake = true;
	}
	spin_unlock(&mdsc->cap_dirty_lock);

	doutc(cl, "sync want tid %lld flush_seq %lld\n", want_tid, want_flush);

	flush_mdlog_and_wait_mdsc_unsafe_requests(mdsc, want_tid);
	wait_caps_flush(mdsc, want_flush);
}
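/*
 * Overview (a reading of the code in this file, offered with some
 * hedging rather than as authoritative documentation): a clean unmount
 * roughly proceeds as
 *
 *	ceph_mdsc_pre_umount()     - stop new work, flush mdlog and caps
 *	ceph_mdsc_sync()           - wait for unsafe requests + cap flushes
 *	ceph_mdsc_close_sessions() - close and tear down MDS sessions
 *	ceph_mdsc_stop()/destroy() - release mdsmap and client resources
 */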

/*
 * true if all sessions are closed, or we force unmount
 */
static bool done_closing_sessions(struct ceph_mds_client *mdsc, int skipped)
{
	if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_SHUTDOWN)
		return true;
	return atomic_read(&mdsc->num_sessions) <= skipped;
}

/*
 * called after sb is ro or when metadata corrupted.
 */
void ceph_mdsc_close_sessions(struct ceph_mds_client *mdsc)
{
	struct ceph_options *opts = mdsc->fsc->client->options;
	struct ceph_client *cl = mdsc->fsc->client;
	struct ceph_mds_session *session;
	int i;
	int skipped = 0;

	doutc(cl, "begin\n");

	/* close sessions */
	mutex_lock(&mdsc->mutex);
	for (i = 0; i < mdsc->max_sessions; i++) {
		session = __ceph_lookup_mds_session(mdsc, i);
		if (!session)
			continue;
		mutex_unlock(&mdsc->mutex);
		mutex_lock(&session->s_mutex);
		if (__close_session(mdsc, session) <= 0)
			skipped++;
		mutex_unlock(&session->s_mutex);
		ceph_put_mds_session(session);
		mutex_lock(&mdsc->mutex);
	}
	mutex_unlock(&mdsc->mutex);

	doutc(cl, "waiting for sessions to close\n");
	wait_event_timeout(mdsc->session_close_wq,
			   done_closing_sessions(mdsc, skipped),
			   ceph_timeout_jiffies(opts->mount_timeout));

	/* tear down remaining sessions */
	mutex_lock(&mdsc->mutex);
	for (i = 0; i < mdsc->max_sessions; i++) {
		if (mdsc->sessions[i]) {
			session = ceph_get_mds_session(mdsc->sessions[i]);
			__unregister_session(mdsc, session);
			mutex_unlock(&mdsc->mutex);
			mutex_lock(&session->s_mutex);
			remove_session_caps(session);
			mutex_unlock(&session->s_mutex);
			ceph_put_mds_session(session);
			mutex_lock(&mdsc->mutex);
		}
	}
	WARN_ON(!list_empty(&mdsc->cap_delay_list));
	mutex_unlock(&mdsc->mutex);

	ceph_cleanup_snapid_map(mdsc);
	ceph_cleanup_global_and_empty_realms(mdsc);

	cancel_work_sync(&mdsc->cap_reclaim_work);
	cancel_work_sync(&mdsc->cap_unlink_work);
	cancel_delayed_work_sync(&mdsc->delayed_work); /* cancel timer */

	doutc(cl, "done\n");
}

void ceph_mdsc_force_umount(struct ceph_mds_client *mdsc)
{
	struct ceph_mds_session *session;
	int mds;

	doutc(mdsc->fsc->client, "force umount\n");

	mutex_lock(&mdsc->mutex);
	for (mds = 0; mds < mdsc->max_sessions; mds++) {
		session = __ceph_lookup_mds_session(mdsc, mds);
		if (!session)
			continue;

		if (session->s_state == CEPH_MDS_SESSION_REJECTED)
			__unregister_session(mdsc, session);
		__wake_requests(mdsc, &session->s_waiting);
		mutex_unlock(&mdsc->mutex);

		mutex_lock(&session->s_mutex);
		__close_session(mdsc, session);
		if (session->s_state == CEPH_MDS_SESSION_CLOSING) {
			cleanup_session_requests(mdsc, session);
			remove_session_caps(session);
		}
		mutex_unlock(&session->s_mutex);
		ceph_put_mds_session(session);

		mutex_lock(&mdsc->mutex);
		kick_requests(mdsc, mds);
	}
	__wake_requests(mdsc, &mdsc->waiting_for_map);
	mutex_unlock(&mdsc->mutex);
}
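/*
 * Note (inferred from the call sites, offered as a reading of the code
 * rather than a guarantee): by the time ceph_mdsc_stop() runs, session
 * teardown via ceph_mdsc_close_sessions() is expected to have already
 * happened, so stop only needs to quiesce the delayed work and free
 * client-side state.
 */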
static void ceph_mdsc_stop(struct ceph_mds_client *mdsc)
{
	doutc(mdsc->fsc->client, "stop\n");
	/*
	 * Make sure the delayed work has stopped before releasing
	 * the resources.
	 *
	 * cancel_delayed_work_sync() only guarantees that the work
	 * finishes executing, and the delayed work may re-arm itself
	 * again after that.
	 */
	flush_delayed_work(&mdsc->delayed_work);

	if (mdsc->mdsmap)
		ceph_mdsmap_destroy(mdsc->mdsmap);
	kfree(mdsc->sessions);
	ceph_caps_finalize(mdsc);

	if (mdsc->s_cap_auths) {
		int i;

		for (i = 0; i < mdsc->s_cap_auths_num; i++) {
			kfree(mdsc->s_cap_auths[i].match.gids);
			kfree(mdsc->s_cap_auths[i].match.path);
			kfree(mdsc->s_cap_auths[i].match.fs_name);
		}
		kfree(mdsc->s_cap_auths);
	}

	ceph_pool_perm_destroy(mdsc);
}

void ceph_mdsc_destroy(struct ceph_fs_client *fsc)
{
	struct ceph_mds_client *mdsc = fsc->mdsc;

	doutc(fsc->client, "%p\n", mdsc);

	if (!mdsc)
		return;

	/* flush out any connection work with references to us */
	ceph_msgr_flush();

	ceph_mdsc_stop(mdsc);

	ceph_metric_destroy(&mdsc->metric);

	fsc->mdsc = NULL;
	kfree(mdsc);
	doutc(fsc->client, "%p done\n", mdsc);
}

void ceph_mdsc_handle_fsmap(struct ceph_mds_client *mdsc, struct ceph_msg *msg)
{
	struct ceph_fs_client *fsc = mdsc->fsc;
	struct ceph_client *cl = fsc->client;
	const char *mds_namespace = fsc->mount_options->mds_namespace;
	void *p = msg->front.iov_base;
	void *end = p + msg->front.iov_len;
	u32 epoch;
	u32 num_fs;
	u32 mount_fscid = (u32)-1;
	int err = -EINVAL;

	ceph_decode_need(&p, end, sizeof(u32), bad);
	epoch = ceph_decode_32(&p);

	doutc(cl, "epoch %u\n", epoch);

	/* struct_v, struct_cv, map_len, epoch, legacy_client_fscid */
	ceph_decode_skip_n(&p, end, 2 + sizeof(u32) * 3, bad);

	ceph_decode_32_safe(&p, end, num_fs, bad);
	while (num_fs-- > 0) {
		void *info_p, *info_end;
		u32 info_len;
		u32 fscid, namelen;

		ceph_decode_need(&p, end, 2 + sizeof(u32), bad);
		p += 2;		/* info_v, info_cv */
		info_len = ceph_decode_32(&p);
		ceph_decode_need(&p, end, info_len, bad);
		info_p = p;
		info_end = p + info_len;
		p = info_end;

		ceph_decode_need(&info_p, info_end, sizeof(u32) * 2, bad);
		fscid = ceph_decode_32(&info_p);
		namelen = ceph_decode_32(&info_p);
		ceph_decode_need(&info_p, info_end, namelen, bad);

		if (mds_namespace &&
		    strlen(mds_namespace) == namelen &&
		    !strncmp(mds_namespace, (char *)info_p, namelen)) {
			mount_fscid = fscid;
			break;
		}
	}

	ceph_monc_got_map(&fsc->client->monc, CEPH_SUB_FSMAP, epoch);
	if (mount_fscid != (u32)-1) {
		fsc->client->monc.fs_cluster_id = mount_fscid;
		ceph_monc_want_map(&fsc->client->monc, CEPH_SUB_MDSMAP,
				   0, true);
		ceph_monc_renew_subs(&fsc->client->monc);
	} else {
		err = -ENOENT;
		goto err_out;
	}
	return;

bad:
	pr_err_client(cl, "error decoding fsmap %d. Shutting down mount.\n",
		      err);
	ceph_umount_begin(mdsc->fsc->sb);
	ceph_msg_dump(msg);
err_out:
	mutex_lock(&mdsc->mutex);
	mdsc->mdsmap_err = err;
	__wake_requests(mdsc, &mdsc->waiting_for_map);
	mutex_unlock(&mdsc->mutex);
}
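/*
 * Hedged note on the map subscription flow, as suggested by
 * ceph_mdsc_handle_fsmap() above: when the mount specifies an
 * mds_namespace, the client first consumes the FSMAP to translate the
 * namespace name into an fscid, then subscribes to that file system's
 * MDSMAP via the monitor client. The MDSMAP updates themselves are
 * handled below.
 */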

/*
 * handle mds map update.
 */
void ceph_mdsc_handle_mdsmap(struct ceph_mds_client *mdsc, struct ceph_msg *msg)
{
	struct ceph_client *cl = mdsc->fsc->client;
	u32 epoch;
	u32 maplen;
	void *p = msg->front.iov_base;
	void *end = p + msg->front.iov_len;
	struct ceph_mdsmap *newmap, *oldmap;
	struct ceph_fsid fsid;
	int err = -EINVAL;

	ceph_decode_need(&p, end, sizeof(fsid) + 2 * sizeof(u32), bad);
	ceph_decode_copy(&p, &fsid, sizeof(fsid));
	if (ceph_check_fsid(mdsc->fsc->client, &fsid) < 0)
		return;
	epoch = ceph_decode_32(&p);
	maplen = ceph_decode_32(&p);
	doutc(cl, "epoch %u len %d\n", epoch, (int)maplen);

	/* do we need it? */
	mutex_lock(&mdsc->mutex);
	if (mdsc->mdsmap && epoch <= mdsc->mdsmap->m_epoch) {
		doutc(cl, "epoch %u <= our %u\n", epoch, mdsc->mdsmap->m_epoch);
		mutex_unlock(&mdsc->mutex);
		return;
	}

	newmap = ceph_mdsmap_decode(mdsc, &p, end, ceph_msgr2(mdsc->fsc->client));
	if (IS_ERR(newmap)) {
		err = PTR_ERR(newmap);
		goto bad_unlock;
	}

	/* swap into place */
	if (mdsc->mdsmap) {
		oldmap = mdsc->mdsmap;
		mdsc->mdsmap = newmap;
		check_new_map(mdsc, newmap, oldmap);
		ceph_mdsmap_destroy(oldmap);
	} else {
		mdsc->mdsmap = newmap;	/* first mds map */
	}
	mdsc->fsc->max_file_size = min((loff_t)mdsc->mdsmap->m_max_file_size,
				       MAX_LFS_FILESIZE);

	__wake_requests(mdsc, &mdsc->waiting_for_map);
	ceph_monc_got_map(&mdsc->fsc->client->monc, CEPH_SUB_MDSMAP,
			  mdsc->mdsmap->m_epoch);

	mutex_unlock(&mdsc->mutex);
	schedule_delayed(mdsc, 0);
	return;

bad_unlock:
	mutex_unlock(&mdsc->mutex);
bad:
	pr_err_client(cl, "error decoding mdsmap %d. Shutting down mount.\n",
		      err);
	ceph_umount_begin(mdsc->fsc->sb);
	ceph_msg_dump(msg);
}

static struct ceph_connection *mds_get_con(struct ceph_connection *con)
{
	struct ceph_mds_session *s = con->private;

	if (ceph_get_mds_session(s))
		return con;
	return NULL;
}

static void mds_put_con(struct ceph_connection *con)
{
	struct ceph_mds_session *s = con->private;

	ceph_put_mds_session(s);
}
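/*
 * Descriptive note on the get/put callbacks above (added for clarity):
 * the messenger takes a session reference for as long as the
 * connection is in use, so the session that owns the ceph_connection
 * should not be freed out from under a message in flight.
 */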
6332 */ 6333 static struct ceph_auth_handshake * 6334 mds_get_authorizer(struct ceph_connection *con, int *proto, int force_new) 6335 { 6336 struct ceph_mds_session *s = con->private; 6337 struct ceph_mds_client *mdsc = s->s_mdsc; 6338 struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth; 6339 struct ceph_auth_handshake *auth = &s->s_auth; 6340 int ret; 6341 6342 ret = __ceph_auth_get_authorizer(ac, auth, CEPH_ENTITY_TYPE_MDS, 6343 force_new, proto, NULL, NULL); 6344 if (ret) 6345 return ERR_PTR(ret); 6346 6347 return auth; 6348 } 6349 6350 static int mds_add_authorizer_challenge(struct ceph_connection *con, 6351 void *challenge_buf, int challenge_buf_len) 6352 { 6353 struct ceph_mds_session *s = con->private; 6354 struct ceph_mds_client *mdsc = s->s_mdsc; 6355 struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth; 6356 6357 return ceph_auth_add_authorizer_challenge(ac, s->s_auth.authorizer, 6358 challenge_buf, challenge_buf_len); 6359 } 6360 6361 static int mds_verify_authorizer_reply(struct ceph_connection *con) 6362 { 6363 struct ceph_mds_session *s = con->private; 6364 struct ceph_mds_client *mdsc = s->s_mdsc; 6365 struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth; 6366 struct ceph_auth_handshake *auth = &s->s_auth; 6367 6368 return ceph_auth_verify_authorizer_reply(ac, auth->authorizer, 6369 auth->authorizer_reply_buf, auth->authorizer_reply_buf_len, 6370 NULL, NULL, NULL, NULL); 6371 } 6372 6373 static int mds_invalidate_authorizer(struct ceph_connection *con) 6374 { 6375 struct ceph_mds_session *s = con->private; 6376 struct ceph_mds_client *mdsc = s->s_mdsc; 6377 struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth; 6378 6379 ceph_auth_invalidate_authorizer(ac, CEPH_ENTITY_TYPE_MDS); 6380 6381 return ceph_monc_validate_auth(&mdsc->fsc->client->monc); 6382 } 6383 6384 static int mds_get_auth_request(struct ceph_connection *con, 6385 void *buf, int *buf_len, 6386 void **authorizer, int *authorizer_len) 6387 { 6388 struct ceph_mds_session *s = con->private; 6389 struct ceph_auth_client *ac = s->s_mdsc->fsc->client->monc.auth; 6390 struct ceph_auth_handshake *auth = &s->s_auth; 6391 int ret; 6392 6393 ret = ceph_auth_get_authorizer(ac, auth, CEPH_ENTITY_TYPE_MDS, 6394 buf, buf_len); 6395 if (ret) 6396 return ret; 6397 6398 *authorizer = auth->authorizer_buf; 6399 *authorizer_len = auth->authorizer_buf_len; 6400 return 0; 6401 } 6402 6403 static int mds_handle_auth_reply_more(struct ceph_connection *con, 6404 void *reply, int reply_len, 6405 void *buf, int *buf_len, 6406 void **authorizer, int *authorizer_len) 6407 { 6408 struct ceph_mds_session *s = con->private; 6409 struct ceph_auth_client *ac = s->s_mdsc->fsc->client->monc.auth; 6410 struct ceph_auth_handshake *auth = &s->s_auth; 6411 int ret; 6412 6413 ret = ceph_auth_handle_svc_reply_more(ac, auth, reply, reply_len, 6414 buf, buf_len); 6415 if (ret) 6416 return ret; 6417 6418 *authorizer = auth->authorizer_buf; 6419 *authorizer_len = auth->authorizer_buf_len; 6420 return 0; 6421 } 6422 6423 static int mds_handle_auth_done(struct ceph_connection *con, 6424 u64 global_id, void *reply, int reply_len, 6425 u8 *session_key, int *session_key_len, 6426 u8 *con_secret, int *con_secret_len) 6427 { 6428 struct ceph_mds_session *s = con->private; 6429 struct ceph_auth_client *ac = s->s_mdsc->fsc->client->monc.auth; 6430 struct ceph_auth_handshake *auth = &s->s_auth; 6431 6432 return ceph_auth_handle_svc_reply_done(ac, auth, reply, reply_len, 6433 session_key, session_key_len, 6434 con_secret, con_secret_len); 6435 } 

/*
 * authentication
 */

/*
 * Note: returned pointer is the address of a structure that's
 * managed separately.  Caller must *not* attempt to free it.
 */
static struct ceph_auth_handshake *
mds_get_authorizer(struct ceph_connection *con, int *proto, int force_new)
{
	struct ceph_mds_session *s = con->private;
	struct ceph_mds_client *mdsc = s->s_mdsc;
	struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth;
	struct ceph_auth_handshake *auth = &s->s_auth;
	int ret;

	ret = __ceph_auth_get_authorizer(ac, auth, CEPH_ENTITY_TYPE_MDS,
					 force_new, proto, NULL, NULL);
	if (ret)
		return ERR_PTR(ret);

	return auth;
}

static int mds_add_authorizer_challenge(struct ceph_connection *con,
					void *challenge_buf,
					int challenge_buf_len)
{
	struct ceph_mds_session *s = con->private;
	struct ceph_mds_client *mdsc = s->s_mdsc;
	struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth;

	return ceph_auth_add_authorizer_challenge(ac, s->s_auth.authorizer,
						  challenge_buf,
						  challenge_buf_len);
}

static int mds_verify_authorizer_reply(struct ceph_connection *con)
{
	struct ceph_mds_session *s = con->private;
	struct ceph_mds_client *mdsc = s->s_mdsc;
	struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth;
	struct ceph_auth_handshake *auth = &s->s_auth;

	return ceph_auth_verify_authorizer_reply(ac, auth->authorizer,
		auth->authorizer_reply_buf, auth->authorizer_reply_buf_len,
		NULL, NULL, NULL, NULL);
}

static int mds_invalidate_authorizer(struct ceph_connection *con)
{
	struct ceph_mds_session *s = con->private;
	struct ceph_mds_client *mdsc = s->s_mdsc;
	struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth;

	ceph_auth_invalidate_authorizer(ac, CEPH_ENTITY_TYPE_MDS);

	return ceph_monc_validate_auth(&mdsc->fsc->client->monc);
}

static int mds_get_auth_request(struct ceph_connection *con,
				void *buf, int *buf_len,
				void **authorizer, int *authorizer_len)
{
	struct ceph_mds_session *s = con->private;
	struct ceph_auth_client *ac = s->s_mdsc->fsc->client->monc.auth;
	struct ceph_auth_handshake *auth = &s->s_auth;
	int ret;

	ret = ceph_auth_get_authorizer(ac, auth, CEPH_ENTITY_TYPE_MDS,
				       buf, buf_len);
	if (ret)
		return ret;

	*authorizer = auth->authorizer_buf;
	*authorizer_len = auth->authorizer_buf_len;
	return 0;
}

static int mds_handle_auth_reply_more(struct ceph_connection *con,
				      void *reply, int reply_len,
				      void *buf, int *buf_len,
				      void **authorizer, int *authorizer_len)
{
	struct ceph_mds_session *s = con->private;
	struct ceph_auth_client *ac = s->s_mdsc->fsc->client->monc.auth;
	struct ceph_auth_handshake *auth = &s->s_auth;
	int ret;

	ret = ceph_auth_handle_svc_reply_more(ac, auth, reply, reply_len,
					      buf, buf_len);
	if (ret)
		return ret;

	*authorizer = auth->authorizer_buf;
	*authorizer_len = auth->authorizer_buf_len;
	return 0;
}

static int mds_handle_auth_done(struct ceph_connection *con,
				u64 global_id, void *reply, int reply_len,
				u8 *session_key, int *session_key_len,
				u8 *con_secret, int *con_secret_len)
{
	struct ceph_mds_session *s = con->private;
	struct ceph_auth_client *ac = s->s_mdsc->fsc->client->monc.auth;
	struct ceph_auth_handshake *auth = &s->s_auth;

	return ceph_auth_handle_svc_reply_done(ac, auth, reply, reply_len,
					       session_key, session_key_len,
					       con_secret, con_secret_len);
}

static int mds_handle_auth_bad_method(struct ceph_connection *con,
				      int used_proto, int result,
				      const int *allowed_protos, int proto_cnt,
				      const int *allowed_modes, int mode_cnt)
{
	struct ceph_mds_session *s = con->private;
	struct ceph_mon_client *monc = &s->s_mdsc->fsc->client->monc;
	int ret;

	if (ceph_auth_handle_bad_authorizer(monc->auth, CEPH_ENTITY_TYPE_MDS,
					    used_proto, result,
					    allowed_protos, proto_cnt,
					    allowed_modes, mode_cnt)) {
		ret = ceph_monc_validate_auth(monc);
		if (ret)
			return ret;
	}

	return -EACCES;
}

static struct ceph_msg *mds_alloc_msg(struct ceph_connection *con,
				      struct ceph_msg_header *hdr, int *skip)
{
	struct ceph_msg *msg;
	int type = (int)le16_to_cpu(hdr->type);
	int front_len = (int)le32_to_cpu(hdr->front_len);

	if (con->in_msg)
		return con->in_msg;

	*skip = 0;
	msg = ceph_msg_new(type, front_len, GFP_NOFS, false);
	if (!msg) {
		pr_err("unable to allocate msg type %d len %d\n",
		       type, front_len);
		return NULL;
	}

	return msg;
}

static int mds_sign_message(struct ceph_msg *msg)
{
	struct ceph_mds_session *s = msg->con->private;
	struct ceph_auth_handshake *auth = &s->s_auth;

	return ceph_auth_sign_message(auth, msg);
}

static int mds_check_message_signature(struct ceph_msg *msg)
{
	struct ceph_mds_session *s = msg->con->private;
	struct ceph_auth_handshake *auth = &s->s_auth;

	return ceph_auth_check_message_signature(auth, msg);
}

static const struct ceph_connection_operations mds_con_ops = {
	.get = mds_get_con,
	.put = mds_put_con,
	.alloc_msg = mds_alloc_msg,
	.dispatch = mds_dispatch,
	.peer_reset = mds_peer_reset,
	.get_authorizer = mds_get_authorizer,
	.add_authorizer_challenge = mds_add_authorizer_challenge,
	.verify_authorizer_reply = mds_verify_authorizer_reply,
	.invalidate_authorizer = mds_invalidate_authorizer,
	.sign_message = mds_sign_message,
	.check_message_signature = mds_check_message_signature,
	.get_auth_request = mds_get_auth_request,
	.handle_auth_reply_more = mds_handle_auth_reply_more,
	.handle_auth_done = mds_handle_auth_done,
	.handle_auth_bad_method = mds_handle_auth_bad_method,
};

/* eof */