1 // SPDX-License-Identifier: GPL-2.0 2 3 #include <linux/ceph/ceph_debug.h> 4 5 #include <linux/module.h> 6 #include <linux/slab.h> 7 8 #include <linux/ceph/libceph.h> 9 #include <linux/ceph/osdmap.h> 10 #include <linux/ceph/decode.h> 11 #include <linux/crush/hash.h> 12 #include <linux/crush/mapper.h> 13 14 static __printf(2, 3) 15 void osdmap_info(const struct ceph_osdmap *map, const char *fmt, ...) 16 { 17 struct va_format vaf; 18 va_list args; 19 20 va_start(args, fmt); 21 vaf.fmt = fmt; 22 vaf.va = &args; 23 24 printk(KERN_INFO "%s (%pU e%u): %pV", KBUILD_MODNAME, &map->fsid, 25 map->epoch, &vaf); 26 27 va_end(args); 28 } 29 30 char *ceph_osdmap_state_str(char *str, int len, u32 state) 31 { 32 if (!len) 33 return str; 34 35 if ((state & CEPH_OSD_EXISTS) && (state & CEPH_OSD_UP)) 36 snprintf(str, len, "exists, up"); 37 else if (state & CEPH_OSD_EXISTS) 38 snprintf(str, len, "exists"); 39 else if (state & CEPH_OSD_UP) 40 snprintf(str, len, "up"); 41 else 42 snprintf(str, len, "doesn't exist"); 43 44 return str; 45 } 46 47 /* maps */ 48 49 static int calc_bits_of(unsigned int t) 50 { 51 int b = 0; 52 while (t) { 53 t = t >> 1; 54 b++; 55 } 56 return b; 57 } 58 59 /* 60 * the foo_mask is the smallest value 2^n-1 that is >= foo. 
61 */ 62 static void calc_pg_masks(struct ceph_pg_pool_info *pi) 63 { 64 pi->pg_num_mask = (1 << calc_bits_of(pi->pg_num-1)) - 1; 65 pi->pgp_num_mask = (1 << calc_bits_of(pi->pgp_num-1)) - 1; 66 } 67 68 /* 69 * decode crush map 70 */ 71 static int crush_decode_uniform_bucket(void **p, void *end, 72 struct crush_bucket_uniform *b) 73 { 74 dout("crush_decode_uniform_bucket %p to %p\n", *p, end); 75 ceph_decode_need(p, end, (1+b->h.size) * sizeof(u32), bad); 76 b->item_weight = ceph_decode_32(p); 77 return 0; 78 bad: 79 return -EINVAL; 80 } 81 82 static int crush_decode_list_bucket(void **p, void *end, 83 struct crush_bucket_list *b) 84 { 85 int j; 86 dout("crush_decode_list_bucket %p to %p\n", *p, end); 87 b->item_weights = kcalloc(b->h.size, sizeof(u32), GFP_NOFS); 88 if (b->item_weights == NULL) 89 return -ENOMEM; 90 b->sum_weights = kcalloc(b->h.size, sizeof(u32), GFP_NOFS); 91 if (b->sum_weights == NULL) 92 return -ENOMEM; 93 ceph_decode_need(p, end, 2 * b->h.size * sizeof(u32), bad); 94 for (j = 0; j < b->h.size; j++) { 95 b->item_weights[j] = ceph_decode_32(p); 96 b->sum_weights[j] = ceph_decode_32(p); 97 } 98 return 0; 99 bad: 100 return -EINVAL; 101 } 102 103 static int crush_decode_tree_bucket(void **p, void *end, 104 struct crush_bucket_tree *b) 105 { 106 int j; 107 dout("crush_decode_tree_bucket %p to %p\n", *p, end); 108 ceph_decode_8_safe(p, end, b->num_nodes, bad); 109 b->node_weights = kcalloc(b->num_nodes, sizeof(u32), GFP_NOFS); 110 if (b->node_weights == NULL) 111 return -ENOMEM; 112 ceph_decode_need(p, end, b->num_nodes * sizeof(u32), bad); 113 for (j = 0; j < b->num_nodes; j++) 114 b->node_weights[j] = ceph_decode_32(p); 115 return 0; 116 bad: 117 return -EINVAL; 118 } 119 120 static int crush_decode_straw_bucket(void **p, void *end, 121 struct crush_bucket_straw *b) 122 { 123 int j; 124 dout("crush_decode_straw_bucket %p to %p\n", *p, end); 125 b->item_weights = kcalloc(b->h.size, sizeof(u32), GFP_NOFS); 126 if (b->item_weights == NULL) 127 
return -ENOMEM; 128 b->straws = kcalloc(b->h.size, sizeof(u32), GFP_NOFS); 129 if (b->straws == NULL) 130 return -ENOMEM; 131 ceph_decode_need(p, end, 2 * b->h.size * sizeof(u32), bad); 132 for (j = 0; j < b->h.size; j++) { 133 b->item_weights[j] = ceph_decode_32(p); 134 b->straws[j] = ceph_decode_32(p); 135 } 136 return 0; 137 bad: 138 return -EINVAL; 139 } 140 141 static int crush_decode_straw2_bucket(void **p, void *end, 142 struct crush_bucket_straw2 *b) 143 { 144 int j; 145 dout("crush_decode_straw2_bucket %p to %p\n", *p, end); 146 b->item_weights = kcalloc(b->h.size, sizeof(u32), GFP_NOFS); 147 if (b->item_weights == NULL) 148 return -ENOMEM; 149 ceph_decode_need(p, end, b->h.size * sizeof(u32), bad); 150 for (j = 0; j < b->h.size; j++) 151 b->item_weights[j] = ceph_decode_32(p); 152 return 0; 153 bad: 154 return -EINVAL; 155 } 156 157 struct crush_name_node { 158 struct rb_node cn_node; 159 int cn_id; 160 char cn_name[]; 161 }; 162 163 static struct crush_name_node *alloc_crush_name(size_t name_len) 164 { 165 struct crush_name_node *cn; 166 167 cn = kmalloc(sizeof(*cn) + name_len + 1, GFP_NOIO); 168 if (!cn) 169 return NULL; 170 171 RB_CLEAR_NODE(&cn->cn_node); 172 return cn; 173 } 174 175 static void free_crush_name(struct crush_name_node *cn) 176 { 177 WARN_ON(!RB_EMPTY_NODE(&cn->cn_node)); 178 179 kfree(cn); 180 } 181 182 DEFINE_RB_FUNCS(crush_name, struct crush_name_node, cn_id, cn_node) 183 184 static int decode_crush_names(void **p, void *end, struct rb_root *root) 185 { 186 u32 n; 187 188 ceph_decode_32_safe(p, end, n, e_inval); 189 while (n--) { 190 struct crush_name_node *cn; 191 int id; 192 u32 name_len; 193 194 ceph_decode_32_safe(p, end, id, e_inval); 195 ceph_decode_32_safe(p, end, name_len, e_inval); 196 ceph_decode_need(p, end, name_len, e_inval); 197 198 cn = alloc_crush_name(name_len); 199 if (!cn) 200 return -ENOMEM; 201 202 cn->cn_id = id; 203 memcpy(cn->cn_name, *p, name_len); 204 cn->cn_name[name_len] = '\0'; 205 *p += name_len; 206 207 
if (!__insert_crush_name(root, cn)) { 208 free_crush_name(cn); 209 return -EEXIST; 210 } 211 } 212 213 return 0; 214 215 e_inval: 216 return -EINVAL; 217 } 218 219 void clear_crush_names(struct rb_root *root) 220 { 221 while (!RB_EMPTY_ROOT(root)) { 222 struct crush_name_node *cn = 223 rb_entry(rb_first(root), struct crush_name_node, cn_node); 224 225 erase_crush_name(root, cn); 226 free_crush_name(cn); 227 } 228 } 229 230 static struct crush_choose_arg_map *alloc_choose_arg_map(void) 231 { 232 struct crush_choose_arg_map *arg_map; 233 234 arg_map = kzalloc_obj(*arg_map, GFP_NOIO); 235 if (!arg_map) 236 return NULL; 237 238 RB_CLEAR_NODE(&arg_map->node); 239 return arg_map; 240 } 241 242 static void free_choose_arg_map(struct crush_choose_arg_map *arg_map) 243 { 244 int i, j; 245 246 if (!arg_map) 247 return; 248 249 WARN_ON(!RB_EMPTY_NODE(&arg_map->node)); 250 251 if (arg_map->args) { 252 for (i = 0; i < arg_map->size; i++) { 253 struct crush_choose_arg *arg = &arg_map->args[i]; 254 if (arg->weight_set) { 255 for (j = 0; j < arg->weight_set_size; j++) 256 kfree(arg->weight_set[j].weights); 257 kfree(arg->weight_set); 258 } 259 kfree(arg->ids); 260 } 261 kfree(arg_map->args); 262 } 263 kfree(arg_map); 264 } 265 266 DEFINE_RB_FUNCS(choose_arg_map, struct crush_choose_arg_map, choose_args_index, 267 node); 268 269 void clear_choose_args(struct crush_map *c) 270 { 271 while (!RB_EMPTY_ROOT(&c->choose_args)) { 272 struct crush_choose_arg_map *arg_map = 273 rb_entry(rb_first(&c->choose_args), 274 struct crush_choose_arg_map, node); 275 276 erase_choose_arg_map(&c->choose_args, arg_map); 277 free_choose_arg_map(arg_map); 278 } 279 } 280 281 static u32 *decode_array_32_alloc(void **p, void *end, u32 *plen) 282 { 283 u32 *a = NULL; 284 u32 len; 285 int ret; 286 287 ceph_decode_32_safe(p, end, len, e_inval); 288 if (len) { 289 u32 i; 290 291 a = kmalloc_array(len, sizeof(u32), GFP_NOIO); 292 if (!a) { 293 ret = -ENOMEM; 294 goto fail; 295 } 296 297 ceph_decode_need(p, end, 
len * sizeof(u32), e_inval); 298 for (i = 0; i < len; i++) 299 a[i] = ceph_decode_32(p); 300 } 301 302 *plen = len; 303 return a; 304 305 e_inval: 306 ret = -EINVAL; 307 fail: 308 kfree(a); 309 return ERR_PTR(ret); 310 } 311 312 /* 313 * Assumes @arg is zero-initialized. 314 */ 315 static int decode_choose_arg(void **p, void *end, struct crush_choose_arg *arg) 316 { 317 int ret; 318 319 ceph_decode_32_safe(p, end, arg->weight_set_size, e_inval); 320 if (arg->weight_set_size) { 321 u32 i; 322 323 arg->weight_set = kmalloc_objs(*arg->weight_set, 324 arg->weight_set_size, GFP_NOIO); 325 if (!arg->weight_set) 326 return -ENOMEM; 327 328 for (i = 0; i < arg->weight_set_size; i++) { 329 struct crush_weight_set *w = &arg->weight_set[i]; 330 331 w->weights = decode_array_32_alloc(p, end, &w->size); 332 if (IS_ERR(w->weights)) { 333 ret = PTR_ERR(w->weights); 334 w->weights = NULL; 335 return ret; 336 } 337 } 338 } 339 340 arg->ids = decode_array_32_alloc(p, end, &arg->ids_size); 341 if (IS_ERR(arg->ids)) { 342 ret = PTR_ERR(arg->ids); 343 arg->ids = NULL; 344 return ret; 345 } 346 347 return 0; 348 349 e_inval: 350 return -EINVAL; 351 } 352 353 static int decode_choose_args(void **p, void *end, struct crush_map *c) 354 { 355 struct crush_choose_arg_map *arg_map = NULL; 356 u32 num_choose_arg_maps, num_buckets; 357 int ret; 358 359 ceph_decode_32_safe(p, end, num_choose_arg_maps, e_inval); 360 while (num_choose_arg_maps--) { 361 arg_map = alloc_choose_arg_map(); 362 if (!arg_map) { 363 ret = -ENOMEM; 364 goto fail; 365 } 366 367 ceph_decode_64_safe(p, end, arg_map->choose_args_index, 368 e_inval); 369 arg_map->size = c->max_buckets; 370 arg_map->args = kzalloc_objs(*arg_map->args, arg_map->size, 371 GFP_NOIO); 372 if (!arg_map->args) { 373 ret = -ENOMEM; 374 goto fail; 375 } 376 377 ceph_decode_32_safe(p, end, num_buckets, e_inval); 378 while (num_buckets--) { 379 struct crush_choose_arg *arg; 380 u32 bucket_index; 381 382 ceph_decode_32_safe(p, end, bucket_index, e_inval); 
383 if (bucket_index >= arg_map->size) 384 goto e_inval; 385 386 arg = &arg_map->args[bucket_index]; 387 ret = decode_choose_arg(p, end, arg); 388 if (ret) 389 goto fail; 390 391 if (arg->ids_size && 392 arg->ids_size != c->buckets[bucket_index]->size) 393 goto e_inval; 394 } 395 396 insert_choose_arg_map(&c->choose_args, arg_map); 397 } 398 399 return 0; 400 401 e_inval: 402 ret = -EINVAL; 403 fail: 404 free_choose_arg_map(arg_map); 405 return ret; 406 } 407 408 static void crush_finalize(struct crush_map *c) 409 { 410 __s32 b; 411 412 /* Space for the array of pointers to per-bucket workspace */ 413 c->working_size = sizeof(struct crush_work) + 414 c->max_buckets * sizeof(struct crush_work_bucket *); 415 416 for (b = 0; b < c->max_buckets; b++) { 417 if (!c->buckets[b]) 418 continue; 419 420 switch (c->buckets[b]->alg) { 421 default: 422 /* 423 * The base case, permutation variables and 424 * the pointer to the permutation array. 425 */ 426 c->working_size += sizeof(struct crush_work_bucket); 427 break; 428 } 429 /* Every bucket has a permutation array. 
*/ 430 c->working_size += c->buckets[b]->size * sizeof(__u32); 431 } 432 } 433 434 static struct crush_map *crush_decode(void *pbyval, void *end) 435 { 436 struct crush_map *c; 437 int err; 438 int i, j; 439 void **p = &pbyval; 440 void *start = pbyval; 441 u32 magic; 442 443 dout("crush_decode %p to %p len %d\n", *p, end, (int)(end - *p)); 444 445 c = kzalloc_obj(*c, GFP_NOFS); 446 if (c == NULL) 447 return ERR_PTR(-ENOMEM); 448 449 c->type_names = RB_ROOT; 450 c->names = RB_ROOT; 451 c->choose_args = RB_ROOT; 452 453 /* set tunables to default values */ 454 c->choose_local_tries = 2; 455 c->choose_local_fallback_tries = 5; 456 c->choose_total_tries = 19; 457 c->chooseleaf_descend_once = 0; 458 459 ceph_decode_need(p, end, 4*sizeof(u32), bad); 460 magic = ceph_decode_32(p); 461 if (magic != CRUSH_MAGIC) { 462 pr_err("crush_decode magic %x != current %x\n", 463 (unsigned int)magic, (unsigned int)CRUSH_MAGIC); 464 goto bad; 465 } 466 c->max_buckets = ceph_decode_32(p); 467 c->max_rules = ceph_decode_32(p); 468 c->max_devices = ceph_decode_32(p); 469 470 c->buckets = kzalloc_objs(*c->buckets, c->max_buckets, GFP_NOFS); 471 if (c->buckets == NULL) 472 goto badmem; 473 c->rules = kzalloc_objs(*c->rules, c->max_rules, GFP_NOFS); 474 if (c->rules == NULL) 475 goto badmem; 476 477 /* buckets */ 478 for (i = 0; i < c->max_buckets; i++) { 479 int size = 0; 480 u32 alg; 481 struct crush_bucket *b; 482 483 ceph_decode_32_safe(p, end, alg, bad); 484 if (alg == 0) { 485 c->buckets[i] = NULL; 486 continue; 487 } 488 dout("crush_decode bucket %d off %x %p to %p\n", 489 i, (int)(*p-start), *p, end); 490 491 switch (alg) { 492 case CRUSH_BUCKET_UNIFORM: 493 size = sizeof(struct crush_bucket_uniform); 494 break; 495 case CRUSH_BUCKET_LIST: 496 size = sizeof(struct crush_bucket_list); 497 break; 498 case CRUSH_BUCKET_TREE: 499 size = sizeof(struct crush_bucket_tree); 500 break; 501 case CRUSH_BUCKET_STRAW: 502 size = sizeof(struct crush_bucket_straw); 503 break; 504 case 
CRUSH_BUCKET_STRAW2: 505 size = sizeof(struct crush_bucket_straw2); 506 break; 507 default: 508 goto bad; 509 } 510 BUG_ON(size == 0); 511 b = c->buckets[i] = kzalloc(size, GFP_NOFS); 512 if (b == NULL) 513 goto badmem; 514 515 ceph_decode_need(p, end, 4*sizeof(u32), bad); 516 b->id = ceph_decode_32(p); 517 b->type = ceph_decode_16(p); 518 b->alg = ceph_decode_8(p); 519 b->hash = ceph_decode_8(p); 520 b->weight = ceph_decode_32(p); 521 b->size = ceph_decode_32(p); 522 523 dout("crush_decode bucket size %d off %x %p to %p\n", 524 b->size, (int)(*p-start), *p, end); 525 526 b->items = kzalloc_objs(__s32, b->size, GFP_NOFS); 527 if (b->items == NULL) 528 goto badmem; 529 530 ceph_decode_need(p, end, b->size*sizeof(u32), bad); 531 for (j = 0; j < b->size; j++) 532 b->items[j] = ceph_decode_32(p); 533 534 switch (b->alg) { 535 case CRUSH_BUCKET_UNIFORM: 536 err = crush_decode_uniform_bucket(p, end, 537 (struct crush_bucket_uniform *)b); 538 if (err < 0) 539 goto fail; 540 break; 541 case CRUSH_BUCKET_LIST: 542 err = crush_decode_list_bucket(p, end, 543 (struct crush_bucket_list *)b); 544 if (err < 0) 545 goto fail; 546 break; 547 case CRUSH_BUCKET_TREE: 548 err = crush_decode_tree_bucket(p, end, 549 (struct crush_bucket_tree *)b); 550 if (err < 0) 551 goto fail; 552 break; 553 case CRUSH_BUCKET_STRAW: 554 err = crush_decode_straw_bucket(p, end, 555 (struct crush_bucket_straw *)b); 556 if (err < 0) 557 goto fail; 558 break; 559 case CRUSH_BUCKET_STRAW2: 560 err = crush_decode_straw2_bucket(p, end, 561 (struct crush_bucket_straw2 *)b); 562 if (err < 0) 563 goto fail; 564 break; 565 } 566 } 567 568 /* rules */ 569 dout("rule vec is %p\n", c->rules); 570 for (i = 0; i < c->max_rules; i++) { 571 u32 yes; 572 struct crush_rule *r; 573 574 ceph_decode_32_safe(p, end, yes, bad); 575 if (!yes) { 576 dout("crush_decode NO rule %d off %x %p to %p\n", 577 i, (int)(*p-start), *p, end); 578 c->rules[i] = NULL; 579 continue; 580 } 581 582 dout("crush_decode rule %d off %x %p to %p\n", 
583 i, (int)(*p-start), *p, end); 584 585 /* len */ 586 ceph_decode_32_safe(p, end, yes, bad); 587 #if BITS_PER_LONG == 32 588 if (yes > (ULONG_MAX - sizeof(*r)) 589 / sizeof(struct crush_rule_step)) 590 goto bad; 591 #endif 592 r = kmalloc_flex(*r, steps, yes, GFP_NOFS); 593 if (r == NULL) 594 goto badmem; 595 dout(" rule %d is at %p\n", i, r); 596 c->rules[i] = r; 597 r->len = yes; 598 ceph_decode_copy_safe(p, end, &r->mask, 4, bad); /* 4 u8's */ 599 ceph_decode_need(p, end, r->len*3*sizeof(u32), bad); 600 for (j = 0; j < r->len; j++) { 601 r->steps[j].op = ceph_decode_32(p); 602 r->steps[j].arg1 = ceph_decode_32(p); 603 r->steps[j].arg2 = ceph_decode_32(p); 604 } 605 } 606 607 err = decode_crush_names(p, end, &c->type_names); 608 if (err) 609 goto fail; 610 611 err = decode_crush_names(p, end, &c->names); 612 if (err) 613 goto fail; 614 615 ceph_decode_skip_map(p, end, 32, string, bad); /* rule_name_map */ 616 617 /* tunables */ 618 ceph_decode_need(p, end, 3*sizeof(u32), done); 619 c->choose_local_tries = ceph_decode_32(p); 620 c->choose_local_fallback_tries = ceph_decode_32(p); 621 c->choose_total_tries = ceph_decode_32(p); 622 dout("crush decode tunable choose_local_tries = %d\n", 623 c->choose_local_tries); 624 dout("crush decode tunable choose_local_fallback_tries = %d\n", 625 c->choose_local_fallback_tries); 626 dout("crush decode tunable choose_total_tries = %d\n", 627 c->choose_total_tries); 628 629 ceph_decode_need(p, end, sizeof(u32), done); 630 c->chooseleaf_descend_once = ceph_decode_32(p); 631 dout("crush decode tunable chooseleaf_descend_once = %d\n", 632 c->chooseleaf_descend_once); 633 634 ceph_decode_need(p, end, sizeof(u8), done); 635 c->chooseleaf_vary_r = ceph_decode_8(p); 636 dout("crush decode tunable chooseleaf_vary_r = %d\n", 637 c->chooseleaf_vary_r); 638 639 /* skip straw_calc_version, allowed_bucket_algs */ 640 ceph_decode_need(p, end, sizeof(u8) + sizeof(u32), done); 641 *p += sizeof(u8) + sizeof(u32); 642 643 ceph_decode_need(p, end, 
sizeof(u8), done); 644 c->chooseleaf_stable = ceph_decode_8(p); 645 dout("crush decode tunable chooseleaf_stable = %d\n", 646 c->chooseleaf_stable); 647 648 if (*p != end) { 649 /* class_map */ 650 ceph_decode_skip_map(p, end, 32, 32, bad); 651 /* class_name */ 652 ceph_decode_skip_map(p, end, 32, string, bad); 653 /* class_bucket */ 654 ceph_decode_skip_map_of_map(p, end, 32, 32, 32, bad); 655 } 656 657 if (*p != end) { 658 err = decode_choose_args(p, end, c); 659 if (err) 660 goto fail; 661 } 662 663 done: 664 crush_finalize(c); 665 dout("crush_decode success\n"); 666 return c; 667 668 badmem: 669 err = -ENOMEM; 670 fail: 671 dout("crush_decode fail %d\n", err); 672 crush_destroy(c); 673 return ERR_PTR(err); 674 675 bad: 676 err = -EINVAL; 677 goto fail; 678 } 679 680 int ceph_pg_compare(const struct ceph_pg *lhs, const struct ceph_pg *rhs) 681 { 682 if (lhs->pool < rhs->pool) 683 return -1; 684 if (lhs->pool > rhs->pool) 685 return 1; 686 if (lhs->seed < rhs->seed) 687 return -1; 688 if (lhs->seed > rhs->seed) 689 return 1; 690 691 return 0; 692 } 693 694 int ceph_spg_compare(const struct ceph_spg *lhs, const struct ceph_spg *rhs) 695 { 696 int ret; 697 698 ret = ceph_pg_compare(&lhs->pgid, &rhs->pgid); 699 if (ret) 700 return ret; 701 702 if (lhs->shard < rhs->shard) 703 return -1; 704 if (lhs->shard > rhs->shard) 705 return 1; 706 707 return 0; 708 } 709 710 static struct ceph_pg_mapping *alloc_pg_mapping(size_t payload_len) 711 { 712 struct ceph_pg_mapping *pg; 713 714 pg = kmalloc(sizeof(*pg) + payload_len, GFP_NOIO); 715 if (!pg) 716 return NULL; 717 718 RB_CLEAR_NODE(&pg->node); 719 return pg; 720 } 721 722 static void free_pg_mapping(struct ceph_pg_mapping *pg) 723 { 724 WARN_ON(!RB_EMPTY_NODE(&pg->node)); 725 726 kfree(pg); 727 } 728 729 /* 730 * rbtree of pg_mapping for handling pg_temp (explicit mapping of pgid 731 * to a set of osds) and primary_temp (explicit primary setting) 732 */ 733 DEFINE_RB_FUNCS2(pg_mapping, struct ceph_pg_mapping, pgid, 
ceph_pg_compare, 734 RB_BYPTR, const struct ceph_pg *, node) 735 736 /* 737 * rbtree of pg pool info 738 */ 739 DEFINE_RB_FUNCS(pg_pool, struct ceph_pg_pool_info, id, node) 740 741 struct ceph_pg_pool_info *ceph_pg_pool_by_id(struct ceph_osdmap *map, u64 id) 742 { 743 return lookup_pg_pool(&map->pg_pools, id); 744 } 745 746 const char *ceph_pg_pool_name_by_id(struct ceph_osdmap *map, u64 id) 747 { 748 struct ceph_pg_pool_info *pi; 749 750 if (id == CEPH_NOPOOL) 751 return NULL; 752 753 if (WARN_ON_ONCE(id > (u64) INT_MAX)) 754 return NULL; 755 756 pi = lookup_pg_pool(&map->pg_pools, id); 757 return pi ? pi->name : NULL; 758 } 759 EXPORT_SYMBOL(ceph_pg_pool_name_by_id); 760 761 int ceph_pg_poolid_by_name(struct ceph_osdmap *map, const char *name) 762 { 763 struct rb_node *rbp; 764 765 for (rbp = rb_first(&map->pg_pools); rbp; rbp = rb_next(rbp)) { 766 struct ceph_pg_pool_info *pi = 767 rb_entry(rbp, struct ceph_pg_pool_info, node); 768 if (pi->name && strcmp(pi->name, name) == 0) 769 return pi->id; 770 } 771 return -ENOENT; 772 } 773 EXPORT_SYMBOL(ceph_pg_poolid_by_name); 774 775 u64 ceph_pg_pool_flags(struct ceph_osdmap *map, u64 id) 776 { 777 struct ceph_pg_pool_info *pi; 778 779 pi = lookup_pg_pool(&map->pg_pools, id); 780 return pi ? 
pi->flags : 0; 781 } 782 EXPORT_SYMBOL(ceph_pg_pool_flags); 783 784 static void __remove_pg_pool(struct rb_root *root, struct ceph_pg_pool_info *pi) 785 { 786 erase_pg_pool(root, pi); 787 kfree(pi->name); 788 kfree(pi); 789 } 790 791 static int decode_pool(void **p, void *end, struct ceph_pg_pool_info *pi) 792 { 793 u8 ev, cv; 794 unsigned len, num; 795 void *pool_end; 796 797 ceph_decode_need(p, end, 2 + 4, bad); 798 ev = ceph_decode_8(p); /* encoding version */ 799 cv = ceph_decode_8(p); /* compat version */ 800 if (ev < 5) { 801 pr_warn("got v %d < 5 cv %d of ceph_pg_pool\n", ev, cv); 802 return -EINVAL; 803 } 804 if (cv > 9) { 805 pr_warn("got v %d cv %d > 9 of ceph_pg_pool\n", ev, cv); 806 return -EINVAL; 807 } 808 len = ceph_decode_32(p); 809 ceph_decode_need(p, end, len, bad); 810 pool_end = *p + len; 811 812 ceph_decode_need(p, end, 4 + 4 + 4, bad); 813 pi->type = ceph_decode_8(p); 814 pi->size = ceph_decode_8(p); 815 pi->crush_ruleset = ceph_decode_8(p); 816 pi->object_hash = ceph_decode_8(p); 817 pi->pg_num = ceph_decode_32(p); 818 pi->pgp_num = ceph_decode_32(p); 819 820 /* lpg*, last_change, snap_seq, snap_epoch */ 821 ceph_decode_skip_n(p, end, 8 + 4 + 8 + 4, bad); 822 823 /* skip snaps */ 824 ceph_decode_32_safe(p, end, num, bad); 825 while (num--) { 826 /* snapid key, pool snap (with versions) */ 827 ceph_decode_skip_n(p, end, 8 + 2, bad); 828 ceph_decode_skip_string(p, end, bad); 829 } 830 831 /* removed_snaps */ 832 ceph_decode_skip_map(p, end, 64, 64, bad); 833 834 ceph_decode_need(p, end, 8 + 8 + 4, bad); 835 *p += 8; /* skip auid */ 836 pi->flags = ceph_decode_64(p); 837 *p += 4; /* skip crash_replay_interval */ 838 839 if (ev >= 7) 840 ceph_decode_8_safe(p, end, pi->min_size, bad); 841 else 842 pi->min_size = pi->size - pi->size / 2; 843 844 if (ev >= 8) 845 /* quota_max_* */ 846 ceph_decode_skip_n(p, end, 8 + 8, bad); 847 848 if (ev >= 9) { 849 /* tiers */ 850 ceph_decode_skip_set(p, end, 64, bad); 851 852 ceph_decode_need(p, end, 8 + 1 + 8 + 
8, bad); 853 *p += 8; /* skip tier_of */ 854 *p += 1; /* skip cache_mode */ 855 pi->read_tier = ceph_decode_64(p); 856 pi->write_tier = ceph_decode_64(p); 857 } else { 858 pi->read_tier = -1; 859 pi->write_tier = -1; 860 } 861 862 if (ev >= 10) 863 /* properties */ 864 ceph_decode_skip_map(p, end, string, string, bad); 865 866 if (ev >= 11) { 867 /* hit_set_params (with versions) */ 868 ceph_decode_skip_n(p, end, 2, bad); 869 ceph_decode_skip_string(p, end, bad); 870 871 /* hit_set_period, hit_set_count */ 872 ceph_decode_skip_n(p, end, 4 + 4, bad); 873 } 874 875 if (ev >= 12) 876 /* stripe_width */ 877 ceph_decode_skip_32(p, end, bad); 878 879 if (ev >= 13) 880 /* target_max_*, cache_target_*, cache_min_* */ 881 ceph_decode_skip_n(p, end, 16 + 8 + 8, bad); 882 883 if (ev >= 14) 884 /* erasure_code_profile */ 885 ceph_decode_skip_string(p, end, bad); 886 887 /* 888 * last_force_op_resend_preluminous, will be overridden if the 889 * map was encoded with RESEND_ON_SPLIT 890 */ 891 if (ev >= 15) 892 ceph_decode_32_safe(p, end, pi->last_force_request_resend, bad); 893 else 894 pi->last_force_request_resend = 0; 895 896 if (ev >= 16) 897 /* min_read_recency_for_promote */ 898 ceph_decode_skip_32(p, end, bad); 899 900 if (ev >= 17) 901 /* expected_num_objects */ 902 ceph_decode_skip_64(p, end, bad); 903 904 if (ev >= 19) 905 /* cache_target_dirty_high_ratio_micro */ 906 ceph_decode_skip_32(p, end, bad); 907 908 if (ev >= 20) 909 /* min_write_recency_for_promote */ 910 ceph_decode_skip_32(p, end, bad); 911 912 if (ev >= 21) 913 /* use_gmt_hitset */ 914 ceph_decode_skip_8(p, end, bad); 915 916 if (ev >= 22) 917 /* fast_read */ 918 ceph_decode_skip_8(p, end, bad); 919 920 if (ev >= 23) 921 /* hit_set_grade_decay_rate, hit_set_search_last_n */ 922 ceph_decode_skip_n(p, end, 4 + 4, bad); 923 924 if (ev >= 24) { 925 /* opts (with versions) */ 926 ceph_decode_skip_n(p, end, 2, bad); 927 ceph_decode_skip_string(p, end, bad); 928 } 929 930 if (ev >= 25) 931 ceph_decode_32_safe(p, 
end, pi->last_force_request_resend, bad); 932 933 /* ignore the rest */ 934 935 *p = pool_end; 936 calc_pg_masks(pi); 937 return 0; 938 939 bad: 940 return -EINVAL; 941 } 942 943 static int decode_pool_names(void **p, void *end, struct ceph_osdmap *map) 944 { 945 struct ceph_pg_pool_info *pi; 946 u32 num, len; 947 u64 pool; 948 949 ceph_decode_32_safe(p, end, num, bad); 950 dout(" %d pool names\n", num); 951 while (num--) { 952 ceph_decode_64_safe(p, end, pool, bad); 953 ceph_decode_32_safe(p, end, len, bad); 954 dout(" pool %llu len %d\n", pool, len); 955 ceph_decode_need(p, end, len, bad); 956 pi = lookup_pg_pool(&map->pg_pools, pool); 957 if (pi) { 958 char *name = kstrndup(*p, len, GFP_NOFS); 959 960 if (!name) 961 return -ENOMEM; 962 kfree(pi->name); 963 pi->name = name; 964 dout(" name is %s\n", pi->name); 965 } 966 *p += len; 967 } 968 return 0; 969 970 bad: 971 return -EINVAL; 972 } 973 974 /* 975 * CRUSH workspaces 976 * 977 * workspace_manager framework borrowed from fs/btrfs/compression.c. 978 * Two simplifications: there is only one type of workspace and there 979 * is always at least one workspace. 
980 */ 981 static struct crush_work *alloc_workspace(const struct crush_map *c) 982 { 983 struct crush_work *work; 984 size_t work_size; 985 986 WARN_ON(!c->working_size); 987 work_size = crush_work_size(c, CEPH_PG_MAX_SIZE); 988 dout("%s work_size %zu bytes\n", __func__, work_size); 989 990 work = kvmalloc(work_size, GFP_NOIO); 991 if (!work) 992 return NULL; 993 994 INIT_LIST_HEAD(&work->item); 995 crush_init_workspace(c, work); 996 return work; 997 } 998 999 static void free_workspace(struct crush_work *work) 1000 { 1001 WARN_ON(!list_empty(&work->item)); 1002 kvfree(work); 1003 } 1004 1005 static void init_workspace_manager(struct workspace_manager *wsm) 1006 { 1007 INIT_LIST_HEAD(&wsm->idle_ws); 1008 spin_lock_init(&wsm->ws_lock); 1009 atomic_set(&wsm->total_ws, 0); 1010 wsm->free_ws = 0; 1011 init_waitqueue_head(&wsm->ws_wait); 1012 } 1013 1014 static void add_initial_workspace(struct workspace_manager *wsm, 1015 struct crush_work *work) 1016 { 1017 WARN_ON(!list_empty(&wsm->idle_ws)); 1018 1019 list_add(&work->item, &wsm->idle_ws); 1020 atomic_set(&wsm->total_ws, 1); 1021 wsm->free_ws = 1; 1022 } 1023 1024 static void cleanup_workspace_manager(struct workspace_manager *wsm) 1025 { 1026 struct crush_work *work; 1027 1028 while (!list_empty(&wsm->idle_ws)) { 1029 work = list_first_entry(&wsm->idle_ws, struct crush_work, 1030 item); 1031 list_del_init(&work->item); 1032 free_workspace(work); 1033 } 1034 atomic_set(&wsm->total_ws, 0); 1035 wsm->free_ws = 0; 1036 } 1037 1038 /* 1039 * Finds an available workspace or allocates a new one. If it's not 1040 * possible to allocate a new one, waits until there is one. 
 */
