1 // SPDX-License-Identifier: GPL-2.0 2 #include <linux/ceph/ceph_debug.h> 3 4 #include <linux/fs.h> 5 #include <linux/wait.h> 6 #include <linux/slab.h> 7 #include <linux/gfp.h> 8 #include <linux/sched.h> 9 #include <linux/debugfs.h> 10 #include <linux/seq_file.h> 11 #include <linux/ratelimit.h> 12 #include <linux/bits.h> 13 #include <linux/ktime.h> 14 #include <linux/bitmap.h> 15 #include <linux/mnt_idmapping.h> 16 17 #include "super.h" 18 #include "mds_client.h" 19 #include "crypto.h" 20 21 #include <linux/ceph/ceph_features.h> 22 #include <linux/ceph/messenger.h> 23 #include <linux/ceph/decode.h> 24 #include <linux/ceph/pagelist.h> 25 #include <linux/ceph/auth.h> 26 #include <linux/ceph/debugfs.h> 27 #include <trace/events/ceph.h> 28 29 #define RECONNECT_MAX_SIZE (INT_MAX - PAGE_SIZE) 30 31 /* 32 * A cluster of MDS (metadata server) daemons is responsible for 33 * managing the file system namespace (the directory hierarchy and 34 * inodes) and for coordinating shared access to storage. Metadata is 35 * partitioning hierarchically across a number of servers, and that 36 * partition varies over time as the cluster adjusts the distribution 37 * in order to balance load. 38 * 39 * The MDS client is primarily responsible to managing synchronous 40 * metadata requests for operations like open, unlink, and so forth. 41 * If there is a MDS failure, we find out about it when we (possibly 42 * request and) receive a new MDS map, and can resubmit affected 43 * requests. 44 * 45 * For the most part, though, we take advantage of a lossless 46 * communications channel to the MDS, and do not need to worry about 47 * timing out or resubmitting requests. 48 * 49 * We maintain a stateful "session" with each MDS we interact with. 50 * Within each session, we sent periodic heartbeat messages to ensure 51 * any capabilities or leases we have been issues remain valid. If 52 * the session times out and goes stale, our leases and capabilities 53 * are no longer valid. 
54 */ 55 56 struct ceph_reconnect_state { 57 struct ceph_mds_session *session; 58 int nr_caps, nr_realms; 59 struct ceph_pagelist *pagelist; 60 unsigned msg_version; 61 bool allow_multi; 62 }; 63 64 static void __wake_requests(struct ceph_mds_client *mdsc, 65 struct list_head *head); 66 static void ceph_cap_release_work(struct work_struct *work); 67 static void ceph_cap_reclaim_work(struct work_struct *work); 68 69 static const struct ceph_connection_operations mds_con_ops; 70 71 72 /* 73 * mds reply parsing 74 */ 75 76 static int parse_reply_info_quota(void **p, void *end, 77 struct ceph_mds_reply_info_in *info) 78 { 79 u8 struct_v, struct_compat; 80 u32 struct_len; 81 82 ceph_decode_8_safe(p, end, struct_v, bad); 83 ceph_decode_8_safe(p, end, struct_compat, bad); 84 /* struct_v is expected to be >= 1. we only 85 * understand encoding with struct_compat == 1. */ 86 if (!struct_v || struct_compat != 1) 87 goto bad; 88 ceph_decode_32_safe(p, end, struct_len, bad); 89 ceph_decode_need(p, end, struct_len, bad); 90 end = *p + struct_len; 91 ceph_decode_64_safe(p, end, info->max_bytes, bad); 92 ceph_decode_64_safe(p, end, info->max_files, bad); 93 *p = end; 94 return 0; 95 bad: 96 return -EIO; 97 } 98 99 /* 100 * parse individual inode info 101 */ 102 static int parse_reply_info_in(void **p, void *end, 103 struct ceph_mds_reply_info_in *info, 104 u64 features) 105 { 106 int err = 0; 107 u8 struct_v = 0; 108 109 if (features == (u64)-1) { 110 u32 struct_len; 111 u8 struct_compat; 112 ceph_decode_8_safe(p, end, struct_v, bad); 113 ceph_decode_8_safe(p, end, struct_compat, bad); 114 /* struct_v is expected to be >= 1. we only understand 115 * encoding with struct_compat == 1. 
*/ 116 if (!struct_v || struct_compat != 1) 117 goto bad; 118 ceph_decode_32_safe(p, end, struct_len, bad); 119 ceph_decode_need(p, end, struct_len, bad); 120 end = *p + struct_len; 121 } 122 123 ceph_decode_need(p, end, sizeof(struct ceph_mds_reply_inode), bad); 124 info->in = *p; 125 *p += sizeof(struct ceph_mds_reply_inode) + 126 sizeof(*info->in->fragtree.splits) * 127 le32_to_cpu(info->in->fragtree.nsplits); 128 129 ceph_decode_32_safe(p, end, info->symlink_len, bad); 130 ceph_decode_need(p, end, info->symlink_len, bad); 131 info->symlink = *p; 132 *p += info->symlink_len; 133 134 ceph_decode_copy_safe(p, end, &info->dir_layout, 135 sizeof(info->dir_layout), bad); 136 ceph_decode_32_safe(p, end, info->xattr_len, bad); 137 ceph_decode_need(p, end, info->xattr_len, bad); 138 info->xattr_data = *p; 139 *p += info->xattr_len; 140 141 if (features == (u64)-1) { 142 /* inline data */ 143 ceph_decode_64_safe(p, end, info->inline_version, bad); 144 ceph_decode_32_safe(p, end, info->inline_len, bad); 145 ceph_decode_need(p, end, info->inline_len, bad); 146 info->inline_data = *p; 147 *p += info->inline_len; 148 /* quota */ 149 err = parse_reply_info_quota(p, end, info); 150 if (err < 0) 151 goto out_bad; 152 /* pool namespace */ 153 ceph_decode_32_safe(p, end, info->pool_ns_len, bad); 154 if (info->pool_ns_len > 0) { 155 ceph_decode_need(p, end, info->pool_ns_len, bad); 156 info->pool_ns_data = *p; 157 *p += info->pool_ns_len; 158 } 159 160 /* btime */ 161 ceph_decode_need(p, end, sizeof(info->btime), bad); 162 ceph_decode_copy(p, &info->btime, sizeof(info->btime)); 163 164 /* change attribute */ 165 ceph_decode_64_safe(p, end, info->change_attr, bad); 166 167 /* dir pin */ 168 if (struct_v >= 2) { 169 ceph_decode_32_safe(p, end, info->dir_pin, bad); 170 } else { 171 info->dir_pin = -ENODATA; 172 } 173 174 /* snapshot birth time, remains zero for v<=2 */ 175 if (struct_v >= 3) { 176 ceph_decode_need(p, end, sizeof(info->snap_btime), bad); 177 ceph_decode_copy(p, 
&info->snap_btime, 178 sizeof(info->snap_btime)); 179 } else { 180 memset(&info->snap_btime, 0, sizeof(info->snap_btime)); 181 } 182 183 /* snapshot count, remains zero for v<=3 */ 184 if (struct_v >= 4) { 185 ceph_decode_64_safe(p, end, info->rsnaps, bad); 186 } else { 187 info->rsnaps = 0; 188 } 189 190 if (struct_v >= 5) { 191 u32 alen; 192 193 ceph_decode_32_safe(p, end, alen, bad); 194 195 while (alen--) { 196 u32 len; 197 198 /* key */ 199 ceph_decode_32_safe(p, end, len, bad); 200 ceph_decode_skip_n(p, end, len, bad); 201 /* value */ 202 ceph_decode_32_safe(p, end, len, bad); 203 ceph_decode_skip_n(p, end, len, bad); 204 } 205 } 206 207 /* fscrypt flag -- ignore */ 208 if (struct_v >= 6) 209 ceph_decode_skip_8(p, end, bad); 210 211 info->fscrypt_auth = NULL; 212 info->fscrypt_auth_len = 0; 213 info->fscrypt_file = NULL; 214 info->fscrypt_file_len = 0; 215 if (struct_v >= 7) { 216 ceph_decode_32_safe(p, end, info->fscrypt_auth_len, bad); 217 if (info->fscrypt_auth_len) { 218 info->fscrypt_auth = kmalloc(info->fscrypt_auth_len, 219 GFP_KERNEL); 220 if (!info->fscrypt_auth) 221 return -ENOMEM; 222 ceph_decode_copy_safe(p, end, info->fscrypt_auth, 223 info->fscrypt_auth_len, bad); 224 } 225 ceph_decode_32_safe(p, end, info->fscrypt_file_len, bad); 226 if (info->fscrypt_file_len) { 227 info->fscrypt_file = kmalloc(info->fscrypt_file_len, 228 GFP_KERNEL); 229 if (!info->fscrypt_file) 230 return -ENOMEM; 231 ceph_decode_copy_safe(p, end, info->fscrypt_file, 232 info->fscrypt_file_len, bad); 233 } 234 } 235 *p = end; 236 } else { 237 /* legacy (unversioned) struct */ 238 if (features & CEPH_FEATURE_MDS_INLINE_DATA) { 239 ceph_decode_64_safe(p, end, info->inline_version, bad); 240 ceph_decode_32_safe(p, end, info->inline_len, bad); 241 ceph_decode_need(p, end, info->inline_len, bad); 242 info->inline_data = *p; 243 *p += info->inline_len; 244 } else 245 info->inline_version = CEPH_INLINE_NONE; 246 247 if (features & CEPH_FEATURE_MDS_QUOTA) { 248 err = 
parse_reply_info_quota(p, end, info); 249 if (err < 0) 250 goto out_bad; 251 } else { 252 info->max_bytes = 0; 253 info->max_files = 0; 254 } 255 256 info->pool_ns_len = 0; 257 info->pool_ns_data = NULL; 258 if (features & CEPH_FEATURE_FS_FILE_LAYOUT_V2) { 259 ceph_decode_32_safe(p, end, info->pool_ns_len, bad); 260 if (info->pool_ns_len > 0) { 261 ceph_decode_need(p, end, info->pool_ns_len, bad); 262 info->pool_ns_data = *p; 263 *p += info->pool_ns_len; 264 } 265 } 266 267 if (features & CEPH_FEATURE_FS_BTIME) { 268 ceph_decode_need(p, end, sizeof(info->btime), bad); 269 ceph_decode_copy(p, &info->btime, sizeof(info->btime)); 270 ceph_decode_64_safe(p, end, info->change_attr, bad); 271 } 272 273 info->dir_pin = -ENODATA; 274 /* info->snap_btime and info->rsnaps remain zero */ 275 } 276 return 0; 277 bad: 278 err = -EIO; 279 out_bad: 280 return err; 281 } 282 283 static int parse_reply_info_dir(void **p, void *end, 284 struct ceph_mds_reply_dirfrag **dirfrag, 285 u64 features) 286 { 287 if (features == (u64)-1) { 288 u8 struct_v, struct_compat; 289 u32 struct_len; 290 ceph_decode_8_safe(p, end, struct_v, bad); 291 ceph_decode_8_safe(p, end, struct_compat, bad); 292 /* struct_v is expected to be >= 1. we only understand 293 * encoding whose struct_compat == 1. 
*/ 294 if (!struct_v || struct_compat != 1) 295 goto bad; 296 ceph_decode_32_safe(p, end, struct_len, bad); 297 ceph_decode_need(p, end, struct_len, bad); 298 end = *p + struct_len; 299 } 300 301 ceph_decode_need(p, end, sizeof(**dirfrag), bad); 302 *dirfrag = *p; 303 *p += sizeof(**dirfrag) + sizeof(u32) * le32_to_cpu((*dirfrag)->ndist); 304 if (unlikely(*p > end)) 305 goto bad; 306 if (features == (u64)-1) 307 *p = end; 308 return 0; 309 bad: 310 return -EIO; 311 } 312 313 static int parse_reply_info_lease(void **p, void *end, 314 struct ceph_mds_reply_lease **lease, 315 u64 features, u32 *altname_len, u8 **altname) 316 { 317 u8 struct_v; 318 u32 struct_len; 319 void *lend; 320 321 if (features == (u64)-1) { 322 u8 struct_compat; 323 324 ceph_decode_8_safe(p, end, struct_v, bad); 325 ceph_decode_8_safe(p, end, struct_compat, bad); 326 327 /* struct_v is expected to be >= 1. we only understand 328 * encoding whose struct_compat == 1. */ 329 if (!struct_v || struct_compat != 1) 330 goto bad; 331 332 ceph_decode_32_safe(p, end, struct_len, bad); 333 } else { 334 struct_len = sizeof(**lease); 335 *altname_len = 0; 336 *altname = NULL; 337 } 338 339 lend = *p + struct_len; 340 ceph_decode_need(p, end, struct_len, bad); 341 *lease = *p; 342 *p += sizeof(**lease); 343 344 if (features == (u64)-1) { 345 if (struct_v >= 2) { 346 ceph_decode_32_safe(p, end, *altname_len, bad); 347 ceph_decode_need(p, end, *altname_len, bad); 348 *altname = *p; 349 *p += *altname_len; 350 } else { 351 *altname = NULL; 352 *altname_len = 0; 353 } 354 } 355 *p = lend; 356 return 0; 357 bad: 358 return -EIO; 359 } 360 361 /* 362 * parse a normal reply, which may contain a (dir+)dentry and/or a 363 * target inode. 
364 */ 365 static int parse_reply_info_trace(void **p, void *end, 366 struct ceph_mds_reply_info_parsed *info, 367 u64 features) 368 { 369 int err; 370 371 if (info->head->is_dentry) { 372 err = parse_reply_info_in(p, end, &info->diri, features); 373 if (err < 0) 374 goto out_bad; 375 376 err = parse_reply_info_dir(p, end, &info->dirfrag, features); 377 if (err < 0) 378 goto out_bad; 379 380 ceph_decode_32_safe(p, end, info->dname_len, bad); 381 ceph_decode_need(p, end, info->dname_len, bad); 382 info->dname = *p; 383 *p += info->dname_len; 384 385 err = parse_reply_info_lease(p, end, &info->dlease, features, 386 &info->altname_len, &info->altname); 387 if (err < 0) 388 goto out_bad; 389 } 390 391 if (info->head->is_target) { 392 err = parse_reply_info_in(p, end, &info->targeti, features); 393 if (err < 0) 394 goto out_bad; 395 } 396 397 if (unlikely(*p != end)) 398 goto bad; 399 return 0; 400 401 bad: 402 err = -EIO; 403 out_bad: 404 pr_err("problem parsing mds trace %d\n", err); 405 return err; 406 } 407 408 /* 409 * parse readdir results 410 */ 411 static int parse_reply_info_readdir(void **p, void *end, 412 struct ceph_mds_request *req, 413 u64 features) 414 { 415 struct ceph_mds_reply_info_parsed *info = &req->r_reply_info; 416 struct ceph_client *cl = req->r_mdsc->fsc->client; 417 u32 num, i = 0; 418 int err; 419 420 err = parse_reply_info_dir(p, end, &info->dir_dir, features); 421 if (err < 0) 422 goto out_bad; 423 424 ceph_decode_need(p, end, sizeof(num) + 2, bad); 425 num = ceph_decode_32(p); 426 { 427 u16 flags = ceph_decode_16(p); 428 info->dir_end = !!(flags & CEPH_READDIR_FRAG_END); 429 info->dir_complete = !!(flags & CEPH_READDIR_FRAG_COMPLETE); 430 info->hash_order = !!(flags & CEPH_READDIR_HASH_ORDER); 431 info->offset_hash = !!(flags & CEPH_READDIR_OFFSET_HASH); 432 } 433 if (num == 0) 434 goto done; 435 436 BUG_ON(!info->dir_entries); 437 if ((unsigned long)(info->dir_entries + num) > 438 (unsigned long)info->dir_entries + info->dir_buf_size) { 
439 pr_err_client(cl, "dir contents are larger than expected\n"); 440 WARN_ON(1); 441 goto bad; 442 } 443 444 info->dir_nr = num; 445 while (num) { 446 struct inode *inode = d_inode(req->r_dentry); 447 struct ceph_inode_info *ci = ceph_inode(inode); 448 struct ceph_mds_reply_dir_entry *rde = info->dir_entries + i; 449 struct fscrypt_str tname = FSTR_INIT(NULL, 0); 450 struct fscrypt_str oname = FSTR_INIT(NULL, 0); 451 struct ceph_fname fname; 452 u32 altname_len, _name_len; 453 u8 *altname, *_name; 454 455 /* dentry */ 456 ceph_decode_32_safe(p, end, _name_len, bad); 457 ceph_decode_need(p, end, _name_len, bad); 458 _name = *p; 459 *p += _name_len; 460 doutc(cl, "parsed dir dname '%.*s'\n", _name_len, _name); 461 462 if (info->hash_order) 463 rde->raw_hash = ceph_str_hash(ci->i_dir_layout.dl_dir_hash, 464 _name, _name_len); 465 466 /* dentry lease */ 467 err = parse_reply_info_lease(p, end, &rde->lease, features, 468 &altname_len, &altname); 469 if (err) 470 goto out_bad; 471 472 /* 473 * Try to dencrypt the dentry names and update them 474 * in the ceph_mds_reply_dir_entry struct. 475 */ 476 fname.dir = inode; 477 fname.name = _name; 478 fname.name_len = _name_len; 479 fname.ctext = altname; 480 fname.ctext_len = altname_len; 481 /* 482 * The _name_len maybe larger than altname_len, such as 483 * when the human readable name length is in range of 484 * (CEPH_NOHASH_NAME_MAX, CEPH_NOHASH_NAME_MAX + SHA256_DIGEST_SIZE), 485 * then the copy in ceph_fname_to_usr will corrupt the 486 * data if there has no encryption key. 487 * 488 * Just set the no_copy flag and then if there has no 489 * encryption key the oname.name will be assigned to 490 * _name always. 491 */ 492 fname.no_copy = true; 493 if (altname_len == 0) { 494 /* 495 * Set tname to _name, and this will be used 496 * to do the base64_decode in-place. It's 497 * safe because the decoded string should 498 * always be shorter, which is 3/4 of origin 499 * string. 
500 */ 501 tname.name = _name; 502 503 /* 504 * Set oname to _name too, and this will be 505 * used to do the dencryption in-place. 506 */ 507 oname.name = _name; 508 oname.len = _name_len; 509 } else { 510 /* 511 * This will do the decryption only in-place 512 * from altname cryptext directly. 513 */ 514 oname.name = altname; 515 oname.len = altname_len; 516 } 517 rde->is_nokey = false; 518 err = ceph_fname_to_usr(&fname, &tname, &oname, &rde->is_nokey); 519 if (err) { 520 pr_err_client(cl, "unable to decode %.*s, got %d\n", 521 _name_len, _name, err); 522 goto out_bad; 523 } 524 rde->name = oname.name; 525 rde->name_len = oname.len; 526 527 /* inode */ 528 err = parse_reply_info_in(p, end, &rde->inode, features); 529 if (err < 0) 530 goto out_bad; 531 /* ceph_readdir_prepopulate() will update it */ 532 rde->offset = 0; 533 i++; 534 num--; 535 } 536 537 done: 538 /* Skip over any unrecognized fields */ 539 *p = end; 540 return 0; 541 542 bad: 543 err = -EIO; 544 out_bad: 545 pr_err_client(cl, "problem parsing dir contents %d\n", err); 546 return err; 547 } 548 549 /* 550 * parse fcntl F_GETLK results 551 */ 552 static int parse_reply_info_filelock(void **p, void *end, 553 struct ceph_mds_reply_info_parsed *info, 554 u64 features) 555 { 556 if (*p + sizeof(*info->filelock_reply) > end) 557 goto bad; 558 559 info->filelock_reply = *p; 560 561 /* Skip over any unrecognized fields */ 562 *p = end; 563 return 0; 564 bad: 565 return -EIO; 566 } 567 568 569 #if BITS_PER_LONG == 64 570 571 #define DELEGATED_INO_AVAILABLE xa_mk_value(1) 572 573 static int ceph_parse_deleg_inos(void **p, void *end, 574 struct ceph_mds_session *s) 575 { 576 struct ceph_client *cl = s->s_mdsc->fsc->client; 577 u32 sets; 578 579 ceph_decode_32_safe(p, end, sets, bad); 580 doutc(cl, "got %u sets of delegated inodes\n", sets); 581 while (sets--) { 582 u64 start, len; 583 584 ceph_decode_64_safe(p, end, start, bad); 585 ceph_decode_64_safe(p, end, len, bad); 586 587 /* Don't accept a delegation 
of system inodes */ 588 if (start < CEPH_INO_SYSTEM_BASE) { 589 pr_warn_ratelimited_client(cl, 590 "ignoring reserved inode range delegation (start=0x%llx len=0x%llx)\n", 591 start, len); 592 continue; 593 } 594 while (len--) { 595 int err = xa_insert(&s->s_delegated_inos, start++, 596 DELEGATED_INO_AVAILABLE, 597 GFP_KERNEL); 598 if (!err) { 599 doutc(cl, "added delegated inode 0x%llx\n", start - 1); 600 } else if (err == -EBUSY) { 601 pr_warn_client(cl, 602 "MDS delegated inode 0x%llx more than once.\n", 603 start - 1); 604 } else { 605 return err; 606 } 607 } 608 } 609 return 0; 610 bad: 611 return -EIO; 612 } 613 614 u64 ceph_get_deleg_ino(struct ceph_mds_session *s) 615 { 616 unsigned long ino; 617 void *val; 618 619 xa_for_each(&s->s_delegated_inos, ino, val) { 620 val = xa_erase(&s->s_delegated_inos, ino); 621 if (val == DELEGATED_INO_AVAILABLE) 622 return ino; 623 } 624 return 0; 625 } 626 627 int ceph_restore_deleg_ino(struct ceph_mds_session *s, u64 ino) 628 { 629 return xa_insert(&s->s_delegated_inos, ino, DELEGATED_INO_AVAILABLE, 630 GFP_KERNEL); 631 } 632 #else /* BITS_PER_LONG == 64 */ 633 /* 634 * FIXME: xarrays can't handle 64-bit indexes on a 32-bit arch. For now, just 635 * ignore delegated_inos on 32 bit arch. Maybe eventually add xarrays for top 636 * and bottom words? 
637 */ 638 static int ceph_parse_deleg_inos(void **p, void *end, 639 struct ceph_mds_session *s) 640 { 641 u32 sets; 642 643 ceph_decode_32_safe(p, end, sets, bad); 644 if (sets) 645 ceph_decode_skip_n(p, end, sets * 2 * sizeof(__le64), bad); 646 return 0; 647 bad: 648 return -EIO; 649 } 650 651 u64 ceph_get_deleg_ino(struct ceph_mds_session *s) 652 { 653 return 0; 654 } 655 656 int ceph_restore_deleg_ino(struct ceph_mds_session *s, u64 ino) 657 { 658 return 0; 659 } 660 #endif /* BITS_PER_LONG == 64 */ 661 662 /* 663 * parse create results 664 */ 665 static int parse_reply_info_create(void **p, void *end, 666 struct ceph_mds_reply_info_parsed *info, 667 u64 features, struct ceph_mds_session *s) 668 { 669 int ret; 670 671 if (features == (u64)-1 || 672 (features & CEPH_FEATURE_REPLY_CREATE_INODE)) { 673 if (*p == end) { 674 /* Malformed reply? */ 675 info->has_create_ino = false; 676 } else if (test_bit(CEPHFS_FEATURE_DELEG_INO, &s->s_features)) { 677 info->has_create_ino = true; 678 /* struct_v, struct_compat, and len */ 679 ceph_decode_skip_n(p, end, 2 + sizeof(u32), bad); 680 ceph_decode_64_safe(p, end, info->ino, bad); 681 ret = ceph_parse_deleg_inos(p, end, s); 682 if (ret) 683 return ret; 684 } else { 685 /* legacy */ 686 ceph_decode_64_safe(p, end, info->ino, bad); 687 info->has_create_ino = true; 688 } 689 } else { 690 if (*p != end) 691 goto bad; 692 } 693 694 /* Skip over any unrecognized fields */ 695 *p = end; 696 return 0; 697 bad: 698 return -EIO; 699 } 700 701 static int parse_reply_info_getvxattr(void **p, void *end, 702 struct ceph_mds_reply_info_parsed *info, 703 u64 features) 704 { 705 u32 value_len; 706 707 ceph_decode_skip_8(p, end, bad); /* skip current version: 1 */ 708 ceph_decode_skip_8(p, end, bad); /* skip first version: 1 */ 709 ceph_decode_skip_32(p, end, bad); /* skip payload length */ 710 711 ceph_decode_32_safe(p, end, value_len, bad); 712 713 if (value_len == end - *p) { 714 info->xattr_info.xattr_value = *p; 715 
info->xattr_info.xattr_value_len = value_len; 716 *p = end; 717 return value_len; 718 } 719 bad: 720 return -EIO; 721 } 722 723 /* 724 * parse extra results 725 */ 726 static int parse_reply_info_extra(void **p, void *end, 727 struct ceph_mds_request *req, 728 u64 features, struct ceph_mds_session *s) 729 { 730 struct ceph_mds_reply_info_parsed *info = &req->r_reply_info; 731 u32 op = le32_to_cpu(info->head->op); 732 733 if (op == CEPH_MDS_OP_GETFILELOCK) 734 return parse_reply_info_filelock(p, end, info, features); 735 else if (op == CEPH_MDS_OP_READDIR || op == CEPH_MDS_OP_LSSNAP) 736 return parse_reply_info_readdir(p, end, req, features); 737 else if (op == CEPH_MDS_OP_CREATE) 738 return parse_reply_info_create(p, end, info, features, s); 739 else if (op == CEPH_MDS_OP_GETVXATTR) 740 return parse_reply_info_getvxattr(p, end, info, features); 741 else 742 return -EIO; 743 } 744 745 /* 746 * parse entire mds reply 747 */ 748 static int parse_reply_info(struct ceph_mds_session *s, struct ceph_msg *msg, 749 struct ceph_mds_request *req, u64 features) 750 { 751 struct ceph_mds_reply_info_parsed *info = &req->r_reply_info; 752 struct ceph_client *cl = s->s_mdsc->fsc->client; 753 void *p, *end; 754 u32 len; 755 int err; 756 757 info->head = msg->front.iov_base; 758 p = msg->front.iov_base + sizeof(struct ceph_mds_reply_head); 759 end = p + msg->front.iov_len - sizeof(struct ceph_mds_reply_head); 760 761 /* trace */ 762 ceph_decode_32_safe(&p, end, len, bad); 763 if (len > 0) { 764 ceph_decode_need(&p, end, len, bad); 765 err = parse_reply_info_trace(&p, p+len, info, features); 766 if (err < 0) 767 goto out_bad; 768 } 769 770 /* extra */ 771 ceph_decode_32_safe(&p, end, len, bad); 772 if (len > 0) { 773 ceph_decode_need(&p, end, len, bad); 774 err = parse_reply_info_extra(&p, p+len, req, features, s); 775 if (err < 0) 776 goto out_bad; 777 } 778 779 /* snap blob */ 780 ceph_decode_32_safe(&p, end, len, bad); 781 info->snapblob_len = len; 782 info->snapblob = p; 783 p += 
len; 784 785 if (p != end) 786 goto bad; 787 return 0; 788 789 bad: 790 err = -EIO; 791 out_bad: 792 pr_err_client(cl, "mds parse_reply err %d\n", err); 793 ceph_msg_dump(msg); 794 return err; 795 } 796 797 static void destroy_reply_info(struct ceph_mds_reply_info_parsed *info) 798 { 799 int i; 800 801 kfree(info->diri.fscrypt_auth); 802 kfree(info->diri.fscrypt_file); 803 kfree(info->targeti.fscrypt_auth); 804 kfree(info->targeti.fscrypt_file); 805 if (!info->dir_entries) 806 return; 807 808 for (i = 0; i < info->dir_nr; i++) { 809 struct ceph_mds_reply_dir_entry *rde = info->dir_entries + i; 810 811 kfree(rde->inode.fscrypt_auth); 812 kfree(rde->inode.fscrypt_file); 813 } 814 free_pages((unsigned long)info->dir_entries, get_order(info->dir_buf_size)); 815 } 816 817 /* 818 * In async unlink case the kclient won't wait for the first reply 819 * from MDS and just drop all the links and unhash the dentry and then 820 * succeeds immediately. 821 * 822 * For any new create/link/rename,etc requests followed by using the 823 * same file names we must wait for the first reply of the inflight 824 * unlink request, or the MDS possibly will fail these following 825 * requests with -EEXIST if the inflight async unlink request was 826 * delayed for some reasons. 827 * 828 * And the worst case is that for the none async openc request it will 829 * successfully open the file if the CDentry hasn't been unlinked yet, 830 * but later the previous delayed async unlink request will remove the 831 * CDentry. That means the just created file is possibly deleted later 832 * by accident. 833 * 834 * We need to wait for the inflight async unlink requests to finish 835 * when creating new files/directories by using the same file names. 
836 */ 837 int ceph_wait_on_conflict_unlink(struct dentry *dentry) 838 { 839 struct ceph_fs_client *fsc = ceph_sb_to_fs_client(dentry->d_sb); 840 struct ceph_client *cl = fsc->client; 841 struct dentry *pdentry = dentry->d_parent; 842 struct dentry *udentry, *found = NULL; 843 struct ceph_dentry_info *di; 844 struct qstr dname; 845 u32 hash = dentry->d_name.hash; 846 int err; 847 848 dname.name = dentry->d_name.name; 849 dname.len = dentry->d_name.len; 850 851 rcu_read_lock(); 852 hash_for_each_possible_rcu(fsc->async_unlink_conflict, di, 853 hnode, hash) { 854 udentry = di->dentry; 855 856 spin_lock(&udentry->d_lock); 857 if (udentry->d_name.hash != hash) 858 goto next; 859 if (unlikely(udentry->d_parent != pdentry)) 860 goto next; 861 if (!hash_hashed(&di->hnode)) 862 goto next; 863 864 if (!test_bit(CEPH_DENTRY_ASYNC_UNLINK_BIT, &di->flags)) 865 pr_warn_client(cl, "dentry %p:%pd async unlink bit is not set\n", 866 dentry, dentry); 867 868 if (!d_same_name(udentry, pdentry, &dname)) 869 goto next; 870 871 found = dget_dlock(udentry); 872 spin_unlock(&udentry->d_lock); 873 break; 874 next: 875 spin_unlock(&udentry->d_lock); 876 } 877 rcu_read_unlock(); 878 879 if (likely(!found)) 880 return 0; 881 882 doutc(cl, "dentry %p:%pd conflict with old %p:%pd\n", dentry, dentry, 883 found, found); 884 885 err = wait_on_bit(&di->flags, CEPH_DENTRY_ASYNC_UNLINK_BIT, 886 TASK_KILLABLE); 887 dput(found); 888 return err; 889 } 890 891 892 /* 893 * sessions 894 */ 895 const char *ceph_session_state_name(int s) 896 { 897 switch (s) { 898 case CEPH_MDS_SESSION_NEW: return "new"; 899 case CEPH_MDS_SESSION_OPENING: return "opening"; 900 case CEPH_MDS_SESSION_OPEN: return "open"; 901 case CEPH_MDS_SESSION_HUNG: return "hung"; 902 case CEPH_MDS_SESSION_CLOSING: return "closing"; 903 case CEPH_MDS_SESSION_CLOSED: return "closed"; 904 case CEPH_MDS_SESSION_RESTARTING: return "restarting"; 905 case CEPH_MDS_SESSION_RECONNECTING: return "reconnecting"; 906 case CEPH_MDS_SESSION_REJECTED: 
return "rejected"; 907 default: return "???"; 908 } 909 } 910 911 struct ceph_mds_session *ceph_get_mds_session(struct ceph_mds_session *s) 912 { 913 if (refcount_inc_not_zero(&s->s_ref)) 914 return s; 915 return NULL; 916 } 917 918 void ceph_put_mds_session(struct ceph_mds_session *s) 919 { 920 if (IS_ERR_OR_NULL(s)) 921 return; 922 923 if (refcount_dec_and_test(&s->s_ref)) { 924 if (s->s_auth.authorizer) 925 ceph_auth_destroy_authorizer(s->s_auth.authorizer); 926 WARN_ON(mutex_is_locked(&s->s_mutex)); 927 xa_destroy(&s->s_delegated_inos); 928 kfree(s); 929 } 930 } 931 932 /* 933 * called under mdsc->mutex 934 */ 935 struct ceph_mds_session *__ceph_lookup_mds_session(struct ceph_mds_client *mdsc, 936 int mds) 937 { 938 if (mds >= mdsc->max_sessions || !mdsc->sessions[mds]) 939 return NULL; 940 return ceph_get_mds_session(mdsc->sessions[mds]); 941 } 942 943 static bool __have_session(struct ceph_mds_client *mdsc, int mds) 944 { 945 if (mds >= mdsc->max_sessions || !mdsc->sessions[mds]) 946 return false; 947 else 948 return true; 949 } 950 951 static int __verify_registered_session(struct ceph_mds_client *mdsc, 952 struct ceph_mds_session *s) 953 { 954 if (s->s_mds >= mdsc->max_sessions || 955 mdsc->sessions[s->s_mds] != s) 956 return -ENOENT; 957 return 0; 958 } 959 960 /* 961 * create+register a new session for given mds. 962 * called under mdsc->mutex. 
963 */ 964 static struct ceph_mds_session *register_session(struct ceph_mds_client *mdsc, 965 int mds) 966 { 967 struct ceph_client *cl = mdsc->fsc->client; 968 struct ceph_mds_session *s; 969 970 if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_FENCE_IO) 971 return ERR_PTR(-EIO); 972 973 if (mds >= mdsc->mdsmap->possible_max_rank) 974 return ERR_PTR(-EINVAL); 975 976 s = kzalloc_obj(*s, GFP_NOFS); 977 if (!s) 978 return ERR_PTR(-ENOMEM); 979 980 if (mds >= mdsc->max_sessions) { 981 int newmax = 1 << get_count_order(mds + 1); 982 struct ceph_mds_session **sa; 983 size_t ptr_size = sizeof(struct ceph_mds_session *); 984 985 doutc(cl, "realloc to %d\n", newmax); 986 sa = kcalloc(newmax, ptr_size, GFP_NOFS); 987 if (!sa) 988 goto fail_realloc; 989 if (mdsc->sessions) { 990 memcpy(sa, mdsc->sessions, 991 mdsc->max_sessions * ptr_size); 992 kfree(mdsc->sessions); 993 } 994 mdsc->sessions = sa; 995 mdsc->max_sessions = newmax; 996 } 997 998 doutc(cl, "mds%d\n", mds); 999 s->s_mdsc = mdsc; 1000 s->s_mds = mds; 1001 s->s_state = CEPH_MDS_SESSION_NEW; 1002 mutex_init(&s->s_mutex); 1003 1004 ceph_con_init(&s->s_con, s, &mds_con_ops, &mdsc->fsc->client->msgr); 1005 1006 atomic_set(&s->s_cap_gen, 1); 1007 s->s_cap_ttl = jiffies - 1; 1008 1009 spin_lock_init(&s->s_cap_lock); 1010 INIT_LIST_HEAD(&s->s_caps); 1011 refcount_set(&s->s_ref, 1); 1012 INIT_LIST_HEAD(&s->s_waiting); 1013 INIT_LIST_HEAD(&s->s_unsafe); 1014 xa_init(&s->s_delegated_inos); 1015 INIT_LIST_HEAD(&s->s_cap_releases); 1016 INIT_WORK(&s->s_cap_release_work, ceph_cap_release_work); 1017 1018 INIT_LIST_HEAD(&s->s_cap_dirty); 1019 INIT_LIST_HEAD(&s->s_cap_flushing); 1020 1021 mdsc->sessions[mds] = s; 1022 atomic_inc(&mdsc->num_sessions); 1023 refcount_inc(&s->s_ref); /* one ref to sessions[], one to caller */ 1024 1025 ceph_con_open(&s->s_con, CEPH_ENTITY_TYPE_MDS, mds, 1026 ceph_mdsmap_get_addr(mdsc->mdsmap, mds)); 1027 1028 return s; 1029 1030 fail_realloc: 1031 kfree(s); 1032 return ERR_PTR(-ENOMEM); 1033 } 
1034 1035 /* 1036 * called under mdsc->mutex 1037 */ 1038 static void __unregister_session(struct ceph_mds_client *mdsc, 1039 struct ceph_mds_session *s) 1040 { 1041 doutc(mdsc->fsc->client, "mds%d %p\n", s->s_mds, s); 1042 BUG_ON(mdsc->sessions[s->s_mds] != s); 1043 mdsc->sessions[s->s_mds] = NULL; 1044 ceph_con_close(&s->s_con); 1045 ceph_put_mds_session(s); 1046 atomic_dec(&mdsc->num_sessions); 1047 } 1048 1049 /* 1050 * drop session refs in request. 1051 * 1052 * should be last request ref, or hold mdsc->mutex 1053 */ 1054 static void put_request_session(struct ceph_mds_request *req) 1055 { 1056 if (req->r_session) { 1057 ceph_put_mds_session(req->r_session); 1058 req->r_session = NULL; 1059 } 1060 } 1061 1062 void ceph_mdsc_iterate_sessions(struct ceph_mds_client *mdsc, 1063 void (*cb)(struct ceph_mds_session *), 1064 bool check_state) 1065 { 1066 int mds; 1067 1068 mutex_lock(&mdsc->mutex); 1069 for (mds = 0; mds < mdsc->max_sessions; ++mds) { 1070 struct ceph_mds_session *s; 1071 1072 s = __ceph_lookup_mds_session(mdsc, mds); 1073 if (!s) 1074 continue; 1075 1076 if (check_state && !check_session_state(s)) { 1077 ceph_put_mds_session(s); 1078 continue; 1079 } 1080 1081 mutex_unlock(&mdsc->mutex); 1082 cb(s); 1083 ceph_put_mds_session(s); 1084 mutex_lock(&mdsc->mutex); 1085 } 1086 mutex_unlock(&mdsc->mutex); 1087 } 1088 1089 void ceph_mdsc_release_request(struct kref *kref) 1090 { 1091 struct ceph_mds_request *req = container_of(kref, 1092 struct ceph_mds_request, 1093 r_kref); 1094 ceph_mdsc_release_dir_caps_async(req); 1095 destroy_reply_info(&req->r_reply_info); 1096 if (req->r_request) 1097 ceph_msg_put(req->r_request); 1098 if (req->r_reply) 1099 ceph_msg_put(req->r_reply); 1100 if (req->r_inode) { 1101 ceph_put_cap_refs(ceph_inode(req->r_inode), CEPH_CAP_PIN); 1102 iput(req->r_inode); 1103 } 1104 if (req->r_parent) { 1105 ceph_put_cap_refs(ceph_inode(req->r_parent), CEPH_CAP_PIN); 1106 iput(req->r_parent); 1107 } 1108 iput(req->r_target_inode); 1109 
iput(req->r_new_inode); 1110 if (req->r_dentry) 1111 dput(req->r_dentry); 1112 if (req->r_old_dentry) 1113 dput(req->r_old_dentry); 1114 if (req->r_old_dentry_dir) { 1115 /* 1116 * track (and drop pins for) r_old_dentry_dir 1117 * separately, since r_old_dentry's d_parent may have 1118 * changed between the dir mutex being dropped and 1119 * this request being freed. 1120 */ 1121 ceph_put_cap_refs(ceph_inode(req->r_old_dentry_dir), 1122 CEPH_CAP_PIN); 1123 iput(req->r_old_dentry_dir); 1124 } 1125 kfree(req->r_path1); 1126 kfree(req->r_path2); 1127 put_cred(req->r_cred); 1128 if (req->r_mnt_idmap) 1129 mnt_idmap_put(req->r_mnt_idmap); 1130 if (req->r_pagelist) 1131 ceph_pagelist_release(req->r_pagelist); 1132 kfree(req->r_fscrypt_auth); 1133 kfree(req->r_altname); 1134 put_request_session(req); 1135 ceph_unreserve_caps(req->r_mdsc, &req->r_caps_reservation); 1136 WARN_ON_ONCE(!list_empty(&req->r_wait)); 1137 kmem_cache_free(ceph_mds_request_cachep, req); 1138 } 1139 1140 DEFINE_RB_FUNCS(request, struct ceph_mds_request, r_tid, r_node) 1141 1142 /* 1143 * lookup session, bump ref if found. 1144 * 1145 * called under mdsc->mutex. 1146 */ 1147 static struct ceph_mds_request * 1148 lookup_get_request(struct ceph_mds_client *mdsc, u64 tid) 1149 { 1150 struct ceph_mds_request *req; 1151 1152 req = lookup_request(&mdsc->request_tree, tid); 1153 if (req) 1154 ceph_mdsc_get_request(req); 1155 1156 return req; 1157 } 1158 1159 /* 1160 * Register an in-flight request, and assign a tid. Link to directory 1161 * are modifying (if any). 1162 * 1163 * Called under mdsc->mutex. 
1164 */ 1165 static void __register_request(struct ceph_mds_client *mdsc, 1166 struct ceph_mds_request *req, 1167 struct inode *dir) 1168 { 1169 struct ceph_client *cl = mdsc->fsc->client; 1170 int ret = 0; 1171 1172 req->r_tid = ++mdsc->last_tid; 1173 if (req->r_num_caps) { 1174 ret = ceph_reserve_caps(mdsc, &req->r_caps_reservation, 1175 req->r_num_caps); 1176 if (ret < 0) { 1177 pr_err_client(cl, "%p failed to reserve caps: %d\n", 1178 req, ret); 1179 /* set req->r_err to fail early from __do_request */ 1180 req->r_err = ret; 1181 return; 1182 } 1183 } 1184 doutc(cl, "%p tid %lld\n", req, req->r_tid); 1185 ceph_mdsc_get_request(req); 1186 insert_request(&mdsc->request_tree, req); 1187 1188 req->r_cred = get_current_cred(); 1189 if (!req->r_mnt_idmap) 1190 req->r_mnt_idmap = &nop_mnt_idmap; 1191 1192 if (mdsc->oldest_tid == 0 && req->r_op != CEPH_MDS_OP_SETFILELOCK) 1193 mdsc->oldest_tid = req->r_tid; 1194 1195 if (dir) { 1196 struct ceph_inode_info *ci = ceph_inode(dir); 1197 1198 ihold(dir); 1199 req->r_unsafe_dir = dir; 1200 spin_lock(&ci->i_unsafe_lock); 1201 list_add_tail(&req->r_unsafe_dir_item, &ci->i_unsafe_dirops); 1202 spin_unlock(&ci->i_unsafe_lock); 1203 } 1204 } 1205 1206 static void __unregister_request(struct ceph_mds_client *mdsc, 1207 struct ceph_mds_request *req) 1208 { 1209 doutc(mdsc->fsc->client, "%p tid %lld\n", req, req->r_tid); 1210 1211 /* Never leave an unregistered request on an unsafe list! 
*/ 1212 list_del_init(&req->r_unsafe_item); 1213 1214 if (req->r_tid == mdsc->oldest_tid) { 1215 struct rb_node *p = rb_next(&req->r_node); 1216 mdsc->oldest_tid = 0; 1217 while (p) { 1218 struct ceph_mds_request *next_req = 1219 rb_entry(p, struct ceph_mds_request, r_node); 1220 if (next_req->r_op != CEPH_MDS_OP_SETFILELOCK) { 1221 mdsc->oldest_tid = next_req->r_tid; 1222 break; 1223 } 1224 p = rb_next(p); 1225 } 1226 } 1227 1228 erase_request(&mdsc->request_tree, req); 1229 1230 if (req->r_unsafe_dir) { 1231 struct ceph_inode_info *ci = ceph_inode(req->r_unsafe_dir); 1232 spin_lock(&ci->i_unsafe_lock); 1233 list_del_init(&req->r_unsafe_dir_item); 1234 spin_unlock(&ci->i_unsafe_lock); 1235 } 1236 if (req->r_target_inode && 1237 test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) { 1238 struct ceph_inode_info *ci = ceph_inode(req->r_target_inode); 1239 spin_lock(&ci->i_unsafe_lock); 1240 list_del_init(&req->r_unsafe_target_item); 1241 spin_unlock(&ci->i_unsafe_lock); 1242 } 1243 1244 if (req->r_unsafe_dir) { 1245 iput(req->r_unsafe_dir); 1246 req->r_unsafe_dir = NULL; 1247 } 1248 1249 complete_all(&req->r_safe_completion); 1250 1251 ceph_mdsc_put_request(req); 1252 } 1253 1254 /* 1255 * Walk back up the dentry tree until we hit a dentry representing a 1256 * non-snapshot inode. We do this using the rcu_read_lock (which must be held 1257 * when calling this) to ensure that the objects won't disappear while we're 1258 * working with them. Once we hit a candidate dentry, we attempt to take a 1259 * reference to it, and return that as the result. 1260 */ 1261 static struct inode *get_nonsnap_parent(struct dentry *dentry) 1262 { 1263 struct inode *inode = NULL; 1264 1265 while (dentry && !IS_ROOT(dentry)) { 1266 inode = d_inode_rcu(dentry); 1267 if (!inode || ceph_snap(inode) == CEPH_NOSNAP) 1268 break; 1269 dentry = dentry->d_parent; 1270 } 1271 if (inode) 1272 inode = igrab(inode); 1273 return inode; 1274 } 1275 1276 /* 1277 * Choose mds to send request to next. 
If there is a hint set in the 1278 * request (e.g., due to a prior forward hint from the mds), use that. 1279 * Otherwise, consult frag tree and/or caps to identify the 1280 * appropriate mds. If all else fails, choose randomly. 1281 * 1282 * Called under mdsc->mutex. 1283 */ 1284 static int __choose_mds(struct ceph_mds_client *mdsc, 1285 struct ceph_mds_request *req, 1286 bool *random) 1287 { 1288 struct inode *inode; 1289 struct ceph_inode_info *ci; 1290 struct ceph_cap *cap; 1291 int mode = req->r_direct_mode; 1292 int mds = -1; 1293 u32 hash = req->r_direct_hash; 1294 bool is_hash = test_bit(CEPH_MDS_R_DIRECT_IS_HASH, &req->r_req_flags); 1295 struct ceph_client *cl = mdsc->fsc->client; 1296 1297 if (random) 1298 *random = false; 1299 1300 /* 1301 * is there a specific mds we should try? ignore hint if we have 1302 * no session and the mds is not up (active or recovering). 1303 */ 1304 if (req->r_resend_mds >= 0 && 1305 (__have_session(mdsc, req->r_resend_mds) || 1306 ceph_mdsmap_get_state(mdsc->mdsmap, req->r_resend_mds) > 0)) { 1307 doutc(cl, "using resend_mds mds%d\n", req->r_resend_mds); 1308 return req->r_resend_mds; 1309 } 1310 1311 if (mode == USE_RANDOM_MDS) 1312 goto random; 1313 1314 inode = NULL; 1315 if (req->r_inode) { 1316 if (ceph_snap(req->r_inode) != CEPH_SNAPDIR) { 1317 inode = req->r_inode; 1318 ihold(inode); 1319 } else { 1320 /* req->r_dentry is non-null for LSSNAP request */ 1321 rcu_read_lock(); 1322 inode = get_nonsnap_parent(req->r_dentry); 1323 rcu_read_unlock(); 1324 doutc(cl, "using snapdir's parent %p %llx.%llx\n", 1325 inode, ceph_vinop(inode)); 1326 } 1327 } else if (req->r_dentry) { 1328 /* ignore race with rename; old or new d_parent is okay */ 1329 struct dentry *parent; 1330 struct inode *dir; 1331 1332 rcu_read_lock(); 1333 parent = READ_ONCE(req->r_dentry->d_parent); 1334 dir = req->r_parent ? 
: d_inode_rcu(parent); 1335 1336 if (!dir || dir->i_sb != mdsc->fsc->sb) { 1337 /* not this fs or parent went negative */ 1338 inode = d_inode(req->r_dentry); 1339 if (inode) 1340 ihold(inode); 1341 } else if (ceph_snap(dir) != CEPH_NOSNAP) { 1342 /* direct snapped/virtual snapdir requests 1343 * based on parent dir inode */ 1344 inode = get_nonsnap_parent(parent); 1345 doutc(cl, "using nonsnap parent %p %llx.%llx\n", 1346 inode, ceph_vinop(inode)); 1347 } else { 1348 /* dentry target */ 1349 inode = d_inode(req->r_dentry); 1350 if (!inode || mode == USE_AUTH_MDS) { 1351 /* dir + name */ 1352 inode = igrab(dir); 1353 hash = ceph_dentry_hash(dir, req->r_dentry); 1354 is_hash = true; 1355 } else { 1356 ihold(inode); 1357 } 1358 } 1359 rcu_read_unlock(); 1360 } 1361 1362 if (!inode) 1363 goto random; 1364 1365 doutc(cl, "%p %llx.%llx is_hash=%d (0x%x) mode %d\n", inode, 1366 ceph_vinop(inode), (int)is_hash, hash, mode); 1367 ci = ceph_inode(inode); 1368 1369 if (is_hash && S_ISDIR(inode->i_mode)) { 1370 struct ceph_inode_frag frag; 1371 int found; 1372 1373 ceph_choose_frag(ci, hash, &frag, &found); 1374 if (found) { 1375 if (mode == USE_ANY_MDS && frag.ndist > 0) { 1376 u8 r; 1377 1378 /* choose a random replica */ 1379 get_random_bytes(&r, 1); 1380 r %= frag.ndist; 1381 mds = frag.dist[r]; 1382 doutc(cl, "%p %llx.%llx frag %u mds%d (%d/%d)\n", 1383 inode, ceph_vinop(inode), frag.frag, 1384 mds, (int)r, frag.ndist); 1385 if (ceph_mdsmap_get_state(mdsc->mdsmap, mds) >= 1386 CEPH_MDS_STATE_ACTIVE && 1387 !ceph_mdsmap_is_laggy(mdsc->mdsmap, mds)) 1388 goto out; 1389 } 1390 1391 /* since this file/dir wasn't known to be 1392 * replicated, then we want to look for the 1393 * authoritative mds. 
*/ 1394 if (frag.mds >= 0) { 1395 /* choose auth mds */ 1396 mds = frag.mds; 1397 doutc(cl, "%p %llx.%llx frag %u mds%d (auth)\n", 1398 inode, ceph_vinop(inode), frag.frag, mds); 1399 if (ceph_mdsmap_get_state(mdsc->mdsmap, mds) >= 1400 CEPH_MDS_STATE_ACTIVE) { 1401 if (!ceph_mdsmap_is_laggy(mdsc->mdsmap, 1402 mds)) 1403 goto out; 1404 } 1405 } 1406 mode = USE_AUTH_MDS; 1407 } 1408 } 1409 1410 spin_lock(&ci->i_ceph_lock); 1411 cap = NULL; 1412 if (mode == USE_AUTH_MDS) 1413 cap = ci->i_auth_cap; 1414 if (!cap && !RB_EMPTY_ROOT(&ci->i_caps)) 1415 cap = rb_entry(rb_first(&ci->i_caps), struct ceph_cap, ci_node); 1416 if (!cap) { 1417 spin_unlock(&ci->i_ceph_lock); 1418 iput(inode); 1419 goto random; 1420 } 1421 mds = cap->session->s_mds; 1422 doutc(cl, "%p %llx.%llx mds%d (%scap %p)\n", inode, 1423 ceph_vinop(inode), mds, 1424 cap == ci->i_auth_cap ? "auth " : "", cap); 1425 spin_unlock(&ci->i_ceph_lock); 1426 out: 1427 iput(inode); 1428 return mds; 1429 1430 random: 1431 if (random) 1432 *random = true; 1433 1434 mds = ceph_mdsmap_get_random_mds(mdsc->mdsmap); 1435 doutc(cl, "chose random mds%d\n", mds); 1436 return mds; 1437 } 1438 1439 1440 /* 1441 * session messages 1442 */ 1443 struct ceph_msg *ceph_create_session_msg(u32 op, u64 seq) 1444 { 1445 struct ceph_msg *msg; 1446 struct ceph_mds_session_head *h; 1447 1448 msg = ceph_msg_new(CEPH_MSG_CLIENT_SESSION, sizeof(*h), GFP_NOFS, 1449 false); 1450 if (!msg) { 1451 pr_err("ENOMEM creating session %s msg\n", 1452 ceph_session_op_name(op)); 1453 return NULL; 1454 } 1455 h = msg->front.iov_base; 1456 h->op = cpu_to_le32(op); 1457 h->seq = cpu_to_le64(seq); 1458 1459 return msg; 1460 } 1461 1462 static const unsigned char feature_bits[] = CEPHFS_FEATURES_CLIENT_SUPPORTED; 1463 #define FEATURE_BYTES(c) (DIV_ROUND_UP((size_t)feature_bits[c - 1] + 1, 64) * 8) 1464 static int encode_supported_features(void **p, void *end) 1465 { 1466 static const size_t count = ARRAY_SIZE(feature_bits); 1467 1468 if (count > 0) { 1469 
size_t i; 1470 size_t size = FEATURE_BYTES(count); 1471 unsigned long bit; 1472 1473 if (WARN_ON_ONCE(*p + 4 + size > end)) 1474 return -ERANGE; 1475 1476 ceph_encode_32(p, size); 1477 memset(*p, 0, size); 1478 for (i = 0; i < count; i++) { 1479 bit = feature_bits[i]; 1480 ((unsigned char *)(*p))[bit / 8] |= BIT(bit % 8); 1481 } 1482 *p += size; 1483 } else { 1484 if (WARN_ON_ONCE(*p + 4 > end)) 1485 return -ERANGE; 1486 1487 ceph_encode_32(p, 0); 1488 } 1489 1490 return 0; 1491 } 1492 1493 static const unsigned char metric_bits[] = CEPHFS_METRIC_SPEC_CLIENT_SUPPORTED; 1494 #define METRIC_BYTES(cnt) (DIV_ROUND_UP((size_t)metric_bits[cnt - 1] + 1, 64) * 8) 1495 static int encode_metric_spec(void **p, void *end) 1496 { 1497 static const size_t count = ARRAY_SIZE(metric_bits); 1498 1499 /* header */ 1500 if (WARN_ON_ONCE(*p + 2 > end)) 1501 return -ERANGE; 1502 1503 ceph_encode_8(p, 1); /* version */ 1504 ceph_encode_8(p, 1); /* compat */ 1505 1506 if (count > 0) { 1507 size_t i; 1508 size_t size = METRIC_BYTES(count); 1509 1510 if (WARN_ON_ONCE(*p + 4 + 4 + size > end)) 1511 return -ERANGE; 1512 1513 /* metric spec info length */ 1514 ceph_encode_32(p, 4 + size); 1515 1516 /* metric spec */ 1517 ceph_encode_32(p, size); 1518 memset(*p, 0, size); 1519 for (i = 0; i < count; i++) 1520 ((unsigned char *)(*p))[i / 8] |= BIT(metric_bits[i] % 8); 1521 *p += size; 1522 } else { 1523 if (WARN_ON_ONCE(*p + 4 + 4 > end)) 1524 return -ERANGE; 1525 1526 /* metric spec info length */ 1527 ceph_encode_32(p, 4); 1528 /* metric spec */ 1529 ceph_encode_32(p, 0); 1530 } 1531 1532 return 0; 1533 } 1534 1535 /* 1536 * session message, specialization for CEPH_SESSION_REQUEST_OPEN 1537 * to include additional client metadata fields. 
1538 */ 1539 static struct ceph_msg * 1540 create_session_full_msg(struct ceph_mds_client *mdsc, int op, u64 seq) 1541 { 1542 struct ceph_msg *msg; 1543 struct ceph_mds_session_head *h; 1544 int i; 1545 int extra_bytes = 0; 1546 int metadata_key_count = 0; 1547 struct ceph_options *opt = mdsc->fsc->client->options; 1548 struct ceph_mount_options *fsopt = mdsc->fsc->mount_options; 1549 struct ceph_client *cl = mdsc->fsc->client; 1550 size_t size, count; 1551 void *p, *end; 1552 int ret; 1553 1554 const char* metadata[][2] = { 1555 {"hostname", mdsc->nodename}, 1556 {"kernel_version", init_utsname()->release}, 1557 {"entity_id", opt->name ? : ""}, 1558 {"root", fsopt->server_path ? : "/"}, 1559 {NULL, NULL} 1560 }; 1561 1562 /* Calculate serialized length of metadata */ 1563 extra_bytes = 4; /* map length */ 1564 for (i = 0; metadata[i][0]; ++i) { 1565 extra_bytes += 8 + strlen(metadata[i][0]) + 1566 strlen(metadata[i][1]); 1567 metadata_key_count++; 1568 } 1569 1570 /* supported feature */ 1571 size = 0; 1572 count = ARRAY_SIZE(feature_bits); 1573 if (count > 0) 1574 size = FEATURE_BYTES(count); 1575 extra_bytes += 4 + size; 1576 1577 /* metric spec */ 1578 size = 0; 1579 count = ARRAY_SIZE(metric_bits); 1580 if (count > 0) 1581 size = METRIC_BYTES(count); 1582 extra_bytes += 2 + 4 + 4 + size; 1583 1584 /* flags, mds auth caps and oldest_client_tid */ 1585 extra_bytes += 4 + 4 + 8; 1586 1587 /* Allocate the message */ 1588 msg = ceph_msg_new(CEPH_MSG_CLIENT_SESSION, sizeof(*h) + extra_bytes, 1589 GFP_NOFS, false); 1590 if (!msg) { 1591 pr_err_client(cl, "ENOMEM creating session open msg\n"); 1592 return ERR_PTR(-ENOMEM); 1593 } 1594 p = msg->front.iov_base; 1595 end = p + msg->front.iov_len; 1596 1597 h = p; 1598 h->op = cpu_to_le32(op); 1599 h->seq = cpu_to_le64(seq); 1600 1601 /* 1602 * Serialize client metadata into waiting buffer space, using 1603 * the format that userspace expects for map<string, string> 1604 * 1605 * ClientSession messages with metadata are 
v7 1606 */ 1607 msg->hdr.version = cpu_to_le16(7); 1608 msg->hdr.compat_version = cpu_to_le16(1); 1609 1610 /* The write pointer, following the session_head structure */ 1611 p += sizeof(*h); 1612 1613 /* Number of entries in the map */ 1614 ceph_encode_32(&p, metadata_key_count); 1615 1616 /* Two length-prefixed strings for each entry in the map */ 1617 for (i = 0; metadata[i][0]; ++i) { 1618 size_t const key_len = strlen(metadata[i][0]); 1619 size_t const val_len = strlen(metadata[i][1]); 1620 1621 ceph_encode_32(&p, key_len); 1622 memcpy(p, metadata[i][0], key_len); 1623 p += key_len; 1624 ceph_encode_32(&p, val_len); 1625 memcpy(p, metadata[i][1], val_len); 1626 p += val_len; 1627 } 1628 1629 ret = encode_supported_features(&p, end); 1630 if (ret) { 1631 pr_err_client(cl, "encode_supported_features failed!\n"); 1632 ceph_msg_put(msg); 1633 return ERR_PTR(ret); 1634 } 1635 1636 ret = encode_metric_spec(&p, end); 1637 if (ret) { 1638 pr_err_client(cl, "encode_metric_spec failed!\n"); 1639 ceph_msg_put(msg); 1640 return ERR_PTR(ret); 1641 } 1642 1643 /* version == 5, flags */ 1644 ceph_encode_32(&p, 0); 1645 1646 /* version == 6, mds auth caps */ 1647 ceph_encode_32(&p, 0); 1648 1649 /* version == 7, oldest_client_tid */ 1650 ceph_encode_64(&p, mdsc->oldest_tid); 1651 1652 msg->front.iov_len = p - msg->front.iov_base; 1653 msg->hdr.front_len = cpu_to_le32(msg->front.iov_len); 1654 1655 return msg; 1656 } 1657 1658 /* 1659 * send session open request. 1660 * 1661 * called under mdsc->mutex 1662 */ 1663 static int __open_session(struct ceph_mds_client *mdsc, 1664 struct ceph_mds_session *session) 1665 { 1666 struct ceph_msg *msg; 1667 int mstate; 1668 int mds = session->s_mds; 1669 1670 if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_FENCE_IO) 1671 return -EIO; 1672 1673 /* wait for mds to go active? 
*/ 1674 mstate = ceph_mdsmap_get_state(mdsc->mdsmap, mds); 1675 doutc(mdsc->fsc->client, "open_session to mds%d (%s)\n", mds, 1676 ceph_mds_state_name(mstate)); 1677 session->s_state = CEPH_MDS_SESSION_OPENING; 1678 session->s_renew_requested = jiffies; 1679 1680 /* send connect message */ 1681 msg = create_session_full_msg(mdsc, CEPH_SESSION_REQUEST_OPEN, 1682 session->s_seq); 1683 if (IS_ERR(msg)) 1684 return PTR_ERR(msg); 1685 ceph_con_send(&session->s_con, msg); 1686 return 0; 1687 } 1688 1689 /* 1690 * open sessions for any export targets for the given mds 1691 * 1692 * called under mdsc->mutex 1693 */ 1694 static struct ceph_mds_session * 1695 __open_export_target_session(struct ceph_mds_client *mdsc, int target) 1696 { 1697 struct ceph_mds_session *session; 1698 int ret; 1699 1700 session = __ceph_lookup_mds_session(mdsc, target); 1701 if (!session) { 1702 session = register_session(mdsc, target); 1703 if (IS_ERR(session)) 1704 return session; 1705 } 1706 if (session->s_state == CEPH_MDS_SESSION_NEW || 1707 session->s_state == CEPH_MDS_SESSION_CLOSING) { 1708 ret = __open_session(mdsc, session); 1709 if (ret) 1710 return ERR_PTR(ret); 1711 } 1712 1713 return session; 1714 } 1715 1716 struct ceph_mds_session * 1717 ceph_mdsc_open_export_target_session(struct ceph_mds_client *mdsc, int target) 1718 { 1719 struct ceph_mds_session *session; 1720 struct ceph_client *cl = mdsc->fsc->client; 1721 1722 doutc(cl, "to mds%d\n", target); 1723 1724 mutex_lock(&mdsc->mutex); 1725 session = __open_export_target_session(mdsc, target); 1726 mutex_unlock(&mdsc->mutex); 1727 1728 return session; 1729 } 1730 1731 static void __open_export_target_sessions(struct ceph_mds_client *mdsc, 1732 struct ceph_mds_session *session) 1733 { 1734 struct ceph_mds_info *mi; 1735 struct ceph_mds_session *ts; 1736 int i, mds = session->s_mds; 1737 struct ceph_client *cl = mdsc->fsc->client; 1738 1739 if (mds >= mdsc->mdsmap->possible_max_rank) 1740 return; 1741 1742 mi = 
&mdsc->mdsmap->m_info[mds]; 1743 doutc(cl, "for mds%d (%d targets)\n", session->s_mds, 1744 mi->num_export_targets); 1745 1746 for (i = 0; i < mi->num_export_targets; i++) { 1747 ts = __open_export_target_session(mdsc, mi->export_targets[i]); 1748 ceph_put_mds_session(ts); 1749 } 1750 } 1751 1752 /* 1753 * session caps 1754 */ 1755 1756 static void detach_cap_releases(struct ceph_mds_session *session, 1757 struct list_head *target) 1758 { 1759 struct ceph_client *cl = session->s_mdsc->fsc->client; 1760 1761 lockdep_assert_held(&session->s_cap_lock); 1762 1763 list_splice_init(&session->s_cap_releases, target); 1764 session->s_num_cap_releases = 0; 1765 doutc(cl, "mds%d\n", session->s_mds); 1766 } 1767 1768 static void dispose_cap_releases(struct ceph_mds_client *mdsc, 1769 struct list_head *dispose) 1770 { 1771 while (!list_empty(dispose)) { 1772 struct ceph_cap *cap; 1773 /* zero out the in-progress message */ 1774 cap = list_first_entry(dispose, struct ceph_cap, session_caps); 1775 list_del(&cap->session_caps); 1776 ceph_put_cap(mdsc, cap); 1777 } 1778 } 1779 1780 static void cleanup_session_requests(struct ceph_mds_client *mdsc, 1781 struct ceph_mds_session *session) 1782 { 1783 struct ceph_client *cl = mdsc->fsc->client; 1784 struct ceph_mds_request *req; 1785 struct rb_node *p; 1786 1787 doutc(cl, "mds%d\n", session->s_mds); 1788 mutex_lock(&mdsc->mutex); 1789 while (!list_empty(&session->s_unsafe)) { 1790 req = list_first_entry(&session->s_unsafe, 1791 struct ceph_mds_request, r_unsafe_item); 1792 pr_warn_ratelimited_client(cl, " dropping unsafe request %llu\n", 1793 req->r_tid); 1794 if (req->r_target_inode) 1795 mapping_set_error(req->r_target_inode->i_mapping, -EIO); 1796 if (req->r_unsafe_dir) 1797 mapping_set_error(req->r_unsafe_dir->i_mapping, -EIO); 1798 __unregister_request(mdsc, req); 1799 } 1800 /* zero r_attempts, so kick_requests() will re-send requests */ 1801 p = rb_first(&mdsc->request_tree); 1802 while (p) { 1803 req = rb_entry(p, struct 
ceph_mds_request, r_node); 1804 p = rb_next(p); 1805 if (req->r_session && 1806 req->r_session->s_mds == session->s_mds) 1807 req->r_attempts = 0; 1808 } 1809 mutex_unlock(&mdsc->mutex); 1810 } 1811 1812 /* 1813 * Helper to safely iterate over all caps associated with a session, with 1814 * special care taken to handle a racing __ceph_remove_cap(). 1815 * 1816 * Caller must hold session s_mutex. 1817 */ 1818 int ceph_iterate_session_caps(struct ceph_mds_session *session, 1819 int (*cb)(struct inode *, int mds, void *), 1820 void *arg) 1821 { 1822 struct ceph_client *cl = session->s_mdsc->fsc->client; 1823 struct list_head *p; 1824 struct ceph_cap *cap; 1825 struct inode *inode, *last_inode = NULL; 1826 struct ceph_cap *old_cap = NULL; 1827 int ret; 1828 1829 doutc(cl, "%p mds%d\n", session, session->s_mds); 1830 spin_lock(&session->s_cap_lock); 1831 p = session->s_caps.next; 1832 while (p != &session->s_caps) { 1833 int mds; 1834 1835 cap = list_entry(p, struct ceph_cap, session_caps); 1836 inode = igrab(&cap->ci->netfs.inode); 1837 if (!inode) { 1838 p = p->next; 1839 continue; 1840 } 1841 session->s_cap_iterator = cap; 1842 mds = cap->mds; 1843 spin_unlock(&session->s_cap_lock); 1844 1845 if (last_inode) { 1846 iput(last_inode); 1847 last_inode = NULL; 1848 } 1849 if (old_cap) { 1850 ceph_put_cap(session->s_mdsc, old_cap); 1851 old_cap = NULL; 1852 } 1853 1854 ret = cb(inode, mds, arg); 1855 last_inode = inode; 1856 1857 spin_lock(&session->s_cap_lock); 1858 p = p->next; 1859 if (!cap->ci) { 1860 doutc(cl, "finishing cap %p removal\n", cap); 1861 BUG_ON(cap->session != session); 1862 cap->session = NULL; 1863 list_del_init(&cap->session_caps); 1864 session->s_nr_caps--; 1865 atomic64_dec(&session->s_mdsc->metric.total_caps); 1866 if (cap->queue_release) 1867 __ceph_queue_cap_release(session, cap); 1868 else 1869 old_cap = cap; /* put_cap it w/o locks held */ 1870 } 1871 if (ret < 0) 1872 goto out; 1873 } 1874 ret = 0; 1875 out: 1876 session->s_cap_iterator = 
NULL; 1877 spin_unlock(&session->s_cap_lock); 1878 1879 iput(last_inode); 1880 if (old_cap) 1881 ceph_put_cap(session->s_mdsc, old_cap); 1882 1883 return ret; 1884 } 1885 1886 static int remove_session_caps_cb(struct inode *inode, int mds, void *arg) 1887 { 1888 struct ceph_inode_info *ci = ceph_inode(inode); 1889 struct ceph_client *cl = ceph_inode_to_client(inode); 1890 bool invalidate = false; 1891 struct ceph_cap *cap; 1892 int iputs = 0; 1893 1894 spin_lock(&ci->i_ceph_lock); 1895 cap = __get_cap_for_mds(ci, mds); 1896 if (cap) { 1897 doutc(cl, " removing cap %p, ci is %p, inode is %p\n", 1898 cap, ci, &ci->netfs.inode); 1899 1900 iputs = ceph_purge_inode_cap(inode, cap, &invalidate); 1901 } 1902 spin_unlock(&ci->i_ceph_lock); 1903 1904 if (cap) 1905 wake_up_all(&ci->i_cap_wq); 1906 if (invalidate) 1907 ceph_queue_invalidate(inode); 1908 while (iputs--) 1909 iput(inode); 1910 return 0; 1911 } 1912 1913 /* 1914 * caller must hold session s_mutex 1915 */ 1916 static void remove_session_caps(struct ceph_mds_session *session) 1917 { 1918 struct ceph_fs_client *fsc = session->s_mdsc->fsc; 1919 struct super_block *sb = fsc->sb; 1920 LIST_HEAD(dispose); 1921 1922 doutc(fsc->client, "on %p\n", session); 1923 ceph_iterate_session_caps(session, remove_session_caps_cb, fsc); 1924 1925 wake_up_all(&fsc->mdsc->cap_flushing_wq); 1926 1927 spin_lock(&session->s_cap_lock); 1928 if (session->s_nr_caps > 0) { 1929 struct inode *inode; 1930 struct ceph_cap *cap, *prev = NULL; 1931 struct ceph_vino vino; 1932 /* 1933 * iterate_session_caps() skips inodes that are being 1934 * deleted, we need to wait until deletions are complete. 1935 * __wait_on_freeing_inode() is designed for the job, 1936 * but it is not exported, so use lookup inode function 1937 * to access it. 
1938 */ 1939 while (!list_empty(&session->s_caps)) { 1940 cap = list_entry(session->s_caps.next, 1941 struct ceph_cap, session_caps); 1942 if (cap == prev) 1943 break; 1944 prev = cap; 1945 vino = cap->ci->i_vino; 1946 spin_unlock(&session->s_cap_lock); 1947 1948 inode = ceph_find_inode(sb, vino); 1949 iput(inode); 1950 1951 spin_lock(&session->s_cap_lock); 1952 } 1953 } 1954 1955 // drop cap expires and unlock s_cap_lock 1956 detach_cap_releases(session, &dispose); 1957 1958 BUG_ON(session->s_nr_caps > 0); 1959 BUG_ON(!list_empty(&session->s_cap_flushing)); 1960 spin_unlock(&session->s_cap_lock); 1961 dispose_cap_releases(session->s_mdsc, &dispose); 1962 } 1963 1964 enum { 1965 RECONNECT, 1966 RENEWCAPS, 1967 FORCE_RO, 1968 }; 1969 1970 /* 1971 * wake up any threads waiting on this session's caps. if the cap is 1972 * old (didn't get renewed on the client reconnect), remove it now. 1973 * 1974 * caller must hold s_mutex. 1975 */ 1976 static int wake_up_session_cb(struct inode *inode, int mds, void *arg) 1977 { 1978 struct ceph_inode_info *ci = ceph_inode(inode); 1979 unsigned long ev = (unsigned long)arg; 1980 1981 if (ev == RECONNECT) { 1982 spin_lock(&ci->i_ceph_lock); 1983 ci->i_wanted_max_size = 0; 1984 ci->i_requested_max_size = 0; 1985 spin_unlock(&ci->i_ceph_lock); 1986 } else if (ev == RENEWCAPS) { 1987 struct ceph_cap *cap; 1988 1989 spin_lock(&ci->i_ceph_lock); 1990 cap = __get_cap_for_mds(ci, mds); 1991 /* mds did not re-issue stale cap */ 1992 if (cap && cap->cap_gen < atomic_read(&cap->session->s_cap_gen)) 1993 cap->issued = cap->implemented = CEPH_CAP_PIN; 1994 spin_unlock(&ci->i_ceph_lock); 1995 } else if (ev == FORCE_RO) { 1996 } 1997 wake_up_all(&ci->i_cap_wq); 1998 return 0; 1999 } 2000 2001 static void wake_up_session_caps(struct ceph_mds_session *session, int ev) 2002 { 2003 struct ceph_client *cl = session->s_mdsc->fsc->client; 2004 2005 doutc(cl, "session %p mds%d\n", session, session->s_mds); 2006 ceph_iterate_session_caps(session, 
wake_up_session_cb, 2007 (void *)(unsigned long)ev); 2008 } 2009 2010 /* 2011 * Send periodic message to MDS renewing all currently held caps. The 2012 * ack will reset the expiration for all caps from this session. 2013 * 2014 * caller holds s_mutex 2015 */ 2016 static int send_renew_caps(struct ceph_mds_client *mdsc, 2017 struct ceph_mds_session *session) 2018 { 2019 struct ceph_client *cl = mdsc->fsc->client; 2020 struct ceph_msg *msg; 2021 int state; 2022 2023 if (time_after_eq(jiffies, session->s_cap_ttl) && 2024 time_after_eq(session->s_cap_ttl, session->s_renew_requested)) 2025 pr_info_client(cl, "mds%d caps stale\n", session->s_mds); 2026 session->s_renew_requested = jiffies; 2027 2028 /* do not try to renew caps until a recovering mds has reconnected 2029 * with its clients. */ 2030 state = ceph_mdsmap_get_state(mdsc->mdsmap, session->s_mds); 2031 if (state < CEPH_MDS_STATE_RECONNECT) { 2032 doutc(cl, "ignoring mds%d (%s)\n", session->s_mds, 2033 ceph_mds_state_name(state)); 2034 return 0; 2035 } 2036 2037 doutc(cl, "to mds%d (%s)\n", session->s_mds, 2038 ceph_mds_state_name(state)); 2039 msg = create_session_full_msg(mdsc, CEPH_SESSION_REQUEST_RENEWCAPS, 2040 ++session->s_renew_seq); 2041 if (IS_ERR(msg)) 2042 return PTR_ERR(msg); 2043 ceph_con_send(&session->s_con, msg); 2044 return 0; 2045 } 2046 2047 static int send_flushmsg_ack(struct ceph_mds_client *mdsc, 2048 struct ceph_mds_session *session, u64 seq) 2049 { 2050 struct ceph_client *cl = mdsc->fsc->client; 2051 struct ceph_msg *msg; 2052 2053 doutc(cl, "to mds%d (%s)s seq %lld\n", session->s_mds, 2054 ceph_session_state_name(session->s_state), seq); 2055 msg = ceph_create_session_msg(CEPH_SESSION_FLUSHMSG_ACK, seq); 2056 if (!msg) 2057 return -ENOMEM; 2058 ceph_con_send(&session->s_con, msg); 2059 return 0; 2060 } 2061 2062 2063 /* 2064 * Note new cap ttl, and any transition from stale -> not stale (fresh?). 
2065 * 2066 * Called under session->s_mutex 2067 */ 2068 static void renewed_caps(struct ceph_mds_client *mdsc, 2069 struct ceph_mds_session *session, int is_renew) 2070 { 2071 struct ceph_client *cl = mdsc->fsc->client; 2072 int was_stale; 2073 int wake = 0; 2074 2075 spin_lock(&session->s_cap_lock); 2076 was_stale = is_renew && time_after_eq(jiffies, session->s_cap_ttl); 2077 2078 session->s_cap_ttl = session->s_renew_requested + 2079 mdsc->mdsmap->m_session_timeout*HZ; 2080 2081 if (was_stale) { 2082 if (time_before(jiffies, session->s_cap_ttl)) { 2083 pr_info_client(cl, "mds%d caps renewed\n", 2084 session->s_mds); 2085 wake = 1; 2086 } else { 2087 pr_info_client(cl, "mds%d caps still stale\n", 2088 session->s_mds); 2089 } 2090 } 2091 doutc(cl, "mds%d ttl now %lu, was %s, now %s\n", session->s_mds, 2092 session->s_cap_ttl, was_stale ? "stale" : "fresh", 2093 time_before(jiffies, session->s_cap_ttl) ? "stale" : "fresh"); 2094 spin_unlock(&session->s_cap_lock); 2095 2096 if (wake) 2097 wake_up_session_caps(session, RENEWCAPS); 2098 } 2099 2100 /* 2101 * send a session close request 2102 */ 2103 static int request_close_session(struct ceph_mds_session *session) 2104 { 2105 struct ceph_client *cl = session->s_mdsc->fsc->client; 2106 struct ceph_msg *msg; 2107 2108 doutc(cl, "mds%d state %s seq %lld\n", session->s_mds, 2109 ceph_session_state_name(session->s_state), session->s_seq); 2110 msg = ceph_create_session_msg(CEPH_SESSION_REQUEST_CLOSE, 2111 session->s_seq); 2112 if (!msg) 2113 return -ENOMEM; 2114 ceph_con_send(&session->s_con, msg); 2115 return 1; 2116 } 2117 2118 /* 2119 * Called with s_mutex held. 
2120 */ 2121 static int __close_session(struct ceph_mds_client *mdsc, 2122 struct ceph_mds_session *session) 2123 { 2124 if (session->s_state >= CEPH_MDS_SESSION_CLOSING) 2125 return 0; 2126 session->s_state = CEPH_MDS_SESSION_CLOSING; 2127 return request_close_session(session); 2128 } 2129 2130 static bool drop_negative_children(struct dentry *dentry) 2131 { 2132 struct dentry *child; 2133 bool all_negative = true; 2134 2135 if (!d_is_dir(dentry)) 2136 goto out; 2137 2138 spin_lock(&dentry->d_lock); 2139 hlist_for_each_entry(child, &dentry->d_children, d_sib) { 2140 if (d_really_is_positive(child)) { 2141 all_negative = false; 2142 break; 2143 } 2144 } 2145 spin_unlock(&dentry->d_lock); 2146 2147 if (all_negative) 2148 shrink_dcache_parent(dentry); 2149 out: 2150 return all_negative; 2151 } 2152 2153 /* 2154 * Trim old(er) caps. 2155 * 2156 * Because we can't cache an inode without one or more caps, we do 2157 * this indirectly: if a cap is unused, we prune its aliases, at which 2158 * point the inode will hopefully get dropped to. 2159 * 2160 * Yes, this is a bit sloppy. Our only real goal here is to respond to 2161 * memory pressure from the MDS, though, so it needn't be perfect. 
2162 */ 2163 static int trim_caps_cb(struct inode *inode, int mds, void *arg) 2164 { 2165 struct ceph_mds_client *mdsc = ceph_sb_to_mdsc(inode->i_sb); 2166 struct ceph_client *cl = mdsc->fsc->client; 2167 int *remaining = arg; 2168 struct ceph_inode_info *ci = ceph_inode(inode); 2169 int used, wanted, oissued, mine; 2170 struct ceph_cap *cap; 2171 2172 if (*remaining <= 0) 2173 return -1; 2174 2175 spin_lock(&ci->i_ceph_lock); 2176 cap = __get_cap_for_mds(ci, mds); 2177 if (!cap) { 2178 spin_unlock(&ci->i_ceph_lock); 2179 return 0; 2180 } 2181 mine = cap->issued | cap->implemented; 2182 used = __ceph_caps_used(ci); 2183 wanted = __ceph_caps_file_wanted(ci); 2184 oissued = __ceph_caps_issued_other(ci, cap); 2185 2186 doutc(cl, "%p %llx.%llx cap %p mine %s oissued %s used %s wanted %s\n", 2187 inode, ceph_vinop(inode), cap, ceph_cap_string(mine), 2188 ceph_cap_string(oissued), ceph_cap_string(used), 2189 ceph_cap_string(wanted)); 2190 if (cap == ci->i_auth_cap) { 2191 if (ci->i_dirty_caps || ci->i_flushing_caps || 2192 !list_empty(&ci->i_cap_snaps)) 2193 goto out; 2194 if ((used | wanted) & CEPH_CAP_ANY_WR) 2195 goto out; 2196 /* Note: it's possible that i_filelock_ref becomes non-zero 2197 * after dropping auth caps. It doesn't hurt because reply 2198 * of lock mds request will re-add auth caps. */ 2199 if (atomic_read(&ci->i_filelock_ref) > 0) 2200 goto out; 2201 } 2202 /* The inode has cached pages, but it's no longer used. 2203 * we can safely drop it */ 2204 if (S_ISREG(inode->i_mode) && 2205 wanted == 0 && used == CEPH_CAP_FILE_CACHE && 2206 !(oissued & CEPH_CAP_FILE_CACHE)) { 2207 used = 0; 2208 oissued = 0; 2209 } 2210 if ((used | wanted) & ~oissued & mine) 2211 goto out; /* we need these caps */ 2212 2213 if (oissued) { 2214 /* we aren't the only cap.. 
just remove us */ 2215 ceph_remove_cap(mdsc, cap, true); 2216 (*remaining)--; 2217 } else { 2218 struct dentry *dentry; 2219 /* try dropping referring dentries */ 2220 spin_unlock(&ci->i_ceph_lock); 2221 dentry = d_find_any_alias(inode); 2222 if (dentry && drop_negative_children(dentry)) { 2223 int count; 2224 dput(dentry); 2225 d_prune_aliases(inode); 2226 count = icount_read(inode); 2227 if (count == 1) 2228 (*remaining)--; 2229 doutc(cl, "%p %llx.%llx cap %p pruned, count now %d\n", 2230 inode, ceph_vinop(inode), cap, count); 2231 } else { 2232 dput(dentry); 2233 } 2234 return 0; 2235 } 2236 2237 out: 2238 spin_unlock(&ci->i_ceph_lock); 2239 return 0; 2240 } 2241 2242 /* 2243 * Trim session cap count down to some max number. 2244 */ 2245 int ceph_trim_caps(struct ceph_mds_client *mdsc, 2246 struct ceph_mds_session *session, 2247 int max_caps) 2248 { 2249 struct ceph_client *cl = mdsc->fsc->client; 2250 int trim_caps = session->s_nr_caps - max_caps; 2251 2252 doutc(cl, "mds%d start: %d / %d, trim %d\n", session->s_mds, 2253 session->s_nr_caps, max_caps, trim_caps); 2254 if (trim_caps > 0) { 2255 int remaining = trim_caps; 2256 2257 ceph_iterate_session_caps(session, trim_caps_cb, &remaining); 2258 doutc(cl, "mds%d done: %d / %d, trimmed %d\n", 2259 session->s_mds, session->s_nr_caps, max_caps, 2260 trim_caps - remaining); 2261 } 2262 2263 ceph_flush_session_cap_releases(mdsc, session); 2264 return 0; 2265 } 2266 2267 static int check_caps_flush(struct ceph_mds_client *mdsc, 2268 u64 want_flush_tid) 2269 { 2270 struct ceph_client *cl = mdsc->fsc->client; 2271 int ret = 1; 2272 2273 spin_lock(&mdsc->cap_dirty_lock); 2274 if (!list_empty(&mdsc->cap_flush_list)) { 2275 struct ceph_cap_flush *cf = 2276 list_first_entry(&mdsc->cap_flush_list, 2277 struct ceph_cap_flush, g_list); 2278 if (cf->tid <= want_flush_tid) { 2279 doutc(cl, "still flushing tid %llu <= %llu\n", 2280 cf->tid, want_flush_tid); 2281 ret = 0; 2282 } 2283 } 2284 spin_unlock(&mdsc->cap_dirty_lock); 
2285 return ret; 2286 } 2287 2288 /* 2289 * flush all dirty inode data to disk. 2290 * 2291 * returns true if we've flushed through want_flush_tid 2292 */ 2293 static void wait_caps_flush(struct ceph_mds_client *mdsc, 2294 u64 want_flush_tid) 2295 { 2296 struct ceph_client *cl = mdsc->fsc->client; 2297 2298 doutc(cl, "want %llu\n", want_flush_tid); 2299 2300 wait_event(mdsc->cap_flushing_wq, 2301 check_caps_flush(mdsc, want_flush_tid)); 2302 2303 doutc(cl, "ok, flushed thru %llu\n", want_flush_tid); 2304 } 2305 2306 /* 2307 * called under s_mutex 2308 */ 2309 static void ceph_send_cap_releases(struct ceph_mds_client *mdsc, 2310 struct ceph_mds_session *session) 2311 { 2312 struct ceph_client *cl = mdsc->fsc->client; 2313 struct ceph_msg *msg = NULL; 2314 struct ceph_mds_cap_release *head; 2315 struct ceph_mds_cap_item *item; 2316 struct ceph_osd_client *osdc = &mdsc->fsc->client->osdc; 2317 struct ceph_cap *cap; 2318 LIST_HEAD(tmp_list); 2319 int num_cap_releases; 2320 __le32 barrier, *cap_barrier; 2321 2322 down_read(&osdc->lock); 2323 barrier = cpu_to_le32(osdc->epoch_barrier); 2324 up_read(&osdc->lock); 2325 2326 spin_lock(&session->s_cap_lock); 2327 again: 2328 list_splice_init(&session->s_cap_releases, &tmp_list); 2329 num_cap_releases = session->s_num_cap_releases; 2330 session->s_num_cap_releases = 0; 2331 spin_unlock(&session->s_cap_lock); 2332 2333 while (!list_empty(&tmp_list)) { 2334 if (!msg) { 2335 msg = ceph_msg_new(CEPH_MSG_CLIENT_CAPRELEASE, 2336 PAGE_SIZE, GFP_NOFS, false); 2337 if (!msg) 2338 goto out_err; 2339 head = msg->front.iov_base; 2340 head->num = cpu_to_le32(0); 2341 msg->front.iov_len = sizeof(*head); 2342 2343 msg->hdr.version = cpu_to_le16(2); 2344 msg->hdr.compat_version = cpu_to_le16(1); 2345 } 2346 2347 cap = list_first_entry(&tmp_list, struct ceph_cap, 2348 session_caps); 2349 list_del(&cap->session_caps); 2350 num_cap_releases--; 2351 2352 head = msg->front.iov_base; 2353 put_unaligned_le32(get_unaligned_le32(&head->num) + 1, 2354 
&head->num); 2355 item = msg->front.iov_base + msg->front.iov_len; 2356 item->ino = cpu_to_le64(cap->cap_ino); 2357 item->cap_id = cpu_to_le64(cap->cap_id); 2358 item->migrate_seq = cpu_to_le32(cap->mseq); 2359 item->issue_seq = cpu_to_le32(cap->issue_seq); 2360 msg->front.iov_len += sizeof(*item); 2361 2362 ceph_put_cap(mdsc, cap); 2363 2364 if (le32_to_cpu(head->num) == CEPH_CAPS_PER_RELEASE) { 2365 // Append cap_barrier field 2366 cap_barrier = msg->front.iov_base + msg->front.iov_len; 2367 *cap_barrier = barrier; 2368 msg->front.iov_len += sizeof(*cap_barrier); 2369 2370 msg->hdr.front_len = cpu_to_le32(msg->front.iov_len); 2371 doutc(cl, "mds%d %p\n", session->s_mds, msg); 2372 ceph_con_send(&session->s_con, msg); 2373 msg = NULL; 2374 } 2375 } 2376 2377 BUG_ON(num_cap_releases != 0); 2378 2379 spin_lock(&session->s_cap_lock); 2380 if (!list_empty(&session->s_cap_releases)) 2381 goto again; 2382 spin_unlock(&session->s_cap_lock); 2383 2384 if (msg) { 2385 // Append cap_barrier field 2386 cap_barrier = msg->front.iov_base + msg->front.iov_len; 2387 *cap_barrier = barrier; 2388 msg->front.iov_len += sizeof(*cap_barrier); 2389 2390 msg->hdr.front_len = cpu_to_le32(msg->front.iov_len); 2391 doutc(cl, "mds%d %p\n", session->s_mds, msg); 2392 ceph_con_send(&session->s_con, msg); 2393 } 2394 return; 2395 out_err: 2396 pr_err_client(cl, "mds%d, failed to allocate message\n", 2397 session->s_mds); 2398 spin_lock(&session->s_cap_lock); 2399 list_splice(&tmp_list, &session->s_cap_releases); 2400 session->s_num_cap_releases += num_cap_releases; 2401 spin_unlock(&session->s_cap_lock); 2402 } 2403 2404 static void ceph_cap_release_work(struct work_struct *work) 2405 { 2406 struct ceph_mds_session *session = 2407 container_of(work, struct ceph_mds_session, s_cap_release_work); 2408 2409 mutex_lock(&session->s_mutex); 2410 if (session->s_state == CEPH_MDS_SESSION_OPEN || 2411 session->s_state == CEPH_MDS_SESSION_HUNG) 2412 ceph_send_cap_releases(session->s_mdsc, session); 
2413 mutex_unlock(&session->s_mutex); 2414 ceph_put_mds_session(session); 2415 } 2416 2417 void ceph_flush_session_cap_releases(struct ceph_mds_client *mdsc, 2418 struct ceph_mds_session *session) 2419 { 2420 struct ceph_client *cl = mdsc->fsc->client; 2421 if (mdsc->stopping) 2422 return; 2423 2424 ceph_get_mds_session(session); 2425 if (queue_work(mdsc->fsc->cap_wq, 2426 &session->s_cap_release_work)) { 2427 doutc(cl, "cap release work queued\n"); 2428 } else { 2429 ceph_put_mds_session(session); 2430 doutc(cl, "failed to queue cap release work\n"); 2431 } 2432 } 2433 2434 /* 2435 * caller holds session->s_cap_lock 2436 */ 2437 void __ceph_queue_cap_release(struct ceph_mds_session *session, 2438 struct ceph_cap *cap) 2439 { 2440 list_add_tail(&cap->session_caps, &session->s_cap_releases); 2441 session->s_num_cap_releases++; 2442 2443 if (!(session->s_num_cap_releases % CEPH_CAPS_PER_RELEASE)) 2444 ceph_flush_session_cap_releases(session->s_mdsc, session); 2445 } 2446 2447 static void ceph_cap_reclaim_work(struct work_struct *work) 2448 { 2449 struct ceph_mds_client *mdsc = 2450 container_of(work, struct ceph_mds_client, cap_reclaim_work); 2451 int ret = ceph_trim_dentries(mdsc); 2452 if (ret == -EAGAIN) 2453 ceph_queue_cap_reclaim_work(mdsc); 2454 } 2455 2456 void ceph_queue_cap_reclaim_work(struct ceph_mds_client *mdsc) 2457 { 2458 struct ceph_client *cl = mdsc->fsc->client; 2459 if (mdsc->stopping) 2460 return; 2461 2462 if (queue_work(mdsc->fsc->cap_wq, &mdsc->cap_reclaim_work)) { 2463 doutc(cl, "caps reclaim work queued\n"); 2464 } else { 2465 doutc(cl, "failed to queue caps release work\n"); 2466 } 2467 } 2468 2469 void ceph_reclaim_caps_nr(struct ceph_mds_client *mdsc, int nr) 2470 { 2471 int val; 2472 if (!nr) 2473 return; 2474 val = atomic_add_return(nr, &mdsc->cap_reclaim_pending); 2475 if ((val % CEPH_CAPS_PER_RELEASE) < nr) { 2476 atomic_set(&mdsc->cap_reclaim_pending, 0); 2477 ceph_queue_cap_reclaim_work(mdsc); 2478 } 2479 } 2480 2481 void 
ceph_queue_cap_unlink_work(struct ceph_mds_client *mdsc) 2482 { 2483 struct ceph_client *cl = mdsc->fsc->client; 2484 if (mdsc->stopping) 2485 return; 2486 2487 if (queue_work(mdsc->fsc->cap_wq, &mdsc->cap_unlink_work)) { 2488 doutc(cl, "caps unlink work queued\n"); 2489 } else { 2490 doutc(cl, "failed to queue caps unlink work\n"); 2491 } 2492 } 2493 2494 static void ceph_cap_unlink_work(struct work_struct *work) 2495 { 2496 struct ceph_mds_client *mdsc = 2497 container_of(work, struct ceph_mds_client, cap_unlink_work); 2498 struct ceph_client *cl = mdsc->fsc->client; 2499 2500 doutc(cl, "begin\n"); 2501 spin_lock(&mdsc->cap_delay_lock); 2502 while (!list_empty(&mdsc->cap_unlink_delay_list)) { 2503 struct ceph_inode_info *ci; 2504 struct inode *inode; 2505 2506 ci = list_first_entry(&mdsc->cap_unlink_delay_list, 2507 struct ceph_inode_info, 2508 i_cap_delay_list); 2509 list_del_init(&ci->i_cap_delay_list); 2510 2511 inode = igrab(&ci->netfs.inode); 2512 if (inode) { 2513 spin_unlock(&mdsc->cap_delay_lock); 2514 doutc(cl, "on %p %llx.%llx\n", inode, 2515 ceph_vinop(inode)); 2516 ceph_check_caps(ci, CHECK_CAPS_FLUSH); 2517 iput(inode); 2518 spin_lock(&mdsc->cap_delay_lock); 2519 } 2520 } 2521 spin_unlock(&mdsc->cap_delay_lock); 2522 doutc(cl, "done\n"); 2523 } 2524 2525 /* 2526 * requests 2527 */ 2528 2529 int ceph_alloc_readdir_reply_buffer(struct ceph_mds_request *req, 2530 struct inode *dir) 2531 { 2532 struct ceph_inode_info *ci = ceph_inode(dir); 2533 struct ceph_mds_reply_info_parsed *rinfo = &req->r_reply_info; 2534 struct ceph_mount_options *opt = req->r_mdsc->fsc->mount_options; 2535 size_t size = sizeof(struct ceph_mds_reply_dir_entry); 2536 unsigned int num_entries; 2537 u64 bytes_count; 2538 int order; 2539 2540 spin_lock(&ci->i_ceph_lock); 2541 num_entries = ci->i_files + ci->i_subdirs; 2542 spin_unlock(&ci->i_ceph_lock); 2543 num_entries = max(num_entries, 1U); 2544 num_entries = min(num_entries, opt->max_readdir); 2545 2546 bytes_count = (u64)size * 
num_entries; 2547 if (unlikely(bytes_count > ULONG_MAX)) 2548 bytes_count = ULONG_MAX; 2549 2550 order = get_order((unsigned long)bytes_count); 2551 while (order >= 0) { 2552 rinfo->dir_entries = (void*)__get_free_pages(GFP_KERNEL | 2553 __GFP_NOWARN | 2554 __GFP_ZERO, 2555 order); 2556 if (rinfo->dir_entries) 2557 break; 2558 order--; 2559 } 2560 if (!rinfo->dir_entries || unlikely(order < 0)) 2561 return -ENOMEM; 2562 2563 num_entries = (PAGE_SIZE << order) / size; 2564 num_entries = min(num_entries, opt->max_readdir); 2565 2566 rinfo->dir_buf_size = PAGE_SIZE << order; 2567 req->r_num_caps = num_entries + 1; 2568 req->r_args.readdir.max_entries = cpu_to_le32(num_entries); 2569 req->r_args.readdir.max_bytes = cpu_to_le32(opt->max_readdir_bytes); 2570 return 0; 2571 } 2572 2573 /* 2574 * Create an mds request. 2575 */ 2576 struct ceph_mds_request * 2577 ceph_mdsc_create_request(struct ceph_mds_client *mdsc, int op, int mode) 2578 { 2579 struct ceph_mds_request *req; 2580 2581 req = kmem_cache_zalloc(ceph_mds_request_cachep, GFP_NOFS); 2582 if (!req) 2583 return ERR_PTR(-ENOMEM); 2584 2585 mutex_init(&req->r_fill_mutex); 2586 req->r_mdsc = mdsc; 2587 req->r_started = jiffies; 2588 req->r_start_latency = ktime_get(); 2589 req->r_resend_mds = -1; 2590 INIT_LIST_HEAD(&req->r_unsafe_dir_item); 2591 INIT_LIST_HEAD(&req->r_unsafe_target_item); 2592 req->r_fmode = -1; 2593 req->r_feature_needed = -1; 2594 kref_init(&req->r_kref); 2595 RB_CLEAR_NODE(&req->r_node); 2596 INIT_LIST_HEAD(&req->r_wait); 2597 init_completion(&req->r_completion); 2598 init_completion(&req->r_safe_completion); 2599 INIT_LIST_HEAD(&req->r_unsafe_item); 2600 2601 ktime_get_coarse_real_ts64(&req->r_stamp); 2602 2603 req->r_op = op; 2604 req->r_direct_mode = mode; 2605 return req; 2606 } 2607 2608 /* 2609 * return oldest (lowest) request, tid in request tree, 0 if none. 2610 * 2611 * called under mdsc->mutex. 
2612 */ 2613 static struct ceph_mds_request *__get_oldest_req(struct ceph_mds_client *mdsc) 2614 { 2615 if (RB_EMPTY_ROOT(&mdsc->request_tree)) 2616 return NULL; 2617 return rb_entry(rb_first(&mdsc->request_tree), 2618 struct ceph_mds_request, r_node); 2619 } 2620 2621 static inline u64 __get_oldest_tid(struct ceph_mds_client *mdsc) 2622 { 2623 return mdsc->oldest_tid; 2624 } 2625 2626 #if IS_ENABLED(CONFIG_FS_ENCRYPTION) 2627 static u8 *get_fscrypt_altname(const struct ceph_mds_request *req, u32 *plen) 2628 { 2629 struct inode *dir = req->r_parent; 2630 struct dentry *dentry = req->r_dentry; 2631 const struct qstr *name = req->r_dname; 2632 u8 *cryptbuf = NULL; 2633 u32 len = 0; 2634 int ret = 0; 2635 2636 /* only encode if we have parent and dentry */ 2637 if (!dir || !dentry) 2638 goto success; 2639 2640 /* No-op unless this is encrypted */ 2641 if (!IS_ENCRYPTED(dir)) 2642 goto success; 2643 2644 ret = ceph_fscrypt_prepare_readdir(dir); 2645 if (ret < 0) 2646 return ERR_PTR(ret); 2647 2648 /* No key? Just ignore it. 
*/ 2649 if (!fscrypt_has_encryption_key(dir)) 2650 goto success; 2651 2652 if (!name) 2653 name = &dentry->d_name; 2654 2655 if (!fscrypt_fname_encrypted_size(dir, name->len, NAME_MAX, &len)) { 2656 WARN_ON_ONCE(1); 2657 return ERR_PTR(-ENAMETOOLONG); 2658 } 2659 2660 /* No need to append altname if name is short enough */ 2661 if (len <= CEPH_NOHASH_NAME_MAX) { 2662 len = 0; 2663 goto success; 2664 } 2665 2666 cryptbuf = kmalloc(len, GFP_KERNEL); 2667 if (!cryptbuf) 2668 return ERR_PTR(-ENOMEM); 2669 2670 ret = fscrypt_fname_encrypt(dir, name, cryptbuf, len); 2671 if (ret) { 2672 kfree(cryptbuf); 2673 return ERR_PTR(ret); 2674 } 2675 success: 2676 *plen = len; 2677 return cryptbuf; 2678 } 2679 #else 2680 static u8 *get_fscrypt_altname(const struct ceph_mds_request *req, u32 *plen) 2681 { 2682 *plen = 0; 2683 return NULL; 2684 } 2685 #endif 2686 2687 /** 2688 * ceph_mdsc_build_path - build a path string to a given dentry 2689 * @mdsc: mds client 2690 * @dentry: dentry to which path should be built 2691 * @path_info: output path, length, base ino+snap, and freepath ownership flag 2692 * @for_wire: is this path going to be sent to the MDS? 2693 * 2694 * Build a string that represents the path to the dentry. This is mostly called 2695 * for two different purposes: 2696 * 2697 * 1) we need to build a path string to send to the MDS (for_wire == true) 2698 * 2) we need a path string for local presentation (e.g. debugfs) 2699 * (for_wire == false) 2700 * 2701 * The path is built in reverse, starting with the dentry. Walk back up toward 2702 * the root, building the path until the first non-snapped inode is reached 2703 * (for_wire) or the root inode is reached (!for_wire). 2704 * 2705 * Encode hidden .snap dirs as a double /, i.e. 
2706 * foo/.snap/bar -> foo//bar 2707 */ 2708 char *ceph_mdsc_build_path(struct ceph_mds_client *mdsc, struct dentry *dentry, 2709 struct ceph_path_info *path_info, int for_wire) 2710 { 2711 struct ceph_client *cl = mdsc->fsc->client; 2712 struct dentry *cur; 2713 struct inode *inode; 2714 char *path; 2715 int pos; 2716 unsigned seq; 2717 u64 base; 2718 2719 if (!dentry) 2720 return ERR_PTR(-EINVAL); 2721 2722 path = __getname(); 2723 if (!path) 2724 return ERR_PTR(-ENOMEM); 2725 retry: 2726 pos = PATH_MAX - 1; 2727 path[pos] = '\0'; 2728 2729 seq = read_seqbegin(&rename_lock); 2730 cur = dget(dentry); 2731 for (;;) { 2732 struct dentry *parent; 2733 2734 spin_lock(&cur->d_lock); 2735 inode = d_inode(cur); 2736 if (inode && ceph_snap(inode) == CEPH_SNAPDIR) { 2737 doutc(cl, "path+%d: %p SNAPDIR\n", pos, cur); 2738 spin_unlock(&cur->d_lock); 2739 parent = dget_parent(cur); 2740 } else if (for_wire && inode && dentry != cur && 2741 ceph_snap(inode) == CEPH_NOSNAP) { 2742 spin_unlock(&cur->d_lock); 2743 pos++; /* get rid of any prepended '/' */ 2744 break; 2745 } else if (!for_wire || !IS_ENCRYPTED(d_inode(cur->d_parent))) { 2746 pos -= cur->d_name.len; 2747 if (pos < 0) { 2748 spin_unlock(&cur->d_lock); 2749 break; 2750 } 2751 memcpy(path + pos, cur->d_name.name, cur->d_name.len); 2752 spin_unlock(&cur->d_lock); 2753 parent = dget_parent(cur); 2754 } else { 2755 int len, ret; 2756 char buf[NAME_MAX]; 2757 2758 /* 2759 * Proactively copy name into buf, in case we need to 2760 * present it as-is. 
2761 */ 2762 memcpy(buf, cur->d_name.name, cur->d_name.len); 2763 len = cur->d_name.len; 2764 spin_unlock(&cur->d_lock); 2765 parent = dget_parent(cur); 2766 2767 ret = ceph_fscrypt_prepare_readdir(d_inode(parent)); 2768 if (ret < 0) { 2769 dput(parent); 2770 dput(cur); 2771 __putname(path); 2772 return ERR_PTR(ret); 2773 } 2774 2775 if (fscrypt_has_encryption_key(d_inode(parent))) { 2776 len = ceph_encode_encrypted_dname(d_inode(parent), 2777 buf, len); 2778 if (len < 0) { 2779 dput(parent); 2780 dput(cur); 2781 __putname(path); 2782 return ERR_PTR(len); 2783 } 2784 } 2785 pos -= len; 2786 if (pos < 0) { 2787 dput(parent); 2788 break; 2789 } 2790 memcpy(path + pos, buf, len); 2791 } 2792 dput(cur); 2793 cur = parent; 2794 2795 /* Are we at the root? */ 2796 if (IS_ROOT(cur)) 2797 break; 2798 2799 /* Are we out of buffer? */ 2800 if (--pos < 0) 2801 break; 2802 2803 path[pos] = '/'; 2804 } 2805 inode = d_inode(cur); 2806 base = inode ? ceph_ino(inode) : 0; 2807 dput(cur); 2808 2809 if (read_seqretry(&rename_lock, seq)) 2810 goto retry; 2811 2812 if (pos < 0) { 2813 /* 2814 * The path is longer than PATH_MAX and this function 2815 * cannot ever succeed. Creating paths that long is 2816 * possible with Ceph, but Linux cannot use them. 
2817 */ 2818 __putname(path); 2819 return ERR_PTR(-ENAMETOOLONG); 2820 } 2821 2822 /* Initialize the output structure */ 2823 memset(path_info, 0, sizeof(*path_info)); 2824 2825 path_info->vino.ino = base; 2826 path_info->pathlen = PATH_MAX - 1 - pos; 2827 path_info->path = path + pos; 2828 path_info->freepath = true; 2829 2830 /* Set snap from dentry if available */ 2831 if (d_inode(dentry)) 2832 path_info->vino.snap = ceph_snap(d_inode(dentry)); 2833 else 2834 path_info->vino.snap = CEPH_NOSNAP; 2835 2836 doutc(cl, "on %p %d built %llx '%.*s'\n", dentry, d_count(dentry), 2837 base, PATH_MAX - 1 - pos, path + pos); 2838 return path + pos; 2839 } 2840 2841 static int build_dentry_path(struct ceph_mds_client *mdsc, struct dentry *dentry, 2842 struct inode *dir, struct ceph_path_info *path_info, 2843 bool parent_locked) 2844 { 2845 char *path; 2846 2847 rcu_read_lock(); 2848 if (!dir) 2849 dir = d_inode_rcu(dentry->d_parent); 2850 if (dir && parent_locked && ceph_snap(dir) == CEPH_NOSNAP && 2851 !IS_ENCRYPTED(dir)) { 2852 path_info->vino.ino = ceph_ino(dir); 2853 path_info->vino.snap = ceph_snap(dir); 2854 rcu_read_unlock(); 2855 path_info->path = dentry->d_name.name; 2856 path_info->pathlen = dentry->d_name.len; 2857 path_info->freepath = false; 2858 return 0; 2859 } 2860 rcu_read_unlock(); 2861 path = ceph_mdsc_build_path(mdsc, dentry, path_info, 1); 2862 if (IS_ERR(path)) 2863 return PTR_ERR(path); 2864 /* 2865 * ceph_mdsc_build_path already fills path_info, including snap handling. 
2866 */ 2867 return 0; 2868 } 2869 2870 static int build_inode_path(struct inode *inode, struct ceph_path_info *path_info) 2871 { 2872 struct ceph_mds_client *mdsc = ceph_sb_to_mdsc(inode->i_sb); 2873 struct dentry *dentry; 2874 char *path; 2875 2876 if (ceph_snap(inode) == CEPH_NOSNAP) { 2877 path_info->vino.ino = ceph_ino(inode); 2878 path_info->vino.snap = ceph_snap(inode); 2879 path_info->pathlen = 0; 2880 path_info->freepath = false; 2881 return 0; 2882 } 2883 dentry = d_find_alias(inode); 2884 path = ceph_mdsc_build_path(mdsc, dentry, path_info, 1); 2885 dput(dentry); 2886 if (IS_ERR(path)) 2887 return PTR_ERR(path); 2888 /* 2889 * ceph_mdsc_build_path already fills path_info, including snap from dentry. 2890 * Override with inode's snap since that's what this function is for. 2891 */ 2892 path_info->vino.snap = ceph_snap(inode); 2893 return 0; 2894 } 2895 2896 /* 2897 * request arguments may be specified via an inode *, a dentry *, or 2898 * an explicit ino+path. 2899 */ 2900 static int set_request_path_attr(struct ceph_mds_client *mdsc, struct inode *rinode, 2901 struct dentry *rdentry, struct inode *rdiri, 2902 const char *rpath, u64 rino, 2903 struct ceph_path_info *path_info, 2904 bool parent_locked) 2905 { 2906 struct ceph_client *cl = mdsc->fsc->client; 2907 int r = 0; 2908 2909 /* Initialize the output structure */ 2910 memset(path_info, 0, sizeof(*path_info)); 2911 2912 if (rinode) { 2913 r = build_inode_path(rinode, path_info); 2914 doutc(cl, " inode %p %llx.%llx\n", rinode, ceph_ino(rinode), 2915 ceph_snap(rinode)); 2916 } else if (rdentry) { 2917 r = build_dentry_path(mdsc, rdentry, rdiri, path_info, parent_locked); 2918 doutc(cl, " dentry %p %llx/%.*s\n", rdentry, path_info->vino.ino, 2919 path_info->pathlen, path_info->path); 2920 } else if (rpath || rino) { 2921 path_info->vino.ino = rino; 2922 path_info->vino.snap = CEPH_NOSNAP; 2923 path_info->path = rpath; 2924 path_info->pathlen = rpath ? 
strlen(rpath) : 0; 2925 path_info->freepath = false; 2926 2927 doutc(cl, " path %.*s\n", path_info->pathlen, rpath); 2928 } 2929 2930 return r; 2931 } 2932 2933 static void encode_mclientrequest_tail(void **p, 2934 const struct ceph_mds_request *req) 2935 { 2936 struct ceph_timespec ts; 2937 int i; 2938 2939 ceph_encode_timespec64(&ts, &req->r_stamp); 2940 ceph_encode_copy(p, &ts, sizeof(ts)); 2941 2942 /* v4: gid_list */ 2943 ceph_encode_32(p, req->r_cred->group_info->ngroups); 2944 for (i = 0; i < req->r_cred->group_info->ngroups; i++) 2945 ceph_encode_64(p, from_kgid(&init_user_ns, 2946 req->r_cred->group_info->gid[i])); 2947 2948 /* v5: altname */ 2949 ceph_encode_32(p, req->r_altname_len); 2950 ceph_encode_copy(p, req->r_altname, req->r_altname_len); 2951 2952 /* v6: fscrypt_auth and fscrypt_file */ 2953 if (req->r_fscrypt_auth) { 2954 u32 authlen = ceph_fscrypt_auth_len(req->r_fscrypt_auth); 2955 2956 ceph_encode_32(p, authlen); 2957 ceph_encode_copy(p, req->r_fscrypt_auth, authlen); 2958 } else { 2959 ceph_encode_32(p, 0); 2960 } 2961 if (test_bit(CEPH_MDS_R_FSCRYPT_FILE, &req->r_req_flags)) { 2962 ceph_encode_32(p, sizeof(__le64)); 2963 ceph_encode_64(p, req->r_fscrypt_file); 2964 } else { 2965 ceph_encode_32(p, 0); 2966 } 2967 } 2968 2969 static inline u16 mds_supported_head_version(struct ceph_mds_session *session) 2970 { 2971 if (!test_bit(CEPHFS_FEATURE_32BITS_RETRY_FWD, &session->s_features)) 2972 return 1; 2973 2974 if (!test_bit(CEPHFS_FEATURE_HAS_OWNER_UIDGID, &session->s_features)) 2975 return 2; 2976 2977 return CEPH_MDS_REQUEST_HEAD_VERSION; 2978 } 2979 2980 static struct ceph_mds_request_head_legacy * 2981 find_legacy_request_head(void *p, u64 features) 2982 { 2983 bool legacy = !(features & CEPH_FEATURE_FS_BTIME); 2984 struct ceph_mds_request_head *head; 2985 2986 if (legacy) 2987 return (struct ceph_mds_request_head_legacy *)p; 2988 head = (struct ceph_mds_request_head *)p; 2989 return (struct ceph_mds_request_head_legacy 
*)&head->oldest_client_tid; 2990 } 2991 2992 /* 2993 * called under mdsc->mutex 2994 */ 2995 static struct ceph_msg *create_request_message(struct ceph_mds_session *session, 2996 struct ceph_mds_request *req, 2997 bool drop_cap_releases) 2998 { 2999 int mds = session->s_mds; 3000 struct ceph_mds_client *mdsc = session->s_mdsc; 3001 struct ceph_client *cl = mdsc->fsc->client; 3002 struct ceph_msg *msg; 3003 struct ceph_mds_request_head_legacy *lhead; 3004 struct ceph_path_info path_info1 = {0}; 3005 struct ceph_path_info path_info2 = {0}; 3006 struct dentry *old_dentry = NULL; 3007 int len; 3008 u16 releases; 3009 void *p, *end; 3010 int ret; 3011 bool legacy = !(session->s_con.peer_features & CEPH_FEATURE_FS_BTIME); 3012 u16 request_head_version = mds_supported_head_version(session); 3013 kuid_t caller_fsuid = req->r_cred->fsuid; 3014 kgid_t caller_fsgid = req->r_cred->fsgid; 3015 bool parent_locked = test_bit(CEPH_MDS_R_PARENT_LOCKED, &req->r_req_flags); 3016 3017 ret = set_request_path_attr(mdsc, req->r_inode, req->r_dentry, 3018 req->r_parent, req->r_path1, req->r_ino1.ino, 3019 &path_info1, parent_locked); 3020 if (ret < 0) { 3021 msg = ERR_PTR(ret); 3022 goto out; 3023 } 3024 3025 /* 3026 * When the parent directory's i_rwsem is *not* locked, req->r_parent may 3027 * have become stale (e.g. after a concurrent rename) between the time the 3028 * dentry was looked up and now. If we detect that the stored r_parent 3029 * does not match the inode number we just encoded for the request, switch 3030 * to the correct inode so that the MDS receives a valid parent reference. 
3031 */ 3032 if (!parent_locked && req->r_parent && path_info1.vino.ino && 3033 ceph_ino(req->r_parent) != path_info1.vino.ino) { 3034 struct inode *old_parent = req->r_parent; 3035 struct inode *correct_dir = ceph_get_inode(mdsc->fsc->sb, path_info1.vino, NULL); 3036 if (!IS_ERR(correct_dir)) { 3037 WARN_ONCE(1, "ceph: r_parent mismatch (had %llx wanted %llx) - updating\n", 3038 ceph_ino(old_parent), path_info1.vino.ino); 3039 /* 3040 * Transfer CEPH_CAP_PIN from the old parent to the new one. 3041 * The pin was taken earlier in ceph_mdsc_submit_request(). 3042 */ 3043 ceph_put_cap_refs(ceph_inode(old_parent), CEPH_CAP_PIN); 3044 iput(old_parent); 3045 req->r_parent = correct_dir; 3046 ceph_get_cap_refs(ceph_inode(req->r_parent), CEPH_CAP_PIN); 3047 } 3048 } 3049 3050 /* If r_old_dentry is set, then assume that its parent is locked */ 3051 if (req->r_old_dentry && 3052 !(req->r_old_dentry->d_flags & DCACHE_DISCONNECTED)) 3053 old_dentry = req->r_old_dentry; 3054 ret = set_request_path_attr(mdsc, NULL, old_dentry, 3055 req->r_old_dentry_dir, 3056 req->r_path2, req->r_ino2.ino, 3057 &path_info2, true); 3058 if (ret < 0) { 3059 msg = ERR_PTR(ret); 3060 goto out_free1; 3061 } 3062 3063 req->r_altname = get_fscrypt_altname(req, &req->r_altname_len); 3064 if (IS_ERR(req->r_altname)) { 3065 msg = ERR_CAST(req->r_altname); 3066 req->r_altname = NULL; 3067 goto out_free2; 3068 } 3069 3070 /* 3071 * For old cephs without supporting the 32bit retry/fwd feature 3072 * it will copy the raw memories directly when decoding the 3073 * requests. While new cephs will decode the head depending the 3074 * version member, so we need to make sure it will be compatible 3075 * with them both. 
3076 */ 3077 if (legacy) 3078 len = sizeof(struct ceph_mds_request_head_legacy); 3079 else if (request_head_version == 1) 3080 len = offsetofend(struct ceph_mds_request_head, args); 3081 else if (request_head_version == 2) 3082 len = offsetofend(struct ceph_mds_request_head, ext_num_fwd); 3083 else 3084 len = sizeof(struct ceph_mds_request_head); 3085 3086 /* filepaths */ 3087 len += 2 * (1 + sizeof(u32) + sizeof(u64)); 3088 len += path_info1.pathlen + path_info2.pathlen; 3089 3090 /* cap releases */ 3091 len += sizeof(struct ceph_mds_request_release) * 3092 (!!req->r_inode_drop + !!req->r_dentry_drop + 3093 !!req->r_old_inode_drop + !!req->r_old_dentry_drop); 3094 3095 if (req->r_dentry_drop) 3096 len += path_info1.pathlen; 3097 if (req->r_old_dentry_drop) 3098 len += path_info2.pathlen; 3099 3100 /* MClientRequest tail */ 3101 3102 /* req->r_stamp */ 3103 len += sizeof(struct ceph_timespec); 3104 3105 /* gid list */ 3106 len += sizeof(u32) + (sizeof(u64) * req->r_cred->group_info->ngroups); 3107 3108 /* alternate name */ 3109 len += sizeof(u32) + req->r_altname_len; 3110 3111 /* fscrypt_auth */ 3112 len += sizeof(u32); // fscrypt_auth 3113 if (req->r_fscrypt_auth) 3114 len += ceph_fscrypt_auth_len(req->r_fscrypt_auth); 3115 3116 /* fscrypt_file */ 3117 len += sizeof(u32); 3118 if (test_bit(CEPH_MDS_R_FSCRYPT_FILE, &req->r_req_flags)) 3119 len += sizeof(__le64); 3120 3121 msg = ceph_msg_new2(CEPH_MSG_CLIENT_REQUEST, len, 1, GFP_NOFS, false); 3122 if (!msg) { 3123 msg = ERR_PTR(-ENOMEM); 3124 goto out_free2; 3125 } 3126 3127 msg->hdr.tid = cpu_to_le64(req->r_tid); 3128 3129 lhead = find_legacy_request_head(msg->front.iov_base, 3130 session->s_con.peer_features); 3131 3132 if ((req->r_mnt_idmap != &nop_mnt_idmap) && 3133 !test_bit(CEPHFS_FEATURE_HAS_OWNER_UIDGID, &session->s_features)) { 3134 WARN_ON_ONCE(!IS_CEPH_MDS_OP_NEWINODE(req->r_op)); 3135 3136 if (enable_unsafe_idmap) { 3137 pr_warn_once_client(cl, 3138 "idmapped mount is used and 
CEPHFS_FEATURE_HAS_OWNER_UIDGID" 3139 " is not supported by MDS. UID/GID-based restrictions may" 3140 " not work properly.\n"); 3141 3142 caller_fsuid = from_vfsuid(req->r_mnt_idmap, &init_user_ns, 3143 VFSUIDT_INIT(req->r_cred->fsuid)); 3144 caller_fsgid = from_vfsgid(req->r_mnt_idmap, &init_user_ns, 3145 VFSGIDT_INIT(req->r_cred->fsgid)); 3146 } else { 3147 pr_err_ratelimited_client(cl, 3148 "idmapped mount is used and CEPHFS_FEATURE_HAS_OWNER_UIDGID" 3149 " is not supported by MDS. Fail request with -EIO.\n"); 3150 3151 ret = -EIO; 3152 goto out_err; 3153 } 3154 } 3155 3156 /* 3157 * The ceph_mds_request_head_legacy didn't contain a version field, and 3158 * one was added when we moved the message version from 3->4. 3159 */ 3160 if (legacy) { 3161 msg->hdr.version = cpu_to_le16(3); 3162 p = msg->front.iov_base + sizeof(*lhead); 3163 } else if (request_head_version == 1) { 3164 struct ceph_mds_request_head *nhead = msg->front.iov_base; 3165 3166 msg->hdr.version = cpu_to_le16(4); 3167 nhead->version = cpu_to_le16(1); 3168 p = msg->front.iov_base + offsetofend(struct ceph_mds_request_head, args); 3169 } else if (request_head_version == 2) { 3170 struct ceph_mds_request_head *nhead = msg->front.iov_base; 3171 3172 msg->hdr.version = cpu_to_le16(6); 3173 nhead->version = cpu_to_le16(2); 3174 3175 p = msg->front.iov_base + offsetofend(struct ceph_mds_request_head, ext_num_fwd); 3176 } else { 3177 struct ceph_mds_request_head *nhead = msg->front.iov_base; 3178 kuid_t owner_fsuid; 3179 kgid_t owner_fsgid; 3180 3181 msg->hdr.version = cpu_to_le16(6); 3182 nhead->version = cpu_to_le16(CEPH_MDS_REQUEST_HEAD_VERSION); 3183 nhead->struct_len = cpu_to_le32(sizeof(struct ceph_mds_request_head)); 3184 3185 if (IS_CEPH_MDS_OP_NEWINODE(req->r_op)) { 3186 owner_fsuid = from_vfsuid(req->r_mnt_idmap, &init_user_ns, 3187 VFSUIDT_INIT(req->r_cred->fsuid)); 3188 owner_fsgid = from_vfsgid(req->r_mnt_idmap, &init_user_ns, 3189 VFSGIDT_INIT(req->r_cred->fsgid)); 3190 nhead->owner_uid = 
cpu_to_le32(from_kuid(&init_user_ns, owner_fsuid)); 3191 nhead->owner_gid = cpu_to_le32(from_kgid(&init_user_ns, owner_fsgid)); 3192 } else { 3193 nhead->owner_uid = cpu_to_le32(-1); 3194 nhead->owner_gid = cpu_to_le32(-1); 3195 } 3196 3197 p = msg->front.iov_base + sizeof(*nhead); 3198 } 3199 3200 end = msg->front.iov_base + msg->front.iov_len; 3201 3202 lhead->mdsmap_epoch = cpu_to_le32(mdsc->mdsmap->m_epoch); 3203 lhead->op = cpu_to_le32(req->r_op); 3204 lhead->caller_uid = cpu_to_le32(from_kuid(&init_user_ns, 3205 caller_fsuid)); 3206 lhead->caller_gid = cpu_to_le32(from_kgid(&init_user_ns, 3207 caller_fsgid)); 3208 lhead->ino = cpu_to_le64(req->r_deleg_ino); 3209 lhead->args = req->r_args; 3210 3211 ceph_encode_filepath(&p, end, path_info1.vino.ino, path_info1.path); 3212 ceph_encode_filepath(&p, end, path_info2.vino.ino, path_info2.path); 3213 3214 /* make note of release offset, in case we need to replay */ 3215 req->r_request_release_offset = p - msg->front.iov_base; 3216 3217 /* cap releases */ 3218 releases = 0; 3219 if (req->r_inode_drop) 3220 releases += ceph_encode_inode_release(&p, 3221 req->r_inode ? 
req->r_inode : d_inode(req->r_dentry), 3222 mds, req->r_inode_drop, req->r_inode_unless, 3223 req->r_op == CEPH_MDS_OP_READDIR); 3224 if (req->r_dentry_drop) { 3225 ret = ceph_encode_dentry_release(&p, req->r_dentry, 3226 req->r_parent, mds, req->r_dentry_drop, 3227 req->r_dentry_unless); 3228 if (ret < 0) 3229 goto out_err; 3230 releases += ret; 3231 } 3232 if (req->r_old_dentry_drop) { 3233 ret = ceph_encode_dentry_release(&p, req->r_old_dentry, 3234 req->r_old_dentry_dir, mds, 3235 req->r_old_dentry_drop, 3236 req->r_old_dentry_unless); 3237 if (ret < 0) 3238 goto out_err; 3239 releases += ret; 3240 } 3241 if (req->r_old_inode_drop) 3242 releases += ceph_encode_inode_release(&p, 3243 d_inode(req->r_old_dentry), 3244 mds, req->r_old_inode_drop, req->r_old_inode_unless, 0); 3245 3246 if (drop_cap_releases) { 3247 releases = 0; 3248 p = msg->front.iov_base + req->r_request_release_offset; 3249 } 3250 3251 lhead->num_releases = cpu_to_le16(releases); 3252 3253 encode_mclientrequest_tail(&p, req); 3254 3255 if (WARN_ON_ONCE(p > end)) { 3256 ceph_msg_put(msg); 3257 msg = ERR_PTR(-ERANGE); 3258 goto out_free2; 3259 } 3260 3261 msg->front.iov_len = p - msg->front.iov_base; 3262 msg->hdr.front_len = cpu_to_le32(msg->front.iov_len); 3263 3264 if (req->r_pagelist) { 3265 struct ceph_pagelist *pagelist = req->r_pagelist; 3266 ceph_msg_data_add_pagelist(msg, pagelist); 3267 msg->hdr.data_len = cpu_to_le32(pagelist->length); 3268 } else { 3269 msg->hdr.data_len = 0; 3270 } 3271 3272 msg->hdr.data_off = cpu_to_le16(0); 3273 3274 out_free2: 3275 ceph_mdsc_free_path_info(&path_info2); 3276 out_free1: 3277 ceph_mdsc_free_path_info(&path_info1); 3278 out: 3279 return msg; 3280 out_err: 3281 ceph_msg_put(msg); 3282 msg = ERR_PTR(ret); 3283 goto out_free2; 3284 } 3285 3286 /* 3287 * called under mdsc->mutex if error, under no mutex if 3288 * success. 
 *
 * Finish @req: record r_end_latency, fire the completion tracepoint,
 * run the optional r_callback and then complete r_completion for all
 * waiters.
 */
