// SPDX-License-Identifier: GPL-2.0
#include <linux/ceph/ceph_debug.h>

#include <linux/fs.h>
#include <linux/wait.h>
#include <linux/slab.h>
#include <linux/gfp.h>
#include <linux/sched.h>
#include <linux/debugfs.h>
#include <linux/seq_file.h>
#include <linux/ratelimit.h>
#include <linux/bits.h>
#include <linux/ktime.h>
#include <linux/bitmap.h>

#include "super.h"
#include "mds_client.h"

#include <linux/ceph/ceph_features.h>
#include <linux/ceph/messenger.h>
#include <linux/ceph/decode.h>
#include <linux/ceph/pagelist.h>
#include <linux/ceph/auth.h>
#include <linux/ceph/debugfs.h>

#define RECONNECT_MAX_SIZE (INT_MAX - PAGE_SIZE)

/*
 * A cluster of MDS (metadata server) daemons is responsible for
 * managing the file system namespace (the directory hierarchy and
 * inodes) and for coordinating shared access to storage.  Metadata is
 * partitioned hierarchically across a number of servers, and that
 * partition varies over time as the cluster adjusts the distribution
 * in order to balance load.
 *
 * The MDS client is primarily responsible for managing synchronous
 * metadata requests for operations like open, unlink, and so forth.
 * If there is an MDS failure, we find out about it when we (possibly
 * request and) receive a new MDS map, and can resubmit affected
 * requests.
 *
 * For the most part, though, we take advantage of a lossless
 * communications channel to the MDS, and do not need to worry about
 * timing out or resubmitting requests.
 *
 * We maintain a stateful "session" with each MDS we interact with.
 * Within each session, we send periodic heartbeat messages to ensure
 * any capabilities or leases we have been issued remain valid.  If
 * the session times out and goes stale, our leases and capabilities
 * are no longer valid.
 */

struct ceph_reconnect_state {
	struct ceph_mds_session *session;
	int nr_caps, nr_realms;
	struct ceph_pagelist *pagelist;
	unsigned msg_version;
	bool allow_multi;
};

static void __wake_requests(struct ceph_mds_client *mdsc,
			    struct list_head *head);
static void ceph_cap_release_work(struct work_struct *work);
static void ceph_cap_reclaim_work(struct work_struct *work);

static const struct ceph_connection_operations mds_con_ops;


/*
 * mds reply parsing
 */

static int parse_reply_info_quota(void **p, void *end,
				  struct ceph_mds_reply_info_in *info)
{
	u8 struct_v, struct_compat;
	u32 struct_len;

	ceph_decode_8_safe(p, end, struct_v, bad);
	ceph_decode_8_safe(p, end, struct_compat, bad);
	/* struct_v is expected to be >= 1. we only
	 * understand encoding with struct_compat == 1. */
	if (!struct_v || struct_compat != 1)
		goto bad;
	ceph_decode_32_safe(p, end, struct_len, bad);
	ceph_decode_need(p, end, struct_len, bad);
	end = *p + struct_len;
	ceph_decode_64_safe(p, end, info->max_bytes, bad);
	ceph_decode_64_safe(p, end, info->max_files, bad);
	*p = end;
	return 0;
bad:
	return -EIO;
}

/*
 * parse individual inode info
 */
static int parse_reply_info_in(void **p, void *end,
			       struct ceph_mds_reply_info_in *info,
			       u64 features)
{
	int err = 0;
	u8 struct_v = 0;

	if (features == (u64)-1) {
		u32 struct_len;
		u8 struct_compat;
		ceph_decode_8_safe(p, end, struct_v, bad);
		ceph_decode_8_safe(p, end, struct_compat, bad);
		/* struct_v is expected to be >= 1.
we only understand 112 * encoding with struct_compat == 1. */ 113 if (!struct_v || struct_compat != 1) 114 goto bad; 115 ceph_decode_32_safe(p, end, struct_len, bad); 116 ceph_decode_need(p, end, struct_len, bad); 117 end = *p + struct_len; 118 } 119 120 ceph_decode_need(p, end, sizeof(struct ceph_mds_reply_inode), bad); 121 info->in = *p; 122 *p += sizeof(struct ceph_mds_reply_inode) + 123 sizeof(*info->in->fragtree.splits) * 124 le32_to_cpu(info->in->fragtree.nsplits); 125 126 ceph_decode_32_safe(p, end, info->symlink_len, bad); 127 ceph_decode_need(p, end, info->symlink_len, bad); 128 info->symlink = *p; 129 *p += info->symlink_len; 130 131 ceph_decode_copy_safe(p, end, &info->dir_layout, 132 sizeof(info->dir_layout), bad); 133 ceph_decode_32_safe(p, end, info->xattr_len, bad); 134 ceph_decode_need(p, end, info->xattr_len, bad); 135 info->xattr_data = *p; 136 *p += info->xattr_len; 137 138 if (features == (u64)-1) { 139 /* inline data */ 140 ceph_decode_64_safe(p, end, info->inline_version, bad); 141 ceph_decode_32_safe(p, end, info->inline_len, bad); 142 ceph_decode_need(p, end, info->inline_len, bad); 143 info->inline_data = *p; 144 *p += info->inline_len; 145 /* quota */ 146 err = parse_reply_info_quota(p, end, info); 147 if (err < 0) 148 goto out_bad; 149 /* pool namespace */ 150 ceph_decode_32_safe(p, end, info->pool_ns_len, bad); 151 if (info->pool_ns_len > 0) { 152 ceph_decode_need(p, end, info->pool_ns_len, bad); 153 info->pool_ns_data = *p; 154 *p += info->pool_ns_len; 155 } 156 157 /* btime */ 158 ceph_decode_need(p, end, sizeof(info->btime), bad); 159 ceph_decode_copy(p, &info->btime, sizeof(info->btime)); 160 161 /* change attribute */ 162 ceph_decode_64_safe(p, end, info->change_attr, bad); 163 164 /* dir pin */ 165 if (struct_v >= 2) { 166 ceph_decode_32_safe(p, end, info->dir_pin, bad); 167 } else { 168 info->dir_pin = -ENODATA; 169 } 170 171 /* snapshot birth time, remains zero for v<=2 */ 172 if (struct_v >= 3) { 173 ceph_decode_need(p, end, sizeof(info->snap_btime), bad); 174 ceph_decode_copy(p, &info->snap_btime, 175 sizeof(info->snap_btime)); 176 } else { 177 memset(&info->snap_btime, 0, sizeof(info->snap_btime)); 178 } 179 180 /* snapshot count, remains zero for v<=3 */ 181 if (struct_v >= 4) { 182 ceph_decode_64_safe(p, end, info->rsnaps, bad); 183 } else { 184 info->rsnaps = 0; 185 } 186 187 *p = end; 188 } else { 189 if (features & CEPH_FEATURE_MDS_INLINE_DATA) { 190 ceph_decode_64_safe(p, end, info->inline_version, bad); 191 ceph_decode_32_safe(p, end, info->inline_len, bad); 192 ceph_decode_need(p, end, info->inline_len, bad); 193 info->inline_data = *p; 194 *p += info->inline_len; 195 } else 196 info->inline_version = CEPH_INLINE_NONE; 197 198 if (features & CEPH_FEATURE_MDS_QUOTA) { 199 err = parse_reply_info_quota(p, end, info); 200 if (err < 0) 201 goto out_bad; 202 } else { 203 info->max_bytes = 0; 204 info->max_files = 0; 205 } 206 207 info->pool_ns_len = 0; 208 info->pool_ns_data = NULL; 209 if (features & CEPH_FEATURE_FS_FILE_LAYOUT_V2) { 210 ceph_decode_32_safe(p, end, info->pool_ns_len, bad); 211 if (info->pool_ns_len > 0) { 212 ceph_decode_need(p, end, info->pool_ns_len, bad); 213 info->pool_ns_data = *p; 214 *p += info->pool_ns_len; 215 } 216 } 217 218 if (features & CEPH_FEATURE_FS_BTIME) { 219 ceph_decode_need(p, end, sizeof(info->btime), bad); 220 ceph_decode_copy(p, &info->btime, sizeof(info->btime)); 221 ceph_decode_64_safe(p, end, info->change_attr, bad); 222 } 223 224 info->dir_pin = -ENODATA; 225 /* info->snap_btime and info->rsnaps remain 
zero */ 226 } 227 return 0; 228 bad: 229 err = -EIO; 230 out_bad: 231 return err; 232 } 233 234 static int parse_reply_info_dir(void **p, void *end, 235 struct ceph_mds_reply_dirfrag **dirfrag, 236 u64 features) 237 { 238 if (features == (u64)-1) { 239 u8 struct_v, struct_compat; 240 u32 struct_len; 241 ceph_decode_8_safe(p, end, struct_v, bad); 242 ceph_decode_8_safe(p, end, struct_compat, bad); 243 /* struct_v is expected to be >= 1. we only understand 244 * encoding whose struct_compat == 1. */ 245 if (!struct_v || struct_compat != 1) 246 goto bad; 247 ceph_decode_32_safe(p, end, struct_len, bad); 248 ceph_decode_need(p, end, struct_len, bad); 249 end = *p + struct_len; 250 } 251 252 ceph_decode_need(p, end, sizeof(**dirfrag), bad); 253 *dirfrag = *p; 254 *p += sizeof(**dirfrag) + sizeof(u32) * le32_to_cpu((*dirfrag)->ndist); 255 if (unlikely(*p > end)) 256 goto bad; 257 if (features == (u64)-1) 258 *p = end; 259 return 0; 260 bad: 261 return -EIO; 262 } 263 264 static int parse_reply_info_lease(void **p, void *end, 265 struct ceph_mds_reply_lease **lease, 266 u64 features) 267 { 268 if (features == (u64)-1) { 269 u8 struct_v, struct_compat; 270 u32 struct_len; 271 ceph_decode_8_safe(p, end, struct_v, bad); 272 ceph_decode_8_safe(p, end, struct_compat, bad); 273 /* struct_v is expected to be >= 1. we only understand 274 * encoding whose struct_compat == 1. */ 275 if (!struct_v || struct_compat != 1) 276 goto bad; 277 ceph_decode_32_safe(p, end, struct_len, bad); 278 ceph_decode_need(p, end, struct_len, bad); 279 end = *p + struct_len; 280 } 281 282 ceph_decode_need(p, end, sizeof(**lease), bad); 283 *lease = *p; 284 *p += sizeof(**lease); 285 if (features == (u64)-1) 286 *p = end; 287 return 0; 288 bad: 289 return -EIO; 290 } 291 292 /* 293 * parse a normal reply, which may contain a (dir+)dentry and/or a 294 * target inode. 
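 *
 * The trace layout is: diri, dirfrag, dname and dlease when
 * head->is_dentry is set, followed by the target inode when
 * head->is_target is set (see the decoding order below).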
295 */ 296 static int parse_reply_info_trace(void **p, void *end, 297 struct ceph_mds_reply_info_parsed *info, 298 u64 features) 299 { 300 int err; 301 302 if (info->head->is_dentry) { 303 err = parse_reply_info_in(p, end, &info->diri, features); 304 if (err < 0) 305 goto out_bad; 306 307 err = parse_reply_info_dir(p, end, &info->dirfrag, features); 308 if (err < 0) 309 goto out_bad; 310 311 ceph_decode_32_safe(p, end, info->dname_len, bad); 312 ceph_decode_need(p, end, info->dname_len, bad); 313 info->dname = *p; 314 *p += info->dname_len; 315 316 err = parse_reply_info_lease(p, end, &info->dlease, features); 317 if (err < 0) 318 goto out_bad; 319 } 320 321 if (info->head->is_target) { 322 err = parse_reply_info_in(p, end, &info->targeti, features); 323 if (err < 0) 324 goto out_bad; 325 } 326 327 if (unlikely(*p != end)) 328 goto bad; 329 return 0; 330 331 bad: 332 err = -EIO; 333 out_bad: 334 pr_err("problem parsing mds trace %d\n", err); 335 return err; 336 } 337 338 /* 339 * parse readdir results 340 */ 341 static int parse_reply_info_readdir(void **p, void *end, 342 struct ceph_mds_reply_info_parsed *info, 343 u64 features) 344 { 345 u32 num, i = 0; 346 int err; 347 348 err = parse_reply_info_dir(p, end, &info->dir_dir, features); 349 if (err < 0) 350 goto out_bad; 351 352 ceph_decode_need(p, end, sizeof(num) + 2, bad); 353 num = ceph_decode_32(p); 354 { 355 u16 flags = ceph_decode_16(p); 356 info->dir_end = !!(flags & CEPH_READDIR_FRAG_END); 357 info->dir_complete = !!(flags & CEPH_READDIR_FRAG_COMPLETE); 358 info->hash_order = !!(flags & CEPH_READDIR_HASH_ORDER); 359 info->offset_hash = !!(flags & CEPH_READDIR_OFFSET_HASH); 360 } 361 if (num == 0) 362 goto done; 363 364 BUG_ON(!info->dir_entries); 365 if ((unsigned long)(info->dir_entries + num) > 366 (unsigned long)info->dir_entries + info->dir_buf_size) { 367 pr_err("dir contents are larger than expected\n"); 368 WARN_ON(1); 369 goto bad; 370 } 371 372 info->dir_nr = num; 373 while (num) { 374 struct ceph_mds_reply_dir_entry *rde = info->dir_entries + i; 375 /* dentry */ 376 ceph_decode_32_safe(p, end, rde->name_len, bad); 377 ceph_decode_need(p, end, rde->name_len, bad); 378 rde->name = *p; 379 *p += rde->name_len; 380 dout("parsed dir dname '%.*s'\n", rde->name_len, rde->name); 381 382 /* dentry lease */ 383 err = parse_reply_info_lease(p, end, &rde->lease, features); 384 if (err) 385 goto out_bad; 386 /* inode */ 387 err = parse_reply_info_in(p, end, &rde->inode, features); 388 if (err < 0) 389 goto out_bad; 390 /* ceph_readdir_prepopulate() will update it */ 391 rde->offset = 0; 392 i++; 393 num--; 394 } 395 396 done: 397 /* Skip over any unrecognized fields */ 398 *p = end; 399 return 0; 400 401 bad: 402 err = -EIO; 403 out_bad: 404 pr_err("problem parsing dir contents %d\n", err); 405 return err; 406 } 407 408 /* 409 * parse fcntl F_GETLK results 410 */ 411 static int parse_reply_info_filelock(void **p, void *end, 412 struct ceph_mds_reply_info_parsed *info, 413 u64 features) 414 { 415 if (*p + sizeof(*info->filelock_reply) > end) 416 goto bad; 417 418 info->filelock_reply = *p; 419 420 /* Skip over any unrecognized fields */ 421 *p = end; 422 return 0; 423 bad: 424 return -EIO; 425 } 426 427 428 #if BITS_PER_LONG == 64 429 430 #define DELEGATED_INO_AVAILABLE xa_mk_value(1) 431 432 static int ceph_parse_deleg_inos(void **p, void *end, 433 struct ceph_mds_session *s) 434 { 435 u32 sets; 436 437 ceph_decode_32_safe(p, end, sets, bad); 438 dout("got %u sets of delegated inodes\n", sets); 439 while (sets--) { 440 u64 start, 
len; 441 442 ceph_decode_64_safe(p, end, start, bad); 443 ceph_decode_64_safe(p, end, len, bad); 444 445 /* Don't accept a delegation of system inodes */ 446 if (start < CEPH_INO_SYSTEM_BASE) { 447 pr_warn_ratelimited("ceph: ignoring reserved inode range delegation (start=0x%llx len=0x%llx)\n", 448 start, len); 449 continue; 450 } 451 while (len--) { 452 int err = xa_insert(&s->s_delegated_inos, start++, 453 DELEGATED_INO_AVAILABLE, 454 GFP_KERNEL); 455 if (!err) { 456 dout("added delegated inode 0x%llx\n", 457 start - 1); 458 } else if (err == -EBUSY) { 459 pr_warn("MDS delegated inode 0x%llx more than once.\n", 460 start - 1); 461 } else { 462 return err; 463 } 464 } 465 } 466 return 0; 467 bad: 468 return -EIO; 469 } 470 471 u64 ceph_get_deleg_ino(struct ceph_mds_session *s) 472 { 473 unsigned long ino; 474 void *val; 475 476 xa_for_each(&s->s_delegated_inos, ino, val) { 477 val = xa_erase(&s->s_delegated_inos, ino); 478 if (val == DELEGATED_INO_AVAILABLE) 479 return ino; 480 } 481 return 0; 482 } 483 484 int ceph_restore_deleg_ino(struct ceph_mds_session *s, u64 ino) 485 { 486 return xa_insert(&s->s_delegated_inos, ino, DELEGATED_INO_AVAILABLE, 487 GFP_KERNEL); 488 } 489 #else /* BITS_PER_LONG == 64 */ 490 /* 491 * FIXME: xarrays can't handle 64-bit indexes on a 32-bit arch. For now, just 492 * ignore delegated_inos on 32 bit arch. Maybe eventually add xarrays for top 493 * and bottom words? 494 */ 495 static int ceph_parse_deleg_inos(void **p, void *end, 496 struct ceph_mds_session *s) 497 { 498 u32 sets; 499 500 ceph_decode_32_safe(p, end, sets, bad); 501 if (sets) 502 ceph_decode_skip_n(p, end, sets * 2 * sizeof(__le64), bad); 503 return 0; 504 bad: 505 return -EIO; 506 } 507 508 u64 ceph_get_deleg_ino(struct ceph_mds_session *s) 509 { 510 return 0; 511 } 512 513 int ceph_restore_deleg_ino(struct ceph_mds_session *s, u64 ino) 514 { 515 return 0; 516 } 517 #endif /* BITS_PER_LONG == 64 */ 518 519 /* 520 * parse create results 521 */ 522 static int parse_reply_info_create(void **p, void *end, 523 struct ceph_mds_reply_info_parsed *info, 524 u64 features, struct ceph_mds_session *s) 525 { 526 int ret; 527 528 if (features == (u64)-1 || 529 (features & CEPH_FEATURE_REPLY_CREATE_INODE)) { 530 if (*p == end) { 531 /* Malformed reply? 
 */
			info->has_create_ino = false;
		} else if (test_bit(CEPHFS_FEATURE_DELEG_INO, &s->s_features)) {
			info->has_create_ino = true;
			/* struct_v, struct_compat, and len */
			ceph_decode_skip_n(p, end, 2 + sizeof(u32), bad);
			ceph_decode_64_safe(p, end, info->ino, bad);
			ret = ceph_parse_deleg_inos(p, end, s);
			if (ret)
				return ret;
		} else {
			/* legacy */
			ceph_decode_64_safe(p, end, info->ino, bad);
			info->has_create_ino = true;
		}
	} else {
		if (*p != end)
			goto bad;
	}

	/* Skip over any unrecognized fields */
	*p = end;
	return 0;
bad:
	return -EIO;
}

static int parse_reply_info_getvxattr(void **p, void *end,
				      struct ceph_mds_reply_info_parsed *info,
				      u64 features)
{
	u32 value_len;

	ceph_decode_skip_8(p, end, bad); /* skip current version: 1 */
	ceph_decode_skip_8(p, end, bad); /* skip first version: 1 */
	ceph_decode_skip_32(p, end, bad); /* skip payload length */

	ceph_decode_32_safe(p, end, value_len, bad);

	if (value_len == end - *p) {
		info->xattr_info.xattr_value = *p;
		info->xattr_info.xattr_value_len = value_len;
		*p = end;
		return value_len;
	}
bad:
	return -EIO;
}

/*
 * parse extra results
 */
static int parse_reply_info_extra(void **p, void *end,
				  struct ceph_mds_reply_info_parsed *info,
				  u64 features, struct ceph_mds_session *s)
{
	u32 op = le32_to_cpu(info->head->op);

	if (op == CEPH_MDS_OP_GETFILELOCK)
		return parse_reply_info_filelock(p, end, info, features);
	else if (op == CEPH_MDS_OP_READDIR || op == CEPH_MDS_OP_LSSNAP)
		return parse_reply_info_readdir(p, end, info, features);
	else if (op == CEPH_MDS_OP_CREATE)
		return parse_reply_info_create(p, end, info, features, s);
	else if (op == CEPH_MDS_OP_GETVXATTR)
		return parse_reply_info_getvxattr(p, end, info, features);
	else
		return -EIO;
}

/*
 * parse entire mds reply
 */
static int parse_reply_info(struct ceph_mds_session *s, struct ceph_msg *msg,
			    struct ceph_mds_reply_info_parsed *info,
			    u64 features)
{
	void *p, *end;
	u32 len;
	int err;

	info->head = msg->front.iov_base;
	p = msg->front.iov_base + sizeof(struct ceph_mds_reply_head);
	end = p + msg->front.iov_len - sizeof(struct ceph_mds_reply_head);

	/* trace */
	ceph_decode_32_safe(&p, end, len, bad);
	if (len > 0) {
		ceph_decode_need(&p, end, len, bad);
		err = parse_reply_info_trace(&p, p+len, info, features);
		if (err < 0)
			goto out_bad;
	}

	/* extra */
	ceph_decode_32_safe(&p, end, len, bad);
	if (len > 0) {
		ceph_decode_need(&p, end, len, bad);
		err = parse_reply_info_extra(&p, p+len, info, features, s);
		if (err < 0)
			goto out_bad;
	}

	/* snap blob */
	ceph_decode_32_safe(&p, end, len, bad);
	info->snapblob_len = len;
	info->snapblob = p;
	p += len;

	if (p != end)
		goto bad;
	return 0;

bad:
	err = -EIO;
out_bad:
	pr_err("mds parse_reply err %d\n", err);
	return err;
}

static void destroy_reply_info(struct ceph_mds_reply_info_parsed *info)
{
	if (!info->dir_entries)
		return;
	free_pages((unsigned long)info->dir_entries, get_order(info->dir_buf_size));
}

/*
 * In the async unlink case the kclient won't wait for the first reply
 * from the MDS; it just drops all the links, unhashes the dentry and
 * then returns success immediately.
 *
 * For any new create/link/rename, etc. requests that follow and use the
 * same file name, we must wait for the first reply of the in-flight
 * unlink request, or the MDS may fail these subsequent requests with
 * -EEXIST if the in-flight async unlink request was delayed for some
 * reason.
 *
 * The worst case is that a non-async openc request will successfully
 * open the file if the CDentry hasn't been unlinked yet, but the
 * previously delayed async unlink request will later remove the
 * CDentry.  That means the just-created file could be deleted later by
 * accident.
 *
 * We need to wait for the in-flight async unlink requests to finish
 * when creating new files/directories with the same file names.
 */
int ceph_wait_on_conflict_unlink(struct dentry *dentry)
{
	struct ceph_fs_client *fsc = ceph_sb_to_client(dentry->d_sb);
	struct dentry *pdentry = dentry->d_parent;
	struct dentry *udentry, *found = NULL;
	struct ceph_dentry_info *di;
	struct qstr dname;
	u32 hash = dentry->d_name.hash;
	int err;

	dname.name = dentry->d_name.name;
	dname.len = dentry->d_name.len;

	rcu_read_lock();
	hash_for_each_possible_rcu(fsc->async_unlink_conflict, di,
				   hnode, hash) {
		udentry = di->dentry;

		spin_lock(&udentry->d_lock);
		if (udentry->d_name.hash != hash)
			goto next;
		if (unlikely(udentry->d_parent != pdentry))
			goto next;
		if (!hash_hashed(&di->hnode))
			goto next;

		if (!test_bit(CEPH_DENTRY_ASYNC_UNLINK_BIT, &di->flags))
			pr_warn("%s dentry %p:%pd async unlink bit is not set\n",
				__func__, dentry, dentry);

		if (!d_same_name(udentry, pdentry, &dname))
			goto next;

		spin_unlock(&udentry->d_lock);
		found = dget(udentry);
		break;
next:
		spin_unlock(&udentry->d_lock);
	}
	rcu_read_unlock();

	if (likely(!found))
		return 0;

	dout("%s dentry %p:%pd conflict with old %p:%pd\n", __func__,
	     dentry, dentry, found, found);

	err = wait_on_bit(&di->flags, CEPH_DENTRY_ASYNC_UNLINK_BIT,
			  TASK_KILLABLE);
	dput(found);
	return err;
}


/*
 * sessions
 */
const char *ceph_session_state_name(int s)
{
	switch (s) {
	case CEPH_MDS_SESSION_NEW: return "new";
	case CEPH_MDS_SESSION_OPENING: return "opening";
	case CEPH_MDS_SESSION_OPEN: return "open";
	case CEPH_MDS_SESSION_HUNG: return "hung";
	case CEPH_MDS_SESSION_CLOSING: return "closing";
	case CEPH_MDS_SESSION_CLOSED: return "closed";
	case CEPH_MDS_SESSION_RESTARTING: return "restarting";
	case CEPH_MDS_SESSION_RECONNECTING: return "reconnecting";
	case CEPH_MDS_SESSION_REJECTED: return "rejected";
	default: return "???";
	}
}

struct ceph_mds_session *ceph_get_mds_session(struct ceph_mds_session *s)
{
	if (refcount_inc_not_zero(&s->s_ref))
		return s;
	return NULL;
}

void ceph_put_mds_session(struct ceph_mds_session *s)
{
	if (IS_ERR_OR_NULL(s))
		return;

	if (refcount_dec_and_test(&s->s_ref)) {
		if (s->s_auth.authorizer)
			ceph_auth_destroy_authorizer(s->s_auth.authorizer);
		WARN_ON(mutex_is_locked(&s->s_mutex));
		xa_destroy(&s->s_delegated_inos);
		kfree(s);
	}
}

/*
 * called under mdsc->mutex
 */
struct ceph_mds_session *__ceph_lookup_mds_session(struct ceph_mds_client *mdsc,
						   int mds)
{
	if (mds >= mdsc->max_sessions || !mdsc->sessions[mds])
		return NULL;
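	/* take a reference on behalf of the caller; this may still return
	 * NULL if s_ref already dropped to zero and the session is being
	 * torn down */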
return ceph_get_mds_session(mdsc->sessions[mds]); 781 } 782 783 static bool __have_session(struct ceph_mds_client *mdsc, int mds) 784 { 785 if (mds >= mdsc->max_sessions || !mdsc->sessions[mds]) 786 return false; 787 else 788 return true; 789 } 790 791 static int __verify_registered_session(struct ceph_mds_client *mdsc, 792 struct ceph_mds_session *s) 793 { 794 if (s->s_mds >= mdsc->max_sessions || 795 mdsc->sessions[s->s_mds] != s) 796 return -ENOENT; 797 return 0; 798 } 799 800 /* 801 * create+register a new session for given mds. 802 * called under mdsc->mutex. 803 */ 804 static struct ceph_mds_session *register_session(struct ceph_mds_client *mdsc, 805 int mds) 806 { 807 struct ceph_mds_session *s; 808 809 if (mds >= mdsc->mdsmap->possible_max_rank) 810 return ERR_PTR(-EINVAL); 811 812 s = kzalloc(sizeof(*s), GFP_NOFS); 813 if (!s) 814 return ERR_PTR(-ENOMEM); 815 816 if (mds >= mdsc->max_sessions) { 817 int newmax = 1 << get_count_order(mds + 1); 818 struct ceph_mds_session **sa; 819 820 dout("%s: realloc to %d\n", __func__, newmax); 821 sa = kcalloc(newmax, sizeof(void *), GFP_NOFS); 822 if (!sa) 823 goto fail_realloc; 824 if (mdsc->sessions) { 825 memcpy(sa, mdsc->sessions, 826 mdsc->max_sessions * sizeof(void *)); 827 kfree(mdsc->sessions); 828 } 829 mdsc->sessions = sa; 830 mdsc->max_sessions = newmax; 831 } 832 833 dout("%s: mds%d\n", __func__, mds); 834 s->s_mdsc = mdsc; 835 s->s_mds = mds; 836 s->s_state = CEPH_MDS_SESSION_NEW; 837 mutex_init(&s->s_mutex); 838 839 ceph_con_init(&s->s_con, s, &mds_con_ops, &mdsc->fsc->client->msgr); 840 841 atomic_set(&s->s_cap_gen, 1); 842 s->s_cap_ttl = jiffies - 1; 843 844 spin_lock_init(&s->s_cap_lock); 845 INIT_LIST_HEAD(&s->s_caps); 846 refcount_set(&s->s_ref, 1); 847 INIT_LIST_HEAD(&s->s_waiting); 848 INIT_LIST_HEAD(&s->s_unsafe); 849 xa_init(&s->s_delegated_inos); 850 INIT_LIST_HEAD(&s->s_cap_releases); 851 INIT_WORK(&s->s_cap_release_work, ceph_cap_release_work); 852 853 INIT_LIST_HEAD(&s->s_cap_dirty); 854 INIT_LIST_HEAD(&s->s_cap_flushing); 855 856 mdsc->sessions[mds] = s; 857 atomic_inc(&mdsc->num_sessions); 858 refcount_inc(&s->s_ref); /* one ref to sessions[], one to caller */ 859 860 ceph_con_open(&s->s_con, CEPH_ENTITY_TYPE_MDS, mds, 861 ceph_mdsmap_get_addr(mdsc->mdsmap, mds)); 862 863 return s; 864 865 fail_realloc: 866 kfree(s); 867 return ERR_PTR(-ENOMEM); 868 } 869 870 /* 871 * called under mdsc->mutex 872 */ 873 static void __unregister_session(struct ceph_mds_client *mdsc, 874 struct ceph_mds_session *s) 875 { 876 dout("__unregister_session mds%d %p\n", s->s_mds, s); 877 BUG_ON(mdsc->sessions[s->s_mds] != s); 878 mdsc->sessions[s->s_mds] = NULL; 879 ceph_con_close(&s->s_con); 880 ceph_put_mds_session(s); 881 atomic_dec(&mdsc->num_sessions); 882 } 883 884 /* 885 * drop session refs in request. 
 *
 * should be last request ref, or hold mdsc->mutex
 */
static void put_request_session(struct ceph_mds_request *req)
{
	if (req->r_session) {
		ceph_put_mds_session(req->r_session);
		req->r_session = NULL;
	}
}

void ceph_mdsc_iterate_sessions(struct ceph_mds_client *mdsc,
				void (*cb)(struct ceph_mds_session *),
				bool check_state)
{
	int mds;

	mutex_lock(&mdsc->mutex);
	for (mds = 0; mds < mdsc->max_sessions; ++mds) {
		struct ceph_mds_session *s;

		s = __ceph_lookup_mds_session(mdsc, mds);
		if (!s)
			continue;

		if (check_state && !check_session_state(s)) {
			ceph_put_mds_session(s);
			continue;
		}

		mutex_unlock(&mdsc->mutex);
		cb(s);
		ceph_put_mds_session(s);
		mutex_lock(&mdsc->mutex);
	}
	mutex_unlock(&mdsc->mutex);
}

void ceph_mdsc_release_request(struct kref *kref)
{
	struct ceph_mds_request *req = container_of(kref,
						    struct ceph_mds_request,
						    r_kref);
	ceph_mdsc_release_dir_caps_no_check(req);
	destroy_reply_info(&req->r_reply_info);
	if (req->r_request)
		ceph_msg_put(req->r_request);
	if (req->r_reply)
		ceph_msg_put(req->r_reply);
	if (req->r_inode) {
		ceph_put_cap_refs(ceph_inode(req->r_inode), CEPH_CAP_PIN);
		iput(req->r_inode);
	}
	if (req->r_parent) {
		ceph_put_cap_refs(ceph_inode(req->r_parent), CEPH_CAP_PIN);
		iput(req->r_parent);
	}
	iput(req->r_target_inode);
	if (req->r_dentry)
		dput(req->r_dentry);
	if (req->r_old_dentry)
		dput(req->r_old_dentry);
	if (req->r_old_dentry_dir) {
		/*
		 * track (and drop pins for) r_old_dentry_dir
		 * separately, since r_old_dentry's d_parent may have
		 * changed between the dir mutex being dropped and
		 * this request being freed.
		 */
		ceph_put_cap_refs(ceph_inode(req->r_old_dentry_dir),
				  CEPH_CAP_PIN);
		iput(req->r_old_dentry_dir);
	}
	kfree(req->r_path1);
	kfree(req->r_path2);
	put_cred(req->r_cred);
	if (req->r_pagelist)
		ceph_pagelist_release(req->r_pagelist);
	put_request_session(req);
	ceph_unreserve_caps(req->r_mdsc, &req->r_caps_reservation);
	WARN_ON_ONCE(!list_empty(&req->r_wait));
	kmem_cache_free(ceph_mds_request_cachep, req);
}

DEFINE_RB_FUNCS(request, struct ceph_mds_request, r_tid, r_node)

/*
 * lookup request, bump ref if found.
 *
 * called under mdsc->mutex.
 */
static struct ceph_mds_request *
lookup_get_request(struct ceph_mds_client *mdsc, u64 tid)
{
	struct ceph_mds_request *req;

	req = lookup_request(&mdsc->request_tree, tid);
	if (req)
		ceph_mdsc_get_request(req);

	return req;
}

/*
 * Register an in-flight request, and assign a tid.  Link to the
 * directory inode we are modifying (if any).
 *
 * Called under mdsc->mutex.
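 *
 * If r_num_caps is set, a cap reservation is taken here; on failure the
 * error is stored in r_err so that __do_request can fail the request
 * early instead of sending it.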
994 */ 995 static void __register_request(struct ceph_mds_client *mdsc, 996 struct ceph_mds_request *req, 997 struct inode *dir) 998 { 999 int ret = 0; 1000 1001 req->r_tid = ++mdsc->last_tid; 1002 if (req->r_num_caps) { 1003 ret = ceph_reserve_caps(mdsc, &req->r_caps_reservation, 1004 req->r_num_caps); 1005 if (ret < 0) { 1006 pr_err("__register_request %p " 1007 "failed to reserve caps: %d\n", req, ret); 1008 /* set req->r_err to fail early from __do_request */ 1009 req->r_err = ret; 1010 return; 1011 } 1012 } 1013 dout("__register_request %p tid %lld\n", req, req->r_tid); 1014 ceph_mdsc_get_request(req); 1015 insert_request(&mdsc->request_tree, req); 1016 1017 req->r_cred = get_current_cred(); 1018 1019 if (mdsc->oldest_tid == 0 && req->r_op != CEPH_MDS_OP_SETFILELOCK) 1020 mdsc->oldest_tid = req->r_tid; 1021 1022 if (dir) { 1023 struct ceph_inode_info *ci = ceph_inode(dir); 1024 1025 ihold(dir); 1026 req->r_unsafe_dir = dir; 1027 spin_lock(&ci->i_unsafe_lock); 1028 list_add_tail(&req->r_unsafe_dir_item, &ci->i_unsafe_dirops); 1029 spin_unlock(&ci->i_unsafe_lock); 1030 } 1031 } 1032 1033 static void __unregister_request(struct ceph_mds_client *mdsc, 1034 struct ceph_mds_request *req) 1035 { 1036 dout("__unregister_request %p tid %lld\n", req, req->r_tid); 1037 1038 /* Never leave an unregistered request on an unsafe list! */ 1039 list_del_init(&req->r_unsafe_item); 1040 1041 if (req->r_tid == mdsc->oldest_tid) { 1042 struct rb_node *p = rb_next(&req->r_node); 1043 mdsc->oldest_tid = 0; 1044 while (p) { 1045 struct ceph_mds_request *next_req = 1046 rb_entry(p, struct ceph_mds_request, r_node); 1047 if (next_req->r_op != CEPH_MDS_OP_SETFILELOCK) { 1048 mdsc->oldest_tid = next_req->r_tid; 1049 break; 1050 } 1051 p = rb_next(p); 1052 } 1053 } 1054 1055 erase_request(&mdsc->request_tree, req); 1056 1057 if (req->r_unsafe_dir) { 1058 struct ceph_inode_info *ci = ceph_inode(req->r_unsafe_dir); 1059 spin_lock(&ci->i_unsafe_lock); 1060 list_del_init(&req->r_unsafe_dir_item); 1061 spin_unlock(&ci->i_unsafe_lock); 1062 } 1063 if (req->r_target_inode && 1064 test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) { 1065 struct ceph_inode_info *ci = ceph_inode(req->r_target_inode); 1066 spin_lock(&ci->i_unsafe_lock); 1067 list_del_init(&req->r_unsafe_target_item); 1068 spin_unlock(&ci->i_unsafe_lock); 1069 } 1070 1071 if (req->r_unsafe_dir) { 1072 iput(req->r_unsafe_dir); 1073 req->r_unsafe_dir = NULL; 1074 } 1075 1076 complete_all(&req->r_safe_completion); 1077 1078 ceph_mdsc_put_request(req); 1079 } 1080 1081 /* 1082 * Walk back up the dentry tree until we hit a dentry representing a 1083 * non-snapshot inode. We do this using the rcu_read_lock (which must be held 1084 * when calling this) to ensure that the objects won't disappear while we're 1085 * working with them. Once we hit a candidate dentry, we attempt to take a 1086 * reference to it, and return that as the result. 1087 */ 1088 static struct inode *get_nonsnap_parent(struct dentry *dentry) 1089 { 1090 struct inode *inode = NULL; 1091 1092 while (dentry && !IS_ROOT(dentry)) { 1093 inode = d_inode_rcu(dentry); 1094 if (!inode || ceph_snap(inode) == CEPH_NOSNAP) 1095 break; 1096 dentry = dentry->d_parent; 1097 } 1098 if (inode) 1099 inode = igrab(inode); 1100 return inode; 1101 } 1102 1103 /* 1104 * Choose mds to send request to next. If there is a hint set in the 1105 * request (e.g., due to a prior forward hint from the mds), use that. 1106 * Otherwise, consult frag tree and/or caps to identify the 1107 * appropriate mds. 
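 * A name hash on a directory is mapped to a dirfrag first, preferring a
 * known replica (for USE_ANY_MDS) or that fragment's authoritative rank;
 * failing that, we use the session holding the inode's auth cap, or any
 * cap we hold.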
If all else fails, choose randomly. 1108 * 1109 * Called under mdsc->mutex. 1110 */ 1111 static int __choose_mds(struct ceph_mds_client *mdsc, 1112 struct ceph_mds_request *req, 1113 bool *random) 1114 { 1115 struct inode *inode; 1116 struct ceph_inode_info *ci; 1117 struct ceph_cap *cap; 1118 int mode = req->r_direct_mode; 1119 int mds = -1; 1120 u32 hash = req->r_direct_hash; 1121 bool is_hash = test_bit(CEPH_MDS_R_DIRECT_IS_HASH, &req->r_req_flags); 1122 1123 if (random) 1124 *random = false; 1125 1126 /* 1127 * is there a specific mds we should try? ignore hint if we have 1128 * no session and the mds is not up (active or recovering). 1129 */ 1130 if (req->r_resend_mds >= 0 && 1131 (__have_session(mdsc, req->r_resend_mds) || 1132 ceph_mdsmap_get_state(mdsc->mdsmap, req->r_resend_mds) > 0)) { 1133 dout("%s using resend_mds mds%d\n", __func__, 1134 req->r_resend_mds); 1135 return req->r_resend_mds; 1136 } 1137 1138 if (mode == USE_RANDOM_MDS) 1139 goto random; 1140 1141 inode = NULL; 1142 if (req->r_inode) { 1143 if (ceph_snap(req->r_inode) != CEPH_SNAPDIR) { 1144 inode = req->r_inode; 1145 ihold(inode); 1146 } else { 1147 /* req->r_dentry is non-null for LSSNAP request */ 1148 rcu_read_lock(); 1149 inode = get_nonsnap_parent(req->r_dentry); 1150 rcu_read_unlock(); 1151 dout("%s using snapdir's parent %p\n", __func__, inode); 1152 } 1153 } else if (req->r_dentry) { 1154 /* ignore race with rename; old or new d_parent is okay */ 1155 struct dentry *parent; 1156 struct inode *dir; 1157 1158 rcu_read_lock(); 1159 parent = READ_ONCE(req->r_dentry->d_parent); 1160 dir = req->r_parent ? : d_inode_rcu(parent); 1161 1162 if (!dir || dir->i_sb != mdsc->fsc->sb) { 1163 /* not this fs or parent went negative */ 1164 inode = d_inode(req->r_dentry); 1165 if (inode) 1166 ihold(inode); 1167 } else if (ceph_snap(dir) != CEPH_NOSNAP) { 1168 /* direct snapped/virtual snapdir requests 1169 * based on parent dir inode */ 1170 inode = get_nonsnap_parent(parent); 1171 dout("%s using nonsnap parent %p\n", __func__, inode); 1172 } else { 1173 /* dentry target */ 1174 inode = d_inode(req->r_dentry); 1175 if (!inode || mode == USE_AUTH_MDS) { 1176 /* dir + name */ 1177 inode = igrab(dir); 1178 hash = ceph_dentry_hash(dir, req->r_dentry); 1179 is_hash = true; 1180 } else { 1181 ihold(inode); 1182 } 1183 } 1184 rcu_read_unlock(); 1185 } 1186 1187 dout("%s %p is_hash=%d (0x%x) mode %d\n", __func__, inode, (int)is_hash, 1188 hash, mode); 1189 if (!inode) 1190 goto random; 1191 ci = ceph_inode(inode); 1192 1193 if (is_hash && S_ISDIR(inode->i_mode)) { 1194 struct ceph_inode_frag frag; 1195 int found; 1196 1197 ceph_choose_frag(ci, hash, &frag, &found); 1198 if (found) { 1199 if (mode == USE_ANY_MDS && frag.ndist > 0) { 1200 u8 r; 1201 1202 /* choose a random replica */ 1203 get_random_bytes(&r, 1); 1204 r %= frag.ndist; 1205 mds = frag.dist[r]; 1206 dout("%s %p %llx.%llx frag %u mds%d (%d/%d)\n", 1207 __func__, inode, ceph_vinop(inode), 1208 frag.frag, mds, (int)r, frag.ndist); 1209 if (ceph_mdsmap_get_state(mdsc->mdsmap, mds) >= 1210 CEPH_MDS_STATE_ACTIVE && 1211 !ceph_mdsmap_is_laggy(mdsc->mdsmap, mds)) 1212 goto out; 1213 } 1214 1215 /* since this file/dir wasn't known to be 1216 * replicated, then we want to look for the 1217 * authoritative mds. 
*/ 1218 if (frag.mds >= 0) { 1219 /* choose auth mds */ 1220 mds = frag.mds; 1221 dout("%s %p %llx.%llx frag %u mds%d (auth)\n", 1222 __func__, inode, ceph_vinop(inode), 1223 frag.frag, mds); 1224 if (ceph_mdsmap_get_state(mdsc->mdsmap, mds) >= 1225 CEPH_MDS_STATE_ACTIVE) { 1226 if (!ceph_mdsmap_is_laggy(mdsc->mdsmap, 1227 mds)) 1228 goto out; 1229 } 1230 } 1231 mode = USE_AUTH_MDS; 1232 } 1233 } 1234 1235 spin_lock(&ci->i_ceph_lock); 1236 cap = NULL; 1237 if (mode == USE_AUTH_MDS) 1238 cap = ci->i_auth_cap; 1239 if (!cap && !RB_EMPTY_ROOT(&ci->i_caps)) 1240 cap = rb_entry(rb_first(&ci->i_caps), struct ceph_cap, ci_node); 1241 if (!cap) { 1242 spin_unlock(&ci->i_ceph_lock); 1243 iput(inode); 1244 goto random; 1245 } 1246 mds = cap->session->s_mds; 1247 dout("%s %p %llx.%llx mds%d (%scap %p)\n", __func__, 1248 inode, ceph_vinop(inode), mds, 1249 cap == ci->i_auth_cap ? "auth " : "", cap); 1250 spin_unlock(&ci->i_ceph_lock); 1251 out: 1252 iput(inode); 1253 return mds; 1254 1255 random: 1256 if (random) 1257 *random = true; 1258 1259 mds = ceph_mdsmap_get_random_mds(mdsc->mdsmap); 1260 dout("%s chose random mds%d\n", __func__, mds); 1261 return mds; 1262 } 1263 1264 1265 /* 1266 * session messages 1267 */ 1268 struct ceph_msg *ceph_create_session_msg(u32 op, u64 seq) 1269 { 1270 struct ceph_msg *msg; 1271 struct ceph_mds_session_head *h; 1272 1273 msg = ceph_msg_new(CEPH_MSG_CLIENT_SESSION, sizeof(*h), GFP_NOFS, 1274 false); 1275 if (!msg) { 1276 pr_err("ENOMEM creating session %s msg\n", 1277 ceph_session_op_name(op)); 1278 return NULL; 1279 } 1280 h = msg->front.iov_base; 1281 h->op = cpu_to_le32(op); 1282 h->seq = cpu_to_le64(seq); 1283 1284 return msg; 1285 } 1286 1287 static const unsigned char feature_bits[] = CEPHFS_FEATURES_CLIENT_SUPPORTED; 1288 #define FEATURE_BYTES(c) (DIV_ROUND_UP((size_t)feature_bits[c - 1] + 1, 64) * 8) 1289 static int encode_supported_features(void **p, void *end) 1290 { 1291 static const size_t count = ARRAY_SIZE(feature_bits); 1292 1293 if (count > 0) { 1294 size_t i; 1295 size_t size = FEATURE_BYTES(count); 1296 unsigned long bit; 1297 1298 if (WARN_ON_ONCE(*p + 4 + size > end)) 1299 return -ERANGE; 1300 1301 ceph_encode_32(p, size); 1302 memset(*p, 0, size); 1303 for (i = 0; i < count; i++) { 1304 bit = feature_bits[i]; 1305 ((unsigned char *)(*p))[bit / 8] |= BIT(bit % 8); 1306 } 1307 *p += size; 1308 } else { 1309 if (WARN_ON_ONCE(*p + 4 > end)) 1310 return -ERANGE; 1311 1312 ceph_encode_32(p, 0); 1313 } 1314 1315 return 0; 1316 } 1317 1318 static const unsigned char metric_bits[] = CEPHFS_METRIC_SPEC_CLIENT_SUPPORTED; 1319 #define METRIC_BYTES(cnt) (DIV_ROUND_UP((size_t)metric_bits[cnt - 1] + 1, 64) * 8) 1320 static int encode_metric_spec(void **p, void *end) 1321 { 1322 static const size_t count = ARRAY_SIZE(metric_bits); 1323 1324 /* header */ 1325 if (WARN_ON_ONCE(*p + 2 > end)) 1326 return -ERANGE; 1327 1328 ceph_encode_8(p, 1); /* version */ 1329 ceph_encode_8(p, 1); /* compat */ 1330 1331 if (count > 0) { 1332 size_t i; 1333 size_t size = METRIC_BYTES(count); 1334 1335 if (WARN_ON_ONCE(*p + 4 + 4 + size > end)) 1336 return -ERANGE; 1337 1338 /* metric spec info length */ 1339 ceph_encode_32(p, 4 + size); 1340 1341 /* metric spec */ 1342 ceph_encode_32(p, size); 1343 memset(*p, 0, size); 1344 for (i = 0; i < count; i++) 1345 ((unsigned char *)(*p))[i / 8] |= BIT(metric_bits[i] % 8); 1346 *p += size; 1347 } else { 1348 if (WARN_ON_ONCE(*p + 4 + 4 > end)) 1349 return -ERANGE; 1350 1351 /* metric spec info length */ 1352 ceph_encode_32(p, 4); 1353 /* 
metric spec */ 1354 ceph_encode_32(p, 0); 1355 } 1356 1357 return 0; 1358 } 1359 1360 /* 1361 * session message, specialization for CEPH_SESSION_REQUEST_OPEN 1362 * to include additional client metadata fields. 1363 */ 1364 static struct ceph_msg *create_session_open_msg(struct ceph_mds_client *mdsc, u64 seq) 1365 { 1366 struct ceph_msg *msg; 1367 struct ceph_mds_session_head *h; 1368 int i; 1369 int extra_bytes = 0; 1370 int metadata_key_count = 0; 1371 struct ceph_options *opt = mdsc->fsc->client->options; 1372 struct ceph_mount_options *fsopt = mdsc->fsc->mount_options; 1373 size_t size, count; 1374 void *p, *end; 1375 int ret; 1376 1377 const char* metadata[][2] = { 1378 {"hostname", mdsc->nodename}, 1379 {"kernel_version", init_utsname()->release}, 1380 {"entity_id", opt->name ? : ""}, 1381 {"root", fsopt->server_path ? : "/"}, 1382 {NULL, NULL} 1383 }; 1384 1385 /* Calculate serialized length of metadata */ 1386 extra_bytes = 4; /* map length */ 1387 for (i = 0; metadata[i][0]; ++i) { 1388 extra_bytes += 8 + strlen(metadata[i][0]) + 1389 strlen(metadata[i][1]); 1390 metadata_key_count++; 1391 } 1392 1393 /* supported feature */ 1394 size = 0; 1395 count = ARRAY_SIZE(feature_bits); 1396 if (count > 0) 1397 size = FEATURE_BYTES(count); 1398 extra_bytes += 4 + size; 1399 1400 /* metric spec */ 1401 size = 0; 1402 count = ARRAY_SIZE(metric_bits); 1403 if (count > 0) 1404 size = METRIC_BYTES(count); 1405 extra_bytes += 2 + 4 + 4 + size; 1406 1407 /* Allocate the message */ 1408 msg = ceph_msg_new(CEPH_MSG_CLIENT_SESSION, sizeof(*h) + extra_bytes, 1409 GFP_NOFS, false); 1410 if (!msg) { 1411 pr_err("ENOMEM creating session open msg\n"); 1412 return ERR_PTR(-ENOMEM); 1413 } 1414 p = msg->front.iov_base; 1415 end = p + msg->front.iov_len; 1416 1417 h = p; 1418 h->op = cpu_to_le32(CEPH_SESSION_REQUEST_OPEN); 1419 h->seq = cpu_to_le64(seq); 1420 1421 /* 1422 * Serialize client metadata into waiting buffer space, using 1423 * the format that userspace expects for map<string, string> 1424 * 1425 * ClientSession messages with metadata are v4 1426 */ 1427 msg->hdr.version = cpu_to_le16(4); 1428 msg->hdr.compat_version = cpu_to_le16(1); 1429 1430 /* The write pointer, following the session_head structure */ 1431 p += sizeof(*h); 1432 1433 /* Number of entries in the map */ 1434 ceph_encode_32(&p, metadata_key_count); 1435 1436 /* Two length-prefixed strings for each entry in the map */ 1437 for (i = 0; metadata[i][0]; ++i) { 1438 size_t const key_len = strlen(metadata[i][0]); 1439 size_t const val_len = strlen(metadata[i][1]); 1440 1441 ceph_encode_32(&p, key_len); 1442 memcpy(p, metadata[i][0], key_len); 1443 p += key_len; 1444 ceph_encode_32(&p, val_len); 1445 memcpy(p, metadata[i][1], val_len); 1446 p += val_len; 1447 } 1448 1449 ret = encode_supported_features(&p, end); 1450 if (ret) { 1451 pr_err("encode_supported_features failed!\n"); 1452 ceph_msg_put(msg); 1453 return ERR_PTR(ret); 1454 } 1455 1456 ret = encode_metric_spec(&p, end); 1457 if (ret) { 1458 pr_err("encode_metric_spec failed!\n"); 1459 ceph_msg_put(msg); 1460 return ERR_PTR(ret); 1461 } 1462 1463 msg->front.iov_len = p - msg->front.iov_base; 1464 msg->hdr.front_len = cpu_to_le32(msg->front.iov_len); 1465 1466 return msg; 1467 } 1468 1469 /* 1470 * send session open request. 
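 * (the message built by create_session_open_msg() above carries the
 * client metadata map plus the supported-feature and metric-spec blobs)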
1471 * 1472 * called under mdsc->mutex 1473 */ 1474 static int __open_session(struct ceph_mds_client *mdsc, 1475 struct ceph_mds_session *session) 1476 { 1477 struct ceph_msg *msg; 1478 int mstate; 1479 int mds = session->s_mds; 1480 1481 /* wait for mds to go active? */ 1482 mstate = ceph_mdsmap_get_state(mdsc->mdsmap, mds); 1483 dout("open_session to mds%d (%s)\n", mds, 1484 ceph_mds_state_name(mstate)); 1485 session->s_state = CEPH_MDS_SESSION_OPENING; 1486 session->s_renew_requested = jiffies; 1487 1488 /* send connect message */ 1489 msg = create_session_open_msg(mdsc, session->s_seq); 1490 if (IS_ERR(msg)) 1491 return PTR_ERR(msg); 1492 ceph_con_send(&session->s_con, msg); 1493 return 0; 1494 } 1495 1496 /* 1497 * open sessions for any export targets for the given mds 1498 * 1499 * called under mdsc->mutex 1500 */ 1501 static struct ceph_mds_session * 1502 __open_export_target_session(struct ceph_mds_client *mdsc, int target) 1503 { 1504 struct ceph_mds_session *session; 1505 int ret; 1506 1507 session = __ceph_lookup_mds_session(mdsc, target); 1508 if (!session) { 1509 session = register_session(mdsc, target); 1510 if (IS_ERR(session)) 1511 return session; 1512 } 1513 if (session->s_state == CEPH_MDS_SESSION_NEW || 1514 session->s_state == CEPH_MDS_SESSION_CLOSING) { 1515 ret = __open_session(mdsc, session); 1516 if (ret) 1517 return ERR_PTR(ret); 1518 } 1519 1520 return session; 1521 } 1522 1523 struct ceph_mds_session * 1524 ceph_mdsc_open_export_target_session(struct ceph_mds_client *mdsc, int target) 1525 { 1526 struct ceph_mds_session *session; 1527 1528 dout("open_export_target_session to mds%d\n", target); 1529 1530 mutex_lock(&mdsc->mutex); 1531 session = __open_export_target_session(mdsc, target); 1532 mutex_unlock(&mdsc->mutex); 1533 1534 return session; 1535 } 1536 1537 static void __open_export_target_sessions(struct ceph_mds_client *mdsc, 1538 struct ceph_mds_session *session) 1539 { 1540 struct ceph_mds_info *mi; 1541 struct ceph_mds_session *ts; 1542 int i, mds = session->s_mds; 1543 1544 if (mds >= mdsc->mdsmap->possible_max_rank) 1545 return; 1546 1547 mi = &mdsc->mdsmap->m_info[mds]; 1548 dout("open_export_target_sessions for mds%d (%d targets)\n", 1549 session->s_mds, mi->num_export_targets); 1550 1551 for (i = 0; i < mi->num_export_targets; i++) { 1552 ts = __open_export_target_session(mdsc, mi->export_targets[i]); 1553 ceph_put_mds_session(ts); 1554 } 1555 } 1556 1557 void ceph_mdsc_open_export_target_sessions(struct ceph_mds_client *mdsc, 1558 struct ceph_mds_session *session) 1559 { 1560 mutex_lock(&mdsc->mutex); 1561 __open_export_target_sessions(mdsc, session); 1562 mutex_unlock(&mdsc->mutex); 1563 } 1564 1565 /* 1566 * session caps 1567 */ 1568 1569 static void detach_cap_releases(struct ceph_mds_session *session, 1570 struct list_head *target) 1571 { 1572 lockdep_assert_held(&session->s_cap_lock); 1573 1574 list_splice_init(&session->s_cap_releases, target); 1575 session->s_num_cap_releases = 0; 1576 dout("dispose_cap_releases mds%d\n", session->s_mds); 1577 } 1578 1579 static void dispose_cap_releases(struct ceph_mds_client *mdsc, 1580 struct list_head *dispose) 1581 { 1582 while (!list_empty(dispose)) { 1583 struct ceph_cap *cap; 1584 /* zero out the in-progress message */ 1585 cap = list_first_entry(dispose, struct ceph_cap, session_caps); 1586 list_del(&cap->session_caps); 1587 ceph_put_cap(mdsc, cap); 1588 } 1589 } 1590 1591 static void cleanup_session_requests(struct ceph_mds_client *mdsc, 1592 struct ceph_mds_session *session) 1593 { 1594 struct 
ceph_mds_request *req; 1595 struct rb_node *p; 1596 1597 dout("cleanup_session_requests mds%d\n", session->s_mds); 1598 mutex_lock(&mdsc->mutex); 1599 while (!list_empty(&session->s_unsafe)) { 1600 req = list_first_entry(&session->s_unsafe, 1601 struct ceph_mds_request, r_unsafe_item); 1602 pr_warn_ratelimited(" dropping unsafe request %llu\n", 1603 req->r_tid); 1604 if (req->r_target_inode) 1605 mapping_set_error(req->r_target_inode->i_mapping, -EIO); 1606 if (req->r_unsafe_dir) 1607 mapping_set_error(req->r_unsafe_dir->i_mapping, -EIO); 1608 __unregister_request(mdsc, req); 1609 } 1610 /* zero r_attempts, so kick_requests() will re-send requests */ 1611 p = rb_first(&mdsc->request_tree); 1612 while (p) { 1613 req = rb_entry(p, struct ceph_mds_request, r_node); 1614 p = rb_next(p); 1615 if (req->r_session && 1616 req->r_session->s_mds == session->s_mds) 1617 req->r_attempts = 0; 1618 } 1619 mutex_unlock(&mdsc->mutex); 1620 } 1621 1622 /* 1623 * Helper to safely iterate over all caps associated with a session, with 1624 * special care taken to handle a racing __ceph_remove_cap(). 1625 * 1626 * Caller must hold session s_mutex. 1627 */ 1628 int ceph_iterate_session_caps(struct ceph_mds_session *session, 1629 int (*cb)(struct inode *, struct ceph_cap *, 1630 void *), void *arg) 1631 { 1632 struct list_head *p; 1633 struct ceph_cap *cap; 1634 struct inode *inode, *last_inode = NULL; 1635 struct ceph_cap *old_cap = NULL; 1636 int ret; 1637 1638 dout("iterate_session_caps %p mds%d\n", session, session->s_mds); 1639 spin_lock(&session->s_cap_lock); 1640 p = session->s_caps.next; 1641 while (p != &session->s_caps) { 1642 cap = list_entry(p, struct ceph_cap, session_caps); 1643 inode = igrab(&cap->ci->netfs.inode); 1644 if (!inode) { 1645 p = p->next; 1646 continue; 1647 } 1648 session->s_cap_iterator = cap; 1649 spin_unlock(&session->s_cap_lock); 1650 1651 if (last_inode) { 1652 iput(last_inode); 1653 last_inode = NULL; 1654 } 1655 if (old_cap) { 1656 ceph_put_cap(session->s_mdsc, old_cap); 1657 old_cap = NULL; 1658 } 1659 1660 ret = cb(inode, cap, arg); 1661 last_inode = inode; 1662 1663 spin_lock(&session->s_cap_lock); 1664 p = p->next; 1665 if (!cap->ci) { 1666 dout("iterate_session_caps finishing cap %p removal\n", 1667 cap); 1668 BUG_ON(cap->session != session); 1669 cap->session = NULL; 1670 list_del_init(&cap->session_caps); 1671 session->s_nr_caps--; 1672 atomic64_dec(&session->s_mdsc->metric.total_caps); 1673 if (cap->queue_release) 1674 __ceph_queue_cap_release(session, cap); 1675 else 1676 old_cap = cap; /* put_cap it w/o locks held */ 1677 } 1678 if (ret < 0) 1679 goto out; 1680 } 1681 ret = 0; 1682 out: 1683 session->s_cap_iterator = NULL; 1684 spin_unlock(&session->s_cap_lock); 1685 1686 iput(last_inode); 1687 if (old_cap) 1688 ceph_put_cap(session->s_mdsc, old_cap); 1689 1690 return ret; 1691 } 1692 1693 static int remove_session_caps_cb(struct inode *inode, struct ceph_cap *cap, 1694 void *arg) 1695 { 1696 struct ceph_inode_info *ci = ceph_inode(inode); 1697 bool invalidate = false; 1698 int iputs; 1699 1700 dout("removing cap %p, ci is %p, inode is %p\n", 1701 cap, ci, &ci->netfs.inode); 1702 spin_lock(&ci->i_ceph_lock); 1703 iputs = ceph_purge_inode_cap(inode, cap, &invalidate); 1704 spin_unlock(&ci->i_ceph_lock); 1705 1706 wake_up_all(&ci->i_cap_wq); 1707 if (invalidate) 1708 ceph_queue_invalidate(inode); 1709 while (iputs--) 1710 iput(inode); 1711 return 0; 1712 } 1713 1714 /* 1715 * caller must hold session s_mutex 1716 */ 1717 static void remove_session_caps(struct 
ceph_mds_session *session) 1718 { 1719 struct ceph_fs_client *fsc = session->s_mdsc->fsc; 1720 struct super_block *sb = fsc->sb; 1721 LIST_HEAD(dispose); 1722 1723 dout("remove_session_caps on %p\n", session); 1724 ceph_iterate_session_caps(session, remove_session_caps_cb, fsc); 1725 1726 wake_up_all(&fsc->mdsc->cap_flushing_wq); 1727 1728 spin_lock(&session->s_cap_lock); 1729 if (session->s_nr_caps > 0) { 1730 struct inode *inode; 1731 struct ceph_cap *cap, *prev = NULL; 1732 struct ceph_vino vino; 1733 /* 1734 * iterate_session_caps() skips inodes that are being 1735 * deleted, we need to wait until deletions are complete. 1736 * __wait_on_freeing_inode() is designed for the job, 1737 * but it is not exported, so use lookup inode function 1738 * to access it. 1739 */ 1740 while (!list_empty(&session->s_caps)) { 1741 cap = list_entry(session->s_caps.next, 1742 struct ceph_cap, session_caps); 1743 if (cap == prev) 1744 break; 1745 prev = cap; 1746 vino = cap->ci->i_vino; 1747 spin_unlock(&session->s_cap_lock); 1748 1749 inode = ceph_find_inode(sb, vino); 1750 iput(inode); 1751 1752 spin_lock(&session->s_cap_lock); 1753 } 1754 } 1755 1756 // drop cap expires and unlock s_cap_lock 1757 detach_cap_releases(session, &dispose); 1758 1759 BUG_ON(session->s_nr_caps > 0); 1760 BUG_ON(!list_empty(&session->s_cap_flushing)); 1761 spin_unlock(&session->s_cap_lock); 1762 dispose_cap_releases(session->s_mdsc, &dispose); 1763 } 1764 1765 enum { 1766 RECONNECT, 1767 RENEWCAPS, 1768 FORCE_RO, 1769 }; 1770 1771 /* 1772 * wake up any threads waiting on this session's caps. if the cap is 1773 * old (didn't get renewed on the client reconnect), remove it now. 1774 * 1775 * caller must hold s_mutex. 1776 */ 1777 static int wake_up_session_cb(struct inode *inode, struct ceph_cap *cap, 1778 void *arg) 1779 { 1780 struct ceph_inode_info *ci = ceph_inode(inode); 1781 unsigned long ev = (unsigned long)arg; 1782 1783 if (ev == RECONNECT) { 1784 spin_lock(&ci->i_ceph_lock); 1785 ci->i_wanted_max_size = 0; 1786 ci->i_requested_max_size = 0; 1787 spin_unlock(&ci->i_ceph_lock); 1788 } else if (ev == RENEWCAPS) { 1789 if (cap->cap_gen < atomic_read(&cap->session->s_cap_gen)) { 1790 /* mds did not re-issue stale cap */ 1791 spin_lock(&ci->i_ceph_lock); 1792 cap->issued = cap->implemented = CEPH_CAP_PIN; 1793 spin_unlock(&ci->i_ceph_lock); 1794 } 1795 } else if (ev == FORCE_RO) { 1796 } 1797 wake_up_all(&ci->i_cap_wq); 1798 return 0; 1799 } 1800 1801 static void wake_up_session_caps(struct ceph_mds_session *session, int ev) 1802 { 1803 dout("wake_up_session_caps %p mds%d\n", session, session->s_mds); 1804 ceph_iterate_session_caps(session, wake_up_session_cb, 1805 (void *)(unsigned long)ev); 1806 } 1807 1808 /* 1809 * Send periodic message to MDS renewing all currently held caps. The 1810 * ack will reset the expiration for all caps from this session. 1811 * 1812 * caller holds s_mutex 1813 */ 1814 static int send_renew_caps(struct ceph_mds_client *mdsc, 1815 struct ceph_mds_session *session) 1816 { 1817 struct ceph_msg *msg; 1818 int state; 1819 1820 if (time_after_eq(jiffies, session->s_cap_ttl) && 1821 time_after_eq(session->s_cap_ttl, session->s_renew_requested)) 1822 pr_info("mds%d caps stale\n", session->s_mds); 1823 session->s_renew_requested = jiffies; 1824 1825 /* do not try to renew caps until a recovering mds has reconnected 1826 * with its clients. 
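	 * We check the mds map state below and skip any rank that has not
	 * yet reached the RECONNECT state.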
*/ 1827 state = ceph_mdsmap_get_state(mdsc->mdsmap, session->s_mds); 1828 if (state < CEPH_MDS_STATE_RECONNECT) { 1829 dout("send_renew_caps ignoring mds%d (%s)\n", 1830 session->s_mds, ceph_mds_state_name(state)); 1831 return 0; 1832 } 1833 1834 dout("send_renew_caps to mds%d (%s)\n", session->s_mds, 1835 ceph_mds_state_name(state)); 1836 msg = ceph_create_session_msg(CEPH_SESSION_REQUEST_RENEWCAPS, 1837 ++session->s_renew_seq); 1838 if (!msg) 1839 return -ENOMEM; 1840 ceph_con_send(&session->s_con, msg); 1841 return 0; 1842 } 1843 1844 static int send_flushmsg_ack(struct ceph_mds_client *mdsc, 1845 struct ceph_mds_session *session, u64 seq) 1846 { 1847 struct ceph_msg *msg; 1848 1849 dout("send_flushmsg_ack to mds%d (%s)s seq %lld\n", 1850 session->s_mds, ceph_session_state_name(session->s_state), seq); 1851 msg = ceph_create_session_msg(CEPH_SESSION_FLUSHMSG_ACK, seq); 1852 if (!msg) 1853 return -ENOMEM; 1854 ceph_con_send(&session->s_con, msg); 1855 return 0; 1856 } 1857 1858 1859 /* 1860 * Note new cap ttl, and any transition from stale -> not stale (fresh?). 1861 * 1862 * Called under session->s_mutex 1863 */ 1864 static void renewed_caps(struct ceph_mds_client *mdsc, 1865 struct ceph_mds_session *session, int is_renew) 1866 { 1867 int was_stale; 1868 int wake = 0; 1869 1870 spin_lock(&session->s_cap_lock); 1871 was_stale = is_renew && time_after_eq(jiffies, session->s_cap_ttl); 1872 1873 session->s_cap_ttl = session->s_renew_requested + 1874 mdsc->mdsmap->m_session_timeout*HZ; 1875 1876 if (was_stale) { 1877 if (time_before(jiffies, session->s_cap_ttl)) { 1878 pr_info("mds%d caps renewed\n", session->s_mds); 1879 wake = 1; 1880 } else { 1881 pr_info("mds%d caps still stale\n", session->s_mds); 1882 } 1883 } 1884 dout("renewed_caps mds%d ttl now %lu, was %s, now %s\n", 1885 session->s_mds, session->s_cap_ttl, was_stale ? "stale" : "fresh", 1886 time_before(jiffies, session->s_cap_ttl) ? "stale" : "fresh"); 1887 spin_unlock(&session->s_cap_lock); 1888 1889 if (wake) 1890 wake_up_session_caps(session, RENEWCAPS); 1891 } 1892 1893 /* 1894 * send a session close request 1895 */ 1896 static int request_close_session(struct ceph_mds_session *session) 1897 { 1898 struct ceph_msg *msg; 1899 1900 dout("request_close_session mds%d state %s seq %lld\n", 1901 session->s_mds, ceph_session_state_name(session->s_state), 1902 session->s_seq); 1903 msg = ceph_create_session_msg(CEPH_SESSION_REQUEST_CLOSE, 1904 session->s_seq); 1905 if (!msg) 1906 return -ENOMEM; 1907 ceph_con_send(&session->s_con, msg); 1908 return 1; 1909 } 1910 1911 /* 1912 * Called with s_mutex held. 1913 */ 1914 static int __close_session(struct ceph_mds_client *mdsc, 1915 struct ceph_mds_session *session) 1916 { 1917 if (session->s_state >= CEPH_MDS_SESSION_CLOSING) 1918 return 0; 1919 session->s_state = CEPH_MDS_SESSION_CLOSING; 1920 return request_close_session(session); 1921 } 1922 1923 static bool drop_negative_children(struct dentry *dentry) 1924 { 1925 struct dentry *child; 1926 bool all_negative = true; 1927 1928 if (!d_is_dir(dentry)) 1929 goto out; 1930 1931 spin_lock(&dentry->d_lock); 1932 list_for_each_entry(child, &dentry->d_subdirs, d_child) { 1933 if (d_really_is_positive(child)) { 1934 all_negative = false; 1935 break; 1936 } 1937 } 1938 spin_unlock(&dentry->d_lock); 1939 1940 if (all_negative) 1941 shrink_dcache_parent(dentry); 1942 out: 1943 return all_negative; 1944 } 1945 1946 /* 1947 * Trim old(er) caps. 
 *
 * Because we can't cache an inode without one or more caps, we do
 * this indirectly: if a cap is unused, we prune its aliases, at which
 * point the inode will hopefully get dropped too.
 *
 * Yes, this is a bit sloppy.  Our only real goal here is to respond to
 * memory pressure from the MDS, though, so it needn't be perfect.
 */
static int trim_caps_cb(struct inode *inode, struct ceph_cap *cap, void *arg)
{
	int *remaining = arg;
	struct ceph_inode_info *ci = ceph_inode(inode);
	int used, wanted, oissued, mine;

	if (*remaining <= 0)
		return -1;

	spin_lock(&ci->i_ceph_lock);
	mine = cap->issued | cap->implemented;
	used = __ceph_caps_used(ci);
	wanted = __ceph_caps_file_wanted(ci);
	oissued = __ceph_caps_issued_other(ci, cap);

	dout("trim_caps_cb %p cap %p mine %s oissued %s used %s wanted %s\n",
	     inode, cap, ceph_cap_string(mine), ceph_cap_string(oissued),
	     ceph_cap_string(used), ceph_cap_string(wanted));
	if (cap == ci->i_auth_cap) {
		if (ci->i_dirty_caps || ci->i_flushing_caps ||
		    !list_empty(&ci->i_cap_snaps))
			goto out;
		if ((used | wanted) & CEPH_CAP_ANY_WR)
			goto out;
		/* Note: it's possible that i_filelock_ref becomes non-zero
		 * after dropping auth caps. It doesn't hurt because reply
		 * of lock mds request will re-add auth caps. */
		if (atomic_read(&ci->i_filelock_ref) > 0)
			goto out;
	}
	/* The inode has cached pages, but it's no longer used.
	 * We can safely drop it. */
	if (S_ISREG(inode->i_mode) &&
	    wanted == 0 && used == CEPH_CAP_FILE_CACHE &&
	    !(oissued & CEPH_CAP_FILE_CACHE)) {
		used = 0;
		oissued = 0;
	}
	if ((used | wanted) & ~oissued & mine)
		goto out;   /* we need these caps */

	if (oissued) {
		/* we aren't the only cap.. just remove us */
		ceph_remove_cap(cap, true);
		(*remaining)--;
	} else {
		struct dentry *dentry;
		/* try dropping referring dentries */
		spin_unlock(&ci->i_ceph_lock);
		dentry = d_find_any_alias(inode);
		if (dentry && drop_negative_children(dentry)) {
			int count;
			dput(dentry);
			d_prune_aliases(inode);
			count = atomic_read(&inode->i_count);
			if (count == 1)
				(*remaining)--;
			dout("trim_caps_cb %p cap %p pruned, count now %d\n",
			     inode, cap, count);
		} else {
			dput(dentry);
		}
		return 0;
	}

out:
	spin_unlock(&ci->i_ceph_lock);
	return 0;
}

/*
 * Trim session cap count down to some max number.
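 * Any cap releases queued while trimming are flushed to the MDS via
 * ceph_flush_cap_releases() once the iteration completes.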
2028 */ 2029 int ceph_trim_caps(struct ceph_mds_client *mdsc, 2030 struct ceph_mds_session *session, 2031 int max_caps) 2032 { 2033 int trim_caps = session->s_nr_caps - max_caps; 2034 2035 dout("trim_caps mds%d start: %d / %d, trim %d\n", 2036 session->s_mds, session->s_nr_caps, max_caps, trim_caps); 2037 if (trim_caps > 0) { 2038 int remaining = trim_caps; 2039 2040 ceph_iterate_session_caps(session, trim_caps_cb, &remaining); 2041 dout("trim_caps mds%d done: %d / %d, trimmed %d\n", 2042 session->s_mds, session->s_nr_caps, max_caps, 2043 trim_caps - remaining); 2044 } 2045 2046 ceph_flush_cap_releases(mdsc, session); 2047 return 0; 2048 } 2049 2050 static int check_caps_flush(struct ceph_mds_client *mdsc, 2051 u64 want_flush_tid) 2052 { 2053 int ret = 1; 2054 2055 spin_lock(&mdsc->cap_dirty_lock); 2056 if (!list_empty(&mdsc->cap_flush_list)) { 2057 struct ceph_cap_flush *cf = 2058 list_first_entry(&mdsc->cap_flush_list, 2059 struct ceph_cap_flush, g_list); 2060 if (cf->tid <= want_flush_tid) { 2061 dout("check_caps_flush still flushing tid " 2062 "%llu <= %llu\n", cf->tid, want_flush_tid); 2063 ret = 0; 2064 } 2065 } 2066 spin_unlock(&mdsc->cap_dirty_lock); 2067 return ret; 2068 } 2069 2070 /* 2071 * flush all dirty inode data to disk. 2072 * 2073 * returns true if we've flushed through want_flush_tid 2074 */ 2075 static void wait_caps_flush(struct ceph_mds_client *mdsc, 2076 u64 want_flush_tid) 2077 { 2078 dout("check_caps_flush want %llu\n", want_flush_tid); 2079 2080 wait_event(mdsc->cap_flushing_wq, 2081 check_caps_flush(mdsc, want_flush_tid)); 2082 2083 dout("check_caps_flush ok, flushed thru %llu\n", want_flush_tid); 2084 } 2085 2086 /* 2087 * called under s_mutex 2088 */ 2089 static void ceph_send_cap_releases(struct ceph_mds_client *mdsc, 2090 struct ceph_mds_session *session) 2091 { 2092 struct ceph_msg *msg = NULL; 2093 struct ceph_mds_cap_release *head; 2094 struct ceph_mds_cap_item *item; 2095 struct ceph_osd_client *osdc = &mdsc->fsc->client->osdc; 2096 struct ceph_cap *cap; 2097 LIST_HEAD(tmp_list); 2098 int num_cap_releases; 2099 __le32 barrier, *cap_barrier; 2100 2101 down_read(&osdc->lock); 2102 barrier = cpu_to_le32(osdc->epoch_barrier); 2103 up_read(&osdc->lock); 2104 2105 spin_lock(&session->s_cap_lock); 2106 again: 2107 list_splice_init(&session->s_cap_releases, &tmp_list); 2108 num_cap_releases = session->s_num_cap_releases; 2109 session->s_num_cap_releases = 0; 2110 spin_unlock(&session->s_cap_lock); 2111 2112 while (!list_empty(&tmp_list)) { 2113 if (!msg) { 2114 msg = ceph_msg_new(CEPH_MSG_CLIENT_CAPRELEASE, 2115 PAGE_SIZE, GFP_NOFS, false); 2116 if (!msg) 2117 goto out_err; 2118 head = msg->front.iov_base; 2119 head->num = cpu_to_le32(0); 2120 msg->front.iov_len = sizeof(*head); 2121 2122 msg->hdr.version = cpu_to_le16(2); 2123 msg->hdr.compat_version = cpu_to_le16(1); 2124 } 2125 2126 cap = list_first_entry(&tmp_list, struct ceph_cap, 2127 session_caps); 2128 list_del(&cap->session_caps); 2129 num_cap_releases--; 2130 2131 head = msg->front.iov_base; 2132 put_unaligned_le32(get_unaligned_le32(&head->num) + 1, 2133 &head->num); 2134 item = msg->front.iov_base + msg->front.iov_len; 2135 item->ino = cpu_to_le64(cap->cap_ino); 2136 item->cap_id = cpu_to_le64(cap->cap_id); 2137 item->migrate_seq = cpu_to_le32(cap->mseq); 2138 item->seq = cpu_to_le32(cap->issue_seq); 2139 msg->front.iov_len += sizeof(*item); 2140 2141 ceph_put_cap(mdsc, cap); 2142 2143 if (le32_to_cpu(head->num) == CEPH_CAPS_PER_RELEASE) { 2144 // Append cap_barrier field 2145 cap_barrier = 
msg->front.iov_base + msg->front.iov_len; 2146 *cap_barrier = barrier; 2147 msg->front.iov_len += sizeof(*cap_barrier); 2148 2149 msg->hdr.front_len = cpu_to_le32(msg->front.iov_len); 2150 dout("send_cap_releases mds%d %p\n", session->s_mds, msg); 2151 ceph_con_send(&session->s_con, msg); 2152 msg = NULL; 2153 } 2154 } 2155 2156 BUG_ON(num_cap_releases != 0); 2157 2158 spin_lock(&session->s_cap_lock); 2159 if (!list_empty(&session->s_cap_releases)) 2160 goto again; 2161 spin_unlock(&session->s_cap_lock); 2162 2163 if (msg) { 2164 // Append cap_barrier field 2165 cap_barrier = msg->front.iov_base + msg->front.iov_len; 2166 *cap_barrier = barrier; 2167 msg->front.iov_len += sizeof(*cap_barrier); 2168 2169 msg->hdr.front_len = cpu_to_le32(msg->front.iov_len); 2170 dout("send_cap_releases mds%d %p\n", session->s_mds, msg); 2171 ceph_con_send(&session->s_con, msg); 2172 } 2173 return; 2174 out_err: 2175 pr_err("send_cap_releases mds%d, failed to allocate message\n", 2176 session->s_mds); 2177 spin_lock(&session->s_cap_lock); 2178 list_splice(&tmp_list, &session->s_cap_releases); 2179 session->s_num_cap_releases += num_cap_releases; 2180 spin_unlock(&session->s_cap_lock); 2181 } 2182 2183 static void ceph_cap_release_work(struct work_struct *work) 2184 { 2185 struct ceph_mds_session *session = 2186 container_of(work, struct ceph_mds_session, s_cap_release_work); 2187 2188 mutex_lock(&session->s_mutex); 2189 if (session->s_state == CEPH_MDS_SESSION_OPEN || 2190 session->s_state == CEPH_MDS_SESSION_HUNG) 2191 ceph_send_cap_releases(session->s_mdsc, session); 2192 mutex_unlock(&session->s_mutex); 2193 ceph_put_mds_session(session); 2194 } 2195 2196 void ceph_flush_cap_releases(struct ceph_mds_client *mdsc, 2197 struct ceph_mds_session *session) 2198 { 2199 if (mdsc->stopping) 2200 return; 2201 2202 ceph_get_mds_session(session); 2203 if (queue_work(mdsc->fsc->cap_wq, 2204 &session->s_cap_release_work)) { 2205 dout("cap release work queued\n"); 2206 } else { 2207 ceph_put_mds_session(session); 2208 dout("failed to queue cap release work\n"); 2209 } 2210 } 2211 2212 /* 2213 * caller holds session->s_cap_lock 2214 */ 2215 void __ceph_queue_cap_release(struct ceph_mds_session *session, 2216 struct ceph_cap *cap) 2217 { 2218 list_add_tail(&cap->session_caps, &session->s_cap_releases); 2219 session->s_num_cap_releases++; 2220 2221 if (!(session->s_num_cap_releases % CEPH_CAPS_PER_RELEASE)) 2222 ceph_flush_cap_releases(session->s_mdsc, session); 2223 } 2224 2225 static void ceph_cap_reclaim_work(struct work_struct *work) 2226 { 2227 struct ceph_mds_client *mdsc = 2228 container_of(work, struct ceph_mds_client, cap_reclaim_work); 2229 int ret = ceph_trim_dentries(mdsc); 2230 if (ret == -EAGAIN) 2231 ceph_queue_cap_reclaim_work(mdsc); 2232 } 2233 2234 void ceph_queue_cap_reclaim_work(struct ceph_mds_client *mdsc) 2235 { 2236 if (mdsc->stopping) 2237 return; 2238 2239 if (queue_work(mdsc->fsc->cap_wq, &mdsc->cap_reclaim_work)) { 2240 dout("caps reclaim work queued\n"); 2241 } else { 2242 dout("failed to queue caps release work\n"); 2243 } 2244 } 2245 2246 void ceph_reclaim_caps_nr(struct ceph_mds_client *mdsc, int nr) 2247 { 2248 int val; 2249 if (!nr) 2250 return; 2251 val = atomic_add_return(nr, &mdsc->cap_reclaim_pending); 2252 if ((val % CEPH_CAPS_PER_RELEASE) < nr) { 2253 atomic_set(&mdsc->cap_reclaim_pending, 0); 2254 ceph_queue_cap_reclaim_work(mdsc); 2255 } 2256 } 2257 2258 /* 2259 * requests 2260 */ 2261 2262 int ceph_alloc_readdir_reply_buffer(struct ceph_mds_request *req, 2263 struct inode *dir) 
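/*
 * Size the readdir reply buffer: estimate the entry count from the
 * directory's i_files + i_subdirs, clamp it to [1, max_readdir], then
 * fall back to progressively smaller page orders if the allocation
 * fails and recompute how many entries actually fit.
 */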
2264 { 2265 struct ceph_inode_info *ci = ceph_inode(dir); 2266 struct ceph_mds_reply_info_parsed *rinfo = &req->r_reply_info; 2267 struct ceph_mount_options *opt = req->r_mdsc->fsc->mount_options; 2268 size_t size = sizeof(struct ceph_mds_reply_dir_entry); 2269 unsigned int num_entries; 2270 int order; 2271 2272 spin_lock(&ci->i_ceph_lock); 2273 num_entries = ci->i_files + ci->i_subdirs; 2274 spin_unlock(&ci->i_ceph_lock); 2275 num_entries = max(num_entries, 1U); 2276 num_entries = min(num_entries, opt->max_readdir); 2277 2278 order = get_order(size * num_entries); 2279 while (order >= 0) { 2280 rinfo->dir_entries = (void*)__get_free_pages(GFP_KERNEL | 2281 __GFP_NOWARN | 2282 __GFP_ZERO, 2283 order); 2284 if (rinfo->dir_entries) 2285 break; 2286 order--; 2287 } 2288 if (!rinfo->dir_entries) 2289 return -ENOMEM; 2290 2291 num_entries = (PAGE_SIZE << order) / size; 2292 num_entries = min(num_entries, opt->max_readdir); 2293 2294 rinfo->dir_buf_size = PAGE_SIZE << order; 2295 req->r_num_caps = num_entries + 1; 2296 req->r_args.readdir.max_entries = cpu_to_le32(num_entries); 2297 req->r_args.readdir.max_bytes = cpu_to_le32(opt->max_readdir_bytes); 2298 return 0; 2299 } 2300 2301 /* 2302 * Create an mds request. 2303 */ 2304 struct ceph_mds_request * 2305 ceph_mdsc_create_request(struct ceph_mds_client *mdsc, int op, int mode) 2306 { 2307 struct ceph_mds_request *req; 2308 2309 req = kmem_cache_zalloc(ceph_mds_request_cachep, GFP_NOFS); 2310 if (!req) 2311 return ERR_PTR(-ENOMEM); 2312 2313 mutex_init(&req->r_fill_mutex); 2314 req->r_mdsc = mdsc; 2315 req->r_started = jiffies; 2316 req->r_start_latency = ktime_get(); 2317 req->r_resend_mds = -1; 2318 INIT_LIST_HEAD(&req->r_unsafe_dir_item); 2319 INIT_LIST_HEAD(&req->r_unsafe_target_item); 2320 req->r_fmode = -1; 2321 req->r_feature_needed = -1; 2322 kref_init(&req->r_kref); 2323 RB_CLEAR_NODE(&req->r_node); 2324 INIT_LIST_HEAD(&req->r_wait); 2325 init_completion(&req->r_completion); 2326 init_completion(&req->r_safe_completion); 2327 INIT_LIST_HEAD(&req->r_unsafe_item); 2328 2329 ktime_get_coarse_real_ts64(&req->r_stamp); 2330 2331 req->r_op = op; 2332 req->r_direct_mode = mode; 2333 return req; 2334 } 2335 2336 /* 2337 * return oldest (lowest) request, tid in request tree, 0 if none. 2338 * 2339 * called under mdsc->mutex. 2340 */ 2341 static struct ceph_mds_request *__get_oldest_req(struct ceph_mds_client *mdsc) 2342 { 2343 if (RB_EMPTY_ROOT(&mdsc->request_tree)) 2344 return NULL; 2345 return rb_entry(rb_first(&mdsc->request_tree), 2346 struct ceph_mds_request, r_node); 2347 } 2348 2349 static inline u64 __get_oldest_tid(struct ceph_mds_client *mdsc) 2350 { 2351 return mdsc->oldest_tid; 2352 } 2353 2354 /* 2355 * Build a dentry's path. Allocate on heap; caller must kfree. Based 2356 * on build_path_from_dentry in fs/cifs/dir.c. 2357 * 2358 * If @stop_on_nosnap, generate path relative to the first non-snapped 2359 * inode. 2360 * 2361 * Encode hidden .snap dirs as a double /, i.e. 
2362 * foo/.snap/bar -> foo//bar 2363 */ 2364 char *ceph_mdsc_build_path(struct dentry *dentry, int *plen, u64 *pbase, 2365 int stop_on_nosnap) 2366 { 2367 struct dentry *temp; 2368 char *path; 2369 int pos; 2370 unsigned seq; 2371 u64 base; 2372 2373 if (!dentry) 2374 return ERR_PTR(-EINVAL); 2375 2376 path = __getname(); 2377 if (!path) 2378 return ERR_PTR(-ENOMEM); 2379 retry: 2380 pos = PATH_MAX - 1; 2381 path[pos] = '\0'; 2382 2383 seq = read_seqbegin(&rename_lock); 2384 rcu_read_lock(); 2385 temp = dentry; 2386 for (;;) { 2387 struct inode *inode; 2388 2389 spin_lock(&temp->d_lock); 2390 inode = d_inode(temp); 2391 if (inode && ceph_snap(inode) == CEPH_SNAPDIR) { 2392 dout("build_path path+%d: %p SNAPDIR\n", 2393 pos, temp); 2394 } else if (stop_on_nosnap && inode && dentry != temp && 2395 ceph_snap(inode) == CEPH_NOSNAP) { 2396 spin_unlock(&temp->d_lock); 2397 pos++; /* get rid of any prepended '/' */ 2398 break; 2399 } else { 2400 pos -= temp->d_name.len; 2401 if (pos < 0) { 2402 spin_unlock(&temp->d_lock); 2403 break; 2404 } 2405 memcpy(path + pos, temp->d_name.name, temp->d_name.len); 2406 } 2407 spin_unlock(&temp->d_lock); 2408 temp = READ_ONCE(temp->d_parent); 2409 2410 /* Are we at the root? */ 2411 if (IS_ROOT(temp)) 2412 break; 2413 2414 /* Are we out of buffer? */ 2415 if (--pos < 0) 2416 break; 2417 2418 path[pos] = '/'; 2419 } 2420 base = ceph_ino(d_inode(temp)); 2421 rcu_read_unlock(); 2422 2423 if (read_seqretry(&rename_lock, seq)) 2424 goto retry; 2425 2426 if (pos < 0) { 2427 /* 2428 * A rename didn't occur, but somehow we didn't end up where 2429 * we thought we would. Throw a warning and try again. 2430 */ 2431 pr_warn("build_path did not end path lookup where " 2432 "expected, pos is %d\n", pos); 2433 goto retry; 2434 } 2435 2436 *pbase = base; 2437 *plen = PATH_MAX - 1 - pos; 2438 dout("build_path on %p %d built %llx '%.*s'\n", 2439 dentry, d_count(dentry), base, *plen, path + pos); 2440 return path + pos; 2441 } 2442 2443 static int build_dentry_path(struct dentry *dentry, struct inode *dir, 2444 const char **ppath, int *ppathlen, u64 *pino, 2445 bool *pfreepath, bool parent_locked) 2446 { 2447 char *path; 2448 2449 rcu_read_lock(); 2450 if (!dir) 2451 dir = d_inode_rcu(dentry->d_parent); 2452 if (dir && parent_locked && ceph_snap(dir) == CEPH_NOSNAP) { 2453 *pino = ceph_ino(dir); 2454 rcu_read_unlock(); 2455 *ppath = dentry->d_name.name; 2456 *ppathlen = dentry->d_name.len; 2457 return 0; 2458 } 2459 rcu_read_unlock(); 2460 path = ceph_mdsc_build_path(dentry, ppathlen, pino, 1); 2461 if (IS_ERR(path)) 2462 return PTR_ERR(path); 2463 *ppath = path; 2464 *pfreepath = true; 2465 return 0; 2466 } 2467 2468 static int build_inode_path(struct inode *inode, 2469 const char **ppath, int *ppathlen, u64 *pino, 2470 bool *pfreepath) 2471 { 2472 struct dentry *dentry; 2473 char *path; 2474 2475 if (ceph_snap(inode) == CEPH_NOSNAP) { 2476 *pino = ceph_ino(inode); 2477 *ppathlen = 0; 2478 return 0; 2479 } 2480 dentry = d_find_alias(inode); 2481 path = ceph_mdsc_build_path(dentry, ppathlen, pino, 1); 2482 dput(dentry); 2483 if (IS_ERR(path)) 2484 return PTR_ERR(path); 2485 *ppath = path; 2486 *pfreepath = true; 2487 return 0; 2488 } 2489 2490 /* 2491 * request arguments may be specified via an inode *, a dentry *, or 2492 * an explicit ino+path. 
2493 */ 2494 static int set_request_path_attr(struct inode *rinode, struct dentry *rdentry, 2495 struct inode *rdiri, const char *rpath, 2496 u64 rino, const char **ppath, int *pathlen, 2497 u64 *ino, bool *freepath, bool parent_locked) 2498 { 2499 int r = 0; 2500 2501 if (rinode) { 2502 r = build_inode_path(rinode, ppath, pathlen, ino, freepath); 2503 dout(" inode %p %llx.%llx\n", rinode, ceph_ino(rinode), 2504 ceph_snap(rinode)); 2505 } else if (rdentry) { 2506 r = build_dentry_path(rdentry, rdiri, ppath, pathlen, ino, 2507 freepath, parent_locked); 2508 dout(" dentry %p %llx/%.*s\n", rdentry, *ino, *pathlen, 2509 *ppath); 2510 } else if (rpath || rino) { 2511 *ino = rino; 2512 *ppath = rpath; 2513 *pathlen = rpath ? strlen(rpath) : 0; 2514 dout(" path %.*s\n", *pathlen, rpath); 2515 } 2516 2517 return r; 2518 } 2519 2520 static void encode_timestamp_and_gids(void **p, 2521 const struct ceph_mds_request *req) 2522 { 2523 struct ceph_timespec ts; 2524 int i; 2525 2526 ceph_encode_timespec64(&ts, &req->r_stamp); 2527 ceph_encode_copy(p, &ts, sizeof(ts)); 2528 2529 /* gid_list */ 2530 ceph_encode_32(p, req->r_cred->group_info->ngroups); 2531 for (i = 0; i < req->r_cred->group_info->ngroups; i++) 2532 ceph_encode_64(p, from_kgid(&init_user_ns, 2533 req->r_cred->group_info->gid[i])); 2534 } 2535 2536 /* 2537 * called under mdsc->mutex 2538 */ 2539 static struct ceph_msg *create_request_message(struct ceph_mds_session *session, 2540 struct ceph_mds_request *req, 2541 bool drop_cap_releases) 2542 { 2543 int mds = session->s_mds; 2544 struct ceph_mds_client *mdsc = session->s_mdsc; 2545 struct ceph_msg *msg; 2546 struct ceph_mds_request_head_old *head; 2547 const char *path1 = NULL; 2548 const char *path2 = NULL; 2549 u64 ino1 = 0, ino2 = 0; 2550 int pathlen1 = 0, pathlen2 = 0; 2551 bool freepath1 = false, freepath2 = false; 2552 int len; 2553 u16 releases; 2554 void *p, *end; 2555 int ret; 2556 bool legacy = !(session->s_con.peer_features & CEPH_FEATURE_FS_BTIME); 2557 2558 ret = set_request_path_attr(req->r_inode, req->r_dentry, 2559 req->r_parent, req->r_path1, req->r_ino1.ino, 2560 &path1, &pathlen1, &ino1, &freepath1, 2561 test_bit(CEPH_MDS_R_PARENT_LOCKED, 2562 &req->r_req_flags)); 2563 if (ret < 0) { 2564 msg = ERR_PTR(ret); 2565 goto out; 2566 } 2567 2568 /* If r_old_dentry is set, then assume that its parent is locked */ 2569 ret = set_request_path_attr(NULL, req->r_old_dentry, 2570 req->r_old_dentry_dir, 2571 req->r_path2, req->r_ino2.ino, 2572 &path2, &pathlen2, &ino2, &freepath2, true); 2573 if (ret < 0) { 2574 msg = ERR_PTR(ret); 2575 goto out_free1; 2576 } 2577 2578 len = legacy ? 
sizeof(*head) : sizeof(struct ceph_mds_request_head); 2579 len += pathlen1 + pathlen2 + 2*(1 + sizeof(u32) + sizeof(u64)) + 2580 sizeof(struct ceph_timespec); 2581 len += sizeof(u32) + (sizeof(u64) * req->r_cred->group_info->ngroups); 2582 2583 /* calculate (max) length for cap releases */ 2584 len += sizeof(struct ceph_mds_request_release) * 2585 (!!req->r_inode_drop + !!req->r_dentry_drop + 2586 !!req->r_old_inode_drop + !!req->r_old_dentry_drop); 2587 2588 if (req->r_dentry_drop) 2589 len += pathlen1; 2590 if (req->r_old_dentry_drop) 2591 len += pathlen2; 2592 2593 msg = ceph_msg_new2(CEPH_MSG_CLIENT_REQUEST, len, 1, GFP_NOFS, false); 2594 if (!msg) { 2595 msg = ERR_PTR(-ENOMEM); 2596 goto out_free2; 2597 } 2598 2599 msg->hdr.tid = cpu_to_le64(req->r_tid); 2600 2601 /* 2602 * The old ceph_mds_request_head didn't contain a version field, and 2603 * one was added when we moved the message version from 3->4. 2604 */ 2605 if (legacy) { 2606 msg->hdr.version = cpu_to_le16(3); 2607 head = msg->front.iov_base; 2608 p = msg->front.iov_base + sizeof(*head); 2609 } else { 2610 struct ceph_mds_request_head *new_head = msg->front.iov_base; 2611 2612 msg->hdr.version = cpu_to_le16(4); 2613 new_head->version = cpu_to_le16(CEPH_MDS_REQUEST_HEAD_VERSION); 2614 head = (struct ceph_mds_request_head_old *)&new_head->oldest_client_tid; 2615 p = msg->front.iov_base + sizeof(*new_head); 2616 } 2617 2618 end = msg->front.iov_base + msg->front.iov_len; 2619 2620 head->mdsmap_epoch = cpu_to_le32(mdsc->mdsmap->m_epoch); 2621 head->op = cpu_to_le32(req->r_op); 2622 head->caller_uid = cpu_to_le32(from_kuid(&init_user_ns, 2623 req->r_cred->fsuid)); 2624 head->caller_gid = cpu_to_le32(from_kgid(&init_user_ns, 2625 req->r_cred->fsgid)); 2626 head->ino = cpu_to_le64(req->r_deleg_ino); 2627 head->args = req->r_args; 2628 2629 ceph_encode_filepath(&p, end, ino1, path1); 2630 ceph_encode_filepath(&p, end, ino2, path2); 2631 2632 /* make note of release offset, in case we need to replay */ 2633 req->r_request_release_offset = p - msg->front.iov_base; 2634 2635 /* cap releases */ 2636 releases = 0; 2637 if (req->r_inode_drop) 2638 releases += ceph_encode_inode_release(&p, 2639 req->r_inode ? 
req->r_inode : d_inode(req->r_dentry), 2640 mds, req->r_inode_drop, req->r_inode_unless, 2641 req->r_op == CEPH_MDS_OP_READDIR); 2642 if (req->r_dentry_drop) 2643 releases += ceph_encode_dentry_release(&p, req->r_dentry, 2644 req->r_parent, mds, req->r_dentry_drop, 2645 req->r_dentry_unless); 2646 if (req->r_old_dentry_drop) 2647 releases += ceph_encode_dentry_release(&p, req->r_old_dentry, 2648 req->r_old_dentry_dir, mds, 2649 req->r_old_dentry_drop, 2650 req->r_old_dentry_unless); 2651 if (req->r_old_inode_drop) 2652 releases += ceph_encode_inode_release(&p, 2653 d_inode(req->r_old_dentry), 2654 mds, req->r_old_inode_drop, req->r_old_inode_unless, 0); 2655 2656 if (drop_cap_releases) { 2657 releases = 0; 2658 p = msg->front.iov_base + req->r_request_release_offset; 2659 } 2660 2661 head->num_releases = cpu_to_le16(releases); 2662 2663 encode_timestamp_and_gids(&p, req); 2664 2665 if (WARN_ON_ONCE(p > end)) { 2666 ceph_msg_put(msg); 2667 msg = ERR_PTR(-ERANGE); 2668 goto out_free2; 2669 } 2670 2671 msg->front.iov_len = p - msg->front.iov_base; 2672 msg->hdr.front_len = cpu_to_le32(msg->front.iov_len); 2673 2674 if (req->r_pagelist) { 2675 struct ceph_pagelist *pagelist = req->r_pagelist; 2676 ceph_msg_data_add_pagelist(msg, pagelist); 2677 msg->hdr.data_len = cpu_to_le32(pagelist->length); 2678 } else { 2679 msg->hdr.data_len = 0; 2680 } 2681 2682 msg->hdr.data_off = cpu_to_le16(0); 2683 2684 out_free2: 2685 if (freepath2) 2686 ceph_mdsc_free_path((char *)path2, pathlen2); 2687 out_free1: 2688 if (freepath1) 2689 ceph_mdsc_free_path((char *)path1, pathlen1); 2690 out: 2691 return msg; 2692 } 2693 2694 /* 2695 * called under mdsc->mutex if error, under no mutex if 2696 * success. 2697 */ 2698 static void complete_request(struct ceph_mds_client *mdsc, 2699 struct ceph_mds_request *req) 2700 { 2701 req->r_end_latency = ktime_get(); 2702 2703 if (req->r_callback) 2704 req->r_callback(mdsc, req); 2705 complete_all(&req->r_completion); 2706 } 2707 2708 static struct ceph_mds_request_head_old * 2709 find_old_request_head(void *p, u64 features) 2710 { 2711 bool legacy = !(features & CEPH_FEATURE_FS_BTIME); 2712 struct ceph_mds_request_head *new_head; 2713 2714 if (legacy) 2715 return (struct ceph_mds_request_head_old *)p; 2716 new_head = (struct ceph_mds_request_head *)p; 2717 return (struct ceph_mds_request_head_old *)&new_head->oldest_client_tid; 2718 } 2719 2720 /* 2721 * called under mdsc->mutex 2722 */ 2723 static int __prepare_send_request(struct ceph_mds_session *session, 2724 struct ceph_mds_request *req, 2725 bool drop_cap_releases) 2726 { 2727 int mds = session->s_mds; 2728 struct ceph_mds_client *mdsc = session->s_mdsc; 2729 struct ceph_mds_request_head_old *rhead; 2730 struct ceph_msg *msg; 2731 int flags = 0, max_retry; 2732 2733 /* 2734 * The type of 'r_attempts' in kernel 'ceph_mds_request' 2735 * is 'int', while in 'ceph_mds_request_head' the type of 2736 * 'num_retry' is '__u8'. So in case the request retries 2737 * exceeding 256 times, the MDS will receive a incorrect 2738 * retry seq. 2739 * 2740 * In this case it's ususally a bug in MDS and continue 2741 * retrying the request makes no sense. 2742 * 2743 * In future this could be fixed in ceph code, so avoid 2744 * using the hardcode here. 
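 *
 * num_retry is a single byte on the wire, so the limit computed below
 * works out to 1 << (1 * BITS_PER_BYTE) == 256 attempts.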
2745 */ 2746 max_retry = sizeof_field(struct ceph_mds_request_head, num_retry); 2747 max_retry = 1 << (max_retry * BITS_PER_BYTE); 2748 if (req->r_attempts >= max_retry) { 2749 pr_warn_ratelimited("%s request tid %llu seq overflow\n", 2750 __func__, req->r_tid); 2751 return -EMULTIHOP; 2752 } 2753 2754 req->r_attempts++; 2755 if (req->r_inode) { 2756 struct ceph_cap *cap = 2757 ceph_get_cap_for_mds(ceph_inode(req->r_inode), mds); 2758 2759 if (cap) 2760 req->r_sent_on_mseq = cap->mseq; 2761 else 2762 req->r_sent_on_mseq = -1; 2763 } 2764 dout("%s %p tid %lld %s (attempt %d)\n", __func__, req, 2765 req->r_tid, ceph_mds_op_name(req->r_op), req->r_attempts); 2766 2767 if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) { 2768 void *p; 2769 2770 /* 2771 * Replay. Do not regenerate message (and rebuild 2772 * paths, etc.); just use the original message. 2773 * Rebuilding paths will break for renames because 2774 * d_move mangles the src name. 2775 */ 2776 msg = req->r_request; 2777 rhead = find_old_request_head(msg->front.iov_base, 2778 session->s_con.peer_features); 2779 2780 flags = le32_to_cpu(rhead->flags); 2781 flags |= CEPH_MDS_FLAG_REPLAY; 2782 rhead->flags = cpu_to_le32(flags); 2783 2784 if (req->r_target_inode) 2785 rhead->ino = cpu_to_le64(ceph_ino(req->r_target_inode)); 2786 2787 rhead->num_retry = req->r_attempts - 1; 2788 2789 /* remove cap/dentry releases from message */ 2790 rhead->num_releases = 0; 2791 2792 p = msg->front.iov_base + req->r_request_release_offset; 2793 encode_timestamp_and_gids(&p, req); 2794 2795 msg->front.iov_len = p - msg->front.iov_base; 2796 msg->hdr.front_len = cpu_to_le32(msg->front.iov_len); 2797 return 0; 2798 } 2799 2800 if (req->r_request) { 2801 ceph_msg_put(req->r_request); 2802 req->r_request = NULL; 2803 } 2804 msg = create_request_message(session, req, drop_cap_releases); 2805 if (IS_ERR(msg)) { 2806 req->r_err = PTR_ERR(msg); 2807 return PTR_ERR(msg); 2808 } 2809 req->r_request = msg; 2810 2811 rhead = find_old_request_head(msg->front.iov_base, 2812 session->s_con.peer_features); 2813 rhead->oldest_client_tid = cpu_to_le64(__get_oldest_tid(mdsc)); 2814 if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) 2815 flags |= CEPH_MDS_FLAG_REPLAY; 2816 if (test_bit(CEPH_MDS_R_ASYNC, &req->r_req_flags)) 2817 flags |= CEPH_MDS_FLAG_ASYNC; 2818 if (req->r_parent) 2819 flags |= CEPH_MDS_FLAG_WANT_DENTRY; 2820 rhead->flags = cpu_to_le32(flags); 2821 rhead->num_fwd = req->r_num_fwd; 2822 rhead->num_retry = req->r_attempts - 1; 2823 2824 dout(" r_parent = %p\n", req->r_parent); 2825 return 0; 2826 } 2827 2828 /* 2829 * called under mdsc->mutex 2830 */ 2831 static int __send_request(struct ceph_mds_session *session, 2832 struct ceph_mds_request *req, 2833 bool drop_cap_releases) 2834 { 2835 int err; 2836 2837 err = __prepare_send_request(session, req, drop_cap_releases); 2838 if (!err) { 2839 ceph_msg_get(req->r_request); 2840 ceph_con_send(&session->s_con, req->r_request); 2841 } 2842 2843 return err; 2844 } 2845 2846 /* 2847 * send request, or put it on the appropriate wait list. 
2848 */ 2849 static void __do_request(struct ceph_mds_client *mdsc, 2850 struct ceph_mds_request *req) 2851 { 2852 struct ceph_mds_session *session = NULL; 2853 int mds = -1; 2854 int err = 0; 2855 bool random; 2856 2857 if (req->r_err || test_bit(CEPH_MDS_R_GOT_RESULT, &req->r_req_flags)) { 2858 if (test_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags)) 2859 __unregister_request(mdsc, req); 2860 return; 2861 } 2862 2863 if (req->r_timeout && 2864 time_after_eq(jiffies, req->r_started + req->r_timeout)) { 2865 dout("do_request timed out\n"); 2866 err = -ETIMEDOUT; 2867 goto finish; 2868 } 2869 if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_SHUTDOWN) { 2870 dout("do_request forced umount\n"); 2871 err = -EIO; 2872 goto finish; 2873 } 2874 if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_MOUNTING) { 2875 if (mdsc->mdsmap_err) { 2876 err = mdsc->mdsmap_err; 2877 dout("do_request mdsmap err %d\n", err); 2878 goto finish; 2879 } 2880 if (mdsc->mdsmap->m_epoch == 0) { 2881 dout("do_request no mdsmap, waiting for map\n"); 2882 list_add(&req->r_wait, &mdsc->waiting_for_map); 2883 return; 2884 } 2885 if (!(mdsc->fsc->mount_options->flags & 2886 CEPH_MOUNT_OPT_MOUNTWAIT) && 2887 !ceph_mdsmap_is_cluster_available(mdsc->mdsmap)) { 2888 err = -EHOSTUNREACH; 2889 goto finish; 2890 } 2891 } 2892 2893 put_request_session(req); 2894 2895 mds = __choose_mds(mdsc, req, &random); 2896 if (mds < 0 || 2897 ceph_mdsmap_get_state(mdsc->mdsmap, mds) < CEPH_MDS_STATE_ACTIVE) { 2898 if (test_bit(CEPH_MDS_R_ASYNC, &req->r_req_flags)) { 2899 err = -EJUKEBOX; 2900 goto finish; 2901 } 2902 dout("do_request no mds or not active, waiting for map\n"); 2903 list_add(&req->r_wait, &mdsc->waiting_for_map); 2904 return; 2905 } 2906 2907 /* get, open session */ 2908 session = __ceph_lookup_mds_session(mdsc, mds); 2909 if (!session) { 2910 session = register_session(mdsc, mds); 2911 if (IS_ERR(session)) { 2912 err = PTR_ERR(session); 2913 goto finish; 2914 } 2915 } 2916 req->r_session = ceph_get_mds_session(session); 2917 2918 dout("do_request mds%d session %p state %s\n", mds, session, 2919 ceph_session_state_name(session->s_state)); 2920 2921 /* 2922 * The old ceph will crash the MDSs when see unknown OPs 2923 */ 2924 if (req->r_feature_needed > 0 && 2925 !test_bit(req->r_feature_needed, &session->s_features)) { 2926 err = -EOPNOTSUPP; 2927 goto out_session; 2928 } 2929 2930 if (session->s_state != CEPH_MDS_SESSION_OPEN && 2931 session->s_state != CEPH_MDS_SESSION_HUNG) { 2932 /* 2933 * We cannot queue async requests since the caps and delegated 2934 * inodes are bound to the session. Just return -EJUKEBOX and 2935 * let the caller retry a sync request in that case. 2936 */ 2937 if (test_bit(CEPH_MDS_R_ASYNC, &req->r_req_flags)) { 2938 err = -EJUKEBOX; 2939 goto out_session; 2940 } 2941 2942 /* 2943 * If the session has been REJECTED, then return a hard error, 2944 * unless it's a CLEANRECOVER mount, in which case we'll queue 2945 * it to the mdsc queue. 
2946 */ 2947 if (session->s_state == CEPH_MDS_SESSION_REJECTED) { 2948 if (ceph_test_mount_opt(mdsc->fsc, CLEANRECOVER)) 2949 list_add(&req->r_wait, &mdsc->waiting_for_map); 2950 else 2951 err = -EACCES; 2952 goto out_session; 2953 } 2954 2955 if (session->s_state == CEPH_MDS_SESSION_NEW || 2956 session->s_state == CEPH_MDS_SESSION_CLOSING) { 2957 err = __open_session(mdsc, session); 2958 if (err) 2959 goto out_session; 2960 /* retry the same mds later */ 2961 if (random) 2962 req->r_resend_mds = mds; 2963 } 2964 list_add(&req->r_wait, &session->s_waiting); 2965 goto out_session; 2966 } 2967 2968 /* send request */ 2969 req->r_resend_mds = -1; /* forget any previous mds hint */ 2970 2971 if (req->r_request_started == 0) /* note request start time */ 2972 req->r_request_started = jiffies; 2973 2974 /* 2975 * For async create we will choose the auth MDS of frag in parent 2976 * directory to send the request and ususally this works fine, but 2977 * if the migrated the dirtory to another MDS before it could handle 2978 * it the request will be forwarded. 2979 * 2980 * And then the auth cap will be changed. 2981 */ 2982 if (test_bit(CEPH_MDS_R_ASYNC, &req->r_req_flags) && req->r_num_fwd) { 2983 struct ceph_dentry_info *di = ceph_dentry(req->r_dentry); 2984 struct ceph_inode_info *ci; 2985 struct ceph_cap *cap; 2986 2987 /* 2988 * The request maybe handled very fast and the new inode 2989 * hasn't been linked to the dentry yet. We need to wait 2990 * for the ceph_finish_async_create(), which shouldn't be 2991 * stuck too long or fail in thoery, to finish when forwarding 2992 * the request. 2993 */ 2994 if (!d_inode(req->r_dentry)) { 2995 err = wait_on_bit(&di->flags, CEPH_DENTRY_ASYNC_CREATE_BIT, 2996 TASK_KILLABLE); 2997 if (err) { 2998 mutex_lock(&req->r_fill_mutex); 2999 set_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags); 3000 mutex_unlock(&req->r_fill_mutex); 3001 goto out_session; 3002 } 3003 } 3004 3005 ci = ceph_inode(d_inode(req->r_dentry)); 3006 3007 spin_lock(&ci->i_ceph_lock); 3008 cap = ci->i_auth_cap; 3009 if (ci->i_ceph_flags & CEPH_I_ASYNC_CREATE && mds != cap->mds) { 3010 dout("do_request session changed for auth cap %d -> %d\n", 3011 cap->session->s_mds, session->s_mds); 3012 3013 /* Remove the auth cap from old session */ 3014 spin_lock(&cap->session->s_cap_lock); 3015 cap->session->s_nr_caps--; 3016 list_del_init(&cap->session_caps); 3017 spin_unlock(&cap->session->s_cap_lock); 3018 3019 /* Add the auth cap to the new session */ 3020 cap->mds = mds; 3021 cap->session = session; 3022 spin_lock(&session->s_cap_lock); 3023 session->s_nr_caps++; 3024 list_add_tail(&cap->session_caps, &session->s_caps); 3025 spin_unlock(&session->s_cap_lock); 3026 3027 change_auth_cap_ses(ci, session); 3028 } 3029 spin_unlock(&ci->i_ceph_lock); 3030 } 3031 3032 err = __send_request(session, req, false); 3033 3034 out_session: 3035 ceph_put_mds_session(session); 3036 finish: 3037 if (err) { 3038 dout("__do_request early error %d\n", err); 3039 req->r_err = err; 3040 complete_request(mdsc, req); 3041 __unregister_request(mdsc, req); 3042 } 3043 return; 3044 } 3045 3046 /* 3047 * called under mdsc->mutex 3048 */ 3049 static void __wake_requests(struct ceph_mds_client *mdsc, 3050 struct list_head *head) 3051 { 3052 struct ceph_mds_request *req; 3053 LIST_HEAD(tmp_list); 3054 3055 list_splice_init(head, &tmp_list); 3056 3057 while (!list_empty(&tmp_list)) { 3058 req = list_entry(tmp_list.next, 3059 struct ceph_mds_request, r_wait); 3060 list_del_init(&req->r_wait); 3061 dout(" wake request %p tid %llu\n", 
req, req->r_tid); 3062 __do_request(mdsc, req); 3063 } 3064 } 3065 3066 /* 3067 * Wake up threads with requests pending for @mds, so that they can 3068 * resubmit their requests to a possibly different mds. 3069 */ 3070 static void kick_requests(struct ceph_mds_client *mdsc, int mds) 3071 { 3072 struct ceph_mds_request *req; 3073 struct rb_node *p = rb_first(&mdsc->request_tree); 3074 3075 dout("kick_requests mds%d\n", mds); 3076 while (p) { 3077 req = rb_entry(p, struct ceph_mds_request, r_node); 3078 p = rb_next(p); 3079 if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) 3080 continue; 3081 if (req->r_attempts > 0) 3082 continue; /* only new requests */ 3083 if (req->r_session && 3084 req->r_session->s_mds == mds) { 3085 dout(" kicking tid %llu\n", req->r_tid); 3086 list_del_init(&req->r_wait); 3087 __do_request(mdsc, req); 3088 } 3089 } 3090 } 3091 3092 int ceph_mdsc_submit_request(struct ceph_mds_client *mdsc, struct inode *dir, 3093 struct ceph_mds_request *req) 3094 { 3095 int err = 0; 3096 3097 /* take CAP_PIN refs for r_inode, r_parent, r_old_dentry */ 3098 if (req->r_inode) 3099 ceph_get_cap_refs(ceph_inode(req->r_inode), CEPH_CAP_PIN); 3100 if (req->r_parent) { 3101 struct ceph_inode_info *ci = ceph_inode(req->r_parent); 3102 int fmode = (req->r_op & CEPH_MDS_OP_WRITE) ? 3103 CEPH_FILE_MODE_WR : CEPH_FILE_MODE_RD; 3104 spin_lock(&ci->i_ceph_lock); 3105 ceph_take_cap_refs(ci, CEPH_CAP_PIN, false); 3106 __ceph_touch_fmode(ci, mdsc, fmode); 3107 spin_unlock(&ci->i_ceph_lock); 3108 } 3109 if (req->r_old_dentry_dir) 3110 ceph_get_cap_refs(ceph_inode(req->r_old_dentry_dir), 3111 CEPH_CAP_PIN); 3112 3113 if (req->r_inode) { 3114 err = ceph_wait_on_async_create(req->r_inode); 3115 if (err) { 3116 dout("%s: wait for async create returned: %d\n", 3117 __func__, err); 3118 return err; 3119 } 3120 } 3121 3122 if (!err && req->r_old_inode) { 3123 err = ceph_wait_on_async_create(req->r_old_inode); 3124 if (err) { 3125 dout("%s: wait for async create returned: %d\n", 3126 __func__, err); 3127 return err; 3128 } 3129 } 3130 3131 dout("submit_request on %p for inode %p\n", req, dir); 3132 mutex_lock(&mdsc->mutex); 3133 __register_request(mdsc, req, dir); 3134 __do_request(mdsc, req); 3135 err = req->r_err; 3136 mutex_unlock(&mdsc->mutex); 3137 return err; 3138 } 3139 3140 int ceph_mdsc_wait_request(struct ceph_mds_client *mdsc, 3141 struct ceph_mds_request *req, 3142 ceph_mds_request_wait_callback_t wait_func) 3143 { 3144 int err; 3145 3146 /* wait */ 3147 dout("do_request waiting\n"); 3148 if (wait_func) { 3149 err = wait_func(mdsc, req); 3150 } else { 3151 long timeleft = wait_for_completion_killable_timeout( 3152 &req->r_completion, 3153 ceph_timeout_jiffies(req->r_timeout)); 3154 if (timeleft > 0) 3155 err = 0; 3156 else if (!timeleft) 3157 err = -ETIMEDOUT; /* timed out */ 3158 else 3159 err = timeleft; /* killed */ 3160 } 3161 dout("do_request waited, got %d\n", err); 3162 mutex_lock(&mdsc->mutex); 3163 3164 /* only abort if we didn't race with a real reply */ 3165 if (test_bit(CEPH_MDS_R_GOT_RESULT, &req->r_req_flags)) { 3166 err = le32_to_cpu(req->r_reply_info.head->result); 3167 } else if (err < 0) { 3168 dout("aborted request %lld with %d\n", req->r_tid, err); 3169 3170 /* 3171 * ensure we aren't running concurrently with 3172 * ceph_fill_trace or ceph_readdir_prepopulate, which 3173 * rely on locks (dir mutex) held by our caller. 
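 *
 * Both this abort path and handle_reply()'s trace fill take
 * r_fill_mutex, so ceph_fill_trace() either completes before the
 * request is marked aborted or starts only after the aborted flag is
 * visible.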
3174 */ 3175 mutex_lock(&req->r_fill_mutex); 3176 req->r_err = err; 3177 set_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags); 3178 mutex_unlock(&req->r_fill_mutex); 3179 3180 if (req->r_parent && 3181 (req->r_op & CEPH_MDS_OP_WRITE)) 3182 ceph_invalidate_dir_request(req); 3183 } else { 3184 err = req->r_err; 3185 } 3186 3187 mutex_unlock(&mdsc->mutex); 3188 return err; 3189 } 3190 3191 /* 3192 * Synchrously perform an mds request. Take care of all of the 3193 * session setup, forwarding, retry details. 3194 */ 3195 int ceph_mdsc_do_request(struct ceph_mds_client *mdsc, 3196 struct inode *dir, 3197 struct ceph_mds_request *req) 3198 { 3199 int err; 3200 3201 dout("do_request on %p\n", req); 3202 3203 /* issue */ 3204 err = ceph_mdsc_submit_request(mdsc, dir, req); 3205 if (!err) 3206 err = ceph_mdsc_wait_request(mdsc, req, NULL); 3207 dout("do_request %p done, result %d\n", req, err); 3208 return err; 3209 } 3210 3211 /* 3212 * Invalidate dir's completeness, dentry lease state on an aborted MDS 3213 * namespace request. 3214 */ 3215 void ceph_invalidate_dir_request(struct ceph_mds_request *req) 3216 { 3217 struct inode *dir = req->r_parent; 3218 struct inode *old_dir = req->r_old_dentry_dir; 3219 3220 dout("invalidate_dir_request %p %p (complete, lease(s))\n", dir, old_dir); 3221 3222 ceph_dir_clear_complete(dir); 3223 if (old_dir) 3224 ceph_dir_clear_complete(old_dir); 3225 if (req->r_dentry) 3226 ceph_invalidate_dentry_lease(req->r_dentry); 3227 if (req->r_old_dentry) 3228 ceph_invalidate_dentry_lease(req->r_old_dentry); 3229 } 3230 3231 /* 3232 * Handle mds reply. 3233 * 3234 * We take the session mutex and parse and process the reply immediately. 3235 * This preserves the logical ordering of replies, capabilities, etc., sent 3236 * by the MDS as they are applied to our local cache. 3237 */ 3238 static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg) 3239 { 3240 struct ceph_mds_client *mdsc = session->s_mdsc; 3241 struct ceph_mds_request *req; 3242 struct ceph_mds_reply_head *head = msg->front.iov_base; 3243 struct ceph_mds_reply_info_parsed *rinfo; /* parsed reply info */ 3244 struct ceph_snap_realm *realm; 3245 u64 tid; 3246 int err, result; 3247 int mds = session->s_mds; 3248 3249 if (msg->front.iov_len < sizeof(*head)) { 3250 pr_err("mdsc_handle_reply got corrupt (short) reply\n"); 3251 ceph_msg_dump(msg); 3252 return; 3253 } 3254 3255 /* get request, session */ 3256 tid = le64_to_cpu(msg->hdr.tid); 3257 mutex_lock(&mdsc->mutex); 3258 req = lookup_get_request(mdsc, tid); 3259 if (!req) { 3260 dout("handle_reply on unknown tid %llu\n", tid); 3261 mutex_unlock(&mdsc->mutex); 3262 return; 3263 } 3264 dout("handle_reply %p\n", req); 3265 3266 /* correct session? */ 3267 if (req->r_session != session) { 3268 pr_err("mdsc_handle_reply got %llu on session mds%d" 3269 " not mds%d\n", tid, session->s_mds, 3270 req->r_session ? req->r_session->s_mds : -1); 3271 mutex_unlock(&mdsc->mutex); 3272 goto out; 3273 } 3274 3275 /* dup? */ 3276 if ((test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags) && !head->safe) || 3277 (test_bit(CEPH_MDS_R_GOT_SAFE, &req->r_req_flags) && head->safe)) { 3278 pr_warn("got a dup %s reply on %llu from mds%d\n", 3279 head->safe ? 
"safe" : "unsafe", tid, mds); 3280 mutex_unlock(&mdsc->mutex); 3281 goto out; 3282 } 3283 if (test_bit(CEPH_MDS_R_GOT_SAFE, &req->r_req_flags)) { 3284 pr_warn("got unsafe after safe on %llu from mds%d\n", 3285 tid, mds); 3286 mutex_unlock(&mdsc->mutex); 3287 goto out; 3288 } 3289 3290 result = le32_to_cpu(head->result); 3291 3292 if (head->safe) { 3293 set_bit(CEPH_MDS_R_GOT_SAFE, &req->r_req_flags); 3294 __unregister_request(mdsc, req); 3295 3296 /* last request during umount? */ 3297 if (mdsc->stopping && !__get_oldest_req(mdsc)) 3298 complete_all(&mdsc->safe_umount_waiters); 3299 3300 if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) { 3301 /* 3302 * We already handled the unsafe response, now do the 3303 * cleanup. No need to examine the response; the MDS 3304 * doesn't include any result info in the safe 3305 * response. And even if it did, there is nothing 3306 * useful we could do with a revised return value. 3307 */ 3308 dout("got safe reply %llu, mds%d\n", tid, mds); 3309 3310 mutex_unlock(&mdsc->mutex); 3311 goto out; 3312 } 3313 } else { 3314 set_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags); 3315 list_add_tail(&req->r_unsafe_item, &req->r_session->s_unsafe); 3316 } 3317 3318 dout("handle_reply tid %lld result %d\n", tid, result); 3319 rinfo = &req->r_reply_info; 3320 if (test_bit(CEPHFS_FEATURE_REPLY_ENCODING, &session->s_features)) 3321 err = parse_reply_info(session, msg, rinfo, (u64)-1); 3322 else 3323 err = parse_reply_info(session, msg, rinfo, session->s_con.peer_features); 3324 mutex_unlock(&mdsc->mutex); 3325 3326 /* Must find target inode outside of mutexes to avoid deadlocks */ 3327 if ((err >= 0) && rinfo->head->is_target) { 3328 struct inode *in; 3329 struct ceph_vino tvino = { 3330 .ino = le64_to_cpu(rinfo->targeti.in->ino), 3331 .snap = le64_to_cpu(rinfo->targeti.in->snapid) 3332 }; 3333 3334 in = ceph_get_inode(mdsc->fsc->sb, tvino); 3335 if (IS_ERR(in)) { 3336 err = PTR_ERR(in); 3337 mutex_lock(&session->s_mutex); 3338 goto out_err; 3339 } 3340 req->r_target_inode = in; 3341 } 3342 3343 mutex_lock(&session->s_mutex); 3344 if (err < 0) { 3345 pr_err("mdsc_handle_reply got corrupt reply mds%d(tid:%lld)\n", mds, tid); 3346 ceph_msg_dump(msg); 3347 goto out_err; 3348 } 3349 3350 /* snap trace */ 3351 realm = NULL; 3352 if (rinfo->snapblob_len) { 3353 down_write(&mdsc->snap_rwsem); 3354 ceph_update_snap_trace(mdsc, rinfo->snapblob, 3355 rinfo->snapblob + rinfo->snapblob_len, 3356 le32_to_cpu(head->op) == CEPH_MDS_OP_RMSNAP, 3357 &realm); 3358 downgrade_write(&mdsc->snap_rwsem); 3359 } else { 3360 down_read(&mdsc->snap_rwsem); 3361 } 3362 3363 /* insert trace into our cache */ 3364 mutex_lock(&req->r_fill_mutex); 3365 current->journal_info = req; 3366 err = ceph_fill_trace(mdsc->fsc->sb, req); 3367 if (err == 0) { 3368 if (result == 0 && (req->r_op == CEPH_MDS_OP_READDIR || 3369 req->r_op == CEPH_MDS_OP_LSSNAP)) 3370 ceph_readdir_prepopulate(req, req->r_session); 3371 } 3372 current->journal_info = NULL; 3373 mutex_unlock(&req->r_fill_mutex); 3374 3375 up_read(&mdsc->snap_rwsem); 3376 if (realm) 3377 ceph_put_snap_realm(mdsc, realm); 3378 3379 if (err == 0) { 3380 if (req->r_target_inode && 3381 test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) { 3382 struct ceph_inode_info *ci = 3383 ceph_inode(req->r_target_inode); 3384 spin_lock(&ci->i_unsafe_lock); 3385 list_add_tail(&req->r_unsafe_target_item, 3386 &ci->i_unsafe_iops); 3387 spin_unlock(&ci->i_unsafe_lock); 3388 } 3389 3390 ceph_unreserve_caps(mdsc, &req->r_caps_reservation); 3391 } 3392 out_err: 3393 
mutex_lock(&mdsc->mutex); 3394 if (!test_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags)) { 3395 if (err) { 3396 req->r_err = err; 3397 } else { 3398 req->r_reply = ceph_msg_get(msg); 3399 set_bit(CEPH_MDS_R_GOT_RESULT, &req->r_req_flags); 3400 } 3401 } else { 3402 dout("reply arrived after request %lld was aborted\n", tid); 3403 } 3404 mutex_unlock(&mdsc->mutex); 3405 3406 mutex_unlock(&session->s_mutex); 3407 3408 /* kick calling process */ 3409 complete_request(mdsc, req); 3410 3411 ceph_update_metadata_metrics(&mdsc->metric, req->r_start_latency, 3412 req->r_end_latency, err); 3413 out: 3414 ceph_mdsc_put_request(req); 3415 return; 3416 } 3417 3418 3419 3420 /* 3421 * handle mds notification that our request has been forwarded. 3422 */ 3423 static void handle_forward(struct ceph_mds_client *mdsc, 3424 struct ceph_mds_session *session, 3425 struct ceph_msg *msg) 3426 { 3427 struct ceph_mds_request *req; 3428 u64 tid = le64_to_cpu(msg->hdr.tid); 3429 u32 next_mds; 3430 u32 fwd_seq; 3431 int err = -EINVAL; 3432 void *p = msg->front.iov_base; 3433 void *end = p + msg->front.iov_len; 3434 bool aborted = false; 3435 3436 ceph_decode_need(&p, end, 2*sizeof(u32), bad); 3437 next_mds = ceph_decode_32(&p); 3438 fwd_seq = ceph_decode_32(&p); 3439 3440 mutex_lock(&mdsc->mutex); 3441 req = lookup_get_request(mdsc, tid); 3442 if (!req) { 3443 mutex_unlock(&mdsc->mutex); 3444 dout("forward tid %llu to mds%d - req dne\n", tid, next_mds); 3445 return; /* dup reply? */ 3446 } 3447 3448 if (test_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags)) { 3449 dout("forward tid %llu aborted, unregistering\n", tid); 3450 __unregister_request(mdsc, req); 3451 } else if (fwd_seq <= req->r_num_fwd) { 3452 /* 3453 * The type of 'num_fwd' in ceph 'MClientRequestForward' 3454 * is 'int32_t', while in 'ceph_mds_request_head' the 3455 * type is '__u8'. So in case the request bounces between 3456 * MDSes exceeding 256 times, the client will get stuck. 3457 * 3458 * In this case it's ususally a bug in MDS and continue 3459 * bouncing the request makes no sense. 3460 * 3461 * In future this could be fixed in ceph code, so avoid 3462 * using the hardcode here. 3463 */ 3464 int max = sizeof_field(struct ceph_mds_request_head, num_fwd); 3465 max = 1 << (max * BITS_PER_BYTE); 3466 if (req->r_num_fwd >= max) { 3467 mutex_lock(&req->r_fill_mutex); 3468 req->r_err = -EMULTIHOP; 3469 set_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags); 3470 mutex_unlock(&req->r_fill_mutex); 3471 aborted = true; 3472 pr_warn_ratelimited("forward tid %llu seq overflow\n", 3473 tid); 3474 } else { 3475 dout("forward tid %llu to mds%d - old seq %d <= %d\n", 3476 tid, next_mds, req->r_num_fwd, fwd_seq); 3477 } 3478 } else { 3479 /* resend. 
forward race not possible; mds would drop */ 3480 dout("forward tid %llu to mds%d (we resend)\n", tid, next_mds); 3481 BUG_ON(req->r_err); 3482 BUG_ON(test_bit(CEPH_MDS_R_GOT_RESULT, &req->r_req_flags)); 3483 req->r_attempts = 0; 3484 req->r_num_fwd = fwd_seq; 3485 req->r_resend_mds = next_mds; 3486 put_request_session(req); 3487 __do_request(mdsc, req); 3488 } 3489 mutex_unlock(&mdsc->mutex); 3490 3491 /* kick calling process */ 3492 if (aborted) 3493 complete_request(mdsc, req); 3494 ceph_mdsc_put_request(req); 3495 return; 3496 3497 bad: 3498 pr_err("mdsc_handle_forward decode error err=%d\n", err); 3499 } 3500 3501 static int __decode_session_metadata(void **p, void *end, 3502 bool *blocklisted) 3503 { 3504 /* map<string,string> */ 3505 u32 n; 3506 bool err_str; 3507 ceph_decode_32_safe(p, end, n, bad); 3508 while (n-- > 0) { 3509 u32 len; 3510 ceph_decode_32_safe(p, end, len, bad); 3511 ceph_decode_need(p, end, len, bad); 3512 err_str = !strncmp(*p, "error_string", len); 3513 *p += len; 3514 ceph_decode_32_safe(p, end, len, bad); 3515 ceph_decode_need(p, end, len, bad); 3516 /* 3517 * Match "blocklisted (blacklisted)" from newer MDSes, 3518 * or "blacklisted" from older MDSes. 3519 */ 3520 if (err_str && strnstr(*p, "blacklisted", len)) 3521 *blocklisted = true; 3522 *p += len; 3523 } 3524 return 0; 3525 bad: 3526 return -1; 3527 } 3528 3529 /* 3530 * handle a mds session control message 3531 */ 3532 static void handle_session(struct ceph_mds_session *session, 3533 struct ceph_msg *msg) 3534 { 3535 struct ceph_mds_client *mdsc = session->s_mdsc; 3536 int mds = session->s_mds; 3537 int msg_version = le16_to_cpu(msg->hdr.version); 3538 void *p = msg->front.iov_base; 3539 void *end = p + msg->front.iov_len; 3540 struct ceph_mds_session_head *h; 3541 u32 op; 3542 u64 seq, features = 0; 3543 int wake = 0; 3544 bool blocklisted = false; 3545 3546 /* decode */ 3547 ceph_decode_need(&p, end, sizeof(*h), bad); 3548 h = p; 3549 p += sizeof(*h); 3550 3551 op = le32_to_cpu(h->op); 3552 seq = le64_to_cpu(h->seq); 3553 3554 if (msg_version >= 3) { 3555 u32 len; 3556 /* version >= 2 and < 5, decode metadata, skip otherwise 3557 * as it's handled via flags. 
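 *
 * Roughly, the remainder of the payload decodes as:
 *   metadata: map<string,string>           (v2..v4, skipped as a map on v5+)
 *   feature bits: length + bitmask         (v3+)
 *   metric_spec: struct_v/cv + len + body  (v4+)
 *   flags: u32                             (v5+)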
3558 */ 3559 if (msg_version >= 5) 3560 ceph_decode_skip_map(&p, end, string, string, bad); 3561 else if (__decode_session_metadata(&p, end, &blocklisted) < 0) 3562 goto bad; 3563 3564 /* version >= 3, feature bits */ 3565 ceph_decode_32_safe(&p, end, len, bad); 3566 if (len) { 3567 ceph_decode_64_safe(&p, end, features, bad); 3568 p += len - sizeof(features); 3569 } 3570 } 3571 3572 if (msg_version >= 5) { 3573 u32 flags, len; 3574 3575 /* version >= 4 */ 3576 ceph_decode_skip_16(&p, end, bad); /* struct_v, struct_cv */ 3577 ceph_decode_32_safe(&p, end, len, bad); /* len */ 3578 ceph_decode_skip_n(&p, end, len, bad); /* metric_spec */ 3579 3580 /* version >= 5, flags */ 3581 ceph_decode_32_safe(&p, end, flags, bad); 3582 if (flags & CEPH_SESSION_BLOCKLISTED) { 3583 pr_warn("mds%d session blocklisted\n", session->s_mds); 3584 blocklisted = true; 3585 } 3586 } 3587 3588 mutex_lock(&mdsc->mutex); 3589 if (op == CEPH_SESSION_CLOSE) { 3590 ceph_get_mds_session(session); 3591 __unregister_session(mdsc, session); 3592 } 3593 /* FIXME: this ttl calculation is generous */ 3594 session->s_ttl = jiffies + HZ*mdsc->mdsmap->m_session_autoclose; 3595 mutex_unlock(&mdsc->mutex); 3596 3597 mutex_lock(&session->s_mutex); 3598 3599 dout("handle_session mds%d %s %p state %s seq %llu\n", 3600 mds, ceph_session_op_name(op), session, 3601 ceph_session_state_name(session->s_state), seq); 3602 3603 if (session->s_state == CEPH_MDS_SESSION_HUNG) { 3604 session->s_state = CEPH_MDS_SESSION_OPEN; 3605 pr_info("mds%d came back\n", session->s_mds); 3606 } 3607 3608 switch (op) { 3609 case CEPH_SESSION_OPEN: 3610 if (session->s_state == CEPH_MDS_SESSION_RECONNECTING) 3611 pr_info("mds%d reconnect success\n", session->s_mds); 3612 3613 if (session->s_state == CEPH_MDS_SESSION_OPEN) { 3614 pr_notice("mds%d is already opened\n", session->s_mds); 3615 } else { 3616 session->s_state = CEPH_MDS_SESSION_OPEN; 3617 session->s_features = features; 3618 renewed_caps(mdsc, session, 0); 3619 if (test_bit(CEPHFS_FEATURE_METRIC_COLLECT, 3620 &session->s_features)) 3621 metric_schedule_delayed(&mdsc->metric); 3622 } 3623 3624 /* 3625 * The connection maybe broken and the session in client 3626 * side has been reinitialized, need to update the seq 3627 * anyway. 
3628 */ 3629 if (!session->s_seq && seq) 3630 session->s_seq = seq; 3631 3632 wake = 1; 3633 if (mdsc->stopping) 3634 __close_session(mdsc, session); 3635 break; 3636 3637 case CEPH_SESSION_RENEWCAPS: 3638 if (session->s_renew_seq == seq) 3639 renewed_caps(mdsc, session, 1); 3640 break; 3641 3642 case CEPH_SESSION_CLOSE: 3643 if (session->s_state == CEPH_MDS_SESSION_RECONNECTING) 3644 pr_info("mds%d reconnect denied\n", session->s_mds); 3645 session->s_state = CEPH_MDS_SESSION_CLOSED; 3646 cleanup_session_requests(mdsc, session); 3647 remove_session_caps(session); 3648 wake = 2; /* for good measure */ 3649 wake_up_all(&mdsc->session_close_wq); 3650 break; 3651 3652 case CEPH_SESSION_STALE: 3653 pr_info("mds%d caps went stale, renewing\n", 3654 session->s_mds); 3655 atomic_inc(&session->s_cap_gen); 3656 session->s_cap_ttl = jiffies - 1; 3657 send_renew_caps(mdsc, session); 3658 break; 3659 3660 case CEPH_SESSION_RECALL_STATE: 3661 ceph_trim_caps(mdsc, session, le32_to_cpu(h->max_caps)); 3662 break; 3663 3664 case CEPH_SESSION_FLUSHMSG: 3665 send_flushmsg_ack(mdsc, session, seq); 3666 break; 3667 3668 case CEPH_SESSION_FORCE_RO: 3669 dout("force_session_readonly %p\n", session); 3670 spin_lock(&session->s_cap_lock); 3671 session->s_readonly = true; 3672 spin_unlock(&session->s_cap_lock); 3673 wake_up_session_caps(session, FORCE_RO); 3674 break; 3675 3676 case CEPH_SESSION_REJECT: 3677 WARN_ON(session->s_state != CEPH_MDS_SESSION_OPENING); 3678 pr_info("mds%d rejected session\n", session->s_mds); 3679 session->s_state = CEPH_MDS_SESSION_REJECTED; 3680 cleanup_session_requests(mdsc, session); 3681 remove_session_caps(session); 3682 if (blocklisted) 3683 mdsc->fsc->blocklisted = true; 3684 wake = 2; /* for good measure */ 3685 break; 3686 3687 default: 3688 pr_err("mdsc_handle_session bad op %d mds%d\n", op, mds); 3689 WARN_ON(1); 3690 } 3691 3692 mutex_unlock(&session->s_mutex); 3693 if (wake) { 3694 mutex_lock(&mdsc->mutex); 3695 __wake_requests(mdsc, &session->s_waiting); 3696 if (wake == 2) 3697 kick_requests(mdsc, mds); 3698 mutex_unlock(&mdsc->mutex); 3699 } 3700 if (op == CEPH_SESSION_CLOSE) 3701 ceph_put_mds_session(session); 3702 return; 3703 3704 bad: 3705 pr_err("mdsc_handle_session corrupt message mds%d len %d\n", mds, 3706 (int)msg->front.iov_len); 3707 ceph_msg_dump(msg); 3708 return; 3709 } 3710 3711 void ceph_mdsc_release_dir_caps(struct ceph_mds_request *req) 3712 { 3713 int dcaps; 3714 3715 dcaps = xchg(&req->r_dir_caps, 0); 3716 if (dcaps) { 3717 dout("releasing r_dir_caps=%s\n", ceph_cap_string(dcaps)); 3718 ceph_put_cap_refs(ceph_inode(req->r_parent), dcaps); 3719 } 3720 } 3721 3722 void ceph_mdsc_release_dir_caps_no_check(struct ceph_mds_request *req) 3723 { 3724 int dcaps; 3725 3726 dcaps = xchg(&req->r_dir_caps, 0); 3727 if (dcaps) { 3728 dout("releasing r_dir_caps=%s\n", ceph_cap_string(dcaps)); 3729 ceph_put_cap_refs_no_check_caps(ceph_inode(req->r_parent), 3730 dcaps); 3731 } 3732 } 3733 3734 /* 3735 * called under session->mutex. 3736 */ 3737 static void replay_unsafe_requests(struct ceph_mds_client *mdsc, 3738 struct ceph_mds_session *session) 3739 { 3740 struct ceph_mds_request *req, *nreq; 3741 struct rb_node *p; 3742 3743 dout("replay_unsafe_requests mds%d\n", session->s_mds); 3744 3745 mutex_lock(&mdsc->mutex); 3746 list_for_each_entry_safe(req, nreq, &session->s_unsafe, r_unsafe_item) 3747 __send_request(session, req, true); 3748 3749 /* 3750 * also re-send old requests when MDS enters reconnect stage. 
So that MDS 3751 * can process completed request in clientreplay stage. 3752 */ 3753 p = rb_first(&mdsc->request_tree); 3754 while (p) { 3755 req = rb_entry(p, struct ceph_mds_request, r_node); 3756 p = rb_next(p); 3757 if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) 3758 continue; 3759 if (req->r_attempts == 0) 3760 continue; /* only old requests */ 3761 if (!req->r_session) 3762 continue; 3763 if (req->r_session->s_mds != session->s_mds) 3764 continue; 3765 3766 ceph_mdsc_release_dir_caps_no_check(req); 3767 3768 __send_request(session, req, true); 3769 } 3770 mutex_unlock(&mdsc->mutex); 3771 } 3772 3773 static int send_reconnect_partial(struct ceph_reconnect_state *recon_state) 3774 { 3775 struct ceph_msg *reply; 3776 struct ceph_pagelist *_pagelist; 3777 struct page *page; 3778 __le32 *addr; 3779 int err = -ENOMEM; 3780 3781 if (!recon_state->allow_multi) 3782 return -ENOSPC; 3783 3784 /* can't handle message that contains both caps and realm */ 3785 BUG_ON(!recon_state->nr_caps == !recon_state->nr_realms); 3786 3787 /* pre-allocate new pagelist */ 3788 _pagelist = ceph_pagelist_alloc(GFP_NOFS); 3789 if (!_pagelist) 3790 return -ENOMEM; 3791 3792 reply = ceph_msg_new2(CEPH_MSG_CLIENT_RECONNECT, 0, 1, GFP_NOFS, false); 3793 if (!reply) 3794 goto fail_msg; 3795 3796 /* placeholder for nr_caps */ 3797 err = ceph_pagelist_encode_32(_pagelist, 0); 3798 if (err < 0) 3799 goto fail; 3800 3801 if (recon_state->nr_caps) { 3802 /* currently encoding caps */ 3803 err = ceph_pagelist_encode_32(recon_state->pagelist, 0); 3804 if (err) 3805 goto fail; 3806 } else { 3807 /* placeholder for nr_realms (currently encoding relams) */ 3808 err = ceph_pagelist_encode_32(_pagelist, 0); 3809 if (err < 0) 3810 goto fail; 3811 } 3812 3813 err = ceph_pagelist_encode_8(recon_state->pagelist, 1); 3814 if (err) 3815 goto fail; 3816 3817 page = list_first_entry(&recon_state->pagelist->head, struct page, lru); 3818 addr = kmap_atomic(page); 3819 if (recon_state->nr_caps) { 3820 /* currently encoding caps */ 3821 *addr = cpu_to_le32(recon_state->nr_caps); 3822 } else { 3823 /* currently encoding relams */ 3824 *(addr + 1) = cpu_to_le32(recon_state->nr_realms); 3825 } 3826 kunmap_atomic(addr); 3827 3828 reply->hdr.version = cpu_to_le16(5); 3829 reply->hdr.compat_version = cpu_to_le16(4); 3830 3831 reply->hdr.data_len = cpu_to_le32(recon_state->pagelist->length); 3832 ceph_msg_data_add_pagelist(reply, recon_state->pagelist); 3833 3834 ceph_con_send(&recon_state->session->s_con, reply); 3835 ceph_pagelist_release(recon_state->pagelist); 3836 3837 recon_state->pagelist = _pagelist; 3838 recon_state->nr_caps = 0; 3839 recon_state->nr_realms = 0; 3840 recon_state->msg_version = 5; 3841 return 0; 3842 fail: 3843 ceph_msg_put(reply); 3844 fail_msg: 3845 ceph_pagelist_release(_pagelist); 3846 return err; 3847 } 3848 3849 static struct dentry* d_find_primary(struct inode *inode) 3850 { 3851 struct dentry *alias, *dn = NULL; 3852 3853 if (hlist_empty(&inode->i_dentry)) 3854 return NULL; 3855 3856 spin_lock(&inode->i_lock); 3857 if (hlist_empty(&inode->i_dentry)) 3858 goto out_unlock; 3859 3860 if (S_ISDIR(inode->i_mode)) { 3861 alias = hlist_entry(inode->i_dentry.first, struct dentry, d_u.d_alias); 3862 if (!IS_ROOT(alias)) 3863 dn = dget(alias); 3864 goto out_unlock; 3865 } 3866 3867 hlist_for_each_entry(alias, &inode->i_dentry, d_u.d_alias) { 3868 spin_lock(&alias->d_lock); 3869 if (!d_unhashed(alias) && 3870 (ceph_dentry(alias)->flags & CEPH_DENTRY_PRIMARY_LINK)) { 3871 dn = dget_dlock(alias); 3872 } 3873 
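/*
 * CEPH_DENTRY_PRIMARY_LINK marks the alias the MDS considers the
 * inode's primary link (rather than an extra hard link); once we have
 * pinned it, drop the per-dentry lock and stop scanning.
 */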
spin_unlock(&alias->d_lock); 3874 if (dn) 3875 break; 3876 } 3877 out_unlock: 3878 spin_unlock(&inode->i_lock); 3879 return dn; 3880 } 3881 3882 /* 3883 * Encode information about a cap for a reconnect with the MDS. 3884 */ 3885 static int reconnect_caps_cb(struct inode *inode, struct ceph_cap *cap, 3886 void *arg) 3887 { 3888 union { 3889 struct ceph_mds_cap_reconnect v2; 3890 struct ceph_mds_cap_reconnect_v1 v1; 3891 } rec; 3892 struct ceph_inode_info *ci = cap->ci; 3893 struct ceph_reconnect_state *recon_state = arg; 3894 struct ceph_pagelist *pagelist = recon_state->pagelist; 3895 struct dentry *dentry; 3896 char *path; 3897 int pathlen = 0, err; 3898 u64 pathbase; 3899 u64 snap_follows; 3900 3901 dout(" adding %p ino %llx.%llx cap %p %lld %s\n", 3902 inode, ceph_vinop(inode), cap, cap->cap_id, 3903 ceph_cap_string(cap->issued)); 3904 3905 dentry = d_find_primary(inode); 3906 if (dentry) { 3907 /* set pathbase to parent dir when msg_version >= 2 */ 3908 path = ceph_mdsc_build_path(dentry, &pathlen, &pathbase, 3909 recon_state->msg_version >= 2); 3910 dput(dentry); 3911 if (IS_ERR(path)) { 3912 err = PTR_ERR(path); 3913 goto out_err; 3914 } 3915 } else { 3916 path = NULL; 3917 pathbase = 0; 3918 } 3919 3920 spin_lock(&ci->i_ceph_lock); 3921 cap->seq = 0; /* reset cap seq */ 3922 cap->issue_seq = 0; /* and issue_seq */ 3923 cap->mseq = 0; /* and migrate_seq */ 3924 cap->cap_gen = atomic_read(&cap->session->s_cap_gen); 3925 3926 /* These are lost when the session goes away */ 3927 if (S_ISDIR(inode->i_mode)) { 3928 if (cap->issued & CEPH_CAP_DIR_CREATE) { 3929 ceph_put_string(rcu_dereference_raw(ci->i_cached_layout.pool_ns)); 3930 memset(&ci->i_cached_layout, 0, sizeof(ci->i_cached_layout)); 3931 } 3932 cap->issued &= ~CEPH_CAP_ANY_DIR_OPS; 3933 } 3934 3935 if (recon_state->msg_version >= 2) { 3936 rec.v2.cap_id = cpu_to_le64(cap->cap_id); 3937 rec.v2.wanted = cpu_to_le32(__ceph_caps_wanted(ci)); 3938 rec.v2.issued = cpu_to_le32(cap->issued); 3939 rec.v2.snaprealm = cpu_to_le64(ci->i_snap_realm->ino); 3940 rec.v2.pathbase = cpu_to_le64(pathbase); 3941 rec.v2.flock_len = (__force __le32) 3942 ((ci->i_ceph_flags & CEPH_I_ERROR_FILELOCK) ? 0 : 1); 3943 } else { 3944 rec.v1.cap_id = cpu_to_le64(cap->cap_id); 3945 rec.v1.wanted = cpu_to_le32(__ceph_caps_wanted(ci)); 3946 rec.v1.issued = cpu_to_le32(cap->issued); 3947 rec.v1.size = cpu_to_le64(i_size_read(inode)); 3948 ceph_encode_timespec64(&rec.v1.mtime, &inode->i_mtime); 3949 ceph_encode_timespec64(&rec.v1.atime, &inode->i_atime); 3950 rec.v1.snaprealm = cpu_to_le64(ci->i_snap_realm->ino); 3951 rec.v1.pathbase = cpu_to_le64(pathbase); 3952 } 3953 3954 if (list_empty(&ci->i_cap_snaps)) { 3955 snap_follows = ci->i_head_snapc ? 
ci->i_head_snapc->seq : 0; 3956 } else { 3957 struct ceph_cap_snap *capsnap = 3958 list_first_entry(&ci->i_cap_snaps, 3959 struct ceph_cap_snap, ci_item); 3960 snap_follows = capsnap->follows; 3961 } 3962 spin_unlock(&ci->i_ceph_lock); 3963 3964 if (recon_state->msg_version >= 2) { 3965 int num_fcntl_locks, num_flock_locks; 3966 struct ceph_filelock *flocks = NULL; 3967 size_t struct_len, total_len = sizeof(u64); 3968 u8 struct_v = 0; 3969 3970 encode_again: 3971 if (rec.v2.flock_len) { 3972 ceph_count_locks(inode, &num_fcntl_locks, &num_flock_locks); 3973 } else { 3974 num_fcntl_locks = 0; 3975 num_flock_locks = 0; 3976 } 3977 if (num_fcntl_locks + num_flock_locks > 0) { 3978 flocks = kmalloc_array(num_fcntl_locks + num_flock_locks, 3979 sizeof(struct ceph_filelock), 3980 GFP_NOFS); 3981 if (!flocks) { 3982 err = -ENOMEM; 3983 goto out_err; 3984 } 3985 err = ceph_encode_locks_to_buffer(inode, flocks, 3986 num_fcntl_locks, 3987 num_flock_locks); 3988 if (err) { 3989 kfree(flocks); 3990 flocks = NULL; 3991 if (err == -ENOSPC) 3992 goto encode_again; 3993 goto out_err; 3994 } 3995 } else { 3996 kfree(flocks); 3997 flocks = NULL; 3998 } 3999 4000 if (recon_state->msg_version >= 3) { 4001 /* version, compat_version and struct_len */ 4002 total_len += 2 * sizeof(u8) + sizeof(u32); 4003 struct_v = 2; 4004 } 4005 /* 4006 * number of encoded locks is stable, so copy to pagelist 4007 */ 4008 struct_len = 2 * sizeof(u32) + 4009 (num_fcntl_locks + num_flock_locks) * 4010 sizeof(struct ceph_filelock); 4011 rec.v2.flock_len = cpu_to_le32(struct_len); 4012 4013 struct_len += sizeof(u32) + pathlen + sizeof(rec.v2); 4014 4015 if (struct_v >= 2) 4016 struct_len += sizeof(u64); /* snap_follows */ 4017 4018 total_len += struct_len; 4019 4020 if (pagelist->length + total_len > RECONNECT_MAX_SIZE) { 4021 err = send_reconnect_partial(recon_state); 4022 if (err) 4023 goto out_freeflocks; 4024 pagelist = recon_state->pagelist; 4025 } 4026 4027 err = ceph_pagelist_reserve(pagelist, total_len); 4028 if (err) 4029 goto out_freeflocks; 4030 4031 ceph_pagelist_encode_64(pagelist, ceph_ino(inode)); 4032 if (recon_state->msg_version >= 3) { 4033 ceph_pagelist_encode_8(pagelist, struct_v); 4034 ceph_pagelist_encode_8(pagelist, 1); 4035 ceph_pagelist_encode_32(pagelist, struct_len); 4036 } 4037 ceph_pagelist_encode_string(pagelist, path, pathlen); 4038 ceph_pagelist_append(pagelist, &rec, sizeof(rec.v2)); 4039 ceph_locks_to_pagelist(flocks, pagelist, 4040 num_fcntl_locks, num_flock_locks); 4041 if (struct_v >= 2) 4042 ceph_pagelist_encode_64(pagelist, snap_follows); 4043 out_freeflocks: 4044 kfree(flocks); 4045 } else { 4046 err = ceph_pagelist_reserve(pagelist, 4047 sizeof(u64) + sizeof(u32) + 4048 pathlen + sizeof(rec.v1)); 4049 if (err) 4050 goto out_err; 4051 4052 ceph_pagelist_encode_64(pagelist, ceph_ino(inode)); 4053 ceph_pagelist_encode_string(pagelist, path, pathlen); 4054 ceph_pagelist_append(pagelist, &rec, sizeof(rec.v1)); 4055 } 4056 4057 out_err: 4058 ceph_mdsc_free_path(path, pathlen); 4059 if (!err) 4060 recon_state->nr_caps++; 4061 return err; 4062 } 4063 4064 static int encode_snap_realms(struct ceph_mds_client *mdsc, 4065 struct ceph_reconnect_state *recon_state) 4066 { 4067 struct rb_node *p; 4068 struct ceph_pagelist *pagelist = recon_state->pagelist; 4069 int err = 0; 4070 4071 if (recon_state->msg_version >= 4) { 4072 err = ceph_pagelist_encode_32(pagelist, mdsc->num_snap_realms); 4073 if (err < 0) 4074 goto fail; 4075 } 4076 4077 /* 4078 * snaprealms. 
we provide the mds with the ino, seq (version), and 4079 * parent for all of our realms. If the mds has any newer info, 4080 * it will tell us. 4081 */ 4082 for (p = rb_first(&mdsc->snap_realms); p; p = rb_next(p)) { 4083 struct ceph_snap_realm *realm = 4084 rb_entry(p, struct ceph_snap_realm, node); 4085 struct ceph_mds_snaprealm_reconnect sr_rec; 4086 4087 if (recon_state->msg_version >= 4) { 4088 size_t need = sizeof(u8) * 2 + sizeof(u32) + 4089 sizeof(sr_rec); 4090 4091 if (pagelist->length + need > RECONNECT_MAX_SIZE) { 4092 err = send_reconnect_partial(recon_state); 4093 if (err) 4094 goto fail; 4095 pagelist = recon_state->pagelist; 4096 } 4097 4098 err = ceph_pagelist_reserve(pagelist, need); 4099 if (err) 4100 goto fail; 4101 4102 ceph_pagelist_encode_8(pagelist, 1); 4103 ceph_pagelist_encode_8(pagelist, 1); 4104 ceph_pagelist_encode_32(pagelist, sizeof(sr_rec)); 4105 } 4106 4107 dout(" adding snap realm %llx seq %lld parent %llx\n", 4108 realm->ino, realm->seq, realm->parent_ino); 4109 sr_rec.ino = cpu_to_le64(realm->ino); 4110 sr_rec.seq = cpu_to_le64(realm->seq); 4111 sr_rec.parent = cpu_to_le64(realm->parent_ino); 4112 4113 err = ceph_pagelist_append(pagelist, &sr_rec, sizeof(sr_rec)); 4114 if (err) 4115 goto fail; 4116 4117 recon_state->nr_realms++; 4118 } 4119 fail: 4120 return err; 4121 } 4122 4123 4124 /* 4125 * If an MDS fails and recovers, clients need to reconnect in order to 4126 * reestablish shared state. This includes all caps issued through 4127 * this session _and_ the snap_realm hierarchy. Because it's not 4128 * clear which snap realms the mds cares about, we send everything we 4129 * know about; that ensures we'll then get any new info the 4130 * recovering MDS might have. 4131 * 4132 * This is a relatively heavyweight operation, but it's rare. 4133 */ 4134 static void send_mds_reconnect(struct ceph_mds_client *mdsc, 4135 struct ceph_mds_session *session) 4136 { 4137 struct ceph_msg *reply; 4138 int mds = session->s_mds; 4139 int err = -ENOMEM; 4140 struct ceph_reconnect_state recon_state = { 4141 .session = session, 4142 }; 4143 LIST_HEAD(dispose); 4144 4145 pr_info("mds%d reconnect start\n", mds); 4146 4147 recon_state.pagelist = ceph_pagelist_alloc(GFP_NOFS); 4148 if (!recon_state.pagelist) 4149 goto fail_nopagelist; 4150 4151 reply = ceph_msg_new2(CEPH_MSG_CLIENT_RECONNECT, 0, 1, GFP_NOFS, false); 4152 if (!reply) 4153 goto fail_nomsg; 4154 4155 xa_destroy(&session->s_delegated_inos); 4156 4157 mutex_lock(&session->s_mutex); 4158 session->s_state = CEPH_MDS_SESSION_RECONNECTING; 4159 session->s_seq = 0; 4160 4161 dout("session %p state %s\n", session, 4162 ceph_session_state_name(session->s_state)); 4163 4164 atomic_inc(&session->s_cap_gen); 4165 4166 spin_lock(&session->s_cap_lock); 4167 /* don't know if session is readonly */ 4168 session->s_readonly = 0; 4169 /* 4170 * notify __ceph_remove_cap() that we are composing cap reconnect. 4171 * If a cap gets released before being added to the cap reconnect, 4172 * __ceph_remove_cap() should skip queuing cap release.
4173 */ 4174 session->s_cap_reconnect = 1; 4175 /* drop old cap expires; we're about to reestablish that state */ 4176 detach_cap_releases(session, &dispose); 4177 spin_unlock(&session->s_cap_lock); 4178 dispose_cap_releases(mdsc, &dispose); 4179 4180 /* trim unused caps to reduce MDS's cache rejoin time */ 4181 if (mdsc->fsc->sb->s_root) 4182 shrink_dcache_parent(mdsc->fsc->sb->s_root); 4183 4184 ceph_con_close(&session->s_con); 4185 ceph_con_open(&session->s_con, 4186 CEPH_ENTITY_TYPE_MDS, mds, 4187 ceph_mdsmap_get_addr(mdsc->mdsmap, mds)); 4188 4189 /* replay unsafe requests */ 4190 replay_unsafe_requests(mdsc, session); 4191 4192 ceph_early_kick_flushing_caps(mdsc, session); 4193 4194 down_read(&mdsc->snap_rwsem); 4195 4196 /* placeholder for nr_caps */ 4197 err = ceph_pagelist_encode_32(recon_state.pagelist, 0); 4198 if (err) 4199 goto fail; 4200 4201 if (test_bit(CEPHFS_FEATURE_MULTI_RECONNECT, &session->s_features)) { 4202 recon_state.msg_version = 3; 4203 recon_state.allow_multi = true; 4204 } else if (session->s_con.peer_features & CEPH_FEATURE_MDSENC) { 4205 recon_state.msg_version = 3; 4206 } else { 4207 recon_state.msg_version = 2; 4208 } 4209 /* traverse this session's caps */ 4210 err = ceph_iterate_session_caps(session, reconnect_caps_cb, &recon_state); 4211 4212 spin_lock(&session->s_cap_lock); 4213 session->s_cap_reconnect = 0; 4214 spin_unlock(&session->s_cap_lock); 4215 4216 if (err < 0) 4217 goto fail; 4218 4219 /* check if all realms can be encoded into current message */ 4220 if (mdsc->num_snap_realms) { 4221 size_t total_len = 4222 recon_state.pagelist->length + 4223 mdsc->num_snap_realms * 4224 sizeof(struct ceph_mds_snaprealm_reconnect); 4225 if (recon_state.msg_version >= 4) { 4226 /* number of realms */ 4227 total_len += sizeof(u32); 4228 /* version, compat_version and struct_len */ 4229 total_len += mdsc->num_snap_realms * 4230 (2 * sizeof(u8) + sizeof(u32)); 4231 } 4232 if (total_len > RECONNECT_MAX_SIZE) { 4233 if (!recon_state.allow_multi) { 4234 err = -ENOSPC; 4235 goto fail; 4236 } 4237 if (recon_state.nr_caps) { 4238 err = send_reconnect_partial(&recon_state); 4239 if (err) 4240 goto fail; 4241 } 4242 recon_state.msg_version = 5; 4243 } 4244 } 4245 4246 err = encode_snap_realms(mdsc, &recon_state); 4247 if (err < 0) 4248 goto fail; 4249 4250 if (recon_state.msg_version >= 5) { 4251 err = ceph_pagelist_encode_8(recon_state.pagelist, 0); 4252 if (err < 0) 4253 goto fail; 4254 } 4255 4256 if (recon_state.nr_caps || recon_state.nr_realms) { 4257 struct page *page = 4258 list_first_entry(&recon_state.pagelist->head, 4259 struct page, lru); 4260 __le32 *addr = kmap_atomic(page); 4261 if (recon_state.nr_caps) { 4262 WARN_ON(recon_state.nr_realms != mdsc->num_snap_realms); 4263 *addr = cpu_to_le32(recon_state.nr_caps); 4264 } else if (recon_state.msg_version >= 4) { 4265 *(addr + 1) = cpu_to_le32(recon_state.nr_realms); 4266 } 4267 kunmap_atomic(addr); 4268 } 4269 4270 reply->hdr.version = cpu_to_le16(recon_state.msg_version); 4271 if (recon_state.msg_version >= 4) 4272 reply->hdr.compat_version = cpu_to_le16(4); 4273 4274 reply->hdr.data_len = cpu_to_le32(recon_state.pagelist->length); 4275 ceph_msg_data_add_pagelist(reply, recon_state.pagelist); 4276 4277 ceph_con_send(&session->s_con, reply); 4278 4279 mutex_unlock(&session->s_mutex); 4280 4281 mutex_lock(&mdsc->mutex); 4282 __wake_requests(mdsc, &session->s_waiting); 4283 mutex_unlock(&mdsc->mutex); 4284 4285 up_read(&mdsc->snap_rwsem); 4286 ceph_pagelist_release(recon_state.pagelist); 4287 return; 4288 4289
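	/*
	 * Error paths, in reverse order of setup: 'fail' drops the reply
	 * message, snap_rwsem and the session mutex; 'fail_nomsg' releases
	 * the pagelist; 'fail_nopagelist' only logs the failure.
	 */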
fail: 4290 ceph_msg_put(reply); 4291 up_read(&mdsc->snap_rwsem); 4292 mutex_unlock(&session->s_mutex); 4293 fail_nomsg: 4294 ceph_pagelist_release(recon_state.pagelist); 4295 fail_nopagelist: 4296 pr_err("error %d preparing reconnect for mds%d\n", err, mds); 4297 return; 4298 } 4299 4300 4301 /* 4302 * compare old and new mdsmaps, kicking requests 4303 * and closing out old connections as necessary 4304 * 4305 * called under mdsc->mutex. 4306 */ 4307 static void check_new_map(struct ceph_mds_client *mdsc, 4308 struct ceph_mdsmap *newmap, 4309 struct ceph_mdsmap *oldmap) 4310 { 4311 int i, j, err; 4312 int oldstate, newstate; 4313 struct ceph_mds_session *s; 4314 unsigned long targets[DIV_ROUND_UP(CEPH_MAX_MDS, sizeof(unsigned long))] = {0}; 4315 4316 dout("check_new_map new %u old %u\n", 4317 newmap->m_epoch, oldmap->m_epoch); 4318 4319 if (newmap->m_info) { 4320 for (i = 0; i < newmap->possible_max_rank; i++) { 4321 for (j = 0; j < newmap->m_info[i].num_export_targets; j++) 4322 set_bit(newmap->m_info[i].export_targets[j], targets); 4323 } 4324 } 4325 4326 for (i = 0; i < oldmap->possible_max_rank && i < mdsc->max_sessions; i++) { 4327 if (!mdsc->sessions[i]) 4328 continue; 4329 s = mdsc->sessions[i]; 4330 oldstate = ceph_mdsmap_get_state(oldmap, i); 4331 newstate = ceph_mdsmap_get_state(newmap, i); 4332 4333 dout("check_new_map mds%d state %s%s -> %s%s (session %s)\n", 4334 i, ceph_mds_state_name(oldstate), 4335 ceph_mdsmap_is_laggy(oldmap, i) ? " (laggy)" : "", 4336 ceph_mds_state_name(newstate), 4337 ceph_mdsmap_is_laggy(newmap, i) ? " (laggy)" : "", 4338 ceph_session_state_name(s->s_state)); 4339 4340 if (i >= newmap->possible_max_rank) { 4341 /* force close session for stopped mds */ 4342 ceph_get_mds_session(s); 4343 __unregister_session(mdsc, s); 4344 __wake_requests(mdsc, &s->s_waiting); 4345 mutex_unlock(&mdsc->mutex); 4346 4347 mutex_lock(&s->s_mutex); 4348 cleanup_session_requests(mdsc, s); 4349 remove_session_caps(s); 4350 mutex_unlock(&s->s_mutex); 4351 4352 ceph_put_mds_session(s); 4353 4354 mutex_lock(&mdsc->mutex); 4355 kick_requests(mdsc, i); 4356 continue; 4357 } 4358 4359 if (memcmp(ceph_mdsmap_get_addr(oldmap, i), 4360 ceph_mdsmap_get_addr(newmap, i), 4361 sizeof(struct ceph_entity_addr))) { 4362 /* just close it */ 4363 mutex_unlock(&mdsc->mutex); 4364 mutex_lock(&s->s_mutex); 4365 mutex_lock(&mdsc->mutex); 4366 ceph_con_close(&s->s_con); 4367 mutex_unlock(&s->s_mutex); 4368 s->s_state = CEPH_MDS_SESSION_RESTARTING; 4369 } else if (oldstate == newstate) { 4370 continue; /* nothing new with this mds */ 4371 } 4372 4373 /* 4374 * send reconnect? 4375 */ 4376 if (s->s_state == CEPH_MDS_SESSION_RESTARTING && 4377 newstate >= CEPH_MDS_STATE_RECONNECT) { 4378 mutex_unlock(&mdsc->mutex); 4379 clear_bit(i, targets); 4380 send_mds_reconnect(mdsc, s); 4381 mutex_lock(&mdsc->mutex); 4382 } 4383 4384 /* 4385 * kick request on any mds that has gone active. 4386 */ 4387 if (oldstate < CEPH_MDS_STATE_ACTIVE && 4388 newstate >= CEPH_MDS_STATE_ACTIVE) { 4389 if (oldstate != CEPH_MDS_STATE_CREATING && 4390 oldstate != CEPH_MDS_STATE_STARTING) 4391 pr_info("mds%d recovery completed\n", s->s_mds); 4392 kick_requests(mdsc, i); 4393 mutex_unlock(&mdsc->mutex); 4394 mutex_lock(&s->s_mutex); 4395 mutex_lock(&mdsc->mutex); 4396 ceph_kick_flushing_caps(mdsc, s); 4397 mutex_unlock(&s->s_mutex); 4398 wake_up_session_caps(s, RECONNECT); 4399 } 4400 } 4401 4402 /* 4403 * Only open and reconnect sessions that don't exist yet. 
4404 */ 4405 for (i = 0; i < newmap->possible_max_rank; i++) { 4406 /* 4407 * If the importing MDS crashes just after the 4408 * EImportStart journal is flushed, then when a 4409 * standby MDS takes over and replays the EImportStart 4410 * journal, the new MDS daemon will wait for the client 4411 * to reconnect, but the client may never have 4412 * registered/opened the session. 4413 * 4414 * Try to reconnect to that MDS daemon if its rank 4415 * number is in the export targets array and it is in 4416 * the up:reconnect state. 4417 */ 4418 newstate = ceph_mdsmap_get_state(newmap, i); 4419 if (!test_bit(i, targets) || newstate != CEPH_MDS_STATE_RECONNECT) 4420 continue; 4421 4422 /* 4423 * In rare cases the session may already have been 4424 * registered and opened by requests that chose random 4425 * MDSes during the mdsc->mutex unlock/lock gap below. 4426 * But the related MDS daemon will just queue those 4427 * requests and keep waiting for the client's 4428 * reconnection request in the up:reconnect state. 4429 */ 4430 s = __ceph_lookup_mds_session(mdsc, i); 4431 if (likely(!s)) { 4432 s = __open_export_target_session(mdsc, i); 4433 if (IS_ERR(s)) { 4434 err = PTR_ERR(s); 4435 pr_err("failed to open export target session, err %d\n", 4436 err); 4437 continue; 4438 } 4439 } 4440 dout("send reconnect to export target mds.%d\n", i); 4441 mutex_unlock(&mdsc->mutex); 4442 send_mds_reconnect(mdsc, s); 4443 ceph_put_mds_session(s); 4444 mutex_lock(&mdsc->mutex); 4445 } 4446 4447 for (i = 0; i < newmap->possible_max_rank && i < mdsc->max_sessions; i++) { 4448 s = mdsc->sessions[i]; 4449 if (!s) 4450 continue; 4451 if (!ceph_mdsmap_is_laggy(newmap, i)) 4452 continue; 4453 if (s->s_state == CEPH_MDS_SESSION_OPEN || 4454 s->s_state == CEPH_MDS_SESSION_HUNG || 4455 s->s_state == CEPH_MDS_SESSION_CLOSING) { 4456 dout(" connecting to export targets of laggy mds%d\n", 4457 i); 4458 __open_export_target_sessions(mdsc, s); 4459 } 4460 } 4461 } 4462 4463 4464 4465 /* 4466 * leases 4467 */ 4468 4469 /* 4470 * caller must hold session s_mutex, dentry->d_lock 4471 */ 4472 void __ceph_mdsc_drop_dentry_lease(struct dentry *dentry) 4473 { 4474 struct ceph_dentry_info *di = ceph_dentry(dentry); 4475 4476 ceph_put_mds_session(di->lease_session); 4477 di->lease_session = NULL; 4478 } 4479 4480 static void handle_lease(struct ceph_mds_client *mdsc, 4481 struct ceph_mds_session *session, 4482 struct ceph_msg *msg) 4483 { 4484 struct super_block *sb = mdsc->fsc->sb; 4485 struct inode *inode; 4486 struct dentry *parent, *dentry; 4487 struct ceph_dentry_info *di; 4488 int mds = session->s_mds; 4489 struct ceph_mds_lease *h = msg->front.iov_base; 4490 u32 seq; 4491 struct ceph_vino vino; 4492 struct qstr dname; 4493 int release = 0; 4494 4495 dout("handle_lease from mds%d\n", mds); 4496 4497 /* decode */ 4498 if (msg->front.iov_len < sizeof(*h) + sizeof(u32)) 4499 goto bad; 4500 vino.ino = le64_to_cpu(h->ino); 4501 vino.snap = CEPH_NOSNAP; 4502 seq = le32_to_cpu(h->seq); 4503 dname.len = get_unaligned_le32(h + 1); 4504 if (msg->front.iov_len < sizeof(*h) + sizeof(u32) + dname.len) 4505 goto bad; 4506 dname.name = (void *)(h + 1) + sizeof(u32); 4507 4508 /* lookup inode */ 4509 inode = ceph_find_inode(sb, vino); 4510 dout("handle_lease %s, ino %llx %p %.*s\n", 4511 ceph_lease_op_name(h->action), vino.ino, inode, 4512 dname.len, dname.name); 4513 4514 mutex_lock(&session->s_mutex); 4515 inc_session_sequence(session); 4516 4517 if (!inode) { 4518 dout("handle_lease no inode %llx\n", vino.ino); 4519 goto release; 4520 } 4521
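	/*
	 * The lease identifies the dentry by its parent directory's ino
	 * plus the name decoded above: find a dentry alias for the
	 * directory, then look the name up under it.
	 */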
4522 /* dentry */ 4523 parent = d_find_alias(inode); 4524 if (!parent) { 4525 dout("no parent dentry on inode %p\n", inode); 4526 WARN_ON(1); 4527 goto release; /* hrm... */ 4528 } 4529 dname.hash = full_name_hash(parent, dname.name, dname.len); 4530 dentry = d_lookup(parent, &dname); 4531 dput(parent); 4532 if (!dentry) 4533 goto release; 4534 4535 spin_lock(&dentry->d_lock); 4536 di = ceph_dentry(dentry); 4537 switch (h->action) { 4538 case CEPH_MDS_LEASE_REVOKE: 4539 if (di->lease_session == session) { 4540 if (ceph_seq_cmp(di->lease_seq, seq) > 0) 4541 h->seq = cpu_to_le32(di->lease_seq); 4542 __ceph_mdsc_drop_dentry_lease(dentry); 4543 } 4544 release = 1; 4545 break; 4546 4547 case CEPH_MDS_LEASE_RENEW: 4548 if (di->lease_session == session && 4549 di->lease_gen == atomic_read(&session->s_cap_gen) && 4550 di->lease_renew_from && 4551 di->lease_renew_after == 0) { 4552 unsigned long duration = 4553 msecs_to_jiffies(le32_to_cpu(h->duration_ms)); 4554 4555 di->lease_seq = seq; 4556 di->time = di->lease_renew_from + duration; 4557 di->lease_renew_after = di->lease_renew_from + 4558 (duration >> 1); 4559 di->lease_renew_from = 0; 4560 } 4561 break; 4562 } 4563 spin_unlock(&dentry->d_lock); 4564 dput(dentry); 4565 4566 if (!release) 4567 goto out; 4568 4569 release: 4570 /* let's just reuse the same message */ 4571 h->action = CEPH_MDS_LEASE_REVOKE_ACK; 4572 ceph_msg_get(msg); 4573 ceph_con_send(&session->s_con, msg); 4574 4575 out: 4576 mutex_unlock(&session->s_mutex); 4577 iput(inode); 4578 return; 4579 4580 bad: 4581 pr_err("corrupt lease message\n"); 4582 ceph_msg_dump(msg); 4583 } 4584 4585 void ceph_mdsc_lease_send_msg(struct ceph_mds_session *session, 4586 struct dentry *dentry, char action, 4587 u32 seq) 4588 { 4589 struct ceph_msg *msg; 4590 struct ceph_mds_lease *lease; 4591 struct inode *dir; 4592 int len = sizeof(*lease) + sizeof(u32) + NAME_MAX; 4593 4594 dout("lease_send_msg dentry %p %s to mds%d\n", 4595 dentry, ceph_lease_op_name(action), session->s_mds); 4596 4597 msg = ceph_msg_new(CEPH_MSG_CLIENT_LEASE, len, GFP_NOFS, false); 4598 if (!msg) 4599 return; 4600 lease = msg->front.iov_base; 4601 lease->action = action; 4602 lease->seq = cpu_to_le32(seq); 4603 4604 spin_lock(&dentry->d_lock); 4605 dir = d_inode(dentry->d_parent); 4606 lease->ino = cpu_to_le64(ceph_ino(dir)); 4607 lease->first = lease->last = cpu_to_le64(ceph_snap(dir)); 4608 4609 put_unaligned_le32(dentry->d_name.len, lease + 1); 4610 memcpy((void *)(lease + 1) + 4, 4611 dentry->d_name.name, dentry->d_name.len); 4612 spin_unlock(&dentry->d_lock); 4613 4614 ceph_con_send(&session->s_con, msg); 4615 } 4616 4617 /* 4618 * lock and unlock the session, to wait for any ongoing session activity 4619 */ 4620 static void lock_unlock_session(struct ceph_mds_session *s) 4621 { 4622 mutex_lock(&s->s_mutex); 4623 mutex_unlock(&s->s_mutex); 4624 } 4625 4626 static void maybe_recover_session(struct ceph_mds_client *mdsc) 4627 { 4628 struct ceph_fs_client *fsc = mdsc->fsc; 4629 4630 if (!ceph_test_mount_opt(fsc, CLEANRECOVER)) 4631 return; 4632 4633 if (READ_ONCE(fsc->mount_state) != CEPH_MOUNT_MOUNTED) 4634 return; 4635 4636 if (!READ_ONCE(fsc->blocklisted)) 4637 return; 4638 4639 pr_info("auto reconnect after blocklisted\n"); 4640 ceph_force_reconnect(fsc->sb); 4641 } 4642 4643 bool check_session_state(struct ceph_mds_session *s) 4644 { 4645 switch (s->s_state) { 4646 case CEPH_MDS_SESSION_OPEN: 4647 if (s->s_ttl && time_after(jiffies, s->s_ttl)) { 4648 s->s_state = CEPH_MDS_SESSION_HUNG; 4649 pr_info("mds%d hung\n", s->s_mds); 4650
} 4651 break; 4652 case CEPH_MDS_SESSION_CLOSING: 4653 case CEPH_MDS_SESSION_NEW: 4654 case CEPH_MDS_SESSION_RESTARTING: 4655 case CEPH_MDS_SESSION_CLOSED: 4656 case CEPH_MDS_SESSION_REJECTED: 4657 return false; 4658 } 4659 4660 return true; 4661 } 4662 4663 /* 4664 * If the sequence is incremented while we're waiting on a REQUEST_CLOSE reply, 4665 * then we need to retransmit that request. 4666 */ 4667 void inc_session_sequence(struct ceph_mds_session *s) 4668 { 4669 lockdep_assert_held(&s->s_mutex); 4670 4671 s->s_seq++; 4672 4673 if (s->s_state == CEPH_MDS_SESSION_CLOSING) { 4674 int ret; 4675 4676 dout("resending session close request for mds%d\n", s->s_mds); 4677 ret = request_close_session(s); 4678 if (ret < 0) 4679 pr_err("unable to close session to mds%d: %d\n", 4680 s->s_mds, ret); 4681 } 4682 } 4683 4684 /* 4685 * delayed work -- periodically trim expired leases, renew caps with mds. If 4686 * the @delay parameter is set to 0 or if it's more than 5 secs, the default 4687 * workqueue delay value of 5 secs will be used. 4688 */ 4689 static void schedule_delayed(struct ceph_mds_client *mdsc, unsigned long delay) 4690 { 4691 unsigned long max_delay = HZ * 5; 4692 4693 /* 5 secs default delay */ 4694 if (!delay || (delay > max_delay)) 4695 delay = max_delay; 4696 schedule_delayed_work(&mdsc->delayed_work, 4697 round_jiffies_relative(delay)); 4698 } 4699 4700 static void delayed_work(struct work_struct *work) 4701 { 4702 struct ceph_mds_client *mdsc = 4703 container_of(work, struct ceph_mds_client, delayed_work.work); 4704 unsigned long delay; 4705 int renew_interval; 4706 int renew_caps; 4707 int i; 4708 4709 dout("mdsc delayed_work\n"); 4710 4711 if (mdsc->stopping) 4712 return; 4713 4714 mutex_lock(&mdsc->mutex); 4715 renew_interval = mdsc->mdsmap->m_session_timeout >> 2; 4716 renew_caps = time_after_eq(jiffies, HZ*renew_interval + 4717 mdsc->last_renew_caps); 4718 if (renew_caps) 4719 mdsc->last_renew_caps = jiffies; 4720 4721 for (i = 0; i < mdsc->max_sessions; i++) { 4722 struct ceph_mds_session *s = __ceph_lookup_mds_session(mdsc, i); 4723 if (!s) 4724 continue; 4725 4726 if (!check_session_state(s)) { 4727 ceph_put_mds_session(s); 4728 continue; 4729 } 4730 mutex_unlock(&mdsc->mutex); 4731 4732 mutex_lock(&s->s_mutex); 4733 if (renew_caps) 4734 send_renew_caps(mdsc, s); 4735 else 4736 ceph_con_keepalive(&s->s_con); 4737 if (s->s_state == CEPH_MDS_SESSION_OPEN || 4738 s->s_state == CEPH_MDS_SESSION_HUNG) 4739 ceph_send_cap_releases(mdsc, s); 4740 mutex_unlock(&s->s_mutex); 4741 ceph_put_mds_session(s); 4742 4743 mutex_lock(&mdsc->mutex); 4744 } 4745 mutex_unlock(&mdsc->mutex); 4746 4747 delay = ceph_check_delayed_caps(mdsc); 4748 4749 ceph_queue_cap_reclaim_work(mdsc); 4750 4751 ceph_trim_snapid_map(mdsc); 4752 4753 maybe_recover_session(mdsc); 4754 4755 schedule_delayed(mdsc, delay); 4756 } 4757 4758 int ceph_mdsc_init(struct ceph_fs_client *fsc) 4759 4760 { 4761 struct ceph_mds_client *mdsc; 4762 int err; 4763 4764 mdsc = kzalloc(sizeof(struct ceph_mds_client), GFP_NOFS); 4765 if (!mdsc) 4766 return -ENOMEM; 4767 mdsc->fsc = fsc; 4768 mutex_init(&mdsc->mutex); 4769 mdsc->mdsmap = kzalloc(sizeof(*mdsc->mdsmap), GFP_NOFS); 4770 if (!mdsc->mdsmap) { 4771 err = -ENOMEM; 4772 goto err_mdsc; 4773 } 4774 4775 init_completion(&mdsc->safe_umount_waiters); 4776 init_waitqueue_head(&mdsc->session_close_wq); 4777 INIT_LIST_HEAD(&mdsc->waiting_for_map); 4778 mdsc->quotarealms_inodes = RB_ROOT; 4779 mutex_init(&mdsc->quotarealms_inodes_mutex); 4780 init_rwsem(&mdsc->snap_rwsem); 4781 
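	/* snap realm bookkeeping: realm tree plus the list of empty realms */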
mdsc->snap_realms = RB_ROOT; 4782 INIT_LIST_HEAD(&mdsc->snap_empty); 4783 spin_lock_init(&mdsc->snap_empty_lock); 4784 mdsc->request_tree = RB_ROOT; 4785 INIT_DELAYED_WORK(&mdsc->delayed_work, delayed_work); 4786 mdsc->last_renew_caps = jiffies; 4787 INIT_LIST_HEAD(&mdsc->cap_delay_list); 4788 INIT_LIST_HEAD(&mdsc->cap_wait_list); 4789 spin_lock_init(&mdsc->cap_delay_lock); 4790 INIT_LIST_HEAD(&mdsc->snap_flush_list); 4791 spin_lock_init(&mdsc->snap_flush_lock); 4792 mdsc->last_cap_flush_tid = 1; 4793 INIT_LIST_HEAD(&mdsc->cap_flush_list); 4794 INIT_LIST_HEAD(&mdsc->cap_dirty_migrating); 4795 spin_lock_init(&mdsc->cap_dirty_lock); 4796 init_waitqueue_head(&mdsc->cap_flushing_wq); 4797 INIT_WORK(&mdsc->cap_reclaim_work, ceph_cap_reclaim_work); 4798 err = ceph_metric_init(&mdsc->metric); 4799 if (err) 4800 goto err_mdsmap; 4801 4802 spin_lock_init(&mdsc->dentry_list_lock); 4803 INIT_LIST_HEAD(&mdsc->dentry_leases); 4804 INIT_LIST_HEAD(&mdsc->dentry_dir_leases); 4805 4806 ceph_caps_init(mdsc); 4807 ceph_adjust_caps_max_min(mdsc, fsc->mount_options); 4808 4809 spin_lock_init(&mdsc->snapid_map_lock); 4810 mdsc->snapid_map_tree = RB_ROOT; 4811 INIT_LIST_HEAD(&mdsc->snapid_map_lru); 4812 4813 init_rwsem(&mdsc->pool_perm_rwsem); 4814 mdsc->pool_perm_tree = RB_ROOT; 4815 4816 strscpy(mdsc->nodename, utsname()->nodename, 4817 sizeof(mdsc->nodename)); 4818 4819 fsc->mdsc = mdsc; 4820 return 0; 4821 4822 err_mdsmap: 4823 kfree(mdsc->mdsmap); 4824 err_mdsc: 4825 kfree(mdsc); 4826 return err; 4827 } 4828 4829 /* 4830 * Wait for safe replies on open mds requests. If we time out, drop 4831 * all requests from the tree to avoid dangling dentry refs. 4832 */ 4833 static void wait_requests(struct ceph_mds_client *mdsc) 4834 { 4835 struct ceph_options *opts = mdsc->fsc->client->options; 4836 struct ceph_mds_request *req; 4837 4838 mutex_lock(&mdsc->mutex); 4839 if (__get_oldest_req(mdsc)) { 4840 mutex_unlock(&mdsc->mutex); 4841 4842 dout("wait_requests waiting for requests\n"); 4843 wait_for_completion_timeout(&mdsc->safe_umount_waiters, 4844 ceph_timeout_jiffies(opts->mount_timeout)); 4845 4846 /* tear down remaining requests */ 4847 mutex_lock(&mdsc->mutex); 4848 while ((req = __get_oldest_req(mdsc))) { 4849 dout("wait_requests timed out on tid %llu\n", 4850 req->r_tid); 4851 list_del_init(&req->r_wait); 4852 __unregister_request(mdsc, req); 4853 } 4854 } 4855 mutex_unlock(&mdsc->mutex); 4856 dout("wait_requests done\n"); 4857 } 4858 4859 void send_flush_mdlog(struct ceph_mds_session *s) 4860 { 4861 struct ceph_msg *msg; 4862 4863 /* 4864 * Pre-luminous MDS crashes when it sees an unknown session request 4865 */ 4866 if (!CEPH_HAVE_FEATURE(s->s_con.peer_features, SERVER_LUMINOUS)) 4867 return; 4868 4869 mutex_lock(&s->s_mutex); 4870 dout("request mdlog flush to mds%d (%s) seq %lld\n", s->s_mds, 4871 ceph_session_state_name(s->s_state), s->s_seq); 4872 msg = ceph_create_session_msg(CEPH_SESSION_REQUEST_FLUSH_MDLOG, 4873 s->s_seq); 4874 if (!msg) { 4875 pr_err("failed to request mdlog flush to mds%d (%s) seq %lld\n", 4876 s->s_mds, ceph_session_state_name(s->s_state), s->s_seq); 4877 } else { 4878 ceph_con_send(&s->s_con, msg); 4879 } 4880 mutex_unlock(&s->s_mutex); 4881 } 4882 4883 /* 4884 * called before mount is ro, and before dentries are torn down. 4885 * (hmm, does this still race with new lookups?)
4886 */ 4887 void ceph_mdsc_pre_umount(struct ceph_mds_client *mdsc) 4888 { 4889 dout("pre_umount\n"); 4890 mdsc->stopping = 1; 4891 4892 ceph_mdsc_iterate_sessions(mdsc, send_flush_mdlog, true); 4893 ceph_mdsc_iterate_sessions(mdsc, lock_unlock_session, false); 4894 ceph_flush_dirty_caps(mdsc); 4895 wait_requests(mdsc); 4896 4897 /* 4898 * wait for reply handlers to drop their request refs and 4899 * their inode/dcache refs 4900 */ 4901 ceph_msgr_flush(); 4902 4903 ceph_cleanup_quotarealms_inodes(mdsc); 4904 } 4905 4906 /* 4907 * flush the mdlog and wait for all write mds requests to flush. 4908 */ 4909 static void flush_mdlog_and_wait_mdsc_unsafe_requests(struct ceph_mds_client *mdsc, 4910 u64 want_tid) 4911 { 4912 struct ceph_mds_request *req = NULL, *nextreq; 4913 struct ceph_mds_session *last_session = NULL; 4914 struct rb_node *n; 4915 4916 mutex_lock(&mdsc->mutex); 4917 dout("%s want %lld\n", __func__, want_tid); 4918 restart: 4919 req = __get_oldest_req(mdsc); 4920 while (req && req->r_tid <= want_tid) { 4921 /* find next request */ 4922 n = rb_next(&req->r_node); 4923 if (n) 4924 nextreq = rb_entry(n, struct ceph_mds_request, r_node); 4925 else 4926 nextreq = NULL; 4927 if (req->r_op != CEPH_MDS_OP_SETFILELOCK && 4928 (req->r_op & CEPH_MDS_OP_WRITE)) { 4929 struct ceph_mds_session *s = req->r_session; 4930 4931 if (!s) { 4932 req = nextreq; 4933 continue; 4934 } 4935 4936 /* write op */ 4937 ceph_mdsc_get_request(req); 4938 if (nextreq) 4939 ceph_mdsc_get_request(nextreq); 4940 s = ceph_get_mds_session(s); 4941 mutex_unlock(&mdsc->mutex); 4942 4943 /* send flush mdlog request to MDS */ 4944 if (last_session != s) { 4945 send_flush_mdlog(s); 4946 ceph_put_mds_session(last_session); 4947 last_session = s; 4948 } else { 4949 ceph_put_mds_session(s); 4950 } 4951 dout("%s wait on %llu (want %llu)\n", __func__, 4952 req->r_tid, want_tid); 4953 wait_for_completion(&req->r_safe_completion); 4954 4955 mutex_lock(&mdsc->mutex); 4956 ceph_mdsc_put_request(req); 4957 if (!nextreq) 4958 break; /* next dne before, so we're done! 
*/ 4959 if (RB_EMPTY_NODE(&nextreq->r_node)) { 4960 /* next request was removed from tree */ 4961 ceph_mdsc_put_request(nextreq); 4962 goto restart; 4963 } 4964 ceph_mdsc_put_request(nextreq); /* won't go away */ 4965 } 4966 req = nextreq; 4967 } 4968 mutex_unlock(&mdsc->mutex); 4969 ceph_put_mds_session(last_session); 4970 dout("%s done\n", __func__); 4971 } 4972 4973 void ceph_mdsc_sync(struct ceph_mds_client *mdsc) 4974 { 4975 u64 want_tid, want_flush; 4976 4977 if (READ_ONCE(mdsc->fsc->mount_state) >= CEPH_MOUNT_SHUTDOWN) 4978 return; 4979 4980 dout("sync\n"); 4981 mutex_lock(&mdsc->mutex); 4982 want_tid = mdsc->last_tid; 4983 mutex_unlock(&mdsc->mutex); 4984 4985 ceph_flush_dirty_caps(mdsc); 4986 spin_lock(&mdsc->cap_dirty_lock); 4987 want_flush = mdsc->last_cap_flush_tid; 4988 if (!list_empty(&mdsc->cap_flush_list)) { 4989 struct ceph_cap_flush *cf = 4990 list_last_entry(&mdsc->cap_flush_list, 4991 struct ceph_cap_flush, g_list); 4992 cf->wake = true; 4993 } 4994 spin_unlock(&mdsc->cap_dirty_lock); 4995 4996 dout("sync want tid %lld flush_seq %lld\n", 4997 want_tid, want_flush); 4998 4999 flush_mdlog_and_wait_mdsc_unsafe_requests(mdsc, want_tid); 5000 wait_caps_flush(mdsc, want_flush); 5001 } 5002 5003 /* 5004 * true if all sessions are closed, or we force unmount 5005 */ 5006 static bool done_closing_sessions(struct ceph_mds_client *mdsc, int skipped) 5007 { 5008 if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_SHUTDOWN) 5009 return true; 5010 return atomic_read(&mdsc->num_sessions) <= skipped; 5011 } 5012 5013 /* 5014 * called after sb is ro. 5015 */ 5016 void ceph_mdsc_close_sessions(struct ceph_mds_client *mdsc) 5017 { 5018 struct ceph_options *opts = mdsc->fsc->client->options; 5019 struct ceph_mds_session *session; 5020 int i; 5021 int skipped = 0; 5022 5023 dout("close_sessions\n"); 5024 5025 /* close sessions */ 5026 mutex_lock(&mdsc->mutex); 5027 for (i = 0; i < mdsc->max_sessions; i++) { 5028 session = __ceph_lookup_mds_session(mdsc, i); 5029 if (!session) 5030 continue; 5031 mutex_unlock(&mdsc->mutex); 5032 mutex_lock(&session->s_mutex); 5033 if (__close_session(mdsc, session) <= 0) 5034 skipped++; 5035 mutex_unlock(&session->s_mutex); 5036 ceph_put_mds_session(session); 5037 mutex_lock(&mdsc->mutex); 5038 } 5039 mutex_unlock(&mdsc->mutex); 5040 5041 dout("waiting for sessions to close\n"); 5042 wait_event_timeout(mdsc->session_close_wq, 5043 done_closing_sessions(mdsc, skipped), 5044 ceph_timeout_jiffies(opts->mount_timeout)); 5045 5046 /* tear down remaining sessions */ 5047 mutex_lock(&mdsc->mutex); 5048 for (i = 0; i < mdsc->max_sessions; i++) { 5049 if (mdsc->sessions[i]) { 5050 session = ceph_get_mds_session(mdsc->sessions[i]); 5051 __unregister_session(mdsc, session); 5052 mutex_unlock(&mdsc->mutex); 5053 mutex_lock(&session->s_mutex); 5054 remove_session_caps(session); 5055 mutex_unlock(&session->s_mutex); 5056 ceph_put_mds_session(session); 5057 mutex_lock(&mdsc->mutex); 5058 } 5059 } 5060 WARN_ON(!list_empty(&mdsc->cap_delay_list)); 5061 mutex_unlock(&mdsc->mutex); 5062 5063 ceph_cleanup_snapid_map(mdsc); 5064 ceph_cleanup_global_and_empty_realms(mdsc); 5065 5066 cancel_work_sync(&mdsc->cap_reclaim_work); 5067 cancel_delayed_work_sync(&mdsc->delayed_work); /* cancel timer */ 5068 5069 dout("stopped\n"); 5070 } 5071 5072 void ceph_mdsc_force_umount(struct ceph_mds_client *mdsc) 5073 { 5074 struct ceph_mds_session *session; 5075 int mds; 5076 5077 dout("force umount\n"); 5078 5079 mutex_lock(&mdsc->mutex); 5080 for (mds = 0; mds < mdsc->max_sessions; mds++) { 
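		/*
		 * Drop mdsc->mutex for each session so it can be closed and
		 * its requests/caps cleaned up under its own s_mutex, then
		 * retake the mutex and kick any remaining requests for this
		 * rank.
		 */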
5081 session = __ceph_lookup_mds_session(mdsc, mds); 5082 if (!session) 5083 continue; 5084 5085 if (session->s_state == CEPH_MDS_SESSION_REJECTED) 5086 __unregister_session(mdsc, session); 5087 __wake_requests(mdsc, &session->s_waiting); 5088 mutex_unlock(&mdsc->mutex); 5089 5090 mutex_lock(&session->s_mutex); 5091 __close_session(mdsc, session); 5092 if (session->s_state == CEPH_MDS_SESSION_CLOSING) { 5093 cleanup_session_requests(mdsc, session); 5094 remove_session_caps(session); 5095 } 5096 mutex_unlock(&session->s_mutex); 5097 ceph_put_mds_session(session); 5098 5099 mutex_lock(&mdsc->mutex); 5100 kick_requests(mdsc, mds); 5101 } 5102 __wake_requests(mdsc, &mdsc->waiting_for_map); 5103 mutex_unlock(&mdsc->mutex); 5104 } 5105 5106 static void ceph_mdsc_stop(struct ceph_mds_client *mdsc) 5107 { 5108 dout("stop\n"); 5109 /* 5110 * Make sure the delayed work stopped before releasing 5111 * the resources. 5112 * 5113 * Because the cancel_delayed_work_sync() will only 5114 * guarantee that the work finishes executing. But the 5115 * delayed work will re-arm itself again after that. 5116 */ 5117 flush_delayed_work(&mdsc->delayed_work); 5118 5119 if (mdsc->mdsmap) 5120 ceph_mdsmap_destroy(mdsc->mdsmap); 5121 kfree(mdsc->sessions); 5122 ceph_caps_finalize(mdsc); 5123 ceph_pool_perm_destroy(mdsc); 5124 } 5125 5126 void ceph_mdsc_destroy(struct ceph_fs_client *fsc) 5127 { 5128 struct ceph_mds_client *mdsc = fsc->mdsc; 5129 dout("mdsc_destroy %p\n", mdsc); 5130 5131 if (!mdsc) 5132 return; 5133 5134 /* flush out any connection work with references to us */ 5135 ceph_msgr_flush(); 5136 5137 ceph_mdsc_stop(mdsc); 5138 5139 ceph_metric_destroy(&mdsc->metric); 5140 5141 fsc->mdsc = NULL; 5142 kfree(mdsc); 5143 dout("mdsc_destroy %p done\n", mdsc); 5144 } 5145 5146 void ceph_mdsc_handle_fsmap(struct ceph_mds_client *mdsc, struct ceph_msg *msg) 5147 { 5148 struct ceph_fs_client *fsc = mdsc->fsc; 5149 const char *mds_namespace = fsc->mount_options->mds_namespace; 5150 void *p = msg->front.iov_base; 5151 void *end = p + msg->front.iov_len; 5152 u32 epoch; 5153 u32 num_fs; 5154 u32 mount_fscid = (u32)-1; 5155 int err = -EINVAL; 5156 5157 ceph_decode_need(&p, end, sizeof(u32), bad); 5158 epoch = ceph_decode_32(&p); 5159 5160 dout("handle_fsmap epoch %u\n", epoch); 5161 5162 /* struct_v, struct_cv, map_len, epoch, legacy_client_fscid */ 5163 ceph_decode_skip_n(&p, end, 2 + sizeof(u32) * 3, bad); 5164 5165 ceph_decode_32_safe(&p, end, num_fs, bad); 5166 while (num_fs-- > 0) { 5167 void *info_p, *info_end; 5168 u32 info_len; 5169 u32 fscid, namelen; 5170 5171 ceph_decode_need(&p, end, 2 + sizeof(u32), bad); 5172 p += 2; // info_v, info_cv 5173 info_len = ceph_decode_32(&p); 5174 ceph_decode_need(&p, end, info_len, bad); 5175 info_p = p; 5176 info_end = p + info_len; 5177 p = info_end; 5178 5179 ceph_decode_need(&info_p, info_end, sizeof(u32) * 2, bad); 5180 fscid = ceph_decode_32(&info_p); 5181 namelen = ceph_decode_32(&info_p); 5182 ceph_decode_need(&info_p, info_end, namelen, bad); 5183 5184 if (mds_namespace && 5185 strlen(mds_namespace) == namelen && 5186 !strncmp(mds_namespace, (char *)info_p, namelen)) { 5187 mount_fscid = fscid; 5188 break; 5189 } 5190 } 5191 5192 ceph_monc_got_map(&fsc->client->monc, CEPH_SUB_FSMAP, epoch); 5193 if (mount_fscid != (u32)-1) { 5194 fsc->client->monc.fs_cluster_id = mount_fscid; 5195 ceph_monc_want_map(&fsc->client->monc, CEPH_SUB_MDSMAP, 5196 0, true); 5197 ceph_monc_renew_subs(&fsc->client->monc); 5198 } else { 5199 err = -ENOENT; 5200 goto err_out; 5201 } 5202 return; 
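	/*
	 * 'bad' handles decode failures (the mount is shut down) and then
	 * falls through to 'err_out', which records the error and wakes
	 * anyone waiting for an mdsmap.
	 */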
5203 5204 bad: 5205 pr_err("error decoding fsmap %d. Shutting down mount.\n", err); 5206 ceph_umount_begin(mdsc->fsc->sb); 5207 err_out: 5208 mutex_lock(&mdsc->mutex); 5209 mdsc->mdsmap_err = err; 5210 __wake_requests(mdsc, &mdsc->waiting_for_map); 5211 mutex_unlock(&mdsc->mutex); 5212 } 5213 5214 /* 5215 * handle mds map update. 5216 */ 5217 void ceph_mdsc_handle_mdsmap(struct ceph_mds_client *mdsc, struct ceph_msg *msg) 5218 { 5219 u32 epoch; 5220 u32 maplen; 5221 void *p = msg->front.iov_base; 5222 void *end = p + msg->front.iov_len; 5223 struct ceph_mdsmap *newmap, *oldmap; 5224 struct ceph_fsid fsid; 5225 int err = -EINVAL; 5226 5227 ceph_decode_need(&p, end, sizeof(fsid)+2*sizeof(u32), bad); 5228 ceph_decode_copy(&p, &fsid, sizeof(fsid)); 5229 if (ceph_check_fsid(mdsc->fsc->client, &fsid) < 0) 5230 return; 5231 epoch = ceph_decode_32(&p); 5232 maplen = ceph_decode_32(&p); 5233 dout("handle_map epoch %u len %d\n", epoch, (int)maplen); 5234 5235 /* do we need it? */ 5236 mutex_lock(&mdsc->mutex); 5237 if (mdsc->mdsmap && epoch <= mdsc->mdsmap->m_epoch) { 5238 dout("handle_map epoch %u <= our %u\n", 5239 epoch, mdsc->mdsmap->m_epoch); 5240 mutex_unlock(&mdsc->mutex); 5241 return; 5242 } 5243 5244 newmap = ceph_mdsmap_decode(&p, end, ceph_msgr2(mdsc->fsc->client)); 5245 if (IS_ERR(newmap)) { 5246 err = PTR_ERR(newmap); 5247 goto bad_unlock; 5248 } 5249 5250 /* swap into place */ 5251 if (mdsc->mdsmap) { 5252 oldmap = mdsc->mdsmap; 5253 mdsc->mdsmap = newmap; 5254 check_new_map(mdsc, newmap, oldmap); 5255 ceph_mdsmap_destroy(oldmap); 5256 } else { 5257 mdsc->mdsmap = newmap; /* first mds map */ 5258 } 5259 mdsc->fsc->max_file_size = min((loff_t)mdsc->mdsmap->m_max_file_size, 5260 MAX_LFS_FILESIZE); 5261 5262 __wake_requests(mdsc, &mdsc->waiting_for_map); 5263 ceph_monc_got_map(&mdsc->fsc->client->monc, CEPH_SUB_MDSMAP, 5264 mdsc->mdsmap->m_epoch); 5265 5266 mutex_unlock(&mdsc->mutex); 5267 schedule_delayed(mdsc, 0); 5268 return; 5269 5270 bad_unlock: 5271 mutex_unlock(&mdsc->mutex); 5272 bad: 5273 pr_err("error decoding mdsmap %d. Shutting down mount.\n", err); 5274 ceph_umount_begin(mdsc->fsc->sb); 5275 return; 5276 } 5277 5278 static struct ceph_connection *mds_get_con(struct ceph_connection *con) 5279 { 5280 struct ceph_mds_session *s = con->private; 5281 5282 if (ceph_get_mds_session(s)) 5283 return con; 5284 return NULL; 5285 } 5286 5287 static void mds_put_con(struct ceph_connection *con) 5288 { 5289 struct ceph_mds_session *s = con->private; 5290 5291 ceph_put_mds_session(s); 5292 } 5293 5294 /* 5295 * if the client is unresponsive for long enough, the mds will kill 5296 * the session entirely. 
5297 */ 5298 static void mds_peer_reset(struct ceph_connection *con) 5299 { 5300 struct ceph_mds_session *s = con->private; 5301 struct ceph_mds_client *mdsc = s->s_mdsc; 5302 5303 pr_warn("mds%d closed our session\n", s->s_mds); 5304 send_mds_reconnect(mdsc, s); 5305 } 5306 5307 static void mds_dispatch(struct ceph_connection *con, struct ceph_msg *msg) 5308 { 5309 struct ceph_mds_session *s = con->private; 5310 struct ceph_mds_client *mdsc = s->s_mdsc; 5311 int type = le16_to_cpu(msg->hdr.type); 5312 5313 mutex_lock(&mdsc->mutex); 5314 if (__verify_registered_session(mdsc, s) < 0) { 5315 mutex_unlock(&mdsc->mutex); 5316 goto out; 5317 } 5318 mutex_unlock(&mdsc->mutex); 5319 5320 switch (type) { 5321 case CEPH_MSG_MDS_MAP: 5322 ceph_mdsc_handle_mdsmap(mdsc, msg); 5323 break; 5324 case CEPH_MSG_FS_MAP_USER: 5325 ceph_mdsc_handle_fsmap(mdsc, msg); 5326 break; 5327 case CEPH_MSG_CLIENT_SESSION: 5328 handle_session(s, msg); 5329 break; 5330 case CEPH_MSG_CLIENT_REPLY: 5331 handle_reply(s, msg); 5332 break; 5333 case CEPH_MSG_CLIENT_REQUEST_FORWARD: 5334 handle_forward(mdsc, s, msg); 5335 break; 5336 case CEPH_MSG_CLIENT_CAPS: 5337 ceph_handle_caps(s, msg); 5338 break; 5339 case CEPH_MSG_CLIENT_SNAP: 5340 ceph_handle_snap(mdsc, s, msg); 5341 break; 5342 case CEPH_MSG_CLIENT_LEASE: 5343 handle_lease(mdsc, s, msg); 5344 break; 5345 case CEPH_MSG_CLIENT_QUOTA: 5346 ceph_handle_quota(mdsc, s, msg); 5347 break; 5348 5349 default: 5350 pr_err("received unknown message type %d %s\n", type, 5351 ceph_msg_type_name(type)); 5352 } 5353 out: 5354 ceph_msg_put(msg); 5355 } 5356 5357 /* 5358 * authentication 5359 */ 5360 5361 /* 5362 * Note: returned pointer is the address of a structure that's 5363 * managed separately. Caller must *not* attempt to free it. 5364 */ 5365 static struct ceph_auth_handshake * 5366 mds_get_authorizer(struct ceph_connection *con, int *proto, int force_new) 5367 { 5368 struct ceph_mds_session *s = con->private; 5369 struct ceph_mds_client *mdsc = s->s_mdsc; 5370 struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth; 5371 struct ceph_auth_handshake *auth = &s->s_auth; 5372 int ret; 5373 5374 ret = __ceph_auth_get_authorizer(ac, auth, CEPH_ENTITY_TYPE_MDS, 5375 force_new, proto, NULL, NULL); 5376 if (ret) 5377 return ERR_PTR(ret); 5378 5379 return auth; 5380 } 5381 5382 static int mds_add_authorizer_challenge(struct ceph_connection *con, 5383 void *challenge_buf, int challenge_buf_len) 5384 { 5385 struct ceph_mds_session *s = con->private; 5386 struct ceph_mds_client *mdsc = s->s_mdsc; 5387 struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth; 5388 5389 return ceph_auth_add_authorizer_challenge(ac, s->s_auth.authorizer, 5390 challenge_buf, challenge_buf_len); 5391 } 5392 5393 static int mds_verify_authorizer_reply(struct ceph_connection *con) 5394 { 5395 struct ceph_mds_session *s = con->private; 5396 struct ceph_mds_client *mdsc = s->s_mdsc; 5397 struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth; 5398 struct ceph_auth_handshake *auth = &s->s_auth; 5399 5400 return ceph_auth_verify_authorizer_reply(ac, auth->authorizer, 5401 auth->authorizer_reply_buf, auth->authorizer_reply_buf_len, 5402 NULL, NULL, NULL, NULL); 5403 } 5404 5405 static int mds_invalidate_authorizer(struct ceph_connection *con) 5406 { 5407 struct ceph_mds_session *s = con->private; 5408 struct ceph_mds_client *mdsc = s->s_mdsc; 5409 struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth; 5410 5411 ceph_auth_invalidate_authorizer(ac, CEPH_ENTITY_TYPE_MDS); 5412 5413 return 
ceph_monc_validate_auth(&mdsc->fsc->client->monc); 5414 } 5415 5416 static int mds_get_auth_request(struct ceph_connection *con, 5417 void *buf, int *buf_len, 5418 void **authorizer, int *authorizer_len) 5419 { 5420 struct ceph_mds_session *s = con->private; 5421 struct ceph_auth_client *ac = s->s_mdsc->fsc->client->monc.auth; 5422 struct ceph_auth_handshake *auth = &s->s_auth; 5423 int ret; 5424 5425 ret = ceph_auth_get_authorizer(ac, auth, CEPH_ENTITY_TYPE_MDS, 5426 buf, buf_len); 5427 if (ret) 5428 return ret; 5429 5430 *authorizer = auth->authorizer_buf; 5431 *authorizer_len = auth->authorizer_buf_len; 5432 return 0; 5433 } 5434 5435 static int mds_handle_auth_reply_more(struct ceph_connection *con, 5436 void *reply, int reply_len, 5437 void *buf, int *buf_len, 5438 void **authorizer, int *authorizer_len) 5439 { 5440 struct ceph_mds_session *s = con->private; 5441 struct ceph_auth_client *ac = s->s_mdsc->fsc->client->monc.auth; 5442 struct ceph_auth_handshake *auth = &s->s_auth; 5443 int ret; 5444 5445 ret = ceph_auth_handle_svc_reply_more(ac, auth, reply, reply_len, 5446 buf, buf_len); 5447 if (ret) 5448 return ret; 5449 5450 *authorizer = auth->authorizer_buf; 5451 *authorizer_len = auth->authorizer_buf_len; 5452 return 0; 5453 } 5454 5455 static int mds_handle_auth_done(struct ceph_connection *con, 5456 u64 global_id, void *reply, int reply_len, 5457 u8 *session_key, int *session_key_len, 5458 u8 *con_secret, int *con_secret_len) 5459 { 5460 struct ceph_mds_session *s = con->private; 5461 struct ceph_auth_client *ac = s->s_mdsc->fsc->client->monc.auth; 5462 struct ceph_auth_handshake *auth = &s->s_auth; 5463 5464 return ceph_auth_handle_svc_reply_done(ac, auth, reply, reply_len, 5465 session_key, session_key_len, 5466 con_secret, con_secret_len); 5467 } 5468 5469 static int mds_handle_auth_bad_method(struct ceph_connection *con, 5470 int used_proto, int result, 5471 const int *allowed_protos, int proto_cnt, 5472 const int *allowed_modes, int mode_cnt) 5473 { 5474 struct ceph_mds_session *s = con->private; 5475 struct ceph_mon_client *monc = &s->s_mdsc->fsc->client->monc; 5476 int ret; 5477 5478 if (ceph_auth_handle_bad_authorizer(monc->auth, CEPH_ENTITY_TYPE_MDS, 5479 used_proto, result, 5480 allowed_protos, proto_cnt, 5481 allowed_modes, mode_cnt)) { 5482 ret = ceph_monc_validate_auth(monc); 5483 if (ret) 5484 return ret; 5485 } 5486 5487 return -EACCES; 5488 } 5489 5490 static struct ceph_msg *mds_alloc_msg(struct ceph_connection *con, 5491 struct ceph_msg_header *hdr, int *skip) 5492 { 5493 struct ceph_msg *msg; 5494 int type = (int) le16_to_cpu(hdr->type); 5495 int front_len = (int) le32_to_cpu(hdr->front_len); 5496 5497 if (con->in_msg) 5498 return con->in_msg; 5499 5500 *skip = 0; 5501 msg = ceph_msg_new(type, front_len, GFP_NOFS, false); 5502 if (!msg) { 5503 pr_err("unable to allocate msg type %d len %d\n", 5504 type, front_len); 5505 return NULL; 5506 } 5507 5508 return msg; 5509 } 5510 5511 static int mds_sign_message(struct ceph_msg *msg) 5512 { 5513 struct ceph_mds_session *s = msg->con->private; 5514 struct ceph_auth_handshake *auth = &s->s_auth; 5515 5516 return ceph_auth_sign_message(auth, msg); 5517 } 5518 5519 static int mds_check_message_signature(struct ceph_msg *msg) 5520 { 5521 struct ceph_mds_session *s = msg->con->private; 5522 struct ceph_auth_handshake *auth = &s->s_auth; 5523 5524 return ceph_auth_check_message_signature(auth, msg); 5525 } 5526 5527 static const struct ceph_connection_operations mds_con_ops = { 5528 .get = mds_get_con, 5529 .put = 
mds_put_con, 5530 .alloc_msg = mds_alloc_msg, 5531 .dispatch = mds_dispatch, 5532 .peer_reset = mds_peer_reset, 5533 .get_authorizer = mds_get_authorizer, 5534 .add_authorizer_challenge = mds_add_authorizer_challenge, 5535 .verify_authorizer_reply = mds_verify_authorizer_reply, 5536 .invalidate_authorizer = mds_invalidate_authorizer, 5537 .sign_message = mds_sign_message, 5538 .check_message_signature = mds_check_message_signature, 5539 .get_auth_request = mds_get_auth_request, 5540 .handle_auth_reply_more = mds_handle_auth_reply_more, 5541 .handle_auth_done = mds_handle_auth_done, 5542 .handle_auth_bad_method = mds_handle_auth_bad_method, 5543 }; 5544 5545 /* eof */ 5546