static struct crush_work *get_workspace(struct workspace_manager *wsm,
					const struct crush_map *c)
{
	struct crush_work *work;
	int cpus = num_online_cpus();

again:
	spin_lock(&wsm->ws_lock);
	/* fast path: reuse an idle workspace */
	if (!list_empty(&wsm->idle_ws)) {
		work = list_first_entry(&wsm->idle_ws, struct crush_work,
					item);
		list_del_init(&work->item);
		wsm->free_ws--;
		spin_unlock(&wsm->ws_lock);
		return work;

	}
	/*
	 * Cap the total number of workspaces at num_online_cpus() + 1;
	 * beyond that, sleep until one is put back (put_workspace()
	 * wakes us) and retry from the top.
	 */
	if (atomic_read(&wsm->total_ws) > cpus) {
		DEFINE_WAIT(wait);

		spin_unlock(&wsm->ws_lock);
		prepare_to_wait(&wsm->ws_wait, &wait, TASK_UNINTERRUPTIBLE);
		/* re-check under no lock; racy but safe, worst case we loop */
		if (atomic_read(&wsm->total_ws) > cpus && !wsm->free_ws)
			schedule();
		finish_wait(&wsm->ws_wait, &wait);
		goto again;
	}
	/* reserve a slot before allocating so the cap above holds */
	atomic_inc(&wsm->total_ws);
	spin_unlock(&wsm->ws_lock);

	work = alloc_workspace(c);
	if (!work) {
		atomic_dec(&wsm->total_ws);
		wake_up(&wsm->ws_wait);

		/*
		 * Do not return the error but go back to waiting. We
		 * have the initial workspace and the CRUSH computation
		 * time is bounded so we will get it eventually.
		 */
		WARN_ON(atomic_read(&wsm->total_ws) < 1);
		goto again;
	}
	return work;
}

