1 // SPDX-License-Identifier: GPL-2.0
2
3 #include <linux/ceph/ceph_debug.h>
4
5 #include <linux/module.h>
6 #include <linux/slab.h>
7
8 #include <linux/ceph/libceph.h>
9 #include <linux/ceph/osdmap.h>
10 #include <linux/ceph/decode.h>
11 #include <linux/crush/hash.h>
12 #include <linux/crush/mapper.h>
13
/*
 * Log an informational message prefixed with the module name, the map's
 * fsid and epoch.  Uses %pV so the caller's format/args are expanded by
 * a single printk (keeps the line atomic in the log).
 */
static __printf(2, 3)
void osdmap_info(const struct ceph_osdmap *map, const char *fmt, ...)
{
	struct va_format vaf;
	va_list args;

	va_start(args, fmt);
	vaf.fmt = fmt;
	vaf.va = &args;

	printk(KERN_INFO "%s (%pU e%u): %pV", KBUILD_MODNAME, &map->fsid,
	       map->epoch, &vaf);

	va_end(args);
}
29
/*
 * Render an OSD state bitmask as a human-readable string in @str
 * (at most @len bytes, always NUL-terminated when len > 0).
 * Returns @str for convenient use in printk-style callers.
 */
char *ceph_osdmap_state_str(char *str, int len, u32 state)
{
	const char *desc;

	if (!len)
		return str;

	if ((state & CEPH_OSD_EXISTS) && (state & CEPH_OSD_UP))
		desc = "exists, up";
	else if (state & CEPH_OSD_EXISTS)
		desc = "exists";
	else if (state & CEPH_OSD_UP)
		desc = "up";
	else
		desc = "doesn't exist";

	snprintf(str, len, "%s", desc);
	return str;
}
46
47 /* maps */
48
/*
 * Return the number of significant bits in @t, i.e. the position of
 * the highest set bit plus one.  calc_bits_of(0) == 0.
 */
static int calc_bits_of(unsigned int t)
{
	int bits;

	for (bits = 0; t != 0; bits++)
		t >>= 1;
	return bits;
}
58
59 /*
60 * the foo_mask is the smallest value 2^n-1 that is >= foo.
61 */
/*
 * Precompute pg_num_mask and pgp_num_mask for the pool: the smallest
 * value of the form 2^n - 1 that is >= pg_num - 1 (resp. pgp_num - 1).
 * NOTE(review): assumes pg_num/pgp_num are non-zero -- pg_num == 0
 * would make calc_bits_of(0xffffffff) return 32 and the shift below
 * undefined; confirm the decoder guarantees this.
 */
static void calc_pg_masks(struct ceph_pg_pool_info *pi)
{
	pi->pg_num_mask = (1 << calc_bits_of(pi->pg_num-1)) - 1;
	pi->pgp_num_mask = (1 << calc_bits_of(pi->pgp_num-1)) - 1;
}
67
68 /*
69 * decode crush map
70 */
/*
 * Decode the type-specific payload of a "uniform" bucket: a single
 * per-item weight.  ceph_decode_need() jumps to the "bad" label when
 * fewer bytes than requested remain.  Returns 0 or -EINVAL.
 */
static int crush_decode_uniform_bucket(void **p, void *end,
				       struct crush_bucket_uniform *b)
{
	dout("crush_decode_uniform_bucket %p to %p\n", *p, end);
	/* (1+size) words are checked here but only one is consumed */
	ceph_decode_need(p, end, (1+b->h.size) * sizeof(u32), bad);
	b->item_weight = ceph_decode_32(p);
	return 0;
bad:
	return -EINVAL;
}
81
/*
 * Decode the payload of a "list" bucket: per-item weights and running
 * sums.  On -ENOMEM, arrays already attached to @b are freed by the
 * caller via crush_destroy().  Returns 0, -ENOMEM or -EINVAL.
 */
static int crush_decode_list_bucket(void **p, void *end,
				    struct crush_bucket_list *b)
{
	int j;
	dout("crush_decode_list_bucket %p to %p\n", *p, end);
	b->item_weights = kcalloc(b->h.size, sizeof(u32), GFP_NOFS);
	if (b->item_weights == NULL)
		return -ENOMEM;
	b->sum_weights = kcalloc(b->h.size, sizeof(u32), GFP_NOFS);
	if (b->sum_weights == NULL)
		return -ENOMEM;
	/* two u32s (weight, sum) per item */
	ceph_decode_need(p, end, 2 * b->h.size * sizeof(u32), bad);
	for (j = 0; j < b->h.size; j++) {
		b->item_weights[j] = ceph_decode_32(p);
		b->sum_weights[j] = ceph_decode_32(p);
	}
	return 0;
bad:
	return -EINVAL;
}
102
/*
 * Decode the payload of a "tree" bucket: node count followed by one
 * weight per node.  Returns 0, -ENOMEM or -EINVAL.
 */
static int crush_decode_tree_bucket(void **p, void *end,
				    struct crush_bucket_tree *b)
{
	int j;
	dout("crush_decode_tree_bucket %p to %p\n", *p, end);
	ceph_decode_8_safe(p, end, b->num_nodes, bad);
	b->node_weights = kcalloc(b->num_nodes, sizeof(u32), GFP_NOFS);
	if (b->node_weights == NULL)
		return -ENOMEM;
	ceph_decode_need(p, end, b->num_nodes * sizeof(u32), bad);
	for (j = 0; j < b->num_nodes; j++)
		b->node_weights[j] = ceph_decode_32(p);
	return 0;
bad:
	return -EINVAL;
}
119
/*
 * Decode the payload of a "straw" bucket: per-item weights and
 * precomputed straw lengths.  Returns 0, -ENOMEM or -EINVAL.
 */
static int crush_decode_straw_bucket(void **p, void *end,
				     struct crush_bucket_straw *b)
{
	int j;
	dout("crush_decode_straw_bucket %p to %p\n", *p, end);
	b->item_weights = kcalloc(b->h.size, sizeof(u32), GFP_NOFS);
	if (b->item_weights == NULL)
		return -ENOMEM;
	b->straws = kcalloc(b->h.size, sizeof(u32), GFP_NOFS);
	if (b->straws == NULL)
		return -ENOMEM;
	/* two u32s (weight, straw) per item */
	ceph_decode_need(p, end, 2 * b->h.size * sizeof(u32), bad);
	for (j = 0; j < b->h.size; j++) {
		b->item_weights[j] = ceph_decode_32(p);
		b->straws[j] = ceph_decode_32(p);
	}
	return 0;
bad:
	return -EINVAL;
}
140
/*
 * Decode the payload of a "straw2" bucket: one weight per item (straw2
 * derives straw lengths at mapping time, so none are stored).
 * Returns 0, -ENOMEM or -EINVAL.
 */
static int crush_decode_straw2_bucket(void **p, void *end,
				      struct crush_bucket_straw2 *b)
{
	int j;
	dout("crush_decode_straw2_bucket %p to %p\n", *p, end);
	b->item_weights = kcalloc(b->h.size, sizeof(u32), GFP_NOFS);
	if (b->item_weights == NULL)
		return -ENOMEM;
	ceph_decode_need(p, end, b->h.size * sizeof(u32), bad);
	for (j = 0; j < b->h.size; j++)
		b->item_weights[j] = ceph_decode_32(p);
	return 0;
bad:
	return -EINVAL;
}
156
/*
 * rbtree node mapping a crush id (bucket/type id) to its name.
 * cn_name is a flexible array member; the node is allocated with
 * room for the name plus a NUL terminator (see alloc_crush_name()).
 */
struct crush_name_node {
	struct rb_node cn_node;	/* linkage in c->names / c->type_names */
	int cn_id;		/* lookup key */
	char cn_name[];		/* NUL-terminated name */
};
162
/*
 * Allocate a crush_name_node with room for a name of @name_len bytes
 * plus the NUL terminator.  The rb_node is cleared so the free path
 * can assert the node was unlinked.  Returns NULL on allocation
 * failure; caller fills in cn_id and cn_name.
 */
static struct crush_name_node *alloc_crush_name(size_t name_len)
{
	struct crush_name_node *cn;

	cn = kmalloc(sizeof(*cn) + name_len + 1, GFP_NOIO);
	if (!cn)
		return NULL;

	RB_CLEAR_NODE(&cn->cn_node);
	return cn;
}
174
/*
 * Free a crush_name_node.  The node must already have been removed
 * from its tree (warns otherwise).
 */
static void free_crush_name(struct crush_name_node *cn)
{
	WARN_ON(!RB_EMPTY_NODE(&cn->cn_node));

	kfree(cn);
}
181
/* generate {insert,lookup,erase}_crush_name() keyed by cn_id */
DEFINE_RB_FUNCS(crush_name, struct crush_name_node, cn_id, cn_node)
183
/*
 * Decode an encoded map of id -> name into @root (used for both the
 * type-name and item-name trees).  Each entry is copied into a freshly
 * allocated, NUL-terminated crush_name_node.  Returns 0, or -ENOMEM,
 * -EEXIST (duplicate id), -EINVAL (truncated input).  Partially built
 * trees are left in @root for the caller to clean up.
 */
static int decode_crush_names(void **p, void *end, struct rb_root *root)
{
	u32 n;

	ceph_decode_32_safe(p, end, n, e_inval);
	while (n--) {
		struct crush_name_node *cn;
		int id;
		u32 name_len;

		ceph_decode_32_safe(p, end, id, e_inval);
		ceph_decode_32_safe(p, end, name_len, e_inval);
		/* validate before allocating based on name_len */
		ceph_decode_need(p, end, name_len, e_inval);

		cn = alloc_crush_name(name_len);
		if (!cn)
			return -ENOMEM;

		cn->cn_id = id;
		memcpy(cn->cn_name, *p, name_len);
		cn->cn_name[name_len] = '\0';
		*p += name_len;

		if (!__insert_crush_name(root, cn)) {
			free_crush_name(cn);
			return -EEXIST;
		}
	}

	return 0;

e_inval:
	return -EINVAL;
}
218
clear_crush_names(struct rb_root * root)219 void clear_crush_names(struct rb_root *root)
220 {
221 while (!RB_EMPTY_ROOT(root)) {
222 struct crush_name_node *cn =
223 rb_entry(rb_first(root), struct crush_name_node, cn_node);
224
225 erase_crush_name(root, cn);
226 free_crush_name(cn);
227 }
228 }
229
/*
 * Allocate a zeroed crush_choose_arg_map with its rb_node cleared so
 * free_choose_arg_map() can assert it was unlinked.  Returns NULL on
 * allocation failure.
 */
static struct crush_choose_arg_map *alloc_choose_arg_map(void)
{
	struct crush_choose_arg_map *arg_map;

	arg_map = kzalloc_obj(*arg_map, GFP_NOIO);
	if (!arg_map)
		return NULL;

	RB_CLEAR_NODE(&arg_map->node);
	return arg_map;
}
241
/*
 * Free a choose_arg_map and everything hanging off it: per-bucket
 * weight sets (and each set's weights array) and id arrays.  Safe to
 * call with NULL or with a partially decoded map (args entries are
 * zero-initialized, so kfree(NULL) is harmless).  The map must not be
 * linked in a tree (warns otherwise).
 */
static void free_choose_arg_map(struct crush_choose_arg_map *arg_map)
{
	int i, j;

	if (!arg_map)
		return;

	WARN_ON(!RB_EMPTY_NODE(&arg_map->node));

	if (arg_map->args) {
		for (i = 0; i < arg_map->size; i++) {
			struct crush_choose_arg *arg = &arg_map->args[i];
			if (arg->weight_set) {
				for (j = 0; j < arg->weight_set_size; j++)
					kfree(arg->weight_set[j].weights);
				kfree(arg->weight_set);
			}
			kfree(arg->ids);
		}
		kfree(arg_map->args);
	}
	kfree(arg_map);
}
265
/* generate {insert,lookup,erase}_choose_arg_map() keyed by choose_args_index */
DEFINE_RB_FUNCS(choose_arg_map, struct crush_choose_arg_map, choose_args_index,
		node);
268
clear_choose_args(struct crush_map * c)269 void clear_choose_args(struct crush_map *c)
270 {
271 while (!RB_EMPTY_ROOT(&c->choose_args)) {
272 struct crush_choose_arg_map *arg_map =
273 rb_entry(rb_first(&c->choose_args),
274 struct crush_choose_arg_map, node);
275
276 erase_choose_arg_map(&c->choose_args, arg_map);
277 free_choose_arg_map(arg_map);
278 }
279 }
280
/*
 * Decode a length-prefixed array of u32s into a freshly allocated
 * buffer.  Stores the element count in *plen (0 with a NULL return
 * value is valid for an empty array).  Returns the array or an
 * ERR_PTR (-ENOMEM / -EINVAL); any partial allocation is freed on
 * the error path.
 */
static u32 *decode_array_32_alloc(void **p, void *end, u32 *plen)
{
	u32 *a = NULL;
	u32 len;
	int ret;

	ceph_decode_32_safe(p, end, len, e_inval);
	if (len) {
		u32 i;

		/* kmalloc_array checks len * sizeof(u32) for overflow */
		a = kmalloc_array(len, sizeof(u32), GFP_NOIO);
		if (!a) {
			ret = -ENOMEM;
			goto fail;
		}

		ceph_decode_need(p, end, len * sizeof(u32), e_inval);
		for (i = 0; i < len; i++)
			a[i] = ceph_decode_32(p);
	}

	*plen = len;
	return a;

e_inval:
	ret = -EINVAL;
fail:
	kfree(a);	/* kfree(NULL) is a no-op */
	return ERR_PTR(ret);
}
311
312 /*
313 * Assumes @arg is zero-initialized.
314 */
/*
 * Decode one crush_choose_arg: an optional array of weight sets
 * followed by an optional array of item ids.
 *
 * Assumes @arg is zero-initialized.  On error, whatever was attached
 * to @arg is left in place for free_choose_arg_map() to release --
 * pointers that held an ERR_PTR are reset to NULL first so they are
 * not passed to kfree().  Returns 0 or a negative errno.
 */
static int decode_choose_arg(void **p, void *end, struct crush_choose_arg *arg)
{
	int ret;

	ceph_decode_32_safe(p, end, arg->weight_set_size, e_inval);
	if (arg->weight_set_size) {
		u32 i;

		arg->weight_set = kmalloc_objs(*arg->weight_set,
					       arg->weight_set_size, GFP_NOIO);
		if (!arg->weight_set)
			return -ENOMEM;

		for (i = 0; i < arg->weight_set_size; i++) {
			struct crush_weight_set *w = &arg->weight_set[i];

			w->weights = decode_array_32_alloc(p, end, &w->size);
			if (IS_ERR(w->weights)) {
				ret = PTR_ERR(w->weights);
				w->weights = NULL;	/* don't kfree ERR_PTR */
				return ret;
			}
		}
	}

	arg->ids = decode_array_32_alloc(p, end, &arg->ids_size);
	if (IS_ERR(arg->ids)) {
		ret = PTR_ERR(arg->ids);
		arg->ids = NULL;	/* don't kfree ERR_PTR */
		return ret;
	}

	return 0;

e_inval:
	return -EINVAL;
}
352
/*
 * Decode the crush map's choose_args section: a map of
 * choose_args_index -> per-bucket crush_choose_arg, inserted into
 * c->choose_args.  Ownership of each arg_map passes to the tree on
 * insert; on any failure the map currently being built (not yet
 * inserted) is freed, while already-inserted maps are cleaned up by
 * the caller via crush_destroy().  Returns 0 or a negative errno.
 */
