// SPDX-License-Identifier: GPL-2.0
#include <linux/ceph/ceph_debug.h>

#include <linux/fs.h>
#include <linux/wait.h>
#include <linux/slab.h>
#include <linux/gfp.h>
#include <linux/sched.h>
#include <linux/debugfs.h>
#include <linux/seq_file.h>
#include <linux/ratelimit.h>
#include <linux/bits.h>
#include <linux/ktime.h>
#include <linux/bitmap.h>
#include <linux/mnt_idmapping.h>

#include "super.h"
#include "mds_client.h"
#include "crypto.h"

#include <linux/ceph/ceph_features.h>
#include <linux/ceph/messenger.h>
#include <linux/ceph/decode.h>
#include <linux/ceph/pagelist.h>
#include <linux/ceph/auth.h>
#include <linux/ceph/debugfs.h>
#include <trace/events/ceph.h>

#define RECONNECT_MAX_SIZE (INT_MAX - PAGE_SIZE)

/*
 * A cluster of MDS (metadata server) daemons is responsible for
 * managing the file system namespace (the directory hierarchy and
 * inodes) and for coordinating shared access to storage.  Metadata is
 * partitioned hierarchically across a number of servers, and that
 * partition varies over time as the cluster adjusts the distribution
 * in order to balance load.
 *
 * The MDS client is primarily responsible for managing synchronous
 * metadata requests for operations like open, unlink, and so forth.
 * If there is an MDS failure, we find out about it when we (possibly
 * request and) receive a new MDS map, and can resubmit affected
 * requests.
 *
 * For the most part, though, we take advantage of a lossless
 * communications channel to the MDS, and do not need to worry about
 * timing out or resubmitting requests.
 *
 * We maintain a stateful "session" with each MDS we interact with.
 * Within each session, we send periodic heartbeat messages to ensure
 * any capabilities or leases we have been issued remain valid.  If
 * the session times out and goes stale, our leases and capabilities
 * are no longer valid.
54 */ 55 56 struct ceph_reconnect_state { 57 struct ceph_mds_session *session; 58 int nr_caps, nr_realms; 59 struct ceph_pagelist *pagelist; 60 unsigned msg_version; 61 bool allow_multi; 62 }; 63 64 static void __wake_requests(struct ceph_mds_client *mdsc, 65 struct list_head *head); 66 static void ceph_cap_release_work(struct work_struct *work); 67 static void ceph_cap_reclaim_work(struct work_struct *work); 68 69 static const struct ceph_connection_operations mds_con_ops; 70 71 static void ceph_metric_bind_session(struct ceph_mds_client *mdsc, 72 struct ceph_mds_session *session) 73 { 74 struct ceph_mds_session *old; 75 76 if (!mdsc || !session || disable_send_metrics) 77 return; 78 79 old = mdsc->metric.session; 80 mdsc->metric.session = ceph_get_mds_session(session); 81 if (old) 82 ceph_put_mds_session(old); 83 84 metric_schedule_delayed(&mdsc->metric); 85 } 86 87 /* 88 * mds reply parsing 89 */ 90 91 static int parse_reply_info_quota(void **p, void *end, 92 struct ceph_mds_reply_info_in *info) 93 { 94 u8 struct_v, struct_compat; 95 u32 struct_len; 96 97 ceph_decode_8_safe(p, end, struct_v, bad); 98 ceph_decode_8_safe(p, end, struct_compat, bad); 99 /* struct_v is expected to be >= 1. we only 100 * understand encoding with struct_compat == 1. 
*/ 101 if (!struct_v || struct_compat != 1) 102 goto bad; 103 ceph_decode_32_safe(p, end, struct_len, bad); 104 ceph_decode_need(p, end, struct_len, bad); 105 end = *p + struct_len; 106 ceph_decode_64_safe(p, end, info->max_bytes, bad); 107 ceph_decode_64_safe(p, end, info->max_files, bad); 108 *p = end; 109 return 0; 110 bad: 111 return -EIO; 112 } 113 114 static int parse_reply_info_in(void **p, void *end, 115 struct ceph_mds_reply_info_in *info, 116 u64 features, 117 struct ceph_mds_client *mdsc) 118 { 119 int err = 0; 120 u8 struct_v = 0; 121 u8 struct_compat = 0; 122 u32 struct_len = 0; 123 124 info->subvolume_id = CEPH_SUBVOLUME_ID_NONE; 125 126 if (features == (u64)-1) { 127 ceph_decode_8_safe(p, end, struct_v, bad); 128 ceph_decode_8_safe(p, end, struct_compat, bad); 129 /* struct_v is expected to be >= 1. we only understand 130 * encoding with struct_compat == 1. */ 131 if (!struct_v || struct_compat != 1) 132 goto bad; 133 ceph_decode_32_safe(p, end, struct_len, bad); 134 ceph_decode_need(p, end, struct_len, bad); 135 end = *p + struct_len; 136 } 137 138 ceph_decode_need(p, end, sizeof(struct ceph_mds_reply_inode), bad); 139 info->in = *p; 140 *p += sizeof(struct ceph_mds_reply_inode) + 141 sizeof(*info->in->fragtree.splits) * 142 le32_to_cpu(info->in->fragtree.nsplits); 143 144 ceph_decode_32_safe(p, end, info->symlink_len, bad); 145 ceph_decode_need(p, end, info->symlink_len, bad); 146 info->symlink = *p; 147 *p += info->symlink_len; 148 149 ceph_decode_copy_safe(p, end, &info->dir_layout, 150 sizeof(info->dir_layout), bad); 151 ceph_decode_32_safe(p, end, info->xattr_len, bad); 152 ceph_decode_need(p, end, info->xattr_len, bad); 153 info->xattr_data = *p; 154 *p += info->xattr_len; 155 156 if (features == (u64)-1) { 157 /* inline data */ 158 ceph_decode_64_safe(p, end, info->inline_version, bad); 159 ceph_decode_32_safe(p, end, info->inline_len, bad); 160 ceph_decode_need(p, end, info->inline_len, bad); 161 info->inline_data = *p; 162 *p += 
info->inline_len; 163 /* quota */ 164 err = parse_reply_info_quota(p, end, info); 165 if (err < 0) 166 goto out_bad; 167 /* pool namespace */ 168 ceph_decode_32_safe(p, end, info->pool_ns_len, bad); 169 if (info->pool_ns_len > 0) { 170 ceph_decode_need(p, end, info->pool_ns_len, bad); 171 info->pool_ns_data = *p; 172 *p += info->pool_ns_len; 173 } 174 175 /* btime */ 176 ceph_decode_need(p, end, sizeof(info->btime), bad); 177 ceph_decode_copy(p, &info->btime, sizeof(info->btime)); 178 179 /* change attribute */ 180 ceph_decode_64_safe(p, end, info->change_attr, bad); 181 182 /* dir pin */ 183 if (struct_v >= 2) { 184 ceph_decode_32_safe(p, end, info->dir_pin, bad); 185 } else { 186 info->dir_pin = -ENODATA; 187 } 188 189 /* snapshot birth time, remains zero for v<=2 */ 190 if (struct_v >= 3) { 191 ceph_decode_need(p, end, sizeof(info->snap_btime), bad); 192 ceph_decode_copy(p, &info->snap_btime, 193 sizeof(info->snap_btime)); 194 } else { 195 memset(&info->snap_btime, 0, sizeof(info->snap_btime)); 196 } 197 198 /* snapshot count, remains zero for v<=3 */ 199 if (struct_v >= 4) { 200 ceph_decode_64_safe(p, end, info->rsnaps, bad); 201 } else { 202 info->rsnaps = 0; 203 } 204 205 if (struct_v >= 5) { 206 u32 alen; 207 208 ceph_decode_32_safe(p, end, alen, bad); 209 210 while (alen--) { 211 u32 len; 212 213 /* key */ 214 ceph_decode_32_safe(p, end, len, bad); 215 ceph_decode_skip_n(p, end, len, bad); 216 /* value */ 217 ceph_decode_32_safe(p, end, len, bad); 218 ceph_decode_skip_n(p, end, len, bad); 219 } 220 } 221 222 /* fscrypt flag -- ignore */ 223 if (struct_v >= 6) 224 ceph_decode_skip_8(p, end, bad); 225 226 info->fscrypt_auth = NULL; 227 info->fscrypt_auth_len = 0; 228 info->fscrypt_file = NULL; 229 info->fscrypt_file_len = 0; 230 if (struct_v >= 7) { 231 ceph_decode_32_safe(p, end, info->fscrypt_auth_len, bad); 232 if (info->fscrypt_auth_len) { 233 info->fscrypt_auth = kmalloc(info->fscrypt_auth_len, 234 GFP_KERNEL); 235 if (!info->fscrypt_auth) 236 return 
-ENOMEM; 237 ceph_decode_copy_safe(p, end, info->fscrypt_auth, 238 info->fscrypt_auth_len, bad); 239 } 240 ceph_decode_32_safe(p, end, info->fscrypt_file_len, bad); 241 if (info->fscrypt_file_len) { 242 info->fscrypt_file = kmalloc(info->fscrypt_file_len, 243 GFP_KERNEL); 244 if (!info->fscrypt_file) 245 return -ENOMEM; 246 ceph_decode_copy_safe(p, end, info->fscrypt_file, 247 info->fscrypt_file_len, bad); 248 } 249 } 250 251 /* 252 * InodeStat encoding versions: 253 * v1-v7: various fields added over time 254 * v8: added optmetadata (versioned sub-structure containing 255 * optional inode metadata like charmap for case-insensitive 256 * filesystems). The kernel client doesn't support 257 * case-insensitive lookups, so we skip this field. 258 * v9: added subvolume_id (parsed below) 259 */ 260 if (struct_v >= 8) { 261 u32 v8_struct_len; 262 263 /* skip optmetadata versioned sub-structure */ 264 ceph_decode_skip_8(p, end, bad); /* struct_v */ 265 ceph_decode_skip_8(p, end, bad); /* struct_compat */ 266 ceph_decode_32_safe(p, end, v8_struct_len, bad); 267 ceph_decode_skip_n(p, end, v8_struct_len, bad); 268 } 269 270 /* struct_v 9 added subvolume_id */ 271 if (struct_v >= 9) 272 ceph_decode_64_safe(p, end, info->subvolume_id, bad); 273 274 *p = end; 275 } else { 276 /* legacy (unversioned) struct */ 277 if (features & CEPH_FEATURE_MDS_INLINE_DATA) { 278 ceph_decode_64_safe(p, end, info->inline_version, bad); 279 ceph_decode_32_safe(p, end, info->inline_len, bad); 280 ceph_decode_need(p, end, info->inline_len, bad); 281 info->inline_data = *p; 282 *p += info->inline_len; 283 } else 284 info->inline_version = CEPH_INLINE_NONE; 285 286 if (features & CEPH_FEATURE_MDS_QUOTA) { 287 err = parse_reply_info_quota(p, end, info); 288 if (err < 0) 289 goto out_bad; 290 } else { 291 info->max_bytes = 0; 292 info->max_files = 0; 293 } 294 295 info->pool_ns_len = 0; 296 info->pool_ns_data = NULL; 297 if (features & CEPH_FEATURE_FS_FILE_LAYOUT_V2) { 298 ceph_decode_32_safe(p, end, 
info->pool_ns_len, bad); 299 if (info->pool_ns_len > 0) { 300 ceph_decode_need(p, end, info->pool_ns_len, bad); 301 info->pool_ns_data = *p; 302 *p += info->pool_ns_len; 303 } 304 } 305 306 if (features & CEPH_FEATURE_FS_BTIME) { 307 ceph_decode_need(p, end, sizeof(info->btime), bad); 308 ceph_decode_copy(p, &info->btime, sizeof(info->btime)); 309 ceph_decode_64_safe(p, end, info->change_attr, bad); 310 } 311 312 info->dir_pin = -ENODATA; 313 /* info->snap_btime and info->rsnaps remain zero */ 314 } 315 return 0; 316 bad: 317 err = -EIO; 318 out_bad: 319 return err; 320 } 321 322 static int parse_reply_info_dir(void **p, void *end, 323 struct ceph_mds_reply_dirfrag **dirfrag, 324 u64 features) 325 { 326 if (features == (u64)-1) { 327 u8 struct_v, struct_compat; 328 u32 struct_len; 329 ceph_decode_8_safe(p, end, struct_v, bad); 330 ceph_decode_8_safe(p, end, struct_compat, bad); 331 /* struct_v is expected to be >= 1. we only understand 332 * encoding whose struct_compat == 1. */ 333 if (!struct_v || struct_compat != 1) 334 goto bad; 335 ceph_decode_32_safe(p, end, struct_len, bad); 336 ceph_decode_need(p, end, struct_len, bad); 337 end = *p + struct_len; 338 } 339 340 ceph_decode_need(p, end, sizeof(**dirfrag), bad); 341 *dirfrag = *p; 342 *p += sizeof(**dirfrag) + sizeof(u32) * le32_to_cpu((*dirfrag)->ndist); 343 if (unlikely(*p > end)) 344 goto bad; 345 if (features == (u64)-1) 346 *p = end; 347 return 0; 348 bad: 349 return -EIO; 350 } 351 352 static int parse_reply_info_lease(void **p, void *end, 353 struct ceph_mds_reply_lease **lease, 354 u64 features, u32 *altname_len, u8 **altname) 355 { 356 u8 struct_v; 357 u32 struct_len; 358 void *lend; 359 360 if (features == (u64)-1) { 361 u8 struct_compat; 362 363 ceph_decode_8_safe(p, end, struct_v, bad); 364 ceph_decode_8_safe(p, end, struct_compat, bad); 365 366 /* struct_v is expected to be >= 1. we only understand 367 * encoding whose struct_compat == 1. 
*/ 368 if (!struct_v || struct_compat != 1) 369 goto bad; 370 371 ceph_decode_32_safe(p, end, struct_len, bad); 372 } else { 373 struct_len = sizeof(**lease); 374 *altname_len = 0; 375 *altname = NULL; 376 } 377 378 lend = *p + struct_len; 379 ceph_decode_need(p, end, struct_len, bad); 380 *lease = *p; 381 *p += sizeof(**lease); 382 383 if (features == (u64)-1) { 384 if (struct_v >= 2) { 385 ceph_decode_32_safe(p, end, *altname_len, bad); 386 ceph_decode_need(p, end, *altname_len, bad); 387 *altname = *p; 388 *p += *altname_len; 389 } else { 390 *altname = NULL; 391 *altname_len = 0; 392 } 393 } 394 *p = lend; 395 return 0; 396 bad: 397 return -EIO; 398 } 399 400 /* 401 * parse a normal reply, which may contain a (dir+)dentry and/or a 402 * target inode. 403 */ 404 static int parse_reply_info_trace(void **p, void *end, 405 struct ceph_mds_reply_info_parsed *info, 406 u64 features, 407 struct ceph_mds_client *mdsc) 408 { 409 int err; 410 411 if (info->head->is_dentry) { 412 err = parse_reply_info_in(p, end, &info->diri, features, mdsc); 413 if (err < 0) 414 goto out_bad; 415 416 err = parse_reply_info_dir(p, end, &info->dirfrag, features); 417 if (err < 0) 418 goto out_bad; 419 420 ceph_decode_32_safe(p, end, info->dname_len, bad); 421 ceph_decode_need(p, end, info->dname_len, bad); 422 info->dname = *p; 423 *p += info->dname_len; 424 425 err = parse_reply_info_lease(p, end, &info->dlease, features, 426 &info->altname_len, &info->altname); 427 if (err < 0) 428 goto out_bad; 429 } 430 431 if (info->head->is_target) { 432 err = parse_reply_info_in(p, end, &info->targeti, features, 433 mdsc); 434 if (err < 0) 435 goto out_bad; 436 } 437 438 if (unlikely(*p != end)) 439 goto bad; 440 return 0; 441 442 bad: 443 err = -EIO; 444 out_bad: 445 pr_err("problem parsing mds trace %d\n", err); 446 return err; 447 } 448 449 /* 450 * parse readdir results 451 */ 452 static int parse_reply_info_readdir(void **p, void *end, 453 struct ceph_mds_request *req, 454 u64 features, 455 
struct ceph_mds_client *mdsc) 456 { 457 struct ceph_mds_reply_info_parsed *info = &req->r_reply_info; 458 struct ceph_client *cl = req->r_mdsc->fsc->client; 459 u32 num, i = 0; 460 int err; 461 462 err = parse_reply_info_dir(p, end, &info->dir_dir, features); 463 if (err < 0) 464 goto out_bad; 465 466 ceph_decode_need(p, end, sizeof(num) + 2, bad); 467 num = ceph_decode_32(p); 468 { 469 u16 flags = ceph_decode_16(p); 470 info->dir_end = !!(flags & CEPH_READDIR_FRAG_END); 471 info->dir_complete = !!(flags & CEPH_READDIR_FRAG_COMPLETE); 472 info->hash_order = !!(flags & CEPH_READDIR_HASH_ORDER); 473 info->offset_hash = !!(flags & CEPH_READDIR_OFFSET_HASH); 474 } 475 if (num == 0) 476 goto done; 477 478 BUG_ON(!info->dir_entries); 479 if ((unsigned long)(info->dir_entries + num) > 480 (unsigned long)info->dir_entries + info->dir_buf_size) { 481 pr_err_client(cl, "dir contents are larger than expected\n"); 482 WARN_ON(1); 483 goto bad; 484 } 485 486 info->dir_nr = num; 487 while (num) { 488 struct inode *inode = d_inode(req->r_dentry); 489 struct ceph_inode_info *ci = ceph_inode(inode); 490 struct ceph_mds_reply_dir_entry *rde = info->dir_entries + i; 491 struct fscrypt_str tname = FSTR_INIT(NULL, 0); 492 struct fscrypt_str oname = FSTR_INIT(NULL, 0); 493 struct ceph_fname fname; 494 u32 altname_len, _name_len; 495 u8 *altname, *_name; 496 497 /* dentry */ 498 ceph_decode_32_safe(p, end, _name_len, bad); 499 ceph_decode_need(p, end, _name_len, bad); 500 _name = *p; 501 *p += _name_len; 502 doutc(cl, "parsed dir dname '%.*s'\n", _name_len, _name); 503 504 if (info->hash_order) 505 rde->raw_hash = ceph_str_hash(ci->i_dir_layout.dl_dir_hash, 506 _name, _name_len); 507 508 /* dentry lease */ 509 err = parse_reply_info_lease(p, end, &rde->lease, features, 510 &altname_len, &altname); 511 if (err) 512 goto out_bad; 513 514 /* 515 * Try to dencrypt the dentry names and update them 516 * in the ceph_mds_reply_dir_entry struct. 
517 */ 518 fname.dir = inode; 519 fname.name = _name; 520 fname.name_len = _name_len; 521 fname.ctext = altname; 522 fname.ctext_len = altname_len; 523 /* 524 * The _name_len maybe larger than altname_len, such as 525 * when the human readable name length is in range of 526 * (CEPH_NOHASH_NAME_MAX, CEPH_NOHASH_NAME_MAX + SHA256_DIGEST_SIZE), 527 * then the copy in ceph_fname_to_usr will corrupt the 528 * data if there has no encryption key. 529 * 530 * Just set the no_copy flag and then if there has no 531 * encryption key the oname.name will be assigned to 532 * _name always. 533 */ 534 fname.no_copy = true; 535 if (altname_len == 0) { 536 /* 537 * Set tname to _name, and this will be used 538 * to do the base64_decode in-place. It's 539 * safe because the decoded string should 540 * always be shorter, which is 3/4 of origin 541 * string. 542 */ 543 tname.name = _name; 544 545 /* 546 * Set oname to _name too, and this will be 547 * used to do the dencryption in-place. 548 */ 549 oname.name = _name; 550 oname.len = _name_len; 551 } else { 552 /* 553 * This will do the decryption only in-place 554 * from altname cryptext directly. 
555 */ 556 oname.name = altname; 557 oname.len = altname_len; 558 } 559 rde->is_nokey = false; 560 err = ceph_fname_to_usr(&fname, &tname, &oname, &rde->is_nokey); 561 if (err) { 562 pr_err_client(cl, "unable to decode %.*s, got %d\n", 563 _name_len, _name, err); 564 goto out_bad; 565 } 566 rde->name = oname.name; 567 rde->name_len = oname.len; 568 569 /* inode */ 570 err = parse_reply_info_in(p, end, &rde->inode, features, mdsc); 571 if (err < 0) 572 goto out_bad; 573 /* ceph_readdir_prepopulate() will update it */ 574 rde->offset = 0; 575 i++; 576 num--; 577 } 578 579 done: 580 /* Skip over any unrecognized fields */ 581 *p = end; 582 return 0; 583 584 bad: 585 err = -EIO; 586 out_bad: 587 pr_err_client(cl, "problem parsing dir contents %d\n", err); 588 return err; 589 } 590 591 /* 592 * parse fcntl F_GETLK results 593 */ 594 static int parse_reply_info_filelock(void **p, void *end, 595 struct ceph_mds_reply_info_parsed *info, 596 u64 features) 597 { 598 if (*p + sizeof(*info->filelock_reply) > end) 599 goto bad; 600 601 info->filelock_reply = *p; 602 603 /* Skip over any unrecognized fields */ 604 *p = end; 605 return 0; 606 bad: 607 return -EIO; 608 } 609 610 611 #if BITS_PER_LONG == 64 612 613 #define DELEGATED_INO_AVAILABLE xa_mk_value(1) 614 615 static int ceph_parse_deleg_inos(void **p, void *end, 616 struct ceph_mds_session *s) 617 { 618 struct ceph_client *cl = s->s_mdsc->fsc->client; 619 u32 sets; 620 621 ceph_decode_32_safe(p, end, sets, bad); 622 doutc(cl, "got %u sets of delegated inodes\n", sets); 623 while (sets--) { 624 u64 start, len; 625 626 ceph_decode_64_safe(p, end, start, bad); 627 ceph_decode_64_safe(p, end, len, bad); 628 629 /* Don't accept a delegation of system inodes */ 630 if (start < CEPH_INO_SYSTEM_BASE) { 631 pr_warn_ratelimited_client(cl, 632 "ignoring reserved inode range delegation (start=0x%llx len=0x%llx)\n", 633 start, len); 634 continue; 635 } 636 while (len--) { 637 int err = xa_insert(&s->s_delegated_inos, start++, 638 
DELEGATED_INO_AVAILABLE, 639 GFP_KERNEL); 640 if (!err) { 641 doutc(cl, "added delegated inode 0x%llx\n", start - 1); 642 } else if (err == -EBUSY) { 643 pr_warn_client(cl, 644 "MDS delegated inode 0x%llx more than once.\n", 645 start - 1); 646 } else { 647 return err; 648 } 649 } 650 } 651 return 0; 652 bad: 653 return -EIO; 654 } 655 656 u64 ceph_get_deleg_ino(struct ceph_mds_session *s) 657 { 658 unsigned long ino; 659 void *val; 660 661 xa_for_each(&s->s_delegated_inos, ino, val) { 662 val = xa_erase(&s->s_delegated_inos, ino); 663 if (val == DELEGATED_INO_AVAILABLE) 664 return ino; 665 } 666 return 0; 667 } 668 669 int ceph_restore_deleg_ino(struct ceph_mds_session *s, u64 ino) 670 { 671 return xa_insert(&s->s_delegated_inos, ino, DELEGATED_INO_AVAILABLE, 672 GFP_KERNEL); 673 } 674 #else /* BITS_PER_LONG == 64 */ 675 /* 676 * FIXME: xarrays can't handle 64-bit indexes on a 32-bit arch. For now, just 677 * ignore delegated_inos on 32 bit arch. Maybe eventually add xarrays for top 678 * and bottom words? 679 */ 680 static int ceph_parse_deleg_inos(void **p, void *end, 681 struct ceph_mds_session *s) 682 { 683 u32 sets; 684 685 ceph_decode_32_safe(p, end, sets, bad); 686 if (sets) 687 ceph_decode_skip_n(p, end, sets * 2 * sizeof(__le64), bad); 688 return 0; 689 bad: 690 return -EIO; 691 } 692 693 u64 ceph_get_deleg_ino(struct ceph_mds_session *s) 694 { 695 return 0; 696 } 697 698 int ceph_restore_deleg_ino(struct ceph_mds_session *s, u64 ino) 699 { 700 return 0; 701 } 702 #endif /* BITS_PER_LONG == 64 */ 703 704 /* 705 * parse create results 706 */ 707 static int parse_reply_info_create(void **p, void *end, 708 struct ceph_mds_reply_info_parsed *info, 709 u64 features, struct ceph_mds_session *s) 710 { 711 int ret; 712 713 if (features == (u64)-1 || 714 (features & CEPH_FEATURE_REPLY_CREATE_INODE)) { 715 if (*p == end) { 716 /* Malformed reply? 
*/ 717 info->has_create_ino = false; 718 } else if (test_bit(CEPHFS_FEATURE_DELEG_INO, &s->s_features)) { 719 info->has_create_ino = true; 720 /* struct_v, struct_compat, and len */ 721 ceph_decode_skip_n(p, end, 2 + sizeof(u32), bad); 722 ceph_decode_64_safe(p, end, info->ino, bad); 723 ret = ceph_parse_deleg_inos(p, end, s); 724 if (ret) 725 return ret; 726 } else { 727 /* legacy */ 728 ceph_decode_64_safe(p, end, info->ino, bad); 729 info->has_create_ino = true; 730 } 731 } else { 732 if (*p != end) 733 goto bad; 734 } 735 736 /* Skip over any unrecognized fields */ 737 *p = end; 738 return 0; 739 bad: 740 return -EIO; 741 } 742 743 static int parse_reply_info_getvxattr(void **p, void *end, 744 struct ceph_mds_reply_info_parsed *info, 745 u64 features) 746 { 747 u32 value_len; 748 749 ceph_decode_skip_8(p, end, bad); /* skip current version: 1 */ 750 ceph_decode_skip_8(p, end, bad); /* skip first version: 1 */ 751 ceph_decode_skip_32(p, end, bad); /* skip payload length */ 752 753 ceph_decode_32_safe(p, end, value_len, bad); 754 755 if (value_len == end - *p) { 756 info->xattr_info.xattr_value = *p; 757 info->xattr_info.xattr_value_len = value_len; 758 *p = end; 759 return value_len; 760 } 761 bad: 762 return -EIO; 763 } 764 765 /* 766 * parse extra results 767 */ 768 static int parse_reply_info_extra(void **p, void *end, 769 struct ceph_mds_request *req, 770 u64 features, struct ceph_mds_session *s) 771 { 772 struct ceph_mds_reply_info_parsed *info = &req->r_reply_info; 773 u32 op = le32_to_cpu(info->head->op); 774 775 if (op == CEPH_MDS_OP_GETFILELOCK) 776 return parse_reply_info_filelock(p, end, info, features); 777 else if (op == CEPH_MDS_OP_READDIR || op == CEPH_MDS_OP_LSSNAP) 778 return parse_reply_info_readdir(p, end, req, features, 779 req->r_mdsc); 780 else if (op == CEPH_MDS_OP_CREATE) 781 return parse_reply_info_create(p, end, info, features, s); 782 else if (op == CEPH_MDS_OP_GETVXATTR) 783 return parse_reply_info_getvxattr(p, end, info, features); 
784 else 785 return -EIO; 786 } 787 788 /* 789 * parse entire mds reply 790 */ 791 static int parse_reply_info(struct ceph_mds_session *s, struct ceph_msg *msg, 792 struct ceph_mds_request *req, u64 features) 793 { 794 struct ceph_mds_reply_info_parsed *info = &req->r_reply_info; 795 struct ceph_client *cl = s->s_mdsc->fsc->client; 796 void *p, *end; 797 u32 len; 798 int err; 799 800 info->head = msg->front.iov_base; 801 p = msg->front.iov_base + sizeof(struct ceph_mds_reply_head); 802 end = p + msg->front.iov_len - sizeof(struct ceph_mds_reply_head); 803 804 /* trace */ 805 ceph_decode_32_safe(&p, end, len, bad); 806 if (len > 0) { 807 ceph_decode_need(&p, end, len, bad); 808 err = parse_reply_info_trace(&p, p + len, info, features, 809 s->s_mdsc); 810 if (err < 0) 811 goto out_bad; 812 } 813 814 /* extra */ 815 ceph_decode_32_safe(&p, end, len, bad); 816 if (len > 0) { 817 ceph_decode_need(&p, end, len, bad); 818 err = parse_reply_info_extra(&p, p + len, req, features, s); 819 if (err < 0) 820 goto out_bad; 821 } 822 823 /* snap blob */ 824 ceph_decode_32_safe(&p, end, len, bad); 825 info->snapblob_len = len; 826 info->snapblob = p; 827 p += len; 828 829 if (p != end) 830 goto bad; 831 return 0; 832 833 bad: 834 err = -EIO; 835 out_bad: 836 pr_err_client(cl, "mds parse_reply err %d\n", err); 837 ceph_msg_dump(msg); 838 return err; 839 } 840 841 static void destroy_reply_info(struct ceph_mds_reply_info_parsed *info) 842 { 843 int i; 844 845 kfree(info->diri.fscrypt_auth); 846 kfree(info->diri.fscrypt_file); 847 kfree(info->targeti.fscrypt_auth); 848 kfree(info->targeti.fscrypt_file); 849 if (!info->dir_entries) 850 return; 851 852 for (i = 0; i < info->dir_nr; i++) { 853 struct ceph_mds_reply_dir_entry *rde = info->dir_entries + i; 854 855 kfree(rde->inode.fscrypt_auth); 856 kfree(rde->inode.fscrypt_file); 857 } 858 free_pages((unsigned long)info->dir_entries, get_order(info->dir_buf_size)); 859 } 860 861 /* 862 * In async unlink case the kclient won't wait for 
the first reply 863 * from MDS and just drop all the links and unhash the dentry and then 864 * succeeds immediately. 865 * 866 * For any new create/link/rename,etc requests followed by using the 867 * same file names we must wait for the first reply of the inflight 868 * unlink request, or the MDS possibly will fail these following 869 * requests with -EEXIST if the inflight async unlink request was 870 * delayed for some reasons. 871 * 872 * And the worst case is that for the none async openc request it will 873 * successfully open the file if the CDentry hasn't been unlinked yet, 874 * but later the previous delayed async unlink request will remove the 875 * CDentry. That means the just created file is possibly deleted later 876 * by accident. 877 * 878 * We need to wait for the inflight async unlink requests to finish 879 * when creating new files/directories by using the same file names. 880 */ 881 int ceph_wait_on_conflict_unlink(struct dentry *dentry) 882 { 883 struct ceph_fs_client *fsc = ceph_sb_to_fs_client(dentry->d_sb); 884 struct ceph_client *cl = fsc->client; 885 struct dentry *pdentry = dentry->d_parent; 886 struct dentry *udentry, *found = NULL; 887 struct ceph_dentry_info *di; 888 struct qstr dname; 889 u32 hash = dentry->d_name.hash; 890 int err; 891 892 dname.name = dentry->d_name.name; 893 dname.len = dentry->d_name.len; 894 895 rcu_read_lock(); 896 hash_for_each_possible_rcu(fsc->async_unlink_conflict, di, 897 hnode, hash) { 898 udentry = di->dentry; 899 900 spin_lock(&udentry->d_lock); 901 if (udentry->d_name.hash != hash) 902 goto next; 903 if (unlikely(udentry->d_parent != pdentry)) 904 goto next; 905 if (!hash_hashed(&di->hnode)) 906 goto next; 907 908 if (!test_bit(CEPH_DENTRY_ASYNC_UNLINK_BIT, &di->flags)) 909 pr_warn_client(cl, "dentry %p:%pd async unlink bit is not set\n", 910 dentry, dentry); 911 912 if (!d_same_name(udentry, pdentry, &dname)) 913 goto next; 914 915 found = dget_dlock(udentry); 916 spin_unlock(&udentry->d_lock); 917 
break; 918 next: 919 spin_unlock(&udentry->d_lock); 920 } 921 rcu_read_unlock(); 922 923 if (likely(!found)) 924 return 0; 925 926 doutc(cl, "dentry %p:%pd conflict with old %p:%pd\n", dentry, dentry, 927 found, found); 928 929 err = wait_on_bit(&di->flags, CEPH_DENTRY_ASYNC_UNLINK_BIT, 930 TASK_KILLABLE); 931 dput(found); 932 return err; 933 } 934 935 936 /* 937 * sessions 938 */ 939 const char *ceph_session_state_name(int s) 940 { 941 switch (s) { 942 case CEPH_MDS_SESSION_NEW: return "new"; 943 case CEPH_MDS_SESSION_OPENING: return "opening"; 944 case CEPH_MDS_SESSION_OPEN: return "open"; 945 case CEPH_MDS_SESSION_HUNG: return "hung"; 946 case CEPH_MDS_SESSION_CLOSING: return "closing"; 947 case CEPH_MDS_SESSION_CLOSED: return "closed"; 948 case CEPH_MDS_SESSION_RESTARTING: return "restarting"; 949 case CEPH_MDS_SESSION_RECONNECTING: return "reconnecting"; 950 case CEPH_MDS_SESSION_REJECTED: return "rejected"; 951 default: return "???"; 952 } 953 } 954 955 struct ceph_mds_session *ceph_get_mds_session(struct ceph_mds_session *s) 956 { 957 if (refcount_inc_not_zero(&s->s_ref)) 958 return s; 959 return NULL; 960 } 961 962 void ceph_put_mds_session(struct ceph_mds_session *s) 963 { 964 if (IS_ERR_OR_NULL(s)) 965 return; 966 967 if (refcount_dec_and_test(&s->s_ref)) { 968 if (s->s_auth.authorizer) 969 ceph_auth_destroy_authorizer(s->s_auth.authorizer); 970 WARN_ON(mutex_is_locked(&s->s_mutex)); 971 xa_destroy(&s->s_delegated_inos); 972 kfree(s); 973 } 974 } 975 976 /* 977 * called under mdsc->mutex 978 */ 979 struct ceph_mds_session *__ceph_lookup_mds_session(struct ceph_mds_client *mdsc, 980 int mds) 981 { 982 if (mds >= mdsc->max_sessions || !mdsc->sessions[mds]) 983 return NULL; 984 return ceph_get_mds_session(mdsc->sessions[mds]); 985 } 986 987 static bool __have_session(struct ceph_mds_client *mdsc, int mds) 988 { 989 if (mds >= mdsc->max_sessions || !mdsc->sessions[mds]) 990 return false; 991 else 992 return true; 993 } 994 995 static int 
__verify_registered_session(struct ceph_mds_client *mdsc,
			    struct ceph_mds_session *s)
{
	/* s must still be the entry stored in its own rank's slot */
	if (s->s_mds >= mdsc->max_sessions ||
	    mdsc->sessions[s->s_mds] != s)
		return -ENOENT;
	return 0;
}

