14c7070dbSScott Long /*-
24c7070dbSScott Long * Copyright (c) 2014 Chelsio Communications, Inc.
34c7070dbSScott Long * All rights reserved.
44c7070dbSScott Long * Written by: Navdeep Parhar <np@FreeBSD.org>
54c7070dbSScott Long *
64c7070dbSScott Long * Redistribution and use in source and binary forms, with or without
74c7070dbSScott Long * modification, are permitted provided that the following conditions
84c7070dbSScott Long * are met:
94c7070dbSScott Long * 1. Redistributions of source code must retain the above copyright
104c7070dbSScott Long * notice, this list of conditions and the following disclaimer.
114c7070dbSScott Long * 2. Redistributions in binary form must reproduce the above copyright
124c7070dbSScott Long * notice, this list of conditions and the following disclaimer in the
134c7070dbSScott Long * documentation and/or other materials provided with the distribution.
144c7070dbSScott Long *
154c7070dbSScott Long * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
164c7070dbSScott Long * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
174c7070dbSScott Long * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
184c7070dbSScott Long * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
194c7070dbSScott Long * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
204c7070dbSScott Long * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
214c7070dbSScott Long * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
224c7070dbSScott Long * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
234c7070dbSScott Long * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
244c7070dbSScott Long * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
254c7070dbSScott Long * SUCH DAMAGE.
264c7070dbSScott Long */
274c7070dbSScott Long
284c7070dbSScott Long #include <sys/types.h>
294c7070dbSScott Long #include <sys/param.h>
304c7070dbSScott Long #include <sys/systm.h>
314c7070dbSScott Long #include <sys/counter.h>
324c7070dbSScott Long #include <sys/lock.h>
334c7070dbSScott Long #include <sys/mutex.h>
344c7070dbSScott Long #include <sys/malloc.h>
354c7070dbSScott Long #include <machine/cpu.h>
36fc614c29SScott Long #include <net/mp_ring.h>
37fc614c29SScott Long
/*
 * The entire state of the ring fits in a single 64-bit word so that it can
 * be read and updated atomically (or under r->lock when the platform lacks
 * 64-bit atomics).
 */
union ring_state {
	struct {
		uint16_t pidx_head;	/* producer index: reserved up to here */
		uint16_t pidx_tail;	/* producer index: items visible up to here */
		uint16_t cidx;		/* consumer index */
		uint16_t flags;		/* IDLE, BUSY, STALLED, or ABDICATED */
	};
	uint64_t state;			/* all of the above as one word */
};
474c7070dbSScott Long
/* Values for ring_state.flags: the state of the ring's (single) consumer. */
enum {
	IDLE = 0,	/* consumer ran to completion, nothing more to do. */
	BUSY,		/* consumer is running already, or will be shortly. */
	STALLED,	/* consumer stopped due to lack of resources. */
	ABDICATED,	/* consumer stopped even though there was work to be
			   done because it wants another thread to take over. */
};
554c7070dbSScott Long
564c7070dbSScott Long static inline uint16_t
space_available(struct ifmp_ring * r,union ring_state s)574c7070dbSScott Long space_available(struct ifmp_ring *r, union ring_state s)
584c7070dbSScott Long {
594c7070dbSScott Long uint16_t x = r->size - 1;
604c7070dbSScott Long
614c7070dbSScott Long if (s.cidx == s.pidx_head)
624c7070dbSScott Long return (x);
634c7070dbSScott Long else if (s.cidx > s.pidx_head)
644c7070dbSScott Long return (s.cidx - s.pidx_head - 1);
654c7070dbSScott Long else
664c7070dbSScott Long return (x - s.pidx_head + s.cidx);
674c7070dbSScott Long }
684c7070dbSScott Long
694c7070dbSScott Long static inline uint16_t
increment_idx(struct ifmp_ring * r,uint16_t idx,uint16_t n)704c7070dbSScott Long increment_idx(struct ifmp_ring *r, uint16_t idx, uint16_t n)
714c7070dbSScott Long {
724c7070dbSScott Long int x = r->size - idx;
734c7070dbSScott Long
744c7070dbSScott Long MPASS(x > 0);
754c7070dbSScott Long return (x > n ? idx + n : n - x);
764c7070dbSScott Long }
774c7070dbSScott Long
784c7070dbSScott Long /* Consumer is about to update the ring's state to s */
794c7070dbSScott Long static inline uint16_t
state_to_flags(union ring_state s,int abdicate)804c7070dbSScott Long state_to_flags(union ring_state s, int abdicate)
814c7070dbSScott Long {
824c7070dbSScott Long
834c7070dbSScott Long if (s.cidx == s.pidx_tail)
844c7070dbSScott Long return (IDLE);
854c7070dbSScott Long else if (abdicate && s.pidx_tail != s.pidx_head)
864c7070dbSScott Long return (ABDICATED);
874c7070dbSScott Long
884c7070dbSScott Long return (BUSY);
894c7070dbSScott Long }
904c7070dbSScott Long
915881181dSMatt Macy #ifdef MP_RING_NO_64BIT_ATOMICS
/*
 * Consumer loop for the MP_RING_NO_64BIT_ATOMICS variant; runs with r->lock
 * held, which serializes all ring state reads and writes.  Drains items from
 * os.cidx towards os.pidx_tail via r->drain, publishing the consumer index in
 * batches.  'prev' is the flags value the ring had before this thread became
 * the consumer (used for stall/restart accounting); 'budget' caps the total
 * number of items drained before the consumer abdicates.
 */
