xref: /freebsd/sys/net/mp_ring.c (revision 4c7070db251a6f2153f9e4783717151b07449ae3)
1*4c7070dbSScott Long /*-
2*4c7070dbSScott Long  * Copyright (c) 2014 Chelsio Communications, Inc.
3*4c7070dbSScott Long  * All rights reserved.
4*4c7070dbSScott Long  * Written by: Navdeep Parhar <np@FreeBSD.org>
5*4c7070dbSScott Long  *
6*4c7070dbSScott Long  * Redistribution and use in source and binary forms, with or without
7*4c7070dbSScott Long  * modification, are permitted provided that the following conditions
8*4c7070dbSScott Long  * are met:
9*4c7070dbSScott Long  * 1. Redistributions of source code must retain the above copyright
10*4c7070dbSScott Long  *    notice, this list of conditions and the following disclaimer.
11*4c7070dbSScott Long  * 2. Redistributions in binary form must reproduce the above copyright
12*4c7070dbSScott Long  *    notice, this list of conditions and the following disclaimer in the
13*4c7070dbSScott Long  *    documentation and/or other materials provided with the distribution.
14*4c7070dbSScott Long  *
15*4c7070dbSScott Long  * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
16*4c7070dbSScott Long  * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
17*4c7070dbSScott Long  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
18*4c7070dbSScott Long  * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
19*4c7070dbSScott Long  * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
20*4c7070dbSScott Long  * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
21*4c7070dbSScott Long  * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
22*4c7070dbSScott Long  * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
23*4c7070dbSScott Long  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
24*4c7070dbSScott Long  * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
25*4c7070dbSScott Long  * SUCH DAMAGE.
26*4c7070dbSScott Long  */
27*4c7070dbSScott Long 
28*4c7070dbSScott Long #include <sys/cdefs.h>
29*4c7070dbSScott Long __FBSDID("$FreeBSD$");
30*4c7070dbSScott Long 
31*4c7070dbSScott Long #include <sys/types.h>
32*4c7070dbSScott Long #include <sys/param.h>
33*4c7070dbSScott Long #include <sys/systm.h>
34*4c7070dbSScott Long #include <sys/counter.h>
35*4c7070dbSScott Long #include <sys/lock.h>
36*4c7070dbSScott Long #include <sys/mutex.h>
37*4c7070dbSScott Long #include <sys/malloc.h>
38*4c7070dbSScott Long #include <machine/cpu.h>
39*4c7070dbSScott Long 
40*4c7070dbSScott Long 
41*4c7070dbSScott Long 
42*4c7070dbSScott Long #include <net/mp_ring.h>
43*4c7070dbSScott Long 
44*4c7070dbSScott Long #if defined(__i386__)
45*4c7070dbSScott Long #define atomic_cmpset_acq_64 atomic_cmpset_64
46*4c7070dbSScott Long #define atomic_cmpset_rel_64 atomic_cmpset_64
47*4c7070dbSScott Long #endif
48*4c7070dbSScott Long 
49*4c7070dbSScott Long union ring_state {
50*4c7070dbSScott Long 	struct {
51*4c7070dbSScott Long 		uint16_t pidx_head;
52*4c7070dbSScott Long 		uint16_t pidx_tail;
53*4c7070dbSScott Long 		uint16_t cidx;
54*4c7070dbSScott Long 		uint16_t flags;
55*4c7070dbSScott Long 	};
56*4c7070dbSScott Long 	uint64_t state;
57*4c7070dbSScott Long };
58*4c7070dbSScott Long 
/* Values for ring_state.flags; they describe the consumer's status. */
enum {
	IDLE = 0,	/* consumer ran to completion, nothing more to do. */
	BUSY,		/* consumer is running already, or will be shortly. */
	STALLED,	/* consumer stopped due to lack of resources. */
	ABDICATED,	/* consumer stopped even though there was work to be
			   done because it wants another thread to take over. */
};
66*4c7070dbSScott Long 
67*4c7070dbSScott Long static inline uint16_t
68*4c7070dbSScott Long space_available(struct ifmp_ring *r, union ring_state s)
69*4c7070dbSScott Long {
70*4c7070dbSScott Long 	uint16_t x = r->size - 1;
71*4c7070dbSScott Long 
72*4c7070dbSScott Long 	if (s.cidx == s.pidx_head)
73*4c7070dbSScott Long 		return (x);
74*4c7070dbSScott Long 	else if (s.cidx > s.pidx_head)
75*4c7070dbSScott Long 		return (s.cidx - s.pidx_head - 1);
76*4c7070dbSScott Long 	else
77*4c7070dbSScott Long 		return (x - s.pidx_head + s.cidx);
78*4c7070dbSScott Long }
79*4c7070dbSScott Long 
80*4c7070dbSScott Long static inline uint16_t
81*4c7070dbSScott Long increment_idx(struct ifmp_ring *r, uint16_t idx, uint16_t n)
82*4c7070dbSScott Long {
83*4c7070dbSScott Long 	int x = r->size - idx;
84*4c7070dbSScott Long 
85*4c7070dbSScott Long 	MPASS(x > 0);
86*4c7070dbSScott Long 	return (x > n ? idx + n : n - x);
87*4c7070dbSScott Long }
88*4c7070dbSScott Long 
89*4c7070dbSScott Long /* Consumer is about to update the ring's state to s */
90*4c7070dbSScott Long static inline uint16_t
91*4c7070dbSScott Long state_to_flags(union ring_state s, int abdicate)
92*4c7070dbSScott Long {
93*4c7070dbSScott Long 
94*4c7070dbSScott Long 	if (s.cidx == s.pidx_tail)
95*4c7070dbSScott Long 		return (IDLE);
96*4c7070dbSScott Long 	else if (abdicate && s.pidx_tail != s.pidx_head)
97*4c7070dbSScott Long 		return (ABDICATED);
98*4c7070dbSScott Long 
99*4c7070dbSScott Long 	return (BUSY);
100*4c7070dbSScott Long }
101*4c7070dbSScott Long 
102*4c7070dbSScott Long #ifdef NO_64BIT_ATOMICS
/*
 * Mutex-based consumer loop for platforms without 64-bit atomics.
 * Runs entirely with r->lock held (callers acquire it), so plain reads
 * and writes of r->state are safe here.
 *
 * The caller guarantees there is work to do: os.flags is BUSY and
 * os.cidx != os.pidx_tail.  'prev' is the flags value the ring
 * transitioned from and is used only for the statistics counters.
 * 'budget' bounds how many items are consumed before abdicating.
 */
