1 // SPDX-License-Identifier: GPL-2.0 2 3 #include <linux/ceph/ceph_debug.h> 4 5 #include <linux/module.h> 6 #include <linux/err.h> 7 #include <linux/highmem.h> 8 #include <linux/mm.h> 9 #include <linux/pagemap.h> 10 #include <linux/slab.h> 11 #include <linux/uaccess.h> 12 #ifdef CONFIG_BLOCK 13 #include <linux/bio.h> 14 #endif 15 16 #include <linux/ceph/ceph_features.h> 17 #include <linux/ceph/libceph.h> 18 #include <linux/ceph/osd_client.h> 19 #include <linux/ceph/messenger.h> 20 #include <linux/ceph/decode.h> 21 #include <linux/ceph/auth.h> 22 #include <linux/ceph/pagelist.h> 23 #include <linux/ceph/striper.h> 24 25 #define OSD_OPREPLY_FRONT_LEN 512 26 27 static struct kmem_cache *ceph_osd_request_cache; 28 29 static const struct ceph_connection_operations osd_con_ops; 30 31 /* 32 * Implement client access to distributed object storage cluster. 33 * 34 * All data objects are stored within a cluster/cloud of OSDs, or 35 * "object storage devices." (Note that Ceph OSDs have _nothing_ to 36 * do with the T10 OSD extensions to SCSI.) Ceph OSDs are simply 37 * remote daemons serving up and coordinating consistent and safe 38 * access to storage. 39 * 40 * Cluster membership and the mapping of data objects onto storage devices 41 * are described by the osd map. 42 * 43 * We keep track of pending OSD requests (read, write), resubmit 44 * requests to different OSDs when the cluster topology/data layout 45 * change, or retry the affected requests when the communications 46 * channel with an OSD is reset. 
 */

/* forward declarations for the request/linger bookkeeping helpers below */
static void link_request(struct ceph_osd *osd, struct ceph_osd_request *req);
static void unlink_request(struct ceph_osd *osd, struct ceph_osd_request *req);
static void link_linger(struct ceph_osd *osd,
			struct ceph_osd_linger_request *lreq);
static void unlink_linger(struct ceph_osd *osd,
			  struct ceph_osd_linger_request *lreq);
static void clear_backoffs(struct ceph_osd *osd);

/*
 * Lock-assertion helpers.  Compiled in unconditionally (#if 1); switch
 * to the #else no-op stubs to compile the checks out.
 */
#if 1
/*
 * Best-effort probe for "held for write": if a read-trylock succeeds,
 * the semaphore cannot be write-locked.  A failed read-trylock is
 * treated as write-locked.
 */
static inline bool rwsem_is_wrlocked(struct rw_semaphore *sem)
{
	bool wrlocked = true;

	if (unlikely(down_read_trylock(sem))) {
		wrlocked = false;
		up_read(sem);
	}

	return wrlocked;
}
/* assert osdc->lock is held, read or write */
static inline void verify_osdc_locked(struct ceph_osd_client *osdc)
{
	WARN_ON(!rwsem_is_locked(&osdc->lock));
}
/* assert osdc->lock is held for write */
static inline void verify_osdc_wrlocked(struct ceph_osd_client *osdc)
{
	WARN_ON(!rwsem_is_wrlocked(&osdc->lock));
}
/*
 * assert either (osd->lock mutex + osdc->lock for read) are held, or
 * osdc->lock is held for write
 */
static inline void verify_osd_locked(struct ceph_osd *osd)
{
	struct ceph_osd_client *osdc = osd->o_osdc;

	WARN_ON(!(mutex_is_locked(&osd->lock) &&
		  rwsem_is_locked(&osdc->lock)) &&
		!rwsem_is_wrlocked(&osdc->lock));
}
/* assert the linger request's lock is held */
static inline void verify_lreq_locked(struct ceph_osd_linger_request *lreq)
{
	WARN_ON(!mutex_is_locked(&lreq->lock));
}
#else
static inline void verify_osdc_locked(struct ceph_osd_client *osdc) { }
static inline void verify_osdc_wrlocked(struct ceph_osd_client *osdc) { }
static inline void verify_osd_locked(struct ceph_osd *osd) { }
static inline void verify_lreq_locked(struct ceph_osd_linger_request *lreq) { }
#endif

/*
 * calculate the mapping of a file extent onto an object, and fill out the
 * request accordingly.  shorten extent as necessary if it crosses an
 * object boundary.
 *
 * fill osd op in request message.
 */
static int calc_layout(struct ceph_file_layout *layout, u64 off, u64 *plen,
			u64 *objnum, u64 *objoff, u64 *objlen)
{
	u64 orig_len = *plen;
	u32 xlen;

	/* object extent? */
	ceph_calc_file_object_mapping(layout, off, orig_len, objnum,
				      objoff, &xlen);
	*objlen = xlen;
	if (*objlen < orig_len) {
		/* extent crosses an object boundary -- clip to this object */
		*plen = *objlen;
		dout(" skipping last %llu, final file extent %llu~%llu\n",
		     orig_len - *plen, off, *plen);
	}

	dout("calc_layout objnum=%llx %llu~%llu\n", *objnum, *objoff, *objlen);
	return 0;
}

/* reset @osd_data to the empty (TYPE_NONE) state */
static void ceph_osd_data_init(struct ceph_osd_data *osd_data)
{
	memset(osd_data, 0, sizeof (*osd_data));
	osd_data->type = CEPH_OSD_DATA_TYPE_NONE;
}

/* describe a page vector as the data payload of @osd_data */
static void ceph_osd_data_pages_init(struct ceph_osd_data *osd_data,
			struct page **pages, u64 length, u32 alignment,
			bool pages_from_pool, bool own_pages)
{
	osd_data->type = CEPH_OSD_DATA_TYPE_PAGES;
	osd_data->pages = pages;
	osd_data->length = length;
	osd_data->alignment = alignment;
	osd_data->pages_from_pool = pages_from_pool;
	osd_data->own_pages = own_pages;
}

/* describe a pagelist as the data payload of @osd_data */
static void ceph_osd_data_pagelist_init(struct ceph_osd_data *osd_data,
			struct ceph_pagelist *pagelist)
{
	osd_data->type = CEPH_OSD_DATA_TYPE_PAGELIST;
	osd_data->pagelist = pagelist;
}

#ifdef CONFIG_BLOCK
/* describe a bio position + length as the data payload of @osd_data */
static void ceph_osd_data_bio_init(struct ceph_osd_data *osd_data,
				   struct ceph_bio_iter *bio_pos,
				   u32 bio_length)
{
	osd_data->type = CEPH_OSD_DATA_TYPE_BIO;
	osd_data->bio_pos = *bio_pos;
	osd_data->bio_length = bio_length;
}
#endif /* CONFIG_BLOCK */

/* describe a bio_vec array iterator as the data payload of @osd_data */
static void ceph_osd_data_bvecs_init(struct ceph_osd_data *osd_data,
				     struct ceph_bvec_iter *bvec_pos)
{
	osd_data->type = CEPH_OSD_DATA_TYPE_BVECS;
	osd_data->bvec_pos = *bvec_pos;
}

/*
 * Return a pointer to field @fld of the @typ union member of op @whch
 * in @oreq, BUG_ON()ing if @whch is out of range.
 */
#define osd_req_op_data(oreq, whch, typ, fld)				\
({									\
	struct ceph_osd_request *__oreq = (oreq);			\
	unsigned int __whch = (whch);					\
	BUG_ON(__whch >= __oreq->r_num_ops);				\
	&__oreq->r_ops[__whch].typ.fld;					\
})

/* return the raw_data_in descriptor of op @which (bounds-checked) */
static struct ceph_osd_data *
osd_req_op_raw_data_in(struct ceph_osd_request *osd_req, unsigned int which)
{
	BUG_ON(which >= osd_req->r_num_ops);

	return &osd_req->r_ops[which].raw_data_in;
}

/* return the extent osd_data descriptor of op @which */
struct ceph_osd_data *
osd_req_op_extent_osd_data(struct ceph_osd_request *osd_req,
			   unsigned int which)
{
	return osd_req_op_data(osd_req, which, extent, osd_data);
}
EXPORT_SYMBOL(osd_req_op_extent_osd_data);

/* attach a page vector as the raw incoming data of op @which */
void osd_req_op_raw_data_in_pages(struct ceph_osd_request *osd_req,
			unsigned int which, struct page **pages,
			u64 length, u32 alignment,
			bool pages_from_pool, bool own_pages)
{
	struct ceph_osd_data *osd_data;

	osd_data = osd_req_op_raw_data_in(osd_req, which);
	ceph_osd_data_pages_init(osd_data, pages, length, alignment,
				pages_from_pool, own_pages);
}
EXPORT_SYMBOL(osd_req_op_raw_data_in_pages);

/* attach a page vector as the extent data of op @which */
void osd_req_op_extent_osd_data_pages(struct ceph_osd_request *osd_req,
			unsigned int which, struct page **pages,
			u64 length, u32 alignment,
			bool pages_from_pool, bool own_pages)
{
	struct ceph_osd_data *osd_data;

	osd_data = osd_req_op_data(osd_req, which, extent, osd_data);
	ceph_osd_data_pages_init(osd_data, pages, length, alignment,
				pages_from_pool, own_pages);
}
EXPORT_SYMBOL(osd_req_op_extent_osd_data_pages);

/* attach a pagelist as the extent data of op @which */
void osd_req_op_extent_osd_data_pagelist(struct ceph_osd_request *osd_req,
			unsigned int which, struct ceph_pagelist *pagelist)
{
	struct ceph_osd_data *osd_data;

	osd_data = osd_req_op_data(osd_req, which, extent, osd_data);
	ceph_osd_data_pagelist_init(osd_data, pagelist);
}
EXPORT_SYMBOL(osd_req_op_extent_osd_data_pagelist);

#ifdef CONFIG_BLOCK
/* attach a bio as the extent data of op @which */
void osd_req_op_extent_osd_data_bio(struct ceph_osd_request *osd_req,
				    unsigned int which,
				    struct ceph_bio_iter *bio_pos,
				    u32 bio_length)
{
	struct ceph_osd_data *osd_data;

	osd_data = osd_req_op_data(osd_req, which, extent, osd_data);
	ceph_osd_data_bio_init(osd_data, bio_pos, bio_length);
}
EXPORT_SYMBOL(osd_req_op_extent_osd_data_bio);
#endif /* CONFIG_BLOCK */

/* attach a bio_vec iterator as the extent data of op @which */
void osd_req_op_extent_osd_data_bvec_pos(struct ceph_osd_request *osd_req,
					 unsigned int which,
					 struct ceph_bvec_iter *bvec_pos)
{
	struct ceph_osd_data *osd_data;

	osd_data = osd_req_op_data(osd_req, which, extent, osd_data);
	ceph_osd_data_bvecs_init(osd_data, bvec_pos);
}
EXPORT_SYMBOL(osd_req_op_extent_osd_data_bvec_pos);

/* attach @pagelist (class/method names) as the CALL op's request_info */
static void osd_req_op_cls_request_info_pagelist(
			struct ceph_osd_request *osd_req,
			unsigned int which, struct ceph_pagelist *pagelist)
{
	struct ceph_osd_data *osd_data;

	osd_data = osd_req_op_data(osd_req, which, cls, request_info);
	ceph_osd_data_pagelist_init(osd_data, pagelist);
}

/*
 * Attach @pagelist as the CALL op's input data and account for it in
 * both the cls-local and overall op payload lengths.
 */
void osd_req_op_cls_request_data_pagelist(
			struct ceph_osd_request *osd_req,
			unsigned int which, struct ceph_pagelist *pagelist)
{
	struct ceph_osd_data *osd_data;

	osd_data = osd_req_op_data(osd_req, which, cls, request_data);
	ceph_osd_data_pagelist_init(osd_data, pagelist);
	osd_req->r_ops[which].cls.indata_len += pagelist->length;
	osd_req->r_ops[which].indata_len += pagelist->length;
}
EXPORT_SYMBOL(osd_req_op_cls_request_data_pagelist);

/* as above, but the input data is a page vector of @length bytes */
void osd_req_op_cls_request_data_pages(struct ceph_osd_request *osd_req,
			unsigned int which, struct page **pages, u64 length,
			u32 alignment, bool pages_from_pool, bool own_pages)
{
	struct ceph_osd_data *osd_data;

	osd_data = osd_req_op_data(osd_req, which, cls, request_data);
	ceph_osd_data_pages_init(osd_data, pages, length, alignment,
				pages_from_pool, own_pages);
	osd_req->r_ops[which].cls.indata_len += length;
	osd_req->r_ops[which].indata_len += length;
}
EXPORT_SYMBOL(osd_req_op_cls_request_data_pages);

/* as above, but the input data is a bio_vec array covering @bytes */
void osd_req_op_cls_request_data_bvecs(struct ceph_osd_request *osd_req,
				       unsigned int which,
				       struct bio_vec *bvecs, u32 bytes)
{
	struct ceph_osd_data *osd_data;
	struct ceph_bvec_iter it = {
		.bvecs = bvecs,
		.iter = { .bi_size = bytes },
	};

	osd_data = osd_req_op_data(osd_req, which, cls, request_data);
	ceph_osd_data_bvecs_init(osd_data, &it);
	osd_req->r_ops[which].cls.indata_len += bytes;
	osd_req->r_ops[which].indata_len += bytes;
}
EXPORT_SYMBOL(osd_req_op_cls_request_data_bvecs);

/* attach a page vector to receive the CALL op's response data */
void osd_req_op_cls_response_data_pages(struct ceph_osd_request *osd_req,
			unsigned int which, struct page **pages, u64 length,
			u32 alignment, bool pages_from_pool, bool own_pages)
{
	struct ceph_osd_data *osd_data;

	osd_data = osd_req_op_data(osd_req, which, cls, response_data);
	ceph_osd_data_pages_init(osd_data, pages, length, alignment,
				pages_from_pool, own_pages);
}
EXPORT_SYMBOL(osd_req_op_cls_response_data_pages);

/* byte length of the data described by @osd_data, whatever its type */
static u64 ceph_osd_data_length(struct ceph_osd_data *osd_data)
{
	switch (osd_data->type) {
	case CEPH_OSD_DATA_TYPE_NONE:
		return 0;
	case CEPH_OSD_DATA_TYPE_PAGES:
		return osd_data->length;
	case CEPH_OSD_DATA_TYPE_PAGELIST:
		return (u64)osd_data->pagelist->length;
#ifdef CONFIG_BLOCK
	case CEPH_OSD_DATA_TYPE_BIO:
		return (u64)osd_data->bio_length;
#endif /* CONFIG_BLOCK */
	case CEPH_OSD_DATA_TYPE_BVECS:
		return osd_data->bvec_pos.iter.bi_size;
	default:
		WARN(true, "unrecognized data type %d\n", (int)osd_data->type);
		return 0;
	}
}

/*
 * Release any pages owned by @osd_data and reset it to the empty
 * state.  Only TYPE_PAGES with own_pages set holds a reference.
 */
static void ceph_osd_data_release(struct ceph_osd_data *osd_data)
{
	if (osd_data->type == CEPH_OSD_DATA_TYPE_PAGES && osd_data->own_pages) {
		int num_pages;

		num_pages = calc_pages_for((u64)osd_data->alignment,
						(u64)osd_data->length);
		ceph_release_page_vector(osd_data->pages, num_pages);
	}
	ceph_osd_data_init(osd_data);
}

/*
 * Release every data descriptor attached to op @which, dispatching on
 * the opcode to find which union members are in use.
 */
static void osd_req_op_data_release(struct ceph_osd_request *osd_req,
			unsigned int which)
{
	struct ceph_osd_req_op *op;

	BUG_ON(which >= osd_req->r_num_ops);
	op = &osd_req->r_ops[which];

	switch (op->op) {
	case CEPH_OSD_OP_READ:
	case CEPH_OSD_OP_WRITE:
	case CEPH_OSD_OP_WRITEFULL:
		ceph_osd_data_release(&op->extent.osd_data);
		break;
	case CEPH_OSD_OP_CALL:
		ceph_osd_data_release(&op->cls.request_info);
		ceph_osd_data_release(&op->cls.request_data);
		ceph_osd_data_release(&op->cls.response_data);
		break;
	case CEPH_OSD_OP_SETXATTR:
	case CEPH_OSD_OP_CMPXATTR:
		ceph_osd_data_release(&op->xattr.osd_data);
		break;
	case CEPH_OSD_OP_STAT:
		ceph_osd_data_release(&op->raw_data_in);
		break;
	case CEPH_OSD_OP_NOTIFY_ACK:
		ceph_osd_data_release(&op->notify_ack.request_data);
		break;
	case CEPH_OSD_OP_NOTIFY:
		ceph_osd_data_release(&op->notify.request_data);
		ceph_osd_data_release(&op->notify.response_data);
		break;
	case CEPH_OSD_OP_LIST_WATCHERS:
		ceph_osd_data_release(&op->list_watchers.response_data);
		break;
	default:
		break;
	}
}

/*
 * Assumes @t is zero-initialized.
 */
static void target_init(struct ceph_osd_request_target *t)
{
	ceph_oid_init(&t->base_oid);
	ceph_oloc_init(&t->base_oloc);
	ceph_oid_init(&t->target_oid);
	ceph_oloc_init(&t->target_oloc);

	ceph_osds_init(&t->acting);
	ceph_osds_init(&t->up);
	t->size = -1;
	t->min_size = -1;

	t->osd = CEPH_HOMELESS_OSD;
}

/* deep-copy @src into @dest (oids/olocs take references as needed) */
static void target_copy(struct ceph_osd_request_target *dest,
			const struct ceph_osd_request_target *src)
{
	ceph_oid_copy(&dest->base_oid, &src->base_oid);
	ceph_oloc_copy(&dest->base_oloc, &src->base_oloc);
	ceph_oid_copy(&dest->target_oid, &src->target_oid);
	ceph_oloc_copy(&dest->target_oloc, &src->target_oloc);

	dest->pgid = src->pgid; /* struct */
	dest->spgid = src->spgid; /* struct */
	dest->pg_num = src->pg_num;
	dest->pg_num_mask = src->pg_num_mask;
	ceph_osds_copy(&dest->acting, &src->acting);
	ceph_osds_copy(&dest->up, &src->up);
	dest->size = src->size;
	dest->min_size = src->min_size;
	dest->sort_bitwise = src->sort_bitwise;

	dest->flags = src->flags;
	dest->paused = src->paused;

	dest->epoch = src->epoch;
	dest->last_force_resend = src->last_force_resend;

	dest->osd = src->osd;
}

/* drop the oid/oloc references held by @t */
static void target_destroy(struct ceph_osd_request_target *t)
{
	ceph_oid_destroy(&t->base_oid);
	ceph_oloc_destroy(&t->base_oloc);
	ceph_oid_destroy(&t->target_oid);
	ceph_oloc_destroy(&t->target_oloc);
}

/*
 * requests
 */
/* sanity checks: a request being released must be fully unlinked */
static void request_release_checks(struct ceph_osd_request *req)
{
	WARN_ON(!RB_EMPTY_NODE(&req->r_node));
	WARN_ON(!RB_EMPTY_NODE(&req->r_mc_node));
	WARN_ON(!list_empty(&req->r_unsafe_item));
	WARN_ON(req->r_osd);
}

/*
 * kref release callback: free everything the request owns (messages,
 * per-op data, target, snap context), then the request itself via the
 * allocator it came from (mempool, slab or kmalloc).
 */
static void ceph_osdc_release_request(struct kref *kref)
{
	struct ceph_osd_request *req = container_of(kref,
					    struct ceph_osd_request, r_kref);
	unsigned int which;

	dout("%s %p (r_request %p r_reply %p)\n", __func__, req,
	     req->r_request, req->r_reply);
	request_release_checks(req);

	if (req->r_request)
		ceph_msg_put(req->r_request);
	if (req->r_reply)
		ceph_msg_put(req->r_reply);

	for (which = 0; which < req->r_num_ops; which++)
		osd_req_op_data_release(req, which);

	target_destroy(&req->r_t);
	ceph_put_snap_context(req->r_snapc);

	if (req->r_mempool)
		mempool_free(req, req->r_osdc->req_mempool);
	else if (req->r_num_ops <= CEPH_OSD_SLAB_OPS)
		kmem_cache_free(ceph_osd_request_cache, req);
	else
		kfree(req);
}

/* take a reference on @req */
void ceph_osdc_get_request(struct ceph_osd_request *req)
{
	dout("%s %p (was %d)\n", __func__, req,
	     kref_read(&req->r_kref));
	kref_get(&req->r_kref);
}
EXPORT_SYMBOL(ceph_osdc_get_request);

/* drop a reference on @req (NULL is a no-op); may free it */
void ceph_osdc_put_request(struct ceph_osd_request *req)
{
	if (req) {
		dout("%s %p (was %d)\n", __func__, req,
		     kref_read(&req->r_kref));
		kref_put(&req->r_kref, ceph_osdc_release_request);
	}
}
EXPORT_SYMBOL(ceph_osdc_put_request);

/* zero @req and set up its embedded structures; r_ops are NOT touched */
static void request_init(struct ceph_osd_request *req)
{
	/* req only, each op is zeroed in _osd_req_op_init() */
	memset(req, 0, sizeof(*req));

	kref_init(&req->r_kref);
	init_completion(&req->r_completion);
	RB_CLEAR_NODE(&req->r_node);
	RB_CLEAR_NODE(&req->r_mc_node);
	INIT_LIST_HEAD(&req->r_unsafe_item);

	target_init(&req->r_t);
}

/*
 * This is ugly, but it allows us to reuse linger registration and ping
 * requests, keeping the structure of the code around send_linger{_ping}()
 * reasonable.  Setting up a min_nr=2 mempool for each linger request
 * and dealing with copying ops (this blasts req only, watch op remains
 * intact) isn't any better.
 */
static void request_reinit(struct ceph_osd_request *req)
{
	struct ceph_osd_client *osdc = req->r_osdc;
	bool mempool = req->r_mempool;
	unsigned int num_ops = req->r_num_ops;
	u64 snapid = req->r_snapid;
	struct ceph_snap_context *snapc = req->r_snapc;
	bool linger = req->r_linger;
	struct ceph_msg *request_msg = req->r_request;
	struct ceph_msg *reply_msg = req->r_reply;

	dout("%s req %p\n", __func__, req);
	/* we must hold the only reference to req and both messages */
	WARN_ON(kref_read(&req->r_kref) != 1);
	request_release_checks(req);

	WARN_ON(kref_read(&request_msg->kref) != 1);
	WARN_ON(kref_read(&reply_msg->kref) != 1);
	target_destroy(&req->r_t);

	/* re-zero, then restore the fields saved above */
	request_init(req);
	req->r_osdc = osdc;
	req->r_mempool = mempool;
	req->r_num_ops = num_ops;
	req->r_snapid = snapid;
	req->r_snapc = snapc;
	req->r_linger = linger;
	req->r_request = request_msg;
	req->r_reply = reply_msg;
}

/*
 * Allocate a request with room for @num_ops ops: from the osdc mempool
 * (<= CEPH_OSD_SLAB_OPS only), the slab cache, or plain kmalloc for
 * larger op counts.  Returns NULL on allocation failure.  Takes a
 * reference on @snapc.
 */
struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
					       struct ceph_snap_context *snapc,
					       unsigned int num_ops,
					       bool use_mempool,
					       gfp_t gfp_flags)
{
	struct ceph_osd_request *req;

	if (use_mempool) {
		BUG_ON(num_ops > CEPH_OSD_SLAB_OPS);
		req = mempool_alloc(osdc->req_mempool, gfp_flags);
	} else if (num_ops <= CEPH_OSD_SLAB_OPS) {
		req = kmem_cache_alloc(ceph_osd_request_cache, gfp_flags);
	} else {
		BUG_ON(num_ops > CEPH_OSD_MAX_OPS);
		req = kmalloc(struct_size(req, r_ops, num_ops), gfp_flags);
	}
	if (unlikely(!req))
		return NULL;

	request_init(req);
	req->r_osdc = osdc;
	req->r_mempool = use_mempool;
	req->r_num_ops = num_ops;
	req->r_snapid = CEPH_NOSNAP;
	req->r_snapc = ceph_get_snap_context(snapc);

	dout("%s req %p\n", __func__, req);
	return req;
}
EXPORT_SYMBOL(ceph_osdc_alloc_request);

/* encoded size of an object locator: pool + key/ns lengths + ns bytes */
static int ceph_oloc_encoding_size(const struct ceph_object_locator *oloc)
{
	return 8 + 4 + 4 + 4 + (oloc->pool_ns ? oloc->pool_ns->len : 0);
}

/*
 * Allocate the request and reply messages for @req, sizing the request
 * front to fit the encoded MOSDOp.  Requires r_base_oid/r_base_oloc to
 * be set.  Returns 0 or -ENOMEM; on partial failure any message already
 * attached to @req is freed when the request is put.
 */
