/*-
 * Copyright (c) 2014 Chelsio Communications, Inc.
 * All rights reserved.
 * Written by: Navdeep Parhar <np@FreeBSD.org>
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
 * SUCH DAMAGE.
264c7070dbSScott Long */ 274c7070dbSScott Long 284c7070dbSScott Long #include <sys/cdefs.h> 294c7070dbSScott Long __FBSDID("$FreeBSD$"); 304c7070dbSScott Long 314c7070dbSScott Long #include <sys/types.h> 324c7070dbSScott Long #include <sys/param.h> 334c7070dbSScott Long #include <sys/systm.h> 344c7070dbSScott Long #include <sys/counter.h> 354c7070dbSScott Long #include <sys/lock.h> 364c7070dbSScott Long #include <sys/mutex.h> 374c7070dbSScott Long #include <sys/malloc.h> 384c7070dbSScott Long #include <machine/cpu.h> 394c7070dbSScott Long 40*fc614c29SScott Long #if defined(__powerpc__) || defined(__mips__) 41*fc614c29SScott Long #define NO_64BIT_ATOMICS 42*fc614c29SScott Long #endif 434c7070dbSScott Long 444c7070dbSScott Long #if defined(__i386__) 454c7070dbSScott Long #define atomic_cmpset_acq_64 atomic_cmpset_64 464c7070dbSScott Long #define atomic_cmpset_rel_64 atomic_cmpset_64 474c7070dbSScott Long #endif 484c7070dbSScott Long 49*fc614c29SScott Long #include <net/mp_ring.h> 50*fc614c29SScott Long 514c7070dbSScott Long union ring_state { 524c7070dbSScott Long struct { 534c7070dbSScott Long uint16_t pidx_head; 544c7070dbSScott Long uint16_t pidx_tail; 554c7070dbSScott Long uint16_t cidx; 564c7070dbSScott Long uint16_t flags; 574c7070dbSScott Long }; 584c7070dbSScott Long uint64_t state; 594c7070dbSScott Long }; 604c7070dbSScott Long 614c7070dbSScott Long enum { 624c7070dbSScott Long IDLE = 0, /* consumer ran to completion, nothing more to do. */ 634c7070dbSScott Long BUSY, /* consumer is running already, or will be shortly. */ 644c7070dbSScott Long STALLED, /* consumer stopped due to lack of resources. */ 654c7070dbSScott Long ABDICATED, /* consumer stopped even though there was work to be 664c7070dbSScott Long done because it wants another thread to take over. 
*/ 674c7070dbSScott Long }; 684c7070dbSScott Long 694c7070dbSScott Long static inline uint16_t 704c7070dbSScott Long space_available(struct ifmp_ring *r, union ring_state s) 714c7070dbSScott Long { 724c7070dbSScott Long uint16_t x = r->size - 1; 734c7070dbSScott Long 744c7070dbSScott Long if (s.cidx == s.pidx_head) 754c7070dbSScott Long return (x); 764c7070dbSScott Long else if (s.cidx > s.pidx_head) 774c7070dbSScott Long return (s.cidx - s.pidx_head - 1); 784c7070dbSScott Long else 794c7070dbSScott Long return (x - s.pidx_head + s.cidx); 804c7070dbSScott Long } 814c7070dbSScott Long 824c7070dbSScott Long static inline uint16_t 834c7070dbSScott Long increment_idx(struct ifmp_ring *r, uint16_t idx, uint16_t n) 844c7070dbSScott Long { 854c7070dbSScott Long int x = r->size - idx; 864c7070dbSScott Long 874c7070dbSScott Long MPASS(x > 0); 884c7070dbSScott Long return (x > n ? idx + n : n - x); 894c7070dbSScott Long } 904c7070dbSScott Long 914c7070dbSScott Long /* Consumer is about to update the ring's state to s */ 924c7070dbSScott Long static inline uint16_t 934c7070dbSScott Long state_to_flags(union ring_state s, int abdicate) 944c7070dbSScott Long { 954c7070dbSScott Long 964c7070dbSScott Long if (s.cidx == s.pidx_tail) 974c7070dbSScott Long return (IDLE); 984c7070dbSScott Long else if (abdicate && s.pidx_tail != s.pidx_head) 994c7070dbSScott Long return (ABDICATED); 1004c7070dbSScott Long 1014c7070dbSScott Long return (BUSY); 1024c7070dbSScott Long } 1034c7070dbSScott Long 1044c7070dbSScott Long #ifdef NO_64BIT_ATOMICS 1054c7070dbSScott Long static void 1064c7070dbSScott Long drain_ring_locked(struct ifmp_ring *r, union ring_state os, uint16_t prev, int budget) 1074c7070dbSScott Long { 1084c7070dbSScott Long union ring_state ns; 1094c7070dbSScott Long int n, pending, total; 1104c7070dbSScott Long uint16_t cidx = os.cidx; 1114c7070dbSScott Long uint16_t pidx = os.pidx_tail; 1124c7070dbSScott Long 1134c7070dbSScott Long MPASS(os.flags == BUSY); 1144c7070dbSScott Long 
MPASS(cidx != pidx); 1154c7070dbSScott Long 1164c7070dbSScott Long if (prev == IDLE) 1174c7070dbSScott Long counter_u64_add(r->starts, 1); 1184c7070dbSScott Long pending = 0; 1194c7070dbSScott Long total = 0; 1204c7070dbSScott Long 1214c7070dbSScott Long while (cidx != pidx) { 1224c7070dbSScott Long 1234c7070dbSScott Long /* Items from cidx to pidx are available for consumption. */ 1244c7070dbSScott Long n = r->drain(r, cidx, pidx); 1254c7070dbSScott Long if (n == 0) { 1264c7070dbSScott Long os.state = ns.state = r->state; 1274c7070dbSScott Long ns.cidx = cidx; 1284c7070dbSScott Long ns.flags = STALLED; 1294c7070dbSScott Long r->state = ns.state; 1304c7070dbSScott Long if (prev != STALLED) 1314c7070dbSScott Long counter_u64_add(r->stalls, 1); 1324c7070dbSScott Long else if (total > 0) { 1334c7070dbSScott Long counter_u64_add(r->restarts, 1); 1344c7070dbSScott Long counter_u64_add(r->stalls, 1); 1354c7070dbSScott Long } 1364c7070dbSScott Long break; 1374c7070dbSScott Long } 1384c7070dbSScott Long cidx = increment_idx(r, cidx, n); 1394c7070dbSScott Long pending += n; 1404c7070dbSScott Long total += n; 1414c7070dbSScott Long 1424c7070dbSScott Long /* 1434c7070dbSScott Long * We update the cidx only if we've caught up with the pidx, the 1444c7070dbSScott Long * real cidx is getting too far ahead of the one visible to 1454c7070dbSScott Long * everyone else, or we have exceeded our budget. 
1464c7070dbSScott Long */ 1474c7070dbSScott Long if (cidx != pidx && pending < 64 && total < budget) 1484c7070dbSScott Long continue; 1494c7070dbSScott Long 1504c7070dbSScott Long os.state = ns.state = r->state; 1514c7070dbSScott Long ns.cidx = cidx; 1524c7070dbSScott Long ns.flags = state_to_flags(ns, total >= budget); 1534c7070dbSScott Long r->state = ns.state; 1544c7070dbSScott Long 1554c7070dbSScott Long if (ns.flags == ABDICATED) 1564c7070dbSScott Long counter_u64_add(r->abdications, 1); 1574c7070dbSScott Long if (ns.flags != BUSY) { 1584c7070dbSScott Long /* Wrong loop exit if we're going to stall. */ 1594c7070dbSScott Long MPASS(ns.flags != STALLED); 1604c7070dbSScott Long if (prev == STALLED) { 1614c7070dbSScott Long MPASS(total > 0); 1624c7070dbSScott Long counter_u64_add(r->restarts, 1); 1634c7070dbSScott Long } 1644c7070dbSScott Long break; 1654c7070dbSScott Long } 1664c7070dbSScott Long 1674c7070dbSScott Long /* 1684c7070dbSScott Long * The acquire style atomic above guarantees visibility of items 1694c7070dbSScott Long * associated with any pidx change that we notice here. 1704c7070dbSScott Long */ 1714c7070dbSScott Long pidx = ns.pidx_tail; 1724c7070dbSScott Long pending = 0; 1734c7070dbSScott Long } 1744c7070dbSScott Long } 1754c7070dbSScott Long #else 1764c7070dbSScott Long /* 1774c7070dbSScott Long * Caller passes in a state, with a guarantee that there is work to do and that 1784c7070dbSScott Long * all items up to the pidx_tail in the state are visible. 
1794c7070dbSScott Long */ 1804c7070dbSScott Long static void 1814c7070dbSScott Long drain_ring_lockless(struct ifmp_ring *r, union ring_state os, uint16_t prev, int budget) 1824c7070dbSScott Long { 1834c7070dbSScott Long union ring_state ns; 1844c7070dbSScott Long int n, pending, total; 1854c7070dbSScott Long uint16_t cidx = os.cidx; 1864c7070dbSScott Long uint16_t pidx = os.pidx_tail; 1874c7070dbSScott Long 1884c7070dbSScott Long MPASS(os.flags == BUSY); 1894c7070dbSScott Long MPASS(cidx != pidx); 1904c7070dbSScott Long 1914c7070dbSScott Long if (prev == IDLE) 1924c7070dbSScott Long counter_u64_add(r->starts, 1); 1934c7070dbSScott Long pending = 0; 1944c7070dbSScott Long total = 0; 1954c7070dbSScott Long 1964c7070dbSScott Long while (cidx != pidx) { 1974c7070dbSScott Long 1984c7070dbSScott Long /* Items from cidx to pidx are available for consumption. */ 1994c7070dbSScott Long n = r->drain(r, cidx, pidx); 2004c7070dbSScott Long if (n == 0) { 2014c7070dbSScott Long critical_enter(); 2024c7070dbSScott Long do { 2034c7070dbSScott Long os.state = ns.state = r->state; 2044c7070dbSScott Long ns.cidx = cidx; 2054c7070dbSScott Long ns.flags = STALLED; 2064c7070dbSScott Long } while (atomic_cmpset_64(&r->state, os.state, 2074c7070dbSScott Long ns.state) == 0); 2084c7070dbSScott Long critical_exit(); 2094c7070dbSScott Long if (prev != STALLED) 2104c7070dbSScott Long counter_u64_add(r->stalls, 1); 2114c7070dbSScott Long else if (total > 0) { 2124c7070dbSScott Long counter_u64_add(r->restarts, 1); 2134c7070dbSScott Long counter_u64_add(r->stalls, 1); 2144c7070dbSScott Long } 2154c7070dbSScott Long break; 2164c7070dbSScott Long } 2174c7070dbSScott Long cidx = increment_idx(r, cidx, n); 2184c7070dbSScott Long pending += n; 2194c7070dbSScott Long total += n; 2204c7070dbSScott Long 2214c7070dbSScott Long /* 2224c7070dbSScott Long * We update the cidx only if we've caught up with the pidx, the 2234c7070dbSScott Long * real cidx is getting too far ahead of the one visible to 
2244c7070dbSScott Long * everyone else, or we have exceeded our budget. 2254c7070dbSScott Long */ 2264c7070dbSScott Long if (cidx != pidx && pending < 64 && total < budget) 2274c7070dbSScott Long continue; 2284c7070dbSScott Long critical_enter(); 2294c7070dbSScott Long do { 2304c7070dbSScott Long os.state = ns.state = r->state; 2314c7070dbSScott Long ns.cidx = cidx; 2324c7070dbSScott Long ns.flags = state_to_flags(ns, total >= budget); 2334c7070dbSScott Long } while (atomic_cmpset_acq_64(&r->state, os.state, ns.state) == 0); 2344c7070dbSScott Long critical_exit(); 2354c7070dbSScott Long 2364c7070dbSScott Long if (ns.flags == ABDICATED) 2374c7070dbSScott Long counter_u64_add(r->abdications, 1); 2384c7070dbSScott Long if (ns.flags != BUSY) { 2394c7070dbSScott Long /* Wrong loop exit if we're going to stall. */ 2404c7070dbSScott Long MPASS(ns.flags != STALLED); 2414c7070dbSScott Long if (prev == STALLED) { 2424c7070dbSScott Long MPASS(total > 0); 2434c7070dbSScott Long counter_u64_add(r->restarts, 1); 2444c7070dbSScott Long } 2454c7070dbSScott Long break; 2464c7070dbSScott Long } 2474c7070dbSScott Long 2484c7070dbSScott Long /* 2494c7070dbSScott Long * The acquire style atomic above guarantees visibility of items 2504c7070dbSScott Long * associated with any pidx change that we notice here. 
2514c7070dbSScott Long */ 2524c7070dbSScott Long pidx = ns.pidx_tail; 2534c7070dbSScott Long pending = 0; 2544c7070dbSScott Long } 2554c7070dbSScott Long } 2564c7070dbSScott Long #endif 2574c7070dbSScott Long 2584c7070dbSScott Long int 2594c7070dbSScott Long ifmp_ring_alloc(struct ifmp_ring **pr, int size, void *cookie, mp_ring_drain_t drain, 2604c7070dbSScott Long mp_ring_can_drain_t can_drain, struct malloc_type *mt, int flags) 2614c7070dbSScott Long { 2624c7070dbSScott Long struct ifmp_ring *r; 2634c7070dbSScott Long 2644c7070dbSScott Long /* All idx are 16b so size can be 65536 at most */ 2654c7070dbSScott Long if (pr == NULL || size < 2 || size > 65536 || drain == NULL || 2664c7070dbSScott Long can_drain == NULL) 2674c7070dbSScott Long return (EINVAL); 2684c7070dbSScott Long *pr = NULL; 2694c7070dbSScott Long flags &= M_NOWAIT | M_WAITOK; 2704c7070dbSScott Long MPASS(flags != 0); 2714c7070dbSScott Long 2724c7070dbSScott Long r = malloc(__offsetof(struct ifmp_ring, items[size]), mt, flags | M_ZERO); 2734c7070dbSScott Long if (r == NULL) 2744c7070dbSScott Long return (ENOMEM); 2754c7070dbSScott Long r->size = size; 2764c7070dbSScott Long r->cookie = cookie; 2774c7070dbSScott Long r->mt = mt; 2784c7070dbSScott Long r->drain = drain; 2794c7070dbSScott Long r->can_drain = can_drain; 2804c7070dbSScott Long r->enqueues = counter_u64_alloc(flags); 2814c7070dbSScott Long r->drops = counter_u64_alloc(flags); 2824c7070dbSScott Long r->starts = counter_u64_alloc(flags); 2834c7070dbSScott Long r->stalls = counter_u64_alloc(flags); 2844c7070dbSScott Long r->restarts = counter_u64_alloc(flags); 2854c7070dbSScott Long r->abdications = counter_u64_alloc(flags); 2864c7070dbSScott Long if (r->enqueues == NULL || r->drops == NULL || r->starts == NULL || 2874c7070dbSScott Long r->stalls == NULL || r->restarts == NULL || 2884c7070dbSScott Long r->abdications == NULL) { 2894c7070dbSScott Long ifmp_ring_free(r); 2904c7070dbSScott Long return (ENOMEM); 2914c7070dbSScott Long } 
2924c7070dbSScott Long 2934c7070dbSScott Long *pr = r; 2944c7070dbSScott Long #ifdef NO_64BIT_ATOMICS 2954c7070dbSScott Long mtx_init(&r->lock, "mp_ring lock", NULL, MTX_DEF); 2964c7070dbSScott Long #endif 2974c7070dbSScott Long return (0); 2984c7070dbSScott Long } 2994c7070dbSScott Long 3004c7070dbSScott Long void 3014c7070dbSScott Long ifmp_ring_free(struct ifmp_ring *r) 3024c7070dbSScott Long { 3034c7070dbSScott Long 3044c7070dbSScott Long if (r == NULL) 3054c7070dbSScott Long return; 3064c7070dbSScott Long 3074c7070dbSScott Long if (r->enqueues != NULL) 3084c7070dbSScott Long counter_u64_free(r->enqueues); 3094c7070dbSScott Long if (r->drops != NULL) 3104c7070dbSScott Long counter_u64_free(r->drops); 3114c7070dbSScott Long if (r->starts != NULL) 3124c7070dbSScott Long counter_u64_free(r->starts); 3134c7070dbSScott Long if (r->stalls != NULL) 3144c7070dbSScott Long counter_u64_free(r->stalls); 3154c7070dbSScott Long if (r->restarts != NULL) 3164c7070dbSScott Long counter_u64_free(r->restarts); 3174c7070dbSScott Long if (r->abdications != NULL) 3184c7070dbSScott Long counter_u64_free(r->abdications); 3194c7070dbSScott Long 3204c7070dbSScott Long free(r, r->mt); 3214c7070dbSScott Long } 3224c7070dbSScott Long 3234c7070dbSScott Long /* 3244c7070dbSScott Long * Enqueue n items and maybe drain the ring for some time. 3254c7070dbSScott Long * 3264c7070dbSScott Long * Returns an errno. 
3274c7070dbSScott Long */ 3284c7070dbSScott Long #ifdef NO_64BIT_ATOMICS 3294c7070dbSScott Long int 3304c7070dbSScott Long ifmp_ring_enqueue(struct ifmp_ring *r, void **items, int n, int budget) 3314c7070dbSScott Long { 3324c7070dbSScott Long union ring_state os, ns; 3334c7070dbSScott Long uint16_t pidx_start, pidx_stop; 3344c7070dbSScott Long int i; 3354c7070dbSScott Long 3364c7070dbSScott Long MPASS(items != NULL); 3374c7070dbSScott Long MPASS(n > 0); 3384c7070dbSScott Long 3394c7070dbSScott Long mtx_lock(&r->lock); 3404c7070dbSScott Long /* 3414c7070dbSScott Long * Reserve room for the new items. Our reservation, if successful, is 3424c7070dbSScott Long * from 'pidx_start' to 'pidx_stop'. 3434c7070dbSScott Long */ 3444c7070dbSScott Long os.state = r->state; 3454c7070dbSScott Long if (n >= space_available(r, os)) { 3464c7070dbSScott Long counter_u64_add(r->drops, n); 3474c7070dbSScott Long MPASS(os.flags != IDLE); 3484c7070dbSScott Long if (os.flags == STALLED) 3494c7070dbSScott Long ifmp_ring_check_drainage(r, 0); 3504c7070dbSScott Long return (ENOBUFS); 3514c7070dbSScott Long } 3524c7070dbSScott Long ns.state = os.state; 3534c7070dbSScott Long ns.pidx_head = increment_idx(r, os.pidx_head, n); 3544c7070dbSScott Long r->state = ns.state; 3554c7070dbSScott Long pidx_start = os.pidx_head; 3564c7070dbSScott Long pidx_stop = ns.pidx_head; 3574c7070dbSScott Long 3584c7070dbSScott Long /* 3594c7070dbSScott Long * Wait for other producers who got in ahead of us to enqueue their 3604c7070dbSScott Long * items, one producer at a time. It is our turn when the ring's 3614c7070dbSScott Long * pidx_tail reaches the begining of our reservation (pidx_start). 3624c7070dbSScott Long */ 3634c7070dbSScott Long while (ns.pidx_tail != pidx_start) { 3644c7070dbSScott Long cpu_spinwait(); 3654c7070dbSScott Long ns.state = r->state; 3664c7070dbSScott Long } 3674c7070dbSScott Long 3684c7070dbSScott Long /* Now it is our turn to fill up the area we reserved earlier. 
*/ 3694c7070dbSScott Long i = pidx_start; 3704c7070dbSScott Long do { 3714c7070dbSScott Long r->items[i] = *items++; 3724c7070dbSScott Long if (__predict_false(++i == r->size)) 3734c7070dbSScott Long i = 0; 3744c7070dbSScott Long } while (i != pidx_stop); 3754c7070dbSScott Long 3764c7070dbSScott Long /* 3774c7070dbSScott Long * Update the ring's pidx_tail. The release style atomic guarantees 3784c7070dbSScott Long * that the items are visible to any thread that sees the updated pidx. 3794c7070dbSScott Long */ 3804c7070dbSScott Long os.state = ns.state = r->state; 3814c7070dbSScott Long ns.pidx_tail = pidx_stop; 3824c7070dbSScott Long ns.flags = BUSY; 3834c7070dbSScott Long r->state = ns.state; 3844c7070dbSScott Long counter_u64_add(r->enqueues, n); 3854c7070dbSScott Long 3864c7070dbSScott Long /* 3874c7070dbSScott Long * Turn into a consumer if some other thread isn't active as a consumer 3884c7070dbSScott Long * already. 3894c7070dbSScott Long */ 3904c7070dbSScott Long if (os.flags != BUSY) 3914c7070dbSScott Long drain_ring_locked(r, ns, os.flags, budget); 3924c7070dbSScott Long 3934c7070dbSScott Long mtx_unlock(&r->lock); 3944c7070dbSScott Long return (0); 3954c7070dbSScott Long } 3964c7070dbSScott Long 3974c7070dbSScott Long #else 3984c7070dbSScott Long int 3994c7070dbSScott Long ifmp_ring_enqueue(struct ifmp_ring *r, void **items, int n, int budget) 4004c7070dbSScott Long { 4014c7070dbSScott Long union ring_state os, ns; 4024c7070dbSScott Long uint16_t pidx_start, pidx_stop; 4034c7070dbSScott Long int i; 4044c7070dbSScott Long 4054c7070dbSScott Long MPASS(items != NULL); 4064c7070dbSScott Long MPASS(n > 0); 4074c7070dbSScott Long 4084c7070dbSScott Long /* 4094c7070dbSScott Long * Reserve room for the new items. Our reservation, if successful, is 4104c7070dbSScott Long * from 'pidx_start' to 'pidx_stop'. 
4114c7070dbSScott Long */ 4124c7070dbSScott Long for (;;) { 4134c7070dbSScott Long os.state = r->state; 4144c7070dbSScott Long if (n >= space_available(r, os)) { 4154c7070dbSScott Long counter_u64_add(r->drops, n); 4164c7070dbSScott Long MPASS(os.flags != IDLE); 4174c7070dbSScott Long if (os.flags == STALLED) 4184c7070dbSScott Long ifmp_ring_check_drainage(r, 0); 4194c7070dbSScott Long return (ENOBUFS); 4204c7070dbSScott Long } 4214c7070dbSScott Long ns.state = os.state; 4224c7070dbSScott Long ns.pidx_head = increment_idx(r, os.pidx_head, n); 4234c7070dbSScott Long critical_enter(); 4244c7070dbSScott Long if (atomic_cmpset_64(&r->state, os.state, ns.state)) 4254c7070dbSScott Long break; 4264c7070dbSScott Long critical_exit(); 4274c7070dbSScott Long cpu_spinwait(); 4284c7070dbSScott Long } 4294c7070dbSScott Long pidx_start = os.pidx_head; 4304c7070dbSScott Long pidx_stop = ns.pidx_head; 4314c7070dbSScott Long 4324c7070dbSScott Long /* 4334c7070dbSScott Long * Wait for other producers who got in ahead of us to enqueue their 4344c7070dbSScott Long * items, one producer at a time. It is our turn when the ring's 4354c7070dbSScott Long * pidx_tail reaches the begining of our reservation (pidx_start). 4364c7070dbSScott Long */ 4374c7070dbSScott Long while (ns.pidx_tail != pidx_start) { 4384c7070dbSScott Long cpu_spinwait(); 4394c7070dbSScott Long ns.state = r->state; 4404c7070dbSScott Long } 4414c7070dbSScott Long 4424c7070dbSScott Long /* Now it is our turn to fill up the area we reserved earlier. */ 4434c7070dbSScott Long i = pidx_start; 4444c7070dbSScott Long do { 4454c7070dbSScott Long r->items[i] = *items++; 4464c7070dbSScott Long if (__predict_false(++i == r->size)) 4474c7070dbSScott Long i = 0; 4484c7070dbSScott Long } while (i != pidx_stop); 4494c7070dbSScott Long 4504c7070dbSScott Long /* 4514c7070dbSScott Long * Update the ring's pidx_tail. 
The release style atomic guarantees 4524c7070dbSScott Long * that the items are visible to any thread that sees the updated pidx. 4534c7070dbSScott Long */ 4544c7070dbSScott Long do { 4554c7070dbSScott Long os.state = ns.state = r->state; 4564c7070dbSScott Long ns.pidx_tail = pidx_stop; 4574c7070dbSScott Long ns.flags = BUSY; 4584c7070dbSScott Long } while (atomic_cmpset_rel_64(&r->state, os.state, ns.state) == 0); 4594c7070dbSScott Long critical_exit(); 4604c7070dbSScott Long counter_u64_add(r->enqueues, n); 4614c7070dbSScott Long 4624c7070dbSScott Long /* 4634c7070dbSScott Long * Turn into a consumer if some other thread isn't active as a consumer 4644c7070dbSScott Long * already. 4654c7070dbSScott Long */ 4664c7070dbSScott Long if (os.flags != BUSY) 4674c7070dbSScott Long drain_ring_lockless(r, ns, os.flags, budget); 4684c7070dbSScott Long 4694c7070dbSScott Long return (0); 4704c7070dbSScott Long } 4714c7070dbSScott Long #endif 4724c7070dbSScott Long 4734c7070dbSScott Long void 4744c7070dbSScott Long ifmp_ring_check_drainage(struct ifmp_ring *r, int budget) 4754c7070dbSScott Long { 4764c7070dbSScott Long union ring_state os, ns; 4774c7070dbSScott Long 4784c7070dbSScott Long os.state = r->state; 4794c7070dbSScott Long if (os.flags != STALLED || os.pidx_head != os.pidx_tail || r->can_drain(r) == 0) 4804c7070dbSScott Long return; 4814c7070dbSScott Long 4824c7070dbSScott Long MPASS(os.cidx != os.pidx_tail); /* implied by STALLED */ 4834c7070dbSScott Long ns.state = os.state; 4844c7070dbSScott Long ns.flags = BUSY; 4854c7070dbSScott Long 4864c7070dbSScott Long 4874c7070dbSScott Long #ifdef NO_64BIT_ATOMICS 4884c7070dbSScott Long mtx_lock(&r->lock); 4894c7070dbSScott Long if (r->state != os.state) { 4904c7070dbSScott Long mtx_unlock(&r->lock); 4914c7070dbSScott Long return; 4924c7070dbSScott Long } 4934c7070dbSScott Long r->state = ns.state; 4944c7070dbSScott Long drain_ring_locked(r, ns, os.flags, budget); 4954c7070dbSScott Long mtx_unlock(&r->lock); 
4964c7070dbSScott Long #else 4974c7070dbSScott Long /* 4984c7070dbSScott Long * The acquire style atomic guarantees visibility of items associated 4994c7070dbSScott Long * with the pidx that we read here. 5004c7070dbSScott Long */ 5014c7070dbSScott Long if (!atomic_cmpset_acq_64(&r->state, os.state, ns.state)) 5024c7070dbSScott Long return; 5034c7070dbSScott Long 5044c7070dbSScott Long 5054c7070dbSScott Long drain_ring_lockless(r, ns, os.flags, budget); 5064c7070dbSScott Long #endif 5074c7070dbSScott Long } 5084c7070dbSScott Long 5094c7070dbSScott Long void 5104c7070dbSScott Long ifmp_ring_reset_stats(struct ifmp_ring *r) 5114c7070dbSScott Long { 5124c7070dbSScott Long 5134c7070dbSScott Long counter_u64_zero(r->enqueues); 5144c7070dbSScott Long counter_u64_zero(r->drops); 5154c7070dbSScott Long counter_u64_zero(r->starts); 5164c7070dbSScott Long counter_u64_zero(r->stalls); 5174c7070dbSScott Long counter_u64_zero(r->restarts); 5184c7070dbSScott Long counter_u64_zero(r->abdications); 5194c7070dbSScott Long } 5204c7070dbSScott Long 5214c7070dbSScott Long int 5224c7070dbSScott Long ifmp_ring_is_idle(struct ifmp_ring *r) 5234c7070dbSScott Long { 5244c7070dbSScott Long union ring_state s; 5254c7070dbSScott Long 5264c7070dbSScott Long s.state = r->state; 5274c7070dbSScott Long if (s.pidx_head == s.pidx_tail && s.pidx_tail == s.cidx && 5284c7070dbSScott Long s.flags == IDLE) 5294c7070dbSScott Long return (1); 5304c7070dbSScott Long 5314c7070dbSScott Long return (0); 5324c7070dbSScott Long } 5334c7070dbSScott Long 5344c7070dbSScott Long int 5354c7070dbSScott Long ifmp_ring_is_stalled(struct ifmp_ring *r) 5364c7070dbSScott Long { 5374c7070dbSScott Long union ring_state s; 5384c7070dbSScott Long 5394c7070dbSScott Long s.state = r->state; 5404c7070dbSScott Long if (s.pidx_head == s.pidx_tail && s.flags == STALLED) 5414c7070dbSScott Long return (1); 5424c7070dbSScott Long 5434c7070dbSScott Long return (0); 5444c7070dbSScott Long } 545