xref: /linux/net/ceph/osdmap.c (revision 69050f8d6d075dc01af7a5f2f550a8067510366f)
1 // SPDX-License-Identifier: GPL-2.0
2 
3 #include <linux/ceph/ceph_debug.h>
4 
5 #include <linux/module.h>
6 #include <linux/slab.h>
7 
8 #include <linux/ceph/libceph.h>
9 #include <linux/ceph/osdmap.h>
10 #include <linux/ceph/decode.h>
11 #include <linux/crush/hash.h>
12 #include <linux/crush/mapper.h>
13 
/*
 * Log an informational message prefixed with the module name and the
 * osdmap's fsid (%pU) and epoch.  printf-style trailing arguments.
 */
static __printf(2, 3)
void osdmap_info(const struct ceph_osdmap *map, const char *fmt, ...)
{
	struct va_format vaf;
	va_list args;

	va_start(args, fmt);
	vaf.fmt = fmt;
	vaf.va = &args;

	/* %pV expands the caller's format string and arguments */
	printk(KERN_INFO "%s (%pU e%u): %pV", KBUILD_MODNAME, &map->fsid,
	       map->epoch, &vaf);

	va_end(args);
}
29 
30 char *ceph_osdmap_state_str(char *str, int len, u32 state)
31 {
32 	if (!len)
33 		return str;
34 
35 	if ((state & CEPH_OSD_EXISTS) && (state & CEPH_OSD_UP))
36 		snprintf(str, len, "exists, up");
37 	else if (state & CEPH_OSD_EXISTS)
38 		snprintf(str, len, "exists");
39 	else if (state & CEPH_OSD_UP)
40 		snprintf(str, len, "up");
41 	else
42 		snprintf(str, len, "doesn't exist");
43 
44 	return str;
45 }
46 
47 /* maps */
48 
/*
 * Number of significant bits in t, i.e. the smallest b such that
 * t < 2^b.  Returns 0 for t == 0.
 */
static int calc_bits_of(unsigned int t)
{
	int bits;

	for (bits = 0; t != 0; t >>= 1)
		bits++;

	return bits;
}
58 
/*
 * the foo_mask is the smallest value 2^n-1 that is >= foo.
 */
static void calc_pg_masks(struct ceph_pg_pool_info *pi)
{
	/* e.g. pg_num 12 -> calc_bits_of(11) == 4 -> mask 0xf */
	pi->pg_num_mask = (1 << calc_bits_of(pi->pg_num-1)) - 1;
	pi->pgp_num_mask = (1 << calc_bits_of(pi->pgp_num-1)) - 1;
}
67 
68 /*
69  * decode crush map
70  */
/*
 * Decode the payload of a uniform bucket: all items share a single
 * weight.  Requires (1 + size) u32s to be present but consumes only
 * item_weight here.  Returns 0 or -EINVAL on a short buffer.
 */
static int crush_decode_uniform_bucket(void **p, void *end,
				       struct crush_bucket_uniform *b)
{
	dout("crush_decode_uniform_bucket %p to %p\n", *p, end);
	ceph_decode_need(p, end, (1+b->h.size) * sizeof(u32), bad);
	b->item_weight = ceph_decode_32(p);
	return 0;
bad:
	return -EINVAL;
}
81 
/*
 * Decode the payload of a list bucket: per-item weights interleaved
 * with cumulative sums.  On error the partially filled bucket is left
 * for the caller to release (crush_decode() -> crush_destroy()).
 */
static int crush_decode_list_bucket(void **p, void *end,
				    struct crush_bucket_list *b)
{
	int j;
	dout("crush_decode_list_bucket %p to %p\n", *p, end);
	b->item_weights = kcalloc(b->h.size, sizeof(u32), GFP_NOFS);
	if (b->item_weights == NULL)
		return -ENOMEM;
	b->sum_weights = kcalloc(b->h.size, sizeof(u32), GFP_NOFS);
	if (b->sum_weights == NULL)
		return -ENOMEM;
	/* 2 u32s (weight, sum) per item */
	ceph_decode_need(p, end, 2 * b->h.size * sizeof(u32), bad);
	for (j = 0; j < b->h.size; j++) {
		b->item_weights[j] = ceph_decode_32(p);
		b->sum_weights[j] = ceph_decode_32(p);
	}
	return 0;
bad:
	return -EINVAL;
}
102 
/*
 * Decode the payload of a tree bucket: a u8 node count followed by the
 * per-node weights.  Partial allocations are freed by the caller via
 * crush_destroy() on error.
 */
static int crush_decode_tree_bucket(void **p, void *end,
				    struct crush_bucket_tree *b)
{
	int j;
	dout("crush_decode_tree_bucket %p to %p\n", *p, end);
	ceph_decode_8_safe(p, end, b->num_nodes, bad);
	b->node_weights = kcalloc(b->num_nodes, sizeof(u32), GFP_NOFS);
	if (b->node_weights == NULL)
		return -ENOMEM;
	ceph_decode_need(p, end, b->num_nodes * sizeof(u32), bad);
	for (j = 0; j < b->num_nodes; j++)
		b->node_weights[j] = ceph_decode_32(p);
	return 0;
bad:
	return -EINVAL;
}
119 
/*
 * Decode the payload of a straw bucket: per-item weights interleaved
 * with precomputed straw values.  Partial allocations are freed by the
 * caller via crush_destroy() on error.
 */
static int crush_decode_straw_bucket(void **p, void *end,
				     struct crush_bucket_straw *b)
{
	int j;
	dout("crush_decode_straw_bucket %p to %p\n", *p, end);
	b->item_weights = kcalloc(b->h.size, sizeof(u32), GFP_NOFS);
	if (b->item_weights == NULL)
		return -ENOMEM;
	b->straws = kcalloc(b->h.size, sizeof(u32), GFP_NOFS);
	if (b->straws == NULL)
		return -ENOMEM;
	/* 2 u32s (weight, straw) per item */
	ceph_decode_need(p, end, 2 * b->h.size * sizeof(u32), bad);
	for (j = 0; j < b->h.size; j++) {
		b->item_weights[j] = ceph_decode_32(p);
		b->straws[j] = ceph_decode_32(p);
	}
	return 0;
bad:
	return -EINVAL;
}
140 
/*
 * Decode the payload of a straw2 bucket: one weight per item (straw2
 * computes straws at mapping time, so none are encoded).
 */
static int crush_decode_straw2_bucket(void **p, void *end,
				      struct crush_bucket_straw2 *b)
{
	int j;
	dout("crush_decode_straw2_bucket %p to %p\n", *p, end);
	b->item_weights = kcalloc(b->h.size, sizeof(u32), GFP_NOFS);
	if (b->item_weights == NULL)
		return -ENOMEM;
	ceph_decode_need(p, end, b->h.size * sizeof(u32), bad);
	for (j = 0; j < b->h.size; j++)
		b->item_weights[j] = ceph_decode_32(p);
	return 0;
bad:
	return -EINVAL;
}
156 
/*
 * Node in a crush name rbtree (crush_map type_names/names), mapping a
 * numeric id to a NUL-terminated name.
 */
struct crush_name_node {
	struct rb_node cn_node;		/* rbtree linkage */
	int cn_id;			/* lookup key */
	char cn_name[];			/* flexible array, NUL-terminated */
};
162 
163 static struct crush_name_node *alloc_crush_name(size_t name_len)
164 {
165 	struct crush_name_node *cn;
166 
167 	cn = kmalloc(sizeof(*cn) + name_len + 1, GFP_NOIO);
168 	if (!cn)
169 		return NULL;
170 
171 	RB_CLEAR_NODE(&cn->cn_node);
172 	return cn;
173 }
174 
175 static void free_crush_name(struct crush_name_node *cn)
176 {
177 	WARN_ON(!RB_EMPTY_NODE(&cn->cn_node));
178 
179 	kfree(cn);
180 }
181 
/* lookup/insert/erase helpers for crush_name_node trees, keyed by cn_id */
DEFINE_RB_FUNCS(crush_name, struct crush_name_node, cn_id, cn_node)
183 
/*
 * Decode an encoded map<id, name> into an rbtree of crush_name_node.
 * Returns 0, -ENOMEM, -EEXIST on a duplicate id, or -EINVAL on a
 * short/invalid buffer.  Nodes already inserted are left in @root for
 * the caller to clean up.
 */
static int decode_crush_names(void **p, void *end, struct rb_root *root)
{
	u32 n;

	ceph_decode_32_safe(p, end, n, e_inval);
	while (n--) {
		struct crush_name_node *cn;
		int id;
		u32 name_len;

		ceph_decode_32_safe(p, end, id, e_inval);
		ceph_decode_32_safe(p, end, name_len, e_inval);
		ceph_decode_need(p, end, name_len, e_inval);	/* name bytes */

		cn = alloc_crush_name(name_len);
		if (!cn)
			return -ENOMEM;

		cn->cn_id = id;
		memcpy(cn->cn_name, *p, name_len);
		cn->cn_name[name_len] = '\0';	/* not NUL-terminated on the wire */
		*p += name_len;

		/* a duplicate id means the map is malformed */
		if (!__insert_crush_name(root, cn)) {
			free_crush_name(cn);
			return -EEXIST;
		}
	}

	return 0;

e_inval:
	return -EINVAL;
}
218 
219 void clear_crush_names(struct rb_root *root)
220 {
221 	while (!RB_EMPTY_ROOT(root)) {
222 		struct crush_name_node *cn =
223 		    rb_entry(rb_first(root), struct crush_name_node, cn_node);
224 
225 		erase_crush_name(root, cn);
226 		free_crush_name(cn);
227 	}
228 }
229 
230 static struct crush_choose_arg_map *alloc_choose_arg_map(void)
231 {
232 	struct crush_choose_arg_map *arg_map;
233 
234 	arg_map = kzalloc_obj(*arg_map, GFP_NOIO);
235 	if (!arg_map)
236 		return NULL;
237 
238 	RB_CLEAR_NODE(&arg_map->node);
239 	return arg_map;
240 }
241 
/*
 * Free a choose_arg_map and all nested weight_set/ids arrays.
 * Tolerates NULL and partially constructed maps (decode_choose_args()
 * error paths).
 */
static void free_choose_arg_map(struct crush_choose_arg_map *arg_map)
{
	int i, j;

	if (!arg_map)
		return;

	/* must not still be linked into c->choose_args */
	WARN_ON(!RB_EMPTY_NODE(&arg_map->node));

	if (arg_map->args) {
		for (i = 0; i < arg_map->size; i++) {
			struct crush_choose_arg *arg = &arg_map->args[i];
			if (arg->weight_set) {
				for (j = 0; j < arg->weight_set_size; j++)
					kfree(arg->weight_set[j].weights);
				kfree(arg->weight_set);
			}
			kfree(arg->ids);
		}
		kfree(arg_map->args);
	}
	kfree(arg_map);
}
265 
/* rbtree helpers for crush_choose_arg_map, keyed by choose_args_index */
DEFINE_RB_FUNCS(choose_arg_map, struct crush_choose_arg_map, choose_args_index,
		node);
268 
269 void clear_choose_args(struct crush_map *c)
270 {
271 	while (!RB_EMPTY_ROOT(&c->choose_args)) {
272 		struct crush_choose_arg_map *arg_map =
273 		    rb_entry(rb_first(&c->choose_args),
274 			     struct crush_choose_arg_map, node);
275 
276 		erase_choose_arg_map(&c->choose_args, arg_map);
277 		free_choose_arg_map(arg_map);
278 	}
279 }
280 
/*
 * Decode a u32 count followed by that many u32s into a freshly
 * allocated array.  On success returns the array (NULL when the count
 * is 0) and sets *plen; on failure returns ERR_PTR(-EINVAL/-ENOMEM).
 */
static u32 *decode_array_32_alloc(void **p, void *end, u32 *plen)
{
	u32 *a = NULL;
	u32 len;
	int ret;

	ceph_decode_32_safe(p, end, len, e_inval);
	if (len) {
		u32 i;

		a = kmalloc_array(len, sizeof(u32), GFP_NOIO);
		if (!a) {
			ret = -ENOMEM;
			goto fail;
		}

		ceph_decode_need(p, end, len * sizeof(u32), e_inval);
		for (i = 0; i < len; i++)
			a[i] = ceph_decode_32(p);
	}

	*plen = len;
	return a;

e_inval:
	ret = -EINVAL;
fail:
	kfree(a);	/* kfree(NULL) is a no-op */
	return ERR_PTR(ret);
}
311 
312 /*
313  * Assumes @arg is zero-initialized.
314  */
315 static int decode_choose_arg(void **p, void *end, struct crush_choose_arg *arg)
316 {
317 	int ret;
318 
319 	ceph_decode_32_safe(p, end, arg->weight_set_size, e_inval);
320 	if (arg->weight_set_size) {
321 		u32 i;
322 
323 		arg->weight_set = kmalloc_objs(*arg->weight_set,
324 					       arg->weight_set_size, GFP_NOIO);
325 		if (!arg->weight_set)
326 			return -ENOMEM;
327 
328 		for (i = 0; i < arg->weight_set_size; i++) {
329 			struct crush_weight_set *w = &arg->weight_set[i];
330 
331 			w->weights = decode_array_32_alloc(p, end, &w->size);
332 			if (IS_ERR(w->weights)) {
333 				ret = PTR_ERR(w->weights);
334 				w->weights = NULL;
335 				return ret;
336 			}
337 		}
338 	}
339 
340 	arg->ids = decode_array_32_alloc(p, end, &arg->ids_size);
341 	if (IS_ERR(arg->ids)) {
342 		ret = PTR_ERR(arg->ids);
343 		arg->ids = NULL;
344 		return ret;
345 	}
346 
347 	return 0;
348 
349 e_inval:
350 	return -EINVAL;
351 }
352 
353 static int decode_choose_args(void **p, void *end, struct crush_map *c)
354 {
355 	struct crush_choose_arg_map *arg_map = NULL;
356 	u32 num_choose_arg_maps, num_buckets;
357 	int ret;
358 
359 	ceph_decode_32_safe(p, end, num_choose_arg_maps, e_inval);
360 	while (num_choose_arg_maps--) {
361 		arg_map = alloc_choose_arg_map();
362 		if (!arg_map) {
363 			ret = -ENOMEM;
364 			goto fail;
365 		}
366 
367 		ceph_decode_64_safe(p, end, arg_map->choose_args_index,
368 				    e_inval);
369 		arg_map->size = c->max_buckets;
370 		arg_map->args = kzalloc_objs(*arg_map->args, arg_map->size,
371 					     GFP_NOIO);
372 		if (!arg_map->args) {
373 			ret = -ENOMEM;
374 			goto fail;
375 		}
376 
377 		ceph_decode_32_safe(p, end, num_buckets, e_inval);
378 		while (num_buckets--) {
379 			struct crush_choose_arg *arg;
380 			u32 bucket_index;
381 
382 			ceph_decode_32_safe(p, end, bucket_index, e_inval);
383 			if (bucket_index >= arg_map->size)
384 				goto e_inval;
385 
386 			arg = &arg_map->args[bucket_index];
387 			ret = decode_choose_arg(p, end, arg);
388 			if (ret)
389 				goto fail;
390 
391 			if (arg->ids_size &&
392 			    arg->ids_size != c->buckets[bucket_index]->size)
393 				goto e_inval;
394 		}
395 
396 		insert_choose_arg_map(&c->choose_args, arg_map);
397 	}
398 
399 	return 0;
400 
401 e_inval:
402 	ret = -EINVAL;
403 fail:
404 	free_choose_arg_map(arg_map);
405 	return ret;
406 }
407 
/*
 * Precompute c->working_size: the number of bytes one crush_work
 * scratch area needs to cover every bucket.  Done once at decode time
 * so that each workspace allocation is a single kvmalloc() (see
 * alloc_workspace()).
 */