/*
 * Puts a workspace back on the list or frees it if we have enough
 * idle ones sitting around.
 */
static void put_workspace(struct workspace_manager *wsm,
			  struct crush_work *work)
{
	spin_lock(&wsm->ws_lock);
	/* keep up to num_online_cpus() + 1 workspaces cached */
	if (wsm->free_ws <= num_online_cpus()) {
		list_add(&work->item, &wsm->idle_ws);
		wsm->free_ws++;
		spin_unlock(&wsm->ws_lock);
		goto wake;
	}
	spin_unlock(&wsm->ws_lock);

	free_workspace(work);
	atomic_dec(&wsm->total_ws);
wake:
	if (wq_has_sleeper(&wsm->ws_wait))
		wake_up(&wsm->ws_wait);
}

/*
 * osd map
 */

/* Allocate an empty osdmap with all rbtrees and the workspace manager
 * initialized.  Returns NULL on allocation failure. */
struct ceph_osdmap *ceph_osdmap_alloc(void)
{
	struct ceph_osdmap *map;

	map = kzalloc_obj(*map, GFP_NOIO);
	if (!map)
		return NULL;

	map->pg_pools = RB_ROOT;
	map->pool_max = -1;
	map->pg_temp = RB_ROOT;
	map->primary_temp = RB_ROOT;
	map->pg_upmap = RB_ROOT;
	map->pg_upmap_items = RB_ROOT;

	init_workspace_manager(&map->crush_wsm);

	return map;
}

/* Tear down an osdmap: crush map, workspaces, all mapping rbtrees,
 * pool infos and the per-osd arrays. */
void ceph_osdmap_destroy(struct ceph_osdmap *map)
{
	dout("osdmap_destroy %p\n", map);

	if (map->crush)
		crush_destroy(map->crush);
	cleanup_workspace_manager(&map->crush_wsm);

	while (!RB_EMPTY_ROOT(&map->pg_temp)) {
		struct ceph_pg_mapping *pg =
		    rb_entry(rb_first(&map->pg_temp),
			     struct ceph_pg_mapping, node);
		erase_pg_mapping(&map->pg_temp, pg);
		free_pg_mapping(pg);
	}
	while (!RB_EMPTY_ROOT(&map->primary_temp)) {
		struct ceph_pg_mapping *pg =
		    rb_entry(rb_first(&map->primary_temp),
			     struct ceph_pg_mapping, node);
		erase_pg_mapping(&map->primary_temp, pg);
		free_pg_mapping(pg);
	}
	/* pg_upmap trees use raw rb_erase/kfree, not the pg_mapping
	 * helpers, so no RB_EMPTY_NODE sanity check applies here */
	while (!RB_EMPTY_ROOT(&map->pg_upmap)) {
		struct ceph_pg_mapping *pg =
		    rb_entry(rb_first(&map->pg_upmap),
			     struct ceph_pg_mapping, node);
		rb_erase(&pg->node, &map->pg_upmap);
		kfree(pg);
	}
	while (!RB_EMPTY_ROOT(&map->pg_upmap_items)) {
		struct ceph_pg_mapping *pg =
		    rb_entry(rb_first(&map->pg_upmap_items),
			     struct ceph_pg_mapping, node);
		rb_erase(&pg->node, &map->pg_upmap_items);
		kfree(pg);
	}
	while (!RB_EMPTY_ROOT(&map->pg_pools)) {
		struct ceph_pg_pool_info *pi =
		    rb_entry(rb_first(&map->pg_pools),
			     struct ceph_pg_pool_info, node);
		__remove_pg_pool(&map->pg_pools, pi);
	}
	kvfree(map->osd_state);
	kvfree(map->osd_weight);
	kvfree(map->osd_addr);
	kvfree(map->osd_primary_affinity);
	kfree(map);
}

