/*-
 * Copyright (c) 2014 Chelsio Communications, Inc.
 * All rights reserved.
 * Written by: Navdeep Parhar <np@FreeBSD.org>
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
 * SUCH DAMAGE.
 */

#include <sys/cdefs.h>
__FBSDID("$FreeBSD$");

#include <sys/types.h>
#include <sys/param.h>
#include <sys/systm.h>
#include <sys/counter.h>
#include <sys/lock.h>
#include <sys/mutex.h>
#include <sys/malloc.h>
#include <machine/cpu.h>

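/*
 * These platforms may lack the 64-bit atomic operations that the lockless
 * version of the ring relies on, so fall back to a mutex-protected variant.
 */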
#if defined(__powerpc__) || defined(__mips__)
#define NO_64BIT_ATOMICS
#endif

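/*
 * i386 has only the plain 64-bit cmpset; a locked cmpxchg is a full barrier
 * on x86, so it can stand in for both the acquire and release variants.
 */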
#if defined(__i386__)
#define atomic_cmpset_acq_64 atomic_cmpset_64
#define atomic_cmpset_rel_64 atomic_cmpset_64
#endif

#include <net/mp_ring.h>

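/*
 * The ring's entire state fits in one 64-bit word so that it can be read and
 * updated atomically.  pidx_head marks the end of the region reserved by
 * producers, pidx_tail marks the end of the region whose items have been
 * published (copied in), cidx is where the consumer takes its next item, and
 * flags holds one of the values in the enum below.
 */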
union ring_state {
	struct {
		uint16_t pidx_head;
		uint16_t pidx_tail;
		uint16_t cidx;
		uint16_t flags;
	};
	uint64_t state;
};

enum {
	IDLE = 0,	/* consumer ran to completion, nothing more to do. */
	BUSY,		/* consumer is running already, or will be shortly. */
	STALLED,	/* consumer stopped due to lack of resources. */
	ABDICATED,	/* consumer stopped even though there was work to be
			   done because it wants another thread to take over. */
};

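/*
 * Number of items that can still be enqueued.  One slot is always left empty
 * so that a full ring can be distinguished from an empty one.
 */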
static inline uint16_t
space_available(struct ifmp_ring *r, union ring_state s)
{
	uint16_t x = r->size - 1;

	if (s.cidx == s.pidx_head)
		return (x);
	else if (s.cidx > s.pidx_head)
		return (s.cidx - s.pidx_head - 1);
	else
		return (x - s.pidx_head + s.cidx);
}

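/* Advance idx by n slots, wrapping around the end of the ring. */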
static inline uint16_t
increment_idx(struct ifmp_ring *r, uint16_t idx, uint16_t n)
{
	int x = r->size - idx;

	MPASS(x > 0);
	return (x > n ? idx + n : n - x);
}

/* Consumer is about to update the ring's state to s */
static inline uint16_t
state_to_flags(union ring_state s, int abdicate)
{

	if (s.cidx == s.pidx_tail)
		return (IDLE);
	else if (abdicate && s.pidx_tail != s.pidx_head)
		return (ABDICATED);

	return (BUSY);
}

#ifdef NO_64BIT_ATOMICS
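/*
 * Caller holds the ring lock and passes in a state with a guarantee that there
 * is work to do and that all items up to the pidx_tail in the state are
 * visible.
 */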
static void
drain_ring_locked(struct ifmp_ring *r, union ring_state os, uint16_t prev, int budget)
{
	union ring_state ns;
	int n, pending, total;
	uint16_t cidx = os.cidx;
	uint16_t pidx = os.pidx_tail;

	MPASS(os.flags == BUSY);
	MPASS(cidx != pidx);

	if (prev == IDLE)
		counter_u64_add(r->starts, 1);
	pending = 0;
	total = 0;

	while (cidx != pidx) {

		/* Items from cidx to pidx are available for consumption. */
		n = r->drain(r, cidx, pidx);
		if (n == 0) {
			os.state = ns.state = r->state;
			ns.cidx = cidx;
			ns.flags = STALLED;
			r->state = ns.state;
			if (prev != STALLED)
				counter_u64_add(r->stalls, 1);
			else if (total > 0) {
				counter_u64_add(r->restarts, 1);
				counter_u64_add(r->stalls, 1);
			}
			break;
		}
		cidx = increment_idx(r, cidx, n);
		pending += n;
		total += n;

		/*
		 * We update the cidx only if we've caught up with the pidx, the
		 * real cidx is getting too far ahead of the one visible to
		 * everyone else, or we have exceeded our budget.
		 */
		if (cidx != pidx && pending < 64 && total < budget)
			continue;

		os.state = ns.state = r->state;
		ns.cidx = cidx;
		ns.flags = state_to_flags(ns, total >= budget);
		r->state = ns.state;

		if (ns.flags == ABDICATED)
			counter_u64_add(r->abdications, 1);
		if (ns.flags != BUSY) {
			/* Wrong loop exit if we're going to stall. */
			MPASS(ns.flags != STALLED);
			if (prev == STALLED) {
				MPASS(total > 0);
				counter_u64_add(r->restarts, 1);
			}
			break;
		}

		/*
		 * The ring lock, held for the duration of this call, guarantees
		 * visibility of items associated with any pidx change that we
		 * notice here.
		 */
		pidx = ns.pidx_tail;
		pending = 0;
	}
}
#else
/*
 * Caller passes in a state, with a guarantee that there is work to do and that
 * all items up to the pidx_tail in the state are visible.
 */
static void
drain_ring_lockless(struct ifmp_ring *r, union ring_state os, uint16_t prev, int budget)
{
	union ring_state ns;
	int n, pending, total;
	uint16_t cidx = os.cidx;
	uint16_t pidx = os.pidx_tail;

	MPASS(os.flags == BUSY);
	MPASS(cidx != pidx);

	if (prev == IDLE)
		counter_u64_add(r->starts, 1);
	pending = 0;
	total = 0;

	while (cidx != pidx) {

		/* Items from cidx to pidx are available for consumption. */
		n = r->drain(r, cidx, pidx);
		if (n == 0) {
			critical_enter();
			do {
				os.state = ns.state = r->state;
				ns.cidx = cidx;
				ns.flags = STALLED;
			} while (atomic_cmpset_64(&r->state, os.state,
			    ns.state) == 0);
			critical_exit();
			if (prev != STALLED)
				counter_u64_add(r->stalls, 1);
			else if (total > 0) {
				counter_u64_add(r->restarts, 1);
				counter_u64_add(r->stalls, 1);
			}
			break;
		}
		cidx = increment_idx(r, cidx, n);
		pending += n;
		total += n;

		/*
		 * We update the cidx only if we've caught up with the pidx, the
		 * real cidx is getting too far ahead of the one visible to
		 * everyone else, or we have exceeded our budget.
		 */
		if (cidx != pidx && pending < 64 && total < budget)
			continue;
		critical_enter();
		do {
			os.state = ns.state = r->state;
			ns.cidx = cidx;
			ns.flags = state_to_flags(ns, total >= budget);
		} while (atomic_cmpset_acq_64(&r->state, os.state, ns.state) == 0);
		critical_exit();

		if (ns.flags == ABDICATED)
			counter_u64_add(r->abdications, 1);
		if (ns.flags != BUSY) {
			/* Wrong loop exit if we're going to stall. */
			MPASS(ns.flags != STALLED);
			if (prev == STALLED) {
				MPASS(total > 0);
				counter_u64_add(r->restarts, 1);
			}
			break;
		}

		/*
		 * The acquire style atomic above guarantees visibility of items
		 * associated with any pidx change that we notice here.
		 */
		pidx = ns.pidx_tail;
		pending = 0;
	}
}
#endif

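/*
 * Allocate a ring with 'size' slots (at most size - 1 items can be queued at
 * any time) and the counters for its statistics.  'drain' and 'can_drain' are
 * the consumer's callbacks.  Returns 0 on success or an errno.
 */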
int
ifmp_ring_alloc(struct ifmp_ring **pr, int size, void *cookie, mp_ring_drain_t drain,
    mp_ring_can_drain_t can_drain, struct malloc_type *mt, int flags)
{
	struct ifmp_ring *r;

	/* All idx are 16b so size can be 65536 at most */
	if (pr == NULL || size < 2 || size > 65536 || drain == NULL ||
	    can_drain == NULL)
		return (EINVAL);
	*pr = NULL;
	flags &= M_NOWAIT | M_WAITOK;
	MPASS(flags != 0);

	r = malloc(__offsetof(struct ifmp_ring, items[size]), mt, flags | M_ZERO);
	if (r == NULL)
		return (ENOMEM);
	r->size = size;
	r->cookie = cookie;
	r->mt = mt;
	r->drain = drain;
	r->can_drain = can_drain;
	r->enqueues = counter_u64_alloc(flags);
	r->drops = counter_u64_alloc(flags);
	r->starts = counter_u64_alloc(flags);
	r->stalls = counter_u64_alloc(flags);
	r->restarts = counter_u64_alloc(flags);
	r->abdications = counter_u64_alloc(flags);
	if (r->enqueues == NULL || r->drops == NULL || r->starts == NULL ||
	    r->stalls == NULL || r->restarts == NULL ||
	    r->abdications == NULL) {
		ifmp_ring_free(r);
		return (ENOMEM);
	}

	*pr = r;
#ifdef NO_64BIT_ATOMICS
	mtx_init(&r->lock, "mp_ring lock", NULL, MTX_DEF);
#endif
	return (0);
}

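/*
 * Free the ring and any counters allocated for it.  A NULL or partially
 * constructed ring is tolerated.
 */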
void
ifmp_ring_free(struct ifmp_ring *r)
{

	if (r == NULL)
		return;

	if (r->enqueues != NULL)
		counter_u64_free(r->enqueues);
	if (r->drops != NULL)
		counter_u64_free(r->drops);
	if (r->starts != NULL)
		counter_u64_free(r->starts);
	if (r->stalls != NULL)
		counter_u64_free(r->stalls);
	if (r->restarts != NULL)
		counter_u64_free(r->restarts);
	if (r->abdications != NULL)
		counter_u64_free(r->abdications);

	free(r, r->mt);
}

/*
 * Enqueue n items and maybe drain the ring for some time.
 *
 * Returns an errno.
 */
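/*
 * A producer reserves a range of slots by advancing pidx_head, waits for any
 * earlier producers to publish their items, copies its own items into the
 * ring, and then publishes them by advancing pidx_tail.  The producer that
 * finds the ring not already BUSY also takes on the consumer role and drains
 * the ring.
 */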
#ifdef NO_64BIT_ATOMICS
int
ifmp_ring_enqueue(struct ifmp_ring *r, void **items, int n, int budget)
{
	union ring_state os, ns;
	uint16_t pidx_start, pidx_stop;
	int i;

	MPASS(items != NULL);
	MPASS(n > 0);

	mtx_lock(&r->lock);
	/*
	 * Reserve room for the new items.  Our reservation, if successful, is
	 * from 'pidx_start' to 'pidx_stop'.
	 */
	os.state = r->state;
	if (n >= space_available(r, os)) {
		counter_u64_add(r->drops, n);
		MPASS(os.flags != IDLE);
		/* Release the lock; ifmp_ring_check_drainage() takes it. */
		mtx_unlock(&r->lock);
		if (os.flags == STALLED)
			ifmp_ring_check_drainage(r, 0);
		return (ENOBUFS);
	}
	ns.state = os.state;
	ns.pidx_head = increment_idx(r, os.pidx_head, n);
	r->state = ns.state;
	pidx_start = os.pidx_head;
	pidx_stop = ns.pidx_head;

	/*
	 * Wait for other producers who got in ahead of us to enqueue their
	 * items, one producer at a time.  It is our turn when the ring's
	 * pidx_tail reaches the beginning of our reservation (pidx_start).
	 */
	while (ns.pidx_tail != pidx_start) {
		cpu_spinwait();
		ns.state = r->state;
	}

	/* Now it is our turn to fill up the area we reserved earlier. */
	i = pidx_start;
	do {
		r->items[i] = *items++;
		if (__predict_false(++i == r->size))
			i = 0;
	} while (i != pidx_stop);

	/*
	 * Update the ring's pidx_tail.  The ring lock guarantees that the
	 * items are visible to any thread that sees the updated pidx.
	 */
	os.state = ns.state = r->state;
	ns.pidx_tail = pidx_stop;
	ns.flags = BUSY;
	r->state = ns.state;
	counter_u64_add(r->enqueues, n);

	/*
	 * Turn into a consumer if some other thread isn't active as a consumer
	 * already.
	 */
	if (os.flags != BUSY)
		drain_ring_locked(r, ns, os.flags, budget);

	mtx_unlock(&r->lock);
	return (0);
}

#else
int
ifmp_ring_enqueue(struct ifmp_ring *r, void **items, int n, int budget)
{
	union ring_state os, ns;
	uint16_t pidx_start, pidx_stop;
	int i;

	MPASS(items != NULL);
	MPASS(n > 0);

	/*
	 * Reserve room for the new items.  Our reservation, if successful, is
	 * from 'pidx_start' to 'pidx_stop'.
	 */
	for (;;) {
		os.state = r->state;
		if (n >= space_available(r, os)) {
			counter_u64_add(r->drops, n);
			MPASS(os.flags != IDLE);
			if (os.flags == STALLED)
				ifmp_ring_check_drainage(r, 0);
			return (ENOBUFS);
		}
		ns.state = os.state;
		ns.pidx_head = increment_idx(r, os.pidx_head, n);
		critical_enter();
		if (atomic_cmpset_64(&r->state, os.state, ns.state))
			break;
		critical_exit();
		cpu_spinwait();
	}
	pidx_start = os.pidx_head;
	pidx_stop = ns.pidx_head;

	/*
	 * Wait for other producers who got in ahead of us to enqueue their
	 * items, one producer at a time.  It is our turn when the ring's
	 * pidx_tail reaches the beginning of our reservation (pidx_start).
	 */
	while (ns.pidx_tail != pidx_start) {
		cpu_spinwait();
		ns.state = r->state;
	}

	/* Now it is our turn to fill up the area we reserved earlier. */
	i = pidx_start;
	do {
		r->items[i] = *items++;
		if (__predict_false(++i == r->size))
			i = 0;
	} while (i != pidx_stop);

	/*
	 * Update the ring's pidx_tail.  The release style atomic guarantees
	 * that the items are visible to any thread that sees the updated pidx.
	 */
	do {
		os.state = ns.state = r->state;
		ns.pidx_tail = pidx_stop;
		ns.flags = BUSY;
	} while (atomic_cmpset_rel_64(&r->state, os.state, ns.state) == 0);
	critical_exit();
	counter_u64_add(r->enqueues, n);

	/*
	 * Turn into a consumer if some other thread isn't active as a consumer
	 * already.
	 */
	if (os.flags != BUSY)
		drain_ring_lockless(r, ns, os.flags, budget);

	return (0);
}
#endif

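/*
 * If the ring is stalled, no producer is mid-enqueue (pidx_head == pidx_tail),
 * and the consumer can make progress again, mark the ring BUSY and drain it.
 */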
void
ifmp_ring_check_drainage(struct ifmp_ring *r, int budget)
{
	union ring_state os, ns;

	os.state = r->state;
	if (os.flags != STALLED || os.pidx_head != os.pidx_tail || r->can_drain(r) == 0)
		return;

	MPASS(os.cidx != os.pidx_tail);	/* implied by STALLED */
	ns.state = os.state;
	ns.flags = BUSY;

#ifdef NO_64BIT_ATOMICS
	mtx_lock(&r->lock);
	if (r->state != os.state) {
		mtx_unlock(&r->lock);
		return;
	}
	r->state = ns.state;
	drain_ring_locked(r, ns, os.flags, budget);
	mtx_unlock(&r->lock);
#else
	/*
	 * The acquire style atomic guarantees visibility of items associated
	 * with the pidx that we read here.
	 */
	if (!atomic_cmpset_acq_64(&r->state, os.state, ns.state))
		return;

	drain_ring_lockless(r, ns, os.flags, budget);
#endif
}

void
ifmp_ring_reset_stats(struct ifmp_ring *r)
{

	counter_u64_zero(r->enqueues);
	counter_u64_zero(r->drops);
	counter_u64_zero(r->starts);
	counter_u64_zero(r->stalls);
	counter_u64_zero(r->restarts);
	counter_u64_zero(r->abdications);
}

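/* Returns 1 if the ring is completely empty and no consumer is running. */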
int
ifmp_ring_is_idle(struct ifmp_ring *r)
{
	union ring_state s;

	s.state = r->state;
	if (s.pidx_head == s.pidx_tail && s.pidx_tail == s.cidx &&
	    s.flags == IDLE)
		return (1);

	return (0);
}

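/* Returns 1 if the consumer has stalled and no producer is mid-enqueue. */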
int
ifmp_ring_is_stalled(struct ifmp_ring *r)
{
	union ring_state s;

	s.state = r->state;
	if (s.pidx_head == s.pidx_tail && s.flags == STALLED)
		return (1);

	return (0);
}