1 /* 2 * Copyright 2009-2015 Samy Al Bahra. 3 * All rights reserved. 4 * 5 * Redistribution and use in source and binary forms, with or without 6 * modification, are permitted provided that the following conditions 7 * are met: 8 * 1. Redistributions of source code must retain the above copyright 9 * notice, this list of conditions and the following disclaimer. 10 * 2. Redistributions in binary form must reproduce the above copyright 11 * notice, this list of conditions and the following disclaimer in the 12 * documentation and/or other materials provided with the distribution. 13 * 14 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND 15 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 16 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 17 * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE 18 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL 19 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS 20 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) 21 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 22 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY 23 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF 24 * SUCH DAMAGE. 25 */ 26 27 #ifndef CK_RING_H 28 #define CK_RING_H 29 30 #include <ck_cc.h> 31 #include <ck_md.h> 32 #include <ck_pr.h> 33 #include <ck_stdbool.h> 34 #include <ck_string.h> 35 36 /* 37 * Concurrent ring buffer. 38 */ 39 40 struct ck_ring { 41 unsigned int c_head; 42 char pad[CK_MD_CACHELINE - sizeof(unsigned int)]; 43 unsigned int p_tail; 44 unsigned int p_head; 45 char _pad[CK_MD_CACHELINE - sizeof(unsigned int) * 2]; 46 unsigned int size; 47 unsigned int mask; 48 }; 49 typedef struct ck_ring ck_ring_t; 50 51 struct ck_ring_buffer { 52 void *value; 53 }; 54 typedef struct ck_ring_buffer ck_ring_buffer_t; 55 56 CK_CC_INLINE static unsigned int 57 ck_ring_size(const struct ck_ring *ring) 58 { 59 unsigned int c, p; 60 61 c = ck_pr_load_uint(&ring->c_head); 62 p = ck_pr_load_uint(&ring->p_tail); 63 return (p - c) & ring->mask; 64 } 65 66 CK_CC_INLINE static unsigned int 67 ck_ring_capacity(const struct ck_ring *ring) 68 { 69 70 return ring->size; 71 } 72 73 /* 74 * This function is only safe to call when there are no concurrent operations 75 * on the ring. This is primarily meant for persistent ck_ring use-cases. The 76 * function returns true if any mutations were performed on the ring. 77 */ 78 CK_CC_INLINE static bool 79 ck_ring_repair(struct ck_ring *ring) 80 { 81 bool r = false; 82 83 if (ring->p_tail != ring->p_head) { 84 ring->p_tail = ring->p_head; 85 r = true; 86 } 87 88 return r; 89 } 90 91 /* 92 * This can be called when no concurrent updates are occurring on the ring 93 * structure to check for consistency. This is primarily meant to be used for 94 * persistent storage of the ring. If this functions returns false, the ring 95 * is in an inconsistent state. 96 */ 97 CK_CC_INLINE static bool 98 ck_ring_valid(const struct ck_ring *ring) 99 { 100 unsigned int size = ring->size; 101 unsigned int c_head = ring->c_head; 102 unsigned int p_head = ring->p_head; 103 104 /* The ring must be a power of 2. */ 105 if (size & (size - 1)) 106 return false; 107 108 /* The consumer counter must always be smaller than the producer. */ 109 if (c_head > p_head) 110 return false; 111 112 /* The producer may only be up to size slots ahead of consumer. */ 113 if (p_head - c_head >= size) 114 return false; 115 116 return true; 117 } 118 119 CK_CC_INLINE static void 120 ck_ring_init(struct ck_ring *ring, unsigned int size) 121 { 122 123 ring->size = size; 124 ring->mask = size - 1; 125 ring->p_tail = 0; 126 ring->p_head = 0; 127 ring->c_head = 0; 128 return; 129 } 130 131 /* 132 * The _ck_ring_* namespace is internal only and must not used externally. 133 */ 134 135 /* 136 * This function will return a region of memory to write for the next value 137 * for a single producer. 138 */ 139 CK_CC_FORCE_INLINE static void * 140 _ck_ring_enqueue_reserve_sp(struct ck_ring *ring, 141 void *CK_CC_RESTRICT buffer, 142 unsigned int ts, 143 unsigned int *size) 144 { 145 const unsigned int mask = ring->mask; 146 unsigned int consumer, producer, delta; 147 148 consumer = ck_pr_load_uint(&ring->c_head); 149 producer = ring->p_tail; 150 delta = producer + 1; 151 if (size != NULL) 152 *size = (producer - consumer) & mask; 153 154 if (CK_CC_UNLIKELY((delta & mask) == (consumer & mask))) 155 return NULL; 156 157 return (char *)buffer + ts * (producer & mask); 158 } 159 160 /* 161 * This is to be called to commit and make visible a region of previously 162 * reserved with reverse_sp. 163 */ 164 CK_CC_FORCE_INLINE static void 165 _ck_ring_enqueue_commit_sp(struct ck_ring *ring) 166 { 167 168 ck_pr_fence_store(); 169 ck_pr_store_uint(&ring->p_tail, ring->p_tail + 1); 170 return; 171 } 172 173 CK_CC_FORCE_INLINE static bool 174 _ck_ring_enqueue_sp(struct ck_ring *ring, 175 void *CK_CC_RESTRICT buffer, 176 const void *CK_CC_RESTRICT entry, 177 unsigned int ts, 178 unsigned int *size) 179 { 180 const unsigned int mask = ring->mask; 181 unsigned int consumer, producer, delta; 182 183 consumer = ck_pr_load_uint(&ring->c_head); 184 producer = ring->p_tail; 185 delta = producer + 1; 186 if (size != NULL) 187 *size = (producer - consumer) & mask; 188 189 if (CK_CC_UNLIKELY((delta & mask) == (consumer & mask))) 190 return false; 191 192 buffer = (char *)buffer + ts * (producer & mask); 193 memcpy(buffer, entry, ts); 194 195 /* 196 * Make sure to update slot value before indicating 197 * that the slot is available for consumption. 198 */ 199 ck_pr_fence_store(); 200 ck_pr_store_uint(&ring->p_tail, delta); 201 return true; 202 } 203 204 CK_CC_FORCE_INLINE static bool 205 _ck_ring_enqueue_sp_size(struct ck_ring *ring, 206 void *CK_CC_RESTRICT buffer, 207 const void *CK_CC_RESTRICT entry, 208 unsigned int ts, 209 unsigned int *size) 210 { 211 unsigned int sz; 212 bool r; 213 214 r = _ck_ring_enqueue_sp(ring, buffer, entry, ts, &sz); 215 *size = sz; 216 return r; 217 } 218 219 CK_CC_FORCE_INLINE static bool 220 _ck_ring_dequeue_sc(struct ck_ring *ring, 221 const void *CK_CC_RESTRICT buffer, 222 void *CK_CC_RESTRICT target, 223 unsigned int size) 224 { 225 const unsigned int mask = ring->mask; 226 unsigned int consumer, producer; 227 228 consumer = ring->c_head; 229 producer = ck_pr_load_uint(&ring->p_tail); 230 231 if (CK_CC_UNLIKELY(consumer == producer)) 232 return false; 233 234 /* 235 * Make sure to serialize with respect to our snapshot 236 * of the producer counter. 237 */ 238 ck_pr_fence_load(); 239 240 buffer = (const char *)buffer + size * (consumer & mask); 241 memcpy(target, buffer, size); 242 243 /* 244 * Make sure copy is completed with respect to consumer 245 * update. 246 */ 247 ck_pr_fence_store(); 248 ck_pr_store_uint(&ring->c_head, consumer + 1); 249 return true; 250 } 251 252 CK_CC_FORCE_INLINE static void * 253 _ck_ring_enqueue_reserve_mp(struct ck_ring *ring, 254 void *buffer, 255 unsigned int ts, 256 unsigned int *ticket, 257 unsigned int *size) 258 { 259 const unsigned int mask = ring->mask; 260 unsigned int producer, consumer, delta; 261 262 producer = ck_pr_load_uint(&ring->p_head); 263 264 for (;;) { 265 ck_pr_fence_load(); 266 consumer = ck_pr_load_uint(&ring->c_head); 267 268 delta = producer + 1; 269 270 if (CK_CC_LIKELY((producer - consumer) < mask)) { 271 if (ck_pr_cas_uint_value(&ring->p_head, 272 producer, delta, &producer) == true) { 273 break; 274 } 275 } else { 276 unsigned int new_producer; 277 278 ck_pr_fence_load(); 279 new_producer = ck_pr_load_uint(&ring->p_head); 280 281 if (producer == new_producer) { 282 if (size != NULL) 283 *size = (producer - consumer) & mask; 284 285 return false; 286 } 287 288 producer = new_producer; 289 } 290 } 291 292 *ticket = producer; 293 if (size != NULL) 294 *size = (producer - consumer) & mask; 295 296 return (char *)buffer + ts * (producer & mask); 297 } 298 299 CK_CC_FORCE_INLINE static void 300 _ck_ring_enqueue_commit_mp(struct ck_ring *ring, unsigned int producer) 301 { 302 303 while (ck_pr_load_uint(&ring->p_tail) != producer) 304 ck_pr_stall(); 305 306 ck_pr_fence_store(); 307 ck_pr_store_uint(&ring->p_tail, producer + 1); 308 return; 309 } 310 311 CK_CC_FORCE_INLINE static bool 312 _ck_ring_enqueue_mp(struct ck_ring *ring, 313 void *buffer, 314 const void *entry, 315 unsigned int ts, 316 unsigned int *size) 317 { 318 const unsigned int mask = ring->mask; 319 unsigned int producer, consumer, delta; 320 bool r = true; 321 322 producer = ck_pr_load_uint(&ring->p_head); 323 324 for (;;) { 325 /* 326 * The snapshot of producer must be up to date with respect to 327 * consumer. 328 */ 329 ck_pr_fence_load(); 330 consumer = ck_pr_load_uint(&ring->c_head); 331 332 delta = producer + 1; 333 334 /* 335 * Only try to CAS if the producer is not clearly stale (not 336 * less than consumer) and the buffer is definitely not full. 337 */ 338 if (CK_CC_LIKELY((producer - consumer) < mask)) { 339 if (ck_pr_cas_uint_value(&ring->p_head, 340 producer, delta, &producer) == true) { 341 break; 342 } 343 } else { 344 unsigned int new_producer; 345 346 /* 347 * Slow path. Either the buffer is full or we have a 348 * stale snapshot of p_head. Execute a second read of 349 * p_read that must be ordered wrt the snapshot of 350 * c_head. 351 */ 352 ck_pr_fence_load(); 353 new_producer = ck_pr_load_uint(&ring->p_head); 354 355 /* 356 * Only fail if we haven't made forward progress in 357 * production: the buffer must have been full when we 358 * read new_producer (or we wrapped around UINT_MAX 359 * during this iteration). 360 */ 361 if (producer == new_producer) { 362 r = false; 363 goto leave; 364 } 365 366 /* 367 * p_head advanced during this iteration. Try again. 368 */ 369 producer = new_producer; 370 } 371 } 372 373 buffer = (char *)buffer + ts * (producer & mask); 374 memcpy(buffer, entry, ts); 375 376 /* 377 * Wait until all concurrent producers have completed writing 378 * their data into the ring buffer. 379 */ 380 while (ck_pr_load_uint(&ring->p_tail) != producer) 381 ck_pr_stall(); 382 383 /* 384 * Ensure that copy is completed before updating shared producer 385 * counter. 386 */ 387 ck_pr_fence_store(); 388 ck_pr_store_uint(&ring->p_tail, delta); 389 390 leave: 391 if (size != NULL) 392 *size = (producer - consumer) & mask; 393 394 return r; 395 } 396 397 CK_CC_FORCE_INLINE static bool 398 _ck_ring_enqueue_mp_size(struct ck_ring *ring, 399 void *buffer, 400 const void *entry, 401 unsigned int ts, 402 unsigned int *size) 403 { 404 unsigned int sz; 405 bool r; 406 407 r = _ck_ring_enqueue_mp(ring, buffer, entry, ts, &sz); 408 *size = sz; 409 return r; 410 } 411 412 CK_CC_FORCE_INLINE static bool 413 _ck_ring_trydequeue_mc(struct ck_ring *ring, 414 const void *buffer, 415 void *data, 416 unsigned int size) 417 { 418 const unsigned int mask = ring->mask; 419 unsigned int consumer, producer; 420 421 consumer = ck_pr_load_uint(&ring->c_head); 422 ck_pr_fence_load(); 423 producer = ck_pr_load_uint(&ring->p_tail); 424 425 if (CK_CC_UNLIKELY(consumer == producer)) 426 return false; 427 428 ck_pr_fence_load(); 429 430 buffer = (const char *)buffer + size * (consumer & mask); 431 memcpy(data, buffer, size); 432 433 ck_pr_fence_store_atomic(); 434 return ck_pr_cas_uint(&ring->c_head, consumer, consumer + 1); 435 } 436 437 CK_CC_FORCE_INLINE static bool 438 _ck_ring_dequeue_mc(struct ck_ring *ring, 439 const void *buffer, 440 void *data, 441 unsigned int ts) 442 { 443 const unsigned int mask = ring->mask; 444 unsigned int consumer, producer; 445 446 consumer = ck_pr_load_uint(&ring->c_head); 447 448 do { 449 const char *target; 450 451 /* 452 * Producer counter must represent state relative to 453 * our latest consumer snapshot. 454 */ 455 ck_pr_fence_load(); 456 producer = ck_pr_load_uint(&ring->p_tail); 457 458 if (CK_CC_UNLIKELY(consumer == producer)) 459 return false; 460 461 ck_pr_fence_load(); 462 463 target = (const char *)buffer + ts * (consumer & mask); 464 memcpy(data, target, ts); 465 466 /* Serialize load with respect to head update. */ 467 ck_pr_fence_store_atomic(); 468 } while (ck_pr_cas_uint_value(&ring->c_head, 469 consumer, 470 consumer + 1, 471 &consumer) == false); 472 473 return true; 474 } 475 476 /* 477 * The ck_ring_*_spsc namespace is the public interface for interacting with a 478 * ring buffer containing pointers. Correctness is only provided if there is up 479 * to one concurrent consumer and up to one concurrent producer. 480 */ 481 CK_CC_INLINE static bool 482 ck_ring_enqueue_spsc_size(struct ck_ring *ring, 483 struct ck_ring_buffer *buffer, 484 const void *entry, 485 unsigned int *size) 486 { 487 488 return _ck_ring_enqueue_sp_size(ring, buffer, &entry, 489 sizeof(entry), size); 490 } 491 492 CK_CC_INLINE static bool 493 ck_ring_enqueue_spsc(struct ck_ring *ring, 494 struct ck_ring_buffer *buffer, 495 const void *entry) 496 { 497 498 return _ck_ring_enqueue_sp(ring, buffer, 499 &entry, sizeof(entry), NULL); 500 } 501 502 CK_CC_INLINE static void * 503 ck_ring_enqueue_reserve_spsc_size(struct ck_ring *ring, 504 struct ck_ring_buffer *buffer, 505 unsigned int *size) 506 { 507 508 return _ck_ring_enqueue_reserve_sp(ring, buffer, sizeof(void *), 509 size); 510 } 511 512 CK_CC_INLINE static void * 513 ck_ring_enqueue_reserve_spsc(struct ck_ring *ring, 514 struct ck_ring_buffer *buffer) 515 { 516 517 return _ck_ring_enqueue_reserve_sp(ring, buffer, sizeof(void *), 518 NULL); 519 } 520 521 CK_CC_INLINE static void 522 ck_ring_enqueue_commit_spsc(struct ck_ring *ring) 523 { 524 525 _ck_ring_enqueue_commit_sp(ring); 526 return; 527 } 528 529 CK_CC_INLINE static bool 530 ck_ring_dequeue_spsc(struct ck_ring *ring, 531 const struct ck_ring_buffer *buffer, 532 void *data) 533 { 534 535 return _ck_ring_dequeue_sc(ring, buffer, 536 (void **)data, sizeof(void *)); 537 } 538 539 /* 540 * The ck_ring_*_mpmc namespace is the public interface for interacting with a 541 * ring buffer containing pointers. Correctness is provided for any number of 542 * producers and consumers. 543 */ 544 CK_CC_INLINE static bool 545 ck_ring_enqueue_mpmc(struct ck_ring *ring, 546 struct ck_ring_buffer *buffer, 547 const void *entry) 548 { 549 550 return _ck_ring_enqueue_mp(ring, buffer, &entry, sizeof(entry), NULL); 551 } 552 553 CK_CC_INLINE static bool 554 ck_ring_enqueue_mpmc_size(struct ck_ring *ring, 555 struct ck_ring_buffer *buffer, 556 const void *entry, 557 unsigned int *size) 558 { 559 560 return _ck_ring_enqueue_mp_size(ring, buffer, &entry, sizeof(entry), 561 size); 562 } 563 564 CK_CC_INLINE static void * 565 ck_ring_enqueue_reserve_mpmc(struct ck_ring *ring, 566 struct ck_ring_buffer *buffer, 567 unsigned int *ticket) 568 { 569 570 return _ck_ring_enqueue_reserve_mp(ring, buffer, sizeof(void *), 571 ticket, NULL); 572 } 573 574 CK_CC_INLINE static void * 575 ck_ring_enqueue_reserve_mpmc_size(struct ck_ring *ring, 576 struct ck_ring_buffer *buffer, 577 unsigned int *ticket, 578 unsigned int *size) 579 { 580 581 return _ck_ring_enqueue_reserve_mp(ring, buffer, sizeof(void *), 582 ticket, size); 583 } 584 585 CK_CC_INLINE static void 586 ck_ring_enqueue_commit_mpmc(struct ck_ring *ring, unsigned int ticket) 587 { 588 589 _ck_ring_enqueue_commit_mp(ring, ticket); 590 return; 591 } 592 593 CK_CC_INLINE static bool 594 ck_ring_trydequeue_mpmc(struct ck_ring *ring, 595 const struct ck_ring_buffer *buffer, 596 void *data) 597 { 598 599 return _ck_ring_trydequeue_mc(ring, 600 buffer, (void **)data, sizeof(void *)); 601 } 602 603 CK_CC_INLINE static bool 604 ck_ring_dequeue_mpmc(struct ck_ring *ring, 605 const struct ck_ring_buffer *buffer, 606 void *data) 607 { 608 609 return _ck_ring_dequeue_mc(ring, buffer, (void **)data, 610 sizeof(void *)); 611 } 612 613 /* 614 * The ck_ring_*_spmc namespace is the public interface for interacting with a 615 * ring buffer containing pointers. Correctness is provided for any number of 616 * consumers with up to one concurrent producer. 617 */ 618 CK_CC_INLINE static void * 619 ck_ring_enqueue_reserve_spmc_size(struct ck_ring *ring, 620 struct ck_ring_buffer *buffer, 621 unsigned int *size) 622 { 623 624 return _ck_ring_enqueue_reserve_sp(ring, buffer, sizeof(void *), size); 625 } 626 627 CK_CC_INLINE static void * 628 ck_ring_enqueue_reserve_spmc(struct ck_ring *ring, 629 struct ck_ring_buffer *buffer) 630 { 631 632 return _ck_ring_enqueue_reserve_sp(ring, buffer, sizeof(void *), NULL); 633 } 634 635 CK_CC_INLINE static void 636 ck_ring_enqueue_commit_spmc(struct ck_ring *ring) 637 { 638 639 _ck_ring_enqueue_commit_sp(ring); 640 return; 641 } 642 643 CK_CC_INLINE static bool 644 ck_ring_enqueue_spmc_size(struct ck_ring *ring, 645 struct ck_ring_buffer *buffer, 646 const void *entry, 647 unsigned int *size) 648 { 649 650 return _ck_ring_enqueue_sp_size(ring, buffer, &entry, 651 sizeof(entry), size); 652 } 653 654 CK_CC_INLINE static bool 655 ck_ring_enqueue_spmc(struct ck_ring *ring, 656 struct ck_ring_buffer *buffer, 657 const void *entry) 658 { 659 660 return _ck_ring_enqueue_sp(ring, buffer, &entry, 661 sizeof(entry), NULL); 662 } 663 664 CK_CC_INLINE static bool 665 ck_ring_trydequeue_spmc(struct ck_ring *ring, 666 const struct ck_ring_buffer *buffer, 667 void *data) 668 { 669 670 return _ck_ring_trydequeue_mc(ring, buffer, (void **)data, sizeof(void *)); 671 } 672 673 CK_CC_INLINE static bool 674 ck_ring_dequeue_spmc(struct ck_ring *ring, 675 const struct ck_ring_buffer *buffer, 676 void *data) 677 { 678 679 return _ck_ring_dequeue_mc(ring, buffer, (void **)data, sizeof(void *)); 680 } 681 682 /* 683 * The ck_ring_*_mpsc namespace is the public interface for interacting with a 684 * ring buffer containing pointers. Correctness is provided for any number of 685 * producers with up to one concurrent consumers. 686 */ 687 CK_CC_INLINE static void * 688 ck_ring_enqueue_reserve_mpsc(struct ck_ring *ring, 689 struct ck_ring_buffer *buffer, 690 unsigned int *ticket) 691 { 692 693 return _ck_ring_enqueue_reserve_mp(ring, buffer, sizeof(void *), 694 ticket, NULL); 695 } 696 697 CK_CC_INLINE static void * 698 ck_ring_enqueue_reserve_mpsc_size(struct ck_ring *ring, 699 struct ck_ring_buffer *buffer, 700 unsigned int *ticket, 701 unsigned int *size) 702 { 703 704 return _ck_ring_enqueue_reserve_mp(ring, buffer, sizeof(void *), 705 ticket, size); 706 } 707 708 CK_CC_INLINE static void 709 ck_ring_enqueue_commit_mpsc(struct ck_ring *ring, unsigned int ticket) 710 { 711 712 _ck_ring_enqueue_commit_mp(ring, ticket); 713 return; 714 } 715 716 CK_CC_INLINE static bool 717 ck_ring_enqueue_mpsc(struct ck_ring *ring, 718 struct ck_ring_buffer *buffer, 719 const void *entry) 720 { 721 722 return _ck_ring_enqueue_mp(ring, buffer, &entry, 723 sizeof(entry), NULL); 724 } 725 726 CK_CC_INLINE static bool 727 ck_ring_enqueue_mpsc_size(struct ck_ring *ring, 728 struct ck_ring_buffer *buffer, 729 const void *entry, 730 unsigned int *size) 731 { 732 733 return _ck_ring_enqueue_mp_size(ring, buffer, &entry, 734 sizeof(entry), size); 735 } 736 737 CK_CC_INLINE static bool 738 ck_ring_dequeue_mpsc(struct ck_ring *ring, 739 const struct ck_ring_buffer *buffer, 740 void *data) 741 { 742 743 return _ck_ring_dequeue_sc(ring, buffer, (void **)data, 744 sizeof(void *)); 745 } 746 747 /* 748 * CK_RING_PROTOTYPE is used to define a type-safe interface for inlining 749 * values of a particular type in the ring the buffer. 750 */ 751 #define CK_RING_PROTOTYPE(name, type) \ 752 CK_CC_INLINE static struct type * \ 753 ck_ring_enqueue_reserve_spsc_##name(struct ck_ring *a, \ 754 struct type *b) \ 755 { \ 756 \ 757 return _ck_ring_enqueue_reserve_sp(a, b, \ 758 sizeof(struct type), NULL); \ 759 } \ 760 \ 761 CK_CC_INLINE static struct type * \ 762 ck_ring_enqueue_reserve_spsc_size_##name(struct ck_ring *a, \ 763 struct type *b, \ 764 unsigned int *c) \ 765 { \ 766 \ 767 return _ck_ring_enqueue_reserve_sp(a, b, \ 768 sizeof(struct type), c); \ 769 } \ 770 \ 771 CK_CC_INLINE static bool \ 772 ck_ring_enqueue_spsc_size_##name(struct ck_ring *a, \ 773 struct type *b, \ 774 struct type *c, \ 775 unsigned int *d) \ 776 { \ 777 \ 778 return _ck_ring_enqueue_sp_size(a, b, c, \ 779 sizeof(struct type), d); \ 780 } \ 781 \ 782 CK_CC_INLINE static bool \ 783 ck_ring_enqueue_spsc_##name(struct ck_ring *a, \ 784 struct type *b, \ 785 struct type *c) \ 786 { \ 787 \ 788 return _ck_ring_enqueue_sp(a, b, c, \ 789 sizeof(struct type), NULL); \ 790 } \ 791 \ 792 CK_CC_INLINE static bool \ 793 ck_ring_dequeue_spsc_##name(struct ck_ring *a, \ 794 struct type *b, \ 795 struct type *c) \ 796 { \ 797 \ 798 return _ck_ring_dequeue_sc(a, b, c, \ 799 sizeof(struct type)); \ 800 } \ 801 \ 802 CK_CC_INLINE static struct type * \ 803 ck_ring_enqueue_reserve_spmc_##name(struct ck_ring *a, \ 804 struct type *b) \ 805 { \ 806 \ 807 return _ck_ring_enqueue_reserve_sp(a, b, \ 808 sizeof(struct type), NULL); \ 809 } \ 810 \ 811 CK_CC_INLINE static struct type * \ 812 ck_ring_enqueue_reserve_spmc_size_##name(struct ck_ring *a, \ 813 struct type *b, \ 814 unsigned int *c) \ 815 { \ 816 \ 817 return _ck_ring_enqueue_reserve_sp(a, b, \ 818 sizeof(struct type), c); \ 819 } \ 820 \ 821 CK_CC_INLINE static bool \ 822 ck_ring_enqueue_spmc_size_##name(struct ck_ring *a, \ 823 struct type *b, \ 824 struct type *c, \ 825 unsigned int *d) \ 826 { \ 827 \ 828 return _ck_ring_enqueue_sp_size(a, b, c, \ 829 sizeof(struct type), d); \ 830 } \ 831 \ 832 CK_CC_INLINE static bool \ 833 ck_ring_enqueue_spmc_##name(struct ck_ring *a, \ 834 struct type *b, \ 835 struct type *c) \ 836 { \ 837 \ 838 return _ck_ring_enqueue_sp(a, b, c, \ 839 sizeof(struct type), NULL); \ 840 } \ 841 \ 842 CK_CC_INLINE static bool \ 843 ck_ring_trydequeue_spmc_##name(struct ck_ring *a, \ 844 struct type *b, \ 845 struct type *c) \ 846 { \ 847 \ 848 return _ck_ring_trydequeue_mc(a, \ 849 b, c, sizeof(struct type)); \ 850 } \ 851 \ 852 CK_CC_INLINE static bool \ 853 ck_ring_dequeue_spmc_##name(struct ck_ring *a, \ 854 struct type *b, \ 855 struct type *c) \ 856 { \ 857 \ 858 return _ck_ring_dequeue_mc(a, b, c, \ 859 sizeof(struct type)); \ 860 } \ 861 \ 862 CK_CC_INLINE static struct type * \ 863 ck_ring_enqueue_reserve_mpsc_##name(struct ck_ring *a, \ 864 struct type *b, \ 865 unsigned int *c) \ 866 { \ 867 \ 868 return _ck_ring_enqueue_reserve_mp(a, b, \ 869 sizeof(struct type), c, NULL); \ 870 } \ 871 \ 872 CK_CC_INLINE static struct type * \ 873 ck_ring_enqueue_reserve_mpsc_size_##name(struct ck_ring *a, \ 874 struct type *b, \ 875 unsigned int *c, \ 876 unsigned int *d) \ 877 { \ 878 \ 879 return _ck_ring_enqueue_reserve_mp(a, b, \ 880 sizeof(struct type), c, d); \ 881 } \ 882 \ 883 CK_CC_INLINE static bool \ 884 ck_ring_enqueue_mpsc_##name(struct ck_ring *a, \ 885 struct type *b, \ 886 struct type *c) \ 887 { \ 888 \ 889 return _ck_ring_enqueue_mp(a, b, c, \ 890 sizeof(struct type), NULL); \ 891 } \ 892 \ 893 CK_CC_INLINE static bool \ 894 ck_ring_enqueue_mpsc_size_##name(struct ck_ring *a, \ 895 struct type *b, \ 896 struct type *c, \ 897 unsigned int *d) \ 898 { \ 899 \ 900 return _ck_ring_enqueue_mp_size(a, b, c, \ 901 sizeof(struct type), d); \ 902 } \ 903 \ 904 CK_CC_INLINE static bool \ 905 ck_ring_dequeue_mpsc_##name(struct ck_ring *a, \ 906 struct type *b, \ 907 struct type *c) \ 908 { \ 909 \ 910 return _ck_ring_dequeue_sc(a, b, c, \ 911 sizeof(struct type)); \ 912 } \ 913 \ 914 CK_CC_INLINE static struct type * \ 915 ck_ring_enqueue_reserve_mpmc_##name(struct ck_ring *a, \ 916 struct type *b, \ 917 unsigned int *c) \ 918 { \ 919 \ 920 return _ck_ring_enqueue_reserve_mp(a, b, \ 921 sizeof(struct type), c, NULL); \ 922 } \ 923 \ 924 CK_CC_INLINE static struct type * \ 925 ck_ring_enqueue_reserve_mpmc_size_##name(struct ck_ring *a, \ 926 struct type *b, \ 927 unsigned int *c, \ 928 unsigned int *d) \ 929 { \ 930 \ 931 return _ck_ring_enqueue_reserve_mp(a, b, \ 932 sizeof(struct type), c, d); \ 933 } \ 934 \ 935 CK_CC_INLINE static bool \ 936 ck_ring_enqueue_mpmc_size_##name(struct ck_ring *a, \ 937 struct type *b, \ 938 struct type *c, \ 939 unsigned int *d) \ 940 { \ 941 \ 942 return _ck_ring_enqueue_mp_size(a, b, c, \ 943 sizeof(struct type), d); \ 944 } \ 945 \ 946 CK_CC_INLINE static bool \ 947 ck_ring_enqueue_mpmc_##name(struct ck_ring *a, \ 948 struct type *b, \ 949 struct type *c) \ 950 { \ 951 \ 952 return _ck_ring_enqueue_mp(a, b, c, \ 953 sizeof(struct type), NULL); \ 954 } \ 955 \ 956 CK_CC_INLINE static bool \ 957 ck_ring_trydequeue_mpmc_##name(struct ck_ring *a, \ 958 struct type *b, \ 959 struct type *c) \ 960 { \ 961 \ 962 return _ck_ring_trydequeue_mc(a, \ 963 b, c, sizeof(struct type)); \ 964 } \ 965 \ 966 CK_CC_INLINE static bool \ 967 ck_ring_dequeue_mpmc_##name(struct ck_ring *a, \ 968 struct type *b, \ 969 struct type *c) \ 970 { \ 971 \ 972 return _ck_ring_dequeue_mc(a, b, c, \ 973 sizeof(struct type)); \ 974 } 975 976 /* 977 * A single producer with one concurrent consumer. 978 */ 979 #define CK_RING_ENQUEUE_SPSC(name, a, b, c) \ 980 ck_ring_enqueue_spsc_##name(a, b, c) 981 #define CK_RING_ENQUEUE_SPSC_SIZE(name, a, b, c, d) \ 982 ck_ring_enqueue_spsc_size_##name(a, b, c, d) 983 #define CK_RING_ENQUEUE_RESERVE_SPSC(name, a, b, c) \ 984 ck_ring_enqueue_reserve_spsc_##name(a, b, c) 985 #define CK_RING_ENQUEUE_RESERVE_SPSC_SIZE(name, a, b, c, d) \ 986 ck_ring_enqueue_reserve_spsc_size_##name(a, b, c, d) 987 #define CK_RING_DEQUEUE_SPSC(name, a, b, c) \ 988 ck_ring_dequeue_spsc_##name(a, b, c) 989 990 /* 991 * A single producer with any number of concurrent consumers. 992 */ 993 #define CK_RING_ENQUEUE_SPMC(name, a, b, c) \ 994 ck_ring_enqueue_spmc_##name(a, b, c) 995 #define CK_RING_ENQUEUE_SPMC_SIZE(name, a, b, c, d) \ 996 ck_ring_enqueue_spmc_size_##name(a, b, c, d) 997 #define CK_RING_ENQUEUE_RESERVE_SPMC(name, a, b, c) \ 998 ck_ring_enqueue_reserve_spmc_##name(a, b, c) 999 #define CK_RING_ENQUEUE_RESERVE_SPMC_SIZE(name, a, b, c, d) \ 1000 ck_ring_enqueue_reserve_spmc_size_##name(a, b, c, d) 1001 #define CK_RING_TRYDEQUEUE_SPMC(name, a, b, c) \ 1002 ck_ring_trydequeue_spmc_##name(a, b, c) 1003 #define CK_RING_DEQUEUE_SPMC(name, a, b, c) \ 1004 ck_ring_dequeue_spmc_##name(a, b, c) 1005 1006 /* 1007 * Any number of concurrent producers with up to one 1008 * concurrent consumer. 1009 */ 1010 #define CK_RING_ENQUEUE_MPSC(name, a, b, c) \ 1011 ck_ring_enqueue_mpsc_##name(a, b, c) 1012 #define CK_RING_ENQUEUE_MPSC_SIZE(name, a, b, c, d) \ 1013 ck_ring_enqueue_mpsc_size_##name(a, b, c, d) 1014 #define CK_RING_ENQUEUE_RESERVE_MPSC(name, a, b, c) \ 1015 ck_ring_enqueue_reserve_mpsc_##name(a, b, c) 1016 #define CK_RING_ENQUEUE_RESERVE_MPSC_SIZE(name, a, b, c, d) \ 1017 ck_ring_enqueue_reserve_mpsc_size_##name(a, b, c, d) 1018 #define CK_RING_DEQUEUE_MPSC(name, a, b, c) \ 1019 ck_ring_dequeue_mpsc_##name(a, b, c) 1020 1021 /* 1022 * Any number of concurrent producers and consumers. 1023 */ 1024 #define CK_RING_ENQUEUE_MPMC(name, a, b, c) \ 1025 ck_ring_enqueue_mpmc_##name(a, b, c) 1026 #define CK_RING_ENQUEUE_MPMC_SIZE(name, a, b, c, d) \ 1027 ck_ring_enqueue_mpmc_size_##name(a, b, c, d) 1028 #define CK_RING_ENQUEUE_RESERVE_MPMC(name, a, b, c) \ 1029 ck_ring_enqueue_reserve_mpmc_##name(a, b, c) 1030 #define CK_RING_ENQUEUE_RESERVE_MPMC_SIZE(name, a, b, c, d) \ 1031 ck_ring_enqueue_reserve_mpmc_size_##name(a, b, c, d) 1032 #define CK_RING_TRYDEQUEUE_MPMC(name, a, b, c) \ 1033 ck_ring_trydequeue_mpmc_##name(a, b, c) 1034 #define CK_RING_DEQUEUE_MPMC(name, a, b, c) \ 1035 ck_ring_dequeue_mpmc_##name(a, b, c) 1036 1037 #endif /* CK_RING_H */ 1038