/*
 * Adjust max_osd value, (re)allocate arrays.
 *
 * The new elements are properly initialized.
 */
static int osdmap_set_max_osd(struct ceph_osdmap *map, u32 max)
{
	u32 *state;
	u32 *weight;
	struct ceph_entity_addr *addr;
	u32 to_copy;
	int i;

	dout("%s old %u new %u\n", __func__, map->max_osd, max);
	if (max == map->max_osd)
		return 0;

	/* allocate all three before touching map so failure leaves it intact */
	state = kvmalloc(array_size(max, sizeof(*state)), GFP_NOFS);
	weight = kvmalloc(array_size(max, sizeof(*weight)), GFP_NOFS);
	addr = kvmalloc(array_size(max, sizeof(*addr)), GFP_NOFS);
	if (!state || !weight || !addr) {
		kvfree(state);
		kvfree(weight);
		kvfree(addr);
		return -ENOMEM;
	}

	to_copy = min(map->max_osd, max);
	if (map->osd_state) {
		memcpy(state, map->osd_state, to_copy * sizeof(*state));
		memcpy(weight, map->osd_weight, to_copy * sizeof(*weight));
		memcpy(addr, map->osd_addr, to_copy * sizeof(*addr));
		kvfree(map->osd_state);
		kvfree(map->osd_weight);
		kvfree(map->osd_addr);
	}

	map->osd_state = state;
	map->osd_weight = weight;
	map->osd_addr = addr;
	/* initialize any newly added slots */
	for (i = map->max_osd; i < max; i++) {
		map->osd_state[i] = 0;
		map->osd_weight[i] = CEPH_OSD_OUT;
		memset(map->osd_addr + i, 0, sizeof(*map->osd_addr));
	}

	/* primary_affinity is allocated lazily; resize only if present */
	if (map->osd_primary_affinity) {
		u32 *affinity;

		affinity = kvmalloc(array_size(max, sizeof(*affinity)),
				    GFP_NOFS);
		if (!affinity)
			return -ENOMEM;

		memcpy(affinity, map->osd_primary_affinity,
		       to_copy * sizeof(*affinity));
		kvfree(map->osd_primary_affinity);

		map->osd_primary_affinity = affinity;
		for (i = map->max_osd; i < max; i++)
			map->osd_primary_affinity[i] =
			    CEPH_OSD_DEFAULT_PRIMARY_AFFINITY;
	}

	map->max_osd = max;

	return 0;
}

/* Install @crush as the map's crush map, replacing any previous one
 * and its workspaces.  Takes ownership of @crush (or of its ERR_PTR):
 * on failure @crush is destroyed and an error is returned. */
static int osdmap_set_crush(struct ceph_osdmap *map, struct crush_map *crush)
{
	struct crush_work *work;

	if (IS_ERR(crush))
		return PTR_ERR(crush);

	/* allocate the initial workspace before committing to @crush */
	work = alloc_workspace(crush);
	if (!work) {
		crush_destroy(crush);
		return -ENOMEM;
	}

	if (map->crush)
		crush_destroy(map->crush);
	cleanup_workspace_manager(&map->crush_wsm);
	map->crush = crush;
	add_initial_workspace(&map->crush_wsm, work);
	return 0;
}

#define OSDMAP_WRAPPER_COMPAT_VER 7
#define OSDMAP_CLIENT_DATA_COMPAT_VER 1

/*
 * Return 0 or error. On success, *v is set to 0 for old (v6) osdmaps,
 * to struct_v of the client_data section for new (v7 and above)
 * osdmaps.
*/
static int get_osdmap_client_data_v(void **p, void *end,
				    const char *prefix, u8 *v)
{
	u8 struct_v;

	ceph_decode_8_safe(p, end, struct_v, e_inval);
	if (struct_v >= 7) {
		u8 struct_compat;

		ceph_decode_8_safe(p, end, struct_compat, e_inval);
		if (struct_compat > OSDMAP_WRAPPER_COMPAT_VER) {
			pr_warn("got v %d cv %d > %d of %s ceph_osdmap\n",
				struct_v, struct_compat,
				OSDMAP_WRAPPER_COMPAT_VER, prefix);
			return -EINVAL;
		}
		*p += 4; /* ignore wrapper struct_len */

		ceph_decode_8_safe(p, end, struct_v, e_inval);
		ceph_decode_8_safe(p, end, struct_compat, e_inval);
		if (struct_compat > OSDMAP_CLIENT_DATA_COMPAT_VER) {
			pr_warn("got v %d cv %d > %d of %s ceph_osdmap client data\n",
				struct_v, struct_compat,
				OSDMAP_CLIENT_DATA_COMPAT_VER, prefix);
			return -EINVAL;
		}
		*p += 4; /* ignore client data struct_len */
	} else {
		u16 version;

		/*
		 * Old encodings start with a 16-bit version; re-read the
		 * byte we consumed above as the low half of it.
		 */
		*p -= 1;
		ceph_decode_16_safe(p, end, version, e_inval);
		if (version < 6) {
			pr_warn("got v %d < 6 of %s ceph_osdmap\n",
				version, prefix);
			return -EINVAL;
		}

		/* old osdmap encoding */
		struct_v = 0;
	}

	*v = struct_v;
	return 0;

e_inval:
	return -EINVAL;
}

/*
 * Decode pool entries into map->pg_pools.
 *
 * For a full map (!incremental) a fresh entry is always allocated; for
 * an incremental map an existing pool entry is updated in place and
 * only unknown pools get a new entry.
 */
static int __decode_pools(void **p, void *end, struct ceph_osdmap *map,
			  bool incremental)
{
	u32 n;

	ceph_decode_32_safe(p, end, n, e_inval);
	while (n--) {
		struct ceph_pg_pool_info *pi;
		u64 pool;
		int ret;

		ceph_decode_64_safe(p, end, pool, e_inval);

		pi = lookup_pg_pool(&map->pg_pools, pool);
		if (!incremental || !pi) {
			pi = kzalloc_obj(*pi, GFP_NOFS);
			if (!pi)
				return -ENOMEM;

			RB_CLEAR_NODE(&pi->node);
			pi->id = pool;

			if (!__insert_pg_pool(&map->pg_pools, pi)) {
				kfree(pi);
				return -EEXIST;
			}
		}

		ret = decode_pool(p, end, pi);
		if (ret)
			return ret;
	}

	return 0;

e_inval:
	return -EINVAL;
}

static int decode_pools(void **p, void *end, struct ceph_osdmap *map)
{
	return __decode_pools(p, end, map, false);
}

static int decode_new_pools(void **p, void *end, struct ceph_osdmap *map)
{
	return __decode_pools(p, end, map, true);
}

typedef struct ceph_pg_mapping *(*decode_mapping_fn_t)(void **, void *, bool);

/*
 * Decode a list of pgid -> mapping entries into @mapping_root.
 *
 * @fn decodes one entry; it may return NULL to request removal of an
 * existing entry (incremental updates), or an ERR_PTR on failure.  A
 * NULL @fn (old_* incremental sections) removes the listed pgids.
 */
static int decode_pg_mapping(void **p, void *end, struct rb_root *mapping_root,
			     decode_mapping_fn_t fn, bool incremental)
{
	u32 n;

	WARN_ON(!incremental && !fn);

	ceph_decode_32_safe(p, end, n, e_inval);
	while (n--) {
		struct ceph_pg_mapping *pg;
		struct ceph_pg pgid;
		int ret;

		ret = ceph_decode_pgid(p, end, &pgid);
		if (ret)
			return ret;

		/* an existing entry is always replaced (or just removed) */
		pg = lookup_pg_mapping(mapping_root, &pgid);
		if (pg) {
			WARN_ON(!incremental);
			erase_pg_mapping(mapping_root, pg);
			free_pg_mapping(pg);
		}

		if (fn) {
			pg = fn(p, end, incremental);
			if (IS_ERR(pg))
				return PTR_ERR(pg);

			if (pg) {
				pg->pgid = pgid; /* struct */
				insert_pg_mapping(mapping_root, pg);
			}
		}
	}

	return 0;

e_inval:
	return -EINVAL;
}

/*
 * Decode one pg_temp entry: a list of OSD ids.  An empty list in an
 * incremental map means "remove this mapping" (returns NULL).
 */
static struct ceph_pg_mapping *__decode_pg_temp(void **p, void *end,
						bool incremental)
{
	struct ceph_pg_mapping *pg;
	u32 len, i;

	ceph_decode_32_safe(p, end, len, e_inval);
	if (len == 0 && incremental)
		return NULL;	/* new_pg_temp: [] to remove */
	/* guard the allocation-size computation against overflow */
	if ((size_t)len > (SIZE_MAX - sizeof(*pg)) / sizeof(u32))
		return ERR_PTR(-EINVAL);

	ceph_decode_need(p, end, len * sizeof(u32), e_inval);
	pg = alloc_pg_mapping(len * sizeof(u32));
	if (!pg)
		return ERR_PTR(-ENOMEM);

	pg->pg_temp.len = len;
	for (i = 0; i < len; i++)
		pg->pg_temp.osds[i] = ceph_decode_32(p);

	return pg;

e_inval:
	return ERR_PTR(-EINVAL);
}

static int decode_pg_temp(void **p, void *end, struct ceph_osdmap *map)
{
	return decode_pg_mapping(p, end, &map->pg_temp, __decode_pg_temp,
				 false);
}

static int decode_new_pg_temp(void **p, void *end, struct ceph_osdmap *map)
{
	return decode_pg_mapping(p, end, &map->pg_temp, __decode_pg_temp,
				 true);
}

/*
 * Decode one primary_temp entry: a single OSD id.  -1 in an
 * incremental map means "remove this mapping" (returns NULL).
 */
static struct ceph_pg_mapping *__decode_primary_temp(void **p, void *end,
						     bool incremental)
{
	struct ceph_pg_mapping *pg;
	u32 osd;

	ceph_decode_32_safe(p, end, osd, e_inval);
	if (osd == (u32)-1 && incremental)
		return NULL;	/* new_primary_temp: -1 to remove */

	pg = alloc_pg_mapping(0);
	if (!pg)
		return ERR_PTR(-ENOMEM);

	pg->primary_temp.osd = osd;
	return pg;

e_inval:
	return ERR_PTR(-EINVAL);
}

static int decode_primary_temp(void **p, void *end, struct ceph_osdmap *map)
{
	return decode_pg_mapping(p, end, &map->primary_temp,
				 __decode_primary_temp, false);
}

static int decode_new_primary_temp(void **p, void *end,
				   struct ceph_osdmap *map)
{
	return decode_pg_mapping(p, end, &map->primary_temp,
				 __decode_new_primary_temp, true);
}

u32 ceph_get_primary_affinity(struct ceph_osdmap *map, int osd)
{
	/* the affinity array is lazily allocated - absent means default */
	if (!map->osd_primary_affinity)
		return CEPH_OSD_DEFAULT_PRIMARY_AFFINITY;

	return map->osd_primary_affinity[osd];
}

/*
 * Set the primary affinity for one OSD, allocating the per-OSD array
 * on first use (all other slots get the default value).
 */
static int set_primary_affinity(struct ceph_osdmap *map, int osd, u32 aff)
{
	if (!map->osd_primary_affinity) {
		int i;

		map->osd_primary_affinity = kvmalloc(
		    array_size(map->max_osd, sizeof(*map->osd_primary_affinity)),
		    GFP_NOFS);
		if (!map->osd_primary_affinity)
			return -ENOMEM;

		for (i = 0; i < map->max_osd; i++)
			map->osd_primary_affinity[i] =
			    CEPH_OSD_DEFAULT_PRIMARY_AFFINITY;
	}

	map->osd_primary_affinity[osd] = aff;

	return 0;
}

static int decode_primary_affinity(void **p, void *end,
				   struct ceph_osdmap *map)
{
	u32 len, i;

	ceph_decode_32_safe(p, end, len, e_inval);
	if (len == 0) {
		/* empty section - drop the array, reverting to defaults */
		kvfree(map->osd_primary_affinity);
		map->osd_primary_affinity = NULL;
		return 0;
	}
	if (len != map->max_osd)
		goto e_inval;

	ceph_decode_need(p, end, map->max_osd*sizeof(u32), e_inval);

	for (i = 0; i < map->max_osd; i++) {
		int ret;

		ret = set_primary_affinity(map, i, ceph_decode_32(p));
		if (ret)
			return ret;
	}

