/*-
 * SPDX-License-Identifier: BSD-2-Clause
 *
 * Copyright (c) 2007-2009 Kip Macy <kmacy@freebsd.org>
 * All rights reserved.
 * Copyright (c) 2024 Arm Ltd
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
 * SUCH DAMAGE.
 */

#ifndef _SYS_BUF_RING_H_
#define _SYS_BUF_RING_H_

#include <sys/param.h>
#include <sys/kassert.h>
#include <machine/atomic.h>
#include <machine/cpu.h>

#if defined(DEBUG_BUFRING) && defined(_KERNEL)
#include <sys/lock.h>
#include <sys/mutex.h>
#endif

/*
 * We only apply the mask to the head and tail values when calculating the
 * index into br_ring to access. This means the upper bits can be used as
 * an epoch to reduce the chance the atomic_cmpset succeeds when it should
 * fail, e.g. when the head wraps while the CPU is in an interrupt. This
 * is a probabilistic fix as there is still a very unlikely chance the
 * value wraps back to the expected value.
 */
struct buf_ring {
	uint32_t	br_prod_head;
	uint32_t	br_prod_tail;
	int		br_prod_size;
	int		br_prod_mask;
	uint64_t	br_drops;
	uint32_t	br_cons_head __aligned(CACHE_LINE_SIZE);
	uint32_t	br_cons_tail;
	int		br_cons_size;
	int		br_cons_mask;
#if defined(DEBUG_BUFRING) && defined(_KERNEL)
	struct mtx	*br_lock;
#endif
	void		*br_ring[0] __aligned(CACHE_LINE_SIZE);
};

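/*
 * Illustrative sketch of the index/epoch split described above; this is
 * not part of the API.  For an 8-entry ring only the low bits select a
 * slot, while the full 32-bit counter acts as an epoch:
 *
 *	uint32_t mask = 8 - 1;
 *	uint32_t head = 9;			   second lap of the ring
 *	void *slot = br->br_ring[head & mask];	   slot 1, not slot 9
 *
 * Heads 1 and 9 map to the same slot but compare unequal, so an
 * atomic_cmpset on the unmasked head rejects a stale update unless the
 * counter wraps through all 2^32 values back to the expected one.
 */
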
/*
 * Multi-producer safe lock-free ring buffer enqueue.
 */
static __inline int
buf_ring_enqueue(struct buf_ring *br, void *buf)
{
	uint32_t prod_head, prod_next, prod_idx;
	uint32_t cons_tail, mask;

	mask = br->br_prod_mask;
#ifdef DEBUG_BUFRING
	/*
	 * Note: It is possible to encounter an mbuf that was removed
	 * via drbr_peek(), and then re-added via drbr_putback() and
	 * trigger a spurious panic.
	 */
	for (uint32_t i = atomic_load_32(&br->br_cons_head);
	    i != atomic_load_32(&br->br_prod_head); i++)
		if (br->br_ring[i & mask] == buf)
			panic("buf=%p already enqueued at %d prod=%d cons=%d",
			    buf, i, atomic_load_32(&br->br_prod_tail),
			    atomic_load_32(&br->br_cons_tail));
#endif
	critical_enter();
	do {
		/*
		 * br->br_prod_head needs to be read before br->br_cons_tail.
		 * If not then we could perform the dequeue and enqueue
		 * between reading br_cons_tail and reading br_prod_head. This
		 * could give us values where br_cons_head == br_prod_tail
		 * (after masking).
		 *
		 * To work around this, use a load acquire. This is just to
		 * ensure ordering within this thread.
		 */
		prod_head = atomic_load_acq_32(&br->br_prod_head);
		prod_next = prod_head + 1;
		cons_tail = atomic_load_acq_32(&br->br_cons_tail);

		if ((int32_t)(cons_tail + br->br_prod_size - prod_next) < 1) {
			rmb();
			if (prod_head == atomic_load_32(&br->br_prod_head) &&
			    cons_tail == atomic_load_32(&br->br_cons_tail)) {
				br->br_drops++;
				critical_exit();
				return (ENOBUFS);
			}
			continue;
		}
	} while (!atomic_cmpset_acq_32(&br->br_prod_head, prod_head,
	    prod_next));
	prod_idx = prod_head & mask;
#ifdef DEBUG_BUFRING
	if (br->br_ring[prod_idx] != NULL)
		panic("dangling value in enqueue");
#endif
	br->br_ring[prod_idx] = buf;

	/*
	 * If there are other enqueues in progress
	 * that preceded us, we need to wait for them
	 * to complete.
	 */
	while (atomic_load_32(&br->br_prod_tail) != prod_head)
		cpu_spinwait();
	atomic_store_rel_32(&br->br_prod_tail, prod_next);
	critical_exit();
	return (0);
}

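/*
 * Example caller (illustrative only; "txr" and "m" are hypothetical
 * driver state): on ENOBUFS the ring has already counted the drop in
 * br_drops but does not take ownership of the buffer, so the caller
 * must dispose of it:
 *
 *	if (buf_ring_enqueue(txr->br, m) == ENOBUFS) {
 *		m_freem(m);
 *		return (ENOBUFS);
 *	}
 */
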
/*
 * Multi-consumer safe dequeue.
 */
static __inline void *
buf_ring_dequeue_mc(struct buf_ring *br)
{
	uint32_t cons_head, cons_next, cons_idx;
	uint32_t prod_tail, mask;
	void *buf;

	critical_enter();
	mask = br->br_cons_mask;
	do {
		/*
		 * As with buf_ring_enqueue, ensure we read the head before
		 * the tail. If we read them in the wrong order we may
		 * think the buf_ring is full when it is empty.
		 */
		cons_head = atomic_load_acq_32(&br->br_cons_head);
		cons_next = cons_head + 1;
		prod_tail = atomic_load_acq_32(&br->br_prod_tail);

		if (cons_head == prod_tail) {
			critical_exit();
			return (NULL);
		}
	} while (!atomic_cmpset_acq_32(&br->br_cons_head, cons_head,
	    cons_next));
	cons_idx = cons_head & mask;

	buf = br->br_ring[cons_idx];
#ifdef DEBUG_BUFRING
	br->br_ring[cons_idx] = NULL;
#endif
	/*
	 * If there are other dequeues in progress
	 * that preceded us, we need to wait for them
	 * to complete.
	 */
	while (atomic_load_32(&br->br_cons_tail) != cons_head)
		cpu_spinwait();

	atomic_store_rel_32(&br->br_cons_tail, cons_next);
	critical_exit();

	return (buf);
}

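/*
 * Example (illustrative only; process() is a hypothetical consumer):
 * several threads may drain the ring concurrently without a lock, each
 * call claiming at most one buffer:
 *
 *	void *buf;
 *
 *	while ((buf = buf_ring_dequeue_mc(br)) != NULL)
 *		process(buf);
 */
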
/*
 * Single-consumer dequeue.
 * Use where dequeue is protected by a lock,
 * e.g. a network driver's tx queue lock.
 */
static __inline void *
buf_ring_dequeue_sc(struct buf_ring *br)
{
	uint32_t cons_head, cons_next, cons_idx;
	uint32_t prod_tail, mask;
	void *buf;

	mask = br->br_cons_mask;
	cons_head = atomic_load_32(&br->br_cons_head);
	prod_tail = atomic_load_acq_32(&br->br_prod_tail);

	cons_next = cons_head + 1;

	if (cons_head == prod_tail)
		return (NULL);

	cons_idx = cons_head & mask;
	atomic_store_32(&br->br_cons_head, cons_next);
	buf = br->br_ring[cons_idx];

#ifdef DEBUG_BUFRING
	br->br_ring[cons_idx] = NULL;
#ifdef _KERNEL
	if (!mtx_owned(br->br_lock))
		panic("lock not held on single consumer dequeue");
#endif
	if (atomic_load_32(&br->br_cons_tail) != cons_head)
		panic("inconsistent list cons_tail=%d cons_head=%d",
		    atomic_load_32(&br->br_cons_tail), cons_head);
#endif
	atomic_store_rel_32(&br->br_cons_tail, cons_next);
	return (buf);
}

/*
 * Single-consumer advance after a peek.
 * Use where it is protected by a lock,
 * e.g. a network driver's tx queue lock.
 */
static __inline void
buf_ring_advance_sc(struct buf_ring *br)
{
	uint32_t cons_head, cons_next, prod_tail;
#ifdef DEBUG_BUFRING
	uint32_t mask;

	mask = br->br_cons_mask;
#endif
	cons_head = atomic_load_32(&br->br_cons_head);
	prod_tail = atomic_load_32(&br->br_prod_tail);

	cons_next = cons_head + 1;
	if (cons_head == prod_tail)
		return;
	atomic_store_32(&br->br_cons_head, cons_next);
#ifdef DEBUG_BUFRING
	br->br_ring[cons_head & mask] = NULL;
#endif
	atomic_store_rel_32(&br->br_cons_tail, cons_next);
}

/*
 * Used to return a buffer (most likely already there)
 * to the top of the ring. The caller should *not*
 * have used any dequeue to pull it out of the ring
 * but instead should have used the peek() function.
 * This is normally used where the transmit queue
 * of a driver is full, and an mbuf must be returned.
 * Most likely what's in the ring buffer is what
 * is being put back (since it was not removed), but
 * sometimes the lower transmit function may have
 * done a pullup or other function that will have
 * changed it. As an optimization we always put it
 * back (since jhb says the store is probably cheaper);
 * if we have to do a multi-queue version we will need
 * the compare and an atomic.
 */
static __inline void
buf_ring_putback_sc(struct buf_ring *br, void *new)
{
	uint32_t cons_idx, mask;

	mask = br->br_cons_mask;
	cons_idx = atomic_load_32(&br->br_cons_head) & mask;
	KASSERT(cons_idx != (atomic_load_32(&br->br_prod_tail) & mask),
	    ("Buf-Ring has none in putback"));
	br->br_ring[cons_idx] = new;
}

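/*
 * Example (illustrative only; txq_mtx, try_transmit() and m are
 * hypothetical): the peek/putback/advance pattern these single-consumer
 * functions exist for, as used by driver transmit paths.  A buffer only
 * leaves the ring once the hardware has accepted it, and try_transmit()
 * may replace the mbuf (e.g. after a pullup), which is why the possibly
 * changed pointer is stored back:
 *
 *	mtx_lock(&txq_mtx);
 *	while ((m = buf_ring_peek(br)) != NULL) {
 *		if (try_transmit(&m) != 0) {
 *			buf_ring_putback_sc(br, m);
 *			break;
 *		}
 *		buf_ring_advance_sc(br);
 *	}
 *	mtx_unlock(&txq_mtx);
 */
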
/*
 * Return a pointer to the first entry in the ring
 * without modifying it, or NULL if the ring is empty.
 * Race-prone if not protected by a lock.
 */
static __inline void *
buf_ring_peek(struct buf_ring *br)
{
	uint32_t cons_head, prod_tail, mask;

#if defined(DEBUG_BUFRING) && defined(_KERNEL)
	if ((br->br_lock != NULL) && !mtx_owned(br->br_lock))
		panic("lock not held on single consumer dequeue");
#endif
	mask = br->br_cons_mask;
	prod_tail = atomic_load_acq_32(&br->br_prod_tail);
	cons_head = atomic_load_32(&br->br_cons_head);

	if (cons_head == prod_tail)
		return (NULL);

	return (br->br_ring[cons_head & mask]);
}

static __inline void *
buf_ring_peek_clear_sc(struct buf_ring *br)
{
	uint32_t cons_head, prod_tail, mask;
	void *ret;

#if defined(DEBUG_BUFRING) && defined(_KERNEL)
	if (!mtx_owned(br->br_lock))
		panic("lock not held on single consumer dequeue");
#endif

	mask = br->br_cons_mask;
	prod_tail = atomic_load_acq_32(&br->br_prod_tail);
	cons_head = atomic_load_32(&br->br_cons_head);

	if (cons_head == prod_tail)
		return (NULL);

	ret = br->br_ring[cons_head & mask];
#ifdef DEBUG_BUFRING
	/*
	 * Single consumer, i.e. cons_head will not move while we are
	 * running, so atomic_swap_ptr() is not necessary here.
	 */
	br->br_ring[cons_head & mask] = NULL;
#endif
	return (ret);
}

static __inline int
buf_ring_full(struct buf_ring *br)
{

	return (atomic_load_32(&br->br_prod_head) ==
	    atomic_load_32(&br->br_cons_tail) + br->br_cons_size - 1);
}

static __inline int
buf_ring_empty(struct buf_ring *br)
{

	return (atomic_load_32(&br->br_cons_head) ==
	    atomic_load_32(&br->br_prod_tail));
}

static __inline int
buf_ring_count(struct buf_ring *br)
{
	uint32_t cons_tail, prod_tail;

	cons_tail = atomic_load_32(&br->br_cons_tail);
	prod_tail = atomic_load_32(&br->br_prod_tail);
	return ((br->br_prod_size + prod_tail - cons_tail) & br->br_prod_mask);
}

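/*
 * Worked example of the wrap-safe arithmetic in buf_ring_count()
 * (illustrative): for an 8-entry ring with cons_tail = 0xffffffff and
 * prod_tail = 2 (the producer has wrapped), 32-bit arithmetic gives
 *
 *	(8 + 2 - 0xffffffff) & 7 == 11 & 7 == 3
 *
 * i.e. three buffers outstanding, even though prod_tail is numerically
 * smaller than cons_tail.
 */
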
#ifdef _KERNEL
struct buf_ring *buf_ring_alloc(int count, struct malloc_type *type,
    int flags, struct mtx *);
void buf_ring_free(struct buf_ring *br, struct malloc_type *type);
#else

#include <stdlib.h>

static inline struct buf_ring *
buf_ring_alloc(int count)
{
	struct buf_ring *br;

	KASSERT(powerof2(count), ("buf ring must be size power of 2"));

	br = calloc(1, sizeof(struct buf_ring) + count * sizeof(void *));
	if (br == NULL)
		return (NULL);
	br->br_prod_size = br->br_cons_size = count;
	br->br_prod_mask = br->br_cons_mask = count - 1;
	br->br_prod_head = br->br_cons_head = 0;
	br->br_prod_tail = br->br_cons_tail = 0;
	return (br);
}

static inline void
buf_ring_free(struct buf_ring *br)
{
	free(br);
}

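/*
 * Example (illustrative only; "item" is hypothetical): userspace round
 * trip with the stub allocator above.  The count must be a power of 2
 * so the index mask works:
 *
 *	struct buf_ring *br = buf_ring_alloc(1024);
 *
 *	if (br != NULL) {
 *		buf_ring_enqueue(br, item);
 *		item = buf_ring_dequeue_sc(br);
 *		buf_ring_free(br);
 *	}
 */
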
#endif /* !_KERNEL */
#endif /* _SYS_BUF_RING_H_ */