int ceph_osdc_alloc_messages(struct ceph_osd_request *req, gfp_t gfp)
{
	struct ceph_osd_client *osdc = req->r_osdc;
	struct ceph_msg *msg;
	int msg_size;

	WARN_ON(ceph_oid_empty(&req->r_base_oid));
	WARN_ON(ceph_oloc_empty(&req->r_base_oloc));

	/* create request message */
	msg_size = CEPH_ENCODING_START_BLK_LEN +
			CEPH_PGID_ENCODING_LEN + 1; /* spgid */
	msg_size += 4 + 4 + 4; /* hash, osdmap_epoch, flags */
	msg_size += CEPH_ENCODING_START_BLK_LEN +
			sizeof(struct ceph_osd_reqid); /* reqid */
	msg_size += sizeof(struct ceph_blkin_trace_info); /* trace */
	msg_size += 4 + sizeof(struct ceph_timespec); /* client_inc, mtime */
	msg_size += CEPH_ENCODING_START_BLK_LEN +
			ceph_oloc_encoding_size(&req->r_base_oloc); /* oloc */
	msg_size += 4 + req->r_base_oid.name_len; /* oid */
	msg_size += 2 + req->r_num_ops * sizeof(struct ceph_osd_op);
	msg_size += 8; /* snapid */
	msg_size += 8; /* snap_seq */
	msg_size += 4 + 8 * (req->r_snapc ? req->r_snapc->num_snaps : 0);
	msg_size += 4 + 8; /* retry_attempt, features */

	if (req->r_mempool)
		msg = ceph_msgpool_get(&osdc->msgpool_op, 0);
	else
		msg = ceph_msg_new(CEPH_MSG_OSD_OP, msg_size, gfp, true);
	if (!msg)
		return -ENOMEM;

	memset(msg->front.iov_base, 0, msg->front.iov_len);
	req->r_request = msg;

	/* create reply message */
	msg_size = OSD_OPREPLY_FRONT_LEN;
	msg_size += req->r_base_oid.name_len;
	msg_size += req->r_num_ops * sizeof(struct ceph_osd_op);

	if (req->r_mempool)
		msg = ceph_msgpool_get(&osdc->msgpool_op_reply, 0);
	else
		msg = ceph_msg_new(CEPH_MSG_OSD_OPREPLY, msg_size, gfp, true);
	if (!msg)
		return -ENOMEM;

	req->r_reply = msg;

	return 0;
}
EXPORT_SYMBOL(ceph_osdc_alloc_messages);

/* true iff @opcode is one of the known CEPH_OSD_OP_* values */
static bool osd_req_opcode_valid(u16 opcode)
{
	switch (opcode) {
#define GENERATE_CASE(op, opcode, str)	case CEPH_OSD_OP_##op: return true;
__CEPH_FORALL_OSD_OPS(GENERATE_CASE)
#undef GENERATE_CASE
	default:
		return false;
	}
}

/*
 * This is an osd op init function for opcodes that have no data or
 * other information associated with them.  It also serves as a
 * common init routine for all the other init functions, below.
659 */ 660 static struct ceph_osd_req_op * 661 _osd_req_op_init(struct ceph_osd_request *osd_req, unsigned int which, 662 u16 opcode, u32 flags) 663 { 664 struct ceph_osd_req_op *op; 665 666 BUG_ON(which >= osd_req->r_num_ops); 667 BUG_ON(!osd_req_opcode_valid(opcode)); 668 669 op = &osd_req->r_ops[which]; 670 memset(op, 0, sizeof (*op)); 671 op->op = opcode; 672 op->flags = flags; 673 674 return op; 675 } 676 677 void osd_req_op_init(struct ceph_osd_request *osd_req, 678 unsigned int which, u16 opcode, u32 flags) 679 { 680 (void)_osd_req_op_init(osd_req, which, opcode, flags); 681 } 682 EXPORT_SYMBOL(osd_req_op_init); 683 684 void osd_req_op_extent_init(struct ceph_osd_request *osd_req, 685 unsigned int which, u16 opcode, 686 u64 offset, u64 length, 687 u64 truncate_size, u32 truncate_seq) 688 { 689 struct ceph_osd_req_op *op = _osd_req_op_init(osd_req, which, 690 opcode, 0); 691 size_t payload_len = 0; 692 693 BUG_ON(opcode != CEPH_OSD_OP_READ && opcode != CEPH_OSD_OP_WRITE && 694 opcode != CEPH_OSD_OP_WRITEFULL && opcode != CEPH_OSD_OP_ZERO && 695 opcode != CEPH_OSD_OP_TRUNCATE); 696 697 op->extent.offset = offset; 698 op->extent.length = length; 699 op->extent.truncate_size = truncate_size; 700 op->extent.truncate_seq = truncate_seq; 701 if (opcode == CEPH_OSD_OP_WRITE || opcode == CEPH_OSD_OP_WRITEFULL) 702 payload_len += length; 703 704 op->indata_len = payload_len; 705 } 706 EXPORT_SYMBOL(osd_req_op_extent_init); 707 708 void osd_req_op_extent_update(struct ceph_osd_request *osd_req, 709 unsigned int which, u64 length) 710 { 711 struct ceph_osd_req_op *op; 712 u64 previous; 713 714 BUG_ON(which >= osd_req->r_num_ops); 715 op = &osd_req->r_ops[which]; 716 previous = op->extent.length; 717 718 if (length == previous) 719 return; /* Nothing to do */ 720 BUG_ON(length > previous); 721 722 op->extent.length = length; 723 if (op->op == CEPH_OSD_OP_WRITE || op->op == CEPH_OSD_OP_WRITEFULL) 724 op->indata_len -= previous - length; 725 } 726 
EXPORT_SYMBOL(osd_req_op_extent_update);

/*
 * Duplicate extent op @which into slot @which + 1, advancing its
 * offset by @offset_inc and shrinking its length (and, for writes,
 * its indata_len) accordingly.
 */
void osd_req_op_extent_dup_last(struct ceph_osd_request *osd_req,
				unsigned int which, u64 offset_inc)
{
	struct ceph_osd_req_op *op, *prev_op;

	BUG_ON(which + 1 >= osd_req->r_num_ops);

	prev_op = &osd_req->r_ops[which];
	op = _osd_req_op_init(osd_req, which + 1, prev_op->op, prev_op->flags);
	/* dup previous one */
	op->indata_len = prev_op->indata_len;
	op->outdata_len = prev_op->outdata_len;
	op->extent = prev_op->extent;
	/* adjust offset */
	op->extent.offset += offset_inc;
	op->extent.length -= offset_inc;

	if (op->op == CEPH_OSD_OP_WRITE || op->op == CEPH_OSD_OP_WRITEFULL)
		op->indata_len -= offset_inc;
}
EXPORT_SYMBOL(osd_req_op_extent_dup_last);

/*
 * Initialize op @which as a CALL of @class.@method.  The names go out
 * in a pagelist attached as request_info; allocation failure here is
 * fatal (BUG_ON) since this path uses GFP_NOFS with no error return.
 */
void osd_req_op_cls_init(struct ceph_osd_request *osd_req, unsigned int which,
			 u16 opcode, const char *class, const char *method)
{
	struct ceph_osd_req_op *op = _osd_req_op_init(osd_req, which,
						      opcode, 0);
	struct ceph_pagelist *pagelist;
	size_t payload_len = 0;
	size_t size;

	BUG_ON(opcode != CEPH_OSD_OP_CALL);

	pagelist = kmalloc(sizeof (*pagelist), GFP_NOFS);
	BUG_ON(!pagelist);
	ceph_pagelist_init(pagelist);

	op->cls.class_name = class;
	size = strlen(class);
	BUG_ON(size > (size_t) U8_MAX);	/* class_len is a single byte */
	op->cls.class_len = size;
	ceph_pagelist_append(pagelist, class, size);
	payload_len += size;

	op->cls.method_name = method;
	size = strlen(method);
	BUG_ON(size > (size_t) U8_MAX);	/* method_len is a single byte */
	op->cls.method_len = size;
	ceph_pagelist_append(pagelist, method, size);
	payload_len += size;

	osd_req_op_cls_request_info_pagelist(osd_req, which, pagelist);

	op->indata_len = payload_len;
}
EXPORT_SYMBOL(osd_req_op_cls_init);

/*
 * Initialize op @which as a SETXATTR/CMPXATTR of @name with @value
 * (@size bytes); name and value travel in a pagelist.  Returns 0 or
 * -ENOMEM.
 */
int osd_req_op_xattr_init(struct ceph_osd_request *osd_req, unsigned int which,
			  u16 opcode, const char *name, const void *value,
			  size_t size, u8 cmp_op, u8 cmp_mode)
{
	struct ceph_osd_req_op *op = _osd_req_op_init(osd_req, which,
						      opcode, 0);
	struct ceph_pagelist *pagelist;
	size_t payload_len;

	BUG_ON(opcode != CEPH_OSD_OP_SETXATTR && opcode != CEPH_OSD_OP_CMPXATTR);

	pagelist = kmalloc(sizeof(*pagelist), GFP_NOFS);
	if (!pagelist)
		return -ENOMEM;

	ceph_pagelist_init(pagelist);

	payload_len = strlen(name);
	op->xattr.name_len = payload_len;
	ceph_pagelist_append(pagelist, name, payload_len);

	op->xattr.value_len = size;
	ceph_pagelist_append(pagelist, value, size);
	payload_len += size;

	op->xattr.cmp_op = cmp_op;
	op->xattr.cmp_mode = cmp_mode;

	ceph_osd_data_pagelist_init(&op->xattr.osd_data, pagelist);
	op->indata_len = payload_len;
	return 0;
}
EXPORT_SYMBOL(osd_req_op_xattr_init);

/*
 * @watch_opcode: CEPH_OSD_WATCH_OP_*
 */
static void osd_req_op_watch_init(struct ceph_osd_request *req, int which,
				  u64 cookie, u8 watch_opcode)
{
	struct ceph_osd_req_op *op;

	op = _osd_req_op_init(req, which, CEPH_OSD_OP_WATCH, 0);
	op->watch.cookie = cookie;
	op->watch.op = watch_opcode;
	op->watch.gen = 0;
}

/* initialize op @which as a SETALLOCHINT with the given size hints */
void osd_req_op_alloc_hint_init(struct ceph_osd_request *osd_req,
				unsigned int which,
				u64 expected_object_size,
				u64 expected_write_size)
{
	struct ceph_osd_req_op *op = _osd_req_op_init(osd_req, which,
						      CEPH_OSD_OP_SETALLOCHINT,
						      0);

	op->alloc_hint.expected_object_size = expected_object_size;
	op->alloc_hint.expected_write_size = expected_write_size;

	/*
	 * CEPH_OSD_OP_SETALLOCHINT op is advisory and therefore deemed
	 * not worth a feature bit.  Set FAILOK per-op flag to make
	 * sure older osds don't trip over an unsupported opcode.
	 */
	op->flags |= CEPH_OSD_OP_FLAG_FAILOK;
}
EXPORT_SYMBOL(osd_req_op_alloc_hint_init);

/*
 * Attach @osd_data to @msg as outgoing/incoming message data,
 * dispatching on the descriptor type.  TYPE_NONE attaches nothing.
 */
static void ceph_osdc_msg_data_add(struct ceph_msg *msg,
				   struct ceph_osd_data *osd_data)
{
	u64 length = ceph_osd_data_length(osd_data);

	if (osd_data->type == CEPH_OSD_DATA_TYPE_PAGES) {
		BUG_ON(length > (u64) SIZE_MAX);
		if (length)
			ceph_msg_data_add_pages(msg, osd_data->pages,
					length, osd_data->alignment);
	} else if (osd_data->type == CEPH_OSD_DATA_TYPE_PAGELIST) {
		BUG_ON(!length);
		ceph_msg_data_add_pagelist(msg, osd_data->pagelist);
#ifdef CONFIG_BLOCK
	} else if (osd_data->type == CEPH_OSD_DATA_TYPE_BIO) {
		ceph_msg_data_add_bio(msg, &osd_data->bio_pos, length);
#endif
	} else if (osd_data->type == CEPH_OSD_DATA_TYPE_BVECS) {
		ceph_msg_data_add_bvecs(msg, &osd_data->bvec_pos);
	} else {
		BUG_ON(osd_data->type != CEPH_OSD_DATA_TYPE_NONE);
	}
}

/*
 * Encode @src into the on-wire op @dst (little-endian fields).
 * Returns the op's outgoing data length (indata_len), or 0 for an
 * unrecognized/unsupported opcode (with a warning).
 */
static u32 osd_req_encode_op(struct ceph_osd_op *dst,
			     const struct ceph_osd_req_op *src)
{
	if (WARN_ON(!osd_req_opcode_valid(src->op))) {
		pr_err("unrecognized osd opcode %d\n", src->op);

		return 0;
	}

	switch (src->op) {
	case CEPH_OSD_OP_STAT:
		break;
	case CEPH_OSD_OP_READ:
	case CEPH_OSD_OP_WRITE:
	case CEPH_OSD_OP_WRITEFULL:
	case CEPH_OSD_OP_ZERO:
	case CEPH_OSD_OP_TRUNCATE:
		dst->extent.offset = cpu_to_le64(src->extent.offset);
		dst->extent.length = cpu_to_le64(src->extent.length);
		dst->extent.truncate_size =
			cpu_to_le64(src->extent.truncate_size);
		dst->extent.truncate_seq =
			cpu_to_le32(src->extent.truncate_seq);
		break;
	case CEPH_OSD_OP_CALL:
		dst->cls.class_len = src->cls.class_len;
		dst->cls.method_len = src->cls.method_len;
		dst->cls.indata_len = cpu_to_le32(src->cls.indata_len);
		break;
	case CEPH_OSD_OP_WATCH:
		dst->watch.cookie = cpu_to_le64(src->watch.cookie);
		dst->watch.ver = cpu_to_le64(0);
		dst->watch.op = src->watch.op;
		dst->watch.gen = cpu_to_le32(src->watch.gen);
		break;
	case CEPH_OSD_OP_NOTIFY_ACK:
		break;
	case CEPH_OSD_OP_NOTIFY:
		dst->notify.cookie = cpu_to_le64(src->notify.cookie);
		break;
	case CEPH_OSD_OP_LIST_WATCHERS:
		break;
	case CEPH_OSD_OP_SETALLOCHINT:
		dst->alloc_hint.expected_object_size =
		    cpu_to_le64(src->alloc_hint.expected_object_size);
		dst->alloc_hint.expected_write_size =
		    cpu_to_le64(src->alloc_hint.expected_write_size);
		break;
	case CEPH_OSD_OP_SETXATTR:
	case CEPH_OSD_OP_CMPXATTR:
		dst->xattr.name_len = cpu_to_le32(src->xattr.name_len);
		dst->xattr.value_len = cpu_to_le32(src->xattr.value_len);
		dst->xattr.cmp_op = src->xattr.cmp_op;
		dst->xattr.cmp_mode = src->xattr.cmp_mode;
		break;
	case CEPH_OSD_OP_CREATE:
	case CEPH_OSD_OP_DELETE:
		break;
	default:
		pr_err("unsupported osd opcode %s\n",
			ceph_osd_op_name(src->op));
		WARN_ON(1);

		return 0;
	}

	dst->op = cpu_to_le16(src->op);
	dst->flags = cpu_to_le32(src->flags);
	dst->payload_len = cpu_to_le32(src->indata_len);

	return src->indata_len;
}

/*
 * build new request AND message, calculate layout, and adjust file
 * extent as needed.
 *
 * if the file was recently truncated, we include information about its
 * old and new size so that the object can be updated appropriately.  (we
 * avoid synchronously deleting truncated objects because it's slow.)
 */
struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
					       struct ceph_file_layout *layout,
					       struct ceph_vino vino,
					       u64 off, u64 *plen,
					       unsigned int which, int num_ops,
					       int opcode, int flags,
					       struct ceph_snap_context *snapc,
					       u32 truncate_seq,
					       u64 truncate_size,
					       bool use_mempool)
{
	struct ceph_osd_request *req;
	u64 objnum = 0;
	u64 objoff = 0;
	u64 objlen = 0;
	int r;

	BUG_ON(opcode != CEPH_OSD_OP_READ && opcode != CEPH_OSD_OP_WRITE &&
	       opcode != CEPH_OSD_OP_ZERO && opcode != CEPH_OSD_OP_TRUNCATE &&
	       opcode != CEPH_OSD_OP_CREATE && opcode != CEPH_OSD_OP_DELETE);

	req = ceph_osdc_alloc_request(osdc, snapc, num_ops, use_mempool,
					GFP_NOFS);
	if (!req) {
		r = -ENOMEM;
		goto fail;
	}

	/* calculate max write size */
	r = calc_layout(layout, off, plen, &objnum, &objoff, &objlen);
	if (r)
		goto fail;

	if (opcode == CEPH_OSD_OP_CREATE || opcode == CEPH_OSD_OP_DELETE) {
		/* no extent for object create/delete */
		osd_req_op_init(req, which, opcode, 0);
	} else {
		u32 object_size = layout->object_size;
		u32 object_base = off - objoff;
		/*
		 * translate the file-relative truncate_size into this
		 * object's coordinates, clamped to the object size;
		 * (seq == 1, size == -1) means "no truncation info"
		 */
		if (!(truncate_seq == 1 && truncate_size == -1ULL)) {
			if (truncate_size <= object_base) {
				truncate_size = 0;
			} else {
				truncate_size -= object_base;
				if (truncate_size > object_size)
					truncate_size = object_size;
			}
		}
		osd_req_op_extent_init(req, which, opcode, objoff, objlen,
				       truncate_size, truncate_seq);
	}

	req->r_abort_on_full = true;
	req->r_flags = flags;
	req->r_base_oloc.pool = layout->pool_id;
	req->r_base_oloc.pool_ns = ceph_try_get_string(layout->pool_ns);
	/* object name is "<inode hex>.<object number, zero-padded>" */
	ceph_oid_printf(&req->r_base_oid, "%llx.%08llx", vino.ino, objnum);

	req->r_snapid = vino.snap;
	if (flags & CEPH_OSD_FLAG_WRITE)
		req->r_data_offset = off;

	r = ceph_osdc_alloc_messages(req, GFP_NOFS);
	if (r)
		goto fail;

	return req;

fail:
	ceph_osdc_put_request(req);
	return ERR_PTR(r);
}
EXPORT_SYMBOL(ceph_osdc_new_request);

/*
 * We keep osd requests in an rbtree, sorted by ->r_tid.
 */
DEFINE_RB_FUNCS(request, struct ceph_osd_request, r_tid, r_node)
DEFINE_RB_FUNCS(request_mc, struct ceph_osd_request, r_tid, r_mc_node)

/* true if @osd is the placeholder "homeless" session */
static bool osd_homeless(struct ceph_osd *osd)
{
	return osd->o_osd == CEPH_HOMELESS_OSD;
}

/* true if @osd is linked into the osdc tree; needs osdc->lock held */
static bool osd_registered(struct ceph_osd *osd)
{
	verify_osdc_locked(osd->o_osdc);

	return !RB_EMPTY_NODE(&osd->o_node);
}

/*
 * Assumes @osd is zero-initialized.
 */
static void osd_init(struct ceph_osd *osd)
{
	refcount_set(&osd->o_ref, 1);
	RB_CLEAR_NODE(&osd->o_node);
	osd->o_requests = RB_ROOT;
	osd->o_linger_requests = RB_ROOT;
	osd->o_backoff_mappings = RB_ROOT;
	osd->o_backoffs_by_id = RB_ROOT;
	INIT_LIST_HEAD(&osd->o_osd_lru);
	INIT_LIST_HEAD(&osd->o_keepalive_item);
	osd->o_incarnation = 1;
	mutex_init(&osd->lock);
}

/* tear-down checks and authorizer release before @osd is freed */
static void osd_cleanup(struct ceph_osd *osd)
{
	WARN_ON(!RB_EMPTY_NODE(&osd->o_node));
	WARN_ON(!RB_EMPTY_ROOT(&osd->o_requests));
	WARN_ON(!RB_EMPTY_ROOT(&osd->o_linger_requests));
	WARN_ON(!RB_EMPTY_ROOT(&osd->o_backoff_mappings));
	WARN_ON(!RB_EMPTY_ROOT(&osd->o_backoffs_by_id));
	WARN_ON(!list_empty(&osd->o_osd_lru));
	WARN_ON(!list_empty(&osd->o_keepalive_item));

	if (osd->o_auth.authorizer) {
		WARN_ON(osd_homeless(osd));
		ceph_auth_destroy_authorizer(osd->o_auth.authorizer);
	}
}