static void
drain_ring_locked(struct ifmp_ring *r, union ring_state os, uint16_t prev, int budget)
{
	union ring_state ns;
	int n, pending, total;
	uint16_t cidx = os.cidx;
	uint16_t pidx = os.pidx_tail;

	MPASS(os.flags == BUSY);
	MPASS(cidx != pidx);

	if (prev == IDLE)
		counter_u64_add(r->starts, 1);
	pending = 0;
	total = 0;

	while (cidx != pidx) {
		/* Items from cidx to pidx are available for consumption. */
		n = r->drain(r, cidx, pidx);
		if (n == 0) {
			/* No progress possible: publish cidx, mark STALLED. */
			os.state = ns.state = r->state;
			ns.cidx = cidx;
			ns.flags = STALLED;
			r->state = ns.state;
			if (prev != STALLED)
				counter_u64_add(r->stalls, 1);
			else if (total > 0) {
				/* Made some progress since the last stall. */
				counter_u64_add(r->restarts, 1);
				counter_u64_add(r->stalls, 1);
			}
			break;
		}
		cidx = increment_idx(r, cidx, n);
		pending += n;
		total += n;

		/*
		 * We update the cidx only if we've caught up with the pidx, the
		 * real cidx is getting too far ahead of the one visible to
		 * everyone else, or we have exceeded our budget.
		 */
		if (cidx != pidx && pending < 64 && total < budget)
			continue;

		os.state = ns.state = r->state;
		ns.cidx = cidx;
		ns.flags = state_to_flags(ns, total >= budget);
		r->state = ns.state;

		if (ns.flags == ABDICATED)
			counter_u64_add(r->abdications, 1);
		if (ns.flags != BUSY) {
			/* Wrong loop exit if we're going to stall. */
			MPASS(ns.flags != STALLED);
			if (prev == STALLED) {
				MPASS(total > 0);
				counter_u64_add(r->restarts, 1);
			}
			break;
		}

		/*
		 * Still BUSY: pick up any pidx_tail advance made by producers
		 * while we were draining.  In this variant the state accesses
		 * are serialized by r->lock, so the items behind the new pidx
		 * are visible.
		 */
		pidx = ns.pidx_tail;
		pending = 0;
	}
}
1614c7070dbSScott Long #else
/*
 * Caller passes in a state, with a guarantee that there is work to do and that
 * all items up to the pidx_tail in the state are visible.
 *
 * Lockless consumer loop: drains items from os.cidx towards os.pidx_tail via
 * r->drain, publishing the consumer index in batches with 64-bit fcmpset
 * loops.  'prev' is the flags value the ring had before this thread became
 * the consumer (for stall/restart accounting); 'budget' caps the total items
 * drained before the consumer abdicates.
 */