static void crush_finalize(struct crush_map *c)
{
	__s32 b;

	/* Space for the array of pointers to per-bucket workspace */
	c->working_size = sizeof(struct crush_work) +
	    c->max_buckets * sizeof(struct crush_work_bucket *);

	for (b = 0; b < c->max_buckets; b++) {
		if (!c->buckets[b])
			continue;

		switch (c->buckets[b]->alg) {
		default:
			/*
			 * The base case, permutation variables and
			 * the pointer to the permutation array.
			 */
			c->working_size += sizeof(struct crush_work_bucket);
			break;
		}
		/* Every bucket has a permutation array. */
		c->working_size += c->buckets[b]->size * sizeof(__u32);
	}
}
433 
/*
 * Decode a crush map from [*pbyval, end).  Returns the allocated map
 * or an ERR_PTR(); on failure everything allocated so far is released
 * via crush_destroy().  The trailing tunable sections are optional:
 * running out of data at a ceph_decode_need(..., done) checkpoint
 * yields a valid map with default tunables.
 */
static struct crush_map *crush_decode(void *pbyval, void *end)
{
	struct crush_map *c;
	int err;
	int i, j;
	void **p = &pbyval;
	void *start = pbyval;
	u32 magic;

	dout("crush_decode %p to %p len %d\n", *p, end, (int)(end - *p));

	c = kzalloc_obj(*c, GFP_NOFS);
	if (c == NULL)
		return ERR_PTR(-ENOMEM);

	c->type_names = RB_ROOT;
	c->names = RB_ROOT;
	c->choose_args = RB_ROOT;

        /* set tunables to default values */
        c->choose_local_tries = 2;
        c->choose_local_fallback_tries = 5;
        c->choose_total_tries = 19;
	c->chooseleaf_descend_once = 0;

	/* header: magic, max_buckets, max_rules, max_devices */
	ceph_decode_need(p, end, 4*sizeof(u32), bad);
	magic = ceph_decode_32(p);
	if (magic != CRUSH_MAGIC) {
		pr_err("crush_decode magic %x != current %x\n",
		       (unsigned int)magic, (unsigned int)CRUSH_MAGIC);
		goto bad;
	}
	c->max_buckets = ceph_decode_32(p);
	c->max_rules = ceph_decode_32(p);
	c->max_devices = ceph_decode_32(p);

	c->buckets = kzalloc_objs(*c->buckets, c->max_buckets, GFP_NOFS);
	if (c->buckets == NULL)
		goto badmem;
	c->rules = kzalloc_objs(*c->rules, c->max_rules, GFP_NOFS);
	if (c->rules == NULL)
		goto badmem;

	/* buckets */
	for (i = 0; i < c->max_buckets; i++) {
		int size = 0;
		u32 alg;
		struct crush_bucket *b;

		ceph_decode_32_safe(p, end, alg, bad);
		/* alg 0 encodes "no bucket in this slot" */
		if (alg == 0) {
			c->buckets[i] = NULL;
			continue;
		}
		dout("crush_decode bucket %d off %x %p to %p\n",
		     i, (int)(*p-start), *p, end);

		/* pick the concrete bucket struct size for this alg */
		switch (alg) {
		case CRUSH_BUCKET_UNIFORM:
			size = sizeof(struct crush_bucket_uniform);
			break;
		case CRUSH_BUCKET_LIST:
			size = sizeof(struct crush_bucket_list);
			break;
		case CRUSH_BUCKET_TREE:
			size = sizeof(struct crush_bucket_tree);
			break;
		case CRUSH_BUCKET_STRAW:
			size = sizeof(struct crush_bucket_straw);
			break;
		case CRUSH_BUCKET_STRAW2:
			size = sizeof(struct crush_bucket_straw2);
			break;
		default:
			goto bad;
		}
		BUG_ON(size == 0);
		b = c->buckets[i] = kzalloc(size, GFP_NOFS);
		if (b == NULL)
			goto badmem;

		/* common bucket header */
		ceph_decode_need(p, end, 4*sizeof(u32), bad);
		b->id = ceph_decode_32(p);
		b->type = ceph_decode_16(p);
		b->alg = ceph_decode_8(p);
		b->hash = ceph_decode_8(p);
		b->weight = ceph_decode_32(p);
		b->size = ceph_decode_32(p);

		dout("crush_decode bucket size %d off %x %p to %p\n",
		     b->size, (int)(*p-start), *p, end);

		b->items = kzalloc_objs(__s32, b->size, GFP_NOFS);
		if (b->items == NULL)
			goto badmem;

		ceph_decode_need(p, end, b->size*sizeof(u32), bad);
		for (j = 0; j < b->size; j++)
			b->items[j] = ceph_decode_32(p);

		/* alg-specific payload */
		switch (b->alg) {
		case CRUSH_BUCKET_UNIFORM:
			err = crush_decode_uniform_bucket(p, end,
				  (struct crush_bucket_uniform *)b);
			if (err < 0)
				goto fail;
			break;
		case CRUSH_BUCKET_LIST:
			err = crush_decode_list_bucket(p, end,
			       (struct crush_bucket_list *)b);
			if (err < 0)
				goto fail;
			break;
		case CRUSH_BUCKET_TREE:
			err = crush_decode_tree_bucket(p, end,
				(struct crush_bucket_tree *)b);
			if (err < 0)
				goto fail;
			break;
		case CRUSH_BUCKET_STRAW:
			err = crush_decode_straw_bucket(p, end,
				(struct crush_bucket_straw *)b);
			if (err < 0)
				goto fail;
			break;
		case CRUSH_BUCKET_STRAW2:
			err = crush_decode_straw2_bucket(p, end,
				(struct crush_bucket_straw2 *)b);
			if (err < 0)
				goto fail;
			break;
		}
	}

	/* rules */
	dout("rule vec is %p\n", c->rules);
	for (i = 0; i < c->max_rules; i++) {
		u32 yes;
		struct crush_rule *r;

		ceph_decode_32_safe(p, end, yes, bad);
		if (!yes) {
			dout("crush_decode NO rule %d off %x %p to %p\n",
			     i, (int)(*p-start), *p, end);
			c->rules[i] = NULL;
			continue;
		}

		dout("crush_decode rule %d off %x %p to %p\n",
		     i, (int)(*p-start), *p, end);

		/* len */
		ceph_decode_32_safe(p, end, yes, bad);
#if BITS_PER_LONG == 32
		/* guard the flex-array size computation against overflow */
		if (yes > (ULONG_MAX - sizeof(*r))
			  / sizeof(struct crush_rule_step))
			goto bad;
#endif
		r = kmalloc_flex(*r, steps, yes, GFP_NOFS);
		if (r == NULL)
			goto badmem;
		dout(" rule %d is at %p\n", i, r);
		c->rules[i] = r;
		r->len = yes;
		ceph_decode_copy_safe(p, end, &r->mask, 4, bad); /* 4 u8's */
		ceph_decode_need(p, end, r->len*3*sizeof(u32), bad);
		for (j = 0; j < r->len; j++) {
			r->steps[j].op = ceph_decode_32(p);
			r->steps[j].arg1 = ceph_decode_32(p);
			r->steps[j].arg2 = ceph_decode_32(p);
		}
	}

	err = decode_crush_names(p, end, &c->type_names);
	if (err)
		goto fail;

	err = decode_crush_names(p, end, &c->names);
	if (err)
		goto fail;

	ceph_decode_skip_map(p, end, 32, string, bad); /* rule_name_map */

        /* tunables -- everything below is optional (note the "done" labels) */
        ceph_decode_need(p, end, 3*sizeof(u32), done);
        c->choose_local_tries = ceph_decode_32(p);
        c->choose_local_fallback_tries =  ceph_decode_32(p);
        c->choose_total_tries = ceph_decode_32(p);
        dout("crush decode tunable choose_local_tries = %d\n",
             c->choose_local_tries);
        dout("crush decode tunable choose_local_fallback_tries = %d\n",
             c->choose_local_fallback_tries);
        dout("crush decode tunable choose_total_tries = %d\n",
             c->choose_total_tries);

	ceph_decode_need(p, end, sizeof(u32), done);
	c->chooseleaf_descend_once = ceph_decode_32(p);
	dout("crush decode tunable chooseleaf_descend_once = %d\n",
	     c->chooseleaf_descend_once);

	ceph_decode_need(p, end, sizeof(u8), done);
	c->chooseleaf_vary_r = ceph_decode_8(p);
	dout("crush decode tunable chooseleaf_vary_r = %d\n",
	     c->chooseleaf_vary_r);

	/* skip straw_calc_version, allowed_bucket_algs */
	ceph_decode_need(p, end, sizeof(u8) + sizeof(u32), done);
	*p += sizeof(u8) + sizeof(u32);

	ceph_decode_need(p, end, sizeof(u8), done);
	c->chooseleaf_stable = ceph_decode_8(p);
	dout("crush decode tunable chooseleaf_stable = %d\n",
	     c->chooseleaf_stable);

	if (*p != end) {
		/* class_map */
		ceph_decode_skip_map(p, end, 32, 32, bad);
		/* class_name */
		ceph_decode_skip_map(p, end, 32, string, bad);
		/* class_bucket */
		ceph_decode_skip_map_of_map(p, end, 32, 32, 32, bad);
	}

	if (*p != end) {
		err = decode_choose_args(p, end, c);
		if (err)
			goto fail;
	}

done:
	crush_finalize(c);
	dout("crush_decode success\n");
	return c;

badmem:
	err = -ENOMEM;
fail:
	dout("crush_decode fail %d\n", err);
	crush_destroy(c);
	return ERR_PTR(err);

bad:
	err = -EINVAL;
	goto fail;
}
679 
680 int ceph_pg_compare(const struct ceph_pg *lhs, const struct ceph_pg *rhs)
681 {
682 	if (lhs->pool < rhs->pool)
683 		return -1;
684 	if (lhs->pool > rhs->pool)
685 		return 1;
686 	if (lhs->seed < rhs->seed)
687 		return -1;
688 	if (lhs->seed > rhs->seed)
689 		return 1;
690 
691 	return 0;
692 }
693 
694 int ceph_spg_compare(const struct ceph_spg *lhs, const struct ceph_spg *rhs)
695 {
696 	int ret;
697 
698 	ret = ceph_pg_compare(&lhs->pgid, &rhs->pgid);
699 	if (ret)
700 		return ret;
701 
702 	if (lhs->shard < rhs->shard)
703 		return -1;
704 	if (lhs->shard > rhs->shard)
705 		return 1;
706 
707 	return 0;
708 }
709 
710 static struct ceph_pg_mapping *alloc_pg_mapping(size_t payload_len)
711 {
712 	struct ceph_pg_mapping *pg;
713 
714 	pg = kmalloc(sizeof(*pg) + payload_len, GFP_NOIO);
715 	if (!pg)
716 		return NULL;
717 
718 	RB_CLEAR_NODE(&pg->node);
719 	return pg;
720 }
721 
722 static void free_pg_mapping(struct ceph_pg_mapping *pg)
723 {
724 	WARN_ON(!RB_EMPTY_NODE(&pg->node));
725 
726 	kfree(pg);
727 }
728 
/*
 * rbtree of pg_mapping for handling pg_temp (explicit mapping of pgid
 * to a set of osds) and primary_temp (explicit primary setting).
 * Keyed by struct ceph_pg via ceph_pg_compare().
 */
DEFINE_RB_FUNCS2(pg_mapping, struct ceph_pg_mapping, pgid, ceph_pg_compare,
		 RB_BYPTR, const struct ceph_pg *, node)
735 
/*
 * rbtree of pg pool info, keyed by pool id
 */
DEFINE_RB_FUNCS(pg_pool, struct ceph_pg_pool_info, id, node)
740 
/* Return the pool info for @id, or NULL if no such pool exists. */
struct ceph_pg_pool_info *ceph_pg_pool_by_id(struct ceph_osdmap *map, u64 id)
{
	return lookup_pg_pool(&map->pg_pools, id);
}
745 
746 const char *ceph_pg_pool_name_by_id(struct ceph_osdmap *map, u64 id)
747 {
748 	struct ceph_pg_pool_info *pi;
749 
750 	if (id == CEPH_NOPOOL)
751 		return NULL;
752 
753 	if (WARN_ON_ONCE(id > (u64) INT_MAX))
754 		return NULL;
755 
756 	pi = lookup_pg_pool(&map->pg_pools, id);
757 	return pi ? pi->name : NULL;
758 }
759 EXPORT_SYMBOL(ceph_pg_pool_name_by_id);
760 
761 int ceph_pg_poolid_by_name(struct ceph_osdmap *map, const char *name)
762 {
763 	struct rb_node *rbp;
764 
765 	for (rbp = rb_first(&map->pg_pools); rbp; rbp = rb_next(rbp)) {
766 		struct ceph_pg_pool_info *pi =
767 			rb_entry(rbp, struct ceph_pg_pool_info, node);
768 		if (pi->name && strcmp(pi->name, name) == 0)
769 			return pi->id;
770 	}
771 	return -ENOENT;
772 }
773 EXPORT_SYMBOL(ceph_pg_poolid_by_name);
774 
775 u64 ceph_pg_pool_flags(struct ceph_osdmap *map, u64 id)
776 {
777 	struct ceph_pg_pool_info *pi;
778 
779 	pi = lookup_pg_pool(&map->pg_pools, id);
780 	return pi ? pi->flags : 0;
781 }
782 EXPORT_SYMBOL(ceph_pg_pool_flags);
783 
/* Unlink a pool from the tree and free it, including its name string. */
static void __remove_pg_pool(struct rb_root *root, struct ceph_pg_pool_info *pi)
{
	erase_pg_pool(root, pi);
	kfree(pi->name);
	kfree(pi);
}
790 
/*
 * Decode one versioned pg_pool_t into @pi.  Only the fields the kernel
 * client cares about are kept; everything else is skipped.  The
 * length-prefixed encoding lets us jump to pool_end regardless of how
 * many trailing fields the encoder emitted.  Returns 0 or -EINVAL.
 */