/*
 * Track open sessions with osds.
1085 */ 1086 static struct ceph_osd *create_osd(struct ceph_osd_client *osdc, int onum) 1087 { 1088 struct ceph_osd *osd; 1089 1090 WARN_ON(onum == CEPH_HOMELESS_OSD); 1091 1092 osd = kzalloc(sizeof(*osd), GFP_NOIO | __GFP_NOFAIL); 1093 osd_init(osd); 1094 osd->o_osdc = osdc; 1095 osd->o_osd = onum; 1096 1097 ceph_con_init(&osd->o_con, osd, &osd_con_ops, &osdc->client->msgr); 1098 1099 return osd; 1100 } 1101 1102 static struct ceph_osd *get_osd(struct ceph_osd *osd) 1103 { 1104 if (refcount_inc_not_zero(&osd->o_ref)) { 1105 dout("get_osd %p %d -> %d\n", osd, refcount_read(&osd->o_ref)-1, 1106 refcount_read(&osd->o_ref)); 1107 return osd; 1108 } else { 1109 dout("get_osd %p FAIL\n", osd); 1110 return NULL; 1111 } 1112 } 1113 1114 static void put_osd(struct ceph_osd *osd) 1115 { 1116 dout("put_osd %p %d -> %d\n", osd, refcount_read(&osd->o_ref), 1117 refcount_read(&osd->o_ref) - 1); 1118 if (refcount_dec_and_test(&osd->o_ref)) { 1119 osd_cleanup(osd); 1120 kfree(osd); 1121 } 1122 } 1123 1124 DEFINE_RB_FUNCS(osd, struct ceph_osd, o_osd, o_node) 1125 1126 static void __move_osd_to_lru(struct ceph_osd *osd) 1127 { 1128 struct ceph_osd_client *osdc = osd->o_osdc; 1129 1130 dout("%s osd %p osd%d\n", __func__, osd, osd->o_osd); 1131 BUG_ON(!list_empty(&osd->o_osd_lru)); 1132 1133 spin_lock(&osdc->osd_lru_lock); 1134 list_add_tail(&osd->o_osd_lru, &osdc->osd_lru); 1135 spin_unlock(&osdc->osd_lru_lock); 1136 1137 osd->lru_ttl = jiffies + osdc->client->options->osd_idle_ttl; 1138 } 1139 1140 static void maybe_move_osd_to_lru(struct ceph_osd *osd) 1141 { 1142 if (RB_EMPTY_ROOT(&osd->o_requests) && 1143 RB_EMPTY_ROOT(&osd->o_linger_requests)) 1144 __move_osd_to_lru(osd); 1145 } 1146 1147 static void __remove_osd_from_lru(struct ceph_osd *osd) 1148 { 1149 struct ceph_osd_client *osdc = osd->o_osdc; 1150 1151 dout("%s osd %p osd%d\n", __func__, osd, osd->o_osd); 1152 1153 spin_lock(&osdc->osd_lru_lock); 1154 if (!list_empty(&osd->o_osd_lru)) 1155 list_del_init(&osd->o_osd_lru); 
1156 spin_unlock(&osdc->osd_lru_lock); 1157 } 1158 1159 /* 1160 * Close the connection and assign any leftover requests to the 1161 * homeless session. 1162 */ 1163 static void close_osd(struct ceph_osd *osd) 1164 { 1165 struct ceph_osd_client *osdc = osd->o_osdc; 1166 struct rb_node *n; 1167 1168 verify_osdc_wrlocked(osdc); 1169 dout("%s osd %p osd%d\n", __func__, osd, osd->o_osd); 1170 1171 ceph_con_close(&osd->o_con); 1172 1173 for (n = rb_first(&osd->o_requests); n; ) { 1174 struct ceph_osd_request *req = 1175 rb_entry(n, struct ceph_osd_request, r_node); 1176 1177 n = rb_next(n); /* unlink_request() */ 1178 1179 dout(" reassigning req %p tid %llu\n", req, req->r_tid); 1180 unlink_request(osd, req); 1181 link_request(&osdc->homeless_osd, req); 1182 } 1183 for (n = rb_first(&osd->o_linger_requests); n; ) { 1184 struct ceph_osd_linger_request *lreq = 1185 rb_entry(n, struct ceph_osd_linger_request, node); 1186 1187 n = rb_next(n); /* unlink_linger() */ 1188 1189 dout(" reassigning lreq %p linger_id %llu\n", lreq, 1190 lreq->linger_id); 1191 unlink_linger(osd, lreq); 1192 link_linger(&osdc->homeless_osd, lreq); 1193 } 1194 clear_backoffs(osd); 1195 1196 __remove_osd_from_lru(osd); 1197 erase_osd(&osdc->osds, osd); 1198 put_osd(osd); 1199 } 1200 1201 /* 1202 * reset osd connect 1203 */ 1204 static int reopen_osd(struct ceph_osd *osd) 1205 { 1206 struct ceph_entity_addr *peer_addr; 1207 1208 dout("%s osd %p osd%d\n", __func__, osd, osd->o_osd); 1209 1210 if (RB_EMPTY_ROOT(&osd->o_requests) && 1211 RB_EMPTY_ROOT(&osd->o_linger_requests)) { 1212 close_osd(osd); 1213 return -ENODEV; 1214 } 1215 1216 peer_addr = &osd->o_osdc->osdmap->osd_addr[osd->o_osd]; 1217 if (!memcmp(peer_addr, &osd->o_con.peer_addr, sizeof (*peer_addr)) && 1218 !ceph_con_opened(&osd->o_con)) { 1219 struct rb_node *n; 1220 1221 dout("osd addr hasn't changed and connection never opened, " 1222 "letting msgr retry\n"); 1223 /* touch each r_stamp for handle_timeout()'s benfit */ 1224 for (n = 
rb_first(&osd->o_requests); n; n = rb_next(n)) { 1225 struct ceph_osd_request *req = 1226 rb_entry(n, struct ceph_osd_request, r_node); 1227 req->r_stamp = jiffies; 1228 } 1229 1230 return -EAGAIN; 1231 } 1232 1233 ceph_con_close(&osd->o_con); 1234 ceph_con_open(&osd->o_con, CEPH_ENTITY_TYPE_OSD, osd->o_osd, peer_addr); 1235 osd->o_incarnation++; 1236 1237 return 0; 1238 } 1239 1240 static struct ceph_osd *lookup_create_osd(struct ceph_osd_client *osdc, int o, 1241 bool wrlocked) 1242 { 1243 struct ceph_osd *osd; 1244 1245 if (wrlocked) 1246 verify_osdc_wrlocked(osdc); 1247 else 1248 verify_osdc_locked(osdc); 1249 1250 if (o != CEPH_HOMELESS_OSD) 1251 osd = lookup_osd(&osdc->osds, o); 1252 else 1253 osd = &osdc->homeless_osd; 1254 if (!osd) { 1255 if (!wrlocked) 1256 return ERR_PTR(-EAGAIN); 1257 1258 osd = create_osd(osdc, o); 1259 insert_osd(&osdc->osds, osd); 1260 ceph_con_open(&osd->o_con, CEPH_ENTITY_TYPE_OSD, osd->o_osd, 1261 &osdc->osdmap->osd_addr[osd->o_osd]); 1262 } 1263 1264 dout("%s osdc %p osd%d -> osd %p\n", __func__, osdc, o, osd); 1265 return osd; 1266 } 1267 1268 /* 1269 * Create request <-> OSD session relation. 1270 * 1271 * @req has to be assigned a tid, @osd may be homeless. 
1272 */ 1273 static void link_request(struct ceph_osd *osd, struct ceph_osd_request *req) 1274 { 1275 verify_osd_locked(osd); 1276 WARN_ON(!req->r_tid || req->r_osd); 1277 dout("%s osd %p osd%d req %p tid %llu\n", __func__, osd, osd->o_osd, 1278 req, req->r_tid); 1279 1280 if (!osd_homeless(osd)) 1281 __remove_osd_from_lru(osd); 1282 else 1283 atomic_inc(&osd->o_osdc->num_homeless); 1284 1285 get_osd(osd); 1286 insert_request(&osd->o_requests, req); 1287 req->r_osd = osd; 1288 } 1289 1290 static void unlink_request(struct ceph_osd *osd, struct ceph_osd_request *req) 1291 { 1292 verify_osd_locked(osd); 1293 WARN_ON(req->r_osd != osd); 1294 dout("%s osd %p osd%d req %p tid %llu\n", __func__, osd, osd->o_osd, 1295 req, req->r_tid); 1296 1297 req->r_osd = NULL; 1298 erase_request(&osd->o_requests, req); 1299 put_osd(osd); 1300 1301 if (!osd_homeless(osd)) 1302 maybe_move_osd_to_lru(osd); 1303 else 1304 atomic_dec(&osd->o_osdc->num_homeless); 1305 } 1306 1307 static bool __pool_full(struct ceph_pg_pool_info *pi) 1308 { 1309 return pi->flags & CEPH_POOL_FLAG_FULL; 1310 } 1311 1312 static bool have_pool_full(struct ceph_osd_client *osdc) 1313 { 1314 struct rb_node *n; 1315 1316 for (n = rb_first(&osdc->osdmap->pg_pools); n; n = rb_next(n)) { 1317 struct ceph_pg_pool_info *pi = 1318 rb_entry(n, struct ceph_pg_pool_info, node); 1319 1320 if (__pool_full(pi)) 1321 return true; 1322 } 1323 1324 return false; 1325 } 1326 1327 static bool pool_full(struct ceph_osd_client *osdc, s64 pool_id) 1328 { 1329 struct ceph_pg_pool_info *pi; 1330 1331 pi = ceph_pg_pool_by_id(osdc->osdmap, pool_id); 1332 if (!pi) 1333 return false; 1334 1335 return __pool_full(pi); 1336 } 1337 1338 /* 1339 * Returns whether a request should be blocked from being sent 1340 * based on the current osdmap and osd_client settings. 
1341 */ 1342 static bool target_should_be_paused(struct ceph_osd_client *osdc, 1343 const struct ceph_osd_request_target *t, 1344 struct ceph_pg_pool_info *pi) 1345 { 1346 bool pauserd = ceph_osdmap_flag(osdc, CEPH_OSDMAP_PAUSERD); 1347 bool pausewr = ceph_osdmap_flag(osdc, CEPH_OSDMAP_PAUSEWR) || 1348 ceph_osdmap_flag(osdc, CEPH_OSDMAP_FULL) || 1349 __pool_full(pi); 1350 1351 WARN_ON(pi->id != t->target_oloc.pool); 1352 return ((t->flags & CEPH_OSD_FLAG_READ) && pauserd) || 1353 ((t->flags & CEPH_OSD_FLAG_WRITE) && pausewr) || 1354 (osdc->osdmap->epoch < osdc->epoch_barrier); 1355 } 1356 1357 enum calc_target_result { 1358 CALC_TARGET_NO_ACTION = 0, 1359 CALC_TARGET_NEED_RESEND, 1360 CALC_TARGET_POOL_DNE, 1361 }; 1362 1363 static enum calc_target_result calc_target(struct ceph_osd_client *osdc, 1364 struct ceph_osd_request_target *t, 1365 struct ceph_connection *con, 1366 bool any_change) 1367 { 1368 struct ceph_pg_pool_info *pi; 1369 struct ceph_pg pgid, last_pgid; 1370 struct ceph_osds up, acting; 1371 bool force_resend = false; 1372 bool unpaused = false; 1373 bool legacy_change; 1374 bool split = false; 1375 bool sort_bitwise = ceph_osdmap_flag(osdc, CEPH_OSDMAP_SORTBITWISE); 1376 bool recovery_deletes = ceph_osdmap_flag(osdc, 1377 CEPH_OSDMAP_RECOVERY_DELETES); 1378 enum calc_target_result ct_res; 1379 int ret; 1380 1381 t->epoch = osdc->osdmap->epoch; 1382 pi = ceph_pg_pool_by_id(osdc->osdmap, t->base_oloc.pool); 1383 if (!pi) { 1384 t->osd = CEPH_HOMELESS_OSD; 1385 ct_res = CALC_TARGET_POOL_DNE; 1386 goto out; 1387 } 1388 1389 if (osdc->osdmap->epoch == pi->last_force_request_resend) { 1390 if (t->last_force_resend < pi->last_force_request_resend) { 1391 t->last_force_resend = pi->last_force_request_resend; 1392 force_resend = true; 1393 } else if (t->last_force_resend == 0) { 1394 force_resend = true; 1395 } 1396 } 1397 1398 /* apply tiering */ 1399 ceph_oid_copy(&t->target_oid, &t->base_oid); 1400 ceph_oloc_copy(&t->target_oloc, &t->base_oloc); 1401 if 
((t->flags & CEPH_OSD_FLAG_IGNORE_OVERLAY) == 0) { 1402 if (t->flags & CEPH_OSD_FLAG_READ && pi->read_tier >= 0) 1403 t->target_oloc.pool = pi->read_tier; 1404 if (t->flags & CEPH_OSD_FLAG_WRITE && pi->write_tier >= 0) 1405 t->target_oloc.pool = pi->write_tier; 1406 1407 pi = ceph_pg_pool_by_id(osdc->osdmap, t->target_oloc.pool); 1408 if (!pi) { 1409 t->osd = CEPH_HOMELESS_OSD; 1410 ct_res = CALC_TARGET_POOL_DNE; 1411 goto out; 1412 } 1413 } 1414 1415 ret = __ceph_object_locator_to_pg(pi, &t->target_oid, &t->target_oloc, 1416 &pgid); 1417 if (ret) { 1418 WARN_ON(ret != -ENOENT); 1419 t->osd = CEPH_HOMELESS_OSD; 1420 ct_res = CALC_TARGET_POOL_DNE; 1421 goto out; 1422 } 1423 last_pgid.pool = pgid.pool; 1424 last_pgid.seed = ceph_stable_mod(pgid.seed, t->pg_num, t->pg_num_mask); 1425 1426 ceph_pg_to_up_acting_osds(osdc->osdmap, pi, &pgid, &up, &acting); 1427 if (any_change && 1428 ceph_is_new_interval(&t->acting, 1429 &acting, 1430 &t->up, 1431 &up, 1432 t->size, 1433 pi->size, 1434 t->min_size, 1435 pi->min_size, 1436 t->pg_num, 1437 pi->pg_num, 1438 t->sort_bitwise, 1439 sort_bitwise, 1440 t->recovery_deletes, 1441 recovery_deletes, 1442 &last_pgid)) 1443 force_resend = true; 1444 1445 if (t->paused && !target_should_be_paused(osdc, t, pi)) { 1446 t->paused = false; 1447 unpaused = true; 1448 } 1449 legacy_change = ceph_pg_compare(&t->pgid, &pgid) || 1450 ceph_osds_changed(&t->acting, &acting, any_change); 1451 if (t->pg_num) 1452 split = ceph_pg_is_split(&last_pgid, t->pg_num, pi->pg_num); 1453 1454 if (legacy_change || force_resend || split) { 1455 t->pgid = pgid; /* struct */ 1456 ceph_pg_to_primary_shard(osdc->osdmap, pi, &pgid, &t->spgid); 1457 ceph_osds_copy(&t->acting, &acting); 1458 ceph_osds_copy(&t->up, &up); 1459 t->size = pi->size; 1460 t->min_size = pi->min_size; 1461 t->pg_num = pi->pg_num; 1462 t->pg_num_mask = pi->pg_num_mask; 1463 t->sort_bitwise = sort_bitwise; 1464 t->recovery_deletes = recovery_deletes; 1465 1466 t->osd = acting.primary; 1467 } 
1468 1469 if (unpaused || legacy_change || force_resend || 1470 (split && con && CEPH_HAVE_FEATURE(con->peer_features, 1471 RESEND_ON_SPLIT))) 1472 ct_res = CALC_TARGET_NEED_RESEND; 1473 else 1474 ct_res = CALC_TARGET_NO_ACTION; 1475 1476 out: 1477 dout("%s t %p -> ct_res %d osd %d\n", __func__, t, ct_res, t->osd); 1478 return ct_res; 1479 } 1480 1481 static struct ceph_spg_mapping *alloc_spg_mapping(void) 1482 { 1483 struct ceph_spg_mapping *spg; 1484 1485 spg = kmalloc(sizeof(*spg), GFP_NOIO); 1486 if (!spg) 1487 return NULL; 1488 1489 RB_CLEAR_NODE(&spg->node); 1490 spg->backoffs = RB_ROOT; 1491 return spg; 1492 } 1493 1494 static void free_spg_mapping(struct ceph_spg_mapping *spg) 1495 { 1496 WARN_ON(!RB_EMPTY_NODE(&spg->node)); 1497 WARN_ON(!RB_EMPTY_ROOT(&spg->backoffs)); 1498 1499 kfree(spg); 1500 } 1501 1502 /* 1503 * rbtree of ceph_spg_mapping for handling map<spg_t, ...>, similar to 1504 * ceph_pg_mapping. Used to track OSD backoffs -- a backoff [range] is 1505 * defined only within a specific spgid; it does not pass anything to 1506 * children on split, or to another primary. 1507 */ 1508 DEFINE_RB_FUNCS2(spg_mapping, struct ceph_spg_mapping, spgid, ceph_spg_compare, 1509 RB_BYPTR, const struct ceph_spg *, node) 1510 1511 static u64 hoid_get_bitwise_key(const struct ceph_hobject_id *hoid) 1512 { 1513 return hoid->is_max ? 
0x100000000ull : hoid->hash_reverse_bits; 1514 } 1515 1516 static void hoid_get_effective_key(const struct ceph_hobject_id *hoid, 1517 void **pkey, size_t *pkey_len) 1518 { 1519 if (hoid->key_len) { 1520 *pkey = hoid->key; 1521 *pkey_len = hoid->key_len; 1522 } else { 1523 *pkey = hoid->oid; 1524 *pkey_len = hoid->oid_len; 1525 } 1526 } 1527 1528 static int compare_names(const void *name1, size_t name1_len, 1529 const void *name2, size_t name2_len) 1530 { 1531 int ret; 1532 1533 ret = memcmp(name1, name2, min(name1_len, name2_len)); 1534 if (!ret) { 1535 if (name1_len < name2_len) 1536 ret = -1; 1537 else if (name1_len > name2_len) 1538 ret = 1; 1539 } 1540 return ret; 1541 } 1542 1543 static int hoid_compare(const struct ceph_hobject_id *lhs, 1544 const struct ceph_hobject_id *rhs) 1545 { 1546 void *effective_key1, *effective_key2; 1547 size_t effective_key1_len, effective_key2_len; 1548 int ret; 1549 1550 if (lhs->is_max < rhs->is_max) 1551 return -1; 1552 if (lhs->is_max > rhs->is_max) 1553 return 1; 1554 1555 if (lhs->pool < rhs->pool) 1556 return -1; 1557 if (lhs->pool > rhs->pool) 1558 return 1; 1559 1560 if (hoid_get_bitwise_key(lhs) < hoid_get_bitwise_key(rhs)) 1561 return -1; 1562 if (hoid_get_bitwise_key(lhs) > hoid_get_bitwise_key(rhs)) 1563 return 1; 1564 1565 ret = compare_names(lhs->nspace, lhs->nspace_len, 1566 rhs->nspace, rhs->nspace_len); 1567 if (ret) 1568 return ret; 1569 1570 hoid_get_effective_key(lhs, &effective_key1, &effective_key1_len); 1571 hoid_get_effective_key(rhs, &effective_key2, &effective_key2_len); 1572 ret = compare_names(effective_key1, effective_key1_len, 1573 effective_key2, effective_key2_len); 1574 if (ret) 1575 return ret; 1576 1577 ret = compare_names(lhs->oid, lhs->oid_len, rhs->oid, rhs->oid_len); 1578 if (ret) 1579 return ret; 1580 1581 if (lhs->snapid < rhs->snapid) 1582 return -1; 1583 if (lhs->snapid > rhs->snapid) 1584 return 1; 1585 1586 return 0; 1587 } 1588 1589 /* 1590 * For decoding ->begin and ->end of 
MOSDBackoff only -- no MIN/MAX 1591 * compat stuff here. 1592 * 1593 * Assumes @hoid is zero-initialized. 1594 */ 1595 static int decode_hoid(void **p, void *end, struct ceph_hobject_id *hoid) 1596 { 1597 u8 struct_v; 1598 u32 struct_len; 1599 int ret; 1600 1601 ret = ceph_start_decoding(p, end, 4, "hobject_t", &struct_v, 1602 &struct_len); 1603 if (ret) 1604 return ret; 1605 1606 if (struct_v < 4) { 1607 pr_err("got struct_v %d < 4 of hobject_t\n", struct_v); 1608 goto e_inval; 1609 } 1610 1611 hoid->key = ceph_extract_encoded_string(p, end, &hoid->key_len, 1612 GFP_NOIO); 1613 if (IS_ERR(hoid->key)) { 1614 ret = PTR_ERR(hoid->key); 1615 hoid->key = NULL; 1616 return ret; 1617 } 1618 1619 hoid->oid = ceph_extract_encoded_string(p, end, &hoid->oid_len, 1620 GFP_NOIO); 1621 if (IS_ERR(hoid->oid)) { 1622 ret = PTR_ERR(hoid->oid); 1623 hoid->oid = NULL; 1624 return ret; 1625 } 1626 1627 ceph_decode_64_safe(p, end, hoid->snapid, e_inval); 1628 ceph_decode_32_safe(p, end, hoid->hash, e_inval); 1629 ceph_decode_8_safe(p, end, hoid->is_max, e_inval); 1630 1631 hoid->nspace = ceph_extract_encoded_string(p, end, &hoid->nspace_len, 1632 GFP_NOIO); 1633 if (IS_ERR(hoid->nspace)) { 1634 ret = PTR_ERR(hoid->nspace); 1635 hoid->nspace = NULL; 1636 return ret; 1637 } 1638 1639 ceph_decode_64_safe(p, end, hoid->pool, e_inval); 1640 1641 ceph_hoid_build_hash_cache(hoid); 1642 return 0; 1643 1644 e_inval: 1645 return -EINVAL; 1646 } 1647 1648 static int hoid_encoding_size(const struct ceph_hobject_id *hoid) 1649 { 1650 return 8 + 4 + 1 + 8 + /* snapid, hash, is_max, pool */ 1651 4 + hoid->key_len + 4 + hoid->oid_len + 4 + hoid->nspace_len; 1652 } 1653 1654 static void encode_hoid(void **p, void *end, const struct ceph_hobject_id *hoid) 1655 { 1656 ceph_start_encoding(p, 4, 3, hoid_encoding_size(hoid)); 1657 ceph_encode_string(p, end, hoid->key, hoid->key_len); 1658 ceph_encode_string(p, end, hoid->oid, hoid->oid_len); 1659 ceph_encode_64(p, hoid->snapid); 1660 ceph_encode_32(p, 
hoid->hash); 1661 ceph_encode_8(p, hoid->is_max); 1662 ceph_encode_string(p, end, hoid->nspace, hoid->nspace_len); 1663 ceph_encode_64(p, hoid->pool); 1664 } 1665 1666 static void free_hoid(struct ceph_hobject_id *hoid) 1667 { 1668 if (hoid) { 1669 kfree(hoid->key); 1670 kfree(hoid->oid); 1671 kfree(hoid->nspace); 1672 kfree(hoid); 1673 } 1674 } 1675 1676 static struct ceph_osd_backoff *alloc_backoff(void) 1677 { 1678 struct ceph_osd_backoff *backoff; 1679 1680 backoff = kzalloc(sizeof(*backoff), GFP_NOIO); 1681 if (!backoff) 1682 return NULL; 1683 1684 RB_CLEAR_NODE(&backoff->spg_node); 1685 RB_CLEAR_NODE(&backoff->id_node); 1686 return backoff; 1687 } 1688 1689 static void free_backoff(struct ceph_osd_backoff *backoff) 1690 { 1691 WARN_ON(!RB_EMPTY_NODE(&backoff->spg_node)); 1692 WARN_ON(!RB_EMPTY_NODE(&backoff->id_node)); 1693 1694 free_hoid(backoff->begin); 1695 free_hoid(backoff->end); 1696 kfree(backoff); 1697 } 1698 1699 /* 1700 * Within a specific spgid, backoffs are managed by ->begin hoid. 1701 */ 1702 DEFINE_RB_INSDEL_FUNCS2(backoff, struct ceph_osd_backoff, begin, hoid_compare, 1703 RB_BYVAL, spg_node); 1704 1705 static struct ceph_osd_backoff *lookup_containing_backoff(struct rb_root *root, 1706 const struct ceph_hobject_id *hoid) 1707 { 1708 struct rb_node *n = root->rb_node; 1709 1710 while (n) { 1711 struct ceph_osd_backoff *cur = 1712 rb_entry(n, struct ceph_osd_backoff, spg_node); 1713 int cmp; 1714 1715 cmp = hoid_compare(hoid, cur->begin); 1716 if (cmp < 0) { 1717 n = n->rb_left; 1718 } else if (cmp > 0) { 1719 if (hoid_compare(hoid, cur->end) < 0) 1720 return cur; 1721 1722 n = n->rb_right; 1723 } else { 1724 return cur; 1725 } 1726 } 1727 1728 return NULL; 1729 } 1730 1731 /* 1732 * Each backoff has a unique id within its OSD session. 
1733 */ 1734 DEFINE_RB_FUNCS(backoff_by_id, struct ceph_osd_backoff, id, id_node) 1735 1736 static void clear_backoffs(struct ceph_osd *osd) 1737 { 1738 while (!RB_EMPTY_ROOT(&osd->o_backoff_mappings)) { 1739 struct ceph_spg_mapping *spg = 1740 rb_entry(rb_first(&osd->o_backoff_mappings), 1741 struct ceph_spg_mapping, node); 1742 1743 while (!RB_EMPTY_ROOT(&spg->backoffs)) { 1744 struct ceph_osd_backoff *backoff = 1745 rb_entry(rb_first(&spg->backoffs), 1746 struct ceph_osd_backoff, spg_node); 1747 1748 erase_backoff(&spg->backoffs, backoff); 1749 erase_backoff_by_id(&osd->o_backoffs_by_id, backoff); 1750 free_backoff(backoff); 1751 } 1752 erase_spg_mapping(&osd->o_backoff_mappings, spg); 1753 free_spg_mapping(spg); 1754 } 1755 } 1756 1757 /* 1758 * Set up a temporary, non-owning view into @t. 1759 */ 1760 static void hoid_fill_from_target(struct ceph_hobject_id *hoid, 1761 const struct ceph_osd_request_target *t) 1762 { 1763 hoid->key = NULL; 1764 hoid->key_len = 0; 1765 hoid->oid = t->target_oid.name; 1766 hoid->oid_len = t->target_oid.name_len; 1767 hoid->snapid = CEPH_NOSNAP; 1768 hoid->hash = t->pgid.seed; 1769 hoid->is_max = false; 1770 if (t->target_oloc.pool_ns) { 1771 hoid->nspace = t->target_oloc.pool_ns->str; 1772 hoid->nspace_len = t->target_oloc.pool_ns->len; 1773 } else { 1774 hoid->nspace = NULL; 1775 hoid->nspace_len = 0; 1776 } 1777 hoid->pool = t->target_oloc.pool; 1778 ceph_hoid_build_hash_cache(hoid); 1779 } 1780 1781 static bool should_plug_request(struct ceph_osd_request *req) 1782 { 1783 struct ceph_osd *osd = req->r_osd; 1784 struct ceph_spg_mapping *spg; 1785 struct ceph_osd_backoff *backoff; 1786 struct ceph_hobject_id hoid; 1787 1788 spg = lookup_spg_mapping(&osd->o_backoff_mappings, &req->r_t.spgid); 1789 if (!spg) 1790 return false; 1791 1792 hoid_fill_from_target(&hoid, &req->r_t); 1793 backoff = lookup_containing_backoff(&spg->backoffs, &hoid); 1794 if (!backoff) 1795 return false; 1796 1797 dout("%s req %p tid %llu backoff osd%d 
spgid %llu.%xs%d id %llu\n", 1798 __func__, req, req->r_tid, osd->o_osd, backoff->spgid.pgid.pool, 1799 backoff->spgid.pgid.seed, backoff->spgid.shard, backoff->id); 1800 return true; 1801 } 1802 1803 static void setup_request_data(struct ceph_osd_request *req, 1804 struct ceph_msg *msg) 1805 { 1806 u32 data_len = 0; 1807 int i; 1808 1809 if (!list_empty(&msg->data)) 1810 return; 1811 1812 WARN_ON(msg->data_length); 1813 for (i = 0; i < req->r_num_ops; i++) { 1814 struct ceph_osd_req_op *op = &req->r_ops[i]; 1815 1816 switch (op->op) { 1817 /* request */ 1818 case CEPH_OSD_OP_WRITE: 1819 case CEPH_OSD_OP_WRITEFULL: 1820 WARN_ON(op->indata_len != op->extent.length); 1821 ceph_osdc_msg_data_add(msg, &op->extent.osd_data); 1822 break; 1823 case CEPH_OSD_OP_SETXATTR: 1824 case CEPH_OSD_OP_CMPXATTR: 1825 WARN_ON(op->indata_len != op->xattr.name_len + 1826 op->xattr.value_len); 1827 ceph_osdc_msg_data_add(msg, &op->xattr.osd_data); 1828 break; 1829 case CEPH_OSD_OP_NOTIFY_ACK: 1830 ceph_osdc_msg_data_add(msg, 1831 &op->notify_ack.request_data); 1832 break; 1833 1834 /* reply */ 1835 case CEPH_OSD_OP_STAT: 1836 ceph_osdc_msg_data_add(req->r_reply, 1837 &op->raw_data_in); 1838 break; 1839 case CEPH_OSD_OP_READ: 1840 ceph_osdc_msg_data_add(req->r_reply, 1841 &op->extent.osd_data); 1842 break; 1843 case CEPH_OSD_OP_LIST_WATCHERS: 1844 ceph_osdc_msg_data_add(req->r_reply, 1845 &op->list_watchers.response_data); 1846 break; 1847 1848 /* both */ 1849 case CEPH_OSD_OP_CALL: 1850 WARN_ON(op->indata_len != op->cls.class_len + 1851 op->cls.method_len + 1852 op->cls.indata_len); 1853 ceph_osdc_msg_data_add(msg, &op->cls.request_info); 1854 /* optional, can be NONE */ 1855 ceph_osdc_msg_data_add(msg, &op->cls.request_data); 1856 /* optional, can be NONE */ 1857 ceph_osdc_msg_data_add(req->r_reply, 1858 &op->cls.response_data); 1859 break; 1860 case CEPH_OSD_OP_NOTIFY: 1861 ceph_osdc_msg_data_add(msg, 1862 &op->notify.request_data); 1863 ceph_osdc_msg_data_add(req->r_reply, 1864 
&op->notify.response_data); 1865 break; 1866 } 1867 1868 data_len += op->indata_len; 1869 } 1870 1871 WARN_ON(data_len != msg->data_length); 1872 } 1873 1874 static void encode_pgid(void **p, const struct ceph_pg *pgid) 1875 { 1876 ceph_encode_8(p, 1); 1877 ceph_encode_64(p, pgid->pool); 1878 ceph_encode_32(p, pgid->seed); 1879 ceph_encode_32(p, -1); /* preferred */ 1880 } 1881 1882 static void encode_spgid(void **p, const struct ceph_spg *spgid) 1883 { 1884 ceph_start_encoding(p, 1, 1, CEPH_PGID_ENCODING_LEN + 1); 1885 encode_pgid(p, &spgid->pgid); 1886 ceph_encode_8(p, spgid->shard); 1887 } 1888 1889 static void encode_oloc(void **p, void *end, 1890 const struct ceph_object_locator *oloc) 1891 { 1892 ceph_start_encoding(p, 5, 4, ceph_oloc_encoding_size(oloc)); 1893 ceph_encode_64(p, oloc->pool); 1894 ceph_encode_32(p, -1); /* preferred */ 1895 ceph_encode_32(p, 0); /* key len */ 1896 if (oloc->pool_ns) 1897 ceph_encode_string(p, end, oloc->pool_ns->str, 1898 oloc->pool_ns->len); 1899 else 1900 ceph_encode_32(p, 0); 1901 } 1902 1903 static void encode_request_partial(struct ceph_osd_request *req, 1904 struct ceph_msg *msg) 1905 { 1906 void *p = msg->front.iov_base; 1907 void *const end = p + msg->front_alloc_len; 1908 u32 data_len = 0; 1909 int i; 1910 1911 if (req->r_flags & CEPH_OSD_FLAG_WRITE) { 1912 /* snapshots aren't writeable */ 1913 WARN_ON(req->r_snapid != CEPH_NOSNAP); 1914 } else { 1915 WARN_ON(req->r_mtime.tv_sec || req->r_mtime.tv_nsec || 1916 req->r_data_offset || req->r_snapc); 1917 } 1918 1919 setup_request_data(req, msg); 1920 1921 encode_spgid(&p, &req->r_t.spgid); /* actual spg */ 1922 ceph_encode_32(&p, req->r_t.pgid.seed); /* raw hash */ 1923 ceph_encode_32(&p, req->r_osdc->osdmap->epoch); 1924 ceph_encode_32(&p, req->r_flags); 1925 1926 /* reqid */ 1927 ceph_start_encoding(&p, 2, 2, sizeof(struct ceph_osd_reqid)); 1928 memset(p, 0, sizeof(struct ceph_osd_reqid)); 1929 p += sizeof(struct ceph_osd_reqid); 1930 1931 /* trace */ 1932 memset(p, 0, 
sizeof(struct ceph_blkin_trace_info)); 1933 p += sizeof(struct ceph_blkin_trace_info); 1934 1935 ceph_encode_32(&p, 0); /* client_inc, always 0 */ 1936 ceph_encode_timespec(p, &req->r_mtime); 1937 p += sizeof(struct ceph_timespec); 1938 1939 encode_oloc(&p, end, &req->r_t.target_oloc); 1940 ceph_encode_string(&p, end, req->r_t.target_oid.name, 1941 req->r_t.target_oid.name_len); 1942 1943 /* ops, can imply data */ 1944 ceph_encode_16(&p, req->r_num_ops); 1945 for (i = 0; i < req->r_num_ops; i++) { 1946 data_len += osd_req_encode_op(p, &req->r_ops[i]); 1947 p += sizeof(struct ceph_osd_op); 1948 } 1949 1950 ceph_encode_64(&p, req->r_snapid); /* snapid */ 1951 if (req->r_snapc) { 1952 ceph_encode_64(&p, req->r_snapc->seq); 1953 ceph_encode_32(&p, req->r_snapc->num_snaps); 1954 for (i = 0; i < req->r_snapc->num_snaps; i++) 1955 ceph_encode_64(&p, req->r_snapc->snaps[i]); 1956 } else { 1957 ceph_encode_64(&p, 0); /* snap_seq */ 1958 ceph_encode_32(&p, 0); /* snaps len */ 1959 } 1960 1961 ceph_encode_32(&p, req->r_attempts); /* retry_attempt */ 1962 BUG_ON(p > end - 8); /* space for features */ 1963 1964 msg->hdr.version = cpu_to_le16(8); /* MOSDOp v8 */ 1965 /* front_len is finalized in encode_request_finish() */ 1966 msg->front.iov_len = p - msg->front.iov_base; 1967 msg->hdr.front_len = cpu_to_le32(msg->front.iov_len); 1968 msg->hdr.data_len = cpu_to_le32(data_len); 1969 /* 1970 * The header "data_off" is a hint to the receiver allowing it 1971 * to align received data into its buffers such that there's no 1972 * need to re-copy it before writing it to disk (direct I/O). 
1973 */ 1974 msg->hdr.data_off = cpu_to_le16(req->r_data_offset); 1975 1976 dout("%s req %p msg %p oid %s oid_len %d\n", __func__, req, msg, 1977 req->r_t.target_oid.name, req->r_t.target_oid.name_len); 1978 } 1979 1980 static void encode_request_finish(struct ceph_msg *msg) 1981 { 1982 void *p = msg->front.iov_base; 1983 void *const partial_end = p + msg->front.iov_len; 1984 void *const end = p + msg->front_alloc_len; 1985 1986 if (CEPH_HAVE_FEATURE(msg->con->peer_features, RESEND_ON_SPLIT)) { 1987 /* luminous OSD -- encode features and be done */ 1988 p = partial_end; 1989 ceph_encode_64(&p, msg->con->peer_features); 1990 } else { 1991 struct { 1992 char spgid[CEPH_ENCODING_START_BLK_LEN + 1993 CEPH_PGID_ENCODING_LEN + 1]; 1994 __le32 hash; 1995 __le32 epoch; 1996 __le32 flags; 1997 char reqid[CEPH_ENCODING_START_BLK_LEN + 1998 sizeof(struct ceph_osd_reqid)]; 1999 char trace[sizeof(struct ceph_blkin_trace_info)]; 2000 __le32 client_inc; 2001 struct ceph_timespec mtime; 2002 } __packed head; 2003 struct ceph_pg pgid; 2004 void *oloc, *oid, *tail; 2005 int oloc_len, oid_len, tail_len; 2006 int len; 2007 2008 /* 2009 * Pre-luminous OSD -- reencode v8 into v4 using @head 2010 * as a temporary buffer. Encode the raw PG; the rest 2011 * is just a matter of moving oloc, oid and tail blobs 2012 * around. 
		 */
		memcpy(&head, p, sizeof(head));
		p += sizeof(head);

		oloc = p;
		p += CEPH_ENCODING_START_BLK_LEN;
		pgid.pool = ceph_decode_64(&p);
		p += 4 + 4; /* preferred, key len */
		len = ceph_decode_32(&p);
		p += len; /* nspace */
		oloc_len = p - oloc;

		oid = p;
		len = ceph_decode_32(&p);
		p += len;
		oid_len = p - oid;

		tail = p;
		tail_len = partial_end - p;

		p = msg->front.iov_base;
		ceph_encode_copy(&p, &head.client_inc, sizeof(head.client_inc));
		ceph_encode_copy(&p, &head.epoch, sizeof(head.epoch));
		ceph_encode_copy(&p, &head.flags, sizeof(head.flags));
		ceph_encode_copy(&p, &head.mtime, sizeof(head.mtime));

		/* reassert_version */
		memset(p, 0, sizeof(struct ceph_eversion));
		p += sizeof(struct ceph_eversion);

		BUG_ON(p >= oloc);
		memmove(p, oloc, oloc_len);
		p += oloc_len;

		pgid.seed = le32_to_cpu(head.hash);
		encode_pgid(&p, &pgid); /* raw pg */

		BUG_ON(p >= oid);
		memmove(p, oid, oid_len);
		p += oid_len;

		/* tail -- ops, snapid, snapc, retry_attempt */
		BUG_ON(p >= tail);
		memmove(p, tail, tail_len);
		p += tail_len;

		msg->hdr.version = cpu_to_le16(4); /* MOSDOp v4 */
	}

	BUG_ON(p > end);
	msg->front.iov_len = p - msg->front.iov_base;
	msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);

	dout("%s msg %p tid %llu %u+%u+%u v%d\n", __func__, msg,
	     le64_to_cpu(msg->hdr.tid), le32_to_cpu(msg->hdr.front_len),
	     le32_to_cpu(msg->hdr.middle_len), le32_to_cpu(msg->hdr.data_len),
	     le16_to_cpu(msg->hdr.version));
}