static void complete_request(struct ceph_mds_client *mdsc,
			     struct ceph_mds_request *req)
{
	req->r_end_latency = ktime_get();

	trace_ceph_mdsc_complete_request(mdsc, req);

	if (req->r_callback)
		req->r_callback(mdsc, req);
	complete_all(&req->r_completion);
}

/*
 * Build (or, for a replay, patch up in place) the message for the next
 * transmission of @req on @session and leave it in req->r_request.
 *
 * Returns 0 on success, -EMULTIHOP when the retry counter would
 * overflow, or the error from create_request_message() (which is also
 * stored in req->r_err).
 *
 * called under mdsc->mutex
 */
static int __prepare_send_request(struct ceph_mds_session *session,
				  struct ceph_mds_request *req,
				  bool drop_cap_releases)
{
	int mds = session->s_mds;
	struct ceph_mds_client *mdsc = session->s_mdsc;
	struct ceph_client *cl = mdsc->fsc->client;
	struct ceph_mds_request_head_legacy *lhead;
	struct ceph_mds_request_head *nhead;
	struct ceph_msg *msg;
	int flags = 0, old_max_retry;
	bool old_version = !test_bit(CEPHFS_FEATURE_32BITS_RETRY_FWD,
				     &session->s_features);

	/*
	 * Avoid infinite retrying after overflow.  The client bumps the
	 * retry count on every attempt; an MDS without the 32-bit
	 * retry/forward feature only sees the narrow num_retry field, so
	 * for such an MDS we stop once that field would wrap (at most 256
	 * retries).  The 32-bit counter is capped as well.
	 */
	if (req->r_attempts) {
		/* number of values representable in the legacy field */
		old_max_retry = sizeof_field(struct ceph_mds_request_head,
					    num_retry);
		old_max_retry = 1 << (old_max_retry * BITS_PER_BYTE);
		if ((old_version && req->r_attempts >= old_max_retry) ||
		    ((uint32_t)req->r_attempts >= U32_MAX)) {
			pr_warn_ratelimited_client(cl, "request tid %llu seq overflow\n",
						   req->r_tid);
			return -EMULTIHOP;
		}
	}

	req->r_attempts++;
	if (req->r_inode) {
		struct ceph_cap *cap =
			ceph_get_cap_for_mds(ceph_inode(req->r_inode), mds);

		/* remember the cap mseq at send time, or -1 without a cap */
		if (cap)
			req->r_sent_on_mseq = cap->mseq;
		else
			req->r_sent_on_mseq = -1;
	}
	doutc(cl, "%p tid %lld %s (attempt %d)\n", req, req->r_tid,
	      ceph_mds_op_name(req->r_op), req->r_attempts);

	if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) {
		void *p;

		/*
		 * Replay.  Do not regenerate message (and rebuild
		 * paths, etc.); just use the original message.
		 * Rebuilding paths will break for renames because
		 * d_move mangles the src name.
		 */
		msg = req->r_request;
		lhead = find_legacy_request_head(msg->front.iov_base,
						 session->s_con.peer_features);

		flags = le32_to_cpu(lhead->flags);
		flags |= CEPH_MDS_FLAG_REPLAY;
		lhead->flags = cpu_to_le32(flags);

		if (req->r_target_inode)
			lhead->ino = cpu_to_le64(ceph_ino(req->r_target_inode));

		lhead->num_retry = req->r_attempts - 1;
		if (!old_version) {
			nhead = (struct ceph_mds_request_head*)msg->front.iov_base;
			nhead->ext_num_retry = cpu_to_le32(req->r_attempts - 1);
		}

		/* remove cap/dentry releases from message */
		lhead->num_releases = 0;

		/* re-encode the tail right where the releases used to start */
		p = msg->front.iov_base + req->r_request_release_offset;
		encode_mclientrequest_tail(&p, req);

		msg->front.iov_len = p - msg->front.iov_base;
		msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
		return 0;
	}

	/* not a replay: drop any previous message and build a fresh one */
	if (req->r_request) {
		ceph_msg_put(req->r_request);
		req->r_request = NULL;
	}
	msg = create_request_message(session, req, drop_cap_releases);
	if (IS_ERR(msg)) {
		req->r_err = PTR_ERR(msg);
		return PTR_ERR(msg);
	}
	req->r_request = msg;

	lhead = find_legacy_request_head(msg->front.iov_base,
					 session->s_con.peer_features);
	lhead->oldest_client_tid = cpu_to_le64(__get_oldest_tid(mdsc));
	/*
	 * NOTE(review): the GOT_UNSAFE case already returned above, so this
	 * test looks unreachable unless the flag can be set in between --
	 * confirm before relying on it.
	 */
	if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags))
		flags |= CEPH_MDS_FLAG_REPLAY;
	if (test_bit(CEPH_MDS_R_ASYNC, &req->r_req_flags))
		flags |= CEPH_MDS_FLAG_ASYNC;
	if (req->r_parent)
		flags |= CEPH_MDS_FLAG_WANT_DENTRY;
	lhead->flags = cpu_to_le32(flags);
	lhead->num_fwd = req->r_num_fwd;
	lhead->num_retry = req->r_attempts - 1;
	if (!old_version) {
		/* the new head also carries the full 32-bit counters */
		nhead = (struct ceph_mds_request_head*)msg->front.iov_base;
		nhead->ext_num_fwd = cpu_to_le32(req->r_num_fwd);
		nhead->ext_num_retry = cpu_to_le32(req->r_attempts - 1);
	}

	doutc(cl, " r_parent = %p\n", req->r_parent);
	return 0;
}

