1 #include <linux/ceph/ceph_debug.h> 2 3 #include <linux/module.h> 4 #include <linux/err.h> 5 #include <linux/highmem.h> 6 #include <linux/mm.h> 7 #include <linux/pagemap.h> 8 #include <linux/slab.h> 9 #include <linux/uaccess.h> 10 #ifdef CONFIG_BLOCK 11 #include <linux/bio.h> 12 #endif 13 14 #include <linux/ceph/libceph.h> 15 #include <linux/ceph/osd_client.h> 16 #include <linux/ceph/messenger.h> 17 #include <linux/ceph/decode.h> 18 #include <linux/ceph/auth.h> 19 #include <linux/ceph/pagelist.h> 20 21 #define OSD_OP_FRONT_LEN 4096 22 #define OSD_OPREPLY_FRONT_LEN 512 23 24 static const struct ceph_connection_operations osd_con_ops; 25 static int __kick_requests(struct ceph_osd_client *osdc, 26 struct ceph_osd *kickosd); 27 28 static void kick_requests(struct ceph_osd_client *osdc, struct ceph_osd *osd); 29 30 static int op_needs_trail(int op) 31 { 32 switch (op) { 33 case CEPH_OSD_OP_GETXATTR: 34 case CEPH_OSD_OP_SETXATTR: 35 case CEPH_OSD_OP_CMPXATTR: 36 case CEPH_OSD_OP_CALL: 37 return 1; 38 default: 39 return 0; 40 } 41 } 42 43 static int op_has_extent(int op) 44 { 45 return (op == CEPH_OSD_OP_READ || 46 op == CEPH_OSD_OP_WRITE); 47 } 48 49 void ceph_calc_raw_layout(struct ceph_osd_client *osdc, 50 struct ceph_file_layout *layout, 51 u64 snapid, 52 u64 off, u64 *plen, u64 *bno, 53 struct ceph_osd_request *req, 54 struct ceph_osd_req_op *op) 55 { 56 struct ceph_osd_request_head *reqhead = req->r_request->front.iov_base; 57 u64 orig_len = *plen; 58 u64 objoff, objlen; /* extent in object */ 59 60 reqhead->snapid = cpu_to_le64(snapid); 61 62 /* object extent? */ 63 ceph_calc_file_object_mapping(layout, off, plen, bno, 64 &objoff, &objlen); 65 if (*plen < orig_len) 66 dout(" skipping last %llu, final file extent %llu~%llu\n", 67 orig_len - *plen, off, *plen); 68 69 if (op_has_extent(op->op)) { 70 op->extent.offset = objoff; 71 op->extent.length = objlen; 72 } 73 req->r_num_pages = calc_pages_for(off, *plen); 74 if (op->op == CEPH_OSD_OP_WRITE) 75 op->payload_len = *plen; 76 77 dout("calc_layout bno=%llx %llu~%llu (%d pages)\n", 78 *bno, objoff, objlen, req->r_num_pages); 79 80 } 81 EXPORT_SYMBOL(ceph_calc_raw_layout); 82 83 /* 84 * Implement client access to distributed object storage cluster. 85 * 86 * All data objects are stored within a cluster/cloud of OSDs, or 87 * "object storage devices." (Note that Ceph OSDs have _nothing_ to 88 * do with the T10 OSD extensions to SCSI.) Ceph OSDs are simply 89 * remote daemons serving up and coordinating consistent and safe 90 * access to storage. 91 * 92 * Cluster membership and the mapping of data objects onto storage devices 93 * are described by the osd map. 94 * 95 * We keep track of pending OSD requests (read, write), resubmit 96 * requests to different OSDs when the cluster topology/data layout 97 * change, or retry the affected requests when the communications 98 * channel with an OSD is reset. 99 */ 100 101 /* 102 * calculate the mapping of a file extent onto an object, and fill out the 103 * request accordingly. shorten extent as necessary if it crosses an 104 * object boundary. 105 * 106 * fill osd op in request message. 107 */ 108 static void calc_layout(struct ceph_osd_client *osdc, 109 struct ceph_vino vino, 110 struct ceph_file_layout *layout, 111 u64 off, u64 *plen, 112 struct ceph_osd_request *req, 113 struct ceph_osd_req_op *op) 114 { 115 u64 bno; 116 117 ceph_calc_raw_layout(osdc, layout, vino.snap, off, 118 plen, &bno, req, op); 119 120 sprintf(req->r_oid, "%llx.%08llx", vino.ino, bno); 121 req->r_oid_len = strlen(req->r_oid); 122 } 123 124 /* 125 * requests 126 */ 127 void ceph_osdc_release_request(struct kref *kref) 128 { 129 struct ceph_osd_request *req = container_of(kref, 130 struct ceph_osd_request, 131 r_kref); 132 133 if (req->r_request) 134 ceph_msg_put(req->r_request); 135 if (req->r_reply) 136 ceph_msg_put(req->r_reply); 137 if (req->r_con_filling_msg) { 138 dout("release_request revoking pages %p from con %p\n", 139 req->r_pages, req->r_con_filling_msg); 140 ceph_con_revoke_message(req->r_con_filling_msg, 141 req->r_reply); 142 ceph_con_put(req->r_con_filling_msg); 143 } 144 if (req->r_own_pages) 145 ceph_release_page_vector(req->r_pages, 146 req->r_num_pages); 147 #ifdef CONFIG_BLOCK 148 if (req->r_bio) 149 bio_put(req->r_bio); 150 #endif 151 ceph_put_snap_context(req->r_snapc); 152 if (req->r_trail) { 153 ceph_pagelist_release(req->r_trail); 154 kfree(req->r_trail); 155 } 156 if (req->r_mempool) 157 mempool_free(req, req->r_osdc->req_mempool); 158 else 159 kfree(req); 160 } 161 EXPORT_SYMBOL(ceph_osdc_release_request); 162 163 static int get_num_ops(struct ceph_osd_req_op *ops, int *needs_trail) 164 { 165 int i = 0; 166 167 if (needs_trail) 168 *needs_trail = 0; 169 while (ops[i].op) { 170 if (needs_trail && op_needs_trail(ops[i].op)) 171 *needs_trail = 1; 172 i++; 173 } 174 175 return i; 176 } 177 178 struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc, 179 int flags, 180 struct ceph_snap_context *snapc, 181 struct ceph_osd_req_op *ops, 182 bool use_mempool, 183 gfp_t gfp_flags, 184 struct page **pages, 185 struct bio *bio) 186 { 187 struct ceph_osd_request *req; 188 struct ceph_msg *msg; 189 int needs_trail; 190 int num_op = get_num_ops(ops, &needs_trail); 191 size_t msg_size = sizeof(struct ceph_osd_request_head); 192 193 msg_size += num_op*sizeof(struct ceph_osd_op); 194 195 if (use_mempool) { 196 req = mempool_alloc(osdc->req_mempool, gfp_flags); 197 memset(req, 0, sizeof(*req)); 198 } else { 199 req = kzalloc(sizeof(*req), gfp_flags); 200 } 201 if (req == NULL) 202 return NULL; 203 204 req->r_osdc = osdc; 205 req->r_mempool = use_mempool; 206 207 kref_init(&req->r_kref); 208 init_completion(&req->r_completion); 209 init_completion(&req->r_safe_completion); 210 INIT_LIST_HEAD(&req->r_unsafe_item); 211 req->r_flags = flags; 212 213 WARN_ON((flags & (CEPH_OSD_FLAG_READ|CEPH_OSD_FLAG_WRITE)) == 0); 214 215 /* create reply message */ 216 if (use_mempool) 217 msg = ceph_msgpool_get(&osdc->msgpool_op_reply, 0); 218 else 219 msg = ceph_msg_new(CEPH_MSG_OSD_OPREPLY, 220 OSD_OPREPLY_FRONT_LEN, gfp_flags); 221 if (!msg) { 222 ceph_osdc_put_request(req); 223 return NULL; 224 } 225 req->r_reply = msg; 226 227 /* allocate space for the trailing data */ 228 if (needs_trail) { 229 req->r_trail = kmalloc(sizeof(struct ceph_pagelist), gfp_flags); 230 if (!req->r_trail) { 231 ceph_osdc_put_request(req); 232 return NULL; 233 } 234 ceph_pagelist_init(req->r_trail); 235 } 236 /* create request message; allow space for oid */ 237 msg_size += 40; 238 if (snapc) 239 msg_size += sizeof(u64) * snapc->num_snaps; 240 if (use_mempool) 241 msg = ceph_msgpool_get(&osdc->msgpool_op, 0); 242 else 243 msg = ceph_msg_new(CEPH_MSG_OSD_OP, msg_size, gfp_flags); 244 if (!msg) { 245 ceph_osdc_put_request(req); 246 return NULL; 247 } 248 249 msg->hdr.type = cpu_to_le16(CEPH_MSG_OSD_OP); 250 memset(msg->front.iov_base, 0, msg->front.iov_len); 251 252 req->r_request = msg; 253 req->r_pages = pages; 254 #ifdef CONFIG_BLOCK 255 if (bio) { 256 req->r_bio = bio; 257 bio_get(req->r_bio); 258 } 259 #endif 260 261 return req; 262 } 263 EXPORT_SYMBOL(ceph_osdc_alloc_request); 264 265 static void osd_req_encode_op(struct ceph_osd_request *req, 266 struct ceph_osd_op *dst, 267 struct ceph_osd_req_op *src) 268 { 269 dst->op = cpu_to_le16(src->op); 270 271 switch (dst->op) { 272 case CEPH_OSD_OP_READ: 273 case CEPH_OSD_OP_WRITE: 274 dst->extent.offset = 275 cpu_to_le64(src->extent.offset); 276 dst->extent.length = 277 cpu_to_le64(src->extent.length); 278 dst->extent.truncate_size = 279 cpu_to_le64(src->extent.truncate_size); 280 dst->extent.truncate_seq = 281 cpu_to_le32(src->extent.truncate_seq); 282 break; 283 284 case CEPH_OSD_OP_GETXATTR: 285 case CEPH_OSD_OP_SETXATTR: 286 case CEPH_OSD_OP_CMPXATTR: 287 BUG_ON(!req->r_trail); 288 289 dst->xattr.name_len = cpu_to_le32(src->xattr.name_len); 290 dst->xattr.value_len = cpu_to_le32(src->xattr.value_len); 291 dst->xattr.cmp_op = src->xattr.cmp_op; 292 dst->xattr.cmp_mode = src->xattr.cmp_mode; 293 ceph_pagelist_append(req->r_trail, src->xattr.name, 294 src->xattr.name_len); 295 ceph_pagelist_append(req->r_trail, src->xattr.val, 296 src->xattr.value_len); 297 break; 298 case CEPH_OSD_OP_CALL: 299 BUG_ON(!req->r_trail); 300 301 dst->cls.class_len = src->cls.class_len; 302 dst->cls.method_len = src->cls.method_len; 303 dst->cls.indata_len = cpu_to_le32(src->cls.indata_len); 304 305 ceph_pagelist_append(req->r_trail, src->cls.class_name, 306 src->cls.class_len); 307 ceph_pagelist_append(req->r_trail, src->cls.method_name, 308 src->cls.method_len); 309 ceph_pagelist_append(req->r_trail, src->cls.indata, 310 src->cls.indata_len); 311 break; 312 case CEPH_OSD_OP_ROLLBACK: 313 dst->snap.snapid = cpu_to_le64(src->snap.snapid); 314 break; 315 case CEPH_OSD_OP_STARTSYNC: 316 break; 317 default: 318 pr_err("unrecognized osd opcode %d\n", dst->op); 319 WARN_ON(1); 320 break; 321 } 322 dst->payload_len = cpu_to_le32(src->payload_len); 323 } 324 325 /* 326 * build new request AND message 327 * 328 */ 329 void ceph_osdc_build_request(struct ceph_osd_request *req, 330 u64 off, u64 *plen, 331 struct ceph_osd_req_op *src_ops, 332 struct ceph_snap_context *snapc, 333 struct timespec *mtime, 334 const char *oid, 335 int oid_len) 336 { 337 struct ceph_msg *msg = req->r_request; 338 struct ceph_osd_request_head *head; 339 struct ceph_osd_req_op *src_op; 340 struct ceph_osd_op *op; 341 void *p; 342 int num_op = get_num_ops(src_ops, NULL); 343 size_t msg_size = sizeof(*head) + num_op*sizeof(*op); 344 int flags = req->r_flags; 345 u64 data_len = 0; 346 int i; 347 348 head = msg->front.iov_base; 349 op = (void *)(head + 1); 350 p = (void *)(op + num_op); 351 352 req->r_snapc = ceph_get_snap_context(snapc); 353 354 head->client_inc = cpu_to_le32(1); /* always, for now. */ 355 head->flags = cpu_to_le32(flags); 356 if (flags & CEPH_OSD_FLAG_WRITE) 357 ceph_encode_timespec(&head->mtime, mtime); 358 head->num_ops = cpu_to_le16(num_op); 359 360 361 /* fill in oid */ 362 head->object_len = cpu_to_le32(oid_len); 363 memcpy(p, oid, oid_len); 364 p += oid_len; 365 366 src_op = src_ops; 367 while (src_op->op) { 368 osd_req_encode_op(req, op, src_op); 369 src_op++; 370 op++; 371 } 372 373 if (req->r_trail) 374 data_len += req->r_trail->length; 375 376 if (snapc) { 377 head->snap_seq = cpu_to_le64(snapc->seq); 378 head->num_snaps = cpu_to_le32(snapc->num_snaps); 379 for (i = 0; i < snapc->num_snaps; i++) { 380 put_unaligned_le64(snapc->snaps[i], p); 381 p += sizeof(u64); 382 } 383 } 384 385 if (flags & CEPH_OSD_FLAG_WRITE) { 386 req->r_request->hdr.data_off = cpu_to_le16(off); 387 req->r_request->hdr.data_len = cpu_to_le32(*plen + data_len); 388 } else if (data_len) { 389 req->r_request->hdr.data_off = 0; 390 req->r_request->hdr.data_len = cpu_to_le32(data_len); 391 } 392 393 BUG_ON(p > msg->front.iov_base + msg->front.iov_len); 394 msg_size = p - msg->front.iov_base; 395 msg->front.iov_len = msg_size; 396 msg->hdr.front_len = cpu_to_le32(msg_size); 397 return; 398 } 399 EXPORT_SYMBOL(ceph_osdc_build_request); 400 401 /* 402 * build new request AND message, calculate layout, and adjust file 403 * extent as needed. 404 * 405 * if the file was recently truncated, we include information about its 406 * old and new size so that the object can be updated appropriately. (we 407 * avoid synchronously deleting truncated objects because it's slow.) 408 * 409 * if @do_sync, include a 'startsync' command so that the osd will flush 410 * data quickly. 411 */ 412 struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc, 413 struct ceph_file_layout *layout, 414 struct ceph_vino vino, 415 u64 off, u64 *plen, 416 int opcode, int flags, 417 struct ceph_snap_context *snapc, 418 int do_sync, 419 u32 truncate_seq, 420 u64 truncate_size, 421 struct timespec *mtime, 422 bool use_mempool, int num_reply) 423 { 424 struct ceph_osd_req_op ops[3]; 425 struct ceph_osd_request *req; 426 427 ops[0].op = opcode; 428 ops[0].extent.truncate_seq = truncate_seq; 429 ops[0].extent.truncate_size = truncate_size; 430 ops[0].payload_len = 0; 431 432 if (do_sync) { 433 ops[1].op = CEPH_OSD_OP_STARTSYNC; 434 ops[1].payload_len = 0; 435 ops[2].op = 0; 436 } else 437 ops[1].op = 0; 438 439 req = ceph_osdc_alloc_request(osdc, flags, 440 snapc, ops, 441 use_mempool, 442 GFP_NOFS, NULL, NULL); 443 if (IS_ERR(req)) 444 return req; 445 446 /* calculate max write size */ 447 calc_layout(osdc, vino, layout, off, plen, req, ops); 448 req->r_file_layout = *layout; /* keep a copy */ 449 450 ceph_osdc_build_request(req, off, plen, ops, 451 snapc, 452 mtime, 453 req->r_oid, req->r_oid_len); 454 455 return req; 456 } 457 EXPORT_SYMBOL(ceph_osdc_new_request); 458 459 /* 460 * We keep osd requests in an rbtree, sorted by ->r_tid. 461 */ 462 static void __insert_request(struct ceph_osd_client *osdc, 463 struct ceph_osd_request *new) 464 { 465 struct rb_node **p = &osdc->requests.rb_node; 466 struct rb_node *parent = NULL; 467 struct ceph_osd_request *req = NULL; 468 469 while (*p) { 470 parent = *p; 471 req = rb_entry(parent, struct ceph_osd_request, r_node); 472 if (new->r_tid < req->r_tid) 473 p = &(*p)->rb_left; 474 else if (new->r_tid > req->r_tid) 475 p = &(*p)->rb_right; 476 else 477 BUG(); 478 } 479 480 rb_link_node(&new->r_node, parent, p); 481 rb_insert_color(&new->r_node, &osdc->requests); 482 } 483 484 static struct ceph_osd_request *__lookup_request(struct ceph_osd_client *osdc, 485 u64 tid) 486 { 487 struct ceph_osd_request *req; 488 struct rb_node *n = osdc->requests.rb_node; 489 490 while (n) { 491 req = rb_entry(n, struct ceph_osd_request, r_node); 492 if (tid < req->r_tid) 493 n = n->rb_left; 494 else if (tid > req->r_tid) 495 n = n->rb_right; 496 else 497 return req; 498 } 499 return NULL; 500 } 501 502 static struct ceph_osd_request * 503 __lookup_request_ge(struct ceph_osd_client *osdc, 504 u64 tid) 505 { 506 struct ceph_osd_request *req; 507 struct rb_node *n = osdc->requests.rb_node; 508 509 while (n) { 510 req = rb_entry(n, struct ceph_osd_request, r_node); 511 if (tid < req->r_tid) { 512 if (!n->rb_left) 513 return req; 514 n = n->rb_left; 515 } else if (tid > req->r_tid) { 516 n = n->rb_right; 517 } else { 518 return req; 519 } 520 } 521 return NULL; 522 } 523 524 525 /* 526 * If the osd connection drops, we need to resubmit all requests. 527 */ 528 static void osd_reset(struct ceph_connection *con) 529 { 530 struct ceph_osd *osd = con->private; 531 struct ceph_osd_client *osdc; 532 533 if (!osd) 534 return; 535 dout("osd_reset osd%d\n", osd->o_osd); 536 osdc = osd->o_osdc; 537 down_read(&osdc->map_sem); 538 kick_requests(osdc, osd); 539 up_read(&osdc->map_sem); 540 } 541 542 /* 543 * Track open sessions with osds. 544 */ 545 static struct ceph_osd *create_osd(struct ceph_osd_client *osdc) 546 { 547 struct ceph_osd *osd; 548 549 osd = kzalloc(sizeof(*osd), GFP_NOFS); 550 if (!osd) 551 return NULL; 552 553 atomic_set(&osd->o_ref, 1); 554 osd->o_osdc = osdc; 555 INIT_LIST_HEAD(&osd->o_requests); 556 INIT_LIST_HEAD(&osd->o_osd_lru); 557 osd->o_incarnation = 1; 558 559 ceph_con_init(osdc->client->msgr, &osd->o_con); 560 osd->o_con.private = osd; 561 osd->o_con.ops = &osd_con_ops; 562 osd->o_con.peer_name.type = CEPH_ENTITY_TYPE_OSD; 563 564 INIT_LIST_HEAD(&osd->o_keepalive_item); 565 return osd; 566 } 567 568 static struct ceph_osd *get_osd(struct ceph_osd *osd) 569 { 570 if (atomic_inc_not_zero(&osd->o_ref)) { 571 dout("get_osd %p %d -> %d\n", osd, atomic_read(&osd->o_ref)-1, 572 atomic_read(&osd->o_ref)); 573 return osd; 574 } else { 575 dout("get_osd %p FAIL\n", osd); 576 return NULL; 577 } 578 } 579 580 static void put_osd(struct ceph_osd *osd) 581 { 582 dout("put_osd %p %d -> %d\n", osd, atomic_read(&osd->o_ref), 583 atomic_read(&osd->o_ref) - 1); 584 if (atomic_dec_and_test(&osd->o_ref)) { 585 struct ceph_auth_client *ac = osd->o_osdc->client->monc.auth; 586 587 if (osd->o_authorizer) 588 ac->ops->destroy_authorizer(ac, osd->o_authorizer); 589 kfree(osd); 590 } 591 } 592 593 /* 594 * remove an osd from our map 595 */ 596 static void __remove_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd) 597 { 598 dout("__remove_osd %p\n", osd); 599 BUG_ON(!list_empty(&osd->o_requests)); 600 rb_erase(&osd->o_node, &osdc->osds); 601 list_del_init(&osd->o_osd_lru); 602 ceph_con_close(&osd->o_con); 603 put_osd(osd); 604 } 605 606 static void __move_osd_to_lru(struct ceph_osd_client *osdc, 607 struct ceph_osd *osd) 608 { 609 dout("__move_osd_to_lru %p\n", osd); 610 BUG_ON(!list_empty(&osd->o_osd_lru)); 611 list_add_tail(&osd->o_osd_lru, &osdc->osd_lru); 612 osd->lru_ttl = jiffies + osdc->client->options->osd_idle_ttl * HZ; 613 } 614 615 static void __remove_osd_from_lru(struct ceph_osd *osd) 616 { 617 dout("__remove_osd_from_lru %p\n", osd); 618 if (!list_empty(&osd->o_osd_lru)) 619 list_del_init(&osd->o_osd_lru); 620 } 621 622 static void remove_old_osds(struct ceph_osd_client *osdc, int remove_all) 623 { 624 struct ceph_osd *osd, *nosd; 625 626 dout("__remove_old_osds %p\n", osdc); 627 mutex_lock(&osdc->request_mutex); 628 list_for_each_entry_safe(osd, nosd, &osdc->osd_lru, o_osd_lru) { 629 if (!remove_all && time_before(jiffies, osd->lru_ttl)) 630 break; 631 __remove_osd(osdc, osd); 632 } 633 mutex_unlock(&osdc->request_mutex); 634 } 635 636 /* 637 * reset osd connect 638 */ 639 static int __reset_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd) 640 { 641 struct ceph_osd_request *req; 642 int ret = 0; 643 644 dout("__reset_osd %p osd%d\n", osd, osd->o_osd); 645 if (list_empty(&osd->o_requests)) { 646 __remove_osd(osdc, osd); 647 } else if (memcmp(&osdc->osdmap->osd_addr[osd->o_osd], 648 &osd->o_con.peer_addr, 649 sizeof(osd->o_con.peer_addr)) == 0 && 650 !ceph_con_opened(&osd->o_con)) { 651 dout(" osd addr hasn't changed and connection never opened," 652 " letting msgr retry"); 653 /* touch each r_stamp for handle_timeout()'s benfit */ 654 list_for_each_entry(req, &osd->o_requests, r_osd_item) 655 req->r_stamp = jiffies; 656 ret = -EAGAIN; 657 } else { 658 ceph_con_close(&osd->o_con); 659 ceph_con_open(&osd->o_con, &osdc->osdmap->osd_addr[osd->o_osd]); 660 osd->o_incarnation++; 661 } 662 return ret; 663 } 664 665 static void __insert_osd(struct ceph_osd_client *osdc, struct ceph_osd *new) 666 { 667 struct rb_node **p = &osdc->osds.rb_node; 668 struct rb_node *parent = NULL; 669 struct ceph_osd *osd = NULL; 670 671 while (*p) { 672 parent = *p; 673 osd = rb_entry(parent, struct ceph_osd, o_node); 674 if (new->o_osd < osd->o_osd) 675 p = &(*p)->rb_left; 676 else if (new->o_osd > osd->o_osd) 677 p = &(*p)->rb_right; 678 else 679 BUG(); 680 } 681 682 rb_link_node(&new->o_node, parent, p); 683 rb_insert_color(&new->o_node, &osdc->osds); 684 } 685 686 static struct ceph_osd *__lookup_osd(struct ceph_osd_client *osdc, int o) 687 { 688 struct ceph_osd *osd; 689 struct rb_node *n = osdc->osds.rb_node; 690 691 while (n) { 692 osd = rb_entry(n, struct ceph_osd, o_node); 693 if (o < osd->o_osd) 694 n = n->rb_left; 695 else if (o > osd->o_osd) 696 n = n->rb_right; 697 else 698 return osd; 699 } 700 return NULL; 701 } 702 703 static void __schedule_osd_timeout(struct ceph_osd_client *osdc) 704 { 705 schedule_delayed_work(&osdc->timeout_work, 706 osdc->client->options->osd_keepalive_timeout * HZ); 707 } 708 709 static void __cancel_osd_timeout(struct ceph_osd_client *osdc) 710 { 711 cancel_delayed_work(&osdc->timeout_work); 712 } 713 714 /* 715 * Register request, assign tid. If this is the first request, set up 716 * the timeout event. 717 */ 718 static void register_request(struct ceph_osd_client *osdc, 719 struct ceph_osd_request *req) 720 { 721 mutex_lock(&osdc->request_mutex); 722 req->r_tid = ++osdc->last_tid; 723 req->r_request->hdr.tid = cpu_to_le64(req->r_tid); 724 INIT_LIST_HEAD(&req->r_req_lru_item); 725 726 dout("register_request %p tid %lld\n", req, req->r_tid); 727 __insert_request(osdc, req); 728 ceph_osdc_get_request(req); 729 osdc->num_requests++; 730 731 if (osdc->num_requests == 1) { 732 dout(" first request, scheduling timeout\n"); 733 __schedule_osd_timeout(osdc); 734 } 735 mutex_unlock(&osdc->request_mutex); 736 } 737 738 /* 739 * called under osdc->request_mutex 740 */ 741 static void __unregister_request(struct ceph_osd_client *osdc, 742 struct ceph_osd_request *req) 743 { 744 dout("__unregister_request %p tid %lld\n", req, req->r_tid); 745 rb_erase(&req->r_node, &osdc->requests); 746 osdc->num_requests--; 747 748 if (req->r_osd) { 749 /* make sure the original request isn't in flight. */ 750 ceph_con_revoke(&req->r_osd->o_con, req->r_request); 751 752 list_del_init(&req->r_osd_item); 753 if (list_empty(&req->r_osd->o_requests)) 754 __move_osd_to_lru(osdc, req->r_osd); 755 req->r_osd = NULL; 756 } 757 758 ceph_osdc_put_request(req); 759 760 list_del_init(&req->r_req_lru_item); 761 if (osdc->num_requests == 0) { 762 dout(" no requests, canceling timeout\n"); 763 __cancel_osd_timeout(osdc); 764 } 765 } 766 767 /* 768 * Cancel a previously queued request message 769 */ 770 static void __cancel_request(struct ceph_osd_request *req) 771 { 772 if (req->r_sent && req->r_osd) { 773 ceph_con_revoke(&req->r_osd->o_con, req->r_request); 774 req->r_sent = 0; 775 } 776 list_del_init(&req->r_req_lru_item); 777 } 778 779 /* 780 * Pick an osd (the first 'up' osd in the pg), allocate the osd struct 781 * (as needed), and set the request r_osd appropriately. If there is 782 * no up osd, set r_osd to NULL. 783 * 784 * Return 0 if unchanged, 1 if changed, or negative on error. 785 * 786 * Caller should hold map_sem for read and request_mutex. 787 */ 788 static int __map_osds(struct ceph_osd_client *osdc, 789 struct ceph_osd_request *req) 790 { 791 struct ceph_osd_request_head *reqhead = req->r_request->front.iov_base; 792 struct ceph_pg pgid; 793 int acting[CEPH_PG_MAX_SIZE]; 794 int o = -1, num = 0; 795 int err; 796 797 dout("map_osds %p tid %lld\n", req, req->r_tid); 798 err = ceph_calc_object_layout(&reqhead->layout, req->r_oid, 799 &req->r_file_layout, osdc->osdmap); 800 if (err) 801 return err; 802 pgid = reqhead->layout.ol_pgid; 803 req->r_pgid = pgid; 804 805 err = ceph_calc_pg_acting(osdc->osdmap, pgid, acting); 806 if (err > 0) { 807 o = acting[0]; 808 num = err; 809 } 810 811 if ((req->r_osd && req->r_osd->o_osd == o && 812 req->r_sent >= req->r_osd->o_incarnation && 813 req->r_num_pg_osds == num && 814 memcmp(req->r_pg_osds, acting, sizeof(acting[0])*num) == 0) || 815 (req->r_osd == NULL && o == -1)) 816 return 0; /* no change */ 817 818 dout("map_osds tid %llu pgid %d.%x osd%d (was osd%d)\n", 819 req->r_tid, le32_to_cpu(pgid.pool), le16_to_cpu(pgid.ps), o, 820 req->r_osd ? req->r_osd->o_osd : -1); 821 822 /* record full pg acting set */ 823 memcpy(req->r_pg_osds, acting, sizeof(acting[0]) * num); 824 req->r_num_pg_osds = num; 825 826 if (req->r_osd) { 827 __cancel_request(req); 828 list_del_init(&req->r_osd_item); 829 req->r_osd = NULL; 830 } 831 832 req->r_osd = __lookup_osd(osdc, o); 833 if (!req->r_osd && o >= 0) { 834 err = -ENOMEM; 835 req->r_osd = create_osd(osdc); 836 if (!req->r_osd) 837 goto out; 838 839 dout("map_osds osd %p is osd%d\n", req->r_osd, o); 840 req->r_osd->o_osd = o; 841 req->r_osd->o_con.peer_name.num = cpu_to_le64(o); 842 __insert_osd(osdc, req->r_osd); 843 844 ceph_con_open(&req->r_osd->o_con, &osdc->osdmap->osd_addr[o]); 845 } 846 847 if (req->r_osd) { 848 __remove_osd_from_lru(req->r_osd); 849 list_add(&req->r_osd_item, &req->r_osd->o_requests); 850 } 851 err = 1; /* osd or pg changed */ 852 853 out: 854 return err; 855 } 856 857 /* 858 * caller should hold map_sem (for read) and request_mutex 859 */ 860 static int __send_request(struct ceph_osd_client *osdc, 861 struct ceph_osd_request *req) 862 { 863 struct ceph_osd_request_head *reqhead; 864 int err; 865 866 err = __map_osds(osdc, req); 867 if (err < 0) 868 return err; 869 if (req->r_osd == NULL) { 870 dout("send_request %p no up osds in pg\n", req); 871 ceph_monc_request_next_osdmap(&osdc->client->monc); 872 return 0; 873 } 874 875 dout("send_request %p tid %llu to osd%d flags %d\n", 876 req, req->r_tid, req->r_osd->o_osd, req->r_flags); 877 878 reqhead = req->r_request->front.iov_base; 879 reqhead->osdmap_epoch = cpu_to_le32(osdc->osdmap->epoch); 880 reqhead->flags |= cpu_to_le32(req->r_flags); /* e.g., RETRY */ 881 reqhead->reassert_version = req->r_reassert_version; 882 883 req->r_stamp = jiffies; 884 list_move_tail(&req->r_req_lru_item, &osdc->req_lru); 885 886 ceph_msg_get(req->r_request); /* send consumes a ref */ 887 ceph_con_send(&req->r_osd->o_con, req->r_request); 888 req->r_sent = req->r_osd->o_incarnation; 889 return 0; 890 } 891 892 /* 893 * Timeout callback, called every N seconds when 1 or more osd 894 * requests has been active for more than N seconds. When this 895 * happens, we ping all OSDs with requests who have timed out to 896 * ensure any communications channel reset is detected. Reset the 897 * request timeouts another N seconds in the future as we go. 898 * Reschedule the timeout event another N seconds in future (unless 899 * there are no open requests). 900 */ 901 static void handle_timeout(struct work_struct *work) 902 { 903 struct ceph_osd_client *osdc = 904 container_of(work, struct ceph_osd_client, timeout_work.work); 905 struct ceph_osd_request *req, *last_req = NULL; 906 struct ceph_osd *osd; 907 unsigned long timeout = osdc->client->options->osd_timeout * HZ; 908 unsigned long keepalive = 909 osdc->client->options->osd_keepalive_timeout * HZ; 910 unsigned long last_stamp = 0; 911 struct rb_node *p; 912 struct list_head slow_osds; 913 914 dout("timeout\n"); 915 down_read(&osdc->map_sem); 916 917 ceph_monc_request_next_osdmap(&osdc->client->monc); 918 919 mutex_lock(&osdc->request_mutex); 920 for (p = rb_first(&osdc->requests); p; p = rb_next(p)) { 921 req = rb_entry(p, struct ceph_osd_request, r_node); 922 923 if (req->r_resend) { 924 int err; 925 926 dout("osdc resending prev failed %lld\n", req->r_tid); 927 err = __send_request(osdc, req); 928 if (err) 929 dout("osdc failed again on %lld\n", req->r_tid); 930 else 931 req->r_resend = false; 932 continue; 933 } 934 } 935 936 /* 937 * reset osds that appear to be _really_ unresponsive. this 938 * is a failsafe measure.. we really shouldn't be getting to 939 * this point if the system is working properly. the monitors 940 * should mark the osd as failed and we should find out about 941 * it from an updated osd map. 942 */ 943 while (timeout && !list_empty(&osdc->req_lru)) { 944 req = list_entry(osdc->req_lru.next, struct ceph_osd_request, 945 r_req_lru_item); 946 947 if (time_before(jiffies, req->r_stamp + timeout)) 948 break; 949 950 BUG_ON(req == last_req && req->r_stamp == last_stamp); 951 last_req = req; 952 last_stamp = req->r_stamp; 953 954 osd = req->r_osd; 955 BUG_ON(!osd); 956 pr_warning(" tid %llu timed out on osd%d, will reset osd\n", 957 req->r_tid, osd->o_osd); 958 __kick_requests(osdc, osd); 959 } 960 961 /* 962 * ping osds that are a bit slow. this ensures that if there 963 * is a break in the TCP connection we will notice, and reopen 964 * a connection with that osd (from the fault callback). 965 */ 966 INIT_LIST_HEAD(&slow_osds); 967 list_for_each_entry(req, &osdc->req_lru, r_req_lru_item) { 968 if (time_before(jiffies, req->r_stamp + keepalive)) 969 break; 970 971 osd = req->r_osd; 972 BUG_ON(!osd); 973 dout(" tid %llu is slow, will send keepalive on osd%d\n", 974 req->r_tid, osd->o_osd); 975 list_move_tail(&osd->o_keepalive_item, &slow_osds); 976 } 977 while (!list_empty(&slow_osds)) { 978 osd = list_entry(slow_osds.next, struct ceph_osd, 979 o_keepalive_item); 980 list_del_init(&osd->o_keepalive_item); 981 ceph_con_keepalive(&osd->o_con); 982 } 983 984 __schedule_osd_timeout(osdc); 985 mutex_unlock(&osdc->request_mutex); 986 987 up_read(&osdc->map_sem); 988 } 989 990 static void handle_osds_timeout(struct work_struct *work) 991 { 992 struct ceph_osd_client *osdc = 993 container_of(work, struct ceph_osd_client, 994 osds_timeout_work.work); 995 unsigned long delay = 996 osdc->client->options->osd_idle_ttl * HZ >> 2; 997 998 dout("osds timeout\n"); 999 down_read(&osdc->map_sem); 1000 remove_old_osds(osdc, 0); 1001 up_read(&osdc->map_sem); 1002 1003 schedule_delayed_work(&osdc->osds_timeout_work, 1004 round_jiffies_relative(delay)); 1005 } 1006 1007 /* 1008 * handle osd op reply. either call the callback if it is specified, 1009 * or do the completion to wake up the waiting thread. 1010 */ 1011 static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg, 1012 struct ceph_connection *con) 1013 { 1014 struct ceph_osd_reply_head *rhead = msg->front.iov_base; 1015 struct ceph_osd_request *req; 1016 u64 tid; 1017 int numops, object_len, flags; 1018 s32 result; 1019 1020 tid = le64_to_cpu(msg->hdr.tid); 1021 if (msg->front.iov_len < sizeof(*rhead)) 1022 goto bad; 1023 numops = le32_to_cpu(rhead->num_ops); 1024 object_len = le32_to_cpu(rhead->object_len); 1025 result = le32_to_cpu(rhead->result); 1026 if (msg->front.iov_len != sizeof(*rhead) + object_len + 1027 numops * sizeof(struct ceph_osd_op)) 1028 goto bad; 1029 dout("handle_reply %p tid %llu result %d\n", msg, tid, (int)result); 1030 1031 /* lookup */ 1032 mutex_lock(&osdc->request_mutex); 1033 req = __lookup_request(osdc, tid); 1034 if (req == NULL) { 1035 dout("handle_reply tid %llu dne\n", tid); 1036 mutex_unlock(&osdc->request_mutex); 1037 return; 1038 } 1039 ceph_osdc_get_request(req); 1040 flags = le32_to_cpu(rhead->flags); 1041 1042 /* 1043 * if this connection filled our message, drop our reference now, to 1044 * avoid a (safe but slower) revoke later. 1045 */ 1046 if (req->r_con_filling_msg == con && req->r_reply == msg) { 1047 dout(" dropping con_filling_msg ref %p\n", con); 1048 req->r_con_filling_msg = NULL; 1049 ceph_con_put(con); 1050 } 1051 1052 if (!req->r_got_reply) { 1053 unsigned bytes; 1054 1055 req->r_result = le32_to_cpu(rhead->result); 1056 bytes = le32_to_cpu(msg->hdr.data_len); 1057 dout("handle_reply result %d bytes %d\n", req->r_result, 1058 bytes); 1059 if (req->r_result == 0) 1060 req->r_result = bytes; 1061 1062 /* in case this is a write and we need to replay, */ 1063 req->r_reassert_version = rhead->reassert_version; 1064 1065 req->r_got_reply = 1; 1066 } else if ((flags & CEPH_OSD_FLAG_ONDISK) == 0) { 1067 dout("handle_reply tid %llu dup ack\n", tid); 1068 mutex_unlock(&osdc->request_mutex); 1069 goto done; 1070 } 1071 1072 dout("handle_reply tid %llu flags %d\n", tid, flags); 1073 1074 /* either this is a read, or we got the safe response */ 1075 if (result < 0 || 1076 (flags & CEPH_OSD_FLAG_ONDISK) || 1077 ((flags & CEPH_OSD_FLAG_WRITE) == 0)) 1078 __unregister_request(osdc, req); 1079 1080 mutex_unlock(&osdc->request_mutex); 1081 1082 if (req->r_callback) 1083 req->r_callback(req, msg); 1084 else 1085 complete_all(&req->r_completion); 1086 1087 if (flags & CEPH_OSD_FLAG_ONDISK) { 1088 if (req->r_safe_callback) 1089 req->r_safe_callback(req, msg); 1090 complete_all(&req->r_safe_completion); /* fsync waiter */ 1091 } 1092 1093 done: 1094 ceph_osdc_put_request(req); 1095 return; 1096 1097 bad: 1098 pr_err("corrupt osd_op_reply got %d %d expected %d\n", 1099 (int)msg->front.iov_len, le32_to_cpu(msg->hdr.front_len), 1100 (int)sizeof(*rhead)); 1101 ceph_msg_dump(msg); 1102 } 1103 1104 1105 static int __kick_requests(struct ceph_osd_client *osdc, 1106 struct ceph_osd *kickosd) 1107 { 1108 struct ceph_osd_request *req; 1109 struct rb_node *p, *n; 1110 int needmap = 0; 1111 int err; 1112 1113 dout("kick_requests osd%d\n", kickosd ? kickosd->o_osd : -1); 1114 if (kickosd) { 1115 err = __reset_osd(osdc, kickosd); 1116 if (err == -EAGAIN) 1117 return 1; 1118 } else { 1119 for (p = rb_first(&osdc->osds); p; p = n) { 1120 struct ceph_osd *osd = 1121 rb_entry(p, struct ceph_osd, o_node); 1122 1123 n = rb_next(p); 1124 if (!ceph_osd_is_up(osdc->osdmap, osd->o_osd) || 1125 memcmp(&osd->o_con.peer_addr, 1126 ceph_osd_addr(osdc->osdmap, 1127 osd->o_osd), 1128 sizeof(struct ceph_entity_addr)) != 0) 1129 __reset_osd(osdc, osd); 1130 } 1131 } 1132 1133 for (p = rb_first(&osdc->requests); p; p = rb_next(p)) { 1134 req = rb_entry(p, struct ceph_osd_request, r_node); 1135 1136 if (req->r_resend) { 1137 dout(" r_resend set on tid %llu\n", req->r_tid); 1138 __cancel_request(req); 1139 goto kick; 1140 } 1141 if (req->r_osd && kickosd == req->r_osd) { 1142 __cancel_request(req); 1143 goto kick; 1144 } 1145 1146 err = __map_osds(osdc, req); 1147 if (err == 0) 1148 continue; /* no change */ 1149 if (err < 0) { 1150 /* 1151 * FIXME: really, we should set the request 1152 * error and fail if this isn't a 'nofail' 1153 * request, but that's a fair bit more 1154 * complicated to do. So retry! 1155 */ 1156 dout(" setting r_resend on %llu\n", req->r_tid); 1157 req->r_resend = true; 1158 continue; 1159 } 1160 if (req->r_osd == NULL) { 1161 dout("tid %llu maps to no valid osd\n", req->r_tid); 1162 needmap++; /* request a newer map */ 1163 continue; 1164 } 1165 1166 kick: 1167 dout("kicking %p tid %llu osd%d\n", req, req->r_tid, 1168 req->r_osd ? req->r_osd->o_osd : -1); 1169 req->r_flags |= CEPH_OSD_FLAG_RETRY; 1170 err = __send_request(osdc, req); 1171 if (err) { 1172 dout(" setting r_resend on %llu\n", req->r_tid); 1173 req->r_resend = true; 1174 } 1175 } 1176 1177 return needmap; 1178 } 1179 1180 /* 1181 * Resubmit osd requests whose osd or osd address has changed. Request 1182 * a new osd map if osds are down, or we are otherwise unable to determine 1183 * how to direct a request. 1184 * 1185 * Close connections to down osds. 1186 * 1187 * If @who is specified, resubmit requests for that specific osd. 1188 * 1189 * Caller should hold map_sem for read and request_mutex. 1190 */ 1191 static void kick_requests(struct ceph_osd_client *osdc, 1192 struct ceph_osd *kickosd) 1193 { 1194 int needmap; 1195 1196 mutex_lock(&osdc->request_mutex); 1197 needmap = __kick_requests(osdc, kickosd); 1198 mutex_unlock(&osdc->request_mutex); 1199 1200 if (needmap) { 1201 dout("%d requests for down osds, need new map\n", needmap); 1202 ceph_monc_request_next_osdmap(&osdc->client->monc); 1203 } 1204 1205 } 1206 /* 1207 * Process updated osd map. 1208 * 1209 * The message contains any number of incremental and full maps, normally 1210 * indicating some sort of topology change in the cluster. Kick requests 1211 * off to different OSDs as needed. 1212 */ 1213 void ceph_osdc_handle_map(struct ceph_osd_client *osdc, struct ceph_msg *msg) 1214 { 1215 void *p, *end, *next; 1216 u32 nr_maps, maplen; 1217 u32 epoch; 1218 struct ceph_osdmap *newmap = NULL, *oldmap; 1219 int err; 1220 struct ceph_fsid fsid; 1221 1222 dout("handle_map have %u\n", osdc->osdmap ? osdc->osdmap->epoch : 0); 1223 p = msg->front.iov_base; 1224 end = p + msg->front.iov_len; 1225 1226 /* verify fsid */ 1227 ceph_decode_need(&p, end, sizeof(fsid), bad); 1228 ceph_decode_copy(&p, &fsid, sizeof(fsid)); 1229 if (ceph_check_fsid(osdc->client, &fsid) < 0) 1230 return; 1231 1232 down_write(&osdc->map_sem); 1233 1234 /* incremental maps */ 1235 ceph_decode_32_safe(&p, end, nr_maps, bad); 1236 dout(" %d inc maps\n", nr_maps); 1237 while (nr_maps > 0) { 1238 ceph_decode_need(&p, end, 2*sizeof(u32), bad); 1239 epoch = ceph_decode_32(&p); 1240 maplen = ceph_decode_32(&p); 1241 ceph_decode_need(&p, end, maplen, bad); 1242 next = p + maplen; 1243 if (osdc->osdmap && osdc->osdmap->epoch+1 == epoch) { 1244 dout("applying incremental map %u len %d\n", 1245 epoch, maplen); 1246 newmap = osdmap_apply_incremental(&p, next, 1247 osdc->osdmap, 1248 osdc->client->msgr); 1249 if (IS_ERR(newmap)) { 1250 err = PTR_ERR(newmap); 1251 goto bad; 1252 } 1253 BUG_ON(!newmap); 1254 if (newmap != osdc->osdmap) { 1255 ceph_osdmap_destroy(osdc->osdmap); 1256 osdc->osdmap = newmap; 1257 } 1258 } else { 1259 dout("ignoring incremental map %u len %d\n", 1260 epoch, maplen); 1261 } 1262 p = next; 1263 nr_maps--; 1264 } 1265 if (newmap) 1266 goto done; 1267 1268 /* full maps */ 1269 ceph_decode_32_safe(&p, end, nr_maps, bad); 1270 dout(" %d full maps\n", nr_maps); 1271 while (nr_maps) { 1272 ceph_decode_need(&p, end, 2*sizeof(u32), bad); 1273 epoch = ceph_decode_32(&p); 1274 maplen = ceph_decode_32(&p); 1275 ceph_decode_need(&p, end, maplen, bad); 1276 if (nr_maps > 1) { 1277 dout("skipping non-latest full map %u len %d\n", 1278 epoch, maplen); 1279 } else if (osdc->osdmap && osdc->osdmap->epoch >= epoch) { 1280 dout("skipping full map %u len %d, " 1281 "older than our %u\n", epoch, maplen, 1282 osdc->osdmap->epoch); 1283 } else { 1284 dout("taking full map %u len %d\n", epoch, maplen); 1285 newmap = osdmap_decode(&p, p+maplen); 1286 if (IS_ERR(newmap)) { 1287 err = PTR_ERR(newmap); 1288 goto bad; 1289 } 1290 BUG_ON(!newmap); 1291 oldmap = osdc->osdmap; 1292 osdc->osdmap = newmap; 1293 if (oldmap) 1294 ceph_osdmap_destroy(oldmap); 1295 } 1296 p += maplen; 1297 nr_maps--; 1298 } 1299 1300 done: 1301 downgrade_write(&osdc->map_sem); 1302 ceph_monc_got_osdmap(&osdc->client->monc, osdc->osdmap->epoch); 1303 if (newmap) 1304 kick_requests(osdc, NULL); 1305 up_read(&osdc->map_sem); 1306 wake_up_all(&osdc->client->auth_wq); 1307 return; 1308 1309 bad: 1310 pr_err("osdc handle_map corrupt msg\n"); 1311 ceph_msg_dump(msg); 1312 up_write(&osdc->map_sem); 1313 return; 1314 } 1315 1316 /* 1317 * Register request, send initial attempt. 1318 */ 1319 int ceph_osdc_start_request(struct ceph_osd_client *osdc, 1320 struct ceph_osd_request *req, 1321 bool nofail) 1322 { 1323 int rc = 0; 1324 1325 req->r_request->pages = req->r_pages; 1326 req->r_request->nr_pages = req->r_num_pages; 1327 #ifdef CONFIG_BLOCK 1328 req->r_request->bio = req->r_bio; 1329 #endif 1330 req->r_request->trail = req->r_trail; 1331 1332 register_request(osdc, req); 1333 1334 down_read(&osdc->map_sem); 1335 mutex_lock(&osdc->request_mutex); 1336 /* 1337 * a racing kick_requests() may have sent the message for us 1338 * while we dropped request_mutex above, so only send now if 1339 * the request still han't been touched yet. 1340 */ 1341 if (req->r_sent == 0) { 1342 rc = __send_request(osdc, req); 1343 if (rc) { 1344 if (nofail) { 1345 dout("osdc_start_request failed send, " 1346 " marking %lld\n", req->r_tid); 1347 req->r_resend = true; 1348 rc = 0; 1349 } else { 1350 __unregister_request(osdc, req); 1351 } 1352 } 1353 } 1354 mutex_unlock(&osdc->request_mutex); 1355 up_read(&osdc->map_sem); 1356 return rc; 1357 } 1358 EXPORT_SYMBOL(ceph_osdc_start_request); 1359 1360 /* 1361 * wait for a request to complete 1362 */ 1363 int ceph_osdc_wait_request(struct ceph_osd_client *osdc, 1364 struct ceph_osd_request *req) 1365 { 1366 int rc; 1367 1368 rc = wait_for_completion_interruptible(&req->r_completion); 1369 if (rc < 0) { 1370 mutex_lock(&osdc->request_mutex); 1371 __cancel_request(req); 1372 __unregister_request(osdc, req); 1373 mutex_unlock(&osdc->request_mutex); 1374 dout("wait_request tid %llu canceled/timed out\n", req->r_tid); 1375 return rc; 1376 } 1377 1378 dout("wait_request tid %llu result %d\n", req->r_tid, req->r_result); 1379 return req->r_result; 1380 } 1381 EXPORT_SYMBOL(ceph_osdc_wait_request); 1382 1383 /* 1384 * sync - wait for all in-flight requests to flush. avoid starvation. 1385 */ 1386 void ceph_osdc_sync(struct ceph_osd_client *osdc) 1387 { 1388 struct ceph_osd_request *req; 1389 u64 last_tid, next_tid = 0; 1390 1391 mutex_lock(&osdc->request_mutex); 1392 last_tid = osdc->last_tid; 1393 while (1) { 1394 req = __lookup_request_ge(osdc, next_tid); 1395 if (!req) 1396 break; 1397 if (req->r_tid > last_tid) 1398 break; 1399 1400 next_tid = req->r_tid + 1; 1401 if ((req->r_flags & CEPH_OSD_FLAG_WRITE) == 0) 1402 continue; 1403 1404 ceph_osdc_get_request(req); 1405 mutex_unlock(&osdc->request_mutex); 1406 dout("sync waiting on tid %llu (last is %llu)\n", 1407 req->r_tid, last_tid); 1408 wait_for_completion(&req->r_safe_completion); 1409 mutex_lock(&osdc->request_mutex); 1410 ceph_osdc_put_request(req); 1411 } 1412 mutex_unlock(&osdc->request_mutex); 1413 dout("sync done (thru tid %llu)\n", last_tid); 1414 } 1415 EXPORT_SYMBOL(ceph_osdc_sync); 1416 1417 /* 1418 * init, shutdown 1419 */ 1420 int ceph_osdc_init(struct ceph_osd_client *osdc, struct ceph_client *client) 1421 { 1422 int err; 1423 1424 dout("init\n"); 1425 osdc->client = client; 1426 osdc->osdmap = NULL; 1427 init_rwsem(&osdc->map_sem); 1428 init_completion(&osdc->map_waiters); 1429 osdc->last_requested_map = 0; 1430 mutex_init(&osdc->request_mutex); 1431 osdc->last_tid = 0; 1432 osdc->osds = RB_ROOT; 1433 INIT_LIST_HEAD(&osdc->osd_lru); 1434 osdc->requests = RB_ROOT; 1435 INIT_LIST_HEAD(&osdc->req_lru); 1436 osdc->num_requests = 0; 1437 INIT_DELAYED_WORK(&osdc->timeout_work, handle_timeout); 1438 INIT_DELAYED_WORK(&osdc->osds_timeout_work, handle_osds_timeout); 1439 1440 schedule_delayed_work(&osdc->osds_timeout_work, 1441 round_jiffies_relative(osdc->client->options->osd_idle_ttl * HZ)); 1442 1443 err = -ENOMEM; 1444 osdc->req_mempool = mempool_create_kmalloc_pool(10, 1445 sizeof(struct ceph_osd_request)); 1446 if (!osdc->req_mempool) 1447 goto out; 1448 1449 err = ceph_msgpool_init(&osdc->msgpool_op, OSD_OP_FRONT_LEN, 10, true, 1450 "osd_op"); 1451 if (err < 0) 1452 goto out_mempool; 1453 err = ceph_msgpool_init(&osdc->msgpool_op_reply, 1454 OSD_OPREPLY_FRONT_LEN, 10, true, 1455 "osd_op_reply"); 1456 if (err < 0) 1457 goto out_msgpool; 1458 return 0; 1459 1460 out_msgpool: 1461 ceph_msgpool_destroy(&osdc->msgpool_op); 1462 out_mempool: 1463 mempool_destroy(osdc->req_mempool); 1464 out: 1465 return err; 1466 } 1467 EXPORT_SYMBOL(ceph_osdc_init); 1468 1469 void ceph_osdc_stop(struct ceph_osd_client *osdc) 1470 { 1471 cancel_delayed_work_sync(&osdc->timeout_work); 1472 cancel_delayed_work_sync(&osdc->osds_timeout_work); 1473 if (osdc->osdmap) { 1474 ceph_osdmap_destroy(osdc->osdmap); 1475 osdc->osdmap = NULL; 1476 } 1477 remove_old_osds(osdc, 1); 1478 mempool_destroy(osdc->req_mempool); 1479 ceph_msgpool_destroy(&osdc->msgpool_op); 1480 ceph_msgpool_destroy(&osdc->msgpool_op_reply); 1481 } 1482 EXPORT_SYMBOL(ceph_osdc_stop); 1483 1484 /* 1485 * Read some contiguous pages. If we cross a stripe boundary, shorten 1486 * *plen. Return number of bytes read, or error. 1487 */ 1488 int ceph_osdc_readpages(struct ceph_osd_client *osdc, 1489 struct ceph_vino vino, struct ceph_file_layout *layout, 1490 u64 off, u64 *plen, 1491 u32 truncate_seq, u64 truncate_size, 1492 struct page **pages, int num_pages) 1493 { 1494 struct ceph_osd_request *req; 1495 int rc = 0; 1496 1497 dout("readpages on ino %llx.%llx on %llu~%llu\n", vino.ino, 1498 vino.snap, off, *plen); 1499 req = ceph_osdc_new_request(osdc, layout, vino, off, plen, 1500 CEPH_OSD_OP_READ, CEPH_OSD_FLAG_READ, 1501 NULL, 0, truncate_seq, truncate_size, NULL, 1502 false, 1); 1503 if (!req) 1504 return -ENOMEM; 1505 1506 /* it may be a short read due to an object boundary */ 1507 req->r_pages = pages; 1508 1509 dout("readpages final extent is %llu~%llu (%d pages)\n", 1510 off, *plen, req->r_num_pages); 1511 1512 rc = ceph_osdc_start_request(osdc, req, false); 1513 if (!rc) 1514 rc = ceph_osdc_wait_request(osdc, req); 1515 1516 ceph_osdc_put_request(req); 1517 dout("readpages result %d\n", rc); 1518 return rc; 1519 } 1520 EXPORT_SYMBOL(ceph_osdc_readpages); 1521 1522 /* 1523 * do a synchronous write on N pages 1524 */ 1525 int ceph_osdc_writepages(struct ceph_osd_client *osdc, struct ceph_vino vino, 1526 struct ceph_file_layout *layout, 1527 struct ceph_snap_context *snapc, 1528 u64 off, u64 len, 1529 u32 truncate_seq, u64 truncate_size, 1530 struct timespec *mtime, 1531 struct page **pages, int num_pages, 1532 int flags, int do_sync, bool nofail) 1533 { 1534 struct ceph_osd_request *req; 1535 int rc = 0; 1536 1537 BUG_ON(vino.snap != CEPH_NOSNAP); 1538 req = ceph_osdc_new_request(osdc, layout, vino, off, &len, 1539 CEPH_OSD_OP_WRITE, 1540 flags | CEPH_OSD_FLAG_ONDISK | 1541 CEPH_OSD_FLAG_WRITE, 1542 snapc, do_sync, 1543 truncate_seq, truncate_size, mtime, 1544 nofail, 1); 1545 if (!req) 1546 return -ENOMEM; 1547 1548 /* it may be a short write due to an object boundary */ 1549 req->r_pages = pages; 1550 dout("writepages %llu~%llu (%d pages)\n", off, len, 1551 req->r_num_pages); 1552 1553 rc = ceph_osdc_start_request(osdc, req, nofail); 1554 if (!rc) 1555 rc = ceph_osdc_wait_request(osdc, req); 1556 1557 ceph_osdc_put_request(req); 1558 if (rc == 0) 1559 rc = len; 1560 dout("writepages result %d\n", rc); 1561 return rc; 1562 } 1563 EXPORT_SYMBOL(ceph_osdc_writepages); 1564 1565 /* 1566 * handle incoming message 1567 */ 1568 static void dispatch(struct ceph_connection *con, struct ceph_msg *msg) 1569 { 1570 struct ceph_osd *osd = con->private; 1571 struct ceph_osd_client *osdc; 1572 int type = le16_to_cpu(msg->hdr.type); 1573 1574 if (!osd) 1575 goto out; 1576 osdc = osd->o_osdc; 1577 1578 switch (type) { 1579 case CEPH_MSG_OSD_MAP: 1580 ceph_osdc_handle_map(osdc, msg); 1581 break; 1582 case CEPH_MSG_OSD_OPREPLY: 1583 handle_reply(osdc, msg, con); 1584 break; 1585 1586 default: 1587 pr_err("received unknown message type %d %s\n", type, 1588 ceph_msg_type_name(type)); 1589 } 1590 out: 1591 ceph_msg_put(msg); 1592 } 1593 1594 /* 1595 * lookup and return message for incoming reply. set up reply message 1596 * pages. 1597 */ 1598 static struct ceph_msg *get_reply(struct ceph_connection *con, 1599 struct ceph_msg_header *hdr, 1600 int *skip) 1601 { 1602 struct ceph_osd *osd = con->private; 1603 struct ceph_osd_client *osdc = osd->o_osdc; 1604 struct ceph_msg *m; 1605 struct ceph_osd_request *req; 1606 int front = le32_to_cpu(hdr->front_len); 1607 int data_len = le32_to_cpu(hdr->data_len); 1608 u64 tid; 1609 1610 tid = le64_to_cpu(hdr->tid); 1611 mutex_lock(&osdc->request_mutex); 1612 req = __lookup_request(osdc, tid); 1613 if (!req) { 1614 *skip = 1; 1615 m = NULL; 1616 pr_info("get_reply unknown tid %llu from osd%d\n", tid, 1617 osd->o_osd); 1618 goto out; 1619 } 1620 1621 if (req->r_con_filling_msg) { 1622 dout("get_reply revoking msg %p from old con %p\n", 1623 req->r_reply, req->r_con_filling_msg); 1624 ceph_con_revoke_message(req->r_con_filling_msg, req->r_reply); 1625 ceph_con_put(req->r_con_filling_msg); 1626 req->r_con_filling_msg = NULL; 1627 } 1628 1629 if (front > req->r_reply->front.iov_len) { 1630 pr_warning("get_reply front %d > preallocated %d\n", 1631 front, (int)req->r_reply->front.iov_len); 1632 m = ceph_msg_new(CEPH_MSG_OSD_OPREPLY, front, GFP_NOFS); 1633 if (!m) 1634 goto out; 1635 ceph_msg_put(req->r_reply); 1636 req->r_reply = m; 1637 } 1638 m = ceph_msg_get(req->r_reply); 1639 1640 if (data_len > 0) { 1641 unsigned data_off = le16_to_cpu(hdr->data_off); 1642 int want = calc_pages_for(data_off & ~PAGE_MASK, data_len); 1643 1644 if (unlikely(req->r_num_pages < want)) { 1645 pr_warning("tid %lld reply %d > expected %d pages\n", 1646 tid, want, m->nr_pages); 1647 *skip = 1; 1648 ceph_msg_put(m); 1649 m = NULL; 1650 goto out; 1651 } 1652 m->pages = req->r_pages; 1653 m->nr_pages = req->r_num_pages; 1654 #ifdef CONFIG_BLOCK 1655 m->bio = req->r_bio; 1656 #endif 1657 } 1658 *skip = 0; 1659 req->r_con_filling_msg = ceph_con_get(con); 1660 dout("get_reply tid %lld %p\n", tid, m); 1661 1662 out: 1663 mutex_unlock(&osdc->request_mutex); 1664 return m; 1665 1666 } 1667 1668 static struct ceph_msg *alloc_msg(struct ceph_connection *con, 1669 struct ceph_msg_header *hdr, 1670 int *skip) 1671 { 1672 struct ceph_osd *osd = con->private; 1673 int type = le16_to_cpu(hdr->type); 1674 int front = le32_to_cpu(hdr->front_len); 1675 1676 switch (type) { 1677 case CEPH_MSG_OSD_MAP: 1678 return ceph_msg_new(type, front, GFP_NOFS); 1679 case CEPH_MSG_OSD_OPREPLY: 1680 return get_reply(con, hdr, skip); 1681 default: 1682 pr_info("alloc_msg unexpected msg type %d from osd%d\n", type, 1683 osd->o_osd); 1684 *skip = 1; 1685 return NULL; 1686 } 1687 } 1688 1689 /* 1690 * Wrappers to refcount containing ceph_osd struct 1691 */ 1692 static struct ceph_connection *get_osd_con(struct ceph_connection *con) 1693 { 1694 struct ceph_osd *osd = con->private; 1695 if (get_osd(osd)) 1696 return con; 1697 return NULL; 1698 } 1699 1700 static void put_osd_con(struct ceph_connection *con) 1701 { 1702 struct ceph_osd *osd = con->private; 1703 put_osd(osd); 1704 } 1705 1706 /* 1707 * authentication 1708 */ 1709 static int get_authorizer(struct ceph_connection *con, 1710 void **buf, int *len, int *proto, 1711 void **reply_buf, int *reply_len, int force_new) 1712 { 1713 struct ceph_osd *o = con->private; 1714 struct ceph_osd_client *osdc = o->o_osdc; 1715 struct ceph_auth_client *ac = osdc->client->monc.auth; 1716 int ret = 0; 1717 1718 if (force_new && o->o_authorizer) { 1719 ac->ops->destroy_authorizer(ac, o->o_authorizer); 1720 o->o_authorizer = NULL; 1721 } 1722 if (o->o_authorizer == NULL) { 1723 ret = ac->ops->create_authorizer( 1724 ac, CEPH_ENTITY_TYPE_OSD, 1725 &o->o_authorizer, 1726 &o->o_authorizer_buf, 1727 &o->o_authorizer_buf_len, 1728 &o->o_authorizer_reply_buf, 1729 &o->o_authorizer_reply_buf_len); 1730 if (ret) 1731 return ret; 1732 } 1733 1734 *proto = ac->protocol; 1735 *buf = o->o_authorizer_buf; 1736 *len = o->o_authorizer_buf_len; 1737 *reply_buf = o->o_authorizer_reply_buf; 1738 *reply_len = o->o_authorizer_reply_buf_len; 1739 return 0; 1740 } 1741 1742 1743 static int verify_authorizer_reply(struct ceph_connection *con, int len) 1744 { 1745 struct ceph_osd *o = con->private; 1746 struct ceph_osd_client *osdc = o->o_osdc; 1747 struct ceph_auth_client *ac = osdc->client->monc.auth; 1748 1749 return ac->ops->verify_authorizer_reply(ac, o->o_authorizer, len); 1750 } 1751 1752 static int invalidate_authorizer(struct ceph_connection *con) 1753 { 1754 struct ceph_osd *o = con->private; 1755 struct ceph_osd_client *osdc = o->o_osdc; 1756 struct ceph_auth_client *ac = osdc->client->monc.auth; 1757 1758 if (ac->ops->invalidate_authorizer) 1759 ac->ops->invalidate_authorizer(ac, CEPH_ENTITY_TYPE_OSD); 1760 1761 return ceph_monc_validate_auth(&osdc->client->monc); 1762 } 1763 1764 static const struct ceph_connection_operations osd_con_ops = { 1765 .get = get_osd_con, 1766 .put = put_osd_con, 1767 .dispatch = dispatch, 1768 .get_authorizer = get_authorizer, 1769 .verify_authorizer_reply = verify_authorizer_reply, 1770 .invalidate_authorizer = invalidate_authorizer, 1771 .alloc_msg = alloc_msg, 1772 .fault = osd_reset, 1773 }; 1774