/*
 * create+register a new session for given mds.
 * called under mdsc->mutex.
 *
 * On success the session is stored in mdsc->sessions[mds] and returned
 * with two references: one owned by the sessions[] slot, one for the
 * caller.  Returns ERR_PTR(-EIO) while the mount state is
 * CEPH_MOUNT_FENCE_IO, ERR_PTR(-EINVAL) when @mds is not below the
 * mdsmap's possible_max_rank, and ERR_PTR(-ENOMEM) when allocating the
 * session or the enlarged sessions[] array fails.
 */
static struct ceph_mds_session *register_session(struct ceph_mds_client *mdsc,
						 int mds)
{
	struct ceph_client *cl = mdsc->fsc->client;
	struct ceph_mds_session *s;

	if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_FENCE_IO)
		return ERR_PTR(-EIO);

	if (mds >= mdsc->mdsmap->possible_max_rank)
		return ERR_PTR(-EINVAL);

	s = kzalloc_obj(*s, GFP_NOFS);
	if (!s)
		return ERR_PTR(-ENOMEM);

	/* sessions[] is too small for this rank: allocate a bigger, zeroed
	 * array and carry the existing pointers over */
	if (mds >= mdsc->max_sessions) {
		int newmax = 1 << get_count_order(mds + 1);
		struct ceph_mds_session **sa;
		size_t ptr_size = sizeof(struct ceph_mds_session *);

		doutc(cl, "realloc to %d\n", newmax);
		sa = kcalloc(newmax, ptr_size, GFP_NOFS);
		if (!sa)
			goto fail_realloc;
		if (mdsc->sessions) {
			memcpy(sa, mdsc->sessions,
			       mdsc->max_sessions * ptr_size);
			kfree(mdsc->sessions);
		}
		mdsc->sessions = sa;
		mdsc->max_sessions = newmax;
	}

	doutc(cl, "mds%d\n", mds);
	s->s_mdsc = mdsc;
	s->s_mds = mds;
	s->s_state = CEPH_MDS_SESSION_NEW;
	mutex_init(&s->s_mutex);

	ceph_con_init(&s->s_con, s, &mds_con_ops, &mdsc->fsc->client->msgr);

	atomic_set(&s->s_cap_gen, 1);
	/* ttl starts one jiffy in the past */
	s->s_cap_ttl = jiffies - 1;

	spin_lock_init(&s->s_cap_lock);
	INIT_LIST_HEAD(&s->s_caps);
	refcount_set(&s->s_ref, 1);
	INIT_LIST_HEAD(&s->s_waiting);
	INIT_LIST_HEAD(&s->s_unsafe);
	xa_init(&s->s_delegated_inos);
	INIT_LIST_HEAD(&s->s_cap_releases);
	INIT_WORK(&s->s_cap_release_work, ceph_cap_release_work);

	INIT_LIST_HEAD(&s->s_cap_dirty);
	INIT_LIST_HEAD(&s->s_cap_flushing);

	mdsc->sessions[mds] = s;
	atomic_inc(&mdsc->num_sessions);
	refcount_inc(&s->s_ref);  /* one ref to sessions[], one to caller */

	ceph_con_open(&s->s_con, CEPH_ENTITY_TYPE_MDS, mds,
		      ceph_mdsmap_get_addr(mdsc->mdsmap, mds));

	return s;

fail_realloc:
	kfree(s);
	return ERR_PTR(-ENOMEM);
}

/*
 * Remove a session from mdsc->sessions[], close its connection and drop
 * the reference that the sessions[] slot held.
 *
 * called under mdsc->mutex
 */
static void __unregister_session(struct ceph_mds_client *mdsc,
			       struct ceph_mds_session *s)
{
	doutc(mdsc->fsc->client, "mds%d %p\n", s->s_mds, s);
	BUG_ON(mdsc->sessions[s->s_mds] != s);
	mdsc->sessions[s->s_mds] = NULL;
	ceph_con_close(&s->s_con);
	ceph_put_mds_session(s);
	atomic_dec(&mdsc->num_sessions);
}

/*
 * drop session refs in request.
 *
 * should be last request ref, or hold mdsc->mutex
 */
static void put_request_session(struct ceph_mds_request *req)
{
	if (req->r_session) {
		ceph_put_mds_session(req->r_session);
		req->r_session = NULL;
	}
}

/*
 * Invoke @cb on every registered session.  When @check_state is true,
 * sessions for which check_session_state() returns false are skipped.
 *
 * mdsc->mutex is taken here and is released around each @cb call; the
 * session reference obtained from the lookup is held across the call.
 */
void ceph_mdsc_iterate_sessions(struct ceph_mds_client *mdsc,
				void (*cb)(struct ceph_mds_session *),
				bool check_state)
{
	int mds;

	mutex_lock(&mdsc->mutex);
	for (mds = 0; mds < mdsc->max_sessions; ++mds) {
		struct ceph_mds_session *s;

		s = __ceph_lookup_mds_session(mdsc, mds);
		if (!s)
			continue;

		if (check_state && !check_session_state(s)) {
			ceph_put_mds_session(s);
			continue;
		}

		/* run the callback without mdsc->mutex held */
		mutex_unlock(&mdsc->mutex);
		cb(s);
		ceph_put_mds_session(s);
		mutex_lock(&mdsc->mutex);
	}
	mutex_unlock(&mdsc->mutex);
}

/*
 * kref release callback for a ceph_mds_request: drop every message,
 * inode, dentry, credential and buffer the request still references,
 * then return the request to ceph_mds_request_cachep.
 */
void ceph_mdsc_release_request(struct kref *kref)
{
	struct ceph_mds_request *req = container_of(kref,
						    struct ceph_mds_request,
						    r_kref);
	ceph_mdsc_release_dir_caps_async(req);
	destroy_reply_info(&req->r_reply_info);
	if (req->r_request)
		ceph_msg_put(req->r_request);
	if (req->r_reply)
		ceph_msg_put(req->r_reply);
	if (req->r_inode) {
		ceph_put_cap_refs(ceph_inode(req->r_inode), CEPH_CAP_PIN);
		iput(req->r_inode);
	}
	if (req->r_parent) {
		ceph_put_cap_refs(ceph_inode(req->r_parent), CEPH_CAP_PIN);
		iput(req->r_parent);
	}
	iput(req->r_target_inode);
	iput(req->r_new_inode);
	if (req->r_dentry)
		dput(req->r_dentry);
	if (req->r_old_dentry)
		dput(req->r_old_dentry);
	if (req->r_old_dentry_dir) {
		/*
		 * track (and drop pins for) r_old_dentry_dir
		 * separately, since r_old_dentry's d_parent may have
		 * changed between the dir mutex being dropped and
		 * this request being freed.
		 */
		ceph_put_cap_refs(ceph_inode(req->r_old_dentry_dir),
				  CEPH_CAP_PIN);
		iput(req->r_old_dentry_dir);
	}
	kfree(req->r_path1);
	kfree(req->r_path2);
	put_cred(req->r_cred);
	if (req->r_mnt_idmap)
		mnt_idmap_put(req->r_mnt_idmap);
	if (req->r_pagelist)
		ceph_pagelist_release(req->r_pagelist);
	kfree(req->r_fscrypt_auth);
	kfree(req->r_altname);
	put_request_session(req);
	ceph_unreserve_caps(req->r_mdsc, &req->r_caps_reservation);
	/* the request is not expected to still be on a wait list here */
	WARN_ON_ONCE(!list_empty(&req->r_wait));
	kmem_cache_free(ceph_mds_request_cachep, req);
}

/* rb-tree helpers for requests keyed by r_tid: the code below uses
 * lookup_request(), insert_request() and erase_request() */
DEFINE_RB_FUNCS(request, struct ceph_mds_request, r_tid, r_node)

/*
 * lookup request, bump ref if found.
 *
 * called under mdsc->mutex.
 */
static struct ceph_mds_request *
lookup_get_request(struct ceph_mds_client *mdsc, u64 tid)
{
	struct ceph_mds_request *req;

	req = lookup_request(&mdsc->request_tree, tid);
	if (req)
		ceph_mdsc_get_request(req);

	return req;
}

/*
 * Register an in-flight request, and assign a tid.  Link to directory
 * we are modifying (if any).
 *
 * Called under mdsc->mutex.
1208 */ 1209 static void __register_request(struct ceph_mds_client *mdsc, 1210 struct ceph_mds_request *req, 1211 struct inode *dir) 1212 { 1213 struct ceph_client *cl = mdsc->fsc->client; 1214 int ret = 0; 1215 1216 req->r_tid = ++mdsc->last_tid; 1217 if (req->r_num_caps) { 1218 ret = ceph_reserve_caps(mdsc, &req->r_caps_reservation, 1219 req->r_num_caps); 1220 if (ret < 0) { 1221 pr_err_client(cl, "%p failed to reserve caps: %d\n", 1222 req, ret); 1223 /* set req->r_err to fail early from __do_request */ 1224 req->r_err = ret; 1225 return; 1226 } 1227 } 1228 doutc(cl, "%p tid %lld\n", req, req->r_tid); 1229 ceph_mdsc_get_request(req); 1230 insert_request(&mdsc->request_tree, req); 1231 1232 req->r_cred = get_current_cred(); 1233 if (!req->r_mnt_idmap) 1234 req->r_mnt_idmap = &nop_mnt_idmap; 1235 1236 if (mdsc->oldest_tid == 0 && req->r_op != CEPH_MDS_OP_SETFILELOCK) 1237 mdsc->oldest_tid = req->r_tid; 1238 1239 if (dir) { 1240 struct ceph_inode_info *ci = ceph_inode(dir); 1241 1242 ihold(dir); 1243 req->r_unsafe_dir = dir; 1244 spin_lock(&ci->i_unsafe_lock); 1245 list_add_tail(&req->r_unsafe_dir_item, &ci->i_unsafe_dirops); 1246 spin_unlock(&ci->i_unsafe_lock); 1247 } 1248 } 1249 1250 static void __unregister_request(struct ceph_mds_client *mdsc, 1251 struct ceph_mds_request *req) 1252 { 1253 doutc(mdsc->fsc->client, "%p tid %lld\n", req, req->r_tid); 1254 1255 /* Never leave an unregistered request on an unsafe list! 
*/ 1256 list_del_init(&req->r_unsafe_item); 1257 1258 if (req->r_tid == mdsc->oldest_tid) { 1259 struct rb_node *p = rb_next(&req->r_node); 1260 mdsc->oldest_tid = 0; 1261 while (p) { 1262 struct ceph_mds_request *next_req = 1263 rb_entry(p, struct ceph_mds_request, r_node); 1264 if (next_req->r_op != CEPH_MDS_OP_SETFILELOCK) { 1265 mdsc->oldest_tid = next_req->r_tid; 1266 break; 1267 } 1268 p = rb_next(p); 1269 } 1270 } 1271 1272 erase_request(&mdsc->request_tree, req); 1273 1274 if (req->r_unsafe_dir) { 1275 struct ceph_inode_info *ci = ceph_inode(req->r_unsafe_dir); 1276 spin_lock(&ci->i_unsafe_lock); 1277 list_del_init(&req->r_unsafe_dir_item); 1278 spin_unlock(&ci->i_unsafe_lock); 1279 } 1280 if (req->r_target_inode && 1281 test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) { 1282 struct ceph_inode_info *ci = ceph_inode(req->r_target_inode); 1283 spin_lock(&ci->i_unsafe_lock); 1284 list_del_init(&req->r_unsafe_target_item); 1285 spin_unlock(&ci->i_unsafe_lock); 1286 } 1287 1288 if (req->r_unsafe_dir) { 1289 iput(req->r_unsafe_dir); 1290 req->r_unsafe_dir = NULL; 1291 } 1292 1293 complete_all(&req->r_safe_completion); 1294 1295 ceph_mdsc_put_request(req); 1296 } 1297 1298 /* 1299 * Walk back up the dentry tree until we hit a dentry representing a 1300 * non-snapshot inode. We do this using the rcu_read_lock (which must be held 1301 * when calling this) to ensure that the objects won't disappear while we're 1302 * working with them. Once we hit a candidate dentry, we attempt to take a 1303 * reference to it, and return that as the result. 1304 */ 1305 static struct inode *get_nonsnap_parent(struct dentry *dentry) 1306 { 1307 struct inode *inode = NULL; 1308 1309 while (dentry && !IS_ROOT(dentry)) { 1310 inode = d_inode_rcu(dentry); 1311 if (!inode || ceph_snap(inode) == CEPH_NOSNAP) 1312 break; 1313 dentry = dentry->d_parent; 1314 } 1315 if (inode) 1316 inode = igrab(inode); 1317 return inode; 1318 } 1319 1320 /* 1321 * Choose mds to send request to next. 
If there is a hint set in the 1322 * request (e.g., due to a prior forward hint from the mds), use that. 1323 * Otherwise, consult frag tree and/or caps to identify the 1324 * appropriate mds. If all else fails, choose randomly. 1325 * 1326 * Called under mdsc->mutex. 1327 */ 1328 static int __choose_mds(struct ceph_mds_client *mdsc, 1329 struct ceph_mds_request *req, 1330 bool *random) 1331 { 1332 struct inode *inode; 1333 struct ceph_inode_info *ci; 1334 struct ceph_cap *cap; 1335 int mode = req->r_direct_mode; 1336 int mds = -1; 1337 u32 hash = req->r_direct_hash; 1338 bool is_hash = test_bit(CEPH_MDS_R_DIRECT_IS_HASH, &req->r_req_flags); 1339 struct ceph_client *cl = mdsc->fsc->client; 1340 1341 if (random) 1342 *random = false; 1343 1344 /* 1345 * is there a specific mds we should try? ignore hint if we have 1346 * no session and the mds is not up (active or recovering). 1347 */ 1348 if (req->r_resend_mds >= 0 && 1349 (__have_session(mdsc, req->r_resend_mds) || 1350 ceph_mdsmap_get_state(mdsc->mdsmap, req->r_resend_mds) > 0)) { 1351 doutc(cl, "using resend_mds mds%d\n", req->r_resend_mds); 1352 return req->r_resend_mds; 1353 } 1354 1355 if (mode == USE_RANDOM_MDS) 1356 goto random; 1357 1358 inode = NULL; 1359 if (req->r_inode) { 1360 if (ceph_snap(req->r_inode) != CEPH_SNAPDIR) { 1361 inode = req->r_inode; 1362 ihold(inode); 1363 } else { 1364 /* req->r_dentry is non-null for LSSNAP request */ 1365 rcu_read_lock(); 1366 inode = get_nonsnap_parent(req->r_dentry); 1367 rcu_read_unlock(); 1368 doutc(cl, "using snapdir's parent %p %llx.%llx\n", 1369 inode, ceph_vinop(inode)); 1370 } 1371 } else if (req->r_dentry) { 1372 /* ignore race with rename; old or new d_parent is okay */ 1373 struct dentry *parent; 1374 struct inode *dir; 1375 1376 rcu_read_lock(); 1377 parent = READ_ONCE(req->r_dentry->d_parent); 1378 dir = req->r_parent ? 
: d_inode_rcu(parent); 1379 1380 if (!dir || dir->i_sb != mdsc->fsc->sb) { 1381 /* not this fs or parent went negative */ 1382 inode = d_inode(req->r_dentry); 1383 if (inode) 1384 ihold(inode); 1385 } else if (ceph_snap(dir) != CEPH_NOSNAP) { 1386 /* direct snapped/virtual snapdir requests 1387 * based on parent dir inode */ 1388 inode = get_nonsnap_parent(parent); 1389 doutc(cl, "using nonsnap parent %p %llx.%llx\n", 1390 inode, ceph_vinop(inode)); 1391 } else { 1392 /* dentry target */ 1393 inode = d_inode(req->r_dentry); 1394 if (!inode || mode == USE_AUTH_MDS) { 1395 /* dir + name */ 1396 inode = igrab(dir); 1397 hash = ceph_dentry_hash(dir, req->r_dentry); 1398 is_hash = true; 1399 } else { 1400 ihold(inode); 1401 } 1402 } 1403 rcu_read_unlock(); 1404 } 1405 1406 if (!inode) 1407 goto random; 1408 1409 doutc(cl, "%p %llx.%llx is_hash=%d (0x%x) mode %d\n", inode, 1410 ceph_vinop(inode), (int)is_hash, hash, mode); 1411 ci = ceph_inode(inode); 1412 1413 if (is_hash && S_ISDIR(inode->i_mode)) { 1414 struct ceph_inode_frag frag; 1415 int found; 1416 1417 ceph_choose_frag(ci, hash, &frag, &found); 1418 if (found) { 1419 if (mode == USE_ANY_MDS && frag.ndist > 0) { 1420 u8 r; 1421 1422 /* choose a random replica */ 1423 get_random_bytes(&r, 1); 1424 r %= frag.ndist; 1425 mds = frag.dist[r]; 1426 doutc(cl, "%p %llx.%llx frag %u mds%d (%d/%d)\n", 1427 inode, ceph_vinop(inode), frag.frag, 1428 mds, (int)r, frag.ndist); 1429 if (ceph_mdsmap_get_state(mdsc->mdsmap, mds) >= 1430 CEPH_MDS_STATE_ACTIVE && 1431 !ceph_mdsmap_is_laggy(mdsc->mdsmap, mds)) 1432 goto out; 1433 } 1434 1435 /* since this file/dir wasn't known to be 1436 * replicated, then we want to look for the 1437 * authoritative mds. 
*/ 1438 if (frag.mds >= 0) { 1439 /* choose auth mds */ 1440 mds = frag.mds; 1441 doutc(cl, "%p %llx.%llx frag %u mds%d (auth)\n", 1442 inode, ceph_vinop(inode), frag.frag, mds); 1443 if (ceph_mdsmap_get_state(mdsc->mdsmap, mds) >= 1444 CEPH_MDS_STATE_ACTIVE) { 1445 if (!ceph_mdsmap_is_laggy(mdsc->mdsmap, 1446 mds)) 1447 goto out; 1448 } 1449 } 1450 mode = USE_AUTH_MDS; 1451 } 1452 } 1453 1454 spin_lock(&ci->i_ceph_lock); 1455 cap = NULL; 1456 if (mode == USE_AUTH_MDS) 1457 cap = ci->i_auth_cap; 1458 if (!cap && !RB_EMPTY_ROOT(&ci->i_caps)) 1459 cap = rb_entry(rb_first(&ci->i_caps), struct ceph_cap, ci_node); 1460 if (!cap) { 1461 spin_unlock(&ci->i_ceph_lock); 1462 iput(inode); 1463 goto random; 1464 } 1465 mds = cap->session->s_mds; 1466 doutc(cl, "%p %llx.%llx mds%d (%scap %p)\n", inode, 1467 ceph_vinop(inode), mds, 1468 cap == ci->i_auth_cap ? "auth " : "", cap); 1469 spin_unlock(&ci->i_ceph_lock); 1470 out: 1471 iput(inode); 1472 return mds; 1473 1474 random: 1475 if (random) 1476 *random = true; 1477 1478 mds = ceph_mdsmap_get_random_mds(mdsc->mdsmap); 1479 doutc(cl, "chose random mds%d\n", mds); 1480 return mds; 1481 } 1482 1483 1484 /* 1485 * session messages 1486 */ 1487 struct ceph_msg *ceph_create_session_msg(u32 op, u64 seq) 1488 { 1489 struct ceph_msg *msg; 1490 struct ceph_mds_session_head *h; 1491 1492 msg = ceph_msg_new(CEPH_MSG_CLIENT_SESSION, sizeof(*h), GFP_NOFS, 1493 false); 1494 if (!msg) { 1495 pr_err("ENOMEM creating session %s msg\n", 1496 ceph_session_op_name(op)); 1497 return NULL; 1498 } 1499 h = msg->front.iov_base; 1500 h->op = cpu_to_le32(op); 1501 h->seq = cpu_to_le64(seq); 1502 1503 return msg; 1504 } 1505 1506 static const unsigned char feature_bits[] = CEPHFS_FEATURES_CLIENT_SUPPORTED; 1507 #define FEATURE_BYTES(c) (DIV_ROUND_UP((size_t)feature_bits[c - 1] + 1, 64) * 8) 1508 static int encode_supported_features(void **p, void *end) 1509 { 1510 static const size_t count = ARRAY_SIZE(feature_bits); 1511 1512 if (count > 0) { 1513 
size_t i; 1514 size_t size = FEATURE_BYTES(count); 1515 unsigned long bit; 1516 1517 if (WARN_ON_ONCE(*p + 4 + size > end)) 1518 return -ERANGE; 1519 1520 ceph_encode_32(p, size); 1521 memset(*p, 0, size); 1522 for (i = 0; i < count; i++) { 1523 bit = feature_bits[i]; 1524 ((unsigned char *)(*p))[bit / 8] |= BIT(bit % 8); 1525 } 1526 *p += size; 1527 } else { 1528 if (WARN_ON_ONCE(*p + 4 > end)) 1529 return -ERANGE; 1530 1531 ceph_encode_32(p, 0); 1532 } 1533 1534 return 0; 1535 } 1536 1537 static const unsigned char metric_bits[] = CEPHFS_METRIC_SPEC_CLIENT_SUPPORTED; 1538 #define METRIC_BYTES(cnt) (DIV_ROUND_UP((size_t)metric_bits[cnt - 1] + 1, 64) * 8) 1539 static int encode_metric_spec(void **p, void *end) 1540 { 1541 static const size_t count = ARRAY_SIZE(metric_bits); 1542 1543 /* header */ 1544 if (WARN_ON_ONCE(*p + 2 > end)) 1545 return -ERANGE; 1546 1547 ceph_encode_8(p, 1); /* version */ 1548 ceph_encode_8(p, 1); /* compat */ 1549 1550 if (count > 0) { 1551 size_t i; 1552 size_t size = METRIC_BYTES(count); 1553 1554 if (WARN_ON_ONCE(*p + 4 + 4 + size > end)) 1555 return -ERANGE; 1556 1557 /* metric spec info length */ 1558 ceph_encode_32(p, 4 + size); 1559 1560 /* metric spec */ 1561 ceph_encode_32(p, size); 1562 memset(*p, 0, size); 1563 for (i = 0; i < count; i++) 1564 ((unsigned char *)(*p))[i / 8] |= BIT(metric_bits[i] % 8); 1565 *p += size; 1566 } else { 1567 if (WARN_ON_ONCE(*p + 4 + 4 > end)) 1568 return -ERANGE; 1569 1570 /* metric spec info length */ 1571 ceph_encode_32(p, 4); 1572 /* metric spec */ 1573 ceph_encode_32(p, 0); 1574 } 1575 1576 return 0; 1577 } 1578 1579 /* 1580 * session message, specialization for CEPH_SESSION_REQUEST_OPEN 1581 * to include additional client metadata fields. 
1582 */ 1583 static struct ceph_msg * 1584 create_session_full_msg(struct ceph_mds_client *mdsc, int op, u64 seq) 1585 { 1586 struct ceph_msg *msg; 1587 struct ceph_mds_session_head *h; 1588 int i; 1589 int extra_bytes = 0; 1590 int metadata_key_count = 0; 1591 struct ceph_options *opt = mdsc->fsc->client->options; 1592 struct ceph_mount_options *fsopt = mdsc->fsc->mount_options; 1593 struct ceph_client *cl = mdsc->fsc->client; 1594 size_t size, count; 1595 void *p, *end; 1596 int ret; 1597 1598 const char* metadata[][2] = { 1599 {"hostname", mdsc->nodename}, 1600 {"kernel_version", init_utsname()->release}, 1601 {"entity_id", opt->name ? : ""}, 1602 {"root", fsopt->server_path ? : "/"}, 1603 {NULL, NULL} 1604 }; 1605 1606 /* Calculate serialized length of metadata */ 1607 extra_bytes = 4; /* map length */ 1608 for (i = 0; metadata[i][0]; ++i) { 1609 extra_bytes += 8 + strlen(metadata[i][0]) + 1610 strlen(metadata[i][1]); 1611 metadata_key_count++; 1612 } 1613 1614 /* supported feature */ 1615 size = 0; 1616 count = ARRAY_SIZE(feature_bits); 1617 if (count > 0) 1618 size = FEATURE_BYTES(count); 1619 extra_bytes += 4 + size; 1620 1621 /* metric spec */ 1622 size = 0; 1623 count = ARRAY_SIZE(metric_bits); 1624 if (count > 0) 1625 size = METRIC_BYTES(count); 1626 extra_bytes += 2 + 4 + 4 + size; 1627 1628 /* flags, mds auth caps and oldest_client_tid */ 1629 extra_bytes += 4 + 4 + 8; 1630 1631 /* Allocate the message */ 1632 msg = ceph_msg_new(CEPH_MSG_CLIENT_SESSION, sizeof(*h) + extra_bytes, 1633 GFP_NOFS, false); 1634 if (!msg) { 1635 pr_err_client(cl, "ENOMEM creating session open msg\n"); 1636 return ERR_PTR(-ENOMEM); 1637 } 1638 p = msg->front.iov_base; 1639 end = p + msg->front.iov_len; 1640 1641 h = p; 1642 h->op = cpu_to_le32(op); 1643 h->seq = cpu_to_le64(seq); 1644 1645 /* 1646 * Serialize client metadata into waiting buffer space, using 1647 * the format that userspace expects for map<string, string> 1648 * 1649 * ClientSession messages with metadata are 
v7 1650 */ 1651 msg->hdr.version = cpu_to_le16(7); 1652 msg->hdr.compat_version = cpu_to_le16(1); 1653 1654 /* The write pointer, following the session_head structure */ 1655 p += sizeof(*h); 1656 1657 /* Number of entries in the map */ 1658 ceph_encode_32(&p, metadata_key_count); 1659 1660 /* Two length-prefixed strings for each entry in the map */ 1661 for (i = 0; metadata[i][0]; ++i) { 1662 size_t const key_len = strlen(metadata[i][0]); 1663 size_t const val_len = strlen(metadata[i][1]); 1664 1665 ceph_encode_32(&p, key_len); 1666 memcpy(p, metadata[i][0], key_len); 1667 p += key_len; 1668 ceph_encode_32(&p, val_len); 1669 memcpy(p, metadata[i][1], val_len); 1670 p += val_len; 1671 } 1672 1673 ret = encode_supported_features(&p, end); 1674 if (ret) { 1675 pr_err_client(cl, "encode_supported_features failed!\n"); 1676 ceph_msg_put(msg); 1677 return ERR_PTR(ret); 1678 } 1679 1680 ret = encode_metric_spec(&p, end); 1681 if (ret) { 1682 pr_err_client(cl, "encode_metric_spec failed!\n"); 1683 ceph_msg_put(msg); 1684 return ERR_PTR(ret); 1685 } 1686 1687 /* version == 5, flags */ 1688 ceph_encode_32(&p, 0); 1689 1690 /* version == 6, mds auth caps */ 1691 ceph_encode_32(&p, 0); 1692 1693 /* version == 7, oldest_client_tid */ 1694 ceph_encode_64(&p, mdsc->oldest_tid); 1695 1696 msg->front.iov_len = p - msg->front.iov_base; 1697 msg->hdr.front_len = cpu_to_le32(msg->front.iov_len); 1698 1699 return msg; 1700 } 1701 1702 /* 1703 * send session open request. 1704 * 1705 * called under mdsc->mutex 1706 */ 1707 static int __open_session(struct ceph_mds_client *mdsc, 1708 struct ceph_mds_session *session) 1709 { 1710 struct ceph_msg *msg; 1711 int mstate; 1712 int mds = session->s_mds; 1713 1714 if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_FENCE_IO) 1715 return -EIO; 1716 1717 /* wait for mds to go active? 
*/ 1718 mstate = ceph_mdsmap_get_state(mdsc->mdsmap, mds); 1719 doutc(mdsc->fsc->client, "open_session to mds%d (%s)\n", mds, 1720 ceph_mds_state_name(mstate)); 1721 session->s_state = CEPH_MDS_SESSION_OPENING; 1722 session->s_renew_requested = jiffies; 1723 1724 /* send connect message */ 1725 msg = create_session_full_msg(mdsc, CEPH_SESSION_REQUEST_OPEN, 1726 session->s_seq); 1727 if (IS_ERR(msg)) 1728 return PTR_ERR(msg); 1729 ceph_con_send(&session->s_con, msg); 1730 return 0; 1731 } 1732 1733 /* 1734 * open sessions for any export targets for the given mds 1735 * 1736 * called under mdsc->mutex 1737 */ 1738 static struct ceph_mds_session * 1739 __open_export_target_session(struct ceph_mds_client *mdsc, int target) 1740 { 1741 struct ceph_mds_session *session; 1742 int ret; 1743 1744 session = __ceph_lookup_mds_session(mdsc, target); 1745 if (!session) { 1746 session = register_session(mdsc, target); 1747 if (IS_ERR(session)) 1748 return session; 1749 } 1750 if (session->s_state == CEPH_MDS_SESSION_NEW || 1751 session->s_state == CEPH_MDS_SESSION_CLOSING) { 1752 ret = __open_session(mdsc, session); 1753 if (ret) 1754 return ERR_PTR(ret); 1755 } 1756 1757 return session; 1758 } 1759 1760 struct ceph_mds_session * 1761 ceph_mdsc_open_export_target_session(struct ceph_mds_client *mdsc, int target) 1762 { 1763 struct ceph_mds_session *session; 1764 struct ceph_client *cl = mdsc->fsc->client; 1765 1766 doutc(cl, "to mds%d\n", target); 1767 1768 mutex_lock(&mdsc->mutex); 1769 session = __open_export_target_session(mdsc, target); 1770 mutex_unlock(&mdsc->mutex); 1771 1772 return session; 1773 } 1774 1775 static void __open_export_target_sessions(struct ceph_mds_client *mdsc, 1776 struct ceph_mds_session *session) 1777 { 1778 struct ceph_mds_info *mi; 1779 struct ceph_mds_session *ts; 1780 int i, mds = session->s_mds; 1781 struct ceph_client *cl = mdsc->fsc->client; 1782 1783 if (mds >= mdsc->mdsmap->possible_max_rank) 1784 return; 1785 1786 mi = 
&mdsc->mdsmap->m_info[mds]; 1787 doutc(cl, "for mds%d (%d targets)\n", session->s_mds, 1788 mi->num_export_targets); 1789 1790 for (i = 0; i < mi->num_export_targets; i++) { 1791 ts = __open_export_target_session(mdsc, mi->export_targets[i]); 1792 ceph_put_mds_session(ts); 1793 } 1794 } 1795 1796 /* 1797 * session caps 1798 */ 1799 1800 static void detach_cap_releases(struct ceph_mds_session *session, 1801 struct list_head *target) 1802 { 1803 struct ceph_client *cl = session->s_mdsc->fsc->client; 1804 1805 lockdep_assert_held(&session->s_cap_lock); 1806 1807 list_splice_init(&session->s_cap_releases, target); 1808 session->s_num_cap_releases = 0; 1809 doutc(cl, "mds%d\n", session->s_mds); 1810 } 1811 1812 static void dispose_cap_releases(struct ceph_mds_client *mdsc, 1813 struct list_head *dispose) 1814 { 1815 while (!list_empty(dispose)) { 1816 struct ceph_cap *cap; 1817 /* zero out the in-progress message */ 1818 cap = list_first_entry(dispose, struct ceph_cap, session_caps); 1819 list_del(&cap->session_caps); 1820 ceph_put_cap(mdsc, cap); 1821 } 1822 } 1823 1824 static void cleanup_session_requests(struct ceph_mds_client *mdsc, 1825 struct ceph_mds_session *session) 1826 { 1827 struct ceph_client *cl = mdsc->fsc->client; 1828 struct ceph_mds_request *req; 1829 struct rb_node *p; 1830 1831 doutc(cl, "mds%d\n", session->s_mds); 1832 mutex_lock(&mdsc->mutex); 1833 while (!list_empty(&session->s_unsafe)) { 1834 req = list_first_entry(&session->s_unsafe, 1835 struct ceph_mds_request, r_unsafe_item); 1836 pr_warn_ratelimited_client(cl, " dropping unsafe request %llu\n", 1837 req->r_tid); 1838 if (req->r_target_inode) 1839 mapping_set_error(req->r_target_inode->i_mapping, -EIO); 1840 if (req->r_unsafe_dir) 1841 mapping_set_error(req->r_unsafe_dir->i_mapping, -EIO); 1842 __unregister_request(mdsc, req); 1843 } 1844 /* zero r_attempts, so kick_requests() will re-send requests */ 1845 p = rb_first(&mdsc->request_tree); 1846 while (p) { 1847 req = rb_entry(p, struct 
ceph_mds_request, r_node); 1848 p = rb_next(p); 1849 if (req->r_session && 1850 req->r_session->s_mds == session->s_mds) 1851 req->r_attempts = 0; 1852 } 1853 mutex_unlock(&mdsc->mutex); 1854 } 1855 1856 /* 1857 * Helper to safely iterate over all caps associated with a session, with 1858 * special care taken to handle a racing __ceph_remove_cap(). 1859 * 1860 * Caller must hold session s_mutex. 1861 */ 1862 int ceph_iterate_session_caps(struct ceph_mds_session *session, 1863 int (*cb)(struct inode *, int mds, void *), 1864 void *arg) 1865 { 1866 struct ceph_client *cl = session->s_mdsc->fsc->client; 1867 struct list_head *p; 1868 struct ceph_cap *cap; 1869 struct inode *inode, *last_inode = NULL; 1870 struct ceph_cap *old_cap = NULL; 1871 int ret; 1872 1873 doutc(cl, "%p mds%d\n", session, session->s_mds); 1874 spin_lock(&session->s_cap_lock); 1875 p = session->s_caps.next; 1876 while (p != &session->s_caps) { 1877 int mds; 1878 1879 cap = list_entry(p, struct ceph_cap, session_caps); 1880 inode = igrab(&cap->ci->netfs.inode); 1881 if (!inode) { 1882 p = p->next; 1883 continue; 1884 } 1885 session->s_cap_iterator = cap; 1886 mds = cap->mds; 1887 spin_unlock(&session->s_cap_lock); 1888 1889 if (last_inode) { 1890 iput(last_inode); 1891 last_inode = NULL; 1892 } 1893 if (old_cap) { 1894 ceph_put_cap(session->s_mdsc, old_cap); 1895 old_cap = NULL; 1896 } 1897 1898 ret = cb(inode, mds, arg); 1899 last_inode = inode; 1900 1901 spin_lock(&session->s_cap_lock); 1902 p = p->next; 1903 if (!cap->ci) { 1904 doutc(cl, "finishing cap %p removal\n", cap); 1905 BUG_ON(cap->session != session); 1906 cap->session = NULL; 1907 list_del_init(&cap->session_caps); 1908 session->s_nr_caps--; 1909 atomic64_dec(&session->s_mdsc->metric.total_caps); 1910 if (cap->queue_release) 1911 __ceph_queue_cap_release(session, cap); 1912 else 1913 old_cap = cap; /* put_cap it w/o locks held */ 1914 } 1915 if (ret < 0) 1916 goto out; 1917 } 1918 ret = 0; 1919 out: 1920 session->s_cap_iterator = 
NULL; 1921 spin_unlock(&session->s_cap_lock); 1922 1923 iput(last_inode); 1924 if (old_cap) 1925 ceph_put_cap(session->s_mdsc, old_cap); 1926 1927 return ret; 1928 } 1929 1930 static int remove_session_caps_cb(struct inode *inode, int mds, void *arg) 1931 { 1932 struct ceph_inode_info *ci = ceph_inode(inode); 1933 struct ceph_client *cl = ceph_inode_to_client(inode); 1934 bool invalidate = false; 1935 struct ceph_cap *cap; 1936 int iputs = 0; 1937 1938 spin_lock(&ci->i_ceph_lock); 1939 cap = __get_cap_for_mds(ci, mds); 1940 if (cap) { 1941 doutc(cl, " removing cap %p, ci is %p, inode is %p\n", 1942 cap, ci, &ci->netfs.inode); 1943 1944 iputs = ceph_purge_inode_cap(inode, cap, &invalidate); 1945 } 1946 spin_unlock(&ci->i_ceph_lock); 1947 1948 if (cap) 1949 wake_up_all(&ci->i_cap_wq); 1950 if (invalidate) 1951 ceph_queue_invalidate(inode); 1952 while (iputs--) 1953 iput(inode); 1954 return 0; 1955 } 1956 1957 /* 1958 * caller must hold session s_mutex 1959 */ 1960 static void remove_session_caps(struct ceph_mds_session *session) 1961 { 1962 struct ceph_fs_client *fsc = session->s_mdsc->fsc; 1963 struct super_block *sb = fsc->sb; 1964 LIST_HEAD(dispose); 1965 1966 doutc(fsc->client, "on %p\n", session); 1967 ceph_iterate_session_caps(session, remove_session_caps_cb, fsc); 1968 1969 wake_up_all(&fsc->mdsc->cap_flushing_wq); 1970 1971 spin_lock(&session->s_cap_lock); 1972 if (session->s_nr_caps > 0) { 1973 struct inode *inode; 1974 struct ceph_cap *cap, *prev = NULL; 1975 struct ceph_vino vino; 1976 /* 1977 * iterate_session_caps() skips inodes that are being 1978 * deleted, we need to wait until deletions are complete. 1979 * __wait_on_freeing_inode() is designed for the job, 1980 * but it is not exported, so use lookup inode function 1981 * to access it. 
1982 */ 1983 while (!list_empty(&session->s_caps)) { 1984 cap = list_entry(session->s_caps.next, 1985 struct ceph_cap, session_caps); 1986 if (cap == prev) 1987 break; 1988 prev = cap; 1989 vino = cap->ci->i_vino; 1990 spin_unlock(&session->s_cap_lock); 1991 1992 inode = ceph_find_inode(sb, vino); 1993 iput(inode); 1994 1995 spin_lock(&session->s_cap_lock); 1996 } 1997 } 1998 1999 // drop cap expires and unlock s_cap_lock 2000 detach_cap_releases(session, &dispose); 2001 2002 BUG_ON(session->s_nr_caps > 0); 2003 BUG_ON(!list_empty(&session->s_cap_flushing)); 2004 spin_unlock(&session->s_cap_lock); 2005 dispose_cap_releases(session->s_mdsc, &dispose); 2006 } 2007 2008 enum { 2009 RECONNECT, 2010 RENEWCAPS, 2011 FORCE_RO, 2012 }; 2013 2014 /* 2015 * wake up any threads waiting on this session's caps. if the cap is 2016 * old (didn't get renewed on the client reconnect), remove it now. 2017 * 2018 * caller must hold s_mutex. 2019 */ 2020 static int wake_up_session_cb(struct inode *inode, int mds, void *arg) 2021 { 2022 struct ceph_inode_info *ci = ceph_inode(inode); 2023 unsigned long ev = (unsigned long)arg; 2024 2025 if (ev == RECONNECT) { 2026 spin_lock(&ci->i_ceph_lock); 2027 ci->i_wanted_max_size = 0; 2028 ci->i_requested_max_size = 0; 2029 spin_unlock(&ci->i_ceph_lock); 2030 } else if (ev == RENEWCAPS) { 2031 struct ceph_cap *cap; 2032 2033 spin_lock(&ci->i_ceph_lock); 2034 cap = __get_cap_for_mds(ci, mds); 2035 /* mds did not re-issue stale cap */ 2036 if (cap && cap->cap_gen < atomic_read(&cap->session->s_cap_gen)) 2037 cap->issued = cap->implemented = CEPH_CAP_PIN; 2038 spin_unlock(&ci->i_ceph_lock); 2039 } else if (ev == FORCE_RO) { 2040 } 2041 wake_up_all(&ci->i_cap_wq); 2042 return 0; 2043 } 2044 2045 static void wake_up_session_caps(struct ceph_mds_session *session, int ev) 2046 { 2047 struct ceph_client *cl = session->s_mdsc->fsc->client; 2048 2049 doutc(cl, "session %p mds%d\n", session, session->s_mds); 2050 ceph_iterate_session_caps(session, 
wake_up_session_cb, 2051 (void *)(unsigned long)ev); 2052 } 2053 2054 /* 2055 * Send periodic message to MDS renewing all currently held caps. The 2056 * ack will reset the expiration for all caps from this session. 2057 * 2058 * caller holds s_mutex 2059 */ 2060 static int send_renew_caps(struct ceph_mds_client *mdsc, 2061 struct ceph_mds_session *session) 2062 { 2063 struct ceph_client *cl = mdsc->fsc->client; 2064 struct ceph_msg *msg; 2065 int state; 2066 2067 if (time_after_eq(jiffies, session->s_cap_ttl) && 2068 time_after_eq(session->s_cap_ttl, session->s_renew_requested)) 2069 pr_info_client(cl, "mds%d caps stale\n", session->s_mds); 2070 session->s_renew_requested = jiffies; 2071 2072 /* do not try to renew caps until a recovering mds has reconnected 2073 * with its clients. */ 2074 state = ceph_mdsmap_get_state(mdsc->mdsmap, session->s_mds); 2075 if (state < CEPH_MDS_STATE_RECONNECT) { 2076 doutc(cl, "ignoring mds%d (%s)\n", session->s_mds, 2077 ceph_mds_state_name(state)); 2078 return 0; 2079 } 2080 2081 doutc(cl, "to mds%d (%s)\n", session->s_mds, 2082 ceph_mds_state_name(state)); 2083 msg = create_session_full_msg(mdsc, CEPH_SESSION_REQUEST_RENEWCAPS, 2084 ++session->s_renew_seq); 2085 if (IS_ERR(msg)) 2086 return PTR_ERR(msg); 2087 ceph_con_send(&session->s_con, msg); 2088 return 0; 2089 } 2090 2091 static int send_flushmsg_ack(struct ceph_mds_client *mdsc, 2092 struct ceph_mds_session *session, u64 seq) 2093 { 2094 struct ceph_client *cl = mdsc->fsc->client; 2095 struct ceph_msg *msg; 2096 2097 doutc(cl, "to mds%d (%s)s seq %lld\n", session->s_mds, 2098 ceph_session_state_name(session->s_state), seq); 2099 msg = ceph_create_session_msg(CEPH_SESSION_FLUSHMSG_ACK, seq); 2100 if (!msg) 2101 return -ENOMEM; 2102 ceph_con_send(&session->s_con, msg); 2103 return 0; 2104 } 2105 2106 2107 /* 2108 * Note new cap ttl, and any transition from stale -> not stale (fresh?). 
2109 * 2110 * Called under session->s_mutex 2111 */ 2112 static void renewed_caps(struct ceph_mds_client *mdsc, 2113 struct ceph_mds_session *session, int is_renew) 2114 { 2115 struct ceph_client *cl = mdsc->fsc->client; 2116 int was_stale; 2117 int wake = 0; 2118 2119 spin_lock(&session->s_cap_lock); 2120 was_stale = is_renew && time_after_eq(jiffies, session->s_cap_ttl); 2121 2122 session->s_cap_ttl = session->s_renew_requested + 2123 mdsc->mdsmap->m_session_timeout*HZ; 2124 2125 if (was_stale) { 2126 if (time_before(jiffies, session->s_cap_ttl)) { 2127 pr_info_client(cl, "mds%d caps renewed\n", 2128 session->s_mds); 2129 wake = 1; 2130 } else { 2131 pr_info_client(cl, "mds%d caps still stale\n", 2132 session->s_mds); 2133 } 2134 } 2135 doutc(cl, "mds%d ttl now %lu, was %s, now %s\n", session->s_mds, 2136 session->s_cap_ttl, was_stale ? "stale" : "fresh", 2137 time_before(jiffies, session->s_cap_ttl) ? "stale" : "fresh"); 2138 spin_unlock(&session->s_cap_lock); 2139 2140 if (wake) 2141 wake_up_session_caps(session, RENEWCAPS); 2142 } 2143 2144 /* 2145 * send a session close request 2146 */ 2147 static int request_close_session(struct ceph_mds_session *session) 2148 { 2149 struct ceph_client *cl = session->s_mdsc->fsc->client; 2150 struct ceph_msg *msg; 2151 2152 doutc(cl, "mds%d state %s seq %lld\n", session->s_mds, 2153 ceph_session_state_name(session->s_state), session->s_seq); 2154 msg = ceph_create_session_msg(CEPH_SESSION_REQUEST_CLOSE, 2155 session->s_seq); 2156 if (!msg) 2157 return -ENOMEM; 2158 ceph_con_send(&session->s_con, msg); 2159 return 1; 2160 } 2161 2162 /* 2163 * Called with s_mutex held. 
2164 */ 2165 static int __close_session(struct ceph_mds_client *mdsc, 2166 struct ceph_mds_session *session) 2167 { 2168 if (session->s_state >= CEPH_MDS_SESSION_CLOSING) 2169 return 0; 2170 session->s_state = CEPH_MDS_SESSION_CLOSING; 2171 return request_close_session(session); 2172 } 2173 2174 static bool drop_negative_children(struct dentry *dentry) 2175 { 2176 struct dentry *child; 2177 bool all_negative = true; 2178 2179 if (!d_is_dir(dentry)) 2180 goto out; 2181 2182 spin_lock(&dentry->d_lock); 2183 hlist_for_each_entry(child, &dentry->d_children, d_sib) { 2184 if (d_really_is_positive(child)) { 2185 all_negative = false; 2186 break; 2187 } 2188 } 2189 spin_unlock(&dentry->d_lock); 2190 2191 if (all_negative) 2192 shrink_dcache_parent(dentry); 2193 out: 2194 return all_negative; 2195 } 2196 2197 /* 2198 * Trim old(er) caps. 2199 * 2200 * Because we can't cache an inode without one or more caps, we do 2201 * this indirectly: if a cap is unused, we prune its aliases, at which 2202 * point the inode will hopefully get dropped to. 2203 * 2204 * Yes, this is a bit sloppy. Our only real goal here is to respond to 2205 * memory pressure from the MDS, though, so it needn't be perfect. 
 */