/*
 * Prepare @req for @session and, if that succeeded, hand the message to
 * the session's connection.  An additional reference on the message is
 * taken before it is passed to ceph_con_send().
 *
 * Returns the result of __prepare_send_request().
 *
 * called under mdsc->mutex
 */
static int __send_request(struct ceph_mds_session *session,
			  struct ceph_mds_request *req,
			  bool drop_cap_releases)
{
	int err;

	trace_ceph_mdsc_send_request(session, req);

	err = __prepare_send_request(session, req, drop_cap_releases);
	if (!err) {
		ceph_msg_get(req->r_request);
		ceph_con_send(&session->s_con, req->r_request);
	}

	return err;
}

/*
 * send request, or put it on the appropriate wait list.
 *
 * A request is parked on mdsc->waiting_for_map when no usable MDS is
 * known, or on session->s_waiting when the chosen session is not open
 * yet.  On any error the request is failed right here: r_err is set, the
 * request is completed and then unregistered.
 *
 * ceph_mdsc_submit_request() calls this with mdsc->mutex held.
 */
static void __do_request(struct ceph_mds_client *mdsc,
			struct ceph_mds_request *req)
{
	struct ceph_client *cl = mdsc->fsc->client;
	struct ceph_mds_session *session = NULL;
	int mds = -1;
	int err = 0;
	bool random;

