/*-
 * Copyright (c) 2014 Chelsio Communications, Inc.
 * All rights reserved.
 * Written by: Navdeep Parhar <np@FreeBSD.org>
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
 * SUCH DAMAGE.
 */

#include <sys/cdefs.h>
__FBSDID("$FreeBSD$");

#include <sys/types.h>
#include <sys/param.h>
#include <sys/systm.h>
#include <sys/counter.h>
#include <sys/lock.h>
#include <sys/mutex.h>
#include <sys/malloc.h>
#include <machine/cpu.h>
#include <net/mp_ring.h>

union ring_state {
	struct {
		uint16_t pidx_head;
		uint16_t pidx_tail;
		uint16_t cidx;
		uint16_t flags;
	};
	uint64_t state;
};

enum {
	IDLE = 0,	/* consumer ran to completion, nothing more to do. */
	BUSY,		/* consumer is running already, or will be shortly. */
	STALLED,	/* consumer stopped due to lack of resources. */
	ABDICATED,	/* consumer stopped even though there was work to be
			   done because it wants another thread to take over. */
};

/*
 * Number of free slots in the ring.  One slot is always left empty so that a
 * full ring can be distinguished from an empty one.
 */
static inline uint16_t
space_available(struct ifmp_ring *r, union ring_state s)
{
	uint16_t x = r->size - 1;

	if (s.cidx == s.pidx_head)
		return (x);
	else if (s.cidx > s.pidx_head)
		return (s.cidx - s.pidx_head - 1);
	else
		return (x - s.pidx_head + s.cidx);
}

/* Advance idx by n slots, wrapping around the end of the ring. */
static inline uint16_t
increment_idx(struct ifmp_ring *r, uint16_t idx, uint16_t n)
{
	int x = r->size - idx;

	MPASS(x > 0);
	return (x > n ? idx + n : n - x);
}
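
/*
 * Worked example of the index arithmetic above (illustrative values only,
 * not taken from any particular driver).  With size = 8, pidx_head = 6 and
 * cidx = 3, space_available() returns (8 - 1) - 6 + 3 = 4 free slots.
 * Reserving 3 of them advances the head with increment_idx(r, 6, 3) =
 * 3 - (8 - 6) = 1, i.e. the reservation covers slots 6, 7 and 0.
 */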

/* Consumer is about to update the ring's state to s */
static inline uint16_t
state_to_flags(union ring_state s, int abdicate)
{

	if (s.cidx == s.pidx_tail)
		return (IDLE);
	else if (abdicate && s.pidx_tail != s.pidx_head)
		return (ABDICATED);

	return (BUSY);
}

#ifdef MP_RING_NO_64BIT_ATOMICS
static void
drain_ring_locked(struct ifmp_ring *r, union ring_state os, uint16_t prev, int budget)
{
	union ring_state ns;
	int n, pending, total;
	uint16_t cidx = os.cidx;
	uint16_t pidx = os.pidx_tail;

	MPASS(os.flags == BUSY);
	MPASS(cidx != pidx);

	if (prev == IDLE)
		counter_u64_add(r->starts, 1);
	pending = 0;
	total = 0;

	while (cidx != pidx) {
		/* Items from cidx to pidx are available for consumption. */
		n = r->drain(r, cidx, pidx);
		if (n == 0) {
			os.state = ns.state = r->state;
			ns.cidx = cidx;
			ns.flags = STALLED;
			r->state = ns.state;
			if (prev != STALLED)
				counter_u64_add(r->stalls, 1);
			else if (total > 0) {
				counter_u64_add(r->restarts, 1);
				counter_u64_add(r->stalls, 1);
			}
			break;
		}
		cidx = increment_idx(r, cidx, n);
		pending += n;
		total += n;

		/*
		 * We update the cidx only if we've caught up with the pidx, the
		 * real cidx is getting too far ahead of the one visible to
		 * everyone else, or we have exceeded our budget.
		 */
		if (cidx != pidx && pending < 64 && total < budget)
			continue;

		os.state = ns.state = r->state;
		ns.cidx = cidx;
		ns.flags = state_to_flags(ns, total >= budget);
		r->state = ns.state;

		if (ns.flags == ABDICATED)
			counter_u64_add(r->abdications, 1);
		if (ns.flags != BUSY) {
			/* Wrong loop exit if we're going to stall. */
			MPASS(ns.flags != STALLED);
			if (prev == STALLED) {
				MPASS(total > 0);
				counter_u64_add(r->restarts, 1);
			}
			break;
		}

		/*
		 * The ring lock, held for the duration of this function,
		 * guarantees visibility of items associated with any pidx
		 * change that we notice here.
		 */
		pidx = ns.pidx_tail;
		pending = 0;
	}
}
#else
/*
 * Caller passes in a state, with a guarantee that there is work to do and that
 * all items up to the pidx_tail in the state are visible.
 */
static void
drain_ring_lockless(struct ifmp_ring *r, union ring_state os, uint16_t prev, int budget)
{
	union ring_state ns;
	int n, pending, total;
	uint16_t cidx = os.cidx;
	uint16_t pidx = os.pidx_tail;

	MPASS(os.flags == BUSY);
	MPASS(cidx != pidx);

	if (prev == IDLE)
		counter_u64_add(r->starts, 1);
	pending = 0;
	total = 0;

	while (cidx != pidx) {
		/* Items from cidx to pidx are available for consumption. */
		n = r->drain(r, cidx, pidx);
		if (n == 0) {
			critical_enter();
			os.state = r->state;
			do {
				ns.state = os.state;
				ns.cidx = cidx;
				ns.flags = STALLED;
			} while (atomic_fcmpset_64(&r->state, &os.state,
			    ns.state) == 0);
			critical_exit();
			if (prev != STALLED)
				counter_u64_add(r->stalls, 1);
			else if (total > 0) {
				counter_u64_add(r->restarts, 1);
				counter_u64_add(r->stalls, 1);
			}
			break;
		}
		cidx = increment_idx(r, cidx, n);
		pending += n;
		total += n;

		/*
		 * We update the cidx only if we've caught up with the pidx, the
		 * real cidx is getting too far ahead of the one visible to
		 * everyone else, or we have exceeded our budget.
		 */
		if (cidx != pidx && pending < 64 && total < budget)
			continue;
		critical_enter();
		os.state = r->state;
		do {
			ns.state = os.state;
			ns.cidx = cidx;
			ns.flags = state_to_flags(ns, total >= budget);
		} while (atomic_fcmpset_acq_64(&r->state, &os.state,
		    ns.state) == 0);
		critical_exit();

		if (ns.flags == ABDICATED)
			counter_u64_add(r->abdications, 1);
		if (ns.flags != BUSY) {
			/* Wrong loop exit if we're going to stall. */
			MPASS(ns.flags != STALLED);
			if (prev == STALLED) {
				MPASS(total > 0);
				counter_u64_add(r->restarts, 1);
			}
			break;
		}

		/*
		 * The acquire style atomic above guarantees visibility of items
		 * associated with any pidx change that we notice here.
		 */
		pidx = ns.pidx_tail;
		pending = 0;
	}
}
#endif

int
ifmp_ring_alloc(struct ifmp_ring **pr, int size, void *cookie, mp_ring_drain_t drain,
    mp_ring_can_drain_t can_drain, struct malloc_type *mt, int flags)
{
	struct ifmp_ring *r;

	/* All idx are 16b so size can be 65536 at most */
	if (pr == NULL || size < 2 || size > 65536 || drain == NULL ||
	    can_drain == NULL)
		return (EINVAL);
	*pr = NULL;
	flags &= M_NOWAIT | M_WAITOK;
	MPASS(flags != 0);

	r = malloc(__offsetof(struct ifmp_ring, items[size]), mt, flags | M_ZERO);
	if (r == NULL)
		return (ENOMEM);
	r->size = size;
	r->cookie = cookie;
	r->mt = mt;
	r->drain = drain;
	r->can_drain = can_drain;
	r->enqueues = counter_u64_alloc(flags);
	r->drops = counter_u64_alloc(flags);
	r->starts = counter_u64_alloc(flags);
	r->stalls = counter_u64_alloc(flags);
	r->restarts = counter_u64_alloc(flags);
	r->abdications = counter_u64_alloc(flags);
	if (r->enqueues == NULL || r->drops == NULL || r->starts == NULL ||
	    r->stalls == NULL || r->restarts == NULL ||
	    r->abdications == NULL) {
		ifmp_ring_free(r);
		return (ENOMEM);
	}

	*pr = r;
#ifdef MP_RING_NO_64BIT_ATOMICS
	mtx_init(&r->lock, "mp_ring lock", NULL, MTX_DEF);
#endif
	return (0);
}

void
ifmp_ring_free(struct ifmp_ring *r)
{

	if (r == NULL)
		return;

	if (r->enqueues != NULL)
		counter_u64_free(r->enqueues);
	if (r->drops != NULL)
		counter_u64_free(r->drops);
	if (r->starts != NULL)
		counter_u64_free(r->starts);
	if (r->stalls != NULL)
		counter_u64_free(r->stalls);
	if (r->restarts != NULL)
		counter_u64_free(r->restarts);
	if (r->abdications != NULL)
		counter_u64_free(r->abdications);

	free(r, r->mt);
}

/*
 * Enqueue n items and maybe drain the ring for some time.
 *
 * Returns an errno.
 */
#ifdef MP_RING_NO_64BIT_ATOMICS
int
ifmp_ring_enqueue(struct ifmp_ring *r, void **items, int n, int budget, int abdicate)
{
	union ring_state os, ns;
	uint16_t pidx_start, pidx_stop;
	int i;

	MPASS(items != NULL);
	MPASS(n > 0);

	mtx_lock(&r->lock);
	/*
	 * Reserve room for the new items.  Our reservation, if successful, is
	 * from 'pidx_start' to 'pidx_stop'.
	 */
	os.state = r->state;
	if (n >= space_available(r, os)) {
		counter_u64_add(r->drops, n);
		MPASS(os.flags != IDLE);
		mtx_unlock(&r->lock);
		if (os.flags == STALLED)
			ifmp_ring_check_drainage(r, 0);
		return (ENOBUFS);
	}
	ns.state = os.state;
	ns.pidx_head = increment_idx(r, os.pidx_head, n);
	r->state = ns.state;
	pidx_start = os.pidx_head;
	pidx_stop = ns.pidx_head;

	/*
	 * Wait for other producers who got in ahead of us to enqueue their
	 * items, one producer at a time.  It is our turn when the ring's
	 * pidx_tail reaches the beginning of our reservation (pidx_start).
	 */
	while (ns.pidx_tail != pidx_start) {
		cpu_spinwait();
		ns.state = r->state;
	}

	/* Now it is our turn to fill up the area we reserved earlier. */
	i = pidx_start;
	do {
		r->items[i] = *items++;
		if (__predict_false(++i == r->size))
			i = 0;
	} while (i != pidx_stop);

	/*
	 * Update the ring's pidx_tail.  The ring lock guarantees that the
	 * items are visible to any thread that sees the updated pidx.
	 */
	os.state = ns.state = r->state;
	ns.pidx_tail = pidx_stop;
	if (abdicate) {
		if (os.flags == IDLE)
			ns.flags = ABDICATED;
	} else
		ns.flags = BUSY;
	r->state = ns.state;
	counter_u64_add(r->enqueues, n);

	if (!abdicate) {
		/*
		 * Turn into a consumer if some other thread isn't active as a
		 * consumer already.
		 */
		if (os.flags != BUSY)
			drain_ring_locked(r, ns, os.flags, budget);
	}

	mtx_unlock(&r->lock);
	return (0);
}
#else
int
ifmp_ring_enqueue(struct ifmp_ring *r, void **items, int n, int budget, int abdicate)
{
	union ring_state os, ns;
	uint16_t pidx_start, pidx_stop;
	int i;

	MPASS(items != NULL);
	MPASS(n > 0);

	/*
	 * Reserve room for the new items.  Our reservation, if successful, is
	 * from 'pidx_start' to 'pidx_stop'.
	 */
	os.state = r->state;
	for (;;) {
		if (n >= space_available(r, os)) {
			counter_u64_add(r->drops, n);
			MPASS(os.flags != IDLE);
			if (os.flags == STALLED)
				ifmp_ring_check_drainage(r, 0);
			return (ENOBUFS);
		}
		ns.state = os.state;
		ns.pidx_head = increment_idx(r, os.pidx_head, n);
		critical_enter();
		if (atomic_fcmpset_64(&r->state, &os.state, ns.state))
			break;
		critical_exit();
		cpu_spinwait();
	}
	/*
	 * We remain in a critical section from the successful reservation
	 * above until pidx_tail is published below, so that producers
	 * spinning for their turn behind us are not held up by preemption
	 * of this thread.
	 */
	pidx_start = os.pidx_head;
	pidx_stop = ns.pidx_head;

	/*
	 * Wait for other producers who got in ahead of us to enqueue their
	 * items, one producer at a time.  It is our turn when the ring's
	 * pidx_tail reaches the beginning of our reservation (pidx_start).
	 */
	while (ns.pidx_tail != pidx_start) {
		cpu_spinwait();
		ns.state = r->state;
	}

	/* Now it is our turn to fill up the area we reserved earlier. */
	i = pidx_start;
	do {
		r->items[i] = *items++;
		if (__predict_false(++i == r->size))
			i = 0;
	} while (i != pidx_stop);

	/*
	 * Update the ring's pidx_tail.  The release style atomic guarantees
	 * that the items are visible to any thread that sees the updated pidx.
	 */
	os.state = r->state;
	do {
		ns.state = os.state;
		ns.pidx_tail = pidx_stop;
		if (abdicate) {
			if (os.flags == IDLE)
				ns.flags = ABDICATED;
		} else
			ns.flags = BUSY;
	} while (atomic_fcmpset_rel_64(&r->state, &os.state, ns.state) == 0);
	critical_exit();
	counter_u64_add(r->enqueues, n);

	if (!abdicate) {
		/*
		 * Turn into a consumer if some other thread isn't active as a
		 * consumer already.
		 */
		if (os.flags != BUSY)
			drain_ring_lockless(r, ns, os.flags, budget);
	}

	return (0);
}
#endif

void
ifmp_ring_check_drainage(struct ifmp_ring *r, int budget)
{
	union ring_state os, ns;

	os.state = r->state;
	if ((os.flags != STALLED && os.flags != ABDICATED) ||	// Only continue in STALLED and ABDICATED
	    os.pidx_head != os.pidx_tail ||			// Require work to be available
	    (os.flags != ABDICATED && r->can_drain(r) == 0))	// Can either drain, or everyone left
		return;

	MPASS(os.cidx != os.pidx_tail);	/* implied by STALLED */
	ns.state = os.state;
	ns.flags = BUSY;

#ifdef MP_RING_NO_64BIT_ATOMICS
	mtx_lock(&r->lock);
	if (r->state != os.state) {
		mtx_unlock(&r->lock);
		return;
	}
	r->state = ns.state;
	drain_ring_locked(r, ns, os.flags, budget);
	mtx_unlock(&r->lock);
#else
	/*
	 * The acquire style atomic guarantees visibility of items associated
	 * with the pidx that we read here.
	 */
	if (!atomic_cmpset_acq_64(&r->state, os.state, ns.state))
		return;

	drain_ring_lockless(r, ns, os.flags, budget);
#endif
}

void
ifmp_ring_reset_stats(struct ifmp_ring *r)
{

	counter_u64_zero(r->enqueues);
	counter_u64_zero(r->drops);
	counter_u64_zero(r->starts);
	counter_u64_zero(r->stalls);
	counter_u64_zero(r->restarts);
	counter_u64_zero(r->abdications);
}

int
ifmp_ring_is_idle(struct ifmp_ring *r)
{
	union ring_state s;

	s.state = r->state;
	if (s.pidx_head == s.pidx_tail && s.pidx_tail == s.cidx &&
	    s.flags == IDLE)
		return (1);

	return (0);
}

int
ifmp_ring_is_stalled(struct ifmp_ring *r)
{
	union ring_state s;

	s.state = r->state;
	if (s.pidx_head == s.pidx_tail && s.flags == STALLED)
		return (1);

	return (0);
}
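
/*
 * Minimal usage sketch, kept under #if 0 so it is never compiled.  All names
 * below (example_softc, example_drain, example_can_drain, example_hw_send,
 * example_hw_can_send, M_EXAMPLE, sc->txr) are hypothetical placeholders for
 * whatever a real consumer, e.g. a NIC transmit path, would provide.  The
 * drain callback is handed the half-open range [cidx, pidx) of r->items and
 * returns how many items it consumed starting at cidx; returning 0 marks the
 * ring STALLED until ifmp_ring_check_drainage() is called.
 */
#if 0
static u_int
example_drain(struct ifmp_ring *r, u_int cidx, u_int pidx)
{
	struct example_softc *sc = r->cookie;
	u_int n = 0;

	while (cidx != pidx) {
		if (!example_hw_can_send(sc))
			break;			/* out of hardware resources */
		example_hw_send(sc, r->items[cidx]);
		if (++cidx == r->size)
			cidx = 0;
		n++;
	}
	return (n);				/* 0 => ring becomes STALLED */
}

static u_int
example_can_drain(struct ifmp_ring *r)
{

	return (example_hw_can_send(r->cookie));
}

static int
example_attach(struct example_softc *sc)
{
	int rc;

	rc = ifmp_ring_alloc(&sc->txr, 1024, sc, example_drain,
	    example_can_drain, M_EXAMPLE, M_WAITOK);
	if (rc != 0)
		return (rc);

	/*
	 * Producers may then enqueue from any context, e.g.:
	 * ifmp_ring_enqueue(sc->txr, &item, 1, 32, 0);
	 */
	return (0);
}
#endif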