	return 0;

e_inval:
	return -EINVAL;
}

static int decode_new_primary_affinity(void **p, void *end,
				       struct ceph_osdmap *map)
{
	u32 n;

	ceph_decode_32_safe(p, end, n, e_inval);
	while (n--) {
		u32 osd, aff;
		int ret;

		ceph_decode_32_safe(p, end, osd, e_inval);
		ceph_decode_32_safe(p, end, aff, e_inval);
		if (osd >= map->max_osd)
			goto e_inval;

		ret = set_primary_affinity(map, osd, aff);
		if (ret)
			return ret;

		osdmap_info(map, "osd%d primary-affinity 0x%x\n", osd, aff);
	}

	return 0;

e_inval:
	return -EINVAL;
}

/* pg_upmap entries share __decode_pg_temp's wire format, never removed here */
static struct ceph_pg_mapping *__decode_pg_upmap(void **p, void *end,
						 bool __unused)
{
	return __decode_pg_temp(p, end, false);
}

static int decode_pg_upmap(void **p, void *end, struct ceph_osdmap *map)
{
	return decode_pg_mapping(p, end, &map->pg_upmap, __decode_pg_upmap,
				 false);
}

static int decode_new_pg_upmap(void **p, void *end, struct ceph_osdmap *map)
{
	return decode_pg_mapping(p, end, &map->pg_upmap, __decode_pg_upmap,
				 true);
}

static int decode_old_pg_upmap(void **p, void *end, struct ceph_osdmap *map)
{
	/* NULL fn: just remove the listed pgids */
	return decode_pg_mapping(p, end, &map->pg_upmap, NULL, true);
}

/*
 * Decode one pg_upmap_items entry: a list of (from, to) OSD id pairs.
 */
static struct ceph_pg_mapping *__decode_pg_upmap_items(void **p, void *end,
						       bool __unused)
{
	struct ceph_pg_mapping *pg;
	u32 len, i;

	ceph_decode_32_safe(p, end, len, e_inval);
	/* guard the allocation-size computation against overflow */
	if ((size_t)len > (SIZE_MAX - sizeof(*pg)) / (2 * sizeof(u32)))
		return ERR_PTR(-EINVAL);

	ceph_decode_need(p, end, 2 * len * sizeof(u32), e_inval);
	pg = alloc_pg_mapping(2 * len * sizeof(u32));
	if (!pg)
		return ERR_PTR(-ENOMEM);

	pg->pg_upmap_items.len = len;
	for (i = 0; i < len; i++) {
		pg->pg_upmap_items.from_to[i][0] = ceph_decode_32(p);
		pg->pg_upmap_items.from_to[i][1] = ceph_decode_32(p);
	}

	return pg;

e_inval:
	return ERR_PTR(-EINVAL);
}

static int decode_pg_upmap_items(void **p, void *end, struct ceph_osdmap *map)
{
	return decode_pg_mapping(p, end, &map->pg_upmap_items,
				 __decode_pg_upmap_items, false);
}

static int decode_new_pg_upmap_items(void **p, void *end,
				     struct ceph_osdmap *map)
{
	return decode_pg_mapping(p, end, &map->pg_upmap_items,
				 __decode_pg_upmap_items, true);
}

static int decode_old_pg_upmap_items(void **p, void *end,
				     struct ceph_osdmap *map)
{
	return decode_pg_mapping(p, end, &map->pg_upmap_items, NULL, true);
}

/*
 * decode a full map.
1654 */ 1655 static int osdmap_decode(void **p, void *end, bool msgr2, 1656 struct ceph_osdmap *map) 1657 { 1658 u8 struct_v; 1659 u32 epoch = 0; 1660 void *start = *p; 1661 u32 max; 1662 u32 len, i; 1663 int err; 1664 1665 dout("%s %p to %p len %d\n", __func__, *p, end, (int)(end - *p)); 1666 1667 err = get_osdmap_client_data_v(p, end, "full", &struct_v); 1668 if (err) 1669 goto bad; 1670 1671 /* fsid, epoch, created, modified */ 1672 ceph_decode_need(p, end, sizeof(map->fsid) + sizeof(u32) + 1673 sizeof(map->created) + sizeof(map->modified), e_inval); 1674 ceph_decode_copy(p, &map->fsid, sizeof(map->fsid)); 1675 epoch = map->epoch = ceph_decode_32(p); 1676 ceph_decode_copy(p, &map->created, sizeof(map->created)); 1677 ceph_decode_copy(p, &map->modified, sizeof(map->modified)); 1678 1679 /* pools */ 1680 err = decode_pools(p, end, map); 1681 if (err) 1682 goto bad; 1683 1684 /* pool_name */ 1685 err = decode_pool_names(p, end, map); 1686 if (err) 1687 goto bad; 1688 1689 ceph_decode_32_safe(p, end, map->pool_max, e_inval); 1690 1691 ceph_decode_32_safe(p, end, map->flags, e_inval); 1692 1693 /* max_osd */ 1694 ceph_decode_32_safe(p, end, max, e_inval); 1695 1696 /* (re)alloc osd arrays */ 1697 err = osdmap_set_max_osd(map, max); 1698 if (err) 1699 goto bad; 1700 1701 /* osd_state, osd_weight, osd_addrs->client_addr */ 1702 ceph_decode_need(p, end, 3*sizeof(u32) + 1703 map->max_osd*(struct_v >= 5 ? 
sizeof(u32) : 1704 sizeof(u8)) + 1705 sizeof(*map->osd_weight), e_inval); 1706 if (ceph_decode_32(p) != map->max_osd) 1707 goto e_inval; 1708 1709 if (struct_v >= 5) { 1710 for (i = 0; i < map->max_osd; i++) 1711 map->osd_state[i] = ceph_decode_32(p); 1712 } else { 1713 for (i = 0; i < map->max_osd; i++) 1714 map->osd_state[i] = ceph_decode_8(p); 1715 } 1716 1717 if (ceph_decode_32(p) != map->max_osd) 1718 goto e_inval; 1719 1720 for (i = 0; i < map->max_osd; i++) 1721 map->osd_weight[i] = ceph_decode_32(p); 1722 1723 if (ceph_decode_32(p) != map->max_osd) 1724 goto e_inval; 1725 1726 for (i = 0; i < map->max_osd; i++) { 1727 struct ceph_entity_addr *addr = &map->osd_addr[i]; 1728 1729 if (struct_v >= 8) 1730 err = ceph_decode_entity_addrvec(p, end, msgr2, addr); 1731 else 1732 err = ceph_decode_entity_addr(p, end, addr); 1733 if (err) 1734 goto bad; 1735 1736 dout("%s osd%d addr %s\n", __func__, i, ceph_pr_addr(addr)); 1737 } 1738 1739 /* pg_temp */ 1740 err = decode_pg_temp(p, end, map); 1741 if (err) 1742 goto bad; 1743 1744 /* primary_temp */ 1745 if (struct_v >= 1) { 1746 err = decode_primary_temp(p, end, map); 1747 if (err) 1748 goto bad; 1749 } 1750 1751 /* primary_affinity */ 1752 if (struct_v >= 2) { 1753 err = decode_primary_affinity(p, end, map); 1754 if (err) 1755 goto bad; 1756 } else { 1757 WARN_ON(map->osd_primary_affinity); 1758 } 1759 1760 /* crush */ 1761 ceph_decode_32_safe(p, end, len, e_inval); 1762 err = osdmap_set_crush(map, crush_decode(*p, min(*p + len, end))); 1763 if (err) 1764 goto bad; 1765 1766 *p += len; 1767 if (struct_v >= 3) { 1768 /* erasure_code_profiles */ 1769 ceph_decode_skip_map_of_map(p, end, string, string, string, 1770 e_inval); 1771 } 1772 1773 if (struct_v >= 4) { 1774 err = decode_pg_upmap(p, end, map); 1775 if (err) 1776 goto bad; 1777 1778 err = decode_pg_upmap_items(p, end, map); 1779 if (err) 1780 goto bad; 1781 } else { 1782 WARN_ON(!RB_EMPTY_ROOT(&map->pg_upmap)); 1783 
WARN_ON(!RB_EMPTY_ROOT(&map->pg_upmap_items)); 1784 } 1785 1786 /* ignore the rest */ 1787 *p = end; 1788 1789 dout("full osdmap epoch %d max_osd %d\n", map->epoch, map->max_osd); 1790 return 0; 1791 1792 e_inval: 1793 err = -EINVAL; 1794 bad: 1795 pr_err("corrupt full osdmap (%d) epoch %d off %d (%p of %p-%p)\n", 1796 err, epoch, (int)(*p - start), *p, start, end); 1797 print_hex_dump(KERN_DEBUG, "osdmap: ", 1798 DUMP_PREFIX_OFFSET, 16, 1, 1799 start, end - start, true); 1800 return err; 1801 } 1802 1803 /* 1804 * Allocate and decode a full map. 1805 */ 1806 struct ceph_osdmap *ceph_osdmap_decode(void **p, void *end, bool msgr2) 1807 { 1808 struct ceph_osdmap *map; 1809 int ret; 1810 1811 map = ceph_osdmap_alloc(); 1812 if (!map) 1813 return ERR_PTR(-ENOMEM); 1814 1815 ret = osdmap_decode(p, end, msgr2, map); 1816 if (ret) { 1817 ceph_osdmap_destroy(map); 1818 return ERR_PTR(ret); 1819 } 1820 1821 return map; 1822 } 1823 1824 /* 1825 * Encoding order is (new_up_client, new_state, new_weight). Need to 1826 * apply in the (new_weight, new_state, new_up_client) order, because 1827 * an incremental map may look like e.g. 1828 * 1829 * new_up_client: { osd=6, addr=... 
} # set osd_state and addr
 *     new_state: { osd=6, xorstate=EXISTS }  # clear osd_state
 */
static int decode_new_up_state_weight(void **p, void *end, u8 struct_v,
				      bool msgr2, struct ceph_osdmap *map)
{
	void *new_up_client;
	void *new_state;
	void *new_weight_end;
	u32 len;
	int ret;
	int i;

	/*
	 * First pass: walk (and validate) new_up_client and new_state
	 * without applying them, remembering where each section starts
	 * so they can be re-decoded in the required order below.
	 */
	new_up_client = *p;
	ceph_decode_32_safe(p, end, len, e_inval);
	for (i = 0; i < len; ++i) {
		struct ceph_entity_addr addr;

		ceph_decode_skip_32(p, end, e_inval);
		if (struct_v >= 7)
			ret = ceph_decode_entity_addrvec(p, end, msgr2, &addr);
		else
			ret = ceph_decode_entity_addr(p, end, &addr);
		if (ret)
			return ret;
	}

	new_state = *p;
	ceph_decode_32_safe(p, end, len, e_inval);
	/* per entry: u32 osd + xorstate (u32 since v5, u8 before) */
	len *= sizeof(u32) + (struct_v >= 5 ? sizeof(u32) : sizeof(u8));
	ceph_decode_need(p, end, len, e_inval);
	*p += len;

	/* new_weight */
	ceph_decode_32_safe(p, end, len, e_inval);
	while (len--) {
		s32 osd;
		u32 w;

		ceph_decode_need(p, end, 2*sizeof(u32), e_inval);
		osd = ceph_decode_32(p);
		w = ceph_decode_32(p);
		if (osd >= map->max_osd)
			goto e_inval;

		osdmap_info(map, "osd%d weight 0x%x %s\n", osd, w,
			    w == CEPH_OSD_IN ? "(in)" :
			    (w == CEPH_OSD_OUT ? "(out)" : ""));
		map->osd_weight[osd] = w;

		/*
		 * If we are marking in, set the EXISTS, and clear the
		 * AUTOOUT and NEW bits.
		 */
		if (w) {
			map->osd_state[osd] |= CEPH_OSD_EXISTS;
			map->osd_state[osd] &= ~(CEPH_OSD_AUTOOUT |
						 CEPH_OSD_NEW);
		}
	}
	/* remember the end of the encoded section - restored on exit */
	new_weight_end = *p;

	/* new_state (up/down) - second pass, now applying */
	*p = new_state;
	len = ceph_decode_32(p);
	while (len--) {
		s32 osd;
		u32 xorstate;

		osd = ceph_decode_32(p);
		if (osd >= map->max_osd)
			goto e_inval;

		if (struct_v >= 5)
			xorstate = ceph_decode_32(p);
		else
			xorstate = ceph_decode_8(p);
		if (xorstate == 0)
			xorstate = CEPH_OSD_UP;
		if ((map->osd_state[osd] & CEPH_OSD_UP) &&
		    (xorstate & CEPH_OSD_UP))
			osdmap_info(map, "osd%d down\n", osd);
		if ((map->osd_state[osd] & CEPH_OSD_EXISTS) &&
		    (xorstate & CEPH_OSD_EXISTS)) {
			/* OSD is gone - reset its affinity, addr and state */
			osdmap_info(map, "osd%d does not exist\n", osd);
			ret = set_primary_affinity(map, osd,
						   CEPH_OSD_DEFAULT_PRIMARY_AFFINITY);
			if (ret)
				return ret;
			memset(map->osd_addr + osd, 0, sizeof(*map->osd_addr));
			map->osd_state[osd] = 0;
		} else {
			map->osd_state[osd] ^= xorstate;
		}
	}

	/* new_up_client - applied last so it wins over new_state */
	*p = new_up_client;
	len = ceph_decode_32(p);
	while (len--) {
		s32 osd;
		struct ceph_entity_addr addr;

		osd = ceph_decode_32(p);
		if (osd >= map->max_osd)
			goto e_inval;

		if (struct_v >= 7)
			ret = ceph_decode_entity_addrvec(p, end, msgr2, &addr);
		else
			ret = ceph_decode_entity_addr(p, end, &addr);
		if (ret)
			return ret;

		dout("%s osd%d addr %s\n", __func__, osd, ceph_pr_addr(&addr));

		osdmap_info(map, "osd%d up\n", osd);
		map->osd_state[osd] |= CEPH_OSD_EXISTS | CEPH_OSD_UP;
		map->osd_addr[osd] = addr;
	}

	*p = new_weight_end;
	return 0;

e_inval:
	return -EINVAL;
}