/*
 * Encode and hand @req's message to the messenger for delivery to its
 * mapped OSD.
 *
 * @req has to be assigned a tid and registered.  Caller must hold
 * osd->lock (checked by verify_osd_locked()).
 */
static void send_request(struct ceph_osd_request *req)
{
	struct ceph_osd *osd = req->r_osd;

	verify_osd_locked(osd);
	WARN_ON(osd->o_osd != req->r_t.osd);

	/* backoff? */
	if (should_plug_request(req))
		return;

	/*
	 * We may have a previously queued request message hanging
	 * around.  Cancel it to avoid corrupting the msgr.
	 */
	if (req->r_sent)
		ceph_msg_revoke(req->r_request);

	req->r_flags |= CEPH_OSD_FLAG_KNOWN_REDIR;
	if (req->r_attempts)
		req->r_flags |= CEPH_OSD_FLAG_RETRY;
	else
		WARN_ON(req->r_flags & CEPH_OSD_FLAG_RETRY);

	encode_request_partial(req, req->r_request);

	dout("%s req %p tid %llu to pgid %llu.%x spgid %llu.%xs%d osd%d e%u flags 0x%x attempt %d\n",
	     __func__, req, req->r_tid, req->r_t.pgid.pool, req->r_t.pgid.seed,
	     req->r_t.spgid.pgid.pool, req->r_t.spgid.pgid.seed,
	     req->r_t.spgid.shard, osd->o_osd, req->r_t.epoch, req->r_flags,
	     req->r_attempts);

	req->r_t.paused = false;
	req->r_stamp = jiffies;
	req->r_attempts++;

	/* messenger holds its own ref on the message while queued */
	req->r_sent = osd->o_incarnation;
	req->r_request->hdr.tid = cpu_to_le64(req->r_tid);
	ceph_con_send(&osd->o_con, ceph_msg_get(req->r_request));
}

/*
 * Subscribe to the next osdmap epoch from the monitors; subscribe
 * continuously while the FULL, PAUSERD or PAUSEWR flags are set,
 * otherwise just once.
 */
static void maybe_request_map(struct ceph_osd_client *osdc)
{
	bool continuous = false;

	verify_osdc_locked(osdc);
	WARN_ON(!osdc->osdmap->epoch);

	if (ceph_osdmap_flag(osdc, CEPH_OSDMAP_FULL) ||
	    ceph_osdmap_flag(osdc, CEPH_OSDMAP_PAUSERD) ||
	    ceph_osdmap_flag(osdc, CEPH_OSDMAP_PAUSEWR)) {
		dout("%s osdc %p continuous\n", __func__, osdc);
		continuous = true;
	} else {
		dout("%s osdc %p onetime\n", __func__, osdc);
	}

	if (ceph_monc_want_map(&osdc->client->monc, CEPH_SUB_OSDMAP,
			       osdc->osdmap->epoch + 1, continuous))
		ceph_monc_renew_subs(&osdc->client->monc);
}

static void complete_request(struct ceph_osd_request *req, int err);
static void send_map_check(struct ceph_osd_request *req);

/*
 * Map @req to a target OSD, link it to that session and, unless the
 * cluster is paused/full or the session is homeless, send it.
 *
 * Called with osdc->lock held for read (wrlocked=false) or write; may
 * promote a read lock to a write lock (see the promote label) and, if
 * it did, downgrades back to read before returning.
 */
static void __submit_request(struct ceph_osd_request *req, bool wrlocked)
{
	struct ceph_osd_client *osdc = req->r_osdc;
	struct ceph_osd *osd;
	enum calc_target_result ct_res;
	bool need_send = false;
	bool promoted = false;
	bool need_abort = false;

	WARN_ON(req->r_tid);
	dout("%s req %p wrlocked %d\n", __func__, req, wrlocked);

again:
	ct_res = calc_target(osdc, &req->r_t, NULL, false);
	if (ct_res == CALC_TARGET_POOL_DNE && !wrlocked)
		goto promote;

	osd = lookup_create_osd(osdc, req->r_t.osd, wrlocked);
	if (IS_ERR(osd)) {
		WARN_ON(PTR_ERR(osd) != -EAGAIN || wrlocked);
		goto promote;
	}

	if (osdc->osdmap->epoch < osdc->epoch_barrier) {
		dout("req %p epoch %u barrier %u\n", req, osdc->osdmap->epoch,
		     osdc->epoch_barrier);
		req->r_t.paused = true;
		maybe_request_map(osdc);
	} else if ((req->r_flags & CEPH_OSD_FLAG_WRITE) &&
		   ceph_osdmap_flag(osdc, CEPH_OSDMAP_PAUSEWR)) {
		dout("req %p pausewr\n", req);
		req->r_t.paused = true;
		maybe_request_map(osdc);
	} else if ((req->r_flags & CEPH_OSD_FLAG_READ) &&
		   ceph_osdmap_flag(osdc, CEPH_OSDMAP_PAUSERD)) {
		dout("req %p pauserd\n", req);
		req->r_t.paused = true;
		maybe_request_map(osdc);
	} else if ((req->r_flags & CEPH_OSD_FLAG_WRITE) &&
		   !(req->r_flags & (CEPH_OSD_FLAG_FULL_TRY |
				     CEPH_OSD_FLAG_FULL_FORCE)) &&
		   (ceph_osdmap_flag(osdc, CEPH_OSDMAP_FULL) ||
		    pool_full(osdc, req->r_t.base_oloc.pool))) {
		dout("req %p full/pool_full\n", req);
		pr_warn_ratelimited("FULL or reached pool quota\n");
		req->r_t.paused = true;
		maybe_request_map(osdc);
		if (req->r_abort_on_full)
			need_abort = true;
	} else if (!osd_homeless(osd)) {
		need_send = true;
	} else {
		maybe_request_map(osdc);
	}

	mutex_lock(&osd->lock);
	/*
	 * Assign the tid atomically with send_request() to protect
	 * multiple writes to the same object from racing with each
	 * other, resulting in out of order ops on the OSDs.
	 */
	req->r_tid = atomic64_inc_return(&osdc->last_tid);
	link_request(osd, req);
	if (need_send)
		send_request(req);
	else if (need_abort)
		complete_request(req, -ENOSPC);
	mutex_unlock(&osd->lock);

	if (ct_res == CALC_TARGET_POOL_DNE)
		send_map_check(req);

	if (promoted)
		downgrade_write(&osdc->lock);
	return;

promote:
	up_read(&osdc->lock);
	down_write(&osdc->lock);
	wrlocked = true;
	promoted = true;
	goto again;
}

/*
 * Account @req as in-flight: flag it ONDISK and bump the counter.
 */
static void account_request(struct ceph_osd_request *req)
{
	WARN_ON(req->r_flags & (CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK));
	WARN_ON(!(req->r_flags & (CEPH_OSD_FLAG_READ | CEPH_OSD_FLAG_WRITE)));

	req->r_flags |= CEPH_OSD_FLAG_ONDISK;
	atomic_inc(&req->r_osdc->num_requests);

	req->r_start_stamp = jiffies;
}

/*
 * Take a ref on @req for the duration of its lifetime as an in-flight
 * request and submit it.
 */
static void submit_request(struct ceph_osd_request *req, bool wrlocked)
{
	ceph_osdc_get_request(req);
	account_request(req);
	__submit_request(req, wrlocked);
}

/*
 * Undo submit_request()'s accounting: unlink @req from its OSD session
 * and revoke any queued/expected messages.
 */
static void finish_request(struct ceph_osd_request *req)
{
	struct ceph_osd_client *osdc = req->r_osdc;

	WARN_ON(lookup_request_mc(&osdc->map_checks, req->r_tid));
	dout("%s req %p tid %llu\n", __func__, req, req->r_tid);

	if (req->r_osd)
		unlink_request(req->r_osd, req);
	atomic_dec(&osdc->num_requests);

	/*
	 * If an OSD has failed or returned and a request has been sent
	 * twice, it's possible to get a reply and end up here while the
	 * request message is queued for delivery.  We will ignore the
	 * reply, so not a big deal, but better to try and catch it.
	 */
	ceph_msg_revoke(req->r_request);
	ceph_msg_revoke_incoming(req->r_reply);
}

/*
 * Invoke the request's completion callback, if one was installed.
 */
static void __complete_request(struct ceph_osd_request *req)
{
	if (req->r_callback) {
		dout("%s req %p tid %llu cb %pf result %d\n", __func__, req,
		     req->r_tid, req->r_callback, req->r_result);
		req->r_callback(req);
	}
}

/*
 * This is open-coded in handle_reply().
 */
static void complete_request(struct ceph_osd_request *req, int err)
{
	dout("%s req %p tid %llu err %d\n", __func__, req, req->r_tid, err);

	req->r_result = err;
	finish_request(req);
	__complete_request(req);
	complete_all(&req->r_completion);
	ceph_osdc_put_request(req);
}

/*
 * Cancel a pending map check for @req, dropping the ref taken by
 * send_map_check().  Caller holds osdc->lock for write.
 */
static void cancel_map_check(struct ceph_osd_request *req)
{
	struct ceph_osd_client *osdc = req->r_osdc;
	struct ceph_osd_request *lookup_req;

	verify_osdc_wrlocked(osdc);

	lookup_req = lookup_request_mc(&osdc->map_checks, req->r_tid);
	if (!lookup_req)
		return;

	WARN_ON(lookup_req != req);
	erase_request_mc(&osdc->map_checks, req);
	ceph_osdc_put_request(req);
}

/*
 * Tear down @req without reporting a result through its callback.
 */
static void cancel_request(struct ceph_osd_request *req)
{
	dout("%s req %p tid %llu\n", __func__, req, req->r_tid);

	cancel_map_check(req);
	finish_request(req);
	complete_all(&req->r_completion);
	ceph_osdc_put_request(req);
}

/*
 * Complete @req with error @err, cancelling any pending map check.
 */
static void abort_request(struct ceph_osd_request *req, int err)
{
	dout("%s req %p tid %llu err %d\n", __func__, req, req->r_tid, err);

	cancel_map_check(req);
	complete_request(req, err);
}

/*
 * Raise the epoch barrier to @eb, requesting a newer map if the
 * current one is still behind the barrier.  Caller holds osdc->lock
 * for write.
 */
static void update_epoch_barrier(struct ceph_osd_client *osdc, u32 eb)
{
	if (likely(eb > osdc->epoch_barrier)) {
		dout("updating epoch_barrier from %u to %u\n",
		     osdc->epoch_barrier, eb);
		osdc->epoch_barrier = eb;
		/* Request map if we're not to the barrier yet */
		if (eb > osdc->osdmap->epoch)
			maybe_request_map(osdc);
	}
}

/*
 * Externally visible barrier update: take the rwsem for read first
 * and only upgrade to write when the barrier actually needs raising.
 */
void ceph_osdc_update_epoch_barrier(struct ceph_osd_client *osdc, u32 eb)
{
	down_read(&osdc->lock);
	if (unlikely(eb > osdc->epoch_barrier)) {
		up_read(&osdc->lock);
		down_write(&osdc->lock);
		update_epoch_barrier(osdc, eb);
		up_write(&osdc->lock);
	} else {
		up_read(&osdc->lock);
	}
}
EXPORT_SYMBOL(ceph_osdc_update_epoch_barrier);

/*
 * Drop all pending requests that are stalled waiting on a full condition to
 * clear, and complete them with ENOSPC as the return code.  Set the
 * osdc->epoch_barrier to the latest map epoch that we've seen if any were
 * cancelled.
 */
static void ceph_osdc_abort_on_full(struct ceph_osd_client *osdc)
{
	struct rb_node *n;
	bool victims = false;

	dout("enter abort_on_full\n");

	if (!ceph_osdmap_flag(osdc, CEPH_OSDMAP_FULL) && !have_pool_full(osdc))
		goto out;

	/* Scan list and see if there is anything to abort */
	for (n = rb_first(&osdc->osds); n; n = rb_next(n)) {
		struct ceph_osd *osd = rb_entry(n, struct ceph_osd, o_node);
		struct rb_node *m;

		m = rb_first(&osd->o_requests);
		while (m) {
			struct ceph_osd_request *req = rb_entry(m,
					struct ceph_osd_request, r_node);
			m = rb_next(m);

			if (req->r_abort_on_full) {
				victims = true;
				break;
			}
		}
		if (victims)
			break;
	}

	if (!victims)
		goto out;

	/*
	 * Update the barrier to current epoch if it's behind that point,
	 * since we know we have some calls to be aborted in the tree.
	 */
	update_epoch_barrier(osdc, osdc->osdmap->epoch);

	for (n = rb_first(&osdc->osds); n; n = rb_next(n)) {
		struct ceph_osd *osd = rb_entry(n, struct ceph_osd, o_node);
		struct rb_node *m;

		/* grab rb_next() before abort_request() unlinks req */
		m = rb_first(&osd->o_requests);
		while (m) {
			struct ceph_osd_request *req = rb_entry(m,
					struct ceph_osd_request, r_node);
			m = rb_next(m);

			if (req->r_abort_on_full &&
			    (ceph_osdmap_flag(osdc, CEPH_OSDMAP_FULL) ||
			     pool_full(osdc, req->r_t.target_oloc.pool)))
				abort_request(req, -ENOSPC);
		}
	}
out:
	dout("return abort_on_full barrier=%u\n", osdc->epoch_barrier);
}

/*
 * Handle the case where @req's base pool no longer exists in the
 * osdmap.  Caller holds osdc->lock for write.
 */
static void check_pool_dne(struct ceph_osd_request *req)
{
	struct ceph_osd_client *osdc = req->r_osdc;
	struct ceph_osdmap *map = osdc->osdmap;

	verify_osdc_wrlocked(osdc);
	WARN_ON(!map->epoch);

	if (req->r_attempts) {
		/*
		 * We sent a request earlier, which means that
		 * previously the pool existed, and now it does not
		 * (i.e., it was deleted).
		 */
		req->r_map_dne_bound = map->epoch;
		dout("%s req %p tid %llu pool disappeared\n", __func__, req,
		     req->r_tid);
	} else {
		dout("%s req %p tid %llu map_dne_bound %u have %u\n", __func__,
		     req, req->r_tid, req->r_map_dne_bound, map->epoch);
	}

	if (req->r_map_dne_bound) {
		if (map->epoch >= req->r_map_dne_bound) {
			/* we had a new enough map */
			pr_info_ratelimited("tid %llu pool does not exist\n",
					    req->r_tid);
			complete_request(req, -ENOENT);
		}
	} else {
		send_map_check(req);
	}
}

/*
 * Monitor "get newest osdmap version" callback for a plain request.
 * Drops the ref taken by send_map_check().
 */
static void map_check_cb(struct ceph_mon_generic_request *greq)
{
	struct ceph_osd_client *osdc = &greq->monc->client->osdc;
	struct ceph_osd_request *req;
	u64 tid = greq->private_data;

	WARN_ON(greq->result || !greq->u.newest);

	down_write(&osdc->lock);
	req = lookup_request_mc(&osdc->map_checks, tid);
	if (!req) {
		dout("%s tid %llu dne\n", __func__, tid);
		goto out_unlock;
	}

	dout("%s req %p tid %llu map_dne_bound %u newest %llu\n", __func__,
	     req, req->r_tid, req->r_map_dne_bound, greq->u.newest);
	if (!req->r_map_dne_bound)
		req->r_map_dne_bound = greq->u.newest;
	erase_request_mc(&osdc->map_checks, req);
	check_pool_dne(req);

	ceph_osdc_put_request(req);
out_unlock:
	up_write(&osdc->lock);
}

/*
 * Ask the monitors for the newest osdmap version so that we can tell
 * whether the target pool was deleted.  Takes a ref on @req, dropped
 * in map_check_cb() or cancel_map_check().
 */
static void send_map_check(struct ceph_osd_request *req)
{
	struct ceph_osd_client *osdc = req->r_osdc;
	struct ceph_osd_request *lookup_req;
	int ret;

	verify_osdc_wrlocked(osdc);

	lookup_req = lookup_request_mc(&osdc->map_checks, req->r_tid);
	if (lookup_req) {
		WARN_ON(lookup_req != req);
		return;
	}

	ceph_osdc_get_request(req);
	insert_request_mc(&osdc->map_checks, req);
	ret = ceph_monc_get_version_async(&osdc->client->monc, "osdmap",
					  map_check_cb, req->r_tid);
	WARN_ON(ret);
}

/*
 * lingering requests, watch/notify v2 infrastructure
 */
static void linger_release(struct kref *kref)
{
	struct ceph_osd_linger_request *lreq =
	    container_of(kref, struct ceph_osd_linger_request, kref);

	dout("%s lreq %p reg_req %p ping_req %p\n", __func__, lreq,
	     lreq->reg_req, lreq->ping_req);
	WARN_ON(!RB_EMPTY_NODE(&lreq->node));
	WARN_ON(!RB_EMPTY_NODE(&lreq->osdc_node));
	WARN_ON(!RB_EMPTY_NODE(&lreq->mc_node));
	WARN_ON(!list_empty(&lreq->scan_item));
	WARN_ON(!list_empty(&lreq->pending_lworks));
	WARN_ON(lreq->osd);

	if (lreq->reg_req)
		ceph_osdc_put_request(lreq->reg_req);
	if (lreq->ping_req)
		ceph_osdc_put_request(lreq->ping_req);
	target_destroy(&lreq->t);
	kfree(lreq);
}

static void linger_put(struct ceph_osd_linger_request *lreq)
{
	if (lreq)
		kref_put(&lreq->kref, linger_release);
}

static struct ceph_osd_linger_request *
linger_get(struct ceph_osd_linger_request *lreq)
{
	kref_get(&lreq->kref);
	return lreq;
}

/*
 * Allocate a linger request with a single reference held by the
 * caller.  GFP_NOIO because this can be called on the writeback path.
 */
static struct ceph_osd_linger_request *
linger_alloc(struct ceph_osd_client *osdc)
{
	struct ceph_osd_linger_request *lreq;

	lreq = kzalloc(sizeof(*lreq), GFP_NOIO);
	if (!lreq)
		return NULL;

	kref_init(&lreq->kref);
	mutex_init(&lreq->lock);
	RB_CLEAR_NODE(&lreq->node);
	RB_CLEAR_NODE(&lreq->osdc_node);
	RB_CLEAR_NODE(&lreq->mc_node);
	INIT_LIST_HEAD(&lreq->scan_item);
	INIT_LIST_HEAD(&lreq->pending_lworks);
	init_completion(&lreq->reg_commit_wait);
	init_completion(&lreq->notify_finish_wait);

	lreq->osdc = osdc;
	target_init(&lreq->t);

	dout("%s lreq %p\n", __func__, lreq);
	return lreq;
}

DEFINE_RB_INSDEL_FUNCS(linger, struct ceph_osd_linger_request, linger_id, node)
DEFINE_RB_FUNCS(linger_osdc, struct ceph_osd_linger_request, linger_id, osdc_node)
DEFINE_RB_FUNCS(linger_mc, struct ceph_osd_linger_request, linger_id, mc_node)

/*
 * Create linger request <-> OSD session relation.
 *
 * @lreq has to be registered, @osd may be homeless.
 */
static void link_linger(struct ceph_osd *osd,
			struct ceph_osd_linger_request *lreq)
{
	verify_osd_locked(osd);
	WARN_ON(!lreq->linger_id || lreq->osd);
	dout("%s osd %p osd%d lreq %p linger_id %llu\n", __func__, osd,
	     osd->o_osd, lreq, lreq->linger_id);