static int decode_pool(void **p, void *end, struct ceph_pg_pool_info *pi)
{
	u8 ev, cv;
	unsigned len, num;
	void *pool_end;

	ceph_decode_need(p, end, 2 + 4, bad);
	ev = ceph_decode_8(p);  /* encoding version */
	cv = ceph_decode_8(p); /* compat version */
	if (ev < 5) {
		pr_warn("got v %d < 5 cv %d of ceph_pg_pool\n", ev, cv);
		return -EINVAL;
	}
	if (cv > 9) {
		pr_warn("got v %d cv %d > 9 of ceph_pg_pool\n", ev, cv);
		return -EINVAL;
	}
	len = ceph_decode_32(p);	/* length of the encoded pool */
	ceph_decode_need(p, end, len, bad);
	pool_end = *p + len;

	ceph_decode_need(p, end, 4 + 4 + 4, bad);
	pi->type = ceph_decode_8(p);
	pi->size = ceph_decode_8(p);
	pi->crush_ruleset = ceph_decode_8(p);
	pi->object_hash = ceph_decode_8(p);
	pi->pg_num = ceph_decode_32(p);
	pi->pgp_num = ceph_decode_32(p);

	/* lpg*, last_change, snap_seq, snap_epoch */
	ceph_decode_skip_n(p, end, 8 + 4 + 8 + 4, bad);

	/* skip snaps */
	ceph_decode_32_safe(p, end, num, bad);
	while (num--) {
		/* snapid key, pool snap (with versions) */
		ceph_decode_skip_n(p, end, 8 + 2, bad);
		ceph_decode_skip_string(p, end, bad);
	}

	/* removed_snaps */
	ceph_decode_skip_map(p, end, 64, 64, bad);

	ceph_decode_need(p, end, 8 + 8 + 4, bad);
	*p += 8;  /* skip auid */
	pi->flags = ceph_decode_64(p);
	*p += 4;  /* skip crash_replay_interval */

	if (ev >= 7)
		ceph_decode_8_safe(p, end, pi->min_size, bad);
	else
		pi->min_size = pi->size - pi->size / 2;

	if (ev >= 8)
		/* quota_max_* */
		ceph_decode_skip_n(p, end, 8 + 8, bad);

	if (ev >= 9) {
		/* tiers */
		ceph_decode_skip_set(p, end, 64, bad);

		ceph_decode_need(p, end, 8 + 1 + 8 + 8, bad);
		*p += 8;  /* skip tier_of */
		*p += 1;  /* skip cache_mode */
		pi->read_tier = ceph_decode_64(p);
		pi->write_tier = ceph_decode_64(p);
	} else {
		/* -1 means no tier redirect */
		pi->read_tier = -1;
		pi->write_tier = -1;
	}

	if (ev >= 10)
		/* properties */
		ceph_decode_skip_map(p, end, string, string, bad);

	if (ev >= 11) {
		/* hit_set_params (with versions) */
		ceph_decode_skip_n(p, end, 2, bad);
		ceph_decode_skip_string(p, end, bad);

		/* hit_set_period, hit_set_count */
		ceph_decode_skip_n(p, end, 4 + 4, bad);
	}

	if (ev >= 12)
		/* stripe_width */
		ceph_decode_skip_32(p, end, bad);

	if (ev >= 13)
		/* target_max_*, cache_target_*, cache_min_* */
		ceph_decode_skip_n(p, end, 16 + 8 + 8, bad);

	if (ev >= 14)
		/* erasure_code_profile */
		ceph_decode_skip_string(p, end, bad);

	/*
	 * last_force_op_resend_preluminous, will be overridden if the
	 * map was encoded with RESEND_ON_SPLIT
	 */
	if (ev >= 15)
		ceph_decode_32_safe(p, end, pi->last_force_request_resend, bad);
	else
		pi->last_force_request_resend = 0;

	if (ev >= 16)
		/* min_read_recency_for_promote */
		ceph_decode_skip_32(p, end, bad);

	if (ev >= 17)
		/* expected_num_objects */
		ceph_decode_skip_64(p, end, bad);

	if (ev >= 19)
		/* cache_target_dirty_high_ratio_micro */
		ceph_decode_skip_32(p, end, bad);

	if (ev >= 20)
		/* min_write_recency_for_promote */
		ceph_decode_skip_32(p, end, bad);

	if (ev >= 21)
		/* use_gmt_hitset */
		ceph_decode_skip_8(p, end, bad);

	if (ev >= 22)
		/* fast_read */
		ceph_decode_skip_8(p, end, bad);

	if (ev >= 23)
		/* hit_set_grade_decay_rate, hit_set_search_last_n */
		ceph_decode_skip_n(p, end, 4 + 4, bad);

	if (ev >= 24) {
		/* opts (with versions) */
		ceph_decode_skip_n(p, end, 2, bad);
		ceph_decode_skip_string(p, end, bad);
	}

	/* last_force_op_resend -- overrides the preluminous value above */
	if (ev >= 25)
		ceph_decode_32_safe(p, end, pi->last_force_request_resend, bad);

	/* ignore the rest */

	*p = pool_end;
	calc_pg_masks(pi);
	return 0;

bad:
	return -EINVAL;
}
942 
/*
 * Decode a map<pool id, name> and update the names of pools that are
 * already present in map->pg_pools; names for unknown pools are
 * silently skipped.  Returns 0, -ENOMEM or -EINVAL.
 */
static int decode_pool_names(void **p, void *end, struct ceph_osdmap *map)
{
	struct ceph_pg_pool_info *pi;
	u32 num, len;
	u64 pool;

	ceph_decode_32_safe(p, end, num, bad);
	dout(" %d pool names\n", num);
	while (num--) {
		ceph_decode_64_safe(p, end, pool, bad);
		ceph_decode_32_safe(p, end, len, bad);
		dout("  pool %llu len %d\n", pool, len);
		ceph_decode_need(p, end, len, bad);	/* name bytes */
		pi = lookup_pg_pool(&map->pg_pools, pool);
		if (pi) {
			char *name = kstrndup(*p, len, GFP_NOFS);

			if (!name)
				return -ENOMEM;
			/* replace any previous name */
			kfree(pi->name);
			pi->name = name;
			dout("  name is %s\n", pi->name);
		}
		*p += len;
	}
	return 0;

bad:
	return -EINVAL;
}
973 
974 /*
975  * CRUSH workspaces
976  *
977  * workspace_manager framework borrowed from fs/btrfs/compression.c.
978  * Two simplifications: there is only one type of workspace and there
979  * is always at least one workspace.
980  */
/*
 * Allocate and initialize one CRUSH workspace, sized for mappings of
 * up to CEPH_PG_MAX_SIZE OSDs.  Returns NULL on allocation failure.
 */
static struct crush_work *alloc_workspace(const struct crush_map *c)
{
	struct crush_work *work;
	size_t work_size;

	/* working_size is computed at decode time by crush_finalize() */
	WARN_ON(!c->working_size);
	work_size = crush_work_size(c, CEPH_PG_MAX_SIZE);
	dout("%s work_size %zu bytes\n", __func__, work_size);

	work = kvmalloc(work_size, GFP_NOIO);
	if (!work)
		return NULL;

	INIT_LIST_HEAD(&work->item);
	crush_init_workspace(c, work);
	return work;
}
998 
/* Free a workspace that is not on any idle list. */
static void free_workspace(struct crush_work *work)
{
	WARN_ON(!list_empty(&work->item));
	kvfree(work);
}
1004 
1005 static void init_workspace_manager(struct workspace_manager *wsm)
1006 {
1007 	INIT_LIST_HEAD(&wsm->idle_ws);
1008 	spin_lock_init(&wsm->ws_lock);
1009 	atomic_set(&wsm->total_ws, 0);
1010 	wsm->free_ws = 0;
1011 	init_waitqueue_head(&wsm->ws_wait);
1012 }
1013 
/*
 * Hand the manager its first workspace.  The manager is expected to be
 * freshly initialized (empty idle list).
 */
static void add_initial_workspace(struct workspace_manager *wsm,
				  struct crush_work *work)
{
	WARN_ON(!list_empty(&wsm->idle_ws));

	list_add(&work->item, &wsm->idle_ws);
	atomic_set(&wsm->total_ws, 1);
	wsm->free_ws = 1;
}
1023 
/* Free every idle workspace and reset the counters. */
static void cleanup_workspace_manager(struct workspace_manager *wsm)
{
	struct crush_work *work;

	while (!list_empty(&wsm->idle_ws)) {
		work = list_first_entry(&wsm->idle_ws, struct crush_work,
					item);
		list_del_init(&work->item);
		free_workspace(work);
	}
	atomic_set(&wsm->total_ws, 0);
	wsm->free_ws = 0;
}
1037 
1038 /*
1039  * Finds an available workspace or allocates a new one.  If it's not
1040  * possible to allocate a new one, waits until there is one.
1041  */
static struct crush_work *get_workspace(struct workspace_manager *wsm,
					const struct crush_map *c)
{
	struct crush_work *work;
	int cpus = num_online_cpus();

again:
	spin_lock(&wsm->ws_lock);
	if (!list_empty(&wsm->idle_ws)) {
		/* fast path: reuse an idle workspace */
		work = list_first_entry(&wsm->idle_ws, struct crush_work,
					item);
		list_del_init(&work->item);
		wsm->free_ws--;
		spin_unlock(&wsm->ws_lock);
		return work;

	}
	if (atomic_read(&wsm->total_ws) > cpus) {
		DEFINE_WAIT(wait);

		/*
		 * More workspaces than CPUs already exist -- sleep
		 * until one is put back instead of allocating more.
		 */
		spin_unlock(&wsm->ws_lock);
		prepare_to_wait(&wsm->ws_wait, &wait, TASK_UNINTERRUPTIBLE);
		if (atomic_read(&wsm->total_ws) > cpus && !wsm->free_ws)
			schedule();
		finish_wait(&wsm->ws_wait, &wait);
		goto again;
	}
	/* reserve a slot before dropping the lock, then allocate */
	atomic_inc(&wsm->total_ws);
	spin_unlock(&wsm->ws_lock);

	work = alloc_workspace(c);
	if (!work) {
		atomic_dec(&wsm->total_ws);
		wake_up(&wsm->ws_wait);

		/*
		 * Do not return the error but go back to waiting.  We
		 * have the initial workspace and the CRUSH computation
		 * time is bounded so we will get it eventually.
		 */
		WARN_ON(atomic_read(&wsm->total_ws) < 1);
		goto again;
	}
	return work;
}
1087 
1088 /*
1089  * Puts a workspace back on the list or frees it if we have enough
1090  * idle ones sitting around.
1091  */
static void put_workspace(struct workspace_manager *wsm,
			  struct crush_work *work)
{
	spin_lock(&wsm->ws_lock);
	if (wsm->free_ws <= num_online_cpus()) {
		/* keep it around for reuse */
		list_add(&work->item, &wsm->idle_ws);
		wsm->free_ws++;
		spin_unlock(&wsm->ws_lock);
		goto wake;
	}
	spin_unlock(&wsm->ws_lock);

	/* enough idle workspaces already -- release this one */
	free_workspace(work);
	atomic_dec(&wsm->total_ws);
wake:
	if (wq_has_sleeper(&wsm->ws_wait))
		wake_up(&wsm->ws_wait);
}
1110 
1111 /*
1112  * osd map
1113  */
/*
 * Allocate an empty osdmap: no pools, no osds, no temp/upmap mappings.
 * Returns NULL on allocation failure.
 */
struct ceph_osdmap *ceph_osdmap_alloc(void)
{
	struct ceph_osdmap *map;

	map = kzalloc_obj(*map, GFP_NOIO);
	if (!map)
		return NULL;

	map->pg_pools = RB_ROOT;
	map->pool_max = -1;	/* no pools yet */
	map->pg_temp = RB_ROOT;
	map->primary_temp = RB_ROOT;
	map->pg_upmap = RB_ROOT;
	map->pg_upmap_items = RB_ROOT;

	init_workspace_manager(&map->crush_wsm);

	return map;
}
1133 
/*
 * Free an osdmap and everything hanging off it: the crush map, the
 * CRUSH workspaces, all temp/upmap mapping trees, the pool infos and
 * the per-osd arrays.
 */