static int decode_choose_args(void **p, void *end, struct crush_map *c)
{
	struct crush_choose_arg_map *arg_map = NULL;
	u32 num_choose_arg_maps, num_buckets;
	int ret;

	ceph_decode_32_safe(p, end, num_choose_arg_maps, e_inval);
	while (num_choose_arg_maps--) {
		arg_map = alloc_choose_arg_map();
		if (!arg_map) {
			ret = -ENOMEM;
			goto fail;
		}

		ceph_decode_64_safe(p, end, arg_map->choose_args_index,
				    e_inval);
		/* one (possibly empty) slot per crush bucket */
		arg_map->size = c->max_buckets;
		arg_map->args = kzalloc_objs(*arg_map->args, arg_map->size,
					     GFP_NOIO);
		if (!arg_map->args) {
			ret = -ENOMEM;
			goto fail;
		}

		ceph_decode_32_safe(p, end, num_buckets, e_inval);
		while (num_buckets--) {
			struct crush_choose_arg *arg;
			u32 bucket_index;

			ceph_decode_32_safe(p, end, bucket_index, e_inval);
			if (bucket_index >= arg_map->size)
				goto e_inval;

			arg = &arg_map->args[bucket_index];
			ret = decode_choose_arg(p, end, arg);
			if (ret)
				goto fail;

			/* ids, if present, must cover the whole bucket */
			if (arg->ids_size &&
			    arg->ids_size != c->buckets[bucket_index]->size)
				goto e_inval;
		}

		insert_choose_arg_map(&c->choose_args, arg_map);
	}

	return 0;

e_inval:
	ret = -EINVAL;
fail:
	free_choose_arg_map(arg_map);
	return ret;
}
407
/*
 * Compute c->working_size: the number of bytes of scratch space a
 * single CRUSH mapping computation needs for this map (header, one
 * pointer and one crush_work_bucket per bucket, plus each bucket's
 * permutation array).  Must be called once decoding is complete.
 */
static void crush_finalize(struct crush_map *c)
{
	__s32 b;

	/* Space for the array of pointers to per-bucket workspace */
	c->working_size = sizeof(struct crush_work) +
	    c->max_buckets * sizeof(struct crush_work_bucket *);

	for (b = 0; b < c->max_buckets; b++) {
		if (!c->buckets[b])
			continue;

		switch (c->buckets[b]->alg) {
		default:
			/*
			 * The base case, permutation variables and
			 * the pointer to the permutation array.
			 */
			c->working_size += sizeof(struct crush_work_bucket);
			break;
		}
		/* Every bucket has a permutation array. */
		c->working_size += c->buckets[b]->size * sizeof(__u32);
	}
}
433
/*
 * Decode a full CRUSH map from the buffer [pbyval, end): magic and
 * limits, buckets, rules, name maps and (encoding permitting) the
 * tunables, class maps and choose_args sections.  The tunables and
 * later sections are optional -- running out of data there jumps to
 * "done" and keeps the defaults rather than failing, which preserves
 * compatibility with older encodings.
 *
 * Returns a fully initialized crush_map or an ERR_PTR; on failure
 * everything allocated so far is released via crush_destroy().
 */
static struct crush_map *crush_decode(void *pbyval, void *end)
{
	struct crush_map *c;
	int err;
	int i, j;
	void **p = &pbyval;	/* decode cursor, advanced by helpers */
	void *start = pbyval;	/* kept only for offset debug output */
	u32 magic;

	dout("crush_decode %p to %p len %d\n", *p, end, (int)(end - *p));

	c = kzalloc_obj(*c, GFP_NOFS);
	if (c == NULL)
		return ERR_PTR(-ENOMEM);

	c->type_names = RB_ROOT;
	c->names = RB_ROOT;
	c->choose_args = RB_ROOT;

	/* set tunables to default values */
	c->choose_local_tries = 2;
	c->choose_local_fallback_tries = 5;
	c->choose_total_tries = 19;
	c->chooseleaf_descend_once = 0;

	ceph_decode_need(p, end, 4*sizeof(u32), bad);
	magic = ceph_decode_32(p);
	if (magic != CRUSH_MAGIC) {
		pr_err("crush_decode magic %x != current %x\n",
		       (unsigned int)magic, (unsigned int)CRUSH_MAGIC);
		goto bad;
	}
	c->max_buckets = ceph_decode_32(p);
	c->max_rules = ceph_decode_32(p);
	c->max_devices = ceph_decode_32(p);

	c->buckets = kzalloc_objs(*c->buckets, c->max_buckets, GFP_NOFS);
	if (c->buckets == NULL)
		goto badmem;
	c->rules = kzalloc_objs(*c->rules, c->max_rules, GFP_NOFS);
	if (c->rules == NULL)
		goto badmem;

	/* buckets */
	for (i = 0; i < c->max_buckets; i++) {
		int size = 0;
		u32 alg;
		struct crush_bucket *b;

		ceph_decode_32_safe(p, end, alg, bad);
		if (alg == 0) {
			/* alg 0 marks an unused bucket slot */
			c->buckets[i] = NULL;
			continue;
		}
		dout("crush_decode bucket %d off %x %p to %p\n",
		     i, (int)(*p-start), *p, end);

		switch (alg) {
		case CRUSH_BUCKET_UNIFORM:
			size = sizeof(struct crush_bucket_uniform);
			break;
		case CRUSH_BUCKET_LIST:
			size = sizeof(struct crush_bucket_list);
			break;
		case CRUSH_BUCKET_TREE:
			size = sizeof(struct crush_bucket_tree);
			break;
		case CRUSH_BUCKET_STRAW:
			size = sizeof(struct crush_bucket_straw);
			break;
		case CRUSH_BUCKET_STRAW2:
			size = sizeof(struct crush_bucket_straw2);
			break;
		default:
			goto bad;	/* unknown bucket algorithm */
		}
		BUG_ON(size == 0);
		b = c->buckets[i] = kzalloc(size, GFP_NOFS);
		if (b == NULL)
			goto badmem;

		/* common bucket header */
		ceph_decode_need(p, end, 4*sizeof(u32), bad);
		b->id = ceph_decode_32(p);
		b->type = ceph_decode_16(p);
		b->alg = ceph_decode_8(p);
		b->hash = ceph_decode_8(p);
		b->weight = ceph_decode_32(p);
		b->size = ceph_decode_32(p);

		dout("crush_decode bucket size %d off %x %p to %p\n",
		     b->size, (int)(*p-start), *p, end);

		b->items = kzalloc_objs(__s32, b->size, GFP_NOFS);
		if (b->items == NULL)
			goto badmem;

		ceph_decode_need(p, end, b->size*sizeof(u32), bad);
		for (j = 0; j < b->size; j++)
			b->items[j] = ceph_decode_32(p);

		/* algorithm-specific payload */
		switch (b->alg) {
		case CRUSH_BUCKET_UNIFORM:
			err = crush_decode_uniform_bucket(p, end,
				  (struct crush_bucket_uniform *)b);
			if (err < 0)
				goto fail;
			break;
		case CRUSH_BUCKET_LIST:
			err = crush_decode_list_bucket(p, end,
			       (struct crush_bucket_list *)b);
			if (err < 0)
				goto fail;
			break;
		case CRUSH_BUCKET_TREE:
			err = crush_decode_tree_bucket(p, end,
				(struct crush_bucket_tree *)b);
			if (err < 0)
				goto fail;
			break;
		case CRUSH_BUCKET_STRAW:
			err = crush_decode_straw_bucket(p, end,
				(struct crush_bucket_straw *)b);
			if (err < 0)
				goto fail;
			break;
		case CRUSH_BUCKET_STRAW2:
			err = crush_decode_straw2_bucket(p, end,
				(struct crush_bucket_straw2 *)b);
			if (err < 0)
				goto fail;
			break;
		}
	}

	/* rules */
	dout("rule vec is %p\n", c->rules);
	for (i = 0; i < c->max_rules; i++) {
		u32 yes;
		struct crush_rule *r;

		ceph_decode_32_safe(p, end, yes, bad);
		if (!yes) {
			dout("crush_decode NO rule %d off %x %p to %p\n",
			     i, (int)(*p-start), *p, end);
			c->rules[i] = NULL;
			continue;
		}

		dout("crush_decode rule %d off %x %p to %p\n",
		     i, (int)(*p-start), *p, end);

		/* len */
		ceph_decode_32_safe(p, end, yes, bad);
#if BITS_PER_LONG == 32
		/* guard the flex-array size computation against overflow */
		if (yes > (ULONG_MAX - sizeof(*r))
			  / sizeof(struct crush_rule_step))
			goto bad;
#endif
		r = kmalloc_flex(*r, steps, yes, GFP_NOFS);
		if (r == NULL)
			goto badmem;
		dout(" rule %d is at %p\n", i, r);
		c->rules[i] = r;
		r->len = yes;
		ceph_decode_copy_safe(p, end, &r->mask, 4, bad); /* 4 u8's */
		ceph_decode_need(p, end, r->len*3*sizeof(u32), bad);
		for (j = 0; j < r->len; j++) {
			r->steps[j].op = ceph_decode_32(p);
			r->steps[j].arg1 = ceph_decode_32(p);
			r->steps[j].arg2 = ceph_decode_32(p);
		}
	}

	err = decode_crush_names(p, end, &c->type_names);
	if (err)
		goto fail;

	err = decode_crush_names(p, end, &c->names);
	if (err)
		goto fail;

	ceph_decode_skip_map(p, end, 32, string, bad); /* rule_name_map */

	/* tunables -- optional from here on: truncation means "use defaults" */
	ceph_decode_need(p, end, 3*sizeof(u32), done);
	c->choose_local_tries = ceph_decode_32(p);
	c->choose_local_fallback_tries =  ceph_decode_32(p);
	c->choose_total_tries = ceph_decode_32(p);
	dout("crush decode tunable choose_local_tries = %d\n",
	     c->choose_local_tries);
	dout("crush decode tunable choose_local_fallback_tries = %d\n",
	     c->choose_local_fallback_tries);
	dout("crush decode tunable choose_total_tries = %d\n",
	     c->choose_total_tries);

	ceph_decode_need(p, end, sizeof(u32), done);
	c->chooseleaf_descend_once = ceph_decode_32(p);
	dout("crush decode tunable chooseleaf_descend_once = %d\n",
	     c->chooseleaf_descend_once);

	ceph_decode_need(p, end, sizeof(u8), done);
	c->chooseleaf_vary_r = ceph_decode_8(p);
	dout("crush decode tunable chooseleaf_vary_r = %d\n",
	     c->chooseleaf_vary_r);

	/* skip straw_calc_version, allowed_bucket_algs */
	ceph_decode_need(p, end, sizeof(u8) + sizeof(u32), done);
	*p += sizeof(u8) + sizeof(u32);

	ceph_decode_need(p, end, sizeof(u8), done);
	c->chooseleaf_stable = ceph_decode_8(p);
	dout("crush decode tunable chooseleaf_stable = %d\n",
	     c->chooseleaf_stable);

	if (*p != end) {
		/* class_map */
		ceph_decode_skip_map(p, end, 32, 32, bad);
		/* class_name */
		ceph_decode_skip_map(p, end, 32, string, bad);
		/* class_bucket */
		ceph_decode_skip_map_of_map(p, end, 32, 32, 32, bad);
	}

	if (*p != end) {
		err = decode_choose_args(p, end, c);
		if (err)
			goto fail;
	}

done:
	crush_finalize(c);
	dout("crush_decode success\n");
	return c;

badmem:
	err = -ENOMEM;
fail:
	dout("crush_decode fail %d\n", err);
	crush_destroy(c);
	return ERR_PTR(err);

bad:
	err = -EINVAL;
	goto fail;
}
679
ceph_pg_compare(const struct ceph_pg * lhs,const struct ceph_pg * rhs)680 int ceph_pg_compare(const struct ceph_pg *lhs, const struct ceph_pg *rhs)
681 {
682 if (lhs->pool < rhs->pool)
683 return -1;
684 if (lhs->pool > rhs->pool)
685 return 1;
686 if (lhs->seed < rhs->seed)
687 return -1;
688 if (lhs->seed > rhs->seed)
689 return 1;
690
691 return 0;
692 }
693
ceph_spg_compare(const struct ceph_spg * lhs,const struct ceph_spg * rhs)694 int ceph_spg_compare(const struct ceph_spg *lhs, const struct ceph_spg *rhs)
695 {
696 int ret;
697
698 ret = ceph_pg_compare(&lhs->pgid, &rhs->pgid);
699 if (ret)
700 return ret;
701
702 if (lhs->shard < rhs->shard)
703 return -1;
704 if (lhs->shard > rhs->shard)
705 return 1;
706
707 return 0;
708 }
709
/*
 * Allocate a ceph_pg_mapping with @payload_len extra bytes for the
 * variant payload (osds[] / primary / pg_upmap data).  The rb_node is
 * cleared so free_pg_mapping() can assert it was unlinked.  Returns
 * NULL on allocation failure.
 */
static struct ceph_pg_mapping *alloc_pg_mapping(size_t payload_len)
{
	struct ceph_pg_mapping *pg;

	pg = kmalloc(sizeof(*pg) + payload_len, GFP_NOIO);
	if (!pg)
		return NULL;

	RB_CLEAR_NODE(&pg->node);
	return pg;
}
721
/*
 * Free a ceph_pg_mapping.  The node must already have been removed
 * from its tree (warns otherwise).
 */
static void free_pg_mapping(struct ceph_pg_mapping *pg)
{
	WARN_ON(!RB_EMPTY_NODE(&pg->node));

	kfree(pg);
}
728
729 /*
730 * rbtree of pg_mapping for handling pg_temp (explicit mapping of pgid
731 * to a set of osds) and primary_temp (explicit primary setting)
732 */
DEFINE_RB_FUNCS2(pg_mapping,struct ceph_pg_mapping,pgid,ceph_pg_compare,RB_BYPTR,const struct ceph_pg *,node)733 DEFINE_RB_FUNCS2(pg_mapping, struct ceph_pg_mapping, pgid, ceph_pg_compare,
734 RB_BYPTR, const struct ceph_pg *, node)
735
736 /*
737 * rbtree of pg pool info
738 */
739 DEFINE_RB_FUNCS(pg_pool, struct ceph_pg_pool_info, id, node)
740
/* Look up a pool by id; returns NULL if the pool does not exist. */
struct ceph_pg_pool_info *ceph_pg_pool_by_id(struct ceph_osdmap *map, u64 id)
{
	return lookup_pg_pool(&map->pg_pools, id);
}
745
/*
 * Return the name of pool @id, or NULL if the pool is unknown, the id
 * is CEPH_NOPOOL, or the id exceeds INT_MAX (ids are internally
 * signed; larger values indicate a bogus caller and trip a one-shot
 * warning).  The returned string is owned by the osdmap.
 */
const char *ceph_pg_pool_name_by_id(struct ceph_osdmap *map, u64 id)
{
	struct ceph_pg_pool_info *pi;

	if (id == CEPH_NOPOOL)
		return NULL;

	if (WARN_ON_ONCE(id > (u64) INT_MAX))
		return NULL;

	pi = lookup_pg_pool(&map->pg_pools, id);
	return pi ? pi->name : NULL;
}
759 EXPORT_SYMBOL(ceph_pg_pool_name_by_id);
760
/*
 * Reverse lookup: find the id of the pool named @name by linear scan
 * of the pool tree.  Returns the pool id or -ENOENT if no pool has
 * that name.
 */
