1 /* 2 * Copyright 2009-2015 Samy Al Bahra. 3 * All rights reserved. 4 * 5 * Redistribution and use in source and binary forms, with or without 6 * modification, are permitted provided that the following conditions 7 * are met: 8 * 1. Redistributions of source code must retain the above copyright 9 * notice, this list of conditions and the following disclaimer. 10 * 2. Redistributions in binary form must reproduce the above copyright 11 * notice, this list of conditions and the following disclaimer in the 12 * documentation and/or other materials provided with the distribution. 13 * 14 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND 15 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 16 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 17 * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE 18 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL 19 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS 20 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) 21 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 22 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY 23 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF 24 * SUCH DAMAGE. 25 */ 26 27 #ifndef CK_RING_H 28 #define CK_RING_H 29 30 #include <ck_cc.h> 31 #include <ck_md.h> 32 #include <ck_pr.h> 33 #include <ck_stdbool.h> 34 #include <ck_string.h> 35 36 /* 37 * Concurrent ring buffer. 38 */ 39 40 struct ck_ring { 41 unsigned int c_head; 42 char pad[CK_MD_CACHELINE - sizeof(unsigned int)]; 43 unsigned int p_tail; 44 unsigned int p_head; 45 char _pad[CK_MD_CACHELINE - sizeof(unsigned int) * 2]; 46 unsigned int size; 47 unsigned int mask; 48 }; 49 typedef struct ck_ring ck_ring_t; 50 51 struct ck_ring_buffer { 52 void *value; 53 }; 54 typedef struct ck_ring_buffer ck_ring_buffer_t; 55 56 CK_CC_INLINE static unsigned int 57 ck_ring_size(const struct ck_ring *ring) 58 { 59 unsigned int c, p; 60 61 c = ck_pr_load_uint(&ring->c_head); 62 p = ck_pr_load_uint(&ring->p_tail); 63 return (p - c) & ring->mask; 64 } 65 66 CK_CC_INLINE static unsigned int 67 ck_ring_capacity(const struct ck_ring *ring) 68 { 69 return ring->size; 70 } 71 72 CK_CC_INLINE static void 73 ck_ring_init(struct ck_ring *ring, unsigned int size) 74 { 75 76 ring->size = size; 77 ring->mask = size - 1; 78 ring->p_tail = 0; 79 ring->p_head = 0; 80 ring->c_head = 0; 81 return; 82 } 83 84 /* 85 * The _ck_ring_* namespace is internal only and must not used externally. 86 */ 87 CK_CC_FORCE_INLINE static bool 88 _ck_ring_enqueue_sp(struct ck_ring *ring, 89 void *CK_CC_RESTRICT buffer, 90 const void *CK_CC_RESTRICT entry, 91 unsigned int ts, 92 unsigned int *size) 93 { 94 const unsigned int mask = ring->mask; 95 unsigned int consumer, producer, delta; 96 97 consumer = ck_pr_load_uint(&ring->c_head); 98 producer = ring->p_tail; 99 delta = producer + 1; 100 if (size != NULL) 101 *size = (producer - consumer) & mask; 102 103 if (CK_CC_UNLIKELY((delta & mask) == (consumer & mask))) 104 return false; 105 106 buffer = (char *)buffer + ts * (producer & mask); 107 memcpy(buffer, entry, ts); 108 109 /* 110 * Make sure to update slot value before indicating 111 * that the slot is available for consumption. 112 */ 113 ck_pr_fence_store(); 114 ck_pr_store_uint(&ring->p_tail, delta); 115 return true; 116 } 117 118 CK_CC_FORCE_INLINE static bool 119 _ck_ring_enqueue_sp_size(struct ck_ring *ring, 120 void *CK_CC_RESTRICT buffer, 121 const void *CK_CC_RESTRICT entry, 122 unsigned int ts, 123 unsigned int *size) 124 { 125 unsigned int sz; 126 bool r; 127 128 r = _ck_ring_enqueue_sp(ring, buffer, entry, ts, &sz); 129 *size = sz; 130 return r; 131 } 132 133 CK_CC_FORCE_INLINE static bool 134 _ck_ring_dequeue_sc(struct ck_ring *ring, 135 const void *CK_CC_RESTRICT buffer, 136 void *CK_CC_RESTRICT target, 137 unsigned int size) 138 { 139 const unsigned int mask = ring->mask; 140 unsigned int consumer, producer; 141 142 consumer = ring->c_head; 143 producer = ck_pr_load_uint(&ring->p_tail); 144 145 if (CK_CC_UNLIKELY(consumer == producer)) 146 return false; 147 148 /* 149 * Make sure to serialize with respect to our snapshot 150 * of the producer counter. 151 */ 152 ck_pr_fence_load(); 153 154 buffer = (const char *)buffer + size * (consumer & mask); 155 memcpy(target, buffer, size); 156 157 /* 158 * Make sure copy is completed with respect to consumer 159 * update. 160 */ 161 ck_pr_fence_store(); 162 ck_pr_store_uint(&ring->c_head, consumer + 1); 163 return true; 164 } 165 166 CK_CC_FORCE_INLINE static bool 167 _ck_ring_enqueue_mp(struct ck_ring *ring, 168 void *buffer, 169 const void *entry, 170 unsigned int ts, 171 unsigned int *size) 172 { 173 const unsigned int mask = ring->mask; 174 unsigned int producer, consumer, delta; 175 bool r = true; 176 177 producer = ck_pr_load_uint(&ring->p_head); 178 179 do { 180 /* 181 * The snapshot of producer must be up to date with 182 * respect to consumer. 183 */ 184 ck_pr_fence_load(); 185 consumer = ck_pr_load_uint(&ring->c_head); 186 187 delta = producer + 1; 188 if (CK_CC_UNLIKELY((delta & mask) == (consumer & mask))) { 189 r = false; 190 goto leave; 191 } 192 } while (ck_pr_cas_uint_value(&ring->p_head, 193 producer, 194 delta, 195 &producer) == false); 196 197 buffer = (char *)buffer + ts * (producer & mask); 198 memcpy(buffer, entry, ts); 199 200 /* 201 * Wait until all concurrent producers have completed writing 202 * their data into the ring buffer. 203 */ 204 while (ck_pr_load_uint(&ring->p_tail) != producer) 205 ck_pr_stall(); 206 207 /* 208 * Ensure that copy is completed before updating shared producer 209 * counter. 210 */ 211 ck_pr_fence_store(); 212 ck_pr_store_uint(&ring->p_tail, delta); 213 214 leave: 215 if (size != NULL) 216 *size = (producer - consumer) & mask; 217 218 return r; 219 } 220 221 CK_CC_FORCE_INLINE static bool 222 _ck_ring_enqueue_mp_size(struct ck_ring *ring, 223 void *buffer, 224 const void *entry, 225 unsigned int ts, 226 unsigned int *size) 227 { 228 unsigned int sz; 229 bool r; 230 231 r = _ck_ring_enqueue_mp(ring, buffer, entry, ts, &sz); 232 *size = sz; 233 return r; 234 } 235 236 CK_CC_FORCE_INLINE static bool 237 _ck_ring_trydequeue_mc(struct ck_ring *ring, 238 const void *buffer, 239 void *data, 240 unsigned int size) 241 { 242 const unsigned int mask = ring->mask; 243 unsigned int consumer, producer; 244 245 consumer = ck_pr_load_uint(&ring->c_head); 246 ck_pr_fence_load(); 247 producer = ck_pr_load_uint(&ring->p_tail); 248 249 if (CK_CC_UNLIKELY(consumer == producer)) 250 return false; 251 252 ck_pr_fence_load(); 253 254 buffer = (const char *)buffer + size * (consumer & mask); 255 memcpy(data, buffer, size); 256 257 ck_pr_fence_store_atomic(); 258 return ck_pr_cas_uint(&ring->c_head, consumer, consumer + 1); 259 } 260 261 CK_CC_FORCE_INLINE static bool 262 _ck_ring_dequeue_mc(struct ck_ring *ring, 263 const void *buffer, 264 void *data, 265 unsigned int ts) 266 { 267 const unsigned int mask = ring->mask; 268 unsigned int consumer, producer; 269 270 consumer = ck_pr_load_uint(&ring->c_head); 271 272 do { 273 const char *target; 274 275 /* 276 * Producer counter must represent state relative to 277 * our latest consumer snapshot. 278 */ 279 ck_pr_fence_load(); 280 producer = ck_pr_load_uint(&ring->p_tail); 281 282 if (CK_CC_UNLIKELY(consumer == producer)) 283 return false; 284 285 ck_pr_fence_load(); 286 287 target = (const char *)buffer + ts * (consumer & mask); 288 memcpy(data, target, ts); 289 290 /* Serialize load with respect to head update. */ 291 ck_pr_fence_store_atomic(); 292 } while (ck_pr_cas_uint_value(&ring->c_head, 293 consumer, 294 consumer + 1, 295 &consumer) == false); 296 297 return true; 298 } 299 300 /* 301 * The ck_ring_*_spsc namespace is the public interface for interacting with a 302 * ring buffer containing pointers. Correctness is only provided if there is up 303 * to one concurrent consumer and up to one concurrent producer. 304 */ 305 CK_CC_INLINE static bool 306 ck_ring_enqueue_spsc_size(struct ck_ring *ring, 307 struct ck_ring_buffer *buffer, 308 const void *entry, 309 unsigned int *size) 310 { 311 312 return _ck_ring_enqueue_sp_size(ring, buffer, &entry, 313 sizeof(entry), size); 314 } 315 316 CK_CC_INLINE static bool 317 ck_ring_enqueue_spsc(struct ck_ring *ring, 318 struct ck_ring_buffer *buffer, 319 const void *entry) 320 { 321 322 return _ck_ring_enqueue_sp(ring, buffer, 323 &entry, sizeof(entry), NULL); 324 } 325 326 CK_CC_INLINE static bool 327 ck_ring_dequeue_spsc(struct ck_ring *ring, 328 const struct ck_ring_buffer *buffer, 329 void *data) 330 { 331 332 return _ck_ring_dequeue_sc(ring, buffer, 333 (void **)data, sizeof(void *)); 334 } 335 336 /* 337 * The ck_ring_*_mpmc namespace is the public interface for interacting with a 338 * ring buffer containing pointers. Correctness is provided for any number of 339 * producers and consumers. 340 */ 341 CK_CC_INLINE static bool 342 ck_ring_enqueue_mpmc(struct ck_ring *ring, 343 struct ck_ring_buffer *buffer, 344 const void *entry) 345 { 346 347 return _ck_ring_enqueue_mp(ring, buffer, &entry, 348 sizeof(entry), NULL); 349 } 350 351 CK_CC_INLINE static bool 352 ck_ring_enqueue_mpmc_size(struct ck_ring *ring, 353 struct ck_ring_buffer *buffer, 354 const void *entry, 355 unsigned int *size) 356 { 357 358 return _ck_ring_enqueue_mp_size(ring, buffer, &entry, 359 sizeof(entry), size); 360 } 361 362 CK_CC_INLINE static bool 363 ck_ring_trydequeue_mpmc(struct ck_ring *ring, 364 const struct ck_ring_buffer *buffer, 365 void *data) 366 { 367 368 return _ck_ring_trydequeue_mc(ring, 369 buffer, (void **)data, sizeof(void *)); 370 } 371 372 CK_CC_INLINE static bool 373 ck_ring_dequeue_mpmc(struct ck_ring *ring, 374 const struct ck_ring_buffer *buffer, 375 void *data) 376 { 377 378 return _ck_ring_dequeue_mc(ring, buffer, (void **)data, 379 sizeof(void *)); 380 } 381 382 /* 383 * The ck_ring_*_spmc namespace is the public interface for interacting with a 384 * ring buffer containing pointers. Correctness is provided for any number of 385 * consumers with up to one concurrent producer. 386 */ 387 CK_CC_INLINE static bool 388 ck_ring_enqueue_spmc_size(struct ck_ring *ring, 389 struct ck_ring_buffer *buffer, 390 const void *entry, 391 unsigned int *size) 392 { 393 394 return _ck_ring_enqueue_sp_size(ring, buffer, &entry, 395 sizeof(entry), size); 396 } 397 398 CK_CC_INLINE static bool 399 ck_ring_enqueue_spmc(struct ck_ring *ring, 400 struct ck_ring_buffer *buffer, 401 const void *entry) 402 { 403 404 return _ck_ring_enqueue_sp(ring, buffer, &entry, 405 sizeof(entry), NULL); 406 } 407 408 CK_CC_INLINE static bool 409 ck_ring_trydequeue_spmc(struct ck_ring *ring, 410 const struct ck_ring_buffer *buffer, 411 void *data) 412 { 413 414 return _ck_ring_trydequeue_mc(ring, buffer, (void **)data, sizeof(void *)); 415 } 416 417 CK_CC_INLINE static bool 418 ck_ring_dequeue_spmc(struct ck_ring *ring, 419 const struct ck_ring_buffer *buffer, 420 void *data) 421 { 422 423 return _ck_ring_dequeue_mc(ring, buffer, (void **)data, sizeof(void *)); 424 } 425 426 /* 427 * The ck_ring_*_mpsc namespace is the public interface for interacting with a 428 * ring buffer containing pointers. Correctness is provided for any number of 429 * producers with up to one concurrent consumers. 430 */ 431 CK_CC_INLINE static bool 432 ck_ring_enqueue_mpsc(struct ck_ring *ring, 433 struct ck_ring_buffer *buffer, 434 const void *entry) 435 { 436 437 return _ck_ring_enqueue_mp(ring, buffer, &entry, 438 sizeof(entry), NULL); 439 } 440 441 CK_CC_INLINE static bool 442 ck_ring_enqueue_mpsc_size(struct ck_ring *ring, 443 struct ck_ring_buffer *buffer, 444 const void *entry, 445 unsigned int *size) 446 { 447 448 return _ck_ring_enqueue_mp_size(ring, buffer, &entry, 449 sizeof(entry), size); 450 } 451 452 CK_CC_INLINE static bool 453 ck_ring_dequeue_mpsc(struct ck_ring *ring, 454 const struct ck_ring_buffer *buffer, 455 void *data) 456 { 457 458 return _ck_ring_dequeue_sc(ring, buffer, (void **)data, 459 sizeof(void *)); 460 } 461 462 /* 463 * CK_RING_PROTOTYPE is used to define a type-safe interface for inlining 464 * values of a particular type in the ring the buffer. 465 */ 466 #define CK_RING_PROTOTYPE(name, type) \ 467 CK_CC_INLINE static bool \ 468 ck_ring_enqueue_spsc_size_##name(struct ck_ring *a, \ 469 struct type *b, \ 470 struct type *c, \ 471 unsigned int *d) \ 472 { \ 473 \ 474 return _ck_ring_enqueue_sp_size(a, b, c, \ 475 sizeof(struct type), d); \ 476 } \ 477 \ 478 CK_CC_INLINE static bool \ 479 ck_ring_enqueue_spsc_##name(struct ck_ring *a, \ 480 struct type *b, \ 481 struct type *c) \ 482 { \ 483 \ 484 return _ck_ring_enqueue_sp(a, b, c, \ 485 sizeof(struct type), NULL); \ 486 } \ 487 \ 488 CK_CC_INLINE static bool \ 489 ck_ring_dequeue_spsc_##name(struct ck_ring *a, \ 490 struct type *b, \ 491 struct type *c) \ 492 { \ 493 \ 494 return _ck_ring_dequeue_sc(a, b, c, \ 495 sizeof(struct type)); \ 496 } \ 497 \ 498 CK_CC_INLINE static bool \ 499 ck_ring_enqueue_spmc_size_##name(struct ck_ring *a, \ 500 struct type *b, \ 501 struct type *c, \ 502 unsigned int *d) \ 503 { \ 504 \ 505 return _ck_ring_enqueue_sp_size(a, b, c, \ 506 sizeof(struct type), d); \ 507 } \ 508 \ 509 CK_CC_INLINE static bool \ 510 ck_ring_enqueue_spmc_##name(struct ck_ring *a, \ 511 struct type *b, \ 512 struct type *c) \ 513 { \ 514 \ 515 return _ck_ring_enqueue_sp(a, b, c, \ 516 sizeof(struct type), NULL); \ 517 } \ 518 \ 519 CK_CC_INLINE static bool \ 520 ck_ring_trydequeue_spmc_##name(struct ck_ring *a, \ 521 struct type *b, \ 522 struct type *c) \ 523 { \ 524 \ 525 return _ck_ring_trydequeue_mc(a, \ 526 b, c, sizeof(struct type)); \ 527 } \ 528 \ 529 CK_CC_INLINE static bool \ 530 ck_ring_dequeue_spmc_##name(struct ck_ring *a, \ 531 struct type *b, \ 532 struct type *c) \ 533 { \ 534 \ 535 return _ck_ring_dequeue_mc(a, b, c, \ 536 sizeof(struct type)); \ 537 } \ 538 \ 539 CK_CC_INLINE static bool \ 540 ck_ring_enqueue_mpsc_##name(struct ck_ring *a, \ 541 struct type *b, \ 542 struct type *c) \ 543 { \ 544 \ 545 return _ck_ring_enqueue_mp(a, b, c, \ 546 sizeof(struct type), NULL); \ 547 } \ 548 \ 549 CK_CC_INLINE static bool \ 550 ck_ring_enqueue_mpsc_size_##name(struct ck_ring *a, \ 551 struct type *b, \ 552 struct type *c, \ 553 unsigned int *d) \ 554 { \ 555 \ 556 return _ck_ring_enqueue_mp_size(a, b, c, \ 557 sizeof(struct type), d); \ 558 } \ 559 \ 560 CK_CC_INLINE static bool \ 561 ck_ring_dequeue_mpsc_##name(struct ck_ring *a, \ 562 struct type *b, \ 563 struct type *c) \ 564 { \ 565 \ 566 return _ck_ring_dequeue_sc(a, b, c, \ 567 sizeof(struct type)); \ 568 } \ 569 \ 570 CK_CC_INLINE static bool \ 571 ck_ring_enqueue_mpmc_size_##name(struct ck_ring *a, \ 572 struct type *b, \ 573 struct type *c, \ 574 unsigned int *d) \ 575 { \ 576 \ 577 return _ck_ring_enqueue_mp_size(a, b, c, \ 578 sizeof(struct type), d); \ 579 } \ 580 \ 581 CK_CC_INLINE static bool \ 582 ck_ring_enqueue_mpmc_##name(struct ck_ring *a, \ 583 struct type *b, \ 584 struct type *c) \ 585 { \ 586 \ 587 return _ck_ring_enqueue_mp(a, b, c, \ 588 sizeof(struct type), NULL); \ 589 } \ 590 \ 591 CK_CC_INLINE static bool \ 592 ck_ring_trydequeue_mpmc_##name(struct ck_ring *a, \ 593 struct type *b, \ 594 struct type *c) \ 595 { \ 596 \ 597 return _ck_ring_trydequeue_mc(a, \ 598 b, c, sizeof(struct type)); \ 599 } \ 600 \ 601 CK_CC_INLINE static bool \ 602 ck_ring_dequeue_mpmc_##name(struct ck_ring *a, \ 603 struct type *b, \ 604 struct type *c) \ 605 { \ 606 \ 607 return _ck_ring_dequeue_mc(a, b, c, \ 608 sizeof(struct type)); \ 609 } 610 611 /* 612 * A single producer with one concurrent consumer. 613 */ 614 #define CK_RING_ENQUEUE_SPSC(name, a, b, c) \ 615 ck_ring_enqueue_spsc_##name(a, b, c) 616 #define CK_RING_ENQUEUE_SPSC_SIZE(name, a, b, c, d) \ 617 ck_ring_enqueue_spsc_size_##name(a, b, c, d) 618 #define CK_RING_DEQUEUE_SPSC(name, a, b, c) \ 619 ck_ring_dequeue_spsc_##name(a, b, c) 620 621 /* 622 * A single producer with any number of concurrent consumers. 623 */ 624 #define CK_RING_ENQUEUE_SPMC(name, a, b, c) \ 625 ck_ring_enqueue_spmc_##name(a, b, c) 626 #define CK_RING_ENQUEUE_SPMC_SIZE(name, a, b, c, d) \ 627 ck_ring_enqueue_spmc_size_##name(a, b, c, d) 628 #define CK_RING_TRYDEQUEUE_SPMC(name, a, b, c) \ 629 ck_ring_trydequeue_spmc_##name(a, b, c) 630 #define CK_RING_DEQUEUE_SPMC(name, a, b, c) \ 631 ck_ring_dequeue_spmc_##name(a, b, c) 632 633 /* 634 * Any number of concurrent producers with up to one 635 * concurrent consumer. 636 */ 637 #define CK_RING_ENQUEUE_MPSC(name, a, b, c) \ 638 ck_ring_enqueue_mpsc_##name(a, b, c) 639 #define CK_RING_ENQUEUE_MPSC_SIZE(name, a, b, c, d) \ 640 ck_ring_enqueue_mpsc_size_##name(a, b, c, d) 641 #define CK_RING_DEQUEUE_MPSC(name, a, b, c) \ 642 ck_ring_dequeue_mpsc_##name(a, b, c) 643 644 /* 645 * Any number of concurrent producers and consumers. 646 */ 647 #define CK_RING_ENQUEUE_MPMC(name, a, b, c) \ 648 ck_ring_enqueue_mpmc_##name(a, b, c) 649 #define CK_RING_ENQUEUE_MPMC_SIZE(name, a, b, c, d) \ 650 ck_ring_enqueue_mpmc_size_##name(a, b, c, d) 651 #define CK_RING_TRYDEQUEUE_MPMC(name, a, b, c) \ 652 ck_ring_trydequeue_mpmc_##name(a, b, c) 653 #define CK_RING_DEQUEUE_MPMC(name, a, b, c) \ 654 ck_ring_dequeue_mpmc_##name(a, b, c) 655 656 #endif /* CK_RING_H */ 657