static int trim_caps_cb(struct inode *inode, int mds, void *arg)
{
	struct ceph_mds_client *mdsc = ceph_sb_to_mdsc(inode->i_sb);
	struct ceph_client *cl = mdsc->fsc->client;
	/* @arg points at the number of caps still to trim */
	int *remaining = arg;
	struct ceph_inode_info *ci = ceph_inode(inode);
	int used, wanted, oissued, mine;
	struct ceph_cap *cap;

	/* quota reached: a negative return stops the iteration */
	if (*remaining <= 0)
		return -1;

	spin_lock(&ci->i_ceph_lock);
	cap = __get_cap_for_mds(ci, mds);
	if (!cap) {
		spin_unlock(&ci->i_ceph_lock);
		return 0;
	}
	mine = cap->issued | cap->implemented;
	used = __ceph_caps_used(ci);
	wanted = __ceph_caps_file_wanted(ci);
	oissued = __ceph_caps_issued_other(ci, cap);

	doutc(cl, "%p %llx.%llx cap %p mine %s oissued %s used %s wanted %s\n",
	      inode, ceph_vinop(inode), cap, ceph_cap_string(mine),
	      ceph_cap_string(oissued), ceph_cap_string(used),
	      ceph_cap_string(wanted));
	if (cap == ci->i_auth_cap) {
		/* keep the auth cap while there is dirty/flushing state,
		 * pending cap snaps, write caps in use or file locks */
		if (ci->i_dirty_caps || ci->i_flushing_caps ||
		    !list_empty(&ci->i_cap_snaps))
			goto out;
		if ((used | wanted) & CEPH_CAP_ANY_WR)
			goto out;
		/* Note: it's possible that i_filelock_ref becomes non-zero
		 * after dropping auth caps. It doesn't hurt because reply
		 * of lock mds request will re-add auth caps. */
		if (atomic_read(&ci->i_filelock_ref) > 0)
			goto out;
	}
	/* The inode has cached pages, but it's no longer used.
	 * we can safely drop it */
	if (S_ISREG(inode->i_mode) &&
	    wanted == 0 && used == CEPH_CAP_FILE_CACHE &&
	    !(oissued & CEPH_CAP_FILE_CACHE)) {
		used = 0;
		oissued = 0;
	}
	if ((used | wanted) & ~oissued & mine)
		goto out;   /* we need these caps */

	if (oissued) {
		/* we aren't the only cap.. just remove us */
		ceph_remove_cap(mdsc, cap, true);
		(*remaining)--;
	} else {
		struct dentry *dentry;
		/* try dropping referring dentries */
		spin_unlock(&ci->i_ceph_lock);
		dentry = d_find_any_alias(inode);
		if (dentry && drop_negative_children(dentry)) {
			int count;
			dput(dentry);
			d_prune_aliases(inode);
			count = icount_read(inode);
			/* counted as trimmed only when a single inode
			 * reference is left */
			if (count == 1)
				(*remaining)--;
			doutc(cl, "%p %llx.%llx cap %p pruned, count now %d\n",
			      inode, ceph_vinop(inode), cap, count);
		} else {
			dput(dentry);
		}
		/* i_ceph_lock was already dropped above */
		return 0;
	}

out:
	spin_unlock(&ci->i_ceph_lock);
	return 0;
}

/*
 * Trim session cap count down to some max number.
 *
 * Runs trim_caps_cb() over the session's caps when it holds more than
 * @max_caps, then flushes the session's cap releases.  Always returns 0.
 */
int ceph_trim_caps(struct ceph_mds_client *mdsc,
		   struct ceph_mds_session *session,
		   int max_caps)
{
	struct ceph_client *cl = mdsc->fsc->client;
	int trim_caps = session->s_nr_caps - max_caps;

	doutc(cl, "mds%d start: %d / %d, trim %d\n", session->s_mds,
	      session->s_nr_caps, max_caps, trim_caps);
	if (trim_caps > 0) {
		int remaining = trim_caps;

		ceph_iterate_session_caps(session, trim_caps_cb, &remaining);
		doutc(cl, "mds%d done: %d / %d, trimmed %d\n",
		      session->s_mds, session->s_nr_caps, max_caps,
			trim_caps - remaining);
	}

	ceph_flush_session_cap_releases(mdsc, session);
	return 0;
}

/*
 * Returns 0 while the first entry of mdsc->cap_flush_list has a tid
 * <= want_flush_tid, and 1 otherwise (including when the list is empty).
 */
static int check_caps_flush(struct ceph_mds_client *mdsc,
			    u64 want_flush_tid)
{
	struct ceph_client *cl = mdsc->fsc->client;
	int ret = 1;

	spin_lock(&mdsc->cap_dirty_lock);
	if (!list_empty(&mdsc->cap_flush_list)) {
		struct ceph_cap_flush *cf =
			list_first_entry(&mdsc->cap_flush_list,
					 struct ceph_cap_flush, g_list);
		if (cf->tid <= want_flush_tid) {
			doutc(cl, "still flushing tid %llu <= %llu\n",
			      cf->tid, want_flush_tid);
			ret = 0;
		}
	}
	spin_unlock(&mdsc->cap_dirty_lock);
	return ret;
}

/*
 * Wait on mdsc->cap_flushing_wq until check_caps_flush() reports that
 * cap flushes up to and including want_flush_tid are no longer at the
 * head of the flush list.  Does not return a value.
 */
static void wait_caps_flush(struct ceph_mds_client *mdsc,
			    u64 want_flush_tid)
{
	struct ceph_client *cl = mdsc->fsc->client;

	doutc(cl, "want %llu\n", want_flush_tid);

	wait_event(mdsc->cap_flushing_wq,
		   check_caps_flush(mdsc, want_flush_tid));

	doutc(cl, "ok, flushed thru %llu\n", want_flush_tid);
}

/*
 * called under s_mutex
 */
static void ceph_send_cap_releases(struct ceph_mds_client *mdsc,
				   struct ceph_mds_session *session)
{
	struct ceph_client *cl = mdsc->fsc->client;
	struct ceph_msg *msg = NULL;
	struct ceph_mds_cap_release *head;
	struct ceph_mds_cap_item *item;
	struct ceph_osd_client *osdc = &mdsc->fsc->client->osdc;
	struct ceph_cap *cap;
	LIST_HEAD(tmp_list);
	int num_cap_releases;
	__le32	barrier, *cap_barrier;

	down_read(&osdc->lock);
	barrier = cpu_to_le32(osdc->epoch_barrier);
	up_read(&osdc->lock);

	spin_lock(&session->s_cap_lock);
again:
	list_splice_init(&session->s_cap_releases, &tmp_list);
	num_cap_releases = session->s_num_cap_releases;
	session->s_num_cap_releases = 0;
	spin_unlock(&session->s_cap_lock);

