xref: /freebsd/sys/net/mp_ring.c (revision 14e0010729d075f5122e7b2bbb37c5febf914ba4)
14c7070dbSScott Long /*-
24c7070dbSScott Long  * Copyright (c) 2014 Chelsio Communications, Inc.
34c7070dbSScott Long  * All rights reserved.
44c7070dbSScott Long  * Written by: Navdeep Parhar <np@FreeBSD.org>
54c7070dbSScott Long  *
64c7070dbSScott Long  * Redistribution and use in source and binary forms, with or without
74c7070dbSScott Long  * modification, are permitted provided that the following conditions
84c7070dbSScott Long  * are met:
94c7070dbSScott Long  * 1. Redistributions of source code must retain the above copyright
104c7070dbSScott Long  *    notice, this list of conditions and the following disclaimer.
114c7070dbSScott Long  * 2. Redistributions in binary form must reproduce the above copyright
124c7070dbSScott Long  *    notice, this list of conditions and the following disclaimer in the
134c7070dbSScott Long  *    documentation and/or other materials provided with the distribution.
144c7070dbSScott Long  *
154c7070dbSScott Long  * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
164c7070dbSScott Long  * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
174c7070dbSScott Long  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
184c7070dbSScott Long  * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
194c7070dbSScott Long  * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
204c7070dbSScott Long  * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
214c7070dbSScott Long  * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
224c7070dbSScott Long  * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
234c7070dbSScott Long  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
244c7070dbSScott Long  * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
254c7070dbSScott Long  * SUCH DAMAGE.
264c7070dbSScott Long  */
274c7070dbSScott Long 
284c7070dbSScott Long #include <sys/cdefs.h>
294c7070dbSScott Long __FBSDID("$FreeBSD$");
304c7070dbSScott Long 
314c7070dbSScott Long #include <sys/types.h>
324c7070dbSScott Long #include <sys/param.h>
334c7070dbSScott Long #include <sys/systm.h>
344c7070dbSScott Long #include <sys/counter.h>
354c7070dbSScott Long #include <sys/lock.h>
364c7070dbSScott Long #include <sys/mutex.h>
374c7070dbSScott Long #include <sys/malloc.h>
384c7070dbSScott Long #include <machine/cpu.h>
39fc614c29SScott Long #include <net/mp_ring.h>
40fc614c29SScott Long 
/*
 * The complete state of a ring, packed into one 64-bit word.  The four 16-bit
 * fields overlay 'state' so that all of them can be read, compared and
 * replaced together as a single value.
 */