	if (!osd_homeless(osd))
		__remove_osd_from_lru(osd);
	else
		atomic_inc(&osd->o_osdc->num_homeless);

	get_osd(osd);
	insert_linger(&osd->o_linger_requests, lreq);
	lreq->osd = osd;
}

/*
 * Sever the linger request <-> OSD session relation created by
 * link_linger().
 */
static void unlink_linger(struct ceph_osd *osd,
			  struct ceph_osd_linger_request *lreq)
{
	verify_osd_locked(osd);
	WARN_ON(lreq->osd != osd);
	dout("%s osd %p osd%d lreq %p linger_id %llu\n", __func__, osd,
	     osd->o_osd, lreq, lreq->linger_id);

	lreq->osd = NULL;
	erase_linger(&osd->o_linger_requests, lreq);
	put_osd(osd);

	if (!osd_homeless(osd))
		maybe_move_osd_to_lru(osd);
	else
		atomic_dec(&osd->o_osdc->num_homeless);
}

static bool __linger_registered(struct ceph_osd_linger_request *lreq)
{
	verify_osdc_locked(lreq->osdc);

	return !RB_EMPTY_NODE(&lreq->osdc_node);
}

static bool linger_registered(struct ceph_osd_linger_request *lreq)
{
	struct ceph_osd_client *osdc = lreq->osdc;
	bool registered;

	down_read(&osdc->lock);
	registered = __linger_registered(lreq);
	up_read(&osdc->lock);

	return registered;
}

/*
 * Assign a linger_id and insert @lreq into the osdc tree.  Holds a
 * ref until linger_unregister().
 */
static void linger_register(struct ceph_osd_linger_request *lreq)
{
	struct ceph_osd_client *osdc = lreq->osdc;

	verify_osdc_wrlocked(osdc);
	WARN_ON(lreq->linger_id);

	linger_get(lreq);
	lreq->linger_id = ++osdc->last_linger_id;
	insert_linger_osdc(&osdc->linger_requests, lreq);
}

static void linger_unregister(struct ceph_osd_linger_request *lreq)
{
	struct ceph_osd_client *osdc = lreq->osdc;

	verify_osdc_wrlocked(osdc);

	erase_linger_osdc(&osdc->linger_requests, lreq);
	linger_put(lreq);
}

/*
 * Cancel a (reg or ping) request sent on behalf of a linger request,
 * dropping the linger ref that rode along in req->r_priv.
 */
static void cancel_linger_request(struct ceph_osd_request *req)
{
	struct ceph_osd_linger_request *lreq = req->r_priv;

	WARN_ON(!req->r_linger);
	cancel_request(req);
	linger_put(lreq);
}

/*
 * Deferred delivery of a watch notify or watch error callback.
 */
struct linger_work {
	struct work_struct work;
	struct ceph_osd_linger_request *lreq;
	struct list_head pending_item;
	unsigned long queued_stamp;

	union {
		struct {
			u64 notify_id;
			u64 notifier_id;
			void *payload; /* points into @msg front */
			size_t payload_len;

			struct ceph_msg *msg; /* for ceph_msg_put() */
		} notify;
		struct {
			int err;
		} error;
	};
};

static struct linger_work *lwork_alloc(struct ceph_osd_linger_request *lreq,
				       work_func_t workfn)
{
	struct linger_work *lwork;

	lwork = kzalloc(sizeof(*lwork), GFP_NOIO);
	if (!lwork)
		return NULL;

	INIT_WORK(&lwork->work, workfn);
	INIT_LIST_HEAD(&lwork->pending_item);
	lwork->lreq = linger_get(lreq);

	return lwork;
}

static void lwork_free(struct linger_work *lwork)
{
	struct ceph_osd_linger_request *lreq = lwork->lreq;

	mutex_lock(&lreq->lock);
	list_del(&lwork->pending_item);
	mutex_unlock(&lreq->lock);

	linger_put(lreq);
	kfree(lwork);
}

/*
 * Queue @lwork on the notify workqueue.  Caller must hold lreq->lock
 * (checked by verify_lreq_locked()).
 */
static void lwork_queue(struct linger_work *lwork)
{
	struct ceph_osd_linger_request *lreq = lwork->lreq;
	struct ceph_osd_client *osdc = lreq->osdc;

	verify_lreq_locked(lreq);
	WARN_ON(!list_empty(&lwork->pending_item));

	lwork->queued_stamp = jiffies;
	list_add_tail(&lwork->pending_item, &lreq->pending_lworks);
	queue_work(osdc->notify_wq, &lwork->work);
}

/*
 * Workqueue handler: deliver a queued notify to the user's watch
 * callback (lreq->wcb).
 */
static void do_watch_notify(struct work_struct *w)
{
	struct linger_work *lwork = container_of(w, struct linger_work, work);
	struct ceph_osd_linger_request *lreq = lwork->lreq;

	if (!linger_registered(lreq)) {
		dout("%s lreq %p not registered\n", __func__, lreq);
		goto out;
	}

	WARN_ON(!lreq->is_watch);
	dout("%s lreq %p notify_id %llu notifier_id %llu payload_len %zu\n",
	     __func__, lreq, lwork->notify.notify_id, lwork->notify.notifier_id,
	     lwork->notify.payload_len);
	lreq->wcb(lreq->data, lwork->notify.notify_id, lreq->linger_id,
		  lwork->notify.notifier_id, lwork->notify.payload,
		  lwork->notify.payload_len);

out:
	/* payload pointed into this message's front, drop it last */
	ceph_msg_put(lwork->notify.msg);
	lwork_free(lwork);
}

/*
 * Workqueue handler: deliver a queued watch error to the user's error
 * callback (lreq->errcb).
 */
static void do_watch_error(struct work_struct *w)
{
	struct linger_work *lwork = container_of(w, struct linger_work, work);
	struct ceph_osd_linger_request *lreq = lwork->lreq;

	if (!linger_registered(lreq)) {
		dout("%s lreq %p not registered\n", __func__, lreq);
		goto out;
	}

	dout("%s lreq %p err %d\n", __func__, lreq, lwork->error.err);
	lreq->errcb(lreq->data, lreq->linger_id, lwork->error.err);

out:
	lwork_free(lwork);
}

/*
 * Queue delivery of lreq->last_error.  Caller holds lreq->lock.
 */
static void queue_watch_error(struct ceph_osd_linger_request *lreq)
{
	struct linger_work *lwork;

	lwork = lwork_alloc(lreq, do_watch_error);
	if (!lwork) {
		pr_err("failed to allocate error-lwork\n");
		return;
	}

	lwork->error.err = lreq->last_error;
	lwork_queue(lwork);
}

/*
 * Record the registration result and wake up waiters, once.  Positive
 * results are treated as success (error 0).
 */
static void linger_reg_commit_complete(struct ceph_osd_linger_request *lreq,
				       int result)
{
	if (!completion_done(&lreq->reg_commit_wait)) {
		lreq->reg_commit_error = (result <= 0 ? result : 0);
		complete_all(&lreq->reg_commit_wait);
	}
}

/*
 * Completion callback for the initial watch/notify registration.
 */
static void linger_commit_cb(struct ceph_osd_request *req)
{
	struct ceph_osd_linger_request *lreq = req->r_priv;

	mutex_lock(&lreq->lock);
	dout("%s lreq %p linger_id %llu result %d\n", __func__, lreq,
	     lreq->linger_id, req->r_result);
	linger_reg_commit_complete(lreq, req->r_result);
	lreq->committed = true;

	if (!lreq->is_watch) {
		struct ceph_osd_data *osd_data =
		    osd_req_op_data(req, 0, notify, response_data);
		void *p = page_address(osd_data->pages[0]);

		WARN_ON(req->r_ops[0].op != CEPH_OSD_OP_NOTIFY ||
			osd_data->type != CEPH_OSD_DATA_TYPE_PAGES);

		/* make note of the notify_id */
		if (req->r_ops[0].outdata_len >= sizeof(u64)) {
			lreq->notify_id = ceph_decode_64(&p);
			dout("lreq %p notify_id %llu\n", lreq,
			     lreq->notify_id);
		} else {
			dout("lreq %p no notify_id\n", lreq);
		}
	}

	mutex_unlock(&lreq->lock);
	linger_put(lreq);
}

static int normalize_watch_error(int err)
{
	/*
	 * Translate ENOENT -> ENOTCONN so that a delete->disconnection
	 * notification and a failure to reconnect because we raced with
	 * the delete appear the same to the user.
	 */
	if (err == -ENOENT)
		err = -ENOTCONN;

	return err;
}

/*
 * Completion callback for a watch reconnect attempt; only the first
 * error is reported to the user.
 */
static void linger_reconnect_cb(struct ceph_osd_request *req)
{
	struct ceph_osd_linger_request *lreq = req->r_priv;

	mutex_lock(&lreq->lock);
	dout("%s lreq %p linger_id %llu result %d last_error %d\n", __func__,
	     lreq, lreq->linger_id, req->r_result, lreq->last_error);
	if (req->r_result < 0) {
		if (!lreq->last_error) {
			lreq->last_error = normalize_watch_error(req->r_result);
			queue_watch_error(lreq);
		}
	}

	mutex_unlock(&lreq->lock);
	linger_put(lreq);
}

/*
 * (Re)send the registration request for @lreq: a fresh WATCH/NOTIFY
 * registration, or a WATCH reconnect if it already committed once.
 * Caller holds osdc->lock for write.
 */
static void send_linger(struct ceph_osd_linger_request *lreq)
{
	struct ceph_osd_request *req = lreq->reg_req;
	struct ceph_osd_req_op *op = &req->r_ops[0];

	verify_osdc_wrlocked(req->r_osdc);
	dout("%s lreq %p linger_id %llu\n", __func__, lreq, lreq->linger_id);

	if (req->r_osd)
		cancel_linger_request(req);

	request_reinit(req);
	ceph_oid_copy(&req->r_base_oid, &lreq->t.base_oid);
	ceph_oloc_copy(&req->r_base_oloc, &lreq->t.base_oloc);
	req->r_flags = lreq->t.flags;
	req->r_mtime = lreq->mtime;

	mutex_lock(&lreq->lock);
	if (lreq->is_watch && lreq->committed) {
		WARN_ON(op->op != CEPH_OSD_OP_WATCH ||
			op->watch.cookie != lreq->linger_id);
		op->watch.op = CEPH_OSD_WATCH_OP_RECONNECT;
		op->watch.gen = ++lreq->register_gen;
		dout("lreq %p reconnect register_gen %u\n", lreq,
		     op->watch.gen);
		req->r_callback = linger_reconnect_cb;
	} else {
		if (!lreq->is_watch)
			lreq->notify_id = 0;
		else
			WARN_ON(op->watch.op != CEPH_OSD_WATCH_OP_WATCH);
		dout("lreq %p register\n", lreq);
		req->r_callback = linger_commit_cb;
	}
	mutex_unlock(&lreq->lock);

	req->r_priv = linger_get(lreq);
	req->r_linger = true;

	submit_request(req, true);
}

/*
 * Completion callback for a watch ping; stale pongs from an older
 * register_gen are ignored.
 */
static void linger_ping_cb(struct
ceph_osd_request *req)
{
	struct ceph_osd_linger_request *lreq = req->r_priv;

	mutex_lock(&lreq->lock);
	dout("%s lreq %p linger_id %llu result %d ping_sent %lu last_error %d\n",
	     __func__, lreq, lreq->linger_id, req->r_result, lreq->ping_sent,
	     lreq->last_error);
	if (lreq->register_gen == req->r_ops[0].watch.gen) {
		if (!req->r_result) {
			lreq->watch_valid_thru = lreq->ping_sent;
		} else if (!lreq->last_error) {
			lreq->last_error = normalize_watch_error(req->r_result);
			queue_watch_error(lreq);
		}
	} else {
		dout("lreq %p register_gen %u ignoring old pong %u\n", lreq,
		     lreq->register_gen, req->r_ops[0].watch.gen);
	}

	mutex_unlock(&lreq->lock);
	linger_put(lreq);
}

/*
 * Send a WATCH_OP_PING to verify the watch connection; skipped while
 * reads are paused.  Open-codes submit_request() because the ping has
 * to go to the session the linger request is linked to.
 */
static void send_linger_ping(struct ceph_osd_linger_request *lreq)
{
	struct ceph_osd_client *osdc = lreq->osdc;
	struct ceph_osd_request *req = lreq->ping_req;
	struct ceph_osd_req_op *op = &req->r_ops[0];

	if (ceph_osdmap_flag(osdc, CEPH_OSDMAP_PAUSERD)) {
		dout("%s PAUSERD\n", __func__);
		return;
	}

	lreq->ping_sent = jiffies;
	dout("%s lreq %p linger_id %llu ping_sent %lu register_gen %u\n",
	     __func__, lreq, lreq->linger_id, lreq->ping_sent,
	     lreq->register_gen);

	if (req->r_osd)
		cancel_linger_request(req);

	request_reinit(req);
	target_copy(&req->r_t, &lreq->t);

	WARN_ON(op->op != CEPH_OSD_OP_WATCH ||
		op->watch.cookie != lreq->linger_id ||
		op->watch.op != CEPH_OSD_WATCH_OP_PING);
	op->watch.gen = lreq->register_gen;
	req->r_callback = linger_ping_cb;
	req->r_priv = linger_get(lreq);
	req->r_linger = true;

	ceph_osdc_get_request(req);
	account_request(req);
	req->r_tid = atomic64_inc_return(&osdc->last_tid);
	link_request(lreq->osd, req);
	send_request(req);
}

/*
 * Map @lreq to an OSD session and send the registration request.
 */
static void linger_submit(struct ceph_osd_linger_request *lreq)
{
	struct ceph_osd_client *osdc = lreq->osdc;
	struct ceph_osd *osd;

	calc_target(osdc, &lreq->t, NULL, false);
	osd = lookup_create_osd(osdc, lreq->t.osd, true);
	link_linger(osd, lreq);

	send_linger(lreq);
}

/*
 * Cancel a pending map check for @lreq, dropping the ref taken by
 * send_linger_map_check().  Caller holds osdc->lock for write.
 */
static void cancel_linger_map_check(struct ceph_osd_linger_request *lreq)
{
	struct ceph_osd_client *osdc = lreq->osdc;
	struct ceph_osd_linger_request *lookup_lreq;

	verify_osdc_wrlocked(osdc);

	lookup_lreq = lookup_linger_mc(&osdc->linger_map_checks,
				       lreq->linger_id);
	if (!lookup_lreq)
		return;

	WARN_ON(lookup_lreq != lreq);
	erase_linger_mc(&osdc->linger_map_checks, lreq);
	linger_put(lreq);
}

/*
 * @lreq has to be both registered and linked.
 */
static void __linger_cancel(struct ceph_osd_linger_request *lreq)
{
	if (lreq->is_watch && lreq->ping_req->r_osd)
		cancel_linger_request(lreq->ping_req);
	if (lreq->reg_req->r_osd)
		cancel_linger_request(lreq->reg_req);
	cancel_linger_map_check(lreq);
	unlink_linger(lreq->osd, lreq);
	linger_unregister(lreq);
}

static void linger_cancel(struct ceph_osd_linger_request *lreq)
{
	struct ceph_osd_client *osdc = lreq->osdc;

	down_write(&osdc->lock);
	if (__linger_registered(lreq))
		__linger_cancel(lreq);
	up_write(&osdc->lock);
}

static void send_linger_map_check(struct ceph_osd_linger_request *lreq);

/*
 * Linger counterpart of check_pool_dne().  Caller holds osdc->lock
 * for write.
 */
static void check_linger_pool_dne(struct ceph_osd_linger_request *lreq)
{
	struct ceph_osd_client *osdc = lreq->osdc;
	struct ceph_osdmap *map = osdc->osdmap;

	verify_osdc_wrlocked(osdc);
	WARN_ON(!map->epoch);

	if (lreq->register_gen) {
		lreq->map_dne_bound = map->epoch;
		dout("%s lreq %p linger_id %llu pool disappeared\n", __func__,
		     lreq, lreq->linger_id);
	} else {
		dout("%s lreq %p linger_id %llu map_dne_bound %u have %u\n",
		     __func__, lreq, lreq->linger_id, lreq->map_dne_bound,
		     map->epoch);
	}

	if (lreq->map_dne_bound) {
		if (map->epoch >= lreq->map_dne_bound) {
			/* we had a new enough map */
			pr_info("linger_id %llu pool does not exist\n",
				lreq->linger_id);
			linger_reg_commit_complete(lreq, -ENOENT);
			__linger_cancel(lreq);
		}
	} else {
		send_linger_map_check(lreq);
	}
}

/*
 * Monitor "get newest osdmap version" callback for a linger request.
 * Drops the ref taken by send_linger_map_check().
 */
static void linger_map_check_cb(struct ceph_mon_generic_request *greq)
{
	struct ceph_osd_client *osdc = &greq->monc->client->osdc;
	struct ceph_osd_linger_request *lreq;
	u64 linger_id = greq->private_data;

	WARN_ON(greq->result || !greq->u.newest);

	down_write(&osdc->lock);
	lreq = lookup_linger_mc(&osdc->linger_map_checks, linger_id);
	if (!lreq) {
		dout("%s linger_id %llu dne\n", __func__, linger_id);
		goto out_unlock;
	}

	dout("%s lreq %p linger_id %llu map_dne_bound %u newest %llu\n",
	     __func__, lreq, lreq->linger_id, lreq->map_dne_bound,
	     greq->u.newest);
	if (!lreq->map_dne_bound)
		lreq->map_dne_bound = greq->u.newest;
	erase_linger_mc(&osdc->linger_map_checks, lreq);
	check_linger_pool_dne(lreq);

	linger_put(lreq);
out_unlock:
	up_write(&osdc->lock);
}

/*
 * Linger counterpart of send_map_check().  Takes a ref on @lreq,
 * dropped in linger_map_check_cb() or cancel_linger_map_check().
 */
static void send_linger_map_check(struct ceph_osd_linger_request *lreq)
{
	struct ceph_osd_client *osdc = lreq->osdc;
	struct ceph_osd_linger_request *lookup_lreq;
	int ret;

	verify_osdc_wrlocked(osdc);

	lookup_lreq = lookup_linger_mc(&osdc->linger_map_checks,
				       lreq->linger_id);
	if (lookup_lreq) {
		WARN_ON(lookup_lreq != lreq);
		return;
	}

	linger_get(lreq);
	insert_linger_mc(&osdc->linger_map_checks, lreq);
	ret = ceph_monc_get_version_async(&osdc->client->monc, "osdmap",
					  linger_map_check_cb, lreq->linger_id);
	WARN_ON(ret);
}

/*
 * Wait (interruptibly) for the registration to commit.
 */
static int linger_reg_commit_wait(struct
ceph_osd_linger_request *lreq)
{
	int ret;

	dout("%s lreq %p linger_id %llu\n", __func__, lreq, lreq->linger_id);
	ret = wait_for_completion_interruptible(&lreq->reg_commit_wait);
	return ret ?: lreq->reg_commit_error;
}

/*
 * Wait (interruptibly) for the notify to finish.
 */
static int linger_notify_finish_wait(struct ceph_osd_linger_request *lreq)
{
	int ret;

	dout("%s lreq %p linger_id %llu\n", __func__, lreq, lreq->linger_id);
	ret = wait_for_completion_interruptible(&lreq->notify_finish_wait);
	return ret ?: lreq->notify_finish_error;
}

/*
 * Timeout callback, called every N seconds.  When 1 or more OSD
 * requests has been active for more than N seconds, we send a keepalive
 * (tag + timestamp) to its OSD to ensure any communications channel
 * reset is detected.
 */
static void handle_timeout(struct work_struct *work)
{
	struct ceph_osd_client *osdc =
		container_of(work, struct ceph_osd_client, timeout_work.work);
	struct ceph_options *opts = osdc->client->options;
	unsigned long cutoff = jiffies - opts->osd_keepalive_timeout;
	unsigned long expiry_cutoff = jiffies - opts->osd_request_timeout;
	LIST_HEAD(slow_osds);
	struct rb_node *n, *p;

	dout("%s osdc %p\n", __func__, osdc);
	down_write(&osdc->lock);

	/*
	 * ping osds that are a bit slow.  this ensures that if there
	 * is a break in the TCP connection we will notice, and reopen
	 * a connection with that osd (from the fault callback).
	 */
	for (n = rb_first(&osdc->osds); n; n = rb_next(n)) {
		struct ceph_osd *osd = rb_entry(n, struct ceph_osd, o_node);
		bool found = false;

		for (p = rb_first(&osd->o_requests); p; ) {
			struct ceph_osd_request *req =
			    rb_entry(p, struct ceph_osd_request, r_node);

			p = rb_next(p); /* abort_request() */

			if (time_before(req->r_stamp, cutoff)) {
				dout(" req %p tid %llu on osd%d is laggy\n",
				     req, req->r_tid, osd->o_osd);
				found = true;
			}
			if (opts->osd_request_timeout &&
			    time_before(req->r_start_stamp, expiry_cutoff)) {
				pr_err_ratelimited("tid %llu on osd%d timeout\n",
				       req->r_tid, osd->o_osd);
				abort_request(req, -ETIMEDOUT);
			}
		}
		for (p = rb_first(&osd->o_linger_requests); p; p = rb_next(p)) {
			struct ceph_osd_linger_request *lreq =
			    rb_entry(p, struct ceph_osd_linger_request, node);

			dout(" lreq %p linger_id %llu is served by osd%d\n",
			     lreq, lreq->linger_id, osd->o_osd);
			found = true;

			mutex_lock(&lreq->lock);
			if (lreq->is_watch && lreq->committed && !lreq->last_error)
				send_linger_ping(lreq);
			mutex_unlock(&lreq->lock);
		}

		if (found)
			list_move_tail(&osd->o_keepalive_item, &slow_osds);
	}

	/* homeless requests can time out too, they just can't be pinged */
	if (opts->osd_request_timeout) {
		for (p = rb_first(&osdc->homeless_osd.o_requests); p; ) {
			struct ceph_osd_request *req =
			    rb_entry(p, struct ceph_osd_request, r_node);

			p = rb_next(p); /* abort_request() */

			if (time_before(req->r_start_stamp, expiry_cutoff)) {
				pr_err_ratelimited("tid %llu on osd%d timeout\n",
				       req->r_tid, osdc->homeless_osd.o_osd);
				abort_request(req, -ETIMEDOUT);
			}
		}
	}

	if (atomic_read(&osdc->num_homeless) || !list_empty(&slow_osds))
		maybe_request_map(osdc);

	while (!list_empty(&slow_osds)) {
		struct ceph_osd *osd = list_first_entry(&slow_osds,
							struct ceph_osd,
							o_keepalive_item);
		list_del_init(&osd->o_keepalive_item);
		ceph_con_keepalive(&osd->o_con);
	}

	up_write(&osdc->lock);
	schedule_delayed_work(&osdc->timeout_work,
			      osdc->client->options->osd_keepalive_timeout);
}

/*
 * Periodic work: close idle OSD sessions whose LRU TTL has expired.
 */
static void handle_osds_timeout(struct work_struct *work)
{
	struct ceph_osd_client *osdc =
		container_of(work, struct ceph_osd_client,
			     osds_timeout_work.work);
	unsigned long delay = osdc->client->options->osd_idle_ttl / 4;
	struct ceph_osd *osd, *nosd;

	dout("%s osdc %p\n", __func__, osdc);
	down_write(&osdc->lock);
	list_for_each_entry_safe(osd, nosd, &osdc->osd_lru, o_osd_lru) {
		if (time_before(jiffies, osd->lru_ttl))
			break;

		WARN_ON(!RB_EMPTY_ROOT(&osd->o_requests));
		WARN_ON(!RB_EMPTY_ROOT(&osd->o_linger_requests));
		close_osd(osd);
	}

	up_write(&osdc->lock);
	schedule_delayed_work(&osdc->osds_timeout_work,
			      round_jiffies_relative(delay));
}

/*
 * Decode a ceph_object_locator, rejecting redirects that would change
 * key, namespace or hash.  Returns 0 or -EINVAL.
 */
static int ceph_oloc_decode(void **p, void *end,
			    struct ceph_object_locator *oloc)
{
	u8 struct_v, struct_cv;
	u32 len;
	void *struct_end;
	int ret = 0;

	ceph_decode_need(p, end, 1 + 1 + 4, e_inval);
	struct_v = ceph_decode_8(p);
	struct_cv = ceph_decode_8(p);
	if (struct_v < 3) {
		pr_warn("got v %d < 3 cv %d of ceph_object_locator\n",
			struct_v, struct_cv);
		goto e_inval;
	}
	if (struct_cv > 6) {
		pr_warn("got v %d cv %d > 6 of ceph_object_locator\n",
			struct_v, struct_cv);
		goto e_inval;
	}
	len = ceph_decode_32(p);
	ceph_decode_need(p, end, len, e_inval);
	struct_end = *p + len;

	oloc->pool = ceph_decode_64(p);
	*p += 4; /* skip preferred */

	len = ceph_decode_32(p);
	if (len > 0) {
		pr_warn("ceph_object_locator::key is set\n");
		goto e_inval;
	}

	if (struct_v >= 5) {
		bool changed = false;

		len = ceph_decode_32(p);
		if (len > 0) {
			ceph_decode_need(p, end, len, e_inval);
			if (!oloc->pool_ns ||
			    ceph_compare_string(oloc->pool_ns, *p, len))
				changed = true;
			*p += len;
		} else {
			if (oloc->pool_ns)
				changed = true;
		}
		if (changed) {
			/* redirect changes namespace */
			pr_warn("ceph_object_locator::nspace is changed\n");
			goto e_inval;
		}
	}

	if (struct_v >= 6) {
		s64 hash = ceph_decode_64(p);
		if (hash != -1) {
			pr_warn("ceph_object_locator::hash is set\n");
			goto e_inval;
		}
	}

	/* skip the rest */
	*p = struct_end;
out:
	return ret;

e_inval:
	ret = -EINVAL;
	goto out;
}