int ceph_pg_poolid_by_name(struct ceph_osdmap *map, const char *name)
{
	struct rb_node *rbp;

	for (rbp = rb_first(&map->pg_pools); rbp; rbp = rb_next(rbp)) {
		struct ceph_pg_pool_info *pi =
			rb_entry(rbp, struct ceph_pg_pool_info, node);
		/* pools decoded before their names have pi->name == NULL */
		if (pi->name && strcmp(pi->name, name) == 0)
			return pi->id;
	}
	return -ENOENT;
}
773 EXPORT_SYMBOL(ceph_pg_poolid_by_name);
774
/*
 * Return the flags of pool @id, or 0 if the pool does not exist
 * (callers cannot distinguish "no pool" from "no flags set").
 */
u64 ceph_pg_pool_flags(struct ceph_osdmap *map, u64 id)
{
	struct ceph_pg_pool_info *pi;

	pi = lookup_pg_pool(&map->pg_pools, id);
	return pi ? pi->flags : 0;
}
782 EXPORT_SYMBOL(ceph_pg_pool_flags);
783
/* Unlink a pool from @root and free it along with its name string. */
static void __remove_pg_pool(struct rb_root *root, struct ceph_pg_pool_info *pi)
{
	erase_pg_pool(root, pi);
	kfree(pi->name);
	kfree(pi);
}
790
/*
 * Decode one pg_pool_t into @pi.  The encoding is versioned: ev is the
 * encoding version, cv the oldest version a decoder must understand.
 * Only ev >= 5 and cv <= 9 are accepted.  Fields added in later
 * versions are decoded (or skipped) behind "if (ev >= N)" guards, and
 * any trailing bytes within the length-prefixed envelope are ignored
 * by jumping the cursor to pool_end, which keeps us forward-compatible
 * with newer encodings.  Returns 0 or -EINVAL.
 */
static int decode_pool(void **p, void *end, struct ceph_pg_pool_info *pi)
{
	u8 ev, cv;
	unsigned len, num;
	void *pool_end;

	ceph_decode_need(p, end, 2 + 4, bad);
	ev = ceph_decode_8(p);  /* encoding version */
	cv = ceph_decode_8(p); /* compat version */
	if (ev < 5) {
		pr_warn("got v %d < 5 cv %d of ceph_pg_pool\n", ev, cv);
		return -EINVAL;
	}
	if (cv > 9) {
		pr_warn("got v %d cv %d > 9 of ceph_pg_pool\n", ev, cv);
		return -EINVAL;
	}
	/* the pool struct is wrapped in a length-prefixed envelope */
	len = ceph_decode_32(p);
	ceph_decode_need(p, end, len, bad);
	pool_end = *p + len;

	ceph_decode_need(p, end, 4 + 4 + 4, bad);
	pi->type = ceph_decode_8(p);
	pi->size = ceph_decode_8(p);
	pi->crush_ruleset = ceph_decode_8(p);
	pi->object_hash = ceph_decode_8(p);
	pi->pg_num = ceph_decode_32(p);
	pi->pgp_num = ceph_decode_32(p);

	/* lpg*, last_change, snap_seq, snap_epoch */
	ceph_decode_skip_n(p, end, 8 + 4 + 8 + 4, bad);

	/* skip snaps */
	ceph_decode_32_safe(p, end, num, bad);
	while (num--) {
		/* snapid key, pool snap (with versions) */
		ceph_decode_skip_n(p, end, 8 + 2, bad);
		ceph_decode_skip_string(p, end, bad);
	}

	/* removed_snaps */
	ceph_decode_skip_map(p, end, 64, 64, bad);

	ceph_decode_need(p, end, 8 + 8 + 4, bad);
	*p += 8;  /* skip auid */
	pi->flags = ceph_decode_64(p);
	*p += 4;  /* skip crash_replay_interval */

	if (ev >= 7)
		ceph_decode_8_safe(p, end, pi->min_size, bad);
	else
		/* old default: ceil(size / 2) */
		pi->min_size = pi->size - pi->size / 2;

	if (ev >= 8)
		/* quota_max_* */
		ceph_decode_skip_n(p, end, 8 + 8, bad);

	if (ev >= 9) {
		/* tiers */
		ceph_decode_skip_set(p, end, 64, bad);

		ceph_decode_need(p, end, 8 + 1 + 8 + 8, bad);
		*p += 8;  /* skip tier_of */
		*p += 1;  /* skip cache_mode */
		pi->read_tier = ceph_decode_64(p);
		pi->write_tier = ceph_decode_64(p);
	} else {
		pi->read_tier = -1;
		pi->write_tier = -1;
	}

	if (ev >= 10)
		/* properties */
		ceph_decode_skip_map(p, end, string, string, bad);

	if (ev >= 11) {
		/* hit_set_params (with versions) */
		ceph_decode_skip_n(p, end, 2, bad);
		ceph_decode_skip_string(p, end, bad);

		/* hit_set_period, hit_set_count */
		ceph_decode_skip_n(p, end, 4 + 4, bad);
	}

	if (ev >= 12)
		/* stripe_width */
		ceph_decode_skip_32(p, end, bad);

	if (ev >= 13)
		/* target_max_*, cache_target_*, cache_min_* */
		ceph_decode_skip_n(p, end, 16 + 8 + 8, bad);

	if (ev >= 14)
		/* erasure_code_profile */
		ceph_decode_skip_string(p, end, bad);

	/*
	 * last_force_op_resend_preluminous, will be overridden if the
	 * map was encoded with RESEND_ON_SPLIT
	 */
	if (ev >= 15)
		ceph_decode_32_safe(p, end, pi->last_force_request_resend, bad);
	else
		pi->last_force_request_resend = 0;

	if (ev >= 16)
		/* min_read_recency_for_promote */
		ceph_decode_skip_32(p, end, bad);

	if (ev >= 17)
		/* expected_num_objects */
		ceph_decode_skip_64(p, end, bad);

	if (ev >= 19)
		/* cache_target_dirty_high_ratio_micro */
		ceph_decode_skip_32(p, end, bad);

	if (ev >= 20)
		/* min_write_recency_for_promote */
		ceph_decode_skip_32(p, end, bad);

	if (ev >= 21)
		/* use_gmt_hitset */
		ceph_decode_skip_8(p, end, bad);

	if (ev >= 22)
		/* fast_read */
		ceph_decode_skip_8(p, end, bad);

	if (ev >= 23)
		/* hit_set_grade_decay_rate, hit_set_search_last_n */
		ceph_decode_skip_n(p, end, 4 + 4, bad);

	if (ev >= 24) {
		/* opts (with versions) */
		ceph_decode_skip_n(p, end, 2, bad);
		ceph_decode_skip_string(p, end, bad);
	}

	/* v25 carries the authoritative last_force_request_resend */
	if (ev >= 25)
		ceph_decode_32_safe(p, end, pi->last_force_request_resend, bad);

	/* ignore the rest */

	*p = pool_end;
	calc_pg_masks(pi);
	return 0;

bad:
	return -EINVAL;
}
942
/*
 * Decode the pool-id -> name map and attach names to the matching
 * pools in map->pg_pools.  Names for unknown pool ids are silently
 * skipped; an existing name is replaced.  Returns 0, -ENOMEM or
 * -EINVAL.
 */
static int decode_pool_names(void **p, void *end, struct ceph_osdmap *map)
{
	struct ceph_pg_pool_info *pi;
	u32 num, len;
	u64 pool;

	ceph_decode_32_safe(p, end, num, bad);
	dout(" %d pool names\n", num);
	while (num--) {
		ceph_decode_64_safe(p, end, pool, bad);
		ceph_decode_32_safe(p, end, len, bad);
		dout(" pool %llu len %d\n", pool, len);
		ceph_decode_need(p, end, len, bad);
		pi = lookup_pg_pool(&map->pg_pools, pool);
		if (pi) {
			char *name = kstrndup(*p, len, GFP_NOFS);

			if (!name)
				return -ENOMEM;
			/* replace only after the copy succeeded */
			kfree(pi->name);
			pi->name = name;
			dout(" name is %s\n", pi->name);
		}
		*p += len;	/* advance past the name even if skipped */
	}
	return 0;

bad:
	return -EINVAL;
}
973
974 /*
975 * CRUSH workspaces
976 *
977 * workspace_manager framework borrowed from fs/btrfs/compression.c.
978 * Two simplifications: there is only one type of workspace and there
979 * is always at least one workspace.
980 */
/*
 * Allocate and initialize one CRUSH workspace sized for @c with up to
 * CEPH_PG_MAX_SIZE results.  Requires crush_finalize() to have run
 * (c->working_size must be set).  Returns NULL on allocation failure.
 */
static struct crush_work *alloc_workspace(const struct crush_map *c)
{
	struct crush_work *work;
	size_t work_size;

	WARN_ON(!c->working_size);
	work_size = crush_work_size(c, CEPH_PG_MAX_SIZE);
	dout("%s work_size %zu bytes\n", __func__, work_size);

	/* kvmalloc: may be large, vmalloc fallback is fine here */
	work = kvmalloc(work_size, GFP_NOIO);
	if (!work)
		return NULL;

	INIT_LIST_HEAD(&work->item);
	crush_init_workspace(c, work);
	return work;
}
998
/* Free a workspace; it must not be on the idle list (warns otherwise). */
static void free_workspace(struct crush_work *work)
{
	WARN_ON(!list_empty(&work->item));
	kvfree(work);
}
1004
/* Initialize an empty workspace manager (no workspaces yet). */
static void init_workspace_manager(struct workspace_manager *wsm)
{
	INIT_LIST_HEAD(&wsm->idle_ws);
	spin_lock_init(&wsm->ws_lock);
	atomic_set(&wsm->total_ws, 0);
	wsm->free_ws = 0;
	init_waitqueue_head(&wsm->ws_wait);
}
1013
/*
 * Seed the manager with its first (guaranteed) workspace.  The
 * framework assumes at least one workspace always exists, which lets
 * get_workspace() wait instead of failing on allocation errors.
 */
static void add_initial_workspace(struct workspace_manager *wsm,
				  struct crush_work *work)
{
	WARN_ON(!list_empty(&wsm->idle_ws));

	list_add(&work->item, &wsm->idle_ws);
	atomic_set(&wsm->total_ws, 1);
	wsm->free_ws = 1;
}
1023
/*
 * Free all idle workspaces and reset the counters.  Assumes no
 * workspace is currently checked out (all are on the idle list).
 */
static void cleanup_workspace_manager(struct workspace_manager *wsm)
{
	struct crush_work *work;

	while (!list_empty(&wsm->idle_ws)) {
		work = list_first_entry(&wsm->idle_ws, struct crush_work,
					item);
		list_del_init(&work->item);	/* satisfy free_workspace()'s check */
		free_workspace(work);
	}
	atomic_set(&wsm->total_ws, 0);
	wsm->free_ws = 0;
}
1037
1038 /*
1039 * Finds an available workspace or allocates a new one. If it's not
1040 * possible to allocate a new one, waits until there is one.
1041 */
/*
 * Finds an available workspace or allocates a new one.  If it's not
 * possible to allocate a new one, waits until there is one.
 *
 * Concurrency: ws_lock protects idle_ws/free_ws; total_ws caps the
 * number of live workspaces at roughly num_online_cpus() + 1 so a
 * burst of mappers doesn't allocate unboundedly.  This function never
 * fails -- on allocation failure it goes back to sleeping for the
 * initial workspace (see add_initial_workspace()).
 */
static struct crush_work *get_workspace(struct workspace_manager *wsm,
					const struct crush_map *c)
{
	struct crush_work *work;
	int cpus = num_online_cpus();

again:
	spin_lock(&wsm->ws_lock);
	if (!list_empty(&wsm->idle_ws)) {
		work = list_first_entry(&wsm->idle_ws, struct crush_work,
					item);
		list_del_init(&work->item);
		wsm->free_ws--;
		spin_unlock(&wsm->ws_lock);
		return work;

	}
	if (atomic_read(&wsm->total_ws) > cpus) {
		/* at capacity -- sleep until put_workspace() wakes us */
		DEFINE_WAIT(wait);

		spin_unlock(&wsm->ws_lock);
		prepare_to_wait(&wsm->ws_wait, &wait, TASK_UNINTERRUPTIBLE);
		/* re-check under the waitqueue to avoid a lost wakeup */
		if (atomic_read(&wsm->total_ws) > cpus && !wsm->free_ws)
			schedule();
		finish_wait(&wsm->ws_wait, &wait);
		goto again;
	}
	/* reserve a slot before allocating, outside the lock */
	atomic_inc(&wsm->total_ws);
	spin_unlock(&wsm->ws_lock);

	work = alloc_workspace(c);
	if (!work) {
		atomic_dec(&wsm->total_ws);
		wake_up(&wsm->ws_wait);

		/*
		 * Do not return the error but go back to waiting.  We
		 * have the initial workspace and the CRUSH computation
		 * time is bounded so we will get it eventually.
		 */
		WARN_ON(atomic_read(&wsm->total_ws) < 1);
		goto again;
	}
	return work;
}
1087
1088 /*
1089 * Puts a workspace back on the list or frees it if we have enough
1090 * idle ones sitting around.
1091 */
/*
 * Puts a workspace back on the list or frees it if we have enough
 * idle ones sitting around.  Always wakes a waiter when one exists,
 * whether the workspace was recycled or freed.
 */
static void put_workspace(struct workspace_manager *wsm,
			  struct crush_work *work)
{
	spin_lock(&wsm->ws_lock);
	if (wsm->free_ws <= num_online_cpus()) {
		list_add(&work->item, &wsm->idle_ws);
		wsm->free_ws++;
		spin_unlock(&wsm->ws_lock);
		goto wake;
	}
	spin_unlock(&wsm->ws_lock);

	/* surplus workspace: release it instead of hoarding */
	free_workspace(work);
	atomic_dec(&wsm->total_ws);
wake:
	if (wq_has_sleeper(&wsm->ws_wait))
		wake_up(&wsm->ws_wait);
}
1110
1111 /*
1112 * osd map
1113 */
/*
 * Allocate an empty osdmap with all rbtrees initialized and the CRUSH
 * workspace manager set up.  Returns NULL on allocation failure.
 * Caller frees with ceph_osdmap_destroy().
 */
struct ceph_osdmap *ceph_osdmap_alloc(void)
{
	struct ceph_osdmap *map;

	map = kzalloc_obj(*map, GFP_NOIO);
	if (!map)
		return NULL;

	map->pg_pools = RB_ROOT;
	map->pool_max = -1;	/* no pools yet */
	map->pg_temp = RB_ROOT;
	map->primary_temp = RB_ROOT;
	map->pg_upmap = RB_ROOT;
	map->pg_upmap_items = RB_ROOT;

	init_workspace_manager(&map->crush_wsm);

	return map;
}
1133
/*
 * Free an osdmap and everything it owns: the crush map and its
 * workspaces, all mapping rbtrees (pg_temp, primary_temp, pg_upmap,
 * pg_upmap_items), the pool tree, and the per-OSD arrays.  Note the
 * pg_upmap trees use bare rb_erase()/kfree() rather than the
 * erase_pg_mapping()/free_pg_mapping() pair used for pg_temp.
 */