void ceph_osdmap_destroy(struct ceph_osdmap *map)
{
	dout("osdmap_destroy %p\n", map);

	if (map->crush)
		crush_destroy(map->crush);
	cleanup_workspace_manager(&map->crush_wsm);

	while (!RB_EMPTY_ROOT(&map->pg_temp)) {
		struct ceph_pg_mapping *pg =
			rb_entry(rb_first(&map->pg_temp),
				 struct ceph_pg_mapping, node);
		erase_pg_mapping(&map->pg_temp, pg);
		free_pg_mapping(pg);
	}
	while (!RB_EMPTY_ROOT(&map->primary_temp)) {
		struct ceph_pg_mapping *pg =
			rb_entry(rb_first(&map->primary_temp),
				 struct ceph_pg_mapping, node);
		erase_pg_mapping(&map->primary_temp, pg);
		free_pg_mapping(pg);
	}
	/* the upmap trees use bare rb_erase()/kfree(), not the helpers */
	while (!RB_EMPTY_ROOT(&map->pg_upmap)) {
		struct ceph_pg_mapping *pg =
			rb_entry(rb_first(&map->pg_upmap),
				 struct ceph_pg_mapping, node);
		rb_erase(&pg->node, &map->pg_upmap);
		kfree(pg);
	}
	while (!RB_EMPTY_ROOT(&map->pg_upmap_items)) {
		struct ceph_pg_mapping *pg =
			rb_entry(rb_first(&map->pg_upmap_items),
				 struct ceph_pg_mapping, node);
		rb_erase(&pg->node, &map->pg_upmap_items);
		kfree(pg);
	}
	while (!RB_EMPTY_ROOT(&map->pg_pools)) {
		struct ceph_pg_pool_info *pi =
			rb_entry(rb_first(&map->pg_pools),
				 struct ceph_pg_pool_info, node);
		__remove_pg_pool(&map->pg_pools, pi);
	}
	/* per-osd arrays are kvmalloc'ed (see osdmap_set_max_osd()) */
	kvfree(map->osd_state);
	kvfree(map->osd_weight);
	kvfree(map->osd_addr);
	kvfree(map->osd_primary_affinity);
	kfree(map);
}
1182 
1183 /*
1184  * Adjust max_osd value, (re)allocate arrays.
1185  *
1186  * The new elements are properly initialized.
1187  */
1188 static int osdmap_set_max_osd(struct ceph_osdmap *map, u32 max)
1189 {
1190 	u32 *state;
1191 	u32 *weight;
1192 	struct ceph_entity_addr *addr;
1193 	u32 to_copy;
1194 	int i;
1195 
1196 	dout("%s old %u new %u\n", __func__, map->max_osd, max);
1197 	if (max == map->max_osd)
1198 		return 0;
1199 
1200 	state = kvmalloc(array_size(max, sizeof(*state)), GFP_NOFS);
1201 	weight = kvmalloc(array_size(max, sizeof(*weight)), GFP_NOFS);
1202 	addr = kvmalloc(array_size(max, sizeof(*addr)), GFP_NOFS);
1203 	if (!state || !weight || !addr) {
1204 		kvfree(state);
1205 		kvfree(weight);
1206 		kvfree(addr);
1207 		return -ENOMEM;
1208 	}
1209 
1210 	to_copy = min(map->max_osd, max);
1211 	if (map->osd_state) {
1212 		memcpy(state, map->osd_state, to_copy * sizeof(*state));
1213 		memcpy(weight, map->osd_weight, to_copy * sizeof(*weight));
1214 		memcpy(addr, map->osd_addr, to_copy * sizeof(*addr));
1215 		kvfree(map->osd_state);
1216 		kvfree(map->osd_weight);
1217 		kvfree(map->osd_addr);
1218 	}
1219 
1220 	map->osd_state = state;
1221 	map->osd_weight = weight;
1222 	map->osd_addr = addr;
1223 	for (i = map->max_osd; i < max; i++) {
1224 		map->osd_state[i] = 0;
1225 		map->osd_weight[i] = CEPH_OSD_OUT;
1226 		memset(map->osd_addr + i, 0, sizeof(*map->osd_addr));
1227 	}
1228 
1229 	if (map->osd_primary_affinity) {
1230 		u32 *affinity;
1231 
1232 		affinity = kvmalloc(array_size(max, sizeof(*affinity)),
1233 					 GFP_NOFS);
1234 		if (!affinity)
1235 			return -ENOMEM;
1236 
1237 		memcpy(affinity, map->osd_primary_affinity,
1238 		       to_copy * sizeof(*affinity));
1239 		kvfree(map->osd_primary_affinity);
1240 
1241 		map->osd_primary_affinity = affinity;
1242 		for (i = map->max_osd; i < max; i++)
1243 			map->osd_primary_affinity[i] =
1244 			    CEPH_OSD_DEFAULT_PRIMARY_AFFINITY;
1245 	}
1246 
1247 	map->max_osd = max;
1248 
1249 	return 0;
1250 }
1251 
/*
 * Install a freshly decoded CRUSH map, or propagate a crush_decode()
 * error passed in via ERR_PTR.  Takes ownership of @crush: it is
 * destroyed if a workspace cannot be allocated.  The previous CRUSH
 * map and its workspaces are torn down before the new ones go in.
 */
static int osdmap_set_crush(struct ceph_osdmap *map, struct crush_map *crush)
{
	struct crush_work *work;

	if (IS_ERR(crush))
		return PTR_ERR(crush);

	work = alloc_workspace(crush);
	if (!work) {
		crush_destroy(crush);
		return -ENOMEM;
	}

	if (map->crush)
		crush_destroy(map->crush);
	cleanup_workspace_manager(&map->crush_wsm);
	map->crush = crush;
	add_initial_workspace(&map->crush_wsm, work);
	return 0;
}
1272 
1273 #define OSDMAP_WRAPPER_COMPAT_VER	7
1274 #define OSDMAP_CLIENT_DATA_COMPAT_VER	1
1275 
1276 /*
1277  * Return 0 or error.  On success, *v is set to 0 for old (v6) osdmaps,
1278  * to struct_v of the client_data section for new (v7 and above)
1279  * osdmaps.
1280  */
1281 static int get_osdmap_client_data_v(void **p, void *end,
1282 				    const char *prefix, u8 *v)
1283 {
1284 	u8 struct_v;
1285 
1286 	ceph_decode_8_safe(p, end, struct_v, e_inval);
1287 	if (struct_v >= 7) {
1288 		u8 struct_compat;
1289 
1290 		ceph_decode_8_safe(p, end, struct_compat, e_inval);
1291 		if (struct_compat > OSDMAP_WRAPPER_COMPAT_VER) {
1292 			pr_warn("got v %d cv %d > %d of %s ceph_osdmap\n",
1293 				struct_v, struct_compat,
1294 				OSDMAP_WRAPPER_COMPAT_VER, prefix);
1295 			return -EINVAL;
1296 		}
1297 		*p += 4; /* ignore wrapper struct_len */
1298 
1299 		ceph_decode_8_safe(p, end, struct_v, e_inval);
1300 		ceph_decode_8_safe(p, end, struct_compat, e_inval);
1301 		if (struct_compat > OSDMAP_CLIENT_DATA_COMPAT_VER) {
1302 			pr_warn("got v %d cv %d > %d of %s ceph_osdmap client data\n",
1303 				struct_v, struct_compat,
1304 				OSDMAP_CLIENT_DATA_COMPAT_VER, prefix);
1305 			return -EINVAL;
1306 		}
1307 		*p += 4; /* ignore client data struct_len */
1308 	} else {
1309 		u16 version;
1310 
1311 		*p -= 1;
1312 		ceph_decode_16_safe(p, end, version, e_inval);
1313 		if (version < 6) {
1314 			pr_warn("got v %d < 6 of %s ceph_osdmap\n",
1315 				version, prefix);
1316 			return -EINVAL;
1317 		}
1318 
1319 		/* old osdmap encoding */
1320 		struct_v = 0;
1321 	}
1322 
1323 	*v = struct_v;
1324 	return 0;
1325 
1326 e_inval:
1327 	return -EINVAL;
1328 }
1329 
/*
 * Decode a map<pool_id, pg_pool_t>.  For a full map, or for a pool
 * not yet in the tree, a new ceph_pg_pool_info is allocated and
 * inserted; for an incremental update an existing entry is decoded
 * over in place.
 */
static int __decode_pools(void **p, void *end, struct ceph_osdmap *map,
			  bool incremental)
{
	u32 n;

	ceph_decode_32_safe(p, end, n, e_inval);
	while (n--) {
		struct ceph_pg_pool_info *pi;
		u64 pool;
		int ret;

		ceph_decode_64_safe(p, end, pool, e_inval);

		pi = lookup_pg_pool(&map->pg_pools, pool);
		if (!incremental || !pi) {
			pi = kzalloc_obj(*pi, GFP_NOFS);
			if (!pi)
				return -ENOMEM;

			RB_CLEAR_NODE(&pi->node);
			pi->id = pool;

			/* duplicate pool ids in a full map are corrupt */
			if (!__insert_pg_pool(&map->pg_pools, pi)) {
				kfree(pi);
				return -EEXIST;
			}
		}

		ret = decode_pool(p, end, pi);
		if (ret)
			return ret;
	}

	return 0;

e_inval:
	return -EINVAL;
}
1368 
/* Decode the full-map "pools" section. */
static int decode_pools(void **p, void *end, struct ceph_osdmap *map)
{
	return __decode_pools(p, end, map, false);
}
1373 
/* Decode the incremental "new_pools" section (updates in place). */
static int decode_new_pools(void **p, void *end, struct ceph_osdmap *map)
{
	return __decode_pools(p, end, map, true);
}
1378 
1379 typedef struct ceph_pg_mapping *(*decode_mapping_fn_t)(void **, void *, bool);
1380 
/*
 * Common driver for the pg_temp/primary_temp/pg_upmap(_items)
 * sections: a u32 count followed by (pgid, value) pairs.  Any
 * existing mapping for a decoded pgid is dropped first (expected
 * only for incrementals).  @fn decodes the new value and may return
 * NULL to mean "removal only"; a NULL @fn skips the value entirely
 * (used for the old_* removal-only sections).
 */
static int decode_pg_mapping(void **p, void *end, struct rb_root *mapping_root,
			     decode_mapping_fn_t fn, bool incremental)
{
	u32 n;

	WARN_ON(!incremental && !fn);

	ceph_decode_32_safe(p, end, n, e_inval);
	while (n--) {
		struct ceph_pg_mapping *pg;
		struct ceph_pg pgid;
		int ret;

		ret = ceph_decode_pgid(p, end, &pgid);
		if (ret)
			return ret;

		pg = lookup_pg_mapping(mapping_root, &pgid);
		if (pg) {
			WARN_ON(!incremental);	/* full maps start empty */
			erase_pg_mapping(mapping_root, pg);
			free_pg_mapping(pg);
		}

		if (fn) {
			pg = fn(p, end, incremental);
			if (IS_ERR(pg))
				return PTR_ERR(pg);

			if (pg) {
				pg->pgid = pgid; /* struct */
				insert_pg_mapping(mapping_root, pg);
			}
		}
	}

	return 0;

e_inval:
	return -EINVAL;
}
1422 
/*
 * Decode one pg_temp value: a u32 osd count followed by that many
 * u32 osd ids.  For incrementals an empty list means "remove the
 * mapping", signalled by returning NULL.  The division guard rejects
 * counts whose allocation size would overflow.
 */
static struct ceph_pg_mapping *__decode_pg_temp(void **p, void *end,
						bool incremental)
{
	struct ceph_pg_mapping *pg;
	u32 len, i;

	ceph_decode_32_safe(p, end, len, e_inval);
	if (len == 0 && incremental)
		return NULL;	/* new_pg_temp: [] to remove */
	if ((size_t)len > (SIZE_MAX - sizeof(*pg)) / sizeof(u32))
		return ERR_PTR(-EINVAL);

	ceph_decode_need(p, end, len * sizeof(u32), e_inval);
	pg = alloc_pg_mapping(len * sizeof(u32));
	if (!pg)
		return ERR_PTR(-ENOMEM);

	pg->pg_temp.len = len;
	for (i = 0; i < len; i++)
		pg->pg_temp.osds[i] = ceph_decode_32(p);

	return pg;

e_inval:
	return ERR_PTR(-EINVAL);
}
1449 
/* Decode the full-map "pg_temp" section. */
static int decode_pg_temp(void **p, void *end, struct ceph_osdmap *map)
{
	return decode_pg_mapping(p, end, &map->pg_temp, __decode_pg_temp,
				 false);
}
1455 
/* Decode the incremental "new_pg_temp" section ([] removes an entry). */
static int decode_new_pg_temp(void **p, void *end, struct ceph_osdmap *map)
{
	return decode_pg_mapping(p, end, &map->pg_temp, __decode_pg_temp,
				 true);
}
1461 
/*
 * Decode one primary_temp value: a single u32 osd id.  For
 * incrementals an osd of -1 means "remove the mapping", signalled by
 * returning NULL.
 */
static struct ceph_pg_mapping *__decode_primary_temp(void **p, void *end,
						     bool incremental)
{
	struct ceph_pg_mapping *pg;
	u32 osd;

	ceph_decode_32_safe(p, end, osd, e_inval);
	if (osd == (u32)-1 && incremental)
		return NULL;	/* new_primary_temp: -1 to remove */

	pg = alloc_pg_mapping(0);
	if (!pg)
		return ERR_PTR(-ENOMEM);

	pg->primary_temp.osd = osd;
	return pg;

e_inval:
	return ERR_PTR(-EINVAL);
}
1482 
/* Decode the full-map "primary_temp" section. */
static int decode_primary_temp(void **p, void *end, struct ceph_osdmap *map)
{
	return decode_pg_mapping(p, end, &map->primary_temp,
				 __decode_primary_temp, false);
}
1488 
/* Decode the incremental "new_primary_temp" section (-1 removes). */
static int decode_new_primary_temp(void **p, void *end,
				   struct ceph_osdmap *map)
{
	return decode_pg_mapping(p, end, &map->primary_temp,
				 __decode_primary_temp, true);
}
1495 
1496 u32 ceph_get_primary_affinity(struct ceph_osdmap *map, int osd)
1497 {
1498 	if (!map->osd_primary_affinity)
1499 		return CEPH_OSD_DEFAULT_PRIMARY_AFFINITY;
1500 
1501 	return map->osd_primary_affinity[osd];
1502 }
1503 
/*
 * Set the primary affinity of @osd, lazily allocating the per-osd
 * affinity array (pre-filled with the default value) on first use.
 */
static int set_primary_affinity(struct ceph_osdmap *map, int osd, u32 aff)
{
	if (!map->osd_primary_affinity) {
		int i;

		map->osd_primary_affinity = kvmalloc(
		    array_size(map->max_osd, sizeof(*map->osd_primary_affinity)),
		    GFP_NOFS);
		if (!map->osd_primary_affinity)
			return -ENOMEM;

		for (i = 0; i < map->max_osd; i++)
			map->osd_primary_affinity[i] =
			    CEPH_OSD_DEFAULT_PRIMARY_AFFINITY;
	}

	map->osd_primary_affinity[osd] = aff;

	return 0;
}
1524 
/*
 * Decode the full-map osd_primary_affinity vector.  An empty vector
 * drops the array entirely (all osds back to the default); a
 * non-empty vector must have exactly max_osd entries.
 */
static int decode_primary_affinity(void **p, void *end,
				   struct ceph_osdmap *map)
{
	u32 len, i;

	ceph_decode_32_safe(p, end, len, e_inval);
	if (len == 0) {
		kvfree(map->osd_primary_affinity);
		map->osd_primary_affinity = NULL;
		return 0;
	}
	if (len != map->max_osd)
		goto e_inval;

	ceph_decode_need(p, end, map->max_osd*sizeof(u32), e_inval);

	for (i = 0; i < map->max_osd; i++) {
		int ret;

		ret = set_primary_affinity(map, i, ceph_decode_32(p));
		if (ret)
			return ret;
	}

	return 0;

e_inval:
	return -EINVAL;
}
1554 
/*
 * Decode the incremental new_primary_affinity section: a map of
 * osd -> affinity pairs, applied one at a time.
 */
static int decode_new_primary_affinity(void **p, void *end,
				       struct ceph_osdmap *map)
{
	u32 n;

	ceph_decode_32_safe(p, end, n, e_inval);
	while (n--) {
		u32 osd, aff;
		int ret;

		ceph_decode_32_safe(p, end, osd, e_inval);
		ceph_decode_32_safe(p, end, aff, e_inval);
		if (osd >= map->max_osd)
			goto e_inval;

		ret = set_primary_affinity(map, osd, aff);
		if (ret)
			return ret;

		osdmap_info(map, "osd%d primary-affinity 0x%x\n", osd, aff);
	}

	return 0;

e_inval:
	return -EINVAL;
}
1582 
/*
 * pg_upmap values share the wire format of pg_temp, but an empty osd
 * list never means removal here, hence incremental is forced false.
 */
