xref: /linux/fs/bcachefs/rcu_pending.c (revision 13d68a16430312fc21990f48326366eb73891202)
1 // SPDX-License-Identifier: GPL-2.0
2 #define pr_fmt(fmt) "%s() " fmt "\n", __func__
3 
4 #include <linux/generic-radix-tree.h>
5 #include <linux/mm.h>
6 #include <linux/percpu.h>
7 #include <linux/slab.h>
8 #include <linux/srcu.h>
9 #include <linux/vmalloc.h>
10 
11 #include "rcu_pending.h"
12 #include "darray.h"
13 #include "util.h"
14 
15 #define static_array_for_each(_a, _i)			\
16 	for (typeof(&(_a)[0]) _i = _a;			\
17 	     _i < (_a) + ARRAY_SIZE(_a);		\
18 	     _i++)
19 
20 enum rcu_pending_special {
21 	RCU_PENDING_KVFREE	= 1,
22 	RCU_PENDING_CALL_RCU	= 2,
23 };
24 
25 #define RCU_PENDING_KVFREE_FN		((rcu_pending_process_fn) (ulong) RCU_PENDING_KVFREE)
26 #define RCU_PENDING_CALL_RCU_FN		((rcu_pending_process_fn) (ulong) RCU_PENDING_CALL_RCU)
27 
28 static inline unsigned long __get_state_synchronize_rcu(struct srcu_struct *ssp)
29 {
30 	return ssp
31 		? get_state_synchronize_srcu(ssp)
32 		: get_state_synchronize_rcu();
33 }
34 
35 static inline unsigned long __start_poll_synchronize_rcu(struct srcu_struct *ssp)
36 {
37 	return ssp
38 		? start_poll_synchronize_srcu(ssp)
39 		: start_poll_synchronize_rcu();
40 }
41 
42 static inline bool __poll_state_synchronize_rcu(struct srcu_struct *ssp, unsigned long cookie)
43 {
44 	return ssp
45 		? poll_state_synchronize_srcu(ssp, cookie)
46 		: poll_state_synchronize_rcu(cookie);
47 }
48 
49 static inline void __rcu_barrier(struct srcu_struct *ssp)
50 {
51 	return ssp
52 		? srcu_barrier(ssp)
53 		: rcu_barrier();
54 }
55 
56 static inline void __call_rcu(struct srcu_struct *ssp, struct rcu_head *rhp,
57 			      rcu_callback_t func)
58 {
59 	if (ssp)
60 		call_srcu(ssp, rhp, func);
61 	else
62 		call_rcu(rhp, func);
63 }
64 
65 struct rcu_pending_seq {
66 	/*
67 	 * We're using a radix tree like a vector - we're just pushing elements
68 	 * onto the end; we're using a radix tree instead of an actual vector to
69 	 * avoid reallocation overhead
70 	 */
71 	GENRADIX(struct rcu_head *)	objs;
72 	size_t				nr;
73 	struct rcu_head			**cursor;
74 	unsigned long			seq;
75 };
76 
77 struct rcu_pending_list {
78 	struct rcu_head			*head;
79 	struct rcu_head			*tail;
80 	unsigned long			seq;
81 };
82 
83 struct rcu_pending_pcpu {
84 	struct rcu_pending		*parent;
85 	spinlock_t			lock;
86 	int				cpu;
87 
88 	/*
89 	 * We can't bound the number of unprocessed gp sequence numbers, and we
90 	 * can't efficiently merge radix trees for expired grace periods, so we
91 	 * need darray/vector:
92 	 */
93 	DARRAY_PREALLOCATED(struct rcu_pending_seq, 4) objs;
94 
95 	/* Third entry is for expired objects: */
96 	struct rcu_pending_list		lists[NUM_ACTIVE_RCU_POLL_OLDSTATE + 1];
97 
98 	struct rcu_head			cb;
99 	bool				cb_armed;
100 	struct work_struct		work;
101 };
102 
103 static bool __rcu_pending_has_pending(struct rcu_pending_pcpu *p)
104 {
105 	if (p->objs.nr)
106 		return true;
107 
108 	static_array_for_each(p->lists, i)
109 		if (i->head)
110 			return true;
111 
112 	return false;
113 }
114 
115 static void rcu_pending_list_merge(struct rcu_pending_list *l1,
116 				   struct rcu_pending_list *l2)
117 {
118 #ifdef __KERNEL__
119 	if (!l1->head)
120 		l1->head = l2->head;
121 	else
122 		l1->tail->next = l2->head;
123 #else
124 	if (!l1->head)
125 		l1->head = l2->head;
126 	else
127 		l1->tail->next.next = (void *) l2->head;
128 #endif
129 
130 	l1->tail = l2->tail;
131 	l2->head = l2->tail = NULL;
132 }
133 
134 static void rcu_pending_list_add(struct rcu_pending_list *l,
135 				 struct rcu_head *n)
136 {
137 #ifdef __KERNEL__
138 	if (!l->head)
139 		l->head = n;
140 	else
141 		l->tail->next = n;
142 	l->tail = n;
143 	n->next = NULL;
144 #else
145 	if (!l->head)
146 		l->head = n;
147 	else
148 		l->tail->next.next = (void *) n;
149 	l->tail = n;
150 	n->next.next = NULL;
151 #endif
152 }
153 
154 static void merge_expired_lists(struct rcu_pending_pcpu *p)
155 {
156 	struct rcu_pending_list *expired = &p->lists[NUM_ACTIVE_RCU_POLL_OLDSTATE];
157 
158 	for (struct rcu_pending_list *i = p->lists; i < expired; i++)
159 		if (i->head && __poll_state_synchronize_rcu(p->parent->srcu, i->seq))
160 			rcu_pending_list_merge(expired, i);
161 }
162 
163 #ifndef __KERNEL__
164 static inline void kfree_bulk(size_t nr, void ** p)
165 {
166 	while (nr--)
167 		kfree(*p);
168 }
169 
170 #define local_irq_save(flags)		\
171 do {					\
172 	flags = 0;			\
173 } while (0)
174 #endif
175 
176 static noinline void __process_finished_items(struct rcu_pending *pending,
177 					      struct rcu_pending_pcpu *p,
178 					      unsigned long flags)
179 {
180 	struct rcu_pending_list *expired = &p->lists[NUM_ACTIVE_RCU_POLL_OLDSTATE];
181 	struct rcu_pending_seq objs = {};
182 	struct rcu_head *list = NULL;
183 
184 	if (p->objs.nr &&
185 	    __poll_state_synchronize_rcu(pending->srcu, p->objs.data[0].seq)) {
186 		objs = p->objs.data[0];
187 		darray_remove_item(&p->objs, p->objs.data);
188 	}
189 
190 	merge_expired_lists(p);
191 
192 	list = expired->head;
193 	expired->head = expired->tail = NULL;
194 
195 	spin_unlock_irqrestore(&p->lock, flags);
196 
197 	switch ((ulong) pending->process) {
198 	case RCU_PENDING_KVFREE:
199 		for (size_t i = 0; i < objs.nr; ) {
200 			size_t nr_this_node = min(GENRADIX_NODE_SIZE / sizeof(void *), objs.nr - i);
201 
202 			kfree_bulk(nr_this_node, (void **) genradix_ptr(&objs.objs, i));
203 			i += nr_this_node;
204 		}
205 		genradix_free(&objs.objs);
206 
207 		while (list) {
208 			struct rcu_head *obj = list;
209 #ifdef __KERNEL__
210 			list = obj->next;
211 #else
212 			list = (void *) obj->next.next;
213 #endif
214 
215 			/*
216 			 * low bit of pointer indicates whether rcu_head needs
217 			 * to be freed - kvfree_rcu_mightsleep()
218 			 */
219 			BUILD_BUG_ON(ARCH_SLAB_MINALIGN == 0);
220 
221 			void *ptr = (void *)(((unsigned long) obj->func) & ~1UL);
222 			bool free_head = ((unsigned long) obj->func) & 1UL;
223 
224 			kvfree(ptr);
225 			if (free_head)
226 				kfree(obj);
227 		}
228 
229 		break;
230 
231 	case RCU_PENDING_CALL_RCU:
232 		for (size_t i = 0; i < objs.nr; i++) {
233 			struct rcu_head *obj = *genradix_ptr(&objs.objs, i);
234 			obj->func(obj);
235 		}
236 		genradix_free(&objs.objs);
237 
238 		while (list) {
239 			struct rcu_head *obj = list;
240 #ifdef __KERNEL__
241 			list = obj->next;
242 #else
243 			list = (void *) obj->next.next;
244 #endif
245 			obj->func(obj);
246 		}
247 		break;
248 
249 	default:
250 		for (size_t i = 0; i < objs.nr; i++)
251 			pending->process(pending, *genradix_ptr(&objs.objs, i));
252 		genradix_free(&objs.objs);
253 
254 		while (list) {
255 			struct rcu_head *obj = list;
256 #ifdef __KERNEL__
257 			list = obj->next;
258 #else
259 			list = (void *) obj->next.next;
260 #endif
261 			pending->process(pending, obj);
262 		}
263 		break;
264 	}
265 }
266 
267 static bool process_finished_items(struct rcu_pending *pending,
268 				   struct rcu_pending_pcpu *p,
269 				   unsigned long flags)
270 {
271 	/*
272 	 * XXX: we should grab the gp seq once and avoid multiple function
273 	 * calls, this is called from __rcu_pending_enqueue() fastpath in
274 	 * may_sleep==true mode
275 	 */
276 	if ((p->objs.nr && __poll_state_synchronize_rcu(pending->srcu, p->objs.data[0].seq)) ||
277 	    (p->lists[0].head && __poll_state_synchronize_rcu(pending->srcu, p->lists[0].seq)) ||
278 	    (p->lists[1].head && __poll_state_synchronize_rcu(pending->srcu, p->lists[1].seq)) ||
279 	    p->lists[2].head) {
280 		__process_finished_items(pending, p, flags);
281 		return true;
282 	}
283 
284 	return false;
285 }
286 
287 static void rcu_pending_work(struct work_struct *work)
288 {
289 	struct rcu_pending_pcpu *p =
290 		container_of(work, struct rcu_pending_pcpu, work);
291 	struct rcu_pending *pending = p->parent;
292 	unsigned long flags;
293 
294 	do {
295 		spin_lock_irqsave(&p->lock, flags);
296 	} while (process_finished_items(pending, p, flags));
297 
298 	spin_unlock_irqrestore(&p->lock, flags);
299 }
300 
301 static void rcu_pending_rcu_cb(struct rcu_head *rcu)
302 {
303 	struct rcu_pending_pcpu *p = container_of(rcu, struct rcu_pending_pcpu, cb);
304 
305 	schedule_work_on(p->cpu, &p->work);
306 
307 	unsigned long flags;
308 	spin_lock_irqsave(&p->lock, flags);
309 	if (__rcu_pending_has_pending(p)) {
310 		spin_unlock_irqrestore(&p->lock, flags);
311 		__call_rcu(p->parent->srcu, &p->cb, rcu_pending_rcu_cb);
312 	} else {
313 		p->cb_armed = false;
314 		spin_unlock_irqrestore(&p->lock, flags);
315 	}
316 }
317 
318 static __always_inline struct rcu_pending_seq *
319 get_object_radix(struct rcu_pending_pcpu *p, unsigned long seq)
320 {
321 	darray_for_each_reverse(p->objs, objs)
322 		if (objs->seq == seq)
323 			return objs;
324 
325 	if (darray_push_gfp(&p->objs, ((struct rcu_pending_seq) { .seq = seq }), GFP_ATOMIC))
326 		return NULL;
327 
328 	return &darray_last(p->objs);
329 }
330 
331 static noinline bool
332 rcu_pending_enqueue_list(struct rcu_pending_pcpu *p, unsigned long seq,
333 			 struct rcu_head *head, void *ptr,
334 			 unsigned long *flags)
335 {
336 	if (ptr) {
337 		if (!head) {
338 			/*
339 			 * kvfree_rcu_mightsleep(): we weren't passed an
340 			 * rcu_head, but we need one: use the low bit of the
341 			 * ponter to free to flag that the head needs to be
342 			 * freed as well:
343 			 */
344 			ptr = (void *)(((unsigned long) ptr)|1UL);
345 			head = kmalloc(sizeof(*head), __GFP_NOWARN);
346 			if (!head) {
347 				spin_unlock_irqrestore(&p->lock, *flags);
348 				head = kmalloc(sizeof(*head), GFP_KERNEL|__GFP_NOFAIL);
349 				/*
350 				 * dropped lock, did GFP_KERNEL allocation,
351 				 * check for gp expiration
352 				 */
353 				if (unlikely(__poll_state_synchronize_rcu(p->parent->srcu, seq))) {
354 					kvfree(--ptr);
355 					kfree(head);
356 					spin_lock_irqsave(&p->lock, *flags);
357 					return false;
358 				}
359 			}
360 		}
361 
362 		head->func = ptr;
363 	}
364 again:
365 	for (struct rcu_pending_list *i = p->lists;
366 	     i < p->lists + NUM_ACTIVE_RCU_POLL_OLDSTATE; i++) {
367 		if (i->seq == seq) {
368 			rcu_pending_list_add(i, head);
369 			return false;
370 		}
371 	}
372 
373 	for (struct rcu_pending_list *i = p->lists;
374 	     i < p->lists + NUM_ACTIVE_RCU_POLL_OLDSTATE; i++) {
375 		if (!i->head) {
376 			i->seq = seq;
377 			rcu_pending_list_add(i, head);
378 			return true;
379 		}
380 	}
381 
382 	merge_expired_lists(p);
383 	goto again;
384 }
385 
386 /*
387  * __rcu_pending_enqueue: enqueue a pending RCU item, to be processed (via
388  * pending->pracess) once grace period elapses.
389  *
390  * Attempt to enqueue items onto a radix tree; if memory allocation fails, fall
391  * back to a linked list.
392  *
393  * - If @ptr is NULL, we're enqueuing an item for a generic @pending with a
394  *   process callback
395  *
396  * - If @ptr and @head are both not NULL, we're kvfree_rcu()
397  *
398  * - If @ptr is not NULL and @head is, we're kvfree_rcu_mightsleep()
399  *
400  * - If @may_sleep is true, will do GFP_KERNEL memory allocations and process
401  *   expired items.
402  */
403 static __always_inline void
404 __rcu_pending_enqueue(struct rcu_pending *pending, struct rcu_head *head,
405 		      void *ptr, bool may_sleep)
406 {
407 
408 	struct rcu_pending_pcpu *p;
409 	struct rcu_pending_seq *objs;
410 	struct genradix_node *new_node = NULL;
411 	unsigned long seq, flags;
412 	bool start_gp = false;
413 
414 	BUG_ON((ptr != NULL) != (pending->process == RCU_PENDING_KVFREE_FN));
415 
416 	local_irq_save(flags);
417 	p = this_cpu_ptr(pending->p);
418 	spin_lock(&p->lock);
419 	seq = __get_state_synchronize_rcu(pending->srcu);
420 restart:
421 	if (may_sleep &&
422 	    unlikely(process_finished_items(pending, p, flags)))
423 		goto check_expired;
424 
425 	/*
426 	 * In kvfree_rcu() mode, the radix tree is only for slab pointers so
427 	 * that we can do kfree_bulk() - vmalloc pointers always use the linked
428 	 * list:
429 	 */
430 	if (ptr && unlikely(is_vmalloc_addr(ptr)))
431 		goto list_add;
432 
433 	objs = get_object_radix(p, seq);
434 	if (unlikely(!objs))
435 		goto list_add;
436 
437 	if (unlikely(!objs->cursor)) {
438 		/*
439 		 * New radix tree nodes must be added under @p->lock because the
440 		 * tree root is in a darray that can be resized (typically,
441 		 * genradix supports concurrent unlocked allocation of new
442 		 * nodes) - hence preallocation and the retry loop:
443 		 */
444 		objs->cursor = genradix_ptr_alloc_preallocated_inlined(&objs->objs,
445 						objs->nr, &new_node, GFP_ATOMIC|__GFP_NOWARN);
446 		if (unlikely(!objs->cursor)) {
447 			if (may_sleep) {
448 				spin_unlock_irqrestore(&p->lock, flags);
449 
450 				gfp_t gfp = GFP_KERNEL;
451 				if (!head)
452 					gfp |= __GFP_NOFAIL;
453 
454 				new_node = genradix_alloc_node(gfp);
455 				if (!new_node)
456 					may_sleep = false;
457 				goto check_expired;
458 			}
459 list_add:
460 			start_gp = rcu_pending_enqueue_list(p, seq, head, ptr, &flags);
461 			goto start_gp;
462 		}
463 	}
464 
465 	*objs->cursor++ = ptr ?: head;
466 	/* zero cursor if we hit the end of a radix tree node: */
467 	if (!(((ulong) objs->cursor) & (GENRADIX_NODE_SIZE - 1)))
468 		objs->cursor = NULL;
469 	start_gp = !objs->nr;
470 	objs->nr++;
471 start_gp:
472 	if (unlikely(start_gp)) {
473 		/*
474 		 * We only have one callback (ideally, we would have one for
475 		 * every outstanding graceperiod) - so if our callback is
476 		 * already in flight, we may still have to start a grace period
477 		 * (since we used get_state() above, not start_poll())
478 		 */
479 		if (!p->cb_armed) {
480 			p->cb_armed = true;
481 			__call_rcu(pending->srcu, &p->cb, rcu_pending_rcu_cb);
482 		} else {
483 			__start_poll_synchronize_rcu(pending->srcu);
484 		}
485 	}
486 	spin_unlock_irqrestore(&p->lock, flags);
487 free_node:
488 	if (new_node)
489 		genradix_free_node(new_node);
490 	return;
491 check_expired:
492 	if (unlikely(__poll_state_synchronize_rcu(pending->srcu, seq))) {
493 		switch ((ulong) pending->process) {
494 		case RCU_PENDING_KVFREE:
495 			kvfree(ptr);
496 			break;
497 		case RCU_PENDING_CALL_RCU:
498 			head->func(head);
499 			break;
500 		default:
501 			pending->process(pending, head);
502 			break;
503 		}
504 		goto free_node;
505 	}
506 
507 	local_irq_save(flags);
508 	p = this_cpu_ptr(pending->p);
509 	spin_lock(&p->lock);
510 	goto restart;
511 }
512 
513 void rcu_pending_enqueue(struct rcu_pending *pending, struct rcu_head *obj)
514 {
515 	__rcu_pending_enqueue(pending, obj, NULL, true);
516 }
517 
518 static struct rcu_head *rcu_pending_pcpu_dequeue(struct rcu_pending_pcpu *p)
519 {
520 	struct rcu_head *ret = NULL;
521 
522 	spin_lock_irq(&p->lock);
523 	darray_for_each(p->objs, objs)
524 		if (objs->nr) {
525 			ret = *genradix_ptr(&objs->objs, --objs->nr);
526 			objs->cursor = NULL;
527 			if (!objs->nr)
528 				genradix_free(&objs->objs);
529 			goto out;
530 		}
531 
532 	static_array_for_each(p->lists, i)
533 		if (i->head) {
534 			ret = i->head;
535 #ifdef __KERNEL__
536 			i->head = ret->next;
537 #else
538 			i->head = (void *) ret->next.next;
539 #endif
540 			if (!i->head)
541 				i->tail = NULL;
542 			goto out;
543 		}
544 out:
545 	spin_unlock_irq(&p->lock);
546 
547 	return ret;
548 }
549 
550 struct rcu_head *rcu_pending_dequeue(struct rcu_pending *pending)
551 {
552 	return rcu_pending_pcpu_dequeue(raw_cpu_ptr(pending->p));
553 }
554 
555 struct rcu_head *rcu_pending_dequeue_from_all(struct rcu_pending *pending)
556 {
557 	struct rcu_head *ret = rcu_pending_dequeue(pending);
558 
559 	if (ret)
560 		return ret;
561 
562 	int cpu;
563 	for_each_possible_cpu(cpu) {
564 		ret = rcu_pending_pcpu_dequeue(per_cpu_ptr(pending->p, cpu));
565 		if (ret)
566 			break;
567 	}
568 	return ret;
569 }
570 
571 static bool rcu_pending_has_pending_or_armed(struct rcu_pending *pending)
572 {
573 	int cpu;
574 	for_each_possible_cpu(cpu) {
575 		struct rcu_pending_pcpu *p = per_cpu_ptr(pending->p, cpu);
576 		spin_lock_irq(&p->lock);
577 		if (__rcu_pending_has_pending(p) || p->cb_armed) {
578 			spin_unlock_irq(&p->lock);
579 			return true;
580 		}
581 		spin_unlock_irq(&p->lock);
582 	}
583 
584 	return false;
585 }
586 
587 void rcu_pending_exit(struct rcu_pending *pending)
588 {
589 	int cpu;
590 
591 	if (!pending->p)
592 		return;
593 
594 	while (rcu_pending_has_pending_or_armed(pending)) {
595 		__rcu_barrier(pending->srcu);
596 
597 		for_each_possible_cpu(cpu) {
598 			struct rcu_pending_pcpu *p = per_cpu_ptr(pending->p, cpu);
599 			flush_work(&p->work);
600 		}
601 	}
602 
603 	for_each_possible_cpu(cpu) {
604 		struct rcu_pending_pcpu *p = per_cpu_ptr(pending->p, cpu);
605 		flush_work(&p->work);
606 	}
607 
608 	for_each_possible_cpu(cpu) {
609 		struct rcu_pending_pcpu *p = per_cpu_ptr(pending->p, cpu);
610 
611 		static_array_for_each(p->lists, i)
612 			WARN_ON(i->head);
613 		WARN_ON(p->objs.nr);
614 		darray_exit(&p->objs);
615 	}
616 	free_percpu(pending->p);
617 }
618 
619 /**
620  * rcu_pending_init: - initialize a rcu_pending
621  *
622  * @pending:	Object to init
623  * @srcu:	May optionally be used with an srcu_struct; if NULL, uses normal
624  *		RCU flavor
625  * @process:	Callback function invoked on objects once their RCU barriers
626  *		have completed; if NULL, kvfree() is used.
627  */
628 int rcu_pending_init(struct rcu_pending *pending,
629 		     struct srcu_struct *srcu,
630 		     rcu_pending_process_fn process)
631 {
632 	pending->p = alloc_percpu(struct rcu_pending_pcpu);
633 	if (!pending->p)
634 		return -ENOMEM;
635 
636 	int cpu;
637 	for_each_possible_cpu(cpu) {
638 		struct rcu_pending_pcpu *p = per_cpu_ptr(pending->p, cpu);
639 		p->parent	= pending;
640 		p->cpu		= cpu;
641 		spin_lock_init(&p->lock);
642 		darray_init(&p->objs);
643 		INIT_WORK(&p->work, rcu_pending_work);
644 	}
645 
646 	pending->srcu = srcu;
647 	pending->process = process;
648 
649 	return 0;
650 }
651