static void
drain_ring_locked(struct ifmp_ring *r, union ring_state os, uint16_t prev, int budget)
{
	union ring_state ns;
	int n, pending, total;
	uint16_t cidx = os.cidx;
	uint16_t pidx = os.pidx_tail;

	MPASS(os.flags == BUSY);
	MPASS(cidx != pidx);

	if (prev == IDLE)
		counter_u64_add(r->starts, 1);
	pending = 0;
	total = 0;

	while (cidx != pidx) {

		/* Items from cidx to pidx are available for consumption. */
		n = r->drain(r, cidx, pidx);
		if (n == 0) {
			/* No progress possible: record the stall and stop. */
			os.state = ns.state = r->state;
			ns.cidx = cidx;
			ns.flags = STALLED;
			r->state = ns.state;
			if (prev != STALLED)
				counter_u64_add(r->stalls, 1);
			else if (total > 0) {
				counter_u64_add(r->restarts, 1);
				counter_u64_add(r->stalls, 1);
			}
			break;
		}
		cidx = increment_idx(r, cidx, n);
		pending += n;
		total += n;

		/*
		 * We update the cidx only if we've caught up with the pidx, the
		 * real cidx is getting too far ahead of the one visible to
		 * everyone else, or we have exceeded our budget.
		 */
		if (cidx != pidx && pending < 64 && total < budget)
			continue;

		os.state = ns.state = r->state;
		ns.cidx = cidx;
		ns.flags = state_to_flags(ns, total >= budget);
		r->state = ns.state;

		if (ns.flags == ABDICATED)
			counter_u64_add(r->abdications, 1);
		if (ns.flags != BUSY) {
			/* Wrong loop exit if we're going to stall. */
			MPASS(ns.flags != STALLED);
			if (prev == STALLED) {
				MPASS(total > 0);
				counter_u64_add(r->restarts, 1);
			}
			break;
		}

		/*
		 * NOTE(review): in this variant it is r->lock (held for the
		 * whole call), not an acquire atomic, that guarantees
		 * visibility of items associated with any pidx change we
		 * notice here.
		 */
		pidx = ns.pidx_tail;
		pending = 0;
	}
}
173*4c7070dbSScott Long #else
/*
 * Lockless consumer loop.  Caller passes in a state, with a guarantee
 * that there is work to do (flags == BUSY, cidx != pidx_tail) and that
 * all items up to the pidx_tail in the state are visible.  'prev' is the
 * flags value the ring transitioned from and only feeds the statistics
 * counters; 'budget' bounds how many items are consumed before
 * abdicating.
 */
