1 #include <linux/ceph/ceph_debug.h> 2 3 #include <linux/fs.h> 4 #include <linux/wait.h> 5 #include <linux/slab.h> 6 #include <linux/sched.h> 7 #include <linux/debugfs.h> 8 #include <linux/seq_file.h> 9 #include <linux/smp_lock.h> 10 11 #include "super.h" 12 #include "mds_client.h" 13 14 #include <linux/ceph/messenger.h> 15 #include <linux/ceph/decode.h> 16 #include <linux/ceph/pagelist.h> 17 #include <linux/ceph/auth.h> 18 #include <linux/ceph/debugfs.h> 19 20 /* 21 * A cluster of MDS (metadata server) daemons is responsible for 22 * managing the file system namespace (the directory hierarchy and 23 * inodes) and for coordinating shared access to storage. Metadata is 24 * partitioned hierarchically across a number of servers, and that 25 * partition varies over time as the cluster adjusts the distribution 26 * in order to balance load. 27 * 28 * The MDS client is primarily responsible for managing synchronous 29 * metadata requests for operations like open, unlink, and so forth. 30 * If there is an MDS failure, we find out about it when we (possibly 31 * request and) receive a new MDS map, and can resubmit affected 32 * requests. 33 * 34 * For the most part, though, we take advantage of a lossless 35 * communications channel to the MDS, and do not need to worry about 36 * timing out or resubmitting requests. 37 * 38 * We maintain a stateful "session" with each MDS we interact with. 39 * Within each session, we send periodic heartbeat messages to ensure 40 * any capabilities or leases we have been issued remain valid. If 41 * the session times out and goes stale, our leases and capabilities 42 * are no longer valid. 43 */ 44 45 struct ceph_reconnect_state { 46 struct ceph_pagelist *pagelist; 47 bool flock; 48 }; 49 50 static void __wake_requests(struct ceph_mds_client *mdsc, 51 struct list_head *head); 52 53 static const struct ceph_connection_operations mds_con_ops; 54 55 56 /* 57 * mds reply parsing 58 */ 59 60 /* 61 * parse individual inode info 62 */ 63 static int parse_reply_info_in(void **p, void *end, 64 struct ceph_mds_reply_info_in *info) 65 { 66 int err = -EIO; 67 68 info->in = *p; 69 *p += sizeof(struct ceph_mds_reply_inode) + 70 sizeof(*info->in->fragtree.splits) * 71 le32_to_cpu(info->in->fragtree.nsplits); 72 73 ceph_decode_32_safe(p, end, info->symlink_len, bad); 74 ceph_decode_need(p, end, info->symlink_len, bad); 75 info->symlink = *p; 76 *p += info->symlink_len; 77 78 ceph_decode_32_safe(p, end, info->xattr_len, bad); 79 ceph_decode_need(p, end, info->xattr_len, bad); 80 info->xattr_data = *p; 81 *p += info->xattr_len; 82 return 0; 83 bad: 84 return err; 85 } 86 87 /* 88 * parse a normal reply, which may contain a (dir+)dentry and/or a 89 * target inode.
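 * (When head->is_dentry is set, the trace carries the parent inode, its
 * dirfrag, the dentry name and a dentry lease; when head->is_target is
 * set, it carries a single inode record for the target.)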
90 */ 91 static int parse_reply_info_trace(void **p, void *end, 92 struct ceph_mds_reply_info_parsed *info) 93 { 94 int err; 95 96 if (info->head->is_dentry) { 97 err = parse_reply_info_in(p, end, &info->diri); 98 if (err < 0) 99 goto out_bad; 100 101 if (unlikely(*p + sizeof(*info->dirfrag) > end)) 102 goto bad; 103 info->dirfrag = *p; 104 *p += sizeof(*info->dirfrag) + 105 sizeof(u32)*le32_to_cpu(info->dirfrag->ndist); 106 if (unlikely(*p > end)) 107 goto bad; 108 109 ceph_decode_32_safe(p, end, info->dname_len, bad); 110 ceph_decode_need(p, end, info->dname_len, bad); 111 info->dname = *p; 112 *p += info->dname_len; 113 info->dlease = *p; 114 *p += sizeof(*info->dlease); 115 } 116 117 if (info->head->is_target) { 118 err = parse_reply_info_in(p, end, &info->targeti); 119 if (err < 0) 120 goto out_bad; 121 } 122 123 if (unlikely(*p != end)) 124 goto bad; 125 return 0; 126 127 bad: 128 err = -EIO; 129 out_bad: 130 pr_err("problem parsing mds trace %d\n", err); 131 return err; 132 } 133 134 /* 135 * parse readdir results 136 */ 137 static int parse_reply_info_dir(void **p, void *end, 138 struct ceph_mds_reply_info_parsed *info) 139 { 140 u32 num, i = 0; 141 int err; 142 143 info->dir_dir = *p; 144 if (*p + sizeof(*info->dir_dir) > end) 145 goto bad; 146 *p += sizeof(*info->dir_dir) + 147 sizeof(u32)*le32_to_cpu(info->dir_dir->ndist); 148 if (*p > end) 149 goto bad; 150 151 ceph_decode_need(p, end, sizeof(num) + 2, bad); 152 num = ceph_decode_32(p); 153 info->dir_end = ceph_decode_8(p); 154 info->dir_complete = ceph_decode_8(p); 155 if (num == 0) 156 goto done; 157 158 /* alloc large array */ 159 info->dir_nr = num; 160 info->dir_in = kcalloc(num, sizeof(*info->dir_in) + 161 sizeof(*info->dir_dname) + 162 sizeof(*info->dir_dname_len) + 163 sizeof(*info->dir_dlease), 164 GFP_NOFS); 165 if (info->dir_in == NULL) { 166 err = -ENOMEM; 167 goto out_bad; 168 } 169 info->dir_dname = (void *)(info->dir_in + num); 170 info->dir_dname_len = (void *)(info->dir_dname + num); 171 info->dir_dlease = (void *)(info->dir_dname_len + num); 172 173 while (num) { 174 /* dentry */ 175 ceph_decode_need(p, end, sizeof(u32)*2, bad); 176 info->dir_dname_len[i] = ceph_decode_32(p); 177 ceph_decode_need(p, end, info->dir_dname_len[i], bad); 178 info->dir_dname[i] = *p; 179 *p += info->dir_dname_len[i]; 180 dout("parsed dir dname '%.*s'\n", info->dir_dname_len[i], 181 info->dir_dname[i]); 182 info->dir_dlease[i] = *p; 183 *p += sizeof(struct ceph_mds_reply_lease); 184 185 /* inode */ 186 err = parse_reply_info_in(p, end, &info->dir_in[i]); 187 if (err < 0) 188 goto out_bad; 189 i++; 190 num--; 191 } 192 193 done: 194 if (*p != end) 195 goto bad; 196 return 0; 197 198 bad: 199 err = -EIO; 200 out_bad: 201 pr_err("problem parsing dir contents %d\n", err); 202 return err; 203 } 204 205 /* 206 * parse entire mds reply 207 */ 208 static int parse_reply_info(struct ceph_msg *msg, 209 struct ceph_mds_reply_info_parsed *info) 210 { 211 void *p, *end; 212 u32 len; 213 int err; 214 215 info->head = msg->front.iov_base; 216 p = msg->front.iov_base + sizeof(struct ceph_mds_reply_head); 217 end = p + msg->front.iov_len - sizeof(struct ceph_mds_reply_head); 218 219 /* trace */ 220 ceph_decode_32_safe(&p, end, len, bad); 221 if (len > 0) { 222 err = parse_reply_info_trace(&p, p+len, info); 223 if (err < 0) 224 goto out_bad; 225 } 226 227 /* dir content */ 228 ceph_decode_32_safe(&p, end, len, bad); 229 if (len > 0) { 230 err = parse_reply_info_dir(&p, p+len, info); 231 if (err < 0) 232 goto out_bad; 233 } 234 235 /* snap blob */ 236 
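/* note: the snap blob is left opaque here; handle_reply() later passes
 * it to ceph_update_snap_trace() under snap_rwsem */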
ceph_decode_32_safe(&p, end, len, bad); 237 info->snapblob_len = len; 238 info->snapblob = p; 239 p += len; 240 241 if (p != end) 242 goto bad; 243 return 0; 244 245 bad: 246 err = -EIO; 247 out_bad: 248 pr_err("mds parse_reply err %d\n", err); 249 return err; 250 } 251 252 static void destroy_reply_info(struct ceph_mds_reply_info_parsed *info) 253 { 254 kfree(info->dir_in); 255 } 256 257 258 /* 259 * sessions 260 */ 261 static const char *session_state_name(int s) 262 { 263 switch (s) { 264 case CEPH_MDS_SESSION_NEW: return "new"; 265 case CEPH_MDS_SESSION_OPENING: return "opening"; 266 case CEPH_MDS_SESSION_OPEN: return "open"; 267 case CEPH_MDS_SESSION_HUNG: return "hung"; 268 case CEPH_MDS_SESSION_CLOSING: return "closing"; 269 case CEPH_MDS_SESSION_RESTARTING: return "restarting"; 270 case CEPH_MDS_SESSION_RECONNECTING: return "reconnecting"; 271 default: return "???"; 272 } 273 } 274 275 static struct ceph_mds_session *get_session(struct ceph_mds_session *s) 276 { 277 if (atomic_inc_not_zero(&s->s_ref)) { 278 dout("mdsc get_session %p %d -> %d\n", s, 279 atomic_read(&s->s_ref)-1, atomic_read(&s->s_ref)); 280 return s; 281 } else { 282 dout("mdsc get_session %p 0 -- FAIL", s); 283 return NULL; 284 } 285 } 286 287 void ceph_put_mds_session(struct ceph_mds_session *s) 288 { 289 dout("mdsc put_session %p %d -> %d\n", s, 290 atomic_read(&s->s_ref), atomic_read(&s->s_ref)-1); 291 if (atomic_dec_and_test(&s->s_ref)) { 292 if (s->s_authorizer) 293 s->s_mdsc->fsc->client->monc.auth->ops->destroy_authorizer( 294 s->s_mdsc->fsc->client->monc.auth, 295 s->s_authorizer); 296 kfree(s); 297 } 298 } 299 300 /* 301 * called under mdsc->mutex 302 */ 303 struct ceph_mds_session *__ceph_lookup_mds_session(struct ceph_mds_client *mdsc, 304 int mds) 305 { 306 struct ceph_mds_session *session; 307 308 if (mds >= mdsc->max_sessions || mdsc->sessions[mds] == NULL) 309 return NULL; 310 session = mdsc->sessions[mds]; 311 dout("lookup_mds_session %p %d\n", session, 312 atomic_read(&session->s_ref)); 313 get_session(session); 314 return session; 315 } 316 317 static bool __have_session(struct ceph_mds_client *mdsc, int mds) 318 { 319 if (mds >= mdsc->max_sessions) 320 return false; 321 return mdsc->sessions[mds]; 322 } 323 324 static int __verify_registered_session(struct ceph_mds_client *mdsc, 325 struct ceph_mds_session *s) 326 { 327 if (s->s_mds >= mdsc->max_sessions || 328 mdsc->sessions[s->s_mds] != s) 329 return -ENOENT; 330 return 0; 331 } 332 333 /* 334 * create+register a new session for given mds. 335 * called under mdsc->mutex. 
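 * The new session is returned with two references held: one owned by
 * the mdsc->sessions[] array and one for the caller.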
336 */ 337 static struct ceph_mds_session *register_session(struct ceph_mds_client *mdsc, 338 int mds) 339 { 340 struct ceph_mds_session *s; 341 342 s = kzalloc(sizeof(*s), GFP_NOFS); 343 if (!s) 344 return ERR_PTR(-ENOMEM); 345 s->s_mdsc = mdsc; 346 s->s_mds = mds; 347 s->s_state = CEPH_MDS_SESSION_NEW; 348 s->s_ttl = 0; 349 s->s_seq = 0; 350 mutex_init(&s->s_mutex); 351 352 ceph_con_init(mdsc->fsc->client->msgr, &s->s_con); 353 s->s_con.private = s; 354 s->s_con.ops = &mds_con_ops; 355 s->s_con.peer_name.type = CEPH_ENTITY_TYPE_MDS; 356 s->s_con.peer_name.num = cpu_to_le64(mds); 357 358 spin_lock_init(&s->s_cap_lock); 359 s->s_cap_gen = 0; 360 s->s_cap_ttl = 0; 361 s->s_renew_requested = 0; 362 s->s_renew_seq = 0; 363 INIT_LIST_HEAD(&s->s_caps); 364 s->s_nr_caps = 0; 365 s->s_trim_caps = 0; 366 atomic_set(&s->s_ref, 1); 367 INIT_LIST_HEAD(&s->s_waiting); 368 INIT_LIST_HEAD(&s->s_unsafe); 369 s->s_num_cap_releases = 0; 370 s->s_cap_iterator = NULL; 371 INIT_LIST_HEAD(&s->s_cap_releases); 372 INIT_LIST_HEAD(&s->s_cap_releases_done); 373 INIT_LIST_HEAD(&s->s_cap_flushing); 374 INIT_LIST_HEAD(&s->s_cap_snaps_flushing); 375 376 dout("register_session mds%d\n", mds); 377 if (mds >= mdsc->max_sessions) { 378 int newmax = 1 << get_count_order(mds+1); 379 struct ceph_mds_session **sa; 380 381 dout("register_session realloc to %d\n", newmax); 382 sa = kcalloc(newmax, sizeof(void *), GFP_NOFS); 383 if (sa == NULL) 384 goto fail_realloc; 385 if (mdsc->sessions) { 386 memcpy(sa, mdsc->sessions, 387 mdsc->max_sessions * sizeof(void *)); 388 kfree(mdsc->sessions); 389 } 390 mdsc->sessions = sa; 391 mdsc->max_sessions = newmax; 392 } 393 mdsc->sessions[mds] = s; 394 atomic_inc(&s->s_ref); /* one ref to sessions[], one to caller */ 395 396 ceph_con_open(&s->s_con, ceph_mdsmap_get_addr(mdsc->mdsmap, mds)); 397 398 return s; 399 400 fail_realloc: 401 kfree(s); 402 return ERR_PTR(-ENOMEM); 403 } 404 405 /* 406 * called under mdsc->mutex 407 */ 408 static void __unregister_session(struct ceph_mds_client *mdsc, 409 struct ceph_mds_session *s) 410 { 411 dout("__unregister_session mds%d %p\n", s->s_mds, s); 412 BUG_ON(mdsc->sessions[s->s_mds] != s); 413 mdsc->sessions[s->s_mds] = NULL; 414 ceph_con_close(&s->s_con); 415 ceph_put_mds_session(s); 416 } 417 418 /* 419 * drop session refs in request. 
420 * 421 * should be last request ref, or hold mdsc->mutex 422 */ 423 static void put_request_session(struct ceph_mds_request *req) 424 { 425 if (req->r_session) { 426 ceph_put_mds_session(req->r_session); 427 req->r_session = NULL; 428 } 429 } 430 431 void ceph_mdsc_release_request(struct kref *kref) 432 { 433 struct ceph_mds_request *req = container_of(kref, 434 struct ceph_mds_request, 435 r_kref); 436 if (req->r_request) 437 ceph_msg_put(req->r_request); 438 if (req->r_reply) { 439 ceph_msg_put(req->r_reply); 440 destroy_reply_info(&req->r_reply_info); 441 } 442 if (req->r_inode) { 443 ceph_put_cap_refs(ceph_inode(req->r_inode), 444 CEPH_CAP_PIN); 445 iput(req->r_inode); 446 } 447 if (req->r_locked_dir) 448 ceph_put_cap_refs(ceph_inode(req->r_locked_dir), 449 CEPH_CAP_PIN); 450 if (req->r_target_inode) 451 iput(req->r_target_inode); 452 if (req->r_dentry) 453 dput(req->r_dentry); 454 if (req->r_old_dentry) { 455 ceph_put_cap_refs( 456 ceph_inode(req->r_old_dentry->d_parent->d_inode), 457 CEPH_CAP_PIN); 458 dput(req->r_old_dentry); 459 } 460 kfree(req->r_path1); 461 kfree(req->r_path2); 462 put_request_session(req); 463 ceph_unreserve_caps(req->r_mdsc, &req->r_caps_reservation); 464 kfree(req); 465 } 466 467 /* 468 * lookup session, bump ref if found. 469 * 470 * called under mdsc->mutex. 471 */ 472 static struct ceph_mds_request *__lookup_request(struct ceph_mds_client *mdsc, 473 u64 tid) 474 { 475 struct ceph_mds_request *req; 476 struct rb_node *n = mdsc->request_tree.rb_node; 477 478 while (n) { 479 req = rb_entry(n, struct ceph_mds_request, r_node); 480 if (tid < req->r_tid) 481 n = n->rb_left; 482 else if (tid > req->r_tid) 483 n = n->rb_right; 484 else { 485 ceph_mdsc_get_request(req); 486 return req; 487 } 488 } 489 return NULL; 490 } 491 492 static void __insert_request(struct ceph_mds_client *mdsc, 493 struct ceph_mds_request *new) 494 { 495 struct rb_node **p = &mdsc->request_tree.rb_node; 496 struct rb_node *parent = NULL; 497 struct ceph_mds_request *req = NULL; 498 499 while (*p) { 500 parent = *p; 501 req = rb_entry(parent, struct ceph_mds_request, r_node); 502 if (new->r_tid < req->r_tid) 503 p = &(*p)->rb_left; 504 else if (new->r_tid > req->r_tid) 505 p = &(*p)->rb_right; 506 else 507 BUG(); 508 } 509 510 rb_link_node(&new->r_node, parent, p); 511 rb_insert_color(&new->r_node, &mdsc->request_tree); 512 } 513 514 /* 515 * Register an in-flight request, and assign a tid. Link to directory 516 * are modifying (if any). 517 * 518 * Called under mdsc->mutex. 
519 */ 520 static void __register_request(struct ceph_mds_client *mdsc, 521 struct ceph_mds_request *req, 522 struct inode *dir) 523 { 524 req->r_tid = ++mdsc->last_tid; 525 if (req->r_num_caps) 526 ceph_reserve_caps(mdsc, &req->r_caps_reservation, 527 req->r_num_caps); 528 dout("__register_request %p tid %lld\n", req, req->r_tid); 529 ceph_mdsc_get_request(req); 530 __insert_request(mdsc, req); 531 532 if (dir) { 533 struct ceph_inode_info *ci = ceph_inode(dir); 534 535 spin_lock(&ci->i_unsafe_lock); 536 req->r_unsafe_dir = dir; 537 list_add_tail(&req->r_unsafe_dir_item, &ci->i_unsafe_dirops); 538 spin_unlock(&ci->i_unsafe_lock); 539 } 540 } 541 542 static void __unregister_request(struct ceph_mds_client *mdsc, 543 struct ceph_mds_request *req) 544 { 545 dout("__unregister_request %p tid %lld\n", req, req->r_tid); 546 rb_erase(&req->r_node, &mdsc->request_tree); 547 RB_CLEAR_NODE(&req->r_node); 548 549 if (req->r_unsafe_dir) { 550 struct ceph_inode_info *ci = ceph_inode(req->r_unsafe_dir); 551 552 spin_lock(&ci->i_unsafe_lock); 553 list_del_init(&req->r_unsafe_dir_item); 554 spin_unlock(&ci->i_unsafe_lock); 555 } 556 557 ceph_mdsc_put_request(req); 558 } 559 560 /* 561 * Choose mds to send request to next. If there is a hint set in the 562 * request (e.g., due to a prior forward hint from the mds), use that. 563 * Otherwise, consult frag tree and/or caps to identify the 564 * appropriate mds. If all else fails, choose randomly. 565 * 566 * Called under mdsc->mutex. 567 */ 568 struct dentry *get_nonsnap_parent(struct dentry *dentry) 569 { 570 while (!IS_ROOT(dentry) && ceph_snap(dentry->d_inode) != CEPH_NOSNAP) 571 dentry = dentry->d_parent; 572 return dentry; 573 } 574 575 static int __choose_mds(struct ceph_mds_client *mdsc, 576 struct ceph_mds_request *req) 577 { 578 struct inode *inode; 579 struct ceph_inode_info *ci; 580 struct ceph_cap *cap; 581 int mode = req->r_direct_mode; 582 int mds = -1; 583 u32 hash = req->r_direct_hash; 584 bool is_hash = req->r_direct_is_hash; 585 586 /* 587 * is there a specific mds we should try? ignore hint if we have 588 * no session and the mds is not up (active or recovering). 589 */ 590 if (req->r_resend_mds >= 0 && 591 (__have_session(mdsc, req->r_resend_mds) || 592 ceph_mdsmap_get_state(mdsc->mdsmap, req->r_resend_mds) > 0)) { 593 dout("choose_mds using resend_mds mds%d\n", 594 req->r_resend_mds); 595 return req->r_resend_mds; 596 } 597 598 if (mode == USE_RANDOM_MDS) 599 goto random; 600 601 inode = NULL; 602 if (req->r_inode) { 603 inode = req->r_inode; 604 } else if (req->r_dentry) { 605 struct inode *dir = req->r_dentry->d_parent->d_inode; 606 607 if (dir->i_sb != mdsc->fsc->sb) { 608 /* not this fs! 
*/ 609 inode = req->r_dentry->d_inode; 610 } else if (ceph_snap(dir) != CEPH_NOSNAP) { 611 /* direct snapped/virtual snapdir requests 612 * based on parent dir inode */ 613 struct dentry *dn = 614 get_nonsnap_parent(req->r_dentry->d_parent); 615 inode = dn->d_inode; 616 dout("__choose_mds using nonsnap parent %p\n", inode); 617 } else if (req->r_dentry->d_inode) { 618 /* dentry target */ 619 inode = req->r_dentry->d_inode; 620 } else { 621 /* dir + name */ 622 inode = dir; 623 hash = req->r_dentry->d_name.hash; 624 is_hash = true; 625 } 626 } 627 628 dout("__choose_mds %p is_hash=%d (%d) mode %d\n", inode, (int)is_hash, 629 (int)hash, mode); 630 if (!inode) 631 goto random; 632 ci = ceph_inode(inode); 633 634 if (is_hash && S_ISDIR(inode->i_mode)) { 635 struct ceph_inode_frag frag; 636 int found; 637 638 ceph_choose_frag(ci, hash, &frag, &found); 639 if (found) { 640 if (mode == USE_ANY_MDS && frag.ndist > 0) { 641 u8 r; 642 643 /* choose a random replica */ 644 get_random_bytes(&r, 1); 645 r %= frag.ndist; 646 mds = frag.dist[r]; 647 dout("choose_mds %p %llx.%llx " 648 "frag %u mds%d (%d/%d)\n", 649 inode, ceph_vinop(inode), 650 frag.frag, frag.mds, 651 (int)r, frag.ndist); 652 return mds; 653 } 654 655 /* since this file/dir wasn't known to be 656 * replicated, then we want to look for the 657 * authoritative mds. */ 658 mode = USE_AUTH_MDS; 659 if (frag.mds >= 0) { 660 /* choose auth mds */ 661 mds = frag.mds; 662 dout("choose_mds %p %llx.%llx " 663 "frag %u mds%d (auth)\n", 664 inode, ceph_vinop(inode), frag.frag, mds); 665 return mds; 666 } 667 } 668 } 669 670 spin_lock(&inode->i_lock); 671 cap = NULL; 672 if (mode == USE_AUTH_MDS) 673 cap = ci->i_auth_cap; 674 if (!cap && !RB_EMPTY_ROOT(&ci->i_caps)) 675 cap = rb_entry(rb_first(&ci->i_caps), struct ceph_cap, ci_node); 676 if (!cap) { 677 spin_unlock(&inode->i_lock); 678 goto random; 679 } 680 mds = cap->session->s_mds; 681 dout("choose_mds %p %llx.%llx mds%d (%scap %p)\n", 682 inode, ceph_vinop(inode), mds, 683 cap == ci->i_auth_cap ? "auth " : "", cap); 684 spin_unlock(&inode->i_lock); 685 return mds; 686 687 random: 688 mds = ceph_mdsmap_get_random_mds(mdsc->mdsmap); 689 dout("choose_mds chose random mds%d\n", mds); 690 return mds; 691 } 692 693 694 /* 695 * session messages 696 */ 697 static struct ceph_msg *create_session_msg(u32 op, u64 seq) 698 { 699 struct ceph_msg *msg; 700 struct ceph_mds_session_head *h; 701 702 msg = ceph_msg_new(CEPH_MSG_CLIENT_SESSION, sizeof(*h), GFP_NOFS); 703 if (!msg) { 704 pr_err("create_session_msg ENOMEM creating msg\n"); 705 return NULL; 706 } 707 h = msg->front.iov_base; 708 h->op = cpu_to_le32(op); 709 h->seq = cpu_to_le64(seq); 710 return msg; 711 } 712 713 /* 714 * send session open request. 715 * 716 * called under mdsc->mutex 717 */ 718 static int __open_session(struct ceph_mds_client *mdsc, 719 struct ceph_mds_session *session) 720 { 721 struct ceph_msg *msg; 722 int mstate; 723 int mds = session->s_mds; 724 725 /* wait for mds to go active? 
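   (currently we do not wait; the state is only reported in the dout below)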
*/ 726 mstate = ceph_mdsmap_get_state(mdsc->mdsmap, mds); 727 dout("open_session to mds%d (%s)\n", mds, 728 ceph_mds_state_name(mstate)); 729 session->s_state = CEPH_MDS_SESSION_OPENING; 730 session->s_renew_requested = jiffies; 731 732 /* send connect message */ 733 msg = create_session_msg(CEPH_SESSION_REQUEST_OPEN, session->s_seq); 734 if (!msg) 735 return -ENOMEM; 736 ceph_con_send(&session->s_con, msg); 737 return 0; 738 } 739 740 /* 741 * open sessions for any export targets for the given mds 742 * 743 * called under mdsc->mutex 744 */ 745 static void __open_export_target_sessions(struct ceph_mds_client *mdsc, 746 struct ceph_mds_session *session) 747 { 748 struct ceph_mds_info *mi; 749 struct ceph_mds_session *ts; 750 int i, mds = session->s_mds; 751 int target; 752 753 if (mds >= mdsc->mdsmap->m_max_mds) 754 return; 755 mi = &mdsc->mdsmap->m_info[mds]; 756 dout("open_export_target_sessions for mds%d (%d targets)\n", 757 session->s_mds, mi->num_export_targets); 758 759 for (i = 0; i < mi->num_export_targets; i++) { 760 target = mi->export_targets[i]; 761 ts = __ceph_lookup_mds_session(mdsc, target); 762 if (!ts) { 763 ts = register_session(mdsc, target); 764 if (IS_ERR(ts)) 765 return; 766 } 767 if (session->s_state == CEPH_MDS_SESSION_NEW || 768 session->s_state == CEPH_MDS_SESSION_CLOSING) 769 __open_session(mdsc, session); 770 else 771 dout(" mds%d target mds%d %p is %s\n", session->s_mds, 772 i, ts, session_state_name(ts->s_state)); 773 ceph_put_mds_session(ts); 774 } 775 } 776 777 void ceph_mdsc_open_export_target_sessions(struct ceph_mds_client *mdsc, 778 struct ceph_mds_session *session) 779 { 780 mutex_lock(&mdsc->mutex); 781 __open_export_target_sessions(mdsc, session); 782 mutex_unlock(&mdsc->mutex); 783 } 784 785 /* 786 * session caps 787 */ 788 789 /* 790 * Free preallocated cap messages assigned to this session 791 */ 792 static void cleanup_cap_releases(struct ceph_mds_session *session) 793 { 794 struct ceph_msg *msg; 795 796 spin_lock(&session->s_cap_lock); 797 while (!list_empty(&session->s_cap_releases)) { 798 msg = list_first_entry(&session->s_cap_releases, 799 struct ceph_msg, list_head); 800 list_del_init(&msg->list_head); 801 ceph_msg_put(msg); 802 } 803 while (!list_empty(&session->s_cap_releases_done)) { 804 msg = list_first_entry(&session->s_cap_releases_done, 805 struct ceph_msg, list_head); 806 list_del_init(&msg->list_head); 807 ceph_msg_put(msg); 808 } 809 spin_unlock(&session->s_cap_lock); 810 } 811 812 /* 813 * Helper to safely iterate over all caps associated with a session, with 814 * special care taken to handle a racing __ceph_remove_cap(). 815 * 816 * Caller must hold session s_mutex. 
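 * While the callback runs, s_cap_iterator pins the cap being visited so
 * that a racing __ceph_remove_cap() leaves it on the session list; we
 * finish unlinking such a cap below, and drop inode/cap references only
 * after s_cap_lock has been released.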
817 */ 818 static int iterate_session_caps(struct ceph_mds_session *session, 819 int (*cb)(struct inode *, struct ceph_cap *, 820 void *), void *arg) 821 { 822 struct list_head *p; 823 struct ceph_cap *cap; 824 struct inode *inode, *last_inode = NULL; 825 struct ceph_cap *old_cap = NULL; 826 int ret; 827 828 dout("iterate_session_caps %p mds%d\n", session, session->s_mds); 829 spin_lock(&session->s_cap_lock); 830 p = session->s_caps.next; 831 while (p != &session->s_caps) { 832 cap = list_entry(p, struct ceph_cap, session_caps); 833 inode = igrab(&cap->ci->vfs_inode); 834 if (!inode) { 835 p = p->next; 836 continue; 837 } 838 session->s_cap_iterator = cap; 839 spin_unlock(&session->s_cap_lock); 840 841 if (last_inode) { 842 iput(last_inode); 843 last_inode = NULL; 844 } 845 if (old_cap) { 846 ceph_put_cap(session->s_mdsc, old_cap); 847 old_cap = NULL; 848 } 849 850 ret = cb(inode, cap, arg); 851 last_inode = inode; 852 853 spin_lock(&session->s_cap_lock); 854 p = p->next; 855 if (cap->ci == NULL) { 856 dout("iterate_session_caps finishing cap %p removal\n", 857 cap); 858 BUG_ON(cap->session != session); 859 list_del_init(&cap->session_caps); 860 session->s_nr_caps--; 861 cap->session = NULL; 862 old_cap = cap; /* put_cap it w/o locks held */ 863 } 864 if (ret < 0) 865 goto out; 866 } 867 ret = 0; 868 out: 869 session->s_cap_iterator = NULL; 870 spin_unlock(&session->s_cap_lock); 871 872 if (last_inode) 873 iput(last_inode); 874 if (old_cap) 875 ceph_put_cap(session->s_mdsc, old_cap); 876 877 return ret; 878 } 879 880 static int remove_session_caps_cb(struct inode *inode, struct ceph_cap *cap, 881 void *arg) 882 { 883 struct ceph_inode_info *ci = ceph_inode(inode); 884 int drop = 0; 885 886 dout("removing cap %p, ci is %p, inode is %p\n", 887 cap, ci, &ci->vfs_inode); 888 spin_lock(&inode->i_lock); 889 __ceph_remove_cap(cap); 890 if (!__ceph_is_any_real_caps(ci)) { 891 struct ceph_mds_client *mdsc = 892 ceph_sb_to_client(inode->i_sb)->mdsc; 893 894 spin_lock(&mdsc->cap_dirty_lock); 895 if (!list_empty(&ci->i_dirty_item)) { 896 pr_info(" dropping dirty %s state for %p %lld\n", 897 ceph_cap_string(ci->i_dirty_caps), 898 inode, ceph_ino(inode)); 899 ci->i_dirty_caps = 0; 900 list_del_init(&ci->i_dirty_item); 901 drop = 1; 902 } 903 if (!list_empty(&ci->i_flushing_item)) { 904 pr_info(" dropping dirty+flushing %s state for %p %lld\n", 905 ceph_cap_string(ci->i_flushing_caps), 906 inode, ceph_ino(inode)); 907 ci->i_flushing_caps = 0; 908 list_del_init(&ci->i_flushing_item); 909 mdsc->num_cap_flushing--; 910 drop = 1; 911 } 912 if (drop && ci->i_wrbuffer_ref) { 913 pr_info(" dropping dirty data for %p %lld\n", 914 inode, ceph_ino(inode)); 915 ci->i_wrbuffer_ref = 0; 916 ci->i_wrbuffer_ref_head = 0; 917 drop++; 918 } 919 spin_unlock(&mdsc->cap_dirty_lock); 920 } 921 spin_unlock(&inode->i_lock); 922 while (drop--) 923 iput(inode); 924 return 0; 925 } 926 927 /* 928 * caller must hold session s_mutex 929 */ 930 static void remove_session_caps(struct ceph_mds_session *session) 931 { 932 dout("remove_session_caps on %p\n", session); 933 iterate_session_caps(session, remove_session_caps_cb, NULL); 934 BUG_ON(session->s_nr_caps > 0); 935 BUG_ON(!list_empty(&session->s_cap_flushing)); 936 cleanup_cap_releases(session); 937 } 938 939 /* 940 * wake up any threads waiting on this session's caps. if the cap is 941 * old (didn't get renewed on the client reconnect), remove it now. 942 * 943 * caller must hold s_mutex. 
944 */ 945 static int wake_up_session_cb(struct inode *inode, struct ceph_cap *cap, 946 void *arg) 947 { 948 struct ceph_inode_info *ci = ceph_inode(inode); 949 950 wake_up_all(&ci->i_cap_wq); 951 if (arg) { 952 spin_lock(&inode->i_lock); 953 ci->i_wanted_max_size = 0; 954 ci->i_requested_max_size = 0; 955 spin_unlock(&inode->i_lock); 956 } 957 return 0; 958 } 959 960 static void wake_up_session_caps(struct ceph_mds_session *session, 961 int reconnect) 962 { 963 dout("wake_up_session_caps %p mds%d\n", session, session->s_mds); 964 iterate_session_caps(session, wake_up_session_cb, 965 (void *)(unsigned long)reconnect); 966 } 967 968 /* 969 * Send periodic message to MDS renewing all currently held caps. The 970 * ack will reset the expiration for all caps from this session. 971 * 972 * caller holds s_mutex 973 */ 974 static int send_renew_caps(struct ceph_mds_client *mdsc, 975 struct ceph_mds_session *session) 976 { 977 struct ceph_msg *msg; 978 int state; 979 980 if (time_after_eq(jiffies, session->s_cap_ttl) && 981 time_after_eq(session->s_cap_ttl, session->s_renew_requested)) 982 pr_info("mds%d caps stale\n", session->s_mds); 983 session->s_renew_requested = jiffies; 984 985 /* do not try to renew caps until a recovering mds has reconnected 986 * with its clients. */ 987 state = ceph_mdsmap_get_state(mdsc->mdsmap, session->s_mds); 988 if (state < CEPH_MDS_STATE_RECONNECT) { 989 dout("send_renew_caps ignoring mds%d (%s)\n", 990 session->s_mds, ceph_mds_state_name(state)); 991 return 0; 992 } 993 994 dout("send_renew_caps to mds%d (%s)\n", session->s_mds, 995 ceph_mds_state_name(state)); 996 msg = create_session_msg(CEPH_SESSION_REQUEST_RENEWCAPS, 997 ++session->s_renew_seq); 998 if (!msg) 999 return -ENOMEM; 1000 ceph_con_send(&session->s_con, msg); 1001 return 0; 1002 } 1003 1004 /* 1005 * Note new cap ttl, and any transition from stale -> not stale (fresh?). 1006 * 1007 * Called under session->s_mutex 1008 */ 1009 static void renewed_caps(struct ceph_mds_client *mdsc, 1010 struct ceph_mds_session *session, int is_renew) 1011 { 1012 int was_stale; 1013 int wake = 0; 1014 1015 spin_lock(&session->s_cap_lock); 1016 was_stale = is_renew && (session->s_cap_ttl == 0 || 1017 time_after_eq(jiffies, session->s_cap_ttl)); 1018 1019 session->s_cap_ttl = session->s_renew_requested + 1020 mdsc->mdsmap->m_session_timeout*HZ; 1021 1022 if (was_stale) { 1023 if (time_before(jiffies, session->s_cap_ttl)) { 1024 pr_info("mds%d caps renewed\n", session->s_mds); 1025 wake = 1; 1026 } else { 1027 pr_info("mds%d caps still stale\n", session->s_mds); 1028 } 1029 } 1030 dout("renewed_caps mds%d ttl now %lu, was %s, now %s\n", 1031 session->s_mds, session->s_cap_ttl, was_stale ? "stale" : "fresh", 1032 time_before(jiffies, session->s_cap_ttl) ? "stale" : "fresh"); 1033 spin_unlock(&session->s_cap_lock); 1034 1035 if (wake) 1036 wake_up_session_caps(session, 0); 1037 } 1038 1039 /* 1040 * send a session close request 1041 */ 1042 static int request_close_session(struct ceph_mds_client *mdsc, 1043 struct ceph_mds_session *session) 1044 { 1045 struct ceph_msg *msg; 1046 1047 dout("request_close_session mds%d state %s seq %lld\n", 1048 session->s_mds, session_state_name(session->s_state), 1049 session->s_seq); 1050 msg = create_session_msg(CEPH_SESSION_REQUEST_CLOSE, session->s_seq); 1051 if (!msg) 1052 return -ENOMEM; 1053 ceph_con_send(&session->s_con, msg); 1054 return 0; 1055 } 1056 1057 /* 1058 * Called with s_mutex held. 
1059 */ 1060 static int __close_session(struct ceph_mds_client *mdsc, 1061 struct ceph_mds_session *session) 1062 { 1063 if (session->s_state >= CEPH_MDS_SESSION_CLOSING) 1064 return 0; 1065 session->s_state = CEPH_MDS_SESSION_CLOSING; 1066 return request_close_session(mdsc, session); 1067 } 1068 1069 /* 1070 * Trim old(er) caps. 1071 * 1072 * Because we can't cache an inode without one or more caps, we do 1073 * this indirectly: if a cap is unused, we prune its aliases, at which 1074 * point the inode will hopefully get dropped too. 1075 * 1076 * Yes, this is a bit sloppy. Our only real goal here is to respond to 1077 * memory pressure from the MDS, though, so it needn't be perfect. 1078 */ 1079 static int trim_caps_cb(struct inode *inode, struct ceph_cap *cap, void *arg) 1080 { 1081 struct ceph_mds_session *session = arg; 1082 struct ceph_inode_info *ci = ceph_inode(inode); 1083 int used, oissued, mine; 1084 1085 if (session->s_trim_caps <= 0) 1086 return -1; 1087 1088 spin_lock(&inode->i_lock); 1089 mine = cap->issued | cap->implemented; 1090 used = __ceph_caps_used(ci); 1091 oissued = __ceph_caps_issued_other(ci, cap); 1092 1093 dout("trim_caps_cb %p cap %p mine %s oissued %s used %s\n", 1094 inode, cap, ceph_cap_string(mine), ceph_cap_string(oissued), 1095 ceph_cap_string(used)); 1096 if (ci->i_dirty_caps) 1097 goto out; /* dirty caps */ 1098 if ((used & ~oissued) & mine) 1099 goto out; /* we need these caps */ 1100 1101 session->s_trim_caps--; 1102 if (oissued) { 1103 /* we aren't the only cap.. just remove us */ 1104 __ceph_remove_cap(cap); 1105 } else { 1106 /* try to drop referring dentries */ 1107 spin_unlock(&inode->i_lock); 1108 d_prune_aliases(inode); 1109 dout("trim_caps_cb %p cap %p pruned, count now %d\n", 1110 inode, cap, atomic_read(&inode->i_count)); 1111 return 0; 1112 } 1113 1114 out: 1115 spin_unlock(&inode->i_lock); 1116 return 0; 1117 } 1118 1119 /* 1120 * Trim session cap count down to some max number. 1121 */ 1122 static int trim_caps(struct ceph_mds_client *mdsc, 1123 struct ceph_mds_session *session, 1124 int max_caps) 1125 { 1126 int trim_caps = session->s_nr_caps - max_caps; 1127 1128 dout("trim_caps mds%d start: %d / %d, trim %d\n", 1129 session->s_mds, session->s_nr_caps, max_caps, trim_caps); 1130 if (trim_caps > 0) { 1131 session->s_trim_caps = trim_caps; 1132 iterate_session_caps(session, trim_caps_cb, session); 1133 dout("trim_caps mds%d done: %d / %d, trimmed %d\n", 1134 session->s_mds, session->s_nr_caps, max_caps, 1135 trim_caps - session->s_trim_caps); 1136 session->s_trim_caps = 0; 1137 } 1138 return 0; 1139 } 1140 1141 /* 1142 * Allocate cap_release messages. If there is a partially full message 1143 * in the queue, try to allocate enough to cover its remainder, so that 1144 * we can send it immediately. 1145 * 1146 * Called under s_mutex.
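 * Each message holds up to CEPH_CAPS_PER_RELEASE entries;
 * s_num_cap_releases tracks how many preallocated slots remain across
 * the whole s_cap_releases queue.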
1147 */ 1148 int ceph_add_cap_releases(struct ceph_mds_client *mdsc, 1149 struct ceph_mds_session *session) 1150 { 1151 struct ceph_msg *msg, *partial = NULL; 1152 struct ceph_mds_cap_release *head; 1153 int err = -ENOMEM; 1154 int extra = mdsc->fsc->mount_options->cap_release_safety; 1155 int num; 1156 1157 dout("add_cap_releases %p mds%d extra %d\n", session, session->s_mds, 1158 extra); 1159 1160 spin_lock(&session->s_cap_lock); 1161 1162 if (!list_empty(&session->s_cap_releases)) { 1163 msg = list_first_entry(&session->s_cap_releases, 1164 struct ceph_msg, 1165 list_head); 1166 head = msg->front.iov_base; 1167 num = le32_to_cpu(head->num); 1168 if (num) { 1169 dout(" partial %p with (%d/%d)\n", msg, num, 1170 (int)CEPH_CAPS_PER_RELEASE); 1171 extra += CEPH_CAPS_PER_RELEASE - num; 1172 partial = msg; 1173 } 1174 } 1175 while (session->s_num_cap_releases < session->s_nr_caps + extra) { 1176 spin_unlock(&session->s_cap_lock); 1177 msg = ceph_msg_new(CEPH_MSG_CLIENT_CAPRELEASE, PAGE_CACHE_SIZE, 1178 GFP_NOFS); 1179 if (!msg) 1180 goto out_unlocked; 1181 dout("add_cap_releases %p msg %p now %d\n", session, msg, 1182 (int)msg->front.iov_len); 1183 head = msg->front.iov_base; 1184 head->num = cpu_to_le32(0); 1185 msg->front.iov_len = sizeof(*head); 1186 spin_lock(&session->s_cap_lock); 1187 list_add(&msg->list_head, &session->s_cap_releases); 1188 session->s_num_cap_releases += CEPH_CAPS_PER_RELEASE; 1189 } 1190 1191 if (partial) { 1192 head = partial->front.iov_base; 1193 num = le32_to_cpu(head->num); 1194 dout(" queueing partial %p with %d/%d\n", partial, num, 1195 (int)CEPH_CAPS_PER_RELEASE); 1196 list_move_tail(&partial->list_head, 1197 &session->s_cap_releases_done); 1198 session->s_num_cap_releases -= CEPH_CAPS_PER_RELEASE - num; 1199 } 1200 err = 0; 1201 spin_unlock(&session->s_cap_lock); 1202 out_unlocked: 1203 return err; 1204 } 1205 1206 /* 1207 * flush all dirty inode data to disk. 
1208 * 1209 * returns true if we've flushed through want_flush_seq 1210 */ 1211 static int check_cap_flush(struct ceph_mds_client *mdsc, u64 want_flush_seq) 1212 { 1213 int mds, ret = 1; 1214 1215 dout("check_cap_flush want %lld\n", want_flush_seq); 1216 mutex_lock(&mdsc->mutex); 1217 for (mds = 0; ret && mds < mdsc->max_sessions; mds++) { 1218 struct ceph_mds_session *session = mdsc->sessions[mds]; 1219 1220 if (!session) 1221 continue; 1222 get_session(session); 1223 mutex_unlock(&mdsc->mutex); 1224 1225 mutex_lock(&session->s_mutex); 1226 if (!list_empty(&session->s_cap_flushing)) { 1227 struct ceph_inode_info *ci = 1228 list_entry(session->s_cap_flushing.next, 1229 struct ceph_inode_info, 1230 i_flushing_item); 1231 struct inode *inode = &ci->vfs_inode; 1232 1233 spin_lock(&inode->i_lock); 1234 if (ci->i_cap_flush_seq <= want_flush_seq) { 1235 dout("check_cap_flush still flushing %p " 1236 "seq %lld <= %lld to mds%d\n", inode, 1237 ci->i_cap_flush_seq, want_flush_seq, 1238 session->s_mds); 1239 ret = 0; 1240 } 1241 spin_unlock(&inode->i_lock); 1242 } 1243 mutex_unlock(&session->s_mutex); 1244 ceph_put_mds_session(session); 1245 1246 if (!ret) 1247 return ret; 1248 mutex_lock(&mdsc->mutex); 1249 } 1250 1251 mutex_unlock(&mdsc->mutex); 1252 dout("check_cap_flush ok, flushed thru %lld\n", want_flush_seq); 1253 return ret; 1254 } 1255 1256 /* 1257 * called under s_mutex 1258 */ 1259 void ceph_send_cap_releases(struct ceph_mds_client *mdsc, 1260 struct ceph_mds_session *session) 1261 { 1262 struct ceph_msg *msg; 1263 1264 dout("send_cap_releases mds%d\n", session->s_mds); 1265 spin_lock(&session->s_cap_lock); 1266 while (!list_empty(&session->s_cap_releases_done)) { 1267 msg = list_first_entry(&session->s_cap_releases_done, 1268 struct ceph_msg, list_head); 1269 list_del_init(&msg->list_head); 1270 spin_unlock(&session->s_cap_lock); 1271 msg->hdr.front_len = cpu_to_le32(msg->front.iov_len); 1272 dout("send_cap_releases mds%d %p\n", session->s_mds, msg); 1273 ceph_con_send(&session->s_con, msg); 1274 spin_lock(&session->s_cap_lock); 1275 } 1276 spin_unlock(&session->s_cap_lock); 1277 } 1278 1279 static void discard_cap_releases(struct ceph_mds_client *mdsc, 1280 struct ceph_mds_session *session) 1281 { 1282 struct ceph_msg *msg; 1283 struct ceph_mds_cap_release *head; 1284 unsigned num; 1285 1286 dout("discard_cap_releases mds%d\n", session->s_mds); 1287 spin_lock(&session->s_cap_lock); 1288 1289 /* zero out the in-progress message */ 1290 msg = list_first_entry(&session->s_cap_releases, 1291 struct ceph_msg, list_head); 1292 head = msg->front.iov_base; 1293 num = le32_to_cpu(head->num); 1294 dout("discard_cap_releases mds%d %p %u\n", session->s_mds, msg, num); 1295 head->num = cpu_to_le32(0); 1296 session->s_num_cap_releases += num; 1297 1298 /* requeue completed messages */ 1299 while (!list_empty(&session->s_cap_releases_done)) { 1300 msg = list_first_entry(&session->s_cap_releases_done, 1301 struct ceph_msg, list_head); 1302 list_del_init(&msg->list_head); 1303 1304 head = msg->front.iov_base; 1305 num = le32_to_cpu(head->num); 1306 dout("discard_cap_releases mds%d %p %u\n", session->s_mds, msg, 1307 num); 1308 session->s_num_cap_releases += num; 1309 head->num = cpu_to_le32(0); 1310 msg->front.iov_len = sizeof(*head); 1311 list_add(&msg->list_head, &session->s_cap_releases); 1312 } 1313 1314 spin_unlock(&session->s_cap_lock); 1315 } 1316 1317 /* 1318 * requests 1319 */ 1320 1321 /* 1322 * Create an mds request. 
1323 */ 1324 struct ceph_mds_request * 1325 ceph_mdsc_create_request(struct ceph_mds_client *mdsc, int op, int mode) 1326 { 1327 struct ceph_mds_request *req = kzalloc(sizeof(*req), GFP_NOFS); 1328 1329 if (!req) 1330 return ERR_PTR(-ENOMEM); 1331 1332 mutex_init(&req->r_fill_mutex); 1333 req->r_mdsc = mdsc; 1334 req->r_started = jiffies; 1335 req->r_resend_mds = -1; 1336 INIT_LIST_HEAD(&req->r_unsafe_dir_item); 1337 req->r_fmode = -1; 1338 kref_init(&req->r_kref); 1339 INIT_LIST_HEAD(&req->r_wait); 1340 init_completion(&req->r_completion); 1341 init_completion(&req->r_safe_completion); 1342 INIT_LIST_HEAD(&req->r_unsafe_item); 1343 1344 req->r_op = op; 1345 req->r_direct_mode = mode; 1346 return req; 1347 } 1348 1349 /* 1350 * return oldest (lowest) request, tid in request tree, 0 if none. 1351 * 1352 * called under mdsc->mutex. 1353 */ 1354 static struct ceph_mds_request *__get_oldest_req(struct ceph_mds_client *mdsc) 1355 { 1356 if (RB_EMPTY_ROOT(&mdsc->request_tree)) 1357 return NULL; 1358 return rb_entry(rb_first(&mdsc->request_tree), 1359 struct ceph_mds_request, r_node); 1360 } 1361 1362 static u64 __get_oldest_tid(struct ceph_mds_client *mdsc) 1363 { 1364 struct ceph_mds_request *req = __get_oldest_req(mdsc); 1365 1366 if (req) 1367 return req->r_tid; 1368 return 0; 1369 } 1370 1371 /* 1372 * Build a dentry's path. Allocate on heap; caller must kfree. Based 1373 * on build_path_from_dentry in fs/cifs/dir.c. 1374 * 1375 * If @stop_on_nosnap, generate path relative to the first non-snapped 1376 * inode. 1377 * 1378 * Encode hidden .snap dirs as a double /, i.e. 1379 * foo/.snap/bar -> foo//bar 1380 */ 1381 char *ceph_mdsc_build_path(struct dentry *dentry, int *plen, u64 *base, 1382 int stop_on_nosnap) 1383 { 1384 struct dentry *temp; 1385 char *path; 1386 int len, pos; 1387 1388 if (dentry == NULL) 1389 return ERR_PTR(-EINVAL); 1390 1391 retry: 1392 len = 0; 1393 for (temp = dentry; !IS_ROOT(temp);) { 1394 struct inode *inode = temp->d_inode; 1395 if (inode && ceph_snap(inode) == CEPH_SNAPDIR) 1396 len++; /* slash only */ 1397 else if (stop_on_nosnap && inode && 1398 ceph_snap(inode) == CEPH_NOSNAP) 1399 break; 1400 else 1401 len += 1 + temp->d_name.len; 1402 temp = temp->d_parent; 1403 if (temp == NULL) { 1404 pr_err("build_path corrupt dentry %p\n", dentry); 1405 return ERR_PTR(-EINVAL); 1406 } 1407 } 1408 if (len) 1409 len--; /* no leading '/' */ 1410 1411 path = kmalloc(len+1, GFP_NOFS); 1412 if (path == NULL) 1413 return ERR_PTR(-ENOMEM); 1414 pos = len; 1415 path[pos] = 0; /* trailing null */ 1416 for (temp = dentry; !IS_ROOT(temp) && pos != 0; ) { 1417 struct inode *inode = temp->d_inode; 1418 1419 if (inode && ceph_snap(inode) == CEPH_SNAPDIR) { 1420 dout("build_path path+%d: %p SNAPDIR\n", 1421 pos, temp); 1422 } else if (stop_on_nosnap && inode && 1423 ceph_snap(inode) == CEPH_NOSNAP) { 1424 break; 1425 } else { 1426 pos -= temp->d_name.len; 1427 if (pos < 0) 1428 break; 1429 strncpy(path + pos, temp->d_name.name, 1430 temp->d_name.len); 1431 } 1432 if (pos) 1433 path[--pos] = '/'; 1434 temp = temp->d_parent; 1435 if (temp == NULL) { 1436 pr_err("build_path corrupt dentry\n"); 1437 kfree(path); 1438 return ERR_PTR(-EINVAL); 1439 } 1440 } 1441 if (pos != 0) { 1442 pr_err("build_path did not end path lookup where " 1443 "expected, namelen is %d, pos is %d\n", len, pos); 1444 /* presumably this is only possible if racing with a 1445 rename of one of the parent directories (we can not 1446 lock the dentries above us to prevent this, but 1447 retrying should be harmless) */ 1448 
kfree(path); 1449 goto retry; 1450 } 1451 1452 *base = ceph_ino(temp->d_inode); 1453 *plen = len; 1454 dout("build_path on %p %d built %llx '%.*s'\n", 1455 dentry, atomic_read(&dentry->d_count), *base, len, path); 1456 return path; 1457 } 1458 1459 static int build_dentry_path(struct dentry *dentry, 1460 const char **ppath, int *ppathlen, u64 *pino, 1461 int *pfreepath) 1462 { 1463 char *path; 1464 1465 if (ceph_snap(dentry->d_parent->d_inode) == CEPH_NOSNAP) { 1466 *pino = ceph_ino(dentry->d_parent->d_inode); 1467 *ppath = dentry->d_name.name; 1468 *ppathlen = dentry->d_name.len; 1469 return 0; 1470 } 1471 path = ceph_mdsc_build_path(dentry, ppathlen, pino, 1); 1472 if (IS_ERR(path)) 1473 return PTR_ERR(path); 1474 *ppath = path; 1475 *pfreepath = 1; 1476 return 0; 1477 } 1478 1479 static int build_inode_path(struct inode *inode, 1480 const char **ppath, int *ppathlen, u64 *pino, 1481 int *pfreepath) 1482 { 1483 struct dentry *dentry; 1484 char *path; 1485 1486 if (ceph_snap(inode) == CEPH_NOSNAP) { 1487 *pino = ceph_ino(inode); 1488 *ppathlen = 0; 1489 return 0; 1490 } 1491 dentry = d_find_alias(inode); 1492 path = ceph_mdsc_build_path(dentry, ppathlen, pino, 1); 1493 dput(dentry); 1494 if (IS_ERR(path)) 1495 return PTR_ERR(path); 1496 *ppath = path; 1497 *pfreepath = 1; 1498 return 0; 1499 } 1500 1501 /* 1502 * request arguments may be specified via an inode *, a dentry *, or 1503 * an explicit ino+path. 1504 */ 1505 static int set_request_path_attr(struct inode *rinode, struct dentry *rdentry, 1506 const char *rpath, u64 rino, 1507 const char **ppath, int *pathlen, 1508 u64 *ino, int *freepath) 1509 { 1510 int r = 0; 1511 1512 if (rinode) { 1513 r = build_inode_path(rinode, ppath, pathlen, ino, freepath); 1514 dout(" inode %p %llx.%llx\n", rinode, ceph_ino(rinode), 1515 ceph_snap(rinode)); 1516 } else if (rdentry) { 1517 r = build_dentry_path(rdentry, ppath, pathlen, ino, freepath); 1518 dout(" dentry %p %llx/%.*s\n", rdentry, *ino, *pathlen, 1519 *ppath); 1520 } else if (rpath) { 1521 *ino = rino; 1522 *ppath = rpath; 1523 *pathlen = strlen(rpath); 1524 dout(" path %.*s\n", *pathlen, rpath); 1525 } 1526 1527 return r; 1528 } 1529 1530 /* 1531 * called under mdsc->mutex 1532 */ 1533 static struct ceph_msg *create_request_message(struct ceph_mds_client *mdsc, 1534 struct ceph_mds_request *req, 1535 int mds) 1536 { 1537 struct ceph_msg *msg; 1538 struct ceph_mds_request_head *head; 1539 const char *path1 = NULL; 1540 const char *path2 = NULL; 1541 u64 ino1 = 0, ino2 = 0; 1542 int pathlen1 = 0, pathlen2 = 0; 1543 int freepath1 = 0, freepath2 = 0; 1544 int len; 1545 u16 releases; 1546 void *p, *end; 1547 int ret; 1548 1549 ret = set_request_path_attr(req->r_inode, req->r_dentry, 1550 req->r_path1, req->r_ino1.ino, 1551 &path1, &pathlen1, &ino1, &freepath1); 1552 if (ret < 0) { 1553 msg = ERR_PTR(ret); 1554 goto out; 1555 } 1556 1557 ret = set_request_path_attr(NULL, req->r_old_dentry, 1558 req->r_path2, req->r_ino2.ino, 1559 &path2, &pathlen2, &ino2, &freepath2); 1560 if (ret < 0) { 1561 msg = ERR_PTR(ret); 1562 goto out_free1; 1563 } 1564 1565 len = sizeof(*head) + 1566 pathlen1 + pathlen2 + 2*(1 + sizeof(u32) + sizeof(u64)); 1567 1568 /* calculate (max) length for cap releases */ 1569 len += sizeof(struct ceph_mds_request_release) * 1570 (!!req->r_inode_drop + !!req->r_dentry_drop + 1571 !!req->r_old_inode_drop + !!req->r_old_dentry_drop); 1572 if (req->r_dentry_drop) 1573 len += req->r_dentry->d_name.len; 1574 if (req->r_old_dentry_drop) 1575 len += req->r_old_dentry->d_name.len; 1576 
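 /* the front is sized above for the head, both encoded filepaths,
  * and a worst-case set of inode/dentry releases */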
1577 msg = ceph_msg_new(CEPH_MSG_CLIENT_REQUEST, len, GFP_NOFS); 1578 if (!msg) { 1579 msg = ERR_PTR(-ENOMEM); 1580 goto out_free2; 1581 } 1582 1583 msg->hdr.tid = cpu_to_le64(req->r_tid); 1584 1585 head = msg->front.iov_base; 1586 p = msg->front.iov_base + sizeof(*head); 1587 end = msg->front.iov_base + msg->front.iov_len; 1588 1589 head->mdsmap_epoch = cpu_to_le32(mdsc->mdsmap->m_epoch); 1590 head->op = cpu_to_le32(req->r_op); 1591 head->caller_uid = cpu_to_le32(current_fsuid()); 1592 head->caller_gid = cpu_to_le32(current_fsgid()); 1593 head->args = req->r_args; 1594 1595 ceph_encode_filepath(&p, end, ino1, path1); 1596 ceph_encode_filepath(&p, end, ino2, path2); 1597 1598 /* make note of release offset, in case we need to replay */ 1599 req->r_request_release_offset = p - msg->front.iov_base; 1600 1601 /* cap releases */ 1602 releases = 0; 1603 if (req->r_inode_drop) 1604 releases += ceph_encode_inode_release(&p, 1605 req->r_inode ? req->r_inode : req->r_dentry->d_inode, 1606 mds, req->r_inode_drop, req->r_inode_unless, 0); 1607 if (req->r_dentry_drop) 1608 releases += ceph_encode_dentry_release(&p, req->r_dentry, 1609 mds, req->r_dentry_drop, req->r_dentry_unless); 1610 if (req->r_old_dentry_drop) 1611 releases += ceph_encode_dentry_release(&p, req->r_old_dentry, 1612 mds, req->r_old_dentry_drop, req->r_old_dentry_unless); 1613 if (req->r_old_inode_drop) 1614 releases += ceph_encode_inode_release(&p, 1615 req->r_old_dentry->d_inode, 1616 mds, req->r_old_inode_drop, req->r_old_inode_unless, 0); 1617 head->num_releases = cpu_to_le16(releases); 1618 1619 BUG_ON(p > end); 1620 msg->front.iov_len = p - msg->front.iov_base; 1621 msg->hdr.front_len = cpu_to_le32(msg->front.iov_len); 1622 1623 msg->pages = req->r_pages; 1624 msg->nr_pages = req->r_num_pages; 1625 msg->hdr.data_len = cpu_to_le32(req->r_data_len); 1626 msg->hdr.data_off = cpu_to_le16(0); 1627 1628 out_free2: 1629 if (freepath2) 1630 kfree((char *)path2); 1631 out_free1: 1632 if (freepath1) 1633 kfree((char *)path1); 1634 out: 1635 return msg; 1636 } 1637 1638 /* 1639 * called under mdsc->mutex if error, under no mutex if 1640 * success. 1641 */ 1642 static void complete_request(struct ceph_mds_client *mdsc, 1643 struct ceph_mds_request *req) 1644 { 1645 if (req->r_callback) 1646 req->r_callback(mdsc, req); 1647 else 1648 complete_all(&req->r_completion); 1649 } 1650 1651 /* 1652 * called under mdsc->mutex 1653 */ 1654 static int __prepare_send_request(struct ceph_mds_client *mdsc, 1655 struct ceph_mds_request *req, 1656 int mds) 1657 { 1658 struct ceph_mds_request_head *rhead; 1659 struct ceph_msg *msg; 1660 int flags = 0; 1661 1662 req->r_mds = mds; 1663 req->r_attempts++; 1664 if (req->r_inode) { 1665 struct ceph_cap *cap = 1666 ceph_get_cap_for_mds(ceph_inode(req->r_inode), mds); 1667 1668 if (cap) 1669 req->r_sent_on_mseq = cap->mseq; 1670 else 1671 req->r_sent_on_mseq = -1; 1672 } 1673 dout("prepare_send_request %p tid %lld %s (attempt %d)\n", req, 1674 req->r_tid, ceph_mds_op_name(req->r_op), req->r_attempts); 1675 1676 if (req->r_got_unsafe) { 1677 /* 1678 * Replay. Do not regenerate message (and rebuild 1679 * paths, etc.); just use the original message. 1680 * Rebuilding paths will break for renames because 1681 * d_move mangles the src name. 
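 * Instead we reuse the original message and simply truncate it at the
 * release offset recorded when it was first built
 * (r_request_release_offset), dropping any stale cap releases.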
1682 */ 1683 msg = req->r_request; 1684 rhead = msg->front.iov_base; 1685 1686 flags = le32_to_cpu(rhead->flags); 1687 flags |= CEPH_MDS_FLAG_REPLAY; 1688 rhead->flags = cpu_to_le32(flags); 1689 1690 if (req->r_target_inode) 1691 rhead->ino = cpu_to_le64(ceph_ino(req->r_target_inode)); 1692 1693 rhead->num_retry = req->r_attempts - 1; 1694 1695 /* remove cap/dentry releases from message */ 1696 rhead->num_releases = 0; 1697 msg->hdr.front_len = cpu_to_le32(req->r_request_release_offset); 1698 msg->front.iov_len = req->r_request_release_offset; 1699 return 0; 1700 } 1701 1702 if (req->r_request) { 1703 ceph_msg_put(req->r_request); 1704 req->r_request = NULL; 1705 } 1706 msg = create_request_message(mdsc, req, mds); 1707 if (IS_ERR(msg)) { 1708 req->r_err = PTR_ERR(msg); 1709 complete_request(mdsc, req); 1710 return PTR_ERR(msg); 1711 } 1712 req->r_request = msg; 1713 1714 rhead = msg->front.iov_base; 1715 rhead->oldest_client_tid = cpu_to_le64(__get_oldest_tid(mdsc)); 1716 if (req->r_got_unsafe) 1717 flags |= CEPH_MDS_FLAG_REPLAY; 1718 if (req->r_locked_dir) 1719 flags |= CEPH_MDS_FLAG_WANT_DENTRY; 1720 rhead->flags = cpu_to_le32(flags); 1721 rhead->num_fwd = req->r_num_fwd; 1722 rhead->num_retry = req->r_attempts - 1; 1723 rhead->ino = 0; 1724 1725 dout(" r_locked_dir = %p\n", req->r_locked_dir); 1726 return 0; 1727 } 1728 1729 /* 1730 * send request, or put it on the appropriate wait list. 1731 */ 1732 static int __do_request(struct ceph_mds_client *mdsc, 1733 struct ceph_mds_request *req) 1734 { 1735 struct ceph_mds_session *session = NULL; 1736 int mds = -1; 1737 int err = -EAGAIN; 1738 1739 if (req->r_err || req->r_got_result) 1740 goto out; 1741 1742 if (req->r_timeout && 1743 time_after_eq(jiffies, req->r_started + req->r_timeout)) { 1744 dout("do_request timed out\n"); 1745 err = -EIO; 1746 goto finish; 1747 } 1748 1749 mds = __choose_mds(mdsc, req); 1750 if (mds < 0 || 1751 ceph_mdsmap_get_state(mdsc->mdsmap, mds) < CEPH_MDS_STATE_ACTIVE) { 1752 dout("do_request no mds or not active, waiting for map\n"); 1753 list_add(&req->r_wait, &mdsc->waiting_for_map); 1754 goto out; 1755 } 1756 1757 /* get, open session */ 1758 session = __ceph_lookup_mds_session(mdsc, mds); 1759 if (!session) { 1760 session = register_session(mdsc, mds); 1761 if (IS_ERR(session)) { 1762 err = PTR_ERR(session); 1763 goto finish; 1764 } 1765 } 1766 dout("do_request mds%d session %p state %s\n", mds, session, 1767 session_state_name(session->s_state)); 1768 if (session->s_state != CEPH_MDS_SESSION_OPEN && 1769 session->s_state != CEPH_MDS_SESSION_HUNG) { 1770 if (session->s_state == CEPH_MDS_SESSION_NEW || 1771 session->s_state == CEPH_MDS_SESSION_CLOSING) 1772 __open_session(mdsc, session); 1773 list_add(&req->r_wait, &session->s_waiting); 1774 goto out_session; 1775 } 1776 1777 /* send request */ 1778 req->r_session = get_session(session); 1779 req->r_resend_mds = -1; /* forget any previous mds hint */ 1780 1781 if (req->r_request_started == 0) /* note request start time */ 1782 req->r_request_started = jiffies; 1783 1784 err = __prepare_send_request(mdsc, req, mds); 1785 if (!err) { 1786 ceph_msg_get(req->r_request); 1787 ceph_con_send(&session->s_con, req->r_request); 1788 } 1789 1790 out_session: 1791 ceph_put_mds_session(session); 1792 out: 1793 return err; 1794 1795 finish: 1796 req->r_err = err; 1797 complete_request(mdsc, req); 1798 goto out; 1799 } 1800 1801 /* 1802 * called under mdsc->mutex 1803 */ 1804 static void __wake_requests(struct ceph_mds_client *mdsc, 1805 struct list_head *head) 1806 { 
1807 struct ceph_mds_request *req, *nreq; 1808 1809 list_for_each_entry_safe(req, nreq, head, r_wait) { 1810 list_del_init(&req->r_wait); 1811 __do_request(mdsc, req); 1812 } 1813 } 1814 1815 /* 1816 * Wake up threads with requests pending for @mds, so that they can 1817 * resubmit their requests to a possibly different mds. 1818 */ 1819 static void kick_requests(struct ceph_mds_client *mdsc, int mds) 1820 { 1821 struct ceph_mds_request *req; 1822 struct rb_node *p; 1823 1824 dout("kick_requests mds%d\n", mds); 1825 for (p = rb_first(&mdsc->request_tree); p; p = rb_next(p)) { 1826 req = rb_entry(p, struct ceph_mds_request, r_node); 1827 if (req->r_got_unsafe) 1828 continue; 1829 if (req->r_session && 1830 req->r_session->s_mds == mds) { 1831 dout(" kicking tid %llu\n", req->r_tid); 1832 put_request_session(req); 1833 __do_request(mdsc, req); 1834 } 1835 } 1836 } 1837 1838 void ceph_mdsc_submit_request(struct ceph_mds_client *mdsc, 1839 struct ceph_mds_request *req) 1840 { 1841 dout("submit_request on %p\n", req); 1842 mutex_lock(&mdsc->mutex); 1843 __register_request(mdsc, req, NULL); 1844 __do_request(mdsc, req); 1845 mutex_unlock(&mdsc->mutex); 1846 } 1847 1848 /* 1849 * Synchrously perform an mds request. Take care of all of the 1850 * session setup, forwarding, retry details. 1851 */ 1852 int ceph_mdsc_do_request(struct ceph_mds_client *mdsc, 1853 struct inode *dir, 1854 struct ceph_mds_request *req) 1855 { 1856 int err; 1857 1858 dout("do_request on %p\n", req); 1859 1860 /* take CAP_PIN refs for r_inode, r_locked_dir, r_old_dentry */ 1861 if (req->r_inode) 1862 ceph_get_cap_refs(ceph_inode(req->r_inode), CEPH_CAP_PIN); 1863 if (req->r_locked_dir) 1864 ceph_get_cap_refs(ceph_inode(req->r_locked_dir), CEPH_CAP_PIN); 1865 if (req->r_old_dentry) 1866 ceph_get_cap_refs( 1867 ceph_inode(req->r_old_dentry->d_parent->d_inode), 1868 CEPH_CAP_PIN); 1869 1870 /* issue */ 1871 mutex_lock(&mdsc->mutex); 1872 __register_request(mdsc, req, dir); 1873 __do_request(mdsc, req); 1874 1875 if (req->r_err) { 1876 err = req->r_err; 1877 __unregister_request(mdsc, req); 1878 dout("do_request early error %d\n", err); 1879 goto out; 1880 } 1881 1882 /* wait */ 1883 mutex_unlock(&mdsc->mutex); 1884 dout("do_request waiting\n"); 1885 if (req->r_timeout) { 1886 err = (long)wait_for_completion_killable_timeout( 1887 &req->r_completion, req->r_timeout); 1888 if (err == 0) 1889 err = -EIO; 1890 } else { 1891 err = wait_for_completion_killable(&req->r_completion); 1892 } 1893 dout("do_request waited, got %d\n", err); 1894 mutex_lock(&mdsc->mutex); 1895 1896 /* only abort if we didn't race with a real reply */ 1897 if (req->r_got_result) { 1898 err = le32_to_cpu(req->r_reply_info.head->result); 1899 } else if (err < 0) { 1900 dout("aborted request %lld with %d\n", req->r_tid, err); 1901 1902 /* 1903 * ensure we aren't running concurrently with 1904 * ceph_fill_trace or ceph_readdir_prepopulate, which 1905 * rely on locks (dir mutex) held by our caller. 1906 */ 1907 mutex_lock(&req->r_fill_mutex); 1908 req->r_err = err; 1909 req->r_aborted = true; 1910 mutex_unlock(&req->r_fill_mutex); 1911 1912 if (req->r_locked_dir && 1913 (req->r_op & CEPH_MDS_OP_WRITE)) 1914 ceph_invalidate_dir_request(req); 1915 } else { 1916 err = req->r_err; 1917 } 1918 1919 out: 1920 mutex_unlock(&mdsc->mutex); 1921 dout("do_request %p done, result %d\n", req, err); 1922 return err; 1923 } 1924 1925 /* 1926 * Invalidate dir I_COMPLETE, dentry lease state on an aborted MDS 1927 * namespace request. 
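 * After an aborted write operation we no longer know what the MDS may
 * have changed, so drop the cached directory completeness and any
 * related dentry leases and let later lookups go back to the MDS.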
1928 */ 1929 void ceph_invalidate_dir_request(struct ceph_mds_request *req) 1930 { 1931 struct inode *inode = req->r_locked_dir; 1932 struct ceph_inode_info *ci = ceph_inode(inode); 1933 1934 dout("invalidate_dir_request %p (I_COMPLETE, lease(s))\n", inode); 1935 spin_lock(&inode->i_lock); 1936 ci->i_ceph_flags &= ~CEPH_I_COMPLETE; 1937 ci->i_release_count++; 1938 spin_unlock(&inode->i_lock); 1939 1940 if (req->r_dentry) 1941 ceph_invalidate_dentry_lease(req->r_dentry); 1942 if (req->r_old_dentry) 1943 ceph_invalidate_dentry_lease(req->r_old_dentry); 1944 } 1945 1946 /* 1947 * Handle mds reply. 1948 * 1949 * We take the session mutex and parse and process the reply immediately. 1950 * This preserves the logical ordering of replies, capabilities, etc., sent 1951 * by the MDS as they are applied to our local cache. 1952 */ 1953 static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg) 1954 { 1955 struct ceph_mds_client *mdsc = session->s_mdsc; 1956 struct ceph_mds_request *req; 1957 struct ceph_mds_reply_head *head = msg->front.iov_base; 1958 struct ceph_mds_reply_info_parsed *rinfo; /* parsed reply info */ 1959 u64 tid; 1960 int err, result; 1961 int mds = session->s_mds; 1962 1963 if (msg->front.iov_len < sizeof(*head)) { 1964 pr_err("mdsc_handle_reply got corrupt (short) reply\n"); 1965 ceph_msg_dump(msg); 1966 return; 1967 } 1968 1969 /* get request, session */ 1970 tid = le64_to_cpu(msg->hdr.tid); 1971 mutex_lock(&mdsc->mutex); 1972 req = __lookup_request(mdsc, tid); 1973 if (!req) { 1974 dout("handle_reply on unknown tid %llu\n", tid); 1975 mutex_unlock(&mdsc->mutex); 1976 return; 1977 } 1978 dout("handle_reply %p\n", req); 1979 1980 /* correct session? */ 1981 if (req->r_session != session) { 1982 pr_err("mdsc_handle_reply got %llu on session mds%d" 1983 " not mds%d\n", tid, session->s_mds, 1984 req->r_session ? req->r_session->s_mds : -1); 1985 mutex_unlock(&mdsc->mutex); 1986 goto out; 1987 } 1988 1989 /* dup? */ 1990 if ((req->r_got_unsafe && !head->safe) || 1991 (req->r_got_safe && head->safe)) { 1992 pr_warning("got a dup %s reply on %llu from mds%d\n", 1993 head->safe ? 
"safe" : "unsafe", tid, mds); 1994 mutex_unlock(&mdsc->mutex); 1995 goto out; 1996 } 1997 if (req->r_got_safe && !head->safe) { 1998 pr_warning("got unsafe after safe on %llu from mds%d\n", 1999 tid, mds); 2000 mutex_unlock(&mdsc->mutex); 2001 goto out; 2002 } 2003 2004 result = le32_to_cpu(head->result); 2005 2006 /* 2007 * Handle an ESTALE 2008 * if we're not talking to the authority, send to them 2009 * if the authority has changed while we weren't looking, 2010 * send to new authority 2011 * Otherwise we just have to return an ESTALE 2012 */ 2013 if (result == -ESTALE) { 2014 dout("got ESTALE on request %llu", req->r_tid); 2015 if (!req->r_inode) { 2016 /* do nothing; not an authority problem */ 2017 } else if (req->r_direct_mode != USE_AUTH_MDS) { 2018 dout("not using auth, setting for that now"); 2019 req->r_direct_mode = USE_AUTH_MDS; 2020 __do_request(mdsc, req); 2021 mutex_unlock(&mdsc->mutex); 2022 goto out; 2023 } else { 2024 struct ceph_inode_info *ci = ceph_inode(req->r_inode); 2025 struct ceph_cap *cap = 2026 ceph_get_cap_for_mds(ci, req->r_mds);; 2027 2028 dout("already using auth"); 2029 if ((!cap || cap != ci->i_auth_cap) || 2030 (cap->mseq != req->r_sent_on_mseq)) { 2031 dout("but cap changed, so resending"); 2032 __do_request(mdsc, req); 2033 mutex_unlock(&mdsc->mutex); 2034 goto out; 2035 } 2036 } 2037 dout("have to return ESTALE on request %llu", req->r_tid); 2038 } 2039 2040 2041 if (head->safe) { 2042 req->r_got_safe = true; 2043 __unregister_request(mdsc, req); 2044 complete_all(&req->r_safe_completion); 2045 2046 if (req->r_got_unsafe) { 2047 /* 2048 * We already handled the unsafe response, now do the 2049 * cleanup. No need to examine the response; the MDS 2050 * doesn't include any result info in the safe 2051 * response. And even if it did, there is nothing 2052 * useful we could do with a revised return value. 2053 */ 2054 dout("got safe reply %llu, mds%d\n", tid, mds); 2055 list_del_init(&req->r_unsafe_item); 2056 2057 /* last unsafe request during umount? 
*/ 2058 if (mdsc->stopping && !__get_oldest_req(mdsc)) 2059 complete_all(&mdsc->safe_umount_waiters); 2060 mutex_unlock(&mdsc->mutex); 2061 goto out; 2062 } 2063 } else { 2064 req->r_got_unsafe = true; 2065 list_add_tail(&req->r_unsafe_item, &req->r_session->s_unsafe); 2066 } 2067 2068 dout("handle_reply tid %lld result %d\n", tid, result); 2069 rinfo = &req->r_reply_info; 2070 err = parse_reply_info(msg, rinfo); 2071 mutex_unlock(&mdsc->mutex); 2072 2073 mutex_lock(&session->s_mutex); 2074 if (err < 0) { 2075 pr_err("mdsc_handle_reply got corrupt reply mds%d\n", mds); 2076 ceph_msg_dump(msg); 2077 goto out_err; 2078 } 2079 2080 /* snap trace */ 2081 if (rinfo->snapblob_len) { 2082 down_write(&mdsc->snap_rwsem); 2083 ceph_update_snap_trace(mdsc, rinfo->snapblob, 2084 rinfo->snapblob + rinfo->snapblob_len, 2085 le32_to_cpu(head->op) == CEPH_MDS_OP_RMSNAP); 2086 downgrade_write(&mdsc->snap_rwsem); 2087 } else { 2088 down_read(&mdsc->snap_rwsem); 2089 } 2090 2091 /* insert trace into our cache */ 2092 mutex_lock(&req->r_fill_mutex); 2093 err = ceph_fill_trace(mdsc->fsc->sb, req, req->r_session); 2094 if (err == 0) { 2095 if (result == 0 && rinfo->dir_nr) 2096 ceph_readdir_prepopulate(req, req->r_session); 2097 ceph_unreserve_caps(mdsc, &req->r_caps_reservation); 2098 } 2099 mutex_unlock(&req->r_fill_mutex); 2100 2101 up_read(&mdsc->snap_rwsem); 2102 out_err: 2103 mutex_lock(&mdsc->mutex); 2104 if (!req->r_aborted) { 2105 if (err) { 2106 req->r_err = err; 2107 } else { 2108 req->r_reply = msg; 2109 ceph_msg_get(msg); 2110 req->r_got_result = true; 2111 } 2112 } else { 2113 dout("reply arrived after request %lld was aborted\n", tid); 2114 } 2115 mutex_unlock(&mdsc->mutex); 2116 2117 ceph_add_cap_releases(mdsc, req->r_session); 2118 mutex_unlock(&session->s_mutex); 2119 2120 /* kick calling process */ 2121 complete_request(mdsc, req); 2122 out: 2123 ceph_mdsc_put_request(req); 2124 return; 2125 } 2126 2127 2128 2129 /* 2130 * handle mds notification that our request has been forwarded. 2131 */ 2132 static void handle_forward(struct ceph_mds_client *mdsc, 2133 struct ceph_mds_session *session, 2134 struct ceph_msg *msg) 2135 { 2136 struct ceph_mds_request *req; 2137 u64 tid = le64_to_cpu(msg->hdr.tid); 2138 u32 next_mds; 2139 u32 fwd_seq; 2140 int err = -EINVAL; 2141 void *p = msg->front.iov_base; 2142 void *end = p + msg->front.iov_len; 2143 2144 ceph_decode_need(&p, end, 2*sizeof(u32), bad); 2145 next_mds = ceph_decode_32(&p); 2146 fwd_seq = ceph_decode_32(&p); 2147 2148 mutex_lock(&mdsc->mutex); 2149 req = __lookup_request(mdsc, tid); 2150 if (!req) { 2151 dout("forward tid %llu to mds%d - req dne\n", tid, next_mds); 2152 goto out; /* dup reply? */ 2153 } 2154 2155 if (req->r_aborted) { 2156 dout("forward tid %llu aborted, unregistering\n", tid); 2157 __unregister_request(mdsc, req); 2158 } else if (fwd_seq <= req->r_num_fwd) { 2159 dout("forward tid %llu to mds%d - old seq %d <= %d\n", 2160 tid, next_mds, req->r_num_fwd, fwd_seq); 2161 } else { 2162 /* resend. 
forward race not possible; mds would drop */ 2163 dout("forward tid %llu to mds%d (we resend)\n", tid, next_mds); 2164 BUG_ON(req->r_err); 2165 BUG_ON(req->r_got_result); 2166 req->r_num_fwd = fwd_seq; 2167 req->r_resend_mds = next_mds; 2168 put_request_session(req); 2169 __do_request(mdsc, req); 2170 } 2171 ceph_mdsc_put_request(req); 2172 out: 2173 mutex_unlock(&mdsc->mutex); 2174 return; 2175 2176 bad: 2177 pr_err("mdsc_handle_forward decode error err=%d\n", err); 2178 } 2179 2180 /* 2181 * handle a mds session control message 2182 */ 2183 static void handle_session(struct ceph_mds_session *session, 2184 struct ceph_msg *msg) 2185 { 2186 struct ceph_mds_client *mdsc = session->s_mdsc; 2187 u32 op; 2188 u64 seq; 2189 int mds = session->s_mds; 2190 struct ceph_mds_session_head *h = msg->front.iov_base; 2191 int wake = 0; 2192 2193 /* decode */ 2194 if (msg->front.iov_len != sizeof(*h)) 2195 goto bad; 2196 op = le32_to_cpu(h->op); 2197 seq = le64_to_cpu(h->seq); 2198 2199 mutex_lock(&mdsc->mutex); 2200 if (op == CEPH_SESSION_CLOSE) 2201 __unregister_session(mdsc, session); 2202 /* FIXME: this ttl calculation is generous */ 2203 session->s_ttl = jiffies + HZ*mdsc->mdsmap->m_session_autoclose; 2204 mutex_unlock(&mdsc->mutex); 2205 2206 mutex_lock(&session->s_mutex); 2207 2208 dout("handle_session mds%d %s %p state %s seq %llu\n", 2209 mds, ceph_session_op_name(op), session, 2210 session_state_name(session->s_state), seq); 2211 2212 if (session->s_state == CEPH_MDS_SESSION_HUNG) { 2213 session->s_state = CEPH_MDS_SESSION_OPEN; 2214 pr_info("mds%d came back\n", session->s_mds); 2215 } 2216 2217 switch (op) { 2218 case CEPH_SESSION_OPEN: 2219 if (session->s_state == CEPH_MDS_SESSION_RECONNECTING) 2220 pr_info("mds%d reconnect success\n", session->s_mds); 2221 session->s_state = CEPH_MDS_SESSION_OPEN; 2222 renewed_caps(mdsc, session, 0); 2223 wake = 1; 2224 if (mdsc->stopping) 2225 __close_session(mdsc, session); 2226 break; 2227 2228 case CEPH_SESSION_RENEWCAPS: 2229 if (session->s_renew_seq == seq) 2230 renewed_caps(mdsc, session, 1); 2231 break; 2232 2233 case CEPH_SESSION_CLOSE: 2234 if (session->s_state == CEPH_MDS_SESSION_RECONNECTING) 2235 pr_info("mds%d reconnect denied\n", session->s_mds); 2236 remove_session_caps(session); 2237 wake = 1; /* for good measure */ 2238 wake_up_all(&mdsc->session_close_wq); 2239 kick_requests(mdsc, mds); 2240 break; 2241 2242 case CEPH_SESSION_STALE: 2243 pr_info("mds%d caps went stale, renewing\n", 2244 session->s_mds); 2245 spin_lock(&session->s_cap_lock); 2246 session->s_cap_gen++; 2247 session->s_cap_ttl = 0; 2248 spin_unlock(&session->s_cap_lock); 2249 send_renew_caps(mdsc, session); 2250 break; 2251 2252 case CEPH_SESSION_RECALL_STATE: 2253 trim_caps(mdsc, session, le32_to_cpu(h->max_caps)); 2254 break; 2255 2256 default: 2257 pr_err("mdsc_handle_session bad op %d mds%d\n", op, mds); 2258 WARN_ON(1); 2259 } 2260 2261 mutex_unlock(&session->s_mutex); 2262 if (wake) { 2263 mutex_lock(&mdsc->mutex); 2264 __wake_requests(mdsc, &session->s_waiting); 2265 mutex_unlock(&mdsc->mutex); 2266 } 2267 return; 2268 2269 bad: 2270 pr_err("mdsc_handle_session corrupt message mds%d len %d\n", mds, 2271 (int)msg->front.iov_len); 2272 ceph_msg_dump(msg); 2273 return; 2274 } 2275 2276 2277 /* 2278 * called under session->mutex. 
2279 */ 2280 static void replay_unsafe_requests(struct ceph_mds_client *mdsc, 2281 struct ceph_mds_session *session) 2282 { 2283 struct ceph_mds_request *req, *nreq; 2284 int err; 2285 2286 dout("replay_unsafe_requests mds%d\n", session->s_mds); 2287 2288 mutex_lock(&mdsc->mutex); 2289 list_for_each_entry_safe(req, nreq, &session->s_unsafe, r_unsafe_item) { 2290 err = __prepare_send_request(mdsc, req, session->s_mds); 2291 if (!err) { 2292 ceph_msg_get(req->r_request); 2293 ceph_con_send(&session->s_con, req->r_request); 2294 } 2295 } 2296 mutex_unlock(&mdsc->mutex); 2297 } 2298 2299 /* 2300 * Encode information about a cap for a reconnect with the MDS. 2301 */ 2302 static int encode_caps_cb(struct inode *inode, struct ceph_cap *cap, 2303 void *arg) 2304 { 2305 union { 2306 struct ceph_mds_cap_reconnect v2; 2307 struct ceph_mds_cap_reconnect_v1 v1; 2308 } rec; 2309 size_t reclen; 2310 struct ceph_inode_info *ci; 2311 struct ceph_reconnect_state *recon_state = arg; 2312 struct ceph_pagelist *pagelist = recon_state->pagelist; 2313 char *path; 2314 int pathlen, err; 2315 u64 pathbase; 2316 struct dentry *dentry; 2317 2318 ci = cap->ci; 2319 2320 dout(" adding %p ino %llx.%llx cap %p %lld %s\n", 2321 inode, ceph_vinop(inode), cap, cap->cap_id, 2322 ceph_cap_string(cap->issued)); 2323 err = ceph_pagelist_encode_64(pagelist, ceph_ino(inode)); 2324 if (err) 2325 return err; 2326 2327 dentry = d_find_alias(inode); 2328 if (dentry) { 2329 path = ceph_mdsc_build_path(dentry, &pathlen, &pathbase, 0); 2330 if (IS_ERR(path)) { 2331 err = PTR_ERR(path); 2332 goto out_dput; 2333 } 2334 } else { 2335 path = NULL; 2336 pathlen = 0; 2337 } 2338 err = ceph_pagelist_encode_string(pagelist, path, pathlen); 2339 if (err) 2340 goto out_free; 2341 2342 spin_lock(&inode->i_lock); 2343 cap->seq = 0; /* reset cap seq */ 2344 cap->issue_seq = 0; /* and issue_seq */ 2345 2346 if (recon_state->flock) { 2347 rec.v2.cap_id = cpu_to_le64(cap->cap_id); 2348 rec.v2.wanted = cpu_to_le32(__ceph_caps_wanted(ci)); 2349 rec.v2.issued = cpu_to_le32(cap->issued); 2350 rec.v2.snaprealm = cpu_to_le64(ci->i_snap_realm->ino); 2351 rec.v2.pathbase = cpu_to_le64(pathbase); 2352 rec.v2.flock_len = 0; 2353 reclen = sizeof(rec.v2); 2354 } else { 2355 rec.v1.cap_id = cpu_to_le64(cap->cap_id); 2356 rec.v1.wanted = cpu_to_le32(__ceph_caps_wanted(ci)); 2357 rec.v1.issued = cpu_to_le32(cap->issued); 2358 rec.v1.size = cpu_to_le64(inode->i_size); 2359 ceph_encode_timespec(&rec.v1.mtime, &inode->i_mtime); 2360 ceph_encode_timespec(&rec.v1.atime, &inode->i_atime); 2361 rec.v1.snaprealm = cpu_to_le64(ci->i_snap_realm->ino); 2362 rec.v1.pathbase = cpu_to_le64(pathbase); 2363 reclen = sizeof(rec.v1); 2364 } 2365 spin_unlock(&inode->i_lock); 2366 2367 if (recon_state->flock) { 2368 int num_fcntl_locks, num_flock_locks; 2369 struct ceph_pagelist_cursor trunc_point; 2370 2371 ceph_pagelist_set_cursor(pagelist, &trunc_point); 2372 do { 2373 lock_flocks(); 2374 ceph_count_locks(inode, &num_fcntl_locks, 2375 &num_flock_locks); 2376 rec.v2.flock_len = (2*sizeof(u32) + 2377 (num_fcntl_locks+num_flock_locks) * 2378 sizeof(struct ceph_filelock)); 2379 unlock_flocks(); 2380 2381 /* pre-alloc pagelist */ 2382 ceph_pagelist_truncate(pagelist, &trunc_point); 2383 err = ceph_pagelist_append(pagelist, &rec, reclen); 2384 if (!err) 2385 err = ceph_pagelist_reserve(pagelist, 2386 rec.v2.flock_len); 2387 2388 /* encode locks */ 2389 if (!err) { 2390 lock_flocks(); 2391 err = ceph_encode_locks(inode, 2392 pagelist, 2393 num_fcntl_locks, 2394 num_flock_locks); 2395 
unlock_flocks(); 2396 } 2397 } while (err == -ENOSPC); 2398 } else { 2399 err = ceph_pagelist_append(pagelist, &rec, reclen); 2400 } 2401 2402 out_free: 2403 kfree(path); 2404 out_dput: 2405 dput(dentry); 2406 return err; 2407 } 2408 2409 2410 /* 2411 * If an MDS fails and recovers, clients need to reconnect in order to 2412 * reestablish shared state. This includes all caps issued through 2413 * this session _and_ the snap_realm hierarchy. Because it's not 2414 * clear which snap realms the mds cares about, we send everything we 2415 * know about.. that ensures we'll then get any new info the 2416 * recovering MDS might have. 2417 * 2418 * This is a relatively heavyweight operation, but it's rare. 2419 * 2420 * called with mdsc->mutex held. 2421 */ 2422 static void send_mds_reconnect(struct ceph_mds_client *mdsc, 2423 struct ceph_mds_session *session) 2424 { 2425 struct ceph_msg *reply; 2426 struct rb_node *p; 2427 int mds = session->s_mds; 2428 int err = -ENOMEM; 2429 struct ceph_pagelist *pagelist; 2430 struct ceph_reconnect_state recon_state; 2431 2432 pr_info("mds%d reconnect start\n", mds); 2433 2434 pagelist = kmalloc(sizeof(*pagelist), GFP_NOFS); 2435 if (!pagelist) 2436 goto fail_nopagelist; 2437 ceph_pagelist_init(pagelist); 2438 2439 reply = ceph_msg_new(CEPH_MSG_CLIENT_RECONNECT, 0, GFP_NOFS); 2440 if (!reply) 2441 goto fail_nomsg; 2442 2443 mutex_lock(&session->s_mutex); 2444 session->s_state = CEPH_MDS_SESSION_RECONNECTING; 2445 session->s_seq = 0; 2446 2447 ceph_con_open(&session->s_con, 2448 ceph_mdsmap_get_addr(mdsc->mdsmap, mds)); 2449 2450 /* replay unsafe requests */ 2451 replay_unsafe_requests(mdsc, session); 2452 2453 down_read(&mdsc->snap_rwsem); 2454 2455 dout("session %p state %s\n", session, 2456 session_state_name(session->s_state)); 2457 2458 /* drop old cap expires; we're about to reestablish that state */ 2459 discard_cap_releases(mdsc, session); 2460 2461 /* traverse this session's caps */ 2462 err = ceph_pagelist_encode_32(pagelist, session->s_nr_caps); 2463 if (err) 2464 goto fail; 2465 2466 recon_state.pagelist = pagelist; 2467 recon_state.flock = session->s_con.peer_features & CEPH_FEATURE_FLOCK; 2468 err = iterate_session_caps(session, encode_caps_cb, &recon_state); 2469 if (err < 0) 2470 goto fail; 2471 2472 /* 2473 * snaprealms. we provide mds with the ino, seq (version), and 2474 * parent for all of our realms. If the mds has any newer info, 2475 * it will tell us. 
2476 */ 2477 for (p = rb_first(&mdsc->snap_realms); p; p = rb_next(p)) { 2478 struct ceph_snap_realm *realm = 2479 rb_entry(p, struct ceph_snap_realm, node); 2480 struct ceph_mds_snaprealm_reconnect sr_rec; 2481 2482 dout(" adding snap realm %llx seq %lld parent %llx\n", 2483 realm->ino, realm->seq, realm->parent_ino); 2484 sr_rec.ino = cpu_to_le64(realm->ino); 2485 sr_rec.seq = cpu_to_le64(realm->seq); 2486 sr_rec.parent = cpu_to_le64(realm->parent_ino); 2487 err = ceph_pagelist_append(pagelist, &sr_rec, sizeof(sr_rec)); 2488 if (err) 2489 goto fail; 2490 } 2491 2492 reply->pagelist = pagelist; 2493 if (recon_state.flock) 2494 reply->hdr.version = cpu_to_le16(2); 2495 reply->hdr.data_len = cpu_to_le32(pagelist->length); 2496 reply->nr_pages = calc_pages_for(0, pagelist->length); 2497 ceph_con_send(&session->s_con, reply); 2498 2499 mutex_unlock(&session->s_mutex); 2500 2501 mutex_lock(&mdsc->mutex); 2502 __wake_requests(mdsc, &session->s_waiting); 2503 mutex_unlock(&mdsc->mutex); 2504 2505 up_read(&mdsc->snap_rwsem); 2506 return; 2507 2508 fail: 2509 ceph_msg_put(reply); 2510 up_read(&mdsc->snap_rwsem); 2511 mutex_unlock(&session->s_mutex); 2512 fail_nomsg: 2513 ceph_pagelist_release(pagelist); 2514 kfree(pagelist); 2515 fail_nopagelist: 2516 pr_err("error %d preparing reconnect for mds%d\n", err, mds); 2517 return; 2518 } 2519 2520 2521 /* 2522 * compare old and new mdsmaps, kicking requests 2523 * and closing out old connections as necessary 2524 * 2525 * called under mdsc->mutex. 2526 */ 2527 static void check_new_map(struct ceph_mds_client *mdsc, 2528 struct ceph_mdsmap *newmap, 2529 struct ceph_mdsmap *oldmap) 2530 { 2531 int i; 2532 int oldstate, newstate; 2533 struct ceph_mds_session *s; 2534 2535 dout("check_new_map new %u old %u\n", 2536 newmap->m_epoch, oldmap->m_epoch); 2537 2538 for (i = 0; i < oldmap->m_max_mds && i < mdsc->max_sessions; i++) { 2539 if (mdsc->sessions[i] == NULL) 2540 continue; 2541 s = mdsc->sessions[i]; 2542 oldstate = ceph_mdsmap_get_state(oldmap, i); 2543 newstate = ceph_mdsmap_get_state(newmap, i); 2544 2545 dout("check_new_map mds%d state %s%s -> %s%s (session %s)\n", 2546 i, ceph_mds_state_name(oldstate), 2547 ceph_mdsmap_is_laggy(oldmap, i) ? " (laggy)" : "", 2548 ceph_mds_state_name(newstate), 2549 ceph_mdsmap_is_laggy(newmap, i) ? " (laggy)" : "", 2550 session_state_name(s->s_state)); 2551 2552 if (memcmp(ceph_mdsmap_get_addr(oldmap, i), 2553 ceph_mdsmap_get_addr(newmap, i), 2554 sizeof(struct ceph_entity_addr))) { 2555 if (s->s_state == CEPH_MDS_SESSION_OPENING) { 2556 /* the session never opened, just close it 2557 * out now */ 2558 __wake_requests(mdsc, &s->s_waiting); 2559 __unregister_session(mdsc, s); 2560 } else { 2561 /* just close it */ 2562 mutex_unlock(&mdsc->mutex); 2563 mutex_lock(&s->s_mutex); 2564 mutex_lock(&mdsc->mutex); 2565 ceph_con_close(&s->s_con); 2566 mutex_unlock(&s->s_mutex); 2567 s->s_state = CEPH_MDS_SESSION_RESTARTING; 2568 } 2569 2570 /* kick any requests waiting on the recovering mds */ 2571 kick_requests(mdsc, i); 2572 } else if (oldstate == newstate) { 2573 continue; /* nothing new with this mds */ 2574 } 2575 2576 /* 2577 * send reconnect? 2578 */ 2579 if (s->s_state == CEPH_MDS_SESSION_RESTARTING && 2580 newstate >= CEPH_MDS_STATE_RECONNECT) { 2581 mutex_unlock(&mdsc->mutex); 2582 send_mds_reconnect(mdsc, s); 2583 mutex_lock(&mdsc->mutex); 2584 } 2585 2586 /* 2587 * kick request on any mds that has gone active. 
2588 */ 2589 if (oldstate < CEPH_MDS_STATE_ACTIVE && 2590 newstate >= CEPH_MDS_STATE_ACTIVE) { 2591 if (oldstate != CEPH_MDS_STATE_CREATING && 2592 oldstate != CEPH_MDS_STATE_STARTING) 2593 pr_info("mds%d recovery completed\n", s->s_mds); 2594 kick_requests(mdsc, i); 2595 ceph_kick_flushing_caps(mdsc, s); 2596 wake_up_session_caps(s, 1); 2597 } 2598 } 2599 2600 for (i = 0; i < newmap->m_max_mds && i < mdsc->max_sessions; i++) { 2601 s = mdsc->sessions[i]; 2602 if (!s) 2603 continue; 2604 if (!ceph_mdsmap_is_laggy(newmap, i)) 2605 continue; 2606 if (s->s_state == CEPH_MDS_SESSION_OPEN || 2607 s->s_state == CEPH_MDS_SESSION_HUNG || 2608 s->s_state == CEPH_MDS_SESSION_CLOSING) { 2609 dout(" connecting to export targets of laggy mds%d\n", 2610 i); 2611 __open_export_target_sessions(mdsc, s); 2612 } 2613 } 2614 } 2615 2616 2617 2618 /* 2619 * leases 2620 */ 2621 2622 /* 2623 * caller must hold session s_mutex, dentry->d_lock 2624 */ 2625 void __ceph_mdsc_drop_dentry_lease(struct dentry *dentry) 2626 { 2627 struct ceph_dentry_info *di = ceph_dentry(dentry); 2628 2629 ceph_put_mds_session(di->lease_session); 2630 di->lease_session = NULL; 2631 } 2632 2633 static void handle_lease(struct ceph_mds_client *mdsc, 2634 struct ceph_mds_session *session, 2635 struct ceph_msg *msg) 2636 { 2637 struct super_block *sb = mdsc->fsc->sb; 2638 struct inode *inode; 2639 struct ceph_inode_info *ci; 2640 struct dentry *parent, *dentry; 2641 struct ceph_dentry_info *di; 2642 int mds = session->s_mds; 2643 struct ceph_mds_lease *h = msg->front.iov_base; 2644 u32 seq; 2645 struct ceph_vino vino; 2646 int mask; 2647 struct qstr dname; 2648 int release = 0; 2649 2650 dout("handle_lease from mds%d\n", mds); 2651 2652 /* decode */ 2653 if (msg->front.iov_len < sizeof(*h) + sizeof(u32)) 2654 goto bad; 2655 vino.ino = le64_to_cpu(h->ino); 2656 vino.snap = CEPH_NOSNAP; 2657 mask = le16_to_cpu(h->mask); 2658 seq = le32_to_cpu(h->seq); 2659 dname.name = (void *)h + sizeof(*h) + sizeof(u32); 2660 dname.len = msg->front.iov_len - sizeof(*h) - sizeof(u32); 2661 if (dname.len != get_unaligned_le32(h+1)) 2662 goto bad; 2663 2664 mutex_lock(&session->s_mutex); 2665 session->s_seq++; 2666 2667 /* lookup inode */ 2668 inode = ceph_find_inode(sb, vino); 2669 dout("handle_lease %s, mask %d, ino %llx %p %.*s\n", 2670 ceph_lease_op_name(h->action), mask, vino.ino, inode, 2671 dname.len, dname.name); 2672 if (inode == NULL) { 2673 dout("handle_lease no inode %llx\n", vino.ino); 2674 goto release; 2675 } 2676 ci = ceph_inode(inode); 2677 2678 /* dentry */ 2679 parent = d_find_alias(inode); 2680 if (!parent) { 2681 dout("no parent dentry on inode %p\n", inode); 2682 WARN_ON(1); 2683 goto release; /* hrm... 
*/ 2684 } 2685 dname.hash = full_name_hash(dname.name, dname.len); 2686 dentry = d_lookup(parent, &dname); 2687 dput(parent); 2688 if (!dentry) 2689 goto release; 2690 2691 spin_lock(&dentry->d_lock); 2692 di = ceph_dentry(dentry); 2693 switch (h->action) { 2694 case CEPH_MDS_LEASE_REVOKE: 2695 if (di && di->lease_session == session) { 2696 if (ceph_seq_cmp(di->lease_seq, seq) > 0) 2697 h->seq = cpu_to_le32(di->lease_seq); 2698 __ceph_mdsc_drop_dentry_lease(dentry); 2699 } 2700 release = 1; 2701 break; 2702 2703 case CEPH_MDS_LEASE_RENEW: 2704 if (di && di->lease_session == session && 2705 di->lease_gen == session->s_cap_gen && 2706 di->lease_renew_from && 2707 di->lease_renew_after == 0) { 2708 unsigned long duration = 2709 le32_to_cpu(h->duration_ms) * HZ / 1000; 2710 2711 di->lease_seq = seq; 2712 dentry->d_time = di->lease_renew_from + duration; 2713 di->lease_renew_after = di->lease_renew_from + 2714 (duration >> 1); 2715 di->lease_renew_from = 0; 2716 } 2717 break; 2718 } 2719 spin_unlock(&dentry->d_lock); 2720 dput(dentry); 2721 2722 if (!release) 2723 goto out; 2724 2725 release: 2726 /* let's just reuse the same message */ 2727 h->action = CEPH_MDS_LEASE_REVOKE_ACK; 2728 ceph_msg_get(msg); 2729 ceph_con_send(&session->s_con, msg); 2730 2731 out: 2732 iput(inode); 2733 mutex_unlock(&session->s_mutex); 2734 return; 2735 2736 bad: 2737 pr_err("corrupt lease message\n"); 2738 ceph_msg_dump(msg); 2739 } 2740 2741 void ceph_mdsc_lease_send_msg(struct ceph_mds_session *session, 2742 struct inode *inode, 2743 struct dentry *dentry, char action, 2744 u32 seq) 2745 { 2746 struct ceph_msg *msg; 2747 struct ceph_mds_lease *lease; 2748 int len = sizeof(*lease) + sizeof(u32); 2749 int dnamelen = 0; 2750 2751 dout("lease_send_msg inode %p dentry %p %s to mds%d\n", 2752 inode, dentry, ceph_lease_op_name(action), session->s_mds); 2753 dnamelen = dentry->d_name.len; 2754 len += dnamelen; 2755 2756 msg = ceph_msg_new(CEPH_MSG_CLIENT_LEASE, len, GFP_NOFS); 2757 if (!msg) 2758 return; 2759 lease = msg->front.iov_base; 2760 lease->action = action; 2761 lease->mask = cpu_to_le16(1); 2762 lease->ino = cpu_to_le64(ceph_vino(inode).ino); 2763 lease->first = lease->last = cpu_to_le64(ceph_vino(inode).snap); 2764 lease->seq = cpu_to_le32(seq); 2765 put_unaligned_le32(dnamelen, lease + 1); 2766 memcpy((void *)(lease + 1) + 4, dentry->d_name.name, dnamelen); 2767 2768 /* 2769 * if this is a preemptive lease RELEASE, no need to 2770 * flush request stream, since the actual request will 2771 * soon follow. 2772 */ 2773 msg->more_to_follow = (action == CEPH_MDS_LEASE_RELEASE); 2774 2775 ceph_con_send(&session->s_con, msg); 2776 } 2777 2778 /* 2779 * Preemptively release a lease we expect to invalidate anyway. 2780 * Pass @inode always, @dentry is optional. 2781 */ 2782 void ceph_mdsc_lease_release(struct ceph_mds_client *mdsc, struct inode *inode, 2783 struct dentry *dentry, int mask) 2784 { 2785 struct ceph_dentry_info *di; 2786 struct ceph_mds_session *session; 2787 u32 seq; 2788 2789 BUG_ON(inode == NULL); 2790 BUG_ON(dentry == NULL); 2791 BUG_ON(mask == 0); 2792 2793 /* is dentry lease valid? 
*/ 2794 spin_lock(&dentry->d_lock); 2795 di = ceph_dentry(dentry); 2796 if (!di || !di->lease_session || 2797 di->lease_session->s_mds < 0 || 2798 di->lease_gen != di->lease_session->s_cap_gen || 2799 !time_before(jiffies, dentry->d_time)) { 2800 dout("lease_release inode %p dentry %p -- " 2801 "no lease on %d\n", 2802 inode, dentry, mask); 2803 spin_unlock(&dentry->d_lock); 2804 return; 2805 } 2806 2807 /* we do have a lease on this dentry; note mds and seq */ 2808 session = ceph_get_mds_session(di->lease_session); 2809 seq = di->lease_seq; 2810 __ceph_mdsc_drop_dentry_lease(dentry); 2811 spin_unlock(&dentry->d_lock); 2812 2813 dout("lease_release inode %p dentry %p mask %d to mds%d\n", 2814 inode, dentry, mask, session->s_mds); 2815 ceph_mdsc_lease_send_msg(session, inode, dentry, 2816 CEPH_MDS_LEASE_RELEASE, seq); 2817 ceph_put_mds_session(session); 2818 } 2819 2820 /* 2821 * drop all leases (and dentry refs) in preparation for umount 2822 */ 2823 static void drop_leases(struct ceph_mds_client *mdsc) 2824 { 2825 int i; 2826 2827 dout("drop_leases\n"); 2828 mutex_lock(&mdsc->mutex); 2829 for (i = 0; i < mdsc->max_sessions; i++) { 2830 struct ceph_mds_session *s = __ceph_lookup_mds_session(mdsc, i); 2831 if (!s) 2832 continue; 2833 mutex_unlock(&mdsc->mutex); 2834 mutex_lock(&s->s_mutex); 2835 mutex_unlock(&s->s_mutex); 2836 ceph_put_mds_session(s); 2837 mutex_lock(&mdsc->mutex); 2838 } 2839 mutex_unlock(&mdsc->mutex); 2840 } 2841 2842 2843 2844 /* 2845 * delayed work -- periodically trim expired leases, renew caps with mds 2846 */ 2847 static void schedule_delayed(struct ceph_mds_client *mdsc) 2848 { 2849 int delay = 5; 2850 unsigned hz = round_jiffies_relative(HZ * delay); 2851 schedule_delayed_work(&mdsc->delayed_work, hz); 2852 } 2853 2854 static void delayed_work(struct work_struct *work) 2855 { 2856 int i; 2857 struct ceph_mds_client *mdsc = 2858 container_of(work, struct ceph_mds_client, delayed_work.work); 2859 int renew_interval; 2860 int renew_caps; 2861 2862 dout("mdsc delayed_work\n"); 2863 ceph_check_delayed_caps(mdsc); 2864 2865 mutex_lock(&mdsc->mutex); 2866 renew_interval = mdsc->mdsmap->m_session_timeout >> 2; 2867 renew_caps = time_after_eq(jiffies, HZ*renew_interval + 2868 mdsc->last_renew_caps); 2869 if (renew_caps) 2870 mdsc->last_renew_caps = jiffies; 2871 2872 for (i = 0; i < mdsc->max_sessions; i++) { 2873 struct ceph_mds_session *s = __ceph_lookup_mds_session(mdsc, i); 2874 if (s == NULL) 2875 continue; 2876 if (s->s_state == CEPH_MDS_SESSION_CLOSING) { 2877 dout("resending session close request for mds%d\n", 2878 s->s_mds); 2879 request_close_session(mdsc, s); 2880 ceph_put_mds_session(s); 2881 continue; 2882 } 2883 if (s->s_ttl && time_after(jiffies, s->s_ttl)) { 2884 if (s->s_state == CEPH_MDS_SESSION_OPEN) { 2885 s->s_state = CEPH_MDS_SESSION_HUNG; 2886 pr_info("mds%d hung\n", s->s_mds); 2887 } 2888 } 2889 if (s->s_state < CEPH_MDS_SESSION_OPEN) { 2890 /* this mds is failed or recovering, just wait */ 2891 ceph_put_mds_session(s); 2892 continue; 2893 } 2894 mutex_unlock(&mdsc->mutex); 2895 2896 mutex_lock(&s->s_mutex); 2897 if (renew_caps) 2898 send_renew_caps(mdsc, s); 2899 else 2900 ceph_con_keepalive(&s->s_con); 2901 ceph_add_cap_releases(mdsc, s); 2902 if (s->s_state == CEPH_MDS_SESSION_OPEN || 2903 s->s_state == CEPH_MDS_SESSION_HUNG) 2904 ceph_send_cap_releases(mdsc, s); 2905 mutex_unlock(&s->s_mutex); 2906 ceph_put_mds_session(s); 2907 2908 mutex_lock(&mdsc->mutex); 2909 } 2910 mutex_unlock(&mdsc->mutex); 2911 2912 schedule_delayed(mdsc); 2913 } 2914 
2915 int ceph_mdsc_init(struct ceph_fs_client *fsc) 2916 2917 { 2918 struct ceph_mds_client *mdsc; 2919 2920 mdsc = kzalloc(sizeof(struct ceph_mds_client), GFP_NOFS); 2921 if (!mdsc) 2922 return -ENOMEM; 2923 mdsc->fsc = fsc; 2924 fsc->mdsc = mdsc; 2925 mutex_init(&mdsc->mutex); 2926 mdsc->mdsmap = kzalloc(sizeof(*mdsc->mdsmap), GFP_NOFS); 2927 if (mdsc->mdsmap == NULL) 2928 return -ENOMEM; 2929 2930 init_completion(&mdsc->safe_umount_waiters); 2931 init_waitqueue_head(&mdsc->session_close_wq); 2932 INIT_LIST_HEAD(&mdsc->waiting_for_map); 2933 mdsc->sessions = NULL; 2934 mdsc->max_sessions = 0; 2935 mdsc->stopping = 0; 2936 init_rwsem(&mdsc->snap_rwsem); 2937 mdsc->snap_realms = RB_ROOT; 2938 INIT_LIST_HEAD(&mdsc->snap_empty); 2939 spin_lock_init(&mdsc->snap_empty_lock); 2940 mdsc->last_tid = 0; 2941 mdsc->request_tree = RB_ROOT; 2942 INIT_DELAYED_WORK(&mdsc->delayed_work, delayed_work); 2943 mdsc->last_renew_caps = jiffies; 2944 INIT_LIST_HEAD(&mdsc->cap_delay_list); 2945 spin_lock_init(&mdsc->cap_delay_lock); 2946 INIT_LIST_HEAD(&mdsc->snap_flush_list); 2947 spin_lock_init(&mdsc->snap_flush_lock); 2948 mdsc->cap_flush_seq = 0; 2949 INIT_LIST_HEAD(&mdsc->cap_dirty); 2950 mdsc->num_cap_flushing = 0; 2951 spin_lock_init(&mdsc->cap_dirty_lock); 2952 init_waitqueue_head(&mdsc->cap_flushing_wq); 2953 spin_lock_init(&mdsc->dentry_lru_lock); 2954 INIT_LIST_HEAD(&mdsc->dentry_lru); 2955 2956 ceph_caps_init(mdsc); 2957 ceph_adjust_min_caps(mdsc, fsc->min_caps); 2958 2959 return 0; 2960 } 2961 2962 /* 2963 * Wait for safe replies on open mds requests. If we time out, drop 2964 * all requests from the tree to avoid dangling dentry refs. 2965 */ 2966 static void wait_requests(struct ceph_mds_client *mdsc) 2967 { 2968 struct ceph_mds_request *req; 2969 struct ceph_fs_client *fsc = mdsc->fsc; 2970 2971 mutex_lock(&mdsc->mutex); 2972 if (__get_oldest_req(mdsc)) { 2973 mutex_unlock(&mdsc->mutex); 2974 2975 dout("wait_requests waiting for requests\n"); 2976 wait_for_completion_timeout(&mdsc->safe_umount_waiters, 2977 fsc->client->options->mount_timeout * HZ); 2978 2979 /* tear down remaining requests */ 2980 mutex_lock(&mdsc->mutex); 2981 while ((req = __get_oldest_req(mdsc))) { 2982 dout("wait_requests timed out on tid %llu\n", 2983 req->r_tid); 2984 __unregister_request(mdsc, req); 2985 } 2986 } 2987 mutex_unlock(&mdsc->mutex); 2988 dout("wait_requests done\n"); 2989 } 2990 2991 /* 2992 * called before mount is ro, and before dentries are torn down. 2993 * (hmm, does this still race with new lookups?) 2994 */ 2995 void ceph_mdsc_pre_umount(struct ceph_mds_client *mdsc) 2996 { 2997 dout("pre_umount\n"); 2998 mdsc->stopping = 1; 2999 3000 drop_leases(mdsc); 3001 ceph_flush_dirty_caps(mdsc); 3002 wait_requests(mdsc); 3003 3004 /* 3005 * wait for reply handlers to drop their request refs and 3006 * their inode/dcache refs 3007 */ 3008 ceph_msgr_flush(); 3009 } 3010 3011 /* 3012 * wait for all write mds requests to flush. 
3013 */ 3014 static void wait_unsafe_requests(struct ceph_mds_client *mdsc, u64 want_tid) 3015 { 3016 struct ceph_mds_request *req = NULL, *nextreq; 3017 struct rb_node *n; 3018 3019 mutex_lock(&mdsc->mutex); 3020 dout("wait_unsafe_requests want %lld\n", want_tid); 3021 restart: 3022 req = __get_oldest_req(mdsc); 3023 while (req && req->r_tid <= want_tid) { 3024 /* find next request */ 3025 n = rb_next(&req->r_node); 3026 if (n) 3027 nextreq = rb_entry(n, struct ceph_mds_request, r_node); 3028 else 3029 nextreq = NULL; 3030 if ((req->r_op & CEPH_MDS_OP_WRITE)) { 3031 /* write op */ 3032 ceph_mdsc_get_request(req); 3033 if (nextreq) 3034 ceph_mdsc_get_request(nextreq); 3035 mutex_unlock(&mdsc->mutex); 3036 dout("wait_unsafe_requests wait on %llu (want %llu)\n", 3037 req->r_tid, want_tid); 3038 wait_for_completion(&req->r_safe_completion); 3039 mutex_lock(&mdsc->mutex); 3040 ceph_mdsc_put_request(req); 3041 if (!nextreq) 3042 break; /* next dne before, so we're done! */ 3043 if (RB_EMPTY_NODE(&nextreq->r_node)) { 3044 /* next request was removed from tree */ 3045 ceph_mdsc_put_request(nextreq); 3046 goto restart; 3047 } 3048 ceph_mdsc_put_request(nextreq); /* won't go away */ 3049 } 3050 req = nextreq; 3051 } 3052 mutex_unlock(&mdsc->mutex); 3053 dout("wait_unsafe_requests done\n"); 3054 } 3055 3056 void ceph_mdsc_sync(struct ceph_mds_client *mdsc) 3057 { 3058 u64 want_tid, want_flush; 3059 3060 if (mdsc->fsc->mount_state == CEPH_MOUNT_SHUTDOWN) 3061 return; 3062 3063 dout("sync\n"); 3064 mutex_lock(&mdsc->mutex); 3065 want_tid = mdsc->last_tid; 3066 want_flush = mdsc->cap_flush_seq; 3067 mutex_unlock(&mdsc->mutex); 3068 dout("sync want tid %lld flush_seq %lld\n", want_tid, want_flush); 3069 3070 ceph_flush_dirty_caps(mdsc); 3071 3072 wait_unsafe_requests(mdsc, want_tid); 3073 wait_event(mdsc->cap_flushing_wq, check_cap_flush(mdsc, want_flush)); 3074 } 3075 3076 /* 3077 * true if all sessions are closed, or we force unmount 3078 */ 3079 bool done_closing_sessions(struct ceph_mds_client *mdsc) 3080 { 3081 int i, n = 0; 3082 3083 if (mdsc->fsc->mount_state == CEPH_MOUNT_SHUTDOWN) 3084 return true; 3085 3086 mutex_lock(&mdsc->mutex); 3087 for (i = 0; i < mdsc->max_sessions; i++) 3088 if (mdsc->sessions[i]) 3089 n++; 3090 mutex_unlock(&mdsc->mutex); 3091 return n == 0; 3092 } 3093 3094 /* 3095 * called after sb is ro. 
3096 */ 3097 void ceph_mdsc_close_sessions(struct ceph_mds_client *mdsc) 3098 { 3099 struct ceph_mds_session *session; 3100 int i; 3101 struct ceph_fs_client *fsc = mdsc->fsc; 3102 unsigned long timeout = fsc->client->options->mount_timeout * HZ; 3103 3104 dout("close_sessions\n"); 3105 3106 /* close sessions */ 3107 mutex_lock(&mdsc->mutex); 3108 for (i = 0; i < mdsc->max_sessions; i++) { 3109 session = __ceph_lookup_mds_session(mdsc, i); 3110 if (!session) 3111 continue; 3112 mutex_unlock(&mdsc->mutex); 3113 mutex_lock(&session->s_mutex); 3114 __close_session(mdsc, session); 3115 mutex_unlock(&session->s_mutex); 3116 ceph_put_mds_session(session); 3117 mutex_lock(&mdsc->mutex); 3118 } 3119 mutex_unlock(&mdsc->mutex); 3120 3121 dout("waiting for sessions to close\n"); 3122 wait_event_timeout(mdsc->session_close_wq, done_closing_sessions(mdsc), 3123 timeout); 3124 3125 /* tear down remaining sessions */ 3126 mutex_lock(&mdsc->mutex); 3127 for (i = 0; i < mdsc->max_sessions; i++) { 3128 if (mdsc->sessions[i]) { 3129 session = get_session(mdsc->sessions[i]); 3130 __unregister_session(mdsc, session); 3131 mutex_unlock(&mdsc->mutex); 3132 mutex_lock(&session->s_mutex); 3133 remove_session_caps(session); 3134 mutex_unlock(&session->s_mutex); 3135 ceph_put_mds_session(session); 3136 mutex_lock(&mdsc->mutex); 3137 } 3138 } 3139 WARN_ON(!list_empty(&mdsc->cap_delay_list)); 3140 mutex_unlock(&mdsc->mutex); 3141 3142 ceph_cleanup_empty_realms(mdsc); 3143 3144 cancel_delayed_work_sync(&mdsc->delayed_work); /* cancel timer */ 3145 3146 dout("stopped\n"); 3147 } 3148 3149 static void ceph_mdsc_stop(struct ceph_mds_client *mdsc) 3150 { 3151 dout("stop\n"); 3152 cancel_delayed_work_sync(&mdsc->delayed_work); /* cancel timer */ 3153 if (mdsc->mdsmap) 3154 ceph_mdsmap_destroy(mdsc->mdsmap); 3155 kfree(mdsc->sessions); 3156 ceph_caps_finalize(mdsc); 3157 } 3158 3159 void ceph_mdsc_destroy(struct ceph_fs_client *fsc) 3160 { 3161 struct ceph_mds_client *mdsc = fsc->mdsc; 3162 3163 ceph_mdsc_stop(mdsc); 3164 fsc->mdsc = NULL; 3165 kfree(mdsc); 3166 } 3167 3168 3169 /* 3170 * handle mds map update. 3171 */ 3172 void ceph_mdsc_handle_map(struct ceph_mds_client *mdsc, struct ceph_msg *msg) 3173 { 3174 u32 epoch; 3175 u32 maplen; 3176 void *p = msg->front.iov_base; 3177 void *end = p + msg->front.iov_len; 3178 struct ceph_mdsmap *newmap, *oldmap; 3179 struct ceph_fsid fsid; 3180 int err = -EINVAL; 3181 3182 ceph_decode_need(&p, end, sizeof(fsid)+2*sizeof(u32), bad); 3183 ceph_decode_copy(&p, &fsid, sizeof(fsid)); 3184 if (ceph_check_fsid(mdsc->fsc->client, &fsid) < 0) 3185 return; 3186 epoch = ceph_decode_32(&p); 3187 maplen = ceph_decode_32(&p); 3188 dout("handle_map epoch %u len %d\n", epoch, (int)maplen); 3189 3190 /* do we need it? 
*/ 3191 ceph_monc_got_mdsmap(&mdsc->fsc->client->monc, epoch); 3192 mutex_lock(&mdsc->mutex); 3193 if (mdsc->mdsmap && epoch <= mdsc->mdsmap->m_epoch) { 3194 dout("handle_map epoch %u <= our %u\n", 3195 epoch, mdsc->mdsmap->m_epoch); 3196 mutex_unlock(&mdsc->mutex); 3197 return; 3198 } 3199 3200 newmap = ceph_mdsmap_decode(&p, end); 3201 if (IS_ERR(newmap)) { 3202 err = PTR_ERR(newmap); 3203 goto bad_unlock; 3204 } 3205 3206 /* swap into place */ 3207 if (mdsc->mdsmap) { 3208 oldmap = mdsc->mdsmap; 3209 mdsc->mdsmap = newmap; 3210 check_new_map(mdsc, newmap, oldmap); 3211 ceph_mdsmap_destroy(oldmap); 3212 } else { 3213 mdsc->mdsmap = newmap; /* first mds map */ 3214 } 3215 mdsc->fsc->sb->s_maxbytes = mdsc->mdsmap->m_max_file_size; 3216 3217 __wake_requests(mdsc, &mdsc->waiting_for_map); 3218 3219 mutex_unlock(&mdsc->mutex); 3220 schedule_delayed(mdsc); 3221 return; 3222 3223 bad_unlock: 3224 mutex_unlock(&mdsc->mutex); 3225 bad: 3226 pr_err("error decoding mdsmap %d\n", err); 3227 return; 3228 } 3229 3230 static struct ceph_connection *con_get(struct ceph_connection *con) 3231 { 3232 struct ceph_mds_session *s = con->private; 3233 3234 if (get_session(s)) { 3235 dout("mdsc con_get %p ok (%d)\n", s, atomic_read(&s->s_ref)); 3236 return con; 3237 } 3238 dout("mdsc con_get %p FAIL\n", s); 3239 return NULL; 3240 } 3241 3242 static void con_put(struct ceph_connection *con) 3243 { 3244 struct ceph_mds_session *s = con->private; 3245 3246 ceph_put_mds_session(s); 3247 dout("mdsc con_put %p (%d)\n", s, atomic_read(&s->s_ref)); 3248 } 3249 3250 /* 3251 * if the client is unresponsive for long enough, the mds will kill 3252 * the session entirely. 3253 */ 3254 static void peer_reset(struct ceph_connection *con) 3255 { 3256 struct ceph_mds_session *s = con->private; 3257 struct ceph_mds_client *mdsc = s->s_mdsc; 3258 3259 pr_warning("mds%d closed our session\n", s->s_mds); 3260 send_mds_reconnect(mdsc, s); 3261 } 3262 3263 static void dispatch(struct ceph_connection *con, struct ceph_msg *msg) 3264 { 3265 struct ceph_mds_session *s = con->private; 3266 struct ceph_mds_client *mdsc = s->s_mdsc; 3267 int type = le16_to_cpu(msg->hdr.type); 3268 3269 mutex_lock(&mdsc->mutex); 3270 if (__verify_registered_session(mdsc, s) < 0) { 3271 mutex_unlock(&mdsc->mutex); 3272 goto out; 3273 } 3274 mutex_unlock(&mdsc->mutex); 3275 3276 switch (type) { 3277 case CEPH_MSG_MDS_MAP: 3278 ceph_mdsc_handle_map(mdsc, msg); 3279 break; 3280 case CEPH_MSG_CLIENT_SESSION: 3281 handle_session(s, msg); 3282 break; 3283 case CEPH_MSG_CLIENT_REPLY: 3284 handle_reply(s, msg); 3285 break; 3286 case CEPH_MSG_CLIENT_REQUEST_FORWARD: 3287 handle_forward(mdsc, s, msg); 3288 break; 3289 case CEPH_MSG_CLIENT_CAPS: 3290 ceph_handle_caps(s, msg); 3291 break; 3292 case CEPH_MSG_CLIENT_SNAP: 3293 ceph_handle_snap(mdsc, s, msg); 3294 break; 3295 case CEPH_MSG_CLIENT_LEASE: 3296 handle_lease(mdsc, s, msg); 3297 break; 3298 3299 default: 3300 pr_err("received unknown message type %d %s\n", type, 3301 ceph_msg_type_name(type)); 3302 } 3303 out: 3304 ceph_msg_put(msg); 3305 } 3306 3307 /* 3308 * authentication 3309 */ 3310 static int get_authorizer(struct ceph_connection *con, 3311 void **buf, int *len, int *proto, 3312 void **reply_buf, int *reply_len, int force_new) 3313 { 3314 struct ceph_mds_session *s = con->private; 3315 struct ceph_mds_client *mdsc = s->s_mdsc; 3316 struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth; 3317 int ret = 0; 3318 3319 if (force_new && s->s_authorizer) { 3320 ac->ops->destroy_authorizer(ac, 
s->s_authorizer); 3321 s->s_authorizer = NULL; 3322 } 3323 if (s->s_authorizer == NULL) { 3324 if (ac->ops->create_authorizer) { 3325 ret = ac->ops->create_authorizer( 3326 ac, CEPH_ENTITY_TYPE_MDS, 3327 &s->s_authorizer, 3328 &s->s_authorizer_buf, 3329 &s->s_authorizer_buf_len, 3330 &s->s_authorizer_reply_buf, 3331 &s->s_authorizer_reply_buf_len); 3332 if (ret) 3333 return ret; 3334 } 3335 } 3336 3337 *proto = ac->protocol; 3338 *buf = s->s_authorizer_buf; 3339 *len = s->s_authorizer_buf_len; 3340 *reply_buf = s->s_authorizer_reply_buf; 3341 *reply_len = s->s_authorizer_reply_buf_len; 3342 return 0; 3343 } 3344 3345 3346 static int verify_authorizer_reply(struct ceph_connection *con, int len) 3347 { 3348 struct ceph_mds_session *s = con->private; 3349 struct ceph_mds_client *mdsc = s->s_mdsc; 3350 struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth; 3351 3352 return ac->ops->verify_authorizer_reply(ac, s->s_authorizer, len); 3353 } 3354 3355 static int invalidate_authorizer(struct ceph_connection *con) 3356 { 3357 struct ceph_mds_session *s = con->private; 3358 struct ceph_mds_client *mdsc = s->s_mdsc; 3359 struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth; 3360 3361 if (ac->ops->invalidate_authorizer) 3362 ac->ops->invalidate_authorizer(ac, CEPH_ENTITY_TYPE_MDS); 3363 3364 return ceph_monc_validate_auth(&mdsc->fsc->client->monc); 3365 } 3366 3367 static const struct ceph_connection_operations mds_con_ops = { 3368 .get = con_get, 3369 .put = con_put, 3370 .dispatch = dispatch, 3371 .get_authorizer = get_authorizer, 3372 .verify_authorizer_reply = verify_authorizer_reply, 3373 .invalidate_authorizer = invalidate_authorizer, 3374 .peer_reset = peer_reset, 3375 }; 3376 3377 /* eof */ 3378