	/* already failed or answered: nothing to send */
	if (req->r_err || test_bit(CEPH_MDS_R_GOT_RESULT, &req->r_req_flags)) {
		if (test_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags))
			__unregister_request(mdsc, req);
		return;
	}

	if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_FENCE_IO) {
		doutc(cl, "metadata corrupted\n");
		err = -EIO;
		goto finish;
	}
	if (req->r_timeout &&
	    time_after_eq(jiffies, req->r_started + req->r_timeout)) {
		doutc(cl, "timed out\n");
		err = -ETIMEDOUT;
		goto finish;
	}
	if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_SHUTDOWN) {
		doutc(cl, "forced umount\n");
		err = -EIO;
		goto finish;
	}
	if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_MOUNTING) {
		if (mdsc->mdsmap_err) {
			err = mdsc->mdsmap_err;
			doutc(cl, "mdsmap err %d\n", err);
			goto finish;
		}
		if (mdsc->mdsmap->m_epoch == 0) {
			doutc(cl, "no mdsmap, waiting for map\n");
			trace_ceph_mdsc_suspend_request(mdsc, session, req,
							ceph_mdsc_suspend_reason_no_mdsmap);
			list_add(&req->r_wait, &mdsc->waiting_for_map);
			return;
		}
		if (!(mdsc->fsc->mount_options->flags &
		      CEPH_MOUNT_OPT_MOUNTWAIT) &&
		    !ceph_mdsmap_is_cluster_available(mdsc->mdsmap)) {
			err = -EHOSTUNREACH;
			goto finish;
		}
	}

	/* drop the session of a previous attempt before choosing again */
	put_request_session(req);

	mds = __choose_mds(mdsc, req, &random);
	if (mds < 0 ||
	    ceph_mdsmap_get_state(mdsc->mdsmap, mds) < CEPH_MDS_STATE_ACTIVE) {
		/* async requests are never queued, see below */
		if (test_bit(CEPH_MDS_R_ASYNC, &req->r_req_flags)) {
			err = -EJUKEBOX;
			goto finish;
		}
		doutc(cl, "no mds or not active, waiting for map\n");
		trace_ceph_mdsc_suspend_request(mdsc, session, req,
						ceph_mdsc_suspend_reason_no_active_mds);
		list_add(&req->r_wait, &mdsc->waiting_for_map);
		return;
	}

	/* get, open session */
	session = __ceph_lookup_mds_session(mdsc, mds);
	if (!session) {
		session = register_session(mdsc, mds);
		if (IS_ERR(session)) {
			err = PTR_ERR(session);
			goto finish;
		}
	}
	/* the request holds its own session reference in r_session */
	req->r_session = ceph_get_mds_session(session);

	doutc(cl, "mds%d session %p state %s\n", mds, session,
	      ceph_session_state_name(session->s_state));

	/*
	 * An old MDS will crash when it sees an unknown op, so do not send
	 * a request whose required feature the session lacks.
	 */
	if (req->r_feature_needed > 0 &&
	    !test_bit(req->r_feature_needed, &session->s_features)) {
		err = -EOPNOTSUPP;
		goto out_session;
	}

	if (session->s_state != CEPH_MDS_SESSION_OPEN &&
	    session->s_state != CEPH_MDS_SESSION_HUNG) {
		/*
		 * We cannot queue async requests since the caps and delegated
		 * inodes are bound to the session. Just return -EJUKEBOX and
		 * let the caller retry a sync request in that case.
		 */
		if (test_bit(CEPH_MDS_R_ASYNC, &req->r_req_flags)) {
			err = -EJUKEBOX;
			goto out_session;
		}

		/*
		 * If the session has been REJECTED, then return a hard error,
		 * unless it's a CLEANRECOVER mount, in which case we'll queue
		 * it to the mdsc queue.
		 */
		if (session->s_state == CEPH_MDS_SESSION_REJECTED) {
			if (ceph_test_mount_opt(mdsc->fsc, CLEANRECOVER)) {
				trace_ceph_mdsc_suspend_request(mdsc, session, req,
								ceph_mdsc_suspend_reason_rejected);
				list_add(&req->r_wait, &mdsc->waiting_for_map);
			} else
				err = -EACCES;
			goto out_session;
		}

		if (session->s_state == CEPH_MDS_SESSION_NEW ||
		    session->s_state == CEPH_MDS_SESSION_CLOSING) {
			err = __open_session(mdsc, session);
			if (err)
				goto out_session;
			/* retry the same mds later */
			if (random)
				req->r_resend_mds = mds;
		}
		/* wait on the session until it becomes usable */
		trace_ceph_mdsc_suspend_request(mdsc, session, req,
						ceph_mdsc_suspend_reason_session);
		list_add(&req->r_wait, &session->s_waiting);
		goto out_session;
	}

	/* send request */
	req->r_resend_mds = -1;   /* forget any previous mds hint */

	if (req->r_request_started == 0)   /* note request start time */
		req->r_request_started = jiffies;

	/*
	 * For async create we will choose the auth MDS of frag in parent
	 * directory to send the request and usually this works fine, but
	 * if the MDS migrated the directory to another MDS before it could
	 * handle it the request will be forwarded.
	 *
	 * And then the auth cap will be changed.
	 */
	if (test_bit(CEPH_MDS_R_ASYNC, &req->r_req_flags) && req->r_num_fwd) {
		struct ceph_dentry_info *di = ceph_dentry(req->r_dentry);
		struct ceph_inode_info *ci;
		struct ceph_cap *cap;

		/*
		 * The request may be handled very fast and the new inode
		 * hasn't been linked to the dentry yet. We need to wait
		 * for the ceph_finish_async_create(), which shouldn't be
		 * stuck too long or fail in theory, to finish when forwarding
		 * the request.
		 */
		if (!d_inode(req->r_dentry)) {
			err = wait_on_bit(&di->flags, CEPH_DENTRY_ASYNC_CREATE_BIT,
					  TASK_KILLABLE);
			if (err) {
				/* interrupted: abort, failed below at finish */
				mutex_lock(&req->r_fill_mutex);
				set_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags);
				mutex_unlock(&req->r_fill_mutex);
				goto out_session;
			}
		}

		ci = ceph_inode(d_inode(req->r_dentry));

		spin_lock(&ci->i_ceph_lock);
		cap = ci->i_auth_cap;
		/*
		 * NOTE(review): cap is dereferenced without a NULL check when
		 * CEPH_I_ASYNC_CREATE is set -- presumably an inode in that
		 * state always has an auth cap; confirm.
		 */
		if (ci->i_ceph_flags & CEPH_I_ASYNC_CREATE && mds != cap->mds) {
			doutc(cl, "session changed for auth cap %d -> %d\n",
			      cap->session->s_mds, session->s_mds);

			/* Remove the auth cap from old session */
			spin_lock(&cap->session->s_cap_lock);
			cap->session->s_nr_caps--;
			list_del_init(&cap->session_caps);
			spin_unlock(&cap->session->s_cap_lock);

			/* Add the auth cap to the new session */
			cap->mds = mds;
			cap->session = session;
			spin_lock(&session->s_cap_lock);
			session->s_nr_caps++;
			list_add_tail(&cap->session_caps, &session->s_caps);
			spin_unlock(&session->s_cap_lock);

			change_auth_cap_ses(ci, session);
		}
		spin_unlock(&ci->i_ceph_lock);
	}

	err = __send_request(session, req, false);

