// SPDX-License-Identifier: GPL-2.0

#include <linux/ceph/ceph_debug.h>

#include <linux/module.h>
#include <linux/slab.h>

#include <linux/ceph/libceph.h>
#include <linux/ceph/osdmap.h>
#include <linux/ceph/decode.h>
#include <linux/crush/hash.h>
#include <linux/crush/mapper.h>

static __printf(2, 3)
void osdmap_info(const struct ceph_osdmap *map, const char *fmt, ...)
{
	struct va_format vaf;
	va_list args;

	va_start(args, fmt);
	vaf.fmt = fmt;
	vaf.va = &args;

	printk(KERN_INFO "%s (%pU e%u): %pV", KBUILD_MODNAME, &map->fsid,
	       map->epoch, &vaf);

	va_end(args);
}

char *ceph_osdmap_state_str(char *str, int len, u32 state)
{
	if (!len)
		return str;

	if ((state & CEPH_OSD_EXISTS) && (state & CEPH_OSD_UP))
		snprintf(str, len, "exists, up");
	else if (state & CEPH_OSD_EXISTS)
		snprintf(str, len, "exists");
	else if (state & CEPH_OSD_UP)
		snprintf(str, len, "up");
	else
		snprintf(str, len, "doesn't exist");

	return str;
}

/* maps */

static int calc_bits_of(unsigned int t)
{
	int b = 0;
	while (t) {
		t = t >> 1;
		b++;
	}
	return b;
}

/*
 * the foo_mask is the smallest value 2^n-1 that is >= foo.
 */
static void calc_pg_masks(struct ceph_pg_pool_info *pi)
{
	pi->pg_num_mask = (1 << calc_bits_of(pi->pg_num-1)) - 1;
	pi->pgp_num_mask = (1 << calc_bits_of(pi->pgp_num-1)) - 1;
}

/*
 * decode crush map
 */
static int crush_decode_uniform_bucket(void **p, void *end,
				       struct crush_bucket_uniform *b)
{
	dout("crush_decode_uniform_bucket %p to %p\n", *p, end);
	ceph_decode_need(p, end, (1+b->h.size) * sizeof(u32), bad);
	b->item_weight = ceph_decode_32(p);
	return 0;
bad:
	return -EINVAL;
}

static int crush_decode_list_bucket(void **p, void *end,
				    struct crush_bucket_list *b)
{
	int j;
	dout("crush_decode_list_bucket %p to %p\n", *p, end);
	b->item_weights = kcalloc(b->h.size, sizeof(u32), GFP_NOFS);
	if (b->item_weights == NULL)
		return -ENOMEM;
	b->sum_weights = kcalloc(b->h.size, sizeof(u32), GFP_NOFS);
	if (b->sum_weights == NULL)
		return -ENOMEM;
	ceph_decode_need(p, end, 2 * b->h.size * sizeof(u32), bad);
	for (j = 0; j < b->h.size; j++) {
		b->item_weights[j] = ceph_decode_32(p);
		b->sum_weights[j] = ceph_decode_32(p);
	}
	return 0;
bad:
	return -EINVAL;
}

static int crush_decode_tree_bucket(void **p, void *end,
				    struct crush_bucket_tree *b)
{
	int j;
	dout("crush_decode_tree_bucket %p to %p\n", *p, end);
	ceph_decode_8_safe(p, end, b->num_nodes, bad);
	b->node_weights = kcalloc(b->num_nodes, sizeof(u32), GFP_NOFS);
	if (b->node_weights == NULL)
		return -ENOMEM;
	ceph_decode_need(p, end, b->num_nodes * sizeof(u32), bad);
	for (j = 0; j < b->num_nodes; j++)
		b->node_weights[j] = ceph_decode_32(p);
	return 0;
bad:
	return -EINVAL;
}

static int crush_decode_straw_bucket(void **p, void *end,
				     struct crush_bucket_straw *b)
{
	int j;
	dout("crush_decode_straw_bucket %p to %p\n", *p, end);
	b->item_weights = kcalloc(b->h.size, sizeof(u32), GFP_NOFS);
	if (b->item_weights == NULL)
		return -ENOMEM;
	b->straws = kcalloc(b->h.size, sizeof(u32), GFP_NOFS);
	if (b->straws == NULL)
		return -ENOMEM;
	ceph_decode_need(p, end, 2 * b->h.size * sizeof(u32), bad);
	for (j = 0; j < b->h.size; j++) {
		b->item_weights[j] = ceph_decode_32(p);
		b->straws[j] = ceph_decode_32(p);
	}
	return 0;
bad:
	return -EINVAL;
}

static int crush_decode_straw2_bucket(void **p, void *end,
				      struct crush_bucket_straw2 *b)
{
	int j;
	dout("crush_decode_straw2_bucket %p to %p\n", *p, end);
	b->item_weights = kcalloc(b->h.size, sizeof(u32), GFP_NOFS);
	if (b->item_weights == NULL)
		return -ENOMEM;
	ceph_decode_need(p, end, b->h.size * sizeof(u32), bad);
	for (j = 0; j < b->h.size; j++)
		b->item_weights[j] = ceph_decode_32(p);
	return 0;
bad:
	return -EINVAL;
}

struct crush_name_node {
	struct rb_node cn_node;
	int cn_id;
	char cn_name[];
};

static struct crush_name_node *alloc_crush_name(size_t name_len)
{
	struct crush_name_node *cn;

	cn = kmalloc(sizeof(*cn) + name_len + 1, GFP_NOIO);
	if (!cn)
		return NULL;

	RB_CLEAR_NODE(&cn->cn_node);
	return cn;
}

static void free_crush_name(struct crush_name_node *cn)
{
	WARN_ON(!RB_EMPTY_NODE(&cn->cn_node));

	kfree(cn);
}

DEFINE_RB_FUNCS(crush_name, struct crush_name_node, cn_id, cn_node)

static int decode_crush_names(void **p, void *end, struct rb_root *root)
{
	u32 n;

	ceph_decode_32_safe(p, end, n, e_inval);
	while (n--) {
		struct crush_name_node *cn;
		int id;
		u32 name_len;

		ceph_decode_32_safe(p, end, id, e_inval);
		ceph_decode_32_safe(p, end, name_len, e_inval);
		ceph_decode_need(p, end, name_len, e_inval);

		cn = alloc_crush_name(name_len);
		if (!cn)
			return -ENOMEM;

		cn->cn_id = id;
		memcpy(cn->cn_name, *p, name_len);
		cn->cn_name[name_len] = '\0';
		*p += name_len;

		if (!__insert_crush_name(root, cn)) {
			free_crush_name(cn);
			return -EEXIST;
		}
	}

	return 0;

e_inval:
	return -EINVAL;
}

void clear_crush_names(struct rb_root *root)
{
	while (!RB_EMPTY_ROOT(root)) {
		struct crush_name_node *cn =
		    rb_entry(rb_first(root), struct crush_name_node, cn_node);

		erase_crush_name(root, cn);
		free_crush_name(cn);
	}
}

static struct crush_choose_arg_map *alloc_choose_arg_map(void)
{
	struct crush_choose_arg_map *arg_map;

	arg_map = kzalloc(sizeof(*arg_map), GFP_NOIO);
	if (!arg_map)
		return NULL;

	RB_CLEAR_NODE(&arg_map->node);
	return arg_map;
}

static void free_choose_arg_map(struct crush_choose_arg_map *arg_map)
{
	if (arg_map) {
		int i, j;

		WARN_ON(!RB_EMPTY_NODE(&arg_map->node));

		for (i = 0; i < arg_map->size; i++) {
			struct crush_choose_arg *arg = &arg_map->args[i];

			for (j = 0; j < arg->weight_set_size; j++)
				kfree(arg->weight_set[j].weights);
			kfree(arg->weight_set);
			kfree(arg->ids);
		}
		kfree(arg_map->args);
		kfree(arg_map);
	}
}

DEFINE_RB_FUNCS(choose_arg_map, struct crush_choose_arg_map, choose_args_index,
		node);

void clear_choose_args(struct crush_map *c)
{
	while (!RB_EMPTY_ROOT(&c->choose_args)) {
		struct crush_choose_arg_map *arg_map =
		    rb_entry(rb_first(&c->choose_args),
			     struct crush_choose_arg_map, node);

		erase_choose_arg_map(&c->choose_args, arg_map);
		free_choose_arg_map(arg_map);
	}
}

static u32 *decode_array_32_alloc(void **p, void *end, u32 *plen)
{
	u32 *a = NULL;
	u32 len;
	int ret;

	ceph_decode_32_safe(p, end, len, e_inval);
	if (len) {
		u32 i;

		a = kmalloc_array(len, sizeof(u32), GFP_NOIO);
		if (!a) {
			ret = -ENOMEM;
			goto fail;
		}

		ceph_decode_need(p, end, len * sizeof(u32), e_inval);
		for (i = 0; i < len; i++)
			a[i] = ceph_decode_32(p);
	}

	*plen = len;
	return a;

e_inval:
	ret = -EINVAL;
fail:
	kfree(a);
	return ERR_PTR(ret);
}

/*
 * Assumes @arg is zero-initialized.
 */
static int decode_choose_arg(void **p, void *end, struct crush_choose_arg *arg)
{
	int ret;

	ceph_decode_32_safe(p, end, arg->weight_set_size, e_inval);
	if (arg->weight_set_size) {
		u32 i;

		arg->weight_set = kmalloc_array(arg->weight_set_size,
						sizeof(*arg->weight_set),
						GFP_NOIO);
		if (!arg->weight_set)
			return -ENOMEM;

		for (i = 0; i < arg->weight_set_size; i++) {
			struct crush_weight_set *w = &arg->weight_set[i];

			w->weights = decode_array_32_alloc(p, end, &w->size);
			if (IS_ERR(w->weights)) {
				ret = PTR_ERR(w->weights);
				w->weights = NULL;
				return ret;
			}
		}
	}

	arg->ids = decode_array_32_alloc(p, end, &arg->ids_size);
	if (IS_ERR(arg->ids)) {
		ret = PTR_ERR(arg->ids);
		arg->ids = NULL;
		return ret;
	}

	return 0;

e_inval:
	return -EINVAL;
}

static int decode_choose_args(void **p, void *end, struct crush_map *c)
{
	struct crush_choose_arg_map *arg_map = NULL;
	u32 num_choose_arg_maps, num_buckets;
	int ret;

	ceph_decode_32_safe(p, end, num_choose_arg_maps, e_inval);
	while (num_choose_arg_maps--) {
		arg_map = alloc_choose_arg_map();
		if (!arg_map) {
			ret = -ENOMEM;
			goto fail;
		}

		ceph_decode_64_safe(p, end, arg_map->choose_args_index,
				    e_inval);
		arg_map->size = c->max_buckets;
		arg_map->args = kcalloc(arg_map->size, sizeof(*arg_map->args),
					GFP_NOIO);
		if (!arg_map->args) {
			ret = -ENOMEM;
			goto fail;
		}

		ceph_decode_32_safe(p, end, num_buckets, e_inval);
		while (num_buckets--) {
			struct crush_choose_arg *arg;
			u32 bucket_index;

			ceph_decode_32_safe(p, end, bucket_index, e_inval);
			if (bucket_index >= arg_map->size)
				goto e_inval;

			arg = &arg_map->args[bucket_index];
			ret = decode_choose_arg(p, end, arg);
			if (ret)
				goto fail;

			if (arg->ids_size &&
			    arg->ids_size != c->buckets[bucket_index]->size)
				goto e_inval;
		}

		insert_choose_arg_map(&c->choose_args, arg_map);
	}

	return 0;

e_inval:
	ret = -EINVAL;
fail:
	free_choose_arg_map(arg_map);
	return ret;
}

static void crush_finalize(struct crush_map *c)
{
	__s32 b;

	/* Space for the array of pointers to per-bucket workspace */
	c->working_size = sizeof(struct crush_work) +
	    c->max_buckets * sizeof(struct crush_work_bucket *);

	for (b = 0; b < c->max_buckets; b++) {
		if (!c->buckets[b])
			continue;

		switch (c->buckets[b]->alg) {
		default:
			/*
			 * The base case, permutation variables and
			 * the pointer to the permutation array.
			 */
			c->working_size += sizeof(struct crush_work_bucket);
			break;
		}
		/* Every bucket has a permutation array. */
		c->working_size += c->buckets[b]->size * sizeof(__u32);
	}
}

static struct crush_map *crush_decode(void *pbyval, void *end)
{
	struct crush_map *c;
	int err;
	int i, j;
	void **p = &pbyval;
	void *start = pbyval;
	u32 magic;

	dout("crush_decode %p to %p len %d\n", *p, end, (int)(end - *p));

	c = kzalloc(sizeof(*c), GFP_NOFS);
	if (c == NULL)
		return ERR_PTR(-ENOMEM);

	c->type_names = RB_ROOT;
	c->names = RB_ROOT;
	c->choose_args = RB_ROOT;

	/* set tunables to default values */
	c->choose_local_tries = 2;
	c->choose_local_fallback_tries = 5;
	c->choose_total_tries = 19;
	c->chooseleaf_descend_once = 0;

	ceph_decode_need(p, end, 4*sizeof(u32), bad);
	magic = ceph_decode_32(p);
	if (magic != CRUSH_MAGIC) {
		pr_err("crush_decode magic %x != current %x\n",
		       (unsigned int)magic, (unsigned int)CRUSH_MAGIC);
		goto bad;
	}
	c->max_buckets = ceph_decode_32(p);
	c->max_rules = ceph_decode_32(p);
	c->max_devices = ceph_decode_32(p);

	c->buckets = kcalloc(c->max_buckets, sizeof(*c->buckets), GFP_NOFS);
	if (c->buckets == NULL)
		goto badmem;
	c->rules = kcalloc(c->max_rules, sizeof(*c->rules), GFP_NOFS);
	if (c->rules == NULL)
		goto badmem;

	/* buckets */
	for (i = 0; i < c->max_buckets; i++) {
		int size = 0;
		u32 alg;
		struct crush_bucket *b;

		ceph_decode_32_safe(p, end, alg, bad);
		if (alg == 0) {
			c->buckets[i] = NULL;
			continue;
		}
		dout("crush_decode bucket %d off %x %p to %p\n",
		     i, (int)(*p-start), *p, end);

		switch (alg) {
		case CRUSH_BUCKET_UNIFORM:
			size = sizeof(struct crush_bucket_uniform);
			break;
		case CRUSH_BUCKET_LIST:
			size = sizeof(struct crush_bucket_list);
			break;
		case CRUSH_BUCKET_TREE:
			size = sizeof(struct crush_bucket_tree);
			break;
		case CRUSH_BUCKET_STRAW:
			size = sizeof(struct crush_bucket_straw);
			break;
		case CRUSH_BUCKET_STRAW2:
			size = sizeof(struct crush_bucket_straw2);
			break;
		default:
			goto bad;
		}
		BUG_ON(size == 0);
		b = c->buckets[i] = kzalloc(size, GFP_NOFS);
		if (b == NULL)
			goto badmem;

		ceph_decode_need(p, end, 4*sizeof(u32), bad);
		b->id = ceph_decode_32(p);
		b->type = ceph_decode_16(p);
		b->alg = ceph_decode_8(p);
		b->hash = ceph_decode_8(p);
		b->weight = ceph_decode_32(p);
		b->size = ceph_decode_32(p);

		dout("crush_decode bucket size %d off %x %p to %p\n",
		     b->size, (int)(*p-start), *p, end);

		b->items = kcalloc(b->size, sizeof(__s32), GFP_NOFS);
		if (b->items == NULL)
			goto badmem;

		ceph_decode_need(p, end, b->size*sizeof(u32), bad);
		for (j = 0; j < b->size; j++)
			b->items[j] = ceph_decode_32(p);

		switch (b->alg) {
		case CRUSH_BUCKET_UNIFORM:
			err = crush_decode_uniform_bucket(p, end,
				  (struct crush_bucket_uniform *)b);
			if (err < 0)
				goto fail;
			break;
		case CRUSH_BUCKET_LIST:
			err = crush_decode_list_bucket(p, end,
			       (struct crush_bucket_list *)b);
			if (err < 0)
				goto fail;
			break;
		case CRUSH_BUCKET_TREE:
			err = crush_decode_tree_bucket(p, end,
				(struct crush_bucket_tree *)b);
			if (err < 0)
				goto fail;
			break;
		case CRUSH_BUCKET_STRAW:
			err = crush_decode_straw_bucket(p, end,
				(struct crush_bucket_straw *)b);
			if (err < 0)
				goto fail;
			break;
		case CRUSH_BUCKET_STRAW2:
			err = crush_decode_straw2_bucket(p, end,
				(struct crush_bucket_straw2 *)b);
			if (err < 0)
				goto fail;
			break;
		}
	}

	/* rules */
	dout("rule vec is %p\n", c->rules);
	for (i = 0; i < c->max_rules; i++) {
		u32 yes;
		struct crush_rule *r;

		ceph_decode_32_safe(p, end, yes, bad);
		if (!yes) {
			dout("crush_decode NO rule %d off %x %p to %p\n",
			     i, (int)(*p-start), *p, end);
			c->rules[i] = NULL;
			continue;
		}

		dout("crush_decode rule %d off %x %p to %p\n",
		     i, (int)(*p-start), *p, end);

		/* len */
		ceph_decode_32_safe(p, end, yes, bad);
#if BITS_PER_LONG == 32
		if (yes > (ULONG_MAX - sizeof(*r))
			  / sizeof(struct crush_rule_step))
			goto bad;
#endif
		r = kmalloc(struct_size(r, steps, yes), GFP_NOFS);
		if (r == NULL)
			goto badmem;
		dout(" rule %d is at %p\n", i, r);
		c->rules[i] = r;
		r->len = yes;
		ceph_decode_copy_safe(p, end, &r->mask, 4, bad); /* 4 u8's */
		ceph_decode_need(p, end, r->len*3*sizeof(u32), bad);
		for (j = 0; j < r->len; j++) {
			r->steps[j].op = ceph_decode_32(p);
			r->steps[j].arg1 = ceph_decode_32(p);
			r->steps[j].arg2 = ceph_decode_32(p);
		}
	}

	err = decode_crush_names(p, end, &c->type_names);
	if (err)
		goto fail;

	err = decode_crush_names(p, end, &c->names);
	if (err)
		goto fail;

	ceph_decode_skip_map(p, end, 32, string, bad); /* rule_name_map */

	/* tunables */
	ceph_decode_need(p, end, 3*sizeof(u32), done);
	c->choose_local_tries = ceph_decode_32(p);
	c->choose_local_fallback_tries = ceph_decode_32(p);
	c->choose_total_tries = ceph_decode_32(p);
	dout("crush decode tunable choose_local_tries = %d\n",
	     c->choose_local_tries);
	dout("crush decode tunable choose_local_fallback_tries = %d\n",
	     c->choose_local_fallback_tries);
	dout("crush decode tunable choose_total_tries = %d\n",
	     c->choose_total_tries);

	ceph_decode_need(p, end, sizeof(u32), done);
	c->chooseleaf_descend_once = ceph_decode_32(p);
	dout("crush decode tunable chooseleaf_descend_once = %d\n",
	     c->chooseleaf_descend_once);

	ceph_decode_need(p, end, sizeof(u8), done);
	c->chooseleaf_vary_r = ceph_decode_8(p);
	dout("crush decode tunable chooseleaf_vary_r = %d\n",
	     c->chooseleaf_vary_r);

	/* skip straw_calc_version, allowed_bucket_algs */
	ceph_decode_need(p, end, sizeof(u8) + sizeof(u32), done);
	*p += sizeof(u8) + sizeof(u32);

	ceph_decode_need(p, end, sizeof(u8), done);
	c->chooseleaf_stable = ceph_decode_8(p);
	dout("crush decode tunable chooseleaf_stable = %d\n",
	     c->chooseleaf_stable);

	if (*p != end) {
		/* class_map */
		ceph_decode_skip_map(p, end, 32, 32, bad);
		/* class_name */
		ceph_decode_skip_map(p, end, 32, string, bad);
		/* class_bucket */
		ceph_decode_skip_map_of_map(p, end, 32, 32, 32, bad);
	}

	if (*p != end) {
		err = decode_choose_args(p, end, c);
		if (err)
			goto fail;
	}

done:
	crush_finalize(c);
	dout("crush_decode success\n");
	return c;

badmem:
	err = -ENOMEM;
fail:
	dout("crush_decode fail %d\n", err);
	crush_destroy(c);
	return ERR_PTR(err);

bad:
	err = -EINVAL;
	goto fail;
}

int ceph_pg_compare(const struct ceph_pg *lhs, const struct ceph_pg *rhs)
{
	if (lhs->pool < rhs->pool)
		return -1;
	if (lhs->pool > rhs->pool)
		return 1;
	if (lhs->seed < rhs->seed)
		return -1;
	if (lhs->seed > rhs->seed)
		return 1;

	return 0;
}

int ceph_spg_compare(const struct ceph_spg *lhs, const struct ceph_spg *rhs)
{
	int ret;

	ret = ceph_pg_compare(&lhs->pgid, &rhs->pgid);
	if (ret)
		return ret;

	if (lhs->shard < rhs->shard)
		return -1;
	if (lhs->shard > rhs->shard)
		return 1;

	return 0;
}

static struct ceph_pg_mapping *alloc_pg_mapping(size_t payload_len)
{
	struct ceph_pg_mapping *pg;

	pg = kmalloc(sizeof(*pg) + payload_len, GFP_NOIO);
	if (!pg)
		return NULL;

	RB_CLEAR_NODE(&pg->node);
	return pg;
}

static void free_pg_mapping(struct ceph_pg_mapping *pg)
{
	WARN_ON(!RB_EMPTY_NODE(&pg->node));

	kfree(pg);
}

/*
 * rbtree of pg_mapping for handling pg_temp (explicit mapping of pgid
 * to a set of osds) and primary_temp (explicit primary setting)
 */
DEFINE_RB_FUNCS2(pg_mapping, struct ceph_pg_mapping, pgid, ceph_pg_compare,
		 RB_BYPTR, const struct ceph_pg *, node)

/*
 * rbtree of pg pool info
 */
DEFINE_RB_FUNCS(pg_pool, struct ceph_pg_pool_info, id, node)

struct ceph_pg_pool_info *ceph_pg_pool_by_id(struct ceph_osdmap *map, u64 id)
{
	return lookup_pg_pool(&map->pg_pools, id);
}

const char *ceph_pg_pool_name_by_id(struct ceph_osdmap *map, u64 id)
{
	struct ceph_pg_pool_info *pi;

	if (id == CEPH_NOPOOL)
		return NULL;

	if (WARN_ON_ONCE(id > (u64) INT_MAX))
		return NULL;

	pi = lookup_pg_pool(&map->pg_pools, id);
	return pi ? pi->name : NULL;
}
EXPORT_SYMBOL(ceph_pg_pool_name_by_id);

int ceph_pg_poolid_by_name(struct ceph_osdmap *map, const char *name)
{
	struct rb_node *rbp;

	for (rbp = rb_first(&map->pg_pools); rbp; rbp = rb_next(rbp)) {
		struct ceph_pg_pool_info *pi =
			rb_entry(rbp, struct ceph_pg_pool_info, node);
		if (pi->name && strcmp(pi->name, name) == 0)
			return pi->id;
	}
	return -ENOENT;
}
EXPORT_SYMBOL(ceph_pg_poolid_by_name);

u64 ceph_pg_pool_flags(struct ceph_osdmap *map, u64 id)
{
	struct ceph_pg_pool_info *pi;

	pi = lookup_pg_pool(&map->pg_pools, id);
	return pi ? pi->flags : 0;
}
EXPORT_SYMBOL(ceph_pg_pool_flags);

static void __remove_pg_pool(struct rb_root *root, struct ceph_pg_pool_info *pi)
{
	erase_pg_pool(root, pi);
	kfree(pi->name);
	kfree(pi);
}

static int decode_pool(void **p, void *end, struct ceph_pg_pool_info *pi)
{
	u8 ev, cv;
	unsigned len, num;
	void *pool_end;

	ceph_decode_need(p, end, 2 + 4, bad);
	ev = ceph_decode_8(p);  /* encoding version */
	cv = ceph_decode_8(p);  /* compat version */
	if (ev < 5) {
		pr_warn("got v %d < 5 cv %d of ceph_pg_pool\n", ev, cv);
		return -EINVAL;
	}
	if (cv > 9) {
		pr_warn("got v %d cv %d > 9 of ceph_pg_pool\n", ev, cv);
		return -EINVAL;
	}
	len = ceph_decode_32(p);
	ceph_decode_need(p, end, len, bad);
	pool_end = *p + len;

	ceph_decode_need(p, end, 4 + 4 + 4, bad);
	pi->type = ceph_decode_8(p);
	pi->size = ceph_decode_8(p);
	pi->crush_ruleset = ceph_decode_8(p);
	pi->object_hash = ceph_decode_8(p);
	pi->pg_num = ceph_decode_32(p);
	pi->pgp_num = ceph_decode_32(p);

	/* lpg*, last_change, snap_seq, snap_epoch */
	ceph_decode_skip_n(p, end, 8 + 4 + 8 + 4, bad);

	/* skip snaps */
	ceph_decode_32_safe(p, end, num, bad);
	while (num--) {
		/* snapid key, pool snap (with versions) */
		ceph_decode_skip_n(p, end, 8 + 2, bad);
		ceph_decode_skip_string(p, end, bad);
	}

	/* removed_snaps */
	ceph_decode_skip_map(p, end, 64, 64, bad);

	ceph_decode_need(p, end, 8 + 8 + 4, bad);
	*p += 8;  /* skip auid */
	pi->flags = ceph_decode_64(p);
	*p += 4;  /* skip crash_replay_interval */

	if (ev >= 7)
		ceph_decode_8_safe(p, end, pi->min_size, bad);
	else
		pi->min_size = pi->size - pi->size / 2;

	if (ev >= 8)
		/* quota_max_* */
		ceph_decode_skip_n(p, end, 8 + 8, bad);

	if (ev >= 9) {
		/* tiers */
		ceph_decode_skip_set(p, end, 64, bad);

		ceph_decode_need(p, end, 8 + 1 + 8 + 8, bad);
		*p += 8;  /* skip tier_of */
		*p += 1;  /* skip cache_mode */
		pi->read_tier = ceph_decode_64(p);
		pi->write_tier = ceph_decode_64(p);
	} else {
		pi->read_tier = -1;
		pi->write_tier = -1;
	}

	if (ev >= 10)
		/* properties */
		ceph_decode_skip_map(p, end, string, string, bad);

	if (ev >= 11) {
		/* hit_set_params (with versions) */
		ceph_decode_skip_n(p, end, 2, bad);
		ceph_decode_skip_string(p, end, bad);

		/* hit_set_period, hit_set_count */
		ceph_decode_skip_n(p, end, 4 + 4, bad);
	}

	if (ev >= 12)
		/* stripe_width */
		ceph_decode_skip_32(p, end, bad);

	if (ev >= 13)
		/* target_max_*, cache_target_*, cache_min_* */
		ceph_decode_skip_n(p, end, 16 + 8 + 8, bad);

	if (ev >= 14)
		/* erasure_code_profile */
		ceph_decode_skip_string(p, end, bad);

	/*
	 * last_force_op_resend_preluminous, will be overridden if the
	 * map was encoded with RESEND_ON_SPLIT
	 */
	if (ev >= 15)
		ceph_decode_32_safe(p, end, pi->last_force_request_resend, bad);
	else
		pi->last_force_request_resend = 0;

	if (ev >= 16)
		/* min_read_recency_for_promote */
		ceph_decode_skip_32(p, end, bad);

	if (ev >= 17)
		/* expected_num_objects */
		ceph_decode_skip_64(p, end, bad);

	if (ev >= 19)
		/* cache_target_dirty_high_ratio_micro */
		ceph_decode_skip_32(p, end, bad);

	if (ev >= 20)
		/* min_write_recency_for_promote */
		ceph_decode_skip_32(p, end, bad);

	if (ev >= 21)
		/* use_gmt_hitset */
		ceph_decode_skip_8(p, end, bad);

	if (ev >= 22)
		/* fast_read */
		ceph_decode_skip_8(p, end, bad);

	if (ev >= 23)
		/* hit_set_grade_decay_rate, hit_set_search_last_n */
		ceph_decode_skip_n(p, end, 4 + 4, bad);

	if (ev >= 24) {
		/* opts (with versions) */
		ceph_decode_skip_n(p, end, 2, bad);
		ceph_decode_skip_string(p, end, bad);
	}

	if (ev >= 25)
		ceph_decode_32_safe(p, end, pi->last_force_request_resend, bad);

	/* ignore the rest */

	*p = pool_end;
	calc_pg_masks(pi);
	return 0;

bad:
	return -EINVAL;
}

static int decode_pool_names(void **p, void *end, struct ceph_osdmap *map)
{
	struct ceph_pg_pool_info *pi;
	u32 num, len;
	u64 pool;

	ceph_decode_32_safe(p, end, num, bad);
	dout(" %d pool names\n", num);
	while (num--) {
		ceph_decode_64_safe(p, end, pool, bad);
		ceph_decode_32_safe(p, end, len, bad);
		dout(" pool %llu len %d\n", pool, len);
		ceph_decode_need(p, end, len, bad);
		pi = lookup_pg_pool(&map->pg_pools, pool);
		if (pi) {
			char *name = kstrndup(*p, len, GFP_NOFS);

			if (!name)
				return -ENOMEM;
			kfree(pi->name);
			pi->name = name;
			dout(" name is %s\n", pi->name);
		}
		*p += len;
	}
	return 0;

bad:
	return -EINVAL;
}

/*
 * CRUSH workspaces
 *
 * workspace_manager framework borrowed from fs/btrfs/compression.c.
 * Two simplifications: there is only one type of workspace and there
 * is always at least one workspace.
 */
static struct crush_work *alloc_workspace(const struct crush_map *c)
{
	struct crush_work *work;
	size_t work_size;

	WARN_ON(!c->working_size);
	work_size = crush_work_size(c, CEPH_PG_MAX_SIZE);
	dout("%s work_size %zu bytes\n", __func__, work_size);

	work = kvmalloc(work_size, GFP_NOIO);
	if (!work)
		return NULL;

	INIT_LIST_HEAD(&work->item);
	crush_init_workspace(c, work);
	return work;
}

static void free_workspace(struct crush_work *work)
{
	WARN_ON(!list_empty(&work->item));
	kvfree(work);
}

static void init_workspace_manager(struct workspace_manager *wsm)
{
	INIT_LIST_HEAD(&wsm->idle_ws);
	spin_lock_init(&wsm->ws_lock);
	atomic_set(&wsm->total_ws, 0);
	wsm->free_ws = 0;
	init_waitqueue_head(&wsm->ws_wait);
}

static void add_initial_workspace(struct workspace_manager *wsm,
				  struct crush_work *work)
{
	WARN_ON(!list_empty(&wsm->idle_ws));

	list_add(&work->item, &wsm->idle_ws);
	atomic_set(&wsm->total_ws, 1);
	wsm->free_ws = 1;
}

static void cleanup_workspace_manager(struct workspace_manager *wsm)
{
	struct crush_work *work;

	while (!list_empty(&wsm->idle_ws)) {
		work = list_first_entry(&wsm->idle_ws, struct crush_work,
					item);
		list_del_init(&work->item);
		free_workspace(work);
	}
	atomic_set(&wsm->total_ws, 0);
	wsm->free_ws = 0;
}

/*
 * Finds an available workspace or allocates a new one.  If it's not
 * possible to allocate a new one, waits until there is one.
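 * (The number of workspaces is capped at roughly the number of online
 * CPUs; beyond that, callers sleep on ws_wait until a workspace is
 * put back.)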
 */
static struct crush_work *get_workspace(struct workspace_manager *wsm,
					const struct crush_map *c)
{
	struct crush_work *work;
	int cpus = num_online_cpus();

again:
	spin_lock(&wsm->ws_lock);
	if (!list_empty(&wsm->idle_ws)) {
		work = list_first_entry(&wsm->idle_ws, struct crush_work,
					item);
		list_del_init(&work->item);
		wsm->free_ws--;
		spin_unlock(&wsm->ws_lock);
		return work;

	}
	if (atomic_read(&wsm->total_ws) > cpus) {
		DEFINE_WAIT(wait);

		spin_unlock(&wsm->ws_lock);
		prepare_to_wait(&wsm->ws_wait, &wait, TASK_UNINTERRUPTIBLE);
		if (atomic_read(&wsm->total_ws) > cpus && !wsm->free_ws)
			schedule();
		finish_wait(&wsm->ws_wait, &wait);
		goto again;
	}
	atomic_inc(&wsm->total_ws);
	spin_unlock(&wsm->ws_lock);

	work = alloc_workspace(c);
	if (!work) {
		atomic_dec(&wsm->total_ws);
		wake_up(&wsm->ws_wait);

		/*
		 * Do not return the error but go back to waiting.  We
		 * have the initial workspace and the CRUSH computation
		 * time is bounded so we will get it eventually.
		 */
		WARN_ON(atomic_read(&wsm->total_ws) < 1);
		goto again;
	}
	return work;
}

/*
 * Puts a workspace back on the list or frees it if we have enough
 * idle ones sitting around.
 */
static void put_workspace(struct workspace_manager *wsm,
			  struct crush_work *work)
{
	spin_lock(&wsm->ws_lock);
	if (wsm->free_ws <= num_online_cpus()) {
		list_add(&work->item, &wsm->idle_ws);
		wsm->free_ws++;
		spin_unlock(&wsm->ws_lock);
		goto wake;
	}
	spin_unlock(&wsm->ws_lock);

	free_workspace(work);
	atomic_dec(&wsm->total_ws);
wake:
	if (wq_has_sleeper(&wsm->ws_wait))
		wake_up(&wsm->ws_wait);
}

/*
 * osd map
 */
struct ceph_osdmap *ceph_osdmap_alloc(void)
{
	struct ceph_osdmap *map;

	map = kzalloc(sizeof(*map), GFP_NOIO);
	if (!map)
		return NULL;

	map->pg_pools = RB_ROOT;
	map->pool_max = -1;
	map->pg_temp = RB_ROOT;
	map->primary_temp = RB_ROOT;
	map->pg_upmap = RB_ROOT;
	map->pg_upmap_items = RB_ROOT;

	init_workspace_manager(&map->crush_wsm);

	return map;
}

void ceph_osdmap_destroy(struct ceph_osdmap *map)
{
	dout("osdmap_destroy %p\n", map);

	if (map->crush)
		crush_destroy(map->crush);
	cleanup_workspace_manager(&map->crush_wsm);

	while (!RB_EMPTY_ROOT(&map->pg_temp)) {
		struct ceph_pg_mapping *pg =
			rb_entry(rb_first(&map->pg_temp),
				 struct ceph_pg_mapping, node);
		erase_pg_mapping(&map->pg_temp, pg);
		free_pg_mapping(pg);
	}
	while (!RB_EMPTY_ROOT(&map->primary_temp)) {
		struct ceph_pg_mapping *pg =
			rb_entry(rb_first(&map->primary_temp),
				 struct ceph_pg_mapping, node);
		erase_pg_mapping(&map->primary_temp, pg);
		free_pg_mapping(pg);
	}
	while (!RB_EMPTY_ROOT(&map->pg_upmap)) {
		struct ceph_pg_mapping *pg =
			rb_entry(rb_first(&map->pg_upmap),
				 struct ceph_pg_mapping, node);
		rb_erase(&pg->node, &map->pg_upmap);
		kfree(pg);
	}
	while (!RB_EMPTY_ROOT(&map->pg_upmap_items)) {
		struct ceph_pg_mapping *pg =
			rb_entry(rb_first(&map->pg_upmap_items),
				 struct ceph_pg_mapping, node);
		rb_erase(&pg->node, &map->pg_upmap_items);
		kfree(pg);
	}
	while (!RB_EMPTY_ROOT(&map->pg_pools)) {
		struct ceph_pg_pool_info *pi =
			rb_entry(rb_first(&map->pg_pools),
				 struct ceph_pg_pool_info, node);
		__remove_pg_pool(&map->pg_pools, pi);
	}
	kvfree(map->osd_state);
	kvfree(map->osd_weight);
	kvfree(map->osd_addr);
	kvfree(map->osd_primary_affinity);
	kfree(map);
}

/*
 * Adjust max_osd value, (re)allocate arrays.
 *
 * The new elements are properly initialized.
 */
static int osdmap_set_max_osd(struct ceph_osdmap *map, u32 max)
{
	u32 *state;
	u32 *weight;
	struct ceph_entity_addr *addr;
	u32 to_copy;
	int i;

	dout("%s old %u new %u\n", __func__, map->max_osd, max);
	if (max == map->max_osd)
		return 0;

	state = kvmalloc(array_size(max, sizeof(*state)), GFP_NOFS);
	weight = kvmalloc(array_size(max, sizeof(*weight)), GFP_NOFS);
	addr = kvmalloc(array_size(max, sizeof(*addr)), GFP_NOFS);
	if (!state || !weight || !addr) {
		kvfree(state);
		kvfree(weight);
		kvfree(addr);
		return -ENOMEM;
	}

	to_copy = min(map->max_osd, max);
	if (map->osd_state) {
		memcpy(state, map->osd_state, to_copy * sizeof(*state));
		memcpy(weight, map->osd_weight, to_copy * sizeof(*weight));
		memcpy(addr, map->osd_addr, to_copy * sizeof(*addr));
		kvfree(map->osd_state);
		kvfree(map->osd_weight);
		kvfree(map->osd_addr);
	}

	map->osd_state = state;
	map->osd_weight = weight;
	map->osd_addr = addr;
	for (i = map->max_osd; i < max; i++) {
		map->osd_state[i] = 0;
		map->osd_weight[i] = CEPH_OSD_OUT;
		memset(map->osd_addr + i, 0, sizeof(*map->osd_addr));
	}

	if (map->osd_primary_affinity) {
		u32 *affinity;

		affinity = kvmalloc(array_size(max, sizeof(*affinity)),
				    GFP_NOFS);
		if (!affinity)
			return -ENOMEM;

		memcpy(affinity, map->osd_primary_affinity,
		       to_copy * sizeof(*affinity));
		kvfree(map->osd_primary_affinity);

		map->osd_primary_affinity = affinity;
		for (i = map->max_osd; i < max; i++)
			map->osd_primary_affinity[i] =
			    CEPH_OSD_DEFAULT_PRIMARY_AFFINITY;
	}

	map->max_osd = max;

	return 0;
}

static int osdmap_set_crush(struct ceph_osdmap *map, struct crush_map *crush)
{
	struct crush_work *work;

	if (IS_ERR(crush))
		return PTR_ERR(crush);

	work = alloc_workspace(crush);
	if (!work) {
		crush_destroy(crush);
		return -ENOMEM;
	}

	if (map->crush)
		crush_destroy(map->crush);
	cleanup_workspace_manager(&map->crush_wsm);
	map->crush = crush;
	add_initial_workspace(&map->crush_wsm, work);
	return 0;
}

#define OSDMAP_WRAPPER_COMPAT_VER	7
#define OSDMAP_CLIENT_DATA_COMPAT_VER	1

/*
 * Return 0 or error.  On success, *v is set to 0 for old (v6) osdmaps,
 * to struct_v of the client_data section for new (v7 and above)
 * osdmaps.
 */
static int get_osdmap_client_data_v(void **p, void *end,
				    const char *prefix, u8 *v)
{
	u8 struct_v;

	ceph_decode_8_safe(p, end, struct_v, e_inval);
	if (struct_v >= 7) {
		u8 struct_compat;

		ceph_decode_8_safe(p, end, struct_compat, e_inval);
		if (struct_compat > OSDMAP_WRAPPER_COMPAT_VER) {
			pr_warn("got v %d cv %d > %d of %s ceph_osdmap\n",
				struct_v, struct_compat,
				OSDMAP_WRAPPER_COMPAT_VER, prefix);
			return -EINVAL;
		}
		*p += 4; /* ignore wrapper struct_len */

		ceph_decode_8_safe(p, end, struct_v, e_inval);
		ceph_decode_8_safe(p, end, struct_compat, e_inval);
		if (struct_compat > OSDMAP_CLIENT_DATA_COMPAT_VER) {
			pr_warn("got v %d cv %d > %d of %s ceph_osdmap client data\n",
				struct_v, struct_compat,
				OSDMAP_CLIENT_DATA_COMPAT_VER, prefix);
			return -EINVAL;
		}
		*p += 4; /* ignore client data struct_len */
	} else {
		u16 version;

		*p -= 1;
		ceph_decode_16_safe(p, end, version, e_inval);
		if (version < 6) {
			pr_warn("got v %d < 6 of %s ceph_osdmap\n",
				version, prefix);
			return -EINVAL;
		}

		/* old osdmap encoding */
		struct_v = 0;
	}

	*v = struct_v;
	return 0;

e_inval:
	return -EINVAL;
}

static int __decode_pools(void **p, void *end, struct ceph_osdmap *map,
			  bool incremental)
{
	u32 n;

	ceph_decode_32_safe(p, end, n, e_inval);
	while (n--) {
		struct ceph_pg_pool_info *pi;
		u64 pool;
		int ret;

		ceph_decode_64_safe(p, end, pool, e_inval);

		pi = lookup_pg_pool(&map->pg_pools, pool);
		if (!incremental || !pi) {
			pi = kzalloc(sizeof(*pi), GFP_NOFS);
			if (!pi)
				return -ENOMEM;

			RB_CLEAR_NODE(&pi->node);
			pi->id = pool;

			if (!__insert_pg_pool(&map->pg_pools, pi)) {
				kfree(pi);
				return -EEXIST;
			}
		}

		ret = decode_pool(p, end, pi);
		if (ret)
			return ret;
	}

	return 0;

e_inval:
	return -EINVAL;
}

static int decode_pools(void **p, void *end, struct ceph_osdmap *map)
{
	return __decode_pools(p, end, map, false);
}

static int decode_new_pools(void **p, void *end, struct ceph_osdmap *map)
{
	return __decode_pools(p, end, map, true);
}

typedef struct ceph_pg_mapping *(*decode_mapping_fn_t)(void **, void *, bool);

static int decode_pg_mapping(void **p, void *end, struct rb_root *mapping_root,
			     decode_mapping_fn_t fn, bool incremental)
{
	u32 n;

	WARN_ON(!incremental && !fn);

	ceph_decode_32_safe(p, end, n, e_inval);
	while (n--) {
		struct ceph_pg_mapping *pg;
		struct ceph_pg pgid;
		int ret;

		ret = ceph_decode_pgid(p, end, &pgid);
		if (ret)
			return ret;

		pg = lookup_pg_mapping(mapping_root, &pgid);
		if (pg) {
			WARN_ON(!incremental);
			erase_pg_mapping(mapping_root, pg);
			free_pg_mapping(pg);
		}

		if (fn) {
			pg = fn(p, end, incremental);
			if (IS_ERR(pg))
				return PTR_ERR(pg);

			if (pg) {
				pg->pgid = pgid;	/* struct */
				insert_pg_mapping(mapping_root, pg);
			}
		}
	}

	return 0;

e_inval:
	return -EINVAL;
}

static struct ceph_pg_mapping *__decode_pg_temp(void **p, void *end,
						bool incremental)
{
	struct ceph_pg_mapping *pg;
	u32 len, i;

	ceph_decode_32_safe(p, end, len, e_inval);
	if (len == 0 && incremental)
		return NULL;	/* new_pg_temp: [] to remove */
	if ((size_t)len > (SIZE_MAX - sizeof(*pg)) / sizeof(u32))
		return ERR_PTR(-EINVAL);

	ceph_decode_need(p, end, len * sizeof(u32), e_inval);
	pg = alloc_pg_mapping(len * sizeof(u32));
	if (!pg)
		return ERR_PTR(-ENOMEM);

	pg->pg_temp.len = len;
	for (i = 0; i < len; i++)
		pg->pg_temp.osds[i] = ceph_decode_32(p);

	return pg;

e_inval:
	return ERR_PTR(-EINVAL);
}

static int decode_pg_temp(void **p, void *end, struct ceph_osdmap *map)
{
	return decode_pg_mapping(p, end, &map->pg_temp, __decode_pg_temp,
				 false);
}

static int decode_new_pg_temp(void **p, void *end, struct ceph_osdmap *map)
{
	return decode_pg_mapping(p, end, &map->pg_temp, __decode_pg_temp,
				 true);
}

static struct ceph_pg_mapping *__decode_primary_temp(void **p, void *end,
						      bool incremental)
{
	struct ceph_pg_mapping *pg;
	u32 osd;

	ceph_decode_32_safe(p, end, osd, e_inval);
	if (osd == (u32)-1 && incremental)
		return NULL;	/* new_primary_temp: -1 to remove */

	pg = alloc_pg_mapping(0);
	if (!pg)
		return ERR_PTR(-ENOMEM);

	pg->primary_temp.osd = osd;
	return pg;

e_inval:
	return ERR_PTR(-EINVAL);
}

static int decode_primary_temp(void **p, void *end, struct ceph_osdmap *map)
{
	return decode_pg_mapping(p, end, &map->primary_temp,
				 __decode_primary_temp, false);
}

static int decode_new_primary_temp(void **p, void *end,
				   struct ceph_osdmap *map)
{
	return decode_pg_mapping(p, end, &map->primary_temp,
				 __decode_primary_temp, true);
}

u32 ceph_get_primary_affinity(struct ceph_osdmap *map, int osd)
{
	if (!map->osd_primary_affinity)
		return CEPH_OSD_DEFAULT_PRIMARY_AFFINITY;

	return map->osd_primary_affinity[osd];
}

static int set_primary_affinity(struct ceph_osdmap *map, int osd, u32 aff)
{
	if (!map->osd_primary_affinity) {
		int i;

		map->osd_primary_affinity = kvmalloc(
		    array_size(map->max_osd, sizeof(*map->osd_primary_affinity)),
		    GFP_NOFS);
		if (!map->osd_primary_affinity)
			return -ENOMEM;

		for (i = 0; i < map->max_osd; i++)
			map->osd_primary_affinity[i] =
			    CEPH_OSD_DEFAULT_PRIMARY_AFFINITY;
	}

	map->osd_primary_affinity[osd] = aff;

	return 0;
}

static int decode_primary_affinity(void **p, void *end,
				   struct ceph_osdmap *map)
{
	u32 len, i;

	ceph_decode_32_safe(p, end, len, e_inval);
	if (len == 0) {
		kvfree(map->osd_primary_affinity);
		map->osd_primary_affinity = NULL;
		return 0;
	}
	if (len != map->max_osd)
		goto e_inval;

	ceph_decode_need(p, end, map->max_osd*sizeof(u32), e_inval);

	for (i = 0; i < map->max_osd; i++) {
		int ret;

		ret = set_primary_affinity(map, i, ceph_decode_32(p));
		if (ret)
			return ret;
	}

	return 0;

e_inval:
	return -EINVAL;
}

static int decode_new_primary_affinity(void **p, void *end,
				       struct ceph_osdmap *map)
{
	u32 n;

	ceph_decode_32_safe(p, end, n, e_inval);
	while (n--) {
		u32 osd, aff;
		int ret;

		ceph_decode_32_safe(p, end, osd, e_inval);
		ceph_decode_32_safe(p, end, aff, e_inval);
		if (osd >= map->max_osd)
			goto e_inval;

		ret = set_primary_affinity(map, osd, aff);
		if (ret)
			return ret;

		osdmap_info(map, "osd%d primary-affinity 0x%x\n", osd, aff);
	}

	return 0;

e_inval:
	return -EINVAL;
}

static struct ceph_pg_mapping *__decode_pg_upmap(void **p, void *end,
						 bool __unused)
{
	return __decode_pg_temp(p, end, false);
}

static int decode_pg_upmap(void **p, void *end, struct ceph_osdmap *map)
{
	return decode_pg_mapping(p, end, &map->pg_upmap, __decode_pg_upmap,
				 false);
}

static int decode_new_pg_upmap(void **p, void *end, struct ceph_osdmap *map)
{
	return decode_pg_mapping(p, end, &map->pg_upmap, __decode_pg_upmap,
				 true);
}

static int decode_old_pg_upmap(void **p, void *end, struct ceph_osdmap *map)
{
	return decode_pg_mapping(p, end, &map->pg_upmap, NULL, true);
}

static struct ceph_pg_mapping *__decode_pg_upmap_items(void **p, void *end,
							bool __unused)
{
	struct ceph_pg_mapping *pg;
	u32 len, i;

	ceph_decode_32_safe(p, end, len, e_inval);
	if ((size_t)len > (SIZE_MAX - sizeof(*pg)) / (2 * sizeof(u32)))
		return ERR_PTR(-EINVAL);

	ceph_decode_need(p, end, 2 * len * sizeof(u32), e_inval);
	pg = alloc_pg_mapping(2 * len * sizeof(u32));
	if (!pg)
		return ERR_PTR(-ENOMEM);

	pg->pg_upmap_items.len = len;
	for (i = 0; i < len; i++) {
		pg->pg_upmap_items.from_to[i][0] = ceph_decode_32(p);
		pg->pg_upmap_items.from_to[i][1] = ceph_decode_32(p);
	}

	return pg;

e_inval:
	return ERR_PTR(-EINVAL);
}

static int decode_pg_upmap_items(void **p, void *end, struct ceph_osdmap *map)
{
	return decode_pg_mapping(p, end, &map->pg_upmap_items,
				 __decode_pg_upmap_items, false);
}

static int decode_new_pg_upmap_items(void **p, void *end,
				     struct ceph_osdmap *map)
{
	return decode_pg_mapping(p, end, &map->pg_upmap_items,
				 __decode_pg_upmap_items, true);
}

static int decode_old_pg_upmap_items(void **p, void *end,
				     struct ceph_osdmap *map)
{
	return decode_pg_mapping(p, end, &map->pg_upmap_items, NULL, true);
}

/*
 * decode a full map.
 */
static int osdmap_decode(void **p, void *end, bool msgr2,
			 struct ceph_osdmap *map)
{
	u8 struct_v;
	u32 epoch = 0;
	void *start = *p;
	u32 max;
	u32 len, i;
	int err;

	dout("%s %p to %p len %d\n", __func__, *p, end, (int)(end - *p));

	err = get_osdmap_client_data_v(p, end, "full", &struct_v);
	if (err)
		goto bad;

	/* fsid, epoch, created, modified */
	ceph_decode_need(p, end, sizeof(map->fsid) + sizeof(u32) +
			 sizeof(map->created) + sizeof(map->modified), e_inval);
	ceph_decode_copy(p, &map->fsid, sizeof(map->fsid));
	epoch = map->epoch = ceph_decode_32(p);
	ceph_decode_copy(p, &map->created, sizeof(map->created));
	ceph_decode_copy(p, &map->modified, sizeof(map->modified));

	/* pools */
	err = decode_pools(p, end, map);
	if (err)
		goto bad;

	/* pool_name */
	err = decode_pool_names(p, end, map);
	if (err)
		goto bad;

	ceph_decode_32_safe(p, end, map->pool_max, e_inval);

	ceph_decode_32_safe(p, end, map->flags, e_inval);

	/* max_osd */
	ceph_decode_32_safe(p, end, max, e_inval);

	/* (re)alloc osd arrays */
	err = osdmap_set_max_osd(map, max);
	if (err)
		goto bad;

	/* osd_state, osd_weight, osd_addrs->client_addr */
	ceph_decode_need(p, end, 3*sizeof(u32) +
			 map->max_osd*(struct_v >= 5 ? sizeof(u32) :
						       sizeof(u8)) +
			 sizeof(*map->osd_weight), e_inval);
	if (ceph_decode_32(p) != map->max_osd)
		goto e_inval;

	if (struct_v >= 5) {
		for (i = 0; i < map->max_osd; i++)
			map->osd_state[i] = ceph_decode_32(p);
	} else {
		for (i = 0; i < map->max_osd; i++)
			map->osd_state[i] = ceph_decode_8(p);
	}

	if (ceph_decode_32(p) != map->max_osd)
		goto e_inval;

	for (i = 0; i < map->max_osd; i++)
		map->osd_weight[i] = ceph_decode_32(p);

	if (ceph_decode_32(p) != map->max_osd)
		goto e_inval;

	for (i = 0; i < map->max_osd; i++) {
		struct ceph_entity_addr *addr = &map->osd_addr[i];

		if (struct_v >= 8)
			err = ceph_decode_entity_addrvec(p, end, msgr2, addr);
		else
			err = ceph_decode_entity_addr(p, end, addr);
		if (err)
			goto bad;

		dout("%s osd%d addr %s\n", __func__, i, ceph_pr_addr(addr));
	}

	/* pg_temp */
	err = decode_pg_temp(p, end, map);
	if (err)
		goto bad;

	/* primary_temp */
	if (struct_v >= 1) {
		err = decode_primary_temp(p, end, map);
		if (err)
			goto bad;
	}

	/* primary_affinity */
	if (struct_v >= 2) {
		err = decode_primary_affinity(p, end, map);
		if (err)
			goto bad;
	} else {
		WARN_ON(map->osd_primary_affinity);
	}

	/* crush */
	ceph_decode_32_safe(p, end, len, e_inval);
	err = osdmap_set_crush(map, crush_decode(*p, min(*p + len, end)));
	if (err)
		goto bad;

	*p += len;
	if (struct_v >= 3) {
		/* erasure_code_profiles */
		ceph_decode_skip_map_of_map(p, end, string, string, string,
					    e_inval);
	}

	if (struct_v >= 4) {
		err = decode_pg_upmap(p, end, map);
		if (err)
			goto bad;

		err = decode_pg_upmap_items(p, end, map);
		if (err)
			goto bad;
	} else {
		WARN_ON(!RB_EMPTY_ROOT(&map->pg_upmap));
		WARN_ON(!RB_EMPTY_ROOT(&map->pg_upmap_items));
	}

	/* ignore the rest */
	*p = end;

	dout("full osdmap epoch %d max_osd %d\n", map->epoch, map->max_osd);
	return 0;

e_inval:
	err = -EINVAL;
bad:
	pr_err("corrupt full osdmap (%d) epoch %d off %d (%p of %p-%p)\n",
	       err, epoch, (int)(*p - start), *p, start, end);
	print_hex_dump(KERN_DEBUG, "osdmap: ",
		       DUMP_PREFIX_OFFSET, 16, 1,
		       start, end - start, true);
	return err;
}

/*
 * Allocate and decode a full map.
 */
struct ceph_osdmap *ceph_osdmap_decode(void **p, void *end, bool msgr2)
{
	struct ceph_osdmap *map;
	int ret;

	map = ceph_osdmap_alloc();
	if (!map)
		return ERR_PTR(-ENOMEM);

	ret = osdmap_decode(p, end, msgr2, map);
	if (ret) {
		ceph_osdmap_destroy(map);
		return ERR_PTR(ret);
	}

	return map;
}

/*
 * Encoding order is (new_up_client, new_state, new_weight).  Need to
 * apply in the (new_weight, new_state, new_up_client) order, because
 * an incremental map may look like e.g.
 *
 *     new_up_client: { osd=6, addr=... } # set osd_state and addr
 *     new_state: { osd=6, xorstate=EXISTS } # clear osd_state
 */
static int decode_new_up_state_weight(void **p, void *end, u8 struct_v,
				      bool msgr2, struct ceph_osdmap *map)
{
	void *new_up_client;
	void *new_state;
	void *new_weight_end;
	u32 len;
	int ret;
	int i;

	new_up_client = *p;
	ceph_decode_32_safe(p, end, len, e_inval);
	for (i = 0; i < len; ++i) {
		struct ceph_entity_addr addr;

		ceph_decode_skip_32(p, end, e_inval);
		if (struct_v >= 7)
			ret = ceph_decode_entity_addrvec(p, end, msgr2, &addr);
		else
			ret = ceph_decode_entity_addr(p, end, &addr);
		if (ret)
			return ret;
	}

	new_state = *p;
	ceph_decode_32_safe(p, end, len, e_inval);
	len *= sizeof(u32) + (struct_v >= 5 ? sizeof(u32) : sizeof(u8));
	ceph_decode_need(p, end, len, e_inval);
	*p += len;

	/* new_weight */
	ceph_decode_32_safe(p, end, len, e_inval);
	while (len--) {
		s32 osd;
		u32 w;

		ceph_decode_need(p, end, 2*sizeof(u32), e_inval);
		osd = ceph_decode_32(p);
		w = ceph_decode_32(p);
		if (osd >= map->max_osd)
			goto e_inval;

		osdmap_info(map, "osd%d weight 0x%x %s\n", osd, w,
			    w == CEPH_OSD_IN ? "(in)" :
			    (w == CEPH_OSD_OUT ? "(out)" : ""));
		map->osd_weight[osd] = w;

		/*
		 * If we are marking in, set the EXISTS, and clear the
		 * AUTOOUT and NEW bits.
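		 * (The up/down bit itself is applied separately below,
		 * when new_state is processed.)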
		 */
		if (w) {
			map->osd_state[osd] |= CEPH_OSD_EXISTS;
			map->osd_state[osd] &= ~(CEPH_OSD_AUTOOUT |
						 CEPH_OSD_NEW);
		}
	}
	new_weight_end = *p;

	/* new_state (up/down) */
	*p = new_state;
	len = ceph_decode_32(p);
	while (len--) {
		s32 osd;
		u32 xorstate;

		osd = ceph_decode_32(p);
		if (osd >= map->max_osd)
			goto e_inval;

		if (struct_v >= 5)
			xorstate = ceph_decode_32(p);
		else
			xorstate = ceph_decode_8(p);
		if (xorstate == 0)
			xorstate = CEPH_OSD_UP;
		if ((map->osd_state[osd] & CEPH_OSD_UP) &&
		    (xorstate & CEPH_OSD_UP))
			osdmap_info(map, "osd%d down\n", osd);
		if ((map->osd_state[osd] & CEPH_OSD_EXISTS) &&
		    (xorstate & CEPH_OSD_EXISTS)) {
			osdmap_info(map, "osd%d does not exist\n", osd);
			ret = set_primary_affinity(map, osd,
				       CEPH_OSD_DEFAULT_PRIMARY_AFFINITY);
			if (ret)
				return ret;
			memset(map->osd_addr + osd, 0, sizeof(*map->osd_addr));
			map->osd_state[osd] = 0;
		} else {
			map->osd_state[osd] ^= xorstate;
		}
	}

	/* new_up_client */
	*p = new_up_client;
	len = ceph_decode_32(p);
	while (len--) {
		s32 osd;
		struct ceph_entity_addr addr;

		osd = ceph_decode_32(p);
		if (osd >= map->max_osd)
			goto e_inval;

		if (struct_v >= 7)
			ret = ceph_decode_entity_addrvec(p, end, msgr2, &addr);
		else
			ret = ceph_decode_entity_addr(p, end, &addr);
		if (ret)
			return ret;

		dout("%s osd%d addr %s\n", __func__, osd, ceph_pr_addr(&addr));

		osdmap_info(map, "osd%d up\n", osd);
		map->osd_state[osd] |= CEPH_OSD_EXISTS | CEPH_OSD_UP;
		map->osd_addr[osd] = addr;
	}

	*p = new_weight_end;
	return 0;

e_inval:
	return -EINVAL;
}

/*
 * decode and apply an incremental map update.
 */
struct ceph_osdmap *osdmap_apply_incremental(void **p, void *end, bool msgr2,
					     struct ceph_osdmap *map)
{
	struct ceph_fsid fsid;
	u32 epoch = 0;
	struct ceph_timespec modified;
	s32 len;
	u64 pool;
	__s64 new_pool_max;
	__s32 new_flags, max;
	void *start = *p;
	int err;
	u8 struct_v;

	dout("%s %p to %p len %d\n", __func__, *p, end, (int)(end - *p));

	err = get_osdmap_client_data_v(p, end, "inc", &struct_v);
	if (err)
		goto bad;

	/* fsid, epoch, modified, new_pool_max, new_flags */
	ceph_decode_need(p, end, sizeof(fsid) + sizeof(u32) + sizeof(modified) +
			 sizeof(u64) + sizeof(u32), e_inval);
	ceph_decode_copy(p, &fsid, sizeof(fsid));
	epoch = ceph_decode_32(p);
	BUG_ON(epoch != map->epoch+1);
	ceph_decode_copy(p, &modified, sizeof(modified));
	new_pool_max = ceph_decode_64(p);
	new_flags = ceph_decode_32(p);

	/* full map? */
	ceph_decode_32_safe(p, end, len, e_inval);
	if (len > 0) {
		dout("apply_incremental full map len %d, %p to %p\n",
		     len, *p, end);
		return ceph_osdmap_decode(p, min(*p+len, end), msgr2);
	}

	/* new crush? */
	ceph_decode_32_safe(p, end, len, e_inval);
	if (len > 0) {
		err = osdmap_set_crush(map,
				       crush_decode(*p, min(*p + len, end)));
		if (err)
			goto bad;
		*p += len;
	}

	/* new flags? */
	if (new_flags >= 0)
		map->flags = new_flags;
	if (new_pool_max >= 0)
		map->pool_max = new_pool_max;

	/* new max? */
	ceph_decode_32_safe(p, end, max, e_inval);
	if (max >= 0) {
		err = osdmap_set_max_osd(map, max);
		if (err)
			goto bad;
	}

	map->epoch++;
	map->modified = modified;

	/* new_pools */
	err = decode_new_pools(p, end, map);
	if (err)
		goto bad;

	/* new_pool_names */
	err = decode_pool_names(p, end, map);
	if (err)
		goto bad;

	/* old_pool */
	ceph_decode_32_safe(p, end, len, e_inval);
	while (len--) {
		struct ceph_pg_pool_info *pi;

		ceph_decode_64_safe(p, end, pool, e_inval);
		pi = lookup_pg_pool(&map->pg_pools, pool);
		if (pi)
			__remove_pg_pool(&map->pg_pools, pi);
	}

	/* new_up_client, new_state, new_weight */
	err = decode_new_up_state_weight(p, end, struct_v, msgr2, map);
	if (err)
		goto bad;

	/* new_pg_temp */
	err = decode_new_pg_temp(p, end, map);
	if (err)
		goto bad;

	/* new_primary_temp */
	if (struct_v >= 1) {
		err = decode_new_primary_temp(p, end, map);
		if (err)
			goto bad;
	}

	/* new_primary_affinity */
	if (struct_v >= 2) {
		err = decode_new_primary_affinity(p, end, map);
		if (err)
			goto bad;
	}

	if (struct_v >= 3) {
		/* new_erasure_code_profiles */
		ceph_decode_skip_map_of_map(p, end, string, string, string,
					    e_inval);
		/* old_erasure_code_profiles */
		ceph_decode_skip_set(p, end, string, e_inval);
	}

	if (struct_v >= 4) {
		err = decode_new_pg_upmap(p, end, map);
		if (err)
			goto bad;

		err = decode_old_pg_upmap(p, end, map);
		if (err)
			goto bad;

		err = decode_new_pg_upmap_items(p, end, map);
		if (err)
			goto bad;

		err = decode_old_pg_upmap_items(p, end, map);
		if (err)
			goto bad;
	}

	/* ignore the rest */
	*p = end;

	dout("inc osdmap epoch %d max_osd %d\n", map->epoch, map->max_osd);
	return map;

e_inval:
	err = -EINVAL;
bad:
	pr_err("corrupt inc osdmap (%d) epoch %d off %d (%p of %p-%p)\n",
	       err, epoch, (int)(*p - start), *p, start, end);
	print_hex_dump(KERN_DEBUG, "osdmap: ",
		       DUMP_PREFIX_OFFSET, 16, 1,
		       start, end - start, true);
	return ERR_PTR(err);
}

void ceph_oloc_copy(struct ceph_object_locator *dest,
		    const struct ceph_object_locator *src)
{
	ceph_oloc_destroy(dest);

	dest->pool = src->pool;
	if (src->pool_ns)
		dest->pool_ns = ceph_get_string(src->pool_ns);
	else
		dest->pool_ns = NULL;
}
EXPORT_SYMBOL(ceph_oloc_copy);

void ceph_oloc_destroy(struct ceph_object_locator *oloc)
{
	ceph_put_string(oloc->pool_ns);
}
EXPORT_SYMBOL(ceph_oloc_destroy);

void ceph_oid_copy(struct ceph_object_id *dest,
		   const struct ceph_object_id *src)
{
	ceph_oid_destroy(dest);

	if (src->name != src->inline_name) {
		/* very rare, see ceph_object_id definition */
		dest->name = kmalloc(src->name_len + 1,
				     GFP_NOIO | __GFP_NOFAIL);
	} else {
		dest->name = dest->inline_name;
	}
	memcpy(dest->name, src->name, src->name_len + 1);
	dest->name_len = src->name_len;
}
EXPORT_SYMBOL(ceph_oid_copy);

static __printf(2, 0)
int oid_printf_vargs(struct ceph_object_id *oid, const char *fmt, va_list ap)
{
	int len;

	WARN_ON(!ceph_oid_empty(oid));

	len = vsnprintf(oid->inline_name, sizeof(oid->inline_name), fmt, ap);
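	/*
	 * vsnprintf() returns the length the formatted name would have
	 * had even if it was truncated, so a return value of
	 * sizeof(inline_name) or more means the name did not fit in the
	 * inline buffer.
	 */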
	if (len >= sizeof(oid->inline_name))
		return len;

	oid->name_len = len;
	return 0;
}

/*
 * If oid doesn't fit into inline buffer, BUG.
 */
void ceph_oid_printf(struct ceph_object_id *oid, const char *fmt, ...)
{
	va_list ap;

	va_start(ap, fmt);
	BUG_ON(oid_printf_vargs(oid, fmt, ap));
	va_end(ap);
}
EXPORT_SYMBOL(ceph_oid_printf);

static __printf(3, 0)
int oid_aprintf_vargs(struct ceph_object_id *oid, gfp_t gfp,
		      const char *fmt, va_list ap)
{
	va_list aq;
	int len;

	va_copy(aq, ap);
	len = oid_printf_vargs(oid, fmt, aq);
	va_end(aq);

	if (len) {
		char *external_name;

		external_name = kmalloc(len + 1, gfp);
		if (!external_name)
			return -ENOMEM;

		oid->name = external_name;
		WARN_ON(vsnprintf(oid->name, len + 1, fmt, ap) != len);
		oid->name_len = len;
	}

	return 0;
}

/*
 * If oid doesn't fit into inline buffer, allocate.
 */
int ceph_oid_aprintf(struct ceph_object_id *oid, gfp_t gfp,
		     const char *fmt, ...)
{
	va_list ap;
	int ret;

	va_start(ap, fmt);
	ret = oid_aprintf_vargs(oid, gfp, fmt, ap);
	va_end(ap);

	return ret;
}
EXPORT_SYMBOL(ceph_oid_aprintf);

void ceph_oid_destroy(struct ceph_object_id *oid)
{
	if (oid->name != oid->inline_name)
		kfree(oid->name);
}
EXPORT_SYMBOL(ceph_oid_destroy);

/*
 * osds only
 */
static bool __osds_equal(const struct ceph_osds *lhs,
			 const struct ceph_osds *rhs)
{
	if (lhs->size == rhs->size &&
	    !memcmp(lhs->osds, rhs->osds, rhs->size * sizeof(rhs->osds[0])))
		return true;

	return false;
}

/*
 * osds + primary
 */
static bool osds_equal(const struct ceph_osds *lhs,
		       const struct ceph_osds *rhs)
{
	if (__osds_equal(lhs, rhs) &&
	    lhs->primary == rhs->primary)
		return true;

	return false;
}

static bool osds_valid(const struct ceph_osds *set)
{
	/* non-empty set */
	if (set->size > 0 && set->primary >= 0)
		return true;

	/* empty can_shift_osds set */
	if (!set->size && set->primary == -1)
		return true;

	/* empty !can_shift_osds set - all NONE */
	if (set->size > 0 && set->primary == -1) {
		int i;

		for (i = 0; i < set->size; i++) {
			if (set->osds[i] != CRUSH_ITEM_NONE)
				break;
		}
		if (i == set->size)
			return true;
	}

	return false;
}

void ceph_osds_copy(struct ceph_osds *dest, const struct ceph_osds *src)
{
	memcpy(dest->osds, src->osds, src->size * sizeof(src->osds[0]));
	dest->size = src->size;
	dest->primary = src->primary;
}

bool ceph_pg_is_split(const struct ceph_pg *pgid, u32 old_pg_num,
		      u32 new_pg_num)
{
	int old_bits = calc_bits_of(old_pg_num);
	int old_mask = (1 << old_bits) - 1;
	int n;

	WARN_ON(pgid->seed >= old_pg_num);
	if (new_pg_num <= old_pg_num)
		return false;

	for (n = 1; ; n++) {
		int next_bit = n << (old_bits - 1);
		u32 s = next_bit | pgid->seed;

		if (s < old_pg_num || s == pgid->seed)
			continue;
		if (s >= new_pg_num)
			break;

		s = ceph_stable_mod(s, old_pg_num, old_mask);
		if (s == pgid->seed)
			return true;
	}

	return false;
}

bool ceph_is_new_interval(const struct ceph_osds *old_acting,

bool ceph_is_new_interval(const struct ceph_osds *old_acting,
			  const struct ceph_osds *new_acting,
			  const struct ceph_osds *old_up,
			  const struct ceph_osds *new_up,
			  int old_size,
			  int new_size,
			  int old_min_size,
			  int new_min_size,
			  u32 old_pg_num,
			  u32 new_pg_num,
			  bool old_sort_bitwise,
			  bool new_sort_bitwise,
			  bool old_recovery_deletes,
			  bool new_recovery_deletes,
			  const struct ceph_pg *pgid)
{
	return !osds_equal(old_acting, new_acting) ||
	       !osds_equal(old_up, new_up) ||
	       old_size != new_size ||
	       old_min_size != new_min_size ||
	       ceph_pg_is_split(pgid, old_pg_num, new_pg_num) ||
	       old_sort_bitwise != new_sort_bitwise ||
	       old_recovery_deletes != new_recovery_deletes;
}

static int calc_pg_rank(int osd, const struct ceph_osds *acting)
{
	int i;

	for (i = 0; i < acting->size; i++) {
		if (acting->osds[i] == osd)
			return i;
	}

	return -1;
}

static bool primary_changed(const struct ceph_osds *old_acting,
			    const struct ceph_osds *new_acting)
{
	if (!old_acting->size && !new_acting->size)
		return false; /* both still empty */

	if (!old_acting->size ^ !new_acting->size)
		return true; /* was empty, now not, or vice versa */

	if (old_acting->primary != new_acting->primary)
		return true; /* primary changed */

	if (calc_pg_rank(old_acting->primary, old_acting) !=
	    calc_pg_rank(new_acting->primary, new_acting))
		return true;

	return false; /* same primary (tho replicas may have changed) */
}

bool ceph_osds_changed(const struct ceph_osds *old_acting,
		       const struct ceph_osds *new_acting,
		       bool any_change)
{
	if (primary_changed(old_acting, new_acting))
		return true;

	if (any_change && !__osds_equal(old_acting, new_acting))
		return true;

	return false;
}
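
/*
 * For example, going from acting set [1,2,3] to [1,3,2] with primary 1
 * keeps both the primary and its rank, so ceph_osds_changed() reports
 * a change only if @any_change is true (the replica order differs but
 * the primary did not move).
 */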

/*
 * Map an object into a PG.
 *
 * Should only be called with target_oid and target_oloc (as opposed to
 * base_oid and base_oloc), since tiering isn't taken into account.
 */
void __ceph_object_locator_to_pg(struct ceph_pg_pool_info *pi,
				 const struct ceph_object_id *oid,
				 const struct ceph_object_locator *oloc,
				 struct ceph_pg *raw_pgid)
{
	WARN_ON(pi->id != oloc->pool);

	if (!oloc->pool_ns) {
		raw_pgid->pool = oloc->pool;
		raw_pgid->seed = ceph_str_hash(pi->object_hash, oid->name,
					       oid->name_len);
		dout("%s %s -> raw_pgid %llu.%x\n", __func__, oid->name,
		     raw_pgid->pool, raw_pgid->seed);
	} else {
		char stack_buf[256];
		char *buf = stack_buf;
		int nsl = oloc->pool_ns->len;
		size_t total = nsl + 1 + oid->name_len;

		if (total > sizeof(stack_buf))
			buf = kmalloc(total, GFP_NOIO | __GFP_NOFAIL);
		memcpy(buf, oloc->pool_ns->str, nsl);
		buf[nsl] = '\037';
		memcpy(buf + nsl + 1, oid->name, oid->name_len);
		raw_pgid->pool = oloc->pool;
		raw_pgid->seed = ceph_str_hash(pi->object_hash, buf, total);
		if (buf != stack_buf)
			kfree(buf);
		dout("%s %s ns %.*s -> raw_pgid %llu.%x\n", __func__,
		     oid->name, nsl, oloc->pool_ns->str,
		     raw_pgid->pool, raw_pgid->seed);
	}
}

int ceph_object_locator_to_pg(struct ceph_osdmap *osdmap,
			      const struct ceph_object_id *oid,
			      const struct ceph_object_locator *oloc,
			      struct ceph_pg *raw_pgid)
{
	struct ceph_pg_pool_info *pi;

	pi = ceph_pg_pool_by_id(osdmap, oloc->pool);
	if (!pi)
		return -ENOENT;

	__ceph_object_locator_to_pg(pi, oid, oloc, raw_pgid);
	return 0;
}
EXPORT_SYMBOL(ceph_object_locator_to_pg);

/*
 * Map a raw PG (full precision ps) into an actual PG.
 */
static void raw_pg_to_pg(struct ceph_pg_pool_info *pi,
			 const struct ceph_pg *raw_pgid,
			 struct ceph_pg *pgid)
{
	pgid->pool = raw_pgid->pool;
	pgid->seed = ceph_stable_mod(raw_pgid->seed, pi->pg_num,
				     pi->pg_num_mask);
}

/*
 * Map a raw PG (full precision ps) into a placement ps (placement
 * seed).  Include pool id in that value so that different pools don't
 * use the same seeds.
 */
static u32 raw_pg_to_pps(struct ceph_pg_pool_info *pi,
			 const struct ceph_pg *raw_pgid)
{
	if (pi->flags & CEPH_POOL_FLAG_HASHPSPOOL) {
		/* hash pool id and seed so that pool PGs do not overlap */
		return crush_hash32_2(CRUSH_HASH_RJENKINS1,
				      ceph_stable_mod(raw_pgid->seed,
						      pi->pgp_num,
						      pi->pgp_num_mask),
				      raw_pgid->pool);
	} else {
		/*
		 * legacy behavior: add ps and pool together.  this is
		 * not a great approach because the PGs from each pool
		 * will overlap on top of each other: 0.5 == 1.4 ==
		 * 2.3 == ...
		 */
		return ceph_stable_mod(raw_pgid->seed, pi->pgp_num,
				       pi->pgp_num_mask) +
		       (unsigned)raw_pgid->pool;
	}
}
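
/*
 * For a hypothetical pool with pg_num 12 (pg_num_mask 15), a raw seed
 * of 13 exceeds pg_num, so ceph_stable_mod() drops the top bit and
 * yields pgid seed 5, while a raw seed of 9 is kept as is.  The same
 * folding is applied with pgp_num/pgp_num_mask when computing the
 * placement seed above.
 */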

/*
 * Magic value used for a "default" fallback choose_args, used if the
 * crush_choose_arg_map passed to do_crush() does not exist.  If this
 * also doesn't exist, fall back to canonical weights.
 */
#define CEPH_DEFAULT_CHOOSE_ARGS	-1

static int do_crush(struct ceph_osdmap *map, int ruleno, int x,
		    int *result, int result_max,
		    const __u32 *weight, int weight_max,
		    s64 choose_args_index)
{
	struct crush_choose_arg_map *arg_map;
	struct crush_work *work;
	int r;

	BUG_ON(result_max > CEPH_PG_MAX_SIZE);

	arg_map = lookup_choose_arg_map(&map->crush->choose_args,
					choose_args_index);
	if (!arg_map)
		arg_map = lookup_choose_arg_map(&map->crush->choose_args,
						CEPH_DEFAULT_CHOOSE_ARGS);

	work = get_workspace(&map->crush_wsm, map->crush);
	r = crush_do_rule(map->crush, ruleno, x, result, result_max,
			  weight, weight_max, work,
			  arg_map ? arg_map->args : NULL);
	put_workspace(&map->crush_wsm, work);
	return r;
}

static void remove_nonexistent_osds(struct ceph_osdmap *osdmap,
				    struct ceph_pg_pool_info *pi,
				    struct ceph_osds *set)
{
	int i;

	if (ceph_can_shift_osds(pi)) {
		int removed = 0;

		/* shift left */
		for (i = 0; i < set->size; i++) {
			if (!ceph_osd_exists(osdmap, set->osds[i])) {
				removed++;
				continue;
			}
			if (removed)
				set->osds[i - removed] = set->osds[i];
		}
		set->size -= removed;
	} else {
		/* set dne devices to NONE */
		for (i = 0; i < set->size; i++) {
			if (!ceph_osd_exists(osdmap, set->osds[i]))
				set->osds[i] = CRUSH_ITEM_NONE;
		}
	}
}
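
/*
 * For example, if osd1 does not exist, a raw set of [6, 1, 9] becomes
 * [6, 9] for a replicated (can_shift_osds) pool but [6, NONE, 9] for
 * an erasure-coded pool, where each position corresponds to a shard
 * and therefore must be preserved.
 */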

/*
 * Calculate raw set (CRUSH output) for given PG and filter out
 * nonexistent OSDs.  ->primary is undefined for a raw set.
 *
 * Placement seed (CRUSH input) is returned through @ppps.
 */
static void pg_to_raw_osds(struct ceph_osdmap *osdmap,
			   struct ceph_pg_pool_info *pi,
			   const struct ceph_pg *raw_pgid,
			   struct ceph_osds *raw,
			   u32 *ppps)
{
	u32 pps = raw_pg_to_pps(pi, raw_pgid);
	int ruleno;
	int len;

	ceph_osds_init(raw);
	if (ppps)
		*ppps = pps;

	ruleno = crush_find_rule(osdmap->crush, pi->crush_ruleset, pi->type,
				 pi->size);
	if (ruleno < 0) {
		pr_err("no crush rule: pool %lld ruleset %d type %d size %d\n",
		       pi->id, pi->crush_ruleset, pi->type, pi->size);
		return;
	}

	if (pi->size > ARRAY_SIZE(raw->osds)) {
		pr_err_ratelimited("pool %lld ruleset %d type %d too wide: size %d > %zu\n",
		       pi->id, pi->crush_ruleset, pi->type, pi->size,
		       ARRAY_SIZE(raw->osds));
		return;
	}

	len = do_crush(osdmap, ruleno, pps, raw->osds, pi->size,
		       osdmap->osd_weight, osdmap->max_osd, pi->id);
	if (len < 0) {
		pr_err("error %d from crush rule %d: pool %lld ruleset %d type %d size %d\n",
		       len, ruleno, pi->id, pi->crush_ruleset, pi->type,
		       pi->size);
		return;
	}

	raw->size = len;
	remove_nonexistent_osds(osdmap, pi, raw);
}

/* apply pg_upmap[_items] mappings */
static void apply_upmap(struct ceph_osdmap *osdmap,
			const struct ceph_pg *pgid,
			struct ceph_osds *raw)
{
	struct ceph_pg_mapping *pg;
	int i, j;

	pg = lookup_pg_mapping(&osdmap->pg_upmap, pgid);
	if (pg) {
		/* make sure targets aren't marked out */
		for (i = 0; i < pg->pg_upmap.len; i++) {
			int osd = pg->pg_upmap.osds[i];

			if (osd != CRUSH_ITEM_NONE &&
			    osd < osdmap->max_osd &&
			    osdmap->osd_weight[osd] == 0) {
				/* reject/ignore explicit mapping */
				return;
			}
		}
		for (i = 0; i < pg->pg_upmap.len; i++)
			raw->osds[i] = pg->pg_upmap.osds[i];
		raw->size = pg->pg_upmap.len;
		/* check and apply pg_upmap_items, if any */
	}

	pg = lookup_pg_mapping(&osdmap->pg_upmap_items, pgid);
	if (pg) {
		/*
		 * Note: this approach does not allow a bidirectional swap,
		 * e.g., [[1,2],[2,1]] applied to [0,1,2] -> [0,2,1].
		 */
		for (i = 0; i < pg->pg_upmap_items.len; i++) {
			int from = pg->pg_upmap_items.from_to[i][0];
			int to = pg->pg_upmap_items.from_to[i][1];
			int pos = -1;
			bool exists = false;

			/* make sure replacement doesn't already appear */
			for (j = 0; j < raw->size; j++) {
				int osd = raw->osds[j];

				if (osd == to) {
					exists = true;
					break;
				}
				/* ignore mapping if target is marked out */
				if (osd == from && pos < 0 &&
				    !(to != CRUSH_ITEM_NONE &&
				      to < osdmap->max_osd &&
				      osdmap->osd_weight[to] == 0)) {
					pos = j;
				}
			}
			if (!exists && pos >= 0)
				raw->osds[pos] = to;
		}
	}
}
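
/*
 * For example, with a raw set of [1, 2, 3], a pg_upmap_items entry of
 * [2 -> 6] produces [1, 6, 3], as long as osd6 is not already in the
 * set and is not marked out; a pg_upmap entry of [4, 5, 6] would
 * replace the raw set outright unless one of its targets is marked out.
 */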

/*
 * Given raw set, calculate up set and up primary.  By definition of an
 * up set, the result won't contain nonexistent or down OSDs.
 *
 * This is done in-place - on return @set is the up set.  If it's
 * empty, ->primary will remain undefined.
 */
static void raw_to_up_osds(struct ceph_osdmap *osdmap,
			   struct ceph_pg_pool_info *pi,
			   struct ceph_osds *set)
{
	int i;

	/* ->primary is undefined for a raw set */
	BUG_ON(set->primary != -1);

	if (ceph_can_shift_osds(pi)) {
		int removed = 0;

		/* shift left */
		for (i = 0; i < set->size; i++) {
			if (ceph_osd_is_down(osdmap, set->osds[i])) {
				removed++;
				continue;
			}
			if (removed)
				set->osds[i - removed] = set->osds[i];
		}
		set->size -= removed;
		if (set->size > 0)
			set->primary = set->osds[0];
	} else {
		/* set down/dne devices to NONE */
		for (i = set->size - 1; i >= 0; i--) {
			if (ceph_osd_is_down(osdmap, set->osds[i]))
				set->osds[i] = CRUSH_ITEM_NONE;
			else
				set->primary = set->osds[i];
		}
	}
}

static void apply_primary_affinity(struct ceph_osdmap *osdmap,
				   struct ceph_pg_pool_info *pi,
				   u32 pps,
				   struct ceph_osds *up)
{
	int i;
	int pos = -1;

	/*
	 * Do we have any non-default primary_affinity values for these
	 * osds?
	 */
	if (!osdmap->osd_primary_affinity)
		return;

	for (i = 0; i < up->size; i++) {
		int osd = up->osds[i];

		if (osd != CRUSH_ITEM_NONE &&
		    osdmap->osd_primary_affinity[osd] !=
					CEPH_OSD_DEFAULT_PRIMARY_AFFINITY) {
			break;
		}
	}
	if (i == up->size)
		return;

	/*
	 * Pick the primary.  Feed both the seed (for the pg) and the
	 * osd into the hash/rng so that a proportional fraction of an
	 * osd's pgs get rejected as primary.
	 */
	for (i = 0; i < up->size; i++) {
		int osd = up->osds[i];
		u32 aff;

		if (osd == CRUSH_ITEM_NONE)
			continue;

		aff = osdmap->osd_primary_affinity[osd];
		if (aff < CEPH_OSD_MAX_PRIMARY_AFFINITY &&
		    (crush_hash32_2(CRUSH_HASH_RJENKINS1,
				    pps, osd) >> 16) >= aff) {
			/*
			 * We chose not to use this primary.  Note it
			 * anyway as a fallback in case we don't pick
			 * anyone else, but keep looking.
			 */
			if (pos < 0)
				pos = i;
		} else {
			pos = i;
			break;
		}
	}
	if (pos < 0)
		return;

	up->primary = up->osds[pos];

	if (ceph_can_shift_osds(pi) && pos > 0) {
		/* move the new primary to the front */
		for (i = pos; i > 0; i--)
			up->osds[i] = up->osds[i - 1];
		up->osds[0] = up->primary;
	}
}
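
/*
 * The affinity value is compared against the top 16 bits of a hash of
 * (pps, osd), so, for example, an OSD whose primary_affinity is set to
 * half of CEPH_OSD_MAX_PRIMARY_AFFINITY is passed over as primary for
 * roughly half of its PGs, assuming a uniform hash.
 */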

/*
 * Get pg_temp and primary_temp mappings for given PG.
 *
 * Note that a PG may have none, only pg_temp, only primary_temp or
 * both pg_temp and primary_temp mappings.  This means @temp isn't
 * always a valid OSD set on return: in the "only primary_temp" case,
 * @temp will have its ->primary >= 0 but ->size == 0.
 */
static void get_temp_osds(struct ceph_osdmap *osdmap,
			  struct ceph_pg_pool_info *pi,
			  const struct ceph_pg *pgid,
			  struct ceph_osds *temp)
{
	struct ceph_pg_mapping *pg;
	int i;

	ceph_osds_init(temp);

	/* pg_temp? */
	pg = lookup_pg_mapping(&osdmap->pg_temp, pgid);
	if (pg) {
		for (i = 0; i < pg->pg_temp.len; i++) {
			if (ceph_osd_is_down(osdmap, pg->pg_temp.osds[i])) {
				if (ceph_can_shift_osds(pi))
					continue;

				temp->osds[temp->size++] = CRUSH_ITEM_NONE;
			} else {
				temp->osds[temp->size++] = pg->pg_temp.osds[i];
			}
		}

		/* apply pg_temp's primary */
		for (i = 0; i < temp->size; i++) {
			if (temp->osds[i] != CRUSH_ITEM_NONE) {
				temp->primary = temp->osds[i];
				break;
			}
		}
	}

	/* primary_temp? */
	pg = lookup_pg_mapping(&osdmap->primary_temp, pgid);
	if (pg)
		temp->primary = pg->primary_temp.osd;
}

/*
 * Map a PG to its acting set as well as its up set.
 *
 * Acting set is used for data mapping purposes, while up set can be
 * recorded for detecting interval changes and deciding whether to
 * resend a request.
 */
void ceph_pg_to_up_acting_osds(struct ceph_osdmap *osdmap,
			       struct ceph_pg_pool_info *pi,
			       const struct ceph_pg *raw_pgid,
			       struct ceph_osds *up,
			       struct ceph_osds *acting)
{
	struct ceph_pg pgid;
	u32 pps;

	WARN_ON(pi->id != raw_pgid->pool);
	raw_pg_to_pg(pi, raw_pgid, &pgid);

	pg_to_raw_osds(osdmap, pi, raw_pgid, up, &pps);
	apply_upmap(osdmap, &pgid, up);
	raw_to_up_osds(osdmap, pi, up);
	apply_primary_affinity(osdmap, pi, pps, up);
	get_temp_osds(osdmap, pi, &pgid, acting);
	if (!acting->size) {
		memcpy(acting->osds, up->osds, up->size * sizeof(up->osds[0]));
		acting->size = up->size;
		if (acting->primary == -1)
			acting->primary = up->primary;
	}
	WARN_ON(!osds_valid(up) || !osds_valid(acting));
}

bool ceph_pg_to_primary_shard(struct ceph_osdmap *osdmap,
			      struct ceph_pg_pool_info *pi,
			      const struct ceph_pg *raw_pgid,
			      struct ceph_spg *spgid)
{
	struct ceph_pg pgid;
	struct ceph_osds up, acting;
	int i;

	WARN_ON(pi->id != raw_pgid->pool);
	raw_pg_to_pg(pi, raw_pgid, &pgid);

	if (ceph_can_shift_osds(pi)) {
		spgid->pgid = pgid; /* struct */
		spgid->shard = CEPH_SPG_NOSHARD;
		return true;
	}

	ceph_pg_to_up_acting_osds(osdmap, pi, &pgid, &up, &acting);
	for (i = 0; i < acting.size; i++) {
		if (acting.osds[i] == acting.primary) {
			spgid->pgid = pgid; /* struct */
			spgid->shard = i;
			return true;
		}
	}

	return false;
}

/*
 * Return acting primary for given PG, or -1 if none.
 */
int ceph_pg_to_acting_primary(struct ceph_osdmap *osdmap,
			      const struct ceph_pg *raw_pgid)
{
	struct ceph_pg_pool_info *pi;
	struct ceph_osds up, acting;

	pi = ceph_pg_pool_by_id(osdmap, raw_pgid->pool);
	if (!pi)
		return -1;

	ceph_pg_to_up_acting_osds(osdmap, pi, raw_pgid, &up, &acting);
	return acting.primary;
}
EXPORT_SYMBOL(ceph_pg_to_acting_primary);
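
/*
 * Putting the exported helpers together, a caller can go from an
 * object name to its acting primary roughly like this (an illustrative
 * sketch, assuming osdmap, oid and oloc are already set up):
 *
 *	struct ceph_pg raw_pgid;
 *	int primary = -1;
 *
 *	if (!ceph_object_locator_to_pg(osdmap, &oid, &oloc, &raw_pgid))
 *		primary = ceph_pg_to_acting_primary(osdmap, &raw_pgid);
 */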

static struct crush_loc_node *alloc_crush_loc(size_t type_name_len,
					      size_t name_len)
{
	struct crush_loc_node *loc;

	loc = kmalloc(sizeof(*loc) + type_name_len + name_len + 2, GFP_NOIO);
	if (!loc)
		return NULL;

	RB_CLEAR_NODE(&loc->cl_node);
	return loc;
}

static void free_crush_loc(struct crush_loc_node *loc)
{
	WARN_ON(!RB_EMPTY_NODE(&loc->cl_node));

	kfree(loc);
}

static int crush_loc_compare(const struct crush_loc *loc1,
			     const struct crush_loc *loc2)
{
	return strcmp(loc1->cl_type_name, loc2->cl_type_name) ?:
	       strcmp(loc1->cl_name, loc2->cl_name);
}

DEFINE_RB_FUNCS2(crush_loc, struct crush_loc_node, cl_loc, crush_loc_compare,
		 RB_BYPTR, const struct crush_loc *, cl_node)

/*
 * Parses a set of <bucket type name>':'<bucket name> pairs separated
 * by '|', e.g. "rack:foo1|rack:foo2|datacenter:bar".
 *
 * Note that @crush_location is modified by strsep().
 */
int ceph_parse_crush_location(char *crush_location, struct rb_root *locs)
{
	struct crush_loc_node *loc;
	const char *type_name, *name, *colon;
	size_t type_name_len, name_len;

	dout("%s '%s'\n", __func__, crush_location);
	while ((type_name = strsep(&crush_location, "|"))) {
		colon = strchr(type_name, ':');
		if (!colon)
			return -EINVAL;

		type_name_len = colon - type_name;
		if (type_name_len == 0)
			return -EINVAL;

		name = colon + 1;
		name_len = strlen(name);
		if (name_len == 0)
			return -EINVAL;

		loc = alloc_crush_loc(type_name_len, name_len);
		if (!loc)
			return -ENOMEM;

		loc->cl_loc.cl_type_name = loc->cl_data;
		memcpy(loc->cl_loc.cl_type_name, type_name, type_name_len);
		loc->cl_loc.cl_type_name[type_name_len] = '\0';

		loc->cl_loc.cl_name = loc->cl_data + type_name_len + 1;
		memcpy(loc->cl_loc.cl_name, name, name_len);
		loc->cl_loc.cl_name[name_len] = '\0';

		if (!__insert_crush_loc(locs, loc)) {
			free_crush_loc(loc);
			return -EEXIST;
		}

		dout("%s type_name '%s' name '%s'\n", __func__,
		     loc->cl_loc.cl_type_name, loc->cl_loc.cl_name);
	}

	return 0;
}

int ceph_compare_crush_locs(struct rb_root *locs1, struct rb_root *locs2)
{
	struct rb_node *n1 = rb_first(locs1);
	struct rb_node *n2 = rb_first(locs2);
	int ret;

	for ( ; n1 && n2; n1 = rb_next(n1), n2 = rb_next(n2)) {
		struct crush_loc_node *loc1 =
		    rb_entry(n1, struct crush_loc_node, cl_node);
		struct crush_loc_node *loc2 =
		    rb_entry(n2, struct crush_loc_node, cl_node);

		ret = crush_loc_compare(&loc1->cl_loc, &loc2->cl_loc);
		if (ret)
			return ret;
	}

	if (!n1 && n2)
		return -1;
	if (n1 && !n2)
		return 1;
	return 0;
}

void ceph_clear_crush_locs(struct rb_root *locs)
{
	while (!RB_EMPTY_ROOT(locs)) {
		struct crush_loc_node *loc =
		    rb_entry(rb_first(locs), struct crush_loc_node, cl_node);

		erase_crush_loc(locs, loc);
		free_crush_loc(loc);
	}
}
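
/*
 * Illustrative use of the crush location helpers above (a sketch only;
 * the caller must pass a writable copy of the option string, since
 * ceph_parse_crush_location() modifies it with strsep()):
 *
 *	struct rb_root locs = RB_ROOT;
 *	int ret;
 *
 *	ret = ceph_parse_crush_location(copy, &locs);
 *	...
 *	ceph_clear_crush_locs(&locs);
 */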

/*
 * [a-zA-Z0-9-_.]+
 */
static bool is_valid_crush_name(const char *name)
{
	do {
		if (!('a' <= *name && *name <= 'z') &&
		    !('A' <= *name && *name <= 'Z') &&
		    !('0' <= *name && *name <= '9') &&
		    *name != '-' && *name != '_' && *name != '.')
			return false;
	} while (*++name != '\0');

	return true;
}

/*
 * Gets the parent of an item.  Returns its id (<0 because the
 * parent is always a bucket), type id (>0 for the same reason,
 * via @parent_type_id) and location (via @parent_loc).  If no
 * parent, returns 0.
 *
 * Does a linear search, as there are no parent pointers of any
 * kind.  Note that the result is ambiguous for items that occur
 * multiple times in the map.
 */
static int get_immediate_parent(struct crush_map *c, int id,
				u16 *parent_type_id,
				struct crush_loc *parent_loc)
{
	struct crush_bucket *b;
	struct crush_name_node *type_cn, *cn;
	int i, j;

	for (i = 0; i < c->max_buckets; i++) {
		b = c->buckets[i];
		if (!b)
			continue;

		/* ignore per-class shadow hierarchy */
		cn = lookup_crush_name(&c->names, b->id);
		if (!cn || !is_valid_crush_name(cn->cn_name))
			continue;

		for (j = 0; j < b->size; j++) {
			if (b->items[j] != id)
				continue;

			*parent_type_id = b->type;
			type_cn = lookup_crush_name(&c->type_names, b->type);
			parent_loc->cl_type_name = type_cn->cn_name;
			parent_loc->cl_name = cn->cn_name;
			return b->id;
		}
	}

	return 0; /* no parent */
}

/*
 * Calculates the locality/distance from an item to a client
 * location expressed in terms of CRUSH hierarchy as a set of
 * (bucket type name, bucket name) pairs.  Specifically, looks
 * for the lowest-valued bucket type for which the location of
 * @id matches one of the locations in @locs, so for standard
 * bucket types (host = 1, rack = 3, datacenter = 8, zone = 9)
 * a matching host is closer than a matching rack and a matching
 * data center is closer than a matching zone.
 *
 * Specifying multiple locations (a "multipath" location) such
 * as "rack=foo1 rack=foo2 datacenter=bar" is allowed -- @locs
 * is a multimap.  The locality will be:
 *
 * - 3 for OSDs in racks foo1 and foo2
 * - 8 for OSDs in data center bar
 * - -1 for all other OSDs
 *
 * The lowest possible bucket type is 1, so the best locality
 * for an OSD is 1 (i.e. a matching host).  Locality 0 would be
 * the OSD itself.
 */
int ceph_get_crush_locality(struct ceph_osdmap *osdmap, int id,
			    struct rb_root *locs)
{
	struct crush_loc loc;
	u16 type_id;

	/*
	 * Instead of repeated get_immediate_parent() calls,
	 * the location of @id could be obtained with a single
	 * depth-first traversal.
	 */
	for (;;) {
		id = get_immediate_parent(osdmap->crush, id, &type_id, &loc);
		if (id >= 0)
			return -1; /* not local */

		if (lookup_crush_loc(locs, &loc))
			return type_id;
	}
}