/*-
 * SPDX-License-Identifier: BSD-2-Clause
 *
 * Copyright (c) 2007-2009 Kip Macy <kmacy@freebsd.org>
 * All rights reserved.
 * Copyright (c) 2024 Arm Ltd
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
 * SUCH DAMAGE.
 *
 */

#ifndef _SYS_BUF_RING_H_
#define _SYS_BUF_RING_H_

#include <sys/param.h>
#include <sys/kassert.h>
#include <machine/atomic.h>
#include <machine/cpu.h>

#if defined(DEBUG_BUFRING) && defined(_KERNEL)
#include <sys/lock.h>
#include <sys/mutex.h>
#endif

/*
 * We only apply the mask to the head and tail values when calculating the
 * index into br_ring to access. This means the upper bits can be used as
 * an epoch to reduce the chance the atomic_cmpset succeeds when it should
 * fail, e.g. when the head wraps while the CPU is in an interrupt. This
 * is a probabilistic fix as there is still a very unlikely chance the
 * value wraps back to the expected value.
 */
struct buf_ring {
	uint32_t	br_prod_head;
	uint32_t	br_prod_tail;
	int		br_prod_size;
	int		br_prod_mask;
	uint64_t	br_drops;
	uint32_t	br_cons_head __aligned(CACHE_LINE_SIZE);
	uint32_t	br_cons_tail;
	int		br_cons_size;
	int		br_cons_mask;
#if defined(DEBUG_BUFRING) && defined(_KERNEL)
	struct mtx	*br_lock;
#endif
	void		*br_ring[0] __aligned(CACHE_LINE_SIZE);
};
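
/*
 * An illustrative sketch of the masking described above (the numbers are
 * examples, not anything the implementation depends on): with a ring of
 * 8 entries, br_prod_mask is 7, so a head value of 9 indexes slot
 * 9 & 7 == 1, while the upper bits of the full 32-bit counter
 * (9 >> 3 == 1) act as the epoch that makes a wrapped head compare
 * unequal in atomic_cmpset until the counter itself wraps around 2^32.
 */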

/*
 * multi-producer safe lock-free ring buffer enqueue
 */
static __inline int
buf_ring_enqueue(struct buf_ring *br, void *buf)
{
	uint32_t prod_head, prod_next, prod_idx;
	uint32_t cons_tail, mask;

	mask = br->br_prod_mask;
#ifdef DEBUG_BUFRING
	/*
	 * Note: It is possible to encounter an mbuf that was removed
	 * via drbr_peek(), and then re-added via drbr_putback() and
	 * trigger a spurious panic.
	 */
	for (uint32_t i = atomic_load_32(&br->br_cons_head);
	    i != atomic_load_32(&br->br_prod_head); i++)
		if (br->br_ring[i & mask] == buf)
			panic("buf=%p already enqueued at %d prod=%d cons=%d",
			    buf, i, atomic_load_32(&br->br_prod_tail),
			    atomic_load_32(&br->br_cons_tail));
#endif
	critical_enter();
	do {
		/*
		 * br->br_prod_head needs to be read before br->br_cons_tail.
		 * If not then we could perform the dequeue and enqueue
		 * between reading br_cons_tail and reading br_prod_head. This
		 * could give us values where br_cons_head == br_prod_tail
		 * (after masking).
		 *
		 * To work around this use a load acquire. This is just to
		 * ensure ordering within this thread.
		 */
		prod_head = atomic_load_acq_32(&br->br_prod_head);
		prod_next = prod_head + 1;
		cons_tail = atomic_load_acq_32(&br->br_cons_tail);

		if ((int32_t)(cons_tail + br->br_prod_size - prod_next) < 1) {
			if (prod_head == atomic_load_32(&br->br_prod_head) &&
			    cons_tail == atomic_load_32(&br->br_cons_tail)) {
				br->br_drops++;
				critical_exit();
				return (ENOBUFS);
			}
			continue;
		}
	} while (!atomic_cmpset_32(&br->br_prod_head, prod_head, prod_next));
	prod_idx = prod_head & mask;
#ifdef DEBUG_BUFRING
	if (br->br_ring[prod_idx] != NULL)
		panic("dangling value in enqueue");
#endif
	br->br_ring[prod_idx] = buf;

	/*
	 * If there are other enqueues in progress
	 * that preceded us, we need to wait for them
	 * to complete.
	 */
	while (atomic_load_32(&br->br_prod_tail) != prod_head)
		cpu_spinwait();
	atomic_store_rel_32(&br->br_prod_tail, prod_next);
	critical_exit();
	return (0);
}
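
/*
 * Usage sketch (illustrative only; "sc", "sc_br" and "m" are hypothetical,
 * not part of this API):
 *
 *	if (buf_ring_enqueue(sc->sc_br, m) == ENOBUFS)
 *		m_freem(m);
 *
 * On ENOBUFS the ring was full and br_drops has already been incremented;
 * the caller still owns the buffer and must dispose of it.
 */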

/*
 * multi-consumer safe dequeue
 */
static __inline void *
buf_ring_dequeue_mc(struct buf_ring *br)
{
	uint32_t cons_head, cons_next, cons_idx;
	uint32_t prod_tail, mask;
	void *buf;

	critical_enter();
	mask = br->br_cons_mask;
	do {
		/*
		 * As with buf_ring_enqueue, ensure we read the head before
		 * the tail. If we read them in the wrong order we may
		 * think the buf_ring is full when it is empty.
		 */
		cons_head = atomic_load_acq_32(&br->br_cons_head);
		cons_next = cons_head + 1;
		prod_tail = atomic_load_acq_32(&br->br_prod_tail);

		if (cons_head == prod_tail) {
			critical_exit();
			return (NULL);
		}
	} while (!atomic_cmpset_32(&br->br_cons_head, cons_head, cons_next));
	cons_idx = cons_head & mask;

	buf = br->br_ring[cons_idx];
#ifdef DEBUG_BUFRING
	br->br_ring[cons_idx] = NULL;
#endif
	/*
	 * If there are other dequeues in progress
	 * that preceded us, we need to wait for them
	 * to complete.
	 */
	while (atomic_load_32(&br->br_cons_tail) != cons_head)
		cpu_spinwait();

	atomic_store_rel_32(&br->br_cons_tail, cons_next);
	critical_exit();

	return (buf);
}
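
/*
 * Usage sketch (illustrative only; process_buf() is a hypothetical
 * consumer): any number of threads may drain the ring concurrently.
 *
 *	void *buf;
 *
 *	while ((buf = buf_ring_dequeue_mc(br)) != NULL)
 *		process_buf(buf);
 */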

/*
 * single-consumer dequeue
 * use where dequeue is protected by a lock
 * e.g. a network driver's tx queue lock
 */
static __inline void *
buf_ring_dequeue_sc(struct buf_ring *br)
{
	uint32_t cons_head, cons_next, cons_idx;
	uint32_t prod_tail, mask;
	void *buf;

	mask = br->br_cons_mask;
	cons_head = atomic_load_32(&br->br_cons_head);
	prod_tail = atomic_load_acq_32(&br->br_prod_tail);

	cons_next = cons_head + 1;

	if (cons_head == prod_tail)
		return (NULL);

	cons_idx = cons_head & mask;
	atomic_store_32(&br->br_cons_head, cons_next);
	buf = br->br_ring[cons_idx];

#ifdef DEBUG_BUFRING
	br->br_ring[cons_idx] = NULL;
#ifdef _KERNEL
	if (!mtx_owned(br->br_lock))
		panic("lock not held on single consumer dequeue");
#endif
	if (atomic_load_32(&br->br_cons_tail) != cons_head)
		panic("inconsistent list cons_tail=%d cons_head=%d",
		    atomic_load_32(&br->br_cons_tail), cons_head);
#endif
	atomic_store_rel_32(&br->br_cons_tail, cons_next);
	return (buf);
}
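
/*
 * Usage sketch (illustrative only; "txq" and its mutex are hypothetical):
 * the caller must hold whatever lock serializes the single consumer.
 *
 *	mtx_lock(&txq->lock);
 *	buf = buf_ring_dequeue_sc(txq->br);
 *	mtx_unlock(&txq->lock);
 */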

/*
 * single-consumer advance after a peek
 * use where it is protected by a lock
 * e.g. a network driver's tx queue lock
 */
static __inline void
buf_ring_advance_sc(struct buf_ring *br)
{
	uint32_t cons_head, cons_next, prod_tail;
#ifdef DEBUG_BUFRING
	uint32_t mask;

	mask = br->br_cons_mask;
#endif
	cons_head = atomic_load_32(&br->br_cons_head);
	prod_tail = atomic_load_32(&br->br_prod_tail);

	cons_next = cons_head + 1;
	if (cons_head == prod_tail)
		return;
	atomic_store_32(&br->br_cons_head, cons_next);
#ifdef DEBUG_BUFRING
	br->br_ring[cons_head & mask] = NULL;
#endif
	atomic_store_rel_32(&br->br_cons_tail, cons_next);
}

/*
 * Used to return a buffer (most likely already there)
 * to the top of the ring. The caller should *not*
 * have used any dequeue to pull it out of the ring
 * but instead should have used the peek() function.
 * This is normally used where the transmit queue
 * of a driver is full, and an mbuf must be returned.
 * Most likely what's in the ring buffer is what
 * is being put back (since it was not removed), but
 * sometimes the lower transmit function may have
 * done a pullup or other function that will have
 * changed it. As an optimization we always put it
 * back (since jhb says the store is probably cheaper);
 * if we have to do a multi-queue version we will need
 * the compare and an atomic.
 */
static __inline void
buf_ring_putback_sc(struct buf_ring *br, void *new)
{
	uint32_t cons_idx, mask;

	mask = br->br_cons_mask;
	cons_idx = atomic_load_32(&br->br_cons_head) & mask;
	KASSERT(cons_idx != (atomic_load_32(&br->br_prod_tail) & mask),
	    ("Buf-Ring has none in putback"));
	br->br_ring[cons_idx] = new;
}
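
/*
 * Usage sketch of the peek/putback/advance pattern described above
 * (illustrative only; try_transmit() and "txq" are hypothetical):
 *
 *	mtx_lock(&txq->lock);
 *	while ((m = buf_ring_peek(txq->br)) != NULL) {
 *		if (try_transmit(m) != 0) {
 *			buf_ring_putback_sc(txq->br, m);
 *			break;
 *		}
 *		buf_ring_advance_sc(txq->br);
 *	}
 *	mtx_unlock(&txq->lock);
 */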

/*
 * return a pointer to the first entry in the ring
 * without modifying it, or NULL if the ring is empty
 * race-prone if not protected by a lock
 */
static __inline void *
buf_ring_peek(struct buf_ring *br)
{
	uint32_t cons_head, prod_tail, mask;

#if defined(DEBUG_BUFRING) && defined(_KERNEL)
	if ((br->br_lock != NULL) && !mtx_owned(br->br_lock))
		panic("lock not held on single consumer dequeue");
#endif
	mask = br->br_cons_mask;
	prod_tail = atomic_load_acq_32(&br->br_prod_tail);
	cons_head = atomic_load_32(&br->br_cons_head);

	if (cons_head == prod_tail)
		return (NULL);

	return (br->br_ring[cons_head & mask]);
}

static __inline void *
buf_ring_peek_clear_sc(struct buf_ring *br)
{
	uint32_t cons_head, prod_tail, mask;
	void *ret;

#if defined(DEBUG_BUFRING) && defined(_KERNEL)
	if (!mtx_owned(br->br_lock))
		panic("lock not held on single consumer dequeue");
#endif

	mask = br->br_cons_mask;
	prod_tail = atomic_load_acq_32(&br->br_prod_tail);
	cons_head = atomic_load_32(&br->br_cons_head);

	if (cons_head == prod_tail)
		return (NULL);

	ret = br->br_ring[cons_head & mask];
#ifdef DEBUG_BUFRING
	/*
	 * Single consumer, i.e. cons_head will not move while we are
	 * running, so atomic_swap_ptr() is not necessary here.
	 */
	br->br_ring[cons_head & mask] = NULL;
#endif
	return (ret);
}

static __inline int
buf_ring_full(struct buf_ring *br)
{

	return (atomic_load_32(&br->br_prod_head) ==
	    atomic_load_32(&br->br_cons_tail) + br->br_cons_size - 1);
}

static __inline int
buf_ring_empty(struct buf_ring *br)
{

	return (atomic_load_32(&br->br_cons_head) ==
	    atomic_load_32(&br->br_prod_tail));
}

static __inline int
buf_ring_count(struct buf_ring *br)
{
	uint32_t cons_tail, prod_tail;

	cons_tail = atomic_load_32(&br->br_cons_tail);
	prod_tail = atomic_load_32(&br->br_prod_tail);
	return ((br->br_prod_size + prod_tail - cons_tail) & br->br_prod_mask);
}
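
/*
 * A worked example of the count calculation (the numbers are
 * illustrative): with br_prod_size == 8 (so br_prod_mask == 7),
 * prod_tail == 10 and cons_tail == 6, the ring holds
 * (8 + 10 - 6) & 7 == 12 & 7 == 4 entries; the final mask discards
 * the epoch bits carried in the upper part of the 32-bit counters.
 */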

#ifdef _KERNEL
struct buf_ring *buf_ring_alloc(int count, struct malloc_type *type, int flags,
    struct mtx *);
void buf_ring_free(struct buf_ring *br, struct malloc_type *type);
#else

#include <stdlib.h>

static inline struct buf_ring *
buf_ring_alloc(int count)
{
	struct buf_ring *br;

	KASSERT(powerof2(count), ("buf ring must be size power of 2"));

	br = calloc(1, sizeof(struct buf_ring) + count * sizeof(void *));
	if (br == NULL)
		return (NULL);
	br->br_prod_size = br->br_cons_size = count;
	br->br_prod_mask = br->br_cons_mask = count - 1;
	br->br_prod_head = br->br_cons_head = 0;
	br->br_prod_tail = br->br_cons_tail = 0;
	return (br);
}

static inline void
buf_ring_free(struct buf_ring *br)
{
	free(br);
}
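
/*
 * Userspace usage sketch (illustrative only; "item" is hypothetical and
 * the count must be a power of 2):
 *
 *	struct buf_ring *br = buf_ring_alloc(1024);
 *
 *	if (br != NULL && buf_ring_enqueue(br, item) == 0)
 *		assert(buf_ring_dequeue_sc(br) == item);
 *	buf_ring_free(br);
 */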

#endif /* !_KERNEL */
#endif /* _SYS_BUF_RING_H_ */