/*-
 * Copyright (c) 2014 Chelsio Communications, Inc.
 * All rights reserved.
 * Written by: Navdeep Parhar <np@FreeBSD.org>
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
 * SUCH DAMAGE.
 */

#include <sys/cdefs.h>
__FBSDID("$FreeBSD$");

#include <sys/types.h>
#include <sys/param.h>
#include <sys/systm.h>
#include <sys/counter.h>
#include <sys/lock.h>
#include <sys/malloc.h>
#include <sys/mutex.h>
#include <sys/sysctl.h>
#include <machine/cpu.h>

#include "t4_mp_ring.h"

#if defined(__i386__)
#define atomic_cmpset_acq_64 atomic_cmpset_64
#define atomic_cmpset_rel_64 atomic_cmpset_64
#define atomic_fcmpset_acq_64 atomic_fcmpset_64
#define atomic_fcmpset_rel_64 atomic_fcmpset_64
#endif

/*
 * mp_ring handles multiple threads (producers) enqueueing data to a tx queue.
 * The thread that is writing the hardware descriptors is the consumer and it
 * runs with the consumer lock held.  A producer becomes the consumer if there
 * isn't one already.  The consumer runs with the flags set to BUSY and
 * consumes everything (IDLE or COALESCING) or gets STALLED.  If it runs over
 * its budget it sets the flags to TOO_BUSY.  A producer that observes a
 * TOO_BUSY consumer will become the new consumer by setting the flags to
 * TAKING_OVER.  The original consumer stops and sets the flags back to BUSY
 * for the new consumer.
 *
 * COALESCING is the same as IDLE except there are items being held in the
 * hope that they can be coalesced with items that follow.  The driver must
 * arrange for a tx update or some other event that transmits all the held
 * items in a timely manner if nothing else is enqueued.
 */

union ring_state {
	struct {
		uint16_t pidx_head;
		uint16_t pidx_tail;
		uint16_t cidx;
		uint16_t flags;
	};
	uint64_t state;
};

enum {
	IDLE = 0,	/* tx is all caught up, nothing to do. */
	COALESCING,	/* IDLE, but tx frames are being held for coalescing */
	BUSY,		/* consumer is running already, or will be shortly. */
	TOO_BUSY,	/* consumer is running and is beyond its budget */
	TAKING_OVER,	/* new consumer taking over from a TOO_BUSY consumer */
	STALLED,	/* consumer stopped due to lack of resources. */
};

enum {
	C_FAST = 0,
	C_2,
	C_3,
	C_TAKEOVER,
};
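
/*
 * Illustrative sketch (compiled out, not part of the driver): the whole ring
 * is coordinated through the single 64-bit 'state' word above, and every flag
 * transition described in the comment block is a compare-and-swap loop of the
 * following shape.  The hypothetical caller below claims the consumer role
 * when it finds the ring IDLE.
 */
#if 0
static void
example_claim_consumer(struct mp_ring *r)
{
	union ring_state os, ns;

	os.state = atomic_load_64(&r->state);
	do {
		if (os.flags != IDLE)
			return;		/* someone else is (or will be) consuming */
		ns.state = os.state;
		ns.flags = BUSY;	/* claim the consumer role */
	} while (atomic_fcmpset_64(&r->state, &os.state, ns.state) == 0);
	/* This thread is now the consumer and must eventually drain the ring. */
}
#endif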
static inline uint16_t
space_available(struct mp_ring *r, union ring_state s)
{
	uint16_t x = r->size - 1;

	if (s.cidx == s.pidx_head)
		return (x);
	else if (s.cidx > s.pidx_head)
		return (s.cidx - s.pidx_head - 1);
	else
		return (x - s.pidx_head + s.cidx);
}

static inline uint16_t
increment_idx(struct mp_ring *r, uint16_t idx, uint16_t n)
{
	int x = r->size - idx;

	MPASS(x > 0);
	return (x > n ? idx + n : n - x);
}

/*
 * Consumer.  Called with the consumer lock held and a guarantee that there is
 * work to do.
 */
static void
drain_ring(struct mp_ring *r, int budget)
{
	union ring_state os, ns;
	int n, pending, total;
	uint16_t cidx;
	uint16_t pidx;
	bool coalescing;

	mtx_assert(r->cons_lock, MA_OWNED);

	os.state = atomic_load_acq_64(&r->state);
	MPASS(os.flags == BUSY);

	cidx = os.cidx;
	pidx = os.pidx_tail;
	MPASS(cidx != pidx);

	pending = 0;
	total = 0;

	while (cidx != pidx) {

		/* Items from cidx to pidx are available for consumption. */
		n = r->drain(r, cidx, pidx, &coalescing);
		if (n == 0) {
			critical_enter();
			os.state = atomic_load_64(&r->state);
			do {
				ns.state = os.state;
				ns.cidx = cidx;

				MPASS(os.flags == BUSY ||
				    os.flags == TOO_BUSY ||
				    os.flags == TAKING_OVER);

				if (os.flags == TAKING_OVER)
					ns.flags = BUSY;
				else
					ns.flags = STALLED;
			} while (atomic_fcmpset_64(&r->state, &os.state,
			    ns.state) == 0);
			critical_exit();
			if (os.flags == TAKING_OVER)
				counter_u64_add(r->abdications, 1);
			else if (ns.flags == STALLED)
				counter_u64_add(r->stalls, 1);
			break;
		}
		cidx = increment_idx(r, cidx, n);
		pending += n;
		total += n;
		counter_u64_add(r->consumed, n);

		os.state = atomic_load_64(&r->state);
		do {
			MPASS(os.flags == BUSY || os.flags == TOO_BUSY ||
			    os.flags == TAKING_OVER);

			ns.state = os.state;
			ns.cidx = cidx;
			if (__predict_false(os.flags == TAKING_OVER)) {
				MPASS(total >= budget);
				ns.flags = BUSY;
				continue;
			}
			if (cidx == os.pidx_tail) {
				ns.flags = coalescing ? COALESCING : IDLE;
				continue;
			}
			if (total >= budget) {
				ns.flags = TOO_BUSY;
				continue;
			}
			MPASS(os.flags == BUSY);

			/*
			 * Don't bother publishing the updated cidx (and the
			 * ring space it frees up) until at least 32 items
			 * have accumulated since the last update.
			 */
			if (pending < 32)
				break;
		} while (atomic_fcmpset_acq_64(&r->state, &os.state, ns.state) == 0);

		if (__predict_false(os.flags == TAKING_OVER)) {
			MPASS(ns.flags == BUSY);
			counter_u64_add(r->abdications, 1);
			break;
		}

		if (ns.flags == IDLE || ns.flags == COALESCING) {
			MPASS(ns.pidx_tail == cidx);
			if (ns.pidx_head != ns.pidx_tail)
				counter_u64_add(r->cons_idle2, 1);
			else
				counter_u64_add(r->cons_idle, 1);
			break;
		}

		/*
		 * The acquire style atomic above guarantees visibility of
		 * items associated with any pidx change that we notice here.
		 */
		pidx = ns.pidx_tail;
		pending = 0;
	}

#ifdef INVARIANTS
	if (os.flags == TAKING_OVER)
		MPASS(ns.flags == BUSY);
	else {
		MPASS(ns.flags == IDLE || ns.flags == COALESCING ||
		    ns.flags == STALLED);
	}
#endif
}
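
/*
 * Hedged sketch (compiled out) of the drain callback the driver hands to
 * mp_ring_alloc() and that drain_ring() above invokes as
 * r->drain(r, cidx, pidx, &coalescing).  The exact typedef lives in
 * t4_mp_ring.h; the types below and the names my_drain/my_softc/my_tx_one are
 * assumptions for illustration only.  Returning 0 signals "out of resources"
 * and moves the ring to STALLED; setting *coalescing would mean items are
 * being held back in the hope of coalescing them with later ones.
 */
#if 0
static u_int
my_drain(struct mp_ring *r, u_int cidx, u_int pidx, bool *coalescing)
{
	struct my_softc *sc = r->cookie;	/* cookie given to mp_ring_alloc() */
	u_int n = 0;

	/* Consume items[cidx .. pidx) in ring order, wrapping at r->size. */
	while (cidx != pidx) {
		my_tx_one(sc, r->items[cidx]);
		if (++cidx == r->size)
			cidx = 0;
		n++;
	}
	*coalescing = false;
	return (n);
}
#endif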
static void
drain_txpkts(struct mp_ring *r, union ring_state os, int budget)
{
	union ring_state ns;
	uint16_t cidx = os.cidx;
	uint16_t pidx = os.pidx_tail;
	bool coalescing;

	mtx_assert(r->cons_lock, MA_OWNED);
	MPASS(os.flags == BUSY);
	MPASS(cidx == pidx);

	r->drain(r, cidx, pidx, &coalescing);
	MPASS(coalescing == false);
	critical_enter();
	os.state = atomic_load_64(&r->state);
	do {
		ns.state = os.state;
		MPASS(os.flags == BUSY);
		MPASS(os.cidx == cidx);
		if (ns.cidx == ns.pidx_tail)
			ns.flags = IDLE;
		else
			ns.flags = BUSY;
	} while (atomic_fcmpset_acq_64(&r->state, &os.state, ns.state) == 0);
	critical_exit();

	if (ns.flags == BUSY)
		drain_ring(r, budget);
}

int
mp_ring_alloc(struct mp_ring **pr, int size, void *cookie, ring_drain_t drain,
    ring_can_drain_t can_drain, struct malloc_type *mt, struct mtx *lck,
    int flags)
{
	struct mp_ring *r;
	int i;

	/* All idx are 16b so size can be 65536 at most */
	if (pr == NULL || size < 2 || size > 65536 || drain == NULL ||
	    can_drain == NULL)
		return (EINVAL);
	*pr = NULL;
	flags &= M_NOWAIT | M_WAITOK;
	MPASS(flags != 0);

	r = malloc(__offsetof(struct mp_ring, items[size]), mt, flags | M_ZERO);
	if (r == NULL)
		return (ENOMEM);
	r->size = size;
	r->cookie = cookie;
	r->mt = mt;
	r->drain = drain;
	r->can_drain = can_drain;
	r->cons_lock = lck;
	if ((r->dropped = counter_u64_alloc(flags)) == NULL)
		goto failed;
	for (i = 0; i < nitems(r->consumer); i++) {
		if ((r->consumer[i] = counter_u64_alloc(flags)) == NULL)
			goto failed;
	}
	if ((r->not_consumer = counter_u64_alloc(flags)) == NULL)
		goto failed;
	if ((r->abdications = counter_u64_alloc(flags)) == NULL)
		goto failed;
	if ((r->stalls = counter_u64_alloc(flags)) == NULL)
		goto failed;
	if ((r->consumed = counter_u64_alloc(flags)) == NULL)
		goto failed;
	if ((r->cons_idle = counter_u64_alloc(flags)) == NULL)
		goto failed;
	if ((r->cons_idle2 = counter_u64_alloc(flags)) == NULL)
		goto failed;
	*pr = r;
	return (0);
failed:
	mp_ring_free(r);
	return (ENOMEM);
}

void
mp_ring_free(struct mp_ring *r)
{
	int i;

	if (r == NULL)
		return;

	if (r->dropped != NULL)
		counter_u64_free(r->dropped);
	for (i = 0; i < nitems(r->consumer); i++) {
		if (r->consumer[i] != NULL)
			counter_u64_free(r->consumer[i]);
	}
	if (r->not_consumer != NULL)
		counter_u64_free(r->not_consumer);
	if (r->abdications != NULL)
		counter_u64_free(r->abdications);
	if (r->stalls != NULL)
		counter_u64_free(r->stalls);
	if (r->consumed != NULL)
		counter_u64_free(r->consumed);
	if (r->cons_idle != NULL)
		counter_u64_free(r->cons_idle);
	if (r->cons_idle2 != NULL)
		counter_u64_free(r->cons_idle2);

	free(r, r->mt);
}
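
/*
 * Hedged usage sketch (compiled out): how a driver might create and destroy a
 * ring with the functions above.  my_softc, my_drain, my_can_drain and
 * M_MYDRV are hypothetical; only the argument order and the 2..65536 size
 * limit come from mp_ring_alloc() itself.
 */
#if 0
static int
my_setup_tx_ring(struct my_softc *sc)
{
	/* 2048-entry ring; the counters are allocated too, so this can sleep. */
	return (mp_ring_alloc(&sc->txr, 2048, sc, my_drain, my_can_drain,
	    M_MYDRV, &sc->tx_lock, M_WAITOK));
}

static void
my_free_tx_ring(struct my_softc *sc)
{
	/* NULL-safe, and releases the counters along with the ring. */
	mp_ring_free(sc->txr);
}
#endif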
/*
 * Enqueue n items and maybe drain the ring for some time.
 *
 * Returns an errno.
 */
int
mp_ring_enqueue(struct mp_ring *r, void **items, int n, int budget)
{
	union ring_state os, ns;
	uint16_t pidx_start, pidx_stop;
	int i, nospc, cons;
	bool consumer;

	MPASS(items != NULL);
	MPASS(n > 0);

	/*
	 * Reserve room for the new items.  Our reservation, if successful, is
	 * from 'pidx_start' to 'pidx_stop'.
	 */
	nospc = 0;
	os.state = atomic_load_64(&r->state);
	for (;;) {
		for (;;) {
			if (__predict_true(space_available(r, os) >= n))
				break;

			/* Not enough room in the ring. */

			MPASS(os.flags != IDLE);
			MPASS(os.flags != COALESCING);
			if (__predict_false(++nospc > 100)) {
				counter_u64_add(r->dropped, n);
				return (ENOBUFS);
			}
			if (os.flags == STALLED)
				mp_ring_check_drainage(r, 64);
			else
				cpu_spinwait();
			os.state = atomic_load_64(&r->state);
		}

		/* There is room in the ring. */

		cons = -1;
		ns.state = os.state;
		ns.pidx_head = increment_idx(r, os.pidx_head, n);
		if (os.flags == IDLE || os.flags == COALESCING) {
			MPASS(os.pidx_tail == os.cidx);
			if (os.pidx_head == os.pidx_tail) {
				cons = C_FAST;
				ns.pidx_tail = increment_idx(r, os.pidx_tail, n);
			} else
				cons = C_2;
			ns.flags = BUSY;
		} else if (os.flags == TOO_BUSY) {
			cons = C_TAKEOVER;
			ns.flags = TAKING_OVER;
		}
		critical_enter();
		if (atomic_fcmpset_64(&r->state, &os.state, ns.state))
			break;
		critical_exit();
		cpu_spinwait();
	}

	pidx_start = os.pidx_head;
	pidx_stop = ns.pidx_head;

	if (cons == C_FAST) {
		i = pidx_start;
		do {
			r->items[i] = *items++;
			if (__predict_false(++i == r->size))
				i = 0;
		} while (i != pidx_stop);
		critical_exit();
		counter_u64_add(r->consumer[C_FAST], 1);
		mtx_lock(r->cons_lock);
		drain_ring(r, budget);
		mtx_unlock(r->cons_lock);
		return (0);
	}

	/*
	 * Wait for other producers who got in ahead of us to enqueue their
	 * items, one producer at a time.  It is our turn when the ring's
	 * pidx_tail reaches the beginning of our reservation (pidx_start).
	 */
	while (ns.pidx_tail != pidx_start) {
		cpu_spinwait();
		ns.state = atomic_load_64(&r->state);
	}

	/* Now it is our turn to fill up the area we reserved earlier. */
	i = pidx_start;
	do {
		r->items[i] = *items++;
		if (__predict_false(++i == r->size))
			i = 0;
	} while (i != pidx_stop);

	/*
	 * Update the ring's pidx_tail.  The release style atomic guarantees
	 * that the items are visible to any thread that sees the updated pidx.
	 */
	os.state = atomic_load_64(&r->state);
	do {
		consumer = false;
		ns.state = os.state;
		ns.pidx_tail = pidx_stop;
		if (os.flags == IDLE || os.flags == COALESCING ||
		    (os.flags == STALLED && r->can_drain(r))) {
			MPASS(cons == -1);
			consumer = true;
			ns.flags = BUSY;
		}
	} while (atomic_fcmpset_rel_64(&r->state, &os.state, ns.state) == 0);
	critical_exit();

	if (cons == -1) {
		if (consumer)
			cons = C_3;
		else {
			counter_u64_add(r->not_consumer, 1);
			return (0);
		}
	}
	MPASS(cons > C_FAST && cons < nitems(r->consumer));
	counter_u64_add(r->consumer[cons], 1);
	mtx_lock(r->cons_lock);
	drain_ring(r, budget);
	mtx_unlock(r->cons_lock);

	return (0);
}
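
/*
 * Hedged sketch (compiled out) of a transmit path using mp_ring_enqueue().
 * Items are opaque pointers; an mbuf is used here only as an example, and
 * txr/m/MY_TX_BUDGET are hypothetical.  On ENOBUFS the item was never placed
 * in the ring (it is counted in the 'dropped' counter), so the caller still
 * owns it.
 */
#if 0
static int
my_transmit(struct mp_ring *txr, struct mbuf *m)
{
	int rc;

	rc = mp_ring_enqueue(txr, (void **)&m, 1, MY_TX_BUDGET);
	if (__predict_false(rc == ENOBUFS))
		m_freem(m);	/* ring stayed full despite retries */
	return (rc);
}
#endif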
void
mp_ring_check_drainage(struct mp_ring *r, int budget)
{
	union ring_state os, ns;

	os.state = atomic_load_64(&r->state);
	if (os.flags == STALLED && r->can_drain(r)) {
		MPASS(os.cidx != os.pidx_tail);	/* implied by STALLED */
		ns.state = os.state;
		ns.flags = BUSY;
		if (atomic_cmpset_acq_64(&r->state, os.state, ns.state)) {
			mtx_lock(r->cons_lock);
			drain_ring(r, budget);
			mtx_unlock(r->cons_lock);
		}
	} else if (os.flags == COALESCING) {
		MPASS(os.cidx == os.pidx_tail);
		ns.state = os.state;
		ns.flags = BUSY;
		if (atomic_cmpset_acq_64(&r->state, os.state, ns.state)) {
			mtx_lock(r->cons_lock);
			drain_txpkts(r, ns, budget);
			mtx_unlock(r->cons_lock);
		}
	}
}

void
mp_ring_reset_stats(struct mp_ring *r)
{
	int i;

	counter_u64_zero(r->dropped);
	for (i = 0; i < nitems(r->consumer); i++)
		counter_u64_zero(r->consumer[i]);
	counter_u64_zero(r->not_consumer);
	counter_u64_zero(r->abdications);
	counter_u64_zero(r->stalls);
	counter_u64_zero(r->consumed);
	counter_u64_zero(r->cons_idle);
	counter_u64_zero(r->cons_idle2);
}

bool
mp_ring_is_idle(struct mp_ring *r)
{
	union ring_state s;

	s.state = atomic_load_64(&r->state);
	if (s.pidx_head == s.pidx_tail && s.pidx_tail == s.cidx &&
	    s.flags == IDLE)
		return (true);

	return (false);
}
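
/*
 * Hedged sketch (compiled out): the overview comment at the top requires the
 * driver to arrange for some event (a tx update, a periodic tick, etc.) that
 * flushes held or stalled work.  Something like the following, with
 * hypothetical names, would restart a STALLED ring once resources are back
 * and would also flush frames held for COALESCING.
 */
#if 0
static void
my_tx_update(struct mp_ring *txr)
{
	if (!mp_ring_is_idle(txr))
		mp_ring_check_drainage(txr, MY_TX_BUDGET);
}
#endif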
"not_consumer", 563 CTLFLAG_RD, &r->not_consumer, 564 "# of times producer did not become consumer"); 565 SYSCTL_ADD_COUNTER_U64(ctx, children, OID_AUTO, "abdications", 566 CTLFLAG_RD, &r->abdications, "# of consumer abdications"); 567 SYSCTL_ADD_COUNTER_U64(ctx, children, OID_AUTO, "stalls", 568 CTLFLAG_RD, &r->stalls, "# of consumer stalls"); 569 SYSCTL_ADD_COUNTER_U64(ctx, children, OID_AUTO, "cons_idle", 570 CTLFLAG_RD, &r->cons_idle, 571 "# of times consumer ran fully to completion"); 572 SYSCTL_ADD_COUNTER_U64(ctx, children, OID_AUTO, "cons_idle2", 573 CTLFLAG_RD, &r->cons_idle2, 574 "# of times consumer idled when another enqueue was in progress"); 575 } 576