static void
drain_ring_lockless(struct ifmp_ring *r, union ring_state os, uint16_t prev, int budget)
{
	union ring_state ns;
	int n, pending, total;
	uint16_t cidx = os.cidx;
	uint16_t pidx = os.pidx_tail;

	MPASS(os.flags == BUSY);
	MPASS(cidx != pidx);

	if (prev == IDLE)
		counter_u64_add(r->starts, 1);
	pending = 0;
	total = 0;

	while (cidx != pidx) {

		/* Items from cidx to pidx are available for consumption. */
		n = r->drain(r, cidx, pidx);
		if (n == 0) {
			/*
			 * No progress possible: publish the new cidx with
			 * STALLED via CAS.  The critical section presumably
			 * keeps this thread from being preempted mid-update
			 * window — confirm against other users of r->state.
			 */
			critical_enter();
			do {
				os.state = ns.state = r->state;
				ns.cidx = cidx;
				ns.flags = STALLED;
			} while (atomic_cmpset_64(&r->state, os.state,
			    ns.state) == 0);
			critical_exit();
			if (prev != STALLED)
				counter_u64_add(r->stalls, 1);
			else if (total > 0) {
				counter_u64_add(r->restarts, 1);
				counter_u64_add(r->stalls, 1);
			}
			break;
		}
		cidx = increment_idx(r, cidx, n);
		pending += n;
		total += n;

		/*
		 * We update the cidx only if we've caught up with the pidx, the
		 * real cidx is getting too far ahead of the one visible to
		 * everyone else, or we have exceeded our budget.
		 */
		if (cidx != pidx && pending < 64 && total < budget)
			continue;
		critical_enter();
		do {
			os.state = ns.state = r->state;
			ns.cidx = cidx;
			ns.flags = state_to_flags(ns, total >= budget);
		} while (atomic_cmpset_acq_64(&r->state, os.state, ns.state) == 0);
		critical_exit();

		if (ns.flags == ABDICATED)
			counter_u64_add(r->abdications, 1);
		if (ns.flags != BUSY) {
			/* Wrong loop exit if we're going to stall. */
			MPASS(ns.flags != STALLED);
			if (prev == STALLED) {
				MPASS(total > 0);
				counter_u64_add(r->restarts, 1);
			}
			break;
		}

		/*
		 * The acquire style atomic above guarantees visibility of items
		 * associated with any pidx change that we notice here.
		 */
		pidx = ns.pidx_tail;
		pending = 0;
	}
}
254*4c7070dbSScott Long #endif
255*4c7070dbSScott Long 
256*4c7070dbSScott Long int
257*4c7070dbSScott Long ifmp_ring_alloc(struct ifmp_ring **pr, int size, void *cookie, mp_ring_drain_t drain,
258*4c7070dbSScott Long     mp_ring_can_drain_t can_drain, struct malloc_type *mt, int flags)
259*4c7070dbSScott Long {
260*4c7070dbSScott Long 	struct ifmp_ring *r;
261*4c7070dbSScott Long 
262*4c7070dbSScott Long 	/* All idx are 16b so size can be 65536 at most */
263*4c7070dbSScott Long 	if (pr == NULL || size < 2 || size > 65536 || drain == NULL ||
264*4c7070dbSScott Long 	    can_drain == NULL)
265*4c7070dbSScott Long 		return (EINVAL);
266*4c7070dbSScott Long 	*pr = NULL;
267*4c7070dbSScott Long 	flags &= M_NOWAIT | M_WAITOK;
268*4c7070dbSScott Long 	MPASS(flags != 0);
269*4c7070dbSScott Long 
270*4c7070dbSScott Long 	r = malloc(__offsetof(struct ifmp_ring, items[size]), mt, flags | M_ZERO);
271*4c7070dbSScott Long 	if (r == NULL)
272*4c7070dbSScott Long 		return (ENOMEM);
273*4c7070dbSScott Long 	r->size = size;
274*4c7070dbSScott Long 	r->cookie = cookie;
275*4c7070dbSScott Long 	r->mt = mt;
276*4c7070dbSScott Long 	r->drain = drain;
277*4c7070dbSScott Long 	r->can_drain = can_drain;
278*4c7070dbSScott Long 	r->enqueues = counter_u64_alloc(flags);
279*4c7070dbSScott Long 	r->drops = counter_u64_alloc(flags);
280*4c7070dbSScott Long 	r->starts = counter_u64_alloc(flags);
281*4c7070dbSScott Long 	r->stalls = counter_u64_alloc(flags);
282*4c7070dbSScott Long 	r->restarts = counter_u64_alloc(flags);
283*4c7070dbSScott Long 	r->abdications = counter_u64_alloc(flags);
284*4c7070dbSScott Long 	if (r->enqueues == NULL || r->drops == NULL || r->starts == NULL ||
285*4c7070dbSScott Long 	    r->stalls == NULL || r->restarts == NULL ||
286*4c7070dbSScott Long 	    r->abdications == NULL) {
287*4c7070dbSScott Long 		ifmp_ring_free(r);
288*4c7070dbSScott Long 		return (ENOMEM);
289*4c7070dbSScott Long 	}
290*4c7070dbSScott Long 
291*4c7070dbSScott Long 	*pr = r;
292*4c7070dbSScott Long #ifdef NO_64BIT_ATOMICS
293*4c7070dbSScott Long 	mtx_init(&r->lock, "mp_ring lock", NULL, MTX_DEF);
294*4c7070dbSScott Long #endif
295*4c7070dbSScott Long 	return (0);
296*4c7070dbSScott Long }
297*4c7070dbSScott Long 
298*4c7070dbSScott Long void
299*4c7070dbSScott Long ifmp_ring_free(struct ifmp_ring *r)
300*4c7070dbSScott Long {
301*4c7070dbSScott Long 
302*4c7070dbSScott Long 	if (r == NULL)
303*4c7070dbSScott Long 		return;
304*4c7070dbSScott Long 
305*4c7070dbSScott Long 	if (r->enqueues != NULL)
306*4c7070dbSScott Long 		counter_u64_free(r->enqueues);
307*4c7070dbSScott Long 	if (r->drops != NULL)
308*4c7070dbSScott Long 		counter_u64_free(r->drops);
309*4c7070dbSScott Long 	if (r->starts != NULL)
310*4c7070dbSScott Long 		counter_u64_free(r->starts);
311*4c7070dbSScott Long 	if (r->stalls != NULL)
312*4c7070dbSScott Long 		counter_u64_free(r->stalls);
313*4c7070dbSScott Long 	if (r->restarts != NULL)
314*4c7070dbSScott Long 		counter_u64_free(r->restarts);
315*4c7070dbSScott Long 	if (r->abdications != NULL)
316*4c7070dbSScott Long 		counter_u64_free(r->abdications);
317*4c7070dbSScott Long 
318*4c7070dbSScott Long 	free(r, r->mt);
319*4c7070dbSScott Long }
320*4c7070dbSScott Long 
321*4c7070dbSScott Long /*
322*4c7070dbSScott Long  * Enqueue n items and maybe drain the ring for some time.
323*4c7070dbSScott Long  *
324*4c7070dbSScott Long  * Returns an errno.
325*4c7070dbSScott Long  */
326*4c7070dbSScott Long #ifdef NO_64BIT_ATOMICS
327*4c7070dbSScott Long int
328*4c7070dbSScott Long ifmp_ring_enqueue(struct ifmp_ring *r, void **items, int n, int budget)
329*4c7070dbSScott Long {
330*4c7070dbSScott Long 	union ring_state os, ns;
331*4c7070dbSScott Long 	uint16_t pidx_start, pidx_stop;
332*4c7070dbSScott Long 	int i;
333*4c7070dbSScott Long 
334*4c7070dbSScott Long 	MPASS(items != NULL);
335*4c7070dbSScott Long 	MPASS(n > 0);
336*4c7070dbSScott Long 
337*4c7070dbSScott Long 	mtx_lock(&r->lock);
338*4c7070dbSScott Long 	/*
339*4c7070dbSScott Long 	 * Reserve room for the new items.  Our reservation, if successful, is
340*4c7070dbSScott Long 	 * from 'pidx_start' to 'pidx_stop'.
341*4c7070dbSScott Long 	 */
342*4c7070dbSScott Long 	os.state = r->state;
343*4c7070dbSScott Long 	if (n >= space_available(r, os)) {
344*4c7070dbSScott Long 		counter_u64_add(r->drops, n);
345*4c7070dbSScott Long 		MPASS(os.flags != IDLE);
346*4c7070dbSScott Long 		if (os.flags == STALLED)
347*4c7070dbSScott Long 			ifmp_ring_check_drainage(r, 0);
348*4c7070dbSScott Long 		return (ENOBUFS);
349*4c7070dbSScott Long 	}
350*4c7070dbSScott Long 	ns.state = os.state;
351*4c7070dbSScott Long 	ns.pidx_head = increment_idx(r, os.pidx_head, n);
352*4c7070dbSScott Long 	r->state = ns.state;
353*4c7070dbSScott Long 	pidx_start = os.pidx_head;
354*4c7070dbSScott Long 	pidx_stop = ns.pidx_head;
355*4c7070dbSScott Long 
356*4c7070dbSScott Long 	/*
357*4c7070dbSScott Long 	 * Wait for other producers who got in ahead of us to enqueue their
358*4c7070dbSScott Long 	 * items, one producer at a time.  It is our turn when the ring's
359*4c7070dbSScott Long 	 * pidx_tail reaches the begining of our reservation (pidx_start).
360*4c7070dbSScott Long 	 */
361*4c7070dbSScott Long 	while (ns.pidx_tail != pidx_start) {
362*4c7070dbSScott Long 		cpu_spinwait();
363*4c7070dbSScott Long 		ns.state = r->state;
364*4c7070dbSScott Long 	}
365*4c7070dbSScott Long 
366*4c7070dbSScott Long 	/* Now it is our turn to fill up the area we reserved earlier. */
367*4c7070dbSScott Long 	i = pidx_start;
368*4c7070dbSScott Long 	do {
369*4c7070dbSScott Long 		r->items[i] = *items++;
370*4c7070dbSScott Long 		if (__predict_false(++i == r->size))
371*4c7070dbSScott Long 			i = 0;
372*4c7070dbSScott Long 	} while (i != pidx_stop);
373*4c7070dbSScott Long 
374*4c7070dbSScott Long 	/*
375*4c7070dbSScott Long 	 * Update the ring's pidx_tail.  The release style atomic guarantees
376*4c7070dbSScott Long 	 * that the items are visible to any thread that sees the updated pidx.
377*4c7070dbSScott Long 	 */
378*4c7070dbSScott Long 	os.state = ns.state = r->state;
379*4c7070dbSScott Long 	ns.pidx_tail = pidx_stop;
380*4c7070dbSScott Long 	ns.flags = BUSY;
381*4c7070dbSScott Long 	r->state = ns.state;
382*4c7070dbSScott Long 	counter_u64_add(r->enqueues, n);
383*4c7070dbSScott Long 
384*4c7070dbSScott Long 	/*
385*4c7070dbSScott Long 	 * Turn into a consumer if some other thread isn't active as a consumer
386*4c7070dbSScott Long 	 * already.
387*4c7070dbSScott Long 	 */
388*4c7070dbSScott Long 	if (os.flags != BUSY)
389*4c7070dbSScott Long 		drain_ring_locked(r, ns, os.flags, budget);
390*4c7070dbSScott Long 
391*4c7070dbSScott Long 	mtx_unlock(&r->lock);
392*4c7070dbSScott Long 	return (0);
393*4c7070dbSScott Long }
394*4c7070dbSScott Long 
395*4c7070dbSScott Long #else
int
ifmp_ring_enqueue(struct ifmp_ring *r, void **items, int n, int budget)
{
	union ring_state os, ns;
	uint16_t pidx_start, pidx_stop;
	int i;

	MPASS(items != NULL);
	MPASS(n > 0);

	/*
	 * Reserve room for the new items.  Our reservation, if successful, is
	 * from 'pidx_start' to 'pidx_stop'.
	 */
	for (;;) {
		os.state = r->state;
		if (n >= space_available(r, os)) {
			counter_u64_add(r->drops, n);
			MPASS(os.flags != IDLE);
			if (os.flags == STALLED)
				ifmp_ring_check_drainage(r, 0);
			return (ENOBUFS);
		}
		ns.state = os.state;
		ns.pidx_head = increment_idx(r, os.pidx_head, n);
		/*
		 * Enter a critical section before claiming the reservation:
		 * it stays in effect (see critical_exit below, after the
		 * tail update) so this producer is not preempted while later
		 * producers spin-wait on our pidx_tail update.
		 */
		critical_enter();
		if (atomic_cmpset_64(&r->state, os.state, ns.state))
			break;
		critical_exit();
		cpu_spinwait();
	}
	pidx_start = os.pidx_head;
	pidx_stop = ns.pidx_head;

	/*
	 * Wait for other producers who got in ahead of us to enqueue their
	 * items, one producer at a time.  It is our turn when the ring's
	 * pidx_tail reaches the begining of our reservation (pidx_start).
	 */
	while (ns.pidx_tail != pidx_start) {
		cpu_spinwait();
		ns.state = r->state;
	}

	/* Now it is our turn to fill up the area we reserved earlier. */
	i = pidx_start;
	do {
		r->items[i] = *items++;
		if (__predict_false(++i == r->size))
			i = 0;
	} while (i != pidx_stop);

	/*
	 * Update the ring's pidx_tail.  The release style atomic guarantees
	 * that the items are visible to any thread that sees the updated pidx.
	 */
	do {
		os.state = ns.state = r->state;
		ns.pidx_tail = pidx_stop;
		ns.flags = BUSY;
	} while (atomic_cmpset_rel_64(&r->state, os.state, ns.state) == 0);
	critical_exit();
	counter_u64_add(r->enqueues, n);

	/*
	 * Turn into a consumer if some other thread isn't active as a consumer
	 * already.
	 */
	if (os.flags != BUSY)
		drain_ring_lockless(r, ns, os.flags, budget);

	return (0);
}
469*4c7070dbSScott Long #endif
470*4c7070dbSScott Long 
/*
 * Restart a stalled ring: if the ring is STALLED with no producer
 * mid-enqueue (pidx_head == pidx_tail) and the drain callback reports
 * resources are available again, transition it to BUSY and consume up
 * to 'budget' items.  A no-op in every other state.
 */
