1 // SPDX-License-Identifier: GPL-2.0
2 
3 #include <linux/ceph/ceph_debug.h>
4 
5 #include <linux/module.h>
6 #include <linux/slab.h>
7 
8 #include <linux/ceph/libceph.h>
9 #include <linux/ceph/osdmap.h>
10 #include <linux/ceph/decode.h>
11 #include <linux/crush/hash.h>
12 #include <linux/crush/mapper.h>
13 
14 static __printf(2, 3)
15 void osdmap_info(const struct ceph_osdmap *map, const char *fmt, ...)
16 {
17 	struct va_format vaf;
18 	va_list args;
19 
20 	va_start(args, fmt);
21 	vaf.fmt = fmt;
22 	vaf.va = &args;
23 
24 	printk(KERN_INFO "%s (%pU e%u): %pV", KBUILD_MODNAME, &map->fsid,
25 	       map->epoch, &vaf);
26 
27 	va_end(args);
28 }
29 
30 char *ceph_osdmap_state_str(char *str, int len, u32 state)
31 {
32 	if (!len)
33 		return str;
34 
35 	if ((state & CEPH_OSD_EXISTS) && (state & CEPH_OSD_UP))
36 		snprintf(str, len, "exists, up");
37 	else if (state & CEPH_OSD_EXISTS)
38 		snprintf(str, len, "exists");
39 	else if (state & CEPH_OSD_UP)
40 		snprintf(str, len, "up");
41 	else
42 		snprintf(str, len, "doesn't exist");
43 
44 	return str;
45 }
46 
47 /* maps */
48 
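/*
 * Number of bits needed to represent t, i.e. the position of its
 * highest set bit: calc_bits_of(6) == 3, calc_bits_of(8) == 4,
 * calc_bits_of(0) == 0.
 */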
49 static int calc_bits_of(unsigned int t)
50 {
51 	int b = 0;
52 	while (t) {
53 		t = t >> 1;
54 		b++;
55 	}
56 	return b;
57 }
58 
/*
 * the foo_mask is the smallest value 2^n-1 that is >= foo-1, e.g.
 * pg_num = 12 and pg_num = 16 both give pg_num_mask = 15.
 */
62 static void calc_pg_masks(struct ceph_pg_pool_info *pi)
63 {
64 	pi->pg_num_mask = (1 << calc_bits_of(pi->pg_num-1)) - 1;
65 	pi->pgp_num_mask = (1 << calc_bits_of(pi->pgp_num-1)) - 1;
66 }
67 
68 /*
69  * decode crush map
70  */
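/*
 * All of the bucket decoders below follow the same pattern:
 * ceph_decode_need() verifies that enough bytes remain between *p and
 * end, then the ceph_decode_*() calls consume little-endian fields and
 * advance *p.  On short or bad input we return -EINVAL; any
 * allocations already attached to the bucket are freed by
 * crush_destroy() in the caller's error path.
 */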
71 static int crush_decode_uniform_bucket(void **p, void *end,
72 				       struct crush_bucket_uniform *b)
73 {
74 	dout("crush_decode_uniform_bucket %p to %p\n", *p, end);
75 	ceph_decode_need(p, end, (1+b->h.size) * sizeof(u32), bad);
76 	b->item_weight = ceph_decode_32(p);
77 	return 0;
78 bad:
79 	return -EINVAL;
80 }
81 
82 static int crush_decode_list_bucket(void **p, void *end,
83 				    struct crush_bucket_list *b)
84 {
85 	int j;
86 	dout("crush_decode_list_bucket %p to %p\n", *p, end);
87 	b->item_weights = kcalloc(b->h.size, sizeof(u32), GFP_NOFS);
88 	if (b->item_weights == NULL)
89 		return -ENOMEM;
90 	b->sum_weights = kcalloc(b->h.size, sizeof(u32), GFP_NOFS);
91 	if (b->sum_weights == NULL)
92 		return -ENOMEM;
93 	ceph_decode_need(p, end, 2 * b->h.size * sizeof(u32), bad);
94 	for (j = 0; j < b->h.size; j++) {
95 		b->item_weights[j] = ceph_decode_32(p);
96 		b->sum_weights[j] = ceph_decode_32(p);
97 	}
98 	return 0;
99 bad:
100 	return -EINVAL;
101 }
102 
103 static int crush_decode_tree_bucket(void **p, void *end,
104 				    struct crush_bucket_tree *b)
105 {
106 	int j;
107 	dout("crush_decode_tree_bucket %p to %p\n", *p, end);
108 	ceph_decode_8_safe(p, end, b->num_nodes, bad);
109 	b->node_weights = kcalloc(b->num_nodes, sizeof(u32), GFP_NOFS);
110 	if (b->node_weights == NULL)
111 		return -ENOMEM;
112 	ceph_decode_need(p, end, b->num_nodes * sizeof(u32), bad);
113 	for (j = 0; j < b->num_nodes; j++)
114 		b->node_weights[j] = ceph_decode_32(p);
115 	return 0;
116 bad:
117 	return -EINVAL;
118 }
119 
120 static int crush_decode_straw_bucket(void **p, void *end,
121 				     struct crush_bucket_straw *b)
122 {
123 	int j;
124 	dout("crush_decode_straw_bucket %p to %p\n", *p, end);
125 	b->item_weights = kcalloc(b->h.size, sizeof(u32), GFP_NOFS);
126 	if (b->item_weights == NULL)
127 		return -ENOMEM;
128 	b->straws = kcalloc(b->h.size, sizeof(u32), GFP_NOFS);
129 	if (b->straws == NULL)
130 		return -ENOMEM;
131 	ceph_decode_need(p, end, 2 * b->h.size * sizeof(u32), bad);
132 	for (j = 0; j < b->h.size; j++) {
133 		b->item_weights[j] = ceph_decode_32(p);
134 		b->straws[j] = ceph_decode_32(p);
135 	}
136 	return 0;
137 bad:
138 	return -EINVAL;
139 }
140 
141 static int crush_decode_straw2_bucket(void **p, void *end,
142 				      struct crush_bucket_straw2 *b)
143 {
144 	int j;
145 	dout("crush_decode_straw2_bucket %p to %p\n", *p, end);
146 	b->item_weights = kcalloc(b->h.size, sizeof(u32), GFP_NOFS);
147 	if (b->item_weights == NULL)
148 		return -ENOMEM;
149 	ceph_decode_need(p, end, b->h.size * sizeof(u32), bad);
150 	for (j = 0; j < b->h.size; j++)
151 		b->item_weights[j] = ceph_decode_32(p);
152 	return 0;
153 bad:
154 	return -EINVAL;
155 }
156 
157 struct crush_name_node {
158 	struct rb_node cn_node;
159 	int cn_id;
160 	char cn_name[];
161 };
162 
163 static struct crush_name_node *alloc_crush_name(size_t name_len)
164 {
165 	struct crush_name_node *cn;
166 
167 	cn = kmalloc(sizeof(*cn) + name_len + 1, GFP_NOIO);
168 	if (!cn)
169 		return NULL;
170 
171 	RB_CLEAR_NODE(&cn->cn_node);
172 	return cn;
173 }
174 
175 static void free_crush_name(struct crush_name_node *cn)
176 {
177 	WARN_ON(!RB_EMPTY_NODE(&cn->cn_node));
178 
179 	kfree(cn);
180 }
181 
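/*
 * DEFINE_RB_FUNCS() generates __insert_crush_name(),
 * insert_crush_name(), lookup_crush_name() and erase_crush_name(),
 * keyed on cn_id.
 */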
182 DEFINE_RB_FUNCS(crush_name, struct crush_name_node, cn_id, cn_node)
183 
184 static int decode_crush_names(void **p, void *end, struct rb_root *root)
185 {
186 	u32 n;
187 
188 	ceph_decode_32_safe(p, end, n, e_inval);
189 	while (n--) {
190 		struct crush_name_node *cn;
191 		int id;
192 		u32 name_len;
193 
194 		ceph_decode_32_safe(p, end, id, e_inval);
195 		ceph_decode_32_safe(p, end, name_len, e_inval);
196 		ceph_decode_need(p, end, name_len, e_inval);
197 
198 		cn = alloc_crush_name(name_len);
199 		if (!cn)
200 			return -ENOMEM;
201 
202 		cn->cn_id = id;
203 		memcpy(cn->cn_name, *p, name_len);
204 		cn->cn_name[name_len] = '\0';
205 		*p += name_len;
206 
207 		if (!__insert_crush_name(root, cn)) {
208 			free_crush_name(cn);
209 			return -EEXIST;
210 		}
211 	}
212 
213 	return 0;
214 
215 e_inval:
216 	return -EINVAL;
217 }
218 
219 void clear_crush_names(struct rb_root *root)
220 {
221 	while (!RB_EMPTY_ROOT(root)) {
222 		struct crush_name_node *cn =
223 		    rb_entry(rb_first(root), struct crush_name_node, cn_node);
224 
225 		erase_crush_name(root, cn);
226 		free_crush_name(cn);
227 	}
228 }
229 
230 static struct crush_choose_arg_map *alloc_choose_arg_map(void)
231 {
232 	struct crush_choose_arg_map *arg_map;
233 
234 	arg_map = kzalloc(sizeof(*arg_map), GFP_NOIO);
235 	if (!arg_map)
236 		return NULL;
237 
238 	RB_CLEAR_NODE(&arg_map->node);
239 	return arg_map;
240 }
241 
242 static void free_choose_arg_map(struct crush_choose_arg_map *arg_map)
243 {
244 	int i, j;
245 
246 	if (!arg_map)
247 		return;
248 
249 	WARN_ON(!RB_EMPTY_NODE(&arg_map->node));
250 
251 	if (arg_map->args) {
252 		for (i = 0; i < arg_map->size; i++) {
253 			struct crush_choose_arg *arg = &arg_map->args[i];
254 			if (arg->weight_set) {
255 				for (j = 0; j < arg->weight_set_size; j++)
256 					kfree(arg->weight_set[j].weights);
257 				kfree(arg->weight_set);
258 			}
259 			kfree(arg->ids);
260 		}
261 		kfree(arg_map->args);
262 	}
263 	kfree(arg_map);
264 }
265 
266 DEFINE_RB_FUNCS(choose_arg_map, struct crush_choose_arg_map, choose_args_index,
267 		node);
268 
269 void clear_choose_args(struct crush_map *c)
270 {
271 	while (!RB_EMPTY_ROOT(&c->choose_args)) {
272 		struct crush_choose_arg_map *arg_map =
273 		    rb_entry(rb_first(&c->choose_args),
274 			     struct crush_choose_arg_map, node);
275 
276 		erase_choose_arg_map(&c->choose_args, arg_map);
277 		free_choose_arg_map(arg_map);
278 	}
279 }
280 
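/*
 * Decode a length-prefixed array of u32s into a freshly allocated
 * buffer.  Returns the buffer (NULL for an empty array) with the
 * element count stored in *plen, or an ERR_PTR() on failure.
 */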
281 static u32 *decode_array_32_alloc(void **p, void *end, u32 *plen)
282 {
283 	u32 *a = NULL;
284 	u32 len;
285 	int ret;
286 
287 	ceph_decode_32_safe(p, end, len, e_inval);
288 	if (len) {
289 		u32 i;
290 
291 		a = kmalloc_array(len, sizeof(u32), GFP_NOIO);
292 		if (!a) {
293 			ret = -ENOMEM;
294 			goto fail;
295 		}
296 
297 		ceph_decode_need(p, end, len * sizeof(u32), e_inval);
298 		for (i = 0; i < len; i++)
299 			a[i] = ceph_decode_32(p);
300 	}
301 
302 	*plen = len;
303 	return a;
304 
305 e_inval:
306 	ret = -EINVAL;
307 fail:
308 	kfree(a);
309 	return ERR_PTR(ret);
310 }
311 
312 /*
313  * Assumes @arg is zero-initialized.
314  */
315 static int decode_choose_arg(void **p, void *end, struct crush_choose_arg *arg)
316 {
317 	int ret;
318 
319 	ceph_decode_32_safe(p, end, arg->weight_set_size, e_inval);
320 	if (arg->weight_set_size) {
321 		u32 i;
322 
323 		arg->weight_set = kmalloc_array(arg->weight_set_size,
324 						sizeof(*arg->weight_set),
325 						GFP_NOIO);
326 		if (!arg->weight_set)
327 			return -ENOMEM;
328 
329 		for (i = 0; i < arg->weight_set_size; i++) {
330 			struct crush_weight_set *w = &arg->weight_set[i];
331 
332 			w->weights = decode_array_32_alloc(p, end, &w->size);
333 			if (IS_ERR(w->weights)) {
334 				ret = PTR_ERR(w->weights);
335 				w->weights = NULL;
336 				return ret;
337 			}
338 		}
339 	}
340 
341 	arg->ids = decode_array_32_alloc(p, end, &arg->ids_size);
342 	if (IS_ERR(arg->ids)) {
343 		ret = PTR_ERR(arg->ids);
344 		arg->ids = NULL;
345 		return ret;
346 	}
347 
348 	return 0;
349 
350 e_inval:
351 	return -EINVAL;
352 }
353 
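/*
 * choose_args on the wire is a map of choose_args_index to a map of
 * bucket index to crush_choose_arg (an optional replacement weight
 * set plus optional item ids).  Sparse entries are fine: args is
 * allocated for all max_buckets and untouched slots stay zeroed.
 */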
354 static int decode_choose_args(void **p, void *end, struct crush_map *c)
355 {
356 	struct crush_choose_arg_map *arg_map = NULL;
357 	u32 num_choose_arg_maps, num_buckets;
358 	int ret;
359 
360 	ceph_decode_32_safe(p, end, num_choose_arg_maps, e_inval);
361 	while (num_choose_arg_maps--) {
362 		arg_map = alloc_choose_arg_map();
363 		if (!arg_map) {
364 			ret = -ENOMEM;
365 			goto fail;
366 		}
367 
368 		ceph_decode_64_safe(p, end, arg_map->choose_args_index,
369 				    e_inval);
370 		arg_map->size = c->max_buckets;
371 		arg_map->args = kcalloc(arg_map->size, sizeof(*arg_map->args),
372 					GFP_NOIO);
373 		if (!arg_map->args) {
374 			ret = -ENOMEM;
375 			goto fail;
376 		}
377 
378 		ceph_decode_32_safe(p, end, num_buckets, e_inval);
379 		while (num_buckets--) {
380 			struct crush_choose_arg *arg;
381 			u32 bucket_index;
382 
383 			ceph_decode_32_safe(p, end, bucket_index, e_inval);
384 			if (bucket_index >= arg_map->size)
385 				goto e_inval;
386 
387 			arg = &arg_map->args[bucket_index];
388 			ret = decode_choose_arg(p, end, arg);
389 			if (ret)
390 				goto fail;
391 
392 			if (arg->ids_size &&
393 			    arg->ids_size != c->buckets[bucket_index]->size)
394 				goto e_inval;
395 		}
396 
397 		insert_choose_arg_map(&c->choose_args, arg_map);
398 	}
399 
400 	return 0;
401 
402 e_inval:
403 	ret = -EINVAL;
404 fail:
405 	free_choose_arg_map(arg_map);
406 	return ret;
407 }
408 
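/*
 * Precompute the size of the scratch space that a single mapping
 * computation needs: one pointer per bucket, one struct
 * crush_work_bucket per bucket, plus each bucket's permutation array
 * of __u32s (see alloc_workspace() below).
 */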
409 static void crush_finalize(struct crush_map *c)
410 {
411 	__s32 b;
412 
413 	/* Space for the array of pointers to per-bucket workspace */
414 	c->working_size = sizeof(struct crush_work) +
415 	    c->max_buckets * sizeof(struct crush_work_bucket *);
416 
417 	for (b = 0; b < c->max_buckets; b++) {
418 		if (!c->buckets[b])
419 			continue;
420 
421 		switch (c->buckets[b]->alg) {
422 		default:
423 			/*
424 			 * The base case, permutation variables and
425 			 * the pointer to the permutation array.
426 			 */
427 			c->working_size += sizeof(struct crush_work_bucket);
428 			break;
429 		}
430 		/* Every bucket has a permutation array. */
431 		c->working_size += c->buckets[b]->size * sizeof(__u32);
432 	}
433 }
434 
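/*
 * Decode a crush_map.  On any failure the partially constructed map
 * is torn down with crush_destroy(), so callers see either a fully
 * decoded map or an ERR_PTR().
 */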
435 static struct crush_map *crush_decode(void *pbyval, void *end)
436 {
437 	struct crush_map *c;
438 	int err;
439 	int i, j;
440 	void **p = &pbyval;
441 	void *start = pbyval;
442 	u32 magic;
443 
444 	dout("crush_decode %p to %p len %d\n", *p, end, (int)(end - *p));
445 
446 	c = kzalloc(sizeof(*c), GFP_NOFS);
447 	if (c == NULL)
448 		return ERR_PTR(-ENOMEM);
449 
450 	c->type_names = RB_ROOT;
451 	c->names = RB_ROOT;
452 	c->choose_args = RB_ROOT;
453 
	/* set tunables to default values */
	c->choose_local_tries = 2;
	c->choose_local_fallback_tries = 5;
	c->choose_total_tries = 19;
458 	c->chooseleaf_descend_once = 0;
459 
460 	ceph_decode_need(p, end, 4*sizeof(u32), bad);
461 	magic = ceph_decode_32(p);
462 	if (magic != CRUSH_MAGIC) {
463 		pr_err("crush_decode magic %x != current %x\n",
464 		       (unsigned int)magic, (unsigned int)CRUSH_MAGIC);
465 		goto bad;
466 	}
467 	c->max_buckets = ceph_decode_32(p);
468 	c->max_rules = ceph_decode_32(p);
469 	c->max_devices = ceph_decode_32(p);
470 
471 	c->buckets = kcalloc(c->max_buckets, sizeof(*c->buckets), GFP_NOFS);
472 	if (c->buckets == NULL)
473 		goto badmem;
474 	c->rules = kcalloc(c->max_rules, sizeof(*c->rules), GFP_NOFS);
475 	if (c->rules == NULL)
476 		goto badmem;
477 
478 	/* buckets */
479 	for (i = 0; i < c->max_buckets; i++) {
480 		int size = 0;
481 		u32 alg;
482 		struct crush_bucket *b;
483 
484 		ceph_decode_32_safe(p, end, alg, bad);
485 		if (alg == 0) {
486 			c->buckets[i] = NULL;
487 			continue;
488 		}
489 		dout("crush_decode bucket %d off %x %p to %p\n",
490 		     i, (int)(*p-start), *p, end);
491 
492 		switch (alg) {
493 		case CRUSH_BUCKET_UNIFORM:
494 			size = sizeof(struct crush_bucket_uniform);
495 			break;
496 		case CRUSH_BUCKET_LIST:
497 			size = sizeof(struct crush_bucket_list);
498 			break;
499 		case CRUSH_BUCKET_TREE:
500 			size = sizeof(struct crush_bucket_tree);
501 			break;
502 		case CRUSH_BUCKET_STRAW:
503 			size = sizeof(struct crush_bucket_straw);
504 			break;
505 		case CRUSH_BUCKET_STRAW2:
506 			size = sizeof(struct crush_bucket_straw2);
507 			break;
508 		default:
509 			goto bad;
510 		}
511 		BUG_ON(size == 0);
512 		b = c->buckets[i] = kzalloc(size, GFP_NOFS);
513 		if (b == NULL)
514 			goto badmem;
515 
516 		ceph_decode_need(p, end, 4*sizeof(u32), bad);
517 		b->id = ceph_decode_32(p);
518 		b->type = ceph_decode_16(p);
519 		b->alg = ceph_decode_8(p);
520 		b->hash = ceph_decode_8(p);
521 		b->weight = ceph_decode_32(p);
522 		b->size = ceph_decode_32(p);
523 
524 		dout("crush_decode bucket size %d off %x %p to %p\n",
525 		     b->size, (int)(*p-start), *p, end);
526 
527 		b->items = kcalloc(b->size, sizeof(__s32), GFP_NOFS);
528 		if (b->items == NULL)
529 			goto badmem;
530 
531 		ceph_decode_need(p, end, b->size*sizeof(u32), bad);
532 		for (j = 0; j < b->size; j++)
533 			b->items[j] = ceph_decode_32(p);
534 
535 		switch (b->alg) {
536 		case CRUSH_BUCKET_UNIFORM:
537 			err = crush_decode_uniform_bucket(p, end,
538 				  (struct crush_bucket_uniform *)b);
539 			if (err < 0)
540 				goto fail;
541 			break;
542 		case CRUSH_BUCKET_LIST:
543 			err = crush_decode_list_bucket(p, end,
544 			       (struct crush_bucket_list *)b);
545 			if (err < 0)
546 				goto fail;
547 			break;
548 		case CRUSH_BUCKET_TREE:
549 			err = crush_decode_tree_bucket(p, end,
550 				(struct crush_bucket_tree *)b);
551 			if (err < 0)
552 				goto fail;
553 			break;
554 		case CRUSH_BUCKET_STRAW:
555 			err = crush_decode_straw_bucket(p, end,
556 				(struct crush_bucket_straw *)b);
557 			if (err < 0)
558 				goto fail;
559 			break;
560 		case CRUSH_BUCKET_STRAW2:
561 			err = crush_decode_straw2_bucket(p, end,
562 				(struct crush_bucket_straw2 *)b);
563 			if (err < 0)
564 				goto fail;
565 			break;
566 		}
567 	}
568 
569 	/* rules */
570 	dout("rule vec is %p\n", c->rules);
571 	for (i = 0; i < c->max_rules; i++) {
572 		u32 yes;
573 		struct crush_rule *r;
574 
575 		ceph_decode_32_safe(p, end, yes, bad);
576 		if (!yes) {
577 			dout("crush_decode NO rule %d off %x %p to %p\n",
578 			     i, (int)(*p-start), *p, end);
579 			c->rules[i] = NULL;
580 			continue;
581 		}
582 
583 		dout("crush_decode rule %d off %x %p to %p\n",
584 		     i, (int)(*p-start), *p, end);
585 
586 		/* len */
587 		ceph_decode_32_safe(p, end, yes, bad);
588 #if BITS_PER_LONG == 32
589 		if (yes > (ULONG_MAX - sizeof(*r))
590 			  / sizeof(struct crush_rule_step))
591 			goto bad;
592 #endif
593 		r = kmalloc(struct_size(r, steps, yes), GFP_NOFS);
594 		if (r == NULL)
595 			goto badmem;
596 		dout(" rule %d is at %p\n", i, r);
597 		c->rules[i] = r;
598 		r->len = yes;
599 		ceph_decode_copy_safe(p, end, &r->mask, 4, bad); /* 4 u8's */
600 		ceph_decode_need(p, end, r->len*3*sizeof(u32), bad);
601 		for (j = 0; j < r->len; j++) {
602 			r->steps[j].op = ceph_decode_32(p);
603 			r->steps[j].arg1 = ceph_decode_32(p);
604 			r->steps[j].arg2 = ceph_decode_32(p);
605 		}
606 	}
607 
608 	err = decode_crush_names(p, end, &c->type_names);
609 	if (err)
610 		goto fail;
611 
612 	err = decode_crush_names(p, end, &c->names);
613 	if (err)
614 		goto fail;
615 
616 	ceph_decode_skip_map(p, end, 32, string, bad); /* rule_name_map */
617 
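	/*
	 * Tunables were appended to the encoding over time, so an
	 * older map may legitimately end here.  The ceph_decode_need()
	 * calls below therefore bail to the "done" label (success)
	 * rather than "bad".
	 */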
	/* tunables */
	ceph_decode_need(p, end, 3*sizeof(u32), done);
	c->choose_local_tries = ceph_decode_32(p);
	c->choose_local_fallback_tries = ceph_decode_32(p);
	c->choose_total_tries = ceph_decode_32(p);
	dout("crush decode tunable choose_local_tries = %d\n",
	     c->choose_local_tries);
	dout("crush decode tunable choose_local_fallback_tries = %d\n",
	     c->choose_local_fallback_tries);
	dout("crush decode tunable choose_total_tries = %d\n",
	     c->choose_total_tries);
629 
630 	ceph_decode_need(p, end, sizeof(u32), done);
631 	c->chooseleaf_descend_once = ceph_decode_32(p);
632 	dout("crush decode tunable chooseleaf_descend_once = %d\n",
633 	     c->chooseleaf_descend_once);
634 
635 	ceph_decode_need(p, end, sizeof(u8), done);
636 	c->chooseleaf_vary_r = ceph_decode_8(p);
637 	dout("crush decode tunable chooseleaf_vary_r = %d\n",
638 	     c->chooseleaf_vary_r);
639 
640 	/* skip straw_calc_version, allowed_bucket_algs */
641 	ceph_decode_need(p, end, sizeof(u8) + sizeof(u32), done);
642 	*p += sizeof(u8) + sizeof(u32);
643 
644 	ceph_decode_need(p, end, sizeof(u8), done);
645 	c->chooseleaf_stable = ceph_decode_8(p);
646 	dout("crush decode tunable chooseleaf_stable = %d\n",
647 	     c->chooseleaf_stable);
648 
649 	if (*p != end) {
650 		/* class_map */
651 		ceph_decode_skip_map(p, end, 32, 32, bad);
652 		/* class_name */
653 		ceph_decode_skip_map(p, end, 32, string, bad);
654 		/* class_bucket */
655 		ceph_decode_skip_map_of_map(p, end, 32, 32, 32, bad);
656 	}
657 
658 	if (*p != end) {
659 		err = decode_choose_args(p, end, c);
660 		if (err)
661 			goto fail;
662 	}
663 
664 done:
665 	crush_finalize(c);
666 	dout("crush_decode success\n");
667 	return c;
668 
669 badmem:
670 	err = -ENOMEM;
671 fail:
672 	dout("crush_decode fail %d\n", err);
673 	crush_destroy(c);
674 	return ERR_PTR(err);
675 
676 bad:
677 	err = -EINVAL;
678 	goto fail;
679 }
680 
681 int ceph_pg_compare(const struct ceph_pg *lhs, const struct ceph_pg *rhs)
682 {
683 	if (lhs->pool < rhs->pool)
684 		return -1;
685 	if (lhs->pool > rhs->pool)
686 		return 1;
687 	if (lhs->seed < rhs->seed)
688 		return -1;
689 	if (lhs->seed > rhs->seed)
690 		return 1;
691 
692 	return 0;
693 }
694 
695 int ceph_spg_compare(const struct ceph_spg *lhs, const struct ceph_spg *rhs)
696 {
697 	int ret;
698 
699 	ret = ceph_pg_compare(&lhs->pgid, &rhs->pgid);
700 	if (ret)
701 		return ret;
702 
703 	if (lhs->shard < rhs->shard)
704 		return -1;
705 	if (lhs->shard > rhs->shard)
706 		return 1;
707 
708 	return 0;
709 }
710 
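/*
 * struct ceph_pg_mapping ends in a union of flexible members
 * (pg_temp, primary_temp, pg_upmap, pg_upmap_items), so the caller
 * sizes the trailing payload for whichever member it will use.
 */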
711 static struct ceph_pg_mapping *alloc_pg_mapping(size_t payload_len)
712 {
713 	struct ceph_pg_mapping *pg;
714 
715 	pg = kmalloc(sizeof(*pg) + payload_len, GFP_NOIO);
716 	if (!pg)
717 		return NULL;
718 
719 	RB_CLEAR_NODE(&pg->node);
720 	return pg;
721 }
722 
723 static void free_pg_mapping(struct ceph_pg_mapping *pg)
724 {
725 	WARN_ON(!RB_EMPTY_NODE(&pg->node));
726 
727 	kfree(pg);
728 }
729 
730 /*
731  * rbtree of pg_mapping for handling pg_temp (explicit mapping of pgid
732  * to a set of osds) and primary_temp (explicit primary setting)
733  */
734 DEFINE_RB_FUNCS2(pg_mapping, struct ceph_pg_mapping, pgid, ceph_pg_compare,
735 		 RB_BYPTR, const struct ceph_pg *, node)
736 
737 /*
738  * rbtree of pg pool info
739  */
740 DEFINE_RB_FUNCS(pg_pool, struct ceph_pg_pool_info, id, node)
741 
742 struct ceph_pg_pool_info *ceph_pg_pool_by_id(struct ceph_osdmap *map, u64 id)
743 {
744 	return lookup_pg_pool(&map->pg_pools, id);
745 }
746 
747 const char *ceph_pg_pool_name_by_id(struct ceph_osdmap *map, u64 id)
748 {
749 	struct ceph_pg_pool_info *pi;
750 
751 	if (id == CEPH_NOPOOL)
752 		return NULL;
753 
754 	if (WARN_ON_ONCE(id > (u64) INT_MAX))
755 		return NULL;
756 
757 	pi = lookup_pg_pool(&map->pg_pools, id);
758 	return pi ? pi->name : NULL;
759 }
760 EXPORT_SYMBOL(ceph_pg_pool_name_by_id);
761 
762 int ceph_pg_poolid_by_name(struct ceph_osdmap *map, const char *name)
763 {
764 	struct rb_node *rbp;
765 
766 	for (rbp = rb_first(&map->pg_pools); rbp; rbp = rb_next(rbp)) {
767 		struct ceph_pg_pool_info *pi =
768 			rb_entry(rbp, struct ceph_pg_pool_info, node);
769 		if (pi->name && strcmp(pi->name, name) == 0)
770 			return pi->id;
771 	}
772 	return -ENOENT;
773 }
774 EXPORT_SYMBOL(ceph_pg_poolid_by_name);
775 
776 u64 ceph_pg_pool_flags(struct ceph_osdmap *map, u64 id)
777 {
778 	struct ceph_pg_pool_info *pi;
779 
780 	pi = lookup_pg_pool(&map->pg_pools, id);
781 	return pi ? pi->flags : 0;
782 }
783 EXPORT_SYMBOL(ceph_pg_pool_flags);
784 
785 static void __remove_pg_pool(struct rb_root *root, struct ceph_pg_pool_info *pi)
786 {
787 	erase_pg_pool(root, pi);
788 	kfree(pi->name);
789 	kfree(pi);
790 }
791 
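/*
 * Decode one ceph_pg_pool_info.  The encoding is versioned (ev);
 * fields the kernel client doesn't use are skipped, and anything past
 * what we understand is ignored by jumping to pool_end.
 */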
792 static int decode_pool(void **p, void *end, struct ceph_pg_pool_info *pi)
793 {
794 	u8 ev, cv;
795 	unsigned len, num;
796 	void *pool_end;
797 
798 	ceph_decode_need(p, end, 2 + 4, bad);
799 	ev = ceph_decode_8(p);  /* encoding version */
800 	cv = ceph_decode_8(p); /* compat version */
801 	if (ev < 5) {
802 		pr_warn("got v %d < 5 cv %d of ceph_pg_pool\n", ev, cv);
803 		return -EINVAL;
804 	}
805 	if (cv > 9) {
806 		pr_warn("got v %d cv %d > 9 of ceph_pg_pool\n", ev, cv);
807 		return -EINVAL;
808 	}
809 	len = ceph_decode_32(p);
810 	ceph_decode_need(p, end, len, bad);
811 	pool_end = *p + len;
812 
813 	ceph_decode_need(p, end, 4 + 4 + 4, bad);
814 	pi->type = ceph_decode_8(p);
815 	pi->size = ceph_decode_8(p);
816 	pi->crush_ruleset = ceph_decode_8(p);
817 	pi->object_hash = ceph_decode_8(p);
818 	pi->pg_num = ceph_decode_32(p);
819 	pi->pgp_num = ceph_decode_32(p);
820 
821 	/* lpg*, last_change, snap_seq, snap_epoch */
822 	ceph_decode_skip_n(p, end, 8 + 4 + 8 + 4, bad);
823 
824 	/* skip snaps */
825 	ceph_decode_32_safe(p, end, num, bad);
826 	while (num--) {
827 		/* snapid key, pool snap (with versions) */
828 		ceph_decode_skip_n(p, end, 8 + 2, bad);
829 		ceph_decode_skip_string(p, end, bad);
830 	}
831 
832 	/* removed_snaps */
833 	ceph_decode_skip_map(p, end, 64, 64, bad);
834 
835 	ceph_decode_need(p, end, 8 + 8 + 4, bad);
836 	*p += 8;  /* skip auid */
837 	pi->flags = ceph_decode_64(p);
838 	*p += 4;  /* skip crash_replay_interval */
839 
840 	if (ev >= 7)
841 		ceph_decode_8_safe(p, end, pi->min_size, bad);
842 	else
843 		pi->min_size = pi->size - pi->size / 2;
844 
845 	if (ev >= 8)
846 		/* quota_max_* */
847 		ceph_decode_skip_n(p, end, 8 + 8, bad);
848 
849 	if (ev >= 9) {
850 		/* tiers */
851 		ceph_decode_skip_set(p, end, 64, bad);
852 
853 		ceph_decode_need(p, end, 8 + 1 + 8 + 8, bad);
854 		*p += 8;  /* skip tier_of */
855 		*p += 1;  /* skip cache_mode */
856 		pi->read_tier = ceph_decode_64(p);
857 		pi->write_tier = ceph_decode_64(p);
858 	} else {
859 		pi->read_tier = -1;
860 		pi->write_tier = -1;
861 	}
862 
863 	if (ev >= 10)
864 		/* properties */
865 		ceph_decode_skip_map(p, end, string, string, bad);
866 
867 	if (ev >= 11) {
868 		/* hit_set_params (with versions) */
869 		ceph_decode_skip_n(p, end, 2, bad);
870 		ceph_decode_skip_string(p, end, bad);
871 
872 		/* hit_set_period, hit_set_count */
873 		ceph_decode_skip_n(p, end, 4 + 4, bad);
874 	}
875 
876 	if (ev >= 12)
877 		/* stripe_width */
878 		ceph_decode_skip_32(p, end, bad);
879 
880 	if (ev >= 13)
881 		/* target_max_*, cache_target_*, cache_min_* */
882 		ceph_decode_skip_n(p, end, 16 + 8 + 8, bad);
883 
884 	if (ev >= 14)
885 		/* erasure_code_profile */
886 		ceph_decode_skip_string(p, end, bad);
887 
888 	/*
889 	 * last_force_op_resend_preluminous, will be overridden if the
890 	 * map was encoded with RESEND_ON_SPLIT
891 	 */
892 	if (ev >= 15)
893 		ceph_decode_32_safe(p, end, pi->last_force_request_resend, bad);
894 	else
895 		pi->last_force_request_resend = 0;
896 
897 	if (ev >= 16)
898 		/* min_read_recency_for_promote */
899 		ceph_decode_skip_32(p, end, bad);
900 
901 	if (ev >= 17)
902 		/* expected_num_objects */
903 		ceph_decode_skip_64(p, end, bad);
904 
905 	if (ev >= 19)
906 		/* cache_target_dirty_high_ratio_micro */
907 		ceph_decode_skip_32(p, end, bad);
908 
909 	if (ev >= 20)
910 		/* min_write_recency_for_promote */
911 		ceph_decode_skip_32(p, end, bad);
912 
913 	if (ev >= 21)
914 		/* use_gmt_hitset */
915 		ceph_decode_skip_8(p, end, bad);
916 
917 	if (ev >= 22)
918 		/* fast_read */
919 		ceph_decode_skip_8(p, end, bad);
920 
921 	if (ev >= 23)
922 		/* hit_set_grade_decay_rate, hit_set_search_last_n */
923 		ceph_decode_skip_n(p, end, 4 + 4, bad);
924 
925 	if (ev >= 24) {
926 		/* opts (with versions) */
927 		ceph_decode_skip_n(p, end, 2, bad);
928 		ceph_decode_skip_string(p, end, bad);
929 	}
930 
931 	if (ev >= 25)
932 		ceph_decode_32_safe(p, end, pi->last_force_request_resend, bad);
933 
934 	/* ignore the rest */
935 
936 	*p = pool_end;
937 	calc_pg_masks(pi);
938 	return 0;
939 
940 bad:
941 	return -EINVAL;
942 }
943 
944 static int decode_pool_names(void **p, void *end, struct ceph_osdmap *map)
945 {
946 	struct ceph_pg_pool_info *pi;
947 	u32 num, len;
948 	u64 pool;
949 
950 	ceph_decode_32_safe(p, end, num, bad);
951 	dout(" %d pool names\n", num);
952 	while (num--) {
953 		ceph_decode_64_safe(p, end, pool, bad);
954 		ceph_decode_32_safe(p, end, len, bad);
955 		dout("  pool %llu len %d\n", pool, len);
956 		ceph_decode_need(p, end, len, bad);
957 		pi = lookup_pg_pool(&map->pg_pools, pool);
958 		if (pi) {
959 			char *name = kstrndup(*p, len, GFP_NOFS);
960 
961 			if (!name)
962 				return -ENOMEM;
963 			kfree(pi->name);
964 			pi->name = name;
965 			dout("  name is %s\n", pi->name);
966 		}
967 		*p += len;
968 	}
969 	return 0;
970 
971 bad:
972 	return -EINVAL;
973 }
974 
975 /*
976  * CRUSH workspaces
977  *
978  * workspace_manager framework borrowed from fs/btrfs/compression.c.
979  * Two simplifications: there is only one type of workspace and there
980  * is always at least one workspace.
981  */
982 static struct crush_work *alloc_workspace(const struct crush_map *c)
983 {
984 	struct crush_work *work;
985 	size_t work_size;
986 
987 	WARN_ON(!c->working_size);
988 	work_size = crush_work_size(c, CEPH_PG_MAX_SIZE);
989 	dout("%s work_size %zu bytes\n", __func__, work_size);
990 
991 	work = kvmalloc(work_size, GFP_NOIO);
992 	if (!work)
993 		return NULL;
994 
995 	INIT_LIST_HEAD(&work->item);
996 	crush_init_workspace(c, work);
997 	return work;
998 }
999 
1000 static void free_workspace(struct crush_work *work)
1001 {
1002 	WARN_ON(!list_empty(&work->item));
1003 	kvfree(work);
1004 }
1005 
1006 static void init_workspace_manager(struct workspace_manager *wsm)
1007 {
1008 	INIT_LIST_HEAD(&wsm->idle_ws);
1009 	spin_lock_init(&wsm->ws_lock);
1010 	atomic_set(&wsm->total_ws, 0);
1011 	wsm->free_ws = 0;
1012 	init_waitqueue_head(&wsm->ws_wait);
1013 }
1014 
1015 static void add_initial_workspace(struct workspace_manager *wsm,
1016 				  struct crush_work *work)
1017 {
1018 	WARN_ON(!list_empty(&wsm->idle_ws));
1019 
1020 	list_add(&work->item, &wsm->idle_ws);
1021 	atomic_set(&wsm->total_ws, 1);
1022 	wsm->free_ws = 1;
1023 }
1024 
1025 static void cleanup_workspace_manager(struct workspace_manager *wsm)
1026 {
1027 	struct crush_work *work;
1028 
1029 	while (!list_empty(&wsm->idle_ws)) {
1030 		work = list_first_entry(&wsm->idle_ws, struct crush_work,
1031 					item);
1032 		list_del_init(&work->item);
1033 		free_workspace(work);
1034 	}
1035 	atomic_set(&wsm->total_ws, 0);
1036 	wsm->free_ws = 0;
1037 }
1038 
1039 /*
1040  * Finds an available workspace or allocates a new one.  If it's not
1041  * possible to allocate a new one, waits until there is one.
1042  */
1043 static struct crush_work *get_workspace(struct workspace_manager *wsm,
1044 					const struct crush_map *c)
1045 {
1046 	struct crush_work *work;
1047 	int cpus = num_online_cpus();
1048 
1049 again:
1050 	spin_lock(&wsm->ws_lock);
1051 	if (!list_empty(&wsm->idle_ws)) {
1052 		work = list_first_entry(&wsm->idle_ws, struct crush_work,
1053 					item);
1054 		list_del_init(&work->item);
1055 		wsm->free_ws--;
1056 		spin_unlock(&wsm->ws_lock);
1057 		return work;
1058 
1059 	}
1060 	if (atomic_read(&wsm->total_ws) > cpus) {
1061 		DEFINE_WAIT(wait);
1062 
1063 		spin_unlock(&wsm->ws_lock);
1064 		prepare_to_wait(&wsm->ws_wait, &wait, TASK_UNINTERRUPTIBLE);
1065 		if (atomic_read(&wsm->total_ws) > cpus && !wsm->free_ws)
1066 			schedule();
1067 		finish_wait(&wsm->ws_wait, &wait);
1068 		goto again;
1069 	}
1070 	atomic_inc(&wsm->total_ws);
1071 	spin_unlock(&wsm->ws_lock);
1072 
1073 	work = alloc_workspace(c);
1074 	if (!work) {
1075 		atomic_dec(&wsm->total_ws);
1076 		wake_up(&wsm->ws_wait);
1077 
1078 		/*
1079 		 * Do not return the error but go back to waiting.  We
1080 		 * have the initial workspace and the CRUSH computation
1081 		 * time is bounded so we will get it eventually.
1082 		 */
1083 		WARN_ON(atomic_read(&wsm->total_ws) < 1);
1084 		goto again;
1085 	}
1086 	return work;
1087 }
1088 
1089 /*
1090  * Puts a workspace back on the list or frees it if we have enough
1091  * idle ones sitting around.
1092  */
1093 static void put_workspace(struct workspace_manager *wsm,
1094 			  struct crush_work *work)
1095 {
1096 	spin_lock(&wsm->ws_lock);
1097 	if (wsm->free_ws <= num_online_cpus()) {
1098 		list_add(&work->item, &wsm->idle_ws);
1099 		wsm->free_ws++;
1100 		spin_unlock(&wsm->ws_lock);
1101 		goto wake;
1102 	}
1103 	spin_unlock(&wsm->ws_lock);
1104 
1105 	free_workspace(work);
1106 	atomic_dec(&wsm->total_ws);
1107 wake:
1108 	if (wq_has_sleeper(&wsm->ws_wait))
1109 		wake_up(&wsm->ws_wait);
1110 }
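
/*
 * Typical caller pattern, as a sketch (the actual user is the CRUSH
 * mapping code further down in this file):
 *
 *	work = get_workspace(&map->crush_wsm, map->crush);
 *	... crush_do_rule(map->crush, ..., work, ...) ...
 *	put_workspace(&map->crush_wsm, work);
 */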
1111 
1112 /*
1113  * osd map
1114  */
1115 struct ceph_osdmap *ceph_osdmap_alloc(void)
1116 {
1117 	struct ceph_osdmap *map;
1118 
1119 	map = kzalloc(sizeof(*map), GFP_NOIO);
1120 	if (!map)
1121 		return NULL;
1122 
1123 	map->pg_pools = RB_ROOT;
1124 	map->pool_max = -1;
1125 	map->pg_temp = RB_ROOT;
1126 	map->primary_temp = RB_ROOT;
1127 	map->pg_upmap = RB_ROOT;
1128 	map->pg_upmap_items = RB_ROOT;
1129 
1130 	init_workspace_manager(&map->crush_wsm);
1131 
1132 	return map;
1133 }
1134 
1135 void ceph_osdmap_destroy(struct ceph_osdmap *map)
1136 {
1137 	dout("osdmap_destroy %p\n", map);
1138 
1139 	if (map->crush)
1140 		crush_destroy(map->crush);
1141 	cleanup_workspace_manager(&map->crush_wsm);
1142 
1143 	while (!RB_EMPTY_ROOT(&map->pg_temp)) {
1144 		struct ceph_pg_mapping *pg =
1145 			rb_entry(rb_first(&map->pg_temp),
1146 				 struct ceph_pg_mapping, node);
1147 		erase_pg_mapping(&map->pg_temp, pg);
1148 		free_pg_mapping(pg);
1149 	}
1150 	while (!RB_EMPTY_ROOT(&map->primary_temp)) {
1151 		struct ceph_pg_mapping *pg =
1152 			rb_entry(rb_first(&map->primary_temp),
1153 				 struct ceph_pg_mapping, node);
1154 		erase_pg_mapping(&map->primary_temp, pg);
1155 		free_pg_mapping(pg);
1156 	}
1157 	while (!RB_EMPTY_ROOT(&map->pg_upmap)) {
1158 		struct ceph_pg_mapping *pg =
1159 			rb_entry(rb_first(&map->pg_upmap),
1160 				 struct ceph_pg_mapping, node);
1161 		rb_erase(&pg->node, &map->pg_upmap);
1162 		kfree(pg);
1163 	}
1164 	while (!RB_EMPTY_ROOT(&map->pg_upmap_items)) {
1165 		struct ceph_pg_mapping *pg =
1166 			rb_entry(rb_first(&map->pg_upmap_items),
1167 				 struct ceph_pg_mapping, node);
1168 		rb_erase(&pg->node, &map->pg_upmap_items);
1169 		kfree(pg);
1170 	}
1171 	while (!RB_EMPTY_ROOT(&map->pg_pools)) {
1172 		struct ceph_pg_pool_info *pi =
1173 			rb_entry(rb_first(&map->pg_pools),
1174 				 struct ceph_pg_pool_info, node);
1175 		__remove_pg_pool(&map->pg_pools, pi);
1176 	}
1177 	kvfree(map->osd_state);
1178 	kvfree(map->osd_weight);
1179 	kvfree(map->osd_addr);
1180 	kvfree(map->osd_primary_affinity);
1181 	kfree(map);
1182 }
1183 
1184 /*
1185  * Adjust max_osd value, (re)allocate arrays.
1186  *
1187  * The new elements are properly initialized.
1188  */
1189 static int osdmap_set_max_osd(struct ceph_osdmap *map, u32 max)
1190 {
1191 	u32 *state;
1192 	u32 *weight;
1193 	struct ceph_entity_addr *addr;
1194 	u32 to_copy;
1195 	int i;
1196 
1197 	dout("%s old %u new %u\n", __func__, map->max_osd, max);
1198 	if (max == map->max_osd)
1199 		return 0;
1200 
1201 	state = kvmalloc(array_size(max, sizeof(*state)), GFP_NOFS);
1202 	weight = kvmalloc(array_size(max, sizeof(*weight)), GFP_NOFS);
1203 	addr = kvmalloc(array_size(max, sizeof(*addr)), GFP_NOFS);
1204 	if (!state || !weight || !addr) {
1205 		kvfree(state);
1206 		kvfree(weight);
1207 		kvfree(addr);
1208 		return -ENOMEM;
1209 	}
1210 
1211 	to_copy = min(map->max_osd, max);
1212 	if (map->osd_state) {
1213 		memcpy(state, map->osd_state, to_copy * sizeof(*state));
1214 		memcpy(weight, map->osd_weight, to_copy * sizeof(*weight));
1215 		memcpy(addr, map->osd_addr, to_copy * sizeof(*addr));
1216 		kvfree(map->osd_state);
1217 		kvfree(map->osd_weight);
1218 		kvfree(map->osd_addr);
1219 	}
1220 
1221 	map->osd_state = state;
1222 	map->osd_weight = weight;
1223 	map->osd_addr = addr;
1224 	for (i = map->max_osd; i < max; i++) {
1225 		map->osd_state[i] = 0;
1226 		map->osd_weight[i] = CEPH_OSD_OUT;
1227 		memset(map->osd_addr + i, 0, sizeof(*map->osd_addr));
1228 	}
1229 
1230 	if (map->osd_primary_affinity) {
1231 		u32 *affinity;
1232 
1233 		affinity = kvmalloc(array_size(max, sizeof(*affinity)),
1234 					 GFP_NOFS);
1235 		if (!affinity)
1236 			return -ENOMEM;
1237 
1238 		memcpy(affinity, map->osd_primary_affinity,
1239 		       to_copy * sizeof(*affinity));
1240 		kvfree(map->osd_primary_affinity);
1241 
1242 		map->osd_primary_affinity = affinity;
1243 		for (i = map->max_osd; i < max; i++)
1244 			map->osd_primary_affinity[i] =
1245 			    CEPH_OSD_DEFAULT_PRIMARY_AFFINITY;
1246 	}
1247 
1248 	map->max_osd = max;
1249 
1250 	return 0;
1251 }
1252 
1253 static int osdmap_set_crush(struct ceph_osdmap *map, struct crush_map *crush)
1254 {
1255 	struct crush_work *work;
1256 
1257 	if (IS_ERR(crush))
1258 		return PTR_ERR(crush);
1259 
1260 	work = alloc_workspace(crush);
1261 	if (!work) {
1262 		crush_destroy(crush);
1263 		return -ENOMEM;
1264 	}
1265 
1266 	if (map->crush)
1267 		crush_destroy(map->crush);
1268 	cleanup_workspace_manager(&map->crush_wsm);
1269 	map->crush = crush;
1270 	add_initial_workspace(&map->crush_wsm, work);
1271 	return 0;
1272 }
1273 
1274 #define OSDMAP_WRAPPER_COMPAT_VER	7
1275 #define OSDMAP_CLIENT_DATA_COMPAT_VER	1
1276 
/*
 * Return 0 or error.  On success, *v is set to 0 for old (v6)
 * osdmaps, or to struct_v of the client_data section for new (v7 and
 * above) osdmaps.
 */
1282 static int get_osdmap_client_data_v(void **p, void *end,
1283 				    const char *prefix, u8 *v)
1284 {
1285 	u8 struct_v;
1286 
1287 	ceph_decode_8_safe(p, end, struct_v, e_inval);
1288 	if (struct_v >= 7) {
1289 		u8 struct_compat;
1290 
1291 		ceph_decode_8_safe(p, end, struct_compat, e_inval);
1292 		if (struct_compat > OSDMAP_WRAPPER_COMPAT_VER) {
1293 			pr_warn("got v %d cv %d > %d of %s ceph_osdmap\n",
1294 				struct_v, struct_compat,
1295 				OSDMAP_WRAPPER_COMPAT_VER, prefix);
1296 			return -EINVAL;
1297 		}
1298 		*p += 4; /* ignore wrapper struct_len */
1299 
1300 		ceph_decode_8_safe(p, end, struct_v, e_inval);
1301 		ceph_decode_8_safe(p, end, struct_compat, e_inval);
1302 		if (struct_compat > OSDMAP_CLIENT_DATA_COMPAT_VER) {
1303 			pr_warn("got v %d cv %d > %d of %s ceph_osdmap client data\n",
1304 				struct_v, struct_compat,
1305 				OSDMAP_CLIENT_DATA_COMPAT_VER, prefix);
1306 			return -EINVAL;
1307 		}
1308 		*p += 4; /* ignore client data struct_len */
1309 	} else {
1310 		u16 version;
1311 
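		/*
		 * Old (pre-v7) encoding starts with a little-endian u16
		 * version.  The byte we just consumed is its low byte,
		 * so back up and re-decode it as a u16.
		 */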
1312 		*p -= 1;
1313 		ceph_decode_16_safe(p, end, version, e_inval);
1314 		if (version < 6) {
1315 			pr_warn("got v %d < 6 of %s ceph_osdmap\n",
1316 				version, prefix);
1317 			return -EINVAL;
1318 		}
1319 
1320 		/* old osdmap encoding */
1321 		struct_v = 0;
1322 	}
1323 
1324 	*v = struct_v;
1325 	return 0;
1326 
1327 e_inval:
1328 	return -EINVAL;
1329 }
1330 
1331 static int __decode_pools(void **p, void *end, struct ceph_osdmap *map,
1332 			  bool incremental)
1333 {
1334 	u32 n;
1335 
1336 	ceph_decode_32_safe(p, end, n, e_inval);
1337 	while (n--) {
1338 		struct ceph_pg_pool_info *pi;
1339 		u64 pool;
1340 		int ret;
1341 
1342 		ceph_decode_64_safe(p, end, pool, e_inval);
1343 
1344 		pi = lookup_pg_pool(&map->pg_pools, pool);
1345 		if (!incremental || !pi) {
1346 			pi = kzalloc(sizeof(*pi), GFP_NOFS);
1347 			if (!pi)
1348 				return -ENOMEM;
1349 
1350 			RB_CLEAR_NODE(&pi->node);
1351 			pi->id = pool;
1352 
1353 			if (!__insert_pg_pool(&map->pg_pools, pi)) {
1354 				kfree(pi);
1355 				return -EEXIST;
1356 			}
1357 		}
1358 
1359 		ret = decode_pool(p, end, pi);
1360 		if (ret)
1361 			return ret;
1362 	}
1363 
1364 	return 0;
1365 
1366 e_inval:
1367 	return -EINVAL;
1368 }
1369 
1370 static int decode_pools(void **p, void *end, struct ceph_osdmap *map)
1371 {
1372 	return __decode_pools(p, end, map, false);
1373 }
1374 
1375 static int decode_new_pools(void **p, void *end, struct ceph_osdmap *map)
1376 {
1377 	return __decode_pools(p, end, map, true);
1378 }
1379 
1380 typedef struct ceph_pg_mapping *(*decode_mapping_fn_t)(void **, void *, bool);
1381 
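/*
 * Decode a map<pg_t, T>.  An existing entry for a pgid is replaced
 * (incremental updates only).  @fn decodes the value; a NULL @fn
 * means the entries are only being removed, as with old_pg_upmap and
 * old_pg_upmap_items.
 */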
1382 static int decode_pg_mapping(void **p, void *end, struct rb_root *mapping_root,
1383 			     decode_mapping_fn_t fn, bool incremental)
1384 {
1385 	u32 n;
1386 
1387 	WARN_ON(!incremental && !fn);
1388 
1389 	ceph_decode_32_safe(p, end, n, e_inval);
1390 	while (n--) {
1391 		struct ceph_pg_mapping *pg;
1392 		struct ceph_pg pgid;
1393 		int ret;
1394 
1395 		ret = ceph_decode_pgid(p, end, &pgid);
1396 		if (ret)
1397 			return ret;
1398 
1399 		pg = lookup_pg_mapping(mapping_root, &pgid);
1400 		if (pg) {
1401 			WARN_ON(!incremental);
1402 			erase_pg_mapping(mapping_root, pg);
1403 			free_pg_mapping(pg);
1404 		}
1405 
1406 		if (fn) {
1407 			pg = fn(p, end, incremental);
1408 			if (IS_ERR(pg))
1409 				return PTR_ERR(pg);
1410 
1411 			if (pg) {
1412 				pg->pgid = pgid; /* struct */
1413 				insert_pg_mapping(mapping_root, pg);
1414 			}
1415 		}
1416 	}
1417 
1418 	return 0;
1419 
1420 e_inval:
1421 	return -EINVAL;
1422 }
1423 
1424 static struct ceph_pg_mapping *__decode_pg_temp(void **p, void *end,
1425 						bool incremental)
1426 {
1427 	struct ceph_pg_mapping *pg;
1428 	u32 len, i;
1429 
1430 	ceph_decode_32_safe(p, end, len, e_inval);
1431 	if (len == 0 && incremental)
1432 		return NULL;	/* new_pg_temp: [] to remove */
1433 	if ((size_t)len > (SIZE_MAX - sizeof(*pg)) / sizeof(u32))
1434 		return ERR_PTR(-EINVAL);
1435 
1436 	ceph_decode_need(p, end, len * sizeof(u32), e_inval);
1437 	pg = alloc_pg_mapping(len * sizeof(u32));
1438 	if (!pg)
1439 		return ERR_PTR(-ENOMEM);
1440 
1441 	pg->pg_temp.len = len;
1442 	for (i = 0; i < len; i++)
1443 		pg->pg_temp.osds[i] = ceph_decode_32(p);
1444 
1445 	return pg;
1446 
1447 e_inval:
1448 	return ERR_PTR(-EINVAL);
1449 }
1450 
1451 static int decode_pg_temp(void **p, void *end, struct ceph_osdmap *map)
1452 {
1453 	return decode_pg_mapping(p, end, &map->pg_temp, __decode_pg_temp,
1454 				 false);
1455 }
1456 
1457 static int decode_new_pg_temp(void **p, void *end, struct ceph_osdmap *map)
1458 {
1459 	return decode_pg_mapping(p, end, &map->pg_temp, __decode_pg_temp,
1460 				 true);
1461 }
1462 
1463 static struct ceph_pg_mapping *__decode_primary_temp(void **p, void *end,
1464 						     bool incremental)
1465 {
1466 	struct ceph_pg_mapping *pg;
1467 	u32 osd;
1468 
1469 	ceph_decode_32_safe(p, end, osd, e_inval);
1470 	if (osd == (u32)-1 && incremental)
1471 		return NULL;	/* new_primary_temp: -1 to remove */
1472 
1473 	pg = alloc_pg_mapping(0);
1474 	if (!pg)
1475 		return ERR_PTR(-ENOMEM);
1476 
1477 	pg->primary_temp.osd = osd;
1478 	return pg;
1479 
1480 e_inval:
1481 	return ERR_PTR(-EINVAL);
1482 }
1483 
1484 static int decode_primary_temp(void **p, void *end, struct ceph_osdmap *map)
1485 {
1486 	return decode_pg_mapping(p, end, &map->primary_temp,
1487 				 __decode_primary_temp, false);
1488 }
1489 
1490 static int decode_new_primary_temp(void **p, void *end,
1491 				   struct ceph_osdmap *map)
1492 {
1493 	return decode_pg_mapping(p, end, &map->primary_temp,
1494 				 __decode_primary_temp, true);
1495 }
1496 
1497 u32 ceph_get_primary_affinity(struct ceph_osdmap *map, int osd)
1498 {
1499 	if (!map->osd_primary_affinity)
1500 		return CEPH_OSD_DEFAULT_PRIMARY_AFFINITY;
1501 
1502 	return map->osd_primary_affinity[osd];
1503 }
1504 
1505 static int set_primary_affinity(struct ceph_osdmap *map, int osd, u32 aff)
1506 {
1507 	if (!map->osd_primary_affinity) {
1508 		int i;
1509 
1510 		map->osd_primary_affinity = kvmalloc(
1511 		    array_size(map->max_osd, sizeof(*map->osd_primary_affinity)),
1512 		    GFP_NOFS);
1513 		if (!map->osd_primary_affinity)
1514 			return -ENOMEM;
1515 
1516 		for (i = 0; i < map->max_osd; i++)
1517 			map->osd_primary_affinity[i] =
1518 			    CEPH_OSD_DEFAULT_PRIMARY_AFFINITY;
1519 	}
1520 
1521 	map->osd_primary_affinity[osd] = aff;
1522 
1523 	return 0;
1524 }
1525 
1526 static int decode_primary_affinity(void **p, void *end,
1527 				   struct ceph_osdmap *map)
1528 {
1529 	u32 len, i;
1530 
1531 	ceph_decode_32_safe(p, end, len, e_inval);
1532 	if (len == 0) {
1533 		kvfree(map->osd_primary_affinity);
1534 		map->osd_primary_affinity = NULL;
1535 		return 0;
1536 	}
1537 	if (len != map->max_osd)
1538 		goto e_inval;
1539 
1540 	ceph_decode_need(p, end, map->max_osd*sizeof(u32), e_inval);
1541 
1542 	for (i = 0; i < map->max_osd; i++) {
1543 		int ret;
1544 
1545 		ret = set_primary_affinity(map, i, ceph_decode_32(p));
1546 		if (ret)
1547 			return ret;
1548 	}
1549 
1550 	return 0;
1551 
1552 e_inval:
1553 	return -EINVAL;
1554 }
1555 
1556 static int decode_new_primary_affinity(void **p, void *end,
1557 				       struct ceph_osdmap *map)
1558 {
1559 	u32 n;
1560 
1561 	ceph_decode_32_safe(p, end, n, e_inval);
1562 	while (n--) {
1563 		u32 osd, aff;
1564 		int ret;
1565 
1566 		ceph_decode_32_safe(p, end, osd, e_inval);
1567 		ceph_decode_32_safe(p, end, aff, e_inval);
1568 		if (osd >= map->max_osd)
1569 			goto e_inval;
1570 
1571 		ret = set_primary_affinity(map, osd, aff);
1572 		if (ret)
1573 			return ret;
1574 
1575 		osdmap_info(map, "osd%d primary-affinity 0x%x\n", osd, aff);
1576 	}
1577 
1578 	return 0;
1579 
1580 e_inval:
1581 	return -EINVAL;
1582 }
1583 
1584 static struct ceph_pg_mapping *__decode_pg_upmap(void **p, void *end,
1585 						 bool __unused)
1586 {
1587 	return __decode_pg_temp(p, end, false);
1588 }
1589 
1590 static int decode_pg_upmap(void **p, void *end, struct ceph_osdmap *map)
1591 {
1592 	return decode_pg_mapping(p, end, &map->pg_upmap, __decode_pg_upmap,
1593 				 false);
1594 }
1595 
1596 static int decode_new_pg_upmap(void **p, void *end, struct ceph_osdmap *map)
1597 {
1598 	return decode_pg_mapping(p, end, &map->pg_upmap, __decode_pg_upmap,
1599 				 true);
1600 }
1601 
1602 static int decode_old_pg_upmap(void **p, void *end, struct ceph_osdmap *map)
1603 {
1604 	return decode_pg_mapping(p, end, &map->pg_upmap, NULL, true);
1605 }
1606 
1607 static struct ceph_pg_mapping *__decode_pg_upmap_items(void **p, void *end,
1608 						       bool __unused)
1609 {
1610 	struct ceph_pg_mapping *pg;
1611 	u32 len, i;
1612 
1613 	ceph_decode_32_safe(p, end, len, e_inval);
1614 	if ((size_t)len > (SIZE_MAX - sizeof(*pg)) / (2 * sizeof(u32)))
1615 		return ERR_PTR(-EINVAL);
1616 
1617 	ceph_decode_need(p, end, 2 * len * sizeof(u32), e_inval);
1618 	pg = alloc_pg_mapping(2 * len * sizeof(u32));
1619 	if (!pg)
1620 		return ERR_PTR(-ENOMEM);
1621 
1622 	pg->pg_upmap_items.len = len;
1623 	for (i = 0; i < len; i++) {
1624 		pg->pg_upmap_items.from_to[i][0] = ceph_decode_32(p);
1625 		pg->pg_upmap_items.from_to[i][1] = ceph_decode_32(p);
1626 	}
1627 
1628 	return pg;
1629 
1630 e_inval:
1631 	return ERR_PTR(-EINVAL);
1632 }
1633 
1634 static int decode_pg_upmap_items(void **p, void *end, struct ceph_osdmap *map)
1635 {
1636 	return decode_pg_mapping(p, end, &map->pg_upmap_items,
1637 				 __decode_pg_upmap_items, false);
1638 }
1639 
1640 static int decode_new_pg_upmap_items(void **p, void *end,
1641 				     struct ceph_osdmap *map)
1642 {
1643 	return decode_pg_mapping(p, end, &map->pg_upmap_items,
1644 				 __decode_pg_upmap_items, true);
1645 }
1646 
1647 static int decode_old_pg_upmap_items(void **p, void *end,
1648 				     struct ceph_osdmap *map)
1649 {
1650 	return decode_pg_mapping(p, end, &map->pg_upmap_items, NULL, true);
1651 }
1652 
1653 /*
1654  * decode a full map.
1655  */
1656 static int osdmap_decode(void **p, void *end, bool msgr2,
1657 			 struct ceph_osdmap *map)
1658 {
1659 	u8 struct_v;
1660 	u32 epoch = 0;
1661 	void *start = *p;
1662 	u32 max;
1663 	u32 len, i;
1664 	int err;
1665 
1666 	dout("%s %p to %p len %d\n", __func__, *p, end, (int)(end - *p));
1667 
1668 	err = get_osdmap_client_data_v(p, end, "full", &struct_v);
1669 	if (err)
1670 		goto bad;
1671 
1672 	/* fsid, epoch, created, modified */
1673 	ceph_decode_need(p, end, sizeof(map->fsid) + sizeof(u32) +
1674 			 sizeof(map->created) + sizeof(map->modified), e_inval);
1675 	ceph_decode_copy(p, &map->fsid, sizeof(map->fsid));
1676 	epoch = map->epoch = ceph_decode_32(p);
1677 	ceph_decode_copy(p, &map->created, sizeof(map->created));
1678 	ceph_decode_copy(p, &map->modified, sizeof(map->modified));
1679 
1680 	/* pools */
1681 	err = decode_pools(p, end, map);
1682 	if (err)
1683 		goto bad;
1684 
1685 	/* pool_name */
1686 	err = decode_pool_names(p, end, map);
1687 	if (err)
1688 		goto bad;
1689 
1690 	ceph_decode_32_safe(p, end, map->pool_max, e_inval);
1691 
1692 	ceph_decode_32_safe(p, end, map->flags, e_inval);
1693 
1694 	/* max_osd */
1695 	ceph_decode_32_safe(p, end, max, e_inval);
1696 
1697 	/* (re)alloc osd arrays */
1698 	err = osdmap_set_max_osd(map, max);
1699 	if (err)
1700 		goto bad;
1701 
1702 	/* osd_state, osd_weight, osd_addrs->client_addr */
1703 	ceph_decode_need(p, end, 3*sizeof(u32) +
1704 			 map->max_osd*(struct_v >= 5 ? sizeof(u32) :
1705 						       sizeof(u8)) +
1706 				       sizeof(*map->osd_weight), e_inval);
1707 	if (ceph_decode_32(p) != map->max_osd)
1708 		goto e_inval;
1709 
1710 	if (struct_v >= 5) {
1711 		for (i = 0; i < map->max_osd; i++)
1712 			map->osd_state[i] = ceph_decode_32(p);
1713 	} else {
1714 		for (i = 0; i < map->max_osd; i++)
1715 			map->osd_state[i] = ceph_decode_8(p);
1716 	}
1717 
1718 	if (ceph_decode_32(p) != map->max_osd)
1719 		goto e_inval;
1720 
1721 	for (i = 0; i < map->max_osd; i++)
1722 		map->osd_weight[i] = ceph_decode_32(p);
1723 
1724 	if (ceph_decode_32(p) != map->max_osd)
1725 		goto e_inval;
1726 
1727 	for (i = 0; i < map->max_osd; i++) {
1728 		struct ceph_entity_addr *addr = &map->osd_addr[i];
1729 
1730 		if (struct_v >= 8)
1731 			err = ceph_decode_entity_addrvec(p, end, msgr2, addr);
1732 		else
1733 			err = ceph_decode_entity_addr(p, end, addr);
1734 		if (err)
1735 			goto bad;
1736 
1737 		dout("%s osd%d addr %s\n", __func__, i, ceph_pr_addr(addr));
1738 	}
1739 
1740 	/* pg_temp */
1741 	err = decode_pg_temp(p, end, map);
1742 	if (err)
1743 		goto bad;
1744 
1745 	/* primary_temp */
1746 	if (struct_v >= 1) {
1747 		err = decode_primary_temp(p, end, map);
1748 		if (err)
1749 			goto bad;
1750 	}
1751 
1752 	/* primary_affinity */
1753 	if (struct_v >= 2) {
1754 		err = decode_primary_affinity(p, end, map);
1755 		if (err)
1756 			goto bad;
1757 	} else {
1758 		WARN_ON(map->osd_primary_affinity);
1759 	}
1760 
1761 	/* crush */
1762 	ceph_decode_32_safe(p, end, len, e_inval);
1763 	err = osdmap_set_crush(map, crush_decode(*p, min(*p + len, end)));
1764 	if (err)
1765 		goto bad;
1766 
1767 	*p += len;
1768 	if (struct_v >= 3) {
1769 		/* erasure_code_profiles */
1770 		ceph_decode_skip_map_of_map(p, end, string, string, string,
1771 					    e_inval);
1772 	}
1773 
1774 	if (struct_v >= 4) {
1775 		err = decode_pg_upmap(p, end, map);
1776 		if (err)
1777 			goto bad;
1778 
1779 		err = decode_pg_upmap_items(p, end, map);
1780 		if (err)
1781 			goto bad;
1782 	} else {
1783 		WARN_ON(!RB_EMPTY_ROOT(&map->pg_upmap));
1784 		WARN_ON(!RB_EMPTY_ROOT(&map->pg_upmap_items));
1785 	}
1786 
1787 	/* ignore the rest */
1788 	*p = end;
1789 
1790 	dout("full osdmap epoch %d max_osd %d\n", map->epoch, map->max_osd);
1791 	return 0;
1792 
1793 e_inval:
1794 	err = -EINVAL;
1795 bad:
1796 	pr_err("corrupt full osdmap (%d) epoch %d off %d (%p of %p-%p)\n",
1797 	       err, epoch, (int)(*p - start), *p, start, end);
1798 	print_hex_dump(KERN_DEBUG, "osdmap: ",
1799 		       DUMP_PREFIX_OFFSET, 16, 1,
1800 		       start, end - start, true);
1801 	return err;
1802 }
1803 
1804 /*
1805  * Allocate and decode a full map.
1806  */
1807 struct ceph_osdmap *ceph_osdmap_decode(void **p, void *end, bool msgr2)
1808 {
1809 	struct ceph_osdmap *map;
1810 	int ret;
1811 
1812 	map = ceph_osdmap_alloc();
1813 	if (!map)
1814 		return ERR_PTR(-ENOMEM);
1815 
1816 	ret = osdmap_decode(p, end, msgr2, map);
1817 	if (ret) {
1818 		ceph_osdmap_destroy(map);
1819 		return ERR_PTR(ret);
1820 	}
1821 
1822 	return map;
1823 }
1824 
1825 /*
1826  * Encoding order is (new_up_client, new_state, new_weight).  Need to
1827  * apply in the (new_weight, new_state, new_up_client) order, because
1828  * an incremental map may look like e.g.
1829  *
1830  *     new_up_client: { osd=6, addr=... } # set osd_state and addr
1831  *     new_state: { osd=6, xorstate=EXISTS } # clear osd_state
1832  */
1833 static int decode_new_up_state_weight(void **p, void *end, u8 struct_v,
1834 				      bool msgr2, struct ceph_osdmap *map)
1835 {
1836 	void *new_up_client;
1837 	void *new_state;
1838 	void *new_weight_end;
1839 	u32 len;
1840 	int ret;
1841 	int i;
1842 
1843 	new_up_client = *p;
1844 	ceph_decode_32_safe(p, end, len, e_inval);
1845 	for (i = 0; i < len; ++i) {
1846 		struct ceph_entity_addr addr;
1847 
1848 		ceph_decode_skip_32(p, end, e_inval);
1849 		if (struct_v >= 7)
1850 			ret = ceph_decode_entity_addrvec(p, end, msgr2, &addr);
1851 		else
1852 			ret = ceph_decode_entity_addr(p, end, &addr);
1853 		if (ret)
1854 			return ret;
1855 	}
1856 
1857 	new_state = *p;
1858 	ceph_decode_32_safe(p, end, len, e_inval);
1859 	len *= sizeof(u32) + (struct_v >= 5 ? sizeof(u32) : sizeof(u8));
1860 	ceph_decode_need(p, end, len, e_inval);
1861 	*p += len;
1862 
1863 	/* new_weight */
1864 	ceph_decode_32_safe(p, end, len, e_inval);
1865 	while (len--) {
1866 		s32 osd;
1867 		u32 w;
1868 
1869 		ceph_decode_need(p, end, 2*sizeof(u32), e_inval);
1870 		osd = ceph_decode_32(p);
1871 		w = ceph_decode_32(p);
1872 		if (osd >= map->max_osd)
1873 			goto e_inval;
1874 
1875 		osdmap_info(map, "osd%d weight 0x%x %s\n", osd, w,
1876 			    w == CEPH_OSD_IN ? "(in)" :
1877 			    (w == CEPH_OSD_OUT ? "(out)" : ""));
1878 		map->osd_weight[osd] = w;
1879 
1880 		/*
1881 		 * If we are marking in, set the EXISTS, and clear the
1882 		 * AUTOOUT and NEW bits.
1883 		 */
1884 		if (w) {
1885 			map->osd_state[osd] |= CEPH_OSD_EXISTS;
1886 			map->osd_state[osd] &= ~(CEPH_OSD_AUTOOUT |
1887 						 CEPH_OSD_NEW);
1888 		}
1889 	}
1890 	new_weight_end = *p;
1891 
1892 	/* new_state (up/down) */
1893 	*p = new_state;
1894 	len = ceph_decode_32(p);
1895 	while (len--) {
1896 		s32 osd;
1897 		u32 xorstate;
1898 
1899 		osd = ceph_decode_32(p);
1900 		if (osd >= map->max_osd)
1901 			goto e_inval;
1902 
1903 		if (struct_v >= 5)
1904 			xorstate = ceph_decode_32(p);
1905 		else
1906 			xorstate = ceph_decode_8(p);
1907 		if (xorstate == 0)
1908 			xorstate = CEPH_OSD_UP;
1909 		if ((map->osd_state[osd] & CEPH_OSD_UP) &&
1910 		    (xorstate & CEPH_OSD_UP))
1911 			osdmap_info(map, "osd%d down\n", osd);
1912 		if ((map->osd_state[osd] & CEPH_OSD_EXISTS) &&
1913 		    (xorstate & CEPH_OSD_EXISTS)) {
1914 			osdmap_info(map, "osd%d does not exist\n", osd);
1915 			ret = set_primary_affinity(map, osd,
1916 						   CEPH_OSD_DEFAULT_PRIMARY_AFFINITY);
1917 			if (ret)
1918 				return ret;
1919 			memset(map->osd_addr + osd, 0, sizeof(*map->osd_addr));
1920 			map->osd_state[osd] = 0;
1921 		} else {
1922 			map->osd_state[osd] ^= xorstate;
1923 		}
1924 	}
1925 
1926 	/* new_up_client */
1927 	*p = new_up_client;
1928 	len = ceph_decode_32(p);
1929 	while (len--) {
1930 		s32 osd;
1931 		struct ceph_entity_addr addr;
1932 
1933 		osd = ceph_decode_32(p);
1934 		if (osd >= map->max_osd)
1935 			goto e_inval;
1936 
1937 		if (struct_v >= 7)
1938 			ret = ceph_decode_entity_addrvec(p, end, msgr2, &addr);
1939 		else
1940 			ret = ceph_decode_entity_addr(p, end, &addr);
1941 		if (ret)
1942 			return ret;
1943 
1944 		dout("%s osd%d addr %s\n", __func__, osd, ceph_pr_addr(&addr));
1945 
1946 		osdmap_info(map, "osd%d up\n", osd);
1947 		map->osd_state[osd] |= CEPH_OSD_EXISTS | CEPH_OSD_UP;
1948 		map->osd_addr[osd] = addr;
1949 	}
1950 
1951 	*p = new_weight_end;
1952 	return 0;
1953 
1954 e_inval:
1955 	return -EINVAL;
1956 }
1957 
1958 /*
1959  * decode and apply an incremental map update.
1960  */
1961 struct ceph_osdmap *osdmap_apply_incremental(void **p, void *end, bool msgr2,
1962 					     struct ceph_osdmap *map)
1963 {
1964 	struct ceph_fsid fsid;
1965 	u32 epoch = 0;
1966 	struct ceph_timespec modified;
1967 	s32 len;
1968 	u64 pool;
1969 	__s64 new_pool_max;
1970 	__s32 new_flags, max;
1971 	void *start = *p;
1972 	int err;
1973 	u8 struct_v;
1974 
1975 	dout("%s %p to %p len %d\n", __func__, *p, end, (int)(end - *p));
1976 
1977 	err = get_osdmap_client_data_v(p, end, "inc", &struct_v);
1978 	if (err)
1979 		goto bad;
1980 
1981 	/* fsid, epoch, modified, new_pool_max, new_flags */
1982 	ceph_decode_need(p, end, sizeof(fsid) + sizeof(u32) + sizeof(modified) +
1983 			 sizeof(u64) + sizeof(u32), e_inval);
1984 	ceph_decode_copy(p, &fsid, sizeof(fsid));
1985 	epoch = ceph_decode_32(p);
1986 	ceph_decode_copy(p, &modified, sizeof(modified));
1987 	new_pool_max = ceph_decode_64(p);
1988 	new_flags = ceph_decode_32(p);
1989 
1990 	if (epoch != map->epoch + 1)
1991 		goto e_inval;
1992 
1993 	/* full map? */
1994 	ceph_decode_32_safe(p, end, len, e_inval);
1995 	if (len > 0) {
1996 		dout("apply_incremental full map len %d, %p to %p\n",
1997 		     len, *p, end);
1998 		return ceph_osdmap_decode(p, min(*p+len, end), msgr2);
1999 	}
2000 
2001 	/* new crush? */
2002 	ceph_decode_32_safe(p, end, len, e_inval);
2003 	if (len > 0) {
2004 		err = osdmap_set_crush(map,
2005 				       crush_decode(*p, min(*p + len, end)));
2006 		if (err)
2007 			goto bad;
2008 		*p += len;
2009 	}
2010 
2011 	/* new flags? */
2012 	if (new_flags >= 0)
2013 		map->flags = new_flags;
2014 	if (new_pool_max >= 0)
2015 		map->pool_max = new_pool_max;
2016 
2017 	/* new max? */
2018 	ceph_decode_32_safe(p, end, max, e_inval);
2019 	if (max >= 0) {
2020 		err = osdmap_set_max_osd(map, max);
2021 		if (err)
2022 			goto bad;
2023 	}
2024 
2025 	map->epoch++;
2026 	map->modified = modified;
2027 
2028 	/* new_pools */
2029 	err = decode_new_pools(p, end, map);
2030 	if (err)
2031 		goto bad;
2032 
2033 	/* new_pool_names */
2034 	err = decode_pool_names(p, end, map);
2035 	if (err)
2036 		goto bad;
2037 
2038 	/* old_pool */
2039 	ceph_decode_32_safe(p, end, len, e_inval);
2040 	while (len--) {
2041 		struct ceph_pg_pool_info *pi;
2042 
2043 		ceph_decode_64_safe(p, end, pool, e_inval);
2044 		pi = lookup_pg_pool(&map->pg_pools, pool);
2045 		if (pi)
2046 			__remove_pg_pool(&map->pg_pools, pi);
2047 	}
2048 
2049 	/* new_up_client, new_state, new_weight */
2050 	err = decode_new_up_state_weight(p, end, struct_v, msgr2, map);
2051 	if (err)
2052 		goto bad;
2053 
2054 	/* new_pg_temp */
2055 	err = decode_new_pg_temp(p, end, map);
2056 	if (err)
2057 		goto bad;
2058 
2059 	/* new_primary_temp */
2060 	if (struct_v >= 1) {
2061 		err = decode_new_primary_temp(p, end, map);
2062 		if (err)
2063 			goto bad;
2064 	}
2065 
2066 	/* new_primary_affinity */
2067 	if (struct_v >= 2) {
2068 		err = decode_new_primary_affinity(p, end, map);
2069 		if (err)
2070 			goto bad;
2071 	}
2072 
2073 	if (struct_v >= 3) {
2074 		/* new_erasure_code_profiles */
2075 		ceph_decode_skip_map_of_map(p, end, string, string, string,
2076 					    e_inval);
2077 		/* old_erasure_code_profiles */
2078 		ceph_decode_skip_set(p, end, string, e_inval);
2079 	}
2080 
2081 	if (struct_v >= 4) {
2082 		err = decode_new_pg_upmap(p, end, map);
2083 		if (err)
2084 			goto bad;
2085 
2086 		err = decode_old_pg_upmap(p, end, map);
2087 		if (err)
2088 			goto bad;
2089 
2090 		err = decode_new_pg_upmap_items(p, end, map);
2091 		if (err)
2092 			goto bad;
2093 
2094 		err = decode_old_pg_upmap_items(p, end, map);
2095 		if (err)
2096 			goto bad;
2097 	}
2098 
2099 	/* ignore the rest */
2100 	*p = end;
2101 
2102 	dout("inc osdmap epoch %d max_osd %d\n", map->epoch, map->max_osd);
2103 	return map;
2104 
2105 e_inval:
2106 	err = -EINVAL;
2107 bad:
2108 	pr_err("corrupt inc osdmap (%d) epoch %d off %d (%p of %p-%p)\n",
2109 	       err, epoch, (int)(*p - start), *p, start, end);
2110 	print_hex_dump(KERN_DEBUG, "osdmap: ",
2111 		       DUMP_PREFIX_OFFSET, 16, 1,
2112 		       start, end - start, true);
2113 	return ERR_PTR(err);
2114 }
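
/*
 * A minimal usage sketch (not part of this file's API; the variable
 * names are hypothetical): how a map-message handler might apply an
 * incremental on top of its current map.  On success the return value
 * is either @oldmap updated in place or, if a full map was embedded,
 * a freshly decoded replacement.
 *
 *	void *p = data, *end = data + len;
 *	struct ceph_osdmap *newmap;
 *
 *	newmap = osdmap_apply_incremental(&p, end, msgr2, oldmap);
 *	if (IS_ERR(newmap))
 *		return PTR_ERR(newmap);
 *	if (newmap != oldmap) {
 *		ceph_osdmap_destroy(oldmap);
 *		oldmap = newmap;
 *	}
 */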
2115 
2116 void ceph_oloc_copy(struct ceph_object_locator *dest,
2117 		    const struct ceph_object_locator *src)
2118 {
2119 	ceph_oloc_destroy(dest);
2120 
2121 	dest->pool = src->pool;
2122 	if (src->pool_ns)
2123 		dest->pool_ns = ceph_get_string(src->pool_ns);
2124 	else
2125 		dest->pool_ns = NULL;
2126 }
2127 EXPORT_SYMBOL(ceph_oloc_copy);
2128 
2129 void ceph_oloc_destroy(struct ceph_object_locator *oloc)
2130 {
2131 	ceph_put_string(oloc->pool_ns);
2132 }
2133 EXPORT_SYMBOL(ceph_oloc_destroy);
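
/*
 * A lifecycle sketch for the two helpers above, with hypothetical
 * variables.  ceph_oloc_copy() takes a reference on the namespace
 * string, so every copy must eventually be balanced by
 * ceph_oloc_destroy():
 *
 *	struct ceph_object_locator oloc = { .pool = -1 };
 *
 *	ceph_oloc_copy(&oloc, &req->r_base_oloc);
 *	...
 *	ceph_oloc_destroy(&oloc);
 */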
2134 
2135 void ceph_oid_copy(struct ceph_object_id *dest,
2136 		   const struct ceph_object_id *src)
2137 {
2138 	ceph_oid_destroy(dest);
2139 
2140 	if (src->name != src->inline_name) {
2141 		/* very rare, see ceph_object_id definition */
2142 		dest->name = kmalloc(src->name_len + 1,
2143 				     GFP_NOIO | __GFP_NOFAIL);
2144 	} else {
2145 		dest->name = dest->inline_name;
2146 	}
2147 	memcpy(dest->name, src->name, src->name_len + 1);
2148 	dest->name_len = src->name_len;
2149 }
2150 EXPORT_SYMBOL(ceph_oid_copy);
2151 
2152 static __printf(2, 0)
2153 int oid_printf_vargs(struct ceph_object_id *oid, const char *fmt, va_list ap)
2154 {
2155 	int len;
2156 
2157 	WARN_ON(!ceph_oid_empty(oid));
2158 
2159 	len = vsnprintf(oid->inline_name, sizeof(oid->inline_name), fmt, ap);
2160 	if (len >= sizeof(oid->inline_name))
2161 		return len;
2162 
2163 	oid->name_len = len;
2164 	return 0;
2165 }
2166 
2167 /*
2168  * If oid doesn't fit into inline buffer, BUG.
2169  */
2170 void ceph_oid_printf(struct ceph_object_id *oid, const char *fmt, ...)
2171 {
2172 	va_list ap;
2173 
2174 	va_start(ap, fmt);
2175 	BUG_ON(oid_printf_vargs(oid, fmt, ap));
2176 	va_end(ap);
2177 }
2178 EXPORT_SYMBOL(ceph_oid_printf);
2179 
2180 static __printf(3, 0)
2181 int oid_aprintf_vargs(struct ceph_object_id *oid, gfp_t gfp,
2182 		      const char *fmt, va_list ap)
2183 {
2184 	va_list aq;
2185 	int len;
2186 
2187 	va_copy(aq, ap);
2188 	len = oid_printf_vargs(oid, fmt, aq);
2189 	va_end(aq);
2190 
2191 	if (len) {
2192 		char *external_name;
2193 
2194 		external_name = kmalloc(len + 1, gfp);
2195 		if (!external_name)
2196 			return -ENOMEM;
2197 
2198 		oid->name = external_name;
2199 		WARN_ON(vsnprintf(oid->name, len + 1, fmt, ap) != len);
2200 		oid->name_len = len;
2201 	}
2202 
2203 	return 0;
2204 }
2205 
2206 /*
2207  * If oid doesn't fit into inline buffer, allocate.
2208  */
2209 int ceph_oid_aprintf(struct ceph_object_id *oid, gfp_t gfp,
2210 		     const char *fmt, ...)
2211 {
2212 	va_list ap;
2213 	int ret;
2214 
2215 	va_start(ap, fmt);
2216 	ret = oid_aprintf_vargs(oid, gfp, fmt, ap);
2217 	va_end(ap);
2218 
2219 	return ret;
2220 }
2221 EXPORT_SYMBOL(ceph_oid_aprintf);
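
/*
 * Usage sketch for the oid helpers, with hypothetical names: use
 * ceph_oid_printf() only when the name is known to fit the inline
 * buffer (it BUGs otherwise); ceph_oid_aprintf() falls back to an
 * allocation for longer names.
 *
 *	struct ceph_object_id oid;
 *	int ret;
 *
 *	ceph_oid_init(&oid);
 *	ret = ceph_oid_aprintf(&oid, GFP_NOIO, "%llx.%08llx", ino, bno);
 *	if (ret)
 *		return ret;
 *	...
 *	ceph_oid_destroy(&oid);
 */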
2222 
2223 void ceph_oid_destroy(struct ceph_object_id *oid)
2224 {
2225 	if (oid->name != oid->inline_name)
2226 		kfree(oid->name);
2227 }
2228 EXPORT_SYMBOL(ceph_oid_destroy);
2229 
2230 /*
2231  * osds only
2232  */
2233 static bool __osds_equal(const struct ceph_osds *lhs,
2234 			 const struct ceph_osds *rhs)
2235 {
2236 	if (lhs->size == rhs->size &&
2237 	    !memcmp(lhs->osds, rhs->osds, rhs->size * sizeof(rhs->osds[0])))
2238 		return true;
2239 
2240 	return false;
2241 }
2242 
2243 /*
2244  * osds + primary
2245  */
2246 static bool osds_equal(const struct ceph_osds *lhs,
2247 		       const struct ceph_osds *rhs)
2248 {
2249 	if (__osds_equal(lhs, rhs) &&
2250 	    lhs->primary == rhs->primary)
2251 		return true;
2252 
2253 	return false;
2254 }
2255 
2256 static bool osds_valid(const struct ceph_osds *set)
2257 {
2258 	/* non-empty set */
2259 	if (set->size > 0 && set->primary >= 0)
2260 		return true;
2261 
2262 	/* empty can_shift_osds set */
2263 	if (!set->size && set->primary == -1)
2264 		return true;
2265 
2266 	/* empty !can_shift_osds set - all NONE */
2267 	if (set->size > 0 && set->primary == -1) {
2268 		int i;
2269 
2270 		for (i = 0; i < set->size; i++) {
2271 			if (set->osds[i] != CRUSH_ITEM_NONE)
2272 				break;
2273 		}
2274 		if (i == set->size)
2275 			return true;
2276 	}
2277 
2278 	return false;
2279 }
2280 
2281 void ceph_osds_copy(struct ceph_osds *dest, const struct ceph_osds *src)
2282 {
2283 	memcpy(dest->osds, src->osds, src->size * sizeof(src->osds[0]));
2284 	dest->size = src->size;
2285 	dest->primary = src->primary;
2286 }
2287 
2288 bool ceph_pg_is_split(const struct ceph_pg *pgid, u32 old_pg_num,
2289 		      u32 new_pg_num)
2290 {
2291 	int old_bits = calc_bits_of(old_pg_num);
2292 	int old_mask = (1 << old_bits) - 1;
2293 	int n;
2294 
2295 	WARN_ON(pgid->seed >= old_pg_num);
2296 	if (new_pg_num <= old_pg_num)
2297 		return false;
2298 
2299 	for (n = 1; ; n++) {
2300 		int next_bit = n << (old_bits - 1);
2301 		u32 s = next_bit | pgid->seed;
2302 
2303 		if (s < old_pg_num || s == pgid->seed)
2304 			continue;
2305 		if (s >= new_pg_num)
2306 			break;
2307 
2308 		s = ceph_stable_mod(s, old_pg_num, old_mask);
2309 		if (s == pgid->seed)
2310 			return true;
2311 	}
2312 
2313 	return false;
2314 }
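
/*
 * Worked example: growing a pool from pg_num 4 to 8 splits each PG
 * in two.  For seed 1, old_bits is 3, so the first candidate child
 * is s = 4 | 1 = 5; 5 is neither < 4 nor >= 8, and
 * ceph_stable_mod(5, 4, 7) folds it back to 1 == seed, so the
 * function returns true (PG 1 splits into children 1 and 5).
 */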
2315 
2316 bool ceph_is_new_interval(const struct ceph_osds *old_acting,
2317 			  const struct ceph_osds *new_acting,
2318 			  const struct ceph_osds *old_up,
2319 			  const struct ceph_osds *new_up,
2320 			  int old_size,
2321 			  int new_size,
2322 			  int old_min_size,
2323 			  int new_min_size,
2324 			  u32 old_pg_num,
2325 			  u32 new_pg_num,
2326 			  bool old_sort_bitwise,
2327 			  bool new_sort_bitwise,
2328 			  bool old_recovery_deletes,
2329 			  bool new_recovery_deletes,
2330 			  const struct ceph_pg *pgid)
2331 {
2332 	return !osds_equal(old_acting, new_acting) ||
2333 	       !osds_equal(old_up, new_up) ||
2334 	       old_size != new_size ||
2335 	       old_min_size != new_min_size ||
2336 	       ceph_pg_is_split(pgid, old_pg_num, new_pg_num) ||
2337 	       old_sort_bitwise != new_sort_bitwise ||
2338 	       old_recovery_deletes != new_recovery_deletes;
2339 }
2340 
2341 static int calc_pg_rank(int osd, const struct ceph_osds *acting)
2342 {
2343 	int i;
2344 
2345 	for (i = 0; i < acting->size; i++) {
2346 		if (acting->osds[i] == osd)
2347 			return i;
2348 	}
2349 
2350 	return -1;
2351 }
2352 
2353 static bool primary_changed(const struct ceph_osds *old_acting,
2354 			    const struct ceph_osds *new_acting)
2355 {
2356 	if (!old_acting->size && !new_acting->size)
2357 		return false; /* both still empty */
2358 
2359 	if (!old_acting->size ^ !new_acting->size)
2360 		return true; /* was empty, now not, or vice versa */
2361 
2362 	if (old_acting->primary != new_acting->primary)
2363 		return true; /* primary changed */
2364 
2365 	if (calc_pg_rank(old_acting->primary, old_acting) !=
2366 	    calc_pg_rank(new_acting->primary, new_acting))
2367 		return true;
2368 
2369 	return false; /* same primary (though replicas may have changed) */
2370 }
2371 
2372 bool ceph_osds_changed(const struct ceph_osds *old_acting,
2373 		       const struct ceph_osds *new_acting,
2374 		       bool any_change)
2375 {
2376 	if (primary_changed(old_acting, new_acting))
2377 		return true;
2378 
2379 	if (any_change && !__osds_equal(old_acting, new_acting))
2380 		return true;
2381 
2382 	return false;
2383 }
2384 
2385 /*
2386  * Map an object into a PG.
2387  *
2388  * Should only be called with target_oid and target_oloc (as opposed to
2389  * base_oid and base_oloc), since tiering isn't taken into account.
2390  */
2391 void __ceph_object_locator_to_pg(struct ceph_pg_pool_info *pi,
2392 				 const struct ceph_object_id *oid,
2393 				 const struct ceph_object_locator *oloc,
2394 				 struct ceph_pg *raw_pgid)
2395 {
2396 	WARN_ON(pi->id != oloc->pool);
2397 
2398 	if (!oloc->pool_ns) {
2399 		raw_pgid->pool = oloc->pool;
2400 		raw_pgid->seed = ceph_str_hash(pi->object_hash, oid->name,
2401 					       oid->name_len);
2402 		dout("%s %s -> raw_pgid %llu.%x\n", __func__, oid->name,
2403 		     raw_pgid->pool, raw_pgid->seed);
2404 	} else {
2405 		char stack_buf[256];
2406 		char *buf = stack_buf;
2407 		int nsl = oloc->pool_ns->len;
2408 		size_t total = nsl + 1 + oid->name_len;
2409 
2410 		if (total > sizeof(stack_buf))
2411 			buf = kmalloc(total, GFP_NOIO | __GFP_NOFAIL);
2412 		memcpy(buf, oloc->pool_ns->str, nsl);
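		/*
		 * 0x1f (ASCII unit separator) joins namespace and name,
		 * i.e. the seed is computed over "<ns>\037<name>".  This
		 * must match how userspace hashes namespaced objects.
		 */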
2413 		buf[nsl] = '\037';
2414 		memcpy(buf + nsl + 1, oid->name, oid->name_len);
2415 		raw_pgid->pool = oloc->pool;
2416 		raw_pgid->seed = ceph_str_hash(pi->object_hash, buf, total);
2417 		if (buf != stack_buf)
2418 			kfree(buf);
2419 		dout("%s %s ns %.*s -> raw_pgid %llu.%x\n", __func__,
2420 		     oid->name, nsl, oloc->pool_ns->str,
2421 		     raw_pgid->pool, raw_pgid->seed);
2422 	}
2423 }
2424 
2425 int ceph_object_locator_to_pg(struct ceph_osdmap *osdmap,
2426 			      const struct ceph_object_id *oid,
2427 			      const struct ceph_object_locator *oloc,
2428 			      struct ceph_pg *raw_pgid)
2429 {
2430 	struct ceph_pg_pool_info *pi;
2431 
2432 	pi = ceph_pg_pool_by_id(osdmap, oloc->pool);
2433 	if (!pi)
2434 		return -ENOENT;
2435 
2436 	__ceph_object_locator_to_pg(pi, oid, oloc, raw_pgid);
2437 	return 0;
2438 }
2439 EXPORT_SYMBOL(ceph_object_locator_to_pg);
2440 
2441 /*
2442  * Map a raw PG (full precision ps) into an actual PG.
2443  */
2444 static void raw_pg_to_pg(struct ceph_pg_pool_info *pi,
2445 			 const struct ceph_pg *raw_pgid,
2446 			 struct ceph_pg *pgid)
2447 {
2448 	pgid->pool = raw_pgid->pool;
2449 	pgid->seed = ceph_stable_mod(raw_pgid->seed, pi->pg_num,
2450 				     pi->pg_num_mask);
2451 }
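
/*
 * Example: for pg_num = 12, pg_num_mask is 15.  Raw seeds 0..11 map
 * to themselves, while e.g. seed 13 has (13 & 15) = 13 >= 12 and is
 * folded with the halved mask to 13 & 7 = 5.  This is what keeps
 * mappings stable for non-power-of-two pg_num values.
 */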
2452 
2453 /*
2454  * Map a raw PG (full precision ps) into a placement ps (placement
2455  * seed).  Include pool id in that value so that different pools don't
2456  * use the same seeds.
2457  */
2458 static u32 raw_pg_to_pps(struct ceph_pg_pool_info *pi,
2459 			 const struct ceph_pg *raw_pgid)
2460 {
2461 	if (pi->flags & CEPH_POOL_FLAG_HASHPSPOOL) {
2462 		/* hash pool id and seed so that pool PGs do not overlap */
2463 		return crush_hash32_2(CRUSH_HASH_RJENKINS1,
2464 				      ceph_stable_mod(raw_pgid->seed,
2465 						      pi->pgp_num,
2466 						      pi->pgp_num_mask),
2467 				      raw_pgid->pool);
2468 	} else {
2469 		/*
2470 		 * legacy behavior: add ps and pool together.  this is
2471 		 * not a great approach because the PGs from each pool
2472 		 * will overlap on top of each other: 0.5 == 1.4 ==
2473 		 * 2.3 == ...
2474 		 */
2475 		return ceph_stable_mod(raw_pgid->seed, pi->pgp_num,
2476 				       pi->pgp_num_mask) +
2477 		       (unsigned)raw_pgid->pool;
2478 	}
2479 }
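
/*
 * To make the legacy overlap concrete (assuming the seeds pass
 * through ceph_stable_mod() unchanged): pps for 0.5 is 5 + 0 = 5,
 * for 1.4 it is 4 + 1 = 5 and for 2.3 it is 3 + 2 = 5, so all three
 * PGs feed the same value into CRUSH and land on the same OSDs.
 */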
2480 
2481 /*
2482  * Magic value used for a "default" fallback choose_args, used if the
2483  * crush_choose_arg_map passed to do_crush() does not exist.  If this
2484  * also doesn't exist, fall back to canonical weights.
2485  */
2486 #define CEPH_DEFAULT_CHOOSE_ARGS	-1
2487 
2488 static int do_crush(struct ceph_osdmap *map, int ruleno, int x,
2489 		    int *result, int result_max,
2490 		    const __u32 *weight, int weight_max,
2491 		    s64 choose_args_index)
2492 {
2493 	struct crush_choose_arg_map *arg_map;
2494 	struct crush_work *work;
2495 	int r;
2496 
2497 	BUG_ON(result_max > CEPH_PG_MAX_SIZE);
2498 
2499 	arg_map = lookup_choose_arg_map(&map->crush->choose_args,
2500 					choose_args_index);
2501 	if (!arg_map)
2502 		arg_map = lookup_choose_arg_map(&map->crush->choose_args,
2503 						CEPH_DEFAULT_CHOOSE_ARGS);
2504 
2505 	work = get_workspace(&map->crush_wsm, map->crush);
2506 	r = crush_do_rule(map->crush, ruleno, x, result, result_max,
2507 			  weight, weight_max, work,
2508 			  arg_map ? arg_map->args : NULL);
2509 	put_workspace(&map->crush_wsm, work);
2510 	return r;
2511 }
2512 
2513 static void remove_nonexistent_osds(struct ceph_osdmap *osdmap,
2514 				    struct ceph_pg_pool_info *pi,
2515 				    struct ceph_osds *set)
2516 {
2517 	int i;
2518 
2519 	if (ceph_can_shift_osds(pi)) {
2520 		int removed = 0;
2521 
2522 		/* shift left */
2523 		for (i = 0; i < set->size; i++) {
2524 			if (!ceph_osd_exists(osdmap, set->osds[i])) {
2525 				removed++;
2526 				continue;
2527 			}
2528 			if (removed)
2529 				set->osds[i - removed] = set->osds[i];
2530 		}
2531 		set->size -= removed;
2532 	} else {
2533 		/* set nonexistent (dne) devices to NONE */
2534 		for (i = 0; i < set->size; i++) {
2535 			if (!ceph_osd_exists(osdmap, set->osds[i]))
2536 				set->osds[i] = CRUSH_ITEM_NONE;
2537 		}
2538 	}
2539 }
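
/*
 * For example, if CRUSH emits [1,6,3] and OSD 6 does not exist, a
 * replicated (can_shift_osds) pool ends up with [1,3], whereas an
 * erasure-coded pool must preserve positions and ends up with
 * [1, CRUSH_ITEM_NONE, 3].
 */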
2540 
2541 /*
2542  * Calculate raw set (CRUSH output) for given PG and filter out
2543  * nonexistent OSDs.  ->primary is undefined for a raw set.
2544  *
2545  * Placement seed (CRUSH input) is returned through @ppps.
2546  */
2547 static void pg_to_raw_osds(struct ceph_osdmap *osdmap,
2548 			   struct ceph_pg_pool_info *pi,
2549 			   const struct ceph_pg *raw_pgid,
2550 			   struct ceph_osds *raw,
2551 			   u32 *ppps)
2552 {
2553 	u32 pps = raw_pg_to_pps(pi, raw_pgid);
2554 	int ruleno;
2555 	int len;
2556 
2557 	ceph_osds_init(raw);
2558 	if (ppps)
2559 		*ppps = pps;
2560 
2561 	ruleno = crush_find_rule(osdmap->crush, pi->crush_ruleset, pi->type,
2562 				 pi->size);
2563 	if (ruleno < 0) {
2564 		pr_err("no crush rule: pool %lld ruleset %d type %d size %d\n",
2565 		       pi->id, pi->crush_ruleset, pi->type, pi->size);
2566 		return;
2567 	}
2568 
2569 	if (pi->size > ARRAY_SIZE(raw->osds)) {
2570 		pr_err_ratelimited("pool %lld ruleset %d type %d too wide: size %d > %zu\n",
2571 		       pi->id, pi->crush_ruleset, pi->type, pi->size,
2572 		       ARRAY_SIZE(raw->osds));
2573 		return;
2574 	}
2575 
2576 	len = do_crush(osdmap, ruleno, pps, raw->osds, pi->size,
2577 		       osdmap->osd_weight, osdmap->max_osd, pi->id);
2578 	if (len < 0) {
2579 		pr_err("error %d from crush rule %d: pool %lld ruleset %d type %d size %d\n",
2580 		       len, ruleno, pi->id, pi->crush_ruleset, pi->type,
2581 		       pi->size);
2582 		return;
2583 	}
2584 
2585 	raw->size = len;
2586 	remove_nonexistent_osds(osdmap, pi, raw);
2587 }
2588 
2589 /* apply pg_upmap[_items] mappings */
2590 static void apply_upmap(struct ceph_osdmap *osdmap,
2591 			const struct ceph_pg *pgid,
2592 			struct ceph_osds *raw)
2593 {
2594 	struct ceph_pg_mapping *pg;
2595 	int i, j;
2596 
2597 	pg = lookup_pg_mapping(&osdmap->pg_upmap, pgid);
2598 	if (pg) {
2599 		/* make sure targets aren't marked out */
2600 		for (i = 0; i < pg->pg_upmap.len; i++) {
2601 			int osd = pg->pg_upmap.osds[i];
2602 
2603 			if (osd != CRUSH_ITEM_NONE &&
2604 			    osd < osdmap->max_osd &&
2605 			    osdmap->osd_weight[osd] == 0) {
2606 				/* reject/ignore explicit mapping */
2607 				return;
2608 			}
2609 		}
2610 		for (i = 0; i < pg->pg_upmap.len; i++)
2611 			raw->osds[i] = pg->pg_upmap.osds[i];
2612 		raw->size = pg->pg_upmap.len;
2613 		/* check and apply pg_upmap_items, if any */
2614 	}
2615 
2616 	pg = lookup_pg_mapping(&osdmap->pg_upmap_items, pgid);
2617 	if (pg) {
2618 		/*
2619 		 * Note: this approach does not allow a bidirectional swap,
2620 		 * e.g., [[1,2],[2,1]] applied to [0,1,2] does not yield [0,2,1].
2621 		 */
2622 		for (i = 0; i < pg->pg_upmap_items.len; i++) {
2623 			int from = pg->pg_upmap_items.from_to[i][0];
2624 			int to = pg->pg_upmap_items.from_to[i][1];
2625 			int pos = -1;
2626 			bool exists = false;
2627 
2628 			/* make sure replacement doesn't already appear */
2629 			for (j = 0; j < raw->size; j++) {
2630 				int osd = raw->osds[j];
2631 
2632 				if (osd == to) {
2633 					exists = true;
2634 					break;
2635 				}
2636 				/* ignore mapping if target is marked out */
2637 				if (osd == from && pos < 0 &&
2638 				    !(to != CRUSH_ITEM_NONE &&
2639 				      to < osdmap->max_osd &&
2640 				      osdmap->osd_weight[to] == 0)) {
2641 					pos = j;
2642 				}
2643 			}
2644 			if (!exists && pos >= 0)
2645 				raw->osds[pos] = to;
2646 		}
2647 	}
2648 }
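
/*
 * Worked pg_upmap_items example: with raw set [0,1,2] and a single
 * item [1,4], OSD 4 doesn't already appear in the set, so (assuming
 * OSD 4 exists and isn't marked out) pos becomes 1 and the result
 * is [0,4,2].  With [[1,2],[2,1]] both items are skipped because
 * each target is already present - hence no bidirectional swap.
 */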
2649 
2650 /*
2651  * Given raw set, calculate up set and up primary.  By definition of an
2652  * up set, the result won't contain nonexistent or down OSDs.
2653  *
2654  * This is done in-place - on return @set is the up set.  If it's
2655  * empty, ->primary will remain undefined.
2656  */
2657 static void raw_to_up_osds(struct ceph_osdmap *osdmap,
2658 			   struct ceph_pg_pool_info *pi,
2659 			   struct ceph_osds *set)
2660 {
2661 	int i;
2662 
2663 	/* ->primary is undefined for a raw set */
2664 	BUG_ON(set->primary != -1);
2665 
2666 	if (ceph_can_shift_osds(pi)) {
2667 		int removed = 0;
2668 
2669 		/* shift left */
2670 		for (i = 0; i < set->size; i++) {
2671 			if (ceph_osd_is_down(osdmap, set->osds[i])) {
2672 				removed++;
2673 				continue;
2674 			}
2675 			if (removed)
2676 				set->osds[i - removed] = set->osds[i];
2677 		}
2678 		set->size -= removed;
2679 		if (set->size > 0)
2680 			set->primary = set->osds[0];
2681 	} else {
2682 		/* set down/nonexistent (dne) devices to NONE */
2683 		for (i = set->size - 1; i >= 0; i--) {
2684 			if (ceph_osd_is_down(osdmap, set->osds[i]))
2685 				set->osds[i] = CRUSH_ITEM_NONE;
2686 			else
2687 				set->primary = set->osds[i];
2688 		}
2689 	}
2690 }
2691 
2692 static void apply_primary_affinity(struct ceph_osdmap *osdmap,
2693 				   struct ceph_pg_pool_info *pi,
2694 				   u32 pps,
2695 				   struct ceph_osds *up)
2696 {
2697 	int i;
2698 	int pos = -1;
2699 
2700 	/*
2701 	 * Do we have any non-default primary_affinity values for these
2702 	 * osds?
2703 	 */
2704 	if (!osdmap->osd_primary_affinity)
2705 		return;
2706 
2707 	for (i = 0; i < up->size; i++) {
2708 		int osd = up->osds[i];
2709 
2710 		if (osd != CRUSH_ITEM_NONE &&
2711 		    osdmap->osd_primary_affinity[osd] !=
2712 					CEPH_OSD_DEFAULT_PRIMARY_AFFINITY) {
2713 			break;
2714 		}
2715 	}
2716 	if (i == up->size)
2717 		return;
2718 
2719 	/*
2720 	 * Pick the primary.  Feed both the seed (for the pg) and the
2721 	 * osd into the hash/rng so that a proportional fraction of an
2722 	 * osd's pgs get rejected as primary.
2723 	 */
2724 	for (i = 0; i < up->size; i++) {
2725 		int osd = up->osds[i];
2726 		u32 aff;
2727 
2728 		if (osd == CRUSH_ITEM_NONE)
2729 			continue;
2730 
2731 		aff = osdmap->osd_primary_affinity[osd];
2732 		if (aff < CEPH_OSD_MAX_PRIMARY_AFFINITY &&
2733 		    (crush_hash32_2(CRUSH_HASH_RJENKINS1,
2734 				    pps, osd) >> 16) >= aff) {
2735 			/*
2736 			 * We chose not to use this primary.  Note it
2737 			 * anyway as a fallback in case we don't pick
2738 			 * anyone else, but keep looking.
2739 			 */
2740 			if (pos < 0)
2741 				pos = i;
2742 		} else {
2743 			pos = i;
2744 			break;
2745 		}
2746 	}
2747 	if (pos < 0)
2748 		return;
2749 
2750 	up->primary = up->osds[pos];
2751 
2752 	if (ceph_can_shift_osds(pi) && pos > 0) {
2753 		/* move the new primary to the front */
2754 		for (i = pos; i > 0; i--)
2755 			up->osds[i] = up->osds[i - 1];
2756 		up->osds[0] = up->primary;
2757 	}
2758 }
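
/*
 * Putting numbers to the rejection test above: affinity is a
 * fraction of CEPH_OSD_MAX_PRIMARY_AFFINITY (0x10000), and
 * crush_hash32_2(pps, osd) >> 16 is roughly uniform in [0, 0xffff].
 * An OSD with affinity 0x8000 is therefore rejected as primary
 * whenever that value is >= 0x8000, i.e. for about half of its PGs.
 */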
2759 
2760 /*
2761  * Get pg_temp and primary_temp mappings for given PG.
2762  *
2763  * Note that a PG may have none, only pg_temp, only primary_temp or
2764  * both pg_temp and primary_temp mappings.  This means @temp isn't
2765  * always a valid OSD set on return: in the "only primary_temp" case,
2766  * @temp will have its ->primary >= 0 but ->size == 0.
2767  */
2768 static void get_temp_osds(struct ceph_osdmap *osdmap,
2769 			  struct ceph_pg_pool_info *pi,
2770 			  const struct ceph_pg *pgid,
2771 			  struct ceph_osds *temp)
2772 {
2773 	struct ceph_pg_mapping *pg;
2774 	int i;
2775 
2776 	ceph_osds_init(temp);
2777 
2778 	/* pg_temp? */
2779 	pg = lookup_pg_mapping(&osdmap->pg_temp, pgid);
2780 	if (pg) {
2781 		for (i = 0; i < pg->pg_temp.len; i++) {
2782 			if (ceph_osd_is_down(osdmap, pg->pg_temp.osds[i])) {
2783 				if (ceph_can_shift_osds(pi))
2784 					continue;
2785 
2786 				temp->osds[temp->size++] = CRUSH_ITEM_NONE;
2787 			} else {
2788 				temp->osds[temp->size++] = pg->pg_temp.osds[i];
2789 			}
2790 		}
2791 
2792 		/* apply pg_temp's primary */
2793 		for (i = 0; i < temp->size; i++) {
2794 			if (temp->osds[i] != CRUSH_ITEM_NONE) {
2795 				temp->primary = temp->osds[i];
2796 				break;
2797 			}
2798 		}
2799 	}
2800 
2801 	/* primary_temp? */
2802 	pg = lookup_pg_mapping(&osdmap->primary_temp, pgid);
2803 	if (pg)
2804 		temp->primary = pg->primary_temp.osd;
2805 }
2806 
2807 /*
2808  * Map a PG to its acting set as well as its up set.
2809  *
2810  * Acting set is used for data mapping purposes, while up set can be
2811  * recorded for detecting interval changes and deciding whether to
2812  * resend a request.
2813  */
2814 void ceph_pg_to_up_acting_osds(struct ceph_osdmap *osdmap,
2815 			       struct ceph_pg_pool_info *pi,
2816 			       const struct ceph_pg *raw_pgid,
2817 			       struct ceph_osds *up,
2818 			       struct ceph_osds *acting)
2819 {
2820 	struct ceph_pg pgid;
2821 	u32 pps;
2822 
2823 	WARN_ON(pi->id != raw_pgid->pool);
2824 	raw_pg_to_pg(pi, raw_pgid, &pgid);
2825 
2826 	pg_to_raw_osds(osdmap, pi, raw_pgid, up, &pps);
2827 	apply_upmap(osdmap, &pgid, up);
2828 	raw_to_up_osds(osdmap, pi, up);
2829 	apply_primary_affinity(osdmap, pi, pps, up);
2830 	get_temp_osds(osdmap, pi, &pgid, acting);
2831 	if (!acting->size) {
2832 		memcpy(acting->osds, up->osds, up->size * sizeof(up->osds[0]));
2833 		acting->size = up->size;
2834 		if (acting->primary == -1)
2835 			acting->primary = up->primary;
2836 	}
2837 	WARN_ON(!osds_valid(up) || !osds_valid(acting));
2838 }
2839 
2840 bool ceph_pg_to_primary_shard(struct ceph_osdmap *osdmap,
2841 			      struct ceph_pg_pool_info *pi,
2842 			      const struct ceph_pg *raw_pgid,
2843 			      struct ceph_spg *spgid)
2844 {
2845 	struct ceph_pg pgid;
2846 	struct ceph_osds up, acting;
2847 	int i;
2848 
2849 	WARN_ON(pi->id != raw_pgid->pool);
2850 	raw_pg_to_pg(pi, raw_pgid, &pgid);
2851 
2852 	if (ceph_can_shift_osds(pi)) {
2853 		spgid->pgid = pgid; /* struct */
2854 		spgid->shard = CEPH_SPG_NOSHARD;
2855 		return true;
2856 	}
2857 
2858 	ceph_pg_to_up_acting_osds(osdmap, pi, &pgid, &up, &acting);
2859 	for (i = 0; i < acting.size; i++) {
2860 		if (acting.osds[i] == acting.primary) {
2861 			spgid->pgid = pgid; /* struct */
2862 			spgid->shard = i;
2863 			return true;
2864 		}
2865 	}
2866 
2867 	return false;
2868 }
2869 
2870 /*
2871  * Return acting primary for given PG, or -1 if none.
2872  */
2873 int ceph_pg_to_acting_primary(struct ceph_osdmap *osdmap,
2874 			      const struct ceph_pg *raw_pgid)
2875 {
2876 	struct ceph_pg_pool_info *pi;
2877 	struct ceph_osds up, acting;
2878 
2879 	pi = ceph_pg_pool_by_id(osdmap, raw_pgid->pool);
2880 	if (!pi)
2881 		return -1;
2882 
2883 	ceph_pg_to_up_acting_osds(osdmap, pi, raw_pgid, &up, &acting);
2884 	return acting.primary;
2885 }
2886 EXPORT_SYMBOL(ceph_pg_to_acting_primary);
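
/*
 * Usage sketch with hypothetical variables: map an object to its raw
 * PG, then look up the acting primary that currently serves it.
 *
 *	struct ceph_pg raw_pgid;
 *	int ret, osd;
 *
 *	ret = ceph_object_locator_to_pg(osdmap, &oid, &oloc, &raw_pgid);
 *	if (ret)
 *		return ret;
 *	osd = ceph_pg_to_acting_primary(osdmap, &raw_pgid);
 *	if (osd < 0)
 *		... (no acting primary, e.g. all OSDs are down)
 */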
2887 
2888 static struct crush_loc_node *alloc_crush_loc(size_t type_name_len,
2889 					      size_t name_len)
2890 {
2891 	struct crush_loc_node *loc;
2892 
2893 	loc = kmalloc(sizeof(*loc) + type_name_len + name_len + 2, GFP_NOIO);
2894 	if (!loc)
2895 		return NULL;
2896 
2897 	RB_CLEAR_NODE(&loc->cl_node);
2898 	return loc;
2899 }
2900 
2901 static void free_crush_loc(struct crush_loc_node *loc)
2902 {
2903 	WARN_ON(!RB_EMPTY_NODE(&loc->cl_node));
2904 
2905 	kfree(loc);
2906 }
2907 
2908 static int crush_loc_compare(const struct crush_loc *loc1,
2909 			     const struct crush_loc *loc2)
2910 {
2911 	return strcmp(loc1->cl_type_name, loc2->cl_type_name) ?:
2912 	       strcmp(loc1->cl_name, loc2->cl_name);
2913 }
2914 
2915 DEFINE_RB_FUNCS2(crush_loc, struct crush_loc_node, cl_loc, crush_loc_compare,
2916 		 RB_BYPTR, const struct crush_loc *, cl_node)
2917 
2918 /*
2919  * Parses a set of <bucket type name>':'<bucket name> pairs separated
2920  * by '|', e.g. "rack:foo1|rack:foo2|datacenter:bar".
2921  *
2922  * Note that @crush_location is modified by strsep().
2923  */
2924 int ceph_parse_crush_location(char *crush_location, struct rb_root *locs)
2925 {
2926 	struct crush_loc_node *loc;
2927 	const char *type_name, *name, *colon;
2928 	size_t type_name_len, name_len;
2929 
2930 	dout("%s '%s'\n", __func__, crush_location);
2931 	while ((type_name = strsep(&crush_location, "|"))) {
2932 		colon = strchr(type_name, ':');
2933 		if (!colon)
2934 			return -EINVAL;
2935 
2936 		type_name_len = colon - type_name;
2937 		if (type_name_len == 0)
2938 			return -EINVAL;
2939 
2940 		name = colon + 1;
2941 		name_len = strlen(name);
2942 		if (name_len == 0)
2943 			return -EINVAL;
2944 
2945 		loc = alloc_crush_loc(type_name_len, name_len);
2946 		if (!loc)
2947 			return -ENOMEM;
2948 
2949 		loc->cl_loc.cl_type_name = loc->cl_data;
2950 		memcpy(loc->cl_loc.cl_type_name, type_name, type_name_len);
2951 		loc->cl_loc.cl_type_name[type_name_len] = '\0';
2952 
2953 		loc->cl_loc.cl_name = loc->cl_data + type_name_len + 1;
2954 		memcpy(loc->cl_loc.cl_name, name, name_len);
2955 		loc->cl_loc.cl_name[name_len] = '\0';
2956 
2957 		if (!__insert_crush_loc(locs, loc)) {
2958 			free_crush_loc(loc);
2959 			return -EEXIST;
2960 		}
2961 
2962 		dout("%s type_name '%s' name '%s'\n", __func__,
2963 		     loc->cl_loc.cl_type_name, loc->cl_loc.cl_name);
2964 	}
2965 
2966 	return 0;
2967 }
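
/*
 * Usage sketch (hypothetical values).  strsep() consumes the string,
 * so pass a writable copy:
 *
 *	struct rb_root locs = RB_ROOT;
 *	char *str = kstrdup("host:foo|rack:bar", GFP_KERNEL);
 *
 *	ret = str ? ceph_parse_crush_location(str, &locs) : -ENOMEM;
 *	kfree(str);
 *	...
 *	ceph_clear_crush_locs(&locs);
 */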
2968 
2969 int ceph_compare_crush_locs(struct rb_root *locs1, struct rb_root *locs2)
2970 {
2971 	struct rb_node *n1 = rb_first(locs1);
2972 	struct rb_node *n2 = rb_first(locs2);
2973 	int ret;
2974 
2975 	for ( ; n1 && n2; n1 = rb_next(n1), n2 = rb_next(n2)) {
2976 		struct crush_loc_node *loc1 =
2977 		    rb_entry(n1, struct crush_loc_node, cl_node);
2978 		struct crush_loc_node *loc2 =
2979 		    rb_entry(n2, struct crush_loc_node, cl_node);
2980 
2981 		ret = crush_loc_compare(&loc1->cl_loc, &loc2->cl_loc);
2982 		if (ret)
2983 			return ret;
2984 	}
2985 
2986 	if (!n1 && n2)
2987 		return -1;
2988 	if (n1 && !n2)
2989 		return 1;
2990 	return 0;
2991 }
2992 
2993 void ceph_clear_crush_locs(struct rb_root *locs)
2994 {
2995 	while (!RB_EMPTY_ROOT(locs)) {
2996 		struct crush_loc_node *loc =
2997 		    rb_entry(rb_first(locs), struct crush_loc_node, cl_node);
2998 
2999 		erase_crush_loc(locs, loc);
3000 		free_crush_loc(loc);
3001 	}
3002 }
3003 
3004 /*
3005  * [a-zA-Z0-9-_.]+
3006  */
3007 static bool is_valid_crush_name(const char *name)
3008 {
3009 	do {
3010 		if (!('a' <= *name && *name <= 'z') &&
3011 		    !('A' <= *name && *name <= 'Z') &&
3012 		    !('0' <= *name && *name <= '9') &&
3013 		    *name != '-' && *name != '_' && *name != '.')
3014 			return false;
3015 	} while (*++name != '\0');
3016 
3017 	return true;
3018 }
3019 
3020 /*
3021  * Gets the parent of an item.  Returns its id (<0 because the
3022  * parent is always a bucket), type id (>0 because device type
3023  * is 0 and bucket types are positive, via @parent_type_id) and
3024  * location (via @parent_loc).  If no parent, returns 0.
3025  *
3026  * Does a linear search, as there are no parent pointers of any
3027  * kind.  Note that the result is ambiguous for items that occur
3028  * multiple times in the map.
3029  */
3030 static int get_immediate_parent(struct crush_map *c, int id,
3031 				u16 *parent_type_id,
3032 				struct crush_loc *parent_loc)
3033 {
3034 	struct crush_bucket *b;
3035 	struct crush_name_node *type_cn, *cn;
3036 	int i, j;
3037 
3038 	for (i = 0; i < c->max_buckets; i++) {
3039 		b = c->buckets[i];
3040 		if (!b)
3041 			continue;
3042 
3043 		/* ignore per-class shadow hierarchy */
3044 		cn = lookup_crush_name(&c->names, b->id);
3045 		if (!cn || !is_valid_crush_name(cn->cn_name))
3046 			continue;
3047 
3048 		for (j = 0; j < b->size; j++) {
3049 			if (b->items[j] != id)
3050 				continue;
3051 
3052 			*parent_type_id = b->type;
3053 			type_cn = lookup_crush_name(&c->type_names, b->type);
3054 			parent_loc->cl_type_name = type_cn->cn_name;
3055 			parent_loc->cl_name = cn->cn_name;
3056 			return b->id;
3057 		}
3058 	}
3059 
3060 	return 0;  /* no parent */
3061 }
3062 
3063 /*
3064  * Calculates the locality/distance from an item to a client
3065  * location expressed in terms of CRUSH hierarchy as a set of
3066  * (bucket type name, bucket name) pairs.  Specifically, looks
3067  * for the lowest-valued bucket type for which the location of
3068  * @id matches one of the locations in @locs, so for standard
3069  * bucket types (host = 1, rack = 3, datacenter = 8, zone = 9)
3070  * a matching host is closer than a matching rack and a matching
3071  * data center is closer than a matching zone.
3072  *
3073  * Specifying multiple locations (a "multipath" location) such
3074  * as "rack=foo1 rack=foo2 datacenter=bar" is allowed -- @locs
3075  * is a multimap.  The locality will be:
3076  *
3077  * - 3 for OSDs in racks foo1 and foo2
3078  * - 8 for OSDs in data center bar
3079  * - -1 for all other OSDs
3080  *
3081  * The lowest possible bucket type is 1, so the best locality
3082  * for an OSD is 1 (i.e. a matching host).  Locality 0 would be
3083  * the OSD itself.
3084  */
3085 int ceph_get_crush_locality(struct ceph_osdmap *osdmap, int id,
3086 			    struct rb_root *locs)
3087 {
3088 	struct crush_loc loc;
3089 	u16 type_id;
3090 
3091 	/*
3092 	 * Instead of repeated get_immediate_parent() calls,
3093 	 * the location of @id could be obtained with a single
3094 	 * depth-first traversal.
3095 	 */
3096 	for (;;) {
3097 		id = get_immediate_parent(osdmap->crush, id, &type_id, &loc);
3098 		if (id >= 0)
3099 			return -1;  /* not local */
3100 
3101 		if (lookup_crush_loc(locs, &loc))
3102 			return type_id;
3103 	}
3104 }
3105