void ceph_osdmap_destroy(struct ceph_osdmap *map)
{
	dout("osdmap_destroy %p\n", map);

	if (map->crush)
		crush_destroy(map->crush);
	cleanup_workspace_manager(&map->crush_wsm);

	while (!RB_EMPTY_ROOT(&map->pg_temp)) {
		struct ceph_pg_mapping *pg =
			rb_entry(rb_first(&map->pg_temp),
				 struct ceph_pg_mapping, node);
		erase_pg_mapping(&map->pg_temp, pg);
		free_pg_mapping(pg);
	}
	while (!RB_EMPTY_ROOT(&map->primary_temp)) {
		struct ceph_pg_mapping *pg =
			rb_entry(rb_first(&map->primary_temp),
				 struct ceph_pg_mapping, node);
		erase_pg_mapping(&map->primary_temp, pg);
		free_pg_mapping(pg);
	}
	while (!RB_EMPTY_ROOT(&map->pg_upmap)) {
		struct ceph_pg_mapping *pg =
			rb_entry(rb_first(&map->pg_upmap),
				 struct ceph_pg_mapping, node);
		rb_erase(&pg->node, &map->pg_upmap);
		kfree(pg);
	}
	while (!RB_EMPTY_ROOT(&map->pg_upmap_items)) {
		struct ceph_pg_mapping *pg =
			rb_entry(rb_first(&map->pg_upmap_items),
				 struct ceph_pg_mapping, node);
		rb_erase(&pg->node, &map->pg_upmap_items);
		kfree(pg);
	}
	while (!RB_EMPTY_ROOT(&map->pg_pools)) {
		struct ceph_pg_pool_info *pi =
			rb_entry(rb_first(&map->pg_pools),
				 struct ceph_pg_pool_info, node);
		__remove_pg_pool(&map->pg_pools, pi);
	}
	/* per-OSD arrays are kvmalloc'ed (see osdmap_set_max_osd()) */
	kvfree(map->osd_state);
	kvfree(map->osd_weight);
	kvfree(map->osd_addr);
	kvfree(map->osd_primary_affinity);
	kfree(map);
}
1182
1183 /*
1184 * Adjust max_osd value, (re)allocate arrays.
1185 *
1186 * The new elements are properly initialized.
1187 */
static int osdmap_set_max_osd(struct ceph_osdmap *map, u32 max)
{
	u32 *state;
	u32 *weight;
	struct ceph_entity_addr *addr;
	u32 to_copy;
	int i;

	dout("%s old %u new %u\n", __func__, map->max_osd, max);
	if (max == map->max_osd)
		return 0;

	/*
	 * Allocate all three replacement arrays up front so that either
	 * everything is swapped in or nothing is touched on -ENOMEM.
	 */
	state = kvmalloc(array_size(max, sizeof(*state)), GFP_NOFS);
	weight = kvmalloc(array_size(max, sizeof(*weight)), GFP_NOFS);
	addr = kvmalloc(array_size(max, sizeof(*addr)), GFP_NOFS);
	if (!state || !weight || !addr) {
		kvfree(state);
		kvfree(weight);
		kvfree(addr);
		return -ENOMEM;
	}

	/* copy surviving entries (shrink or grow), then drop old arrays */
	to_copy = min(map->max_osd, max);
	if (map->osd_state) {
		memcpy(state, map->osd_state, to_copy * sizeof(*state));
		memcpy(weight, map->osd_weight, to_copy * sizeof(*weight));
		memcpy(addr, map->osd_addr, to_copy * sizeof(*addr));
		kvfree(map->osd_state);
		kvfree(map->osd_weight);
		kvfree(map->osd_addr);
	}

	map->osd_state = state;
	map->osd_weight = weight;
	map->osd_addr = addr;
	/* initialize any newly added slots: not up, marked out, zero addr */
	for (i = map->max_osd; i < max; i++) {
		map->osd_state[i] = 0;
		map->osd_weight[i] = CEPH_OSD_OUT;
		memset(map->osd_addr + i, 0, sizeof(*map->osd_addr));
	}

	/* primary affinity array is allocated lazily - resize only if present */
	if (map->osd_primary_affinity) {
		u32 *affinity;

		affinity = kvmalloc(array_size(max, sizeof(*affinity)),
				    GFP_NOFS);
		if (!affinity)
			return -ENOMEM;
			/*
			 * NOTE(review): on this error path the other arrays
			 * have already been resized while max_osd is still
			 * the old value; callers appear to destroy the map
			 * on error - confirm before reusing it.
			 */

		memcpy(affinity, map->osd_primary_affinity,
		       to_copy * sizeof(*affinity));
		kvfree(map->osd_primary_affinity);

		map->osd_primary_affinity = affinity;
		for (i = map->max_osd; i < max; i++)
			map->osd_primary_affinity[i] =
			    CEPH_OSD_DEFAULT_PRIMARY_AFFINITY;
	}

	map->max_osd = max;

	return 0;
}
1251
/*
 * Install a freshly decoded CRUSH map into @map, replacing any
 * previous one.  Takes ownership of @crush: it is destroyed on error,
 * and an ERR_PTR from crush_decode() is simply propagated.
 *
 * A workspace for the new map is allocated before the old map (and its
 * workspaces) are torn down, so failure leaves @map unchanged.
 */
static int osdmap_set_crush(struct ceph_osdmap *map, struct crush_map *crush)
{
	struct crush_work *work;

	if (IS_ERR(crush))
		return PTR_ERR(crush);

	work = alloc_workspace(crush);
	if (!work) {
		crush_destroy(crush);
		return -ENOMEM;
	}

	/* old workspaces are sized for the old map - drop them with it */
	if (map->crush)
		crush_destroy(map->crush);
	cleanup_workspace_manager(&map->crush_wsm);
	map->crush = crush;
	add_initial_workspace(&map->crush_wsm, work);
	return 0;
}
1272
1273 #define OSDMAP_WRAPPER_COMPAT_VER 7
1274 #define OSDMAP_CLIENT_DATA_COMPAT_VER 1
1275
1276 /*
1277 * Return 0 or error. On success, *v is set to 0 for old (v6) osdmaps,
1278 * to struct_v of the client_data section for new (v7 and above)
1279 * osdmaps.
1280 */
/*
 * Consume the version envelope at the start of an osdmap encoding.
 *
 * New (v7+) encodings carry a wrapper (struct_v, struct_compat,
 * struct_len) followed by a client-data section with its own version
 * triple; *v is set to the client-data struct_v.  Old (v6) encodings
 * start with a single le16 version; *v is set to 0 for those.
 * @prefix ("full"/"inc") is only used in warning messages.
 *
 * Returns 0 on success, -EINVAL on truncated input or an encoding we
 * are not compatible with.
 */
static int get_osdmap_client_data_v(void **p, void *end,
				    const char *prefix, u8 *v)
{
	u8 struct_v;

	ceph_decode_8_safe(p, end, struct_v, e_inval);
	if (struct_v >= 7) {
		u8 struct_compat;

		ceph_decode_8_safe(p, end, struct_compat, e_inval);
		if (struct_compat > OSDMAP_WRAPPER_COMPAT_VER) {
			pr_warn("got v %d cv %d > %d of %s ceph_osdmap\n",
				struct_v, struct_compat,
				OSDMAP_WRAPPER_COMPAT_VER, prefix);
			return -EINVAL;
		}
		*p += 4; /* ignore wrapper struct_len */

		ceph_decode_8_safe(p, end, struct_v, e_inval);
		ceph_decode_8_safe(p, end, struct_compat, e_inval);
		if (struct_compat > OSDMAP_CLIENT_DATA_COMPAT_VER) {
			pr_warn("got v %d cv %d > %d of %s ceph_osdmap client data\n",
				struct_v, struct_compat,
				OSDMAP_CLIENT_DATA_COMPAT_VER, prefix);
			return -EINVAL;
		}
		*p += 4; /* ignore client data struct_len */
	} else {
		u16 version;

		/* rewind the u8 we read - old encodings begin with a le16 */
		*p -= 1;
		ceph_decode_16_safe(p, end, version, e_inval);
		if (version < 6) {
			pr_warn("got v %d < 6 of %s ceph_osdmap\n",
				version, prefix);
			return -EINVAL;
		}

		/* old osdmap encoding */
		struct_v = 0;
	}

	*v = struct_v;
	return 0;

e_inval:
	return -EINVAL;
}
1329
/*
 * Decode a (pool id -> pg_pool_info) map into map->pg_pools.
 *
 * For a full map (@incremental == false) a fresh entry is always
 * created; for an incremental update an existing entry is updated in
 * place and only missing pools are created.  Returns 0 or a negative
 * error (-EEXIST indicates a duplicate pool id in a full map).
 */
static int __decode_pools(void **p, void *end, struct ceph_osdmap *map,
			  bool incremental)
{
	u32 n;

	ceph_decode_32_safe(p, end, n, e_inval);
	while (n--) {
		struct ceph_pg_pool_info *pi;
		u64 pool;
		int ret;

		ceph_decode_64_safe(p, end, pool, e_inval);

		pi = lookup_pg_pool(&map->pg_pools, pool);
		if (!incremental || !pi) {
			pi = kzalloc_obj(*pi, GFP_NOFS);
			if (!pi)
				return -ENOMEM;

			RB_CLEAR_NODE(&pi->node);
			pi->id = pool;

			if (!__insert_pg_pool(&map->pg_pools, pi)) {
				kfree(pi);
				return -EEXIST;
			}
		}

		/* overwrite (or fill in) the pool's fields from the wire */
		ret = decode_pool(p, end, pi);
		if (ret)
			return ret;
	}

	return 0;

e_inval:
	return -EINVAL;
}
1368
/* full-map pool decode: always create fresh entries */
static int decode_pools(void **p, void *end, struct ceph_osdmap *map)
{
	return __decode_pools(p, end, map, false);
}

/* incremental pool decode: update existing entries in place */
static int decode_new_pools(void **p, void *end, struct ceph_osdmap *map)
{
	return __decode_pools(p, end, map, true);
}
1378
/* decodes one mapping value; may be NULL for erase-only containers */
typedef struct ceph_pg_mapping *(*decode_mapping_fn_t)(void **, void *, bool);

/*
 * Decode a (pgid -> mapping) container into @mapping_root.
 *
 * For each entry the pgid is decoded first and any existing mapping
 * for it is dropped (legal only for incremental updates, hence the
 * WARN_ON).  @fn then decodes the replacement value; it may return
 * NULL to mean "remove only".  A NULL @fn erases entries without
 * decoding any value (used for old_pg_upmap*).
 *
 * Returns 0 or a negative error.
 */
static int decode_pg_mapping(void **p, void *end, struct rb_root *mapping_root,
			     decode_mapping_fn_t fn, bool incremental)
{
	u32 n;

	WARN_ON(!incremental && !fn);

	ceph_decode_32_safe(p, end, n, e_inval);
	while (n--) {
		struct ceph_pg_mapping *pg;
		struct ceph_pg pgid;
		int ret;

		ret = ceph_decode_pgid(p, end, &pgid);
		if (ret)
			return ret;

		pg = lookup_pg_mapping(mapping_root, &pgid);
		if (pg) {
			/* duplicates in a full map would indicate corruption */
			WARN_ON(!incremental);
			erase_pg_mapping(mapping_root, pg);
			free_pg_mapping(pg);
		}

		if (fn) {
			pg = fn(p, end, incremental);
			if (IS_ERR(pg))
				return PTR_ERR(pg);

			if (pg) {
				pg->pgid = pgid; /* struct */
				insert_pg_mapping(mapping_root, pg);
			}
		}
	}

	return 0;

e_inval:
	return -EINVAL;
}
1422
/*
 * Decode one pg_temp value: a u32 osd-count followed by that many u32
 * osd ids.  An empty list in an incremental update means "remove this
 * pg_temp entry" and is signalled by returning NULL.
 *
 * Returns a new mapping, NULL (remove), or an ERR_PTR.
 */
static struct ceph_pg_mapping *__decode_pg_temp(void **p, void *end,
						bool incremental)
{
	struct ceph_pg_mapping *pg;
	u32 len, i;

	ceph_decode_32_safe(p, end, len, e_inval);
	if (len == 0 && incremental)
		return NULL;	/* new_pg_temp: [] to remove */
	/* reject lengths that would overflow the allocation size below */
	if ((size_t)len > (SIZE_MAX - sizeof(*pg)) / sizeof(u32))
		return ERR_PTR(-EINVAL);

	/* bounds-check the whole osd array before allocating */
	ceph_decode_need(p, end, len * sizeof(u32), e_inval);
	pg = alloc_pg_mapping(len * sizeof(u32));
	if (!pg)
		return ERR_PTR(-ENOMEM);

	pg->pg_temp.len = len;
	for (i = 0; i < len; i++)
		pg->pg_temp.osds[i] = ceph_decode_32(p);

	return pg;

e_inval:
	return ERR_PTR(-EINVAL);
}
1449
/* full-map pg_temp decode */
static int decode_pg_temp(void **p, void *end, struct ceph_osdmap *map)
{
	return decode_pg_mapping(p, end, &map->pg_temp, __decode_pg_temp,
				 false);
}

/* incremental new_pg_temp decode (empty list removes an entry) */
static int decode_new_pg_temp(void **p, void *end, struct ceph_osdmap *map)
{
	return decode_pg_mapping(p, end, &map->pg_temp, __decode_pg_temp,
				 true);
}
1461
/*
 * Decode one primary_temp value: a single u32 osd id.  In an
 * incremental update an id of -1 means "remove this entry" and is
 * signalled by returning NULL.
 *
 * Returns a new mapping, NULL (remove), or an ERR_PTR.
 */
static struct ceph_pg_mapping *__decode_primary_temp(void **p, void *end,
						     bool incremental)
{
	struct ceph_pg_mapping *mapping;
	u32 osd;

	ceph_decode_32_safe(p, end, osd, e_inval);
	if (incremental && osd == (u32)-1)
		return NULL;	/* new_primary_temp: -1 to remove */

	mapping = alloc_pg_mapping(0);
	if (!mapping)
		return ERR_PTR(-ENOMEM);

	mapping->primary_temp.osd = osd;
	return mapping;

e_inval:
	return ERR_PTR(-EINVAL);
}
1482
/* full-map primary_temp decode */
static int decode_primary_temp(void **p, void *end, struct ceph_osdmap *map)
{
	return decode_pg_mapping(p, end, &map->primary_temp,
				 __decode_primary_temp, false);
}

/* incremental new_primary_temp decode (-1 removes an entry) */
static int decode_new_primary_temp(void **p, void *end,
				   struct ceph_osdmap *map)
{
	return decode_pg_mapping(p, end, &map->primary_temp,
				 __decode_primary_temp, true);
}
1495
ceph_get_primary_affinity(struct ceph_osdmap * map,int osd)1496 u32 ceph_get_primary_affinity(struct ceph_osdmap *map, int osd)
1497 {
1498 if (!map->osd_primary_affinity)
1499 return CEPH_OSD_DEFAULT_PRIMARY_AFFINITY;
1500
1501 return map->osd_primary_affinity[osd];
1502 }
1503
/*
 * Set the primary affinity of @osd to @aff, allocating the per-osd
 * affinity array (every entry defaulted) on first use.
 *
 * Returns 0 or -ENOMEM.  No bounds check here - callers are expected
 * to have validated @osd against map->max_osd.
 */