static void
drain_ring_lockless(struct ifmp_ring *r, union ring_state os, uint16_t prev, int budget)
{
	union ring_state ns;
	int n, pending, total;
	uint16_t cidx = os.cidx;
	uint16_t pidx = os.pidx_tail;

	MPASS(os.flags == BUSY);
	MPASS(cidx != pidx);

	if (prev == IDLE)
		counter_u64_add(r->starts, 1);
	pending = 0;
	total = 0;

	while (cidx != pidx) {
		/* Items from cidx to pidx are available for consumption. */
		n = r->drain(r, cidx, pidx);
		if (n == 0) {
			/*
			 * No progress possible: publish cidx with STALLED.
			 * The critical section keeps us from being preempted
			 * in the middle of the fcmpset retry loop.
			 */
			critical_enter();
			os.state = r->state;
			do {
				ns.state = os.state;
				ns.cidx = cidx;
				ns.flags = STALLED;
			} while (atomic_fcmpset_64(&r->state, &os.state,
			    ns.state) == 0);
			critical_exit();
			if (prev != STALLED)
				counter_u64_add(r->stalls, 1);
			else if (total > 0) {
				/* Made some progress since the last stall. */
				counter_u64_add(r->restarts, 1);
				counter_u64_add(r->stalls, 1);
			}
			break;
		}
		cidx = increment_idx(r, cidx, n);
		pending += n;
		total += n;

		/*
		 * We update the cidx only if we've caught up with the pidx, the
		 * real cidx is getting too far ahead of the one visible to
		 * everyone else, or we have exceeded our budget.
		 */
		if (cidx != pidx && pending < 64 && total < budget)
			continue;
		critical_enter();
		os.state = r->state;
		do {
			ns.state = os.state;
			ns.cidx = cidx;
			ns.flags = state_to_flags(ns, total >= budget);
		} while (atomic_fcmpset_acq_64(&r->state, &os.state,
		    ns.state) == 0);
		critical_exit();

		if (ns.flags == ABDICATED)
			counter_u64_add(r->abdications, 1);
		if (ns.flags != BUSY) {
			/* Wrong loop exit if we're going to stall. */
			MPASS(ns.flags != STALLED);
			if (prev == STALLED) {
				MPASS(total > 0);
				counter_u64_add(r->restarts, 1);
			}
			break;
		}

		/*
		 * The acquire style atomic above guarantees visibility of items
		 * associated with any pidx change that we notice here.
		 */
		pidx = ns.pidx_tail;
		pending = 0;
	}
}
2444c7070dbSScott Long #endif
2454c7070dbSScott Long
2464c7070dbSScott Long int
ifmp_ring_alloc(struct ifmp_ring ** pr,int size,void * cookie,mp_ring_drain_t drain,mp_ring_can_drain_t can_drain,struct malloc_type * mt,int flags)2474c7070dbSScott Long ifmp_ring_alloc(struct ifmp_ring **pr, int size, void *cookie, mp_ring_drain_t drain,
2484c7070dbSScott Long mp_ring_can_drain_t can_drain, struct malloc_type *mt, int flags)
2494c7070dbSScott Long {
2504c7070dbSScott Long struct ifmp_ring *r;
2514c7070dbSScott Long
2524c7070dbSScott Long /* All idx are 16b so size can be 65536 at most */
2534c7070dbSScott Long if (pr == NULL || size < 2 || size > 65536 || drain == NULL ||
2544c7070dbSScott Long can_drain == NULL)
2554c7070dbSScott Long return (EINVAL);
2564c7070dbSScott Long *pr = NULL;
2574c7070dbSScott Long flags &= M_NOWAIT | M_WAITOK;
2584c7070dbSScott Long MPASS(flags != 0);
2594c7070dbSScott Long
2604c7070dbSScott Long r = malloc(__offsetof(struct ifmp_ring, items[size]), mt, flags | M_ZERO);
2614c7070dbSScott Long if (r == NULL)
2624c7070dbSScott Long return (ENOMEM);
2634c7070dbSScott Long r->size = size;
2644c7070dbSScott Long r->cookie = cookie;
2654c7070dbSScott Long r->mt = mt;
2664c7070dbSScott Long r->drain = drain;
2674c7070dbSScott Long r->can_drain = can_drain;
2684c7070dbSScott Long r->enqueues = counter_u64_alloc(flags);
2694c7070dbSScott Long r->drops = counter_u64_alloc(flags);
2704c7070dbSScott Long r->starts = counter_u64_alloc(flags);
2714c7070dbSScott Long r->stalls = counter_u64_alloc(flags);
2724c7070dbSScott Long r->restarts = counter_u64_alloc(flags);
2734c7070dbSScott Long r->abdications = counter_u64_alloc(flags);
2744c7070dbSScott Long if (r->enqueues == NULL || r->drops == NULL || r->starts == NULL ||
2754c7070dbSScott Long r->stalls == NULL || r->restarts == NULL ||
2764c7070dbSScott Long r->abdications == NULL) {
2774c7070dbSScott Long ifmp_ring_free(r);
2784c7070dbSScott Long return (ENOMEM);
2794c7070dbSScott Long }
2804c7070dbSScott Long
2814c7070dbSScott Long *pr = r;
2825881181dSMatt Macy #ifdef MP_RING_NO_64BIT_ATOMICS
2834c7070dbSScott Long mtx_init(&r->lock, "mp_ring lock", NULL, MTX_DEF);
2844c7070dbSScott Long #endif
2854c7070dbSScott Long return (0);
2864c7070dbSScott Long }
2874c7070dbSScott Long
2884c7070dbSScott Long void
ifmp_ring_free(struct ifmp_ring * r)2894c7070dbSScott Long ifmp_ring_free(struct ifmp_ring *r)
2904c7070dbSScott Long {
2914c7070dbSScott Long
2924c7070dbSScott Long if (r == NULL)
2934c7070dbSScott Long return;
2944c7070dbSScott Long
2954c7070dbSScott Long if (r->enqueues != NULL)
2964c7070dbSScott Long counter_u64_free(r->enqueues);
2974c7070dbSScott Long if (r->drops != NULL)
2984c7070dbSScott Long counter_u64_free(r->drops);
2994c7070dbSScott Long if (r->starts != NULL)
3004c7070dbSScott Long counter_u64_free(r->starts);
3014c7070dbSScott Long if (r->stalls != NULL)
3024c7070dbSScott Long counter_u64_free(r->stalls);
3034c7070dbSScott Long if (r->restarts != NULL)
3044c7070dbSScott Long counter_u64_free(r->restarts);
3054c7070dbSScott Long if (r->abdications != NULL)
3064c7070dbSScott Long counter_u64_free(r->abdications);
3074c7070dbSScott Long
3084c7070dbSScott Long free(r, r->mt);
3094c7070dbSScott Long }
3104c7070dbSScott Long
3114c7070dbSScott Long /*
3124c7070dbSScott Long * Enqueue n items and maybe drain the ring for some time.
3134c7070dbSScott Long *
3144c7070dbSScott Long * Returns an errno.
3154c7070dbSScott Long */
3165881181dSMatt Macy #ifdef MP_RING_NO_64BIT_ATOMICS
int
ifmp_ring_enqueue(struct ifmp_ring *r, void **items, int n, int budget, int abdicate)
{
	union ring_state os, ns;
	uint16_t pidx_start, pidx_stop;
	int i;

	MPASS(items != NULL);
	MPASS(n > 0);

	mtx_lock(&r->lock);
	/*
	 * Reserve room for the new items.  Our reservation, if successful, is
	 * from 'pidx_start' to 'pidx_stop'.
	 */
	os.state = r->state;
	if (n >= space_available(r, os)) {
		/* Not enough room: the entire batch is dropped. */
		counter_u64_add(r->drops, n);
		MPASS(os.flags != IDLE);
		mtx_unlock(&r->lock);
		if (os.flags == STALLED)
			ifmp_ring_check_drainage(r, 0);
		return (ENOBUFS);
	}
	ns.state = os.state;
	ns.pidx_head = increment_idx(r, os.pidx_head, n);
	r->state = ns.state;
	pidx_start = os.pidx_head;
	pidx_stop = ns.pidx_head;

	/*
	 * Wait for other producers who got in ahead of us to enqueue their
	 * items, one producer at a time.  It is our turn when the ring's
	 * pidx_tail reaches the beginning of our reservation (pidx_start).
	 *
	 * NOTE(review): with r->lock held across the whole enqueue, producers
	 * are already serialized and this loop should not spin; it appears to
	 * be kept for symmetry with the lockless variant -- confirm.
	 */
	while (ns.pidx_tail != pidx_start) {
		cpu_spinwait();
		ns.state = r->state;
	}

	/* Now it is our turn to fill up the area we reserved earlier. */
	i = pidx_start;
	do {
		r->items[i] = *items++;
		if (__predict_false(++i == r->size))
			i = 0;
	} while (i != pidx_stop);

	/*
	 * Update the ring's pidx_tail, making the new items visible.  In this
	 * variant the state update is a plain store serialized by r->lock
	 * (unlike the lockless variant's release-style atomic).
	 */
	os.state = ns.state = r->state;
	ns.pidx_tail = pidx_stop;
	if (abdicate) {
		if (os.flags == IDLE)
			ns.flags = ABDICATED;
	} else
		ns.flags = BUSY;
	r->state = ns.state;
	counter_u64_add(r->enqueues, n);

	if (!abdicate) {
		/*
		 * Turn into a consumer if some other thread isn't active as a consumer
		 * already.
		 */
		if (os.flags != BUSY)
			drain_ring_locked(r, ns, os.flags, budget);
	}

	mtx_unlock(&r->lock);
	return (0);
}
3914c7070dbSScott Long #else
int
ifmp_ring_enqueue(struct ifmp_ring *r, void **items, int n, int budget, int abdicate)
{
	union ring_state os, ns;
	uint16_t pidx_start, pidx_stop;
	int i;

	MPASS(items != NULL);
	MPASS(n > 0);

	/*
	 * Reserve room for the new items.  Our reservation, if successful, is
	 * from 'pidx_start' to 'pidx_stop'.
	 */
	os.state = r->state;
	for (;;) {
		if (n >= space_available(r, os)) {
			/* Not enough room: the entire batch is dropped. */
			counter_u64_add(r->drops, n);
			MPASS(os.flags != IDLE);
			if (os.flags == STALLED)
				ifmp_ring_check_drainage(r, 0);
			return (ENOBUFS);
		}
		ns.state = os.state;
		ns.pidx_head = increment_idx(r, os.pidx_head, n);
		/*
		 * Enter a critical section before attempting the reservation;
		 * on success it stays held (preemption disabled) until the
		 * pidx_tail update below.  On failure, fcmpset refreshed
		 * os.state, so the space check is redone with the new state.
		 */
		critical_enter();
		if (atomic_fcmpset_64(&r->state, &os.state, ns.state))
			break;
		critical_exit();
		cpu_spinwait();
	}
	pidx_start = os.pidx_head;
	pidx_stop = ns.pidx_head;

	/*
	 * Wait for other producers who got in ahead of us to enqueue their
	 * items, one producer at a time.  It is our turn when the ring's
	 * pidx_tail reaches the beginning of our reservation (pidx_start).
	 */
	while (ns.pidx_tail != pidx_start) {
		cpu_spinwait();
		ns.state = r->state;
	}

	/* Now it is our turn to fill up the area we reserved earlier. */
	i = pidx_start;
	do {
		r->items[i] = *items++;
		if (__predict_false(++i == r->size))
			i = 0;
	} while (i != pidx_stop);

	/*
	 * Update the ring's pidx_tail.  The release style atomic guarantees
	 * that the items are visible to any thread that sees the updated pidx.
	 */
	os.state = r->state;
	do {
		ns.state = os.state;
		ns.pidx_tail = pidx_stop;
		if (abdicate) {
			if (os.flags == IDLE)
				ns.flags = ABDICATED;
		} else
			ns.flags = BUSY;
	} while (atomic_fcmpset_rel_64(&r->state, &os.state, ns.state) == 0);
	critical_exit();
	counter_u64_add(r->enqueues, n);

	if (!abdicate) {
		/*
		 * Turn into a consumer if some other thread isn't active as a consumer
		 * already.
		 */
		if (os.flags != BUSY)
			drain_ring_lockless(r, ns, os.flags, budget);
	}

	return (0);
}
4724c7070dbSScott Long #endif
4734c7070dbSScott Long
/*
 * Let the calling thread take over as the consumer if the ring has
 * outstanding work that nobody is draining.  Proceeds only when the ring
 * is STALLED (and r->can_drain() now reports progress is possible) or
 * ABDICATED, and all producer reservations have been published
 * (pidx_head == pidx_tail).  'budget' is passed through to the drain loop.
 */