/*
 * decode and apply an incremental map update.
*/
struct ceph_osdmap *osdmap_apply_incremental(void **p, void *end, bool msgr2,
					     struct ceph_osdmap *map)
{
	struct ceph_fsid fsid;
	u32 epoch = 0;
	struct ceph_timespec modified;
	s32 len;
	u64 pool;
	__s64 new_pool_max;
	__s32 new_flags, max;
	void *start = *p;
	int err;
	u8 struct_v;

	dout("%s %p to %p len %d\n", __func__, *p, end, (int)(end - *p));

	err = get_osdmap_client_data_v(p, end, "inc", &struct_v);
	if (err)
		goto bad;

	/* fsid, epoch, modified, new_pool_max, new_flags */
	ceph_decode_need(p, end, sizeof(fsid) + sizeof(u32) + sizeof(modified) +
			 sizeof(u64) + sizeof(u32), e_inval);
	ceph_decode_copy(p, &fsid, sizeof(fsid));
	epoch = ceph_decode_32(p);
	ceph_decode_copy(p, &modified, sizeof(modified));
	new_pool_max = ceph_decode_64(p);
	new_flags = ceph_decode_32(p);

	/* incrementals must apply to exactly the next epoch */
	if (epoch != map->epoch + 1)
		goto e_inval;

	/* full map?  if so, decode it instead of applying deltas */
	ceph_decode_32_safe(p, end, len, e_inval);
	if (len > 0) {
		dout("apply_incremental full map len %d, %p to %p\n",
		     len, *p, end);
		return ceph_osdmap_decode(p, min(*p+len, end), msgr2);
	}

	/* new crush? */
	ceph_decode_32_safe(p, end, len, e_inval);
	if (len > 0) {
		err = osdmap_set_crush(map,
				       crush_decode(*p, min(*p + len, end)));
		if (err)
			goto bad;
		*p += len;
	}

	/* new flags?  negative values mean "no change" */
	if (new_flags >= 0)
		map->flags = new_flags;
	if (new_pool_max >= 0)
		map->pool_max = new_pool_max;

	/* new max? */
	ceph_decode_32_safe(p, end, max, e_inval);
	if (max >= 0) {
		err = osdmap_set_max_osd(map, max);
		if (err)
			goto bad;
	}

	map->epoch++;
	map->modified = modified;

	/* new_pools */
	err = decode_new_pools(p, end, map);
	if (err)
		goto bad;

	/* new_pool_names */
	err = decode_pool_names(p, end, map);
	if (err)
		goto bad;

	/* old_pool - pools deleted in this epoch */
	ceph_decode_32_safe(p, end, len, e_inval);
	while (len--) {
		struct ceph_pg_pool_info *pi;

		ceph_decode_64_safe(p, end, pool, e_inval);
		pi = lookup_pg_pool(&map->pg_pools, pool);
		if (pi)
			__remove_pg_pool(&map->pg_pools, pi);
	}

	/* new_up_client, new_state, new_weight */
	err = decode_new_up_state_weight(p, end, struct_v, msgr2, map);
	if (err)
		goto bad;

	/* new_pg_temp */
	err = decode_new_pg_temp(p, end, map);
	if (err)
		goto bad;

	/* new_primary_temp */
	if (struct_v >= 1) {
		err = decode_new_primary_temp(p, end, map);
		if (err)
			goto bad;
	}

	/* new_primary_affinity */
	if (struct_v >= 2) {
		err = decode_new_primary_affinity(p, end, map);
		if (err)
			goto bad;
	}

	if (struct_v >= 3) {
		/* new_erasure_code_profiles */
		ceph_decode_skip_map_of_map(p, end, string, string, string,
					    e_inval);
		/* old_erasure_code_profiles */
		ceph_decode_skip_set(p, end, string, e_inval);
	}

	if (struct_v >= 4) {
		err = decode_new_pg_upmap(p, end, map);
		if (err)
			goto bad;

		err = decode_old_pg_upmap(p, end, map);
		if (err)
			goto bad;

		err = decode_new_pg_upmap_items(p, end, map);
		if (err)
			goto bad;

		err = decode_old_pg_upmap_items(p, end, map);
		if (err)
			goto bad;
	}

	/* ignore the rest */
	*p = end;

	dout("inc osdmap epoch %d max_osd %d\n", map->epoch, map->max_osd);
	return map;

e_inval:
	err = -EINVAL;
bad:
	pr_err("corrupt inc osdmap (%d) epoch %d off %d (%p of %p-%p)\n",
	       err, epoch, (int)(*p - start), *p, start, end);
	print_hex_dump(KERN_DEBUG, "osdmap: ",
		       DUMP_PREFIX_OFFSET, 16, 1,
		       start, end - start, true);
	return ERR_PTR(err);
}

/*
 * Copy an object locator; any old contents of @dest are released first.
 */
void ceph_oloc_copy(struct ceph_object_locator *dest,
		    const struct ceph_object_locator *src)
{
	ceph_oloc_destroy(dest);

	dest->pool = src->pool;
	if (src->pool_ns)
		dest->pool_ns = ceph_get_string(src->pool_ns);
	else
		dest->pool_ns = NULL;
}
EXPORT_SYMBOL(ceph_oloc_copy);

void ceph_oloc_destroy(struct ceph_object_locator *oloc)
{
	/* ceph_put_string() handles a NULL pool_ns */
	ceph_put_string(oloc->pool_ns);
}
EXPORT_SYMBOL(ceph_oloc_destroy);

/*
 * Copy an object id; any old contents of @dest are released first.
 */
void ceph_oid_copy(struct ceph_object_id *dest,
		   const struct ceph_object_id *src)
{
	ceph_oid_destroy(dest);

	if (src->name != src->inline_name) {
		/* very rare, see ceph_object_id definition */
		dest->name = kmalloc(src->name_len + 1,
				     GFP_NOIO | __GFP_NOFAIL);
	} else {
		dest->name = dest->inline_name;
	}
	memcpy(dest->name, src->name, src->name_len + 1);
	dest->name_len = src->name_len;
}
EXPORT_SYMBOL(ceph_oid_copy);

/*
 * Format into the inline buffer.  Returns 0 on success, or the
 * required length if the formatted name does not fit.
 */
static __printf(2, 0)
int oid_printf_vargs(struct ceph_object_id *oid, const char *fmt, va_list ap)
{
	int len;

	WARN_ON(!ceph_oid_empty(oid));

	len = vsnprintf(oid->inline_name, sizeof(oid->inline_name), fmt, ap);
	if (len >= sizeof(oid->inline_name))
		return len;

	oid->name_len = len;
	return 0;
}

/*
 * If oid doesn't fit into inline buffer, BUG.
 */
void ceph_oid_printf(struct ceph_object_id *oid, const char *fmt, ...)
{
	va_list ap;

	va_start(ap, fmt);
	BUG_ON(oid_printf_vargs(oid, fmt, ap));
	va_end(ap);
}
EXPORT_SYMBOL(ceph_oid_printf);

/*
 * Format an oid, spilling to an allocated buffer (with @gfp) if the
 * result does not fit into the inline buffer.
 */
static __printf(3, 0)
int oid_aprintf_vargs(struct ceph_object_id *oid, gfp_t gfp,
		      const char *fmt, va_list ap)
{
	va_list aq;
	int len;

	/* first attempt may consume the args - work on a copy */
	va_copy(aq, ap);
	len = oid_printf_vargs(oid, fmt, aq);
	va_end(aq);

	if (len) {
		char *external_name;

		external_name = kmalloc(len + 1, gfp);
		if (!external_name)
			return -ENOMEM;

		oid->name = external_name;
		WARN_ON(vsnprintf(oid->name, len + 1, fmt, ap) != len);
		oid->name_len = len;
	}

	return 0;
}

/*
 * If oid doesn't fit into inline buffer, allocate.
 */
int ceph_oid_aprintf(struct ceph_object_id *oid, gfp_t gfp,
		     const char *fmt, ...)
{
	va_list ap;
	int ret;

	va_start(ap, fmt);
	ret = oid_aprintf_vargs(oid, gfp, fmt, ap);
	va_end(ap);

	return ret;
}
EXPORT_SYMBOL(ceph_oid_aprintf);

void ceph_oid_destroy(struct ceph_object_id *oid)
{
	/* only free the name if it spilled out of the inline buffer */
	if (oid->name != oid->inline_name)
		kfree(oid->name);
}
EXPORT_SYMBOL(ceph_oid_destroy);

/*
 * osds only
 */
static bool __osds_equal(const struct ceph_osds *lhs,
			 const struct ceph_osds *rhs)
{
	if (lhs->size == rhs->size &&
	    !memcmp(lhs->osds, rhs->osds, rhs->size * sizeof(rhs->osds[0])))
		return true;

	return false;
}

/*
 * osds + primary
 */
static bool osds_equal(const struct ceph_osds *lhs,
		       const struct ceph_osds *rhs)
{
	if (__osds_equal(lhs, rhs) &&
	    lhs->primary == rhs->primary)
		return true;

	return false;
}

static bool osds_valid(const struct ceph_osds *set)
{
	/* non-empty set */
	if (set->size > 0 && set->primary >= 0)
		return true;

	/* empty can_shift_osds set */
	if (!set->size && set->primary == -1)
		return true;

	/* empty !can_shift_osds set - all NONE */
	if (set->size > 0 && set->primary == -1) {
		int i;

		for (i = 0; i < set->size; i++) {
			if (set->osds[i] != CRUSH_ITEM_NONE)
				break;
		}
		if (i == set->size)
			return true;
	}

	return false;
}

void ceph_osds_copy(struct ceph_osds *dest, const struct ceph_osds *src)
{
	memcpy(dest->osds, src->osds, src->size * sizeof(src->osds[0]));
	dest->size = src->size;
	dest->primary = src->primary;
}

/*
 * Does pg @pgid split into new child PGs when pg_num grows from
 * @old_pg_num to @new_pg_num?
 */
bool ceph_pg_is_split(const struct ceph_pg *pgid, u32 old_pg_num,
		      u32 new_pg_num)
{
	int old_bits = calc_bits_of(old_pg_num);
	int old_mask = (1 << old_bits) - 1;
	int n;

	WARN_ON(pgid->seed >= old_pg_num);
	if (new_pg_num <= old_pg_num)
		return false;

	/* enumerate candidate children by setting bits above old_bits-1 */
	for (n = 1; ; n++) {
		int next_bit = n << (old_bits - 1);
		u32 s = next_bit | pgid->seed;

		if (s < old_pg_num || s == pgid->seed)
			continue;
		if (s >= new_pg_num)
			break;

		/* does this child fold back onto our seed? */
		s = ceph_stable_mod(s, old_pg_num, old_mask);
		if (s == pgid->seed)
			return true;
	}

	return false;
}

/*
 * Any of these changes forces a new interval for the PG.
 */
bool ceph_is_new_interval(const struct ceph_osds *old_acting,
			  const struct ceph_osds *new_acting,
			  const struct ceph_osds *old_up,
			  const struct ceph_osds *new_up,
			  int old_size,
			  int new_size,
			  int old_min_size,
			  int new_min_size,
			  u32 old_pg_num,
			  u32 new_pg_num,
			  bool old_sort_bitwise,
			  bool new_sort_bitwise,
			  bool old_recovery_deletes,
			  bool new_recovery_deletes,
			  const struct ceph_pg *pgid)
{
	return !osds_equal(old_acting, new_acting) ||
	       !osds_equal(old_up, new_up) ||
	       old_size != new_size ||
	       old_min_size != new_min_size ||
	       ceph_pg_is_split(pgid, old_pg_num, new_pg_num) ||
	       old_sort_bitwise != new_sort_bitwise ||
	       old_recovery_deletes != new_recovery_deletes;
}

/* rank of @osd within @acting, or -1 if not a member */
static int calc_pg_rank(int osd, const struct ceph_osds *acting)
{
	int i;

	for (i = 0; i < acting->size; i++) {
		if (acting->osds[i] == osd)
			return i;
	}

	return -1;
}

static bool primary_changed(const struct ceph_osds *old_acting,
			    const struct ceph_osds *new_acting)
{
	if (!old_acting->size && !new_acting->size)
		return false;	/* both still empty */

	if (!old_acting->size ^ !new_acting->size)
		return true;	/* was empty, now not, or vice versa */

	if (old_acting->primary != new_acting->primary)
		return true;	/* primary changed */

	if (calc_pg_rank(old_acting->primary, old_acting) !=
	    calc_pg_rank(new_acting->primary, new_acting))
		return true;

	return false;		/* same primary (tho replicas may have changed) */
}