out_session:
	/* drop the local reference; req->r_session keeps its own */
	ceph_put_mds_session(session);
finish:
	if (err) {
		doutc(cl, "early error %d\n", err);
		req->r_err = err;
		complete_request(mdsc, req);
		__unregister_request(mdsc, req);
	}
	return;
}

/*
 * Resubmit every request parked on @head.
 *
 * The list is spliced onto a private list first, so a request that
 * __do_request() parks on @head again is not picked up a second time by
 * this pass.
 *
 * called under mdsc->mutex
 */
static void __wake_requests(struct ceph_mds_client *mdsc,
			    struct list_head *head)
{
	struct ceph_client *cl = mdsc->fsc->client;
	struct ceph_mds_request *req;
	LIST_HEAD(tmp_list);

	list_splice_init(head, &tmp_list);

	while (!list_empty(&tmp_list)) {
		req = list_entry(tmp_list.next,
				 struct ceph_mds_request, r_wait);
		list_del_init(&req->r_wait);
		doutc(cl, " wake request %p tid %llu\n", req,
		      req->r_tid);
		trace_ceph_mdsc_resume_request(mdsc, req);
		__do_request(mdsc, req);
	}
}

/*
 * Wake up threads with requests pending for @mds, so that they can
 * resubmit their requests to a possibly different mds.
 *
 * Only requests that were never sent (r_attempts == 0) and have not
 * received an unsafe reply are resubmitted.
 */
static void kick_requests(struct ceph_mds_client *mdsc, int mds)
{
	struct ceph_client *cl = mdsc->fsc->client;
	struct ceph_mds_request *req;
	struct rb_node *p = rb_first(&mdsc->request_tree);

	doutc(cl, "kick_requests mds%d\n", mds);
	while (p) {
		req = rb_entry(p, struct ceph_mds_request, r_node);
		/* advance before __do_request() can unregister req */
		p = rb_next(p);
		if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags))
			continue;
		if (req->r_attempts > 0)
			continue; /* only new requests */
		if (req->r_session &&
		    req->r_session->s_mds == mds) {
			doutc(cl, " kicking tid %llu\n", req->r_tid);
			list_del_init(&req->r_wait);
			trace_ceph_mdsc_resume_request(mdsc, req);
			__do_request(mdsc, req);
		}
	}
}

/*
 * Register @req and make the first attempt to send it.
 *
 * Takes CAP_PIN references for r_inode, r_parent and r_old_dentry_dir,
 * waits for a pending async create on r_inode / r_old_inode, then
 * registers the request and calls __do_request() under mdsc->mutex.
 *
 * Returns 0, the error from ceph_wait_on_async_create(), or req->r_err as
 * left by __do_request().
 *
 * NOTE(review): when the async-create wait fails, the CAP_PIN references
 * taken above are not dropped here -- presumably they are released when
 * the request itself is released; verify.
 */
int ceph_mdsc_submit_request(struct ceph_mds_client *mdsc, struct inode *dir,
			     struct ceph_mds_request *req)
{
	struct ceph_client *cl = mdsc->fsc->client;
	int err = 0;

	/* take CAP_PIN refs for r_inode, r_parent, r_old_dentry */
	if (req->r_inode)
		ceph_get_cap_refs(ceph_inode(req->r_inode), CEPH_CAP_PIN);
	if (req->r_parent) {
		struct ceph_inode_info *ci = ceph_inode(req->r_parent);
		int fmode = (req->r_op & CEPH_MDS_OP_WRITE) ?
			    CEPH_FILE_MODE_WR : CEPH_FILE_MODE_RD;
		spin_lock(&ci->i_ceph_lock);
		ceph_take_cap_refs(ci, CEPH_CAP_PIN, false);
		__ceph_touch_fmode(ci, mdsc, fmode);
		spin_unlock(&ci->i_ceph_lock);
	}
	if (req->r_old_dentry_dir)
		ceph_get_cap_refs(ceph_inode(req->r_old_dentry_dir),
				  CEPH_CAP_PIN);

	if (req->r_inode) {
		err = ceph_wait_on_async_create(req->r_inode);
		if (err) {
			doutc(cl, "wait for async create returned: %d\n", err);
			return err;
		}
	}

	/* err is always 0 here; the failing case returned above */
	if (!err && req->r_old_inode) {
		err = ceph_wait_on_async_create(req->r_old_inode);
		if (err) {
			doutc(cl, "wait for async create returned: %d\n", err);
			return err;
		}
	}

	doutc(cl, "submit_request on %p for inode %p\n", req, dir);
	mutex_lock(&mdsc->mutex);
	__register_request(mdsc, req, dir);
	trace_ceph_mdsc_submit_request(mdsc, req);
	__do_request(mdsc, req);
	err = req->r_err;
	mutex_unlock(&mdsc->mutex);
	return err;
}

/*
 * Wait for @req to complete and return its result.
 *
 * If @wait_func is set it does the waiting; otherwise we sleep killably
 * on r_completion, bounded by r_timeout.  If a reply was received its
 * result code is returned.  Otherwise, if the wait failed, the request
 * is marked aborted with that error (and, for a write operation with a
 * parent directory, the directory state is invalidated).  Otherwise
 * req->r_err is returned.
 */
int ceph_mdsc_wait_request(struct ceph_mds_client *mdsc,
			   struct ceph_mds_request *req,
			   ceph_mds_request_wait_callback_t wait_func)
{
	struct ceph_client *cl = mdsc->fsc->client;
	int err;

	/* wait */
	doutc(cl, "do_request waiting\n");
	if (wait_func) {
		err = wait_func(mdsc, req);
	} else {
		long timeleft = wait_for_completion_killable_timeout(
					&req->r_completion,
					ceph_timeout_jiffies(req->r_timeout));
		if (timeleft > 0)
			err = 0;
		else if (!timeleft)
			err = -ETIMEDOUT;  /* timed out */
		else
			err = timeleft;  /* killed */
	}
	doutc(cl, "do_request waited, got %d\n", err);
	mutex_lock(&mdsc->mutex);

	/* only abort if we didn't race with a real reply */
	if (test_bit(CEPH_MDS_R_GOT_RESULT, &req->r_req_flags)) {
		err = le32_to_cpu(req->r_reply_info.head->result);
	} else if (err < 0) {
		doutc(cl, "aborted request %lld with %d\n", req->r_tid, err);

		/*
		 * ensure we aren't running concurrently with
		 * ceph_fill_trace or ceph_readdir_prepopulate, which
		 * rely on locks (dir mutex) held by our caller.
		 */
		mutex_lock(&req->r_fill_mutex);
		req->r_err = err;
		set_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags);
		mutex_unlock(&req->r_fill_mutex);

		if (req->r_parent &&
		    (req->r_op & CEPH_MDS_OP_WRITE))
			ceph_invalidate_dir_request(req);
	} else {
		err = req->r_err;
	}

	mutex_unlock(&mdsc->mutex);
	return err;
}

/*
 * Synchronously perform an mds request.  Take care of all of the
 * session setup, forwarding, retry details.
 *
 * Submits @req and, if submission succeeded, waits for it with the
 * default wait.  Returns the submission error or the wait result.
 */
int ceph_mdsc_do_request(struct ceph_mds_client *mdsc,
			 struct inode *dir,
			 struct ceph_mds_request *req)
{
	struct ceph_client *cl = mdsc->fsc->client;
	int err;

	doutc(cl, "do_request on %p\n", req);

	/* issue */
	err = ceph_mdsc_submit_request(mdsc, dir, req);
	if (!err)
		err = ceph_mdsc_wait_request(mdsc, req, NULL);
	doutc(cl, "do_request %p done, result %d\n", req, err);
	return err;
}

/*
 * Invalidate dir's completeness, dentry lease state on an aborted MDS
 * namespace request.
 *
 * Clears the "complete" state of r_parent (and of r_old_dentry_dir when
 * set) and invalidates the leases of r_dentry / r_old_dentry when set.
 */
void ceph_invalidate_dir_request(struct ceph_mds_request *req)
{
	struct inode *dir = req->r_parent;
	struct inode *old_dir = req->r_old_dentry_dir;
	struct ceph_client *cl = req->r_mdsc->fsc->client;

	doutc(cl, "invalidate_dir_request %p %p (complete, lease(s))\n",
	      dir, old_dir);

	ceph_dir_clear_complete(dir);
	if (old_dir)
		ceph_dir_clear_complete(old_dir);
	if (req->r_dentry)
		ceph_invalidate_dentry_lease(req->r_dentry);
	if (req->r_old_dentry)
		ceph_invalidate_dentry_lease(req->r_old_dentry);
}