void
ifmp_ring_check_drainage(struct ifmp_ring *r, int budget)
{
	union ring_state os, ns;

	os.state = r->state;
	if ((os.flags != STALLED && os.flags != ABDICATED) || // Only continue in STALLED and ABDICATED
	    os.pidx_head != os.pidx_tail || // Require work to be available
	    (os.flags != ABDICATED && r->can_drain(r) == 0)) // Can either drain, or everyone left
		return;

	MPASS(os.cidx != os.pidx_tail);	/* implied by STALLED */
	ns.state = os.state;
	ns.flags = BUSY;

#ifdef MP_RING_NO_64BIT_ATOMICS
	mtx_lock(&r->lock);
	if (r->state != os.state) {
		/* Raced with another thread; it owns the ring now. */
		mtx_unlock(&r->lock);
		return;
	}
	r->state = ns.state;
	drain_ring_locked(r, ns, os.flags, budget);
	mtx_unlock(&r->lock);
#else
	/*
	 * The acquire style atomic guarantees visibility of items associated
	 * with the pidx that we read here.
	 */
	if (!atomic_cmpset_acq_64(&r->state, os.state, ns.state))
		return;

	drain_ring_lockless(r, ns, os.flags, budget);
#endif
}
5094c7070dbSScott Long
5104c7070dbSScott Long void
ifmp_ring_reset_stats(struct ifmp_ring * r)5114c7070dbSScott Long ifmp_ring_reset_stats(struct ifmp_ring *r)
5124c7070dbSScott Long {
5134c7070dbSScott Long
5144c7070dbSScott Long counter_u64_zero(r->enqueues);
5154c7070dbSScott Long counter_u64_zero(r->drops);
5164c7070dbSScott Long counter_u64_zero(r->starts);
5174c7070dbSScott Long counter_u64_zero(r->stalls);
5184c7070dbSScott Long counter_u64_zero(r->restarts);
5194c7070dbSScott Long counter_u64_zero(r->abdications);
5204c7070dbSScott Long }
5214c7070dbSScott Long
5224c7070dbSScott Long int
ifmp_ring_is_idle(struct ifmp_ring * r)5234c7070dbSScott Long ifmp_ring_is_idle(struct ifmp_ring *r)
5244c7070dbSScott Long {
5254c7070dbSScott Long union ring_state s;
5264c7070dbSScott Long
5274c7070dbSScott Long s.state = r->state;
5284c7070dbSScott Long if (s.pidx_head == s.pidx_tail && s.pidx_tail == s.cidx &&
5294c7070dbSScott Long s.flags == IDLE)
5304c7070dbSScott Long return (1);
5314c7070dbSScott Long
5324c7070dbSScott Long return (0);
5334c7070dbSScott Long }
5344c7070dbSScott Long
5354c7070dbSScott Long int
ifmp_ring_is_stalled(struct ifmp_ring * r)5364c7070dbSScott Long ifmp_ring_is_stalled(struct ifmp_ring *r)
5374c7070dbSScott Long {
5384c7070dbSScott Long union ring_state s;
5394c7070dbSScott Long
5404c7070dbSScott Long s.state = r->state;
5414c7070dbSScott Long if (s.pidx_head == s.pidx_tail && s.flags == STALLED)
5424c7070dbSScott Long return (1);
5434c7070dbSScott Long
5444c7070dbSScott Long return (0);
5454c7070dbSScott Long }
546