1 // SPDX-License-Identifier: GPL-2.0 2 3 #include <linux/ceph/ceph_debug.h> 4 5 #include <linux/module.h> 6 #include <linux/slab.h> 7 8 #include <linux/ceph/libceph.h> 9 #include <linux/ceph/osdmap.h> 10 #include <linux/ceph/decode.h> 11 #include <linux/crush/hash.h> 12 #include <linux/crush/mapper.h> 13 14 static __printf(2, 3) 15 void osdmap_info(const struct ceph_osdmap *map, const char *fmt, ...) 16 { 17 struct va_format vaf; 18 va_list args; 19 20 va_start(args, fmt); 21 vaf.fmt = fmt; 22 vaf.va = &args; 23 24 printk(KERN_INFO "%s (%pU e%u): %pV", KBUILD_MODNAME, &map->fsid, 25 map->epoch, &vaf); 26 27 va_end(args); 28 } 29 30 char *ceph_osdmap_state_str(char *str, int len, u32 state) 31 { 32 if (!len) 33 return str; 34 35 if ((state & CEPH_OSD_EXISTS) && (state & CEPH_OSD_UP)) 36 snprintf(str, len, "exists, up"); 37 else if (state & CEPH_OSD_EXISTS) 38 snprintf(str, len, "exists"); 39 else if (state & CEPH_OSD_UP) 40 snprintf(str, len, "up"); 41 else 42 snprintf(str, len, "doesn't exist"); 43 44 return str; 45 } 46 47 /* maps */ 48 49 static int calc_bits_of(unsigned int t) 50 { 51 int b = 0; 52 while (t) { 53 t = t >> 1; 54 b++; 55 } 56 return b; 57 } 58 59 /* 60 * the foo_mask is the smallest value 2^n-1 that is >= foo. 
61 */ 62 static void calc_pg_masks(struct ceph_pg_pool_info *pi) 63 { 64 pi->pg_num_mask = (1 << calc_bits_of(pi->pg_num-1)) - 1; 65 pi->pgp_num_mask = (1 << calc_bits_of(pi->pgp_num-1)) - 1; 66 } 67 68 /* 69 * decode crush map 70 */ 71 static int crush_decode_uniform_bucket(void **p, void *end, 72 struct crush_bucket_uniform *b) 73 { 74 dout("crush_decode_uniform_bucket %p to %p\n", *p, end); 75 ceph_decode_32_safe(p, end, b->item_weight, bad); 76 return 0; 77 bad: 78 return -EINVAL; 79 } 80 81 static int crush_decode_list_bucket(void **p, void *end, 82 struct crush_bucket_list *b) 83 { 84 int j; 85 dout("crush_decode_list_bucket %p to %p\n", *p, end); 86 b->item_weights = kcalloc(b->h.size, sizeof(u32), GFP_NOFS); 87 if (b->item_weights == NULL) 88 return -ENOMEM; 89 b->sum_weights = kcalloc(b->h.size, sizeof(u32), GFP_NOFS); 90 if (b->sum_weights == NULL) 91 return -ENOMEM; 92 ceph_decode_need(p, end, 2 * b->h.size * sizeof(u32), bad); 93 for (j = 0; j < b->h.size; j++) { 94 b->item_weights[j] = ceph_decode_32(p); 95 b->sum_weights[j] = ceph_decode_32(p); 96 } 97 return 0; 98 bad: 99 return -EINVAL; 100 } 101 102 static int crush_decode_tree_bucket(void **p, void *end, 103 struct crush_bucket_tree *b) 104 { 105 int j; 106 dout("crush_decode_tree_bucket %p to %p\n", *p, end); 107 ceph_decode_8_safe(p, end, b->num_nodes, bad); 108 b->node_weights = kcalloc(b->num_nodes, sizeof(u32), GFP_NOFS); 109 if (b->node_weights == NULL) 110 return -ENOMEM; 111 ceph_decode_need(p, end, b->num_nodes * sizeof(u32), bad); 112 for (j = 0; j < b->num_nodes; j++) 113 b->node_weights[j] = ceph_decode_32(p); 114 return 0; 115 bad: 116 return -EINVAL; 117 } 118 119 static int crush_decode_straw_bucket(void **p, void *end, 120 struct crush_bucket_straw *b) 121 { 122 int j; 123 dout("crush_decode_straw_bucket %p to %p\n", *p, end); 124 b->item_weights = kcalloc(b->h.size, sizeof(u32), GFP_NOFS); 125 if (b->item_weights == NULL) 126 return -ENOMEM; 127 b->straws = kcalloc(b->h.size, 
sizeof(u32), GFP_NOFS); 128 if (b->straws == NULL) 129 return -ENOMEM; 130 ceph_decode_need(p, end, 2 * b->h.size * sizeof(u32), bad); 131 for (j = 0; j < b->h.size; j++) { 132 b->item_weights[j] = ceph_decode_32(p); 133 b->straws[j] = ceph_decode_32(p); 134 } 135 return 0; 136 bad: 137 return -EINVAL; 138 } 139 140 static int crush_decode_straw2_bucket(void **p, void *end, 141 struct crush_bucket_straw2 *b) 142 { 143 int j; 144 dout("crush_decode_straw2_bucket %p to %p\n", *p, end); 145 b->item_weights = kcalloc(b->h.size, sizeof(u32), GFP_NOFS); 146 if (b->item_weights == NULL) 147 return -ENOMEM; 148 ceph_decode_need(p, end, b->h.size * sizeof(u32), bad); 149 for (j = 0; j < b->h.size; j++) 150 b->item_weights[j] = ceph_decode_32(p); 151 return 0; 152 bad: 153 return -EINVAL; 154 } 155 156 struct crush_name_node { 157 struct rb_node cn_node; 158 int cn_id; 159 char cn_name[]; 160 }; 161 162 static struct crush_name_node *alloc_crush_name(size_t name_len) 163 { 164 struct crush_name_node *cn; 165 166 cn = kmalloc(sizeof(*cn) + name_len + 1, GFP_NOIO); 167 if (!cn) 168 return NULL; 169 170 RB_CLEAR_NODE(&cn->cn_node); 171 return cn; 172 } 173 174 static void free_crush_name(struct crush_name_node *cn) 175 { 176 WARN_ON(!RB_EMPTY_NODE(&cn->cn_node)); 177 178 kfree(cn); 179 } 180 181 DEFINE_RB_FUNCS(crush_name, struct crush_name_node, cn_id, cn_node) 182 183 static int decode_crush_names(void **p, void *end, struct rb_root *root) 184 { 185 u32 n; 186 187 ceph_decode_32_safe(p, end, n, e_inval); 188 while (n--) { 189 struct crush_name_node *cn; 190 int id; 191 u32 name_len; 192 193 ceph_decode_32_safe(p, end, id, e_inval); 194 ceph_decode_32_safe(p, end, name_len, e_inval); 195 ceph_decode_need(p, end, name_len, e_inval); 196 197 cn = alloc_crush_name(name_len); 198 if (!cn) 199 return -ENOMEM; 200 201 cn->cn_id = id; 202 memcpy(cn->cn_name, *p, name_len); 203 cn->cn_name[name_len] = '\0'; 204 *p += name_len; 205 206 if (!__insert_crush_name(root, cn)) { 207 
free_crush_name(cn); 208 return -EEXIST; 209 } 210 } 211 212 return 0; 213 214 e_inval: 215 return -EINVAL; 216 } 217 218 void clear_crush_names(struct rb_root *root) 219 { 220 while (!RB_EMPTY_ROOT(root)) { 221 struct crush_name_node *cn = 222 rb_entry(rb_first(root), struct crush_name_node, cn_node); 223 224 erase_crush_name(root, cn); 225 free_crush_name(cn); 226 } 227 } 228 229 static struct crush_choose_arg_map *alloc_choose_arg_map(void) 230 { 231 struct crush_choose_arg_map *arg_map; 232 233 arg_map = kzalloc_obj(*arg_map, GFP_NOIO); 234 if (!arg_map) 235 return NULL; 236 237 RB_CLEAR_NODE(&arg_map->node); 238 return arg_map; 239 } 240 241 static void free_choose_arg_map(struct crush_choose_arg_map *arg_map) 242 { 243 int i, j; 244 245 if (!arg_map) 246 return; 247 248 WARN_ON(!RB_EMPTY_NODE(&arg_map->node)); 249 250 if (arg_map->args) { 251 for (i = 0; i < arg_map->size; i++) { 252 struct crush_choose_arg *arg = &arg_map->args[i]; 253 if (arg->weight_set) { 254 for (j = 0; j < arg->weight_set_size; j++) 255 kfree(arg->weight_set[j].weights); 256 kfree(arg->weight_set); 257 } 258 kfree(arg->ids); 259 } 260 kfree(arg_map->args); 261 } 262 kfree(arg_map); 263 } 264 265 DEFINE_RB_FUNCS(choose_arg_map, struct crush_choose_arg_map, choose_args_index, 266 node); 267 268 void clear_choose_args(struct crush_map *c) 269 { 270 while (!RB_EMPTY_ROOT(&c->choose_args)) { 271 struct crush_choose_arg_map *arg_map = 272 rb_entry(rb_first(&c->choose_args), 273 struct crush_choose_arg_map, node); 274 275 erase_choose_arg_map(&c->choose_args, arg_map); 276 free_choose_arg_map(arg_map); 277 } 278 } 279 280 static u32 *decode_array_32_alloc(void **p, void *end, u32 *plen) 281 { 282 u32 *a = NULL; 283 u32 len; 284 int ret; 285 286 ceph_decode_32_safe(p, end, len, e_inval); 287 if (len) { 288 u32 i; 289 290 a = kmalloc_array(len, sizeof(u32), GFP_NOIO); 291 if (!a) { 292 ret = -ENOMEM; 293 goto fail; 294 } 295 296 ceph_decode_need(p, end, len * sizeof(u32), e_inval); 297 for (i = 
0; i < len; i++) 298 a[i] = ceph_decode_32(p); 299 } 300 301 *plen = len; 302 return a; 303 304 e_inval: 305 ret = -EINVAL; 306 fail: 307 kfree(a); 308 return ERR_PTR(ret); 309 } 310 311 /* 312 * Assumes @arg is zero-initialized. 313 */ 314 static int decode_choose_arg(void **p, void *end, struct crush_choose_arg *arg) 315 { 316 int ret; 317 318 ceph_decode_32_safe(p, end, arg->weight_set_size, e_inval); 319 if (arg->weight_set_size) { 320 u32 i; 321 322 arg->weight_set = kmalloc_objs(*arg->weight_set, 323 arg->weight_set_size, GFP_NOIO); 324 if (!arg->weight_set) 325 return -ENOMEM; 326 327 for (i = 0; i < arg->weight_set_size; i++) { 328 struct crush_weight_set *w = &arg->weight_set[i]; 329 330 w->weights = decode_array_32_alloc(p, end, &w->size); 331 if (IS_ERR(w->weights)) { 332 ret = PTR_ERR(w->weights); 333 w->weights = NULL; 334 return ret; 335 } 336 } 337 } 338 339 arg->ids = decode_array_32_alloc(p, end, &arg->ids_size); 340 if (IS_ERR(arg->ids)) { 341 ret = PTR_ERR(arg->ids); 342 arg->ids = NULL; 343 return ret; 344 } 345 346 return 0; 347 348 e_inval: 349 return -EINVAL; 350 } 351 352 static int decode_choose_args(void **p, void *end, struct crush_map *c) 353 { 354 struct crush_choose_arg_map *arg_map = NULL; 355 u32 num_choose_arg_maps, num_buckets; 356 int ret; 357 358 ceph_decode_32_safe(p, end, num_choose_arg_maps, e_inval); 359 while (num_choose_arg_maps--) { 360 arg_map = alloc_choose_arg_map(); 361 if (!arg_map) { 362 ret = -ENOMEM; 363 goto fail; 364 } 365 366 ceph_decode_64_safe(p, end, arg_map->choose_args_index, 367 e_inval); 368 arg_map->size = c->max_buckets; 369 arg_map->args = kzalloc_objs(*arg_map->args, arg_map->size, 370 GFP_NOIO); 371 if (!arg_map->args) { 372 ret = -ENOMEM; 373 goto fail; 374 } 375 376 ceph_decode_32_safe(p, end, num_buckets, e_inval); 377 while (num_buckets--) { 378 struct crush_choose_arg *arg; 379 u32 bucket_index; 380 381 ceph_decode_32_safe(p, end, bucket_index, e_inval); 382 if (bucket_index >= arg_map->size) 
383 goto e_inval; 384 385 arg = &arg_map->args[bucket_index]; 386 ret = decode_choose_arg(p, end, arg); 387 if (ret) 388 goto fail; 389 390 if (arg->ids_size && 391 (!c->buckets[bucket_index] || 392 arg->ids_size != c->buckets[bucket_index]->size)) 393 goto e_inval; 394 } 395 396 if (!__insert_choose_arg_map(&c->choose_args, arg_map)) { 397 ret = -EEXIST; 398 goto fail; 399 } 400 } 401 402 return 0; 403 404 e_inval: 405 ret = -EINVAL; 406 fail: 407 free_choose_arg_map(arg_map); 408 return ret; 409 } 410 411 static void crush_finalize(struct crush_map *c) 412 { 413 __s32 b; 414 415 /* Space for the array of pointers to per-bucket workspace */ 416 c->working_size = sizeof(struct crush_work) + 417 c->max_buckets * sizeof(struct crush_work_bucket *); 418 419 for (b = 0; b < c->max_buckets; b++) { 420 if (!c->buckets[b]) 421 continue; 422 423 switch (c->buckets[b]->alg) { 424 default: 425 /* 426 * The base case, permutation variables and 427 * the pointer to the permutation array. 428 */ 429 c->working_size += sizeof(struct crush_work_bucket); 430 break; 431 } 432 /* Every bucket has a permutation array. 
*/ 433 c->working_size += c->buckets[b]->size * sizeof(__u32); 434 } 435 } 436 437 static struct crush_map *crush_decode(void *pbyval, void *end) 438 { 439 struct crush_map *c; 440 int err; 441 int i, j; 442 void **p = &pbyval; 443 void *start = pbyval; 444 u32 magic; 445 446 dout("crush_decode %p to %p len %d\n", *p, end, (int)(end - *p)); 447 448 c = kzalloc_obj(*c, GFP_NOFS); 449 if (c == NULL) 450 return ERR_PTR(-ENOMEM); 451 452 c->type_names = RB_ROOT; 453 c->names = RB_ROOT; 454 c->choose_args = RB_ROOT; 455 456 /* set tunables to default values */ 457 c->choose_local_tries = 2; 458 c->choose_local_fallback_tries = 5; 459 c->choose_total_tries = 19; 460 c->chooseleaf_descend_once = 0; 461 462 ceph_decode_need(p, end, 4*sizeof(u32), bad); 463 magic = ceph_decode_32(p); 464 if (magic != CRUSH_MAGIC) { 465 pr_err("crush_decode magic %x != current %x\n", 466 (unsigned int)magic, (unsigned int)CRUSH_MAGIC); 467 goto bad; 468 } 469 c->max_buckets = ceph_decode_32(p); 470 c->max_rules = ceph_decode_32(p); 471 c->max_devices = ceph_decode_32(p); 472 473 c->buckets = kzalloc_objs(*c->buckets, c->max_buckets, GFP_NOFS); 474 if (c->buckets == NULL) 475 goto badmem; 476 c->rules = kzalloc_objs(*c->rules, c->max_rules, GFP_NOFS); 477 if (c->rules == NULL) 478 goto badmem; 479 480 /* buckets */ 481 for (i = 0; i < c->max_buckets; i++) { 482 int size = 0; 483 u32 alg; 484 struct crush_bucket *b; 485 486 ceph_decode_32_safe(p, end, alg, bad); 487 if (alg == 0) { 488 c->buckets[i] = NULL; 489 continue; 490 } 491 dout("crush_decode bucket %d off %x %p to %p\n", 492 i, (int)(*p-start), *p, end); 493 494 switch (alg) { 495 case CRUSH_BUCKET_UNIFORM: 496 size = sizeof(struct crush_bucket_uniform); 497 break; 498 case CRUSH_BUCKET_LIST: 499 size = sizeof(struct crush_bucket_list); 500 break; 501 case CRUSH_BUCKET_TREE: 502 size = sizeof(struct crush_bucket_tree); 503 break; 504 case CRUSH_BUCKET_STRAW: 505 size = sizeof(struct crush_bucket_straw); 506 break; 507 case 
CRUSH_BUCKET_STRAW2: 508 size = sizeof(struct crush_bucket_straw2); 509 break; 510 default: 511 goto bad; 512 } 513 BUG_ON(size == 0); 514 b = c->buckets[i] = kzalloc(size, GFP_NOFS); 515 if (b == NULL) 516 goto badmem; 517 518 ceph_decode_need(p, end, 4*sizeof(u32), bad); 519 b->id = ceph_decode_32(p); 520 b->type = ceph_decode_16(p); 521 b->alg = ceph_decode_8(p); 522 if (b->alg != alg) { 523 b->alg = 0; 524 goto bad; 525 } 526 b->hash = ceph_decode_8(p); 527 b->weight = ceph_decode_32(p); 528 b->size = ceph_decode_32(p); 529 530 dout("crush_decode bucket size %d off %x %p to %p\n", 531 b->size, (int)(*p-start), *p, end); 532 533 b->items = kzalloc_objs(__s32, b->size, GFP_NOFS); 534 if (b->items == NULL) 535 goto badmem; 536 537 ceph_decode_need(p, end, b->size*sizeof(u32), bad); 538 for (j = 0; j < b->size; j++) 539 b->items[j] = ceph_decode_32(p); 540 541 switch (b->alg) { 542 case CRUSH_BUCKET_UNIFORM: 543 err = crush_decode_uniform_bucket(p, end, 544 (struct crush_bucket_uniform *)b); 545 if (err < 0) 546 goto fail; 547 break; 548 case CRUSH_BUCKET_LIST: 549 err = crush_decode_list_bucket(p, end, 550 (struct crush_bucket_list *)b); 551 if (err < 0) 552 goto fail; 553 break; 554 case CRUSH_BUCKET_TREE: 555 err = crush_decode_tree_bucket(p, end, 556 (struct crush_bucket_tree *)b); 557 if (err < 0) 558 goto fail; 559 break; 560 case CRUSH_BUCKET_STRAW: 561 err = crush_decode_straw_bucket(p, end, 562 (struct crush_bucket_straw *)b); 563 if (err < 0) 564 goto fail; 565 break; 566 case CRUSH_BUCKET_STRAW2: 567 err = crush_decode_straw2_bucket(p, end, 568 (struct crush_bucket_straw2 *)b); 569 if (err < 0) 570 goto fail; 571 break; 572 } 573 } 574 575 /* rules */ 576 dout("rule vec is %p\n", c->rules); 577 for (i = 0; i < c->max_rules; i++) { 578 u32 yes; 579 struct crush_rule *r; 580 581 ceph_decode_32_safe(p, end, yes, bad); 582 if (!yes) { 583 dout("crush_decode NO rule %d off %x %p to %p\n", 584 i, (int)(*p-start), *p, end); 585 c->rules[i] = NULL; 586 continue; 
587 } 588 589 dout("crush_decode rule %d off %x %p to %p\n", 590 i, (int)(*p-start), *p, end); 591 592 /* len */ 593 ceph_decode_32_safe(p, end, yes, bad); 594 #if BITS_PER_LONG == 32 595 if (yes > (ULONG_MAX - sizeof(*r)) 596 / sizeof(struct crush_rule_step)) 597 goto bad; 598 #endif 599 r = kmalloc_flex(*r, steps, yes, GFP_NOFS); 600 if (r == NULL) 601 goto badmem; 602 dout(" rule %d is at %p\n", i, r); 603 c->rules[i] = r; 604 r->len = yes; 605 ceph_decode_copy_safe(p, end, &r->mask, 4, bad); /* 4 u8's */ 606 ceph_decode_need(p, end, r->len*3*sizeof(u32), bad); 607 for (j = 0; j < r->len; j++) { 608 r->steps[j].op = ceph_decode_32(p); 609 r->steps[j].arg1 = ceph_decode_32(p); 610 r->steps[j].arg2 = ceph_decode_32(p); 611 } 612 } 613 614 err = decode_crush_names(p, end, &c->type_names); 615 if (err) 616 goto fail; 617 618 err = decode_crush_names(p, end, &c->names); 619 if (err) 620 goto fail; 621 622 ceph_decode_skip_map(p, end, 32, string, bad); /* rule_name_map */ 623 624 /* tunables */ 625 ceph_decode_need(p, end, 3*sizeof(u32), done); 626 c->choose_local_tries = ceph_decode_32(p); 627 c->choose_local_fallback_tries = ceph_decode_32(p); 628 c->choose_total_tries = ceph_decode_32(p); 629 dout("crush decode tunable choose_local_tries = %d\n", 630 c->choose_local_tries); 631 dout("crush decode tunable choose_local_fallback_tries = %d\n", 632 c->choose_local_fallback_tries); 633 dout("crush decode tunable choose_total_tries = %d\n", 634 c->choose_total_tries); 635 636 ceph_decode_need(p, end, sizeof(u32), done); 637 c->chooseleaf_descend_once = ceph_decode_32(p); 638 dout("crush decode tunable chooseleaf_descend_once = %d\n", 639 c->chooseleaf_descend_once); 640 641 ceph_decode_need(p, end, sizeof(u8), done); 642 c->chooseleaf_vary_r = ceph_decode_8(p); 643 dout("crush decode tunable chooseleaf_vary_r = %d\n", 644 c->chooseleaf_vary_r); 645 646 /* skip straw_calc_version, allowed_bucket_algs */ 647 ceph_decode_need(p, end, sizeof(u8) + sizeof(u32), done); 648 *p 
+= sizeof(u8) + sizeof(u32); 649 650 ceph_decode_need(p, end, sizeof(u8), done); 651 c->chooseleaf_stable = ceph_decode_8(p); 652 dout("crush decode tunable chooseleaf_stable = %d\n", 653 c->chooseleaf_stable); 654 655 if (*p != end) { 656 /* class_map */ 657 ceph_decode_skip_map(p, end, 32, 32, bad); 658 /* class_name */ 659 ceph_decode_skip_map(p, end, 32, string, bad); 660 /* class_bucket */ 661 ceph_decode_skip_map_of_map(p, end, 32, 32, 32, bad); 662 } 663 664 if (*p != end) { 665 err = decode_choose_args(p, end, c); 666 if (err) 667 goto fail; 668 } 669 670 done: 671 crush_finalize(c); 672 dout("crush_decode success\n"); 673 return c; 674 675 badmem: 676 err = -ENOMEM; 677 fail: 678 dout("crush_decode fail %d\n", err); 679 crush_destroy(c); 680 return ERR_PTR(err); 681 682 bad: 683 err = -EINVAL; 684 goto fail; 685 } 686 687 int ceph_pg_compare(const struct ceph_pg *lhs, const struct ceph_pg *rhs) 688 { 689 if (lhs->pool < rhs->pool) 690 return -1; 691 if (lhs->pool > rhs->pool) 692 return 1; 693 if (lhs->seed < rhs->seed) 694 return -1; 695 if (lhs->seed > rhs->seed) 696 return 1; 697 698 return 0; 699 } 700 701 int ceph_spg_compare(const struct ceph_spg *lhs, const struct ceph_spg *rhs) 702 { 703 int ret; 704 705 ret = ceph_pg_compare(&lhs->pgid, &rhs->pgid); 706 if (ret) 707 return ret; 708 709 if (lhs->shard < rhs->shard) 710 return -1; 711 if (lhs->shard > rhs->shard) 712 return 1; 713 714 return 0; 715 } 716 717 static struct ceph_pg_mapping *alloc_pg_mapping(size_t payload_len) 718 { 719 struct ceph_pg_mapping *pg; 720 721 pg = kmalloc(sizeof(*pg) + payload_len, GFP_NOIO); 722 if (!pg) 723 return NULL; 724 725 RB_CLEAR_NODE(&pg->node); 726 return pg; 727 } 728 729 static void free_pg_mapping(struct ceph_pg_mapping *pg) 730 { 731 WARN_ON(!RB_EMPTY_NODE(&pg->node)); 732 733 kfree(pg); 734 } 735 736 /* 737 * rbtree of pg_mapping for handling pg_temp (explicit mapping of pgid 738 * to a set of osds) and primary_temp (explicit primary setting) 739 */ 740 
DEFINE_RB_FUNCS2(pg_mapping, struct ceph_pg_mapping, pgid, ceph_pg_compare,
		 RB_BYPTR, const struct ceph_pg *, node)

/*
 * rbtree of pg pool info
 */
DEFINE_RB_FUNCS(pg_pool, struct ceph_pg_pool_info, id, node)

/* Look up the pool with the given @id in @map; NULL if not present. */
struct ceph_pg_pool_info *ceph_pg_pool_by_id(struct ceph_osdmap *map, u64 id)
{
	return lookup_pg_pool(&map->pg_pools, id);
}

/*
 * Return the name of pool @id, or NULL if @id is CEPH_NOPOOL, exceeds
 * INT_MAX (warns once), or names no pool in @map.  The returned string
 * is the pool's own ->name, not a copy.
 */
const char *ceph_pg_pool_name_by_id(struct ceph_osdmap *map, u64 id)
{
	struct ceph_pg_pool_info *pi;

	if (id == CEPH_NOPOOL)
		return NULL;

	if (WARN_ON_ONCE(id > (u64) INT_MAX))
		return NULL;

	pi = lookup_pg_pool(&map->pg_pools, id);
	return pi ? pi->name : NULL;
}
EXPORT_SYMBOL(ceph_pg_pool_name_by_id);

/*
 * Linear search of the pool tree for a pool called @name.  Returns
 * the pool id, or -ENOENT if no named pool matches.
 */
int ceph_pg_poolid_by_name(struct ceph_osdmap *map, const char *name)
{
	struct rb_node *rbp;

	for (rbp = rb_first(&map->pg_pools); rbp; rbp = rb_next(rbp)) {
		struct ceph_pg_pool_info *pi =
			rb_entry(rbp, struct ceph_pg_pool_info, node);
		if (pi->name && strcmp(pi->name, name) == 0)
			return pi->id;
	}
	return -ENOENT;
}
EXPORT_SYMBOL(ceph_pg_poolid_by_name);

/* Return the flags of pool @id, or 0 if there is no such pool. */
u64 ceph_pg_pool_flags(struct ceph_osdmap *map, u64 id)
{
	struct ceph_pg_pool_info *pi;

	pi = lookup_pg_pool(&map->pg_pools, id);
	return pi ? pi->flags : 0;
}
EXPORT_SYMBOL(ceph_pg_pool_flags);

/* Unlink @pi from @root and free it together with its name. */
static void __remove_pg_pool(struct rb_root *root, struct ceph_pg_pool_info *pi)
{
	erase_pg_pool(root, pi);
	kfree(pi->name);
	kfree(pi);
}

/*
 * Decode one versioned ceph_pg_pool encoding into @pi.
 *
 * The encoding starts with an encoding version (ev), a compat version
 * (cv) and a 32-bit payload length.  ev < 5 and cv > 9 are rejected.
 * Fields are consumed strictly in wire order; those the client does
 * not use are skipped, and fields added in later versions are only
 * read when ev is high enough.  Individual reads are bounds-checked
 * against @end (not the payload end); once decoding is done *p is
 * forced to the payload end so that unknown trailing fields are
 * ignored.
 *
 * Returns 0 (with the pg masks recomputed) or -EINVAL.
 */
static int decode_pool(void **p, void *end, struct ceph_pg_pool_info *pi)
{
	u8 ev, cv;
	unsigned len, num;
	void *pool_end;

	ceph_decode_need(p, end, 2 + 4, bad);
	ev = ceph_decode_8(p);  /* encoding version */
	cv = ceph_decode_8(p); /* compat version */
	if (ev < 5) {
		pr_warn("got v %d < 5 cv %d of ceph_pg_pool\n", ev, cv);
		return -EINVAL;
	}
	if (cv > 9) {
		pr_warn("got v %d cv %d > 9 of ceph_pg_pool\n", ev, cv);
		return -EINVAL;
	}
	len = ceph_decode_32(p);
	ceph_decode_need(p, end, len, bad);
	pool_end = *p + len;

	ceph_decode_need(p, end, 4 + 4 + 4, bad);
	pi->type = ceph_decode_8(p);
	pi->size = ceph_decode_8(p);
	pi->crush_ruleset = ceph_decode_8(p);
	pi->object_hash = ceph_decode_8(p);
	pi->pg_num = ceph_decode_32(p);
	pi->pgp_num = ceph_decode_32(p);

	/* lpg*, last_change, snap_seq, snap_epoch */
	ceph_decode_skip_n(p, end, 8 + 4 + 8 + 4, bad);

	/* skip snaps */
	ceph_decode_32_safe(p, end, num, bad);
	while (num--) {
		/* snapid key, pool snap (with versions) */
		ceph_decode_skip_n(p, end, 8 + 2, bad);
		ceph_decode_skip_string(p, end, bad);
	}

	/* removed_snaps */
	ceph_decode_skip_map(p, end, 64, 64, bad);

	ceph_decode_need(p, end, 8 + 8 + 4, bad);
	*p += 8; /* skip auid */
	pi->flags = ceph_decode_64(p);
	*p += 4; /* skip crash_replay_interval */

	/* min_size is on the wire from v7; older encodings derive it from size */
	if (ev >= 7)
		ceph_decode_8_safe(p, end, pi->min_size, bad);
	else
		pi->min_size = pi->size - pi->size / 2;

	if (ev >= 8)
		/* quota_max_* */
		ceph_decode_skip_n(p, end, 8 + 8, bad);

	if (ev >= 9) {
		/* tiers */
		ceph_decode_skip_set(p, end, 64, bad);

		ceph_decode_need(p, end, 8 + 1 + 8 + 8, bad);
		*p += 8; /* skip tier_of */
		*p += 1; /* skip cache_mode */
		pi->read_tier = ceph_decode_64(p);
		pi->write_tier = ceph_decode_64(p);
	} else {
		pi->read_tier = -1;
		pi->write_tier = -1;
	}

	if (ev >= 10)
		/* properties */
		ceph_decode_skip_map(p, end, string, string, bad);

	if (ev >= 11) {
		/* hit_set_params (with versions) */
		ceph_decode_skip_n(p, end, 2, bad);
		ceph_decode_skip_string(p, end, bad);

		/* hit_set_period, hit_set_count */
		ceph_decode_skip_n(p, end, 4 + 4, bad);
	}

	if (ev >= 12)
		/* stripe_width */
		ceph_decode_skip_32(p, end, bad);

	if (ev >= 13)
		/* target_max_*, cache_target_*, cache_min_* */
		ceph_decode_skip_n(p, end, 16 + 8 + 8, bad);

	if (ev >= 14)
		/* erasure_code_profile */
		ceph_decode_skip_string(p, end, bad);

	/*
	 * last_force_op_resend_preluminous, will be overridden if the
	 * map was encoded with RESEND_ON_SPLIT
	 */
	if (ev >= 15)
		ceph_decode_32_safe(p, end, pi->last_force_request_resend, bad);
	else
		pi->last_force_request_resend = 0;

	if (ev >= 16)
		/* min_read_recency_for_promote */
		ceph_decode_skip_32(p, end, bad);

	if (ev >= 17)
		/* expected_num_objects */
		ceph_decode_skip_64(p, end, bad);

	if (ev >= 19)
		/* cache_target_dirty_high_ratio_micro */
		ceph_decode_skip_32(p, end, bad);

	if (ev >= 20)
		/* min_write_recency_for_promote */
		ceph_decode_skip_32(p, end, bad);

	if (ev >= 21)
		/* use_gmt_hitset */
		ceph_decode_skip_8(p, end, bad);

	if (ev >= 22)
		/* fast_read */
		ceph_decode_skip_8(p, end, bad);

	if (ev >= 23)
		/* hit_set_grade_decay_rate, hit_set_search_last_n */
		ceph_decode_skip_n(p, end, 4 + 4, bad);

	if (ev >= 24) {
		/* opts (with versions) */
		ceph_decode_skip_n(p, end, 2, bad);
		ceph_decode_skip_string(p, end, bad);
	}

	/* from v25 this value replaces the one read at v15 above */
	if (ev >= 25)
		ceph_decode_32_safe(p, end, pi->last_force_request_resend, bad);

	/* ignore the rest */

	*p = pool_end;
	calc_pg_masks(pi);
	return 0;

bad:
	return -EINVAL;
}

/*
 * Decode a counted list of (64-bit pool id, length-prefixed name)
 * entries and attach each name to the matching pool in @map,
 * replacing any previous name.  Entries for unknown pools are
 * skipped.
 *
 * Returns 0, -EINVAL on a short buffer, or -ENOMEM.
 */
static int decode_pool_names(void **p, void *end, struct ceph_osdmap *map)
{
	struct ceph_pg_pool_info *pi;
	u32 num, len;
	u64 pool;

	ceph_decode_32_safe(p, end, num, bad);
	dout(" %d pool names\n", num);
	while (num--) {
		ceph_decode_64_safe(p, end, pool, bad);
		ceph_decode_32_safe(p, end, len, bad);
		dout(" pool %llu len %d\n", pool, len);
		ceph_decode_need(p, end, len, bad);
		pi = lookup_pg_pool(&map->pg_pools, pool);
		if (pi) {
			/* duplicate first so the old name survives -ENOMEM */
			char *name = kstrndup(*p, len, GFP_NOFS);

			if (!name)
				return -ENOMEM;
			kfree(pi->name);
			pi->name = name;
			dout(" name is %s\n", pi->name);
		}
		*p += len;
	}
	return 0;

bad:
	return -EINVAL;
}

/*
 * CRUSH workspaces
 *
 * workspace_manager framework borrowed from fs/btrfs/compression.c.
 * Two simplifications: there is only one type of workspace and there
 * is always at least one workspace.
987 */ 988 static struct crush_work *alloc_workspace(const struct crush_map *c) 989 { 990 struct crush_work *work; 991 size_t work_size; 992 993 WARN_ON(!c->working_size); 994 work_size = crush_work_size(c, CEPH_PG_MAX_SIZE); 995 dout("%s work_size %zu bytes\n", __func__, work_size); 996 997 work = kvmalloc(work_size, GFP_NOIO); 998 if (!work) 999 return NULL; 1000 1001 INIT_LIST_HEAD(&work->item); 1002 crush_init_workspace(c, work); 1003 return work; 1004 } 1005 1006 static void free_workspace(struct crush_work *work) 1007 { 1008 WARN_ON(!list_empty(&work->item)); 1009 kvfree(work); 1010 } 1011 1012 static void init_workspace_manager(struct workspace_manager *wsm) 1013 { 1014 INIT_LIST_HEAD(&wsm->idle_ws); 1015 spin_lock_init(&wsm->ws_lock); 1016 atomic_set(&wsm->total_ws, 0); 1017 wsm->free_ws = 0; 1018 init_waitqueue_head(&wsm->ws_wait); 1019 } 1020 1021 static void add_initial_workspace(struct workspace_manager *wsm, 1022 struct crush_work *work) 1023 { 1024 WARN_ON(!list_empty(&wsm->idle_ws)); 1025 1026 list_add(&work->item, &wsm->idle_ws); 1027 atomic_set(&wsm->total_ws, 1); 1028 wsm->free_ws = 1; 1029 } 1030 1031 static void cleanup_workspace_manager(struct workspace_manager *wsm) 1032 { 1033 struct crush_work *work; 1034 1035 while (!list_empty(&wsm->idle_ws)) { 1036 work = list_first_entry(&wsm->idle_ws, struct crush_work, 1037 item); 1038 list_del_init(&work->item); 1039 free_workspace(work); 1040 } 1041 atomic_set(&wsm->total_ws, 0); 1042 wsm->free_ws = 0; 1043 } 1044 1045 /* 1046 * Finds an available workspace or allocates a new one. If it's not 1047 * possible to allocate a new one, waits until there is one. 
*/
static struct crush_work *get_workspace(struct workspace_manager *wsm,
					const struct crush_map *c)
{
	struct crush_work *work;
	int cpus = num_online_cpus();

again:
	spin_lock(&wsm->ws_lock);
	/* fast path: reuse an idle workspace */
	if (!list_empty(&wsm->idle_ws)) {
		work = list_first_entry(&wsm->idle_ws, struct crush_work,
					item);
		list_del_init(&work->item);
		wsm->free_ws--;
		spin_unlock(&wsm->ws_lock);
		return work;

	}
	/* more workspaces than online CPUs already exist: wait for one */
	if (atomic_read(&wsm->total_ws) > cpus) {
		DEFINE_WAIT(wait);

		spin_unlock(&wsm->ws_lock);
		prepare_to_wait(&wsm->ws_wait, &wait, TASK_UNINTERRUPTIBLE);
		/*
		 * Re-check after queueing on ws_wait so a workspace put
		 * back in the meantime is not slept through; free_ws is
		 * read here without ws_lock held.
		 */
		if (atomic_read(&wsm->total_ws) > cpus && !wsm->free_ws)
			schedule();
		finish_wait(&wsm->ws_wait, &wait);
		goto again;
	}
	/* account for the new workspace before dropping the lock */
	atomic_inc(&wsm->total_ws);
	spin_unlock(&wsm->ws_lock);

	work = alloc_workspace(c);
	if (!work) {
		/* undo the accounting and let other waiters re-evaluate */
		atomic_dec(&wsm->total_ws);
		wake_up(&wsm->ws_wait);

		/*
		 * Do not return the error but go back to waiting.  We
		 * have the initial workspace and the CRUSH computation
		 * time is bounded so we will get it eventually.
		 */
		WARN_ON(atomic_read(&wsm->total_ws) < 1);
		goto again;
	}
	return work;
}

/*
 * Puts a workspace back on the list or frees it if we have enough
 * idle ones sitting around.
1098 */ 1099 static void put_workspace(struct workspace_manager *wsm, 1100 struct crush_work *work) 1101 { 1102 spin_lock(&wsm->ws_lock); 1103 if (wsm->free_ws <= num_online_cpus()) { 1104 list_add(&work->item, &wsm->idle_ws); 1105 wsm->free_ws++; 1106 spin_unlock(&wsm->ws_lock); 1107 goto wake; 1108 } 1109 spin_unlock(&wsm->ws_lock); 1110 1111 free_workspace(work); 1112 atomic_dec(&wsm->total_ws); 1113 wake: 1114 if (wq_has_sleeper(&wsm->ws_wait)) 1115 wake_up(&wsm->ws_wait); 1116 } 1117 1118 /* 1119 * osd map 1120 */ 1121 struct ceph_osdmap *ceph_osdmap_alloc(void) 1122 { 1123 struct ceph_osdmap *map; 1124 1125 map = kzalloc_obj(*map, GFP_NOIO); 1126 if (!map) 1127 return NULL; 1128 1129 map->pg_pools = RB_ROOT; 1130 map->pool_max = -1; 1131 map->pg_temp = RB_ROOT; 1132 map->primary_temp = RB_ROOT; 1133 map->pg_upmap = RB_ROOT; 1134 map->pg_upmap_items = RB_ROOT; 1135 1136 init_workspace_manager(&map->crush_wsm); 1137 1138 return map; 1139 } 1140 1141 void ceph_osdmap_destroy(struct ceph_osdmap *map) 1142 { 1143 dout("osdmap_destroy %p\n", map); 1144 1145 if (map->crush) 1146 crush_destroy(map->crush); 1147 cleanup_workspace_manager(&map->crush_wsm); 1148 1149 while (!RB_EMPTY_ROOT(&map->pg_temp)) { 1150 struct ceph_pg_mapping *pg = 1151 rb_entry(rb_first(&map->pg_temp), 1152 struct ceph_pg_mapping, node); 1153 erase_pg_mapping(&map->pg_temp, pg); 1154 free_pg_mapping(pg); 1155 } 1156 while (!RB_EMPTY_ROOT(&map->primary_temp)) { 1157 struct ceph_pg_mapping *pg = 1158 rb_entry(rb_first(&map->primary_temp), 1159 struct ceph_pg_mapping, node); 1160 erase_pg_mapping(&map->primary_temp, pg); 1161 free_pg_mapping(pg); 1162 } 1163 while (!RB_EMPTY_ROOT(&map->pg_upmap)) { 1164 struct ceph_pg_mapping *pg = 1165 rb_entry(rb_first(&map->pg_upmap), 1166 struct ceph_pg_mapping, node); 1167 rb_erase(&pg->node, &map->pg_upmap); 1168 kfree(pg); 1169 } 1170 while (!RB_EMPTY_ROOT(&map->pg_upmap_items)) { 1171 struct ceph_pg_mapping *pg = 1172 
rb_entry(rb_first(&map->pg_upmap_items), 1173 struct ceph_pg_mapping, node); 1174 rb_erase(&pg->node, &map->pg_upmap_items); 1175 kfree(pg); 1176 } 1177 while (!RB_EMPTY_ROOT(&map->pg_pools)) { 1178 struct ceph_pg_pool_info *pi = 1179 rb_entry(rb_first(&map->pg_pools), 1180 struct ceph_pg_pool_info, node); 1181 __remove_pg_pool(&map->pg_pools, pi); 1182 } 1183 kvfree(map->osd_state); 1184 kvfree(map->osd_weight); 1185 kvfree(map->osd_addr); 1186 kvfree(map->osd_primary_affinity); 1187 kfree(map); 1188 } 1189 1190 /* 1191 * Adjust max_osd value, (re)allocate arrays. 1192 * 1193 * The new elements are properly initialized. 1194 */ 1195 static int osdmap_set_max_osd(struct ceph_osdmap *map, u32 max) 1196 { 1197 u32 *state; 1198 u32 *weight; 1199 struct ceph_entity_addr *addr; 1200 u32 to_copy; 1201 int i; 1202 1203 dout("%s old %u new %u\n", __func__, map->max_osd, max); 1204 if (max == map->max_osd) 1205 return 0; 1206 1207 state = kvmalloc(array_size(max, sizeof(*state)), GFP_NOFS); 1208 weight = kvmalloc(array_size(max, sizeof(*weight)), GFP_NOFS); 1209 addr = kvmalloc(array_size(max, sizeof(*addr)), GFP_NOFS); 1210 if (!state || !weight || !addr) { 1211 kvfree(state); 1212 kvfree(weight); 1213 kvfree(addr); 1214 return -ENOMEM; 1215 } 1216 1217 to_copy = min(map->max_osd, max); 1218 if (map->osd_state) { 1219 memcpy(state, map->osd_state, to_copy * sizeof(*state)); 1220 memcpy(weight, map->osd_weight, to_copy * sizeof(*weight)); 1221 memcpy(addr, map->osd_addr, to_copy * sizeof(*addr)); 1222 kvfree(map->osd_state); 1223 kvfree(map->osd_weight); 1224 kvfree(map->osd_addr); 1225 } 1226 1227 map->osd_state = state; 1228 map->osd_weight = weight; 1229 map->osd_addr = addr; 1230 for (i = map->max_osd; i < max; i++) { 1231 map->osd_state[i] = 0; 1232 map->osd_weight[i] = CEPH_OSD_OUT; 1233 memset(map->osd_addr + i, 0, sizeof(*map->osd_addr)); 1234 } 1235 1236 if (map->osd_primary_affinity) { 1237 u32 *affinity; 1238 1239 affinity = kvmalloc(array_size(max, 
sizeof(*affinity)), 1240 GFP_NOFS); 1241 if (!affinity) 1242 return -ENOMEM; 1243 1244 memcpy(affinity, map->osd_primary_affinity, 1245 to_copy * sizeof(*affinity)); 1246 kvfree(map->osd_primary_affinity); 1247 1248 map->osd_primary_affinity = affinity; 1249 for (i = map->max_osd; i < max; i++) 1250 map->osd_primary_affinity[i] = 1251 CEPH_OSD_DEFAULT_PRIMARY_AFFINITY; 1252 } 1253 1254 map->max_osd = max; 1255 1256 return 0; 1257 } 1258 1259 static int osdmap_set_crush(struct ceph_osdmap *map, struct crush_map *crush) 1260 { 1261 struct crush_work *work; 1262 1263 if (IS_ERR(crush)) 1264 return PTR_ERR(crush); 1265 1266 work = alloc_workspace(crush); 1267 if (!work) { 1268 crush_destroy(crush); 1269 return -ENOMEM; 1270 } 1271 1272 if (map->crush) 1273 crush_destroy(map->crush); 1274 cleanup_workspace_manager(&map->crush_wsm); 1275 map->crush = crush; 1276 add_initial_workspace(&map->crush_wsm, work); 1277 return 0; 1278 } 1279 1280 #define OSDMAP_WRAPPER_COMPAT_VER 7 1281 #define OSDMAP_CLIENT_DATA_COMPAT_VER 1 1282 1283 /* 1284 * Return 0 or error. On success, *v is set to 0 for old (v6) osdmaps, 1285 * to struct_v of the client_data section for new (v7 and above) 1286 * osdmaps. 
1287 */ 1288 static int get_osdmap_client_data_v(void **p, void *end, 1289 const char *prefix, u8 *v) 1290 { 1291 u8 struct_v; 1292 1293 ceph_decode_8_safe(p, end, struct_v, e_inval); 1294 if (struct_v >= 7) { 1295 u8 struct_compat; 1296 1297 ceph_decode_8_safe(p, end, struct_compat, e_inval); 1298 if (struct_compat > OSDMAP_WRAPPER_COMPAT_VER) { 1299 pr_warn("got v %d cv %d > %d of %s ceph_osdmap\n", 1300 struct_v, struct_compat, 1301 OSDMAP_WRAPPER_COMPAT_VER, prefix); 1302 return -EINVAL; 1303 } 1304 *p += 4; /* ignore wrapper struct_len */ 1305 1306 ceph_decode_8_safe(p, end, struct_v, e_inval); 1307 ceph_decode_8_safe(p, end, struct_compat, e_inval); 1308 if (struct_compat > OSDMAP_CLIENT_DATA_COMPAT_VER) { 1309 pr_warn("got v %d cv %d > %d of %s ceph_osdmap client data\n", 1310 struct_v, struct_compat, 1311 OSDMAP_CLIENT_DATA_COMPAT_VER, prefix); 1312 return -EINVAL; 1313 } 1314 *p += 4; /* ignore client data struct_len */ 1315 } else { 1316 u16 version; 1317 1318 *p -= 1; 1319 ceph_decode_16_safe(p, end, version, e_inval); 1320 if (version < 6) { 1321 pr_warn("got v %d < 6 of %s ceph_osdmap\n", 1322 version, prefix); 1323 return -EINVAL; 1324 } 1325 1326 /* old osdmap encoding */ 1327 struct_v = 0; 1328 } 1329 1330 *v = struct_v; 1331 return 0; 1332 1333 e_inval: 1334 return -EINVAL; 1335 } 1336 1337 static int __decode_pools(void **p, void *end, struct ceph_osdmap *map, 1338 bool incremental) 1339 { 1340 u32 n; 1341 1342 ceph_decode_32_safe(p, end, n, e_inval); 1343 while (n--) { 1344 struct ceph_pg_pool_info *pi; 1345 u64 pool; 1346 int ret; 1347 1348 ceph_decode_64_safe(p, end, pool, e_inval); 1349 1350 pi = lookup_pg_pool(&map->pg_pools, pool); 1351 if (!incremental || !pi) { 1352 pi = kzalloc_obj(*pi, GFP_NOFS); 1353 if (!pi) 1354 return -ENOMEM; 1355 1356 RB_CLEAR_NODE(&pi->node); 1357 pi->id = pool; 1358 1359 if (!__insert_pg_pool(&map->pg_pools, pi)) { 1360 kfree(pi); 1361 return -EEXIST; 1362 } 1363 } 1364 1365 ret = decode_pool(p, end, pi); 1366 if 
(ret) 1367 return ret; 1368 } 1369 1370 return 0; 1371 1372 e_inval: 1373 return -EINVAL; 1374 } 1375 1376 static int decode_pools(void **p, void *end, struct ceph_osdmap *map) 1377 { 1378 return __decode_pools(p, end, map, false); 1379 } 1380 1381 static int decode_new_pools(void **p, void *end, struct ceph_osdmap *map) 1382 { 1383 return __decode_pools(p, end, map, true); 1384 } 1385 1386 typedef struct ceph_pg_mapping *(*decode_mapping_fn_t)(void **, void *, bool); 1387 1388 static int decode_pg_mapping(void **p, void *end, struct rb_root *mapping_root, 1389 decode_mapping_fn_t fn, bool incremental) 1390 { 1391 u32 n; 1392 1393 WARN_ON(!incremental && !fn); 1394 1395 ceph_decode_32_safe(p, end, n, e_inval); 1396 while (n--) { 1397 struct ceph_pg_mapping *pg; 1398 struct ceph_pg pgid; 1399 int ret; 1400 1401 ret = ceph_decode_pgid(p, end, &pgid); 1402 if (ret) 1403 return ret; 1404 1405 pg = lookup_pg_mapping(mapping_root, &pgid); 1406 if (pg) { 1407 WARN_ON(!incremental); 1408 erase_pg_mapping(mapping_root, pg); 1409 free_pg_mapping(pg); 1410 } 1411 1412 if (fn) { 1413 pg = fn(p, end, incremental); 1414 if (IS_ERR(pg)) 1415 return PTR_ERR(pg); 1416 1417 if (pg) { 1418 pg->pgid = pgid; /* struct */ 1419 insert_pg_mapping(mapping_root, pg); 1420 } 1421 } 1422 } 1423 1424 return 0; 1425 1426 e_inval: 1427 return -EINVAL; 1428 } 1429 1430 static struct ceph_pg_mapping *__decode_pg_temp(void **p, void *end, 1431 bool incremental) 1432 { 1433 struct ceph_pg_mapping *pg; 1434 u32 len, i; 1435 1436 ceph_decode_32_safe(p, end, len, e_inval); 1437 if (len == 0 && incremental) 1438 return NULL; /* new_pg_temp: [] to remove */ 1439 if ((size_t)len > (SIZE_MAX - sizeof(*pg)) / sizeof(u32)) 1440 return ERR_PTR(-EINVAL); 1441 1442 ceph_decode_need(p, end, len * sizeof(u32), e_inval); 1443 pg = alloc_pg_mapping(len * sizeof(u32)); 1444 if (!pg) 1445 return ERR_PTR(-ENOMEM); 1446 1447 pg->pg_temp.len = len; 1448 for (i = 0; i < len; i++) 1449 pg->pg_temp.osds[i] = 
ceph_decode_32(p); 1450 1451 return pg; 1452 1453 e_inval: 1454 return ERR_PTR(-EINVAL); 1455 } 1456 1457 static int decode_pg_temp(void **p, void *end, struct ceph_osdmap *map) 1458 { 1459 return decode_pg_mapping(p, end, &map->pg_temp, __decode_pg_temp, 1460 false); 1461 } 1462 1463 static int decode_new_pg_temp(void **p, void *end, struct ceph_osdmap *map) 1464 { 1465 return decode_pg_mapping(p, end, &map->pg_temp, __decode_pg_temp, 1466 true); 1467 } 1468 1469 static struct ceph_pg_mapping *__decode_primary_temp(void **p, void *end, 1470 bool incremental) 1471 { 1472 struct ceph_pg_mapping *pg; 1473 u32 osd; 1474 1475 ceph_decode_32_safe(p, end, osd, e_inval); 1476 if (osd == (u32)-1 && incremental) 1477 return NULL; /* new_primary_temp: -1 to remove */ 1478 1479 pg = alloc_pg_mapping(0); 1480 if (!pg) 1481 return ERR_PTR(-ENOMEM); 1482 1483 pg->primary_temp.osd = osd; 1484 return pg; 1485 1486 e_inval: 1487 return ERR_PTR(-EINVAL); 1488 } 1489 1490 static int decode_primary_temp(void **p, void *end, struct ceph_osdmap *map) 1491 { 1492 return decode_pg_mapping(p, end, &map->primary_temp, 1493 __decode_primary_temp, false); 1494 } 1495 1496 static int decode_new_primary_temp(void **p, void *end, 1497 struct ceph_osdmap *map) 1498 { 1499 return decode_pg_mapping(p, end, &map->primary_temp, 1500 __decode_primary_temp, true); 1501 } 1502 1503 u32 ceph_get_primary_affinity(struct ceph_osdmap *map, int osd) 1504 { 1505 if (!map->osd_primary_affinity) 1506 return CEPH_OSD_DEFAULT_PRIMARY_AFFINITY; 1507 1508 return map->osd_primary_affinity[osd]; 1509 } 1510 1511 static int set_primary_affinity(struct ceph_osdmap *map, int osd, u32 aff) 1512 { 1513 if (!map->osd_primary_affinity) { 1514 int i; 1515 1516 map->osd_primary_affinity = kvmalloc( 1517 array_size(map->max_osd, sizeof(*map->osd_primary_affinity)), 1518 GFP_NOFS); 1519 if (!map->osd_primary_affinity) 1520 return -ENOMEM; 1521 1522 for (i = 0; i < map->max_osd; i++) 1523 map->osd_primary_affinity[i] = 1524 
CEPH_OSD_DEFAULT_PRIMARY_AFFINITY; 1525 } 1526 1527 map->osd_primary_affinity[osd] = aff; 1528 1529 return 0; 1530 } 1531 1532 static int decode_primary_affinity(void **p, void *end, 1533 struct ceph_osdmap *map) 1534 { 1535 u32 len, i; 1536 1537 ceph_decode_32_safe(p, end, len, e_inval); 1538 if (len == 0) { 1539 kvfree(map->osd_primary_affinity); 1540 map->osd_primary_affinity = NULL; 1541 return 0; 1542 } 1543 if (len != map->max_osd) 1544 goto e_inval; 1545 1546 ceph_decode_need(p, end, map->max_osd*sizeof(u32), e_inval); 1547 1548 for (i = 0; i < map->max_osd; i++) { 1549 int ret; 1550 1551 ret = set_primary_affinity(map, i, ceph_decode_32(p)); 1552 if (ret) 1553 return ret; 1554 } 1555 1556 return 0; 1557 1558 e_inval: 1559 return -EINVAL; 1560 } 1561 1562 static int decode_new_primary_affinity(void **p, void *end, 1563 struct ceph_osdmap *map) 1564 { 1565 u32 n; 1566 1567 ceph_decode_32_safe(p, end, n, e_inval); 1568 while (n--) { 1569 u32 osd, aff; 1570 int ret; 1571 1572 ceph_decode_32_safe(p, end, osd, e_inval); 1573 ceph_decode_32_safe(p, end, aff, e_inval); 1574 if (osd >= map->max_osd) 1575 goto e_inval; 1576 1577 ret = set_primary_affinity(map, osd, aff); 1578 if (ret) 1579 return ret; 1580 1581 osdmap_info(map, "osd%d primary-affinity 0x%x\n", osd, aff); 1582 } 1583 1584 return 0; 1585 1586 e_inval: 1587 return -EINVAL; 1588 } 1589 1590 static struct ceph_pg_mapping *__decode_pg_upmap(void **p, void *end, 1591 bool __unused) 1592 { 1593 return __decode_pg_temp(p, end, false); 1594 } 1595 1596 static int decode_pg_upmap(void **p, void *end, struct ceph_osdmap *map) 1597 { 1598 return decode_pg_mapping(p, end, &map->pg_upmap, __decode_pg_upmap, 1599 false); 1600 } 1601 1602 static int decode_new_pg_upmap(void **p, void *end, struct ceph_osdmap *map) 1603 { 1604 return decode_pg_mapping(p, end, &map->pg_upmap, __decode_pg_upmap, 1605 true); 1606 } 1607 1608 static int decode_old_pg_upmap(void **p, void *end, struct ceph_osdmap *map) 1609 { 1610 return 
decode_pg_mapping(p, end, &map->pg_upmap, NULL, true); 1611 } 1612 1613 static struct ceph_pg_mapping *__decode_pg_upmap_items(void **p, void *end, 1614 bool __unused) 1615 { 1616 struct ceph_pg_mapping *pg; 1617 u32 len, i; 1618 1619 ceph_decode_32_safe(p, end, len, e_inval); 1620 if ((size_t)len > (SIZE_MAX - sizeof(*pg)) / (2 * sizeof(u32))) 1621 return ERR_PTR(-EINVAL); 1622 1623 ceph_decode_need(p, end, 2 * len * sizeof(u32), e_inval); 1624 pg = alloc_pg_mapping(2 * len * sizeof(u32)); 1625 if (!pg) 1626 return ERR_PTR(-ENOMEM); 1627 1628 pg->pg_upmap_items.len = len; 1629 for (i = 0; i < len; i++) { 1630 pg->pg_upmap_items.from_to[i][0] = ceph_decode_32(p); 1631 pg->pg_upmap_items.from_to[i][1] = ceph_decode_32(p); 1632 } 1633 1634 return pg; 1635 1636 e_inval: 1637 return ERR_PTR(-EINVAL); 1638 } 1639 1640 static int decode_pg_upmap_items(void **p, void *end, struct ceph_osdmap *map) 1641 { 1642 return decode_pg_mapping(p, end, &map->pg_upmap_items, 1643 __decode_pg_upmap_items, false); 1644 } 1645 1646 static int decode_new_pg_upmap_items(void **p, void *end, 1647 struct ceph_osdmap *map) 1648 { 1649 return decode_pg_mapping(p, end, &map->pg_upmap_items, 1650 __decode_pg_upmap_items, true); 1651 } 1652 1653 static int decode_old_pg_upmap_items(void **p, void *end, 1654 struct ceph_osdmap *map) 1655 { 1656 return decode_pg_mapping(p, end, &map->pg_upmap_items, NULL, true); 1657 } 1658 1659 /* 1660 * decode a full map. 
 */
static int osdmap_decode(void **p, void *end, bool msgr2,
			 struct ceph_osdmap *map)
{
	u8 struct_v;
	u32 epoch = 0;
	void *start = *p;
	u32 max;
	u32 len, i;
	int err;

	dout("%s %p to %p len %d\n", __func__, *p, end, (int)(end - *p));

	/* struct_v is 0 for the old encoding, see get_osdmap_client_data_v() */
	err = get_osdmap_client_data_v(p, end, "full", &struct_v);
	if (err)
		goto bad;

	/* fsid, epoch, created, modified */
	ceph_decode_need(p, end, sizeof(map->fsid) + sizeof(u32) +
			 sizeof(map->created) + sizeof(map->modified), e_inval);
	ceph_decode_copy(p, &map->fsid, sizeof(map->fsid));
	epoch = map->epoch = ceph_decode_32(p);
	ceph_decode_copy(p, &map->created, sizeof(map->created));
	ceph_decode_copy(p, &map->modified, sizeof(map->modified));

	/* pools */
	err = decode_pools(p, end, map);
	if (err)
		goto bad;

	/* pool_name */
	err = decode_pool_names(p, end, map);
	if (err)
		goto bad;

	ceph_decode_32_safe(p, end, map->pool_max, e_inval);

	ceph_decode_32_safe(p, end, map->flags, e_inval);

	/* max_osd */
	ceph_decode_32_safe(p, end, max, e_inval);

	/* (re)alloc osd arrays */
	err = osdmap_set_max_osd(map, max);
	if (err)
		goto bad;

	/*
	 * osd_state, osd_weight, osd_addrs->client_addr
	 *
	 * One up-front bounds check covers the three u32 length
	 * prefixes plus the state and weight arrays, so those are read
	 * with the unchecked decoders below.  The addresses are
	 * variable-length and are checked by their own decode helpers.
	 */
	ceph_decode_need(p, end, 3*sizeof(u32) +
			 map->max_osd*(struct_v >= 5 ? sizeof(u32) :
						       sizeof(u8)) +
				       map->max_osd*sizeof(*map->osd_weight), e_inval);
	/* each array's length prefix must match max_osd */
	if (ceph_decode_32(p) != map->max_osd)
		goto e_inval;

	/* state entries are u32 from struct_v 5 on, u8 before that */
	if (struct_v >= 5) {
		for (i = 0; i < map->max_osd; i++)
			map->osd_state[i] = ceph_decode_32(p);
	} else {
		for (i = 0; i < map->max_osd; i++)
			map->osd_state[i] = ceph_decode_8(p);
	}

	if (ceph_decode_32(p) != map->max_osd)
		goto e_inval;

	for (i = 0; i < map->max_osd; i++)
		map->osd_weight[i] = ceph_decode_32(p);

	if (ceph_decode_32(p) != map->max_osd)
		goto e_inval;

	for (i = 0; i < map->max_osd; i++) {
		struct ceph_entity_addr *addr = &map->osd_addr[i];

		/* address vectors from struct_v 8 on, single address before */
		if (struct_v >= 8)
			err = ceph_decode_entity_addrvec(p, end, msgr2, addr);
		else
			err = ceph_decode_entity_addr(p, end, addr);
		if (err)
			goto bad;

		dout("%s osd%d addr %s\n", __func__, i, ceph_pr_addr(addr));
	}

	/* pg_temp */
	err = decode_pg_temp(p, end, map);
	if (err)
		goto bad;

	/* primary_temp */
	if (struct_v >= 1) {
		err = decode_primary_temp(p, end, map);
		if (err)
			goto bad;
	}

	/* primary_affinity */
	if (struct_v >= 2) {
		err = decode_primary_affinity(p, end, map);
		if (err)
			goto bad;
	} else {
		WARN_ON(map->osd_primary_affinity);
	}

	/*
	 * crush
	 *
	 * The decoder is handed at most the bytes up to @end; *p is
	 * then advanced by the advertised length.
	 */
	ceph_decode_32_safe(p, end, len, e_inval);
	err = osdmap_set_crush(map, crush_decode(*p, min(*p + len, end)));
	if (err)
		goto bad;

	*p += len;
	if (struct_v >= 3) {
		/* erasure_code_profiles */
		ceph_decode_skip_map_of_map(p, end, string, string, string,
					    e_inval);
	}

	if (struct_v >= 4) {
		err = decode_pg_upmap(p, end, map);
		if (err)
			goto bad;

		err = decode_pg_upmap_items(p, end, map);
		if (err)
			goto bad;
	} else {
		WARN_ON(!RB_EMPTY_ROOT(&map->pg_upmap));
		WARN_ON(!RB_EMPTY_ROOT(&map->pg_upmap_items));
	}

	/* ignore the rest */
	*p = end;

	dout("full osdmap epoch %d max_osd %d\n", map->epoch, map->max_osd);
	return 0;

e_inval:
	err = -EINVAL;
bad:
	/* log where decoding stopped and dump the whole buffer at debug level */
	pr_err("corrupt full osdmap (%d) epoch %d off %d (%p of %p-%p)\n",
	       err, epoch, (int)(*p - start), *p, start, end);
	print_hex_dump(KERN_DEBUG, "osdmap: ",
		       DUMP_PREFIX_OFFSET, 16, 1,
		       start, end - start, true);
	return err;
}

/*
 * Allocate and decode a full map.
 *
 * Returns the new map, or an ERR_PTR: -ENOMEM if the map can't be
 * allocated, otherwise the osdmap_decode() error, in which case the
 * partially decoded map is destroyed.
 */
struct ceph_osdmap *ceph_osdmap_decode(void **p, void *end, bool msgr2)
{
	struct ceph_osdmap *map;
	int ret;

	map = ceph_osdmap_alloc();
	if (!map)
		return ERR_PTR(-ENOMEM);

	ret = osdmap_decode(p, end, msgr2, map);
	if (ret) {
		ceph_osdmap_destroy(map);
		return ERR_PTR(ret);
	}

	return map;
}

/*
 * Encoding order is (new_up_client, new_state, new_weight).  Need to
 * apply in the (new_weight, new_state, new_up_client) order, because
 * an incremental map may look like e.g.
 *
 *     new_up_client: { osd=6, addr=...
} # set osd_state and addr 1837 * new_state: { osd=6, xorstate=EXISTS } # clear osd_state 1838 */ 1839 static int decode_new_up_state_weight(void **p, void *end, u8 struct_v, 1840 bool msgr2, struct ceph_osdmap *map) 1841 { 1842 void *new_up_client; 1843 void *new_state; 1844 void *new_weight_end; 1845 u32 len; 1846 int ret; 1847 int i; 1848 1849 new_up_client = *p; 1850 ceph_decode_32_safe(p, end, len, e_inval); 1851 for (i = 0; i < len; ++i) { 1852 struct ceph_entity_addr addr; 1853 1854 ceph_decode_skip_32(p, end, e_inval); 1855 if (struct_v >= 7) 1856 ret = ceph_decode_entity_addrvec(p, end, msgr2, &addr); 1857 else 1858 ret = ceph_decode_entity_addr(p, end, &addr); 1859 if (ret) 1860 return ret; 1861 } 1862 1863 new_state = *p; 1864 ceph_decode_32_safe(p, end, len, e_inval); 1865 len *= sizeof(u32) + (struct_v >= 5 ? sizeof(u32) : sizeof(u8)); 1866 ceph_decode_need(p, end, len, e_inval); 1867 *p += len; 1868 1869 /* new_weight */ 1870 ceph_decode_32_safe(p, end, len, e_inval); 1871 while (len--) { 1872 s32 osd; 1873 u32 w; 1874 1875 ceph_decode_need(p, end, 2*sizeof(u32), e_inval); 1876 osd = ceph_decode_32(p); 1877 w = ceph_decode_32(p); 1878 if (osd >= map->max_osd) 1879 goto e_inval; 1880 1881 osdmap_info(map, "osd%d weight 0x%x %s\n", osd, w, 1882 w == CEPH_OSD_IN ? "(in)" : 1883 (w == CEPH_OSD_OUT ? "(out)" : "")); 1884 map->osd_weight[osd] = w; 1885 1886 /* 1887 * If we are marking in, set the EXISTS, and clear the 1888 * AUTOOUT and NEW bits. 
1889 */ 1890 if (w) { 1891 map->osd_state[osd] |= CEPH_OSD_EXISTS; 1892 map->osd_state[osd] &= ~(CEPH_OSD_AUTOOUT | 1893 CEPH_OSD_NEW); 1894 } 1895 } 1896 new_weight_end = *p; 1897 1898 /* new_state (up/down) */ 1899 *p = new_state; 1900 len = ceph_decode_32(p); 1901 while (len--) { 1902 s32 osd; 1903 u32 xorstate; 1904 1905 osd = ceph_decode_32(p); 1906 if (osd >= map->max_osd) 1907 goto e_inval; 1908 1909 if (struct_v >= 5) 1910 xorstate = ceph_decode_32(p); 1911 else 1912 xorstate = ceph_decode_8(p); 1913 if (xorstate == 0) 1914 xorstate = CEPH_OSD_UP; 1915 if ((map->osd_state[osd] & CEPH_OSD_UP) && 1916 (xorstate & CEPH_OSD_UP)) 1917 osdmap_info(map, "osd%d down\n", osd); 1918 if ((map->osd_state[osd] & CEPH_OSD_EXISTS) && 1919 (xorstate & CEPH_OSD_EXISTS)) { 1920 osdmap_info(map, "osd%d does not exist\n", osd); 1921 ret = set_primary_affinity(map, osd, 1922 CEPH_OSD_DEFAULT_PRIMARY_AFFINITY); 1923 if (ret) 1924 return ret; 1925 memset(map->osd_addr + osd, 0, sizeof(*map->osd_addr)); 1926 map->osd_state[osd] = 0; 1927 } else { 1928 map->osd_state[osd] ^= xorstate; 1929 } 1930 } 1931 1932 /* new_up_client */ 1933 *p = new_up_client; 1934 len = ceph_decode_32(p); 1935 while (len--) { 1936 s32 osd; 1937 struct ceph_entity_addr addr; 1938 1939 osd = ceph_decode_32(p); 1940 if (osd >= map->max_osd) 1941 goto e_inval; 1942 1943 if (struct_v >= 7) 1944 ret = ceph_decode_entity_addrvec(p, end, msgr2, &addr); 1945 else 1946 ret = ceph_decode_entity_addr(p, end, &addr); 1947 if (ret) 1948 return ret; 1949 1950 dout("%s osd%d addr %s\n", __func__, osd, ceph_pr_addr(&addr)); 1951 1952 osdmap_info(map, "osd%d up\n", osd); 1953 map->osd_state[osd] |= CEPH_OSD_EXISTS | CEPH_OSD_UP; 1954 map->osd_addr[osd] = addr; 1955 } 1956 1957 *p = new_weight_end; 1958 return 0; 1959 1960 e_inval: 1961 return -EINVAL; 1962 } 1963 1964 /* 1965 * decode and apply an incremental map update. 
 */
struct ceph_osdmap *osdmap_apply_incremental(void **p, void *end, bool msgr2,
					     struct ceph_osdmap *map)
{
	struct ceph_fsid fsid;
	u32 epoch = 0;
	struct ceph_timespec modified;
	s32 len;
	u64 pool;
	__s64 new_pool_max;
	__s32 new_flags, max;
	void *start = *p;
	int err;
	u8 struct_v;

	dout("%s %p to %p len %d\n", __func__, *p, end, (int)(end - *p));

	err = get_osdmap_client_data_v(p, end, "inc", &struct_v);
	if (err)
		goto bad;

	/* fsid, epoch, modified, new_pool_max, new_flags */
	ceph_decode_need(p, end, sizeof(fsid) + sizeof(u32) + sizeof(modified) +
			 sizeof(u64) + sizeof(u32), e_inval);
	ceph_decode_copy(p, &fsid, sizeof(fsid));
	epoch = ceph_decode_32(p);
	ceph_decode_copy(p, &modified, sizeof(modified));
	new_pool_max = ceph_decode_64(p);
	new_flags = ceph_decode_32(p);

	/* an incremental only applies on top of the preceding epoch */
	if (epoch != map->epoch + 1)
		goto e_inval;

	/*
	 * full map?
	 *
	 * If one is embedded, it is decoded into a newly allocated map
	 * which is returned instead of @map; @map is not modified.
	 */
	ceph_decode_32_safe(p, end, len, e_inval);
	if (len > 0) {
		dout("apply_incremental full map len %d, %p to %p\n",
		     len, *p, end);
		return ceph_osdmap_decode(p, min(*p+len, end), msgr2);
	}

	/* new crush? */
	ceph_decode_32_safe(p, end, len, e_inval);
	if (len > 0) {
		err = osdmap_set_crush(map,
				       crush_decode(*p, min(*p + len, end)));
		if (err)
			goto bad;
		*p += len;
	}

	/* new flags? (negative values mean "unchanged") */
	if (new_flags >= 0)
		map->flags = new_flags;
	if (new_pool_max >= 0)
		map->pool_max = new_pool_max;

	/* new max? (negative means "unchanged") */
	ceph_decode_32_safe(p, end, max, e_inval);
	if (max >= 0) {
		err = osdmap_set_max_osd(map, max);
		if (err)
			goto bad;
	}

	/*
	 * From here on @map is updated in place; an error in a later
	 * section leaves the earlier changes applied.
	 */
	map->epoch++;
	map->modified = modified;

	/* new_pools */
	err = decode_new_pools(p, end, map);
	if (err)
		goto bad;

	/* new_pool_names */
	err = decode_pool_names(p, end, map);
	if (err)
		goto bad;

	/* old_pool */
	ceph_decode_32_safe(p, end, len, e_inval);
	while (len--) {
		struct ceph_pg_pool_info *pi;

		ceph_decode_64_safe(p, end, pool, e_inval);
		pi = lookup_pg_pool(&map->pg_pools, pool);
		if (pi)
			__remove_pg_pool(&map->pg_pools, pi);
	}

	/* new_up_client, new_state, new_weight */
	err = decode_new_up_state_weight(p, end, struct_v, msgr2, map);
	if (err)
		goto bad;

	/* new_pg_temp */
	err = decode_new_pg_temp(p, end, map);
	if (err)
		goto bad;

	/* new_primary_temp */
	if (struct_v >= 1) {
		err = decode_new_primary_temp(p, end, map);
		if (err)
			goto bad;
	}

	/* new_primary_affinity */
	if (struct_v >= 2) {
		err = decode_new_primary_affinity(p, end, map);
		if (err)
			goto bad;
	}

	if (struct_v >= 3) {
		/* new_erasure_code_profiles */
		ceph_decode_skip_map_of_map(p, end, string, string, string,
					    e_inval);
		/* old_erasure_code_profiles */
		ceph_decode_skip_set(p, end, string, e_inval);
	}

	if (struct_v >= 4) {
		err = decode_new_pg_upmap(p, end, map);
		if (err)
			goto bad;

		err = decode_old_pg_upmap(p, end, map);
		if (err)
			goto bad;

		err = decode_new_pg_upmap_items(p, end, map);
		if (err)
			goto bad;

		err = decode_old_pg_upmap_items(p, end, map);
		if (err)
			goto bad;
	}

	/* ignore the rest */
	*p = end;

	dout("inc osdmap epoch %d max_osd %d\n", map->epoch, map->max_osd);
	return map;

e_inval:
	err = -EINVAL;
bad:
	/* log where decoding stopped and dump the whole buffer at debug level */
	pr_err("corrupt inc osdmap (%d) epoch %d off %d (%p of %p-%p)\n",
	       err, epoch, (int)(*p - start), *p, start, end);
	print_hex_dump(KERN_DEBUG, "osdmap: ",
		       DUMP_PREFIX_OFFSET, 16, 1,
		       start, end - start, true);
	return ERR_PTR(err);
}

/*
 * Copy @src into @dest.  Whatever @dest held is released first; the
 * pool namespace string, if any, is shared by taking a reference.
 */
void ceph_oloc_copy(struct ceph_object_locator *dest,
		    const struct ceph_object_locator *src)
{
	ceph_oloc_destroy(dest);

	dest->pool = src->pool;
	if (src->pool_ns)
		dest->pool_ns = ceph_get_string(src->pool_ns);
	else
		dest->pool_ns = NULL;
}
EXPORT_SYMBOL(ceph_oloc_copy);

/* Drop the locator's reference on its pool namespace string. */
void ceph_oloc_destroy(struct ceph_object_locator *oloc)
{
	ceph_put_string(oloc->pool_ns);
}
EXPORT_SYMBOL(ceph_oloc_destroy);

/*
 * Copy @src into @dest.  Whatever @dest held is released first.  An
 * external name is duplicated with a __GFP_NOFAIL allocation; an
 * inline name is copied into @dest's inline buffer.
 */
void ceph_oid_copy(struct ceph_object_id *dest,
		   const struct ceph_object_id *src)
{
	ceph_oid_destroy(dest);

	if (src->name != src->inline_name) {
		/* very rare, see ceph_object_id definition */
		dest->name = kmalloc(src->name_len + 1,
				     GFP_NOIO | __GFP_NOFAIL);
	} else {
		dest->name = dest->inline_name;
	}
	/* name_len + 1: copy the terminating NUL too */
	memcpy(dest->name, src->name, src->name_len + 1);
	dest->name_len = src->name_len;
}
EXPORT_SYMBOL(ceph_oid_copy);

/*
 * Format into the inline name buffer of an empty @oid.
 *
 * Returns 0 if the name fit (name_len is set), otherwise the length
 * the formatted name needs (name_len is left alone).
 */
static __printf(2, 0)
int oid_printf_vargs(struct ceph_object_id *oid, const char *fmt, va_list ap)
{
	int len;

	WARN_ON(!ceph_oid_empty(oid));

	len = vsnprintf(oid->inline_name, sizeof(oid->inline_name), fmt, ap);
	if (len >= sizeof(oid->inline_name))
		return len;

	oid->name_len = len;
	return 0;
}

/*
 * If oid doesn't fit into inline buffer, BUG.
 */
void ceph_oid_printf(struct ceph_object_id *oid, const char *fmt, ...)
{
	va_list ap;

	va_start(ap, fmt);
	BUG_ON(oid_printf_vargs(oid, fmt, ap));
	va_end(ap);
}
EXPORT_SYMBOL(ceph_oid_printf);

/*
 * Format the name of an empty @oid, falling back to an allocated
 * buffer if it doesn't fit inline.  Returns 0 or -ENOMEM.
 */
static __printf(3, 0)
int oid_aprintf_vargs(struct ceph_object_id *oid, gfp_t gfp,
		      const char *fmt, va_list ap)
{
	va_list aq;
	int len;

	/* try inline first, on a copy: @ap may be needed for a second pass */
	va_copy(aq, ap);
	len = oid_printf_vargs(oid, fmt, aq);
	va_end(aq);

	if (len) {
		char *external_name;

		external_name = kmalloc(len + 1, gfp);
		if (!external_name)
			return -ENOMEM;

		oid->name = external_name;
		WARN_ON(vsnprintf(oid->name, len + 1, fmt, ap) != len);
		oid->name_len = len;
	}

	return 0;
}

/*
 * If oid doesn't fit into inline buffer, allocate.
 */
int ceph_oid_aprintf(struct ceph_object_id *oid, gfp_t gfp,
		     const char *fmt, ...)
{
	va_list ap;
	int ret;

	va_start(ap, fmt);
	ret = oid_aprintf_vargs(oid, gfp, fmt, ap);
	va_end(ap);

	return ret;
}
EXPORT_SYMBOL(ceph_oid_aprintf);

/* Free the name of @oid unless it lives in the inline buffer. */
void ceph_oid_destroy(struct ceph_object_id *oid)
{
	if (oid->name != oid->inline_name)
		kfree(oid->name);
}
EXPORT_SYMBOL(ceph_oid_destroy);

/*
 * osds only
 */
static bool __osds_equal(const struct ceph_osds *lhs,
			 const struct ceph_osds *rhs)
{
	if (lhs->size == rhs->size &&
	    !memcmp(lhs->osds, rhs->osds, rhs->size * sizeof(rhs->osds[0])))
		return true;

	return false;
}

/*
 * osds + primary
 */
static bool osds_equal(const struct ceph_osds *lhs,
		       const struct ceph_osds *rhs)
{
	if (__osds_equal(lhs, rhs) &&
	    lhs->primary == rhs->primary)
		return true;

	return false;
}

/*
 * Check that size, primary and contents of @set are consistent with
 * one of the three shapes spelled out below.
 */
static bool osds_valid(const struct ceph_osds *set)
{
	/* non-empty set */
	if (set->size > 0 && set->primary >= 0)
		return true;

	/* empty can_shift_osds set */
	if (!set->size && set->primary == -1)
		return true;

	/* empty !can_shift_osds set - all NONE */
	if (set->size > 0 && set->primary == -1) {
		int i;

		for (i = 0; i < set->size; i++) {
			if (set->osds[i] != CRUSH_ITEM_NONE)
				break;
		}
		if (i == set->size)
			return true;
	}

	return false;
}

/* Copy the first src->size OSDs, the size and the primary to @dest. */
void ceph_osds_copy(struct ceph_osds *dest, const struct ceph_osds *src)
{
	memcpy(dest->osds, src->osds, src->size * sizeof(src->osds[0]));
	dest->size = src->size;
	dest->primary = src->primary;
}

/*
 * Return true if growing the pool from @old_pg_num to @new_pg_num PGs
 * splits @pgid, i.e. if some seed in [old_pg_num, new_pg_num) maps back
 * to pgid->seed under ceph_stable_mod() with the old pg_num.
 *
 * pgid->seed is expected to be below @old_pg_num (WARN_ON otherwise).
 */
bool ceph_pg_is_split(const struct ceph_pg *pgid, u32 old_pg_num,
		      u32 new_pg_num)
{
	int old_bits = calc_bits_of(old_pg_num);
	int old_mask = (1 << old_bits) - 1;
	int n;

	WARN_ON(pgid->seed >= old_pg_num);
	if (new_pg_num <= old_pg_num)
		return false;

	/* candidates: pgid->seed with n OR-ed in at bit (old_bits - 1) and up */
	for (n = 1; ; n++) {
		int next_bit = n << (old_bits - 1);
		u32 s = next_bit | pgid->seed;

		if (s < old_pg_num || s == pgid->seed)
			continue;
		if (s >= new_pg_num)
			break;

		s = ceph_stable_mod(s, old_pg_num, old_mask);
		if (s == pgid->seed)
			return true;
	}

	return false;
}

/*
 * Return true if any of the compared properties differs between old
 * and new: acting set, up set (both including the primary), size,
 * min_size, the sort_bitwise and recovery_deletes flags - or if @pgid
 * is split by the pg_num change.
 */
bool ceph_is_new_interval(const struct ceph_osds *old_acting,
			  const struct ceph_osds *new_acting,
			  const struct ceph_osds *old_up,
			  const struct ceph_osds *new_up,
			  int old_size,
			  int new_size,
			  int old_min_size,
			  int new_min_size,
			  u32 old_pg_num,
			  u32 new_pg_num,
			  bool old_sort_bitwise,
			  bool new_sort_bitwise,
			  bool old_recovery_deletes,
			  bool new_recovery_deletes,
			  const struct ceph_pg *pgid)
{
	return !osds_equal(old_acting, new_acting) ||
	       !osds_equal(old_up, new_up) ||
	       old_size != new_size ||
	       old_min_size != new_min_size ||
	       ceph_pg_is_split(pgid, old_pg_num, new_pg_num) ||
	       old_sort_bitwise != new_sort_bitwise ||
	       old_recovery_deletes != new_recovery_deletes;
}

/* Return the index of @osd in @acting, or -1 if it isn't there. */
static int calc_pg_rank(int osd, const struct ceph_osds *acting)
{
	int i;

	for (i = 0; i < acting->size; i++) {
		if (acting->osds[i] == osd)
			return i;
	}

	return -1;
}

/*
 * Return true if the primary, or the primary's position within the
 * acting set, differs between @old_acting and @new_acting.
 */
static bool primary_changed(const struct ceph_osds *old_acting,
			    const struct ceph_osds *new_acting)
{
	if (!old_acting->size && !new_acting->size)
		return false; /* both still empty */

	if (!old_acting->size ^ !new_acting->size)
		return true; /* was empty, now not, or vice versa */

	if (old_acting->primary != new_acting->primary)
		return true; /* primary changed */

	if (calc_pg_rank(old_acting->primary, old_acting) !=
	    calc_pg_rank(new_acting->primary, new_acting))
		return true;

	return false; /* same primary (tho replicas may have changed) */
}

/*
 * Return true if the primary changed (see primary_changed()) or, with
 * @any_change set, if the OSD lists differ at all.
 */
bool ceph_osds_changed(const struct ceph_osds *old_acting,
		       const struct ceph_osds *new_acting,
		       bool any_change)
{
	if (primary_changed(old_acting, new_acting))
		return true;

	if (any_change && !__osds_equal(old_acting, new_acting))
		return true;

	return false;
}

/*
 * Map an object into a PG.
 *
 * Should only be called with target_oid and target_oloc (as opposed to
 * base_oid and base_oloc), since tiering isn't taken into account.
2396 */ 2397 void __ceph_object_locator_to_pg(struct ceph_pg_pool_info *pi, 2398 const struct ceph_object_id *oid, 2399 const struct ceph_object_locator *oloc, 2400 struct ceph_pg *raw_pgid) 2401 { 2402 WARN_ON(pi->id != oloc->pool); 2403 2404 if (!oloc->pool_ns) { 2405 raw_pgid->pool = oloc->pool; 2406 raw_pgid->seed = ceph_str_hash(pi->object_hash, oid->name, 2407 oid->name_len); 2408 dout("%s %s -> raw_pgid %llu.%x\n", __func__, oid->name, 2409 raw_pgid->pool, raw_pgid->seed); 2410 } else { 2411 char stack_buf[256]; 2412 char *buf = stack_buf; 2413 int nsl = oloc->pool_ns->len; 2414 size_t total = nsl + 1 + oid->name_len; 2415 2416 if (total > sizeof(stack_buf)) 2417 buf = kmalloc(total, GFP_NOIO | __GFP_NOFAIL); 2418 memcpy(buf, oloc->pool_ns->str, nsl); 2419 buf[nsl] = '\037'; 2420 memcpy(buf + nsl + 1, oid->name, oid->name_len); 2421 raw_pgid->pool = oloc->pool; 2422 raw_pgid->seed = ceph_str_hash(pi->object_hash, buf, total); 2423 if (buf != stack_buf) 2424 kfree(buf); 2425 dout("%s %s ns %.*s -> raw_pgid %llu.%x\n", __func__, 2426 oid->name, nsl, oloc->pool_ns->str, 2427 raw_pgid->pool, raw_pgid->seed); 2428 } 2429 } 2430 2431 int ceph_object_locator_to_pg(struct ceph_osdmap *osdmap, 2432 const struct ceph_object_id *oid, 2433 const struct ceph_object_locator *oloc, 2434 struct ceph_pg *raw_pgid) 2435 { 2436 struct ceph_pg_pool_info *pi; 2437 2438 pi = ceph_pg_pool_by_id(osdmap, oloc->pool); 2439 if (!pi) 2440 return -ENOENT; 2441 2442 __ceph_object_locator_to_pg(pi, oid, oloc, raw_pgid); 2443 return 0; 2444 } 2445 EXPORT_SYMBOL(ceph_object_locator_to_pg); 2446 2447 /* 2448 * Map a raw PG (full precision ps) into an actual PG. 
2449 */ 2450 static void raw_pg_to_pg(struct ceph_pg_pool_info *pi, 2451 const struct ceph_pg *raw_pgid, 2452 struct ceph_pg *pgid) 2453 { 2454 pgid->pool = raw_pgid->pool; 2455 pgid->seed = ceph_stable_mod(raw_pgid->seed, pi->pg_num, 2456 pi->pg_num_mask); 2457 } 2458 2459 /* 2460 * Map a raw PG (full precision ps) into a placement ps (placement 2461 * seed). Include pool id in that value so that different pools don't 2462 * use the same seeds. 2463 */ 2464 static u32 raw_pg_to_pps(struct ceph_pg_pool_info *pi, 2465 const struct ceph_pg *raw_pgid) 2466 { 2467 if (pi->flags & CEPH_POOL_FLAG_HASHPSPOOL) { 2468 /* hash pool id and seed so that pool PGs do not overlap */ 2469 return crush_hash32_2(CRUSH_HASH_RJENKINS1, 2470 ceph_stable_mod(raw_pgid->seed, 2471 pi->pgp_num, 2472 pi->pgp_num_mask), 2473 raw_pgid->pool); 2474 } else { 2475 /* 2476 * legacy behavior: add ps and pool together. this is 2477 * not a great approach because the PGs from each pool 2478 * will overlap on top of each other: 0.5 == 1.4 == 2479 * 2.3 == ... 2480 */ 2481 return ceph_stable_mod(raw_pgid->seed, pi->pgp_num, 2482 pi->pgp_num_mask) + 2483 (unsigned)raw_pgid->pool; 2484 } 2485 } 2486 2487 /* 2488 * Magic value used for a "default" fallback choose_args, used if the 2489 * crush_choose_arg_map passed to do_crush() does not exist. If this 2490 * also doesn't exist, fall back to canonical weights. 
2491 */ 2492 #define CEPH_DEFAULT_CHOOSE_ARGS -1 2493 2494 static int do_crush(struct ceph_osdmap *map, int ruleno, int x, 2495 int *result, int result_max, 2496 const __u32 *weight, int weight_max, 2497 s64 choose_args_index) 2498 { 2499 struct crush_choose_arg_map *arg_map; 2500 struct crush_work *work; 2501 int r; 2502 2503 BUG_ON(result_max > CEPH_PG_MAX_SIZE); 2504 2505 arg_map = lookup_choose_arg_map(&map->crush->choose_args, 2506 choose_args_index); 2507 if (!arg_map) 2508 arg_map = lookup_choose_arg_map(&map->crush->choose_args, 2509 CEPH_DEFAULT_CHOOSE_ARGS); 2510 2511 work = get_workspace(&map->crush_wsm, map->crush); 2512 r = crush_do_rule(map->crush, ruleno, x, result, result_max, 2513 weight, weight_max, work, 2514 arg_map ? arg_map->args : NULL); 2515 put_workspace(&map->crush_wsm, work); 2516 return r; 2517 } 2518 2519 static void remove_nonexistent_osds(struct ceph_osdmap *osdmap, 2520 struct ceph_pg_pool_info *pi, 2521 struct ceph_osds *set) 2522 { 2523 int i; 2524 2525 if (ceph_can_shift_osds(pi)) { 2526 int removed = 0; 2527 2528 /* shift left */ 2529 for (i = 0; i < set->size; i++) { 2530 if (!ceph_osd_exists(osdmap, set->osds[i])) { 2531 removed++; 2532 continue; 2533 } 2534 if (removed) 2535 set->osds[i - removed] = set->osds[i]; 2536 } 2537 set->size -= removed; 2538 } else { 2539 /* set dne devices to NONE */ 2540 for (i = 0; i < set->size; i++) { 2541 if (!ceph_osd_exists(osdmap, set->osds[i])) 2542 set->osds[i] = CRUSH_ITEM_NONE; 2543 } 2544 } 2545 } 2546 2547 /* 2548 * Calculate raw set (CRUSH output) for given PG and filter out 2549 * nonexistent OSDs. ->primary is undefined for a raw set. 2550 * 2551 * Placement seed (CRUSH input) is returned through @ppps. 
2552 */ 2553 static void pg_to_raw_osds(struct ceph_osdmap *osdmap, 2554 struct ceph_pg_pool_info *pi, 2555 const struct ceph_pg *raw_pgid, 2556 struct ceph_osds *raw, 2557 u32 *ppps) 2558 { 2559 u32 pps = raw_pg_to_pps(pi, raw_pgid); 2560 int ruleno; 2561 int len; 2562 2563 ceph_osds_init(raw); 2564 if (ppps) 2565 *ppps = pps; 2566 2567 ruleno = crush_find_rule(osdmap->crush, pi->crush_ruleset, pi->type, 2568 pi->size); 2569 if (ruleno < 0) { 2570 pr_err("no crush rule: pool %lld ruleset %d type %d size %d\n", 2571 pi->id, pi->crush_ruleset, pi->type, pi->size); 2572 return; 2573 } 2574 2575 if (pi->size > ARRAY_SIZE(raw->osds)) { 2576 pr_err_ratelimited("pool %lld ruleset %d type %d too wide: size %d > %zu\n", 2577 pi->id, pi->crush_ruleset, pi->type, pi->size, 2578 ARRAY_SIZE(raw->osds)); 2579 return; 2580 } 2581 2582 len = do_crush(osdmap, ruleno, pps, raw->osds, pi->size, 2583 osdmap->osd_weight, osdmap->max_osd, pi->id); 2584 if (len < 0) { 2585 pr_err("error %d from crush rule %d: pool %lld ruleset %d type %d size %d\n", 2586 len, ruleno, pi->id, pi->crush_ruleset, pi->type, 2587 pi->size); 2588 return; 2589 } 2590 2591 raw->size = len; 2592 remove_nonexistent_osds(osdmap, pi, raw); 2593 } 2594 2595 /* apply pg_upmap[_items] mappings */ 2596 static void apply_upmap(struct ceph_osdmap *osdmap, 2597 const struct ceph_pg *pgid, 2598 struct ceph_osds *raw) 2599 { 2600 struct ceph_pg_mapping *pg; 2601 int i, j; 2602 2603 pg = lookup_pg_mapping(&osdmap->pg_upmap, pgid); 2604 if (pg) { 2605 /* make sure targets aren't marked out */ 2606 for (i = 0; i < pg->pg_upmap.len; i++) { 2607 int osd = pg->pg_upmap.osds[i]; 2608 2609 if (osd != CRUSH_ITEM_NONE && 2610 osd < osdmap->max_osd && 2611 osdmap->osd_weight[osd] == 0) { 2612 /* reject/ignore explicit mapping */ 2613 return; 2614 } 2615 } 2616 for (i = 0; i < pg->pg_upmap.len; i++) 2617 raw->osds[i] = pg->pg_upmap.osds[i]; 2618 raw->size = pg->pg_upmap.len; 2619 /* check and apply pg_upmap_items, if any */ 2620 } 2621 
2622 pg = lookup_pg_mapping(&osdmap->pg_upmap_items, pgid); 2623 if (pg) { 2624 /* 2625 * Note: this approach does not allow a bidirectional swap, 2626 * e.g., [[1,2],[2,1]] applied to [0,1,2] -> [0,2,1]. 2627 */ 2628 for (i = 0; i < pg->pg_upmap_items.len; i++) { 2629 int from = pg->pg_upmap_items.from_to[i][0]; 2630 int to = pg->pg_upmap_items.from_to[i][1]; 2631 int pos = -1; 2632 bool exists = false; 2633 2634 /* make sure replacement doesn't already appear */ 2635 for (j = 0; j < raw->size; j++) { 2636 int osd = raw->osds[j]; 2637 2638 if (osd == to) { 2639 exists = true; 2640 break; 2641 } 2642 /* ignore mapping if target is marked out */ 2643 if (osd == from && pos < 0 && 2644 !(to != CRUSH_ITEM_NONE && 2645 to < osdmap->max_osd && 2646 osdmap->osd_weight[to] == 0)) { 2647 pos = j; 2648 } 2649 } 2650 if (!exists && pos >= 0) 2651 raw->osds[pos] = to; 2652 } 2653 } 2654 } 2655 2656 /* 2657 * Given raw set, calculate up set and up primary. By definition of an 2658 * up set, the result won't contain nonexistent or down OSDs. 2659 * 2660 * This is done in-place - on return @set is the up set. If it's 2661 * empty, ->primary will remain undefined. 
2662 */ 2663 static void raw_to_up_osds(struct ceph_osdmap *osdmap, 2664 struct ceph_pg_pool_info *pi, 2665 struct ceph_osds *set) 2666 { 2667 int i; 2668 2669 /* ->primary is undefined for a raw set */ 2670 BUG_ON(set->primary != -1); 2671 2672 if (ceph_can_shift_osds(pi)) { 2673 int removed = 0; 2674 2675 /* shift left */ 2676 for (i = 0; i < set->size; i++) { 2677 if (ceph_osd_is_down(osdmap, set->osds[i])) { 2678 removed++; 2679 continue; 2680 } 2681 if (removed) 2682 set->osds[i - removed] = set->osds[i]; 2683 } 2684 set->size -= removed; 2685 if (set->size > 0) 2686 set->primary = set->osds[0]; 2687 } else { 2688 /* set down/dne devices to NONE */ 2689 for (i = set->size - 1; i >= 0; i--) { 2690 if (ceph_osd_is_down(osdmap, set->osds[i])) 2691 set->osds[i] = CRUSH_ITEM_NONE; 2692 else 2693 set->primary = set->osds[i]; 2694 } 2695 } 2696 } 2697 2698 static void apply_primary_affinity(struct ceph_osdmap *osdmap, 2699 struct ceph_pg_pool_info *pi, 2700 u32 pps, 2701 struct ceph_osds *up) 2702 { 2703 int i; 2704 int pos = -1; 2705 2706 /* 2707 * Do we have any non-default primary_affinity values for these 2708 * osds? 2709 */ 2710 if (!osdmap->osd_primary_affinity) 2711 return; 2712 2713 for (i = 0; i < up->size; i++) { 2714 int osd = up->osds[i]; 2715 2716 if (osd != CRUSH_ITEM_NONE && 2717 osdmap->osd_primary_affinity[osd] != 2718 CEPH_OSD_DEFAULT_PRIMARY_AFFINITY) { 2719 break; 2720 } 2721 } 2722 if (i == up->size) 2723 return; 2724 2725 /* 2726 * Pick the primary. Feed both the seed (for the pg) and the 2727 * osd into the hash/rng so that a proportional fraction of an 2728 * osd's pgs get rejected as primary. 
2729 */ 2730 for (i = 0; i < up->size; i++) { 2731 int osd = up->osds[i]; 2732 u32 aff; 2733 2734 if (osd == CRUSH_ITEM_NONE) 2735 continue; 2736 2737 aff = osdmap->osd_primary_affinity[osd]; 2738 if (aff < CEPH_OSD_MAX_PRIMARY_AFFINITY && 2739 (crush_hash32_2(CRUSH_HASH_RJENKINS1, 2740 pps, osd) >> 16) >= aff) { 2741 /* 2742 * We chose not to use this primary. Note it 2743 * anyway as a fallback in case we don't pick 2744 * anyone else, but keep looking. 2745 */ 2746 if (pos < 0) 2747 pos = i; 2748 } else { 2749 pos = i; 2750 break; 2751 } 2752 } 2753 if (pos < 0) 2754 return; 2755 2756 up->primary = up->osds[pos]; 2757 2758 if (ceph_can_shift_osds(pi) && pos > 0) { 2759 /* move the new primary to the front */ 2760 for (i = pos; i > 0; i--) 2761 up->osds[i] = up->osds[i - 1]; 2762 up->osds[0] = up->primary; 2763 } 2764 } 2765 2766 /* 2767 * Get pg_temp and primary_temp mappings for given PG. 2768 * 2769 * Note that a PG may have none, only pg_temp, only primary_temp or 2770 * both pg_temp and primary_temp mappings. This means @temp isn't 2771 * always a valid OSD set on return: in the "only primary_temp" case, 2772 * @temp will have its ->primary >= 0 but ->size == 0. 2773 */ 2774 static void get_temp_osds(struct ceph_osdmap *osdmap, 2775 struct ceph_pg_pool_info *pi, 2776 const struct ceph_pg *pgid, 2777 struct ceph_osds *temp) 2778 { 2779 struct ceph_pg_mapping *pg; 2780 int i; 2781 2782 ceph_osds_init(temp); 2783 2784 /* pg_temp? 
*/ 2785 pg = lookup_pg_mapping(&osdmap->pg_temp, pgid); 2786 if (pg) { 2787 for (i = 0; i < pg->pg_temp.len; i++) { 2788 if (ceph_osd_is_down(osdmap, pg->pg_temp.osds[i])) { 2789 if (ceph_can_shift_osds(pi)) 2790 continue; 2791 2792 temp->osds[temp->size++] = CRUSH_ITEM_NONE; 2793 } else { 2794 temp->osds[temp->size++] = pg->pg_temp.osds[i]; 2795 } 2796 } 2797 2798 /* apply pg_temp's primary */ 2799 for (i = 0; i < temp->size; i++) { 2800 if (temp->osds[i] != CRUSH_ITEM_NONE) { 2801 temp->primary = temp->osds[i]; 2802 break; 2803 } 2804 } 2805 } 2806 2807 /* primary_temp? */ 2808 pg = lookup_pg_mapping(&osdmap->primary_temp, pgid); 2809 if (pg) 2810 temp->primary = pg->primary_temp.osd; 2811 } 2812 2813 /* 2814 * Map a PG to its acting set as well as its up set. 2815 * 2816 * Acting set is used for data mapping purposes, while up set can be 2817 * recorded for detecting interval changes and deciding whether to 2818 * resend a request. 2819 */ 2820 void ceph_pg_to_up_acting_osds(struct ceph_osdmap *osdmap, 2821 struct ceph_pg_pool_info *pi, 2822 const struct ceph_pg *raw_pgid, 2823 struct ceph_osds *up, 2824 struct ceph_osds *acting) 2825 { 2826 struct ceph_pg pgid; 2827 u32 pps; 2828 2829 WARN_ON(pi->id != raw_pgid->pool); 2830 raw_pg_to_pg(pi, raw_pgid, &pgid); 2831 2832 pg_to_raw_osds(osdmap, pi, raw_pgid, up, &pps); 2833 apply_upmap(osdmap, &pgid, up); 2834 raw_to_up_osds(osdmap, pi, up); 2835 apply_primary_affinity(osdmap, pi, pps, up); 2836 get_temp_osds(osdmap, pi, &pgid, acting); 2837 if (!acting->size) { 2838 memcpy(acting->osds, up->osds, up->size * sizeof(up->osds[0])); 2839 acting->size = up->size; 2840 if (acting->primary == -1) 2841 acting->primary = up->primary; 2842 } 2843 WARN_ON(!osds_valid(up) || !osds_valid(acting)); 2844 } 2845 2846 bool ceph_pg_to_primary_shard(struct ceph_osdmap *osdmap, 2847 struct ceph_pg_pool_info *pi, 2848 const struct ceph_pg *raw_pgid, 2849 struct ceph_spg *spgid) 2850 { 2851 struct ceph_pg pgid; 2852 struct ceph_osds 
up, acting; 2853 int i; 2854 2855 WARN_ON(pi->id != raw_pgid->pool); 2856 raw_pg_to_pg(pi, raw_pgid, &pgid); 2857 2858 if (ceph_can_shift_osds(pi)) { 2859 spgid->pgid = pgid; /* struct */ 2860 spgid->shard = CEPH_SPG_NOSHARD; 2861 return true; 2862 } 2863 2864 ceph_pg_to_up_acting_osds(osdmap, pi, &pgid, &up, &acting); 2865 for (i = 0; i < acting.size; i++) { 2866 if (acting.osds[i] == acting.primary) { 2867 spgid->pgid = pgid; /* struct */ 2868 spgid->shard = i; 2869 return true; 2870 } 2871 } 2872 2873 return false; 2874 } 2875 2876 /* 2877 * Return acting primary for given PG, or -1 if none. 2878 */ 2879 int ceph_pg_to_acting_primary(struct ceph_osdmap *osdmap, 2880 const struct ceph_pg *raw_pgid) 2881 { 2882 struct ceph_pg_pool_info *pi; 2883 struct ceph_osds up, acting; 2884 2885 pi = ceph_pg_pool_by_id(osdmap, raw_pgid->pool); 2886 if (!pi) 2887 return -1; 2888 2889 ceph_pg_to_up_acting_osds(osdmap, pi, raw_pgid, &up, &acting); 2890 return acting.primary; 2891 } 2892 EXPORT_SYMBOL(ceph_pg_to_acting_primary); 2893 2894 static struct crush_loc_node *alloc_crush_loc(size_t type_name_len, 2895 size_t name_len) 2896 { 2897 struct crush_loc_node *loc; 2898 2899 loc = kmalloc(sizeof(*loc) + type_name_len + name_len + 2, GFP_NOIO); 2900 if (!loc) 2901 return NULL; 2902 2903 RB_CLEAR_NODE(&loc->cl_node); 2904 return loc; 2905 } 2906 2907 static void free_crush_loc(struct crush_loc_node *loc) 2908 { 2909 WARN_ON(!RB_EMPTY_NODE(&loc->cl_node)); 2910 2911 kfree(loc); 2912 } 2913 2914 static int crush_loc_compare(const struct crush_loc *loc1, 2915 const struct crush_loc *loc2) 2916 { 2917 return strcmp(loc1->cl_type_name, loc2->cl_type_name) ?: 2918 strcmp(loc1->cl_name, loc2->cl_name); 2919 } 2920 2921 DEFINE_RB_FUNCS2(crush_loc, struct crush_loc_node, cl_loc, crush_loc_compare, 2922 RB_BYPTR, const struct crush_loc *, cl_node) 2923 2924 /* 2925 * Parses a set of <bucket type name>':'<bucket name> pairs separated 2926 * by '|', e.g. "rack:foo1|rack:foo2|datacenter:bar". 
2927 * 2928 * Note that @crush_location is modified by strsep(). 2929 */ 2930 int ceph_parse_crush_location(char *crush_location, struct rb_root *locs) 2931 { 2932 struct crush_loc_node *loc; 2933 const char *type_name, *name, *colon; 2934 size_t type_name_len, name_len; 2935 2936 dout("%s '%s'\n", __func__, crush_location); 2937 while ((type_name = strsep(&crush_location, "|"))) { 2938 colon = strchr(type_name, ':'); 2939 if (!colon) 2940 return -EINVAL; 2941 2942 type_name_len = colon - type_name; 2943 if (type_name_len == 0) 2944 return -EINVAL; 2945 2946 name = colon + 1; 2947 name_len = strlen(name); 2948 if (name_len == 0) 2949 return -EINVAL; 2950 2951 loc = alloc_crush_loc(type_name_len, name_len); 2952 if (!loc) 2953 return -ENOMEM; 2954 2955 loc->cl_loc.cl_type_name = loc->cl_data; 2956 memcpy(loc->cl_loc.cl_type_name, type_name, type_name_len); 2957 loc->cl_loc.cl_type_name[type_name_len] = '\0'; 2958 2959 loc->cl_loc.cl_name = loc->cl_data + type_name_len + 1; 2960 memcpy(loc->cl_loc.cl_name, name, name_len); 2961 loc->cl_loc.cl_name[name_len] = '\0'; 2962 2963 if (!__insert_crush_loc(locs, loc)) { 2964 free_crush_loc(loc); 2965 return -EEXIST; 2966 } 2967 2968 dout("%s type_name '%s' name '%s'\n", __func__, 2969 loc->cl_loc.cl_type_name, loc->cl_loc.cl_name); 2970 } 2971 2972 return 0; 2973 } 2974 2975 int ceph_compare_crush_locs(struct rb_root *locs1, struct rb_root *locs2) 2976 { 2977 struct rb_node *n1 = rb_first(locs1); 2978 struct rb_node *n2 = rb_first(locs2); 2979 int ret; 2980 2981 for ( ; n1 && n2; n1 = rb_next(n1), n2 = rb_next(n2)) { 2982 struct crush_loc_node *loc1 = 2983 rb_entry(n1, struct crush_loc_node, cl_node); 2984 struct crush_loc_node *loc2 = 2985 rb_entry(n2, struct crush_loc_node, cl_node); 2986 2987 ret = crush_loc_compare(&loc1->cl_loc, &loc2->cl_loc); 2988 if (ret) 2989 return ret; 2990 } 2991 2992 if (!n1 && n2) 2993 return -1; 2994 if (n1 && !n2) 2995 return 1; 2996 return 0; 2997 } 2998 2999 void 
ceph_clear_crush_locs(struct rb_root *locs) 3000 { 3001 while (!RB_EMPTY_ROOT(locs)) { 3002 struct crush_loc_node *loc = 3003 rb_entry(rb_first(locs), struct crush_loc_node, cl_node); 3004 3005 erase_crush_loc(locs, loc); 3006 free_crush_loc(loc); 3007 } 3008 } 3009 3010 /* 3011 * [a-zA-Z0-9-_.]+ 3012 */ 3013 static bool is_valid_crush_name(const char *name) 3014 { 3015 do { 3016 if (!('a' <= *name && *name <= 'z') && 3017 !('A' <= *name && *name <= 'Z') && 3018 !('0' <= *name && *name <= '9') && 3019 *name != '-' && *name != '_' && *name != '.') 3020 return false; 3021 } while (*++name != '\0'); 3022 3023 return true; 3024 } 3025 3026 /* 3027 * Gets the parent of an item. Returns its id (<0 because the 3028 * parent is always a bucket), type id (>0 for the same reason, 3029 * via @parent_type_id) and location (via @parent_loc). If no 3030 * parent, returns 0. 3031 * 3032 * Does a linear search, as there are no parent pointers of any 3033 * kind. Note that the result is ambiguous for items that occur 3034 * multiple times in the map. 
3035 */ 3036 static int get_immediate_parent(struct crush_map *c, int id, 3037 u16 *parent_type_id, 3038 struct crush_loc *parent_loc) 3039 { 3040 struct crush_bucket *b; 3041 struct crush_name_node *type_cn, *cn; 3042 int i, j; 3043 3044 for (i = 0; i < c->max_buckets; i++) { 3045 b = c->buckets[i]; 3046 if (!b) 3047 continue; 3048 3049 /* ignore per-class shadow hierarchy */ 3050 cn = lookup_crush_name(&c->names, b->id); 3051 if (!cn || !is_valid_crush_name(cn->cn_name)) 3052 continue; 3053 3054 for (j = 0; j < b->size; j++) { 3055 if (b->items[j] != id) 3056 continue; 3057 3058 *parent_type_id = b->type; 3059 type_cn = lookup_crush_name(&c->type_names, b->type); 3060 parent_loc->cl_type_name = type_cn->cn_name; 3061 parent_loc->cl_name = cn->cn_name; 3062 return b->id; 3063 } 3064 } 3065 3066 return 0; /* no parent */ 3067 } 3068 3069 /* 3070 * Calculates the locality/distance from an item to a client 3071 * location expressed in terms of CRUSH hierarchy as a set of 3072 * (bucket type name, bucket name) pairs. Specifically, looks 3073 * for the lowest-valued bucket type for which the location of 3074 * @id matches one of the locations in @locs, so for standard 3075 * bucket types (host = 1, rack = 3, datacenter = 8, zone = 9) 3076 * a matching host is closer than a matching rack and a matching 3077 * data center is closer than a matching zone. 3078 * 3079 * Specifying multiple locations (a "multipath" location) such 3080 * as "rack=foo1 rack=foo2 datacenter=bar" is allowed -- @locs 3081 * is a multimap. The locality will be: 3082 * 3083 * - 3 for OSDs in racks foo1 and foo2 3084 * - 8 for OSDs in data center bar 3085 * - -1 for all other OSDs 3086 * 3087 * The lowest possible bucket type is 1, so the best locality 3088 * for an OSD is 1 (i.e. a matching host). Locality 0 would be 3089 * the OSD itself. 
3090 */ 3091 int ceph_get_crush_locality(struct ceph_osdmap *osdmap, int id, 3092 struct rb_root *locs) 3093 { 3094 struct crush_loc loc; 3095 u16 type_id; 3096 3097 /* 3098 * Instead of repeated get_immediate_parent() calls, 3099 * the location of @id could be obtained with a single 3100 * depth-first traversal. 3101 */ 3102 for (;;) { 3103 id = get_immediate_parent(osdmap->crush, id, &type_id, &loc); 3104 if (id >= 0) 3105 return -1; /* not local */ 3106 3107 if (lookup_crush_loc(locs, &loc)) 3108 return type_id; 3109 } 3110 } 3111