// SPDX-License-Identifier: GPL-2.0
#include <linux/ceph/ceph_debug.h>

#include <linux/fs.h>
#include <linux/wait.h>
#include <linux/slab.h>
#include <linux/gfp.h>
#include <linux/sched.h>
#include <linux/delay.h>
#include <linux/debugfs.h>
#include <linux/seq_file.h>
#include <linux/ratelimit.h>
#include <linux/bits.h>
#include <linux/ktime.h>
#include <linux/bitmap.h>
#include <linux/mnt_idmapping.h>

#include "super.h"
#include "mds_client.h"
#include "crypto.h"

#include <linux/ceph/ceph_features.h>
#include <linux/ceph/messenger.h>
#include <linux/ceph/decode.h>
#include <linux/ceph/pagelist.h>
#include <linux/ceph/auth.h>
#include <linux/ceph/debugfs.h>
#include <trace/events/ceph.h>

#define RECONNECT_MAX_SIZE (INT_MAX - PAGE_SIZE)

/*
 * A cluster of MDS (metadata server) daemons is responsible for
 * managing the file system namespace (the directory hierarchy and
 * inodes) and for coordinating shared access to storage.  Metadata is
 * partitioned hierarchically across a number of servers, and that
 * partition varies over time as the cluster adjusts the distribution
 * in order to balance load.
 *
 * The MDS client is primarily responsible for managing synchronous
 * metadata requests for operations like open, unlink, and so forth.
 * If there is an MDS failure, we find out about it when we (possibly
 * request and) receive a new MDS map, and can resubmit affected
 * requests.
 *
 * For the most part, though, we take advantage of a lossless
 * communications channel to the MDS, and do not need to worry about
 * timing out or resubmitting requests.
 *
 * We maintain a stateful "session" with each MDS we interact with.
 * Within each session, we send periodic heartbeat messages to ensure
 * any capabilities or leases we have been issued remain valid.
If 53 * the session times out and goes stale, our leases and capabilities 54 * are no longer valid. 55 */ 56 57 struct ceph_reconnect_state { 58 struct ceph_mds_session *session; 59 int nr_caps, nr_realms; 60 struct ceph_pagelist *pagelist; 61 unsigned msg_version; 62 bool allow_multi; 63 }; 64 65 static void __wake_requests(struct ceph_mds_client *mdsc, 66 struct list_head *head); 67 static void ceph_cap_release_work(struct work_struct *work); 68 static void ceph_cap_reclaim_work(struct work_struct *work); 69 static void ceph_mdsc_reset_workfn(struct work_struct *work); 70 71 static const struct ceph_connection_operations mds_con_ops; 72 73 static void ceph_metric_bind_session(struct ceph_mds_client *mdsc, 74 struct ceph_mds_session *session) 75 { 76 struct ceph_mds_session *old; 77 78 if (!mdsc || !session || disable_send_metrics) 79 return; 80 81 old = mdsc->metric.session; 82 mdsc->metric.session = ceph_get_mds_session(session); 83 if (old) 84 ceph_put_mds_session(old); 85 86 metric_schedule_delayed(&mdsc->metric); 87 } 88 89 /* 90 * mds reply parsing 91 */ 92 93 static int parse_reply_info_quota(void **p, void *end, 94 struct ceph_mds_reply_info_in *info) 95 { 96 u8 struct_v, struct_compat; 97 u32 struct_len; 98 99 ceph_decode_8_safe(p, end, struct_v, bad); 100 ceph_decode_8_safe(p, end, struct_compat, bad); 101 /* struct_v is expected to be >= 1. we only 102 * understand encoding with struct_compat == 1. 
*/ 103 if (!struct_v || struct_compat != 1) 104 goto bad; 105 ceph_decode_32_safe(p, end, struct_len, bad); 106 ceph_decode_need(p, end, struct_len, bad); 107 end = *p + struct_len; 108 ceph_decode_64_safe(p, end, info->max_bytes, bad); 109 ceph_decode_64_safe(p, end, info->max_files, bad); 110 *p = end; 111 return 0; 112 bad: 113 return -EIO; 114 } 115 116 static int parse_reply_info_in(void **p, void *end, 117 struct ceph_mds_reply_info_in *info, 118 u64 features, 119 struct ceph_mds_client *mdsc) 120 { 121 int err = 0; 122 u8 struct_v = 0; 123 u8 struct_compat = 0; 124 u32 struct_len = 0; 125 126 info->subvolume_id = CEPH_SUBVOLUME_ID_NONE; 127 128 if (features == (u64)-1) { 129 ceph_decode_8_safe(p, end, struct_v, bad); 130 ceph_decode_8_safe(p, end, struct_compat, bad); 131 /* struct_v is expected to be >= 1. we only understand 132 * encoding with struct_compat == 1. */ 133 if (!struct_v || struct_compat != 1) 134 goto bad; 135 ceph_decode_32_safe(p, end, struct_len, bad); 136 ceph_decode_need(p, end, struct_len, bad); 137 end = *p + struct_len; 138 } 139 140 ceph_decode_need(p, end, sizeof(struct ceph_mds_reply_inode), bad); 141 info->in = *p; 142 *p += sizeof(struct ceph_mds_reply_inode) + 143 sizeof(*info->in->fragtree.splits) * 144 le32_to_cpu(info->in->fragtree.nsplits); 145 146 ceph_decode_32_safe(p, end, info->symlink_len, bad); 147 ceph_decode_need(p, end, info->symlink_len, bad); 148 info->symlink = *p; 149 *p += info->symlink_len; 150 151 ceph_decode_copy_safe(p, end, &info->dir_layout, 152 sizeof(info->dir_layout), bad); 153 ceph_decode_32_safe(p, end, info->xattr_len, bad); 154 ceph_decode_need(p, end, info->xattr_len, bad); 155 info->xattr_data = *p; 156 *p += info->xattr_len; 157 158 if (features == (u64)-1) { 159 /* inline data */ 160 ceph_decode_64_safe(p, end, info->inline_version, bad); 161 ceph_decode_32_safe(p, end, info->inline_len, bad); 162 ceph_decode_need(p, end, info->inline_len, bad); 163 info->inline_data = *p; 164 *p += 
info->inline_len; 165 /* quota */ 166 err = parse_reply_info_quota(p, end, info); 167 if (err < 0) 168 goto out_bad; 169 /* pool namespace */ 170 ceph_decode_32_safe(p, end, info->pool_ns_len, bad); 171 if (info->pool_ns_len > 0) { 172 ceph_decode_need(p, end, info->pool_ns_len, bad); 173 info->pool_ns_data = *p; 174 *p += info->pool_ns_len; 175 } 176 177 /* btime */ 178 ceph_decode_need(p, end, sizeof(info->btime), bad); 179 ceph_decode_copy(p, &info->btime, sizeof(info->btime)); 180 181 /* change attribute */ 182 ceph_decode_64_safe(p, end, info->change_attr, bad); 183 184 /* dir pin */ 185 if (struct_v >= 2) { 186 ceph_decode_32_safe(p, end, info->dir_pin, bad); 187 } else { 188 info->dir_pin = -ENODATA; 189 } 190 191 /* snapshot birth time, remains zero for v<=2 */ 192 if (struct_v >= 3) { 193 ceph_decode_need(p, end, sizeof(info->snap_btime), bad); 194 ceph_decode_copy(p, &info->snap_btime, 195 sizeof(info->snap_btime)); 196 } else { 197 memset(&info->snap_btime, 0, sizeof(info->snap_btime)); 198 } 199 200 /* snapshot count, remains zero for v<=3 */ 201 if (struct_v >= 4) { 202 ceph_decode_64_safe(p, end, info->rsnaps, bad); 203 } else { 204 info->rsnaps = 0; 205 } 206 207 if (struct_v >= 5) { 208 u32 alen; 209 210 ceph_decode_32_safe(p, end, alen, bad); 211 212 while (alen--) { 213 u32 len; 214 215 /* key */ 216 ceph_decode_32_safe(p, end, len, bad); 217 ceph_decode_skip_n(p, end, len, bad); 218 /* value */ 219 ceph_decode_32_safe(p, end, len, bad); 220 ceph_decode_skip_n(p, end, len, bad); 221 } 222 } 223 224 /* fscrypt flag -- ignore */ 225 if (struct_v >= 6) 226 ceph_decode_skip_8(p, end, bad); 227 228 info->fscrypt_auth = NULL; 229 info->fscrypt_auth_len = 0; 230 info->fscrypt_file = NULL; 231 info->fscrypt_file_len = 0; 232 if (struct_v >= 7) { 233 ceph_decode_32_safe(p, end, info->fscrypt_auth_len, bad); 234 if (info->fscrypt_auth_len) { 235 info->fscrypt_auth = kmalloc(info->fscrypt_auth_len, 236 GFP_KERNEL); 237 if (!info->fscrypt_auth) 238 return 
-ENOMEM; 239 ceph_decode_copy_safe(p, end, info->fscrypt_auth, 240 info->fscrypt_auth_len, bad); 241 } 242 ceph_decode_32_safe(p, end, info->fscrypt_file_len, bad); 243 if (info->fscrypt_file_len) { 244 info->fscrypt_file = kmalloc(info->fscrypt_file_len, 245 GFP_KERNEL); 246 if (!info->fscrypt_file) 247 return -ENOMEM; 248 ceph_decode_copy_safe(p, end, info->fscrypt_file, 249 info->fscrypt_file_len, bad); 250 } 251 } 252 253 /* 254 * InodeStat encoding versions: 255 * v1-v7: various fields added over time 256 * v8: added optmetadata (versioned sub-structure containing 257 * optional inode metadata like charmap for case-insensitive 258 * filesystems). The kernel client doesn't support 259 * case-insensitive lookups, so we skip this field. 260 * v9: added subvolume_id (parsed below) 261 */ 262 if (struct_v >= 8) { 263 u32 v8_struct_len; 264 265 /* skip optmetadata versioned sub-structure */ 266 ceph_decode_skip_8(p, end, bad); /* struct_v */ 267 ceph_decode_skip_8(p, end, bad); /* struct_compat */ 268 ceph_decode_32_safe(p, end, v8_struct_len, bad); 269 ceph_decode_skip_n(p, end, v8_struct_len, bad); 270 } 271 272 /* struct_v 9 added subvolume_id */ 273 if (struct_v >= 9) 274 ceph_decode_64_safe(p, end, info->subvolume_id, bad); 275 276 *p = end; 277 } else { 278 /* legacy (unversioned) struct */ 279 if (features & CEPH_FEATURE_MDS_INLINE_DATA) { 280 ceph_decode_64_safe(p, end, info->inline_version, bad); 281 ceph_decode_32_safe(p, end, info->inline_len, bad); 282 ceph_decode_need(p, end, info->inline_len, bad); 283 info->inline_data = *p; 284 *p += info->inline_len; 285 } else 286 info->inline_version = CEPH_INLINE_NONE; 287 288 if (features & CEPH_FEATURE_MDS_QUOTA) { 289 err = parse_reply_info_quota(p, end, info); 290 if (err < 0) 291 goto out_bad; 292 } else { 293 info->max_bytes = 0; 294 info->max_files = 0; 295 } 296 297 info->pool_ns_len = 0; 298 info->pool_ns_data = NULL; 299 if (features & CEPH_FEATURE_FS_FILE_LAYOUT_V2) { 300 ceph_decode_32_safe(p, end, 
info->pool_ns_len, bad); 301 if (info->pool_ns_len > 0) { 302 ceph_decode_need(p, end, info->pool_ns_len, bad); 303 info->pool_ns_data = *p; 304 *p += info->pool_ns_len; 305 } 306 } 307 308 if (features & CEPH_FEATURE_FS_BTIME) { 309 ceph_decode_need(p, end, sizeof(info->btime), bad); 310 ceph_decode_copy(p, &info->btime, sizeof(info->btime)); 311 ceph_decode_64_safe(p, end, info->change_attr, bad); 312 } 313 314 info->dir_pin = -ENODATA; 315 /* info->snap_btime and info->rsnaps remain zero */ 316 } 317 return 0; 318 bad: 319 err = -EIO; 320 out_bad: 321 return err; 322 } 323 324 static int parse_reply_info_dir(void **p, void *end, 325 struct ceph_mds_reply_dirfrag **dirfrag, 326 u64 features) 327 { 328 if (features == (u64)-1) { 329 u8 struct_v, struct_compat; 330 u32 struct_len; 331 ceph_decode_8_safe(p, end, struct_v, bad); 332 ceph_decode_8_safe(p, end, struct_compat, bad); 333 /* struct_v is expected to be >= 1. we only understand 334 * encoding whose struct_compat == 1. */ 335 if (!struct_v || struct_compat != 1) 336 goto bad; 337 ceph_decode_32_safe(p, end, struct_len, bad); 338 ceph_decode_need(p, end, struct_len, bad); 339 end = *p + struct_len; 340 } 341 342 ceph_decode_need(p, end, sizeof(**dirfrag), bad); 343 *dirfrag = *p; 344 *p += sizeof(**dirfrag) + sizeof(u32) * le32_to_cpu((*dirfrag)->ndist); 345 if (unlikely(*p > end)) 346 goto bad; 347 if (features == (u64)-1) 348 *p = end; 349 return 0; 350 bad: 351 return -EIO; 352 } 353 354 static int parse_reply_info_lease(void **p, void *end, 355 struct ceph_mds_reply_lease **lease, 356 u64 features, u32 *altname_len, u8 **altname) 357 { 358 u8 struct_v; 359 u32 struct_len; 360 void *lend; 361 362 if (features == (u64)-1) { 363 u8 struct_compat; 364 365 ceph_decode_8_safe(p, end, struct_v, bad); 366 ceph_decode_8_safe(p, end, struct_compat, bad); 367 368 /* struct_v is expected to be >= 1. we only understand 369 * encoding whose struct_compat == 1. 
*/ 370 if (!struct_v || struct_compat != 1) 371 goto bad; 372 373 ceph_decode_32_safe(p, end, struct_len, bad); 374 } else { 375 struct_len = sizeof(**lease); 376 *altname_len = 0; 377 *altname = NULL; 378 } 379 380 lend = *p + struct_len; 381 ceph_decode_need(p, end, struct_len, bad); 382 *lease = *p; 383 *p += sizeof(**lease); 384 385 if (features == (u64)-1) { 386 if (struct_v >= 2) { 387 ceph_decode_32_safe(p, end, *altname_len, bad); 388 ceph_decode_need(p, end, *altname_len, bad); 389 *altname = *p; 390 *p += *altname_len; 391 } else { 392 *altname = NULL; 393 *altname_len = 0; 394 } 395 } 396 *p = lend; 397 return 0; 398 bad: 399 return -EIO; 400 } 401 402 /* 403 * parse a normal reply, which may contain a (dir+)dentry and/or a 404 * target inode. 405 */ 406 static int parse_reply_info_trace(void **p, void *end, 407 struct ceph_mds_reply_info_parsed *info, 408 u64 features, 409 struct ceph_mds_client *mdsc) 410 { 411 int err; 412 413 if (info->head->is_dentry) { 414 err = parse_reply_info_in(p, end, &info->diri, features, mdsc); 415 if (err < 0) 416 goto out_bad; 417 418 err = parse_reply_info_dir(p, end, &info->dirfrag, features); 419 if (err < 0) 420 goto out_bad; 421 422 ceph_decode_32_safe(p, end, info->dname_len, bad); 423 ceph_decode_need(p, end, info->dname_len, bad); 424 info->dname = *p; 425 *p += info->dname_len; 426 427 err = parse_reply_info_lease(p, end, &info->dlease, features, 428 &info->altname_len, &info->altname); 429 if (err < 0) 430 goto out_bad; 431 } 432 433 if (info->head->is_target) { 434 err = parse_reply_info_in(p, end, &info->targeti, features, 435 mdsc); 436 if (err < 0) 437 goto out_bad; 438 } 439 440 if (unlikely(*p != end)) 441 goto bad; 442 return 0; 443 444 bad: 445 err = -EIO; 446 out_bad: 447 pr_err("problem parsing mds trace %d\n", err); 448 return err; 449 } 450 451 /* 452 * parse readdir results 453 */ 454 static int parse_reply_info_readdir(void **p, void *end, 455 struct ceph_mds_request *req, 456 u64 features, 457 
struct ceph_mds_client *mdsc) 458 { 459 struct ceph_mds_reply_info_parsed *info = &req->r_reply_info; 460 struct ceph_client *cl = req->r_mdsc->fsc->client; 461 u32 num, i = 0; 462 int err; 463 464 err = parse_reply_info_dir(p, end, &info->dir_dir, features); 465 if (err < 0) 466 goto out_bad; 467 468 ceph_decode_need(p, end, sizeof(num) + 2, bad); 469 num = ceph_decode_32(p); 470 { 471 u16 flags = ceph_decode_16(p); 472 info->dir_end = !!(flags & CEPH_READDIR_FRAG_END); 473 info->dir_complete = !!(flags & CEPH_READDIR_FRAG_COMPLETE); 474 info->hash_order = !!(flags & CEPH_READDIR_HASH_ORDER); 475 info->offset_hash = !!(flags & CEPH_READDIR_OFFSET_HASH); 476 } 477 if (num == 0) 478 goto done; 479 480 BUG_ON(!info->dir_entries); 481 if ((unsigned long)(info->dir_entries + num) > 482 (unsigned long)info->dir_entries + info->dir_buf_size) { 483 pr_err_client(cl, "dir contents are larger than expected\n"); 484 WARN_ON(1); 485 goto bad; 486 } 487 488 info->dir_nr = num; 489 while (num) { 490 struct inode *inode = d_inode(req->r_dentry); 491 struct ceph_inode_info *ci = ceph_inode(inode); 492 struct ceph_mds_reply_dir_entry *rde = info->dir_entries + i; 493 struct fscrypt_str tname = FSTR_INIT(NULL, 0); 494 struct fscrypt_str oname = FSTR_INIT(NULL, 0); 495 struct ceph_fname fname; 496 u32 altname_len, _name_len; 497 u8 *altname, *_name; 498 499 /* dentry */ 500 ceph_decode_32_safe(p, end, _name_len, bad); 501 ceph_decode_need(p, end, _name_len, bad); 502 _name = *p; 503 *p += _name_len; 504 doutc(cl, "parsed dir dname '%.*s'\n", _name_len, _name); 505 506 if (info->hash_order) 507 rde->raw_hash = ceph_str_hash(ci->i_dir_layout.dl_dir_hash, 508 _name, _name_len); 509 510 /* dentry lease */ 511 err = parse_reply_info_lease(p, end, &rde->lease, features, 512 &altname_len, &altname); 513 if (err) 514 goto out_bad; 515 516 /* 517 * Try to dencrypt the dentry names and update them 518 * in the ceph_mds_reply_dir_entry struct. 
519 */ 520 fname.dir = inode; 521 fname.name = _name; 522 fname.name_len = _name_len; 523 fname.ctext = altname; 524 fname.ctext_len = altname_len; 525 /* 526 * The _name_len maybe larger than altname_len, such as 527 * when the human readable name length is in range of 528 * (CEPH_NOHASH_NAME_MAX, CEPH_NOHASH_NAME_MAX + SHA256_DIGEST_SIZE), 529 * then the copy in ceph_fname_to_usr will corrupt the 530 * data if there has no encryption key. 531 * 532 * Just set the no_copy flag and then if there has no 533 * encryption key the oname.name will be assigned to 534 * _name always. 535 */ 536 fname.no_copy = true; 537 if (altname_len == 0) { 538 /* 539 * Set tname to _name, and this will be used 540 * to do the base64_decode in-place. It's 541 * safe because the decoded string should 542 * always be shorter, which is 3/4 of origin 543 * string. 544 */ 545 tname.name = _name; 546 547 /* 548 * Set oname to _name too, and this will be 549 * used to do the dencryption in-place. 550 */ 551 oname.name = _name; 552 oname.len = _name_len; 553 } else { 554 /* 555 * This will do the decryption only in-place 556 * from altname cryptext directly. 
557 */ 558 oname.name = altname; 559 oname.len = altname_len; 560 } 561 rde->is_nokey = false; 562 err = ceph_fname_to_usr(&fname, &tname, &oname, &rde->is_nokey); 563 if (err) { 564 pr_err_client(cl, "unable to decode %.*s, got %d\n", 565 _name_len, _name, err); 566 goto out_bad; 567 } 568 rde->name = oname.name; 569 rde->name_len = oname.len; 570 571 /* inode */ 572 err = parse_reply_info_in(p, end, &rde->inode, features, mdsc); 573 if (err < 0) 574 goto out_bad; 575 /* ceph_readdir_prepopulate() will update it */ 576 rde->offset = 0; 577 i++; 578 num--; 579 } 580 581 done: 582 /* Skip over any unrecognized fields */ 583 *p = end; 584 return 0; 585 586 bad: 587 err = -EIO; 588 out_bad: 589 pr_err_client(cl, "problem parsing dir contents %d\n", err); 590 return err; 591 } 592 593 /* 594 * parse fcntl F_GETLK results 595 */ 596 static int parse_reply_info_filelock(void **p, void *end, 597 struct ceph_mds_reply_info_parsed *info, 598 u64 features) 599 { 600 if (*p + sizeof(*info->filelock_reply) > end) 601 goto bad; 602 603 info->filelock_reply = *p; 604 605 /* Skip over any unrecognized fields */ 606 *p = end; 607 return 0; 608 bad: 609 return -EIO; 610 } 611 612 613 #if BITS_PER_LONG == 64 614 615 #define DELEGATED_INO_AVAILABLE xa_mk_value(1) 616 617 static int ceph_parse_deleg_inos(void **p, void *end, 618 struct ceph_mds_session *s) 619 { 620 struct ceph_client *cl = s->s_mdsc->fsc->client; 621 u32 sets; 622 623 ceph_decode_32_safe(p, end, sets, bad); 624 doutc(cl, "got %u sets of delegated inodes\n", sets); 625 while (sets--) { 626 u64 start, len; 627 628 ceph_decode_64_safe(p, end, start, bad); 629 ceph_decode_64_safe(p, end, len, bad); 630 631 /* Don't accept a delegation of system inodes */ 632 if (start < CEPH_INO_SYSTEM_BASE) { 633 pr_warn_ratelimited_client(cl, 634 "ignoring reserved inode range delegation (start=0x%llx len=0x%llx)\n", 635 start, len); 636 continue; 637 } 638 while (len--) { 639 int err = xa_insert(&s->s_delegated_inos, start++, 640 
DELEGATED_INO_AVAILABLE, 641 GFP_KERNEL); 642 if (!err) { 643 doutc(cl, "added delegated inode 0x%llx\n", start - 1); 644 } else if (err == -EBUSY) { 645 pr_warn_client(cl, 646 "MDS delegated inode 0x%llx more than once.\n", 647 start - 1); 648 } else { 649 return err; 650 } 651 } 652 } 653 return 0; 654 bad: 655 return -EIO; 656 } 657 658 u64 ceph_get_deleg_ino(struct ceph_mds_session *s) 659 { 660 unsigned long ino; 661 void *val; 662 663 xa_for_each(&s->s_delegated_inos, ino, val) { 664 val = xa_erase(&s->s_delegated_inos, ino); 665 if (val == DELEGATED_INO_AVAILABLE) 666 return ino; 667 } 668 return 0; 669 } 670 671 int ceph_restore_deleg_ino(struct ceph_mds_session *s, u64 ino) 672 { 673 return xa_insert(&s->s_delegated_inos, ino, DELEGATED_INO_AVAILABLE, 674 GFP_KERNEL); 675 } 676 #else /* BITS_PER_LONG == 64 */ 677 /* 678 * FIXME: xarrays can't handle 64-bit indexes on a 32-bit arch. For now, just 679 * ignore delegated_inos on 32 bit arch. Maybe eventually add xarrays for top 680 * and bottom words? 681 */ 682 static int ceph_parse_deleg_inos(void **p, void *end, 683 struct ceph_mds_session *s) 684 { 685 u32 sets; 686 687 ceph_decode_32_safe(p, end, sets, bad); 688 if (sets) 689 ceph_decode_skip_n(p, end, sets * 2 * sizeof(__le64), bad); 690 return 0; 691 bad: 692 return -EIO; 693 } 694 695 u64 ceph_get_deleg_ino(struct ceph_mds_session *s) 696 { 697 return 0; 698 } 699 700 int ceph_restore_deleg_ino(struct ceph_mds_session *s, u64 ino) 701 { 702 return 0; 703 } 704 #endif /* BITS_PER_LONG == 64 */ 705 706 /* 707 * parse create results 708 */ 709 static int parse_reply_info_create(void **p, void *end, 710 struct ceph_mds_reply_info_parsed *info, 711 u64 features, struct ceph_mds_session *s) 712 { 713 int ret; 714 715 if (features == (u64)-1 || 716 (features & CEPH_FEATURE_REPLY_CREATE_INODE)) { 717 if (*p == end) { 718 /* Malformed reply? 
*/ 719 info->has_create_ino = false; 720 } else if (test_bit(CEPHFS_FEATURE_DELEG_INO, &s->s_features)) { 721 info->has_create_ino = true; 722 /* struct_v, struct_compat, and len */ 723 ceph_decode_skip_n(p, end, 2 + sizeof(u32), bad); 724 ceph_decode_64_safe(p, end, info->ino, bad); 725 ret = ceph_parse_deleg_inos(p, end, s); 726 if (ret) 727 return ret; 728 } else { 729 /* legacy */ 730 ceph_decode_64_safe(p, end, info->ino, bad); 731 info->has_create_ino = true; 732 } 733 } else { 734 if (*p != end) 735 goto bad; 736 } 737 738 /* Skip over any unrecognized fields */ 739 *p = end; 740 return 0; 741 bad: 742 return -EIO; 743 } 744 745 static int parse_reply_info_getvxattr(void **p, void *end, 746 struct ceph_mds_reply_info_parsed *info, 747 u64 features) 748 { 749 u32 value_len; 750 751 ceph_decode_skip_8(p, end, bad); /* skip current version: 1 */ 752 ceph_decode_skip_8(p, end, bad); /* skip first version: 1 */ 753 ceph_decode_skip_32(p, end, bad); /* skip payload length */ 754 755 ceph_decode_32_safe(p, end, value_len, bad); 756 757 if (value_len == end - *p) { 758 info->xattr_info.xattr_value = *p; 759 info->xattr_info.xattr_value_len = value_len; 760 *p = end; 761 return value_len; 762 } 763 bad: 764 return -EIO; 765 } 766 767 /* 768 * parse extra results 769 */ 770 static int parse_reply_info_extra(void **p, void *end, 771 struct ceph_mds_request *req, 772 u64 features, struct ceph_mds_session *s) 773 { 774 struct ceph_mds_reply_info_parsed *info = &req->r_reply_info; 775 u32 op = le32_to_cpu(info->head->op); 776 777 if (op == CEPH_MDS_OP_GETFILELOCK) 778 return parse_reply_info_filelock(p, end, info, features); 779 else if (op == CEPH_MDS_OP_READDIR || op == CEPH_MDS_OP_LSSNAP) 780 return parse_reply_info_readdir(p, end, req, features, 781 req->r_mdsc); 782 else if (op == CEPH_MDS_OP_CREATE) 783 return parse_reply_info_create(p, end, info, features, s); 784 else if (op == CEPH_MDS_OP_GETVXATTR) 785 return parse_reply_info_getvxattr(p, end, info, features); 
786 else 787 return -EIO; 788 } 789 790 /* 791 * parse entire mds reply 792 */ 793 static int parse_reply_info(struct ceph_mds_session *s, struct ceph_msg *msg, 794 struct ceph_mds_request *req, u64 features) 795 { 796 struct ceph_mds_reply_info_parsed *info = &req->r_reply_info; 797 struct ceph_client *cl = s->s_mdsc->fsc->client; 798 void *p, *end; 799 u32 len; 800 int err; 801 802 info->head = msg->front.iov_base; 803 p = msg->front.iov_base + sizeof(struct ceph_mds_reply_head); 804 end = p + msg->front.iov_len - sizeof(struct ceph_mds_reply_head); 805 806 /* trace */ 807 ceph_decode_32_safe(&p, end, len, bad); 808 if (len > 0) { 809 ceph_decode_need(&p, end, len, bad); 810 err = parse_reply_info_trace(&p, p + len, info, features, 811 s->s_mdsc); 812 if (err < 0) 813 goto out_bad; 814 } 815 816 /* extra */ 817 ceph_decode_32_safe(&p, end, len, bad); 818 if (len > 0) { 819 ceph_decode_need(&p, end, len, bad); 820 err = parse_reply_info_extra(&p, p + len, req, features, s); 821 if (err < 0) 822 goto out_bad; 823 } 824 825 /* snap blob */ 826 ceph_decode_32_safe(&p, end, len, bad); 827 info->snapblob_len = len; 828 info->snapblob = p; 829 p += len; 830 831 if (p != end) 832 goto bad; 833 return 0; 834 835 bad: 836 err = -EIO; 837 out_bad: 838 pr_err_client(cl, "mds parse_reply err %d\n", err); 839 ceph_msg_dump(msg); 840 return err; 841 } 842 843 static void destroy_reply_info(struct ceph_mds_reply_info_parsed *info) 844 { 845 int i; 846 847 kfree(info->diri.fscrypt_auth); 848 kfree(info->diri.fscrypt_file); 849 kfree(info->targeti.fscrypt_auth); 850 kfree(info->targeti.fscrypt_file); 851 if (!info->dir_entries) 852 return; 853 854 for (i = 0; i < info->dir_nr; i++) { 855 struct ceph_mds_reply_dir_entry *rde = info->dir_entries + i; 856 857 kfree(rde->inode.fscrypt_auth); 858 kfree(rde->inode.fscrypt_file); 859 } 860 free_pages((unsigned long)info->dir_entries, get_order(info->dir_buf_size)); 861 } 862 863 /* 864 * In async unlink case the kclient won't wait for 
the first reply 865 * from MDS and just drop all the links and unhash the dentry and then 866 * succeeds immediately. 867 * 868 * For any new create/link/rename,etc requests followed by using the 869 * same file names we must wait for the first reply of the inflight 870 * unlink request, or the MDS possibly will fail these following 871 * requests with -EEXIST if the inflight async unlink request was 872 * delayed for some reasons. 873 * 874 * And the worst case is that for the none async openc request it will 875 * successfully open the file if the CDentry hasn't been unlinked yet, 876 * but later the previous delayed async unlink request will remove the 877 * CDentry. That means the just created file is possibly deleted later 878 * by accident. 879 * 880 * We need to wait for the inflight async unlink requests to finish 881 * when creating new files/directories by using the same file names. 882 */ 883 int ceph_wait_on_conflict_unlink(struct dentry *dentry) 884 { 885 struct ceph_fs_client *fsc = ceph_sb_to_fs_client(dentry->d_sb); 886 struct ceph_client *cl = fsc->client; 887 struct dentry *pdentry = dentry->d_parent; 888 struct dentry *udentry, *found = NULL; 889 struct ceph_dentry_info *di; 890 struct qstr dname; 891 u32 hash = dentry->d_name.hash; 892 int err; 893 894 dname.name = dentry->d_name.name; 895 dname.len = dentry->d_name.len; 896 897 rcu_read_lock(); 898 hash_for_each_possible_rcu(fsc->async_unlink_conflict, di, 899 hnode, hash) { 900 udentry = di->dentry; 901 902 spin_lock(&udentry->d_lock); 903 if (udentry->d_name.hash != hash) 904 goto next; 905 if (unlikely(udentry->d_parent != pdentry)) 906 goto next; 907 if (!hash_hashed(&di->hnode)) 908 goto next; 909 910 if (!test_bit(CEPH_DENTRY_ASYNC_UNLINK_BIT, &di->flags)) 911 pr_warn_client(cl, "dentry %p:%pd async unlink bit is not set\n", 912 dentry, dentry); 913 914 if (!d_same_name(udentry, pdentry, &dname)) 915 goto next; 916 917 found = dget_dlock(udentry); 918 spin_unlock(&udentry->d_lock); 919 
break; 920 next: 921 spin_unlock(&udentry->d_lock); 922 } 923 rcu_read_unlock(); 924 925 if (likely(!found)) 926 return 0; 927 928 doutc(cl, "dentry %p:%pd conflict with old %p:%pd\n", dentry, dentry, 929 found, found); 930 931 err = wait_on_bit(&di->flags, CEPH_DENTRY_ASYNC_UNLINK_BIT, 932 TASK_KILLABLE); 933 dput(found); 934 return err; 935 } 936 937 938 /* 939 * sessions 940 */ 941 const char *ceph_session_state_name(int s) 942 { 943 switch (s) { 944 case CEPH_MDS_SESSION_NEW: return "new"; 945 case CEPH_MDS_SESSION_OPENING: return "opening"; 946 case CEPH_MDS_SESSION_OPEN: return "open"; 947 case CEPH_MDS_SESSION_HUNG: return "hung"; 948 case CEPH_MDS_SESSION_CLOSING: return "closing"; 949 case CEPH_MDS_SESSION_CLOSED: return "closed"; 950 case CEPH_MDS_SESSION_RESTARTING: return "restarting"; 951 case CEPH_MDS_SESSION_RECONNECTING: return "reconnecting"; 952 case CEPH_MDS_SESSION_REJECTED: return "rejected"; 953 default: return "???"; 954 } 955 } 956 957 struct ceph_mds_session *ceph_get_mds_session(struct ceph_mds_session *s) 958 { 959 if (refcount_inc_not_zero(&s->s_ref)) 960 return s; 961 return NULL; 962 } 963 964 void ceph_put_mds_session(struct ceph_mds_session *s) 965 { 966 if (IS_ERR_OR_NULL(s)) 967 return; 968 969 if (refcount_dec_and_test(&s->s_ref)) { 970 if (s->s_auth.authorizer) 971 ceph_auth_destroy_authorizer(s->s_auth.authorizer); 972 WARN_ON(mutex_is_locked(&s->s_mutex)); 973 xa_destroy(&s->s_delegated_inos); 974 kfree(s); 975 } 976 } 977 978 /* 979 * called under mdsc->mutex 980 */ 981 struct ceph_mds_session *__ceph_lookup_mds_session(struct ceph_mds_client *mdsc, 982 int mds) 983 { 984 if (mds >= mdsc->max_sessions || !mdsc->sessions[mds]) 985 return NULL; 986 return ceph_get_mds_session(mdsc->sessions[mds]); 987 } 988 989 static bool __have_session(struct ceph_mds_client *mdsc, int mds) 990 { 991 if (mds >= mdsc->max_sessions || !mdsc->sessions[mds]) 992 return false; 993 else 994 return true; 995 } 996 997 static int 
__verify_registered_session(struct ceph_mds_client *mdsc, 998 struct ceph_mds_session *s) 999 { 1000 if (s->s_mds >= mdsc->max_sessions || 1001 mdsc->sessions[s->s_mds] != s) 1002 return -ENOENT; 1003 return 0; 1004 } 1005 1006 /* 1007 * create+register a new session for given mds. 1008 * called under mdsc->mutex. 1009 */ 1010 static struct ceph_mds_session *register_session(struct ceph_mds_client *mdsc, 1011 int mds) 1012 { 1013 struct ceph_client *cl = mdsc->fsc->client; 1014 struct ceph_mds_session *s; 1015 1016 if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_FENCE_IO) 1017 return ERR_PTR(-EIO); 1018 1019 if (mds >= mdsc->mdsmap->possible_max_rank) 1020 return ERR_PTR(-EINVAL); 1021 1022 s = kzalloc_obj(*s, GFP_NOFS); 1023 if (!s) 1024 return ERR_PTR(-ENOMEM); 1025 1026 if (mds >= mdsc->max_sessions) { 1027 int newmax = 1 << get_count_order(mds + 1); 1028 struct ceph_mds_session **sa; 1029 size_t ptr_size = sizeof(struct ceph_mds_session *); 1030 1031 doutc(cl, "realloc to %d\n", newmax); 1032 sa = kcalloc(newmax, ptr_size, GFP_NOFS); 1033 if (!sa) 1034 goto fail_realloc; 1035 if (mdsc->sessions) { 1036 memcpy(sa, mdsc->sessions, 1037 mdsc->max_sessions * ptr_size); 1038 kfree(mdsc->sessions); 1039 } 1040 mdsc->sessions = sa; 1041 mdsc->max_sessions = newmax; 1042 } 1043 1044 doutc(cl, "mds%d\n", mds); 1045 s->s_mdsc = mdsc; 1046 s->s_mds = mds; 1047 s->s_state = CEPH_MDS_SESSION_NEW; 1048 mutex_init(&s->s_mutex); 1049 1050 ceph_con_init(&s->s_con, s, &mds_con_ops, &mdsc->fsc->client->msgr); 1051 1052 atomic_set(&s->s_cap_gen, 1); 1053 s->s_cap_ttl = jiffies - 1; 1054 1055 spin_lock_init(&s->s_cap_lock); 1056 INIT_LIST_HEAD(&s->s_caps); 1057 refcount_set(&s->s_ref, 1); 1058 INIT_LIST_HEAD(&s->s_waiting); 1059 INIT_LIST_HEAD(&s->s_unsafe); 1060 xa_init(&s->s_delegated_inos); 1061 INIT_LIST_HEAD(&s->s_cap_releases); 1062 INIT_WORK(&s->s_cap_release_work, ceph_cap_release_work); 1063 1064 INIT_LIST_HEAD(&s->s_cap_dirty); 1065 INIT_LIST_HEAD(&s->s_cap_flushing); 
1066 1067 mdsc->sessions[mds] = s; 1068 atomic_inc(&mdsc->num_sessions); 1069 refcount_inc(&s->s_ref); /* one ref to sessions[], one to caller */ 1070 1071 ceph_con_open(&s->s_con, CEPH_ENTITY_TYPE_MDS, mds, 1072 ceph_mdsmap_get_addr(mdsc->mdsmap, mds)); 1073 1074 return s; 1075 1076 fail_realloc: 1077 kfree(s); 1078 return ERR_PTR(-ENOMEM); 1079 } 1080 1081 /* 1082 * called under mdsc->mutex 1083 */ 1084 static void __unregister_session(struct ceph_mds_client *mdsc, 1085 struct ceph_mds_session *s) 1086 { 1087 doutc(mdsc->fsc->client, "mds%d %p\n", s->s_mds, s); 1088 BUG_ON(mdsc->sessions[s->s_mds] != s); 1089 mdsc->sessions[s->s_mds] = NULL; 1090 ceph_con_close(&s->s_con); 1091 ceph_put_mds_session(s); 1092 atomic_dec(&mdsc->num_sessions); 1093 } 1094 1095 /* 1096 * drop session refs in request. 1097 * 1098 * should be last request ref, or hold mdsc->mutex 1099 */ 1100 static void put_request_session(struct ceph_mds_request *req) 1101 { 1102 if (req->r_session) { 1103 ceph_put_mds_session(req->r_session); 1104 req->r_session = NULL; 1105 } 1106 } 1107 1108 void ceph_mdsc_iterate_sessions(struct ceph_mds_client *mdsc, 1109 void (*cb)(struct ceph_mds_session *), 1110 bool check_state) 1111 { 1112 int mds; 1113 1114 mutex_lock(&mdsc->mutex); 1115 for (mds = 0; mds < mdsc->max_sessions; ++mds) { 1116 struct ceph_mds_session *s; 1117 1118 s = __ceph_lookup_mds_session(mdsc, mds); 1119 if (!s) 1120 continue; 1121 1122 if (check_state && !check_session_state(s)) { 1123 ceph_put_mds_session(s); 1124 continue; 1125 } 1126 1127 mutex_unlock(&mdsc->mutex); 1128 cb(s); 1129 ceph_put_mds_session(s); 1130 mutex_lock(&mdsc->mutex); 1131 } 1132 mutex_unlock(&mdsc->mutex); 1133 } 1134 1135 void ceph_mdsc_release_request(struct kref *kref) 1136 { 1137 struct ceph_mds_request *req = container_of(kref, 1138 struct ceph_mds_request, 1139 r_kref); 1140 ceph_mdsc_release_dir_caps_async(req); 1141 destroy_reply_info(&req->r_reply_info); 1142 if (req->r_request) 1143 
ceph_msg_put(req->r_request); 1144 if (req->r_reply) 1145 ceph_msg_put(req->r_reply); 1146 if (req->r_inode) { 1147 ceph_put_cap_refs(ceph_inode(req->r_inode), CEPH_CAP_PIN); 1148 iput(req->r_inode); 1149 } 1150 if (req->r_parent) { 1151 ceph_put_cap_refs(ceph_inode(req->r_parent), CEPH_CAP_PIN); 1152 iput(req->r_parent); 1153 } 1154 iput(req->r_target_inode); 1155 iput(req->r_new_inode); 1156 if (req->r_dentry) 1157 dput(req->r_dentry); 1158 if (req->r_old_dentry) 1159 dput(req->r_old_dentry); 1160 if (req->r_old_dentry_dir) { 1161 /* 1162 * track (and drop pins for) r_old_dentry_dir 1163 * separately, since r_old_dentry's d_parent may have 1164 * changed between the dir mutex being dropped and 1165 * this request being freed. 1166 */ 1167 ceph_put_cap_refs(ceph_inode(req->r_old_dentry_dir), 1168 CEPH_CAP_PIN); 1169 iput(req->r_old_dentry_dir); 1170 } 1171 kfree(req->r_path1); 1172 kfree(req->r_path2); 1173 put_cred(req->r_cred); 1174 if (req->r_mnt_idmap) 1175 mnt_idmap_put(req->r_mnt_idmap); 1176 if (req->r_pagelist) 1177 ceph_pagelist_release(req->r_pagelist); 1178 kfree(req->r_fscrypt_auth); 1179 kfree(req->r_altname); 1180 put_request_session(req); 1181 ceph_unreserve_caps(req->r_mdsc, &req->r_caps_reservation); 1182 WARN_ON_ONCE(!list_empty(&req->r_wait)); 1183 kmem_cache_free(ceph_mds_request_cachep, req); 1184 } 1185 1186 DEFINE_RB_FUNCS(request, struct ceph_mds_request, r_tid, r_node) 1187 1188 /* 1189 * lookup session, bump ref if found. 1190 * 1191 * called under mdsc->mutex. 1192 */ 1193 static struct ceph_mds_request * 1194 lookup_get_request(struct ceph_mds_client *mdsc, u64 tid) 1195 { 1196 struct ceph_mds_request *req; 1197 1198 req = lookup_request(&mdsc->request_tree, tid); 1199 if (req) 1200 ceph_mdsc_get_request(req); 1201 1202 return req; 1203 } 1204 1205 /* 1206 * Register an in-flight request, and assign a tid. Link to directory 1207 * are modifying (if any). 1208 * 1209 * Called under mdsc->mutex. 
1210 */ 1211 static void __register_request(struct ceph_mds_client *mdsc, 1212 struct ceph_mds_request *req, 1213 struct inode *dir) 1214 { 1215 struct ceph_client *cl = mdsc->fsc->client; 1216 int ret = 0; 1217 1218 req->r_tid = ++mdsc->last_tid; 1219 if (req->r_num_caps) { 1220 ret = ceph_reserve_caps(mdsc, &req->r_caps_reservation, 1221 req->r_num_caps); 1222 if (ret < 0) { 1223 pr_err_client(cl, "%p failed to reserve caps: %d\n", 1224 req, ret); 1225 /* set req->r_err to fail early from __do_request */ 1226 req->r_err = ret; 1227 return; 1228 } 1229 } 1230 doutc(cl, "%p tid %lld\n", req, req->r_tid); 1231 ceph_mdsc_get_request(req); 1232 insert_request(&mdsc->request_tree, req); 1233 1234 req->r_cred = get_current_cred(); 1235 if (!req->r_mnt_idmap) 1236 req->r_mnt_idmap = &nop_mnt_idmap; 1237 1238 if (mdsc->oldest_tid == 0 && req->r_op != CEPH_MDS_OP_SETFILELOCK) 1239 mdsc->oldest_tid = req->r_tid; 1240 1241 if (dir) { 1242 struct ceph_inode_info *ci = ceph_inode(dir); 1243 1244 ihold(dir); 1245 req->r_unsafe_dir = dir; 1246 spin_lock(&ci->i_unsafe_lock); 1247 list_add_tail(&req->r_unsafe_dir_item, &ci->i_unsafe_dirops); 1248 spin_unlock(&ci->i_unsafe_lock); 1249 } 1250 } 1251 1252 static void __unregister_request(struct ceph_mds_client *mdsc, 1253 struct ceph_mds_request *req) 1254 { 1255 doutc(mdsc->fsc->client, "%p tid %lld\n", req, req->r_tid); 1256 1257 /* Never leave an unregistered request on an unsafe list! 
*/ 1258 list_del_init(&req->r_unsafe_item); 1259 1260 if (req->r_tid == mdsc->oldest_tid) { 1261 struct rb_node *p = rb_next(&req->r_node); 1262 mdsc->oldest_tid = 0; 1263 while (p) { 1264 struct ceph_mds_request *next_req = 1265 rb_entry(p, struct ceph_mds_request, r_node); 1266 if (next_req->r_op != CEPH_MDS_OP_SETFILELOCK) { 1267 mdsc->oldest_tid = next_req->r_tid; 1268 break; 1269 } 1270 p = rb_next(p); 1271 } 1272 } 1273 1274 erase_request(&mdsc->request_tree, req); 1275 1276 if (req->r_unsafe_dir) { 1277 struct ceph_inode_info *ci = ceph_inode(req->r_unsafe_dir); 1278 spin_lock(&ci->i_unsafe_lock); 1279 list_del_init(&req->r_unsafe_dir_item); 1280 spin_unlock(&ci->i_unsafe_lock); 1281 } 1282 if (req->r_target_inode && 1283 test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) { 1284 struct ceph_inode_info *ci = ceph_inode(req->r_target_inode); 1285 spin_lock(&ci->i_unsafe_lock); 1286 list_del_init(&req->r_unsafe_target_item); 1287 spin_unlock(&ci->i_unsafe_lock); 1288 } 1289 1290 if (req->r_unsafe_dir) { 1291 iput(req->r_unsafe_dir); 1292 req->r_unsafe_dir = NULL; 1293 } 1294 1295 complete_all(&req->r_safe_completion); 1296 1297 ceph_mdsc_put_request(req); 1298 } 1299 1300 /* 1301 * Walk back up the dentry tree until we hit a dentry representing a 1302 * non-snapshot inode. We do this using the rcu_read_lock (which must be held 1303 * when calling this) to ensure that the objects won't disappear while we're 1304 * working with them. Once we hit a candidate dentry, we attempt to take a 1305 * reference to it, and return that as the result. 1306 */ 1307 static struct inode *get_nonsnap_parent(struct dentry *dentry) 1308 { 1309 struct inode *inode = NULL; 1310 1311 while (dentry && !IS_ROOT(dentry)) { 1312 inode = d_inode_rcu(dentry); 1313 if (!inode || ceph_snap(inode) == CEPH_NOSNAP) 1314 break; 1315 dentry = dentry->d_parent; 1316 } 1317 if (inode) 1318 inode = igrab(inode); 1319 return inode; 1320 } 1321 1322 /* 1323 * Choose mds to send request to next. 
If there is a hint set in the 1324 * request (e.g., due to a prior forward hint from the mds), use that. 1325 * Otherwise, consult frag tree and/or caps to identify the 1326 * appropriate mds. If all else fails, choose randomly. 1327 * 1328 * Called under mdsc->mutex. 1329 */ 1330 static int __choose_mds(struct ceph_mds_client *mdsc, 1331 struct ceph_mds_request *req, 1332 bool *random) 1333 { 1334 struct inode *inode; 1335 struct ceph_inode_info *ci; 1336 struct ceph_cap *cap; 1337 int mode = req->r_direct_mode; 1338 int mds = -1; 1339 u32 hash = req->r_direct_hash; 1340 bool is_hash = test_bit(CEPH_MDS_R_DIRECT_IS_HASH, &req->r_req_flags); 1341 struct ceph_client *cl = mdsc->fsc->client; 1342 1343 if (random) 1344 *random = false; 1345 1346 /* 1347 * is there a specific mds we should try? ignore hint if we have 1348 * no session and the mds is not up (active or recovering). 1349 */ 1350 if (req->r_resend_mds >= 0 && 1351 (__have_session(mdsc, req->r_resend_mds) || 1352 ceph_mdsmap_get_state(mdsc->mdsmap, req->r_resend_mds) > 0)) { 1353 doutc(cl, "using resend_mds mds%d\n", req->r_resend_mds); 1354 return req->r_resend_mds; 1355 } 1356 1357 if (mode == USE_RANDOM_MDS) 1358 goto random; 1359 1360 inode = NULL; 1361 if (req->r_inode) { 1362 if (ceph_snap(req->r_inode) != CEPH_SNAPDIR) { 1363 inode = req->r_inode; 1364 ihold(inode); 1365 } else { 1366 /* req->r_dentry is non-null for LSSNAP request */ 1367 rcu_read_lock(); 1368 inode = get_nonsnap_parent(req->r_dentry); 1369 rcu_read_unlock(); 1370 doutc(cl, "using snapdir's parent %p %llx.%llx\n", 1371 inode, ceph_vinop(inode)); 1372 } 1373 } else if (req->r_dentry) { 1374 /* ignore race with rename; old or new d_parent is okay */ 1375 struct dentry *parent; 1376 struct inode *dir; 1377 1378 rcu_read_lock(); 1379 parent = READ_ONCE(req->r_dentry->d_parent); 1380 dir = req->r_parent ? 
: d_inode_rcu(parent); 1381 1382 if (!dir || dir->i_sb != mdsc->fsc->sb) { 1383 /* not this fs or parent went negative */ 1384 inode = d_inode(req->r_dentry); 1385 if (inode) 1386 ihold(inode); 1387 } else if (ceph_snap(dir) != CEPH_NOSNAP) { 1388 /* direct snapped/virtual snapdir requests 1389 * based on parent dir inode */ 1390 inode = get_nonsnap_parent(parent); 1391 doutc(cl, "using nonsnap parent %p %llx.%llx\n", 1392 inode, ceph_vinop(inode)); 1393 } else { 1394 /* dentry target */ 1395 inode = d_inode(req->r_dentry); 1396 if (!inode || mode == USE_AUTH_MDS) { 1397 /* dir + name */ 1398 inode = igrab(dir); 1399 hash = ceph_dentry_hash(dir, req->r_dentry); 1400 is_hash = true; 1401 } else { 1402 ihold(inode); 1403 } 1404 } 1405 rcu_read_unlock(); 1406 } 1407 1408 if (!inode) 1409 goto random; 1410 1411 doutc(cl, "%p %llx.%llx is_hash=%d (0x%x) mode %d\n", inode, 1412 ceph_vinop(inode), (int)is_hash, hash, mode); 1413 ci = ceph_inode(inode); 1414 1415 if (is_hash && S_ISDIR(inode->i_mode)) { 1416 struct ceph_inode_frag frag; 1417 int found; 1418 1419 ceph_choose_frag(ci, hash, &frag, &found); 1420 if (found) { 1421 if (mode == USE_ANY_MDS && frag.ndist > 0) { 1422 u8 r; 1423 1424 /* choose a random replica */ 1425 get_random_bytes(&r, 1); 1426 r %= frag.ndist; 1427 mds = frag.dist[r]; 1428 doutc(cl, "%p %llx.%llx frag %u mds%d (%d/%d)\n", 1429 inode, ceph_vinop(inode), frag.frag, 1430 mds, (int)r, frag.ndist); 1431 if (ceph_mdsmap_get_state(mdsc->mdsmap, mds) >= 1432 CEPH_MDS_STATE_ACTIVE && 1433 !ceph_mdsmap_is_laggy(mdsc->mdsmap, mds)) 1434 goto out; 1435 } 1436 1437 /* since this file/dir wasn't known to be 1438 * replicated, then we want to look for the 1439 * authoritative mds. 
*/ 1440 if (frag.mds >= 0) { 1441 /* choose auth mds */ 1442 mds = frag.mds; 1443 doutc(cl, "%p %llx.%llx frag %u mds%d (auth)\n", 1444 inode, ceph_vinop(inode), frag.frag, mds); 1445 if (ceph_mdsmap_get_state(mdsc->mdsmap, mds) >= 1446 CEPH_MDS_STATE_ACTIVE) { 1447 if (!ceph_mdsmap_is_laggy(mdsc->mdsmap, 1448 mds)) 1449 goto out; 1450 } 1451 } 1452 mode = USE_AUTH_MDS; 1453 } 1454 } 1455 1456 spin_lock(&ci->i_ceph_lock); 1457 cap = NULL; 1458 if (mode == USE_AUTH_MDS) 1459 cap = ci->i_auth_cap; 1460 if (!cap && !RB_EMPTY_ROOT(&ci->i_caps)) 1461 cap = rb_entry(rb_first(&ci->i_caps), struct ceph_cap, ci_node); 1462 if (!cap) { 1463 spin_unlock(&ci->i_ceph_lock); 1464 iput(inode); 1465 goto random; 1466 } 1467 mds = cap->session->s_mds; 1468 doutc(cl, "%p %llx.%llx mds%d (%scap %p)\n", inode, 1469 ceph_vinop(inode), mds, 1470 cap == ci->i_auth_cap ? "auth " : "", cap); 1471 spin_unlock(&ci->i_ceph_lock); 1472 out: 1473 iput(inode); 1474 return mds; 1475 1476 random: 1477 if (random) 1478 *random = true; 1479 1480 mds = ceph_mdsmap_get_random_mds(mdsc->mdsmap); 1481 doutc(cl, "chose random mds%d\n", mds); 1482 return mds; 1483 } 1484 1485 1486 /* 1487 * session messages 1488 */ 1489 struct ceph_msg *ceph_create_session_msg(u32 op, u64 seq) 1490 { 1491 struct ceph_msg *msg; 1492 struct ceph_mds_session_head *h; 1493 1494 msg = ceph_msg_new(CEPH_MSG_CLIENT_SESSION, sizeof(*h), GFP_NOFS, 1495 false); 1496 if (!msg) { 1497 pr_err("ENOMEM creating session %s msg\n", 1498 ceph_session_op_name(op)); 1499 return NULL; 1500 } 1501 h = msg->front.iov_base; 1502 h->op = cpu_to_le32(op); 1503 h->seq = cpu_to_le64(seq); 1504 1505 return msg; 1506 } 1507 1508 static const unsigned char feature_bits[] = CEPHFS_FEATURES_CLIENT_SUPPORTED; 1509 #define FEATURE_BYTES(c) (DIV_ROUND_UP((size_t)feature_bits[c - 1] + 1, 64) * 8) 1510 static int encode_supported_features(void **p, void *end) 1511 { 1512 static const size_t count = ARRAY_SIZE(feature_bits); 1513 1514 if (count > 0) { 1515 
size_t i; 1516 size_t size = FEATURE_BYTES(count); 1517 unsigned long bit; 1518 1519 if (WARN_ON_ONCE(*p + 4 + size > end)) 1520 return -ERANGE; 1521 1522 ceph_encode_32(p, size); 1523 memset(*p, 0, size); 1524 for (i = 0; i < count; i++) { 1525 bit = feature_bits[i]; 1526 ((unsigned char *)(*p))[bit / 8] |= BIT(bit % 8); 1527 } 1528 *p += size; 1529 } else { 1530 if (WARN_ON_ONCE(*p + 4 > end)) 1531 return -ERANGE; 1532 1533 ceph_encode_32(p, 0); 1534 } 1535 1536 return 0; 1537 } 1538 1539 static const unsigned char metric_bits[] = CEPHFS_METRIC_SPEC_CLIENT_SUPPORTED; 1540 #define METRIC_BYTES(cnt) (DIV_ROUND_UP((size_t)metric_bits[cnt - 1] + 1, 64) * 8) 1541 static int encode_metric_spec(void **p, void *end) 1542 { 1543 static const size_t count = ARRAY_SIZE(metric_bits); 1544 1545 /* header */ 1546 if (WARN_ON_ONCE(*p + 2 > end)) 1547 return -ERANGE; 1548 1549 ceph_encode_8(p, 1); /* version */ 1550 ceph_encode_8(p, 1); /* compat */ 1551 1552 if (count > 0) { 1553 size_t i; 1554 size_t size = METRIC_BYTES(count); 1555 1556 if (WARN_ON_ONCE(*p + 4 + 4 + size > end)) 1557 return -ERANGE; 1558 1559 /* metric spec info length */ 1560 ceph_encode_32(p, 4 + size); 1561 1562 /* metric spec */ 1563 ceph_encode_32(p, size); 1564 memset(*p, 0, size); 1565 for (i = 0; i < count; i++) 1566 ((unsigned char *)(*p))[i / 8] |= BIT(metric_bits[i] % 8); 1567 *p += size; 1568 } else { 1569 if (WARN_ON_ONCE(*p + 4 + 4 > end)) 1570 return -ERANGE; 1571 1572 /* metric spec info length */ 1573 ceph_encode_32(p, 4); 1574 /* metric spec */ 1575 ceph_encode_32(p, 0); 1576 } 1577 1578 return 0; 1579 } 1580 1581 /* 1582 * session message, specialization for CEPH_SESSION_REQUEST_OPEN 1583 * to include additional client metadata fields. 
1584 */ 1585 static struct ceph_msg * 1586 create_session_full_msg(struct ceph_mds_client *mdsc, int op, u64 seq) 1587 { 1588 struct ceph_msg *msg; 1589 struct ceph_mds_session_head *h; 1590 int i; 1591 int extra_bytes = 0; 1592 int metadata_key_count = 0; 1593 struct ceph_options *opt = mdsc->fsc->client->options; 1594 struct ceph_mount_options *fsopt = mdsc->fsc->mount_options; 1595 struct ceph_client *cl = mdsc->fsc->client; 1596 size_t size, count; 1597 void *p, *end; 1598 int ret; 1599 1600 const char* metadata[][2] = { 1601 {"hostname", mdsc->nodename}, 1602 {"kernel_version", init_utsname()->release}, 1603 {"entity_id", opt->name ? : ""}, 1604 {"root", fsopt->server_path ? : "/"}, 1605 {NULL, NULL} 1606 }; 1607 1608 /* Calculate serialized length of metadata */ 1609 extra_bytes = 4; /* map length */ 1610 for (i = 0; metadata[i][0]; ++i) { 1611 extra_bytes += 8 + strlen(metadata[i][0]) + 1612 strlen(metadata[i][1]); 1613 metadata_key_count++; 1614 } 1615 1616 /* supported feature */ 1617 size = 0; 1618 count = ARRAY_SIZE(feature_bits); 1619 if (count > 0) 1620 size = FEATURE_BYTES(count); 1621 extra_bytes += 4 + size; 1622 1623 /* metric spec */ 1624 size = 0; 1625 count = ARRAY_SIZE(metric_bits); 1626 if (count > 0) 1627 size = METRIC_BYTES(count); 1628 extra_bytes += 2 + 4 + 4 + size; 1629 1630 /* flags, mds auth caps and oldest_client_tid */ 1631 extra_bytes += 4 + 4 + 8; 1632 1633 /* Allocate the message */ 1634 msg = ceph_msg_new(CEPH_MSG_CLIENT_SESSION, sizeof(*h) + extra_bytes, 1635 GFP_NOFS, false); 1636 if (!msg) { 1637 pr_err_client(cl, "ENOMEM creating session open msg\n"); 1638 return ERR_PTR(-ENOMEM); 1639 } 1640 p = msg->front.iov_base; 1641 end = p + msg->front.iov_len; 1642 1643 h = p; 1644 h->op = cpu_to_le32(op); 1645 h->seq = cpu_to_le64(seq); 1646 1647 /* 1648 * Serialize client metadata into waiting buffer space, using 1649 * the format that userspace expects for map<string, string> 1650 * 1651 * ClientSession messages with metadata are 
v7 1652 */ 1653 msg->hdr.version = cpu_to_le16(7); 1654 msg->hdr.compat_version = cpu_to_le16(1); 1655 1656 /* The write pointer, following the session_head structure */ 1657 p += sizeof(*h); 1658 1659 /* Number of entries in the map */ 1660 ceph_encode_32(&p, metadata_key_count); 1661 1662 /* Two length-prefixed strings for each entry in the map */ 1663 for (i = 0; metadata[i][0]; ++i) { 1664 size_t const key_len = strlen(metadata[i][0]); 1665 size_t const val_len = strlen(metadata[i][1]); 1666 1667 ceph_encode_32(&p, key_len); 1668 memcpy(p, metadata[i][0], key_len); 1669 p += key_len; 1670 ceph_encode_32(&p, val_len); 1671 memcpy(p, metadata[i][1], val_len); 1672 p += val_len; 1673 } 1674 1675 ret = encode_supported_features(&p, end); 1676 if (ret) { 1677 pr_err_client(cl, "encode_supported_features failed!\n"); 1678 ceph_msg_put(msg); 1679 return ERR_PTR(ret); 1680 } 1681 1682 ret = encode_metric_spec(&p, end); 1683 if (ret) { 1684 pr_err_client(cl, "encode_metric_spec failed!\n"); 1685 ceph_msg_put(msg); 1686 return ERR_PTR(ret); 1687 } 1688 1689 /* version == 5, flags */ 1690 ceph_encode_32(&p, 0); 1691 1692 /* version == 6, mds auth caps */ 1693 ceph_encode_32(&p, 0); 1694 1695 /* version == 7, oldest_client_tid */ 1696 ceph_encode_64(&p, mdsc->oldest_tid); 1697 1698 msg->front.iov_len = p - msg->front.iov_base; 1699 msg->hdr.front_len = cpu_to_le32(msg->front.iov_len); 1700 1701 return msg; 1702 } 1703 1704 /* 1705 * send session open request. 1706 * 1707 * called under mdsc->mutex 1708 */ 1709 static int __open_session(struct ceph_mds_client *mdsc, 1710 struct ceph_mds_session *session) 1711 { 1712 struct ceph_msg *msg; 1713 int mstate; 1714 int mds = session->s_mds; 1715 1716 if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_FENCE_IO) 1717 return -EIO; 1718 1719 /* wait for mds to go active? 
*/ 1720 mstate = ceph_mdsmap_get_state(mdsc->mdsmap, mds); 1721 doutc(mdsc->fsc->client, "open_session to mds%d (%s)\n", mds, 1722 ceph_mds_state_name(mstate)); 1723 session->s_state = CEPH_MDS_SESSION_OPENING; 1724 session->s_renew_requested = jiffies; 1725 1726 /* send connect message */ 1727 msg = create_session_full_msg(mdsc, CEPH_SESSION_REQUEST_OPEN, 1728 session->s_seq); 1729 if (IS_ERR(msg)) 1730 return PTR_ERR(msg); 1731 ceph_con_send(&session->s_con, msg); 1732 return 0; 1733 } 1734 1735 /* 1736 * open sessions for any export targets for the given mds 1737 * 1738 * called under mdsc->mutex 1739 */ 1740 static struct ceph_mds_session * 1741 __open_export_target_session(struct ceph_mds_client *mdsc, int target) 1742 { 1743 struct ceph_mds_session *session; 1744 int ret; 1745 1746 session = __ceph_lookup_mds_session(mdsc, target); 1747 if (!session) { 1748 session = register_session(mdsc, target); 1749 if (IS_ERR(session)) 1750 return session; 1751 } 1752 if (session->s_state == CEPH_MDS_SESSION_NEW || 1753 session->s_state == CEPH_MDS_SESSION_CLOSING) { 1754 ret = __open_session(mdsc, session); 1755 if (ret) 1756 return ERR_PTR(ret); 1757 } 1758 1759 return session; 1760 } 1761 1762 struct ceph_mds_session * 1763 ceph_mdsc_open_export_target_session(struct ceph_mds_client *mdsc, int target) 1764 { 1765 struct ceph_mds_session *session; 1766 struct ceph_client *cl = mdsc->fsc->client; 1767 1768 doutc(cl, "to mds%d\n", target); 1769 1770 mutex_lock(&mdsc->mutex); 1771 session = __open_export_target_session(mdsc, target); 1772 mutex_unlock(&mdsc->mutex); 1773 1774 return session; 1775 } 1776 1777 static void __open_export_target_sessions(struct ceph_mds_client *mdsc, 1778 struct ceph_mds_session *session) 1779 { 1780 struct ceph_mds_info *mi; 1781 struct ceph_mds_session *ts; 1782 int i, mds = session->s_mds; 1783 struct ceph_client *cl = mdsc->fsc->client; 1784 1785 if (mds >= mdsc->mdsmap->possible_max_rank) 1786 return; 1787 1788 mi = 
&mdsc->mdsmap->m_info[mds]; 1789 doutc(cl, "for mds%d (%d targets)\n", session->s_mds, 1790 mi->num_export_targets); 1791 1792 for (i = 0; i < mi->num_export_targets; i++) { 1793 ts = __open_export_target_session(mdsc, mi->export_targets[i]); 1794 ceph_put_mds_session(ts); 1795 } 1796 } 1797 1798 /* 1799 * session caps 1800 */ 1801 1802 static void detach_cap_releases(struct ceph_mds_session *session, 1803 struct list_head *target) 1804 { 1805 struct ceph_client *cl = session->s_mdsc->fsc->client; 1806 1807 lockdep_assert_held(&session->s_cap_lock); 1808 1809 list_splice_init(&session->s_cap_releases, target); 1810 session->s_num_cap_releases = 0; 1811 doutc(cl, "mds%d\n", session->s_mds); 1812 } 1813 1814 static void dispose_cap_releases(struct ceph_mds_client *mdsc, 1815 struct list_head *dispose) 1816 { 1817 while (!list_empty(dispose)) { 1818 struct ceph_cap *cap; 1819 /* zero out the in-progress message */ 1820 cap = list_first_entry(dispose, struct ceph_cap, session_caps); 1821 list_del(&cap->session_caps); 1822 ceph_put_cap(mdsc, cap); 1823 } 1824 } 1825 1826 static void cleanup_session_requests(struct ceph_mds_client *mdsc, 1827 struct ceph_mds_session *session) 1828 { 1829 struct ceph_client *cl = mdsc->fsc->client; 1830 struct ceph_mds_request *req; 1831 struct rb_node *p; 1832 1833 doutc(cl, "mds%d\n", session->s_mds); 1834 mutex_lock(&mdsc->mutex); 1835 while (!list_empty(&session->s_unsafe)) { 1836 req = list_first_entry(&session->s_unsafe, 1837 struct ceph_mds_request, r_unsafe_item); 1838 pr_warn_ratelimited_client(cl, " dropping unsafe request %llu\n", 1839 req->r_tid); 1840 if (req->r_target_inode) 1841 mapping_set_error(req->r_target_inode->i_mapping, -EIO); 1842 if (req->r_unsafe_dir) 1843 mapping_set_error(req->r_unsafe_dir->i_mapping, -EIO); 1844 __unregister_request(mdsc, req); 1845 } 1846 /* zero r_attempts, so kick_requests() will re-send requests */ 1847 p = rb_first(&mdsc->request_tree); 1848 while (p) { 1849 req = rb_entry(p, struct 
ceph_mds_request, r_node); 1850 p = rb_next(p); 1851 if (req->r_session && 1852 req->r_session->s_mds == session->s_mds) 1853 req->r_attempts = 0; 1854 } 1855 mutex_unlock(&mdsc->mutex); 1856 } 1857 1858 /* 1859 * Helper to safely iterate over all caps associated with a session, with 1860 * special care taken to handle a racing __ceph_remove_cap(). 1861 * 1862 * Caller must hold session s_mutex. 1863 */ 1864 int ceph_iterate_session_caps(struct ceph_mds_session *session, 1865 int (*cb)(struct inode *, int mds, void *), 1866 void *arg) 1867 { 1868 struct ceph_client *cl = session->s_mdsc->fsc->client; 1869 struct list_head *p; 1870 struct ceph_cap *cap; 1871 struct inode *inode, *last_inode = NULL; 1872 struct ceph_cap *old_cap = NULL; 1873 int ret; 1874 1875 doutc(cl, "%p mds%d\n", session, session->s_mds); 1876 spin_lock(&session->s_cap_lock); 1877 p = session->s_caps.next; 1878 while (p != &session->s_caps) { 1879 int mds; 1880 1881 cap = list_entry(p, struct ceph_cap, session_caps); 1882 inode = igrab(&cap->ci->netfs.inode); 1883 if (!inode) { 1884 p = p->next; 1885 continue; 1886 } 1887 session->s_cap_iterator = cap; 1888 mds = cap->mds; 1889 spin_unlock(&session->s_cap_lock); 1890 1891 if (last_inode) { 1892 iput(last_inode); 1893 last_inode = NULL; 1894 } 1895 if (old_cap) { 1896 ceph_put_cap(session->s_mdsc, old_cap); 1897 old_cap = NULL; 1898 } 1899 1900 ret = cb(inode, mds, arg); 1901 last_inode = inode; 1902 1903 spin_lock(&session->s_cap_lock); 1904 p = p->next; 1905 if (!cap->ci) { 1906 doutc(cl, "finishing cap %p removal\n", cap); 1907 BUG_ON(cap->session != session); 1908 cap->session = NULL; 1909 list_del_init(&cap->session_caps); 1910 session->s_nr_caps--; 1911 atomic64_dec(&session->s_mdsc->metric.total_caps); 1912 if (cap->queue_release) 1913 __ceph_queue_cap_release(session, cap); 1914 else 1915 old_cap = cap; /* put_cap it w/o locks held */ 1916 } 1917 if (ret < 0) 1918 goto out; 1919 } 1920 ret = 0; 1921 out: 1922 session->s_cap_iterator = 
NULL; 1923 spin_unlock(&session->s_cap_lock); 1924 1925 iput(last_inode); 1926 if (old_cap) 1927 ceph_put_cap(session->s_mdsc, old_cap); 1928 1929 return ret; 1930 } 1931 1932 static int remove_session_caps_cb(struct inode *inode, int mds, void *arg) 1933 { 1934 struct ceph_inode_info *ci = ceph_inode(inode); 1935 struct ceph_client *cl = ceph_inode_to_client(inode); 1936 bool invalidate = false; 1937 struct ceph_cap *cap; 1938 int iputs = 0; 1939 1940 spin_lock(&ci->i_ceph_lock); 1941 cap = __get_cap_for_mds(ci, mds); 1942 if (cap) { 1943 doutc(cl, " removing cap %p, ci is %p, inode is %p\n", 1944 cap, ci, &ci->netfs.inode); 1945 1946 iputs = ceph_purge_inode_cap(inode, cap, &invalidate); 1947 } 1948 spin_unlock(&ci->i_ceph_lock); 1949 1950 if (cap) 1951 wake_up_all(&ci->i_cap_wq); 1952 if (invalidate) 1953 ceph_queue_invalidate(inode); 1954 while (iputs--) 1955 iput(inode); 1956 return 0; 1957 } 1958 1959 /* 1960 * caller must hold session s_mutex 1961 */ 1962 static void remove_session_caps(struct ceph_mds_session *session) 1963 { 1964 struct ceph_fs_client *fsc = session->s_mdsc->fsc; 1965 struct super_block *sb = fsc->sb; 1966 LIST_HEAD(dispose); 1967 1968 doutc(fsc->client, "on %p\n", session); 1969 ceph_iterate_session_caps(session, remove_session_caps_cb, fsc); 1970 1971 wake_up_all(&fsc->mdsc->cap_flushing_wq); 1972 1973 spin_lock(&session->s_cap_lock); 1974 if (session->s_nr_caps > 0) { 1975 struct inode *inode; 1976 struct ceph_cap *cap, *prev = NULL; 1977 struct ceph_vino vino; 1978 /* 1979 * iterate_session_caps() skips inodes that are being 1980 * deleted, we need to wait until deletions are complete. 1981 * __wait_on_freeing_inode() is designed for the job, 1982 * but it is not exported, so use lookup inode function 1983 * to access it. 
1984 */ 1985 while (!list_empty(&session->s_caps)) { 1986 cap = list_entry(session->s_caps.next, 1987 struct ceph_cap, session_caps); 1988 if (cap == prev) 1989 break; 1990 prev = cap; 1991 vino = cap->ci->i_vino; 1992 spin_unlock(&session->s_cap_lock); 1993 1994 inode = ceph_find_inode(sb, vino); 1995 iput(inode); 1996 1997 spin_lock(&session->s_cap_lock); 1998 } 1999 } 2000 2001 // drop cap expires and unlock s_cap_lock 2002 detach_cap_releases(session, &dispose); 2003 2004 BUG_ON(session->s_nr_caps > 0); 2005 BUG_ON(!list_empty(&session->s_cap_flushing)); 2006 spin_unlock(&session->s_cap_lock); 2007 dispose_cap_releases(session->s_mdsc, &dispose); 2008 } 2009 2010 enum { 2011 RECONNECT, 2012 RENEWCAPS, 2013 FORCE_RO, 2014 }; 2015 2016 /* 2017 * wake up any threads waiting on this session's caps. if the cap is 2018 * old (didn't get renewed on the client reconnect), remove it now. 2019 * 2020 * caller must hold s_mutex. 2021 */ 2022 static int wake_up_session_cb(struct inode *inode, int mds, void *arg) 2023 { 2024 struct ceph_inode_info *ci = ceph_inode(inode); 2025 unsigned long ev = (unsigned long)arg; 2026 2027 if (ev == RECONNECT) { 2028 spin_lock(&ci->i_ceph_lock); 2029 ci->i_wanted_max_size = 0; 2030 ci->i_requested_max_size = 0; 2031 spin_unlock(&ci->i_ceph_lock); 2032 } else if (ev == RENEWCAPS) { 2033 struct ceph_cap *cap; 2034 2035 spin_lock(&ci->i_ceph_lock); 2036 cap = __get_cap_for_mds(ci, mds); 2037 /* mds did not re-issue stale cap */ 2038 if (cap && cap->cap_gen < atomic_read(&cap->session->s_cap_gen)) 2039 cap->issued = cap->implemented = CEPH_CAP_PIN; 2040 spin_unlock(&ci->i_ceph_lock); 2041 } else if (ev == FORCE_RO) { 2042 } 2043 wake_up_all(&ci->i_cap_wq); 2044 return 0; 2045 } 2046 2047 static void wake_up_session_caps(struct ceph_mds_session *session, int ev) 2048 { 2049 struct ceph_client *cl = session->s_mdsc->fsc->client; 2050 2051 doutc(cl, "session %p mds%d\n", session, session->s_mds); 2052 ceph_iterate_session_caps(session, 
wake_up_session_cb, 2053 (void *)(unsigned long)ev); 2054 } 2055 2056 /* 2057 * Send periodic message to MDS renewing all currently held caps. The 2058 * ack will reset the expiration for all caps from this session. 2059 * 2060 * caller holds s_mutex 2061 */ 2062 static int send_renew_caps(struct ceph_mds_client *mdsc, 2063 struct ceph_mds_session *session) 2064 { 2065 struct ceph_client *cl = mdsc->fsc->client; 2066 struct ceph_msg *msg; 2067 int state; 2068 2069 if (time_after_eq(jiffies, session->s_cap_ttl) && 2070 time_after_eq(session->s_cap_ttl, session->s_renew_requested)) 2071 pr_info_client(cl, "mds%d caps stale\n", session->s_mds); 2072 session->s_renew_requested = jiffies; 2073 2074 /* do not try to renew caps until a recovering mds has reconnected 2075 * with its clients. */ 2076 state = ceph_mdsmap_get_state(mdsc->mdsmap, session->s_mds); 2077 if (state < CEPH_MDS_STATE_RECONNECT) { 2078 doutc(cl, "ignoring mds%d (%s)\n", session->s_mds, 2079 ceph_mds_state_name(state)); 2080 return 0; 2081 } 2082 2083 doutc(cl, "to mds%d (%s)\n", session->s_mds, 2084 ceph_mds_state_name(state)); 2085 msg = create_session_full_msg(mdsc, CEPH_SESSION_REQUEST_RENEWCAPS, 2086 ++session->s_renew_seq); 2087 if (IS_ERR(msg)) 2088 return PTR_ERR(msg); 2089 ceph_con_send(&session->s_con, msg); 2090 return 0; 2091 } 2092 2093 static int send_flushmsg_ack(struct ceph_mds_client *mdsc, 2094 struct ceph_mds_session *session, u64 seq) 2095 { 2096 struct ceph_client *cl = mdsc->fsc->client; 2097 struct ceph_msg *msg; 2098 2099 doutc(cl, "to mds%d (%s)s seq %lld\n", session->s_mds, 2100 ceph_session_state_name(session->s_state), seq); 2101 msg = ceph_create_session_msg(CEPH_SESSION_FLUSHMSG_ACK, seq); 2102 if (!msg) 2103 return -ENOMEM; 2104 ceph_con_send(&session->s_con, msg); 2105 return 0; 2106 } 2107 2108 2109 /* 2110 * Note new cap ttl, and any transition from stale -> not stale (fresh?). 
2111 * 2112 * Called under session->s_mutex 2113 */ 2114 static void renewed_caps(struct ceph_mds_client *mdsc, 2115 struct ceph_mds_session *session, int is_renew) 2116 { 2117 struct ceph_client *cl = mdsc->fsc->client; 2118 int was_stale; 2119 int wake = 0; 2120 2121 spin_lock(&session->s_cap_lock); 2122 was_stale = is_renew && time_after_eq(jiffies, session->s_cap_ttl); 2123 2124 session->s_cap_ttl = session->s_renew_requested + 2125 mdsc->mdsmap->m_session_timeout*HZ; 2126 2127 if (was_stale) { 2128 if (time_before(jiffies, session->s_cap_ttl)) { 2129 pr_info_client(cl, "mds%d caps renewed\n", 2130 session->s_mds); 2131 wake = 1; 2132 } else { 2133 pr_info_client(cl, "mds%d caps still stale\n", 2134 session->s_mds); 2135 } 2136 } 2137 doutc(cl, "mds%d ttl now %lu, was %s, now %s\n", session->s_mds, 2138 session->s_cap_ttl, was_stale ? "stale" : "fresh", 2139 time_before(jiffies, session->s_cap_ttl) ? "stale" : "fresh"); 2140 spin_unlock(&session->s_cap_lock); 2141 2142 if (wake) 2143 wake_up_session_caps(session, RENEWCAPS); 2144 } 2145 2146 /* 2147 * send a session close request 2148 */ 2149 static int request_close_session(struct ceph_mds_session *session) 2150 { 2151 struct ceph_client *cl = session->s_mdsc->fsc->client; 2152 struct ceph_msg *msg; 2153 2154 doutc(cl, "mds%d state %s seq %lld\n", session->s_mds, 2155 ceph_session_state_name(session->s_state), session->s_seq); 2156 msg = ceph_create_session_msg(CEPH_SESSION_REQUEST_CLOSE, 2157 session->s_seq); 2158 if (!msg) 2159 return -ENOMEM; 2160 ceph_con_send(&session->s_con, msg); 2161 return 1; 2162 } 2163 2164 /* 2165 * Called with s_mutex held. 
2166 */ 2167 static int __close_session(struct ceph_mds_client *mdsc, 2168 struct ceph_mds_session *session) 2169 { 2170 if (session->s_state >= CEPH_MDS_SESSION_CLOSING) 2171 return 0; 2172 session->s_state = CEPH_MDS_SESSION_CLOSING; 2173 return request_close_session(session); 2174 } 2175 2176 static bool drop_negative_children(struct dentry *dentry) 2177 { 2178 struct dentry *child; 2179 bool all_negative = true; 2180 2181 if (!d_is_dir(dentry)) 2182 goto out; 2183 2184 spin_lock(&dentry->d_lock); 2185 hlist_for_each_entry(child, &dentry->d_children, d_sib) { 2186 if (d_really_is_positive(child)) { 2187 all_negative = false; 2188 break; 2189 } 2190 } 2191 spin_unlock(&dentry->d_lock); 2192 2193 if (all_negative) 2194 shrink_dcache_parent(dentry); 2195 out: 2196 return all_negative; 2197 } 2198 2199 /* 2200 * Trim old(er) caps. 2201 * 2202 * Because we can't cache an inode without one or more caps, we do 2203 * this indirectly: if a cap is unused, we prune its aliases, at which 2204 * point the inode will hopefully get dropped to. 2205 * 2206 * Yes, this is a bit sloppy. Our only real goal here is to respond to 2207 * memory pressure from the MDS, though, so it needn't be perfect. 
2208 */ 2209 static int trim_caps_cb(struct inode *inode, int mds, void *arg) 2210 { 2211 struct ceph_mds_client *mdsc = ceph_sb_to_mdsc(inode->i_sb); 2212 struct ceph_client *cl = mdsc->fsc->client; 2213 int *remaining = arg; 2214 struct ceph_inode_info *ci = ceph_inode(inode); 2215 int used, wanted, oissued, mine; 2216 struct ceph_cap *cap; 2217 2218 if (*remaining <= 0) 2219 return -1; 2220 2221 spin_lock(&ci->i_ceph_lock); 2222 cap = __get_cap_for_mds(ci, mds); 2223 if (!cap) { 2224 spin_unlock(&ci->i_ceph_lock); 2225 return 0; 2226 } 2227 mine = cap->issued | cap->implemented; 2228 used = __ceph_caps_used(ci); 2229 wanted = __ceph_caps_file_wanted(ci); 2230 oissued = __ceph_caps_issued_other(ci, cap); 2231 2232 doutc(cl, "%p %llx.%llx cap %p mine %s oissued %s used %s wanted %s\n", 2233 inode, ceph_vinop(inode), cap, ceph_cap_string(mine), 2234 ceph_cap_string(oissued), ceph_cap_string(used), 2235 ceph_cap_string(wanted)); 2236 if (cap == ci->i_auth_cap) { 2237 if (ci->i_dirty_caps || ci->i_flushing_caps || 2238 !list_empty(&ci->i_cap_snaps)) 2239 goto out; 2240 if ((used | wanted) & CEPH_CAP_ANY_WR) 2241 goto out; 2242 /* Note: it's possible that i_filelock_ref becomes non-zero 2243 * after dropping auth caps. It doesn't hurt because reply 2244 * of lock mds request will re-add auth caps. */ 2245 if (atomic_read(&ci->i_filelock_ref) > 0) 2246 goto out; 2247 } 2248 /* The inode has cached pages, but it's no longer used. 2249 * we can safely drop it */ 2250 if (S_ISREG(inode->i_mode) && 2251 wanted == 0 && used == CEPH_CAP_FILE_CACHE && 2252 !(oissued & CEPH_CAP_FILE_CACHE)) { 2253 used = 0; 2254 oissued = 0; 2255 } 2256 if ((used | wanted) & ~oissued & mine) 2257 goto out; /* we need these caps */ 2258 2259 if (oissued) { 2260 /* we aren't the only cap.. 
just remove us */ 2261 ceph_remove_cap(mdsc, cap, true); 2262 (*remaining)--; 2263 } else { 2264 struct dentry *dentry; 2265 /* try dropping referring dentries */ 2266 spin_unlock(&ci->i_ceph_lock); 2267 dentry = d_find_any_alias(inode); 2268 if (dentry && drop_negative_children(dentry)) { 2269 int count; 2270 dput(dentry); 2271 d_prune_aliases(inode); 2272 count = icount_read_once(inode); 2273 if (count == 1) 2274 (*remaining)--; 2275 doutc(cl, "%p %llx.%llx cap %p pruned, count now %d\n", 2276 inode, ceph_vinop(inode), cap, count); 2277 } else { 2278 dput(dentry); 2279 } 2280 return 0; 2281 } 2282 2283 out: 2284 spin_unlock(&ci->i_ceph_lock); 2285 return 0; 2286 } 2287 2288 /* 2289 * Trim session cap count down to some max number. 2290 */ 2291 int ceph_trim_caps(struct ceph_mds_client *mdsc, 2292 struct ceph_mds_session *session, 2293 int max_caps) 2294 { 2295 struct ceph_client *cl = mdsc->fsc->client; 2296 int trim_caps = session->s_nr_caps - max_caps; 2297 2298 doutc(cl, "mds%d start: %d / %d, trim %d\n", session->s_mds, 2299 session->s_nr_caps, max_caps, trim_caps); 2300 if (trim_caps > 0) { 2301 int remaining = trim_caps; 2302 2303 ceph_iterate_session_caps(session, trim_caps_cb, &remaining); 2304 doutc(cl, "mds%d done: %d / %d, trimmed %d\n", 2305 session->s_mds, session->s_nr_caps, max_caps, 2306 trim_caps - remaining); 2307 } 2308 2309 ceph_flush_session_cap_releases(mdsc, session); 2310 return 0; 2311 } 2312 2313 static int check_caps_flush(struct ceph_mds_client *mdsc, 2314 u64 want_flush_tid) 2315 { 2316 struct ceph_client *cl = mdsc->fsc->client; 2317 int ret = 1; 2318 2319 spin_lock(&mdsc->cap_dirty_lock); 2320 if (!list_empty(&mdsc->cap_flush_list)) { 2321 struct ceph_cap_flush *cf = 2322 list_first_entry(&mdsc->cap_flush_list, 2323 struct ceph_cap_flush, g_list); 2324 if (cf->tid <= want_flush_tid) { 2325 doutc(cl, "still flushing tid %llu <= %llu\n", 2326 cf->tid, want_flush_tid); 2327 ret = 0; 2328 } 2329 } 2330 
spin_unlock(&mdsc->cap_dirty_lock); 2331 return ret; 2332 } 2333 2334 /* 2335 * Snapshot of a single cap_flush entry for diagnostic dump. 2336 * Collected under cap_dirty_lock, printed after releasing it. 2337 */ 2338 struct flush_dump_entry { 2339 u64 ino; /* inode number */ 2340 u64 snap; /* snap id */ 2341 int caps; /* dirty cap bits */ 2342 u64 tid; /* flush transaction id */ 2343 u64 last_ack; /* most recent ack tid for this inode */ 2344 bool wake; /* whether completion was requested */ 2345 bool is_capsnap; /* true if this is a cap snap flush */ 2346 bool ci_null; /* true if cf->ci was unexpectedly NULL */ 2347 }; 2348 2349 /* 2350 * Dump pending cap flushes for diagnostic purposes. 2351 * 2352 * cf->ci is safe to dereference here: cap_flush entries hold a 2353 * reference on the inode (via the cap), and entries are removed from 2354 * cap_flush_list under cap_dirty_lock before the cap (and thus the 2355 * inode reference) is released. Holding cap_dirty_lock therefore 2356 * guarantees the inode remains valid for the lifetime of the scan. 
2357 */ 2358 2359 static void dump_cap_flushes(struct ceph_mds_client *mdsc, u64 want_tid) 2360 { 2361 struct ceph_client *cl = mdsc->fsc->client; 2362 struct flush_dump_entry entries[CEPH_CAP_FLUSH_MAX_DUMP_ENTRIES]; 2363 struct ceph_cap_flush *cf; 2364 int n = 0, remaining = 0; 2365 int i; 2366 2367 spin_lock(&mdsc->cap_dirty_lock); 2368 list_for_each_entry(cf, &mdsc->cap_flush_list, g_list) { 2369 if (cf->tid > want_tid) 2370 break; 2371 if (n < CEPH_CAP_FLUSH_MAX_DUMP_ENTRIES) { 2372 struct flush_dump_entry *e = &entries[n++]; 2373 2374 e->ci_null = WARN_ON_ONCE(!cf->ci); 2375 if (!e->ci_null) { 2376 e->ino = ceph_ino(&cf->ci->netfs.inode); 2377 e->snap = ceph_snap(&cf->ci->netfs.inode); 2378 e->last_ack = READ_ONCE(cf->ci->i_last_cap_flush_ack); 2379 } 2380 e->caps = cf->caps; 2381 e->tid = cf->tid; 2382 e->wake = cf->wake; 2383 e->is_capsnap = cf->is_capsnap; 2384 } else { 2385 remaining++; 2386 } 2387 } 2388 spin_unlock(&mdsc->cap_dirty_lock); 2389 2390 pr_info_client(cl, "still waiting for cap flushes through %llu:\n", 2391 want_tid); 2392 for (i = 0; i < n; i++) { 2393 struct flush_dump_entry *e = &entries[i]; 2394 2395 if (e->ci_null) 2396 pr_info_client(cl, 2397 " (null ci) %s tid=%llu wake=%d%s\n", 2398 ceph_cap_string(e->caps), e->tid, 2399 e->wake, 2400 e->is_capsnap ? " is_capsnap" : ""); 2401 else 2402 pr_info_client(cl, 2403 " %llx.%llx %s tid=%llu last_ack=%llu wake=%d%s\n", 2404 e->ino, e->snap, 2405 ceph_cap_string(e->caps), e->tid, 2406 e->last_ack, e->wake, 2407 e->is_capsnap ? " is_capsnap" : ""); 2408 } 2409 if (remaining) 2410 pr_info_client(cl, " ... and %d more pending flushes\n", 2411 remaining); 2412 } 2413 2414 /* 2415 * Wait for all cap flushes through @want_flush_tid to complete. 2416 * Periodically dumps pending cap flush state for diagnostics. 
2417 */ 2418 static void wait_caps_flush(struct ceph_mds_client *mdsc, 2419 u64 want_flush_tid) 2420 { 2421 struct ceph_client *cl = mdsc->fsc->client; 2422 int i = 0; 2423 long ret; 2424 2425 doutc(cl, "want %llu\n", want_flush_tid); 2426 2427 do { 2428 /* 60 * HZ fits in a long on all supported architectures. */ 2429 ret = wait_event_timeout(mdsc->cap_flushing_wq, 2430 check_caps_flush(mdsc, want_flush_tid), 2431 CEPH_CAP_FLUSH_WAIT_TIMEOUT_SEC * HZ); 2432 if (ret == 0) { 2433 if (i < CEPH_CAP_FLUSH_MAX_DUMP_ITERS) 2434 dump_cap_flushes(mdsc, want_flush_tid); 2435 else if (i == CEPH_CAP_FLUSH_MAX_DUMP_ITERS) 2436 pr_info_client(cl, 2437 "still waiting for cap flushes; suppressing further dumps\n"); 2438 i++; 2439 } 2440 } while (ret == 0); 2441 2442 doutc(cl, "ok, flushed thru %llu\n", want_flush_tid); 2443 } 2444 2445 /* 2446 * called under s_mutex 2447 */ 2448 static void ceph_send_cap_releases(struct ceph_mds_client *mdsc, 2449 struct ceph_mds_session *session) 2450 { 2451 struct ceph_client *cl = mdsc->fsc->client; 2452 struct ceph_msg *msg = NULL; 2453 struct ceph_mds_cap_release *head; 2454 struct ceph_mds_cap_item *item; 2455 struct ceph_osd_client *osdc = &mdsc->fsc->client->osdc; 2456 struct ceph_cap *cap; 2457 LIST_HEAD(tmp_list); 2458 int num_cap_releases; 2459 __le32 barrier, *cap_barrier; 2460 2461 down_read(&osdc->lock); 2462 barrier = cpu_to_le32(osdc->epoch_barrier); 2463 up_read(&osdc->lock); 2464 2465 spin_lock(&session->s_cap_lock); 2466 again: 2467 list_splice_init(&session->s_cap_releases, &tmp_list); 2468 num_cap_releases = session->s_num_cap_releases; 2469 session->s_num_cap_releases = 0; 2470 spin_unlock(&session->s_cap_lock); 2471 2472 while (!list_empty(&tmp_list)) { 2473 if (!msg) { 2474 msg = ceph_msg_new(CEPH_MSG_CLIENT_CAPRELEASE, 2475 PAGE_SIZE, GFP_NOFS, false); 2476 if (!msg) 2477 goto out_err; 2478 head = msg->front.iov_base; 2479 head->num = cpu_to_le32(0); 2480 msg->front.iov_len = sizeof(*head); 2481 2482 msg->hdr.version = 
cpu_to_le16(2); 2483 msg->hdr.compat_version = cpu_to_le16(1); 2484 } 2485 2486 cap = list_first_entry(&tmp_list, struct ceph_cap, 2487 session_caps); 2488 list_del(&cap->session_caps); 2489 num_cap_releases--; 2490 2491 head = msg->front.iov_base; 2492 put_unaligned_le32(get_unaligned_le32(&head->num) + 1, 2493 &head->num); 2494 item = msg->front.iov_base + msg->front.iov_len; 2495 item->ino = cpu_to_le64(cap->cap_ino); 2496 item->cap_id = cpu_to_le64(cap->cap_id); 2497 item->migrate_seq = cpu_to_le32(cap->mseq); 2498 item->issue_seq = cpu_to_le32(cap->issue_seq); 2499 msg->front.iov_len += sizeof(*item); 2500 2501 ceph_put_cap(mdsc, cap); 2502 2503 if (le32_to_cpu(head->num) == CEPH_CAPS_PER_RELEASE) { 2504 // Append cap_barrier field 2505 cap_barrier = msg->front.iov_base + msg->front.iov_len; 2506 *cap_barrier = barrier; 2507 msg->front.iov_len += sizeof(*cap_barrier); 2508 2509 msg->hdr.front_len = cpu_to_le32(msg->front.iov_len); 2510 doutc(cl, "mds%d %p\n", session->s_mds, msg); 2511 ceph_con_send(&session->s_con, msg); 2512 msg = NULL; 2513 } 2514 } 2515 2516 BUG_ON(num_cap_releases != 0); 2517 2518 spin_lock(&session->s_cap_lock); 2519 if (!list_empty(&session->s_cap_releases)) 2520 goto again; 2521 spin_unlock(&session->s_cap_lock); 2522 2523 if (msg) { 2524 // Append cap_barrier field 2525 cap_barrier = msg->front.iov_base + msg->front.iov_len; 2526 *cap_barrier = barrier; 2527 msg->front.iov_len += sizeof(*cap_barrier); 2528 2529 msg->hdr.front_len = cpu_to_le32(msg->front.iov_len); 2530 doutc(cl, "mds%d %p\n", session->s_mds, msg); 2531 ceph_con_send(&session->s_con, msg); 2532 } 2533 return; 2534 out_err: 2535 pr_err_client(cl, "mds%d, failed to allocate message\n", 2536 session->s_mds); 2537 spin_lock(&session->s_cap_lock); 2538 list_splice(&tmp_list, &session->s_cap_releases); 2539 session->s_num_cap_releases += num_cap_releases; 2540 spin_unlock(&session->s_cap_lock); 2541 } 2542 2543 static void ceph_cap_release_work(struct work_struct *work) 2544 
{ 2545 struct ceph_mds_session *session = 2546 container_of(work, struct ceph_mds_session, s_cap_release_work); 2547 2548 mutex_lock(&session->s_mutex); 2549 if (session->s_state == CEPH_MDS_SESSION_OPEN || 2550 session->s_state == CEPH_MDS_SESSION_HUNG) 2551 ceph_send_cap_releases(session->s_mdsc, session); 2552 mutex_unlock(&session->s_mutex); 2553 ceph_put_mds_session(session); 2554 } 2555 2556 void ceph_flush_session_cap_releases(struct ceph_mds_client *mdsc, 2557 struct ceph_mds_session *session) 2558 { 2559 struct ceph_client *cl = mdsc->fsc->client; 2560 if (mdsc->stopping) 2561 return; 2562 2563 ceph_get_mds_session(session); 2564 if (queue_work(mdsc->fsc->cap_wq, 2565 &session->s_cap_release_work)) { 2566 doutc(cl, "cap release work queued\n"); 2567 } else { 2568 ceph_put_mds_session(session); 2569 doutc(cl, "failed to queue cap release work\n"); 2570 } 2571 } 2572 2573 /* 2574 * caller holds session->s_cap_lock 2575 */ 2576 void __ceph_queue_cap_release(struct ceph_mds_session *session, 2577 struct ceph_cap *cap) 2578 { 2579 list_add_tail(&cap->session_caps, &session->s_cap_releases); 2580 session->s_num_cap_releases++; 2581 2582 if (!(session->s_num_cap_releases % CEPH_CAPS_PER_RELEASE)) 2583 ceph_flush_session_cap_releases(session->s_mdsc, session); 2584 } 2585 2586 static void ceph_cap_reclaim_work(struct work_struct *work) 2587 { 2588 struct ceph_mds_client *mdsc = 2589 container_of(work, struct ceph_mds_client, cap_reclaim_work); 2590 int ret = ceph_trim_dentries(mdsc); 2591 if (ret == -EAGAIN) 2592 ceph_queue_cap_reclaim_work(mdsc); 2593 } 2594 2595 void ceph_queue_cap_reclaim_work(struct ceph_mds_client *mdsc) 2596 { 2597 struct ceph_client *cl = mdsc->fsc->client; 2598 if (mdsc->stopping) 2599 return; 2600 2601 if (queue_work(mdsc->fsc->cap_wq, &mdsc->cap_reclaim_work)) { 2602 doutc(cl, "caps reclaim work queued\n"); 2603 } else { 2604 doutc(cl, "failed to queue caps release work\n"); 2605 } 2606 } 2607 2608 void ceph_reclaim_caps_nr(struct 
ceph_mds_client *mdsc, int nr) 2609 { 2610 int val; 2611 if (!nr) 2612 return; 2613 val = atomic_add_return(nr, &mdsc->cap_reclaim_pending); 2614 if ((val % CEPH_CAPS_PER_RELEASE) < nr) { 2615 atomic_set(&mdsc->cap_reclaim_pending, 0); 2616 ceph_queue_cap_reclaim_work(mdsc); 2617 } 2618 } 2619 2620 void ceph_queue_cap_unlink_work(struct ceph_mds_client *mdsc) 2621 { 2622 struct ceph_client *cl = mdsc->fsc->client; 2623 if (mdsc->stopping) 2624 return; 2625 2626 if (queue_work(mdsc->fsc->cap_wq, &mdsc->cap_unlink_work)) { 2627 doutc(cl, "caps unlink work queued\n"); 2628 } else { 2629 doutc(cl, "failed to queue caps unlink work\n"); 2630 } 2631 } 2632 2633 static void ceph_cap_unlink_work(struct work_struct *work) 2634 { 2635 struct ceph_mds_client *mdsc = 2636 container_of(work, struct ceph_mds_client, cap_unlink_work); 2637 struct ceph_client *cl = mdsc->fsc->client; 2638 2639 doutc(cl, "begin\n"); 2640 spin_lock(&mdsc->cap_delay_lock); 2641 while (!list_empty(&mdsc->cap_unlink_delay_list)) { 2642 struct ceph_inode_info *ci; 2643 struct inode *inode; 2644 2645 ci = list_first_entry(&mdsc->cap_unlink_delay_list, 2646 struct ceph_inode_info, 2647 i_cap_delay_list); 2648 list_del_init(&ci->i_cap_delay_list); 2649 2650 inode = igrab(&ci->netfs.inode); 2651 if (inode) { 2652 spin_unlock(&mdsc->cap_delay_lock); 2653 doutc(cl, "on %p %llx.%llx\n", inode, 2654 ceph_vinop(inode)); 2655 ceph_check_caps(ci, CHECK_CAPS_FLUSH); 2656 iput(inode); 2657 spin_lock(&mdsc->cap_delay_lock); 2658 } 2659 } 2660 spin_unlock(&mdsc->cap_delay_lock); 2661 doutc(cl, "done\n"); 2662 } 2663 2664 /* 2665 * requests 2666 */ 2667 2668 int ceph_alloc_readdir_reply_buffer(struct ceph_mds_request *req, 2669 struct inode *dir) 2670 { 2671 struct ceph_inode_info *ci = ceph_inode(dir); 2672 struct ceph_mds_reply_info_parsed *rinfo = &req->r_reply_info; 2673 struct ceph_mount_options *opt = req->r_mdsc->fsc->mount_options; 2674 size_t size = sizeof(struct ceph_mds_reply_dir_entry); 2675 unsigned int 
num_entries; 2676 u64 bytes_count; 2677 int order; 2678 2679 spin_lock(&ci->i_ceph_lock); 2680 num_entries = ci->i_files + ci->i_subdirs; 2681 spin_unlock(&ci->i_ceph_lock); 2682 num_entries = max(num_entries, 1U); 2683 num_entries = min(num_entries, opt->max_readdir); 2684 2685 bytes_count = (u64)size * num_entries; 2686 if (unlikely(bytes_count > ULONG_MAX)) 2687 bytes_count = ULONG_MAX; 2688 2689 order = get_order((unsigned long)bytes_count); 2690 while (order >= 0) { 2691 rinfo->dir_entries = (void*)__get_free_pages(GFP_KERNEL | 2692 __GFP_NOWARN | 2693 __GFP_ZERO, 2694 order); 2695 if (rinfo->dir_entries) 2696 break; 2697 order--; 2698 } 2699 if (!rinfo->dir_entries || unlikely(order < 0)) 2700 return -ENOMEM; 2701 2702 num_entries = (PAGE_SIZE << order) / size; 2703 num_entries = min(num_entries, opt->max_readdir); 2704 2705 rinfo->dir_buf_size = PAGE_SIZE << order; 2706 req->r_num_caps = num_entries + 1; 2707 req->r_args.readdir.max_entries = cpu_to_le32(num_entries); 2708 req->r_args.readdir.max_bytes = cpu_to_le32(opt->max_readdir_bytes); 2709 return 0; 2710 } 2711 2712 /* 2713 * Create an mds request. 
2714 */ 2715 struct ceph_mds_request * 2716 ceph_mdsc_create_request(struct ceph_mds_client *mdsc, int op, int mode) 2717 { 2718 struct ceph_mds_request *req; 2719 2720 req = kmem_cache_zalloc(ceph_mds_request_cachep, GFP_NOFS); 2721 if (!req) 2722 return ERR_PTR(-ENOMEM); 2723 2724 mutex_init(&req->r_fill_mutex); 2725 req->r_mdsc = mdsc; 2726 req->r_started = jiffies; 2727 req->r_start_latency = ktime_get(); 2728 req->r_resend_mds = -1; 2729 INIT_LIST_HEAD(&req->r_unsafe_dir_item); 2730 INIT_LIST_HEAD(&req->r_unsafe_target_item); 2731 req->r_fmode = -1; 2732 req->r_feature_needed = -1; 2733 kref_init(&req->r_kref); 2734 RB_CLEAR_NODE(&req->r_node); 2735 INIT_LIST_HEAD(&req->r_wait); 2736 init_completion(&req->r_completion); 2737 init_completion(&req->r_safe_completion); 2738 INIT_LIST_HEAD(&req->r_unsafe_item); 2739 2740 ktime_get_coarse_real_ts64(&req->r_stamp); 2741 2742 req->r_op = op; 2743 req->r_direct_mode = mode; 2744 return req; 2745 } 2746 2747 /* 2748 * return oldest (lowest) request, tid in request tree, 0 if none. 2749 * 2750 * called under mdsc->mutex. 
2751 */ 2752 static struct ceph_mds_request *__get_oldest_req(struct ceph_mds_client *mdsc) 2753 { 2754 if (RB_EMPTY_ROOT(&mdsc->request_tree)) 2755 return NULL; 2756 return rb_entry(rb_first(&mdsc->request_tree), 2757 struct ceph_mds_request, r_node); 2758 } 2759 2760 static inline u64 __get_oldest_tid(struct ceph_mds_client *mdsc) 2761 { 2762 return mdsc->oldest_tid; 2763 } 2764 2765 #if IS_ENABLED(CONFIG_FS_ENCRYPTION) 2766 static u8 *get_fscrypt_altname(const struct ceph_mds_request *req, u32 *plen) 2767 { 2768 struct inode *dir = req->r_parent; 2769 struct dentry *dentry = req->r_dentry; 2770 const struct qstr *name = req->r_dname; 2771 u8 *cryptbuf = NULL; 2772 u32 len = 0; 2773 int ret = 0; 2774 2775 /* only encode if we have parent and dentry */ 2776 if (!dir || !dentry) 2777 goto success; 2778 2779 /* No-op unless this is encrypted */ 2780 if (!IS_ENCRYPTED(dir)) 2781 goto success; 2782 2783 ret = ceph_fscrypt_prepare_readdir(dir); 2784 if (ret < 0) 2785 return ERR_PTR(ret); 2786 2787 /* No key? Just ignore it. 
*/ 2788 if (!fscrypt_has_encryption_key(dir)) 2789 goto success; 2790 2791 if (!name) 2792 name = &dentry->d_name; 2793 2794 if (!fscrypt_fname_encrypted_size(dir, name->len, NAME_MAX, &len)) { 2795 WARN_ON_ONCE(1); 2796 return ERR_PTR(-ENAMETOOLONG); 2797 } 2798 2799 /* No need to append altname if name is short enough */ 2800 if (len <= CEPH_NOHASH_NAME_MAX) { 2801 len = 0; 2802 goto success; 2803 } 2804 2805 cryptbuf = kmalloc(len, GFP_KERNEL); 2806 if (!cryptbuf) 2807 return ERR_PTR(-ENOMEM); 2808 2809 ret = fscrypt_fname_encrypt(dir, name, cryptbuf, len); 2810 if (ret) { 2811 kfree(cryptbuf); 2812 return ERR_PTR(ret); 2813 } 2814 success: 2815 *plen = len; 2816 return cryptbuf; 2817 } 2818 #else 2819 static u8 *get_fscrypt_altname(const struct ceph_mds_request *req, u32 *plen) 2820 { 2821 *plen = 0; 2822 return NULL; 2823 } 2824 #endif 2825 2826 /** 2827 * ceph_mdsc_build_path - build a path string to a given dentry 2828 * @mdsc: mds client 2829 * @dentry: dentry to which path should be built 2830 * @path_info: output path, length, base ino+snap, and freepath ownership flag 2831 * @for_wire: is this path going to be sent to the MDS? 2832 * 2833 * Build a string that represents the path to the dentry. This is mostly called 2834 * for two different purposes: 2835 * 2836 * 1) we need to build a path string to send to the MDS (for_wire == true) 2837 * 2) we need a path string for local presentation (e.g. debugfs) 2838 * (for_wire == false) 2839 * 2840 * The path is built in reverse, starting with the dentry. Walk back up toward 2841 * the root, building the path until the first non-snapped inode is reached 2842 * (for_wire) or the root inode is reached (!for_wire). 2843 * 2844 * Encode hidden .snap dirs as a double /, i.e. 
 *   foo/.snap/bar -> foo//bar
 *
 * Return: pointer to the start of the path, which lives inside a buffer
 * obtained from __getname(), or an ERR_PTR(): -EINVAL for a NULL @dentry,
 * -ENOMEM if the buffer cannot be allocated, -ENAMETOOLONG if the path does
 * not fit, or the error reported by the fscrypt helpers.  On success
 * @path_info is filled in with freepath set to true.
 */
char *ceph_mdsc_build_path(struct ceph_mds_client *mdsc, struct dentry *dentry,
			   struct ceph_path_info *path_info, int for_wire)
{
	struct ceph_client *cl = mdsc->fsc->client;
	struct dentry *cur;
	struct inode *inode;
	char *path;
	int pos;
	unsigned seq;
	u64 base;

	if (!dentry)
		return ERR_PTR(-EINVAL);

	path = __getname();
	if (!path)
		return ERR_PTR(-ENOMEM);
retry:
	/* the string is assembled backwards from the end of the buffer */
	pos = PATH_MAX - 1;
	path[pos] = '\0';

	/* sampled here and re-checked after the walk; see read_seqretry() */
	seq = read_seqbegin(&rename_lock);
	cur = dget(dentry);
	for (;;) {
		struct dentry *parent;

		spin_lock(&cur->d_lock);
		inode = d_inode(cur);
		if (inode && ceph_snap(inode) == CEPH_SNAPDIR) {
			/* snapdir: no name is copied for this component */
			doutc(cl, "path+%d: %p SNAPDIR\n", pos, cur);
			spin_unlock(&cur->d_lock);
			parent = dget_parent(cur);
		} else if (for_wire && inode && dentry != cur &&
			   ceph_snap(inode) == CEPH_NOSNAP) {
			/* for the wire, stop at the first non-snapped ancestor */
			spin_unlock(&cur->d_lock);
			pos++; /* get rid of any prepended '/' */
			break;
		} else if (!for_wire || !IS_ENCRYPTED(d_inode(cur->d_parent))) {
			/* name is copied as-is */
			pos -= cur->d_name.len;
			if (pos < 0) {
				spin_unlock(&cur->d_lock);
				break;
			}
			memcpy(path + pos, cur->d_name.name, cur->d_name.len);
			spin_unlock(&cur->d_lock);
			parent = dget_parent(cur);
		} else {
			/* for the wire, under an encrypted parent */
			int len, ret;
			char buf[NAME_MAX];

			/*
			 * Proactively copy name into buf, in case we need to
			 * present it as-is.
			 */
			memcpy(buf, cur->d_name.name, cur->d_name.len);
			len = cur->d_name.len;
			spin_unlock(&cur->d_lock);
			parent = dget_parent(cur);

			ret = ceph_fscrypt_prepare_readdir(d_inode(parent));
			if (ret < 0) {
				dput(parent);
				dput(cur);
				__putname(path);
				return ERR_PTR(ret);
			}

			/* without a key, buf keeps the name copied above */
			if (fscrypt_has_encryption_key(d_inode(parent))) {
				len = ceph_encode_encrypted_dname(d_inode(parent),
								  buf, len);
				if (len < 0) {
					dput(parent);
					dput(cur);
					__putname(path);
					return ERR_PTR(len);
				}
			}
			pos -= len;
			if (pos < 0) {
				/* cur is still referenced; it is put after the loop */
				dput(parent);
				break;
			}
			memcpy(path + pos, buf, len);
		}
		dput(cur);
		cur = parent;

		/* Are we at the root? */
		if (IS_ROOT(cur))
			break;

		/* Are we out of buffer? */
		if (--pos < 0)
			break;

		path[pos] = '/';
	}
	/* the dentry we stopped at supplies the base inode number (0 if negative) */
	inode = d_inode(cur);
	base = inode ? ceph_ino(inode) : 0;
	dput(cur);

	/* start over if rename_lock changed while we were walking */
	if (read_seqretry(&rename_lock, seq))
		goto retry;

	if (pos < 0) {
		/*
		 * The path is longer than PATH_MAX and this function
		 * cannot ever succeed.  Creating paths that long is
		 * possible with Ceph, but Linux cannot use them.
		 */
		__putname(path);
		return ERR_PTR(-ENAMETOOLONG);
	}

	/* Initialize the output structure */
	memset(path_info, 0, sizeof(*path_info));

	path_info->vino.ino = base;
	path_info->pathlen = PATH_MAX - 1 - pos;
	path_info->path = path + pos;
	path_info->freepath = true;

	/* Set snap from dentry if available */
	if (d_inode(dentry))
		path_info->vino.snap = ceph_snap(d_inode(dentry));
	else
		path_info->vino.snap = CEPH_NOSNAP;

	doutc(cl, "on %p %d built %llx '%.*s'\n", dentry, d_count(dentry),
	      base, PATH_MAX - 1 - pos, path + pos);
	return path + pos;
}

/*
 * Fill @path_info for a request that names its target by dentry.
 *
 * If @dir is NULL the dentry's parent inode is used.  When the parent is
 * locked (@parent_locked), is not a snapshot and is not encrypted, the
 * dentry's own name is used directly relative to @dir and nothing is
 * allocated (freepath == false).  Otherwise a full path is built with
 * ceph_mdsc_build_path().
 *
 * Returns 0 on success or the negative error from ceph_mdsc_build_path().
 */
static int build_dentry_path(struct ceph_mds_client *mdsc, struct dentry *dentry,
			     struct inode *dir, struct ceph_path_info *path_info,
			     bool parent_locked)
{
	char *path;

	rcu_read_lock();
	if (!dir)
		dir = d_inode_rcu(dentry->d_parent);
	if (dir && parent_locked && ceph_snap(dir) == CEPH_NOSNAP &&
	    !IS_ENCRYPTED(dir)) {
		path_info->vino.ino = ceph_ino(dir);
		path_info->vino.snap = ceph_snap(dir);
		rcu_read_unlock();
		path_info->path = dentry->d_name.name;
		path_info->pathlen = dentry->d_name.len;
		path_info->freepath = false;
		return 0;
	}
	rcu_read_unlock();
	path = ceph_mdsc_build_path(mdsc, dentry, path_info, 1);
	if (IS_ERR(path))
		return PTR_ERR(path);
	/*
	 * ceph_mdsc_build_path already fills path_info, including snap handling.
	 */
	return 0;
}

/*
 * Fill @path_info for a request that names its target by inode.
 *
 * A non-snapshot inode is identified by its vino alone (empty path, nothing
 * to free).  Otherwise a path is built from one of the inode's aliases, as
 * returned by d_find_alias().
 *
 * Returns 0 on success or the negative error from ceph_mdsc_build_path().
 */
static int build_inode_path(struct inode *inode, struct ceph_path_info *path_info)
{
	struct ceph_mds_client *mdsc = ceph_sb_to_mdsc(inode->i_sb);
	struct dentry *dentry;
	char *path;

	if (ceph_snap(inode) == CEPH_NOSNAP) {
		path_info->vino.ino = ceph_ino(inode);
		path_info->vino.snap = ceph_snap(inode);
		path_info->pathlen = 0;
		path_info->freepath = false;
		return 0;
	}
	/* a NULL alias makes ceph_mdsc_build_path() return -EINVAL */
	dentry = d_find_alias(inode);
	path = ceph_mdsc_build_path(mdsc, dentry, path_info, 1);
	dput(dentry);
	if (IS_ERR(path))
		return PTR_ERR(path);
	/*
	 * ceph_mdsc_build_path already fills path_info, including snap from dentry.
	 * Override with inode's snap since that's what this function is for.
	 */
	path_info->vino.snap = ceph_snap(inode);
	return 0;
}

/*
 * request arguments may be specified via an inode *, a dentry *, or
 * an explicit ino+path.
 *
 * They are tried in that order; the first one present is used to fill
 * @path_info, which is zeroed first and stays zeroed if none is given.
 * Returns 0 or the negative error from the inode/dentry path builders.
 */
static int set_request_path_attr(struct ceph_mds_client *mdsc, struct inode *rinode,
				 struct dentry *rdentry, struct inode *rdiri,
				 const char *rpath, u64 rino,
				 struct ceph_path_info *path_info,
				 bool parent_locked)
{
	struct ceph_client *cl = mdsc->fsc->client;
	int r = 0;

	/* Initialize the output structure */
	memset(path_info, 0, sizeof(*path_info));

	if (rinode) {
		r = build_inode_path(rinode, path_info);
		doutc(cl, " inode %p %llx.%llx\n", rinode, ceph_ino(rinode),
		      ceph_snap(rinode));
	} else if (rdentry) {
		r = build_dentry_path(mdsc, rdentry, rdiri, path_info, parent_locked);
		doutc(cl, " dentry %p %llx/%.*s\n", rdentry, path_info->vino.ino,
		      path_info->pathlen, path_info->path);
	} else if (rpath || rino) {
		/* caller-supplied path: referenced, not copied, so not freed */
		path_info->vino.ino = rino;
		path_info->vino.snap = CEPH_NOSNAP;
		path_info->path = rpath;
		path_info->pathlen = rpath ? strlen(rpath) : 0;
		path_info->freepath = false;

		doutc(cl, " path %.*s\n", path_info->pathlen, rpath);
	}

	return r;
}

/*
 * Encode the trailing part of an MClientRequest at *@p, advancing *@p:
 * the request timestamp, the gid list, the alternate name, and the
 * fscrypt_auth and fscrypt_file blobs (each length-prefixed, with a zero
 * length when absent).
 *
 * No bounds checking is done here; the caller must have sized the
 * buffer accordingly.
 */
static void encode_mclientrequest_tail(void **p,
				       const struct ceph_mds_request *req)
{
	struct ceph_timespec ts;
	int i;

	ceph_encode_timespec64(&ts, &req->r_stamp);
	ceph_encode_copy(p, &ts, sizeof(ts));

	/* v4: gid_list */
	ceph_encode_32(p, req->r_cred->group_info->ngroups);
	for (i = 0; i < req->r_cred->group_info->ngroups; i++)
		ceph_encode_64(p, from_kgid(&init_user_ns,
					    req->r_cred->group_info->gid[i]));

	/* v5: altname */
	ceph_encode_32(p, req->r_altname_len);
	ceph_encode_copy(p, req->r_altname, req->r_altname_len);

	/* v6: fscrypt_auth and fscrypt_file */
	if (req->r_fscrypt_auth) {
		u32 authlen = ceph_fscrypt_auth_len(req->r_fscrypt_auth);

		ceph_encode_32(p, authlen);
		ceph_encode_copy(p, req->r_fscrypt_auth, authlen);
	} else {
		ceph_encode_32(p, 0);
	}
	if (test_bit(CEPH_MDS_R_FSCRYPT_FILE, &req->r_req_flags)) {
		ceph_encode_32(p, sizeof(__le64));
		ceph_encode_64(p, req->r_fscrypt_file);
	} else {
		ceph_encode_32(p, 0);
	}
}

/*
 * Pick the request head version to use for @session from its feature
 * bits: 1 without 32BITS_RETRY_FWD, 2 without HAS_OWNER_UIDGID, and
 * CEPH_MDS_REQUEST_HEAD_VERSION otherwise.
 */
static inline u16 mds_supported_head_version(struct ceph_mds_session *session)
{
	if (!test_bit(CEPHFS_FEATURE_32BITS_RETRY_FWD, &session->s_features))
		return 1;

	if (!test_bit(CEPHFS_FEATURE_HAS_OWNER_UIDGID, &session->s_features))
		return 2;

	return CEPH_MDS_REQUEST_HEAD_VERSION;
}

/*
 * Return a legacy-head view of the request head at @p.
 *
 * Without CEPH_FEATURE_FS_BTIME in @features the buffer starts with the
 * legacy head itself; otherwise the legacy view starts at the
 * oldest_client_tid member of the new-style head.
 */
static struct ceph_mds_request_head_legacy *
find_legacy_request_head(void *p, u64 features)
{
	bool legacy = !(features & CEPH_FEATURE_FS_BTIME);
	struct ceph_mds_request_head *head;

	if (legacy)
		return (struct ceph_mds_request_head_legacy *)p;
	head = (struct ceph_mds_request_head *)p;
	return (struct ceph_mds_request_head_legacy *)&head->oldest_client_tid;
}

/*
 * Build the CEPH_MSG_CLIENT_REQUEST message for @req, to be sent on
 * @session.
 *
 * The head layout depends on the peer's features (legacy head, or head
 * version 1, 2 or CEPH_MDS_REQUEST_HEAD_VERSION).  It is followed by the
 * two file paths, the cap releases and the MClientRequest tail.  When
 * @drop_cap_releases is true the release records are still encoded but
 * then discarded: the write position is rewound to the release offset and
 * num_releases is set to 0.
 *
 * Returns the message or an ERR_PTR().  The path_info structures built
 * along the way are released before returning in either case.
 *
 * called under mdsc->mutex
 */
static struct ceph_msg *create_request_message(struct ceph_mds_session *session,
					       struct ceph_mds_request *req,
					       bool drop_cap_releases)
{
	int mds = session->s_mds;
	struct ceph_mds_client *mdsc = session->s_mdsc;
	struct ceph_client *cl = mdsc->fsc->client;
	struct ceph_msg *msg;
	struct ceph_mds_request_head_legacy *lhead;
	struct ceph_path_info path_info1 = {0};
	struct ceph_path_info path_info2 = {0};
	struct dentry *old_dentry = NULL;
	int len;
	u16 releases;
	void *p, *end;
	int ret;
	bool legacy = !(session->s_con.peer_features & CEPH_FEATURE_FS_BTIME);
	u16 request_head_version = mds_supported_head_version(session);
	kuid_t caller_fsuid = req->r_cred->fsuid;
	kgid_t caller_fsgid = req->r_cred->fsgid;
	bool parent_locked = test_bit(CEPH_MDS_R_PARENT_LOCKED, &req->r_req_flags);

	ret = set_request_path_attr(mdsc, req->r_inode, req->r_dentry,
				    req->r_parent, req->r_path1, req->r_ino1.ino,
				    &path_info1, parent_locked);
	if (ret < 0) {
		msg = ERR_PTR(ret);
		goto out;
	}

	/*
	 * When the parent directory's i_rwsem is *not* locked, req->r_parent may
	 * have become stale (e.g. after a concurrent rename) between the time the
	 * dentry was looked up and now.  If we detect that the stored r_parent
	 * does not match the inode number we just encoded for the request, switch
	 * to the correct inode so that the MDS receives a valid parent reference.
	 */
	if (!parent_locked && req->r_parent && path_info1.vino.ino &&
	    ceph_ino(req->r_parent) != path_info1.vino.ino) {
		struct inode *old_parent = req->r_parent;
		struct inode *correct_dir = ceph_get_inode(mdsc->fsc->sb, path_info1.vino, NULL);
		/* if the lookup fails, r_parent is left unchanged */
		if (!IS_ERR(correct_dir)) {
			WARN_ONCE(1, "ceph: r_parent mismatch (had %llx wanted %llx) - updating\n",
				  ceph_ino(old_parent), path_info1.vino.ino);
			/*
			 * Transfer CEPH_CAP_PIN from the old parent to the new one.
			 * The pin was taken earlier in ceph_mdsc_submit_request().
			 */
			ceph_put_cap_refs(ceph_inode(old_parent), CEPH_CAP_PIN);
			iput(old_parent);
			req->r_parent = correct_dir;
			ceph_get_cap_refs(ceph_inode(req->r_parent), CEPH_CAP_PIN);
		}
	}

	/* If r_old_dentry is set, then assume that its parent is locked */
	if (req->r_old_dentry &&
	    !(req->r_old_dentry->d_flags & DCACHE_DISCONNECTED))
		old_dentry = req->r_old_dentry;
	ret = set_request_path_attr(mdsc, NULL, old_dentry,
				    req->r_old_dentry_dir,
				    req->r_path2, req->r_ino2.ino,
				    &path_info2, true);
	if (ret < 0) {
		msg = ERR_PTR(ret);
		goto out_free1;
	}

	req->r_altname = get_fscrypt_altname(req, &req->r_altname_len);
	if (IS_ERR(req->r_altname)) {
		msg = ERR_CAST(req->r_altname);
		/* don't leave an error pointer behind in the request */
		req->r_altname = NULL;
		goto out_free2;
	}

	/*
	 * For old cephs without supporting the 32bit retry/fwd feature
	 * it will copy the raw memories directly when decoding the
	 * requests. While new cephs will decode the head depending the
	 * version member, so we need to make sure it will be compatible
	 * with them both.
	 */
	if (legacy)
		len = sizeof(struct ceph_mds_request_head_legacy);
	else if (request_head_version == 1)
		len = offsetofend(struct ceph_mds_request_head, args);
	else if (request_head_version == 2)
		len = offsetofend(struct ceph_mds_request_head, ext_num_fwd);
	else
		len = sizeof(struct ceph_mds_request_head);

	/* filepaths */
	len += 2 * (1 + sizeof(u32) + sizeof(u64));
	len += path_info1.pathlen + path_info2.pathlen;

	/* cap releases */
	len += sizeof(struct ceph_mds_request_release) *
		(!!req->r_inode_drop + !!req->r_dentry_drop +
		 !!req->r_old_inode_drop + !!req->r_old_dentry_drop);

	if (req->r_dentry_drop)
		len += path_info1.pathlen;
	if (req->r_old_dentry_drop)
		len += path_info2.pathlen;

	/* MClientRequest tail */

	/* req->r_stamp */
	len += sizeof(struct ceph_timespec);

	/* gid list */
	len += sizeof(u32) + (sizeof(u64) * req->r_cred->group_info->ngroups);

	/* alternate name */
	len += sizeof(u32) + req->r_altname_len;

	/* fscrypt_auth */
	len += sizeof(u32); // fscrypt_auth
	if (req->r_fscrypt_auth)
		len += ceph_fscrypt_auth_len(req->r_fscrypt_auth);

	/* fscrypt_file */
	len += sizeof(u32);
	if (test_bit(CEPH_MDS_R_FSCRYPT_FILE, &req->r_req_flags))
		len += sizeof(__le64);

	msg = ceph_msg_new2(CEPH_MSG_CLIENT_REQUEST, len, 1, GFP_NOFS, false);
	if (!msg) {
		msg = ERR_PTR(-ENOMEM);
		goto out_free2;
	}

	msg->hdr.tid = cpu_to_le64(req->r_tid);

	lhead = find_legacy_request_head(msg->front.iov_base,
					 session->s_con.peer_features);

	/*
	 * Idmapped mount, but the MDS lacks HAS_OWNER_UIDGID: either send
	 * the mapped ids as the caller ids (enable_unsafe_idmap) or fail
	 * the request with -EIO.
	 */
	if ((req->r_mnt_idmap != &nop_mnt_idmap) &&
	    !test_bit(CEPHFS_FEATURE_HAS_OWNER_UIDGID, &session->s_features)) {
		WARN_ON_ONCE(!IS_CEPH_MDS_OP_NEWINODE(req->r_op));

		if (enable_unsafe_idmap) {
			pr_warn_once_client(cl,
				"idmapped mount is used and CEPHFS_FEATURE_HAS_OWNER_UIDGID"
				" is not supported by MDS. UID/GID-based restrictions may"
				" not work properly.\n");

			caller_fsuid = from_vfsuid(req->r_mnt_idmap, &init_user_ns,
						   VFSUIDT_INIT(req->r_cred->fsuid));
			caller_fsgid = from_vfsgid(req->r_mnt_idmap, &init_user_ns,
						   VFSGIDT_INIT(req->r_cred->fsgid));
		} else {
			pr_err_ratelimited_client(cl,
				"idmapped mount is used and CEPHFS_FEATURE_HAS_OWNER_UIDGID"
				" is not supported by MDS. Fail request with -EIO.\n");

			ret = -EIO;
			goto out_err;
		}
	}

	/*
	 * The ceph_mds_request_head_legacy didn't contain a version field, and
	 * one was added when we moved the message version from 3->4.
	 */
	if (legacy) {
		msg->hdr.version = cpu_to_le16(3);
		p = msg->front.iov_base + sizeof(*lhead);
	} else if (request_head_version == 1) {
		struct ceph_mds_request_head *nhead = msg->front.iov_base;

		msg->hdr.version = cpu_to_le16(4);
		nhead->version = cpu_to_le16(1);
		p = msg->front.iov_base + offsetofend(struct ceph_mds_request_head, args);
	} else if (request_head_version == 2) {
		struct ceph_mds_request_head *nhead = msg->front.iov_base;

		msg->hdr.version = cpu_to_le16(6);
		nhead->version = cpu_to_le16(2);

		p = msg->front.iov_base + offsetofend(struct ceph_mds_request_head, ext_num_fwd);
	} else {
		struct ceph_mds_request_head *nhead = msg->front.iov_base;
		kuid_t owner_fsuid;
		kgid_t owner_fsgid;

		msg->hdr.version = cpu_to_le16(6);
		nhead->version = cpu_to_le16(CEPH_MDS_REQUEST_HEAD_VERSION);
		nhead->struct_len = cpu_to_le32(sizeof(struct ceph_mds_request_head));

		/* owner ids are only sent for inode-creating ops, -1 otherwise */
		if (IS_CEPH_MDS_OP_NEWINODE(req->r_op)) {
			owner_fsuid = from_vfsuid(req->r_mnt_idmap, &init_user_ns,
						  VFSUIDT_INIT(req->r_cred->fsuid));
			owner_fsgid = from_vfsgid(req->r_mnt_idmap, &init_user_ns,
						  VFSGIDT_INIT(req->r_cred->fsgid));
			nhead->owner_uid = cpu_to_le32(from_kuid(&init_user_ns, owner_fsuid));
			nhead->owner_gid = cpu_to_le32(from_kgid(&init_user_ns, owner_fsgid));
		} else {
			nhead->owner_uid = cpu_to_le32(-1);
			nhead->owner_gid = cpu_to_le32(-1);
		}

		p = msg->front.iov_base + sizeof(*nhead);
	}

	end = msg->front.iov_base + msg->front.iov_len;

	/* fields common to every head layout, written through the legacy view */
	lhead->mdsmap_epoch = cpu_to_le32(mdsc->mdsmap->m_epoch);
	lhead->op = cpu_to_le32(req->r_op);
	lhead->caller_uid = cpu_to_le32(from_kuid(&init_user_ns,
						  caller_fsuid));
	lhead->caller_gid = cpu_to_le32(from_kgid(&init_user_ns,
						  caller_fsgid));
	lhead->ino = cpu_to_le64(req->r_deleg_ino);
	lhead->args = req->r_args;

	ceph_encode_filepath(&p, end, path_info1.vino.ino, path_info1.path);
	ceph_encode_filepath(&p, end, path_info2.vino.ino, path_info2.path);

	/* make note of release offset, in case we need to replay */
	req->r_request_release_offset = p - msg->front.iov_base;

	/* cap releases */
	releases = 0;
	if (req->r_inode_drop)
		releases += ceph_encode_inode_release(&p,
		      req->r_inode ? req->r_inode : d_inode(req->r_dentry),
		      mds, req->r_inode_drop, req->r_inode_unless,
		      req->r_op == CEPH_MDS_OP_READDIR);
	if (req->r_dentry_drop) {
		ret = ceph_encode_dentry_release(&p, req->r_dentry,
				req->r_parent, mds, req->r_dentry_drop,
				req->r_dentry_unless);
		if (ret < 0)
			goto out_err;
		releases += ret;
	}
	if (req->r_old_dentry_drop) {
		ret = ceph_encode_dentry_release(&p, req->r_old_dentry,
				req->r_old_dentry_dir, mds,
				req->r_old_dentry_drop,
				req->r_old_dentry_unless);
		if (ret < 0)
			goto out_err;
		releases += ret;
	}
	if (req->r_old_inode_drop)
		releases += ceph_encode_inode_release(&p,
		      d_inode(req->r_old_dentry),
		      mds, req->r_old_inode_drop, req->r_old_inode_unless, 0);

	/* discard whatever was encoded above by rewinding to the release offset */
	if (drop_cap_releases) {
		releases = 0;
		p = msg->front.iov_base + req->r_request_release_offset;
	}

	lhead->num_releases = cpu_to_le16(releases);

	encode_mclientrequest_tail(&p, req);

	/* the length computed above must have covered everything encoded */
	if (WARN_ON_ONCE(p > end)) {
		ceph_msg_put(msg);
		msg = ERR_PTR(-ERANGE);
		goto out_free2;
	}

	/* trim the front to what was actually encoded */
	msg->front.iov_len = p - msg->front.iov_base;
	msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);

	if (req->r_pagelist) {
		struct ceph_pagelist *pagelist = req->r_pagelist;
		ceph_msg_data_add_pagelist(msg, pagelist);
		msg->hdr.data_len = cpu_to_le32(pagelist->length);
	} else {
		msg->hdr.data_len = 0;
	}

	msg->hdr.data_off = cpu_to_le16(0);

	/* the success path falls through the cleanup labels as well */
out_free2:
	ceph_mdsc_free_path_info(&path_info2);
out_free1:
	ceph_mdsc_free_path_info(&path_info1);
out:
	return msg;
out_err:
	ceph_msg_put(msg);
	msg = ERR_PTR(ret);
	goto out_free2;
}

/*
 * called under mdsc->mutex if error, under no mutex if
 * success.
3428 */ 3429 static void complete_request(struct ceph_mds_client *mdsc, 3430 struct ceph_mds_request *req) 3431 { 3432 req->r_end_latency = ktime_get(); 3433 3434 trace_ceph_mdsc_complete_request(mdsc, req); 3435 3436 if (req->r_callback) 3437 req->r_callback(mdsc, req); 3438 complete_all(&req->r_completion); 3439 } 3440 3441 /* 3442 * called under mdsc->mutex 3443 */ 3444 static int __prepare_send_request(struct ceph_mds_session *session, 3445 struct ceph_mds_request *req, 3446 bool drop_cap_releases) 3447 { 3448 int mds = session->s_mds; 3449 struct ceph_mds_client *mdsc = session->s_mdsc; 3450 struct ceph_client *cl = mdsc->fsc->client; 3451 struct ceph_mds_request_head_legacy *lhead; 3452 struct ceph_mds_request_head *nhead; 3453 struct ceph_msg *msg; 3454 int flags = 0, old_max_retry; 3455 bool old_version = !test_bit(CEPHFS_FEATURE_32BITS_RETRY_FWD, 3456 &session->s_features); 3457 3458 /* 3459 * Avoid infinite retrying after overflow. The client will 3460 * increase the retry count and if the MDS is old version, 3461 * so we limit to retry at most 256 times. 3462 */ 3463 if (req->r_attempts) { 3464 old_max_retry = sizeof_field(struct ceph_mds_request_head, 3465 num_retry); 3466 old_max_retry = 1 << (old_max_retry * BITS_PER_BYTE); 3467 if ((old_version && req->r_attempts >= old_max_retry) || 3468 ((uint32_t)req->r_attempts >= U32_MAX)) { 3469 pr_warn_ratelimited_client(cl, "request tid %llu seq overflow\n", 3470 req->r_tid); 3471 return -EMULTIHOP; 3472 } 3473 } 3474 3475 req->r_attempts++; 3476 if (req->r_inode) { 3477 struct ceph_cap *cap = 3478 ceph_get_cap_for_mds(ceph_inode(req->r_inode), mds); 3479 3480 if (cap) 3481 req->r_sent_on_mseq = cap->mseq; 3482 else 3483 req->r_sent_on_mseq = -1; 3484 } 3485 doutc(cl, "%p tid %lld %s (attempt %d)\n", req, req->r_tid, 3486 ceph_mds_op_name(req->r_op), req->r_attempts); 3487 3488 if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) { 3489 void *p; 3490 3491 /* 3492 * Replay. 
Do not regenerate message (and rebuild 3493 * paths, etc.); just use the original message. 3494 * Rebuilding paths will break for renames because 3495 * d_move mangles the src name. 3496 */ 3497 msg = req->r_request; 3498 lhead = find_legacy_request_head(msg->front.iov_base, 3499 session->s_con.peer_features); 3500 3501 flags = le32_to_cpu(lhead->flags); 3502 flags |= CEPH_MDS_FLAG_REPLAY; 3503 lhead->flags = cpu_to_le32(flags); 3504 3505 if (req->r_target_inode) 3506 lhead->ino = cpu_to_le64(ceph_ino(req->r_target_inode)); 3507 3508 lhead->num_retry = req->r_attempts - 1; 3509 if (!old_version) { 3510 nhead = (struct ceph_mds_request_head*)msg->front.iov_base; 3511 nhead->ext_num_retry = cpu_to_le32(req->r_attempts - 1); 3512 } 3513 3514 /* remove cap/dentry releases from message */ 3515 lhead->num_releases = 0; 3516 3517 p = msg->front.iov_base + req->r_request_release_offset; 3518 encode_mclientrequest_tail(&p, req); 3519 3520 msg->front.iov_len = p - msg->front.iov_base; 3521 msg->hdr.front_len = cpu_to_le32(msg->front.iov_len); 3522 return 0; 3523 } 3524 3525 if (req->r_request) { 3526 ceph_msg_put(req->r_request); 3527 req->r_request = NULL; 3528 } 3529 msg = create_request_message(session, req, drop_cap_releases); 3530 if (IS_ERR(msg)) { 3531 req->r_err = PTR_ERR(msg); 3532 return PTR_ERR(msg); 3533 } 3534 req->r_request = msg; 3535 3536 lhead = find_legacy_request_head(msg->front.iov_base, 3537 session->s_con.peer_features); 3538 lhead->oldest_client_tid = cpu_to_le64(__get_oldest_tid(mdsc)); 3539 if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) 3540 flags |= CEPH_MDS_FLAG_REPLAY; 3541 if (test_bit(CEPH_MDS_R_ASYNC, &req->r_req_flags)) 3542 flags |= CEPH_MDS_FLAG_ASYNC; 3543 if (req->r_parent) 3544 flags |= CEPH_MDS_FLAG_WANT_DENTRY; 3545 lhead->flags = cpu_to_le32(flags); 3546 lhead->num_fwd = req->r_num_fwd; 3547 lhead->num_retry = req->r_attempts - 1; 3548 if (!old_version) { 3549 nhead = (struct ceph_mds_request_head*)msg->front.iov_base; 3550 
nhead->ext_num_fwd = cpu_to_le32(req->r_num_fwd); 3551 nhead->ext_num_retry = cpu_to_le32(req->r_attempts - 1); 3552 } 3553 3554 doutc(cl, " r_parent = %p\n", req->r_parent); 3555 return 0; 3556 } 3557 3558 /* 3559 * called under mdsc->mutex 3560 */ 3561 static int __send_request(struct ceph_mds_session *session, 3562 struct ceph_mds_request *req, 3563 bool drop_cap_releases) 3564 { 3565 int err; 3566 3567 trace_ceph_mdsc_send_request(session, req); 3568 3569 err = __prepare_send_request(session, req, drop_cap_releases); 3570 if (!err) { 3571 ceph_msg_get(req->r_request); 3572 ceph_con_send(&session->s_con, req->r_request); 3573 } 3574 3575 return err; 3576 } 3577 3578 /* 3579 * send request, or put it on the appropriate wait list. 3580 */ 3581 static void __do_request(struct ceph_mds_client *mdsc, 3582 struct ceph_mds_request *req) 3583 { 3584 struct ceph_client *cl = mdsc->fsc->client; 3585 struct ceph_mds_session *session = NULL; 3586 int mds = -1; 3587 int err = 0; 3588 bool random; 3589 3590 if (req->r_err || test_bit(CEPH_MDS_R_GOT_RESULT, &req->r_req_flags)) { 3591 if (test_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags)) 3592 __unregister_request(mdsc, req); 3593 return; 3594 } 3595 3596 if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_FENCE_IO) { 3597 doutc(cl, "metadata corrupted\n"); 3598 err = -EIO; 3599 goto finish; 3600 } 3601 if (req->r_timeout && 3602 time_after_eq(jiffies, req->r_started + req->r_timeout)) { 3603 doutc(cl, "timed out\n"); 3604 err = -ETIMEDOUT; 3605 goto finish; 3606 } 3607 if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_SHUTDOWN) { 3608 doutc(cl, "forced umount\n"); 3609 err = -EIO; 3610 goto finish; 3611 } 3612 if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_MOUNTING) { 3613 if (mdsc->mdsmap_err) { 3614 err = mdsc->mdsmap_err; 3615 doutc(cl, "mdsmap err %d\n", err); 3616 goto finish; 3617 } 3618 if (mdsc->mdsmap->m_epoch == 0) { 3619 doutc(cl, "no mdsmap, waiting for map\n"); 3620 trace_ceph_mdsc_suspend_request(mdsc, 
session, req, 3621 ceph_mdsc_suspend_reason_no_mdsmap); 3622 list_add(&req->r_wait, &mdsc->waiting_for_map); 3623 return; 3624 } 3625 if (!(mdsc->fsc->mount_options->flags & 3626 CEPH_MOUNT_OPT_MOUNTWAIT) && 3627 !ceph_mdsmap_is_cluster_available(mdsc->mdsmap)) { 3628 err = -EHOSTUNREACH; 3629 goto finish; 3630 } 3631 } 3632 3633 put_request_session(req); 3634 3635 mds = __choose_mds(mdsc, req, &random); 3636 if (mds < 0 || 3637 ceph_mdsmap_get_state(mdsc->mdsmap, mds) < CEPH_MDS_STATE_ACTIVE) { 3638 if (test_bit(CEPH_MDS_R_ASYNC, &req->r_req_flags)) { 3639 err = -EJUKEBOX; 3640 goto finish; 3641 } 3642 doutc(cl, "no mds or not active, waiting for map\n"); 3643 trace_ceph_mdsc_suspend_request(mdsc, session, req, 3644 ceph_mdsc_suspend_reason_no_active_mds); 3645 list_add(&req->r_wait, &mdsc->waiting_for_map); 3646 return; 3647 } 3648 3649 /* get, open session */ 3650 session = __ceph_lookup_mds_session(mdsc, mds); 3651 if (!session) { 3652 session = register_session(mdsc, mds); 3653 if (IS_ERR(session)) { 3654 err = PTR_ERR(session); 3655 goto finish; 3656 } 3657 } 3658 req->r_session = ceph_get_mds_session(session); 3659 3660 doutc(cl, "mds%d session %p state %s\n", mds, session, 3661 ceph_session_state_name(session->s_state)); 3662 3663 /* 3664 * The old ceph will crash the MDSs when see unknown OPs 3665 */ 3666 if (req->r_feature_needed > 0 && 3667 !test_bit(req->r_feature_needed, &session->s_features)) { 3668 err = -EOPNOTSUPP; 3669 goto out_session; 3670 } 3671 3672 if (session->s_state != CEPH_MDS_SESSION_OPEN && 3673 session->s_state != CEPH_MDS_SESSION_HUNG) { 3674 /* 3675 * We cannot queue async requests since the caps and delegated 3676 * inodes are bound to the session. Just return -EJUKEBOX and 3677 * let the caller retry a sync request in that case. 
3678 */ 3679 if (test_bit(CEPH_MDS_R_ASYNC, &req->r_req_flags)) { 3680 err = -EJUKEBOX; 3681 goto out_session; 3682 } 3683 3684 /* 3685 * If the session has been REJECTED, then return a hard error, 3686 * unless it's a CLEANRECOVER mount, in which case we'll queue 3687 * it to the mdsc queue. 3688 */ 3689 if (session->s_state == CEPH_MDS_SESSION_REJECTED) { 3690 if (ceph_test_mount_opt(mdsc->fsc, CLEANRECOVER)) { 3691 trace_ceph_mdsc_suspend_request(mdsc, session, req, 3692 ceph_mdsc_suspend_reason_rejected); 3693 list_add(&req->r_wait, &mdsc->waiting_for_map); 3694 } else 3695 err = -EACCES; 3696 goto out_session; 3697 } 3698 3699 if (session->s_state == CEPH_MDS_SESSION_NEW || 3700 session->s_state == CEPH_MDS_SESSION_CLOSING) { 3701 err = __open_session(mdsc, session); 3702 if (err) 3703 goto out_session; 3704 /* retry the same mds later */ 3705 if (random) 3706 req->r_resend_mds = mds; 3707 } 3708 trace_ceph_mdsc_suspend_request(mdsc, session, req, 3709 ceph_mdsc_suspend_reason_session); 3710 list_add(&req->r_wait, &session->s_waiting); 3711 goto out_session; 3712 } 3713 3714 /* send request */ 3715 req->r_resend_mds = -1; /* forget any previous mds hint */ 3716 3717 if (req->r_request_started == 0) /* note request start time */ 3718 req->r_request_started = jiffies; 3719 3720 /* 3721 * For async create we will choose the auth MDS of frag in parent 3722 * directory to send the request and usually this works fine, but 3723 * if the migrated the dirtory to another MDS before it could handle 3724 * it the request will be forwarded. 3725 * 3726 * And then the auth cap will be changed. 3727 */ 3728 if (test_bit(CEPH_MDS_R_ASYNC, &req->r_req_flags) && req->r_num_fwd) { 3729 struct ceph_dentry_info *di = ceph_dentry(req->r_dentry); 3730 struct ceph_inode_info *ci; 3731 struct ceph_cap *cap; 3732 3733 /* 3734 * The request maybe handled very fast and the new inode 3735 * hasn't been linked to the dentry yet. 
We need to wait 3736 * for the ceph_finish_async_create(), which shouldn't be 3737 * stuck too long or fail in thoery, to finish when forwarding 3738 * the request. 3739 */ 3740 if (!d_inode(req->r_dentry)) { 3741 err = wait_on_bit(&di->flags, CEPH_DENTRY_ASYNC_CREATE_BIT, 3742 TASK_KILLABLE); 3743 if (err) { 3744 mutex_lock(&req->r_fill_mutex); 3745 set_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags); 3746 mutex_unlock(&req->r_fill_mutex); 3747 goto out_session; 3748 } 3749 } 3750 3751 ci = ceph_inode(d_inode(req->r_dentry)); 3752 3753 spin_lock(&ci->i_ceph_lock); 3754 cap = ci->i_auth_cap; 3755 if (test_bit(CEPH_I_ASYNC_CREATE_BIT, &ci->i_ceph_flags) && 3756 mds != cap->mds) { 3757 doutc(cl, "session changed for auth cap %d -> %d\n", 3758 cap->session->s_mds, session->s_mds); 3759 3760 /* Remove the auth cap from old session */ 3761 spin_lock(&cap->session->s_cap_lock); 3762 cap->session->s_nr_caps--; 3763 list_del_init(&cap->session_caps); 3764 spin_unlock(&cap->session->s_cap_lock); 3765 3766 /* Add the auth cap to the new session */ 3767 cap->mds = mds; 3768 cap->session = session; 3769 spin_lock(&session->s_cap_lock); 3770 session->s_nr_caps++; 3771 list_add_tail(&cap->session_caps, &session->s_caps); 3772 spin_unlock(&session->s_cap_lock); 3773 3774 change_auth_cap_ses(ci, session); 3775 } 3776 spin_unlock(&ci->i_ceph_lock); 3777 } 3778 3779 err = __send_request(session, req, false); 3780 3781 out_session: 3782 ceph_put_mds_session(session); 3783 finish: 3784 if (err) { 3785 doutc(cl, "early error %d\n", err); 3786 req->r_err = err; 3787 complete_request(mdsc, req); 3788 __unregister_request(mdsc, req); 3789 } 3790 return; 3791 } 3792 3793 /* 3794 * called under mdsc->mutex 3795 */ 3796 static void __wake_requests(struct ceph_mds_client *mdsc, 3797 struct list_head *head) 3798 { 3799 struct ceph_client *cl = mdsc->fsc->client; 3800 struct ceph_mds_request *req; 3801 LIST_HEAD(tmp_list); 3802 3803 list_splice_init(head, &tmp_list); 3804 3805 while 
(!list_empty(&tmp_list)) { 3806 req = list_entry(tmp_list.next, 3807 struct ceph_mds_request, r_wait); 3808 list_del_init(&req->r_wait); 3809 doutc(cl, " wake request %p tid %llu\n", req, 3810 req->r_tid); 3811 trace_ceph_mdsc_resume_request(mdsc, req); 3812 __do_request(mdsc, req); 3813 } 3814 } 3815 3816 /* 3817 * Wake up threads with requests pending for @mds, so that they can 3818 * resubmit their requests to a possibly different mds. 3819 */ 3820 static void kick_requests(struct ceph_mds_client *mdsc, int mds) 3821 { 3822 struct ceph_client *cl = mdsc->fsc->client; 3823 struct ceph_mds_request *req; 3824 struct rb_node *p = rb_first(&mdsc->request_tree); 3825 3826 doutc(cl, "kick_requests mds%d\n", mds); 3827 while (p) { 3828 req = rb_entry(p, struct ceph_mds_request, r_node); 3829 p = rb_next(p); 3830 if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) 3831 continue; 3832 if (req->r_attempts > 0) 3833 continue; /* only new requests */ 3834 if (req->r_session && 3835 req->r_session->s_mds == mds) { 3836 doutc(cl, " kicking tid %llu\n", req->r_tid); 3837 list_del_init(&req->r_wait); 3838 trace_ceph_mdsc_resume_request(mdsc, req); 3839 __do_request(mdsc, req); 3840 } 3841 } 3842 } 3843 3844 int ceph_mdsc_submit_request(struct ceph_mds_client *mdsc, struct inode *dir, 3845 struct ceph_mds_request *req) 3846 { 3847 struct ceph_client *cl = mdsc->fsc->client; 3848 int err = 0; 3849 3850 /* 3851 * If a reset is in progress, wait for it to complete. 3852 * 3853 * This is best-effort: a request can pass this check just 3854 * before the phase leaves IDLE and proceed concurrently with 3855 * reset. That is acceptable because (a) such requests will 3856 * either complete normally or fail and be retried by the 3857 * caller, and (b) adding lock serialization here would 3858 * penalize every request for a rare manual operation. 
3859 */ 3860 err = ceph_mdsc_wait_for_reset(mdsc); 3861 if (err) { 3862 doutc(cl, "wait_for_reset failed: %d\n", err); 3863 return err; 3864 } 3865 3866 /* take CAP_PIN refs for r_inode, r_parent, r_old_dentry */ 3867 if (req->r_inode) 3868 ceph_get_cap_refs(ceph_inode(req->r_inode), CEPH_CAP_PIN); 3869 if (req->r_parent) { 3870 struct ceph_inode_info *ci = ceph_inode(req->r_parent); 3871 int fmode = (req->r_op & CEPH_MDS_OP_WRITE) ? 3872 CEPH_FILE_MODE_WR : CEPH_FILE_MODE_RD; 3873 spin_lock(&ci->i_ceph_lock); 3874 ceph_take_cap_refs(ci, CEPH_CAP_PIN, false); 3875 __ceph_touch_fmode(ci, mdsc, fmode); 3876 spin_unlock(&ci->i_ceph_lock); 3877 } 3878 if (req->r_old_dentry_dir) 3879 ceph_get_cap_refs(ceph_inode(req->r_old_dentry_dir), 3880 CEPH_CAP_PIN); 3881 3882 if (req->r_inode) { 3883 err = ceph_wait_on_async_create(req->r_inode); 3884 if (err) { 3885 doutc(cl, "wait for async create returned: %d\n", err); 3886 return err; 3887 } 3888 } 3889 3890 if (!err && req->r_old_inode) { 3891 err = ceph_wait_on_async_create(req->r_old_inode); 3892 if (err) { 3893 doutc(cl, "wait for async create returned: %d\n", err); 3894 return err; 3895 } 3896 } 3897 3898 doutc(cl, "submit_request on %p for inode %p\n", req, dir); 3899 mutex_lock(&mdsc->mutex); 3900 __register_request(mdsc, req, dir); 3901 trace_ceph_mdsc_submit_request(mdsc, req); 3902 __do_request(mdsc, req); 3903 err = req->r_err; 3904 mutex_unlock(&mdsc->mutex); 3905 return err; 3906 } 3907 3908 int ceph_mdsc_wait_request(struct ceph_mds_client *mdsc, 3909 struct ceph_mds_request *req, 3910 ceph_mds_request_wait_callback_t wait_func) 3911 { 3912 struct ceph_client *cl = mdsc->fsc->client; 3913 int err; 3914 3915 /* wait */ 3916 doutc(cl, "do_request waiting\n"); 3917 if (wait_func) { 3918 err = wait_func(mdsc, req); 3919 } else { 3920 long timeleft = wait_for_completion_killable_timeout( 3921 &req->r_completion, 3922 ceph_timeout_jiffies(req->r_timeout)); 3923 if (timeleft > 0) 3924 err = 0; 3925 else if (!timeleft) 
3926 err = -ETIMEDOUT; /* timed out */ 3927 else 3928 err = timeleft; /* killed */ 3929 } 3930 doutc(cl, "do_request waited, got %d\n", err); 3931 mutex_lock(&mdsc->mutex); 3932 3933 /* only abort if we didn't race with a real reply */ 3934 if (test_bit(CEPH_MDS_R_GOT_RESULT, &req->r_req_flags)) { 3935 err = le32_to_cpu(req->r_reply_info.head->result); 3936 } else if (err < 0) { 3937 doutc(cl, "aborted request %lld with %d\n", req->r_tid, err); 3938 3939 /* 3940 * ensure we aren't running concurrently with 3941 * ceph_fill_trace or ceph_readdir_prepopulate, which 3942 * rely on locks (dir mutex) held by our caller. 3943 */ 3944 mutex_lock(&req->r_fill_mutex); 3945 req->r_err = err; 3946 set_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags); 3947 mutex_unlock(&req->r_fill_mutex); 3948 3949 if (req->r_parent && 3950 (req->r_op & CEPH_MDS_OP_WRITE)) 3951 ceph_invalidate_dir_request(req); 3952 } else { 3953 err = req->r_err; 3954 } 3955 3956 mutex_unlock(&mdsc->mutex); 3957 return err; 3958 } 3959 3960 /* 3961 * Synchrously perform an mds request. Take care of all of the 3962 * session setup, forwarding, retry details. 3963 */ 3964 int ceph_mdsc_do_request(struct ceph_mds_client *mdsc, 3965 struct inode *dir, 3966 struct ceph_mds_request *req) 3967 { 3968 struct ceph_client *cl = mdsc->fsc->client; 3969 int err; 3970 3971 doutc(cl, "do_request on %p\n", req); 3972 3973 /* issue */ 3974 err = ceph_mdsc_submit_request(mdsc, dir, req); 3975 if (!err) 3976 err = ceph_mdsc_wait_request(mdsc, req, NULL); 3977 doutc(cl, "do_request %p done, result %d\n", req, err); 3978 return err; 3979 } 3980 3981 /* 3982 * Invalidate dir's completeness, dentry lease state on an aborted MDS 3983 * namespace request. 
3984 */ 3985 void ceph_invalidate_dir_request(struct ceph_mds_request *req) 3986 { 3987 struct inode *dir = req->r_parent; 3988 struct inode *old_dir = req->r_old_dentry_dir; 3989 struct ceph_client *cl = req->r_mdsc->fsc->client; 3990 3991 doutc(cl, "invalidate_dir_request %p %p (complete, lease(s))\n", 3992 dir, old_dir); 3993 3994 ceph_dir_clear_complete(dir); 3995 if (old_dir) 3996 ceph_dir_clear_complete(old_dir); 3997 if (req->r_dentry) 3998 ceph_invalidate_dentry_lease(req->r_dentry); 3999 if (req->r_old_dentry) 4000 ceph_invalidate_dentry_lease(req->r_old_dentry); 4001 } 4002 4003 /* 4004 * Handle mds reply. 4005 * 4006 * We take the session mutex and parse and process the reply immediately. 4007 * This preserves the logical ordering of replies, capabilities, etc., sent 4008 * by the MDS as they are applied to our local cache. 4009 */ 4010 static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg) 4011 { 4012 struct ceph_mds_client *mdsc = session->s_mdsc; 4013 struct ceph_client *cl = mdsc->fsc->client; 4014 struct ceph_mds_request *req; 4015 struct ceph_mds_reply_head *head = msg->front.iov_base; 4016 struct ceph_mds_reply_info_parsed *rinfo; /* parsed reply info */ 4017 struct ceph_snap_realm *realm; 4018 u64 tid; 4019 int err, result; 4020 int mds = session->s_mds; 4021 bool close_sessions = false; 4022 4023 if (msg->front.iov_len < sizeof(*head)) { 4024 pr_err_client(cl, "got corrupt (short) reply\n"); 4025 ceph_msg_dump(msg); 4026 return; 4027 } 4028 4029 /* get request, session */ 4030 tid = le64_to_cpu(msg->hdr.tid); 4031 mutex_lock(&mdsc->mutex); 4032 req = lookup_get_request(mdsc, tid); 4033 if (!req) { 4034 doutc(cl, "on unknown tid %llu\n", tid); 4035 mutex_unlock(&mdsc->mutex); 4036 return; 4037 } 4038 doutc(cl, "handle_reply %p\n", req); 4039 4040 /* correct session? */ 4041 if (req->r_session != session) { 4042 pr_err_client(cl, "got %llu on session mds%d not mds%d\n", 4043 tid, session->s_mds, 4044 req->r_session ? 
req->r_session->s_mds : -1); 4045 mutex_unlock(&mdsc->mutex); 4046 goto out; 4047 } 4048 4049 /* dup? */ 4050 if ((test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags) && !head->safe) || 4051 (test_bit(CEPH_MDS_R_GOT_SAFE, &req->r_req_flags) && head->safe)) { 4052 pr_warn_client(cl, "got a dup %s reply on %llu from mds%d\n", 4053 head->safe ? "safe" : "unsafe", tid, mds); 4054 mutex_unlock(&mdsc->mutex); 4055 goto out; 4056 } 4057 if (test_bit(CEPH_MDS_R_GOT_SAFE, &req->r_req_flags)) { 4058 pr_warn_client(cl, "got unsafe after safe on %llu from mds%d\n", 4059 tid, mds); 4060 mutex_unlock(&mdsc->mutex); 4061 goto out; 4062 } 4063 4064 result = le32_to_cpu(head->result); 4065 4066 if (head->safe) { 4067 set_bit(CEPH_MDS_R_GOT_SAFE, &req->r_req_flags); 4068 __unregister_request(mdsc, req); 4069 4070 /* last request during umount? */ 4071 if (mdsc->stopping && !__get_oldest_req(mdsc)) 4072 complete_all(&mdsc->safe_umount_waiters); 4073 4074 if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) { 4075 /* 4076 * We already handled the unsafe response, now do the 4077 * cleanup. No need to examine the response; the MDS 4078 * doesn't include any result info in the safe 4079 * response. And even if it did, there is nothing 4080 * useful we could do with a revised return value. 
4081 */ 4082 doutc(cl, "got safe reply %llu, mds%d\n", tid, mds); 4083 4084 mutex_unlock(&mdsc->mutex); 4085 goto out; 4086 } 4087 } else { 4088 set_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags); 4089 list_add_tail(&req->r_unsafe_item, &req->r_session->s_unsafe); 4090 } 4091 4092 doutc(cl, "tid %lld result %d\n", tid, result); 4093 if (test_bit(CEPHFS_FEATURE_REPLY_ENCODING, &session->s_features)) 4094 err = parse_reply_info(session, msg, req, (u64)-1); 4095 else 4096 err = parse_reply_info(session, msg, req, 4097 session->s_con.peer_features); 4098 mutex_unlock(&mdsc->mutex); 4099 4100 /* Must find target inode outside of mutexes to avoid deadlocks */ 4101 rinfo = &req->r_reply_info; 4102 if ((err >= 0) && rinfo->head->is_target) { 4103 struct inode *in = xchg(&req->r_new_inode, NULL); 4104 struct ceph_vino tvino = { 4105 .ino = le64_to_cpu(rinfo->targeti.in->ino), 4106 .snap = le64_to_cpu(rinfo->targeti.in->snapid) 4107 }; 4108 4109 /* 4110 * If we ended up opening an existing inode, discard 4111 * r_new_inode 4112 */ 4113 if (req->r_op == CEPH_MDS_OP_CREATE && 4114 !req->r_reply_info.has_create_ino) { 4115 /* This should never happen on an async create */ 4116 WARN_ON_ONCE(req->r_deleg_ino); 4117 iput(in); 4118 in = NULL; 4119 } 4120 4121 in = ceph_get_inode(mdsc->fsc->sb, tvino, in); 4122 if (IS_ERR(in)) { 4123 err = PTR_ERR(in); 4124 mutex_lock(&session->s_mutex); 4125 goto out_err; 4126 } 4127 req->r_target_inode = in; 4128 ceph_inode_set_subvolume(in, rinfo->targeti.subvolume_id); 4129 } 4130 4131 mutex_lock(&session->s_mutex); 4132 if (err < 0) { 4133 pr_err_client(cl, "got corrupt reply mds%d(tid:%lld)\n", 4134 mds, tid); 4135 ceph_msg_dump(msg); 4136 goto out_err; 4137 } 4138 4139 /* snap trace */ 4140 realm = NULL; 4141 if (rinfo->snapblob_len) { 4142 down_write(&mdsc->snap_rwsem); 4143 err = ceph_update_snap_trace(mdsc, rinfo->snapblob, 4144 rinfo->snapblob + rinfo->snapblob_len, 4145 le32_to_cpu(head->op) == CEPH_MDS_OP_RMSNAP, 4146 &realm); 4147 if 
(err) { 4148 up_write(&mdsc->snap_rwsem); 4149 close_sessions = true; 4150 if (err == -EIO) 4151 ceph_msg_dump(msg); 4152 goto out_err; 4153 } 4154 downgrade_write(&mdsc->snap_rwsem); 4155 } else { 4156 down_read(&mdsc->snap_rwsem); 4157 } 4158 4159 /* insert trace into our cache */ 4160 mutex_lock(&req->r_fill_mutex); 4161 current->journal_info = req; 4162 err = ceph_fill_trace(mdsc->fsc->sb, req); 4163 if (err == 0) { 4164 if (result == 0 && (req->r_op == CEPH_MDS_OP_READDIR || 4165 req->r_op == CEPH_MDS_OP_LSSNAP)) 4166 err = ceph_readdir_prepopulate(req, req->r_session); 4167 } 4168 current->journal_info = NULL; 4169 mutex_unlock(&req->r_fill_mutex); 4170 4171 up_read(&mdsc->snap_rwsem); 4172 if (realm) 4173 ceph_put_snap_realm(mdsc, realm); 4174 4175 if (err == 0) { 4176 if (req->r_target_inode && 4177 test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) { 4178 struct ceph_inode_info *ci = 4179 ceph_inode(req->r_target_inode); 4180 spin_lock(&ci->i_unsafe_lock); 4181 list_add_tail(&req->r_unsafe_target_item, 4182 &ci->i_unsafe_iops); 4183 spin_unlock(&ci->i_unsafe_lock); 4184 } 4185 4186 ceph_unreserve_caps(mdsc, &req->r_caps_reservation); 4187 } 4188 out_err: 4189 mutex_lock(&mdsc->mutex); 4190 if (!test_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags)) { 4191 if (err) { 4192 req->r_err = err; 4193 } else { 4194 req->r_reply = ceph_msg_get(msg); 4195 set_bit(CEPH_MDS_R_GOT_RESULT, &req->r_req_flags); 4196 } 4197 } else { 4198 doutc(cl, "reply arrived after request %lld was aborted\n", tid); 4199 } 4200 mutex_unlock(&mdsc->mutex); 4201 4202 mutex_unlock(&session->s_mutex); 4203 4204 /* kick calling process */ 4205 complete_request(mdsc, req); 4206 4207 ceph_update_metadata_metrics(&mdsc->metric, req->r_start_latency, 4208 req->r_end_latency, err); 4209 out: 4210 ceph_mdsc_put_request(req); 4211 4212 /* Defer closing the sessions after s_mutex lock being released */ 4213 if (close_sessions) 4214 ceph_mdsc_close_sessions(mdsc); 4215 return; 4216 } 4217 4218 4219 4220 
/* 4221 * handle mds notification that our request has been forwarded. 4222 */ 4223 static void handle_forward(struct ceph_mds_client *mdsc, 4224 struct ceph_mds_session *session, 4225 struct ceph_msg *msg) 4226 { 4227 struct ceph_client *cl = mdsc->fsc->client; 4228 struct ceph_mds_request *req; 4229 u64 tid = le64_to_cpu(msg->hdr.tid); 4230 u32 next_mds; 4231 u32 fwd_seq; 4232 int err = -EINVAL; 4233 void *p = msg->front.iov_base; 4234 void *end = p + msg->front.iov_len; 4235 bool aborted = false; 4236 4237 ceph_decode_need(&p, end, 2*sizeof(u32), bad); 4238 next_mds = ceph_decode_32(&p); 4239 fwd_seq = ceph_decode_32(&p); 4240 4241 mutex_lock(&mdsc->mutex); 4242 req = lookup_get_request(mdsc, tid); 4243 if (!req) { 4244 mutex_unlock(&mdsc->mutex); 4245 doutc(cl, "forward tid %llu to mds%d - req dne\n", tid, next_mds); 4246 return; /* dup reply? */ 4247 } 4248 4249 if (test_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags)) { 4250 doutc(cl, "forward tid %llu aborted, unregistering\n", tid); 4251 __unregister_request(mdsc, req); 4252 } else if (fwd_seq <= req->r_num_fwd || (uint32_t)fwd_seq >= U32_MAX) { 4253 /* 4254 * Avoid infinite retrying after overflow. 4255 * 4256 * The MDS will increase the fwd count and in client side 4257 * if the num_fwd is less than the one saved in request 4258 * that means the MDS is an old version and overflowed of 4259 * 8 bits. 4260 */ 4261 mutex_lock(&req->r_fill_mutex); 4262 req->r_err = -EMULTIHOP; 4263 set_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags); 4264 mutex_unlock(&req->r_fill_mutex); 4265 aborted = true; 4266 pr_warn_ratelimited_client(cl, "forward tid %llu seq overflow\n", 4267 tid); 4268 } else { 4269 /* resend. 
forward race not possible; mds would drop */ 4270 doutc(cl, "forward tid %llu to mds%d (we resend)\n", tid, next_mds); 4271 BUG_ON(req->r_err); 4272 BUG_ON(test_bit(CEPH_MDS_R_GOT_RESULT, &req->r_req_flags)); 4273 req->r_attempts = 0; 4274 req->r_num_fwd = fwd_seq; 4275 req->r_resend_mds = next_mds; 4276 put_request_session(req); 4277 __do_request(mdsc, req); 4278 } 4279 mutex_unlock(&mdsc->mutex); 4280 4281 /* kick calling process */ 4282 if (aborted) 4283 complete_request(mdsc, req); 4284 ceph_mdsc_put_request(req); 4285 return; 4286 4287 bad: 4288 pr_err_client(cl, "decode error err=%d\n", err); 4289 ceph_msg_dump(msg); 4290 } 4291 4292 static int __decode_session_metadata(void **p, void *end, 4293 bool *blocklisted) 4294 { 4295 /* map<string,string> */ 4296 u32 n; 4297 bool err_str; 4298 ceph_decode_32_safe(p, end, n, bad); 4299 while (n-- > 0) { 4300 u32 len; 4301 ceph_decode_32_safe(p, end, len, bad); 4302 ceph_decode_need(p, end, len, bad); 4303 err_str = !strncmp(*p, "error_string", len); 4304 *p += len; 4305 ceph_decode_32_safe(p, end, len, bad); 4306 ceph_decode_need(p, end, len, bad); 4307 /* 4308 * Match "blocklisted (blacklisted)" from newer MDSes, 4309 * or "blacklisted" from older MDSes. 
4310 */ 4311 if (err_str && strnstr(*p, "blacklisted", len)) 4312 *blocklisted = true; 4313 *p += len; 4314 } 4315 return 0; 4316 bad: 4317 return -1; 4318 } 4319 4320 /* 4321 * handle a mds session control message 4322 */ 4323 static void handle_session(struct ceph_mds_session *session, 4324 struct ceph_msg *msg) 4325 { 4326 struct ceph_mds_client *mdsc = session->s_mdsc; 4327 struct ceph_client *cl = mdsc->fsc->client; 4328 int mds = session->s_mds; 4329 int msg_version = le16_to_cpu(msg->hdr.version); 4330 void *p = msg->front.iov_base; 4331 void *end = p + msg->front.iov_len; 4332 struct ceph_mds_session_head *h; 4333 struct ceph_mds_cap_auth *cap_auths = NULL; 4334 u32 op, cap_auths_num = 0; 4335 u64 seq, features = 0; 4336 int wake = 0; 4337 bool blocklisted = false; 4338 u32 i; 4339 4340 4341 /* decode */ 4342 ceph_decode_need(&p, end, sizeof(*h), bad); 4343 h = p; 4344 p += sizeof(*h); 4345 4346 op = le32_to_cpu(h->op); 4347 seq = le64_to_cpu(h->seq); 4348 4349 if (msg_version >= 3) { 4350 u32 len; 4351 /* version >= 2 and < 5, decode metadata, skip otherwise 4352 * as it's handled via flags. 
4353 */ 4354 if (msg_version >= 5) 4355 ceph_decode_skip_map(&p, end, string, string, bad); 4356 else if (__decode_session_metadata(&p, end, &blocklisted) < 0) 4357 goto bad; 4358 4359 /* version >= 3, feature bits */ 4360 ceph_decode_32_safe(&p, end, len, bad); 4361 if (len) { 4362 ceph_decode_64_safe(&p, end, features, bad); 4363 p += len - sizeof(features); 4364 } 4365 } 4366 4367 if (msg_version >= 5) { 4368 u32 flags, len; 4369 4370 /* version >= 4 */ 4371 ceph_decode_skip_16(&p, end, bad); /* struct_v, struct_cv */ 4372 ceph_decode_32_safe(&p, end, len, bad); /* len */ 4373 ceph_decode_skip_n(&p, end, len, bad); /* metric_spec */ 4374 4375 /* version >= 5, flags */ 4376 ceph_decode_32_safe(&p, end, flags, bad); 4377 if (flags & CEPH_SESSION_BLOCKLISTED) { 4378 pr_warn_client(cl, "mds%d session blocklisted\n", 4379 session->s_mds); 4380 blocklisted = true; 4381 } 4382 } 4383 4384 if (msg_version >= 6) { 4385 ceph_decode_32_safe(&p, end, cap_auths_num, bad); 4386 doutc(cl, "cap_auths_num %d\n", cap_auths_num); 4387 4388 if (cap_auths_num && op != CEPH_SESSION_OPEN) { 4389 WARN_ON_ONCE(op != CEPH_SESSION_OPEN); 4390 goto skip_cap_auths; 4391 } 4392 4393 cap_auths = kzalloc_objs(struct ceph_mds_cap_auth, 4394 cap_auths_num); 4395 if (!cap_auths) { 4396 pr_err_client(cl, "No memory for cap_auths\n"); 4397 return; 4398 } 4399 4400 for (i = 0; i < cap_auths_num; i++) { 4401 u32 _len, j; 4402 4403 /* struct_v, struct_compat, and struct_len in MDSCapAuth */ 4404 ceph_decode_skip_n(&p, end, 2 + sizeof(u32), bad); 4405 4406 /* struct_v, struct_compat, and struct_len in MDSCapMatch */ 4407 ceph_decode_skip_n(&p, end, 2 + sizeof(u32), bad); 4408 ceph_decode_64_safe(&p, end, cap_auths[i].match.uid, bad); 4409 ceph_decode_32_safe(&p, end, _len, bad); 4410 if (_len) { 4411 cap_auths[i].match.gids = kcalloc(_len, sizeof(u32), 4412 GFP_KERNEL); 4413 if (!cap_auths[i].match.gids) { 4414 pr_err_client(cl, "No memory for gids\n"); 4415 goto fail; 4416 } 4417 4418 
cap_auths[i].match.num_gids = _len; 4419 for (j = 0; j < _len; j++) 4420 ceph_decode_32_safe(&p, end, 4421 cap_auths[i].match.gids[j], 4422 bad); 4423 } 4424 4425 ceph_decode_32_safe(&p, end, _len, bad); 4426 if (_len) { 4427 cap_auths[i].match.path = kcalloc(_len + 1, sizeof(char), 4428 GFP_KERNEL); 4429 if (!cap_auths[i].match.path) { 4430 pr_err_client(cl, "No memory for path\n"); 4431 goto fail; 4432 } 4433 ceph_decode_copy(&p, cap_auths[i].match.path, _len); 4434 4435 /* Remove the tailing '/' */ 4436 while (_len && cap_auths[i].match.path[_len - 1] == '/') { 4437 cap_auths[i].match.path[_len - 1] = '\0'; 4438 _len -= 1; 4439 } 4440 } 4441 4442 ceph_decode_32_safe(&p, end, _len, bad); 4443 if (_len) { 4444 cap_auths[i].match.fs_name = kcalloc(_len + 1, sizeof(char), 4445 GFP_KERNEL); 4446 if (!cap_auths[i].match.fs_name) { 4447 pr_err_client(cl, "No memory for fs_name\n"); 4448 goto fail; 4449 } 4450 ceph_decode_copy(&p, cap_auths[i].match.fs_name, _len); 4451 } 4452 4453 ceph_decode_8_safe(&p, end, cap_auths[i].match.root_squash, bad); 4454 ceph_decode_8_safe(&p, end, cap_auths[i].readable, bad); 4455 ceph_decode_8_safe(&p, end, cap_auths[i].writeable, bad); 4456 doutc(cl, "uid %lld, num_gids %u, path %s, fs_name %s, root_squash %d, readable %d, writeable %d\n", 4457 cap_auths[i].match.uid, cap_auths[i].match.num_gids, 4458 cap_auths[i].match.path, cap_auths[i].match.fs_name, 4459 cap_auths[i].match.root_squash, 4460 cap_auths[i].readable, cap_auths[i].writeable); 4461 } 4462 } 4463 4464 skip_cap_auths: 4465 mutex_lock(&mdsc->mutex); 4466 if (op == CEPH_SESSION_OPEN) { 4467 if (mdsc->s_cap_auths) { 4468 for (i = 0; i < mdsc->s_cap_auths_num; i++) { 4469 kfree(mdsc->s_cap_auths[i].match.gids); 4470 kfree(mdsc->s_cap_auths[i].match.path); 4471 kfree(mdsc->s_cap_auths[i].match.fs_name); 4472 } 4473 kfree(mdsc->s_cap_auths); 4474 } 4475 mdsc->s_cap_auths_num = cap_auths_num; 4476 mdsc->s_cap_auths = cap_auths; 4477 4478 session->s_features = features; 4479 if 
(test_bit(CEPHFS_FEATURE_METRIC_COLLECT, 4480 &session->s_features)) 4481 ceph_metric_bind_session(mdsc, session); 4482 } 4483 if (op == CEPH_SESSION_CLOSE) { 4484 ceph_get_mds_session(session); 4485 __unregister_session(mdsc, session); 4486 } 4487 /* FIXME: this ttl calculation is generous */ 4488 session->s_ttl = jiffies + HZ*mdsc->mdsmap->m_session_autoclose; 4489 mutex_unlock(&mdsc->mutex); 4490 4491 mutex_lock(&session->s_mutex); 4492 4493 doutc(cl, "mds%d %s %p state %s seq %llu\n", mds, 4494 ceph_session_op_name(op), session, 4495 ceph_session_state_name(session->s_state), seq); 4496 4497 if (session->s_state == CEPH_MDS_SESSION_HUNG) { 4498 session->s_state = CEPH_MDS_SESSION_OPEN; 4499 pr_info_client(cl, "mds%d came back\n", session->s_mds); 4500 } 4501 4502 switch (op) { 4503 case CEPH_SESSION_OPEN: 4504 if (session->s_state == CEPH_MDS_SESSION_RECONNECTING) 4505 pr_info_client(cl, "mds%d reconnect success\n", 4506 session->s_mds); 4507 4508 if (test_bit(CEPHFS_FEATURE_SUBVOLUME_METRICS, 4509 &session->s_features)) 4510 ceph_subvolume_metrics_enable(&mdsc->subvol_metrics, true); 4511 else 4512 ceph_subvolume_metrics_enable(&mdsc->subvol_metrics, false); 4513 if (session->s_state == CEPH_MDS_SESSION_OPEN) { 4514 pr_notice_client(cl, "mds%d is already opened\n", 4515 session->s_mds); 4516 } else { 4517 session->s_state = CEPH_MDS_SESSION_OPEN; 4518 renewed_caps(mdsc, session, 0); 4519 if (test_bit(CEPHFS_FEATURE_METRIC_COLLECT, 4520 &session->s_features)) 4521 metric_schedule_delayed(&mdsc->metric); 4522 } 4523 4524 /* 4525 * The connection maybe broken and the session in client 4526 * side has been reinitialized, need to update the seq 4527 * anyway. 
4528 */ 4529 if (!session->s_seq && seq) 4530 session->s_seq = seq; 4531 4532 wake = 1; 4533 if (mdsc->stopping) 4534 __close_session(mdsc, session); 4535 break; 4536 4537 case CEPH_SESSION_RENEWCAPS: 4538 if (session->s_renew_seq == seq) 4539 renewed_caps(mdsc, session, 1); 4540 break; 4541 4542 case CEPH_SESSION_CLOSE: 4543 if (session->s_state == CEPH_MDS_SESSION_RECONNECTING) 4544 pr_info_client(cl, "mds%d reconnect denied\n", 4545 session->s_mds); 4546 session->s_state = CEPH_MDS_SESSION_CLOSED; 4547 cleanup_session_requests(mdsc, session); 4548 remove_session_caps(session); 4549 wake = 2; /* for good measure */ 4550 wake_up_all(&mdsc->session_close_wq); 4551 break; 4552 4553 case CEPH_SESSION_STALE: 4554 pr_info_client(cl, "mds%d caps went stale, renewing\n", 4555 session->s_mds); 4556 atomic_inc(&session->s_cap_gen); 4557 session->s_cap_ttl = jiffies - 1; 4558 send_renew_caps(mdsc, session); 4559 break; 4560 4561 case CEPH_SESSION_RECALL_STATE: 4562 ceph_trim_caps(mdsc, session, le32_to_cpu(h->max_caps)); 4563 break; 4564 4565 case CEPH_SESSION_FLUSHMSG: 4566 /* flush cap releases */ 4567 spin_lock(&session->s_cap_lock); 4568 if (session->s_num_cap_releases) 4569 ceph_flush_session_cap_releases(mdsc, session); 4570 spin_unlock(&session->s_cap_lock); 4571 4572 send_flushmsg_ack(mdsc, session, seq); 4573 break; 4574 4575 case CEPH_SESSION_FORCE_RO: 4576 doutc(cl, "force_session_readonly %p\n", session); 4577 spin_lock(&session->s_cap_lock); 4578 session->s_readonly = true; 4579 spin_unlock(&session->s_cap_lock); 4580 wake_up_session_caps(session, FORCE_RO); 4581 break; 4582 4583 case CEPH_SESSION_REJECT: 4584 WARN_ON(session->s_state != CEPH_MDS_SESSION_OPENING && 4585 session->s_state != CEPH_MDS_SESSION_RECONNECTING); 4586 if (session->s_state == CEPH_MDS_SESSION_RECONNECTING) 4587 pr_info_client(cl, "mds%d reconnect rejected\n", 4588 session->s_mds); 4589 else 4590 pr_info_client(cl, "mds%d rejected session\n", 4591 session->s_mds); 4592 session->s_state = 
CEPH_MDS_SESSION_REJECTED; 4593 cleanup_session_requests(mdsc, session); 4594 remove_session_caps(session); 4595 if (blocklisted) 4596 mdsc->fsc->blocklisted = true; 4597 wake = 2; /* for good measure */ 4598 break; 4599 4600 default: 4601 pr_err_client(cl, "bad op %d mds%d\n", op, mds); 4602 WARN_ON(1); 4603 } 4604 4605 mutex_unlock(&session->s_mutex); 4606 if (wake) { 4607 mutex_lock(&mdsc->mutex); 4608 __wake_requests(mdsc, &session->s_waiting); 4609 if (wake == 2) 4610 kick_requests(mdsc, mds); 4611 mutex_unlock(&mdsc->mutex); 4612 } 4613 if (op == CEPH_SESSION_CLOSE) 4614 ceph_put_mds_session(session); 4615 return; 4616 4617 bad: 4618 pr_err_client(cl, "corrupt message mds%d len %d\n", mds, 4619 (int)msg->front.iov_len); 4620 ceph_msg_dump(msg); 4621 fail: 4622 for (i = 0; i < cap_auths_num; i++) { 4623 kfree(cap_auths[i].match.gids); 4624 kfree(cap_auths[i].match.path); 4625 kfree(cap_auths[i].match.fs_name); 4626 } 4627 kfree(cap_auths); 4628 return; 4629 } 4630 4631 void ceph_mdsc_release_dir_caps(struct ceph_mds_request *req) 4632 { 4633 struct ceph_client *cl = req->r_mdsc->fsc->client; 4634 int dcaps; 4635 4636 dcaps = xchg(&req->r_dir_caps, 0); 4637 if (dcaps) { 4638 doutc(cl, "releasing r_dir_caps=%s\n", ceph_cap_string(dcaps)); 4639 ceph_put_cap_refs(ceph_inode(req->r_parent), dcaps); 4640 } 4641 } 4642 4643 void ceph_mdsc_release_dir_caps_async(struct ceph_mds_request *req) 4644 { 4645 struct ceph_client *cl = req->r_mdsc->fsc->client; 4646 int dcaps; 4647 4648 dcaps = xchg(&req->r_dir_caps, 0); 4649 if (dcaps) { 4650 doutc(cl, "releasing r_dir_caps=%s\n", ceph_cap_string(dcaps)); 4651 ceph_put_cap_refs_async(ceph_inode(req->r_parent), dcaps); 4652 } 4653 } 4654 4655 /* 4656 * called under session->mutex. 
4657 */ 4658 static void replay_unsafe_requests(struct ceph_mds_client *mdsc, 4659 struct ceph_mds_session *session) 4660 { 4661 struct ceph_mds_request *req, *nreq; 4662 struct rb_node *p; 4663 4664 doutc(mdsc->fsc->client, "mds%d\n", session->s_mds); 4665 4666 mutex_lock(&mdsc->mutex); 4667 list_for_each_entry_safe(req, nreq, &session->s_unsafe, r_unsafe_item) 4668 __send_request(session, req, true); 4669 4670 /* 4671 * also re-send old requests when MDS enters reconnect stage. So that MDS 4672 * can process completed request in clientreplay stage. 4673 */ 4674 p = rb_first(&mdsc->request_tree); 4675 while (p) { 4676 req = rb_entry(p, struct ceph_mds_request, r_node); 4677 p = rb_next(p); 4678 if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) 4679 continue; 4680 if (req->r_attempts == 0) 4681 continue; /* only old requests */ 4682 if (!req->r_session) 4683 continue; 4684 if (req->r_session->s_mds != session->s_mds) 4685 continue; 4686 4687 ceph_mdsc_release_dir_caps_async(req); 4688 4689 __send_request(session, req, true); 4690 } 4691 mutex_unlock(&mdsc->mutex); 4692 } 4693 4694 static int send_reconnect_partial(struct ceph_reconnect_state *recon_state) 4695 { 4696 struct ceph_msg *reply; 4697 struct ceph_pagelist *_pagelist; 4698 struct page *page; 4699 __le32 *addr; 4700 int err = -ENOMEM; 4701 4702 if (!recon_state->allow_multi) 4703 return -ENOSPC; 4704 4705 /* can't handle message that contains both caps and realm */ 4706 BUG_ON(!recon_state->nr_caps == !recon_state->nr_realms); 4707 4708 /* pre-allocate new pagelist */ 4709 _pagelist = ceph_pagelist_alloc(GFP_NOFS); 4710 if (!_pagelist) 4711 return -ENOMEM; 4712 4713 reply = ceph_msg_new2(CEPH_MSG_CLIENT_RECONNECT, 0, 1, GFP_NOFS, false); 4714 if (!reply) 4715 goto fail_msg; 4716 4717 /* placeholder for nr_caps */ 4718 err = ceph_pagelist_encode_32(_pagelist, 0); 4719 if (err < 0) 4720 goto fail; 4721 4722 if (recon_state->nr_caps) { 4723 /* currently encoding caps */ 4724 err = 
ceph_pagelist_encode_32(recon_state->pagelist, 0); 4725 if (err) 4726 goto fail; 4727 } else { 4728 /* placeholder for nr_realms (currently encoding relams) */ 4729 err = ceph_pagelist_encode_32(_pagelist, 0); 4730 if (err < 0) 4731 goto fail; 4732 } 4733 4734 err = ceph_pagelist_encode_8(recon_state->pagelist, 1); 4735 if (err) 4736 goto fail; 4737 4738 page = list_first_entry(&recon_state->pagelist->head, struct page, lru); 4739 addr = kmap_atomic(page); 4740 if (recon_state->nr_caps) { 4741 /* currently encoding caps */ 4742 *addr = cpu_to_le32(recon_state->nr_caps); 4743 } else { 4744 /* currently encoding relams */ 4745 *(addr + 1) = cpu_to_le32(recon_state->nr_realms); 4746 } 4747 kunmap_atomic(addr); 4748 4749 reply->hdr.version = cpu_to_le16(5); 4750 reply->hdr.compat_version = cpu_to_le16(4); 4751 4752 reply->hdr.data_len = cpu_to_le32(recon_state->pagelist->length); 4753 ceph_msg_data_add_pagelist(reply, recon_state->pagelist); 4754 4755 ceph_con_send(&recon_state->session->s_con, reply); 4756 ceph_pagelist_release(recon_state->pagelist); 4757 4758 recon_state->pagelist = _pagelist; 4759 recon_state->nr_caps = 0; 4760 recon_state->nr_realms = 0; 4761 recon_state->msg_version = 5; 4762 return 0; 4763 fail: 4764 ceph_msg_put(reply); 4765 fail_msg: 4766 ceph_pagelist_release(_pagelist); 4767 return err; 4768 } 4769 4770 static struct dentry* d_find_primary(struct inode *inode) 4771 { 4772 struct dentry *alias, *dn = NULL; 4773 4774 if (hlist_empty(&inode->i_dentry)) 4775 return NULL; 4776 4777 spin_lock(&inode->i_lock); 4778 if (hlist_empty(&inode->i_dentry)) 4779 goto out_unlock; 4780 4781 if (S_ISDIR(inode->i_mode)) { 4782 alias = hlist_entry(inode->i_dentry.first, struct dentry, d_alias); 4783 if (!IS_ROOT(alias)) 4784 dn = dget(alias); 4785 goto out_unlock; 4786 } 4787 4788 for_each_alias(alias, inode) { 4789 spin_lock(&alias->d_lock); 4790 if (!d_unhashed(alias) && 4791 (ceph_dentry(alias)->flags & CEPH_DENTRY_PRIMARY_LINK)) { 4792 dn = 
dget_dlock(alias); 4793 } 4794 spin_unlock(&alias->d_lock); 4795 if (dn) 4796 break; 4797 } 4798 out_unlock: 4799 spin_unlock(&inode->i_lock); 4800 return dn; 4801 } 4802 4803 /* 4804 * Encode information about a cap for a reconnect with the MDS. 4805 */ 4806 static int reconnect_caps_cb(struct inode *inode, int mds, void *arg) 4807 { 4808 struct ceph_mds_client *mdsc = ceph_sb_to_mdsc(inode->i_sb); 4809 struct ceph_client *cl = ceph_inode_to_client(inode); 4810 union { 4811 struct ceph_mds_cap_reconnect v2; 4812 struct ceph_mds_cap_reconnect_v1 v1; 4813 } rec; 4814 struct ceph_inode_info *ci = ceph_inode(inode); 4815 struct ceph_reconnect_state *recon_state = arg; 4816 struct ceph_pagelist *pagelist = recon_state->pagelist; 4817 struct dentry *dentry; 4818 struct ceph_cap *cap; 4819 struct ceph_path_info path_info = {0}; 4820 int err; 4821 u64 snap_follows; 4822 4823 dentry = d_find_primary(inode); 4824 if (dentry) { 4825 /* set pathbase to parent dir when msg_version >= 2 */ 4826 char *path = ceph_mdsc_build_path(mdsc, dentry, &path_info, 4827 recon_state->msg_version >= 2); 4828 dput(dentry); 4829 if (IS_ERR(path)) { 4830 err = PTR_ERR(path); 4831 goto out_err; 4832 } 4833 } 4834 4835 spin_lock(&ci->i_ceph_lock); 4836 cap = __get_cap_for_mds(ci, mds); 4837 if (!cap) { 4838 spin_unlock(&ci->i_ceph_lock); 4839 err = 0; 4840 goto out_err; 4841 } 4842 doutc(cl, " adding %p ino %llx.%llx cap %p %lld %s\n", inode, 4843 ceph_vinop(inode), cap, cap->cap_id, 4844 ceph_cap_string(cap->issued)); 4845 4846 cap->seq = 0; /* reset cap seq */ 4847 cap->issue_seq = 0; /* and issue_seq */ 4848 cap->mseq = 0; /* and migrate_seq */ 4849 cap->cap_gen = atomic_read(&cap->session->s_cap_gen); 4850 4851 /* 4852 * Note: CEPH_I_ERROR_FILELOCK is not set during reconnect. 4853 * Instead, locks are submitted for best-effort MDS reclaim 4854 * via the flock_len field below. 
If reclaim fails (e.g., 4855 * another client grabbed a conflicting lock), future lock 4856 * operations will fail and set the error flag at that point. 4857 */ 4858 4859 /* These are lost when the session goes away */ 4860 if (S_ISDIR(inode->i_mode)) { 4861 if (cap->issued & CEPH_CAP_DIR_CREATE) { 4862 ceph_put_string(rcu_dereference_raw(ci->i_cached_layout.pool_ns)); 4863 memset(&ci->i_cached_layout, 0, sizeof(ci->i_cached_layout)); 4864 } 4865 cap->issued &= ~CEPH_CAP_ANY_DIR_OPS; 4866 } 4867 4868 if (recon_state->msg_version >= 2) { 4869 rec.v2.cap_id = cpu_to_le64(cap->cap_id); 4870 rec.v2.wanted = cpu_to_le32(__ceph_caps_wanted(ci)); 4871 rec.v2.issued = cpu_to_le32(cap->issued); 4872 rec.v2.snaprealm = cpu_to_le64(ci->i_snap_realm->ino); 4873 rec.v2.pathbase = cpu_to_le64(path_info.vino.ino); 4874 rec.v2.flock_len = cpu_to_le32( 4875 test_bit(CEPH_I_ERROR_FILELOCK_BIT, 4876 &ci->i_ceph_flags) ? 0 : 1); 4877 } else { 4878 struct timespec64 ts; 4879 4880 rec.v1.cap_id = cpu_to_le64(cap->cap_id); 4881 rec.v1.wanted = cpu_to_le32(__ceph_caps_wanted(ci)); 4882 rec.v1.issued = cpu_to_le32(cap->issued); 4883 rec.v1.size = cpu_to_le64(i_size_read(inode)); 4884 ts = inode_get_mtime(inode); 4885 ceph_encode_timespec64(&rec.v1.mtime, &ts); 4886 ts = inode_get_atime(inode); 4887 ceph_encode_timespec64(&rec.v1.atime, &ts); 4888 rec.v1.snaprealm = cpu_to_le64(ci->i_snap_realm->ino); 4889 rec.v1.pathbase = cpu_to_le64(path_info.vino.ino); 4890 } 4891 4892 if (list_empty(&ci->i_cap_snaps)) { 4893 snap_follows = ci->i_head_snapc ? 
ci->i_head_snapc->seq : 0; 4894 } else { 4895 struct ceph_cap_snap *capsnap = 4896 list_first_entry(&ci->i_cap_snaps, 4897 struct ceph_cap_snap, ci_item); 4898 snap_follows = capsnap->follows; 4899 } 4900 spin_unlock(&ci->i_ceph_lock); 4901 4902 if (recon_state->msg_version >= 2) { 4903 int num_fcntl_locks, num_flock_locks; 4904 struct ceph_filelock *flocks = NULL; 4905 size_t struct_len, total_len = sizeof(u64); 4906 u8 struct_v = 0; 4907 4908 encode_again: 4909 if (rec.v2.flock_len) { 4910 ceph_count_locks(inode, &num_fcntl_locks, &num_flock_locks); 4911 } else { 4912 num_fcntl_locks = 0; 4913 num_flock_locks = 0; 4914 } 4915 if (num_fcntl_locks + num_flock_locks > 0) { 4916 flocks = kmalloc_objs(struct ceph_filelock, 4917 num_fcntl_locks + num_flock_locks, 4918 GFP_NOFS); 4919 if (!flocks) { 4920 err = -ENOMEM; 4921 goto out_err; 4922 } 4923 err = ceph_encode_locks_to_buffer(inode, flocks, 4924 num_fcntl_locks, 4925 num_flock_locks); 4926 if (err) { 4927 kfree(flocks); 4928 flocks = NULL; 4929 if (err == -ENOSPC) 4930 goto encode_again; 4931 goto out_err; 4932 } 4933 } else { 4934 kfree(flocks); 4935 flocks = NULL; 4936 } 4937 4938 if (recon_state->msg_version >= 3) { 4939 /* version, compat_version and struct_len */ 4940 total_len += 2 * sizeof(u8) + sizeof(u32); 4941 struct_v = 2; 4942 } 4943 /* 4944 * number of encoded locks is stable, so copy to pagelist 4945 */ 4946 struct_len = 2 * sizeof(u32) + 4947 (num_fcntl_locks + num_flock_locks) * 4948 sizeof(struct ceph_filelock); 4949 rec.v2.flock_len = cpu_to_le32(struct_len); 4950 4951 struct_len += sizeof(u32) + path_info.pathlen + sizeof(rec.v2); 4952 4953 if (struct_v >= 2) 4954 struct_len += sizeof(u64); /* snap_follows */ 4955 4956 total_len += struct_len; 4957 4958 if (pagelist->length + total_len > RECONNECT_MAX_SIZE) { 4959 err = send_reconnect_partial(recon_state); 4960 if (err) 4961 goto out_freeflocks; 4962 pagelist = recon_state->pagelist; 4963 } 4964 4965 err = ceph_pagelist_reserve(pagelist, 
total_len); 4966 if (err) 4967 goto out_freeflocks; 4968 4969 ceph_pagelist_encode_64(pagelist, ceph_ino(inode)); 4970 if (recon_state->msg_version >= 3) { 4971 ceph_pagelist_encode_8(pagelist, struct_v); 4972 ceph_pagelist_encode_8(pagelist, 1); 4973 ceph_pagelist_encode_32(pagelist, struct_len); 4974 } 4975 ceph_pagelist_encode_string(pagelist, (char *)path_info.path, path_info.pathlen); 4976 ceph_pagelist_append(pagelist, &rec, sizeof(rec.v2)); 4977 ceph_locks_to_pagelist(flocks, pagelist, 4978 num_fcntl_locks, num_flock_locks); 4979 if (struct_v >= 2) 4980 ceph_pagelist_encode_64(pagelist, snap_follows); 4981 out_freeflocks: 4982 kfree(flocks); 4983 } else { 4984 err = ceph_pagelist_reserve(pagelist, 4985 sizeof(u64) + sizeof(u32) + 4986 path_info.pathlen + sizeof(rec.v1)); 4987 if (err) 4988 goto out_err; 4989 4990 ceph_pagelist_encode_64(pagelist, ceph_ino(inode)); 4991 ceph_pagelist_encode_string(pagelist, (char *)path_info.path, path_info.pathlen); 4992 ceph_pagelist_append(pagelist, &rec, sizeof(rec.v1)); 4993 } 4994 4995 out_err: 4996 ceph_mdsc_free_path_info(&path_info); 4997 if (!err) 4998 recon_state->nr_caps++; 4999 return err; 5000 } 5001 5002 static int encode_snap_realms(struct ceph_mds_client *mdsc, 5003 struct ceph_reconnect_state *recon_state) 5004 { 5005 struct rb_node *p; 5006 struct ceph_pagelist *pagelist = recon_state->pagelist; 5007 struct ceph_client *cl = mdsc->fsc->client; 5008 int err = 0; 5009 5010 if (recon_state->msg_version >= 4) { 5011 err = ceph_pagelist_encode_32(pagelist, mdsc->num_snap_realms); 5012 if (err < 0) 5013 goto fail; 5014 } 5015 5016 /* 5017 * snaprealms. we provide mds with the ino, seq (version), and 5018 * parent for all of our realms. If the mds has any newer info, 5019 * it will tell us. 
5020 */ 5021 for (p = rb_first(&mdsc->snap_realms); p; p = rb_next(p)) { 5022 struct ceph_snap_realm *realm = 5023 rb_entry(p, struct ceph_snap_realm, node); 5024 struct ceph_mds_snaprealm_reconnect sr_rec; 5025 5026 if (recon_state->msg_version >= 4) { 5027 size_t need = sizeof(u8) * 2 + sizeof(u32) + 5028 sizeof(sr_rec); 5029 5030 if (pagelist->length + need > RECONNECT_MAX_SIZE) { 5031 err = send_reconnect_partial(recon_state); 5032 if (err) 5033 goto fail; 5034 pagelist = recon_state->pagelist; 5035 } 5036 5037 err = ceph_pagelist_reserve(pagelist, need); 5038 if (err) 5039 goto fail; 5040 5041 ceph_pagelist_encode_8(pagelist, 1); 5042 ceph_pagelist_encode_8(pagelist, 1); 5043 ceph_pagelist_encode_32(pagelist, sizeof(sr_rec)); 5044 } 5045 5046 doutc(cl, " adding snap realm %llx seq %lld parent %llx\n", 5047 realm->ino, realm->seq, realm->parent_ino); 5048 sr_rec.ino = cpu_to_le64(realm->ino); 5049 sr_rec.seq = cpu_to_le64(realm->seq); 5050 sr_rec.parent = cpu_to_le64(realm->parent_ino); 5051 5052 err = ceph_pagelist_append(pagelist, &sr_rec, sizeof(sr_rec)); 5053 if (err) 5054 goto fail; 5055 5056 recon_state->nr_realms++; 5057 } 5058 fail: 5059 return err; 5060 } 5061 5062 5063 /* 5064 * If an MDS fails and recovers, clients need to reconnect in order to 5065 * reestablish shared state. This includes all caps issued through 5066 * this session _and_ the snap_realm hierarchy. Because it's not 5067 * clear which snap realms the mds cares about, we send everything we 5068 * know about.. that ensures we'll then get any new info the 5069 * recovering MDS might have. 5070 * 5071 * This is a relatively heavyweight operation, but it's rare. 
5072 */ 5073 static int send_mds_reconnect(struct ceph_mds_client *mdsc, 5074 struct ceph_mds_session *session) 5075 { 5076 struct ceph_client *cl = mdsc->fsc->client; 5077 struct ceph_msg *reply; 5078 int mds = session->s_mds; 5079 int err = -ENOMEM; 5080 int old_state; 5081 struct ceph_reconnect_state recon_state = { 5082 .session = session, 5083 }; 5084 LIST_HEAD(dispose); 5085 5086 recon_state.pagelist = ceph_pagelist_alloc(GFP_NOFS); 5087 if (!recon_state.pagelist) 5088 goto fail_nopagelist; 5089 5090 reply = ceph_msg_new2(CEPH_MSG_CLIENT_RECONNECT, 0, 1, GFP_NOFS, false); 5091 if (!reply) 5092 goto fail_nomsg; 5093 5094 mutex_lock(&session->s_mutex); 5095 5096 /* Serialized by s_mutex against concurrent ceph_get_deleg_ino(). */ 5097 xa_destroy(&session->s_delegated_inos); 5098 if (session->s_state == CEPH_MDS_SESSION_CLOSED || 5099 session->s_state == CEPH_MDS_SESSION_REJECTED) { 5100 pr_info_client(cl, "mds%d skipping reconnect, session %s\n", 5101 mds, 5102 ceph_session_state_name(session->s_state)); 5103 mutex_unlock(&session->s_mutex); 5104 ceph_msg_put(reply); 5105 err = -ESTALE; 5106 goto fail_return; 5107 } 5108 5109 /* s_mutex -> mdsc->mutex matches cleanup_session_requests() order. 
*/ 5110 mutex_lock(&mdsc->mutex); 5111 if (mds >= mdsc->max_sessions || mdsc->sessions[mds] != session) { 5112 mutex_unlock(&mdsc->mutex); 5113 pr_info_client(cl, 5114 "mds%d skipping reconnect, session unregistered\n", 5115 mds); 5116 mutex_unlock(&session->s_mutex); 5117 ceph_msg_put(reply); 5118 err = -ENOENT; 5119 goto fail_return; 5120 } 5121 mutex_unlock(&mdsc->mutex); 5122 5123 pr_info_client(cl, "mds%d reconnect start\n", mds); 5124 old_state = session->s_state; 5125 session->s_state = CEPH_MDS_SESSION_RECONNECTING; 5126 session->s_seq = 0; 5127 5128 doutc(cl, "session %p state %s\n", session, 5129 ceph_session_state_name(session->s_state)); 5130 5131 atomic_inc(&session->s_cap_gen); 5132 5133 spin_lock(&session->s_cap_lock); 5134 /* don't know if session is readonly */ 5135 session->s_readonly = 0; 5136 /* 5137 * notify __ceph_remove_cap() that we are composing cap reconnect. 5138 * If a cap get released before being added to the cap reconnect, 5139 * __ceph_remove_cap() should skip queuing cap release. 
5140 */ 5141 session->s_cap_reconnect = 1; 5142 /* drop old cap expires; we're about to reestablish that state */ 5143 detach_cap_releases(session, &dispose); 5144 spin_unlock(&session->s_cap_lock); 5145 dispose_cap_releases(mdsc, &dispose); 5146 5147 /* trim unused caps to reduce MDS's cache rejoin time */ 5148 if (mdsc->fsc->sb->s_root) 5149 shrink_dcache_parent(mdsc->fsc->sb->s_root); 5150 5151 ceph_con_close(&session->s_con); 5152 ceph_con_open(&session->s_con, 5153 CEPH_ENTITY_TYPE_MDS, mds, 5154 ceph_mdsmap_get_addr(mdsc->mdsmap, mds)); 5155 5156 /* replay unsafe requests */ 5157 replay_unsafe_requests(mdsc, session); 5158 5159 ceph_early_kick_flushing_caps(mdsc, session); 5160 5161 down_read(&mdsc->snap_rwsem); 5162 5163 /* placeholder for nr_caps */ 5164 err = ceph_pagelist_encode_32(recon_state.pagelist, 0); 5165 if (err) 5166 goto fail_clear_cap_reconnect; 5167 5168 if (test_bit(CEPHFS_FEATURE_MULTI_RECONNECT, &session->s_features)) { 5169 recon_state.msg_version = 3; 5170 recon_state.allow_multi = true; 5171 } else if (session->s_con.peer_features & CEPH_FEATURE_MDSENC) { 5172 recon_state.msg_version = 3; 5173 } else { 5174 recon_state.msg_version = 2; 5175 } 5176 /* traverse this session's caps */ 5177 err = ceph_iterate_session_caps(session, reconnect_caps_cb, &recon_state); 5178 5179 spin_lock(&session->s_cap_lock); 5180 session->s_cap_reconnect = 0; 5181 spin_unlock(&session->s_cap_lock); 5182 5183 if (err < 0) 5184 goto fail; 5185 5186 /* check if all realms can be encoded into current message */ 5187 if (mdsc->num_snap_realms) { 5188 size_t total_len = 5189 recon_state.pagelist->length + 5190 mdsc->num_snap_realms * 5191 sizeof(struct ceph_mds_snaprealm_reconnect); 5192 if (recon_state.msg_version >= 4) { 5193 /* number of realms */ 5194 total_len += sizeof(u32); 5195 /* version, compat_version and struct_len */ 5196 total_len += mdsc->num_snap_realms * 5197 (2 * sizeof(u8) + sizeof(u32)); 5198 } 5199 if (total_len > RECONNECT_MAX_SIZE) { 5200 if 
(!recon_state.allow_multi) { 5201 err = -ENOSPC; 5202 goto fail; 5203 } 5204 if (recon_state.nr_caps) { 5205 err = send_reconnect_partial(&recon_state); 5206 if (err) 5207 goto fail; 5208 } 5209 recon_state.msg_version = 5; 5210 } 5211 } 5212 5213 err = encode_snap_realms(mdsc, &recon_state); 5214 if (err < 0) 5215 goto fail; 5216 5217 if (recon_state.msg_version >= 5) { 5218 err = ceph_pagelist_encode_8(recon_state.pagelist, 0); 5219 if (err < 0) 5220 goto fail; 5221 } 5222 5223 if (recon_state.nr_caps || recon_state.nr_realms) { 5224 struct page *page = 5225 list_first_entry(&recon_state.pagelist->head, 5226 struct page, lru); 5227 __le32 *addr = kmap_atomic(page); 5228 if (recon_state.nr_caps) { 5229 WARN_ON(recon_state.nr_realms != mdsc->num_snap_realms); 5230 *addr = cpu_to_le32(recon_state.nr_caps); 5231 } else if (recon_state.msg_version >= 4) { 5232 *(addr + 1) = cpu_to_le32(recon_state.nr_realms); 5233 } 5234 kunmap_atomic(addr); 5235 } 5236 5237 reply->hdr.version = cpu_to_le16(recon_state.msg_version); 5238 if (recon_state.msg_version >= 4) 5239 reply->hdr.compat_version = cpu_to_le16(4); 5240 5241 reply->hdr.data_len = cpu_to_le32(recon_state.pagelist->length); 5242 ceph_msg_data_add_pagelist(reply, recon_state.pagelist); 5243 5244 ceph_con_send(&session->s_con, reply); 5245 5246 mutex_unlock(&session->s_mutex); 5247 5248 mutex_lock(&mdsc->mutex); 5249 __wake_requests(mdsc, &session->s_waiting); 5250 mutex_unlock(&mdsc->mutex); 5251 5252 up_read(&mdsc->snap_rwsem); 5253 ceph_pagelist_release(recon_state.pagelist); 5254 return 0; 5255 5256 fail_clear_cap_reconnect: 5257 spin_lock(&session->s_cap_lock); 5258 session->s_cap_reconnect = 0; 5259 spin_unlock(&session->s_cap_lock); 5260 fail: 5261 ceph_msg_put(reply); 5262 up_read(&mdsc->snap_rwsem); 5263 /* 5264 * Restore prior session state so map-driven reconnect logic 5265 * (check_new_map) can retry. Without this, a transient build 5266 * failure strands the session in RECONNECTING indefinitely. 
5267 */ 5268 session->s_state = old_state; 5269 mutex_unlock(&session->s_mutex); 5270 fail_nomsg: 5271 ceph_pagelist_release(recon_state.pagelist); 5272 fail_nopagelist: 5273 pr_err_client(cl, "error %d preparing reconnect for mds%d\n", 5274 err, mds); 5275 return err; 5276 5277 fail_return: 5278 /* 5279 * Early-exit path for expected concurrent-teardown races 5280 * (-ESTALE for closed/rejected sessions, -ENOENT for 5281 * unregistered sessions). Skip the pr_err_client diagnostic 5282 * since these are not genuine reconnect build failures. 5283 */ 5284 ceph_pagelist_release(recon_state.pagelist); 5285 return err; 5286 } 5287 5288 const char *ceph_reset_phase_name(enum ceph_client_reset_phase phase) 5289 { 5290 switch (phase) { 5291 case CEPH_CLIENT_RESET_IDLE: return "idle"; 5292 case CEPH_CLIENT_RESET_QUIESCING: return "quiescing"; 5293 case CEPH_CLIENT_RESET_DRAINING: return "draining"; 5294 case CEPH_CLIENT_RESET_TEARDOWN: return "teardown"; 5295 default: return "unknown"; 5296 } 5297 } 5298 5299 /** 5300 * ceph_mdsc_wait_for_reset - wait for an active reset to complete 5301 * @mdsc: MDS client 5302 * 5303 * Returns 0 if reset completed successfully or no reset was active. 5304 * Returns -EAGAIN if reset completed with an error, signalling the 5305 * caller to retry. The internal error (e.g. -ENOMEM) is not propagated 5306 * because callers like open() or flock() have no way to act on 5307 * work-function internals. The detailed error is available via debugfs 5308 * reset/status and tracepoints. 5309 * Returns -ETIMEDOUT if we timed out waiting. 5310 * Returns -ERESTARTSYS if interrupted by signal. 
5311 */ 5312 int ceph_mdsc_wait_for_reset(struct ceph_mds_client *mdsc) 5313 { 5314 struct ceph_client_reset_state *st = &mdsc->reset_state; 5315 struct ceph_client *cl = mdsc->fsc->client; 5316 unsigned long deadline = jiffies + CEPH_CLIENT_RESET_WAIT_TIMEOUT_SEC * HZ; 5317 int blocked_count; 5318 long remaining; 5319 long wait_ret; 5320 int ret; 5321 5322 if (ceph_reset_is_idle(st)) 5323 return 0; 5324 5325 blocked_count = atomic_inc_return(&st->blocked_requests); 5326 doutc(cl, "request blocked during reset, %d total blocked\n", 5327 blocked_count); 5328 trace_ceph_client_reset_blocked(mdsc, blocked_count); 5329 5330 retry: 5331 remaining = max_t(long, deadline - jiffies, 1); 5332 wait_ret = wait_event_interruptible_timeout(st->blocked_wq, 5333 ceph_reset_is_idle(st), 5334 remaining); 5335 5336 if (wait_ret == 0) { 5337 atomic_dec(&st->blocked_requests); 5338 pr_warn_client(cl, "timed out waiting for reset to complete\n"); 5339 trace_ceph_client_reset_unblocked(mdsc, -ETIMEDOUT); 5340 return -ETIMEDOUT; 5341 } 5342 if (wait_ret < 0) { 5343 atomic_dec(&st->blocked_requests); 5344 trace_ceph_client_reset_unblocked(mdsc, (int)wait_ret); 5345 return (int)wait_ret; /* -ERESTARTSYS */ 5346 } 5347 5348 /* 5349 * Verify phase is still IDLE under the lock. If another reset 5350 * was scheduled between the wake-up and this check, loop back 5351 * and wait for it to finish rather than returning a stale result. 5352 */ 5353 spin_lock(&st->lock); 5354 if (st->phase != CEPH_CLIENT_RESET_IDLE) { 5355 spin_unlock(&st->lock); 5356 if (time_before(jiffies, deadline)) 5357 goto retry; 5358 atomic_dec(&st->blocked_requests); 5359 trace_ceph_client_reset_unblocked(mdsc, -ETIMEDOUT); 5360 return -ETIMEDOUT; 5361 } 5362 ret = st->last_errno; 5363 spin_unlock(&st->lock); 5364 5365 atomic_dec(&st->blocked_requests); 5366 trace_ceph_client_reset_unblocked(mdsc, ret); 5367 return ret ? 
-EAGAIN : 0; 5368 } 5369 5370 static void ceph_mdsc_reset_complete(struct ceph_mds_client *mdsc, int ret) 5371 { 5372 struct ceph_client_reset_state *st = &mdsc->reset_state; 5373 5374 spin_lock(&st->lock); 5375 /* 5376 * If destroy already marked us as shut down, it owns the 5377 * final bookkeeping and waiter wakeup. Just bail so we 5378 * don't overwrite its state. 5379 */ 5380 if (st->shutdown) { 5381 spin_unlock(&st->lock); 5382 return; 5383 } 5384 st->last_finish = jiffies; 5385 st->last_errno = ret; 5386 st->phase = CEPH_CLIENT_RESET_IDLE; 5387 if (ret) 5388 st->failure_count++; 5389 else 5390 st->success_count++; 5391 spin_unlock(&st->lock); 5392 5393 /* Wake up all requests that were blocked waiting for reset */ 5394 wake_up_all(&st->blocked_wq); 5395 5396 trace_ceph_client_reset_complete(mdsc, ret); 5397 } 5398 5399 static void ceph_mdsc_reset_workfn(struct work_struct *work) 5400 { 5401 struct ceph_mds_client *mdsc = 5402 container_of(work, struct ceph_mds_client, reset_work); 5403 struct ceph_client_reset_state *st = &mdsc->reset_state; 5404 struct ceph_client *cl = mdsc->fsc->client; 5405 struct ceph_mds_session **sessions = NULL; 5406 char reason[CEPH_CLIENT_RESET_REASON_LEN]; 5407 unsigned long drain_deadline; 5408 int max_sessions, i, n = 0, torn_down = 0; 5409 int ret = 0; 5410 5411 spin_lock(&st->lock); 5412 strscpy(reason, st->last_reason, sizeof(reason)); 5413 spin_unlock(&st->lock); 5414 5415 mutex_lock(&mdsc->mutex); 5416 max_sessions = mdsc->max_sessions; 5417 if (max_sessions <= 0) { 5418 mutex_unlock(&mdsc->mutex); 5419 goto out_complete; 5420 } 5421 5422 sessions = kcalloc(max_sessions, sizeof(*sessions), GFP_KERNEL); 5423 if (!sessions) { 5424 mutex_unlock(&mdsc->mutex); 5425 ret = -ENOMEM; 5426 pr_err_client(cl, 5427 "manual session reset failed to allocate session array\n"); 5428 ceph_mdsc_reset_complete(mdsc, ret); 5429 return; 5430 } 5431 5432 for (i = 0; i < max_sessions; i++) { 5433 struct ceph_mds_session *session = 
mdsc->sessions[i]; 5434 5435 if (!session) 5436 continue; 5437 5438 /* 5439 * Read session state without s_mutex to avoid nesting 5440 * mdsc->mutex -> s_mutex, which would invert the 5441 * s_mutex -> mdsc->mutex order used by 5442 * cleanup_session_requests(). s_state is an int 5443 * so loads are atomic; the teardown loop below 5444 * handles races with concurrent state transitions. 5445 */ 5446 switch (READ_ONCE(session->s_state)) { 5447 case CEPH_MDS_SESSION_OPEN: 5448 case CEPH_MDS_SESSION_HUNG: 5449 case CEPH_MDS_SESSION_OPENING: 5450 case CEPH_MDS_SESSION_RESTARTING: 5451 case CEPH_MDS_SESSION_RECONNECTING: 5452 case CEPH_MDS_SESSION_CLOSING: 5453 sessions[n++] = ceph_get_mds_session(session); 5454 break; 5455 default: 5456 pr_info_client(cl, 5457 "mds%d in state %s, skipping reset\n", 5458 session->s_mds, 5459 ceph_session_state_name(session->s_state)); 5460 break; 5461 } 5462 } 5463 mutex_unlock(&mdsc->mutex); 5464 5465 pr_info_client(cl, 5466 "manual session reset executing (sessions=%d, reason=\"%s\")\n", 5467 n, reason); 5468 5469 if (n == 0) { 5470 kfree(sessions); 5471 goto out_complete; 5472 } 5473 5474 spin_lock(&st->lock); 5475 if (st->shutdown) { 5476 spin_unlock(&st->lock); 5477 goto out_sessions; 5478 } 5479 st->phase = CEPH_CLIENT_RESET_DRAINING; 5480 spin_unlock(&st->lock); 5481 5482 /* 5483 * Best-effort drain: flush dirty state while sessions are still 5484 * alive. New requests are blocked while phase != IDLE. 5485 * The sessions are functional, so non-stuck state drains normally. 5486 * Stuck state (the cause of the stalemate the operator is trying 5487 * to break) will not drain -- that is expected, and we proceed to 5488 * forced teardown after the timeout. 5489 * 5490 * Four things are drained: 5491 * 1. MDS journal -- send_flush_mdlog asks each MDS to journal 5492 * pending unsafe operations (creates, renames, setattrs). 5493 * 2. 
Unsafe requests -- bounded wait for each unsafe write 5494 * request to reach safe status via r_safe_completion. 5495 * 3. Dirty caps -- ceph_flush_dirty_caps triggers cap flush on 5496 * all sessions. Non-stuck caps flush in milliseconds. 5497 * 4. Cap releases -- push pending cap release messages. 5498 * 5499 * The unsafe-request wait and cap-flush wait below provide 5500 * the bounded drain window during which all categories can 5501 * make progress. 5502 */ 5503 for (i = 0; i < n; i++) 5504 send_flush_mdlog(sessions[i]); 5505 5506 /* 5507 * Both drain legs (unsafe requests and cap flushes) share a 5508 * single deadline so the total drain time is bounded at 5509 * CEPH_CLIENT_RESET_DRAIN_SEC. 5510 */ 5511 drain_deadline = jiffies + CEPH_CLIENT_RESET_DRAIN_SEC * HZ; 5512 5513 /* 5514 * Wait for unsafe write requests (creates, renames, setattrs) 5515 * to reach safe status. Uses the same pattern as 5516 * flush_mdlog_and_wait_mdsc_unsafe_requests() but bounded by 5517 * the shared drain deadline. Requests that do not complete within 5518 * the window are force-dropped during teardown. 
5519 */ 5520 { 5521 struct ceph_mds_request *req; 5522 struct rb_node *rn; 5523 u64 last_tid; 5524 5525 mutex_lock(&mdsc->mutex); 5526 last_tid = mdsc->last_tid; 5527 mutex_unlock(&mdsc->mutex); 5528 5529 mutex_lock(&mdsc->mutex); 5530 rn = rb_first(&mdsc->request_tree); 5531 while (rn) { 5532 req = rb_entry(rn, struct ceph_mds_request, r_node); 5533 if (req->r_tid > last_tid) 5534 break; 5535 if (req->r_op == CEPH_MDS_OP_SETFILELOCK || 5536 !(req->r_op & CEPH_MDS_OP_WRITE)) { 5537 rn = rb_next(rn); 5538 continue; 5539 } 5540 ceph_mdsc_get_request(req); 5541 mutex_unlock(&mdsc->mutex); 5542 5543 wait_for_completion_timeout(&req->r_safe_completion, 5544 max_t(long, drain_deadline - jiffies, 1)); 5545 5546 mutex_lock(&mdsc->mutex); 5547 ceph_mdsc_put_request(req); 5548 if (time_after(jiffies, drain_deadline)) 5549 break; 5550 rn = rb_first(&mdsc->request_tree); 5551 } 5552 mutex_unlock(&mdsc->mutex); 5553 5554 if (time_after_eq(jiffies, drain_deadline)) 5555 WRITE_ONCE(st->drain_timed_out, true); 5556 } 5557 5558 ceph_flush_dirty_caps(mdsc); 5559 ceph_flush_cap_releases(mdsc); 5560 5561 spin_lock(&mdsc->cap_dirty_lock); 5562 if (!list_empty(&mdsc->cap_flush_list)) { 5563 struct ceph_cap_flush *cf = 5564 list_last_entry(&mdsc->cap_flush_list, 5565 struct ceph_cap_flush, g_list); 5566 u64 want_flush = mdsc->last_cap_flush_tid; 5567 long drain_ret; 5568 5569 /* 5570 * Setting wake on the last entry is sufficient: flush 5571 * entries complete in order, so when this entry finishes 5572 * all earlier ones are already done. 
5573 */ 5574 cf->wake = true; 5575 spin_unlock(&mdsc->cap_dirty_lock); 5576 pr_info_client(cl, 5577 "draining (want_flush=%llu, %d sessions)\n", 5578 want_flush, n); 5579 drain_ret = wait_event_timeout(mdsc->cap_flushing_wq, 5580 check_caps_flush(mdsc, 5581 want_flush), 5582 max_t(long, 5583 drain_deadline - jiffies, 5584 1)); 5585 if (drain_ret == 0) { 5586 pr_info_client(cl, 5587 "drain timed out, proceeding with forced teardown\n"); 5588 WRITE_ONCE(st->drain_timed_out, true); 5589 } else { 5590 pr_info_client(cl, "drain completed successfully\n"); 5591 } 5592 } else { 5593 spin_unlock(&mdsc->cap_dirty_lock); 5594 } 5595 5596 spin_lock(&st->lock); 5597 if (st->shutdown) { 5598 spin_unlock(&st->lock); 5599 goto out_sessions; 5600 } 5601 st->phase = CEPH_CLIENT_RESET_TEARDOWN; 5602 spin_unlock(&st->lock); 5603 5604 /* 5605 * Ask each MDS to close the session before we tear it down 5606 * locally. Without this the MDS sees only a connection drop and 5607 * waits for the client to reconnect (up to session_autoclose 5608 * seconds) before evicting the session and releasing locks. 5609 * 5610 * Reuse the normal close machinery so the session state/sequence 5611 * snapshot is serialized under s_mutex and a racing s_seq bump 5612 * retransmits REQUEST_CLOSE while the session remains CLOSING. 5613 * We send all close requests first, then yield briefly to let the 5614 * network stack transmit them before __unregister_session() 5615 * closes the connections. 5616 */ 5617 for (i = 0; i < n; i++) { 5618 int err; 5619 5620 mutex_lock(&sessions[i]->s_mutex); 5621 err = __close_session(mdsc, sessions[i]); 5622 mutex_unlock(&sessions[i]->s_mutex); 5623 if (err < 0) 5624 pr_warn_client(cl, 5625 "mds%d failed to queue close request before reset: %d\n", 5626 sessions[i]->s_mds, err); 5627 } 5628 /* 5629 * Best-effort grace period: yield briefly so the network stack 5630 * can transmit the queued REQUEST_CLOSE messages before we tear 5631 * down connections. 
Not a correctness requirement -- the MDS 5632 * will still evict via session_autoclose if it never receives 5633 * the close request. 5634 * 5635 * Event-based waiting is not viable here: there is no completion 5636 * event for "message left the NIC," and waiting for the MDS 5637 * SESSION_CLOSE response would re-create the stalemate that the 5638 * reset is meant to break. 5639 */ 5640 if (n > 0) 5641 msleep(CEPH_CLIENT_RESET_CLOSE_GRACE_MS); 5642 5643 /* 5644 * Tear down each session: close the connection, remove all 5645 * caps, clean up requests, then kick pending requests so they 5646 * re-open a fresh session on the next attempt. 5647 * 5648 * This is modeled on the check_new_map() forced-close path 5649 * for stopped MDS ranks - a proven pattern for hard session 5650 * teardown. We do NOT attempt send_mds_reconnect() because 5651 * the MDS only accepts reconnects during its own RECONNECT 5652 * phase (after MDS restart), not from an active client. 5653 * 5654 * Any state that did not drain (caps that didn't flush, unsafe 5655 * requests that the MDS didn't journal) is force-dropped here. 5656 * This is intentional: that state is stuck and is the reason 5657 * the operator triggered the reset. 
5658 */ 5659 for (i = 0; i < n; i++) { 5660 int mds = sessions[i]->s_mds; 5661 5662 pr_info_client(cl, "mds%d resetting session\n", mds); 5663 5664 mutex_lock(&mdsc->mutex); 5665 if (mds >= mdsc->max_sessions || 5666 mdsc->sessions[mds] != sessions[i]) { 5667 pr_info_client(cl, 5668 "mds%d session already torn down, skipping\n", 5669 mds); 5670 mutex_unlock(&mdsc->mutex); 5671 ceph_put_mds_session(sessions[i]); 5672 sessions[i] = NULL; 5673 continue; 5674 } 5675 sessions[i]->s_state = CEPH_MDS_SESSION_CLOSED; 5676 __unregister_session(mdsc, sessions[i]); 5677 __wake_requests(mdsc, &sessions[i]->s_waiting); 5678 mutex_unlock(&mdsc->mutex); 5679 5680 mutex_lock(&sessions[i]->s_mutex); 5681 cleanup_session_requests(mdsc, sessions[i]); 5682 remove_session_caps(sessions[i]); 5683 mutex_unlock(&sessions[i]->s_mutex); 5684 5685 wake_up_all(&mdsc->session_close_wq); 5686 5687 ceph_put_mds_session(sessions[i]); 5688 5689 mutex_lock(&mdsc->mutex); 5690 kick_requests(mdsc, mds); 5691 mutex_unlock(&mdsc->mutex); 5692 5693 torn_down++; 5694 pr_info_client(cl, "mds%d session reset complete\n", mds); 5695 } 5696 5697 kfree(sessions); 5698 5699 spin_lock(&st->lock); 5700 st->sessions_reset = torn_down; 5701 spin_unlock(&st->lock); 5702 5703 out_complete: 5704 ceph_mdsc_reset_complete(mdsc, ret); 5705 return; 5706 5707 out_sessions: 5708 /* shutdown == true: ceph_mdsc_destroy() owns the final transition. */ 5709 for (i = 0; i < n; i++) 5710 ceph_put_mds_session(sessions[i]); 5711 kfree(sessions); 5712 } 5713 5714 int ceph_mdsc_schedule_reset(struct ceph_mds_client *mdsc, 5715 const char *reason) 5716 { 5717 struct ceph_client_reset_state *st = &mdsc->reset_state; 5718 struct ceph_fs_client *fsc = mdsc->fsc; 5719 const char *msg = (reason && reason[0]) ? 
reason : "manual"; 5720 int mount_state; 5721 5722 mount_state = READ_ONCE(fsc->mount_state); 5723 if (mount_state != CEPH_MOUNT_MOUNTED) { 5724 pr_warn_client(fsc->client, 5725 "reset rejected: mount_state=%d (not mounted)\n", 5726 mount_state); 5727 return -EINVAL; 5728 } 5729 5730 spin_lock(&st->lock); 5731 if (st->phase != CEPH_CLIENT_RESET_IDLE) { 5732 spin_unlock(&st->lock); 5733 return -EBUSY; 5734 } 5735 5736 st->phase = CEPH_CLIENT_RESET_QUIESCING; 5737 st->last_start = jiffies; 5738 st->last_errno = 0; 5739 st->drain_timed_out = false; 5740 st->sessions_reset = 0; 5741 st->trigger_count++; 5742 strscpy(st->last_reason, msg, sizeof(st->last_reason)); 5743 spin_unlock(&st->lock); 5744 5745 if (WARN_ON_ONCE(!queue_work(system_unbound_wq, &mdsc->reset_work))) { 5746 spin_lock(&st->lock); 5747 st->phase = CEPH_CLIENT_RESET_IDLE; 5748 st->last_errno = -EALREADY; 5749 st->last_finish = jiffies; 5750 st->failure_count++; 5751 spin_unlock(&st->lock); 5752 wake_up_all(&st->blocked_wq); 5753 return -EALREADY; 5754 } 5755 5756 pr_info_client(mdsc->fsc->client, 5757 "manual session reset scheduled (reason=\"%s\")\n", 5758 msg); 5759 trace_ceph_client_reset_schedule(mdsc, msg); 5760 return 0; 5761 } 5762 5763 5764 /* 5765 * compare old and new mdsmaps, kicking requests 5766 * and closing out old connections as necessary 5767 * 5768 * called under mdsc->mutex. 
5769 */ 5770 static void check_new_map(struct ceph_mds_client *mdsc, 5771 struct ceph_mdsmap *newmap, 5772 struct ceph_mdsmap *oldmap) 5773 { 5774 int i, j, err; 5775 int oldstate, newstate; 5776 struct ceph_mds_session *s; 5777 unsigned long targets[DIV_ROUND_UP(CEPH_MAX_MDS, sizeof(unsigned long))] = {0}; 5778 struct ceph_client *cl = mdsc->fsc->client; 5779 5780 doutc(cl, "new %u old %u\n", newmap->m_epoch, oldmap->m_epoch); 5781 5782 if (newmap->m_info) { 5783 for (i = 0; i < newmap->possible_max_rank; i++) { 5784 for (j = 0; j < newmap->m_info[i].num_export_targets; j++) 5785 set_bit(newmap->m_info[i].export_targets[j], targets); 5786 } 5787 } 5788 5789 for (i = 0; i < oldmap->possible_max_rank && i < mdsc->max_sessions; i++) { 5790 if (!mdsc->sessions[i]) 5791 continue; 5792 s = mdsc->sessions[i]; 5793 oldstate = ceph_mdsmap_get_state(oldmap, i); 5794 newstate = ceph_mdsmap_get_state(newmap, i); 5795 5796 doutc(cl, "mds%d state %s%s -> %s%s (session %s)\n", 5797 i, ceph_mds_state_name(oldstate), 5798 ceph_mdsmap_is_laggy(oldmap, i) ? " (laggy)" : "", 5799 ceph_mds_state_name(newstate), 5800 ceph_mdsmap_is_laggy(newmap, i) ? 
" (laggy)" : "", 5801 ceph_session_state_name(s->s_state)); 5802 5803 if (i >= newmap->possible_max_rank) { 5804 /* force close session for stopped mds */ 5805 ceph_get_mds_session(s); 5806 __unregister_session(mdsc, s); 5807 __wake_requests(mdsc, &s->s_waiting); 5808 mutex_unlock(&mdsc->mutex); 5809 5810 mutex_lock(&s->s_mutex); 5811 cleanup_session_requests(mdsc, s); 5812 remove_session_caps(s); 5813 mutex_unlock(&s->s_mutex); 5814 5815 ceph_put_mds_session(s); 5816 5817 mutex_lock(&mdsc->mutex); 5818 kick_requests(mdsc, i); 5819 continue; 5820 } 5821 5822 if (memcmp(ceph_mdsmap_get_addr(oldmap, i), 5823 ceph_mdsmap_get_addr(newmap, i), 5824 sizeof(struct ceph_entity_addr))) { 5825 /* just close it */ 5826 mutex_unlock(&mdsc->mutex); 5827 mutex_lock(&s->s_mutex); 5828 mutex_lock(&mdsc->mutex); 5829 ceph_con_close(&s->s_con); 5830 mutex_unlock(&s->s_mutex); 5831 s->s_state = CEPH_MDS_SESSION_RESTARTING; 5832 } else if (oldstate == newstate) { 5833 continue; /* nothing new with this mds */ 5834 } 5835 5836 /* 5837 * send reconnect? 5838 */ 5839 if (s->s_state == CEPH_MDS_SESSION_RESTARTING && 5840 newstate >= CEPH_MDS_STATE_RECONNECT) { 5841 int rc; 5842 5843 mutex_unlock(&mdsc->mutex); 5844 clear_bit(i, targets); 5845 rc = send_mds_reconnect(mdsc, s); 5846 if (rc) 5847 pr_warn_client(cl, 5848 "mds%d reconnect failed: %d\n", 5849 i, rc); 5850 mutex_lock(&mdsc->mutex); 5851 } 5852 5853 /* 5854 * kick request on any mds that has gone active. 
5855 */ 5856 if (oldstate < CEPH_MDS_STATE_ACTIVE && 5857 newstate >= CEPH_MDS_STATE_ACTIVE) { 5858 if (oldstate != CEPH_MDS_STATE_CREATING && 5859 oldstate != CEPH_MDS_STATE_STARTING) 5860 pr_info_client(cl, "mds%d recovery completed\n", 5861 s->s_mds); 5862 kick_requests(mdsc, i); 5863 mutex_unlock(&mdsc->mutex); 5864 mutex_lock(&s->s_mutex); 5865 mutex_lock(&mdsc->mutex); 5866 ceph_kick_flushing_caps(mdsc, s); 5867 mutex_unlock(&s->s_mutex); 5868 wake_up_session_caps(s, RECONNECT); 5869 } 5870 } 5871 5872 /* 5873 * Only open and reconnect sessions that don't exist yet. 5874 */ 5875 for (i = 0; i < newmap->possible_max_rank; i++) { 5876 /* 5877 * In case the import MDS is crashed just after 5878 * the EImportStart journal is flushed, so when 5879 * a standby MDS takes over it and is replaying 5880 * the EImportStart journal the new MDS daemon 5881 * will wait the client to reconnect it, but the 5882 * client may never register/open the session yet. 5883 * 5884 * Will try to reconnect that MDS daemon if the 5885 * rank number is in the export targets array and 5886 * is the up:reconnect state. 5887 */ 5888 newstate = ceph_mdsmap_get_state(newmap, i); 5889 if (!test_bit(i, targets) || newstate != CEPH_MDS_STATE_RECONNECT) 5890 continue; 5891 5892 /* 5893 * The session maybe registered and opened by some 5894 * requests which were choosing random MDSes during 5895 * the mdsc->mutex's unlock/lock gap below in rare 5896 * case. But the related MDS daemon will just queue 5897 * that requests and be still waiting for the client's 5898 * reconnection request in up:reconnect state. 
5899 */ 5900 s = __ceph_lookup_mds_session(mdsc, i); 5901 if (likely(!s)) { 5902 s = __open_export_target_session(mdsc, i); 5903 if (IS_ERR(s)) { 5904 err = PTR_ERR(s); 5905 pr_err_client(cl, 5906 "failed to open export target session, err %d\n", 5907 err); 5908 continue; 5909 } 5910 } 5911 doutc(cl, "send reconnect to export target mds.%d\n", i); 5912 mutex_unlock(&mdsc->mutex); 5913 err = send_mds_reconnect(mdsc, s); 5914 if (err) 5915 pr_warn_client(cl, 5916 "mds%d export target reconnect failed: %d\n", 5917 i, err); 5918 ceph_put_mds_session(s); 5919 mutex_lock(&mdsc->mutex); 5920 } 5921 5922 for (i = 0; i < newmap->possible_max_rank && i < mdsc->max_sessions; i++) { 5923 s = mdsc->sessions[i]; 5924 if (!s) 5925 continue; 5926 if (!ceph_mdsmap_is_laggy(newmap, i)) 5927 continue; 5928 if (s->s_state == CEPH_MDS_SESSION_OPEN || 5929 s->s_state == CEPH_MDS_SESSION_HUNG || 5930 s->s_state == CEPH_MDS_SESSION_CLOSING) { 5931 doutc(cl, " connecting to export targets of laggy mds%d\n", i); 5932 __open_export_target_sessions(mdsc, s); 5933 } 5934 } 5935 } 5936 5937 5938 5939 /* 5940 * leases 5941 */ 5942 5943 /* 5944 * caller must hold session s_mutex, dentry->d_lock 5945 */ 5946 void __ceph_mdsc_drop_dentry_lease(struct dentry *dentry) 5947 { 5948 struct ceph_dentry_info *di = ceph_dentry(dentry); 5949 5950 ceph_put_mds_session(di->lease_session); 5951 di->lease_session = NULL; 5952 } 5953 5954 static void handle_lease(struct ceph_mds_client *mdsc, 5955 struct ceph_mds_session *session, 5956 struct ceph_msg *msg) 5957 { 5958 struct ceph_client *cl = mdsc->fsc->client; 5959 struct super_block *sb = mdsc->fsc->sb; 5960 struct inode *inode; 5961 struct dentry *parent, *dentry; 5962 struct ceph_dentry_info *di; 5963 int mds = session->s_mds; 5964 struct ceph_mds_lease *h = msg->front.iov_base; 5965 u32 seq; 5966 struct ceph_vino vino; 5967 struct qstr dname; 5968 int release = 0; 5969 5970 doutc(cl, "from mds%d\n", mds); 5971 5972 if (!ceph_inc_mds_stopping_blocker(mdsc, 
session)) 5973 return; 5974 5975 /* decode */ 5976 if (msg->front.iov_len < sizeof(*h) + sizeof(u32)) 5977 goto bad; 5978 vino.ino = le64_to_cpu(h->ino); 5979 vino.snap = CEPH_NOSNAP; 5980 seq = le32_to_cpu(h->seq); 5981 dname.len = get_unaligned_le32(h + 1); 5982 if (msg->front.iov_len < sizeof(*h) + sizeof(u32) + dname.len) 5983 goto bad; 5984 dname.name = (void *)(h + 1) + sizeof(u32); 5985 5986 /* lookup inode */ 5987 inode = ceph_find_inode(sb, vino); 5988 doutc(cl, "%s, ino %llx %p %.*s\n", ceph_lease_op_name(h->action), 5989 vino.ino, inode, dname.len, dname.name); 5990 5991 mutex_lock(&session->s_mutex); 5992 if (!inode) { 5993 doutc(cl, "no inode %llx\n", vino.ino); 5994 goto release; 5995 } 5996 5997 /* dentry */ 5998 parent = d_find_alias(inode); 5999 if (!parent) { 6000 doutc(cl, "no parent dentry on inode %p\n", inode); 6001 WARN_ON(1); 6002 goto release; /* hrm... */ 6003 } 6004 dname.hash = full_name_hash(parent, dname.name, dname.len); 6005 dentry = d_lookup(parent, &dname); 6006 dput(parent); 6007 if (!dentry) 6008 goto release; 6009 6010 spin_lock(&dentry->d_lock); 6011 di = ceph_dentry(dentry); 6012 switch (h->action) { 6013 case CEPH_MDS_LEASE_REVOKE: 6014 if (di->lease_session == session) { 6015 if (ceph_seq_cmp(di->lease_seq, seq) > 0) 6016 h->seq = cpu_to_le32(di->lease_seq); 6017 __ceph_mdsc_drop_dentry_lease(dentry); 6018 } 6019 release = 1; 6020 break; 6021 6022 case CEPH_MDS_LEASE_RENEW: 6023 if (di->lease_session == session && 6024 di->lease_gen == atomic_read(&session->s_cap_gen) && 6025 di->lease_renew_from && 6026 di->lease_renew_after == 0) { 6027 unsigned long duration = 6028 msecs_to_jiffies(le32_to_cpu(h->duration_ms)); 6029 6030 di->lease_seq = seq; 6031 di->time = di->lease_renew_from + duration; 6032 di->lease_renew_after = di->lease_renew_from + 6033 (duration >> 1); 6034 di->lease_renew_from = 0; 6035 } 6036 break; 6037 } 6038 spin_unlock(&dentry->d_lock); 6039 dput(dentry); 6040 6041 if (!release) 6042 goto out; 6043 6044 
release: 6045 /* let's just reuse the same message */ 6046 h->action = CEPH_MDS_LEASE_REVOKE_ACK; 6047 ceph_msg_get(msg); 6048 ceph_con_send(&session->s_con, msg); 6049 6050 out: 6051 mutex_unlock(&session->s_mutex); 6052 iput(inode); 6053 6054 ceph_dec_mds_stopping_blocker(mdsc); 6055 return; 6056 6057 bad: 6058 ceph_dec_mds_stopping_blocker(mdsc); 6059 6060 pr_err_client(cl, "corrupt lease message\n"); 6061 ceph_msg_dump(msg); 6062 } 6063 6064 void ceph_mdsc_lease_send_msg(struct ceph_mds_session *session, 6065 struct dentry *dentry, char action, 6066 u32 seq) 6067 { 6068 struct ceph_client *cl = session->s_mdsc->fsc->client; 6069 struct ceph_msg *msg; 6070 struct ceph_mds_lease *lease; 6071 struct inode *dir; 6072 int len = sizeof(*lease) + sizeof(u32) + NAME_MAX; 6073 6074 doutc(cl, "identry %p %s to mds%d\n", dentry, ceph_lease_op_name(action), 6075 session->s_mds); 6076 6077 msg = ceph_msg_new(CEPH_MSG_CLIENT_LEASE, len, GFP_NOFS, false); 6078 if (!msg) 6079 return; 6080 lease = msg->front.iov_base; 6081 lease->action = action; 6082 lease->seq = cpu_to_le32(seq); 6083 6084 spin_lock(&dentry->d_lock); 6085 dir = d_inode(dentry->d_parent); 6086 lease->ino = cpu_to_le64(ceph_ino(dir)); 6087 lease->first = lease->last = cpu_to_le64(ceph_snap(dir)); 6088 6089 put_unaligned_le32(dentry->d_name.len, lease + 1); 6090 memcpy((void *)(lease + 1) + 4, 6091 dentry->d_name.name, dentry->d_name.len); 6092 spin_unlock(&dentry->d_lock); 6093 6094 ceph_con_send(&session->s_con, msg); 6095 } 6096 6097 /* 6098 * lock unlock the session, to wait ongoing session activities 6099 */ 6100 static void lock_unlock_session(struct ceph_mds_session *s) 6101 { 6102 mutex_lock(&s->s_mutex); 6103 mutex_unlock(&s->s_mutex); 6104 } 6105 6106 static void maybe_recover_session(struct ceph_mds_client *mdsc) 6107 { 6108 struct ceph_client *cl = mdsc->fsc->client; 6109 struct ceph_fs_client *fsc = mdsc->fsc; 6110 6111 if (!ceph_test_mount_opt(fsc, CLEANRECOVER)) 6112 return; 6113 6114 if 
(READ_ONCE(fsc->mount_state) != CEPH_MOUNT_MOUNTED) 6115 return; 6116 6117 if (!READ_ONCE(fsc->blocklisted)) 6118 return; 6119 6120 pr_info_client(cl, "auto reconnect after blocklisted\n"); 6121 ceph_force_reconnect(fsc->sb); 6122 } 6123 6124 bool check_session_state(struct ceph_mds_session *s) 6125 { 6126 struct ceph_client *cl = s->s_mdsc->fsc->client; 6127 6128 switch (s->s_state) { 6129 case CEPH_MDS_SESSION_OPEN: 6130 if (s->s_ttl && time_after(jiffies, s->s_ttl)) { 6131 s->s_state = CEPH_MDS_SESSION_HUNG; 6132 pr_info_client(cl, "mds%d hung\n", s->s_mds); 6133 } 6134 break; 6135 case CEPH_MDS_SESSION_CLOSING: 6136 case CEPH_MDS_SESSION_NEW: 6137 case CEPH_MDS_SESSION_RESTARTING: 6138 case CEPH_MDS_SESSION_CLOSED: 6139 case CEPH_MDS_SESSION_REJECTED: 6140 return false; 6141 } 6142 6143 return true; 6144 } 6145 6146 /* 6147 * If the sequence is incremented while we're waiting on a REQUEST_CLOSE reply, 6148 * then we need to retransmit that request. 6149 */ 6150 void inc_session_sequence(struct ceph_mds_session *s) 6151 { 6152 struct ceph_client *cl = s->s_mdsc->fsc->client; 6153 6154 lockdep_assert_held(&s->s_mutex); 6155 6156 s->s_seq++; 6157 6158 if (s->s_state == CEPH_MDS_SESSION_CLOSING) { 6159 int ret; 6160 6161 doutc(cl, "resending session close request for mds%d\n", s->s_mds); 6162 ret = request_close_session(s); 6163 if (ret < 0) 6164 pr_err_client(cl, "unable to close session to mds%d: %d\n", 6165 s->s_mds, ret); 6166 } 6167 } 6168 6169 /* 6170 * delayed work -- periodically trim expired leases, renew caps with mds. If 6171 * the @delay parameter is set to 0 or if it's more than 5 secs, the default 6172 * workqueue delay value of 5 secs will be used. 
6173 */ 6174 static void schedule_delayed(struct ceph_mds_client *mdsc, unsigned long delay) 6175 { 6176 unsigned long max_delay = HZ * 5; 6177 6178 /* 5 secs default delay */ 6179 if (!delay || (delay > max_delay)) 6180 delay = max_delay; 6181 schedule_delayed_work(&mdsc->delayed_work, 6182 round_jiffies_relative(delay)); 6183 } 6184 6185 static void delayed_work(struct work_struct *work) 6186 { 6187 struct ceph_mds_client *mdsc = 6188 container_of(work, struct ceph_mds_client, delayed_work.work); 6189 unsigned long delay; 6190 int renew_interval; 6191 int renew_caps; 6192 int i; 6193 6194 doutc(mdsc->fsc->client, "mdsc delayed_work\n"); 6195 6196 if (mdsc->stopping >= CEPH_MDSC_STOPPING_FLUSHED) 6197 return; 6198 6199 mutex_lock(&mdsc->mutex); 6200 renew_interval = mdsc->mdsmap->m_session_timeout >> 2; 6201 renew_caps = time_after_eq(jiffies, HZ*renew_interval + 6202 mdsc->last_renew_caps); 6203 if (renew_caps) 6204 mdsc->last_renew_caps = jiffies; 6205 6206 for (i = 0; i < mdsc->max_sessions; i++) { 6207 struct ceph_mds_session *s = __ceph_lookup_mds_session(mdsc, i); 6208 if (!s) 6209 continue; 6210 6211 if (!check_session_state(s)) { 6212 ceph_put_mds_session(s); 6213 continue; 6214 } 6215 mutex_unlock(&mdsc->mutex); 6216 6217 ceph_flush_session_cap_releases(mdsc, s); 6218 6219 mutex_lock(&s->s_mutex); 6220 if (renew_caps) 6221 send_renew_caps(mdsc, s); 6222 else 6223 ceph_con_keepalive(&s->s_con); 6224 if (s->s_state == CEPH_MDS_SESSION_OPEN || 6225 s->s_state == CEPH_MDS_SESSION_HUNG) 6226 ceph_send_cap_releases(mdsc, s); 6227 mutex_unlock(&s->s_mutex); 6228 ceph_put_mds_session(s); 6229 6230 mutex_lock(&mdsc->mutex); 6231 } 6232 mutex_unlock(&mdsc->mutex); 6233 6234 delay = ceph_check_delayed_caps(mdsc); 6235 6236 ceph_queue_cap_reclaim_work(mdsc); 6237 6238 ceph_trim_snapid_map(mdsc); 6239 6240 maybe_recover_session(mdsc); 6241 6242 schedule_delayed(mdsc, delay); 6243 } 6244 6245 int ceph_mdsc_init(struct ceph_fs_client *fsc) 6246 6247 { 6248 struct 
ceph_mds_client *mdsc; 6249 int err; 6250 6251 mdsc = kzalloc_obj(struct ceph_mds_client, GFP_NOFS); 6252 if (!mdsc) 6253 return -ENOMEM; 6254 mdsc->fsc = fsc; 6255 mutex_init(&mdsc->mutex); 6256 mdsc->mdsmap = kzalloc_obj(*mdsc->mdsmap, GFP_NOFS); 6257 if (!mdsc->mdsmap) { 6258 err = -ENOMEM; 6259 goto err_mdsc; 6260 } 6261 6262 init_completion(&mdsc->safe_umount_waiters); 6263 spin_lock_init(&mdsc->stopping_lock); 6264 atomic_set(&mdsc->stopping_blockers, 0); 6265 init_completion(&mdsc->stopping_waiter); 6266 atomic64_set(&mdsc->dirty_folios, 0); 6267 init_waitqueue_head(&mdsc->flush_end_wq); 6268 init_waitqueue_head(&mdsc->session_close_wq); 6269 INIT_LIST_HEAD(&mdsc->waiting_for_map); 6270 mdsc->quotarealms_inodes = RB_ROOT; 6271 mutex_init(&mdsc->quotarealms_inodes_mutex); 6272 init_rwsem(&mdsc->snap_rwsem); 6273 mdsc->snap_realms = RB_ROOT; 6274 INIT_LIST_HEAD(&mdsc->snap_empty); 6275 spin_lock_init(&mdsc->snap_empty_lock); 6276 mdsc->request_tree = RB_ROOT; 6277 INIT_DELAYED_WORK(&mdsc->delayed_work, delayed_work); 6278 mdsc->last_renew_caps = jiffies; 6279 INIT_LIST_HEAD(&mdsc->cap_delay_list); 6280 #ifdef CONFIG_DEBUG_FS 6281 INIT_LIST_HEAD(&mdsc->cap_wait_list); 6282 #endif 6283 spin_lock_init(&mdsc->cap_delay_lock); 6284 INIT_LIST_HEAD(&mdsc->cap_unlink_delay_list); 6285 INIT_LIST_HEAD(&mdsc->snap_flush_list); 6286 spin_lock_init(&mdsc->snap_flush_lock); 6287 mdsc->last_cap_flush_tid = 1; 6288 INIT_LIST_HEAD(&mdsc->cap_flush_list); 6289 INIT_LIST_HEAD(&mdsc->cap_dirty_migrating); 6290 spin_lock_init(&mdsc->cap_dirty_lock); 6291 init_waitqueue_head(&mdsc->cap_flushing_wq); 6292 INIT_WORK(&mdsc->cap_reclaim_work, ceph_cap_reclaim_work); 6293 INIT_WORK(&mdsc->cap_unlink_work, ceph_cap_unlink_work); 6294 err = ceph_metric_init(&mdsc->metric); 6295 if (err) 6296 goto err_mdsmap; 6297 ceph_subvolume_metrics_init(&mdsc->subvol_metrics); 6298 mutex_init(&mdsc->subvol_metrics_last_mutex); 6299 mdsc->subvol_metrics_last = NULL; 6300 mdsc->subvol_metrics_last_nr = 
0; 6301 mdsc->subvol_metrics_sent = 0; 6302 mdsc->subvol_metrics_nonzero_sends = 0; 6303 6304 spin_lock_init(&mdsc->dentry_list_lock); 6305 INIT_LIST_HEAD(&mdsc->dentry_leases); 6306 INIT_LIST_HEAD(&mdsc->dentry_dir_leases); 6307 6308 spin_lock_init(&mdsc->reset_state.lock); 6309 init_waitqueue_head(&mdsc->reset_state.blocked_wq); 6310 atomic_set(&mdsc->reset_state.blocked_requests, 0); 6311 INIT_WORK(&mdsc->reset_work, ceph_mdsc_reset_workfn); 6312 6313 ceph_caps_init(mdsc); 6314 ceph_adjust_caps_max_min(mdsc, fsc->mount_options); 6315 6316 spin_lock_init(&mdsc->snapid_map_lock); 6317 mdsc->snapid_map_tree = RB_ROOT; 6318 INIT_LIST_HEAD(&mdsc->snapid_map_lru); 6319 6320 init_rwsem(&mdsc->pool_perm_rwsem); 6321 mdsc->pool_perm_tree = RB_ROOT; 6322 6323 strscpy(mdsc->nodename, utsname()->nodename, 6324 sizeof(mdsc->nodename)); 6325 6326 fsc->mdsc = mdsc; 6327 return 0; 6328 6329 err_mdsmap: 6330 kfree(mdsc->mdsmap); 6331 err_mdsc: 6332 kfree(mdsc); 6333 return err; 6334 } 6335 6336 /* 6337 * Wait for safe replies on open mds requests. If we time out, drop 6338 * all requests from the tree to avoid dangling dentry refs. 
6339 */ 6340 static void wait_requests(struct ceph_mds_client *mdsc) 6341 { 6342 struct ceph_client *cl = mdsc->fsc->client; 6343 struct ceph_options *opts = mdsc->fsc->client->options; 6344 struct ceph_mds_request *req; 6345 6346 mutex_lock(&mdsc->mutex); 6347 if (__get_oldest_req(mdsc)) { 6348 mutex_unlock(&mdsc->mutex); 6349 6350 doutc(cl, "waiting for requests\n"); 6351 wait_for_completion_timeout(&mdsc->safe_umount_waiters, 6352 ceph_timeout_jiffies(opts->mount_timeout)); 6353 6354 /* tear down remaining requests */ 6355 mutex_lock(&mdsc->mutex); 6356 while ((req = __get_oldest_req(mdsc))) { 6357 doutc(cl, "timed out on tid %llu\n", req->r_tid); 6358 list_del_init(&req->r_wait); 6359 __unregister_request(mdsc, req); 6360 } 6361 } 6362 mutex_unlock(&mdsc->mutex); 6363 doutc(cl, "done\n"); 6364 } 6365 6366 void send_flush_mdlog(struct ceph_mds_session *s) 6367 { 6368 struct ceph_client *cl = s->s_mdsc->fsc->client; 6369 struct ceph_msg *msg; 6370 6371 /* 6372 * Pre-luminous MDS crashes when it sees an unknown session request 6373 */ 6374 if (!CEPH_HAVE_FEATURE(s->s_con.peer_features, SERVER_LUMINOUS)) 6375 return; 6376 6377 mutex_lock(&s->s_mutex); 6378 doutc(cl, "request mdlog flush to mds%d (%s)s seq %lld\n", 6379 s->s_mds, ceph_session_state_name(s->s_state), s->s_seq); 6380 msg = ceph_create_session_msg(CEPH_SESSION_REQUEST_FLUSH_MDLOG, 6381 s->s_seq); 6382 if (!msg) { 6383 pr_err_client(cl, "failed to request mdlog flush to mds%d (%s) seq %lld\n", 6384 s->s_mds, ceph_session_state_name(s->s_state), s->s_seq); 6385 } else { 6386 ceph_con_send(&s->s_con, msg); 6387 } 6388 mutex_unlock(&s->s_mutex); 6389 } 6390 6391 static int ceph_mds_auth_match(struct ceph_mds_client *mdsc, 6392 struct ceph_mds_cap_auth *auth, 6393 const struct cred *cred, 6394 char *tpath) 6395 { 6396 u32 caller_uid = from_kuid(&init_user_ns, cred->fsuid); 6397 u32 caller_gid = from_kgid(&init_user_ns, cred->fsgid); 6398 struct ceph_client *cl = mdsc->fsc->client; 6399 const char *fs_name = 
mdsc->mdsmap->m_fs_name; 6400 const char *spath = mdsc->fsc->mount_options->server_path; 6401 bool gid_matched = false; 6402 u32 gid, tlen, len; 6403 int i, j; 6404 6405 doutc(cl, "fsname check fs_name=%s match.fs_name=%s\n", 6406 fs_name, auth->match.fs_name ? auth->match.fs_name : ""); 6407 6408 if (!ceph_namespace_match(auth->match.fs_name, fs_name)) { 6409 /* fsname mismatch, try next one */ 6410 return 0; 6411 } 6412 6413 doutc(cl, "match.uid %lld\n", auth->match.uid); 6414 if (auth->match.uid != MDS_AUTH_UID_ANY) { 6415 if (auth->match.uid != caller_uid) 6416 return 0; 6417 if (auth->match.num_gids) { 6418 for (i = 0; i < auth->match.num_gids; i++) { 6419 if (caller_gid == auth->match.gids[i]) 6420 gid_matched = true; 6421 } 6422 if (!gid_matched && cred->group_info->ngroups) { 6423 for (i = 0; i < cred->group_info->ngroups; i++) { 6424 gid = from_kgid(&init_user_ns, 6425 cred->group_info->gid[i]); 6426 for (j = 0; j < auth->match.num_gids; j++) { 6427 if (gid == auth->match.gids[j]) { 6428 gid_matched = true; 6429 break; 6430 } 6431 } 6432 if (gid_matched) 6433 break; 6434 } 6435 } 6436 if (!gid_matched) 6437 return 0; 6438 } 6439 } 6440 6441 /* path match */ 6442 if (auth->match.path) { 6443 if (!tpath) 6444 return 0; 6445 6446 tlen = strlen(tpath); 6447 len = strlen(auth->match.path); 6448 if (len) { 6449 char *_tpath = tpath; 6450 bool free_tpath = false; 6451 int m, n; 6452 6453 doutc(cl, "server path %s, tpath %s, match.path %s\n", 6454 spath, tpath, auth->match.path); 6455 if (spath && (m = strlen(spath)) != 1) { 6456 /* mount path + '/' + tpath + an extra space */ 6457 n = m + 1 + tlen + 1; 6458 _tpath = kmalloc(n, GFP_NOFS); 6459 if (!_tpath) 6460 return -ENOMEM; 6461 /* remove the leading '/' */ 6462 snprintf(_tpath, n, "%s/%s", spath + 1, tpath); 6463 free_tpath = true; 6464 tlen = strlen(_tpath); 6465 } 6466 6467 /* 6468 * Please note the tailing '/' for match.path has already 6469 * been removed when parsing. 
6470 * 6471 * Remove the tailing '/' for the target path. 6472 */ 6473 while (tlen && _tpath[tlen - 1] == '/') { 6474 _tpath[tlen - 1] = '\0'; 6475 tlen -= 1; 6476 } 6477 doutc(cl, "_tpath %s\n", _tpath); 6478 6479 /* 6480 * In case first == _tpath && tlen == len: 6481 * match.path=/foo --> /foo _path=/foo --> match 6482 * match.path=/foo/ --> /foo _path=/foo --> match 6483 * 6484 * In case first == _tmatch.path && tlen > len: 6485 * match.path=/foo/ --> /foo _path=/foo/ --> match 6486 * match.path=/foo --> /foo _path=/foo/ --> match 6487 * match.path=/foo/ --> /foo _path=/foo/d --> match 6488 * match.path=/foo --> /foo _path=/food --> mismatch 6489 * 6490 * All the other cases --> mismatch 6491 */ 6492 bool path_matched = true; 6493 char *first = strstr(_tpath, auth->match.path); 6494 if (first != _tpath || 6495 (tlen > len && _tpath[len] != '/')) { 6496 path_matched = false; 6497 } 6498 6499 if (free_tpath) 6500 kfree(_tpath); 6501 6502 if (!path_matched) 6503 return 0; 6504 } 6505 } 6506 6507 doutc(cl, "matched\n"); 6508 return 1; 6509 } 6510 6511 int ceph_mds_check_access(struct ceph_mds_client *mdsc, char *tpath, int mask) 6512 { 6513 const struct cred *cred = get_current_cred(); 6514 u32 caller_uid = from_kuid(&init_user_ns, cred->fsuid); 6515 u32 caller_gid = from_kgid(&init_user_ns, cred->fsgid); 6516 struct ceph_mds_cap_auth *rw_perms_s = NULL; 6517 struct ceph_client *cl = mdsc->fsc->client; 6518 bool root_squash_perms = true; 6519 int i, err; 6520 6521 doutc(cl, "tpath '%s', mask %d, caller_uid %d, caller_gid %d\n", 6522 tpath, mask, caller_uid, caller_gid); 6523 6524 for (i = 0; i < mdsc->s_cap_auths_num; i++) { 6525 struct ceph_mds_cap_auth *s = &mdsc->s_cap_auths[i]; 6526 6527 err = ceph_mds_auth_match(mdsc, s, cred, tpath); 6528 if (err < 0) { 6529 put_cred(cred); 6530 return err; 6531 } else if (err > 0) { 6532 /* always follow the last auth caps' permission */ 6533 root_squash_perms = true; 6534 rw_perms_s = NULL; 6535 if ((mask & MAY_WRITE) && 
s->writeable && 6536 s->match.root_squash && (!caller_uid || !caller_gid)) 6537 root_squash_perms = false; 6538 6539 if (((mask & MAY_WRITE) && !s->writeable) || 6540 ((mask & MAY_READ) && !s->readable)) 6541 rw_perms_s = s; 6542 } 6543 } 6544 6545 put_cred(cred); 6546 6547 doutc(cl, "root_squash_perms %d, rw_perms_s %p\n", root_squash_perms, 6548 rw_perms_s); 6549 if (root_squash_perms && rw_perms_s == NULL) { 6550 doutc(cl, "access allowed\n"); 6551 return 0; 6552 } 6553 6554 if (!root_squash_perms) { 6555 doutc(cl, "root_squash is enabled and user(%d %d) isn't allowed to write", 6556 caller_uid, caller_gid); 6557 } 6558 if (rw_perms_s) { 6559 doutc(cl, "mds auth caps readable/writeable %d/%d while request r/w %d/%d", 6560 rw_perms_s->readable, rw_perms_s->writeable, 6561 !!(mask & MAY_READ), !!(mask & MAY_WRITE)); 6562 } 6563 doutc(cl, "access denied\n"); 6564 return -EACCES; 6565 } 6566 6567 /* 6568 * called before mount is ro, and before dentries are torn down. 6569 * (hmm, does this still race with new lookups?) 6570 */ 6571 void ceph_mdsc_pre_umount(struct ceph_mds_client *mdsc) 6572 { 6573 doutc(mdsc->fsc->client, "begin\n"); 6574 mdsc->stopping = CEPH_MDSC_STOPPING_BEGIN; 6575 6576 ceph_mdsc_iterate_sessions(mdsc, send_flush_mdlog, true); 6577 ceph_mdsc_iterate_sessions(mdsc, lock_unlock_session, false); 6578 ceph_flush_dirty_caps(mdsc); 6579 wait_requests(mdsc); 6580 6581 /* 6582 * wait for reply handlers to drop their request refs and 6583 * their inode/dcache refs 6584 */ 6585 ceph_msgr_flush(); 6586 6587 ceph_cleanup_quotarealms_inodes(mdsc); 6588 doutc(mdsc->fsc->client, "done\n"); 6589 } 6590 6591 /* 6592 * flush the mdlog and wait for all write mds requests to flush. 
6593 */ 6594 static void flush_mdlog_and_wait_mdsc_unsafe_requests(struct ceph_mds_client *mdsc, 6595 u64 want_tid) 6596 { 6597 struct ceph_client *cl = mdsc->fsc->client; 6598 struct ceph_mds_request *req = NULL, *nextreq; 6599 struct ceph_mds_session *last_session = NULL; 6600 struct rb_node *n; 6601 6602 mutex_lock(&mdsc->mutex); 6603 doutc(cl, "want %lld\n", want_tid); 6604 restart: 6605 req = __get_oldest_req(mdsc); 6606 while (req && req->r_tid <= want_tid) { 6607 /* find next request */ 6608 n = rb_next(&req->r_node); 6609 if (n) 6610 nextreq = rb_entry(n, struct ceph_mds_request, r_node); 6611 else 6612 nextreq = NULL; 6613 if (req->r_op != CEPH_MDS_OP_SETFILELOCK && 6614 (req->r_op & CEPH_MDS_OP_WRITE)) { 6615 struct ceph_mds_session *s = req->r_session; 6616 6617 if (!s) { 6618 req = nextreq; 6619 continue; 6620 } 6621 6622 /* write op */ 6623 ceph_mdsc_get_request(req); 6624 if (nextreq) 6625 ceph_mdsc_get_request(nextreq); 6626 s = ceph_get_mds_session(s); 6627 mutex_unlock(&mdsc->mutex); 6628 6629 /* send flush mdlog request to MDS */ 6630 if (last_session != s) { 6631 send_flush_mdlog(s); 6632 ceph_put_mds_session(last_session); 6633 last_session = s; 6634 } else { 6635 ceph_put_mds_session(s); 6636 } 6637 doutc(cl, "wait on %llu (want %llu)\n", 6638 req->r_tid, want_tid); 6639 wait_for_completion(&req->r_safe_completion); 6640 6641 mutex_lock(&mdsc->mutex); 6642 ceph_mdsc_put_request(req); 6643 if (!nextreq) 6644 break; /* next dne before, so we're done! 
*/ 6645 if (RB_EMPTY_NODE(&nextreq->r_node)) { 6646 /* next request was removed from tree */ 6647 ceph_mdsc_put_request(nextreq); 6648 goto restart; 6649 } 6650 ceph_mdsc_put_request(nextreq); /* won't go away */ 6651 } 6652 req = nextreq; 6653 } 6654 mutex_unlock(&mdsc->mutex); 6655 ceph_put_mds_session(last_session); 6656 doutc(cl, "done\n"); 6657 } 6658 6659 void ceph_mdsc_sync(struct ceph_mds_client *mdsc) 6660 { 6661 struct ceph_client *cl = mdsc->fsc->client; 6662 u64 want_tid, want_flush; 6663 6664 if (READ_ONCE(mdsc->fsc->mount_state) >= CEPH_MOUNT_SHUTDOWN) 6665 return; 6666 6667 doutc(cl, "sync\n"); 6668 mutex_lock(&mdsc->mutex); 6669 want_tid = mdsc->last_tid; 6670 mutex_unlock(&mdsc->mutex); 6671 6672 ceph_flush_dirty_caps(mdsc); 6673 ceph_flush_cap_releases(mdsc); 6674 spin_lock(&mdsc->cap_dirty_lock); 6675 want_flush = mdsc->last_cap_flush_tid; 6676 if (!list_empty(&mdsc->cap_flush_list)) { 6677 struct ceph_cap_flush *cf = 6678 list_last_entry(&mdsc->cap_flush_list, 6679 struct ceph_cap_flush, g_list); 6680 cf->wake = true; 6681 } 6682 spin_unlock(&mdsc->cap_dirty_lock); 6683 6684 doutc(cl, "sync want tid %lld flush_seq %lld\n", want_tid, want_flush); 6685 6686 flush_mdlog_and_wait_mdsc_unsafe_requests(mdsc, want_tid); 6687 wait_caps_flush(mdsc, want_flush); 6688 } 6689 6690 /* 6691 * true if all sessions are closed, or we force unmount 6692 */ 6693 static bool done_closing_sessions(struct ceph_mds_client *mdsc, int skipped) 6694 { 6695 if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_SHUTDOWN) 6696 return true; 6697 return atomic_read(&mdsc->num_sessions) <= skipped; 6698 } 6699 6700 /* 6701 * called after sb is ro or when metadata corrupted. 
6702 */ 6703 void ceph_mdsc_close_sessions(struct ceph_mds_client *mdsc) 6704 { 6705 struct ceph_options *opts = mdsc->fsc->client->options; 6706 struct ceph_client *cl = mdsc->fsc->client; 6707 struct ceph_mds_session *session; 6708 int i; 6709 int skipped = 0; 6710 6711 doutc(cl, "begin\n"); 6712 6713 /* close sessions */ 6714 mutex_lock(&mdsc->mutex); 6715 for (i = 0; i < mdsc->max_sessions; i++) { 6716 session = __ceph_lookup_mds_session(mdsc, i); 6717 if (!session) 6718 continue; 6719 mutex_unlock(&mdsc->mutex); 6720 mutex_lock(&session->s_mutex); 6721 if (__close_session(mdsc, session) <= 0) 6722 skipped++; 6723 mutex_unlock(&session->s_mutex); 6724 ceph_put_mds_session(session); 6725 mutex_lock(&mdsc->mutex); 6726 } 6727 mutex_unlock(&mdsc->mutex); 6728 6729 doutc(cl, "waiting for sessions to close\n"); 6730 wait_event_timeout(mdsc->session_close_wq, 6731 done_closing_sessions(mdsc, skipped), 6732 ceph_timeout_jiffies(opts->mount_timeout)); 6733 6734 /* tear down remaining sessions */ 6735 mutex_lock(&mdsc->mutex); 6736 for (i = 0; i < mdsc->max_sessions; i++) { 6737 if (mdsc->sessions[i]) { 6738 session = ceph_get_mds_session(mdsc->sessions[i]); 6739 __unregister_session(mdsc, session); 6740 mutex_unlock(&mdsc->mutex); 6741 mutex_lock(&session->s_mutex); 6742 remove_session_caps(session); 6743 mutex_unlock(&session->s_mutex); 6744 ceph_put_mds_session(session); 6745 mutex_lock(&mdsc->mutex); 6746 } 6747 } 6748 WARN_ON(!list_empty(&mdsc->cap_delay_list)); 6749 mutex_unlock(&mdsc->mutex); 6750 6751 ceph_cleanup_snapid_map(mdsc); 6752 ceph_cleanup_global_and_empty_realms(mdsc); 6753 6754 cancel_work_sync(&mdsc->cap_reclaim_work); 6755 cancel_work_sync(&mdsc->cap_unlink_work); 6756 cancel_delayed_work_sync(&mdsc->delayed_work); /* cancel timer */ 6757 6758 doutc(cl, "done\n"); 6759 } 6760 6761 void ceph_mdsc_force_umount(struct ceph_mds_client *mdsc) 6762 { 6763 struct ceph_mds_session *session; 6764 int mds; 6765 6766 doutc(mdsc->fsc->client, "force 
umount\n"); 6767 6768 mutex_lock(&mdsc->mutex); 6769 for (mds = 0; mds < mdsc->max_sessions; mds++) { 6770 session = __ceph_lookup_mds_session(mdsc, mds); 6771 if (!session) 6772 continue; 6773 6774 if (session->s_state == CEPH_MDS_SESSION_REJECTED) 6775 __unregister_session(mdsc, session); 6776 __wake_requests(mdsc, &session->s_waiting); 6777 mutex_unlock(&mdsc->mutex); 6778 6779 mutex_lock(&session->s_mutex); 6780 __close_session(mdsc, session); 6781 if (session->s_state == CEPH_MDS_SESSION_CLOSING) { 6782 cleanup_session_requests(mdsc, session); 6783 remove_session_caps(session); 6784 } 6785 mutex_unlock(&session->s_mutex); 6786 ceph_put_mds_session(session); 6787 6788 mutex_lock(&mdsc->mutex); 6789 kick_requests(mdsc, mds); 6790 } 6791 __wake_requests(mdsc, &mdsc->waiting_for_map); 6792 mutex_unlock(&mdsc->mutex); 6793 } 6794 6795 static void ceph_mdsc_stop(struct ceph_mds_client *mdsc) 6796 { 6797 doutc(mdsc->fsc->client, "stop\n"); 6798 /* 6799 * Make sure the delayed work stopped before releasing 6800 * the resources. 6801 * 6802 * Because the cancel_delayed_work_sync() will only 6803 * guarantee that the work finishes executing. But the 6804 * delayed work will re-arm itself again after that. 
6805 */ 6806 flush_delayed_work(&mdsc->delayed_work); 6807 6808 if (mdsc->mdsmap) 6809 ceph_mdsmap_destroy(mdsc->mdsmap); 6810 kfree(mdsc->sessions); 6811 ceph_caps_finalize(mdsc); 6812 6813 if (mdsc->s_cap_auths) { 6814 int i; 6815 6816 for (i = 0; i < mdsc->s_cap_auths_num; i++) { 6817 kfree(mdsc->s_cap_auths[i].match.gids); 6818 kfree(mdsc->s_cap_auths[i].match.path); 6819 kfree(mdsc->s_cap_auths[i].match.fs_name); 6820 } 6821 kfree(mdsc->s_cap_auths); 6822 } 6823 6824 ceph_pool_perm_destroy(mdsc); 6825 } 6826 6827 void ceph_mdsc_destroy(struct ceph_fs_client *fsc) 6828 { 6829 struct ceph_mds_client *mdsc = fsc->mdsc; 6830 doutc(fsc->client, "%p\n", mdsc); 6831 6832 if (!mdsc) 6833 return; 6834 6835 /* flush out any connection work with references to us */ 6836 ceph_msgr_flush(); 6837 6838 /* 6839 * Mark reset as failed and wake any blocked waiters before 6840 * cancelling, so unmount doesn't stall on blocked_wq timeout 6841 * if cancel_work_sync() prevents the work from running. 6842 */ 6843 spin_lock(&mdsc->reset_state.lock); 6844 mdsc->reset_state.shutdown = true; 6845 if (mdsc->reset_state.phase != CEPH_CLIENT_RESET_IDLE) { 6846 mdsc->reset_state.phase = CEPH_CLIENT_RESET_IDLE; 6847 mdsc->reset_state.last_errno = -ESHUTDOWN; 6848 mdsc->reset_state.last_finish = jiffies; 6849 mdsc->reset_state.failure_count++; 6850 } 6851 spin_unlock(&mdsc->reset_state.lock); 6852 wake_up_all(&mdsc->reset_state.blocked_wq); 6853 6854 cancel_work_sync(&mdsc->reset_work); 6855 ceph_mdsc_stop(mdsc); 6856 6857 ceph_metric_destroy(&mdsc->metric); 6858 ceph_subvolume_metrics_destroy(&mdsc->subvol_metrics); 6859 kfree(mdsc->subvol_metrics_last); 6860 6861 fsc->mdsc = NULL; 6862 kfree(mdsc); 6863 doutc(fsc->client, "%p done\n", mdsc); 6864 } 6865 6866 void ceph_mdsc_handle_fsmap(struct ceph_mds_client *mdsc, struct ceph_msg *msg) 6867 { 6868 struct ceph_fs_client *fsc = mdsc->fsc; 6869 struct ceph_client *cl = fsc->client; 6870 const char *mds_namespace = 
fsc->mount_options->mds_namespace; 6871 void *p = msg->front.iov_base; 6872 void *end = p + msg->front.iov_len; 6873 u32 epoch; 6874 u32 num_fs; 6875 u32 mount_fscid = (u32)-1; 6876 int err = -EINVAL; 6877 6878 ceph_decode_need(&p, end, sizeof(u32), bad); 6879 epoch = ceph_decode_32(&p); 6880 6881 doutc(cl, "epoch %u\n", epoch); 6882 6883 /* struct_v, struct_cv, map_len, epoch, legacy_client_fscid */ 6884 ceph_decode_skip_n(&p, end, 2 + sizeof(u32) * 3, bad); 6885 6886 ceph_decode_32_safe(&p, end, num_fs, bad); 6887 while (num_fs-- > 0) { 6888 void *info_p, *info_end; 6889 u32 info_len; 6890 u32 fscid, namelen; 6891 6892 ceph_decode_need(&p, end, 2 + sizeof(u32), bad); 6893 p += 2; // info_v, info_cv 6894 info_len = ceph_decode_32(&p); 6895 ceph_decode_need(&p, end, info_len, bad); 6896 info_p = p; 6897 info_end = p + info_len; 6898 p = info_end; 6899 6900 ceph_decode_need(&info_p, info_end, sizeof(u32) * 2, bad); 6901 fscid = ceph_decode_32(&info_p); 6902 namelen = ceph_decode_32(&info_p); 6903 ceph_decode_need(&info_p, info_end, namelen, bad); 6904 6905 if (mds_namespace && 6906 strlen(mds_namespace) == namelen && 6907 !strncmp(mds_namespace, (char *)info_p, namelen)) { 6908 mount_fscid = fscid; 6909 break; 6910 } 6911 } 6912 6913 ceph_monc_got_map(&fsc->client->monc, CEPH_SUB_FSMAP, epoch); 6914 if (mount_fscid != (u32)-1) { 6915 fsc->client->monc.fs_cluster_id = mount_fscid; 6916 ceph_monc_want_map(&fsc->client->monc, CEPH_SUB_MDSMAP, 6917 0, true); 6918 ceph_monc_renew_subs(&fsc->client->monc); 6919 } else { 6920 err = -ENOENT; 6921 goto err_out; 6922 } 6923 return; 6924 6925 bad: 6926 pr_err_client(cl, "error decoding fsmap %d. Shutting down mount.\n", 6927 err); 6928 ceph_umount_begin(mdsc->fsc->sb); 6929 ceph_msg_dump(msg); 6930 err_out: 6931 mutex_lock(&mdsc->mutex); 6932 mdsc->mdsmap_err = err; 6933 __wake_requests(mdsc, &mdsc->waiting_for_map); 6934 mutex_unlock(&mdsc->mutex); 6935 } 6936 6937 /* 6938 * handle mds map update. 
6939 */ 6940 void ceph_mdsc_handle_mdsmap(struct ceph_mds_client *mdsc, struct ceph_msg *msg) 6941 { 6942 struct ceph_client *cl = mdsc->fsc->client; 6943 u32 epoch; 6944 u32 maplen; 6945 void *p = msg->front.iov_base; 6946 void *end = p + msg->front.iov_len; 6947 struct ceph_mdsmap *newmap, *oldmap; 6948 struct ceph_fsid fsid; 6949 int err = -EINVAL; 6950 6951 ceph_decode_need(&p, end, sizeof(fsid)+2*sizeof(u32), bad); 6952 ceph_decode_copy(&p, &fsid, sizeof(fsid)); 6953 if (ceph_check_fsid(mdsc->fsc->client, &fsid) < 0) 6954 return; 6955 epoch = ceph_decode_32(&p); 6956 maplen = ceph_decode_32(&p); 6957 doutc(cl, "epoch %u len %d\n", epoch, (int)maplen); 6958 6959 /* do we need it? */ 6960 mutex_lock(&mdsc->mutex); 6961 if (mdsc->mdsmap && epoch <= mdsc->mdsmap->m_epoch) { 6962 doutc(cl, "epoch %u <= our %u\n", epoch, mdsc->mdsmap->m_epoch); 6963 mutex_unlock(&mdsc->mutex); 6964 return; 6965 } 6966 6967 newmap = ceph_mdsmap_decode(mdsc, &p, end, ceph_msgr2(mdsc->fsc->client)); 6968 if (IS_ERR(newmap)) { 6969 err = PTR_ERR(newmap); 6970 goto bad_unlock; 6971 } 6972 6973 /* swap into place */ 6974 if (mdsc->mdsmap) { 6975 oldmap = mdsc->mdsmap; 6976 mdsc->mdsmap = newmap; 6977 check_new_map(mdsc, newmap, oldmap); 6978 ceph_mdsmap_destroy(oldmap); 6979 } else { 6980 mdsc->mdsmap = newmap; /* first mds map */ 6981 } 6982 mdsc->fsc->max_file_size = min((loff_t)mdsc->mdsmap->m_max_file_size, 6983 MAX_LFS_FILESIZE); 6984 6985 __wake_requests(mdsc, &mdsc->waiting_for_map); 6986 ceph_monc_got_map(&mdsc->fsc->client->monc, CEPH_SUB_MDSMAP, 6987 mdsc->mdsmap->m_epoch); 6988 6989 mutex_unlock(&mdsc->mutex); 6990 schedule_delayed(mdsc, 0); 6991 return; 6992 6993 bad_unlock: 6994 mutex_unlock(&mdsc->mutex); 6995 bad: 6996 pr_err_client(cl, "error decoding mdsmap %d. 
Shutting down mount.\n", 6997 err); 6998 ceph_umount_begin(mdsc->fsc->sb); 6999 ceph_msg_dump(msg); 7000 return; 7001 } 7002 7003 static struct ceph_connection *mds_get_con(struct ceph_connection *con) 7004 { 7005 struct ceph_mds_session *s = con->private; 7006 7007 if (ceph_get_mds_session(s)) 7008 return con; 7009 return NULL; 7010 } 7011 7012 static void mds_put_con(struct ceph_connection *con) 7013 { 7014 struct ceph_mds_session *s = con->private; 7015 7016 ceph_put_mds_session(s); 7017 } 7018 7019 /* 7020 * if the client is unresponsive for long enough, the mds will kill 7021 * the session entirely. 7022 */ 7023 static void mds_peer_reset(struct ceph_connection *con) 7024 { 7025 struct ceph_mds_session *s = con->private; 7026 struct ceph_mds_client *mdsc = s->s_mdsc; 7027 int session_state; 7028 7029 pr_warn_client(mdsc->fsc->client, "mds%d closed our session\n", 7030 s->s_mds); 7031 7032 if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_FENCE_IO || 7033 ceph_mdsmap_get_state(mdsc->mdsmap, s->s_mds) < CEPH_MDS_STATE_RECONNECT) 7034 return; 7035 7036 /* 7037 * Only reconnect if MDS is in its RECONNECT phase. An MDS past 7038 * RECONNECT (REJOIN, CLIENTREPLAY, ACTIVE) will reject reconnect 7039 * attempts, so those states fall through to session teardown below. 7040 */ 7041 if (ceph_mdsmap_get_state(mdsc->mdsmap, s->s_mds) == CEPH_MDS_STATE_RECONNECT) { 7042 int rc = send_mds_reconnect(mdsc, s); 7043 7044 if (rc) 7045 pr_warn_client(mdsc->fsc->client, 7046 "mds%d reconnect failed: %d\n", 7047 s->s_mds, rc); 7048 return; 7049 } 7050 7051 /* 7052 * MDS is active (past RECONNECT). It will not accept a 7053 * CLIENT_RECONNECT from us, so tear the session down locally 7054 * and let new requests re-open a fresh session. 7055 * 7056 * Snapshot session state with READ_ONCE, then revalidate under 7057 * mdsc->mutex before acting. The subsequent mdsc->mutex 7058 * section rechecks s_state to catch concurrent transitions, so 7059 * the lockless snapshot here is safe. 
s->s_mutex is taken 7060 * separately for cleanup after unregistration, which avoids 7061 * introducing a new s->s_mutex + mdsc->mutex nesting. 7062 */ 7063 session_state = READ_ONCE(s->s_state); 7064 7065 switch (session_state) { 7066 case CEPH_MDS_SESSION_RESTARTING: 7067 case CEPH_MDS_SESSION_RECONNECTING: 7068 case CEPH_MDS_SESSION_CLOSING: 7069 case CEPH_MDS_SESSION_OPEN: 7070 case CEPH_MDS_SESSION_HUNG: 7071 case CEPH_MDS_SESSION_OPENING: 7072 mutex_lock(&mdsc->mutex); 7073 if (s->s_mds >= mdsc->max_sessions || 7074 mdsc->sessions[s->s_mds] != s || 7075 s->s_state != session_state) { 7076 pr_info_client(mdsc->fsc->client, 7077 "mds%d state changed to %s during peer reset\n", 7078 s->s_mds, 7079 ceph_session_state_name(s->s_state)); 7080 mutex_unlock(&mdsc->mutex); 7081 return; 7082 } 7083 7084 ceph_get_mds_session(s); 7085 s->s_state = CEPH_MDS_SESSION_CLOSED; 7086 __unregister_session(mdsc, s); 7087 __wake_requests(mdsc, &s->s_waiting); 7088 mutex_unlock(&mdsc->mutex); 7089 7090 mutex_lock(&s->s_mutex); 7091 cleanup_session_requests(mdsc, s); 7092 remove_session_caps(s); 7093 mutex_unlock(&s->s_mutex); 7094 7095 wake_up_all(&mdsc->session_close_wq); 7096 7097 mutex_lock(&mdsc->mutex); 7098 kick_requests(mdsc, s->s_mds); 7099 mutex_unlock(&mdsc->mutex); 7100 7101 ceph_put_mds_session(s); 7102 break; 7103 case CEPH_MDS_SESSION_CLOSED: 7104 case CEPH_MDS_SESSION_REJECTED: 7105 break; 7106 default: 7107 pr_warn_client(mdsc->fsc->client, 7108 "mds%d peer reset in unexpected state %s\n", 7109 s->s_mds, 7110 ceph_session_state_name(session_state)); 7111 break; 7112 } 7113 } 7114 7115 static void mds_dispatch(struct ceph_connection *con, struct ceph_msg *msg) 7116 { 7117 struct ceph_mds_session *s = con->private; 7118 struct ceph_mds_client *mdsc = s->s_mdsc; 7119 struct ceph_client *cl = mdsc->fsc->client; 7120 int type = le16_to_cpu(msg->hdr.type); 7121 7122 mutex_lock(&mdsc->mutex); 7123 if (__verify_registered_session(mdsc, s) < 0) { 7124 doutc(cl, "dropping tid 
%llu from unregistered session %d\n", 7125 le64_to_cpu(msg->hdr.tid), s->s_mds); 7126 mutex_unlock(&mdsc->mutex); 7127 goto out; 7128 } 7129 mutex_unlock(&mdsc->mutex); 7130 7131 switch (type) { 7132 case CEPH_MSG_MDS_MAP: 7133 ceph_mdsc_handle_mdsmap(mdsc, msg); 7134 break; 7135 case CEPH_MSG_FS_MAP_USER: 7136 ceph_mdsc_handle_fsmap(mdsc, msg); 7137 break; 7138 case CEPH_MSG_CLIENT_SESSION: 7139 handle_session(s, msg); 7140 break; 7141 case CEPH_MSG_CLIENT_REPLY: 7142 handle_reply(s, msg); 7143 break; 7144 case CEPH_MSG_CLIENT_REQUEST_FORWARD: 7145 handle_forward(mdsc, s, msg); 7146 break; 7147 case CEPH_MSG_CLIENT_CAPS: 7148 ceph_handle_caps(s, msg); 7149 break; 7150 case CEPH_MSG_CLIENT_SNAP: 7151 ceph_handle_snap(mdsc, s, msg); 7152 break; 7153 case CEPH_MSG_CLIENT_LEASE: 7154 handle_lease(mdsc, s, msg); 7155 break; 7156 case CEPH_MSG_CLIENT_QUOTA: 7157 ceph_handle_quota(mdsc, s, msg); 7158 break; 7159 7160 default: 7161 pr_err_client(cl, "received unknown message type %d %s\n", 7162 type, ceph_msg_type_name(type)); 7163 } 7164 out: 7165 ceph_msg_put(msg); 7166 } 7167 7168 /* 7169 * authentication 7170 */ 7171 7172 /* 7173 * Note: returned pointer is the address of a structure that's 7174 * managed separately. Caller must *not* attempt to free it. 
7175 */ 7176 static struct ceph_auth_handshake * 7177 mds_get_authorizer(struct ceph_connection *con, int *proto, int force_new) 7178 { 7179 struct ceph_mds_session *s = con->private; 7180 struct ceph_mds_client *mdsc = s->s_mdsc; 7181 struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth; 7182 struct ceph_auth_handshake *auth = &s->s_auth; 7183 int ret; 7184 7185 ret = __ceph_auth_get_authorizer(ac, auth, CEPH_ENTITY_TYPE_MDS, 7186 force_new, proto, NULL, NULL); 7187 if (ret) 7188 return ERR_PTR(ret); 7189 7190 return auth; 7191 } 7192 7193 static int mds_add_authorizer_challenge(struct ceph_connection *con, 7194 void *challenge_buf, int challenge_buf_len) 7195 { 7196 struct ceph_mds_session *s = con->private; 7197 struct ceph_mds_client *mdsc = s->s_mdsc; 7198 struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth; 7199 7200 return ceph_auth_add_authorizer_challenge(ac, s->s_auth.authorizer, 7201 challenge_buf, challenge_buf_len); 7202 } 7203 7204 static int mds_verify_authorizer_reply(struct ceph_connection *con) 7205 { 7206 struct ceph_mds_session *s = con->private; 7207 struct ceph_mds_client *mdsc = s->s_mdsc; 7208 struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth; 7209 struct ceph_auth_handshake *auth = &s->s_auth; 7210 7211 return ceph_auth_verify_authorizer_reply(ac, auth->authorizer, 7212 auth->authorizer_reply_buf, auth->authorizer_reply_buf_len, 7213 NULL, NULL, NULL, NULL); 7214 } 7215 7216 static int mds_invalidate_authorizer(struct ceph_connection *con) 7217 { 7218 struct ceph_mds_session *s = con->private; 7219 struct ceph_mds_client *mdsc = s->s_mdsc; 7220 struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth; 7221 7222 ceph_auth_invalidate_authorizer(ac, CEPH_ENTITY_TYPE_MDS); 7223 7224 return ceph_monc_validate_auth(&mdsc->fsc->client->monc); 7225 } 7226 7227 static int mds_get_auth_request(struct ceph_connection *con, 7228 void *buf, int *buf_len, 7229 void **authorizer, int *authorizer_len) 7230 { 7231 struct 
ceph_mds_session *s = con->private; 7232 struct ceph_auth_client *ac = s->s_mdsc->fsc->client->monc.auth; 7233 struct ceph_auth_handshake *auth = &s->s_auth; 7234 int ret; 7235 7236 ret = ceph_auth_get_authorizer(ac, auth, CEPH_ENTITY_TYPE_MDS, 7237 buf, buf_len); 7238 if (ret) 7239 return ret; 7240 7241 *authorizer = auth->authorizer_buf; 7242 *authorizer_len = auth->authorizer_buf_len; 7243 return 0; 7244 } 7245 7246 static int mds_handle_auth_reply_more(struct ceph_connection *con, 7247 void *reply, int reply_len, 7248 void *buf, int *buf_len, 7249 void **authorizer, int *authorizer_len) 7250 { 7251 struct ceph_mds_session *s = con->private; 7252 struct ceph_auth_client *ac = s->s_mdsc->fsc->client->monc.auth; 7253 struct ceph_auth_handshake *auth = &s->s_auth; 7254 int ret; 7255 7256 ret = ceph_auth_handle_svc_reply_more(ac, auth, reply, reply_len, 7257 buf, buf_len); 7258 if (ret) 7259 return ret; 7260 7261 *authorizer = auth->authorizer_buf; 7262 *authorizer_len = auth->authorizer_buf_len; 7263 return 0; 7264 } 7265 7266 static int mds_handle_auth_done(struct ceph_connection *con, 7267 u64 global_id, void *reply, int reply_len, 7268 u8 *session_key, int *session_key_len, 7269 u8 *con_secret, int *con_secret_len) 7270 { 7271 struct ceph_mds_session *s = con->private; 7272 struct ceph_auth_client *ac = s->s_mdsc->fsc->client->monc.auth; 7273 struct ceph_auth_handshake *auth = &s->s_auth; 7274 7275 return ceph_auth_handle_svc_reply_done(ac, auth, reply, reply_len, 7276 session_key, session_key_len, 7277 con_secret, con_secret_len); 7278 } 7279 7280 static int mds_handle_auth_bad_method(struct ceph_connection *con, 7281 int used_proto, int result, 7282 const int *allowed_protos, int proto_cnt, 7283 const int *allowed_modes, int mode_cnt) 7284 { 7285 struct ceph_mds_session *s = con->private; 7286 struct ceph_mon_client *monc = &s->s_mdsc->fsc->client->monc; 7287 int ret; 7288 7289 if (ceph_auth_handle_bad_authorizer(monc->auth, CEPH_ENTITY_TYPE_MDS, 7290 
used_proto, result, 7291 allowed_protos, proto_cnt, 7292 allowed_modes, mode_cnt)) { 7293 ret = ceph_monc_validate_auth(monc); 7294 if (ret) 7295 return ret; 7296 } 7297 7298 return -EACCES; 7299 } 7300 7301 static struct ceph_msg *mds_alloc_msg(struct ceph_connection *con, 7302 struct ceph_msg_header *hdr, int *skip) 7303 { 7304 struct ceph_msg *msg; 7305 int type = (int) le16_to_cpu(hdr->type); 7306 int front_len = (int) le32_to_cpu(hdr->front_len); 7307 7308 if (con->in_msg) 7309 return con->in_msg; 7310 7311 *skip = 0; 7312 msg = ceph_msg_new(type, front_len, GFP_NOFS, false); 7313 if (!msg) { 7314 pr_err("unable to allocate msg type %d len %d\n", 7315 type, front_len); 7316 return NULL; 7317 } 7318 7319 return msg; 7320 } 7321 7322 static int mds_sign_message(struct ceph_msg *msg) 7323 { 7324 struct ceph_mds_session *s = msg->con->private; 7325 struct ceph_auth_handshake *auth = &s->s_auth; 7326 7327 return ceph_auth_sign_message(auth, msg); 7328 } 7329 7330 static int mds_check_message_signature(struct ceph_msg *msg) 7331 { 7332 struct ceph_mds_session *s = msg->con->private; 7333 struct ceph_auth_handshake *auth = &s->s_auth; 7334 7335 return ceph_auth_check_message_signature(auth, msg); 7336 } 7337 7338 static const struct ceph_connection_operations mds_con_ops = { 7339 .get = mds_get_con, 7340 .put = mds_put_con, 7341 .alloc_msg = mds_alloc_msg, 7342 .dispatch = mds_dispatch, 7343 .peer_reset = mds_peer_reset, 7344 .get_authorizer = mds_get_authorizer, 7345 .add_authorizer_challenge = mds_add_authorizer_challenge, 7346 .verify_authorizer_reply = mds_verify_authorizer_reply, 7347 .invalidate_authorizer = mds_invalidate_authorizer, 7348 .sign_message = mds_sign_message, 7349 .check_message_signature = mds_check_message_signature, 7350 .get_auth_request = mds_get_auth_request, 7351 .handle_auth_reply_more = mds_handle_auth_reply_more, 7352 .handle_auth_done = mds_handle_auth_done, 7353 .handle_auth_bad_method = mds_handle_auth_bad_method, 7354 }; 7355 7356 /* 
eof */ 7357