	while (!list_empty(&tmp_list)) {
		if (!msg) {
			msg = ceph_msg_new(CEPH_MSG_CLIENT_CAPRELEASE,
					PAGE_SIZE, GFP_NOFS, false);
			if (!msg)
				goto out_err;
			head = msg->front.iov_base;
			head->num = cpu_to_le32(0);
			msg->front.iov_len = sizeof(*head);

			msg->hdr.version = cpu_to_le16(2);
			msg->hdr.compat_version = cpu_to_le16(1);
		}

		cap = list_first_entry(&tmp_list, struct ceph_cap,
					session_caps);
		list_del(&cap->session_caps);
		num_cap_releases--;

		head = msg->front.iov_base;
		put_unaligned_le32(get_unaligned_le32(&head->num) + 1,
&head->num); 2399 item = msg->front.iov_base + msg->front.iov_len; 2400 item->ino = cpu_to_le64(cap->cap_ino); 2401 item->cap_id = cpu_to_le64(cap->cap_id); 2402 item->migrate_seq = cpu_to_le32(cap->mseq); 2403 item->issue_seq = cpu_to_le32(cap->issue_seq); 2404 msg->front.iov_len += sizeof(*item); 2405 2406 ceph_put_cap(mdsc, cap); 2407 2408 if (le32_to_cpu(head->num) == CEPH_CAPS_PER_RELEASE) { 2409 // Append cap_barrier field 2410 cap_barrier = msg->front.iov_base + msg->front.iov_len; 2411 *cap_barrier = barrier; 2412 msg->front.iov_len += sizeof(*cap_barrier); 2413 2414 msg->hdr.front_len = cpu_to_le32(msg->front.iov_len); 2415 doutc(cl, "mds%d %p\n", session->s_mds, msg); 2416 ceph_con_send(&session->s_con, msg); 2417 msg = NULL; 2418 } 2419 } 2420 2421 BUG_ON(num_cap_releases != 0); 2422 2423 spin_lock(&session->s_cap_lock); 2424 if (!list_empty(&session->s_cap_releases)) 2425 goto again; 2426 spin_unlock(&session->s_cap_lock); 2427 2428 if (msg) { 2429 // Append cap_barrier field 2430 cap_barrier = msg->front.iov_base + msg->front.iov_len; 2431 *cap_barrier = barrier; 2432 msg->front.iov_len += sizeof(*cap_barrier); 2433 2434 msg->hdr.front_len = cpu_to_le32(msg->front.iov_len); 2435 doutc(cl, "mds%d %p\n", session->s_mds, msg); 2436 ceph_con_send(&session->s_con, msg); 2437 } 2438 return; 2439 out_err: 2440 pr_err_client(cl, "mds%d, failed to allocate message\n", 2441 session->s_mds); 2442 spin_lock(&session->s_cap_lock); 2443 list_splice(&tmp_list, &session->s_cap_releases); 2444 session->s_num_cap_releases += num_cap_releases; 2445 spin_unlock(&session->s_cap_lock); 2446 } 2447 2448 static void ceph_cap_release_work(struct work_struct *work) 2449 { 2450 struct ceph_mds_session *session = 2451 container_of(work, struct ceph_mds_session, s_cap_release_work); 2452 2453 mutex_lock(&session->s_mutex); 2454 if (session->s_state == CEPH_MDS_SESSION_OPEN || 2455 session->s_state == CEPH_MDS_SESSION_HUNG) 2456 ceph_send_cap_releases(session->s_mdsc, session); 
2457 mutex_unlock(&session->s_mutex); 2458 ceph_put_mds_session(session); 2459 } 2460 2461 void ceph_flush_session_cap_releases(struct ceph_mds_client *mdsc, 2462 struct ceph_mds_session *session) 2463 { 2464 struct ceph_client *cl = mdsc->fsc->client; 2465 if (mdsc->stopping) 2466 return; 2467 2468 ceph_get_mds_session(session); 2469 if (queue_work(mdsc->fsc->cap_wq, 2470 &session->s_cap_release_work)) { 2471 doutc(cl, "cap release work queued\n"); 2472 } else { 2473 ceph_put_mds_session(session); 2474 doutc(cl, "failed to queue cap release work\n"); 2475 } 2476 } 2477 2478 /* 2479 * caller holds session->s_cap_lock 2480 */ 2481 void __ceph_queue_cap_release(struct ceph_mds_session *session, 2482 struct ceph_cap *cap) 2483 { 2484 list_add_tail(&cap->session_caps, &session->s_cap_releases); 2485 session->s_num_cap_releases++; 2486 2487 if (!(session->s_num_cap_releases % CEPH_CAPS_PER_RELEASE)) 2488 ceph_flush_session_cap_releases(session->s_mdsc, session); 2489 } 2490 2491 static void ceph_cap_reclaim_work(struct work_struct *work) 2492 { 2493 struct ceph_mds_client *mdsc = 2494 container_of(work, struct ceph_mds_client, cap_reclaim_work); 2495 int ret = ceph_trim_dentries(mdsc); 2496 if (ret == -EAGAIN) 2497 ceph_queue_cap_reclaim_work(mdsc); 2498 } 2499 2500 void ceph_queue_cap_reclaim_work(struct ceph_mds_client *mdsc) 2501 { 2502 struct ceph_client *cl = mdsc->fsc->client; 2503 if (mdsc->stopping) 2504 return; 2505 2506 if (queue_work(mdsc->fsc->cap_wq, &mdsc->cap_reclaim_work)) { 2507 doutc(cl, "caps reclaim work queued\n"); 2508 } else { 2509 doutc(cl, "failed to queue caps release work\n"); 2510 } 2511 } 2512 2513 void ceph_reclaim_caps_nr(struct ceph_mds_client *mdsc, int nr) 2514 { 2515 int val; 2516 if (!nr) 2517 return; 2518 val = atomic_add_return(nr, &mdsc->cap_reclaim_pending); 2519 if ((val % CEPH_CAPS_PER_RELEASE) < nr) { 2520 atomic_set(&mdsc->cap_reclaim_pending, 0); 2521 ceph_queue_cap_reclaim_work(mdsc); 2522 } 2523 } 2524 2525 void 
ceph_queue_cap_unlink_work(struct ceph_mds_client *mdsc) 2526 { 2527 struct ceph_client *cl = mdsc->fsc->client; 2528 if (mdsc->stopping) 2529 return; 2530 2531 if (queue_work(mdsc->fsc->cap_wq, &mdsc->cap_unlink_work)) { 2532 doutc(cl, "caps unlink work queued\n"); 2533 } else { 2534 doutc(cl, "failed to queue caps unlink work\n"); 2535 } 2536 } 2537 2538 static void ceph_cap_unlink_work(struct work_struct *work) 2539 { 2540 struct ceph_mds_client *mdsc = 2541 container_of(work, struct ceph_mds_client, cap_unlink_work); 2542 struct ceph_client *cl = mdsc->fsc->client; 2543 2544 doutc(cl, "begin\n"); 2545 spin_lock(&mdsc->cap_delay_lock); 2546 while (!list_empty(&mdsc->cap_unlink_delay_list)) { 2547 struct ceph_inode_info *ci; 2548 struct inode *inode; 2549 2550 ci = list_first_entry(&mdsc->cap_unlink_delay_list, 2551 struct ceph_inode_info, 2552 i_cap_delay_list); 2553 list_del_init(&ci->i_cap_delay_list); 2554 2555 inode = igrab(&ci->netfs.inode); 2556 if (inode) { 2557 spin_unlock(&mdsc->cap_delay_lock); 2558 doutc(cl, "on %p %llx.%llx\n", inode, 2559 ceph_vinop(inode)); 2560 ceph_check_caps(ci, CHECK_CAPS_FLUSH); 2561 iput(inode); 2562 spin_lock(&mdsc->cap_delay_lock); 2563 } 2564 } 2565 spin_unlock(&mdsc->cap_delay_lock); 2566 doutc(cl, "done\n"); 2567 } 2568 2569 /* 2570 * requests 2571 */ 2572 2573 int ceph_alloc_readdir_reply_buffer(struct ceph_mds_request *req, 2574 struct inode *dir) 2575 { 2576 struct ceph_inode_info *ci = ceph_inode(dir); 2577 struct ceph_mds_reply_info_parsed *rinfo = &req->r_reply_info; 2578 struct ceph_mount_options *opt = req->r_mdsc->fsc->mount_options; 2579 size_t size = sizeof(struct ceph_mds_reply_dir_entry); 2580 unsigned int num_entries; 2581 u64 bytes_count; 2582 int order; 2583 2584 spin_lock(&ci->i_ceph_lock); 2585 num_entries = ci->i_files + ci->i_subdirs; 2586 spin_unlock(&ci->i_ceph_lock); 2587 num_entries = max(num_entries, 1U); 2588 num_entries = min(num_entries, opt->max_readdir); 2589 2590 bytes_count = (u64)size * 
num_entries; 2591 if (unlikely(bytes_count > ULONG_MAX)) 2592 bytes_count = ULONG_MAX; 2593 2594 order = get_order((unsigned long)bytes_count); 2595 while (order >= 0) { 2596 rinfo->dir_entries = (void*)__get_free_pages(GFP_KERNEL | 2597 __GFP_NOWARN | 2598 __GFP_ZERO, 2599 order); 2600 if (rinfo->dir_entries) 2601 break; 2602 order--; 2603 } 2604 if (!rinfo->dir_entries || unlikely(order < 0)) 2605 return -ENOMEM; 2606 2607 num_entries = (PAGE_SIZE << order) / size; 2608 num_entries = min(num_entries, opt->max_readdir); 2609 2610 rinfo->dir_buf_size = PAGE_SIZE << order; 2611 req->r_num_caps = num_entries + 1; 2612 req->r_args.readdir.max_entries = cpu_to_le32(num_entries); 2613 req->r_args.readdir.max_bytes = cpu_to_le32(opt->max_readdir_bytes); 2614 return 0; 2615 } 2616 2617 /* 2618 * Create an mds request. 2619 */ 2620 struct ceph_mds_request * 2621 ceph_mdsc_create_request(struct ceph_mds_client *mdsc, int op, int mode) 2622 { 2623 struct ceph_mds_request *req; 2624 2625 req = kmem_cache_zalloc(ceph_mds_request_cachep, GFP_NOFS); 2626 if (!req) 2627 return ERR_PTR(-ENOMEM); 2628 2629 mutex_init(&req->r_fill_mutex); 2630 req->r_mdsc = mdsc; 2631 req->r_started = jiffies; 2632 req->r_start_latency = ktime_get(); 2633 req->r_resend_mds = -1; 2634 INIT_LIST_HEAD(&req->r_unsafe_dir_item); 2635 INIT_LIST_HEAD(&req->r_unsafe_target_item); 2636 req->r_fmode = -1; 2637 req->r_feature_needed = -1; 2638 kref_init(&req->r_kref); 2639 RB_CLEAR_NODE(&req->r_node); 2640 INIT_LIST_HEAD(&req->r_wait); 2641 init_completion(&req->r_completion); 2642 init_completion(&req->r_safe_completion); 2643 INIT_LIST_HEAD(&req->r_unsafe_item); 2644 2645 ktime_get_coarse_real_ts64(&req->r_stamp); 2646 2647 req->r_op = op; 2648 req->r_direct_mode = mode; 2649 return req; 2650 } 2651 2652 /* 2653 * return oldest (lowest) request, tid in request tree, 0 if none. 2654 * 2655 * called under mdsc->mutex. 
*/
static struct ceph_mds_request *__get_oldest_req(struct ceph_mds_client *mdsc)
{
	if (RB_EMPTY_ROOT(&mdsc->request_tree))
		return NULL;
	return rb_entry(rb_first(&mdsc->request_tree),
			struct ceph_mds_request, r_node);
}

/* Return the cached oldest tid stored in mdsc->oldest_tid. */
static inline u64 __get_oldest_tid(struct ceph_mds_client *mdsc)
{
	return mdsc->oldest_tid;
}

#if IS_ENABLED(CONFIG_FS_ENCRYPTION)
/*
 * Build the encrypted "alternate name" for the dentry of @req.
 *
 * Returns NULL with *plen == 0 when no altname is needed (no parent or
 * dentry, parent not encrypted, no key available, or the encrypted name
 * fits within CEPH_NOHASH_NAME_MAX).  Otherwise returns a kmalloc'ed
 * buffer of *plen bytes holding the encrypted name.  On failure returns
 * an ERR_PTR and leaves *plen untouched.
 *
 * NOTE(review): the buffer is handed back to the caller; where it is
 * freed is not visible here.
 */
static u8 *get_fscrypt_altname(const struct ceph_mds_request *req, u32 *plen)
{
	struct inode *dir = req->r_parent;
	struct dentry *dentry = req->r_dentry;
	const struct qstr *name = req->r_dname;
	u8 *cryptbuf = NULL;
	u32 len = 0;
	int ret = 0;

	/* only encode if we have parent and dentry */
	if (!dir || !dentry)
		goto success;

	/* No-op unless this is encrypted */
	if (!IS_ENCRYPTED(dir))
		goto success;

	ret = ceph_fscrypt_prepare_readdir(dir);
	if (ret < 0)
		return ERR_PTR(ret);

	/* No key? Just ignore it. */
	if (!fscrypt_has_encryption_key(dir))
		goto success;

	/* fall back to the dentry's own name when r_dname is not set */
	if (!name)
		name = &dentry->d_name;

	if (!fscrypt_fname_encrypted_size(dir, name->len, NAME_MAX, &len)) {
		WARN_ON_ONCE(1);
		return ERR_PTR(-ENAMETOOLONG);
	}

	/* No need to append altname if name is short enough */
	if (len <= CEPH_NOHASH_NAME_MAX) {
		len = 0;
		goto success;
	}

	cryptbuf = kmalloc(len, GFP_KERNEL);
	if (!cryptbuf)
		return ERR_PTR(-ENOMEM);

	ret = fscrypt_fname_encrypt(dir, name, cryptbuf, len);
	if (ret) {
		kfree(cryptbuf);
		return ERR_PTR(ret);
	}
success:
	*plen = len;
	return cryptbuf;
}
#else
/* Without CONFIG_FS_ENCRYPTION there is never an altname. */
static u8 *get_fscrypt_altname(const struct ceph_mds_request *req, u32 *plen)
{
	*plen = 0;
	return NULL;
}
#endif

/**
 * ceph_mdsc_build_path - build a path string to a given dentry
 * @mdsc: mds client
 * @dentry: dentry to which path should be built
 * @path_info: output path, length, base ino+snap, and freepath ownership flag
 * @for_wire: is this path going to be sent to the MDS?
 *
 * Build a string that represents the path to the dentry. This is mostly called
 * for two different purposes:
 *
 * 1) we need to build a path string to send to the MDS (for_wire == true)
 * 2) we need a path string for local presentation (e.g. debugfs)
 *    (for_wire == false)
 *
 * The path is built in reverse, starting with the dentry. Walk back up toward
 * the root, building the path until the first non-snapped inode is reached
 * (for_wire) or the root inode is reached (!for_wire).
 *
 * Encode hidden .snap dirs as a double /, i.e.
*   foo/.snap/bar -> foo//bar
 */
char *ceph_mdsc_build_path(struct ceph_mds_client *mdsc, struct dentry *dentry,
			   struct ceph_path_info *path_info, int for_wire)
{
	struct ceph_client *cl = mdsc->fsc->client;
	struct dentry *cur;
	struct inode *inode;
	char *path;
	int pos;
	unsigned seq;
	u64 base;

	if (!dentry)
		return ERR_PTR(-EINVAL);

	path = __getname();
	if (!path)
		return ERR_PTR(-ENOMEM);
retry:
	/* fill the buffer from the end; pos is the start of the string so far */
	pos = PATH_MAX - 1;
	path[pos] = '\0';

	seq = read_seqbegin(&rename_lock);
	cur = dget(dentry);
	for (;;) {
		struct dentry *parent;

		spin_lock(&cur->d_lock);
		inode = d_inode(cur);
		if (inode && ceph_snap(inode) == CEPH_SNAPDIR) {
			/* hidden .snap dir: contributes no name, only the '/' */
			doutc(cl, "path+%d: %p SNAPDIR\n", pos, cur);
			spin_unlock(&cur->d_lock);
			parent = dget_parent(cur);
		} else if (for_wire && inode && dentry != cur &&
			   ceph_snap(inode) == CEPH_NOSNAP) {
			spin_unlock(&cur->d_lock);
			pos++; /* get rid of any prepended '/' */
			break;
		} else if (!for_wire || !IS_ENCRYPTED(d_inode(cur->d_parent))) {
			pos -= cur->d_name.len;
			if (pos < 0) {
				spin_unlock(&cur->d_lock);
				break;
			}
			memcpy(path + pos, cur->d_name.name, cur->d_name.len);
			spin_unlock(&cur->d_lock);
			parent = dget_parent(cur);
		} else {
			int len, ret;
			char buf[NAME_MAX];

			/*
			 * Proactively copy name into buf, in case we need to
			 * present it as-is.
			 */
			memcpy(buf, cur->d_name.name, cur->d_name.len);
			len = cur->d_name.len;
			spin_unlock(&cur->d_lock);
			parent = dget_parent(cur);

			ret = ceph_fscrypt_prepare_readdir(d_inode(parent));
			if (ret < 0) {
				dput(parent);
				dput(cur);
				__putname(path);
				return ERR_PTR(ret);
			}

			if (fscrypt_has_encryption_key(d_inode(parent))) {
				len = ceph_encode_encrypted_dname(d_inode(parent),
								  buf, len);
				if (len < 0) {
					dput(parent);
					dput(cur);
					__putname(path);
					return ERR_PTR(len);
				}
			}
			pos -= len;
			if (pos < 0) {
				dput(parent);
				break;
			}
			memcpy(path + pos, buf, len);
		}
		dput(cur);
		cur = parent;

		/* Are we at the root? */
		if (IS_ROOT(cur))
			break;

		/* Are we out of buffer? */
		if (--pos < 0)
			break;

		path[pos] = '/';
	}
	inode = d_inode(cur);
	base = inode ? ceph_ino(inode) : 0;
	dput(cur);

	/* a rename raced with the walk: rebuild into the same buffer */
	if (read_seqretry(&rename_lock, seq))
		goto retry;

	if (pos < 0) {
		/*
		 * The path is longer than PATH_MAX and this function
		 * cannot ever succeed.  Creating paths that long is
		 * possible with Ceph, but Linux cannot use them.
		 */
		__putname(path);
		return ERR_PTR(-ENAMETOOLONG);
	}

	/* Initialize the output structure */
	memset(path_info, 0, sizeof(*path_info));

	path_info->vino.ino = base;
	path_info->pathlen = PATH_MAX - 1 - pos;
	path_info->path = path + pos;
	path_info->freepath = true;

	/* Set snap from dentry if available */
	if (d_inode(dentry))
		path_info->vino.snap = ceph_snap(d_inode(dentry));
	else
		path_info->vino.snap = CEPH_NOSNAP;

	doutc(cl, "on %p %d built %llx '%.*s'\n", dentry, d_count(dentry),
	      base, PATH_MAX - 1 - pos, path + pos);
	return path + pos;
}

/*
 * Fill @path_info for a request that names @dentry.
 *
 * When the parent directory is known, the caller says it is locked, and
 * it is neither snapped nor encrypted, the dentry's own name is used
 * directly relative to the parent inode (freepath == false).  Otherwise
 * a full wire path is built with ceph_mdsc_build_path().
 *
 * Returns 0 on success or a negative errno from ceph_mdsc_build_path().
 */
static int build_dentry_path(struct ceph_mds_client *mdsc, struct dentry *dentry,
			     struct inode *dir, struct ceph_path_info *path_info,
			     bool parent_locked)
{
	char *path;

	rcu_read_lock();
	if (!dir)
		dir = d_inode_rcu(dentry->d_parent);
	if (dir && parent_locked && ceph_snap(dir) == CEPH_NOSNAP &&
	    !IS_ENCRYPTED(dir)) {
		path_info->vino.ino = ceph_ino(dir);
		path_info->vino.snap = ceph_snap(dir);
		rcu_read_unlock();
		path_info->path = dentry->d_name.name;
		path_info->pathlen = dentry->d_name.len;
		path_info->freepath = false;
		return 0;
	}
	rcu_read_unlock();
	path = ceph_mdsc_build_path(mdsc, dentry, path_info, 1);
	if (IS_ERR(path))
		return PTR_ERR(path);
	/*
	 * ceph_mdsc_build_path already fills path_info, including snap handling.
	 */
	return 0;
}

/*
 * Fill @path_info for a request that names @inode.
 *
 * A non-snapped inode is addressed by ino alone (pathlen 0).  Otherwise
 * a wire path is built from one of the inode's alias dentries.
 *
 * Returns 0 on success or a negative errno from ceph_mdsc_build_path()
 * (which also covers the case where no alias dentry was found).
 */
static int build_inode_path(struct inode *inode, struct ceph_path_info *path_info)
{
	struct ceph_mds_client *mdsc = ceph_sb_to_mdsc(inode->i_sb);
	struct dentry *dentry;
	char *path;

	if (ceph_snap(inode) == CEPH_NOSNAP) {
		path_info->vino.ino = ceph_ino(inode);
		path_info->vino.snap = ceph_snap(inode);
		path_info->pathlen = 0;
		path_info->freepath = false;
		return 0;
	}
	dentry = d_find_alias(inode);
	path = ceph_mdsc_build_path(mdsc, dentry, path_info, 1);
	dput(dentry);
	if (IS_ERR(path))
		return PTR_ERR(path);
	/*
	 * ceph_mdsc_build_path already fills path_info, including snap from dentry.
	 * Override with inode's snap since that's what this function is for.
	 */
	path_info->vino.snap = ceph_snap(inode);
	return 0;
}

/*
 * request arguments may be specified via an inode *, a dentry *, or
 * an explicit ino+path.
 *
 * The first of @rinode, @rdentry, (@rpath or @rino) that is set decides
 * how @path_info is filled; if none is set it stays zeroed.  Returns 0
 * or the negative errno from the path builder.
 */
static int set_request_path_attr(struct ceph_mds_client *mdsc, struct inode *rinode,
				 struct dentry *rdentry, struct inode *rdiri,
				 const char *rpath, u64 rino,
				 struct ceph_path_info *path_info,
				 bool parent_locked)
{
	struct ceph_client *cl = mdsc->fsc->client;
	int r = 0;

	/* Initialize the output structure */
	memset(path_info, 0, sizeof(*path_info));

	if (rinode) {
		r = build_inode_path(rinode, path_info);
		doutc(cl, " inode %p %llx.%llx\n", rinode, ceph_ino(rinode),
		      ceph_snap(rinode));
	} else if (rdentry) {
		r = build_dentry_path(mdsc, rdentry, rdiri, path_info, parent_locked);
		doutc(cl, " dentry %p %llx/%.*s\n", rdentry, path_info->vino.ino,
		      path_info->pathlen, path_info->path);
	} else if (rpath || rino) {
		path_info->vino.ino = rino;
		path_info->vino.snap = CEPH_NOSNAP;
		path_info->path = rpath;
		path_info->pathlen = rpath ? strlen(rpath) : 0;
		path_info->freepath = false;

		doutc(cl, " path %.*s\n", path_info->pathlen, rpath);
	}

	return r;
}

/*
 * Encode the trailing part of an MClientRequest at *p: the request stamp,
 * the gid list, the altname, and the fscrypt_auth / fscrypt_file blobs
 * (each preceded by its 32-bit length, 0 when absent).  No bounds are
 * checked here; the caller must have sized the buffer accordingly.
 */
static void encode_mclientrequest_tail(void **p,
				       const struct ceph_mds_request *req)
{
	struct ceph_timespec ts;
	int i;

	ceph_encode_timespec64(&ts, &req->r_stamp);
	ceph_encode_copy(p, &ts, sizeof(ts));

	/* v4: gid_list */
	ceph_encode_32(p, req->r_cred->group_info->ngroups);
	for (i = 0; i < req->r_cred->group_info->ngroups; i++)
		ceph_encode_64(p, from_kgid(&init_user_ns,
					    req->r_cred->group_info->gid[i]));

	/* v5: altname */
	ceph_encode_32(p, req->r_altname_len);
	ceph_encode_copy(p, req->r_altname, req->r_altname_len);

	/* v6: fscrypt_auth and fscrypt_file */
	if (req->r_fscrypt_auth) {
		u32 authlen = ceph_fscrypt_auth_len(req->r_fscrypt_auth);

		ceph_encode_32(p, authlen);
		ceph_encode_copy(p, req->r_fscrypt_auth, authlen);
	} else {
		ceph_encode_32(p, 0);
	}
	if (test_bit(CEPH_MDS_R_FSCRYPT_FILE, &req->r_req_flags)) {
		ceph_encode_32(p, sizeof(__le64));
		ceph_encode_64(p, req->r_fscrypt_file);
	} else {
		ceph_encode_32(p, 0);
	}
}

/*
 * Pick the request head version to use from the session's feature bits:
 * 1 without 32BITS_RETRY_FWD, 2 without HAS_OWNER_UIDGID, otherwise
 * CEPH_MDS_REQUEST_HEAD_VERSION.
 */
static inline u16 mds_supported_head_version(struct ceph_mds_session *session)
{
	if (!test_bit(CEPHFS_FEATURE_32BITS_RETRY_FWD, &session->s_features))
		return 1;

	if (!test_bit(CEPHFS_FEATURE_HAS_OWNER_UIDGID, &session->s_features))
		return 2;

	return CEPH_MDS_REQUEST_HEAD_VERSION;
}

/*
 * Return a legacy-head view of the request head at @p.  Without
 * CEPH_FEATURE_FS_BTIME the buffer starts with the legacy head itself;
 * otherwise the legacy layout is taken to start at the new head's
 * oldest_client_tid member.
 */
static struct ceph_mds_request_head_legacy *
find_legacy_request_head(void *p, u64 features)
{
	bool legacy = !(features & CEPH_FEATURE_FS_BTIME);
	struct ceph_mds_request_head *head;

	if (legacy)
		return (struct ceph_mds_request_head_legacy *)p;
	head = (struct ceph_mds_request_head *)p;
	return (struct ceph_mds_request_head_legacy *)&head->oldest_client_tid;
}

/*
 * called under mdsc->mutex
 *
 * Build the CEPH_MSG_CLIENT_REQUEST message for @req as it will be sent
 * on @session.  With @drop_cap_releases set, the cap/dentry releases are
 * still encoded (for their side effects) but then discarded from the
 * message.  Returns the message or an ERR_PTR.
 */
static struct ceph_msg *create_request_message(struct ceph_mds_session *session,
					       struct ceph_mds_request *req,
					       bool drop_cap_releases)
{
	int mds = session->s_mds;
	struct ceph_mds_client *mdsc = session->s_mdsc;
	struct ceph_client *cl = mdsc->fsc->client;
	struct ceph_msg *msg;
	struct ceph_mds_request_head_legacy *lhead;
	struct ceph_path_info path_info1 = {0};
	struct ceph_path_info path_info2 = {0};
	struct dentry *old_dentry = NULL;
	int len;
	u16 releases;
	void *p, *end;
	int ret;
	bool legacy = !(session->s_con.peer_features & CEPH_FEATURE_FS_BTIME);
	u16 request_head_version = mds_supported_head_version(session);
	kuid_t caller_fsuid = req->r_cred->fsuid;
	kgid_t caller_fsgid = req->r_cred->fsgid;
	bool parent_locked = test_bit(CEPH_MDS_R_PARENT_LOCKED, &req->r_req_flags);

	ret = set_request_path_attr(mdsc, req->r_inode, req->r_dentry,
			      req->r_parent, req->r_path1, req->r_ino1.ino,
			      &path_info1, parent_locked);
	if (ret < 0) {
		msg = ERR_PTR(ret);
		goto out;
	}

	/*
	 * When the parent directory's i_rwsem is *not* locked, req->r_parent may
	 * have become stale (e.g. after a concurrent rename) between the time the
	 * dentry was looked up and now.  If we detect that the stored r_parent
	 * does not match the inode number we just encoded for the request, switch
	 * to the correct inode so that the MDS receives a valid parent reference.
	 */
	if (!parent_locked && req->r_parent && path_info1.vino.ino &&
	    ceph_ino(req->r_parent) != path_info1.vino.ino) {
		struct inode *old_parent = req->r_parent;
		struct inode *correct_dir = ceph_get_inode(mdsc->fsc->sb, path_info1.vino, NULL);
		if (!IS_ERR(correct_dir)) {
			WARN_ONCE(1, "ceph: r_parent mismatch (had %llx wanted %llx) - updating\n",
			          ceph_ino(old_parent), path_info1.vino.ino);
			/*
			 * Transfer CEPH_CAP_PIN from the old parent to the new one.
			 * The pin was taken earlier in ceph_mdsc_submit_request().
			 */
			ceph_put_cap_refs(ceph_inode(old_parent), CEPH_CAP_PIN);
			iput(old_parent);
			req->r_parent = correct_dir;
			ceph_get_cap_refs(ceph_inode(req->r_parent), CEPH_CAP_PIN);
		}
	}

	/* If r_old_dentry is set, then assume that its parent is locked */
	if (req->r_old_dentry &&
	    !(req->r_old_dentry->d_flags & DCACHE_DISCONNECTED))
		old_dentry = req->r_old_dentry;
	ret = set_request_path_attr(mdsc, NULL, old_dentry,
			      req->r_old_dentry_dir,
			      req->r_path2, req->r_ino2.ino,
			      &path_info2, true);
	if (ret < 0) {
		msg = ERR_PTR(ret);
		goto out_free1;
	}

	req->r_altname = get_fscrypt_altname(req, &req->r_altname_len);
	if (IS_ERR(req->r_altname)) {
		msg = ERR_CAST(req->r_altname);
		req->r_altname = NULL;
		goto out_free2;
	}

	/*
	 * Old MDSes that lack the 32-bit retry/fwd feature copy the raw
	 * head memory directly when decoding a request, while newer ones
	 * decode the head according to its version member, so the head we
	 * send must be compatible with both.
	 */
	if (legacy)
		len = sizeof(struct ceph_mds_request_head_legacy);
	else if (request_head_version == 1)
		len = offsetofend(struct ceph_mds_request_head, args);
	else if (request_head_version == 2)
		len = offsetofend(struct ceph_mds_request_head, ext_num_fwd);
	else
		len = sizeof(struct ceph_mds_request_head);

	/* filepaths */
	len += 2 * (1 + sizeof(u32) + sizeof(u64));
	len += path_info1.pathlen + path_info2.pathlen;

	/* cap releases */
	len += sizeof(struct ceph_mds_request_release) *
		(!!req->r_inode_drop + !!req->r_dentry_drop +
		 !!req->r_old_inode_drop + !!req->r_old_dentry_drop);

	if (req->r_dentry_drop)
		len += path_info1.pathlen;
	if (req->r_old_dentry_drop)
		len += path_info2.pathlen;

	/* MClientRequest tail */

	/* req->r_stamp */
	len += sizeof(struct ceph_timespec);

	/* gid list */
	len += sizeof(u32) + (sizeof(u64) * req->r_cred->group_info->ngroups);

	/* alternate name */
	len += sizeof(u32) + req->r_altname_len;

	/* fscrypt_auth */
	len += sizeof(u32); // fscrypt_auth
	if (req->r_fscrypt_auth)
		len += ceph_fscrypt_auth_len(req->r_fscrypt_auth);

	/* fscrypt_file */
	len += sizeof(u32);
	if (test_bit(CEPH_MDS_R_FSCRYPT_FILE, &req->r_req_flags))
		len += sizeof(__le64);

	msg = ceph_msg_new2(CEPH_MSG_CLIENT_REQUEST, len, 1, GFP_NOFS, false);
	if (!msg) {
		msg = ERR_PTR(-ENOMEM);
		goto out_free2;
	}

	msg->hdr.tid = cpu_to_le64(req->r_tid);

	lhead = find_legacy_request_head(msg->front.iov_base,
					 session->s_con.peer_features);

	if ((req->r_mnt_idmap != &nop_mnt_idmap) &&
	    !test_bit(CEPHFS_FEATURE_HAS_OWNER_UIDGID, &session->s_features)) {
		WARN_ON_ONCE(!IS_CEPH_MDS_OP_NEWINODE(req->r_op));

		if (enable_unsafe_idmap) {
			pr_warn_once_client(cl,
				"idmapped mount is used and CEPHFS_FEATURE_HAS_OWNER_UIDGID"
				" is not supported by MDS. UID/GID-based restrictions may"
				" not work properly.\n");

			caller_fsuid = from_vfsuid(req->r_mnt_idmap, &init_user_ns,
						   VFSUIDT_INIT(req->r_cred->fsuid));
			caller_fsgid = from_vfsgid(req->r_mnt_idmap, &init_user_ns,
						   VFSGIDT_INIT(req->r_cred->fsgid));
		} else {
			pr_err_ratelimited_client(cl,
				"idmapped mount is used and CEPHFS_FEATURE_HAS_OWNER_UIDGID"
				" is not supported by MDS. Fail request with -EIO.\n");

			ret = -EIO;
			goto out_err;
		}
	}

	/*
	 * The ceph_mds_request_head_legacy didn't contain a version field, and
	 * one was added when we moved the message version from 3->4.
	 */
	if (legacy) {
		msg->hdr.version = cpu_to_le16(3);
		p = msg->front.iov_base + sizeof(*lhead);
	} else if (request_head_version == 1) {
		struct ceph_mds_request_head *nhead = msg->front.iov_base;

		msg->hdr.version = cpu_to_le16(4);
		nhead->version = cpu_to_le16(1);
		p = msg->front.iov_base + offsetofend(struct ceph_mds_request_head, args);
	} else if (request_head_version == 2) {
		struct ceph_mds_request_head *nhead = msg->front.iov_base;

		msg->hdr.version = cpu_to_le16(6);
		nhead->version = cpu_to_le16(2);

		p = msg->front.iov_base + offsetofend(struct ceph_mds_request_head, ext_num_fwd);
	} else {
		struct ceph_mds_request_head *nhead = msg->front.iov_base;
		kuid_t owner_fsuid;
		kgid_t owner_fsgid;

		msg->hdr.version = cpu_to_le16(6);
		nhead->version = cpu_to_le16(CEPH_MDS_REQUEST_HEAD_VERSION);
		nhead->struct_len = cpu_to_le32(sizeof(struct ceph_mds_request_head));

		/* only inode-creating ops carry an owner; others send -1 */
		if (IS_CEPH_MDS_OP_NEWINODE(req->r_op)) {
			owner_fsuid = from_vfsuid(req->r_mnt_idmap, &init_user_ns,
						VFSUIDT_INIT(req->r_cred->fsuid));
			owner_fsgid = from_vfsgid(req->r_mnt_idmap, &init_user_ns,
						VFSGIDT_INIT(req->r_cred->fsgid));
			nhead->owner_uid = cpu_to_le32(from_kuid(&init_user_ns, owner_fsuid));
			nhead->owner_gid = cpu_to_le32(from_kgid(&init_user_ns, owner_fsgid));
		} else {
			nhead->owner_uid = cpu_to_le32(-1);
			nhead->owner_gid = cpu_to_le32(-1);
		}

		p = msg->front.iov_base + sizeof(*nhead);
	}

	end = msg->front.iov_base + msg->front.iov_len;

	lhead->mdsmap_epoch = cpu_to_le32(mdsc->mdsmap->m_epoch);
	lhead->op = cpu_to_le32(req->r_op);
	lhead->caller_uid = cpu_to_le32(from_kuid(&init_user_ns,
						  caller_fsuid));
	lhead->caller_gid = cpu_to_le32(from_kgid(&init_user_ns,
						  caller_fsgid));
	lhead->ino = cpu_to_le64(req->r_deleg_ino);
	lhead->args = req->r_args;

	ceph_encode_filepath(&p, end, path_info1.vino.ino, path_info1.path);
	ceph_encode_filepath(&p, end, path_info2.vino.ino, path_info2.path);

	/* make note of release offset, in case we need to replay */
	req->r_request_release_offset = p - msg->front.iov_base;

	/* cap releases */
	releases = 0;
	if (req->r_inode_drop)
		releases += ceph_encode_inode_release(&p,
		      req->r_inode ? req->r_inode : d_inode(req->r_dentry),
		      mds, req->r_inode_drop, req->r_inode_unless,
		      req->r_op == CEPH_MDS_OP_READDIR);
	if (req->r_dentry_drop) {
		ret = ceph_encode_dentry_release(&p, req->r_dentry,
				req->r_parent, mds, req->r_dentry_drop,
				req->r_dentry_unless);
		if (ret < 0)
			goto out_err;
		releases += ret;
	}
	if (req->r_old_dentry_drop) {
		ret = ceph_encode_dentry_release(&p, req->r_old_dentry,
				req->r_old_dentry_dir, mds,
				req->r_old_dentry_drop,
				req->r_old_dentry_unless);
		if (ret < 0)
			goto out_err;
		releases += ret;
	}
	if (req->r_old_inode_drop)
		releases += ceph_encode_inode_release(&p,
		      d_inode(req->r_old_dentry),
		      mds, req->r_old_inode_drop, req->r_old_inode_unless, 0);

	/* rewind over the releases just encoded so the tail overwrites them */
	if (drop_cap_releases) {
		releases = 0;
		p = msg->front.iov_base + req->r_request_release_offset;
	}

	lhead->num_releases = cpu_to_le16(releases);

	encode_mclientrequest_tail(&p, req);

	if (WARN_ON_ONCE(p > end)) {
		ceph_msg_put(msg);
		msg = ERR_PTR(-ERANGE);
		goto out_free2;
	}

	/* shrink the front to what was actually encoded */
	msg->front.iov_len = p - msg->front.iov_base;
	msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);

	if (req->r_pagelist) {
		struct ceph_pagelist *pagelist = req->r_pagelist;
		ceph_msg_data_add_pagelist(msg, pagelist);
		msg->hdr.data_len = cpu_to_le32(pagelist->length);
	} else {
		msg->hdr.data_len = 0;
	}

	msg->hdr.data_off = cpu_to_le16(0);

	/* success falls through: the path buffers are no longer needed */
out_free2:
	ceph_mdsc_free_path_info(&path_info2);
out_free1:
	ceph_mdsc_free_path_info(&path_info1);
out:
	return msg;
out_err:
	ceph_msg_put(msg);
	msg = ERR_PTR(ret);
	goto out_free2;
}

/*
 * called under mdsc->mutex if error, under no mutex if
 * success.
3333 */ 3334 static void complete_request(struct ceph_mds_client *mdsc, 3335 struct ceph_mds_request *req) 3336 { 3337 req->r_end_latency = ktime_get(); 3338 3339 trace_ceph_mdsc_complete_request(mdsc, req); 3340 3341 if (req->r_callback) 3342 req->r_callback(mdsc, req); 3343 complete_all(&req->r_completion); 3344 } 3345 3346 /* 3347 * called under mdsc->mutex 3348 */ 3349 static int __prepare_send_request(struct ceph_mds_session *session, 3350 struct ceph_mds_request *req, 3351 bool drop_cap_releases) 3352 { 3353 int mds = session->s_mds; 3354 struct ceph_mds_client *mdsc = session->s_mdsc; 3355 struct ceph_client *cl = mdsc->fsc->client; 3356 struct ceph_mds_request_head_legacy *lhead; 3357 struct ceph_mds_request_head *nhead; 3358 struct ceph_msg *msg; 3359 int flags = 0, old_max_retry; 3360 bool old_version = !test_bit(CEPHFS_FEATURE_32BITS_RETRY_FWD, 3361 &session->s_features); 3362 3363 /* 3364 * Avoid infinite retrying after overflow. The client will 3365 * increase the retry count and if the MDS is old version, 3366 * so we limit to retry at most 256 times. 3367 */ 3368 if (req->r_attempts) { 3369 old_max_retry = sizeof_field(struct ceph_mds_request_head, 3370 num_retry); 3371 old_max_retry = 1 << (old_max_retry * BITS_PER_BYTE); 3372 if ((old_version && req->r_attempts >= old_max_retry) || 3373 ((uint32_t)req->r_attempts >= U32_MAX)) { 3374 pr_warn_ratelimited_client(cl, "request tid %llu seq overflow\n", 3375 req->r_tid); 3376 return -EMULTIHOP; 3377 } 3378 } 3379 3380 req->r_attempts++; 3381 if (req->r_inode) { 3382 struct ceph_cap *cap = 3383 ceph_get_cap_for_mds(ceph_inode(req->r_inode), mds); 3384 3385 if (cap) 3386 req->r_sent_on_mseq = cap->mseq; 3387 else 3388 req->r_sent_on_mseq = -1; 3389 } 3390 doutc(cl, "%p tid %lld %s (attempt %d)\n", req, req->r_tid, 3391 ceph_mds_op_name(req->r_op), req->r_attempts); 3392 3393 if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) { 3394 void *p; 3395 3396 /* 3397 * Replay. 
Do not regenerate message (and rebuild 3398 * paths, etc.); just use the original message. 3399 * Rebuilding paths will break for renames because 3400 * d_move mangles the src name. 3401 */ 3402 msg = req->r_request; 3403 lhead = find_legacy_request_head(msg->front.iov_base, 3404 session->s_con.peer_features); 3405 3406 flags = le32_to_cpu(lhead->flags); 3407 flags |= CEPH_MDS_FLAG_REPLAY; 3408 lhead->flags = cpu_to_le32(flags); 3409 3410 if (req->r_target_inode) 3411 lhead->ino = cpu_to_le64(ceph_ino(req->r_target_inode)); 3412 3413 lhead->num_retry = req->r_attempts - 1; 3414 if (!old_version) { 3415 nhead = (struct ceph_mds_request_head*)msg->front.iov_base; 3416 nhead->ext_num_retry = cpu_to_le32(req->r_attempts - 1); 3417 } 3418 3419 /* remove cap/dentry releases from message */ 3420 lhead->num_releases = 0; 3421 3422 p = msg->front.iov_base + req->r_request_release_offset; 3423 encode_mclientrequest_tail(&p, req); 3424 3425 msg->front.iov_len = p - msg->front.iov_base; 3426 msg->hdr.front_len = cpu_to_le32(msg->front.iov_len); 3427 return 0; 3428 } 3429 3430 if (req->r_request) { 3431 ceph_msg_put(req->r_request); 3432 req->r_request = NULL; 3433 } 3434 msg = create_request_message(session, req, drop_cap_releases); 3435 if (IS_ERR(msg)) { 3436 req->r_err = PTR_ERR(msg); 3437 return PTR_ERR(msg); 3438 } 3439 req->r_request = msg; 3440 3441 lhead = find_legacy_request_head(msg->front.iov_base, 3442 session->s_con.peer_features); 3443 lhead->oldest_client_tid = cpu_to_le64(__get_oldest_tid(mdsc)); 3444 if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) 3445 flags |= CEPH_MDS_FLAG_REPLAY; 3446 if (test_bit(CEPH_MDS_R_ASYNC, &req->r_req_flags)) 3447 flags |= CEPH_MDS_FLAG_ASYNC; 3448 if (req->r_parent) 3449 flags |= CEPH_MDS_FLAG_WANT_DENTRY; 3450 lhead->flags = cpu_to_le32(flags); 3451 lhead->num_fwd = req->r_num_fwd; 3452 lhead->num_retry = req->r_attempts - 1; 3453 if (!old_version) { 3454 nhead = (struct ceph_mds_request_head*)msg->front.iov_base; 3455 
nhead->ext_num_fwd = cpu_to_le32(req->r_num_fwd); 3456 nhead->ext_num_retry = cpu_to_le32(req->r_attempts - 1); 3457 } 3458 3459 doutc(cl, " r_parent = %p\n", req->r_parent); 3460 return 0; 3461 } 3462 3463 /* 3464 * called under mdsc->mutex 3465 */ 3466 static int __send_request(struct ceph_mds_session *session, 3467 struct ceph_mds_request *req, 3468 bool drop_cap_releases) 3469 { 3470 int err; 3471 3472 trace_ceph_mdsc_send_request(session, req); 3473 3474 err = __prepare_send_request(session, req, drop_cap_releases); 3475 if (!err) { 3476 ceph_msg_get(req->r_request); 3477 ceph_con_send(&session->s_con, req->r_request); 3478 } 3479 3480 return err; 3481 } 3482 3483 /* 3484 * send request, or put it on the appropriate wait list. 3485 */ 3486 static void __do_request(struct ceph_mds_client *mdsc, 3487 struct ceph_mds_request *req) 3488 { 3489 struct ceph_client *cl = mdsc->fsc->client; 3490 struct ceph_mds_session *session = NULL; 3491 int mds = -1; 3492 int err = 0; 3493 bool random; 3494 3495 if (req->r_err || test_bit(CEPH_MDS_R_GOT_RESULT, &req->r_req_flags)) { 3496 if (test_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags)) 3497 __unregister_request(mdsc, req); 3498 return; 3499 } 3500 3501 if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_FENCE_IO) { 3502 doutc(cl, "metadata corrupted\n"); 3503 err = -EIO; 3504 goto finish; 3505 } 3506 if (req->r_timeout && 3507 time_after_eq(jiffies, req->r_started + req->r_timeout)) { 3508 doutc(cl, "timed out\n"); 3509 err = -ETIMEDOUT; 3510 goto finish; 3511 } 3512 if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_SHUTDOWN) { 3513 doutc(cl, "forced umount\n"); 3514 err = -EIO; 3515 goto finish; 3516 } 3517 if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_MOUNTING) { 3518 if (mdsc->mdsmap_err) { 3519 err = mdsc->mdsmap_err; 3520 doutc(cl, "mdsmap err %d\n", err); 3521 goto finish; 3522 } 3523 if (mdsc->mdsmap->m_epoch == 0) { 3524 doutc(cl, "no mdsmap, waiting for map\n"); 3525 trace_ceph_mdsc_suspend_request(mdsc, 
session, req, 3526 ceph_mdsc_suspend_reason_no_mdsmap); 3527 list_add(&req->r_wait, &mdsc->waiting_for_map); 3528 return; 3529 } 3530 if (!(mdsc->fsc->mount_options->flags & 3531 CEPH_MOUNT_OPT_MOUNTWAIT) && 3532 !ceph_mdsmap_is_cluster_available(mdsc->mdsmap)) { 3533 err = -EHOSTUNREACH; 3534 goto finish; 3535 } 3536 } 3537 3538 put_request_session(req); 3539 3540 mds = __choose_mds(mdsc, req, &random); 3541 if (mds < 0 || 3542 ceph_mdsmap_get_state(mdsc->mdsmap, mds) < CEPH_MDS_STATE_ACTIVE) { 3543 if (test_bit(CEPH_MDS_R_ASYNC, &req->r_req_flags)) { 3544 err = -EJUKEBOX; 3545 goto finish; 3546 } 3547 doutc(cl, "no mds or not active, waiting for map\n"); 3548 trace_ceph_mdsc_suspend_request(mdsc, session, req, 3549 ceph_mdsc_suspend_reason_no_active_mds); 3550 list_add(&req->r_wait, &mdsc->waiting_for_map); 3551 return; 3552 } 3553 3554 /* get, open session */ 3555 session = __ceph_lookup_mds_session(mdsc, mds); 3556 if (!session) { 3557 session = register_session(mdsc, mds); 3558 if (IS_ERR(session)) { 3559 err = PTR_ERR(session); 3560 goto finish; 3561 } 3562 } 3563 req->r_session = ceph_get_mds_session(session); 3564 3565 doutc(cl, "mds%d session %p state %s\n", mds, session, 3566 ceph_session_state_name(session->s_state)); 3567 3568 /* 3569 * The old ceph will crash the MDSs when see unknown OPs 3570 */ 3571 if (req->r_feature_needed > 0 && 3572 !test_bit(req->r_feature_needed, &session->s_features)) { 3573 err = -EOPNOTSUPP; 3574 goto out_session; 3575 } 3576 3577 if (session->s_state != CEPH_MDS_SESSION_OPEN && 3578 session->s_state != CEPH_MDS_SESSION_HUNG) { 3579 /* 3580 * We cannot queue async requests since the caps and delegated 3581 * inodes are bound to the session. Just return -EJUKEBOX and 3582 * let the caller retry a sync request in that case. 
3583 */ 3584 if (test_bit(CEPH_MDS_R_ASYNC, &req->r_req_flags)) { 3585 err = -EJUKEBOX; 3586 goto out_session; 3587 } 3588 3589 /* 3590 * If the session has been REJECTED, then return a hard error, 3591 * unless it's a CLEANRECOVER mount, in which case we'll queue 3592 * it to the mdsc queue. 3593 */ 3594 if (session->s_state == CEPH_MDS_SESSION_REJECTED) { 3595 if (ceph_test_mount_opt(mdsc->fsc, CLEANRECOVER)) { 3596 trace_ceph_mdsc_suspend_request(mdsc, session, req, 3597 ceph_mdsc_suspend_reason_rejected); 3598 list_add(&req->r_wait, &mdsc->waiting_for_map); 3599 } else 3600 err = -EACCES; 3601 goto out_session; 3602 } 3603 3604 if (session->s_state == CEPH_MDS_SESSION_NEW || 3605 session->s_state == CEPH_MDS_SESSION_CLOSING) { 3606 err = __open_session(mdsc, session); 3607 if (err) 3608 goto out_session; 3609 /* retry the same mds later */ 3610 if (random) 3611 req->r_resend_mds = mds; 3612 } 3613 trace_ceph_mdsc_suspend_request(mdsc, session, req, 3614 ceph_mdsc_suspend_reason_session); 3615 list_add(&req->r_wait, &session->s_waiting); 3616 goto out_session; 3617 } 3618 3619 /* send request */ 3620 req->r_resend_mds = -1; /* forget any previous mds hint */ 3621 3622 if (req->r_request_started == 0) /* note request start time */ 3623 req->r_request_started = jiffies; 3624 3625 /* 3626 * For async create we will choose the auth MDS of frag in parent 3627 * directory to send the request and usually this works fine, but 3628 * if the migrated the dirtory to another MDS before it could handle 3629 * it the request will be forwarded. 3630 * 3631 * And then the auth cap will be changed. 3632 */ 3633 if (test_bit(CEPH_MDS_R_ASYNC, &req->r_req_flags) && req->r_num_fwd) { 3634 struct ceph_dentry_info *di = ceph_dentry(req->r_dentry); 3635 struct ceph_inode_info *ci; 3636 struct ceph_cap *cap; 3637 3638 /* 3639 * The request maybe handled very fast and the new inode 3640 * hasn't been linked to the dentry yet. 
We need to wait 3641 * for the ceph_finish_async_create(), which shouldn't be 3642 * stuck too long or fail in thoery, to finish when forwarding 3643 * the request. 3644 */ 3645 if (!d_inode(req->r_dentry)) { 3646 err = wait_on_bit(&di->flags, CEPH_DENTRY_ASYNC_CREATE_BIT, 3647 TASK_KILLABLE); 3648 if (err) { 3649 mutex_lock(&req->r_fill_mutex); 3650 set_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags); 3651 mutex_unlock(&req->r_fill_mutex); 3652 goto out_session; 3653 } 3654 } 3655 3656 ci = ceph_inode(d_inode(req->r_dentry)); 3657 3658 spin_lock(&ci->i_ceph_lock); 3659 cap = ci->i_auth_cap; 3660 if (ci->i_ceph_flags & CEPH_I_ASYNC_CREATE && mds != cap->mds) { 3661 doutc(cl, "session changed for auth cap %d -> %d\n", 3662 cap->session->s_mds, session->s_mds); 3663 3664 /* Remove the auth cap from old session */ 3665 spin_lock(&cap->session->s_cap_lock); 3666 cap->session->s_nr_caps--; 3667 list_del_init(&cap->session_caps); 3668 spin_unlock(&cap->session->s_cap_lock); 3669 3670 /* Add the auth cap to the new session */ 3671 cap->mds = mds; 3672 cap->session = session; 3673 spin_lock(&session->s_cap_lock); 3674 session->s_nr_caps++; 3675 list_add_tail(&cap->session_caps, &session->s_caps); 3676 spin_unlock(&session->s_cap_lock); 3677 3678 change_auth_cap_ses(ci, session); 3679 } 3680 spin_unlock(&ci->i_ceph_lock); 3681 } 3682 3683 err = __send_request(session, req, false); 3684 3685 out_session: 3686 ceph_put_mds_session(session); 3687 finish: 3688 if (err) { 3689 doutc(cl, "early error %d\n", err); 3690 req->r_err = err; 3691 complete_request(mdsc, req); 3692 __unregister_request(mdsc, req); 3693 } 3694 return; 3695 } 3696 3697 /* 3698 * called under mdsc->mutex 3699 */ 3700 static void __wake_requests(struct ceph_mds_client *mdsc, 3701 struct list_head *head) 3702 { 3703 struct ceph_client *cl = mdsc->fsc->client; 3704 struct ceph_mds_request *req; 3705 LIST_HEAD(tmp_list); 3706 3707 list_splice_init(head, &tmp_list); 3708 3709 while (!list_empty(&tmp_list)) { 3710 req 
= list_entry(tmp_list.next, 3711 struct ceph_mds_request, r_wait); 3712 list_del_init(&req->r_wait); 3713 doutc(cl, " wake request %p tid %llu\n", req, 3714 req->r_tid); 3715 trace_ceph_mdsc_resume_request(mdsc, req); 3716 __do_request(mdsc, req); 3717 } 3718 } 3719 3720 /* 3721 * Wake up threads with requests pending for @mds, so that they can 3722 * resubmit their requests to a possibly different mds. 3723 */ 3724 static void kick_requests(struct ceph_mds_client *mdsc, int mds) 3725 { 3726 struct ceph_client *cl = mdsc->fsc->client; 3727 struct ceph_mds_request *req; 3728 struct rb_node *p = rb_first(&mdsc->request_tree); 3729 3730 doutc(cl, "kick_requests mds%d\n", mds); 3731 while (p) { 3732 req = rb_entry(p, struct ceph_mds_request, r_node); 3733 p = rb_next(p); 3734 if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) 3735 continue; 3736 if (req->r_attempts > 0) 3737 continue; /* only new requests */ 3738 if (req->r_session && 3739 req->r_session->s_mds == mds) { 3740 doutc(cl, " kicking tid %llu\n", req->r_tid); 3741 list_del_init(&req->r_wait); 3742 trace_ceph_mdsc_resume_request(mdsc, req); 3743 __do_request(mdsc, req); 3744 } 3745 } 3746 } 3747 3748 int ceph_mdsc_submit_request(struct ceph_mds_client *mdsc, struct inode *dir, 3749 struct ceph_mds_request *req) 3750 { 3751 struct ceph_client *cl = mdsc->fsc->client; 3752 int err = 0; 3753 3754 /* take CAP_PIN refs for r_inode, r_parent, r_old_dentry */ 3755 if (req->r_inode) 3756 ceph_get_cap_refs(ceph_inode(req->r_inode), CEPH_CAP_PIN); 3757 if (req->r_parent) { 3758 struct ceph_inode_info *ci = ceph_inode(req->r_parent); 3759 int fmode = (req->r_op & CEPH_MDS_OP_WRITE) ? 
3760 CEPH_FILE_MODE_WR : CEPH_FILE_MODE_RD; 3761 spin_lock(&ci->i_ceph_lock); 3762 ceph_take_cap_refs(ci, CEPH_CAP_PIN, false); 3763 __ceph_touch_fmode(ci, mdsc, fmode); 3764 spin_unlock(&ci->i_ceph_lock); 3765 } 3766 if (req->r_old_dentry_dir) 3767 ceph_get_cap_refs(ceph_inode(req->r_old_dentry_dir), 3768 CEPH_CAP_PIN); 3769 3770 if (req->r_inode) { 3771 err = ceph_wait_on_async_create(req->r_inode); 3772 if (err) { 3773 doutc(cl, "wait for async create returned: %d\n", err); 3774 return err; 3775 } 3776 } 3777 3778 if (!err && req->r_old_inode) { 3779 err = ceph_wait_on_async_create(req->r_old_inode); 3780 if (err) { 3781 doutc(cl, "wait for async create returned: %d\n", err); 3782 return err; 3783 } 3784 } 3785 3786 doutc(cl, "submit_request on %p for inode %p\n", req, dir); 3787 mutex_lock(&mdsc->mutex); 3788 __register_request(mdsc, req, dir); 3789 trace_ceph_mdsc_submit_request(mdsc, req); 3790 __do_request(mdsc, req); 3791 err = req->r_err; 3792 mutex_unlock(&mdsc->mutex); 3793 return err; 3794 } 3795 3796 int ceph_mdsc_wait_request(struct ceph_mds_client *mdsc, 3797 struct ceph_mds_request *req, 3798 ceph_mds_request_wait_callback_t wait_func) 3799 { 3800 struct ceph_client *cl = mdsc->fsc->client; 3801 int err; 3802 3803 /* wait */ 3804 doutc(cl, "do_request waiting\n"); 3805 if (wait_func) { 3806 err = wait_func(mdsc, req); 3807 } else { 3808 long timeleft = wait_for_completion_killable_timeout( 3809 &req->r_completion, 3810 ceph_timeout_jiffies(req->r_timeout)); 3811 if (timeleft > 0) 3812 err = 0; 3813 else if (!timeleft) 3814 err = -ETIMEDOUT; /* timed out */ 3815 else 3816 err = timeleft; /* killed */ 3817 } 3818 doutc(cl, "do_request waited, got %d\n", err); 3819 mutex_lock(&mdsc->mutex); 3820 3821 /* only abort if we didn't race with a real reply */ 3822 if (test_bit(CEPH_MDS_R_GOT_RESULT, &req->r_req_flags)) { 3823 err = le32_to_cpu(req->r_reply_info.head->result); 3824 } else if (err < 0) { 3825 doutc(cl, "aborted request %lld with %d\n", 
req->r_tid, err); 3826 3827 /* 3828 * ensure we aren't running concurrently with 3829 * ceph_fill_trace or ceph_readdir_prepopulate, which 3830 * rely on locks (dir mutex) held by our caller. 3831 */ 3832 mutex_lock(&req->r_fill_mutex); 3833 req->r_err = err; 3834 set_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags); 3835 mutex_unlock(&req->r_fill_mutex); 3836 3837 if (req->r_parent && 3838 (req->r_op & CEPH_MDS_OP_WRITE)) 3839 ceph_invalidate_dir_request(req); 3840 } else { 3841 err = req->r_err; 3842 } 3843 3844 mutex_unlock(&mdsc->mutex); 3845 return err; 3846 } 3847 3848 /* 3849 * Synchrously perform an mds request. Take care of all of the 3850 * session setup, forwarding, retry details. 3851 */ 3852 int ceph_mdsc_do_request(struct ceph_mds_client *mdsc, 3853 struct inode *dir, 3854 struct ceph_mds_request *req) 3855 { 3856 struct ceph_client *cl = mdsc->fsc->client; 3857 int err; 3858 3859 doutc(cl, "do_request on %p\n", req); 3860 3861 /* issue */ 3862 err = ceph_mdsc_submit_request(mdsc, dir, req); 3863 if (!err) 3864 err = ceph_mdsc_wait_request(mdsc, req, NULL); 3865 doutc(cl, "do_request %p done, result %d\n", req, err); 3866 return err; 3867 } 3868 3869 /* 3870 * Invalidate dir's completeness, dentry lease state on an aborted MDS 3871 * namespace request. 3872 */ 3873 void ceph_invalidate_dir_request(struct ceph_mds_request *req) 3874 { 3875 struct inode *dir = req->r_parent; 3876 struct inode *old_dir = req->r_old_dentry_dir; 3877 struct ceph_client *cl = req->r_mdsc->fsc->client; 3878 3879 doutc(cl, "invalidate_dir_request %p %p (complete, lease(s))\n", 3880 dir, old_dir); 3881 3882 ceph_dir_clear_complete(dir); 3883 if (old_dir) 3884 ceph_dir_clear_complete(old_dir); 3885 if (req->r_dentry) 3886 ceph_invalidate_dentry_lease(req->r_dentry); 3887 if (req->r_old_dentry) 3888 ceph_invalidate_dentry_lease(req->r_old_dentry); 3889 } 3890 3891 /* 3892 * Handle mds reply. 3893 * 3894 * We take the session mutex and parse and process the reply immediately. 
3895 * This preserves the logical ordering of replies, capabilities, etc., sent 3896 * by the MDS as they are applied to our local cache. 3897 */ 3898 static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg) 3899 { 3900 struct ceph_mds_client *mdsc = session->s_mdsc; 3901 struct ceph_client *cl = mdsc->fsc->client; 3902 struct ceph_mds_request *req; 3903 struct ceph_mds_reply_head *head = msg->front.iov_base; 3904 struct ceph_mds_reply_info_parsed *rinfo; /* parsed reply info */ 3905 struct ceph_snap_realm *realm; 3906 u64 tid; 3907 int err, result; 3908 int mds = session->s_mds; 3909 bool close_sessions = false; 3910 3911 if (msg->front.iov_len < sizeof(*head)) { 3912 pr_err_client(cl, "got corrupt (short) reply\n"); 3913 ceph_msg_dump(msg); 3914 return; 3915 } 3916 3917 /* get request, session */ 3918 tid = le64_to_cpu(msg->hdr.tid); 3919 mutex_lock(&mdsc->mutex); 3920 req = lookup_get_request(mdsc, tid); 3921 if (!req) { 3922 doutc(cl, "on unknown tid %llu\n", tid); 3923 mutex_unlock(&mdsc->mutex); 3924 return; 3925 } 3926 doutc(cl, "handle_reply %p\n", req); 3927 3928 /* correct session? */ 3929 if (req->r_session != session) { 3930 pr_err_client(cl, "got %llu on session mds%d not mds%d\n", 3931 tid, session->s_mds, 3932 req->r_session ? req->r_session->s_mds : -1); 3933 mutex_unlock(&mdsc->mutex); 3934 goto out; 3935 } 3936 3937 /* dup? */ 3938 if ((test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags) && !head->safe) || 3939 (test_bit(CEPH_MDS_R_GOT_SAFE, &req->r_req_flags) && head->safe)) { 3940 pr_warn_client(cl, "got a dup %s reply on %llu from mds%d\n", 3941 head->safe ? 
"safe" : "unsafe", tid, mds); 3942 mutex_unlock(&mdsc->mutex); 3943 goto out; 3944 } 3945 if (test_bit(CEPH_MDS_R_GOT_SAFE, &req->r_req_flags)) { 3946 pr_warn_client(cl, "got unsafe after safe on %llu from mds%d\n", 3947 tid, mds); 3948 mutex_unlock(&mdsc->mutex); 3949 goto out; 3950 } 3951 3952 result = le32_to_cpu(head->result); 3953 3954 if (head->safe) { 3955 set_bit(CEPH_MDS_R_GOT_SAFE, &req->r_req_flags); 3956 __unregister_request(mdsc, req); 3957 3958 /* last request during umount? */ 3959 if (mdsc->stopping && !__get_oldest_req(mdsc)) 3960 complete_all(&mdsc->safe_umount_waiters); 3961 3962 if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) { 3963 /* 3964 * We already handled the unsafe response, now do the 3965 * cleanup. No need to examine the response; the MDS 3966 * doesn't include any result info in the safe 3967 * response. And even if it did, there is nothing 3968 * useful we could do with a revised return value. 3969 */ 3970 doutc(cl, "got safe reply %llu, mds%d\n", tid, mds); 3971 3972 mutex_unlock(&mdsc->mutex); 3973 goto out; 3974 } 3975 } else { 3976 set_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags); 3977 list_add_tail(&req->r_unsafe_item, &req->r_session->s_unsafe); 3978 } 3979 3980 doutc(cl, "tid %lld result %d\n", tid, result); 3981 if (test_bit(CEPHFS_FEATURE_REPLY_ENCODING, &session->s_features)) 3982 err = parse_reply_info(session, msg, req, (u64)-1); 3983 else 3984 err = parse_reply_info(session, msg, req, 3985 session->s_con.peer_features); 3986 mutex_unlock(&mdsc->mutex); 3987 3988 /* Must find target inode outside of mutexes to avoid deadlocks */ 3989 rinfo = &req->r_reply_info; 3990 if ((err >= 0) && rinfo->head->is_target) { 3991 struct inode *in = xchg(&req->r_new_inode, NULL); 3992 struct ceph_vino tvino = { 3993 .ino = le64_to_cpu(rinfo->targeti.in->ino), 3994 .snap = le64_to_cpu(rinfo->targeti.in->snapid) 3995 }; 3996 3997 /* 3998 * If we ended up opening an existing inode, discard 3999 * r_new_inode 4000 */ 4001 if 
(req->r_op == CEPH_MDS_OP_CREATE && 4002 !req->r_reply_info.has_create_ino) { 4003 /* This should never happen on an async create */ 4004 WARN_ON_ONCE(req->r_deleg_ino); 4005 iput(in); 4006 in = NULL; 4007 } 4008 4009 in = ceph_get_inode(mdsc->fsc->sb, tvino, in); 4010 if (IS_ERR(in)) { 4011 err = PTR_ERR(in); 4012 mutex_lock(&session->s_mutex); 4013 goto out_err; 4014 } 4015 req->r_target_inode = in; 4016 ceph_inode_set_subvolume(in, rinfo->targeti.subvolume_id); 4017 } 4018 4019 mutex_lock(&session->s_mutex); 4020 if (err < 0) { 4021 pr_err_client(cl, "got corrupt reply mds%d(tid:%lld)\n", 4022 mds, tid); 4023 ceph_msg_dump(msg); 4024 goto out_err; 4025 } 4026 4027 /* snap trace */ 4028 realm = NULL; 4029 if (rinfo->snapblob_len) { 4030 down_write(&mdsc->snap_rwsem); 4031 err = ceph_update_snap_trace(mdsc, rinfo->snapblob, 4032 rinfo->snapblob + rinfo->snapblob_len, 4033 le32_to_cpu(head->op) == CEPH_MDS_OP_RMSNAP, 4034 &realm); 4035 if (err) { 4036 up_write(&mdsc->snap_rwsem); 4037 close_sessions = true; 4038 if (err == -EIO) 4039 ceph_msg_dump(msg); 4040 goto out_err; 4041 } 4042 downgrade_write(&mdsc->snap_rwsem); 4043 } else { 4044 down_read(&mdsc->snap_rwsem); 4045 } 4046 4047 /* insert trace into our cache */ 4048 mutex_lock(&req->r_fill_mutex); 4049 current->journal_info = req; 4050 err = ceph_fill_trace(mdsc->fsc->sb, req); 4051 if (err == 0) { 4052 if (result == 0 && (req->r_op == CEPH_MDS_OP_READDIR || 4053 req->r_op == CEPH_MDS_OP_LSSNAP)) 4054 err = ceph_readdir_prepopulate(req, req->r_session); 4055 } 4056 current->journal_info = NULL; 4057 mutex_unlock(&req->r_fill_mutex); 4058 4059 up_read(&mdsc->snap_rwsem); 4060 if (realm) 4061 ceph_put_snap_realm(mdsc, realm); 4062 4063 if (err == 0) { 4064 if (req->r_target_inode && 4065 test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) { 4066 struct ceph_inode_info *ci = 4067 ceph_inode(req->r_target_inode); 4068 spin_lock(&ci->i_unsafe_lock); 4069 list_add_tail(&req->r_unsafe_target_item, 4070 
&ci->i_unsafe_iops); 4071 spin_unlock(&ci->i_unsafe_lock); 4072 } 4073 4074 ceph_unreserve_caps(mdsc, &req->r_caps_reservation); 4075 } 4076 out_err: 4077 mutex_lock(&mdsc->mutex); 4078 if (!test_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags)) { 4079 if (err) { 4080 req->r_err = err; 4081 } else { 4082 req->r_reply = ceph_msg_get(msg); 4083 set_bit(CEPH_MDS_R_GOT_RESULT, &req->r_req_flags); 4084 } 4085 } else { 4086 doutc(cl, "reply arrived after request %lld was aborted\n", tid); 4087 } 4088 mutex_unlock(&mdsc->mutex); 4089 4090 mutex_unlock(&session->s_mutex); 4091 4092 /* kick calling process */ 4093 complete_request(mdsc, req); 4094 4095 ceph_update_metadata_metrics(&mdsc->metric, req->r_start_latency, 4096 req->r_end_latency, err); 4097 out: 4098 ceph_mdsc_put_request(req); 4099 4100 /* Defer closing the sessions after s_mutex lock being released */ 4101 if (close_sessions) 4102 ceph_mdsc_close_sessions(mdsc); 4103 return; 4104 } 4105 4106 4107 4108 /* 4109 * handle mds notification that our request has been forwarded. 4110 */ 4111 static void handle_forward(struct ceph_mds_client *mdsc, 4112 struct ceph_mds_session *session, 4113 struct ceph_msg *msg) 4114 { 4115 struct ceph_client *cl = mdsc->fsc->client; 4116 struct ceph_mds_request *req; 4117 u64 tid = le64_to_cpu(msg->hdr.tid); 4118 u32 next_mds; 4119 u32 fwd_seq; 4120 int err = -EINVAL; 4121 void *p = msg->front.iov_base; 4122 void *end = p + msg->front.iov_len; 4123 bool aborted = false; 4124 4125 ceph_decode_need(&p, end, 2*sizeof(u32), bad); 4126 next_mds = ceph_decode_32(&p); 4127 fwd_seq = ceph_decode_32(&p); 4128 4129 mutex_lock(&mdsc->mutex); 4130 req = lookup_get_request(mdsc, tid); 4131 if (!req) { 4132 mutex_unlock(&mdsc->mutex); 4133 doutc(cl, "forward tid %llu to mds%d - req dne\n", tid, next_mds); 4134 return; /* dup reply? 
*/ 4135 } 4136 4137 if (test_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags)) { 4138 doutc(cl, "forward tid %llu aborted, unregistering\n", tid); 4139 __unregister_request(mdsc, req); 4140 } else if (fwd_seq <= req->r_num_fwd || (uint32_t)fwd_seq >= U32_MAX) { 4141 /* 4142 * Avoid infinite retrying after overflow. 4143 * 4144 * The MDS will increase the fwd count and in client side 4145 * if the num_fwd is less than the one saved in request 4146 * that means the MDS is an old version and overflowed of 4147 * 8 bits. 4148 */ 4149 mutex_lock(&req->r_fill_mutex); 4150 req->r_err = -EMULTIHOP; 4151 set_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags); 4152 mutex_unlock(&req->r_fill_mutex); 4153 aborted = true; 4154 pr_warn_ratelimited_client(cl, "forward tid %llu seq overflow\n", 4155 tid); 4156 } else { 4157 /* resend. forward race not possible; mds would drop */ 4158 doutc(cl, "forward tid %llu to mds%d (we resend)\n", tid, next_mds); 4159 BUG_ON(req->r_err); 4160 BUG_ON(test_bit(CEPH_MDS_R_GOT_RESULT, &req->r_req_flags)); 4161 req->r_attempts = 0; 4162 req->r_num_fwd = fwd_seq; 4163 req->r_resend_mds = next_mds; 4164 put_request_session(req); 4165 __do_request(mdsc, req); 4166 } 4167 mutex_unlock(&mdsc->mutex); 4168 4169 /* kick calling process */ 4170 if (aborted) 4171 complete_request(mdsc, req); 4172 ceph_mdsc_put_request(req); 4173 return; 4174 4175 bad: 4176 pr_err_client(cl, "decode error err=%d\n", err); 4177 ceph_msg_dump(msg); 4178 } 4179 4180 static int __decode_session_metadata(void **p, void *end, 4181 bool *blocklisted) 4182 { 4183 /* map<string,string> */ 4184 u32 n; 4185 bool err_str; 4186 ceph_decode_32_safe(p, end, n, bad); 4187 while (n-- > 0) { 4188 u32 len; 4189 ceph_decode_32_safe(p, end, len, bad); 4190 ceph_decode_need(p, end, len, bad); 4191 err_str = !strncmp(*p, "error_string", len); 4192 *p += len; 4193 ceph_decode_32_safe(p, end, len, bad); 4194 ceph_decode_need(p, end, len, bad); 4195 /* 4196 * Match "blocklisted (blacklisted)" from newer MDSes, 
4197 * or "blacklisted" from older MDSes. 4198 */ 4199 if (err_str && strnstr(*p, "blacklisted", len)) 4200 *blocklisted = true; 4201 *p += len; 4202 } 4203 return 0; 4204 bad: 4205 return -1; 4206 } 4207 4208 /* 4209 * handle a mds session control message 4210 */ 4211 static void handle_session(struct ceph_mds_session *session, 4212 struct ceph_msg *msg) 4213 { 4214 struct ceph_mds_client *mdsc = session->s_mdsc; 4215 struct ceph_client *cl = mdsc->fsc->client; 4216 int mds = session->s_mds; 4217 int msg_version = le16_to_cpu(msg->hdr.version); 4218 void *p = msg->front.iov_base; 4219 void *end = p + msg->front.iov_len; 4220 struct ceph_mds_session_head *h; 4221 struct ceph_mds_cap_auth *cap_auths = NULL; 4222 u32 op, cap_auths_num = 0; 4223 u64 seq, features = 0; 4224 int wake = 0; 4225 bool blocklisted = false; 4226 u32 i; 4227 4228 4229 /* decode */ 4230 ceph_decode_need(&p, end, sizeof(*h), bad); 4231 h = p; 4232 p += sizeof(*h); 4233 4234 op = le32_to_cpu(h->op); 4235 seq = le64_to_cpu(h->seq); 4236 4237 if (msg_version >= 3) { 4238 u32 len; 4239 /* version >= 2 and < 5, decode metadata, skip otherwise 4240 * as it's handled via flags. 
4241 */ 4242 if (msg_version >= 5) 4243 ceph_decode_skip_map(&p, end, string, string, bad); 4244 else if (__decode_session_metadata(&p, end, &blocklisted) < 0) 4245 goto bad; 4246 4247 /* version >= 3, feature bits */ 4248 ceph_decode_32_safe(&p, end, len, bad); 4249 if (len) { 4250 ceph_decode_64_safe(&p, end, features, bad); 4251 p += len - sizeof(features); 4252 } 4253 } 4254 4255 if (msg_version >= 5) { 4256 u32 flags, len; 4257 4258 /* version >= 4 */ 4259 ceph_decode_skip_16(&p, end, bad); /* struct_v, struct_cv */ 4260 ceph_decode_32_safe(&p, end, len, bad); /* len */ 4261 ceph_decode_skip_n(&p, end, len, bad); /* metric_spec */ 4262 4263 /* version >= 5, flags */ 4264 ceph_decode_32_safe(&p, end, flags, bad); 4265 if (flags & CEPH_SESSION_BLOCKLISTED) { 4266 pr_warn_client(cl, "mds%d session blocklisted\n", 4267 session->s_mds); 4268 blocklisted = true; 4269 } 4270 } 4271 4272 if (msg_version >= 6) { 4273 ceph_decode_32_safe(&p, end, cap_auths_num, bad); 4274 doutc(cl, "cap_auths_num %d\n", cap_auths_num); 4275 4276 if (cap_auths_num && op != CEPH_SESSION_OPEN) { 4277 WARN_ON_ONCE(op != CEPH_SESSION_OPEN); 4278 goto skip_cap_auths; 4279 } 4280 4281 cap_auths = kzalloc_objs(struct ceph_mds_cap_auth, 4282 cap_auths_num); 4283 if (!cap_auths) { 4284 pr_err_client(cl, "No memory for cap_auths\n"); 4285 return; 4286 } 4287 4288 for (i = 0; i < cap_auths_num; i++) { 4289 u32 _len, j; 4290 4291 /* struct_v, struct_compat, and struct_len in MDSCapAuth */ 4292 ceph_decode_skip_n(&p, end, 2 + sizeof(u32), bad); 4293 4294 /* struct_v, struct_compat, and struct_len in MDSCapMatch */ 4295 ceph_decode_skip_n(&p, end, 2 + sizeof(u32), bad); 4296 ceph_decode_64_safe(&p, end, cap_auths[i].match.uid, bad); 4297 ceph_decode_32_safe(&p, end, _len, bad); 4298 if (_len) { 4299 cap_auths[i].match.gids = kcalloc(_len, sizeof(u32), 4300 GFP_KERNEL); 4301 if (!cap_auths[i].match.gids) { 4302 pr_err_client(cl, "No memory for gids\n"); 4303 goto fail; 4304 } 4305 4306 
cap_auths[i].match.num_gids = _len; 4307 for (j = 0; j < _len; j++) 4308 ceph_decode_32_safe(&p, end, 4309 cap_auths[i].match.gids[j], 4310 bad); 4311 } 4312 4313 ceph_decode_32_safe(&p, end, _len, bad); 4314 if (_len) { 4315 cap_auths[i].match.path = kcalloc(_len + 1, sizeof(char), 4316 GFP_KERNEL); 4317 if (!cap_auths[i].match.path) { 4318 pr_err_client(cl, "No memory for path\n"); 4319 goto fail; 4320 } 4321 ceph_decode_copy(&p, cap_auths[i].match.path, _len); 4322 4323 /* Remove the tailing '/' */ 4324 while (_len && cap_auths[i].match.path[_len - 1] == '/') { 4325 cap_auths[i].match.path[_len - 1] = '\0'; 4326 _len -= 1; 4327 } 4328 } 4329 4330 ceph_decode_32_safe(&p, end, _len, bad); 4331 if (_len) { 4332 cap_auths[i].match.fs_name = kcalloc(_len + 1, sizeof(char), 4333 GFP_KERNEL); 4334 if (!cap_auths[i].match.fs_name) { 4335 pr_err_client(cl, "No memory for fs_name\n"); 4336 goto fail; 4337 } 4338 ceph_decode_copy(&p, cap_auths[i].match.fs_name, _len); 4339 } 4340 4341 ceph_decode_8_safe(&p, end, cap_auths[i].match.root_squash, bad); 4342 ceph_decode_8_safe(&p, end, cap_auths[i].readable, bad); 4343 ceph_decode_8_safe(&p, end, cap_auths[i].writeable, bad); 4344 doutc(cl, "uid %lld, num_gids %u, path %s, fs_name %s, root_squash %d, readable %d, writeable %d\n", 4345 cap_auths[i].match.uid, cap_auths[i].match.num_gids, 4346 cap_auths[i].match.path, cap_auths[i].match.fs_name, 4347 cap_auths[i].match.root_squash, 4348 cap_auths[i].readable, cap_auths[i].writeable); 4349 } 4350 } 4351 4352 skip_cap_auths: 4353 mutex_lock(&mdsc->mutex); 4354 if (op == CEPH_SESSION_OPEN) { 4355 if (mdsc->s_cap_auths) { 4356 for (i = 0; i < mdsc->s_cap_auths_num; i++) { 4357 kfree(mdsc->s_cap_auths[i].match.gids); 4358 kfree(mdsc->s_cap_auths[i].match.path); 4359 kfree(mdsc->s_cap_auths[i].match.fs_name); 4360 } 4361 kfree(mdsc->s_cap_auths); 4362 } 4363 mdsc->s_cap_auths_num = cap_auths_num; 4364 mdsc->s_cap_auths = cap_auths; 4365 4366 session->s_features = features; 4367 if 
(test_bit(CEPHFS_FEATURE_METRIC_COLLECT, 4368 &session->s_features)) 4369 ceph_metric_bind_session(mdsc, session); 4370 } 4371 if (op == CEPH_SESSION_CLOSE) { 4372 ceph_get_mds_session(session); 4373 __unregister_session(mdsc, session); 4374 } 4375 /* FIXME: this ttl calculation is generous */ 4376 session->s_ttl = jiffies + HZ*mdsc->mdsmap->m_session_autoclose; 4377 mutex_unlock(&mdsc->mutex); 4378 4379 mutex_lock(&session->s_mutex); 4380 4381 doutc(cl, "mds%d %s %p state %s seq %llu\n", mds, 4382 ceph_session_op_name(op), session, 4383 ceph_session_state_name(session->s_state), seq); 4384 4385 if (session->s_state == CEPH_MDS_SESSION_HUNG) { 4386 session->s_state = CEPH_MDS_SESSION_OPEN; 4387 pr_info_client(cl, "mds%d came back\n", session->s_mds); 4388 } 4389 4390 switch (op) { 4391 case CEPH_SESSION_OPEN: 4392 if (session->s_state == CEPH_MDS_SESSION_RECONNECTING) 4393 pr_info_client(cl, "mds%d reconnect success\n", 4394 session->s_mds); 4395 4396 if (test_bit(CEPHFS_FEATURE_SUBVOLUME_METRICS, 4397 &session->s_features)) 4398 ceph_subvolume_metrics_enable(&mdsc->subvol_metrics, true); 4399 else 4400 ceph_subvolume_metrics_enable(&mdsc->subvol_metrics, false); 4401 if (session->s_state == CEPH_MDS_SESSION_OPEN) { 4402 pr_notice_client(cl, "mds%d is already opened\n", 4403 session->s_mds); 4404 } else { 4405 session->s_state = CEPH_MDS_SESSION_OPEN; 4406 renewed_caps(mdsc, session, 0); 4407 if (test_bit(CEPHFS_FEATURE_METRIC_COLLECT, 4408 &session->s_features)) 4409 metric_schedule_delayed(&mdsc->metric); 4410 } 4411 4412 /* 4413 * The connection maybe broken and the session in client 4414 * side has been reinitialized, need to update the seq 4415 * anyway. 
4416 */ 4417 if (!session->s_seq && seq) 4418 session->s_seq = seq; 4419 4420 wake = 1; 4421 if (mdsc->stopping) 4422 __close_session(mdsc, session); 4423 break; 4424 4425 case CEPH_SESSION_RENEWCAPS: 4426 if (session->s_renew_seq == seq) 4427 renewed_caps(mdsc, session, 1); 4428 break; 4429 4430 case CEPH_SESSION_CLOSE: 4431 if (session->s_state == CEPH_MDS_SESSION_RECONNECTING) 4432 pr_info_client(cl, "mds%d reconnect denied\n", 4433 session->s_mds); 4434 session->s_state = CEPH_MDS_SESSION_CLOSED; 4435 cleanup_session_requests(mdsc, session); 4436 remove_session_caps(session); 4437 wake = 2; /* for good measure */ 4438 wake_up_all(&mdsc->session_close_wq); 4439 break; 4440 4441 case CEPH_SESSION_STALE: 4442 pr_info_client(cl, "mds%d caps went stale, renewing\n", 4443 session->s_mds); 4444 atomic_inc(&session->s_cap_gen); 4445 session->s_cap_ttl = jiffies - 1; 4446 send_renew_caps(mdsc, session); 4447 break; 4448 4449 case CEPH_SESSION_RECALL_STATE: 4450 ceph_trim_caps(mdsc, session, le32_to_cpu(h->max_caps)); 4451 break; 4452 4453 case CEPH_SESSION_FLUSHMSG: 4454 /* flush cap releases */ 4455 spin_lock(&session->s_cap_lock); 4456 if (session->s_num_cap_releases) 4457 ceph_flush_session_cap_releases(mdsc, session); 4458 spin_unlock(&session->s_cap_lock); 4459 4460 send_flushmsg_ack(mdsc, session, seq); 4461 break; 4462 4463 case CEPH_SESSION_FORCE_RO: 4464 doutc(cl, "force_session_readonly %p\n", session); 4465 spin_lock(&session->s_cap_lock); 4466 session->s_readonly = true; 4467 spin_unlock(&session->s_cap_lock); 4468 wake_up_session_caps(session, FORCE_RO); 4469 break; 4470 4471 case CEPH_SESSION_REJECT: 4472 WARN_ON(session->s_state != CEPH_MDS_SESSION_OPENING); 4473 pr_info_client(cl, "mds%d rejected session\n", 4474 session->s_mds); 4475 session->s_state = CEPH_MDS_SESSION_REJECTED; 4476 cleanup_session_requests(mdsc, session); 4477 remove_session_caps(session); 4478 if (blocklisted) 4479 mdsc->fsc->blocklisted = true; 4480 wake = 2; /* for good measure */ 
4481 break; 4482 4483 default: 4484 pr_err_client(cl, "bad op %d mds%d\n", op, mds); 4485 WARN_ON(1); 4486 } 4487 4488 mutex_unlock(&session->s_mutex); 4489 if (wake) { 4490 mutex_lock(&mdsc->mutex); 4491 __wake_requests(mdsc, &session->s_waiting); 4492 if (wake == 2) 4493 kick_requests(mdsc, mds); 4494 mutex_unlock(&mdsc->mutex); 4495 } 4496 if (op == CEPH_SESSION_CLOSE) 4497 ceph_put_mds_session(session); 4498 return; 4499 4500 bad: 4501 pr_err_client(cl, "corrupt message mds%d len %d\n", mds, 4502 (int)msg->front.iov_len); 4503 ceph_msg_dump(msg); 4504 fail: 4505 for (i = 0; i < cap_auths_num; i++) { 4506 kfree(cap_auths[i].match.gids); 4507 kfree(cap_auths[i].match.path); 4508 kfree(cap_auths[i].match.fs_name); 4509 } 4510 kfree(cap_auths); 4511 return; 4512 } 4513 4514 void ceph_mdsc_release_dir_caps(struct ceph_mds_request *req) 4515 { 4516 struct ceph_client *cl = req->r_mdsc->fsc->client; 4517 int dcaps; 4518 4519 dcaps = xchg(&req->r_dir_caps, 0); 4520 if (dcaps) { 4521 doutc(cl, "releasing r_dir_caps=%s\n", ceph_cap_string(dcaps)); 4522 ceph_put_cap_refs(ceph_inode(req->r_parent), dcaps); 4523 } 4524 } 4525 4526 void ceph_mdsc_release_dir_caps_async(struct ceph_mds_request *req) 4527 { 4528 struct ceph_client *cl = req->r_mdsc->fsc->client; 4529 int dcaps; 4530 4531 dcaps = xchg(&req->r_dir_caps, 0); 4532 if (dcaps) { 4533 doutc(cl, "releasing r_dir_caps=%s\n", ceph_cap_string(dcaps)); 4534 ceph_put_cap_refs_async(ceph_inode(req->r_parent), dcaps); 4535 } 4536 } 4537 4538 /* 4539 * called under session->mutex. 
4540 */ 4541 static void replay_unsafe_requests(struct ceph_mds_client *mdsc, 4542 struct ceph_mds_session *session) 4543 { 4544 struct ceph_mds_request *req, *nreq; 4545 struct rb_node *p; 4546 4547 doutc(mdsc->fsc->client, "mds%d\n", session->s_mds); 4548 4549 mutex_lock(&mdsc->mutex); 4550 list_for_each_entry_safe(req, nreq, &session->s_unsafe, r_unsafe_item) 4551 __send_request(session, req, true); 4552 4553 /* 4554 * also re-send old requests when MDS enters reconnect stage. So that MDS 4555 * can process completed request in clientreplay stage. 4556 */ 4557 p = rb_first(&mdsc->request_tree); 4558 while (p) { 4559 req = rb_entry(p, struct ceph_mds_request, r_node); 4560 p = rb_next(p); 4561 if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) 4562 continue; 4563 if (req->r_attempts == 0) 4564 continue; /* only old requests */ 4565 if (!req->r_session) 4566 continue; 4567 if (req->r_session->s_mds != session->s_mds) 4568 continue; 4569 4570 ceph_mdsc_release_dir_caps_async(req); 4571 4572 __send_request(session, req, true); 4573 } 4574 mutex_unlock(&mdsc->mutex); 4575 } 4576 4577 static int send_reconnect_partial(struct ceph_reconnect_state *recon_state) 4578 { 4579 struct ceph_msg *reply; 4580 struct ceph_pagelist *_pagelist; 4581 struct page *page; 4582 __le32 *addr; 4583 int err = -ENOMEM; 4584 4585 if (!recon_state->allow_multi) 4586 return -ENOSPC; 4587 4588 /* can't handle message that contains both caps and realm */ 4589 BUG_ON(!recon_state->nr_caps == !recon_state->nr_realms); 4590 4591 /* pre-allocate new pagelist */ 4592 _pagelist = ceph_pagelist_alloc(GFP_NOFS); 4593 if (!_pagelist) 4594 return -ENOMEM; 4595 4596 reply = ceph_msg_new2(CEPH_MSG_CLIENT_RECONNECT, 0, 1, GFP_NOFS, false); 4597 if (!reply) 4598 goto fail_msg; 4599 4600 /* placeholder for nr_caps */ 4601 err = ceph_pagelist_encode_32(_pagelist, 0); 4602 if (err < 0) 4603 goto fail; 4604 4605 if (recon_state->nr_caps) { 4606 /* currently encoding caps */ 4607 err = 
ceph_pagelist_encode_32(recon_state->pagelist, 0); 4608 if (err) 4609 goto fail; 4610 } else { 4611 /* placeholder for nr_realms (currently encoding relams) */ 4612 err = ceph_pagelist_encode_32(_pagelist, 0); 4613 if (err < 0) 4614 goto fail; 4615 } 4616 4617 err = ceph_pagelist_encode_8(recon_state->pagelist, 1); 4618 if (err) 4619 goto fail; 4620 4621 page = list_first_entry(&recon_state->pagelist->head, struct page, lru); 4622 addr = kmap_atomic(page); 4623 if (recon_state->nr_caps) { 4624 /* currently encoding caps */ 4625 *addr = cpu_to_le32(recon_state->nr_caps); 4626 } else { 4627 /* currently encoding relams */ 4628 *(addr + 1) = cpu_to_le32(recon_state->nr_realms); 4629 } 4630 kunmap_atomic(addr); 4631 4632 reply->hdr.version = cpu_to_le16(5); 4633 reply->hdr.compat_version = cpu_to_le16(4); 4634 4635 reply->hdr.data_len = cpu_to_le32(recon_state->pagelist->length); 4636 ceph_msg_data_add_pagelist(reply, recon_state->pagelist); 4637 4638 ceph_con_send(&recon_state->session->s_con, reply); 4639 ceph_pagelist_release(recon_state->pagelist); 4640 4641 recon_state->pagelist = _pagelist; 4642 recon_state->nr_caps = 0; 4643 recon_state->nr_realms = 0; 4644 recon_state->msg_version = 5; 4645 return 0; 4646 fail: 4647 ceph_msg_put(reply); 4648 fail_msg: 4649 ceph_pagelist_release(_pagelist); 4650 return err; 4651 } 4652 4653 static struct dentry* d_find_primary(struct inode *inode) 4654 { 4655 struct dentry *alias, *dn = NULL; 4656 4657 if (hlist_empty(&inode->i_dentry)) 4658 return NULL; 4659 4660 spin_lock(&inode->i_lock); 4661 if (hlist_empty(&inode->i_dentry)) 4662 goto out_unlock; 4663 4664 if (S_ISDIR(inode->i_mode)) { 4665 alias = hlist_entry(inode->i_dentry.first, struct dentry, d_alias); 4666 if (!IS_ROOT(alias)) 4667 dn = dget(alias); 4668 goto out_unlock; 4669 } 4670 4671 for_each_alias(alias, inode) { 4672 spin_lock(&alias->d_lock); 4673 if (!d_unhashed(alias) && 4674 (ceph_dentry(alias)->flags & CEPH_DENTRY_PRIMARY_LINK)) { 4675 dn = 
dget_dlock(alias); 4676 } 4677 spin_unlock(&alias->d_lock); 4678 if (dn) 4679 break; 4680 } 4681 out_unlock: 4682 spin_unlock(&inode->i_lock); 4683 return dn; 4684 } 4685 4686 /* 4687 * Encode information about a cap for a reconnect with the MDS. 4688 */ 4689 static int reconnect_caps_cb(struct inode *inode, int mds, void *arg) 4690 { 4691 struct ceph_mds_client *mdsc = ceph_sb_to_mdsc(inode->i_sb); 4692 struct ceph_client *cl = ceph_inode_to_client(inode); 4693 union { 4694 struct ceph_mds_cap_reconnect v2; 4695 struct ceph_mds_cap_reconnect_v1 v1; 4696 } rec; 4697 struct ceph_inode_info *ci = ceph_inode(inode); 4698 struct ceph_reconnect_state *recon_state = arg; 4699 struct ceph_pagelist *pagelist = recon_state->pagelist; 4700 struct dentry *dentry; 4701 struct ceph_cap *cap; 4702 struct ceph_path_info path_info = {0}; 4703 int err; 4704 u64 snap_follows; 4705 4706 dentry = d_find_primary(inode); 4707 if (dentry) { 4708 /* set pathbase to parent dir when msg_version >= 2 */ 4709 char *path = ceph_mdsc_build_path(mdsc, dentry, &path_info, 4710 recon_state->msg_version >= 2); 4711 dput(dentry); 4712 if (IS_ERR(path)) { 4713 err = PTR_ERR(path); 4714 goto out_err; 4715 } 4716 } 4717 4718 spin_lock(&ci->i_ceph_lock); 4719 cap = __get_cap_for_mds(ci, mds); 4720 if (!cap) { 4721 spin_unlock(&ci->i_ceph_lock); 4722 err = 0; 4723 goto out_err; 4724 } 4725 doutc(cl, " adding %p ino %llx.%llx cap %p %lld %s\n", inode, 4726 ceph_vinop(inode), cap, cap->cap_id, 4727 ceph_cap_string(cap->issued)); 4728 4729 cap->seq = 0; /* reset cap seq */ 4730 cap->issue_seq = 0; /* and issue_seq */ 4731 cap->mseq = 0; /* and migrate_seq */ 4732 cap->cap_gen = atomic_read(&cap->session->s_cap_gen); 4733 4734 /* These are lost when the session goes away */ 4735 if (S_ISDIR(inode->i_mode)) { 4736 if (cap->issued & CEPH_CAP_DIR_CREATE) { 4737 ceph_put_string(rcu_dereference_raw(ci->i_cached_layout.pool_ns)); 4738 memset(&ci->i_cached_layout, 0, sizeof(ci->i_cached_layout)); 4739 } 4740 
cap->issued &= ~CEPH_CAP_ANY_DIR_OPS; 4741 } 4742 4743 if (recon_state->msg_version >= 2) { 4744 rec.v2.cap_id = cpu_to_le64(cap->cap_id); 4745 rec.v2.wanted = cpu_to_le32(__ceph_caps_wanted(ci)); 4746 rec.v2.issued = cpu_to_le32(cap->issued); 4747 rec.v2.snaprealm = cpu_to_le64(ci->i_snap_realm->ino); 4748 rec.v2.pathbase = cpu_to_le64(path_info.vino.ino); 4749 rec.v2.flock_len = (__force __le32) 4750 ((ci->i_ceph_flags & CEPH_I_ERROR_FILELOCK) ? 0 : 1); 4751 } else { 4752 struct timespec64 ts; 4753 4754 rec.v1.cap_id = cpu_to_le64(cap->cap_id); 4755 rec.v1.wanted = cpu_to_le32(__ceph_caps_wanted(ci)); 4756 rec.v1.issued = cpu_to_le32(cap->issued); 4757 rec.v1.size = cpu_to_le64(i_size_read(inode)); 4758 ts = inode_get_mtime(inode); 4759 ceph_encode_timespec64(&rec.v1.mtime, &ts); 4760 ts = inode_get_atime(inode); 4761 ceph_encode_timespec64(&rec.v1.atime, &ts); 4762 rec.v1.snaprealm = cpu_to_le64(ci->i_snap_realm->ino); 4763 rec.v1.pathbase = cpu_to_le64(path_info.vino.ino); 4764 } 4765 4766 if (list_empty(&ci->i_cap_snaps)) { 4767 snap_follows = ci->i_head_snapc ? 
ci->i_head_snapc->seq : 0; 4768 } else { 4769 struct ceph_cap_snap *capsnap = 4770 list_first_entry(&ci->i_cap_snaps, 4771 struct ceph_cap_snap, ci_item); 4772 snap_follows = capsnap->follows; 4773 } 4774 spin_unlock(&ci->i_ceph_lock); 4775 4776 if (recon_state->msg_version >= 2) { 4777 int num_fcntl_locks, num_flock_locks; 4778 struct ceph_filelock *flocks = NULL; 4779 size_t struct_len, total_len = sizeof(u64); 4780 u8 struct_v = 0; 4781 4782 encode_again: 4783 if (rec.v2.flock_len) { 4784 ceph_count_locks(inode, &num_fcntl_locks, &num_flock_locks); 4785 } else { 4786 num_fcntl_locks = 0; 4787 num_flock_locks = 0; 4788 } 4789 if (num_fcntl_locks + num_flock_locks > 0) { 4790 flocks = kmalloc_objs(struct ceph_filelock, 4791 num_fcntl_locks + num_flock_locks, 4792 GFP_NOFS); 4793 if (!flocks) { 4794 err = -ENOMEM; 4795 goto out_err; 4796 } 4797 err = ceph_encode_locks_to_buffer(inode, flocks, 4798 num_fcntl_locks, 4799 num_flock_locks); 4800 if (err) { 4801 kfree(flocks); 4802 flocks = NULL; 4803 if (err == -ENOSPC) 4804 goto encode_again; 4805 goto out_err; 4806 } 4807 } else { 4808 kfree(flocks); 4809 flocks = NULL; 4810 } 4811 4812 if (recon_state->msg_version >= 3) { 4813 /* version, compat_version and struct_len */ 4814 total_len += 2 * sizeof(u8) + sizeof(u32); 4815 struct_v = 2; 4816 } 4817 /* 4818 * number of encoded locks is stable, so copy to pagelist 4819 */ 4820 struct_len = 2 * sizeof(u32) + 4821 (num_fcntl_locks + num_flock_locks) * 4822 sizeof(struct ceph_filelock); 4823 rec.v2.flock_len = cpu_to_le32(struct_len); 4824 4825 struct_len += sizeof(u32) + path_info.pathlen + sizeof(rec.v2); 4826 4827 if (struct_v >= 2) 4828 struct_len += sizeof(u64); /* snap_follows */ 4829 4830 total_len += struct_len; 4831 4832 if (pagelist->length + total_len > RECONNECT_MAX_SIZE) { 4833 err = send_reconnect_partial(recon_state); 4834 if (err) 4835 goto out_freeflocks; 4836 pagelist = recon_state->pagelist; 4837 } 4838 4839 err = ceph_pagelist_reserve(pagelist, 
total_len); 4840 if (err) 4841 goto out_freeflocks; 4842 4843 ceph_pagelist_encode_64(pagelist, ceph_ino(inode)); 4844 if (recon_state->msg_version >= 3) { 4845 ceph_pagelist_encode_8(pagelist, struct_v); 4846 ceph_pagelist_encode_8(pagelist, 1); 4847 ceph_pagelist_encode_32(pagelist, struct_len); 4848 } 4849 ceph_pagelist_encode_string(pagelist, (char *)path_info.path, path_info.pathlen); 4850 ceph_pagelist_append(pagelist, &rec, sizeof(rec.v2)); 4851 ceph_locks_to_pagelist(flocks, pagelist, 4852 num_fcntl_locks, num_flock_locks); 4853 if (struct_v >= 2) 4854 ceph_pagelist_encode_64(pagelist, snap_follows); 4855 out_freeflocks: 4856 kfree(flocks); 4857 } else { 4858 err = ceph_pagelist_reserve(pagelist, 4859 sizeof(u64) + sizeof(u32) + 4860 path_info.pathlen + sizeof(rec.v1)); 4861 if (err) 4862 goto out_err; 4863 4864 ceph_pagelist_encode_64(pagelist, ceph_ino(inode)); 4865 ceph_pagelist_encode_string(pagelist, (char *)path_info.path, path_info.pathlen); 4866 ceph_pagelist_append(pagelist, &rec, sizeof(rec.v1)); 4867 } 4868 4869 out_err: 4870 ceph_mdsc_free_path_info(&path_info); 4871 if (!err) 4872 recon_state->nr_caps++; 4873 return err; 4874 } 4875 4876 static int encode_snap_realms(struct ceph_mds_client *mdsc, 4877 struct ceph_reconnect_state *recon_state) 4878 { 4879 struct rb_node *p; 4880 struct ceph_pagelist *pagelist = recon_state->pagelist; 4881 struct ceph_client *cl = mdsc->fsc->client; 4882 int err = 0; 4883 4884 if (recon_state->msg_version >= 4) { 4885 err = ceph_pagelist_encode_32(pagelist, mdsc->num_snap_realms); 4886 if (err < 0) 4887 goto fail; 4888 } 4889 4890 /* 4891 * snaprealms. we provide mds with the ino, seq (version), and 4892 * parent for all of our realms. If the mds has any newer info, 4893 * it will tell us. 
4894 */ 4895 for (p = rb_first(&mdsc->snap_realms); p; p = rb_next(p)) { 4896 struct ceph_snap_realm *realm = 4897 rb_entry(p, struct ceph_snap_realm, node); 4898 struct ceph_mds_snaprealm_reconnect sr_rec; 4899 4900 if (recon_state->msg_version >= 4) { 4901 size_t need = sizeof(u8) * 2 + sizeof(u32) + 4902 sizeof(sr_rec); 4903 4904 if (pagelist->length + need > RECONNECT_MAX_SIZE) { 4905 err = send_reconnect_partial(recon_state); 4906 if (err) 4907 goto fail; 4908 pagelist = recon_state->pagelist; 4909 } 4910 4911 err = ceph_pagelist_reserve(pagelist, need); 4912 if (err) 4913 goto fail; 4914 4915 ceph_pagelist_encode_8(pagelist, 1); 4916 ceph_pagelist_encode_8(pagelist, 1); 4917 ceph_pagelist_encode_32(pagelist, sizeof(sr_rec)); 4918 } 4919 4920 doutc(cl, " adding snap realm %llx seq %lld parent %llx\n", 4921 realm->ino, realm->seq, realm->parent_ino); 4922 sr_rec.ino = cpu_to_le64(realm->ino); 4923 sr_rec.seq = cpu_to_le64(realm->seq); 4924 sr_rec.parent = cpu_to_le64(realm->parent_ino); 4925 4926 err = ceph_pagelist_append(pagelist, &sr_rec, sizeof(sr_rec)); 4927 if (err) 4928 goto fail; 4929 4930 recon_state->nr_realms++; 4931 } 4932 fail: 4933 return err; 4934 } 4935 4936 4937 /* 4938 * If an MDS fails and recovers, clients need to reconnect in order to 4939 * reestablish shared state. This includes all caps issued through 4940 * this session _and_ the snap_realm hierarchy. Because it's not 4941 * clear which snap realms the mds cares about, we send everything we 4942 * know about.. that ensures we'll then get any new info the 4943 * recovering MDS might have. 4944 * 4945 * This is a relatively heavyweight operation, but it's rare. 
/*
 * If an MDS fails and recovers, clients need to reconnect in order to
 * reestablish shared state.  This includes all caps issued through
 * this session _and_ the snap_realm hierarchy.  Because it's not
 * clear which snap realms the mds cares about, we send everything we
 * know about; that ensures we'll then get any new info the
 * recovering MDS might have.
 *
 * This is a relatively heavyweight operation, but it's rare.
 *
 * Outline:
 *   - allocate the pagelist and the CLIENT_RECONNECT message;
 *   - under session->s_mutex: mark the session RECONNECTING, reset its
 *     seq, bump s_cap_gen, discard queued cap releases, reopen the
 *     connection and replay unsafe requests;
 *   - under mdsc->snap_rwsem (read): encode every cap of the session
 *     (reconnect_caps_cb) followed by the snap realms, possibly split
 *     over several messages via send_reconnect_partial();
 *   - patch the count placeholder(s), send the message and wake the
 *     requests waiting on the session.
 *
 * There is no return value; on failure the error is logged and whatever
 * was acquired up to that point is released.
 */