union ring_state {
	struct {
		uint16_t pidx_head;	/* moved forward when a producer reserves slots */
		uint16_t pidx_tail;	/* moved forward when a producer publishes its items */
		uint16_t cidx;		/* first slot the consumer has not consumed yet */
		uint16_t flags;		/* IDLE, BUSY, STALLED or ABDICATED */
	};
	uint64_t state;			/* all of the above as one word */
};
504c7070dbSScott Long 
/* Values stored in the 'flags' field of the ring state. */
enum {
	/* The consumer ran to completion; there is nothing more to do. */
	IDLE = 0,
	/* A consumer is running already, or will be shortly. */
	BUSY = 1,
	/* The consumer stopped because it ran out of resources. */
	STALLED = 2,
	/*
	 * The consumer stopped even though work remained, because it wants
	 * another thread to take over.
	 */
	ABDICATED = 3,
};
584c7070dbSScott Long 
594c7070dbSScott Long static inline uint16_t
604c7070dbSScott Long space_available(struct ifmp_ring *r, union ring_state s)
614c7070dbSScott Long {
624c7070dbSScott Long 	uint16_t x = r->size - 1;
634c7070dbSScott Long 
644c7070dbSScott Long 	if (s.cidx == s.pidx_head)
654c7070dbSScott Long 		return (x);
664c7070dbSScott Long 	else if (s.cidx > s.pidx_head)
674c7070dbSScott Long 		return (s.cidx - s.pidx_head - 1);
684c7070dbSScott Long 	else
694c7070dbSScott Long 		return (x - s.pidx_head + s.cidx);
704c7070dbSScott Long }
714c7070dbSScott Long 
724c7070dbSScott Long static inline uint16_t
734c7070dbSScott Long increment_idx(struct ifmp_ring *r, uint16_t idx, uint16_t n)
744c7070dbSScott Long {
754c7070dbSScott Long 	int x = r->size - idx;
764c7070dbSScott Long 
774c7070dbSScott Long 	MPASS(x > 0);
784c7070dbSScott Long 	return (x > n ? idx + n : n - x);
794c7070dbSScott Long }
804c7070dbSScott Long 
814c7070dbSScott Long /* Consumer is about to update the ring's state to s */
824c7070dbSScott Long static inline uint16_t
834c7070dbSScott Long state_to_flags(union ring_state s, int abdicate)
844c7070dbSScott Long {
854c7070dbSScott Long 
864c7070dbSScott Long 	if (s.cidx == s.pidx_tail)
874c7070dbSScott Long 		return (IDLE);
884c7070dbSScott Long 	else if (abdicate && s.pidx_tail != s.pidx_head)
894c7070dbSScott Long 		return (ABDICATED);
904c7070dbSScott Long 
914c7070dbSScott Long 	return (BUSY);
924c7070dbSScott Long }
934c7070dbSScott Long 
945881181dSMatt Macy #ifdef MP_RING_NO_64BIT_ATOMICS
954c7070dbSScott Long static void
964c7070dbSScott Long drain_ring_locked(struct ifmp_ring *r, union ring_state os, uint16_t prev, int budget)
974c7070dbSScott Long {
984c7070dbSScott Long 	union ring_state ns;
994c7070dbSScott Long 	int n, pending, total;
1004c7070dbSScott Long 	uint16_t cidx = os.cidx;
1014c7070dbSScott Long 	uint16_t pidx = os.pidx_tail;
1024c7070dbSScott Long 
1034c7070dbSScott Long 	MPASS(os.flags == BUSY);
1044c7070dbSScott Long 	MPASS(cidx != pidx);
1054c7070dbSScott Long 
1064c7070dbSScott Long 	if (prev == IDLE)
1074c7070dbSScott Long 		counter_u64_add(r->starts, 1);
1084c7070dbSScott Long 	pending = 0;
1094c7070dbSScott Long 	total = 0;
1104c7070dbSScott Long 
1114c7070dbSScott Long 	while (cidx != pidx) {
1124c7070dbSScott Long 
1134c7070dbSScott Long 		/* Items from cidx to pidx are available for consumption. */
1144c7070dbSScott Long 		n = r->drain(r, cidx, pidx);
1154c7070dbSScott Long 		if (n == 0) {
1164c7070dbSScott Long 			os.state = ns.state = r->state;
1174c7070dbSScott Long 			ns.cidx = cidx;
1184c7070dbSScott Long 			ns.flags = STALLED;
1194c7070dbSScott Long 			r->state = ns.state;
1204c7070dbSScott Long 			if (prev != STALLED)
1214c7070dbSScott Long 				counter_u64_add(r->stalls, 1);
1224c7070dbSScott Long 			else if (total > 0) {
1234c7070dbSScott Long 				counter_u64_add(r->restarts, 1);
1244c7070dbSScott Long 				counter_u64_add(r->stalls, 1);
1254c7070dbSScott Long 			}
1264c7070dbSScott Long 			break;
1274c7070dbSScott Long 		}
1284c7070dbSScott Long 		cidx = increment_idx(r, cidx, n);
1294c7070dbSScott Long 		pending += n;
1304c7070dbSScott Long 		total += n;
1314c7070dbSScott Long 
1324c7070dbSScott Long 		/*
1334c7070dbSScott Long 		 * We update the cidx only if we've caught up with the pidx, the
1344c7070dbSScott Long 		 * real cidx is getting too far ahead of the one visible to
1354c7070dbSScott Long 		 * everyone else, or we have exceeded our budget.
1364c7070dbSScott Long 		 */
1374c7070dbSScott Long 		if (cidx != pidx && pending < 64 && total < budget)
1384c7070dbSScott Long 			continue;
1394c7070dbSScott Long 
1404c7070dbSScott Long 		os.state = ns.state = r->state;
1414c7070dbSScott Long 		ns.cidx = cidx;
1424c7070dbSScott Long 		ns.flags = state_to_flags(ns, total >= budget);
1434c7070dbSScott Long 		r->state = ns.state;
1444c7070dbSScott Long 
1454c7070dbSScott Long 		if (ns.flags == ABDICATED)
1464c7070dbSScott Long 			counter_u64_add(r->abdications, 1);
1474c7070dbSScott Long 		if (ns.flags != BUSY) {
1484c7070dbSScott Long 			/* Wrong loop exit if we're going to stall. */
1494c7070dbSScott Long 			MPASS(ns.flags != STALLED);
1504c7070dbSScott Long 			if (prev == STALLED) {
1514c7070dbSScott Long 				MPASS(total > 0);
1524c7070dbSScott Long 				counter_u64_add(r->restarts, 1);
1534c7070dbSScott Long 			}
1544c7070dbSScott Long 			break;
1554c7070dbSScott Long 		}
1564c7070dbSScott Long 
1574c7070dbSScott Long 		/*
1584c7070dbSScott Long 		 * The acquire style atomic above guarantees visibility of items
1594c7070dbSScott Long 		 * associated with any pidx change that we notice here.
1604c7070dbSScott Long 		 */
1614c7070dbSScott Long 		pidx = ns.pidx_tail;
1624c7070dbSScott Long 		pending = 0;
1634c7070dbSScott Long 	}
1644c7070dbSScott Long }
1654c7070dbSScott Long #else
1664c7070dbSScott Long /*
1674c7070dbSScott Long  * Caller passes in a state, with a guarantee that there is work to do and that
1684c7070dbSScott Long  * all items up to the pidx_tail in the state are visible.
1694c7070dbSScott Long  */
1704c7070dbSScott Long static void
1714c7070dbSScott Long drain_ring_lockless(struct ifmp_ring *r, union ring_state os, uint16_t prev, int budget)
1724c7070dbSScott Long {
1734c7070dbSScott Long 	union ring_state ns;
1744c7070dbSScott Long 	int n, pending, total;
1754c7070dbSScott Long 	uint16_t cidx = os.cidx;
1764c7070dbSScott Long 	uint16_t pidx = os.pidx_tail;
1774c7070dbSScott Long 
1784c7070dbSScott Long 	MPASS(os.flags == BUSY);
1794c7070dbSScott Long 	MPASS(cidx != pidx);
1804c7070dbSScott Long 
1814c7070dbSScott Long 	if (prev == IDLE)
1824c7070dbSScott Long 		counter_u64_add(r->starts, 1);
1834c7070dbSScott Long 	pending = 0;
1844c7070dbSScott Long 	total = 0;
1854c7070dbSScott Long 
1864c7070dbSScott Long 	while (cidx != pidx) {
1874c7070dbSScott Long 
1884c7070dbSScott Long 		/* Items from cidx to pidx are available for consumption. */
1894c7070dbSScott Long 		n = r->drain(r, cidx, pidx);
1904c7070dbSScott Long 		if (n == 0) {
1914c7070dbSScott Long 			critical_enter();
192*14e00107SMarius Strobl 			os.state = r->state;
1934c7070dbSScott Long 			do {
194*14e00107SMarius Strobl 				ns.state = os.state;
1954c7070dbSScott Long 				ns.cidx = cidx;
1964c7070dbSScott Long 				ns.flags = STALLED;
197*14e00107SMarius Strobl 			} while (atomic_fcmpset_64(&r->state, &os.state,
1984c7070dbSScott Long 			    ns.state) == 0);
1994c7070dbSScott Long 			critical_exit();
2004c7070dbSScott Long 			if (prev != STALLED)
2014c7070dbSScott Long 				counter_u64_add(r->stalls, 1);
2024c7070dbSScott Long 			else if (total > 0) {
2034c7070dbSScott Long 				counter_u64_add(r->restarts, 1);
2044c7070dbSScott Long 				counter_u64_add(r->stalls, 1);
2054c7070dbSScott Long 			}
2064c7070dbSScott Long 			break;
2074c7070dbSScott Long 		}
2084c7070dbSScott Long 		cidx = increment_idx(r, cidx, n);
2094c7070dbSScott Long 		pending += n;
2104c7070dbSScott Long 		total += n;
2114c7070dbSScott Long 
2124c7070dbSScott Long 		/*
2134c7070dbSScott Long 		 * We update the cidx only if we've caught up with the pidx, the
2144c7070dbSScott Long 		 * real cidx is getting too far ahead of the one visible to
2154c7070dbSScott Long 		 * everyone else, or we have exceeded our budget.
2164c7070dbSScott Long 		 */
2174c7070dbSScott Long 		if (cidx != pidx && pending < 64 && total < budget)
2184c7070dbSScott Long 			continue;
2194c7070dbSScott Long 		critical_enter();
220*14e00107SMarius Strobl 		os.state = r->state;
221ab2e3f79SStephen Hurd 		do {
222*14e00107SMarius Strobl 			ns.state = os.state;
2234c7070dbSScott Long 			ns.cidx = cidx;
2244c7070dbSScott Long 			ns.flags = state_to_flags(ns, total >= budget);
225*14e00107SMarius Strobl 		} while (atomic_fcmpset_acq_64(&r->state, &os.state,
226*14e00107SMarius Strobl 		    ns.state) == 0);
2274c7070dbSScott Long 		critical_exit();
2284c7070dbSScott Long 
2294c7070dbSScott Long 		if (ns.flags == ABDICATED)
2304c7070dbSScott Long 			counter_u64_add(r->abdications, 1);
2314c7070dbSScott Long 		if (ns.flags != BUSY) {
2324c7070dbSScott Long 			/* Wrong loop exit if we're going to stall. */
2334c7070dbSScott Long 			MPASS(ns.flags != STALLED);
2344c7070dbSScott Long 			if (prev == STALLED) {
2354c7070dbSScott Long 				MPASS(total > 0);
2364c7070dbSScott Long 				counter_u64_add(r->restarts, 1);
2374c7070dbSScott Long 			}
2384c7070dbSScott Long 			break;
2394c7070dbSScott Long 		}
2404c7070dbSScott Long 
2414c7070dbSScott Long 		/*
2424c7070dbSScott Long 		 * The acquire style atomic above guarantees visibility of items
2434c7070dbSScott Long 		 * associated with any pidx change that we notice here.
2444c7070dbSScott Long 		 */
2454c7070dbSScott Long 		pidx = ns.pidx_tail;
2464c7070dbSScott Long 		pending = 0;
2474c7070dbSScott Long 	}
2484c7070dbSScott Long }
2494c7070dbSScott Long #endif
2504c7070dbSScott Long 
2514c7070dbSScott Long int
2524c7070dbSScott Long ifmp_ring_alloc(struct ifmp_ring **pr, int size, void *cookie, mp_ring_drain_t drain,
2534c7070dbSScott Long     mp_ring_can_drain_t can_drain, struct malloc_type *mt, int flags)
2544c7070dbSScott Long {
2554c7070dbSScott Long 	struct ifmp_ring *r;
2564c7070dbSScott Long 
2574c7070dbSScott Long 	/* All idx are 16b so size can be 65536 at most */
2584c7070dbSScott Long 	if (pr == NULL || size < 2 || size > 65536 || drain == NULL ||
2594c7070dbSScott Long 	    can_drain == NULL)
2604c7070dbSScott Long 		return (EINVAL);
2614c7070dbSScott Long 	*pr = NULL;
2624c7070dbSScott Long 	flags &= M_NOWAIT | M_WAITOK;
2634c7070dbSScott Long 	MPASS(flags != 0);
2644c7070dbSScott Long 
2654c7070dbSScott Long 	r = malloc(__offsetof(struct ifmp_ring, items[size]), mt, flags | M_ZERO);
2664c7070dbSScott Long 	if (r == NULL)
2674c7070dbSScott Long 		return (ENOMEM);
2684c7070dbSScott Long 	r->size = size;
2694c7070dbSScott Long 	r->cookie = cookie;
2704c7070dbSScott Long 	r->mt = mt;
2714c7070dbSScott Long 	r->drain = drain;
2724c7070dbSScott Long 	r->can_drain = can_drain;
2734c7070dbSScott Long 	r->enqueues = counter_u64_alloc(flags);
2744c7070dbSScott Long 	r->drops = counter_u64_alloc(flags);
2754c7070dbSScott Long 	r->starts = counter_u64_alloc(flags);
2764c7070dbSScott Long 	r->stalls = counter_u64_alloc(flags);
2774c7070dbSScott Long 	r->restarts = counter_u64_alloc(flags);
2784c7070dbSScott Long 	r->abdications = counter_u64_alloc(flags);
2794c7070dbSScott Long 	if (r->enqueues == NULL || r->drops == NULL || r->starts == NULL ||
2804c7070dbSScott Long 	    r->stalls == NULL || r->restarts == NULL ||
2814c7070dbSScott Long 	    r->abdications == NULL) {
2824c7070dbSScott Long 		ifmp_ring_free(r);
2834c7070dbSScott Long 		return (ENOMEM);
2844c7070dbSScott Long 	}
2854c7070dbSScott Long 
2864c7070dbSScott Long 	*pr = r;
2875881181dSMatt Macy #ifdef MP_RING_NO_64BIT_ATOMICS
2884c7070dbSScott Long 	mtx_init(&r->lock, "mp_ring lock", NULL, MTX_DEF);
2894c7070dbSScott Long #endif
2904c7070dbSScott Long 	return (0);
2914c7070dbSScott Long }
2924c7070dbSScott Long 
2934c7070dbSScott Long void
2944c7070dbSScott Long ifmp_ring_free(struct ifmp_ring *r)
2954c7070dbSScott Long {
2964c7070dbSScott Long 
2974c7070dbSScott Long 	if (r == NULL)
2984c7070dbSScott Long 		return;
2994c7070dbSScott Long 
3004c7070dbSScott Long 	if (r->enqueues != NULL)
3014c7070dbSScott Long 		counter_u64_free(r->enqueues);
3024c7070dbSScott Long 	if (r->drops != NULL)
3034c7070dbSScott Long 		counter_u64_free(r->drops);
3044c7070dbSScott Long 	if (r->starts != NULL)
3054c7070dbSScott Long 		counter_u64_free(r->starts);
3064c7070dbSScott Long 	if (r->stalls != NULL)
3074c7070dbSScott Long 		counter_u64_free(r->stalls);
3084c7070dbSScott Long 	if (r->restarts != NULL)
3094c7070dbSScott Long 		counter_u64_free(r->restarts);
3104c7070dbSScott Long 	if (r->abdications != NULL)
3114c7070dbSScott Long 		counter_u64_free(r->abdications);
3124c7070dbSScott Long 
3134c7070dbSScott Long 	free(r, r->mt);
3144c7070dbSScott Long }
3154c7070dbSScott Long 
3164c7070dbSScott Long /*
3174c7070dbSScott Long  * Enqueue n items and maybe drain the ring for some time.
3184c7070dbSScott Long  *
3194c7070dbSScott Long  * Returns an errno.
3204c7070dbSScott Long  */
3215881181dSMatt Macy #ifdef MP_RING_NO_64BIT_ATOMICS
3224c7070dbSScott Long int
323fe51d4cdSStephen Hurd ifmp_ring_enqueue(struct ifmp_ring *r, void **items, int n, int budget, int abdicate)
3244c7070dbSScott Long {
3254c7070dbSScott Long 	union ring_state os, ns;
3264c7070dbSScott Long 	uint16_t pidx_start, pidx_stop;
3274c7070dbSScott Long 	int i;
3284c7070dbSScott Long 
3294c7070dbSScott Long 	MPASS(items != NULL);
3304c7070dbSScott Long 	MPASS(n > 0);
3314c7070dbSScott Long 
3324c7070dbSScott Long 	mtx_lock(&r->lock);
3334c7070dbSScott Long 	/*
3344c7070dbSScott Long 	 * Reserve room for the new items.  Our reservation, if successful, is
3354c7070dbSScott Long 	 * from 'pidx_start' to 'pidx_stop'.
3364c7070dbSScott Long 	 */
3374c7070dbSScott Long 	os.state = r->state;
3384c7070dbSScott Long 	if (n >= space_available(r, os)) {
3394c7070dbSScott Long 		counter_u64_add(r->drops, n);
3404c7070dbSScott Long 		MPASS(os.flags != IDLE);
341e335651eSMatt Macy 		mtx_unlock(&r->lock);
3424c7070dbSScott Long 		if (os.flags == STALLED)
3434c7070dbSScott Long 			ifmp_ring_check_drainage(r, 0);
3444c7070dbSScott Long 		return (ENOBUFS);
3454c7070dbSScott Long 	}
3464c7070dbSScott Long 	ns.state = os.state;
3474c7070dbSScott Long 	ns.pidx_head = increment_idx(r, os.pidx_head, n);
3484c7070dbSScott Long 	r->state = ns.state;
3494c7070dbSScott Long 	pidx_start = os.pidx_head;
3504c7070dbSScott Long 	pidx_stop = ns.pidx_head;
3514c7070dbSScott Long 
3524c7070dbSScott Long 	/*
3534c7070dbSScott Long 	 * Wait for other producers who got in ahead of us to enqueue their
3544c7070dbSScott Long 	 * items, one producer at a time.  It is our turn when the ring's
355efc457e1SPedro F. Giffuni 	 * pidx_tail reaches the beginning of our reservation (pidx_start).
3564c7070dbSScott Long 	 */
3574c7070dbSScott Long 	while (ns.pidx_tail != pidx_start) {
3584c7070dbSScott Long 		cpu_spinwait();
3594c7070dbSScott Long 		ns.state = r->state;
3604c7070dbSScott Long 	}
3614c7070dbSScott Long 
3624c7070dbSScott Long 	/* Now it is our turn to fill up the area we reserved earlier. */
3634c7070dbSScott Long 	i = pidx_start;
3644c7070dbSScott Long 	do {
3654c7070dbSScott Long 		r->items[i] = *items++;
3664c7070dbSScott Long 		if (__predict_false(++i == r->size))
3674c7070dbSScott Long 			i = 0;
3684c7070dbSScott Long 	} while (i != pidx_stop);
3694c7070dbSScott Long 
3704c7070dbSScott Long 	/*
3714c7070dbSScott Long 	 * Update the ring's pidx_tail.  The release style atomic guarantees
3724c7070dbSScott Long 	 * that the items are visible to any thread that sees the updated pidx.
3734c7070dbSScott Long 	 */
3744c7070dbSScott Long 	os.state = ns.state = r->state;
3754c7070dbSScott Long 	ns.pidx_tail = pidx_stop;
376fe51d4cdSStephen Hurd 	if (abdicate) {
377fe51d4cdSStephen Hurd 		if (os.flags == IDLE)
378fe51d4cdSStephen Hurd 			ns.flags = ABDICATED;
379*14e00107SMarius Strobl 	} else
3804c7070dbSScott Long 		ns.flags = BUSY;
3814c7070dbSScott Long 	r->state = ns.state;
3824c7070dbSScott Long 	counter_u64_add(r->enqueues, n);
3834c7070dbSScott Long 
384fe51d4cdSStephen Hurd 	if (!abdicate) {
3854c7070dbSScott Long 		/*
3864c7070dbSScott Long 		 * Turn into a consumer if some other thread isn't active as a consumer
3874c7070dbSScott Long 		 * already.
3884c7070dbSScott Long 		 */
3894c7070dbSScott Long 		if (os.flags != BUSY)
3904c7070dbSScott Long 			drain_ring_locked(r, ns, os.flags, budget);
391fe51d4cdSStephen Hurd 	}
3924c7070dbSScott Long 
3934c7070dbSScott Long 	mtx_unlock(&r->lock);
3944c7070dbSScott Long 	return (0);
3954c7070dbSScott Long }
3964c7070dbSScott Long #else
3974c7070dbSScott Long int
398fe51d4cdSStephen Hurd ifmp_ring_enqueue(struct ifmp_ring *r, void **items, int n, int budget, int abdicate)
3994c7070dbSScott Long {
4004c7070dbSScott Long 	union ring_state os, ns;
4014c7070dbSScott Long 	uint16_t pidx_start, pidx_stop;
4024c7070dbSScott Long 	int i;
4034c7070dbSScott Long 
4044c7070dbSScott Long 	MPASS(items != NULL);
4054c7070dbSScott Long 	MPASS(n > 0);
4064c7070dbSScott Long 
4074c7070dbSScott Long 	/*
4084c7070dbSScott Long 	 * Reserve room for the new items.  Our reservation, if successful, is
4094c7070dbSScott Long 	 * from 'pidx_start' to 'pidx_stop'.
4104c7070dbSScott Long 	 */
4114c7070dbSScott Long 	os.state = r->state;
412*14e00107SMarius Strobl 	for (;;) {
4134c7070dbSScott Long 		if (n >= space_available(r, os)) {
4144c7070dbSScott Long 			counter_u64_add(r->drops, n);
4154c7070dbSScott Long 			MPASS(os.flags != IDLE);
4164c7070dbSScott Long 			if (os.flags == STALLED)
4174c7070dbSScott Long 				ifmp_ring_check_drainage(r, 0);
4184c7070dbSScott Long 			return (ENOBUFS);
4194c7070dbSScott Long 		}
4204c7070dbSScott Long 		ns.state = os.state;
4214c7070dbSScott Long 		ns.pidx_head = increment_idx(r, os.pidx_head, n);
4224c7070dbSScott Long 		critical_enter();
423*14e00107SMarius Strobl 		if (atomic_fcmpset_64(&r->state, &os.state, ns.state))
4244c7070dbSScott Long 			break;
4254c7070dbSScott Long 		critical_exit();
4264c7070dbSScott Long 		cpu_spinwait();
4274c7070dbSScott Long 	}
4284c7070dbSScott Long 	pidx_start = os.pidx_head;
4294c7070dbSScott Long 	pidx_stop = ns.pidx_head;
4304c7070dbSScott Long 
4314c7070dbSScott Long 	/*
4324c7070dbSScott Long 	 * Wait for other producers who got in ahead of us to enqueue their
4334c7070dbSScott Long 	 * items, one producer at a time.  It is our turn when the ring's
434efc457e1SPedro F. Giffuni 	 * pidx_tail reaches the beginning of our reservation (pidx_start).
4354c7070dbSScott Long 	 */
4364c7070dbSScott Long 	while (ns.pidx_tail != pidx_start) {
4374c7070dbSScott Long 		cpu_spinwait();
4384c7070dbSScott Long 		ns.state = r->state;
4394c7070dbSScott Long 	}
4404c7070dbSScott Long 
4414c7070dbSScott Long 	/* Now it is our turn to fill up the area we reserved earlier. */
4424c7070dbSScott Long 	i = pidx_start;
4434c7070dbSScott Long 	do {
4444c7070dbSScott Long 		r->items[i] = *items++;
4454c7070dbSScott Long 		if (__predict_false(++i == r->size))
4464c7070dbSScott Long 			i = 0;
4474c7070dbSScott Long 	} while (i != pidx_stop);
4484c7070dbSScott Long 
4494c7070dbSScott Long 	/*
4504c7070dbSScott Long 	 * Update the ring's pidx_tail.  The release style atomic guarantees
4514c7070dbSScott Long 	 * that the items are visible to any thread that sees the updated pidx.
4524c7070dbSScott Long 	 */
453*14e00107SMarius Strobl 	os.state = r->state;
4544c7070dbSScott Long 	do {
455*14e00107SMarius Strobl 		ns.state = os.state;
4564c7070dbSScott Long 		ns.pidx_tail = pidx_stop;
457fe51d4cdSStephen Hurd 		if (abdicate) {
4581225d9daSStephen Hurd 			if (os.flags == IDLE)
4591225d9daSStephen Hurd 				ns.flags = ABDICATED;
460*14e00107SMarius Strobl 		} else
461fe51d4cdSStephen Hurd 			ns.flags = BUSY;
462*14e00107SMarius Strobl 	} while (atomic_fcmpset_rel_64(&r->state, &os.state, ns.state) == 0);
4634c7070dbSScott Long 	critical_exit();
4644c7070dbSScott Long 	counter_u64_add(r->enqueues, n);
4654c7070dbSScott Long 
466fe51d4cdSStephen Hurd 	if (!abdicate) {
467fe51d4cdSStephen Hurd 		/*
468fe51d4cdSStephen Hurd 		 * Turn into a consumer if some other thread isn't active as a consumer
469fe51d4cdSStephen Hurd 		 * already.
470fe51d4cdSStephen Hurd 		 */
471fe51d4cdSStephen Hurd 		if (os.flags != BUSY)
472fe51d4cdSStephen Hurd 			drain_ring_lockless(r, ns, os.flags, budget);
473fe51d4cdSStephen Hurd 	}
474fe51d4cdSStephen Hurd 
4754c7070dbSScott Long 	return (0);
4764c7070dbSScott Long }
4774c7070dbSScott Long #endif
4784c7070dbSScott Long 
4794c7070dbSScott Long void
4804c7070dbSScott Long ifmp_ring_check_drainage(struct ifmp_ring *r, int budget)
4814c7070dbSScott Long {
4824c7070dbSScott Long 	union ring_state os, ns;
4834c7070dbSScott Long 
4844c7070dbSScott Long 	os.state = r->state;
4851225d9daSStephen Hurd 	if ((os.flags != STALLED && os.flags != ABDICATED) ||	// Only continue in STALLED and ABDICATED
4861225d9daSStephen Hurd 	    os.pidx_head != os.pidx_tail ||			// Require work to be available
4871225d9daSStephen Hurd 	    (os.flags != ABDICATED && r->can_drain(r) == 0))	// Can either drain, or everyone left
4884c7070dbSScott Long 		return;
4894c7070dbSScott Long 
4904c7070dbSScott Long 	MPASS(os.cidx != os.pidx_tail);	/* implied by STALLED */
4914c7070dbSScott Long 	ns.state = os.state;
4924c7070dbSScott Long 	ns.flags = BUSY;
4934c7070dbSScott Long 
4944c7070dbSScott Long 
4955881181dSMatt Macy #ifdef MP_RING_NO_64BIT_ATOMICS
4964c7070dbSScott Long 	mtx_lock(&r->lock);
4974c7070dbSScott Long 	if (r->state != os.state) {
4984c7070dbSScott Long 		mtx_unlock(&r->lock);
4994c7070dbSScott Long 		return;
5004c7070dbSScott Long 	}
5014c7070dbSScott Long 	r->state = ns.state;
5024c7070dbSScott Long 	drain_ring_locked(r, ns, os.flags, budget);
5034c7070dbSScott Long 	mtx_unlock(&r->lock);
5044c7070dbSScott Long #else
5054c7070dbSScott Long 	/*
5064c7070dbSScott Long 	 * The acquire style atomic guarantees visibility of items associated
5074c7070dbSScott Long 	 * with the pidx that we read here.
5084c7070dbSScott Long 	 */
5094c7070dbSScott Long 	if (!atomic_cmpset_acq_64(&r->state, os.state, ns.state))
5104c7070dbSScott Long 		return;
5114c7070dbSScott Long 
5124c7070dbSScott Long 
5134c7070dbSScott Long 	drain_ring_lockless(r, ns, os.flags, budget);
5144c7070dbSScott Long #endif
5154c7070dbSScott Long }
5164c7070dbSScott Long 
5174c7070dbSScott Long void
5184c7070dbSScott Long ifmp_ring_reset_stats(struct ifmp_ring *r)
5194c7070dbSScott Long {
5204c7070dbSScott Long 
5214c7070dbSScott Long 	counter_u64_zero(r->enqueues);
5224c7070dbSScott Long 	counter_u64_zero(r->drops);
5234c7070dbSScott Long 	counter_u64_zero(r->starts);
5244c7070dbSScott Long 	counter_u64_zero(r->stalls);
5254c7070dbSScott Long 	counter_u64_zero(r->restarts);
5264c7070dbSScott Long 	counter_u64_zero(r->abdications);
5274c7070dbSScott Long }
5284c7070dbSScott Long 
5294c7070dbSScott Long int
5304c7070dbSScott Long ifmp_ring_is_idle(struct ifmp_ring *r)
5314c7070dbSScott Long {
5324c7070dbSScott Long 	union ring_state s;
5334c7070dbSScott Long 
5344c7070dbSScott Long 	s.state = r->state;
5354c7070dbSScott Long 	if (s.pidx_head == s.pidx_tail && s.pidx_tail == s.cidx &&
5364c7070dbSScott Long 	    s.flags == IDLE)
5374c7070dbSScott Long 		return (1);
5384c7070dbSScott Long 
5394c7070dbSScott Long 	return (0);
5404c7070dbSScott Long }
5414c7070dbSScott Long 
5424c7070dbSScott Long int
5434c7070dbSScott Long ifmp_ring_is_stalled(struct ifmp_ring *r)
5444c7070dbSScott Long {
5454c7070dbSScott Long 	union ring_state s;
5464c7070dbSScott Long 
5474c7070dbSScott Long 	s.state = r->state;
5484c7070dbSScott Long 	if (s.pidx_head == s.pidx_tail && s.flags == STALLED)
5494c7070dbSScott Long 		return (1);
5504c7070dbSScott Long 
5514c7070dbSScott Long 	return (0);
5524c7070dbSScott Long }
553