// SPDX-License-Identifier: GPL-2.0
#include <linux/ceph/ceph_debug.h>

#include <linux/fs.h>
#include <linux/wait.h>
#include <linux/slab.h>
#include <linux/gfp.h>
#include <linux/sched.h>
#include <linux/debugfs.h>
#include <linux/seq_file.h>
#include <linux/ratelimit.h>
#include <linux/bits.h>
#include <linux/ktime.h>
#include <linux/bitmap.h>
#include <linux/mnt_idmapping.h>

#include "super.h"
#include "mds_client.h"
#include "crypto.h"

#include <linux/ceph/ceph_features.h>
#include <linux/ceph/messenger.h>
#include <linux/ceph/decode.h>
#include <linux/ceph/pagelist.h>
#include <linux/ceph/auth.h>
#include <linux/ceph/debugfs.h>

#define RECONNECT_MAX_SIZE (INT_MAX - PAGE_SIZE)

/*
 * A cluster of MDS (metadata server) daemons is responsible for
 * managing the file system namespace (the directory hierarchy and
 * inodes) and for coordinating shared access to storage.  Metadata is
 * partitioned hierarchically across a number of servers, and that
 * partitioning varies over time as the cluster adjusts the distribution
 * in order to balance load.
 *
 * The MDS client is primarily responsible for managing synchronous
 * metadata requests for operations like open, unlink, and so forth.
 * If there is an MDS failure, we find out about it when we (possibly
 * request and) receive a new MDS map, and can resubmit affected
 * requests.
 *
 * For the most part, though, we take advantage of a lossless
 * communications channel to the MDS, and do not need to worry about
 * timing out or resubmitting requests.
 *
 * We maintain a stateful "session" with each MDS we interact with.
 * Within each session, we send periodic heartbeat messages to ensure
 * any capabilities or leases we have been issued remain valid.  If
 * the session times out and goes stale, our leases and capabilities
 * are no longer valid.
 */

struct ceph_reconnect_state {
	struct ceph_mds_session *session;
	int nr_caps, nr_realms;
	struct ceph_pagelist *pagelist;
	unsigned msg_version;
	bool allow_multi;
};

static void __wake_requests(struct ceph_mds_client *mdsc,
			    struct list_head *head);
static void ceph_cap_release_work(struct work_struct *work);
static void ceph_cap_reclaim_work(struct work_struct *work);

static const struct ceph_connection_operations mds_con_ops;


/*
 * mds reply parsing
 */

static int parse_reply_info_quota(void **p, void *end,
				  struct ceph_mds_reply_info_in *info)
{
	u8 struct_v, struct_compat;
	u32 struct_len;

	ceph_decode_8_safe(p, end, struct_v, bad);
	ceph_decode_8_safe(p, end, struct_compat, bad);
	/* struct_v is expected to be >= 1. we only
	 * understand encodings with struct_compat == 1. */
	if (!struct_v || struct_compat != 1)
		goto bad;
	ceph_decode_32_safe(p, end, struct_len, bad);
	ceph_decode_need(p, end, struct_len, bad);
	end = *p + struct_len;
	ceph_decode_64_safe(p, end, info->max_bytes, bad);
	ceph_decode_64_safe(p, end, info->max_files, bad);
	*p = end;
	return 0;
bad:
	return -EIO;
}
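/*
 * Illustration (a minimal sketch, not wired up anywhere; the "example"
 * names are hypothetical): most new-style reply sub-structures reuse
 * the envelope decoded above -- a u8 version, a u8 compat version and
 * a u32 payload length.  Clamping 'end' to the declared payload is
 * what lets fields appended by newer MDSes be skipped safely:
 */
static int __maybe_unused parse_example_tlv(void **p, void *end, u64 *value)
{
	u8 struct_v, struct_compat;
	u32 struct_len;

	ceph_decode_8_safe(p, end, struct_v, bad);
	ceph_decode_8_safe(p, end, struct_compat, bad);
	if (!struct_v || struct_compat != 1)
		goto bad;
	ceph_decode_32_safe(p, end, struct_len, bad);
	ceph_decode_need(p, end, struct_len, bad);
	end = *p + struct_len;		/* bound decoding to this payload */
	ceph_decode_64_safe(p, end, *value, bad);
	*p = end;			/* skip any unrecognized fields */
	return 0;
bad:
	return -EIO;
}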
/*
 * parse individual inode info
 */
static int parse_reply_info_in(void **p, void *end,
			       struct ceph_mds_reply_info_in *info,
			       u64 features)
{
	int err = 0;
	u8 struct_v = 0;

	if (features == (u64)-1) {
		u32 struct_len;
		u8 struct_compat;
		ceph_decode_8_safe(p, end, struct_v, bad);
		ceph_decode_8_safe(p, end, struct_compat, bad);
		/* struct_v is expected to be >= 1. we only understand
		 * encodings with struct_compat == 1. */
		if (!struct_v || struct_compat != 1)
			goto bad;
		ceph_decode_32_safe(p, end, struct_len, bad);
		ceph_decode_need(p, end, struct_len, bad);
		end = *p + struct_len;
	}

	ceph_decode_need(p, end, sizeof(struct ceph_mds_reply_inode), bad);
	info->in = *p;
	*p += sizeof(struct ceph_mds_reply_inode) +
		sizeof(*info->in->fragtree.splits) *
		le32_to_cpu(info->in->fragtree.nsplits);

	ceph_decode_32_safe(p, end, info->symlink_len, bad);
	ceph_decode_need(p, end, info->symlink_len, bad);
	info->symlink = *p;
	*p += info->symlink_len;

	ceph_decode_copy_safe(p, end, &info->dir_layout,
			      sizeof(info->dir_layout), bad);
	ceph_decode_32_safe(p, end, info->xattr_len, bad);
	ceph_decode_need(p, end, info->xattr_len, bad);
	info->xattr_data = *p;
	*p += info->xattr_len;

	if (features == (u64)-1) {
		/* inline data */
		ceph_decode_64_safe(p, end, info->inline_version, bad);
		ceph_decode_32_safe(p, end, info->inline_len, bad);
		ceph_decode_need(p, end, info->inline_len, bad);
		info->inline_data = *p;
		*p += info->inline_len;
		/* quota */
		err = parse_reply_info_quota(p, end, info);
		if (err < 0)
			goto out_bad;
		/* pool namespace */
		ceph_decode_32_safe(p, end, info->pool_ns_len, bad);
		if (info->pool_ns_len > 0) {
			ceph_decode_need(p, end, info->pool_ns_len, bad);
			info->pool_ns_data = *p;
			*p += info->pool_ns_len;
		}

		/* btime */
		ceph_decode_need(p, end, sizeof(info->btime), bad);
		ceph_decode_copy(p, &info->btime, sizeof(info->btime));

		/* change attribute */
		ceph_decode_64_safe(p, end, info->change_attr, bad);

		/* dir pin */
		if (struct_v >= 2) {
			ceph_decode_32_safe(p, end, info->dir_pin, bad);
		} else {
			info->dir_pin = -ENODATA;
		}

		/* snapshot birth time, remains zero for v<=2 */
		if (struct_v >= 3) {
			ceph_decode_need(p, end, sizeof(info->snap_btime), bad);
			ceph_decode_copy(p, &info->snap_btime,
					 sizeof(info->snap_btime));
		} else {
			memset(&info->snap_btime, 0, sizeof(info->snap_btime));
		}

		/* snapshot count, remains zero for v<=3 */
		if (struct_v >= 4) {
			ceph_decode_64_safe(p, end, info->rsnaps, bad);
		} else {
			info->rsnaps = 0;
		}

		if (struct_v >= 5) {
			u32 alen;

			ceph_decode_32_safe(p, end, alen, bad);

			while (alen--) {
				u32 len;

				/* key */
				ceph_decode_32_safe(p, end, len, bad);
				ceph_decode_skip_n(p, end, len, bad);
				/* value */
				ceph_decode_32_safe(p, end, len, bad);
				ceph_decode_skip_n(p, end, len, bad);
			}
		}

		/* fscrypt flag -- ignore */
		if (struct_v >= 6)
			ceph_decode_skip_8(p, end, bad);
		info->fscrypt_auth = NULL;
		info->fscrypt_auth_len = 0;
		info->fscrypt_file = NULL;
		info->fscrypt_file_len = 0;
		if (struct_v >= 7) {
			ceph_decode_32_safe(p, end, info->fscrypt_auth_len, bad);
			if (info->fscrypt_auth_len) {
				info->fscrypt_auth = kmalloc(info->fscrypt_auth_len,
							     GFP_KERNEL);
				if (!info->fscrypt_auth)
					return -ENOMEM;
				ceph_decode_copy_safe(p, end, info->fscrypt_auth,
						      info->fscrypt_auth_len, bad);
			}
			ceph_decode_32_safe(p, end, info->fscrypt_file_len, bad);
			if (info->fscrypt_file_len) {
				info->fscrypt_file = kmalloc(info->fscrypt_file_len,
							     GFP_KERNEL);
				if (!info->fscrypt_file)
					return -ENOMEM;
				ceph_decode_copy_safe(p, end, info->fscrypt_file,
						      info->fscrypt_file_len, bad);
			}
		}
		*p = end;
	} else {
		/* legacy (unversioned) struct */
		if (features & CEPH_FEATURE_MDS_INLINE_DATA) {
			ceph_decode_64_safe(p, end, info->inline_version, bad);
			ceph_decode_32_safe(p, end, info->inline_len, bad);
			ceph_decode_need(p, end, info->inline_len, bad);
			info->inline_data = *p;
			*p += info->inline_len;
		} else
			info->inline_version = CEPH_INLINE_NONE;

		if (features & CEPH_FEATURE_MDS_QUOTA) {
			err = parse_reply_info_quota(p, end, info);
			if (err < 0)
				goto out_bad;
		} else {
			info->max_bytes = 0;
			info->max_files = 0;
		}

		info->pool_ns_len = 0;
		info->pool_ns_data = NULL;
		if (features & CEPH_FEATURE_FS_FILE_LAYOUT_V2) {
			ceph_decode_32_safe(p, end, info->pool_ns_len, bad);
			if (info->pool_ns_len > 0) {
				ceph_decode_need(p, end, info->pool_ns_len, bad);
				info->pool_ns_data = *p;
				*p += info->pool_ns_len;
			}
		}

		if (features & CEPH_FEATURE_FS_BTIME) {
			ceph_decode_need(p, end, sizeof(info->btime), bad);
			ceph_decode_copy(p, &info->btime, sizeof(info->btime));
			ceph_decode_64_safe(p, end, info->change_attr, bad);
		}

		info->dir_pin = -ENODATA;
		/* info->snap_btime and info->rsnaps remain zero */
	}
	return 0;
bad:
	err = -EIO;
out_bad:
	return err;
}

static int parse_reply_info_dir(void **p, void *end,
				struct ceph_mds_reply_dirfrag **dirfrag,
				u64 features)
{
	if (features == (u64)-1) {
		u8 struct_v, struct_compat;
		u32 struct_len;
		ceph_decode_8_safe(p, end, struct_v, bad);
		ceph_decode_8_safe(p, end, struct_compat, bad);
		/* struct_v is expected to be >= 1. we only understand
		 * encodings with struct_compat == 1. */
		if (!struct_v || struct_compat != 1)
			goto bad;
		ceph_decode_32_safe(p, end, struct_len, bad);
		ceph_decode_need(p, end, struct_len, bad);
		end = *p + struct_len;
	}

	ceph_decode_need(p, end, sizeof(**dirfrag), bad);
	*dirfrag = *p;
	*p += sizeof(**dirfrag) + sizeof(u32) * le32_to_cpu((*dirfrag)->ndist);
	if (unlikely(*p > end))
		goto bad;
	if (features == (u64)-1)
		*p = end;
	return 0;
bad:
	return -EIO;
}
static int parse_reply_info_lease(void **p, void *end,
				  struct ceph_mds_reply_lease **lease,
				  u64 features, u32 *altname_len, u8 **altname)
{
	u8 struct_v;
	u32 struct_len;
	void *lend;

	if (features == (u64)-1) {
		u8 struct_compat;

		ceph_decode_8_safe(p, end, struct_v, bad);
		ceph_decode_8_safe(p, end, struct_compat, bad);

		/* struct_v is expected to be >= 1. we only understand
		 * encodings with struct_compat == 1. */
		if (!struct_v || struct_compat != 1)
			goto bad;

		ceph_decode_32_safe(p, end, struct_len, bad);
	} else {
		struct_len = sizeof(**lease);
		*altname_len = 0;
		*altname = NULL;
	}

	lend = *p + struct_len;
	ceph_decode_need(p, end, struct_len, bad);
	*lease = *p;
	*p += sizeof(**lease);

	if (features == (u64)-1) {
		if (struct_v >= 2) {
			ceph_decode_32_safe(p, end, *altname_len, bad);
			ceph_decode_need(p, end, *altname_len, bad);
			*altname = *p;
			*p += *altname_len;
		} else {
			*altname = NULL;
			*altname_len = 0;
		}
	}
	*p = lend;
	return 0;
bad:
	return -EIO;
}

/*
 * parse a normal reply, which may contain a (dir+)dentry and/or a
 * target inode.
 */
static int parse_reply_info_trace(void **p, void *end,
				  struct ceph_mds_reply_info_parsed *info,
				  u64 features)
{
	int err;

	if (info->head->is_dentry) {
		err = parse_reply_info_in(p, end, &info->diri, features);
		if (err < 0)
			goto out_bad;

		err = parse_reply_info_dir(p, end, &info->dirfrag, features);
		if (err < 0)
			goto out_bad;

		ceph_decode_32_safe(p, end, info->dname_len, bad);
		ceph_decode_need(p, end, info->dname_len, bad);
		info->dname = *p;
		*p += info->dname_len;

		err = parse_reply_info_lease(p, end, &info->dlease, features,
					     &info->altname_len, &info->altname);
		if (err < 0)
			goto out_bad;
	}

	if (info->head->is_target) {
		err = parse_reply_info_in(p, end, &info->targeti, features);
		if (err < 0)
			goto out_bad;
	}

	if (unlikely(*p != end))
		goto bad;
	return 0;

bad:
	err = -EIO;
out_bad:
	pr_err("problem parsing mds trace %d\n", err);
	return err;
}
/*
 * parse readdir results
 */
static int parse_reply_info_readdir(void **p, void *end,
				    struct ceph_mds_request *req,
				    u64 features)
{
	struct ceph_mds_reply_info_parsed *info = &req->r_reply_info;
	struct ceph_client *cl = req->r_mdsc->fsc->client;
	u32 num, i = 0;
	int err;

	err = parse_reply_info_dir(p, end, &info->dir_dir, features);
	if (err < 0)
		goto out_bad;

	ceph_decode_need(p, end, sizeof(num) + 2, bad);
	num = ceph_decode_32(p);
	{
		u16 flags = ceph_decode_16(p);
		info->dir_end = !!(flags & CEPH_READDIR_FRAG_END);
		info->dir_complete = !!(flags & CEPH_READDIR_FRAG_COMPLETE);
		info->hash_order = !!(flags & CEPH_READDIR_HASH_ORDER);
		info->offset_hash = !!(flags & CEPH_READDIR_OFFSET_HASH);
	}
	if (num == 0)
		goto done;

	BUG_ON(!info->dir_entries);
	if ((unsigned long)(info->dir_entries + num) >
	    (unsigned long)info->dir_entries + info->dir_buf_size) {
		pr_err_client(cl, "dir contents are larger than expected\n");
		WARN_ON(1);
		goto bad;
	}

	info->dir_nr = num;
	while (num) {
		struct inode *inode = d_inode(req->r_dentry);
		struct ceph_inode_info *ci = ceph_inode(inode);
		struct ceph_mds_reply_dir_entry *rde = info->dir_entries + i;
		struct fscrypt_str tname = FSTR_INIT(NULL, 0);
		struct fscrypt_str oname = FSTR_INIT(NULL, 0);
		struct ceph_fname fname;
		u32 altname_len, _name_len;
		u8 *altname, *_name;

		/* dentry */
		ceph_decode_32_safe(p, end, _name_len, bad);
		ceph_decode_need(p, end, _name_len, bad);
		_name = *p;
		*p += _name_len;
		doutc(cl, "parsed dir dname '%.*s'\n", _name_len, _name);

		if (info->hash_order)
			rde->raw_hash = ceph_str_hash(ci->i_dir_layout.dl_dir_hash,
						      _name, _name_len);

		/* dentry lease */
		err = parse_reply_info_lease(p, end, &rde->lease, features,
					     &altname_len, &altname);
		if (err)
			goto out_bad;

		/*
		 * Try to decrypt the dentry names and update them
		 * in the ceph_mds_reply_dir_entry struct.
		 */
		fname.dir = inode;
		fname.name = _name;
		fname.name_len = _name_len;
		fname.ctext = altname;
		fname.ctext_len = altname_len;
		/*
		 * _name_len may be larger than altname_len, such as
		 * when the human-readable name length is in the range of
		 * (CEPH_NOHASH_NAME_MAX, CEPH_NOHASH_NAME_MAX + SHA256_DIGEST_SIZE);
		 * the copy in ceph_fname_to_usr would then corrupt the
		 * data if there is no encryption key.
		 *
		 * Just set the no_copy flag; then, if there is no
		 * encryption key, oname.name will always be assigned
		 * to _name.
		 */
		fname.no_copy = true;
		if (altname_len == 0) {
			/*
			 * Set tname to _name, and this will be used
			 * to do the base64_decode in-place.  It's
			 * safe because the decoded string is always
			 * shorter, at most 3/4 the length of the
			 * original string.
			 */
			tname.name = _name;

			/*
			 * Set oname to _name too, and this will be
			 * used to do the decryption in-place.
			 */
			oname.name = _name;
			oname.len = _name_len;
		} else {
			/*
			 * This will do the decryption only in-place
			 * from the altname ciphertext directly.
			 */
			oname.name = altname;
			oname.len = altname_len;
		}
		rde->is_nokey = false;
		err = ceph_fname_to_usr(&fname, &tname, &oname, &rde->is_nokey);
		if (err) {
			pr_err_client(cl, "unable to decode %.*s, got %d\n",
				      _name_len, _name, err);
			goto out_bad;
		}
		rde->name = oname.name;
		rde->name_len = oname.len;

		/* inode */
		err = parse_reply_info_in(p, end, &rde->inode, features);
		if (err < 0)
			goto out_bad;
		/* ceph_readdir_prepopulate() will update it */
		rde->offset = 0;
		i++;
		num--;
	}

done:
	/* Skip over any unrecognized fields */
	*p = end;
	return 0;

bad:
	err = -EIO;
out_bad:
	pr_err_client(cl, "problem parsing dir contents %d\n", err);
	return err;
}
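/*
 * Illustration (a minimal sketch, not used anywhere): the in-place
 * tricks above are safe because base64url decoding never grows its
 * input -- every 4 encoded bytes yield at most 3 decoded bytes -- so
 * decoding into the encoded buffer cannot overrun it.  A worst-case
 * bound on the decoded length looks like:
 */
static inline u32 __maybe_unused example_b64_decoded_max(u32 encoded_len)
{
	/* 4 -> 3 shrink, rounded up for a partial final group */
	return DIV_ROUND_UP(encoded_len * 3, 4);
}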
/*
 * parse fcntl F_GETLK results
 */
static int parse_reply_info_filelock(void **p, void *end,
				     struct ceph_mds_reply_info_parsed *info,
				     u64 features)
{
	if (*p + sizeof(*info->filelock_reply) > end)
		goto bad;

	info->filelock_reply = *p;

	/* Skip over any unrecognized fields */
	*p = end;
	return 0;
bad:
	return -EIO;
}


#if BITS_PER_LONG == 64

#define DELEGATED_INO_AVAILABLE xa_mk_value(1)

static int ceph_parse_deleg_inos(void **p, void *end,
				 struct ceph_mds_session *s)
{
	struct ceph_client *cl = s->s_mdsc->fsc->client;
	u32 sets;

	ceph_decode_32_safe(p, end, sets, bad);
	doutc(cl, "got %u sets of delegated inodes\n", sets);
	while (sets--) {
		u64 start, len;

		ceph_decode_64_safe(p, end, start, bad);
		ceph_decode_64_safe(p, end, len, bad);

		/* Don't accept a delegation of system inodes */
		if (start < CEPH_INO_SYSTEM_BASE) {
			pr_warn_ratelimited_client(cl,
				"ignoring reserved inode range delegation (start=0x%llx len=0x%llx)\n",
				start, len);
			continue;
		}
		while (len--) {
			int err = xa_insert(&s->s_delegated_inos, start++,
					    DELEGATED_INO_AVAILABLE,
					    GFP_KERNEL);
			if (!err) {
				doutc(cl, "added delegated inode 0x%llx\n",
				      start - 1);
			} else if (err == -EBUSY) {
				pr_warn_client(cl,
					"MDS delegated inode 0x%llx more than once.\n",
					start - 1);
			} else {
				return err;
			}
		}
	}
	return 0;
bad:
	return -EIO;
}

u64 ceph_get_deleg_ino(struct ceph_mds_session *s)
{
	unsigned long ino;
	void *val;

	xa_for_each(&s->s_delegated_inos, ino, val) {
		val = xa_erase(&s->s_delegated_inos, ino);
		if (val == DELEGATED_INO_AVAILABLE)
			return ino;
	}
	return 0;
}

int ceph_restore_deleg_ino(struct ceph_mds_session *s, u64 ino)
{
	return xa_insert(&s->s_delegated_inos, ino, DELEGATED_INO_AVAILABLE,
			 GFP_KERNEL);
}
#else /* BITS_PER_LONG == 64 */
/*
 * FIXME: xarrays can't handle 64-bit indexes on a 32-bit arch.  For now, just
 * ignore delegated_inos on 32 bit arch.  Maybe eventually add xarrays for top
 * and bottom words?
 */
static int ceph_parse_deleg_inos(void **p, void *end,
				 struct ceph_mds_session *s)
{
	u32 sets;

	ceph_decode_32_safe(p, end, sets, bad);
	if (sets)
		ceph_decode_skip_n(p, end, sets * 2 * sizeof(__le64), bad);
	return 0;
bad:
	return -EIO;
}

u64 ceph_get_deleg_ino(struct ceph_mds_session *s)
{
	return 0;
}

int ceph_restore_deleg_ino(struct ceph_mds_session *s, u64 ino)
{
	return 0;
}
#endif /* BITS_PER_LONG == 64 */
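/*
 * Illustration (hypothetical caller, not wired up anywhere): an async
 * create consumes one delegated inode number and, if the request must
 * instead be retried synchronously, returns it to the set:
 */
static void __maybe_unused example_use_deleg_ino(struct ceph_mds_session *s)
{
	bool create_failed = false;	/* outcome of the async create */
	u64 ino = ceph_get_deleg_ino(s);

	if (!ino)
		return;			/* no delegation; create normally */

	/* ... submit the async create using 'ino' ... */

	if (create_failed)
		WARN_ON(ceph_restore_deleg_ino(s, ino));
}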
/*
 * parse create results
 */
static int parse_reply_info_create(void **p, void *end,
				   struct ceph_mds_reply_info_parsed *info,
				   u64 features, struct ceph_mds_session *s)
{
	int ret;

	if (features == (u64)-1 ||
	    (features & CEPH_FEATURE_REPLY_CREATE_INODE)) {
		if (*p == end) {
			/* Malformed reply? */
			info->has_create_ino = false;
		} else if (test_bit(CEPHFS_FEATURE_DELEG_INO, &s->s_features)) {
			info->has_create_ino = true;
			/* struct_v, struct_compat, and len */
			ceph_decode_skip_n(p, end, 2 + sizeof(u32), bad);
			ceph_decode_64_safe(p, end, info->ino, bad);
			ret = ceph_parse_deleg_inos(p, end, s);
			if (ret)
				return ret;
		} else {
			/* legacy */
			ceph_decode_64_safe(p, end, info->ino, bad);
			info->has_create_ino = true;
		}
	} else {
		if (*p != end)
			goto bad;
	}

	/* Skip over any unrecognized fields */
	*p = end;
	return 0;
bad:
	return -EIO;
}

static int parse_reply_info_getvxattr(void **p, void *end,
				      struct ceph_mds_reply_info_parsed *info,
				      u64 features)
{
	u32 value_len;

	ceph_decode_skip_8(p, end, bad); /* skip current version: 1 */
	ceph_decode_skip_8(p, end, bad); /* skip first version: 1 */
	ceph_decode_skip_32(p, end, bad); /* skip payload length */

	ceph_decode_32_safe(p, end, value_len, bad);

	if (value_len == end - *p) {
		info->xattr_info.xattr_value = *p;
		info->xattr_info.xattr_value_len = value_len;
		*p = end;
		return value_len;
	}
bad:
	return -EIO;
}

/*
 * parse extra results
 */
static int parse_reply_info_extra(void **p, void *end,
				  struct ceph_mds_request *req,
				  u64 features, struct ceph_mds_session *s)
{
	struct ceph_mds_reply_info_parsed *info = &req->r_reply_info;
	u32 op = le32_to_cpu(info->head->op);

	if (op == CEPH_MDS_OP_GETFILELOCK)
		return parse_reply_info_filelock(p, end, info, features);
	else if (op == CEPH_MDS_OP_READDIR || op == CEPH_MDS_OP_LSSNAP)
		return parse_reply_info_readdir(p, end, req, features);
	else if (op == CEPH_MDS_OP_CREATE)
		return parse_reply_info_create(p, end, info, features, s);
	else if (op == CEPH_MDS_OP_GETVXATTR)
		return parse_reply_info_getvxattr(p, end, info, features);
	else
		return -EIO;
}

/*
 * parse entire mds reply
 */
static int parse_reply_info(struct ceph_mds_session *s, struct ceph_msg *msg,
			    struct ceph_mds_request *req, u64 features)
{
	struct ceph_mds_reply_info_parsed *info = &req->r_reply_info;
	struct ceph_client *cl = s->s_mdsc->fsc->client;
	void *p, *end;
	u32 len;
	int err;

	info->head = msg->front.iov_base;
	p = msg->front.iov_base + sizeof(struct ceph_mds_reply_head);
	end = p + msg->front.iov_len - sizeof(struct ceph_mds_reply_head);

	/* trace */
	ceph_decode_32_safe(&p, end, len, bad);
	if (len > 0) {
		ceph_decode_need(&p, end, len, bad);
		err = parse_reply_info_trace(&p, p+len, info, features);
		if (err < 0)
			goto out_bad;
	}

	/* extra */
	ceph_decode_32_safe(&p, end, len, bad);
	if (len > 0) {
		ceph_decode_need(&p, end, len, bad);
		err = parse_reply_info_extra(&p, p+len, req, features, s);
		if (err < 0)
			goto out_bad;
	}

	/* snap blob */
	ceph_decode_32_safe(&p, end, len, bad);
	info->snapblob_len = len;
	info->snapblob = p;
	p += len;

	if (p != end)
		goto bad;
	return 0;

bad:
	err = -EIO;
out_bad:
	pr_err_client(cl, "mds parse_reply err %d\n", err);
	ceph_msg_dump(msg);
	return err;
}
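/*
 * For reference, the reply payload walked above is three length-prefixed
 * blocks laid end to end after the fixed-size reply head:
 *
 *	struct ceph_mds_reply_head
 *	u32 trace_len;    trace bytes ((dir+)dentry and/or target inode)
 *	u32 extra_len;    extra bytes (op-specific: readdir, create, ...)
 *	u32 snapblob_len; snap blob bytes
 */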
static void destroy_reply_info(struct ceph_mds_reply_info_parsed *info)
{
	int i;

	kfree(info->diri.fscrypt_auth);
	kfree(info->diri.fscrypt_file);
	kfree(info->targeti.fscrypt_auth);
	kfree(info->targeti.fscrypt_file);
	if (!info->dir_entries)
		return;

	for (i = 0; i < info->dir_nr; i++) {
		struct ceph_mds_reply_dir_entry *rde = info->dir_entries + i;

		kfree(rde->inode.fscrypt_auth);
		kfree(rde->inode.fscrypt_file);
	}
	free_pages((unsigned long)info->dir_entries, get_order(info->dir_buf_size));
}

/*
 * In the async unlink case the kclient won't wait for the first reply
 * from the MDS; it just drops all the links, unhashes the dentry and
 * then succeeds immediately.
 *
 * For any new create/link/rename etc. requests that follow and use the
 * same file names, we must wait for the first reply of the inflight
 * unlink request, or the MDS may fail these subsequent requests with
 * -EEXIST if the inflight async unlink request was delayed for some
 * reason.
 *
 * The worst case is that a non-async openc request will successfully
 * open the file if the CDentry hasn't been unlinked yet, but later the
 * previously delayed async unlink request will remove the CDentry.
 * That means the just-created file may be deleted later by accident.
 *
 * We need to wait for the inflight async unlink requests to finish
 * when creating new files/directories using the same file names.
 */
int ceph_wait_on_conflict_unlink(struct dentry *dentry)
{
	struct ceph_fs_client *fsc = ceph_sb_to_fs_client(dentry->d_sb);
	struct ceph_client *cl = fsc->client;
	struct dentry *pdentry = dentry->d_parent;
	struct dentry *udentry, *found = NULL;
	struct ceph_dentry_info *di;
	struct qstr dname;
	u32 hash = dentry->d_name.hash;
	int err;

	dname.name = dentry->d_name.name;
	dname.len = dentry->d_name.len;

	rcu_read_lock();
	hash_for_each_possible_rcu(fsc->async_unlink_conflict, di,
				   hnode, hash) {
		udentry = di->dentry;

		spin_lock(&udentry->d_lock);
		if (udentry->d_name.hash != hash)
			goto next;
		if (unlikely(udentry->d_parent != pdentry))
			goto next;
		if (!hash_hashed(&di->hnode))
			goto next;

		if (!test_bit(CEPH_DENTRY_ASYNC_UNLINK_BIT, &di->flags))
			pr_warn_client(cl, "dentry %p:%pd async unlink bit is not set\n",
				       dentry, dentry);

		if (!d_same_name(udentry, pdentry, &dname))
			goto next;

		found = dget_dlock(udentry);
		spin_unlock(&udentry->d_lock);
		break;
next:
		spin_unlock(&udentry->d_lock);
	}
	rcu_read_unlock();

	if (likely(!found))
		return 0;

	doutc(cl, "dentry %p:%pd conflict with old %p:%pd\n", dentry, dentry,
	      found, found);

	err = wait_on_bit(&di->flags, CEPH_DENTRY_ASYNC_UNLINK_BIT,
			  TASK_KILLABLE);
	dput(found);
	return err;
}
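/*
 * Illustration (hypothetical caller, a sketch only): create-like
 * operations are expected to call the above before submitting their
 * request, e.g.:
 *
 *	err = ceph_wait_on_conflict_unlink(dentry);
 *	if (err)
 *		return err;	(fatal signal while waiting)
 *	(... build and submit the create/link/rename request ...)
 */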

/*
 * sessions
 */
const char *ceph_session_state_name(int s)
{
	switch (s) {
	case CEPH_MDS_SESSION_NEW: return "new";
	case CEPH_MDS_SESSION_OPENING: return "opening";
	case CEPH_MDS_SESSION_OPEN: return "open";
	case CEPH_MDS_SESSION_HUNG: return "hung";
	case CEPH_MDS_SESSION_CLOSING: return "closing";
	case CEPH_MDS_SESSION_CLOSED: return "closed";
	case CEPH_MDS_SESSION_RESTARTING: return "restarting";
	case CEPH_MDS_SESSION_RECONNECTING: return "reconnecting";
	case CEPH_MDS_SESSION_REJECTED: return "rejected";
	default: return "???";
	}
}

struct ceph_mds_session *ceph_get_mds_session(struct ceph_mds_session *s)
{
	if (refcount_inc_not_zero(&s->s_ref))
		return s;
	return NULL;
}

void ceph_put_mds_session(struct ceph_mds_session *s)
{
	if (IS_ERR_OR_NULL(s))
		return;

	if (refcount_dec_and_test(&s->s_ref)) {
		if (s->s_auth.authorizer)
			ceph_auth_destroy_authorizer(s->s_auth.authorizer);
		WARN_ON(mutex_is_locked(&s->s_mutex));
		xa_destroy(&s->s_delegated_inos);
		kfree(s);
	}
}

/*
 * called under mdsc->mutex
 */
struct ceph_mds_session *__ceph_lookup_mds_session(struct ceph_mds_client *mdsc,
						   int mds)
{
	if (mds >= mdsc->max_sessions || !mdsc->sessions[mds])
		return NULL;
	return ceph_get_mds_session(mdsc->sessions[mds]);
}

static bool __have_session(struct ceph_mds_client *mdsc, int mds)
{
	if (mds >= mdsc->max_sessions || !mdsc->sessions[mds])
		return false;
	else
		return true;
}

static int __verify_registered_session(struct ceph_mds_client *mdsc,
				       struct ceph_mds_session *s)
{
	if (s->s_mds >= mdsc->max_sessions ||
	    mdsc->sessions[s->s_mds] != s)
		return -ENOENT;
	return 0;
}

/*
 * create+register a new session for given mds.
 * called under mdsc->mutex.
 */
static struct ceph_mds_session *register_session(struct ceph_mds_client *mdsc,
						 int mds)
{
	struct ceph_client *cl = mdsc->fsc->client;
	struct ceph_mds_session *s;

	if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_FENCE_IO)
		return ERR_PTR(-EIO);

	if (mds >= mdsc->mdsmap->possible_max_rank)
		return ERR_PTR(-EINVAL);

	s = kzalloc(sizeof(*s), GFP_NOFS);
	if (!s)
		return ERR_PTR(-ENOMEM);

	if (mds >= mdsc->max_sessions) {
		int newmax = 1 << get_count_order(mds + 1);
		struct ceph_mds_session **sa;

		doutc(cl, "realloc to %d\n", newmax);
		sa = kcalloc(newmax, sizeof(void *), GFP_NOFS);
		if (!sa)
			goto fail_realloc;
		if (mdsc->sessions) {
			memcpy(sa, mdsc->sessions,
			       mdsc->max_sessions * sizeof(void *));
			kfree(mdsc->sessions);
		}
		mdsc->sessions = sa;
		mdsc->max_sessions = newmax;
	}

	doutc(cl, "mds%d\n", mds);
	s->s_mdsc = mdsc;
	s->s_mds = mds;
	s->s_state = CEPH_MDS_SESSION_NEW;
	mutex_init(&s->s_mutex);

	ceph_con_init(&s->s_con, s, &mds_con_ops, &mdsc->fsc->client->msgr);

	atomic_set(&s->s_cap_gen, 1);
	s->s_cap_ttl = jiffies - 1;

	spin_lock_init(&s->s_cap_lock);
	INIT_LIST_HEAD(&s->s_caps);
	refcount_set(&s->s_ref, 1);
	INIT_LIST_HEAD(&s->s_waiting);
	INIT_LIST_HEAD(&s->s_unsafe);
	xa_init(&s->s_delegated_inos);
	INIT_LIST_HEAD(&s->s_cap_releases);
	INIT_WORK(&s->s_cap_release_work, ceph_cap_release_work);

	INIT_LIST_HEAD(&s->s_cap_dirty);
	INIT_LIST_HEAD(&s->s_cap_flushing);

	mdsc->sessions[mds] = s;
	atomic_inc(&mdsc->num_sessions);
	refcount_inc(&s->s_ref);  /* one ref to sessions[], one to caller */

	ceph_con_open(&s->s_con, CEPH_ENTITY_TYPE_MDS, mds,
		      ceph_mdsmap_get_addr(mdsc->mdsmap, mds));

	return s;

fail_realloc:
	kfree(s);
	return ERR_PTR(-ENOMEM);
}
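/*
 * Worked example for the sizing above: for mds == 9, get_count_order(10)
 * is 4, so the sessions array is reallocated to 1 << 4 == 16 slots --
 * i.e. it always grows to the next power of two that can hold the new
 * rank.
 */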
/*
 * called under mdsc->mutex
 */
static void __unregister_session(struct ceph_mds_client *mdsc,
				 struct ceph_mds_session *s)
{
	doutc(mdsc->fsc->client, "mds%d %p\n", s->s_mds, s);
	BUG_ON(mdsc->sessions[s->s_mds] != s);
	mdsc->sessions[s->s_mds] = NULL;
	ceph_con_close(&s->s_con);
	ceph_put_mds_session(s);
	atomic_dec(&mdsc->num_sessions);
}

/*
 * drop session refs in request.
 *
 * should be last request ref, or hold mdsc->mutex
 */
static void put_request_session(struct ceph_mds_request *req)
{
	if (req->r_session) {
		ceph_put_mds_session(req->r_session);
		req->r_session = NULL;
	}
}

void ceph_mdsc_iterate_sessions(struct ceph_mds_client *mdsc,
				void (*cb)(struct ceph_mds_session *),
				bool check_state)
{
	int mds;

	mutex_lock(&mdsc->mutex);
	for (mds = 0; mds < mdsc->max_sessions; ++mds) {
		struct ceph_mds_session *s;

		s = __ceph_lookup_mds_session(mdsc, mds);
		if (!s)
			continue;

		if (check_state && !check_session_state(s)) {
			ceph_put_mds_session(s);
			continue;
		}

		mutex_unlock(&mdsc->mutex);
		cb(s);
		ceph_put_mds_session(s);
		mutex_lock(&mdsc->mutex);
	}
	mutex_unlock(&mdsc->mutex);
}

void ceph_mdsc_release_request(struct kref *kref)
{
	struct ceph_mds_request *req = container_of(kref,
						    struct ceph_mds_request,
						    r_kref);
	ceph_mdsc_release_dir_caps_async(req);
	destroy_reply_info(&req->r_reply_info);
	if (req->r_request)
		ceph_msg_put(req->r_request);
	if (req->r_reply)
		ceph_msg_put(req->r_reply);
	if (req->r_inode) {
		ceph_put_cap_refs(ceph_inode(req->r_inode), CEPH_CAP_PIN);
		iput(req->r_inode);
	}
	if (req->r_parent) {
		ceph_put_cap_refs(ceph_inode(req->r_parent), CEPH_CAP_PIN);
		iput(req->r_parent);
	}
	iput(req->r_target_inode);
	iput(req->r_new_inode);
	if (req->r_dentry)
		dput(req->r_dentry);
	if (req->r_old_dentry)
		dput(req->r_old_dentry);
	if (req->r_old_dentry_dir) {
		/*
		 * track (and drop pins for) r_old_dentry_dir
		 * separately, since r_old_dentry's d_parent may have
		 * changed between the dir mutex being dropped and
		 * this request being freed.
		 */
		ceph_put_cap_refs(ceph_inode(req->r_old_dentry_dir),
				  CEPH_CAP_PIN);
		iput(req->r_old_dentry_dir);
	}
	kfree(req->r_path1);
	kfree(req->r_path2);
	put_cred(req->r_cred);
	if (req->r_mnt_idmap)
		mnt_idmap_put(req->r_mnt_idmap);
	if (req->r_pagelist)
		ceph_pagelist_release(req->r_pagelist);
	kfree(req->r_fscrypt_auth);
	kfree(req->r_altname);
	put_request_session(req);
	ceph_unreserve_caps(req->r_mdsc, &req->r_caps_reservation);
	WARN_ON_ONCE(!list_empty(&req->r_wait));
	kmem_cache_free(ceph_mds_request_cachep, req);
}

DEFINE_RB_FUNCS(request, struct ceph_mds_request, r_tid, r_node)

/*
 * lookup request, bump ref if found.
 *
 * called under mdsc->mutex.
 */
static struct ceph_mds_request *
lookup_get_request(struct ceph_mds_client *mdsc, u64 tid)
{
	struct ceph_mds_request *req;

	req = lookup_request(&mdsc->request_tree, tid);
	if (req)
		ceph_mdsc_get_request(req);

	return req;
}
/*
 * Register an in-flight request, and assign a tid.  Link to the
 * directory we are modifying (if any).
 *
 * Called under mdsc->mutex.
 */
static void __register_request(struct ceph_mds_client *mdsc,
			       struct ceph_mds_request *req,
			       struct inode *dir)
{
	struct ceph_client *cl = mdsc->fsc->client;
	int ret = 0;

	req->r_tid = ++mdsc->last_tid;
	if (req->r_num_caps) {
		ret = ceph_reserve_caps(mdsc, &req->r_caps_reservation,
					req->r_num_caps);
		if (ret < 0) {
			pr_err_client(cl, "%p failed to reserve caps: %d\n",
				      req, ret);
			/* set req->r_err to fail early from __do_request */
			req->r_err = ret;
			return;
		}
	}
	doutc(cl, "%p tid %lld\n", req, req->r_tid);
	ceph_mdsc_get_request(req);
	insert_request(&mdsc->request_tree, req);

	req->r_cred = get_current_cred();
	if (!req->r_mnt_idmap)
		req->r_mnt_idmap = &nop_mnt_idmap;

	if (mdsc->oldest_tid == 0 && req->r_op != CEPH_MDS_OP_SETFILELOCK)
		mdsc->oldest_tid = req->r_tid;

	if (dir) {
		struct ceph_inode_info *ci = ceph_inode(dir);

		ihold(dir);
		req->r_unsafe_dir = dir;
		spin_lock(&ci->i_unsafe_lock);
		list_add_tail(&req->r_unsafe_dir_item, &ci->i_unsafe_dirops);
		spin_unlock(&ci->i_unsafe_lock);
	}
}

static void __unregister_request(struct ceph_mds_client *mdsc,
				 struct ceph_mds_request *req)
{
	doutc(mdsc->fsc->client, "%p tid %lld\n", req, req->r_tid);

	/* Never leave an unregistered request on an unsafe list! */
	list_del_init(&req->r_unsafe_item);

	if (req->r_tid == mdsc->oldest_tid) {
		struct rb_node *p = rb_next(&req->r_node);
		mdsc->oldest_tid = 0;
		while (p) {
			struct ceph_mds_request *next_req =
				rb_entry(p, struct ceph_mds_request, r_node);
			if (next_req->r_op != CEPH_MDS_OP_SETFILELOCK) {
				mdsc->oldest_tid = next_req->r_tid;
				break;
			}
			p = rb_next(p);
		}
	}

	erase_request(&mdsc->request_tree, req);

	if (req->r_unsafe_dir) {
		struct ceph_inode_info *ci = ceph_inode(req->r_unsafe_dir);
		spin_lock(&ci->i_unsafe_lock);
		list_del_init(&req->r_unsafe_dir_item);
		spin_unlock(&ci->i_unsafe_lock);
	}
	if (req->r_target_inode &&
	    test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) {
		struct ceph_inode_info *ci = ceph_inode(req->r_target_inode);
		spin_lock(&ci->i_unsafe_lock);
		list_del_init(&req->r_unsafe_target_item);
		spin_unlock(&ci->i_unsafe_lock);
	}

	if (req->r_unsafe_dir) {
		iput(req->r_unsafe_dir);
		req->r_unsafe_dir = NULL;
	}

	complete_all(&req->r_safe_completion);

	ceph_mdsc_put_request(req);
}
/*
 * Walk back up the dentry tree until we hit a dentry representing a
 * non-snapshot inode.  We do this using the rcu_read_lock (which must be held
 * when calling this) to ensure that the objects won't disappear while we're
 * working with them.  Once we hit a candidate dentry, we attempt to take a
 * reference to it, and return that as the result.
 */
static struct inode *get_nonsnap_parent(struct dentry *dentry)
{
	struct inode *inode = NULL;

	while (dentry && !IS_ROOT(dentry)) {
		inode = d_inode_rcu(dentry);
		if (!inode || ceph_snap(inode) == CEPH_NOSNAP)
			break;
		dentry = dentry->d_parent;
	}
	if (inode)
		inode = igrab(inode);
	return inode;
}

/*
 * Choose the mds to send the request to next.  If there is a hint set
 * in the request (e.g., due to a prior forward hint from the mds), use
 * that.  Otherwise, consult the frag tree and/or caps to identify the
 * appropriate mds.  If all else fails, choose randomly.
 *
 * Called under mdsc->mutex.
 */
static int __choose_mds(struct ceph_mds_client *mdsc,
			struct ceph_mds_request *req,
			bool *random)
{
	struct inode *inode;
	struct ceph_inode_info *ci;
	struct ceph_cap *cap;
	int mode = req->r_direct_mode;
	int mds = -1;
	u32 hash = req->r_direct_hash;
	bool is_hash = test_bit(CEPH_MDS_R_DIRECT_IS_HASH, &req->r_req_flags);
	struct ceph_client *cl = mdsc->fsc->client;

	if (random)
		*random = false;

	/*
	 * is there a specific mds we should try?  ignore hint if we have
	 * no session and the mds is not up (active or recovering).
	 */
	if (req->r_resend_mds >= 0 &&
	    (__have_session(mdsc, req->r_resend_mds) ||
	     ceph_mdsmap_get_state(mdsc->mdsmap, req->r_resend_mds) > 0)) {
		doutc(cl, "using resend_mds mds%d\n", req->r_resend_mds);
		return req->r_resend_mds;
	}

	if (mode == USE_RANDOM_MDS)
		goto random;

	inode = NULL;
	if (req->r_inode) {
		if (ceph_snap(req->r_inode) != CEPH_SNAPDIR) {
			inode = req->r_inode;
			ihold(inode);
		} else {
			/* req->r_dentry is non-null for LSSNAP request */
			rcu_read_lock();
			inode = get_nonsnap_parent(req->r_dentry);
			rcu_read_unlock();
			doutc(cl, "using snapdir's parent %p %llx.%llx\n",
			      inode, ceph_vinop(inode));
		}
	} else if (req->r_dentry) {
		/* ignore race with rename; old or new d_parent is okay */
		struct dentry *parent;
		struct inode *dir;

		rcu_read_lock();
		parent = READ_ONCE(req->r_dentry->d_parent);
		dir = req->r_parent ? : d_inode_rcu(parent);

		if (!dir || dir->i_sb != mdsc->fsc->sb) {
			/* not this fs or parent went negative */
			inode = d_inode(req->r_dentry);
			if (inode)
				ihold(inode);
		} else if (ceph_snap(dir) != CEPH_NOSNAP) {
			/* direct snapped/virtual snapdir requests
			 * based on parent dir inode */
			inode = get_nonsnap_parent(parent);
			doutc(cl, "using nonsnap parent %p %llx.%llx\n",
			      inode, ceph_vinop(inode));
		} else {
			/* dentry target */
			inode = d_inode(req->r_dentry);
			if (!inode || mode == USE_AUTH_MDS) {
				/* dir + name */
				inode = igrab(dir);
				hash = ceph_dentry_hash(dir, req->r_dentry);
				is_hash = true;
			} else {
				ihold(inode);
			}
		}
		rcu_read_unlock();
	}

	if (!inode)
		goto random;

	doutc(cl, "%p %llx.%llx is_hash=%d (0x%x) mode %d\n", inode,
	      ceph_vinop(inode), (int)is_hash, hash, mode);
	ci = ceph_inode(inode);

	if (is_hash && S_ISDIR(inode->i_mode)) {
		struct ceph_inode_frag frag;
		int found;

		ceph_choose_frag(ci, hash, &frag, &found);
		if (found) {
			if (mode == USE_ANY_MDS && frag.ndist > 0) {
				u8 r;

				/* choose a random replica */
				get_random_bytes(&r, 1);
				r %= frag.ndist;
				mds = frag.dist[r];
				doutc(cl, "%p %llx.%llx frag %u mds%d (%d/%d)\n",
				      inode, ceph_vinop(inode), frag.frag,
				      mds, (int)r, frag.ndist);
				if (ceph_mdsmap_get_state(mdsc->mdsmap, mds) >=
				    CEPH_MDS_STATE_ACTIVE &&
				    !ceph_mdsmap_is_laggy(mdsc->mdsmap, mds))
					goto out;
			}

			/* since this file/dir wasn't known to be
			 * replicated, look for the authoritative mds. */
			if (frag.mds >= 0) {
				/* choose auth mds */
				mds = frag.mds;
				doutc(cl, "%p %llx.%llx frag %u mds%d (auth)\n",
				      inode, ceph_vinop(inode), frag.frag, mds);
				if (ceph_mdsmap_get_state(mdsc->mdsmap, mds) >=
				    CEPH_MDS_STATE_ACTIVE) {
					if (!ceph_mdsmap_is_laggy(mdsc->mdsmap,
								  mds))
						goto out;
				}
			}
			mode = USE_AUTH_MDS;
		}
	}

	spin_lock(&ci->i_ceph_lock);
	cap = NULL;
	if (mode == USE_AUTH_MDS)
		cap = ci->i_auth_cap;
	if (!cap && !RB_EMPTY_ROOT(&ci->i_caps))
		cap = rb_entry(rb_first(&ci->i_caps), struct ceph_cap, ci_node);
	if (!cap) {
		spin_unlock(&ci->i_ceph_lock);
		iput(inode);
		goto random;
	}
	mds = cap->session->s_mds;
	doutc(cl, "%p %llx.%llx mds%d (%scap %p)\n", inode,
	      ceph_vinop(inode), mds,
	      cap == ci->i_auth_cap ? "auth " : "", cap);
	spin_unlock(&ci->i_ceph_lock);
out:
	iput(inode);
	return mds;

random:
	if (random)
		*random = true;

	mds = ceph_mdsmap_get_random_mds(mdsc->mdsmap);
	doutc(cl, "chose random mds%d\n", mds);
	return mds;
}


/*
 * session messages
 */
struct ceph_msg *ceph_create_session_msg(u32 op, u64 seq)
{
	struct ceph_msg *msg;
	struct ceph_mds_session_head *h;

	msg = ceph_msg_new(CEPH_MSG_CLIENT_SESSION, sizeof(*h), GFP_NOFS,
			   false);
	if (!msg) {
		pr_err("ENOMEM creating session %s msg\n",
		       ceph_session_op_name(op));
		return NULL;
	}
	h = msg->front.iov_base;
	h->op = cpu_to_le32(op);
	h->seq = cpu_to_le64(seq);

	return msg;
}

static const unsigned char feature_bits[] = CEPHFS_FEATURES_CLIENT_SUPPORTED;
#define FEATURE_BYTES(c) (DIV_ROUND_UP((size_t)feature_bits[c - 1] + 1, 64) * 8)
static int encode_supported_features(void **p, void *end)
{
	static const size_t count = ARRAY_SIZE(feature_bits);

	if (count > 0) {
		size_t i;
		size_t size = FEATURE_BYTES(count);
		unsigned long bit;

		if (WARN_ON_ONCE(*p + 4 + size > end))
			return -ERANGE;

		ceph_encode_32(p, size);
		memset(*p, 0, size);
		for (i = 0; i < count; i++) {
			bit = feature_bits[i];
			((unsigned char *)(*p))[bit / 8] |= BIT(bit % 8);
		}
		*p += size;
	} else {
		if (WARN_ON_ONCE(*p + 4 > end))
			return -ERANGE;

		ceph_encode_32(p, 0);
	}

	return 0;
}
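/*
 * Worked example for FEATURE_BYTES above: if the highest supported
 * feature bit were 17, DIV_ROUND_UP(17 + 1, 64) == 1 group of 64 bits,
 * i.e. an 8-byte bitmap on the wire.  Note the macro sizes the bitmap
 * from the *last* array entry, so feature_bits[] must stay sorted in
 * ascending order.
 */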
static const unsigned char metric_bits[] = CEPHFS_METRIC_SPEC_CLIENT_SUPPORTED;
#define METRIC_BYTES(cnt) (DIV_ROUND_UP((size_t)metric_bits[cnt - 1] + 1, 64) * 8)
static int encode_metric_spec(void **p, void *end)
{
	static const size_t count = ARRAY_SIZE(metric_bits);

	/* header */
	if (WARN_ON_ONCE(*p + 2 > end))
		return -ERANGE;

	ceph_encode_8(p, 1); /* version */
	ceph_encode_8(p, 1); /* compat */

	if (count > 0) {
		size_t i;
		size_t size = METRIC_BYTES(count);

		if (WARN_ON_ONCE(*p + 4 + 4 + size > end))
			return -ERANGE;

		/* metric spec info length */
		ceph_encode_32(p, 4 + size);

		/* metric spec */
		ceph_encode_32(p, size);
		memset(*p, 0, size);
		for (i = 0; i < count; i++)
			((unsigned char *)(*p))[i / 8] |= BIT(metric_bits[i] % 8);
		*p += size;
	} else {
		if (WARN_ON_ONCE(*p + 4 + 4 > end))
			return -ERANGE;

		/* metric spec info length */
		ceph_encode_32(p, 4);
		/* metric spec */
		ceph_encode_32(p, 0);
	}

	return 0;
}
/*
 * session message, specialization for CEPH_SESSION_REQUEST_OPEN
 * to include additional client metadata fields.
 */
static struct ceph_msg *
create_session_full_msg(struct ceph_mds_client *mdsc, int op, u64 seq)
{
	struct ceph_msg *msg;
	struct ceph_mds_session_head *h;
	int i;
	int extra_bytes = 0;
	int metadata_key_count = 0;
	struct ceph_options *opt = mdsc->fsc->client->options;
	struct ceph_mount_options *fsopt = mdsc->fsc->mount_options;
	struct ceph_client *cl = mdsc->fsc->client;
	size_t size, count;
	void *p, *end;
	int ret;

	const char *metadata[][2] = {
		{"hostname", mdsc->nodename},
		{"kernel_version", init_utsname()->release},
		{"entity_id", opt->name ? : ""},
		{"root", fsopt->server_path ? : "/"},
		{NULL, NULL}
	};

	/* Calculate serialized length of metadata */
	extra_bytes = 4;  /* map length */
	for (i = 0; metadata[i][0]; ++i) {
		extra_bytes += 8 + strlen(metadata[i][0]) +
			strlen(metadata[i][1]);
		metadata_key_count++;
	}

	/* supported features */
	size = 0;
	count = ARRAY_SIZE(feature_bits);
	if (count > 0)
		size = FEATURE_BYTES(count);
	extra_bytes += 4 + size;

	/* metric spec */
	size = 0;
	count = ARRAY_SIZE(metric_bits);
	if (count > 0)
		size = METRIC_BYTES(count);
	extra_bytes += 2 + 4 + 4 + size;

	/* flags, mds auth caps and oldest_client_tid */
	extra_bytes += 4 + 4 + 8;

	/* Allocate the message */
	msg = ceph_msg_new(CEPH_MSG_CLIENT_SESSION, sizeof(*h) + extra_bytes,
			   GFP_NOFS, false);
	if (!msg) {
		pr_err_client(cl, "ENOMEM creating session open msg\n");
		return ERR_PTR(-ENOMEM);
	}
	p = msg->front.iov_base;
	end = p + msg->front.iov_len;

	h = p;
	h->op = cpu_to_le32(op);
	h->seq = cpu_to_le64(seq);

	/*
	 * Serialize client metadata into the waiting buffer space, using
	 * the format that userspace expects for map<string, string>
	 *
	 * ClientSession messages with metadata are v7
	 */
	msg->hdr.version = cpu_to_le16(7);
	msg->hdr.compat_version = cpu_to_le16(1);

	/* The write pointer, following the session_head structure */
	p += sizeof(*h);

	/* Number of entries in the map */
	ceph_encode_32(&p, metadata_key_count);

	/* Two length-prefixed strings for each entry in the map */
	for (i = 0; metadata[i][0]; ++i) {
		size_t const key_len = strlen(metadata[i][0]);
		size_t const val_len = strlen(metadata[i][1]);

		ceph_encode_32(&p, key_len);
		memcpy(p, metadata[i][0], key_len);
		p += key_len;
		ceph_encode_32(&p, val_len);
		memcpy(p, metadata[i][1], val_len);
		p += val_len;
	}

	ret = encode_supported_features(&p, end);
	if (ret) {
		pr_err_client(cl, "encode_supported_features failed!\n");
		ceph_msg_put(msg);
		return ERR_PTR(ret);
	}

	ret = encode_metric_spec(&p, end);
	if (ret) {
		pr_err_client(cl, "encode_metric_spec failed!\n");
		ceph_msg_put(msg);
		return ERR_PTR(ret);
	}

	/* version == 5, flags */
	ceph_encode_32(&p, 0);

	/* version == 6, mds auth caps */
	ceph_encode_32(&p, 0);

	/* version == 7, oldest_client_tid */
	ceph_encode_64(&p, mdsc->oldest_tid);

	msg->front.iov_len = p - msg->front.iov_base;
	msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);

	return msg;
}
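/*
 * For reference, the client metadata serialized above follows the
 * userspace encoding of map<string, string>:
 *
 *	u32 n;			number of entries
 *	then, n times:
 *	    u32 key_len; key bytes
 *	    u32 val_len; val bytes
 */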
/*
 * send session open request.
 *
 * called under mdsc->mutex
 */
static int __open_session(struct ceph_mds_client *mdsc,
			  struct ceph_mds_session *session)
{
	struct ceph_msg *msg;
	int mstate;
	int mds = session->s_mds;

	if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_FENCE_IO)
		return -EIO;

	/* wait for mds to go active? */
	mstate = ceph_mdsmap_get_state(mdsc->mdsmap, mds);
	doutc(mdsc->fsc->client, "open_session to mds%d (%s)\n", mds,
	      ceph_mds_state_name(mstate));
	session->s_state = CEPH_MDS_SESSION_OPENING;
	session->s_renew_requested = jiffies;

	/* send connect message */
	msg = create_session_full_msg(mdsc, CEPH_SESSION_REQUEST_OPEN,
				      session->s_seq);
	if (IS_ERR(msg))
		return PTR_ERR(msg);
	ceph_con_send(&session->s_con, msg);
	return 0;
}

/*
 * open sessions for any export targets for the given mds
 *
 * called under mdsc->mutex
 */
static struct ceph_mds_session *
__open_export_target_session(struct ceph_mds_client *mdsc, int target)
{
	struct ceph_mds_session *session;
	int ret;

	session = __ceph_lookup_mds_session(mdsc, target);
	if (!session) {
		session = register_session(mdsc, target);
		if (IS_ERR(session))
			return session;
	}
	if (session->s_state == CEPH_MDS_SESSION_NEW ||
	    session->s_state == CEPH_MDS_SESSION_CLOSING) {
		ret = __open_session(mdsc, session);
		if (ret)
			return ERR_PTR(ret);
	}

	return session;
}

struct ceph_mds_session *
ceph_mdsc_open_export_target_session(struct ceph_mds_client *mdsc, int target)
{
	struct ceph_mds_session *session;
	struct ceph_client *cl = mdsc->fsc->client;

	doutc(cl, "to mds%d\n", target);

	mutex_lock(&mdsc->mutex);
	session = __open_export_target_session(mdsc, target);
	mutex_unlock(&mdsc->mutex);

	return session;
}

static void __open_export_target_sessions(struct ceph_mds_client *mdsc,
					  struct ceph_mds_session *session)
{
	struct ceph_mds_info *mi;
	struct ceph_mds_session *ts;
	int i, mds = session->s_mds;
	struct ceph_client *cl = mdsc->fsc->client;

	if (mds >= mdsc->mdsmap->possible_max_rank)
		return;

	mi = &mdsc->mdsmap->m_info[mds];
	doutc(cl, "for mds%d (%d targets)\n", session->s_mds,
	      mi->num_export_targets);

	for (i = 0; i < mi->num_export_targets; i++) {
		ts = __open_export_target_session(mdsc, mi->export_targets[i]);
		ceph_put_mds_session(ts);
	}
}

/*
 * session caps
 */

static void detach_cap_releases(struct ceph_mds_session *session,
				struct list_head *target)
{
	struct ceph_client *cl = session->s_mdsc->fsc->client;

	lockdep_assert_held(&session->s_cap_lock);

	list_splice_init(&session->s_cap_releases, target);
	session->s_num_cap_releases = 0;
	doutc(cl, "mds%d\n", session->s_mds);
}

static void dispose_cap_releases(struct ceph_mds_client *mdsc,
				 struct list_head *dispose)
{
	while (!list_empty(dispose)) {
		struct ceph_cap *cap;
		/* zero out the in-progress message */
		cap = list_first_entry(dispose, struct ceph_cap, session_caps);
		list_del(&cap->session_caps);
		ceph_put_cap(mdsc, cap);
	}
}

static void cleanup_session_requests(struct ceph_mds_client *mdsc,
				     struct ceph_mds_session *session)
{
	struct ceph_client *cl = mdsc->fsc->client;
	struct ceph_mds_request *req;
	struct rb_node *p;

	doutc(cl, "mds%d\n", session->s_mds);
	mutex_lock(&mdsc->mutex);
	while (!list_empty(&session->s_unsafe)) {
		req = list_first_entry(&session->s_unsafe,
				       struct ceph_mds_request, r_unsafe_item);
		pr_warn_ratelimited_client(cl, " dropping unsafe request %llu\n",
					   req->r_tid);
		if (req->r_target_inode)
			mapping_set_error(req->r_target_inode->i_mapping, -EIO);
		if (req->r_unsafe_dir)
			mapping_set_error(req->r_unsafe_dir->i_mapping, -EIO);
		__unregister_request(mdsc, req);
	}
	/* zero r_attempts, so kick_requests() will re-send requests */
	p = rb_first(&mdsc->request_tree);
	while (p) {
		req = rb_entry(p, struct ceph_mds_request, r_node);
		p = rb_next(p);
		if (req->r_session &&
		    req->r_session->s_mds == session->s_mds)
			req->r_attempts = 0;
	}
	mutex_unlock(&mdsc->mutex);
}

/*
 * Helper to safely iterate over all caps associated with a session, with
 * special care taken to handle a racing __ceph_remove_cap().
 *
 * Caller must hold session s_mutex.
 */
int ceph_iterate_session_caps(struct ceph_mds_session *session,
			      int (*cb)(struct inode *, int mds, void *),
			      void *arg)
{
	struct ceph_client *cl = session->s_mdsc->fsc->client;
	struct list_head *p;
	struct ceph_cap *cap;
	struct inode *inode, *last_inode = NULL;
	struct ceph_cap *old_cap = NULL;
	int ret;

	doutc(cl, "%p mds%d\n", session, session->s_mds);
	spin_lock(&session->s_cap_lock);
	p = session->s_caps.next;
	while (p != &session->s_caps) {
		int mds;

		cap = list_entry(p, struct ceph_cap, session_caps);
		inode = igrab(&cap->ci->netfs.inode);
		if (!inode) {
			p = p->next;
			continue;
		}
		session->s_cap_iterator = cap;
		mds = cap->mds;
		spin_unlock(&session->s_cap_lock);

		if (last_inode) {
			iput(last_inode);
			last_inode = NULL;
		}
		if (old_cap) {
			ceph_put_cap(session->s_mdsc, old_cap);
			old_cap = NULL;
		}

		ret = cb(inode, mds, arg);
		last_inode = inode;

		spin_lock(&session->s_cap_lock);
		p = p->next;
		if (!cap->ci) {
			doutc(cl, "finishing cap %p removal\n", cap);
			BUG_ON(cap->session != session);
			cap->session = NULL;
			list_del_init(&cap->session_caps);
			session->s_nr_caps--;
			atomic64_dec(&session->s_mdsc->metric.total_caps);
			if (cap->queue_release)
				__ceph_queue_cap_release(session, cap);
			else
				old_cap = cap;  /* put_cap it w/o locks held */
		}
		if (ret < 0)
			goto out;
	}
	ret = 0;
out:
	session->s_cap_iterator = NULL;
	spin_unlock(&session->s_cap_lock);

	iput(last_inode);
	if (old_cap)
		ceph_put_cap(session->s_mdsc, old_cap);

	return ret;
}
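/*
 * Illustration (hypothetical callback, not wired up anywhere): a
 * minimal ceph_iterate_session_caps() callback.  The iterator holds an
 * inode reference across the call and copes with racing cap removal,
 * so a callback only needs its own locking:
 */
static int __maybe_unused example_count_caps_cb(struct inode *inode, int mds,
						void *arg)
{
	atomic_t *count = arg;

	atomic_inc(count);
	return 0;	/* non-negative return keeps iterating */
}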
static int remove_session_caps_cb(struct inode *inode, int mds, void *arg)
{
	struct ceph_inode_info *ci = ceph_inode(inode);
	struct ceph_client *cl = ceph_inode_to_client(inode);
	bool invalidate = false;
	struct ceph_cap *cap;
	int iputs = 0;

	spin_lock(&ci->i_ceph_lock);
	cap = __get_cap_for_mds(ci, mds);
	if (cap) {
		doutc(cl, " removing cap %p, ci is %p, inode is %p\n",
		      cap, ci, &ci->netfs.inode);

		iputs = ceph_purge_inode_cap(inode, cap, &invalidate);
	}
	spin_unlock(&ci->i_ceph_lock);

	if (cap)
		wake_up_all(&ci->i_cap_wq);
	if (invalidate)
		ceph_queue_invalidate(inode);
	while (iputs--)
		iput(inode);
	return 0;
}

/*
 * caller must hold session s_mutex
 */
static void remove_session_caps(struct ceph_mds_session *session)
{
	struct ceph_fs_client *fsc = session->s_mdsc->fsc;
	struct super_block *sb = fsc->sb;
	LIST_HEAD(dispose);

	doutc(fsc->client, "on %p\n", session);
	ceph_iterate_session_caps(session, remove_session_caps_cb, fsc);

	wake_up_all(&fsc->mdsc->cap_flushing_wq);

	spin_lock(&session->s_cap_lock);
	if (session->s_nr_caps > 0) {
		struct inode *inode;
		struct ceph_cap *cap, *prev = NULL;
		struct ceph_vino vino;
		/*
		 * iterate_session_caps() skips inodes that are being
		 * deleted; we need to wait until deletions are complete.
		 * __wait_on_freeing_inode() is designed for that job,
		 * but it is not exported, so use the lookup-inode
		 * function to reach it.
		 */
		while (!list_empty(&session->s_caps)) {
			cap = list_entry(session->s_caps.next,
					 struct ceph_cap, session_caps);
			if (cap == prev)
				break;
			prev = cap;
			vino = cap->ci->i_vino;
			spin_unlock(&session->s_cap_lock);

			inode = ceph_find_inode(sb, vino);
			iput(inode);

			spin_lock(&session->s_cap_lock);
		}
	}

	/* drop cap expires and unlock s_cap_lock */
	detach_cap_releases(session, &dispose);

	BUG_ON(session->s_nr_caps > 0);
	BUG_ON(!list_empty(&session->s_cap_flushing));
	spin_unlock(&session->s_cap_lock);
	dispose_cap_releases(session->s_mdsc, &dispose);
}

enum {
	RECONNECT,
	RENEWCAPS,
	FORCE_RO,
};

/*
 * wake up any threads waiting on this session's caps.  if the cap is
 * old (didn't get renewed on the client reconnect), remove it now.
 *
 * caller must hold s_mutex.
 */
static int wake_up_session_cb(struct inode *inode, int mds, void *arg)
{
	struct ceph_inode_info *ci = ceph_inode(inode);
	unsigned long ev = (unsigned long)arg;

	if (ev == RECONNECT) {
		spin_lock(&ci->i_ceph_lock);
		ci->i_wanted_max_size = 0;
		ci->i_requested_max_size = 0;
		spin_unlock(&ci->i_ceph_lock);
	} else if (ev == RENEWCAPS) {
		struct ceph_cap *cap;

		spin_lock(&ci->i_ceph_lock);
		cap = __get_cap_for_mds(ci, mds);
		/* mds did not re-issue stale cap */
		if (cap && cap->cap_gen < atomic_read(&cap->session->s_cap_gen))
			cap->issued = cap->implemented = CEPH_CAP_PIN;
		spin_unlock(&ci->i_ceph_lock);
	} else if (ev == FORCE_RO) {
	}
	wake_up_all(&ci->i_cap_wq);
	return 0;
}

static void wake_up_session_caps(struct ceph_mds_session *session, int ev)
{
	struct ceph_client *cl = session->s_mdsc->fsc->client;

	doutc(cl, "session %p mds%d\n", session, session->s_mds);
	ceph_iterate_session_caps(session, wake_up_session_cb,
				  (void *)(unsigned long)ev);
}
2011 * 2012 * caller holds s_mutex 2013 */ 2014 static int send_renew_caps(struct ceph_mds_client *mdsc, 2015 struct ceph_mds_session *session) 2016 { 2017 struct ceph_client *cl = mdsc->fsc->client; 2018 struct ceph_msg *msg; 2019 int state; 2020 2021 if (time_after_eq(jiffies, session->s_cap_ttl) && 2022 time_after_eq(session->s_cap_ttl, session->s_renew_requested)) 2023 pr_info_client(cl, "mds%d caps stale\n", session->s_mds); 2024 session->s_renew_requested = jiffies; 2025 2026 /* do not try to renew caps until a recovering mds has reconnected 2027 * with its clients. */ 2028 state = ceph_mdsmap_get_state(mdsc->mdsmap, session->s_mds); 2029 if (state < CEPH_MDS_STATE_RECONNECT) { 2030 doutc(cl, "ignoring mds%d (%s)\n", session->s_mds, 2031 ceph_mds_state_name(state)); 2032 return 0; 2033 } 2034 2035 doutc(cl, "to mds%d (%s)\n", session->s_mds, 2036 ceph_mds_state_name(state)); 2037 msg = create_session_full_msg(mdsc, CEPH_SESSION_REQUEST_RENEWCAPS, 2038 ++session->s_renew_seq); 2039 if (IS_ERR(msg)) 2040 return PTR_ERR(msg); 2041 ceph_con_send(&session->s_con, msg); 2042 return 0; 2043 } 2044 2045 static int send_flushmsg_ack(struct ceph_mds_client *mdsc, 2046 struct ceph_mds_session *session, u64 seq) 2047 { 2048 struct ceph_client *cl = mdsc->fsc->client; 2049 struct ceph_msg *msg; 2050 2051 doutc(cl, "to mds%d (%s) seq %lld\n", session->s_mds, 2052 ceph_session_state_name(session->s_state), seq); 2053 msg = ceph_create_session_msg(CEPH_SESSION_FLUSHMSG_ACK, seq); 2054 if (!msg) 2055 return -ENOMEM; 2056 ceph_con_send(&session->s_con, msg); 2057 return 0; 2058 } 2059 2060 2061 /* 2062 * Note new cap ttl, and any transition from stale to fresh. 2063 * 2064 * Called under session->s_mutex 2065 */ 2066 static void renewed_caps(struct ceph_mds_client *mdsc, 2067 struct ceph_mds_session *session, int is_renew) 2068 { 2069 struct ceph_client *cl = mdsc->fsc->client; 2070 int was_stale; 2071 int wake = 0; 2072 2073 spin_lock(&session->s_cap_lock); 2074 was_stale = is_renew && time_after_eq(jiffies, session->s_cap_ttl); 2075 2076 session->s_cap_ttl = session->s_renew_requested + 2077 mdsc->mdsmap->m_session_timeout*HZ; 2078 2079 if (was_stale) { 2080 if (time_before(jiffies, session->s_cap_ttl)) { 2081 pr_info_client(cl, "mds%d caps renewed\n", 2082 session->s_mds); 2083 wake = 1; 2084 } else { 2085 pr_info_client(cl, "mds%d caps still stale\n", 2086 session->s_mds); 2087 } 2088 } 2089 doutc(cl, "mds%d ttl now %lu, was %s, now %s\n", session->s_mds, 2090 session->s_cap_ttl, was_stale ? "stale" : "fresh", 2091 time_before(jiffies, session->s_cap_ttl) ? "fresh" : "stale"); 2092 spin_unlock(&session->s_cap_lock); 2093 2094 if (wake) 2095 wake_up_session_caps(session, RENEWCAPS); 2096 } 2097 2098 /* 2099 * send a session close request 2100 */ 2101 static int request_close_session(struct ceph_mds_session *session) 2102 { 2103 struct ceph_client *cl = session->s_mdsc->fsc->client; 2104 struct ceph_msg *msg; 2105 2106 doutc(cl, "mds%d state %s seq %lld\n", session->s_mds, 2107 ceph_session_state_name(session->s_state), session->s_seq); 2108 msg = ceph_create_session_msg(CEPH_SESSION_REQUEST_CLOSE, 2109 session->s_seq); 2110 if (!msg) 2111 return -ENOMEM; 2112 ceph_con_send(&session->s_con, msg); 2113 return 1; 2114 } 2115 2116 /* 2117 * Called with s_mutex held.
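* Returns 0 if the session is already closing (or closed); otherwise returns the result of request_close_session(): 1 if the close request was sent, or a negative errno on failure.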
2118 */ 2119 static int __close_session(struct ceph_mds_client *mdsc, 2120 struct ceph_mds_session *session) 2121 { 2122 if (session->s_state >= CEPH_MDS_SESSION_CLOSING) 2123 return 0; 2124 session->s_state = CEPH_MDS_SESSION_CLOSING; 2125 return request_close_session(session); 2126 } 2127 2128 static bool drop_negative_children(struct dentry *dentry) 2129 { 2130 struct dentry *child; 2131 bool all_negative = true; 2132 2133 if (!d_is_dir(dentry)) 2134 goto out; 2135 2136 spin_lock(&dentry->d_lock); 2137 hlist_for_each_entry(child, &dentry->d_children, d_sib) { 2138 if (d_really_is_positive(child)) { 2139 all_negative = false; 2140 break; 2141 } 2142 } 2143 spin_unlock(&dentry->d_lock); 2144 2145 if (all_negative) 2146 shrink_dcache_parent(dentry); 2147 out: 2148 return all_negative; 2149 } 2150 2151 /* 2152 * Trim old(er) caps. 2153 * 2154 * Because we can't cache an inode without one or more caps, we do 2155 * this indirectly: if a cap is unused, we prune its aliases, at which 2156 * point the inode will hopefully get dropped too. 2157 * 2158 * Yes, this is a bit sloppy. Our only real goal here is to respond to 2159 * memory pressure from the MDS, though, so it needn't be perfect. 2160 */ 2161 static int trim_caps_cb(struct inode *inode, int mds, void *arg) 2162 { 2163 struct ceph_mds_client *mdsc = ceph_sb_to_mdsc(inode->i_sb); 2164 struct ceph_client *cl = mdsc->fsc->client; 2165 int *remaining = arg; 2166 struct ceph_inode_info *ci = ceph_inode(inode); 2167 int used, wanted, oissued, mine; 2168 struct ceph_cap *cap; 2169 2170 if (*remaining <= 0) 2171 return -1; 2172 2173 spin_lock(&ci->i_ceph_lock); 2174 cap = __get_cap_for_mds(ci, mds); 2175 if (!cap) { 2176 spin_unlock(&ci->i_ceph_lock); 2177 return 0; 2178 } 2179 mine = cap->issued | cap->implemented; 2180 used = __ceph_caps_used(ci); 2181 wanted = __ceph_caps_file_wanted(ci); 2182 oissued = __ceph_caps_issued_other(ci, cap); 2183 2184 doutc(cl, "%p %llx.%llx cap %p mine %s oissued %s used %s wanted %s\n", 2185 inode, ceph_vinop(inode), cap, ceph_cap_string(mine), 2186 ceph_cap_string(oissued), ceph_cap_string(used), 2187 ceph_cap_string(wanted)); 2188 if (cap == ci->i_auth_cap) { 2189 if (ci->i_dirty_caps || ci->i_flushing_caps || 2190 !list_empty(&ci->i_cap_snaps)) 2191 goto out; 2192 if ((used | wanted) & CEPH_CAP_ANY_WR) 2193 goto out; 2194 /* Note: it's possible that i_filelock_ref becomes non-zero 2195 * after dropping auth caps. It doesn't hurt because reply 2196 * of lock mds request will re-add auth caps. */ 2197 if (atomic_read(&ci->i_filelock_ref) > 0) 2198 goto out; 2199 } 2200 /* The inode has cached pages, but it's no longer used. 2201 * We can safely drop it. */ 2202 if (S_ISREG(inode->i_mode) && 2203 wanted == 0 && used == CEPH_CAP_FILE_CACHE && 2204 !(oissued & CEPH_CAP_FILE_CACHE)) { 2205 used = 0; 2206 oissued = 0; 2207 } 2208 if ((used | wanted) & ~oissued & mine) 2209 goto out; /* we need these caps */ 2210 2211 if (oissued) { 2212 /* we aren't the only cap..
just remove us */ 2213 ceph_remove_cap(mdsc, cap, true); 2214 (*remaining)--; 2215 } else { 2216 struct dentry *dentry; 2217 /* try dropping referring dentries */ 2218 spin_unlock(&ci->i_ceph_lock); 2219 dentry = d_find_any_alias(inode); 2220 if (dentry && drop_negative_children(dentry)) { 2221 int count; 2222 dput(dentry); 2223 d_prune_aliases(inode); 2224 count = atomic_read(&inode->i_count); 2225 if (count == 1) 2226 (*remaining)--; 2227 doutc(cl, "%p %llx.%llx cap %p pruned, count now %d\n", 2228 inode, ceph_vinop(inode), cap, count); 2229 } else { 2230 dput(dentry); 2231 } 2232 return 0; 2233 } 2234 2235 out: 2236 spin_unlock(&ci->i_ceph_lock); 2237 return 0; 2238 } 2239 2240 /* 2241 * Trim session cap count down to some max number. 2242 */ 2243 int ceph_trim_caps(struct ceph_mds_client *mdsc, 2244 struct ceph_mds_session *session, 2245 int max_caps) 2246 { 2247 struct ceph_client *cl = mdsc->fsc->client; 2248 int trim_caps = session->s_nr_caps - max_caps; 2249 2250 doutc(cl, "mds%d start: %d / %d, trim %d\n", session->s_mds, 2251 session->s_nr_caps, max_caps, trim_caps); 2252 if (trim_caps > 0) { 2253 int remaining = trim_caps; 2254 2255 ceph_iterate_session_caps(session, trim_caps_cb, &remaining); 2256 doutc(cl, "mds%d done: %d / %d, trimmed %d\n", 2257 session->s_mds, session->s_nr_caps, max_caps, 2258 trim_caps - remaining); 2259 } 2260 2261 ceph_flush_session_cap_releases(mdsc, session); 2262 return 0; 2263 } 2264 2265 static int check_caps_flush(struct ceph_mds_client *mdsc, 2266 u64 want_flush_tid) 2267 { 2268 struct ceph_client *cl = mdsc->fsc->client; 2269 int ret = 1; 2270 2271 spin_lock(&mdsc->cap_dirty_lock); 2272 if (!list_empty(&mdsc->cap_flush_list)) { 2273 struct ceph_cap_flush *cf = 2274 list_first_entry(&mdsc->cap_flush_list, 2275 struct ceph_cap_flush, g_list); 2276 if (cf->tid <= want_flush_tid) { 2277 doutc(cl, "still flushing tid %llu <= %llu\n", 2278 cf->tid, want_flush_tid); 2279 ret = 0; 2280 } 2281 } 2282 spin_unlock(&mdsc->cap_dirty_lock); 2283 return ret; 2284 } 2285 2286 /* 2287 * flush all dirty inode data to disk. 
2288 * 2289 * waits until we've flushed through want_flush_tid 2290 */ 2291 static void wait_caps_flush(struct ceph_mds_client *mdsc, 2292 u64 want_flush_tid) 2293 { 2294 struct ceph_client *cl = mdsc->fsc->client; 2295 2296 doutc(cl, "want %llu\n", want_flush_tid); 2297 2298 wait_event(mdsc->cap_flushing_wq, 2299 check_caps_flush(mdsc, want_flush_tid)); 2300 2301 doutc(cl, "ok, flushed thru %llu\n", want_flush_tid); 2302 } 2303 2304 /* 2305 * called under s_mutex 2306 */ 2307 static void ceph_send_cap_releases(struct ceph_mds_client *mdsc, 2308 struct ceph_mds_session *session) 2309 { 2310 struct ceph_client *cl = mdsc->fsc->client; 2311 struct ceph_msg *msg = NULL; 2312 struct ceph_mds_cap_release *head; 2313 struct ceph_mds_cap_item *item; 2314 struct ceph_osd_client *osdc = &mdsc->fsc->client->osdc; 2315 struct ceph_cap *cap; 2316 LIST_HEAD(tmp_list); 2317 int num_cap_releases; 2318 __le32 barrier, *cap_barrier; 2319 2320 down_read(&osdc->lock); 2321 barrier = cpu_to_le32(osdc->epoch_barrier); 2322 up_read(&osdc->lock); 2323 2324 spin_lock(&session->s_cap_lock); 2325 again: 2326 list_splice_init(&session->s_cap_releases, &tmp_list); 2327 num_cap_releases = session->s_num_cap_releases; 2328 session->s_num_cap_releases = 0; 2329 spin_unlock(&session->s_cap_lock); 2330 2331 while (!list_empty(&tmp_list)) { 2332 if (!msg) { 2333 msg = ceph_msg_new(CEPH_MSG_CLIENT_CAPRELEASE, 2334 PAGE_SIZE, GFP_NOFS, false); 2335 if (!msg) 2336 goto out_err; 2337 head = msg->front.iov_base; 2338 head->num = cpu_to_le32(0); 2339 msg->front.iov_len = sizeof(*head); 2340 2341 msg->hdr.version = cpu_to_le16(2); 2342 msg->hdr.compat_version = cpu_to_le16(1); 2343 } 2344 2345 cap = list_first_entry(&tmp_list, struct ceph_cap, 2346 session_caps); 2347 list_del(&cap->session_caps); 2348 num_cap_releases--; 2349 2350 head = msg->front.iov_base; 2351 put_unaligned_le32(get_unaligned_le32(&head->num) + 1, 2352 &head->num); 2353 item = msg->front.iov_base + msg->front.iov_len; 2354 item->ino = cpu_to_le64(cap->cap_ino); 2355 item->cap_id = cpu_to_le64(cap->cap_id); 2356 item->migrate_seq = cpu_to_le32(cap->mseq); 2357 item->issue_seq = cpu_to_le32(cap->issue_seq); 2358 msg->front.iov_len += sizeof(*item); 2359 2360 ceph_put_cap(mdsc, cap); 2361 2362 if (le32_to_cpu(head->num) == CEPH_CAPS_PER_RELEASE) { 2363 // Append cap_barrier field 2364 cap_barrier = msg->front.iov_base + msg->front.iov_len; 2365 *cap_barrier = barrier; 2366 msg->front.iov_len += sizeof(*cap_barrier); 2367 2368 msg->hdr.front_len = cpu_to_le32(msg->front.iov_len); 2369 doutc(cl, "mds%d %p\n", session->s_mds, msg); 2370 ceph_con_send(&session->s_con, msg); 2371 msg = NULL; 2372 } 2373 } 2374 2375 BUG_ON(num_cap_releases != 0); 2376 2377 spin_lock(&session->s_cap_lock); 2378 if (!list_empty(&session->s_cap_releases)) 2379 goto again; 2380 spin_unlock(&session->s_cap_lock); 2381 2382 if (msg) { 2383 // Append cap_barrier field 2384 cap_barrier = msg->front.iov_base + msg->front.iov_len; 2385 *cap_barrier = barrier; 2386 msg->front.iov_len += sizeof(*cap_barrier); 2387 2388 msg->hdr.front_len = cpu_to_le32(msg->front.iov_len); 2389 doutc(cl, "mds%d %p\n", session->s_mds, msg); 2390 ceph_con_send(&session->s_con, msg); 2391 } 2392 return; 2393 out_err: 2394 pr_err_client(cl, "mds%d, failed to allocate message\n", 2395 session->s_mds); 2396 spin_lock(&session->s_cap_lock); 2397 list_splice(&tmp_list, &session->s_cap_releases); 2398 session->s_num_cap_releases += num_cap_releases; 2399 spin_unlock(&session->s_cap_lock); 2400 } 2401 2402 static void
ceph_cap_release_work(struct work_struct *work) 2403 { 2404 struct ceph_mds_session *session = 2405 container_of(work, struct ceph_mds_session, s_cap_release_work); 2406 2407 mutex_lock(&session->s_mutex); 2408 if (session->s_state == CEPH_MDS_SESSION_OPEN || 2409 session->s_state == CEPH_MDS_SESSION_HUNG) 2410 ceph_send_cap_releases(session->s_mdsc, session); 2411 mutex_unlock(&session->s_mutex); 2412 ceph_put_mds_session(session); 2413 } 2414 2415 void ceph_flush_session_cap_releases(struct ceph_mds_client *mdsc, 2416 struct ceph_mds_session *session) 2417 { 2418 struct ceph_client *cl = mdsc->fsc->client; 2419 if (mdsc->stopping) 2420 return; 2421 2422 ceph_get_mds_session(session); 2423 if (queue_work(mdsc->fsc->cap_wq, 2424 &session->s_cap_release_work)) { 2425 doutc(cl, "cap release work queued\n"); 2426 } else { 2427 ceph_put_mds_session(session); 2428 doutc(cl, "failed to queue cap release work\n"); 2429 } 2430 } 2431 2432 /* 2433 * caller holds session->s_cap_lock 2434 */ 2435 void __ceph_queue_cap_release(struct ceph_mds_session *session, 2436 struct ceph_cap *cap) 2437 { 2438 list_add_tail(&cap->session_caps, &session->s_cap_releases); 2439 session->s_num_cap_releases++; 2440 2441 if (!(session->s_num_cap_releases % CEPH_CAPS_PER_RELEASE)) 2442 ceph_flush_session_cap_releases(session->s_mdsc, session); 2443 } 2444 2445 static void ceph_cap_reclaim_work(struct work_struct *work) 2446 { 2447 struct ceph_mds_client *mdsc = 2448 container_of(work, struct ceph_mds_client, cap_reclaim_work); 2449 int ret = ceph_trim_dentries(mdsc); 2450 if (ret == -EAGAIN) 2451 ceph_queue_cap_reclaim_work(mdsc); 2452 } 2453 2454 void ceph_queue_cap_reclaim_work(struct ceph_mds_client *mdsc) 2455 { 2456 struct ceph_client *cl = mdsc->fsc->client; 2457 if (mdsc->stopping) 2458 return; 2459 2460 if (queue_work(mdsc->fsc->cap_wq, &mdsc->cap_reclaim_work)) { 2461 doutc(cl, "caps reclaim work queued\n"); 2462 } else { 2463 doutc(cl, "failed to queue caps reclaim work\n"); 2464 } 2465 } 2466 2467 void ceph_reclaim_caps_nr(struct ceph_mds_client *mdsc, int nr) 2468 { 2469 int val; 2470 if (!nr) 2471 return; 2472 val = atomic_add_return(nr, &mdsc->cap_reclaim_pending); 2473 if ((val % CEPH_CAPS_PER_RELEASE) < nr) { 2474 atomic_set(&mdsc->cap_reclaim_pending, 0); 2475 ceph_queue_cap_reclaim_work(mdsc); 2476 } 2477 } 2478 2479 void ceph_queue_cap_unlink_work(struct ceph_mds_client *mdsc) 2480 { 2481 struct ceph_client *cl = mdsc->fsc->client; 2482 if (mdsc->stopping) 2483 return; 2484 2485 if (queue_work(mdsc->fsc->cap_wq, &mdsc->cap_unlink_work)) { 2486 doutc(cl, "caps unlink work queued\n"); 2487 } else { 2488 doutc(cl, "failed to queue caps unlink work\n"); 2489 } 2490 } 2491 2492 static void ceph_cap_unlink_work(struct work_struct *work) 2493 { 2494 struct ceph_mds_client *mdsc = 2495 container_of(work, struct ceph_mds_client, cap_unlink_work); 2496 struct ceph_client *cl = mdsc->fsc->client; 2497 2498 doutc(cl, "begin\n"); 2499 spin_lock(&mdsc->cap_delay_lock); 2500 while (!list_empty(&mdsc->cap_unlink_delay_list)) { 2501 struct ceph_inode_info *ci; 2502 struct inode *inode; 2503 2504 ci = list_first_entry(&mdsc->cap_unlink_delay_list, 2505 struct ceph_inode_info, 2506 i_cap_delay_list); 2507 list_del_init(&ci->i_cap_delay_list); 2508 2509 inode = igrab(&ci->netfs.inode); 2510 if (inode) { 2511 spin_unlock(&mdsc->cap_delay_lock); 2512 doutc(cl, "on %p %llx.%llx\n", inode, 2513 ceph_vinop(inode)); 2514 ceph_check_caps(ci, CHECK_CAPS_FLUSH); 2515 iput(inode); 2516 spin_lock(&mdsc->cap_delay_lock); 2517 } 2518
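/* if igrab() failed, the inode is being evicted; the entry was already removed from cap_unlink_delay_list above, so there is nothing more to do for it */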
} 2519 spin_unlock(&mdsc->cap_delay_lock); 2520 doutc(cl, "done\n"); 2521 } 2522 2523 /* 2524 * requests 2525 */ 2526 2527 int ceph_alloc_readdir_reply_buffer(struct ceph_mds_request *req, 2528 struct inode *dir) 2529 { 2530 struct ceph_inode_info *ci = ceph_inode(dir); 2531 struct ceph_mds_reply_info_parsed *rinfo = &req->r_reply_info; 2532 struct ceph_mount_options *opt = req->r_mdsc->fsc->mount_options; 2533 size_t size = sizeof(struct ceph_mds_reply_dir_entry); 2534 unsigned int num_entries; 2535 int order; 2536 2537 spin_lock(&ci->i_ceph_lock); 2538 num_entries = ci->i_files + ci->i_subdirs; 2539 spin_unlock(&ci->i_ceph_lock); 2540 num_entries = max(num_entries, 1U); 2541 num_entries = min(num_entries, opt->max_readdir); 2542 2543 order = get_order(size * num_entries); 2544 while (order >= 0) { 2545 rinfo->dir_entries = (void*)__get_free_pages(GFP_KERNEL | 2546 __GFP_NOWARN | 2547 __GFP_ZERO, 2548 order); 2549 if (rinfo->dir_entries) 2550 break; 2551 order--; 2552 } 2553 if (!rinfo->dir_entries) 2554 return -ENOMEM; 2555 2556 num_entries = (PAGE_SIZE << order) / size; 2557 num_entries = min(num_entries, opt->max_readdir); 2558 2559 rinfo->dir_buf_size = PAGE_SIZE << order; 2560 req->r_num_caps = num_entries + 1; 2561 req->r_args.readdir.max_entries = cpu_to_le32(num_entries); 2562 req->r_args.readdir.max_bytes = cpu_to_le32(opt->max_readdir_bytes); 2563 return 0; 2564 } 2565 2566 /* 2567 * Create an mds request. 2568 */ 2569 struct ceph_mds_request * 2570 ceph_mdsc_create_request(struct ceph_mds_client *mdsc, int op, int mode) 2571 { 2572 struct ceph_mds_request *req; 2573 2574 req = kmem_cache_zalloc(ceph_mds_request_cachep, GFP_NOFS); 2575 if (!req) 2576 return ERR_PTR(-ENOMEM); 2577 2578 mutex_init(&req->r_fill_mutex); 2579 req->r_mdsc = mdsc; 2580 req->r_started = jiffies; 2581 req->r_start_latency = ktime_get(); 2582 req->r_resend_mds = -1; 2583 INIT_LIST_HEAD(&req->r_unsafe_dir_item); 2584 INIT_LIST_HEAD(&req->r_unsafe_target_item); 2585 req->r_fmode = -1; 2586 req->r_feature_needed = -1; 2587 kref_init(&req->r_kref); 2588 RB_CLEAR_NODE(&req->r_node); 2589 INIT_LIST_HEAD(&req->r_wait); 2590 init_completion(&req->r_completion); 2591 init_completion(&req->r_safe_completion); 2592 INIT_LIST_HEAD(&req->r_unsafe_item); 2593 2594 ktime_get_coarse_real_ts64(&req->r_stamp); 2595 2596 req->r_op = op; 2597 req->r_direct_mode = mode; 2598 return req; 2599 } 2600 2601 /* 2602 * return the oldest (lowest tid) request in the request tree, or NULL if 2603 * none; __get_oldest_tid() returns the corresponding tid, 0 if none. 2604 * called under mdsc->mutex. 2605 */ 2606 static struct ceph_mds_request *__get_oldest_req(struct ceph_mds_client *mdsc) 2607 { 2608 if (RB_EMPTY_ROOT(&mdsc->request_tree)) 2609 return NULL; 2610 return rb_entry(rb_first(&mdsc->request_tree), 2611 struct ceph_mds_request, r_node); 2612 } 2613 2614 static inline u64 __get_oldest_tid(struct ceph_mds_client *mdsc) 2615 { 2616 return mdsc->oldest_tid; 2617 } 2618 2619 #if IS_ENABLED(CONFIG_FS_ENCRYPTION) 2620 static u8 *get_fscrypt_altname(const struct ceph_mds_request *req, u32 *plen) 2621 { 2622 struct inode *dir = req->r_parent; 2623 struct dentry *dentry = req->r_dentry; 2624 const struct qstr *name = req->r_dname; 2625 u8 *cryptbuf = NULL; 2626 u32 len = 0; 2627 int ret = 0; 2628 2629 /* only encode if we have parent and dentry */ 2630 if (!dir || !dentry) 2631 goto success; 2632 2633 /* No-op unless this is encrypted */ 2634 if (!IS_ENCRYPTED(dir)) 2635 goto success; 2636 2637 ret = ceph_fscrypt_prepare_readdir(dir); 2638 if (ret < 0) 2639 return ERR_PTR(ret); 2640 2641 /* No key? 
Just ignore it. */ 2642 if (!fscrypt_has_encryption_key(dir)) 2643 goto success; 2644 2645 if (!name) 2646 name = &dentry->d_name; 2647 2648 if (!fscrypt_fname_encrypted_size(dir, name->len, NAME_MAX, &len)) { 2649 WARN_ON_ONCE(1); 2650 return ERR_PTR(-ENAMETOOLONG); 2651 } 2652 2653 /* No need to append altname if name is short enough */ 2654 if (len <= CEPH_NOHASH_NAME_MAX) { 2655 len = 0; 2656 goto success; 2657 } 2658 2659 cryptbuf = kmalloc(len, GFP_KERNEL); 2660 if (!cryptbuf) 2661 return ERR_PTR(-ENOMEM); 2662 2663 ret = fscrypt_fname_encrypt(dir, name, cryptbuf, len); 2664 if (ret) { 2665 kfree(cryptbuf); 2666 return ERR_PTR(ret); 2667 } 2668 success: 2669 *plen = len; 2670 return cryptbuf; 2671 } 2672 #else 2673 static u8 *get_fscrypt_altname(const struct ceph_mds_request *req, u32 *plen) 2674 { 2675 *plen = 0; 2676 return NULL; 2677 } 2678 #endif 2679 2680 /** 2681 * ceph_mdsc_build_path - build a path string to a given dentry 2682 * @mdsc: mds client 2683 * @dentry: dentry to which path should be built 2684 * @path_info: output path, length, base ino+snap, and freepath ownership flag 2685 * @for_wire: is this path going to be sent to the MDS? 2686 * 2687 * Build a string that represents the path to the dentry. This is mostly called 2688 * for two different purposes: 2689 * 2690 * 1) we need to build a path string to send to the MDS (for_wire == true) 2691 * 2) we need a path string for local presentation (e.g. debugfs) 2692 * (for_wire == false) 2693 * 2694 * The path is built in reverse, starting with the dentry. Walk back up toward 2695 * the root, building the path until the first non-snapped inode is reached 2696 * (for_wire) or the root inode is reached (!for_wire). 2697 * 2698 * Encode hidden .snap dirs as a double /, i.e. 2699 * foo/.snap/bar -> foo//bar 2700 */ 2701 char *ceph_mdsc_build_path(struct ceph_mds_client *mdsc, struct dentry *dentry, 2702 struct ceph_path_info *path_info, int for_wire) 2703 { 2704 struct ceph_client *cl = mdsc->fsc->client; 2705 struct dentry *cur; 2706 struct inode *inode; 2707 char *path; 2708 int pos; 2709 unsigned seq; 2710 u64 base; 2711 2712 if (!dentry) 2713 return ERR_PTR(-EINVAL); 2714 2715 path = __getname(); 2716 if (!path) 2717 return ERR_PTR(-ENOMEM); 2718 retry: 2719 pos = PATH_MAX - 1; 2720 path[pos] = '\0'; 2721 2722 seq = read_seqbegin(&rename_lock); 2723 cur = dget(dentry); 2724 for (;;) { 2725 struct dentry *parent; 2726 2727 spin_lock(&cur->d_lock); 2728 inode = d_inode(cur); 2729 if (inode && ceph_snap(inode) == CEPH_SNAPDIR) { 2730 doutc(cl, "path+%d: %p SNAPDIR\n", pos, cur); 2731 spin_unlock(&cur->d_lock); 2732 parent = dget_parent(cur); 2733 } else if (for_wire && inode && dentry != cur && 2734 ceph_snap(inode) == CEPH_NOSNAP) { 2735 spin_unlock(&cur->d_lock); 2736 pos++; /* get rid of any prepended '/' */ 2737 break; 2738 } else if (!for_wire || !IS_ENCRYPTED(d_inode(cur->d_parent))) { 2739 pos -= cur->d_name.len; 2740 if (pos < 0) { 2741 spin_unlock(&cur->d_lock); 2742 break; 2743 } 2744 memcpy(path + pos, cur->d_name.name, cur->d_name.len); 2745 spin_unlock(&cur->d_lock); 2746 parent = dget_parent(cur); 2747 } else { 2748 int len, ret; 2749 char buf[NAME_MAX]; 2750 2751 /* 2752 * Proactively copy name into buf, in case we need to 2753 * present it as-is. 
2754 */ 2755 memcpy(buf, cur->d_name.name, cur->d_name.len); 2756 len = cur->d_name.len; 2757 spin_unlock(&cur->d_lock); 2758 parent = dget_parent(cur); 2759 2760 ret = ceph_fscrypt_prepare_readdir(d_inode(parent)); 2761 if (ret < 0) { 2762 dput(parent); 2763 dput(cur); 2764 return ERR_PTR(ret); 2765 } 2766 2767 if (fscrypt_has_encryption_key(d_inode(parent))) { 2768 len = ceph_encode_encrypted_dname(d_inode(parent), 2769 buf, len); 2770 if (len < 0) { 2771 dput(parent); 2772 dput(cur); 2773 return ERR_PTR(len); 2774 } 2775 } 2776 pos -= len; 2777 if (pos < 0) { 2778 dput(parent); 2779 break; 2780 } 2781 memcpy(path + pos, buf, len); 2782 } 2783 dput(cur); 2784 cur = parent; 2785 2786 /* Are we at the root? */ 2787 if (IS_ROOT(cur)) 2788 break; 2789 2790 /* Are we out of buffer? */ 2791 if (--pos < 0) 2792 break; 2793 2794 path[pos] = '/'; 2795 } 2796 inode = d_inode(cur); 2797 base = inode ? ceph_ino(inode) : 0; 2798 dput(cur); 2799 2800 if (read_seqretry(&rename_lock, seq)) 2801 goto retry; 2802 2803 if (pos < 0) { 2804 /* 2805 * The path is longer than PATH_MAX and this function 2806 * cannot ever succeed. Creating paths that long is 2807 * possible with Ceph, but Linux cannot use them. 2808 */ 2809 return ERR_PTR(-ENAMETOOLONG); 2810 } 2811 2812 /* Initialize the output structure */ 2813 memset(path_info, 0, sizeof(*path_info)); 2814 2815 path_info->vino.ino = base; 2816 path_info->pathlen = PATH_MAX - 1 - pos; 2817 path_info->path = path + pos; 2818 path_info->freepath = true; 2819 2820 /* Set snap from dentry if available */ 2821 if (d_inode(dentry)) 2822 path_info->vino.snap = ceph_snap(d_inode(dentry)); 2823 else 2824 path_info->vino.snap = CEPH_NOSNAP; 2825 2826 doutc(cl, "on %p %d built %llx '%.*s'\n", dentry, d_count(dentry), 2827 base, PATH_MAX - 1 - pos, path + pos); 2828 return path + pos; 2829 } 2830 2831 static int build_dentry_path(struct ceph_mds_client *mdsc, struct dentry *dentry, 2832 struct inode *dir, struct ceph_path_info *path_info, 2833 bool parent_locked) 2834 { 2835 char *path; 2836 2837 rcu_read_lock(); 2838 if (!dir) 2839 dir = d_inode_rcu(dentry->d_parent); 2840 if (dir && parent_locked && ceph_snap(dir) == CEPH_NOSNAP && 2841 !IS_ENCRYPTED(dir)) { 2842 path_info->vino.ino = ceph_ino(dir); 2843 path_info->vino.snap = ceph_snap(dir); 2844 rcu_read_unlock(); 2845 path_info->path = dentry->d_name.name; 2846 path_info->pathlen = dentry->d_name.len; 2847 path_info->freepath = false; 2848 return 0; 2849 } 2850 rcu_read_unlock(); 2851 path = ceph_mdsc_build_path(mdsc, dentry, path_info, 1); 2852 if (IS_ERR(path)) 2853 return PTR_ERR(path); 2854 /* 2855 * ceph_mdsc_build_path already fills path_info, including snap handling. 2856 */ 2857 return 0; 2858 } 2859 2860 static int build_inode_path(struct inode *inode, struct ceph_path_info *path_info) 2861 { 2862 struct ceph_mds_client *mdsc = ceph_sb_to_mdsc(inode->i_sb); 2863 struct dentry *dentry; 2864 char *path; 2865 2866 if (ceph_snap(inode) == CEPH_NOSNAP) { 2867 path_info->vino.ino = ceph_ino(inode); 2868 path_info->vino.snap = ceph_snap(inode); 2869 path_info->pathlen = 0; 2870 path_info->freepath = false; 2871 return 0; 2872 } 2873 dentry = d_find_alias(inode); 2874 path = ceph_mdsc_build_path(mdsc, dentry, path_info, 1); 2875 dput(dentry); 2876 if (IS_ERR(path)) 2877 return PTR_ERR(path); 2878 /* 2879 * ceph_mdsc_build_path already fills path_info, including snap from dentry. 2880 * Override with inode's snap since that's what this function is for. 
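* (Only snapped inodes reach this point; CEPH_NOSNAP inodes take the early-return path above.)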
2881 */ 2882 path_info->vino.snap = ceph_snap(inode); 2883 return 0; 2884 } 2885 2886 /* 2887 * request arguments may be specified via an inode *, a dentry *, or 2888 * an explicit ino+path. 2889 */ 2890 static int set_request_path_attr(struct ceph_mds_client *mdsc, struct inode *rinode, 2891 struct dentry *rdentry, struct inode *rdiri, 2892 const char *rpath, u64 rino, 2893 struct ceph_path_info *path_info, 2894 bool parent_locked) 2895 { 2896 struct ceph_client *cl = mdsc->fsc->client; 2897 int r = 0; 2898 2899 /* Initialize the output structure */ 2900 memset(path_info, 0, sizeof(*path_info)); 2901 2902 if (rinode) { 2903 r = build_inode_path(rinode, path_info); 2904 doutc(cl, " inode %p %llx.%llx\n", rinode, ceph_ino(rinode), 2905 ceph_snap(rinode)); 2906 } else if (rdentry) { 2907 r = build_dentry_path(mdsc, rdentry, rdiri, path_info, parent_locked); 2908 doutc(cl, " dentry %p %llx/%.*s\n", rdentry, path_info->vino.ino, 2909 path_info->pathlen, path_info->path); 2910 } else if (rpath || rino) { 2911 path_info->vino.ino = rino; 2912 path_info->vino.snap = CEPH_NOSNAP; 2913 path_info->path = rpath; 2914 path_info->pathlen = rpath ? strlen(rpath) : 0; 2915 path_info->freepath = false; 2916 2917 doutc(cl, " path %.*s\n", path_info->pathlen, rpath); 2918 } 2919 2920 return r; 2921 } 2922 2923 static void encode_mclientrequest_tail(void **p, 2924 const struct ceph_mds_request *req) 2925 { 2926 struct ceph_timespec ts; 2927 int i; 2928 2929 ceph_encode_timespec64(&ts, &req->r_stamp); 2930 ceph_encode_copy(p, &ts, sizeof(ts)); 2931 2932 /* v4: gid_list */ 2933 ceph_encode_32(p, req->r_cred->group_info->ngroups); 2934 for (i = 0; i < req->r_cred->group_info->ngroups; i++) 2935 ceph_encode_64(p, from_kgid(&init_user_ns, 2936 req->r_cred->group_info->gid[i])); 2937 2938 /* v5: altname */ 2939 ceph_encode_32(p, req->r_altname_len); 2940 ceph_encode_copy(p, req->r_altname, req->r_altname_len); 2941 2942 /* v6: fscrypt_auth and fscrypt_file */ 2943 if (req->r_fscrypt_auth) { 2944 u32 authlen = ceph_fscrypt_auth_len(req->r_fscrypt_auth); 2945 2946 ceph_encode_32(p, authlen); 2947 ceph_encode_copy(p, req->r_fscrypt_auth, authlen); 2948 } else { 2949 ceph_encode_32(p, 0); 2950 } 2951 if (test_bit(CEPH_MDS_R_FSCRYPT_FILE, &req->r_req_flags)) { 2952 ceph_encode_32(p, sizeof(__le64)); 2953 ceph_encode_64(p, req->r_fscrypt_file); 2954 } else { 2955 ceph_encode_32(p, 0); 2956 } 2957 } 2958 2959 static inline u16 mds_supported_head_version(struct ceph_mds_session *session) 2960 { 2961 if (!test_bit(CEPHFS_FEATURE_32BITS_RETRY_FWD, &session->s_features)) 2962 return 1; 2963 2964 if (!test_bit(CEPHFS_FEATURE_HAS_OWNER_UIDGID, &session->s_features)) 2965 return 2; 2966 2967 return CEPH_MDS_REQUEST_HEAD_VERSION; 2968 } 2969 2970 static struct ceph_mds_request_head_legacy * 2971 find_legacy_request_head(void *p, u64 features) 2972 { 2973 bool legacy = !(features & CEPH_FEATURE_FS_BTIME); 2974 struct ceph_mds_request_head *head; 2975 2976 if (legacy) 2977 return (struct ceph_mds_request_head_legacy *)p; 2978 head = (struct ceph_mds_request_head *)p; 2979 return (struct ceph_mds_request_head_legacy *)&head->oldest_client_tid; 2980 } 2981 2982 /* 2983 * called under mdsc->mutex 2984 */ 2985 static struct ceph_msg *create_request_message(struct ceph_mds_session *session, 2986 struct ceph_mds_request *req, 2987 bool drop_cap_releases) 2988 { 2989 int mds = session->s_mds; 2990 struct ceph_mds_client *mdsc = session->s_mdsc; 2991 struct ceph_client *cl = mdsc->fsc->client; 2992 struct ceph_msg *msg; 2993 struct 
ceph_mds_request_head_legacy *lhead; 2994 struct ceph_path_info path_info1 = {0}; 2995 struct ceph_path_info path_info2 = {0}; 2996 struct dentry *old_dentry = NULL; 2997 int len; 2998 u16 releases; 2999 void *p, *end; 3000 int ret; 3001 bool legacy = !(session->s_con.peer_features & CEPH_FEATURE_FS_BTIME); 3002 u16 request_head_version = mds_supported_head_version(session); 3003 kuid_t caller_fsuid = req->r_cred->fsuid; 3004 kgid_t caller_fsgid = req->r_cred->fsgid; 3005 bool parent_locked = test_bit(CEPH_MDS_R_PARENT_LOCKED, &req->r_req_flags); 3006 3007 ret = set_request_path_attr(mdsc, req->r_inode, req->r_dentry, 3008 req->r_parent, req->r_path1, req->r_ino1.ino, 3009 &path_info1, parent_locked); 3010 if (ret < 0) { 3011 msg = ERR_PTR(ret); 3012 goto out; 3013 } 3014 3015 /* 3016 * When the parent directory's i_rwsem is *not* locked, req->r_parent may 3017 * have become stale (e.g. after a concurrent rename) between the time the 3018 * dentry was looked up and now. If we detect that the stored r_parent 3019 * does not match the inode number we just encoded for the request, switch 3020 * to the correct inode so that the MDS receives a valid parent reference. 3021 */ 3022 if (!parent_locked && req->r_parent && path_info1.vino.ino && 3023 ceph_ino(req->r_parent) != path_info1.vino.ino) { 3024 struct inode *old_parent = req->r_parent; 3025 struct inode *correct_dir = ceph_get_inode(mdsc->fsc->sb, path_info1.vino, NULL); 3026 if (!IS_ERR(correct_dir)) { 3027 WARN_ONCE(1, "ceph: r_parent mismatch (had %llx wanted %llx) - updating\n", 3028 ceph_ino(old_parent), path_info1.vino.ino); 3029 /* 3030 * Transfer CEPH_CAP_PIN from the old parent to the new one. 3031 * The pin was taken earlier in ceph_mdsc_submit_request(). 3032 */ 3033 ceph_put_cap_refs(ceph_inode(old_parent), CEPH_CAP_PIN); 3034 iput(old_parent); 3035 req->r_parent = correct_dir; 3036 ceph_get_cap_refs(ceph_inode(req->r_parent), CEPH_CAP_PIN); 3037 } 3038 } 3039 3040 /* If r_old_dentry is set, then assume that its parent is locked */ 3041 if (req->r_old_dentry && 3042 !(req->r_old_dentry->d_flags & DCACHE_DISCONNECTED)) 3043 old_dentry = req->r_old_dentry; 3044 ret = set_request_path_attr(mdsc, NULL, old_dentry, 3045 req->r_old_dentry_dir, 3046 req->r_path2, req->r_ino2.ino, 3047 &path_info2, true); 3048 if (ret < 0) { 3049 msg = ERR_PTR(ret); 3050 goto out_free1; 3051 } 3052 3053 req->r_altname = get_fscrypt_altname(req, &req->r_altname_len); 3054 if (IS_ERR(req->r_altname)) { 3055 msg = ERR_CAST(req->r_altname); 3056 req->r_altname = NULL; 3057 goto out_free2; 3058 } 3059 3060 /* 3061 * Old MDS versions that lack the 32-bit retry/fwd feature copy the 3062 * raw head memory directly when decoding requests, while new MDS 3063 * versions decode the head according to its version member, so we 3064 * need to make sure the encoding below stays compatible with 3065 * both of them.
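* A sketch of the lengths chosen below (assuming the field order of struct ceph_mds_request_head): legacy -> sizeof(struct ceph_mds_request_head_legacy); v1 -> up to and including 'args'; v2 -> up to and including 'ext_num_fwd'; newer -> the full struct, including the owner uid/gid fields.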
3066 */ 3067 if (legacy) 3068 len = sizeof(struct ceph_mds_request_head_legacy); 3069 else if (request_head_version == 1) 3070 len = offsetofend(struct ceph_mds_request_head, args); 3071 else if (request_head_version == 2) 3072 len = offsetofend(struct ceph_mds_request_head, ext_num_fwd); 3073 else 3074 len = sizeof(struct ceph_mds_request_head); 3075 3076 /* filepaths */ 3077 len += 2 * (1 + sizeof(u32) + sizeof(u64)); 3078 len += path_info1.pathlen + path_info2.pathlen; 3079 3080 /* cap releases */ 3081 len += sizeof(struct ceph_mds_request_release) * 3082 (!!req->r_inode_drop + !!req->r_dentry_drop + 3083 !!req->r_old_inode_drop + !!req->r_old_dentry_drop); 3084 3085 if (req->r_dentry_drop) 3086 len += path_info1.pathlen; 3087 if (req->r_old_dentry_drop) 3088 len += path_info2.pathlen; 3089 3090 /* MClientRequest tail */ 3091 3092 /* req->r_stamp */ 3093 len += sizeof(struct ceph_timespec); 3094 3095 /* gid list */ 3096 len += sizeof(u32) + (sizeof(u64) * req->r_cred->group_info->ngroups); 3097 3098 /* alternate name */ 3099 len += sizeof(u32) + req->r_altname_len; 3100 3101 /* fscrypt_auth */ 3102 len += sizeof(u32); // fscrypt_auth 3103 if (req->r_fscrypt_auth) 3104 len += ceph_fscrypt_auth_len(req->r_fscrypt_auth); 3105 3106 /* fscrypt_file */ 3107 len += sizeof(u32); 3108 if (test_bit(CEPH_MDS_R_FSCRYPT_FILE, &req->r_req_flags)) 3109 len += sizeof(__le64); 3110 3111 msg = ceph_msg_new2(CEPH_MSG_CLIENT_REQUEST, len, 1, GFP_NOFS, false); 3112 if (!msg) { 3113 msg = ERR_PTR(-ENOMEM); 3114 goto out_free2; 3115 } 3116 3117 msg->hdr.tid = cpu_to_le64(req->r_tid); 3118 3119 lhead = find_legacy_request_head(msg->front.iov_base, 3120 session->s_con.peer_features); 3121 3122 if ((req->r_mnt_idmap != &nop_mnt_idmap) && 3123 !test_bit(CEPHFS_FEATURE_HAS_OWNER_UIDGID, &session->s_features)) { 3124 WARN_ON_ONCE(!IS_CEPH_MDS_OP_NEWINODE(req->r_op)); 3125 3126 if (enable_unsafe_idmap) { 3127 pr_warn_once_client(cl, 3128 "idmapped mount is used and CEPHFS_FEATURE_HAS_OWNER_UIDGID" 3129 " is not supported by MDS. UID/GID-based restrictions may" 3130 " not work properly.\n"); 3131 3132 caller_fsuid = from_vfsuid(req->r_mnt_idmap, &init_user_ns, 3133 VFSUIDT_INIT(req->r_cred->fsuid)); 3134 caller_fsgid = from_vfsgid(req->r_mnt_idmap, &init_user_ns, 3135 VFSGIDT_INIT(req->r_cred->fsgid)); 3136 } else { 3137 pr_err_ratelimited_client(cl, 3138 "idmapped mount is used and CEPHFS_FEATURE_HAS_OWNER_UIDGID" 3139 " is not supported by MDS. Fail request with -EIO.\n"); 3140 3141 ret = -EIO; 3142 goto out_err; 3143 } 3144 } 3145 3146 /* 3147 * The ceph_mds_request_head_legacy didn't contain a version field, and 3148 * one was added when we moved the message version from 3->4. 
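* The resulting mapping below is: legacy head -> msg v3, head v1 -> msg v4, head v2 and newer -> msg v6.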
3149 */ 3150 if (legacy) { 3151 msg->hdr.version = cpu_to_le16(3); 3152 p = msg->front.iov_base + sizeof(*lhead); 3153 } else if (request_head_version == 1) { 3154 struct ceph_mds_request_head *nhead = msg->front.iov_base; 3155 3156 msg->hdr.version = cpu_to_le16(4); 3157 nhead->version = cpu_to_le16(1); 3158 p = msg->front.iov_base + offsetofend(struct ceph_mds_request_head, args); 3159 } else if (request_head_version == 2) { 3160 struct ceph_mds_request_head *nhead = msg->front.iov_base; 3161 3162 msg->hdr.version = cpu_to_le16(6); 3163 nhead->version = cpu_to_le16(2); 3164 3165 p = msg->front.iov_base + offsetofend(struct ceph_mds_request_head, ext_num_fwd); 3166 } else { 3167 struct ceph_mds_request_head *nhead = msg->front.iov_base; 3168 kuid_t owner_fsuid; 3169 kgid_t owner_fsgid; 3170 3171 msg->hdr.version = cpu_to_le16(6); 3172 nhead->version = cpu_to_le16(CEPH_MDS_REQUEST_HEAD_VERSION); 3173 nhead->struct_len = cpu_to_le32(sizeof(struct ceph_mds_request_head)); 3174 3175 if (IS_CEPH_MDS_OP_NEWINODE(req->r_op)) { 3176 owner_fsuid = from_vfsuid(req->r_mnt_idmap, &init_user_ns, 3177 VFSUIDT_INIT(req->r_cred->fsuid)); 3178 owner_fsgid = from_vfsgid(req->r_mnt_idmap, &init_user_ns, 3179 VFSGIDT_INIT(req->r_cred->fsgid)); 3180 nhead->owner_uid = cpu_to_le32(from_kuid(&init_user_ns, owner_fsuid)); 3181 nhead->owner_gid = cpu_to_le32(from_kgid(&init_user_ns, owner_fsgid)); 3182 } else { 3183 nhead->owner_uid = cpu_to_le32(-1); 3184 nhead->owner_gid = cpu_to_le32(-1); 3185 } 3186 3187 p = msg->front.iov_base + sizeof(*nhead); 3188 } 3189 3190 end = msg->front.iov_base + msg->front.iov_len; 3191 3192 lhead->mdsmap_epoch = cpu_to_le32(mdsc->mdsmap->m_epoch); 3193 lhead->op = cpu_to_le32(req->r_op); 3194 lhead->caller_uid = cpu_to_le32(from_kuid(&init_user_ns, 3195 caller_fsuid)); 3196 lhead->caller_gid = cpu_to_le32(from_kgid(&init_user_ns, 3197 caller_fsgid)); 3198 lhead->ino = cpu_to_le64(req->r_deleg_ino); 3199 lhead->args = req->r_args; 3200 3201 ceph_encode_filepath(&p, end, path_info1.vino.ino, path_info1.path); 3202 ceph_encode_filepath(&p, end, path_info2.vino.ino, path_info2.path); 3203 3204 /* make note of release offset, in case we need to replay */ 3205 req->r_request_release_offset = p - msg->front.iov_base; 3206 3207 /* cap releases */ 3208 releases = 0; 3209 if (req->r_inode_drop) 3210 releases += ceph_encode_inode_release(&p, 3211 req->r_inode ? 
req->r_inode : d_inode(req->r_dentry), 3212 mds, req->r_inode_drop, req->r_inode_unless, 3213 req->r_op == CEPH_MDS_OP_READDIR); 3214 if (req->r_dentry_drop) { 3215 ret = ceph_encode_dentry_release(&p, req->r_dentry, 3216 req->r_parent, mds, req->r_dentry_drop, 3217 req->r_dentry_unless); 3218 if (ret < 0) 3219 goto out_err; 3220 releases += ret; 3221 } 3222 if (req->r_old_dentry_drop) { 3223 ret = ceph_encode_dentry_release(&p, req->r_old_dentry, 3224 req->r_old_dentry_dir, mds, 3225 req->r_old_dentry_drop, 3226 req->r_old_dentry_unless); 3227 if (ret < 0) 3228 goto out_err; 3229 releases += ret; 3230 } 3231 if (req->r_old_inode_drop) 3232 releases += ceph_encode_inode_release(&p, 3233 d_inode(req->r_old_dentry), 3234 mds, req->r_old_inode_drop, req->r_old_inode_unless, 0); 3235 3236 if (drop_cap_releases) { 3237 releases = 0; 3238 p = msg->front.iov_base + req->r_request_release_offset; 3239 } 3240 3241 lhead->num_releases = cpu_to_le16(releases); 3242 3243 encode_mclientrequest_tail(&p, req); 3244 3245 if (WARN_ON_ONCE(p > end)) { 3246 ceph_msg_put(msg); 3247 msg = ERR_PTR(-ERANGE); 3248 goto out_free2; 3249 } 3250 3251 msg->front.iov_len = p - msg->front.iov_base; 3252 msg->hdr.front_len = cpu_to_le32(msg->front.iov_len); 3253 3254 if (req->r_pagelist) { 3255 struct ceph_pagelist *pagelist = req->r_pagelist; 3256 ceph_msg_data_add_pagelist(msg, pagelist); 3257 msg->hdr.data_len = cpu_to_le32(pagelist->length); 3258 } else { 3259 msg->hdr.data_len = 0; 3260 } 3261 3262 msg->hdr.data_off = cpu_to_le16(0); 3263 3264 out_free2: 3265 ceph_mdsc_free_path_info(&path_info2); 3266 out_free1: 3267 ceph_mdsc_free_path_info(&path_info1); 3268 out: 3269 return msg; 3270 out_err: 3271 ceph_msg_put(msg); 3272 msg = ERR_PTR(ret); 3273 goto out_free2; 3274 } 3275 3276 /* 3277 * called under mdsc->mutex if error, under no mutex if 3278 * success. 3279 */ 3280 static void complete_request(struct ceph_mds_client *mdsc, 3281 struct ceph_mds_request *req) 3282 { 3283 req->r_end_latency = ktime_get(); 3284 3285 if (req->r_callback) 3286 req->r_callback(mdsc, req); 3287 complete_all(&req->r_completion); 3288 } 3289 3290 /* 3291 * called under mdsc->mutex 3292 */ 3293 static int __prepare_send_request(struct ceph_mds_session *session, 3294 struct ceph_mds_request *req, 3295 bool drop_cap_releases) 3296 { 3297 int mds = session->s_mds; 3298 struct ceph_mds_client *mdsc = session->s_mdsc; 3299 struct ceph_client *cl = mdsc->fsc->client; 3300 struct ceph_mds_request_head_legacy *lhead; 3301 struct ceph_mds_request_head *nhead; 3302 struct ceph_msg *msg; 3303 int flags = 0, old_max_retry; 3304 bool old_version = !test_bit(CEPHFS_FEATURE_32BITS_RETRY_FWD, 3305 &session->s_features); 3306 3307 /* 3308 * Avoid infinite retrying after the retry counter overflows. The 3309 * client increases the retry count with every attempt, but an old 3310 * MDS stores the count in only 8 bits, so limit it to at most 256 retries there.
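* For example, num_retry occupies a single byte in the old head, so old_max_retry below evaluates to 1 << 8 == 256.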
3311 */ 3312 if (req->r_attempts) { 3313 old_max_retry = sizeof_field(struct ceph_mds_request_head, 3314 num_retry); 3315 old_max_retry = 1 << (old_max_retry * BITS_PER_BYTE); 3316 if ((old_version && req->r_attempts >= old_max_retry) || 3317 ((uint32_t)req->r_attempts >= U32_MAX)) { 3318 pr_warn_ratelimited_client(cl, "request tid %llu seq overflow\n", 3319 req->r_tid); 3320 return -EMULTIHOP; 3321 } 3322 } 3323 3324 req->r_attempts++; 3325 if (req->r_inode) { 3326 struct ceph_cap *cap = 3327 ceph_get_cap_for_mds(ceph_inode(req->r_inode), mds); 3328 3329 if (cap) 3330 req->r_sent_on_mseq = cap->mseq; 3331 else 3332 req->r_sent_on_mseq = -1; 3333 } 3334 doutc(cl, "%p tid %lld %s (attempt %d)\n", req, req->r_tid, 3335 ceph_mds_op_name(req->r_op), req->r_attempts); 3336 3337 if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) { 3338 void *p; 3339 3340 /* 3341 * Replay. Do not regenerate message (and rebuild 3342 * paths, etc.); just use the original message. 3343 * Rebuilding paths will break for renames because 3344 * d_move mangles the src name. 3345 */ 3346 msg = req->r_request; 3347 lhead = find_legacy_request_head(msg->front.iov_base, 3348 session->s_con.peer_features); 3349 3350 flags = le32_to_cpu(lhead->flags); 3351 flags |= CEPH_MDS_FLAG_REPLAY; 3352 lhead->flags = cpu_to_le32(flags); 3353 3354 if (req->r_target_inode) 3355 lhead->ino = cpu_to_le64(ceph_ino(req->r_target_inode)); 3356 3357 lhead->num_retry = req->r_attempts - 1; 3358 if (!old_version) { 3359 nhead = (struct ceph_mds_request_head*)msg->front.iov_base; 3360 nhead->ext_num_retry = cpu_to_le32(req->r_attempts - 1); 3361 } 3362 3363 /* remove cap/dentry releases from message */ 3364 lhead->num_releases = 0; 3365 3366 p = msg->front.iov_base + req->r_request_release_offset; 3367 encode_mclientrequest_tail(&p, req); 3368 3369 msg->front.iov_len = p - msg->front.iov_base; 3370 msg->hdr.front_len = cpu_to_le32(msg->front.iov_len); 3371 return 0; 3372 } 3373 3374 if (req->r_request) { 3375 ceph_msg_put(req->r_request); 3376 req->r_request = NULL; 3377 } 3378 msg = create_request_message(session, req, drop_cap_releases); 3379 if (IS_ERR(msg)) { 3380 req->r_err = PTR_ERR(msg); 3381 return PTR_ERR(msg); 3382 } 3383 req->r_request = msg; 3384 3385 lhead = find_legacy_request_head(msg->front.iov_base, 3386 session->s_con.peer_features); 3387 lhead->oldest_client_tid = cpu_to_le64(__get_oldest_tid(mdsc)); 3388 if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) 3389 flags |= CEPH_MDS_FLAG_REPLAY; 3390 if (test_bit(CEPH_MDS_R_ASYNC, &req->r_req_flags)) 3391 flags |= CEPH_MDS_FLAG_ASYNC; 3392 if (req->r_parent) 3393 flags |= CEPH_MDS_FLAG_WANT_DENTRY; 3394 lhead->flags = cpu_to_le32(flags); 3395 lhead->num_fwd = req->r_num_fwd; 3396 lhead->num_retry = req->r_attempts - 1; 3397 if (!old_version) { 3398 nhead = (struct ceph_mds_request_head*)msg->front.iov_base; 3399 nhead->ext_num_fwd = cpu_to_le32(req->r_num_fwd); 3400 nhead->ext_num_retry = cpu_to_le32(req->r_attempts - 1); 3401 } 3402 3403 doutc(cl, " r_parent = %p\n", req->r_parent); 3404 return 0; 3405 } 3406 3407 /* 3408 * called under mdsc->mutex 3409 */ 3410 static int __send_request(struct ceph_mds_session *session, 3411 struct ceph_mds_request *req, 3412 bool drop_cap_releases) 3413 { 3414 int err; 3415 3416 err = __prepare_send_request(session, req, drop_cap_releases); 3417 if (!err) { 3418 ceph_msg_get(req->r_request); 3419 ceph_con_send(&session->s_con, req->r_request); 3420 } 3421 3422 return err; 3423 } 3424 3425 /* 3426 * send request, or put it on the 
appropriate wait list. 3427 */ 3428 static void __do_request(struct ceph_mds_client *mdsc, 3429 struct ceph_mds_request *req) 3430 { 3431 struct ceph_client *cl = mdsc->fsc->client; 3432 struct ceph_mds_session *session = NULL; 3433 int mds = -1; 3434 int err = 0; 3435 bool random; 3436 3437 if (req->r_err || test_bit(CEPH_MDS_R_GOT_RESULT, &req->r_req_flags)) { 3438 if (test_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags)) 3439 __unregister_request(mdsc, req); 3440 return; 3441 } 3442 3443 if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_FENCE_IO) { 3444 doutc(cl, "metadata corrupted\n"); 3445 err = -EIO; 3446 goto finish; 3447 } 3448 if (req->r_timeout && 3449 time_after_eq(jiffies, req->r_started + req->r_timeout)) { 3450 doutc(cl, "timed out\n"); 3451 err = -ETIMEDOUT; 3452 goto finish; 3453 } 3454 if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_SHUTDOWN) { 3455 doutc(cl, "forced umount\n"); 3456 err = -EIO; 3457 goto finish; 3458 } 3459 if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_MOUNTING) { 3460 if (mdsc->mdsmap_err) { 3461 err = mdsc->mdsmap_err; 3462 doutc(cl, "mdsmap err %d\n", err); 3463 goto finish; 3464 } 3465 if (mdsc->mdsmap->m_epoch == 0) { 3466 doutc(cl, "no mdsmap, waiting for map\n"); 3467 list_add(&req->r_wait, &mdsc->waiting_for_map); 3468 return; 3469 } 3470 if (!(mdsc->fsc->mount_options->flags & 3471 CEPH_MOUNT_OPT_MOUNTWAIT) && 3472 !ceph_mdsmap_is_cluster_available(mdsc->mdsmap)) { 3473 err = -EHOSTUNREACH; 3474 goto finish; 3475 } 3476 } 3477 3478 put_request_session(req); 3479 3480 mds = __choose_mds(mdsc, req, &random); 3481 if (mds < 0 || 3482 ceph_mdsmap_get_state(mdsc->mdsmap, mds) < CEPH_MDS_STATE_ACTIVE) { 3483 if (test_bit(CEPH_MDS_R_ASYNC, &req->r_req_flags)) { 3484 err = -EJUKEBOX; 3485 goto finish; 3486 } 3487 doutc(cl, "no mds or not active, waiting for map\n"); 3488 list_add(&req->r_wait, &mdsc->waiting_for_map); 3489 return; 3490 } 3491 3492 /* get, open session */ 3493 session = __ceph_lookup_mds_session(mdsc, mds); 3494 if (!session) { 3495 session = register_session(mdsc, mds); 3496 if (IS_ERR(session)) { 3497 err = PTR_ERR(session); 3498 goto finish; 3499 } 3500 } 3501 req->r_session = ceph_get_mds_session(session); 3502 3503 doutc(cl, "mds%d session %p state %s\n", mds, session, 3504 ceph_session_state_name(session->s_state)); 3505 3506 /* 3507 * Old MDS daemons will crash when they see unknown OPs 3508 */ 3509 if (req->r_feature_needed > 0 && 3510 !test_bit(req->r_feature_needed, &session->s_features)) { 3511 err = -EOPNOTSUPP; 3512 goto out_session; 3513 } 3514 3515 if (session->s_state != CEPH_MDS_SESSION_OPEN && 3516 session->s_state != CEPH_MDS_SESSION_HUNG) { 3517 /* 3518 * We cannot queue async requests since the caps and delegated 3519 * inodes are bound to the session. Just return -EJUKEBOX and 3520 * let the caller retry a sync request in that case. 3521 */ 3522 if (test_bit(CEPH_MDS_R_ASYNC, &req->r_req_flags)) { 3523 err = -EJUKEBOX; 3524 goto out_session; 3525 } 3526 3527 /* 3528 * If the session has been REJECTED, then return a hard error, 3529 * unless it's a CLEANRECOVER mount, in which case we'll park 3530 * it on the mdsc's waiting_for_map queue.
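* (Requests parked on waiting_for_map are re-driven through __wake_requests() once a new mdsmap arrives.)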
3531 */ 3532 if (session->s_state == CEPH_MDS_SESSION_REJECTED) { 3533 if (ceph_test_mount_opt(mdsc->fsc, CLEANRECOVER)) 3534 list_add(&req->r_wait, &mdsc->waiting_for_map); 3535 else 3536 err = -EACCES; 3537 goto out_session; 3538 } 3539 3540 if (session->s_state == CEPH_MDS_SESSION_NEW || 3541 session->s_state == CEPH_MDS_SESSION_CLOSING) { 3542 err = __open_session(mdsc, session); 3543 if (err) 3544 goto out_session; 3545 /* retry the same mds later */ 3546 if (random) 3547 req->r_resend_mds = mds; 3548 } 3549 list_add(&req->r_wait, &session->s_waiting); 3550 goto out_session; 3551 } 3552 3553 /* send request */ 3554 req->r_resend_mds = -1; /* forget any previous mds hint */ 3555 3556 if (req->r_request_started == 0) /* note request start time */ 3557 req->r_request_started = jiffies; 3558 3559 /* 3560 * For async create we will choose the auth MDS of frag in parent 3561 * directory to send the request and usually this works fine, but 3562 * if the migrated the dirtory to another MDS before it could handle 3563 * it the request will be forwarded. 3564 * 3565 * And then the auth cap will be changed. 3566 */ 3567 if (test_bit(CEPH_MDS_R_ASYNC, &req->r_req_flags) && req->r_num_fwd) { 3568 struct ceph_dentry_info *di = ceph_dentry(req->r_dentry); 3569 struct ceph_inode_info *ci; 3570 struct ceph_cap *cap; 3571 3572 /* 3573 * The request maybe handled very fast and the new inode 3574 * hasn't been linked to the dentry yet. We need to wait 3575 * for the ceph_finish_async_create(), which shouldn't be 3576 * stuck too long or fail in thoery, to finish when forwarding 3577 * the request. 3578 */ 3579 if (!d_inode(req->r_dentry)) { 3580 err = wait_on_bit(&di->flags, CEPH_DENTRY_ASYNC_CREATE_BIT, 3581 TASK_KILLABLE); 3582 if (err) { 3583 mutex_lock(&req->r_fill_mutex); 3584 set_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags); 3585 mutex_unlock(&req->r_fill_mutex); 3586 goto out_session; 3587 } 3588 } 3589 3590 ci = ceph_inode(d_inode(req->r_dentry)); 3591 3592 spin_lock(&ci->i_ceph_lock); 3593 cap = ci->i_auth_cap; 3594 if (ci->i_ceph_flags & CEPH_I_ASYNC_CREATE && mds != cap->mds) { 3595 doutc(cl, "session changed for auth cap %d -> %d\n", 3596 cap->session->s_mds, session->s_mds); 3597 3598 /* Remove the auth cap from old session */ 3599 spin_lock(&cap->session->s_cap_lock); 3600 cap->session->s_nr_caps--; 3601 list_del_init(&cap->session_caps); 3602 spin_unlock(&cap->session->s_cap_lock); 3603 3604 /* Add the auth cap to the new session */ 3605 cap->mds = mds; 3606 cap->session = session; 3607 spin_lock(&session->s_cap_lock); 3608 session->s_nr_caps++; 3609 list_add_tail(&cap->session_caps, &session->s_caps); 3610 spin_unlock(&session->s_cap_lock); 3611 3612 change_auth_cap_ses(ci, session); 3613 } 3614 spin_unlock(&ci->i_ceph_lock); 3615 } 3616 3617 err = __send_request(session, req, false); 3618 3619 out_session: 3620 ceph_put_mds_session(session); 3621 finish: 3622 if (err) { 3623 doutc(cl, "early error %d\n", err); 3624 req->r_err = err; 3625 complete_request(mdsc, req); 3626 __unregister_request(mdsc, req); 3627 } 3628 return; 3629 } 3630 3631 /* 3632 * called under mdsc->mutex 3633 */ 3634 static void __wake_requests(struct ceph_mds_client *mdsc, 3635 struct list_head *head) 3636 { 3637 struct ceph_client *cl = mdsc->fsc->client; 3638 struct ceph_mds_request *req; 3639 LIST_HEAD(tmp_list); 3640 3641 list_splice_init(head, &tmp_list); 3642 3643 while (!list_empty(&tmp_list)) { 3644 req = list_entry(tmp_list.next, 3645 struct ceph_mds_request, r_wait); 3646 list_del_init(&req->r_wait); 3647 
doutc(cl, " wake request %p tid %llu\n", req, 3648 req->r_tid); 3649 __do_request(mdsc, req); 3650 } 3651 } 3652 3653 /* 3654 * Wake up threads with requests pending for @mds, so that they can 3655 * resubmit their requests to a possibly different mds. 3656 */ 3657 static void kick_requests(struct ceph_mds_client *mdsc, int mds) 3658 { 3659 struct ceph_client *cl = mdsc->fsc->client; 3660 struct ceph_mds_request *req; 3661 struct rb_node *p = rb_first(&mdsc->request_tree); 3662 3663 doutc(cl, "kick_requests mds%d\n", mds); 3664 while (p) { 3665 req = rb_entry(p, struct ceph_mds_request, r_node); 3666 p = rb_next(p); 3667 if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) 3668 continue; 3669 if (req->r_attempts > 0) 3670 continue; /* only new requests */ 3671 if (req->r_session && 3672 req->r_session->s_mds == mds) { 3673 doutc(cl, " kicking tid %llu\n", req->r_tid); 3674 list_del_init(&req->r_wait); 3675 __do_request(mdsc, req); 3676 } 3677 } 3678 } 3679 3680 int ceph_mdsc_submit_request(struct ceph_mds_client *mdsc, struct inode *dir, 3681 struct ceph_mds_request *req) 3682 { 3683 struct ceph_client *cl = mdsc->fsc->client; 3684 int err = 0; 3685 3686 /* take CAP_PIN refs for r_inode, r_parent, r_old_dentry */ 3687 if (req->r_inode) 3688 ceph_get_cap_refs(ceph_inode(req->r_inode), CEPH_CAP_PIN); 3689 if (req->r_parent) { 3690 struct ceph_inode_info *ci = ceph_inode(req->r_parent); 3691 int fmode = (req->r_op & CEPH_MDS_OP_WRITE) ? 3692 CEPH_FILE_MODE_WR : CEPH_FILE_MODE_RD; 3693 spin_lock(&ci->i_ceph_lock); 3694 ceph_take_cap_refs(ci, CEPH_CAP_PIN, false); 3695 __ceph_touch_fmode(ci, mdsc, fmode); 3696 spin_unlock(&ci->i_ceph_lock); 3697 } 3698 if (req->r_old_dentry_dir) 3699 ceph_get_cap_refs(ceph_inode(req->r_old_dentry_dir), 3700 CEPH_CAP_PIN); 3701 3702 if (req->r_inode) { 3703 err = ceph_wait_on_async_create(req->r_inode); 3704 if (err) { 3705 doutc(cl, "wait for async create returned: %d\n", err); 3706 return err; 3707 } 3708 } 3709 3710 if (!err && req->r_old_inode) { 3711 err = ceph_wait_on_async_create(req->r_old_inode); 3712 if (err) { 3713 doutc(cl, "wait for async create returned: %d\n", err); 3714 return err; 3715 } 3716 } 3717 3718 doutc(cl, "submit_request on %p for inode %p\n", req, dir); 3719 mutex_lock(&mdsc->mutex); 3720 __register_request(mdsc, req, dir); 3721 __do_request(mdsc, req); 3722 err = req->r_err; 3723 mutex_unlock(&mdsc->mutex); 3724 return err; 3725 } 3726 3727 int ceph_mdsc_wait_request(struct ceph_mds_client *mdsc, 3728 struct ceph_mds_request *req, 3729 ceph_mds_request_wait_callback_t wait_func) 3730 { 3731 struct ceph_client *cl = mdsc->fsc->client; 3732 int err; 3733 3734 /* wait */ 3735 doutc(cl, "do_request waiting\n"); 3736 if (wait_func) { 3737 err = wait_func(mdsc, req); 3738 } else { 3739 long timeleft = wait_for_completion_killable_timeout( 3740 &req->r_completion, 3741 ceph_timeout_jiffies(req->r_timeout)); 3742 if (timeleft > 0) 3743 err = 0; 3744 else if (!timeleft) 3745 err = -ETIMEDOUT; /* timed out */ 3746 else 3747 err = timeleft; /* killed */ 3748 } 3749 doutc(cl, "do_request waited, got %d\n", err); 3750 mutex_lock(&mdsc->mutex); 3751 3752 /* only abort if we didn't race with a real reply */ 3753 if (test_bit(CEPH_MDS_R_GOT_RESULT, &req->r_req_flags)) { 3754 err = le32_to_cpu(req->r_reply_info.head->result); 3755 } else if (err < 0) { 3756 doutc(cl, "aborted request %lld with %d\n", req->r_tid, err); 3757 3758 /* 3759 * ensure we aren't running concurrently with 3760 * ceph_fill_trace or ceph_readdir_prepopulate, which 3761 * rely 
on locks (dir mutex) held by our caller. 3762 */ 3763 mutex_lock(&req->r_fill_mutex); 3764 req->r_err = err; 3765 set_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags); 3766 mutex_unlock(&req->r_fill_mutex); 3767 3768 if (req->r_parent && 3769 (req->r_op & CEPH_MDS_OP_WRITE)) 3770 ceph_invalidate_dir_request(req); 3771 } else { 3772 err = req->r_err; 3773 } 3774 3775 mutex_unlock(&mdsc->mutex); 3776 return err; 3777 } 3778 3779 /* 3780 * Synchronously perform an mds request. Take care of all of the 3781 * session setup, forwarding, and retry details. 3782 */ 3783 int ceph_mdsc_do_request(struct ceph_mds_client *mdsc, 3784 struct inode *dir, 3785 struct ceph_mds_request *req) 3786 { 3787 struct ceph_client *cl = mdsc->fsc->client; 3788 int err; 3789 3790 doutc(cl, "do_request on %p\n", req); 3791 3792 /* issue */ 3793 err = ceph_mdsc_submit_request(mdsc, dir, req); 3794 if (!err) 3795 err = ceph_mdsc_wait_request(mdsc, req, NULL); 3796 doutc(cl, "do_request %p done, result %d\n", req, err); 3797 return err; 3798 } 3799 3800 /* 3801 * Invalidate dir's completeness, dentry lease state on an aborted MDS 3802 * namespace request. 3803 */ 3804 void ceph_invalidate_dir_request(struct ceph_mds_request *req) 3805 { 3806 struct inode *dir = req->r_parent; 3807 struct inode *old_dir = req->r_old_dentry_dir; 3808 struct ceph_client *cl = req->r_mdsc->fsc->client; 3809 3810 doutc(cl, "invalidate_dir_request %p %p (complete, lease(s))\n", 3811 dir, old_dir); 3812 3813 ceph_dir_clear_complete(dir); 3814 if (old_dir) 3815 ceph_dir_clear_complete(old_dir); 3816 if (req->r_dentry) 3817 ceph_invalidate_dentry_lease(req->r_dentry); 3818 if (req->r_old_dentry) 3819 ceph_invalidate_dentry_lease(req->r_old_dentry); 3820 } 3821 3822 /* 3823 * Handle mds reply. 3824 * 3825 * We take the session mutex and parse and process the reply immediately. 3826 * This preserves the logical ordering of replies, capabilities, etc., sent 3827 * by the MDS as they are applied to our local cache. 3828 */ 3829 static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg) 3830 { 3831 struct ceph_mds_client *mdsc = session->s_mdsc; 3832 struct ceph_client *cl = mdsc->fsc->client; 3833 struct ceph_mds_request *req; 3834 struct ceph_mds_reply_head *head = msg->front.iov_base; 3835 struct ceph_mds_reply_info_parsed *rinfo; /* parsed reply info */ 3836 struct ceph_snap_realm *realm; 3837 u64 tid; 3838 int err, result; 3839 int mds = session->s_mds; 3840 bool close_sessions = false; 3841 3842 if (msg->front.iov_len < sizeof(*head)) { 3843 pr_err_client(cl, "got corrupt (short) reply\n"); 3844 ceph_msg_dump(msg); 3845 return; 3846 } 3847 3848 /* get request, session */ 3849 tid = le64_to_cpu(msg->hdr.tid); 3850 mutex_lock(&mdsc->mutex); 3851 req = lookup_get_request(mdsc, tid); 3852 if (!req) { 3853 doutc(cl, "on unknown tid %llu\n", tid); 3854 mutex_unlock(&mdsc->mutex); 3855 return; 3856 } 3857 doutc(cl, "handle_reply %p\n", req); 3858 3859 /* correct session? */ 3860 if (req->r_session != session) { 3861 pr_err_client(cl, "got %llu on session mds%d not mds%d\n", 3862 tid, session->s_mds, 3863 req->r_session ? req->r_session->s_mds : -1); 3864 mutex_unlock(&mdsc->mutex); 3865 goto out; 3866 } 3867 3868 /* dup? */ 3869 if ((test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags) && !head->safe) || 3870 (test_bit(CEPH_MDS_R_GOT_SAFE, &req->r_req_flags) && head->safe)) { 3871 pr_warn_client(cl, "got a dup %s reply on %llu from mds%d\n", 3872 head->safe ?
"safe" : "unsafe", tid, mds); 3873 mutex_unlock(&mdsc->mutex); 3874 goto out; 3875 } 3876 if (test_bit(CEPH_MDS_R_GOT_SAFE, &req->r_req_flags)) { 3877 pr_warn_client(cl, "got unsafe after safe on %llu from mds%d\n", 3878 tid, mds); 3879 mutex_unlock(&mdsc->mutex); 3880 goto out; 3881 } 3882 3883 result = le32_to_cpu(head->result); 3884 3885 if (head->safe) { 3886 set_bit(CEPH_MDS_R_GOT_SAFE, &req->r_req_flags); 3887 __unregister_request(mdsc, req); 3888 3889 /* last request during umount? */ 3890 if (mdsc->stopping && !__get_oldest_req(mdsc)) 3891 complete_all(&mdsc->safe_umount_waiters); 3892 3893 if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) { 3894 /* 3895 * We already handled the unsafe response, now do the 3896 * cleanup. No need to examine the response; the MDS 3897 * doesn't include any result info in the safe 3898 * response. And even if it did, there is nothing 3899 * useful we could do with a revised return value. 3900 */ 3901 doutc(cl, "got safe reply %llu, mds%d\n", tid, mds); 3902 3903 mutex_unlock(&mdsc->mutex); 3904 goto out; 3905 } 3906 } else { 3907 set_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags); 3908 list_add_tail(&req->r_unsafe_item, &req->r_session->s_unsafe); 3909 } 3910 3911 doutc(cl, "tid %lld result %d\n", tid, result); 3912 if (test_bit(CEPHFS_FEATURE_REPLY_ENCODING, &session->s_features)) 3913 err = parse_reply_info(session, msg, req, (u64)-1); 3914 else 3915 err = parse_reply_info(session, msg, req, 3916 session->s_con.peer_features); 3917 mutex_unlock(&mdsc->mutex); 3918 3919 /* Must find target inode outside of mutexes to avoid deadlocks */ 3920 rinfo = &req->r_reply_info; 3921 if ((err >= 0) && rinfo->head->is_target) { 3922 struct inode *in = xchg(&req->r_new_inode, NULL); 3923 struct ceph_vino tvino = { 3924 .ino = le64_to_cpu(rinfo->targeti.in->ino), 3925 .snap = le64_to_cpu(rinfo->targeti.in->snapid) 3926 }; 3927 3928 /* 3929 * If we ended up opening an existing inode, discard 3930 * r_new_inode 3931 */ 3932 if (req->r_op == CEPH_MDS_OP_CREATE && 3933 !req->r_reply_info.has_create_ino) { 3934 /* This should never happen on an async create */ 3935 WARN_ON_ONCE(req->r_deleg_ino); 3936 iput(in); 3937 in = NULL; 3938 } 3939 3940 in = ceph_get_inode(mdsc->fsc->sb, tvino, in); 3941 if (IS_ERR(in)) { 3942 err = PTR_ERR(in); 3943 mutex_lock(&session->s_mutex); 3944 goto out_err; 3945 } 3946 req->r_target_inode = in; 3947 } 3948 3949 mutex_lock(&session->s_mutex); 3950 if (err < 0) { 3951 pr_err_client(cl, "got corrupt reply mds%d(tid:%lld)\n", 3952 mds, tid); 3953 ceph_msg_dump(msg); 3954 goto out_err; 3955 } 3956 3957 /* snap trace */ 3958 realm = NULL; 3959 if (rinfo->snapblob_len) { 3960 down_write(&mdsc->snap_rwsem); 3961 err = ceph_update_snap_trace(mdsc, rinfo->snapblob, 3962 rinfo->snapblob + rinfo->snapblob_len, 3963 le32_to_cpu(head->op) == CEPH_MDS_OP_RMSNAP, 3964 &realm); 3965 if (err) { 3966 up_write(&mdsc->snap_rwsem); 3967 close_sessions = true; 3968 if (err == -EIO) 3969 ceph_msg_dump(msg); 3970 goto out_err; 3971 } 3972 downgrade_write(&mdsc->snap_rwsem); 3973 } else { 3974 down_read(&mdsc->snap_rwsem); 3975 } 3976 3977 /* insert trace into our cache */ 3978 mutex_lock(&req->r_fill_mutex); 3979 current->journal_info = req; 3980 err = ceph_fill_trace(mdsc->fsc->sb, req); 3981 if (err == 0) { 3982 if (result == 0 && (req->r_op == CEPH_MDS_OP_READDIR || 3983 req->r_op == CEPH_MDS_OP_LSSNAP)) 3984 err = ceph_readdir_prepopulate(req, req->r_session); 3985 } 3986 current->journal_info = NULL; 3987 mutex_unlock(&req->r_fill_mutex); 3988 3989 
up_read(&mdsc->snap_rwsem);
3990 if (realm)
3991 ceph_put_snap_realm(mdsc, realm);
3992
3993 if (err == 0) {
3994 if (req->r_target_inode &&
3995 test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) {
3996 struct ceph_inode_info *ci =
3997 ceph_inode(req->r_target_inode);
3998 spin_lock(&ci->i_unsafe_lock);
3999 list_add_tail(&req->r_unsafe_target_item,
4000 &ci->i_unsafe_iops);
4001 spin_unlock(&ci->i_unsafe_lock);
4002 }
4003
4004 ceph_unreserve_caps(mdsc, &req->r_caps_reservation);
4005 }
4006 out_err:
4007 mutex_lock(&mdsc->mutex);
4008 if (!test_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags)) {
4009 if (err) {
4010 req->r_err = err;
4011 } else {
4012 req->r_reply = ceph_msg_get(msg);
4013 set_bit(CEPH_MDS_R_GOT_RESULT, &req->r_req_flags);
4014 }
4015 } else {
4016 doutc(cl, "reply arrived after request %lld was aborted\n", tid);
4017 }
4018 mutex_unlock(&mdsc->mutex);
4019
4020 mutex_unlock(&session->s_mutex);
4021
4022 /* kick calling process */
4023 complete_request(mdsc, req);
4024
4025 ceph_update_metadata_metrics(&mdsc->metric, req->r_start_latency,
4026 req->r_end_latency, err);
4027 out:
4028 ceph_mdsc_put_request(req);
4029
4030 /* Defer closing the sessions until the s_mutex lock has been released */
4031 if (close_sessions)
4032 ceph_mdsc_close_sessions(mdsc);
4033 return;
4034 }
4035
4036
4037
4038 /*
4039 * handle mds notification that our request has been forwarded.
4040 */
4041 static void handle_forward(struct ceph_mds_client *mdsc,
4042 struct ceph_mds_session *session,
4043 struct ceph_msg *msg)
4044 {
4045 struct ceph_client *cl = mdsc->fsc->client;
4046 struct ceph_mds_request *req;
4047 u64 tid = le64_to_cpu(msg->hdr.tid);
4048 u32 next_mds;
4049 u32 fwd_seq;
4050 int err = -EINVAL;
4051 void *p = msg->front.iov_base;
4052 void *end = p + msg->front.iov_len;
4053 bool aborted = false;
4054
4055 ceph_decode_need(&p, end, 2*sizeof(u32), bad);
4056 next_mds = ceph_decode_32(&p);
4057 fwd_seq = ceph_decode_32(&p);
4058
4059 mutex_lock(&mdsc->mutex);
4060 req = lookup_get_request(mdsc, tid);
4061 if (!req) {
4062 mutex_unlock(&mdsc->mutex);
4063 doutc(cl, "forward tid %llu to mds%d - req dne\n", tid, next_mds);
4064 return; /* dup reply? */
4065 }
4066
4067 if (test_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags)) {
4068 doutc(cl, "forward tid %llu aborted, unregistering\n", tid);
4069 __unregister_request(mdsc, req);
4070 } else if (fwd_seq <= req->r_num_fwd || (uint32_t)fwd_seq >= U32_MAX) {
4071 /*
4072 * Avoid infinite retrying after overflow.
4073 *
4074 * The MDS increments the fwd count on every forward. If the
4075 * fwd_seq we receive is less than or equal to the one already
4076 * saved in the request, the MDS is an old version whose 8-bit
4077 * counter has overflowed.
4078 */
4079 mutex_lock(&req->r_fill_mutex);
4080 req->r_err = -EMULTIHOP;
4081 set_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags);
4082 mutex_unlock(&req->r_fill_mutex);
4083 aborted = true;
4084 pr_warn_ratelimited_client(cl, "forward tid %llu seq overflow\n",
4085 tid);
4086 } else {
4087 /* resend.
forward race not possible; mds would drop */ 4088 doutc(cl, "forward tid %llu to mds%d (we resend)\n", tid, next_mds); 4089 BUG_ON(req->r_err); 4090 BUG_ON(test_bit(CEPH_MDS_R_GOT_RESULT, &req->r_req_flags)); 4091 req->r_attempts = 0; 4092 req->r_num_fwd = fwd_seq; 4093 req->r_resend_mds = next_mds; 4094 put_request_session(req); 4095 __do_request(mdsc, req); 4096 } 4097 mutex_unlock(&mdsc->mutex); 4098 4099 /* kick calling process */ 4100 if (aborted) 4101 complete_request(mdsc, req); 4102 ceph_mdsc_put_request(req); 4103 return; 4104 4105 bad: 4106 pr_err_client(cl, "decode error err=%d\n", err); 4107 ceph_msg_dump(msg); 4108 } 4109 4110 static int __decode_session_metadata(void **p, void *end, 4111 bool *blocklisted) 4112 { 4113 /* map<string,string> */ 4114 u32 n; 4115 bool err_str; 4116 ceph_decode_32_safe(p, end, n, bad); 4117 while (n-- > 0) { 4118 u32 len; 4119 ceph_decode_32_safe(p, end, len, bad); 4120 ceph_decode_need(p, end, len, bad); 4121 err_str = !strncmp(*p, "error_string", len); 4122 *p += len; 4123 ceph_decode_32_safe(p, end, len, bad); 4124 ceph_decode_need(p, end, len, bad); 4125 /* 4126 * Match "blocklisted (blacklisted)" from newer MDSes, 4127 * or "blacklisted" from older MDSes. 4128 */ 4129 if (err_str && strnstr(*p, "blacklisted", len)) 4130 *blocklisted = true; 4131 *p += len; 4132 } 4133 return 0; 4134 bad: 4135 return -1; 4136 } 4137 4138 /* 4139 * handle a mds session control message 4140 */ 4141 static void handle_session(struct ceph_mds_session *session, 4142 struct ceph_msg *msg) 4143 { 4144 struct ceph_mds_client *mdsc = session->s_mdsc; 4145 struct ceph_client *cl = mdsc->fsc->client; 4146 int mds = session->s_mds; 4147 int msg_version = le16_to_cpu(msg->hdr.version); 4148 void *p = msg->front.iov_base; 4149 void *end = p + msg->front.iov_len; 4150 struct ceph_mds_session_head *h; 4151 struct ceph_mds_cap_auth *cap_auths = NULL; 4152 u32 op, cap_auths_num = 0; 4153 u64 seq, features = 0; 4154 int wake = 0; 4155 bool blocklisted = false; 4156 u32 i; 4157 4158 4159 /* decode */ 4160 ceph_decode_need(&p, end, sizeof(*h), bad); 4161 h = p; 4162 p += sizeof(*h); 4163 4164 op = le32_to_cpu(h->op); 4165 seq = le64_to_cpu(h->seq); 4166 4167 if (msg_version >= 3) { 4168 u32 len; 4169 /* version >= 2 and < 5, decode metadata, skip otherwise 4170 * as it's handled via flags. 
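 *
 * (The metadata itself is a map<string,string>: a u32 entry count
 * followed by length-prefixed key/value byte strings, which is what
 * __decode_session_metadata() above walks for the older encodings.)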
4171 */
4172 if (msg_version >= 5)
4173 ceph_decode_skip_map(&p, end, string, string, bad);
4174 else if (__decode_session_metadata(&p, end, &blocklisted) < 0)
4175 goto bad;
4176
4177 /* version >= 3, feature bits */
4178 ceph_decode_32_safe(&p, end, len, bad);
4179 if (len) {
4180 ceph_decode_64_safe(&p, end, features, bad);
4181 p += len - sizeof(features);
4182 }
4183 }
4184
4185 if (msg_version >= 5) {
4186 u32 flags, len;
4187
4188 /* version >= 4 */
4189 ceph_decode_skip_16(&p, end, bad); /* struct_v, struct_cv */
4190 ceph_decode_32_safe(&p, end, len, bad); /* len */
4191 ceph_decode_skip_n(&p, end, len, bad); /* metric_spec */
4192
4193 /* version >= 5, flags */
4194 ceph_decode_32_safe(&p, end, flags, bad);
4195 if (flags & CEPH_SESSION_BLOCKLISTED) {
4196 pr_warn_client(cl, "mds%d session blocklisted\n",
4197 session->s_mds);
4198 blocklisted = true;
4199 }
4200 }
4201
4202 if (msg_version >= 6) {
4203 ceph_decode_32_safe(&p, end, cap_auths_num, bad);
4204 doutc(cl, "cap_auths_num %d\n", cap_auths_num);
4205
4206 if (cap_auths_num && op != CEPH_SESSION_OPEN) {
4207 WARN_ON_ONCE(op != CEPH_SESSION_OPEN);
4208 goto skip_cap_auths;
4209 }
4210
4211 cap_auths = kcalloc(cap_auths_num,
4212 sizeof(struct ceph_mds_cap_auth),
4213 GFP_KERNEL);
4214 if (!cap_auths) {
4215 pr_err_client(cl, "No memory for cap_auths\n");
4216 return;
4217 }
4218
4219 for (i = 0; i < cap_auths_num; i++) {
4220 u32 _len, j;
4221
4222 /* struct_v, struct_compat, and struct_len in MDSCapAuth */
4223 ceph_decode_skip_n(&p, end, 2 + sizeof(u32), bad);
4224
4225 /* struct_v, struct_compat, and struct_len in MDSCapMatch */
4226 ceph_decode_skip_n(&p, end, 2 + sizeof(u32), bad);
4227 ceph_decode_64_safe(&p, end, cap_auths[i].match.uid, bad);
4228 ceph_decode_32_safe(&p, end, _len, bad);
4229 if (_len) {
4230 cap_auths[i].match.gids = kcalloc(_len, sizeof(u32),
4231 GFP_KERNEL);
4232 if (!cap_auths[i].match.gids) {
4233 pr_err_client(cl, "No memory for gids\n");
4234 goto fail;
4235 }
4236
4237 cap_auths[i].match.num_gids = _len;
4238 for (j = 0; j < _len; j++)
4239 ceph_decode_32_safe(&p, end,
4240 cap_auths[i].match.gids[j],
4241 bad);
4242 }
4243
4244 ceph_decode_32_safe(&p, end, _len, bad);
4245 if (_len) {
4246 cap_auths[i].match.path = kcalloc(_len + 1, sizeof(char),
4247 GFP_KERNEL);
4248 if (!cap_auths[i].match.path) {
4249 pr_err_client(cl, "No memory for path\n");
4250 goto fail;
4251 }
4252 ceph_decode_copy(&p, cap_auths[i].match.path, _len);
4253
4254 /* Remove the trailing '/' */
4255 while (_len && cap_auths[i].match.path[_len - 1] == '/') {
4256 cap_auths[i].match.path[_len - 1] = '\0';
4257 _len -= 1;
4258 }
4259 }
4260
4261 ceph_decode_32_safe(&p, end, _len, bad);
4262 if (_len) {
4263 cap_auths[i].match.fs_name = kcalloc(_len + 1, sizeof(char),
4264 GFP_KERNEL);
4265 if (!cap_auths[i].match.fs_name) {
4266 pr_err_client(cl, "No memory for fs_name\n");
4267 goto fail;
4268 }
4269 ceph_decode_copy(&p, cap_auths[i].match.fs_name, _len);
4270 }
4271
4272 ceph_decode_8_safe(&p, end, cap_auths[i].match.root_squash, bad);
4273 ceph_decode_8_safe(&p, end, cap_auths[i].readable, bad);
4274 ceph_decode_8_safe(&p, end, cap_auths[i].writeable, bad);
4275 doutc(cl, "uid %lld, num_gids %u, path %s, fs_name %s, root_squash %d, readable %d, writeable %d\n",
4276 cap_auths[i].match.uid, cap_auths[i].match.num_gids,
4277 cap_auths[i].match.path, cap_auths[i].match.fs_name,
4278 cap_auths[i].match.root_squash,
4279 cap_auths[i].readable, cap_auths[i].writeable);
4280 }
4281 }
4282
4283 skip_cap_auths:
4284
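/*
 * Any cap_auths decoded above are only installed into the mdsc below
 * when this is a CEPH_SESSION_OPEN message; other ops with a non-zero
 * cap_auths_num bail out to this label before allocating anything.
 */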
mutex_lock(&mdsc->mutex);
4285 if (op == CEPH_SESSION_OPEN) {
4286 if (mdsc->s_cap_auths) {
4287 for (i = 0; i < mdsc->s_cap_auths_num; i++) {
4288 kfree(mdsc->s_cap_auths[i].match.gids);
4289 kfree(mdsc->s_cap_auths[i].match.path);
4290 kfree(mdsc->s_cap_auths[i].match.fs_name);
4291 }
4292 kfree(mdsc->s_cap_auths);
4293 }
4294 mdsc->s_cap_auths_num = cap_auths_num;
4295 mdsc->s_cap_auths = cap_auths;
4296 }
4297 if (op == CEPH_SESSION_CLOSE) {
4298 ceph_get_mds_session(session);
4299 __unregister_session(mdsc, session);
4300 }
4301 /* FIXME: this ttl calculation is generous */
4302 session->s_ttl = jiffies + HZ*mdsc->mdsmap->m_session_autoclose;
4303 mutex_unlock(&mdsc->mutex);
4304
4305 mutex_lock(&session->s_mutex);
4306
4307 doutc(cl, "mds%d %s %p state %s seq %llu\n", mds,
4308 ceph_session_op_name(op), session,
4309 ceph_session_state_name(session->s_state), seq);
4310
4311 if (session->s_state == CEPH_MDS_SESSION_HUNG) {
4312 session->s_state = CEPH_MDS_SESSION_OPEN;
4313 pr_info_client(cl, "mds%d came back\n", session->s_mds);
4314 }
4315
4316 switch (op) {
4317 case CEPH_SESSION_OPEN:
4318 if (session->s_state == CEPH_MDS_SESSION_RECONNECTING)
4319 pr_info_client(cl, "mds%d reconnect success\n",
4320 session->s_mds);
4321
4322 session->s_features = features;
4323 if (session->s_state == CEPH_MDS_SESSION_OPEN) {
4324 pr_notice_client(cl, "mds%d is already opened\n",
4325 session->s_mds);
4326 } else {
4327 session->s_state = CEPH_MDS_SESSION_OPEN;
4328 renewed_caps(mdsc, session, 0);
4329 if (test_bit(CEPHFS_FEATURE_METRIC_COLLECT,
4330 &session->s_features))
4331 metric_schedule_delayed(&mdsc->metric);
4332 }
4333
4334 /*
4335 * The connection may be broken and the session on the client
4336 * side may have been reinitialized, so the seq needs to be
4337 * updated anyway.
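 * (Note that the code below only adopts the MDS's seq when the
 * local s_seq is still zero; otherwise the locally tracked
 * sequence stays authoritative.)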
4338 */ 4339 if (!session->s_seq && seq) 4340 session->s_seq = seq; 4341 4342 wake = 1; 4343 if (mdsc->stopping) 4344 __close_session(mdsc, session); 4345 break; 4346 4347 case CEPH_SESSION_RENEWCAPS: 4348 if (session->s_renew_seq == seq) 4349 renewed_caps(mdsc, session, 1); 4350 break; 4351 4352 case CEPH_SESSION_CLOSE: 4353 if (session->s_state == CEPH_MDS_SESSION_RECONNECTING) 4354 pr_info_client(cl, "mds%d reconnect denied\n", 4355 session->s_mds); 4356 session->s_state = CEPH_MDS_SESSION_CLOSED; 4357 cleanup_session_requests(mdsc, session); 4358 remove_session_caps(session); 4359 wake = 2; /* for good measure */ 4360 wake_up_all(&mdsc->session_close_wq); 4361 break; 4362 4363 case CEPH_SESSION_STALE: 4364 pr_info_client(cl, "mds%d caps went stale, renewing\n", 4365 session->s_mds); 4366 atomic_inc(&session->s_cap_gen); 4367 session->s_cap_ttl = jiffies - 1; 4368 send_renew_caps(mdsc, session); 4369 break; 4370 4371 case CEPH_SESSION_RECALL_STATE: 4372 ceph_trim_caps(mdsc, session, le32_to_cpu(h->max_caps)); 4373 break; 4374 4375 case CEPH_SESSION_FLUSHMSG: 4376 /* flush cap releases */ 4377 spin_lock(&session->s_cap_lock); 4378 if (session->s_num_cap_releases) 4379 ceph_flush_session_cap_releases(mdsc, session); 4380 spin_unlock(&session->s_cap_lock); 4381 4382 send_flushmsg_ack(mdsc, session, seq); 4383 break; 4384 4385 case CEPH_SESSION_FORCE_RO: 4386 doutc(cl, "force_session_readonly %p\n", session); 4387 spin_lock(&session->s_cap_lock); 4388 session->s_readonly = true; 4389 spin_unlock(&session->s_cap_lock); 4390 wake_up_session_caps(session, FORCE_RO); 4391 break; 4392 4393 case CEPH_SESSION_REJECT: 4394 WARN_ON(session->s_state != CEPH_MDS_SESSION_OPENING); 4395 pr_info_client(cl, "mds%d rejected session\n", 4396 session->s_mds); 4397 session->s_state = CEPH_MDS_SESSION_REJECTED; 4398 cleanup_session_requests(mdsc, session); 4399 remove_session_caps(session); 4400 if (blocklisted) 4401 mdsc->fsc->blocklisted = true; 4402 wake = 2; /* for good measure */ 4403 break; 4404 4405 default: 4406 pr_err_client(cl, "bad op %d mds%d\n", op, mds); 4407 WARN_ON(1); 4408 } 4409 4410 mutex_unlock(&session->s_mutex); 4411 if (wake) { 4412 mutex_lock(&mdsc->mutex); 4413 __wake_requests(mdsc, &session->s_waiting); 4414 if (wake == 2) 4415 kick_requests(mdsc, mds); 4416 mutex_unlock(&mdsc->mutex); 4417 } 4418 if (op == CEPH_SESSION_CLOSE) 4419 ceph_put_mds_session(session); 4420 return; 4421 4422 bad: 4423 pr_err_client(cl, "corrupt message mds%d len %d\n", mds, 4424 (int)msg->front.iov_len); 4425 ceph_msg_dump(msg); 4426 fail: 4427 for (i = 0; i < cap_auths_num; i++) { 4428 kfree(cap_auths[i].match.gids); 4429 kfree(cap_auths[i].match.path); 4430 kfree(cap_auths[i].match.fs_name); 4431 } 4432 kfree(cap_auths); 4433 return; 4434 } 4435 4436 void ceph_mdsc_release_dir_caps(struct ceph_mds_request *req) 4437 { 4438 struct ceph_client *cl = req->r_mdsc->fsc->client; 4439 int dcaps; 4440 4441 dcaps = xchg(&req->r_dir_caps, 0); 4442 if (dcaps) { 4443 doutc(cl, "releasing r_dir_caps=%s\n", ceph_cap_string(dcaps)); 4444 ceph_put_cap_refs(ceph_inode(req->r_parent), dcaps); 4445 } 4446 } 4447 4448 void ceph_mdsc_release_dir_caps_async(struct ceph_mds_request *req) 4449 { 4450 struct ceph_client *cl = req->r_mdsc->fsc->client; 4451 int dcaps; 4452 4453 dcaps = xchg(&req->r_dir_caps, 0); 4454 if (dcaps) { 4455 doutc(cl, "releasing r_dir_caps=%s\n", ceph_cap_string(dcaps)); 4456 ceph_put_cap_refs_async(ceph_inode(req->r_parent), dcaps); 4457 } 4458 } 4459 4460 /* 4461 * called under session->mutex. 
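 * Walks both the per-session unsafe list and the global request
 * tree under mdsc->mutex, re-sending every request that belongs to
 * this session via __send_request().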
4462 */
4463 static void replay_unsafe_requests(struct ceph_mds_client *mdsc,
4464 struct ceph_mds_session *session)
4465 {
4466 struct ceph_mds_request *req, *nreq;
4467 struct rb_node *p;
4468
4469 doutc(mdsc->fsc->client, "mds%d\n", session->s_mds);
4470
4471 mutex_lock(&mdsc->mutex);
4472 list_for_each_entry_safe(req, nreq, &session->s_unsafe, r_unsafe_item)
4473 __send_request(session, req, true);
4474
4475 /*
4476 * Also re-send old requests when the MDS enters the reconnect stage,
4477 * so that the MDS can process completed requests in the clientreplay stage.
4478 */
4479 p = rb_first(&mdsc->request_tree);
4480 while (p) {
4481 req = rb_entry(p, struct ceph_mds_request, r_node);
4482 p = rb_next(p);
4483 if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags))
4484 continue;
4485 if (req->r_attempts == 0)
4486 continue; /* only old requests */
4487 if (!req->r_session)
4488 continue;
4489 if (req->r_session->s_mds != session->s_mds)
4490 continue;
4491
4492 ceph_mdsc_release_dir_caps_async(req);
4493
4494 __send_request(session, req, true);
4495 }
4496 mutex_unlock(&mdsc->mutex);
4497 }
4498
4499 static int send_reconnect_partial(struct ceph_reconnect_state *recon_state)
4500 {
4501 struct ceph_msg *reply;
4502 struct ceph_pagelist *_pagelist;
4503 struct page *page;
4504 __le32 *addr;
4505 int err = -ENOMEM;
4506
4507 if (!recon_state->allow_multi)
4508 return -ENOSPC;
4509
4510 /* can't handle a message that contains both caps and realms */
4511 BUG_ON(!recon_state->nr_caps == !recon_state->nr_realms);
4512
4513 /* pre-allocate new pagelist */
4514 _pagelist = ceph_pagelist_alloc(GFP_NOFS);
4515 if (!_pagelist)
4516 return -ENOMEM;
4517
4518 reply = ceph_msg_new2(CEPH_MSG_CLIENT_RECONNECT, 0, 1, GFP_NOFS, false);
4519 if (!reply)
4520 goto fail_msg;
4521
4522 /* placeholder for nr_caps */
4523 err = ceph_pagelist_encode_32(_pagelist, 0);
4524 if (err < 0)
4525 goto fail;
4526
4527 if (recon_state->nr_caps) {
4528 /* currently encoding caps */
4529 err = ceph_pagelist_encode_32(recon_state->pagelist, 0);
4530 if (err)
4531 goto fail;
4532 } else {
4533 /* placeholder for nr_realms (currently encoding realms) */
4534 err = ceph_pagelist_encode_32(_pagelist, 0);
4535 if (err < 0)
4536 goto fail;
4537 }
4538
4539 err = ceph_pagelist_encode_8(recon_state->pagelist, 1);
4540 if (err)
4541 goto fail;
4542
4543 page = list_first_entry(&recon_state->pagelist->head, struct page, lru);
4544 addr = kmap_atomic(page);
4545 if (recon_state->nr_caps) {
4546 /* currently encoding caps */
4547 *addr = cpu_to_le32(recon_state->nr_caps);
4548 } else {
4549 /* currently encoding realms */
4550 *(addr + 1) = cpu_to_le32(recon_state->nr_realms);
4551 }
4552 kunmap_atomic(addr);
4553
4554 reply->hdr.version = cpu_to_le16(5);
4555 reply->hdr.compat_version = cpu_to_le16(4);
4556
4557 reply->hdr.data_len = cpu_to_le32(recon_state->pagelist->length);
4558 ceph_msg_data_add_pagelist(reply, recon_state->pagelist);
4559
4560 ceph_con_send(&recon_state->session->s_con, reply);
4561 ceph_pagelist_release(recon_state->pagelist);
4562
4563 recon_state->pagelist = _pagelist;
4564 recon_state->nr_caps = 0;
4565 recon_state->nr_realms = 0;
4566 recon_state->msg_version = 5;
4567 return 0;
4568 fail:
4569 ceph_msg_put(reply);
4570 fail_msg:
4571 ceph_pagelist_release(_pagelist);
4572 return err;
4573 }
4574
4575 static struct dentry* d_find_primary(struct inode *inode)
4576 {
4577 struct dentry *alias, *dn = NULL;
4578
4579 if (hlist_empty(&inode->i_dentry))
4580 return NULL;
4581
4582 spin_lock(&inode->i_lock);
4583 if
(hlist_empty(&inode->i_dentry)) 4584 goto out_unlock; 4585 4586 if (S_ISDIR(inode->i_mode)) { 4587 alias = hlist_entry(inode->i_dentry.first, struct dentry, d_u.d_alias); 4588 if (!IS_ROOT(alias)) 4589 dn = dget(alias); 4590 goto out_unlock; 4591 } 4592 4593 hlist_for_each_entry(alias, &inode->i_dentry, d_u.d_alias) { 4594 spin_lock(&alias->d_lock); 4595 if (!d_unhashed(alias) && 4596 (ceph_dentry(alias)->flags & CEPH_DENTRY_PRIMARY_LINK)) { 4597 dn = dget_dlock(alias); 4598 } 4599 spin_unlock(&alias->d_lock); 4600 if (dn) 4601 break; 4602 } 4603 out_unlock: 4604 spin_unlock(&inode->i_lock); 4605 return dn; 4606 } 4607 4608 /* 4609 * Encode information about a cap for a reconnect with the MDS. 4610 */ 4611 static int reconnect_caps_cb(struct inode *inode, int mds, void *arg) 4612 { 4613 struct ceph_mds_client *mdsc = ceph_sb_to_mdsc(inode->i_sb); 4614 struct ceph_client *cl = ceph_inode_to_client(inode); 4615 union { 4616 struct ceph_mds_cap_reconnect v2; 4617 struct ceph_mds_cap_reconnect_v1 v1; 4618 } rec; 4619 struct ceph_inode_info *ci = ceph_inode(inode); 4620 struct ceph_reconnect_state *recon_state = arg; 4621 struct ceph_pagelist *pagelist = recon_state->pagelist; 4622 struct dentry *dentry; 4623 struct ceph_cap *cap; 4624 struct ceph_path_info path_info = {0}; 4625 int err; 4626 u64 snap_follows; 4627 4628 dentry = d_find_primary(inode); 4629 if (dentry) { 4630 /* set pathbase to parent dir when msg_version >= 2 */ 4631 char *path = ceph_mdsc_build_path(mdsc, dentry, &path_info, 4632 recon_state->msg_version >= 2); 4633 dput(dentry); 4634 if (IS_ERR(path)) { 4635 err = PTR_ERR(path); 4636 goto out_err; 4637 } 4638 } 4639 4640 spin_lock(&ci->i_ceph_lock); 4641 cap = __get_cap_for_mds(ci, mds); 4642 if (!cap) { 4643 spin_unlock(&ci->i_ceph_lock); 4644 err = 0; 4645 goto out_err; 4646 } 4647 doutc(cl, " adding %p ino %llx.%llx cap %p %lld %s\n", inode, 4648 ceph_vinop(inode), cap, cap->cap_id, 4649 ceph_cap_string(cap->issued)); 4650 4651 cap->seq = 0; /* reset cap seq */ 4652 cap->issue_seq = 0; /* and issue_seq */ 4653 cap->mseq = 0; /* and migrate_seq */ 4654 cap->cap_gen = atomic_read(&cap->session->s_cap_gen); 4655 4656 /* These are lost when the session goes away */ 4657 if (S_ISDIR(inode->i_mode)) { 4658 if (cap->issued & CEPH_CAP_DIR_CREATE) { 4659 ceph_put_string(rcu_dereference_raw(ci->i_cached_layout.pool_ns)); 4660 memset(&ci->i_cached_layout, 0, sizeof(ci->i_cached_layout)); 4661 } 4662 cap->issued &= ~CEPH_CAP_ANY_DIR_OPS; 4663 } 4664 4665 if (recon_state->msg_version >= 2) { 4666 rec.v2.cap_id = cpu_to_le64(cap->cap_id); 4667 rec.v2.wanted = cpu_to_le32(__ceph_caps_wanted(ci)); 4668 rec.v2.issued = cpu_to_le32(cap->issued); 4669 rec.v2.snaprealm = cpu_to_le64(ci->i_snap_realm->ino); 4670 rec.v2.pathbase = cpu_to_le64(path_info.vino.ino); 4671 rec.v2.flock_len = (__force __le32) 4672 ((ci->i_ceph_flags & CEPH_I_ERROR_FILELOCK) ? 0 : 1); 4673 } else { 4674 struct timespec64 ts; 4675 4676 rec.v1.cap_id = cpu_to_le64(cap->cap_id); 4677 rec.v1.wanted = cpu_to_le32(__ceph_caps_wanted(ci)); 4678 rec.v1.issued = cpu_to_le32(cap->issued); 4679 rec.v1.size = cpu_to_le64(i_size_read(inode)); 4680 ts = inode_get_mtime(inode); 4681 ceph_encode_timespec64(&rec.v1.mtime, &ts); 4682 ts = inode_get_atime(inode); 4683 ceph_encode_timespec64(&rec.v1.atime, &ts); 4684 rec.v1.snaprealm = cpu_to_le64(ci->i_snap_realm->ino); 4685 rec.v1.pathbase = cpu_to_le64(path_info.vino.ino); 4686 } 4687 4688 if (list_empty(&ci->i_cap_snaps)) { 4689 snap_follows = ci->i_head_snapc ? 
ci->i_head_snapc->seq : 0; 4690 } else { 4691 struct ceph_cap_snap *capsnap = 4692 list_first_entry(&ci->i_cap_snaps, 4693 struct ceph_cap_snap, ci_item); 4694 snap_follows = capsnap->follows; 4695 } 4696 spin_unlock(&ci->i_ceph_lock); 4697 4698 if (recon_state->msg_version >= 2) { 4699 int num_fcntl_locks, num_flock_locks; 4700 struct ceph_filelock *flocks = NULL; 4701 size_t struct_len, total_len = sizeof(u64); 4702 u8 struct_v = 0; 4703 4704 encode_again: 4705 if (rec.v2.flock_len) { 4706 ceph_count_locks(inode, &num_fcntl_locks, &num_flock_locks); 4707 } else { 4708 num_fcntl_locks = 0; 4709 num_flock_locks = 0; 4710 } 4711 if (num_fcntl_locks + num_flock_locks > 0) { 4712 flocks = kmalloc_array(num_fcntl_locks + num_flock_locks, 4713 sizeof(struct ceph_filelock), 4714 GFP_NOFS); 4715 if (!flocks) { 4716 err = -ENOMEM; 4717 goto out_err; 4718 } 4719 err = ceph_encode_locks_to_buffer(inode, flocks, 4720 num_fcntl_locks, 4721 num_flock_locks); 4722 if (err) { 4723 kfree(flocks); 4724 flocks = NULL; 4725 if (err == -ENOSPC) 4726 goto encode_again; 4727 goto out_err; 4728 } 4729 } else { 4730 kfree(flocks); 4731 flocks = NULL; 4732 } 4733 4734 if (recon_state->msg_version >= 3) { 4735 /* version, compat_version and struct_len */ 4736 total_len += 2 * sizeof(u8) + sizeof(u32); 4737 struct_v = 2; 4738 } 4739 /* 4740 * number of encoded locks is stable, so copy to pagelist 4741 */ 4742 struct_len = 2 * sizeof(u32) + 4743 (num_fcntl_locks + num_flock_locks) * 4744 sizeof(struct ceph_filelock); 4745 rec.v2.flock_len = cpu_to_le32(struct_len); 4746 4747 struct_len += sizeof(u32) + path_info.pathlen + sizeof(rec.v2); 4748 4749 if (struct_v >= 2) 4750 struct_len += sizeof(u64); /* snap_follows */ 4751 4752 total_len += struct_len; 4753 4754 if (pagelist->length + total_len > RECONNECT_MAX_SIZE) { 4755 err = send_reconnect_partial(recon_state); 4756 if (err) 4757 goto out_freeflocks; 4758 pagelist = recon_state->pagelist; 4759 } 4760 4761 err = ceph_pagelist_reserve(pagelist, total_len); 4762 if (err) 4763 goto out_freeflocks; 4764 4765 ceph_pagelist_encode_64(pagelist, ceph_ino(inode)); 4766 if (recon_state->msg_version >= 3) { 4767 ceph_pagelist_encode_8(pagelist, struct_v); 4768 ceph_pagelist_encode_8(pagelist, 1); 4769 ceph_pagelist_encode_32(pagelist, struct_len); 4770 } 4771 ceph_pagelist_encode_string(pagelist, (char *)path_info.path, path_info.pathlen); 4772 ceph_pagelist_append(pagelist, &rec, sizeof(rec.v2)); 4773 ceph_locks_to_pagelist(flocks, pagelist, 4774 num_fcntl_locks, num_flock_locks); 4775 if (struct_v >= 2) 4776 ceph_pagelist_encode_64(pagelist, snap_follows); 4777 out_freeflocks: 4778 kfree(flocks); 4779 } else { 4780 err = ceph_pagelist_reserve(pagelist, 4781 sizeof(u64) + sizeof(u32) + 4782 path_info.pathlen + sizeof(rec.v1)); 4783 if (err) 4784 goto out_err; 4785 4786 ceph_pagelist_encode_64(pagelist, ceph_ino(inode)); 4787 ceph_pagelist_encode_string(pagelist, (char *)path_info.path, path_info.pathlen); 4788 ceph_pagelist_append(pagelist, &rec, sizeof(rec.v1)); 4789 } 4790 4791 out_err: 4792 ceph_mdsc_free_path_info(&path_info); 4793 if (!err) 4794 recon_state->nr_caps++; 4795 return err; 4796 } 4797 4798 static int encode_snap_realms(struct ceph_mds_client *mdsc, 4799 struct ceph_reconnect_state *recon_state) 4800 { 4801 struct rb_node *p; 4802 struct ceph_pagelist *pagelist = recon_state->pagelist; 4803 struct ceph_client *cl = mdsc->fsc->client; 4804 int err = 0; 4805 4806 if (recon_state->msg_version >= 4) { 4807 err = ceph_pagelist_encode_32(pagelist, 
mdsc->num_snap_realms); 4808 if (err < 0) 4809 goto fail; 4810 } 4811 4812 /* 4813 * snaprealms. we provide mds with the ino, seq (version), and 4814 * parent for all of our realms. If the mds has any newer info, 4815 * it will tell us. 4816 */ 4817 for (p = rb_first(&mdsc->snap_realms); p; p = rb_next(p)) { 4818 struct ceph_snap_realm *realm = 4819 rb_entry(p, struct ceph_snap_realm, node); 4820 struct ceph_mds_snaprealm_reconnect sr_rec; 4821 4822 if (recon_state->msg_version >= 4) { 4823 size_t need = sizeof(u8) * 2 + sizeof(u32) + 4824 sizeof(sr_rec); 4825 4826 if (pagelist->length + need > RECONNECT_MAX_SIZE) { 4827 err = send_reconnect_partial(recon_state); 4828 if (err) 4829 goto fail; 4830 pagelist = recon_state->pagelist; 4831 } 4832 4833 err = ceph_pagelist_reserve(pagelist, need); 4834 if (err) 4835 goto fail; 4836 4837 ceph_pagelist_encode_8(pagelist, 1); 4838 ceph_pagelist_encode_8(pagelist, 1); 4839 ceph_pagelist_encode_32(pagelist, sizeof(sr_rec)); 4840 } 4841 4842 doutc(cl, " adding snap realm %llx seq %lld parent %llx\n", 4843 realm->ino, realm->seq, realm->parent_ino); 4844 sr_rec.ino = cpu_to_le64(realm->ino); 4845 sr_rec.seq = cpu_to_le64(realm->seq); 4846 sr_rec.parent = cpu_to_le64(realm->parent_ino); 4847 4848 err = ceph_pagelist_append(pagelist, &sr_rec, sizeof(sr_rec)); 4849 if (err) 4850 goto fail; 4851 4852 recon_state->nr_realms++; 4853 } 4854 fail: 4855 return err; 4856 } 4857 4858 4859 /* 4860 * If an MDS fails and recovers, clients need to reconnect in order to 4861 * reestablish shared state. This includes all caps issued through 4862 * this session _and_ the snap_realm hierarchy. Because it's not 4863 * clear which snap realms the mds cares about, we send everything we 4864 * know about.. that ensures we'll then get any new info the 4865 * recovering MDS might have. 4866 * 4867 * This is a relatively heavyweight operation, but it's rare. 4868 */ 4869 static void send_mds_reconnect(struct ceph_mds_client *mdsc, 4870 struct ceph_mds_session *session) 4871 { 4872 struct ceph_client *cl = mdsc->fsc->client; 4873 struct ceph_msg *reply; 4874 int mds = session->s_mds; 4875 int err = -ENOMEM; 4876 struct ceph_reconnect_state recon_state = { 4877 .session = session, 4878 }; 4879 LIST_HEAD(dispose); 4880 4881 pr_info_client(cl, "mds%d reconnect start\n", mds); 4882 4883 recon_state.pagelist = ceph_pagelist_alloc(GFP_NOFS); 4884 if (!recon_state.pagelist) 4885 goto fail_nopagelist; 4886 4887 reply = ceph_msg_new2(CEPH_MSG_CLIENT_RECONNECT, 0, 1, GFP_NOFS, false); 4888 if (!reply) 4889 goto fail_nomsg; 4890 4891 xa_destroy(&session->s_delegated_inos); 4892 4893 mutex_lock(&session->s_mutex); 4894 session->s_state = CEPH_MDS_SESSION_RECONNECTING; 4895 session->s_seq = 0; 4896 4897 doutc(cl, "session %p state %s\n", session, 4898 ceph_session_state_name(session->s_state)); 4899 4900 atomic_inc(&session->s_cap_gen); 4901 4902 spin_lock(&session->s_cap_lock); 4903 /* don't know if session is readonly */ 4904 session->s_readonly = 0; 4905 /* 4906 * notify __ceph_remove_cap() that we are composing cap reconnect. 4907 * If a cap get released before being added to the cap reconnect, 4908 * __ceph_remove_cap() should skip queuing cap release. 
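 * (s_cap_reconnect is cleared again below, once the cap walk in
 * ceph_iterate_session_caps() has finished.)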
4909 */ 4910 session->s_cap_reconnect = 1; 4911 /* drop old cap expires; we're about to reestablish that state */ 4912 detach_cap_releases(session, &dispose); 4913 spin_unlock(&session->s_cap_lock); 4914 dispose_cap_releases(mdsc, &dispose); 4915 4916 /* trim unused caps to reduce MDS's cache rejoin time */ 4917 if (mdsc->fsc->sb->s_root) 4918 shrink_dcache_parent(mdsc->fsc->sb->s_root); 4919 4920 ceph_con_close(&session->s_con); 4921 ceph_con_open(&session->s_con, 4922 CEPH_ENTITY_TYPE_MDS, mds, 4923 ceph_mdsmap_get_addr(mdsc->mdsmap, mds)); 4924 4925 /* replay unsafe requests */ 4926 replay_unsafe_requests(mdsc, session); 4927 4928 ceph_early_kick_flushing_caps(mdsc, session); 4929 4930 down_read(&mdsc->snap_rwsem); 4931 4932 /* placeholder for nr_caps */ 4933 err = ceph_pagelist_encode_32(recon_state.pagelist, 0); 4934 if (err) 4935 goto fail; 4936 4937 if (test_bit(CEPHFS_FEATURE_MULTI_RECONNECT, &session->s_features)) { 4938 recon_state.msg_version = 3; 4939 recon_state.allow_multi = true; 4940 } else if (session->s_con.peer_features & CEPH_FEATURE_MDSENC) { 4941 recon_state.msg_version = 3; 4942 } else { 4943 recon_state.msg_version = 2; 4944 } 4945 /* traverse this session's caps */ 4946 err = ceph_iterate_session_caps(session, reconnect_caps_cb, &recon_state); 4947 4948 spin_lock(&session->s_cap_lock); 4949 session->s_cap_reconnect = 0; 4950 spin_unlock(&session->s_cap_lock); 4951 4952 if (err < 0) 4953 goto fail; 4954 4955 /* check if all realms can be encoded into current message */ 4956 if (mdsc->num_snap_realms) { 4957 size_t total_len = 4958 recon_state.pagelist->length + 4959 mdsc->num_snap_realms * 4960 sizeof(struct ceph_mds_snaprealm_reconnect); 4961 if (recon_state.msg_version >= 4) { 4962 /* number of realms */ 4963 total_len += sizeof(u32); 4964 /* version, compat_version and struct_len */ 4965 total_len += mdsc->num_snap_realms * 4966 (2 * sizeof(u8) + sizeof(u32)); 4967 } 4968 if (total_len > RECONNECT_MAX_SIZE) { 4969 if (!recon_state.allow_multi) { 4970 err = -ENOSPC; 4971 goto fail; 4972 } 4973 if (recon_state.nr_caps) { 4974 err = send_reconnect_partial(&recon_state); 4975 if (err) 4976 goto fail; 4977 } 4978 recon_state.msg_version = 5; 4979 } 4980 } 4981 4982 err = encode_snap_realms(mdsc, &recon_state); 4983 if (err < 0) 4984 goto fail; 4985 4986 if (recon_state.msg_version >= 5) { 4987 err = ceph_pagelist_encode_8(recon_state.pagelist, 0); 4988 if (err < 0) 4989 goto fail; 4990 } 4991 4992 if (recon_state.nr_caps || recon_state.nr_realms) { 4993 struct page *page = 4994 list_first_entry(&recon_state.pagelist->head, 4995 struct page, lru); 4996 __le32 *addr = kmap_atomic(page); 4997 if (recon_state.nr_caps) { 4998 WARN_ON(recon_state.nr_realms != mdsc->num_snap_realms); 4999 *addr = cpu_to_le32(recon_state.nr_caps); 5000 } else if (recon_state.msg_version >= 4) { 5001 *(addr + 1) = cpu_to_le32(recon_state.nr_realms); 5002 } 5003 kunmap_atomic(addr); 5004 } 5005 5006 reply->hdr.version = cpu_to_le16(recon_state.msg_version); 5007 if (recon_state.msg_version >= 4) 5008 reply->hdr.compat_version = cpu_to_le16(4); 5009 5010 reply->hdr.data_len = cpu_to_le32(recon_state.pagelist->length); 5011 ceph_msg_data_add_pagelist(reply, recon_state.pagelist); 5012 5013 ceph_con_send(&session->s_con, reply); 5014 5015 mutex_unlock(&session->s_mutex); 5016 5017 mutex_lock(&mdsc->mutex); 5018 __wake_requests(mdsc, &session->s_waiting); 5019 mutex_unlock(&mdsc->mutex); 5020 5021 up_read(&mdsc->snap_rwsem); 5022 ceph_pagelist_release(recon_state.pagelist); 5023 return; 5024 5025 
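/*
 * Error unwinding: drop the reply and the snap rwsem first, then the
 * pagelist; fail_nopagelist only logs, as nothing else has been set
 * up at that point.
 */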
fail: 5026 ceph_msg_put(reply); 5027 up_read(&mdsc->snap_rwsem); 5028 mutex_unlock(&session->s_mutex); 5029 fail_nomsg: 5030 ceph_pagelist_release(recon_state.pagelist); 5031 fail_nopagelist: 5032 pr_err_client(cl, "error %d preparing reconnect for mds%d\n", 5033 err, mds); 5034 return; 5035 } 5036 5037 5038 /* 5039 * compare old and new mdsmaps, kicking requests 5040 * and closing out old connections as necessary 5041 * 5042 * called under mdsc->mutex. 5043 */ 5044 static void check_new_map(struct ceph_mds_client *mdsc, 5045 struct ceph_mdsmap *newmap, 5046 struct ceph_mdsmap *oldmap) 5047 { 5048 int i, j, err; 5049 int oldstate, newstate; 5050 struct ceph_mds_session *s; 5051 unsigned long targets[DIV_ROUND_UP(CEPH_MAX_MDS, sizeof(unsigned long))] = {0}; 5052 struct ceph_client *cl = mdsc->fsc->client; 5053 5054 doutc(cl, "new %u old %u\n", newmap->m_epoch, oldmap->m_epoch); 5055 5056 if (newmap->m_info) { 5057 for (i = 0; i < newmap->possible_max_rank; i++) { 5058 for (j = 0; j < newmap->m_info[i].num_export_targets; j++) 5059 set_bit(newmap->m_info[i].export_targets[j], targets); 5060 } 5061 } 5062 5063 for (i = 0; i < oldmap->possible_max_rank && i < mdsc->max_sessions; i++) { 5064 if (!mdsc->sessions[i]) 5065 continue; 5066 s = mdsc->sessions[i]; 5067 oldstate = ceph_mdsmap_get_state(oldmap, i); 5068 newstate = ceph_mdsmap_get_state(newmap, i); 5069 5070 doutc(cl, "mds%d state %s%s -> %s%s (session %s)\n", 5071 i, ceph_mds_state_name(oldstate), 5072 ceph_mdsmap_is_laggy(oldmap, i) ? " (laggy)" : "", 5073 ceph_mds_state_name(newstate), 5074 ceph_mdsmap_is_laggy(newmap, i) ? " (laggy)" : "", 5075 ceph_session_state_name(s->s_state)); 5076 5077 if (i >= newmap->possible_max_rank) { 5078 /* force close session for stopped mds */ 5079 ceph_get_mds_session(s); 5080 __unregister_session(mdsc, s); 5081 __wake_requests(mdsc, &s->s_waiting); 5082 mutex_unlock(&mdsc->mutex); 5083 5084 mutex_lock(&s->s_mutex); 5085 cleanup_session_requests(mdsc, s); 5086 remove_session_caps(s); 5087 mutex_unlock(&s->s_mutex); 5088 5089 ceph_put_mds_session(s); 5090 5091 mutex_lock(&mdsc->mutex); 5092 kick_requests(mdsc, i); 5093 continue; 5094 } 5095 5096 if (memcmp(ceph_mdsmap_get_addr(oldmap, i), 5097 ceph_mdsmap_get_addr(newmap, i), 5098 sizeof(struct ceph_entity_addr))) { 5099 /* just close it */ 5100 mutex_unlock(&mdsc->mutex); 5101 mutex_lock(&s->s_mutex); 5102 mutex_lock(&mdsc->mutex); 5103 ceph_con_close(&s->s_con); 5104 mutex_unlock(&s->s_mutex); 5105 s->s_state = CEPH_MDS_SESSION_RESTARTING; 5106 } else if (oldstate == newstate) { 5107 continue; /* nothing new with this mds */ 5108 } 5109 5110 /* 5111 * send reconnect? 5112 */ 5113 if (s->s_state == CEPH_MDS_SESSION_RESTARTING && 5114 newstate >= CEPH_MDS_STATE_RECONNECT) { 5115 mutex_unlock(&mdsc->mutex); 5116 clear_bit(i, targets); 5117 send_mds_reconnect(mdsc, s); 5118 mutex_lock(&mdsc->mutex); 5119 } 5120 5121 /* 5122 * kick request on any mds that has gone active. 5123 */ 5124 if (oldstate < CEPH_MDS_STATE_ACTIVE && 5125 newstate >= CEPH_MDS_STATE_ACTIVE) { 5126 if (oldstate != CEPH_MDS_STATE_CREATING && 5127 oldstate != CEPH_MDS_STATE_STARTING) 5128 pr_info_client(cl, "mds%d recovery completed\n", 5129 s->s_mds); 5130 kick_requests(mdsc, i); 5131 mutex_unlock(&mdsc->mutex); 5132 mutex_lock(&s->s_mutex); 5133 mutex_lock(&mdsc->mutex); 5134 ceph_kick_flushing_caps(mdsc, s); 5135 mutex_unlock(&s->s_mutex); 5136 wake_up_session_caps(s, RECONNECT); 5137 } 5138 } 5139 5140 /* 5141 * Only open and reconnect sessions that don't exist yet. 
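 * That is, ranks that appear in some MDS's export-target set and are
 * sitting in the up:reconnect state, as checked below.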
5142 */
5143 for (i = 0; i < newmap->possible_max_rank; i++) {
5144 /*
5145 * The importing MDS may crash just after the EImportStart
5146 * journal entry is flushed; when a standby MDS takes over
5147 * and is replaying the EImportStart journal, the new MDS
5148 * daemon will wait for the client to reconnect, but the
5149 * client may never have registered/opened the session
5150 * yet.
5151 *
5152 * Try to reconnect to such an MDS daemon if its rank
5153 * number is in the export targets array and it is in
5154 * the up:reconnect state.
5155 */
5156 newstate = ceph_mdsmap_get_state(newmap, i);
5157 if (!test_bit(i, targets) || newstate != CEPH_MDS_STATE_RECONNECT)
5158 continue;
5159
5160 /*
5161 * In rare cases the session may already have been
5162 * registered and opened by requests that picked a
5163 * random MDS during the mdsc->mutex unlock/lock gap
5164 * below. The corresponding MDS daemon will simply
5165 * queue those requests while it keeps waiting for the
5166 * client's reconnection request in up:reconnect state.
5167 */
5168 s = __ceph_lookup_mds_session(mdsc, i);
5169 if (likely(!s)) {
5170 s = __open_export_target_session(mdsc, i);
5171 if (IS_ERR(s)) {
5172 err = PTR_ERR(s);
5173 pr_err_client(cl,
5174 "failed to open export target session, err %d\n",
5175 err);
5176 continue;
5177 }
5178 }
5179 doutc(cl, "send reconnect to export target mds.%d\n", i);
5180 mutex_unlock(&mdsc->mutex);
5181 send_mds_reconnect(mdsc, s);
5182 ceph_put_mds_session(s);
5183 mutex_lock(&mdsc->mutex);
5184 }
5185
5186 for (i = 0; i < newmap->possible_max_rank && i < mdsc->max_sessions; i++) {
5187 s = mdsc->sessions[i];
5188 if (!s)
5189 continue;
5190 if (!ceph_mdsmap_is_laggy(newmap, i))
5191 continue;
5192 if (s->s_state == CEPH_MDS_SESSION_OPEN ||
5193 s->s_state == CEPH_MDS_SESSION_HUNG ||
5194 s->s_state == CEPH_MDS_SESSION_CLOSING) {
5195 doutc(cl, " connecting to export targets of laggy mds%d\n", i);
5196 __open_export_target_sessions(mdsc, s);
5197 }
5198 }
5199 }
5200
5201
5202
5203 /*
5204 * leases
5205 */
5206
5207 /*
5208 * caller must hold session s_mutex, dentry->d_lock
5209 */
5210 void __ceph_mdsc_drop_dentry_lease(struct dentry *dentry)
5211 {
5212 struct ceph_dentry_info *di = ceph_dentry(dentry);
5213
5214 ceph_put_mds_session(di->lease_session);
5215 di->lease_session = NULL;
5216 }
5217
5218 static void handle_lease(struct ceph_mds_client *mdsc,
5219 struct ceph_mds_session *session,
5220 struct ceph_msg *msg)
5221 {
5222 struct ceph_client *cl = mdsc->fsc->client;
5223 struct super_block *sb = mdsc->fsc->sb;
5224 struct inode *inode;
5225 struct dentry *parent, *dentry;
5226 struct ceph_dentry_info *di;
5227 int mds = session->s_mds;
5228 struct ceph_mds_lease *h = msg->front.iov_base;
5229 u32 seq;
5230 struct ceph_vino vino;
5231 struct qstr dname;
5232 int release = 0;
5233
5234 doutc(cl, "from mds%d\n", mds);
5235
5236 if (!ceph_inc_mds_stopping_blocker(mdsc, session))
5237 return;
5238
5239 /* decode */
5240 if (msg->front.iov_len < sizeof(*h) + sizeof(u32))
5241 goto bad;
5242 vino.ino = le64_to_cpu(h->ino);
5243 vino.snap = CEPH_NOSNAP;
5244 seq = le32_to_cpu(h->seq);
5245 dname.len = get_unaligned_le32(h + 1);
5246 if (msg->front.iov_len < sizeof(*h) + sizeof(u32) + dname.len)
5247 goto bad;
5248 dname.name = (void *)(h + 1) + sizeof(u32);
5249
5250 /* lookup inode */
5251 inode = ceph_find_inode(sb, vino);
5252 doutc(cl, "%s, ino %llx %p %.*s\n", ceph_lease_op_name(h->action),
5253 vino.ino, inode, dname.len, dname.name);
5254
5255 mutex_lock(&session->s_mutex);
5256 if (!inode) {
5257 doutc(cl, "no inode %llx\n", vino.ino); 5258 goto release; 5259 } 5260 5261 /* dentry */ 5262 parent = d_find_alias(inode); 5263 if (!parent) { 5264 doutc(cl, "no parent dentry on inode %p\n", inode); 5265 WARN_ON(1); 5266 goto release; /* hrm... */ 5267 } 5268 dname.hash = full_name_hash(parent, dname.name, dname.len); 5269 dentry = d_lookup(parent, &dname); 5270 dput(parent); 5271 if (!dentry) 5272 goto release; 5273 5274 spin_lock(&dentry->d_lock); 5275 di = ceph_dentry(dentry); 5276 switch (h->action) { 5277 case CEPH_MDS_LEASE_REVOKE: 5278 if (di->lease_session == session) { 5279 if (ceph_seq_cmp(di->lease_seq, seq) > 0) 5280 h->seq = cpu_to_le32(di->lease_seq); 5281 __ceph_mdsc_drop_dentry_lease(dentry); 5282 } 5283 release = 1; 5284 break; 5285 5286 case CEPH_MDS_LEASE_RENEW: 5287 if (di->lease_session == session && 5288 di->lease_gen == atomic_read(&session->s_cap_gen) && 5289 di->lease_renew_from && 5290 di->lease_renew_after == 0) { 5291 unsigned long duration = 5292 msecs_to_jiffies(le32_to_cpu(h->duration_ms)); 5293 5294 di->lease_seq = seq; 5295 di->time = di->lease_renew_from + duration; 5296 di->lease_renew_after = di->lease_renew_from + 5297 (duration >> 1); 5298 di->lease_renew_from = 0; 5299 } 5300 break; 5301 } 5302 spin_unlock(&dentry->d_lock); 5303 dput(dentry); 5304 5305 if (!release) 5306 goto out; 5307 5308 release: 5309 /* let's just reuse the same message */ 5310 h->action = CEPH_MDS_LEASE_REVOKE_ACK; 5311 ceph_msg_get(msg); 5312 ceph_con_send(&session->s_con, msg); 5313 5314 out: 5315 mutex_unlock(&session->s_mutex); 5316 iput(inode); 5317 5318 ceph_dec_mds_stopping_blocker(mdsc); 5319 return; 5320 5321 bad: 5322 ceph_dec_mds_stopping_blocker(mdsc); 5323 5324 pr_err_client(cl, "corrupt lease message\n"); 5325 ceph_msg_dump(msg); 5326 } 5327 5328 void ceph_mdsc_lease_send_msg(struct ceph_mds_session *session, 5329 struct dentry *dentry, char action, 5330 u32 seq) 5331 { 5332 struct ceph_client *cl = session->s_mdsc->fsc->client; 5333 struct ceph_msg *msg; 5334 struct ceph_mds_lease *lease; 5335 struct inode *dir; 5336 int len = sizeof(*lease) + sizeof(u32) + NAME_MAX; 5337 5338 doutc(cl, "identry %p %s to mds%d\n", dentry, ceph_lease_op_name(action), 5339 session->s_mds); 5340 5341 msg = ceph_msg_new(CEPH_MSG_CLIENT_LEASE, len, GFP_NOFS, false); 5342 if (!msg) 5343 return; 5344 lease = msg->front.iov_base; 5345 lease->action = action; 5346 lease->seq = cpu_to_le32(seq); 5347 5348 spin_lock(&dentry->d_lock); 5349 dir = d_inode(dentry->d_parent); 5350 lease->ino = cpu_to_le64(ceph_ino(dir)); 5351 lease->first = lease->last = cpu_to_le64(ceph_snap(dir)); 5352 5353 put_unaligned_le32(dentry->d_name.len, lease + 1); 5354 memcpy((void *)(lease + 1) + 4, 5355 dentry->d_name.name, dentry->d_name.len); 5356 spin_unlock(&dentry->d_lock); 5357 5358 ceph_con_send(&session->s_con, msg); 5359 } 5360 5361 /* 5362 * lock unlock the session, to wait ongoing session activities 5363 */ 5364 static void lock_unlock_session(struct ceph_mds_session *s) 5365 { 5366 mutex_lock(&s->s_mutex); 5367 mutex_unlock(&s->s_mutex); 5368 } 5369 5370 static void maybe_recover_session(struct ceph_mds_client *mdsc) 5371 { 5372 struct ceph_client *cl = mdsc->fsc->client; 5373 struct ceph_fs_client *fsc = mdsc->fsc; 5374 5375 if (!ceph_test_mount_opt(fsc, CLEANRECOVER)) 5376 return; 5377 5378 if (READ_ONCE(fsc->mount_state) != CEPH_MOUNT_MOUNTED) 5379 return; 5380 5381 if (!READ_ONCE(fsc->blocklisted)) 5382 return; 5383 5384 pr_info_client(cl, "auto reconnect after blocklisted\n"); 5385 
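/*
 * Re-establish the client's sessions from scratch; this path is only
 * taken when the recover_session=clean mount option (CLEANRECOVER,
 * checked above) is in effect.
 */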
ceph_force_reconnect(fsc->sb); 5386 } 5387 5388 bool check_session_state(struct ceph_mds_session *s) 5389 { 5390 struct ceph_client *cl = s->s_mdsc->fsc->client; 5391 5392 switch (s->s_state) { 5393 case CEPH_MDS_SESSION_OPEN: 5394 if (s->s_ttl && time_after(jiffies, s->s_ttl)) { 5395 s->s_state = CEPH_MDS_SESSION_HUNG; 5396 pr_info_client(cl, "mds%d hung\n", s->s_mds); 5397 } 5398 break; 5399 case CEPH_MDS_SESSION_CLOSING: 5400 case CEPH_MDS_SESSION_NEW: 5401 case CEPH_MDS_SESSION_RESTARTING: 5402 case CEPH_MDS_SESSION_CLOSED: 5403 case CEPH_MDS_SESSION_REJECTED: 5404 return false; 5405 } 5406 5407 return true; 5408 } 5409 5410 /* 5411 * If the sequence is incremented while we're waiting on a REQUEST_CLOSE reply, 5412 * then we need to retransmit that request. 5413 */ 5414 void inc_session_sequence(struct ceph_mds_session *s) 5415 { 5416 struct ceph_client *cl = s->s_mdsc->fsc->client; 5417 5418 lockdep_assert_held(&s->s_mutex); 5419 5420 s->s_seq++; 5421 5422 if (s->s_state == CEPH_MDS_SESSION_CLOSING) { 5423 int ret; 5424 5425 doutc(cl, "resending session close request for mds%d\n", s->s_mds); 5426 ret = request_close_session(s); 5427 if (ret < 0) 5428 pr_err_client(cl, "unable to close session to mds%d: %d\n", 5429 s->s_mds, ret); 5430 } 5431 } 5432 5433 /* 5434 * delayed work -- periodically trim expired leases, renew caps with mds. If 5435 * the @delay parameter is set to 0 or if it's more than 5 secs, the default 5436 * workqueue delay value of 5 secs will be used. 5437 */ 5438 static void schedule_delayed(struct ceph_mds_client *mdsc, unsigned long delay) 5439 { 5440 unsigned long max_delay = HZ * 5; 5441 5442 /* 5 secs default delay */ 5443 if (!delay || (delay > max_delay)) 5444 delay = max_delay; 5445 schedule_delayed_work(&mdsc->delayed_work, 5446 round_jiffies_relative(delay)); 5447 } 5448 5449 static void delayed_work(struct work_struct *work) 5450 { 5451 struct ceph_mds_client *mdsc = 5452 container_of(work, struct ceph_mds_client, delayed_work.work); 5453 unsigned long delay; 5454 int renew_interval; 5455 int renew_caps; 5456 int i; 5457 5458 doutc(mdsc->fsc->client, "mdsc delayed_work\n"); 5459 5460 if (mdsc->stopping >= CEPH_MDSC_STOPPING_FLUSHED) 5461 return; 5462 5463 mutex_lock(&mdsc->mutex); 5464 renew_interval = mdsc->mdsmap->m_session_timeout >> 2; 5465 renew_caps = time_after_eq(jiffies, HZ*renew_interval + 5466 mdsc->last_renew_caps); 5467 if (renew_caps) 5468 mdsc->last_renew_caps = jiffies; 5469 5470 for (i = 0; i < mdsc->max_sessions; i++) { 5471 struct ceph_mds_session *s = __ceph_lookup_mds_session(mdsc, i); 5472 if (!s) 5473 continue; 5474 5475 if (!check_session_state(s)) { 5476 ceph_put_mds_session(s); 5477 continue; 5478 } 5479 mutex_unlock(&mdsc->mutex); 5480 5481 ceph_flush_session_cap_releases(mdsc, s); 5482 5483 mutex_lock(&s->s_mutex); 5484 if (renew_caps) 5485 send_renew_caps(mdsc, s); 5486 else 5487 ceph_con_keepalive(&s->s_con); 5488 if (s->s_state == CEPH_MDS_SESSION_OPEN || 5489 s->s_state == CEPH_MDS_SESSION_HUNG) 5490 ceph_send_cap_releases(mdsc, s); 5491 mutex_unlock(&s->s_mutex); 5492 ceph_put_mds_session(s); 5493 5494 mutex_lock(&mdsc->mutex); 5495 } 5496 mutex_unlock(&mdsc->mutex); 5497 5498 delay = ceph_check_delayed_caps(mdsc); 5499 5500 ceph_queue_cap_reclaim_work(mdsc); 5501 5502 ceph_trim_snapid_map(mdsc); 5503 5504 maybe_recover_session(mdsc); 5505 5506 schedule_delayed(mdsc, delay); 5507 } 5508 5509 int ceph_mdsc_init(struct ceph_fs_client *fsc) 5510 5511 { 5512 struct ceph_mds_client *mdsc; 5513 int err; 5514 5515 mdsc = 
kzalloc(sizeof(struct ceph_mds_client), GFP_NOFS); 5516 if (!mdsc) 5517 return -ENOMEM; 5518 mdsc->fsc = fsc; 5519 mutex_init(&mdsc->mutex); 5520 mdsc->mdsmap = kzalloc(sizeof(*mdsc->mdsmap), GFP_NOFS); 5521 if (!mdsc->mdsmap) { 5522 err = -ENOMEM; 5523 goto err_mdsc; 5524 } 5525 5526 init_completion(&mdsc->safe_umount_waiters); 5527 spin_lock_init(&mdsc->stopping_lock); 5528 atomic_set(&mdsc->stopping_blockers, 0); 5529 init_completion(&mdsc->stopping_waiter); 5530 atomic64_set(&mdsc->dirty_folios, 0); 5531 init_waitqueue_head(&mdsc->flush_end_wq); 5532 init_waitqueue_head(&mdsc->session_close_wq); 5533 INIT_LIST_HEAD(&mdsc->waiting_for_map); 5534 mdsc->quotarealms_inodes = RB_ROOT; 5535 mutex_init(&mdsc->quotarealms_inodes_mutex); 5536 init_rwsem(&mdsc->snap_rwsem); 5537 mdsc->snap_realms = RB_ROOT; 5538 INIT_LIST_HEAD(&mdsc->snap_empty); 5539 spin_lock_init(&mdsc->snap_empty_lock); 5540 mdsc->request_tree = RB_ROOT; 5541 INIT_DELAYED_WORK(&mdsc->delayed_work, delayed_work); 5542 mdsc->last_renew_caps = jiffies; 5543 INIT_LIST_HEAD(&mdsc->cap_delay_list); 5544 #ifdef CONFIG_DEBUG_FS 5545 INIT_LIST_HEAD(&mdsc->cap_wait_list); 5546 #endif 5547 spin_lock_init(&mdsc->cap_delay_lock); 5548 INIT_LIST_HEAD(&mdsc->cap_unlink_delay_list); 5549 INIT_LIST_HEAD(&mdsc->snap_flush_list); 5550 spin_lock_init(&mdsc->snap_flush_lock); 5551 mdsc->last_cap_flush_tid = 1; 5552 INIT_LIST_HEAD(&mdsc->cap_flush_list); 5553 INIT_LIST_HEAD(&mdsc->cap_dirty_migrating); 5554 spin_lock_init(&mdsc->cap_dirty_lock); 5555 init_waitqueue_head(&mdsc->cap_flushing_wq); 5556 INIT_WORK(&mdsc->cap_reclaim_work, ceph_cap_reclaim_work); 5557 INIT_WORK(&mdsc->cap_unlink_work, ceph_cap_unlink_work); 5558 err = ceph_metric_init(&mdsc->metric); 5559 if (err) 5560 goto err_mdsmap; 5561 5562 spin_lock_init(&mdsc->dentry_list_lock); 5563 INIT_LIST_HEAD(&mdsc->dentry_leases); 5564 INIT_LIST_HEAD(&mdsc->dentry_dir_leases); 5565 5566 ceph_caps_init(mdsc); 5567 ceph_adjust_caps_max_min(mdsc, fsc->mount_options); 5568 5569 spin_lock_init(&mdsc->snapid_map_lock); 5570 mdsc->snapid_map_tree = RB_ROOT; 5571 INIT_LIST_HEAD(&mdsc->snapid_map_lru); 5572 5573 init_rwsem(&mdsc->pool_perm_rwsem); 5574 mdsc->pool_perm_tree = RB_ROOT; 5575 5576 strscpy(mdsc->nodename, utsname()->nodename, 5577 sizeof(mdsc->nodename)); 5578 5579 fsc->mdsc = mdsc; 5580 return 0; 5581 5582 err_mdsmap: 5583 kfree(mdsc->mdsmap); 5584 err_mdsc: 5585 kfree(mdsc); 5586 return err; 5587 } 5588 5589 /* 5590 * Wait for safe replies on open mds requests. If we time out, drop 5591 * all requests from the tree to avoid dangling dentry refs. 
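 * The wait below is bounded by the mount_timeout option.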
5592 */ 5593 static void wait_requests(struct ceph_mds_client *mdsc) 5594 { 5595 struct ceph_client *cl = mdsc->fsc->client; 5596 struct ceph_options *opts = mdsc->fsc->client->options; 5597 struct ceph_mds_request *req; 5598 5599 mutex_lock(&mdsc->mutex); 5600 if (__get_oldest_req(mdsc)) { 5601 mutex_unlock(&mdsc->mutex); 5602 5603 doutc(cl, "waiting for requests\n"); 5604 wait_for_completion_timeout(&mdsc->safe_umount_waiters, 5605 ceph_timeout_jiffies(opts->mount_timeout)); 5606 5607 /* tear down remaining requests */ 5608 mutex_lock(&mdsc->mutex); 5609 while ((req = __get_oldest_req(mdsc))) { 5610 doutc(cl, "timed out on tid %llu\n", req->r_tid); 5611 list_del_init(&req->r_wait); 5612 __unregister_request(mdsc, req); 5613 } 5614 } 5615 mutex_unlock(&mdsc->mutex); 5616 doutc(cl, "done\n"); 5617 } 5618 5619 void send_flush_mdlog(struct ceph_mds_session *s) 5620 { 5621 struct ceph_client *cl = s->s_mdsc->fsc->client; 5622 struct ceph_msg *msg; 5623 5624 /* 5625 * Pre-luminous MDS crashes when it sees an unknown session request 5626 */ 5627 if (!CEPH_HAVE_FEATURE(s->s_con.peer_features, SERVER_LUMINOUS)) 5628 return; 5629 5630 mutex_lock(&s->s_mutex); 5631 doutc(cl, "request mdlog flush to mds%d (%s)s seq %lld\n", 5632 s->s_mds, ceph_session_state_name(s->s_state), s->s_seq); 5633 msg = ceph_create_session_msg(CEPH_SESSION_REQUEST_FLUSH_MDLOG, 5634 s->s_seq); 5635 if (!msg) { 5636 pr_err_client(cl, "failed to request mdlog flush to mds%d (%s) seq %lld\n", 5637 s->s_mds, ceph_session_state_name(s->s_state), s->s_seq); 5638 } else { 5639 ceph_con_send(&s->s_con, msg); 5640 } 5641 mutex_unlock(&s->s_mutex); 5642 } 5643 5644 static int ceph_mds_auth_match(struct ceph_mds_client *mdsc, 5645 struct ceph_mds_cap_auth *auth, 5646 const struct cred *cred, 5647 char *tpath) 5648 { 5649 u32 caller_uid = from_kuid(&init_user_ns, cred->fsuid); 5650 u32 caller_gid = from_kgid(&init_user_ns, cred->fsgid); 5651 struct ceph_client *cl = mdsc->fsc->client; 5652 const char *spath = mdsc->fsc->mount_options->server_path; 5653 bool gid_matched = false; 5654 u32 gid, tlen, len; 5655 int i, j; 5656 5657 doutc(cl, "match.uid %lld\n", auth->match.uid); 5658 if (auth->match.uid != MDS_AUTH_UID_ANY) { 5659 if (auth->match.uid != caller_uid) 5660 return 0; 5661 if (auth->match.num_gids) { 5662 for (i = 0; i < auth->match.num_gids; i++) { 5663 if (caller_gid == auth->match.gids[i]) 5664 gid_matched = true; 5665 } 5666 if (!gid_matched && cred->group_info->ngroups) { 5667 for (i = 0; i < cred->group_info->ngroups; i++) { 5668 gid = from_kgid(&init_user_ns, 5669 cred->group_info->gid[i]); 5670 for (j = 0; j < auth->match.num_gids; j++) { 5671 if (gid == auth->match.gids[j]) { 5672 gid_matched = true; 5673 break; 5674 } 5675 } 5676 if (gid_matched) 5677 break; 5678 } 5679 } 5680 if (!gid_matched) 5681 return 0; 5682 } 5683 } 5684 5685 /* path match */ 5686 if (auth->match.path) { 5687 if (!tpath) 5688 return 0; 5689 5690 tlen = strlen(tpath); 5691 len = strlen(auth->match.path); 5692 if (len) { 5693 char *_tpath = tpath; 5694 bool free_tpath = false; 5695 int m, n; 5696 5697 doutc(cl, "server path %s, tpath %s, match.path %s\n", 5698 spath, tpath, auth->match.path); 5699 if (spath && (m = strlen(spath)) != 1) { 5700 /* mount path + '/' + tpath + an extra space */ 5701 n = m + 1 + tlen + 1; 5702 _tpath = kmalloc(n, GFP_NOFS); 5703 if (!_tpath) 5704 return -ENOMEM; 5705 /* remove the leading '/' */ 5706 snprintf(_tpath, n, "%s/%s", spath + 1, tpath); 5707 free_tpath = true; 5708 tlen = strlen(_tpath); 5709 } 5710 5711 /* 
5712 * Please note the trailing '/' for match.path has already
5713 * been removed when parsing.
5714 *
5715 * Remove the trailing '/' for the target path.
5716 */
5717 while (tlen && _tpath[tlen - 1] == '/') {
5718 _tpath[tlen - 1] = '\0';
5719 tlen -= 1;
5720 }
5721 doutc(cl, "_tpath %s\n", _tpath);
5722
5723 /*
5724 * In case first == _tpath && tlen == len:
5725 * match.path=/foo --> /foo _tpath=/foo --> match
5726 * match.path=/foo/ --> /foo _tpath=/foo --> match
5727 *
5728 * In case first == _tpath && tlen > len:
5729 * match.path=/foo/ --> /foo _tpath=/foo/ --> match
5730 * match.path=/foo --> /foo _tpath=/foo/ --> match
5731 * match.path=/foo/ --> /foo _tpath=/foo/d --> match
5732 * match.path=/foo --> /foo _tpath=/food --> mismatch
5733 *
5734 * All the other cases --> mismatch
5735 */
5736 bool path_matched = true;
5737 char *first = strstr(_tpath, auth->match.path);
5738 if (first != _tpath ||
5739 (tlen > len && _tpath[len] != '/')) {
5740 path_matched = false;
5741 }
5742
5743 if (free_tpath)
5744 kfree(_tpath);
5745
5746 if (!path_matched)
5747 return 0;
5748 }
5749 }
5750
5751 doutc(cl, "matched\n");
5752 return 1;
5753 }
5754
5755 int ceph_mds_check_access(struct ceph_mds_client *mdsc, char *tpath, int mask)
5756 {
5757 const struct cred *cred = get_current_cred();
5758 u32 caller_uid = from_kuid(&init_user_ns, cred->fsuid);
5759 u32 caller_gid = from_kgid(&init_user_ns, cred->fsgid);
5760 struct ceph_mds_cap_auth *rw_perms_s = NULL;
5761 struct ceph_client *cl = mdsc->fsc->client;
5762 bool root_squash_perms = true;
5763 int i, err;
5764
5765 doutc(cl, "tpath '%s', mask %d, caller_uid %d, caller_gid %d\n",
5766 tpath, mask, caller_uid, caller_gid);
5767
5768 for (i = 0; i < mdsc->s_cap_auths_num; i++) {
5769 struct ceph_mds_cap_auth *s = &mdsc->s_cap_auths[i];
5770
5771 err = ceph_mds_auth_match(mdsc, s, cred, tpath);
5772 if (err < 0) {
5773 put_cred(cred);
5774 return err;
5775 } else if (err > 0) {
5776 /* always follow the last auth caps' permission */
5777 root_squash_perms = true;
5778 rw_perms_s = NULL;
5779 if ((mask & MAY_WRITE) && s->writeable &&
5780 s->match.root_squash && (!caller_uid || !caller_gid))
5781 root_squash_perms = false;
5782
5783 if (((mask & MAY_WRITE) && !s->writeable) ||
5784 ((mask & MAY_READ) && !s->readable))
5785 rw_perms_s = s;
5786 }
5787 }
5788
5789 put_cred(cred);
5790
5791 doutc(cl, "root_squash_perms %d, rw_perms_s %p\n", root_squash_perms,
5792 rw_perms_s);
5793 if (root_squash_perms && rw_perms_s == NULL) {
5794 doutc(cl, "access allowed\n");
5795 return 0;
5796 }
5797
5798 if (!root_squash_perms) {
5799 doutc(cl, "root_squash is enabled and user(%d %d) isn't allowed to write",
5800 caller_uid, caller_gid);
5801 }
5802 if (rw_perms_s) {
5803 doutc(cl, "mds auth caps readable/writeable %d/%d while request r/w %d/%d",
5804 rw_perms_s->readable, rw_perms_s->writeable,
5805 !!(mask & MAY_READ), !!(mask & MAY_WRITE));
5806 }
5807 doutc(cl, "access denied\n");
5808 return -EACCES;
5809 }
5810
5811 /*
5812 * called before mount is ro, and before dentries are torn down.
5813 * (hmm, does this still race with new lookups?)
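 *
 * The sequence below: mark the mdsc as stopping, ask each MDS to
 * flush its log, flush our own dirty caps, wait for in-flight
 * requests, and finally flush the messenger workqueue so reply
 * handlers drop their inode/dcache refs.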
int ceph_mds_check_access(struct ceph_mds_client *mdsc, char *tpath, int mask)
{
	const struct cred *cred = get_current_cred();
	u32 caller_uid = from_kuid(&init_user_ns, cred->fsuid);
	u32 caller_gid = from_kgid(&init_user_ns, cred->fsgid);
	struct ceph_mds_cap_auth *rw_perms_s = NULL;
	struct ceph_client *cl = mdsc->fsc->client;
	bool root_squash_perms = true;
	int i, err;

	doutc(cl, "tpath '%s', mask %d, caller_uid %d, caller_gid %d\n",
	      tpath, mask, caller_uid, caller_gid);

	for (i = 0; i < mdsc->s_cap_auths_num; i++) {
		struct ceph_mds_cap_auth *s = &mdsc->s_cap_auths[i];

		err = ceph_mds_auth_match(mdsc, s, cred, tpath);
		if (err < 0) {
			put_cred(cred);
			return err;
		} else if (err > 0) {
			/* always follow the last auth caps' permission */
			root_squash_perms = true;
			rw_perms_s = NULL;
			if ((mask & MAY_WRITE) && s->writeable &&
			    s->match.root_squash && (!caller_uid || !caller_gid))
				root_squash_perms = false;

			if (((mask & MAY_WRITE) && !s->writeable) ||
			    ((mask & MAY_READ) && !s->readable))
				rw_perms_s = s;
		}
	}

	put_cred(cred);

	doutc(cl, "root_squash_perms %d, rw_perms_s %p\n", root_squash_perms,
	      rw_perms_s);
	if (root_squash_perms && rw_perms_s == NULL) {
		doutc(cl, "access allowed\n");
		return 0;
	}

	if (!root_squash_perms) {
		doutc(cl, "root_squash is enabled and user(%d %d) isn't allowed to write\n",
		      caller_uid, caller_gid);
	}
	if (rw_perms_s) {
		doutc(cl, "mds auth caps readable/writeable %d/%d while request r/w %d/%d\n",
		      rw_perms_s->readable, rw_perms_s->writeable,
		      !!(mask & MAY_READ), !!(mask & MAY_WRITE));
	}
	doutc(cl, "access denied\n");
	return -EACCES;
}
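
/*
 * Illustrative example (not compiled): how the loop above resolves a
 * hypothetical MDS cap such as
 *
 *	"allow rw path=/foo root_squash"
 *
 * Only the last matching entry counts.  A write from fsuid 0 or
 * fsgid 0 matches the entry but is then rejected by the root_squash
 * test; a read from root, or a write from an unprivileged user, is
 * allowed.
 */
#if 0
	/* entry: readable == writeable == true, match.root_squash == true */
	err = ceph_mds_check_access(mdsc, "foo/bar", MAY_READ);  /* 0 */
	err = ceph_mds_check_access(mdsc, "foo/bar", MAY_WRITE); /* -EACCES if caller is root */
#endif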
/*
 * called before mount is ro, and before dentries are torn down.
 * (hmm, does this still race with new lookups?)
 */
void ceph_mdsc_pre_umount(struct ceph_mds_client *mdsc)
{
	doutc(mdsc->fsc->client, "begin\n");
	mdsc->stopping = CEPH_MDSC_STOPPING_BEGIN;

	ceph_mdsc_iterate_sessions(mdsc, send_flush_mdlog, true);
	ceph_mdsc_iterate_sessions(mdsc, lock_unlock_session, false);
	ceph_flush_dirty_caps(mdsc);
	wait_requests(mdsc);

	/*
	 * wait for reply handlers to drop their request refs and
	 * their inode/dcache refs
	 */
	ceph_msgr_flush();

	ceph_cleanup_quotarealms_inodes(mdsc);
	doutc(mdsc->fsc->client, "done\n");
}

/*
 * flush the mdlog and wait for all write mds requests to flush.
 */
static void flush_mdlog_and_wait_mdsc_unsafe_requests(struct ceph_mds_client *mdsc,
						      u64 want_tid)
{
	struct ceph_client *cl = mdsc->fsc->client;
	struct ceph_mds_request *req = NULL, *nextreq;
	struct ceph_mds_session *last_session = NULL;
	struct rb_node *n;

	mutex_lock(&mdsc->mutex);
	doutc(cl, "want %lld\n", want_tid);
restart:
	req = __get_oldest_req(mdsc);
	while (req && req->r_tid <= want_tid) {
		/* find next request */
		n = rb_next(&req->r_node);
		if (n)
			nextreq = rb_entry(n, struct ceph_mds_request, r_node);
		else
			nextreq = NULL;
		if (req->r_op != CEPH_MDS_OP_SETFILELOCK &&
		    (req->r_op & CEPH_MDS_OP_WRITE)) {
			struct ceph_mds_session *s = req->r_session;

			if (!s) {
				req = nextreq;
				continue;
			}

			/* write op */
			ceph_mdsc_get_request(req);
			if (nextreq)
				ceph_mdsc_get_request(nextreq);
			s = ceph_get_mds_session(s);
			mutex_unlock(&mdsc->mutex);

			/* send flush mdlog request to MDS */
			if (last_session != s) {
				send_flush_mdlog(s);
				ceph_put_mds_session(last_session);
				last_session = s;
			} else {
				ceph_put_mds_session(s);
			}
			doutc(cl, "wait on %llu (want %llu)\n",
			      req->r_tid, want_tid);
			wait_for_completion(&req->r_safe_completion);

			mutex_lock(&mdsc->mutex);
			ceph_mdsc_put_request(req);
			if (!nextreq)
				break; /* next doesn't exist, so we're done! */
			if (RB_EMPTY_NODE(&nextreq->r_node)) {
				/* next request was removed from tree */
				ceph_mdsc_put_request(nextreq);
				goto restart;
			}
			ceph_mdsc_put_request(nextreq); /* won't go away */
		}
		req = nextreq;
	}
	mutex_unlock(&mdsc->mutex);
	ceph_put_mds_session(last_session);
	doutc(cl, "done\n");
}

void ceph_mdsc_sync(struct ceph_mds_client *mdsc)
{
	struct ceph_client *cl = mdsc->fsc->client;
	u64 want_tid, want_flush;

	if (READ_ONCE(mdsc->fsc->mount_state) >= CEPH_MOUNT_SHUTDOWN)
		return;

	doutc(cl, "sync\n");
	mutex_lock(&mdsc->mutex);
	want_tid = mdsc->last_tid;
	mutex_unlock(&mdsc->mutex);

	ceph_flush_dirty_caps(mdsc);
	ceph_flush_cap_releases(mdsc);
	spin_lock(&mdsc->cap_dirty_lock);
	want_flush = mdsc->last_cap_flush_tid;
	if (!list_empty(&mdsc->cap_flush_list)) {
		struct ceph_cap_flush *cf =
			list_last_entry(&mdsc->cap_flush_list,
					struct ceph_cap_flush, g_list);
		cf->wake = true;
	}
	spin_unlock(&mdsc->cap_dirty_lock);

	doutc(cl, "sync want tid %lld flush_seq %lld\n", want_tid, want_flush);

	flush_mdlog_and_wait_mdsc_unsafe_requests(mdsc, want_tid);
	wait_caps_flush(mdsc, want_flush);
}

/*
 * true if all sessions are closed, or we force unmount
 */
static bool done_closing_sessions(struct ceph_mds_client *mdsc, int skipped)
{
	if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_SHUTDOWN)
		return true;
	return atomic_read(&mdsc->num_sessions) <= skipped;
}

/*
 * called after sb is ro or when metadata corrupted.
 */
void ceph_mdsc_close_sessions(struct ceph_mds_client *mdsc)
{
	struct ceph_options *opts = mdsc->fsc->client->options;
	struct ceph_client *cl = mdsc->fsc->client;
	struct ceph_mds_session *session;
	int i;
	int skipped = 0;

	doutc(cl, "begin\n");

	/* close sessions */
	mutex_lock(&mdsc->mutex);
	for (i = 0; i < mdsc->max_sessions; i++) {
		session = __ceph_lookup_mds_session(mdsc, i);
		if (!session)
			continue;
		mutex_unlock(&mdsc->mutex);
		mutex_lock(&session->s_mutex);
		if (__close_session(mdsc, session) <= 0)
			skipped++;
		mutex_unlock(&session->s_mutex);
		ceph_put_mds_session(session);
		mutex_lock(&mdsc->mutex);
	}
	mutex_unlock(&mdsc->mutex);

	doutc(cl, "waiting for sessions to close\n");
	wait_event_timeout(mdsc->session_close_wq,
			   done_closing_sessions(mdsc, skipped),
			   ceph_timeout_jiffies(opts->mount_timeout));

	/* tear down remaining sessions */
	mutex_lock(&mdsc->mutex);
	for (i = 0; i < mdsc->max_sessions; i++) {
		if (mdsc->sessions[i]) {
			session = ceph_get_mds_session(mdsc->sessions[i]);
			__unregister_session(mdsc, session);
			mutex_unlock(&mdsc->mutex);
			mutex_lock(&session->s_mutex);
			remove_session_caps(session);
			mutex_unlock(&session->s_mutex);
			ceph_put_mds_session(session);
			mutex_lock(&mdsc->mutex);
		}
	}
	WARN_ON(!list_empty(&mdsc->cap_delay_list));
	mutex_unlock(&mdsc->mutex);

	ceph_cleanup_snapid_map(mdsc);
	ceph_cleanup_global_and_empty_realms(mdsc);

	cancel_work_sync(&mdsc->cap_reclaim_work);
	cancel_work_sync(&mdsc->cap_unlink_work);
	cancel_delayed_work_sync(&mdsc->delayed_work); /* cancel timer */

	doutc(cl, "done\n");
}
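
/*
 * Illustrative sketch (not compiled) of the traversal pattern used by
 * flush_mdlog_and_wait_mdsc_unsafe_requests() above.  Waiting requires
 * dropping mdsc->mutex, so both the current and the next node are
 * pinned with references first; if the next node was unregistered
 * while the lock was dropped, the walk restarts from the oldest
 * request.  All names below are placeholders.
 */
#if 0
restart:
	node = oldest(tree);
	while (node) {
		next = successor(node);
		get(node);
		if (next)
			get(next);
		unlock(&lock);
		wait_on(node);			/* sleeps; tree may change */
		lock(&lock);
		put(node);
		if (next && removed_from_tree(next)) {
			put(next);
			goto restart;		/* lost our place */
		}
		if (next)
			put(next);		/* still registered, keep going */
		node = next;
	}
#endif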
void ceph_mdsc_force_umount(struct ceph_mds_client *mdsc)
{
	struct ceph_mds_session *session;
	int mds;

	doutc(mdsc->fsc->client, "force umount\n");

	mutex_lock(&mdsc->mutex);
	for (mds = 0; mds < mdsc->max_sessions; mds++) {
		session = __ceph_lookup_mds_session(mdsc, mds);
		if (!session)
			continue;

		if (session->s_state == CEPH_MDS_SESSION_REJECTED)
			__unregister_session(mdsc, session);
		__wake_requests(mdsc, &session->s_waiting);
		mutex_unlock(&mdsc->mutex);

		mutex_lock(&session->s_mutex);
		__close_session(mdsc, session);
		if (session->s_state == CEPH_MDS_SESSION_CLOSING) {
			cleanup_session_requests(mdsc, session);
			remove_session_caps(session);
		}
		mutex_unlock(&session->s_mutex);
		ceph_put_mds_session(session);

		mutex_lock(&mdsc->mutex);
		kick_requests(mdsc, mds);
	}
	__wake_requests(mdsc, &mdsc->waiting_for_map);
	mutex_unlock(&mdsc->mutex);
}

static void ceph_mdsc_stop(struct ceph_mds_client *mdsc)
{
	doutc(mdsc->fsc->client, "stop\n");
	/*
	 * Make sure the delayed work has stopped before releasing
	 * the resources.
	 *
	 * cancel_delayed_work_sync() only guarantees that any work
	 * currently executing finishes; the delayed work can re-arm
	 * itself after that, so flush it here as well.
	 */
	flush_delayed_work(&mdsc->delayed_work);

	if (mdsc->mdsmap)
		ceph_mdsmap_destroy(mdsc->mdsmap);
	kfree(mdsc->sessions);
	ceph_caps_finalize(mdsc);

	if (mdsc->s_cap_auths) {
		int i;

		for (i = 0; i < mdsc->s_cap_auths_num; i++) {
			kfree(mdsc->s_cap_auths[i].match.gids);
			kfree(mdsc->s_cap_auths[i].match.path);
			kfree(mdsc->s_cap_auths[i].match.fs_name);
		}
		kfree(mdsc->s_cap_auths);
	}

	ceph_pool_perm_destroy(mdsc);
}

void ceph_mdsc_destroy(struct ceph_fs_client *fsc)
{
	struct ceph_mds_client *mdsc = fsc->mdsc;

	doutc(fsc->client, "%p\n", mdsc);

	if (!mdsc)
		return;

	/* flush out any connection work with references to us */
	ceph_msgr_flush();

	ceph_mdsc_stop(mdsc);

	ceph_metric_destroy(&mdsc->metric);

	fsc->mdsc = NULL;
	kfree(mdsc);
	doutc(fsc->client, "%p done\n", mdsc);
}
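
/*
 * Illustrative sketch (not compiled): the self re-arming shape that the
 * comment in ceph_mdsc_stop() above refers to.  A hypothetical handler
 * like this can queue itself again, so teardown flushes the work once
 * more before freeing anything the handler touches.
 */
#if 0
static void example_delayed_work(struct work_struct *work)
{
	struct ceph_mds_client *mdsc =
		container_of(work, struct ceph_mds_client,
			     delayed_work.work);

	/* ... periodic housekeeping ... */

	/* re-arm: run again in roughly five seconds */
	schedule_delayed_work(&mdsc->delayed_work,
			      round_jiffies_relative(5 * HZ));
}
#endif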
void ceph_mdsc_handle_fsmap(struct ceph_mds_client *mdsc, struct ceph_msg *msg)
{
	struct ceph_fs_client *fsc = mdsc->fsc;
	struct ceph_client *cl = fsc->client;
	const char *mds_namespace = fsc->mount_options->mds_namespace;
	void *p = msg->front.iov_base;
	void *end = p + msg->front.iov_len;
	u32 epoch;
	u32 num_fs;
	u32 mount_fscid = (u32)-1;
	int err = -EINVAL;

	ceph_decode_need(&p, end, sizeof(u32), bad);
	epoch = ceph_decode_32(&p);

	doutc(cl, "epoch %u\n", epoch);

	/* struct_v, struct_cv, map_len, epoch, legacy_client_fscid */
	ceph_decode_skip_n(&p, end, 2 + sizeof(u32) * 3, bad);

	ceph_decode_32_safe(&p, end, num_fs, bad);
	while (num_fs-- > 0) {
		void *info_p, *info_end;
		u32 info_len;
		u32 fscid, namelen;

		ceph_decode_need(&p, end, 2 + sizeof(u32), bad);
		p += 2;		/* info_v, info_cv */
		info_len = ceph_decode_32(&p);
		ceph_decode_need(&p, end, info_len, bad);
		info_p = p;
		info_end = p + info_len;
		p = info_end;

		ceph_decode_need(&info_p, info_end, sizeof(u32) * 2, bad);
		fscid = ceph_decode_32(&info_p);
		namelen = ceph_decode_32(&info_p);
		ceph_decode_need(&info_p, info_end, namelen, bad);

		if (mds_namespace &&
		    strlen(mds_namespace) == namelen &&
		    !strncmp(mds_namespace, (char *)info_p, namelen)) {
			mount_fscid = fscid;
			break;
		}
	}

	ceph_monc_got_map(&fsc->client->monc, CEPH_SUB_FSMAP, epoch);
	if (mount_fscid != (u32)-1) {
		fsc->client->monc.fs_cluster_id = mount_fscid;
		ceph_monc_want_map(&fsc->client->monc, CEPH_SUB_MDSMAP,
				   0, true);
		ceph_monc_renew_subs(&fsc->client->monc);
	} else {
		err = -ENOENT;
		goto err_out;
	}
	return;

bad:
	pr_err_client(cl, "error decoding fsmap %d. Shutting down mount.\n",
		      err);
	ceph_umount_begin(mdsc->fsc->sb);
	ceph_msg_dump(msg);
err_out:
	mutex_lock(&mdsc->mutex);
	mdsc->mdsmap_err = err;
	__wake_requests(mdsc, &mdsc->waiting_for_map);
	mutex_unlock(&mdsc->mutex);
}

/*
 * handle mds map update.
 */
void ceph_mdsc_handle_mdsmap(struct ceph_mds_client *mdsc, struct ceph_msg *msg)
{
	struct ceph_client *cl = mdsc->fsc->client;
	u32 epoch;
	u32 maplen;
	void *p = msg->front.iov_base;
	void *end = p + msg->front.iov_len;
	struct ceph_mdsmap *newmap, *oldmap;
	struct ceph_fsid fsid;
	int err = -EINVAL;

	ceph_decode_need(&p, end, sizeof(fsid) + 2 * sizeof(u32), bad);
	ceph_decode_copy(&p, &fsid, sizeof(fsid));
	if (ceph_check_fsid(mdsc->fsc->client, &fsid) < 0)
		return;
	epoch = ceph_decode_32(&p);
	maplen = ceph_decode_32(&p);
	doutc(cl, "epoch %u len %d\n", epoch, (int)maplen);

	/* do we need it? */
	mutex_lock(&mdsc->mutex);
	if (mdsc->mdsmap && epoch <= mdsc->mdsmap->m_epoch) {
		doutc(cl, "epoch %u <= our %u\n", epoch, mdsc->mdsmap->m_epoch);
		mutex_unlock(&mdsc->mutex);
		return;
	}

	newmap = ceph_mdsmap_decode(mdsc, &p, end, ceph_msgr2(mdsc->fsc->client));
	if (IS_ERR(newmap)) {
		err = PTR_ERR(newmap);
		goto bad_unlock;
	}

	/* swap into place */
	if (mdsc->mdsmap) {
		oldmap = mdsc->mdsmap;
		mdsc->mdsmap = newmap;
		check_new_map(mdsc, newmap, oldmap);
		ceph_mdsmap_destroy(oldmap);
	} else {
		mdsc->mdsmap = newmap;	/* first mds map */
	}
	mdsc->fsc->max_file_size = min((loff_t)mdsc->mdsmap->m_max_file_size,
				       MAX_LFS_FILESIZE);

	__wake_requests(mdsc, &mdsc->waiting_for_map);
	ceph_monc_got_map(&mdsc->fsc->client->monc, CEPH_SUB_MDSMAP,
			  mdsc->mdsmap->m_epoch);

	mutex_unlock(&mdsc->mutex);
	schedule_delayed(mdsc, 0);
	return;

bad_unlock:
	mutex_unlock(&mdsc->mutex);
bad:
	pr_err_client(cl, "error decoding mdsmap %d. Shutting down mount.\n",
		      err);
	ceph_umount_begin(mdsc->fsc->sb);
	ceph_msg_dump(msg);
}

static struct ceph_connection *mds_get_con(struct ceph_connection *con)
{
	struct ceph_mds_session *s = con->private;

	if (ceph_get_mds_session(s))
		return con;
	return NULL;
}

static void mds_put_con(struct ceph_connection *con)
{
	struct ceph_mds_session *s = con->private;

	ceph_put_mds_session(s);
}
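
/*
 * Note on ceph_mdsc_handle_mdsmap() above: map updates are strictly
 * epoch ordered.  A map whose epoch is <= the one we already hold is
 * ignored, and a newer map is fully decoded before being swapped into
 * place under mdsc->mutex, so anyone holding the mutex always sees a
 * complete, consistent mdsmap.
 */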
/*
 * if the client is unresponsive for long enough, the mds will kill
 * the session entirely.
 */
static void mds_peer_reset(struct ceph_connection *con)
{
	struct ceph_mds_session *s = con->private;
	struct ceph_mds_client *mdsc = s->s_mdsc;

	pr_warn_client(mdsc->fsc->client, "mds%d closed our session\n",
		       s->s_mds);
	if (READ_ONCE(mdsc->fsc->mount_state) != CEPH_MOUNT_FENCE_IO &&
	    ceph_mdsmap_get_state(mdsc->mdsmap, s->s_mds) >= CEPH_MDS_STATE_RECONNECT)
		send_mds_reconnect(mdsc, s);
}

static void mds_dispatch(struct ceph_connection *con, struct ceph_msg *msg)
{
	struct ceph_mds_session *s = con->private;
	struct ceph_mds_client *mdsc = s->s_mdsc;
	struct ceph_client *cl = mdsc->fsc->client;
	int type = le16_to_cpu(msg->hdr.type);

	mutex_lock(&mdsc->mutex);
	if (__verify_registered_session(mdsc, s) < 0) {
		mutex_unlock(&mdsc->mutex);
		goto out;
	}
	mutex_unlock(&mdsc->mutex);

	switch (type) {
	case CEPH_MSG_MDS_MAP:
		ceph_mdsc_handle_mdsmap(mdsc, msg);
		break;
	case CEPH_MSG_FS_MAP_USER:
		ceph_mdsc_handle_fsmap(mdsc, msg);
		break;
	case CEPH_MSG_CLIENT_SESSION:
		handle_session(s, msg);
		break;
	case CEPH_MSG_CLIENT_REPLY:
		handle_reply(s, msg);
		break;
	case CEPH_MSG_CLIENT_REQUEST_FORWARD:
		handle_forward(mdsc, s, msg);
		break;
	case CEPH_MSG_CLIENT_CAPS:
		ceph_handle_caps(s, msg);
		break;
	case CEPH_MSG_CLIENT_SNAP:
		ceph_handle_snap(mdsc, s, msg);
		break;
	case CEPH_MSG_CLIENT_LEASE:
		handle_lease(mdsc, s, msg);
		break;
	case CEPH_MSG_CLIENT_QUOTA:
		ceph_handle_quota(mdsc, s, msg);
		break;
	default:
		pr_err_client(cl, "received unknown message type %d %s\n",
			      type, ceph_msg_type_name(type));
	}
out:
	ceph_msg_put(msg);
}
/*
 * authentication
 */

/*
 * Note: returned pointer is the address of a structure that's
 * managed separately.  Caller must *not* attempt to free it.
 */
static struct ceph_auth_handshake *
mds_get_authorizer(struct ceph_connection *con, int *proto, int force_new)
{
	struct ceph_mds_session *s = con->private;
	struct ceph_mds_client *mdsc = s->s_mdsc;
	struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth;
	struct ceph_auth_handshake *auth = &s->s_auth;
	int ret;

	ret = __ceph_auth_get_authorizer(ac, auth, CEPH_ENTITY_TYPE_MDS,
					 force_new, proto, NULL, NULL);
	if (ret)
		return ERR_PTR(ret);

	return auth;
}

static int mds_add_authorizer_challenge(struct ceph_connection *con,
				void *challenge_buf, int challenge_buf_len)
{
	struct ceph_mds_session *s = con->private;
	struct ceph_mds_client *mdsc = s->s_mdsc;
	struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth;

	return ceph_auth_add_authorizer_challenge(ac, s->s_auth.authorizer,
					challenge_buf, challenge_buf_len);
}

static int mds_verify_authorizer_reply(struct ceph_connection *con)
{
	struct ceph_mds_session *s = con->private;
	struct ceph_mds_client *mdsc = s->s_mdsc;
	struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth;
	struct ceph_auth_handshake *auth = &s->s_auth;

	return ceph_auth_verify_authorizer_reply(ac, auth->authorizer,
		auth->authorizer_reply_buf, auth->authorizer_reply_buf_len,
		NULL, NULL, NULL, NULL);
}

static int mds_invalidate_authorizer(struct ceph_connection *con)
{
	struct ceph_mds_session *s = con->private;
	struct ceph_mds_client *mdsc = s->s_mdsc;
	struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth;

	ceph_auth_invalidate_authorizer(ac, CEPH_ENTITY_TYPE_MDS);

	return ceph_monc_validate_auth(&mdsc->fsc->client->monc);
}

static int mds_get_auth_request(struct ceph_connection *con,
				void *buf, int *buf_len,
				void **authorizer, int *authorizer_len)
{
	struct ceph_mds_session *s = con->private;
	struct ceph_auth_client *ac = s->s_mdsc->fsc->client->monc.auth;
	struct ceph_auth_handshake *auth = &s->s_auth;
	int ret;

	ret = ceph_auth_get_authorizer(ac, auth, CEPH_ENTITY_TYPE_MDS,
				       buf, buf_len);
	if (ret)
		return ret;

	*authorizer = auth->authorizer_buf;
	*authorizer_len = auth->authorizer_buf_len;
	return 0;
}

static int mds_handle_auth_reply_more(struct ceph_connection *con,
				      void *reply, int reply_len,
				      void *buf, int *buf_len,
				      void **authorizer, int *authorizer_len)
{
	struct ceph_mds_session *s = con->private;
	struct ceph_auth_client *ac = s->s_mdsc->fsc->client->monc.auth;
	struct ceph_auth_handshake *auth = &s->s_auth;
	int ret;

	ret = ceph_auth_handle_svc_reply_more(ac, auth, reply, reply_len,
					      buf, buf_len);
	if (ret)
		return ret;

	*authorizer = auth->authorizer_buf;
	*authorizer_len = auth->authorizer_buf_len;
	return 0;
}

static int mds_handle_auth_done(struct ceph_connection *con,
				u64 global_id, void *reply, int reply_len,
				u8 *session_key, int *session_key_len,
				u8 *con_secret, int *con_secret_len)
{
	struct ceph_mds_session *s = con->private;
	struct ceph_auth_client *ac = s->s_mdsc->fsc->client->monc.auth;
	struct ceph_auth_handshake *auth = &s->s_auth;

	return ceph_auth_handle_svc_reply_done(ac, auth, reply, reply_len,
					       session_key, session_key_len,
					       con_secret, con_secret_len);
}
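
/*
 * Note on the callbacks above and below: get_authorizer,
 * add_authorizer_challenge, verify_authorizer_reply and
 * invalidate_authorizer serve the original (v1) messenger handshake,
 * while get_auth_request, handle_auth_reply_more, handle_auth_done and
 * handle_auth_bad_method serve the msgr2 protocol.  Both families wrap
 * the same per-session ceph_auth_handshake state in s->s_auth; only one
 * family is exercised on a given connection, depending on which
 * protocol was negotiated.
 */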
static int mds_handle_auth_bad_method(struct ceph_connection *con,
				      int used_proto, int result,
				      const int *allowed_protos, int proto_cnt,
				      const int *allowed_modes, int mode_cnt)
{
	struct ceph_mds_session *s = con->private;
	struct ceph_mon_client *monc = &s->s_mdsc->fsc->client->monc;
	int ret;

	if (ceph_auth_handle_bad_authorizer(monc->auth, CEPH_ENTITY_TYPE_MDS,
					    used_proto, result,
					    allowed_protos, proto_cnt,
					    allowed_modes, mode_cnt)) {
		ret = ceph_monc_validate_auth(monc);
		if (ret)
			return ret;
	}

	return -EACCES;
}

static struct ceph_msg *mds_alloc_msg(struct ceph_connection *con,
				struct ceph_msg_header *hdr, int *skip)
{
	struct ceph_msg *msg;
	int type = (int) le16_to_cpu(hdr->type);
	int front_len = (int) le32_to_cpu(hdr->front_len);

	if (con->in_msg)
		return con->in_msg;

	*skip = 0;
	msg = ceph_msg_new(type, front_len, GFP_NOFS, false);
	if (!msg) {
		pr_err("unable to allocate msg type %d len %d\n",
		       type, front_len);
		return NULL;
	}

	return msg;
}

static int mds_sign_message(struct ceph_msg *msg)
{
	struct ceph_mds_session *s = msg->con->private;
	struct ceph_auth_handshake *auth = &s->s_auth;

	return ceph_auth_sign_message(auth, msg);
}

static int mds_check_message_signature(struct ceph_msg *msg)
{
	struct ceph_mds_session *s = msg->con->private;
	struct ceph_auth_handshake *auth = &s->s_auth;

	return ceph_auth_check_message_signature(auth, msg);
}

static const struct ceph_connection_operations mds_con_ops = {
	.get = mds_get_con,
	.put = mds_put_con,
	.alloc_msg = mds_alloc_msg,
	.dispatch = mds_dispatch,
	.peer_reset = mds_peer_reset,
	.get_authorizer = mds_get_authorizer,
	.add_authorizer_challenge = mds_add_authorizer_challenge,
	.verify_authorizer_reply = mds_verify_authorizer_reply,
	.invalidate_authorizer = mds_invalidate_authorizer,
	.sign_message = mds_sign_message,
	.check_message_signature = mds_check_message_signature,
	.get_auth_request = mds_get_auth_request,
	.handle_auth_reply_more = mds_handle_auth_reply_more,
	.handle_auth_done = mds_handle_auth_done,
	.handle_auth_bad_method = mds_handle_auth_bad_method,
};

/* eof */