static void send_mds_reconnect(struct ceph_mds_client *mdsc,
			       struct ceph_mds_session *session)
{
	struct ceph_client *cl = mdsc->fsc->client;
	struct ceph_msg *reply;
	int mds = session->s_mds;
	/* reported as-is if one of the two allocations below fails */
	int err = -ENOMEM;
	struct ceph_reconnect_state recon_state = {
		.session = session,
	};
	/* cap releases detached from the session, freed outside s_cap_lock */
	LIST_HEAD(dispose);

	pr_info_client(cl, "mds%d reconnect start\n", mds);

	recon_state.pagelist = ceph_pagelist_alloc(GFP_NOFS);
	if (!recon_state.pagelist)
		goto fail_nopagelist;

	reply = ceph_msg_new2(CEPH_MSG_CLIENT_RECONNECT, 0, 1, GFP_NOFS, false);
	if (!reply)
		goto fail_nomsg;

	/* NOTE(review): presumably delegated inos do not survive the old session -- confirm */
	xa_destroy(&session->s_delegated_inos);

	mutex_lock(&session->s_mutex);
	session->s_state = CEPH_MDS_SESSION_RECONNECTING;
	session->s_seq = 0;

	doutc(cl, "session %p state %s\n", session,
	      ceph_session_state_name(session->s_state));