/*
 * Handle mds reply.
 *
 * We take the session mutex and parse and process the reply immediately.
3851 * This preserves the logical ordering of replies, capabilities, etc., sent 3852 * by the MDS as they are applied to our local cache. 3853 */ 3854 static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg) 3855 { 3856 struct ceph_mds_client *mdsc = session->s_mdsc; 3857 struct ceph_client *cl = mdsc->fsc->client; 3858 struct ceph_mds_request *req; 3859 struct ceph_mds_reply_head *head = msg->front.iov_base; 3860 struct ceph_mds_reply_info_parsed *rinfo; /* parsed reply info */ 3861 struct ceph_snap_realm *realm; 3862 u64 tid; 3863 int err, result; 3864 int mds = session->s_mds; 3865 bool close_sessions = false; 3866 3867 if (msg->front.iov_len < sizeof(*head)) { 3868 pr_err_client(cl, "got corrupt (short) reply\n"); 3869 ceph_msg_dump(msg); 3870 return; 3871 } 3872 3873 /* get request, session */ 3874 tid = le64_to_cpu(msg->hdr.tid); 3875 mutex_lock(&mdsc->mutex); 3876 req = lookup_get_request(mdsc, tid); 3877 if (!req) { 3878 doutc(cl, "on unknown tid %llu\n", tid); 3879 mutex_unlock(&mdsc->mutex); 3880 return; 3881 } 3882 doutc(cl, "handle_reply %p\n", req); 3883 3884 /* correct session? */ 3885 if (req->r_session != session) { 3886 pr_err_client(cl, "got %llu on session mds%d not mds%d\n", 3887 tid, session->s_mds, 3888 req->r_session ? req->r_session->s_mds : -1); 3889 mutex_unlock(&mdsc->mutex); 3890 goto out; 3891 } 3892 3893 /* dup? */ 3894 if ((test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags) && !head->safe) || 3895 (test_bit(CEPH_MDS_R_GOT_SAFE, &req->r_req_flags) && head->safe)) { 3896 pr_warn_client(cl, "got a dup %s reply on %llu from mds%d\n", 3897 head->safe ? 
"safe" : "unsafe", tid, mds); 3898 mutex_unlock(&mdsc->mutex); 3899 goto out; 3900 } 3901 if (test_bit(CEPH_MDS_R_GOT_SAFE, &req->r_req_flags)) { 3902 pr_warn_client(cl, "got unsafe after safe on %llu from mds%d\n", 3903 tid, mds); 3904 mutex_unlock(&mdsc->mutex); 3905 goto out; 3906 } 3907 3908 result = le32_to_cpu(head->result); 3909 3910 if (head->safe) { 3911 set_bit(CEPH_MDS_R_GOT_SAFE, &req->r_req_flags); 3912 __unregister_request(mdsc, req); 3913 3914 /* last request during umount? */ 3915 if (mdsc->stopping && !__get_oldest_req(mdsc)) 3916 complete_all(&mdsc->safe_umount_waiters); 3917 3918 if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) { 3919 /* 3920 * We already handled the unsafe response, now do the 3921 * cleanup. No need to examine the response; the MDS 3922 * doesn't include any result info in the safe 3923 * response. And even if it did, there is nothing 3924 * useful we could do with a revised return value. 3925 */ 3926 doutc(cl, "got safe reply %llu, mds%d\n", tid, mds); 3927 3928 mutex_unlock(&mdsc->mutex); 3929 goto out; 3930 } 3931 } else { 3932 set_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags); 3933 list_add_tail(&req->r_unsafe_item, &req->r_session->s_unsafe); 3934 } 3935 3936 doutc(cl, "tid %lld result %d\n", tid, result); 3937 if (test_bit(CEPHFS_FEATURE_REPLY_ENCODING, &session->s_features)) 3938 err = parse_reply_info(session, msg, req, (u64)-1); 3939 else 3940 err = parse_reply_info(session, msg, req, 3941 session->s_con.peer_features); 3942 mutex_unlock(&mdsc->mutex); 3943 3944 /* Must find target inode outside of mutexes to avoid deadlocks */ 3945 rinfo = &req->r_reply_info; 3946 if ((err >= 0) && rinfo->head->is_target) { 3947 struct inode *in = xchg(&req->r_new_inode, NULL); 3948 struct ceph_vino tvino = { 3949 .ino = le64_to_cpu(rinfo->targeti.in->ino), 3950 .snap = le64_to_cpu(rinfo->targeti.in->snapid) 3951 }; 3952 3953 /* 3954 * If we ended up opening an existing inode, discard 3955 * r_new_inode 3956 */ 3957 if 
(req->r_op == CEPH_MDS_OP_CREATE && 3958 !req->r_reply_info.has_create_ino) { 3959 /* This should never happen on an async create */ 3960 WARN_ON_ONCE(req->r_deleg_ino); 3961 iput(in); 3962 in = NULL; 3963 } 3964 3965 in = ceph_get_inode(mdsc->fsc->sb, tvino, in); 3966 if (IS_ERR(in)) { 3967 err = PTR_ERR(in); 3968 mutex_lock(&session->s_mutex); 3969 goto out_err; 3970 } 3971 req->r_target_inode = in; 3972 } 3973 3974 mutex_lock(&session->s_mutex); 3975 if (err < 0) { 3976 pr_err_client(cl, "got corrupt reply mds%d(tid:%lld)\n", 3977 mds, tid); 3978 ceph_msg_dump(msg); 3979 goto out_err; 3980 } 3981 3982 /* snap trace */ 3983 realm = NULL; 3984 if (rinfo->snapblob_len) { 3985 down_write(&mdsc->snap_rwsem); 3986 err = ceph_update_snap_trace(mdsc, rinfo->snapblob, 3987 rinfo->snapblob + rinfo->snapblob_len, 3988 le32_to_cpu(head->op) == CEPH_MDS_OP_RMSNAP, 3989 &realm); 3990 if (err) { 3991 up_write(&mdsc->snap_rwsem); 3992 close_sessions = true; 3993 if (err == -EIO) 3994 ceph_msg_dump(msg); 3995 goto out_err; 3996 } 3997 downgrade_write(&mdsc->snap_rwsem); 3998 } else { 3999 down_read(&mdsc->snap_rwsem); 4000 } 4001 4002 /* insert trace into our cache */ 4003 mutex_lock(&req->r_fill_mutex); 4004 current->journal_info = req; 4005 err = ceph_fill_trace(mdsc->fsc->sb, req); 4006 if (err == 0) { 4007 if (result == 0 && (req->r_op == CEPH_MDS_OP_READDIR || 4008 req->r_op == CEPH_MDS_OP_LSSNAP)) 4009 err = ceph_readdir_prepopulate(req, req->r_session); 4010 } 4011 current->journal_info = NULL; 4012 mutex_unlock(&req->r_fill_mutex); 4013 4014 up_read(&mdsc->snap_rwsem); 4015 if (realm) 4016 ceph_put_snap_realm(mdsc, realm); 4017 4018 if (err == 0) { 4019 if (req->r_target_inode && 4020 test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) { 4021 struct ceph_inode_info *ci = 4022 ceph_inode(req->r_target_inode); 4023 spin_lock(&ci->i_unsafe_lock); 4024 list_add_tail(&req->r_unsafe_target_item, 4025 &ci->i_unsafe_iops); 4026 spin_unlock(&ci->i_unsafe_lock); 4027 } 4028 4029 
ceph_unreserve_caps(mdsc, &req->r_caps_reservation); 4030 } 4031 out_err: 4032 mutex_lock(&mdsc->mutex); 4033 if (!test_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags)) { 4034 if (err) { 4035 req->r_err = err; 4036 } else { 4037 req->r_reply = ceph_msg_get(msg); 4038 set_bit(CEPH_MDS_R_GOT_RESULT, &req->r_req_flags); 4039 } 4040 } else { 4041 doutc(cl, "reply arrived after request %lld was aborted\n", tid); 4042 } 4043 mutex_unlock(&mdsc->mutex); 4044 4045 mutex_unlock(&session->s_mutex); 4046 4047 /* kick calling process */ 4048 complete_request(mdsc, req); 4049 4050 ceph_update_metadata_metrics(&mdsc->metric, req->r_start_latency, 4051 req->r_end_latency, err); 4052 out: 4053 ceph_mdsc_put_request(req); 4054 4055 /* Defer closing the sessions after s_mutex lock being released */ 4056 if (close_sessions) 4057 ceph_mdsc_close_sessions(mdsc); 4058 return; 4059 } 4060 4061 4062 4063 /* 4064 * handle mds notification that our request has been forwarded. 4065 */ 4066 static void handle_forward(struct ceph_mds_client *mdsc, 4067 struct ceph_mds_session *session, 4068 struct ceph_msg *msg) 4069 { 4070 struct ceph_client *cl = mdsc->fsc->client; 4071 struct ceph_mds_request *req; 4072 u64 tid = le64_to_cpu(msg->hdr.tid); 4073 u32 next_mds; 4074 u32 fwd_seq; 4075 int err = -EINVAL; 4076 void *p = msg->front.iov_base; 4077 void *end = p + msg->front.iov_len; 4078 bool aborted = false; 4079 4080 ceph_decode_need(&p, end, 2*sizeof(u32), bad); 4081 next_mds = ceph_decode_32(&p); 4082 fwd_seq = ceph_decode_32(&p); 4083 4084 mutex_lock(&mdsc->mutex); 4085 req = lookup_get_request(mdsc, tid); 4086 if (!req) { 4087 mutex_unlock(&mdsc->mutex); 4088 doutc(cl, "forward tid %llu to mds%d - req dne\n", tid, next_mds); 4089 return; /* dup reply? 
*/ 4090 } 4091 4092 if (test_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags)) { 4093 doutc(cl, "forward tid %llu aborted, unregistering\n", tid); 4094 __unregister_request(mdsc, req); 4095 } else if (fwd_seq <= req->r_num_fwd || (uint32_t)fwd_seq >= U32_MAX) { 4096 /* 4097 * Avoid infinite retrying after overflow. 4098 * 4099 * The MDS will increase the fwd count and in client side 4100 * if the num_fwd is less than the one saved in request 4101 * that means the MDS is an old version and overflowed of 4102 * 8 bits. 4103 */ 4104 mutex_lock(&req->r_fill_mutex); 4105 req->r_err = -EMULTIHOP; 4106 set_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags); 4107 mutex_unlock(&req->r_fill_mutex); 4108 aborted = true; 4109 pr_warn_ratelimited_client(cl, "forward tid %llu seq overflow\n", 4110 tid); 4111 } else { 4112 /* resend. forward race not possible; mds would drop */ 4113 doutc(cl, "forward tid %llu to mds%d (we resend)\n", tid, next_mds); 4114 BUG_ON(req->r_err); 4115 BUG_ON(test_bit(CEPH_MDS_R_GOT_RESULT, &req->r_req_flags)); 4116 req->r_attempts = 0; 4117 req->r_num_fwd = fwd_seq; 4118 req->r_resend_mds = next_mds; 4119 put_request_session(req); 4120 __do_request(mdsc, req); 4121 } 4122 mutex_unlock(&mdsc->mutex); 4123 4124 /* kick calling process */ 4125 if (aborted) 4126 complete_request(mdsc, req); 4127 ceph_mdsc_put_request(req); 4128 return; 4129 4130 bad: 4131 pr_err_client(cl, "decode error err=%d\n", err); 4132 ceph_msg_dump(msg); 4133 } 4134 4135 static int __decode_session_metadata(void **p, void *end, 4136 bool *blocklisted) 4137 { 4138 /* map<string,string> */ 4139 u32 n; 4140 bool err_str; 4141 ceph_decode_32_safe(p, end, n, bad); 4142 while (n-- > 0) { 4143 u32 len; 4144 ceph_decode_32_safe(p, end, len, bad); 4145 ceph_decode_need(p, end, len, bad); 4146 err_str = !strncmp(*p, "error_string", len); 4147 *p += len; 4148 ceph_decode_32_safe(p, end, len, bad); 4149 ceph_decode_need(p, end, len, bad); 4150 /* 4151 * Match "blocklisted (blacklisted)" from newer MDSes, 
4152 * or "blacklisted" from older MDSes. 4153 */ 4154 if (err_str && strnstr(*p, "blacklisted", len)) 4155 *blocklisted = true; 4156 *p += len; 4157 } 4158 return 0; 4159 bad: 4160 return -1; 4161 } 4162 4163 /* 4164 * handle a mds session control message 4165 */ 4166 static void handle_session(struct ceph_mds_session *session, 4167 struct ceph_msg *msg) 4168 { 4169 struct ceph_mds_client *mdsc = session->s_mdsc; 4170 struct ceph_client *cl = mdsc->fsc->client; 4171 int mds = session->s_mds; 4172 int msg_version = le16_to_cpu(msg->hdr.version); 4173 void *p = msg->front.iov_base; 4174 void *end = p + msg->front.iov_len; 4175 struct ceph_mds_session_head *h; 4176 struct ceph_mds_cap_auth *cap_auths = NULL; 4177 u32 op, cap_auths_num = 0; 4178 u64 seq, features = 0; 4179 int wake = 0; 4180 bool blocklisted = false; 4181 u32 i; 4182 4183 4184 /* decode */ 4185 ceph_decode_need(&p, end, sizeof(*h), bad); 4186 h = p; 4187 p += sizeof(*h); 4188 4189 op = le32_to_cpu(h->op); 4190 seq = le64_to_cpu(h->seq); 4191 4192 if (msg_version >= 3) { 4193 u32 len; 4194 /* version >= 2 and < 5, decode metadata, skip otherwise 4195 * as it's handled via flags. 
4196 */ 4197 if (msg_version >= 5) 4198 ceph_decode_skip_map(&p, end, string, string, bad); 4199 else if (__decode_session_metadata(&p, end, &blocklisted) < 0) 4200 goto bad; 4201 4202 /* version >= 3, feature bits */ 4203 ceph_decode_32_safe(&p, end, len, bad); 4204 if (len) { 4205 ceph_decode_64_safe(&p, end, features, bad); 4206 p += len - sizeof(features); 4207 } 4208 } 4209 4210 if (msg_version >= 5) { 4211 u32 flags, len; 4212 4213 /* version >= 4 */ 4214 ceph_decode_skip_16(&p, end, bad); /* struct_v, struct_cv */ 4215 ceph_decode_32_safe(&p, end, len, bad); /* len */ 4216 ceph_decode_skip_n(&p, end, len, bad); /* metric_spec */ 4217 4218 /* version >= 5, flags */ 4219 ceph_decode_32_safe(&p, end, flags, bad); 4220 if (flags & CEPH_SESSION_BLOCKLISTED) { 4221 pr_warn_client(cl, "mds%d session blocklisted\n", 4222 session->s_mds); 4223 blocklisted = true; 4224 } 4225 } 4226 4227 if (msg_version >= 6) { 4228 ceph_decode_32_safe(&p, end, cap_auths_num, bad); 4229 doutc(cl, "cap_auths_num %d\n", cap_auths_num); 4230 4231 if (cap_auths_num && op != CEPH_SESSION_OPEN) { 4232 WARN_ON_ONCE(op != CEPH_SESSION_OPEN); 4233 goto skip_cap_auths; 4234 } 4235 4236 cap_auths = kzalloc_objs(struct ceph_mds_cap_auth, 4237 cap_auths_num); 4238 if (!cap_auths) { 4239 pr_err_client(cl, "No memory for cap_auths\n"); 4240 return; 4241 } 4242 4243 for (i = 0; i < cap_auths_num; i++) { 4244 u32 _len, j; 4245 4246 /* struct_v, struct_compat, and struct_len in MDSCapAuth */ 4247 ceph_decode_skip_n(&p, end, 2 + sizeof(u32), bad); 4248 4249 /* struct_v, struct_compat, and struct_len in MDSCapMatch */ 4250 ceph_decode_skip_n(&p, end, 2 + sizeof(u32), bad); 4251 ceph_decode_64_safe(&p, end, cap_auths[i].match.uid, bad); 4252 ceph_decode_32_safe(&p, end, _len, bad); 4253 if (_len) { 4254 cap_auths[i].match.gids = kcalloc(_len, sizeof(u32), 4255 GFP_KERNEL); 4256 if (!cap_auths[i].match.gids) { 4257 pr_err_client(cl, "No memory for gids\n"); 4258 goto fail; 4259 } 4260 4261 
cap_auths[i].match.num_gids = _len; 4262 for (j = 0; j < _len; j++) 4263 ceph_decode_32_safe(&p, end, 4264 cap_auths[i].match.gids[j], 4265 bad); 4266 } 4267 4268 ceph_decode_32_safe(&p, end, _len, bad); 4269 if (_len) { 4270 cap_auths[i].match.path = kcalloc(_len + 1, sizeof(char), 4271 GFP_KERNEL); 4272 if (!cap_auths[i].match.path) { 4273 pr_err_client(cl, "No memory for path\n"); 4274 goto fail; 4275 } 4276 ceph_decode_copy(&p, cap_auths[i].match.path, _len); 4277 4278 /* Remove the tailing '/' */ 4279 while (_len && cap_auths[i].match.path[_len - 1] == '/') { 4280 cap_auths[i].match.path[_len - 1] = '\0'; 4281 _len -= 1; 4282 } 4283 } 4284 4285 ceph_decode_32_safe(&p, end, _len, bad); 4286 if (_len) { 4287 cap_auths[i].match.fs_name = kcalloc(_len + 1, sizeof(char), 4288 GFP_KERNEL); 4289 if (!cap_auths[i].match.fs_name) { 4290 pr_err_client(cl, "No memory for fs_name\n"); 4291 goto fail; 4292 } 4293 ceph_decode_copy(&p, cap_auths[i].match.fs_name, _len); 4294 } 4295 4296 ceph_decode_8_safe(&p, end, cap_auths[i].match.root_squash, bad); 4297 ceph_decode_8_safe(&p, end, cap_auths[i].readable, bad); 4298 ceph_decode_8_safe(&p, end, cap_auths[i].writeable, bad); 4299 doutc(cl, "uid %lld, num_gids %u, path %s, fs_name %s, root_squash %d, readable %d, writeable %d\n", 4300 cap_auths[i].match.uid, cap_auths[i].match.num_gids, 4301 cap_auths[i].match.path, cap_auths[i].match.fs_name, 4302 cap_auths[i].match.root_squash, 4303 cap_auths[i].readable, cap_auths[i].writeable); 4304 } 4305 } 4306 4307 skip_cap_auths: 4308 mutex_lock(&mdsc->mutex); 4309 if (op == CEPH_SESSION_OPEN) { 4310 if (mdsc->s_cap_auths) { 4311 for (i = 0; i < mdsc->s_cap_auths_num; i++) { 4312 kfree(mdsc->s_cap_auths[i].match.gids); 4313 kfree(mdsc->s_cap_auths[i].match.path); 4314 kfree(mdsc->s_cap_auths[i].match.fs_name); 4315 } 4316 kfree(mdsc->s_cap_auths); 4317 } 4318 mdsc->s_cap_auths_num = cap_auths_num; 4319 mdsc->s_cap_auths = cap_auths; 4320 } 4321 if (op == CEPH_SESSION_CLOSE) { 4322 
ceph_get_mds_session(session); 4323 __unregister_session(mdsc, session); 4324 } 4325 /* FIXME: this ttl calculation is generous */ 4326 session->s_ttl = jiffies + HZ*mdsc->mdsmap->m_session_autoclose; 4327 mutex_unlock(&mdsc->mutex); 4328 4329 mutex_lock(&session->s_mutex); 4330 4331 doutc(cl, "mds%d %s %p state %s seq %llu\n", mds, 4332 ceph_session_op_name(op), session, 4333 ceph_session_state_name(session->s_state), seq); 4334 4335 if (session->s_state == CEPH_MDS_SESSION_HUNG) { 4336 session->s_state = CEPH_MDS_SESSION_OPEN; 4337 pr_info_client(cl, "mds%d came back\n", session->s_mds); 4338 } 4339 4340 switch (op) { 4341 case CEPH_SESSION_OPEN: 4342 if (session->s_state == CEPH_MDS_SESSION_RECONNECTING) 4343 pr_info_client(cl, "mds%d reconnect success\n", 4344 session->s_mds); 4345 4346 session->s_features = features; 4347 if (session->s_state == CEPH_MDS_SESSION_OPEN) { 4348 pr_notice_client(cl, "mds%d is already opened\n", 4349 session->s_mds); 4350 } else { 4351 session->s_state = CEPH_MDS_SESSION_OPEN; 4352 renewed_caps(mdsc, session, 0); 4353 if (test_bit(CEPHFS_FEATURE_METRIC_COLLECT, 4354 &session->s_features)) 4355 metric_schedule_delayed(&mdsc->metric); 4356 } 4357 4358 /* 4359 * The connection maybe broken and the session in client 4360 * side has been reinitialized, need to update the seq 4361 * anyway. 
4362 */ 4363 if (!session->s_seq && seq) 4364 session->s_seq = seq; 4365 4366 wake = 1; 4367 if (mdsc->stopping) 4368 __close_session(mdsc, session); 4369 break; 4370 4371 case CEPH_SESSION_RENEWCAPS: 4372 if (session->s_renew_seq == seq) 4373 renewed_caps(mdsc, session, 1); 4374 break; 4375 4376 case CEPH_SESSION_CLOSE: 4377 if (session->s_state == CEPH_MDS_SESSION_RECONNECTING) 4378 pr_info_client(cl, "mds%d reconnect denied\n", 4379 session->s_mds); 4380 session->s_state = CEPH_MDS_SESSION_CLOSED; 4381 cleanup_session_requests(mdsc, session); 4382 remove_session_caps(session); 4383 wake = 2; /* for good measure */ 4384 wake_up_all(&mdsc->session_close_wq); 4385 break; 4386 4387 case CEPH_SESSION_STALE: 4388 pr_info_client(cl, "mds%d caps went stale, renewing\n", 4389 session->s_mds); 4390 atomic_inc(&session->s_cap_gen); 4391 session->s_cap_ttl = jiffies - 1; 4392 send_renew_caps(mdsc, session); 4393 break; 4394 4395 case CEPH_SESSION_RECALL_STATE: 4396 ceph_trim_caps(mdsc, session, le32_to_cpu(h->max_caps)); 4397 break; 4398 4399 case CEPH_SESSION_FLUSHMSG: 4400 /* flush cap releases */ 4401 spin_lock(&session->s_cap_lock); 4402 if (session->s_num_cap_releases) 4403 ceph_flush_session_cap_releases(mdsc, session); 4404 spin_unlock(&session->s_cap_lock); 4405 4406 send_flushmsg_ack(mdsc, session, seq); 4407 break; 4408 4409 case CEPH_SESSION_FORCE_RO: 4410 doutc(cl, "force_session_readonly %p\n", session); 4411 spin_lock(&session->s_cap_lock); 4412 session->s_readonly = true; 4413 spin_unlock(&session->s_cap_lock); 4414 wake_up_session_caps(session, FORCE_RO); 4415 break; 4416 4417 case CEPH_SESSION_REJECT: 4418 WARN_ON(session->s_state != CEPH_MDS_SESSION_OPENING); 4419 pr_info_client(cl, "mds%d rejected session\n", 4420 session->s_mds); 4421 session->s_state = CEPH_MDS_SESSION_REJECTED; 4422 cleanup_session_requests(mdsc, session); 4423 remove_session_caps(session); 4424 if (blocklisted) 4425 mdsc->fsc->blocklisted = true; 4426 wake = 2; /* for good measure */ 
4427 break; 4428 4429 default: 4430 pr_err_client(cl, "bad op %d mds%d\n", op, mds); 4431 WARN_ON(1); 4432 } 4433 4434 mutex_unlock(&session->s_mutex); 4435 if (wake) { 4436 mutex_lock(&mdsc->mutex); 4437 __wake_requests(mdsc, &session->s_waiting); 4438 if (wake == 2) 4439 kick_requests(mdsc, mds); 4440 mutex_unlock(&mdsc->mutex); 4441 } 4442 if (op == CEPH_SESSION_CLOSE) 4443 ceph_put_mds_session(session); 4444 return; 4445 4446 bad: 4447 pr_err_client(cl, "corrupt message mds%d len %d\n", mds, 4448 (int)msg->front.iov_len); 4449 ceph_msg_dump(msg); 4450 fail: 4451 for (i = 0; i < cap_auths_num; i++) { 4452 kfree(cap_auths[i].match.gids); 4453 kfree(cap_auths[i].match.path); 4454 kfree(cap_auths[i].match.fs_name); 4455 } 4456 kfree(cap_auths); 4457 return; 4458 } 4459 4460 void ceph_mdsc_release_dir_caps(struct ceph_mds_request *req) 4461 { 4462 struct ceph_client *cl = req->r_mdsc->fsc->client; 4463 int dcaps; 4464 4465 dcaps = xchg(&req->r_dir_caps, 0); 4466 if (dcaps) { 4467 doutc(cl, "releasing r_dir_caps=%s\n", ceph_cap_string(dcaps)); 4468 ceph_put_cap_refs(ceph_inode(req->r_parent), dcaps); 4469 } 4470 } 4471 4472 void ceph_mdsc_release_dir_caps_async(struct ceph_mds_request *req) 4473 { 4474 struct ceph_client *cl = req->r_mdsc->fsc->client; 4475 int dcaps; 4476 4477 dcaps = xchg(&req->r_dir_caps, 0); 4478 if (dcaps) { 4479 doutc(cl, "releasing r_dir_caps=%s\n", ceph_cap_string(dcaps)); 4480 ceph_put_cap_refs_async(ceph_inode(req->r_parent), dcaps); 4481 } 4482 } 4483 4484 /* 4485 * called under session->mutex. 
4486 */ 4487 static void replay_unsafe_requests(struct ceph_mds_client *mdsc, 4488 struct ceph_mds_session *session) 4489 { 4490 struct ceph_mds_request *req, *nreq; 4491 struct rb_node *p; 4492 4493 doutc(mdsc->fsc->client, "mds%d\n", session->s_mds); 4494 4495 mutex_lock(&mdsc->mutex); 4496 list_for_each_entry_safe(req, nreq, &session->s_unsafe, r_unsafe_item) 4497 __send_request(session, req, true); 4498 4499 /* 4500 * also re-send old requests when MDS enters reconnect stage. So that MDS 4501 * can process completed request in clientreplay stage. 4502 */ 4503 p = rb_first(&mdsc->request_tree); 4504 while (p) { 4505 req = rb_entry(p, struct ceph_mds_request, r_node); 4506 p = rb_next(p); 4507 if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) 4508 continue; 4509 if (req->r_attempts == 0) 4510 continue; /* only old requests */ 4511 if (!req->r_session) 4512 continue; 4513 if (req->r_session->s_mds != session->s_mds) 4514 continue; 4515 4516 ceph_mdsc_release_dir_caps_async(req); 4517 4518 __send_request(session, req, true); 4519 } 4520 mutex_unlock(&mdsc->mutex); 4521 } 4522 4523 static int send_reconnect_partial(struct ceph_reconnect_state *recon_state) 4524 { 4525 struct ceph_msg *reply; 4526 struct ceph_pagelist *_pagelist; 4527 struct page *page; 4528 __le32 *addr; 4529 int err = -ENOMEM; 4530 4531 if (!recon_state->allow_multi) 4532 return -ENOSPC; 4533 4534 /* can't handle message that contains both caps and realm */ 4535 BUG_ON(!recon_state->nr_caps == !recon_state->nr_realms); 4536 4537 /* pre-allocate new pagelist */ 4538 _pagelist = ceph_pagelist_alloc(GFP_NOFS); 4539 if (!_pagelist) 4540 return -ENOMEM; 4541 4542 reply = ceph_msg_new2(CEPH_MSG_CLIENT_RECONNECT, 0, 1, GFP_NOFS, false); 4543 if (!reply) 4544 goto fail_msg; 4545 4546 /* placeholder for nr_caps */ 4547 err = ceph_pagelist_encode_32(_pagelist, 0); 4548 if (err < 0) 4549 goto fail; 4550 4551 if (recon_state->nr_caps) { 4552 /* currently encoding caps */ 4553 err = 
ceph_pagelist_encode_32(recon_state->pagelist, 0); 4554 if (err) 4555 goto fail; 4556 } else { 4557 /* placeholder for nr_realms (currently encoding relams) */ 4558 err = ceph_pagelist_encode_32(_pagelist, 0); 4559 if (err < 0) 4560 goto fail; 4561 } 4562 4563 err = ceph_pagelist_encode_8(recon_state->pagelist, 1); 4564 if (err) 4565 goto fail; 4566 4567 page = list_first_entry(&recon_state->pagelist->head, struct page, lru); 4568 addr = kmap_atomic(page); 4569 if (recon_state->nr_caps) { 4570 /* currently encoding caps */ 4571 *addr = cpu_to_le32(recon_state->nr_caps); 4572 } else { 4573 /* currently encoding relams */ 4574 *(addr + 1) = cpu_to_le32(recon_state->nr_realms); 4575 } 4576 kunmap_atomic(addr); 4577 4578 reply->hdr.version = cpu_to_le16(5); 4579 reply->hdr.compat_version = cpu_to_le16(4); 4580 4581 reply->hdr.data_len = cpu_to_le32(recon_state->pagelist->length); 4582 ceph_msg_data_add_pagelist(reply, recon_state->pagelist); 4583 4584 ceph_con_send(&recon_state->session->s_con, reply); 4585 ceph_pagelist_release(recon_state->pagelist); 4586 4587 recon_state->pagelist = _pagelist; 4588 recon_state->nr_caps = 0; 4589 recon_state->nr_realms = 0; 4590 recon_state->msg_version = 5; 4591 return 0; 4592 fail: 4593 ceph_msg_put(reply); 4594 fail_msg: 4595 ceph_pagelist_release(_pagelist); 4596 return err; 4597 } 4598 4599 static struct dentry* d_find_primary(struct inode *inode) 4600 { 4601 struct dentry *alias, *dn = NULL; 4602 4603 if (hlist_empty(&inode->i_dentry)) 4604 return NULL; 4605 4606 spin_lock(&inode->i_lock); 4607 if (hlist_empty(&inode->i_dentry)) 4608 goto out_unlock; 4609 4610 if (S_ISDIR(inode->i_mode)) { 4611 alias = hlist_entry(inode->i_dentry.first, struct dentry, d_u.d_alias); 4612 if (!IS_ROOT(alias)) 4613 dn = dget(alias); 4614 goto out_unlock; 4615 } 4616 4617 hlist_for_each_entry(alias, &inode->i_dentry, d_u.d_alias) { 4618 spin_lock(&alias->d_lock); 4619 if (!d_unhashed(alias) && 4620 (ceph_dentry(alias)->flags & 
CEPH_DENTRY_PRIMARY_LINK)) { 4621 dn = dget_dlock(alias); 4622 } 4623 spin_unlock(&alias->d_lock); 4624 if (dn) 4625 break; 4626 } 4627 out_unlock: 4628 spin_unlock(&inode->i_lock); 4629 return dn; 4630 } 4631 4632 /* 4633 * Encode information about a cap for a reconnect with the MDS. 4634 */ 4635 static int reconnect_caps_cb(struct inode *inode, int mds, void *arg) 4636 { 4637 struct ceph_mds_client *mdsc = ceph_sb_to_mdsc(inode->i_sb); 4638 struct ceph_client *cl = ceph_inode_to_client(inode); 4639 union { 4640 struct ceph_mds_cap_reconnect v2; 4641 struct ceph_mds_cap_reconnect_v1 v1; 4642 } rec; 4643 struct ceph_inode_info *ci = ceph_inode(inode); 4644 struct ceph_reconnect_state *recon_state = arg; 4645 struct ceph_pagelist *pagelist = recon_state->pagelist; 4646 struct dentry *dentry; 4647 struct ceph_cap *cap; 4648 struct ceph_path_info path_info = {0}; 4649 int err; 4650 u64 snap_follows; 4651 4652 dentry = d_find_primary(inode); 4653 if (dentry) { 4654 /* set pathbase to parent dir when msg_version >= 2 */ 4655 char *path = ceph_mdsc_build_path(mdsc, dentry, &path_info, 4656 recon_state->msg_version >= 2); 4657 dput(dentry); 4658 if (IS_ERR(path)) { 4659 err = PTR_ERR(path); 4660 goto out_err; 4661 } 4662 } 4663 4664 spin_lock(&ci->i_ceph_lock); 4665 cap = __get_cap_for_mds(ci, mds); 4666 if (!cap) { 4667 spin_unlock(&ci->i_ceph_lock); 4668 err = 0; 4669 goto out_err; 4670 } 4671 doutc(cl, " adding %p ino %llx.%llx cap %p %lld %s\n", inode, 4672 ceph_vinop(inode), cap, cap->cap_id, 4673 ceph_cap_string(cap->issued)); 4674 4675 cap->seq = 0; /* reset cap seq */ 4676 cap->issue_seq = 0; /* and issue_seq */ 4677 cap->mseq = 0; /* and migrate_seq */ 4678 cap->cap_gen = atomic_read(&cap->session->s_cap_gen); 4679 4680 /* These are lost when the session goes away */ 4681 if (S_ISDIR(inode->i_mode)) { 4682 if (cap->issued & CEPH_CAP_DIR_CREATE) { 4683 ceph_put_string(rcu_dereference_raw(ci->i_cached_layout.pool_ns)); 4684 memset(&ci->i_cached_layout, 0, 
sizeof(ci->i_cached_layout)); 4685 } 4686 cap->issued &= ~CEPH_CAP_ANY_DIR_OPS; 4687 } 4688 4689 if (recon_state->msg_version >= 2) { 4690 rec.v2.cap_id = cpu_to_le64(cap->cap_id); 4691 rec.v2.wanted = cpu_to_le32(__ceph_caps_wanted(ci)); 4692 rec.v2.issued = cpu_to_le32(cap->issued); 4693 rec.v2.snaprealm = cpu_to_le64(ci->i_snap_realm->ino); 4694 rec.v2.pathbase = cpu_to_le64(path_info.vino.ino); 4695 rec.v2.flock_len = (__force __le32) 4696 ((ci->i_ceph_flags & CEPH_I_ERROR_FILELOCK) ? 0 : 1); 4697 } else { 4698 struct timespec64 ts; 4699 4700 rec.v1.cap_id = cpu_to_le64(cap->cap_id); 4701 rec.v1.wanted = cpu_to_le32(__ceph_caps_wanted(ci)); 4702 rec.v1.issued = cpu_to_le32(cap->issued); 4703 rec.v1.size = cpu_to_le64(i_size_read(inode)); 4704 ts = inode_get_mtime(inode); 4705 ceph_encode_timespec64(&rec.v1.mtime, &ts); 4706 ts = inode_get_atime(inode); 4707 ceph_encode_timespec64(&rec.v1.atime, &ts); 4708 rec.v1.snaprealm = cpu_to_le64(ci->i_snap_realm->ino); 4709 rec.v1.pathbase = cpu_to_le64(path_info.vino.ino); 4710 } 4711 4712 if (list_empty(&ci->i_cap_snaps)) { 4713 snap_follows = ci->i_head_snapc ? 
ci->i_head_snapc->seq : 0; 4714 } else { 4715 struct ceph_cap_snap *capsnap = 4716 list_first_entry(&ci->i_cap_snaps, 4717 struct ceph_cap_snap, ci_item); 4718 snap_follows = capsnap->follows; 4719 } 4720 spin_unlock(&ci->i_ceph_lock); 4721 4722 if (recon_state->msg_version >= 2) { 4723 int num_fcntl_locks, num_flock_locks; 4724 struct ceph_filelock *flocks = NULL; 4725 size_t struct_len, total_len = sizeof(u64); 4726 u8 struct_v = 0; 4727 4728 encode_again: 4729 if (rec.v2.flock_len) { 4730 ceph_count_locks(inode, &num_fcntl_locks, &num_flock_locks); 4731 } else { 4732 num_fcntl_locks = 0; 4733 num_flock_locks = 0; 4734 } 4735 if (num_fcntl_locks + num_flock_locks > 0) { 4736 flocks = kmalloc_objs(struct ceph_filelock, 4737 num_fcntl_locks + num_flock_locks, 4738 GFP_NOFS); 4739 if (!flocks) { 4740 err = -ENOMEM; 4741 goto out_err; 4742 } 4743 err = ceph_encode_locks_to_buffer(inode, flocks, 4744 num_fcntl_locks, 4745 num_flock_locks); 4746 if (err) { 4747 kfree(flocks); 4748 flocks = NULL; 4749 if (err == -ENOSPC) 4750 goto encode_again; 4751 goto out_err; 4752 } 4753 } else { 4754 kfree(flocks); 4755 flocks = NULL; 4756 } 4757 4758 if (recon_state->msg_version >= 3) { 4759 /* version, compat_version and struct_len */ 4760 total_len += 2 * sizeof(u8) + sizeof(u32); 4761 struct_v = 2; 4762 } 4763 /* 4764 * number of encoded locks is stable, so copy to pagelist 4765 */ 4766 struct_len = 2 * sizeof(u32) + 4767 (num_fcntl_locks + num_flock_locks) * 4768 sizeof(struct ceph_filelock); 4769 rec.v2.flock_len = cpu_to_le32(struct_len); 4770 4771 struct_len += sizeof(u32) + path_info.pathlen + sizeof(rec.v2); 4772 4773 if (struct_v >= 2) 4774 struct_len += sizeof(u64); /* snap_follows */ 4775 4776 total_len += struct_len; 4777 4778 if (pagelist->length + total_len > RECONNECT_MAX_SIZE) { 4779 err = send_reconnect_partial(recon_state); 4780 if (err) 4781 goto out_freeflocks; 4782 pagelist = recon_state->pagelist; 4783 } 4784 4785 err = ceph_pagelist_reserve(pagelist, 
total_len); 4786 if (err) 4787 goto out_freeflocks; 4788 4789 ceph_pagelist_encode_64(pagelist, ceph_ino(inode)); 4790 if (recon_state->msg_version >= 3) { 4791 ceph_pagelist_encode_8(pagelist, struct_v); 4792 ceph_pagelist_encode_8(pagelist, 1); 4793 ceph_pagelist_encode_32(pagelist, struct_len); 4794 } 4795 ceph_pagelist_encode_string(pagelist, (char *)path_info.path, path_info.pathlen); 4796 ceph_pagelist_append(pagelist, &rec, sizeof(rec.v2)); 4797 ceph_locks_to_pagelist(flocks, pagelist, 4798 num_fcntl_locks, num_flock_locks); 4799 if (struct_v >= 2) 4800 ceph_pagelist_encode_64(pagelist, snap_follows); 4801 out_freeflocks: 4802 kfree(flocks); 4803 } else { 4804 err = ceph_pagelist_reserve(pagelist, 4805 sizeof(u64) + sizeof(u32) + 4806 path_info.pathlen + sizeof(rec.v1)); 4807 if (err) 4808 goto out_err; 4809 4810 ceph_pagelist_encode_64(pagelist, ceph_ino(inode)); 4811 ceph_pagelist_encode_string(pagelist, (char *)path_info.path, path_info.pathlen); 4812 ceph_pagelist_append(pagelist, &rec, sizeof(rec.v1)); 4813 } 4814 4815 out_err: 4816 ceph_mdsc_free_path_info(&path_info); 4817 if (!err) 4818 recon_state->nr_caps++; 4819 return err; 4820 } 4821 4822 static int encode_snap_realms(struct ceph_mds_client *mdsc, 4823 struct ceph_reconnect_state *recon_state) 4824 { 4825 struct rb_node *p; 4826 struct ceph_pagelist *pagelist = recon_state->pagelist; 4827 struct ceph_client *cl = mdsc->fsc->client; 4828 int err = 0; 4829 4830 if (recon_state->msg_version >= 4) { 4831 err = ceph_pagelist_encode_32(pagelist, mdsc->num_snap_realms); 4832 if (err < 0) 4833 goto fail; 4834 } 4835 4836 /* 4837 * snaprealms. we provide mds with the ino, seq (version), and 4838 * parent for all of our realms. If the mds has any newer info, 4839 * it will tell us. 
4840 */ 4841 for (p = rb_first(&mdsc->snap_realms); p; p = rb_next(p)) { 4842 struct ceph_snap_realm *realm = 4843 rb_entry(p, struct ceph_snap_realm, node); 4844 struct ceph_mds_snaprealm_reconnect sr_rec; 4845 4846 if (recon_state->msg_version >= 4) { 4847 size_t need = sizeof(u8) * 2 + sizeof(u32) + 4848 sizeof(sr_rec); 4849 4850 if (pagelist->length + need > RECONNECT_MAX_SIZE) { 4851 err = send_reconnect_partial(recon_state); 4852 if (err) 4853 goto fail; 4854 pagelist = recon_state->pagelist; 4855 } 4856 4857 err = ceph_pagelist_reserve(pagelist, need); 4858 if (err) 4859 goto fail; 4860 4861 ceph_pagelist_encode_8(pagelist, 1); 4862 ceph_pagelist_encode_8(pagelist, 1); 4863 ceph_pagelist_encode_32(pagelist, sizeof(sr_rec)); 4864 } 4865 4866 doutc(cl, " adding snap realm %llx seq %lld parent %llx\n", 4867 realm->ino, realm->seq, realm->parent_ino); 4868 sr_rec.ino = cpu_to_le64(realm->ino); 4869 sr_rec.seq = cpu_to_le64(realm->seq); 4870 sr_rec.parent = cpu_to_le64(realm->parent_ino); 4871 4872 err = ceph_pagelist_append(pagelist, &sr_rec, sizeof(sr_rec)); 4873 if (err) 4874 goto fail; 4875 4876 recon_state->nr_realms++; 4877 } 4878 fail: 4879 return err; 4880 } 4881 4882 4883 /* 4884 * If an MDS fails and recovers, clients need to reconnect in order to 4885 * reestablish shared state. This includes all caps issued through 4886 * this session _and_ the snap_realm hierarchy. Because it's not 4887 * clear which snap realms the mds cares about, we send everything we 4888 * know about.. that ensures we'll then get any new info the 4889 * recovering MDS might have. 4890 * 4891 * This is a relatively heavyweight operation, but it's rare. 
4892 */ 4893 static void send_mds_reconnect(struct ceph_mds_client *mdsc, 4894 struct ceph_mds_session *session) 4895 { 4896 struct ceph_client *cl = mdsc->fsc->client; 4897 struct ceph_msg *reply; 4898 int mds = session->s_mds; 4899 int err = -ENOMEM; 4900 struct ceph_reconnect_state recon_state = { 4901 .session = session, 4902 }; 4903 LIST_HEAD(dispose); 4904 4905 pr_info_client(cl, "mds%d reconnect start\n", mds); 4906 4907 recon_state.pagelist = ceph_pagelist_alloc(GFP_NOFS); 4908 if (!recon_state.pagelist) 4909 goto fail_nopagelist; 4910 4911 reply = ceph_msg_new2(CEPH_MSG_CLIENT_RECONNECT, 0, 1, GFP_NOFS, false); 4912 if (!reply) 4913 goto fail_nomsg; 4914 4915 xa_destroy(&session->s_delegated_inos); 4916 4917 mutex_lock(&session->s_mutex); 4918 session->s_state = CEPH_MDS_SESSION_RECONNECTING; 4919 session->s_seq = 0; 4920 4921 doutc(cl, "session %p state %s\n", session, 4922 ceph_session_state_name(session->s_state)); 4923 4924 atomic_inc(&session->s_cap_gen); 4925 4926 spin_lock(&session->s_cap_lock); 4927 /* don't know if session is readonly */ 4928 session->s_readonly = 0; 4929 /* 4930 * notify __ceph_remove_cap() that we are composing cap reconnect. 4931 * If a cap get released before being added to the cap reconnect, 4932 * __ceph_remove_cap() should skip queuing cap release. 
4933 */ 4934 session->s_cap_reconnect = 1; 4935 /* drop old cap expires; we're about to reestablish that state */ 4936 detach_cap_releases(session, &dispose); 4937 spin_unlock(&session->s_cap_lock); 4938 dispose_cap_releases(mdsc, &dispose); 4939 4940 /* trim unused caps to reduce MDS's cache rejoin time */ 4941 if (mdsc->fsc->sb->s_root) 4942 shrink_dcache_parent(mdsc->fsc->sb->s_root); 4943 4944 ceph_con_close(&session->s_con); 4945 ceph_con_open(&session->s_con, 4946 CEPH_ENTITY_TYPE_MDS, mds, 4947 ceph_mdsmap_get_addr(mdsc->mdsmap, mds)); 4948 4949 /* replay unsafe requests */ 4950 replay_unsafe_requests(mdsc, session); 4951 4952 ceph_early_kick_flushing_caps(mdsc, session); 4953 4954 down_read(&mdsc->snap_rwsem); 4955 4956 /* placeholder for nr_caps */ 4957 err = ceph_pagelist_encode_32(recon_state.pagelist, 0); 4958 if (err) 4959 goto fail; 4960 4961 if (test_bit(CEPHFS_FEATURE_MULTI_RECONNECT, &session->s_features)) { 4962 recon_state.msg_version = 3; 4963 recon_state.allow_multi = true; 4964 } else if (session->s_con.peer_features & CEPH_FEATURE_MDSENC) { 4965 recon_state.msg_version = 3; 4966 } else { 4967 recon_state.msg_version = 2; 4968 } 4969 /* traverse this session's caps */ 4970 err = ceph_iterate_session_caps(session, reconnect_caps_cb, &recon_state); 4971 4972 spin_lock(&session->s_cap_lock); 4973 session->s_cap_reconnect = 0; 4974 spin_unlock(&session->s_cap_lock); 4975 4976 if (err < 0) 4977 goto fail; 4978 4979 /* check if all realms can be encoded into current message */ 4980 if (mdsc->num_snap_realms) { 4981 size_t total_len = 4982 recon_state.pagelist->length + 4983 mdsc->num_snap_realms * 4984 sizeof(struct ceph_mds_snaprealm_reconnect); 4985 if (recon_state.msg_version >= 4) { 4986 /* number of realms */ 4987 total_len += sizeof(u32); 4988 /* version, compat_version and struct_len */ 4989 total_len += mdsc->num_snap_realms * 4990 (2 * sizeof(u8) + sizeof(u32)); 4991 } 4992 if (total_len > RECONNECT_MAX_SIZE) { 4993 if 
(!recon_state.allow_multi) { 4994 err = -ENOSPC; 4995 goto fail; 4996 } 4997 if (recon_state.nr_caps) { 4998 err = send_reconnect_partial(&recon_state); 4999 if (err) 5000 goto fail; 5001 } 5002 recon_state.msg_version = 5; 5003 } 5004 } 5005 5006 err = encode_snap_realms(mdsc, &recon_state); 5007 if (err < 0) 5008 goto fail; 5009 5010 if (recon_state.msg_version >= 5) { 5011 err = ceph_pagelist_encode_8(recon_state.pagelist, 0); 5012 if (err < 0) 5013 goto fail; 5014 } 5015 5016 if (recon_state.nr_caps || recon_state.nr_realms) { 5017 struct page *page = 5018 list_first_entry(&recon_state.pagelist->head, 5019 struct page, lru); 5020 __le32 *addr = kmap_atomic(page); 5021 if (recon_state.nr_caps) { 5022 WARN_ON(recon_state.nr_realms != mdsc->num_snap_realms); 5023 *addr = cpu_to_le32(recon_state.nr_caps); 5024 } else if (recon_state.msg_version >= 4) { 5025 *(addr + 1) = cpu_to_le32(recon_state.nr_realms); 5026 } 5027 kunmap_atomic(addr); 5028 } 5029 5030 reply->hdr.version = cpu_to_le16(recon_state.msg_version); 5031 if (recon_state.msg_version >= 4) 5032 reply->hdr.compat_version = cpu_to_le16(4); 5033 5034 reply->hdr.data_len = cpu_to_le32(recon_state.pagelist->length); 5035 ceph_msg_data_add_pagelist(reply, recon_state.pagelist); 5036 5037 ceph_con_send(&session->s_con, reply); 5038 5039 mutex_unlock(&session->s_mutex); 5040 5041 mutex_lock(&mdsc->mutex); 5042 __wake_requests(mdsc, &session->s_waiting); 5043 mutex_unlock(&mdsc->mutex); 5044 5045 up_read(&mdsc->snap_rwsem); 5046 ceph_pagelist_release(recon_state.pagelist); 5047 return; 5048 5049 fail: 5050 ceph_msg_put(reply); 5051 up_read(&mdsc->snap_rwsem); 5052 mutex_unlock(&session->s_mutex); 5053 fail_nomsg: 5054 ceph_pagelist_release(recon_state.pagelist); 5055 fail_nopagelist: 5056 pr_err_client(cl, "error %d preparing reconnect for mds%d\n", 5057 err, mds); 5058 return; 5059 } 5060 5061 5062 /* 5063 * compare old and new mdsmaps, kicking requests 5064 * and closing out old connections as necessary 5065 * 
5066 * called under mdsc->mutex. 5067 */ 5068 static void check_new_map(struct ceph_mds_client *mdsc, 5069 struct ceph_mdsmap *newmap, 5070 struct ceph_mdsmap *oldmap) 5071 { 5072 int i, j, err; 5073 int oldstate, newstate; 5074 struct ceph_mds_session *s; 5075 unsigned long targets[DIV_ROUND_UP(CEPH_MAX_MDS, sizeof(unsigned long))] = {0}; 5076 struct ceph_client *cl = mdsc->fsc->client; 5077 5078 doutc(cl, "new %u old %u\n", newmap->m_epoch, oldmap->m_epoch); 5079 5080 if (newmap->m_info) { 5081 for (i = 0; i < newmap->possible_max_rank; i++) { 5082 for (j = 0; j < newmap->m_info[i].num_export_targets; j++) 5083 set_bit(newmap->m_info[i].export_targets[j], targets); 5084 } 5085 } 5086 5087 for (i = 0; i < oldmap->possible_max_rank && i < mdsc->max_sessions; i++) { 5088 if (!mdsc->sessions[i]) 5089 continue; 5090 s = mdsc->sessions[i]; 5091 oldstate = ceph_mdsmap_get_state(oldmap, i); 5092 newstate = ceph_mdsmap_get_state(newmap, i); 5093 5094 doutc(cl, "mds%d state %s%s -> %s%s (session %s)\n", 5095 i, ceph_mds_state_name(oldstate), 5096 ceph_mdsmap_is_laggy(oldmap, i) ? " (laggy)" : "", 5097 ceph_mds_state_name(newstate), 5098 ceph_mdsmap_is_laggy(newmap, i) ? 
" (laggy)" : "", 5099 ceph_session_state_name(s->s_state)); 5100 5101 if (i >= newmap->possible_max_rank) { 5102 /* force close session for stopped mds */ 5103 ceph_get_mds_session(s); 5104 __unregister_session(mdsc, s); 5105 __wake_requests(mdsc, &s->s_waiting); 5106 mutex_unlock(&mdsc->mutex); 5107 5108 mutex_lock(&s->s_mutex); 5109 cleanup_session_requests(mdsc, s); 5110 remove_session_caps(s); 5111 mutex_unlock(&s->s_mutex); 5112 5113 ceph_put_mds_session(s); 5114 5115 mutex_lock(&mdsc->mutex); 5116 kick_requests(mdsc, i); 5117 continue; 5118 } 5119 5120 if (memcmp(ceph_mdsmap_get_addr(oldmap, i), 5121 ceph_mdsmap_get_addr(newmap, i), 5122 sizeof(struct ceph_entity_addr))) { 5123 /* just close it */ 5124 mutex_unlock(&mdsc->mutex); 5125 mutex_lock(&s->s_mutex); 5126 mutex_lock(&mdsc->mutex); 5127 ceph_con_close(&s->s_con); 5128 mutex_unlock(&s->s_mutex); 5129 s->s_state = CEPH_MDS_SESSION_RESTARTING; 5130 } else if (oldstate == newstate) { 5131 continue; /* nothing new with this mds */ 5132 } 5133 5134 /* 5135 * send reconnect? 5136 */ 5137 if (s->s_state == CEPH_MDS_SESSION_RESTARTING && 5138 newstate >= CEPH_MDS_STATE_RECONNECT) { 5139 mutex_unlock(&mdsc->mutex); 5140 clear_bit(i, targets); 5141 send_mds_reconnect(mdsc, s); 5142 mutex_lock(&mdsc->mutex); 5143 } 5144 5145 /* 5146 * kick request on any mds that has gone active. 5147 */ 5148 if (oldstate < CEPH_MDS_STATE_ACTIVE && 5149 newstate >= CEPH_MDS_STATE_ACTIVE) { 5150 if (oldstate != CEPH_MDS_STATE_CREATING && 5151 oldstate != CEPH_MDS_STATE_STARTING) 5152 pr_info_client(cl, "mds%d recovery completed\n", 5153 s->s_mds); 5154 kick_requests(mdsc, i); 5155 mutex_unlock(&mdsc->mutex); 5156 mutex_lock(&s->s_mutex); 5157 mutex_lock(&mdsc->mutex); 5158 ceph_kick_flushing_caps(mdsc, s); 5159 mutex_unlock(&s->s_mutex); 5160 wake_up_session_caps(s, RECONNECT); 5161 } 5162 } 5163 5164 /* 5165 * Only open and reconnect sessions that don't exist yet. 
5166 */ 5167 for (i = 0; i < newmap->possible_max_rank; i++) { 5168 /* 5169 * In case the import MDS is crashed just after 5170 * the EImportStart journal is flushed, so when 5171 * a standby MDS takes over it and is replaying 5172 * the EImportStart journal the new MDS daemon 5173 * will wait the client to reconnect it, but the 5174 * client may never register/open the session yet. 5175 * 5176 * Will try to reconnect that MDS daemon if the 5177 * rank number is in the export targets array and 5178 * is the up:reconnect state. 5179 */ 5180 newstate = ceph_mdsmap_get_state(newmap, i); 5181 if (!test_bit(i, targets) || newstate != CEPH_MDS_STATE_RECONNECT) 5182 continue; 5183 5184 /* 5185 * The session maybe registered and opened by some 5186 * requests which were choosing random MDSes during 5187 * the mdsc->mutex's unlock/lock gap below in rare 5188 * case. But the related MDS daemon will just queue 5189 * that requests and be still waiting for the client's 5190 * reconnection request in up:reconnect state. 
5191 */ 5192 s = __ceph_lookup_mds_session(mdsc, i); 5193 if (likely(!s)) { 5194 s = __open_export_target_session(mdsc, i); 5195 if (IS_ERR(s)) { 5196 err = PTR_ERR(s); 5197 pr_err_client(cl, 5198 "failed to open export target session, err %d\n", 5199 err); 5200 continue; 5201 } 5202 } 5203 doutc(cl, "send reconnect to export target mds.%d\n", i); 5204 mutex_unlock(&mdsc->mutex); 5205 send_mds_reconnect(mdsc, s); 5206 ceph_put_mds_session(s); 5207 mutex_lock(&mdsc->mutex); 5208 } 5209 5210 for (i = 0; i < newmap->possible_max_rank && i < mdsc->max_sessions; i++) { 5211 s = mdsc->sessions[i]; 5212 if (!s) 5213 continue; 5214 if (!ceph_mdsmap_is_laggy(newmap, i)) 5215 continue; 5216 if (s->s_state == CEPH_MDS_SESSION_OPEN || 5217 s->s_state == CEPH_MDS_SESSION_HUNG || 5218 s->s_state == CEPH_MDS_SESSION_CLOSING) { 5219 doutc(cl, " connecting to export targets of laggy mds%d\n", i); 5220 __open_export_target_sessions(mdsc, s); 5221 } 5222 } 5223 } 5224 5225 5226 5227 /* 5228 * leases 5229 */ 5230 5231 /* 5232 * caller must hold session s_mutex, dentry->d_lock 5233 */ 5234 void __ceph_mdsc_drop_dentry_lease(struct dentry *dentry) 5235 { 5236 struct ceph_dentry_info *di = ceph_dentry(dentry); 5237 5238 ceph_put_mds_session(di->lease_session); 5239 di->lease_session = NULL; 5240 } 5241 5242 static void handle_lease(struct ceph_mds_client *mdsc, 5243 struct ceph_mds_session *session, 5244 struct ceph_msg *msg) 5245 { 5246 struct ceph_client *cl = mdsc->fsc->client; 5247 struct super_block *sb = mdsc->fsc->sb; 5248 struct inode *inode; 5249 struct dentry *parent, *dentry; 5250 struct ceph_dentry_info *di; 5251 int mds = session->s_mds; 5252 struct ceph_mds_lease *h = msg->front.iov_base; 5253 u32 seq; 5254 struct ceph_vino vino; 5255 struct qstr dname; 5256 int release = 0; 5257 5258 doutc(cl, "from mds%d\n", mds); 5259 5260 if (!ceph_inc_mds_stopping_blocker(mdsc, session)) 5261 return; 5262 5263 /* decode */ 5264 if (msg->front.iov_len < sizeof(*h) + sizeof(u32)) 5265 
goto bad; 5266 vino.ino = le64_to_cpu(h->ino); 5267 vino.snap = CEPH_NOSNAP; 5268 seq = le32_to_cpu(h->seq); 5269 dname.len = get_unaligned_le32(h + 1); 5270 if (msg->front.iov_len < sizeof(*h) + sizeof(u32) + dname.len) 5271 goto bad; 5272 dname.name = (void *)(h + 1) + sizeof(u32); 5273 5274 /* lookup inode */ 5275 inode = ceph_find_inode(sb, vino); 5276 doutc(cl, "%s, ino %llx %p %.*s\n", ceph_lease_op_name(h->action), 5277 vino.ino, inode, dname.len, dname.name); 5278 5279 mutex_lock(&session->s_mutex); 5280 if (!inode) { 5281 doutc(cl, "no inode %llx\n", vino.ino); 5282 goto release; 5283 } 5284 5285 /* dentry */ 5286 parent = d_find_alias(inode); 5287 if (!parent) { 5288 doutc(cl, "no parent dentry on inode %p\n", inode); 5289 WARN_ON(1); 5290 goto release; /* hrm... */ 5291 } 5292 dname.hash = full_name_hash(parent, dname.name, dname.len); 5293 dentry = d_lookup(parent, &dname); 5294 dput(parent); 5295 if (!dentry) 5296 goto release; 5297 5298 spin_lock(&dentry->d_lock); 5299 di = ceph_dentry(dentry); 5300 switch (h->action) { 5301 case CEPH_MDS_LEASE_REVOKE: 5302 if (di->lease_session == session) { 5303 if (ceph_seq_cmp(di->lease_seq, seq) > 0) 5304 h->seq = cpu_to_le32(di->lease_seq); 5305 __ceph_mdsc_drop_dentry_lease(dentry); 5306 } 5307 release = 1; 5308 break; 5309 5310 case CEPH_MDS_LEASE_RENEW: 5311 if (di->lease_session == session && 5312 di->lease_gen == atomic_read(&session->s_cap_gen) && 5313 di->lease_renew_from && 5314 di->lease_renew_after == 0) { 5315 unsigned long duration = 5316 msecs_to_jiffies(le32_to_cpu(h->duration_ms)); 5317 5318 di->lease_seq = seq; 5319 di->time = di->lease_renew_from + duration; 5320 di->lease_renew_after = di->lease_renew_from + 5321 (duration >> 1); 5322 di->lease_renew_from = 0; 5323 } 5324 break; 5325 } 5326 spin_unlock(&dentry->d_lock); 5327 dput(dentry); 5328 5329 if (!release) 5330 goto out; 5331 5332 release: 5333 /* let's just reuse the same message */ 5334 h->action = CEPH_MDS_LEASE_REVOKE_ACK; 5335 
ceph_msg_get(msg); 5336 ceph_con_send(&session->s_con, msg); 5337 5338 out: 5339 mutex_unlock(&session->s_mutex); 5340 iput(inode); 5341 5342 ceph_dec_mds_stopping_blocker(mdsc); 5343 return; 5344 5345 bad: 5346 ceph_dec_mds_stopping_blocker(mdsc); 5347 5348 pr_err_client(cl, "corrupt lease message\n"); 5349 ceph_msg_dump(msg); 5350 } 5351 5352 void ceph_mdsc_lease_send_msg(struct ceph_mds_session *session, 5353 struct dentry *dentry, char action, 5354 u32 seq) 5355 { 5356 struct ceph_client *cl = session->s_mdsc->fsc->client; 5357 struct ceph_msg *msg; 5358 struct ceph_mds_lease *lease; 5359 struct inode *dir; 5360 int len = sizeof(*lease) + sizeof(u32) + NAME_MAX; 5361 5362 doutc(cl, "identry %p %s to mds%d\n", dentry, ceph_lease_op_name(action), 5363 session->s_mds); 5364 5365 msg = ceph_msg_new(CEPH_MSG_CLIENT_LEASE, len, GFP_NOFS, false); 5366 if (!msg) 5367 return; 5368 lease = msg->front.iov_base; 5369 lease->action = action; 5370 lease->seq = cpu_to_le32(seq); 5371 5372 spin_lock(&dentry->d_lock); 5373 dir = d_inode(dentry->d_parent); 5374 lease->ino = cpu_to_le64(ceph_ino(dir)); 5375 lease->first = lease->last = cpu_to_le64(ceph_snap(dir)); 5376 5377 put_unaligned_le32(dentry->d_name.len, lease + 1); 5378 memcpy((void *)(lease + 1) + 4, 5379 dentry->d_name.name, dentry->d_name.len); 5380 spin_unlock(&dentry->d_lock); 5381 5382 ceph_con_send(&session->s_con, msg); 5383 } 5384 5385 /* 5386 * lock unlock the session, to wait ongoing session activities 5387 */ 5388 static void lock_unlock_session(struct ceph_mds_session *s) 5389 { 5390 mutex_lock(&s->s_mutex); 5391 mutex_unlock(&s->s_mutex); 5392 } 5393 5394 static void maybe_recover_session(struct ceph_mds_client *mdsc) 5395 { 5396 struct ceph_client *cl = mdsc->fsc->client; 5397 struct ceph_fs_client *fsc = mdsc->fsc; 5398 5399 if (!ceph_test_mount_opt(fsc, CLEANRECOVER)) 5400 return; 5401 5402 if (READ_ONCE(fsc->mount_state) != CEPH_MOUNT_MOUNTED) 5403 return; 5404 5405 if (!READ_ONCE(fsc->blocklisted)) 
5406 return; 5407 5408 pr_info_client(cl, "auto reconnect after blocklisted\n"); 5409 ceph_force_reconnect(fsc->sb); 5410 } 5411 5412 bool check_session_state(struct ceph_mds_session *s) 5413 { 5414 struct ceph_client *cl = s->s_mdsc->fsc->client; 5415 5416 switch (s->s_state) { 5417 case CEPH_MDS_SESSION_OPEN: 5418 if (s->s_ttl && time_after(jiffies, s->s_ttl)) { 5419 s->s_state = CEPH_MDS_SESSION_HUNG; 5420 pr_info_client(cl, "mds%d hung\n", s->s_mds); 5421 } 5422 break; 5423 case CEPH_MDS_SESSION_CLOSING: 5424 case CEPH_MDS_SESSION_NEW: 5425 case CEPH_MDS_SESSION_RESTARTING: 5426 case CEPH_MDS_SESSION_CLOSED: 5427 case CEPH_MDS_SESSION_REJECTED: 5428 return false; 5429 } 5430 5431 return true; 5432 } 5433 5434 /* 5435 * If the sequence is incremented while we're waiting on a REQUEST_CLOSE reply, 5436 * then we need to retransmit that request. 5437 */ 5438 void inc_session_sequence(struct ceph_mds_session *s) 5439 { 5440 struct ceph_client *cl = s->s_mdsc->fsc->client; 5441 5442 lockdep_assert_held(&s->s_mutex); 5443 5444 s->s_seq++; 5445 5446 if (s->s_state == CEPH_MDS_SESSION_CLOSING) { 5447 int ret; 5448 5449 doutc(cl, "resending session close request for mds%d\n", s->s_mds); 5450 ret = request_close_session(s); 5451 if (ret < 0) 5452 pr_err_client(cl, "unable to close session to mds%d: %d\n", 5453 s->s_mds, ret); 5454 } 5455 } 5456 5457 /* 5458 * delayed work -- periodically trim expired leases, renew caps with mds. If 5459 * the @delay parameter is set to 0 or if it's more than 5 secs, the default 5460 * workqueue delay value of 5 secs will be used. 
5461 */ 5462 static void schedule_delayed(struct ceph_mds_client *mdsc, unsigned long delay) 5463 { 5464 unsigned long max_delay = HZ * 5; 5465 5466 /* 5 secs default delay */ 5467 if (!delay || (delay > max_delay)) 5468 delay = max_delay; 5469 schedule_delayed_work(&mdsc->delayed_work, 5470 round_jiffies_relative(delay)); 5471 } 5472 5473 static void delayed_work(struct work_struct *work) 5474 { 5475 struct ceph_mds_client *mdsc = 5476 container_of(work, struct ceph_mds_client, delayed_work.work); 5477 unsigned long delay; 5478 int renew_interval; 5479 int renew_caps; 5480 int i; 5481 5482 doutc(mdsc->fsc->client, "mdsc delayed_work\n"); 5483 5484 if (mdsc->stopping >= CEPH_MDSC_STOPPING_FLUSHED) 5485 return; 5486 5487 mutex_lock(&mdsc->mutex); 5488 renew_interval = mdsc->mdsmap->m_session_timeout >> 2; 5489 renew_caps = time_after_eq(jiffies, HZ*renew_interval + 5490 mdsc->last_renew_caps); 5491 if (renew_caps) 5492 mdsc->last_renew_caps = jiffies; 5493 5494 for (i = 0; i < mdsc->max_sessions; i++) { 5495 struct ceph_mds_session *s = __ceph_lookup_mds_session(mdsc, i); 5496 if (!s) 5497 continue; 5498 5499 if (!check_session_state(s)) { 5500 ceph_put_mds_session(s); 5501 continue; 5502 } 5503 mutex_unlock(&mdsc->mutex); 5504 5505 ceph_flush_session_cap_releases(mdsc, s); 5506 5507 mutex_lock(&s->s_mutex); 5508 if (renew_caps) 5509 send_renew_caps(mdsc, s); 5510 else 5511 ceph_con_keepalive(&s->s_con); 5512 if (s->s_state == CEPH_MDS_SESSION_OPEN || 5513 s->s_state == CEPH_MDS_SESSION_HUNG) 5514 ceph_send_cap_releases(mdsc, s); 5515 mutex_unlock(&s->s_mutex); 5516 ceph_put_mds_session(s); 5517 5518 mutex_lock(&mdsc->mutex); 5519 } 5520 mutex_unlock(&mdsc->mutex); 5521 5522 delay = ceph_check_delayed_caps(mdsc); 5523 5524 ceph_queue_cap_reclaim_work(mdsc); 5525 5526 ceph_trim_snapid_map(mdsc); 5527 5528 maybe_recover_session(mdsc); 5529 5530 schedule_delayed(mdsc, delay); 5531 } 5532 5533 int ceph_mdsc_init(struct ceph_fs_client *fsc) 5534 5535 { 5536 struct 
ceph_mds_client *mdsc; 5537 int err; 5538 5539 mdsc = kzalloc_obj(struct ceph_mds_client, GFP_NOFS); 5540 if (!mdsc) 5541 return -ENOMEM; 5542 mdsc->fsc = fsc; 5543 mutex_init(&mdsc->mutex); 5544 mdsc->mdsmap = kzalloc_obj(*mdsc->mdsmap, GFP_NOFS); 5545 if (!mdsc->mdsmap) { 5546 err = -ENOMEM; 5547 goto err_mdsc; 5548 } 5549 5550 init_completion(&mdsc->safe_umount_waiters); 5551 spin_lock_init(&mdsc->stopping_lock); 5552 atomic_set(&mdsc->stopping_blockers, 0); 5553 init_completion(&mdsc->stopping_waiter); 5554 atomic64_set(&mdsc->dirty_folios, 0); 5555 init_waitqueue_head(&mdsc->flush_end_wq); 5556 init_waitqueue_head(&mdsc->session_close_wq); 5557 INIT_LIST_HEAD(&mdsc->waiting_for_map); 5558 mdsc->quotarealms_inodes = RB_ROOT; 5559 mutex_init(&mdsc->quotarealms_inodes_mutex); 5560 init_rwsem(&mdsc->snap_rwsem); 5561 mdsc->snap_realms = RB_ROOT; 5562 INIT_LIST_HEAD(&mdsc->snap_empty); 5563 spin_lock_init(&mdsc->snap_empty_lock); 5564 mdsc->request_tree = RB_ROOT; 5565 INIT_DELAYED_WORK(&mdsc->delayed_work, delayed_work); 5566 mdsc->last_renew_caps = jiffies; 5567 INIT_LIST_HEAD(&mdsc->cap_delay_list); 5568 #ifdef CONFIG_DEBUG_FS 5569 INIT_LIST_HEAD(&mdsc->cap_wait_list); 5570 #endif 5571 spin_lock_init(&mdsc->cap_delay_lock); 5572 INIT_LIST_HEAD(&mdsc->cap_unlink_delay_list); 5573 INIT_LIST_HEAD(&mdsc->snap_flush_list); 5574 spin_lock_init(&mdsc->snap_flush_lock); 5575 mdsc->last_cap_flush_tid = 1; 5576 INIT_LIST_HEAD(&mdsc->cap_flush_list); 5577 INIT_LIST_HEAD(&mdsc->cap_dirty_migrating); 5578 spin_lock_init(&mdsc->cap_dirty_lock); 5579 init_waitqueue_head(&mdsc->cap_flushing_wq); 5580 INIT_WORK(&mdsc->cap_reclaim_work, ceph_cap_reclaim_work); 5581 INIT_WORK(&mdsc->cap_unlink_work, ceph_cap_unlink_work); 5582 err = ceph_metric_init(&mdsc->metric); 5583 if (err) 5584 goto err_mdsmap; 5585 5586 spin_lock_init(&mdsc->dentry_list_lock); 5587 INIT_LIST_HEAD(&mdsc->dentry_leases); 5588 INIT_LIST_HEAD(&mdsc->dentry_dir_leases); 5589 5590 ceph_caps_init(mdsc); 5591 
ceph_adjust_caps_max_min(mdsc, fsc->mount_options); 5592 5593 spin_lock_init(&mdsc->snapid_map_lock); 5594 mdsc->snapid_map_tree = RB_ROOT; 5595 INIT_LIST_HEAD(&mdsc->snapid_map_lru); 5596 5597 init_rwsem(&mdsc->pool_perm_rwsem); 5598 mdsc->pool_perm_tree = RB_ROOT; 5599 5600 strscpy(mdsc->nodename, utsname()->nodename, 5601 sizeof(mdsc->nodename)); 5602 5603 fsc->mdsc = mdsc; 5604 return 0; 5605 5606 err_mdsmap: 5607 kfree(mdsc->mdsmap); 5608 err_mdsc: 5609 kfree(mdsc); 5610 return err; 5611 } 5612 5613 /* 5614 * Wait for safe replies on open mds requests. If we time out, drop 5615 * all requests from the tree to avoid dangling dentry refs. 5616 */ 5617 static void wait_requests(struct ceph_mds_client *mdsc) 5618 { 5619 struct ceph_client *cl = mdsc->fsc->client; 5620 struct ceph_options *opts = mdsc->fsc->client->options; 5621 struct ceph_mds_request *req; 5622 5623 mutex_lock(&mdsc->mutex); 5624 if (__get_oldest_req(mdsc)) { 5625 mutex_unlock(&mdsc->mutex); 5626 5627 doutc(cl, "waiting for requests\n"); 5628 wait_for_completion_timeout(&mdsc->safe_umount_waiters, 5629 ceph_timeout_jiffies(opts->mount_timeout)); 5630 5631 /* tear down remaining requests */ 5632 mutex_lock(&mdsc->mutex); 5633 while ((req = __get_oldest_req(mdsc))) { 5634 doutc(cl, "timed out on tid %llu\n", req->r_tid); 5635 list_del_init(&req->r_wait); 5636 __unregister_request(mdsc, req); 5637 } 5638 } 5639 mutex_unlock(&mdsc->mutex); 5640 doutc(cl, "done\n"); 5641 } 5642 5643 void send_flush_mdlog(struct ceph_mds_session *s) 5644 { 5645 struct ceph_client *cl = s->s_mdsc->fsc->client; 5646 struct ceph_msg *msg; 5647 5648 /* 5649 * Pre-luminous MDS crashes when it sees an unknown session request 5650 */ 5651 if (!CEPH_HAVE_FEATURE(s->s_con.peer_features, SERVER_LUMINOUS)) 5652 return; 5653 5654 mutex_lock(&s->s_mutex); 5655 doutc(cl, "request mdlog flush to mds%d (%s)s seq %lld\n", 5656 s->s_mds, ceph_session_state_name(s->s_state), s->s_seq); 5657 msg = 
ceph_create_session_msg(CEPH_SESSION_REQUEST_FLUSH_MDLOG, 5658 s->s_seq); 5659 if (!msg) { 5660 pr_err_client(cl, "failed to request mdlog flush to mds%d (%s) seq %lld\n", 5661 s->s_mds, ceph_session_state_name(s->s_state), s->s_seq); 5662 } else { 5663 ceph_con_send(&s->s_con, msg); 5664 } 5665 mutex_unlock(&s->s_mutex); 5666 } 5667 5668 static int ceph_mds_auth_match(struct ceph_mds_client *mdsc, 5669 struct ceph_mds_cap_auth *auth, 5670 const struct cred *cred, 5671 char *tpath) 5672 { 5673 u32 caller_uid = from_kuid(&init_user_ns, cred->fsuid); 5674 u32 caller_gid = from_kgid(&init_user_ns, cred->fsgid); 5675 struct ceph_client *cl = mdsc->fsc->client; 5676 const char *fs_name = mdsc->mdsmap->m_fs_name; 5677 const char *spath = mdsc->fsc->mount_options->server_path; 5678 bool gid_matched = false; 5679 u32 gid, tlen, len; 5680 int i, j; 5681 5682 doutc(cl, "fsname check fs_name=%s match.fs_name=%s\n", 5683 fs_name, auth->match.fs_name ? auth->match.fs_name : ""); 5684 5685 if (!ceph_namespace_match(auth->match.fs_name, fs_name)) { 5686 /* fsname mismatch, try next one */ 5687 return 0; 5688 } 5689 5690 doutc(cl, "match.uid %lld\n", auth->match.uid); 5691 if (auth->match.uid != MDS_AUTH_UID_ANY) { 5692 if (auth->match.uid != caller_uid) 5693 return 0; 5694 if (auth->match.num_gids) { 5695 for (i = 0; i < auth->match.num_gids; i++) { 5696 if (caller_gid == auth->match.gids[i]) 5697 gid_matched = true; 5698 } 5699 if (!gid_matched && cred->group_info->ngroups) { 5700 for (i = 0; i < cred->group_info->ngroups; i++) { 5701 gid = from_kgid(&init_user_ns, 5702 cred->group_info->gid[i]); 5703 for (j = 0; j < auth->match.num_gids; j++) { 5704 if (gid == auth->match.gids[j]) { 5705 gid_matched = true; 5706 break; 5707 } 5708 } 5709 if (gid_matched) 5710 break; 5711 } 5712 } 5713 if (!gid_matched) 5714 return 0; 5715 } 5716 } 5717 5718 /* path match */ 5719 if (auth->match.path) { 5720 if (!tpath) 5721 return 0; 5722 5723 tlen = strlen(tpath); 5724 len = 
strlen(auth->match.path); 5725 if (len) { 5726 char *_tpath = tpath; 5727 bool free_tpath = false; 5728 int m, n; 5729 5730 doutc(cl, "server path %s, tpath %s, match.path %s\n", 5731 spath, tpath, auth->match.path); 5732 if (spath && (m = strlen(spath)) != 1) { 5733 /* mount path + '/' + tpath + an extra space */ 5734 n = m + 1 + tlen + 1; 5735 _tpath = kmalloc(n, GFP_NOFS); 5736 if (!_tpath) 5737 return -ENOMEM; 5738 /* remove the leading '/' */ 5739 snprintf(_tpath, n, "%s/%s", spath + 1, tpath); 5740 free_tpath = true; 5741 tlen = strlen(_tpath); 5742 } 5743 5744 /* 5745 * Please note the tailing '/' for match.path has already 5746 * been removed when parsing. 5747 * 5748 * Remove the tailing '/' for the target path. 5749 */ 5750 while (tlen && _tpath[tlen - 1] == '/') { 5751 _tpath[tlen - 1] = '\0'; 5752 tlen -= 1; 5753 } 5754 doutc(cl, "_tpath %s\n", _tpath); 5755 5756 /* 5757 * In case first == _tpath && tlen == len: 5758 * match.path=/foo --> /foo _path=/foo --> match 5759 * match.path=/foo/ --> /foo _path=/foo --> match 5760 * 5761 * In case first == _tmatch.path && tlen > len: 5762 * match.path=/foo/ --> /foo _path=/foo/ --> match 5763 * match.path=/foo --> /foo _path=/foo/ --> match 5764 * match.path=/foo/ --> /foo _path=/foo/d --> match 5765 * match.path=/foo --> /foo _path=/food --> mismatch 5766 * 5767 * All the other cases --> mismatch 5768 */ 5769 bool path_matched = true; 5770 char *first = strstr(_tpath, auth->match.path); 5771 if (first != _tpath || 5772 (tlen > len && _tpath[len] != '/')) { 5773 path_matched = false; 5774 } 5775 5776 if (free_tpath) 5777 kfree(_tpath); 5778 5779 if (!path_matched) 5780 return 0; 5781 } 5782 } 5783 5784 doutc(cl, "matched\n"); 5785 return 1; 5786 } 5787 5788 int ceph_mds_check_access(struct ceph_mds_client *mdsc, char *tpath, int mask) 5789 { 5790 const struct cred *cred = get_current_cred(); 5791 u32 caller_uid = from_kuid(&init_user_ns, cred->fsuid); 5792 u32 caller_gid = from_kgid(&init_user_ns, cred->fsgid); 
5793 struct ceph_mds_cap_auth *rw_perms_s = NULL; 5794 struct ceph_client *cl = mdsc->fsc->client; 5795 bool root_squash_perms = true; 5796 int i, err; 5797 5798 doutc(cl, "tpath '%s', mask %d, caller_uid %d, caller_gid %d\n", 5799 tpath, mask, caller_uid, caller_gid); 5800 5801 for (i = 0; i < mdsc->s_cap_auths_num; i++) { 5802 struct ceph_mds_cap_auth *s = &mdsc->s_cap_auths[i]; 5803 5804 err = ceph_mds_auth_match(mdsc, s, cred, tpath); 5805 if (err < 0) { 5806 put_cred(cred); 5807 return err; 5808 } else if (err > 0) { 5809 /* always follow the last auth caps' permission */ 5810 root_squash_perms = true; 5811 rw_perms_s = NULL; 5812 if ((mask & MAY_WRITE) && s->writeable && 5813 s->match.root_squash && (!caller_uid || !caller_gid)) 5814 root_squash_perms = false; 5815 5816 if (((mask & MAY_WRITE) && !s->writeable) || 5817 ((mask & MAY_READ) && !s->readable)) 5818 rw_perms_s = s; 5819 } 5820 } 5821 5822 put_cred(cred); 5823 5824 doutc(cl, "root_squash_perms %d, rw_perms_s %p\n", root_squash_perms, 5825 rw_perms_s); 5826 if (root_squash_perms && rw_perms_s == NULL) { 5827 doutc(cl, "access allowed\n"); 5828 return 0; 5829 } 5830 5831 if (!root_squash_perms) { 5832 doutc(cl, "root_squash is enabled and user(%d %d) isn't allowed to write", 5833 caller_uid, caller_gid); 5834 } 5835 if (rw_perms_s) { 5836 doutc(cl, "mds auth caps readable/writeable %d/%d while request r/w %d/%d", 5837 rw_perms_s->readable, rw_perms_s->writeable, 5838 !!(mask & MAY_READ), !!(mask & MAY_WRITE)); 5839 } 5840 doutc(cl, "access denied\n"); 5841 return -EACCES; 5842 } 5843 5844 /* 5845 * called before mount is ro, and before dentries are torn down. 5846 * (hmm, does this still race with new lookups?) 
5847 */ 5848 void ceph_mdsc_pre_umount(struct ceph_mds_client *mdsc) 5849 { 5850 doutc(mdsc->fsc->client, "begin\n"); 5851 mdsc->stopping = CEPH_MDSC_STOPPING_BEGIN; 5852 5853 ceph_mdsc_iterate_sessions(mdsc, send_flush_mdlog, true); 5854 ceph_mdsc_iterate_sessions(mdsc, lock_unlock_session, false); 5855 ceph_flush_dirty_caps(mdsc); 5856 wait_requests(mdsc); 5857 5858 /* 5859 * wait for reply handlers to drop their request refs and 5860 * their inode/dcache refs 5861 */ 5862 ceph_msgr_flush(); 5863 5864 ceph_cleanup_quotarealms_inodes(mdsc); 5865 doutc(mdsc->fsc->client, "done\n"); 5866 } 5867 5868 /* 5869 * flush the mdlog and wait for all write mds requests to flush. 5870 */ 5871 static void flush_mdlog_and_wait_mdsc_unsafe_requests(struct ceph_mds_client *mdsc, 5872 u64 want_tid) 5873 { 5874 struct ceph_client *cl = mdsc->fsc->client; 5875 struct ceph_mds_request *req = NULL, *nextreq; 5876 struct ceph_mds_session *last_session = NULL; 5877 struct rb_node *n; 5878 5879 mutex_lock(&mdsc->mutex); 5880 doutc(cl, "want %lld\n", want_tid); 5881 restart: 5882 req = __get_oldest_req(mdsc); 5883 while (req && req->r_tid <= want_tid) { 5884 /* find next request */ 5885 n = rb_next(&req->r_node); 5886 if (n) 5887 nextreq = rb_entry(n, struct ceph_mds_request, r_node); 5888 else 5889 nextreq = NULL; 5890 if (req->r_op != CEPH_MDS_OP_SETFILELOCK && 5891 (req->r_op & CEPH_MDS_OP_WRITE)) { 5892 struct ceph_mds_session *s = req->r_session; 5893 5894 if (!s) { 5895 req = nextreq; 5896 continue; 5897 } 5898 5899 /* write op */ 5900 ceph_mdsc_get_request(req); 5901 if (nextreq) 5902 ceph_mdsc_get_request(nextreq); 5903 s = ceph_get_mds_session(s); 5904 mutex_unlock(&mdsc->mutex); 5905 5906 /* send flush mdlog request to MDS */ 5907 if (last_session != s) { 5908 send_flush_mdlog(s); 5909 ceph_put_mds_session(last_session); 5910 last_session = s; 5911 } else { 5912 ceph_put_mds_session(s); 5913 } 5914 doutc(cl, "wait on %llu (want %llu)\n", 5915 req->r_tid, want_tid); 5916 
wait_for_completion(&req->r_safe_completion); 5917 5918 mutex_lock(&mdsc->mutex); 5919 ceph_mdsc_put_request(req); 5920 if (!nextreq) 5921 break; /* next dne before, so we're done! */ 5922 if (RB_EMPTY_NODE(&nextreq->r_node)) { 5923 /* next request was removed from tree */ 5924 ceph_mdsc_put_request(nextreq); 5925 goto restart; 5926 } 5927 ceph_mdsc_put_request(nextreq); /* won't go away */ 5928 } 5929 req = nextreq; 5930 } 5931 mutex_unlock(&mdsc->mutex); 5932 ceph_put_mds_session(last_session); 5933 doutc(cl, "done\n"); 5934 } 5935 5936 void ceph_mdsc_sync(struct ceph_mds_client *mdsc) 5937 { 5938 struct ceph_client *cl = mdsc->fsc->client; 5939 u64 want_tid, want_flush; 5940 5941 if (READ_ONCE(mdsc->fsc->mount_state) >= CEPH_MOUNT_SHUTDOWN) 5942 return; 5943 5944 doutc(cl, "sync\n"); 5945 mutex_lock(&mdsc->mutex); 5946 want_tid = mdsc->last_tid; 5947 mutex_unlock(&mdsc->mutex); 5948 5949 ceph_flush_dirty_caps(mdsc); 5950 ceph_flush_cap_releases(mdsc); 5951 spin_lock(&mdsc->cap_dirty_lock); 5952 want_flush = mdsc->last_cap_flush_tid; 5953 if (!list_empty(&mdsc->cap_flush_list)) { 5954 struct ceph_cap_flush *cf = 5955 list_last_entry(&mdsc->cap_flush_list, 5956 struct ceph_cap_flush, g_list); 5957 cf->wake = true; 5958 } 5959 spin_unlock(&mdsc->cap_dirty_lock); 5960 5961 doutc(cl, "sync want tid %lld flush_seq %lld\n", want_tid, want_flush); 5962 5963 flush_mdlog_and_wait_mdsc_unsafe_requests(mdsc, want_tid); 5964 wait_caps_flush(mdsc, want_flush); 5965 } 5966 5967 /* 5968 * true if all sessions are closed, or we force unmount 5969 */ 5970 static bool done_closing_sessions(struct ceph_mds_client *mdsc, int skipped) 5971 { 5972 if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_SHUTDOWN) 5973 return true; 5974 return atomic_read(&mdsc->num_sessions) <= skipped; 5975 } 5976 5977 /* 5978 * called after sb is ro or when metadata corrupted. 
5979 */ 5980 void ceph_mdsc_close_sessions(struct ceph_mds_client *mdsc) 5981 { 5982 struct ceph_options *opts = mdsc->fsc->client->options; 5983 struct ceph_client *cl = mdsc->fsc->client; 5984 struct ceph_mds_session *session; 5985 int i; 5986 int skipped = 0; 5987 5988 doutc(cl, "begin\n"); 5989 5990 /* close sessions */ 5991 mutex_lock(&mdsc->mutex); 5992 for (i = 0; i < mdsc->max_sessions; i++) { 5993 session = __ceph_lookup_mds_session(mdsc, i); 5994 if (!session) 5995 continue; 5996 mutex_unlock(&mdsc->mutex); 5997 mutex_lock(&session->s_mutex); 5998 if (__close_session(mdsc, session) <= 0) 5999 skipped++; 6000 mutex_unlock(&session->s_mutex); 6001 ceph_put_mds_session(session); 6002 mutex_lock(&mdsc->mutex); 6003 } 6004 mutex_unlock(&mdsc->mutex); 6005 6006 doutc(cl, "waiting for sessions to close\n"); 6007 wait_event_timeout(mdsc->session_close_wq, 6008 done_closing_sessions(mdsc, skipped), 6009 ceph_timeout_jiffies(opts->mount_timeout)); 6010 6011 /* tear down remaining sessions */ 6012 mutex_lock(&mdsc->mutex); 6013 for (i = 0; i < mdsc->max_sessions; i++) { 6014 if (mdsc->sessions[i]) { 6015 session = ceph_get_mds_session(mdsc->sessions[i]); 6016 __unregister_session(mdsc, session); 6017 mutex_unlock(&mdsc->mutex); 6018 mutex_lock(&session->s_mutex); 6019 remove_session_caps(session); 6020 mutex_unlock(&session->s_mutex); 6021 ceph_put_mds_session(session); 6022 mutex_lock(&mdsc->mutex); 6023 } 6024 } 6025 WARN_ON(!list_empty(&mdsc->cap_delay_list)); 6026 mutex_unlock(&mdsc->mutex); 6027 6028 ceph_cleanup_snapid_map(mdsc); 6029 ceph_cleanup_global_and_empty_realms(mdsc); 6030 6031 cancel_work_sync(&mdsc->cap_reclaim_work); 6032 cancel_work_sync(&mdsc->cap_unlink_work); 6033 cancel_delayed_work_sync(&mdsc->delayed_work); /* cancel timer */ 6034 6035 doutc(cl, "done\n"); 6036 } 6037 6038 void ceph_mdsc_force_umount(struct ceph_mds_client *mdsc) 6039 { 6040 struct ceph_mds_session *session; 6041 int mds; 6042 6043 doutc(mdsc->fsc->client, "force 
umount\n"); 6044 6045 mutex_lock(&mdsc->mutex); 6046 for (mds = 0; mds < mdsc->max_sessions; mds++) { 6047 session = __ceph_lookup_mds_session(mdsc, mds); 6048 if (!session) 6049 continue; 6050 6051 if (session->s_state == CEPH_MDS_SESSION_REJECTED) 6052 __unregister_session(mdsc, session); 6053 __wake_requests(mdsc, &session->s_waiting); 6054 mutex_unlock(&mdsc->mutex); 6055 6056 mutex_lock(&session->s_mutex); 6057 __close_session(mdsc, session); 6058 if (session->s_state == CEPH_MDS_SESSION_CLOSING) { 6059 cleanup_session_requests(mdsc, session); 6060 remove_session_caps(session); 6061 } 6062 mutex_unlock(&session->s_mutex); 6063 ceph_put_mds_session(session); 6064 6065 mutex_lock(&mdsc->mutex); 6066 kick_requests(mdsc, mds); 6067 } 6068 __wake_requests(mdsc, &mdsc->waiting_for_map); 6069 mutex_unlock(&mdsc->mutex); 6070 } 6071 6072 static void ceph_mdsc_stop(struct ceph_mds_client *mdsc) 6073 { 6074 doutc(mdsc->fsc->client, "stop\n"); 6075 /* 6076 * Make sure the delayed work stopped before releasing 6077 * the resources. 6078 * 6079 * Because the cancel_delayed_work_sync() will only 6080 * guarantee that the work finishes executing. But the 6081 * delayed work will re-arm itself again after that. 
6082 */ 6083 flush_delayed_work(&mdsc->delayed_work); 6084 6085 if (mdsc->mdsmap) 6086 ceph_mdsmap_destroy(mdsc->mdsmap); 6087 kfree(mdsc->sessions); 6088 ceph_caps_finalize(mdsc); 6089 6090 if (mdsc->s_cap_auths) { 6091 int i; 6092 6093 for (i = 0; i < mdsc->s_cap_auths_num; i++) { 6094 kfree(mdsc->s_cap_auths[i].match.gids); 6095 kfree(mdsc->s_cap_auths[i].match.path); 6096 kfree(mdsc->s_cap_auths[i].match.fs_name); 6097 } 6098 kfree(mdsc->s_cap_auths); 6099 } 6100 6101 ceph_pool_perm_destroy(mdsc); 6102 } 6103 6104 void ceph_mdsc_destroy(struct ceph_fs_client *fsc) 6105 { 6106 struct ceph_mds_client *mdsc = fsc->mdsc; 6107 doutc(fsc->client, "%p\n", mdsc); 6108 6109 if (!mdsc) 6110 return; 6111 6112 /* flush out any connection work with references to us */ 6113 ceph_msgr_flush(); 6114 6115 ceph_mdsc_stop(mdsc); 6116 6117 ceph_metric_destroy(&mdsc->metric); 6118 6119 fsc->mdsc = NULL; 6120 kfree(mdsc); 6121 doutc(fsc->client, "%p done\n", mdsc); 6122 } 6123 6124 void ceph_mdsc_handle_fsmap(struct ceph_mds_client *mdsc, struct ceph_msg *msg) 6125 { 6126 struct ceph_fs_client *fsc = mdsc->fsc; 6127 struct ceph_client *cl = fsc->client; 6128 const char *mds_namespace = fsc->mount_options->mds_namespace; 6129 void *p = msg->front.iov_base; 6130 void *end = p + msg->front.iov_len; 6131 u32 epoch; 6132 u32 num_fs; 6133 u32 mount_fscid = (u32)-1; 6134 int err = -EINVAL; 6135 6136 ceph_decode_need(&p, end, sizeof(u32), bad); 6137 epoch = ceph_decode_32(&p); 6138 6139 doutc(cl, "epoch %u\n", epoch); 6140 6141 /* struct_v, struct_cv, map_len, epoch, legacy_client_fscid */ 6142 ceph_decode_skip_n(&p, end, 2 + sizeof(u32) * 3, bad); 6143 6144 ceph_decode_32_safe(&p, end, num_fs, bad); 6145 while (num_fs-- > 0) { 6146 void *info_p, *info_end; 6147 u32 info_len; 6148 u32 fscid, namelen; 6149 6150 ceph_decode_need(&p, end, 2 + sizeof(u32), bad); 6151 p += 2; // info_v, info_cv 6152 info_len = ceph_decode_32(&p); 6153 ceph_decode_need(&p, end, info_len, bad); 6154 info_p = p; 
6155 info_end = p + info_len; 6156 p = info_end; 6157 6158 ceph_decode_need(&info_p, info_end, sizeof(u32) * 2, bad); 6159 fscid = ceph_decode_32(&info_p); 6160 namelen = ceph_decode_32(&info_p); 6161 ceph_decode_need(&info_p, info_end, namelen, bad); 6162 6163 if (mds_namespace && 6164 strlen(mds_namespace) == namelen && 6165 !strncmp(mds_namespace, (char *)info_p, namelen)) { 6166 mount_fscid = fscid; 6167 break; 6168 } 6169 } 6170 6171 ceph_monc_got_map(&fsc->client->monc, CEPH_SUB_FSMAP, epoch); 6172 if (mount_fscid != (u32)-1) { 6173 fsc->client->monc.fs_cluster_id = mount_fscid; 6174 ceph_monc_want_map(&fsc->client->monc, CEPH_SUB_MDSMAP, 6175 0, true); 6176 ceph_monc_renew_subs(&fsc->client->monc); 6177 } else { 6178 err = -ENOENT; 6179 goto err_out; 6180 } 6181 return; 6182 6183 bad: 6184 pr_err_client(cl, "error decoding fsmap %d. Shutting down mount.\n", 6185 err); 6186 ceph_umount_begin(mdsc->fsc->sb); 6187 ceph_msg_dump(msg); 6188 err_out: 6189 mutex_lock(&mdsc->mutex); 6190 mdsc->mdsmap_err = err; 6191 __wake_requests(mdsc, &mdsc->waiting_for_map); 6192 mutex_unlock(&mdsc->mutex); 6193 } 6194 6195 /* 6196 * handle mds map update. 6197 */ 6198 void ceph_mdsc_handle_mdsmap(struct ceph_mds_client *mdsc, struct ceph_msg *msg) 6199 { 6200 struct ceph_client *cl = mdsc->fsc->client; 6201 u32 epoch; 6202 u32 maplen; 6203 void *p = msg->front.iov_base; 6204 void *end = p + msg->front.iov_len; 6205 struct ceph_mdsmap *newmap, *oldmap; 6206 struct ceph_fsid fsid; 6207 int err = -EINVAL; 6208 6209 ceph_decode_need(&p, end, sizeof(fsid)+2*sizeof(u32), bad); 6210 ceph_decode_copy(&p, &fsid, sizeof(fsid)); 6211 if (ceph_check_fsid(mdsc->fsc->client, &fsid) < 0) 6212 return; 6213 epoch = ceph_decode_32(&p); 6214 maplen = ceph_decode_32(&p); 6215 doutc(cl, "epoch %u len %d\n", epoch, (int)maplen); 6216 6217 /* do we need it? 
*/ 6218 mutex_lock(&mdsc->mutex); 6219 if (mdsc->mdsmap && epoch <= mdsc->mdsmap->m_epoch) { 6220 doutc(cl, "epoch %u <= our %u\n", epoch, mdsc->mdsmap->m_epoch); 6221 mutex_unlock(&mdsc->mutex); 6222 return; 6223 } 6224 6225 newmap = ceph_mdsmap_decode(mdsc, &p, end, ceph_msgr2(mdsc->fsc->client)); 6226 if (IS_ERR(newmap)) { 6227 err = PTR_ERR(newmap); 6228 goto bad_unlock; 6229 } 6230 6231 /* swap into place */ 6232 if (mdsc->mdsmap) { 6233 oldmap = mdsc->mdsmap; 6234 mdsc->mdsmap = newmap; 6235 check_new_map(mdsc, newmap, oldmap); 6236 ceph_mdsmap_destroy(oldmap); 6237 } else { 6238 mdsc->mdsmap = newmap; /* first mds map */ 6239 } 6240 mdsc->fsc->max_file_size = min((loff_t)mdsc->mdsmap->m_max_file_size, 6241 MAX_LFS_FILESIZE); 6242 6243 __wake_requests(mdsc, &mdsc->waiting_for_map); 6244 ceph_monc_got_map(&mdsc->fsc->client->monc, CEPH_SUB_MDSMAP, 6245 mdsc->mdsmap->m_epoch); 6246 6247 mutex_unlock(&mdsc->mutex); 6248 schedule_delayed(mdsc, 0); 6249 return; 6250 6251 bad_unlock: 6252 mutex_unlock(&mdsc->mutex); 6253 bad: 6254 pr_err_client(cl, "error decoding mdsmap %d. Shutting down mount.\n", 6255 err); 6256 ceph_umount_begin(mdsc->fsc->sb); 6257 ceph_msg_dump(msg); 6258 return; 6259 } 6260 6261 static struct ceph_connection *mds_get_con(struct ceph_connection *con) 6262 { 6263 struct ceph_mds_session *s = con->private; 6264 6265 if (ceph_get_mds_session(s)) 6266 return con; 6267 return NULL; 6268 } 6269 6270 static void mds_put_con(struct ceph_connection *con) 6271 { 6272 struct ceph_mds_session *s = con->private; 6273 6274 ceph_put_mds_session(s); 6275 } 6276 6277 /* 6278 * if the client is unresponsive for long enough, the mds will kill 6279 * the session entirely. 
6280 */ 6281 static void mds_peer_reset(struct ceph_connection *con) 6282 { 6283 struct ceph_mds_session *s = con->private; 6284 struct ceph_mds_client *mdsc = s->s_mdsc; 6285 6286 pr_warn_client(mdsc->fsc->client, "mds%d closed our session\n", 6287 s->s_mds); 6288 if (READ_ONCE(mdsc->fsc->mount_state) != CEPH_MOUNT_FENCE_IO && 6289 ceph_mdsmap_get_state(mdsc->mdsmap, s->s_mds) >= CEPH_MDS_STATE_RECONNECT) 6290 send_mds_reconnect(mdsc, s); 6291 } 6292 6293 static void mds_dispatch(struct ceph_connection *con, struct ceph_msg *msg) 6294 { 6295 struct ceph_mds_session *s = con->private; 6296 struct ceph_mds_client *mdsc = s->s_mdsc; 6297 struct ceph_client *cl = mdsc->fsc->client; 6298 int type = le16_to_cpu(msg->hdr.type); 6299 6300 mutex_lock(&mdsc->mutex); 6301 if (__verify_registered_session(mdsc, s) < 0) { 6302 mutex_unlock(&mdsc->mutex); 6303 goto out; 6304 } 6305 mutex_unlock(&mdsc->mutex); 6306 6307 switch (type) { 6308 case CEPH_MSG_MDS_MAP: 6309 ceph_mdsc_handle_mdsmap(mdsc, msg); 6310 break; 6311 case CEPH_MSG_FS_MAP_USER: 6312 ceph_mdsc_handle_fsmap(mdsc, msg); 6313 break; 6314 case CEPH_MSG_CLIENT_SESSION: 6315 handle_session(s, msg); 6316 break; 6317 case CEPH_MSG_CLIENT_REPLY: 6318 handle_reply(s, msg); 6319 break; 6320 case CEPH_MSG_CLIENT_REQUEST_FORWARD: 6321 handle_forward(mdsc, s, msg); 6322 break; 6323 case CEPH_MSG_CLIENT_CAPS: 6324 ceph_handle_caps(s, msg); 6325 break; 6326 case CEPH_MSG_CLIENT_SNAP: 6327 ceph_handle_snap(mdsc, s, msg); 6328 break; 6329 case CEPH_MSG_CLIENT_LEASE: 6330 handle_lease(mdsc, s, msg); 6331 break; 6332 case CEPH_MSG_CLIENT_QUOTA: 6333 ceph_handle_quota(mdsc, s, msg); 6334 break; 6335 6336 default: 6337 pr_err_client(cl, "received unknown message type %d %s\n", 6338 type, ceph_msg_type_name(type)); 6339 } 6340 out: 6341 ceph_msg_put(msg); 6342 } 6343 6344 /* 6345 * authentication 6346 */ 6347 6348 /* 6349 * Note: returned pointer is the address of a structure that's 6350 * managed separately. 
Caller must *not* attempt to free it. 6351 */ 6352 static struct ceph_auth_handshake * 6353 mds_get_authorizer(struct ceph_connection *con, int *proto, int force_new) 6354 { 6355 struct ceph_mds_session *s = con->private; 6356 struct ceph_mds_client *mdsc = s->s_mdsc; 6357 struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth; 6358 struct ceph_auth_handshake *auth = &s->s_auth; 6359 int ret; 6360 6361 ret = __ceph_auth_get_authorizer(ac, auth, CEPH_ENTITY_TYPE_MDS, 6362 force_new, proto, NULL, NULL); 6363 if (ret) 6364 return ERR_PTR(ret); 6365 6366 return auth; 6367 } 6368 6369 static int mds_add_authorizer_challenge(struct ceph_connection *con, 6370 void *challenge_buf, int challenge_buf_len) 6371 { 6372 struct ceph_mds_session *s = con->private; 6373 struct ceph_mds_client *mdsc = s->s_mdsc; 6374 struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth; 6375 6376 return ceph_auth_add_authorizer_challenge(ac, s->s_auth.authorizer, 6377 challenge_buf, challenge_buf_len); 6378 } 6379 6380 static int mds_verify_authorizer_reply(struct ceph_connection *con) 6381 { 6382 struct ceph_mds_session *s = con->private; 6383 struct ceph_mds_client *mdsc = s->s_mdsc; 6384 struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth; 6385 struct ceph_auth_handshake *auth = &s->s_auth; 6386 6387 return ceph_auth_verify_authorizer_reply(ac, auth->authorizer, 6388 auth->authorizer_reply_buf, auth->authorizer_reply_buf_len, 6389 NULL, NULL, NULL, NULL); 6390 } 6391 6392 static int mds_invalidate_authorizer(struct ceph_connection *con) 6393 { 6394 struct ceph_mds_session *s = con->private; 6395 struct ceph_mds_client *mdsc = s->s_mdsc; 6396 struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth; 6397 6398 ceph_auth_invalidate_authorizer(ac, CEPH_ENTITY_TYPE_MDS); 6399 6400 return ceph_monc_validate_auth(&mdsc->fsc->client->monc); 6401 } 6402 6403 static int mds_get_auth_request(struct ceph_connection *con, 6404 void *buf, int *buf_len, 6405 void **authorizer, int 
*authorizer_len) 6406 { 6407 struct ceph_mds_session *s = con->private; 6408 struct ceph_auth_client *ac = s->s_mdsc->fsc->client->monc.auth; 6409 struct ceph_auth_handshake *auth = &s->s_auth; 6410 int ret; 6411 6412 ret = ceph_auth_get_authorizer(ac, auth, CEPH_ENTITY_TYPE_MDS, 6413 buf, buf_len); 6414 if (ret) 6415 return ret; 6416 6417 *authorizer = auth->authorizer_buf; 6418 *authorizer_len = auth->authorizer_buf_len; 6419 return 0; 6420 } 6421 6422 static int mds_handle_auth_reply_more(struct ceph_connection *con, 6423 void *reply, int reply_len, 6424 void *buf, int *buf_len, 6425 void **authorizer, int *authorizer_len) 6426 { 6427 struct ceph_mds_session *s = con->private; 6428 struct ceph_auth_client *ac = s->s_mdsc->fsc->client->monc.auth; 6429 struct ceph_auth_handshake *auth = &s->s_auth; 6430 int ret; 6431 6432 ret = ceph_auth_handle_svc_reply_more(ac, auth, reply, reply_len, 6433 buf, buf_len); 6434 if (ret) 6435 return ret; 6436 6437 *authorizer = auth->authorizer_buf; 6438 *authorizer_len = auth->authorizer_buf_len; 6439 return 0; 6440 } 6441 6442 static int mds_handle_auth_done(struct ceph_connection *con, 6443 u64 global_id, void *reply, int reply_len, 6444 u8 *session_key, int *session_key_len, 6445 u8 *con_secret, int *con_secret_len) 6446 { 6447 struct ceph_mds_session *s = con->private; 6448 struct ceph_auth_client *ac = s->s_mdsc->fsc->client->monc.auth; 6449 struct ceph_auth_handshake *auth = &s->s_auth; 6450 6451 return ceph_auth_handle_svc_reply_done(ac, auth, reply, reply_len, 6452 session_key, session_key_len, 6453 con_secret, con_secret_len); 6454 } 6455 6456 static int mds_handle_auth_bad_method(struct ceph_connection *con, 6457 int used_proto, int result, 6458 const int *allowed_protos, int proto_cnt, 6459 const int *allowed_modes, int mode_cnt) 6460 { 6461 struct ceph_mds_session *s = con->private; 6462 struct ceph_mon_client *monc = &s->s_mdsc->fsc->client->monc; 6463 int ret; 6464 6465 if (ceph_auth_handle_bad_authorizer(monc->auth, 
CEPH_ENTITY_TYPE_MDS, 6466 used_proto, result, 6467 allowed_protos, proto_cnt, 6468 allowed_modes, mode_cnt)) { 6469 ret = ceph_monc_validate_auth(monc); 6470 if (ret) 6471 return ret; 6472 } 6473 6474 return -EACCES; 6475 } 6476 6477 static struct ceph_msg *mds_alloc_msg(struct ceph_connection *con, 6478 struct ceph_msg_header *hdr, int *skip) 6479 { 6480 struct ceph_msg *msg; 6481 int type = (int) le16_to_cpu(hdr->type); 6482 int front_len = (int) le32_to_cpu(hdr->front_len); 6483 6484 if (con->in_msg) 6485 return con->in_msg; 6486 6487 *skip = 0; 6488 msg = ceph_msg_new(type, front_len, GFP_NOFS, false); 6489 if (!msg) { 6490 pr_err("unable to allocate msg type %d len %d\n", 6491 type, front_len); 6492 return NULL; 6493 } 6494 6495 return msg; 6496 } 6497 6498 static int mds_sign_message(struct ceph_msg *msg) 6499 { 6500 struct ceph_mds_session *s = msg->con->private; 6501 struct ceph_auth_handshake *auth = &s->s_auth; 6502 6503 return ceph_auth_sign_message(auth, msg); 6504 } 6505 6506 static int mds_check_message_signature(struct ceph_msg *msg) 6507 { 6508 struct ceph_mds_session *s = msg->con->private; 6509 struct ceph_auth_handshake *auth = &s->s_auth; 6510 6511 return ceph_auth_check_message_signature(auth, msg); 6512 } 6513 6514 static const struct ceph_connection_operations mds_con_ops = { 6515 .get = mds_get_con, 6516 .put = mds_put_con, 6517 .alloc_msg = mds_alloc_msg, 6518 .dispatch = mds_dispatch, 6519 .peer_reset = mds_peer_reset, 6520 .get_authorizer = mds_get_authorizer, 6521 .add_authorizer_challenge = mds_add_authorizer_challenge, 6522 .verify_authorizer_reply = mds_verify_authorizer_reply, 6523 .invalidate_authorizer = mds_invalidate_authorizer, 6524 .sign_message = mds_sign_message, 6525 .check_message_signature = mds_check_message_signature, 6526 .get_auth_request = mds_get_auth_request, 6527 .handle_auth_reply_more = mds_handle_auth_reply_more, 6528 .handle_auth_done = mds_handle_auth_done, 6529 .handle_auth_bad_method = 
mds_handle_auth_bad_method, 6530 }; 6531 6532 /* eof */ 6533