static int set_primary_affinity(struct ceph_osdmap *map, int osd, u32 aff)
{
	if (!map->osd_primary_affinity) {
		int i;

		map->osd_primary_affinity = kvmalloc(
		    array_size(map->max_osd, sizeof(*map->osd_primary_affinity)),
		    GFP_NOFS);
		if (!map->osd_primary_affinity)
			return -ENOMEM;

		for (i = 0; i < map->max_osd; i++)
			map->osd_primary_affinity[i] =
			    CEPH_OSD_DEFAULT_PRIMARY_AFFINITY;
	}

	map->osd_primary_affinity[osd] = aff;

	return 0;
}
1524
/*
 * Decode the full-map osd_primary_affinity vector: either empty
 * (drop the array, i.e. everything back to default) or exactly
 * max_osd u32 entries.
 */
static int decode_primary_affinity(void **p, void *end,
				   struct ceph_osdmap *map)
{
	u32 len, i;

	ceph_decode_32_safe(p, end, len, e_inval);
	if (len == 0) {
		/* no vector encoded - all osds are at the default */
		kvfree(map->osd_primary_affinity);
		map->osd_primary_affinity = NULL;
		return 0;
	}
	if (len != map->max_osd)
		goto e_inval;

	ceph_decode_need(p, end, map->max_osd*sizeof(u32), e_inval);

	for (i = 0; i < map->max_osd; i++) {
		int ret;

		ret = set_primary_affinity(map, i, ceph_decode_32(p));
		if (ret)
			return ret;
	}

	return 0;

e_inval:
	return -EINVAL;
}
1554
/*
 * Decode incremental new_primary_affinity: a list of (osd, affinity)
 * pairs, each applied individually after a bounds check.
 */
static int decode_new_primary_affinity(void **p, void *end,
				       struct ceph_osdmap *map)
{
	u32 n;

	ceph_decode_32_safe(p, end, n, e_inval);
	while (n--) {
		u32 osd, aff;
		int ret;

		ceph_decode_32_safe(p, end, osd, e_inval);
		ceph_decode_32_safe(p, end, aff, e_inval);
		if (osd >= map->max_osd)
			goto e_inval;

		ret = set_primary_affinity(map, osd, aff);
		if (ret)
			return ret;

		osdmap_info(map, "osd%d primary-affinity 0x%x\n", osd, aff);
	}

	return 0;

e_inval:
	return -EINVAL;
}
1582
/*
 * pg_upmap values share the wire format of pg_temp (a u32 list), but
 * an empty list never means "remove", so always decode non-incremental.
 */
static struct ceph_pg_mapping *__decode_pg_upmap(void **p, void *end,
						 bool __unused)
{
	return __decode_pg_temp(p, end, false);
}
1588
/* full-map pg_upmap decode */
static int decode_pg_upmap(void **p, void *end, struct ceph_osdmap *map)
{
	return decode_pg_mapping(p, end, &map->pg_upmap, __decode_pg_upmap,
				 false);
}

/* incremental new_pg_upmap decode */
static int decode_new_pg_upmap(void **p, void *end, struct ceph_osdmap *map)
{
	return decode_pg_mapping(p, end, &map->pg_upmap, __decode_pg_upmap,
				 true);
}

/* incremental old_pg_upmap decode: pgids only, erases entries */
static int decode_old_pg_upmap(void **p, void *end, struct ceph_osdmap *map)
{
	return decode_pg_mapping(p, end, &map->pg_upmap, NULL, true);
}
1605
/*
 * Decode one pg_upmap_items value: a u32 pair-count followed by that
 * many (from, to) u32 osd-id pairs.
 *
 * Returns a new mapping or an ERR_PTR.
 */
static struct ceph_pg_mapping *__decode_pg_upmap_items(void **p, void *end,
						       bool __unused)
{
	struct ceph_pg_mapping *pg;
	u32 len, i;

	ceph_decode_32_safe(p, end, len, e_inval);
	/* reject lengths that would overflow the allocation size below */
	if ((size_t)len > (SIZE_MAX - sizeof(*pg)) / (2 * sizeof(u32)))
		return ERR_PTR(-EINVAL);

	/* bounds-check all pairs before allocating */
	ceph_decode_need(p, end, 2 * len * sizeof(u32), e_inval);
	pg = alloc_pg_mapping(2 * len * sizeof(u32));
	if (!pg)
		return ERR_PTR(-ENOMEM);

	pg->pg_upmap_items.len = len;
	for (i = 0; i < len; i++) {
		pg->pg_upmap_items.from_to[i][0] = ceph_decode_32(p);
		pg->pg_upmap_items.from_to[i][1] = ceph_decode_32(p);
	}

	return pg;

e_inval:
	return ERR_PTR(-EINVAL);
}
1632
/* full-map pg_upmap_items decode */
static int decode_pg_upmap_items(void **p, void *end, struct ceph_osdmap *map)
{
	return decode_pg_mapping(p, end, &map->pg_upmap_items,
				 __decode_pg_upmap_items, false);
}

/* incremental new_pg_upmap_items decode */
static int decode_new_pg_upmap_items(void **p, void *end,
				     struct ceph_osdmap *map)
{
	return decode_pg_mapping(p, end, &map->pg_upmap_items,
				 __decode_pg_upmap_items, true);
}

/* incremental old_pg_upmap_items decode: pgids only, erases entries */
static int decode_old_pg_upmap_items(void **p, void *end,
				     struct ceph_osdmap *map)
{
	return decode_pg_mapping(p, end, &map->pg_upmap_items, NULL, true);
}
1651
1652 /*
1653 * decode a full map.
1654 */
/*
 * Decode a full osdmap encoding from [*p, end) into @map, advancing
 * *p.  Sections must be consumed in exactly the order they were
 * encoded; struct_v (the client-data version from the envelope) gates
 * sections that were added over time.  @msgr2 selects which address is
 * picked out of v8+ addrvecs.
 *
 * Returns 0 or a negative error; on error the offending region is
 * hex-dumped at KERN_DEBUG.
 */
static int osdmap_decode(void **p, void *end, bool msgr2,
			 struct ceph_osdmap *map)
{
	u8 struct_v;
	u32 epoch = 0;
	void *start = *p;
	u32 max;
	u32 len, i;
	int err;

	dout("%s %p to %p len %d\n", __func__, *p, end, (int)(end - *p));

	err = get_osdmap_client_data_v(p, end, "full", &struct_v);
	if (err)
		goto bad;

	/* fsid, epoch, created, modified */
	ceph_decode_need(p, end, sizeof(map->fsid) + sizeof(u32) +
			 sizeof(map->created) + sizeof(map->modified), e_inval);
	ceph_decode_copy(p, &map->fsid, sizeof(map->fsid));
	epoch = map->epoch = ceph_decode_32(p);
	ceph_decode_copy(p, &map->created, sizeof(map->created));
	ceph_decode_copy(p, &map->modified, sizeof(map->modified));

	/* pools */
	err = decode_pools(p, end, map);
	if (err)
		goto bad;

	/* pool_name */
	err = decode_pool_names(p, end, map);
	if (err)
		goto bad;

	ceph_decode_32_safe(p, end, map->pool_max, e_inval);

	ceph_decode_32_safe(p, end, map->flags, e_inval);

	/* max_osd */
	ceph_decode_32_safe(p, end, max, e_inval);

	/* (re)alloc osd arrays */
	err = osdmap_set_max_osd(map, max);
	if (err)
		goto bad;

	/*
	 * osd_state, osd_weight, osd_addrs->client_addr
	 *
	 * Reserve the three length words plus the smallest possible
	 * state/weight payload up front; each embedded length must then
	 * match max_osd exactly.
	 */
	ceph_decode_need(p, end, 3*sizeof(u32) +
			 map->max_osd*(struct_v >= 5 ? sizeof(u32) :
						       sizeof(u8)) +
			 sizeof(*map->osd_weight), e_inval);
	if (ceph_decode_32(p) != map->max_osd)
		goto e_inval;

	/* per-osd state widened from u8 to u32 in v5 */
	if (struct_v >= 5) {
		for (i = 0; i < map->max_osd; i++)
			map->osd_state[i] = ceph_decode_32(p);
	} else {
		for (i = 0; i < map->max_osd; i++)
			map->osd_state[i] = ceph_decode_8(p);
	}

	if (ceph_decode_32(p) != map->max_osd)
		goto e_inval;

	for (i = 0; i < map->max_osd; i++)
		map->osd_weight[i] = ceph_decode_32(p);

	if (ceph_decode_32(p) != map->max_osd)
		goto e_inval;

	/* v8 switched single addrs to addrvecs */
	for (i = 0; i < map->max_osd; i++) {
		struct ceph_entity_addr *addr = &map->osd_addr[i];

		if (struct_v >= 8)
			err = ceph_decode_entity_addrvec(p, end, msgr2, addr);
		else
			err = ceph_decode_entity_addr(p, end, addr);
		if (err)
			goto bad;

		dout("%s osd%d addr %s\n", __func__, i, ceph_pr_addr(addr));
	}

	/* pg_temp */
	err = decode_pg_temp(p, end, map);
	if (err)
		goto bad;

	/* primary_temp */
	if (struct_v >= 1) {
		err = decode_primary_temp(p, end, map);
		if (err)
			goto bad;
	}

	/* primary_affinity */
	if (struct_v >= 2) {
		err = decode_primary_affinity(p, end, map);
		if (err)
			goto bad;
	} else {
		WARN_ON(map->osd_primary_affinity);
	}

	/* crush - a length-prefixed blob, clamped to the buffer end */
	ceph_decode_32_safe(p, end, len, e_inval);
	err = osdmap_set_crush(map, crush_decode(*p, min(*p + len, end)));
	if (err)
		goto bad;

	*p += len;
	if (struct_v >= 3) {
		/* erasure_code_profiles - not used by the client, skip */
		ceph_decode_skip_map_of_map(p, end, string, string, string,
					    e_inval);
	}

	if (struct_v >= 4) {
		err = decode_pg_upmap(p, end, map);
		if (err)
			goto bad;

		err = decode_pg_upmap_items(p, end, map);
		if (err)
			goto bad;
	} else {
		WARN_ON(!RB_EMPTY_ROOT(&map->pg_upmap));
		WARN_ON(!RB_EMPTY_ROOT(&map->pg_upmap_items));
	}

	/* ignore the rest */
	*p = end;

	dout("full osdmap epoch %d max_osd %d\n", map->epoch, map->max_osd);
	return 0;

e_inval:
	err = -EINVAL;
bad:
	pr_err("corrupt full osdmap (%d) epoch %d off %d (%p of %p-%p)\n",
	       err, epoch, (int)(*p - start), *p, start, end);
	print_hex_dump(KERN_DEBUG, "osdmap: ",
		       DUMP_PREFIX_OFFSET, 16, 1,
		       start, end - start, true);
	return err;
}
1802
1803 /*
1804 * Allocate and decode a full map.
1805 */
/*
 * Allocate and decode a full map.  Returns the new map or an ERR_PTR;
 * the map is destroyed again if decoding fails.
 */
struct ceph_osdmap *ceph_osdmap_decode(void **p, void *end, bool msgr2)
{
	struct ceph_osdmap *map = ceph_osdmap_alloc();
	int err;

	if (!map)
		return ERR_PTR(-ENOMEM);

	err = osdmap_decode(p, end, msgr2, map);
	if (!err)
		return map;

	ceph_osdmap_destroy(map);
	return ERR_PTR(err);
}
1823
1824 /*
1825 * Encoding order is (new_up_client, new_state, new_weight). Need to
1826 * apply in the (new_weight, new_state, new_up_client) order, because
1827 * an incremental map may look like e.g.
1828 *
1829 * new_up_client: { osd=6, addr=... } # set osd_state and addr
1830 * new_state: { osd=6, xorstate=EXISTS } # clear osd_state
1831 */
static int decode_new_up_state_weight(void **p, void *end, u8 struct_v,
				      bool msgr2, struct ceph_osdmap *map)
{
	void *new_up_client;	/* saved start of new_up_client section */
	void *new_state;	/* saved start of new_state section */
	void *new_weight_end;	/* end of new_weight, i.e. resume point */
	u32 len;
	int ret;
	int i;

	/*
	 * Pass 1: validate and skip over new_up_client and new_state so
	 * we can apply the sections out of encoding order (see the
	 * comment above this function).
	 */
	new_up_client = *p;
	ceph_decode_32_safe(p, end, len, e_inval);
	for (i = 0; i < len; ++i) {
		struct ceph_entity_addr addr;

		ceph_decode_skip_32(p, end, e_inval);	/* osd id */
		if (struct_v >= 7)
			ret = ceph_decode_entity_addrvec(p, end, msgr2, &addr);
		else
			ret = ceph_decode_entity_addr(p, end, &addr);
		if (ret)
			return ret;
	}

	new_state = *p;
	ceph_decode_32_safe(p, end, len, e_inval);
	/* each entry: u32 osd id + xorstate (u32 since v5, u8 before) */
	len *= sizeof(u32) + (struct_v >= 5 ? sizeof(u32) : sizeof(u8));
	ceph_decode_need(p, end, len, e_inval);
	*p += len;

	/* new_weight - applied first */
	ceph_decode_32_safe(p, end, len, e_inval);
	while (len--) {
		s32 osd;
		u32 w;

		ceph_decode_need(p, end, 2*sizeof(u32), e_inval);
		osd = ceph_decode_32(p);
		w = ceph_decode_32(p);
		if (osd >= map->max_osd)
			goto e_inval;

		osdmap_info(map, "osd%d weight 0x%x %s\n", osd, w,
			    w == CEPH_OSD_IN ? "(in)" :
			    (w == CEPH_OSD_OUT ? "(out)" : ""));
		map->osd_weight[osd] = w;

		/*
		 * If we are marking in, set the EXISTS, and clear the
		 * AUTOOUT and NEW bits.
		 */
		if (w) {
			map->osd_state[osd] |= CEPH_OSD_EXISTS;
			map->osd_state[osd] &= ~(CEPH_OSD_AUTOOUT |
						 CEPH_OSD_NEW);
		}
	}
	new_weight_end = *p;

	/* new_state (up/down) - applied second; rewind to saved position */
	*p = new_state;
	len = ceph_decode_32(p);
	while (len--) {
		s32 osd;
		u32 xorstate;

		osd = ceph_decode_32(p);
		if (osd >= map->max_osd)
			goto e_inval;

		if (struct_v >= 5)
			xorstate = ceph_decode_32(p);
		else
			xorstate = ceph_decode_8(p);
		if (xorstate == 0)
			xorstate = CEPH_OSD_UP;	/* legacy: 0 means toggle UP */
		if ((map->osd_state[osd] & CEPH_OSD_UP) &&
		    (xorstate & CEPH_OSD_UP))
			osdmap_info(map, "osd%d down\n", osd);
		if ((map->osd_state[osd] & CEPH_OSD_EXISTS) &&
		    (xorstate & CEPH_OSD_EXISTS)) {
			/* osd is being destroyed - reset it completely */
			osdmap_info(map, "osd%d does not exist\n", osd);
			ret = set_primary_affinity(map, osd,
						   CEPH_OSD_DEFAULT_PRIMARY_AFFINITY);
			if (ret)
				return ret;
			memset(map->osd_addr + osd, 0, sizeof(*map->osd_addr));
			map->osd_state[osd] = 0;
		} else {
			map->osd_state[osd] ^= xorstate;
		}
	}

	/* new_up_client - applied last; rewind to saved position */
	*p = new_up_client;
	len = ceph_decode_32(p);
	while (len--) {
		s32 osd;
		struct ceph_entity_addr addr;

		osd = ceph_decode_32(p);
		if (osd >= map->max_osd)
			goto e_inval;

		if (struct_v >= 7)
			ret = ceph_decode_entity_addrvec(p, end, msgr2, &addr);
		else
			ret = ceph_decode_entity_addr(p, end, &addr);
		if (ret)
			return ret;

		dout("%s osd%d addr %s\n", __func__, osd, ceph_pr_addr(&addr));

		osdmap_info(map, "osd%d up\n", osd);
		map->osd_state[osd] |= CEPH_OSD_EXISTS | CEPH_OSD_UP;
		map->osd_addr[osd] = addr;
	}

	/* leave *p just past new_weight, the last section in the encoding */
	*p = new_weight_end;
	return 0;

e_inval:
	return -EINVAL;
}
1956
1957 /*
1958 * decode and apply an incremental map update.
1959 */
/*
 * Decode an incremental update from [*p, end) and apply it to @map in
 * place, advancing *p.  If the update embeds a full map instead, a new
 * map is decoded and returned (caller is expected to retire the old
 * one).  Returns @map (or the new map) on success, an ERR_PTR on
 * failure.
 */