	atomic_inc(&session->s_cap_gen);

	spin_lock(&session->s_cap_lock);
	/* don't know if session is readonly */
	session->s_readonly = 0;
	/*
	 * notify __ceph_remove_cap() that we are composing cap reconnect.
	 * If a cap gets released before being added to the cap reconnect,
	 * __ceph_remove_cap() should skip queuing cap release.
	 */
	session->s_cap_reconnect = 1;
	/* drop old cap expires; we're about to reestablish that state */
	detach_cap_releases(session, &dispose);
	spin_unlock(&session->s_cap_lock);
	dispose_cap_releases(mdsc, &dispose);

	/* trim unused caps to reduce MDS's cache rejoin time */
	if (mdsc->fsc->sb->s_root)
		shrink_dcache_parent(mdsc->fsc->sb->s_root);

	/* restart the connection against the address in the current mdsmap */
	ceph_con_close(&session->s_con);
	ceph_con_open(&session->s_con,
		      CEPH_ENTITY_TYPE_MDS, mds,
		      ceph_mdsmap_get_addr(mdsc->mdsmap, mds));

	/* replay unsafe requests */
	replay_unsafe_requests(mdsc, session);

	ceph_early_kick_flushing_caps(mdsc, session);

	/* held for read until the message is sent or the fail path runs */
	down_read(&mdsc->snap_rwsem);