bool ceph_osds_changed(const struct ceph_osds *old_acting,
		       const struct ceph_osds *new_acting,
		       bool any_change)
{
	if (primary_changed(old_acting, new_acting))
		return true;

	if (any_change && !__osds_equal(old_acting, new_acting))
		return true;

	return false;
}

/*
 * Map an object into a PG.
 *
 * Should only be called with target_oid and target_oloc (as opposed to
 * base_oid and base_oloc), since tiering isn't taken into account.
*/
void __ceph_object_locator_to_pg(struct ceph_pg_pool_info *pi,
				 const struct ceph_object_id *oid,
				 const struct ceph_object_locator *oloc,
				 struct ceph_pg *raw_pgid)
{
	WARN_ON(pi->id != oloc->pool);

	if (!oloc->pool_ns) {
		raw_pgid->pool = oloc->pool;
		raw_pgid->seed = ceph_str_hash(pi->object_hash, oid->name,
					       oid->name_len);
		dout("%s %s -> raw_pgid %llu.%x\n", __func__, oid->name,
		     raw_pgid->pool, raw_pgid->seed);
	} else {
		char stack_buf[256];
		char *buf = stack_buf;
		int nsl = oloc->pool_ns->len;
		size_t total = nsl + 1 + oid->name_len;

		/* hash "<ns>\037<name>" - spill to the heap if too long */
		if (total > sizeof(stack_buf))
			buf = kmalloc(total, GFP_NOIO | __GFP_NOFAIL);
		memcpy(buf, oloc->pool_ns->str, nsl);
		buf[nsl] = '\037';
		memcpy(buf + nsl + 1, oid->name, oid->name_len);
		raw_pgid->pool = oloc->pool;
		raw_pgid->seed = ceph_str_hash(pi->object_hash, buf, total);
		if (buf != stack_buf)
			kfree(buf);
		dout("%s %s ns %.*s -> raw_pgid %llu.%x\n", __func__,
		     oid->name, nsl, oloc->pool_ns->str,
		     raw_pgid->pool, raw_pgid->seed);
	}
}

int ceph_object_locator_to_pg(struct ceph_osdmap *osdmap,
			      const struct ceph_object_id *oid,
			      const struct ceph_object_locator *oloc,
			      struct ceph_pg *raw_pgid)
{
	struct ceph_pg_pool_info *pi;

	pi = ceph_pg_pool_by_id(osdmap, oloc->pool);
	if (!pi)
		return -ENOENT;

	__ceph_object_locator_to_pg(pi, oid, oloc, raw_pgid);
	return 0;
}
EXPORT_SYMBOL(ceph_object_locator_to_pg);

/*
 * Map a raw PG (full precision ps) into an actual PG.
 */
static void raw_pg_to_pg(struct ceph_pg_pool_info *pi,
			 const struct ceph_pg *raw_pgid,
			 struct ceph_pg *pgid)
{
	pgid->pool = raw_pgid->pool;
	pgid->seed = ceph_stable_mod(raw_pgid->seed, pi->pg_num,
				     pi->pg_num_mask);
}

/*
 * Map a raw PG (full precision ps) into a placement ps (placement
 * seed).  Include pool id in that value so that different pools don't
 * use the same seeds.
 */
static u32 raw_pg_to_pps(struct ceph_pg_pool_info *pi,
			 const struct ceph_pg *raw_pgid)
{
	if (pi->flags & CEPH_POOL_FLAG_HASHPSPOOL) {
		/* hash pool id and seed so that pool PGs do not overlap */
		return crush_hash32_2(CRUSH_HASH_RJENKINS1,
				      ceph_stable_mod(raw_pgid->seed,
						      pi->pgp_num,
						      pi->pgp_num_mask),
				      raw_pgid->pool);
	} else {
		/*
		 * legacy behavior: add ps and pool together.  this is
		 * not a great approach because the PGs from each pool
		 * will overlap on top of each other: 0.5 == 1.4 ==
		 * 2.3 == ...
		 */
		return ceph_stable_mod(raw_pgid->seed, pi->pgp_num,
				       pi->pgp_num_mask) +
		       (unsigned)raw_pgid->pool;
	}
}

/*
 * Magic value used for a "default" fallback choose_args, used if the
 * crush_choose_arg_map passed to do_crush() does not exist.  If this
 * also doesn't exist, fall back to canonical weights.
 */
#define CEPH_DEFAULT_CHOOSE_ARGS	-1

/*
 * Run a CRUSH rule, writing up to @result_max OSD ids into @result.
 * Returns the number of OSDs placed, or a negative error.  A per-map
 * workspace is borrowed for the computation and returned afterwards.
 */
static int do_crush(struct ceph_osdmap *map, int ruleno, int x,
		    int *result, int result_max,
		    const __u32 *weight, int weight_max,
		    s64 choose_args_index)
{
	struct crush_choose_arg_map *arg_map;
	struct crush_work *work;
	int r;

	BUG_ON(result_max > CEPH_PG_MAX_SIZE);

	arg_map = lookup_choose_arg_map(&map->crush->choose_args,
					choose_args_index);
	if (!arg_map)
		arg_map = lookup_choose_arg_map(&map->crush->choose_args,
						CEPH_DEFAULT_CHOOSE_ARGS);

	work = get_workspace(&map->crush_wsm, map->crush);
	r = crush_do_rule(map->crush, ruleno, x, result, result_max,
			  weight, weight_max, work,
			  arg_map ? arg_map->args : NULL);
	put_workspace(&map->crush_wsm, work);
	return r;
}

/*
 * Drop OSDs that no longer exist from @set.  Replicated pools
 * (can_shift_osds) compact the set; erasure-coded pools must keep
 * positions stable, so dne slots become CRUSH_ITEM_NONE instead.
 */
static void remove_nonexistent_osds(struct ceph_osdmap *osdmap,
				    struct ceph_pg_pool_info *pi,
				    struct ceph_osds *set)
{
	int i;

	if (ceph_can_shift_osds(pi)) {
		int removed = 0;

		/* shift left */
		for (i = 0; i < set->size; i++) {
			if (!ceph_osd_exists(osdmap, set->osds[i])) {
				removed++;
				continue;
			}
			if (removed)
				set->osds[i - removed] = set->osds[i];
		}
		set->size -= removed;
	} else {
		/* set dne devices to NONE */
		for (i = 0; i < set->size; i++) {
			if (!ceph_osd_exists(osdmap, set->osds[i]))
				set->osds[i] = CRUSH_ITEM_NONE;
		}
	}
}

/*
 * Calculate raw set (CRUSH output) for given PG and filter out
 * nonexistent OSDs.  ->primary is undefined for a raw set.
 *
 * Placement seed (CRUSH input) is returned through @ppps.
 */
static void pg_to_raw_osds(struct ceph_osdmap *osdmap,
			   struct ceph_pg_pool_info *pi,
			   const struct ceph_pg *raw_pgid,
			   struct ceph_osds *raw,
			   u32 *ppps)
{
	u32 pps = raw_pg_to_pps(pi, raw_pgid);
	int ruleno;
	int len;

	/* on any error below, @raw is left empty */
	ceph_osds_init(raw);
	if (ppps)
		*ppps = pps;

	ruleno = crush_find_rule(osdmap->crush, pi->crush_ruleset, pi->type,
				 pi->size);
	if (ruleno < 0) {
		pr_err("no crush rule: pool %lld ruleset %d type %d size %d\n",
		       pi->id, pi->crush_ruleset, pi->type, pi->size);
		return;
	}

	if (pi->size > ARRAY_SIZE(raw->osds)) {
		pr_err_ratelimited("pool %lld ruleset %d type %d too wide: size %d > %zu\n",
		       pi->id, pi->crush_ruleset, pi->type, pi->size,
		       ARRAY_SIZE(raw->osds));
		return;
	}

	len = do_crush(osdmap, ruleno, pps, raw->osds, pi->size,
		       osdmap->osd_weight, osdmap->max_osd, pi->id);
	if (len < 0) {
		pr_err("error %d from crush rule %d: pool %lld ruleset %d type %d size %d\n",
		       len, ruleno, pi->id, pi->crush_ruleset, pi->type,
		       pi->size);
		return;
	}

	raw->size = len;
	remove_nonexistent_osds(osdmap, pi, raw);
}

/* apply pg_upmap[_items] mappings */
static void apply_upmap(struct ceph_osdmap *osdmap,
			const struct ceph_pg *pgid,
			struct ceph_osds *raw)
{
	struct ceph_pg_mapping *pg;
	int i, j;

	pg = lookup_pg_mapping(&osdmap->pg_upmap, pgid);
	if (pg) {
		/* make sure targets aren't marked out */
		for (i = 0; i < pg->pg_upmap.len; i++) {
			int osd = pg->pg_upmap.osds[i];

			if (osd != CRUSH_ITEM_NONE &&
			    osd < osdmap->max_osd &&
			    osdmap->osd_weight[osd] == 0) {
				/* reject/ignore explicit mapping */
				return;
			}
		}
		/* explicit full mapping replaces the raw set wholesale */
		for (i = 0; i < pg->pg_upmap.len; i++)
			raw->osds[i] = pg->pg_upmap.osds[i];
		raw->size = pg->pg_upmap.len;
		/* check and apply pg_upmap_items, if any */
	}
2615 pg = lookup_pg_mapping(&osdmap->pg_upmap_items, pgid); 2616 if (pg) { 2617 /* 2618 * Note: this approach does not allow a bidirectional swap, 2619 * e.g., [[1,2],[2,1]] applied to [0,1,2] -> [0,2,1]. 2620 */ 2621 for (i = 0; i < pg->pg_upmap_items.len; i++) { 2622 int from = pg->pg_upmap_items.from_to[i][0]; 2623 int to = pg->pg_upmap_items.from_to[i][1]; 2624 int pos = -1; 2625 bool exists = false; 2626 2627 /* make sure replacement doesn't already appear */ 2628 for (j = 0; j < raw->size; j++) { 2629 int osd = raw->osds[j]; 2630 2631 if (osd == to) { 2632 exists = true; 2633 break; 2634 } 2635 /* ignore mapping if target is marked out */ 2636 if (osd == from && pos < 0 && 2637 !(to != CRUSH_ITEM_NONE && 2638 to < osdmap->max_osd && 2639 osdmap->osd_weight[to] == 0)) { 2640 pos = j; 2641 } 2642 } 2643 if (!exists && pos >= 0) 2644 raw->osds[pos] = to; 2645 } 2646 } 2647 } 2648 2649 /* 2650 * Given raw set, calculate up set and up primary. By definition of an 2651 * up set, the result won't contain nonexistent or down OSDs. 2652 * 2653 * This is done in-place - on return @set is the up set. If it's 2654 * empty, ->primary will remain undefined. 
 */
static void raw_to_up_osds(struct ceph_osdmap *osdmap,
			   struct ceph_pg_pool_info *pi,
			   struct ceph_osds *set)
{
	int i;

	/* ->primary is undefined for a raw set */
	BUG_ON(set->primary != -1);

	if (ceph_can_shift_osds(pi)) {
		int removed = 0;

		/* shift left */
		for (i = 0; i < set->size; i++) {
			if (ceph_osd_is_down(osdmap, set->osds[i])) {
				removed++;
				continue;
			}
			if (removed)
				set->osds[i - removed] = set->osds[i];
		}
		set->size -= removed;
		if (set->size > 0)
			set->primary = set->osds[0];
	} else {
		/*
		 * set down/dne devices to NONE; walking backwards
		 * leaves ->primary set to the first up OSD
		 */
		for (i = set->size - 1; i >= 0; i--) {
			if (ceph_osd_is_down(osdmap, set->osds[i]))
				set->osds[i] = CRUSH_ITEM_NONE;
			else
				set->primary = set->osds[i];
		}
	}
}

/*
 * Apply primary_affinity: possibly pick a different primary from the
 * up set.  An OSD whose affinity is below the maximum rejects a
 * proportional fraction of the PGs that would have chosen it as
 * primary.
 */
static void apply_primary_affinity(struct ceph_osdmap *osdmap,
				   struct ceph_pg_pool_info *pi,
				   u32 pps,
				   struct ceph_osds *up)
{
	int i;
	int pos = -1;

	/*
	 * Do we have any non-default primary_affinity values for these
	 * osds?
	 */
	if (!osdmap->osd_primary_affinity)
		return;

	for (i = 0; i < up->size; i++) {
		int osd = up->osds[i];

		if (osd != CRUSH_ITEM_NONE &&
		    osdmap->osd_primary_affinity[osd] !=
					CEPH_OSD_DEFAULT_PRIMARY_AFFINITY) {
			break;
		}
	}
	if (i == up->size)
		return;

	/*
	 * Pick the primary.  Feed both the seed (for the pg) and the
	 * osd into the hash/rng so that a proportional fraction of an
	 * osd's pgs get rejected as primary.
	 */
	for (i = 0; i < up->size; i++) {
		int osd = up->osds[i];
		u32 aff;

		if (osd == CRUSH_ITEM_NONE)
			continue;

		aff = osdmap->osd_primary_affinity[osd];
		if (aff < CEPH_OSD_MAX_PRIMARY_AFFINITY &&
		    (crush_hash32_2(CRUSH_HASH_RJENKINS1,
				    pps, osd) >> 16) >= aff) {
			/*
			 * We chose not to use this primary.  Note it
			 * anyway as a fallback in case we don't pick
			 * anyone else, but keep looking.
			 */
			if (pos < 0)
				pos = i;
		} else {
			pos = i;
			break;
		}
	}
	if (pos < 0)
		return;