struct ceph_osdmap *osdmap_apply_incremental(void **p, void *end, bool msgr2,
					     struct ceph_osdmap *map)
{
	struct ceph_fsid fsid;
	u32 epoch = 0;
	struct ceph_timespec modified;
	s32 len;
	u64 pool;
	__s64 new_pool_max;
	__s32 new_flags, max;
	void *start = *p;
	int err;
	u8 struct_v;

	dout("%s %p to %p len %d\n", __func__, *p, end, (int)(end - *p));

	err = get_osdmap_client_data_v(p, end, "inc", &struct_v);
	if (err)
		goto bad;

	/* fsid, epoch, modified, new_pool_max, new_flags */
	ceph_decode_need(p, end, sizeof(fsid) + sizeof(u32) + sizeof(modified) +
			 sizeof(u64) + sizeof(u32), e_inval);
	ceph_decode_copy(p, &fsid, sizeof(fsid));
	epoch = ceph_decode_32(p);
	ceph_decode_copy(p, &modified, sizeof(modified));
	new_pool_max = ceph_decode_64(p);
	new_flags = ceph_decode_32(p);

	/* incrementals only apply to the immediately following epoch */
	if (epoch != map->epoch + 1)
		goto e_inval;

	/* full map? */
	ceph_decode_32_safe(p, end, len, e_inval);
	if (len > 0) {
		dout("apply_incremental full map len %d, %p to %p\n",
		     len, *p, end);
		return ceph_osdmap_decode(p, min(*p+len, end), msgr2);
	}

	/* new crush? */
	ceph_decode_32_safe(p, end, len, e_inval);
	if (len > 0) {
		err = osdmap_set_crush(map,
				       crush_decode(*p, min(*p + len, end)));
		if (err)
			goto bad;
		*p += len;
	}

	/* new flags? (negative means "no change") */
	if (new_flags >= 0)
		map->flags = new_flags;
	if (new_pool_max >= 0)
		map->pool_max = new_pool_max;

	/* new max? (negative means "no change") */
	ceph_decode_32_safe(p, end, max, e_inval);
	if (max >= 0) {
		err = osdmap_set_max_osd(map, max);
		if (err)
			goto bad;
	}

	map->epoch++;
	map->modified = modified;

	/* new_pools */
	err = decode_new_pools(p, end, map);
	if (err)
		goto bad;

	/* new_pool_names */
	err = decode_pool_names(p, end, map);
	if (err)
		goto bad;

	/* old_pool - ids of pools deleted this epoch */
	ceph_decode_32_safe(p, end, len, e_inval);
	while (len--) {
		struct ceph_pg_pool_info *pi;

		ceph_decode_64_safe(p, end, pool, e_inval);
		pi = lookup_pg_pool(&map->pg_pools, pool);
		if (pi)
			__remove_pg_pool(&map->pg_pools, pi);
	}

	/* new_up_client, new_state, new_weight */
	err = decode_new_up_state_weight(p, end, struct_v, msgr2, map);
	if (err)
		goto bad;

	/* new_pg_temp */
	err = decode_new_pg_temp(p, end, map);
	if (err)
		goto bad;

	/* new_primary_temp */
	if (struct_v >= 1) {
		err = decode_new_primary_temp(p, end, map);
		if (err)
			goto bad;
	}

	/* new_primary_affinity */
	if (struct_v >= 2) {
		err = decode_new_primary_affinity(p, end, map);
		if (err)
			goto bad;
	}

	if (struct_v >= 3) {
		/* new_erasure_code_profiles - not used by the client, skip */
		ceph_decode_skip_map_of_map(p, end, string, string, string,
					    e_inval);
		/* old_erasure_code_profiles */
		ceph_decode_skip_set(p, end, string, e_inval);
	}

	if (struct_v >= 4) {
		err = decode_new_pg_upmap(p, end, map);
		if (err)
			goto bad;

		err = decode_old_pg_upmap(p, end, map);
		if (err)
			goto bad;

		err = decode_new_pg_upmap_items(p, end, map);
		if (err)
			goto bad;

		err = decode_old_pg_upmap_items(p, end, map);
		if (err)
			goto bad;
	}

	/* ignore the rest */
	*p = end;

	dout("inc osdmap epoch %d max_osd %d\n", map->epoch, map->max_osd);
	return map;

e_inval:
	err = -EINVAL;
bad:
	pr_err("corrupt inc osdmap (%d) epoch %d off %d (%p of %p-%p)\n",
	       err, epoch, (int)(*p - start), *p, start, end);
	print_hex_dump(KERN_DEBUG, "osdmap: ",
		       DUMP_PREFIX_OFFSET, 16, 1,
		       start, end - start, true);
	return ERR_PTR(err);
}
2114
ceph_oloc_copy(struct ceph_object_locator * dest,const struct ceph_object_locator * src)2115 void ceph_oloc_copy(struct ceph_object_locator *dest,
2116 const struct ceph_object_locator *src)
2117 {
2118 ceph_oloc_destroy(dest);
2119
2120 dest->pool = src->pool;
2121 if (src->pool_ns)
2122 dest->pool_ns = ceph_get_string(src->pool_ns);
2123 else
2124 dest->pool_ns = NULL;
2125 }
2126 EXPORT_SYMBOL(ceph_oloc_copy);
2127
/* drop the pool namespace reference; ceph_put_string() handles NULL */
void ceph_oloc_destroy(struct ceph_object_locator *oloc)
{
	ceph_put_string(oloc->pool_ns);
}
EXPORT_SYMBOL(ceph_oloc_destroy);
2133
/*
 * Copy an object id, freeing whatever @dest previously held.  Names
 * that fit the inline buffer stay inline; external names are
 * duplicated with __GFP_NOFAIL, so this cannot fail.
 */
void ceph_oid_copy(struct ceph_object_id *dest,
		   const struct ceph_object_id *src)
{
	ceph_oid_destroy(dest);

	if (src->name != src->inline_name) {
		/* very rare, see ceph_object_id definition */
		dest->name = kmalloc(src->name_len + 1,
				     GFP_NOIO | __GFP_NOFAIL);
	} else {
		dest->name = dest->inline_name;
	}
	/* +1 copies the NUL terminator as well */
	memcpy(dest->name, src->name, src->name_len + 1);
	dest->name_len = src->name_len;
}
EXPORT_SYMBOL(ceph_oid_copy);
2150
/*
 * Format an oid name into the inline buffer.  Returns 0 on success;
 * if the name does not fit, returns the required length (excluding
 * the NUL) so the caller can allocate externally.
 *
 * NOTE(review): a vsnprintf() output error would return a negative
 * len, which the unsigned comparison below treats as "does not fit" -
 * presumably intentional; confirm.
 */
static __printf(2, 0)
int oid_printf_vargs(struct ceph_object_id *oid, const char *fmt, va_list ap)
{
	int len;

	/* must not leak a previously set name */
	WARN_ON(!ceph_oid_empty(oid));

	len = vsnprintf(oid->inline_name, sizeof(oid->inline_name), fmt, ap);
	if (len >= sizeof(oid->inline_name))
		return len;

	oid->name_len = len;
	return 0;
}
2165
2166 /*
2167 * If oid doesn't fit into inline buffer, BUG.
2168 */
/* format an oid name; BUG if it does not fit the inline buffer */
void ceph_oid_printf(struct ceph_object_id *oid, const char *fmt, ...)
{
	va_list ap;

	va_start(ap, fmt);
	BUG_ON(oid_printf_vargs(oid, fmt, ap));
	va_end(ap);
}
EXPORT_SYMBOL(ceph_oid_printf);
2178
/*
 * Format an oid name, falling back to an external allocation when the
 * inline buffer is too small.  Returns 0 or -ENOMEM.
 */
static __printf(3, 0)
int oid_aprintf_vargs(struct ceph_object_id *oid, gfp_t gfp,
		      const char *fmt, va_list ap)
{
	va_list aq;
	int len;

	/* work on a copy - @ap is needed again if we have to reformat */
	va_copy(aq, ap);
	len = oid_printf_vargs(oid, fmt, aq);
	va_end(aq);

	if (len) {
		char *external_name;

		external_name = kmalloc(len + 1, gfp);
		if (!external_name)
			return -ENOMEM;

		oid->name = external_name;
		/* reformat into the buffer sized by the first pass */
		WARN_ON(vsnprintf(oid->name, len + 1, fmt, ap) != len);
		oid->name_len = len;
	}

	return 0;
}
2204
2205 /*
2206 * If oid doesn't fit into inline buffer, allocate.
2207 */
/* varargs front end for oid_aprintf_vargs(); returns 0 or -ENOMEM */
int ceph_oid_aprintf(struct ceph_object_id *oid, gfp_t gfp,
		     const char *fmt, ...)
{
	va_list ap;
	int ret;

	va_start(ap, fmt);
	ret = oid_aprintf_vargs(oid, gfp, fmt, ap);
	va_end(ap);

	return ret;
}
EXPORT_SYMBOL(ceph_oid_aprintf);
2221
/* free the name only if it was externally allocated (not inline) */
void ceph_oid_destroy(struct ceph_object_id *oid)
{
	if (oid->name != oid->inline_name)
		kfree(oid->name);
}
EXPORT_SYMBOL(ceph_oid_destroy);
2228
2229 /*
2230 * osds only
2231 */
__osds_equal(const struct ceph_osds * lhs,const struct ceph_osds * rhs)2232 static bool __osds_equal(const struct ceph_osds *lhs,
2233 const struct ceph_osds *rhs)
2234 {
2235 if (lhs->size == rhs->size &&
2236 !memcmp(lhs->osds, rhs->osds, rhs->size * sizeof(rhs->osds[0])))
2237 return true;
2238
2239 return false;
2240 }
2241
2242 /*
2243 * osds + primary
2244 */
osds_equal(const struct ceph_osds * lhs,const struct ceph_osds * rhs)2245 static bool osds_equal(const struct ceph_osds *lhs,
2246 const struct ceph_osds *rhs)
2247 {
2248 if (__osds_equal(lhs, rhs) &&
2249 lhs->primary == rhs->primary)
2250 return true;
2251
2252 return false;
2253 }
2254
osds_valid(const struct ceph_osds * set)2255 static bool osds_valid(const struct ceph_osds *set)
2256 {
2257 /* non-empty set */
2258 if (set->size > 0 && set->primary >= 0)
2259 return true;
2260
2261 /* empty can_shift_osds set */
2262 if (!set->size && set->primary == -1)
2263 return true;
2264
2265 /* empty !can_shift_osds set - all NONE */
2266 if (set->size > 0 && set->primary == -1) {
2267 int i;
2268
2269 for (i = 0; i < set->size; i++) {
2270 if (set->osds[i] != CRUSH_ITEM_NONE)
2271 break;
2272 }
2273 if (i == set->size)
2274 return true;
2275 }
2276
2277 return false;
2278 }
2279
ceph_osds_copy(struct ceph_osds * dest,const struct ceph_osds * src)2280 void ceph_osds_copy(struct ceph_osds *dest, const struct ceph_osds *src)
2281 {
2282 memcpy(dest->osds, src->osds, src->size * sizeof(src->osds[0]));
2283 dest->size = src->size;
2284 dest->primary = src->primary;
2285 }
2286
/*
 * Return true if growing pg_num from @old_pg_num to @new_pg_num splits
 * the PG identified by @pgid, i.e. if some new seed in
 * [old_pg_num, new_pg_num) folds back (via ceph_stable_mod() over the
 * old mask) onto pgid->seed.
 *
 * NOTE(review): this mirrors the userspace OSDMap split calculation -
 * the candidate seeds are pgid->seed with successive high bits set;
 * confirm against ceph's pg_pool_t::is_split() if modifying.
 */
bool ceph_pg_is_split(const struct ceph_pg *pgid, u32 old_pg_num,
		      u32 new_pg_num)
{
	int old_bits = calc_bits_of(old_pg_num);
	int old_mask = (1 << old_bits) - 1;
	int n;

	WARN_ON(pgid->seed >= old_pg_num);
	/* merges (shrinking pg_num) are not considered splits */
	if (new_pg_num <= old_pg_num)
		return false;

	for (n = 1; ; n++) {
		/* candidate child seed: original seed with a higher bit set */
		int next_bit = n << (old_bits - 1);
		u32 s = next_bit | pgid->seed;

		if (s < old_pg_num || s == pgid->seed)
			continue;
		if (s >= new_pg_num)
			break;	/* candidates are increasing - done */

		/* would this new seed have mapped onto our PG before? */
		s = ceph_stable_mod(s, old_pg_num, old_mask);
		if (s == pgid->seed)
			return true;
	}

	return false;
}
2314
bool ceph_is_new_interval(const struct ceph_osds *old_acting,
			  const struct ceph_osds *new_acting,
			  const struct ceph_osds *old_up,
			  const struct ceph_osds *new_up,
			  int old_size,
			  int new_size,
			  int old_min_size,
			  int new_min_size,
			  u32 old_pg_num,
			  u32 new_pg_num,
			  bool old_sort_bitwise,
			  bool new_sort_bitwise,
			  bool old_recovery_deletes,
			  bool new_recovery_deletes,
			  const struct ceph_pg *pgid)
{
	/* any change in the acting or up sets opens a new interval */
	if (!osds_equal(old_acting, new_acting))
		return true;
	if (!osds_equal(old_up, new_up))
		return true;

	/* so does a change in pool size or min_size */
	if (old_size != new_size || old_min_size != new_min_size)
		return true;

	/* a split of this PG */
	if (ceph_pg_is_split(pgid, old_pg_num, new_pg_num))
		return true;

	/* or a flip of the sort_bitwise / recovery_deletes flags */
	return old_sort_bitwise != new_sort_bitwise ||
	       old_recovery_deletes != new_recovery_deletes;
}
2339
calc_pg_rank(int osd,const struct ceph_osds * acting)2340 static int calc_pg_rank(int osd, const struct ceph_osds *acting)
2341 {
2342 int i;
2343
2344 for (i = 0; i < acting->size; i++) {
2345 if (acting->osds[i] == osd)
2346 return i;
2347 }
2348
2349 return -1;
2350 }
2351
primary_changed(const struct ceph_osds * old_acting,const struct ceph_osds * new_acting)2352 static bool primary_changed(const struct ceph_osds *old_acting,
2353 const struct ceph_osds *new_acting)
2354 {
2355 if (!old_acting->size && !new_acting->size)
2356 return false; /* both still empty */
2357
2358 if (!old_acting->size ^ !new_acting->size)
2359 return true; /* was empty, now not, or vice versa */
2360
2361 if (old_acting->primary != new_acting->primary)
2362 return true; /* primary changed */
2363
2364 if (calc_pg_rank(old_acting->primary, old_acting) !=
2365 calc_pg_rank(new_acting->primary, new_acting))
2366 return true;
2367
2368 return false; /* same primary (tho replicas may have changed) */
2369 }
2370
bool ceph_osds_changed(const struct ceph_osds *old_acting,
		       const struct ceph_osds *new_acting,
		       bool any_change)
{
	/* a primary change always counts */
	if (primary_changed(old_acting, new_acting))
		return true;

	/* otherwise membership changes count only if @any_change */
	return any_change && !__osds_equal(old_acting, new_acting);
}
2383
2384 /*
2385 * Map an object into a PG.
2386 *
2387 * Should only be called with target_oid and target_oloc (as opposed to
2388 * base_oid and base_oloc), since tiering isn't taken into account.
2389 */
void __ceph_object_locator_to_pg(struct ceph_pg_pool_info *pi,
				 const struct ceph_object_id *oid,
				 const struct ceph_object_locator *oloc,
				 struct ceph_pg *raw_pgid)
{
	WARN_ON(pi->id != oloc->pool);