	/* placeholder for nr_caps */
	err = ceph_pagelist_encode_32(recon_state.pagelist, 0);
	if (err)
		goto fail_clear_cap_reconnect;

	/*
	 * Pick the encoding version.  Only a session advertising
	 * MULTI_RECONNECT may split the reconnect over several messages.
	 */
	if (test_bit(CEPHFS_FEATURE_MULTI_RECONNECT, &session->s_features)) {
		recon_state.msg_version = 3;
		recon_state.allow_multi = true;
	} else if (session->s_con.peer_features & CEPH_FEATURE_MDSENC) {
		recon_state.msg_version = 3;
	} else {
		recon_state.msg_version = 2;
	}
	/* traverse this session's caps */
	err = ceph_iterate_session_caps(session, reconnect_caps_cb, &recon_state);

	/* cap encoding is over, whether it succeeded or not */
	spin_lock(&session->s_cap_lock);
	session->s_cap_reconnect = 0;
	spin_unlock(&session->s_cap_lock);

	if (err < 0)
		goto fail;

	/* check if all realms can be encoded into current message */
	if (mdsc->num_snap_realms) {
		size_t total_len =
			recon_state.pagelist->length +
			mdsc->num_snap_realms *
			sizeof(struct ceph_mds_snaprealm_reconnect);
		if (recon_state.msg_version >= 4) {
			/* number of realms */
			total_len += sizeof(u32);
			/* version, compat_version and struct_len */
			total_len += mdsc->num_snap_realms *
				     (2 * sizeof(u8) + sizeof(u32));
		}
		if (total_len > RECONNECT_MAX_SIZE) {
			if (!recon_state.allow_multi) {
				err = -ENOSPC;
				goto fail;
			}
			/* flush the caps encoded so far as a message of their own */
			if (recon_state.nr_caps) {
				err = send_reconnect_partial(&recon_state);
				if (err)
					goto fail;
			}
			recon_state.msg_version = 5;
		}
	}

	err = encode_snap_realms(mdsc, &recon_state);
	if (err < 0)
		goto fail;

	/*
	 * Version 5 messages end with a one-byte flag; 0 is written here,
	 * whereas send_reconnect_partial() writes 1 on the messages it sends.
	 */
	if (recon_state.msg_version >= 5) {
		err = ceph_pagelist_encode_8(recon_state.pagelist, 0);
		if (err < 0)
			goto fail;
	}

	/*
	 * Fill in the placeholder(s) at the very start of the pagelist:
	 * the first 32-bit word is nr_caps, the second is nr_realms.
	 */
	if (recon_state.nr_caps || recon_state.nr_realms) {
		struct page *page =
			list_first_entry(&recon_state.pagelist->head,
					struct page, lru);
		__le32 *addr = kmap_atomic(page);
		if (recon_state.nr_caps) {
			WARN_ON(recon_state.nr_realms != mdsc->num_snap_realms);
			*addr = cpu_to_le32(recon_state.nr_caps);
		} else if (recon_state.msg_version >= 4) {
			*(addr + 1) = cpu_to_le32(recon_state.nr_realms);
		}
		kunmap_atomic(addr);
	}

	reply->hdr.version = cpu_to_le16(recon_state.msg_version);
	if (recon_state.msg_version >= 4)
		reply->hdr.compat_version = cpu_to_le16(4);

	reply->hdr.data_len = cpu_to_le32(recon_state.pagelist->length);
	ceph_msg_data_add_pagelist(reply, recon_state.pagelist);

	ceph_con_send(&session->s_con, reply);

	mutex_unlock(&session->s_mutex);

	/* requests parked on this session may proceed now */
	mutex_lock(&mdsc->mutex);
	__wake_requests(mdsc, &session->s_waiting);
	mutex_unlock(&mdsc->mutex);

	up_read(&mdsc->snap_rwsem);
	/* NOTE(review): presumably the message holds its own pagelist reference -- confirm */
	ceph_pagelist_release(recon_state.pagelist);
	return;