/*
 * Decode a ceph_request_redirect (only its oloc is honoured).
 * Returns 0 or -EINVAL.
 */
static int ceph_redirect_decode(void **p, void *end,
				struct ceph_request_redirect *redir)
{
	u8 struct_v, struct_cv;
	u32 len;
	void *struct_end;
	int ret;

	ceph_decode_need(p, end, 1 + 1 + 4, e_inval);
	struct_v = ceph_decode_8(p);
	struct_cv = ceph_decode_8(p);
	if (struct_cv > 1) {
		pr_warn("got v %d cv %d > 1 of ceph_request_redirect\n",
			struct_v, struct_cv);
		goto e_inval;
	}
	len = ceph_decode_32(p);
	ceph_decode_need(p, end, len, e_inval);
	struct_end = *p + len;

	ret = ceph_oloc_decode(p, end, &redir->oloc);
	if (ret)
		goto out;

	len = ceph_decode_32(p);
	if (len > 0) {
		pr_warn("ceph_request_redirect::object_name is set\n");
		goto e_inval;
	}

	len = ceph_decode_32(p);
	*p += len; /* skip osd_instructions */

	/* skip the rest */
	*p = struct_end;
out:
	return ret;

e_inval:
	ret = -EINVAL;
	goto out;
}

/* decoded fields of an MOSDOpReply message */
struct MOSDOpReply {
	struct ceph_pg pgid;
	u64 flags;
	int result;
	u32 epoch;
	int num_ops;
	u32 outdata_len[CEPH_OSD_MAX_OPS];
	s32 rval[CEPH_OSD_MAX_OPS];
	int retry_attempt;
	struct ceph_eversion replay_version;
	u64 user_version;
	struct
ceph_request_redirect redirect; 3344 }; 3345 3346 static int decode_MOSDOpReply(const struct ceph_msg *msg, struct MOSDOpReply *m) 3347 { 3348 void *p = msg->front.iov_base; 3349 void *const end = p + msg->front.iov_len; 3350 u16 version = le16_to_cpu(msg->hdr.version); 3351 struct ceph_eversion bad_replay_version; 3352 u8 decode_redir; 3353 u32 len; 3354 int ret; 3355 int i; 3356 3357 ceph_decode_32_safe(&p, end, len, e_inval); 3358 ceph_decode_need(&p, end, len, e_inval); 3359 p += len; /* skip oid */ 3360 3361 ret = ceph_decode_pgid(&p, end, &m->pgid); 3362 if (ret) 3363 return ret; 3364 3365 ceph_decode_64_safe(&p, end, m->flags, e_inval); 3366 ceph_decode_32_safe(&p, end, m->result, e_inval); 3367 ceph_decode_need(&p, end, sizeof(bad_replay_version), e_inval); 3368 memcpy(&bad_replay_version, p, sizeof(bad_replay_version)); 3369 p += sizeof(bad_replay_version); 3370 ceph_decode_32_safe(&p, end, m->epoch, e_inval); 3371 3372 ceph_decode_32_safe(&p, end, m->num_ops, e_inval); 3373 if (m->num_ops > ARRAY_SIZE(m->outdata_len)) 3374 goto e_inval; 3375 3376 ceph_decode_need(&p, end, m->num_ops * sizeof(struct ceph_osd_op), 3377 e_inval); 3378 for (i = 0; i < m->num_ops; i++) { 3379 struct ceph_osd_op *op = p; 3380 3381 m->outdata_len[i] = le32_to_cpu(op->payload_len); 3382 p += sizeof(*op); 3383 } 3384 3385 ceph_decode_32_safe(&p, end, m->retry_attempt, e_inval); 3386 for (i = 0; i < m->num_ops; i++) 3387 ceph_decode_32_safe(&p, end, m->rval[i], e_inval); 3388 3389 if (version >= 5) { 3390 ceph_decode_need(&p, end, sizeof(m->replay_version), e_inval); 3391 memcpy(&m->replay_version, p, sizeof(m->replay_version)); 3392 p += sizeof(m->replay_version); 3393 ceph_decode_64_safe(&p, end, m->user_version, e_inval); 3394 } else { 3395 m->replay_version = bad_replay_version; /* struct */ 3396 m->user_version = le64_to_cpu(m->replay_version.version); 3397 } 3398 3399 if (version >= 6) { 3400 if (version >= 7) 3401 ceph_decode_8_safe(&p, end, decode_redir, e_inval); 3402 else 
3403 decode_redir = 1; 3404 } else { 3405 decode_redir = 0; 3406 } 3407 3408 if (decode_redir) { 3409 ret = ceph_redirect_decode(&p, end, &m->redirect); 3410 if (ret) 3411 return ret; 3412 } else { 3413 ceph_oloc_init(&m->redirect.oloc); 3414 } 3415 3416 return 0; 3417 3418 e_inval: 3419 return -EINVAL; 3420 } 3421 3422 /* 3423 * Handle MOSDOpReply. Set ->r_result and call the callback if it is 3424 * specified. 3425 */ 3426 static void handle_reply(struct ceph_osd *osd, struct ceph_msg *msg) 3427 { 3428 struct ceph_osd_client *osdc = osd->o_osdc; 3429 struct ceph_osd_request *req; 3430 struct MOSDOpReply m; 3431 u64 tid = le64_to_cpu(msg->hdr.tid); 3432 u32 data_len = 0; 3433 int ret; 3434 int i; 3435 3436 dout("%s msg %p tid %llu\n", __func__, msg, tid); 3437 3438 down_read(&osdc->lock); 3439 if (!osd_registered(osd)) { 3440 dout("%s osd%d unknown\n", __func__, osd->o_osd); 3441 goto out_unlock_osdc; 3442 } 3443 WARN_ON(osd->o_osd != le64_to_cpu(msg->hdr.src.num)); 3444 3445 mutex_lock(&osd->lock); 3446 req = lookup_request(&osd->o_requests, tid); 3447 if (!req) { 3448 dout("%s osd%d tid %llu unknown\n", __func__, osd->o_osd, tid); 3449 goto out_unlock_session; 3450 } 3451 3452 m.redirect.oloc.pool_ns = req->r_t.target_oloc.pool_ns; 3453 ret = decode_MOSDOpReply(msg, &m); 3454 m.redirect.oloc.pool_ns = NULL; 3455 if (ret) { 3456 pr_err("failed to decode MOSDOpReply for tid %llu: %d\n", 3457 req->r_tid, ret); 3458 ceph_msg_dump(msg); 3459 goto fail_request; 3460 } 3461 dout("%s req %p tid %llu flags 0x%llx pgid %llu.%x epoch %u attempt %d v %u'%llu uv %llu\n", 3462 __func__, req, req->r_tid, m.flags, m.pgid.pool, m.pgid.seed, 3463 m.epoch, m.retry_attempt, le32_to_cpu(m.replay_version.epoch), 3464 le64_to_cpu(m.replay_version.version), m.user_version); 3465 3466 if (m.retry_attempt >= 0) { 3467 if (m.retry_attempt != req->r_attempts - 1) { 3468 dout("req %p tid %llu retry_attempt %d != %d, ignoring\n", 3469 req, req->r_tid, m.retry_attempt, 3470 req->r_attempts - 
1); 3471 goto out_unlock_session; 3472 } 3473 } else { 3474 WARN_ON(1); /* MOSDOpReply v4 is assumed */ 3475 } 3476 3477 if (!ceph_oloc_empty(&m.redirect.oloc)) { 3478 dout("req %p tid %llu redirect pool %lld\n", req, req->r_tid, 3479 m.redirect.oloc.pool); 3480 unlink_request(osd, req); 3481 mutex_unlock(&osd->lock); 3482 3483 /* 3484 * Not ceph_oloc_copy() - changing pool_ns is not 3485 * supported. 3486 */ 3487 req->r_t.target_oloc.pool = m.redirect.oloc.pool; 3488 req->r_flags |= CEPH_OSD_FLAG_REDIRECTED; 3489 req->r_tid = 0; 3490 __submit_request(req, false); 3491 goto out_unlock_osdc; 3492 } 3493 3494 if (m.num_ops != req->r_num_ops) { 3495 pr_err("num_ops %d != %d for tid %llu\n", m.num_ops, 3496 req->r_num_ops, req->r_tid); 3497 goto fail_request; 3498 } 3499 for (i = 0; i < req->r_num_ops; i++) { 3500 dout(" req %p tid %llu op %d rval %d len %u\n", req, 3501 req->r_tid, i, m.rval[i], m.outdata_len[i]); 3502 req->r_ops[i].rval = m.rval[i]; 3503 req->r_ops[i].outdata_len = m.outdata_len[i]; 3504 data_len += m.outdata_len[i]; 3505 } 3506 if (data_len != le32_to_cpu(msg->hdr.data_len)) { 3507 pr_err("sum of lens %u != %u for tid %llu\n", data_len, 3508 le32_to_cpu(msg->hdr.data_len), req->r_tid); 3509 goto fail_request; 3510 } 3511 dout("%s req %p tid %llu result %d data_len %u\n", __func__, 3512 req, req->r_tid, m.result, data_len); 3513 3514 /* 3515 * Since we only ever request ONDISK, we should only ever get 3516 * one (type of) reply back. 
3517 */ 3518 WARN_ON(!(m.flags & CEPH_OSD_FLAG_ONDISK)); 3519 req->r_result = m.result ?: data_len; 3520 finish_request(req); 3521 mutex_unlock(&osd->lock); 3522 up_read(&osdc->lock); 3523 3524 __complete_request(req); 3525 complete_all(&req->r_completion); 3526 ceph_osdc_put_request(req); 3527 return; 3528 3529 fail_request: 3530 complete_request(req, -EIO); 3531 out_unlock_session: 3532 mutex_unlock(&osd->lock); 3533 out_unlock_osdc: 3534 up_read(&osdc->lock); 3535 } 3536 3537 static void set_pool_was_full(struct ceph_osd_client *osdc) 3538 { 3539 struct rb_node *n; 3540 3541 for (n = rb_first(&osdc->osdmap->pg_pools); n; n = rb_next(n)) { 3542 struct ceph_pg_pool_info *pi = 3543 rb_entry(n, struct ceph_pg_pool_info, node); 3544 3545 pi->was_full = __pool_full(pi); 3546 } 3547 } 3548 3549 static bool pool_cleared_full(struct ceph_osd_client *osdc, s64 pool_id) 3550 { 3551 struct ceph_pg_pool_info *pi; 3552 3553 pi = ceph_pg_pool_by_id(osdc->osdmap, pool_id); 3554 if (!pi) 3555 return false; 3556 3557 return pi->was_full && !__pool_full(pi); 3558 } 3559 3560 static enum calc_target_result 3561 recalc_linger_target(struct ceph_osd_linger_request *lreq) 3562 { 3563 struct ceph_osd_client *osdc = lreq->osdc; 3564 enum calc_target_result ct_res; 3565 3566 ct_res = calc_target(osdc, &lreq->t, NULL, true); 3567 if (ct_res == CALC_TARGET_NEED_RESEND) { 3568 struct ceph_osd *osd; 3569 3570 osd = lookup_create_osd(osdc, lreq->t.osd, true); 3571 if (osd != lreq->osd) { 3572 unlink_linger(lreq->osd, lreq); 3573 link_linger(osd, lreq); 3574 } 3575 } 3576 3577 return ct_res; 3578 } 3579 3580 /* 3581 * Requeue requests whose mapping to an OSD has changed. 
 */
static void scan_requests(struct ceph_osd *osd,
			  bool force_resend,
			  bool cleared_full,
			  bool check_pool_cleared_full,
			  struct rb_root *need_resend,
			  struct list_head *need_resend_linger)
{
	struct ceph_osd_client *osdc = osd->o_osdc;
	struct rb_node *n;
	bool force_resend_writes;

	/* lingers first: recalc_linger_target() may relink to another osd */
	for (n = rb_first(&osd->o_linger_requests); n; ) {
		struct ceph_osd_linger_request *lreq =
			rb_entry(n, struct ceph_osd_linger_request, node);
		enum calc_target_result ct_res;

		n = rb_next(n); /* recalc_linger_target() */

		dout("%s lreq %p linger_id %llu\n", __func__, lreq,
		     lreq->linger_id);
		ct_res = recalc_linger_target(lreq);
		switch (ct_res) {
		case CALC_TARGET_NO_ACTION:
			force_resend_writes = cleared_full ||
			    (check_pool_cleared_full &&
			     pool_cleared_full(osdc, lreq->t.base_oloc.pool));
			if (!force_resend && !force_resend_writes)
				break;

			/* fall through */
		case CALC_TARGET_NEED_RESEND:
			cancel_linger_map_check(lreq);
			/*
			 * scan_requests() for the previous epoch(s)
			 * may have already added it to the list, since
			 * it's not unlinked here.
			 */
			if (list_empty(&lreq->scan_item))
				list_add_tail(&lreq->scan_item,
					      need_resend_linger);
			break;
		case CALC_TARGET_POOL_DNE:
			list_del_init(&lreq->scan_item);
			check_linger_pool_dne(lreq);
			break;
		}
	}

	for (n = rb_first(&osd->o_requests); n; ) {
		struct ceph_osd_request *req =
			rb_entry(n, struct ceph_osd_request, r_node);
		enum calc_target_result ct_res;

		n = rb_next(n); /* unlink_request(), check_pool_dne() */

		dout("%s req %p tid %llu\n", __func__, req, req->r_tid);
		ct_res = calc_target(osdc, &req->r_t, &req->r_osd->o_con,
				     false);
		switch (ct_res) {
		case CALC_TARGET_NO_ACTION:
			force_resend_writes = cleared_full ||
			    (check_pool_cleared_full &&
			     pool_cleared_full(osdc, req->r_t.base_oloc.pool));
			/* only writes are forced to resend on un-full */
			if (!force_resend &&
			    (!(req->r_flags & CEPH_OSD_FLAG_WRITE) ||
			     !force_resend_writes))
				break;

			/* fall through */
		case CALC_TARGET_NEED_RESEND:
			cancel_map_check(req);
			unlink_request(osd, req);
			insert_request(need_resend, req);
			break;
		case CALC_TARGET_POOL_DNE:
			check_pool_dne(req);
			break;
		}
	}
}

/*
 * Apply one (incremental or full) osdmap, migrate ->was_full state to
 * the new map and scan all sessions for requests that need to be
 * resent.  Returns 0 or a negative error from map decoding.
 */
static int handle_one_map(struct ceph_osd_client *osdc,
			  void *p, void *end, bool incremental,
			  struct rb_root *need_resend,
			  struct list_head *need_resend_linger)
{
	struct ceph_osdmap *newmap;
	struct rb_node *n;
	bool skipped_map = false;
	bool was_full;

	was_full = ceph_osdmap_flag(osdc, CEPH_OSDMAP_FULL);
	set_pool_was_full(osdc);

	if (incremental)
		newmap = osdmap_apply_incremental(&p, end, osdc->osdmap);
	else
		newmap = ceph_osdmap_decode(&p, end);
	if (IS_ERR(newmap))
		return PTR_ERR(newmap);

	if (newmap != osdc->osdmap) {
		/*
		 * Preserve ->was_full before destroying the old map.
		 * For pools that weren't in the old map, ->was_full
		 * should be false.
		 */
		for (n = rb_first(&newmap->pg_pools); n; n = rb_next(n)) {
			struct ceph_pg_pool_info *pi =
				rb_entry(n, struct ceph_pg_pool_info, node);
			struct ceph_pg_pool_info *old_pi;

			old_pi = ceph_pg_pool_by_id(osdc->osdmap, pi->id);
			if (old_pi)
				pi->was_full = old_pi->was_full;
			else
				WARN_ON(pi->was_full);
		}

		/* a gap in epochs means we may have missed map updates */
		if (osdc->osdmap->epoch &&
		    osdc->osdmap->epoch + 1 < newmap->epoch) {
			WARN_ON(incremental);
			skipped_map = true;
		}

		ceph_osdmap_destroy(osdc->osdmap);
		osdc->osdmap = newmap;
	}

	was_full &= !ceph_osdmap_flag(osdc, CEPH_OSDMAP_FULL);
	scan_requests(&osdc->homeless_osd, skipped_map, was_full, true,
		      need_resend, need_resend_linger);

	for (n = rb_first(&osdc->osds); n; ) {
		struct ceph_osd *osd = rb_entry(n, struct ceph_osd, o_node);

		n = rb_next(n); /* close_osd() */

		scan_requests(osd, skipped_map, was_full, true, need_resend,
			      need_resend_linger);
		/* close sessions to OSDs that went down or changed address */
		if (!ceph_osd_is_up(osdc->osdmap, osd->o_osd) ||
		    memcmp(&osd->o_con.peer_addr,
			   ceph_osd_addr(osdc->osdmap, osd->o_osd),
			   sizeof(struct ceph_entity_addr)))
			close_osd(osd);
	}

	return 0;
}

/*
 * Resend everything collected by scan_requests(): retarget against the
 * latest map, relink to the (possibly new) sessions and send.
 */
static void kick_requests(struct ceph_osd_client *osdc,
			  struct rb_root *need_resend,
			  struct list_head *need_resend_linger)
{
	struct ceph_osd_linger_request *lreq, *nlreq;
	enum calc_target_result ct_res;
	struct rb_node *n;

	/* make sure need_resend targets reflect latest map */
	for (n = rb_first(need_resend); n; ) {
		struct ceph_osd_request *req =
			rb_entry(n, struct ceph_osd_request, r_node);

		n = rb_next(n);

		if (req->r_t.epoch < osdc->osdmap->epoch) {
			ct_res = calc_target(osdc, &req->r_t, NULL, false);
			if (ct_res == CALC_TARGET_POOL_DNE) {
				erase_request(need_resend, req);
				check_pool_dne(req);
			}
		}
	}

	for (n = rb_first(need_resend); n; ) {
		struct ceph_osd_request *req =
			rb_entry(n, struct ceph_osd_request, r_node);
		struct ceph_osd *osd;

		n = rb_next(n);
		erase_request(need_resend, req); /* before link_request() */

		osd = lookup_create_osd(osdc, req->r_t.osd, true);
		link_request(osd, req);
		if (!req->r_linger) {
			if (!osd_homeless(osd) && !req->r_t.paused)
				send_request(req);
		} else {
			cancel_linger_request(req);
		}
	}

	list_for_each_entry_safe(lreq, nlreq, need_resend_linger, scan_item) {
		if (!osd_homeless(lreq->osd))
			send_linger(lreq);

		list_del_init(&lreq->scan_item);
	}
}

/*
 * Process updated osd map.
 *
 * The message contains any number of incremental and full maps, normally
 * indicating some sort of topology change in the cluster.  Kick requests
 * off to different OSDs as needed.
 */
void ceph_osdc_handle_map(struct ceph_osd_client *osdc, struct ceph_msg *msg)
{
	void *p = msg->front.iov_base;
	void *const end = p + msg->front.iov_len;
	u32 nr_maps, maplen;
	u32 epoch;
	struct ceph_fsid fsid;
	struct rb_root need_resend = RB_ROOT;
	LIST_HEAD(need_resend_linger);
	bool handled_incremental = false;
	bool was_pauserd, was_pausewr;
	bool pauserd, pausewr;
	int err;

	dout("%s have %u\n", __func__, osdc->osdmap->epoch);
	down_write(&osdc->lock);

	/* verify fsid */
	ceph_decode_need(&p, end, sizeof(fsid), bad);
	ceph_decode_copy(&p, &fsid, sizeof(fsid));
	if (ceph_check_fsid(osdc->client, &fsid) < 0)
		goto bad;

	/* remember pause state so transitions can be detected below */
	was_pauserd = ceph_osdmap_flag(osdc, CEPH_OSDMAP_PAUSERD);
	was_pausewr = ceph_osdmap_flag(osdc, CEPH_OSDMAP_PAUSEWR) ||
		      ceph_osdmap_flag(osdc, CEPH_OSDMAP_FULL) ||
		      have_pool_full(osdc);

	/* incremental maps */
	ceph_decode_32_safe(&p, end, nr_maps, bad);
	dout(" %d inc maps\n", nr_maps);
	while (nr_maps > 0) {
		ceph_decode_need(&p, end, 2*sizeof(u32), bad);
		epoch = ceph_decode_32(&p);
		maplen = ceph_decode_32(&p);
		ceph_decode_need(&p, end, maplen, bad);
		if (osdc->osdmap->epoch &&
		    osdc->osdmap->epoch + 1 == epoch) {
			dout("applying incremental map %u len %d\n",
			     epoch, maplen);
			err = handle_one_map(osdc, p, p + maplen, true,
					     &need_resend, &need_resend_linger);
			if (err)
				goto bad;
			handled_incremental = true;
		} else {
			dout("ignoring incremental map %u len %d\n",
			     epoch, maplen);
		}
		p += maplen;
		nr_maps--;
	}
	if (handled_incremental)
		goto done;

	/* full maps */
	ceph_decode_32_safe(&p, end, nr_maps, bad);
	dout(" %d full maps\n", nr_maps);
	while (nr_maps) {
		ceph_decode_need(&p, end, 2*sizeof(u32), bad);
		epoch = ceph_decode_32(&p);
		maplen = ceph_decode_32(&p);
		ceph_decode_need(&p, end, maplen, bad);
		if (nr_maps > 1) {
			dout("skipping non-latest full map %u len %d\n",
			     epoch, maplen);
		} else if (osdc->osdmap->epoch >= epoch) {
			dout("skipping full map %u len %d, "
			     "older than our %u\n", epoch, maplen,
			     osdc->osdmap->epoch);
		} else {
			dout("taking full map %u len %d\n", epoch, maplen);
			err = handle_one_map(osdc, p, p + maplen, false,
					     &need_resend, &need_resend_linger);
			if (err)
				goto bad;
		}
		p += maplen;
		nr_maps--;
	}

done:
	/*
	 * subscribe to subsequent osdmap updates if full to ensure
	 * we find out when we are no longer full and stop returning
	 * ENOSPC.
	 */
	pauserd = ceph_osdmap_flag(osdc, CEPH_OSDMAP_PAUSERD);
	pausewr = ceph_osdmap_flag(osdc, CEPH_OSDMAP_PAUSEWR) ||
		  ceph_osdmap_flag(osdc, CEPH_OSDMAP_FULL) ||
		  have_pool_full(osdc);
	if (was_pauserd || was_pausewr || pauserd || pausewr ||
	    osdc->osdmap->epoch < osdc->epoch_barrier)
		maybe_request_map(osdc);

	kick_requests(osdc, &need_resend, &need_resend_linger);

	ceph_osdc_abort_on_full(osdc);
	ceph_monc_got_map(&osdc->client->monc, CEPH_SUB_OSDMAP,
			  osdc->osdmap->epoch);
	up_write(&osdc->lock);
	wake_up_all(&osdc->client->auth_wq);
	return;

bad:
	pr_err("osdc handle_map corrupt msg\n");
	ceph_msg_dump(msg);
	up_write(&osdc->lock);
}

/*
 * Resubmit requests pending on the given osd.
 */
static void kick_osd_requests(struct ceph_osd *osd)
{
	struct rb_node *n;

	clear_backoffs(osd);

	for (n = rb_first(&osd->o_requests); n; ) {
		struct ceph_osd_request *req =
			rb_entry(n, struct ceph_osd_request, r_node);

		n = rb_next(n); /* cancel_linger_request() */

		if (!req->r_linger) {
			if (!req->r_t.paused)
				send_request(req);
		} else {
			cancel_linger_request(req);
		}
	}
	for (n = rb_first(&osd->o_linger_requests); n; n = rb_next(n)) {
		struct ceph_osd_linger_request *lreq =
			rb_entry(n, struct ceph_osd_linger_request, node);

		send_linger(lreq);
	}
}