static struct ceph_pg_mapping *__decode_pg_upmap(void **p, void *end,
						 bool __unused)
{
	return __decode_pg_temp(p, end, false);
}
1588 
/* Decode the full-map "pg_upmap" section. */
static int decode_pg_upmap(void **p, void *end, struct ceph_osdmap *map)
{
	return decode_pg_mapping(p, end, &map->pg_upmap, __decode_pg_upmap,
				 false);
}
1594 
/* Decode the incremental "new_pg_upmap" section. */
static int decode_new_pg_upmap(void **p, void *end, struct ceph_osdmap *map)
{
	return decode_pg_mapping(p, end, &map->pg_upmap, __decode_pg_upmap,
				 true);
}
1600 
/* Decode the incremental "old_pg_upmap" section (removals only). */
static int decode_old_pg_upmap(void **p, void *end, struct ceph_osdmap *map)
{
	return decode_pg_mapping(p, end, &map->pg_upmap, NULL, true);
}
1605 
/*
 * Decode one pg_upmap_items value: a u32 pair count followed by that
 * many (from_osd, to_osd) u32 pairs.  The division guard rejects
 * counts whose allocation size would overflow.
 */
static struct ceph_pg_mapping *__decode_pg_upmap_items(void **p, void *end,
						       bool __unused)
{
	struct ceph_pg_mapping *pg;
	u32 len, i;

	ceph_decode_32_safe(p, end, len, e_inval);
	if ((size_t)len > (SIZE_MAX - sizeof(*pg)) / (2 * sizeof(u32)))
		return ERR_PTR(-EINVAL);

	ceph_decode_need(p, end, 2 * len * sizeof(u32), e_inval);
	pg = alloc_pg_mapping(2 * len * sizeof(u32));
	if (!pg)
		return ERR_PTR(-ENOMEM);

	pg->pg_upmap_items.len = len;
	for (i = 0; i < len; i++) {
		pg->pg_upmap_items.from_to[i][0] = ceph_decode_32(p);
		pg->pg_upmap_items.from_to[i][1] = ceph_decode_32(p);
	}

	return pg;

e_inval:
	return ERR_PTR(-EINVAL);
}
1632 
/* Decode the full-map "pg_upmap_items" section. */
static int decode_pg_upmap_items(void **p, void *end, struct ceph_osdmap *map)
{
	return decode_pg_mapping(p, end, &map->pg_upmap_items,
				 __decode_pg_upmap_items, false);
}
1638 
/* Decode the incremental "new_pg_upmap_items" section. */
static int decode_new_pg_upmap_items(void **p, void *end,
				     struct ceph_osdmap *map)
{
	return decode_pg_mapping(p, end, &map->pg_upmap_items,
				 __decode_pg_upmap_items, true);
}
1645 
/* Decode the incremental "old_pg_upmap_items" section (removals only). */
static int decode_old_pg_upmap_items(void **p, void *end,
				     struct ceph_osdmap *map)
{
	return decode_pg_mapping(p, end, &map->pg_upmap_items, NULL, true);
}
1651 
1652 /*
1653  * decode a full map.
1654  */
1655 static int osdmap_decode(void **p, void *end, bool msgr2,
1656 			 struct ceph_osdmap *map)
1657 {
1658 	u8 struct_v;
1659 	u32 epoch = 0;
1660 	void *start = *p;
1661 	u32 max;
1662 	u32 len, i;
1663 	int err;
1664 
1665 	dout("%s %p to %p len %d\n", __func__, *p, end, (int)(end - *p));
1666 
1667 	err = get_osdmap_client_data_v(p, end, "full", &struct_v);
1668 	if (err)
1669 		goto bad;
1670 
1671 	/* fsid, epoch, created, modified */
1672 	ceph_decode_need(p, end, sizeof(map->fsid) + sizeof(u32) +
1673 			 sizeof(map->created) + sizeof(map->modified), e_inval);
1674 	ceph_decode_copy(p, &map->fsid, sizeof(map->fsid));
1675 	epoch = map->epoch = ceph_decode_32(p);
1676 	ceph_decode_copy(p, &map->created, sizeof(map->created));
1677 	ceph_decode_copy(p, &map->modified, sizeof(map->modified));
1678 
1679 	/* pools */
1680 	err = decode_pools(p, end, map);
1681 	if (err)
1682 		goto bad;
1683 
1684 	/* pool_name */
1685 	err = decode_pool_names(p, end, map);
1686 	if (err)
1687 		goto bad;
1688 
1689 	ceph_decode_32_safe(p, end, map->pool_max, e_inval);
1690 
1691 	ceph_decode_32_safe(p, end, map->flags, e_inval);
1692 
1693 	/* max_osd */
1694 	ceph_decode_32_safe(p, end, max, e_inval);
1695 
1696 	/* (re)alloc osd arrays */
1697 	err = osdmap_set_max_osd(map, max);
1698 	if (err)
1699 		goto bad;
1700 
1701 	/* osd_state, osd_weight, osd_addrs->client_addr */
1702 	ceph_decode_need(p, end, 3*sizeof(u32) +
1703 			 map->max_osd*(struct_v >= 5 ? sizeof(u32) :
1704 						       sizeof(u8)) +
1705 				       sizeof(*map->osd_weight), e_inval);
1706 	if (ceph_decode_32(p) != map->max_osd)
1707 		goto e_inval;
1708 
1709 	if (struct_v >= 5) {
1710 		for (i = 0; i < map->max_osd; i++)
1711 			map->osd_state[i] = ceph_decode_32(p);
1712 	} else {
1713 		for (i = 0; i < map->max_osd; i++)
1714 			map->osd_state[i] = ceph_decode_8(p);
1715 	}
1716 
1717 	if (ceph_decode_32(p) != map->max_osd)
1718 		goto e_inval;
1719 
1720 	for (i = 0; i < map->max_osd; i++)
1721 		map->osd_weight[i] = ceph_decode_32(p);
1722 
1723 	if (ceph_decode_32(p) != map->max_osd)
1724 		goto e_inval;
1725 
1726 	for (i = 0; i < map->max_osd; i++) {
1727 		struct ceph_entity_addr *addr = &map->osd_addr[i];
1728 
1729 		if (struct_v >= 8)
1730 			err = ceph_decode_entity_addrvec(p, end, msgr2, addr);
1731 		else
1732 			err = ceph_decode_entity_addr(p, end, addr);
1733 		if (err)
1734 			goto bad;
1735 
1736 		dout("%s osd%d addr %s\n", __func__, i, ceph_pr_addr(addr));
1737 	}
1738 
1739 	/* pg_temp */
1740 	err = decode_pg_temp(p, end, map);
1741 	if (err)
1742 		goto bad;
1743 
1744 	/* primary_temp */
1745 	if (struct_v >= 1) {
1746 		err = decode_primary_temp(p, end, map);
1747 		if (err)
1748 			goto bad;
1749 	}
1750 
1751 	/* primary_affinity */
1752 	if (struct_v >= 2) {
1753 		err = decode_primary_affinity(p, end, map);
1754 		if (err)
1755 			goto bad;
1756 	} else {
1757 		WARN_ON(map->osd_primary_affinity);
1758 	}
1759 
1760 	/* crush */
1761 	ceph_decode_32_safe(p, end, len, e_inval);
1762 	err = osdmap_set_crush(map, crush_decode(*p, min(*p + len, end)));
1763 	if (err)
1764 		goto bad;
1765 
1766 	*p += len;
1767 	if (struct_v >= 3) {
1768 		/* erasure_code_profiles */
1769 		ceph_decode_skip_map_of_map(p, end, string, string, string,
1770 					    e_inval);
1771 	}
1772 
1773 	if (struct_v >= 4) {
1774 		err = decode_pg_upmap(p, end, map);
1775 		if (err)
1776 			goto bad;
1777 
1778 		err = decode_pg_upmap_items(p, end, map);
1779 		if (err)
1780 			goto bad;
1781 	} else {
1782 		WARN_ON(!RB_EMPTY_ROOT(&map->pg_upmap));
1783 		WARN_ON(!RB_EMPTY_ROOT(&map->pg_upmap_items));
1784 	}
1785 
1786 	/* ignore the rest */
1787 	*p = end;
1788 
1789 	dout("full osdmap epoch %d max_osd %d\n", map->epoch, map->max_osd);
1790 	return 0;
1791 
1792 e_inval:
1793 	err = -EINVAL;
1794 bad:
1795 	pr_err("corrupt full osdmap (%d) epoch %d off %d (%p of %p-%p)\n",
1796 	       err, epoch, (int)(*p - start), *p, start, end);
1797 	print_hex_dump(KERN_DEBUG, "osdmap: ",
1798 		       DUMP_PREFIX_OFFSET, 16, 1,
1799 		       start, end - start, true);
1800 	return err;
1801 }
1802 
1803 /*
1804  * Allocate and decode a full map.
1805  */
1806 struct ceph_osdmap *ceph_osdmap_decode(void **p, void *end, bool msgr2)
1807 {
1808 	struct ceph_osdmap *map;
1809 	int ret;
1810 
1811 	map = ceph_osdmap_alloc();
1812 	if (!map)
1813 		return ERR_PTR(-ENOMEM);
1814 
1815 	ret = osdmap_decode(p, end, msgr2, map);
1816 	if (ret) {
1817 		ceph_osdmap_destroy(map);
1818 		return ERR_PTR(ret);
1819 	}
1820 
1821 	return map;
1822 }
1823 
1824 /*
1825  * Encoding order is (new_up_client, new_state, new_weight).  Need to
1826  * apply in the (new_weight, new_state, new_up_client) order, because
1827  * an incremental map may look like e.g.
1828  *
1829  *     new_up_client: { osd=6, addr=... } # set osd_state and addr
1830  *     new_state: { osd=6, xorstate=EXISTS } # clear osd_state
1831  */
1832 static int decode_new_up_state_weight(void **p, void *end, u8 struct_v,
1833 				      bool msgr2, struct ceph_osdmap *map)
1834 {
1835 	void *new_up_client;
1836 	void *new_state;
1837 	void *new_weight_end;
1838 	u32 len;
1839 	int ret;
1840 	int i;
1841 
1842 	new_up_client = *p;
1843 	ceph_decode_32_safe(p, end, len, e_inval);
1844 	for (i = 0; i < len; ++i) {
1845 		struct ceph_entity_addr addr;
1846 
1847 		ceph_decode_skip_32(p, end, e_inval);
1848 		if (struct_v >= 7)
1849 			ret = ceph_decode_entity_addrvec(p, end, msgr2, &addr);
1850 		else
1851 			ret = ceph_decode_entity_addr(p, end, &addr);
1852 		if (ret)
1853 			return ret;
1854 	}
1855 
1856 	new_state = *p;
1857 	ceph_decode_32_safe(p, end, len, e_inval);
1858 	len *= sizeof(u32) + (struct_v >= 5 ? sizeof(u32) : sizeof(u8));
1859 	ceph_decode_need(p, end, len, e_inval);
1860 	*p += len;
1861 
1862 	/* new_weight */
1863 	ceph_decode_32_safe(p, end, len, e_inval);
1864 	while (len--) {
1865 		s32 osd;
1866 		u32 w;
1867 
1868 		ceph_decode_need(p, end, 2*sizeof(u32), e_inval);
1869 		osd = ceph_decode_32(p);
1870 		w = ceph_decode_32(p);
1871 		if (osd >= map->max_osd)
1872 			goto e_inval;
1873 
1874 		osdmap_info(map, "osd%d weight 0x%x %s\n", osd, w,
1875 			    w == CEPH_OSD_IN ? "(in)" :
1876 			    (w == CEPH_OSD_OUT ? "(out)" : ""));
1877 		map->osd_weight[osd] = w;
1878 
1879 		/*
1880 		 * If we are marking in, set the EXISTS, and clear the
1881 		 * AUTOOUT and NEW bits.
1882 		 */
1883 		if (w) {
1884 			map->osd_state[osd] |= CEPH_OSD_EXISTS;
1885 			map->osd_state[osd] &= ~(CEPH_OSD_AUTOOUT |
1886 						 CEPH_OSD_NEW);
1887 		}
1888 	}
1889 	new_weight_end = *p;
1890 
1891 	/* new_state (up/down) */
1892 	*p = new_state;
1893 	len = ceph_decode_32(p);
1894 	while (len--) {
1895 		s32 osd;
1896 		u32 xorstate;
1897 
1898 		osd = ceph_decode_32(p);
1899 		if (osd >= map->max_osd)
1900 			goto e_inval;
1901 
1902 		if (struct_v >= 5)
1903 			xorstate = ceph_decode_32(p);
1904 		else
1905 			xorstate = ceph_decode_8(p);
1906 		if (xorstate == 0)
1907 			xorstate = CEPH_OSD_UP;
1908 		if ((map->osd_state[osd] & CEPH_OSD_UP) &&
1909 		    (xorstate & CEPH_OSD_UP))
1910 			osdmap_info(map, "osd%d down\n", osd);
1911 		if ((map->osd_state[osd] & CEPH_OSD_EXISTS) &&
1912 		    (xorstate & CEPH_OSD_EXISTS)) {
1913 			osdmap_info(map, "osd%d does not exist\n", osd);
1914 			ret = set_primary_affinity(map, osd,
1915 						   CEPH_OSD_DEFAULT_PRIMARY_AFFINITY);
1916 			if (ret)
1917 				return ret;
1918 			memset(map->osd_addr + osd, 0, sizeof(*map->osd_addr));
1919 			map->osd_state[osd] = 0;
1920 		} else {
1921 			map->osd_state[osd] ^= xorstate;
1922 		}
1923 	}
1924 
1925 	/* new_up_client */
1926 	*p = new_up_client;
1927 	len = ceph_decode_32(p);
1928 	while (len--) {
1929 		s32 osd;
1930 		struct ceph_entity_addr addr;
1931 
1932 		osd = ceph_decode_32(p);
1933 		if (osd >= map->max_osd)
1934 			goto e_inval;
1935 
1936 		if (struct_v >= 7)
1937 			ret = ceph_decode_entity_addrvec(p, end, msgr2, &addr);
1938 		else
1939 			ret = ceph_decode_entity_addr(p, end, &addr);
1940 		if (ret)
1941 			return ret;
1942 
1943 		dout("%s osd%d addr %s\n", __func__, osd, ceph_pr_addr(&addr));
1944 
1945 		osdmap_info(map, "osd%d up\n", osd);
1946 		map->osd_state[osd] |= CEPH_OSD_EXISTS | CEPH_OSD_UP;
1947 		map->osd_addr[osd] = addr;
1948 	}
1949 
1950 	*p = new_weight_end;
1951 	return 0;
1952 
1953 e_inval:
1954 	return -EINVAL;
1955 }
1956 
1957 /*
1958  * decode and apply an incremental map update.
1959  */
1960 struct ceph_osdmap *osdmap_apply_incremental(void **p, void *end, bool msgr2,
1961 					     struct ceph_osdmap *map)
1962 {
1963 	struct ceph_fsid fsid;
1964 	u32 epoch = 0;
1965 	struct ceph_timespec modified;
1966 	s32 len;
1967 	u64 pool;
1968 	__s64 new_pool_max;
1969 	__s32 new_flags, max;
1970 	void *start = *p;
1971 	int err;
1972 	u8 struct_v;
1973 
1974 	dout("%s %p to %p len %d\n", __func__, *p, end, (int)(end - *p));
1975 
1976 	err = get_osdmap_client_data_v(p, end, "inc", &struct_v);
1977 	if (err)
1978 		goto bad;
1979 
1980 	/* fsid, epoch, modified, new_pool_max, new_flags */
1981 	ceph_decode_need(p, end, sizeof(fsid) + sizeof(u32) + sizeof(modified) +
1982 			 sizeof(u64) + sizeof(u32), e_inval);
1983 	ceph_decode_copy(p, &fsid, sizeof(fsid));
1984 	epoch = ceph_decode_32(p);
1985 	ceph_decode_copy(p, &modified, sizeof(modified));
1986 	new_pool_max = ceph_decode_64(p);
1987 	new_flags = ceph_decode_32(p);
1988 
1989 	if (epoch != map->epoch + 1)
1990 		goto e_inval;
1991 
1992 	/* full map? */
1993 	ceph_decode_32_safe(p, end, len, e_inval);
1994 	if (len > 0) {
1995 		dout("apply_incremental full map len %d, %p to %p\n",
1996 		     len, *p, end);
1997 		return ceph_osdmap_decode(p, min(*p+len, end), msgr2);
1998 	}
1999 
2000 	/* new crush? */
2001 	ceph_decode_32_safe(p, end, len, e_inval);
2002 	if (len > 0) {
2003 		err = osdmap_set_crush(map,
2004 				       crush_decode(*p, min(*p + len, end)));
2005 		if (err)
2006 			goto bad;
2007 		*p += len;
2008 	}
2009 
2010 	/* new flags? */
2011 	if (new_flags >= 0)
2012 		map->flags = new_flags;
2013 	if (new_pool_max >= 0)
2014 		map->pool_max = new_pool_max;
2015 
2016 	/* new max? */
2017 	ceph_decode_32_safe(p, end, max, e_inval);
2018 	if (max >= 0) {
2019 		err = osdmap_set_max_osd(map, max);
2020 		if (err)
2021 			goto bad;
2022 	}
2023 
2024 	map->epoch++;
2025 	map->modified = modified;
2026 
2027 	/* new_pools */
2028 	err = decode_new_pools(p, end, map);
2029 	if (err)
2030 		goto bad;
2031 
2032 	/* new_pool_names */
2033 	err = decode_pool_names(p, end, map);
2034 	if (err)
2035 		goto bad;
2036 
2037 	/* old_pool */
2038 	ceph_decode_32_safe(p, end, len, e_inval);
2039 	while (len--) {
2040 		struct ceph_pg_pool_info *pi;
2041 
2042 		ceph_decode_64_safe(p, end, pool, e_inval);
2043 		pi = lookup_pg_pool(&map->pg_pools, pool);
2044 		if (pi)
2045 			__remove_pg_pool(&map->pg_pools, pi);
2046 	}
2047 
2048 	/* new_up_client, new_state, new_weight */
2049 	err = decode_new_up_state_weight(p, end, struct_v, msgr2, map);
2050 	if (err)
2051 		goto bad;
2052 
2053 	/* new_pg_temp */
2054 	err = decode_new_pg_temp(p, end, map);
2055 	if (err)
2056 		goto bad;
2057 
2058 	/* new_primary_temp */
2059 	if (struct_v >= 1) {
2060 		err = decode_new_primary_temp(p, end, map);
2061 		if (err)
2062 			goto bad;
2063 	}
2064 
2065 	/* new_primary_affinity */
2066 	if (struct_v >= 2) {
2067 		err = decode_new_primary_affinity(p, end, map);
2068 		if (err)
2069 			goto bad;
2070 	}
2071 
2072 	if (struct_v >= 3) {
2073 		/* new_erasure_code_profiles */
2074 		ceph_decode_skip_map_of_map(p, end, string, string, string,
2075 					    e_inval);
2076 		/* old_erasure_code_profiles */
2077 		ceph_decode_skip_set(p, end, string, e_inval);
2078 	}
2079 
2080 	if (struct_v >= 4) {
2081 		err = decode_new_pg_upmap(p, end, map);
2082 		if (err)
2083 			goto bad;
2084 
2085 		err = decode_old_pg_upmap(p, end, map);
2086 		if (err)
2087 			goto bad;
2088 
2089 		err = decode_new_pg_upmap_items(p, end, map);
2090 		if (err)
2091 			goto bad;
2092 
2093 		err = decode_old_pg_upmap_items(p, end, map);
2094 		if (err)
2095 			goto bad;
2096 	}
2097 
2098 	/* ignore the rest */
2099 	*p = end;
2100 
2101 	dout("inc osdmap epoch %d max_osd %d\n", map->epoch, map->max_osd);
2102 	return map;
2103 
2104 e_inval:
2105 	err = -EINVAL;
2106 bad:
2107 	pr_err("corrupt inc osdmap (%d) epoch %d off %d (%p of %p-%p)\n",
2108 	       err, epoch, (int)(*p - start), *p, start, end);
2109 	print_hex_dump(KERN_DEBUG, "osdmap: ",
2110 		       DUMP_PREFIX_OFFSET, 16, 1,
2111 		       start, end - start, true);
2112 	return ERR_PTR(err);
2113 }
2114 
2115 void ceph_oloc_copy(struct ceph_object_locator *dest,
2116 		    const struct ceph_object_locator *src)
2117 {
2118 	ceph_oloc_destroy(dest);
2119 
2120 	dest->pool = src->pool;
2121 	if (src->pool_ns)
2122 		dest->pool_ns = ceph_get_string(src->pool_ns);
2123 	else
2124 		dest->pool_ns = NULL;
2125 }
2126 EXPORT_SYMBOL(ceph_oloc_copy);
2127 
/* Release the object locator's pool namespace reference, if any. */
void ceph_oloc_destroy(struct ceph_object_locator *oloc)
{
	ceph_put_string(oloc->pool_ns);
}
EXPORT_SYMBOL(ceph_oloc_destroy);
2133 
/*
 * Deep-copy an object id.  Names that don't fit the inline buffer
 * are duplicated with __GFP_NOFAIL, so the copy cannot fail.  Any
 * previous external name in @dest is freed first.
 */
