// SPDX-License-Identifier: GPL-2.0
#include <linux/ceph/ceph_debug.h>

#include <linux/fs.h>
#include <linux/wait.h>
#include <linux/slab.h>
#include <linux/gfp.h>
#include <linux/sched.h>
#include <linux/debugfs.h>
#include <linux/seq_file.h>
#include <linux/ratelimit.h>
#include <linux/bits.h>
#include <linux/ktime.h>
#include <linux/bitmap.h>
#include <linux/mnt_idmapping.h>

#include "super.h"
#include "mds_client.h"
#include "crypto.h"

#include <linux/ceph/ceph_features.h>
#include <linux/ceph/messenger.h>
#include <linux/ceph/decode.h>
#include <linux/ceph/pagelist.h>
#include <linux/ceph/auth.h>
#include <linux/ceph/debugfs.h>
#include <trace/events/ceph.h>

#define RECONNECT_MAX_SIZE (INT_MAX - PAGE_SIZE)

/*
 * A cluster of MDS (metadata server) daemons is responsible for
 * managing the file system namespace (the directory hierarchy and
 * inodes) and for coordinating shared access to storage.  Metadata is
 * partitioned hierarchically across a number of servers, and that
 * partition varies over time as the cluster adjusts the distribution
 * in order to balance load.
 *
 * The MDS client is primarily responsible for managing synchronous
 * metadata requests for operations like open, unlink, and so forth.
 * If there is an MDS failure, we find out about it when we (possibly
 * request and) receive a new MDS map, and can resubmit affected
 * requests.
 *
 * For the most part, though, we take advantage of a lossless
 * communications channel to the MDS, and do not need to worry about
 * timing out or resubmitting requests.
 *
 * We maintain a stateful "session" with each MDS we interact with.
 * Within each session, we send periodic heartbeat messages to ensure
 * any capabilities or leases we have been issued remain valid.  If
 * the session times out and goes stale, our leases and capabilities
 * are no longer valid.
 */

struct ceph_reconnect_state {
	struct ceph_mds_session *session;
	int nr_caps, nr_realms;
	struct ceph_pagelist *pagelist;
	unsigned msg_version;
	bool allow_multi;
};

static void __wake_requests(struct ceph_mds_client *mdsc,
			    struct list_head *head);
static void ceph_cap_release_work(struct work_struct *work);
static void ceph_cap_reclaim_work(struct work_struct *work);

static const struct ceph_connection_operations mds_con_ops;


/*
 * mds reply parsing
 */

static int parse_reply_info_quota(void **p, void *end,
				  struct ceph_mds_reply_info_in *info)
{
	u8 struct_v, struct_compat;
	u32 struct_len;

	ceph_decode_8_safe(p, end, struct_v, bad);
	ceph_decode_8_safe(p, end, struct_compat, bad);
	/* struct_v is expected to be >= 1. we only
	 * understand encoding with struct_compat == 1. */
	if (!struct_v || struct_compat != 1)
		goto bad;
	ceph_decode_32_safe(p, end, struct_len, bad);
	ceph_decode_need(p, end, struct_len, bad);
	end = *p + struct_len;
	ceph_decode_64_safe(p, end, info->max_bytes, bad);
	ceph_decode_64_safe(p, end, info->max_files, bad);
	*p = end;
	return 0;
bad:
	return -EIO;
}
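
/*
 * Most of the parsers below share the versioned-encoding envelope seen
 * above: a one-byte struct version, a one-byte compat version, and a
 * 32-bit payload length, followed by struct_len bytes of payload.  A
 * decoder that only cares about the envelope looks roughly like this
 * (an illustrative sketch, not a helper used in this file):
 *
 *	u8 struct_v, struct_compat;
 *	u32 struct_len;
 *
 *	ceph_decode_8_safe(p, end, struct_v, bad);
 *	ceph_decode_8_safe(p, end, struct_compat, bad);
 *	ceph_decode_32_safe(p, end, struct_len, bad);
 *	ceph_decode_need(p, end, struct_len, bad);
 *	end = *p + struct_len;	// clamp decoding to this struct
 *	...decode fields, gating optional ones on struct_v...
 *	*p = end;		// skip fields we don't understand
 *
 * Clamping "end" and then jumping to it on the way out lets an old
 * client safely skip fields appended by a newer MDS.
 */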

/*
 * parse individual inode info
 */
static int parse_reply_info_in(void **p, void *end,
			       struct ceph_mds_reply_info_in *info,
			       u64 features)
{
	int err = 0;
	u8 struct_v = 0;

	if (features == (u64)-1) {
		u32 struct_len;
		u8 struct_compat;
		ceph_decode_8_safe(p, end, struct_v, bad);
		ceph_decode_8_safe(p, end, struct_compat, bad);
		/* struct_v is expected to be >= 1. we only understand
		 * encoding with struct_compat == 1. */
		if (!struct_v || struct_compat != 1)
			goto bad;
		ceph_decode_32_safe(p, end, struct_len, bad);
		ceph_decode_need(p, end, struct_len, bad);
		end = *p + struct_len;
	}

	ceph_decode_need(p, end, sizeof(struct ceph_mds_reply_inode), bad);
	info->in = *p;
	*p += sizeof(struct ceph_mds_reply_inode) +
		sizeof(*info->in->fragtree.splits) *
		le32_to_cpu(info->in->fragtree.nsplits);

	ceph_decode_32_safe(p, end, info->symlink_len, bad);
	ceph_decode_need(p, end, info->symlink_len, bad);
	info->symlink = *p;
	*p += info->symlink_len;

	ceph_decode_copy_safe(p, end, &info->dir_layout,
			      sizeof(info->dir_layout), bad);
	ceph_decode_32_safe(p, end, info->xattr_len, bad);
	ceph_decode_need(p, end, info->xattr_len, bad);
	info->xattr_data = *p;
	*p += info->xattr_len;

	if (features == (u64)-1) {
		/* inline data */
		ceph_decode_64_safe(p, end, info->inline_version, bad);
		ceph_decode_32_safe(p, end, info->inline_len, bad);
		ceph_decode_need(p, end, info->inline_len, bad);
		info->inline_data = *p;
		*p += info->inline_len;
		/* quota */
		err = parse_reply_info_quota(p, end, info);
		if (err < 0)
			goto out_bad;
		/* pool namespace */
		ceph_decode_32_safe(p, end, info->pool_ns_len, bad);
		if (info->pool_ns_len > 0) {
			ceph_decode_need(p, end, info->pool_ns_len, bad);
			info->pool_ns_data = *p;
			*p += info->pool_ns_len;
		}

		/* btime */
		ceph_decode_need(p, end, sizeof(info->btime), bad);
		ceph_decode_copy(p, &info->btime, sizeof(info->btime));

		/* change attribute */
		ceph_decode_64_safe(p, end, info->change_attr, bad);

		/* dir pin */
		if (struct_v >= 2) {
			ceph_decode_32_safe(p, end, info->dir_pin, bad);
		} else {
			info->dir_pin = -ENODATA;
		}

		/* snapshot birth time, remains zero for v<=2 */
		if (struct_v >= 3) {
			ceph_decode_need(p, end, sizeof(info->snap_btime), bad);
			ceph_decode_copy(p, &info->snap_btime,
					 sizeof(info->snap_btime));
		} else {
			memset(&info->snap_btime, 0, sizeof(info->snap_btime));
		}

		/* snapshot count, remains zero for v<=3 */
		if (struct_v >= 4) {
			ceph_decode_64_safe(p, end, info->rsnaps, bad);
		} else {
			info->rsnaps = 0;
		}

		if (struct_v >= 5) {
			u32 alen;

			ceph_decode_32_safe(p, end, alen, bad);

			while (alen--) {
				u32 len;

				/* key */
				ceph_decode_32_safe(p, end, len, bad);
				ceph_decode_skip_n(p, end, len, bad);
				/* value */
				ceph_decode_32_safe(p, end, len, bad);
				ceph_decode_skip_n(p, end, len, bad);
			}
		}

		/* fscrypt flag -- ignore */
		if (struct_v >= 6)
			ceph_decode_skip_8(p, end, bad);

		info->fscrypt_auth = NULL;
		info->fscrypt_auth_len = 0;
		info->fscrypt_file = NULL;
		info->fscrypt_file_len = 0;
		if (struct_v >= 7) {
			ceph_decode_32_safe(p, end, info->fscrypt_auth_len, bad);
			if (info->fscrypt_auth_len) {
				info->fscrypt_auth = kmalloc(info->fscrypt_auth_len,
							     GFP_KERNEL);
				if (!info->fscrypt_auth)
					return -ENOMEM;
				ceph_decode_copy_safe(p, end, info->fscrypt_auth,
						      info->fscrypt_auth_len, bad);
			}
			ceph_decode_32_safe(p, end, info->fscrypt_file_len, bad);
			if (info->fscrypt_file_len) {
				info->fscrypt_file = kmalloc(info->fscrypt_file_len,
							     GFP_KERNEL);
				if (!info->fscrypt_file)
					return -ENOMEM;
				ceph_decode_copy_safe(p, end, info->fscrypt_file,
						      info->fscrypt_file_len, bad);
			}
		}
		*p = end;
	} else {
		/* legacy (unversioned) struct */
		if (features & CEPH_FEATURE_MDS_INLINE_DATA) {
			ceph_decode_64_safe(p, end, info->inline_version, bad);
			ceph_decode_32_safe(p, end, info->inline_len, bad);
			ceph_decode_need(p, end, info->inline_len, bad);
			info->inline_data = *p;
			*p += info->inline_len;
		} else
			info->inline_version = CEPH_INLINE_NONE;

		if (features & CEPH_FEATURE_MDS_QUOTA) {
			err = parse_reply_info_quota(p, end, info);
			if (err < 0)
				goto out_bad;
		} else {
			info->max_bytes = 0;
			info->max_files = 0;
		}

		info->pool_ns_len = 0;
		info->pool_ns_data = NULL;
		if (features & CEPH_FEATURE_FS_FILE_LAYOUT_V2) {
			ceph_decode_32_safe(p, end, info->pool_ns_len, bad);
			if (info->pool_ns_len > 0) {
				ceph_decode_need(p, end, info->pool_ns_len, bad);
				info->pool_ns_data = *p;
				*p += info->pool_ns_len;
			}
		}

		if (features & CEPH_FEATURE_FS_BTIME) {
			ceph_decode_need(p, end, sizeof(info->btime), bad);
			ceph_decode_copy(p, &info->btime, sizeof(info->btime));
			ceph_decode_64_safe(p, end, info->change_attr, bad);
		}

		info->dir_pin = -ENODATA;
		/* info->snap_btime and info->rsnaps remain zero */
	}
	return 0;
bad:
	err = -EIO;
out_bad:
	return err;
}

static int parse_reply_info_dir(void **p, void *end,
				struct ceph_mds_reply_dirfrag **dirfrag,
				u64 features)
{
	if (features == (u64)-1) {
		u8 struct_v, struct_compat;
		u32 struct_len;
		ceph_decode_8_safe(p, end, struct_v, bad);
		ceph_decode_8_safe(p, end, struct_compat, bad);
		/* struct_v is expected to be >= 1. we only understand
		 * encoding whose struct_compat == 1. */
		if (!struct_v || struct_compat != 1)
			goto bad;
		ceph_decode_32_safe(p, end, struct_len, bad);
		ceph_decode_need(p, end, struct_len, bad);
		end = *p + struct_len;
	}

	ceph_decode_need(p, end, sizeof(**dirfrag), bad);
	*dirfrag = *p;
	*p += sizeof(**dirfrag) + sizeof(u32) * le32_to_cpu((*dirfrag)->ndist);
	if (unlikely(*p > end))
		goto bad;
	if (features == (u64)-1)
		*p = end;
	return 0;
bad:
	return -EIO;
}

static int parse_reply_info_lease(void **p, void *end,
				  struct ceph_mds_reply_lease **lease,
				  u64 features, u32 *altname_len, u8 **altname)
{
	u8 struct_v;
	u32 struct_len;
	void *lend;

	if (features == (u64)-1) {
		u8 struct_compat;

		ceph_decode_8_safe(p, end, struct_v, bad);
		ceph_decode_8_safe(p, end, struct_compat, bad);

		/* struct_v is expected to be >= 1. we only understand
		 * encoding whose struct_compat == 1. */
		if (!struct_v || struct_compat != 1)
			goto bad;

		ceph_decode_32_safe(p, end, struct_len, bad);
	} else {
		struct_len = sizeof(**lease);
		*altname_len = 0;
		*altname = NULL;
	}

	lend = *p + struct_len;
	ceph_decode_need(p, end, struct_len, bad);
	*lease = *p;
	*p += sizeof(**lease);

	if (features == (u64)-1) {
		if (struct_v >= 2) {
			ceph_decode_32_safe(p, end, *altname_len, bad);
			ceph_decode_need(p, end, *altname_len, bad);
			*altname = *p;
			*p += *altname_len;
		} else {
			*altname = NULL;
			*altname_len = 0;
		}
	}
	*p = lend;
	return 0;
bad:
	return -EIO;
}

/*
 * parse a normal reply, which may contain a (dir+)dentry and/or a
 * target inode.
 */
static int parse_reply_info_trace(void **p, void *end,
				  struct ceph_mds_reply_info_parsed *info,
				  u64 features)
{
	int err;

	if (info->head->is_dentry) {
		err = parse_reply_info_in(p, end, &info->diri, features);
		if (err < 0)
			goto out_bad;

		err = parse_reply_info_dir(p, end, &info->dirfrag, features);
		if (err < 0)
			goto out_bad;

		ceph_decode_32_safe(p, end, info->dname_len, bad);
		ceph_decode_need(p, end, info->dname_len, bad);
		info->dname = *p;
		*p += info->dname_len;

		err = parse_reply_info_lease(p, end, &info->dlease, features,
					     &info->altname_len, &info->altname);
		if (err < 0)
			goto out_bad;
	}

	if (info->head->is_target) {
		err = parse_reply_info_in(p, end, &info->targeti, features);
		if (err < 0)
			goto out_bad;
	}

	if (unlikely(*p != end))
		goto bad;
	return 0;

bad:
	err = -EIO;
out_bad:
	pr_err("problem parsing mds trace %d\n", err);
	return err;
}

/*
 * parse readdir results
 */
static int parse_reply_info_readdir(void **p, void *end,
				    struct ceph_mds_request *req,
				    u64 features)
{
	struct ceph_mds_reply_info_parsed *info = &req->r_reply_info;
	struct ceph_client *cl = req->r_mdsc->fsc->client;
	u32 num, i = 0;
	int err;

	err = parse_reply_info_dir(p, end, &info->dir_dir, features);
	if (err < 0)
		goto out_bad;

	ceph_decode_need(p, end, sizeof(num) + 2, bad);
	num = ceph_decode_32(p);
	{
		u16 flags = ceph_decode_16(p);
		info->dir_end = !!(flags & CEPH_READDIR_FRAG_END);
		info->dir_complete = !!(flags & CEPH_READDIR_FRAG_COMPLETE);
		info->hash_order = !!(flags & CEPH_READDIR_HASH_ORDER);
		info->offset_hash = !!(flags & CEPH_READDIR_OFFSET_HASH);
	}
	if (num == 0)
		goto done;

	BUG_ON(!info->dir_entries);
	if ((unsigned long)(info->dir_entries + num) >
	    (unsigned long)info->dir_entries + info->dir_buf_size) {
		pr_err_client(cl, "dir contents are larger than expected\n");
		WARN_ON(1);
		goto bad;
	}

	info->dir_nr = num;
	while (num) {
		struct inode *inode = d_inode(req->r_dentry);
		struct ceph_inode_info *ci = ceph_inode(inode);
		struct ceph_mds_reply_dir_entry *rde = info->dir_entries + i;
		struct fscrypt_str tname = FSTR_INIT(NULL, 0);
		struct fscrypt_str oname = FSTR_INIT(NULL, 0);
		struct ceph_fname fname;
		u32 altname_len, _name_len;
		u8 *altname, *_name;

		/* dentry */
		ceph_decode_32_safe(p, end, _name_len, bad);
		ceph_decode_need(p, end, _name_len, bad);
		_name = *p;
		*p += _name_len;
		doutc(cl, "parsed dir dname '%.*s'\n", _name_len, _name);

		if (info->hash_order)
			rde->raw_hash = ceph_str_hash(ci->i_dir_layout.dl_dir_hash,
						      _name, _name_len);

		/* dentry lease */
		err = parse_reply_info_lease(p, end, &rde->lease, features,
					     &altname_len, &altname);
		if (err)
			goto out_bad;

		/*
		 * Try to decrypt the dentry names and update them
		 * in the ceph_mds_reply_dir_entry struct.
		 */
		fname.dir = inode;
		fname.name = _name;
		fname.name_len = _name_len;
		fname.ctext = altname;
		fname.ctext_len = altname_len;
		/*
		 * _name_len may be larger than altname_len, such as
		 * when the human readable name length is in the range of
		 * (CEPH_NOHASH_NAME_MAX, CEPH_NOHASH_NAME_MAX + SHA256_DIGEST_SIZE),
		 * and then the copy in ceph_fname_to_usr would corrupt
		 * the data if there is no encryption key.
		 *
		 * Just set the no_copy flag; then, if there is no
		 * encryption key, oname.name will always be assigned
		 * _name.
		 */
		fname.no_copy = true;
		if (altname_len == 0) {
			/*
			 * Set tname to _name, and this will be used
			 * to do the base64_decode in-place.  It's
			 * safe because the decoded string is always
			 * shorter, at most 3/4 of the original
			 * string's length.
			 */
			tname.name = _name;

			/*
			 * Set oname to _name too, and this will be
			 * used to do the decryption in-place.
			 */
			oname.name = _name;
			oname.len = _name_len;
		} else {
			/*
			 * This will do the decryption only in-place
			 * from the altname ciphertext directly.
			 */
			oname.name = altname;
			oname.len = altname_len;
		}
		rde->is_nokey = false;
		err = ceph_fname_to_usr(&fname, &tname, &oname, &rde->is_nokey);
		if (err) {
			pr_err_client(cl, "unable to decode %.*s, got %d\n",
				      _name_len, _name, err);
			goto out_bad;
		}
		rde->name = oname.name;
		rde->name_len = oname.len;

		/* inode */
		err = parse_reply_info_in(p, end, &rde->inode, features);
		if (err < 0)
			goto out_bad;
		/* ceph_readdir_prepopulate() will update it */
		rde->offset = 0;
		i++;
		num--;
	}

done:
	/* Skip over any unrecognized fields */
	*p = end;
	return 0;

bad:
	err = -EIO;
out_bad:
	pr_err_client(cl, "problem parsing dir contents %d\n", err);
	return err;
}
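
/*
 * Why the in-place base64 decode above is safe, with a worked example
 * (illustrative numbers, not taken from the code): base64 encodes
 * every 3 bytes of plaintext as 4 characters, so decoding n encoded
 * bytes yields at most 3 * n / 4 bytes.  A 240-character encoded name,
 * for instance, decodes to at most 180 bytes, so writing the decoded
 * bytes over the encoded buffer can never overrun it.
 */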

/*
 * parse fcntl F_GETLK results
 */
static int parse_reply_info_filelock(void **p, void *end,
				     struct ceph_mds_reply_info_parsed *info,
				     u64 features)
{
	if (*p + sizeof(*info->filelock_reply) > end)
		goto bad;

	info->filelock_reply = *p;

	/* Skip over any unrecognized fields */
	*p = end;
	return 0;
bad:
	return -EIO;
}


#if BITS_PER_LONG == 64

#define DELEGATED_INO_AVAILABLE xa_mk_value(1)

static int ceph_parse_deleg_inos(void **p, void *end,
				 struct ceph_mds_session *s)
{
	struct ceph_client *cl = s->s_mdsc->fsc->client;
	u32 sets;

	ceph_decode_32_safe(p, end, sets, bad);
	doutc(cl, "got %u sets of delegated inodes\n", sets);
	while (sets--) {
		u64 start, len;

		ceph_decode_64_safe(p, end, start, bad);
		ceph_decode_64_safe(p, end, len, bad);

		/* Don't accept a delegation of system inodes */
		if (start < CEPH_INO_SYSTEM_BASE) {
			pr_warn_ratelimited_client(cl,
				"ignoring reserved inode range delegation (start=0x%llx len=0x%llx)\n",
				start, len);
			continue;
		}
		while (len--) {
			int err = xa_insert(&s->s_delegated_inos, start++,
					    DELEGATED_INO_AVAILABLE,
					    GFP_KERNEL);
			if (!err) {
				doutc(cl, "added delegated inode 0x%llx\n",
				      start - 1);
			} else if (err == -EBUSY) {
				pr_warn_client(cl,
					"MDS delegated inode 0x%llx more than once.\n",
					start - 1);
			} else {
				return err;
			}
		}
	}
	return 0;
bad:
	return -EIO;
}

u64 ceph_get_deleg_ino(struct ceph_mds_session *s)
{
	unsigned long ino;
	void *val;

	xa_for_each(&s->s_delegated_inos, ino, val) {
		val = xa_erase(&s->s_delegated_inos, ino);
		if (val == DELEGATED_INO_AVAILABLE)
			return ino;
	}
	return 0;
}

int ceph_restore_deleg_ino(struct ceph_mds_session *s, u64 ino)
{
	return xa_insert(&s->s_delegated_inos, ino, DELEGATED_INO_AVAILABLE,
			 GFP_KERNEL);
}
#else /* BITS_PER_LONG == 64 */
/*
 * FIXME: xarrays can't handle 64-bit indexes on a 32-bit arch. For now, just
 * ignore delegated_inos on 32 bit arch. Maybe eventually add xarrays for top
 * and bottom words?
 */
static int ceph_parse_deleg_inos(void **p, void *end,
				 struct ceph_mds_session *s)
{
	u32 sets;

	ceph_decode_32_safe(p, end, sets, bad);
	if (sets)
		ceph_decode_skip_n(p, end, sets * 2 * sizeof(__le64), bad);
	return 0;
bad:
	return -EIO;
}

u64 ceph_get_deleg_ino(struct ceph_mds_session *s)
{
	return 0;
}

int ceph_restore_deleg_ino(struct ceph_mds_session *s, u64 ino)
{
	return 0;
}
#endif /* BITS_PER_LONG == 64 */
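
/*
 * The pool of delegated inode numbers parsed above backs async creates:
 * a caller grabs a number with ceph_get_deleg_ino() before issuing an
 * async CREATE and gives it back if the request can't be sent.  A rough
 * usage sketch (hypothetical caller, not code from this file):
 *
 *	u64 ino = ceph_get_deleg_ino(session);
 *	if (!ino)
 *		...fall back to a synchronous create...
 *	else if (async_create_could_not_be_issued)
 *		ceph_restore_deleg_ino(session, ino);
 */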

/*
 * parse create results
 */
static int parse_reply_info_create(void **p, void *end,
				   struct ceph_mds_reply_info_parsed *info,
				   u64 features, struct ceph_mds_session *s)
{
	int ret;

	if (features == (u64)-1 ||
	    (features & CEPH_FEATURE_REPLY_CREATE_INODE)) {
		if (*p == end) {
			/* Malformed reply? */
			info->has_create_ino = false;
		} else if (test_bit(CEPHFS_FEATURE_DELEG_INO, &s->s_features)) {
			info->has_create_ino = true;
			/* struct_v, struct_compat, and len */
			ceph_decode_skip_n(p, end, 2 + sizeof(u32), bad);
			ceph_decode_64_safe(p, end, info->ino, bad);
			ret = ceph_parse_deleg_inos(p, end, s);
			if (ret)
				return ret;
		} else {
			/* legacy */
			ceph_decode_64_safe(p, end, info->ino, bad);
			info->has_create_ino = true;
		}
	} else {
		if (*p != end)
			goto bad;
	}

	/* Skip over any unrecognized fields */
	*p = end;
	return 0;
bad:
	return -EIO;
}

static int parse_reply_info_getvxattr(void **p, void *end,
				      struct ceph_mds_reply_info_parsed *info,
				      u64 features)
{
	u32 value_len;

	ceph_decode_skip_8(p, end, bad); /* skip current version: 1 */
	ceph_decode_skip_8(p, end, bad); /* skip first version: 1 */
	ceph_decode_skip_32(p, end, bad); /* skip payload length */

	ceph_decode_32_safe(p, end, value_len, bad);

	if (value_len == end - *p) {
		info->xattr_info.xattr_value = *p;
		info->xattr_info.xattr_value_len = value_len;
		*p = end;
		return value_len;
	}
bad:
	return -EIO;
}

/*
 * parse extra results
 */
static int parse_reply_info_extra(void **p, void *end,
				  struct ceph_mds_request *req,
				  u64 features, struct ceph_mds_session *s)
{
	struct ceph_mds_reply_info_parsed *info = &req->r_reply_info;
	u32 op = le32_to_cpu(info->head->op);

	if (op == CEPH_MDS_OP_GETFILELOCK)
		return parse_reply_info_filelock(p, end, info, features);
	else if (op == CEPH_MDS_OP_READDIR || op == CEPH_MDS_OP_LSSNAP)
		return parse_reply_info_readdir(p, end, req, features);
	else if (op == CEPH_MDS_OP_CREATE)
		return parse_reply_info_create(p, end, info, features, s);
	else if (op == CEPH_MDS_OP_GETVXATTR)
		return parse_reply_info_getvxattr(p, end, info, features);
	else
		return -EIO;
}

/*
 * parse entire mds reply
 */
static int parse_reply_info(struct ceph_mds_session *s, struct ceph_msg *msg,
			    struct ceph_mds_request *req, u64 features)
{
	struct ceph_mds_reply_info_parsed *info = &req->r_reply_info;
	struct ceph_client *cl = s->s_mdsc->fsc->client;
	void *p, *end;
	u32 len;
	int err;

	info->head = msg->front.iov_base;
	p = msg->front.iov_base + sizeof(struct ceph_mds_reply_head);
	end = p + msg->front.iov_len - sizeof(struct ceph_mds_reply_head);

	/* trace */
	ceph_decode_32_safe(&p, end, len, bad);
	if (len > 0) {
		ceph_decode_need(&p, end, len, bad);
		err = parse_reply_info_trace(&p, p+len, info, features);
		if (err < 0)
			goto out_bad;
	}

	/* extra */
	ceph_decode_32_safe(&p, end, len, bad);
	if (len > 0) {
		ceph_decode_need(&p, end, len, bad);
		err = parse_reply_info_extra(&p, p+len, req, features, s);
		if (err < 0)
			goto out_bad;
	}

	/* snap blob */
	ceph_decode_32_safe(&p, end, len, bad);
	info->snapblob_len = len;
	info->snapblob = p;
	p += len;

	if (p != end)
		goto bad;
	return 0;

bad:
	err = -EIO;
out_bad:
	pr_err_client(cl, "mds parse_reply err %d\n", err);
	ceph_msg_dump(msg);
	return err;
}
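
/*
 * For reference, the front section of the reply message that
 * parse_reply_info() walks is laid out as three length-prefixed blocks
 * after the fixed header:
 *
 *	struct ceph_mds_reply_head
 *	u32 trace_len;    u8 trace[trace_len];	// dentry/inode trace
 *	u32 extra_len;    u8 extra[extra_len];	// op-specific results
 *	u32 snapblob_len; u8 snapblob[snapblob_len];
 *
 * Any trailing bytes beyond these blocks are treated as an error.
 */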

static void destroy_reply_info(struct ceph_mds_reply_info_parsed *info)
{
	int i;

	kfree(info->diri.fscrypt_auth);
	kfree(info->diri.fscrypt_file);
	kfree(info->targeti.fscrypt_auth);
	kfree(info->targeti.fscrypt_file);
	if (!info->dir_entries)
		return;

	for (i = 0; i < info->dir_nr; i++) {
		struct ceph_mds_reply_dir_entry *rde = info->dir_entries + i;

		kfree(rde->inode.fscrypt_auth);
		kfree(rde->inode.fscrypt_file);
	}
	free_pages((unsigned long)info->dir_entries, get_order(info->dir_buf_size));
}

/*
 * In the async unlink case the kclient won't wait for the first reply
 * from the MDS: it drops all the links, unhashes the dentry, and
 * succeeds immediately.
 *
 * For any subsequent create/link/rename/etc. requests that reuse the
 * same file name, we must wait for the first reply of the in-flight
 * unlink request, or the MDS may fail these following requests with
 * -EEXIST if the in-flight async unlink request was delayed for some
 * reason.
 *
 * The worst case is that a non-async openc request will successfully
 * open the file if the CDentry hasn't been unlinked yet, but the
 * previously delayed async unlink request will later remove the
 * CDentry.  That means the just-created file may be deleted by
 * accident.
 *
 * We need to wait for in-flight async unlink requests to finish before
 * creating new files/directories with the same file names.
 */
int ceph_wait_on_conflict_unlink(struct dentry *dentry)
{
	struct ceph_fs_client *fsc = ceph_sb_to_fs_client(dentry->d_sb);
	struct ceph_client *cl = fsc->client;
	struct dentry *pdentry = dentry->d_parent;
	struct dentry *udentry, *found = NULL;
	struct ceph_dentry_info *di;
	struct qstr dname;
	u32 hash = dentry->d_name.hash;
	int err;

	dname.name = dentry->d_name.name;
	dname.len = dentry->d_name.len;

	rcu_read_lock();
	hash_for_each_possible_rcu(fsc->async_unlink_conflict, di,
				   hnode, hash) {
		udentry = di->dentry;

		spin_lock(&udentry->d_lock);
		if (udentry->d_name.hash != hash)
			goto next;
		if (unlikely(udentry->d_parent != pdentry))
			goto next;
		if (!hash_hashed(&di->hnode))
			goto next;

		if (!test_bit(CEPH_DENTRY_ASYNC_UNLINK_BIT, &di->flags))
			pr_warn_client(cl, "dentry %p:%pd async unlink bit is not set\n",
				       dentry, dentry);

		if (!d_same_name(udentry, pdentry, &dname))
			goto next;

		found = dget_dlock(udentry);
		spin_unlock(&udentry->d_lock);
		break;
next:
		spin_unlock(&udentry->d_lock);
	}
	rcu_read_unlock();

	if (likely(!found))
		return 0;

	doutc(cl, "dentry %p:%pd conflict with old %p:%pd\n", dentry, dentry,
	      found, found);

	err = wait_on_bit(&di->flags, CEPH_DENTRY_ASYNC_UNLINK_BIT,
			  TASK_KILLABLE);
	dput(found);
	return err;
}
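
/*
 * A concrete timeline of the race described above (illustrative only):
 *
 *	t0: unlink("a") returns; the async UNLINK is still in flight
 *	t1: openc("a") reaches the MDS while the old CDentry still
 *	    exists, so the create succeeds against the old dentry
 *	t2: the delayed UNLINK is finally processed and removes the
 *	    CDentry, silently deleting the file created at t1
 *
 * ceph_wait_on_conflict_unlink() closes this window by blocking the
 * request at t1 until the async unlink issued at t0 has been acked.
 */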

/*
 * sessions
 */
const char *ceph_session_state_name(int s)
{
	switch (s) {
	case CEPH_MDS_SESSION_NEW: return "new";
	case CEPH_MDS_SESSION_OPENING: return "opening";
	case CEPH_MDS_SESSION_OPEN: return "open";
	case CEPH_MDS_SESSION_HUNG: return "hung";
	case CEPH_MDS_SESSION_CLOSING: return "closing";
	case CEPH_MDS_SESSION_CLOSED: return "closed";
	case CEPH_MDS_SESSION_RESTARTING: return "restarting";
	case CEPH_MDS_SESSION_RECONNECTING: return "reconnecting";
	case CEPH_MDS_SESSION_REJECTED: return "rejected";
	default: return "???";
	}
}

struct ceph_mds_session *ceph_get_mds_session(struct ceph_mds_session *s)
{
	if (refcount_inc_not_zero(&s->s_ref))
		return s;
	return NULL;
}

void ceph_put_mds_session(struct ceph_mds_session *s)
{
	if (IS_ERR_OR_NULL(s))
		return;

	if (refcount_dec_and_test(&s->s_ref)) {
		if (s->s_auth.authorizer)
			ceph_auth_destroy_authorizer(s->s_auth.authorizer);
		WARN_ON(mutex_is_locked(&s->s_mutex));
		xa_destroy(&s->s_delegated_inos);
		kfree(s);
	}
}

/*
 * called under mdsc->mutex
 */
struct ceph_mds_session *__ceph_lookup_mds_session(struct ceph_mds_client *mdsc,
						   int mds)
{
	if (mds >= mdsc->max_sessions || !mdsc->sessions[mds])
		return NULL;
	return ceph_get_mds_session(mdsc->sessions[mds]);
}

static bool __have_session(struct ceph_mds_client *mdsc, int mds)
{
	if (mds >= mdsc->max_sessions || !mdsc->sessions[mds])
		return false;
	else
		return true;
}

static int __verify_registered_session(struct ceph_mds_client *mdsc,
				       struct ceph_mds_session *s)
{
	if (s->s_mds >= mdsc->max_sessions ||
	    mdsc->sessions[s->s_mds] != s)
		return -ENOENT;
	return 0;
}

/*
 * create+register a new session for given mds.
 * called under mdsc->mutex.
 */
static struct ceph_mds_session *register_session(struct ceph_mds_client *mdsc,
						 int mds)
{
	struct ceph_client *cl = mdsc->fsc->client;
	struct ceph_mds_session *s;

	if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_FENCE_IO)
		return ERR_PTR(-EIO);

	if (mds >= mdsc->mdsmap->possible_max_rank)
		return ERR_PTR(-EINVAL);

	s = kzalloc(sizeof(*s), GFP_NOFS);
	if (!s)
		return ERR_PTR(-ENOMEM);

	if (mds >= mdsc->max_sessions) {
		int newmax = 1 << get_count_order(mds + 1);
		struct ceph_mds_session **sa;
		size_t ptr_size = sizeof(struct ceph_mds_session *);

		doutc(cl, "realloc to %d\n", newmax);
		sa = kcalloc(newmax, ptr_size, GFP_NOFS);
		if (!sa)
			goto fail_realloc;
		if (mdsc->sessions) {
			memcpy(sa, mdsc->sessions,
			       mdsc->max_sessions * ptr_size);
			kfree(mdsc->sessions);
		}
		mdsc->sessions = sa;
		mdsc->max_sessions = newmax;
	}

	doutc(cl, "mds%d\n", mds);
	s->s_mdsc = mdsc;
	s->s_mds = mds;
	s->s_state = CEPH_MDS_SESSION_NEW;
	mutex_init(&s->s_mutex);

	ceph_con_init(&s->s_con, s, &mds_con_ops, &mdsc->fsc->client->msgr);

	atomic_set(&s->s_cap_gen, 1);
	s->s_cap_ttl = jiffies - 1;

	spin_lock_init(&s->s_cap_lock);
	INIT_LIST_HEAD(&s->s_caps);
	refcount_set(&s->s_ref, 1);
	INIT_LIST_HEAD(&s->s_waiting);
	INIT_LIST_HEAD(&s->s_unsafe);
	xa_init(&s->s_delegated_inos);
	INIT_LIST_HEAD(&s->s_cap_releases);
	INIT_WORK(&s->s_cap_release_work, ceph_cap_release_work);

	INIT_LIST_HEAD(&s->s_cap_dirty);
	INIT_LIST_HEAD(&s->s_cap_flushing);

	mdsc->sessions[mds] = s;
	atomic_inc(&mdsc->num_sessions);
	refcount_inc(&s->s_ref);  /* one ref to sessions[], one to caller */

	ceph_con_open(&s->s_con, CEPH_ENTITY_TYPE_MDS, mds,
		      ceph_mdsmap_get_addr(mdsc->mdsmap, mds));

	return s;

fail_realloc:
	kfree(s);
	return ERR_PTR(-ENOMEM);
}
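
/*
 * Worked example of the sessions[] growth above (illustrative numbers):
 * with max_sessions == 4, registering mds 5 computes
 * get_count_order(5 + 1) == 3, so newmax == 1 << 3 == 8 and the array
 * doubles to 8 slots.  Growing to the next power of two keeps the
 * number of reallocations logarithmic in the highest mds rank seen.
 */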

/*
 * called under mdsc->mutex
 */
static void __unregister_session(struct ceph_mds_client *mdsc,
				 struct ceph_mds_session *s)
{
	doutc(mdsc->fsc->client, "mds%d %p\n", s->s_mds, s);
	BUG_ON(mdsc->sessions[s->s_mds] != s);
	mdsc->sessions[s->s_mds] = NULL;
	ceph_con_close(&s->s_con);
	ceph_put_mds_session(s);
	atomic_dec(&mdsc->num_sessions);
}

/*
 * drop session refs in request.
 *
 * should be last request ref, or hold mdsc->mutex
 */
static void put_request_session(struct ceph_mds_request *req)
{
	if (req->r_session) {
		ceph_put_mds_session(req->r_session);
		req->r_session = NULL;
	}
}

void ceph_mdsc_iterate_sessions(struct ceph_mds_client *mdsc,
				void (*cb)(struct ceph_mds_session *),
				bool check_state)
{
	int mds;

	mutex_lock(&mdsc->mutex);
	for (mds = 0; mds < mdsc->max_sessions; ++mds) {
		struct ceph_mds_session *s;

		s = __ceph_lookup_mds_session(mdsc, mds);
		if (!s)
			continue;

		if (check_state && !check_session_state(s)) {
			ceph_put_mds_session(s);
			continue;
		}

		mutex_unlock(&mdsc->mutex);
		cb(s);
		ceph_put_mds_session(s);
		mutex_lock(&mdsc->mutex);
	}
	mutex_unlock(&mdsc->mutex);
}

void ceph_mdsc_release_request(struct kref *kref)
{
	struct ceph_mds_request *req = container_of(kref,
						    struct ceph_mds_request,
						    r_kref);
	ceph_mdsc_release_dir_caps_async(req);
	destroy_reply_info(&req->r_reply_info);
	if (req->r_request)
		ceph_msg_put(req->r_request);
	if (req->r_reply)
		ceph_msg_put(req->r_reply);
	if (req->r_inode) {
		ceph_put_cap_refs(ceph_inode(req->r_inode), CEPH_CAP_PIN);
		iput(req->r_inode);
	}
	if (req->r_parent) {
		ceph_put_cap_refs(ceph_inode(req->r_parent), CEPH_CAP_PIN);
		iput(req->r_parent);
	}
	iput(req->r_target_inode);
	iput(req->r_new_inode);
	if (req->r_dentry)
		dput(req->r_dentry);
	if (req->r_old_dentry)
		dput(req->r_old_dentry);
	if (req->r_old_dentry_dir) {
		/*
		 * track (and drop pins for) r_old_dentry_dir
		 * separately, since r_old_dentry's d_parent may have
		 * changed between the dir mutex being dropped and
		 * this request being freed.
		 */
		ceph_put_cap_refs(ceph_inode(req->r_old_dentry_dir),
				  CEPH_CAP_PIN);
		iput(req->r_old_dentry_dir);
	}
	kfree(req->r_path1);
	kfree(req->r_path2);
	put_cred(req->r_cred);
	if (req->r_mnt_idmap)
		mnt_idmap_put(req->r_mnt_idmap);
	if (req->r_pagelist)
		ceph_pagelist_release(req->r_pagelist);
	kfree(req->r_fscrypt_auth);
	kfree(req->r_altname);
	put_request_session(req);
	ceph_unreserve_caps(req->r_mdsc, &req->r_caps_reservation);
	WARN_ON_ONCE(!list_empty(&req->r_wait));
	kmem_cache_free(ceph_mds_request_cachep, req);
}

DEFINE_RB_FUNCS(request, struct ceph_mds_request, r_tid, r_node)
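
/*
 * DEFINE_RB_FUNCS() above expands to the insert_request(),
 * lookup_request() and erase_request() helpers used below,
 * implementing an rbtree of struct ceph_mds_request keyed on r_tid
 * and threaded through r_node.
 */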
1164 */ 1165 static void __register_request(struct ceph_mds_client *mdsc, 1166 struct ceph_mds_request *req, 1167 struct inode *dir) 1168 { 1169 struct ceph_client *cl = mdsc->fsc->client; 1170 int ret = 0; 1171 1172 req->r_tid = ++mdsc->last_tid; 1173 if (req->r_num_caps) { 1174 ret = ceph_reserve_caps(mdsc, &req->r_caps_reservation, 1175 req->r_num_caps); 1176 if (ret < 0) { 1177 pr_err_client(cl, "%p failed to reserve caps: %d\n", 1178 req, ret); 1179 /* set req->r_err to fail early from __do_request */ 1180 req->r_err = ret; 1181 return; 1182 } 1183 } 1184 doutc(cl, "%p tid %lld\n", req, req->r_tid); 1185 ceph_mdsc_get_request(req); 1186 insert_request(&mdsc->request_tree, req); 1187 1188 req->r_cred = get_current_cred(); 1189 if (!req->r_mnt_idmap) 1190 req->r_mnt_idmap = &nop_mnt_idmap; 1191 1192 if (mdsc->oldest_tid == 0 && req->r_op != CEPH_MDS_OP_SETFILELOCK) 1193 mdsc->oldest_tid = req->r_tid; 1194 1195 if (dir) { 1196 struct ceph_inode_info *ci = ceph_inode(dir); 1197 1198 ihold(dir); 1199 req->r_unsafe_dir = dir; 1200 spin_lock(&ci->i_unsafe_lock); 1201 list_add_tail(&req->r_unsafe_dir_item, &ci->i_unsafe_dirops); 1202 spin_unlock(&ci->i_unsafe_lock); 1203 } 1204 } 1205 1206 static void __unregister_request(struct ceph_mds_client *mdsc, 1207 struct ceph_mds_request *req) 1208 { 1209 doutc(mdsc->fsc->client, "%p tid %lld\n", req, req->r_tid); 1210 1211 /* Never leave an unregistered request on an unsafe list! */ 1212 list_del_init(&req->r_unsafe_item); 1213 1214 if (req->r_tid == mdsc->oldest_tid) { 1215 struct rb_node *p = rb_next(&req->r_node); 1216 mdsc->oldest_tid = 0; 1217 while (p) { 1218 struct ceph_mds_request *next_req = 1219 rb_entry(p, struct ceph_mds_request, r_node); 1220 if (next_req->r_op != CEPH_MDS_OP_SETFILELOCK) { 1221 mdsc->oldest_tid = next_req->r_tid; 1222 break; 1223 } 1224 p = rb_next(p); 1225 } 1226 } 1227 1228 erase_request(&mdsc->request_tree, req); 1229 1230 if (req->r_unsafe_dir) { 1231 struct ceph_inode_info *ci = ceph_inode(req->r_unsafe_dir); 1232 spin_lock(&ci->i_unsafe_lock); 1233 list_del_init(&req->r_unsafe_dir_item); 1234 spin_unlock(&ci->i_unsafe_lock); 1235 } 1236 if (req->r_target_inode && 1237 test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) { 1238 struct ceph_inode_info *ci = ceph_inode(req->r_target_inode); 1239 spin_lock(&ci->i_unsafe_lock); 1240 list_del_init(&req->r_unsafe_target_item); 1241 spin_unlock(&ci->i_unsafe_lock); 1242 } 1243 1244 if (req->r_unsafe_dir) { 1245 iput(req->r_unsafe_dir); 1246 req->r_unsafe_dir = NULL; 1247 } 1248 1249 complete_all(&req->r_safe_completion); 1250 1251 ceph_mdsc_put_request(req); 1252 } 1253 1254 /* 1255 * Walk back up the dentry tree until we hit a dentry representing a 1256 * non-snapshot inode. We do this using the rcu_read_lock (which must be held 1257 * when calling this) to ensure that the objects won't disappear while we're 1258 * working with them. Once we hit a candidate dentry, we attempt to take a 1259 * reference to it, and return that as the result. 1260 */ 1261 static struct inode *get_nonsnap_parent(struct dentry *dentry) 1262 { 1263 struct inode *inode = NULL; 1264 1265 while (dentry && !IS_ROOT(dentry)) { 1266 inode = d_inode_rcu(dentry); 1267 if (!inode || ceph_snap(inode) == CEPH_NOSNAP) 1268 break; 1269 dentry = dentry->d_parent; 1270 } 1271 if (inode) 1272 inode = igrab(inode); 1273 return inode; 1274 } 1275 1276 /* 1277 * Choose mds to send request to next. If there is a hint set in the 1278 * request (e.g., due to a prior forward hint from the mds), use that. 
 * Otherwise, consult frag tree and/or caps to identify the
 * appropriate mds.  If all else fails, choose randomly.
 *
 * Called under mdsc->mutex.
 */
static int __choose_mds(struct ceph_mds_client *mdsc,
			struct ceph_mds_request *req,
			bool *random)
{
	struct inode *inode;
	struct ceph_inode_info *ci;
	struct ceph_cap *cap;
	int mode = req->r_direct_mode;
	int mds = -1;
	u32 hash = req->r_direct_hash;
	bool is_hash = test_bit(CEPH_MDS_R_DIRECT_IS_HASH, &req->r_req_flags);
	struct ceph_client *cl = mdsc->fsc->client;

	if (random)
		*random = false;

	/*
	 * is there a specific mds we should try?  ignore hint if we have
	 * no session and the mds is not up (active or recovering).
	 */
	if (req->r_resend_mds >= 0 &&
	    (__have_session(mdsc, req->r_resend_mds) ||
	     ceph_mdsmap_get_state(mdsc->mdsmap, req->r_resend_mds) > 0)) {
		doutc(cl, "using resend_mds mds%d\n", req->r_resend_mds);
		return req->r_resend_mds;
	}

	if (mode == USE_RANDOM_MDS)
		goto random;

	inode = NULL;
	if (req->r_inode) {
		if (ceph_snap(req->r_inode) != CEPH_SNAPDIR) {
			inode = req->r_inode;
			ihold(inode);
		} else {
			/* req->r_dentry is non-null for LSSNAP request */
			rcu_read_lock();
			inode = get_nonsnap_parent(req->r_dentry);
			rcu_read_unlock();
			doutc(cl, "using snapdir's parent %p %llx.%llx\n",
			      inode, ceph_vinop(inode));
		}
	} else if (req->r_dentry) {
		/* ignore race with rename; old or new d_parent is okay */
		struct dentry *parent;
		struct inode *dir;

		rcu_read_lock();
		parent = READ_ONCE(req->r_dentry->d_parent);
		dir = req->r_parent ? : d_inode_rcu(parent);

		if (!dir || dir->i_sb != mdsc->fsc->sb) {
			/* not this fs or parent went negative */
			inode = d_inode(req->r_dentry);
			if (inode)
				ihold(inode);
		} else if (ceph_snap(dir) != CEPH_NOSNAP) {
			/* direct snapped/virtual snapdir requests
			 * based on parent dir inode */
			inode = get_nonsnap_parent(parent);
			doutc(cl, "using nonsnap parent %p %llx.%llx\n",
			      inode, ceph_vinop(inode));
		} else {
			/* dentry target */
			inode = d_inode(req->r_dentry);
			if (!inode || mode == USE_AUTH_MDS) {
				/* dir + name */
				inode = igrab(dir);
				hash = ceph_dentry_hash(dir, req->r_dentry);
				is_hash = true;
			} else {
				ihold(inode);
			}
		}
		rcu_read_unlock();
	}

	if (!inode)
		goto random;

	doutc(cl, "%p %llx.%llx is_hash=%d (0x%x) mode %d\n", inode,
	      ceph_vinop(inode), (int)is_hash, hash, mode);
	ci = ceph_inode(inode);

	if (is_hash && S_ISDIR(inode->i_mode)) {
		struct ceph_inode_frag frag;
		int found;

		ceph_choose_frag(ci, hash, &frag, &found);
		if (found) {
			if (mode == USE_ANY_MDS && frag.ndist > 0) {
				u8 r;

				/* choose a random replica */
				get_random_bytes(&r, 1);
				r %= frag.ndist;
				mds = frag.dist[r];
				doutc(cl, "%p %llx.%llx frag %u mds%d (%d/%d)\n",
				      inode, ceph_vinop(inode), frag.frag,
				      mds, (int)r, frag.ndist);
				if (ceph_mdsmap_get_state(mdsc->mdsmap, mds) >=
				    CEPH_MDS_STATE_ACTIVE &&
				    !ceph_mdsmap_is_laggy(mdsc->mdsmap, mds))
					goto out;
			}

			/* since this file/dir wasn't known to be
			 * replicated, then we want to look for the
			 * authoritative mds. */
			if (frag.mds >= 0) {
				/* choose auth mds */
				mds = frag.mds;
				doutc(cl, "%p %llx.%llx frag %u mds%d (auth)\n",
				      inode, ceph_vinop(inode), frag.frag, mds);
				if (ceph_mdsmap_get_state(mdsc->mdsmap, mds) >=
				    CEPH_MDS_STATE_ACTIVE) {
					if (!ceph_mdsmap_is_laggy(mdsc->mdsmap,
								  mds))
						goto out;
				}
			}
			mode = USE_AUTH_MDS;
		}
	}

	spin_lock(&ci->i_ceph_lock);
	cap = NULL;
	if (mode == USE_AUTH_MDS)
		cap = ci->i_auth_cap;
	if (!cap && !RB_EMPTY_ROOT(&ci->i_caps))
		cap = rb_entry(rb_first(&ci->i_caps), struct ceph_cap, ci_node);
	if (!cap) {
		spin_unlock(&ci->i_ceph_lock);
		iput(inode);
		goto random;
	}
	mds = cap->session->s_mds;
	doutc(cl, "%p %llx.%llx mds%d (%scap %p)\n", inode,
	      ceph_vinop(inode), mds,
	      cap == ci->i_auth_cap ? "auth " : "", cap);
	spin_unlock(&ci->i_ceph_lock);
out:
	iput(inode);
	return mds;

random:
	if (random)
		*random = true;

	mds = ceph_mdsmap_get_random_mds(mdsc->mdsmap);
	doutc(cl, "chose random mds%d\n", mds);
	return mds;
}


/*
 * session messages
 */
struct ceph_msg *ceph_create_session_msg(u32 op, u64 seq)
{
	struct ceph_msg *msg;
	struct ceph_mds_session_head *h;

	msg = ceph_msg_new(CEPH_MSG_CLIENT_SESSION, sizeof(*h), GFP_NOFS,
			   false);
	if (!msg) {
		pr_err("ENOMEM creating session %s msg\n",
		       ceph_session_op_name(op));
		return NULL;
	}
	h = msg->front.iov_base;
	h->op = cpu_to_le32(op);
	h->seq = cpu_to_le64(seq);

	return msg;
}

static const unsigned char feature_bits[] = CEPHFS_FEATURES_CLIENT_SUPPORTED;
#define FEATURE_BYTES(c) (DIV_ROUND_UP((size_t)feature_bits[c - 1] + 1, 64) * 8)
static int encode_supported_features(void **p, void *end)
{
	static const size_t count = ARRAY_SIZE(feature_bits);

	if (count > 0) {
		size_t i;
		size_t size = FEATURE_BYTES(count);
		unsigned long bit;

		if (WARN_ON_ONCE(*p + 4 + size > end))
			return -ERANGE;

		ceph_encode_32(p, size);
		memset(*p, 0, size);
		for (i = 0; i < count; i++) {
			bit = feature_bits[i];
			((unsigned char *)(*p))[bit / 8] |= BIT(bit % 8);
		}
		*p += size;
	} else {
		if (WARN_ON_ONCE(*p + 4 > end))
			return -ERANGE;

		ceph_encode_32(p, 0);
	}

	return 0;
}
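
/*
 * FEATURE_BYTES() sizes the bitmap from the highest supported feature
 * bit, rounded up to a whole 64-bit word.  A worked example with
 * made-up numbers: if the last entry of feature_bits[] were 63, the
 * macro yields DIV_ROUND_UP(64, 64) * 8 == 8 bytes; a highest bit of
 * 64 would round up to 16 bytes.  Note that this relies on the largest
 * feature bit being the last entry in feature_bits[].
 */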

static const unsigned char metric_bits[] = CEPHFS_METRIC_SPEC_CLIENT_SUPPORTED;
#define METRIC_BYTES(cnt) (DIV_ROUND_UP((size_t)metric_bits[cnt - 1] + 1, 64) * 8)
static int encode_metric_spec(void **p, void *end)
{
	static const size_t count = ARRAY_SIZE(metric_bits);

	/* header */
	if (WARN_ON_ONCE(*p + 2 > end))
		return -ERANGE;

	ceph_encode_8(p, 1); /* version */
	ceph_encode_8(p, 1); /* compat */

	if (count > 0) {
		size_t i;
		size_t size = METRIC_BYTES(count);

		if (WARN_ON_ONCE(*p + 4 + 4 + size > end))
			return -ERANGE;

		/* metric spec info length */
		ceph_encode_32(p, 4 + size);

		/* metric spec */
		ceph_encode_32(p, size);
		memset(*p, 0, size);
		for (i = 0; i < count; i++)
			((unsigned char *)(*p))[i / 8] |= BIT(metric_bits[i] % 8);
		*p += size;
	} else {
		if (WARN_ON_ONCE(*p + 4 + 4 > end))
			return -ERANGE;

		/* metric spec info length */
		ceph_encode_32(p, 4);
		/* metric spec */
		ceph_encode_32(p, 0);
	}

	return 0;
}

/*
 * session message, specialization for CEPH_SESSION_REQUEST_OPEN
 * to include additional client metadata fields.
 */
static struct ceph_msg *
create_session_full_msg(struct ceph_mds_client *mdsc, int op, u64 seq)
{
	struct ceph_msg *msg;
	struct ceph_mds_session_head *h;
	int i;
	int extra_bytes = 0;
	int metadata_key_count = 0;
	struct ceph_options *opt = mdsc->fsc->client->options;
	struct ceph_mount_options *fsopt = mdsc->fsc->mount_options;
	struct ceph_client *cl = mdsc->fsc->client;
	size_t size, count;
	void *p, *end;
	int ret;

	const char *metadata[][2] = {
		{"hostname", mdsc->nodename},
		{"kernel_version", init_utsname()->release},
		{"entity_id", opt->name ? : ""},
		{"root", fsopt->server_path ? : "/"},
		{NULL, NULL}
	};

	/* Calculate serialized length of metadata */
	extra_bytes = 4;  /* map length */
	for (i = 0; metadata[i][0]; ++i) {
		extra_bytes += 8 + strlen(metadata[i][0]) +
			strlen(metadata[i][1]);
		metadata_key_count++;
	}

	/* supported feature */
	size = 0;
	count = ARRAY_SIZE(feature_bits);
	if (count > 0)
		size = FEATURE_BYTES(count);
	extra_bytes += 4 + size;

	/* metric spec */
	size = 0;
	count = ARRAY_SIZE(metric_bits);
	if (count > 0)
		size = METRIC_BYTES(count);
	extra_bytes += 2 + 4 + 4 + size;

	/* flags, mds auth caps and oldest_client_tid */
	extra_bytes += 4 + 4 + 8;

	/* Allocate the message */
	msg = ceph_msg_new(CEPH_MSG_CLIENT_SESSION, sizeof(*h) + extra_bytes,
			   GFP_NOFS, false);
	if (!msg) {
		pr_err_client(cl, "ENOMEM creating session open msg\n");
		return ERR_PTR(-ENOMEM);
	}
	p = msg->front.iov_base;
	end = p + msg->front.iov_len;

	h = p;
	h->op = cpu_to_le32(op);
	h->seq = cpu_to_le64(seq);

	/*
	 * Serialize client metadata into waiting buffer space, using
	 * the format that userspace expects for map<string, string>
	 *
	 * ClientSession messages with metadata are v7
	 */
	msg->hdr.version = cpu_to_le16(7);
	msg->hdr.compat_version = cpu_to_le16(1);

	/* The write pointer, following the session_head structure */
	p += sizeof(*h);

	/* Number of entries in the map */
	ceph_encode_32(&p, metadata_key_count);

	/* Two length-prefixed strings for each entry in the map */
	for (i = 0; metadata[i][0]; ++i) {
		size_t const key_len = strlen(metadata[i][0]);
		size_t const val_len = strlen(metadata[i][1]);

		ceph_encode_32(&p, key_len);
		memcpy(p, metadata[i][0], key_len);
		p += key_len;
		ceph_encode_32(&p, val_len);
		memcpy(p, metadata[i][1], val_len);
		p += val_len;
	}

	ret = encode_supported_features(&p, end);
	if (ret) {
		pr_err_client(cl, "encode_supported_features failed!\n");
		ceph_msg_put(msg);
		return ERR_PTR(ret);
	}

	ret = encode_metric_spec(&p, end);
	if (ret) {
		pr_err_client(cl, "encode_metric_spec failed!\n");
		ceph_msg_put(msg);
		return ERR_PTR(ret);
	}

	/* version == 5, flags */
	ceph_encode_32(&p, 0);

	/* version == 6, mds auth caps */
	ceph_encode_32(&p, 0);

	/* version == 7, oldest_client_tid */
	ceph_encode_64(&p, mdsc->oldest_tid);

	msg->front.iov_len = p - msg->front.iov_base;
	msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);

	return msg;
}
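
/*
 * The resulting message front encodes the metadata map in the format
 * userspace expects for map<string, string>: a 32-bit entry count,
 * then, per entry, a 32-bit length plus bytes for the key followed by
 * a 32-bit length plus bytes for the value.  For example, a single
 * entry {"root", "/"} would serialize (little-endian) as:
 *
 *	01 00 00 00			// map length: 1 entry
 *	04 00 00 00  'r' 'o' 'o' 't'	// key
 *	01 00 00 00  '/'		// value
 *
 * which matches the 8 + strlen(key) + strlen(value) accounting above.
 */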

/*
 * send session open request.
 *
 * called under mdsc->mutex
 */
static int __open_session(struct ceph_mds_client *mdsc,
			  struct ceph_mds_session *session)
{
	struct ceph_msg *msg;
	int mstate;
	int mds = session->s_mds;

	if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_FENCE_IO)
		return -EIO;

	/* wait for mds to go active? */
	mstate = ceph_mdsmap_get_state(mdsc->mdsmap, mds);
	doutc(mdsc->fsc->client, "open_session to mds%d (%s)\n", mds,
	      ceph_mds_state_name(mstate));
	session->s_state = CEPH_MDS_SESSION_OPENING;
	session->s_renew_requested = jiffies;

	/* send connect message */
	msg = create_session_full_msg(mdsc, CEPH_SESSION_REQUEST_OPEN,
				      session->s_seq);
	if (IS_ERR(msg))
		return PTR_ERR(msg);
	ceph_con_send(&session->s_con, msg);
	return 0;
}

/*
 * open sessions for any export targets for the given mds
 *
 * called under mdsc->mutex
 */
static struct ceph_mds_session *
__open_export_target_session(struct ceph_mds_client *mdsc, int target)
{
	struct ceph_mds_session *session;
	int ret;

	session = __ceph_lookup_mds_session(mdsc, target);
	if (!session) {
		session = register_session(mdsc, target);
		if (IS_ERR(session))
			return session;
	}
	if (session->s_state == CEPH_MDS_SESSION_NEW ||
	    session->s_state == CEPH_MDS_SESSION_CLOSING) {
		ret = __open_session(mdsc, session);
		if (ret)
			return ERR_PTR(ret);
	}

	return session;
}

struct ceph_mds_session *
ceph_mdsc_open_export_target_session(struct ceph_mds_client *mdsc, int target)
{
	struct ceph_mds_session *session;
	struct ceph_client *cl = mdsc->fsc->client;

	doutc(cl, "to mds%d\n", target);

	mutex_lock(&mdsc->mutex);
	session = __open_export_target_session(mdsc, target);
	mutex_unlock(&mdsc->mutex);

	return session;
}

static void __open_export_target_sessions(struct ceph_mds_client *mdsc,
					  struct ceph_mds_session *session)
{
	struct ceph_mds_info *mi;
	struct ceph_mds_session *ts;
	int i, mds = session->s_mds;
	struct ceph_client *cl = mdsc->fsc->client;

	if (mds >= mdsc->mdsmap->possible_max_rank)
		return;

	mi = &mdsc->mdsmap->m_info[mds];
	doutc(cl, "for mds%d (%d targets)\n", session->s_mds,
	      mi->num_export_targets);

	for (i = 0; i < mi->num_export_targets; i++) {
		ts = __open_export_target_session(mdsc, mi->export_targets[i]);
		ceph_put_mds_session(ts);
	}
}

/*
 * session caps
 */

static void detach_cap_releases(struct ceph_mds_session *session,
				struct list_head *target)
{
	struct ceph_client *cl = session->s_mdsc->fsc->client;

	lockdep_assert_held(&session->s_cap_lock);

	list_splice_init(&session->s_cap_releases, target);
	session->s_num_cap_releases = 0;
	doutc(cl, "mds%d\n", session->s_mds);
}

static void dispose_cap_releases(struct ceph_mds_client *mdsc,
				 struct list_head *dispose)
{
	while (!list_empty(dispose)) {
		struct ceph_cap *cap;
		/* zero out the in-progress message */
		cap = list_first_entry(dispose, struct ceph_cap, session_caps);
		list_del(&cap->session_caps);
		ceph_put_cap(mdsc, cap);
	}
}

static void cleanup_session_requests(struct ceph_mds_client *mdsc,
				     struct ceph_mds_session *session)
{
	struct ceph_client *cl = mdsc->fsc->client;
	struct ceph_mds_request *req;
	struct rb_node *p;

	doutc(cl, "mds%d\n", session->s_mds);
	mutex_lock(&mdsc->mutex);
	while (!list_empty(&session->s_unsafe)) {
		req = list_first_entry(&session->s_unsafe,
				       struct ceph_mds_request, r_unsafe_item);
		pr_warn_ratelimited_client(cl, " dropping unsafe request %llu\n",
					   req->r_tid);
		if (req->r_target_inode)
			mapping_set_error(req->r_target_inode->i_mapping, -EIO);
		if (req->r_unsafe_dir)
			mapping_set_error(req->r_unsafe_dir->i_mapping, -EIO);
		__unregister_request(mdsc, req);
	}
	/* zero r_attempts, so kick_requests() will re-send requests */
	p = rb_first(&mdsc->request_tree);
	while (p) {
		req = rb_entry(p, struct ceph_mds_request, r_node);
		p = rb_next(p);
		if (req->r_session &&
		    req->r_session->s_mds == session->s_mds)
			req->r_attempts = 0;
	}
	mutex_unlock(&mdsc->mutex);
}

/*
 * Helper to safely iterate over all caps associated with a session, with
 * special care taken to handle a racing __ceph_remove_cap().
 *
 * Caller must hold session s_mutex.
 */
int ceph_iterate_session_caps(struct ceph_mds_session *session,
			      int (*cb)(struct inode *, int mds, void *),
			      void *arg)
{
	struct ceph_client *cl = session->s_mdsc->fsc->client;
	struct list_head *p;
	struct ceph_cap *cap;
	struct inode *inode, *last_inode = NULL;
	struct ceph_cap *old_cap = NULL;
	int ret;

	doutc(cl, "%p mds%d\n", session, session->s_mds);
	spin_lock(&session->s_cap_lock);
	p = session->s_caps.next;
	while (p != &session->s_caps) {
		int mds;

		cap = list_entry(p, struct ceph_cap, session_caps);
		inode = igrab(&cap->ci->netfs.inode);
		if (!inode) {
			p = p->next;
			continue;
		}
		session->s_cap_iterator = cap;
		mds = cap->mds;
		spin_unlock(&session->s_cap_lock);

		if (last_inode) {
			iput(last_inode);
			last_inode = NULL;
		}
		if (old_cap) {
			ceph_put_cap(session->s_mdsc, old_cap);
			old_cap = NULL;
		}

		ret = cb(inode, mds, arg);
		last_inode = inode;

		spin_lock(&session->s_cap_lock);
		p = p->next;
		if (!cap->ci) {
			doutc(cl, "finishing cap %p removal\n", cap);
			BUG_ON(cap->session != session);
			cap->session = NULL;
			list_del_init(&cap->session_caps);
			session->s_nr_caps--;
			atomic64_dec(&session->s_mdsc->metric.total_caps);
			if (cap->queue_release)
				__ceph_queue_cap_release(session, cap);
			else
				old_cap = cap; /* put_cap it w/o locks held */
		}
		if (ret < 0)
			goto out;
	}
	ret = 0;
out:
	session->s_cap_iterator = NULL;
	spin_unlock(&session->s_cap_lock);

	iput(last_inode);
	if (old_cap)
		ceph_put_cap(session->s_mdsc, old_cap);

	return ret;
}

static int remove_session_caps_cb(struct inode *inode, int mds, void *arg)
{
	struct ceph_inode_info *ci = ceph_inode(inode);
	struct ceph_client *cl = ceph_inode_to_client(inode);
	bool invalidate = false;
	struct ceph_cap *cap;
	int iputs = 0;

	spin_lock(&ci->i_ceph_lock);
	cap = __get_cap_for_mds(ci, mds);
	if (cap) {
		doutc(cl, " removing cap %p, ci is %p, inode is %p\n",
		      cap, ci, &ci->netfs.inode);

		iputs = ceph_purge_inode_cap(inode, cap, &invalidate);
	}
	spin_unlock(&ci->i_ceph_lock);

	if (cap)
		wake_up_all(&ci->i_cap_wq);
	if (invalidate)
		ceph_queue_invalidate(inode);
	while (iputs--)
		iput(inode);
	return 0;
}

/*
 * caller must hold session s_mutex
 */
static void remove_session_caps(struct ceph_mds_session *session)
{
	struct ceph_fs_client *fsc = session->s_mdsc->fsc;
	struct super_block *sb = fsc->sb;
	LIST_HEAD(dispose);

	doutc(fsc->client, "on %p\n", session);
	ceph_iterate_session_caps(session, remove_session_caps_cb, fsc);

	wake_up_all(&fsc->mdsc->cap_flushing_wq);

	spin_lock(&session->s_cap_lock);
	if (session->s_nr_caps > 0) {
		struct inode *inode;
		struct ceph_cap *cap, *prev = NULL;
		struct ceph_vino vino;
		/*
		 * iterate_session_caps() skips inodes that are being
		 * deleted; we need to wait until those deletions are
		 * complete.  __wait_on_freeing_inode() is designed for
		 * the job, but it is not exported, so use the inode
		 * lookup function to access it.
		 */
		while (!list_empty(&session->s_caps)) {
			cap = list_entry(session->s_caps.next,
					 struct ceph_cap, session_caps);
			if (cap == prev)
				break;
			prev = cap;
			vino = cap->ci->i_vino;
			spin_unlock(&session->s_cap_lock);

			inode = ceph_find_inode(sb, vino);
			iput(inode);

			spin_lock(&session->s_cap_lock);
		}
	}

	// drop cap expires and unlock s_cap_lock
	detach_cap_releases(session, &dispose);

	BUG_ON(session->s_nr_caps > 0);
	BUG_ON(!list_empty(&session->s_cap_flushing));
	spin_unlock(&session->s_cap_lock);
	dispose_cap_releases(session->s_mdsc, &dispose);
}

enum {
	RECONNECT,
	RENEWCAPS,
	FORCE_RO,
};

/*
 * wake up any threads waiting on this session's caps.  if the cap is
 * old (didn't get renewed on the client reconnect), remove it now.
 *
 * caller must hold s_mutex.
 */
static int wake_up_session_cb(struct inode *inode, int mds, void *arg)
{
	struct ceph_inode_info *ci = ceph_inode(inode);
	unsigned long ev = (unsigned long)arg;

	if (ev == RECONNECT) {
		spin_lock(&ci->i_ceph_lock);
		ci->i_wanted_max_size = 0;
		ci->i_requested_max_size = 0;
		spin_unlock(&ci->i_ceph_lock);
	} else if (ev == RENEWCAPS) {
		struct ceph_cap *cap;

		spin_lock(&ci->i_ceph_lock);
		cap = __get_cap_for_mds(ci, mds);
		/* mds did not re-issue stale cap */
		if (cap && cap->cap_gen < atomic_read(&cap->session->s_cap_gen))
			cap->issued = cap->implemented = CEPH_CAP_PIN;
		spin_unlock(&ci->i_ceph_lock);
	} else if (ev == FORCE_RO) {
	}
	wake_up_all(&ci->i_cap_wq);
	return 0;
}

static void wake_up_session_caps(struct ceph_mds_session *session, int ev)
{
	struct ceph_client *cl = session->s_mdsc->fsc->client;

	doutc(cl, "session %p mds%d\n", session, session->s_mds);
	ceph_iterate_session_caps(session, wake_up_session_cb,
				  (void *)(unsigned long)ev);
}
2013 * 2014 * caller holds s_mutex 2015 */ 2016 static int send_renew_caps(struct ceph_mds_client *mdsc, 2017 struct ceph_mds_session *session) 2018 { 2019 struct ceph_client *cl = mdsc->fsc->client; 2020 struct ceph_msg *msg; 2021 int state; 2022 2023 if (time_after_eq(jiffies, session->s_cap_ttl) && 2024 time_after_eq(session->s_cap_ttl, session->s_renew_requested)) 2025 pr_info_client(cl, "mds%d caps stale\n", session->s_mds); 2026 session->s_renew_requested = jiffies; 2027 2028 /* do not try to renew caps until a recovering mds has reconnected 2029 * with its clients. */ 2030 state = ceph_mdsmap_get_state(mdsc->mdsmap, session->s_mds); 2031 if (state < CEPH_MDS_STATE_RECONNECT) { 2032 doutc(cl, "ignoring mds%d (%s)\n", session->s_mds, 2033 ceph_mds_state_name(state)); 2034 return 0; 2035 } 2036 2037 doutc(cl, "to mds%d (%s)\n", session->s_mds, 2038 ceph_mds_state_name(state)); 2039 msg = create_session_full_msg(mdsc, CEPH_SESSION_REQUEST_RENEWCAPS, 2040 ++session->s_renew_seq); 2041 if (IS_ERR(msg)) 2042 return PTR_ERR(msg); 2043 ceph_con_send(&session->s_con, msg); 2044 return 0; 2045 } 2046 2047 static int send_flushmsg_ack(struct ceph_mds_client *mdsc, 2048 struct ceph_mds_session *session, u64 seq) 2049 { 2050 struct ceph_client *cl = mdsc->fsc->client; 2051 struct ceph_msg *msg; 2052 2053 doutc(cl, "to mds%d (%s)s seq %lld\n", session->s_mds, 2054 ceph_session_state_name(session->s_state), seq); 2055 msg = ceph_create_session_msg(CEPH_SESSION_FLUSHMSG_ACK, seq); 2056 if (!msg) 2057 return -ENOMEM; 2058 ceph_con_send(&session->s_con, msg); 2059 return 0; 2060 } 2061 2062 2063 /* 2064 * Note new cap ttl, and any transition from stale -> not stale (fresh?). 2065 * 2066 * Called under session->s_mutex 2067 */ 2068 static void renewed_caps(struct ceph_mds_client *mdsc, 2069 struct ceph_mds_session *session, int is_renew) 2070 { 2071 struct ceph_client *cl = mdsc->fsc->client; 2072 int was_stale; 2073 int wake = 0; 2074 2075 spin_lock(&session->s_cap_lock); 2076 was_stale = is_renew && time_after_eq(jiffies, session->s_cap_ttl); 2077 2078 session->s_cap_ttl = session->s_renew_requested + 2079 mdsc->mdsmap->m_session_timeout*HZ; 2080 2081 if (was_stale) { 2082 if (time_before(jiffies, session->s_cap_ttl)) { 2083 pr_info_client(cl, "mds%d caps renewed\n", 2084 session->s_mds); 2085 wake = 1; 2086 } else { 2087 pr_info_client(cl, "mds%d caps still stale\n", 2088 session->s_mds); 2089 } 2090 } 2091 doutc(cl, "mds%d ttl now %lu, was %s, now %s\n", session->s_mds, 2092 session->s_cap_ttl, was_stale ? "stale" : "fresh", 2093 time_before(jiffies, session->s_cap_ttl) ? "stale" : "fresh"); 2094 spin_unlock(&session->s_cap_lock); 2095 2096 if (wake) 2097 wake_up_session_caps(session, RENEWCAPS); 2098 } 2099 2100 /* 2101 * send a session close request 2102 */ 2103 static int request_close_session(struct ceph_mds_session *session) 2104 { 2105 struct ceph_client *cl = session->s_mdsc->fsc->client; 2106 struct ceph_msg *msg; 2107 2108 doutc(cl, "mds%d state %s seq %lld\n", session->s_mds, 2109 ceph_session_state_name(session->s_state), session->s_seq); 2110 msg = ceph_create_session_msg(CEPH_SESSION_REQUEST_CLOSE, 2111 session->s_seq); 2112 if (!msg) 2113 return -ENOMEM; 2114 ceph_con_send(&session->s_con, msg); 2115 return 1; 2116 } 2117 2118 /* 2119 * Called with s_mutex held. 
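 *
 * Idempotent: if the session is already CLOSING (or further along in
 * its lifecycle), this is a no-op that returns 0; otherwise the state
 * is bumped to CLOSING and a close request is sent via
 * request_close_session().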
 */
static int __close_session(struct ceph_mds_client *mdsc,
			   struct ceph_mds_session *session)
{
	if (session->s_state >= CEPH_MDS_SESSION_CLOSING)
		return 0;
	session->s_state = CEPH_MDS_SESSION_CLOSING;
	return request_close_session(session);
}

static bool drop_negative_children(struct dentry *dentry)
{
	struct dentry *child;
	bool all_negative = true;

	if (!d_is_dir(dentry))
		goto out;

	spin_lock(&dentry->d_lock);
	hlist_for_each_entry(child, &dentry->d_children, d_sib) {
		if (d_really_is_positive(child)) {
			all_negative = false;
			break;
		}
	}
	spin_unlock(&dentry->d_lock);

	if (all_negative)
		shrink_dcache_parent(dentry);
out:
	return all_negative;
}

/*
 * Trim old(er) caps.
 *
 * Because we can't cache an inode without one or more caps, we do
 * this indirectly: if a cap is unused, we prune its aliases, at which
 * point the inode will hopefully get dropped too.
 *
 * Yes, this is a bit sloppy.  Our only real goal here is to respond to
 * memory pressure from the MDS, though, so it needn't be perfect.
 */
static int trim_caps_cb(struct inode *inode, int mds, void *arg)
{
	struct ceph_mds_client *mdsc = ceph_sb_to_mdsc(inode->i_sb);
	struct ceph_client *cl = mdsc->fsc->client;
	int *remaining = arg;
	struct ceph_inode_info *ci = ceph_inode(inode);
	int used, wanted, oissued, mine;
	struct ceph_cap *cap;

	if (*remaining <= 0)
		return -1;

	spin_lock(&ci->i_ceph_lock);
	cap = __get_cap_for_mds(ci, mds);
	if (!cap) {
		spin_unlock(&ci->i_ceph_lock);
		return 0;
	}
	mine = cap->issued | cap->implemented;
	used = __ceph_caps_used(ci);
	wanted = __ceph_caps_file_wanted(ci);
	oissued = __ceph_caps_issued_other(ci, cap);

	doutc(cl, "%p %llx.%llx cap %p mine %s oissued %s used %s wanted %s\n",
	      inode, ceph_vinop(inode), cap, ceph_cap_string(mine),
	      ceph_cap_string(oissued), ceph_cap_string(used),
	      ceph_cap_string(wanted));
	if (cap == ci->i_auth_cap) {
		if (ci->i_dirty_caps || ci->i_flushing_caps ||
		    !list_empty(&ci->i_cap_snaps))
			goto out;
		if ((used | wanted) & CEPH_CAP_ANY_WR)
			goto out;
		/* Note: it's possible that i_filelock_ref becomes non-zero
		 * after we drop the auth caps.  That doesn't hurt, because
		 * the reply to the locking MDS request will re-add them. */
		if (atomic_read(&ci->i_filelock_ref) > 0)
			goto out;
	}
	/* The inode has cached pages but is no longer used;
	 * we can safely drop it. */
	if (S_ISREG(inode->i_mode) &&
	    wanted == 0 && used == CEPH_CAP_FILE_CACHE &&
	    !(oissued & CEPH_CAP_FILE_CACHE)) {
		used = 0;
		oissued = 0;
	}
	if ((used | wanted) & ~oissued & mine)
		goto out; /* we need these caps */

	if (oissued) {
		/* we aren't the only cap..
just remove us */ 2215 ceph_remove_cap(mdsc, cap, true); 2216 (*remaining)--; 2217 } else { 2218 struct dentry *dentry; 2219 /* try dropping referring dentries */ 2220 spin_unlock(&ci->i_ceph_lock); 2221 dentry = d_find_any_alias(inode); 2222 if (dentry && drop_negative_children(dentry)) { 2223 int count; 2224 dput(dentry); 2225 d_prune_aliases(inode); 2226 count = icount_read(inode); 2227 if (count == 1) 2228 (*remaining)--; 2229 doutc(cl, "%p %llx.%llx cap %p pruned, count now %d\n", 2230 inode, ceph_vinop(inode), cap, count); 2231 } else { 2232 dput(dentry); 2233 } 2234 return 0; 2235 } 2236 2237 out: 2238 spin_unlock(&ci->i_ceph_lock); 2239 return 0; 2240 } 2241 2242 /* 2243 * Trim session cap count down to some max number. 2244 */ 2245 int ceph_trim_caps(struct ceph_mds_client *mdsc, 2246 struct ceph_mds_session *session, 2247 int max_caps) 2248 { 2249 struct ceph_client *cl = mdsc->fsc->client; 2250 int trim_caps = session->s_nr_caps - max_caps; 2251 2252 doutc(cl, "mds%d start: %d / %d, trim %d\n", session->s_mds, 2253 session->s_nr_caps, max_caps, trim_caps); 2254 if (trim_caps > 0) { 2255 int remaining = trim_caps; 2256 2257 ceph_iterate_session_caps(session, trim_caps_cb, &remaining); 2258 doutc(cl, "mds%d done: %d / %d, trimmed %d\n", 2259 session->s_mds, session->s_nr_caps, max_caps, 2260 trim_caps - remaining); 2261 } 2262 2263 ceph_flush_session_cap_releases(mdsc, session); 2264 return 0; 2265 } 2266 2267 static int check_caps_flush(struct ceph_mds_client *mdsc, 2268 u64 want_flush_tid) 2269 { 2270 struct ceph_client *cl = mdsc->fsc->client; 2271 int ret = 1; 2272 2273 spin_lock(&mdsc->cap_dirty_lock); 2274 if (!list_empty(&mdsc->cap_flush_list)) { 2275 struct ceph_cap_flush *cf = 2276 list_first_entry(&mdsc->cap_flush_list, 2277 struct ceph_cap_flush, g_list); 2278 if (cf->tid <= want_flush_tid) { 2279 doutc(cl, "still flushing tid %llu <= %llu\n", 2280 cf->tid, want_flush_tid); 2281 ret = 0; 2282 } 2283 } 2284 spin_unlock(&mdsc->cap_dirty_lock); 2285 return ret; 2286 } 2287 2288 /* 2289 * flush all dirty inode data to disk. 
2290 * 2291 * returns true if we've flushed through want_flush_tid 2292 */ 2293 static void wait_caps_flush(struct ceph_mds_client *mdsc, 2294 u64 want_flush_tid) 2295 { 2296 struct ceph_client *cl = mdsc->fsc->client; 2297 2298 doutc(cl, "want %llu\n", want_flush_tid); 2299 2300 wait_event(mdsc->cap_flushing_wq, 2301 check_caps_flush(mdsc, want_flush_tid)); 2302 2303 doutc(cl, "ok, flushed thru %llu\n", want_flush_tid); 2304 } 2305 2306 /* 2307 * called under s_mutex 2308 */ 2309 static void ceph_send_cap_releases(struct ceph_mds_client *mdsc, 2310 struct ceph_mds_session *session) 2311 { 2312 struct ceph_client *cl = mdsc->fsc->client; 2313 struct ceph_msg *msg = NULL; 2314 struct ceph_mds_cap_release *head; 2315 struct ceph_mds_cap_item *item; 2316 struct ceph_osd_client *osdc = &mdsc->fsc->client->osdc; 2317 struct ceph_cap *cap; 2318 LIST_HEAD(tmp_list); 2319 int num_cap_releases; 2320 __le32 barrier, *cap_barrier; 2321 2322 down_read(&osdc->lock); 2323 barrier = cpu_to_le32(osdc->epoch_barrier); 2324 up_read(&osdc->lock); 2325 2326 spin_lock(&session->s_cap_lock); 2327 again: 2328 list_splice_init(&session->s_cap_releases, &tmp_list); 2329 num_cap_releases = session->s_num_cap_releases; 2330 session->s_num_cap_releases = 0; 2331 spin_unlock(&session->s_cap_lock); 2332 2333 while (!list_empty(&tmp_list)) { 2334 if (!msg) { 2335 msg = ceph_msg_new(CEPH_MSG_CLIENT_CAPRELEASE, 2336 PAGE_SIZE, GFP_NOFS, false); 2337 if (!msg) 2338 goto out_err; 2339 head = msg->front.iov_base; 2340 head->num = cpu_to_le32(0); 2341 msg->front.iov_len = sizeof(*head); 2342 2343 msg->hdr.version = cpu_to_le16(2); 2344 msg->hdr.compat_version = cpu_to_le16(1); 2345 } 2346 2347 cap = list_first_entry(&tmp_list, struct ceph_cap, 2348 session_caps); 2349 list_del(&cap->session_caps); 2350 num_cap_releases--; 2351 2352 head = msg->front.iov_base; 2353 put_unaligned_le32(get_unaligned_le32(&head->num) + 1, 2354 &head->num); 2355 item = msg->front.iov_base + msg->front.iov_len; 2356 item->ino = cpu_to_le64(cap->cap_ino); 2357 item->cap_id = cpu_to_le64(cap->cap_id); 2358 item->migrate_seq = cpu_to_le32(cap->mseq); 2359 item->issue_seq = cpu_to_le32(cap->issue_seq); 2360 msg->front.iov_len += sizeof(*item); 2361 2362 ceph_put_cap(mdsc, cap); 2363 2364 if (le32_to_cpu(head->num) == CEPH_CAPS_PER_RELEASE) { 2365 // Append cap_barrier field 2366 cap_barrier = msg->front.iov_base + msg->front.iov_len; 2367 *cap_barrier = barrier; 2368 msg->front.iov_len += sizeof(*cap_barrier); 2369 2370 msg->hdr.front_len = cpu_to_le32(msg->front.iov_len); 2371 doutc(cl, "mds%d %p\n", session->s_mds, msg); 2372 ceph_con_send(&session->s_con, msg); 2373 msg = NULL; 2374 } 2375 } 2376 2377 BUG_ON(num_cap_releases != 0); 2378 2379 spin_lock(&session->s_cap_lock); 2380 if (!list_empty(&session->s_cap_releases)) 2381 goto again; 2382 spin_unlock(&session->s_cap_lock); 2383 2384 if (msg) { 2385 // Append cap_barrier field 2386 cap_barrier = msg->front.iov_base + msg->front.iov_len; 2387 *cap_barrier = barrier; 2388 msg->front.iov_len += sizeof(*cap_barrier); 2389 2390 msg->hdr.front_len = cpu_to_le32(msg->front.iov_len); 2391 doutc(cl, "mds%d %p\n", session->s_mds, msg); 2392 ceph_con_send(&session->s_con, msg); 2393 } 2394 return; 2395 out_err: 2396 pr_err_client(cl, "mds%d, failed to allocate message\n", 2397 session->s_mds); 2398 spin_lock(&session->s_cap_lock); 2399 list_splice(&tmp_list, &session->s_cap_releases); 2400 session->s_num_cap_releases += num_cap_releases; 2401 spin_unlock(&session->s_cap_lock); 2402 } 2403 2404 static void 
ceph_cap_release_work(struct work_struct *work) 2405 { 2406 struct ceph_mds_session *session = 2407 container_of(work, struct ceph_mds_session, s_cap_release_work); 2408 2409 mutex_lock(&session->s_mutex); 2410 if (session->s_state == CEPH_MDS_SESSION_OPEN || 2411 session->s_state == CEPH_MDS_SESSION_HUNG) 2412 ceph_send_cap_releases(session->s_mdsc, session); 2413 mutex_unlock(&session->s_mutex); 2414 ceph_put_mds_session(session); 2415 } 2416 2417 void ceph_flush_session_cap_releases(struct ceph_mds_client *mdsc, 2418 struct ceph_mds_session *session) 2419 { 2420 struct ceph_client *cl = mdsc->fsc->client; 2421 if (mdsc->stopping) 2422 return; 2423 2424 ceph_get_mds_session(session); 2425 if (queue_work(mdsc->fsc->cap_wq, 2426 &session->s_cap_release_work)) { 2427 doutc(cl, "cap release work queued\n"); 2428 } else { 2429 ceph_put_mds_session(session); 2430 doutc(cl, "failed to queue cap release work\n"); 2431 } 2432 } 2433 2434 /* 2435 * caller holds session->s_cap_lock 2436 */ 2437 void __ceph_queue_cap_release(struct ceph_mds_session *session, 2438 struct ceph_cap *cap) 2439 { 2440 list_add_tail(&cap->session_caps, &session->s_cap_releases); 2441 session->s_num_cap_releases++; 2442 2443 if (!(session->s_num_cap_releases % CEPH_CAPS_PER_RELEASE)) 2444 ceph_flush_session_cap_releases(session->s_mdsc, session); 2445 } 2446 2447 static void ceph_cap_reclaim_work(struct work_struct *work) 2448 { 2449 struct ceph_mds_client *mdsc = 2450 container_of(work, struct ceph_mds_client, cap_reclaim_work); 2451 int ret = ceph_trim_dentries(mdsc); 2452 if (ret == -EAGAIN) 2453 ceph_queue_cap_reclaim_work(mdsc); 2454 } 2455 2456 void ceph_queue_cap_reclaim_work(struct ceph_mds_client *mdsc) 2457 { 2458 struct ceph_client *cl = mdsc->fsc->client; 2459 if (mdsc->stopping) 2460 return; 2461 2462 if (queue_work(mdsc->fsc->cap_wq, &mdsc->cap_reclaim_work)) { 2463 doutc(cl, "caps reclaim work queued\n"); 2464 } else { 2465 doutc(cl, "failed to queue caps release work\n"); 2466 } 2467 } 2468 2469 void ceph_reclaim_caps_nr(struct ceph_mds_client *mdsc, int nr) 2470 { 2471 int val; 2472 if (!nr) 2473 return; 2474 val = atomic_add_return(nr, &mdsc->cap_reclaim_pending); 2475 if ((val % CEPH_CAPS_PER_RELEASE) < nr) { 2476 atomic_set(&mdsc->cap_reclaim_pending, 0); 2477 ceph_queue_cap_reclaim_work(mdsc); 2478 } 2479 } 2480 2481 void ceph_queue_cap_unlink_work(struct ceph_mds_client *mdsc) 2482 { 2483 struct ceph_client *cl = mdsc->fsc->client; 2484 if (mdsc->stopping) 2485 return; 2486 2487 if (queue_work(mdsc->fsc->cap_wq, &mdsc->cap_unlink_work)) { 2488 doutc(cl, "caps unlink work queued\n"); 2489 } else { 2490 doutc(cl, "failed to queue caps unlink work\n"); 2491 } 2492 } 2493 2494 static void ceph_cap_unlink_work(struct work_struct *work) 2495 { 2496 struct ceph_mds_client *mdsc = 2497 container_of(work, struct ceph_mds_client, cap_unlink_work); 2498 struct ceph_client *cl = mdsc->fsc->client; 2499 2500 doutc(cl, "begin\n"); 2501 spin_lock(&mdsc->cap_delay_lock); 2502 while (!list_empty(&mdsc->cap_unlink_delay_list)) { 2503 struct ceph_inode_info *ci; 2504 struct inode *inode; 2505 2506 ci = list_first_entry(&mdsc->cap_unlink_delay_list, 2507 struct ceph_inode_info, 2508 i_cap_delay_list); 2509 list_del_init(&ci->i_cap_delay_list); 2510 2511 inode = igrab(&ci->netfs.inode); 2512 if (inode) { 2513 spin_unlock(&mdsc->cap_delay_lock); 2514 doutc(cl, "on %p %llx.%llx\n", inode, 2515 ceph_vinop(inode)); 2516 ceph_check_caps(ci, CHECK_CAPS_FLUSH); 2517 iput(inode); 2518 spin_lock(&mdsc->cap_delay_lock); 2519 } 2520 
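		/*
		 * If igrab() failed above, the inode is already being
		 * evicted; the eviction path cleans up its caps, so there
		 * is nothing more for us to do with that entry here.
		 */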
} 2521 spin_unlock(&mdsc->cap_delay_lock); 2522 doutc(cl, "done\n"); 2523 } 2524 2525 /* 2526 * requests 2527 */ 2528 2529 int ceph_alloc_readdir_reply_buffer(struct ceph_mds_request *req, 2530 struct inode *dir) 2531 { 2532 struct ceph_inode_info *ci = ceph_inode(dir); 2533 struct ceph_mds_reply_info_parsed *rinfo = &req->r_reply_info; 2534 struct ceph_mount_options *opt = req->r_mdsc->fsc->mount_options; 2535 size_t size = sizeof(struct ceph_mds_reply_dir_entry); 2536 unsigned int num_entries; 2537 u64 bytes_count; 2538 int order; 2539 2540 spin_lock(&ci->i_ceph_lock); 2541 num_entries = ci->i_files + ci->i_subdirs; 2542 spin_unlock(&ci->i_ceph_lock); 2543 num_entries = max(num_entries, 1U); 2544 num_entries = min(num_entries, opt->max_readdir); 2545 2546 bytes_count = (u64)size * num_entries; 2547 if (unlikely(bytes_count > ULONG_MAX)) 2548 bytes_count = ULONG_MAX; 2549 2550 order = get_order((unsigned long)bytes_count); 2551 while (order >= 0) { 2552 rinfo->dir_entries = (void*)__get_free_pages(GFP_KERNEL | 2553 __GFP_NOWARN | 2554 __GFP_ZERO, 2555 order); 2556 if (rinfo->dir_entries) 2557 break; 2558 order--; 2559 } 2560 if (!rinfo->dir_entries || unlikely(order < 0)) 2561 return -ENOMEM; 2562 2563 num_entries = (PAGE_SIZE << order) / size; 2564 num_entries = min(num_entries, opt->max_readdir); 2565 2566 rinfo->dir_buf_size = PAGE_SIZE << order; 2567 req->r_num_caps = num_entries + 1; 2568 req->r_args.readdir.max_entries = cpu_to_le32(num_entries); 2569 req->r_args.readdir.max_bytes = cpu_to_le32(opt->max_readdir_bytes); 2570 return 0; 2571 } 2572 2573 /* 2574 * Create an mds request. 2575 */ 2576 struct ceph_mds_request * 2577 ceph_mdsc_create_request(struct ceph_mds_client *mdsc, int op, int mode) 2578 { 2579 struct ceph_mds_request *req; 2580 2581 req = kmem_cache_zalloc(ceph_mds_request_cachep, GFP_NOFS); 2582 if (!req) 2583 return ERR_PTR(-ENOMEM); 2584 2585 mutex_init(&req->r_fill_mutex); 2586 req->r_mdsc = mdsc; 2587 req->r_started = jiffies; 2588 req->r_start_latency = ktime_get(); 2589 req->r_resend_mds = -1; 2590 INIT_LIST_HEAD(&req->r_unsafe_dir_item); 2591 INIT_LIST_HEAD(&req->r_unsafe_target_item); 2592 req->r_fmode = -1; 2593 req->r_feature_needed = -1; 2594 kref_init(&req->r_kref); 2595 RB_CLEAR_NODE(&req->r_node); 2596 INIT_LIST_HEAD(&req->r_wait); 2597 init_completion(&req->r_completion); 2598 init_completion(&req->r_safe_completion); 2599 INIT_LIST_HEAD(&req->r_unsafe_item); 2600 2601 ktime_get_coarse_real_ts64(&req->r_stamp); 2602 2603 req->r_op = op; 2604 req->r_direct_mode = mode; 2605 return req; 2606 } 2607 2608 /* 2609 * return oldest (lowest) request, tid in request tree, 0 if none. 2610 * 2611 * called under mdsc->mutex. 
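 *
 * Purely illustrative example: with requests whose tids are
 * {12, 40, 77} registered in the tree, __get_oldest_req() returns the
 * request with tid 12, while __get_oldest_tid() below simply reports
 * the cached mdsc->oldest_tid value.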
2612 */ 2613 static struct ceph_mds_request *__get_oldest_req(struct ceph_mds_client *mdsc) 2614 { 2615 if (RB_EMPTY_ROOT(&mdsc->request_tree)) 2616 return NULL; 2617 return rb_entry(rb_first(&mdsc->request_tree), 2618 struct ceph_mds_request, r_node); 2619 } 2620 2621 static inline u64 __get_oldest_tid(struct ceph_mds_client *mdsc) 2622 { 2623 return mdsc->oldest_tid; 2624 } 2625 2626 #if IS_ENABLED(CONFIG_FS_ENCRYPTION) 2627 static u8 *get_fscrypt_altname(const struct ceph_mds_request *req, u32 *plen) 2628 { 2629 struct inode *dir = req->r_parent; 2630 struct dentry *dentry = req->r_dentry; 2631 const struct qstr *name = req->r_dname; 2632 u8 *cryptbuf = NULL; 2633 u32 len = 0; 2634 int ret = 0; 2635 2636 /* only encode if we have parent and dentry */ 2637 if (!dir || !dentry) 2638 goto success; 2639 2640 /* No-op unless this is encrypted */ 2641 if (!IS_ENCRYPTED(dir)) 2642 goto success; 2643 2644 ret = ceph_fscrypt_prepare_readdir(dir); 2645 if (ret < 0) 2646 return ERR_PTR(ret); 2647 2648 /* No key? Just ignore it. */ 2649 if (!fscrypt_has_encryption_key(dir)) 2650 goto success; 2651 2652 if (!name) 2653 name = &dentry->d_name; 2654 2655 if (!fscrypt_fname_encrypted_size(dir, name->len, NAME_MAX, &len)) { 2656 WARN_ON_ONCE(1); 2657 return ERR_PTR(-ENAMETOOLONG); 2658 } 2659 2660 /* No need to append altname if name is short enough */ 2661 if (len <= CEPH_NOHASH_NAME_MAX) { 2662 len = 0; 2663 goto success; 2664 } 2665 2666 cryptbuf = kmalloc(len, GFP_KERNEL); 2667 if (!cryptbuf) 2668 return ERR_PTR(-ENOMEM); 2669 2670 ret = fscrypt_fname_encrypt(dir, name, cryptbuf, len); 2671 if (ret) { 2672 kfree(cryptbuf); 2673 return ERR_PTR(ret); 2674 } 2675 success: 2676 *plen = len; 2677 return cryptbuf; 2678 } 2679 #else 2680 static u8 *get_fscrypt_altname(const struct ceph_mds_request *req, u32 *plen) 2681 { 2682 *plen = 0; 2683 return NULL; 2684 } 2685 #endif 2686 2687 /** 2688 * ceph_mdsc_build_path - build a path string to a given dentry 2689 * @mdsc: mds client 2690 * @dentry: dentry to which path should be built 2691 * @path_info: output path, length, base ino+snap, and freepath ownership flag 2692 * @for_wire: is this path going to be sent to the MDS? 2693 * 2694 * Build a string that represents the path to the dentry. This is mostly called 2695 * for two different purposes: 2696 * 2697 * 1) we need to build a path string to send to the MDS (for_wire == true) 2698 * 2) we need a path string for local presentation (e.g. debugfs) 2699 * (for_wire == false) 2700 * 2701 * The path is built in reverse, starting with the dentry. Walk back up toward 2702 * the root, building the path until the first non-snapped inode is reached 2703 * (for_wire) or the root inode is reached (!for_wire). 2704 * 2705 * Encode hidden .snap dirs as a double /, i.e. 
2706 * foo/.snap/bar -> foo//bar 2707 */ 2708 char *ceph_mdsc_build_path(struct ceph_mds_client *mdsc, struct dentry *dentry, 2709 struct ceph_path_info *path_info, int for_wire) 2710 { 2711 struct ceph_client *cl = mdsc->fsc->client; 2712 struct dentry *cur; 2713 struct inode *inode; 2714 char *path; 2715 int pos; 2716 unsigned seq; 2717 u64 base; 2718 2719 if (!dentry) 2720 return ERR_PTR(-EINVAL); 2721 2722 path = __getname(); 2723 if (!path) 2724 return ERR_PTR(-ENOMEM); 2725 retry: 2726 pos = PATH_MAX - 1; 2727 path[pos] = '\0'; 2728 2729 seq = read_seqbegin(&rename_lock); 2730 cur = dget(dentry); 2731 for (;;) { 2732 struct dentry *parent; 2733 2734 spin_lock(&cur->d_lock); 2735 inode = d_inode(cur); 2736 if (inode && ceph_snap(inode) == CEPH_SNAPDIR) { 2737 doutc(cl, "path+%d: %p SNAPDIR\n", pos, cur); 2738 spin_unlock(&cur->d_lock); 2739 parent = dget_parent(cur); 2740 } else if (for_wire && inode && dentry != cur && 2741 ceph_snap(inode) == CEPH_NOSNAP) { 2742 spin_unlock(&cur->d_lock); 2743 pos++; /* get rid of any prepended '/' */ 2744 break; 2745 } else if (!for_wire || !IS_ENCRYPTED(d_inode(cur->d_parent))) { 2746 pos -= cur->d_name.len; 2747 if (pos < 0) { 2748 spin_unlock(&cur->d_lock); 2749 break; 2750 } 2751 memcpy(path + pos, cur->d_name.name, cur->d_name.len); 2752 spin_unlock(&cur->d_lock); 2753 parent = dget_parent(cur); 2754 } else { 2755 int len, ret; 2756 char buf[NAME_MAX]; 2757 2758 /* 2759 * Proactively copy name into buf, in case we need to 2760 * present it as-is. 2761 */ 2762 memcpy(buf, cur->d_name.name, cur->d_name.len); 2763 len = cur->d_name.len; 2764 spin_unlock(&cur->d_lock); 2765 parent = dget_parent(cur); 2766 2767 ret = ceph_fscrypt_prepare_readdir(d_inode(parent)); 2768 if (ret < 0) { 2769 dput(parent); 2770 dput(cur); 2771 return ERR_PTR(ret); 2772 } 2773 2774 if (fscrypt_has_encryption_key(d_inode(parent))) { 2775 len = ceph_encode_encrypted_dname(d_inode(parent), 2776 buf, len); 2777 if (len < 0) { 2778 dput(parent); 2779 dput(cur); 2780 return ERR_PTR(len); 2781 } 2782 } 2783 pos -= len; 2784 if (pos < 0) { 2785 dput(parent); 2786 break; 2787 } 2788 memcpy(path + pos, buf, len); 2789 } 2790 dput(cur); 2791 cur = parent; 2792 2793 /* Are we at the root? */ 2794 if (IS_ROOT(cur)) 2795 break; 2796 2797 /* Are we out of buffer? */ 2798 if (--pos < 0) 2799 break; 2800 2801 path[pos] = '/'; 2802 } 2803 inode = d_inode(cur); 2804 base = inode ? ceph_ino(inode) : 0; 2805 dput(cur); 2806 2807 if (read_seqretry(&rename_lock, seq)) 2808 goto retry; 2809 2810 if (pos < 0) { 2811 /* 2812 * The path is longer than PATH_MAX and this function 2813 * cannot ever succeed. Creating paths that long is 2814 * possible with Ceph, but Linux cannot use them. 
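	 * (On Linux, PATH_MAX is 4096 bytes including the trailing NUL.)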
2815 */ 2816 return ERR_PTR(-ENAMETOOLONG); 2817 } 2818 2819 /* Initialize the output structure */ 2820 memset(path_info, 0, sizeof(*path_info)); 2821 2822 path_info->vino.ino = base; 2823 path_info->pathlen = PATH_MAX - 1 - pos; 2824 path_info->path = path + pos; 2825 path_info->freepath = true; 2826 2827 /* Set snap from dentry if available */ 2828 if (d_inode(dentry)) 2829 path_info->vino.snap = ceph_snap(d_inode(dentry)); 2830 else 2831 path_info->vino.snap = CEPH_NOSNAP; 2832 2833 doutc(cl, "on %p %d built %llx '%.*s'\n", dentry, d_count(dentry), 2834 base, PATH_MAX - 1 - pos, path + pos); 2835 return path + pos; 2836 } 2837 2838 static int build_dentry_path(struct ceph_mds_client *mdsc, struct dentry *dentry, 2839 struct inode *dir, struct ceph_path_info *path_info, 2840 bool parent_locked) 2841 { 2842 char *path; 2843 2844 rcu_read_lock(); 2845 if (!dir) 2846 dir = d_inode_rcu(dentry->d_parent); 2847 if (dir && parent_locked && ceph_snap(dir) == CEPH_NOSNAP && 2848 !IS_ENCRYPTED(dir)) { 2849 path_info->vino.ino = ceph_ino(dir); 2850 path_info->vino.snap = ceph_snap(dir); 2851 rcu_read_unlock(); 2852 path_info->path = dentry->d_name.name; 2853 path_info->pathlen = dentry->d_name.len; 2854 path_info->freepath = false; 2855 return 0; 2856 } 2857 rcu_read_unlock(); 2858 path = ceph_mdsc_build_path(mdsc, dentry, path_info, 1); 2859 if (IS_ERR(path)) 2860 return PTR_ERR(path); 2861 /* 2862 * ceph_mdsc_build_path already fills path_info, including snap handling. 2863 */ 2864 return 0; 2865 } 2866 2867 static int build_inode_path(struct inode *inode, struct ceph_path_info *path_info) 2868 { 2869 struct ceph_mds_client *mdsc = ceph_sb_to_mdsc(inode->i_sb); 2870 struct dentry *dentry; 2871 char *path; 2872 2873 if (ceph_snap(inode) == CEPH_NOSNAP) { 2874 path_info->vino.ino = ceph_ino(inode); 2875 path_info->vino.snap = ceph_snap(inode); 2876 path_info->pathlen = 0; 2877 path_info->freepath = false; 2878 return 0; 2879 } 2880 dentry = d_find_alias(inode); 2881 path = ceph_mdsc_build_path(mdsc, dentry, path_info, 1); 2882 dput(dentry); 2883 if (IS_ERR(path)) 2884 return PTR_ERR(path); 2885 /* 2886 * ceph_mdsc_build_path already fills path_info, including snap from dentry. 2887 * Override with inode's snap since that's what this function is for. 2888 */ 2889 path_info->vino.snap = ceph_snap(inode); 2890 return 0; 2891 } 2892 2893 /* 2894 * request arguments may be specified via an inode *, a dentry *, or 2895 * an explicit ino+path. 2896 */ 2897 static int set_request_path_attr(struct ceph_mds_client *mdsc, struct inode *rinode, 2898 struct dentry *rdentry, struct inode *rdiri, 2899 const char *rpath, u64 rino, 2900 struct ceph_path_info *path_info, 2901 bool parent_locked) 2902 { 2903 struct ceph_client *cl = mdsc->fsc->client; 2904 int r = 0; 2905 2906 /* Initialize the output structure */ 2907 memset(path_info, 0, sizeof(*path_info)); 2908 2909 if (rinode) { 2910 r = build_inode_path(rinode, path_info); 2911 doutc(cl, " inode %p %llx.%llx\n", rinode, ceph_ino(rinode), 2912 ceph_snap(rinode)); 2913 } else if (rdentry) { 2914 r = build_dentry_path(mdsc, rdentry, rdiri, path_info, parent_locked); 2915 doutc(cl, " dentry %p %llx/%.*s\n", rdentry, path_info->vino.ino, 2916 path_info->pathlen, path_info->path); 2917 } else if (rpath || rino) { 2918 path_info->vino.ino = rino; 2919 path_info->vino.snap = CEPH_NOSNAP; 2920 path_info->path = rpath; 2921 path_info->pathlen = rpath ? 
strlen(rpath) : 0; 2922 path_info->freepath = false; 2923 2924 doutc(cl, " path %.*s\n", path_info->pathlen, rpath); 2925 } 2926 2927 return r; 2928 } 2929 2930 static void encode_mclientrequest_tail(void **p, 2931 const struct ceph_mds_request *req) 2932 { 2933 struct ceph_timespec ts; 2934 int i; 2935 2936 ceph_encode_timespec64(&ts, &req->r_stamp); 2937 ceph_encode_copy(p, &ts, sizeof(ts)); 2938 2939 /* v4: gid_list */ 2940 ceph_encode_32(p, req->r_cred->group_info->ngroups); 2941 for (i = 0; i < req->r_cred->group_info->ngroups; i++) 2942 ceph_encode_64(p, from_kgid(&init_user_ns, 2943 req->r_cred->group_info->gid[i])); 2944 2945 /* v5: altname */ 2946 ceph_encode_32(p, req->r_altname_len); 2947 ceph_encode_copy(p, req->r_altname, req->r_altname_len); 2948 2949 /* v6: fscrypt_auth and fscrypt_file */ 2950 if (req->r_fscrypt_auth) { 2951 u32 authlen = ceph_fscrypt_auth_len(req->r_fscrypt_auth); 2952 2953 ceph_encode_32(p, authlen); 2954 ceph_encode_copy(p, req->r_fscrypt_auth, authlen); 2955 } else { 2956 ceph_encode_32(p, 0); 2957 } 2958 if (test_bit(CEPH_MDS_R_FSCRYPT_FILE, &req->r_req_flags)) { 2959 ceph_encode_32(p, sizeof(__le64)); 2960 ceph_encode_64(p, req->r_fscrypt_file); 2961 } else { 2962 ceph_encode_32(p, 0); 2963 } 2964 } 2965 2966 static inline u16 mds_supported_head_version(struct ceph_mds_session *session) 2967 { 2968 if (!test_bit(CEPHFS_FEATURE_32BITS_RETRY_FWD, &session->s_features)) 2969 return 1; 2970 2971 if (!test_bit(CEPHFS_FEATURE_HAS_OWNER_UIDGID, &session->s_features)) 2972 return 2; 2973 2974 return CEPH_MDS_REQUEST_HEAD_VERSION; 2975 } 2976 2977 static struct ceph_mds_request_head_legacy * 2978 find_legacy_request_head(void *p, u64 features) 2979 { 2980 bool legacy = !(features & CEPH_FEATURE_FS_BTIME); 2981 struct ceph_mds_request_head *head; 2982 2983 if (legacy) 2984 return (struct ceph_mds_request_head_legacy *)p; 2985 head = (struct ceph_mds_request_head *)p; 2986 return (struct ceph_mds_request_head_legacy *)&head->oldest_client_tid; 2987 } 2988 2989 /* 2990 * called under mdsc->mutex 2991 */ 2992 static struct ceph_msg *create_request_message(struct ceph_mds_session *session, 2993 struct ceph_mds_request *req, 2994 bool drop_cap_releases) 2995 { 2996 int mds = session->s_mds; 2997 struct ceph_mds_client *mdsc = session->s_mdsc; 2998 struct ceph_client *cl = mdsc->fsc->client; 2999 struct ceph_msg *msg; 3000 struct ceph_mds_request_head_legacy *lhead; 3001 struct ceph_path_info path_info1 = {0}; 3002 struct ceph_path_info path_info2 = {0}; 3003 struct dentry *old_dentry = NULL; 3004 int len; 3005 u16 releases; 3006 void *p, *end; 3007 int ret; 3008 bool legacy = !(session->s_con.peer_features & CEPH_FEATURE_FS_BTIME); 3009 u16 request_head_version = mds_supported_head_version(session); 3010 kuid_t caller_fsuid = req->r_cred->fsuid; 3011 kgid_t caller_fsgid = req->r_cred->fsgid; 3012 bool parent_locked = test_bit(CEPH_MDS_R_PARENT_LOCKED, &req->r_req_flags); 3013 3014 ret = set_request_path_attr(mdsc, req->r_inode, req->r_dentry, 3015 req->r_parent, req->r_path1, req->r_ino1.ino, 3016 &path_info1, parent_locked); 3017 if (ret < 0) { 3018 msg = ERR_PTR(ret); 3019 goto out; 3020 } 3021 3022 /* 3023 * When the parent directory's i_rwsem is *not* locked, req->r_parent may 3024 * have become stale (e.g. after a concurrent rename) between the time the 3025 * dentry was looked up and now. 
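	 * (Whether the parent is locked is tracked by the
	 * CEPH_MDS_R_PARENT_LOCKED request flag, read into parent_locked
	 * above.)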
	 * If we detect that the stored r_parent does not match the inode
	 * number we just encoded for the request, switch to the correct
	 * inode so that the MDS receives a valid parent reference.
	 */
	if (!parent_locked && req->r_parent && path_info1.vino.ino &&
	    ceph_ino(req->r_parent) != path_info1.vino.ino) {
		struct inode *old_parent = req->r_parent;
		struct inode *correct_dir = ceph_get_inode(mdsc->fsc->sb,
							   path_info1.vino, NULL);
		if (!IS_ERR(correct_dir)) {
			WARN_ONCE(1, "ceph: r_parent mismatch (had %llx wanted %llx) - updating\n",
				  ceph_ino(old_parent), path_info1.vino.ino);
			/*
			 * Transfer CEPH_CAP_PIN from the old parent to the
			 * new one.  The pin was taken earlier in
			 * ceph_mdsc_submit_request().
			 */
			ceph_put_cap_refs(ceph_inode(old_parent), CEPH_CAP_PIN);
			iput(old_parent);
			req->r_parent = correct_dir;
			ceph_get_cap_refs(ceph_inode(req->r_parent), CEPH_CAP_PIN);
		}
	}

	/* If r_old_dentry is set, then assume that its parent is locked */
	if (req->r_old_dentry &&
	    !(req->r_old_dentry->d_flags & DCACHE_DISCONNECTED))
		old_dentry = req->r_old_dentry;
	ret = set_request_path_attr(mdsc, NULL, old_dentry,
				    req->r_old_dentry_dir,
				    req->r_path2, req->r_ino2.ino,
				    &path_info2, true);
	if (ret < 0) {
		msg = ERR_PTR(ret);
		goto out_free1;
	}

	req->r_altname = get_fscrypt_altname(req, &req->r_altname_len);
	if (IS_ERR(req->r_altname)) {
		msg = ERR_CAST(req->r_altname);
		req->r_altname = NULL;
		goto out_free2;
	}

	/*
	 * Old MDSes, which lack the 32-bit retry/fwd feature, copy the raw
	 * head memory directly when decoding a request, while newer MDSes
	 * decode the head according to its version member, so the encoding
	 * we emit here must stay compatible with both.
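	 *
	 * Roughly, the chosen encoding is (the head v3 row assumes
	 * CEPH_MDS_REQUEST_HEAD_VERSION is still 3):
	 *
	 *   peer without FS_BTIME          -> legacy head, msg version 3
	 *   peer without 32BITS_RETRY_FWD  -> head v1, msg version 4
	 *   peer without HAS_OWNER_UIDGID  -> head v2, msg version 6
	 *   otherwise                      -> head v3, msg version 6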
3073 */ 3074 if (legacy) 3075 len = sizeof(struct ceph_mds_request_head_legacy); 3076 else if (request_head_version == 1) 3077 len = offsetofend(struct ceph_mds_request_head, args); 3078 else if (request_head_version == 2) 3079 len = offsetofend(struct ceph_mds_request_head, ext_num_fwd); 3080 else 3081 len = sizeof(struct ceph_mds_request_head); 3082 3083 /* filepaths */ 3084 len += 2 * (1 + sizeof(u32) + sizeof(u64)); 3085 len += path_info1.pathlen + path_info2.pathlen; 3086 3087 /* cap releases */ 3088 len += sizeof(struct ceph_mds_request_release) * 3089 (!!req->r_inode_drop + !!req->r_dentry_drop + 3090 !!req->r_old_inode_drop + !!req->r_old_dentry_drop); 3091 3092 if (req->r_dentry_drop) 3093 len += path_info1.pathlen; 3094 if (req->r_old_dentry_drop) 3095 len += path_info2.pathlen; 3096 3097 /* MClientRequest tail */ 3098 3099 /* req->r_stamp */ 3100 len += sizeof(struct ceph_timespec); 3101 3102 /* gid list */ 3103 len += sizeof(u32) + (sizeof(u64) * req->r_cred->group_info->ngroups); 3104 3105 /* alternate name */ 3106 len += sizeof(u32) + req->r_altname_len; 3107 3108 /* fscrypt_auth */ 3109 len += sizeof(u32); // fscrypt_auth 3110 if (req->r_fscrypt_auth) 3111 len += ceph_fscrypt_auth_len(req->r_fscrypt_auth); 3112 3113 /* fscrypt_file */ 3114 len += sizeof(u32); 3115 if (test_bit(CEPH_MDS_R_FSCRYPT_FILE, &req->r_req_flags)) 3116 len += sizeof(__le64); 3117 3118 msg = ceph_msg_new2(CEPH_MSG_CLIENT_REQUEST, len, 1, GFP_NOFS, false); 3119 if (!msg) { 3120 msg = ERR_PTR(-ENOMEM); 3121 goto out_free2; 3122 } 3123 3124 msg->hdr.tid = cpu_to_le64(req->r_tid); 3125 3126 lhead = find_legacy_request_head(msg->front.iov_base, 3127 session->s_con.peer_features); 3128 3129 if ((req->r_mnt_idmap != &nop_mnt_idmap) && 3130 !test_bit(CEPHFS_FEATURE_HAS_OWNER_UIDGID, &session->s_features)) { 3131 WARN_ON_ONCE(!IS_CEPH_MDS_OP_NEWINODE(req->r_op)); 3132 3133 if (enable_unsafe_idmap) { 3134 pr_warn_once_client(cl, 3135 "idmapped mount is used and CEPHFS_FEATURE_HAS_OWNER_UIDGID" 3136 " is not supported by MDS. UID/GID-based restrictions may" 3137 " not work properly.\n"); 3138 3139 caller_fsuid = from_vfsuid(req->r_mnt_idmap, &init_user_ns, 3140 VFSUIDT_INIT(req->r_cred->fsuid)); 3141 caller_fsgid = from_vfsgid(req->r_mnt_idmap, &init_user_ns, 3142 VFSGIDT_INIT(req->r_cred->fsgid)); 3143 } else { 3144 pr_err_ratelimited_client(cl, 3145 "idmapped mount is used and CEPHFS_FEATURE_HAS_OWNER_UIDGID" 3146 " is not supported by MDS. Fail request with -EIO.\n"); 3147 3148 ret = -EIO; 3149 goto out_err; 3150 } 3151 } 3152 3153 /* 3154 * The ceph_mds_request_head_legacy didn't contain a version field, and 3155 * one was added when we moved the message version from 3->4. 
3156 */ 3157 if (legacy) { 3158 msg->hdr.version = cpu_to_le16(3); 3159 p = msg->front.iov_base + sizeof(*lhead); 3160 } else if (request_head_version == 1) { 3161 struct ceph_mds_request_head *nhead = msg->front.iov_base; 3162 3163 msg->hdr.version = cpu_to_le16(4); 3164 nhead->version = cpu_to_le16(1); 3165 p = msg->front.iov_base + offsetofend(struct ceph_mds_request_head, args); 3166 } else if (request_head_version == 2) { 3167 struct ceph_mds_request_head *nhead = msg->front.iov_base; 3168 3169 msg->hdr.version = cpu_to_le16(6); 3170 nhead->version = cpu_to_le16(2); 3171 3172 p = msg->front.iov_base + offsetofend(struct ceph_mds_request_head, ext_num_fwd); 3173 } else { 3174 struct ceph_mds_request_head *nhead = msg->front.iov_base; 3175 kuid_t owner_fsuid; 3176 kgid_t owner_fsgid; 3177 3178 msg->hdr.version = cpu_to_le16(6); 3179 nhead->version = cpu_to_le16(CEPH_MDS_REQUEST_HEAD_VERSION); 3180 nhead->struct_len = cpu_to_le32(sizeof(struct ceph_mds_request_head)); 3181 3182 if (IS_CEPH_MDS_OP_NEWINODE(req->r_op)) { 3183 owner_fsuid = from_vfsuid(req->r_mnt_idmap, &init_user_ns, 3184 VFSUIDT_INIT(req->r_cred->fsuid)); 3185 owner_fsgid = from_vfsgid(req->r_mnt_idmap, &init_user_ns, 3186 VFSGIDT_INIT(req->r_cred->fsgid)); 3187 nhead->owner_uid = cpu_to_le32(from_kuid(&init_user_ns, owner_fsuid)); 3188 nhead->owner_gid = cpu_to_le32(from_kgid(&init_user_ns, owner_fsgid)); 3189 } else { 3190 nhead->owner_uid = cpu_to_le32(-1); 3191 nhead->owner_gid = cpu_to_le32(-1); 3192 } 3193 3194 p = msg->front.iov_base + sizeof(*nhead); 3195 } 3196 3197 end = msg->front.iov_base + msg->front.iov_len; 3198 3199 lhead->mdsmap_epoch = cpu_to_le32(mdsc->mdsmap->m_epoch); 3200 lhead->op = cpu_to_le32(req->r_op); 3201 lhead->caller_uid = cpu_to_le32(from_kuid(&init_user_ns, 3202 caller_fsuid)); 3203 lhead->caller_gid = cpu_to_le32(from_kgid(&init_user_ns, 3204 caller_fsgid)); 3205 lhead->ino = cpu_to_le64(req->r_deleg_ino); 3206 lhead->args = req->r_args; 3207 3208 ceph_encode_filepath(&p, end, path_info1.vino.ino, path_info1.path); 3209 ceph_encode_filepath(&p, end, path_info2.vino.ino, path_info2.path); 3210 3211 /* make note of release offset, in case we need to replay */ 3212 req->r_request_release_offset = p - msg->front.iov_base; 3213 3214 /* cap releases */ 3215 releases = 0; 3216 if (req->r_inode_drop) 3217 releases += ceph_encode_inode_release(&p, 3218 req->r_inode ? 
req->r_inode : d_inode(req->r_dentry),
				mds, req->r_inode_drop, req->r_inode_unless,
				req->r_op == CEPH_MDS_OP_READDIR);
	if (req->r_dentry_drop) {
		ret = ceph_encode_dentry_release(&p, req->r_dentry,
				req->r_parent, mds, req->r_dentry_drop,
				req->r_dentry_unless);
		if (ret < 0)
			goto out_err;
		releases += ret;
	}
	if (req->r_old_dentry_drop) {
		ret = ceph_encode_dentry_release(&p, req->r_old_dentry,
				req->r_old_dentry_dir, mds,
				req->r_old_dentry_drop,
				req->r_old_dentry_unless);
		if (ret < 0)
			goto out_err;
		releases += ret;
	}
	if (req->r_old_inode_drop)
		releases += ceph_encode_inode_release(&p,
				d_inode(req->r_old_dentry),
				mds, req->r_old_inode_drop,
				req->r_old_inode_unless, 0);

	if (drop_cap_releases) {
		releases = 0;
		p = msg->front.iov_base + req->r_request_release_offset;
	}

	lhead->num_releases = cpu_to_le16(releases);

	encode_mclientrequest_tail(&p, req);

	if (WARN_ON_ONCE(p > end)) {
		ceph_msg_put(msg);
		msg = ERR_PTR(-ERANGE);
		goto out_free2;
	}

	msg->front.iov_len = p - msg->front.iov_base;
	msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);

	if (req->r_pagelist) {
		struct ceph_pagelist *pagelist = req->r_pagelist;
		ceph_msg_data_add_pagelist(msg, pagelist);
		msg->hdr.data_len = cpu_to_le32(pagelist->length);
	} else {
		msg->hdr.data_len = 0;
	}

	msg->hdr.data_off = cpu_to_le16(0);

out_free2:
	ceph_mdsc_free_path_info(&path_info2);
out_free1:
	ceph_mdsc_free_path_info(&path_info1);
out:
	return msg;
out_err:
	ceph_msg_put(msg);
	msg = ERR_PTR(ret);
	goto out_free2;
}

/*
 * called under mdsc->mutex on error; with no mutex held on success.
 */
static void complete_request(struct ceph_mds_client *mdsc,
			     struct ceph_mds_request *req)
{
	req->r_end_latency = ktime_get();

	trace_ceph_mdsc_complete_request(mdsc, req);

	if (req->r_callback)
		req->r_callback(mdsc, req);
	complete_all(&req->r_completion);
}

/*
 * called under mdsc->mutex
 */
static int __prepare_send_request(struct ceph_mds_session *session,
				  struct ceph_mds_request *req,
				  bool drop_cap_releases)
{
	int mds = session->s_mds;
	struct ceph_mds_client *mdsc = session->s_mdsc;
	struct ceph_client *cl = mdsc->fsc->client;
	struct ceph_mds_request_head_legacy *lhead;
	struct ceph_mds_request_head *nhead;
	struct ceph_msg *msg;
	int flags = 0, old_max_retry;
	bool old_version = !test_bit(CEPHFS_FEATURE_32BITS_RETRY_FWD,
				     &session->s_features);

	/*
	 * Avoid infinite retrying after overflow.  The client increases
	 * the retry count on every attempt, and an old-version MDS only
	 * carries an 8-bit retry counter on the wire, so in that case we
	 * limit retries to at most 256.
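	 *
	 * For example: num_retry occupies a single byte in the head, so
	 * the computation below gives
	 * old_max_retry = 1 << (1 * BITS_PER_BYTE) = 256.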
3320 */ 3321 if (req->r_attempts) { 3322 old_max_retry = sizeof_field(struct ceph_mds_request_head, 3323 num_retry); 3324 old_max_retry = 1 << (old_max_retry * BITS_PER_BYTE); 3325 if ((old_version && req->r_attempts >= old_max_retry) || 3326 ((uint32_t)req->r_attempts >= U32_MAX)) { 3327 pr_warn_ratelimited_client(cl, "request tid %llu seq overflow\n", 3328 req->r_tid); 3329 return -EMULTIHOP; 3330 } 3331 } 3332 3333 req->r_attempts++; 3334 if (req->r_inode) { 3335 struct ceph_cap *cap = 3336 ceph_get_cap_for_mds(ceph_inode(req->r_inode), mds); 3337 3338 if (cap) 3339 req->r_sent_on_mseq = cap->mseq; 3340 else 3341 req->r_sent_on_mseq = -1; 3342 } 3343 doutc(cl, "%p tid %lld %s (attempt %d)\n", req, req->r_tid, 3344 ceph_mds_op_name(req->r_op), req->r_attempts); 3345 3346 if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) { 3347 void *p; 3348 3349 /* 3350 * Replay. Do not regenerate message (and rebuild 3351 * paths, etc.); just use the original message. 3352 * Rebuilding paths will break for renames because 3353 * d_move mangles the src name. 3354 */ 3355 msg = req->r_request; 3356 lhead = find_legacy_request_head(msg->front.iov_base, 3357 session->s_con.peer_features); 3358 3359 flags = le32_to_cpu(lhead->flags); 3360 flags |= CEPH_MDS_FLAG_REPLAY; 3361 lhead->flags = cpu_to_le32(flags); 3362 3363 if (req->r_target_inode) 3364 lhead->ino = cpu_to_le64(ceph_ino(req->r_target_inode)); 3365 3366 lhead->num_retry = req->r_attempts - 1; 3367 if (!old_version) { 3368 nhead = (struct ceph_mds_request_head*)msg->front.iov_base; 3369 nhead->ext_num_retry = cpu_to_le32(req->r_attempts - 1); 3370 } 3371 3372 /* remove cap/dentry releases from message */ 3373 lhead->num_releases = 0; 3374 3375 p = msg->front.iov_base + req->r_request_release_offset; 3376 encode_mclientrequest_tail(&p, req); 3377 3378 msg->front.iov_len = p - msg->front.iov_base; 3379 msg->hdr.front_len = cpu_to_le32(msg->front.iov_len); 3380 return 0; 3381 } 3382 3383 if (req->r_request) { 3384 ceph_msg_put(req->r_request); 3385 req->r_request = NULL; 3386 } 3387 msg = create_request_message(session, req, drop_cap_releases); 3388 if (IS_ERR(msg)) { 3389 req->r_err = PTR_ERR(msg); 3390 return PTR_ERR(msg); 3391 } 3392 req->r_request = msg; 3393 3394 lhead = find_legacy_request_head(msg->front.iov_base, 3395 session->s_con.peer_features); 3396 lhead->oldest_client_tid = cpu_to_le64(__get_oldest_tid(mdsc)); 3397 if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) 3398 flags |= CEPH_MDS_FLAG_REPLAY; 3399 if (test_bit(CEPH_MDS_R_ASYNC, &req->r_req_flags)) 3400 flags |= CEPH_MDS_FLAG_ASYNC; 3401 if (req->r_parent) 3402 flags |= CEPH_MDS_FLAG_WANT_DENTRY; 3403 lhead->flags = cpu_to_le32(flags); 3404 lhead->num_fwd = req->r_num_fwd; 3405 lhead->num_retry = req->r_attempts - 1; 3406 if (!old_version) { 3407 nhead = (struct ceph_mds_request_head*)msg->front.iov_base; 3408 nhead->ext_num_fwd = cpu_to_le32(req->r_num_fwd); 3409 nhead->ext_num_retry = cpu_to_le32(req->r_attempts - 1); 3410 } 3411 3412 doutc(cl, " r_parent = %p\n", req->r_parent); 3413 return 0; 3414 } 3415 3416 /* 3417 * called under mdsc->mutex 3418 */ 3419 static int __send_request(struct ceph_mds_session *session, 3420 struct ceph_mds_request *req, 3421 bool drop_cap_releases) 3422 { 3423 int err; 3424 3425 trace_ceph_mdsc_send_request(session, req); 3426 3427 err = __prepare_send_request(session, req, drop_cap_releases); 3428 if (!err) { 3429 ceph_msg_get(req->r_request); 3430 ceph_con_send(&session->s_con, req->r_request); 3431 } 3432 3433 return err; 3434 } 3435 
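/*
 * Illustrative sketch only (compiled out, and the helper name is made
 * up): the typical way a caller drives the machinery above for a
 * simple synchronous op, modeled loosely on the getattr path.  Error
 * handling is reduced to the essentials.
 */
#if 0
static int example_sync_mds_op(struct ceph_mds_client *mdsc,
			       struct inode *inode)
{
	struct ceph_mds_request *req;
	int err;

	req = ceph_mdsc_create_request(mdsc, CEPH_MDS_OP_GETATTR,
				       USE_ANY_MDS);
	if (IS_ERR(req))
		return PTR_ERR(req);

	req->r_inode = inode;
	ihold(inode);		/* ref is dropped when the request is freed */
	req->r_num_caps = 1;

	/* registers the request, runs __do_request() and waits for it */
	err = ceph_mdsc_do_request(mdsc, NULL, req);
	ceph_mdsc_put_request(req);
	return err;
}
#endif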
3436 /* 3437 * send request, or put it on the appropriate wait list. 3438 */ 3439 static void __do_request(struct ceph_mds_client *mdsc, 3440 struct ceph_mds_request *req) 3441 { 3442 struct ceph_client *cl = mdsc->fsc->client; 3443 struct ceph_mds_session *session = NULL; 3444 int mds = -1; 3445 int err = 0; 3446 bool random; 3447 3448 if (req->r_err || test_bit(CEPH_MDS_R_GOT_RESULT, &req->r_req_flags)) { 3449 if (test_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags)) 3450 __unregister_request(mdsc, req); 3451 return; 3452 } 3453 3454 if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_FENCE_IO) { 3455 doutc(cl, "metadata corrupted\n"); 3456 err = -EIO; 3457 goto finish; 3458 } 3459 if (req->r_timeout && 3460 time_after_eq(jiffies, req->r_started + req->r_timeout)) { 3461 doutc(cl, "timed out\n"); 3462 err = -ETIMEDOUT; 3463 goto finish; 3464 } 3465 if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_SHUTDOWN) { 3466 doutc(cl, "forced umount\n"); 3467 err = -EIO; 3468 goto finish; 3469 } 3470 if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_MOUNTING) { 3471 if (mdsc->mdsmap_err) { 3472 err = mdsc->mdsmap_err; 3473 doutc(cl, "mdsmap err %d\n", err); 3474 goto finish; 3475 } 3476 if (mdsc->mdsmap->m_epoch == 0) { 3477 doutc(cl, "no mdsmap, waiting for map\n"); 3478 trace_ceph_mdsc_suspend_request(mdsc, session, req, 3479 ceph_mdsc_suspend_reason_no_mdsmap); 3480 list_add(&req->r_wait, &mdsc->waiting_for_map); 3481 return; 3482 } 3483 if (!(mdsc->fsc->mount_options->flags & 3484 CEPH_MOUNT_OPT_MOUNTWAIT) && 3485 !ceph_mdsmap_is_cluster_available(mdsc->mdsmap)) { 3486 err = -EHOSTUNREACH; 3487 goto finish; 3488 } 3489 } 3490 3491 put_request_session(req); 3492 3493 mds = __choose_mds(mdsc, req, &random); 3494 if (mds < 0 || 3495 ceph_mdsmap_get_state(mdsc->mdsmap, mds) < CEPH_MDS_STATE_ACTIVE) { 3496 if (test_bit(CEPH_MDS_R_ASYNC, &req->r_req_flags)) { 3497 err = -EJUKEBOX; 3498 goto finish; 3499 } 3500 doutc(cl, "no mds or not active, waiting for map\n"); 3501 trace_ceph_mdsc_suspend_request(mdsc, session, req, 3502 ceph_mdsc_suspend_reason_no_active_mds); 3503 list_add(&req->r_wait, &mdsc->waiting_for_map); 3504 return; 3505 } 3506 3507 /* get, open session */ 3508 session = __ceph_lookup_mds_session(mdsc, mds); 3509 if (!session) { 3510 session = register_session(mdsc, mds); 3511 if (IS_ERR(session)) { 3512 err = PTR_ERR(session); 3513 goto finish; 3514 } 3515 } 3516 req->r_session = ceph_get_mds_session(session); 3517 3518 doutc(cl, "mds%d session %p state %s\n", mds, session, 3519 ceph_session_state_name(session->s_state)); 3520 3521 /* 3522 * The old ceph will crash the MDSs when see unknown OPs 3523 */ 3524 if (req->r_feature_needed > 0 && 3525 !test_bit(req->r_feature_needed, &session->s_features)) { 3526 err = -EOPNOTSUPP; 3527 goto out_session; 3528 } 3529 3530 if (session->s_state != CEPH_MDS_SESSION_OPEN && 3531 session->s_state != CEPH_MDS_SESSION_HUNG) { 3532 /* 3533 * We cannot queue async requests since the caps and delegated 3534 * inodes are bound to the session. Just return -EJUKEBOX and 3535 * let the caller retry a sync request in that case. 3536 */ 3537 if (test_bit(CEPH_MDS_R_ASYNC, &req->r_req_flags)) { 3538 err = -EJUKEBOX; 3539 goto out_session; 3540 } 3541 3542 /* 3543 * If the session has been REJECTED, then return a hard error, 3544 * unless it's a CLEANRECOVER mount, in which case we'll queue 3545 * it to the mdsc queue. 
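		 * (CLEANRECOVER is what the recover_session=clean mount
		 * option selects.)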
		 */
		if (session->s_state == CEPH_MDS_SESSION_REJECTED) {
			if (ceph_test_mount_opt(mdsc->fsc, CLEANRECOVER)) {
				trace_ceph_mdsc_suspend_request(mdsc, session, req,
						ceph_mdsc_suspend_reason_rejected);
				list_add(&req->r_wait, &mdsc->waiting_for_map);
			} else
				err = -EACCES;
			goto out_session;
		}

		if (session->s_state == CEPH_MDS_SESSION_NEW ||
		    session->s_state == CEPH_MDS_SESSION_CLOSING) {
			err = __open_session(mdsc, session);
			if (err)
				goto out_session;
			/* retry the same mds later */
			if (random)
				req->r_resend_mds = mds;
		}
		trace_ceph_mdsc_suspend_request(mdsc, session, req,
						ceph_mdsc_suspend_reason_session);
		list_add(&req->r_wait, &session->s_waiting);
		goto out_session;
	}

	/* send request */
	req->r_resend_mds = -1;   /* forget any previous mds hint */

	if (req->r_request_started == 0)   /* note request start time */
		req->r_request_started = jiffies;

	/*
	 * For an async create we pick the auth MDS of the frag in the
	 * parent directory to send the request to, and usually that works
	 * fine.  But if the directory gets migrated to another MDS before
	 * that MDS could handle the request, the request is forwarded, and
	 * the auth cap changes along with it.
	 */
	if (test_bit(CEPH_MDS_R_ASYNC, &req->r_req_flags) && req->r_num_fwd) {
		struct ceph_dentry_info *di = ceph_dentry(req->r_dentry);
		struct ceph_inode_info *ci;
		struct ceph_cap *cap;

		/*
		 * The request may be handled very quickly, before the new
		 * inode has been linked to the dentry.  When forwarding,
		 * wait for ceph_finish_async_create() to finish; in theory
		 * it should neither get stuck for long nor fail.
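		 *
		 * (wait_on_bit() below sleeps in TASK_KILLABLE state, so it
		 * returns non-zero only when a fatal signal is pending, in
		 * which case the request is aborted.)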
3597 */ 3598 if (!d_inode(req->r_dentry)) { 3599 err = wait_on_bit(&di->flags, CEPH_DENTRY_ASYNC_CREATE_BIT, 3600 TASK_KILLABLE); 3601 if (err) { 3602 mutex_lock(&req->r_fill_mutex); 3603 set_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags); 3604 mutex_unlock(&req->r_fill_mutex); 3605 goto out_session; 3606 } 3607 } 3608 3609 ci = ceph_inode(d_inode(req->r_dentry)); 3610 3611 spin_lock(&ci->i_ceph_lock); 3612 cap = ci->i_auth_cap; 3613 if (ci->i_ceph_flags & CEPH_I_ASYNC_CREATE && mds != cap->mds) { 3614 doutc(cl, "session changed for auth cap %d -> %d\n", 3615 cap->session->s_mds, session->s_mds); 3616 3617 /* Remove the auth cap from old session */ 3618 spin_lock(&cap->session->s_cap_lock); 3619 cap->session->s_nr_caps--; 3620 list_del_init(&cap->session_caps); 3621 spin_unlock(&cap->session->s_cap_lock); 3622 3623 /* Add the auth cap to the new session */ 3624 cap->mds = mds; 3625 cap->session = session; 3626 spin_lock(&session->s_cap_lock); 3627 session->s_nr_caps++; 3628 list_add_tail(&cap->session_caps, &session->s_caps); 3629 spin_unlock(&session->s_cap_lock); 3630 3631 change_auth_cap_ses(ci, session); 3632 } 3633 spin_unlock(&ci->i_ceph_lock); 3634 } 3635 3636 err = __send_request(session, req, false); 3637 3638 out_session: 3639 ceph_put_mds_session(session); 3640 finish: 3641 if (err) { 3642 doutc(cl, "early error %d\n", err); 3643 req->r_err = err; 3644 complete_request(mdsc, req); 3645 __unregister_request(mdsc, req); 3646 } 3647 return; 3648 } 3649 3650 /* 3651 * called under mdsc->mutex 3652 */ 3653 static void __wake_requests(struct ceph_mds_client *mdsc, 3654 struct list_head *head) 3655 { 3656 struct ceph_client *cl = mdsc->fsc->client; 3657 struct ceph_mds_request *req; 3658 LIST_HEAD(tmp_list); 3659 3660 list_splice_init(head, &tmp_list); 3661 3662 while (!list_empty(&tmp_list)) { 3663 req = list_entry(tmp_list.next, 3664 struct ceph_mds_request, r_wait); 3665 list_del_init(&req->r_wait); 3666 doutc(cl, " wake request %p tid %llu\n", req, 3667 req->r_tid); 3668 trace_ceph_mdsc_resume_request(mdsc, req); 3669 __do_request(mdsc, req); 3670 } 3671 } 3672 3673 /* 3674 * Wake up threads with requests pending for @mds, so that they can 3675 * resubmit their requests to a possibly different mds. 3676 */ 3677 static void kick_requests(struct ceph_mds_client *mdsc, int mds) 3678 { 3679 struct ceph_client *cl = mdsc->fsc->client; 3680 struct ceph_mds_request *req; 3681 struct rb_node *p = rb_first(&mdsc->request_tree); 3682 3683 doutc(cl, "kick_requests mds%d\n", mds); 3684 while (p) { 3685 req = rb_entry(p, struct ceph_mds_request, r_node); 3686 p = rb_next(p); 3687 if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) 3688 continue; 3689 if (req->r_attempts > 0) 3690 continue; /* only new requests */ 3691 if (req->r_session && 3692 req->r_session->s_mds == mds) { 3693 doutc(cl, " kicking tid %llu\n", req->r_tid); 3694 list_del_init(&req->r_wait); 3695 trace_ceph_mdsc_resume_request(mdsc, req); 3696 __do_request(mdsc, req); 3697 } 3698 } 3699 } 3700 3701 int ceph_mdsc_submit_request(struct ceph_mds_client *mdsc, struct inode *dir, 3702 struct ceph_mds_request *req) 3703 { 3704 struct ceph_client *cl = mdsc->fsc->client; 3705 int err = 0; 3706 3707 /* take CAP_PIN refs for r_inode, r_parent, r_old_dentry */ 3708 if (req->r_inode) 3709 ceph_get_cap_refs(ceph_inode(req->r_inode), CEPH_CAP_PIN); 3710 if (req->r_parent) { 3711 struct ceph_inode_info *ci = ceph_inode(req->r_parent); 3712 int fmode = (req->r_op & CEPH_MDS_OP_WRITE) ? 
CEPH_FILE_MODE_WR : CEPH_FILE_MODE_RD;
		spin_lock(&ci->i_ceph_lock);
		ceph_take_cap_refs(ci, CEPH_CAP_PIN, false);
		__ceph_touch_fmode(ci, mdsc, fmode);
		spin_unlock(&ci->i_ceph_lock);
	}
	if (req->r_old_dentry_dir)
		ceph_get_cap_refs(ceph_inode(req->r_old_dentry_dir),
				  CEPH_CAP_PIN);

	if (req->r_inode) {
		err = ceph_wait_on_async_create(req->r_inode);
		if (err) {
			doutc(cl, "wait for async create returned: %d\n", err);
			return err;
		}
	}

	if (!err && req->r_old_inode) {
		err = ceph_wait_on_async_create(req->r_old_inode);
		if (err) {
			doutc(cl, "wait for async create returned: %d\n", err);
			return err;
		}
	}

	doutc(cl, "submit_request on %p for inode %p\n", req, dir);
	mutex_lock(&mdsc->mutex);
	__register_request(mdsc, req, dir);
	trace_ceph_mdsc_submit_request(mdsc, req);
	__do_request(mdsc, req);
	err = req->r_err;
	mutex_unlock(&mdsc->mutex);
	return err;
}

int ceph_mdsc_wait_request(struct ceph_mds_client *mdsc,
			   struct ceph_mds_request *req,
			   ceph_mds_request_wait_callback_t wait_func)
{
	struct ceph_client *cl = mdsc->fsc->client;
	int err;

	/* wait */
	doutc(cl, "do_request waiting\n");
	if (wait_func) {
		err = wait_func(mdsc, req);
	} else {
		long timeleft = wait_for_completion_killable_timeout(
					&req->r_completion,
					ceph_timeout_jiffies(req->r_timeout));
		if (timeleft > 0)
			err = 0;
		else if (!timeleft)
			err = -ETIMEDOUT;  /* timed out */
		else
			err = timeleft;  /* killed */
	}
	doutc(cl, "do_request waited, got %d\n", err);
	mutex_lock(&mdsc->mutex);

	/* only abort if we didn't race with a real reply */
	if (test_bit(CEPH_MDS_R_GOT_RESULT, &req->r_req_flags)) {
		err = le32_to_cpu(req->r_reply_info.head->result);
	} else if (err < 0) {
		doutc(cl, "aborted request %lld with %d\n", req->r_tid, err);

		/*
		 * ensure we aren't running concurrently with
		 * ceph_fill_trace or ceph_readdir_prepopulate, which
		 * rely on locks (dir mutex) held by our caller.
		 */
		mutex_lock(&req->r_fill_mutex);
		req->r_err = err;
		set_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags);
		mutex_unlock(&req->r_fill_mutex);

		if (req->r_parent &&
		    (req->r_op & CEPH_MDS_OP_WRITE))
			ceph_invalidate_dir_request(req);
	} else {
		err = req->r_err;
	}

	mutex_unlock(&mdsc->mutex);
	return err;
}

/*
 * Synchronously perform an MDS request, taking care of all of the
 * session setup, forwarding and retry details.
 */
int ceph_mdsc_do_request(struct ceph_mds_client *mdsc,
			 struct inode *dir,
			 struct ceph_mds_request *req)
{
	struct ceph_client *cl = mdsc->fsc->client;
	int err;

	doutc(cl, "do_request on %p\n", req);

	/* issue */
	err = ceph_mdsc_submit_request(mdsc, dir, req);
	if (!err)
		err = ceph_mdsc_wait_request(mdsc, req, NULL);
	doutc(cl, "do_request %p done, result %d\n", req, err);
	return err;
}

/*
 * Invalidate a dir's completeness and dentry lease state on an aborted
 * MDS namespace request.
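 *
 * Once such an op (e.g. an unlink or rename) has been aborted, we no
 * longer know whether the MDS actually applied it, so the cached dir
 * completeness and the related dentry leases cannot be trusted and
 * must be dropped.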
3825 */ 3826 void ceph_invalidate_dir_request(struct ceph_mds_request *req) 3827 { 3828 struct inode *dir = req->r_parent; 3829 struct inode *old_dir = req->r_old_dentry_dir; 3830 struct ceph_client *cl = req->r_mdsc->fsc->client; 3831 3832 doutc(cl, "invalidate_dir_request %p %p (complete, lease(s))\n", 3833 dir, old_dir); 3834 3835 ceph_dir_clear_complete(dir); 3836 if (old_dir) 3837 ceph_dir_clear_complete(old_dir); 3838 if (req->r_dentry) 3839 ceph_invalidate_dentry_lease(req->r_dentry); 3840 if (req->r_old_dentry) 3841 ceph_invalidate_dentry_lease(req->r_old_dentry); 3842 } 3843 3844 /* 3845 * Handle mds reply. 3846 * 3847 * We take the session mutex and parse and process the reply immediately. 3848 * This preserves the logical ordering of replies, capabilities, etc., sent 3849 * by the MDS as they are applied to our local cache. 3850 */ 3851 static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg) 3852 { 3853 struct ceph_mds_client *mdsc = session->s_mdsc; 3854 struct ceph_client *cl = mdsc->fsc->client; 3855 struct ceph_mds_request *req; 3856 struct ceph_mds_reply_head *head = msg->front.iov_base; 3857 struct ceph_mds_reply_info_parsed *rinfo; /* parsed reply info */ 3858 struct ceph_snap_realm *realm; 3859 u64 tid; 3860 int err, result; 3861 int mds = session->s_mds; 3862 bool close_sessions = false; 3863 3864 if (msg->front.iov_len < sizeof(*head)) { 3865 pr_err_client(cl, "got corrupt (short) reply\n"); 3866 ceph_msg_dump(msg); 3867 return; 3868 } 3869 3870 /* get request, session */ 3871 tid = le64_to_cpu(msg->hdr.tid); 3872 mutex_lock(&mdsc->mutex); 3873 req = lookup_get_request(mdsc, tid); 3874 if (!req) { 3875 doutc(cl, "on unknown tid %llu\n", tid); 3876 mutex_unlock(&mdsc->mutex); 3877 return; 3878 } 3879 doutc(cl, "handle_reply %p\n", req); 3880 3881 /* correct session? */ 3882 if (req->r_session != session) { 3883 pr_err_client(cl, "got %llu on session mds%d not mds%d\n", 3884 tid, session->s_mds, 3885 req->r_session ? req->r_session->s_mds : -1); 3886 mutex_unlock(&mdsc->mutex); 3887 goto out; 3888 } 3889 3890 /* dup? */ 3891 if ((test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags) && !head->safe) || 3892 (test_bit(CEPH_MDS_R_GOT_SAFE, &req->r_req_flags) && head->safe)) { 3893 pr_warn_client(cl, "got a dup %s reply on %llu from mds%d\n", 3894 head->safe ? "safe" : "unsafe", tid, mds); 3895 mutex_unlock(&mdsc->mutex); 3896 goto out; 3897 } 3898 if (test_bit(CEPH_MDS_R_GOT_SAFE, &req->r_req_flags)) { 3899 pr_warn_client(cl, "got unsafe after safe on %llu from mds%d\n", 3900 tid, mds); 3901 mutex_unlock(&mdsc->mutex); 3902 goto out; 3903 } 3904 3905 result = le32_to_cpu(head->result); 3906 3907 if (head->safe) { 3908 set_bit(CEPH_MDS_R_GOT_SAFE, &req->r_req_flags); 3909 __unregister_request(mdsc, req); 3910 3911 /* last request during umount? */ 3912 if (mdsc->stopping && !__get_oldest_req(mdsc)) 3913 complete_all(&mdsc->safe_umount_waiters); 3914 3915 if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) { 3916 /* 3917 * We already handled the unsafe response, now do the 3918 * cleanup. No need to examine the response; the MDS 3919 * doesn't include any result info in the safe 3920 * response. And even if it did, there is nothing 3921 * useful we could do with a revised return value. 
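			 *
			 * (An unsafe reply means the MDS has applied the
			 * change in memory; the matching safe reply arrives
			 * once the change has been committed to the MDS
			 * journal.)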
3922 */ 3923 doutc(cl, "got safe reply %llu, mds%d\n", tid, mds); 3924 3925 mutex_unlock(&mdsc->mutex); 3926 goto out; 3927 } 3928 } else { 3929 set_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags); 3930 list_add_tail(&req->r_unsafe_item, &req->r_session->s_unsafe); 3931 } 3932 3933 doutc(cl, "tid %lld result %d\n", tid, result); 3934 if (test_bit(CEPHFS_FEATURE_REPLY_ENCODING, &session->s_features)) 3935 err = parse_reply_info(session, msg, req, (u64)-1); 3936 else 3937 err = parse_reply_info(session, msg, req, 3938 session->s_con.peer_features); 3939 mutex_unlock(&mdsc->mutex); 3940 3941 /* Must find target inode outside of mutexes to avoid deadlocks */ 3942 rinfo = &req->r_reply_info; 3943 if ((err >= 0) && rinfo->head->is_target) { 3944 struct inode *in = xchg(&req->r_new_inode, NULL); 3945 struct ceph_vino tvino = { 3946 .ino = le64_to_cpu(rinfo->targeti.in->ino), 3947 .snap = le64_to_cpu(rinfo->targeti.in->snapid) 3948 }; 3949 3950 /* 3951 * If we ended up opening an existing inode, discard 3952 * r_new_inode 3953 */ 3954 if (req->r_op == CEPH_MDS_OP_CREATE && 3955 !req->r_reply_info.has_create_ino) { 3956 /* This should never happen on an async create */ 3957 WARN_ON_ONCE(req->r_deleg_ino); 3958 iput(in); 3959 in = NULL; 3960 } 3961 3962 in = ceph_get_inode(mdsc->fsc->sb, tvino, in); 3963 if (IS_ERR(in)) { 3964 err = PTR_ERR(in); 3965 mutex_lock(&session->s_mutex); 3966 goto out_err; 3967 } 3968 req->r_target_inode = in; 3969 } 3970 3971 mutex_lock(&session->s_mutex); 3972 if (err < 0) { 3973 pr_err_client(cl, "got corrupt reply mds%d(tid:%lld)\n", 3974 mds, tid); 3975 ceph_msg_dump(msg); 3976 goto out_err; 3977 } 3978 3979 /* snap trace */ 3980 realm = NULL; 3981 if (rinfo->snapblob_len) { 3982 down_write(&mdsc->snap_rwsem); 3983 err = ceph_update_snap_trace(mdsc, rinfo->snapblob, 3984 rinfo->snapblob + rinfo->snapblob_len, 3985 le32_to_cpu(head->op) == CEPH_MDS_OP_RMSNAP, 3986 &realm); 3987 if (err) { 3988 up_write(&mdsc->snap_rwsem); 3989 close_sessions = true; 3990 if (err == -EIO) 3991 ceph_msg_dump(msg); 3992 goto out_err; 3993 } 3994 downgrade_write(&mdsc->snap_rwsem); 3995 } else { 3996 down_read(&mdsc->snap_rwsem); 3997 } 3998 3999 /* insert trace into our cache */ 4000 mutex_lock(&req->r_fill_mutex); 4001 current->journal_info = req; 4002 err = ceph_fill_trace(mdsc->fsc->sb, req); 4003 if (err == 0) { 4004 if (result == 0 && (req->r_op == CEPH_MDS_OP_READDIR || 4005 req->r_op == CEPH_MDS_OP_LSSNAP)) 4006 err = ceph_readdir_prepopulate(req, req->r_session); 4007 } 4008 current->journal_info = NULL; 4009 mutex_unlock(&req->r_fill_mutex); 4010 4011 up_read(&mdsc->snap_rwsem); 4012 if (realm) 4013 ceph_put_snap_realm(mdsc, realm); 4014 4015 if (err == 0) { 4016 if (req->r_target_inode && 4017 test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) { 4018 struct ceph_inode_info *ci = 4019 ceph_inode(req->r_target_inode); 4020 spin_lock(&ci->i_unsafe_lock); 4021 list_add_tail(&req->r_unsafe_target_item, 4022 &ci->i_unsafe_iops); 4023 spin_unlock(&ci->i_unsafe_lock); 4024 } 4025 4026 ceph_unreserve_caps(mdsc, &req->r_caps_reservation); 4027 } 4028 out_err: 4029 mutex_lock(&mdsc->mutex); 4030 if (!test_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags)) { 4031 if (err) { 4032 req->r_err = err; 4033 } else { 4034 req->r_reply = ceph_msg_get(msg); 4035 set_bit(CEPH_MDS_R_GOT_RESULT, &req->r_req_flags); 4036 } 4037 } else { 4038 doutc(cl, "reply arrived after request %lld was aborted\n", tid); 4039 } 4040 mutex_unlock(&mdsc->mutex); 4041 4042 mutex_unlock(&session->s_mutex); 4043 4044 /* kick 
calling process */
	complete_request(mdsc, req);

	ceph_update_metadata_metrics(&mdsc->metric, req->r_start_latency,
				     req->r_end_latency, err);
out:
	ceph_mdsc_put_request(req);

	/* Defer closing the sessions until after s_mutex has been released */
	if (close_sessions)
		ceph_mdsc_close_sessions(mdsc);
	return;
}



/*
 * handle mds notification that our request has been forwarded.
 */
static void handle_forward(struct ceph_mds_client *mdsc,
			   struct ceph_mds_session *session,
			   struct ceph_msg *msg)
{
	struct ceph_client *cl = mdsc->fsc->client;
	struct ceph_mds_request *req;
	u64 tid = le64_to_cpu(msg->hdr.tid);
	u32 next_mds;
	u32 fwd_seq;
	int err = -EINVAL;
	void *p = msg->front.iov_base;
	void *end = p + msg->front.iov_len;
	bool aborted = false;

	ceph_decode_need(&p, end, 2*sizeof(u32), bad);
	next_mds = ceph_decode_32(&p);
	fwd_seq = ceph_decode_32(&p);

	mutex_lock(&mdsc->mutex);
	req = lookup_get_request(mdsc, tid);
	if (!req) {
		mutex_unlock(&mdsc->mutex);
		doutc(cl, "forward tid %llu to mds%d - req dne\n", tid, next_mds);
		return;	 /* dup reply? */
	}

	if (test_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags)) {
		doutc(cl, "forward tid %llu aborted, unregistering\n", tid);
		__unregister_request(mdsc, req);
	} else if (fwd_seq <= req->r_num_fwd || (uint32_t)fwd_seq >= U32_MAX) {
		/*
		 * Avoid retrying forever after a forward-counter
		 * overflow.
		 *
		 * The MDS increments the forward count on every hop.
		 * If the received fwd_seq is smaller than the one saved
		 * in the request, the MDS is an old version whose 8-bit
		 * counter has overflowed.
		 */
		mutex_lock(&req->r_fill_mutex);
		req->r_err = -EMULTIHOP;
		set_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags);
		mutex_unlock(&req->r_fill_mutex);
		aborted = true;
		pr_warn_ratelimited_client(cl, "forward tid %llu seq overflow\n",
					   tid);
	} else {
		/* resend. forward race not possible; mds would drop */
		doutc(cl, "forward tid %llu to mds%d (we resend)\n", tid, next_mds);
		BUG_ON(req->r_err);
		BUG_ON(test_bit(CEPH_MDS_R_GOT_RESULT, &req->r_req_flags));
		req->r_attempts = 0;
		req->r_num_fwd = fwd_seq;
		req->r_resend_mds = next_mds;
		put_request_session(req);
		__do_request(mdsc, req);
	}
	mutex_unlock(&mdsc->mutex);

	/* kick calling process */
	if (aborted)
		complete_request(mdsc, req);
	ceph_mdsc_put_request(req);
	return;

bad:
	pr_err_client(cl, "decode error err=%d\n", err);
	ceph_msg_dump(msg);
}

static int __decode_session_metadata(void **p, void *end,
				     bool *blocklisted)
{
	/* map<string,string> */
	u32 n;
	bool err_str;

	ceph_decode_32_safe(p, end, n, bad);
	while (n-- > 0) {
		u32 len;

		ceph_decode_32_safe(p, end, len, bad);
		ceph_decode_need(p, end, len, bad);
		err_str = !strncmp(*p, "error_string", len);
		*p += len;
		ceph_decode_32_safe(p, end, len, bad);
		ceph_decode_need(p, end, len, bad);
		/*
		 * Match "blocklisted (blacklisted)" from newer MDSes,
		 * or "blacklisted" from older MDSes.
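 * Note that the value is a length-prefixed blob inside the message
 * buffer and is not NUL-terminated, hence the strnstr() bounded by
 * len below.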
4150 */ 4151 if (err_str && strnstr(*p, "blacklisted", len)) 4152 *blocklisted = true; 4153 *p += len; 4154 } 4155 return 0; 4156 bad: 4157 return -1; 4158 } 4159 4160 /* 4161 * handle a mds session control message 4162 */ 4163 static void handle_session(struct ceph_mds_session *session, 4164 struct ceph_msg *msg) 4165 { 4166 struct ceph_mds_client *mdsc = session->s_mdsc; 4167 struct ceph_client *cl = mdsc->fsc->client; 4168 int mds = session->s_mds; 4169 int msg_version = le16_to_cpu(msg->hdr.version); 4170 void *p = msg->front.iov_base; 4171 void *end = p + msg->front.iov_len; 4172 struct ceph_mds_session_head *h; 4173 struct ceph_mds_cap_auth *cap_auths = NULL; 4174 u32 op, cap_auths_num = 0; 4175 u64 seq, features = 0; 4176 int wake = 0; 4177 bool blocklisted = false; 4178 u32 i; 4179 4180 4181 /* decode */ 4182 ceph_decode_need(&p, end, sizeof(*h), bad); 4183 h = p; 4184 p += sizeof(*h); 4185 4186 op = le32_to_cpu(h->op); 4187 seq = le64_to_cpu(h->seq); 4188 4189 if (msg_version >= 3) { 4190 u32 len; 4191 /* version >= 2 and < 5, decode metadata, skip otherwise 4192 * as it's handled via flags. 4193 */ 4194 if (msg_version >= 5) 4195 ceph_decode_skip_map(&p, end, string, string, bad); 4196 else if (__decode_session_metadata(&p, end, &blocklisted) < 0) 4197 goto bad; 4198 4199 /* version >= 3, feature bits */ 4200 ceph_decode_32_safe(&p, end, len, bad); 4201 if (len) { 4202 ceph_decode_64_safe(&p, end, features, bad); 4203 p += len - sizeof(features); 4204 } 4205 } 4206 4207 if (msg_version >= 5) { 4208 u32 flags, len; 4209 4210 /* version >= 4 */ 4211 ceph_decode_skip_16(&p, end, bad); /* struct_v, struct_cv */ 4212 ceph_decode_32_safe(&p, end, len, bad); /* len */ 4213 ceph_decode_skip_n(&p, end, len, bad); /* metric_spec */ 4214 4215 /* version >= 5, flags */ 4216 ceph_decode_32_safe(&p, end, flags, bad); 4217 if (flags & CEPH_SESSION_BLOCKLISTED) { 4218 pr_warn_client(cl, "mds%d session blocklisted\n", 4219 session->s_mds); 4220 blocklisted = true; 4221 } 4222 } 4223 4224 if (msg_version >= 6) { 4225 ceph_decode_32_safe(&p, end, cap_auths_num, bad); 4226 doutc(cl, "cap_auths_num %d\n", cap_auths_num); 4227 4228 if (cap_auths_num && op != CEPH_SESSION_OPEN) { 4229 WARN_ON_ONCE(op != CEPH_SESSION_OPEN); 4230 goto skip_cap_auths; 4231 } 4232 4233 cap_auths = kcalloc(cap_auths_num, 4234 sizeof(struct ceph_mds_cap_auth), 4235 GFP_KERNEL); 4236 if (!cap_auths) { 4237 pr_err_client(cl, "No memory for cap_auths\n"); 4238 return; 4239 } 4240 4241 for (i = 0; i < cap_auths_num; i++) { 4242 u32 _len, j; 4243 4244 /* struct_v, struct_compat, and struct_len in MDSCapAuth */ 4245 ceph_decode_skip_n(&p, end, 2 + sizeof(u32), bad); 4246 4247 /* struct_v, struct_compat, and struct_len in MDSCapMatch */ 4248 ceph_decode_skip_n(&p, end, 2 + sizeof(u32), bad); 4249 ceph_decode_64_safe(&p, end, cap_auths[i].match.uid, bad); 4250 ceph_decode_32_safe(&p, end, _len, bad); 4251 if (_len) { 4252 cap_auths[i].match.gids = kcalloc(_len, sizeof(u32), 4253 GFP_KERNEL); 4254 if (!cap_auths[i].match.gids) { 4255 pr_err_client(cl, "No memory for gids\n"); 4256 goto fail; 4257 } 4258 4259 cap_auths[i].match.num_gids = _len; 4260 for (j = 0; j < _len; j++) 4261 ceph_decode_32_safe(&p, end, 4262 cap_auths[i].match.gids[j], 4263 bad); 4264 } 4265 4266 ceph_decode_32_safe(&p, end, _len, bad); 4267 if (_len) { 4268 cap_auths[i].match.path = kcalloc(_len + 1, sizeof(char), 4269 GFP_KERNEL); 4270 if (!cap_auths[i].match.path) { 4271 pr_err_client(cl, "No memory for path\n"); 4272 goto fail; 4273 } 4274 
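				/*
				 * The kcalloc(_len + 1, ...) above leaves a
				 * trailing NUL, so the path decoded below is a
				 * properly terminated C string.
				 */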
				ceph_decode_copy(&p, cap_auths[i].match.path, _len);

				/* Remove the trailing '/' */
				while (_len && cap_auths[i].match.path[_len - 1] == '/') {
					cap_auths[i].match.path[_len - 1] = '\0';
					_len -= 1;
				}
			}

			ceph_decode_32_safe(&p, end, _len, bad);
			if (_len) {
				cap_auths[i].match.fs_name = kcalloc(_len + 1, sizeof(char),
								     GFP_KERNEL);
				if (!cap_auths[i].match.fs_name) {
					pr_err_client(cl, "No memory for fs_name\n");
					goto fail;
				}
				ceph_decode_copy(&p, cap_auths[i].match.fs_name, _len);
			}

			ceph_decode_8_safe(&p, end, cap_auths[i].match.root_squash, bad);
			ceph_decode_8_safe(&p, end, cap_auths[i].readable, bad);
			ceph_decode_8_safe(&p, end, cap_auths[i].writeable, bad);
			doutc(cl, "uid %lld, num_gids %u, path %s, fs_name %s, root_squash %d, readable %d, writeable %d\n",
			      cap_auths[i].match.uid, cap_auths[i].match.num_gids,
			      cap_auths[i].match.path, cap_auths[i].match.fs_name,
			      cap_auths[i].match.root_squash,
			      cap_auths[i].readable, cap_auths[i].writeable);
		}
	}

skip_cap_auths:
	mutex_lock(&mdsc->mutex);
	if (op == CEPH_SESSION_OPEN) {
		if (mdsc->s_cap_auths) {
			for (i = 0; i < mdsc->s_cap_auths_num; i++) {
				kfree(mdsc->s_cap_auths[i].match.gids);
				kfree(mdsc->s_cap_auths[i].match.path);
				kfree(mdsc->s_cap_auths[i].match.fs_name);
			}
			kfree(mdsc->s_cap_auths);
		}
		mdsc->s_cap_auths_num = cap_auths_num;
		mdsc->s_cap_auths = cap_auths;
	}
	if (op == CEPH_SESSION_CLOSE) {
		ceph_get_mds_session(session);
		__unregister_session(mdsc, session);
	}
	/* FIXME: this ttl calculation is generous */
	session->s_ttl = jiffies + HZ*mdsc->mdsmap->m_session_autoclose;
	mutex_unlock(&mdsc->mutex);

	mutex_lock(&session->s_mutex);

	doutc(cl, "mds%d %s %p state %s seq %llu\n", mds,
	      ceph_session_op_name(op), session,
	      ceph_session_state_name(session->s_state), seq);

	if (session->s_state == CEPH_MDS_SESSION_HUNG) {
		session->s_state = CEPH_MDS_SESSION_OPEN;
		pr_info_client(cl, "mds%d came back\n", session->s_mds);
	}

	switch (op) {
	case CEPH_SESSION_OPEN:
		if (session->s_state == CEPH_MDS_SESSION_RECONNECTING)
			pr_info_client(cl, "mds%d reconnect success\n",
				       session->s_mds);

		session->s_features = features;
		if (session->s_state == CEPH_MDS_SESSION_OPEN) {
			pr_notice_client(cl, "mds%d is already opened\n",
					 session->s_mds);
		} else {
			session->s_state = CEPH_MDS_SESSION_OPEN;
			renewed_caps(mdsc, session, 0);
			if (test_bit(CEPHFS_FEATURE_METRIC_COLLECT,
				     &session->s_features))
				metric_schedule_delayed(&mdsc->metric);
		}

		/*
		 * The connection may be broken and the session on the
		 * client side may have been reinitialized, so update
		 * the seq in any case.
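		 * (send_mds_reconnect() resets s_seq to 0, so adopt the
		 * sequence number carried by the OPEN message here.)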
4360 */ 4361 if (!session->s_seq && seq) 4362 session->s_seq = seq; 4363 4364 wake = 1; 4365 if (mdsc->stopping) 4366 __close_session(mdsc, session); 4367 break; 4368 4369 case CEPH_SESSION_RENEWCAPS: 4370 if (session->s_renew_seq == seq) 4371 renewed_caps(mdsc, session, 1); 4372 break; 4373 4374 case CEPH_SESSION_CLOSE: 4375 if (session->s_state == CEPH_MDS_SESSION_RECONNECTING) 4376 pr_info_client(cl, "mds%d reconnect denied\n", 4377 session->s_mds); 4378 session->s_state = CEPH_MDS_SESSION_CLOSED; 4379 cleanup_session_requests(mdsc, session); 4380 remove_session_caps(session); 4381 wake = 2; /* for good measure */ 4382 wake_up_all(&mdsc->session_close_wq); 4383 break; 4384 4385 case CEPH_SESSION_STALE: 4386 pr_info_client(cl, "mds%d caps went stale, renewing\n", 4387 session->s_mds); 4388 atomic_inc(&session->s_cap_gen); 4389 session->s_cap_ttl = jiffies - 1; 4390 send_renew_caps(mdsc, session); 4391 break; 4392 4393 case CEPH_SESSION_RECALL_STATE: 4394 ceph_trim_caps(mdsc, session, le32_to_cpu(h->max_caps)); 4395 break; 4396 4397 case CEPH_SESSION_FLUSHMSG: 4398 /* flush cap releases */ 4399 spin_lock(&session->s_cap_lock); 4400 if (session->s_num_cap_releases) 4401 ceph_flush_session_cap_releases(mdsc, session); 4402 spin_unlock(&session->s_cap_lock); 4403 4404 send_flushmsg_ack(mdsc, session, seq); 4405 break; 4406 4407 case CEPH_SESSION_FORCE_RO: 4408 doutc(cl, "force_session_readonly %p\n", session); 4409 spin_lock(&session->s_cap_lock); 4410 session->s_readonly = true; 4411 spin_unlock(&session->s_cap_lock); 4412 wake_up_session_caps(session, FORCE_RO); 4413 break; 4414 4415 case CEPH_SESSION_REJECT: 4416 WARN_ON(session->s_state != CEPH_MDS_SESSION_OPENING); 4417 pr_info_client(cl, "mds%d rejected session\n", 4418 session->s_mds); 4419 session->s_state = CEPH_MDS_SESSION_REJECTED; 4420 cleanup_session_requests(mdsc, session); 4421 remove_session_caps(session); 4422 if (blocklisted) 4423 mdsc->fsc->blocklisted = true; 4424 wake = 2; /* for good measure */ 4425 break; 4426 4427 default: 4428 pr_err_client(cl, "bad op %d mds%d\n", op, mds); 4429 WARN_ON(1); 4430 } 4431 4432 mutex_unlock(&session->s_mutex); 4433 if (wake) { 4434 mutex_lock(&mdsc->mutex); 4435 __wake_requests(mdsc, &session->s_waiting); 4436 if (wake == 2) 4437 kick_requests(mdsc, mds); 4438 mutex_unlock(&mdsc->mutex); 4439 } 4440 if (op == CEPH_SESSION_CLOSE) 4441 ceph_put_mds_session(session); 4442 return; 4443 4444 bad: 4445 pr_err_client(cl, "corrupt message mds%d len %d\n", mds, 4446 (int)msg->front.iov_len); 4447 ceph_msg_dump(msg); 4448 fail: 4449 for (i = 0; i < cap_auths_num; i++) { 4450 kfree(cap_auths[i].match.gids); 4451 kfree(cap_auths[i].match.path); 4452 kfree(cap_auths[i].match.fs_name); 4453 } 4454 kfree(cap_auths); 4455 return; 4456 } 4457 4458 void ceph_mdsc_release_dir_caps(struct ceph_mds_request *req) 4459 { 4460 struct ceph_client *cl = req->r_mdsc->fsc->client; 4461 int dcaps; 4462 4463 dcaps = xchg(&req->r_dir_caps, 0); 4464 if (dcaps) { 4465 doutc(cl, "releasing r_dir_caps=%s\n", ceph_cap_string(dcaps)); 4466 ceph_put_cap_refs(ceph_inode(req->r_parent), dcaps); 4467 } 4468 } 4469 4470 void ceph_mdsc_release_dir_caps_async(struct ceph_mds_request *req) 4471 { 4472 struct ceph_client *cl = req->r_mdsc->fsc->client; 4473 int dcaps; 4474 4475 dcaps = xchg(&req->r_dir_caps, 0); 4476 if (dcaps) { 4477 doutc(cl, "releasing r_dir_caps=%s\n", ceph_cap_string(dcaps)); 4478 ceph_put_cap_refs_async(ceph_inode(req->r_parent), dcaps); 4479 } 4480 } 4481 4482 /* 4483 * called under session->mutex. 
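 * Takes and drops mdsc->mutex internally while walking the unsafe
 * list and the request tree; the caller's s_mutex keeps other
 * session traffic out in the meantime.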
 */
static void replay_unsafe_requests(struct ceph_mds_client *mdsc,
				   struct ceph_mds_session *session)
{
	struct ceph_mds_request *req, *nreq;
	struct rb_node *p;

	doutc(mdsc->fsc->client, "mds%d\n", session->s_mds);

	mutex_lock(&mdsc->mutex);
	list_for_each_entry_safe(req, nreq, &session->s_unsafe, r_unsafe_item)
		__send_request(session, req, true);

	/*
	 * Also re-send old requests when the MDS enters the reconnect
	 * stage, so that the MDS can process completed requests during
	 * its clientreplay stage.
	 */
	p = rb_first(&mdsc->request_tree);
	while (p) {
		req = rb_entry(p, struct ceph_mds_request, r_node);
		p = rb_next(p);
		if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags))
			continue;
		if (req->r_attempts == 0)
			continue; /* only old requests */
		if (!req->r_session)
			continue;
		if (req->r_session->s_mds != session->s_mds)
			continue;

		ceph_mdsc_release_dir_caps_async(req);

		__send_request(session, req, true);
	}
	mutex_unlock(&mdsc->mutex);
}

static int send_reconnect_partial(struct ceph_reconnect_state *recon_state)
{
	struct ceph_msg *reply;
	struct ceph_pagelist *_pagelist;
	struct page *page;
	__le32 *addr;
	int err = -ENOMEM;

	if (!recon_state->allow_multi)
		return -ENOSPC;

	/* can't handle message that contains both caps and realm */
	BUG_ON(!recon_state->nr_caps == !recon_state->nr_realms);

	/* pre-allocate new pagelist */
	_pagelist = ceph_pagelist_alloc(GFP_NOFS);
	if (!_pagelist)
		return -ENOMEM;

	reply = ceph_msg_new2(CEPH_MSG_CLIENT_RECONNECT, 0, 1, GFP_NOFS, false);
	if (!reply)
		goto fail_msg;

	/* placeholder for nr_caps */
	err = ceph_pagelist_encode_32(_pagelist, 0);
	if (err < 0)
		goto fail;

	if (recon_state->nr_caps) {
		/* currently encoding caps */
		err = ceph_pagelist_encode_32(recon_state->pagelist, 0);
		if (err)
			goto fail;
	} else {
		/* placeholder for nr_realms (currently encoding realms) */
		err = ceph_pagelist_encode_32(_pagelist, 0);
		if (err < 0)
			goto fail;
	}

	err = ceph_pagelist_encode_8(recon_state->pagelist, 1);
	if (err)
		goto fail;

	page = list_first_entry(&recon_state->pagelist->head, struct page, lru);
	addr = kmap_atomic(page);
	if (recon_state->nr_caps) {
		/* currently encoding caps */
		*addr = cpu_to_le32(recon_state->nr_caps);
	} else {
		/* currently encoding realms */
		*(addr + 1) = cpu_to_le32(recon_state->nr_realms);
	}
	kunmap_atomic(addr);

	reply->hdr.version = cpu_to_le16(5);
	reply->hdr.compat_version = cpu_to_le16(4);

	reply->hdr.data_len = cpu_to_le32(recon_state->pagelist->length);
	ceph_msg_data_add_pagelist(reply, recon_state->pagelist);

	ceph_con_send(&recon_state->session->s_con, reply);
	ceph_pagelist_release(recon_state->pagelist);

	recon_state->pagelist = _pagelist;
	recon_state->nr_caps = 0;
	recon_state->nr_realms = 0;
	recon_state->msg_version = 5;
	return 0;
fail:
	ceph_msg_put(reply);
fail_msg:
	ceph_pagelist_release(_pagelist);
	return err;
}

static struct dentry* d_find_primary(struct inode *inode)
{
	struct dentry *alias, *dn = NULL;

	if (hlist_empty(&inode->i_dentry))
		return NULL;

	spin_lock(&inode->i_lock);
	if
(hlist_empty(&inode->i_dentry)) 4606 goto out_unlock; 4607 4608 if (S_ISDIR(inode->i_mode)) { 4609 alias = hlist_entry(inode->i_dentry.first, struct dentry, d_u.d_alias); 4610 if (!IS_ROOT(alias)) 4611 dn = dget(alias); 4612 goto out_unlock; 4613 } 4614 4615 hlist_for_each_entry(alias, &inode->i_dentry, d_u.d_alias) { 4616 spin_lock(&alias->d_lock); 4617 if (!d_unhashed(alias) && 4618 (ceph_dentry(alias)->flags & CEPH_DENTRY_PRIMARY_LINK)) { 4619 dn = dget_dlock(alias); 4620 } 4621 spin_unlock(&alias->d_lock); 4622 if (dn) 4623 break; 4624 } 4625 out_unlock: 4626 spin_unlock(&inode->i_lock); 4627 return dn; 4628 } 4629 4630 /* 4631 * Encode information about a cap for a reconnect with the MDS. 4632 */ 4633 static int reconnect_caps_cb(struct inode *inode, int mds, void *arg) 4634 { 4635 struct ceph_mds_client *mdsc = ceph_sb_to_mdsc(inode->i_sb); 4636 struct ceph_client *cl = ceph_inode_to_client(inode); 4637 union { 4638 struct ceph_mds_cap_reconnect v2; 4639 struct ceph_mds_cap_reconnect_v1 v1; 4640 } rec; 4641 struct ceph_inode_info *ci = ceph_inode(inode); 4642 struct ceph_reconnect_state *recon_state = arg; 4643 struct ceph_pagelist *pagelist = recon_state->pagelist; 4644 struct dentry *dentry; 4645 struct ceph_cap *cap; 4646 struct ceph_path_info path_info = {0}; 4647 int err; 4648 u64 snap_follows; 4649 4650 dentry = d_find_primary(inode); 4651 if (dentry) { 4652 /* set pathbase to parent dir when msg_version >= 2 */ 4653 char *path = ceph_mdsc_build_path(mdsc, dentry, &path_info, 4654 recon_state->msg_version >= 2); 4655 dput(dentry); 4656 if (IS_ERR(path)) { 4657 err = PTR_ERR(path); 4658 goto out_err; 4659 } 4660 } 4661 4662 spin_lock(&ci->i_ceph_lock); 4663 cap = __get_cap_for_mds(ci, mds); 4664 if (!cap) { 4665 spin_unlock(&ci->i_ceph_lock); 4666 err = 0; 4667 goto out_err; 4668 } 4669 doutc(cl, " adding %p ino %llx.%llx cap %p %lld %s\n", inode, 4670 ceph_vinop(inode), cap, cap->cap_id, 4671 ceph_cap_string(cap->issued)); 4672 4673 cap->seq = 0; /* reset cap seq */ 4674 cap->issue_seq = 0; /* and issue_seq */ 4675 cap->mseq = 0; /* and migrate_seq */ 4676 cap->cap_gen = atomic_read(&cap->session->s_cap_gen); 4677 4678 /* These are lost when the session goes away */ 4679 if (S_ISDIR(inode->i_mode)) { 4680 if (cap->issued & CEPH_CAP_DIR_CREATE) { 4681 ceph_put_string(rcu_dereference_raw(ci->i_cached_layout.pool_ns)); 4682 memset(&ci->i_cached_layout, 0, sizeof(ci->i_cached_layout)); 4683 } 4684 cap->issued &= ~CEPH_CAP_ANY_DIR_OPS; 4685 } 4686 4687 if (recon_state->msg_version >= 2) { 4688 rec.v2.cap_id = cpu_to_le64(cap->cap_id); 4689 rec.v2.wanted = cpu_to_le32(__ceph_caps_wanted(ci)); 4690 rec.v2.issued = cpu_to_le32(cap->issued); 4691 rec.v2.snaprealm = cpu_to_le64(ci->i_snap_realm->ino); 4692 rec.v2.pathbase = cpu_to_le64(path_info.vino.ino); 4693 rec.v2.flock_len = (__force __le32) 4694 ((ci->i_ceph_flags & CEPH_I_ERROR_FILELOCK) ? 0 : 1); 4695 } else { 4696 struct timespec64 ts; 4697 4698 rec.v1.cap_id = cpu_to_le64(cap->cap_id); 4699 rec.v1.wanted = cpu_to_le32(__ceph_caps_wanted(ci)); 4700 rec.v1.issued = cpu_to_le32(cap->issued); 4701 rec.v1.size = cpu_to_le64(i_size_read(inode)); 4702 ts = inode_get_mtime(inode); 4703 ceph_encode_timespec64(&rec.v1.mtime, &ts); 4704 ts = inode_get_atime(inode); 4705 ceph_encode_timespec64(&rec.v1.atime, &ts); 4706 rec.v1.snaprealm = cpu_to_le64(ci->i_snap_realm->ino); 4707 rec.v1.pathbase = cpu_to_le64(path_info.vino.ino); 4708 } 4709 4710 if (list_empty(&ci->i_cap_snaps)) { 4711 snap_follows = ci->i_head_snapc ? 
ci->i_head_snapc->seq : 0; 4712 } else { 4713 struct ceph_cap_snap *capsnap = 4714 list_first_entry(&ci->i_cap_snaps, 4715 struct ceph_cap_snap, ci_item); 4716 snap_follows = capsnap->follows; 4717 } 4718 spin_unlock(&ci->i_ceph_lock); 4719 4720 if (recon_state->msg_version >= 2) { 4721 int num_fcntl_locks, num_flock_locks; 4722 struct ceph_filelock *flocks = NULL; 4723 size_t struct_len, total_len = sizeof(u64); 4724 u8 struct_v = 0; 4725 4726 encode_again: 4727 if (rec.v2.flock_len) { 4728 ceph_count_locks(inode, &num_fcntl_locks, &num_flock_locks); 4729 } else { 4730 num_fcntl_locks = 0; 4731 num_flock_locks = 0; 4732 } 4733 if (num_fcntl_locks + num_flock_locks > 0) { 4734 flocks = kmalloc_array(num_fcntl_locks + num_flock_locks, 4735 sizeof(struct ceph_filelock), 4736 GFP_NOFS); 4737 if (!flocks) { 4738 err = -ENOMEM; 4739 goto out_err; 4740 } 4741 err = ceph_encode_locks_to_buffer(inode, flocks, 4742 num_fcntl_locks, 4743 num_flock_locks); 4744 if (err) { 4745 kfree(flocks); 4746 flocks = NULL; 4747 if (err == -ENOSPC) 4748 goto encode_again; 4749 goto out_err; 4750 } 4751 } else { 4752 kfree(flocks); 4753 flocks = NULL; 4754 } 4755 4756 if (recon_state->msg_version >= 3) { 4757 /* version, compat_version and struct_len */ 4758 total_len += 2 * sizeof(u8) + sizeof(u32); 4759 struct_v = 2; 4760 } 4761 /* 4762 * number of encoded locks is stable, so copy to pagelist 4763 */ 4764 struct_len = 2 * sizeof(u32) + 4765 (num_fcntl_locks + num_flock_locks) * 4766 sizeof(struct ceph_filelock); 4767 rec.v2.flock_len = cpu_to_le32(struct_len); 4768 4769 struct_len += sizeof(u32) + path_info.pathlen + sizeof(rec.v2); 4770 4771 if (struct_v >= 2) 4772 struct_len += sizeof(u64); /* snap_follows */ 4773 4774 total_len += struct_len; 4775 4776 if (pagelist->length + total_len > RECONNECT_MAX_SIZE) { 4777 err = send_reconnect_partial(recon_state); 4778 if (err) 4779 goto out_freeflocks; 4780 pagelist = recon_state->pagelist; 4781 } 4782 4783 err = ceph_pagelist_reserve(pagelist, total_len); 4784 if (err) 4785 goto out_freeflocks; 4786 4787 ceph_pagelist_encode_64(pagelist, ceph_ino(inode)); 4788 if (recon_state->msg_version >= 3) { 4789 ceph_pagelist_encode_8(pagelist, struct_v); 4790 ceph_pagelist_encode_8(pagelist, 1); 4791 ceph_pagelist_encode_32(pagelist, struct_len); 4792 } 4793 ceph_pagelist_encode_string(pagelist, (char *)path_info.path, path_info.pathlen); 4794 ceph_pagelist_append(pagelist, &rec, sizeof(rec.v2)); 4795 ceph_locks_to_pagelist(flocks, pagelist, 4796 num_fcntl_locks, num_flock_locks); 4797 if (struct_v >= 2) 4798 ceph_pagelist_encode_64(pagelist, snap_follows); 4799 out_freeflocks: 4800 kfree(flocks); 4801 } else { 4802 err = ceph_pagelist_reserve(pagelist, 4803 sizeof(u64) + sizeof(u32) + 4804 path_info.pathlen + sizeof(rec.v1)); 4805 if (err) 4806 goto out_err; 4807 4808 ceph_pagelist_encode_64(pagelist, ceph_ino(inode)); 4809 ceph_pagelist_encode_string(pagelist, (char *)path_info.path, path_info.pathlen); 4810 ceph_pagelist_append(pagelist, &rec, sizeof(rec.v1)); 4811 } 4812 4813 out_err: 4814 ceph_mdsc_free_path_info(&path_info); 4815 if (!err) 4816 recon_state->nr_caps++; 4817 return err; 4818 } 4819 4820 static int encode_snap_realms(struct ceph_mds_client *mdsc, 4821 struct ceph_reconnect_state *recon_state) 4822 { 4823 struct rb_node *p; 4824 struct ceph_pagelist *pagelist = recon_state->pagelist; 4825 struct ceph_client *cl = mdsc->fsc->client; 4826 int err = 0; 4827 4828 if (recon_state->msg_version >= 4) { 4829 err = ceph_pagelist_encode_32(pagelist, 
						    mdsc->num_snap_realms);
		if (err < 0)
			goto fail;
	}

	/*
	 * snaprealms. we provide mds with the ino, seq (version), and
	 * parent for all of our realms. If the mds has any newer info,
	 * it will tell us.
	 */
	for (p = rb_first(&mdsc->snap_realms); p; p = rb_next(p)) {
		struct ceph_snap_realm *realm =
			rb_entry(p, struct ceph_snap_realm, node);
		struct ceph_mds_snaprealm_reconnect sr_rec;

		if (recon_state->msg_version >= 4) {
			size_t need = sizeof(u8) * 2 + sizeof(u32) +
				      sizeof(sr_rec);

			if (pagelist->length + need > RECONNECT_MAX_SIZE) {
				err = send_reconnect_partial(recon_state);
				if (err)
					goto fail;
				pagelist = recon_state->pagelist;
			}

			err = ceph_pagelist_reserve(pagelist, need);
			if (err)
				goto fail;

			ceph_pagelist_encode_8(pagelist, 1);
			ceph_pagelist_encode_8(pagelist, 1);
			ceph_pagelist_encode_32(pagelist, sizeof(sr_rec));
		}

		doutc(cl, " adding snap realm %llx seq %lld parent %llx\n",
		      realm->ino, realm->seq, realm->parent_ino);
		sr_rec.ino = cpu_to_le64(realm->ino);
		sr_rec.seq = cpu_to_le64(realm->seq);
		sr_rec.parent = cpu_to_le64(realm->parent_ino);

		err = ceph_pagelist_append(pagelist, &sr_rec, sizeof(sr_rec));
		if (err)
			goto fail;

		recon_state->nr_realms++;
	}
fail:
	return err;
}


/*
 * If an MDS fails and recovers, clients need to reconnect in order to
 * reestablish shared state. This includes all caps issued through
 * this session _and_ the snap_realm hierarchy. Because it's not
 * clear which snap realms the mds cares about, we send everything we
 * know about; that ensures we'll then get any new info the
 * recovering MDS might have.
 *
 * This is a relatively heavyweight operation, but it's rare.
 */
static void send_mds_reconnect(struct ceph_mds_client *mdsc,
			       struct ceph_mds_session *session)
{
	struct ceph_client *cl = mdsc->fsc->client;
	struct ceph_msg *reply;
	int mds = session->s_mds;
	int err = -ENOMEM;
	struct ceph_reconnect_state recon_state = {
		.session = session,
	};
	LIST_HEAD(dispose);

	pr_info_client(cl, "mds%d reconnect start\n", mds);

	recon_state.pagelist = ceph_pagelist_alloc(GFP_NOFS);
	if (!recon_state.pagelist)
		goto fail_nopagelist;

	reply = ceph_msg_new2(CEPH_MSG_CLIENT_RECONNECT, 0, 1, GFP_NOFS, false);
	if (!reply)
		goto fail_nomsg;

	xa_destroy(&session->s_delegated_inos);

	mutex_lock(&session->s_mutex);
	session->s_state = CEPH_MDS_SESSION_RECONNECTING;
	session->s_seq = 0;

	doutc(cl, "session %p state %s\n", session,
	      ceph_session_state_name(session->s_state));

	atomic_inc(&session->s_cap_gen);

	spin_lock(&session->s_cap_lock);
	/* don't know if session is readonly */
	session->s_readonly = 0;
	/*
	 * notify __ceph_remove_cap() that we are composing a cap
	 * reconnect. If a cap gets released before being added to the
	 * cap reconnect, __ceph_remove_cap() should skip queuing the
	 * cap release.
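	 * (s_cap_reconnect is checked under s_cap_lock and is cleared,
	 * under the same lock, once ceph_iterate_session_caps() below
	 * has finished.)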
4931 */ 4932 session->s_cap_reconnect = 1; 4933 /* drop old cap expires; we're about to reestablish that state */ 4934 detach_cap_releases(session, &dispose); 4935 spin_unlock(&session->s_cap_lock); 4936 dispose_cap_releases(mdsc, &dispose); 4937 4938 /* trim unused caps to reduce MDS's cache rejoin time */ 4939 if (mdsc->fsc->sb->s_root) 4940 shrink_dcache_parent(mdsc->fsc->sb->s_root); 4941 4942 ceph_con_close(&session->s_con); 4943 ceph_con_open(&session->s_con, 4944 CEPH_ENTITY_TYPE_MDS, mds, 4945 ceph_mdsmap_get_addr(mdsc->mdsmap, mds)); 4946 4947 /* replay unsafe requests */ 4948 replay_unsafe_requests(mdsc, session); 4949 4950 ceph_early_kick_flushing_caps(mdsc, session); 4951 4952 down_read(&mdsc->snap_rwsem); 4953 4954 /* placeholder for nr_caps */ 4955 err = ceph_pagelist_encode_32(recon_state.pagelist, 0); 4956 if (err) 4957 goto fail; 4958 4959 if (test_bit(CEPHFS_FEATURE_MULTI_RECONNECT, &session->s_features)) { 4960 recon_state.msg_version = 3; 4961 recon_state.allow_multi = true; 4962 } else if (session->s_con.peer_features & CEPH_FEATURE_MDSENC) { 4963 recon_state.msg_version = 3; 4964 } else { 4965 recon_state.msg_version = 2; 4966 } 4967 /* traverse this session's caps */ 4968 err = ceph_iterate_session_caps(session, reconnect_caps_cb, &recon_state); 4969 4970 spin_lock(&session->s_cap_lock); 4971 session->s_cap_reconnect = 0; 4972 spin_unlock(&session->s_cap_lock); 4973 4974 if (err < 0) 4975 goto fail; 4976 4977 /* check if all realms can be encoded into current message */ 4978 if (mdsc->num_snap_realms) { 4979 size_t total_len = 4980 recon_state.pagelist->length + 4981 mdsc->num_snap_realms * 4982 sizeof(struct ceph_mds_snaprealm_reconnect); 4983 if (recon_state.msg_version >= 4) { 4984 /* number of realms */ 4985 total_len += sizeof(u32); 4986 /* version, compat_version and struct_len */ 4987 total_len += mdsc->num_snap_realms * 4988 (2 * sizeof(u8) + sizeof(u32)); 4989 } 4990 if (total_len > RECONNECT_MAX_SIZE) { 4991 if (!recon_state.allow_multi) { 4992 err = -ENOSPC; 4993 goto fail; 4994 } 4995 if (recon_state.nr_caps) { 4996 err = send_reconnect_partial(&recon_state); 4997 if (err) 4998 goto fail; 4999 } 5000 recon_state.msg_version = 5; 5001 } 5002 } 5003 5004 err = encode_snap_realms(mdsc, &recon_state); 5005 if (err < 0) 5006 goto fail; 5007 5008 if (recon_state.msg_version >= 5) { 5009 err = ceph_pagelist_encode_8(recon_state.pagelist, 0); 5010 if (err < 0) 5011 goto fail; 5012 } 5013 5014 if (recon_state.nr_caps || recon_state.nr_realms) { 5015 struct page *page = 5016 list_first_entry(&recon_state.pagelist->head, 5017 struct page, lru); 5018 __le32 *addr = kmap_atomic(page); 5019 if (recon_state.nr_caps) { 5020 WARN_ON(recon_state.nr_realms != mdsc->num_snap_realms); 5021 *addr = cpu_to_le32(recon_state.nr_caps); 5022 } else if (recon_state.msg_version >= 4) { 5023 *(addr + 1) = cpu_to_le32(recon_state.nr_realms); 5024 } 5025 kunmap_atomic(addr); 5026 } 5027 5028 reply->hdr.version = cpu_to_le16(recon_state.msg_version); 5029 if (recon_state.msg_version >= 4) 5030 reply->hdr.compat_version = cpu_to_le16(4); 5031 5032 reply->hdr.data_len = cpu_to_le32(recon_state.pagelist->length); 5033 ceph_msg_data_add_pagelist(reply, recon_state.pagelist); 5034 5035 ceph_con_send(&session->s_con, reply); 5036 5037 mutex_unlock(&session->s_mutex); 5038 5039 mutex_lock(&mdsc->mutex); 5040 __wake_requests(mdsc, &session->s_waiting); 5041 mutex_unlock(&mdsc->mutex); 5042 5043 up_read(&mdsc->snap_rwsem); 5044 ceph_pagelist_release(recon_state.pagelist); 5045 return; 5046 5047 
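/*
 * Error paths unwind in the reverse order of setup: drop the reply,
 * then the snap_rwsem read lock and s_mutex, then release the
 * pagelist.
 */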
fail: 5048 ceph_msg_put(reply); 5049 up_read(&mdsc->snap_rwsem); 5050 mutex_unlock(&session->s_mutex); 5051 fail_nomsg: 5052 ceph_pagelist_release(recon_state.pagelist); 5053 fail_nopagelist: 5054 pr_err_client(cl, "error %d preparing reconnect for mds%d\n", 5055 err, mds); 5056 return; 5057 } 5058 5059 5060 /* 5061 * compare old and new mdsmaps, kicking requests 5062 * and closing out old connections as necessary 5063 * 5064 * called under mdsc->mutex. 5065 */ 5066 static void check_new_map(struct ceph_mds_client *mdsc, 5067 struct ceph_mdsmap *newmap, 5068 struct ceph_mdsmap *oldmap) 5069 { 5070 int i, j, err; 5071 int oldstate, newstate; 5072 struct ceph_mds_session *s; 5073 unsigned long targets[DIV_ROUND_UP(CEPH_MAX_MDS, sizeof(unsigned long))] = {0}; 5074 struct ceph_client *cl = mdsc->fsc->client; 5075 5076 doutc(cl, "new %u old %u\n", newmap->m_epoch, oldmap->m_epoch); 5077 5078 if (newmap->m_info) { 5079 for (i = 0; i < newmap->possible_max_rank; i++) { 5080 for (j = 0; j < newmap->m_info[i].num_export_targets; j++) 5081 set_bit(newmap->m_info[i].export_targets[j], targets); 5082 } 5083 } 5084 5085 for (i = 0; i < oldmap->possible_max_rank && i < mdsc->max_sessions; i++) { 5086 if (!mdsc->sessions[i]) 5087 continue; 5088 s = mdsc->sessions[i]; 5089 oldstate = ceph_mdsmap_get_state(oldmap, i); 5090 newstate = ceph_mdsmap_get_state(newmap, i); 5091 5092 doutc(cl, "mds%d state %s%s -> %s%s (session %s)\n", 5093 i, ceph_mds_state_name(oldstate), 5094 ceph_mdsmap_is_laggy(oldmap, i) ? " (laggy)" : "", 5095 ceph_mds_state_name(newstate), 5096 ceph_mdsmap_is_laggy(newmap, i) ? " (laggy)" : "", 5097 ceph_session_state_name(s->s_state)); 5098 5099 if (i >= newmap->possible_max_rank) { 5100 /* force close session for stopped mds */ 5101 ceph_get_mds_session(s); 5102 __unregister_session(mdsc, s); 5103 __wake_requests(mdsc, &s->s_waiting); 5104 mutex_unlock(&mdsc->mutex); 5105 5106 mutex_lock(&s->s_mutex); 5107 cleanup_session_requests(mdsc, s); 5108 remove_session_caps(s); 5109 mutex_unlock(&s->s_mutex); 5110 5111 ceph_put_mds_session(s); 5112 5113 mutex_lock(&mdsc->mutex); 5114 kick_requests(mdsc, i); 5115 continue; 5116 } 5117 5118 if (memcmp(ceph_mdsmap_get_addr(oldmap, i), 5119 ceph_mdsmap_get_addr(newmap, i), 5120 sizeof(struct ceph_entity_addr))) { 5121 /* just close it */ 5122 mutex_unlock(&mdsc->mutex); 5123 mutex_lock(&s->s_mutex); 5124 mutex_lock(&mdsc->mutex); 5125 ceph_con_close(&s->s_con); 5126 mutex_unlock(&s->s_mutex); 5127 s->s_state = CEPH_MDS_SESSION_RESTARTING; 5128 } else if (oldstate == newstate) { 5129 continue; /* nothing new with this mds */ 5130 } 5131 5132 /* 5133 * send reconnect? 5134 */ 5135 if (s->s_state == CEPH_MDS_SESSION_RESTARTING && 5136 newstate >= CEPH_MDS_STATE_RECONNECT) { 5137 mutex_unlock(&mdsc->mutex); 5138 clear_bit(i, targets); 5139 send_mds_reconnect(mdsc, s); 5140 mutex_lock(&mdsc->mutex); 5141 } 5142 5143 /* 5144 * kick request on any mds that has gone active. 5145 */ 5146 if (oldstate < CEPH_MDS_STATE_ACTIVE && 5147 newstate >= CEPH_MDS_STATE_ACTIVE) { 5148 if (oldstate != CEPH_MDS_STATE_CREATING && 5149 oldstate != CEPH_MDS_STATE_STARTING) 5150 pr_info_client(cl, "mds%d recovery completed\n", 5151 s->s_mds); 5152 kick_requests(mdsc, i); 5153 mutex_unlock(&mdsc->mutex); 5154 mutex_lock(&s->s_mutex); 5155 mutex_lock(&mdsc->mutex); 5156 ceph_kick_flushing_caps(mdsc, s); 5157 mutex_unlock(&s->s_mutex); 5158 wake_up_session_caps(s, RECONNECT); 5159 } 5160 } 5161 5162 /* 5163 * Only open and reconnect sessions that don't exist yet. 
 */
	for (i = 0; i < newmap->possible_max_rank; i++) {
		/*
		 * The importing MDS may crash just after it flushes the
		 * EImportStart journal event. When a standby MDS takes
		 * over and replays that event, it will wait for the
		 * client to reconnect, but the client may never have
		 * registered/opened a session with it at all.
		 *
		 * So try to reconnect to an MDS daemon whose rank is in
		 * the export-targets array and which is in the
		 * up:reconnect state.
		 */
		newstate = ceph_mdsmap_get_state(newmap, i);
		if (!test_bit(i, targets) || newstate != CEPH_MDS_STATE_RECONNECT)
			continue;

		/*
		 * In rare cases the session may already have been
		 * registered and opened by requests that picked a
		 * random MDS during the mdsc->mutex unlock/lock gap
		 * below. The MDS daemon will simply queue those
		 * requests and keep waiting for the client's
		 * reconnection request while in the up:reconnect state.
		 */
		s = __ceph_lookup_mds_session(mdsc, i);
		if (likely(!s)) {
			s = __open_export_target_session(mdsc, i);
			if (IS_ERR(s)) {
				err = PTR_ERR(s);
				pr_err_client(cl,
					      "failed to open export target session, err %d\n",
					      err);
				continue;
			}
		}
		doutc(cl, "send reconnect to export target mds.%d\n", i);
		mutex_unlock(&mdsc->mutex);
		send_mds_reconnect(mdsc, s);
		ceph_put_mds_session(s);
		mutex_lock(&mdsc->mutex);
	}

	for (i = 0; i < newmap->possible_max_rank && i < mdsc->max_sessions; i++) {
		s = mdsc->sessions[i];
		if (!s)
			continue;
		if (!ceph_mdsmap_is_laggy(newmap, i))
			continue;
		if (s->s_state == CEPH_MDS_SESSION_OPEN ||
		    s->s_state == CEPH_MDS_SESSION_HUNG ||
		    s->s_state == CEPH_MDS_SESSION_CLOSING) {
			doutc(cl, " connecting to export targets of laggy mds%d\n", i);
			__open_export_target_sessions(mdsc, s);
		}
	}
}



/*
 * leases
 */

/*
 * caller must hold session s_mutex, dentry->d_lock
 */
void __ceph_mdsc_drop_dentry_lease(struct dentry *dentry)
{
	struct ceph_dentry_info *di = ceph_dentry(dentry);

	ceph_put_mds_session(di->lease_session);
	di->lease_session = NULL;
}

static void handle_lease(struct ceph_mds_client *mdsc,
			 struct ceph_mds_session *session,
			 struct ceph_msg *msg)
{
	struct ceph_client *cl = mdsc->fsc->client;
	struct super_block *sb = mdsc->fsc->sb;
	struct inode *inode;
	struct dentry *parent, *dentry;
	struct ceph_dentry_info *di;
	int mds = session->s_mds;
	struct ceph_mds_lease *h = msg->front.iov_base;
	u32 seq;
	struct ceph_vino vino;
	struct qstr dname;
	int release = 0;

	doutc(cl, "from mds%d\n", mds);

	if (!ceph_inc_mds_stopping_blocker(mdsc, session))
		return;

	/* decode */
	if (msg->front.iov_len < sizeof(*h) + sizeof(u32))
		goto bad;
	vino.ino = le64_to_cpu(h->ino);
	vino.snap = CEPH_NOSNAP;
	seq = le32_to_cpu(h->seq);
	dname.len = get_unaligned_le32(h + 1);
	if (msg->front.iov_len < sizeof(*h) + sizeof(u32) + dname.len)
		goto bad;
	dname.name = (void *)(h + 1) + sizeof(u32);

	/* lookup inode */
	inode = ceph_find_inode(sb, vino);
	doutc(cl, "%s, ino %llx %p %.*s\n", ceph_lease_op_name(h->action),
	      vino.ino, inode, dname.len, dname.name);

	mutex_lock(&session->s_mutex);
	if (!inode) {
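		/* the lease may refer to an inode we have already dropped */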
		doutc(cl, "no inode %llx\n", vino.ino);
		goto release;
	}

	/* dentry */
	parent = d_find_alias(inode);
	if (!parent) {
		doutc(cl, "no parent dentry on inode %p\n", inode);
		WARN_ON(1);
		goto release;  /* hrm... */
	}
	dname.hash = full_name_hash(parent, dname.name, dname.len);
	dentry = d_lookup(parent, &dname);
	dput(parent);
	if (!dentry)
		goto release;

	spin_lock(&dentry->d_lock);
	di = ceph_dentry(dentry);
	switch (h->action) {
	case CEPH_MDS_LEASE_REVOKE:
		if (di->lease_session == session) {
			if (ceph_seq_cmp(di->lease_seq, seq) > 0)
				h->seq = cpu_to_le32(di->lease_seq);
			__ceph_mdsc_drop_dentry_lease(dentry);
		}
		release = 1;
		break;

	case CEPH_MDS_LEASE_RENEW:
		if (di->lease_session == session &&
		    di->lease_gen == atomic_read(&session->s_cap_gen) &&
		    di->lease_renew_from &&
		    di->lease_renew_after == 0) {
			unsigned long duration =
				msecs_to_jiffies(le32_to_cpu(h->duration_ms));

			di->lease_seq = seq;
			di->time = di->lease_renew_from + duration;
			di->lease_renew_after = di->lease_renew_from +
						(duration >> 1);
			di->lease_renew_from = 0;
		}
		break;
	}
	spin_unlock(&dentry->d_lock);
	dput(dentry);

	if (!release)
		goto out;

release:
	/* let's just reuse the same message */
	h->action = CEPH_MDS_LEASE_REVOKE_ACK;
	ceph_msg_get(msg);
	ceph_con_send(&session->s_con, msg);

out:
	mutex_unlock(&session->s_mutex);
	iput(inode);

	ceph_dec_mds_stopping_blocker(mdsc);
	return;

bad:
	ceph_dec_mds_stopping_blocker(mdsc);

	pr_err_client(cl, "corrupt lease message\n");
	ceph_msg_dump(msg);
}

void ceph_mdsc_lease_send_msg(struct ceph_mds_session *session,
			      struct dentry *dentry, char action,
			      u32 seq)
{
	struct ceph_client *cl = session->s_mdsc->fsc->client;
	struct ceph_msg *msg;
	struct ceph_mds_lease *lease;
	struct inode *dir;
	int len = sizeof(*lease) + sizeof(u32) + NAME_MAX;

	doutc(cl, "dentry %p %s to mds%d\n", dentry, ceph_lease_op_name(action),
	      session->s_mds);

	msg = ceph_msg_new(CEPH_MSG_CLIENT_LEASE, len, GFP_NOFS, false);
	if (!msg)
		return;
	lease = msg->front.iov_base;
	lease->action = action;
	lease->seq = cpu_to_le32(seq);

	spin_lock(&dentry->d_lock);
	dir = d_inode(dentry->d_parent);
	lease->ino = cpu_to_le64(ceph_ino(dir));
	lease->first = lease->last = cpu_to_le64(ceph_snap(dir));

	put_unaligned_le32(dentry->d_name.len, lease + 1);
	memcpy((void *)(lease + 1) + 4,
	       dentry->d_name.name, dentry->d_name.len);
	spin_unlock(&dentry->d_lock);

	ceph_con_send(&session->s_con, msg);
}

/*
 * Lock and immediately unlock the session, to wait for any ongoing
 * session activity to finish.
 */
static void lock_unlock_session(struct ceph_mds_session *s)
{
	mutex_lock(&s->s_mutex);
	mutex_unlock(&s->s_mutex);
}

static void maybe_recover_session(struct ceph_mds_client *mdsc)
{
	struct ceph_client *cl = mdsc->fsc->client;
	struct ceph_fs_client *fsc = mdsc->fsc;

	if (!ceph_test_mount_opt(fsc, CLEANRECOVER))
		return;

	if (READ_ONCE(fsc->mount_state) != CEPH_MOUNT_MOUNTED)
		return;

	if (!READ_ONCE(fsc->blocklisted))
		return;

	pr_info_client(cl, "auto reconnect after blocklisted\n");
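	/* we are blocklisted and this mount opted in to CLEANRECOVER */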
ceph_force_reconnect(fsc->sb); 5408 } 5409 5410 bool check_session_state(struct ceph_mds_session *s) 5411 { 5412 struct ceph_client *cl = s->s_mdsc->fsc->client; 5413 5414 switch (s->s_state) { 5415 case CEPH_MDS_SESSION_OPEN: 5416 if (s->s_ttl && time_after(jiffies, s->s_ttl)) { 5417 s->s_state = CEPH_MDS_SESSION_HUNG; 5418 pr_info_client(cl, "mds%d hung\n", s->s_mds); 5419 } 5420 break; 5421 case CEPH_MDS_SESSION_CLOSING: 5422 case CEPH_MDS_SESSION_NEW: 5423 case CEPH_MDS_SESSION_RESTARTING: 5424 case CEPH_MDS_SESSION_CLOSED: 5425 case CEPH_MDS_SESSION_REJECTED: 5426 return false; 5427 } 5428 5429 return true; 5430 } 5431 5432 /* 5433 * If the sequence is incremented while we're waiting on a REQUEST_CLOSE reply, 5434 * then we need to retransmit that request. 5435 */ 5436 void inc_session_sequence(struct ceph_mds_session *s) 5437 { 5438 struct ceph_client *cl = s->s_mdsc->fsc->client; 5439 5440 lockdep_assert_held(&s->s_mutex); 5441 5442 s->s_seq++; 5443 5444 if (s->s_state == CEPH_MDS_SESSION_CLOSING) { 5445 int ret; 5446 5447 doutc(cl, "resending session close request for mds%d\n", s->s_mds); 5448 ret = request_close_session(s); 5449 if (ret < 0) 5450 pr_err_client(cl, "unable to close session to mds%d: %d\n", 5451 s->s_mds, ret); 5452 } 5453 } 5454 5455 /* 5456 * delayed work -- periodically trim expired leases, renew caps with mds. If 5457 * the @delay parameter is set to 0 or if it's more than 5 secs, the default 5458 * workqueue delay value of 5 secs will be used. 5459 */ 5460 static void schedule_delayed(struct ceph_mds_client *mdsc, unsigned long delay) 5461 { 5462 unsigned long max_delay = HZ * 5; 5463 5464 /* 5 secs default delay */ 5465 if (!delay || (delay > max_delay)) 5466 delay = max_delay; 5467 schedule_delayed_work(&mdsc->delayed_work, 5468 round_jiffies_relative(delay)); 5469 } 5470 5471 static void delayed_work(struct work_struct *work) 5472 { 5473 struct ceph_mds_client *mdsc = 5474 container_of(work, struct ceph_mds_client, delayed_work.work); 5475 unsigned long delay; 5476 int renew_interval; 5477 int renew_caps; 5478 int i; 5479 5480 doutc(mdsc->fsc->client, "mdsc delayed_work\n"); 5481 5482 if (mdsc->stopping >= CEPH_MDSC_STOPPING_FLUSHED) 5483 return; 5484 5485 mutex_lock(&mdsc->mutex); 5486 renew_interval = mdsc->mdsmap->m_session_timeout >> 2; 5487 renew_caps = time_after_eq(jiffies, HZ*renew_interval + 5488 mdsc->last_renew_caps); 5489 if (renew_caps) 5490 mdsc->last_renew_caps = jiffies; 5491 5492 for (i = 0; i < mdsc->max_sessions; i++) { 5493 struct ceph_mds_session *s = __ceph_lookup_mds_session(mdsc, i); 5494 if (!s) 5495 continue; 5496 5497 if (!check_session_state(s)) { 5498 ceph_put_mds_session(s); 5499 continue; 5500 } 5501 mutex_unlock(&mdsc->mutex); 5502 5503 ceph_flush_session_cap_releases(mdsc, s); 5504 5505 mutex_lock(&s->s_mutex); 5506 if (renew_caps) 5507 send_renew_caps(mdsc, s); 5508 else 5509 ceph_con_keepalive(&s->s_con); 5510 if (s->s_state == CEPH_MDS_SESSION_OPEN || 5511 s->s_state == CEPH_MDS_SESSION_HUNG) 5512 ceph_send_cap_releases(mdsc, s); 5513 mutex_unlock(&s->s_mutex); 5514 ceph_put_mds_session(s); 5515 5516 mutex_lock(&mdsc->mutex); 5517 } 5518 mutex_unlock(&mdsc->mutex); 5519 5520 delay = ceph_check_delayed_caps(mdsc); 5521 5522 ceph_queue_cap_reclaim_work(mdsc); 5523 5524 ceph_trim_snapid_map(mdsc); 5525 5526 maybe_recover_session(mdsc); 5527 5528 schedule_delayed(mdsc, delay); 5529 } 5530 5531 int ceph_mdsc_init(struct ceph_fs_client *fsc) 5532 5533 { 5534 struct ceph_mds_client *mdsc; 5535 int err; 5536 5537 mdsc = 
kzalloc(sizeof(struct ceph_mds_client), GFP_NOFS); 5538 if (!mdsc) 5539 return -ENOMEM; 5540 mdsc->fsc = fsc; 5541 mutex_init(&mdsc->mutex); 5542 mdsc->mdsmap = kzalloc(sizeof(*mdsc->mdsmap), GFP_NOFS); 5543 if (!mdsc->mdsmap) { 5544 err = -ENOMEM; 5545 goto err_mdsc; 5546 } 5547 5548 init_completion(&mdsc->safe_umount_waiters); 5549 spin_lock_init(&mdsc->stopping_lock); 5550 atomic_set(&mdsc->stopping_blockers, 0); 5551 init_completion(&mdsc->stopping_waiter); 5552 atomic64_set(&mdsc->dirty_folios, 0); 5553 init_waitqueue_head(&mdsc->flush_end_wq); 5554 init_waitqueue_head(&mdsc->session_close_wq); 5555 INIT_LIST_HEAD(&mdsc->waiting_for_map); 5556 mdsc->quotarealms_inodes = RB_ROOT; 5557 mutex_init(&mdsc->quotarealms_inodes_mutex); 5558 init_rwsem(&mdsc->snap_rwsem); 5559 mdsc->snap_realms = RB_ROOT; 5560 INIT_LIST_HEAD(&mdsc->snap_empty); 5561 spin_lock_init(&mdsc->snap_empty_lock); 5562 mdsc->request_tree = RB_ROOT; 5563 INIT_DELAYED_WORK(&mdsc->delayed_work, delayed_work); 5564 mdsc->last_renew_caps = jiffies; 5565 INIT_LIST_HEAD(&mdsc->cap_delay_list); 5566 #ifdef CONFIG_DEBUG_FS 5567 INIT_LIST_HEAD(&mdsc->cap_wait_list); 5568 #endif 5569 spin_lock_init(&mdsc->cap_delay_lock); 5570 INIT_LIST_HEAD(&mdsc->cap_unlink_delay_list); 5571 INIT_LIST_HEAD(&mdsc->snap_flush_list); 5572 spin_lock_init(&mdsc->snap_flush_lock); 5573 mdsc->last_cap_flush_tid = 1; 5574 INIT_LIST_HEAD(&mdsc->cap_flush_list); 5575 INIT_LIST_HEAD(&mdsc->cap_dirty_migrating); 5576 spin_lock_init(&mdsc->cap_dirty_lock); 5577 init_waitqueue_head(&mdsc->cap_flushing_wq); 5578 INIT_WORK(&mdsc->cap_reclaim_work, ceph_cap_reclaim_work); 5579 INIT_WORK(&mdsc->cap_unlink_work, ceph_cap_unlink_work); 5580 err = ceph_metric_init(&mdsc->metric); 5581 if (err) 5582 goto err_mdsmap; 5583 5584 spin_lock_init(&mdsc->dentry_list_lock); 5585 INIT_LIST_HEAD(&mdsc->dentry_leases); 5586 INIT_LIST_HEAD(&mdsc->dentry_dir_leases); 5587 5588 ceph_caps_init(mdsc); 5589 ceph_adjust_caps_max_min(mdsc, fsc->mount_options); 5590 5591 spin_lock_init(&mdsc->snapid_map_lock); 5592 mdsc->snapid_map_tree = RB_ROOT; 5593 INIT_LIST_HEAD(&mdsc->snapid_map_lru); 5594 5595 init_rwsem(&mdsc->pool_perm_rwsem); 5596 mdsc->pool_perm_tree = RB_ROOT; 5597 5598 strscpy(mdsc->nodename, utsname()->nodename, 5599 sizeof(mdsc->nodename)); 5600 5601 fsc->mdsc = mdsc; 5602 return 0; 5603 5604 err_mdsmap: 5605 kfree(mdsc->mdsmap); 5606 err_mdsc: 5607 kfree(mdsc); 5608 return err; 5609 } 5610 5611 /* 5612 * Wait for safe replies on open mds requests. If we time out, drop 5613 * all requests from the tree to avoid dangling dentry refs. 
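 * The wait is bounded by the mount_timeout option; see the
 * wait_for_completion_timeout() call below.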
 */
static void wait_requests(struct ceph_mds_client *mdsc)
{
	struct ceph_client *cl = mdsc->fsc->client;
	struct ceph_options *opts = mdsc->fsc->client->options;
	struct ceph_mds_request *req;

	mutex_lock(&mdsc->mutex);
	if (__get_oldest_req(mdsc)) {
		mutex_unlock(&mdsc->mutex);

		doutc(cl, "waiting for requests\n");
		wait_for_completion_timeout(&mdsc->safe_umount_waiters,
				    ceph_timeout_jiffies(opts->mount_timeout));

		/* tear down remaining requests */
		mutex_lock(&mdsc->mutex);
		while ((req = __get_oldest_req(mdsc))) {
			doutc(cl, "timed out on tid %llu\n", req->r_tid);
			list_del_init(&req->r_wait);
			__unregister_request(mdsc, req);
		}
	}
	mutex_unlock(&mdsc->mutex);
	doutc(cl, "done\n");
}

void send_flush_mdlog(struct ceph_mds_session *s)
{
	struct ceph_client *cl = s->s_mdsc->fsc->client;
	struct ceph_msg *msg;

	/*
	 * Pre-luminous MDS crashes when it sees an unknown session request
	 */
	if (!CEPH_HAVE_FEATURE(s->s_con.peer_features, SERVER_LUMINOUS))
		return;

	mutex_lock(&s->s_mutex);
	doutc(cl, "request mdlog flush to mds%d (%s) seq %lld\n",
	      s->s_mds, ceph_session_state_name(s->s_state), s->s_seq);
	msg = ceph_create_session_msg(CEPH_SESSION_REQUEST_FLUSH_MDLOG,
				      s->s_seq);
	if (!msg) {
		pr_err_client(cl, "failed to request mdlog flush to mds%d (%s) seq %lld\n",
			      s->s_mds, ceph_session_state_name(s->s_state), s->s_seq);
	} else {
		ceph_con_send(&s->s_con, msg);
	}
	mutex_unlock(&s->s_mutex);
}

static int ceph_mds_auth_match(struct ceph_mds_client *mdsc,
			       struct ceph_mds_cap_auth *auth,
			       const struct cred *cred,
			       char *tpath)
{
	u32 caller_uid = from_kuid(&init_user_ns, cred->fsuid);
	u32 caller_gid = from_kgid(&init_user_ns, cred->fsgid);
	struct ceph_client *cl = mdsc->fsc->client;
	const char *fs_name = mdsc->fsc->mount_options->mds_namespace;
	const char *spath = mdsc->fsc->mount_options->server_path;
	bool gid_matched = false;
	u32 gid, tlen, len;
	int i, j;

	doutc(cl, "fsname check fs_name=%s match.fs_name=%s\n",
	      fs_name, auth->match.fs_name ? auth->match.fs_name : "");
	if (auth->match.fs_name && strcmp(auth->match.fs_name, fs_name)) {
		/* fsname mismatch, try next one */
		return 0;
	}

	doutc(cl, "match.uid %lld\n", auth->match.uid);
	if (auth->match.uid != MDS_AUTH_UID_ANY) {
		if (auth->match.uid != caller_uid)
			return 0;
		if (auth->match.num_gids) {
			for (i = 0; i < auth->match.num_gids; i++) {
				if (caller_gid == auth->match.gids[i])
					gid_matched = true;
			}
			if (!gid_matched && cred->group_info->ngroups) {
				for (i = 0; i < cred->group_info->ngroups; i++) {
					gid = from_kgid(&init_user_ns,
							cred->group_info->gid[i]);
					for (j = 0; j < auth->match.num_gids; j++) {
						if (gid == auth->match.gids[j]) {
							gid_matched = true;
							break;
						}
					}
					if (gid_matched)
						break;
				}
			}
			if (!gid_matched)
				return 0;
		}
	}

	/* path match */
	if (auth->match.path) {
		if (!tpath)
			return 0;

		tlen = strlen(tpath);
		len = strlen(auth->match.path);
		if (len) {
			char *_tpath = tpath;
			bool free_tpath = false;
			int m, n;

			doutc(cl, "server path %s, tpath %s, match.path %s\n",
			      spath, tpath, auth->match.path);
			if (spath && (m = strlen(spath)) != 1) {
				/* mount path + '/' + tpath + a trailing NUL */
				n = m + 1 + tlen + 1;
				_tpath = kmalloc(n, GFP_NOFS);
				if (!_tpath)
					return -ENOMEM;
				/* remove the leading '/' */
				snprintf(_tpath, n, "%s/%s", spath + 1, tpath);
				free_tpath = true;
				tlen = strlen(_tpath);
			}

			/*
			 * Please note the trailing '/' for match.path has
			 * already been removed during parsing.
			 *
			 * Remove the trailing '/' from the target path.
			 */
			while (tlen && _tpath[tlen - 1] == '/') {
				_tpath[tlen - 1] = '\0';
				tlen -= 1;
			}
			doutc(cl, "_tpath %s\n", _tpath);

			/*
			 * In case first == _tpath && tlen == len:
			 *  match.path=/foo  --> /foo  _tpath=/foo   --> match
			 *  match.path=/foo/ --> /foo  _tpath=/foo   --> match
			 *
			 * In case first == _tpath && tlen > len:
			 *  match.path=/foo/ --> /foo  _tpath=/foo/  --> match
			 *  match.path=/foo  --> /foo  _tpath=/foo/  --> match
			 *  match.path=/foo/ --> /foo  _tpath=/foo/d --> match
			 *  match.path=/foo  --> /foo  _tpath=/food  --> mismatch
			 *
			 * All the other cases --> mismatch
			 */
			bool path_matched = true;
			char *first = strstr(_tpath, auth->match.path);
			if (first != _tpath ||
			    (tlen > len && _tpath[len] != '/')) {
				path_matched = false;
			}

			if (free_tpath)
				kfree(_tpath);

			if (!path_matched)
				return 0;
		}
	}

	doutc(cl, "matched\n");
	return 1;
}

int ceph_mds_check_access(struct ceph_mds_client *mdsc, char *tpath, int mask)
{
	const struct cred *cred = get_current_cred();
	u32 caller_uid = from_kuid(&init_user_ns, cred->fsuid);
	u32 caller_gid = from_kgid(&init_user_ns, cred->fsgid);
	struct ceph_mds_cap_auth *rw_perms_s = NULL;
	struct ceph_client *cl = mdsc->fsc->client;
	bool root_squash_perms = true;
	int i, err;

	doutc(cl, "tpath '%s', mask %d, caller_uid %d, caller_gid %d\n",
	      tpath, mask, caller_uid, caller_gid);

	for (i = 0; i < mdsc->s_cap_auths_num; i++) {
		struct ceph_mds_cap_auth *s = &mdsc->s_cap_auths[i];

		err = ceph_mds_auth_match(mdsc, s, cred, tpath);
		if (err < 0) {
			put_cred(cred);
			return err;
		} else if (err > 0) {
			/* always follow the last auth caps' permission */
			root_squash_perms = true;
			rw_perms_s = NULL;
			if ((mask & MAY_WRITE) && s->writeable &&
			    s->match.root_squash && (!caller_uid || !caller_gid))
				root_squash_perms = false;

			if (((mask & MAY_WRITE) && !s->writeable) ||
			    ((mask & MAY_READ) && !s->readable))
				rw_perms_s = s;
		}
	}

	put_cred(cred);

	doutc(cl, "root_squash_perms %d, rw_perms_s %p\n", root_squash_perms,
	      rw_perms_s);
	if (root_squash_perms && rw_perms_s == NULL) {
		doutc(cl, "access allowed\n");
		return 0;
	}

	if (!root_squash_perms) {
		doutc(cl, "root_squash is enabled and user(%d %d) isn't allowed to write\n",
		      caller_uid, caller_gid);
	}
	if (rw_perms_s) {
		doutc(cl, "mds auth caps readable/writeable %d/%d while request r/w %d/%d\n",
		      rw_perms_s->readable, rw_perms_s->writeable,
		      !!(mask & MAY_READ), !!(mask & MAY_WRITE));
	}
	doutc(cl, "access denied\n");
	return -EACCES;
}

/*
 * called before mount is ro, and before dentries are torn down.
 * (hmm, does this still race with new lookups?)
 */
void ceph_mdsc_pre_umount(struct ceph_mds_client *mdsc)
{
	doutc(mdsc->fsc->client, "begin\n");
	mdsc->stopping = CEPH_MDSC_STOPPING_BEGIN;

	ceph_mdsc_iterate_sessions(mdsc, send_flush_mdlog, true);
	ceph_mdsc_iterate_sessions(mdsc, lock_unlock_session, false);
	ceph_flush_dirty_caps(mdsc);
	wait_requests(mdsc);

	/*
	 * wait for reply handlers to drop their request refs and
	 * their inode/dcache refs
	 */
	ceph_msgr_flush();

	ceph_cleanup_quotarealms_inodes(mdsc);
	doutc(mdsc->fsc->client, "done\n");
}

/*
 * flush the mdlog and wait for all write mds requests to flush.
 */
static void flush_mdlog_and_wait_mdsc_unsafe_requests(struct ceph_mds_client *mdsc,
						      u64 want_tid)
{
	struct ceph_client *cl = mdsc->fsc->client;
	struct ceph_mds_request *req = NULL, *nextreq;
	struct ceph_mds_session *last_session = NULL;
	struct rb_node *n;

	mutex_lock(&mdsc->mutex);
	doutc(cl, "want %lld\n", want_tid);
restart:
	req = __get_oldest_req(mdsc);
	while (req && req->r_tid <= want_tid) {
		/* find next request */
		n = rb_next(&req->r_node);
		if (n)
			nextreq = rb_entry(n, struct ceph_mds_request, r_node);
		else
			nextreq = NULL;
		if (req->r_op != CEPH_MDS_OP_SETFILELOCK &&
		    (req->r_op & CEPH_MDS_OP_WRITE)) {
			struct ceph_mds_session *s = req->r_session;

			if (!s) {
				req = nextreq;
				continue;
			}

			/* write op */
			ceph_mdsc_get_request(req);
			if (nextreq)
				ceph_mdsc_get_request(nextreq);
			s = ceph_get_mds_session(s);
			mutex_unlock(&mdsc->mutex);

			/* send flush mdlog request to MDS */
			if (last_session != s) {
				send_flush_mdlog(s);
				ceph_put_mds_session(last_session);
				last_session = s;
			} else {
				ceph_put_mds_session(s);
			}
			doutc(cl, "wait on %llu (want %llu)\n",
			      req->r_tid, want_tid);
			wait_for_completion(&req->r_safe_completion);

			mutex_lock(&mdsc->mutex);
			ceph_mdsc_put_request(req);
			if (!nextreq)
				break; /* next dne before, so we're done! */
/*
 * called before the mount goes ro, and before dentries are torn down.
 * (hmm, does this still race with new lookups?)
 */
void ceph_mdsc_pre_umount(struct ceph_mds_client *mdsc)
{
	doutc(mdsc->fsc->client, "begin\n");
	mdsc->stopping = CEPH_MDSC_STOPPING_BEGIN;

	ceph_mdsc_iterate_sessions(mdsc, send_flush_mdlog, true);
	ceph_mdsc_iterate_sessions(mdsc, lock_unlock_session, false);
	ceph_flush_dirty_caps(mdsc);
	wait_requests(mdsc);

	/*
	 * wait for reply handlers to drop their request refs and
	 * their inode/dcache refs
	 */
	ceph_msgr_flush();

	ceph_cleanup_quotarealms_inodes(mdsc);
	doutc(mdsc->fsc->client, "done\n");
}

/*
 * flush the mdlog and wait for all write MDS requests up to want_tid
 * to complete.
 */
static void flush_mdlog_and_wait_mdsc_unsafe_requests(struct ceph_mds_client *mdsc,
						      u64 want_tid)
{
	struct ceph_client *cl = mdsc->fsc->client;
	struct ceph_mds_request *req = NULL, *nextreq;
	struct ceph_mds_session *last_session = NULL;
	struct rb_node *n;

	mutex_lock(&mdsc->mutex);
	doutc(cl, "want %lld\n", want_tid);
restart:
	req = __get_oldest_req(mdsc);
	while (req && req->r_tid <= want_tid) {
		/* find next request */
		n = rb_next(&req->r_node);
		if (n)
			nextreq = rb_entry(n, struct ceph_mds_request, r_node);
		else
			nextreq = NULL;
		if (req->r_op != CEPH_MDS_OP_SETFILELOCK &&
		    (req->r_op & CEPH_MDS_OP_WRITE)) {
			struct ceph_mds_session *s = req->r_session;

			if (!s) {
				req = nextreq;
				continue;
			}

			/* write op */
			ceph_mdsc_get_request(req);
			if (nextreq)
				ceph_mdsc_get_request(nextreq);
			s = ceph_get_mds_session(s);
			mutex_unlock(&mdsc->mutex);

			/* send flush mdlog request to MDS */
			if (last_session != s) {
				send_flush_mdlog(s);
				ceph_put_mds_session(last_session);
				last_session = s;
			} else {
				ceph_put_mds_session(s);
			}
			doutc(cl, "wait on %llu (want %llu)\n",
			      req->r_tid, want_tid);
			wait_for_completion(&req->r_safe_completion);

			mutex_lock(&mdsc->mutex);
			ceph_mdsc_put_request(req);
			if (!nextreq)
				break; /* 'next' doesn't exist, so we're done! */
			if (RB_EMPTY_NODE(&nextreq->r_node)) {
				/* next request was removed from tree */
				ceph_mdsc_put_request(nextreq);
				goto restart;
			}
			ceph_mdsc_put_request(nextreq);	/* won't go away */
		}
		req = nextreq;
	}
	mutex_unlock(&mdsc->mutex);
	ceph_put_mds_session(last_session);
	doutc(cl, "done\n");
}

void ceph_mdsc_sync(struct ceph_mds_client *mdsc)
{
	struct ceph_client *cl = mdsc->fsc->client;
	u64 want_tid, want_flush;

	if (READ_ONCE(mdsc->fsc->mount_state) >= CEPH_MOUNT_SHUTDOWN)
		return;

	doutc(cl, "sync\n");
	mutex_lock(&mdsc->mutex);
	want_tid = mdsc->last_tid;
	mutex_unlock(&mdsc->mutex);

	ceph_flush_dirty_caps(mdsc);
	ceph_flush_cap_releases(mdsc);
	spin_lock(&mdsc->cap_dirty_lock);
	want_flush = mdsc->last_cap_flush_tid;
	if (!list_empty(&mdsc->cap_flush_list)) {
		struct ceph_cap_flush *cf =
			list_last_entry(&mdsc->cap_flush_list,
					struct ceph_cap_flush, g_list);
		cf->wake = true;
	}
	spin_unlock(&mdsc->cap_dirty_lock);

	doutc(cl, "sync want tid %lld flush_seq %lld\n", want_tid, want_flush);

	flush_mdlog_and_wait_mdsc_unsafe_requests(mdsc, want_tid);
	wait_caps_flush(mdsc, want_flush);
}
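/*
 * Compiled-out sketch of the traversal pattern used by
 * flush_mdlog_and_wait_mdsc_unsafe_requests() above: pin the *next*
 * node with a reference before dropping the lock to sleep, then use
 * RB_EMPTY_NODE() to detect that it was erased (and RB_CLEAR_NODE()d)
 * in the meantime.  struct example_item, example_first(),
 * example_wait_on() and the item_get()/item_put() refcount helpers
 * are all hypothetical.
 */
#if 0
struct example_item {
	struct rb_node node;	/* RB_CLEAR_NODE()d by the erase path */
};

static void example_walk(struct rb_root *root, struct mutex *lock)
{
	struct example_item *it, *next;
	struct rb_node *n;

	mutex_lock(lock);
restart:
	for (it = example_first(root); it; it = next) {
		n = rb_next(&it->node);
		next = n ? rb_entry(n, struct example_item, node) : NULL;

		item_get(it);
		if (next)
			item_get(next);		/* keep it alive while we sleep */
		mutex_unlock(lock);

		example_wait_on(it);		/* may sleep; tree can change */

		mutex_lock(lock);
		item_put(it);
		if (!next)
			break;
		if (RB_EMPTY_NODE(&next->node)) {
			/* erased while we slept: restart from the front */
			item_put(next);
			goto restart;
		}
		item_put(next);			/* still linked; safe to continue */
	}
	mutex_unlock(lock);
}
#endif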
/*
 * true if all sessions are closed, or we force unmount
 */
static bool done_closing_sessions(struct ceph_mds_client *mdsc, int skipped)
{
	if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_SHUTDOWN)
		return true;
	return atomic_read(&mdsc->num_sessions) <= skipped;
}

/*
 * called after the sb is ro or when metadata is corrupted.
 */
void ceph_mdsc_close_sessions(struct ceph_mds_client *mdsc)
{
	struct ceph_options *opts = mdsc->fsc->client->options;
	struct ceph_client *cl = mdsc->fsc->client;
	struct ceph_mds_session *session;
	int i;
	int skipped = 0;

	doutc(cl, "begin\n");

	/* close sessions */
	mutex_lock(&mdsc->mutex);
	for (i = 0; i < mdsc->max_sessions; i++) {
		session = __ceph_lookup_mds_session(mdsc, i);
		if (!session)
			continue;
		mutex_unlock(&mdsc->mutex);
		mutex_lock(&session->s_mutex);
		if (__close_session(mdsc, session) <= 0)
			skipped++;
		mutex_unlock(&session->s_mutex);
		ceph_put_mds_session(session);
		mutex_lock(&mdsc->mutex);
	}
	mutex_unlock(&mdsc->mutex);

	doutc(cl, "waiting for sessions to close\n");
	wait_event_timeout(mdsc->session_close_wq,
			   done_closing_sessions(mdsc, skipped),
			   ceph_timeout_jiffies(opts->mount_timeout));

	/* tear down remaining sessions */
	mutex_lock(&mdsc->mutex);
	for (i = 0; i < mdsc->max_sessions; i++) {
		if (mdsc->sessions[i]) {
			session = ceph_get_mds_session(mdsc->sessions[i]);
			__unregister_session(mdsc, session);
			mutex_unlock(&mdsc->mutex);
			mutex_lock(&session->s_mutex);
			remove_session_caps(session);
			mutex_unlock(&session->s_mutex);
			ceph_put_mds_session(session);
			mutex_lock(&mdsc->mutex);
		}
	}
	WARN_ON(!list_empty(&mdsc->cap_delay_list));
	mutex_unlock(&mdsc->mutex);

	ceph_cleanup_snapid_map(mdsc);
	ceph_cleanup_global_and_empty_realms(mdsc);

	cancel_work_sync(&mdsc->cap_reclaim_work);
	cancel_work_sync(&mdsc->cap_unlink_work);
	cancel_delayed_work_sync(&mdsc->delayed_work); /* cancel timer */

	doutc(cl, "done\n");
}

void ceph_mdsc_force_umount(struct ceph_mds_client *mdsc)
{
	struct ceph_mds_session *session;
	int mds;

	doutc(mdsc->fsc->client, "force umount\n");

	mutex_lock(&mdsc->mutex);
	for (mds = 0; mds < mdsc->max_sessions; mds++) {
		session = __ceph_lookup_mds_session(mdsc, mds);
		if (!session)
			continue;

		if (session->s_state == CEPH_MDS_SESSION_REJECTED)
			__unregister_session(mdsc, session);
		__wake_requests(mdsc, &session->s_waiting);
		mutex_unlock(&mdsc->mutex);

		mutex_lock(&session->s_mutex);
		__close_session(mdsc, session);
		if (session->s_state == CEPH_MDS_SESSION_CLOSING) {
			cleanup_session_requests(mdsc, session);
			remove_session_caps(session);
		}
		mutex_unlock(&session->s_mutex);
		ceph_put_mds_session(session);

		mutex_lock(&mdsc->mutex);
		kick_requests(mdsc, mds);
	}
	__wake_requests(mdsc, &mdsc->waiting_for_map);
	mutex_unlock(&mdsc->mutex);
}

static void ceph_mdsc_stop(struct ceph_mds_client *mdsc)
{
	doutc(mdsc->fsc->client, "stop\n");
	/*
	 * Make sure the delayed work has stopped before releasing
	 * the resources.
	 *
	 * cancel_delayed_work_sync() only guarantees that the work
	 * finishes executing, but the delayed work may be re-armed
	 * again after that.
	 */
	flush_delayed_work(&mdsc->delayed_work);

	if (mdsc->mdsmap)
		ceph_mdsmap_destroy(mdsc->mdsmap);
	kfree(mdsc->sessions);
	ceph_caps_finalize(mdsc);

	if (mdsc->s_cap_auths) {
		int i;

		for (i = 0; i < mdsc->s_cap_auths_num; i++) {
			kfree(mdsc->s_cap_auths[i].match.gids);
			kfree(mdsc->s_cap_auths[i].match.path);
			kfree(mdsc->s_cap_auths[i].match.fs_name);
		}
		kfree(mdsc->s_cap_auths);
	}

	ceph_pool_perm_destroy(mdsc);
}

void ceph_mdsc_destroy(struct ceph_fs_client *fsc)
{
	struct ceph_mds_client *mdsc = fsc->mdsc;

	doutc(fsc->client, "%p\n", mdsc);

	if (!mdsc)
		return;

	/* flush out any connection work with references to us */
	ceph_msgr_flush();

	ceph_mdsc_stop(mdsc);

	ceph_metric_destroy(&mdsc->metric);

	fsc->mdsc = NULL;
	kfree(mdsc);
	doutc(fsc->client, "%p done\n", mdsc);
}
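/*
 * Compiled-out sketch of the teardown ordering ceph_mdsc_stop() relies
 * on: the periodic work was already cancelled once during session
 * close, but another path may re-arm it afterwards, so the final stop
 * flushes the work once more before freeing anything it touches.
 * example_work/example_tick/example_stopping are hypothetical.
 */
#if 0
static void example_tick(struct work_struct *work);
static DECLARE_DELAYED_WORK(example_work, example_tick);
static bool example_stopping;

static void example_tick(struct work_struct *work)
{
	/* ... periodic maintenance ... */
	if (!READ_ONCE(example_stopping))
		schedule_delayed_work(&example_work, 5 * HZ);	/* re-arm */
}

static void example_stop(void)
{
	WRITE_ONCE(example_stopping, true);	/* forbid further re-arming */
	flush_delayed_work(&example_work);	/* drain any in-flight run */
	/* only now is it safe to free the resources the work uses */
}
#endif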
void ceph_mdsc_handle_fsmap(struct ceph_mds_client *mdsc, struct ceph_msg *msg)
{
	struct ceph_fs_client *fsc = mdsc->fsc;
	struct ceph_client *cl = fsc->client;
	const char *mds_namespace = fsc->mount_options->mds_namespace;
	void *p = msg->front.iov_base;
	void *end = p + msg->front.iov_len;
	u32 epoch;
	u32 num_fs;
	u32 mount_fscid = (u32)-1;
	int err = -EINVAL;

	ceph_decode_need(&p, end, sizeof(u32), bad);
	epoch = ceph_decode_32(&p);

	doutc(cl, "epoch %u\n", epoch);

	/* struct_v, struct_cv, map_len, epoch, legacy_client_fscid */
	ceph_decode_skip_n(&p, end, 2 + sizeof(u32) * 3, bad);

	ceph_decode_32_safe(&p, end, num_fs, bad);
	while (num_fs-- > 0) {
		void *info_p, *info_end;
		u32 info_len;
		u32 fscid, namelen;

		ceph_decode_need(&p, end, 2 + sizeof(u32), bad);
		p += 2;		/* info_v, info_cv */
		info_len = ceph_decode_32(&p);
		ceph_decode_need(&p, end, info_len, bad);
		info_p = p;
		info_end = p + info_len;
		p = info_end;

		ceph_decode_need(&info_p, info_end, sizeof(u32) * 2, bad);
		fscid = ceph_decode_32(&info_p);
		namelen = ceph_decode_32(&info_p);
		ceph_decode_need(&info_p, info_end, namelen, bad);

		if (mds_namespace &&
		    strlen(mds_namespace) == namelen &&
		    !strncmp(mds_namespace, (char *)info_p, namelen)) {
			mount_fscid = fscid;
			break;
		}
	}

	ceph_monc_got_map(&fsc->client->monc, CEPH_SUB_FSMAP, epoch);
	if (mount_fscid != (u32)-1) {
		fsc->client->monc.fs_cluster_id = mount_fscid;
		ceph_monc_want_map(&fsc->client->monc, CEPH_SUB_MDSMAP,
				   0, true);
		ceph_monc_renew_subs(&fsc->client->monc);
	} else {
		err = -ENOENT;
		goto err_out;
	}
	return;

bad:
	pr_err_client(cl, "error decoding fsmap %d. Shutting down mount.\n",
		      err);
	ceph_umount_begin(mdsc->fsc->sb);
	ceph_msg_dump(msg);
err_out:
	mutex_lock(&mdsc->mutex);
	mdsc->mdsmap_err = err;
	__wake_requests(mdsc, &mdsc->waiting_for_map);
	mutex_unlock(&mdsc->mutex);
}
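/*
 * Compiled-out sketch of the envelope pattern used for each filesystem
 * entry above (info_v/info_cv/info_len): bound all decoding by the
 * advertised length and then step over the whole blob, so unknown
 * trailing fields from newer servers are skipped safely.
 * example_decode_blob() is illustrative only.
 */
#if 0
static int example_decode_blob(void **p, void *end)
{
	void *blob_end;
	u8 ver, compat;
	u32 len;

	ceph_decode_8_safe(p, end, ver, bad);
	ceph_decode_8_safe(p, end, compat, bad);
	ceph_decode_32_safe(p, end, len, bad);
	ceph_decode_need(p, end, len, bad);
	blob_end = *p + len;

	/* ... decode the fields we understand against blob_end, not end ... */

	*p = blob_end;		/* skip anything we did not understand */
	return 0;
bad:
	return -EIO;
}
#endif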
/*
 * handle mds map update.
 */
void ceph_mdsc_handle_mdsmap(struct ceph_mds_client *mdsc, struct ceph_msg *msg)
{
	struct ceph_client *cl = mdsc->fsc->client;
	u32 epoch;
	u32 maplen;
	void *p = msg->front.iov_base;
	void *end = p + msg->front.iov_len;
	struct ceph_mdsmap *newmap, *oldmap;
	struct ceph_fsid fsid;
	int err = -EINVAL;

	ceph_decode_need(&p, end, sizeof(fsid) + 2 * sizeof(u32), bad);
	ceph_decode_copy(&p, &fsid, sizeof(fsid));
	if (ceph_check_fsid(mdsc->fsc->client, &fsid) < 0)
		return;
	epoch = ceph_decode_32(&p);
	maplen = ceph_decode_32(&p);
	doutc(cl, "epoch %u len %d\n", epoch, (int)maplen);

	/* do we need it? */
	mutex_lock(&mdsc->mutex);
	if (mdsc->mdsmap && epoch <= mdsc->mdsmap->m_epoch) {
		doutc(cl, "epoch %u <= our %u\n", epoch, mdsc->mdsmap->m_epoch);
		mutex_unlock(&mdsc->mutex);
		return;
	}

	newmap = ceph_mdsmap_decode(mdsc, &p, end, ceph_msgr2(mdsc->fsc->client));
	if (IS_ERR(newmap)) {
		err = PTR_ERR(newmap);
		goto bad_unlock;
	}

	/* swap into place */
	if (mdsc->mdsmap) {
		oldmap = mdsc->mdsmap;
		mdsc->mdsmap = newmap;
		check_new_map(mdsc, newmap, oldmap);
		ceph_mdsmap_destroy(oldmap);
	} else {
		mdsc->mdsmap = newmap;	/* first mds map */
	}
	mdsc->fsc->max_file_size = min((loff_t)mdsc->mdsmap->m_max_file_size,
				       MAX_LFS_FILESIZE);

	__wake_requests(mdsc, &mdsc->waiting_for_map);
	ceph_monc_got_map(&mdsc->fsc->client->monc, CEPH_SUB_MDSMAP,
			  mdsc->mdsmap->m_epoch);

	mutex_unlock(&mdsc->mutex);
	schedule_delayed(mdsc, 0);
	return;

bad_unlock:
	mutex_unlock(&mdsc->mutex);
bad:
	pr_err_client(cl, "error decoding mdsmap %d. Shutting down mount.\n",
		      err);
	ceph_umount_begin(mdsc->fsc->sb);
	ceph_msg_dump(msg);
}

static struct ceph_connection *mds_get_con(struct ceph_connection *con)
{
	struct ceph_mds_session *s = con->private;

	if (ceph_get_mds_session(s))
		return con;
	return NULL;
}

static void mds_put_con(struct ceph_connection *con)
{
	struct ceph_mds_session *s = con->private;

	ceph_put_mds_session(s);
}

/*
 * if the client is unresponsive for long enough, the mds will kill
 * the session entirely.
 */
static void mds_peer_reset(struct ceph_connection *con)
{
	struct ceph_mds_session *s = con->private;
	struct ceph_mds_client *mdsc = s->s_mdsc;

	pr_warn_client(mdsc->fsc->client, "mds%d closed our session\n",
		       s->s_mds);
	if (READ_ONCE(mdsc->fsc->mount_state) != CEPH_MOUNT_FENCE_IO &&
	    ceph_mdsmap_get_state(mdsc->mdsmap, s->s_mds) >= CEPH_MDS_STATE_RECONNECT)
		send_mds_reconnect(mdsc, s);
}

static void mds_dispatch(struct ceph_connection *con, struct ceph_msg *msg)
{
	struct ceph_mds_session *s = con->private;
	struct ceph_mds_client *mdsc = s->s_mdsc;
	struct ceph_client *cl = mdsc->fsc->client;
	int type = le16_to_cpu(msg->hdr.type);

	mutex_lock(&mdsc->mutex);
	if (__verify_registered_session(mdsc, s) < 0) {
		mutex_unlock(&mdsc->mutex);
		goto out;
	}
	mutex_unlock(&mdsc->mutex);

	switch (type) {
	case CEPH_MSG_MDS_MAP:
		ceph_mdsc_handle_mdsmap(mdsc, msg);
		break;
	case CEPH_MSG_FS_MAP_USER:
		ceph_mdsc_handle_fsmap(mdsc, msg);
		break;
	case CEPH_MSG_CLIENT_SESSION:
		handle_session(s, msg);
		break;
	case CEPH_MSG_CLIENT_REPLY:
		handle_reply(s, msg);
		break;
	case CEPH_MSG_CLIENT_REQUEST_FORWARD:
		handle_forward(mdsc, s, msg);
		break;
	case CEPH_MSG_CLIENT_CAPS:
		ceph_handle_caps(s, msg);
		break;
	case CEPH_MSG_CLIENT_SNAP:
		ceph_handle_snap(mdsc, s, msg);
		break;
	case CEPH_MSG_CLIENT_LEASE:
		handle_lease(mdsc, s, msg);
		break;
	case CEPH_MSG_CLIENT_QUOTA:
		ceph_handle_quota(mdsc, s, msg);
		break;

	default:
		pr_err_client(cl, "received unknown message type %d %s\n",
			      type, ceph_msg_type_name(type));
	}
out:
	ceph_msg_put(msg);
}

/*
 * authentication
 */

/*
 * Note: returned pointer is the address of a structure that's
 * managed separately.  Caller must *not* attempt to free it.
 */
static struct ceph_auth_handshake *
mds_get_authorizer(struct ceph_connection *con, int *proto, int force_new)
{
	struct ceph_mds_session *s = con->private;
	struct ceph_mds_client *mdsc = s->s_mdsc;
	struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth;
	struct ceph_auth_handshake *auth = &s->s_auth;
	int ret;

	ret = __ceph_auth_get_authorizer(ac, auth, CEPH_ENTITY_TYPE_MDS,
					 force_new, proto, NULL, NULL);
	if (ret)
		return ERR_PTR(ret);

	return auth;
}

static int mds_add_authorizer_challenge(struct ceph_connection *con,
				    void *challenge_buf, int challenge_buf_len)
{
	struct ceph_mds_session *s = con->private;
	struct ceph_mds_client *mdsc = s->s_mdsc;
	struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth;

	return ceph_auth_add_authorizer_challenge(ac, s->s_auth.authorizer,
					    challenge_buf, challenge_buf_len);
}

static int mds_verify_authorizer_reply(struct ceph_connection *con)
{
	struct ceph_mds_session *s = con->private;
	struct ceph_mds_client *mdsc = s->s_mdsc;
	struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth;
	struct ceph_auth_handshake *auth = &s->s_auth;

	return ceph_auth_verify_authorizer_reply(ac, auth->authorizer,
		auth->authorizer_reply_buf, auth->authorizer_reply_buf_len,
		NULL, NULL, NULL, NULL);
}

static int mds_invalidate_authorizer(struct ceph_connection *con)
{
	struct ceph_mds_session *s = con->private;
	struct ceph_mds_client *mdsc = s->s_mdsc;
	struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth;

	ceph_auth_invalidate_authorizer(ac, CEPH_ENTITY_TYPE_MDS);

	return ceph_monc_validate_auth(&mdsc->fsc->client->monc);
}

static int mds_get_auth_request(struct ceph_connection *con,
				void *buf, int *buf_len,
				void **authorizer, int *authorizer_len)
{
	struct ceph_mds_session *s = con->private;
	struct ceph_auth_client *ac = s->s_mdsc->fsc->client->monc.auth;
	struct ceph_auth_handshake *auth = &s->s_auth;
	int ret;

	ret = ceph_auth_get_authorizer(ac, auth, CEPH_ENTITY_TYPE_MDS,
				       buf, buf_len);
	if (ret)
		return ret;

	*authorizer = auth->authorizer_buf;
	*authorizer_len = auth->authorizer_buf_len;
	return 0;
}

static int mds_handle_auth_reply_more(struct ceph_connection *con,
				      void *reply, int reply_len,
				      void *buf, int *buf_len,
				      void **authorizer, int *authorizer_len)
{
	struct ceph_mds_session *s = con->private;
	struct ceph_auth_client *ac = s->s_mdsc->fsc->client->monc.auth;
	struct ceph_auth_handshake *auth = &s->s_auth;
	int ret;

	ret = ceph_auth_handle_svc_reply_more(ac, auth, reply, reply_len,
					      buf, buf_len);
	if (ret)
		return ret;

	*authorizer = auth->authorizer_buf;
	*authorizer_len = auth->authorizer_buf_len;
	return 0;
}

static int mds_handle_auth_done(struct ceph_connection *con,
				u64 global_id, void *reply, int reply_len,
				u8 *session_key, int *session_key_len,
				u8 *con_secret, int *con_secret_len)
{
	struct ceph_mds_session *s = con->private;
	struct ceph_auth_client *ac = s->s_mdsc->fsc->client->monc.auth;
	struct ceph_auth_handshake *auth = &s->s_auth;

	return ceph_auth_handle_svc_reply_done(ac, auth, reply, reply_len,
					       session_key, session_key_len,
					       con_secret, con_secret_len);
}
static int mds_handle_auth_bad_method(struct ceph_connection *con,
				      int used_proto, int result,
				      const int *allowed_protos, int proto_cnt,
				      const int *allowed_modes, int mode_cnt)
{
	struct ceph_mds_session *s = con->private;
	struct ceph_mon_client *monc = &s->s_mdsc->fsc->client->monc;
	int ret;

	if (ceph_auth_handle_bad_authorizer(monc->auth, CEPH_ENTITY_TYPE_MDS,
					    used_proto, result,
					    allowed_protos, proto_cnt,
					    allowed_modes, mode_cnt)) {
		ret = ceph_monc_validate_auth(monc);
		if (ret)
			return ret;
	}

	return -EACCES;
}

static struct ceph_msg *mds_alloc_msg(struct ceph_connection *con,
				      struct ceph_msg_header *hdr, int *skip)
{
	struct ceph_msg *msg;
	int type = (int)le16_to_cpu(hdr->type);
	int front_len = (int)le32_to_cpu(hdr->front_len);

	if (con->in_msg)
		return con->in_msg;

	*skip = 0;
	msg = ceph_msg_new(type, front_len, GFP_NOFS, false);
	if (!msg) {
		pr_err("unable to allocate msg type %d len %d\n",
		       type, front_len);
		return NULL;
	}

	return msg;
}

static int mds_sign_message(struct ceph_msg *msg)
{
	struct ceph_mds_session *s = msg->con->private;
	struct ceph_auth_handshake *auth = &s->s_auth;

	return ceph_auth_sign_message(auth, msg);
}

static int mds_check_message_signature(struct ceph_msg *msg)
{
	struct ceph_mds_session *s = msg->con->private;
	struct ceph_auth_handshake *auth = &s->s_auth;

	return ceph_auth_check_message_signature(auth, msg);
}

static const struct ceph_connection_operations mds_con_ops = {
	.get = mds_get_con,
	.put = mds_put_con,
	.alloc_msg = mds_alloc_msg,
	.dispatch = mds_dispatch,
	.peer_reset = mds_peer_reset,
	.get_authorizer = mds_get_authorizer,
	.add_authorizer_challenge = mds_add_authorizer_challenge,
	.verify_authorizer_reply = mds_verify_authorizer_reply,
	.invalidate_authorizer = mds_invalidate_authorizer,
	.sign_message = mds_sign_message,
	.check_message_signature = mds_check_message_signature,
	.get_auth_request = mds_get_auth_request,
	.handle_auth_reply_more = mds_handle_auth_reply_more,
	.handle_auth_done = mds_handle_auth_done,
	.handle_auth_bad_method = mds_handle_auth_bad_method,
};

/* eof */