/*
 * If the osd connection drops, we need to resubmit all requests.
3931 */ 3932 static void osd_fault(struct ceph_connection *con) 3933 { 3934 struct ceph_osd *osd = con->private; 3935 struct ceph_osd_client *osdc = osd->o_osdc; 3936 3937 dout("%s osd %p osd%d\n", __func__, osd, osd->o_osd); 3938 3939 down_write(&osdc->lock); 3940 if (!osd_registered(osd)) { 3941 dout("%s osd%d unknown\n", __func__, osd->o_osd); 3942 goto out_unlock; 3943 } 3944 3945 if (!reopen_osd(osd)) 3946 kick_osd_requests(osd); 3947 maybe_request_map(osdc); 3948 3949 out_unlock: 3950 up_write(&osdc->lock); 3951 } 3952 3953 struct MOSDBackoff { 3954 struct ceph_spg spgid; 3955 u32 map_epoch; 3956 u8 op; 3957 u64 id; 3958 struct ceph_hobject_id *begin; 3959 struct ceph_hobject_id *end; 3960 }; 3961 3962 static int decode_MOSDBackoff(const struct ceph_msg *msg, struct MOSDBackoff *m) 3963 { 3964 void *p = msg->front.iov_base; 3965 void *const end = p + msg->front.iov_len; 3966 u8 struct_v; 3967 u32 struct_len; 3968 int ret; 3969 3970 ret = ceph_start_decoding(&p, end, 1, "spg_t", &struct_v, &struct_len); 3971 if (ret) 3972 return ret; 3973 3974 ret = ceph_decode_pgid(&p, end, &m->spgid.pgid); 3975 if (ret) 3976 return ret; 3977 3978 ceph_decode_8_safe(&p, end, m->spgid.shard, e_inval); 3979 ceph_decode_32_safe(&p, end, m->map_epoch, e_inval); 3980 ceph_decode_8_safe(&p, end, m->op, e_inval); 3981 ceph_decode_64_safe(&p, end, m->id, e_inval); 3982 3983 m->begin = kzalloc(sizeof(*m->begin), GFP_NOIO); 3984 if (!m->begin) 3985 return -ENOMEM; 3986 3987 ret = decode_hoid(&p, end, m->begin); 3988 if (ret) { 3989 free_hoid(m->begin); 3990 return ret; 3991 } 3992 3993 m->end = kzalloc(sizeof(*m->end), GFP_NOIO); 3994 if (!m->end) { 3995 free_hoid(m->begin); 3996 return -ENOMEM; 3997 } 3998 3999 ret = decode_hoid(&p, end, m->end); 4000 if (ret) { 4001 free_hoid(m->begin); 4002 free_hoid(m->end); 4003 return ret; 4004 } 4005 4006 return 0; 4007 4008 e_inval: 4009 return -EINVAL; 4010 } 4011 4012 static struct ceph_msg *create_backoff_message( 4013 const struct 
ceph_osd_backoff *backoff, 4014 u32 map_epoch) 4015 { 4016 struct ceph_msg *msg; 4017 void *p, *end; 4018 int msg_size; 4019 4020 msg_size = CEPH_ENCODING_START_BLK_LEN + 4021 CEPH_PGID_ENCODING_LEN + 1; /* spgid */ 4022 msg_size += 4 + 1 + 8; /* map_epoch, op, id */ 4023 msg_size += CEPH_ENCODING_START_BLK_LEN + 4024 hoid_encoding_size(backoff->begin); 4025 msg_size += CEPH_ENCODING_START_BLK_LEN + 4026 hoid_encoding_size(backoff->end); 4027 4028 msg = ceph_msg_new(CEPH_MSG_OSD_BACKOFF, msg_size, GFP_NOIO, true); 4029 if (!msg) 4030 return NULL; 4031 4032 p = msg->front.iov_base; 4033 end = p + msg->front_alloc_len; 4034 4035 encode_spgid(&p, &backoff->spgid); 4036 ceph_encode_32(&p, map_epoch); 4037 ceph_encode_8(&p, CEPH_OSD_BACKOFF_OP_ACK_BLOCK); 4038 ceph_encode_64(&p, backoff->id); 4039 encode_hoid(&p, end, backoff->begin); 4040 encode_hoid(&p, end, backoff->end); 4041 BUG_ON(p != end); 4042 4043 msg->front.iov_len = p - msg->front.iov_base; 4044 msg->hdr.version = cpu_to_le16(1); /* MOSDBackoff v1 */ 4045 msg->hdr.front_len = cpu_to_le32(msg->front.iov_len); 4046 4047 return msg; 4048 } 4049 4050 static void handle_backoff_block(struct ceph_osd *osd, struct MOSDBackoff *m) 4051 { 4052 struct ceph_spg_mapping *spg; 4053 struct ceph_osd_backoff *backoff; 4054 struct ceph_msg *msg; 4055 4056 dout("%s osd%d spgid %llu.%xs%d id %llu\n", __func__, osd->o_osd, 4057 m->spgid.pgid.pool, m->spgid.pgid.seed, m->spgid.shard, m->id); 4058 4059 spg = lookup_spg_mapping(&osd->o_backoff_mappings, &m->spgid); 4060 if (!spg) { 4061 spg = alloc_spg_mapping(); 4062 if (!spg) { 4063 pr_err("%s failed to allocate spg\n", __func__); 4064 return; 4065 } 4066 spg->spgid = m->spgid; /* struct */ 4067 insert_spg_mapping(&osd->o_backoff_mappings, spg); 4068 } 4069 4070 backoff = alloc_backoff(); 4071 if (!backoff) { 4072 pr_err("%s failed to allocate backoff\n", __func__); 4073 return; 4074 } 4075 backoff->spgid = m->spgid; /* struct */ 4076 backoff->id = m->id; 4077 backoff->begin = 
m->begin; 4078 m->begin = NULL; /* backoff now owns this */ 4079 backoff->end = m->end; 4080 m->end = NULL; /* ditto */ 4081 4082 insert_backoff(&spg->backoffs, backoff); 4083 insert_backoff_by_id(&osd->o_backoffs_by_id, backoff); 4084 4085 /* 4086 * Ack with original backoff's epoch so that the OSD can 4087 * discard this if there was a PG split. 4088 */ 4089 msg = create_backoff_message(backoff, m->map_epoch); 4090 if (!msg) { 4091 pr_err("%s failed to allocate msg\n", __func__); 4092 return; 4093 } 4094 ceph_con_send(&osd->o_con, msg); 4095 } 4096 4097 static bool target_contained_by(const struct ceph_osd_request_target *t, 4098 const struct ceph_hobject_id *begin, 4099 const struct ceph_hobject_id *end) 4100 { 4101 struct ceph_hobject_id hoid; 4102 int cmp; 4103 4104 hoid_fill_from_target(&hoid, t); 4105 cmp = hoid_compare(&hoid, begin); 4106 return !cmp || (cmp > 0 && hoid_compare(&hoid, end) < 0); 4107 } 4108 4109 static void handle_backoff_unblock(struct ceph_osd *osd, 4110 const struct MOSDBackoff *m) 4111 { 4112 struct ceph_spg_mapping *spg; 4113 struct ceph_osd_backoff *backoff; 4114 struct rb_node *n; 4115 4116 dout("%s osd%d spgid %llu.%xs%d id %llu\n", __func__, osd->o_osd, 4117 m->spgid.pgid.pool, m->spgid.pgid.seed, m->spgid.shard, m->id); 4118 4119 backoff = lookup_backoff_by_id(&osd->o_backoffs_by_id, m->id); 4120 if (!backoff) { 4121 pr_err("%s osd%d spgid %llu.%xs%d id %llu backoff dne\n", 4122 __func__, osd->o_osd, m->spgid.pgid.pool, 4123 m->spgid.pgid.seed, m->spgid.shard, m->id); 4124 return; 4125 } 4126 4127 if (hoid_compare(backoff->begin, m->begin) && 4128 hoid_compare(backoff->end, m->end)) { 4129 pr_err("%s osd%d spgid %llu.%xs%d id %llu bad range?\n", 4130 __func__, osd->o_osd, m->spgid.pgid.pool, 4131 m->spgid.pgid.seed, m->spgid.shard, m->id); 4132 /* unblock it anyway... 
*/ 4133 } 4134 4135 spg = lookup_spg_mapping(&osd->o_backoff_mappings, &backoff->spgid); 4136 BUG_ON(!spg); 4137 4138 erase_backoff(&spg->backoffs, backoff); 4139 erase_backoff_by_id(&osd->o_backoffs_by_id, backoff); 4140 free_backoff(backoff); 4141 4142 if (RB_EMPTY_ROOT(&spg->backoffs)) { 4143 erase_spg_mapping(&osd->o_backoff_mappings, spg); 4144 free_spg_mapping(spg); 4145 } 4146 4147 for (n = rb_first(&osd->o_requests); n; n = rb_next(n)) { 4148 struct ceph_osd_request *req = 4149 rb_entry(n, struct ceph_osd_request, r_node); 4150 4151 if (!ceph_spg_compare(&req->r_t.spgid, &m->spgid)) { 4152 /* 4153 * Match against @m, not @backoff -- the PG may 4154 * have split on the OSD. 4155 */ 4156 if (target_contained_by(&req->r_t, m->begin, m->end)) { 4157 /* 4158 * If no other installed backoff applies, 4159 * resend. 4160 */ 4161 send_request(req); 4162 } 4163 } 4164 } 4165 } 4166 4167 static void handle_backoff(struct ceph_osd *osd, struct ceph_msg *msg) 4168 { 4169 struct ceph_osd_client *osdc = osd->o_osdc; 4170 struct MOSDBackoff m; 4171 int ret; 4172 4173 down_read(&osdc->lock); 4174 if (!osd_registered(osd)) { 4175 dout("%s osd%d unknown\n", __func__, osd->o_osd); 4176 up_read(&osdc->lock); 4177 return; 4178 } 4179 WARN_ON(osd->o_osd != le64_to_cpu(msg->hdr.src.num)); 4180 4181 mutex_lock(&osd->lock); 4182 ret = decode_MOSDBackoff(msg, &m); 4183 if (ret) { 4184 pr_err("failed to decode MOSDBackoff: %d\n", ret); 4185 ceph_msg_dump(msg); 4186 goto out_unlock; 4187 } 4188 4189 switch (m.op) { 4190 case CEPH_OSD_BACKOFF_OP_BLOCK: 4191 handle_backoff_block(osd, &m); 4192 break; 4193 case CEPH_OSD_BACKOFF_OP_UNBLOCK: 4194 handle_backoff_unblock(osd, &m); 4195 break; 4196 default: 4197 pr_err("%s osd%d unknown op %d\n", __func__, osd->o_osd, m.op); 4198 } 4199 4200 free_hoid(m.begin); 4201 free_hoid(m.end); 4202 4203 out_unlock: 4204 mutex_unlock(&osd->lock); 4205 up_read(&osdc->lock); 4206 } 4207 4208 /* 4209 * Process osd watch notifications 4210 */ 4211 static 
/*
 * Process osd watch notifications
 *
 * Decodes a CEPH_MSG_WATCH_NOTIFY message and routes it to the linger
 * request identified by @cookie: DISCONNECT events queue a watch error,
 * NOTIFY_COMPLETE events finish a pending ceph_osdc_notify(), and
 * NOTIFY events are handed to the notify workqueue.
 */
static void handle_watch_notify(struct ceph_osd_client *osdc,
				struct ceph_msg *msg)
{
	void *p = msg->front.iov_base;
	void *const end = p + msg->front.iov_len;
	struct ceph_osd_linger_request *lreq;
	struct linger_work *lwork;
	u8 proto_ver, opcode;
	u64 cookie, notify_id;
	u64 notifier_id = 0;
	s32 return_code = 0;
	void *payload = NULL;
	u32 payload_len = 0;

	ceph_decode_8_safe(&p, end, proto_ver, bad);
	ceph_decode_8_safe(&p, end, opcode, bad);
	ceph_decode_64_safe(&p, end, cookie, bad);
	p += 8; /* skip ver */
	ceph_decode_64_safe(&p, end, notify_id, bad);

	if (proto_ver >= 1) {
		ceph_decode_32_safe(&p, end, payload_len, bad);
		ceph_decode_need(&p, end, payload_len, bad);
		payload = p;
		p += payload_len;
	}

	/* return_code and notifier_id only exist in newer encodings */
	if (le16_to_cpu(msg->hdr.version) >= 2)
		ceph_decode_32_safe(&p, end, return_code, bad);

	if (le16_to_cpu(msg->hdr.version) >= 3)
		ceph_decode_64_safe(&p, end, notifier_id, bad);

	down_read(&osdc->lock);
	lreq = lookup_linger_osdc(&osdc->linger_requests, cookie);
	if (!lreq) {
		dout("%s opcode %d cookie %llu dne\n", __func__, opcode,
		     cookie);
		goto out_unlock_osdc;
	}

	mutex_lock(&lreq->lock);
	dout("%s opcode %d cookie %llu lreq %p is_watch %d\n", __func__,
	     opcode, cookie, lreq, lreq->is_watch);
	if (opcode == CEPH_WATCH_EVENT_DISCONNECT) {
		/* report only the first error seen on this watch */
		if (!lreq->last_error) {
			lreq->last_error = -ENOTCONN;
			queue_watch_error(lreq);
		}
	} else if (!lreq->is_watch) {
		/* CEPH_WATCH_EVENT_NOTIFY_COMPLETE */
		if (lreq->notify_id && lreq->notify_id != notify_id) {
			dout("lreq %p notify_id %llu != %llu, ignoring\n", lreq,
			     lreq->notify_id, notify_id);
		} else if (!completion_done(&lreq->notify_finish_wait)) {
			struct ceph_msg_data *data =
			    list_first_entry_or_null(&msg->data,
						     struct ceph_msg_data,
						     links);

			if (data) {
				if (lreq->preply_pages) {
					/* hand the reply pages to the caller */
					WARN_ON(data->type !=
							CEPH_MSG_DATA_PAGES);
					*lreq->preply_pages = data->pages;
					*lreq->preply_len = data->length;
				} else {
					/* caller didn't ask for a reply */
					ceph_release_page_vector(data->pages,
					       calc_pages_for(0, data->length));
				}
			}
			lreq->notify_finish_error = return_code;
			complete_all(&lreq->notify_finish_wait);
		}
	} else {
		/* CEPH_WATCH_EVENT_NOTIFY */
		lwork = lwork_alloc(lreq, do_watch_notify);
		if (!lwork) {
			pr_err("failed to allocate notify-lwork\n");
			goto out_unlock_lreq;
		}

		/*
		 * @payload points into @msg; take a msg ref so it stays
		 * valid until the work item runs.
		 */
		lwork->notify.notify_id = notify_id;
		lwork->notify.notifier_id = notifier_id;
		lwork->notify.payload = payload;
		lwork->notify.payload_len = payload_len;
		lwork->notify.msg = ceph_msg_get(msg);
		lwork_queue(lwork);
	}

out_unlock_lreq:
	mutex_unlock(&lreq->lock);
out_unlock_osdc:
	up_read(&osdc->lock);
	return;

bad:
	pr_err("osdc handle_watch_notify corrupt msg\n");
}

/*
 * Register request, send initial attempt.
 *
 * Always returns 0; @nofail is currently unused here -- submit_request()
 * is called with wrlocked=false unconditionally.
 */
int ceph_osdc_start_request(struct ceph_osd_client *osdc,
			    struct ceph_osd_request *req,
			    bool nofail)
{
	down_read(&osdc->lock);
	submit_request(req, false);
	up_read(&osdc->lock);

	return 0;
}
EXPORT_SYMBOL(ceph_osdc_start_request);
4329 */ 4330 void ceph_osdc_cancel_request(struct ceph_osd_request *req) 4331 { 4332 struct ceph_osd_client *osdc = req->r_osdc; 4333 4334 down_write(&osdc->lock); 4335 if (req->r_osd) 4336 cancel_request(req); 4337 up_write(&osdc->lock); 4338 } 4339 EXPORT_SYMBOL(ceph_osdc_cancel_request); 4340 4341 /* 4342 * @timeout: in jiffies, 0 means "wait forever" 4343 */ 4344 static int wait_request_timeout(struct ceph_osd_request *req, 4345 unsigned long timeout) 4346 { 4347 long left; 4348 4349 dout("%s req %p tid %llu\n", __func__, req, req->r_tid); 4350 left = wait_for_completion_killable_timeout(&req->r_completion, 4351 ceph_timeout_jiffies(timeout)); 4352 if (left <= 0) { 4353 left = left ?: -ETIMEDOUT; 4354 ceph_osdc_cancel_request(req); 4355 } else { 4356 left = req->r_result; /* completed */ 4357 } 4358 4359 return left; 4360 } 4361 4362 /* 4363 * wait for a request to complete 4364 */ 4365 int ceph_osdc_wait_request(struct ceph_osd_client *osdc, 4366 struct ceph_osd_request *req) 4367 { 4368 return wait_request_timeout(req, 0); 4369 } 4370 EXPORT_SYMBOL(ceph_osdc_wait_request); 4371 4372 /* 4373 * sync - wait for all in-flight requests to flush. avoid starvation. 
/*
 * sync - wait for all in-flight requests to flush. avoid starvation.
 */
void ceph_osdc_sync(struct ceph_osd_client *osdc)
{
	struct rb_node *n, *p;
	/* only wait for writes submitted before we started */
	u64 last_tid = atomic64_read(&osdc->last_tid);

again:
	down_read(&osdc->lock);
	for (n = rb_first(&osdc->osds); n; n = rb_next(n)) {
		struct ceph_osd *osd = rb_entry(n, struct ceph_osd, o_node);

		mutex_lock(&osd->lock);
		for (p = rb_first(&osd->o_requests); p; p = rb_next(p)) {
			struct ceph_osd_request *req =
			    rb_entry(p, struct ceph_osd_request, r_node);

			/* o_requests is tid-ordered; nothing older left */
			if (req->r_tid > last_tid)
				break;

			if (!(req->r_flags & CEPH_OSD_FLAG_WRITE))
				continue;

			/*
			 * Take a ref, drop all locks so we can sleep in
			 * wait_for_completion(), then restart the scan --
			 * the trees may have changed while unlocked.
			 */
			ceph_osdc_get_request(req);
			mutex_unlock(&osd->lock);
			up_read(&osdc->lock);
			dout("%s waiting on req %p tid %llu last_tid %llu\n",
			     __func__, req, req->r_tid, last_tid);
			wait_for_completion(&req->r_completion);
			ceph_osdc_put_request(req);
			goto again;
		}

		mutex_unlock(&osd->lock);
	}

	up_read(&osdc->lock);
	dout("%s done last_tid %llu\n", __func__, last_tid);
}
EXPORT_SYMBOL(ceph_osdc_sync);

/*
 * Allocate a single-op request targeting the linger request's base
 * object, with messages preallocated.  Returns NULL on failure;
 * caller owns the returned request.
 */
static struct ceph_osd_request *
alloc_linger_request(struct ceph_osd_linger_request *lreq)
{
	struct ceph_osd_request *req;

	req = ceph_osdc_alloc_request(lreq->osdc, NULL, 1, false, GFP_NOIO);
	if (!req)
		return NULL;

	ceph_oid_copy(&req->r_base_oid, &lreq->t.base_oid);
	ceph_oloc_copy(&req->r_base_oloc, &lreq->t.base_oloc);

	if (ceph_osdc_alloc_messages(req, GFP_NOIO)) {
		ceph_osdc_put_request(req);
		return NULL;
	}

	return req;
}
/*
 * Returns a handle, caller owns a ref.
 *
 * Set up a persistent watch on @oid: allocates the linger request plus
 * its register and ping sub-requests, registers and submits it under
 * osdc->lock (write), then waits for the registration to commit.
 * @wcb/@errcb are invoked from the notify workqueue with @data.
 */
struct ceph_osd_linger_request *
ceph_osdc_watch(struct ceph_osd_client *osdc,
		struct ceph_object_id *oid,
		struct ceph_object_locator *oloc,
		rados_watchcb2_t wcb,
		rados_watcherrcb_t errcb,
		void *data)
{
	struct ceph_osd_linger_request *lreq;
	int ret;

	lreq = linger_alloc(osdc);
	if (!lreq)
		return ERR_PTR(-ENOMEM);

	lreq->is_watch = true;
	lreq->wcb = wcb;
	lreq->errcb = errcb;
	lreq->data = data;
	lreq->watch_valid_thru = jiffies;

	ceph_oid_copy(&lreq->t.base_oid, oid);
	ceph_oloc_copy(&lreq->t.base_oloc, oloc);
	lreq->t.flags = CEPH_OSD_FLAG_WRITE;
	ktime_get_real_ts(&lreq->mtime);

	lreq->reg_req = alloc_linger_request(lreq);
	if (!lreq->reg_req) {
		ret = -ENOMEM;
		goto err_put_lreq;
	}

	lreq->ping_req = alloc_linger_request(lreq);
	if (!lreq->ping_req) {
		ret = -ENOMEM;
		goto err_put_lreq;
	}

	down_write(&osdc->lock);
	linger_register(lreq); /* before osd_req_op_* */
	osd_req_op_watch_init(lreq->reg_req, 0, lreq->linger_id,
			      CEPH_OSD_WATCH_OP_WATCH);
	osd_req_op_watch_init(lreq->ping_req, 0, lreq->linger_id,
			      CEPH_OSD_WATCH_OP_PING);
	linger_submit(lreq);
	up_write(&osdc->lock);

	ret = linger_reg_commit_wait(lreq);
	if (ret) {
		linger_cancel(lreq);
		goto err_put_lreq;
	}

	return lreq;

err_put_lreq:
	linger_put(lreq);
	return ERR_PTR(ret);
}
EXPORT_SYMBOL(ceph_osdc_watch);
/*
 * Releases a ref.
 *
 * Times out after mount_timeout to preserve rbd unmap behaviour
 * introduced in 2894e1d76974 ("rbd: timeout watch teardown on unmap
 * with mount_timeout").
 */
int ceph_osdc_unwatch(struct ceph_osd_client *osdc,
		      struct ceph_osd_linger_request *lreq)
{
	struct ceph_options *opts = osdc->client->options;
	struct ceph_osd_request *req;
	int ret;

	req = ceph_osdc_alloc_request(osdc, NULL, 1, false, GFP_NOIO);
	if (!req)
		return -ENOMEM;

	ceph_oid_copy(&req->r_base_oid, &lreq->t.base_oid);
	ceph_oloc_copy(&req->r_base_oloc, &lreq->t.base_oloc);
	req->r_flags = CEPH_OSD_FLAG_WRITE;
	ktime_get_real_ts(&req->r_mtime);
	osd_req_op_watch_init(req, 0, lreq->linger_id,
			      CEPH_OSD_WATCH_OP_UNWATCH);

	ret = ceph_osdc_alloc_messages(req, GFP_NOIO);
	if (ret)
		goto out_put_req;

	/* send UNWATCH, then cancel the linger and drop the caller's ref */
	ceph_osdc_start_request(osdc, req, false);
	linger_cancel(lreq);
	linger_put(lreq);
	ret = wait_request_timeout(req, opts->mount_timeout);

out_put_req:
	ceph_osdc_put_request(req);
	return ret;
}
EXPORT_SYMBOL(ceph_osdc_unwatch);

/*
 * Encode a NOTIFY_ACK op payload (notify_id, cookie, optional blob)
 * into a pagelist attached to op @which.  Any pagelist-encode failure
 * is collapsed into -ENOMEM.
 */
static int osd_req_op_notify_ack_init(struct ceph_osd_request *req, int which,
				      u64 notify_id, u64 cookie, void *payload,
				      size_t payload_len)
{
	struct ceph_osd_req_op *op;
	struct ceph_pagelist *pl;
	int ret;

	op = _osd_req_op_init(req, which, CEPH_OSD_OP_NOTIFY_ACK, 0);

	pl = kmalloc(sizeof(*pl), GFP_NOIO);
	if (!pl)
		return -ENOMEM;

	ceph_pagelist_init(pl);
	ret = ceph_pagelist_encode_64(pl, notify_id);
	ret |= ceph_pagelist_encode_64(pl, cookie);
	if (payload) {
		ret |= ceph_pagelist_encode_32(pl, payload_len);
		ret |= ceph_pagelist_append(pl, payload, payload_len);
	} else {
		ret |= ceph_pagelist_encode_32(pl, 0);
	}
	if (ret) {
		ceph_pagelist_release(pl);
		return -ENOMEM;
	}

	ceph_osd_data_pagelist_init(&op->notify_ack.request_data, pl);
	op->indata_len = pl->length;
	return 0;
}

/*
 * Synchronously acknowledge a notify on @oid identified by
 * @notify_id/@cookie, optionally sending @payload back.
 */
int ceph_osdc_notify_ack(struct ceph_osd_client *osdc,
			 struct ceph_object_id *oid,
			 struct ceph_object_locator *oloc,
			 u64 notify_id,
			 u64 cookie,
			 void *payload,
			 size_t payload_len)
{
	struct ceph_osd_request *req;
	int ret;

	req = ceph_osdc_alloc_request(osdc, NULL, 1, false, GFP_NOIO);
	if (!req)
		return -ENOMEM;

	ceph_oid_copy(&req->r_base_oid, oid);
	ceph_oloc_copy(&req->r_base_oloc, oloc);
	req->r_flags = CEPH_OSD_FLAG_READ;

	ret = ceph_osdc_alloc_messages(req, GFP_NOIO);
	if (ret)
		goto out_put_req;

	ret = osd_req_op_notify_ack_init(req, 0, notify_id, cookie, payload,
					 payload_len);
	if (ret)
		goto out_put_req;

	ceph_osdc_start_request(osdc, req, false);
	ret = ceph_osdc_wait_request(osdc, req);

out_put_req:
	ceph_osdc_put_request(req);
	return ret;
}
EXPORT_SYMBOL(ceph_osdc_notify_ack);

/*
 * Encode a NOTIFY op payload (prot_ver=1, timeout, blob) into a
 * pagelist attached to op @which.  Any pagelist-encode failure is
 * collapsed into -ENOMEM.
 */
static int osd_req_op_notify_init(struct ceph_osd_request *req, int which,
				  u64 cookie, u32 prot_ver, u32 timeout,
				  void *payload, size_t payload_len)
{
	struct ceph_osd_req_op *op;
	struct ceph_pagelist *pl;
	int ret;

	op = _osd_req_op_init(req, which, CEPH_OSD_OP_NOTIFY, 0);
	op->notify.cookie = cookie;

	pl = kmalloc(sizeof(*pl), GFP_NOIO);
	if (!pl)
		return -ENOMEM;

	ceph_pagelist_init(pl);
	ret = ceph_pagelist_encode_32(pl, 1); /* prot_ver */
	ret |= ceph_pagelist_encode_32(pl, timeout);
	ret |= ceph_pagelist_encode_32(pl, payload_len);
	ret |= ceph_pagelist_append(pl, payload, payload_len);
	if (ret) {
		ceph_pagelist_release(pl);
		return -ENOMEM;
	}

	ceph_osd_data_pagelist_init(&op->notify.request_data, pl);
	op->indata_len = pl->length;
	return 0;
}
/*
 * @timeout: in seconds
 *
 * @preply_{pages,len} are initialized both on success and error.
 * The caller is responsible for:
 *
 *     ceph_release_page_vector(reply_pages, calc_pages_for(0, reply_len))
 */