void ceph_oid_copy(struct ceph_object_id *dest,
		   const struct ceph_object_id *src)
{
	ceph_oid_destroy(dest);

	if (src->name != src->inline_name) {
		/* very rare, see ceph_object_id definition */
		dest->name = kmalloc(src->name_len + 1,
				     GFP_NOIO | __GFP_NOFAIL);
	} else {
		dest->name = dest->inline_name;
	}
	memcpy(dest->name, src->name, src->name_len + 1);	/* incl. NUL */
	dest->name_len = src->name_len;
}
EXPORT_SYMBOL(ceph_oid_copy);
2150 
/*
 * Format a name into oid's inline buffer.  Returns 0 on success, or
 * the required length when the result does not fit (name_len is left
 * untouched in that case; the caller decides whether to allocate).
 * @oid is expected to be empty on entry.
 */
static __printf(2, 0)
int oid_printf_vargs(struct ceph_object_id *oid, const char *fmt, va_list ap)
{
	int len;

	WARN_ON(!ceph_oid_empty(oid));

	len = vsnprintf(oid->inline_name, sizeof(oid->inline_name), fmt, ap);
	if (len >= sizeof(oid->inline_name))
		return len;	/* truncated - tell the caller how much is needed */

	oid->name_len = len;
	return 0;
}
2165 
2166 /*
2167  * If oid doesn't fit into inline buffer, BUG.
2168  */
2169 void ceph_oid_printf(struct ceph_object_id *oid, const char *fmt, ...)
2170 {
2171 	va_list ap;
2172 
2173 	va_start(ap, fmt);
2174 	BUG_ON(oid_printf_vargs(oid, fmt, ap));
2175 	va_end(ap);
2176 }
2177 EXPORT_SYMBOL(ceph_oid_printf);
2178 
/*
 * Format a name into @oid, first trying the inline buffer and
 * falling back to a heap allocation of exactly the required size.
 * The va_list is copied because it may be consumed twice.  Returns
 * 0 on success, -ENOMEM if the fallback allocation fails.
 */
static __printf(3, 0)
int oid_aprintf_vargs(struct ceph_object_id *oid, gfp_t gfp,
		      const char *fmt, va_list ap)
{
	va_list aq;
	int len;

	va_copy(aq, ap);
	len = oid_printf_vargs(oid, fmt, aq);
	va_end(aq);

	if (len) {
		/* didn't fit inline - len is the exact length needed */
		char *external_name;

		external_name = kmalloc(len + 1, gfp);
		if (!external_name)
			return -ENOMEM;

		oid->name = external_name;
		WARN_ON(vsnprintf(oid->name, len + 1, fmt, ap) != len);
		oid->name_len = len;
	}

	return 0;
}
2204 
2205 /*
2206  * If oid doesn't fit into inline buffer, allocate.
2207  */
2208 int ceph_oid_aprintf(struct ceph_object_id *oid, gfp_t gfp,
2209 		     const char *fmt, ...)
2210 {
2211 	va_list ap;
2212 	int ret;
2213 
2214 	va_start(ap, fmt);
2215 	ret = oid_aprintf_vargs(oid, gfp, fmt, ap);
2216 	va_end(ap);
2217 
2218 	return ret;
2219 }
2220 EXPORT_SYMBOL(ceph_oid_aprintf);
2221 
/* Free the name only if it was externally allocated (not inline). */
void ceph_oid_destroy(struct ceph_object_id *oid)
{
	if (oid->name != oid->inline_name)
		kfree(oid->name);
}
EXPORT_SYMBOL(ceph_oid_destroy);
2228 
2229 /*
2230  * osds only
2231  */
2232 static bool __osds_equal(const struct ceph_osds *lhs,
2233 			 const struct ceph_osds *rhs)
2234 {
2235 	if (lhs->size == rhs->size &&
2236 	    !memcmp(lhs->osds, rhs->osds, rhs->size * sizeof(rhs->osds[0])))
2237 		return true;
2238 
2239 	return false;
2240 }
2241 
2242 /*
2243  * osds + primary
2244  */
2245 static bool osds_equal(const struct ceph_osds *lhs,
2246 		       const struct ceph_osds *rhs)
2247 {
2248 	if (__osds_equal(lhs, rhs) &&
2249 	    lhs->primary == rhs->primary)
2250 		return true;
2251 
2252 	return false;
2253 }
2254 
2255 static bool osds_valid(const struct ceph_osds *set)
2256 {
2257 	/* non-empty set */
2258 	if (set->size > 0 && set->primary >= 0)
2259 		return true;
2260 
2261 	/* empty can_shift_osds set */
2262 	if (!set->size && set->primary == -1)
2263 		return true;
2264 
2265 	/* empty !can_shift_osds set - all NONE */
2266 	if (set->size > 0 && set->primary == -1) {
2267 		int i;
2268 
2269 		for (i = 0; i < set->size; i++) {
2270 			if (set->osds[i] != CRUSH_ITEM_NONE)
2271 				break;
2272 		}
2273 		if (i == set->size)
2274 			return true;
2275 	}
2276 
2277 	return false;
2278 }
2279 
2280 void ceph_osds_copy(struct ceph_osds *dest, const struct ceph_osds *src)
2281 {
2282 	memcpy(dest->osds, src->osds, src->size * sizeof(src->osds[0]));
2283 	dest->size = src->size;
2284 	dest->primary = src->primary;
2285 }
2286 
/*
 * Return true if pg @pgid, which must be valid under @old_pg_num,
 * gains at least one child when pg_num grows to @new_pg_num.
 * Candidate child seeds are formed by or-ing successive multiples of
 * 1 << (old_bits - 1) onto the seed; a candidate in
 * [old_pg_num, new_pg_num) that folds back onto this seed under the
 * old stable mod means a split.
 */
bool ceph_pg_is_split(const struct ceph_pg *pgid, u32 old_pg_num,
		      u32 new_pg_num)
{
	int old_bits = calc_bits_of(old_pg_num);
	int old_mask = (1 << old_bits) - 1;
	int n;

	WARN_ON(pgid->seed >= old_pg_num);
	if (new_pg_num <= old_pg_num)
		return false;

	for (n = 1; ; n++) {
		int next_bit = n << (old_bits - 1);
		u32 s = next_bit | pgid->seed;

		/* skip seeds below old_pg_num or equal to our own */
		if (s < old_pg_num || s == pgid->seed)
			continue;
		if (s >= new_pg_num)
			break;	/* no more candidates in range */

		s = ceph_stable_mod(s, old_pg_num, old_mask);
		if (s == pgid->seed)
			return true;
	}

	return false;
}
2314 
2315 bool ceph_is_new_interval(const struct ceph_osds *old_acting,
2316 			  const struct ceph_osds *new_acting,
2317 			  const struct ceph_osds *old_up,
2318 			  const struct ceph_osds *new_up,
2319 			  int old_size,
2320 			  int new_size,
2321 			  int old_min_size,
2322 			  int new_min_size,
2323 			  u32 old_pg_num,
2324 			  u32 new_pg_num,
2325 			  bool old_sort_bitwise,
2326 			  bool new_sort_bitwise,
2327 			  bool old_recovery_deletes,
2328 			  bool new_recovery_deletes,
2329 			  const struct ceph_pg *pgid)
2330 {
2331 	return !osds_equal(old_acting, new_acting) ||
2332 	       !osds_equal(old_up, new_up) ||
2333 	       old_size != new_size ||
2334 	       old_min_size != new_min_size ||
2335 	       ceph_pg_is_split(pgid, old_pg_num, new_pg_num) ||
2336 	       old_sort_bitwise != new_sort_bitwise ||
2337 	       old_recovery_deletes != new_recovery_deletes;
2338 }
2339 
2340 static int calc_pg_rank(int osd, const struct ceph_osds *acting)
2341 {
2342 	int i;
2343 
2344 	for (i = 0; i < acting->size; i++) {
2345 		if (acting->osds[i] == osd)
2346 			return i;
2347 	}
2348 
2349 	return -1;
2350 }
2351 
2352 static bool primary_changed(const struct ceph_osds *old_acting,
2353 			    const struct ceph_osds *new_acting)
2354 {
2355 	if (!old_acting->size && !new_acting->size)
2356 		return false; /* both still empty */
2357 
2358 	if (!old_acting->size ^ !new_acting->size)
2359 		return true; /* was empty, now not, or vice versa */
2360 
2361 	if (old_acting->primary != new_acting->primary)
2362 		return true; /* primary changed */
2363 
2364 	if (calc_pg_rank(old_acting->primary, old_acting) !=
2365 	    calc_pg_rank(new_acting->primary, new_acting))
2366 		return true;
2367 
2368 	return false; /* same primary (tho replicas may have changed) */
2369 }
2370 
2371 bool ceph_osds_changed(const struct ceph_osds *old_acting,
2372 		       const struct ceph_osds *new_acting,
2373 		       bool any_change)
2374 {
2375 	if (primary_changed(old_acting, new_acting))
2376 		return true;
2377 
2378 	if (any_change && !__osds_equal(old_acting, new_acting))
2379 		return true;
2380 
2381 	return false;
2382 }
2383 
2384 /*
2385  * Map an object into a PG.
2386  *
2387  * Should only be called with target_oid and target_oloc (as opposed to
2388  * base_oid and base_oloc), since tiering isn't taken into account.
2389  */
void __ceph_object_locator_to_pg(struct ceph_pg_pool_info *pi,
				 const struct ceph_object_id *oid,
				 const struct ceph_object_locator *oloc,
				 struct ceph_pg *raw_pgid)
{
	WARN_ON(pi->id != oloc->pool);