	up->primary = up->osds[pos];

	if (ceph_can_shift_osds(pi) && pos > 0) {
		/* move the new primary to the front */
		for (i = pos; i > 0; i--)
			up->osds[i] = up->osds[i - 1];
		up->osds[0] = up->primary;
	}
}

/*
 * Get pg_temp and primary_temp mappings for given PG.
 *
 * Note that a PG may have none, only pg_temp, only primary_temp or
 * both pg_temp and primary_temp mappings.  This means @temp isn't
 * always a valid OSD set on return: in the "only primary_temp" case,
 * @temp will have its ->primary >= 0 but ->size == 0.
 */
static void get_temp_osds(struct ceph_osdmap *osdmap,
			  struct ceph_pg_pool_info *pi,
			  const struct ceph_pg *pgid,
			  struct ceph_osds *temp)
{
	struct ceph_pg_mapping *pg;
	int i;

	ceph_osds_init(temp);

	/* pg_temp? */
	pg = lookup_pg_mapping(&osdmap->pg_temp, pgid);
	if (pg) {
		for (i = 0; i < pg->pg_temp.len; i++) {
			if (ceph_osd_is_down(osdmap, pg->pg_temp.osds[i])) {
				if (ceph_can_shift_osds(pi))
					continue;

				temp->osds[temp->size++] = CRUSH_ITEM_NONE;
			} else {
				temp->osds[temp->size++] = pg->pg_temp.osds[i];
			}
		}

		/* apply pg_temp's primary - the first up OSD in the set */
		for (i = 0; i < temp->size; i++) {
			if (temp->osds[i] != CRUSH_ITEM_NONE) {
				temp->primary = temp->osds[i];
				break;
			}
		}
	}

	/* primary_temp? */
	pg = lookup_pg_mapping(&osdmap->primary_temp, pgid);
	if (pg)
		temp->primary = pg->primary_temp.osd;
}

/*
 * Map a PG to its acting set as well as its up set.
 *
 * Acting set is used for data mapping purposes, while up set can be
 * recorded for detecting interval changes and deciding whether to
 * resend a request.
 */
void ceph_pg_to_up_acting_osds(struct ceph_osdmap *osdmap,
			       struct ceph_pg_pool_info *pi,
			       const struct ceph_pg *raw_pgid,
			       struct ceph_osds *up,
			       struct ceph_osds *acting)
{
	struct ceph_pg pgid;
	u32 pps;

	WARN_ON(pi->id != raw_pgid->pool);
	raw_pg_to_pg(pi, raw_pgid, &pgid);

	pg_to_raw_osds(osdmap, pi, raw_pgid, up, &pps);
	apply_upmap(osdmap, &pgid, up);
	raw_to_up_osds(osdmap, pi, up);
	apply_primary_affinity(osdmap, pi, pps, up);
	get_temp_osds(osdmap, pi, &pgid, acting);
	if (!acting->size) {
		/* no pg_temp mapping - acting set is the up set */
		memcpy(acting->osds, up->osds, up->size * sizeof(up->osds[0]));
		acting->size = up->size;
		if (acting->primary == -1)
			acting->primary = up->primary;
	}
	WARN_ON(!osds_valid(up) || !osds_valid(acting));
}

/*
 * Compute the SPG (shard id) for the acting primary of a PG.  Pools
 * that can shift OSDs have no shards (CEPH_SPG_NOSHARD); otherwise
 * the shard is the primary's position in the acting set.  Returns
 * false if the primary isn't in the acting set.
 */
bool ceph_pg_to_primary_shard(struct ceph_osdmap *osdmap,
			      struct ceph_pg_pool_info *pi,
			      const struct ceph_pg *raw_pgid,
			      struct ceph_spg *spgid)
{
	struct ceph_pg pgid;
	struct ceph_osds up, acting;
	int i;

	WARN_ON(pi->id != raw_pgid->pool);
	raw_pg_to_pg(pi, raw_pgid, &pgid);

	if (ceph_can_shift_osds(pi)) {
		spgid->pgid = pgid; /* struct */
		spgid->shard = CEPH_SPG_NOSHARD;
		return true;
	}

	ceph_pg_to_up_acting_osds(osdmap, pi, &pgid, &up, &acting);
	for (i = 0; i < acting.size; i++) {
		if (acting.osds[i] == acting.primary) {
			spgid->pgid = pgid; /* struct */
			spgid->shard = i;
			return true;
		}
	}

	return false;
}

/*
 * Return acting primary for given PG, or -1 if none.
 */
int ceph_pg_to_acting_primary(struct ceph_osdmap *osdmap,
			      const struct ceph_pg *raw_pgid)
{
	struct ceph_pg_pool_info *pi;
	struct ceph_osds up, acting;

	pi = ceph_pg_pool_by_id(osdmap, raw_pgid->pool);
	if (!pi)
		return -1;

	ceph_pg_to_up_acting_osds(osdmap, pi, raw_pgid, &up, &acting);
	return acting.primary;
}
EXPORT_SYMBOL(ceph_pg_to_acting_primary);

/*
 * Allocate a crush_loc_node with room in cl_data for a type name and
 * a name, plus two NUL terminators.
 */
static struct crush_loc_node *alloc_crush_loc(size_t type_name_len,
					      size_t name_len)
{
	struct crush_loc_node *loc;

	loc = kmalloc(sizeof(*loc) + type_name_len + name_len + 2, GFP_NOIO);
	if (!loc)
		return NULL;

	RB_CLEAR_NODE(&loc->cl_node);
	return loc;
}

static void free_crush_loc(struct crush_loc_node *loc)
{
	WARN_ON(!RB_EMPTY_NODE(&loc->cl_node));

	kfree(loc);
}

/* order by type name first, then by name */
static int crush_loc_compare(const struct crush_loc *loc1,
			     const struct crush_loc *loc2)
{
	return strcmp(loc1->cl_type_name, loc2->cl_type_name) ?:
	       strcmp(loc1->cl_name, loc2->cl_name);
}

DEFINE_RB_FUNCS2(crush_loc, struct crush_loc_node, cl_loc, crush_loc_compare,
		 RB_BYPTR, const struct crush_loc *, cl_node)

/*
 * Parses a set of <bucket type name>':'<bucket name> pairs separated
 * by '|', e.g. "rack:foo1|rack:foo2|datacenter:bar".
2920 * 2921 * Note that @crush_location is modified by strsep(). 2922 */ 2923 int ceph_parse_crush_location(char *crush_location, struct rb_root *locs) 2924 { 2925 struct crush_loc_node *loc; 2926 const char *type_name, *name, *colon; 2927 size_t type_name_len, name_len; 2928 2929 dout("%s '%s'\n", __func__, crush_location); 2930 while ((type_name = strsep(&crush_location, "|"))) { 2931 colon = strchr(type_name, ':'); 2932 if (!colon) 2933 return -EINVAL; 2934 2935 type_name_len = colon - type_name; 2936 if (type_name_len == 0) 2937 return -EINVAL; 2938 2939 name = colon + 1; 2940 name_len = strlen(name); 2941 if (name_len == 0) 2942 return -EINVAL; 2943 2944 loc = alloc_crush_loc(type_name_len, name_len); 2945 if (!loc) 2946 return -ENOMEM; 2947 2948 loc->cl_loc.cl_type_name = loc->cl_data; 2949 memcpy(loc->cl_loc.cl_type_name, type_name, type_name_len); 2950 loc->cl_loc.cl_type_name[type_name_len] = '\0'; 2951 2952 loc->cl_loc.cl_name = loc->cl_data + type_name_len + 1; 2953 memcpy(loc->cl_loc.cl_name, name, name_len); 2954 loc->cl_loc.cl_name[name_len] = '\0'; 2955 2956 if (!__insert_crush_loc(locs, loc)) { 2957 free_crush_loc(loc); 2958 return -EEXIST; 2959 } 2960 2961 dout("%s type_name '%s' name '%s'\n", __func__, 2962 loc->cl_loc.cl_type_name, loc->cl_loc.cl_name); 2963 } 2964 2965 return 0; 2966 } 2967 2968 int ceph_compare_crush_locs(struct rb_root *locs1, struct rb_root *locs2) 2969 { 2970 struct rb_node *n1 = rb_first(locs1); 2971 struct rb_node *n2 = rb_first(locs2); 2972 int ret; 2973 2974 for ( ; n1 && n2; n1 = rb_next(n1), n2 = rb_next(n2)) { 2975 struct crush_loc_node *loc1 = 2976 rb_entry(n1, struct crush_loc_node, cl_node); 2977 struct crush_loc_node *loc2 = 2978 rb_entry(n2, struct crush_loc_node, cl_node); 2979 2980 ret = crush_loc_compare(&loc1->cl_loc, &loc2->cl_loc); 2981 if (ret) 2982 return ret; 2983 } 2984 2985 if (!n1 && n2) 2986 return -1; 2987 if (n1 && !n2) 2988 return 1; 2989 return 0; 2990 } 2991 2992 void 
ceph_clear_crush_locs(struct rb_root *locs) 2993 { 2994 while (!RB_EMPTY_ROOT(locs)) { 2995 struct crush_loc_node *loc = 2996 rb_entry(rb_first(locs), struct crush_loc_node, cl_node); 2997 2998 erase_crush_loc(locs, loc); 2999 free_crush_loc(loc); 3000 } 3001 } 3002 3003 /* 3004 * [a-zA-Z0-9-_.]+ 3005 */ 3006 static bool is_valid_crush_name(const char *name) 3007 { 3008 do { 3009 if (!('a' <= *name && *name <= 'z') && 3010 !('A' <= *name && *name <= 'Z') && 3011 !('0' <= *name && *name <= '9') && 3012 *name != '-' && *name != '_' && *name != '.') 3013 return false; 3014 } while (*++name != '\0'); 3015 3016 return true; 3017 } 3018 3019 /* 3020 * Gets the parent of an item. Returns its id (<0 because the 3021 * parent is always a bucket), type id (>0 for the same reason, 3022 * via @parent_type_id) and location (via @parent_loc). If no 3023 * parent, returns 0. 3024 * 3025 * Does a linear search, as there are no parent pointers of any 3026 * kind. Note that the result is ambiguous for items that occur 3027 * multiple times in the map. 
3028 */ 3029 static int get_immediate_parent(struct crush_map *c, int id, 3030 u16 *parent_type_id, 3031 struct crush_loc *parent_loc) 3032 { 3033 struct crush_bucket *b; 3034 struct crush_name_node *type_cn, *cn; 3035 int i, j; 3036 3037 for (i = 0; i < c->max_buckets; i++) { 3038 b = c->buckets[i]; 3039 if (!b) 3040 continue; 3041 3042 /* ignore per-class shadow hierarchy */ 3043 cn = lookup_crush_name(&c->names, b->id); 3044 if (!cn || !is_valid_crush_name(cn->cn_name)) 3045 continue; 3046 3047 for (j = 0; j < b->size; j++) { 3048 if (b->items[j] != id) 3049 continue; 3050 3051 *parent_type_id = b->type; 3052 type_cn = lookup_crush_name(&c->type_names, b->type); 3053 parent_loc->cl_type_name = type_cn->cn_name; 3054 parent_loc->cl_name = cn->cn_name; 3055 return b->id; 3056 } 3057 } 3058 3059 return 0; /* no parent */ 3060 } 3061 3062 /* 3063 * Calculates the locality/distance from an item to a client 3064 * location expressed in terms of CRUSH hierarchy as a set of 3065 * (bucket type name, bucket name) pairs. Specifically, looks 3066 * for the lowest-valued bucket type for which the location of 3067 * @id matches one of the locations in @locs, so for standard 3068 * bucket types (host = 1, rack = 3, datacenter = 8, zone = 9) 3069 * a matching host is closer than a matching rack and a matching 3070 * data center is closer than a matching zone. 3071 * 3072 * Specifying multiple locations (a "multipath" location) such 3073 * as "rack=foo1 rack=foo2 datacenter=bar" is allowed -- @locs 3074 * is a multimap. The locality will be: 3075 * 3076 * - 3 for OSDs in racks foo1 and foo2 3077 * - 8 for OSDs in data center bar 3078 * - -1 for all other OSDs 3079 * 3080 * The lowest possible bucket type is 1, so the best locality 3081 * for an OSD is 1 (i.e. a matching host). Locality 0 would be 3082 * the OSD itself. 
3083 */ 3084 int ceph_get_crush_locality(struct ceph_osdmap *osdmap, int id, 3085 struct rb_root *locs) 3086 { 3087 struct crush_loc loc; 3088 u16 type_id; 3089 3090 /* 3091 * Instead of repeated get_immediate_parent() calls, 3092 * the location of @id could be obtained with a single 3093 * depth-first traversal. 3094 */ 3095 for (;;) { 3096 id = get_immediate_parent(osdmap->crush, id, &type_id, &loc); 3097 if (id >= 0) 3098 return -1; /* not local */ 3099 3100 if (lookup_crush_loc(locs, &loc)) 3101 return type_id; 3102 } 3103 } 3104