/*-
 * Copyright (c) 2014 Chelsio Communications, Inc.
 * All rights reserved.
 * Written by: Navdeep Parhar <np@FreeBSD.org>
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
 * SUCH DAMAGE.
 */

#include <sys/cdefs.h>
#include <sys/types.h>
#include <sys/param.h>
#include <sys/systm.h>
#include <sys/counter.h>
#include <sys/lock.h>
#include <sys/mutex.h>
#include <sys/malloc.h>
#include <machine/cpu.h>
#include <net/mp_ring.h>

union ring_state {
	struct {
		uint16_t pidx_head;
		uint16_t pidx_tail;
		uint16_t cidx;
		uint16_t flags;
	};
	uint64_t state;
};

enum {
	IDLE = 0,	/* consumer ran to completion, nothing more to do. */
	BUSY,		/* consumer is running already, or will be shortly. */
	STALLED,	/* consumer stopped due to lack of resources. */
	ABDICATED,	/* consumer stopped even though there was work to be
			   done because it wants another thread to take over. */
};

static inline uint16_t
space_available(struct ifmp_ring *r, union ring_state s)
{
	uint16_t x = r->size - 1;

	if (s.cidx == s.pidx_head)
		return (x);
	else if (s.cidx > s.pidx_head)
		return (s.cidx - s.pidx_head - 1);
	else
		return (x - s.pidx_head + s.cidx);
}

static inline uint16_t
increment_idx(struct ifmp_ring *r, uint16_t idx, uint16_t n)
{
	int x = r->size - idx;

	MPASS(x > 0);
	return (x > n ? idx + n : n - x);
}
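
/*
 * For example, in a ring of size 8 with pidx_head == 2 and cidx == 6, slots
 * 6, 7, 0 and 1 hold items awaiting consumption and slots 2..5 are free, but
 * space_available() returns 3: one slot is always left unused so that a full
 * ring can be told apart from an empty one.  Likewise, increment_idx(r, 6, 3)
 * wraps around the end of the ring and yields 1.
 */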

/* Consumer is about to update the ring's state to s */
static inline uint16_t
state_to_flags(union ring_state s, int abdicate)
{

	if (s.cidx == s.pidx_tail)
		return (IDLE);
	else if (abdicate && s.pidx_tail != s.pidx_head)
		return (ABDICATED);

	return (BUSY);
}

#ifdef MP_RING_NO_64BIT_ATOMICS
static void
drain_ring_locked(struct ifmp_ring *r, union ring_state os, uint16_t prev, int budget)
{
	union ring_state ns;
	int n, pending, total;
	uint16_t cidx = os.cidx;
	uint16_t pidx = os.pidx_tail;

	MPASS(os.flags == BUSY);
	MPASS(cidx != pidx);

	if (prev == IDLE)
		counter_u64_add(r->starts, 1);
	pending = 0;
	total = 0;

	while (cidx != pidx) {
		/* Items from cidx to pidx are available for consumption. */
		n = r->drain(r, cidx, pidx);
		if (n == 0) {
			os.state = ns.state = r->state;
			ns.cidx = cidx;
			ns.flags = STALLED;
			r->state = ns.state;
			if (prev != STALLED)
				counter_u64_add(r->stalls, 1);
			else if (total > 0) {
				counter_u64_add(r->restarts, 1);
				counter_u64_add(r->stalls, 1);
			}
			break;
		}
		cidx = increment_idx(r, cidx, n);
		pending += n;
		total += n;

		/*
		 * We update the cidx only if we've caught up with the pidx, the
		 * real cidx is getting too far ahead of the one visible to
		 * everyone else, or we have exceeded our budget.
		 */
		if (cidx != pidx && pending < 64 && total < budget)
			continue;

		os.state = ns.state = r->state;
		ns.cidx = cidx;
		ns.flags = state_to_flags(ns, total >= budget);
		r->state = ns.state;

		if (ns.flags == ABDICATED)
			counter_u64_add(r->abdications, 1);
		if (ns.flags != BUSY) {
			/* Wrong loop exit if we're going to stall. */
			MPASS(ns.flags != STALLED);
			if (prev == STALLED) {
				MPASS(total > 0);
				counter_u64_add(r->restarts, 1);
			}
			break;
		}

		/*
		 * The ring lock guarantees visibility of items associated
		 * with any pidx change that we notice here.
		 */
		pidx = ns.pidx_tail;
		pending = 0;
	}
}
#else
/*
 * Caller passes in a state, with a guarantee that there is work to do and that
 * all items up to the pidx_tail in the state are visible.
 */
static void
drain_ring_lockless(struct ifmp_ring *r, union ring_state os, uint16_t prev, int budget)
{
	union ring_state ns;
	int n, pending, total;
	uint16_t cidx = os.cidx;
	uint16_t pidx = os.pidx_tail;

	MPASS(os.flags == BUSY);
	MPASS(cidx != pidx);

	if (prev == IDLE)
		counter_u64_add(r->starts, 1);
	pending = 0;
	total = 0;

	while (cidx != pidx) {
		/* Items from cidx to pidx are available for consumption. */
		n = r->drain(r, cidx, pidx);
		if (n == 0) {
			critical_enter();
			os.state = r->state;
			do {
				ns.state = os.state;
				ns.cidx = cidx;
				ns.flags = STALLED;
			} while (atomic_fcmpset_64(&r->state, &os.state,
			    ns.state) == 0);
			critical_exit();
			if (prev != STALLED)
				counter_u64_add(r->stalls, 1);
			else if (total > 0) {
				counter_u64_add(r->restarts, 1);
				counter_u64_add(r->stalls, 1);
			}
			break;
		}
		cidx = increment_idx(r, cidx, n);
		pending += n;
		total += n;

		/*
		 * We update the cidx only if we've caught up with the pidx, the
		 * real cidx is getting too far ahead of the one visible to
		 * everyone else, or we have exceeded our budget.
		 */
		if (cidx != pidx && pending < 64 && total < budget)
			continue;
		critical_enter();
		os.state = r->state;
		do {
			ns.state = os.state;
			ns.cidx = cidx;
			ns.flags = state_to_flags(ns, total >= budget);
		} while (atomic_fcmpset_acq_64(&r->state, &os.state,
		    ns.state) == 0);
		critical_exit();

		if (ns.flags == ABDICATED)
			counter_u64_add(r->abdications, 1);
		if (ns.flags != BUSY) {
			/* Wrong loop exit if we're going to stall. */
			MPASS(ns.flags != STALLED);
			if (prev == STALLED) {
				MPASS(total > 0);
				counter_u64_add(r->restarts, 1);
			}
			break;
		}

		/*
		 * The acquire style atomic above guarantees visibility of items
		 * associated with any pidx change that we notice here.
		 */
		pidx = ns.pidx_tail;
		pending = 0;
	}
}
#endif
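
/*
 * Typical usage, sketched with hypothetical driver names ("mydrv_*", "sc");
 * the callback shapes are inferred from how r->drain and r->can_drain are
 * invoked in this file and are assumed to match the mp_ring_drain_t and
 * mp_ring_can_drain_t typedefs in net/mp_ring.h:
 *
 *	static u_int
 *	mydrv_drain(struct ifmp_ring *r, u_int cidx, u_int pidx)
 *	{
 *		... hand the items in slots cidx up to (but not including)
 *		    pidx, wrapping at r->size, to the hardware and return the
 *		    number actually taken; returning 0 stalls the ring ...
 *	}
 *
 *	static u_int
 *	mydrv_can_drain(struct ifmp_ring *r)
 *	{
 *		... return non-zero iff the hardware can accept more items ...
 *	}
 *
 *	error = ifmp_ring_alloc(&sc->tx_ring, 1024, sc, mydrv_drain,
 *	    mydrv_can_drain, M_DEVBUF, M_WAITOK);
 *	...
 *	error = ifmp_ring_enqueue(sc->tx_ring, items, nitems, budget, 0);
 *	...
 *	ifmp_ring_check_drainage(sc->tx_ring, budget);
 *	...
 *	ifmp_ring_free(sc->tx_ring);
 */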

int
ifmp_ring_alloc(struct ifmp_ring **pr, int size, void *cookie, mp_ring_drain_t drain,
    mp_ring_can_drain_t can_drain, struct malloc_type *mt, int flags)
{
	struct ifmp_ring *r;

	/* All idx are 16b so size can be 65536 at most */
	if (pr == NULL || size < 2 || size > 65536 || drain == NULL ||
	    can_drain == NULL)
		return (EINVAL);
	*pr = NULL;
	flags &= M_NOWAIT | M_WAITOK;
	MPASS(flags != 0);

	r = malloc(__offsetof(struct ifmp_ring, items[size]), mt, flags | M_ZERO);
	if (r == NULL)
		return (ENOMEM);
	r->size = size;
	r->cookie = cookie;
	r->mt = mt;
	r->drain = drain;
	r->can_drain = can_drain;
	r->enqueues = counter_u64_alloc(flags);
	r->drops = counter_u64_alloc(flags);
	r->starts = counter_u64_alloc(flags);
	r->stalls = counter_u64_alloc(flags);
	r->restarts = counter_u64_alloc(flags);
	r->abdications = counter_u64_alloc(flags);
	if (r->enqueues == NULL || r->drops == NULL || r->starts == NULL ||
	    r->stalls == NULL || r->restarts == NULL ||
	    r->abdications == NULL) {
		ifmp_ring_free(r);
		return (ENOMEM);
	}

	*pr = r;
#ifdef MP_RING_NO_64BIT_ATOMICS
	mtx_init(&r->lock, "mp_ring lock", NULL, MTX_DEF);
#endif
	return (0);
}

void
ifmp_ring_free(struct ifmp_ring *r)
{

	if (r == NULL)
		return;

	if (r->enqueues != NULL)
		counter_u64_free(r->enqueues);
	if (r->drops != NULL)
		counter_u64_free(r->drops);
	if (r->starts != NULL)
		counter_u64_free(r->starts);
	if (r->stalls != NULL)
		counter_u64_free(r->stalls);
	if (r->restarts != NULL)
		counter_u64_free(r->restarts);
	if (r->abdications != NULL)
		counter_u64_free(r->abdications);

	free(r, r->mt);
}
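
/*
 * The enqueue path below works in two steps: a producer first reserves a
 * range of slots by advancing pidx_head, then fills those slots and publishes
 * them by advancing pidx_tail.  For example, on an empty ring of size 8,
 * producer A enqueuing 3 items reserves slots 0-2 (pidx_head 0 -> 3) and a
 * concurrent producer B enqueuing 2 items reserves slots 3-4 (pidx_head
 * 3 -> 5); B must then spin until A has published pidx_tail == 3 before it
 * may publish pidx_tail == 5, so items always become visible to the consumer
 * in reservation order.
 */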

/*
 * Enqueue n items and maybe drain the ring for some time.
 *
 * Returns an errno.
 */
#ifdef MP_RING_NO_64BIT_ATOMICS
int
ifmp_ring_enqueue(struct ifmp_ring *r, void **items, int n, int budget, int abdicate)
{
	union ring_state os, ns;
	uint16_t pidx_start, pidx_stop;
	int i;

	MPASS(items != NULL);
	MPASS(n > 0);

	mtx_lock(&r->lock);
	/*
	 * Reserve room for the new items.  Our reservation, if successful, is
	 * from 'pidx_start' to 'pidx_stop'.
	 */
	os.state = r->state;
	if (n >= space_available(r, os)) {
		counter_u64_add(r->drops, n);
		MPASS(os.flags != IDLE);
		mtx_unlock(&r->lock);
		if (os.flags == STALLED)
			ifmp_ring_check_drainage(r, 0);
		return (ENOBUFS);
	}
	ns.state = os.state;
	ns.pidx_head = increment_idx(r, os.pidx_head, n);
	r->state = ns.state;
	pidx_start = os.pidx_head;
	pidx_stop = ns.pidx_head;

	/*
	 * Wait for other producers who got in ahead of us to enqueue their
	 * items, one producer at a time.  It is our turn when the ring's
	 * pidx_tail reaches the beginning of our reservation (pidx_start).
	 */
	while (ns.pidx_tail != pidx_start) {
		cpu_spinwait();
		ns.state = r->state;
	}

	/* Now it is our turn to fill up the area we reserved earlier. */
	i = pidx_start;
	do {
		r->items[i] = *items++;
		if (__predict_false(++i == r->size))
			i = 0;
	} while (i != pidx_stop);

	/*
	 * Update the ring's pidx_tail.  The ring lock guarantees that the
	 * items are visible to any thread that sees the updated pidx.
	 */
	os.state = ns.state = r->state;
	ns.pidx_tail = pidx_stop;
	if (abdicate) {
		if (os.flags == IDLE)
			ns.flags = ABDICATED;
	} else
		ns.flags = BUSY;
	r->state = ns.state;
	counter_u64_add(r->enqueues, n);

	if (!abdicate) {
		/*
		 * Turn into a consumer if some other thread isn't active as a
		 * consumer already.
		 */
		if (os.flags != BUSY)
			drain_ring_locked(r, ns, os.flags, budget);
	}

	mtx_unlock(&r->lock);
	return (0);
}
#else
int
ifmp_ring_enqueue(struct ifmp_ring *r, void **items, int n, int budget, int abdicate)
{
	union ring_state os, ns;
	uint16_t pidx_start, pidx_stop;
	int i;

	MPASS(items != NULL);
	MPASS(n > 0);

	/*
	 * Reserve room for the new items.  Our reservation, if successful, is
	 * from 'pidx_start' to 'pidx_stop'.
	 */
	os.state = r->state;
	for (;;) {
		if (n >= space_available(r, os)) {
			counter_u64_add(r->drops, n);
			MPASS(os.flags != IDLE);
			if (os.flags == STALLED)
				ifmp_ring_check_drainage(r, 0);
			return (ENOBUFS);
		}
		ns.state = os.state;
		ns.pidx_head = increment_idx(r, os.pidx_head, n);
		critical_enter();
		if (atomic_fcmpset_64(&r->state, &os.state, ns.state))
			break;
		critical_exit();
		cpu_spinwait();
	}
	pidx_start = os.pidx_head;
	pidx_stop = ns.pidx_head;

	/*
	 * Wait for other producers who got in ahead of us to enqueue their
	 * items, one producer at a time.  It is our turn when the ring's
	 * pidx_tail reaches the beginning of our reservation (pidx_start).
	 */
	while (ns.pidx_tail != pidx_start) {
		cpu_spinwait();
		ns.state = r->state;
	}

	/* Now it is our turn to fill up the area we reserved earlier. */
	i = pidx_start;
	do {
		r->items[i] = *items++;
		if (__predict_false(++i == r->size))
			i = 0;
	} while (i != pidx_stop);

	/*
	 * Update the ring's pidx_tail.  The release style atomic guarantees
	 * that the items are visible to any thread that sees the updated pidx.
	 */
	os.state = r->state;
	do {
		ns.state = os.state;
		ns.pidx_tail = pidx_stop;
		if (abdicate) {
			if (os.flags == IDLE)
				ns.flags = ABDICATED;
		} else
			ns.flags = BUSY;
	} while (atomic_fcmpset_rel_64(&r->state, &os.state, ns.state) == 0);
	critical_exit();
	counter_u64_add(r->enqueues, n);

	if (!abdicate) {
		/*
		 * Turn into a consumer if some other thread isn't active as a
		 * consumer already.
		 */
		if (os.flags != BUSY)
			drain_ring_lockless(r, ns, os.flags, budget);
	}

	return (0);
}
#endif
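
/*
 * A note on the function below: it is the mechanism by which a ring that was
 * left STALLED (the drain callback returned 0) or ABDICATED (the previous
 * consumer handed the work off) is restarted.  It only proceeds when no
 * producer reservation is outstanding (pidx_head == pidx_tail) and, for the
 * STALLED case, when can_drain() reports that draining can make progress.
 * A typical caller would be a completion path that has just freed whatever
 * resource caused the stall.
 */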

void
ifmp_ring_check_drainage(struct ifmp_ring *r, int budget)
{
	union ring_state os, ns;

	os.state = r->state;
	if ((os.flags != STALLED && os.flags != ABDICATED) ||	// Only continue in STALLED and ABDICATED
	    os.pidx_head != os.pidx_tail ||			// Require work to be available
	    (os.flags != ABDICATED && r->can_drain(r) == 0))	// Can either drain, or everyone left
		return;

	MPASS(os.cidx != os.pidx_tail);	/* implied by STALLED */
	ns.state = os.state;
	ns.flags = BUSY;

#ifdef MP_RING_NO_64BIT_ATOMICS
	mtx_lock(&r->lock);
	if (r->state != os.state) {
		mtx_unlock(&r->lock);
		return;
	}
	r->state = ns.state;
	drain_ring_locked(r, ns, os.flags, budget);
	mtx_unlock(&r->lock);
#else
	/*
	 * The acquire style atomic guarantees visibility of items associated
	 * with the pidx that we read here.
	 */
	if (!atomic_cmpset_acq_64(&r->state, os.state, ns.state))
		return;

	drain_ring_lockless(r, ns, os.flags, budget);
#endif
}

void
ifmp_ring_reset_stats(struct ifmp_ring *r)
{

	counter_u64_zero(r->enqueues);
	counter_u64_zero(r->drops);
	counter_u64_zero(r->starts);
	counter_u64_zero(r->stalls);
	counter_u64_zero(r->restarts);
	counter_u64_zero(r->abdications);
}

int
ifmp_ring_is_idle(struct ifmp_ring *r)
{
	union ring_state s;

	s.state = r->state;
	if (s.pidx_head == s.pidx_tail && s.pidx_tail == s.cidx &&
	    s.flags == IDLE)
		return (1);

	return (0);
}

int
ifmp_ring_is_stalled(struct ifmp_ring *r)
{
	union ring_state s;

	s.state = r->state;
	if (s.pidx_head == s.pidx_tail && s.flags == STALLED)
		return (1);

	return (0);
}