/*-
 * SPDX-License-Identifier: BSD-2-Clause
 *
 * Copyright (c) 2007-2009 Kip Macy <kmacy@freebsd.org>
 * All rights reserved.
 * Copyright (c) 2024 Arm Ltd
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
 * SUCH DAMAGE.
 */

#ifndef	_SYS_BUF_RING_H_
#define	_SYS_BUF_RING_H_

#include <sys/param.h>
#include <sys/kassert.h>
#include <machine/atomic.h>
#include <machine/cpu.h>

#if defined(DEBUG_BUFRING) && defined(_KERNEL)
#include <sys/lock.h>
#include <sys/mutex.h>
#endif

/*
 * We only apply the mask to the head and tail values when calculating the
 * index into br_ring to access. This means the upper bits can be used as
 * an epoch to reduce the chance the atomic_cmpset succeeds when it should
 * fail, e.g. when the head wraps while the CPU is in an interrupt. This
 * is a probabilistic fix, as there is still a very unlikely chance the
 * value wraps back to the expected value.
 */
struct buf_ring {
	uint32_t		br_prod_head;
	uint32_t		br_prod_tail;
	int			br_prod_size;
	int			br_prod_mask;
	uint64_t		br_drops;
	uint32_t		br_cons_head __aligned(CACHE_LINE_SIZE);
	uint32_t		br_cons_tail;
	int			br_cons_size;
	int			br_cons_mask;
#if defined(DEBUG_BUFRING) && defined(_KERNEL)
	struct mtx		*br_lock;
#endif
	void			*br_ring[0] __aligned(CACHE_LINE_SIZE);
};
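
/*
 * Illustrative sketch of the scheme described above (not part of the
 * API): with a ring of 8 slots (mask 0x7), the head values 0x00000009
 * and 0x00010009 select the same slot (9 & 0x7 == 1) but differ in
 * their upper "epoch" bits, so a stale compare-and-set such as
 *
 *	atomic_cmpset_32(&br->br_prod_head, prod_head, prod_next);
 *
 * is very unlikely to succeed against a head that has wrapped all the
 * way around the ring.
 */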

/*
 * multi-producer safe lock-free ring buffer enqueue
 */
static __inline int
buf_ring_enqueue(struct buf_ring *br, void *buf)
{
	uint32_t prod_head, prod_next, prod_idx;
	uint32_t cons_tail, mask;

	mask = br->br_prod_mask;
#ifdef DEBUG_BUFRING
	/*
	 * Note: It is possible to encounter an mbuf that was removed
	 * via drbr_peek(), and then re-added via drbr_putback(),
	 * triggering a spurious panic.
	 */
	for (uint32_t i = atomic_load_32(&br->br_cons_head);
	    i != atomic_load_32(&br->br_prod_head); i++)
		if (br->br_ring[i & mask] == buf)
			panic("buf=%p already enqueued at %d prod=%d cons=%d",
			    buf, i, atomic_load_32(&br->br_prod_tail),
			    atomic_load_32(&br->br_cons_tail));
#endif
	critical_enter();
	do {
		/*
		 * br->br_prod_head needs to be read before br->br_cons_tail.
		 * If it is not, then we could perform the dequeue and enqueue
		 * between reading br_cons_tail and reading br_prod_head. This
		 * could give us values where br_cons_head == br_prod_tail
		 * (after masking).
		 *
		 * To work around this, use a load acquire. This is just to
		 * ensure ordering within this thread.
		 */
		prod_head = atomic_load_acq_32(&br->br_prod_head);
		prod_next = prod_head + 1;
		cons_tail = atomic_load_acq_32(&br->br_cons_tail);

		if ((int32_t)(cons_tail + br->br_prod_size - prod_next) < 1) {
			if (prod_head == atomic_load_32(&br->br_prod_head) &&
			    cons_tail == atomic_load_32(&br->br_cons_tail)) {
				br->br_drops++;
				critical_exit();
				return (ENOBUFS);
			}
			continue;
		}
	} while (!atomic_cmpset_32(&br->br_prod_head, prod_head, prod_next));
	prod_idx = prod_head & mask;
#ifdef DEBUG_BUFRING
	if (br->br_ring[prod_idx] != NULL)
		panic("dangling value in enqueue");
#endif
	br->br_ring[prod_idx] = buf;

	/*
	 * If there are other enqueues in progress that preceded us,
	 * we need to wait for them to complete.
	 */
	while (atomic_load_32(&br->br_prod_tail) != prod_head)
		cpu_spinwait();
	atomic_store_rel_32(&br->br_prod_tail, prod_next);
	critical_exit();
	return (0);
}
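
/*
 * Illustrative use (a sketch, not part of this API): a multi-producer
 * caller must dispose of the buffer itself when the ring is full; the
 * drop has already been counted in br_drops.  "sc" and the use of
 * m_freem() are assumptions about a network driver caller, not
 * requirements of buf_ring:
 *
 *	if (buf_ring_enqueue(sc->sc_br, m) == ENOBUFS) {
 *		m_freem(m);
 *		return (ENOBUFS);
 *	}
 */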

/*
 * multi-consumer safe dequeue
 */
static __inline void *
buf_ring_dequeue_mc(struct buf_ring *br)
{
	uint32_t cons_head, cons_next, cons_idx;
	uint32_t prod_tail, mask;
	void *buf;

	critical_enter();
	mask = br->br_cons_mask;
	do {
		/*
		 * As with buf_ring_enqueue, ensure we read the head before
		 * the tail. If we read them in the wrong order we may
		 * think the buf_ring is full when it is empty.
		 */
		cons_head = atomic_load_acq_32(&br->br_cons_head);
		cons_next = cons_head + 1;
		prod_tail = atomic_load_acq_32(&br->br_prod_tail);

		if (cons_head == prod_tail) {
			critical_exit();
			return (NULL);
		}
	} while (!atomic_cmpset_32(&br->br_cons_head, cons_head, cons_next));
	cons_idx = cons_head & mask;

	buf = br->br_ring[cons_idx];
#ifdef DEBUG_BUFRING
	br->br_ring[cons_idx] = NULL;
#endif
	/*
	 * If there are other dequeues in progress that preceded us,
	 * we need to wait for them to complete.
	 */
	while (atomic_load_32(&br->br_cons_tail) != cons_head)
		cpu_spinwait();

	atomic_store_rel_32(&br->br_cons_tail, cons_next);
	critical_exit();

	return (buf);
}
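
/*
 * Illustrative use (a sketch, not part of this API): any number of
 * consumers may drain the ring concurrently; "process" stands in for
 * whatever work the caller does with each buffer:
 *
 *	void *buf;
 *
 *	while ((buf = buf_ring_dequeue_mc(br)) != NULL)
 *		process(buf);
 */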

/*
 * single-consumer dequeue
 * use where dequeue is protected by a lock
 * e.g. a network driver's tx queue lock
 */
static __inline void *
buf_ring_dequeue_sc(struct buf_ring *br)
{
	uint32_t cons_head, cons_next, cons_idx;
	uint32_t prod_tail, mask;
	void *buf;

	mask = br->br_cons_mask;
	cons_head = atomic_load_32(&br->br_cons_head);
	prod_tail = atomic_load_acq_32(&br->br_prod_tail);

	cons_next = cons_head + 1;

	if (cons_head == prod_tail)
		return (NULL);

	cons_idx = cons_head & mask;
	atomic_store_32(&br->br_cons_head, cons_next);
	buf = br->br_ring[cons_idx];

#ifdef DEBUG_BUFRING
	br->br_ring[cons_idx] = NULL;
#ifdef _KERNEL
	if (!mtx_owned(br->br_lock))
		panic("lock not held on single consumer dequeue");
#endif
	if (atomic_load_32(&br->br_cons_tail) != cons_head)
		panic("inconsistent list cons_tail=%d cons_head=%d",
		    atomic_load_32(&br->br_cons_tail), cons_head);
#endif
	atomic_store_rel_32(&br->br_cons_tail, cons_next);
	return (buf);
}
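
/*
 * Illustrative use (a sketch, not part of this API): the single
 * consumer serializes dequeues with its own lock; "txq", its mutex,
 * and transmit() are hypothetical driver state, not part of buf_ring:
 *
 *	struct mbuf *m;
 *
 *	mtx_lock(&txq->lock);
 *	while ((m = buf_ring_dequeue_sc(txq->br)) != NULL)
 *		transmit(txq, m);
 *	mtx_unlock(&txq->lock);
 */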

/*
 * single-consumer advance after a peek
 * use where it is protected by a lock
 * e.g. a network driver's tx queue lock
 */
static __inline void
buf_ring_advance_sc(struct buf_ring *br)
{
	uint32_t cons_head, cons_next, prod_tail;
#ifdef DEBUG_BUFRING
	uint32_t mask;

	mask = br->br_cons_mask;
#endif
	cons_head = atomic_load_32(&br->br_cons_head);
	prod_tail = atomic_load_32(&br->br_prod_tail);

	cons_next = cons_head + 1;
	if (cons_head == prod_tail)
		return;
	atomic_store_32(&br->br_cons_head, cons_next);
#ifdef DEBUG_BUFRING
	br->br_ring[cons_head & mask] = NULL;
#endif
	atomic_store_rel_32(&br->br_cons_tail, cons_next);
}
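
/*
 * Illustrative peek-then-advance pattern under the consumer's lock (a
 * sketch, not part of this API); try_transmit() is a hypothetical
 * helper that may refuse a buffer, in which case it stays in the ring:
 *
 *	while ((m = buf_ring_peek(br)) != NULL) {
 *		if (try_transmit(sc, m) != 0)
 *			break;
 *		buf_ring_advance_sc(br);
 *	}
 */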

/*
 * Used to return a buffer (most likely already there)
 * to the top of the ring. The caller should *not*
 * have used any dequeue to pull it out of the ring
 * but instead should have used the peek() function.
 * This is normally used where the transmit queue
 * of a driver is full, and an mbuf must be returned.
 * Most likely what's in the ring buffer is what
 * is being put back (since it was not removed), but
 * sometimes the lower transmit function may have
 * done a pullup or other operation that will have
 * changed it. As an optimization we always put it
 * back (since jhb says the store is probably cheaper);
 * if we have to do a multi-queue version we will need
 * the compare and an atomic.
 */
static __inline void
buf_ring_putback_sc(struct buf_ring *br, void *new)
{
	uint32_t cons_idx, mask;

	mask = br->br_cons_mask;
	cons_idx = atomic_load_32(&br->br_cons_head) & mask;
	KASSERT(cons_idx != (atomic_load_32(&br->br_prod_tail) & mask),
	    ("Buf-Ring has none in putback"));
	br->br_ring[cons_idx] = new;
}
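
/*
 * Illustrative use (a sketch, not part of this API): a driver that
 * peeked at an mbuf and could not transmit it returns the pointer,
 * which may have changed if the transmit path reshaped the chain
 * (e.g. with an m_pullup()):
 *
 *	m = buf_ring_peek(br);
 *	... transmit attempt fails, m possibly replaced ...
 *	buf_ring_putback_sc(br, m);
 */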

/*
 * return a pointer to the first entry in the ring
 * without modifying it, or NULL if the ring is empty
 * race-prone if not protected by a lock
 */
static __inline void *
buf_ring_peek(struct buf_ring *br)
{
	uint32_t cons_head, prod_tail, mask;

#if defined(DEBUG_BUFRING) && defined(_KERNEL)
	if ((br->br_lock != NULL) && !mtx_owned(br->br_lock))
		panic("lock not held on single consumer dequeue");
#endif
	mask = br->br_cons_mask;
	prod_tail = atomic_load_acq_32(&br->br_prod_tail);
	cons_head = atomic_load_32(&br->br_cons_head);

	if (cons_head == prod_tail)
		return (NULL);

	return (br->br_ring[cons_head & mask]);
}

static __inline void *
buf_ring_peek_clear_sc(struct buf_ring *br)
{
	uint32_t cons_head, prod_tail, mask;
	void *ret;

#if defined(DEBUG_BUFRING) && defined(_KERNEL)
	if (!mtx_owned(br->br_lock))
		panic("lock not held on single consumer dequeue");
#endif

	mask = br->br_cons_mask;
	prod_tail = atomic_load_acq_32(&br->br_prod_tail);
	cons_head = atomic_load_32(&br->br_cons_head);

	if (cons_head == prod_tail)
		return (NULL);

	ret = br->br_ring[cons_head & mask];
#ifdef DEBUG_BUFRING
	/*
	 * Single consumer, i.e. cons_head will not move while we are
	 * running, so atomic_swap_ptr() is not necessary here.
	 */
	br->br_ring[cons_head & mask] = NULL;
#endif
	return (ret);
}

static __inline int
buf_ring_full(struct buf_ring *br)
{

	return (atomic_load_32(&br->br_prod_head) ==
	    atomic_load_32(&br->br_cons_tail) + br->br_cons_size - 1);
}

static __inline int
buf_ring_empty(struct buf_ring *br)
{

	return (atomic_load_32(&br->br_cons_head) ==
	    atomic_load_32(&br->br_prod_tail));
}

static __inline int
buf_ring_count(struct buf_ring *br)
{
	uint32_t cons_tail, prod_tail;

	cons_tail = atomic_load_32(&br->br_cons_tail);
	prod_tail = atomic_load_32(&br->br_prod_tail);
	return ((br->br_prod_size + prod_tail - cons_tail) & br->br_prod_mask);
}
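
/*
 * Worked example for buf_ring_count() (illustrative values): with a
 * ring of size 8 (mask 0x7), prod_tail = 0x00010002 and
 * cons_tail = 0x0000ffff give
 *
 *	(8 + 0x00010002 - 0x0000ffff) & 0x7 == (8 + 3) & 0x7 == 3
 *
 * so the epoch bits in the upper part of the counters fall out of the
 * arithmetic and only the occupancy remains.
 */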

#ifdef _KERNEL
struct buf_ring *buf_ring_alloc(int count, struct malloc_type *type, int flags,
    struct mtx *);
void buf_ring_free(struct buf_ring *br, struct malloc_type *type);
#else

#include <stdlib.h>

static inline struct buf_ring *
buf_ring_alloc(int count)
{
	struct buf_ring *br;

	KASSERT(powerof2(count), ("buf ring must be size power of 2"));

	br = calloc(1, sizeof(struct buf_ring) + count * sizeof(void *));
	if (br == NULL)
		return (NULL);
	br->br_prod_size = br->br_cons_size = count;
	br->br_prod_mask = br->br_cons_mask = count - 1;
	br->br_prod_head = br->br_cons_head = 0;
	br->br_prod_tail = br->br_cons_tail = 0;
	return (br);
}

static inline void
buf_ring_free(struct buf_ring *br)
{
	free(br);
}
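
/*
 * Illustrative userland use (a sketch, assuming the !_KERNEL variants
 * above): the count must be a power of 2, and the enqueue/dequeue
 * routines work as in the kernel:
 *
 *	struct buf_ring *br = buf_ring_alloc(1024);
 *
 *	if (br != NULL) {
 *		buf_ring_enqueue(br, item);
 *		item = buf_ring_dequeue_sc(br);
 *		buf_ring_free(br);
 *	}
 */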

#endif /* !_KERNEL */
#endif /* _SYS_BUF_RING_H_ */