/* xref: /linux/fs/bcachefs/rcu_pending.c (revision 1cbfb828e05171ca2dd77b5988d068e6872480fe) */
// SPDX-License-Identifier: GPL-2.0
#define pr_fmt(fmt) "%s() " fmt "\n", __func__

#include <linux/generic-radix-tree.h>
#include <linux/mm.h>
#include <linux/percpu.h>
#include <linux/slab.h>
#include <linux/srcu.h>
#include <linux/vmalloc.h>

#include "rcu_pending.h"
#include "darray.h"
#include "util.h"

#define static_array_for_each(_a, _i)			\
	for (typeof(&(_a)[0]) _i = _a;			\
	     _i < (_a) + ARRAY_SIZE(_a);		\
	     _i++)

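/*
 * Instead of a separate enqueue path per processing style, the "process"
 * callback itself encodes the mode: the small integer values below, cast to
 * function pointers (RCU_PENDING_KVFREE_FN, RCU_PENDING_CALL_RCU_FN), select
 * kvfree()/kfree_bulk() or call_rcu()-style handling; any real function
 * pointer is invoked as a generic per-object callback.
 */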
enum rcu_pending_special {
	RCU_PENDING_KVFREE	= 1,
	RCU_PENDING_CALL_RCU	= 2,
};

#define RCU_PENDING_KVFREE_FN		((rcu_pending_process_fn) (ulong) RCU_PENDING_KVFREE)
#define RCU_PENDING_CALL_RCU_FN		((rcu_pending_process_fn) (ulong) RCU_PENDING_CALL_RCU)

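/*
 * Grace period poll cookies differ between the kernel and the userspace
 * build: in the kernel they are plain unsigned longs, while liburcu's
 * struct urcu_gp_poll_state is compared via its grace_period_id field.
 */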
#ifdef __KERNEL__
typedef unsigned long			rcu_gp_poll_state_t;

static inline bool rcu_gp_poll_cookie_eq(rcu_gp_poll_state_t l, rcu_gp_poll_state_t r)
{
	return l == r;
}
#else
typedef struct urcu_gp_poll_state	rcu_gp_poll_state_t;

static inline bool rcu_gp_poll_cookie_eq(rcu_gp_poll_state_t l, rcu_gp_poll_state_t r)
{
	return l.grace_period_id == r.grace_period_id;
}
#endif

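/*
 * Thin wrappers that dispatch to either SRCU or regular RCU, depending on
 * whether this rcu_pending was set up with an srcu_struct:
 */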
static inline rcu_gp_poll_state_t __get_state_synchronize_rcu(struct srcu_struct *ssp)
{
	return ssp
		? get_state_synchronize_srcu(ssp)
		: get_state_synchronize_rcu();
}

static inline rcu_gp_poll_state_t __start_poll_synchronize_rcu(struct srcu_struct *ssp)
{
	return ssp
		? start_poll_synchronize_srcu(ssp)
		: start_poll_synchronize_rcu();
}

static inline bool __poll_state_synchronize_rcu(struct srcu_struct *ssp, rcu_gp_poll_state_t cookie)
{
	return ssp
		? poll_state_synchronize_srcu(ssp, cookie)
		: poll_state_synchronize_rcu(cookie);
}

static inline void __rcu_barrier(struct srcu_struct *ssp)
{
	return ssp
		? srcu_barrier(ssp)
		: rcu_barrier();
}

static inline void __call_rcu(struct srcu_struct *ssp, struct rcu_head *rhp,
			      rcu_callback_t func)
{
	if (ssp)
		call_srcu(ssp, rhp, func);
	else
		call_rcu(rhp, func);
}

struct rcu_pending_seq {
	/*
	 * We're using a radix tree like a vector - we're just pushing elements
	 * onto the end; we're using a radix tree instead of an actual vector to
	 * avoid reallocation overhead
	 */
	GENRADIX(struct rcu_head *)	objs;
	size_t				nr;
	struct rcu_head			**cursor;
	rcu_gp_poll_state_t		seq;
};

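/*
 * Fallback representation: a singly linked list of rcu_heads, all waiting on
 * the same grace period cookie. Used when radix tree nodes can't be
 * allocated, and always for vmalloc'd pointers in kvfree mode.
 */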
struct rcu_pending_list {
	struct rcu_head			*head;
	struct rcu_head			*tail;
	rcu_gp_poll_state_t		seq;
};

struct rcu_pending_pcpu {
	struct rcu_pending		*parent;
	spinlock_t			lock;
	int				cpu;

	/*
	 * We can't bound the number of unprocessed gp sequence numbers, and we
	 * can't efficiently merge radix trees for expired grace periods, so we
	 * need darray/vector:
	 */
	DARRAY_PREALLOCATED(struct rcu_pending_seq, 4) objs;

	/* Third entry is for expired objects: */
	struct rcu_pending_list		lists[NUM_ACTIVE_RCU_POLL_OLDSTATE + 1];

	struct rcu_head			cb;
	bool				cb_armed;
	struct work_struct		work;
};

static bool __rcu_pending_has_pending(struct rcu_pending_pcpu *p)
{
	if (p->objs.nr)
		return true;

	static_array_for_each(p->lists, i)
		if (i->head)
			return true;

	return false;
}

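/*
 * In the kernel, rcu_head chains directly via ->next; in the userspace build,
 * rcu_head's next member is itself a node structure, hence the ->next.next
 * accesses below.
 */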
static void rcu_pending_list_merge(struct rcu_pending_list *l1,
				   struct rcu_pending_list *l2)
{
#ifdef __KERNEL__
	if (!l1->head)
		l1->head = l2->head;
	else
		l1->tail->next = l2->head;
#else
	if (!l1->head)
		l1->head = l2->head;
	else
		l1->tail->next.next = (void *) l2->head;
#endif

	l1->tail = l2->tail;
	l2->head = l2->tail = NULL;
}

static void rcu_pending_list_add(struct rcu_pending_list *l,
				 struct rcu_head *n)
{
#ifdef __KERNEL__
	if (!l->head)
		l->head = n;
	else
		l->tail->next = n;
	l->tail = n;
	n->next = NULL;
#else
	if (!l->head)
		l->head = n;
	else
		l->tail->next.next = (void *) n;
	l->tail = n;
	n->next.next = NULL;
#endif
}

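/*
 * Move any list whose grace period has already elapsed onto the extra
 * "expired" slot, freeing up the active slots for new cookies.
 */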
static void merge_expired_lists(struct rcu_pending_pcpu *p)
{
	struct rcu_pending_list *expired = &p->lists[NUM_ACTIVE_RCU_POLL_OLDSTATE];

	for (struct rcu_pending_list *i = p->lists; i < expired; i++)
		if (i->head && __poll_state_synchronize_rcu(p->parent->srcu, i->seq))
			rcu_pending_list_merge(expired, i);
}

#ifndef __KERNEL__
/* Userspace shim: free each of the @nr pointers in @p */
static inline void kfree_bulk(size_t nr, void **p)
{
	while (nr--)
		kfree(p[nr]);
}

#define local_irq_save(flags)		\
do {					\
	flags = 0;			\
} while (0)
#endif

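/*
 * Process everything that is known to have completed its grace period: the
 * oldest radix tree batch (if expired) plus the expired lists. Called with
 * p->lock held and irqs disabled; drops the lock before invoking callbacks.
 */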
static noinline void __process_finished_items(struct rcu_pending *pending,
					      struct rcu_pending_pcpu *p,
					      unsigned long flags)
{
	struct rcu_pending_list *expired = &p->lists[NUM_ACTIVE_RCU_POLL_OLDSTATE];
	struct rcu_pending_seq objs = {};
	struct rcu_head *list = NULL;

	if (p->objs.nr &&
	    __poll_state_synchronize_rcu(pending->srcu, p->objs.data[0].seq)) {
		objs = p->objs.data[0];
		darray_remove_item(&p->objs, p->objs.data);
	}

	merge_expired_lists(p);

	list = expired->head;
	expired->head = expired->tail = NULL;

	spin_unlock_irqrestore(&p->lock, flags);

	switch ((ulong) pending->process) {
	case RCU_PENDING_KVFREE:
		for (size_t i = 0; i < objs.nr; ) {
			size_t nr_this_node = min(GENRADIX_NODE_SIZE / sizeof(void *), objs.nr - i);

			kfree_bulk(nr_this_node, (void **) genradix_ptr(&objs.objs, i));
			i += nr_this_node;
		}
		genradix_free(&objs.objs);

		while (list) {
			struct rcu_head *obj = list;
#ifdef __KERNEL__
			list = obj->next;
#else
			list = (void *) obj->next.next;
#endif

			/*
			 * low bit of pointer indicates whether rcu_head needs
			 * to be freed - kvfree_rcu_mightsleep()
			 */
			BUILD_BUG_ON(ARCH_SLAB_MINALIGN == 0);

			void *ptr = (void *)(((unsigned long) obj->func) & ~1UL);
			bool free_head = ((unsigned long) obj->func) & 1UL;

			kvfree(ptr);
			if (free_head)
				kfree(obj);
		}

		break;

	case RCU_PENDING_CALL_RCU:
		for (size_t i = 0; i < objs.nr; i++) {
			struct rcu_head *obj = *genradix_ptr(&objs.objs, i);
			obj->func(obj);
		}
		genradix_free(&objs.objs);

		while (list) {
			struct rcu_head *obj = list;
#ifdef __KERNEL__
			list = obj->next;
#else
			list = (void *) obj->next.next;
#endif
			obj->func(obj);
		}
		break;

	default:
		for (size_t i = 0; i < objs.nr; i++)
			pending->process(pending, *genradix_ptr(&objs.objs, i));
		genradix_free(&objs.objs);

		while (list) {
			struct rcu_head *obj = list;
#ifdef __KERNEL__
			list = obj->next;
#else
			list = (void *) obj->next.next;
#endif
			pending->process(pending, obj);
		}
		break;
	}
}

static bool process_finished_items(struct rcu_pending *pending,
				   struct rcu_pending_pcpu *p,
				   unsigned long flags)
{
	/*
	 * XXX: we should grab the gp seq once and avoid multiple function
	 * calls; this is called from the __rcu_pending_enqueue() fastpath in
	 * may_sleep==true mode
	 */
	if ((p->objs.nr && __poll_state_synchronize_rcu(pending->srcu, p->objs.data[0].seq)) ||
	    (p->lists[0].head && __poll_state_synchronize_rcu(pending->srcu, p->lists[0].seq)) ||
	    (p->lists[1].head && __poll_state_synchronize_rcu(pending->srcu, p->lists[1].seq)) ||
	    p->lists[2].head) {
		__process_finished_items(pending, p, flags);
		return true;
	}

	return false;
}

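/*
 * Workqueue callback: keep reprocessing until nothing further has expired;
 * more grace periods may complete while we're running.
 */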
static void rcu_pending_work(struct work_struct *work)
{
	struct rcu_pending_pcpu *p =
		container_of(work, struct rcu_pending_pcpu, work);
	struct rcu_pending *pending = p->parent;
	unsigned long flags;

	do {
		spin_lock_irqsave(&p->lock, flags);
	} while (process_finished_items(pending, p, flags));

	spin_unlock_irqrestore(&p->lock, flags);
}

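/*
 * RCU (or SRCU) callback: punt the actual processing to process context via
 * the per-CPU work item, and re-arm ourselves if objects are still pending so
 * they eventually get a callback of their own.
 */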
static void rcu_pending_rcu_cb(struct rcu_head *rcu)
{
	struct rcu_pending_pcpu *p = container_of(rcu, struct rcu_pending_pcpu, cb);

	schedule_work_on(p->cpu, &p->work);

	unsigned long flags;
	spin_lock_irqsave(&p->lock, flags);
	if (__rcu_pending_has_pending(p)) {
		spin_unlock_irqrestore(&p->lock, flags);
		__call_rcu(p->parent->srcu, &p->cb, rcu_pending_rcu_cb);
	} else {
		p->cb_armed = false;
		spin_unlock_irqrestore(&p->lock, flags);
	}
}

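/*
 * Find the radix tree batch for grace period cookie @seq, or try to add a new
 * one; allocation is GFP_ATOMIC and may fail, in which case callers fall back
 * to the linked lists.
 */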
static __always_inline struct rcu_pending_seq *
get_object_radix(struct rcu_pending_pcpu *p, rcu_gp_poll_state_t seq)
{
	darray_for_each_reverse(p->objs, objs)
		if (rcu_gp_poll_cookie_eq(objs->seq, seq))
			return objs;

	if (darray_push_gfp(&p->objs, ((struct rcu_pending_seq) { .seq = seq }), GFP_ATOMIC))
		return NULL;

	return &darray_last(p->objs);
}

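/*
 * Slow path: add @head to one of the per-cookie linked lists (allocating an
 * rcu_head here for the kvfree_rcu_mightsleep() case). Returns true if a
 * previously empty list slot was claimed, i.e. the caller may need to start
 * a new grace period.
 */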
static noinline bool
rcu_pending_enqueue_list(struct rcu_pending_pcpu *p, rcu_gp_poll_state_t seq,
			 struct rcu_head *head, void *ptr,
			 unsigned long *flags)
{
	if (ptr) {
		if (!head) {
			/*
			 * kvfree_rcu_mightsleep(): we weren't passed an
			 * rcu_head, but we need one: use the low bit of the
			 * pointer to free to flag that the head needs to be
			 * freed as well:
			 */
			ptr = (void *)(((unsigned long) ptr)|1UL);
			head = kmalloc(sizeof(*head), __GFP_NOWARN);
			if (!head) {
				spin_unlock_irqrestore(&p->lock, *flags);
				head = kmalloc(sizeof(*head), GFP_KERNEL|__GFP_NOFAIL);
				/*
				 * dropped lock, did GFP_KERNEL allocation,
				 * check for gp expiration
				 */
				if (unlikely(__poll_state_synchronize_rcu(p->parent->srcu, seq))) {
					kvfree(--ptr);
					kfree(head);
					spin_lock_irqsave(&p->lock, *flags);
					return false;
				}
			}
		}

		head->func = ptr;
	}
again:
	for (struct rcu_pending_list *i = p->lists;
	     i < p->lists + NUM_ACTIVE_RCU_POLL_OLDSTATE; i++) {
		if (rcu_gp_poll_cookie_eq(i->seq, seq)) {
			rcu_pending_list_add(i, head);
			return false;
		}
	}

	for (struct rcu_pending_list *i = p->lists;
	     i < p->lists + NUM_ACTIVE_RCU_POLL_OLDSTATE; i++) {
		if (!i->head) {
			i->seq = seq;
			rcu_pending_list_add(i, head);
			return true;
		}
	}

	merge_expired_lists(p);
	goto again;
}

/*
 * __rcu_pending_enqueue: enqueue a pending RCU item, to be processed (via
 * pending->process) once a grace period has elapsed.
 *
 * Attempt to enqueue items onto a radix tree; if memory allocation fails, fall
 * back to a linked list.
 *
 * - If @ptr is NULL, we're enqueuing an item for a generic @pending with a
 *   process callback
 *
 * - If @ptr and @head are both not NULL, we're kvfree_rcu()
 *
 * - If @ptr is not NULL and @head is, we're kvfree_rcu_mightsleep()
 *
 * - If @may_sleep is true, will do GFP_KERNEL memory allocations and process
 *   expired items.
 */
static __always_inline void
__rcu_pending_enqueue(struct rcu_pending *pending, struct rcu_head *head,
		      void *ptr, bool may_sleep)
{
	struct rcu_pending_pcpu *p;
	struct rcu_pending_seq *objs;
	struct genradix_node *new_node = NULL;
	unsigned long flags;
	bool start_gp = false;

	BUG_ON((ptr != NULL) != (pending->process == RCU_PENDING_KVFREE_FN));

	local_irq_save(flags);
	p = this_cpu_ptr(pending->p);
	spin_lock(&p->lock);
	rcu_gp_poll_state_t seq = __get_state_synchronize_rcu(pending->srcu);
restart:
	if (may_sleep &&
	    unlikely(process_finished_items(pending, p, flags)))
		goto check_expired;

	/*
	 * In kvfree_rcu() mode, the radix tree is only for slab pointers so
	 * that we can do kfree_bulk() - vmalloc pointers always use the linked
	 * list:
	 */
	if (ptr && unlikely(is_vmalloc_addr(ptr)))
		goto list_add;

	objs = get_object_radix(p, seq);
	if (unlikely(!objs))
		goto list_add;

	if (unlikely(!objs->cursor)) {
		/*
		 * New radix tree nodes must be added under @p->lock because the
		 * tree root is in a darray that can be resized (typically,
		 * genradix supports concurrent unlocked allocation of new
		 * nodes) - hence preallocation and the retry loop:
		 */
		objs->cursor = genradix_ptr_alloc_preallocated_inlined(&objs->objs,
						objs->nr, &new_node, GFP_ATOMIC|__GFP_NOWARN);
		if (unlikely(!objs->cursor)) {
			if (may_sleep) {
				spin_unlock_irqrestore(&p->lock, flags);

				gfp_t gfp = GFP_KERNEL;
				if (!head)
					gfp |= __GFP_NOFAIL;

				new_node = genradix_alloc_node(gfp);
				if (!new_node)
					may_sleep = false;
				goto check_expired;
			}
list_add:
			start_gp = rcu_pending_enqueue_list(p, seq, head, ptr, &flags);
			goto start_gp;
		}
	}

	*objs->cursor++ = ptr ?: head;
	/* zero cursor if we hit the end of a radix tree node: */
	if (!(((ulong) objs->cursor) & (GENRADIX_NODE_SIZE - 1)))
		objs->cursor = NULL;
	start_gp = !objs->nr;
	objs->nr++;
start_gp:
	if (unlikely(start_gp)) {
		/*
		 * We only have one callback (ideally, we would have one for
		 * every outstanding grace period) - so if our callback is
		 * already in flight, we may still have to start a grace period
		 * (since we used get_state() above, not start_poll())
		 */
		if (!p->cb_armed) {
			p->cb_armed = true;
			__call_rcu(pending->srcu, &p->cb, rcu_pending_rcu_cb);
		} else {
			__start_poll_synchronize_rcu(pending->srcu);
		}
	}
	spin_unlock_irqrestore(&p->lock, flags);
free_node:
	if (new_node)
		genradix_free_node(new_node);
	return;
check_expired:
	if (unlikely(__poll_state_synchronize_rcu(pending->srcu, seq))) {
		switch ((ulong) pending->process) {
		case RCU_PENDING_KVFREE:
			kvfree(ptr);
			break;
		case RCU_PENDING_CALL_RCU:
			head->func(head);
			break;
		default:
			pending->process(pending, head);
			break;
		}
		goto free_node;
	}

	local_irq_save(flags);
	p = this_cpu_ptr(pending->p);
	spin_lock(&p->lock);
	goto restart;
}

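/*
 * Usage, as a minimal sketch (struct my_obj and my_process are hypothetical,
 * error handling elided):
 *
 *	static void my_process(struct rcu_pending *pending, struct rcu_head *obj)
 *	{
 *		kfree(container_of(obj, struct my_obj, rcu));
 *	}
 *
 *	rcu_pending_init(&pending, NULL, my_process);
 *	rcu_pending_enqueue(&pending, &obj->rcu);
 *	...
 *	rcu_pending_exit(&pending);
 *
 * rcu_pending_enqueue() queues @obj to be passed to the process callback
 * after a grace period has elapsed; it may sleep.
 */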
void rcu_pending_enqueue(struct rcu_pending *pending, struct rcu_head *obj)
{
	__rcu_pending_enqueue(pending, obj, NULL, true);
}

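/*
 * Dequeue: pop a still-pending object back off a CPU's queues, without
 * waiting for its grace period - the caller takes back ownership. Radix tree
 * batches are drained before the linked lists.
 */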
static struct rcu_head *rcu_pending_pcpu_dequeue(struct rcu_pending_pcpu *p)
{
	struct rcu_head *ret = NULL;

	spin_lock_irq(&p->lock);
	darray_for_each(p->objs, objs)
		if (objs->nr) {
			ret = *genradix_ptr(&objs->objs, --objs->nr);
			objs->cursor = NULL;
			if (!objs->nr)
				genradix_free(&objs->objs);
			goto out;
		}

	static_array_for_each(p->lists, i)
		if (i->head) {
			ret = i->head;
#ifdef __KERNEL__
			i->head = ret->next;
#else
			i->head = (void *) ret->next.next;
#endif
			if (!i->head)
				i->tail = NULL;
			goto out;
		}
out:
	spin_unlock_irq(&p->lock);

	return ret;
}

struct rcu_head *rcu_pending_dequeue(struct rcu_pending *pending)
{
	return rcu_pending_pcpu_dequeue(raw_cpu_ptr(pending->p));
}

struct rcu_head *rcu_pending_dequeue_from_all(struct rcu_pending *pending)
{
	struct rcu_head *ret = rcu_pending_dequeue(pending);

	if (ret)
		return ret;

	int cpu;
	for_each_possible_cpu(cpu) {
		ret = rcu_pending_pcpu_dequeue(per_cpu_ptr(pending->p, cpu));
		if (ret)
			break;
	}
	return ret;
}

static bool rcu_pending_has_pending_or_armed(struct rcu_pending *pending)
{
	int cpu;
	for_each_possible_cpu(cpu) {
		struct rcu_pending_pcpu *p = per_cpu_ptr(pending->p, cpu);
		spin_lock_irq(&p->lock);
		if (__rcu_pending_has_pending(p) || p->cb_armed) {
			spin_unlock_irq(&p->lock);
			return true;
		}
		spin_unlock_irq(&p->lock);
	}

	return false;
}

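/*
 * Tear down a rcu_pending: wait for outstanding grace periods and callbacks,
 * flush the per-CPU work items, then check that nothing is left queued before
 * freeing the per-CPU state.
 */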
void rcu_pending_exit(struct rcu_pending *pending)
{
	int cpu;

	if (!pending->p)
		return;

	while (rcu_pending_has_pending_or_armed(pending)) {
		__rcu_barrier(pending->srcu);

		for_each_possible_cpu(cpu) {
			struct rcu_pending_pcpu *p = per_cpu_ptr(pending->p, cpu);
			flush_work(&p->work);
		}
	}

	for_each_possible_cpu(cpu) {
		struct rcu_pending_pcpu *p = per_cpu_ptr(pending->p, cpu);
		flush_work(&p->work);
	}

	for_each_possible_cpu(cpu) {
		struct rcu_pending_pcpu *p = per_cpu_ptr(pending->p, cpu);

		static_array_for_each(p->lists, i)
			WARN_ON(i->head);
		WARN_ON(p->objs.nr);
		darray_exit(&p->objs);
	}
	free_percpu(pending->p);
}

/**
 * rcu_pending_init - initialize a rcu_pending
 *
 * @pending:	Object to init
 * @srcu:	Optional srcu_struct; if NULL, the normal RCU flavor is used
 * @process:	Callback invoked on each object once its grace period has
 *		elapsed; the special values RCU_PENDING_KVFREE_FN and
 *		RCU_PENDING_CALL_RCU_FN select kvfree() and call_rcu()
 *		handling instead of a normal callback.
 */
int rcu_pending_init(struct rcu_pending *pending,
		     struct srcu_struct *srcu,
		     rcu_pending_process_fn process)
{
	pending->p = alloc_percpu(struct rcu_pending_pcpu);
	if (!pending->p)
		return -ENOMEM;

	int cpu;
	for_each_possible_cpu(cpu) {
		struct rcu_pending_pcpu *p = per_cpu_ptr(pending->p, cpu);
		p->parent	= pending;
		p->cpu		= cpu;
		spin_lock_init(&p->lock);
		darray_init(&p->objs);
		INIT_WORK(&p->work, rcu_pending_work);
	}

	pending->srcu = srcu;
	pending->process = process;

	return 0;
}