	if (!oloc->pool_ns) {
		/* no namespace - hash the object name alone */
		raw_pgid->pool = oloc->pool;
		raw_pgid->seed = ceph_str_hash(pi->object_hash, oid->name,
					     oid->name_len);
		dout("%s %s -> raw_pgid %llu.%x\n", __func__, oid->name,
		     raw_pgid->pool, raw_pgid->seed);
	} else {
		char stack_buf[256];
		char *buf = stack_buf;
		int nsl = oloc->pool_ns->len;
		size_t total = nsl + 1 + oid->name_len;

		/*
		 * Hash "<namespace>\037<object name>" as a single buffer.
		 * Use the on-stack buffer when it fits; otherwise fall
		 * back to the heap (__GFP_NOFAIL, so no error path).
		 */
		if (total > sizeof(stack_buf))
			buf = kmalloc(total, GFP_NOIO | __GFP_NOFAIL);
		memcpy(buf, oloc->pool_ns->str, nsl);
		buf[nsl] = '\037';	/* 0x1f separator between ns and name */
		memcpy(buf + nsl + 1, oid->name, oid->name_len);
		raw_pgid->pool = oloc->pool;
		raw_pgid->seed = ceph_str_hash(pi->object_hash, buf, total);
		if (buf != stack_buf)
			kfree(buf);
		dout("%s %s ns %.*s -> raw_pgid %llu.%x\n", __func__,
		     oid->name, nsl, oloc->pool_ns->str,
		     raw_pgid->pool, raw_pgid->seed);
	}
}
2423 
2424 int ceph_object_locator_to_pg(struct ceph_osdmap *osdmap,
2425 			      const struct ceph_object_id *oid,
2426 			      const struct ceph_object_locator *oloc,
2427 			      struct ceph_pg *raw_pgid)
2428 {
2429 	struct ceph_pg_pool_info *pi;
2430 
2431 	pi = ceph_pg_pool_by_id(osdmap, oloc->pool);
2432 	if (!pi)
2433 		return -ENOENT;
2434 
2435 	__ceph_object_locator_to_pg(pi, oid, oloc, raw_pgid);
2436 	return 0;
2437 }
2438 EXPORT_SYMBOL(ceph_object_locator_to_pg);
2439 
2440 /*
2441  * Map a raw PG (full precision ps) into an actual PG.
2442  */
2443 static void raw_pg_to_pg(struct ceph_pg_pool_info *pi,
2444 			 const struct ceph_pg *raw_pgid,
2445 			 struct ceph_pg *pgid)
2446 {
2447 	pgid->pool = raw_pgid->pool;
2448 	pgid->seed = ceph_stable_mod(raw_pgid->seed, pi->pg_num,
2449 				     pi->pg_num_mask);
2450 }
2451 
2452 /*
2453  * Map a raw PG (full precision ps) into a placement ps (placement
2454  * seed).  Include pool id in that value so that different pools don't
2455  * use the same seeds.
2456  */
2457 static u32 raw_pg_to_pps(struct ceph_pg_pool_info *pi,
2458 			 const struct ceph_pg *raw_pgid)
2459 {
2460 	if (pi->flags & CEPH_POOL_FLAG_HASHPSPOOL) {
2461 		/* hash pool id and seed so that pool PGs do not overlap */
2462 		return crush_hash32_2(CRUSH_HASH_RJENKINS1,
2463 				      ceph_stable_mod(raw_pgid->seed,
2464 						      pi->pgp_num,
2465 						      pi->pgp_num_mask),
2466 				      raw_pgid->pool);
2467 	} else {
2468 		/*
2469 		 * legacy behavior: add ps and pool together.  this is
2470 		 * not a great approach because the PGs from each pool
2471 		 * will overlap on top of each other: 0.5 == 1.4 ==
2472 		 * 2.3 == ...
2473 		 */
2474 		return ceph_stable_mod(raw_pgid->seed, pi->pgp_num,
2475 				       pi->pgp_num_mask) +
2476 		       (unsigned)raw_pgid->pool;
2477 	}
2478 }
2479 
2480 /*
2481  * Magic value used for a "default" fallback choose_args, used if the
2482  * crush_choose_arg_map passed to do_crush() does not exist.  If this
2483  * also doesn't exist, fall back to canonical weights.
2484  */
2485 #define CEPH_DEFAULT_CHOOSE_ARGS	-1
2486 
/*
 * Run CRUSH rule @ruleno for input @x, writing up to @result_max OSD
 * ids into @result.  Returns the number of OSDs chosen, or a negative
 * value on error (propagated from crush_do_rule()).
 */
static int do_crush(struct ceph_osdmap *map, int ruleno, int x,
		    int *result, int result_max,
		    const __u32 *weight, int weight_max,
		    s64 choose_args_index)
{
	struct crush_choose_arg_map *arg_map;
	struct crush_work *work;
	int r;

	BUG_ON(result_max > CEPH_PG_MAX_SIZE);

	/*
	 * Prefer the choose_args entry for @choose_args_index; fall
	 * back to the "default" entry (CEPH_DEFAULT_CHOOSE_ARGS).  If
	 * neither exists, run with canonical weights (NULL args).
	 */
	arg_map = lookup_choose_arg_map(&map->crush->choose_args,
					choose_args_index);
	if (!arg_map)
		arg_map = lookup_choose_arg_map(&map->crush->choose_args,
						CEPH_DEFAULT_CHOOSE_ARGS);

	/* scratch space is pooled - borrow a workspace for this run */
	work = get_workspace(&map->crush_wsm, map->crush);
	r = crush_do_rule(map->crush, ruleno, x, result, result_max,
			  weight, weight_max, work,
			  arg_map ? arg_map->args : NULL);
	put_workspace(&map->crush_wsm, work);
	return r;
}
2511 
2512 static void remove_nonexistent_osds(struct ceph_osdmap *osdmap,
2513 				    struct ceph_pg_pool_info *pi,
2514 				    struct ceph_osds *set)
2515 {
2516 	int i;
2517 
2518 	if (ceph_can_shift_osds(pi)) {
2519 		int removed = 0;
2520 
2521 		/* shift left */
2522 		for (i = 0; i < set->size; i++) {
2523 			if (!ceph_osd_exists(osdmap, set->osds[i])) {
2524 				removed++;
2525 				continue;
2526 			}
2527 			if (removed)
2528 				set->osds[i - removed] = set->osds[i];
2529 		}
2530 		set->size -= removed;
2531 	} else {
2532 		/* set dne devices to NONE */
2533 		for (i = 0; i < set->size; i++) {
2534 			if (!ceph_osd_exists(osdmap, set->osds[i]))
2535 				set->osds[i] = CRUSH_ITEM_NONE;
2536 		}
2537 	}
2538 }
2539 
2540 /*
2541  * Calculate raw set (CRUSH output) for given PG and filter out
2542  * nonexistent OSDs.  ->primary is undefined for a raw set.
2543  *
2544  * Placement seed (CRUSH input) is returned through @ppps.
2545  */
static void pg_to_raw_osds(struct ceph_osdmap *osdmap,
			   struct ceph_pg_pool_info *pi,
			   const struct ceph_pg *raw_pgid,
			   struct ceph_osds *raw,
			   u32 *ppps)
{
	u32 pps = raw_pg_to_pps(pi, raw_pgid);
	int ruleno;
	int len;

	/* on any error below, @raw is left empty (size 0) */
	ceph_osds_init(raw);
	if (ppps)
		*ppps = pps;

	ruleno = crush_find_rule(osdmap->crush, pi->crush_ruleset, pi->type,
				 pi->size);
	if (ruleno < 0) {
		pr_err("no crush rule: pool %lld ruleset %d type %d size %d\n",
		       pi->id, pi->crush_ruleset, pi->type, pi->size);
		return;
	}

	/* raw->osds is a fixed-size array - refuse pools wider than it */
	if (pi->size > ARRAY_SIZE(raw->osds)) {
		pr_err_ratelimited("pool %lld ruleset %d type %d too wide: size %d > %zu\n",
		       pi->id, pi->crush_ruleset, pi->type, pi->size,
		       ARRAY_SIZE(raw->osds));
		return;
	}

	/* the pool id doubles as the choose_args index */
	len = do_crush(osdmap, ruleno, pps, raw->osds, pi->size,
		       osdmap->osd_weight, osdmap->max_osd, pi->id);
	if (len < 0) {
		pr_err("error %d from crush rule %d: pool %lld ruleset %d type %d size %d\n",
		       len, ruleno, pi->id, pi->crush_ruleset, pi->type,
		       pi->size);
		return;
	}

	raw->size = len;
	remove_nonexistent_osds(osdmap, pi, raw);
}
2587 
2588 /* apply pg_upmap[_items] mappings */
static void apply_upmap(struct ceph_osdmap *osdmap,
			const struct ceph_pg *pgid,
			struct ceph_osds *raw)
{
	struct ceph_pg_mapping *pg;
	int i, j;

	/* pg_upmap: explicit replacement for the entire raw set */
	pg = lookup_pg_mapping(&osdmap->pg_upmap, pgid);
	if (pg) {
		/* make sure targets aren't marked out */
		for (i = 0; i < pg->pg_upmap.len; i++) {
			int osd = pg->pg_upmap.osds[i];

			if (osd != CRUSH_ITEM_NONE &&
			    osd < osdmap->max_osd &&
			    osdmap->osd_weight[osd] == 0) {
				/* reject/ignore explicit mapping */
				return;
			}
		}
		for (i = 0; i < pg->pg_upmap.len; i++)
			raw->osds[i] = pg->pg_upmap.osds[i];
		raw->size = pg->pg_upmap.len;
		/* check and apply pg_upmap_items, if any */
	}

	/* pg_upmap_items: individual from -> to substitutions */
	pg = lookup_pg_mapping(&osdmap->pg_upmap_items, pgid);
	if (pg) {
		/*
		 * Note: this approach does not allow a bidirectional swap,
		 * e.g., [[1,2],[2,1]] applied to [0,1,2] -> [0,2,1].
		 */
		for (i = 0; i < pg->pg_upmap_items.len; i++) {
			int from = pg->pg_upmap_items.from_to[i][0];
			int to = pg->pg_upmap_items.from_to[i][1];
			int pos = -1;		/* index of @from, if usable */
			bool exists = false;	/* @to already in @raw? */

			/* make sure replacement doesn't already appear */
			for (j = 0; j < raw->size; j++) {
				int osd = raw->osds[j];

				if (osd == to) {
					exists = true;
					break;
				}
				/* ignore mapping if target is marked out */
				if (osd == from && pos < 0 &&
				    !(to != CRUSH_ITEM_NONE &&
				      to < osdmap->max_osd &&
				      osdmap->osd_weight[to] == 0)) {
					pos = j;
				}
			}
			if (!exists && pos >= 0)
				raw->osds[pos] = to;
		}
	}
}
2648 
2649 /*
2650  * Given raw set, calculate up set and up primary.  By definition of an
2651  * up set, the result won't contain nonexistent or down OSDs.
2652  *
2653  * This is done in-place - on return @set is the up set.  If it's
2654  * empty, ->primary will remain undefined.
2655  */
static void raw_to_up_osds(struct ceph_osdmap *osdmap,
			   struct ceph_pg_pool_info *pi,
			   struct ceph_osds *set)
{
	int i;

	/* ->primary is undefined for a raw set */
	BUG_ON(set->primary != -1);

	if (ceph_can_shift_osds(pi)) {
		int removed = 0;

		/* shift left, dropping down OSDs */
		for (i = 0; i < set->size; i++) {
			if (ceph_osd_is_down(osdmap, set->osds[i])) {
				removed++;
				continue;
			}
			if (removed)
				set->osds[i - removed] = set->osds[i];
		}
		set->size -= removed;
		if (set->size > 0)
			set->primary = set->osds[0];
	} else {
		/*
		 * Set down/dne devices to NONE.  Walking backwards
		 * leaves ->primary set to the lowest-index up OSD.
		 */
		for (i = set->size - 1; i >= 0; i--) {
			if (ceph_osd_is_down(osdmap, set->osds[i]))
				set->osds[i] = CRUSH_ITEM_NONE;
			else
				set->primary = set->osds[i];
		}
	}
}
2690 
/*
 * Possibly move the primary of @up to a different OSD, biased by the
 * per-OSD primary_affinity values.  No-op when all OSDs in the set
 * have the default affinity.
 */
static void apply_primary_affinity(struct ceph_osdmap *osdmap,
				   struct ceph_pg_pool_info *pi,
				   u32 pps,
				   struct ceph_osds *up)
{
	int i;
	int pos = -1;

	/*
	 * Do we have any non-default primary_affinity values for these
	 * osds?
	 */
	if (!osdmap->osd_primary_affinity)
		return;

	for (i = 0; i < up->size; i++) {
		int osd = up->osds[i];

		if (osd != CRUSH_ITEM_NONE &&
		    osdmap->osd_primary_affinity[osd] !=
					CEPH_OSD_DEFAULT_PRIMARY_AFFINITY) {
			break;
		}
	}
	/* all default - nothing to do */
	if (i == up->size)
		return;

