/*-
 * Copyright (c) 2014 Chelsio Communications, Inc.
 * All rights reserved.
 * Written by: Navdeep Parhar <np@FreeBSD.org>
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
 * SUCH DAMAGE.
 */

#include <sys/cdefs.h>
#include <sys/types.h>
#include <sys/param.h>
#include <sys/systm.h>
#include <sys/counter.h>
#include <sys/lock.h>
#include <sys/malloc.h>
#include <sys/mutex.h>
#include <sys/sysctl.h>
#include <machine/cpu.h>

#include "t4_mp_ring.h"

#if defined(__i386__)
#define atomic_cmpset_acq_64 atomic_cmpset_64
#define atomic_cmpset_rel_64 atomic_cmpset_64
#endif

/*
 * mp_ring handles multiple threads (producers) enqueueing data to a tx queue.
 * The thread that is writing the hardware descriptors is the consumer and it
 * runs with the consumer lock held.  A producer becomes the consumer if there
 * isn't one already.  The consumer runs with the flags set to BUSY and
 * consumes everything (IDLE or COALESCING) or gets STALLED.  If it is running
 * over its budget it sets flags to TOO_BUSY.  A producer that observes a
 * TOO_BUSY consumer will become the new consumer by setting flags to
 * TAKING_OVER.  The original consumer stops and sets the flags back to BUSY
 * for the new consumer.
 *
 * COALESCING is the same as IDLE except there are items being held in the
 * hope that they can be coalesced with items that follow.  The driver must
 * arrange for a tx update or some other event that transmits all the held
 * items in a timely manner if nothing else is enqueued.
 */
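
/*
 * Illustrative sketch only (not compiled): roughly how a driver's transmit
 * path is expected to drive this ring.  The struct example_softc /
 * example_txq layout, the example_* callback names, and the ring size and
 * budget values below are hypothetical; the real wiring lives elsewhere in
 * the driver.
 */
#if 0
static int
example_txq_init(struct example_softc *sc, struct example_txq *txq)
{
	/*
	 * example_drain/example_can_drain are hypothetical callbacks that
	 * write hardware descriptors and report whether progress is possible.
	 */
	return (mp_ring_alloc(&txq->ring, 1024, txq, example_drain,
	    example_can_drain, M_DEVBUF, &txq->lock, M_WAITOK));
}

static int
example_transmit(struct example_txq *txq, struct mbuf *m)
{
	void *items[1] = { m };

	/*
	 * Any thread may enqueue.  Whichever producer finds the ring idle
	 * (or finds the current consumer over budget) becomes the consumer
	 * inside mp_ring_enqueue and drains within its budget.
	 */
	return (mp_ring_enqueue(txq->ring, items, 1, 64));
}

static void
example_egress_update(struct example_txq *txq)
{
	/*
	 * Kick a STALLED or COALESCING ring when tx resources free up or
	 * when frames held for coalescing must be pushed out.
	 */
	mp_ring_check_drainage(txq->ring, 64);
}
#endif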

union ring_state {
	struct {
		uint16_t pidx_head;
		uint16_t pidx_tail;
		uint16_t cidx;
		uint16_t flags;
	};
	uint64_t state;
};

enum {
	IDLE = 0,	/* tx is all caught up, nothing to do. */
	COALESCING,	/* IDLE, but tx frames are being held for coalescing */
	BUSY,		/* consumer is running already, or will be shortly. */
	TOO_BUSY,	/* consumer is running and is beyond its budget */
	TAKING_OVER,	/* new consumer taking over from a TOO_BUSY consumer */
	STALLED,	/* consumer stopped due to lack of resources. */
};

enum {
	C_FAST = 0,
	C_2,
	C_3,
	C_TAKEOVER,
};

static inline uint16_t
space_available(struct mp_ring *r, union ring_state s)
{
	uint16_t x = r->size - 1;

	if (s.cidx == s.pidx_head)
		return (x);
	else if (s.cidx > s.pidx_head)
		return (s.cidx - s.pidx_head - 1);
	else
		return (x - s.pidx_head + s.cidx);
}

static inline uint16_t
increment_idx(struct mp_ring *r, uint16_t idx, uint16_t n)
{
	int x = r->size - idx;

	MPASS(x > 0);
	return (x > n ? idx + n : n - x);
}
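
/*
 * Worked example of the index arithmetic above, assuming a hypothetical ring
 * of size 8.  One slot is always left unused so that a full ring and an empty
 * ring are distinguishable, hence at most 7 items:
 *
 *   cidx 5, pidx_head 2: items occupy slots 5,6,7,0,1, so
 *	space_available() = 5 - 2 - 1 = 2.
 *   cidx 2, pidx_head 5: items occupy slots 2,3,4, so
 *	space_available() = (8 - 1) - 5 + 2 = 4.
 *   increment_idx(r, 6, 3): 8 - 6 = 2 <= 3, so the index wraps to 3 - 2 = 1.
 */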

/*
 * Consumer.  Called with the consumer lock held and a guarantee that there is
 * work to do.
 */
static void
drain_ring(struct mp_ring *r, int budget)
{
	union ring_state os, ns;
	int n, pending, total;
	uint16_t cidx;
	uint16_t pidx;
	bool coalescing;

	mtx_assert(r->cons_lock, MA_OWNED);

	os.state = atomic_load_acq_64(&r->state);
	MPASS(os.flags == BUSY);

	cidx = os.cidx;
	pidx = os.pidx_tail;
	MPASS(cidx != pidx);

	pending = 0;
	total = 0;

	while (cidx != pidx) {

		/* Items from cidx to pidx are available for consumption. */
		n = r->drain(r, cidx, pidx, &coalescing);
		if (n == 0) {
			critical_enter();
			os.state = atomic_load_64(&r->state);
			do {
				ns.state = os.state;
				ns.cidx = cidx;

				MPASS(os.flags == BUSY ||
				    os.flags == TOO_BUSY ||
				    os.flags == TAKING_OVER);

				if (os.flags == TAKING_OVER)
					ns.flags = BUSY;
				else
					ns.flags = STALLED;
			} while (atomic_fcmpset_64(&r->state, &os.state,
			    ns.state) == 0);
			critical_exit();
			if (os.flags == TAKING_OVER)
				counter_u64_add(r->abdications, 1);
			else if (ns.flags == STALLED)
				counter_u64_add(r->stalls, 1);
			break;
		}
		cidx = increment_idx(r, cidx, n);
		pending += n;
		total += n;
		counter_u64_add(r->consumed, n);

		os.state = atomic_load_64(&r->state);
		do {
			MPASS(os.flags == BUSY || os.flags == TOO_BUSY ||
			    os.flags == TAKING_OVER);

			ns.state = os.state;
			ns.cidx = cidx;
			if (__predict_false(os.flags == TAKING_OVER)) {
				MPASS(total >= budget);
				ns.flags = BUSY;
				continue;
			}
			if (cidx == os.pidx_tail) {
				ns.flags = coalescing ? COALESCING : IDLE;
				continue;
			}
			if (total >= budget) {
				ns.flags = TOO_BUSY;
				continue;
			}
			MPASS(os.flags == BUSY);
			if (pending < 32)
				break;
		} while (atomic_fcmpset_acq_64(&r->state, &os.state,
		    ns.state) == 0);

		if (__predict_false(os.flags == TAKING_OVER)) {
			MPASS(ns.flags == BUSY);
			counter_u64_add(r->abdications, 1);
			break;
		}

		if (ns.flags == IDLE || ns.flags == COALESCING) {
			MPASS(ns.pidx_tail == cidx);
			if (ns.pidx_head != ns.pidx_tail)
				counter_u64_add(r->cons_idle2, 1);
			else
				counter_u64_add(r->cons_idle, 1);
			break;
		}

		/*
		 * The acquire style atomic above guarantees visibility of
		 * items associated with any pidx change that we notice here.
		 */
		pidx = ns.pidx_tail;
		pending = 0;
	}

#ifdef INVARIANTS
	if (os.flags == TAKING_OVER)
		MPASS(ns.flags == BUSY);
	else {
		MPASS(ns.flags == IDLE || ns.flags == COALESCING ||
		    ns.flags == STALLED);
	}
#endif
}

static void
drain_txpkts(struct mp_ring *r, union ring_state os, int budget)
{
	union ring_state ns;
	uint16_t cidx = os.cidx;
	uint16_t pidx = os.pidx_tail;
	bool coalescing;

	mtx_assert(r->cons_lock, MA_OWNED);
	MPASS(os.flags == BUSY);
	MPASS(cidx == pidx);

	r->drain(r, cidx, pidx, &coalescing);
	MPASS(coalescing == false);
	critical_enter();
	os.state = atomic_load_64(&r->state);
	do {
		ns.state = os.state;
		MPASS(os.flags == BUSY);
		MPASS(os.cidx == cidx);
		if (ns.cidx == ns.pidx_tail)
			ns.flags = IDLE;
		else
			ns.flags = BUSY;
	} while (atomic_fcmpset_acq_64(&r->state, &os.state, ns.state) == 0);
	critical_exit();

	if (ns.flags == BUSY)
		drain_ring(r, budget);
}
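
/*
 * Sketch of the drain callback contract that drain_ring() and drain_txpkts()
 * rely on (illustration only, not compiled).  The real ring_drain_t and
 * ring_can_drain_t typedefs live in t4_mp_ring.h; the prototypes, return
 * types, and example_hw_* helpers below are assumptions made for this sketch.
 * The callback may consume any prefix of items[cidx..pidx) and returns how
 * many it took; returning 0 means it made no progress and the ring will be
 * marked STALLED (unless a takeover is in progress).  It may hold frames back
 * for coalescing and report that via *coalescing; a real callback also
 * transmits any held frames when called with cidx == pidx (from
 * drain_txpkts()), which this sketch omits.
 */
#if 0
static u_int
example_drain(struct mp_ring *r, u_int cidx, u_int pidx, bool *coalescing)
{
	u_int n = 0;

	*coalescing = false;
	while (cidx != pidx) {
		if (!example_hw_has_room(r->cookie))
			break;		/* 0 so far means STALLED; >0 is a partial drain */
		example_hw_send(r->cookie, r->items[cidx]);
		if (++cidx == r->size)
			cidx = 0;
		n++;
	}
	return (n);
}

static bool
example_can_drain(struct mp_ring *r)
{
	/* Called for a STALLED ring; report whether a drain can make progress. */
	return (example_hw_has_room(r->cookie));
}
#endif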
343 */ 344 int 345 mp_ring_enqueue(struct mp_ring *r, void **items, int n, int budget) 346 { 347 union ring_state os, ns; 348 uint16_t pidx_start, pidx_stop; 349 int i, nospc, cons; 350 bool consumer; 351 352 MPASS(items != NULL); 353 MPASS(n > 0); 354 355 /* 356 * Reserve room for the new items. Our reservation, if successful, is 357 * from 'pidx_start' to 'pidx_stop'. 358 */ 359 nospc = 0; 360 os.state = atomic_load_64(&r->state); 361 for (;;) { 362 for (;;) { 363 if (__predict_true(space_available(r, os) >= n)) 364 break; 365 366 /* Not enough room in the ring. */ 367 368 MPASS(os.flags != IDLE); 369 MPASS(os.flags != COALESCING); 370 if (__predict_false(++nospc > 100)) { 371 counter_u64_add(r->dropped, n); 372 return (ENOBUFS); 373 } 374 if (os.flags == STALLED) 375 mp_ring_check_drainage(r, 64); 376 else 377 cpu_spinwait(); 378 os.state = atomic_load_64(&r->state); 379 } 380 381 /* There is room in the ring. */ 382 383 cons = -1; 384 ns.state = os.state; 385 ns.pidx_head = increment_idx(r, os.pidx_head, n); 386 if (os.flags == IDLE || os.flags == COALESCING) { 387 MPASS(os.pidx_tail == os.cidx); 388 if (os.pidx_head == os.pidx_tail) { 389 cons = C_FAST; 390 ns.pidx_tail = increment_idx(r, os.pidx_tail, n); 391 } else 392 cons = C_2; 393 ns.flags = BUSY; 394 } else if (os.flags == TOO_BUSY) { 395 cons = C_TAKEOVER; 396 ns.flags = TAKING_OVER; 397 } 398 critical_enter(); 399 if (atomic_fcmpset_64(&r->state, &os.state, ns.state)) 400 break; 401 critical_exit(); 402 cpu_spinwait(); 403 }; 404 405 pidx_start = os.pidx_head; 406 pidx_stop = ns.pidx_head; 407 408 if (cons == C_FAST) { 409 i = pidx_start; 410 do { 411 r->items[i] = *items++; 412 if (__predict_false(++i == r->size)) 413 i = 0; 414 } while (i != pidx_stop); 415 critical_exit(); 416 counter_u64_add(r->consumer[C_FAST], 1); 417 mtx_lock(r->cons_lock); 418 drain_ring(r, budget); 419 mtx_unlock(r->cons_lock); 420 return (0); 421 } 422 423 /* 424 * Wait for other producers who got in ahead of us to enqueue their 425 * items, one producer at a time. It is our turn when the ring's 426 * pidx_tail reaches the beginning of our reservation (pidx_start). 427 */ 428 while (ns.pidx_tail != pidx_start) { 429 cpu_spinwait(); 430 ns.state = atomic_load_64(&r->state); 431 } 432 433 /* Now it is our turn to fill up the area we reserved earlier. */ 434 i = pidx_start; 435 do { 436 r->items[i] = *items++; 437 if (__predict_false(++i == r->size)) 438 i = 0; 439 } while (i != pidx_stop); 440 441 /* 442 * Update the ring's pidx_tail. The release style atomic guarantees 443 * that the items are visible to any thread that sees the updated pidx. 
444 */ 445 os.state = atomic_load_64(&r->state); 446 do { 447 consumer = false; 448 ns.state = os.state; 449 ns.pidx_tail = pidx_stop; 450 if (os.flags == IDLE || os.flags == COALESCING || 451 (os.flags == STALLED && r->can_drain(r))) { 452 MPASS(cons == -1); 453 consumer = true; 454 ns.flags = BUSY; 455 } 456 } while (atomic_fcmpset_rel_64(&r->state, &os.state, ns.state) == 0); 457 critical_exit(); 458 459 if (cons == -1) { 460 if (consumer) 461 cons = C_3; 462 else { 463 counter_u64_add(r->not_consumer, 1); 464 return (0); 465 } 466 } 467 MPASS(cons > C_FAST && cons < nitems(r->consumer)); 468 counter_u64_add(r->consumer[cons], 1); 469 mtx_lock(r->cons_lock); 470 drain_ring(r, budget); 471 mtx_unlock(r->cons_lock); 472 473 return (0); 474 } 475 476 void 477 mp_ring_check_drainage(struct mp_ring *r, int budget) 478 { 479 union ring_state os, ns; 480 481 os.state = atomic_load_64(&r->state); 482 if (os.flags == STALLED && r->can_drain(r)) { 483 MPASS(os.cidx != os.pidx_tail); /* implied by STALLED */ 484 ns.state = os.state; 485 ns.flags = BUSY; 486 if (atomic_cmpset_acq_64(&r->state, os.state, ns.state)) { 487 mtx_lock(r->cons_lock); 488 drain_ring(r, budget); 489 mtx_unlock(r->cons_lock); 490 } 491 } else if (os.flags == COALESCING) { 492 MPASS(os.cidx == os.pidx_tail); 493 ns.state = os.state; 494 ns.flags = BUSY; 495 if (atomic_cmpset_acq_64(&r->state, os.state, ns.state)) { 496 mtx_lock(r->cons_lock); 497 drain_txpkts(r, ns, budget); 498 mtx_unlock(r->cons_lock); 499 } 500 } 501 } 502 503 void 504 mp_ring_reset_stats(struct mp_ring *r) 505 { 506 int i; 507 508 counter_u64_zero(r->dropped); 509 for (i = 0; i < nitems(r->consumer); i++) 510 counter_u64_zero(r->consumer[i]); 511 counter_u64_zero(r->not_consumer); 512 counter_u64_zero(r->abdications); 513 counter_u64_zero(r->stalls); 514 counter_u64_zero(r->consumed); 515 counter_u64_zero(r->cons_idle); 516 counter_u64_zero(r->cons_idle2); 517 } 518 519 bool 520 mp_ring_is_idle(struct mp_ring *r) 521 { 522 union ring_state s; 523 524 s.state = atomic_load_64(&r->state); 525 if (s.pidx_head == s.pidx_tail && s.pidx_tail == s.cidx && 526 s.flags == IDLE) 527 return (true); 528 529 return (false); 530 } 531 532 void 533 mp_ring_sysctls(struct mp_ring *r, struct sysctl_ctx_list *ctx, 534 struct sysctl_oid_list *children) 535 { 536 struct sysctl_oid *oid; 537 538 oid = SYSCTL_ADD_NODE(ctx, children, OID_AUTO, "mp_ring", CTLFLAG_RD | 539 CTLFLAG_MPSAFE, NULL, "mp_ring statistics"); 540 children = SYSCTL_CHILDREN(oid); 541 542 SYSCTL_ADD_U64(ctx, children, OID_AUTO, "state", CTLFLAG_RD, 543 __DEVOLATILE(uint64_t *, &r->state), 0, "ring state"); 544 SYSCTL_ADD_COUNTER_U64(ctx, children, OID_AUTO, "dropped", CTLFLAG_RD, 545 &r->dropped, "# of items dropped"); 546 SYSCTL_ADD_COUNTER_U64(ctx, children, OID_AUTO, "consumed", 547 CTLFLAG_RD, &r->consumed, "# of items consumed"); 548 SYSCTL_ADD_COUNTER_U64(ctx, children, OID_AUTO, "fast_consumer", 549 CTLFLAG_RD, &r->consumer[C_FAST], 550 "# of times producer became consumer (fast)"); 551 SYSCTL_ADD_COUNTER_U64(ctx, children, OID_AUTO, "consumer2", 552 CTLFLAG_RD, &r->consumer[C_2], 553 "# of times producer became consumer (2)"); 554 SYSCTL_ADD_COUNTER_U64(ctx, children, OID_AUTO, "consumer3", 555 CTLFLAG_RD, &r->consumer[C_3], 556 "# of times producer became consumer (3)"); 557 SYSCTL_ADD_COUNTER_U64(ctx, children, OID_AUTO, "takeovers", 558 CTLFLAG_RD, &r->consumer[C_TAKEOVER], 559 "# of times producer took over from another consumer."); 560 SYSCTL_ADD_COUNTER_U64(ctx, children, OID_AUTO, 
"not_consumer", 561 CTLFLAG_RD, &r->not_consumer, 562 "# of times producer did not become consumer"); 563 SYSCTL_ADD_COUNTER_U64(ctx, children, OID_AUTO, "abdications", 564 CTLFLAG_RD, &r->abdications, "# of consumer abdications"); 565 SYSCTL_ADD_COUNTER_U64(ctx, children, OID_AUTO, "stalls", 566 CTLFLAG_RD, &r->stalls, "# of consumer stalls"); 567 SYSCTL_ADD_COUNTER_U64(ctx, children, OID_AUTO, "cons_idle", 568 CTLFLAG_RD, &r->cons_idle, 569 "# of times consumer ran fully to completion"); 570 SYSCTL_ADD_COUNTER_U64(ctx, children, OID_AUTO, "cons_idle2", 571 CTLFLAG_RD, &r->cons_idle2, 572 "# of times consumer idled when another enqueue was in progress"); 573 } 574