xref: /freebsd/sys/sys/buf_ring.h (revision 90cd9c203eb536581e19a6fdfe43e6dedb22089f)
/*-
 * SPDX-License-Identifier: BSD-2-Clause
 *
 * Copyright (c) 2007-2009 Kip Macy <kmacy@freebsd.org>
 * All rights reserved.
 * Copyright (c) 2024 Arm Ltd
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
 * SUCH DAMAGE.
 *
 */

#ifndef	_SYS_BUF_RING_H_
#define	_SYS_BUF_RING_H_

#include <sys/param.h>
#include <sys/kassert.h>
#include <machine/atomic.h>
#include <machine/cpu.h>

#if defined(DEBUG_BUFRING) && defined(_KERNEL)
#include <sys/lock.h>
#include <sys/mutex.h>
#endif

/*
 * We only apply the mask to the head and tail values when calculating the
 * index into br_ring to access. This means the upper bits can be used as
 * an epoch to reduce the chance the atomic_cmpset succeeds when it should
 * fail, e.g. when the head wraps while the CPU is in an interrupt. This
 * is a probabilistic fix as there is still a very unlikely chance the
 * value wraps back to the expected value.
 */
struct buf_ring {
	uint32_t		br_prod_head;
	uint32_t		br_prod_tail;
	int			br_prod_size;
	int			br_prod_mask;
	uint64_t		br_drops;
	uint32_t		br_cons_head __aligned(CACHE_LINE_SIZE);
	uint32_t		br_cons_tail;
	int			br_cons_size;
	int			br_cons_mask;
#if defined(DEBUG_BUFRING) && defined(_KERNEL)
	struct mtx		*br_lock;
#endif
	void			*br_ring[0] __aligned(CACHE_LINE_SIZE);
};
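
/*
 * Worked example of the index/epoch split above: with a ring of size 8
 * the mask is 0x7, so a head value of 0x00000009 selects slot
 * 0x9 & 0x7 = 1 while the remaining upper bits (here 1) act as the
 * epoch. If the head wraps the whole ring while a CPU is preempted,
 * the slot index repeats but the epoch differs, so a stale
 * atomic_cmpset still fails as intended.
 */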

/*
 * multi-producer safe lock-free ring buffer enqueue
 */
static __inline int
buf_ring_enqueue(struct buf_ring *br, void *buf)
{
	uint32_t prod_head, prod_next, prod_idx;
	uint32_t cons_tail, mask;

	mask = br->br_prod_mask;
#ifdef DEBUG_BUFRING
	/*
	 * Note: It is possible to encounter an mbuf that was removed
	 * via drbr_peek(), and then re-added via drbr_putback() and
	 * trigger a spurious panic.
	 */
	for (uint32_t i = atomic_load_32(&br->br_cons_head);
	    i != atomic_load_32(&br->br_prod_head); i++)
		if (br->br_ring[i & mask] == buf)
			panic("buf=%p already enqueued at %d prod=%d cons=%d",
			    buf, i, atomic_load_32(&br->br_prod_tail),
			    atomic_load_32(&br->br_cons_tail));
#endif
	critical_enter();
	do {
		/*
		 * br->br_prod_head needs to be read before br->br_cons_tail.
		 * If not, a dequeue and enqueue could take place between
		 * reading br_cons_tail and reading br_prod_head. This
		 * could give us values where br_cons_head == br_prod_tail
		 * (after masking).
		 *
		 * To work around this use a load acquire. This is just to
		 * ensure ordering within this thread.
		 */
		prod_head = atomic_load_acq_32(&br->br_prod_head);
		prod_next = prod_head + 1;
		cons_tail = atomic_load_acq_32(&br->br_cons_tail);

		if ((int32_t)(cons_tail + br->br_prod_size - prod_next) < 1) {
			rmb();
			if (prod_head == atomic_load_32(&br->br_prod_head) &&
			    cons_tail == atomic_load_32(&br->br_cons_tail)) {
				br->br_drops++;
				critical_exit();
				return (ENOBUFS);
			}
			continue;
		}
	} while (!atomic_cmpset_acq_32(&br->br_prod_head, prod_head, prod_next));
	prod_idx = prod_head & mask;
#ifdef DEBUG_BUFRING
	if (br->br_ring[prod_idx] != NULL)
		panic("dangling value in enqueue");
#endif
	br->br_ring[prod_idx] = buf;

	/*
	 * If there are other enqueues in progress
	 * that preceded us, we need to wait for them
	 * to complete.
	 */
	while (atomic_load_32(&br->br_prod_tail) != prod_head)
		cpu_spinwait();
	atomic_store_rel_32(&br->br_prod_tail, prod_next);
	critical_exit();
	return (0);
}
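
/*
 * Example producer (an illustrative sketch, not part of this header):
 * queueing an mbuf from a driver's transmit path. The softc layout and
 * `sc_br' member are hypothetical.
 */
#if 0
static int
example_transmit(struct example_softc *sc, struct mbuf *m)
{
	int error;

	/* Safe to call concurrently from any number of producer threads. */
	error = buf_ring_enqueue(sc->sc_br, m);
	if (error == ENOBUFS) {
		/* Ring full: br_drops has already been incremented. */
		m_freem(m);
	}
	return (error);
}
#endif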

/*
 * multi-consumer safe dequeue
 */
static __inline void *
buf_ring_dequeue_mc(struct buf_ring *br)
{
	uint32_t cons_head, cons_next, cons_idx;
	uint32_t prod_tail, mask;
	void *buf;

	critical_enter();
	mask = br->br_cons_mask;
	do {
		/*
		 * As with buf_ring_enqueue ensure we read the head before
		 * the tail. If we read them in the wrong order we may
		 * think the buf_ring is full when it is empty.
		 */
		cons_head = atomic_load_acq_32(&br->br_cons_head);
		cons_next = cons_head + 1;
		prod_tail = atomic_load_acq_32(&br->br_prod_tail);

		if (cons_head == prod_tail) {
			critical_exit();
			return (NULL);
		}
	} while (!atomic_cmpset_acq_32(&br->br_cons_head, cons_head, cons_next));
	cons_idx = cons_head & mask;

	buf = br->br_ring[cons_idx];
#ifdef DEBUG_BUFRING
	br->br_ring[cons_idx] = NULL;
#endif
	/*
	 * If there are other dequeues in progress
	 * that preceded us, we need to wait for them
	 * to complete.
	 */
	while (atomic_load_32(&br->br_cons_tail) != cons_head)
		cpu_spinwait();

	atomic_store_rel_32(&br->br_cons_tail, cons_next);
	critical_exit();

	return (buf);
}
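
/*
 * Example drain loop (an illustrative sketch, not part of this header):
 * may run on several threads at once, as buf_ring_dequeue_mc serializes
 * consumers on br_cons_head. example_process() is a hypothetical handler.
 */
#if 0
static void
example_drain_mc(struct buf_ring *br)
{
	void *buf;

	while ((buf = buf_ring_dequeue_mc(br)) != NULL)
		example_process(buf);
}
#endif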

/*
 * single-consumer dequeue
 * use where dequeue is protected by a lock
 * e.g. a network driver's tx queue lock
 */
static __inline void *
buf_ring_dequeue_sc(struct buf_ring *br)
{
	uint32_t cons_head, cons_next, cons_idx;
	uint32_t prod_tail, mask;
	void *buf;

	mask = br->br_cons_mask;
	cons_head = atomic_load_32(&br->br_cons_head);
	prod_tail = atomic_load_acq_32(&br->br_prod_tail);

	cons_next = cons_head + 1;

	if (cons_head == prod_tail)
		return (NULL);

	cons_idx = cons_head & mask;
	atomic_store_32(&br->br_cons_head, cons_next);
	buf = br->br_ring[cons_idx];

#ifdef DEBUG_BUFRING
	br->br_ring[cons_idx] = NULL;
#ifdef _KERNEL
	if (!mtx_owned(br->br_lock))
		panic("lock not held on single consumer dequeue");
#endif
	if (atomic_load_32(&br->br_cons_tail) != cons_head)
		panic("inconsistent list cons_tail=%d cons_head=%d",
		    atomic_load_32(&br->br_cons_tail), cons_head);
#endif
	atomic_store_rel_32(&br->br_cons_tail, cons_next);
	return (buf);
}
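
/*
 * Example single-consumer drain (an illustrative sketch, not part of
 * this header): the caller serializes dequeues, here with a
 * hypothetical per-queue mutex `txq_mtx'.
 */
#if 0
static void
example_drain_sc(struct example_txq *txq)
{
	void *buf;

	mtx_lock(&txq->txq_mtx);
	while ((buf = buf_ring_dequeue_sc(txq->txq_br)) != NULL)
		example_transmit_one(txq, buf);	/* hypothetical */
	mtx_unlock(&txq->txq_mtx);
}
#endif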

/*
 * single-consumer advance after a peek
 * use where it is protected by a lock
 * e.g. a network driver's tx queue lock
 */
static __inline void
buf_ring_advance_sc(struct buf_ring *br)
{
	uint32_t cons_head, cons_next, prod_tail;
#ifdef DEBUG_BUFRING
	uint32_t mask;

	mask = br->br_cons_mask;
#endif
	cons_head = atomic_load_32(&br->br_cons_head);
	prod_tail = atomic_load_32(&br->br_prod_tail);

	cons_next = cons_head + 1;
	if (cons_head == prod_tail)
		return;
	atomic_store_32(&br->br_cons_head, cons_next);
#ifdef DEBUG_BUFRING
	br->br_ring[cons_head & mask] = NULL;
#endif
	atomic_store_rel_32(&br->br_cons_tail, cons_next);
}

/*
 * Used to return a buffer (most likely already there)
 * to the top of the ring. The caller should *not*
 * have used any dequeue to pull it out of the ring
 * but instead should have used the peek() function.
 * This is normally used where the transmit queue
 * of a driver is full, and an mbuf must be returned.
 * Most likely what's in the ring-buffer is what
 * is being put back (since it was not removed), but
 * sometimes the lower transmit function may have
 * done a pullup or other function that will have
 * changed it. As an optimization we always put it
 * back (since jhb says the store is probably cheaper);
 * if we have to do a multi-queue version we will need
 * the compare and an atomic.
 */
static __inline void
buf_ring_putback_sc(struct buf_ring *br, void *new)
{
	uint32_t cons_idx, mask;

	mask = br->br_cons_mask;
	cons_idx = atomic_load_32(&br->br_cons_head) & mask;
	KASSERT(cons_idx != (atomic_load_32(&br->br_prod_tail) & mask),
	    ("Buf-Ring has none in putback"));
	br->br_ring[cons_idx] = new;
}
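
/*
 * Example of the peek/putback/advance pattern (an illustrative sketch,
 * not part of this header): inspect the head entry, try to hand it to
 * the hardware, and either commit the dequeue or return the possibly
 * modified mbuf to its slot. example_encap() is hypothetical; drbr-based
 * drivers use this shape with the tx queue lock held.
 */
#if 0
static void
example_start_locked(struct example_txq *txq)
{
	struct mbuf *m;

	while ((m = buf_ring_peek(txq->txq_br)) != NULL) {
		if (example_encap(txq, &m) != 0) {
			/* Out of descriptors: put the mbuf back and stop. */
			if (m != NULL)
				buf_ring_putback_sc(txq->txq_br, m);
			break;
		}
		/* Entry handed to hardware: commit the dequeue. */
		buf_ring_advance_sc(txq->txq_br);
	}
}
#endif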

/*
 * return a pointer to the first entry in the ring
 * without modifying it, or NULL if the ring is empty
 * race-prone if not protected by a lock
 */
static __inline void *
buf_ring_peek(struct buf_ring *br)
{
	uint32_t cons_head, prod_tail, mask;

#if defined(DEBUG_BUFRING) && defined(_KERNEL)
	if ((br->br_lock != NULL) && !mtx_owned(br->br_lock))
		panic("lock not held on single consumer dequeue");
#endif
	mask = br->br_cons_mask;
	prod_tail = atomic_load_acq_32(&br->br_prod_tail);
	cons_head = atomic_load_32(&br->br_cons_head);

	if (cons_head == prod_tail)
		return (NULL);

	return (br->br_ring[cons_head & mask]);
}

static __inline void *
buf_ring_peek_clear_sc(struct buf_ring *br)
{
	uint32_t cons_head, prod_tail, mask;
	void *ret;

#if defined(DEBUG_BUFRING) && defined(_KERNEL)
	if (!mtx_owned(br->br_lock))
		panic("lock not held on single consumer dequeue");
#endif

	mask = br->br_cons_mask;
	prod_tail = atomic_load_acq_32(&br->br_prod_tail);
	cons_head = atomic_load_32(&br->br_cons_head);

	if (cons_head == prod_tail)
		return (NULL);

	ret = br->br_ring[cons_head & mask];
#ifdef DEBUG_BUFRING
	/*
	 * Single consumer, i.e. cons_head will not move while we are
	 * running, so atomic_swap_ptr() is not necessary here.
	 */
	br->br_ring[cons_head & mask] = NULL;
#endif
	return (ret);
}

static __inline int
buf_ring_full(struct buf_ring *br)
{

	return (atomic_load_32(&br->br_prod_head) ==
	    atomic_load_32(&br->br_cons_tail) + br->br_cons_size - 1);
}

static __inline int
buf_ring_empty(struct buf_ring *br)
{

	return (atomic_load_32(&br->br_cons_head) ==
	    atomic_load_32(&br->br_prod_tail));
}

static __inline int
buf_ring_count(struct buf_ring *br)
{
	uint32_t cons_tail, prod_tail;

	cons_tail = atomic_load_32(&br->br_cons_tail);
	prod_tail = atomic_load_32(&br->br_prod_tail);
	return ((br->br_prod_size + prod_tail - cons_tail) & br->br_prod_mask);
}
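
/*
 * Worked example of the count arithmetic: with br_prod_size = 8
 * (mask 0x7), prod_tail = 10 and cons_tail = 7 give
 * (8 + 10 - 7) & 0x7 = 11 & 0x7 = 3, the number of committed but
 * not yet consumed entries.
 */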

#ifdef _KERNEL
struct buf_ring *buf_ring_alloc(int count, struct malloc_type *type, int flags,
    struct mtx *);
void buf_ring_free(struct buf_ring *br, struct malloc_type *type);
#else

#include <stdlib.h>

static inline struct buf_ring *
buf_ring_alloc(int count)
{
	struct buf_ring *br;

	KASSERT(powerof2(count), ("buf ring size must be a power of 2"));

	br = calloc(1, sizeof(struct buf_ring) + count * sizeof(void *));
	if (br == NULL)
		return (NULL);
	br->br_prod_size = br->br_cons_size = count;
	br->br_prod_mask = br->br_cons_mask = count - 1;
	br->br_prod_head = br->br_cons_head = 0;
	br->br_prod_tail = br->br_cons_tail = 0;
	return (br);
}

static inline void
buf_ring_free(struct buf_ring *br)
{
	free(br);
}
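
/*
 * Example userspace round trip (an illustrative sketch, assuming the
 * header's userspace support is otherwise set up): the ring size must
 * be a power of two.
 */
#if 0
#include <sys/buf_ring.h>

static int
example_roundtrip(void)
{
	struct buf_ring *br;
	void *out;
	int item = 42;

	br = buf_ring_alloc(8);
	if (br == NULL)
		return (-1);
	if (buf_ring_enqueue(br, &item) != 0) {
		/* Cannot happen on a fresh ring of size 8, but be tidy. */
		buf_ring_free(br);
		return (-1);
	}
	out = buf_ring_dequeue_sc(br);
	buf_ring_free(br);
	return (out == &item ? 0 : -1);
}
#endif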

#endif /* !_KERNEL */
#endif /* _SYS_BUF_RING_H_ */