#include <linux/ceph/ceph_debug.h>

#include <linux/module.h>
#include <linux/types.h>
#include <linux/slab.h>
#include <linux/random.h>
#include <linux/sched.h>

#include <linux/ceph/mon_client.h>
#include <linux/ceph/libceph.h>
#include <linux/ceph/debugfs.h>
#include <linux/ceph/decode.h>
#include <linux/ceph/auth.h>

/*
 * Interact with Ceph monitor cluster.  Handle requests for new map
 * versions, and periodically resend as needed.  Also implement
 * statfs() and umount().
 *
 * A small cluster of Ceph "monitors" are responsible for managing critical
 * cluster configuration and state information.  An odd number (e.g., 3, 5)
 * of cmon daemons use a modified version of the Paxos part-time parliament
 * algorithm to manage the MDS map (mds cluster membership), OSD map, and
 * list of clients who have mounted the file system.
 *
 * We maintain an open, active session with a monitor at all times in order
 * to receive timely MDSMap updates.  We periodically send a keepalive byte
 * on the TCP socket to ensure we detect a failure.  If the connection does
 * break, we randomly hunt for a new monitor.  Once the connection is
 * reestablished, we resend any outstanding requests.
 */

static const struct ceph_connection_operations mon_con_ops;

static int __validate_auth(struct ceph_mon_client *monc);

/*
 * Decode a monmap blob (e.g., during mount).
 */
struct ceph_monmap *ceph_monmap_decode(void *p, void *end)
{
	struct ceph_monmap *m = NULL;
	int i, err = -EINVAL;
	struct ceph_fsid fsid;
	u32 epoch, num_mon;
	u16 version;
	u32 len;

	ceph_decode_32_safe(&p, end, len, bad);
	ceph_decode_need(&p, end, len, bad);

	dout("monmap_decode %p %p len %d\n", p, end, (int)(end-p));

	ceph_decode_16_safe(&p, end, version, bad);

	ceph_decode_need(&p, end, sizeof(fsid) + 2*sizeof(u32), bad);
	ceph_decode_copy(&p, &fsid, sizeof(fsid));
	epoch = ceph_decode_32(&p);

	num_mon = ceph_decode_32(&p);
	ceph_decode_need(&p, end, num_mon*sizeof(m->mon_inst[0]), bad);

	if (num_mon >= CEPH_MAX_MON)
		goto bad;
	m = kmalloc(sizeof(*m) + sizeof(m->mon_inst[0])*num_mon, GFP_NOFS);
	if (m == NULL)
		return ERR_PTR(-ENOMEM);
	m->fsid = fsid;
	m->epoch = epoch;
	m->num_mon = num_mon;
	ceph_decode_copy(&p, m->mon_inst, num_mon*sizeof(m->mon_inst[0]));
	for (i = 0; i < num_mon; i++)
		ceph_decode_addr(&m->mon_inst[i].addr);

	dout("monmap_decode epoch %d, num_mon %d\n", m->epoch,
	     m->num_mon);
	for (i = 0; i < m->num_mon; i++)
		dout("monmap_decode mon%d is %s\n", i,
		     ceph_pr_addr(&m->mon_inst[i].addr.in_addr));
	return m;

bad:
	dout("monmap_decode failed with %d\n", err);
	kfree(m);
	return ERR_PTR(err);
}
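/*
 * For reference, the monmap blob decoded above has the following
 * layout, inferred from the decode calls (a sketch, not an
 * authoritative wire format description):
 *
 *	u32    len       total payload length
 *	u16    version
 *	fsid   (sizeof(struct ceph_fsid) bytes)
 *	u32    epoch
 *	u32    num_mon
 *	struct ceph_entity_inst mon_inst[num_mon]
 */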
/*
 * return true if *addr is included in the monmap.
 */
int ceph_monmap_contains(struct ceph_monmap *m, struct ceph_entity_addr *addr)
{
	int i;

	for (i = 0; i < m->num_mon; i++)
		if (memcmp(addr, &m->mon_inst[i].addr, sizeof(*addr)) == 0)
			return 1;
	return 0;
}

/*
 * Send an auth request.
 */
static void __send_prepared_auth_request(struct ceph_mon_client *monc, int len)
{
	monc->pending_auth = 1;
	monc->m_auth->front.iov_len = len;
	monc->m_auth->hdr.front_len = cpu_to_le32(len);
	ceph_msg_revoke(monc->m_auth);
	ceph_msg_get(monc->m_auth);  /* keep our ref */
	ceph_con_send(&monc->con, monc->m_auth);
}

/*
 * Close monitor session, if any.
 */
static void __close_session(struct ceph_mon_client *monc)
{
	dout("__close_session closing mon%d\n", monc->cur_mon);
	ceph_msg_revoke(monc->m_auth);
	ceph_msg_revoke_incoming(monc->m_auth_reply);
	ceph_msg_revoke(monc->m_subscribe);
	ceph_msg_revoke_incoming(monc->m_subscribe_ack);
	ceph_con_close(&monc->con);
	monc->cur_mon = -1;
	monc->pending_auth = 0;
	ceph_auth_reset(monc->auth);
}

/*
 * Open a session with a (new) monitor.
 */
static int __open_session(struct ceph_mon_client *monc)
{
	char r;
	int ret;

	if (monc->cur_mon < 0) {
		get_random_bytes(&r, 1);
		monc->cur_mon = r % monc->monmap->num_mon;
		dout("open_session num=%d r=%d -> mon%d\n",
		     monc->monmap->num_mon, r, monc->cur_mon);
		monc->sub_sent = 0;
		monc->sub_renew_after = jiffies;  /* i.e., expired */
		monc->want_next_osdmap = !!monc->want_next_osdmap;

		dout("open_session mon%d opening\n", monc->cur_mon);
		ceph_con_open(&monc->con,
			      CEPH_ENTITY_TYPE_MON, monc->cur_mon,
			      &monc->monmap->mon_inst[monc->cur_mon].addr);

		/* initiate authentication handshake */
		ret = ceph_auth_build_hello(monc->auth,
					    monc->m_auth->front.iov_base,
					    monc->m_auth->front_max);
		__send_prepared_auth_request(monc, ret);
	} else {
		dout("open_session mon%d already open\n", monc->cur_mon);
	}
	return 0;
}

static bool __sub_expired(struct ceph_mon_client *monc)
{
	return time_after_eq(jiffies, monc->sub_renew_after);
}

/*
 * Reschedule delayed work timer.
 */
static void __schedule_delayed(struct ceph_mon_client *monc)
{
	unsigned int delay;

	if (monc->cur_mon < 0 || __sub_expired(monc))
		delay = 10 * HZ;
	else
		delay = 20 * HZ;
	dout("__schedule_delayed after %u\n", delay);
	schedule_delayed_work(&monc->delayed_work, delay);
}
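/*
 * The subscribe payload built by __send_subscribe() below is, per the
 * encode calls (a sketch, not an authoritative wire description):
 *
 *	u32 num_entries
 *	num_entries times:
 *		string name  ("osdmap", "mdsmap", or "monmap")
 *		struct ceph_mon_subscribe_item { u64 have; u8 onetime; }
 */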
/*
 * Send subscribe request for mdsmap and/or osdmap.
 */
static void __send_subscribe(struct ceph_mon_client *monc)
{
	dout("__send_subscribe sub_sent=%u exp=%u want_osd=%d\n",
	     (unsigned int)monc->sub_sent, __sub_expired(monc),
	     monc->want_next_osdmap);
	if ((__sub_expired(monc) && !monc->sub_sent) ||
	    monc->want_next_osdmap == 1) {
		struct ceph_msg *msg = monc->m_subscribe;
		struct ceph_mon_subscribe_item *i;
		void *p, *end;
		int num;

		p = msg->front.iov_base;
		end = p + msg->front_max;

		num = 1 + !!monc->want_next_osdmap + !!monc->want_mdsmap;
		ceph_encode_32(&p, num);

		if (monc->want_next_osdmap) {
			dout("__send_subscribe to 'osdmap' %u\n",
			     (unsigned int)monc->have_osdmap);
			ceph_encode_string(&p, end, "osdmap", 6);
			i = p;
			i->have = cpu_to_le64(monc->have_osdmap);
			i->onetime = 1;
			p += sizeof(*i);
			monc->want_next_osdmap = 2;  /* requested */
		}
		if (monc->want_mdsmap) {
			dout("__send_subscribe to 'mdsmap' %u+\n",
			     (unsigned int)monc->have_mdsmap);
			ceph_encode_string(&p, end, "mdsmap", 6);
			i = p;
			i->have = cpu_to_le64(monc->have_mdsmap);
			i->onetime = 0;
			p += sizeof(*i);
		}
		ceph_encode_string(&p, end, "monmap", 6);
		i = p;
		i->have = 0;
		i->onetime = 0;
		p += sizeof(*i);

		msg->front.iov_len = p - msg->front.iov_base;
		msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
		ceph_msg_revoke(msg);
		ceph_con_send(&monc->con, ceph_msg_get(msg));

		monc->sub_sent = jiffies | 1;  /* never 0 */
	}
}

static void handle_subscribe_ack(struct ceph_mon_client *monc,
				 struct ceph_msg *msg)
{
	unsigned int seconds;
	struct ceph_mon_subscribe_ack *h = msg->front.iov_base;

	if (msg->front.iov_len < sizeof(*h))
		goto bad;
	seconds = le32_to_cpu(h->duration);

	mutex_lock(&monc->mutex);
	if (monc->hunting) {
		pr_info("mon%d %s session established\n",
			monc->cur_mon,
			ceph_pr_addr(&monc->con.peer_addr.in_addr));
		monc->hunting = false;
	}
	dout("handle_subscribe_ack after %d seconds\n", seconds);
	monc->sub_renew_after = monc->sub_sent + (seconds >> 1)*HZ - 1;
	monc->sub_sent = 0;
	mutex_unlock(&monc->mutex);
	return;
bad:
	pr_err("got corrupt subscribe-ack msg\n");
	ceph_msg_dump(msg);
}

/*
 * Keep track of which maps we have
 */
int ceph_monc_got_mdsmap(struct ceph_mon_client *monc, u32 got)
{
	mutex_lock(&monc->mutex);
	monc->have_mdsmap = got;
	mutex_unlock(&monc->mutex);
	return 0;
}
EXPORT_SYMBOL(ceph_monc_got_mdsmap);

int ceph_monc_got_osdmap(struct ceph_mon_client *monc, u32 got)
{
	mutex_lock(&monc->mutex);
	monc->have_osdmap = got;
	monc->want_next_osdmap = 0;
	mutex_unlock(&monc->mutex);
	return 0;
}

/*
 * Register interest in the next osdmap
 */
void ceph_monc_request_next_osdmap(struct ceph_mon_client *monc)
{
	dout("request_next_osdmap have %u\n", monc->have_osdmap);
	mutex_lock(&monc->mutex);
	if (!monc->want_next_osdmap)
		monc->want_next_osdmap = 1;
	if (monc->want_next_osdmap < 2)
		__send_subscribe(monc);
	mutex_unlock(&monc->mutex);
}

/*
 * Open a session with the monitor cluster.
 */
int ceph_monc_open_session(struct ceph_mon_client *monc)
{
	mutex_lock(&monc->mutex);
	__open_session(monc);
	__schedule_delayed(monc);
	mutex_unlock(&monc->mutex);
	return 0;
}
EXPORT_SYMBOL(ceph_monc_open_session);
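/*
 * Typical usage of the map-tracking helpers above, as a sketch (the
 * real callers live in the osd and mds clients, which are assumed
 * here):
 *
 *	ceph_monc_request_next_osdmap(monc);     // subscribe to a newer map
 *	...
 *	ceph_monc_got_osdmap(monc, map->epoch);  // record it once decoded
 */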
/*
 * The monitor responds with a mount ack indicating mount success.  The
 * included client ticket allows the client to talk to MDSs and OSDs.
 */
static void ceph_monc_handle_map(struct ceph_mon_client *monc,
				 struct ceph_msg *msg)
{
	struct ceph_client *client = monc->client;
	struct ceph_monmap *monmap = NULL, *old = monc->monmap;
	void *p, *end;

	mutex_lock(&monc->mutex);

	dout("handle_monmap\n");
	p = msg->front.iov_base;
	end = p + msg->front.iov_len;

	monmap = ceph_monmap_decode(p, end);
	if (IS_ERR(monmap)) {
		pr_err("problem decoding monmap, %d\n",
		       (int)PTR_ERR(monmap));
		goto out;
	}

	if (ceph_check_fsid(monc->client, &monmap->fsid) < 0) {
		kfree(monmap);
		goto out;
	}

	client->monc.monmap = monmap;
	kfree(old);

	if (!client->have_fsid) {
		client->have_fsid = true;
		mutex_unlock(&monc->mutex);
		/*
		 * do debugfs initialization without mutex to avoid
		 * creating a locking dependency
		 */
		ceph_debugfs_client_init(client);
		goto out_unlocked;
	}
out:
	mutex_unlock(&monc->mutex);
out_unlocked:
	wake_up_all(&client->auth_wq);
}

/*
 * generic requests (e.g., statfs, poolop)
 */
static struct ceph_mon_generic_request *__lookup_generic_req(
	struct ceph_mon_client *monc, u64 tid)
{
	struct ceph_mon_generic_request *req;
	struct rb_node *n = monc->generic_request_tree.rb_node;

	while (n) {
		req = rb_entry(n, struct ceph_mon_generic_request, node);
		if (tid < req->tid)
			n = n->rb_left;
		else if (tid > req->tid)
			n = n->rb_right;
		else
			return req;
	}
	return NULL;
}

static void __insert_generic_request(struct ceph_mon_client *monc,
				     struct ceph_mon_generic_request *new)
{
	struct rb_node **p = &monc->generic_request_tree.rb_node;
	struct rb_node *parent = NULL;
	struct ceph_mon_generic_request *req = NULL;

	while (*p) {
		parent = *p;
		req = rb_entry(parent, struct ceph_mon_generic_request, node);
		if (new->tid < req->tid)
			p = &(*p)->rb_left;
		else if (new->tid > req->tid)
			p = &(*p)->rb_right;
		else
			BUG();
	}

	rb_link_node(&new->node, parent, p);
	rb_insert_color(&new->node, &monc->generic_request_tree);
}

static void release_generic_request(struct kref *kref)
{
	struct ceph_mon_generic_request *req =
		container_of(kref, struct ceph_mon_generic_request, kref);

	if (req->reply)
		ceph_msg_put(req->reply);
	if (req->request)
		ceph_msg_put(req->request);

	kfree(req);
}

static void put_generic_request(struct ceph_mon_generic_request *req)
{
	kref_put(&req->kref, release_generic_request);
}

static void get_generic_request(struct ceph_mon_generic_request *req)
{
	kref_get(&req->kref);
}

static struct ceph_msg *get_generic_reply(struct ceph_connection *con,
					  struct ceph_msg_header *hdr,
					  int *skip)
{
	struct ceph_mon_client *monc = con->private;
	struct ceph_mon_generic_request *req;
	u64 tid = le64_to_cpu(hdr->tid);
	struct ceph_msg *m;

	mutex_lock(&monc->mutex);
	req = __lookup_generic_req(monc, tid);
	if (!req) {
		dout("get_generic_reply %lld dne\n", tid);
		*skip = 1;
		m = NULL;
	} else {
		dout("get_generic_reply %lld got %p\n", tid, req->reply);
		*skip = 0;
		m = ceph_msg_get(req->reply);
		/*
		 * we don't need to track the connection reading into
		 * this reply because we only have one open connection
		 * at a time, ever.
		 */
	}
	mutex_unlock(&monc->mutex);
	return m;
}
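/*
 * Lifecycle of a generic request, summarized: the caller allocates a
 * ceph_mon_generic_request, attaches request and reply messages, and
 * do_generic_request() below assigns a tid, files the request in the
 * rb-tree, sends it, and sleeps on req->completion until the matching
 * reply handler fills in req->result and completes it.
 */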
static int do_generic_request(struct ceph_mon_client *monc,
			      struct ceph_mon_generic_request *req)
{
	int err;

	/* register request */
	mutex_lock(&monc->mutex);
	req->tid = ++monc->last_tid;
	req->request->hdr.tid = cpu_to_le64(req->tid);
	__insert_generic_request(monc, req);
	monc->num_generic_requests++;
	ceph_con_send(&monc->con, ceph_msg_get(req->request));
	mutex_unlock(&monc->mutex);

	err = wait_for_completion_interruptible(&req->completion);

	mutex_lock(&monc->mutex);
	rb_erase(&req->node, &monc->generic_request_tree);
	monc->num_generic_requests--;
	mutex_unlock(&monc->mutex);

	if (!err)
		err = req->result;
	return err;
}

/*
 * statfs
 */
static void handle_statfs_reply(struct ceph_mon_client *monc,
				struct ceph_msg *msg)
{
	struct ceph_mon_generic_request *req;
	struct ceph_mon_statfs_reply *reply = msg->front.iov_base;
	u64 tid = le64_to_cpu(msg->hdr.tid);

	if (msg->front.iov_len != sizeof(*reply))
		goto bad;
	dout("handle_statfs_reply %p tid %llu\n", msg, tid);

	mutex_lock(&monc->mutex);
	req = __lookup_generic_req(monc, tid);
	if (req) {
		*(struct ceph_statfs *)req->buf = reply->st;
		req->result = 0;
		get_generic_request(req);
	}
	mutex_unlock(&monc->mutex);
	if (req) {
		complete_all(&req->completion);
		put_generic_request(req);
	}
	return;

bad:
	pr_err("corrupt generic reply, tid %llu\n", tid);
	ceph_msg_dump(msg);
}

/*
 * Do a synchronous statfs().
 */
int ceph_monc_do_statfs(struct ceph_mon_client *monc, struct ceph_statfs *buf)
{
	struct ceph_mon_generic_request *req;
	struct ceph_mon_statfs *h;
	int err;

	req = kzalloc(sizeof(*req), GFP_NOFS);
	if (!req)
		return -ENOMEM;

	kref_init(&req->kref);
	req->buf = buf;
	req->buf_len = sizeof(*buf);
	init_completion(&req->completion);

	err = -ENOMEM;
	req->request = ceph_msg_new(CEPH_MSG_STATFS, sizeof(*h), GFP_NOFS,
				    true);
	if (!req->request)
		goto out;
	req->reply = ceph_msg_new(CEPH_MSG_STATFS_REPLY, 1024, GFP_NOFS,
				  true);
	if (!req->reply)
		goto out;

	/* fill out request */
	h = req->request->front.iov_base;
	h->monhdr.have_version = 0;
	h->monhdr.session_mon = cpu_to_le16(-1);
	h->monhdr.session_mon_tid = 0;
	h->fsid = monc->monmap->fsid;

	err = do_generic_request(monc, req);

out:
	kref_put(&req->kref, release_generic_request);
	return err;
}
EXPORT_SYMBOL(ceph_monc_do_statfs);
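/*
 * Example caller, a minimal sketch ("client" is assumed to be a
 * ceph_client the caller already holds):
 *
 *	struct ceph_statfs st;
 *	int err = ceph_monc_do_statfs(&client->monc, &st);
 *
 *	if (!err)
 *		pr_info("cluster: %llu KB total, %llu KB used\n",
 *			le64_to_cpu(st.kb), le64_to_cpu(st.kb_used));
 */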
/*
 * pool ops
 */
static int get_poolop_reply_buf(const char *src, size_t src_len,
				char *dst, size_t dst_len)
{
	u32 buf_len;

	if (src_len != sizeof(u32) + dst_len)
		return -EINVAL;

	buf_len = le32_to_cpu(*(u32 *)src);
	if (buf_len != dst_len)
		return -EINVAL;

	memcpy(dst, src + sizeof(u32), dst_len);
	return 0;
}

static void handle_poolop_reply(struct ceph_mon_client *monc,
				struct ceph_msg *msg)
{
	struct ceph_mon_generic_request *req;
	struct ceph_mon_poolop_reply *reply = msg->front.iov_base;
	u64 tid = le64_to_cpu(msg->hdr.tid);

	if (msg->front.iov_len < sizeof(*reply))
		goto bad;
	dout("handle_poolop_reply %p tid %llu\n", msg, tid);

	mutex_lock(&monc->mutex);
	req = __lookup_generic_req(monc, tid);
	if (req) {
		if (req->buf_len &&
		    get_poolop_reply_buf(msg->front.iov_base + sizeof(*reply),
					 msg->front.iov_len - sizeof(*reply),
					 req->buf, req->buf_len) < 0) {
			mutex_unlock(&monc->mutex);
			goto bad;
		}
		req->result = le32_to_cpu(reply->reply_code);
		get_generic_request(req);
	}
	mutex_unlock(&monc->mutex);
	if (req) {
		complete(&req->completion);
		put_generic_request(req);
	}
	return;

bad:
	pr_err("corrupt generic reply, tid %llu\n", tid);
	ceph_msg_dump(msg);
}

/*
 * Do a synchronous pool op.
 */
int ceph_monc_do_poolop(struct ceph_mon_client *monc, u32 op,
			u32 pool, u64 snapid,
			char *buf, int len)
{
	struct ceph_mon_generic_request *req;
	struct ceph_mon_poolop *h;
	int err;

	req = kzalloc(sizeof(*req), GFP_NOFS);
	if (!req)
		return -ENOMEM;

	kref_init(&req->kref);
	req->buf = buf;
	req->buf_len = len;
	init_completion(&req->completion);

	err = -ENOMEM;
	req->request = ceph_msg_new(CEPH_MSG_POOLOP, sizeof(*h), GFP_NOFS,
				    true);
	if (!req->request)
		goto out;
	req->reply = ceph_msg_new(CEPH_MSG_POOLOP_REPLY, 1024, GFP_NOFS,
				  true);
	if (!req->reply)
		goto out;

	/* fill out request */
	req->request->hdr.version = cpu_to_le16(2);
	h = req->request->front.iov_base;
	h->monhdr.have_version = 0;
	h->monhdr.session_mon = cpu_to_le16(-1);
	h->monhdr.session_mon_tid = 0;
	h->fsid = monc->monmap->fsid;
	h->pool = cpu_to_le32(pool);
	h->op = cpu_to_le32(op);
	h->auid = 0;
	h->snapid = cpu_to_le64(snapid);
	h->name_len = 0;

	err = do_generic_request(monc, req);

out:
	kref_put(&req->kref, release_generic_request);
	return err;
}

int ceph_monc_create_snapid(struct ceph_mon_client *monc,
			    u32 pool, u64 *snapid)
{
	return ceph_monc_do_poolop(monc, POOL_OP_CREATE_UNMANAGED_SNAP,
				   pool, 0, (char *)snapid, sizeof(*snapid));
}
EXPORT_SYMBOL(ceph_monc_create_snapid);

int ceph_monc_delete_snapid(struct ceph_mon_client *monc,
			    u32 pool, u64 snapid)
{
	return ceph_monc_do_poolop(monc, POOL_OP_DELETE_UNMANAGED_SNAP,
				   pool, snapid, 0, 0);
}
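/*
 * Example, as a sketch: allocate and later remove an unmanaged
 * snapshot in pool 0 ("client" is assumed to be a ceph_client the
 * caller already holds):
 *
 *	u64 snapid;
 *	int err = ceph_monc_create_snapid(&client->monc, 0, &snapid);
 *	...
 *	err = ceph_monc_delete_snapid(&client->monc, 0, snapid);
 */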
/*
 * Resend pending generic requests.
 */
static void __resend_generic_request(struct ceph_mon_client *monc)
{
	struct ceph_mon_generic_request *req;
	struct rb_node *p;

	for (p = rb_first(&monc->generic_request_tree); p; p = rb_next(p)) {
		req = rb_entry(p, struct ceph_mon_generic_request, node);
		ceph_msg_revoke(req->request);
		ceph_msg_revoke_incoming(req->reply);
		ceph_con_send(&monc->con, ceph_msg_get(req->request));
	}
}

/*
 * Delayed work.  If we haven't mounted yet, retry.  Otherwise,
 * renew/retry subscription as needed (in case it is timing out, or we
 * got an ENOMEM).  And keep the monitor connection alive.
 */
static void delayed_work(struct work_struct *work)
{
	struct ceph_mon_client *monc =
		container_of(work, struct ceph_mon_client, delayed_work.work);

	dout("monc delayed_work\n");
	mutex_lock(&monc->mutex);
	if (monc->hunting) {
		__close_session(monc);
		__open_session(monc);  /* continue hunting */
	} else {
		ceph_con_keepalive(&monc->con);

		__validate_auth(monc);

		if (monc->auth->ops->is_authenticated(monc->auth))
			__send_subscribe(monc);
	}
	__schedule_delayed(monc);
	mutex_unlock(&monc->mutex);
}

/*
 * On startup, we build a temporary monmap populated with the IPs
 * provided by mount(2).
 */
static int build_initial_monmap(struct ceph_mon_client *monc)
{
	struct ceph_options *opt = monc->client->options;
	struct ceph_entity_addr *mon_addr = opt->mon_addr;
	int num_mon = opt->num_mon;
	int i;

	/* build initial monmap */
	monc->monmap = kzalloc(sizeof(*monc->monmap) +
			       num_mon*sizeof(monc->monmap->mon_inst[0]),
			       GFP_KERNEL);
	if (!monc->monmap)
		return -ENOMEM;
	for (i = 0; i < num_mon; i++) {
		monc->monmap->mon_inst[i].addr = mon_addr[i];
		monc->monmap->mon_inst[i].addr.nonce = 0;
		monc->monmap->mon_inst[i].name.type =
			CEPH_ENTITY_TYPE_MON;
		monc->monmap->mon_inst[i].name.num = cpu_to_le64(i);
	}
	monc->monmap->num_mon = num_mon;
	monc->have_fsid = false;
	return 0;
}

int ceph_monc_init(struct ceph_mon_client *monc, struct ceph_client *cl)
{
	int err = 0;

	dout("init\n");
	memset(monc, 0, sizeof(*monc));
	monc->client = cl;
	monc->monmap = NULL;
	mutex_init(&monc->mutex);

	err = build_initial_monmap(monc);
	if (err)
		goto out;

	/* authentication */
	monc->auth = ceph_auth_init(cl->options->name,
				    cl->options->key);
	if (IS_ERR(monc->auth)) {
		err = PTR_ERR(monc->auth);
		goto out_monmap;
	}
	monc->auth->want_keys =
		CEPH_ENTITY_TYPE_AUTH | CEPH_ENTITY_TYPE_MON |
		CEPH_ENTITY_TYPE_OSD | CEPH_ENTITY_TYPE_MDS;

	/* msgs */
	err = -ENOMEM;
	monc->m_subscribe_ack = ceph_msg_new(CEPH_MSG_MON_SUBSCRIBE_ACK,
				     sizeof(struct ceph_mon_subscribe_ack),
				     GFP_NOFS, true);
	if (!monc->m_subscribe_ack)
		goto out_auth;

	monc->m_subscribe = ceph_msg_new(CEPH_MSG_MON_SUBSCRIBE, 96, GFP_NOFS,
					 true);
	if (!monc->m_subscribe)
		goto out_subscribe_ack;

	monc->m_auth_reply = ceph_msg_new(CEPH_MSG_AUTH_REPLY, 4096, GFP_NOFS,
					  true);
	if (!monc->m_auth_reply)
		goto out_subscribe;

	monc->m_auth = ceph_msg_new(CEPH_MSG_AUTH, 4096, GFP_NOFS, true);
	monc->pending_auth = 0;
	if (!monc->m_auth)
		goto out_auth_reply;

	ceph_con_init(&monc->con, monc, &mon_con_ops,
		      &monc->client->msgr);

	monc->cur_mon = -1;
	monc->hunting = true;
	monc->sub_renew_after = jiffies;
	monc->sub_sent = 0;

	INIT_DELAYED_WORK(&monc->delayed_work, delayed_work);
	monc->generic_request_tree = RB_ROOT;
	monc->num_generic_requests = 0;
	monc->last_tid = 0;

	monc->have_mdsmap = 0;
	monc->have_osdmap = 0;
	monc->want_next_osdmap = 1;
	return 0;

out_auth_reply:
	ceph_msg_put(monc->m_auth_reply);
out_subscribe:
	ceph_msg_put(monc->m_subscribe);
out_subscribe_ack:
	ceph_msg_put(monc->m_subscribe_ack);
out_auth:
	ceph_auth_destroy(monc->auth);
out_monmap:
	kfree(monc->monmap);
out:
	return err;
}
EXPORT_SYMBOL(ceph_monc_init);
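/*
 * Init/teardown pairing, as a sketch of how a libceph user drives this
 * client (error handling elided):
 *
 *	err = ceph_monc_init(&client->monc, client);
 *	err = ceph_monc_open_session(&client->monc);
 *	...
 *	ceph_monc_stop(&client->monc);
 */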
void ceph_monc_stop(struct ceph_mon_client *monc)
{
	dout("stop\n");
	cancel_delayed_work_sync(&monc->delayed_work);

	mutex_lock(&monc->mutex);
	__close_session(monc);
	mutex_unlock(&monc->mutex);

	/*
	 * flush msgr queue before we destroy ourselves to ensure that:
	 *  - any work that references our embedded con is finished.
	 *  - any osd_client or other work that may reference an
	 *    authorizer finishes before we shut down the auth subsystem.
	 */
	ceph_msgr_flush();

	ceph_auth_destroy(monc->auth);

	ceph_msg_put(monc->m_auth);
	ceph_msg_put(monc->m_auth_reply);
	ceph_msg_put(monc->m_subscribe);
	ceph_msg_put(monc->m_subscribe_ack);

	kfree(monc->monmap);
}
EXPORT_SYMBOL(ceph_monc_stop);
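/*
 * Auth handshake flow, summarized: __open_session() sends a hello
 * built by ceph_auth_build_hello(); handle_auth_reply() below feeds
 * each CEPH_MSG_AUTH_REPLY to ceph_handle_auth_reply(), which returns
 * either another request to send (ret > 0), an error (ret < 0), or
 * success (ret == 0), at which point we name ourselves with the
 * assigned global_id, subscribe, and resend pending generic requests.
 */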
static void handle_auth_reply(struct ceph_mon_client *monc,
			      struct ceph_msg *msg)
{
	int ret;
	int was_auth = 0;

	mutex_lock(&monc->mutex);
	if (monc->auth->ops)
		was_auth = monc->auth->ops->is_authenticated(monc->auth);
	monc->pending_auth = 0;
	ret = ceph_handle_auth_reply(monc->auth, msg->front.iov_base,
				     msg->front.iov_len,
				     monc->m_auth->front.iov_base,
				     monc->m_auth->front_max);
	if (ret < 0) {
		monc->client->auth_err = ret;
		wake_up_all(&monc->client->auth_wq);
	} else if (ret > 0) {
		__send_prepared_auth_request(monc, ret);
	} else if (!was_auth && monc->auth->ops->is_authenticated(monc->auth)) {
		dout("authenticated, starting session\n");

		monc->client->msgr.inst.name.type = CEPH_ENTITY_TYPE_CLIENT;
		monc->client->msgr.inst.name.num =
					cpu_to_le64(monc->auth->global_id);

		__send_subscribe(monc);
		__resend_generic_request(monc);
	}
	mutex_unlock(&monc->mutex);
}

static int __validate_auth(struct ceph_mon_client *monc)
{
	int ret;

	if (monc->pending_auth)
		return 0;

	ret = ceph_build_auth(monc->auth, monc->m_auth->front.iov_base,
			      monc->m_auth->front_max);
	if (ret <= 0)
		return ret;  /* either an error, or no need to authenticate */
	__send_prepared_auth_request(monc, ret);
	return 0;
}

int ceph_monc_validate_auth(struct ceph_mon_client *monc)
{
	int ret;

	mutex_lock(&monc->mutex);
	ret = __validate_auth(monc);
	mutex_unlock(&monc->mutex);
	return ret;
}
EXPORT_SYMBOL(ceph_monc_validate_auth);

/*
 * handle incoming message
 */
static void dispatch(struct ceph_connection *con, struct ceph_msg *msg)
{
	struct ceph_mon_client *monc = con->private;
	int type = le16_to_cpu(msg->hdr.type);

	if (!monc)
		return;

	switch (type) {
	case CEPH_MSG_AUTH_REPLY:
		handle_auth_reply(monc, msg);
		break;

	case CEPH_MSG_MON_SUBSCRIBE_ACK:
		handle_subscribe_ack(monc, msg);
		break;

	case CEPH_MSG_STATFS_REPLY:
		handle_statfs_reply(monc, msg);
		break;

	case CEPH_MSG_POOLOP_REPLY:
		handle_poolop_reply(monc, msg);
		break;

	case CEPH_MSG_MON_MAP:
		ceph_monc_handle_map(monc, msg);
		break;

	case CEPH_MSG_OSD_MAP:
		ceph_osdc_handle_map(&monc->client->osdc, msg);
		break;

	default:
		/* can the chained handler handle it? */
		if (monc->client->extra_mon_dispatch &&
		    monc->client->extra_mon_dispatch(monc->client, msg) == 0)
			break;

		pr_err("received unknown message type %d %s\n", type,
		       ceph_msg_type_name(type));
	}
	ceph_msg_put(msg);
}

/*
 * Allocate memory for incoming message
 */
static struct ceph_msg *mon_alloc_msg(struct ceph_connection *con,
				      struct ceph_msg_header *hdr,
				      int *skip)
{
	struct ceph_mon_client *monc = con->private;
	int type = le16_to_cpu(hdr->type);
	int front_len = le32_to_cpu(hdr->front_len);
	struct ceph_msg *m = NULL;

	*skip = 0;

	switch (type) {
	case CEPH_MSG_MON_SUBSCRIBE_ACK:
		m = ceph_msg_get(monc->m_subscribe_ack);
		break;
	case CEPH_MSG_POOLOP_REPLY:
	case CEPH_MSG_STATFS_REPLY:
		return get_generic_reply(con, hdr, skip);
	case CEPH_MSG_AUTH_REPLY:
		m = ceph_msg_get(monc->m_auth_reply);
		break;
	case CEPH_MSG_MON_MAP:
	case CEPH_MSG_MDS_MAP:
	case CEPH_MSG_OSD_MAP:
		m = ceph_msg_new(type, front_len, GFP_NOFS, false);
		if (!m)
			return NULL;	/* ENOMEM--return skip == 0 */
		break;
	}

	if (!m) {
		pr_info("alloc_msg unknown type %d\n", type);
		*skip = 1;
	}
	return m;
}

/*
 * If the monitor connection resets, pick a new monitor and resubmit
 * any pending requests.
 */
static void mon_fault(struct ceph_connection *con)
{
	struct ceph_mon_client *monc = con->private;

	if (!monc)
		return;

	dout("mon_fault\n");
	mutex_lock(&monc->mutex);
	if (!con->private)
		goto out;

	if (!monc->hunting)
		pr_info("mon%d %s session lost, hunting for new mon\n",
			monc->cur_mon,
			ceph_pr_addr(&monc->con.peer_addr.in_addr));

	__close_session(monc);
	if (!monc->hunting) {
		/* start hunting */
		monc->hunting = true;
		__open_session(monc);
	} else {
		/* already hunting, let's wait a bit */
		__schedule_delayed(monc);
	}
out:
	mutex_unlock(&monc->mutex);
}

/*
 * We can ignore refcounting on the connection struct, as all references
 * will come from the messenger workqueue, which is drained prior to
 * mon_client destruction.
 */
static struct ceph_connection *con_get(struct ceph_connection *con)
{
	return con;
}

static void con_put(struct ceph_connection *con)
{
}

static const struct ceph_connection_operations mon_con_ops = {
	.get = con_get,
	.put = con_put,
	.dispatch = dispatch,
	.fault = mon_fault,
	.alloc_msg = mon_alloc_msg,
};