1 // SPDX-License-Identifier: GPL-2.0 2 3 #include <linux/ceph/ceph_debug.h> 4 5 #include <linux/module.h> 6 #include <linux/slab.h> 7 8 #include <linux/ceph/libceph.h> 9 #include <linux/ceph/osdmap.h> 10 #include <linux/ceph/decode.h> 11 #include <linux/crush/hash.h> 12 #include <linux/crush/mapper.h> 13 14 char *ceph_osdmap_state_str(char *str, int len, u32 state) 15 { 16 if (!len) 17 return str; 18 19 if ((state & CEPH_OSD_EXISTS) && (state & CEPH_OSD_UP)) 20 snprintf(str, len, "exists, up"); 21 else if (state & CEPH_OSD_EXISTS) 22 snprintf(str, len, "exists"); 23 else if (state & CEPH_OSD_UP) 24 snprintf(str, len, "up"); 25 else 26 snprintf(str, len, "doesn't exist"); 27 28 return str; 29 } 30 31 /* maps */ 32 33 static int calc_bits_of(unsigned int t) 34 { 35 int b = 0; 36 while (t) { 37 t = t >> 1; 38 b++; 39 } 40 return b; 41 } 42 43 /* 44 * the foo_mask is the smallest value 2^n-1 that is >= foo. 45 */ 46 static void calc_pg_masks(struct ceph_pg_pool_info *pi) 47 { 48 pi->pg_num_mask = (1 << calc_bits_of(pi->pg_num-1)) - 1; 49 pi->pgp_num_mask = (1 << calc_bits_of(pi->pgp_num-1)) - 1; 50 } 51 52 /* 53 * decode crush map 54 */ 55 static int crush_decode_uniform_bucket(void **p, void *end, 56 struct crush_bucket_uniform *b) 57 { 58 dout("crush_decode_uniform_bucket %p to %p\n", *p, end); 59 ceph_decode_need(p, end, (1+b->h.size) * sizeof(u32), bad); 60 b->item_weight = ceph_decode_32(p); 61 return 0; 62 bad: 63 return -EINVAL; 64 } 65 66 static int crush_decode_list_bucket(void **p, void *end, 67 struct crush_bucket_list *b) 68 { 69 int j; 70 dout("crush_decode_list_bucket %p to %p\n", *p, end); 71 b->item_weights = kcalloc(b->h.size, sizeof(u32), GFP_NOFS); 72 if (b->item_weights == NULL) 73 return -ENOMEM; 74 b->sum_weights = kcalloc(b->h.size, sizeof(u32), GFP_NOFS); 75 if (b->sum_weights == NULL) 76 return -ENOMEM; 77 ceph_decode_need(p, end, 2 * b->h.size * sizeof(u32), bad); 78 for (j = 0; j < b->h.size; j++) { 79 b->item_weights[j] = ceph_decode_32(p); 80 b->sum_weights[j] = ceph_decode_32(p); 81 } 82 return 0; 83 bad: 84 return -EINVAL; 85 } 86 87 static int crush_decode_tree_bucket(void **p, void *end, 88 struct crush_bucket_tree *b) 89 { 90 int j; 91 dout("crush_decode_tree_bucket %p to %p\n", *p, end); 92 ceph_decode_8_safe(p, end, b->num_nodes, bad); 93 b->node_weights = kcalloc(b->num_nodes, sizeof(u32), GFP_NOFS); 94 if (b->node_weights == NULL) 95 return -ENOMEM; 96 ceph_decode_need(p, end, b->num_nodes * sizeof(u32), bad); 97 for (j = 0; j < b->num_nodes; j++) 98 b->node_weights[j] = ceph_decode_32(p); 99 return 0; 100 bad: 101 return -EINVAL; 102 } 103 104 static int crush_decode_straw_bucket(void **p, void *end, 105 struct crush_bucket_straw *b) 106 { 107 int j; 108 dout("crush_decode_straw_bucket %p to %p\n", *p, end); 109 b->item_weights = kcalloc(b->h.size, sizeof(u32), GFP_NOFS); 110 if (b->item_weights == NULL) 111 return -ENOMEM; 112 b->straws = kcalloc(b->h.size, sizeof(u32), GFP_NOFS); 113 if (b->straws == NULL) 114 return -ENOMEM; 115 ceph_decode_need(p, end, 2 * b->h.size * sizeof(u32), bad); 116 for (j = 0; j < b->h.size; j++) { 117 b->item_weights[j] = ceph_decode_32(p); 118 b->straws[j] = ceph_decode_32(p); 119 } 120 return 0; 121 bad: 122 return -EINVAL; 123 } 124 125 static int crush_decode_straw2_bucket(void **p, void *end, 126 struct crush_bucket_straw2 *b) 127 { 128 int j; 129 dout("crush_decode_straw2_bucket %p to %p\n", *p, end); 130 b->item_weights = kcalloc(b->h.size, sizeof(u32), GFP_NOFS); 131 if (b->item_weights == NULL) 132 return -ENOMEM; 133 ceph_decode_need(p, end, b->h.size * sizeof(u32), bad); 134 for (j = 0; j < b->h.size; j++) 135 b->item_weights[j] = ceph_decode_32(p); 136 return 0; 137 bad: 138 return -EINVAL; 139 } 140 141 struct crush_name_node { 142 struct rb_node cn_node; 143 int cn_id; 144 char cn_name[]; 145 }; 146 147 static struct crush_name_node *alloc_crush_name(size_t name_len) 148 { 149 struct crush_name_node *cn; 150 151 cn = kmalloc(sizeof(*cn) + name_len + 1, GFP_NOIO); 152 if (!cn) 153 return NULL; 154 155 RB_CLEAR_NODE(&cn->cn_node); 156 return cn; 157 } 158 159 static void free_crush_name(struct crush_name_node *cn) 160 { 161 WARN_ON(!RB_EMPTY_NODE(&cn->cn_node)); 162 163 kfree(cn); 164 } 165 166 DEFINE_RB_FUNCS(crush_name, struct crush_name_node, cn_id, cn_node) 167 168 static int decode_crush_names(void **p, void *end, struct rb_root *root) 169 { 170 u32 n; 171 172 ceph_decode_32_safe(p, end, n, e_inval); 173 while (n--) { 174 struct crush_name_node *cn; 175 int id; 176 u32 name_len; 177 178 ceph_decode_32_safe(p, end, id, e_inval); 179 ceph_decode_32_safe(p, end, name_len, e_inval); 180 ceph_decode_need(p, end, name_len, e_inval); 181 182 cn = alloc_crush_name(name_len); 183 if (!cn) 184 return -ENOMEM; 185 186 cn->cn_id = id; 187 memcpy(cn->cn_name, *p, name_len); 188 cn->cn_name[name_len] = '\0'; 189 *p += name_len; 190 191 if (!__insert_crush_name(root, cn)) { 192 free_crush_name(cn); 193 return -EEXIST; 194 } 195 } 196 197 return 0; 198 199 e_inval: 200 return -EINVAL; 201 } 202 203 void clear_crush_names(struct rb_root *root) 204 { 205 while (!RB_EMPTY_ROOT(root)) { 206 struct crush_name_node *cn = 207 rb_entry(rb_first(root), struct crush_name_node, cn_node); 208 209 erase_crush_name(root, cn); 210 free_crush_name(cn); 211 } 212 } 213 214 static struct crush_choose_arg_map *alloc_choose_arg_map(void) 215 { 216 struct crush_choose_arg_map *arg_map; 217 218 arg_map = kzalloc(sizeof(*arg_map), GFP_NOIO); 219 if (!arg_map) 220 return NULL; 221 222 RB_CLEAR_NODE(&arg_map->node); 223 return arg_map; 224 } 225 226 static void free_choose_arg_map(struct crush_choose_arg_map *arg_map) 227 { 228 if (arg_map) { 229 int i, j; 230 231 WARN_ON(!RB_EMPTY_NODE(&arg_map->node)); 232 233 for (i = 0; i < arg_map->size; i++) { 234 struct crush_choose_arg *arg = &arg_map->args[i]; 235 236 for (j = 0; j < arg->weight_set_size; j++) 237 kfree(arg->weight_set[j].weights); 238 kfree(arg->weight_set); 239 kfree(arg->ids); 240 } 241 kfree(arg_map->args); 242 kfree(arg_map); 243 } 244 } 245 246 DEFINE_RB_FUNCS(choose_arg_map, struct crush_choose_arg_map, choose_args_index, 247 node); 248 249 void clear_choose_args(struct crush_map *c) 250 { 251 while (!RB_EMPTY_ROOT(&c->choose_args)) { 252 struct crush_choose_arg_map *arg_map = 253 rb_entry(rb_first(&c->choose_args), 254 struct crush_choose_arg_map, node); 255 256 erase_choose_arg_map(&c->choose_args, arg_map); 257 free_choose_arg_map(arg_map); 258 } 259 } 260 261 static u32 *decode_array_32_alloc(void **p, void *end, u32 *plen) 262 { 263 u32 *a = NULL; 264 u32 len; 265 int ret; 266 267 ceph_decode_32_safe(p, end, len, e_inval); 268 if (len) { 269 u32 i; 270 271 a = kmalloc_array(len, sizeof(u32), GFP_NOIO); 272 if (!a) { 273 ret = -ENOMEM; 274 goto fail; 275 } 276 277 ceph_decode_need(p, end, len * sizeof(u32), e_inval); 278 for (i = 0; i < len; i++) 279 a[i] = ceph_decode_32(p); 280 } 281 282 *plen = len; 283 return a; 284 285 e_inval: 286 ret = -EINVAL; 287 fail: 288 kfree(a); 289 return ERR_PTR(ret); 290 } 291 292 /* 293 * Assumes @arg is zero-initialized. 294 */ 295 static int decode_choose_arg(void **p, void *end, struct crush_choose_arg *arg) 296 { 297 int ret; 298 299 ceph_decode_32_safe(p, end, arg->weight_set_size, e_inval); 300 if (arg->weight_set_size) { 301 u32 i; 302 303 arg->weight_set = kmalloc_array(arg->weight_set_size, 304 sizeof(*arg->weight_set), 305 GFP_NOIO); 306 if (!arg->weight_set) 307 return -ENOMEM; 308 309 for (i = 0; i < arg->weight_set_size; i++) { 310 struct crush_weight_set *w = &arg->weight_set[i]; 311 312 w->weights = decode_array_32_alloc(p, end, &w->size); 313 if (IS_ERR(w->weights)) { 314 ret = PTR_ERR(w->weights); 315 w->weights = NULL; 316 return ret; 317 } 318 } 319 } 320 321 arg->ids = decode_array_32_alloc(p, end, &arg->ids_size); 322 if (IS_ERR(arg->ids)) { 323 ret = PTR_ERR(arg->ids); 324 arg->ids = NULL; 325 return ret; 326 } 327 328 return 0; 329 330 e_inval: 331 return -EINVAL; 332 } 333 334 static int decode_choose_args(void **p, void *end, struct crush_map *c) 335 { 336 struct crush_choose_arg_map *arg_map = NULL; 337 u32 num_choose_arg_maps, num_buckets; 338 int ret; 339 340 ceph_decode_32_safe(p, end, num_choose_arg_maps, e_inval); 341 while (num_choose_arg_maps--) { 342 arg_map = alloc_choose_arg_map(); 343 if (!arg_map) { 344 ret = -ENOMEM; 345 goto fail; 346 } 347 348 ceph_decode_64_safe(p, end, arg_map->choose_args_index, 349 e_inval); 350 arg_map->size = c->max_buckets; 351 arg_map->args = kcalloc(arg_map->size, sizeof(*arg_map->args), 352 GFP_NOIO); 353 if (!arg_map->args) { 354 ret = -ENOMEM; 355 goto fail; 356 } 357 358 ceph_decode_32_safe(p, end, num_buckets, e_inval); 359 while (num_buckets--) { 360 struct crush_choose_arg *arg; 361 u32 bucket_index; 362 363 ceph_decode_32_safe(p, end, bucket_index, e_inval); 364 if (bucket_index >= arg_map->size) 365 goto e_inval; 366 367 arg = &arg_map->args[bucket_index]; 368 ret = decode_choose_arg(p, end, arg); 369 if (ret) 370 goto fail; 371 372 if (arg->ids_size && 373 arg->ids_size != c->buckets[bucket_index]->size) 374 goto e_inval; 375 } 376 377 insert_choose_arg_map(&c->choose_args, arg_map); 378 } 379 380 return 0; 381 382 e_inval: 383 ret = -EINVAL; 384 fail: 385 free_choose_arg_map(arg_map); 386 return ret; 387 } 388 389 static void crush_finalize(struct crush_map *c) 390 { 391 __s32 b; 392 393 /* Space for the array of pointers to per-bucket workspace */ 394 c->working_size = sizeof(struct crush_work) + 395 c->max_buckets * sizeof(struct crush_work_bucket *); 396 397 for (b = 0; b < c->max_buckets; b++) { 398 if (!c->buckets[b]) 399 continue; 400 401 switch (c->buckets[b]->alg) { 402 default: 403 /* 404 * The base case, permutation variables and 405 * the pointer to the permutation array. 406 */ 407 c->working_size += sizeof(struct crush_work_bucket); 408 break; 409 } 410 /* Every bucket has a permutation array. */ 411 c->working_size += c->buckets[b]->size * sizeof(__u32); 412 } 413 } 414 415 static struct crush_map *crush_decode(void *pbyval, void *end) 416 { 417 struct crush_map *c; 418 int err; 419 int i, j; 420 void **p = &pbyval; 421 void *start = pbyval; 422 u32 magic; 423 424 dout("crush_decode %p to %p len %d\n", *p, end, (int)(end - *p)); 425 426 c = kzalloc(sizeof(*c), GFP_NOFS); 427 if (c == NULL) 428 return ERR_PTR(-ENOMEM); 429 430 c->type_names = RB_ROOT; 431 c->names = RB_ROOT; 432 c->choose_args = RB_ROOT; 433 434 /* set tunables to default values */ 435 c->choose_local_tries = 2; 436 c->choose_local_fallback_tries = 5; 437 c->choose_total_tries = 19; 438 c->chooseleaf_descend_once = 0; 439 440 ceph_decode_need(p, end, 4*sizeof(u32), bad); 441 magic = ceph_decode_32(p); 442 if (magic != CRUSH_MAGIC) { 443 pr_err("crush_decode magic %x != current %x\n", 444 (unsigned int)magic, (unsigned int)CRUSH_MAGIC); 445 goto bad; 446 } 447 c->max_buckets = ceph_decode_32(p); 448 c->max_rules = ceph_decode_32(p); 449 c->max_devices = ceph_decode_32(p); 450 451 c->buckets = kcalloc(c->max_buckets, sizeof(*c->buckets), GFP_NOFS); 452 if (c->buckets == NULL) 453 goto badmem; 454 c->rules = kcalloc(c->max_rules, sizeof(*c->rules), GFP_NOFS); 455 if (c->rules == NULL) 456 goto badmem; 457 458 /* buckets */ 459 for (i = 0; i < c->max_buckets; i++) { 460 int size = 0; 461 u32 alg; 462 struct crush_bucket *b; 463 464 ceph_decode_32_safe(p, end, alg, bad); 465 if (alg == 0) { 466 c->buckets[i] = NULL; 467 continue; 468 } 469 dout("crush_decode bucket %d off %x %p to %p\n", 470 i, (int)(*p-start), *p, end); 471 472 switch (alg) { 473 case CRUSH_BUCKET_UNIFORM: 474 size = sizeof(struct crush_bucket_uniform); 475 break; 476 case CRUSH_BUCKET_LIST: 477 size = sizeof(struct crush_bucket_list); 478 break; 479 case CRUSH_BUCKET_TREE: 480 size = sizeof(struct crush_bucket_tree); 481 break; 482 case CRUSH_BUCKET_STRAW: 483 size = sizeof(struct crush_bucket_straw); 484 break; 485 case CRUSH_BUCKET_STRAW2: 486 size = sizeof(struct crush_bucket_straw2); 487 break; 488 default: 489 goto bad; 490 } 491 BUG_ON(size == 0); 492 b = c->buckets[i] = kzalloc(size, GFP_NOFS); 493 if (b == NULL) 494 goto badmem; 495 496 ceph_decode_need(p, end, 4*sizeof(u32), bad); 497 b->id = ceph_decode_32(p); 498 b->type = ceph_decode_16(p); 499 b->alg = ceph_decode_8(p); 500 b->hash = ceph_decode_8(p); 501 b->weight = ceph_decode_32(p); 502 b->size = ceph_decode_32(p); 503 504 dout("crush_decode bucket size %d off %x %p to %p\n", 505 b->size, (int)(*p-start), *p, end); 506 507 b->items = kcalloc(b->size, sizeof(__s32), GFP_NOFS); 508 if (b->items == NULL) 509 goto badmem; 510 511 ceph_decode_need(p, end, b->size*sizeof(u32), bad); 512 for (j = 0; j < b->size; j++) 513 b->items[j] = ceph_decode_32(p); 514 515 switch (b->alg) { 516 case CRUSH_BUCKET_UNIFORM: 517 err = crush_decode_uniform_bucket(p, end, 518 (struct crush_bucket_uniform *)b); 519 if (err < 0) 520 goto fail; 521 break; 522 case CRUSH_BUCKET_LIST: 523 err = crush_decode_list_bucket(p, end, 524 (struct crush_bucket_list *)b); 525 if (err < 0) 526 goto fail; 527 break; 528 case CRUSH_BUCKET_TREE: 529 err = crush_decode_tree_bucket(p, end, 530 (struct crush_bucket_tree *)b); 531 if (err < 0) 532 goto fail; 533 break; 534 case CRUSH_BUCKET_STRAW: 535 err = crush_decode_straw_bucket(p, end, 536 (struct crush_bucket_straw *)b); 537 if (err < 0) 538 goto fail; 539 break; 540 case CRUSH_BUCKET_STRAW2: 541 err = crush_decode_straw2_bucket(p, end, 542 (struct crush_bucket_straw2 *)b); 543 if (err < 0) 544 goto fail; 545 break; 546 } 547 } 548 549 /* rules */ 550 dout("rule vec is %p\n", c->rules); 551 for (i = 0; i < c->max_rules; i++) { 552 u32 yes; 553 struct crush_rule *r; 554 555 ceph_decode_32_safe(p, end, yes, bad); 556 if (!yes) { 557 dout("crush_decode NO rule %d off %x %p to %p\n", 558 i, (int)(*p-start), *p, end); 559 c->rules[i] = NULL; 560 continue; 561 } 562 563 dout("crush_decode rule %d off %x %p to %p\n", 564 i, (int)(*p-start), *p, end); 565 566 /* len */ 567 ceph_decode_32_safe(p, end, yes, bad); 568 #if BITS_PER_LONG == 32 569 if (yes > (ULONG_MAX - sizeof(*r)) 570 / sizeof(struct crush_rule_step)) 571 goto bad; 572 #endif 573 r = kmalloc(struct_size(r, steps, yes), GFP_NOFS); 574 c->rules[i] = r; 575 if (r == NULL) 576 goto badmem; 577 dout(" rule %d is at %p\n", i, r); 578 r->len = yes; 579 ceph_decode_copy_safe(p, end, &r->mask, 4, bad); /* 4 u8's */ 580 ceph_decode_need(p, end, r->len*3*sizeof(u32), bad); 581 for (j = 0; j < r->len; j++) { 582 r->steps[j].op = ceph_decode_32(p); 583 r->steps[j].arg1 = ceph_decode_32(p); 584 r->steps[j].arg2 = ceph_decode_32(p); 585 } 586 } 587 588 err = decode_crush_names(p, end, &c->type_names); 589 if (err) 590 goto fail; 591 592 err = decode_crush_names(p, end, &c->names); 593 if (err) 594 goto fail; 595 596 ceph_decode_skip_map(p, end, 32, string, bad); /* rule_name_map */ 597 598 /* tunables */ 599 ceph_decode_need(p, end, 3*sizeof(u32), done); 600 c->choose_local_tries = ceph_decode_32(p); 601 c->choose_local_fallback_tries = ceph_decode_32(p); 602 c->choose_total_tries = ceph_decode_32(p); 603 dout("crush decode tunable choose_local_tries = %d\n", 604 c->choose_local_tries); 605 dout("crush decode tunable choose_local_fallback_tries = %d\n", 606 c->choose_local_fallback_tries); 607 dout("crush decode tunable choose_total_tries = %d\n", 608 c->choose_total_tries); 609 610 ceph_decode_need(p, end, sizeof(u32), done); 611 c->chooseleaf_descend_once = ceph_decode_32(p); 612 dout("crush decode tunable chooseleaf_descend_once = %d\n", 613 c->chooseleaf_descend_once); 614 615 ceph_decode_need(p, end, sizeof(u8), done); 616 c->chooseleaf_vary_r = ceph_decode_8(p); 617 dout("crush decode tunable chooseleaf_vary_r = %d\n", 618 c->chooseleaf_vary_r); 619 620 /* skip straw_calc_version, allowed_bucket_algs */ 621 ceph_decode_need(p, end, sizeof(u8) + sizeof(u32), done); 622 *p += sizeof(u8) + sizeof(u32); 623 624 ceph_decode_need(p, end, sizeof(u8), done); 625 c->chooseleaf_stable = ceph_decode_8(p); 626 dout("crush decode tunable chooseleaf_stable = %d\n", 627 c->chooseleaf_stable); 628 629 if (*p != end) { 630 /* class_map */ 631 ceph_decode_skip_map(p, end, 32, 32, bad); 632 /* class_name */ 633 ceph_decode_skip_map(p, end, 32, string, bad); 634 /* class_bucket */ 635 ceph_decode_skip_map_of_map(p, end, 32, 32, 32, bad); 636 } 637 638 if (*p != end) { 639 err = decode_choose_args(p, end, c); 640 if (err) 641 goto fail; 642 } 643 644 done: 645 crush_finalize(c); 646 dout("crush_decode success\n"); 647 return c; 648 649 badmem: 650 err = -ENOMEM; 651 fail: 652 dout("crush_decode fail %d\n", err); 653 crush_destroy(c); 654 return ERR_PTR(err); 655 656 bad: 657 err = -EINVAL; 658 goto fail; 659 } 660 661 int ceph_pg_compare(const struct ceph_pg *lhs, const struct ceph_pg *rhs) 662 { 663 if (lhs->pool < rhs->pool) 664 return -1; 665 if (lhs->pool > rhs->pool) 666 return 1; 667 if (lhs->seed < rhs->seed) 668 return -1; 669 if (lhs->seed > rhs->seed) 670 return 1; 671 672 return 0; 673 } 674 675 int ceph_spg_compare(const struct ceph_spg *lhs, const struct ceph_spg *rhs) 676 { 677 int ret; 678 679 ret = ceph_pg_compare(&lhs->pgid, &rhs->pgid); 680 if (ret) 681 return ret; 682 683 if (lhs->shard < rhs->shard) 684 return -1; 685 if (lhs->shard > rhs->shard) 686 return 1; 687 688 return 0; 689 } 690 691 static struct ceph_pg_mapping *alloc_pg_mapping(size_t payload_len) 692 { 693 struct ceph_pg_mapping *pg; 694 695 pg = kmalloc(sizeof(*pg) + payload_len, GFP_NOIO); 696 if (!pg) 697 return NULL; 698 699 RB_CLEAR_NODE(&pg->node); 700 return pg; 701 } 702 703 static void free_pg_mapping(struct ceph_pg_mapping *pg) 704 { 705 WARN_ON(!RB_EMPTY_NODE(&pg->node)); 706 707 kfree(pg); 708 } 709 710 /* 711 * rbtree of pg_mapping for handling pg_temp (explicit mapping of pgid 712 * to a set of osds) and primary_temp (explicit primary setting) 713 */ 714 DEFINE_RB_FUNCS2(pg_mapping, struct ceph_pg_mapping, pgid, ceph_pg_compare, 715 RB_BYPTR, const struct ceph_pg *, node) 716 717 /* 718 * rbtree of pg pool info 719 */ 720 DEFINE_RB_FUNCS(pg_pool, struct ceph_pg_pool_info, id, node) 721 722 struct ceph_pg_pool_info *ceph_pg_pool_by_id(struct ceph_osdmap *map, u64 id) 723 { 724 return lookup_pg_pool(&map->pg_pools, id); 725 } 726 727 const char *ceph_pg_pool_name_by_id(struct ceph_osdmap *map, u64 id) 728 { 729 struct ceph_pg_pool_info *pi; 730 731 if (id == CEPH_NOPOOL) 732 return NULL; 733 734 if (WARN_ON_ONCE(id > (u64) INT_MAX)) 735 return NULL; 736 737 pi = lookup_pg_pool(&map->pg_pools, id); 738 return pi ? pi->name : NULL; 739 } 740 EXPORT_SYMBOL(ceph_pg_pool_name_by_id); 741 742 int ceph_pg_poolid_by_name(struct ceph_osdmap *map, const char *name) 743 { 744 struct rb_node *rbp; 745 746 for (rbp = rb_first(&map->pg_pools); rbp; rbp = rb_next(rbp)) { 747 struct ceph_pg_pool_info *pi = 748 rb_entry(rbp, struct ceph_pg_pool_info, node); 749 if (pi->name && strcmp(pi->name, name) == 0) 750 return pi->id; 751 } 752 return -ENOENT; 753 } 754 EXPORT_SYMBOL(ceph_pg_poolid_by_name); 755 756 u64 ceph_pg_pool_flags(struct ceph_osdmap *map, u64 id) 757 { 758 struct ceph_pg_pool_info *pi; 759 760 pi = lookup_pg_pool(&map->pg_pools, id); 761 return pi ? pi->flags : 0; 762 } 763 EXPORT_SYMBOL(ceph_pg_pool_flags); 764 765 static void __remove_pg_pool(struct rb_root *root, struct ceph_pg_pool_info *pi) 766 { 767 erase_pg_pool(root, pi); 768 kfree(pi->name); 769 kfree(pi); 770 } 771 772 static int decode_pool(void **p, void *end, struct ceph_pg_pool_info *pi) 773 { 774 u8 ev, cv; 775 unsigned len, num; 776 void *pool_end; 777 778 ceph_decode_need(p, end, 2 + 4, bad); 779 ev = ceph_decode_8(p); /* encoding version */ 780 cv = ceph_decode_8(p); /* compat version */ 781 if (ev < 5) { 782 pr_warn("got v %d < 5 cv %d of ceph_pg_pool\n", ev, cv); 783 return -EINVAL; 784 } 785 if (cv > 9) { 786 pr_warn("got v %d cv %d > 9 of ceph_pg_pool\n", ev, cv); 787 return -EINVAL; 788 } 789 len = ceph_decode_32(p); 790 ceph_decode_need(p, end, len, bad); 791 pool_end = *p + len; 792 793 pi->type = ceph_decode_8(p); 794 pi->size = ceph_decode_8(p); 795 pi->crush_ruleset = ceph_decode_8(p); 796 pi->object_hash = ceph_decode_8(p); 797 798 pi->pg_num = ceph_decode_32(p); 799 pi->pgp_num = ceph_decode_32(p); 800 801 *p += 4 + 4; /* skip lpg* */ 802 *p += 4; /* skip last_change */ 803 *p += 8 + 4; /* skip snap_seq, snap_epoch */ 804 805 /* skip snaps */ 806 num = ceph_decode_32(p); 807 while (num--) { 808 *p += 8; /* snapid key */ 809 *p += 1 + 1; /* versions */ 810 len = ceph_decode_32(p); 811 *p += len; 812 } 813 814 /* skip removed_snaps */ 815 num = ceph_decode_32(p); 816 *p += num * (8 + 8); 817 818 *p += 8; /* skip auid */ 819 pi->flags = ceph_decode_64(p); 820 *p += 4; /* skip crash_replay_interval */ 821 822 if (ev >= 7) 823 pi->min_size = ceph_decode_8(p); 824 else 825 pi->min_size = pi->size - pi->size / 2; 826 827 if (ev >= 8) 828 *p += 8 + 8; /* skip quota_max_* */ 829 830 if (ev >= 9) { 831 /* skip tiers */ 832 num = ceph_decode_32(p); 833 *p += num * 8; 834 835 *p += 8; /* skip tier_of */ 836 *p += 1; /* skip cache_mode */ 837 838 pi->read_tier = ceph_decode_64(p); 839 pi->write_tier = ceph_decode_64(p); 840 } else { 841 pi->read_tier = -1; 842 pi->write_tier = -1; 843 } 844 845 if (ev >= 10) { 846 /* skip properties */ 847 num = ceph_decode_32(p); 848 while (num--) { 849 len = ceph_decode_32(p); 850 *p += len; /* key */ 851 len = ceph_decode_32(p); 852 *p += len; /* val */ 853 } 854 } 855 856 if (ev >= 11) { 857 /* skip hit_set_params */ 858 *p += 1 + 1; /* versions */ 859 len = ceph_decode_32(p); 860 *p += len; 861 862 *p += 4; /* skip hit_set_period */ 863 *p += 4; /* skip hit_set_count */ 864 } 865 866 if (ev >= 12) 867 *p += 4; /* skip stripe_width */ 868 869 if (ev >= 13) { 870 *p += 8; /* skip target_max_bytes */ 871 *p += 8; /* skip target_max_objects */ 872 *p += 4; /* skip cache_target_dirty_ratio_micro */ 873 *p += 4; /* skip cache_target_full_ratio_micro */ 874 *p += 4; /* skip cache_min_flush_age */ 875 *p += 4; /* skip cache_min_evict_age */ 876 } 877 878 if (ev >= 14) { 879 /* skip erasure_code_profile */ 880 len = ceph_decode_32(p); 881 *p += len; 882 } 883 884 /* 885 * last_force_op_resend_preluminous, will be overridden if the 886 * map was encoded with RESEND_ON_SPLIT 887 */ 888 if (ev >= 15) 889 pi->last_force_request_resend = ceph_decode_32(p); 890 else 891 pi->last_force_request_resend = 0; 892 893 if (ev >= 16) 894 *p += 4; /* skip min_read_recency_for_promote */ 895 896 if (ev >= 17) 897 *p += 8; /* skip expected_num_objects */ 898 899 if (ev >= 19) 900 *p += 4; /* skip cache_target_dirty_high_ratio_micro */ 901 902 if (ev >= 20) 903 *p += 4; /* skip min_write_recency_for_promote */ 904 905 if (ev >= 21) 906 *p += 1; /* skip use_gmt_hitset */ 907 908 if (ev >= 22) 909 *p += 1; /* skip fast_read */ 910 911 if (ev >= 23) { 912 *p += 4; /* skip hit_set_grade_decay_rate */ 913 *p += 4; /* skip hit_set_search_last_n */ 914 } 915 916 if (ev >= 24) { 917 /* skip opts */ 918 *p += 1 + 1; /* versions */ 919 len = ceph_decode_32(p); 920 *p += len; 921 } 922 923 if (ev >= 25) 924 pi->last_force_request_resend = ceph_decode_32(p); 925 926 /* ignore the rest */ 927 928 *p = pool_end; 929 calc_pg_masks(pi); 930 return 0; 931 932 bad: 933 return -EINVAL; 934 } 935 936 static int decode_pool_names(void **p, void *end, struct ceph_osdmap *map) 937 { 938 struct ceph_pg_pool_info *pi; 939 u32 num, len; 940 u64 pool; 941 942 ceph_decode_32_safe(p, end, num, bad); 943 dout(" %d pool names\n", num); 944 while (num--) { 945 ceph_decode_64_safe(p, end, pool, bad); 946 ceph_decode_32_safe(p, end, len, bad); 947 dout(" pool %llu len %d\n", pool, len); 948 ceph_decode_need(p, end, len, bad); 949 pi = lookup_pg_pool(&map->pg_pools, pool); 950 if (pi) { 951 char *name = kstrndup(*p, len, GFP_NOFS); 952 953 if (!name) 954 return -ENOMEM; 955 kfree(pi->name); 956 pi->name = name; 957 dout(" name is %s\n", pi->name); 958 } 959 *p += len; 960 } 961 return 0; 962 963 bad: 964 return -EINVAL; 965 } 966 967 /* 968 * CRUSH workspaces 969 * 970 * workspace_manager framework borrowed from fs/btrfs/compression.c. 971 * Two simplifications: there is only one type of workspace and there 972 * is always at least one workspace. 973 */ 974 static struct crush_work *alloc_workspace(const struct crush_map *c) 975 { 976 struct crush_work *work; 977 size_t work_size; 978 979 WARN_ON(!c->working_size); 980 work_size = crush_work_size(c, CEPH_PG_MAX_SIZE); 981 dout("%s work_size %zu bytes\n", __func__, work_size); 982 983 work = ceph_kvmalloc(work_size, GFP_NOIO); 984 if (!work) 985 return NULL; 986 987 INIT_LIST_HEAD(&work->item); 988 crush_init_workspace(c, work); 989 return work; 990 } 991 992 static void free_workspace(struct crush_work *work) 993 { 994 WARN_ON(!list_empty(&work->item)); 995 kvfree(work); 996 } 997 998 static void init_workspace_manager(struct workspace_manager *wsm) 999 { 1000 INIT_LIST_HEAD(&wsm->idle_ws); 1001 spin_lock_init(&wsm->ws_lock); 1002 atomic_set(&wsm->total_ws, 0); 1003 wsm->free_ws = 0; 1004 init_waitqueue_head(&wsm->ws_wait); 1005 } 1006 1007 static void add_initial_workspace(struct workspace_manager *wsm, 1008 struct crush_work *work) 1009 { 1010 WARN_ON(!list_empty(&wsm->idle_ws)); 1011 1012 list_add(&work->item, &wsm->idle_ws); 1013 atomic_set(&wsm->total_ws, 1); 1014 wsm->free_ws = 1; 1015 } 1016 1017 static void cleanup_workspace_manager(struct workspace_manager *wsm) 1018 { 1019 struct crush_work *work; 1020 1021 while (!list_empty(&wsm->idle_ws)) { 1022 work = list_first_entry(&wsm->idle_ws, struct crush_work, 1023 item); 1024 list_del_init(&work->item); 1025 free_workspace(work); 1026 } 1027 atomic_set(&wsm->total_ws, 0); 1028 wsm->free_ws = 0; 1029 } 1030 1031 /* 1032 * Finds an available workspace or allocates a new one. If it's not 1033 * possible to allocate a new one, waits until there is one. 1034 */ 1035 static struct crush_work *get_workspace(struct workspace_manager *wsm, 1036 const struct crush_map *c) 1037 { 1038 struct crush_work *work; 1039 int cpus = num_online_cpus(); 1040 1041 again: 1042 spin_lock(&wsm->ws_lock); 1043 if (!list_empty(&wsm->idle_ws)) { 1044 work = list_first_entry(&wsm->idle_ws, struct crush_work, 1045 item); 1046 list_del_init(&work->item); 1047 wsm->free_ws--; 1048 spin_unlock(&wsm->ws_lock); 1049 return work; 1050 1051 } 1052 if (atomic_read(&wsm->total_ws) > cpus) { 1053 DEFINE_WAIT(wait); 1054 1055 spin_unlock(&wsm->ws_lock); 1056 prepare_to_wait(&wsm->ws_wait, &wait, TASK_UNINTERRUPTIBLE); 1057 if (atomic_read(&wsm->total_ws) > cpus && !wsm->free_ws) 1058 schedule(); 1059 finish_wait(&wsm->ws_wait, &wait); 1060 goto again; 1061 } 1062 atomic_inc(&wsm->total_ws); 1063 spin_unlock(&wsm->ws_lock); 1064 1065 work = alloc_workspace(c); 1066 if (!work) { 1067 atomic_dec(&wsm->total_ws); 1068 wake_up(&wsm->ws_wait); 1069 1070 /* 1071 * Do not return the error but go back to waiting. We 1072 * have the inital workspace and the CRUSH computation 1073 * time is bounded so we will get it eventually. 1074 */ 1075 WARN_ON(atomic_read(&wsm->total_ws) < 1); 1076 goto again; 1077 } 1078 return work; 1079 } 1080 1081 /* 1082 * Puts a workspace back on the list or frees it if we have enough 1083 * idle ones sitting around. 1084 */ 1085 static void put_workspace(struct workspace_manager *wsm, 1086 struct crush_work *work) 1087 { 1088 spin_lock(&wsm->ws_lock); 1089 if (wsm->free_ws <= num_online_cpus()) { 1090 list_add(&work->item, &wsm->idle_ws); 1091 wsm->free_ws++; 1092 spin_unlock(&wsm->ws_lock); 1093 goto wake; 1094 } 1095 spin_unlock(&wsm->ws_lock); 1096 1097 free_workspace(work); 1098 atomic_dec(&wsm->total_ws); 1099 wake: 1100 if (wq_has_sleeper(&wsm->ws_wait)) 1101 wake_up(&wsm->ws_wait); 1102 } 1103 1104 /* 1105 * osd map 1106 */ 1107 struct ceph_osdmap *ceph_osdmap_alloc(void) 1108 { 1109 struct ceph_osdmap *map; 1110 1111 map = kzalloc(sizeof(*map), GFP_NOIO); 1112 if (!map) 1113 return NULL; 1114 1115 map->pg_pools = RB_ROOT; 1116 map->pool_max = -1; 1117 map->pg_temp = RB_ROOT; 1118 map->primary_temp = RB_ROOT; 1119 map->pg_upmap = RB_ROOT; 1120 map->pg_upmap_items = RB_ROOT; 1121 1122 init_workspace_manager(&map->crush_wsm); 1123 1124 return map; 1125 } 1126 1127 void ceph_osdmap_destroy(struct ceph_osdmap *map) 1128 { 1129 dout("osdmap_destroy %p\n", map); 1130 1131 if (map->crush) 1132 crush_destroy(map->crush); 1133 cleanup_workspace_manager(&map->crush_wsm); 1134 1135 while (!RB_EMPTY_ROOT(&map->pg_temp)) { 1136 struct ceph_pg_mapping *pg = 1137 rb_entry(rb_first(&map->pg_temp), 1138 struct ceph_pg_mapping, node); 1139 erase_pg_mapping(&map->pg_temp, pg); 1140 free_pg_mapping(pg); 1141 } 1142 while (!RB_EMPTY_ROOT(&map->primary_temp)) { 1143 struct ceph_pg_mapping *pg = 1144 rb_entry(rb_first(&map->primary_temp), 1145 struct ceph_pg_mapping, node); 1146 erase_pg_mapping(&map->primary_temp, pg); 1147 free_pg_mapping(pg); 1148 } 1149 while (!RB_EMPTY_ROOT(&map->pg_upmap)) { 1150 struct ceph_pg_mapping *pg = 1151 rb_entry(rb_first(&map->pg_upmap), 1152 struct ceph_pg_mapping, node); 1153 rb_erase(&pg->node, &map->pg_upmap); 1154 kfree(pg); 1155 } 1156 while (!RB_EMPTY_ROOT(&map->pg_upmap_items)) { 1157 struct ceph_pg_mapping *pg = 1158 rb_entry(rb_first(&map->pg_upmap_items), 1159 struct ceph_pg_mapping, node); 1160 rb_erase(&pg->node, &map->pg_upmap_items); 1161 kfree(pg); 1162 } 1163 while (!RB_EMPTY_ROOT(&map->pg_pools)) { 1164 struct ceph_pg_pool_info *pi = 1165 rb_entry(rb_first(&map->pg_pools), 1166 struct ceph_pg_pool_info, node); 1167 __remove_pg_pool(&map->pg_pools, pi); 1168 } 1169 kvfree(map->osd_state); 1170 kvfree(map->osd_weight); 1171 kvfree(map->osd_addr); 1172 kvfree(map->osd_primary_affinity); 1173 kfree(map); 1174 } 1175 1176 /* 1177 * Adjust max_osd value, (re)allocate arrays. 1178 * 1179 * The new elements are properly initialized. 1180 */ 1181 static int osdmap_set_max_osd(struct ceph_osdmap *map, u32 max) 1182 { 1183 u32 *state; 1184 u32 *weight; 1185 struct ceph_entity_addr *addr; 1186 u32 to_copy; 1187 int i; 1188 1189 dout("%s old %u new %u\n", __func__, map->max_osd, max); 1190 if (max == map->max_osd) 1191 return 0; 1192 1193 state = ceph_kvmalloc(array_size(max, sizeof(*state)), GFP_NOFS); 1194 weight = ceph_kvmalloc(array_size(max, sizeof(*weight)), GFP_NOFS); 1195 addr = ceph_kvmalloc(array_size(max, sizeof(*addr)), GFP_NOFS); 1196 if (!state || !weight || !addr) { 1197 kvfree(state); 1198 kvfree(weight); 1199 kvfree(addr); 1200 return -ENOMEM; 1201 } 1202 1203 to_copy = min(map->max_osd, max); 1204 if (map->osd_state) { 1205 memcpy(state, map->osd_state, to_copy * sizeof(*state)); 1206 memcpy(weight, map->osd_weight, to_copy * sizeof(*weight)); 1207 memcpy(addr, map->osd_addr, to_copy * sizeof(*addr)); 1208 kvfree(map->osd_state); 1209 kvfree(map->osd_weight); 1210 kvfree(map->osd_addr); 1211 } 1212 1213 map->osd_state = state; 1214 map->osd_weight = weight; 1215 map->osd_addr = addr; 1216 for (i = map->max_osd; i < max; i++) { 1217 map->osd_state[i] = 0; 1218 map->osd_weight[i] = CEPH_OSD_OUT; 1219 memset(map->osd_addr + i, 0, sizeof(*map->osd_addr)); 1220 } 1221 1222 if (map->osd_primary_affinity) { 1223 u32 *affinity; 1224 1225 affinity = ceph_kvmalloc(array_size(max, sizeof(*affinity)), 1226 GFP_NOFS); 1227 if (!affinity) 1228 return -ENOMEM; 1229 1230 memcpy(affinity, map->osd_primary_affinity, 1231 to_copy * sizeof(*affinity)); 1232 kvfree(map->osd_primary_affinity); 1233 1234 map->osd_primary_affinity = affinity; 1235 for (i = map->max_osd; i < max; i++) 1236 map->osd_primary_affinity[i] = 1237 CEPH_OSD_DEFAULT_PRIMARY_AFFINITY; 1238 } 1239 1240 map->max_osd = max; 1241 1242 return 0; 1243 } 1244 1245 static int osdmap_set_crush(struct ceph_osdmap *map, struct crush_map *crush) 1246 { 1247 struct crush_work *work; 1248 1249 if (IS_ERR(crush)) 1250 return PTR_ERR(crush); 1251 1252 work = alloc_workspace(crush); 1253 if (!work) { 1254 crush_destroy(crush); 1255 return -ENOMEM; 1256 } 1257 1258 if (map->crush) 1259 crush_destroy(map->crush); 1260 cleanup_workspace_manager(&map->crush_wsm); 1261 map->crush = crush; 1262 add_initial_workspace(&map->crush_wsm, work); 1263 return 0; 1264 } 1265 1266 #define OSDMAP_WRAPPER_COMPAT_VER 7 1267 #define OSDMAP_CLIENT_DATA_COMPAT_VER 1 1268 1269 /* 1270 * Return 0 or error. On success, *v is set to 0 for old (v6) osdmaps, 1271 * to struct_v of the client_data section for new (v7 and above) 1272 * osdmaps. 1273 */ 1274 static int get_osdmap_client_data_v(void **p, void *end, 1275 const char *prefix, u8 *v) 1276 { 1277 u8 struct_v; 1278 1279 ceph_decode_8_safe(p, end, struct_v, e_inval); 1280 if (struct_v >= 7) { 1281 u8 struct_compat; 1282 1283 ceph_decode_8_safe(p, end, struct_compat, e_inval); 1284 if (struct_compat > OSDMAP_WRAPPER_COMPAT_VER) { 1285 pr_warn("got v %d cv %d > %d of %s ceph_osdmap\n", 1286 struct_v, struct_compat, 1287 OSDMAP_WRAPPER_COMPAT_VER, prefix); 1288 return -EINVAL; 1289 } 1290 *p += 4; /* ignore wrapper struct_len */ 1291 1292 ceph_decode_8_safe(p, end, struct_v, e_inval); 1293 ceph_decode_8_safe(p, end, struct_compat, e_inval); 1294 if (struct_compat > OSDMAP_CLIENT_DATA_COMPAT_VER) { 1295 pr_warn("got v %d cv %d > %d of %s ceph_osdmap client data\n", 1296 struct_v, struct_compat, 1297 OSDMAP_CLIENT_DATA_COMPAT_VER, prefix); 1298 return -EINVAL; 1299 } 1300 *p += 4; /* ignore client data struct_len */ 1301 } else { 1302 u16 version; 1303 1304 *p -= 1; 1305 ceph_decode_16_safe(p, end, version, e_inval); 1306 if (version < 6) { 1307 pr_warn("got v %d < 6 of %s ceph_osdmap\n", 1308 version, prefix); 1309 return -EINVAL; 1310 } 1311 1312 /* old osdmap enconding */ 1313 struct_v = 0; 1314 } 1315 1316 *v = struct_v; 1317 return 0; 1318 1319 e_inval: 1320 return -EINVAL; 1321 } 1322 1323 static int __decode_pools(void **p, void *end, struct ceph_osdmap *map, 1324 bool incremental) 1325 { 1326 u32 n; 1327 1328 ceph_decode_32_safe(p, end, n, e_inval); 1329 while (n--) { 1330 struct ceph_pg_pool_info *pi; 1331 u64 pool; 1332 int ret; 1333 1334 ceph_decode_64_safe(p, end, pool, e_inval); 1335 1336 pi = lookup_pg_pool(&map->pg_pools, pool); 1337 if (!incremental || !pi) { 1338 pi = kzalloc(sizeof(*pi), GFP_NOFS); 1339 if (!pi) 1340 return -ENOMEM; 1341 1342 RB_CLEAR_NODE(&pi->node); 1343 pi->id = pool; 1344 1345 if (!__insert_pg_pool(&map->pg_pools, pi)) { 1346 kfree(pi); 1347 return -EEXIST; 1348 } 1349 } 1350 1351 ret = decode_pool(p, end, pi); 1352 if (ret) 1353 return ret; 1354 } 1355 1356 return 0; 1357 1358 e_inval: 1359 return -EINVAL; 1360 } 1361 1362 static int decode_pools(void **p, void *end, struct ceph_osdmap *map) 1363 { 1364 return __decode_pools(p, end, map, false); 1365 } 1366 1367 static int decode_new_pools(void **p, void *end, struct ceph_osdmap *map) 1368 { 1369 return __decode_pools(p, end, map, true); 1370 } 1371 1372 typedef struct ceph_pg_mapping *(*decode_mapping_fn_t)(void **, void *, bool); 1373 1374 static int decode_pg_mapping(void **p, void *end, struct rb_root *mapping_root, 1375 decode_mapping_fn_t fn, bool incremental) 1376 { 1377 u32 n; 1378 1379 WARN_ON(!incremental && !fn); 1380 1381 ceph_decode_32_safe(p, end, n, e_inval); 1382 while (n--) { 1383 struct ceph_pg_mapping *pg; 1384 struct ceph_pg pgid; 1385 int ret; 1386 1387 ret = ceph_decode_pgid(p, end, &pgid); 1388 if (ret) 1389 return ret; 1390 1391 pg = lookup_pg_mapping(mapping_root, &pgid); 1392 if (pg) { 1393 WARN_ON(!incremental); 1394 erase_pg_mapping(mapping_root, pg); 1395 free_pg_mapping(pg); 1396 } 1397 1398 if (fn) { 1399 pg = fn(p, end, incremental); 1400 if (IS_ERR(pg)) 1401 return PTR_ERR(pg); 1402 1403 if (pg) { 1404 pg->pgid = pgid; /* struct */ 1405 insert_pg_mapping(mapping_root, pg); 1406 } 1407 } 1408 } 1409 1410 return 0; 1411 1412 e_inval: 1413 return -EINVAL; 1414 } 1415 1416 static struct ceph_pg_mapping *__decode_pg_temp(void **p, void *end, 1417 bool incremental) 1418 { 1419 struct ceph_pg_mapping *pg; 1420 u32 len, i; 1421 1422 ceph_decode_32_safe(p, end, len, e_inval); 1423 if (len == 0 && incremental) 1424 return NULL; /* new_pg_temp: [] to remove */ 1425 if (len > (SIZE_MAX - sizeof(*pg)) / sizeof(u32)) 1426 return ERR_PTR(-EINVAL); 1427 1428 ceph_decode_need(p, end, len * sizeof(u32), e_inval); 1429 pg = alloc_pg_mapping(len * sizeof(u32)); 1430 if (!pg) 1431 return ERR_PTR(-ENOMEM); 1432 1433 pg->pg_temp.len = len; 1434 for (i = 0; i < len; i++) 1435 pg->pg_temp.osds[i] = ceph_decode_32(p); 1436 1437 return pg; 1438 1439 e_inval: 1440 return ERR_PTR(-EINVAL); 1441 } 1442 1443 static int decode_pg_temp(void **p, void *end, struct ceph_osdmap *map) 1444 { 1445 return decode_pg_mapping(p, end, &map->pg_temp, __decode_pg_temp, 1446 false); 1447 } 1448 1449 static int decode_new_pg_temp(void **p, void *end, struct ceph_osdmap *map) 1450 { 1451 return decode_pg_mapping(p, end, &map->pg_temp, __decode_pg_temp, 1452 true); 1453 } 1454 1455 static struct ceph_pg_mapping *__decode_primary_temp(void **p, void *end, 1456 bool incremental) 1457 { 1458 struct ceph_pg_mapping *pg; 1459 u32 osd; 1460 1461 ceph_decode_32_safe(p, end, osd, e_inval); 1462 if (osd == (u32)-1 && incremental) 1463 return NULL; /* new_primary_temp: -1 to remove */ 1464 1465 pg = alloc_pg_mapping(0); 1466 if (!pg) 1467 return ERR_PTR(-ENOMEM); 1468 1469 pg->primary_temp.osd = osd; 1470 return pg; 1471 1472 e_inval: 1473 return ERR_PTR(-EINVAL); 1474 } 1475 1476 static int decode_primary_temp(void **p, void *end, struct ceph_osdmap *map) 1477 { 1478 return decode_pg_mapping(p, end, &map->primary_temp, 1479 __decode_primary_temp, false); 1480 } 1481 1482 static int decode_new_primary_temp(void **p, void *end, 1483 struct ceph_osdmap *map) 1484 { 1485 return decode_pg_mapping(p, end, &map->primary_temp, 1486 __decode_primary_temp, true); 1487 } 1488 1489 u32 ceph_get_primary_affinity(struct ceph_osdmap *map, int osd) 1490 { 1491 BUG_ON(osd >= map->max_osd); 1492 1493 if (!map->osd_primary_affinity) 1494 return CEPH_OSD_DEFAULT_PRIMARY_AFFINITY; 1495 1496 return map->osd_primary_affinity[osd]; 1497 } 1498 1499 static int set_primary_affinity(struct ceph_osdmap *map, int osd, u32 aff) 1500 { 1501 BUG_ON(osd >= map->max_osd); 1502 1503 if (!map->osd_primary_affinity) { 1504 int i; 1505 1506 map->osd_primary_affinity = ceph_kvmalloc( 1507 array_size(map->max_osd, sizeof(*map->osd_primary_affinity)), 1508 GFP_NOFS); 1509 if (!map->osd_primary_affinity) 1510 return -ENOMEM; 1511 1512 for (i = 0; i < map->max_osd; i++) 1513 map->osd_primary_affinity[i] = 1514 CEPH_OSD_DEFAULT_PRIMARY_AFFINITY; 1515 } 1516 1517 map->osd_primary_affinity[osd] = aff; 1518 1519 return 0; 1520 } 1521 1522 static int decode_primary_affinity(void **p, void *end, 1523 struct ceph_osdmap *map) 1524 { 1525 u32 len, i; 1526 1527 ceph_decode_32_safe(p, end, len, e_inval); 1528 if (len == 0) { 1529 kvfree(map->osd_primary_affinity); 1530 map->osd_primary_affinity = NULL; 1531 return 0; 1532 } 1533 if (len != map->max_osd) 1534 goto e_inval; 1535 1536 ceph_decode_need(p, end, map->max_osd*sizeof(u32), e_inval); 1537 1538 for (i = 0; i < map->max_osd; i++) { 1539 int ret; 1540 1541 ret = set_primary_affinity(map, i, ceph_decode_32(p)); 1542 if (ret) 1543 return ret; 1544 } 1545 1546 return 0; 1547 1548 e_inval: 1549 return -EINVAL; 1550 } 1551 1552 static int decode_new_primary_affinity(void **p, void *end, 1553 struct ceph_osdmap *map) 1554 { 1555 u32 n; 1556 1557 ceph_decode_32_safe(p, end, n, e_inval); 1558 while (n--) { 1559 u32 osd, aff; 1560 int ret; 1561 1562 ceph_decode_32_safe(p, end, osd, e_inval); 1563 ceph_decode_32_safe(p, end, aff, e_inval); 1564 1565 ret = set_primary_affinity(map, osd, aff); 1566 if (ret) 1567 return ret; 1568 1569 pr_info("osd%d primary-affinity 0x%x\n", osd, aff); 1570 } 1571 1572 return 0; 1573 1574 e_inval: 1575 return -EINVAL; 1576 } 1577 1578 static struct ceph_pg_mapping *__decode_pg_upmap(void **p, void *end, 1579 bool __unused) 1580 { 1581 return __decode_pg_temp(p, end, false); 1582 } 1583 1584 static int decode_pg_upmap(void **p, void *end, struct ceph_osdmap *map) 1585 { 1586 return decode_pg_mapping(p, end, &map->pg_upmap, __decode_pg_upmap, 1587 false); 1588 } 1589 1590 static int decode_new_pg_upmap(void **p, void *end, struct ceph_osdmap *map) 1591 { 1592 return decode_pg_mapping(p, end, &map->pg_upmap, __decode_pg_upmap, 1593 true); 1594 } 1595 1596 static int decode_old_pg_upmap(void **p, void *end, struct ceph_osdmap *map) 1597 { 1598 return decode_pg_mapping(p, end, &map->pg_upmap, NULL, true); 1599 } 1600 1601 static struct ceph_pg_mapping *__decode_pg_upmap_items(void **p, void *end, 1602 bool __unused) 1603 { 1604 struct ceph_pg_mapping *pg; 1605 u32 len, i; 1606 1607 ceph_decode_32_safe(p, end, len, e_inval); 1608 if (len > (SIZE_MAX - sizeof(*pg)) / (2 * sizeof(u32))) 1609 return ERR_PTR(-EINVAL); 1610 1611 ceph_decode_need(p, end, 2 * len * sizeof(u32), e_inval); 1612 pg = alloc_pg_mapping(2 * len * sizeof(u32)); 1613 if (!pg) 1614 return ERR_PTR(-ENOMEM); 1615 1616 pg->pg_upmap_items.len = len; 1617 for (i = 0; i < len; i++) { 1618 pg->pg_upmap_items.from_to[i][0] = ceph_decode_32(p); 1619 pg->pg_upmap_items.from_to[i][1] = ceph_decode_32(p); 1620 } 1621 1622 return pg; 1623 1624 e_inval: 1625 return ERR_PTR(-EINVAL); 1626 } 1627 1628 static int decode_pg_upmap_items(void **p, void *end, struct ceph_osdmap *map) 1629 { 1630 return decode_pg_mapping(p, end, &map->pg_upmap_items, 1631 __decode_pg_upmap_items, false); 1632 } 1633 1634 static int decode_new_pg_upmap_items(void **p, void *end, 1635 struct ceph_osdmap *map) 1636 { 1637 return decode_pg_mapping(p, end, &map->pg_upmap_items, 1638 __decode_pg_upmap_items, true); 1639 } 1640 1641 static int decode_old_pg_upmap_items(void **p, void *end, 1642 struct ceph_osdmap *map) 1643 { 1644 return decode_pg_mapping(p, end, &map->pg_upmap_items, NULL, true); 1645 } 1646 1647 /* 1648 * decode a full map. 1649 */ 1650 static int osdmap_decode(void **p, void *end, bool msgr2, 1651 struct ceph_osdmap *map) 1652 { 1653 u8 struct_v; 1654 u32 epoch = 0; 1655 void *start = *p; 1656 u32 max; 1657 u32 len, i; 1658 int err; 1659 1660 dout("%s %p to %p len %d\n", __func__, *p, end, (int)(end - *p)); 1661 1662 err = get_osdmap_client_data_v(p, end, "full", &struct_v); 1663 if (err) 1664 goto bad; 1665 1666 /* fsid, epoch, created, modified */ 1667 ceph_decode_need(p, end, sizeof(map->fsid) + sizeof(u32) + 1668 sizeof(map->created) + sizeof(map->modified), e_inval); 1669 ceph_decode_copy(p, &map->fsid, sizeof(map->fsid)); 1670 epoch = map->epoch = ceph_decode_32(p); 1671 ceph_decode_copy(p, &map->created, sizeof(map->created)); 1672 ceph_decode_copy(p, &map->modified, sizeof(map->modified)); 1673 1674 /* pools */ 1675 err = decode_pools(p, end, map); 1676 if (err) 1677 goto bad; 1678 1679 /* pool_name */ 1680 err = decode_pool_names(p, end, map); 1681 if (err) 1682 goto bad; 1683 1684 ceph_decode_32_safe(p, end, map->pool_max, e_inval); 1685 1686 ceph_decode_32_safe(p, end, map->flags, e_inval); 1687 1688 /* max_osd */ 1689 ceph_decode_32_safe(p, end, max, e_inval); 1690 1691 /* (re)alloc osd arrays */ 1692 err = osdmap_set_max_osd(map, max); 1693 if (err) 1694 goto bad; 1695 1696 /* osd_state, osd_weight, osd_addrs->client_addr */ 1697 ceph_decode_need(p, end, 3*sizeof(u32) + 1698 map->max_osd*(struct_v >= 5 ? sizeof(u32) : 1699 sizeof(u8)) + 1700 sizeof(*map->osd_weight), e_inval); 1701 if (ceph_decode_32(p) != map->max_osd) 1702 goto e_inval; 1703 1704 if (struct_v >= 5) { 1705 for (i = 0; i < map->max_osd; i++) 1706 map->osd_state[i] = ceph_decode_32(p); 1707 } else { 1708 for (i = 0; i < map->max_osd; i++) 1709 map->osd_state[i] = ceph_decode_8(p); 1710 } 1711 1712 if (ceph_decode_32(p) != map->max_osd) 1713 goto e_inval; 1714 1715 for (i = 0; i < map->max_osd; i++) 1716 map->osd_weight[i] = ceph_decode_32(p); 1717 1718 if (ceph_decode_32(p) != map->max_osd) 1719 goto e_inval; 1720 1721 for (i = 0; i < map->max_osd; i++) { 1722 struct ceph_entity_addr *addr = &map->osd_addr[i]; 1723 1724 if (struct_v >= 8) 1725 err = ceph_decode_entity_addrvec(p, end, msgr2, addr); 1726 else 1727 err = ceph_decode_entity_addr(p, end, addr); 1728 if (err) 1729 goto bad; 1730 1731 dout("%s osd%d addr %s\n", __func__, i, ceph_pr_addr(addr)); 1732 } 1733 1734 /* pg_temp */ 1735 err = decode_pg_temp(p, end, map); 1736 if (err) 1737 goto bad; 1738 1739 /* primary_temp */ 1740 if (struct_v >= 1) { 1741 err = decode_primary_temp(p, end, map); 1742 if (err) 1743 goto bad; 1744 } 1745 1746 /* primary_affinity */ 1747 if (struct_v >= 2) { 1748 err = decode_primary_affinity(p, end, map); 1749 if (err) 1750 goto bad; 1751 } else { 1752 WARN_ON(map->osd_primary_affinity); 1753 } 1754 1755 /* crush */ 1756 ceph_decode_32_safe(p, end, len, e_inval); 1757 err = osdmap_set_crush(map, crush_decode(*p, min(*p + len, end))); 1758 if (err) 1759 goto bad; 1760 1761 *p += len; 1762 if (struct_v >= 3) { 1763 /* erasure_code_profiles */ 1764 ceph_decode_skip_map_of_map(p, end, string, string, string, 1765 e_inval); 1766 } 1767 1768 if (struct_v >= 4) { 1769 err = decode_pg_upmap(p, end, map); 1770 if (err) 1771 goto bad; 1772 1773 err = decode_pg_upmap_items(p, end, map); 1774 if (err) 1775 goto bad; 1776 } else { 1777 WARN_ON(!RB_EMPTY_ROOT(&map->pg_upmap)); 1778 WARN_ON(!RB_EMPTY_ROOT(&map->pg_upmap_items)); 1779 } 1780 1781 /* ignore the rest */ 1782 *p = end; 1783 1784 dout("full osdmap epoch %d max_osd %d\n", map->epoch, map->max_osd); 1785 return 0; 1786 1787 e_inval: 1788 err = -EINVAL; 1789 bad: 1790 pr_err("corrupt full osdmap (%d) epoch %d off %d (%p of %p-%p)\n", 1791 err, epoch, (int)(*p - start), *p, start, end); 1792 print_hex_dump(KERN_DEBUG, "osdmap: ", 1793 DUMP_PREFIX_OFFSET, 16, 1, 1794 start, end - start, true); 1795 return err; 1796 } 1797 1798 /* 1799 * Allocate and decode a full map. 1800 */ 1801 struct ceph_osdmap *ceph_osdmap_decode(void **p, void *end, bool msgr2) 1802 { 1803 struct ceph_osdmap *map; 1804 int ret; 1805 1806 map = ceph_osdmap_alloc(); 1807 if (!map) 1808 return ERR_PTR(-ENOMEM); 1809 1810 ret = osdmap_decode(p, end, msgr2, map); 1811 if (ret) { 1812 ceph_osdmap_destroy(map); 1813 return ERR_PTR(ret); 1814 } 1815 1816 return map; 1817 } 1818 1819 /* 1820 * Encoding order is (new_up_client, new_state, new_weight). Need to 1821 * apply in the (new_weight, new_state, new_up_client) order, because 1822 * an incremental map may look like e.g. 1823 * 1824 * new_up_client: { osd=6, addr=... } # set osd_state and addr 1825 * new_state: { osd=6, xorstate=EXISTS } # clear osd_state 1826 */ 1827 static int decode_new_up_state_weight(void **p, void *end, u8 struct_v, 1828 bool msgr2, struct ceph_osdmap *map) 1829 { 1830 void *new_up_client; 1831 void *new_state; 1832 void *new_weight_end; 1833 u32 len; 1834 int ret; 1835 int i; 1836 1837 new_up_client = *p; 1838 ceph_decode_32_safe(p, end, len, e_inval); 1839 for (i = 0; i < len; ++i) { 1840 struct ceph_entity_addr addr; 1841 1842 ceph_decode_skip_32(p, end, e_inval); 1843 if (struct_v >= 7) 1844 ret = ceph_decode_entity_addrvec(p, end, msgr2, &addr); 1845 else 1846 ret = ceph_decode_entity_addr(p, end, &addr); 1847 if (ret) 1848 return ret; 1849 } 1850 1851 new_state = *p; 1852 ceph_decode_32_safe(p, end, len, e_inval); 1853 len *= sizeof(u32) + (struct_v >= 5 ? sizeof(u32) : sizeof(u8)); 1854 ceph_decode_need(p, end, len, e_inval); 1855 *p += len; 1856 1857 /* new_weight */ 1858 ceph_decode_32_safe(p, end, len, e_inval); 1859 while (len--) { 1860 s32 osd; 1861 u32 w; 1862 1863 ceph_decode_need(p, end, 2*sizeof(u32), e_inval); 1864 osd = ceph_decode_32(p); 1865 w = ceph_decode_32(p); 1866 BUG_ON(osd >= map->max_osd); 1867 pr_info("osd%d weight 0x%x %s\n", osd, w, 1868 w == CEPH_OSD_IN ? "(in)" : 1869 (w == CEPH_OSD_OUT ? "(out)" : "")); 1870 map->osd_weight[osd] = w; 1871 1872 /* 1873 * If we are marking in, set the EXISTS, and clear the 1874 * AUTOOUT and NEW bits. 1875 */ 1876 if (w) { 1877 map->osd_state[osd] |= CEPH_OSD_EXISTS; 1878 map->osd_state[osd] &= ~(CEPH_OSD_AUTOOUT | 1879 CEPH_OSD_NEW); 1880 } 1881 } 1882 new_weight_end = *p; 1883 1884 /* new_state (up/down) */ 1885 *p = new_state; 1886 len = ceph_decode_32(p); 1887 while (len--) { 1888 s32 osd; 1889 u32 xorstate; 1890 1891 osd = ceph_decode_32(p); 1892 if (struct_v >= 5) 1893 xorstate = ceph_decode_32(p); 1894 else 1895 xorstate = ceph_decode_8(p); 1896 if (xorstate == 0) 1897 xorstate = CEPH_OSD_UP; 1898 BUG_ON(osd >= map->max_osd); 1899 if ((map->osd_state[osd] & CEPH_OSD_UP) && 1900 (xorstate & CEPH_OSD_UP)) 1901 pr_info("osd%d down\n", osd); 1902 if ((map->osd_state[osd] & CEPH_OSD_EXISTS) && 1903 (xorstate & CEPH_OSD_EXISTS)) { 1904 pr_info("osd%d does not exist\n", osd); 1905 ret = set_primary_affinity(map, osd, 1906 CEPH_OSD_DEFAULT_PRIMARY_AFFINITY); 1907 if (ret) 1908 return ret; 1909 memset(map->osd_addr + osd, 0, sizeof(*map->osd_addr)); 1910 map->osd_state[osd] = 0; 1911 } else { 1912 map->osd_state[osd] ^= xorstate; 1913 } 1914 } 1915 1916 /* new_up_client */ 1917 *p = new_up_client; 1918 len = ceph_decode_32(p); 1919 while (len--) { 1920 s32 osd; 1921 struct ceph_entity_addr addr; 1922 1923 osd = ceph_decode_32(p); 1924 BUG_ON(osd >= map->max_osd); 1925 if (struct_v >= 7) 1926 ret = ceph_decode_entity_addrvec(p, end, msgr2, &addr); 1927 else 1928 ret = ceph_decode_entity_addr(p, end, &addr); 1929 if (ret) 1930 return ret; 1931 1932 dout("%s osd%d addr %s\n", __func__, osd, ceph_pr_addr(&addr)); 1933 1934 pr_info("osd%d up\n", osd); 1935 map->osd_state[osd] |= CEPH_OSD_EXISTS | CEPH_OSD_UP; 1936 map->osd_addr[osd] = addr; 1937 } 1938 1939 *p = new_weight_end; 1940 return 0; 1941 1942 e_inval: 1943 return -EINVAL; 1944 } 1945 1946 /* 1947 * decode and apply an incremental map update. 1948 */ 1949 struct ceph_osdmap *osdmap_apply_incremental(void **p, void *end, bool msgr2, 1950 struct ceph_osdmap *map) 1951 { 1952 struct ceph_fsid fsid; 1953 u32 epoch = 0; 1954 struct ceph_timespec modified; 1955 s32 len; 1956 u64 pool; 1957 __s64 new_pool_max; 1958 __s32 new_flags, max; 1959 void *start = *p; 1960 int err; 1961 u8 struct_v; 1962 1963 dout("%s %p to %p len %d\n", __func__, *p, end, (int)(end - *p)); 1964 1965 err = get_osdmap_client_data_v(p, end, "inc", &struct_v); 1966 if (err) 1967 goto bad; 1968 1969 /* fsid, epoch, modified, new_pool_max, new_flags */ 1970 ceph_decode_need(p, end, sizeof(fsid) + sizeof(u32) + sizeof(modified) + 1971 sizeof(u64) + sizeof(u32), e_inval); 1972 ceph_decode_copy(p, &fsid, sizeof(fsid)); 1973 epoch = ceph_decode_32(p); 1974 BUG_ON(epoch != map->epoch+1); 1975 ceph_decode_copy(p, &modified, sizeof(modified)); 1976 new_pool_max = ceph_decode_64(p); 1977 new_flags = ceph_decode_32(p); 1978 1979 /* full map? */ 1980 ceph_decode_32_safe(p, end, len, e_inval); 1981 if (len > 0) { 1982 dout("apply_incremental full map len %d, %p to %p\n", 1983 len, *p, end); 1984 return ceph_osdmap_decode(p, min(*p+len, end), msgr2); 1985 } 1986 1987 /* new crush? */ 1988 ceph_decode_32_safe(p, end, len, e_inval); 1989 if (len > 0) { 1990 err = osdmap_set_crush(map, 1991 crush_decode(*p, min(*p + len, end))); 1992 if (err) 1993 goto bad; 1994 *p += len; 1995 } 1996 1997 /* new flags? */ 1998 if (new_flags >= 0) 1999 map->flags = new_flags; 2000 if (new_pool_max >= 0) 2001 map->pool_max = new_pool_max; 2002 2003 /* new max? */ 2004 ceph_decode_32_safe(p, end, max, e_inval); 2005 if (max >= 0) { 2006 err = osdmap_set_max_osd(map, max); 2007 if (err) 2008 goto bad; 2009 } 2010 2011 map->epoch++; 2012 map->modified = modified; 2013 2014 /* new_pools */ 2015 err = decode_new_pools(p, end, map); 2016 if (err) 2017 goto bad; 2018 2019 /* new_pool_names */ 2020 err = decode_pool_names(p, end, map); 2021 if (err) 2022 goto bad; 2023 2024 /* old_pool */ 2025 ceph_decode_32_safe(p, end, len, e_inval); 2026 while (len--) { 2027 struct ceph_pg_pool_info *pi; 2028 2029 ceph_decode_64_safe(p, end, pool, e_inval); 2030 pi = lookup_pg_pool(&map->pg_pools, pool); 2031 if (pi) 2032 __remove_pg_pool(&map->pg_pools, pi); 2033 } 2034 2035 /* new_up_client, new_state, new_weight */ 2036 err = decode_new_up_state_weight(p, end, struct_v, msgr2, map); 2037 if (err) 2038 goto bad; 2039 2040 /* new_pg_temp */ 2041 err = decode_new_pg_temp(p, end, map); 2042 if (err) 2043 goto bad; 2044 2045 /* new_primary_temp */ 2046 if (struct_v >= 1) { 2047 err = decode_new_primary_temp(p, end, map); 2048 if (err) 2049 goto bad; 2050 } 2051 2052 /* new_primary_affinity */ 2053 if (struct_v >= 2) { 2054 err = decode_new_primary_affinity(p, end, map); 2055 if (err) 2056 goto bad; 2057 } 2058 2059 if (struct_v >= 3) { 2060 /* new_erasure_code_profiles */ 2061 ceph_decode_skip_map_of_map(p, end, string, string, string, 2062 e_inval); 2063 /* old_erasure_code_profiles */ 2064 ceph_decode_skip_set(p, end, string, e_inval); 2065 } 2066 2067 if (struct_v >= 4) { 2068 err = decode_new_pg_upmap(p, end, map); 2069 if (err) 2070 goto bad; 2071 2072 err = decode_old_pg_upmap(p, end, map); 2073 if (err) 2074 goto bad; 2075 2076 err = decode_new_pg_upmap_items(p, end, map); 2077 if (err) 2078 goto bad; 2079 2080 err = decode_old_pg_upmap_items(p, end, map); 2081 if (err) 2082 goto bad; 2083 } 2084 2085 /* ignore the rest */ 2086 *p = end; 2087 2088 dout("inc osdmap epoch %d max_osd %d\n", map->epoch, map->max_osd); 2089 return map; 2090 2091 e_inval: 2092 err = -EINVAL; 2093 bad: 2094 pr_err("corrupt inc osdmap (%d) epoch %d off %d (%p of %p-%p)\n", 2095 err, epoch, (int)(*p - start), *p, start, end); 2096 print_hex_dump(KERN_DEBUG, "osdmap: ", 2097 DUMP_PREFIX_OFFSET, 16, 1, 2098 start, end - start, true); 2099 return ERR_PTR(err); 2100 } 2101 2102 void ceph_oloc_copy(struct ceph_object_locator *dest, 2103 const struct ceph_object_locator *src) 2104 { 2105 ceph_oloc_destroy(dest); 2106 2107 dest->pool = src->pool; 2108 if (src->pool_ns) 2109 dest->pool_ns = ceph_get_string(src->pool_ns); 2110 else 2111 dest->pool_ns = NULL; 2112 } 2113 EXPORT_SYMBOL(ceph_oloc_copy); 2114 2115 void ceph_oloc_destroy(struct ceph_object_locator *oloc) 2116 { 2117 ceph_put_string(oloc->pool_ns); 2118 } 2119 EXPORT_SYMBOL(ceph_oloc_destroy); 2120 2121 void ceph_oid_copy(struct ceph_object_id *dest, 2122 const struct ceph_object_id *src) 2123 { 2124 ceph_oid_destroy(dest); 2125 2126 if (src->name != src->inline_name) { 2127 /* very rare, see ceph_object_id definition */ 2128 dest->name = kmalloc(src->name_len + 1, 2129 GFP_NOIO | __GFP_NOFAIL); 2130 } else { 2131 dest->name = dest->inline_name; 2132 } 2133 memcpy(dest->name, src->name, src->name_len + 1); 2134 dest->name_len = src->name_len; 2135 } 2136 EXPORT_SYMBOL(ceph_oid_copy); 2137 2138 static __printf(2, 0) 2139 int oid_printf_vargs(struct ceph_object_id *oid, const char *fmt, va_list ap) 2140 { 2141 int len; 2142 2143 WARN_ON(!ceph_oid_empty(oid)); 2144 2145 len = vsnprintf(oid->inline_name, sizeof(oid->inline_name), fmt, ap); 2146 if (len >= sizeof(oid->inline_name)) 2147 return len; 2148 2149 oid->name_len = len; 2150 return 0; 2151 } 2152 2153 /* 2154 * If oid doesn't fit into inline buffer, BUG. 2155 */ 2156 void ceph_oid_printf(struct ceph_object_id *oid, const char *fmt, ...) 2157 { 2158 va_list ap; 2159 2160 va_start(ap, fmt); 2161 BUG_ON(oid_printf_vargs(oid, fmt, ap)); 2162 va_end(ap); 2163 } 2164 EXPORT_SYMBOL(ceph_oid_printf); 2165 2166 static __printf(3, 0) 2167 int oid_aprintf_vargs(struct ceph_object_id *oid, gfp_t gfp, 2168 const char *fmt, va_list ap) 2169 { 2170 va_list aq; 2171 int len; 2172 2173 va_copy(aq, ap); 2174 len = oid_printf_vargs(oid, fmt, aq); 2175 va_end(aq); 2176 2177 if (len) { 2178 char *external_name; 2179 2180 external_name = kmalloc(len + 1, gfp); 2181 if (!external_name) 2182 return -ENOMEM; 2183 2184 oid->name = external_name; 2185 WARN_ON(vsnprintf(oid->name, len + 1, fmt, ap) != len); 2186 oid->name_len = len; 2187 } 2188 2189 return 0; 2190 } 2191 2192 /* 2193 * If oid doesn't fit into inline buffer, allocate. 2194 */ 2195 int ceph_oid_aprintf(struct ceph_object_id *oid, gfp_t gfp, 2196 const char *fmt, ...) 2197 { 2198 va_list ap; 2199 int ret; 2200 2201 va_start(ap, fmt); 2202 ret = oid_aprintf_vargs(oid, gfp, fmt, ap); 2203 va_end(ap); 2204 2205 return ret; 2206 } 2207 EXPORT_SYMBOL(ceph_oid_aprintf); 2208 2209 void ceph_oid_destroy(struct ceph_object_id *oid) 2210 { 2211 if (oid->name != oid->inline_name) 2212 kfree(oid->name); 2213 } 2214 EXPORT_SYMBOL(ceph_oid_destroy); 2215 2216 /* 2217 * osds only 2218 */ 2219 static bool __osds_equal(const struct ceph_osds *lhs, 2220 const struct ceph_osds *rhs) 2221 { 2222 if (lhs->size == rhs->size && 2223 !memcmp(lhs->osds, rhs->osds, rhs->size * sizeof(rhs->osds[0]))) 2224 return true; 2225 2226 return false; 2227 } 2228 2229 /* 2230 * osds + primary 2231 */ 2232 static bool osds_equal(const struct ceph_osds *lhs, 2233 const struct ceph_osds *rhs) 2234 { 2235 if (__osds_equal(lhs, rhs) && 2236 lhs->primary == rhs->primary) 2237 return true; 2238 2239 return false; 2240 } 2241 2242 static bool osds_valid(const struct ceph_osds *set) 2243 { 2244 /* non-empty set */ 2245 if (set->size > 0 && set->primary >= 0) 2246 return true; 2247 2248 /* empty can_shift_osds set */ 2249 if (!set->size && set->primary == -1) 2250 return true; 2251 2252 /* empty !can_shift_osds set - all NONE */ 2253 if (set->size > 0 && set->primary == -1) { 2254 int i; 2255 2256 for (i = 0; i < set->size; i++) { 2257 if (set->osds[i] != CRUSH_ITEM_NONE) 2258 break; 2259 } 2260 if (i == set->size) 2261 return true; 2262 } 2263 2264 return false; 2265 } 2266 2267 void ceph_osds_copy(struct ceph_osds *dest, const struct ceph_osds *src) 2268 { 2269 memcpy(dest->osds, src->osds, src->size * sizeof(src->osds[0])); 2270 dest->size = src->size; 2271 dest->primary = src->primary; 2272 } 2273 2274 bool ceph_pg_is_split(const struct ceph_pg *pgid, u32 old_pg_num, 2275 u32 new_pg_num) 2276 { 2277 int old_bits = calc_bits_of(old_pg_num); 2278 int old_mask = (1 << old_bits) - 1; 2279 int n; 2280 2281 WARN_ON(pgid->seed >= old_pg_num); 2282 if (new_pg_num <= old_pg_num) 2283 return false; 2284 2285 for (n = 1; ; n++) { 2286 int next_bit = n << (old_bits - 1); 2287 u32 s = next_bit | pgid->seed; 2288 2289 if (s < old_pg_num || s == pgid->seed) 2290 continue; 2291 if (s >= new_pg_num) 2292 break; 2293 2294 s = ceph_stable_mod(s, old_pg_num, old_mask); 2295 if (s == pgid->seed) 2296 return true; 2297 } 2298 2299 return false; 2300 } 2301 2302 bool ceph_is_new_interval(const struct ceph_osds *old_acting, 2303 const struct ceph_osds *new_acting, 2304 const struct ceph_osds *old_up, 2305 const struct ceph_osds *new_up, 2306 int old_size, 2307 int new_size, 2308 int old_min_size, 2309 int new_min_size, 2310 u32 old_pg_num, 2311 u32 new_pg_num, 2312 bool old_sort_bitwise, 2313 bool new_sort_bitwise, 2314 bool old_recovery_deletes, 2315 bool new_recovery_deletes, 2316 const struct ceph_pg *pgid) 2317 { 2318 return !osds_equal(old_acting, new_acting) || 2319 !osds_equal(old_up, new_up) || 2320 old_size != new_size || 2321 old_min_size != new_min_size || 2322 ceph_pg_is_split(pgid, old_pg_num, new_pg_num) || 2323 old_sort_bitwise != new_sort_bitwise || 2324 old_recovery_deletes != new_recovery_deletes; 2325 } 2326 2327 static int calc_pg_rank(int osd, const struct ceph_osds *acting) 2328 { 2329 int i; 2330 2331 for (i = 0; i < acting->size; i++) { 2332 if (acting->osds[i] == osd) 2333 return i; 2334 } 2335 2336 return -1; 2337 } 2338 2339 static bool primary_changed(const struct ceph_osds *old_acting, 2340 const struct ceph_osds *new_acting) 2341 { 2342 if (!old_acting->size && !new_acting->size) 2343 return false; /* both still empty */ 2344 2345 if (!old_acting->size ^ !new_acting->size) 2346 return true; /* was empty, now not, or vice versa */ 2347 2348 if (old_acting->primary != new_acting->primary) 2349 return true; /* primary changed */ 2350 2351 if (calc_pg_rank(old_acting->primary, old_acting) != 2352 calc_pg_rank(new_acting->primary, new_acting)) 2353 return true; 2354 2355 return false; /* same primary (tho replicas may have changed) */ 2356 } 2357 2358 bool ceph_osds_changed(const struct ceph_osds *old_acting, 2359 const struct ceph_osds *new_acting, 2360 bool any_change) 2361 { 2362 if (primary_changed(old_acting, new_acting)) 2363 return true; 2364 2365 if (any_change && !__osds_equal(old_acting, new_acting)) 2366 return true; 2367 2368 return false; 2369 } 2370 2371 /* 2372 * Map an object into a PG. 2373 * 2374 * Should only be called with target_oid and target_oloc (as opposed to 2375 * base_oid and base_oloc), since tiering isn't taken into account. 2376 */ 2377 void __ceph_object_locator_to_pg(struct ceph_pg_pool_info *pi, 2378 const struct ceph_object_id *oid, 2379 const struct ceph_object_locator *oloc, 2380 struct ceph_pg *raw_pgid) 2381 { 2382 WARN_ON(pi->id != oloc->pool); 2383 2384 if (!oloc->pool_ns) { 2385 raw_pgid->pool = oloc->pool; 2386 raw_pgid->seed = ceph_str_hash(pi->object_hash, oid->name, 2387 oid->name_len); 2388 dout("%s %s -> raw_pgid %llu.%x\n", __func__, oid->name, 2389 raw_pgid->pool, raw_pgid->seed); 2390 } else { 2391 char stack_buf[256]; 2392 char *buf = stack_buf; 2393 int nsl = oloc->pool_ns->len; 2394 size_t total = nsl + 1 + oid->name_len; 2395 2396 if (total > sizeof(stack_buf)) 2397 buf = kmalloc(total, GFP_NOIO | __GFP_NOFAIL); 2398 memcpy(buf, oloc->pool_ns->str, nsl); 2399 buf[nsl] = '\037'; 2400 memcpy(buf + nsl + 1, oid->name, oid->name_len); 2401 raw_pgid->pool = oloc->pool; 2402 raw_pgid->seed = ceph_str_hash(pi->object_hash, buf, total); 2403 if (buf != stack_buf) 2404 kfree(buf); 2405 dout("%s %s ns %.*s -> raw_pgid %llu.%x\n", __func__, 2406 oid->name, nsl, oloc->pool_ns->str, 2407 raw_pgid->pool, raw_pgid->seed); 2408 } 2409 } 2410 2411 int ceph_object_locator_to_pg(struct ceph_osdmap *osdmap, 2412 const struct ceph_object_id *oid, 2413 const struct ceph_object_locator *oloc, 2414 struct ceph_pg *raw_pgid) 2415 { 2416 struct ceph_pg_pool_info *pi; 2417 2418 pi = ceph_pg_pool_by_id(osdmap, oloc->pool); 2419 if (!pi) 2420 return -ENOENT; 2421 2422 __ceph_object_locator_to_pg(pi, oid, oloc, raw_pgid); 2423 return 0; 2424 } 2425 EXPORT_SYMBOL(ceph_object_locator_to_pg); 2426 2427 /* 2428 * Map a raw PG (full precision ps) into an actual PG. 2429 */ 2430 static void raw_pg_to_pg(struct ceph_pg_pool_info *pi, 2431 const struct ceph_pg *raw_pgid, 2432 struct ceph_pg *pgid) 2433 { 2434 pgid->pool = raw_pgid->pool; 2435 pgid->seed = ceph_stable_mod(raw_pgid->seed, pi->pg_num, 2436 pi->pg_num_mask); 2437 } 2438 2439 /* 2440 * Map a raw PG (full precision ps) into a placement ps (placement 2441 * seed). Include pool id in that value so that different pools don't 2442 * use the same seeds. 2443 */ 2444 static u32 raw_pg_to_pps(struct ceph_pg_pool_info *pi, 2445 const struct ceph_pg *raw_pgid) 2446 { 2447 if (pi->flags & CEPH_POOL_FLAG_HASHPSPOOL) { 2448 /* hash pool id and seed so that pool PGs do not overlap */ 2449 return crush_hash32_2(CRUSH_HASH_RJENKINS1, 2450 ceph_stable_mod(raw_pgid->seed, 2451 pi->pgp_num, 2452 pi->pgp_num_mask), 2453 raw_pgid->pool); 2454 } else { 2455 /* 2456 * legacy behavior: add ps and pool together. this is 2457 * not a great approach because the PGs from each pool 2458 * will overlap on top of each other: 0.5 == 1.4 == 2459 * 2.3 == ... 2460 */ 2461 return ceph_stable_mod(raw_pgid->seed, pi->pgp_num, 2462 pi->pgp_num_mask) + 2463 (unsigned)raw_pgid->pool; 2464 } 2465 } 2466 2467 /* 2468 * Magic value used for a "default" fallback choose_args, used if the 2469 * crush_choose_arg_map passed to do_crush() does not exist. If this 2470 * also doesn't exist, fall back to canonical weights. 2471 */ 2472 #define CEPH_DEFAULT_CHOOSE_ARGS -1 2473 2474 static int do_crush(struct ceph_osdmap *map, int ruleno, int x, 2475 int *result, int result_max, 2476 const __u32 *weight, int weight_max, 2477 s64 choose_args_index) 2478 { 2479 struct crush_choose_arg_map *arg_map; 2480 struct crush_work *work; 2481 int r; 2482 2483 BUG_ON(result_max > CEPH_PG_MAX_SIZE); 2484 2485 arg_map = lookup_choose_arg_map(&map->crush->choose_args, 2486 choose_args_index); 2487 if (!arg_map) 2488 arg_map = lookup_choose_arg_map(&map->crush->choose_args, 2489 CEPH_DEFAULT_CHOOSE_ARGS); 2490 2491 work = get_workspace(&map->crush_wsm, map->crush); 2492 r = crush_do_rule(map->crush, ruleno, x, result, result_max, 2493 weight, weight_max, work, 2494 arg_map ? arg_map->args : NULL); 2495 put_workspace(&map->crush_wsm, work); 2496 return r; 2497 } 2498 2499 static void remove_nonexistent_osds(struct ceph_osdmap *osdmap, 2500 struct ceph_pg_pool_info *pi, 2501 struct ceph_osds *set) 2502 { 2503 int i; 2504 2505 if (ceph_can_shift_osds(pi)) { 2506 int removed = 0; 2507 2508 /* shift left */ 2509 for (i = 0; i < set->size; i++) { 2510 if (!ceph_osd_exists(osdmap, set->osds[i])) { 2511 removed++; 2512 continue; 2513 } 2514 if (removed) 2515 set->osds[i - removed] = set->osds[i]; 2516 } 2517 set->size -= removed; 2518 } else { 2519 /* set dne devices to NONE */ 2520 for (i = 0; i < set->size; i++) { 2521 if (!ceph_osd_exists(osdmap, set->osds[i])) 2522 set->osds[i] = CRUSH_ITEM_NONE; 2523 } 2524 } 2525 } 2526 2527 /* 2528 * Calculate raw set (CRUSH output) for given PG and filter out 2529 * nonexistent OSDs. ->primary is undefined for a raw set. 2530 * 2531 * Placement seed (CRUSH input) is returned through @ppps. 2532 */ 2533 static void pg_to_raw_osds(struct ceph_osdmap *osdmap, 2534 struct ceph_pg_pool_info *pi, 2535 const struct ceph_pg *raw_pgid, 2536 struct ceph_osds *raw, 2537 u32 *ppps) 2538 { 2539 u32 pps = raw_pg_to_pps(pi, raw_pgid); 2540 int ruleno; 2541 int len; 2542 2543 ceph_osds_init(raw); 2544 if (ppps) 2545 *ppps = pps; 2546 2547 ruleno = crush_find_rule(osdmap->crush, pi->crush_ruleset, pi->type, 2548 pi->size); 2549 if (ruleno < 0) { 2550 pr_err("no crush rule: pool %lld ruleset %d type %d size %d\n", 2551 pi->id, pi->crush_ruleset, pi->type, pi->size); 2552 return; 2553 } 2554 2555 if (pi->size > ARRAY_SIZE(raw->osds)) { 2556 pr_err_ratelimited("pool %lld ruleset %d type %d too wide: size %d > %zu\n", 2557 pi->id, pi->crush_ruleset, pi->type, pi->size, 2558 ARRAY_SIZE(raw->osds)); 2559 return; 2560 } 2561 2562 len = do_crush(osdmap, ruleno, pps, raw->osds, pi->size, 2563 osdmap->osd_weight, osdmap->max_osd, pi->id); 2564 if (len < 0) { 2565 pr_err("error %d from crush rule %d: pool %lld ruleset %d type %d size %d\n", 2566 len, ruleno, pi->id, pi->crush_ruleset, pi->type, 2567 pi->size); 2568 return; 2569 } 2570 2571 raw->size = len; 2572 remove_nonexistent_osds(osdmap, pi, raw); 2573 } 2574 2575 /* apply pg_upmap[_items] mappings */ 2576 static void apply_upmap(struct ceph_osdmap *osdmap, 2577 const struct ceph_pg *pgid, 2578 struct ceph_osds *raw) 2579 { 2580 struct ceph_pg_mapping *pg; 2581 int i, j; 2582 2583 pg = lookup_pg_mapping(&osdmap->pg_upmap, pgid); 2584 if (pg) { 2585 /* make sure targets aren't marked out */ 2586 for (i = 0; i < pg->pg_upmap.len; i++) { 2587 int osd = pg->pg_upmap.osds[i]; 2588 2589 if (osd != CRUSH_ITEM_NONE && 2590 osd < osdmap->max_osd && 2591 osdmap->osd_weight[osd] == 0) { 2592 /* reject/ignore explicit mapping */ 2593 return; 2594 } 2595 } 2596 for (i = 0; i < pg->pg_upmap.len; i++) 2597 raw->osds[i] = pg->pg_upmap.osds[i]; 2598 raw->size = pg->pg_upmap.len; 2599 /* check and apply pg_upmap_items, if any */ 2600 } 2601 2602 pg = lookup_pg_mapping(&osdmap->pg_upmap_items, pgid); 2603 if (pg) { 2604 /* 2605 * Note: this approach does not allow a bidirectional swap, 2606 * e.g., [[1,2],[2,1]] applied to [0,1,2] -> [0,2,1]. 2607 */ 2608 for (i = 0; i < pg->pg_upmap_items.len; i++) { 2609 int from = pg->pg_upmap_items.from_to[i][0]; 2610 int to = pg->pg_upmap_items.from_to[i][1]; 2611 int pos = -1; 2612 bool exists = false; 2613 2614 /* make sure replacement doesn't already appear */ 2615 for (j = 0; j < raw->size; j++) { 2616 int osd = raw->osds[j]; 2617 2618 if (osd == to) { 2619 exists = true; 2620 break; 2621 } 2622 /* ignore mapping if target is marked out */ 2623 if (osd == from && pos < 0 && 2624 !(to != CRUSH_ITEM_NONE && 2625 to < osdmap->max_osd && 2626 osdmap->osd_weight[to] == 0)) { 2627 pos = j; 2628 } 2629 } 2630 if (!exists && pos >= 0) 2631 raw->osds[pos] = to; 2632 } 2633 } 2634 } 2635 2636 /* 2637 * Given raw set, calculate up set and up primary. By definition of an 2638 * up set, the result won't contain nonexistent or down OSDs. 2639 * 2640 * This is done in-place - on return @set is the up set. If it's 2641 * empty, ->primary will remain undefined. 2642 */ 2643 static void raw_to_up_osds(struct ceph_osdmap *osdmap, 2644 struct ceph_pg_pool_info *pi, 2645 struct ceph_osds *set) 2646 { 2647 int i; 2648 2649 /* ->primary is undefined for a raw set */ 2650 BUG_ON(set->primary != -1); 2651 2652 if (ceph_can_shift_osds(pi)) { 2653 int removed = 0; 2654 2655 /* shift left */ 2656 for (i = 0; i < set->size; i++) { 2657 if (ceph_osd_is_down(osdmap, set->osds[i])) { 2658 removed++; 2659 continue; 2660 } 2661 if (removed) 2662 set->osds[i - removed] = set->osds[i]; 2663 } 2664 set->size -= removed; 2665 if (set->size > 0) 2666 set->primary = set->osds[0]; 2667 } else { 2668 /* set down/dne devices to NONE */ 2669 for (i = set->size - 1; i >= 0; i--) { 2670 if (ceph_osd_is_down(osdmap, set->osds[i])) 2671 set->osds[i] = CRUSH_ITEM_NONE; 2672 else 2673 set->primary = set->osds[i]; 2674 } 2675 } 2676 } 2677 2678 static void apply_primary_affinity(struct ceph_osdmap *osdmap, 2679 struct ceph_pg_pool_info *pi, 2680 u32 pps, 2681 struct ceph_osds *up) 2682 { 2683 int i; 2684 int pos = -1; 2685 2686 /* 2687 * Do we have any non-default primary_affinity values for these 2688 * osds? 2689 */ 2690 if (!osdmap->osd_primary_affinity) 2691 return; 2692 2693 for (i = 0; i < up->size; i++) { 2694 int osd = up->osds[i]; 2695 2696 if (osd != CRUSH_ITEM_NONE && 2697 osdmap->osd_primary_affinity[osd] != 2698 CEPH_OSD_DEFAULT_PRIMARY_AFFINITY) { 2699 break; 2700 } 2701 } 2702 if (i == up->size) 2703 return; 2704 2705 /* 2706 * Pick the primary. Feed both the seed (for the pg) and the 2707 * osd into the hash/rng so that a proportional fraction of an 2708 * osd's pgs get rejected as primary. 2709 */ 2710 for (i = 0; i < up->size; i++) { 2711 int osd = up->osds[i]; 2712 u32 aff; 2713 2714 if (osd == CRUSH_ITEM_NONE) 2715 continue; 2716 2717 aff = osdmap->osd_primary_affinity[osd]; 2718 if (aff < CEPH_OSD_MAX_PRIMARY_AFFINITY && 2719 (crush_hash32_2(CRUSH_HASH_RJENKINS1, 2720 pps, osd) >> 16) >= aff) { 2721 /* 2722 * We chose not to use this primary. Note it 2723 * anyway as a fallback in case we don't pick 2724 * anyone else, but keep looking. 2725 */ 2726 if (pos < 0) 2727 pos = i; 2728 } else { 2729 pos = i; 2730 break; 2731 } 2732 } 2733 if (pos < 0) 2734 return; 2735 2736 up->primary = up->osds[pos]; 2737 2738 if (ceph_can_shift_osds(pi) && pos > 0) { 2739 /* move the new primary to the front */ 2740 for (i = pos; i > 0; i--) 2741 up->osds[i] = up->osds[i - 1]; 2742 up->osds[0] = up->primary; 2743 } 2744 } 2745 2746 /* 2747 * Get pg_temp and primary_temp mappings for given PG. 2748 * 2749 * Note that a PG may have none, only pg_temp, only primary_temp or 2750 * both pg_temp and primary_temp mappings. This means @temp isn't 2751 * always a valid OSD set on return: in the "only primary_temp" case, 2752 * @temp will have its ->primary >= 0 but ->size == 0. 2753 */ 2754 static void get_temp_osds(struct ceph_osdmap *osdmap, 2755 struct ceph_pg_pool_info *pi, 2756 const struct ceph_pg *pgid, 2757 struct ceph_osds *temp) 2758 { 2759 struct ceph_pg_mapping *pg; 2760 int i; 2761 2762 ceph_osds_init(temp); 2763 2764 /* pg_temp? */ 2765 pg = lookup_pg_mapping(&osdmap->pg_temp, pgid); 2766 if (pg) { 2767 for (i = 0; i < pg->pg_temp.len; i++) { 2768 if (ceph_osd_is_down(osdmap, pg->pg_temp.osds[i])) { 2769 if (ceph_can_shift_osds(pi)) 2770 continue; 2771 2772 temp->osds[temp->size++] = CRUSH_ITEM_NONE; 2773 } else { 2774 temp->osds[temp->size++] = pg->pg_temp.osds[i]; 2775 } 2776 } 2777 2778 /* apply pg_temp's primary */ 2779 for (i = 0; i < temp->size; i++) { 2780 if (temp->osds[i] != CRUSH_ITEM_NONE) { 2781 temp->primary = temp->osds[i]; 2782 break; 2783 } 2784 } 2785 } 2786 2787 /* primary_temp? */ 2788 pg = lookup_pg_mapping(&osdmap->primary_temp, pgid); 2789 if (pg) 2790 temp->primary = pg->primary_temp.osd; 2791 } 2792 2793 /* 2794 * Map a PG to its acting set as well as its up set. 2795 * 2796 * Acting set is used for data mapping purposes, while up set can be 2797 * recorded for detecting interval changes and deciding whether to 2798 * resend a request. 2799 */ 2800 void ceph_pg_to_up_acting_osds(struct ceph_osdmap *osdmap, 2801 struct ceph_pg_pool_info *pi, 2802 const struct ceph_pg *raw_pgid, 2803 struct ceph_osds *up, 2804 struct ceph_osds *acting) 2805 { 2806 struct ceph_pg pgid; 2807 u32 pps; 2808 2809 WARN_ON(pi->id != raw_pgid->pool); 2810 raw_pg_to_pg(pi, raw_pgid, &pgid); 2811 2812 pg_to_raw_osds(osdmap, pi, raw_pgid, up, &pps); 2813 apply_upmap(osdmap, &pgid, up); 2814 raw_to_up_osds(osdmap, pi, up); 2815 apply_primary_affinity(osdmap, pi, pps, up); 2816 get_temp_osds(osdmap, pi, &pgid, acting); 2817 if (!acting->size) { 2818 memcpy(acting->osds, up->osds, up->size * sizeof(up->osds[0])); 2819 acting->size = up->size; 2820 if (acting->primary == -1) 2821 acting->primary = up->primary; 2822 } 2823 WARN_ON(!osds_valid(up) || !osds_valid(acting)); 2824 } 2825 2826 bool ceph_pg_to_primary_shard(struct ceph_osdmap *osdmap, 2827 struct ceph_pg_pool_info *pi, 2828 const struct ceph_pg *raw_pgid, 2829 struct ceph_spg *spgid) 2830 { 2831 struct ceph_pg pgid; 2832 struct ceph_osds up, acting; 2833 int i; 2834 2835 WARN_ON(pi->id != raw_pgid->pool); 2836 raw_pg_to_pg(pi, raw_pgid, &pgid); 2837 2838 if (ceph_can_shift_osds(pi)) { 2839 spgid->pgid = pgid; /* struct */ 2840 spgid->shard = CEPH_SPG_NOSHARD; 2841 return true; 2842 } 2843 2844 ceph_pg_to_up_acting_osds(osdmap, pi, &pgid, &up, &acting); 2845 for (i = 0; i < acting.size; i++) { 2846 if (acting.osds[i] == acting.primary) { 2847 spgid->pgid = pgid; /* struct */ 2848 spgid->shard = i; 2849 return true; 2850 } 2851 } 2852 2853 return false; 2854 } 2855 2856 /* 2857 * Return acting primary for given PG, or -1 if none. 2858 */ 2859 int ceph_pg_to_acting_primary(struct ceph_osdmap *osdmap, 2860 const struct ceph_pg *raw_pgid) 2861 { 2862 struct ceph_pg_pool_info *pi; 2863 struct ceph_osds up, acting; 2864 2865 pi = ceph_pg_pool_by_id(osdmap, raw_pgid->pool); 2866 if (!pi) 2867 return -1; 2868 2869 ceph_pg_to_up_acting_osds(osdmap, pi, raw_pgid, &up, &acting); 2870 return acting.primary; 2871 } 2872 EXPORT_SYMBOL(ceph_pg_to_acting_primary); 2873 2874 static struct crush_loc_node *alloc_crush_loc(size_t type_name_len, 2875 size_t name_len) 2876 { 2877 struct crush_loc_node *loc; 2878 2879 loc = kmalloc(sizeof(*loc) + type_name_len + name_len + 2, GFP_NOIO); 2880 if (!loc) 2881 return NULL; 2882 2883 RB_CLEAR_NODE(&loc->cl_node); 2884 return loc; 2885 } 2886 2887 static void free_crush_loc(struct crush_loc_node *loc) 2888 { 2889 WARN_ON(!RB_EMPTY_NODE(&loc->cl_node)); 2890 2891 kfree(loc); 2892 } 2893 2894 static int crush_loc_compare(const struct crush_loc *loc1, 2895 const struct crush_loc *loc2) 2896 { 2897 return strcmp(loc1->cl_type_name, loc2->cl_type_name) ?: 2898 strcmp(loc1->cl_name, loc2->cl_name); 2899 } 2900 2901 DEFINE_RB_FUNCS2(crush_loc, struct crush_loc_node, cl_loc, crush_loc_compare, 2902 RB_BYPTR, const struct crush_loc *, cl_node) 2903 2904 /* 2905 * Parses a set of <bucket type name>':'<bucket name> pairs separated 2906 * by '|', e.g. "rack:foo1|rack:foo2|datacenter:bar". 2907 * 2908 * Note that @crush_location is modified by strsep(). 2909 */ 2910 int ceph_parse_crush_location(char *crush_location, struct rb_root *locs) 2911 { 2912 struct crush_loc_node *loc; 2913 const char *type_name, *name, *colon; 2914 size_t type_name_len, name_len; 2915 2916 dout("%s '%s'\n", __func__, crush_location); 2917 while ((type_name = strsep(&crush_location, "|"))) { 2918 colon = strchr(type_name, ':'); 2919 if (!colon) 2920 return -EINVAL; 2921 2922 type_name_len = colon - type_name; 2923 if (type_name_len == 0) 2924 return -EINVAL; 2925 2926 name = colon + 1; 2927 name_len = strlen(name); 2928 if (name_len == 0) 2929 return -EINVAL; 2930 2931 loc = alloc_crush_loc(type_name_len, name_len); 2932 if (!loc) 2933 return -ENOMEM; 2934 2935 loc->cl_loc.cl_type_name = loc->cl_data; 2936 memcpy(loc->cl_loc.cl_type_name, type_name, type_name_len); 2937 loc->cl_loc.cl_type_name[type_name_len] = '\0'; 2938 2939 loc->cl_loc.cl_name = loc->cl_data + type_name_len + 1; 2940 memcpy(loc->cl_loc.cl_name, name, name_len); 2941 loc->cl_loc.cl_name[name_len] = '\0'; 2942 2943 if (!__insert_crush_loc(locs, loc)) { 2944 free_crush_loc(loc); 2945 return -EEXIST; 2946 } 2947 2948 dout("%s type_name '%s' name '%s'\n", __func__, 2949 loc->cl_loc.cl_type_name, loc->cl_loc.cl_name); 2950 } 2951 2952 return 0; 2953 } 2954 2955 int ceph_compare_crush_locs(struct rb_root *locs1, struct rb_root *locs2) 2956 { 2957 struct rb_node *n1 = rb_first(locs1); 2958 struct rb_node *n2 = rb_first(locs2); 2959 int ret; 2960 2961 for ( ; n1 && n2; n1 = rb_next(n1), n2 = rb_next(n2)) { 2962 struct crush_loc_node *loc1 = 2963 rb_entry(n1, struct crush_loc_node, cl_node); 2964 struct crush_loc_node *loc2 = 2965 rb_entry(n2, struct crush_loc_node, cl_node); 2966 2967 ret = crush_loc_compare(&loc1->cl_loc, &loc2->cl_loc); 2968 if (ret) 2969 return ret; 2970 } 2971 2972 if (!n1 && n2) 2973 return -1; 2974 if (n1 && !n2) 2975 return 1; 2976 return 0; 2977 } 2978 2979 void ceph_clear_crush_locs(struct rb_root *locs) 2980 { 2981 while (!RB_EMPTY_ROOT(locs)) { 2982 struct crush_loc_node *loc = 2983 rb_entry(rb_first(locs), struct crush_loc_node, cl_node); 2984 2985 erase_crush_loc(locs, loc); 2986 free_crush_loc(loc); 2987 } 2988 } 2989 2990 /* 2991 * [a-zA-Z0-9-_.]+ 2992 */ 2993 static bool is_valid_crush_name(const char *name) 2994 { 2995 do { 2996 if (!('a' <= *name && *name <= 'z') && 2997 !('A' <= *name && *name <= 'Z') && 2998 !('0' <= *name && *name <= '9') && 2999 *name != '-' && *name != '_' && *name != '.') 3000 return false; 3001 } while (*++name != '\0'); 3002 3003 return true; 3004 } 3005 3006 /* 3007 * Gets the parent of an item. Returns its id (<0 because the 3008 * parent is always a bucket), type id (>0 for the same reason, 3009 * via @parent_type_id) and location (via @parent_loc). If no 3010 * parent, returns 0. 3011 * 3012 * Does a linear search, as there are no parent pointers of any 3013 * kind. Note that the result is ambigous for items that occur 3014 * multiple times in the map. 3015 */ 3016 static int get_immediate_parent(struct crush_map *c, int id, 3017 u16 *parent_type_id, 3018 struct crush_loc *parent_loc) 3019 { 3020 struct crush_bucket *b; 3021 struct crush_name_node *type_cn, *cn; 3022 int i, j; 3023 3024 for (i = 0; i < c->max_buckets; i++) { 3025 b = c->buckets[i]; 3026 if (!b) 3027 continue; 3028 3029 /* ignore per-class shadow hierarchy */ 3030 cn = lookup_crush_name(&c->names, b->id); 3031 if (!cn || !is_valid_crush_name(cn->cn_name)) 3032 continue; 3033 3034 for (j = 0; j < b->size; j++) { 3035 if (b->items[j] != id) 3036 continue; 3037 3038 *parent_type_id = b->type; 3039 type_cn = lookup_crush_name(&c->type_names, b->type); 3040 parent_loc->cl_type_name = type_cn->cn_name; 3041 parent_loc->cl_name = cn->cn_name; 3042 return b->id; 3043 } 3044 } 3045 3046 return 0; /* no parent */ 3047 } 3048 3049 /* 3050 * Calculates the locality/distance from an item to a client 3051 * location expressed in terms of CRUSH hierarchy as a set of 3052 * (bucket type name, bucket name) pairs. Specifically, looks 3053 * for the lowest-valued bucket type for which the location of 3054 * @id matches one of the locations in @locs, so for standard 3055 * bucket types (host = 1, rack = 3, datacenter = 8, zone = 9) 3056 * a matching host is closer than a matching rack and a matching 3057 * data center is closer than a matching zone. 3058 * 3059 * Specifying multiple locations (a "multipath" location) such 3060 * as "rack=foo1 rack=foo2 datacenter=bar" is allowed -- @locs 3061 * is a multimap. The locality will be: 3062 * 3063 * - 3 for OSDs in racks foo1 and foo2 3064 * - 8 for OSDs in data center bar 3065 * - -1 for all other OSDs 3066 * 3067 * The lowest possible bucket type is 1, so the best locality 3068 * for an OSD is 1 (i.e. a matching host). Locality 0 would be 3069 * the OSD itself. 3070 */ 3071 int ceph_get_crush_locality(struct ceph_osdmap *osdmap, int id, 3072 struct rb_root *locs) 3073 { 3074 struct crush_loc loc; 3075 u16 type_id; 3076 3077 /* 3078 * Instead of repeated get_immediate_parent() calls, 3079 * the location of @id could be obtained with a single 3080 * depth-first traversal. 3081 */ 3082 for (;;) { 3083 id = get_immediate_parent(osdmap->crush, id, &type_id, &loc); 3084 if (id >= 0) 3085 return -1; /* not local */ 3086 3087 if (lookup_crush_loc(locs, &loc)) 3088 return type_id; 3089 } 3090 } 3091