	/*
	 * Unwind in reverse order of acquisition.  Each label is entered
	 * only once everything released below it has been set up.
	 */
fail_clear_cap_reconnect:
	/* reached before the cap iteration, so the flag is still set */
	spin_lock(&session->s_cap_lock);
	session->s_cap_reconnect = 0;
	spin_unlock(&session->s_cap_lock);
fail:
	/* here both snap_rwsem (read) and s_mutex are held */
	ceph_msg_put(reply);
	up_read(&mdsc->snap_rwsem);
	mutex_unlock(&session->s_mutex);
fail_nomsg:
	ceph_pagelist_release(recon_state.pagelist);
fail_nopagelist:
	pr_err_client(cl, "error %d preparing reconnect for mds%d\n",
		      err, mds);
	return;
}
" (laggy)" : "", 5157 ceph_session_state_name(s->s_state)); 5158 5159 if (i >= newmap->possible_max_rank) { 5160 /* force close session for stopped mds */ 5161 ceph_get_mds_session(s); 5162 __unregister_session(mdsc, s); 5163 __wake_requests(mdsc, &s->s_waiting); 5164 mutex_unlock(&mdsc->mutex); 5165 5166 mutex_lock(&s->s_mutex); 5167 cleanup_session_requests(mdsc, s); 5168 remove_session_caps(s); 5169 mutex_unlock(&s->s_mutex); 5170 5171 ceph_put_mds_session(s); 5172 5173 mutex_lock(&mdsc->mutex); 5174 kick_requests(mdsc, i); 5175 continue; 5176 } 5177 5178 if (memcmp(ceph_mdsmap_get_addr(oldmap, i), 5179 ceph_mdsmap_get_addr(newmap, i), 5180 sizeof(struct ceph_entity_addr))) { 5181 /* just close it */ 5182 mutex_unlock(&mdsc->mutex); 5183 mutex_lock(&s->s_mutex); 5184 mutex_lock(&mdsc->mutex); 5185 ceph_con_close(&s->s_con); 5186 mutex_unlock(&s->s_mutex); 5187 s->s_state = CEPH_MDS_SESSION_RESTARTING; 5188 } else if (oldstate == newstate) { 5189 continue; /* nothing new with this mds */ 5190 } 5191 5192 /* 5193 * send reconnect? 5194 */ 5195 if (s->s_state == CEPH_MDS_SESSION_RESTARTING && 5196 newstate >= CEPH_MDS_STATE_RECONNECT) { 5197 mutex_unlock(&mdsc->mutex); 5198 clear_bit(i, targets); 5199 send_mds_reconnect(mdsc, s); 5200 mutex_lock(&mdsc->mutex); 5201 } 5202 5203 /* 5204 * kick request on any mds that has gone active. 5205 */ 5206 if (oldstate < CEPH_MDS_STATE_ACTIVE && 5207 newstate >= CEPH_MDS_STATE_ACTIVE) { 5208 if (oldstate != CEPH_MDS_STATE_CREATING && 5209 oldstate != CEPH_MDS_STATE_STARTING) 5210 pr_info_client(cl, "mds%d recovery completed\n", 5211 s->s_mds); 5212 kick_requests(mdsc, i); 5213 mutex_unlock(&mdsc->mutex); 5214 mutex_lock(&s->s_mutex); 5215 mutex_lock(&mdsc->mutex); 5216 ceph_kick_flushing_caps(mdsc, s); 5217 mutex_unlock(&s->s_mutex); 5218 wake_up_session_caps(s, RECONNECT); 5219 } 5220 } 5221 5222 /* 5223 * Only open and reconnect sessions that don't exist yet. 
5224 */ 5225 for (i = 0; i < newmap->possible_max_rank; i++) { 5226 /* 5227 * In case the import MDS is crashed just after 5228 * the EImportStart journal is flushed, so when 5229 * a standby MDS takes over it and is replaying 5230 * the EImportStart journal the new MDS daemon 5231 * will wait the client to reconnect it, but the 5232 * client may never register/open the session yet. 5233 * 5234 * Will try to reconnect that MDS daemon if the 5235 * rank number is in the export targets array and 5236 * is the up:reconnect state. 5237 */ 5238 newstate = ceph_mdsmap_get_state(newmap, i); 5239 if (!test_bit(i, targets) || newstate != CEPH_MDS_STATE_RECONNECT) 5240 continue; 5241 5242 /* 5243 * The session maybe registered and opened by some 5244 * requests which were choosing random MDSes during 5245 * the mdsc->mutex's unlock/lock gap below in rare 5246 * case. But the related MDS daemon will just queue 5247 * that requests and be still waiting for the client's 5248 * reconnection request in up:reconnect state. 
5249 */ 5250 s = __ceph_lookup_mds_session(mdsc, i); 5251 if (likely(!s)) { 5252 s = __open_export_target_session(mdsc, i); 5253 if (IS_ERR(s)) { 5254 err = PTR_ERR(s); 5255 pr_err_client(cl, 5256 "failed to open export target session, err %d\n", 5257 err); 5258 continue; 5259 } 5260 } 5261 doutc(cl, "send reconnect to export target mds.%d\n", i); 5262 mutex_unlock(&mdsc->mutex); 5263 send_mds_reconnect(mdsc, s); 5264 ceph_put_mds_session(s); 5265 mutex_lock(&mdsc->mutex); 5266 } 5267 5268 for (i = 0; i < newmap->possible_max_rank && i < mdsc->max_sessions; i++) { 5269 s = mdsc->sessions[i]; 5270 if (!s) 5271 continue; 5272 if (!ceph_mdsmap_is_laggy(newmap, i)) 5273 continue; 5274 if (s->s_state == CEPH_MDS_SESSION_OPEN || 5275 s->s_state == CEPH_MDS_SESSION_HUNG || 5276 s->s_state == CEPH_MDS_SESSION_CLOSING) { 5277 doutc(cl, " connecting to export targets of laggy mds%d\n", i); 5278 __open_export_target_sessions(mdsc, s); 5279 } 5280 } 5281 } 5282 5283 5284 5285 /* 5286 * leases 5287 */ 5288 5289 /* 5290 * caller must hold session s_mutex, dentry->d_lock 5291 */ 5292 void __ceph_mdsc_drop_dentry_lease(struct dentry *dentry) 5293 { 5294 struct ceph_dentry_info *di = ceph_dentry(dentry); 5295 5296 ceph_put_mds_session(di->lease_session); 5297 di->lease_session = NULL; 5298 } 5299 5300 static void handle_lease(struct ceph_mds_client *mdsc, 5301 struct ceph_mds_session *session, 5302 struct ceph_msg *msg) 5303 { 5304 struct ceph_client *cl = mdsc->fsc->client; 5305 struct super_block *sb = mdsc->fsc->sb; 5306 struct inode *inode; 5307 struct dentry *parent, *dentry; 5308 struct ceph_dentry_info *di; 5309 int mds = session->s_mds; 5310 struct ceph_mds_lease *h = msg->front.iov_base; 5311 u32 seq; 5312 struct ceph_vino vino; 5313 struct qstr dname; 5314 int release = 0; 5315 5316 doutc(cl, "from mds%d\n", mds); 5317 5318 if (!ceph_inc_mds_stopping_blocker(mdsc, session)) 5319 return; 5320 5321 /* decode */ 5322 if (msg->front.iov_len < sizeof(*h) + sizeof(u32)) 5323 
goto bad; 5324 vino.ino = le64_to_cpu(h->ino); 5325 vino.snap = CEPH_NOSNAP; 5326 seq = le32_to_cpu(h->seq); 5327 dname.len = get_unaligned_le32(h + 1); 5328 if (msg->front.iov_len < sizeof(*h) + sizeof(u32) + dname.len) 5329 goto bad; 5330 dname.name = (void *)(h + 1) + sizeof(u32); 5331 5332 /* lookup inode */ 5333 inode = ceph_find_inode(sb, vino); 5334 doutc(cl, "%s, ino %llx %p %.*s\n", ceph_lease_op_name(h->action), 5335 vino.ino, inode, dname.len, dname.name); 5336 5337 mutex_lock(&session->s_mutex); 5338 if (!inode) { 5339 doutc(cl, "no inode %llx\n", vino.ino); 5340 goto release; 5341 } 5342 5343 /* dentry */ 5344 parent = d_find_alias(inode); 5345 if (!parent) { 5346 doutc(cl, "no parent dentry on inode %p\n", inode); 5347 WARN_ON(1); 5348 goto release; /* hrm... */ 5349 } 5350 dname.hash = full_name_hash(parent, dname.name, dname.len); 5351 dentry = d_lookup(parent, &dname); 5352 dput(parent); 5353 if (!dentry) 5354 goto release; 5355 5356 spin_lock(&dentry->d_lock); 5357 di = ceph_dentry(dentry); 5358 switch (h->action) { 5359 case CEPH_MDS_LEASE_REVOKE: 5360 if (di->lease_session == session) { 5361 if (ceph_seq_cmp(di->lease_seq, seq) > 0) 5362 h->seq = cpu_to_le32(di->lease_seq); 5363 __ceph_mdsc_drop_dentry_lease(dentry); 5364 } 5365 release = 1; 5366 break; 5367 5368 case CEPH_MDS_LEASE_RENEW: 5369 if (di->lease_session == session && 5370 di->lease_gen == atomic_read(&session->s_cap_gen) && 5371 di->lease_renew_from && 5372 di->lease_renew_after == 0) { 5373 unsigned long duration = 5374 msecs_to_jiffies(le32_to_cpu(h->duration_ms)); 5375 5376 di->lease_seq = seq; 5377 di->time = di->lease_renew_from + duration; 5378 di->lease_renew_after = di->lease_renew_from + 5379 (duration >> 1); 5380 di->lease_renew_from = 0; 5381 } 5382 break; 5383 } 5384 spin_unlock(&dentry->d_lock); 5385 dput(dentry); 5386 5387 if (!release) 5388 goto out; 5389 5390 release: 5391 /* let's just reuse the same message */ 5392 h->action = CEPH_MDS_LEASE_REVOKE_ACK; 5393 
ceph_msg_get(msg); 5394 ceph_con_send(&session->s_con, msg); 5395 5396 out: 5397 mutex_unlock(&session->s_mutex); 5398 iput(inode); 5399 5400 ceph_dec_mds_stopping_blocker(mdsc); 5401 return; 5402 5403 bad: 5404 ceph_dec_mds_stopping_blocker(mdsc); 5405 5406 pr_err_client(cl, "corrupt lease message\n"); 5407 ceph_msg_dump(msg); 5408 } 5409 5410 void ceph_mdsc_lease_send_msg(struct ceph_mds_session *session, 5411 struct dentry *dentry, char action, 5412 u32 seq) 5413 { 5414 struct ceph_client *cl = session->s_mdsc->fsc->client; 5415 struct ceph_msg *msg; 5416 struct ceph_mds_lease *lease; 5417 struct inode *dir; 5418 int len = sizeof(*lease) + sizeof(u32) + NAME_MAX; 5419 5420 doutc(cl, "identry %p %s to mds%d\n", dentry, ceph_lease_op_name(action), 5421 session->s_mds); 5422 5423 msg = ceph_msg_new(CEPH_MSG_CLIENT_LEASE, len, GFP_NOFS, false); 5424 if (!msg) 5425 return; 5426 lease = msg->front.iov_base; 5427 lease->action = action; 5428 lease->seq = cpu_to_le32(seq); 5429 5430 spin_lock(&dentry->d_lock); 5431 dir = d_inode(dentry->d_parent); 5432 lease->ino = cpu_to_le64(ceph_ino(dir)); 5433 lease->first = lease->last = cpu_to_le64(ceph_snap(dir)); 5434 5435 put_unaligned_le32(dentry->d_name.len, lease + 1); 5436 memcpy((void *)(lease + 1) + 4, 5437 dentry->d_name.name, dentry->d_name.len); 5438 spin_unlock(&dentry->d_lock); 5439 5440 ceph_con_send(&session->s_con, msg); 5441 } 5442 5443 /* 5444 * lock unlock the session, to wait ongoing session activities 5445 */ 5446 static void lock_unlock_session(struct ceph_mds_session *s) 5447 { 5448 mutex_lock(&s->s_mutex); 5449 mutex_unlock(&s->s_mutex); 5450 } 5451 5452 static void maybe_recover_session(struct ceph_mds_client *mdsc) 5453 { 5454 struct ceph_client *cl = mdsc->fsc->client; 5455 struct ceph_fs_client *fsc = mdsc->fsc; 5456 5457 if (!ceph_test_mount_opt(fsc, CLEANRECOVER)) 5458 return; 5459 5460 if (READ_ONCE(fsc->mount_state) != CEPH_MOUNT_MOUNTED) 5461 return; 5462 5463 if (!READ_ONCE(fsc->blocklisted)) 
5464 return; 5465 5466 pr_info_client(cl, "auto reconnect after blocklisted\n"); 5467 ceph_force_reconnect(fsc->sb); 5468 } 5469 5470 bool check_session_state(struct ceph_mds_session *s) 5471 { 5472 struct ceph_client *cl = s->s_mdsc->fsc->client; 5473 5474 switch (s->s_state) { 5475 case CEPH_MDS_SESSION_OPEN: 5476 if (s->s_ttl && time_after(jiffies, s->s_ttl)) { 5477 s->s_state = CEPH_MDS_SESSION_HUNG; 5478 pr_info_client(cl, "mds%d hung\n", s->s_mds); 5479 } 5480 break; 5481 case CEPH_MDS_SESSION_CLOSING: 5482 case CEPH_MDS_SESSION_NEW: 5483 case CEPH_MDS_SESSION_RESTARTING: 5484 case CEPH_MDS_SESSION_CLOSED: 5485 case CEPH_MDS_SESSION_REJECTED: 5486 return false; 5487 } 5488 5489 return true; 5490 } 5491 5492 /* 5493 * If the sequence is incremented while we're waiting on a REQUEST_CLOSE reply, 5494 * then we need to retransmit that request. 5495 */ 5496 void inc_session_sequence(struct ceph_mds_session *s) 5497 { 5498 struct ceph_client *cl = s->s_mdsc->fsc->client; 5499 5500 lockdep_assert_held(&s->s_mutex); 5501 5502 s->s_seq++; 5503 5504 if (s->s_state == CEPH_MDS_SESSION_CLOSING) { 5505 int ret; 5506 5507 doutc(cl, "resending session close request for mds%d\n", s->s_mds); 5508 ret = request_close_session(s); 5509 if (ret < 0) 5510 pr_err_client(cl, "unable to close session to mds%d: %d\n", 5511 s->s_mds, ret); 5512 } 5513 } 5514 5515 /* 5516 * delayed work -- periodically trim expired leases, renew caps with mds. If 5517 * the @delay parameter is set to 0 or if it's more than 5 secs, the default 5518 * workqueue delay value of 5 secs will be used. 
5519 */ 5520 static void schedule_delayed(struct ceph_mds_client *mdsc, unsigned long delay) 5521 { 5522 unsigned long max_delay = HZ * 5; 5523 5524 /* 5 secs default delay */ 5525 if (!delay || (delay > max_delay)) 5526 delay = max_delay; 5527 schedule_delayed_work(&mdsc->delayed_work, 5528 round_jiffies_relative(delay)); 5529 } 5530 5531 static void delayed_work(struct work_struct *work) 5532 { 5533 struct ceph_mds_client *mdsc = 5534 container_of(work, struct ceph_mds_client, delayed_work.work); 5535 unsigned long delay; 5536 int renew_interval; 5537 int renew_caps; 5538 int i; 5539 5540 doutc(mdsc->fsc->client, "mdsc delayed_work\n"); 5541 5542 if (mdsc->stopping >= CEPH_MDSC_STOPPING_FLUSHED) 5543 return; 5544 5545 mutex_lock(&mdsc->mutex); 5546 renew_interval = mdsc->mdsmap->m_session_timeout >> 2; 5547 renew_caps = time_after_eq(jiffies, HZ*renew_interval + 5548 mdsc->last_renew_caps); 5549 if (renew_caps) 5550 mdsc->last_renew_caps = jiffies; 5551 5552 for (i = 0; i < mdsc->max_sessions; i++) { 5553 struct ceph_mds_session *s = __ceph_lookup_mds_session(mdsc, i); 5554 if (!s) 5555 continue; 5556 5557 if (!check_session_state(s)) { 5558 ceph_put_mds_session(s); 5559 continue; 5560 } 5561 mutex_unlock(&mdsc->mutex); 5562 5563 ceph_flush_session_cap_releases(mdsc, s); 5564 5565 mutex_lock(&s->s_mutex); 5566 if (renew_caps) 5567 send_renew_caps(mdsc, s); 5568 else 5569 ceph_con_keepalive(&s->s_con); 5570 if (s->s_state == CEPH_MDS_SESSION_OPEN || 5571 s->s_state == CEPH_MDS_SESSION_HUNG) 5572 ceph_send_cap_releases(mdsc, s); 5573 mutex_unlock(&s->s_mutex); 5574 ceph_put_mds_session(s); 5575 5576 mutex_lock(&mdsc->mutex); 5577 } 5578 mutex_unlock(&mdsc->mutex); 5579 5580 delay = ceph_check_delayed_caps(mdsc); 5581 5582 ceph_queue_cap_reclaim_work(mdsc); 5583 5584 ceph_trim_snapid_map(mdsc); 5585 5586 maybe_recover_session(mdsc); 5587 5588 schedule_delayed(mdsc, delay); 5589 } 5590 5591 int ceph_mdsc_init(struct ceph_fs_client *fsc) 5592 5593 { 5594 struct 
ceph_mds_client *mdsc; 5595 int err; 5596 5597 mdsc = kzalloc_obj(struct ceph_mds_client, GFP_NOFS); 5598 if (!mdsc) 5599 return -ENOMEM; 5600 mdsc->fsc = fsc; 5601 mutex_init(&mdsc->mutex); 5602 mdsc->mdsmap = kzalloc_obj(*mdsc->mdsmap, GFP_NOFS); 5603 if (!mdsc->mdsmap) { 5604 err = -ENOMEM; 5605 goto err_mdsc; 5606 } 5607 5608 init_completion(&mdsc->safe_umount_waiters); 5609 spin_lock_init(&mdsc->stopping_lock); 5610 atomic_set(&mdsc->stopping_blockers, 0); 5611 init_completion(&mdsc->stopping_waiter); 5612 atomic64_set(&mdsc->dirty_folios, 0); 5613 init_waitqueue_head(&mdsc->flush_end_wq); 5614 init_waitqueue_head(&mdsc->session_close_wq); 5615 INIT_LIST_HEAD(&mdsc->waiting_for_map); 5616 mdsc->quotarealms_inodes = RB_ROOT; 5617 mutex_init(&mdsc->quotarealms_inodes_mutex); 5618 init_rwsem(&mdsc->snap_rwsem); 5619 mdsc->snap_realms = RB_ROOT; 5620 INIT_LIST_HEAD(&mdsc->snap_empty); 5621 spin_lock_init(&mdsc->snap_empty_lock); 5622 mdsc->request_tree = RB_ROOT; 5623 INIT_DELAYED_WORK(&mdsc->delayed_work, delayed_work); 5624 mdsc->last_renew_caps = jiffies; 5625 INIT_LIST_HEAD(&mdsc->cap_delay_list); 5626 #ifdef CONFIG_DEBUG_FS 5627 INIT_LIST_HEAD(&mdsc->cap_wait_list); 5628 #endif 5629 spin_lock_init(&mdsc->cap_delay_lock); 5630 INIT_LIST_HEAD(&mdsc->cap_unlink_delay_list); 5631 INIT_LIST_HEAD(&mdsc->snap_flush_list); 5632 spin_lock_init(&mdsc->snap_flush_lock); 5633 mdsc->last_cap_flush_tid = 1; 5634 INIT_LIST_HEAD(&mdsc->cap_flush_list); 5635 INIT_LIST_HEAD(&mdsc->cap_dirty_migrating); 5636 spin_lock_init(&mdsc->cap_dirty_lock); 5637 init_waitqueue_head(&mdsc->cap_flushing_wq); 5638 INIT_WORK(&mdsc->cap_reclaim_work, ceph_cap_reclaim_work); 5639 INIT_WORK(&mdsc->cap_unlink_work, ceph_cap_unlink_work); 5640 err = ceph_metric_init(&mdsc->metric); 5641 if (err) 5642 goto err_mdsmap; 5643 ceph_subvolume_metrics_init(&mdsc->subvol_metrics); 5644 mutex_init(&mdsc->subvol_metrics_last_mutex); 5645 mdsc->subvol_metrics_last = NULL; 5646 mdsc->subvol_metrics_last_nr = 
0; 5647 mdsc->subvol_metrics_sent = 0; 5648 mdsc->subvol_metrics_nonzero_sends = 0; 5649 5650 spin_lock_init(&mdsc->dentry_list_lock); 5651 INIT_LIST_HEAD(&mdsc->dentry_leases); 5652 INIT_LIST_HEAD(&mdsc->dentry_dir_leases); 5653 5654 ceph_caps_init(mdsc); 5655 ceph_adjust_caps_max_min(mdsc, fsc->mount_options); 5656 5657 spin_lock_init(&mdsc->snapid_map_lock); 5658 mdsc->snapid_map_tree = RB_ROOT; 5659 INIT_LIST_HEAD(&mdsc->snapid_map_lru); 5660 5661 init_rwsem(&mdsc->pool_perm_rwsem); 5662 mdsc->pool_perm_tree = RB_ROOT; 5663 5664 strscpy(mdsc->nodename, utsname()->nodename, 5665 sizeof(mdsc->nodename)); 5666 5667 fsc->mdsc = mdsc; 5668 return 0; 5669 5670 err_mdsmap: 5671 kfree(mdsc->mdsmap); 5672 err_mdsc: 5673 kfree(mdsc); 5674 return err; 5675 } 5676 5677 /* 5678 * Wait for safe replies on open mds requests. If we time out, drop 5679 * all requests from the tree to avoid dangling dentry refs. 5680 */ 5681 static void wait_requests(struct ceph_mds_client *mdsc) 5682 { 5683 struct ceph_client *cl = mdsc->fsc->client; 5684 struct ceph_options *opts = mdsc->fsc->client->options; 5685 struct ceph_mds_request *req; 5686 5687 mutex_lock(&mdsc->mutex); 5688 if (__get_oldest_req(mdsc)) { 5689 mutex_unlock(&mdsc->mutex); 5690 5691 doutc(cl, "waiting for requests\n"); 5692 wait_for_completion_timeout(&mdsc->safe_umount_waiters, 5693 ceph_timeout_jiffies(opts->mount_timeout)); 5694 5695 /* tear down remaining requests */ 5696 mutex_lock(&mdsc->mutex); 5697 while ((req = __get_oldest_req(mdsc))) { 5698 doutc(cl, "timed out on tid %llu\n", req->r_tid); 5699 list_del_init(&req->r_wait); 5700 __unregister_request(mdsc, req); 5701 } 5702 } 5703 mutex_unlock(&mdsc->mutex); 5704 doutc(cl, "done\n"); 5705 } 5706 5707 void send_flush_mdlog(struct ceph_mds_session *s) 5708 { 5709 struct ceph_client *cl = s->s_mdsc->fsc->client; 5710 struct ceph_msg *msg; 5711 5712 /* 5713 * Pre-luminous MDS crashes when it sees an unknown session request 5714 */ 5715 if 
(!CEPH_HAVE_FEATURE(s->s_con.peer_features, SERVER_LUMINOUS)) 5716 return; 5717 5718 mutex_lock(&s->s_mutex); 5719 doutc(cl, "request mdlog flush to mds%d (%s)s seq %lld\n", 5720 s->s_mds, ceph_session_state_name(s->s_state), s->s_seq); 5721 msg = ceph_create_session_msg(CEPH_SESSION_REQUEST_FLUSH_MDLOG, 5722 s->s_seq); 5723 if (!msg) { 5724 pr_err_client(cl, "failed to request mdlog flush to mds%d (%s) seq %lld\n", 5725 s->s_mds, ceph_session_state_name(s->s_state), s->s_seq); 5726 } else { 5727 ceph_con_send(&s->s_con, msg); 5728 } 5729 mutex_unlock(&s->s_mutex); 5730 } 5731 5732 static int ceph_mds_auth_match(struct ceph_mds_client *mdsc, 5733 struct ceph_mds_cap_auth *auth, 5734 const struct cred *cred, 5735 char *tpath) 5736 { 5737 u32 caller_uid = from_kuid(&init_user_ns, cred->fsuid); 5738 u32 caller_gid = from_kgid(&init_user_ns, cred->fsgid); 5739 struct ceph_client *cl = mdsc->fsc->client; 5740 const char *fs_name = mdsc->mdsmap->m_fs_name; 5741 const char *spath = mdsc->fsc->mount_options->server_path; 5742 bool gid_matched = false; 5743 u32 gid, tlen, len; 5744 int i, j; 5745 5746 doutc(cl, "fsname check fs_name=%s match.fs_name=%s\n", 5747 fs_name, auth->match.fs_name ? 
auth->match.fs_name : ""); 5748 5749 if (!ceph_namespace_match(auth->match.fs_name, fs_name)) { 5750 /* fsname mismatch, try next one */ 5751 return 0; 5752 } 5753 5754 doutc(cl, "match.uid %lld\n", auth->match.uid); 5755 if (auth->match.uid != MDS_AUTH_UID_ANY) { 5756 if (auth->match.uid != caller_uid) 5757 return 0; 5758 if (auth->match.num_gids) { 5759 for (i = 0; i < auth->match.num_gids; i++) { 5760 if (caller_gid == auth->match.gids[i]) 5761 gid_matched = true; 5762 } 5763 if (!gid_matched && cred->group_info->ngroups) { 5764 for (i = 0; i < cred->group_info->ngroups; i++) { 5765 gid = from_kgid(&init_user_ns, 5766 cred->group_info->gid[i]); 5767 for (j = 0; j < auth->match.num_gids; j++) { 5768 if (gid == auth->match.gids[j]) { 5769 gid_matched = true; 5770 break; 5771 } 5772 } 5773 if (gid_matched) 5774 break; 5775 } 5776 } 5777 if (!gid_matched) 5778 return 0; 5779 } 5780 } 5781 5782 /* path match */ 5783 if (auth->match.path) { 5784 if (!tpath) 5785 return 0; 5786 5787 tlen = strlen(tpath); 5788 len = strlen(auth->match.path); 5789 if (len) { 5790 char *_tpath = tpath; 5791 bool free_tpath = false; 5792 int m, n; 5793 5794 doutc(cl, "server path %s, tpath %s, match.path %s\n", 5795 spath, tpath, auth->match.path); 5796 if (spath && (m = strlen(spath)) != 1) { 5797 /* mount path + '/' + tpath + an extra space */ 5798 n = m + 1 + tlen + 1; 5799 _tpath = kmalloc(n, GFP_NOFS); 5800 if (!_tpath) 5801 return -ENOMEM; 5802 /* remove the leading '/' */ 5803 snprintf(_tpath, n, "%s/%s", spath + 1, tpath); 5804 free_tpath = true; 5805 tlen = strlen(_tpath); 5806 } 5807 5808 /* 5809 * Please note the tailing '/' for match.path has already 5810 * been removed when parsing. 5811 * 5812 * Remove the tailing '/' for the target path. 
5813 */ 5814 while (tlen && _tpath[tlen - 1] == '/') { 5815 _tpath[tlen - 1] = '\0'; 5816 tlen -= 1; 5817 } 5818 doutc(cl, "_tpath %s\n", _tpath); 5819 5820 /* 5821 * In case first == _tpath && tlen == len: 5822 * match.path=/foo --> /foo _path=/foo --> match 5823 * match.path=/foo/ --> /foo _path=/foo --> match 5824 * 5825 * In case first == _tmatch.path && tlen > len: 5826 * match.path=/foo/ --> /foo _path=/foo/ --> match 5827 * match.path=/foo --> /foo _path=/foo/ --> match 5828 * match.path=/foo/ --> /foo _path=/foo/d --> match 5829 * match.path=/foo --> /foo _path=/food --> mismatch 5830 * 5831 * All the other cases --> mismatch 5832 */ 5833 bool path_matched = true; 5834 char *first = strstr(_tpath, auth->match.path); 5835 if (first != _tpath || 5836 (tlen > len && _tpath[len] != '/')) { 5837 path_matched = false; 5838 } 5839 5840 if (free_tpath) 5841 kfree(_tpath); 5842 5843 if (!path_matched) 5844 return 0; 5845 } 5846 } 5847 5848 doutc(cl, "matched\n"); 5849 return 1; 5850 } 5851 5852 int ceph_mds_check_access(struct ceph_mds_client *mdsc, char *tpath, int mask) 5853 { 5854 const struct cred *cred = get_current_cred(); 5855 u32 caller_uid = from_kuid(&init_user_ns, cred->fsuid); 5856 u32 caller_gid = from_kgid(&init_user_ns, cred->fsgid); 5857 struct ceph_mds_cap_auth *rw_perms_s = NULL; 5858 struct ceph_client *cl = mdsc->fsc->client; 5859 bool root_squash_perms = true; 5860 int i, err; 5861 5862 doutc(cl, "tpath '%s', mask %d, caller_uid %d, caller_gid %d\n", 5863 tpath, mask, caller_uid, caller_gid); 5864 5865 for (i = 0; i < mdsc->s_cap_auths_num; i++) { 5866 struct ceph_mds_cap_auth *s = &mdsc->s_cap_auths[i]; 5867 5868 err = ceph_mds_auth_match(mdsc, s, cred, tpath); 5869 if (err < 0) { 5870 put_cred(cred); 5871 return err; 5872 } else if (err > 0) { 5873 /* always follow the last auth caps' permission */ 5874 root_squash_perms = true; 5875 rw_perms_s = NULL; 5876 if ((mask & MAY_WRITE) && s->writeable && 5877 s->match.root_squash && (!caller_uid || 
!caller_gid)) 5878 root_squash_perms = false; 5879 5880 if (((mask & MAY_WRITE) && !s->writeable) || 5881 ((mask & MAY_READ) && !s->readable)) 5882 rw_perms_s = s; 5883 } 5884 } 5885 5886 put_cred(cred); 5887 5888 doutc(cl, "root_squash_perms %d, rw_perms_s %p\n", root_squash_perms, 5889 rw_perms_s); 5890 if (root_squash_perms && rw_perms_s == NULL) { 5891 doutc(cl, "access allowed\n"); 5892 return 0; 5893 } 5894 5895 if (!root_squash_perms) { 5896 doutc(cl, "root_squash is enabled and user(%d %d) isn't allowed to write", 5897 caller_uid, caller_gid); 5898 } 5899 if (rw_perms_s) { 5900 doutc(cl, "mds auth caps readable/writeable %d/%d while request r/w %d/%d", 5901 rw_perms_s->readable, rw_perms_s->writeable, 5902 !!(mask & MAY_READ), !!(mask & MAY_WRITE)); 5903 } 5904 doutc(cl, "access denied\n"); 5905 return -EACCES; 5906 } 5907 5908 /* 5909 * called before mount is ro, and before dentries are torn down. 5910 * (hmm, does this still race with new lookups?) 5911 */ 5912 void ceph_mdsc_pre_umount(struct ceph_mds_client *mdsc) 5913 { 5914 doutc(mdsc->fsc->client, "begin\n"); 5915 mdsc->stopping = CEPH_MDSC_STOPPING_BEGIN; 5916 5917 ceph_mdsc_iterate_sessions(mdsc, send_flush_mdlog, true); 5918 ceph_mdsc_iterate_sessions(mdsc, lock_unlock_session, false); 5919 ceph_flush_dirty_caps(mdsc); 5920 wait_requests(mdsc); 5921 5922 /* 5923 * wait for reply handlers to drop their request refs and 5924 * their inode/dcache refs 5925 */ 5926 ceph_msgr_flush(); 5927 5928 ceph_cleanup_quotarealms_inodes(mdsc); 5929 doutc(mdsc->fsc->client, "done\n"); 5930 } 5931 5932 /* 5933 * flush the mdlog and wait for all write mds requests to flush. 
5934 */ 5935 static void flush_mdlog_and_wait_mdsc_unsafe_requests(struct ceph_mds_client *mdsc, 5936 u64 want_tid) 5937 { 5938 struct ceph_client *cl = mdsc->fsc->client; 5939 struct ceph_mds_request *req = NULL, *nextreq; 5940 struct ceph_mds_session *last_session = NULL; 5941 struct rb_node *n; 5942 5943 mutex_lock(&mdsc->mutex); 5944 doutc(cl, "want %lld\n", want_tid); 5945 restart: 5946 req = __get_oldest_req(mdsc); 5947 while (req && req->r_tid <= want_tid) { 5948 /* find next request */ 5949 n = rb_next(&req->r_node); 5950 if (n) 5951 nextreq = rb_entry(n, struct ceph_mds_request, r_node); 5952 else 5953 nextreq = NULL; 5954 if (req->r_op != CEPH_MDS_OP_SETFILELOCK && 5955 (req->r_op & CEPH_MDS_OP_WRITE)) { 5956 struct ceph_mds_session *s = req->r_session; 5957 5958 if (!s) { 5959 req = nextreq; 5960 continue; 5961 } 5962 5963 /* write op */ 5964 ceph_mdsc_get_request(req); 5965 if (nextreq) 5966 ceph_mdsc_get_request(nextreq); 5967 s = ceph_get_mds_session(s); 5968 mutex_unlock(&mdsc->mutex); 5969 5970 /* send flush mdlog request to MDS */ 5971 if (last_session != s) { 5972 send_flush_mdlog(s); 5973 ceph_put_mds_session(last_session); 5974 last_session = s; 5975 } else { 5976 ceph_put_mds_session(s); 5977 } 5978 doutc(cl, "wait on %llu (want %llu)\n", 5979 req->r_tid, want_tid); 5980 wait_for_completion(&req->r_safe_completion); 5981 5982 mutex_lock(&mdsc->mutex); 5983 ceph_mdsc_put_request(req); 5984 if (!nextreq) 5985 break; /* next dne before, so we're done! 
*/ 5986 if (RB_EMPTY_NODE(&nextreq->r_node)) { 5987 /* next request was removed from tree */ 5988 ceph_mdsc_put_request(nextreq); 5989 goto restart; 5990 } 5991 ceph_mdsc_put_request(nextreq); /* won't go away */ 5992 } 5993 req = nextreq; 5994 } 5995 mutex_unlock(&mdsc->mutex); 5996 ceph_put_mds_session(last_session); 5997 doutc(cl, "done\n"); 5998 } 5999 6000 void ceph_mdsc_sync(struct ceph_mds_client *mdsc) 6001 { 6002 struct ceph_client *cl = mdsc->fsc->client; 6003 u64 want_tid, want_flush; 6004 6005 if (READ_ONCE(mdsc->fsc->mount_state) >= CEPH_MOUNT_SHUTDOWN) 6006 return; 6007 6008 doutc(cl, "sync\n"); 6009 mutex_lock(&mdsc->mutex); 6010 want_tid = mdsc->last_tid; 6011 mutex_unlock(&mdsc->mutex); 6012 6013 ceph_flush_dirty_caps(mdsc); 6014 ceph_flush_cap_releases(mdsc); 6015 spin_lock(&mdsc->cap_dirty_lock); 6016 want_flush = mdsc->last_cap_flush_tid; 6017 if (!list_empty(&mdsc->cap_flush_list)) { 6018 struct ceph_cap_flush *cf = 6019 list_last_entry(&mdsc->cap_flush_list, 6020 struct ceph_cap_flush, g_list); 6021 cf->wake = true; 6022 } 6023 spin_unlock(&mdsc->cap_dirty_lock); 6024 6025 doutc(cl, "sync want tid %lld flush_seq %lld\n", want_tid, want_flush); 6026 6027 flush_mdlog_and_wait_mdsc_unsafe_requests(mdsc, want_tid); 6028 wait_caps_flush(mdsc, want_flush); 6029 } 6030 6031 /* 6032 * true if all sessions are closed, or we force unmount 6033 */ 6034 static bool done_closing_sessions(struct ceph_mds_client *mdsc, int skipped) 6035 { 6036 if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_SHUTDOWN) 6037 return true; 6038 return atomic_read(&mdsc->num_sessions) <= skipped; 6039 } 6040 6041 /* 6042 * called after sb is ro or when metadata corrupted. 
6043 */ 6044 void ceph_mdsc_close_sessions(struct ceph_mds_client *mdsc) 6045 { 6046 struct ceph_options *opts = mdsc->fsc->client->options; 6047 struct ceph_client *cl = mdsc->fsc->client; 6048 struct ceph_mds_session *session; 6049 int i; 6050 int skipped = 0; 6051 6052 doutc(cl, "begin\n"); 6053 6054 /* close sessions */ 6055 mutex_lock(&mdsc->mutex); 6056 for (i = 0; i < mdsc->max_sessions; i++) { 6057 session = __ceph_lookup_mds_session(mdsc, i); 6058 if (!session) 6059 continue; 6060 mutex_unlock(&mdsc->mutex); 6061 mutex_lock(&session->s_mutex); 6062 if (__close_session(mdsc, session) <= 0) 6063 skipped++; 6064 mutex_unlock(&session->s_mutex); 6065 ceph_put_mds_session(session); 6066 mutex_lock(&mdsc->mutex); 6067 } 6068 mutex_unlock(&mdsc->mutex); 6069 6070 doutc(cl, "waiting for sessions to close\n"); 6071 wait_event_timeout(mdsc->session_close_wq, 6072 done_closing_sessions(mdsc, skipped), 6073 ceph_timeout_jiffies(opts->mount_timeout)); 6074 6075 /* tear down remaining sessions */ 6076 mutex_lock(&mdsc->mutex); 6077 for (i = 0; i < mdsc->max_sessions; i++) { 6078 if (mdsc->sessions[i]) { 6079 session = ceph_get_mds_session(mdsc->sessions[i]); 6080 __unregister_session(mdsc, session); 6081 mutex_unlock(&mdsc->mutex); 6082 mutex_lock(&session->s_mutex); 6083 remove_session_caps(session); 6084 mutex_unlock(&session->s_mutex); 6085 ceph_put_mds_session(session); 6086 mutex_lock(&mdsc->mutex); 6087 } 6088 } 6089 WARN_ON(!list_empty(&mdsc->cap_delay_list)); 6090 mutex_unlock(&mdsc->mutex); 6091 6092 ceph_cleanup_snapid_map(mdsc); 6093 ceph_cleanup_global_and_empty_realms(mdsc); 6094 6095 cancel_work_sync(&mdsc->cap_reclaim_work); 6096 cancel_work_sync(&mdsc->cap_unlink_work); 6097 cancel_delayed_work_sync(&mdsc->delayed_work); /* cancel timer */ 6098 6099 doutc(cl, "done\n"); 6100 } 6101 6102 void ceph_mdsc_force_umount(struct ceph_mds_client *mdsc) 6103 { 6104 struct ceph_mds_session *session; 6105 int mds; 6106 6107 doutc(mdsc->fsc->client, "force 
umount\n"); 6108 6109 mutex_lock(&mdsc->mutex); 6110 for (mds = 0; mds < mdsc->max_sessions; mds++) { 6111 session = __ceph_lookup_mds_session(mdsc, mds); 6112 if (!session) 6113 continue; 6114 6115 if (session->s_state == CEPH_MDS_SESSION_REJECTED) 6116 __unregister_session(mdsc, session); 6117 __wake_requests(mdsc, &session->s_waiting); 6118 mutex_unlock(&mdsc->mutex); 6119 6120 mutex_lock(&session->s_mutex); 6121 __close_session(mdsc, session); 6122 if (session->s_state == CEPH_MDS_SESSION_CLOSING) { 6123 cleanup_session_requests(mdsc, session); 6124 remove_session_caps(session); 6125 } 6126 mutex_unlock(&session->s_mutex); 6127 ceph_put_mds_session(session); 6128 6129 mutex_lock(&mdsc->mutex); 6130 kick_requests(mdsc, mds); 6131 } 6132 __wake_requests(mdsc, &mdsc->waiting_for_map); 6133 mutex_unlock(&mdsc->mutex); 6134 } 6135 6136 static void ceph_mdsc_stop(struct ceph_mds_client *mdsc) 6137 { 6138 doutc(mdsc->fsc->client, "stop\n"); 6139 /* 6140 * Make sure the delayed work stopped before releasing 6141 * the resources. 6142 * 6143 * Because the cancel_delayed_work_sync() will only 6144 * guarantee that the work finishes executing. But the 6145 * delayed work will re-arm itself again after that. 
6146 */ 6147 flush_delayed_work(&mdsc->delayed_work); 6148 6149 if (mdsc->mdsmap) 6150 ceph_mdsmap_destroy(mdsc->mdsmap); 6151 kfree(mdsc->sessions); 6152 ceph_caps_finalize(mdsc); 6153 6154 if (mdsc->s_cap_auths) { 6155 int i; 6156 6157 for (i = 0; i < mdsc->s_cap_auths_num; i++) { 6158 kfree(mdsc->s_cap_auths[i].match.gids); 6159 kfree(mdsc->s_cap_auths[i].match.path); 6160 kfree(mdsc->s_cap_auths[i].match.fs_name); 6161 } 6162 kfree(mdsc->s_cap_auths); 6163 } 6164 6165 ceph_pool_perm_destroy(mdsc); 6166 } 6167 6168 void ceph_mdsc_destroy(struct ceph_fs_client *fsc) 6169 { 6170 struct ceph_mds_client *mdsc = fsc->mdsc; 6171 doutc(fsc->client, "%p\n", mdsc); 6172 6173 if (!mdsc) 6174 return; 6175 6176 /* flush out any connection work with references to us */ 6177 ceph_msgr_flush(); 6178 6179 ceph_mdsc_stop(mdsc); 6180 6181 ceph_metric_destroy(&mdsc->metric); 6182 ceph_subvolume_metrics_destroy(&mdsc->subvol_metrics); 6183 kfree(mdsc->subvol_metrics_last); 6184 6185 fsc->mdsc = NULL; 6186 kfree(mdsc); 6187 doutc(fsc->client, "%p done\n", mdsc); 6188 } 6189 6190 void ceph_mdsc_handle_fsmap(struct ceph_mds_client *mdsc, struct ceph_msg *msg) 6191 { 6192 struct ceph_fs_client *fsc = mdsc->fsc; 6193 struct ceph_client *cl = fsc->client; 6194 const char *mds_namespace = fsc->mount_options->mds_namespace; 6195 void *p = msg->front.iov_base; 6196 void *end = p + msg->front.iov_len; 6197 u32 epoch; 6198 u32 num_fs; 6199 u32 mount_fscid = (u32)-1; 6200 int err = -EINVAL; 6201 6202 ceph_decode_need(&p, end, sizeof(u32), bad); 6203 epoch = ceph_decode_32(&p); 6204 6205 doutc(cl, "epoch %u\n", epoch); 6206 6207 /* struct_v, struct_cv, map_len, epoch, legacy_client_fscid */ 6208 ceph_decode_skip_n(&p, end, 2 + sizeof(u32) * 3, bad); 6209 6210 ceph_decode_32_safe(&p, end, num_fs, bad); 6211 while (num_fs-- > 0) { 6212 void *info_p, *info_end; 6213 u32 info_len; 6214 u32 fscid, namelen; 6215 6216 ceph_decode_need(&p, end, 2 + sizeof(u32), bad); 6217 p += 2; // info_v, info_cv 
6218 info_len = ceph_decode_32(&p); 6219 ceph_decode_need(&p, end, info_len, bad); 6220 info_p = p; 6221 info_end = p + info_len; 6222 p = info_end; 6223 6224 ceph_decode_need(&info_p, info_end, sizeof(u32) * 2, bad); 6225 fscid = ceph_decode_32(&info_p); 6226 namelen = ceph_decode_32(&info_p); 6227 ceph_decode_need(&info_p, info_end, namelen, bad); 6228 6229 if (mds_namespace && 6230 strlen(mds_namespace) == namelen && 6231 !strncmp(mds_namespace, (char *)info_p, namelen)) { 6232 mount_fscid = fscid; 6233 break; 6234 } 6235 } 6236 6237 ceph_monc_got_map(&fsc->client->monc, CEPH_SUB_FSMAP, epoch); 6238 if (mount_fscid != (u32)-1) { 6239 fsc->client->monc.fs_cluster_id = mount_fscid; 6240 ceph_monc_want_map(&fsc->client->monc, CEPH_SUB_MDSMAP, 6241 0, true); 6242 ceph_monc_renew_subs(&fsc->client->monc); 6243 } else { 6244 err = -ENOENT; 6245 goto err_out; 6246 } 6247 return; 6248 6249 bad: 6250 pr_err_client(cl, "error decoding fsmap %d. Shutting down mount.\n", 6251 err); 6252 ceph_umount_begin(mdsc->fsc->sb); 6253 ceph_msg_dump(msg); 6254 err_out: 6255 mutex_lock(&mdsc->mutex); 6256 mdsc->mdsmap_err = err; 6257 __wake_requests(mdsc, &mdsc->waiting_for_map); 6258 mutex_unlock(&mdsc->mutex); 6259 } 6260 6261 /* 6262 * handle mds map update. 6263 */ 6264 void ceph_mdsc_handle_mdsmap(struct ceph_mds_client *mdsc, struct ceph_msg *msg) 6265 { 6266 struct ceph_client *cl = mdsc->fsc->client; 6267 u32 epoch; 6268 u32 maplen; 6269 void *p = msg->front.iov_base; 6270 void *end = p + msg->front.iov_len; 6271 struct ceph_mdsmap *newmap, *oldmap; 6272 struct ceph_fsid fsid; 6273 int err = -EINVAL; 6274 6275 ceph_decode_need(&p, end, sizeof(fsid)+2*sizeof(u32), bad); 6276 ceph_decode_copy(&p, &fsid, sizeof(fsid)); 6277 if (ceph_check_fsid(mdsc->fsc->client, &fsid) < 0) 6278 return; 6279 epoch = ceph_decode_32(&p); 6280 maplen = ceph_decode_32(&p); 6281 doutc(cl, "epoch %u len %d\n", epoch, (int)maplen); 6282 6283 /* do we need it? 
*/ 6284 mutex_lock(&mdsc->mutex); 6285 if (mdsc->mdsmap && epoch <= mdsc->mdsmap->m_epoch) { 6286 doutc(cl, "epoch %u <= our %u\n", epoch, mdsc->mdsmap->m_epoch); 6287 mutex_unlock(&mdsc->mutex); 6288 return; 6289 } 6290 6291 newmap = ceph_mdsmap_decode(mdsc, &p, end, ceph_msgr2(mdsc->fsc->client)); 6292 if (IS_ERR(newmap)) { 6293 err = PTR_ERR(newmap); 6294 goto bad_unlock; 6295 } 6296 6297 /* swap into place */ 6298 if (mdsc->mdsmap) { 6299 oldmap = mdsc->mdsmap; 6300 mdsc->mdsmap = newmap; 6301 check_new_map(mdsc, newmap, oldmap); 6302 ceph_mdsmap_destroy(oldmap); 6303 } else { 6304 mdsc->mdsmap = newmap; /* first mds map */ 6305 } 6306 mdsc->fsc->max_file_size = min((loff_t)mdsc->mdsmap->m_max_file_size, 6307 MAX_LFS_FILESIZE); 6308 6309 __wake_requests(mdsc, &mdsc->waiting_for_map); 6310 ceph_monc_got_map(&mdsc->fsc->client->monc, CEPH_SUB_MDSMAP, 6311 mdsc->mdsmap->m_epoch); 6312 6313 mutex_unlock(&mdsc->mutex); 6314 schedule_delayed(mdsc, 0); 6315 return; 6316 6317 bad_unlock: 6318 mutex_unlock(&mdsc->mutex); 6319 bad: 6320 pr_err_client(cl, "error decoding mdsmap %d. Shutting down mount.\n", 6321 err); 6322 ceph_umount_begin(mdsc->fsc->sb); 6323 ceph_msg_dump(msg); 6324 return; 6325 } 6326 6327 static struct ceph_connection *mds_get_con(struct ceph_connection *con) 6328 { 6329 struct ceph_mds_session *s = con->private; 6330 6331 if (ceph_get_mds_session(s)) 6332 return con; 6333 return NULL; 6334 } 6335 6336 static void mds_put_con(struct ceph_connection *con) 6337 { 6338 struct ceph_mds_session *s = con->private; 6339 6340 ceph_put_mds_session(s); 6341 } 6342 6343 /* 6344 * if the client is unresponsive for long enough, the mds will kill 6345 * the session entirely. 
6346 */ 6347 static void mds_peer_reset(struct ceph_connection *con) 6348 { 6349 struct ceph_mds_session *s = con->private; 6350 struct ceph_mds_client *mdsc = s->s_mdsc; 6351 6352 pr_warn_client(mdsc->fsc->client, "mds%d closed our session\n", 6353 s->s_mds); 6354 if (READ_ONCE(mdsc->fsc->mount_state) != CEPH_MOUNT_FENCE_IO && 6355 ceph_mdsmap_get_state(mdsc->mdsmap, s->s_mds) >= CEPH_MDS_STATE_RECONNECT) 6356 send_mds_reconnect(mdsc, s); 6357 } 6358 6359 static void mds_dispatch(struct ceph_connection *con, struct ceph_msg *msg) 6360 { 6361 struct ceph_mds_session *s = con->private; 6362 struct ceph_mds_client *mdsc = s->s_mdsc; 6363 struct ceph_client *cl = mdsc->fsc->client; 6364 int type = le16_to_cpu(msg->hdr.type); 6365 6366 mutex_lock(&mdsc->mutex); 6367 if (__verify_registered_session(mdsc, s) < 0) { 6368 mutex_unlock(&mdsc->mutex); 6369 goto out; 6370 } 6371 mutex_unlock(&mdsc->mutex); 6372 6373 switch (type) { 6374 case CEPH_MSG_MDS_MAP: 6375 ceph_mdsc_handle_mdsmap(mdsc, msg); 6376 break; 6377 case CEPH_MSG_FS_MAP_USER: 6378 ceph_mdsc_handle_fsmap(mdsc, msg); 6379 break; 6380 case CEPH_MSG_CLIENT_SESSION: 6381 handle_session(s, msg); 6382 break; 6383 case CEPH_MSG_CLIENT_REPLY: 6384 handle_reply(s, msg); 6385 break; 6386 case CEPH_MSG_CLIENT_REQUEST_FORWARD: 6387 handle_forward(mdsc, s, msg); 6388 break; 6389 case CEPH_MSG_CLIENT_CAPS: 6390 ceph_handle_caps(s, msg); 6391 break; 6392 case CEPH_MSG_CLIENT_SNAP: 6393 ceph_handle_snap(mdsc, s, msg); 6394 break; 6395 case CEPH_MSG_CLIENT_LEASE: 6396 handle_lease(mdsc, s, msg); 6397 break; 6398 case CEPH_MSG_CLIENT_QUOTA: 6399 ceph_handle_quota(mdsc, s, msg); 6400 break; 6401 6402 default: 6403 pr_err_client(cl, "received unknown message type %d %s\n", 6404 type, ceph_msg_type_name(type)); 6405 } 6406 out: 6407 ceph_msg_put(msg); 6408 } 6409 6410 /* 6411 * authentication 6412 */ 6413 6414 /* 6415 * Note: returned pointer is the address of a structure that's 6416 * managed separately. 
Caller must *not* attempt to free it. 6417 */ 6418 static struct ceph_auth_handshake * 6419 mds_get_authorizer(struct ceph_connection *con, int *proto, int force_new) 6420 { 6421 struct ceph_mds_session *s = con->private; 6422 struct ceph_mds_client *mdsc = s->s_mdsc; 6423 struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth; 6424 struct ceph_auth_handshake *auth = &s->s_auth; 6425 int ret; 6426 6427 ret = __ceph_auth_get_authorizer(ac, auth, CEPH_ENTITY_TYPE_MDS, 6428 force_new, proto, NULL, NULL); 6429 if (ret) 6430 return ERR_PTR(ret); 6431 6432 return auth; 6433 } 6434 6435 static int mds_add_authorizer_challenge(struct ceph_connection *con, 6436 void *challenge_buf, int challenge_buf_len) 6437 { 6438 struct ceph_mds_session *s = con->private; 6439 struct ceph_mds_client *mdsc = s->s_mdsc; 6440 struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth; 6441 6442 return ceph_auth_add_authorizer_challenge(ac, s->s_auth.authorizer, 6443 challenge_buf, challenge_buf_len); 6444 } 6445 6446 static int mds_verify_authorizer_reply(struct ceph_connection *con) 6447 { 6448 struct ceph_mds_session *s = con->private; 6449 struct ceph_mds_client *mdsc = s->s_mdsc; 6450 struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth; 6451 struct ceph_auth_handshake *auth = &s->s_auth; 6452 6453 return ceph_auth_verify_authorizer_reply(ac, auth->authorizer, 6454 auth->authorizer_reply_buf, auth->authorizer_reply_buf_len, 6455 NULL, NULL, NULL, NULL); 6456 } 6457 6458 static int mds_invalidate_authorizer(struct ceph_connection *con) 6459 { 6460 struct ceph_mds_session *s = con->private; 6461 struct ceph_mds_client *mdsc = s->s_mdsc; 6462 struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth; 6463 6464 ceph_auth_invalidate_authorizer(ac, CEPH_ENTITY_TYPE_MDS); 6465 6466 return ceph_monc_validate_auth(&mdsc->fsc->client->monc); 6467 } 6468 6469 static int mds_get_auth_request(struct ceph_connection *con, 6470 void *buf, int *buf_len, 6471 void **authorizer, int 
*authorizer_len) 6472 { 6473 struct ceph_mds_session *s = con->private; 6474 struct ceph_auth_client *ac = s->s_mdsc->fsc->client->monc.auth; 6475 struct ceph_auth_handshake *auth = &s->s_auth; 6476 int ret; 6477 6478 ret = ceph_auth_get_authorizer(ac, auth, CEPH_ENTITY_TYPE_MDS, 6479 buf, buf_len); 6480 if (ret) 6481 return ret; 6482 6483 *authorizer = auth->authorizer_buf; 6484 *authorizer_len = auth->authorizer_buf_len; 6485 return 0; 6486 } 6487 6488 static int mds_handle_auth_reply_more(struct ceph_connection *con, 6489 void *reply, int reply_len, 6490 void *buf, int *buf_len, 6491 void **authorizer, int *authorizer_len) 6492 { 6493 struct ceph_mds_session *s = con->private; 6494 struct ceph_auth_client *ac = s->s_mdsc->fsc->client->monc.auth; 6495 struct ceph_auth_handshake *auth = &s->s_auth; 6496 int ret; 6497 6498 ret = ceph_auth_handle_svc_reply_more(ac, auth, reply, reply_len, 6499 buf, buf_len); 6500 if (ret) 6501 return ret; 6502 6503 *authorizer = auth->authorizer_buf; 6504 *authorizer_len = auth->authorizer_buf_len; 6505 return 0; 6506 } 6507 6508 static int mds_handle_auth_done(struct ceph_connection *con, 6509 u64 global_id, void *reply, int reply_len, 6510 u8 *session_key, int *session_key_len, 6511 u8 *con_secret, int *con_secret_len) 6512 { 6513 struct ceph_mds_session *s = con->private; 6514 struct ceph_auth_client *ac = s->s_mdsc->fsc->client->monc.auth; 6515 struct ceph_auth_handshake *auth = &s->s_auth; 6516 6517 return ceph_auth_handle_svc_reply_done(ac, auth, reply, reply_len, 6518 session_key, session_key_len, 6519 con_secret, con_secret_len); 6520 } 6521 6522 static int mds_handle_auth_bad_method(struct ceph_connection *con, 6523 int used_proto, int result, 6524 const int *allowed_protos, int proto_cnt, 6525 const int *allowed_modes, int mode_cnt) 6526 { 6527 struct ceph_mds_session *s = con->private; 6528 struct ceph_mon_client *monc = &s->s_mdsc->fsc->client->monc; 6529 int ret; 6530 6531 if (ceph_auth_handle_bad_authorizer(monc->auth, 
CEPH_ENTITY_TYPE_MDS, 6532 used_proto, result, 6533 allowed_protos, proto_cnt, 6534 allowed_modes, mode_cnt)) { 6535 ret = ceph_monc_validate_auth(monc); 6536 if (ret) 6537 return ret; 6538 } 6539 6540 return -EACCES; 6541 } 6542 6543 static struct ceph_msg *mds_alloc_msg(struct ceph_connection *con, 6544 struct ceph_msg_header *hdr, int *skip) 6545 { 6546 struct ceph_msg *msg; 6547 int type = (int) le16_to_cpu(hdr->type); 6548 int front_len = (int) le32_to_cpu(hdr->front_len); 6549 6550 if (con->in_msg) 6551 return con->in_msg; 6552 6553 *skip = 0; 6554 msg = ceph_msg_new(type, front_len, GFP_NOFS, false); 6555 if (!msg) { 6556 pr_err("unable to allocate msg type %d len %d\n", 6557 type, front_len); 6558 return NULL; 6559 } 6560 6561 return msg; 6562 } 6563 6564 static int mds_sign_message(struct ceph_msg *msg) 6565 { 6566 struct ceph_mds_session *s = msg->con->private; 6567 struct ceph_auth_handshake *auth = &s->s_auth; 6568 6569 return ceph_auth_sign_message(auth, msg); 6570 } 6571 6572 static int mds_check_message_signature(struct ceph_msg *msg) 6573 { 6574 struct ceph_mds_session *s = msg->con->private; 6575 struct ceph_auth_handshake *auth = &s->s_auth; 6576 6577 return ceph_auth_check_message_signature(auth, msg); 6578 } 6579 6580 static const struct ceph_connection_operations mds_con_ops = { 6581 .get = mds_get_con, 6582 .put = mds_put_con, 6583 .alloc_msg = mds_alloc_msg, 6584 .dispatch = mds_dispatch, 6585 .peer_reset = mds_peer_reset, 6586 .get_authorizer = mds_get_authorizer, 6587 .add_authorizer_challenge = mds_add_authorizer_challenge, 6588 .verify_authorizer_reply = mds_verify_authorizer_reply, 6589 .invalidate_authorizer = mds_invalidate_authorizer, 6590 .sign_message = mds_sign_message, 6591 .check_message_signature = mds_check_message_signature, 6592 .get_auth_request = mds_get_auth_request, 6593 .handle_auth_reply_more = mds_handle_auth_reply_more, 6594 .handle_auth_done = mds_handle_auth_done, 6595 .handle_auth_bad_method = 
mds_handle_auth_bad_method, 6596 }; 6597 6598 /* eof */ 6599