	/*
	 * Pick the primary.  Feed both the seed (for the pg) and the
	 * osd into the hash/rng so that a proportional fraction of an
	 * osd's pgs get rejected as primary.
	 */
	for (i = 0; i < up->size; i++) {
		int osd = up->osds[i];
		u32 aff;

		if (osd == CRUSH_ITEM_NONE)
			continue;

		aff = osdmap->osd_primary_affinity[osd];
		if (aff < CEPH_OSD_MAX_PRIMARY_AFFINITY &&
		    (crush_hash32_2(CRUSH_HASH_RJENKINS1,
				    pps, osd) >> 16) >= aff) {
			/*
			 * We chose not to use this primary.  Note it
			 * anyway as a fallback in case we don't pick
			 * anyone else, but keep looking.
			 */
			if (pos < 0)
				pos = i;
		} else {
			pos = i;
			break;
		}
	}
	/* no eligible OSD at all - leave ->primary as is */
	if (pos < 0)
		return;

	up->primary = up->osds[pos];

	if (ceph_can_shift_osds(pi) && pos > 0) {
		/* move the new primary to the front */
		for (i = pos; i > 0; i--)
			up->osds[i] = up->osds[i - 1];
		up->osds[0] = up->primary;
	}
}
2758 
2759 /*
2760  * Get pg_temp and primary_temp mappings for given PG.
2761  *
2762  * Note that a PG may have none, only pg_temp, only primary_temp or
2763  * both pg_temp and primary_temp mappings.  This means @temp isn't
2764  * always a valid OSD set on return: in the "only primary_temp" case,
2765  * @temp will have its ->primary >= 0 but ->size == 0.
2766  */
static void get_temp_osds(struct ceph_osdmap *osdmap,
			  struct ceph_pg_pool_info *pi,
			  const struct ceph_pg *pgid,
			  struct ceph_osds *temp)
{
	struct ceph_pg_mapping *pg;
	int i;

	ceph_osds_init(temp);

	/* pg_temp? */
	pg = lookup_pg_mapping(&osdmap->pg_temp, pgid);
	if (pg) {
		/* copy the mapping, filtering down OSDs */
		for (i = 0; i < pg->pg_temp.len; i++) {
			if (ceph_osd_is_down(osdmap, pg->pg_temp.osds[i])) {
				if (ceph_can_shift_osds(pi))
					continue;	/* just drop it */

				/* positions matter - keep a placeholder */
				temp->osds[temp->size++] = CRUSH_ITEM_NONE;
			} else {
				temp->osds[temp->size++] = pg->pg_temp.osds[i];
			}
		}

		/* apply pg_temp's primary - first non-NONE entry */
		for (i = 0; i < temp->size; i++) {
			if (temp->osds[i] != CRUSH_ITEM_NONE) {
				temp->primary = temp->osds[i];
				break;
			}
		}
	}

	/* primary_temp?  (overrides pg_temp's primary) */
	pg = lookup_pg_mapping(&osdmap->primary_temp, pgid);
	if (pg)
		temp->primary = pg->primary_temp.osd;
}
2805 
2806 /*
2807  * Map a PG to its acting set as well as its up set.
2808  *
2809  * Acting set is used for data mapping purposes, while up set can be
2810  * recorded for detecting interval changes and deciding whether to
2811  * resend a request.
2812  */
void ceph_pg_to_up_acting_osds(struct ceph_osdmap *osdmap,
			       struct ceph_pg_pool_info *pi,
			       const struct ceph_pg *raw_pgid,
			       struct ceph_osds *up,
			       struct ceph_osds *acting)
{
	struct ceph_pg pgid;
	u32 pps;

	WARN_ON(pi->id != raw_pgid->pool);
	raw_pg_to_pg(pi, raw_pgid, &pgid);

	/* CRUSH mapping first, then the osdmap-level overrides, in order */
	pg_to_raw_osds(osdmap, pi, raw_pgid, up, &pps);
	apply_upmap(osdmap, &pgid, up);
	raw_to_up_osds(osdmap, pi, up);
	apply_primary_affinity(osdmap, pi, pps, up);
	get_temp_osds(osdmap, pi, &pgid, acting);
	if (!acting->size) {
		/* no pg_temp mapping - acting set is the up set */
		memcpy(acting->osds, up->osds, up->size * sizeof(up->osds[0]));
		acting->size = up->size;
		if (acting->primary == -1)
			acting->primary = up->primary;
	}
	WARN_ON(!osds_valid(up) || !osds_valid(acting));
}
2838 
/*
 * Compute the spg (pgid + shard) served by the primary for the given
 * raw PG.  Pools whose OSDs can shift always use CEPH_SPG_NOSHARD;
 * otherwise the shard is the primary's position in the acting set.
 * Returns false if that position cannot be determined (e.g. empty
 * acting set).
 */
bool ceph_pg_to_primary_shard(struct ceph_osdmap *osdmap,
			      struct ceph_pg_pool_info *pi,
			      const struct ceph_pg *raw_pgid,
			      struct ceph_spg *spgid)
{
	struct ceph_pg pgid;
	struct ceph_osds up, acting;
	int i;

	WARN_ON(pi->id != raw_pgid->pool);
	raw_pg_to_pg(pi, raw_pgid, &pgid);

	if (ceph_can_shift_osds(pi)) {
		spgid->pgid = pgid; /* struct */
		spgid->shard = CEPH_SPG_NOSHARD;
		return true;
	}

	ceph_pg_to_up_acting_osds(osdmap, pi, &pgid, &up, &acting);
	for (i = 0; i < acting.size; i++) {
		if (acting.osds[i] == acting.primary) {
			spgid->pgid = pgid; /* struct */
			spgid->shard = i;
			return true;
		}
	}

	return false;
}
2868 
2869 /*
2870  * Return acting primary for given PG, or -1 if none.
2871  */
2872 int ceph_pg_to_acting_primary(struct ceph_osdmap *osdmap,
2873 			      const struct ceph_pg *raw_pgid)
2874 {
2875 	struct ceph_pg_pool_info *pi;
2876 	struct ceph_osds up, acting;
2877 
2878 	pi = ceph_pg_pool_by_id(osdmap, raw_pgid->pool);
2879 	if (!pi)
2880 		return -1;
2881 
2882 	ceph_pg_to_up_acting_osds(osdmap, pi, raw_pgid, &up, &acting);
2883 	return acting.primary;
2884 }
2885 EXPORT_SYMBOL(ceph_pg_to_acting_primary);
2886 
2887 static struct crush_loc_node *alloc_crush_loc(size_t type_name_len,
2888 					      size_t name_len)
2889 {
2890 	struct crush_loc_node *loc;
2891 
2892 	loc = kmalloc(sizeof(*loc) + type_name_len + name_len + 2, GFP_NOIO);
2893 	if (!loc)
2894 		return NULL;
2895 
2896 	RB_CLEAR_NODE(&loc->cl_node);
2897 	return loc;
2898 }
2899 
2900 static void free_crush_loc(struct crush_loc_node *loc)
2901 {
2902 	WARN_ON(!RB_EMPTY_NODE(&loc->cl_node));
2903 
2904 	kfree(loc);
2905 }
2906 
2907 static int crush_loc_compare(const struct crush_loc *loc1,
2908 			     const struct crush_loc *loc2)
2909 {
2910 	return strcmp(loc1->cl_type_name, loc2->cl_type_name) ?:
2911 	       strcmp(loc1->cl_name, loc2->cl_name);
2912 }
2913 
/*
 * Generate lookup/insert/erase helpers for the crush_loc_node rbtree,
 * keyed by pointer through crush_loc_compare().
 */
DEFINE_RB_FUNCS2(crush_loc, struct crush_loc_node, cl_loc, crush_loc_compare,
		 RB_BYPTR, const struct crush_loc *, cl_node)
2916 
2917 /*
2918  * Parses a set of <bucket type name>':'<bucket name> pairs separated
2919  * by '|', e.g. "rack:foo1|rack:foo2|datacenter:bar".
2920  *
2921  * Note that @crush_location is modified by strsep().
2922  */
int ceph_parse_crush_location(char *crush_location, struct rb_root *locs)
{
	struct crush_loc_node *loc;
	const char *type_name, *name, *colon;
	size_t type_name_len, name_len;

	dout("%s '%s'\n", __func__, crush_location);
	while ((type_name = strsep(&crush_location, "|"))) {
		colon = strchr(type_name, ':');
		if (!colon)
			return -EINVAL;

		type_name_len = colon - type_name;
		if (type_name_len == 0)
			return -EINVAL;

		name = colon + 1;
		name_len = strlen(name);
		if (name_len == 0)
			return -EINVAL;

		loc = alloc_crush_loc(type_name_len, name_len);
		if (!loc)
			return -ENOMEM;

		/*
		 * Both strings live in loc->cl_data, NUL-terminated:
		 * type name first, bucket name immediately after.
		 */
		loc->cl_loc.cl_type_name = loc->cl_data;
		memcpy(loc->cl_loc.cl_type_name, type_name, type_name_len);
		loc->cl_loc.cl_type_name[type_name_len] = '\0';

		loc->cl_loc.cl_name = loc->cl_data + type_name_len + 1;
		memcpy(loc->cl_loc.cl_name, name, name_len);
		loc->cl_loc.cl_name[name_len] = '\0';

		if (!__insert_crush_loc(locs, loc)) {
			/* duplicate (type name, name) pair */
			free_crush_loc(loc);
			return -EEXIST;
		}

		dout("%s type_name '%s' name '%s'\n", __func__,
		     loc->cl_loc.cl_type_name, loc->cl_loc.cl_name);
	}

	/*
	 * NOTE(review): on error, entries already inserted remain in
	 * @locs - presumably the caller cleans up with
	 * ceph_clear_crush_locs(); confirm at call sites.
	 */
	return 0;
}
2967 
2968 int ceph_compare_crush_locs(struct rb_root *locs1, struct rb_root *locs2)
2969 {
2970 	struct rb_node *n1 = rb_first(locs1);
2971 	struct rb_node *n2 = rb_first(locs2);
2972 	int ret;
2973 
2974 	for ( ; n1 && n2; n1 = rb_next(n1), n2 = rb_next(n2)) {
2975 		struct crush_loc_node *loc1 =
2976 		    rb_entry(n1, struct crush_loc_node, cl_node);
2977 		struct crush_loc_node *loc2 =
2978 		    rb_entry(n2, struct crush_loc_node, cl_node);
2979 
2980 		ret = crush_loc_compare(&loc1->cl_loc, &loc2->cl_loc);
2981 		if (ret)
2982 			return ret;
2983 	}
2984 
2985 	if (!n1 && n2)
2986 		return -1;
2987 	if (n1 && !n2)
2988 		return 1;
2989 	return 0;
2990 }
2991 
2992 void ceph_clear_crush_locs(struct rb_root *locs)
2993 {
2994 	while (!RB_EMPTY_ROOT(locs)) {
2995 		struct crush_loc_node *loc =
2996 		    rb_entry(rb_first(locs), struct crush_loc_node, cl_node);
2997 
2998 		erase_crush_loc(locs, loc);
2999 		free_crush_loc(loc);
3000 	}
3001 }
3002 
3003 /*
3004  * [a-zA-Z0-9-_.]+
3005  */
/*
 * [a-zA-Z0-9-_.]+
 *
 * Note: the empty string is NOT valid (the first, NUL, byte fails
 * the character test).
 */
static bool is_valid_crush_name(const char *name)
{
	for (;;) {
		char c = *name;

		if (!(('a' <= c && c <= 'z') ||
		      ('A' <= c && c <= 'Z') ||
		      ('0' <= c && c <= '9') ||
		      c == '-' || c == '_' || c == '.'))
			return false;

		if (*++name == '\0')
			return true;
	}
}
3018 
3019 /*
3020  * Gets the parent of an item.  Returns its id (<0 because the
3021  * parent is always a bucket), type id (>0 for the same reason,
3022  * via @parent_type_id) and location (via @parent_loc).  If no
3023  * parent, returns 0.
3024  *
3025  * Does a linear search, as there are no parent pointers of any
3026  * kind.  Note that the result is ambiguous for items that occur
3027  * multiple times in the map.
3028  */
static int get_immediate_parent(struct crush_map *c, int id,
				u16 *parent_type_id,
				struct crush_loc *parent_loc)
{
	struct crush_bucket *b;
	struct crush_name_node *type_cn, *cn;
	int i, j;

	/* scan every bucket for one that contains @id as an item */
	for (i = 0; i < c->max_buckets; i++) {
		b = c->buckets[i];
		if (!b)
			continue;

		/* ignore per-class shadow hierarchy */
		cn = lookup_crush_name(&c->names, b->id);
		if (!cn || !is_valid_crush_name(cn->cn_name))
			continue;

		for (j = 0; j < b->size; j++) {
			if (b->items[j] != id)
				continue;

			*parent_type_id = b->type;
			/*
			 * NOTE(review): assumes every bucket type id
			 * resolves in c->type_names - type_cn is
			 * dereferenced without a NULL check; confirm
			 * the map decode guarantees this.
			 */
			type_cn = lookup_crush_name(&c->type_names, b->type);
			parent_loc->cl_type_name = type_cn->cn_name;
			parent_loc->cl_name = cn->cn_name;
			return b->id;
		}
	}

	return 0;  /* no parent */
}
3061 
3062 /*
3063  * Calculates the locality/distance from an item to a client
3064  * location expressed in terms of CRUSH hierarchy as a set of
3065  * (bucket type name, bucket name) pairs.  Specifically, looks
3066  * for the lowest-valued bucket type for which the location of
3067  * @id matches one of the locations in @locs, so for standard
3068  * bucket types (host = 1, rack = 3, datacenter = 8, zone = 9)
3069  * a matching host is closer than a matching rack and a matching
3070  * data center is closer than a matching zone.
3071  *
3072  * Specifying multiple locations (a "multipath" location) such
3073  * as "rack=foo1 rack=foo2 datacenter=bar" is allowed -- @locs
3074  * is a multimap.  The locality will be:
3075  *
3076  * - 3 for OSDs in racks foo1 and foo2
3077  * - 8 for OSDs in data center bar
3078  * - -1 for all other OSDs
3079  *
3080  * The lowest possible bucket type is 1, so the best locality
3081  * for an OSD is 1 (i.e. a matching host).  Locality 0 would be
3082  * the OSD itself.
3083  */
int ceph_get_crush_locality(struct ceph_osdmap *osdmap, int id,
			    struct rb_root *locs)
{
	struct crush_loc loc;
	u16 type_id;

	/*
	 * Instead of repeated get_immediate_parent() calls,
	 * the location of @id could be obtained with a single
	 * depth-first traversal.
	 */
	for (;;) {
		/*
		 * Walk up one level.  Buckets have negative ids, so a
		 * non-negative return (0 == "no parent") means we ran
		 * off the top of the hierarchy without a match.
		 */
		id = get_immediate_parent(osdmap->crush, id, &type_id, &loc);
		if (id >= 0)
			return -1;  /* not local */

		if (lookup_crush_loc(locs, &loc))
			return type_id;
	}
}
3104