/*-
 * Copyright (c) 2014 Chelsio Communications, Inc.
 * All rights reserved.
 * Written by: Navdeep Parhar <np@FreeBSD.org>
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
 * SUCH DAMAGE.
26*4c7070dbSScott Long */ 27*4c7070dbSScott Long 28*4c7070dbSScott Long #include <sys/cdefs.h> 29*4c7070dbSScott Long __FBSDID("$FreeBSD$"); 30*4c7070dbSScott Long 31*4c7070dbSScott Long #include <sys/types.h> 32*4c7070dbSScott Long #include <sys/param.h> 33*4c7070dbSScott Long #include <sys/systm.h> 34*4c7070dbSScott Long #include <sys/counter.h> 35*4c7070dbSScott Long #include <sys/lock.h> 36*4c7070dbSScott Long #include <sys/mutex.h> 37*4c7070dbSScott Long #include <sys/malloc.h> 38*4c7070dbSScott Long #include <machine/cpu.h> 39*4c7070dbSScott Long 40*4c7070dbSScott Long 41*4c7070dbSScott Long 42*4c7070dbSScott Long #include <net/mp_ring.h> 43*4c7070dbSScott Long 44*4c7070dbSScott Long #if defined(__i386__) 45*4c7070dbSScott Long #define atomic_cmpset_acq_64 atomic_cmpset_64 46*4c7070dbSScott Long #define atomic_cmpset_rel_64 atomic_cmpset_64 47*4c7070dbSScott Long #endif 48*4c7070dbSScott Long 49*4c7070dbSScott Long union ring_state { 50*4c7070dbSScott Long struct { 51*4c7070dbSScott Long uint16_t pidx_head; 52*4c7070dbSScott Long uint16_t pidx_tail; 53*4c7070dbSScott Long uint16_t cidx; 54*4c7070dbSScott Long uint16_t flags; 55*4c7070dbSScott Long }; 56*4c7070dbSScott Long uint64_t state; 57*4c7070dbSScott Long }; 58*4c7070dbSScott Long 59*4c7070dbSScott Long enum { 60*4c7070dbSScott Long IDLE = 0, /* consumer ran to completion, nothing more to do. */ 61*4c7070dbSScott Long BUSY, /* consumer is running already, or will be shortly. */ 62*4c7070dbSScott Long STALLED, /* consumer stopped due to lack of resources. */ 63*4c7070dbSScott Long ABDICATED, /* consumer stopped even though there was work to be 64*4c7070dbSScott Long done because it wants another thread to take over. 
*/ 65*4c7070dbSScott Long }; 66*4c7070dbSScott Long 67*4c7070dbSScott Long static inline uint16_t 68*4c7070dbSScott Long space_available(struct ifmp_ring *r, union ring_state s) 69*4c7070dbSScott Long { 70*4c7070dbSScott Long uint16_t x = r->size - 1; 71*4c7070dbSScott Long 72*4c7070dbSScott Long if (s.cidx == s.pidx_head) 73*4c7070dbSScott Long return (x); 74*4c7070dbSScott Long else if (s.cidx > s.pidx_head) 75*4c7070dbSScott Long return (s.cidx - s.pidx_head - 1); 76*4c7070dbSScott Long else 77*4c7070dbSScott Long return (x - s.pidx_head + s.cidx); 78*4c7070dbSScott Long } 79*4c7070dbSScott Long 80*4c7070dbSScott Long static inline uint16_t 81*4c7070dbSScott Long increment_idx(struct ifmp_ring *r, uint16_t idx, uint16_t n) 82*4c7070dbSScott Long { 83*4c7070dbSScott Long int x = r->size - idx; 84*4c7070dbSScott Long 85*4c7070dbSScott Long MPASS(x > 0); 86*4c7070dbSScott Long return (x > n ? idx + n : n - x); 87*4c7070dbSScott Long } 88*4c7070dbSScott Long 89*4c7070dbSScott Long /* Consumer is about to update the ring's state to s */ 90*4c7070dbSScott Long static inline uint16_t 91*4c7070dbSScott Long state_to_flags(union ring_state s, int abdicate) 92*4c7070dbSScott Long { 93*4c7070dbSScott Long 94*4c7070dbSScott Long if (s.cidx == s.pidx_tail) 95*4c7070dbSScott Long return (IDLE); 96*4c7070dbSScott Long else if (abdicate && s.pidx_tail != s.pidx_head) 97*4c7070dbSScott Long return (ABDICATED); 98*4c7070dbSScott Long 99*4c7070dbSScott Long return (BUSY); 100*4c7070dbSScott Long } 101*4c7070dbSScott Long 102*4c7070dbSScott Long #ifdef NO_64BIT_ATOMICS 103*4c7070dbSScott Long static void 104*4c7070dbSScott Long drain_ring_locked(struct ifmp_ring *r, union ring_state os, uint16_t prev, int budget) 105*4c7070dbSScott Long { 106*4c7070dbSScott Long union ring_state ns; 107*4c7070dbSScott Long int n, pending, total; 108*4c7070dbSScott Long uint16_t cidx = os.cidx; 109*4c7070dbSScott Long uint16_t pidx = os.pidx_tail; 110*4c7070dbSScott Long 111*4c7070dbSScott Long 
MPASS(os.flags == BUSY); 112*4c7070dbSScott Long MPASS(cidx != pidx); 113*4c7070dbSScott Long 114*4c7070dbSScott Long if (prev == IDLE) 115*4c7070dbSScott Long counter_u64_add(r->starts, 1); 116*4c7070dbSScott Long pending = 0; 117*4c7070dbSScott Long total = 0; 118*4c7070dbSScott Long 119*4c7070dbSScott Long while (cidx != pidx) { 120*4c7070dbSScott Long 121*4c7070dbSScott Long /* Items from cidx to pidx are available for consumption. */ 122*4c7070dbSScott Long n = r->drain(r, cidx, pidx); 123*4c7070dbSScott Long if (n == 0) { 124*4c7070dbSScott Long os.state = ns.state = r->state; 125*4c7070dbSScott Long ns.cidx = cidx; 126*4c7070dbSScott Long ns.flags = STALLED; 127*4c7070dbSScott Long r->state = ns.state; 128*4c7070dbSScott Long if (prev != STALLED) 129*4c7070dbSScott Long counter_u64_add(r->stalls, 1); 130*4c7070dbSScott Long else if (total > 0) { 131*4c7070dbSScott Long counter_u64_add(r->restarts, 1); 132*4c7070dbSScott Long counter_u64_add(r->stalls, 1); 133*4c7070dbSScott Long } 134*4c7070dbSScott Long break; 135*4c7070dbSScott Long } 136*4c7070dbSScott Long cidx = increment_idx(r, cidx, n); 137*4c7070dbSScott Long pending += n; 138*4c7070dbSScott Long total += n; 139*4c7070dbSScott Long 140*4c7070dbSScott Long /* 141*4c7070dbSScott Long * We update the cidx only if we've caught up with the pidx, the 142*4c7070dbSScott Long * real cidx is getting too far ahead of the one visible to 143*4c7070dbSScott Long * everyone else, or we have exceeded our budget. 
144*4c7070dbSScott Long */ 145*4c7070dbSScott Long if (cidx != pidx && pending < 64 && total < budget) 146*4c7070dbSScott Long continue; 147*4c7070dbSScott Long 148*4c7070dbSScott Long os.state = ns.state = r->state; 149*4c7070dbSScott Long ns.cidx = cidx; 150*4c7070dbSScott Long ns.flags = state_to_flags(ns, total >= budget); 151*4c7070dbSScott Long r->state = ns.state; 152*4c7070dbSScott Long 153*4c7070dbSScott Long if (ns.flags == ABDICATED) 154*4c7070dbSScott Long counter_u64_add(r->abdications, 1); 155*4c7070dbSScott Long if (ns.flags != BUSY) { 156*4c7070dbSScott Long /* Wrong loop exit if we're going to stall. */ 157*4c7070dbSScott Long MPASS(ns.flags != STALLED); 158*4c7070dbSScott Long if (prev == STALLED) { 159*4c7070dbSScott Long MPASS(total > 0); 160*4c7070dbSScott Long counter_u64_add(r->restarts, 1); 161*4c7070dbSScott Long } 162*4c7070dbSScott Long break; 163*4c7070dbSScott Long } 164*4c7070dbSScott Long 165*4c7070dbSScott Long /* 166*4c7070dbSScott Long * The acquire style atomic above guarantees visibility of items 167*4c7070dbSScott Long * associated with any pidx change that we notice here. 168*4c7070dbSScott Long */ 169*4c7070dbSScott Long pidx = ns.pidx_tail; 170*4c7070dbSScott Long pending = 0; 171*4c7070dbSScott Long } 172*4c7070dbSScott Long } 173*4c7070dbSScott Long #else 174*4c7070dbSScott Long /* 175*4c7070dbSScott Long * Caller passes in a state, with a guarantee that there is work to do and that 176*4c7070dbSScott Long * all items up to the pidx_tail in the state are visible. 
177*4c7070dbSScott Long */ 178*4c7070dbSScott Long static void 179*4c7070dbSScott Long drain_ring_lockless(struct ifmp_ring *r, union ring_state os, uint16_t prev, int budget) 180*4c7070dbSScott Long { 181*4c7070dbSScott Long union ring_state ns; 182*4c7070dbSScott Long int n, pending, total; 183*4c7070dbSScott Long uint16_t cidx = os.cidx; 184*4c7070dbSScott Long uint16_t pidx = os.pidx_tail; 185*4c7070dbSScott Long 186*4c7070dbSScott Long MPASS(os.flags == BUSY); 187*4c7070dbSScott Long MPASS(cidx != pidx); 188*4c7070dbSScott Long 189*4c7070dbSScott Long if (prev == IDLE) 190*4c7070dbSScott Long counter_u64_add(r->starts, 1); 191*4c7070dbSScott Long pending = 0; 192*4c7070dbSScott Long total = 0; 193*4c7070dbSScott Long 194*4c7070dbSScott Long while (cidx != pidx) { 195*4c7070dbSScott Long 196*4c7070dbSScott Long /* Items from cidx to pidx are available for consumption. */ 197*4c7070dbSScott Long n = r->drain(r, cidx, pidx); 198*4c7070dbSScott Long if (n == 0) { 199*4c7070dbSScott Long critical_enter(); 200*4c7070dbSScott Long do { 201*4c7070dbSScott Long os.state = ns.state = r->state; 202*4c7070dbSScott Long ns.cidx = cidx; 203*4c7070dbSScott Long ns.flags = STALLED; 204*4c7070dbSScott Long } while (atomic_cmpset_64(&r->state, os.state, 205*4c7070dbSScott Long ns.state) == 0); 206*4c7070dbSScott Long critical_exit(); 207*4c7070dbSScott Long if (prev != STALLED) 208*4c7070dbSScott Long counter_u64_add(r->stalls, 1); 209*4c7070dbSScott Long else if (total > 0) { 210*4c7070dbSScott Long counter_u64_add(r->restarts, 1); 211*4c7070dbSScott Long counter_u64_add(r->stalls, 1); 212*4c7070dbSScott Long } 213*4c7070dbSScott Long break; 214*4c7070dbSScott Long } 215*4c7070dbSScott Long cidx = increment_idx(r, cidx, n); 216*4c7070dbSScott Long pending += n; 217*4c7070dbSScott Long total += n; 218*4c7070dbSScott Long 219*4c7070dbSScott Long /* 220*4c7070dbSScott Long * We update the cidx only if we've caught up with the pidx, the 221*4c7070dbSScott Long * real cidx is 
getting too far ahead of the one visible to 222*4c7070dbSScott Long * everyone else, or we have exceeded our budget. 223*4c7070dbSScott Long */ 224*4c7070dbSScott Long if (cidx != pidx && pending < 64 && total < budget) 225*4c7070dbSScott Long continue; 226*4c7070dbSScott Long critical_enter(); 227*4c7070dbSScott Long do { 228*4c7070dbSScott Long os.state = ns.state = r->state; 229*4c7070dbSScott Long ns.cidx = cidx; 230*4c7070dbSScott Long ns.flags = state_to_flags(ns, total >= budget); 231*4c7070dbSScott Long } while (atomic_cmpset_acq_64(&r->state, os.state, ns.state) == 0); 232*4c7070dbSScott Long critical_exit(); 233*4c7070dbSScott Long 234*4c7070dbSScott Long if (ns.flags == ABDICATED) 235*4c7070dbSScott Long counter_u64_add(r->abdications, 1); 236*4c7070dbSScott Long if (ns.flags != BUSY) { 237*4c7070dbSScott Long /* Wrong loop exit if we're going to stall. */ 238*4c7070dbSScott Long MPASS(ns.flags != STALLED); 239*4c7070dbSScott Long if (prev == STALLED) { 240*4c7070dbSScott Long MPASS(total > 0); 241*4c7070dbSScott Long counter_u64_add(r->restarts, 1); 242*4c7070dbSScott Long } 243*4c7070dbSScott Long break; 244*4c7070dbSScott Long } 245*4c7070dbSScott Long 246*4c7070dbSScott Long /* 247*4c7070dbSScott Long * The acquire style atomic above guarantees visibility of items 248*4c7070dbSScott Long * associated with any pidx change that we notice here. 
249*4c7070dbSScott Long */ 250*4c7070dbSScott Long pidx = ns.pidx_tail; 251*4c7070dbSScott Long pending = 0; 252*4c7070dbSScott Long } 253*4c7070dbSScott Long } 254*4c7070dbSScott Long #endif 255*4c7070dbSScott Long 256*4c7070dbSScott Long int 257*4c7070dbSScott Long ifmp_ring_alloc(struct ifmp_ring **pr, int size, void *cookie, mp_ring_drain_t drain, 258*4c7070dbSScott Long mp_ring_can_drain_t can_drain, struct malloc_type *mt, int flags) 259*4c7070dbSScott Long { 260*4c7070dbSScott Long struct ifmp_ring *r; 261*4c7070dbSScott Long 262*4c7070dbSScott Long /* All idx are 16b so size can be 65536 at most */ 263*4c7070dbSScott Long if (pr == NULL || size < 2 || size > 65536 || drain == NULL || 264*4c7070dbSScott Long can_drain == NULL) 265*4c7070dbSScott Long return (EINVAL); 266*4c7070dbSScott Long *pr = NULL; 267*4c7070dbSScott Long flags &= M_NOWAIT | M_WAITOK; 268*4c7070dbSScott Long MPASS(flags != 0); 269*4c7070dbSScott Long 270*4c7070dbSScott Long r = malloc(__offsetof(struct ifmp_ring, items[size]), mt, flags | M_ZERO); 271*4c7070dbSScott Long if (r == NULL) 272*4c7070dbSScott Long return (ENOMEM); 273*4c7070dbSScott Long r->size = size; 274*4c7070dbSScott Long r->cookie = cookie; 275*4c7070dbSScott Long r->mt = mt; 276*4c7070dbSScott Long r->drain = drain; 277*4c7070dbSScott Long r->can_drain = can_drain; 278*4c7070dbSScott Long r->enqueues = counter_u64_alloc(flags); 279*4c7070dbSScott Long r->drops = counter_u64_alloc(flags); 280*4c7070dbSScott Long r->starts = counter_u64_alloc(flags); 281*4c7070dbSScott Long r->stalls = counter_u64_alloc(flags); 282*4c7070dbSScott Long r->restarts = counter_u64_alloc(flags); 283*4c7070dbSScott Long r->abdications = counter_u64_alloc(flags); 284*4c7070dbSScott Long if (r->enqueues == NULL || r->drops == NULL || r->starts == NULL || 285*4c7070dbSScott Long r->stalls == NULL || r->restarts == NULL || 286*4c7070dbSScott Long r->abdications == NULL) { 287*4c7070dbSScott Long ifmp_ring_free(r); 288*4c7070dbSScott Long return 
(ENOMEM); 289*4c7070dbSScott Long } 290*4c7070dbSScott Long 291*4c7070dbSScott Long *pr = r; 292*4c7070dbSScott Long #ifdef NO_64BIT_ATOMICS 293*4c7070dbSScott Long mtx_init(&r->lock, "mp_ring lock", NULL, MTX_DEF); 294*4c7070dbSScott Long #endif 295*4c7070dbSScott Long return (0); 296*4c7070dbSScott Long } 297*4c7070dbSScott Long 298*4c7070dbSScott Long void 299*4c7070dbSScott Long ifmp_ring_free(struct ifmp_ring *r) 300*4c7070dbSScott Long { 301*4c7070dbSScott Long 302*4c7070dbSScott Long if (r == NULL) 303*4c7070dbSScott Long return; 304*4c7070dbSScott Long 305*4c7070dbSScott Long if (r->enqueues != NULL) 306*4c7070dbSScott Long counter_u64_free(r->enqueues); 307*4c7070dbSScott Long if (r->drops != NULL) 308*4c7070dbSScott Long counter_u64_free(r->drops); 309*4c7070dbSScott Long if (r->starts != NULL) 310*4c7070dbSScott Long counter_u64_free(r->starts); 311*4c7070dbSScott Long if (r->stalls != NULL) 312*4c7070dbSScott Long counter_u64_free(r->stalls); 313*4c7070dbSScott Long if (r->restarts != NULL) 314*4c7070dbSScott Long counter_u64_free(r->restarts); 315*4c7070dbSScott Long if (r->abdications != NULL) 316*4c7070dbSScott Long counter_u64_free(r->abdications); 317*4c7070dbSScott Long 318*4c7070dbSScott Long free(r, r->mt); 319*4c7070dbSScott Long } 320*4c7070dbSScott Long 321*4c7070dbSScott Long /* 322*4c7070dbSScott Long * Enqueue n items and maybe drain the ring for some time. 323*4c7070dbSScott Long * 324*4c7070dbSScott Long * Returns an errno. 
325*4c7070dbSScott Long */ 326*4c7070dbSScott Long #ifdef NO_64BIT_ATOMICS 327*4c7070dbSScott Long int 328*4c7070dbSScott Long ifmp_ring_enqueue(struct ifmp_ring *r, void **items, int n, int budget) 329*4c7070dbSScott Long { 330*4c7070dbSScott Long union ring_state os, ns; 331*4c7070dbSScott Long uint16_t pidx_start, pidx_stop; 332*4c7070dbSScott Long int i; 333*4c7070dbSScott Long 334*4c7070dbSScott Long MPASS(items != NULL); 335*4c7070dbSScott Long MPASS(n > 0); 336*4c7070dbSScott Long 337*4c7070dbSScott Long mtx_lock(&r->lock); 338*4c7070dbSScott Long /* 339*4c7070dbSScott Long * Reserve room for the new items. Our reservation, if successful, is 340*4c7070dbSScott Long * from 'pidx_start' to 'pidx_stop'. 341*4c7070dbSScott Long */ 342*4c7070dbSScott Long os.state = r->state; 343*4c7070dbSScott Long if (n >= space_available(r, os)) { 344*4c7070dbSScott Long counter_u64_add(r->drops, n); 345*4c7070dbSScott Long MPASS(os.flags != IDLE); 346*4c7070dbSScott Long if (os.flags == STALLED) 347*4c7070dbSScott Long ifmp_ring_check_drainage(r, 0); 348*4c7070dbSScott Long return (ENOBUFS); 349*4c7070dbSScott Long } 350*4c7070dbSScott Long ns.state = os.state; 351*4c7070dbSScott Long ns.pidx_head = increment_idx(r, os.pidx_head, n); 352*4c7070dbSScott Long r->state = ns.state; 353*4c7070dbSScott Long pidx_start = os.pidx_head; 354*4c7070dbSScott Long pidx_stop = ns.pidx_head; 355*4c7070dbSScott Long 356*4c7070dbSScott Long /* 357*4c7070dbSScott Long * Wait for other producers who got in ahead of us to enqueue their 358*4c7070dbSScott Long * items, one producer at a time. It is our turn when the ring's 359*4c7070dbSScott Long * pidx_tail reaches the begining of our reservation (pidx_start). 
360*4c7070dbSScott Long */ 361*4c7070dbSScott Long while (ns.pidx_tail != pidx_start) { 362*4c7070dbSScott Long cpu_spinwait(); 363*4c7070dbSScott Long ns.state = r->state; 364*4c7070dbSScott Long } 365*4c7070dbSScott Long 366*4c7070dbSScott Long /* Now it is our turn to fill up the area we reserved earlier. */ 367*4c7070dbSScott Long i = pidx_start; 368*4c7070dbSScott Long do { 369*4c7070dbSScott Long r->items[i] = *items++; 370*4c7070dbSScott Long if (__predict_false(++i == r->size)) 371*4c7070dbSScott Long i = 0; 372*4c7070dbSScott Long } while (i != pidx_stop); 373*4c7070dbSScott Long 374*4c7070dbSScott Long /* 375*4c7070dbSScott Long * Update the ring's pidx_tail. The release style atomic guarantees 376*4c7070dbSScott Long * that the items are visible to any thread that sees the updated pidx. 377*4c7070dbSScott Long */ 378*4c7070dbSScott Long os.state = ns.state = r->state; 379*4c7070dbSScott Long ns.pidx_tail = pidx_stop; 380*4c7070dbSScott Long ns.flags = BUSY; 381*4c7070dbSScott Long r->state = ns.state; 382*4c7070dbSScott Long counter_u64_add(r->enqueues, n); 383*4c7070dbSScott Long 384*4c7070dbSScott Long /* 385*4c7070dbSScott Long * Turn into a consumer if some other thread isn't active as a consumer 386*4c7070dbSScott Long * already. 
387*4c7070dbSScott Long */ 388*4c7070dbSScott Long if (os.flags != BUSY) 389*4c7070dbSScott Long drain_ring_locked(r, ns, os.flags, budget); 390*4c7070dbSScott Long 391*4c7070dbSScott Long mtx_unlock(&r->lock); 392*4c7070dbSScott Long return (0); 393*4c7070dbSScott Long } 394*4c7070dbSScott Long 395*4c7070dbSScott Long #else 396*4c7070dbSScott Long int 397*4c7070dbSScott Long ifmp_ring_enqueue(struct ifmp_ring *r, void **items, int n, int budget) 398*4c7070dbSScott Long { 399*4c7070dbSScott Long union ring_state os, ns; 400*4c7070dbSScott Long uint16_t pidx_start, pidx_stop; 401*4c7070dbSScott Long int i; 402*4c7070dbSScott Long 403*4c7070dbSScott Long MPASS(items != NULL); 404*4c7070dbSScott Long MPASS(n > 0); 405*4c7070dbSScott Long 406*4c7070dbSScott Long /* 407*4c7070dbSScott Long * Reserve room for the new items. Our reservation, if successful, is 408*4c7070dbSScott Long * from 'pidx_start' to 'pidx_stop'. 409*4c7070dbSScott Long */ 410*4c7070dbSScott Long for (;;) { 411*4c7070dbSScott Long os.state = r->state; 412*4c7070dbSScott Long if (n >= space_available(r, os)) { 413*4c7070dbSScott Long counter_u64_add(r->drops, n); 414*4c7070dbSScott Long MPASS(os.flags != IDLE); 415*4c7070dbSScott Long if (os.flags == STALLED) 416*4c7070dbSScott Long ifmp_ring_check_drainage(r, 0); 417*4c7070dbSScott Long return (ENOBUFS); 418*4c7070dbSScott Long } 419*4c7070dbSScott Long ns.state = os.state; 420*4c7070dbSScott Long ns.pidx_head = increment_idx(r, os.pidx_head, n); 421*4c7070dbSScott Long critical_enter(); 422*4c7070dbSScott Long if (atomic_cmpset_64(&r->state, os.state, ns.state)) 423*4c7070dbSScott Long break; 424*4c7070dbSScott Long critical_exit(); 425*4c7070dbSScott Long cpu_spinwait(); 426*4c7070dbSScott Long } 427*4c7070dbSScott Long pidx_start = os.pidx_head; 428*4c7070dbSScott Long pidx_stop = ns.pidx_head; 429*4c7070dbSScott Long 430*4c7070dbSScott Long /* 431*4c7070dbSScott Long * Wait for other producers who got in ahead of us to enqueue their 
432*4c7070dbSScott Long * items, one producer at a time. It is our turn when the ring's 433*4c7070dbSScott Long * pidx_tail reaches the begining of our reservation (pidx_start). 434*4c7070dbSScott Long */ 435*4c7070dbSScott Long while (ns.pidx_tail != pidx_start) { 436*4c7070dbSScott Long cpu_spinwait(); 437*4c7070dbSScott Long ns.state = r->state; 438*4c7070dbSScott Long } 439*4c7070dbSScott Long 440*4c7070dbSScott Long /* Now it is our turn to fill up the area we reserved earlier. */ 441*4c7070dbSScott Long i = pidx_start; 442*4c7070dbSScott Long do { 443*4c7070dbSScott Long r->items[i] = *items++; 444*4c7070dbSScott Long if (__predict_false(++i == r->size)) 445*4c7070dbSScott Long i = 0; 446*4c7070dbSScott Long } while (i != pidx_stop); 447*4c7070dbSScott Long 448*4c7070dbSScott Long /* 449*4c7070dbSScott Long * Update the ring's pidx_tail. The release style atomic guarantees 450*4c7070dbSScott Long * that the items are visible to any thread that sees the updated pidx. 451*4c7070dbSScott Long */ 452*4c7070dbSScott Long do { 453*4c7070dbSScott Long os.state = ns.state = r->state; 454*4c7070dbSScott Long ns.pidx_tail = pidx_stop; 455*4c7070dbSScott Long ns.flags = BUSY; 456*4c7070dbSScott Long } while (atomic_cmpset_rel_64(&r->state, os.state, ns.state) == 0); 457*4c7070dbSScott Long critical_exit(); 458*4c7070dbSScott Long counter_u64_add(r->enqueues, n); 459*4c7070dbSScott Long 460*4c7070dbSScott Long /* 461*4c7070dbSScott Long * Turn into a consumer if some other thread isn't active as a consumer 462*4c7070dbSScott Long * already. 
463*4c7070dbSScott Long */ 464*4c7070dbSScott Long if (os.flags != BUSY) 465*4c7070dbSScott Long drain_ring_lockless(r, ns, os.flags, budget); 466*4c7070dbSScott Long 467*4c7070dbSScott Long return (0); 468*4c7070dbSScott Long } 469*4c7070dbSScott Long #endif 470*4c7070dbSScott Long 471*4c7070dbSScott Long void 472*4c7070dbSScott Long ifmp_ring_check_drainage(struct ifmp_ring *r, int budget) 473*4c7070dbSScott Long { 474*4c7070dbSScott Long union ring_state os, ns; 475*4c7070dbSScott Long 476*4c7070dbSScott Long os.state = r->state; 477*4c7070dbSScott Long if (os.flags != STALLED || os.pidx_head != os.pidx_tail || r->can_drain(r) == 0) 478*4c7070dbSScott Long return; 479*4c7070dbSScott Long 480*4c7070dbSScott Long MPASS(os.cidx != os.pidx_tail); /* implied by STALLED */ 481*4c7070dbSScott Long ns.state = os.state; 482*4c7070dbSScott Long ns.flags = BUSY; 483*4c7070dbSScott Long 484*4c7070dbSScott Long 485*4c7070dbSScott Long #ifdef NO_64BIT_ATOMICS 486*4c7070dbSScott Long mtx_lock(&r->lock); 487*4c7070dbSScott Long if (r->state != os.state) { 488*4c7070dbSScott Long mtx_unlock(&r->lock); 489*4c7070dbSScott Long return; 490*4c7070dbSScott Long } 491*4c7070dbSScott Long r->state = ns.state; 492*4c7070dbSScott Long drain_ring_locked(r, ns, os.flags, budget); 493*4c7070dbSScott Long mtx_unlock(&r->lock); 494*4c7070dbSScott Long #else 495*4c7070dbSScott Long /* 496*4c7070dbSScott Long * The acquire style atomic guarantees visibility of items associated 497*4c7070dbSScott Long * with the pidx that we read here. 
498*4c7070dbSScott Long */ 499*4c7070dbSScott Long if (!atomic_cmpset_acq_64(&r->state, os.state, ns.state)) 500*4c7070dbSScott Long return; 501*4c7070dbSScott Long 502*4c7070dbSScott Long 503*4c7070dbSScott Long drain_ring_lockless(r, ns, os.flags, budget); 504*4c7070dbSScott Long #endif 505*4c7070dbSScott Long } 506*4c7070dbSScott Long 507*4c7070dbSScott Long void 508*4c7070dbSScott Long ifmp_ring_reset_stats(struct ifmp_ring *r) 509*4c7070dbSScott Long { 510*4c7070dbSScott Long 511*4c7070dbSScott Long counter_u64_zero(r->enqueues); 512*4c7070dbSScott Long counter_u64_zero(r->drops); 513*4c7070dbSScott Long counter_u64_zero(r->starts); 514*4c7070dbSScott Long counter_u64_zero(r->stalls); 515*4c7070dbSScott Long counter_u64_zero(r->restarts); 516*4c7070dbSScott Long counter_u64_zero(r->abdications); 517*4c7070dbSScott Long } 518*4c7070dbSScott Long 519*4c7070dbSScott Long int 520*4c7070dbSScott Long ifmp_ring_is_idle(struct ifmp_ring *r) 521*4c7070dbSScott Long { 522*4c7070dbSScott Long union ring_state s; 523*4c7070dbSScott Long 524*4c7070dbSScott Long s.state = r->state; 525*4c7070dbSScott Long if (s.pidx_head == s.pidx_tail && s.pidx_tail == s.cidx && 526*4c7070dbSScott Long s.flags == IDLE) 527*4c7070dbSScott Long return (1); 528*4c7070dbSScott Long 529*4c7070dbSScott Long return (0); 530*4c7070dbSScott Long } 531*4c7070dbSScott Long 532*4c7070dbSScott Long int 533*4c7070dbSScott Long ifmp_ring_is_stalled(struct ifmp_ring *r) 534*4c7070dbSScott Long { 535*4c7070dbSScott Long union ring_state s; 536*4c7070dbSScott Long 537*4c7070dbSScott Long s.state = r->state; 538*4c7070dbSScott Long if (s.pidx_head == s.pidx_tail && s.flags == STALLED) 539*4c7070dbSScott Long return (1); 540*4c7070dbSScott Long 541*4c7070dbSScott Long return (0); 542*4c7070dbSScott Long } 543