void
ifmp_ring_check_drainage(struct ifmp_ring *r, int budget)
{
	union ring_state os, ns;

	os.state = r->state;
	if (os.flags != STALLED || os.pidx_head != os.pidx_tail || r->can_drain(r) == 0)
		return;

	MPASS(os.cidx != os.pidx_tail);	/* implied by STALLED */
	ns.state = os.state;
	ns.flags = BUSY;


#ifdef NO_64BIT_ATOMICS
	mtx_lock(&r->lock);
	/* The unlocked read above may be stale; recheck under the lock. */
	if (r->state != os.state) {
		mtx_unlock(&r->lock);
		return;
	}
	r->state = ns.state;
	drain_ring_locked(r, ns, os.flags, budget);
	mtx_unlock(&r->lock);
#else
	/*
	 * The acquire style atomic guarantees visibility of items associated
	 * with the pidx that we read here.
	 */
	if (!atomic_cmpset_acq_64(&r->state, os.state, ns.state))
		return;


	drain_ring_lockless(r, ns, os.flags, budget);
#endif
}
506*4c7070dbSScott Long 
507*4c7070dbSScott Long void
508*4c7070dbSScott Long ifmp_ring_reset_stats(struct ifmp_ring *r)
509*4c7070dbSScott Long {
510*4c7070dbSScott Long 
511*4c7070dbSScott Long 	counter_u64_zero(r->enqueues);
512*4c7070dbSScott Long 	counter_u64_zero(r->drops);
513*4c7070dbSScott Long 	counter_u64_zero(r->starts);
514*4c7070dbSScott Long 	counter_u64_zero(r->stalls);
515*4c7070dbSScott Long 	counter_u64_zero(r->restarts);
516*4c7070dbSScott Long 	counter_u64_zero(r->abdications);
517*4c7070dbSScott Long }
518*4c7070dbSScott Long 
519*4c7070dbSScott Long int
520*4c7070dbSScott Long ifmp_ring_is_idle(struct ifmp_ring *r)
521*4c7070dbSScott Long {
522*4c7070dbSScott Long 	union ring_state s;
523*4c7070dbSScott Long 
524*4c7070dbSScott Long 	s.state = r->state;
525*4c7070dbSScott Long 	if (s.pidx_head == s.pidx_tail && s.pidx_tail == s.cidx &&
526*4c7070dbSScott Long 	    s.flags == IDLE)
527*4c7070dbSScott Long 		return (1);
528*4c7070dbSScott Long 
529*4c7070dbSScott Long 	return (0);
530*4c7070dbSScott Long }
531*4c7070dbSScott Long 
532*4c7070dbSScott Long int
533*4c7070dbSScott Long ifmp_ring_is_stalled(struct ifmp_ring *r)
534*4c7070dbSScott Long {
535*4c7070dbSScott Long 	union ring_state s;
536*4c7070dbSScott Long 
537*4c7070dbSScott Long 	s.state = r->state;
538*4c7070dbSScott Long 	if (s.pidx_head == s.pidx_tail && s.flags == STALLED)
539*4c7070dbSScott Long 		return (1);
540*4c7070dbSScott Long 
541*4c7070dbSScott Long 	return (0);
542*4c7070dbSScott Long }
543