/*-
 * Copyright (c) 2014 Chelsio Communications, Inc.
 * All rights reserved.
 * Written by: Navdeep Parhar <np@FreeBSD.org>
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
 * SUCH DAMAGE.
 */

#include <sys/cdefs.h>
__FBSDID("$FreeBSD$");

#include <sys/types.h>
#include <sys/param.h>
#include <sys/systm.h>
#include <sys/counter.h>
#include <sys/lock.h>
#include <sys/mutex.h>
#include <sys/malloc.h>
#include <machine/cpu.h>
#include <net/mp_ring.h>

union ring_state {
        struct {
                uint16_t pidx_head;
                uint16_t pidx_tail;
                uint16_t cidx;
                uint16_t flags;
        };
        uint64_t state;
};

enum {
        IDLE = 0,       /* consumer ran to completion, nothing more to do. */
        BUSY,           /* consumer is running already, or will be shortly. */
        STALLED,        /* consumer stopped due to lack of resources. */
        ABDICATED,      /* consumer stopped even though there was work to be
                           done because it wants another thread to take over. */
};
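
/*
 * Note (descriptive, added for clarity): the anonymous struct and the 64-bit
 * 'state' member overlay the same storage, so the producer indices, the
 * consumer index, and the flags can all be read or updated together with a
 * single 64-bit load or compare-and-set; a state transition never exposes a
 * half-updated ring to other CPUs.  The flags above describe the consumer:
 * a non-abdicating enqueue or ifmp_ring_check_drainage() marks the ring BUSY
 * before draining it, and the drain loop leaves it IDLE (nothing left),
 * STALLED (the drain callback made no progress), or ABDICATED (work remains
 * but this consumer is done with it).
 */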

static inline uint16_t
space_available(struct ifmp_ring *r, union ring_state s)
{
        uint16_t x = r->size - 1;

        if (s.cidx == s.pidx_head)
                return (x);
        else if (s.cidx > s.pidx_head)
                return (s.cidx - s.pidx_head - 1);
        else
                return (x - s.pidx_head + s.cidx);
}

static inline uint16_t
increment_idx(struct ifmp_ring *r, uint16_t idx, uint16_t n)
{
        int x = r->size - idx;

        MPASS(x > 0);
        return (x > n ? idx + n : n - x);
}

/* Consumer is about to update the ring's state to s */
static inline uint16_t
state_to_flags(union ring_state s, int abdicate)
{

        if (s.cidx == s.pidx_tail)
                return (IDLE);
        else if (abdicate && s.pidx_tail != s.pidx_head)
                return (ABDICATED);

        return (BUSY);
}
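
/*
 * For example, with r->size == 8, increment_idx(r, 6, 4) computes
 * x = 8 - 6 = 2; x is not greater than n, so the result is n - x = 2 and
 * the index wraps past the end of items[].  space_available() handles the
 * same wrap and always leaves one slot unused, so that cidx == pidx_head
 * unambiguously means "empty"; a ring of 'size' entries therefore holds at
 * most size - 1 items.
 */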

#ifdef MP_RING_NO_64BIT_ATOMICS
static void
drain_ring_locked(struct ifmp_ring *r, union ring_state os, uint16_t prev, int budget)
{
        union ring_state ns;
        int n, pending, total;
        uint16_t cidx = os.cidx;
        uint16_t pidx = os.pidx_tail;

        MPASS(os.flags == BUSY);
        MPASS(cidx != pidx);

        if (prev == IDLE)
                counter_u64_add(r->starts, 1);
        pending = 0;
        total = 0;

        while (cidx != pidx) {

                /* Items from cidx to pidx are available for consumption. */
                n = r->drain(r, cidx, pidx);
                if (n == 0) {
                        os.state = ns.state = r->state;
                        ns.cidx = cidx;
                        ns.flags = STALLED;
                        r->state = ns.state;
                        if (prev != STALLED)
                                counter_u64_add(r->stalls, 1);
                        else if (total > 0) {
                                counter_u64_add(r->restarts, 1);
                                counter_u64_add(r->stalls, 1);
                        }
                        break;
                }
                cidx = increment_idx(r, cidx, n);
                pending += n;
                total += n;

                /*
                 * We update the cidx only if we've caught up with the pidx, the
                 * real cidx is getting too far ahead of the one visible to
                 * everyone else, or we have exceeded our budget.
                 */
                if (cidx != pidx && pending < 64 && total < budget)
                        continue;

                os.state = ns.state = r->state;
                ns.cidx = cidx;
                ns.flags = state_to_flags(ns, total >= budget);
                r->state = ns.state;

                if (ns.flags == ABDICATED)
                        counter_u64_add(r->abdications, 1);
                if (ns.flags != BUSY) {
                        /* Wrong loop exit if we're going to stall. */
                        MPASS(ns.flags != STALLED);
                        if (prev == STALLED) {
                                MPASS(total > 0);
                                counter_u64_add(r->restarts, 1);
                        }
                        break;
                }

                /*
                 * The acquire style atomic above guarantees visibility of items
                 * associated with any pidx change that we notice here.
                 */
                pidx = ns.pidx_tail;
                pending = 0;
        }
}
#else
/*
 * Caller passes in a state, with a guarantee that there is work to do and that
 * all items up to the pidx_tail in the state are visible.
 */
static void
drain_ring_lockless(struct ifmp_ring *r, union ring_state os, uint16_t prev, int budget)
{
        union ring_state ns;
        int n, pending, total;
        uint16_t cidx = os.cidx;
        uint16_t pidx = os.pidx_tail;

        MPASS(os.flags == BUSY);
        MPASS(cidx != pidx);

        if (prev == IDLE)
                counter_u64_add(r->starts, 1);
        pending = 0;
        total = 0;

        while (cidx != pidx) {

                /* Items from cidx to pidx are available for consumption. */
                n = r->drain(r, cidx, pidx);
                if (n == 0) {
                        critical_enter();
                        os.state = r->state;
                        do {
                                ns.state = os.state;
                                ns.cidx = cidx;
                                ns.flags = STALLED;
                        } while (atomic_fcmpset_64(&r->state, &os.state,
                            ns.state) == 0);
                        critical_exit();
                        if (prev != STALLED)
                                counter_u64_add(r->stalls, 1);
                        else if (total > 0) {
                                counter_u64_add(r->restarts, 1);
                                counter_u64_add(r->stalls, 1);
                        }
                        break;
                }
                cidx = increment_idx(r, cidx, n);
                pending += n;
                total += n;

                /*
                 * We update the cidx only if we've caught up with the pidx, the
                 * real cidx is getting too far ahead of the one visible to
                 * everyone else, or we have exceeded our budget.
                 */
                if (cidx != pidx && pending < 64 && total < budget)
                        continue;
                critical_enter();
                os.state = r->state;
                do {
                        ns.state = os.state;
                        ns.cidx = cidx;
                        ns.flags = state_to_flags(ns, total >= budget);
                } while (atomic_fcmpset_acq_64(&r->state, &os.state,
                    ns.state) == 0);
                critical_exit();

                if (ns.flags == ABDICATED)
                        counter_u64_add(r->abdications, 1);
                if (ns.flags != BUSY) {
                        /* Wrong loop exit if we're going to stall. */
                        MPASS(ns.flags != STALLED);
                        if (prev == STALLED) {
                                MPASS(total > 0);
                                counter_u64_add(r->restarts, 1);
                        }
                        break;
                }

                /*
                 * The acquire style atomic above guarantees visibility of items
                 * associated with any pidx change that we notice here.
                 */
                pidx = ns.pidx_tail;
                pending = 0;
        }
}
#endif
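
/*
 * Sketch of a drain callback (illustrative only; 'example_softc' and
 * 'example_tx_one' are hypothetical and not part of this file or of
 * <net/mp_ring.h>).  The callback consumes items[cidx .. pidx) and returns
 * the number of items it handled; returning 0 marks the ring STALLED until
 * a later enqueue or ifmp_ring_check_drainage() restarts it.
 *
 *      static u_int
 *      example_drain(struct ifmp_ring *r, u_int cidx, u_int pidx)
 *      {
 *              struct example_softc *sc = r->cookie;
 *              u_int n = 0;
 *
 *              while (cidx != pidx && example_tx_one(sc, r->items[cidx])) {
 *                      if (++cidx == r->size)
 *                              cidx = 0;
 *                      n++;
 *              }
 *              return (n);
 *      }
 */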

int
ifmp_ring_alloc(struct ifmp_ring **pr, int size, void *cookie, mp_ring_drain_t drain,
    mp_ring_can_drain_t can_drain, struct malloc_type *mt, int flags)
{
        struct ifmp_ring *r;

        /* All idx are 16b so size can be 65536 at most */
        if (pr == NULL || size < 2 || size > 65536 || drain == NULL ||
            can_drain == NULL)
                return (EINVAL);
        *pr = NULL;
        flags &= M_NOWAIT | M_WAITOK;
        MPASS(flags != 0);

        r = malloc(__offsetof(struct ifmp_ring, items[size]), mt, flags | M_ZERO);
        if (r == NULL)
                return (ENOMEM);
        r->size = size;
        r->cookie = cookie;
        r->mt = mt;
        r->drain = drain;
        r->can_drain = can_drain;
        r->enqueues = counter_u64_alloc(flags);
        r->drops = counter_u64_alloc(flags);
        r->starts = counter_u64_alloc(flags);
        r->stalls = counter_u64_alloc(flags);
        r->restarts = counter_u64_alloc(flags);
        r->abdications = counter_u64_alloc(flags);
        if (r->enqueues == NULL || r->drops == NULL || r->starts == NULL ||
            r->stalls == NULL || r->restarts == NULL ||
            r->abdications == NULL) {
                ifmp_ring_free(r);
                return (ENOMEM);
        }

        *pr = r;
#ifdef MP_RING_NO_64BIT_ATOMICS
        mtx_init(&r->lock, "mp_ring lock", NULL, MTX_DEF);
#endif
        return (0);
}

void
ifmp_ring_free(struct ifmp_ring *r)
{

        if (r == NULL)
                return;

        if (r->enqueues != NULL)
                counter_u64_free(r->enqueues);
        if (r->drops != NULL)
                counter_u64_free(r->drops);
        if (r->starts != NULL)
                counter_u64_free(r->starts);
        if (r->stalls != NULL)
                counter_u64_free(r->stalls);
        if (r->restarts != NULL)
                counter_u64_free(r->restarts);
        if (r->abdications != NULL)
                counter_u64_free(r->abdications);

        free(r, r->mt);
}
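
/*
 * Summary of the counter(9) statistics maintained by this file (descriptive
 * note): 'enqueues' and 'drops' count items accepted and rejected by
 * ifmp_ring_enqueue(), 'starts' counts transitions from IDLE to an active
 * consumer, 'stalls' counts times the ring stalled because the drain
 * callback made no progress, 'restarts' counts recoveries from a stall, and
 * 'abdications' counts times a consumer left remaining work for another
 * thread to pick up.
 */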

/*
 * Enqueue n items and maybe drain the ring for some time.
 *
 * Returns an errno.
 */
#ifdef MP_RING_NO_64BIT_ATOMICS
int
ifmp_ring_enqueue(struct ifmp_ring *r, void **items, int n, int budget, int abdicate)
{
        union ring_state os, ns;
        uint16_t pidx_start, pidx_stop;
        int i;

        MPASS(items != NULL);
        MPASS(n > 0);

        mtx_lock(&r->lock);
        /*
         * Reserve room for the new items.  Our reservation, if successful, is
         * from 'pidx_start' to 'pidx_stop'.
         */
        os.state = r->state;
        if (n >= space_available(r, os)) {
                counter_u64_add(r->drops, n);
                MPASS(os.flags != IDLE);
                mtx_unlock(&r->lock);
                if (os.flags == STALLED)
                        ifmp_ring_check_drainage(r, 0);
                return (ENOBUFS);
        }
        ns.state = os.state;
        ns.pidx_head = increment_idx(r, os.pidx_head, n);
        r->state = ns.state;
        pidx_start = os.pidx_head;
        pidx_stop = ns.pidx_head;

        /*
         * Wait for other producers who got in ahead of us to enqueue their
         * items, one producer at a time.  It is our turn when the ring's
         * pidx_tail reaches the beginning of our reservation (pidx_start).
         */
        while (ns.pidx_tail != pidx_start) {
                cpu_spinwait();
                ns.state = r->state;
        }

        /* Now it is our turn to fill up the area we reserved earlier. */
        i = pidx_start;
        do {
                r->items[i] = *items++;
                if (__predict_false(++i == r->size))
                        i = 0;
        } while (i != pidx_stop);

        /*
         * Update the ring's pidx_tail.  The release style atomic guarantees
         * that the items are visible to any thread that sees the updated pidx.
         */
        os.state = ns.state = r->state;
        ns.pidx_tail = pidx_stop;
        if (abdicate) {
                if (os.flags == IDLE)
                        ns.flags = ABDICATED;
        } else
                ns.flags = BUSY;
        r->state = ns.state;
        counter_u64_add(r->enqueues, n);

        if (!abdicate) {
                /*
                 * Turn into a consumer if some other thread isn't active as a
                 * consumer already.
                 */
                if (os.flags != BUSY)
                        drain_ring_locked(r, ns, os.flags, budget);
        }

        mtx_unlock(&r->lock);
        return (0);
}
#else
int
ifmp_ring_enqueue(struct ifmp_ring *r, void **items, int n, int budget, int abdicate)
{
        union ring_state os, ns;
        uint16_t pidx_start, pidx_stop;
        int i;

        MPASS(items != NULL);
        MPASS(n > 0);

        /*
         * Reserve room for the new items.  Our reservation, if successful, is
         * from 'pidx_start' to 'pidx_stop'.
         */
        os.state = r->state;
        for (;;) {
                if (n >= space_available(r, os)) {
                        counter_u64_add(r->drops, n);
                        MPASS(os.flags != IDLE);
                        if (os.flags == STALLED)
                                ifmp_ring_check_drainage(r, 0);
                        return (ENOBUFS);
                }
                ns.state = os.state;
                ns.pidx_head = increment_idx(r, os.pidx_head, n);
                critical_enter();
                if (atomic_fcmpset_64(&r->state, &os.state, ns.state))
                        break;
                critical_exit();
                cpu_spinwait();
        }
        pidx_start = os.pidx_head;
        pidx_stop = ns.pidx_head;

        /*
         * Wait for other producers who got in ahead of us to enqueue their
         * items, one producer at a time.  It is our turn when the ring's
         * pidx_tail reaches the beginning of our reservation (pidx_start).
         */
        while (ns.pidx_tail != pidx_start) {
                cpu_spinwait();
                ns.state = r->state;
        }

        /* Now it is our turn to fill up the area we reserved earlier. */
        i = pidx_start;
        do {
                r->items[i] = *items++;
                if (__predict_false(++i == r->size))
                        i = 0;
        } while (i != pidx_stop);

        /*
         * Update the ring's pidx_tail.  The release style atomic guarantees
         * that the items are visible to any thread that sees the updated pidx.
         */
        os.state = r->state;
        do {
                ns.state = os.state;
                ns.pidx_tail = pidx_stop;
                if (abdicate) {
                        if (os.flags == IDLE)
                                ns.flags = ABDICATED;
                } else
                        ns.flags = BUSY;
        } while (atomic_fcmpset_rel_64(&r->state, &os.state, ns.state) == 0);
        critical_exit();
        counter_u64_add(r->enqueues, n);

        if (!abdicate) {
                /*
                 * Turn into a consumer if some other thread isn't active as a
                 * consumer already.
                 */
                if (os.flags != BUSY)
                        drain_ring_lockless(r, ns, os.flags, budget);
        }

        return (0);
}
#endif
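
/*
 * Typical producer-side usage (illustrative sketch only; 'sc->txr', 'm', and
 * the budget value are hypothetical caller state, not mandated by this API).
 * With 'abdicate' zero the caller may end up draining the ring itself before
 * the call returns:
 *
 *      rc = ifmp_ring_enqueue(sc->txr, (void **)&m, 1, budget, 0);
 *      if (rc == ENOBUFS) {
 *              ... the item was not queued and is counted in 'drops' ...
 *      }
 *
 *      ... later, once the resources that caused a stall have been freed:
 *      ifmp_ring_check_drainage(sc->txr, budget);
 */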

void
ifmp_ring_check_drainage(struct ifmp_ring *r, int budget)
{
        union ring_state os, ns;

        os.state = r->state;
        if ((os.flags != STALLED && os.flags != ABDICATED) ||  // Only continue in STALLED and ABDICATED
            os.pidx_head != os.pidx_tail ||                    // Require work to be available
            (os.flags != ABDICATED && r->can_drain(r) == 0))   // Can either drain, or everyone left
                return;

        MPASS(os.cidx != os.pidx_tail); /* implied by STALLED */
        ns.state = os.state;
        ns.flags = BUSY;

#ifdef MP_RING_NO_64BIT_ATOMICS
        mtx_lock(&r->lock);
        if (r->state != os.state) {
                mtx_unlock(&r->lock);
                return;
        }
        r->state = ns.state;
        drain_ring_locked(r, ns, os.flags, budget);
        mtx_unlock(&r->lock);
#else
        /*
         * The acquire style atomic guarantees visibility of items associated
         * with the pidx that we read here.
         */
        if (!atomic_cmpset_acq_64(&r->state, os.state, ns.state))
                return;

        drain_ring_lockless(r, ns, os.flags, budget);
#endif
}

void
ifmp_ring_reset_stats(struct ifmp_ring *r)
{

        counter_u64_zero(r->enqueues);
        counter_u64_zero(r->drops);
        counter_u64_zero(r->starts);
        counter_u64_zero(r->stalls);
        counter_u64_zero(r->restarts);
        counter_u64_zero(r->abdications);
}

int
ifmp_ring_is_idle(struct ifmp_ring *r)
{
        union ring_state s;

        s.state = r->state;
        if (s.pidx_head == s.pidx_tail && s.pidx_tail == s.cidx &&
            s.flags == IDLE)
                return (1);

        return (0);
}

int
ifmp_ring_is_stalled(struct ifmp_ring *r)
{
        union ring_state s;

        s.state = r->state;
        if (s.pidx_head == s.pidx_tail && s.flags == STALLED)
                return (1);

        return (0);
}
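
/*
 * Note (descriptive): ifmp_ring_is_idle() and ifmp_ring_is_stalled() take a
 * single unsynchronized snapshot of r->state, so the answer may already be
 * stale by the time the caller acts on it; they are best treated as advisory
 * checks rather than synchronization primitives.
 */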