	if (!oloc->pool_ns) {
		/* no namespace - hash the object name alone */
		raw_pgid->pool = oloc->pool;
		raw_pgid->seed = ceph_str_hash(pi->object_hash, oid->name,
					       oid->name_len);
		dout("%s %s -> raw_pgid %llu.%x\n", __func__, oid->name,
		     raw_pgid->pool, raw_pgid->seed);
	} else {
		char stack_buf[256];
		char *buf = stack_buf;
		int nsl = oloc->pool_ns->len;
		size_t total = nsl + 1 + oid->name_len;

		/*
		 * Hash "<namespace>\037<object name>" instead.  Use the
		 * on-stack buffer when it fits; otherwise fall back to a
		 * __GFP_NOFAIL allocation since this function has no way
		 * to report an error.
		 */
		if (total > sizeof(stack_buf))
			buf = kmalloc(total, GFP_NOIO | __GFP_NOFAIL);
		memcpy(buf, oloc->pool_ns->str, nsl);
		buf[nsl] = '\037';	/* 0x1f separator between ns and name */
		memcpy(buf + nsl + 1, oid->name, oid->name_len);
		raw_pgid->pool = oloc->pool;
		raw_pgid->seed = ceph_str_hash(pi->object_hash, buf, total);
		if (buf != stack_buf)
			kfree(buf);
		dout("%s %s ns %.*s -> raw_pgid %llu.%x\n", __func__,
		     oid->name, nsl, oloc->pool_ns->str,
		     raw_pgid->pool, raw_pgid->seed);
	}
}
2423
ceph_object_locator_to_pg(struct ceph_osdmap * osdmap,const struct ceph_object_id * oid,const struct ceph_object_locator * oloc,struct ceph_pg * raw_pgid)2424 int ceph_object_locator_to_pg(struct ceph_osdmap *osdmap,
2425 const struct ceph_object_id *oid,
2426 const struct ceph_object_locator *oloc,
2427 struct ceph_pg *raw_pgid)
2428 {
2429 struct ceph_pg_pool_info *pi;
2430
2431 pi = ceph_pg_pool_by_id(osdmap, oloc->pool);
2432 if (!pi)
2433 return -ENOENT;
2434
2435 __ceph_object_locator_to_pg(pi, oid, oloc, raw_pgid);
2436 return 0;
2437 }
2438 EXPORT_SYMBOL(ceph_object_locator_to_pg);
2439
2440 /*
2441 * Map a raw PG (full precision ps) into an actual PG.
2442 */
raw_pg_to_pg(struct ceph_pg_pool_info * pi,const struct ceph_pg * raw_pgid,struct ceph_pg * pgid)2443 static void raw_pg_to_pg(struct ceph_pg_pool_info *pi,
2444 const struct ceph_pg *raw_pgid,
2445 struct ceph_pg *pgid)
2446 {
2447 pgid->pool = raw_pgid->pool;
2448 pgid->seed = ceph_stable_mod(raw_pgid->seed, pi->pg_num,
2449 pi->pg_num_mask);
2450 }
2451
2452 /*
2453 * Map a raw PG (full precision ps) into a placement ps (placement
2454 * seed). Include pool id in that value so that different pools don't
2455 * use the same seeds.
2456 */
raw_pg_to_pps(struct ceph_pg_pool_info * pi,const struct ceph_pg * raw_pgid)2457 static u32 raw_pg_to_pps(struct ceph_pg_pool_info *pi,
2458 const struct ceph_pg *raw_pgid)
2459 {
2460 if (pi->flags & CEPH_POOL_FLAG_HASHPSPOOL) {
2461 /* hash pool id and seed so that pool PGs do not overlap */
2462 return crush_hash32_2(CRUSH_HASH_RJENKINS1,
2463 ceph_stable_mod(raw_pgid->seed,
2464 pi->pgp_num,
2465 pi->pgp_num_mask),
2466 raw_pgid->pool);
2467 } else {
2468 /*
2469 * legacy behavior: add ps and pool together. this is
2470 * not a great approach because the PGs from each pool
2471 * will overlap on top of each other: 0.5 == 1.4 ==
2472 * 2.3 == ...
2473 */
2474 return ceph_stable_mod(raw_pgid->seed, pi->pgp_num,
2475 pi->pgp_num_mask) +
2476 (unsigned)raw_pgid->pool;
2477 }
2478 }
2479
2480 /*
2481 * Magic value used for a "default" fallback choose_args, used if the
2482 * crush_choose_arg_map passed to do_crush() does not exist. If this
2483 * also doesn't exist, fall back to canonical weights.
2484 */
2485 #define CEPH_DEFAULT_CHOOSE_ARGS -1
2486
/*
 * Run CRUSH rule @ruleno for input @x, placing up to @result_max OSD
 * ids into @result.  Returns the number of OSDs placed or a negative
 * error from crush_do_rule().  Uses choose_args @choose_args_index if
 * present, then the "default" choose_args, then canonical weights.
 */
static int do_crush(struct ceph_osdmap *map, int ruleno, int x,
		    int *result, int result_max,
		    const __u32 *weight, int weight_max,
		    s64 choose_args_index)
{
	struct crush_choose_arg_map *arg_map;
	struct crush_work *work;
	int r;

	BUG_ON(result_max > CEPH_PG_MAX_SIZE);

	arg_map = lookup_choose_arg_map(&map->crush->choose_args,
					choose_args_index);
	if (!arg_map)
		/* fall back to the magic "default" choose_args, if any */
		arg_map = lookup_choose_arg_map(&map->crush->choose_args,
						CEPH_DEFAULT_CHOOSE_ARGS);

	/* scratch space for crush_do_rule() comes from a shared pool */
	work = get_workspace(&map->crush_wsm, map->crush);
	r = crush_do_rule(map->crush, ruleno, x, result, result_max,
			  weight, weight_max, work,
			  arg_map ? arg_map->args : NULL);
	put_workspace(&map->crush_wsm, work);
	return r;
}
2511
remove_nonexistent_osds(struct ceph_osdmap * osdmap,struct ceph_pg_pool_info * pi,struct ceph_osds * set)2512 static void remove_nonexistent_osds(struct ceph_osdmap *osdmap,
2513 struct ceph_pg_pool_info *pi,
2514 struct ceph_osds *set)
2515 {
2516 int i;
2517
2518 if (ceph_can_shift_osds(pi)) {
2519 int removed = 0;
2520
2521 /* shift left */
2522 for (i = 0; i < set->size; i++) {
2523 if (!ceph_osd_exists(osdmap, set->osds[i])) {
2524 removed++;
2525 continue;
2526 }
2527 if (removed)
2528 set->osds[i - removed] = set->osds[i];
2529 }
2530 set->size -= removed;
2531 } else {
2532 /* set dne devices to NONE */
2533 for (i = 0; i < set->size; i++) {
2534 if (!ceph_osd_exists(osdmap, set->osds[i]))
2535 set->osds[i] = CRUSH_ITEM_NONE;
2536 }
2537 }
2538 }
2539
2540 /*
2541 * Calculate raw set (CRUSH output) for given PG and filter out
2542 * nonexistent OSDs. ->primary is undefined for a raw set.
2543 *
2544 * Placement seed (CRUSH input) is returned through @ppps.
2545 */
static void pg_to_raw_osds(struct ceph_osdmap *osdmap,
			   struct ceph_pg_pool_info *pi,
			   const struct ceph_pg *raw_pgid,
			   struct ceph_osds *raw,
			   u32 *ppps)
{
	u32 pps = raw_pg_to_pps(pi, raw_pgid);	/* placement seed */
	int ruleno;
	int len;

	ceph_osds_init(raw);
	if (ppps)
		*ppps = pps;

	/* find the crush rule for the pool's ruleset/type/size */
	ruleno = crush_find_rule(osdmap->crush, pi->crush_ruleset, pi->type,
				 pi->size);
	if (ruleno < 0) {
		pr_err("no crush rule: pool %lld ruleset %d type %d size %d\n",
		       pi->id, pi->crush_ruleset, pi->type, pi->size);
		/* on any error @raw is left empty */
		return;
	}

	/* raw->osds is fixed-size - refuse pools wider than it */
	if (pi->size > ARRAY_SIZE(raw->osds)) {
		pr_err_ratelimited("pool %lld ruleset %d type %d too wide: size %d > %zu\n",
		       pi->id, pi->crush_ruleset, pi->type, pi->size,
		       ARRAY_SIZE(raw->osds));
		return;
	}

	len = do_crush(osdmap, ruleno, pps, raw->osds, pi->size,
		       osdmap->osd_weight, osdmap->max_osd, pi->id);
	if (len < 0) {
		pr_err("error %d from crush rule %d: pool %lld ruleset %d type %d size %d\n",
		       len, ruleno, pi->id, pi->crush_ruleset, pi->type,
		       pi->size);
		return;
	}

	raw->size = len;
	remove_nonexistent_osds(osdmap, pi, raw);
}
2587
2588 /* apply pg_upmap[_items] mappings */
/*
 * Replace the raw set with the PG's pg_upmap mapping (if all of its
 * targets are usable), then apply individual pg_upmap_items
 * substitutions.  Operates in place on @raw.
 */
static void apply_upmap(struct ceph_osdmap *osdmap,
			const struct ceph_pg *pgid,
			struct ceph_osds *raw)
{
	struct ceph_pg_mapping *pg;
	int i, j;

	pg = lookup_pg_mapping(&osdmap->pg_upmap, pgid);
	if (pg) {
		/* make sure targets aren't marked out */
		for (i = 0; i < pg->pg_upmap.len; i++) {
			int osd = pg->pg_upmap.osds[i];

			/* note the bounds check before the weight lookup */
			if (osd != CRUSH_ITEM_NONE &&
			    osd < osdmap->max_osd &&
			    osdmap->osd_weight[osd] == 0) {
				/* reject/ignore explicit mapping */
				return;
			}
		}
		/* replace the raw set with the explicit mapping */
		for (i = 0; i < pg->pg_upmap.len; i++)
			raw->osds[i] = pg->pg_upmap.osds[i];
		raw->size = pg->pg_upmap.len;
		/* check and apply pg_upmap_items, if any */
	}

	pg = lookup_pg_mapping(&osdmap->pg_upmap_items, pgid);
	if (pg) {
		/*
		 * Note: this approach does not allow a bidirectional swap,
		 * e.g., [[1,2],[2,1]] applied to [0,1,2] -> [0,2,1].
		 */
		for (i = 0; i < pg->pg_upmap_items.len; i++) {
			int from = pg->pg_upmap_items.from_to[i][0];
			int to = pg->pg_upmap_items.from_to[i][1];
			int pos = -1;	/* index of @from in @raw, if usable */
			bool exists = false;

			/* make sure replacement doesn't already appear */
			for (j = 0; j < raw->size; j++) {
				int osd = raw->osds[j];

				if (osd == to) {
					exists = true;
					break;
				}
				/* ignore mapping if target is marked out */
				if (osd == from && pos < 0 &&
				    !(to != CRUSH_ITEM_NONE &&
				      to < osdmap->max_osd &&
				      osdmap->osd_weight[to] == 0)) {
					pos = j;
				}
			}
			/* substitute only if @to isn't already in the set */
			if (!exists && pos >= 0)
				raw->osds[pos] = to;
		}
	}
}
2648
2649 /*
2650 * Given raw set, calculate up set and up primary. By definition of an
2651 * up set, the result won't contain nonexistent or down OSDs.
2652 *
2653 * This is done in-place - on return @set is the up set. If it's
2654 * empty, ->primary will remain undefined.
2655 */
static void raw_to_up_osds(struct ceph_osdmap *osdmap,
			   struct ceph_pg_pool_info *pi,
			   struct ceph_osds *set)
{
	int i;

	/* ->primary is undefined for a raw set */
	BUG_ON(set->primary != -1);

	if (ceph_can_shift_osds(pi)) {
		int removed = 0;

		/* shift left */
		for (i = 0; i < set->size; i++) {
			if (ceph_osd_is_down(osdmap, set->osds[i])) {
				removed++;
				continue;
			}
			if (removed)
				set->osds[i - removed] = set->osds[i];
		}
		set->size -= removed;
		/* first surviving osd becomes the up primary */
		if (set->size > 0)
			set->primary = set->osds[0];
	} else {
		/* set down/dne devices to NONE */
		for (i = set->size - 1; i >= 0; i--) {
			if (ceph_osd_is_down(osdmap, set->osds[i]))
				set->osds[i] = CRUSH_ITEM_NONE;
			else
				/*
				 * iterating backwards leaves ->primary
				 * set to the first (lowest-index)
				 * non-down osd
				 */
				set->primary = set->osds[i];
		}
	}
}
2690
/*
 * Re-pick the up primary according to per-osd primary_affinity values.
 * May reorder @up->osds (for can_shift_osds pools) so the new primary
 * is at the front.
 */
static void apply_primary_affinity(struct ceph_osdmap *osdmap,
				   struct ceph_pg_pool_info *pi,
				   u32 pps,
				   struct ceph_osds *up)
{
	int i;
	int pos = -1;	/* index of the chosen primary in @up */

	/*
	 * Do we have any non-default primary_affinity values for these
	 * osds?
	 */
	if (!osdmap->osd_primary_affinity)
		return;

	for (i = 0; i < up->size; i++) {
		int osd = up->osds[i];

		if (osd != CRUSH_ITEM_NONE &&
		    osdmap->osd_primary_affinity[osd] !=
					CEPH_OSD_DEFAULT_PRIMARY_AFFINITY) {
			break;
		}
	}
	/* all affinities are default - nothing to do */
	if (i == up->size)
		return;

