// SPDX-License-Identifier: GPL-2.0
#include <linux/ceph/ceph_debug.h>

#include <linux/fs.h>
#include <linux/wait.h>
#include <linux/slab.h>
#include <linux/gfp.h>
#include <linux/sched.h>
#include <linux/debugfs.h>
#include <linux/seq_file.h>
#include <linux/ratelimit.h>
#include <linux/bits.h>
#include <linux/ktime.h>

#include "super.h"
#include "mds_client.h"

#include <linux/ceph/ceph_features.h>
#include <linux/ceph/messenger.h>
#include <linux/ceph/decode.h>
#include <linux/ceph/pagelist.h>
#include <linux/ceph/auth.h>
#include <linux/ceph/debugfs.h>

#define RECONNECT_MAX_SIZE (INT_MAX - PAGE_SIZE)

/*
 * A cluster of MDS (metadata server) daemons is responsible for
 * managing the file system namespace (the directory hierarchy and
 * inodes) and for coordinating shared access to storage. Metadata is
 * partitioned hierarchically across a number of servers, and that
 * partition varies over time as the cluster adjusts the distribution
 * in order to balance load.
 *
 * The MDS client is primarily responsible for managing synchronous
 * metadata requests for operations like open, unlink, and so forth.
 * If there is an MDS failure, we find out about it when we (possibly
 * request and) receive a new MDS map, and can resubmit affected
 * requests.
 *
 * For the most part, though, we take advantage of a lossless
 * communications channel to the MDS, and do not need to worry about
 * timing out or resubmitting requests.
 *
 * We maintain a stateful "session" with each MDS we interact with.
 * Within each session, we send periodic heartbeat messages to ensure
 * any capabilities or leases we have been issued remain valid. If
 * the session times out and goes stale, our leases and capabilities
 * are no longer valid.
 */

struct ceph_reconnect_state {
	struct ceph_mds_session *session;
	int nr_caps, nr_realms;
	struct ceph_pagelist *pagelist;
	unsigned msg_version;
	bool allow_multi;
};

static void __wake_requests(struct ceph_mds_client *mdsc,
			    struct list_head *head);
static void ceph_cap_release_work(struct work_struct *work);
static void ceph_cap_reclaim_work(struct work_struct *work);

static const struct ceph_connection_operations mds_con_ops;


/*
 * mds reply parsing
 */

static int parse_reply_info_quota(void **p, void *end,
				  struct ceph_mds_reply_info_in *info)
{
	u8 struct_v, struct_compat;
	u32 struct_len;

	ceph_decode_8_safe(p, end, struct_v, bad);
	ceph_decode_8_safe(p, end, struct_compat, bad);
	/* struct_v is expected to be >= 1. we only
	 * understand encoding with struct_compat == 1. */
	if (!struct_v || struct_compat != 1)
		goto bad;
	ceph_decode_32_safe(p, end, struct_len, bad);
	ceph_decode_need(p, end, struct_len, bad);
	end = *p + struct_len;
	ceph_decode_64_safe(p, end, info->max_bytes, bad);
	ceph_decode_64_safe(p, end, info->max_files, bad);
	*p = end;
	return 0;
bad:
	return -EIO;
}

/*
 * parse individual inode info
 */
static int parse_reply_info_in(void **p, void *end,
			       struct ceph_mds_reply_info_in *info,
			       u64 features)
{
	int err = 0;
	u8 struct_v = 0;

	if (features == (u64)-1) {
		u32 struct_len;
		u8 struct_compat;
		ceph_decode_8_safe(p, end, struct_v, bad);
		ceph_decode_8_safe(p, end, struct_compat, bad);
		/* struct_v is expected to be >= 1. we only understand
		 * encoding with struct_compat == 1.
*/ 112 if (!struct_v || struct_compat != 1) 113 goto bad; 114 ceph_decode_32_safe(p, end, struct_len, bad); 115 ceph_decode_need(p, end, struct_len, bad); 116 end = *p + struct_len; 117 } 118 119 ceph_decode_need(p, end, sizeof(struct ceph_mds_reply_inode), bad); 120 info->in = *p; 121 *p += sizeof(struct ceph_mds_reply_inode) + 122 sizeof(*info->in->fragtree.splits) * 123 le32_to_cpu(info->in->fragtree.nsplits); 124 125 ceph_decode_32_safe(p, end, info->symlink_len, bad); 126 ceph_decode_need(p, end, info->symlink_len, bad); 127 info->symlink = *p; 128 *p += info->symlink_len; 129 130 ceph_decode_copy_safe(p, end, &info->dir_layout, 131 sizeof(info->dir_layout), bad); 132 ceph_decode_32_safe(p, end, info->xattr_len, bad); 133 ceph_decode_need(p, end, info->xattr_len, bad); 134 info->xattr_data = *p; 135 *p += info->xattr_len; 136 137 if (features == (u64)-1) { 138 /* inline data */ 139 ceph_decode_64_safe(p, end, info->inline_version, bad); 140 ceph_decode_32_safe(p, end, info->inline_len, bad); 141 ceph_decode_need(p, end, info->inline_len, bad); 142 info->inline_data = *p; 143 *p += info->inline_len; 144 /* quota */ 145 err = parse_reply_info_quota(p, end, info); 146 if (err < 0) 147 goto out_bad; 148 /* pool namespace */ 149 ceph_decode_32_safe(p, end, info->pool_ns_len, bad); 150 if (info->pool_ns_len > 0) { 151 ceph_decode_need(p, end, info->pool_ns_len, bad); 152 info->pool_ns_data = *p; 153 *p += info->pool_ns_len; 154 } 155 156 /* btime */ 157 ceph_decode_need(p, end, sizeof(info->btime), bad); 158 ceph_decode_copy(p, &info->btime, sizeof(info->btime)); 159 160 /* change attribute */ 161 ceph_decode_64_safe(p, end, info->change_attr, bad); 162 163 /* dir pin */ 164 if (struct_v >= 2) { 165 ceph_decode_32_safe(p, end, info->dir_pin, bad); 166 } else { 167 info->dir_pin = -ENODATA; 168 } 169 170 /* snapshot birth time, remains zero for v<=2 */ 171 if (struct_v >= 3) { 172 ceph_decode_need(p, end, sizeof(info->snap_btime), bad); 173 ceph_decode_copy(p, &info->snap_btime, 174 sizeof(info->snap_btime)); 175 } else { 176 memset(&info->snap_btime, 0, sizeof(info->snap_btime)); 177 } 178 179 *p = end; 180 } else { 181 if (features & CEPH_FEATURE_MDS_INLINE_DATA) { 182 ceph_decode_64_safe(p, end, info->inline_version, bad); 183 ceph_decode_32_safe(p, end, info->inline_len, bad); 184 ceph_decode_need(p, end, info->inline_len, bad); 185 info->inline_data = *p; 186 *p += info->inline_len; 187 } else 188 info->inline_version = CEPH_INLINE_NONE; 189 190 if (features & CEPH_FEATURE_MDS_QUOTA) { 191 err = parse_reply_info_quota(p, end, info); 192 if (err < 0) 193 goto out_bad; 194 } else { 195 info->max_bytes = 0; 196 info->max_files = 0; 197 } 198 199 info->pool_ns_len = 0; 200 info->pool_ns_data = NULL; 201 if (features & CEPH_FEATURE_FS_FILE_LAYOUT_V2) { 202 ceph_decode_32_safe(p, end, info->pool_ns_len, bad); 203 if (info->pool_ns_len > 0) { 204 ceph_decode_need(p, end, info->pool_ns_len, bad); 205 info->pool_ns_data = *p; 206 *p += info->pool_ns_len; 207 } 208 } 209 210 if (features & CEPH_FEATURE_FS_BTIME) { 211 ceph_decode_need(p, end, sizeof(info->btime), bad); 212 ceph_decode_copy(p, &info->btime, sizeof(info->btime)); 213 ceph_decode_64_safe(p, end, info->change_attr, bad); 214 } 215 216 info->dir_pin = -ENODATA; 217 /* info->snap_btime remains zero */ 218 } 219 return 0; 220 bad: 221 err = -EIO; 222 out_bad: 223 return err; 224 } 225 226 static int parse_reply_info_dir(void **p, void *end, 227 struct ceph_mds_reply_dirfrag **dirfrag, 228 u64 features) 229 { 230 if (features == (u64)-1) 
{ 231 u8 struct_v, struct_compat; 232 u32 struct_len; 233 ceph_decode_8_safe(p, end, struct_v, bad); 234 ceph_decode_8_safe(p, end, struct_compat, bad); 235 /* struct_v is expected to be >= 1. we only understand 236 * encoding whose struct_compat == 1. */ 237 if (!struct_v || struct_compat != 1) 238 goto bad; 239 ceph_decode_32_safe(p, end, struct_len, bad); 240 ceph_decode_need(p, end, struct_len, bad); 241 end = *p + struct_len; 242 } 243 244 ceph_decode_need(p, end, sizeof(**dirfrag), bad); 245 *dirfrag = *p; 246 *p += sizeof(**dirfrag) + sizeof(u32) * le32_to_cpu((*dirfrag)->ndist); 247 if (unlikely(*p > end)) 248 goto bad; 249 if (features == (u64)-1) 250 *p = end; 251 return 0; 252 bad: 253 return -EIO; 254 } 255 256 static int parse_reply_info_lease(void **p, void *end, 257 struct ceph_mds_reply_lease **lease, 258 u64 features) 259 { 260 if (features == (u64)-1) { 261 u8 struct_v, struct_compat; 262 u32 struct_len; 263 ceph_decode_8_safe(p, end, struct_v, bad); 264 ceph_decode_8_safe(p, end, struct_compat, bad); 265 /* struct_v is expected to be >= 1. we only understand 266 * encoding whose struct_compat == 1. */ 267 if (!struct_v || struct_compat != 1) 268 goto bad; 269 ceph_decode_32_safe(p, end, struct_len, bad); 270 ceph_decode_need(p, end, struct_len, bad); 271 end = *p + struct_len; 272 } 273 274 ceph_decode_need(p, end, sizeof(**lease), bad); 275 *lease = *p; 276 *p += sizeof(**lease); 277 if (features == (u64)-1) 278 *p = end; 279 return 0; 280 bad: 281 return -EIO; 282 } 283 284 /* 285 * parse a normal reply, which may contain a (dir+)dentry and/or a 286 * target inode. 287 */ 288 static int parse_reply_info_trace(void **p, void *end, 289 struct ceph_mds_reply_info_parsed *info, 290 u64 features) 291 { 292 int err; 293 294 if (info->head->is_dentry) { 295 err = parse_reply_info_in(p, end, &info->diri, features); 296 if (err < 0) 297 goto out_bad; 298 299 err = parse_reply_info_dir(p, end, &info->dirfrag, features); 300 if (err < 0) 301 goto out_bad; 302 303 ceph_decode_32_safe(p, end, info->dname_len, bad); 304 ceph_decode_need(p, end, info->dname_len, bad); 305 info->dname = *p; 306 *p += info->dname_len; 307 308 err = parse_reply_info_lease(p, end, &info->dlease, features); 309 if (err < 0) 310 goto out_bad; 311 } 312 313 if (info->head->is_target) { 314 err = parse_reply_info_in(p, end, &info->targeti, features); 315 if (err < 0) 316 goto out_bad; 317 } 318 319 if (unlikely(*p != end)) 320 goto bad; 321 return 0; 322 323 bad: 324 err = -EIO; 325 out_bad: 326 pr_err("problem parsing mds trace %d\n", err); 327 return err; 328 } 329 330 /* 331 * parse readdir results 332 */ 333 static int parse_reply_info_readdir(void **p, void *end, 334 struct ceph_mds_reply_info_parsed *info, 335 u64 features) 336 { 337 u32 num, i = 0; 338 int err; 339 340 err = parse_reply_info_dir(p, end, &info->dir_dir, features); 341 if (err < 0) 342 goto out_bad; 343 344 ceph_decode_need(p, end, sizeof(num) + 2, bad); 345 num = ceph_decode_32(p); 346 { 347 u16 flags = ceph_decode_16(p); 348 info->dir_end = !!(flags & CEPH_READDIR_FRAG_END); 349 info->dir_complete = !!(flags & CEPH_READDIR_FRAG_COMPLETE); 350 info->hash_order = !!(flags & CEPH_READDIR_HASH_ORDER); 351 info->offset_hash = !!(flags & CEPH_READDIR_OFFSET_HASH); 352 } 353 if (num == 0) 354 goto done; 355 356 BUG_ON(!info->dir_entries); 357 if ((unsigned long)(info->dir_entries + num) > 358 (unsigned long)info->dir_entries + info->dir_buf_size) { 359 pr_err("dir contents are larger than expected\n"); 360 WARN_ON(1); 361 goto bad; 362 } 363 
364 info->dir_nr = num; 365 while (num) { 366 struct ceph_mds_reply_dir_entry *rde = info->dir_entries + i; 367 /* dentry */ 368 ceph_decode_32_safe(p, end, rde->name_len, bad); 369 ceph_decode_need(p, end, rde->name_len, bad); 370 rde->name = *p; 371 *p += rde->name_len; 372 dout("parsed dir dname '%.*s'\n", rde->name_len, rde->name); 373 374 /* dentry lease */ 375 err = parse_reply_info_lease(p, end, &rde->lease, features); 376 if (err) 377 goto out_bad; 378 /* inode */ 379 err = parse_reply_info_in(p, end, &rde->inode, features); 380 if (err < 0) 381 goto out_bad; 382 /* ceph_readdir_prepopulate() will update it */ 383 rde->offset = 0; 384 i++; 385 num--; 386 } 387 388 done: 389 /* Skip over any unrecognized fields */ 390 *p = end; 391 return 0; 392 393 bad: 394 err = -EIO; 395 out_bad: 396 pr_err("problem parsing dir contents %d\n", err); 397 return err; 398 } 399 400 /* 401 * parse fcntl F_GETLK results 402 */ 403 static int parse_reply_info_filelock(void **p, void *end, 404 struct ceph_mds_reply_info_parsed *info, 405 u64 features) 406 { 407 if (*p + sizeof(*info->filelock_reply) > end) 408 goto bad; 409 410 info->filelock_reply = *p; 411 412 /* Skip over any unrecognized fields */ 413 *p = end; 414 return 0; 415 bad: 416 return -EIO; 417 } 418 419 420 #if BITS_PER_LONG == 64 421 422 #define DELEGATED_INO_AVAILABLE xa_mk_value(1) 423 424 static int ceph_parse_deleg_inos(void **p, void *end, 425 struct ceph_mds_session *s) 426 { 427 u32 sets; 428 429 ceph_decode_32_safe(p, end, sets, bad); 430 dout("got %u sets of delegated inodes\n", sets); 431 while (sets--) { 432 u64 start, len, ino; 433 434 ceph_decode_64_safe(p, end, start, bad); 435 ceph_decode_64_safe(p, end, len, bad); 436 while (len--) { 437 int err = xa_insert(&s->s_delegated_inos, ino = start++, 438 DELEGATED_INO_AVAILABLE, 439 GFP_KERNEL); 440 if (!err) { 441 dout("added delegated inode 0x%llx\n", 442 start - 1); 443 } else if (err == -EBUSY) { 444 pr_warn("ceph: MDS delegated inode 0x%llx more than once.\n", 445 start - 1); 446 } else { 447 return err; 448 } 449 } 450 } 451 return 0; 452 bad: 453 return -EIO; 454 } 455 456 u64 ceph_get_deleg_ino(struct ceph_mds_session *s) 457 { 458 unsigned long ino; 459 void *val; 460 461 xa_for_each(&s->s_delegated_inos, ino, val) { 462 val = xa_erase(&s->s_delegated_inos, ino); 463 if (val == DELEGATED_INO_AVAILABLE) 464 return ino; 465 } 466 return 0; 467 } 468 469 int ceph_restore_deleg_ino(struct ceph_mds_session *s, u64 ino) 470 { 471 return xa_insert(&s->s_delegated_inos, ino, DELEGATED_INO_AVAILABLE, 472 GFP_KERNEL); 473 } 474 #else /* BITS_PER_LONG == 64 */ 475 /* 476 * FIXME: xarrays can't handle 64-bit indexes on a 32-bit arch. For now, just 477 * ignore delegated_inos on 32 bit arch. Maybe eventually add xarrays for top 478 * and bottom words? 
479 */ 480 static int ceph_parse_deleg_inos(void **p, void *end, 481 struct ceph_mds_session *s) 482 { 483 u32 sets; 484 485 ceph_decode_32_safe(p, end, sets, bad); 486 if (sets) 487 ceph_decode_skip_n(p, end, sets * 2 * sizeof(__le64), bad); 488 return 0; 489 bad: 490 return -EIO; 491 } 492 493 u64 ceph_get_deleg_ino(struct ceph_mds_session *s) 494 { 495 return 0; 496 } 497 498 int ceph_restore_deleg_ino(struct ceph_mds_session *s, u64 ino) 499 { 500 return 0; 501 } 502 #endif /* BITS_PER_LONG == 64 */ 503 504 /* 505 * parse create results 506 */ 507 static int parse_reply_info_create(void **p, void *end, 508 struct ceph_mds_reply_info_parsed *info, 509 u64 features, struct ceph_mds_session *s) 510 { 511 int ret; 512 513 if (features == (u64)-1 || 514 (features & CEPH_FEATURE_REPLY_CREATE_INODE)) { 515 if (*p == end) { 516 /* Malformed reply? */ 517 info->has_create_ino = false; 518 } else if (test_bit(CEPHFS_FEATURE_DELEG_INO, &s->s_features)) { 519 u8 struct_v, struct_compat; 520 u32 len; 521 522 info->has_create_ino = true; 523 ceph_decode_8_safe(p, end, struct_v, bad); 524 ceph_decode_8_safe(p, end, struct_compat, bad); 525 ceph_decode_32_safe(p, end, len, bad); 526 ceph_decode_64_safe(p, end, info->ino, bad); 527 ret = ceph_parse_deleg_inos(p, end, s); 528 if (ret) 529 return ret; 530 } else { 531 /* legacy */ 532 ceph_decode_64_safe(p, end, info->ino, bad); 533 info->has_create_ino = true; 534 } 535 } else { 536 if (*p != end) 537 goto bad; 538 } 539 540 /* Skip over any unrecognized fields */ 541 *p = end; 542 return 0; 543 bad: 544 return -EIO; 545 } 546 547 /* 548 * parse extra results 549 */ 550 static int parse_reply_info_extra(void **p, void *end, 551 struct ceph_mds_reply_info_parsed *info, 552 u64 features, struct ceph_mds_session *s) 553 { 554 u32 op = le32_to_cpu(info->head->op); 555 556 if (op == CEPH_MDS_OP_GETFILELOCK) 557 return parse_reply_info_filelock(p, end, info, features); 558 else if (op == CEPH_MDS_OP_READDIR || op == CEPH_MDS_OP_LSSNAP) 559 return parse_reply_info_readdir(p, end, info, features); 560 else if (op == CEPH_MDS_OP_CREATE) 561 return parse_reply_info_create(p, end, info, features, s); 562 else 563 return -EIO; 564 } 565 566 /* 567 * parse entire mds reply 568 */ 569 static int parse_reply_info(struct ceph_mds_session *s, struct ceph_msg *msg, 570 struct ceph_mds_reply_info_parsed *info, 571 u64 features) 572 { 573 void *p, *end; 574 u32 len; 575 int err; 576 577 info->head = msg->front.iov_base; 578 p = msg->front.iov_base + sizeof(struct ceph_mds_reply_head); 579 end = p + msg->front.iov_len - sizeof(struct ceph_mds_reply_head); 580 581 /* trace */ 582 ceph_decode_32_safe(&p, end, len, bad); 583 if (len > 0) { 584 ceph_decode_need(&p, end, len, bad); 585 err = parse_reply_info_trace(&p, p+len, info, features); 586 if (err < 0) 587 goto out_bad; 588 } 589 590 /* extra */ 591 ceph_decode_32_safe(&p, end, len, bad); 592 if (len > 0) { 593 ceph_decode_need(&p, end, len, bad); 594 err = parse_reply_info_extra(&p, p+len, info, features, s); 595 if (err < 0) 596 goto out_bad; 597 } 598 599 /* snap blob */ 600 ceph_decode_32_safe(&p, end, len, bad); 601 info->snapblob_len = len; 602 info->snapblob = p; 603 p += len; 604 605 if (p != end) 606 goto bad; 607 return 0; 608 609 bad: 610 err = -EIO; 611 out_bad: 612 pr_err("mds parse_reply err %d\n", err); 613 return err; 614 } 615 616 static void destroy_reply_info(struct ceph_mds_reply_info_parsed *info) 617 { 618 if (!info->dir_entries) 619 return; 620 free_pages((unsigned long)info->dir_entries, 
get_order(info->dir_buf_size)); 621 } 622 623 624 /* 625 * sessions 626 */ 627 const char *ceph_session_state_name(int s) 628 { 629 switch (s) { 630 case CEPH_MDS_SESSION_NEW: return "new"; 631 case CEPH_MDS_SESSION_OPENING: return "opening"; 632 case CEPH_MDS_SESSION_OPEN: return "open"; 633 case CEPH_MDS_SESSION_HUNG: return "hung"; 634 case CEPH_MDS_SESSION_CLOSING: return "closing"; 635 case CEPH_MDS_SESSION_CLOSED: return "closed"; 636 case CEPH_MDS_SESSION_RESTARTING: return "restarting"; 637 case CEPH_MDS_SESSION_RECONNECTING: return "reconnecting"; 638 case CEPH_MDS_SESSION_REJECTED: return "rejected"; 639 default: return "???"; 640 } 641 } 642 643 struct ceph_mds_session *ceph_get_mds_session(struct ceph_mds_session *s) 644 { 645 if (refcount_inc_not_zero(&s->s_ref)) { 646 dout("mdsc get_session %p %d -> %d\n", s, 647 refcount_read(&s->s_ref)-1, refcount_read(&s->s_ref)); 648 return s; 649 } else { 650 dout("mdsc get_session %p 0 -- FAIL\n", s); 651 return NULL; 652 } 653 } 654 655 void ceph_put_mds_session(struct ceph_mds_session *s) 656 { 657 dout("mdsc put_session %p %d -> %d\n", s, 658 refcount_read(&s->s_ref), refcount_read(&s->s_ref)-1); 659 if (refcount_dec_and_test(&s->s_ref)) { 660 if (s->s_auth.authorizer) 661 ceph_auth_destroy_authorizer(s->s_auth.authorizer); 662 WARN_ON(mutex_is_locked(&s->s_mutex)); 663 xa_destroy(&s->s_delegated_inos); 664 kfree(s); 665 } 666 } 667 668 /* 669 * called under mdsc->mutex 670 */ 671 struct ceph_mds_session *__ceph_lookup_mds_session(struct ceph_mds_client *mdsc, 672 int mds) 673 { 674 if (mds >= mdsc->max_sessions || !mdsc->sessions[mds]) 675 return NULL; 676 return ceph_get_mds_session(mdsc->sessions[mds]); 677 } 678 679 static bool __have_session(struct ceph_mds_client *mdsc, int mds) 680 { 681 if (mds >= mdsc->max_sessions || !mdsc->sessions[mds]) 682 return false; 683 else 684 return true; 685 } 686 687 static int __verify_registered_session(struct ceph_mds_client *mdsc, 688 struct ceph_mds_session *s) 689 { 690 if (s->s_mds >= mdsc->max_sessions || 691 mdsc->sessions[s->s_mds] != s) 692 return -ENOENT; 693 return 0; 694 } 695 696 /* 697 * create+register a new session for given mds. 698 * called under mdsc->mutex. 
699 */ 700 static struct ceph_mds_session *register_session(struct ceph_mds_client *mdsc, 701 int mds) 702 { 703 struct ceph_mds_session *s; 704 705 if (mds >= mdsc->mdsmap->possible_max_rank) 706 return ERR_PTR(-EINVAL); 707 708 s = kzalloc(sizeof(*s), GFP_NOFS); 709 if (!s) 710 return ERR_PTR(-ENOMEM); 711 712 if (mds >= mdsc->max_sessions) { 713 int newmax = 1 << get_count_order(mds + 1); 714 struct ceph_mds_session **sa; 715 716 dout("%s: realloc to %d\n", __func__, newmax); 717 sa = kcalloc(newmax, sizeof(void *), GFP_NOFS); 718 if (!sa) 719 goto fail_realloc; 720 if (mdsc->sessions) { 721 memcpy(sa, mdsc->sessions, 722 mdsc->max_sessions * sizeof(void *)); 723 kfree(mdsc->sessions); 724 } 725 mdsc->sessions = sa; 726 mdsc->max_sessions = newmax; 727 } 728 729 dout("%s: mds%d\n", __func__, mds); 730 s->s_mdsc = mdsc; 731 s->s_mds = mds; 732 s->s_state = CEPH_MDS_SESSION_NEW; 733 s->s_ttl = 0; 734 s->s_seq = 0; 735 mutex_init(&s->s_mutex); 736 737 ceph_con_init(&s->s_con, s, &mds_con_ops, &mdsc->fsc->client->msgr); 738 739 spin_lock_init(&s->s_gen_ttl_lock); 740 s->s_cap_gen = 1; 741 s->s_cap_ttl = jiffies - 1; 742 743 spin_lock_init(&s->s_cap_lock); 744 s->s_renew_requested = 0; 745 s->s_renew_seq = 0; 746 INIT_LIST_HEAD(&s->s_caps); 747 s->s_nr_caps = 0; 748 refcount_set(&s->s_ref, 1); 749 INIT_LIST_HEAD(&s->s_waiting); 750 INIT_LIST_HEAD(&s->s_unsafe); 751 xa_init(&s->s_delegated_inos); 752 s->s_num_cap_releases = 0; 753 s->s_cap_reconnect = 0; 754 s->s_cap_iterator = NULL; 755 INIT_LIST_HEAD(&s->s_cap_releases); 756 INIT_WORK(&s->s_cap_release_work, ceph_cap_release_work); 757 758 INIT_LIST_HEAD(&s->s_cap_dirty); 759 INIT_LIST_HEAD(&s->s_cap_flushing); 760 761 mdsc->sessions[mds] = s; 762 atomic_inc(&mdsc->num_sessions); 763 refcount_inc(&s->s_ref); /* one ref to sessions[], one to caller */ 764 765 ceph_con_open(&s->s_con, CEPH_ENTITY_TYPE_MDS, mds, 766 ceph_mdsmap_get_addr(mdsc->mdsmap, mds)); 767 768 return s; 769 770 fail_realloc: 771 kfree(s); 772 return ERR_PTR(-ENOMEM); 773 } 774 775 /* 776 * called under mdsc->mutex 777 */ 778 static void __unregister_session(struct ceph_mds_client *mdsc, 779 struct ceph_mds_session *s) 780 { 781 dout("__unregister_session mds%d %p\n", s->s_mds, s); 782 BUG_ON(mdsc->sessions[s->s_mds] != s); 783 mdsc->sessions[s->s_mds] = NULL; 784 ceph_con_close(&s->s_con); 785 ceph_put_mds_session(s); 786 atomic_dec(&mdsc->num_sessions); 787 } 788 789 /* 790 * drop session refs in request. 
 *
 * should be last request ref, or hold mdsc->mutex
 */
static void put_request_session(struct ceph_mds_request *req)
{
	if (req->r_session) {
		ceph_put_mds_session(req->r_session);
		req->r_session = NULL;
	}
}

void ceph_mdsc_release_request(struct kref *kref)
{
	struct ceph_mds_request *req = container_of(kref,
						    struct ceph_mds_request,
						    r_kref);
	ceph_mdsc_release_dir_caps_no_check(req);
	destroy_reply_info(&req->r_reply_info);
	if (req->r_request)
		ceph_msg_put(req->r_request);
	if (req->r_reply)
		ceph_msg_put(req->r_reply);
	if (req->r_inode) {
		ceph_put_cap_refs(ceph_inode(req->r_inode), CEPH_CAP_PIN);
		/* avoid calling iput_final() in mds dispatch threads */
		ceph_async_iput(req->r_inode);
	}
	if (req->r_parent) {
		ceph_put_cap_refs(ceph_inode(req->r_parent), CEPH_CAP_PIN);
		ceph_async_iput(req->r_parent);
	}
	ceph_async_iput(req->r_target_inode);
	if (req->r_dentry)
		dput(req->r_dentry);
	if (req->r_old_dentry)
		dput(req->r_old_dentry);
	if (req->r_old_dentry_dir) {
		/*
		 * track (and drop pins for) r_old_dentry_dir
		 * separately, since r_old_dentry's d_parent may have
		 * changed between the dir mutex being dropped and
		 * this request being freed.
		 */
		ceph_put_cap_refs(ceph_inode(req->r_old_dentry_dir),
				  CEPH_CAP_PIN);
		ceph_async_iput(req->r_old_dentry_dir);
	}
	kfree(req->r_path1);
	kfree(req->r_path2);
	if (req->r_pagelist)
		ceph_pagelist_release(req->r_pagelist);
	put_request_session(req);
	ceph_unreserve_caps(req->r_mdsc, &req->r_caps_reservation);
	WARN_ON_ONCE(!list_empty(&req->r_wait));
	kmem_cache_free(ceph_mds_request_cachep, req);
}

DEFINE_RB_FUNCS(request, struct ceph_mds_request, r_tid, r_node)

/*
 * lookup request, bump ref if found.
 *
 * called under mdsc->mutex.
 */
static struct ceph_mds_request *
lookup_get_request(struct ceph_mds_client *mdsc, u64 tid)
{
	struct ceph_mds_request *req;

	req = lookup_request(&mdsc->request_tree, tid);
	if (req)
		ceph_mdsc_get_request(req);

	return req;
}

/*
 * Register an in-flight request, and assign a tid. Link to the
 * directory we are modifying (if any).
 *
 * Called under mdsc->mutex.
872 */ 873 static void __register_request(struct ceph_mds_client *mdsc, 874 struct ceph_mds_request *req, 875 struct inode *dir) 876 { 877 int ret = 0; 878 879 req->r_tid = ++mdsc->last_tid; 880 if (req->r_num_caps) { 881 ret = ceph_reserve_caps(mdsc, &req->r_caps_reservation, 882 req->r_num_caps); 883 if (ret < 0) { 884 pr_err("__register_request %p " 885 "failed to reserve caps: %d\n", req, ret); 886 /* set req->r_err to fail early from __do_request */ 887 req->r_err = ret; 888 return; 889 } 890 } 891 dout("__register_request %p tid %lld\n", req, req->r_tid); 892 ceph_mdsc_get_request(req); 893 insert_request(&mdsc->request_tree, req); 894 895 req->r_uid = current_fsuid(); 896 req->r_gid = current_fsgid(); 897 898 if (mdsc->oldest_tid == 0 && req->r_op != CEPH_MDS_OP_SETFILELOCK) 899 mdsc->oldest_tid = req->r_tid; 900 901 if (dir) { 902 struct ceph_inode_info *ci = ceph_inode(dir); 903 904 ihold(dir); 905 req->r_unsafe_dir = dir; 906 spin_lock(&ci->i_unsafe_lock); 907 list_add_tail(&req->r_unsafe_dir_item, &ci->i_unsafe_dirops); 908 spin_unlock(&ci->i_unsafe_lock); 909 } 910 } 911 912 static void __unregister_request(struct ceph_mds_client *mdsc, 913 struct ceph_mds_request *req) 914 { 915 dout("__unregister_request %p tid %lld\n", req, req->r_tid); 916 917 /* Never leave an unregistered request on an unsafe list! */ 918 list_del_init(&req->r_unsafe_item); 919 920 if (req->r_tid == mdsc->oldest_tid) { 921 struct rb_node *p = rb_next(&req->r_node); 922 mdsc->oldest_tid = 0; 923 while (p) { 924 struct ceph_mds_request *next_req = 925 rb_entry(p, struct ceph_mds_request, r_node); 926 if (next_req->r_op != CEPH_MDS_OP_SETFILELOCK) { 927 mdsc->oldest_tid = next_req->r_tid; 928 break; 929 } 930 p = rb_next(p); 931 } 932 } 933 934 erase_request(&mdsc->request_tree, req); 935 936 if (req->r_unsafe_dir) { 937 struct ceph_inode_info *ci = ceph_inode(req->r_unsafe_dir); 938 spin_lock(&ci->i_unsafe_lock); 939 list_del_init(&req->r_unsafe_dir_item); 940 spin_unlock(&ci->i_unsafe_lock); 941 } 942 if (req->r_target_inode && 943 test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) { 944 struct ceph_inode_info *ci = ceph_inode(req->r_target_inode); 945 spin_lock(&ci->i_unsafe_lock); 946 list_del_init(&req->r_unsafe_target_item); 947 spin_unlock(&ci->i_unsafe_lock); 948 } 949 950 if (req->r_unsafe_dir) { 951 /* avoid calling iput_final() in mds dispatch threads */ 952 ceph_async_iput(req->r_unsafe_dir); 953 req->r_unsafe_dir = NULL; 954 } 955 956 complete_all(&req->r_safe_completion); 957 958 ceph_mdsc_put_request(req); 959 } 960 961 /* 962 * Walk back up the dentry tree until we hit a dentry representing a 963 * non-snapshot inode. We do this using the rcu_read_lock (which must be held 964 * when calling this) to ensure that the objects won't disappear while we're 965 * working with them. Once we hit a candidate dentry, we attempt to take a 966 * reference to it, and return that as the result. 967 */ 968 static struct inode *get_nonsnap_parent(struct dentry *dentry) 969 { 970 struct inode *inode = NULL; 971 972 while (dentry && !IS_ROOT(dentry)) { 973 inode = d_inode_rcu(dentry); 974 if (!inode || ceph_snap(inode) == CEPH_NOSNAP) 975 break; 976 dentry = dentry->d_parent; 977 } 978 if (inode) 979 inode = igrab(inode); 980 return inode; 981 } 982 983 /* 984 * Choose mds to send request to next. If there is a hint set in the 985 * request (e.g., due to a prior forward hint from the mds), use that. 986 * Otherwise, consult frag tree and/or caps to identify the 987 * appropriate mds. 
If all else fails, choose randomly. 988 * 989 * Called under mdsc->mutex. 990 */ 991 static int __choose_mds(struct ceph_mds_client *mdsc, 992 struct ceph_mds_request *req, 993 bool *random) 994 { 995 struct inode *inode; 996 struct ceph_inode_info *ci; 997 struct ceph_cap *cap; 998 int mode = req->r_direct_mode; 999 int mds = -1; 1000 u32 hash = req->r_direct_hash; 1001 bool is_hash = test_bit(CEPH_MDS_R_DIRECT_IS_HASH, &req->r_req_flags); 1002 1003 if (random) 1004 *random = false; 1005 1006 /* 1007 * is there a specific mds we should try? ignore hint if we have 1008 * no session and the mds is not up (active or recovering). 1009 */ 1010 if (req->r_resend_mds >= 0 && 1011 (__have_session(mdsc, req->r_resend_mds) || 1012 ceph_mdsmap_get_state(mdsc->mdsmap, req->r_resend_mds) > 0)) { 1013 dout("%s using resend_mds mds%d\n", __func__, 1014 req->r_resend_mds); 1015 return req->r_resend_mds; 1016 } 1017 1018 if (mode == USE_RANDOM_MDS) 1019 goto random; 1020 1021 inode = NULL; 1022 if (req->r_inode) { 1023 if (ceph_snap(req->r_inode) != CEPH_SNAPDIR) { 1024 inode = req->r_inode; 1025 ihold(inode); 1026 } else { 1027 /* req->r_dentry is non-null for LSSNAP request */ 1028 rcu_read_lock(); 1029 inode = get_nonsnap_parent(req->r_dentry); 1030 rcu_read_unlock(); 1031 dout("%s using snapdir's parent %p\n", __func__, inode); 1032 } 1033 } else if (req->r_dentry) { 1034 /* ignore race with rename; old or new d_parent is okay */ 1035 struct dentry *parent; 1036 struct inode *dir; 1037 1038 rcu_read_lock(); 1039 parent = READ_ONCE(req->r_dentry->d_parent); 1040 dir = req->r_parent ? : d_inode_rcu(parent); 1041 1042 if (!dir || dir->i_sb != mdsc->fsc->sb) { 1043 /* not this fs or parent went negative */ 1044 inode = d_inode(req->r_dentry); 1045 if (inode) 1046 ihold(inode); 1047 } else if (ceph_snap(dir) != CEPH_NOSNAP) { 1048 /* direct snapped/virtual snapdir requests 1049 * based on parent dir inode */ 1050 inode = get_nonsnap_parent(parent); 1051 dout("%s using nonsnap parent %p\n", __func__, inode); 1052 } else { 1053 /* dentry target */ 1054 inode = d_inode(req->r_dentry); 1055 if (!inode || mode == USE_AUTH_MDS) { 1056 /* dir + name */ 1057 inode = igrab(dir); 1058 hash = ceph_dentry_hash(dir, req->r_dentry); 1059 is_hash = true; 1060 } else { 1061 ihold(inode); 1062 } 1063 } 1064 rcu_read_unlock(); 1065 } 1066 1067 dout("%s %p is_hash=%d (0x%x) mode %d\n", __func__, inode, (int)is_hash, 1068 hash, mode); 1069 if (!inode) 1070 goto random; 1071 ci = ceph_inode(inode); 1072 1073 if (is_hash && S_ISDIR(inode->i_mode)) { 1074 struct ceph_inode_frag frag; 1075 int found; 1076 1077 ceph_choose_frag(ci, hash, &frag, &found); 1078 if (found) { 1079 if (mode == USE_ANY_MDS && frag.ndist > 0) { 1080 u8 r; 1081 1082 /* choose a random replica */ 1083 get_random_bytes(&r, 1); 1084 r %= frag.ndist; 1085 mds = frag.dist[r]; 1086 dout("%s %p %llx.%llx frag %u mds%d (%d/%d)\n", 1087 __func__, inode, ceph_vinop(inode), 1088 frag.frag, mds, (int)r, frag.ndist); 1089 if (ceph_mdsmap_get_state(mdsc->mdsmap, mds) >= 1090 CEPH_MDS_STATE_ACTIVE && 1091 !ceph_mdsmap_is_laggy(mdsc->mdsmap, mds)) 1092 goto out; 1093 } 1094 1095 /* since this file/dir wasn't known to be 1096 * replicated, then we want to look for the 1097 * authoritative mds. 
*/ 1098 if (frag.mds >= 0) { 1099 /* choose auth mds */ 1100 mds = frag.mds; 1101 dout("%s %p %llx.%llx frag %u mds%d (auth)\n", 1102 __func__, inode, ceph_vinop(inode), 1103 frag.frag, mds); 1104 if (ceph_mdsmap_get_state(mdsc->mdsmap, mds) >= 1105 CEPH_MDS_STATE_ACTIVE) { 1106 if (mode == USE_ANY_MDS && 1107 !ceph_mdsmap_is_laggy(mdsc->mdsmap, 1108 mds)) 1109 goto out; 1110 } 1111 } 1112 mode = USE_AUTH_MDS; 1113 } 1114 } 1115 1116 spin_lock(&ci->i_ceph_lock); 1117 cap = NULL; 1118 if (mode == USE_AUTH_MDS) 1119 cap = ci->i_auth_cap; 1120 if (!cap && !RB_EMPTY_ROOT(&ci->i_caps)) 1121 cap = rb_entry(rb_first(&ci->i_caps), struct ceph_cap, ci_node); 1122 if (!cap) { 1123 spin_unlock(&ci->i_ceph_lock); 1124 ceph_async_iput(inode); 1125 goto random; 1126 } 1127 mds = cap->session->s_mds; 1128 dout("%s %p %llx.%llx mds%d (%scap %p)\n", __func__, 1129 inode, ceph_vinop(inode), mds, 1130 cap == ci->i_auth_cap ? "auth " : "", cap); 1131 spin_unlock(&ci->i_ceph_lock); 1132 out: 1133 /* avoid calling iput_final() while holding mdsc->mutex or 1134 * in mds dispatch threads */ 1135 ceph_async_iput(inode); 1136 return mds; 1137 1138 random: 1139 if (random) 1140 *random = true; 1141 1142 mds = ceph_mdsmap_get_random_mds(mdsc->mdsmap); 1143 dout("%s chose random mds%d\n", __func__, mds); 1144 return mds; 1145 } 1146 1147 1148 /* 1149 * session messages 1150 */ 1151 static struct ceph_msg *create_session_msg(u32 op, u64 seq) 1152 { 1153 struct ceph_msg *msg; 1154 struct ceph_mds_session_head *h; 1155 1156 msg = ceph_msg_new(CEPH_MSG_CLIENT_SESSION, sizeof(*h), GFP_NOFS, 1157 false); 1158 if (!msg) { 1159 pr_err("create_session_msg ENOMEM creating msg\n"); 1160 return NULL; 1161 } 1162 h = msg->front.iov_base; 1163 h->op = cpu_to_le32(op); 1164 h->seq = cpu_to_le64(seq); 1165 1166 return msg; 1167 } 1168 1169 static const unsigned char feature_bits[] = CEPHFS_FEATURES_CLIENT_SUPPORTED; 1170 #define FEATURE_BYTES(c) (DIV_ROUND_UP((size_t)feature_bits[c - 1] + 1, 64) * 8) 1171 static int encode_supported_features(void **p, void *end) 1172 { 1173 static const size_t count = ARRAY_SIZE(feature_bits); 1174 1175 if (count > 0) { 1176 size_t i; 1177 size_t size = FEATURE_BYTES(count); 1178 1179 if (WARN_ON_ONCE(*p + 4 + size > end)) 1180 return -ERANGE; 1181 1182 ceph_encode_32(p, size); 1183 memset(*p, 0, size); 1184 for (i = 0; i < count; i++) 1185 ((unsigned char*)(*p))[i / 8] |= BIT(feature_bits[i] % 8); 1186 *p += size; 1187 } else { 1188 if (WARN_ON_ONCE(*p + 4 > end)) 1189 return -ERANGE; 1190 1191 ceph_encode_32(p, 0); 1192 } 1193 1194 return 0; 1195 } 1196 1197 /* 1198 * session message, specialization for CEPH_SESSION_REQUEST_OPEN 1199 * to include additional client metadata fields. 1200 */ 1201 static struct ceph_msg *create_session_open_msg(struct ceph_mds_client *mdsc, u64 seq) 1202 { 1203 struct ceph_msg *msg; 1204 struct ceph_mds_session_head *h; 1205 int i = -1; 1206 int extra_bytes = 0; 1207 int metadata_key_count = 0; 1208 struct ceph_options *opt = mdsc->fsc->client->options; 1209 struct ceph_mount_options *fsopt = mdsc->fsc->mount_options; 1210 size_t size, count; 1211 void *p, *end; 1212 int ret; 1213 1214 const char* metadata[][2] = { 1215 {"hostname", mdsc->nodename}, 1216 {"kernel_version", init_utsname()->release}, 1217 {"entity_id", opt->name ? : ""}, 1218 {"root", fsopt->server_path ? 
: "/"}, 1219 {NULL, NULL} 1220 }; 1221 1222 /* Calculate serialized length of metadata */ 1223 extra_bytes = 4; /* map length */ 1224 for (i = 0; metadata[i][0]; ++i) { 1225 extra_bytes += 8 + strlen(metadata[i][0]) + 1226 strlen(metadata[i][1]); 1227 metadata_key_count++; 1228 } 1229 1230 /* supported feature */ 1231 size = 0; 1232 count = ARRAY_SIZE(feature_bits); 1233 if (count > 0) 1234 size = FEATURE_BYTES(count); 1235 extra_bytes += 4 + size; 1236 1237 /* Allocate the message */ 1238 msg = ceph_msg_new(CEPH_MSG_CLIENT_SESSION, sizeof(*h) + extra_bytes, 1239 GFP_NOFS, false); 1240 if (!msg) { 1241 pr_err("create_session_msg ENOMEM creating msg\n"); 1242 return ERR_PTR(-ENOMEM); 1243 } 1244 p = msg->front.iov_base; 1245 end = p + msg->front.iov_len; 1246 1247 h = p; 1248 h->op = cpu_to_le32(CEPH_SESSION_REQUEST_OPEN); 1249 h->seq = cpu_to_le64(seq); 1250 1251 /* 1252 * Serialize client metadata into waiting buffer space, using 1253 * the format that userspace expects for map<string, string> 1254 * 1255 * ClientSession messages with metadata are v3 1256 */ 1257 msg->hdr.version = cpu_to_le16(3); 1258 msg->hdr.compat_version = cpu_to_le16(1); 1259 1260 /* The write pointer, following the session_head structure */ 1261 p += sizeof(*h); 1262 1263 /* Number of entries in the map */ 1264 ceph_encode_32(&p, metadata_key_count); 1265 1266 /* Two length-prefixed strings for each entry in the map */ 1267 for (i = 0; metadata[i][0]; ++i) { 1268 size_t const key_len = strlen(metadata[i][0]); 1269 size_t const val_len = strlen(metadata[i][1]); 1270 1271 ceph_encode_32(&p, key_len); 1272 memcpy(p, metadata[i][0], key_len); 1273 p += key_len; 1274 ceph_encode_32(&p, val_len); 1275 memcpy(p, metadata[i][1], val_len); 1276 p += val_len; 1277 } 1278 1279 ret = encode_supported_features(&p, end); 1280 if (ret) { 1281 pr_err("encode_supported_features failed!\n"); 1282 ceph_msg_put(msg); 1283 return ERR_PTR(ret); 1284 } 1285 1286 msg->front.iov_len = p - msg->front.iov_base; 1287 msg->hdr.front_len = cpu_to_le32(msg->front.iov_len); 1288 1289 return msg; 1290 } 1291 1292 /* 1293 * send session open request. 1294 * 1295 * called under mdsc->mutex 1296 */ 1297 static int __open_session(struct ceph_mds_client *mdsc, 1298 struct ceph_mds_session *session) 1299 { 1300 struct ceph_msg *msg; 1301 int mstate; 1302 int mds = session->s_mds; 1303 1304 /* wait for mds to go active? 
*/ 1305 mstate = ceph_mdsmap_get_state(mdsc->mdsmap, mds); 1306 dout("open_session to mds%d (%s)\n", mds, 1307 ceph_mds_state_name(mstate)); 1308 session->s_state = CEPH_MDS_SESSION_OPENING; 1309 session->s_renew_requested = jiffies; 1310 1311 /* send connect message */ 1312 msg = create_session_open_msg(mdsc, session->s_seq); 1313 if (IS_ERR(msg)) 1314 return PTR_ERR(msg); 1315 ceph_con_send(&session->s_con, msg); 1316 return 0; 1317 } 1318 1319 /* 1320 * open sessions for any export targets for the given mds 1321 * 1322 * called under mdsc->mutex 1323 */ 1324 static struct ceph_mds_session * 1325 __open_export_target_session(struct ceph_mds_client *mdsc, int target) 1326 { 1327 struct ceph_mds_session *session; 1328 int ret; 1329 1330 session = __ceph_lookup_mds_session(mdsc, target); 1331 if (!session) { 1332 session = register_session(mdsc, target); 1333 if (IS_ERR(session)) 1334 return session; 1335 } 1336 if (session->s_state == CEPH_MDS_SESSION_NEW || 1337 session->s_state == CEPH_MDS_SESSION_CLOSING) { 1338 ret = __open_session(mdsc, session); 1339 if (ret) 1340 return ERR_PTR(ret); 1341 } 1342 1343 return session; 1344 } 1345 1346 struct ceph_mds_session * 1347 ceph_mdsc_open_export_target_session(struct ceph_mds_client *mdsc, int target) 1348 { 1349 struct ceph_mds_session *session; 1350 1351 dout("open_export_target_session to mds%d\n", target); 1352 1353 mutex_lock(&mdsc->mutex); 1354 session = __open_export_target_session(mdsc, target); 1355 mutex_unlock(&mdsc->mutex); 1356 1357 return session; 1358 } 1359 1360 static void __open_export_target_sessions(struct ceph_mds_client *mdsc, 1361 struct ceph_mds_session *session) 1362 { 1363 struct ceph_mds_info *mi; 1364 struct ceph_mds_session *ts; 1365 int i, mds = session->s_mds; 1366 1367 if (mds >= mdsc->mdsmap->possible_max_rank) 1368 return; 1369 1370 mi = &mdsc->mdsmap->m_info[mds]; 1371 dout("open_export_target_sessions for mds%d (%d targets)\n", 1372 session->s_mds, mi->num_export_targets); 1373 1374 for (i = 0; i < mi->num_export_targets; i++) { 1375 ts = __open_export_target_session(mdsc, mi->export_targets[i]); 1376 if (!IS_ERR(ts)) 1377 ceph_put_mds_session(ts); 1378 } 1379 } 1380 1381 void ceph_mdsc_open_export_target_sessions(struct ceph_mds_client *mdsc, 1382 struct ceph_mds_session *session) 1383 { 1384 mutex_lock(&mdsc->mutex); 1385 __open_export_target_sessions(mdsc, session); 1386 mutex_unlock(&mdsc->mutex); 1387 } 1388 1389 /* 1390 * session caps 1391 */ 1392 1393 static void detach_cap_releases(struct ceph_mds_session *session, 1394 struct list_head *target) 1395 { 1396 lockdep_assert_held(&session->s_cap_lock); 1397 1398 list_splice_init(&session->s_cap_releases, target); 1399 session->s_num_cap_releases = 0; 1400 dout("dispose_cap_releases mds%d\n", session->s_mds); 1401 } 1402 1403 static void dispose_cap_releases(struct ceph_mds_client *mdsc, 1404 struct list_head *dispose) 1405 { 1406 while (!list_empty(dispose)) { 1407 struct ceph_cap *cap; 1408 /* zero out the in-progress message */ 1409 cap = list_first_entry(dispose, struct ceph_cap, session_caps); 1410 list_del(&cap->session_caps); 1411 ceph_put_cap(mdsc, cap); 1412 } 1413 } 1414 1415 static void cleanup_session_requests(struct ceph_mds_client *mdsc, 1416 struct ceph_mds_session *session) 1417 { 1418 struct ceph_mds_request *req; 1419 struct rb_node *p; 1420 struct ceph_inode_info *ci; 1421 1422 dout("cleanup_session_requests mds%d\n", session->s_mds); 1423 mutex_lock(&mdsc->mutex); 1424 while (!list_empty(&session->s_unsafe)) { 1425 req = 
list_first_entry(&session->s_unsafe, 1426 struct ceph_mds_request, r_unsafe_item); 1427 pr_warn_ratelimited(" dropping unsafe request %llu\n", 1428 req->r_tid); 1429 if (req->r_target_inode) { 1430 /* dropping unsafe change of inode's attributes */ 1431 ci = ceph_inode(req->r_target_inode); 1432 errseq_set(&ci->i_meta_err, -EIO); 1433 } 1434 if (req->r_unsafe_dir) { 1435 /* dropping unsafe directory operation */ 1436 ci = ceph_inode(req->r_unsafe_dir); 1437 errseq_set(&ci->i_meta_err, -EIO); 1438 } 1439 __unregister_request(mdsc, req); 1440 } 1441 /* zero r_attempts, so kick_requests() will re-send requests */ 1442 p = rb_first(&mdsc->request_tree); 1443 while (p) { 1444 req = rb_entry(p, struct ceph_mds_request, r_node); 1445 p = rb_next(p); 1446 if (req->r_session && 1447 req->r_session->s_mds == session->s_mds) 1448 req->r_attempts = 0; 1449 } 1450 mutex_unlock(&mdsc->mutex); 1451 } 1452 1453 /* 1454 * Helper to safely iterate over all caps associated with a session, with 1455 * special care taken to handle a racing __ceph_remove_cap(). 1456 * 1457 * Caller must hold session s_mutex. 1458 */ 1459 int ceph_iterate_session_caps(struct ceph_mds_session *session, 1460 int (*cb)(struct inode *, struct ceph_cap *, 1461 void *), void *arg) 1462 { 1463 struct list_head *p; 1464 struct ceph_cap *cap; 1465 struct inode *inode, *last_inode = NULL; 1466 struct ceph_cap *old_cap = NULL; 1467 int ret; 1468 1469 dout("iterate_session_caps %p mds%d\n", session, session->s_mds); 1470 spin_lock(&session->s_cap_lock); 1471 p = session->s_caps.next; 1472 while (p != &session->s_caps) { 1473 cap = list_entry(p, struct ceph_cap, session_caps); 1474 inode = igrab(&cap->ci->vfs_inode); 1475 if (!inode) { 1476 p = p->next; 1477 continue; 1478 } 1479 session->s_cap_iterator = cap; 1480 spin_unlock(&session->s_cap_lock); 1481 1482 if (last_inode) { 1483 /* avoid calling iput_final() while holding 1484 * s_mutex or in mds dispatch threads */ 1485 ceph_async_iput(last_inode); 1486 last_inode = NULL; 1487 } 1488 if (old_cap) { 1489 ceph_put_cap(session->s_mdsc, old_cap); 1490 old_cap = NULL; 1491 } 1492 1493 ret = cb(inode, cap, arg); 1494 last_inode = inode; 1495 1496 spin_lock(&session->s_cap_lock); 1497 p = p->next; 1498 if (!cap->ci) { 1499 dout("iterate_session_caps finishing cap %p removal\n", 1500 cap); 1501 BUG_ON(cap->session != session); 1502 cap->session = NULL; 1503 list_del_init(&cap->session_caps); 1504 session->s_nr_caps--; 1505 atomic64_dec(&session->s_mdsc->metric.total_caps); 1506 if (cap->queue_release) 1507 __ceph_queue_cap_release(session, cap); 1508 else 1509 old_cap = cap; /* put_cap it w/o locks held */ 1510 } 1511 if (ret < 0) 1512 goto out; 1513 } 1514 ret = 0; 1515 out: 1516 session->s_cap_iterator = NULL; 1517 spin_unlock(&session->s_cap_lock); 1518 1519 ceph_async_iput(last_inode); 1520 if (old_cap) 1521 ceph_put_cap(session->s_mdsc, old_cap); 1522 1523 return ret; 1524 } 1525 1526 static int remove_session_caps_cb(struct inode *inode, struct ceph_cap *cap, 1527 void *arg) 1528 { 1529 struct ceph_fs_client *fsc = (struct ceph_fs_client *)arg; 1530 struct ceph_inode_info *ci = ceph_inode(inode); 1531 LIST_HEAD(to_remove); 1532 bool dirty_dropped = false; 1533 bool invalidate = false; 1534 1535 dout("removing cap %p, ci is %p, inode is %p\n", 1536 cap, ci, &ci->vfs_inode); 1537 spin_lock(&ci->i_ceph_lock); 1538 __ceph_remove_cap(cap, false); 1539 if (!ci->i_auth_cap) { 1540 struct ceph_cap_flush *cf; 1541 struct ceph_mds_client *mdsc = fsc->mdsc; 1542 1543 if (READ_ONCE(fsc->mount_state) 
== CEPH_MOUNT_SHUTDOWN) { 1544 if (inode->i_data.nrpages > 0) 1545 invalidate = true; 1546 if (ci->i_wrbuffer_ref > 0) 1547 mapping_set_error(&inode->i_data, -EIO); 1548 } 1549 1550 while (!list_empty(&ci->i_cap_flush_list)) { 1551 cf = list_first_entry(&ci->i_cap_flush_list, 1552 struct ceph_cap_flush, i_list); 1553 list_move(&cf->i_list, &to_remove); 1554 } 1555 1556 spin_lock(&mdsc->cap_dirty_lock); 1557 1558 list_for_each_entry(cf, &to_remove, i_list) 1559 list_del(&cf->g_list); 1560 1561 if (!list_empty(&ci->i_dirty_item)) { 1562 pr_warn_ratelimited( 1563 " dropping dirty %s state for %p %lld\n", 1564 ceph_cap_string(ci->i_dirty_caps), 1565 inode, ceph_ino(inode)); 1566 ci->i_dirty_caps = 0; 1567 list_del_init(&ci->i_dirty_item); 1568 dirty_dropped = true; 1569 } 1570 if (!list_empty(&ci->i_flushing_item)) { 1571 pr_warn_ratelimited( 1572 " dropping dirty+flushing %s state for %p %lld\n", 1573 ceph_cap_string(ci->i_flushing_caps), 1574 inode, ceph_ino(inode)); 1575 ci->i_flushing_caps = 0; 1576 list_del_init(&ci->i_flushing_item); 1577 mdsc->num_cap_flushing--; 1578 dirty_dropped = true; 1579 } 1580 spin_unlock(&mdsc->cap_dirty_lock); 1581 1582 if (dirty_dropped) { 1583 errseq_set(&ci->i_meta_err, -EIO); 1584 1585 if (ci->i_wrbuffer_ref_head == 0 && 1586 ci->i_wr_ref == 0 && 1587 ci->i_dirty_caps == 0 && 1588 ci->i_flushing_caps == 0) { 1589 ceph_put_snap_context(ci->i_head_snapc); 1590 ci->i_head_snapc = NULL; 1591 } 1592 } 1593 1594 if (atomic_read(&ci->i_filelock_ref) > 0) { 1595 /* make further file lock syscall return -EIO */ 1596 ci->i_ceph_flags |= CEPH_I_ERROR_FILELOCK; 1597 pr_warn_ratelimited(" dropping file locks for %p %lld\n", 1598 inode, ceph_ino(inode)); 1599 } 1600 1601 if (!ci->i_dirty_caps && ci->i_prealloc_cap_flush) { 1602 list_add(&ci->i_prealloc_cap_flush->i_list, &to_remove); 1603 ci->i_prealloc_cap_flush = NULL; 1604 } 1605 } 1606 spin_unlock(&ci->i_ceph_lock); 1607 while (!list_empty(&to_remove)) { 1608 struct ceph_cap_flush *cf; 1609 cf = list_first_entry(&to_remove, 1610 struct ceph_cap_flush, i_list); 1611 list_del(&cf->i_list); 1612 ceph_free_cap_flush(cf); 1613 } 1614 1615 wake_up_all(&ci->i_cap_wq); 1616 if (invalidate) 1617 ceph_queue_invalidate(inode); 1618 if (dirty_dropped) 1619 iput(inode); 1620 return 0; 1621 } 1622 1623 /* 1624 * caller must hold session s_mutex 1625 */ 1626 static void remove_session_caps(struct ceph_mds_session *session) 1627 { 1628 struct ceph_fs_client *fsc = session->s_mdsc->fsc; 1629 struct super_block *sb = fsc->sb; 1630 LIST_HEAD(dispose); 1631 1632 dout("remove_session_caps on %p\n", session); 1633 ceph_iterate_session_caps(session, remove_session_caps_cb, fsc); 1634 1635 wake_up_all(&fsc->mdsc->cap_flushing_wq); 1636 1637 spin_lock(&session->s_cap_lock); 1638 if (session->s_nr_caps > 0) { 1639 struct inode *inode; 1640 struct ceph_cap *cap, *prev = NULL; 1641 struct ceph_vino vino; 1642 /* 1643 * iterate_session_caps() skips inodes that are being 1644 * deleted, we need to wait until deletions are complete. 1645 * __wait_on_freeing_inode() is designed for the job, 1646 * but it is not exported, so use lookup inode function 1647 * to access it. 
1648 */ 1649 while (!list_empty(&session->s_caps)) { 1650 cap = list_entry(session->s_caps.next, 1651 struct ceph_cap, session_caps); 1652 if (cap == prev) 1653 break; 1654 prev = cap; 1655 vino = cap->ci->i_vino; 1656 spin_unlock(&session->s_cap_lock); 1657 1658 inode = ceph_find_inode(sb, vino); 1659 /* avoid calling iput_final() while holding s_mutex */ 1660 ceph_async_iput(inode); 1661 1662 spin_lock(&session->s_cap_lock); 1663 } 1664 } 1665 1666 // drop cap expires and unlock s_cap_lock 1667 detach_cap_releases(session, &dispose); 1668 1669 BUG_ON(session->s_nr_caps > 0); 1670 BUG_ON(!list_empty(&session->s_cap_flushing)); 1671 spin_unlock(&session->s_cap_lock); 1672 dispose_cap_releases(session->s_mdsc, &dispose); 1673 } 1674 1675 enum { 1676 RECONNECT, 1677 RENEWCAPS, 1678 FORCE_RO, 1679 }; 1680 1681 /* 1682 * wake up any threads waiting on this session's caps. if the cap is 1683 * old (didn't get renewed on the client reconnect), remove it now. 1684 * 1685 * caller must hold s_mutex. 1686 */ 1687 static int wake_up_session_cb(struct inode *inode, struct ceph_cap *cap, 1688 void *arg) 1689 { 1690 struct ceph_inode_info *ci = ceph_inode(inode); 1691 unsigned long ev = (unsigned long)arg; 1692 1693 if (ev == RECONNECT) { 1694 spin_lock(&ci->i_ceph_lock); 1695 ci->i_wanted_max_size = 0; 1696 ci->i_requested_max_size = 0; 1697 spin_unlock(&ci->i_ceph_lock); 1698 } else if (ev == RENEWCAPS) { 1699 if (cap->cap_gen < cap->session->s_cap_gen) { 1700 /* mds did not re-issue stale cap */ 1701 spin_lock(&ci->i_ceph_lock); 1702 cap->issued = cap->implemented = CEPH_CAP_PIN; 1703 spin_unlock(&ci->i_ceph_lock); 1704 } 1705 } else if (ev == FORCE_RO) { 1706 } 1707 wake_up_all(&ci->i_cap_wq); 1708 return 0; 1709 } 1710 1711 static void wake_up_session_caps(struct ceph_mds_session *session, int ev) 1712 { 1713 dout("wake_up_session_caps %p mds%d\n", session, session->s_mds); 1714 ceph_iterate_session_caps(session, wake_up_session_cb, 1715 (void *)(unsigned long)ev); 1716 } 1717 1718 /* 1719 * Send periodic message to MDS renewing all currently held caps. The 1720 * ack will reset the expiration for all caps from this session. 1721 * 1722 * caller holds s_mutex 1723 */ 1724 static int send_renew_caps(struct ceph_mds_client *mdsc, 1725 struct ceph_mds_session *session) 1726 { 1727 struct ceph_msg *msg; 1728 int state; 1729 1730 if (time_after_eq(jiffies, session->s_cap_ttl) && 1731 time_after_eq(session->s_cap_ttl, session->s_renew_requested)) 1732 pr_info("mds%d caps stale\n", session->s_mds); 1733 session->s_renew_requested = jiffies; 1734 1735 /* do not try to renew caps until a recovering mds has reconnected 1736 * with its clients. 
*/ 1737 state = ceph_mdsmap_get_state(mdsc->mdsmap, session->s_mds); 1738 if (state < CEPH_MDS_STATE_RECONNECT) { 1739 dout("send_renew_caps ignoring mds%d (%s)\n", 1740 session->s_mds, ceph_mds_state_name(state)); 1741 return 0; 1742 } 1743 1744 dout("send_renew_caps to mds%d (%s)\n", session->s_mds, 1745 ceph_mds_state_name(state)); 1746 msg = create_session_msg(CEPH_SESSION_REQUEST_RENEWCAPS, 1747 ++session->s_renew_seq); 1748 if (!msg) 1749 return -ENOMEM; 1750 ceph_con_send(&session->s_con, msg); 1751 return 0; 1752 } 1753 1754 static int send_flushmsg_ack(struct ceph_mds_client *mdsc, 1755 struct ceph_mds_session *session, u64 seq) 1756 { 1757 struct ceph_msg *msg; 1758 1759 dout("send_flushmsg_ack to mds%d (%s)s seq %lld\n", 1760 session->s_mds, ceph_session_state_name(session->s_state), seq); 1761 msg = create_session_msg(CEPH_SESSION_FLUSHMSG_ACK, seq); 1762 if (!msg) 1763 return -ENOMEM; 1764 ceph_con_send(&session->s_con, msg); 1765 return 0; 1766 } 1767 1768 1769 /* 1770 * Note new cap ttl, and any transition from stale -> not stale (fresh?). 1771 * 1772 * Called under session->s_mutex 1773 */ 1774 static void renewed_caps(struct ceph_mds_client *mdsc, 1775 struct ceph_mds_session *session, int is_renew) 1776 { 1777 int was_stale; 1778 int wake = 0; 1779 1780 spin_lock(&session->s_cap_lock); 1781 was_stale = is_renew && time_after_eq(jiffies, session->s_cap_ttl); 1782 1783 session->s_cap_ttl = session->s_renew_requested + 1784 mdsc->mdsmap->m_session_timeout*HZ; 1785 1786 if (was_stale) { 1787 if (time_before(jiffies, session->s_cap_ttl)) { 1788 pr_info("mds%d caps renewed\n", session->s_mds); 1789 wake = 1; 1790 } else { 1791 pr_info("mds%d caps still stale\n", session->s_mds); 1792 } 1793 } 1794 dout("renewed_caps mds%d ttl now %lu, was %s, now %s\n", 1795 session->s_mds, session->s_cap_ttl, was_stale ? "stale" : "fresh", 1796 time_before(jiffies, session->s_cap_ttl) ? "stale" : "fresh"); 1797 spin_unlock(&session->s_cap_lock); 1798 1799 if (wake) 1800 wake_up_session_caps(session, RENEWCAPS); 1801 } 1802 1803 /* 1804 * send a session close request 1805 */ 1806 static int request_close_session(struct ceph_mds_session *session) 1807 { 1808 struct ceph_msg *msg; 1809 1810 dout("request_close_session mds%d state %s seq %lld\n", 1811 session->s_mds, ceph_session_state_name(session->s_state), 1812 session->s_seq); 1813 msg = create_session_msg(CEPH_SESSION_REQUEST_CLOSE, session->s_seq); 1814 if (!msg) 1815 return -ENOMEM; 1816 ceph_con_send(&session->s_con, msg); 1817 return 1; 1818 } 1819 1820 /* 1821 * Called with s_mutex held. 1822 */ 1823 static int __close_session(struct ceph_mds_client *mdsc, 1824 struct ceph_mds_session *session) 1825 { 1826 if (session->s_state >= CEPH_MDS_SESSION_CLOSING) 1827 return 0; 1828 session->s_state = CEPH_MDS_SESSION_CLOSING; 1829 return request_close_session(session); 1830 } 1831 1832 static bool drop_negative_children(struct dentry *dentry) 1833 { 1834 struct dentry *child; 1835 bool all_negative = true; 1836 1837 if (!d_is_dir(dentry)) 1838 goto out; 1839 1840 spin_lock(&dentry->d_lock); 1841 list_for_each_entry(child, &dentry->d_subdirs, d_child) { 1842 if (d_really_is_positive(child)) { 1843 all_negative = false; 1844 break; 1845 } 1846 } 1847 spin_unlock(&dentry->d_lock); 1848 1849 if (all_negative) 1850 shrink_dcache_parent(dentry); 1851 out: 1852 return all_negative; 1853 } 1854 1855 /* 1856 * Trim old(er) caps. 
 *
 * Because we can't cache an inode without one or more caps, we do
 * this indirectly: if a cap is unused, we prune its aliases, at which
 * point the inode will hopefully get dropped too.
 *
 * Yes, this is a bit sloppy. Our only real goal here is to respond to
 * memory pressure from the MDS, though, so it needn't be perfect.
 */
static int trim_caps_cb(struct inode *inode, struct ceph_cap *cap, void *arg)
{
	int *remaining = arg;
	struct ceph_inode_info *ci = ceph_inode(inode);
	int used, wanted, oissued, mine;

	if (*remaining <= 0)
		return -1;

	spin_lock(&ci->i_ceph_lock);
	mine = cap->issued | cap->implemented;
	used = __ceph_caps_used(ci);
	wanted = __ceph_caps_file_wanted(ci);
	oissued = __ceph_caps_issued_other(ci, cap);

	dout("trim_caps_cb %p cap %p mine %s oissued %s used %s wanted %s\n",
	     inode, cap, ceph_cap_string(mine), ceph_cap_string(oissued),
	     ceph_cap_string(used), ceph_cap_string(wanted));
	if (cap == ci->i_auth_cap) {
		if (ci->i_dirty_caps || ci->i_flushing_caps ||
		    !list_empty(&ci->i_cap_snaps))
			goto out;
		if ((used | wanted) & CEPH_CAP_ANY_WR)
			goto out;
		/* Note: it's possible that i_filelock_ref becomes non-zero
		 * after dropping auth caps. It doesn't hurt because reply
		 * of lock mds request will re-add auth caps. */
		if (atomic_read(&ci->i_filelock_ref) > 0)
			goto out;
	}
	/* The inode has cached pages, but it's no longer used.
	 * we can safely drop it */
	if (S_ISREG(inode->i_mode) &&
	    wanted == 0 && used == CEPH_CAP_FILE_CACHE &&
	    !(oissued & CEPH_CAP_FILE_CACHE)) {
		used = 0;
		oissued = 0;
	}
	if ((used | wanted) & ~oissued & mine)
		goto out;   /* we need these caps */

	if (oissued) {
		/* we aren't the only cap.. just remove us */
		__ceph_remove_cap(cap, true);
		(*remaining)--;
	} else {
		struct dentry *dentry;
		/* try dropping referring dentries */
		spin_unlock(&ci->i_ceph_lock);
		dentry = d_find_any_alias(inode);
		if (dentry && drop_negative_children(dentry)) {
			int count;
			dput(dentry);
			d_prune_aliases(inode);
			count = atomic_read(&inode->i_count);
			if (count == 1)
				(*remaining)--;
			dout("trim_caps_cb %p cap %p pruned, count now %d\n",
			     inode, cap, count);
		} else {
			dput(dentry);
		}
		return 0;
	}

out:
	spin_unlock(&ci->i_ceph_lock);
	return 0;
}

/*
 * Trim session cap count down to some max number.
1937 */ 1938 int ceph_trim_caps(struct ceph_mds_client *mdsc, 1939 struct ceph_mds_session *session, 1940 int max_caps) 1941 { 1942 int trim_caps = session->s_nr_caps - max_caps; 1943 1944 dout("trim_caps mds%d start: %d / %d, trim %d\n", 1945 session->s_mds, session->s_nr_caps, max_caps, trim_caps); 1946 if (trim_caps > 0) { 1947 int remaining = trim_caps; 1948 1949 ceph_iterate_session_caps(session, trim_caps_cb, &remaining); 1950 dout("trim_caps mds%d done: %d / %d, trimmed %d\n", 1951 session->s_mds, session->s_nr_caps, max_caps, 1952 trim_caps - remaining); 1953 } 1954 1955 ceph_flush_cap_releases(mdsc, session); 1956 return 0; 1957 } 1958 1959 static int check_caps_flush(struct ceph_mds_client *mdsc, 1960 u64 want_flush_tid) 1961 { 1962 int ret = 1; 1963 1964 spin_lock(&mdsc->cap_dirty_lock); 1965 if (!list_empty(&mdsc->cap_flush_list)) { 1966 struct ceph_cap_flush *cf = 1967 list_first_entry(&mdsc->cap_flush_list, 1968 struct ceph_cap_flush, g_list); 1969 if (cf->tid <= want_flush_tid) { 1970 dout("check_caps_flush still flushing tid " 1971 "%llu <= %llu\n", cf->tid, want_flush_tid); 1972 ret = 0; 1973 } 1974 } 1975 spin_unlock(&mdsc->cap_dirty_lock); 1976 return ret; 1977 } 1978 1979 /* 1980 * flush all dirty inode data to disk. 1981 * 1982 * returns true if we've flushed through want_flush_tid 1983 */ 1984 static void wait_caps_flush(struct ceph_mds_client *mdsc, 1985 u64 want_flush_tid) 1986 { 1987 dout("check_caps_flush want %llu\n", want_flush_tid); 1988 1989 wait_event(mdsc->cap_flushing_wq, 1990 check_caps_flush(mdsc, want_flush_tid)); 1991 1992 dout("check_caps_flush ok, flushed thru %llu\n", want_flush_tid); 1993 } 1994 1995 /* 1996 * called under s_mutex 1997 */ 1998 static void ceph_send_cap_releases(struct ceph_mds_client *mdsc, 1999 struct ceph_mds_session *session) 2000 { 2001 struct ceph_msg *msg = NULL; 2002 struct ceph_mds_cap_release *head; 2003 struct ceph_mds_cap_item *item; 2004 struct ceph_osd_client *osdc = &mdsc->fsc->client->osdc; 2005 struct ceph_cap *cap; 2006 LIST_HEAD(tmp_list); 2007 int num_cap_releases; 2008 __le32 barrier, *cap_barrier; 2009 2010 down_read(&osdc->lock); 2011 barrier = cpu_to_le32(osdc->epoch_barrier); 2012 up_read(&osdc->lock); 2013 2014 spin_lock(&session->s_cap_lock); 2015 again: 2016 list_splice_init(&session->s_cap_releases, &tmp_list); 2017 num_cap_releases = session->s_num_cap_releases; 2018 session->s_num_cap_releases = 0; 2019 spin_unlock(&session->s_cap_lock); 2020 2021 while (!list_empty(&tmp_list)) { 2022 if (!msg) { 2023 msg = ceph_msg_new(CEPH_MSG_CLIENT_CAPRELEASE, 2024 PAGE_SIZE, GFP_NOFS, false); 2025 if (!msg) 2026 goto out_err; 2027 head = msg->front.iov_base; 2028 head->num = cpu_to_le32(0); 2029 msg->front.iov_len = sizeof(*head); 2030 2031 msg->hdr.version = cpu_to_le16(2); 2032 msg->hdr.compat_version = cpu_to_le16(1); 2033 } 2034 2035 cap = list_first_entry(&tmp_list, struct ceph_cap, 2036 session_caps); 2037 list_del(&cap->session_caps); 2038 num_cap_releases--; 2039 2040 head = msg->front.iov_base; 2041 put_unaligned_le32(get_unaligned_le32(&head->num) + 1, 2042 &head->num); 2043 item = msg->front.iov_base + msg->front.iov_len; 2044 item->ino = cpu_to_le64(cap->cap_ino); 2045 item->cap_id = cpu_to_le64(cap->cap_id); 2046 item->migrate_seq = cpu_to_le32(cap->mseq); 2047 item->seq = cpu_to_le32(cap->issue_seq); 2048 msg->front.iov_len += sizeof(*item); 2049 2050 ceph_put_cap(mdsc, cap); 2051 2052 if (le32_to_cpu(head->num) == CEPH_CAPS_PER_RELEASE) { 2053 // Append cap_barrier field 2054 cap_barrier = 
msg->front.iov_base + msg->front.iov_len; 2055 *cap_barrier = barrier; 2056 msg->front.iov_len += sizeof(*cap_barrier); 2057 2058 msg->hdr.front_len = cpu_to_le32(msg->front.iov_len); 2059 dout("send_cap_releases mds%d %p\n", session->s_mds, msg); 2060 ceph_con_send(&session->s_con, msg); 2061 msg = NULL; 2062 } 2063 } 2064 2065 BUG_ON(num_cap_releases != 0); 2066 2067 spin_lock(&session->s_cap_lock); 2068 if (!list_empty(&session->s_cap_releases)) 2069 goto again; 2070 spin_unlock(&session->s_cap_lock); 2071 2072 if (msg) { 2073 // Append cap_barrier field 2074 cap_barrier = msg->front.iov_base + msg->front.iov_len; 2075 *cap_barrier = barrier; 2076 msg->front.iov_len += sizeof(*cap_barrier); 2077 2078 msg->hdr.front_len = cpu_to_le32(msg->front.iov_len); 2079 dout("send_cap_releases mds%d %p\n", session->s_mds, msg); 2080 ceph_con_send(&session->s_con, msg); 2081 } 2082 return; 2083 out_err: 2084 pr_err("send_cap_releases mds%d, failed to allocate message\n", 2085 session->s_mds); 2086 spin_lock(&session->s_cap_lock); 2087 list_splice(&tmp_list, &session->s_cap_releases); 2088 session->s_num_cap_releases += num_cap_releases; 2089 spin_unlock(&session->s_cap_lock); 2090 } 2091 2092 static void ceph_cap_release_work(struct work_struct *work) 2093 { 2094 struct ceph_mds_session *session = 2095 container_of(work, struct ceph_mds_session, s_cap_release_work); 2096 2097 mutex_lock(&session->s_mutex); 2098 if (session->s_state == CEPH_MDS_SESSION_OPEN || 2099 session->s_state == CEPH_MDS_SESSION_HUNG) 2100 ceph_send_cap_releases(session->s_mdsc, session); 2101 mutex_unlock(&session->s_mutex); 2102 ceph_put_mds_session(session); 2103 } 2104 2105 void ceph_flush_cap_releases(struct ceph_mds_client *mdsc, 2106 struct ceph_mds_session *session) 2107 { 2108 if (mdsc->stopping) 2109 return; 2110 2111 ceph_get_mds_session(session); 2112 if (queue_work(mdsc->fsc->cap_wq, 2113 &session->s_cap_release_work)) { 2114 dout("cap release work queued\n"); 2115 } else { 2116 ceph_put_mds_session(session); 2117 dout("failed to queue cap release work\n"); 2118 } 2119 } 2120 2121 /* 2122 * caller holds session->s_cap_lock 2123 */ 2124 void __ceph_queue_cap_release(struct ceph_mds_session *session, 2125 struct ceph_cap *cap) 2126 { 2127 list_add_tail(&cap->session_caps, &session->s_cap_releases); 2128 session->s_num_cap_releases++; 2129 2130 if (!(session->s_num_cap_releases % CEPH_CAPS_PER_RELEASE)) 2131 ceph_flush_cap_releases(session->s_mdsc, session); 2132 } 2133 2134 static void ceph_cap_reclaim_work(struct work_struct *work) 2135 { 2136 struct ceph_mds_client *mdsc = 2137 container_of(work, struct ceph_mds_client, cap_reclaim_work); 2138 int ret = ceph_trim_dentries(mdsc); 2139 if (ret == -EAGAIN) 2140 ceph_queue_cap_reclaim_work(mdsc); 2141 } 2142 2143 void ceph_queue_cap_reclaim_work(struct ceph_mds_client *mdsc) 2144 { 2145 if (mdsc->stopping) 2146 return; 2147 2148 if (queue_work(mdsc->fsc->cap_wq, &mdsc->cap_reclaim_work)) { 2149 dout("caps reclaim work queued\n"); 2150 } else { 2151 dout("failed to queue caps release work\n"); 2152 } 2153 } 2154 2155 void ceph_reclaim_caps_nr(struct ceph_mds_client *mdsc, int nr) 2156 { 2157 int val; 2158 if (!nr) 2159 return; 2160 val = atomic_add_return(nr, &mdsc->cap_reclaim_pending); 2161 if ((val % CEPH_CAPS_PER_RELEASE) < nr) { 2162 atomic_set(&mdsc->cap_reclaim_pending, 0); 2163 ceph_queue_cap_reclaim_work(mdsc); 2164 } 2165 } 2166 2167 /* 2168 * requests 2169 */ 2170 2171 int ceph_alloc_readdir_reply_buffer(struct ceph_mds_request *req, 2172 struct inode *dir) 
2173 { 2174 struct ceph_inode_info *ci = ceph_inode(dir); 2175 struct ceph_mds_reply_info_parsed *rinfo = &req->r_reply_info; 2176 struct ceph_mount_options *opt = req->r_mdsc->fsc->mount_options; 2177 size_t size = sizeof(struct ceph_mds_reply_dir_entry); 2178 unsigned int num_entries; 2179 int order; 2180 2181 spin_lock(&ci->i_ceph_lock); 2182 num_entries = ci->i_files + ci->i_subdirs; 2183 spin_unlock(&ci->i_ceph_lock); 2184 num_entries = max(num_entries, 1U); 2185 num_entries = min(num_entries, opt->max_readdir); 2186 2187 order = get_order(size * num_entries); 2188 while (order >= 0) { 2189 rinfo->dir_entries = (void*)__get_free_pages(GFP_KERNEL | 2190 __GFP_NOWARN, 2191 order); 2192 if (rinfo->dir_entries) 2193 break; 2194 order--; 2195 } 2196 if (!rinfo->dir_entries) 2197 return -ENOMEM; 2198 2199 num_entries = (PAGE_SIZE << order) / size; 2200 num_entries = min(num_entries, opt->max_readdir); 2201 2202 rinfo->dir_buf_size = PAGE_SIZE << order; 2203 req->r_num_caps = num_entries + 1; 2204 req->r_args.readdir.max_entries = cpu_to_le32(num_entries); 2205 req->r_args.readdir.max_bytes = cpu_to_le32(opt->max_readdir_bytes); 2206 return 0; 2207 } 2208 2209 /* 2210 * Create an mds request. 2211 */ 2212 struct ceph_mds_request * 2213 ceph_mdsc_create_request(struct ceph_mds_client *mdsc, int op, int mode) 2214 { 2215 struct ceph_mds_request *req; 2216 2217 req = kmem_cache_zalloc(ceph_mds_request_cachep, GFP_NOFS); 2218 if (!req) 2219 return ERR_PTR(-ENOMEM); 2220 2221 mutex_init(&req->r_fill_mutex); 2222 req->r_mdsc = mdsc; 2223 req->r_started = jiffies; 2224 req->r_start_latency = ktime_get(); 2225 req->r_resend_mds = -1; 2226 INIT_LIST_HEAD(&req->r_unsafe_dir_item); 2227 INIT_LIST_HEAD(&req->r_unsafe_target_item); 2228 req->r_fmode = -1; 2229 kref_init(&req->r_kref); 2230 RB_CLEAR_NODE(&req->r_node); 2231 INIT_LIST_HEAD(&req->r_wait); 2232 init_completion(&req->r_completion); 2233 init_completion(&req->r_safe_completion); 2234 INIT_LIST_HEAD(&req->r_unsafe_item); 2235 2236 ktime_get_coarse_real_ts64(&req->r_stamp); 2237 2238 req->r_op = op; 2239 req->r_direct_mode = mode; 2240 return req; 2241 } 2242 2243 /* 2244 * return oldest (lowest) request, tid in request tree, 0 if none. 2245 * 2246 * called under mdsc->mutex. 2247 */ 2248 static struct ceph_mds_request *__get_oldest_req(struct ceph_mds_client *mdsc) 2249 { 2250 if (RB_EMPTY_ROOT(&mdsc->request_tree)) 2251 return NULL; 2252 return rb_entry(rb_first(&mdsc->request_tree), 2253 struct ceph_mds_request, r_node); 2254 } 2255 2256 static inline u64 __get_oldest_tid(struct ceph_mds_client *mdsc) 2257 { 2258 return mdsc->oldest_tid; 2259 } 2260 2261 /* 2262 * Build a dentry's path. Allocate on heap; caller must kfree. Based 2263 * on build_path_from_dentry in fs/cifs/dir.c. 2264 * 2265 * If @stop_on_nosnap, generate path relative to the first non-snapped 2266 * inode. 2267 * 2268 * Encode hidden .snap dirs as a double /, i.e. 
2269 * foo/.snap/bar -> foo//bar 2270 */ 2271 char *ceph_mdsc_build_path(struct dentry *dentry, int *plen, u64 *pbase, 2272 int stop_on_nosnap) 2273 { 2274 struct dentry *temp; 2275 char *path; 2276 int pos; 2277 unsigned seq; 2278 u64 base; 2279 2280 if (!dentry) 2281 return ERR_PTR(-EINVAL); 2282 2283 path = __getname(); 2284 if (!path) 2285 return ERR_PTR(-ENOMEM); 2286 retry: 2287 pos = PATH_MAX - 1; 2288 path[pos] = '\0'; 2289 2290 seq = read_seqbegin(&rename_lock); 2291 rcu_read_lock(); 2292 temp = dentry; 2293 for (;;) { 2294 struct inode *inode; 2295 2296 spin_lock(&temp->d_lock); 2297 inode = d_inode(temp); 2298 if (inode && ceph_snap(inode) == CEPH_SNAPDIR) { 2299 dout("build_path path+%d: %p SNAPDIR\n", 2300 pos, temp); 2301 } else if (stop_on_nosnap && inode && dentry != temp && 2302 ceph_snap(inode) == CEPH_NOSNAP) { 2303 spin_unlock(&temp->d_lock); 2304 pos++; /* get rid of any prepended '/' */ 2305 break; 2306 } else { 2307 pos -= temp->d_name.len; 2308 if (pos < 0) { 2309 spin_unlock(&temp->d_lock); 2310 break; 2311 } 2312 memcpy(path + pos, temp->d_name.name, temp->d_name.len); 2313 } 2314 spin_unlock(&temp->d_lock); 2315 temp = READ_ONCE(temp->d_parent); 2316 2317 /* Are we at the root? */ 2318 if (IS_ROOT(temp)) 2319 break; 2320 2321 /* Are we out of buffer? */ 2322 if (--pos < 0) 2323 break; 2324 2325 path[pos] = '/'; 2326 } 2327 base = ceph_ino(d_inode(temp)); 2328 rcu_read_unlock(); 2329 2330 if (read_seqretry(&rename_lock, seq)) 2331 goto retry; 2332 2333 if (pos < 0) { 2334 /* 2335 * A rename didn't occur, but somehow we didn't end up where 2336 * we thought we would. Throw a warning and try again. 2337 */ 2338 pr_warn("build_path did not end path lookup where " 2339 "expected, pos is %d\n", pos); 2340 goto retry; 2341 } 2342 2343 *pbase = base; 2344 *plen = PATH_MAX - 1 - pos; 2345 dout("build_path on %p %d built %llx '%.*s'\n", 2346 dentry, d_count(dentry), base, *plen, path + pos); 2347 return path + pos; 2348 } 2349 2350 static int build_dentry_path(struct dentry *dentry, struct inode *dir, 2351 const char **ppath, int *ppathlen, u64 *pino, 2352 bool *pfreepath, bool parent_locked) 2353 { 2354 char *path; 2355 2356 rcu_read_lock(); 2357 if (!dir) 2358 dir = d_inode_rcu(dentry->d_parent); 2359 if (dir && parent_locked && ceph_snap(dir) == CEPH_NOSNAP) { 2360 *pino = ceph_ino(dir); 2361 rcu_read_unlock(); 2362 *ppath = dentry->d_name.name; 2363 *ppathlen = dentry->d_name.len; 2364 return 0; 2365 } 2366 rcu_read_unlock(); 2367 path = ceph_mdsc_build_path(dentry, ppathlen, pino, 1); 2368 if (IS_ERR(path)) 2369 return PTR_ERR(path); 2370 *ppath = path; 2371 *pfreepath = true; 2372 return 0; 2373 } 2374 2375 static int build_inode_path(struct inode *inode, 2376 const char **ppath, int *ppathlen, u64 *pino, 2377 bool *pfreepath) 2378 { 2379 struct dentry *dentry; 2380 char *path; 2381 2382 if (ceph_snap(inode) == CEPH_NOSNAP) { 2383 *pino = ceph_ino(inode); 2384 *ppathlen = 0; 2385 return 0; 2386 } 2387 dentry = d_find_alias(inode); 2388 path = ceph_mdsc_build_path(dentry, ppathlen, pino, 1); 2389 dput(dentry); 2390 if (IS_ERR(path)) 2391 return PTR_ERR(path); 2392 *ppath = path; 2393 *pfreepath = true; 2394 return 0; 2395 } 2396 2397 /* 2398 * request arguments may be specified via an inode *, a dentry *, or 2399 * an explicit ino+path. 
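 *
 * create_request_message() below resolves whichever form the caller
 * filled in; a sketch of the actual call for the primary path:
 *
 *	set_request_path_attr(req->r_inode, req->r_dentry, req->r_parent,
 *			      req->r_path1, req->r_ino1.ino,
 *			      &path1, &pathlen1, &ino1, &freepath1, ...);
 *
 * Only one of the forms needs to be supplied; the helper falls back
 * from inode to dentry to the raw ino+path pair.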
2400 */ 2401 static int set_request_path_attr(struct inode *rinode, struct dentry *rdentry, 2402 struct inode *rdiri, const char *rpath, 2403 u64 rino, const char **ppath, int *pathlen, 2404 u64 *ino, bool *freepath, bool parent_locked) 2405 { 2406 int r = 0; 2407 2408 if (rinode) { 2409 r = build_inode_path(rinode, ppath, pathlen, ino, freepath); 2410 dout(" inode %p %llx.%llx\n", rinode, ceph_ino(rinode), 2411 ceph_snap(rinode)); 2412 } else if (rdentry) { 2413 r = build_dentry_path(rdentry, rdiri, ppath, pathlen, ino, 2414 freepath, parent_locked); 2415 dout(" dentry %p %llx/%.*s\n", rdentry, *ino, *pathlen, 2416 *ppath); 2417 } else if (rpath || rino) { 2418 *ino = rino; 2419 *ppath = rpath; 2420 *pathlen = rpath ? strlen(rpath) : 0; 2421 dout(" path %.*s\n", *pathlen, rpath); 2422 } 2423 2424 return r; 2425 } 2426 2427 /* 2428 * called under mdsc->mutex 2429 */ 2430 static struct ceph_msg *create_request_message(struct ceph_mds_client *mdsc, 2431 struct ceph_mds_request *req, 2432 int mds, bool drop_cap_releases) 2433 { 2434 struct ceph_msg *msg; 2435 struct ceph_mds_request_head *head; 2436 const char *path1 = NULL; 2437 const char *path2 = NULL; 2438 u64 ino1 = 0, ino2 = 0; 2439 int pathlen1 = 0, pathlen2 = 0; 2440 bool freepath1 = false, freepath2 = false; 2441 int len; 2442 u16 releases; 2443 void *p, *end; 2444 int ret; 2445 2446 ret = set_request_path_attr(req->r_inode, req->r_dentry, 2447 req->r_parent, req->r_path1, req->r_ino1.ino, 2448 &path1, &pathlen1, &ino1, &freepath1, 2449 test_bit(CEPH_MDS_R_PARENT_LOCKED, 2450 &req->r_req_flags)); 2451 if (ret < 0) { 2452 msg = ERR_PTR(ret); 2453 goto out; 2454 } 2455 2456 /* If r_old_dentry is set, then assume that its parent is locked */ 2457 ret = set_request_path_attr(NULL, req->r_old_dentry, 2458 req->r_old_dentry_dir, 2459 req->r_path2, req->r_ino2.ino, 2460 &path2, &pathlen2, &ino2, &freepath2, true); 2461 if (ret < 0) { 2462 msg = ERR_PTR(ret); 2463 goto out_free1; 2464 } 2465 2466 len = sizeof(*head) + 2467 pathlen1 + pathlen2 + 2*(1 + sizeof(u32) + sizeof(u64)) + 2468 sizeof(struct ceph_timespec); 2469 2470 /* calculate (max) length for cap releases */ 2471 len += sizeof(struct ceph_mds_request_release) * 2472 (!!req->r_inode_drop + !!req->r_dentry_drop + 2473 !!req->r_old_inode_drop + !!req->r_old_dentry_drop); 2474 if (req->r_dentry_drop) 2475 len += pathlen1; 2476 if (req->r_old_dentry_drop) 2477 len += pathlen2; 2478 2479 msg = ceph_msg_new2(CEPH_MSG_CLIENT_REQUEST, len, 1, GFP_NOFS, false); 2480 if (!msg) { 2481 msg = ERR_PTR(-ENOMEM); 2482 goto out_free2; 2483 } 2484 2485 msg->hdr.version = cpu_to_le16(2); 2486 msg->hdr.tid = cpu_to_le64(req->r_tid); 2487 2488 head = msg->front.iov_base; 2489 p = msg->front.iov_base + sizeof(*head); 2490 end = msg->front.iov_base + msg->front.iov_len; 2491 2492 head->mdsmap_epoch = cpu_to_le32(mdsc->mdsmap->m_epoch); 2493 head->op = cpu_to_le32(req->r_op); 2494 head->caller_uid = cpu_to_le32(from_kuid(&init_user_ns, req->r_uid)); 2495 head->caller_gid = cpu_to_le32(from_kgid(&init_user_ns, req->r_gid)); 2496 head->ino = cpu_to_le64(req->r_deleg_ino); 2497 head->args = req->r_args; 2498 2499 ceph_encode_filepath(&p, end, ino1, path1); 2500 ceph_encode_filepath(&p, end, ino2, path2); 2501 2502 /* make note of release offset, in case we need to replay */ 2503 req->r_request_release_offset = p - msg->front.iov_base; 2504 2505 /* cap releases */ 2506 releases = 0; 2507 if (req->r_inode_drop) 2508 releases += ceph_encode_inode_release(&p, 2509 req->r_inode ? 
req->r_inode : d_inode(req->r_dentry), 2510 mds, req->r_inode_drop, req->r_inode_unless, 2511 req->r_op == CEPH_MDS_OP_READDIR); 2512 if (req->r_dentry_drop) 2513 releases += ceph_encode_dentry_release(&p, req->r_dentry, 2514 req->r_parent, mds, req->r_dentry_drop, 2515 req->r_dentry_unless); 2516 if (req->r_old_dentry_drop) 2517 releases += ceph_encode_dentry_release(&p, req->r_old_dentry, 2518 req->r_old_dentry_dir, mds, 2519 req->r_old_dentry_drop, 2520 req->r_old_dentry_unless); 2521 if (req->r_old_inode_drop) 2522 releases += ceph_encode_inode_release(&p, 2523 d_inode(req->r_old_dentry), 2524 mds, req->r_old_inode_drop, req->r_old_inode_unless, 0); 2525 2526 if (drop_cap_releases) { 2527 releases = 0; 2528 p = msg->front.iov_base + req->r_request_release_offset; 2529 } 2530 2531 head->num_releases = cpu_to_le16(releases); 2532 2533 /* time stamp */ 2534 { 2535 struct ceph_timespec ts; 2536 ceph_encode_timespec64(&ts, &req->r_stamp); 2537 ceph_encode_copy(&p, &ts, sizeof(ts)); 2538 } 2539 2540 if (WARN_ON_ONCE(p > end)) { 2541 ceph_msg_put(msg); 2542 msg = ERR_PTR(-ERANGE); 2543 goto out_free2; 2544 } 2545 2546 msg->front.iov_len = p - msg->front.iov_base; 2547 msg->hdr.front_len = cpu_to_le32(msg->front.iov_len); 2548 2549 if (req->r_pagelist) { 2550 struct ceph_pagelist *pagelist = req->r_pagelist; 2551 ceph_msg_data_add_pagelist(msg, pagelist); 2552 msg->hdr.data_len = cpu_to_le32(pagelist->length); 2553 } else { 2554 msg->hdr.data_len = 0; 2555 } 2556 2557 msg->hdr.data_off = cpu_to_le16(0); 2558 2559 out_free2: 2560 if (freepath2) 2561 ceph_mdsc_free_path((char *)path2, pathlen2); 2562 out_free1: 2563 if (freepath1) 2564 ceph_mdsc_free_path((char *)path1, pathlen1); 2565 out: 2566 return msg; 2567 } 2568 2569 /* 2570 * called under mdsc->mutex if error, under no mutex if 2571 * success. 2572 */ 2573 static void complete_request(struct ceph_mds_client *mdsc, 2574 struct ceph_mds_request *req) 2575 { 2576 req->r_end_latency = ktime_get(); 2577 2578 if (req->r_callback) 2579 req->r_callback(mdsc, req); 2580 complete_all(&req->r_completion); 2581 } 2582 2583 /* 2584 * called under mdsc->mutex 2585 */ 2586 static int __prepare_send_request(struct ceph_mds_client *mdsc, 2587 struct ceph_mds_request *req, 2588 int mds, bool drop_cap_releases) 2589 { 2590 struct ceph_mds_request_head *rhead; 2591 struct ceph_msg *msg; 2592 int flags = 0; 2593 2594 req->r_attempts++; 2595 if (req->r_inode) { 2596 struct ceph_cap *cap = 2597 ceph_get_cap_for_mds(ceph_inode(req->r_inode), mds); 2598 2599 if (cap) 2600 req->r_sent_on_mseq = cap->mseq; 2601 else 2602 req->r_sent_on_mseq = -1; 2603 } 2604 dout("prepare_send_request %p tid %lld %s (attempt %d)\n", req, 2605 req->r_tid, ceph_mds_op_name(req->r_op), req->r_attempts); 2606 2607 if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) { 2608 void *p; 2609 /* 2610 * Replay. Do not regenerate message (and rebuild 2611 * paths, etc.); just use the original message. 2612 * Rebuilding paths will break for renames because 2613 * d_move mangles the src name. 
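 * For example, after a rename d_move() has already swapped the dentry
 * names, so a path rebuilt now would name the destination rather than
 * the source that the original request referred to.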
2614 */ 2615 msg = req->r_request; 2616 rhead = msg->front.iov_base; 2617 2618 flags = le32_to_cpu(rhead->flags); 2619 flags |= CEPH_MDS_FLAG_REPLAY; 2620 rhead->flags = cpu_to_le32(flags); 2621 2622 if (req->r_target_inode) 2623 rhead->ino = cpu_to_le64(ceph_ino(req->r_target_inode)); 2624 2625 rhead->num_retry = req->r_attempts - 1; 2626 2627 /* remove cap/dentry releases from message */ 2628 rhead->num_releases = 0; 2629 2630 /* time stamp */ 2631 p = msg->front.iov_base + req->r_request_release_offset; 2632 { 2633 struct ceph_timespec ts; 2634 ceph_encode_timespec64(&ts, &req->r_stamp); 2635 ceph_encode_copy(&p, &ts, sizeof(ts)); 2636 } 2637 2638 msg->front.iov_len = p - msg->front.iov_base; 2639 msg->hdr.front_len = cpu_to_le32(msg->front.iov_len); 2640 return 0; 2641 } 2642 2643 if (req->r_request) { 2644 ceph_msg_put(req->r_request); 2645 req->r_request = NULL; 2646 } 2647 msg = create_request_message(mdsc, req, mds, drop_cap_releases); 2648 if (IS_ERR(msg)) { 2649 req->r_err = PTR_ERR(msg); 2650 return PTR_ERR(msg); 2651 } 2652 req->r_request = msg; 2653 2654 rhead = msg->front.iov_base; 2655 rhead->oldest_client_tid = cpu_to_le64(__get_oldest_tid(mdsc)); 2656 if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) 2657 flags |= CEPH_MDS_FLAG_REPLAY; 2658 if (test_bit(CEPH_MDS_R_ASYNC, &req->r_req_flags)) 2659 flags |= CEPH_MDS_FLAG_ASYNC; 2660 if (req->r_parent) 2661 flags |= CEPH_MDS_FLAG_WANT_DENTRY; 2662 rhead->flags = cpu_to_le32(flags); 2663 rhead->num_fwd = req->r_num_fwd; 2664 rhead->num_retry = req->r_attempts - 1; 2665 2666 dout(" r_parent = %p\n", req->r_parent); 2667 return 0; 2668 } 2669 2670 /* 2671 * called under mdsc->mutex 2672 */ 2673 static int __send_request(struct ceph_mds_client *mdsc, 2674 struct ceph_mds_session *session, 2675 struct ceph_mds_request *req, 2676 bool drop_cap_releases) 2677 { 2678 int err; 2679 2680 err = __prepare_send_request(mdsc, req, session->s_mds, 2681 drop_cap_releases); 2682 if (!err) { 2683 ceph_msg_get(req->r_request); 2684 ceph_con_send(&session->s_con, req->r_request); 2685 } 2686 2687 return err; 2688 } 2689 2690 /* 2691 * send request, or put it on the appropriate wait list. 
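 *
 * Roughly: bail out early on aborted/errored or timed-out requests and
 * forced umounts, wait for an mdsmap if we don't have a usable one yet,
 * pick an mds via __choose_mds(), look up or register its session, and
 * then either park the request on the session's s_waiting list (session
 * not open yet) or hand it to __send_request().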
2692 */ 2693 static void __do_request(struct ceph_mds_client *mdsc, 2694 struct ceph_mds_request *req) 2695 { 2696 struct ceph_mds_session *session = NULL; 2697 int mds = -1; 2698 int err = 0; 2699 bool random; 2700 2701 if (req->r_err || test_bit(CEPH_MDS_R_GOT_RESULT, &req->r_req_flags)) { 2702 if (test_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags)) 2703 __unregister_request(mdsc, req); 2704 return; 2705 } 2706 2707 if (req->r_timeout && 2708 time_after_eq(jiffies, req->r_started + req->r_timeout)) { 2709 dout("do_request timed out\n"); 2710 err = -ETIMEDOUT; 2711 goto finish; 2712 } 2713 if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_SHUTDOWN) { 2714 dout("do_request forced umount\n"); 2715 err = -EIO; 2716 goto finish; 2717 } 2718 if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_MOUNTING) { 2719 if (mdsc->mdsmap_err) { 2720 err = mdsc->mdsmap_err; 2721 dout("do_request mdsmap err %d\n", err); 2722 goto finish; 2723 } 2724 if (mdsc->mdsmap->m_epoch == 0) { 2725 dout("do_request no mdsmap, waiting for map\n"); 2726 list_add(&req->r_wait, &mdsc->waiting_for_map); 2727 return; 2728 } 2729 if (!(mdsc->fsc->mount_options->flags & 2730 CEPH_MOUNT_OPT_MOUNTWAIT) && 2731 !ceph_mdsmap_is_cluster_available(mdsc->mdsmap)) { 2732 err = -EHOSTUNREACH; 2733 goto finish; 2734 } 2735 } 2736 2737 put_request_session(req); 2738 2739 mds = __choose_mds(mdsc, req, &random); 2740 if (mds < 0 || 2741 ceph_mdsmap_get_state(mdsc->mdsmap, mds) < CEPH_MDS_STATE_ACTIVE) { 2742 if (test_bit(CEPH_MDS_R_ASYNC, &req->r_req_flags)) { 2743 err = -EJUKEBOX; 2744 goto finish; 2745 } 2746 dout("do_request no mds or not active, waiting for map\n"); 2747 list_add(&req->r_wait, &mdsc->waiting_for_map); 2748 return; 2749 } 2750 2751 /* get, open session */ 2752 session = __ceph_lookup_mds_session(mdsc, mds); 2753 if (!session) { 2754 session = register_session(mdsc, mds); 2755 if (IS_ERR(session)) { 2756 err = PTR_ERR(session); 2757 goto finish; 2758 } 2759 } 2760 req->r_session = ceph_get_mds_session(session); 2761 2762 dout("do_request mds%d session %p state %s\n", mds, session, 2763 ceph_session_state_name(session->s_state)); 2764 if (session->s_state != CEPH_MDS_SESSION_OPEN && 2765 session->s_state != CEPH_MDS_SESSION_HUNG) { 2766 if (session->s_state == CEPH_MDS_SESSION_REJECTED) { 2767 err = -EACCES; 2768 goto out_session; 2769 } 2770 /* 2771 * We cannot queue async requests since the caps and delegated 2772 * inodes are bound to the session. Just return -EJUKEBOX and 2773 * let the caller retry a sync request in that case. 
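 * (The same -EJUKEBOX fallback is used a little earlier in
 * __do_request() when an async request can't find an active mds.)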
2774 */ 2775 if (test_bit(CEPH_MDS_R_ASYNC, &req->r_req_flags)) { 2776 err = -EJUKEBOX; 2777 goto out_session; 2778 } 2779 if (session->s_state == CEPH_MDS_SESSION_NEW || 2780 session->s_state == CEPH_MDS_SESSION_CLOSING) { 2781 err = __open_session(mdsc, session); 2782 if (err) 2783 goto out_session; 2784 /* retry the same mds later */ 2785 if (random) 2786 req->r_resend_mds = mds; 2787 } 2788 list_add(&req->r_wait, &session->s_waiting); 2789 goto out_session; 2790 } 2791 2792 /* send request */ 2793 req->r_resend_mds = -1; /* forget any previous mds hint */ 2794 2795 if (req->r_request_started == 0) /* note request start time */ 2796 req->r_request_started = jiffies; 2797 2798 err = __send_request(mdsc, session, req, false); 2799 2800 out_session: 2801 ceph_put_mds_session(session); 2802 finish: 2803 if (err) { 2804 dout("__do_request early error %d\n", err); 2805 req->r_err = err; 2806 complete_request(mdsc, req); 2807 __unregister_request(mdsc, req); 2808 } 2809 return; 2810 } 2811 2812 /* 2813 * called under mdsc->mutex 2814 */ 2815 static void __wake_requests(struct ceph_mds_client *mdsc, 2816 struct list_head *head) 2817 { 2818 struct ceph_mds_request *req; 2819 LIST_HEAD(tmp_list); 2820 2821 list_splice_init(head, &tmp_list); 2822 2823 while (!list_empty(&tmp_list)) { 2824 req = list_entry(tmp_list.next, 2825 struct ceph_mds_request, r_wait); 2826 list_del_init(&req->r_wait); 2827 dout(" wake request %p tid %llu\n", req, req->r_tid); 2828 __do_request(mdsc, req); 2829 } 2830 } 2831 2832 /* 2833 * Wake up threads with requests pending for @mds, so that they can 2834 * resubmit their requests to a possibly different mds. 2835 */ 2836 static void kick_requests(struct ceph_mds_client *mdsc, int mds) 2837 { 2838 struct ceph_mds_request *req; 2839 struct rb_node *p = rb_first(&mdsc->request_tree); 2840 2841 dout("kick_requests mds%d\n", mds); 2842 while (p) { 2843 req = rb_entry(p, struct ceph_mds_request, r_node); 2844 p = rb_next(p); 2845 if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) 2846 continue; 2847 if (req->r_attempts > 0) 2848 continue; /* only new requests */ 2849 if (req->r_session && 2850 req->r_session->s_mds == mds) { 2851 dout(" kicking tid %llu\n", req->r_tid); 2852 list_del_init(&req->r_wait); 2853 __do_request(mdsc, req); 2854 } 2855 } 2856 } 2857 2858 int ceph_mdsc_submit_request(struct ceph_mds_client *mdsc, struct inode *dir, 2859 struct ceph_mds_request *req) 2860 { 2861 int err = 0; 2862 2863 /* take CAP_PIN refs for r_inode, r_parent, r_old_dentry */ 2864 if (req->r_inode) 2865 ceph_get_cap_refs(ceph_inode(req->r_inode), CEPH_CAP_PIN); 2866 if (req->r_parent) { 2867 struct ceph_inode_info *ci = ceph_inode(req->r_parent); 2868 int fmode = (req->r_op & CEPH_MDS_OP_WRITE) ? 
2869 CEPH_FILE_MODE_WR : CEPH_FILE_MODE_RD; 2870 spin_lock(&ci->i_ceph_lock); 2871 ceph_take_cap_refs(ci, CEPH_CAP_PIN, false); 2872 __ceph_touch_fmode(ci, mdsc, fmode); 2873 spin_unlock(&ci->i_ceph_lock); 2874 ihold(req->r_parent); 2875 } 2876 if (req->r_old_dentry_dir) 2877 ceph_get_cap_refs(ceph_inode(req->r_old_dentry_dir), 2878 CEPH_CAP_PIN); 2879 2880 if (req->r_inode) { 2881 err = ceph_wait_on_async_create(req->r_inode); 2882 if (err) { 2883 dout("%s: wait for async create returned: %d\n", 2884 __func__, err); 2885 return err; 2886 } 2887 } 2888 2889 if (!err && req->r_old_inode) { 2890 err = ceph_wait_on_async_create(req->r_old_inode); 2891 if (err) { 2892 dout("%s: wait for async create returned: %d\n", 2893 __func__, err); 2894 return err; 2895 } 2896 } 2897 2898 dout("submit_request on %p for inode %p\n", req, dir); 2899 mutex_lock(&mdsc->mutex); 2900 __register_request(mdsc, req, dir); 2901 __do_request(mdsc, req); 2902 err = req->r_err; 2903 mutex_unlock(&mdsc->mutex); 2904 return err; 2905 } 2906 2907 static int ceph_mdsc_wait_request(struct ceph_mds_client *mdsc, 2908 struct ceph_mds_request *req) 2909 { 2910 int err; 2911 2912 /* wait */ 2913 dout("do_request waiting\n"); 2914 if (!req->r_timeout && req->r_wait_for_completion) { 2915 err = req->r_wait_for_completion(mdsc, req); 2916 } else { 2917 long timeleft = wait_for_completion_killable_timeout( 2918 &req->r_completion, 2919 ceph_timeout_jiffies(req->r_timeout)); 2920 if (timeleft > 0) 2921 err = 0; 2922 else if (!timeleft) 2923 err = -ETIMEDOUT; /* timed out */ 2924 else 2925 err = timeleft; /* killed */ 2926 } 2927 dout("do_request waited, got %d\n", err); 2928 mutex_lock(&mdsc->mutex); 2929 2930 /* only abort if we didn't race with a real reply */ 2931 if (test_bit(CEPH_MDS_R_GOT_RESULT, &req->r_req_flags)) { 2932 err = le32_to_cpu(req->r_reply_info.head->result); 2933 } else if (err < 0) { 2934 dout("aborted request %lld with %d\n", req->r_tid, err); 2935 2936 /* 2937 * ensure we aren't running concurrently with 2938 * ceph_fill_trace or ceph_readdir_prepopulate, which 2939 * rely on locks (dir mutex) held by our caller. 2940 */ 2941 mutex_lock(&req->r_fill_mutex); 2942 req->r_err = err; 2943 set_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags); 2944 mutex_unlock(&req->r_fill_mutex); 2945 2946 if (req->r_parent && 2947 (req->r_op & CEPH_MDS_OP_WRITE)) 2948 ceph_invalidate_dir_request(req); 2949 } else { 2950 err = req->r_err; 2951 } 2952 2953 mutex_unlock(&mdsc->mutex); 2954 return err; 2955 } 2956 2957 /* 2958 * Synchrously perform an mds request. Take care of all of the 2959 * session setup, forwarding, retry details. 2960 */ 2961 int ceph_mdsc_do_request(struct ceph_mds_client *mdsc, 2962 struct inode *dir, 2963 struct ceph_mds_request *req) 2964 { 2965 int err; 2966 2967 dout("do_request on %p\n", req); 2968 2969 /* issue */ 2970 err = ceph_mdsc_submit_request(mdsc, dir, req); 2971 if (!err) 2972 err = ceph_mdsc_wait_request(mdsc, req); 2973 dout("do_request %p done, result %d\n", req, err); 2974 return err; 2975 } 2976 2977 /* 2978 * Invalidate dir's completeness, dentry lease state on an aborted MDS 2979 * namespace request. 
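 *
 * Called e.g. from ceph_mdsc_wait_request() above when a request with
 * CEPH_MDS_OP_WRITE set is aborted, since we may no longer know
 * whether the MDS actually applied the change.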
2980 */ 2981 void ceph_invalidate_dir_request(struct ceph_mds_request *req) 2982 { 2983 struct inode *dir = req->r_parent; 2984 struct inode *old_dir = req->r_old_dentry_dir; 2985 2986 dout("invalidate_dir_request %p %p (complete, lease(s))\n", dir, old_dir); 2987 2988 ceph_dir_clear_complete(dir); 2989 if (old_dir) 2990 ceph_dir_clear_complete(old_dir); 2991 if (req->r_dentry) 2992 ceph_invalidate_dentry_lease(req->r_dentry); 2993 if (req->r_old_dentry) 2994 ceph_invalidate_dentry_lease(req->r_old_dentry); 2995 } 2996 2997 /* 2998 * Handle mds reply. 2999 * 3000 * We take the session mutex and parse and process the reply immediately. 3001 * This preserves the logical ordering of replies, capabilities, etc., sent 3002 * by the MDS as they are applied to our local cache. 3003 */ 3004 static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg) 3005 { 3006 struct ceph_mds_client *mdsc = session->s_mdsc; 3007 struct ceph_mds_request *req; 3008 struct ceph_mds_reply_head *head = msg->front.iov_base; 3009 struct ceph_mds_reply_info_parsed *rinfo; /* parsed reply info */ 3010 struct ceph_snap_realm *realm; 3011 u64 tid; 3012 int err, result; 3013 int mds = session->s_mds; 3014 3015 if (msg->front.iov_len < sizeof(*head)) { 3016 pr_err("mdsc_handle_reply got corrupt (short) reply\n"); 3017 ceph_msg_dump(msg); 3018 return; 3019 } 3020 3021 /* get request, session */ 3022 tid = le64_to_cpu(msg->hdr.tid); 3023 mutex_lock(&mdsc->mutex); 3024 req = lookup_get_request(mdsc, tid); 3025 if (!req) { 3026 dout("handle_reply on unknown tid %llu\n", tid); 3027 mutex_unlock(&mdsc->mutex); 3028 return; 3029 } 3030 dout("handle_reply %p\n", req); 3031 3032 /* correct session? */ 3033 if (req->r_session != session) { 3034 pr_err("mdsc_handle_reply got %llu on session mds%d" 3035 " not mds%d\n", tid, session->s_mds, 3036 req->r_session ? req->r_session->s_mds : -1); 3037 mutex_unlock(&mdsc->mutex); 3038 goto out; 3039 } 3040 3041 /* dup? */ 3042 if ((test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags) && !head->safe) || 3043 (test_bit(CEPH_MDS_R_GOT_SAFE, &req->r_req_flags) && head->safe)) { 3044 pr_warn("got a dup %s reply on %llu from mds%d\n", 3045 head->safe ? 
"safe" : "unsafe", tid, mds); 3046 mutex_unlock(&mdsc->mutex); 3047 goto out; 3048 } 3049 if (test_bit(CEPH_MDS_R_GOT_SAFE, &req->r_req_flags)) { 3050 pr_warn("got unsafe after safe on %llu from mds%d\n", 3051 tid, mds); 3052 mutex_unlock(&mdsc->mutex); 3053 goto out; 3054 } 3055 3056 result = le32_to_cpu(head->result); 3057 3058 /* 3059 * Handle an ESTALE 3060 * if we're not talking to the authority, send to them 3061 * if the authority has changed while we weren't looking, 3062 * send to new authority 3063 * Otherwise we just have to return an ESTALE 3064 */ 3065 if (result == -ESTALE) { 3066 dout("got ESTALE on request %llu\n", req->r_tid); 3067 req->r_resend_mds = -1; 3068 if (req->r_direct_mode != USE_AUTH_MDS) { 3069 dout("not using auth, setting for that now\n"); 3070 req->r_direct_mode = USE_AUTH_MDS; 3071 __do_request(mdsc, req); 3072 mutex_unlock(&mdsc->mutex); 3073 goto out; 3074 } else { 3075 int mds = __choose_mds(mdsc, req, NULL); 3076 if (mds >= 0 && mds != req->r_session->s_mds) { 3077 dout("but auth changed, so resending\n"); 3078 __do_request(mdsc, req); 3079 mutex_unlock(&mdsc->mutex); 3080 goto out; 3081 } 3082 } 3083 dout("have to return ESTALE on request %llu\n", req->r_tid); 3084 } 3085 3086 3087 if (head->safe) { 3088 set_bit(CEPH_MDS_R_GOT_SAFE, &req->r_req_flags); 3089 __unregister_request(mdsc, req); 3090 3091 /* last request during umount? */ 3092 if (mdsc->stopping && !__get_oldest_req(mdsc)) 3093 complete_all(&mdsc->safe_umount_waiters); 3094 3095 if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) { 3096 /* 3097 * We already handled the unsafe response, now do the 3098 * cleanup. No need to examine the response; the MDS 3099 * doesn't include any result info in the safe 3100 * response. And even if it did, there is nothing 3101 * useful we could do with a revised return value. 
3102 */ 3103 dout("got safe reply %llu, mds%d\n", tid, mds); 3104 3105 mutex_unlock(&mdsc->mutex); 3106 goto out; 3107 } 3108 } else { 3109 set_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags); 3110 list_add_tail(&req->r_unsafe_item, &req->r_session->s_unsafe); 3111 } 3112 3113 dout("handle_reply tid %lld result %d\n", tid, result); 3114 rinfo = &req->r_reply_info; 3115 if (test_bit(CEPHFS_FEATURE_REPLY_ENCODING, &session->s_features)) 3116 err = parse_reply_info(session, msg, rinfo, (u64)-1); 3117 else 3118 err = parse_reply_info(session, msg, rinfo, session->s_con.peer_features); 3119 mutex_unlock(&mdsc->mutex); 3120 3121 mutex_lock(&session->s_mutex); 3122 if (err < 0) { 3123 pr_err("mdsc_handle_reply got corrupt reply mds%d(tid:%lld)\n", mds, tid); 3124 ceph_msg_dump(msg); 3125 goto out_err; 3126 } 3127 3128 /* snap trace */ 3129 realm = NULL; 3130 if (rinfo->snapblob_len) { 3131 down_write(&mdsc->snap_rwsem); 3132 ceph_update_snap_trace(mdsc, rinfo->snapblob, 3133 rinfo->snapblob + rinfo->snapblob_len, 3134 le32_to_cpu(head->op) == CEPH_MDS_OP_RMSNAP, 3135 &realm); 3136 downgrade_write(&mdsc->snap_rwsem); 3137 } else { 3138 down_read(&mdsc->snap_rwsem); 3139 } 3140 3141 /* insert trace into our cache */ 3142 mutex_lock(&req->r_fill_mutex); 3143 current->journal_info = req; 3144 err = ceph_fill_trace(mdsc->fsc->sb, req); 3145 if (err == 0) { 3146 if (result == 0 && (req->r_op == CEPH_MDS_OP_READDIR || 3147 req->r_op == CEPH_MDS_OP_LSSNAP)) 3148 ceph_readdir_prepopulate(req, req->r_session); 3149 } 3150 current->journal_info = NULL; 3151 mutex_unlock(&req->r_fill_mutex); 3152 3153 up_read(&mdsc->snap_rwsem); 3154 if (realm) 3155 ceph_put_snap_realm(mdsc, realm); 3156 3157 if (err == 0) { 3158 if (req->r_target_inode && 3159 test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) { 3160 struct ceph_inode_info *ci = 3161 ceph_inode(req->r_target_inode); 3162 spin_lock(&ci->i_unsafe_lock); 3163 list_add_tail(&req->r_unsafe_target_item, 3164 &ci->i_unsafe_iops); 3165 spin_unlock(&ci->i_unsafe_lock); 3166 } 3167 3168 ceph_unreserve_caps(mdsc, &req->r_caps_reservation); 3169 } 3170 out_err: 3171 mutex_lock(&mdsc->mutex); 3172 if (!test_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags)) { 3173 if (err) { 3174 req->r_err = err; 3175 } else { 3176 req->r_reply = ceph_msg_get(msg); 3177 set_bit(CEPH_MDS_R_GOT_RESULT, &req->r_req_flags); 3178 } 3179 } else { 3180 dout("reply arrived after request %lld was aborted\n", tid); 3181 } 3182 mutex_unlock(&mdsc->mutex); 3183 3184 mutex_unlock(&session->s_mutex); 3185 3186 /* kick calling process */ 3187 complete_request(mdsc, req); 3188 3189 ceph_update_metadata_latency(&mdsc->metric, req->r_start_latency, 3190 req->r_end_latency, err); 3191 out: 3192 ceph_mdsc_put_request(req); 3193 return; 3194 } 3195 3196 3197 3198 /* 3199 * handle mds notification that our request has been forwarded. 3200 */ 3201 static void handle_forward(struct ceph_mds_client *mdsc, 3202 struct ceph_mds_session *session, 3203 struct ceph_msg *msg) 3204 { 3205 struct ceph_mds_request *req; 3206 u64 tid = le64_to_cpu(msg->hdr.tid); 3207 u32 next_mds; 3208 u32 fwd_seq; 3209 int err = -EINVAL; 3210 void *p = msg->front.iov_base; 3211 void *end = p + msg->front.iov_len; 3212 3213 ceph_decode_need(&p, end, 2*sizeof(u32), bad); 3214 next_mds = ceph_decode_32(&p); 3215 fwd_seq = ceph_decode_32(&p); 3216 3217 mutex_lock(&mdsc->mutex); 3218 req = lookup_get_request(mdsc, tid); 3219 if (!req) { 3220 dout("forward tid %llu to mds%d - req dne\n", tid, next_mds); 3221 goto out; /* dup reply? 
*/ 3222 } 3223 3224 if (test_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags)) { 3225 dout("forward tid %llu aborted, unregistering\n", tid); 3226 __unregister_request(mdsc, req); 3227 } else if (fwd_seq <= req->r_num_fwd) { 3228 dout("forward tid %llu to mds%d - old seq %d <= %d\n", 3229 tid, next_mds, req->r_num_fwd, fwd_seq); 3230 } else { 3231 /* resend. forward race not possible; mds would drop */ 3232 dout("forward tid %llu to mds%d (we resend)\n", tid, next_mds); 3233 BUG_ON(req->r_err); 3234 BUG_ON(test_bit(CEPH_MDS_R_GOT_RESULT, &req->r_req_flags)); 3235 req->r_attempts = 0; 3236 req->r_num_fwd = fwd_seq; 3237 req->r_resend_mds = next_mds; 3238 put_request_session(req); 3239 __do_request(mdsc, req); 3240 } 3241 ceph_mdsc_put_request(req); 3242 out: 3243 mutex_unlock(&mdsc->mutex); 3244 return; 3245 3246 bad: 3247 pr_err("mdsc_handle_forward decode error err=%d\n", err); 3248 } 3249 3250 static int __decode_session_metadata(void **p, void *end, 3251 bool *blacklisted) 3252 { 3253 /* map<string,string> */ 3254 u32 n; 3255 bool err_str; 3256 ceph_decode_32_safe(p, end, n, bad); 3257 while (n-- > 0) { 3258 u32 len; 3259 ceph_decode_32_safe(p, end, len, bad); 3260 ceph_decode_need(p, end, len, bad); 3261 err_str = !strncmp(*p, "error_string", len); 3262 *p += len; 3263 ceph_decode_32_safe(p, end, len, bad); 3264 ceph_decode_need(p, end, len, bad); 3265 if (err_str && strnstr(*p, "blacklisted", len)) 3266 *blacklisted = true; 3267 *p += len; 3268 } 3269 return 0; 3270 bad: 3271 return -1; 3272 } 3273 3274 /* 3275 * handle a mds session control message 3276 */ 3277 static void handle_session(struct ceph_mds_session *session, 3278 struct ceph_msg *msg) 3279 { 3280 struct ceph_mds_client *mdsc = session->s_mdsc; 3281 int mds = session->s_mds; 3282 int msg_version = le16_to_cpu(msg->hdr.version); 3283 void *p = msg->front.iov_base; 3284 void *end = p + msg->front.iov_len; 3285 struct ceph_mds_session_head *h; 3286 u32 op; 3287 u64 seq, features = 0; 3288 int wake = 0; 3289 bool blacklisted = false; 3290 3291 /* decode */ 3292 ceph_decode_need(&p, end, sizeof(*h), bad); 3293 h = p; 3294 p += sizeof(*h); 3295 3296 op = le32_to_cpu(h->op); 3297 seq = le64_to_cpu(h->seq); 3298 3299 if (msg_version >= 3) { 3300 u32 len; 3301 /* version >= 2, metadata */ 3302 if (__decode_session_metadata(&p, end, &blacklisted) < 0) 3303 goto bad; 3304 /* version >= 3, feature bits */ 3305 ceph_decode_32_safe(&p, end, len, bad); 3306 ceph_decode_64_safe(&p, end, features, bad); 3307 p += len - sizeof(features); 3308 } 3309 3310 mutex_lock(&mdsc->mutex); 3311 if (op == CEPH_SESSION_CLOSE) { 3312 ceph_get_mds_session(session); 3313 __unregister_session(mdsc, session); 3314 } 3315 /* FIXME: this ttl calculation is generous */ 3316 session->s_ttl = jiffies + HZ*mdsc->mdsmap->m_session_autoclose; 3317 mutex_unlock(&mdsc->mutex); 3318 3319 mutex_lock(&session->s_mutex); 3320 3321 dout("handle_session mds%d %s %p state %s seq %llu\n", 3322 mds, ceph_session_op_name(op), session, 3323 ceph_session_state_name(session->s_state), seq); 3324 3325 if (session->s_state == CEPH_MDS_SESSION_HUNG) { 3326 session->s_state = CEPH_MDS_SESSION_OPEN; 3327 pr_info("mds%d came back\n", session->s_mds); 3328 } 3329 3330 switch (op) { 3331 case CEPH_SESSION_OPEN: 3332 if (session->s_state == CEPH_MDS_SESSION_RECONNECTING) 3333 pr_info("mds%d reconnect success\n", session->s_mds); 3334 session->s_state = CEPH_MDS_SESSION_OPEN; 3335 session->s_features = features; 3336 renewed_caps(mdsc, session, 0); 3337 wake = 1; 3338 if (mdsc->stopping) 3339 
__close_session(mdsc, session); 3340 break; 3341 3342 case CEPH_SESSION_RENEWCAPS: 3343 if (session->s_renew_seq == seq) 3344 renewed_caps(mdsc, session, 1); 3345 break; 3346 3347 case CEPH_SESSION_CLOSE: 3348 if (session->s_state == CEPH_MDS_SESSION_RECONNECTING) 3349 pr_info("mds%d reconnect denied\n", session->s_mds); 3350 session->s_state = CEPH_MDS_SESSION_CLOSED; 3351 cleanup_session_requests(mdsc, session); 3352 remove_session_caps(session); 3353 wake = 2; /* for good measure */ 3354 wake_up_all(&mdsc->session_close_wq); 3355 break; 3356 3357 case CEPH_SESSION_STALE: 3358 pr_info("mds%d caps went stale, renewing\n", 3359 session->s_mds); 3360 spin_lock(&session->s_gen_ttl_lock); 3361 session->s_cap_gen++; 3362 session->s_cap_ttl = jiffies - 1; 3363 spin_unlock(&session->s_gen_ttl_lock); 3364 send_renew_caps(mdsc, session); 3365 break; 3366 3367 case CEPH_SESSION_RECALL_STATE: 3368 ceph_trim_caps(mdsc, session, le32_to_cpu(h->max_caps)); 3369 break; 3370 3371 case CEPH_SESSION_FLUSHMSG: 3372 send_flushmsg_ack(mdsc, session, seq); 3373 break; 3374 3375 case CEPH_SESSION_FORCE_RO: 3376 dout("force_session_readonly %p\n", session); 3377 spin_lock(&session->s_cap_lock); 3378 session->s_readonly = true; 3379 spin_unlock(&session->s_cap_lock); 3380 wake_up_session_caps(session, FORCE_RO); 3381 break; 3382 3383 case CEPH_SESSION_REJECT: 3384 WARN_ON(session->s_state != CEPH_MDS_SESSION_OPENING); 3385 pr_info("mds%d rejected session\n", session->s_mds); 3386 session->s_state = CEPH_MDS_SESSION_REJECTED; 3387 cleanup_session_requests(mdsc, session); 3388 remove_session_caps(session); 3389 if (blacklisted) 3390 mdsc->fsc->blacklisted = true; 3391 wake = 2; /* for good measure */ 3392 break; 3393 3394 default: 3395 pr_err("mdsc_handle_session bad op %d mds%d\n", op, mds); 3396 WARN_ON(1); 3397 } 3398 3399 mutex_unlock(&session->s_mutex); 3400 if (wake) { 3401 mutex_lock(&mdsc->mutex); 3402 __wake_requests(mdsc, &session->s_waiting); 3403 if (wake == 2) 3404 kick_requests(mdsc, mds); 3405 mutex_unlock(&mdsc->mutex); 3406 } 3407 if (op == CEPH_SESSION_CLOSE) 3408 ceph_put_mds_session(session); 3409 return; 3410 3411 bad: 3412 pr_err("mdsc_handle_session corrupt message mds%d len %d\n", mds, 3413 (int)msg->front.iov_len); 3414 ceph_msg_dump(msg); 3415 return; 3416 } 3417 3418 void ceph_mdsc_release_dir_caps(struct ceph_mds_request *req) 3419 { 3420 int dcaps; 3421 3422 dcaps = xchg(&req->r_dir_caps, 0); 3423 if (dcaps) { 3424 dout("releasing r_dir_caps=%s\n", ceph_cap_string(dcaps)); 3425 ceph_put_cap_refs(ceph_inode(req->r_parent), dcaps); 3426 } 3427 } 3428 3429 void ceph_mdsc_release_dir_caps_no_check(struct ceph_mds_request *req) 3430 { 3431 int dcaps; 3432 3433 dcaps = xchg(&req->r_dir_caps, 0); 3434 if (dcaps) { 3435 dout("releasing r_dir_caps=%s\n", ceph_cap_string(dcaps)); 3436 ceph_put_cap_refs_no_check_caps(ceph_inode(req->r_parent), 3437 dcaps); 3438 } 3439 } 3440 3441 /* 3442 * called under session->mutex. 3443 */ 3444 static void replay_unsafe_requests(struct ceph_mds_client *mdsc, 3445 struct ceph_mds_session *session) 3446 { 3447 struct ceph_mds_request *req, *nreq; 3448 struct rb_node *p; 3449 3450 dout("replay_unsafe_requests mds%d\n", session->s_mds); 3451 3452 mutex_lock(&mdsc->mutex); 3453 list_for_each_entry_safe(req, nreq, &session->s_unsafe, r_unsafe_item) 3454 __send_request(mdsc, session, req, true); 3455 3456 /* 3457 * also re-send old requests when MDS enters reconnect stage. So that MDS 3458 * can process completed request in clientreplay stage. 
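 * Both the unsafe requests on s_unsafe and those older, already-sent
 * requests are resent with drop_cap_releases set, so the rebuilt
 * messages carry no stale cap/dentry releases.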
3459  */
3460 	p = rb_first(&mdsc->request_tree);
3461 	while (p) {
3462 		req = rb_entry(p, struct ceph_mds_request, r_node);
3463 		p = rb_next(p);
3464 		if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags))
3465 			continue;
3466 		if (req->r_attempts == 0)
3467 			continue; /* only old requests */
3468 		if (!req->r_session)
3469 			continue;
3470 		if (req->r_session->s_mds != session->s_mds)
3471 			continue;
3472 
3473 		ceph_mdsc_release_dir_caps_no_check(req);
3474 
3475 		__send_request(mdsc, session, req, true);
3476 	}
3477 	mutex_unlock(&mdsc->mutex);
3478 }
3479 
3480 static int send_reconnect_partial(struct ceph_reconnect_state *recon_state)
3481 {
3482 	struct ceph_msg *reply;
3483 	struct ceph_pagelist *_pagelist;
3484 	struct page *page;
3485 	__le32 *addr;
3486 	int err = -ENOMEM;
3487 
3488 	if (!recon_state->allow_multi)
3489 		return -ENOSPC;
3490 
3491 	/* can't handle message that contains both caps and realms */
3492 	BUG_ON(!recon_state->nr_caps == !recon_state->nr_realms);
3493 
3494 	/* pre-allocate new pagelist */
3495 	_pagelist = ceph_pagelist_alloc(GFP_NOFS);
3496 	if (!_pagelist)
3497 		return -ENOMEM;
3498 
3499 	reply = ceph_msg_new2(CEPH_MSG_CLIENT_RECONNECT, 0, 1, GFP_NOFS, false);
3500 	if (!reply)
3501 		goto fail_msg;
3502 
3503 	/* placeholder for nr_caps */
3504 	err = ceph_pagelist_encode_32(_pagelist, 0);
3505 	if (err < 0)
3506 		goto fail;
3507 
3508 	if (recon_state->nr_caps) {
3509 		/* currently encoding caps */
3510 		err = ceph_pagelist_encode_32(recon_state->pagelist, 0);
3511 		if (err)
3512 			goto fail;
3513 	} else {
3514 		/* placeholder for nr_realms (currently encoding realms) */
3515 		err = ceph_pagelist_encode_32(_pagelist, 0);
3516 		if (err < 0)
3517 			goto fail;
3518 	}
3519 
3520 	err = ceph_pagelist_encode_8(recon_state->pagelist, 1);
3521 	if (err)
3522 		goto fail;
3523 
3524 	page = list_first_entry(&recon_state->pagelist->head, struct page, lru);
3525 	addr = kmap_atomic(page);
3526 	if (recon_state->nr_caps) {
3527 		/* currently encoding caps */
3528 		*addr = cpu_to_le32(recon_state->nr_caps);
3529 	} else {
3530 		/* currently encoding realms */
3531 		*(addr + 1) = cpu_to_le32(recon_state->nr_realms);
3532 	}
3533 	kunmap_atomic(addr);
3534 
3535 	reply->hdr.version = cpu_to_le16(5);
3536 	reply->hdr.compat_version = cpu_to_le16(4);
3537 
3538 	reply->hdr.data_len = cpu_to_le32(recon_state->pagelist->length);
3539 	ceph_msg_data_add_pagelist(reply, recon_state->pagelist);
3540 
3541 	ceph_con_send(&recon_state->session->s_con, reply);
3542 	ceph_pagelist_release(recon_state->pagelist);
3543 
3544 	recon_state->pagelist = _pagelist;
3545 	recon_state->nr_caps = 0;
3546 	recon_state->nr_realms = 0;
3547 	recon_state->msg_version = 5;
3548 	return 0;
3549 fail:
3550 	ceph_msg_put(reply);
3551 fail_msg:
3552 	ceph_pagelist_release(_pagelist);
3553 	return err;
3554 }
3555 
3556 /*
3557  * Encode information about a cap for a reconnect with the MDS.
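 *
 * Roughly, each cap record consists of: the inode number, a path
 * string (a real path only for the old v1 encoding; newer versions
 * send an empty string), a struct ceph_mds_cap_reconnect{,_v1} with
 * the wanted/issued bits, and, for the v2+ encodings, the encoded
 * file lock state (plus snap_follows on the newer struct versions).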
3558 */ 3559 static int reconnect_caps_cb(struct inode *inode, struct ceph_cap *cap, 3560 void *arg) 3561 { 3562 union { 3563 struct ceph_mds_cap_reconnect v2; 3564 struct ceph_mds_cap_reconnect_v1 v1; 3565 } rec; 3566 struct ceph_inode_info *ci = cap->ci; 3567 struct ceph_reconnect_state *recon_state = arg; 3568 struct ceph_pagelist *pagelist = recon_state->pagelist; 3569 int err; 3570 u64 snap_follows; 3571 3572 dout(" adding %p ino %llx.%llx cap %p %lld %s\n", 3573 inode, ceph_vinop(inode), cap, cap->cap_id, 3574 ceph_cap_string(cap->issued)); 3575 3576 spin_lock(&ci->i_ceph_lock); 3577 cap->seq = 0; /* reset cap seq */ 3578 cap->issue_seq = 0; /* and issue_seq */ 3579 cap->mseq = 0; /* and migrate_seq */ 3580 cap->cap_gen = cap->session->s_cap_gen; 3581 3582 /* These are lost when the session goes away */ 3583 if (S_ISDIR(inode->i_mode)) { 3584 if (cap->issued & CEPH_CAP_DIR_CREATE) { 3585 ceph_put_string(rcu_dereference_raw(ci->i_cached_layout.pool_ns)); 3586 memset(&ci->i_cached_layout, 0, sizeof(ci->i_cached_layout)); 3587 } 3588 cap->issued &= ~CEPH_CAP_ANY_DIR_OPS; 3589 } 3590 3591 if (recon_state->msg_version >= 2) { 3592 rec.v2.cap_id = cpu_to_le64(cap->cap_id); 3593 rec.v2.wanted = cpu_to_le32(__ceph_caps_wanted(ci)); 3594 rec.v2.issued = cpu_to_le32(cap->issued); 3595 rec.v2.snaprealm = cpu_to_le64(ci->i_snap_realm->ino); 3596 rec.v2.pathbase = 0; 3597 rec.v2.flock_len = (__force __le32) 3598 ((ci->i_ceph_flags & CEPH_I_ERROR_FILELOCK) ? 0 : 1); 3599 } else { 3600 rec.v1.cap_id = cpu_to_le64(cap->cap_id); 3601 rec.v1.wanted = cpu_to_le32(__ceph_caps_wanted(ci)); 3602 rec.v1.issued = cpu_to_le32(cap->issued); 3603 rec.v1.size = cpu_to_le64(inode->i_size); 3604 ceph_encode_timespec64(&rec.v1.mtime, &inode->i_mtime); 3605 ceph_encode_timespec64(&rec.v1.atime, &inode->i_atime); 3606 rec.v1.snaprealm = cpu_to_le64(ci->i_snap_realm->ino); 3607 rec.v1.pathbase = 0; 3608 } 3609 3610 if (list_empty(&ci->i_cap_snaps)) { 3611 snap_follows = ci->i_head_snapc ? 
ci->i_head_snapc->seq : 0; 3612 } else { 3613 struct ceph_cap_snap *capsnap = 3614 list_first_entry(&ci->i_cap_snaps, 3615 struct ceph_cap_snap, ci_item); 3616 snap_follows = capsnap->follows; 3617 } 3618 spin_unlock(&ci->i_ceph_lock); 3619 3620 if (recon_state->msg_version >= 2) { 3621 int num_fcntl_locks, num_flock_locks; 3622 struct ceph_filelock *flocks = NULL; 3623 size_t struct_len, total_len = sizeof(u64); 3624 u8 struct_v = 0; 3625 3626 encode_again: 3627 if (rec.v2.flock_len) { 3628 ceph_count_locks(inode, &num_fcntl_locks, &num_flock_locks); 3629 } else { 3630 num_fcntl_locks = 0; 3631 num_flock_locks = 0; 3632 } 3633 if (num_fcntl_locks + num_flock_locks > 0) { 3634 flocks = kmalloc_array(num_fcntl_locks + num_flock_locks, 3635 sizeof(struct ceph_filelock), 3636 GFP_NOFS); 3637 if (!flocks) { 3638 err = -ENOMEM; 3639 goto out_err; 3640 } 3641 err = ceph_encode_locks_to_buffer(inode, flocks, 3642 num_fcntl_locks, 3643 num_flock_locks); 3644 if (err) { 3645 kfree(flocks); 3646 flocks = NULL; 3647 if (err == -ENOSPC) 3648 goto encode_again; 3649 goto out_err; 3650 } 3651 } else { 3652 kfree(flocks); 3653 flocks = NULL; 3654 } 3655 3656 if (recon_state->msg_version >= 3) { 3657 /* version, compat_version and struct_len */ 3658 total_len += 2 * sizeof(u8) + sizeof(u32); 3659 struct_v = 2; 3660 } 3661 /* 3662 * number of encoded locks is stable, so copy to pagelist 3663 */ 3664 struct_len = 2 * sizeof(u32) + 3665 (num_fcntl_locks + num_flock_locks) * 3666 sizeof(struct ceph_filelock); 3667 rec.v2.flock_len = cpu_to_le32(struct_len); 3668 3669 struct_len += sizeof(u32) + sizeof(rec.v2); 3670 3671 if (struct_v >= 2) 3672 struct_len += sizeof(u64); /* snap_follows */ 3673 3674 total_len += struct_len; 3675 3676 if (pagelist->length + total_len > RECONNECT_MAX_SIZE) { 3677 err = send_reconnect_partial(recon_state); 3678 if (err) 3679 goto out_freeflocks; 3680 pagelist = recon_state->pagelist; 3681 } 3682 3683 err = ceph_pagelist_reserve(pagelist, total_len); 3684 if (err) 3685 goto out_freeflocks; 3686 3687 ceph_pagelist_encode_64(pagelist, ceph_ino(inode)); 3688 if (recon_state->msg_version >= 3) { 3689 ceph_pagelist_encode_8(pagelist, struct_v); 3690 ceph_pagelist_encode_8(pagelist, 1); 3691 ceph_pagelist_encode_32(pagelist, struct_len); 3692 } 3693 ceph_pagelist_encode_string(pagelist, NULL, 0); 3694 ceph_pagelist_append(pagelist, &rec, sizeof(rec.v2)); 3695 ceph_locks_to_pagelist(flocks, pagelist, 3696 num_fcntl_locks, num_flock_locks); 3697 if (struct_v >= 2) 3698 ceph_pagelist_encode_64(pagelist, snap_follows); 3699 out_freeflocks: 3700 kfree(flocks); 3701 } else { 3702 u64 pathbase = 0; 3703 int pathlen = 0; 3704 char *path = NULL; 3705 struct dentry *dentry; 3706 3707 dentry = d_find_alias(inode); 3708 if (dentry) { 3709 path = ceph_mdsc_build_path(dentry, 3710 &pathlen, &pathbase, 0); 3711 dput(dentry); 3712 if (IS_ERR(path)) { 3713 err = PTR_ERR(path); 3714 goto out_err; 3715 } 3716 rec.v1.pathbase = cpu_to_le64(pathbase); 3717 } 3718 3719 err = ceph_pagelist_reserve(pagelist, 3720 sizeof(u64) + sizeof(u32) + 3721 pathlen + sizeof(rec.v1)); 3722 if (err) { 3723 goto out_freepath; 3724 } 3725 3726 ceph_pagelist_encode_64(pagelist, ceph_ino(inode)); 3727 ceph_pagelist_encode_string(pagelist, path, pathlen); 3728 ceph_pagelist_append(pagelist, &rec, sizeof(rec.v1)); 3729 out_freepath: 3730 ceph_mdsc_free_path(path, pathlen); 3731 } 3732 3733 out_err: 3734 if (err >= 0) 3735 recon_state->nr_caps++; 3736 return err; 3737 } 3738 3739 static int encode_snap_realms(struct 
ceph_mds_client *mdsc, 3740 struct ceph_reconnect_state *recon_state) 3741 { 3742 struct rb_node *p; 3743 struct ceph_pagelist *pagelist = recon_state->pagelist; 3744 int err = 0; 3745 3746 if (recon_state->msg_version >= 4) { 3747 err = ceph_pagelist_encode_32(pagelist, mdsc->num_snap_realms); 3748 if (err < 0) 3749 goto fail; 3750 } 3751 3752 /* 3753 * snaprealms. we provide mds with the ino, seq (version), and 3754 * parent for all of our realms. If the mds has any newer info, 3755 * it will tell us. 3756 */ 3757 for (p = rb_first(&mdsc->snap_realms); p; p = rb_next(p)) { 3758 struct ceph_snap_realm *realm = 3759 rb_entry(p, struct ceph_snap_realm, node); 3760 struct ceph_mds_snaprealm_reconnect sr_rec; 3761 3762 if (recon_state->msg_version >= 4) { 3763 size_t need = sizeof(u8) * 2 + sizeof(u32) + 3764 sizeof(sr_rec); 3765 3766 if (pagelist->length + need > RECONNECT_MAX_SIZE) { 3767 err = send_reconnect_partial(recon_state); 3768 if (err) 3769 goto fail; 3770 pagelist = recon_state->pagelist; 3771 } 3772 3773 err = ceph_pagelist_reserve(pagelist, need); 3774 if (err) 3775 goto fail; 3776 3777 ceph_pagelist_encode_8(pagelist, 1); 3778 ceph_pagelist_encode_8(pagelist, 1); 3779 ceph_pagelist_encode_32(pagelist, sizeof(sr_rec)); 3780 } 3781 3782 dout(" adding snap realm %llx seq %lld parent %llx\n", 3783 realm->ino, realm->seq, realm->parent_ino); 3784 sr_rec.ino = cpu_to_le64(realm->ino); 3785 sr_rec.seq = cpu_to_le64(realm->seq); 3786 sr_rec.parent = cpu_to_le64(realm->parent_ino); 3787 3788 err = ceph_pagelist_append(pagelist, &sr_rec, sizeof(sr_rec)); 3789 if (err) 3790 goto fail; 3791 3792 recon_state->nr_realms++; 3793 } 3794 fail: 3795 return err; 3796 } 3797 3798 3799 /* 3800 * If an MDS fails and recovers, clients need to reconnect in order to 3801 * reestablish shared state. This includes all caps issued through 3802 * this session _and_ the snap_realm hierarchy. Because it's not 3803 * clear which snap realms the mds cares about, we send everything we 3804 * know about.. that ensures we'll then get any new info the 3805 * recovering MDS might have. 3806 * 3807 * This is a relatively heavyweight operation, but it's rare. 3808 */ 3809 static void send_mds_reconnect(struct ceph_mds_client *mdsc, 3810 struct ceph_mds_session *session) 3811 { 3812 struct ceph_msg *reply; 3813 int mds = session->s_mds; 3814 int err = -ENOMEM; 3815 struct ceph_reconnect_state recon_state = { 3816 .session = session, 3817 }; 3818 LIST_HEAD(dispose); 3819 3820 pr_info("mds%d reconnect start\n", mds); 3821 3822 recon_state.pagelist = ceph_pagelist_alloc(GFP_NOFS); 3823 if (!recon_state.pagelist) 3824 goto fail_nopagelist; 3825 3826 reply = ceph_msg_new2(CEPH_MSG_CLIENT_RECONNECT, 0, 1, GFP_NOFS, false); 3827 if (!reply) 3828 goto fail_nomsg; 3829 3830 xa_destroy(&session->s_delegated_inos); 3831 3832 mutex_lock(&session->s_mutex); 3833 session->s_state = CEPH_MDS_SESSION_RECONNECTING; 3834 session->s_seq = 0; 3835 3836 dout("session %p state %s\n", session, 3837 ceph_session_state_name(session->s_state)); 3838 3839 spin_lock(&session->s_gen_ttl_lock); 3840 session->s_cap_gen++; 3841 spin_unlock(&session->s_gen_ttl_lock); 3842 3843 spin_lock(&session->s_cap_lock); 3844 /* don't know if session is readonly */ 3845 session->s_readonly = 0; 3846 /* 3847 * notify __ceph_remove_cap() that we are composing cap reconnect. 3848 * If a cap get released before being added to the cap reconnect, 3849 * __ceph_remove_cap() should skip queuing cap release. 
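 * s_cap_reconnect is set under s_cap_lock just below and cleared
 * again (also under s_cap_lock) once ceph_iterate_session_caps()
 * has walked every cap on this session.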
3850 */ 3851 session->s_cap_reconnect = 1; 3852 /* drop old cap expires; we're about to reestablish that state */ 3853 detach_cap_releases(session, &dispose); 3854 spin_unlock(&session->s_cap_lock); 3855 dispose_cap_releases(mdsc, &dispose); 3856 3857 /* trim unused caps to reduce MDS's cache rejoin time */ 3858 if (mdsc->fsc->sb->s_root) 3859 shrink_dcache_parent(mdsc->fsc->sb->s_root); 3860 3861 ceph_con_close(&session->s_con); 3862 ceph_con_open(&session->s_con, 3863 CEPH_ENTITY_TYPE_MDS, mds, 3864 ceph_mdsmap_get_addr(mdsc->mdsmap, mds)); 3865 3866 /* replay unsafe requests */ 3867 replay_unsafe_requests(mdsc, session); 3868 3869 ceph_early_kick_flushing_caps(mdsc, session); 3870 3871 down_read(&mdsc->snap_rwsem); 3872 3873 /* placeholder for nr_caps */ 3874 err = ceph_pagelist_encode_32(recon_state.pagelist, 0); 3875 if (err) 3876 goto fail; 3877 3878 if (test_bit(CEPHFS_FEATURE_MULTI_RECONNECT, &session->s_features)) { 3879 recon_state.msg_version = 3; 3880 recon_state.allow_multi = true; 3881 } else if (session->s_con.peer_features & CEPH_FEATURE_MDSENC) { 3882 recon_state.msg_version = 3; 3883 } else { 3884 recon_state.msg_version = 2; 3885 } 3886 /* trsaverse this session's caps */ 3887 err = ceph_iterate_session_caps(session, reconnect_caps_cb, &recon_state); 3888 3889 spin_lock(&session->s_cap_lock); 3890 session->s_cap_reconnect = 0; 3891 spin_unlock(&session->s_cap_lock); 3892 3893 if (err < 0) 3894 goto fail; 3895 3896 /* check if all realms can be encoded into current message */ 3897 if (mdsc->num_snap_realms) { 3898 size_t total_len = 3899 recon_state.pagelist->length + 3900 mdsc->num_snap_realms * 3901 sizeof(struct ceph_mds_snaprealm_reconnect); 3902 if (recon_state.msg_version >= 4) { 3903 /* number of realms */ 3904 total_len += sizeof(u32); 3905 /* version, compat_version and struct_len */ 3906 total_len += mdsc->num_snap_realms * 3907 (2 * sizeof(u8) + sizeof(u32)); 3908 } 3909 if (total_len > RECONNECT_MAX_SIZE) { 3910 if (!recon_state.allow_multi) { 3911 err = -ENOSPC; 3912 goto fail; 3913 } 3914 if (recon_state.nr_caps) { 3915 err = send_reconnect_partial(&recon_state); 3916 if (err) 3917 goto fail; 3918 } 3919 recon_state.msg_version = 5; 3920 } 3921 } 3922 3923 err = encode_snap_realms(mdsc, &recon_state); 3924 if (err < 0) 3925 goto fail; 3926 3927 if (recon_state.msg_version >= 5) { 3928 err = ceph_pagelist_encode_8(recon_state.pagelist, 0); 3929 if (err < 0) 3930 goto fail; 3931 } 3932 3933 if (recon_state.nr_caps || recon_state.nr_realms) { 3934 struct page *page = 3935 list_first_entry(&recon_state.pagelist->head, 3936 struct page, lru); 3937 __le32 *addr = kmap_atomic(page); 3938 if (recon_state.nr_caps) { 3939 WARN_ON(recon_state.nr_realms != mdsc->num_snap_realms); 3940 *addr = cpu_to_le32(recon_state.nr_caps); 3941 } else if (recon_state.msg_version >= 4) { 3942 *(addr + 1) = cpu_to_le32(recon_state.nr_realms); 3943 } 3944 kunmap_atomic(addr); 3945 } 3946 3947 reply->hdr.version = cpu_to_le16(recon_state.msg_version); 3948 if (recon_state.msg_version >= 4) 3949 reply->hdr.compat_version = cpu_to_le16(4); 3950 3951 reply->hdr.data_len = cpu_to_le32(recon_state.pagelist->length); 3952 ceph_msg_data_add_pagelist(reply, recon_state.pagelist); 3953 3954 ceph_con_send(&session->s_con, reply); 3955 3956 mutex_unlock(&session->s_mutex); 3957 3958 mutex_lock(&mdsc->mutex); 3959 __wake_requests(mdsc, &session->s_waiting); 3960 mutex_unlock(&mdsc->mutex); 3961 3962 up_read(&mdsc->snap_rwsem); 3963 ceph_pagelist_release(recon_state.pagelist); 3964 return; 3965 3966 
fail: 3967 ceph_msg_put(reply); 3968 up_read(&mdsc->snap_rwsem); 3969 mutex_unlock(&session->s_mutex); 3970 fail_nomsg: 3971 ceph_pagelist_release(recon_state.pagelist); 3972 fail_nopagelist: 3973 pr_err("error %d preparing reconnect for mds%d\n", err, mds); 3974 return; 3975 } 3976 3977 3978 /* 3979 * compare old and new mdsmaps, kicking requests 3980 * and closing out old connections as necessary 3981 * 3982 * called under mdsc->mutex. 3983 */ 3984 static void check_new_map(struct ceph_mds_client *mdsc, 3985 struct ceph_mdsmap *newmap, 3986 struct ceph_mdsmap *oldmap) 3987 { 3988 int i; 3989 int oldstate, newstate; 3990 struct ceph_mds_session *s; 3991 3992 dout("check_new_map new %u old %u\n", 3993 newmap->m_epoch, oldmap->m_epoch); 3994 3995 for (i = 0; i < oldmap->possible_max_rank && i < mdsc->max_sessions; i++) { 3996 if (!mdsc->sessions[i]) 3997 continue; 3998 s = mdsc->sessions[i]; 3999 oldstate = ceph_mdsmap_get_state(oldmap, i); 4000 newstate = ceph_mdsmap_get_state(newmap, i); 4001 4002 dout("check_new_map mds%d state %s%s -> %s%s (session %s)\n", 4003 i, ceph_mds_state_name(oldstate), 4004 ceph_mdsmap_is_laggy(oldmap, i) ? " (laggy)" : "", 4005 ceph_mds_state_name(newstate), 4006 ceph_mdsmap_is_laggy(newmap, i) ? " (laggy)" : "", 4007 ceph_session_state_name(s->s_state)); 4008 4009 if (i >= newmap->possible_max_rank) { 4010 /* force close session for stopped mds */ 4011 ceph_get_mds_session(s); 4012 __unregister_session(mdsc, s); 4013 __wake_requests(mdsc, &s->s_waiting); 4014 mutex_unlock(&mdsc->mutex); 4015 4016 mutex_lock(&s->s_mutex); 4017 cleanup_session_requests(mdsc, s); 4018 remove_session_caps(s); 4019 mutex_unlock(&s->s_mutex); 4020 4021 ceph_put_mds_session(s); 4022 4023 mutex_lock(&mdsc->mutex); 4024 kick_requests(mdsc, i); 4025 continue; 4026 } 4027 4028 if (memcmp(ceph_mdsmap_get_addr(oldmap, i), 4029 ceph_mdsmap_get_addr(newmap, i), 4030 sizeof(struct ceph_entity_addr))) { 4031 /* just close it */ 4032 mutex_unlock(&mdsc->mutex); 4033 mutex_lock(&s->s_mutex); 4034 mutex_lock(&mdsc->mutex); 4035 ceph_con_close(&s->s_con); 4036 mutex_unlock(&s->s_mutex); 4037 s->s_state = CEPH_MDS_SESSION_RESTARTING; 4038 } else if (oldstate == newstate) { 4039 continue; /* nothing new with this mds */ 4040 } 4041 4042 /* 4043 * send reconnect? 4044 */ 4045 if (s->s_state == CEPH_MDS_SESSION_RESTARTING && 4046 newstate >= CEPH_MDS_STATE_RECONNECT) { 4047 mutex_unlock(&mdsc->mutex); 4048 send_mds_reconnect(mdsc, s); 4049 mutex_lock(&mdsc->mutex); 4050 } 4051 4052 /* 4053 * kick request on any mds that has gone active. 
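* Besides kicking requests, also kick any flushing caps and wake cap waiters so they resend to the newly active mds.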
4054 */ 4055 if (oldstate < CEPH_MDS_STATE_ACTIVE && 4056 newstate >= CEPH_MDS_STATE_ACTIVE) { 4057 if (oldstate != CEPH_MDS_STATE_CREATING && 4058 oldstate != CEPH_MDS_STATE_STARTING) 4059 pr_info("mds%d recovery completed\n", s->s_mds); 4060 kick_requests(mdsc, i); 4061 mutex_unlock(&mdsc->mutex); 4062 mutex_lock(&s->s_mutex); 4063 mutex_lock(&mdsc->mutex); 4064 ceph_kick_flushing_caps(mdsc, s); 4065 mutex_unlock(&s->s_mutex); 4066 wake_up_session_caps(s, RECONNECT); 4067 } 4068 } 4069 4070 for (i = 0; i < newmap->possible_max_rank && i < mdsc->max_sessions; i++) { 4071 s = mdsc->sessions[i]; 4072 if (!s) 4073 continue; 4074 if (!ceph_mdsmap_is_laggy(newmap, i)) 4075 continue; 4076 if (s->s_state == CEPH_MDS_SESSION_OPEN || 4077 s->s_state == CEPH_MDS_SESSION_HUNG || 4078 s->s_state == CEPH_MDS_SESSION_CLOSING) { 4079 dout(" connecting to export targets of laggy mds%d\n", 4080 i); 4081 __open_export_target_sessions(mdsc, s); 4082 } 4083 } 4084 } 4085 4086 4087 4088 /* 4089 * leases 4090 */ 4091 4092 /* 4093 * caller must hold session s_mutex, dentry->d_lock 4094 */ 4095 void __ceph_mdsc_drop_dentry_lease(struct dentry *dentry) 4096 { 4097 struct ceph_dentry_info *di = ceph_dentry(dentry); 4098 4099 ceph_put_mds_session(di->lease_session); 4100 di->lease_session = NULL; 4101 } 4102 4103 static void handle_lease(struct ceph_mds_client *mdsc, 4104 struct ceph_mds_session *session, 4105 struct ceph_msg *msg) 4106 { 4107 struct super_block *sb = mdsc->fsc->sb; 4108 struct inode *inode; 4109 struct dentry *parent, *dentry; 4110 struct ceph_dentry_info *di; 4111 int mds = session->s_mds; 4112 struct ceph_mds_lease *h = msg->front.iov_base; 4113 u32 seq; 4114 struct ceph_vino vino; 4115 struct qstr dname; 4116 int release = 0; 4117 4118 dout("handle_lease from mds%d\n", mds); 4119 4120 /* decode */ 4121 if (msg->front.iov_len < sizeof(*h) + sizeof(u32)) 4122 goto bad; 4123 vino.ino = le64_to_cpu(h->ino); 4124 vino.snap = CEPH_NOSNAP; 4125 seq = le32_to_cpu(h->seq); 4126 dname.len = get_unaligned_le32(h + 1); 4127 if (msg->front.iov_len < sizeof(*h) + sizeof(u32) + dname.len) 4128 goto bad; 4129 dname.name = (void *)(h + 1) + sizeof(u32); 4130 4131 /* lookup inode */ 4132 inode = ceph_find_inode(sb, vino); 4133 dout("handle_lease %s, ino %llx %p %.*s\n", 4134 ceph_lease_op_name(h->action), vino.ino, inode, 4135 dname.len, dname.name); 4136 4137 mutex_lock(&session->s_mutex); 4138 session->s_seq++; 4139 4140 if (!inode) { 4141 dout("handle_lease no inode %llx\n", vino.ino); 4142 goto release; 4143 } 4144 4145 /* dentry */ 4146 parent = d_find_alias(inode); 4147 if (!parent) { 4148 dout("no parent dentry on inode %p\n", inode); 4149 WARN_ON(1); 4150 goto release; /* hrm... 
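no alias for this inode, so we cannot look up the dentry; just release the lease back to the mds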
*/ 4151 } 4152 dname.hash = full_name_hash(parent, dname.name, dname.len); 4153 dentry = d_lookup(parent, &dname); 4154 dput(parent); 4155 if (!dentry) 4156 goto release; 4157 4158 spin_lock(&dentry->d_lock); 4159 di = ceph_dentry(dentry); 4160 switch (h->action) { 4161 case CEPH_MDS_LEASE_REVOKE: 4162 if (di->lease_session == session) { 4163 if (ceph_seq_cmp(di->lease_seq, seq) > 0) 4164 h->seq = cpu_to_le32(di->lease_seq); 4165 __ceph_mdsc_drop_dentry_lease(dentry); 4166 } 4167 release = 1; 4168 break; 4169 4170 case CEPH_MDS_LEASE_RENEW: 4171 if (di->lease_session == session && 4172 di->lease_gen == session->s_cap_gen && 4173 di->lease_renew_from && 4174 di->lease_renew_after == 0) { 4175 unsigned long duration = 4176 msecs_to_jiffies(le32_to_cpu(h->duration_ms)); 4177 4178 di->lease_seq = seq; 4179 di->time = di->lease_renew_from + duration; 4180 di->lease_renew_after = di->lease_renew_from + 4181 (duration >> 1); 4182 di->lease_renew_from = 0; 4183 } 4184 break; 4185 } 4186 spin_unlock(&dentry->d_lock); 4187 dput(dentry); 4188 4189 if (!release) 4190 goto out; 4191 4192 release: 4193 /* let's just reuse the same message */ 4194 h->action = CEPH_MDS_LEASE_REVOKE_ACK; 4195 ceph_msg_get(msg); 4196 ceph_con_send(&session->s_con, msg); 4197 4198 out: 4199 mutex_unlock(&session->s_mutex); 4200 /* avoid calling iput_final() in mds dispatch threads */ 4201 ceph_async_iput(inode); 4202 return; 4203 4204 bad: 4205 pr_err("corrupt lease message\n"); 4206 ceph_msg_dump(msg); 4207 } 4208 4209 void ceph_mdsc_lease_send_msg(struct ceph_mds_session *session, 4210 struct dentry *dentry, char action, 4211 u32 seq) 4212 { 4213 struct ceph_msg *msg; 4214 struct ceph_mds_lease *lease; 4215 struct inode *dir; 4216 int len = sizeof(*lease) + sizeof(u32) + NAME_MAX; 4217 4218 dout("lease_send_msg identry %p %s to mds%d\n", 4219 dentry, ceph_lease_op_name(action), session->s_mds); 4220 4221 msg = ceph_msg_new(CEPH_MSG_CLIENT_LEASE, len, GFP_NOFS, false); 4222 if (!msg) 4223 return; 4224 lease = msg->front.iov_base; 4225 lease->action = action; 4226 lease->seq = cpu_to_le32(seq); 4227 4228 spin_lock(&dentry->d_lock); 4229 dir = d_inode(dentry->d_parent); 4230 lease->ino = cpu_to_le64(ceph_ino(dir)); 4231 lease->first = lease->last = cpu_to_le64(ceph_snap(dir)); 4232 4233 put_unaligned_le32(dentry->d_name.len, lease + 1); 4234 memcpy((void *)(lease + 1) + 4, 4235 dentry->d_name.name, dentry->d_name.len); 4236 spin_unlock(&dentry->d_lock); 4237 /* 4238 * if this is a preemptive lease RELEASE, no need to 4239 * flush request stream, since the actual request will 4240 * soon follow. 
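* (more_to_follow below tells the messenger that more data is on the way, so it need not push this message out by itself)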
4241 */ 4242 msg->more_to_follow = (action == CEPH_MDS_LEASE_RELEASE); 4243 4244 ceph_con_send(&session->s_con, msg); 4245 } 4246 4247 /* 4248 * lock unlock sessions, to wait ongoing session activities 4249 */ 4250 static void lock_unlock_sessions(struct ceph_mds_client *mdsc) 4251 { 4252 int i; 4253 4254 mutex_lock(&mdsc->mutex); 4255 for (i = 0; i < mdsc->max_sessions; i++) { 4256 struct ceph_mds_session *s = __ceph_lookup_mds_session(mdsc, i); 4257 if (!s) 4258 continue; 4259 mutex_unlock(&mdsc->mutex); 4260 mutex_lock(&s->s_mutex); 4261 mutex_unlock(&s->s_mutex); 4262 ceph_put_mds_session(s); 4263 mutex_lock(&mdsc->mutex); 4264 } 4265 mutex_unlock(&mdsc->mutex); 4266 } 4267 4268 static void maybe_recover_session(struct ceph_mds_client *mdsc) 4269 { 4270 struct ceph_fs_client *fsc = mdsc->fsc; 4271 4272 if (!ceph_test_mount_opt(fsc, CLEANRECOVER)) 4273 return; 4274 4275 if (READ_ONCE(fsc->mount_state) != CEPH_MOUNT_MOUNTED) 4276 return; 4277 4278 if (!READ_ONCE(fsc->blacklisted)) 4279 return; 4280 4281 if (fsc->last_auto_reconnect && 4282 time_before(jiffies, fsc->last_auto_reconnect + HZ * 60 * 30)) 4283 return; 4284 4285 pr_info("auto reconnect after blacklisted\n"); 4286 fsc->last_auto_reconnect = jiffies; 4287 ceph_force_reconnect(fsc->sb); 4288 } 4289 4290 bool check_session_state(struct ceph_mds_session *s) 4291 { 4292 if (s->s_state == CEPH_MDS_SESSION_CLOSING) { 4293 dout("resending session close request for mds%d\n", 4294 s->s_mds); 4295 request_close_session(s); 4296 return false; 4297 } 4298 if (s->s_ttl && time_after(jiffies, s->s_ttl)) { 4299 if (s->s_state == CEPH_MDS_SESSION_OPEN) { 4300 s->s_state = CEPH_MDS_SESSION_HUNG; 4301 pr_info("mds%d hung\n", s->s_mds); 4302 } 4303 } 4304 if (s->s_state == CEPH_MDS_SESSION_NEW || 4305 s->s_state == CEPH_MDS_SESSION_RESTARTING || 4306 s->s_state == CEPH_MDS_SESSION_REJECTED) 4307 /* this mds is failed or recovering, just wait */ 4308 return false; 4309 4310 return true; 4311 } 4312 4313 /* 4314 * delayed work -- periodically trim expired leases, renew caps with mds 4315 */ 4316 static void schedule_delayed(struct ceph_mds_client *mdsc) 4317 { 4318 int delay = 5; 4319 unsigned hz = round_jiffies_relative(HZ * delay); 4320 schedule_delayed_work(&mdsc->delayed_work, hz); 4321 } 4322 4323 static void delayed_work(struct work_struct *work) 4324 { 4325 int i; 4326 struct ceph_mds_client *mdsc = 4327 container_of(work, struct ceph_mds_client, delayed_work.work); 4328 int renew_interval; 4329 int renew_caps; 4330 4331 dout("mdsc delayed_work\n"); 4332 4333 if (mdsc->stopping) 4334 return; 4335 4336 mutex_lock(&mdsc->mutex); 4337 renew_interval = mdsc->mdsmap->m_session_timeout >> 2; 4338 renew_caps = time_after_eq(jiffies, HZ*renew_interval + 4339 mdsc->last_renew_caps); 4340 if (renew_caps) 4341 mdsc->last_renew_caps = jiffies; 4342 4343 for (i = 0; i < mdsc->max_sessions; i++) { 4344 struct ceph_mds_session *s = __ceph_lookup_mds_session(mdsc, i); 4345 if (!s) 4346 continue; 4347 4348 if (!check_session_state(s)) { 4349 ceph_put_mds_session(s); 4350 continue; 4351 } 4352 mutex_unlock(&mdsc->mutex); 4353 4354 mutex_lock(&s->s_mutex); 4355 if (renew_caps) 4356 send_renew_caps(mdsc, s); 4357 else 4358 ceph_con_keepalive(&s->s_con); 4359 if (s->s_state == CEPH_MDS_SESSION_OPEN || 4360 s->s_state == CEPH_MDS_SESSION_HUNG) 4361 ceph_send_cap_releases(mdsc, s); 4362 mutex_unlock(&s->s_mutex); 4363 ceph_put_mds_session(s); 4364 4365 mutex_lock(&mdsc->mutex); 4366 } 4367 mutex_unlock(&mdsc->mutex); 4368 4369 ceph_check_delayed_caps(mdsc); 4370 
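/* remaining periodic housekeeping: queue cap reclaim, trim the snapid map, maybe auto-reconnect, then re-arm the timer */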
4371 ceph_queue_cap_reclaim_work(mdsc); 4372 4373 ceph_trim_snapid_map(mdsc); 4374 4375 maybe_recover_session(mdsc); 4376 4377 schedule_delayed(mdsc); 4378 } 4379 4380 int ceph_mdsc_init(struct ceph_fs_client *fsc) 4381 4382 { 4383 struct ceph_mds_client *mdsc; 4384 int err; 4385 4386 mdsc = kzalloc(sizeof(struct ceph_mds_client), GFP_NOFS); 4387 if (!mdsc) 4388 return -ENOMEM; 4389 mdsc->fsc = fsc; 4390 mutex_init(&mdsc->mutex); 4391 mdsc->mdsmap = kzalloc(sizeof(*mdsc->mdsmap), GFP_NOFS); 4392 if (!mdsc->mdsmap) { 4393 err = -ENOMEM; 4394 goto err_mdsc; 4395 } 4396 4397 fsc->mdsc = mdsc; 4398 init_completion(&mdsc->safe_umount_waiters); 4399 init_waitqueue_head(&mdsc->session_close_wq); 4400 INIT_LIST_HEAD(&mdsc->waiting_for_map); 4401 mdsc->sessions = NULL; 4402 atomic_set(&mdsc->num_sessions, 0); 4403 mdsc->max_sessions = 0; 4404 mdsc->stopping = 0; 4405 atomic64_set(&mdsc->quotarealms_count, 0); 4406 mdsc->quotarealms_inodes = RB_ROOT; 4407 mutex_init(&mdsc->quotarealms_inodes_mutex); 4408 mdsc->last_snap_seq = 0; 4409 init_rwsem(&mdsc->snap_rwsem); 4410 mdsc->snap_realms = RB_ROOT; 4411 INIT_LIST_HEAD(&mdsc->snap_empty); 4412 mdsc->num_snap_realms = 0; 4413 spin_lock_init(&mdsc->snap_empty_lock); 4414 mdsc->last_tid = 0; 4415 mdsc->oldest_tid = 0; 4416 mdsc->request_tree = RB_ROOT; 4417 INIT_DELAYED_WORK(&mdsc->delayed_work, delayed_work); 4418 mdsc->last_renew_caps = jiffies; 4419 INIT_LIST_HEAD(&mdsc->cap_delay_list); 4420 INIT_LIST_HEAD(&mdsc->cap_wait_list); 4421 spin_lock_init(&mdsc->cap_delay_lock); 4422 INIT_LIST_HEAD(&mdsc->snap_flush_list); 4423 spin_lock_init(&mdsc->snap_flush_lock); 4424 mdsc->last_cap_flush_tid = 1; 4425 INIT_LIST_HEAD(&mdsc->cap_flush_list); 4426 INIT_LIST_HEAD(&mdsc->cap_dirty_migrating); 4427 mdsc->num_cap_flushing = 0; 4428 spin_lock_init(&mdsc->cap_dirty_lock); 4429 init_waitqueue_head(&mdsc->cap_flushing_wq); 4430 INIT_WORK(&mdsc->cap_reclaim_work, ceph_cap_reclaim_work); 4431 atomic_set(&mdsc->cap_reclaim_pending, 0); 4432 err = ceph_metric_init(&mdsc->metric); 4433 if (err) 4434 goto err_mdsmap; 4435 4436 spin_lock_init(&mdsc->dentry_list_lock); 4437 INIT_LIST_HEAD(&mdsc->dentry_leases); 4438 INIT_LIST_HEAD(&mdsc->dentry_dir_leases); 4439 4440 ceph_caps_init(mdsc); 4441 ceph_adjust_caps_max_min(mdsc, fsc->mount_options); 4442 4443 spin_lock_init(&mdsc->snapid_map_lock); 4444 mdsc->snapid_map_tree = RB_ROOT; 4445 INIT_LIST_HEAD(&mdsc->snapid_map_lru); 4446 4447 init_rwsem(&mdsc->pool_perm_rwsem); 4448 mdsc->pool_perm_tree = RB_ROOT; 4449 4450 strscpy(mdsc->nodename, utsname()->nodename, 4451 sizeof(mdsc->nodename)); 4452 return 0; 4453 4454 err_mdsmap: 4455 kfree(mdsc->mdsmap); 4456 err_mdsc: 4457 kfree(mdsc); 4458 return err; 4459 } 4460 4461 /* 4462 * Wait for safe replies on open mds requests. If we time out, drop 4463 * all requests from the tree to avoid dangling dentry refs. 
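* (each queued request pins dentries until it is dropped from the tree)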
4464 */ 4465 static void wait_requests(struct ceph_mds_client *mdsc) 4466 { 4467 struct ceph_options *opts = mdsc->fsc->client->options; 4468 struct ceph_mds_request *req; 4469 4470 mutex_lock(&mdsc->mutex); 4471 if (__get_oldest_req(mdsc)) { 4472 mutex_unlock(&mdsc->mutex); 4473 4474 dout("wait_requests waiting for requests\n"); 4475 wait_for_completion_timeout(&mdsc->safe_umount_waiters, 4476 ceph_timeout_jiffies(opts->mount_timeout)); 4477 4478 /* tear down remaining requests */ 4479 mutex_lock(&mdsc->mutex); 4480 while ((req = __get_oldest_req(mdsc))) { 4481 dout("wait_requests timed out on tid %llu\n", 4482 req->r_tid); 4483 list_del_init(&req->r_wait); 4484 __unregister_request(mdsc, req); 4485 } 4486 } 4487 mutex_unlock(&mdsc->mutex); 4488 dout("wait_requests done\n"); 4489 } 4490 4491 /* 4492 * called before mount is ro, and before dentries are torn down. 4493 * (hmm, does this still race with new lookups?) 4494 */ 4495 void ceph_mdsc_pre_umount(struct ceph_mds_client *mdsc) 4496 { 4497 dout("pre_umount\n"); 4498 mdsc->stopping = 1; 4499 4500 lock_unlock_sessions(mdsc); 4501 ceph_flush_dirty_caps(mdsc); 4502 wait_requests(mdsc); 4503 4504 /* 4505 * wait for reply handlers to drop their request refs and 4506 * their inode/dcache refs 4507 */ 4508 ceph_msgr_flush(); 4509 4510 ceph_cleanup_quotarealms_inodes(mdsc); 4511 } 4512 4513 /* 4514 * wait for all write mds requests to flush. 4515 */ 4516 static void wait_unsafe_requests(struct ceph_mds_client *mdsc, u64 want_tid) 4517 { 4518 struct ceph_mds_request *req = NULL, *nextreq; 4519 struct rb_node *n; 4520 4521 mutex_lock(&mdsc->mutex); 4522 dout("wait_unsafe_requests want %lld\n", want_tid); 4523 restart: 4524 req = __get_oldest_req(mdsc); 4525 while (req && req->r_tid <= want_tid) { 4526 /* find next request */ 4527 n = rb_next(&req->r_node); 4528 if (n) 4529 nextreq = rb_entry(n, struct ceph_mds_request, r_node); 4530 else 4531 nextreq = NULL; 4532 if (req->r_op != CEPH_MDS_OP_SETFILELOCK && 4533 (req->r_op & CEPH_MDS_OP_WRITE)) { 4534 /* write op */ 4535 ceph_mdsc_get_request(req); 4536 if (nextreq) 4537 ceph_mdsc_get_request(nextreq); 4538 mutex_unlock(&mdsc->mutex); 4539 dout("wait_unsafe_requests wait on %llu (want %llu)\n", 4540 req->r_tid, want_tid); 4541 wait_for_completion(&req->r_safe_completion); 4542 mutex_lock(&mdsc->mutex); 4543 ceph_mdsc_put_request(req); 4544 if (!nextreq) 4545 break; /* next dne before, so we're done! 
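("dne" = did not exist: there was no newer request before we dropped the lock)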
*/ 4546 if (RB_EMPTY_NODE(&nextreq->r_node)) { 4547 /* next request was removed from tree */ 4548 ceph_mdsc_put_request(nextreq); 4549 goto restart; 4550 } 4551 ceph_mdsc_put_request(nextreq); /* won't go away */ 4552 } 4553 req = nextreq; 4554 } 4555 mutex_unlock(&mdsc->mutex); 4556 dout("wait_unsafe_requests done\n"); 4557 } 4558 4559 void ceph_mdsc_sync(struct ceph_mds_client *mdsc) 4560 { 4561 u64 want_tid, want_flush; 4562 4563 if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_SHUTDOWN) 4564 return; 4565 4566 dout("sync\n"); 4567 mutex_lock(&mdsc->mutex); 4568 want_tid = mdsc->last_tid; 4569 mutex_unlock(&mdsc->mutex); 4570 4571 ceph_flush_dirty_caps(mdsc); 4572 spin_lock(&mdsc->cap_dirty_lock); 4573 want_flush = mdsc->last_cap_flush_tid; 4574 if (!list_empty(&mdsc->cap_flush_list)) { 4575 struct ceph_cap_flush *cf = 4576 list_last_entry(&mdsc->cap_flush_list, 4577 struct ceph_cap_flush, g_list); 4578 cf->wake = true; 4579 } 4580 spin_unlock(&mdsc->cap_dirty_lock); 4581 4582 dout("sync want tid %lld flush_seq %lld\n", 4583 want_tid, want_flush); 4584 4585 wait_unsafe_requests(mdsc, want_tid); 4586 wait_caps_flush(mdsc, want_flush); 4587 } 4588 4589 /* 4590 * true if all sessions are closed, or we force unmount 4591 */ 4592 static bool done_closing_sessions(struct ceph_mds_client *mdsc, int skipped) 4593 { 4594 if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_SHUTDOWN) 4595 return true; 4596 return atomic_read(&mdsc->num_sessions) <= skipped; 4597 } 4598 4599 /* 4600 * called after sb is ro. 4601 */ 4602 void ceph_mdsc_close_sessions(struct ceph_mds_client *mdsc) 4603 { 4604 struct ceph_options *opts = mdsc->fsc->client->options; 4605 struct ceph_mds_session *session; 4606 int i; 4607 int skipped = 0; 4608 4609 dout("close_sessions\n"); 4610 4611 /* close sessions */ 4612 mutex_lock(&mdsc->mutex); 4613 for (i = 0; i < mdsc->max_sessions; i++) { 4614 session = __ceph_lookup_mds_session(mdsc, i); 4615 if (!session) 4616 continue; 4617 mutex_unlock(&mdsc->mutex); 4618 mutex_lock(&session->s_mutex); 4619 if (__close_session(mdsc, session) <= 0) 4620 skipped++; 4621 mutex_unlock(&session->s_mutex); 4622 ceph_put_mds_session(session); 4623 mutex_lock(&mdsc->mutex); 4624 } 4625 mutex_unlock(&mdsc->mutex); 4626 4627 dout("waiting for sessions to close\n"); 4628 wait_event_timeout(mdsc->session_close_wq, 4629 done_closing_sessions(mdsc, skipped), 4630 ceph_timeout_jiffies(opts->mount_timeout)); 4631 4632 /* tear down remaining sessions */ 4633 mutex_lock(&mdsc->mutex); 4634 for (i = 0; i < mdsc->max_sessions; i++) { 4635 if (mdsc->sessions[i]) { 4636 session = ceph_get_mds_session(mdsc->sessions[i]); 4637 __unregister_session(mdsc, session); 4638 mutex_unlock(&mdsc->mutex); 4639 mutex_lock(&session->s_mutex); 4640 remove_session_caps(session); 4641 mutex_unlock(&session->s_mutex); 4642 ceph_put_mds_session(session); 4643 mutex_lock(&mdsc->mutex); 4644 } 4645 } 4646 WARN_ON(!list_empty(&mdsc->cap_delay_list)); 4647 mutex_unlock(&mdsc->mutex); 4648 4649 ceph_cleanup_snapid_map(mdsc); 4650 ceph_cleanup_empty_realms(mdsc); 4651 4652 cancel_work_sync(&mdsc->cap_reclaim_work); 4653 cancel_delayed_work_sync(&mdsc->delayed_work); /* cancel timer */ 4654 4655 dout("stopped\n"); 4656 } 4657 4658 void ceph_mdsc_force_umount(struct ceph_mds_client *mdsc) 4659 { 4660 struct ceph_mds_session *session; 4661 int mds; 4662 4663 dout("force umount\n"); 4664 4665 mutex_lock(&mdsc->mutex); 4666 for (mds = 0; mds < mdsc->max_sessions; mds++) { 4667 session = __ceph_lookup_mds_session(mdsc, mds); 4668 if 
(!session) 4669 continue; 4670 4671 if (session->s_state == CEPH_MDS_SESSION_REJECTED) 4672 __unregister_session(mdsc, session); 4673 __wake_requests(mdsc, &session->s_waiting); 4674 mutex_unlock(&mdsc->mutex); 4675 4676 mutex_lock(&session->s_mutex); 4677 __close_session(mdsc, session); 4678 if (session->s_state == CEPH_MDS_SESSION_CLOSING) { 4679 cleanup_session_requests(mdsc, session); 4680 remove_session_caps(session); 4681 } 4682 mutex_unlock(&session->s_mutex); 4683 ceph_put_mds_session(session); 4684 4685 mutex_lock(&mdsc->mutex); 4686 kick_requests(mdsc, mds); 4687 } 4688 __wake_requests(mdsc, &mdsc->waiting_for_map); 4689 mutex_unlock(&mdsc->mutex); 4690 } 4691 4692 static void ceph_mdsc_stop(struct ceph_mds_client *mdsc) 4693 { 4694 dout("stop\n"); 4695 /* 4696 * Make sure the delayed work stopped before releasing 4697 * the resources. 4698 * 4699 * Because the cancel_delayed_work_sync() will only 4700 * guarantee that the work finishes executing. But the 4701 * delayed work will re-arm itself again after that. 4702 */ 4703 flush_delayed_work(&mdsc->delayed_work); 4704 4705 if (mdsc->mdsmap) 4706 ceph_mdsmap_destroy(mdsc->mdsmap); 4707 kfree(mdsc->sessions); 4708 ceph_caps_finalize(mdsc); 4709 ceph_pool_perm_destroy(mdsc); 4710 } 4711 4712 void ceph_mdsc_destroy(struct ceph_fs_client *fsc) 4713 { 4714 struct ceph_mds_client *mdsc = fsc->mdsc; 4715 dout("mdsc_destroy %p\n", mdsc); 4716 4717 if (!mdsc) 4718 return; 4719 4720 /* flush out any connection work with references to us */ 4721 ceph_msgr_flush(); 4722 4723 ceph_mdsc_stop(mdsc); 4724 4725 ceph_metric_destroy(&mdsc->metric); 4726 4727 fsc->mdsc = NULL; 4728 kfree(mdsc); 4729 dout("mdsc_destroy %p done\n", mdsc); 4730 } 4731 4732 void ceph_mdsc_handle_fsmap(struct ceph_mds_client *mdsc, struct ceph_msg *msg) 4733 { 4734 struct ceph_fs_client *fsc = mdsc->fsc; 4735 const char *mds_namespace = fsc->mount_options->mds_namespace; 4736 void *p = msg->front.iov_base; 4737 void *end = p + msg->front.iov_len; 4738 u32 epoch; 4739 u32 map_len; 4740 u32 num_fs; 4741 u32 mount_fscid = (u32)-1; 4742 u8 struct_v, struct_cv; 4743 int err = -EINVAL; 4744 4745 ceph_decode_need(&p, end, sizeof(u32), bad); 4746 epoch = ceph_decode_32(&p); 4747 4748 dout("handle_fsmap epoch %u\n", epoch); 4749 4750 ceph_decode_need(&p, end, 2 + sizeof(u32), bad); 4751 struct_v = ceph_decode_8(&p); 4752 struct_cv = ceph_decode_8(&p); 4753 map_len = ceph_decode_32(&p); 4754 4755 ceph_decode_need(&p, end, sizeof(u32) * 3, bad); 4756 p += sizeof(u32) * 2; /* skip epoch and legacy_client_fscid */ 4757 4758 num_fs = ceph_decode_32(&p); 4759 while (num_fs-- > 0) { 4760 void *info_p, *info_end; 4761 u32 info_len; 4762 u8 info_v, info_cv; 4763 u32 fscid, namelen; 4764 4765 ceph_decode_need(&p, end, 2 + sizeof(u32), bad); 4766 info_v = ceph_decode_8(&p); 4767 info_cv = ceph_decode_8(&p); 4768 info_len = ceph_decode_32(&p); 4769 ceph_decode_need(&p, end, info_len, bad); 4770 info_p = p; 4771 info_end = p + info_len; 4772 p = info_end; 4773 4774 ceph_decode_need(&info_p, info_end, sizeof(u32) * 2, bad); 4775 fscid = ceph_decode_32(&info_p); 4776 namelen = ceph_decode_32(&info_p); 4777 ceph_decode_need(&info_p, info_end, namelen, bad); 4778 4779 if (mds_namespace && 4780 strlen(mds_namespace) == namelen && 4781 !strncmp(mds_namespace, (char *)info_p, namelen)) { 4782 mount_fscid = fscid; 4783 break; 4784 } 4785 } 4786 4787 ceph_monc_got_map(&fsc->client->monc, CEPH_SUB_FSMAP, epoch); 4788 if (mount_fscid != (u32)-1) { 4789 fsc->client->monc.fs_cluster_id = mount_fscid; 4790 
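/* found the file system named by mds_namespace; subscribe to its mdsmap */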
ceph_monc_want_map(&fsc->client->monc, CEPH_SUB_MDSMAP, 4791 0, true); 4792 ceph_monc_renew_subs(&fsc->client->monc); 4793 } else { 4794 err = -ENOENT; 4795 goto err_out; 4796 } 4797 return; 4798 4799 bad: 4800 pr_err("error decoding fsmap\n"); 4801 err_out: 4802 mutex_lock(&mdsc->mutex); 4803 mdsc->mdsmap_err = err; 4804 __wake_requests(mdsc, &mdsc->waiting_for_map); 4805 mutex_unlock(&mdsc->mutex); 4806 } 4807 4808 /* 4809 * handle mds map update. 4810 */ 4811 void ceph_mdsc_handle_mdsmap(struct ceph_mds_client *mdsc, struct ceph_msg *msg) 4812 { 4813 u32 epoch; 4814 u32 maplen; 4815 void *p = msg->front.iov_base; 4816 void *end = p + msg->front.iov_len; 4817 struct ceph_mdsmap *newmap, *oldmap; 4818 struct ceph_fsid fsid; 4819 int err = -EINVAL; 4820 4821 ceph_decode_need(&p, end, sizeof(fsid)+2*sizeof(u32), bad); 4822 ceph_decode_copy(&p, &fsid, sizeof(fsid)); 4823 if (ceph_check_fsid(mdsc->fsc->client, &fsid) < 0) 4824 return; 4825 epoch = ceph_decode_32(&p); 4826 maplen = ceph_decode_32(&p); 4827 dout("handle_map epoch %u len %d\n", epoch, (int)maplen); 4828 4829 /* do we need it? */ 4830 mutex_lock(&mdsc->mutex); 4831 if (mdsc->mdsmap && epoch <= mdsc->mdsmap->m_epoch) { 4832 dout("handle_map epoch %u <= our %u\n", 4833 epoch, mdsc->mdsmap->m_epoch); 4834 mutex_unlock(&mdsc->mutex); 4835 return; 4836 } 4837 4838 newmap = ceph_mdsmap_decode(&p, end); 4839 if (IS_ERR(newmap)) { 4840 err = PTR_ERR(newmap); 4841 goto bad_unlock; 4842 } 4843 4844 /* swap into place */ 4845 if (mdsc->mdsmap) { 4846 oldmap = mdsc->mdsmap; 4847 mdsc->mdsmap = newmap; 4848 check_new_map(mdsc, newmap, oldmap); 4849 ceph_mdsmap_destroy(oldmap); 4850 } else { 4851 mdsc->mdsmap = newmap; /* first mds map */ 4852 } 4853 mdsc->fsc->max_file_size = min((loff_t)mdsc->mdsmap->m_max_file_size, 4854 MAX_LFS_FILESIZE); 4855 4856 __wake_requests(mdsc, &mdsc->waiting_for_map); 4857 ceph_monc_got_map(&mdsc->fsc->client->monc, CEPH_SUB_MDSMAP, 4858 mdsc->mdsmap->m_epoch); 4859 4860 mutex_unlock(&mdsc->mutex); 4861 schedule_delayed(mdsc); 4862 return; 4863 4864 bad_unlock: 4865 mutex_unlock(&mdsc->mutex); 4866 bad: 4867 pr_err("error decoding mdsmap %d\n", err); 4868 return; 4869 } 4870 4871 static struct ceph_connection *con_get(struct ceph_connection *con) 4872 { 4873 struct ceph_mds_session *s = con->private; 4874 4875 if (ceph_get_mds_session(s)) 4876 return con; 4877 return NULL; 4878 } 4879 4880 static void con_put(struct ceph_connection *con) 4881 { 4882 struct ceph_mds_session *s = con->private; 4883 4884 ceph_put_mds_session(s); 4885 } 4886 4887 /* 4888 * if the client is unresponsive for long enough, the mds will kill 4889 * the session entirely. 
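* When the connection is reset by the peer we warn and try to reconnect to reestablish our session state.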
4890 */ 4891 static void peer_reset(struct ceph_connection *con) 4892 { 4893 struct ceph_mds_session *s = con->private; 4894 struct ceph_mds_client *mdsc = s->s_mdsc; 4895 4896 pr_warn("mds%d closed our session\n", s->s_mds); 4897 send_mds_reconnect(mdsc, s); 4898 } 4899 4900 static void dispatch(struct ceph_connection *con, struct ceph_msg *msg) 4901 { 4902 struct ceph_mds_session *s = con->private; 4903 struct ceph_mds_client *mdsc = s->s_mdsc; 4904 int type = le16_to_cpu(msg->hdr.type); 4905 4906 mutex_lock(&mdsc->mutex); 4907 if (__verify_registered_session(mdsc, s) < 0) { 4908 mutex_unlock(&mdsc->mutex); 4909 goto out; 4910 } 4911 mutex_unlock(&mdsc->mutex); 4912 4913 switch (type) { 4914 case CEPH_MSG_MDS_MAP: 4915 ceph_mdsc_handle_mdsmap(mdsc, msg); 4916 break; 4917 case CEPH_MSG_FS_MAP_USER: 4918 ceph_mdsc_handle_fsmap(mdsc, msg); 4919 break; 4920 case CEPH_MSG_CLIENT_SESSION: 4921 handle_session(s, msg); 4922 break; 4923 case CEPH_MSG_CLIENT_REPLY: 4924 handle_reply(s, msg); 4925 break; 4926 case CEPH_MSG_CLIENT_REQUEST_FORWARD: 4927 handle_forward(mdsc, s, msg); 4928 break; 4929 case CEPH_MSG_CLIENT_CAPS: 4930 ceph_handle_caps(s, msg); 4931 break; 4932 case CEPH_MSG_CLIENT_SNAP: 4933 ceph_handle_snap(mdsc, s, msg); 4934 break; 4935 case CEPH_MSG_CLIENT_LEASE: 4936 handle_lease(mdsc, s, msg); 4937 break; 4938 case CEPH_MSG_CLIENT_QUOTA: 4939 ceph_handle_quota(mdsc, s, msg); 4940 break; 4941 4942 default: 4943 pr_err("received unknown message type %d %s\n", type, 4944 ceph_msg_type_name(type)); 4945 } 4946 out: 4947 ceph_msg_put(msg); 4948 } 4949 4950 /* 4951 * authentication 4952 */ 4953 4954 /* 4955 * Note: returned pointer is the address of a structure that's 4956 * managed separately. Caller must *not* attempt to free it. 4957 */ 4958 static struct ceph_auth_handshake *get_authorizer(struct ceph_connection *con, 4959 int *proto, int force_new) 4960 { 4961 struct ceph_mds_session *s = con->private; 4962 struct ceph_mds_client *mdsc = s->s_mdsc; 4963 struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth; 4964 struct ceph_auth_handshake *auth = &s->s_auth; 4965 4966 if (force_new && auth->authorizer) { 4967 ceph_auth_destroy_authorizer(auth->authorizer); 4968 auth->authorizer = NULL; 4969 } 4970 if (!auth->authorizer) { 4971 int ret = ceph_auth_create_authorizer(ac, CEPH_ENTITY_TYPE_MDS, 4972 auth); 4973 if (ret) 4974 return ERR_PTR(ret); 4975 } else { 4976 int ret = ceph_auth_update_authorizer(ac, CEPH_ENTITY_TYPE_MDS, 4977 auth); 4978 if (ret) 4979 return ERR_PTR(ret); 4980 } 4981 *proto = ac->protocol; 4982 4983 return auth; 4984 } 4985 4986 static int add_authorizer_challenge(struct ceph_connection *con, 4987 void *challenge_buf, int challenge_buf_len) 4988 { 4989 struct ceph_mds_session *s = con->private; 4990 struct ceph_mds_client *mdsc = s->s_mdsc; 4991 struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth; 4992 4993 return ceph_auth_add_authorizer_challenge(ac, s->s_auth.authorizer, 4994 challenge_buf, challenge_buf_len); 4995 } 4996 4997 static int verify_authorizer_reply(struct ceph_connection *con) 4998 { 4999 struct ceph_mds_session *s = con->private; 5000 struct ceph_mds_client *mdsc = s->s_mdsc; 5001 struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth; 5002 5003 return ceph_auth_verify_authorizer_reply(ac, s->s_auth.authorizer); 5004 } 5005 5006 static int invalidate_authorizer(struct ceph_connection *con) 5007 { 5008 struct ceph_mds_session *s = con->private; 5009 struct ceph_mds_client *mdsc = s->s_mdsc; 5010 struct ceph_auth_client *ac = 
mdsc->fsc->client->monc.auth; 5011 5012 ceph_auth_invalidate_authorizer(ac, CEPH_ENTITY_TYPE_MDS); 5013 5014 return ceph_monc_validate_auth(&mdsc->fsc->client->monc); 5015 } 5016 5017 static struct ceph_msg *mds_alloc_msg(struct ceph_connection *con, 5018 struct ceph_msg_header *hdr, int *skip) 5019 { 5020 struct ceph_msg *msg; 5021 int type = (int) le16_to_cpu(hdr->type); 5022 int front_len = (int) le32_to_cpu(hdr->front_len); 5023 5024 if (con->in_msg) 5025 return con->in_msg; 5026 5027 *skip = 0; 5028 msg = ceph_msg_new(type, front_len, GFP_NOFS, false); 5029 if (!msg) { 5030 pr_err("unable to allocate msg type %d len %d\n", 5031 type, front_len); 5032 return NULL; 5033 } 5034 5035 return msg; 5036 } 5037 5038 static int mds_sign_message(struct ceph_msg *msg) 5039 { 5040 struct ceph_mds_session *s = msg->con->private; 5041 struct ceph_auth_handshake *auth = &s->s_auth; 5042 5043 return ceph_auth_sign_message(auth, msg); 5044 } 5045 5046 static int mds_check_message_signature(struct ceph_msg *msg) 5047 { 5048 struct ceph_mds_session *s = msg->con->private; 5049 struct ceph_auth_handshake *auth = &s->s_auth; 5050 5051 return ceph_auth_check_message_signature(auth, msg); 5052 } 5053 5054 static const struct ceph_connection_operations mds_con_ops = { 5055 .get = con_get, 5056 .put = con_put, 5057 .dispatch = dispatch, 5058 .get_authorizer = get_authorizer, 5059 .add_authorizer_challenge = add_authorizer_challenge, 5060 .verify_authorizer_reply = verify_authorizer_reply, 5061 .invalidate_authorizer = invalidate_authorizer, 5062 .peer_reset = peer_reset, 5063 .alloc_msg = mds_alloc_msg, 5064 .sign_message = mds_sign_message, 5065 .check_message_signature = mds_check_message_signature, 5066 }; 5067 5068 /* eof */ 5069