int ceph_osdc_notify(struct ceph_osd_client *osdc,
		     struct ceph_object_id *oid,
		     struct ceph_object_locator *oloc,
		     void *payload,
		     size_t payload_len,
		     u32 timeout,
		     struct page ***preply_pages,
		     size_t *preply_len)
{
	struct ceph_osd_linger_request *lreq;
	struct page **pages;
	int ret;

	WARN_ON(!timeout);
	if (preply_pages) {
		*preply_pages = NULL;
		*preply_len = 0;
	}

	lreq = linger_alloc(osdc);
	if (!lreq)
		return -ENOMEM;

	lreq->preply_pages = preply_pages;
	lreq->preply_len = preply_len;

	ceph_oid_copy(&lreq->t.base_oid, oid);
	ceph_oloc_copy(&lreq->t.base_oloc, oloc);
	lreq->t.flags = CEPH_OSD_FLAG_READ;

	lreq->reg_req = alloc_linger_request(lreq);
	if (!lreq->reg_req) {
		ret = -ENOMEM;
		goto out_put_lreq;
	}

	/* for notify_id */
	pages = ceph_alloc_page_vector(1, GFP_NOIO);
	if (IS_ERR(pages)) {
		ret = PTR_ERR(pages);
		goto out_put_lreq;
	}

	down_write(&osdc->lock);
	linger_register(lreq); /* before osd_req_op_* */
	ret = osd_req_op_notify_init(lreq->reg_req, 0, lreq->linger_id, 1,
				     timeout, payload, payload_len);
	if (ret) {
		/* undo registration and free the reply page before bailing */
		linger_unregister(lreq);
		up_write(&osdc->lock);
		ceph_release_page_vector(pages, 1);
		goto out_put_lreq;
	}
	ceph_osd_data_pages_init(osd_req_op_data(lreq->reg_req, 0, notify,
						 response_data),
				 pages, PAGE_SIZE, 0, false, true);
	linger_submit(lreq);
	up_write(&osdc->lock);

	/* wait for the notify to register, then for all watchers to ack */
	ret = linger_reg_commit_wait(lreq);
	if (!ret)
		ret = linger_notify_finish_wait(lreq);
	else
		dout("lreq %p failed to initiate notify %d\n", lreq, ret);

	linger_cancel(lreq);
out_put_lreq:
	linger_put(lreq);
	return ret;
}
EXPORT_SYMBOL(ceph_osdc_notify); 4717 4718 /* 4719 * Return the number of milliseconds since the watch was last 4720 * confirmed, or an error. If there is an error, the watch is no 4721 * longer valid, and should be destroyed with ceph_osdc_unwatch(). 4722 */ 4723 int ceph_osdc_watch_check(struct ceph_osd_client *osdc, 4724 struct ceph_osd_linger_request *lreq) 4725 { 4726 unsigned long stamp, age; 4727 int ret; 4728 4729 down_read(&osdc->lock); 4730 mutex_lock(&lreq->lock); 4731 stamp = lreq->watch_valid_thru; 4732 if (!list_empty(&lreq->pending_lworks)) { 4733 struct linger_work *lwork = 4734 list_first_entry(&lreq->pending_lworks, 4735 struct linger_work, 4736 pending_item); 4737 4738 if (time_before(lwork->queued_stamp, stamp)) 4739 stamp = lwork->queued_stamp; 4740 } 4741 age = jiffies - stamp; 4742 dout("%s lreq %p linger_id %llu age %lu last_error %d\n", __func__, 4743 lreq, lreq->linger_id, age, lreq->last_error); 4744 /* we are truncating to msecs, so return a safe upper bound */ 4745 ret = lreq->last_error ?: 1 + jiffies_to_msecs(age); 4746 4747 mutex_unlock(&lreq->lock); 4748 up_read(&osdc->lock); 4749 return ret; 4750 } 4751 4752 static int decode_watcher(void **p, void *end, struct ceph_watch_item *item) 4753 { 4754 u8 struct_v; 4755 u32 struct_len; 4756 int ret; 4757 4758 ret = ceph_start_decoding(p, end, 2, "watch_item_t", 4759 &struct_v, &struct_len); 4760 if (ret) 4761 return ret; 4762 4763 ceph_decode_copy(p, &item->name, sizeof(item->name)); 4764 item->cookie = ceph_decode_64(p); 4765 *p += 4; /* skip timeout_seconds */ 4766 if (struct_v >= 2) { 4767 ceph_decode_copy(p, &item->addr, sizeof(item->addr)); 4768 ceph_decode_addr(&item->addr); 4769 } 4770 4771 dout("%s %s%llu cookie %llu addr %s\n", __func__, 4772 ENTITY_NAME(item->name), item->cookie, 4773 ceph_pr_addr(&item->addr.in_addr)); 4774 return 0; 4775 } 4776 4777 static int decode_watchers(void **p, void *end, 4778 struct ceph_watch_item **watchers, 4779 u32 *num_watchers) 4780 { 4781 u8 
struct_v; 4782 u32 struct_len; 4783 int i; 4784 int ret; 4785 4786 ret = ceph_start_decoding(p, end, 1, "obj_list_watch_response_t", 4787 &struct_v, &struct_len); 4788 if (ret) 4789 return ret; 4790 4791 *num_watchers = ceph_decode_32(p); 4792 *watchers = kcalloc(*num_watchers, sizeof(**watchers), GFP_NOIO); 4793 if (!*watchers) 4794 return -ENOMEM; 4795 4796 for (i = 0; i < *num_watchers; i++) { 4797 ret = decode_watcher(p, end, *watchers + i); 4798 if (ret) { 4799 kfree(*watchers); 4800 return ret; 4801 } 4802 } 4803 4804 return 0; 4805 } 4806 4807 /* 4808 * On success, the caller is responsible for: 4809 * 4810 * kfree(watchers); 4811 */ 4812 int ceph_osdc_list_watchers(struct ceph_osd_client *osdc, 4813 struct ceph_object_id *oid, 4814 struct ceph_object_locator *oloc, 4815 struct ceph_watch_item **watchers, 4816 u32 *num_watchers) 4817 { 4818 struct ceph_osd_request *req; 4819 struct page **pages; 4820 int ret; 4821 4822 req = ceph_osdc_alloc_request(osdc, NULL, 1, false, GFP_NOIO); 4823 if (!req) 4824 return -ENOMEM; 4825 4826 ceph_oid_copy(&req->r_base_oid, oid); 4827 ceph_oloc_copy(&req->r_base_oloc, oloc); 4828 req->r_flags = CEPH_OSD_FLAG_READ; 4829 4830 ret = ceph_osdc_alloc_messages(req, GFP_NOIO); 4831 if (ret) 4832 goto out_put_req; 4833 4834 pages = ceph_alloc_page_vector(1, GFP_NOIO); 4835 if (IS_ERR(pages)) { 4836 ret = PTR_ERR(pages); 4837 goto out_put_req; 4838 } 4839 4840 osd_req_op_init(req, 0, CEPH_OSD_OP_LIST_WATCHERS, 0); 4841 ceph_osd_data_pages_init(osd_req_op_data(req, 0, list_watchers, 4842 response_data), 4843 pages, PAGE_SIZE, 0, false, true); 4844 4845 ceph_osdc_start_request(osdc, req, false); 4846 ret = ceph_osdc_wait_request(osdc, req); 4847 if (ret >= 0) { 4848 void *p = page_address(pages[0]); 4849 void *const end = p + req->r_ops[0].outdata_len; 4850 4851 ret = decode_watchers(&p, end, watchers, num_watchers); 4852 } 4853 4854 out_put_req: 4855 ceph_osdc_put_request(req); 4856 return ret; 4857 } 4858 
EXPORT_SYMBOL(ceph_osdc_list_watchers);

/*
 * Call all pending notify callbacks - for use after a watch is
 * unregistered, to make sure no more callbacks for it will be invoked
 */
void ceph_osdc_flush_notifies(struct ceph_osd_client *osdc)
{
	dout("%s osdc %p\n", __func__, osdc);
	flush_workqueue(osdc->notify_wq);
}
EXPORT_SYMBOL(ceph_osdc_flush_notifies);

/* Request a new osdmap if warranted, under osdc->lock (read). */
void ceph_osdc_maybe_request_map(struct ceph_osd_client *osdc)
{
	down_read(&osdc->lock);
	maybe_request_map(osdc);
	up_read(&osdc->lock);
}
EXPORT_SYMBOL(ceph_osdc_maybe_request_map);

/*
 * Execute an OSD class method on an object.
 *
 * @flags: CEPH_OSD_FLAG_*
 * @resp_len: in/out param for reply length
 */
int ceph_osdc_call(struct ceph_osd_client *osdc,
		   struct ceph_object_id *oid,
		   struct ceph_object_locator *oloc,
		   const char *class, const char *method,
		   unsigned int flags,
		   struct page *req_page, size_t req_len,
		   struct page *resp_page, size_t *resp_len)
{
	struct ceph_osd_request *req;
	int ret;

	/* request and response are limited to one page each */
	if (req_len > PAGE_SIZE || (resp_page && *resp_len > PAGE_SIZE))
		return -E2BIG;

	req = ceph_osdc_alloc_request(osdc, NULL, 1, false, GFP_NOIO);
	if (!req)
		return -ENOMEM;

	ceph_oid_copy(&req->r_base_oid, oid);
	ceph_oloc_copy(&req->r_base_oloc, oloc);
	req->r_flags = flags;

	ret = ceph_osdc_alloc_messages(req, GFP_NOIO);
	if (ret)
		goto out_put_req;

	osd_req_op_cls_init(req, 0, CEPH_OSD_OP_CALL, class, method);
	if (req_page)
		osd_req_op_cls_request_data_pages(req, 0, &req_page, req_len,
						  0, false, false);
	if (resp_page)
		osd_req_op_cls_response_data_pages(req, 0, &resp_page,
						   *resp_len, 0, false, false);

	ceph_osdc_start_request(osdc, req, false);
	ret = ceph_osdc_wait_request(osdc, req);
	if (ret >= 0) {
		/* on success, report the class method's own return value */
		ret = req->r_ops[0].rval;
		if (resp_page)
			*resp_len = req->r_ops[0].outdata_len;
	}

out_put_req:
	ceph_osdc_put_request(req);
	return ret;
}
EXPORT_SYMBOL(ceph_osdc_call);

/*
 * init, shutdown
 */
int ceph_osdc_init(struct ceph_osd_client *osdc, struct ceph_client *client)
{
	int err;

	dout("init\n");
	osdc->client = client;
	init_rwsem(&osdc->lock);
	osdc->osds = RB_ROOT;
	INIT_LIST_HEAD(&osdc->osd_lru);
	spin_lock_init(&osdc->osd_lru_lock);
	osd_init(&osdc->homeless_osd);
	osdc->homeless_osd.o_osdc = osdc;
	osdc->homeless_osd.o_osd = CEPH_HOMELESS_OSD;
	osdc->last_linger_id = CEPH_LINGER_ID_START;
	osdc->linger_requests = RB_ROOT;
	osdc->map_checks = RB_ROOT;
	osdc->linger_map_checks = RB_ROOT;
	INIT_DELAYED_WORK(&osdc->timeout_work, handle_timeout);
	INIT_DELAYED_WORK(&osdc->osds_timeout_work, handle_osds_timeout);

	err = -ENOMEM;
	osdc->osdmap = ceph_osdmap_alloc();
	if (!osdc->osdmap)
		goto out;

	/* mempool/msgpools guarantee forward progress under memory pressure */
	osdc->req_mempool = mempool_create_slab_pool(10,
						     ceph_osd_request_cache);
	if (!osdc->req_mempool)
		goto out_map;

	err = ceph_msgpool_init(&osdc->msgpool_op, CEPH_MSG_OSD_OP,
				PAGE_SIZE, 10, true, "osd_op");
	if (err < 0)
		goto out_mempool;
	err = ceph_msgpool_init(&osdc->msgpool_op_reply, CEPH_MSG_OSD_OPREPLY,
				PAGE_SIZE, 10, true, "osd_op_reply");
	if (err < 0)
		goto out_msgpool;

	err = -ENOMEM;
	osdc->notify_wq = create_singlethread_workqueue("ceph-watch-notify");
	if (!osdc->notify_wq)
		goto out_msgpool_reply;

	schedule_delayed_work(&osdc->timeout_work,
			      osdc->client->options->osd_keepalive_timeout);
	schedule_delayed_work(&osdc->osds_timeout_work,
	    round_jiffies_relative(osdc->client->options->osd_idle_ttl));

	return 0;

out_msgpool_reply:
	ceph_msgpool_destroy(&osdc->msgpool_op_reply);
out_msgpool:
	ceph_msgpool_destroy(&osdc->msgpool_op);
out_mempool:
	mempool_destroy(osdc->req_mempool);
out_map:
	ceph_osdmap_destroy(osdc->osdmap);
out:
	return err;
}

/*
 * Tear down an osd client: drain the notify workqueue, stop the
 * timers, close every osd session and release all resources acquired
 * in ceph_osdc_init() (in reverse order).
 */
void ceph_osdc_stop(struct ceph_osd_client *osdc)
{
	flush_workqueue(osdc->notify_wq);
	destroy_workqueue(osdc->notify_wq);
	cancel_delayed_work_sync(&osdc->timeout_work);
	cancel_delayed_work_sync(&osdc->osds_timeout_work);

	down_write(&osdc->lock);
	while (!RB_EMPTY_ROOT(&osdc->osds)) {
		struct ceph_osd *osd = rb_entry(rb_first(&osdc->osds),
						struct ceph_osd, o_node);
		close_osd(osd);
	}
	up_write(&osdc->lock);
	WARN_ON(refcount_read(&osdc->homeless_osd.o_ref) != 1);
	osd_cleanup(&osdc->homeless_osd);

	/* everything should be quiesced by now */
	WARN_ON(!list_empty(&osdc->osd_lru));
	WARN_ON(!RB_EMPTY_ROOT(&osdc->linger_requests));
	WARN_ON(!RB_EMPTY_ROOT(&osdc->map_checks));
	WARN_ON(!RB_EMPTY_ROOT(&osdc->linger_map_checks));
	WARN_ON(atomic_read(&osdc->num_requests));
	WARN_ON(atomic_read(&osdc->num_homeless));

	ceph_osdmap_destroy(osdc->osdmap);
	mempool_destroy(osdc->req_mempool);
	ceph_msgpool_destroy(&osdc->msgpool_op);
	ceph_msgpool_destroy(&osdc->msgpool_op_reply);
}
/*
 * Read some contiguous pages. If we cross a stripe boundary, shorten
 * *plen. Return number of bytes read, or error.
 */
int ceph_osdc_readpages(struct ceph_osd_client *osdc,
			struct ceph_vino vino, struct ceph_file_layout *layout,
			u64 off, u64 *plen,
			u32 truncate_seq, u64 truncate_size,
			struct page **pages, int num_pages, int page_align)
{
	struct ceph_osd_request *req;
	int rc = 0;

	dout("readpages on ino %llx.%llx on %llu~%llu\n", vino.ino,
	     vino.snap, off, *plen);
	req = ceph_osdc_new_request(osdc, layout, vino, off, plen, 0, 1,
				    CEPH_OSD_OP_READ, CEPH_OSD_FLAG_READ,
				    NULL, truncate_seq, truncate_size,
				    false);
	if (IS_ERR(req))
		return PTR_ERR(req);

	/* it may be a short read due to an object boundary */
	osd_req_op_extent_osd_data_pages(req, 0,
				pages, *plen, page_align, false, false);

	dout("readpages final extent is %llu~%llu (%llu bytes align %d)\n",
	     off, *plen, *plen, page_align);

	rc = ceph_osdc_start_request(osdc, req, false);
	if (!rc)
		rc = ceph_osdc_wait_request(osdc, req);

	ceph_osdc_put_request(req);
	dout("readpages result %d\n", rc);
	return rc;
}
EXPORT_SYMBOL(ceph_osdc_readpages);

/*
 * do a synchronous write on N pages
 */
int ceph_osdc_writepages(struct ceph_osd_client *osdc, struct ceph_vino vino,
			 struct ceph_file_layout *layout,
			 struct ceph_snap_context *snapc,
			 u64 off, u64 len,
			 u32 truncate_seq, u64 truncate_size,
			 struct timespec *mtime,
			 struct page **pages, int num_pages)
{
	struct ceph_osd_request *req;
	int rc = 0;
	int page_align = off & ~PAGE_MASK;

	req = ceph_osdc_new_request(osdc, layout, vino, off, &len, 0, 1,
				    CEPH_OSD_OP_WRITE, CEPH_OSD_FLAG_WRITE,
				    snapc, truncate_seq, truncate_size,
				    true);
	if (IS_ERR(req))
		return PTR_ERR(req);

	/* it may be a short write due to an object boundary */
	osd_req_op_extent_osd_data_pages(req, 0, pages, len, page_align,
					 false, false);
	dout("writepages %llu~%llu (%llu bytes)\n", off, len, len);

	req->r_mtime = *mtime;
	rc = ceph_osdc_start_request(osdc, req, true);
	if (!rc)
		rc = ceph_osdc_wait_request(osdc, req);

	ceph_osdc_put_request(req);
	if (rc == 0)
		rc = len;	/* report bytes written on success */
	dout("writepages result %d\n", rc);
	return rc;
}
EXPORT_SYMBOL(ceph_osdc_writepages);

/*
 * Create the slab cache for osd requests; sized to hold the inline
 * CEPH_OSD_SLAB_OPS ops.  Called once at module init.
 */
int __init ceph_osdc_setup(void)
{
	size_t size = sizeof(struct ceph_osd_request) +
	    CEPH_OSD_SLAB_OPS * sizeof(struct ceph_osd_req_op);

	BUG_ON(ceph_osd_request_cache);
	ceph_osd_request_cache = kmem_cache_create("ceph_osd_request", size,
						   0, 0, NULL);

	return ceph_osd_request_cache ? 0 : -ENOMEM;
}

/* Destroy the osd request slab cache.  Called once at module exit. */
void ceph_osdc_cleanup(void)
{
	BUG_ON(!ceph_osd_request_cache);
	kmem_cache_destroy(ceph_osd_request_cache);
	ceph_osd_request_cache = NULL;
}

/*
 * handle incoming message
 */
static void dispatch(struct ceph_connection *con, struct ceph_msg *msg)
{
	struct ceph_osd *osd = con->private;
	struct ceph_osd_client *osdc = osd->o_osdc;
	int type = le16_to_cpu(msg->hdr.type);

	switch (type) {
	case CEPH_MSG_OSD_MAP:
		ceph_osdc_handle_map(osdc, msg);
		break;
	case CEPH_MSG_OSD_OPREPLY:
		handle_reply(osd, msg);
		break;
	case CEPH_MSG_OSD_BACKOFF:
		handle_backoff(osd, msg);
		break;
	case CEPH_MSG_WATCH_NOTIFY:
		handle_watch_notify(osdc, msg);
		break;

	default:
		pr_err("received unknown message type %d %s\n", type,
		       ceph_msg_type_name(type));
	}

	/* drop the ref taken for dispatch */
	ceph_msg_put(msg);
}
/*
 * Lookup and return message for incoming reply. Don't try to do
 * anything about a larger than preallocated data portion of the
 * message at the moment - for now, just skip the message.
 */
static struct ceph_msg *get_reply(struct ceph_connection *con,
				  struct ceph_msg_header *hdr,
				  int *skip)
{
	struct ceph_osd *osd = con->private;
	struct ceph_osd_client *osdc = osd->o_osdc;
	struct ceph_msg *m = NULL;
	struct ceph_osd_request *req;
	int front_len = le32_to_cpu(hdr->front_len);
	int data_len = le32_to_cpu(hdr->data_len);
	u64 tid = le64_to_cpu(hdr->tid);

	down_read(&osdc->lock);
	if (!osd_registered(osd)) {
		dout("%s osd%d unknown, skipping\n", __func__, osd->o_osd);
		*skip = 1;
		goto out_unlock_osdc;
	}
	WARN_ON(osd->o_osd != le64_to_cpu(hdr->src.num));

	mutex_lock(&osd->lock);
	req = lookup_request(&osd->o_requests, tid);
	if (!req) {
		dout("%s osd%d tid %llu unknown, skipping\n", __func__,
		     osd->o_osd, tid);
		*skip = 1;
		goto out_unlock_session;
	}

	ceph_msg_revoke_incoming(req->r_reply);

	if (front_len > req->r_reply->front_alloc_len) {
		/* preallocated reply front is too small -- replace it */
		pr_warn("%s osd%d tid %llu front %d > preallocated %d\n",
			__func__, osd->o_osd, req->r_tid, front_len,
			req->r_reply->front_alloc_len);
		m = ceph_msg_new(CEPH_MSG_OSD_OPREPLY, front_len, GFP_NOFS,
				 false);
		if (!m)
			goto out_unlock_session;
		ceph_msg_put(req->r_reply);
		req->r_reply = m;
	}

	if (data_len > req->r_reply->data_length) {
		/* can't grow the data portion -- skip this reply */
		pr_warn("%s osd%d tid %llu data %d > preallocated %zu, skipping\n",
			__func__, osd->o_osd, req->r_tid, data_len,
			req->r_reply->data_length);
		m = NULL;
		*skip = 1;
		goto out_unlock_session;
	}

	m = ceph_msg_get(req->r_reply);
	dout("get_reply tid %lld %p\n", tid, m);

out_unlock_session:
	mutex_unlock(&osd->lock);
out_unlock_osdc:
	up_read(&osdc->lock);
	return m;
}

/*
 * TODO: switch to a msg-owned pagelist
 */
static struct ceph_msg *alloc_msg_with_page_vector(struct ceph_msg_header *hdr)
{
	struct ceph_msg *m;
	int type = le16_to_cpu(hdr->type);
	u32 front_len = le32_to_cpu(hdr->front_len);
	u32 data_len = le32_to_cpu(hdr->data_len);

	m = ceph_msg_new(type, front_len, GFP_NOIO, false);
	if (!m)
		return NULL;

	if (data_len) {
		struct page **pages;
		struct ceph_osd_data osd_data;

		pages = ceph_alloc_page_vector(calc_pages_for(0, data_len),
					       GFP_NOIO);
		if (IS_ERR(pages)) {
			ceph_msg_put(m);
			return NULL;
		}

		ceph_osd_data_pages_init(&osd_data, pages, data_len, 0, false,
					 false);
		ceph_osdc_msg_data_add(m, &osd_data);
	}

	return m;
}

/*
 * Allocate (or find) the message buffer for an incoming frame, based
 * on its type.  Unknown types are skipped with a warning.
 */
static struct ceph_msg *alloc_msg(struct ceph_connection *con,
				  struct ceph_msg_header *hdr,
				  int *skip)
{
	struct ceph_osd *osd = con->private;
	int type = le16_to_cpu(hdr->type);

	*skip = 0;
	switch (type) {
	case CEPH_MSG_OSD_MAP:
	case CEPH_MSG_OSD_BACKOFF:
	case CEPH_MSG_WATCH_NOTIFY:
		return alloc_msg_with_page_vector(hdr);
	case CEPH_MSG_OSD_OPREPLY:
		return get_reply(con, hdr, skip);
	default:
		pr_warn("%s osd%d unknown msg type %d, skipping\n", __func__,
			osd->o_osd, type);
		*skip = 1;
		return NULL;
	}
}

/*
 * Wrappers to refcount containing ceph_osd struct
 */
static struct ceph_connection *get_osd_con(struct ceph_connection *con)
{
	struct ceph_osd *osd = con->private;
	if (get_osd(osd))
		return con;
	return NULL;
}

static void put_osd_con(struct ceph_connection *con)
{
	struct ceph_osd *osd = con->private;
	put_osd(osd);
}
5304 */ 5305 static struct ceph_auth_handshake *get_authorizer(struct ceph_connection *con, 5306 int *proto, int force_new) 5307 { 5308 struct ceph_osd *o = con->private; 5309 struct ceph_osd_client *osdc = o->o_osdc; 5310 struct ceph_auth_client *ac = osdc->client->monc.auth; 5311 struct ceph_auth_handshake *auth = &o->o_auth; 5312 5313 if (force_new && auth->authorizer) { 5314 ceph_auth_destroy_authorizer(auth->authorizer); 5315 auth->authorizer = NULL; 5316 } 5317 if (!auth->authorizer) { 5318 int ret = ceph_auth_create_authorizer(ac, CEPH_ENTITY_TYPE_OSD, 5319 auth); 5320 if (ret) 5321 return ERR_PTR(ret); 5322 } else { 5323 int ret = ceph_auth_update_authorizer(ac, CEPH_ENTITY_TYPE_OSD, 5324 auth); 5325 if (ret) 5326 return ERR_PTR(ret); 5327 } 5328 *proto = ac->protocol; 5329 5330 return auth; 5331 } 5332 5333 5334 static int verify_authorizer_reply(struct ceph_connection *con) 5335 { 5336 struct ceph_osd *o = con->private; 5337 struct ceph_osd_client *osdc = o->o_osdc; 5338 struct ceph_auth_client *ac = osdc->client->monc.auth; 5339 5340 return ceph_auth_verify_authorizer_reply(ac, o->o_auth.authorizer); 5341 } 5342 5343 static int invalidate_authorizer(struct ceph_connection *con) 5344 { 5345 struct ceph_osd *o = con->private; 5346 struct ceph_osd_client *osdc = o->o_osdc; 5347 struct ceph_auth_client *ac = osdc->client->monc.auth; 5348 5349 ceph_auth_invalidate_authorizer(ac, CEPH_ENTITY_TYPE_OSD); 5350 return ceph_monc_validate_auth(&osdc->client->monc); 5351 } 5352 5353 static void osd_reencode_message(struct ceph_msg *msg) 5354 { 5355 int type = le16_to_cpu(msg->hdr.type); 5356 5357 if (type == CEPH_MSG_OSD_OP) 5358 encode_request_finish(msg); 5359 } 5360 5361 static int osd_sign_message(struct ceph_msg *msg) 5362 { 5363 struct ceph_osd *o = msg->con->private; 5364 struct ceph_auth_handshake *auth = &o->o_auth; 5365 5366 return ceph_auth_sign_message(auth, msg); 5367 } 5368 5369 static int osd_check_message_signature(struct ceph_msg *msg) 5370 { 5371 
struct ceph_osd *o = msg->con->private; 5372 struct ceph_auth_handshake *auth = &o->o_auth; 5373 5374 return ceph_auth_check_message_signature(auth, msg); 5375 } 5376 5377 static const struct ceph_connection_operations osd_con_ops = { 5378 .get = get_osd_con, 5379 .put = put_osd_con, 5380 .dispatch = dispatch, 5381 .get_authorizer = get_authorizer, 5382 .verify_authorizer_reply = verify_authorizer_reply, 5383 .invalidate_authorizer = invalidate_authorizer, 5384 .alloc_msg = alloc_msg, 5385 .reencode_message = osd_reencode_message, 5386 .sign_message = osd_sign_message, 5387 .check_message_signature = osd_check_message_signature, 5388 .fault = osd_fault, 5389 }; 5390