// SPDX-License-Identifier: GPL-2.0
#include <linux/ceph/ceph_debug.h>

#include <linux/module.h>
#include <linux/types.h>
#include <linux/slab.h>
#include <linux/random.h>
#include <linux/sched.h>

#include <linux/ceph/ceph_features.h>
#include <linux/ceph/mon_client.h>
#include <linux/ceph/libceph.h>
#include <linux/ceph/debugfs.h>
#include <linux/ceph/decode.h>
#include <linux/ceph/auth.h>

/*
 * Interact with Ceph monitor cluster.  Handle requests for new map
 * versions, and periodically resend as needed.  Also implement
 * statfs() and other generic monitor requests (mon_get_version,
 * mon_command).
 *
 * A small cluster of Ceph "monitors" are responsible for managing critical
 * cluster configuration and state information.  An odd number (e.g., 3, 5)
 * of ceph-mon daemons use a modified version of the Paxos part-time
 * parliament algorithm to manage the MDS map (mds cluster membership),
 * OSD map, and list of clients who have mounted the file system.
 *
 * We maintain an open, active session with a monitor at all times in order to
 * receive timely MDSMap updates.  We periodically send a keepalive byte on the
 * TCP socket to ensure we detect a failure.  If the connection does break, we
 * randomly hunt for a new monitor.  Once the connection is reestablished, we
 * resend any outstanding requests.
 */

static const struct ceph_connection_operations mon_con_ops;

static int __validate_auth(struct ceph_mon_client *monc);

static int decode_mon_info(void **p, void *end, bool msgr2,
			   struct ceph_entity_addr *addr)
{
	void *mon_info_end;
	u32 struct_len;
	u8 struct_v;
	int ret;

	ret = ceph_start_decoding(p, end, 1, "mon_info_t", &struct_v,
				  &struct_len);
	if (ret)
		return ret;

	mon_info_end = *p + struct_len;
	ceph_decode_skip_string(p, end, e_inval);	/* skip mon name */
	ret = ceph_decode_entity_addrvec(p, end, msgr2, addr);
	if (ret)
		return ret;

	*p = mon_info_end;
	return 0;

e_inval:
	return -EINVAL;
}
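/*
 * Rough sketch of the monmap encoding that ceph_monmap_decode() below
 * consumes, derived from the decode calls (field order only; this is
 * an informal aid, not an authoritative wire spec):
 *
 *	u32    blob_len
 *	struct_v/struct_compat/struct_len   ("monmap", v6 handled here)
 *	fsid   (16 bytes)
 *	u32    epoch
 *	if struct_v >= 6:
 *		ceph_timespec  last_changed
 *		ceph_timespec  created
 *		mon_feature_t  persistent_features  (skipped)
 *		mon_feature_t  optional_features    (skipped)
 *	u32    num_mon
 *	num_mon * { string mon_name, mon_info_t (v6+) or entity_addr }
 */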
/*
 * Decode a monmap blob (e.g., during mount).
 *
 * Assume MonMap v3 (i.e. encoding with MONNAMES and MONENC).
 */
static struct ceph_monmap *ceph_monmap_decode(void **p, void *end, bool msgr2)
{
	struct ceph_monmap *monmap = NULL;
	struct ceph_fsid fsid;
	u32 struct_len;
	int blob_len;
	int num_mon;
	u8 struct_v;
	u32 epoch;
	int ret;
	int i;

	ceph_decode_32_safe(p, end, blob_len, e_inval);
	ceph_decode_need(p, end, blob_len, e_inval);

	ret = ceph_start_decoding(p, end, 6, "monmap", &struct_v, &struct_len);
	if (ret)
		goto fail;

	dout("%s struct_v %d\n", __func__, struct_v);
	ceph_decode_copy_safe(p, end, &fsid, sizeof(fsid), e_inval);
	ceph_decode_32_safe(p, end, epoch, e_inval);
	if (struct_v >= 6) {
		u32 feat_struct_len;
		u8 feat_struct_v;

		*p += sizeof(struct ceph_timespec);	/* skip last_changed */
		*p += sizeof(struct ceph_timespec);	/* skip created */

		ret = ceph_start_decoding(p, end, 1, "mon_feature_t",
					  &feat_struct_v, &feat_struct_len);
		if (ret)
			goto fail;

		*p += feat_struct_len;	/* skip persistent_features */

		ret = ceph_start_decoding(p, end, 1, "mon_feature_t",
					  &feat_struct_v, &feat_struct_len);
		if (ret)
			goto fail;

		*p += feat_struct_len;	/* skip optional_features */
	}
	ceph_decode_32_safe(p, end, num_mon, e_inval);

	dout("%s fsid %pU epoch %u num_mon %d\n", __func__, &fsid, epoch,
	     num_mon);
	if (num_mon > CEPH_MAX_MON)
		goto e_inval;

	monmap = kmalloc(struct_size(monmap, mon_inst, num_mon), GFP_NOIO);
	if (!monmap) {
		ret = -ENOMEM;
		goto fail;
	}
	monmap->fsid = fsid;
	monmap->epoch = epoch;
	monmap->num_mon = num_mon;

	/* legacy_mon_addr map or mon_info map */
	for (i = 0; i < num_mon; i++) {
		struct ceph_entity_inst *inst = &monmap->mon_inst[i];

		ceph_decode_skip_string(p, end, e_inval);	/* skip mon name */
		inst->name.type = CEPH_ENTITY_TYPE_MON;
		inst->name.num = cpu_to_le64(i);

		if (struct_v >= 6)
			ret = decode_mon_info(p, end, msgr2, &inst->addr);
		else
			ret = ceph_decode_entity_addr(p, end, &inst->addr);
		if (ret)
			goto fail;

		dout("%s mon%d addr %s\n", __func__, i,
		     ceph_pr_addr(&inst->addr));
	}

	return monmap;

e_inval:
	ret = -EINVAL;
fail:
	kfree(monmap);
	return ERR_PTR(ret);
}

/*
 * return true if *addr is included in the monmap.
 */
int ceph_monmap_contains(struct ceph_monmap *m, struct ceph_entity_addr *addr)
{
	int i;

	for (i = 0; i < m->num_mon; i++) {
		if (ceph_addr_equal_no_type(addr, &m->mon_inst[i].addr))
			return 1;
	}

	return 0;
}

/*
 * Send an auth request.
 */
static void __send_prepared_auth_request(struct ceph_mon_client *monc, int len)
{
	monc->pending_auth = 1;
	monc->m_auth->front.iov_len = len;
	monc->m_auth->hdr.front_len = cpu_to_le32(len);
	ceph_msg_revoke(monc->m_auth);
	ceph_msg_get(monc->m_auth);  /* keep our ref */
	ceph_con_send(&monc->con, monc->m_auth);
}

/*
 * Close monitor session, if any.
 */
static void __close_session(struct ceph_mon_client *monc)
{
	dout("__close_session closing mon%d\n", monc->cur_mon);
	ceph_msg_revoke(monc->m_auth);
	ceph_msg_revoke_incoming(monc->m_auth_reply);
	ceph_msg_revoke(monc->m_subscribe);
	ceph_msg_revoke_incoming(monc->m_subscribe_ack);
	ceph_con_close(&monc->con);

	monc->pending_auth = 0;
	ceph_auth_reset(monc->auth);
}

/*
 * Pick a new monitor at random and set cur_mon.  If we are repicking
 * (i.e. cur_mon is already set), be sure to pick a different one.
 */
static void pick_new_mon(struct ceph_mon_client *monc)
{
	int old_mon = monc->cur_mon;

	BUG_ON(monc->monmap->num_mon < 1);

	if (monc->monmap->num_mon == 1) {
		monc->cur_mon = 0;
	} else {
		int max = monc->monmap->num_mon;
		int o = -1;
		int n;

		if (monc->cur_mon >= 0) {
			if (monc->cur_mon < monc->monmap->num_mon)
				o = monc->cur_mon;
			if (o >= 0)
				max--;
		}

		n = prandom_u32() % max;
		if (o >= 0 && n >= o)
			n++;

		monc->cur_mon = n;
	}

	dout("%s mon%d -> mon%d out of %d mons\n", __func__, old_mon,
	     monc->cur_mon, monc->monmap->num_mon);
}
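/*
 * Worked example of the skip-one sampling above (informal note,
 * derived directly from the code): with num_mon == 3 and cur_mon == 1,
 * max is reduced to 2, so n is drawn from {0, 1}; the n >= o
 * adjustment then maps that to {0, 2}, i.e. a uniform pick over every
 * mon except the current one.
 */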
/*
 * Open a session with a new monitor.
 */
static void __open_session(struct ceph_mon_client *monc)
{
	int ret;

	pick_new_mon(monc);

	monc->hunting = true;
	if (monc->had_a_connection) {
		monc->hunt_mult *= CEPH_MONC_HUNT_BACKOFF;
		if (monc->hunt_mult > CEPH_MONC_HUNT_MAX_MULT)
			monc->hunt_mult = CEPH_MONC_HUNT_MAX_MULT;
	}

	monc->sub_renew_after = jiffies;  /* i.e., expired */
	monc->sub_renew_sent = 0;

	dout("%s opening mon%d\n", __func__, monc->cur_mon);
	ceph_con_open(&monc->con, CEPH_ENTITY_TYPE_MON, monc->cur_mon,
		      &monc->monmap->mon_inst[monc->cur_mon].addr);

	/*
	 * send an initial keepalive to ensure our timestamp is valid
	 * by the time we are in an OPENED state
	 */
	ceph_con_keepalive(&monc->con);

	/* initiate authentication handshake */
	ret = ceph_auth_build_hello(monc->auth,
				    monc->m_auth->front.iov_base,
				    monc->m_auth->front_alloc_len);
	BUG_ON(ret <= 0);
	__send_prepared_auth_request(monc, ret);
}

static void reopen_session(struct ceph_mon_client *monc)
{
	if (!monc->hunting)
		pr_info("mon%d %s session lost, hunting for new mon\n",
			monc->cur_mon, ceph_pr_addr(&monc->con.peer_addr));

	__close_session(monc);
	__open_session(monc);
}

void ceph_monc_reopen_session(struct ceph_mon_client *monc)
{
	mutex_lock(&monc->mutex);
	reopen_session(monc);
	mutex_unlock(&monc->mutex);
}

static void un_backoff(struct ceph_mon_client *monc)
{
	monc->hunt_mult /= 2;  /* reduce by 50% */
	if (monc->hunt_mult < 1)
		monc->hunt_mult = 1;
	dout("%s hunt_mult now %d\n", __func__, monc->hunt_mult);
}

/*
 * Reschedule delayed work timer.
 */
static void __schedule_delayed(struct ceph_mon_client *monc)
{
	unsigned long delay;

	if (monc->hunting)
		delay = CEPH_MONC_HUNT_INTERVAL * monc->hunt_mult;
	else
		delay = CEPH_MONC_PING_INTERVAL;

	dout("__schedule_delayed after %lu\n", delay);
	mod_delayed_work(system_wq, &monc->delayed_work,
			 round_jiffies_relative(delay));
}

const char *ceph_sub_str[] = {
	[CEPH_SUB_MONMAP] = "monmap",
	[CEPH_SUB_OSDMAP] = "osdmap",
	[CEPH_SUB_FSMAP]  = "fsmap.user",
	[CEPH_SUB_MDSMAP] = "mdsmap",
};
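/*
 * Informal sketch of the MMonSubscribe payload built by
 * __send_subscribe() below, derived from the encode calls (not an
 * authoritative wire spec):
 *
 *	u32 num_subs
 *	num_subs * {
 *		string name	("monmap", "osdmap", "mdsmap.<id>", ...)
 *		ceph_mon_subscribe_item { __le64 start; __u8 flags; }
 *	}
 *
 * flags is either 0 (continuous) or CEPH_SUBSCRIBE_ONETIME.
 */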
/*
 * Send subscribe request for one or more maps, according to
 * monc->subs.
 */
static void __send_subscribe(struct ceph_mon_client *monc)
{
	struct ceph_msg *msg = monc->m_subscribe;
	void *p = msg->front.iov_base;
	void *const end = p + msg->front_alloc_len;
	int num = 0;
	int i;

	dout("%s sent %lu\n", __func__, monc->sub_renew_sent);

	BUG_ON(monc->cur_mon < 0);

	if (!monc->sub_renew_sent)
		monc->sub_renew_sent = jiffies | 1;  /* never 0 */

	msg->hdr.version = cpu_to_le16(2);

	for (i = 0; i < ARRAY_SIZE(monc->subs); i++) {
		if (monc->subs[i].want)
			num++;
	}
	BUG_ON(num < 1);  /* monmap sub is always there */
	ceph_encode_32(&p, num);
	for (i = 0; i < ARRAY_SIZE(monc->subs); i++) {
		char buf[32];
		int len;

		if (!monc->subs[i].want)
			continue;

		len = sprintf(buf, "%s", ceph_sub_str[i]);
		if (i == CEPH_SUB_MDSMAP &&
		    monc->fs_cluster_id != CEPH_FS_CLUSTER_ID_NONE)
			len += sprintf(buf + len, ".%d", monc->fs_cluster_id);

		dout("%s %s start %llu flags 0x%x\n", __func__, buf,
		     le64_to_cpu(monc->subs[i].item.start),
		     monc->subs[i].item.flags);
		ceph_encode_string(&p, end, buf, len);
		memcpy(p, &monc->subs[i].item, sizeof(monc->subs[i].item));
		p += sizeof(monc->subs[i].item);
	}

	BUG_ON(p > end);
	msg->front.iov_len = p - msg->front.iov_base;
	msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
	ceph_msg_revoke(msg);
	ceph_con_send(&monc->con, ceph_msg_get(msg));
}

static void handle_subscribe_ack(struct ceph_mon_client *monc,
				 struct ceph_msg *msg)
{
	unsigned int seconds;
	struct ceph_mon_subscribe_ack *h = msg->front.iov_base;

	if (msg->front.iov_len < sizeof(*h))
		goto bad;
	seconds = le32_to_cpu(h->duration);

	mutex_lock(&monc->mutex);
	if (monc->sub_renew_sent) {
		/*
		 * This is only needed for legacy (infernalis or older)
		 * MONs -- see delayed_work().
		 */
		monc->sub_renew_after = monc->sub_renew_sent +
					(seconds >> 1) * HZ - 1;
		dout("%s sent %lu duration %d renew after %lu\n", __func__,
		     monc->sub_renew_sent, seconds, monc->sub_renew_after);
		monc->sub_renew_sent = 0;
	} else {
		dout("%s sent %lu renew after %lu, ignoring\n", __func__,
		     monc->sub_renew_sent, monc->sub_renew_after);
	}
	mutex_unlock(&monc->mutex);
	return;

bad:
	pr_err("got corrupt subscribe-ack msg\n");
	ceph_msg_dump(msg);
}
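/*
 * Note on the renewal math above (informal): the subscription is
 * renewed at half the advertised duration, so an ack with duration 60
 * sets sub_renew_after to roughly sent + 30 * HZ, leaving a full
 * half-interval of slack before the mon would expire the session.
 */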
/*
 * Register interest in a map
 *
 * @sub: one of CEPH_SUB_*
 * @epoch: X for "every map since X", or 0 for "just the latest"
 */
static bool __ceph_monc_want_map(struct ceph_mon_client *monc, int sub,
				 u32 epoch, bool continuous)
{
	__le64 start = cpu_to_le64(epoch);
	u8 flags = !continuous ? CEPH_SUBSCRIBE_ONETIME : 0;

	dout("%s %s epoch %u continuous %d\n", __func__, ceph_sub_str[sub],
	     epoch, continuous);

	if (monc->subs[sub].want &&
	    monc->subs[sub].item.start == start &&
	    monc->subs[sub].item.flags == flags)
		return false;

	monc->subs[sub].item.start = start;
	monc->subs[sub].item.flags = flags;
	monc->subs[sub].want = true;

	return true;
}

bool ceph_monc_want_map(struct ceph_mon_client *monc, int sub, u32 epoch,
			bool continuous)
{
	bool need_request;

	mutex_lock(&monc->mutex);
	need_request = __ceph_monc_want_map(monc, sub, epoch, continuous);
	mutex_unlock(&monc->mutex);

	return need_request;
}
EXPORT_SYMBOL(ceph_monc_want_map);

/*
 * Keep track of which maps we have
 *
 * @sub: one of CEPH_SUB_*
 */
static void __ceph_monc_got_map(struct ceph_mon_client *monc, int sub,
				u32 epoch)
{
	dout("%s %s epoch %u\n", __func__, ceph_sub_str[sub], epoch);

	if (monc->subs[sub].want) {
		if (monc->subs[sub].item.flags & CEPH_SUBSCRIBE_ONETIME)
			monc->subs[sub].want = false;
		else
			monc->subs[sub].item.start = cpu_to_le64(epoch + 1);
	}

	monc->subs[sub].have = epoch;
}

void ceph_monc_got_map(struct ceph_mon_client *monc, int sub, u32 epoch)
{
	mutex_lock(&monc->mutex);
	__ceph_monc_got_map(monc, sub, epoch);
	mutex_unlock(&monc->mutex);
}
EXPORT_SYMBOL(ceph_monc_got_map);

void ceph_monc_renew_subs(struct ceph_mon_client *monc)
{
	mutex_lock(&monc->mutex);
	__send_subscribe(monc);
	mutex_unlock(&monc->mutex);
}
EXPORT_SYMBOL(ceph_monc_renew_subs);

/*
 * Wait for an osdmap with a given epoch.
 *
 * @epoch: epoch to wait for
 * @timeout: in jiffies, 0 means "wait forever"
 */
int ceph_monc_wait_osdmap(struct ceph_mon_client *monc, u32 epoch,
			  unsigned long timeout)
{
	unsigned long started = jiffies;
	long ret;

	mutex_lock(&monc->mutex);
	while (monc->subs[CEPH_SUB_OSDMAP].have < epoch) {
		mutex_unlock(&monc->mutex);

		if (timeout && time_after_eq(jiffies, started + timeout))
			return -ETIMEDOUT;

		ret = wait_event_interruptible_timeout(monc->client->auth_wq,
				monc->subs[CEPH_SUB_OSDMAP].have >= epoch,
				ceph_timeout_jiffies(timeout));
		if (ret < 0)
			return ret;

		mutex_lock(&monc->mutex);
	}

	mutex_unlock(&monc->mutex);
	return 0;
}
EXPORT_SYMBOL(ceph_monc_wait_osdmap);
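/*
 * Typical caller pattern for the subscription API above (hypothetical
 * sketch, error handling elided):
 *
 *	if (ceph_monc_want_map(&client->monc, CEPH_SUB_OSDMAP, epoch, false))
 *		ceph_monc_renew_subs(&client->monc);
 *	ret = ceph_monc_wait_osdmap(&client->monc, epoch, timeout);
 *
 * i.e. want_map() only updates local state and reports whether a new
 * subscribe request is needed; renew_subs() actually sends it.
 */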
/*
 * Open a session with a random monitor.  Request monmap and osdmap,
 * which are waited upon in __ceph_open_session().
 */
int ceph_monc_open_session(struct ceph_mon_client *monc)
{
	mutex_lock(&monc->mutex);
	__ceph_monc_want_map(monc, CEPH_SUB_MONMAP, 0, true);
	__ceph_monc_want_map(monc, CEPH_SUB_OSDMAP, 0, false);
	__open_session(monc);
	__schedule_delayed(monc);
	mutex_unlock(&monc->mutex);
	return 0;
}
EXPORT_SYMBOL(ceph_monc_open_session);

static void ceph_monc_handle_map(struct ceph_mon_client *monc,
				 struct ceph_msg *msg)
{
	struct ceph_client *client = monc->client;
	struct ceph_monmap *monmap;
	void *p, *end;

	mutex_lock(&monc->mutex);

	dout("handle_monmap\n");
	p = msg->front.iov_base;
	end = p + msg->front.iov_len;

	monmap = ceph_monmap_decode(&p, end, false);
	if (IS_ERR(monmap)) {
		pr_err("problem decoding monmap, %d\n",
		       (int)PTR_ERR(monmap));
		ceph_msg_dump(msg);
		goto out;
	}

	if (ceph_check_fsid(client, &monmap->fsid) < 0) {
		kfree(monmap);
		goto out;
	}

	kfree(monc->monmap);
	monc->monmap = monmap;

	__ceph_monc_got_map(monc, CEPH_SUB_MONMAP, monc->monmap->epoch);
	client->have_fsid = true;

out:
	mutex_unlock(&monc->mutex);
	wake_up_all(&client->auth_wq);
}

/*
 * generic requests (currently statfs, mon_get_version)
 */
DEFINE_RB_FUNCS(generic_request, struct ceph_mon_generic_request, tid, node)

static void release_generic_request(struct kref *kref)
{
	struct ceph_mon_generic_request *req =
		container_of(kref, struct ceph_mon_generic_request, kref);

	dout("%s greq %p request %p reply %p\n", __func__, req, req->request,
	     req->reply);
	WARN_ON(!RB_EMPTY_NODE(&req->node));

	if (req->reply)
		ceph_msg_put(req->reply);
	if (req->request)
		ceph_msg_put(req->request);

	kfree(req);
}

static void put_generic_request(struct ceph_mon_generic_request *req)
{
	if (req)
		kref_put(&req->kref, release_generic_request);
}

static void get_generic_request(struct ceph_mon_generic_request *req)
{
	kref_get(&req->kref);
}

static struct ceph_mon_generic_request *
alloc_generic_request(struct ceph_mon_client *monc, gfp_t gfp)
{
	struct ceph_mon_generic_request *req;

	req = kzalloc(sizeof(*req), gfp);
	if (!req)
		return NULL;

	req->monc = monc;
	kref_init(&req->kref);
	RB_CLEAR_NODE(&req->node);
	init_completion(&req->completion);

	dout("%s greq %p\n", __func__, req);
	return req;
}

static void register_generic_request(struct ceph_mon_generic_request *req)
{
	struct ceph_mon_client *monc = req->monc;

	WARN_ON(req->tid);

	get_generic_request(req);
	req->tid = ++monc->last_tid;
	insert_generic_request(&monc->generic_request_tree, req);
}

static void send_generic_request(struct ceph_mon_client *monc,
				 struct ceph_mon_generic_request *req)
{
	WARN_ON(!req->tid);

	dout("%s greq %p tid %llu\n", __func__, req, req->tid);
	req->request->hdr.tid = cpu_to_le64(req->tid);
	ceph_con_send(&monc->con, ceph_msg_get(req->request));
}

static void __finish_generic_request(struct ceph_mon_generic_request *req)
{
	struct ceph_mon_client *monc = req->monc;

	dout("%s greq %p tid %llu\n", __func__, req, req->tid);
	erase_generic_request(&monc->generic_request_tree, req);

	ceph_msg_revoke(req->request);
	ceph_msg_revoke_incoming(req->reply);
}
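/*
 * Reference counting of a generic request (informal summary, derived
 * from the helpers above and below): alloc_generic_request() returns
 * it with a single ref owned by the caller; register_generic_request()
 * takes an extra ref on behalf of generic_request_tree; that tree ref
 * is dropped by the put in finish_generic_request() (cancel path) or
 * complete_generic_request() (reply path).  The caller's own ref is
 * always dropped with put_generic_request().
 */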
static void finish_generic_request(struct ceph_mon_generic_request *req)
{
	__finish_generic_request(req);
	put_generic_request(req);
}

static void complete_generic_request(struct ceph_mon_generic_request *req)
{
	if (req->complete_cb)
		req->complete_cb(req);
	else
		complete_all(&req->completion);
	put_generic_request(req);
}

static void cancel_generic_request(struct ceph_mon_generic_request *req)
{
	struct ceph_mon_client *monc = req->monc;
	struct ceph_mon_generic_request *lookup_req;

	dout("%s greq %p tid %llu\n", __func__, req, req->tid);

	mutex_lock(&monc->mutex);
	lookup_req = lookup_generic_request(&monc->generic_request_tree,
					    req->tid);
	if (lookup_req) {
		WARN_ON(lookup_req != req);
		finish_generic_request(req);
	}

	mutex_unlock(&monc->mutex);
}

static int wait_generic_request(struct ceph_mon_generic_request *req)
{
	int ret;

	dout("%s greq %p tid %llu\n", __func__, req, req->tid);
	ret = wait_for_completion_interruptible(&req->completion);
	if (ret)
		cancel_generic_request(req);
	else
		ret = req->result;  /* completed */

	return ret;
}

static struct ceph_msg *get_generic_reply(struct ceph_connection *con,
					  struct ceph_msg_header *hdr,
					  int *skip)
{
	struct ceph_mon_client *monc = con->private;
	struct ceph_mon_generic_request *req;
	u64 tid = le64_to_cpu(hdr->tid);
	struct ceph_msg *m;

	mutex_lock(&monc->mutex);
	req = lookup_generic_request(&monc->generic_request_tree, tid);
	if (!req) {
		dout("get_generic_reply %lld dne\n", tid);
		*skip = 1;
		m = NULL;
	} else {
		dout("get_generic_reply %lld got %p\n", tid, req->reply);
		*skip = 0;
		m = ceph_msg_get(req->reply);
		/*
		 * we don't need to track the connection reading into
		 * this reply because we only have one open connection
		 * at a time, ever.
		 */
	}
	mutex_unlock(&monc->mutex);
	return m;
}

/*
 * statfs
 */
static void handle_statfs_reply(struct ceph_mon_client *monc,
				struct ceph_msg *msg)
{
	struct ceph_mon_generic_request *req;
	struct ceph_mon_statfs_reply *reply = msg->front.iov_base;
	u64 tid = le64_to_cpu(msg->hdr.tid);

	dout("%s msg %p tid %llu\n", __func__, msg, tid);

	if (msg->front.iov_len != sizeof(*reply))
		goto bad;

	mutex_lock(&monc->mutex);
	req = lookup_generic_request(&monc->generic_request_tree, tid);
	if (!req) {
		mutex_unlock(&monc->mutex);
		return;
	}

	req->result = 0;
	*req->u.st = reply->st;  /* struct */
	__finish_generic_request(req);
	mutex_unlock(&monc->mutex);

	complete_generic_request(req);
	return;

bad:
	pr_err("corrupt statfs reply, tid %llu\n", tid);
	ceph_msg_dump(msg);
}

/*
 * Do a synchronous statfs().
 */
int ceph_monc_do_statfs(struct ceph_mon_client *monc, u64 data_pool,
			struct ceph_statfs *buf)
{
	struct ceph_mon_generic_request *req;
	struct ceph_mon_statfs *h;
	int ret = -ENOMEM;

	req = alloc_generic_request(monc, GFP_NOFS);
	if (!req)
		goto out;

	req->request = ceph_msg_new(CEPH_MSG_STATFS, sizeof(*h), GFP_NOFS,
				    true);
	if (!req->request)
		goto out;

	req->reply = ceph_msg_new(CEPH_MSG_STATFS_REPLY, 64, GFP_NOFS, true);
	if (!req->reply)
		goto out;

	req->u.st = buf;
	req->request->hdr.version = cpu_to_le16(2);

	mutex_lock(&monc->mutex);
	register_generic_request(req);
	/* fill out request */
	h = req->request->front.iov_base;
	h->monhdr.have_version = 0;
	h->monhdr.session_mon = cpu_to_le16(-1);
	h->monhdr.session_mon_tid = 0;
	h->fsid = monc->monmap->fsid;
	h->contains_data_pool = (data_pool != CEPH_NOPOOL);
	h->data_pool = cpu_to_le64(data_pool);
	send_generic_request(monc, req);
	mutex_unlock(&monc->mutex);

	ret = wait_generic_request(req);
out:
	put_generic_request(req);
	return ret;
}
EXPORT_SYMBOL(ceph_monc_do_statfs);
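/*
 * Hypothetical caller sketch for ceph_monc_do_statfs() (illustrative
 * only; the real callers live in the filesystem code):
 *
 *	struct ceph_statfs st;
 *	int ret;
 *
 *	ret = ceph_monc_do_statfs(&client->monc, CEPH_NOPOOL, &st);
 *	if (!ret)
 *		pr_info("cluster has %llu KB\n", le64_to_cpu(st.kb));
 */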
static void handle_get_version_reply(struct ceph_mon_client *monc,
				     struct ceph_msg *msg)
{
	struct ceph_mon_generic_request *req;
	u64 tid = le64_to_cpu(msg->hdr.tid);
	void *p = msg->front.iov_base;
	void *end = p + msg->front_alloc_len;
	u64 handle;

	dout("%s msg %p tid %llu\n", __func__, msg, tid);

	ceph_decode_need(&p, end, 2*sizeof(u64), bad);
	handle = ceph_decode_64(&p);
	if (tid != 0 && tid != handle)
		goto bad;

	mutex_lock(&monc->mutex);
	req = lookup_generic_request(&monc->generic_request_tree, handle);
	if (!req) {
		mutex_unlock(&monc->mutex);
		return;
	}

	req->result = 0;
	req->u.newest = ceph_decode_64(&p);
	__finish_generic_request(req);
	mutex_unlock(&monc->mutex);

	complete_generic_request(req);
	return;

bad:
	pr_err("corrupt mon_get_version reply, tid %llu\n", tid);
	ceph_msg_dump(msg);
}

static struct ceph_mon_generic_request *
__ceph_monc_get_version(struct ceph_mon_client *monc, const char *what,
			ceph_monc_callback_t cb, u64 private_data)
{
	struct ceph_mon_generic_request *req;

	req = alloc_generic_request(monc, GFP_NOIO);
	if (!req)
		goto err_put_req;

	req->request = ceph_msg_new(CEPH_MSG_MON_GET_VERSION,
				    sizeof(u64) + sizeof(u32) + strlen(what),
				    GFP_NOIO, true);
	if (!req->request)
		goto err_put_req;

	req->reply = ceph_msg_new(CEPH_MSG_MON_GET_VERSION_REPLY, 32, GFP_NOIO,
				  true);
	if (!req->reply)
		goto err_put_req;

	req->complete_cb = cb;
	req->private_data = private_data;

	mutex_lock(&monc->mutex);
	register_generic_request(req);
	{
		void *p = req->request->front.iov_base;
		void *const end = p + req->request->front_alloc_len;

		ceph_encode_64(&p, req->tid); /* handle */
		ceph_encode_string(&p, end, what, strlen(what));
		WARN_ON(p != end);
	}
	send_generic_request(monc, req);
	mutex_unlock(&monc->mutex);

	return req;

err_put_req:
	put_generic_request(req);
	return ERR_PTR(-ENOMEM);
}

/*
 * Send MMonGetVersion and wait for the reply.
 *
 * @what: one of "mdsmap", "osdmap" or "monmap"
 */
int ceph_monc_get_version(struct ceph_mon_client *monc, const char *what,
			  u64 *newest)
{
	struct ceph_mon_generic_request *req;
	int ret;

	req = __ceph_monc_get_version(monc, what, NULL, 0);
	if (IS_ERR(req))
		return PTR_ERR(req);

	ret = wait_generic_request(req);
	if (!ret)
		*newest = req->u.newest;

	put_generic_request(req);
	return ret;
}
EXPORT_SYMBOL(ceph_monc_get_version);
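/*
 * Hypothetical caller sketch for ceph_monc_get_version() (illustrative
 * only):
 *
 *	u64 newest;
 *	int ret = ceph_monc_get_version(&client->monc, "osdmap", &newest);
 *	if (!ret)
 *		dout("latest osdmap epoch is %llu\n", newest);
 */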
/*
 * Send MMonGetVersion; @cb will be called with the result when the
 * reply is received.
 *
 * @what: one of "mdsmap", "osdmap" or "monmap"
 */
int ceph_monc_get_version_async(struct ceph_mon_client *monc, const char *what,
				ceph_monc_callback_t cb, u64 private_data)
{
	struct ceph_mon_generic_request *req;

	req = __ceph_monc_get_version(monc, what, cb, private_data);
	if (IS_ERR(req))
		return PTR_ERR(req);

	put_generic_request(req);
	return 0;
}
EXPORT_SYMBOL(ceph_monc_get_version_async);

static void handle_command_ack(struct ceph_mon_client *monc,
			       struct ceph_msg *msg)
{
	struct ceph_mon_generic_request *req;
	void *p = msg->front.iov_base;
	void *const end = p + msg->front_alloc_len;
	u64 tid = le64_to_cpu(msg->hdr.tid);

	dout("%s msg %p tid %llu\n", __func__, msg, tid);

	ceph_decode_need(&p, end, sizeof(struct ceph_mon_request_header) +
			 sizeof(u32), bad);
	p += sizeof(struct ceph_mon_request_header);

	mutex_lock(&monc->mutex);
	req = lookup_generic_request(&monc->generic_request_tree, tid);
	if (!req) {
		mutex_unlock(&monc->mutex);
		return;
	}

	req->result = ceph_decode_32(&p);
	__finish_generic_request(req);
	mutex_unlock(&monc->mutex);

	complete_generic_request(req);
	return;

bad:
	pr_err("corrupt mon_command ack, tid %llu\n", tid);
	ceph_msg_dump(msg);
}

static __printf(2, 0)
int do_mon_command_vargs(struct ceph_mon_client *monc, const char *fmt,
			 va_list ap)
{
	struct ceph_mon_generic_request *req;
	struct ceph_mon_command *h;
	int ret = -ENOMEM;
	int len;

	req = alloc_generic_request(monc, GFP_NOIO);
	if (!req)
		goto out;

	req->request = ceph_msg_new(CEPH_MSG_MON_COMMAND, 256, GFP_NOIO, true);
	if (!req->request)
		goto out;

	req->reply = ceph_msg_new(CEPH_MSG_MON_COMMAND_ACK, 512, GFP_NOIO,
				  true);
	if (!req->reply)
		goto out;

	mutex_lock(&monc->mutex);
	register_generic_request(req);
	h = req->request->front.iov_base;
	h->monhdr.have_version = 0;
	h->monhdr.session_mon = cpu_to_le16(-1);
	h->monhdr.session_mon_tid = 0;
	h->fsid = monc->monmap->fsid;
	h->num_strs = cpu_to_le32(1);
	len = vsprintf(h->str, fmt, ap);
	h->str_len = cpu_to_le32(len);
	send_generic_request(monc, req);
	mutex_unlock(&monc->mutex);

	ret = wait_generic_request(req);
out:
	put_generic_request(req);
	return ret;
}

static __printf(2, 3)
int do_mon_command(struct ceph_mon_client *monc, const char *fmt, ...)
{
	va_list ap;
	int ret;

	va_start(ap, fmt);
	ret = do_mon_command_vargs(monc, fmt, ap);
	va_end(ap);
	return ret;
}
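/*
 * Informal note: monitor commands are free-form JSON strings; the
 * "prefix" key selects the command and the remaining keys are its
 * arguments.  See ceph_monc_blocklist_add() below for a concrete
 * example.
 */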
int ceph_monc_blocklist_add(struct ceph_mon_client *monc,
			    struct ceph_entity_addr *client_addr)
{
	int ret;

	ret = do_mon_command(monc,
			     "{ \"prefix\": \"osd blocklist\", \
				\"blocklistop\": \"add\", \
				\"addr\": \"%pISpc/%u\" }",
			     &client_addr->in_addr,
			     le32_to_cpu(client_addr->nonce));
	if (ret == -EINVAL) {
		/*
		 * The monitor returns EINVAL on an unrecognized command.
		 * Try the legacy command -- it is exactly the same except
		 * for the name.
		 */
		ret = do_mon_command(monc,
				     "{ \"prefix\": \"osd blacklist\", \
					\"blacklistop\": \"add\", \
					\"addr\": \"%pISpc/%u\" }",
				     &client_addr->in_addr,
				     le32_to_cpu(client_addr->nonce));
	}
	if (ret)
		return ret;

	/*
	 * Make sure we have the osdmap that includes the blocklist
	 * entry.  This is needed to ensure that the OSDs pick up the
	 * new blocklist before processing any future requests from
	 * this client.
	 */
	return ceph_wait_for_latest_osdmap(monc->client, 0);
}
EXPORT_SYMBOL(ceph_monc_blocklist_add);

/*
 * Resend pending generic requests.
 */
static void __resend_generic_request(struct ceph_mon_client *monc)
{
	struct ceph_mon_generic_request *req;
	struct rb_node *p;

	for (p = rb_first(&monc->generic_request_tree); p; p = rb_next(p)) {
		req = rb_entry(p, struct ceph_mon_generic_request, node);
		ceph_msg_revoke(req->request);
		ceph_msg_revoke_incoming(req->reply);
		ceph_con_send(&monc->con, ceph_msg_get(req->request));
	}
}

/*
 * Delayed work.  If we haven't mounted yet, retry.  Otherwise,
 * renew/retry subscription as needed (in case it is timing out, or we
 * got an ENOMEM).  And keep the monitor connection alive.
 */
static void delayed_work(struct work_struct *work)
{
	struct ceph_mon_client *monc =
		container_of(work, struct ceph_mon_client, delayed_work.work);

	dout("monc delayed_work\n");
	mutex_lock(&monc->mutex);
	if (monc->hunting) {
		dout("%s continuing hunt\n", __func__);
		reopen_session(monc);
	} else {
		int is_auth = ceph_auth_is_authenticated(monc->auth);

		if (ceph_con_keepalive_expired(&monc->con,
					       CEPH_MONC_PING_TIMEOUT)) {
			dout("monc keepalive timeout\n");
			is_auth = 0;
			reopen_session(monc);
		}

		if (!monc->hunting) {
			ceph_con_keepalive(&monc->con);
			__validate_auth(monc);
			un_backoff(monc);
		}

		if (is_auth &&
		    !(monc->con.peer_features & CEPH_FEATURE_MON_STATEFUL_SUB)) {
			unsigned long now = jiffies;

			dout("%s renew subs? now %lu renew after %lu\n",
			     __func__, now, monc->sub_renew_after);
			if (time_after_eq(now, monc->sub_renew_after))
				__send_subscribe(monc);
		}
	}
	__schedule_delayed(monc);
	mutex_unlock(&monc->mutex);
}
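/*
 * Informal timing summary (see __schedule_delayed()): while hunting,
 * the work runs every CEPH_MONC_HUNT_INTERVAL * hunt_mult jiffies,
 * with hunt_mult growing by CEPH_MONC_HUNT_BACKOFF per failed attempt
 * up to CEPH_MONC_HUNT_MAX_MULT and decaying via un_backoff() once a
 * connection works; otherwise it runs every CEPH_MONC_PING_INTERVAL
 * to service keepalives and legacy subscription renewal.
 */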
/*
 * On startup, we build a temporary monmap populated with the IPs
 * provided by mount(2).
 */
static int build_initial_monmap(struct ceph_mon_client *monc)
{
	struct ceph_options *opt = monc->client->options;
	struct ceph_entity_addr *mon_addr = opt->mon_addr;
	int num_mon = opt->num_mon;
	int i;

	/* build initial monmap */
	monc->monmap = kzalloc(struct_size(monc->monmap, mon_inst, num_mon),
			       GFP_KERNEL);
	if (!monc->monmap)
		return -ENOMEM;

	for (i = 0; i < num_mon; i++) {
		monc->monmap->mon_inst[i].addr = mon_addr[i];
		monc->monmap->mon_inst[i].addr.nonce = 0;
		monc->monmap->mon_inst[i].name.type =
			CEPH_ENTITY_TYPE_MON;
		monc->monmap->mon_inst[i].name.num = cpu_to_le64(i);
	}
	monc->monmap->num_mon = num_mon;
	return 0;
}

int ceph_monc_init(struct ceph_mon_client *monc, struct ceph_client *cl)
{
	int err = 0;

	dout("init\n");
	memset(monc, 0, sizeof(*monc));
	monc->client = cl;
	monc->monmap = NULL;
	mutex_init(&monc->mutex);

	err = build_initial_monmap(monc);
	if (err)
		goto out;

	/* connection */
	/* authentication */
	monc->auth = ceph_auth_init(cl->options->name, cl->options->key,
				    cl->options->con_modes);
	if (IS_ERR(monc->auth)) {
		err = PTR_ERR(monc->auth);
		goto out_monmap;
	}
	monc->auth->want_keys =
		CEPH_ENTITY_TYPE_AUTH | CEPH_ENTITY_TYPE_MON |
		CEPH_ENTITY_TYPE_OSD | CEPH_ENTITY_TYPE_MDS;

	/* msgs */
	err = -ENOMEM;
	monc->m_subscribe_ack = ceph_msg_new(CEPH_MSG_MON_SUBSCRIBE_ACK,
				     sizeof(struct ceph_mon_subscribe_ack),
				     GFP_KERNEL, true);
	if (!monc->m_subscribe_ack)
		goto out_auth;

	monc->m_subscribe = ceph_msg_new(CEPH_MSG_MON_SUBSCRIBE, 128,
					 GFP_KERNEL, true);
	if (!monc->m_subscribe)
		goto out_subscribe_ack;

	monc->m_auth_reply = ceph_msg_new(CEPH_MSG_AUTH_REPLY, 4096,
					  GFP_KERNEL, true);
	if (!monc->m_auth_reply)
		goto out_subscribe;

	monc->m_auth = ceph_msg_new(CEPH_MSG_AUTH, 4096, GFP_KERNEL, true);
	monc->pending_auth = 0;
	if (!monc->m_auth)
		goto out_auth_reply;

	ceph_con_init(&monc->con, monc, &mon_con_ops,
		      &monc->client->msgr);

	monc->cur_mon = -1;
	monc->had_a_connection = false;
	monc->hunt_mult = 1;

	INIT_DELAYED_WORK(&monc->delayed_work, delayed_work);
	monc->generic_request_tree = RB_ROOT;
	monc->last_tid = 0;

	monc->fs_cluster_id = CEPH_FS_CLUSTER_ID_NONE;

	return 0;

out_auth_reply:
	ceph_msg_put(monc->m_auth_reply);
out_subscribe:
	ceph_msg_put(monc->m_subscribe);
out_subscribe_ack:
	ceph_msg_put(monc->m_subscribe_ack);
out_auth:
	ceph_auth_destroy(monc->auth);
out_monmap:
	kfree(monc->monmap);
out:
	return err;
}
EXPORT_SYMBOL(ceph_monc_init);
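/*
 * Hypothetical lifecycle sketch for the exported API (illustrative
 * only; the real sequencing is done by the libceph client setup and
 * teardown paths):
 *
 *	err = ceph_monc_init(&client->monc, client);
 *	...
 *	err = ceph_monc_open_session(&client->monc);
 *	...
 *	ceph_monc_stop(&client->monc);
 */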
void ceph_monc_stop(struct ceph_mon_client *monc)
{
	dout("stop\n");
	cancel_delayed_work_sync(&monc->delayed_work);

	mutex_lock(&monc->mutex);
	__close_session(monc);
	monc->cur_mon = -1;
	mutex_unlock(&monc->mutex);

	/*
	 * flush msgr queue before we destroy ourselves to ensure that:
	 *  - any work that references our embedded con is finished.
	 *  - any osd_client or other work that may reference an authorizer
	 *    finishes before we shut down the auth subsystem.
	 */
	ceph_msgr_flush();

	ceph_auth_destroy(monc->auth);

	WARN_ON(!RB_EMPTY_ROOT(&monc->generic_request_tree));

	ceph_msg_put(monc->m_auth);
	ceph_msg_put(monc->m_auth_reply);
	ceph_msg_put(monc->m_subscribe);
	ceph_msg_put(monc->m_subscribe_ack);

	kfree(monc->monmap);
}
EXPORT_SYMBOL(ceph_monc_stop);

static void finish_hunting(struct ceph_mon_client *monc)
{
	if (monc->hunting) {
		dout("%s found mon%d\n", __func__, monc->cur_mon);
		monc->hunting = false;
		monc->had_a_connection = true;
		un_backoff(monc);
		__schedule_delayed(monc);
	}
}

static void finish_auth(struct ceph_mon_client *monc, int auth_err,
			bool was_authed)
{
	dout("%s auth_err %d was_authed %d\n", __func__, auth_err, was_authed);
	WARN_ON(auth_err > 0);

	monc->pending_auth = 0;
	if (auth_err) {
		monc->client->auth_err = auth_err;
		wake_up_all(&monc->client->auth_wq);
		return;
	}

	if (!was_authed && ceph_auth_is_authenticated(monc->auth)) {
		dout("%s authenticated, starting session global_id %llu\n",
		     __func__, monc->auth->global_id);

		monc->client->msgr.inst.name.type = CEPH_ENTITY_TYPE_CLIENT;
		monc->client->msgr.inst.name.num =
					cpu_to_le64(monc->auth->global_id);

		__send_subscribe(monc);
		__resend_generic_request(monc);

		pr_info("mon%d %s session established\n", monc->cur_mon,
			ceph_pr_addr(&monc->con.peer_addr));
	}
}

static void handle_auth_reply(struct ceph_mon_client *monc,
			      struct ceph_msg *msg)
{
	bool was_authed;
	int ret;

	mutex_lock(&monc->mutex);
	was_authed = ceph_auth_is_authenticated(monc->auth);
	ret = ceph_handle_auth_reply(monc->auth, msg->front.iov_base,
				     msg->front.iov_len,
				     monc->m_auth->front.iov_base,
				     monc->m_auth->front_alloc_len);
	if (ret > 0) {
		__send_prepared_auth_request(monc, ret);
	} else {
		finish_auth(monc, ret, was_authed);
		finish_hunting(monc);
	}
	mutex_unlock(&monc->mutex);
}

static int __validate_auth(struct ceph_mon_client *monc)
{
	int ret;

	if (monc->pending_auth)
		return 0;

	ret = ceph_build_auth(monc->auth, monc->m_auth->front.iov_base,
			      monc->m_auth->front_alloc_len);
	if (ret <= 0)
		return ret; /* either an error, or no need to authenticate */
	__send_prepared_auth_request(monc, ret);
	return 0;
}

int ceph_monc_validate_auth(struct ceph_mon_client *monc)
{
	int ret;

	mutex_lock(&monc->mutex);
	ret = __validate_auth(monc);
	mutex_unlock(&monc->mutex);
	return ret;
}
EXPORT_SYMBOL(ceph_monc_validate_auth);
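/*
 * Informal sketch of the authentication exchange handled by
 * handle_auth_reply() above: __open_session() sends an initial hello;
 * as long as ceph_handle_auth_reply() prepares another request
 * (ret > 0) we keep the handshake going, and once it returns <= 0 we
 * either record the error or, on first success, finish hunting and
 * (re)send subscriptions and pending generic requests.
 */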
/*
 * handle incoming message
 */
static void dispatch(struct ceph_connection *con, struct ceph_msg *msg)
{
	struct ceph_mon_client *monc = con->private;
	int type = le16_to_cpu(msg->hdr.type);

	switch (type) {
	case CEPH_MSG_AUTH_REPLY:
		handle_auth_reply(monc, msg);
		break;

	case CEPH_MSG_MON_SUBSCRIBE_ACK:
		handle_subscribe_ack(monc, msg);
		break;

	case CEPH_MSG_STATFS_REPLY:
		handle_statfs_reply(monc, msg);
		break;

	case CEPH_MSG_MON_GET_VERSION_REPLY:
		handle_get_version_reply(monc, msg);
		break;

	case CEPH_MSG_MON_COMMAND_ACK:
		handle_command_ack(monc, msg);
		break;

	case CEPH_MSG_MON_MAP:
		ceph_monc_handle_map(monc, msg);
		break;

	case CEPH_MSG_OSD_MAP:
		ceph_osdc_handle_map(&monc->client->osdc, msg);
		break;

	default:
		/* can the chained handler handle it? */
		if (monc->client->extra_mon_dispatch &&
		    monc->client->extra_mon_dispatch(monc->client, msg) == 0)
			break;

		pr_err("received unknown message type %d %s\n", type,
		       ceph_msg_type_name(type));
	}
	ceph_msg_put(msg);
}

/*
 * Allocate memory for incoming message
 */
static struct ceph_msg *mon_alloc_msg(struct ceph_connection *con,
				      struct ceph_msg_header *hdr,
				      int *skip)
{
	struct ceph_mon_client *monc = con->private;
	int type = le16_to_cpu(hdr->type);
	int front_len = le32_to_cpu(hdr->front_len);
	struct ceph_msg *m = NULL;

	*skip = 0;

	switch (type) {
	case CEPH_MSG_MON_SUBSCRIBE_ACK:
		m = ceph_msg_get(monc->m_subscribe_ack);
		break;
	case CEPH_MSG_STATFS_REPLY:
	case CEPH_MSG_MON_COMMAND_ACK:
		return get_generic_reply(con, hdr, skip);
	case CEPH_MSG_AUTH_REPLY:
		m = ceph_msg_get(monc->m_auth_reply);
		break;
	case CEPH_MSG_MON_GET_VERSION_REPLY:
		if (le64_to_cpu(hdr->tid) != 0)
			return get_generic_reply(con, hdr, skip);

		/*
		 * Older OSDs don't set reply tid even if the original
		 * request had a non-zero tid.  Work around this weirdness
		 * by allocating a new message.
		 */
		fallthrough;
	case CEPH_MSG_MON_MAP:
	case CEPH_MSG_MDS_MAP:
	case CEPH_MSG_OSD_MAP:
	case CEPH_MSG_FS_MAP_USER:
		m = ceph_msg_new(type, front_len, GFP_NOFS, false);
		if (!m)
			return NULL;	/* ENOMEM--return skip == 0 */
		break;
	}

	if (!m) {
		pr_info("alloc_msg unknown type %d\n", type);
		*skip = 1;
	} else if (front_len > m->front_alloc_len) {
		pr_warn("mon_alloc_msg front %d > prealloc %d (%u#%llu)\n",
			front_len, m->front_alloc_len,
			(unsigned int)con->peer_name.type,
			le64_to_cpu(con->peer_name.num));
		ceph_msg_put(m);
		m = ceph_msg_new(type, front_len, GFP_NOFS, false);
	}

	return m;
}

/*
 * If the monitor connection resets, pick a new monitor and resubmit
 * any pending requests.
 */
static void mon_fault(struct ceph_connection *con)
{
	struct ceph_mon_client *monc = con->private;

	mutex_lock(&monc->mutex);
	dout("%s mon%d\n", __func__, monc->cur_mon);
	if (monc->cur_mon >= 0) {
		if (!monc->hunting) {
			dout("%s hunting for new mon\n", __func__);
			reopen_session(monc);
			__schedule_delayed(monc);
		} else {
			dout("%s already hunting\n", __func__);
		}
	}
	mutex_unlock(&monc->mutex);
}

/*
 * We can ignore refcounting on the connection struct, as all references
 * will come from the messenger workqueue, which is drained prior to
 * mon_client destruction.
 */
static struct ceph_connection *con_get(struct ceph_connection *con)
{
	return con;
}

static void con_put(struct ceph_connection *con)
{
}

static const struct ceph_connection_operations mon_con_ops = {
	.get = con_get,
	.put = con_put,
	.dispatch = dispatch,
	.fault = mon_fault,
	.alloc_msg = mon_alloc_msg,
};