	/*
	 * Pick the primary.  Feed both the seed (for the pg) and the
	 * osd into the hash/rng so that a proportional fraction of an
	 * osd's pgs get rejected as primary.
	 */
	for (i = 0; i < up->size; i++) {
		int osd = up->osds[i];
		u32 aff;

		if (osd == CRUSH_ITEM_NONE)
			continue;

		aff = osdmap->osd_primary_affinity[osd];
		if (aff < CEPH_OSD_MAX_PRIMARY_AFFINITY &&
		    (crush_hash32_2(CRUSH_HASH_RJENKINS1,
				    pps, osd) >> 16) >= aff) {
			/*
			 * We chose not to use this primary.  Note it
			 * anyway as a fallback in case we don't pick
			 * anyone else, but keep looking.
			 */
			if (pos < 0)
				pos = i;
		} else {
			pos = i;
			break;
		}
	}
	/* all slots NONE - leave ->primary alone */
	if (pos < 0)
		return;

	up->primary = up->osds[pos];

	if (ceph_can_shift_osds(pi) && pos > 0) {
		/* move the new primary to the front */
		for (i = pos; i > 0; i--)
			up->osds[i] = up->osds[i - 1];
		up->osds[0] = up->primary;
	}
}
2758
2759 /*
2760 * Get pg_temp and primary_temp mappings for given PG.
2761 *
2762 * Note that a PG may have none, only pg_temp, only primary_temp or
2763 * both pg_temp and primary_temp mappings. This means @temp isn't
2764 * always a valid OSD set on return: in the "only primary_temp" case,
2765 * @temp will have its ->primary >= 0 but ->size == 0.
2766 */
static void get_temp_osds(struct ceph_osdmap *osdmap,
			  struct ceph_pg_pool_info *pi,
			  const struct ceph_pg *pgid,
			  struct ceph_osds *temp)
{
	struct ceph_pg_mapping *pg;
	int i;

	ceph_osds_init(temp);

	/* pg_temp? */
	pg = lookup_pg_mapping(&osdmap->pg_temp, pgid);
	if (pg) {
		for (i = 0; i < pg->pg_temp.len; i++) {
			if (ceph_osd_is_down(osdmap, pg->pg_temp.osds[i])) {
				/* down osds: skip if shiftable, else NONE */
				if (ceph_can_shift_osds(pi))
					continue;

				temp->osds[temp->size++] = CRUSH_ITEM_NONE;
			} else {
				temp->osds[temp->size++] = pg->pg_temp.osds[i];
			}
		}

		/* apply pg_temp's primary */
		for (i = 0; i < temp->size; i++) {
			if (temp->osds[i] != CRUSH_ITEM_NONE) {
				temp->primary = temp->osds[i];
				break;
			}
		}
	}

	/* primary_temp? */
	pg = lookup_pg_mapping(&osdmap->primary_temp, pgid);
	if (pg)
		/* primary_temp overrides any primary derived above */
		temp->primary = pg->primary_temp.osd;
}
2805
2806 /*
2807 * Map a PG to its acting set as well as its up set.
2808 *
2809 * Acting set is used for data mapping purposes, while up set can be
2810 * recorded for detecting interval changes and deciding whether to
2811 * resend a request.
2812 */
void ceph_pg_to_up_acting_osds(struct ceph_osdmap *osdmap,
			       struct ceph_pg_pool_info *pi,
			       const struct ceph_pg *raw_pgid,
			       struct ceph_osds *up,
			       struct ceph_osds *acting)
{
	struct ceph_pg pgid;
	u32 pps;	/* placement seed (CRUSH input) */

	WARN_ON(pi->id != raw_pgid->pool);
	raw_pg_to_pg(pi, raw_pgid, &pgid);

	/* up set: CRUSH -> upmap exceptions -> drop down/dne -> affinity */
	pg_to_raw_osds(osdmap, pi, raw_pgid, up, &pps);
	apply_upmap(osdmap, &pgid, up);
	raw_to_up_osds(osdmap, pi, up);
	apply_primary_affinity(osdmap, pi, pps, up);
	/* acting set: pg_temp/primary_temp overrides, else same as up */
	get_temp_osds(osdmap, pi, &pgid, acting);
	if (!acting->size) {
		memcpy(acting->osds, up->osds, up->size * sizeof(up->osds[0]));
		acting->size = up->size;
		/* keep a primary_temp-only override if one was set */
		if (acting->primary == -1)
			acting->primary = up->primary;
	}
	WARN_ON(!osds_valid(up) || !osds_valid(acting));
}
2838
ceph_pg_to_primary_shard(struct ceph_osdmap * osdmap,struct ceph_pg_pool_info * pi,const struct ceph_pg * raw_pgid,struct ceph_spg * spgid)2839 bool ceph_pg_to_primary_shard(struct ceph_osdmap *osdmap,
2840 struct ceph_pg_pool_info *pi,
2841 const struct ceph_pg *raw_pgid,
2842 struct ceph_spg *spgid)
2843 {
2844 struct ceph_pg pgid;
2845 struct ceph_osds up, acting;
2846 int i;
2847
2848 WARN_ON(pi->id != raw_pgid->pool);
2849 raw_pg_to_pg(pi, raw_pgid, &pgid);
2850
2851 if (ceph_can_shift_osds(pi)) {
2852 spgid->pgid = pgid; /* struct */
2853 spgid->shard = CEPH_SPG_NOSHARD;
2854 return true;
2855 }
2856
2857 ceph_pg_to_up_acting_osds(osdmap, pi, &pgid, &up, &acting);
2858 for (i = 0; i < acting.size; i++) {
2859 if (acting.osds[i] == acting.primary) {
2860 spgid->pgid = pgid; /* struct */
2861 spgid->shard = i;
2862 return true;
2863 }
2864 }
2865
2866 return false;
2867 }
2868
2869 /*
2870 * Return acting primary for given PG, or -1 if none.
2871 */
ceph_pg_to_acting_primary(struct ceph_osdmap * osdmap,const struct ceph_pg * raw_pgid)2872 int ceph_pg_to_acting_primary(struct ceph_osdmap *osdmap,
2873 const struct ceph_pg *raw_pgid)
2874 {
2875 struct ceph_pg_pool_info *pi;
2876 struct ceph_osds up, acting;
2877
2878 pi = ceph_pg_pool_by_id(osdmap, raw_pgid->pool);
2879 if (!pi)
2880 return -1;
2881
2882 ceph_pg_to_up_acting_osds(osdmap, pi, raw_pgid, &up, &acting);
2883 return acting.primary;
2884 }
2885 EXPORT_SYMBOL(ceph_pg_to_acting_primary);
2886
static struct crush_loc_node *alloc_crush_loc(size_t type_name_len,
					      size_t name_len)
{
	struct crush_loc_node *loc;

	/* +2 for the NUL terminators of both strings stored in cl_data */
	loc = kmalloc(sizeof(*loc) + type_name_len + name_len + 2, GFP_NOIO);
	if (loc)
		RB_CLEAR_NODE(&loc->cl_node);

	return loc;
}
2899
static void free_crush_loc(struct crush_loc_node *loc)
{
	/* must have been erased from (or never inserted into) the tree */
	WARN_ON(!RB_EMPTY_NODE(&loc->cl_node));

	kfree(loc);
}
2906
crush_loc_compare(const struct crush_loc * loc1,const struct crush_loc * loc2)2907 static int crush_loc_compare(const struct crush_loc *loc1,
2908 const struct crush_loc *loc2)
2909 {
2910 return strcmp(loc1->cl_type_name, loc2->cl_type_name) ?:
2911 strcmp(loc1->cl_name, loc2->cl_name);
2912 }
2913
/*
 * Generates the rb-tree helpers used below (__insert_crush_loc(),
 * erase_crush_loc(), lookup_crush_loc()) for crush_loc_node, keyed by
 * the embedded cl_loc via crush_loc_compare().
 */
DEFINE_RB_FUNCS2(crush_loc, struct crush_loc_node, cl_loc, crush_loc_compare,
		 RB_BYPTR, const struct crush_loc *, cl_node)
2916
2917 /*
2918 * Parses a set of <bucket type name>':'<bucket name> pairs separated
2919 * by '|', e.g. "rack:foo1|rack:foo2|datacenter:bar".
2920 *
2921 * Note that @crush_location is modified by strsep().
2922 */
int ceph_parse_crush_location(char *crush_location, struct rb_root *locs)
{
	struct crush_loc_node *loc;
	const char *type_name, *name, *colon;
	size_t type_name_len, name_len;

	dout("%s '%s'\n", __func__, crush_location);
	/* one "<type>:<name>" pair per '|'-separated token */
	while ((type_name = strsep(&crush_location, "|"))) {
		colon = strchr(type_name, ':');
		if (!colon)
			return -EINVAL;

		type_name_len = colon - type_name;
		if (type_name_len == 0)
			return -EINVAL;	/* empty bucket type name */

		name = colon + 1;
		name_len = strlen(name);
		if (name_len == 0)
			return -EINVAL;	/* empty bucket name */

		loc = alloc_crush_loc(type_name_len, name_len);
		if (!loc)
			return -ENOMEM;

		/* both strings live back to back in loc->cl_data */
		loc->cl_loc.cl_type_name = loc->cl_data;
		memcpy(loc->cl_loc.cl_type_name, type_name, type_name_len);
		loc->cl_loc.cl_type_name[type_name_len] = '\0';

		loc->cl_loc.cl_name = loc->cl_data + type_name_len + 1;
		memcpy(loc->cl_loc.cl_name, name, name_len);
		loc->cl_loc.cl_name[name_len] = '\0';

		/* duplicate pairs are rejected */
		if (!__insert_crush_loc(locs, loc)) {
			free_crush_loc(loc);
			return -EEXIST;
		}

		/*
		 * NOTE(review): on an error return, pairs already
		 * inserted remain in @locs - presumably the caller
		 * cleans up via ceph_clear_crush_locs(); verify.
		 */
		dout("%s type_name '%s' name '%s'\n", __func__,
		     loc->cl_loc.cl_type_name, loc->cl_loc.cl_name);
	}

	return 0;
}
2967
ceph_compare_crush_locs(struct rb_root * locs1,struct rb_root * locs2)2968 int ceph_compare_crush_locs(struct rb_root *locs1, struct rb_root *locs2)
2969 {
2970 struct rb_node *n1 = rb_first(locs1);
2971 struct rb_node *n2 = rb_first(locs2);
2972 int ret;
2973
2974 for ( ; n1 && n2; n1 = rb_next(n1), n2 = rb_next(n2)) {
2975 struct crush_loc_node *loc1 =
2976 rb_entry(n1, struct crush_loc_node, cl_node);
2977 struct crush_loc_node *loc2 =
2978 rb_entry(n2, struct crush_loc_node, cl_node);
2979
2980 ret = crush_loc_compare(&loc1->cl_loc, &loc2->cl_loc);
2981 if (ret)
2982 return ret;
2983 }
2984
2985 if (!n1 && n2)
2986 return -1;
2987 if (n1 && !n2)
2988 return 1;
2989 return 0;
2990 }
2991
ceph_clear_crush_locs(struct rb_root * locs)2992 void ceph_clear_crush_locs(struct rb_root *locs)
2993 {
2994 while (!RB_EMPTY_ROOT(locs)) {
2995 struct crush_loc_node *loc =
2996 rb_entry(rb_first(locs), struct crush_loc_node, cl_node);
2997
2998 erase_crush_loc(locs, loc);
2999 free_crush_loc(loc);
3000 }
3001 }
3002
3003 /*
3004 * [a-zA-Z0-9-_.]+
3005 */
/*
 * [a-zA-Z0-9-_.]+
 */
static bool is_valid_crush_name(const char *name)
{
	const char *p = name;

	/* at least one character; every character must be in the set */
	do {
		char c = *p;

		if (!((c >= 'a' && c <= 'z') ||
		      (c >= 'A' && c <= 'Z') ||
		      (c >= '0' && c <= '9') ||
		      c == '-' || c == '_' || c == '.'))
			return false;
	} while (*++p != '\0');

	return true;
}
3018
3019 /*
3020 * Gets the parent of an item. Returns its id (<0 because the
3021 * parent is always a bucket), type id (>0 for the same reason,
3022 * via @parent_type_id) and location (via @parent_loc). If no
3023 * parent, returns 0.
3024 *
3025 * Does a linear search, as there are no parent pointers of any
3026 * kind. Note that the result is ambiguous for items that occur
3027 * multiple times in the map.
3028 */
static int get_immediate_parent(struct crush_map *c, int id,
				u16 *parent_type_id,
				struct crush_loc *parent_loc)
{
	struct crush_bucket *b;
	struct crush_name_node *type_cn, *cn;
	int i, j;

	/* linear scan: find any bucket that lists @id among its items */
	for (i = 0; i < c->max_buckets; i++) {
		b = c->buckets[i];
		if (!b)
			continue;

		/* ignore per-class shadow hierarchy */
		cn = lookup_crush_name(&c->names, b->id);
		if (!cn || !is_valid_crush_name(cn->cn_name))
			continue;

		for (j = 0; j < b->size; j++) {
			if (b->items[j] != id)
				continue;

			/* found - report the containing bucket's identity */
			*parent_type_id = b->type;
			type_cn = lookup_crush_name(&c->type_names, b->type);
			parent_loc->cl_type_name = type_cn->cn_name;
			parent_loc->cl_name = cn->cn_name;
			return b->id;
		}
	}

	return 0;  /* no parent */
}
3061
3062 /*
3063 * Calculates the locality/distance from an item to a client
3064 * location expressed in terms of CRUSH hierarchy as a set of
3065 * (bucket type name, bucket name) pairs. Specifically, looks
3066 * for the lowest-valued bucket type for which the location of
3067 * @id matches one of the locations in @locs, so for standard
3068 * bucket types (host = 1, rack = 3, datacenter = 8, zone = 9)
3069 * a matching host is closer than a matching rack and a matching
3070 * data center is closer than a matching zone.
3071 *
3072 * Specifying multiple locations (a "multipath" location) such
3073 * as "rack=foo1 rack=foo2 datacenter=bar" is allowed -- @locs
3074 * is a multimap. The locality will be:
3075 *
3076 * - 3 for OSDs in racks foo1 and foo2
3077 * - 8 for OSDs in data center bar
3078 * - -1 for all other OSDs
3079 *
3080 * The lowest possible bucket type is 1, so the best locality
3081 * for an OSD is 1 (i.e. a matching host). Locality 0 would be
3082 * the OSD itself.
3083 */
ceph_get_crush_locality(struct ceph_osdmap * osdmap,int id,struct rb_root * locs)3084 int ceph_get_crush_locality(struct ceph_osdmap *osdmap, int id,
3085 struct rb_root *locs)
3086 {
3087 struct crush_loc loc;
3088 u16 type_id;
3089
3090 /*
3091 * Instead of repeated get_immediate_parent() calls,
3092 * the location of @id could be obtained with a single
3093 * depth-first traversal.
3094 */
3095 for (;;) {
3096 id = get_immediate_parent(osdmap->crush, id, &type_id, &loc);
3097 if (id >= 0)
3098 return -1; /* not local */
3099
3100 if (lookup_crush_loc(locs, &loc))
3101 return type_id;
3102 }
3103 }
3104