/*
 * Copyright 2009-2015 Samy Al Bahra.
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
 * SUCH DAMAGE.
 */

#ifndef CK_RING_H
#define CK_RING_H

#include <ck_cc.h>
#include <ck_md.h>
#include <ck_pr.h>
#include <ck_stdbool.h>
#include <ck_string.h>

/*
 * Concurrent ring buffer.
 */

struct ck_ring {
	unsigned int c_head;
	char pad[CK_MD_CACHELINE - sizeof(unsigned int)];
	unsigned int p_tail;
	unsigned int p_head;
	char _pad[CK_MD_CACHELINE - sizeof(unsigned int) * 2];
	unsigned int size;
	unsigned int mask;
};
typedef struct ck_ring ck_ring_t;

struct ck_ring_buffer {
	void *value;
};
typedef struct ck_ring_buffer ck_ring_buffer_t;

CK_CC_INLINE static unsigned int
ck_ring_size(const struct ck_ring *ring)
{
	unsigned int c, p;

	c = ck_pr_load_uint(&ring->c_head);
	p = ck_pr_load_uint(&ring->p_tail);
	return (p - c) & ring->mask;
}

CK_CC_INLINE static unsigned int
ck_ring_capacity(const struct ck_ring *ring)
{
	return ring->size;
}

CK_CC_INLINE static void
ck_ring_init(struct ck_ring *ring, unsigned int size)
{

	ring->size = size;
	ring->mask = size - 1;
	ring->p_tail = 0;
	ring->p_head = 0;
	ring->c_head = 0;
	return;
}

/*
 * The _ck_ring_* namespace is internal only and must not be used externally.
 */
CK_CC_FORCE_INLINE static bool
_ck_ring_enqueue_sp(struct ck_ring *ring,
    void *CK_CC_RESTRICT buffer,
    const void *CK_CC_RESTRICT entry,
    unsigned int ts,
    unsigned int *size)
{
	const unsigned int mask = ring->mask;
	unsigned int consumer, producer, delta;

	consumer = ck_pr_load_uint(&ring->c_head);
	producer = ring->p_tail;
	delta = producer + 1;
	if (size != NULL)
		*size = (producer - consumer) & mask;

	if (CK_CC_UNLIKELY((delta & mask) == (consumer & mask)))
		return false;

	buffer = (char *)buffer + ts * (producer & mask);
	memcpy(buffer, entry, ts);

	/*
	 * Make sure to update slot value before indicating
	 * that the slot is available for consumption.
	 */
	ck_pr_fence_store();
	ck_pr_store_uint(&ring->p_tail, delta);
	return true;
}

CK_CC_FORCE_INLINE static bool
_ck_ring_enqueue_sp_size(struct ck_ring *ring,
    void *CK_CC_RESTRICT buffer,
    const void *CK_CC_RESTRICT entry,
    unsigned int ts,
    unsigned int *size)
{
	unsigned int sz;
	bool r;

	r = _ck_ring_enqueue_sp(ring, buffer, entry, ts, &sz);
	*size = sz;
	return r;
}

CK_CC_FORCE_INLINE static bool
_ck_ring_dequeue_sc(struct ck_ring *ring,
    const void *CK_CC_RESTRICT buffer,
    void *CK_CC_RESTRICT target,
    unsigned int size)
{
	const unsigned int mask = ring->mask;
	unsigned int consumer, producer;

	consumer = ring->c_head;
	producer = ck_pr_load_uint(&ring->p_tail);

	if (CK_CC_UNLIKELY(consumer == producer))
		return false;

	/*
	 * Make sure to serialize with respect to our snapshot
	 * of the producer counter.
	 */
	ck_pr_fence_load();

	buffer = (const char *)buffer + size * (consumer & mask);
	memcpy(target, buffer, size);

	/*
	 * Make sure copy is completed with respect to consumer
	 * update.
	 */
	ck_pr_fence_store();
	ck_pr_store_uint(&ring->c_head, consumer + 1);
	return true;
}

CK_CC_FORCE_INLINE static bool
_ck_ring_enqueue_mp(struct ck_ring *ring,
    void *buffer,
    const void *entry,
    unsigned int ts,
    unsigned int *size)
{
	const unsigned int mask = ring->mask;
	unsigned int producer, consumer, delta;
	bool r = true;

	producer = ck_pr_load_uint(&ring->p_head);

	for (;;) {
		/*
		 * The snapshot of producer must be up to date with respect to
		 * consumer.
		 */
		ck_pr_fence_load();
		consumer = ck_pr_load_uint(&ring->c_head);

		delta = producer + 1;

		/*
		 * Only try to CAS if the producer is not clearly stale (not
		 * less than consumer) and the buffer is definitely not full.
		 */
		if (CK_CC_LIKELY((producer - consumer) < mask)) {
			if (ck_pr_cas_uint_value(&ring->p_head,
			    producer, delta, &producer) == true) {
				break;
			}
		} else {
			unsigned int new_producer;

			/*
			 * Slow path. Either the buffer is full or we have a
			 * stale snapshot of p_head. Execute a second read of
			 * p_head that must be ordered wrt the snapshot of
			 * c_head.
			 */
			ck_pr_fence_load();
			new_producer = ck_pr_load_uint(&ring->p_head);

			/*
			 * Only fail if we haven't made forward progress in
			 * production: the buffer must have been full when we
			 * read new_producer (or we wrapped around UINT_MAX
			 * during this iteration).
			 */
			if (producer == new_producer) {
				r = false;
				goto leave;
			}

			/*
			 * p_head advanced during this iteration. Try again.
			 */
			producer = new_producer;
		}
	}

	buffer = (char *)buffer + ts * (producer & mask);
	memcpy(buffer, entry, ts);

	/*
	 * Wait until all concurrent producers have completed writing
	 * their data into the ring buffer.
	 */
	while (ck_pr_load_uint(&ring->p_tail) != producer)
		ck_pr_stall();

	/*
	 * Ensure that copy is completed before updating shared producer
	 * counter.
	 */
	ck_pr_fence_store();
	ck_pr_store_uint(&ring->p_tail, delta);

leave:
	if (size != NULL)
		*size = (producer - consumer) & mask;

	return r;
}

CK_CC_FORCE_INLINE static bool
_ck_ring_enqueue_mp_size(struct ck_ring *ring,
    void *buffer,
    const void *entry,
    unsigned int ts,
    unsigned int *size)
{
	unsigned int sz;
	bool r;

	r = _ck_ring_enqueue_mp(ring, buffer, entry, ts, &sz);
	*size = sz;
	return r;
}

CK_CC_FORCE_INLINE static bool
_ck_ring_trydequeue_mc(struct ck_ring *ring,
    const void *buffer,
    void *data,
    unsigned int size)
{
	const unsigned int mask = ring->mask;
	unsigned int consumer, producer;

	consumer = ck_pr_load_uint(&ring->c_head);
	ck_pr_fence_load();
	producer = ck_pr_load_uint(&ring->p_tail);

	if (CK_CC_UNLIKELY(consumer == producer))
		return false;

	ck_pr_fence_load();

	buffer = (const char *)buffer + size * (consumer & mask);
	memcpy(data, buffer, size);

	ck_pr_fence_store_atomic();
	return ck_pr_cas_uint(&ring->c_head, consumer, consumer + 1);
}

CK_CC_FORCE_INLINE static bool
_ck_ring_dequeue_mc(struct ck_ring *ring,
    const void *buffer,
    void *data,
    unsigned int ts)
{
	const unsigned int mask = ring->mask;
	unsigned int consumer, producer;

	consumer = ck_pr_load_uint(&ring->c_head);

	do {
		const char *target;

		/*
		 * Producer counter must represent state relative to
		 * our latest consumer snapshot.
		 */
		ck_pr_fence_load();
		producer = ck_pr_load_uint(&ring->p_tail);

		if (CK_CC_UNLIKELY(consumer == producer))
			return false;

		ck_pr_fence_load();

		target = (const char *)buffer + ts * (consumer & mask);
		memcpy(data, target, ts);

		/* Serialize load with respect to head update. */
		ck_pr_fence_store_atomic();
	} while (ck_pr_cas_uint_value(&ring->c_head,
	    consumer,
	    consumer + 1,
	    &consumer) == false);

	return true;
}

/*
 * The ck_ring_*_spsc namespace is the public interface for interacting with a
 * ring buffer containing pointers. Correctness is only provided if there is up
 * to one concurrent consumer and up to one concurrent producer. A usage sketch
 * follows the function definitions below.
 */
CK_CC_INLINE static bool
ck_ring_enqueue_spsc_size(struct ck_ring *ring,
    struct ck_ring_buffer *buffer,
    const void *entry,
    unsigned int *size)
{

	return _ck_ring_enqueue_sp_size(ring, buffer, &entry,
	    sizeof(entry), size);
}

CK_CC_INLINE static bool
ck_ring_enqueue_spsc(struct ck_ring *ring,
    struct ck_ring_buffer *buffer,
    const void *entry)
{

	return _ck_ring_enqueue_sp(ring, buffer,
	    &entry, sizeof(entry), NULL);
}

CK_CC_INLINE static bool
ck_ring_dequeue_spsc(struct ck_ring *ring,
    const struct ck_ring_buffer *buffer,
    void *data)
{

	return _ck_ring_dequeue_sc(ring, buffer,
	    (void **)data, sizeof(void *));
}

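/*
 * Usage sketch (illustrative only, not part of the ck_ring API): a minimal
 * single-producer/single-consumer setup for a pointer ring. The capacity
 * passed to ck_ring_init must be a power of two; the names ring, slots,
 * message, handle_full and consume below are hypothetical.
 *
 *	static ck_ring_t ring;
 *	static ck_ring_buffer_t slots[1024];
 *
 *	ck_ring_init(&ring, 1024);
 *
 *	In the (single) producer thread:
 *	if (ck_ring_enqueue_spsc(&ring, slots, message) == false)
 *		handle_full();
 *
 *	In the (single) consumer thread:
 *	void *out;
 *	if (ck_ring_dequeue_spsc(&ring, slots, &out) == true)
 *		consume(out);
 */
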
/*
 * The ck_ring_*_mpmc namespace is the public interface for interacting with a
 * ring buffer containing pointers. Correctness is provided for any number of
 * producers and consumers.
 */
CK_CC_INLINE static bool
ck_ring_enqueue_mpmc(struct ck_ring *ring,
    struct ck_ring_buffer *buffer,
    const void *entry)
{

	return _ck_ring_enqueue_mp(ring, buffer, &entry,
	    sizeof(entry), NULL);
}

CK_CC_INLINE static bool
ck_ring_enqueue_mpmc_size(struct ck_ring *ring,
    struct ck_ring_buffer *buffer,
    const void *entry,
    unsigned int *size)
{

	return _ck_ring_enqueue_mp_size(ring, buffer, &entry,
	    sizeof(entry), size);
}

CK_CC_INLINE static bool
ck_ring_trydequeue_mpmc(struct ck_ring *ring,
    const struct ck_ring_buffer *buffer,
    void *data)
{

	return _ck_ring_trydequeue_mc(ring,
	    buffer, (void **)data, sizeof(void *));
}

CK_CC_INLINE static bool
ck_ring_dequeue_mpmc(struct ck_ring *ring,
    const struct ck_ring_buffer *buffer,
    void *data)
{

	return _ck_ring_dequeue_mc(ring, buffer, (void **)data,
	    sizeof(void *));
}

/*
 * The ck_ring_*_spmc namespace is the public interface for interacting with a
 * ring buffer containing pointers. Correctness is provided for any number of
 * consumers with up to one concurrent producer.
 */
CK_CC_INLINE static bool
ck_ring_enqueue_spmc_size(struct ck_ring *ring,
    struct ck_ring_buffer *buffer,
    const void *entry,
    unsigned int *size)
{

	return _ck_ring_enqueue_sp_size(ring, buffer, &entry,
	    sizeof(entry), size);
}

CK_CC_INLINE static bool
ck_ring_enqueue_spmc(struct ck_ring *ring,
    struct ck_ring_buffer *buffer,
    const void *entry)
{

	return _ck_ring_enqueue_sp(ring, buffer, &entry,
	    sizeof(entry), NULL);
}

CK_CC_INLINE static bool
ck_ring_trydequeue_spmc(struct ck_ring *ring,
    const struct ck_ring_buffer *buffer,
    void *data)
{

	return _ck_ring_trydequeue_mc(ring, buffer, (void **)data, sizeof(void *));
}

CK_CC_INLINE static bool
ck_ring_dequeue_spmc(struct ck_ring *ring,
    const struct ck_ring_buffer *buffer,
    void *data)
{

	return _ck_ring_dequeue_mc(ring, buffer, (void **)data, sizeof(void *));
}

/*
 * The ck_ring_*_mpsc namespace is the public interface for interacting with a
 * ring buffer containing pointers. Correctness is provided for any number of
 * producers with up to one concurrent consumer.
 */
CK_CC_INLINE static bool
ck_ring_enqueue_mpsc(struct ck_ring *ring,
    struct ck_ring_buffer *buffer,
    const void *entry)
{

	return _ck_ring_enqueue_mp(ring, buffer, &entry,
	    sizeof(entry), NULL);
}

CK_CC_INLINE static bool
ck_ring_enqueue_mpsc_size(struct ck_ring *ring,
    struct ck_ring_buffer *buffer,
    const void *entry,
    unsigned int *size)
{

	return _ck_ring_enqueue_mp_size(ring, buffer, &entry,
	    sizeof(entry), size);
}

CK_CC_INLINE static bool
ck_ring_dequeue_mpsc(struct ck_ring *ring,
    const struct ck_ring_buffer *buffer,
    void *data)
{

	return _ck_ring_dequeue_sc(ring, buffer, (void **)data,
	    sizeof(void *));
}

/*
 * CK_RING_PROTOTYPE is used to define a type-safe interface for inlining
 * values of a particular type in the ring buffer.
 */
#define CK_RING_PROTOTYPE(name, type)			\
CK_CC_INLINE static bool				\
ck_ring_enqueue_spsc_size_##name(struct ck_ring *a,	\
    struct type *b,					\
    struct type *c,					\
    unsigned int *d)					\
{							\
							\
	return _ck_ring_enqueue_sp_size(a, b, c,	\
	    sizeof(struct type), d);			\
}							\
							\
CK_CC_INLINE static bool				\
ck_ring_enqueue_spsc_##name(struct ck_ring *a,		\
    struct type *b,					\
    struct type *c)					\
{							\
							\
	return _ck_ring_enqueue_sp(a, b, c,		\
	    sizeof(struct type), NULL);			\
}							\
							\
CK_CC_INLINE static bool				\
ck_ring_dequeue_spsc_##name(struct ck_ring *a,		\
    struct type *b,					\
    struct type *c)					\
{							\
							\
	return _ck_ring_dequeue_sc(a, b, c,		\
	    sizeof(struct type));			\
}							\
							\
CK_CC_INLINE static bool				\
ck_ring_enqueue_spmc_size_##name(struct ck_ring *a,	\
    struct type *b,					\
    struct type *c,					\
    unsigned int *d)					\
{							\
							\
	return _ck_ring_enqueue_sp_size(a, b, c,	\
	    sizeof(struct type), d);			\
}							\
							\
CK_CC_INLINE static bool				\
ck_ring_enqueue_spmc_##name(struct ck_ring *a,		\
    struct type *b,					\
    struct type *c)					\
{							\
							\
	return _ck_ring_enqueue_sp(a, b, c,		\
	    sizeof(struct type), NULL);			\
}							\
							\
CK_CC_INLINE static bool				\
ck_ring_trydequeue_spmc_##name(struct ck_ring *a,	\
    struct type *b,					\
    struct type *c)					\
{							\
							\
	return _ck_ring_trydequeue_mc(a,		\
	    b, c, sizeof(struct type));			\
}							\
							\
CK_CC_INLINE static bool				\
ck_ring_dequeue_spmc_##name(struct ck_ring *a,		\
    struct type *b,					\
    struct type *c)					\
{							\
							\
	return _ck_ring_dequeue_mc(a, b, c,		\
	    sizeof(struct type));			\
}							\
							\
CK_CC_INLINE static bool				\
ck_ring_enqueue_mpsc_##name(struct ck_ring *a,		\
    struct type *b,					\
    struct type *c)					\
{							\
							\
	return _ck_ring_enqueue_mp(a, b, c,		\
	    sizeof(struct type), NULL);			\
}							\
							\
CK_CC_INLINE static bool				\
ck_ring_enqueue_mpsc_size_##name(struct ck_ring *a,	\
    struct type *b,					\
    struct type *c,					\
    unsigned int *d)					\
{							\
							\
	return _ck_ring_enqueue_mp_size(a, b, c,	\
	    sizeof(struct type), d);			\
}							\
							\
CK_CC_INLINE static bool				\
ck_ring_dequeue_mpsc_##name(struct ck_ring *a,		\
    struct type *b,					\
    struct type *c)					\
{							\
							\
	return _ck_ring_dequeue_sc(a, b, c,		\
	    sizeof(struct type));			\
}							\
							\
CK_CC_INLINE static bool				\
ck_ring_enqueue_mpmc_size_##name(struct ck_ring *a,	\
    struct type *b,					\
    struct type *c,					\
    unsigned int *d)					\
{							\
							\
	return _ck_ring_enqueue_mp_size(a, b, c,	\
	    sizeof(struct type), d);			\
}							\
							\
CK_CC_INLINE static bool				\
ck_ring_enqueue_mpmc_##name(struct ck_ring *a,		\
    struct type *b,					\
    struct type *c)					\
{							\
							\
	return _ck_ring_enqueue_mp(a, b, c,		\
	    sizeof(struct type), NULL);			\
}							\
							\
CK_CC_INLINE static bool				\
ck_ring_trydequeue_mpmc_##name(struct ck_ring *a,	\
    struct type *b,					\
    struct type *c)					\
{							\
							\
	return _ck_ring_trydequeue_mc(a,		\
	    b, c, sizeof(struct type));			\
}							\
							\
CK_CC_INLINE static bool				\
ck_ring_dequeue_mpmc_##name(struct ck_ring *a,		\
    struct type *b,					\
    struct type *c)					\
{							\
							\
	return _ck_ring_dequeue_mc(a, b, c,		\
	    sizeof(struct type));			\
}

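/*
 * Usage sketch (illustrative only, not part of the ck_ring API): defining a
 * type-safe interface for a hypothetical struct entry. The names entry, ring,
 * slots and e below are hypothetical; the slot array length must match the
 * power-of-two size passed to ck_ring_init.
 *
 *	struct entry {
 *		int tag;
 *		void *payload;
 *	};
 *	CK_RING_PROTOTYPE(entry, entry)
 *
 *	static ck_ring_t ring;
 *	static struct entry slots[256];
 *
 *	ck_ring_init(&ring, 256);
 *
 *	struct entry e = { 1, NULL };
 *	ck_ring_enqueue_spsc_entry(&ring, slots, &e);
 *	ck_ring_dequeue_spsc_entry(&ring, slots, &e);
 *
 * The CK_RING_* convenience macros defined below expand to these generated
 * functions, for example CK_RING_ENQUEUE_SPSC(entry, &ring, slots, &e).
 */
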
/*
 * A single producer with one concurrent consumer.
 */
#define CK_RING_ENQUEUE_SPSC(name, a, b, c)		\
	ck_ring_enqueue_spsc_##name(a, b, c)
#define CK_RING_ENQUEUE_SPSC_SIZE(name, a, b, c, d)	\
	ck_ring_enqueue_spsc_size_##name(a, b, c, d)
#define CK_RING_DEQUEUE_SPSC(name, a, b, c)		\
	ck_ring_dequeue_spsc_##name(a, b, c)

/*
 * A single producer with any number of concurrent consumers.
 */
#define CK_RING_ENQUEUE_SPMC(name, a, b, c)		\
	ck_ring_enqueue_spmc_##name(a, b, c)
#define CK_RING_ENQUEUE_SPMC_SIZE(name, a, b, c, d)	\
	ck_ring_enqueue_spmc_size_##name(a, b, c, d)
#define CK_RING_TRYDEQUEUE_SPMC(name, a, b, c)		\
	ck_ring_trydequeue_spmc_##name(a, b, c)
#define CK_RING_DEQUEUE_SPMC(name, a, b, c)		\
	ck_ring_dequeue_spmc_##name(a, b, c)

/*
 * Any number of concurrent producers with up to one
 * concurrent consumer.
 */
#define CK_RING_ENQUEUE_MPSC(name, a, b, c)		\
	ck_ring_enqueue_mpsc_##name(a, b, c)
#define CK_RING_ENQUEUE_MPSC_SIZE(name, a, b, c, d)	\
	ck_ring_enqueue_mpsc_size_##name(a, b, c, d)
#define CK_RING_DEQUEUE_MPSC(name, a, b, c)		\
	ck_ring_dequeue_mpsc_##name(a, b, c)

/*
 * Any number of concurrent producers and consumers.
 */
#define CK_RING_ENQUEUE_MPMC(name, a, b, c)		\
	ck_ring_enqueue_mpmc_##name(a, b, c)
#define CK_RING_ENQUEUE_MPMC_SIZE(name, a, b, c, d)	\
	ck_ring_enqueue_mpmc_size_##name(a, b, c, d)
#define CK_RING_TRYDEQUEUE_MPMC(name, a, b, c)		\
	ck_ring_trydequeue_mpmc_##name(a, b, c)
#define CK_RING_DEQUEUE_MPMC(name, a, b, c)		\
	ck_ring_dequeue_mpmc_##name(a, b, c)

#endif /* CK_RING_H */