1 /*
2 * Copyright 2009-2015 Samy Al Bahra.
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions
7 * are met:
8 * 1. Redistributions of source code must retain the above copyright
9 * notice, this list of conditions and the following disclaimer.
10 * 2. Redistributions in binary form must reproduce the above copyright
11 * notice, this list of conditions and the following disclaimer in the
12 * documentation and/or other materials provided with the distribution.
13 *
14 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
15 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
16 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
17 * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
18 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
19 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
20 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
21 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
22 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
23 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
24 * SUCH DAMAGE.
25 */
26
27 #ifndef CK_RING_H
28 #define CK_RING_H
29
30 #include <ck_cc.h>
31 #include <ck_md.h>
32 #include <ck_pr.h>
33 #include <ck_stdbool.h>
34 #include <ck_string.h>
35
36 /*
37 * Concurrent ring buffer.
38 */
39
40 struct ck_ring {
41 unsigned int c_head;
42 char pad[CK_MD_CACHELINE - sizeof(unsigned int)];
43 unsigned int p_tail;
44 unsigned int p_head;
45 char _pad[CK_MD_CACHELINE - sizeof(unsigned int) * 2];
46 unsigned int size;
47 unsigned int mask;
48 };
49 typedef struct ck_ring ck_ring_t;
50
51 struct ck_ring_buffer {
52 void *value;
53 };
54 typedef struct ck_ring_buffer ck_ring_buffer_t;
55
56 CK_CC_INLINE static unsigned int
ck_ring_size(const struct ck_ring * ring)57 ck_ring_size(const struct ck_ring *ring)
58 {
59 unsigned int c, p;
60
61 c = ck_pr_load_uint(&ring->c_head);
62 p = ck_pr_load_uint(&ring->p_tail);
63 return (p - c) & ring->mask;
64 }
65
66 CK_CC_INLINE static unsigned int
ck_ring_capacity(const struct ck_ring * ring)67 ck_ring_capacity(const struct ck_ring *ring)
68 {
69
70 return ring->size;
71 }
72
73 /*
74 * This function is only safe to call when there are no concurrent operations
75 * on the ring. This is primarily meant for persistent ck_ring use-cases. The
76 * function returns true if any mutations were performed on the ring.
77 */
78 CK_CC_INLINE static bool
ck_ring_repair(struct ck_ring * ring)79 ck_ring_repair(struct ck_ring *ring)
80 {
81 bool r = false;
82
83 if (ring->p_tail != ring->p_head) {
84 ring->p_tail = ring->p_head;
85 r = true;
86 }
87
88 return r;
89 }
90
91 /*
92 * This can be called when no concurrent updates are occurring on the ring
93 * structure to check for consistency. This is primarily meant to be used for
94 * persistent storage of the ring. If this functions returns false, the ring
95 * is in an inconsistent state.
96 */
97 CK_CC_INLINE static bool
ck_ring_valid(const struct ck_ring * ring)98 ck_ring_valid(const struct ck_ring *ring)
99 {
100 unsigned int size = ring->size;
101 unsigned int c_head = ring->c_head;
102 unsigned int p_head = ring->p_head;
103
104 /* The ring must be a power of 2. */
105 if (size & (size - 1))
106 return false;
107
108 /* The consumer counter must always be smaller than the producer. */
109 if (c_head > p_head)
110 return false;
111
112 /* The producer may only be up to size slots ahead of consumer. */
113 if (p_head - c_head >= size)
114 return false;
115
116 return true;
117 }
118
119 CK_CC_INLINE static void
ck_ring_init(struct ck_ring * ring,unsigned int size)120 ck_ring_init(struct ck_ring *ring, unsigned int size)
121 {
122
123 ring->size = size;
124 ring->mask = size - 1;
125 ring->p_tail = 0;
126 ring->p_head = 0;
127 ring->c_head = 0;
128 return;
129 }
130
131 /*
132 * The _ck_ring_* namespace is internal only and must not used externally.
133 */
134
135 /*
136 * This function will return a region of memory to write for the next value
137 * for a single producer.
138 */
139 CK_CC_FORCE_INLINE static void *
_ck_ring_enqueue_reserve_sp(struct ck_ring * ring,void * CK_CC_RESTRICT buffer,unsigned int ts,unsigned int * size)140 _ck_ring_enqueue_reserve_sp(struct ck_ring *ring,
141 void *CK_CC_RESTRICT buffer,
142 unsigned int ts,
143 unsigned int *size)
144 {
145 const unsigned int mask = ring->mask;
146 unsigned int consumer, producer, delta;
147
148 consumer = ck_pr_load_uint(&ring->c_head);
149 producer = ring->p_tail;
150 delta = producer + 1;
151 if (size != NULL)
152 *size = (producer - consumer) & mask;
153
154 if (CK_CC_UNLIKELY((delta & mask) == (consumer & mask)))
155 return NULL;
156
157 return (char *)buffer + ts * (producer & mask);
158 }
159
160 /*
161 * This is to be called to commit and make visible a region of previously
162 * reserved with reverse_sp.
163 */
164 CK_CC_FORCE_INLINE static void
_ck_ring_enqueue_commit_sp(struct ck_ring * ring)165 _ck_ring_enqueue_commit_sp(struct ck_ring *ring)
166 {
167
168 ck_pr_fence_store();
169 ck_pr_store_uint(&ring->p_tail, ring->p_tail + 1);
170 return;
171 }
172
173 CK_CC_FORCE_INLINE static bool
_ck_ring_enqueue_sp(struct ck_ring * ring,void * CK_CC_RESTRICT buffer,const void * CK_CC_RESTRICT entry,unsigned int ts,unsigned int * size)174 _ck_ring_enqueue_sp(struct ck_ring *ring,
175 void *CK_CC_RESTRICT buffer,
176 const void *CK_CC_RESTRICT entry,
177 unsigned int ts,
178 unsigned int *size)
179 {
180 const unsigned int mask = ring->mask;
181 unsigned int consumer, producer, delta;
182
183 consumer = ck_pr_load_uint(&ring->c_head);
184 producer = ring->p_tail;
185 delta = producer + 1;
186 if (size != NULL)
187 *size = (producer - consumer) & mask;
188
189 if (CK_CC_UNLIKELY((delta & mask) == (consumer & mask)))
190 return false;
191
192 buffer = (char *)buffer + ts * (producer & mask);
193 memcpy(buffer, entry, ts);
194
195 /*
196 * Make sure to update slot value before indicating
197 * that the slot is available for consumption.
198 */
199 ck_pr_fence_store();
200 ck_pr_store_uint(&ring->p_tail, delta);
201 return true;
202 }
203
204 CK_CC_FORCE_INLINE static bool
_ck_ring_enqueue_sp_size(struct ck_ring * ring,void * CK_CC_RESTRICT buffer,const void * CK_CC_RESTRICT entry,unsigned int ts,unsigned int * size)205 _ck_ring_enqueue_sp_size(struct ck_ring *ring,
206 void *CK_CC_RESTRICT buffer,
207 const void *CK_CC_RESTRICT entry,
208 unsigned int ts,
209 unsigned int *size)
210 {
211 unsigned int sz;
212 bool r;
213
214 r = _ck_ring_enqueue_sp(ring, buffer, entry, ts, &sz);
215 *size = sz;
216 return r;
217 }
218
219 CK_CC_FORCE_INLINE static bool
_ck_ring_dequeue_sc(struct ck_ring * ring,const void * CK_CC_RESTRICT buffer,void * CK_CC_RESTRICT target,unsigned int size)220 _ck_ring_dequeue_sc(struct ck_ring *ring,
221 const void *CK_CC_RESTRICT buffer,
222 void *CK_CC_RESTRICT target,
223 unsigned int size)
224 {
225 const unsigned int mask = ring->mask;
226 unsigned int consumer, producer;
227
228 consumer = ring->c_head;
229 producer = ck_pr_load_uint(&ring->p_tail);
230
231 if (CK_CC_UNLIKELY(consumer == producer))
232 return false;
233
234 /*
235 * Make sure to serialize with respect to our snapshot
236 * of the producer counter.
237 */
238 ck_pr_fence_load();
239
240 buffer = (const char *)buffer + size * (consumer & mask);
241 memcpy(target, buffer, size);
242
243 /*
244 * Make sure copy is completed with respect to consumer
245 * update.
246 */
247 ck_pr_fence_store();
248 ck_pr_store_uint(&ring->c_head, consumer + 1);
249 return true;
250 }
251
252 CK_CC_FORCE_INLINE static void *
_ck_ring_enqueue_reserve_mp(struct ck_ring * ring,void * buffer,unsigned int ts,unsigned int * ticket,unsigned int * size)253 _ck_ring_enqueue_reserve_mp(struct ck_ring *ring,
254 void *buffer,
255 unsigned int ts,
256 unsigned int *ticket,
257 unsigned int *size)
258 {
259 const unsigned int mask = ring->mask;
260 unsigned int producer, consumer, delta;
261
262 producer = ck_pr_load_uint(&ring->p_head);
263
264 for (;;) {
265 ck_pr_fence_load();
266 consumer = ck_pr_load_uint(&ring->c_head);
267
268 delta = producer + 1;
269
270 if (CK_CC_LIKELY((producer - consumer) < mask)) {
271 if (ck_pr_cas_uint_value(&ring->p_head,
272 producer, delta, &producer) == true) {
273 break;
274 }
275 } else {
276 unsigned int new_producer;
277
278 ck_pr_fence_load();
279 new_producer = ck_pr_load_uint(&ring->p_head);
280
281 if (producer == new_producer) {
282 if (size != NULL)
283 *size = (producer - consumer) & mask;
284
285 return false;
286 }
287
288 producer = new_producer;
289 }
290 }
291
292 *ticket = producer;
293 if (size != NULL)
294 *size = (producer - consumer) & mask;
295
296 return (char *)buffer + ts * (producer & mask);
297 }
298
299 CK_CC_FORCE_INLINE static void
_ck_ring_enqueue_commit_mp(struct ck_ring * ring,unsigned int producer)300 _ck_ring_enqueue_commit_mp(struct ck_ring *ring, unsigned int producer)
301 {
302
303 while (ck_pr_load_uint(&ring->p_tail) != producer)
304 ck_pr_stall();
305
306 ck_pr_fence_store();
307 ck_pr_store_uint(&ring->p_tail, producer + 1);
308 return;
309 }
310
311 CK_CC_FORCE_INLINE static bool
_ck_ring_enqueue_mp(struct ck_ring * ring,void * buffer,const void * entry,unsigned int ts,unsigned int * size)312 _ck_ring_enqueue_mp(struct ck_ring *ring,
313 void *buffer,
314 const void *entry,
315 unsigned int ts,
316 unsigned int *size)
317 {
318 const unsigned int mask = ring->mask;
319 unsigned int producer, consumer, delta;
320 bool r = true;
321
322 producer = ck_pr_load_uint(&ring->p_head);
323
324 for (;;) {
325 /*
326 * The snapshot of producer must be up to date with respect to
327 * consumer.
328 */
329 ck_pr_fence_load();
330 consumer = ck_pr_load_uint(&ring->c_head);
331
332 delta = producer + 1;
333
334 /*
335 * Only try to CAS if the producer is not clearly stale (not
336 * less than consumer) and the buffer is definitely not full.
337 */
338 if (CK_CC_LIKELY((producer - consumer) < mask)) {
339 if (ck_pr_cas_uint_value(&ring->p_head,
340 producer, delta, &producer) == true) {
341 break;
342 }
343 } else {
344 unsigned int new_producer;
345
346 /*
347 * Slow path. Either the buffer is full or we have a
348 * stale snapshot of p_head. Execute a second read of
349 * p_read that must be ordered wrt the snapshot of
350 * c_head.
351 */
352 ck_pr_fence_load();
353 new_producer = ck_pr_load_uint(&ring->p_head);
354
355 /*
356 * Only fail if we haven't made forward progress in
357 * production: the buffer must have been full when we
358 * read new_producer (or we wrapped around UINT_MAX
359 * during this iteration).
360 */
361 if (producer == new_producer) {
362 r = false;
363 goto leave;
364 }
365
366 /*
367 * p_head advanced during this iteration. Try again.
368 */
369 producer = new_producer;
370 }
371 }
372
373 buffer = (char *)buffer + ts * (producer & mask);
374 memcpy(buffer, entry, ts);
375
376 /*
377 * Wait until all concurrent producers have completed writing
378 * their data into the ring buffer.
379 */
380 while (ck_pr_load_uint(&ring->p_tail) != producer)
381 ck_pr_stall();
382
383 /*
384 * Ensure that copy is completed before updating shared producer
385 * counter.
386 */
387 ck_pr_fence_store();
388 ck_pr_store_uint(&ring->p_tail, delta);
389
390 leave:
391 if (size != NULL)
392 *size = (producer - consumer) & mask;
393
394 return r;
395 }
396
397 CK_CC_FORCE_INLINE static bool
_ck_ring_enqueue_mp_size(struct ck_ring * ring,void * buffer,const void * entry,unsigned int ts,unsigned int * size)398 _ck_ring_enqueue_mp_size(struct ck_ring *ring,
399 void *buffer,
400 const void *entry,
401 unsigned int ts,
402 unsigned int *size)
403 {
404 unsigned int sz;
405 bool r;
406
407 r = _ck_ring_enqueue_mp(ring, buffer, entry, ts, &sz);
408 *size = sz;
409 return r;
410 }
411
412 CK_CC_FORCE_INLINE static bool
_ck_ring_trydequeue_mc(struct ck_ring * ring,const void * buffer,void * data,unsigned int size)413 _ck_ring_trydequeue_mc(struct ck_ring *ring,
414 const void *buffer,
415 void *data,
416 unsigned int size)
417 {
418 const unsigned int mask = ring->mask;
419 unsigned int consumer, producer;
420
421 consumer = ck_pr_load_uint(&ring->c_head);
422 ck_pr_fence_load();
423 producer = ck_pr_load_uint(&ring->p_tail);
424
425 if (CK_CC_UNLIKELY(consumer == producer))
426 return false;
427
428 ck_pr_fence_load();
429
430 buffer = (const char *)buffer + size * (consumer & mask);
431 memcpy(data, buffer, size);
432
433 ck_pr_fence_store_atomic();
434 return ck_pr_cas_uint(&ring->c_head, consumer, consumer + 1);
435 }
436
437 CK_CC_FORCE_INLINE static bool
_ck_ring_dequeue_mc(struct ck_ring * ring,const void * buffer,void * data,unsigned int ts)438 _ck_ring_dequeue_mc(struct ck_ring *ring,
439 const void *buffer,
440 void *data,
441 unsigned int ts)
442 {
443 const unsigned int mask = ring->mask;
444 unsigned int consumer, producer;
445
446 consumer = ck_pr_load_uint(&ring->c_head);
447
448 do {
449 const char *target;
450
451 /*
452 * Producer counter must represent state relative to
453 * our latest consumer snapshot.
454 */
455 ck_pr_fence_load();
456 producer = ck_pr_load_uint(&ring->p_tail);
457
458 if (CK_CC_UNLIKELY(consumer == producer))
459 return false;
460
461 ck_pr_fence_load();
462
463 target = (const char *)buffer + ts * (consumer & mask);
464 memcpy(data, target, ts);
465
466 /* Serialize load with respect to head update. */
467 ck_pr_fence_store_atomic();
468 } while (ck_pr_cas_uint_value(&ring->c_head,
469 consumer,
470 consumer + 1,
471 &consumer) == false);
472
473 return true;
474 }
475
476 /*
477 * The ck_ring_*_spsc namespace is the public interface for interacting with a
478 * ring buffer containing pointers. Correctness is only provided if there is up
479 * to one concurrent consumer and up to one concurrent producer.
480 */
481 CK_CC_INLINE static bool
ck_ring_enqueue_spsc_size(struct ck_ring * ring,struct ck_ring_buffer * buffer,const void * entry,unsigned int * size)482 ck_ring_enqueue_spsc_size(struct ck_ring *ring,
483 struct ck_ring_buffer *buffer,
484 const void *entry,
485 unsigned int *size)
486 {
487
488 return _ck_ring_enqueue_sp_size(ring, buffer, &entry,
489 sizeof(entry), size);
490 }
491
492 CK_CC_INLINE static bool
ck_ring_enqueue_spsc(struct ck_ring * ring,struct ck_ring_buffer * buffer,const void * entry)493 ck_ring_enqueue_spsc(struct ck_ring *ring,
494 struct ck_ring_buffer *buffer,
495 const void *entry)
496 {
497
498 return _ck_ring_enqueue_sp(ring, buffer,
499 &entry, sizeof(entry), NULL);
500 }
501
502 CK_CC_INLINE static void *
ck_ring_enqueue_reserve_spsc_size(struct ck_ring * ring,struct ck_ring_buffer * buffer,unsigned int * size)503 ck_ring_enqueue_reserve_spsc_size(struct ck_ring *ring,
504 struct ck_ring_buffer *buffer,
505 unsigned int *size)
506 {
507
508 return _ck_ring_enqueue_reserve_sp(ring, buffer, sizeof(void *),
509 size);
510 }
511
512 CK_CC_INLINE static void *
ck_ring_enqueue_reserve_spsc(struct ck_ring * ring,struct ck_ring_buffer * buffer)513 ck_ring_enqueue_reserve_spsc(struct ck_ring *ring,
514 struct ck_ring_buffer *buffer)
515 {
516
517 return _ck_ring_enqueue_reserve_sp(ring, buffer, sizeof(void *),
518 NULL);
519 }
520
521 CK_CC_INLINE static void
ck_ring_enqueue_commit_spsc(struct ck_ring * ring)522 ck_ring_enqueue_commit_spsc(struct ck_ring *ring)
523 {
524
525 _ck_ring_enqueue_commit_sp(ring);
526 return;
527 }
528
529 CK_CC_INLINE static bool
ck_ring_dequeue_spsc(struct ck_ring * ring,const struct ck_ring_buffer * buffer,void * data)530 ck_ring_dequeue_spsc(struct ck_ring *ring,
531 const struct ck_ring_buffer *buffer,
532 void *data)
533 {
534
535 return _ck_ring_dequeue_sc(ring, buffer,
536 (void **)data, sizeof(void *));
537 }
538
539 /*
540 * The ck_ring_*_mpmc namespace is the public interface for interacting with a
541 * ring buffer containing pointers. Correctness is provided for any number of
542 * producers and consumers.
543 */
544 CK_CC_INLINE static bool
ck_ring_enqueue_mpmc(struct ck_ring * ring,struct ck_ring_buffer * buffer,const void * entry)545 ck_ring_enqueue_mpmc(struct ck_ring *ring,
546 struct ck_ring_buffer *buffer,
547 const void *entry)
548 {
549
550 return _ck_ring_enqueue_mp(ring, buffer, &entry, sizeof(entry), NULL);
551 }
552
553 CK_CC_INLINE static bool
ck_ring_enqueue_mpmc_size(struct ck_ring * ring,struct ck_ring_buffer * buffer,const void * entry,unsigned int * size)554 ck_ring_enqueue_mpmc_size(struct ck_ring *ring,
555 struct ck_ring_buffer *buffer,
556 const void *entry,
557 unsigned int *size)
558 {
559
560 return _ck_ring_enqueue_mp_size(ring, buffer, &entry, sizeof(entry),
561 size);
562 }
563
564 CK_CC_INLINE static void *
ck_ring_enqueue_reserve_mpmc(struct ck_ring * ring,struct ck_ring_buffer * buffer,unsigned int * ticket)565 ck_ring_enqueue_reserve_mpmc(struct ck_ring *ring,
566 struct ck_ring_buffer *buffer,
567 unsigned int *ticket)
568 {
569
570 return _ck_ring_enqueue_reserve_mp(ring, buffer, sizeof(void *),
571 ticket, NULL);
572 }
573
574 CK_CC_INLINE static void *
ck_ring_enqueue_reserve_mpmc_size(struct ck_ring * ring,struct ck_ring_buffer * buffer,unsigned int * ticket,unsigned int * size)575 ck_ring_enqueue_reserve_mpmc_size(struct ck_ring *ring,
576 struct ck_ring_buffer *buffer,
577 unsigned int *ticket,
578 unsigned int *size)
579 {
580
581 return _ck_ring_enqueue_reserve_mp(ring, buffer, sizeof(void *),
582 ticket, size);
583 }
584
585 CK_CC_INLINE static void
ck_ring_enqueue_commit_mpmc(struct ck_ring * ring,unsigned int ticket)586 ck_ring_enqueue_commit_mpmc(struct ck_ring *ring, unsigned int ticket)
587 {
588
589 _ck_ring_enqueue_commit_mp(ring, ticket);
590 return;
591 }
592
593 CK_CC_INLINE static bool
ck_ring_trydequeue_mpmc(struct ck_ring * ring,const struct ck_ring_buffer * buffer,void * data)594 ck_ring_trydequeue_mpmc(struct ck_ring *ring,
595 const struct ck_ring_buffer *buffer,
596 void *data)
597 {
598
599 return _ck_ring_trydequeue_mc(ring,
600 buffer, (void **)data, sizeof(void *));
601 }
602
603 CK_CC_INLINE static bool
ck_ring_dequeue_mpmc(struct ck_ring * ring,const struct ck_ring_buffer * buffer,void * data)604 ck_ring_dequeue_mpmc(struct ck_ring *ring,
605 const struct ck_ring_buffer *buffer,
606 void *data)
607 {
608
609 return _ck_ring_dequeue_mc(ring, buffer, (void **)data,
610 sizeof(void *));
611 }
612
613 /*
614 * The ck_ring_*_spmc namespace is the public interface for interacting with a
615 * ring buffer containing pointers. Correctness is provided for any number of
616 * consumers with up to one concurrent producer.
617 */
618 CK_CC_INLINE static void *
ck_ring_enqueue_reserve_spmc_size(struct ck_ring * ring,struct ck_ring_buffer * buffer,unsigned int * size)619 ck_ring_enqueue_reserve_spmc_size(struct ck_ring *ring,
620 struct ck_ring_buffer *buffer,
621 unsigned int *size)
622 {
623
624 return _ck_ring_enqueue_reserve_sp(ring, buffer, sizeof(void *), size);
625 }
626
627 CK_CC_INLINE static void *
ck_ring_enqueue_reserve_spmc(struct ck_ring * ring,struct ck_ring_buffer * buffer)628 ck_ring_enqueue_reserve_spmc(struct ck_ring *ring,
629 struct ck_ring_buffer *buffer)
630 {
631
632 return _ck_ring_enqueue_reserve_sp(ring, buffer, sizeof(void *), NULL);
633 }
634
635 CK_CC_INLINE static void
ck_ring_enqueue_commit_spmc(struct ck_ring * ring)636 ck_ring_enqueue_commit_spmc(struct ck_ring *ring)
637 {
638
639 _ck_ring_enqueue_commit_sp(ring);
640 return;
641 }
642
643 CK_CC_INLINE static bool
ck_ring_enqueue_spmc_size(struct ck_ring * ring,struct ck_ring_buffer * buffer,const void * entry,unsigned int * size)644 ck_ring_enqueue_spmc_size(struct ck_ring *ring,
645 struct ck_ring_buffer *buffer,
646 const void *entry,
647 unsigned int *size)
648 {
649
650 return _ck_ring_enqueue_sp_size(ring, buffer, &entry,
651 sizeof(entry), size);
652 }
653
654 CK_CC_INLINE static bool
ck_ring_enqueue_spmc(struct ck_ring * ring,struct ck_ring_buffer * buffer,const void * entry)655 ck_ring_enqueue_spmc(struct ck_ring *ring,
656 struct ck_ring_buffer *buffer,
657 const void *entry)
658 {
659
660 return _ck_ring_enqueue_sp(ring, buffer, &entry,
661 sizeof(entry), NULL);
662 }
663
664 CK_CC_INLINE static bool
ck_ring_trydequeue_spmc(struct ck_ring * ring,const struct ck_ring_buffer * buffer,void * data)665 ck_ring_trydequeue_spmc(struct ck_ring *ring,
666 const struct ck_ring_buffer *buffer,
667 void *data)
668 {
669
670 return _ck_ring_trydequeue_mc(ring, buffer, (void **)data, sizeof(void *));
671 }
672
673 CK_CC_INLINE static bool
ck_ring_dequeue_spmc(struct ck_ring * ring,const struct ck_ring_buffer * buffer,void * data)674 ck_ring_dequeue_spmc(struct ck_ring *ring,
675 const struct ck_ring_buffer *buffer,
676 void *data)
677 {
678
679 return _ck_ring_dequeue_mc(ring, buffer, (void **)data, sizeof(void *));
680 }
681
682 /*
683 * The ck_ring_*_mpsc namespace is the public interface for interacting with a
684 * ring buffer containing pointers. Correctness is provided for any number of
685 * producers with up to one concurrent consumers.
686 */
687 CK_CC_INLINE static void *
ck_ring_enqueue_reserve_mpsc(struct ck_ring * ring,struct ck_ring_buffer * buffer,unsigned int * ticket)688 ck_ring_enqueue_reserve_mpsc(struct ck_ring *ring,
689 struct ck_ring_buffer *buffer,
690 unsigned int *ticket)
691 {
692
693 return _ck_ring_enqueue_reserve_mp(ring, buffer, sizeof(void *),
694 ticket, NULL);
695 }
696
697 CK_CC_INLINE static void *
ck_ring_enqueue_reserve_mpsc_size(struct ck_ring * ring,struct ck_ring_buffer * buffer,unsigned int * ticket,unsigned int * size)698 ck_ring_enqueue_reserve_mpsc_size(struct ck_ring *ring,
699 struct ck_ring_buffer *buffer,
700 unsigned int *ticket,
701 unsigned int *size)
702 {
703
704 return _ck_ring_enqueue_reserve_mp(ring, buffer, sizeof(void *),
705 ticket, size);
706 }
707
708 CK_CC_INLINE static void
ck_ring_enqueue_commit_mpsc(struct ck_ring * ring,unsigned int ticket)709 ck_ring_enqueue_commit_mpsc(struct ck_ring *ring, unsigned int ticket)
710 {
711
712 _ck_ring_enqueue_commit_mp(ring, ticket);
713 return;
714 }
715
716 CK_CC_INLINE static bool
ck_ring_enqueue_mpsc(struct ck_ring * ring,struct ck_ring_buffer * buffer,const void * entry)717 ck_ring_enqueue_mpsc(struct ck_ring *ring,
718 struct ck_ring_buffer *buffer,
719 const void *entry)
720 {
721
722 return _ck_ring_enqueue_mp(ring, buffer, &entry,
723 sizeof(entry), NULL);
724 }
725
726 CK_CC_INLINE static bool
ck_ring_enqueue_mpsc_size(struct ck_ring * ring,struct ck_ring_buffer * buffer,const void * entry,unsigned int * size)727 ck_ring_enqueue_mpsc_size(struct ck_ring *ring,
728 struct ck_ring_buffer *buffer,
729 const void *entry,
730 unsigned int *size)
731 {
732
733 return _ck_ring_enqueue_mp_size(ring, buffer, &entry,
734 sizeof(entry), size);
735 }
736
737 CK_CC_INLINE static bool
ck_ring_dequeue_mpsc(struct ck_ring * ring,const struct ck_ring_buffer * buffer,void * data)738 ck_ring_dequeue_mpsc(struct ck_ring *ring,
739 const struct ck_ring_buffer *buffer,
740 void *data)
741 {
742
743 return _ck_ring_dequeue_sc(ring, buffer, (void **)data,
744 sizeof(void *));
745 }
746
747 /*
748 * CK_RING_PROTOTYPE is used to define a type-safe interface for inlining
749 * values of a particular type in the ring the buffer.
750 */
751 #define CK_RING_PROTOTYPE(name, type) \
752 CK_CC_INLINE static struct type * \
753 ck_ring_enqueue_reserve_spsc_##name(struct ck_ring *a, \
754 struct type *b) \
755 { \
756 \
757 return _ck_ring_enqueue_reserve_sp(a, b, \
758 sizeof(struct type), NULL); \
759 } \
760 \
761 CK_CC_INLINE static struct type * \
762 ck_ring_enqueue_reserve_spsc_size_##name(struct ck_ring *a, \
763 struct type *b, \
764 unsigned int *c) \
765 { \
766 \
767 return _ck_ring_enqueue_reserve_sp(a, b, \
768 sizeof(struct type), c); \
769 } \
770 \
771 CK_CC_INLINE static bool \
772 ck_ring_enqueue_spsc_size_##name(struct ck_ring *a, \
773 struct type *b, \
774 struct type *c, \
775 unsigned int *d) \
776 { \
777 \
778 return _ck_ring_enqueue_sp_size(a, b, c, \
779 sizeof(struct type), d); \
780 } \
781 \
782 CK_CC_INLINE static bool \
783 ck_ring_enqueue_spsc_##name(struct ck_ring *a, \
784 struct type *b, \
785 struct type *c) \
786 { \
787 \
788 return _ck_ring_enqueue_sp(a, b, c, \
789 sizeof(struct type), NULL); \
790 } \
791 \
792 CK_CC_INLINE static bool \
793 ck_ring_dequeue_spsc_##name(struct ck_ring *a, \
794 struct type *b, \
795 struct type *c) \
796 { \
797 \
798 return _ck_ring_dequeue_sc(a, b, c, \
799 sizeof(struct type)); \
800 } \
801 \
802 CK_CC_INLINE static struct type * \
803 ck_ring_enqueue_reserve_spmc_##name(struct ck_ring *a, \
804 struct type *b) \
805 { \
806 \
807 return _ck_ring_enqueue_reserve_sp(a, b, \
808 sizeof(struct type), NULL); \
809 } \
810 \
811 CK_CC_INLINE static struct type * \
812 ck_ring_enqueue_reserve_spmc_size_##name(struct ck_ring *a, \
813 struct type *b, \
814 unsigned int *c) \
815 { \
816 \
817 return _ck_ring_enqueue_reserve_sp(a, b, \
818 sizeof(struct type), c); \
819 } \
820 \
821 CK_CC_INLINE static bool \
822 ck_ring_enqueue_spmc_size_##name(struct ck_ring *a, \
823 struct type *b, \
824 struct type *c, \
825 unsigned int *d) \
826 { \
827 \
828 return _ck_ring_enqueue_sp_size(a, b, c, \
829 sizeof(struct type), d); \
830 } \
831 \
832 CK_CC_INLINE static bool \
833 ck_ring_enqueue_spmc_##name(struct ck_ring *a, \
834 struct type *b, \
835 struct type *c) \
836 { \
837 \
838 return _ck_ring_enqueue_sp(a, b, c, \
839 sizeof(struct type), NULL); \
840 } \
841 \
842 CK_CC_INLINE static bool \
843 ck_ring_trydequeue_spmc_##name(struct ck_ring *a, \
844 struct type *b, \
845 struct type *c) \
846 { \
847 \
848 return _ck_ring_trydequeue_mc(a, \
849 b, c, sizeof(struct type)); \
850 } \
851 \
852 CK_CC_INLINE static bool \
853 ck_ring_dequeue_spmc_##name(struct ck_ring *a, \
854 struct type *b, \
855 struct type *c) \
856 { \
857 \
858 return _ck_ring_dequeue_mc(a, b, c, \
859 sizeof(struct type)); \
860 } \
861 \
862 CK_CC_INLINE static struct type * \
863 ck_ring_enqueue_reserve_mpsc_##name(struct ck_ring *a, \
864 struct type *b, \
865 unsigned int *c) \
866 { \
867 \
868 return _ck_ring_enqueue_reserve_mp(a, b, \
869 sizeof(struct type), c, NULL); \
870 } \
871 \
872 CK_CC_INLINE static struct type * \
873 ck_ring_enqueue_reserve_mpsc_size_##name(struct ck_ring *a, \
874 struct type *b, \
875 unsigned int *c, \
876 unsigned int *d) \
877 { \
878 \
879 return _ck_ring_enqueue_reserve_mp(a, b, \
880 sizeof(struct type), c, d); \
881 } \
882 \
883 CK_CC_INLINE static bool \
884 ck_ring_enqueue_mpsc_##name(struct ck_ring *a, \
885 struct type *b, \
886 struct type *c) \
887 { \
888 \
889 return _ck_ring_enqueue_mp(a, b, c, \
890 sizeof(struct type), NULL); \
891 } \
892 \
893 CK_CC_INLINE static bool \
894 ck_ring_enqueue_mpsc_size_##name(struct ck_ring *a, \
895 struct type *b, \
896 struct type *c, \
897 unsigned int *d) \
898 { \
899 \
900 return _ck_ring_enqueue_mp_size(a, b, c, \
901 sizeof(struct type), d); \
902 } \
903 \
904 CK_CC_INLINE static bool \
905 ck_ring_dequeue_mpsc_##name(struct ck_ring *a, \
906 struct type *b, \
907 struct type *c) \
908 { \
909 \
910 return _ck_ring_dequeue_sc(a, b, c, \
911 sizeof(struct type)); \
912 } \
913 \
914 CK_CC_INLINE static struct type * \
915 ck_ring_enqueue_reserve_mpmc_##name(struct ck_ring *a, \
916 struct type *b, \
917 unsigned int *c) \
918 { \
919 \
920 return _ck_ring_enqueue_reserve_mp(a, b, \
921 sizeof(struct type), c, NULL); \
922 } \
923 \
924 CK_CC_INLINE static struct type * \
925 ck_ring_enqueue_reserve_mpmc_size_##name(struct ck_ring *a, \
926 struct type *b, \
927 unsigned int *c, \
928 unsigned int *d) \
929 { \
930 \
931 return _ck_ring_enqueue_reserve_mp(a, b, \
932 sizeof(struct type), c, d); \
933 } \
934 \
935 CK_CC_INLINE static bool \
936 ck_ring_enqueue_mpmc_size_##name(struct ck_ring *a, \
937 struct type *b, \
938 struct type *c, \
939 unsigned int *d) \
940 { \
941 \
942 return _ck_ring_enqueue_mp_size(a, b, c, \
943 sizeof(struct type), d); \
944 } \
945 \
946 CK_CC_INLINE static bool \
947 ck_ring_enqueue_mpmc_##name(struct ck_ring *a, \
948 struct type *b, \
949 struct type *c) \
950 { \
951 \
952 return _ck_ring_enqueue_mp(a, b, c, \
953 sizeof(struct type), NULL); \
954 } \
955 \
956 CK_CC_INLINE static bool \
957 ck_ring_trydequeue_mpmc_##name(struct ck_ring *a, \
958 struct type *b, \
959 struct type *c) \
960 { \
961 \
962 return _ck_ring_trydequeue_mc(a, \
963 b, c, sizeof(struct type)); \
964 } \
965 \
966 CK_CC_INLINE static bool \
967 ck_ring_dequeue_mpmc_##name(struct ck_ring *a, \
968 struct type *b, \
969 struct type *c) \
970 { \
971 \
972 return _ck_ring_dequeue_mc(a, b, c, \
973 sizeof(struct type)); \
974 }
975
976 /*
977 * A single producer with one concurrent consumer.
978 */
979 #define CK_RING_ENQUEUE_SPSC(name, a, b, c) \
980 ck_ring_enqueue_spsc_##name(a, b, c)
981 #define CK_RING_ENQUEUE_SPSC_SIZE(name, a, b, c, d) \
982 ck_ring_enqueue_spsc_size_##name(a, b, c, d)
983 #define CK_RING_ENQUEUE_RESERVE_SPSC(name, a, b, c) \
984 ck_ring_enqueue_reserve_spsc_##name(a, b, c)
985 #define CK_RING_ENQUEUE_RESERVE_SPSC_SIZE(name, a, b, c, d) \
986 ck_ring_enqueue_reserve_spsc_size_##name(a, b, c, d)
987 #define CK_RING_DEQUEUE_SPSC(name, a, b, c) \
988 ck_ring_dequeue_spsc_##name(a, b, c)
989
990 /*
991 * A single producer with any number of concurrent consumers.
992 */
993 #define CK_RING_ENQUEUE_SPMC(name, a, b, c) \
994 ck_ring_enqueue_spmc_##name(a, b, c)
995 #define CK_RING_ENQUEUE_SPMC_SIZE(name, a, b, c, d) \
996 ck_ring_enqueue_spmc_size_##name(a, b, c, d)
997 #define CK_RING_ENQUEUE_RESERVE_SPMC(name, a, b, c) \
998 ck_ring_enqueue_reserve_spmc_##name(a, b, c)
999 #define CK_RING_ENQUEUE_RESERVE_SPMC_SIZE(name, a, b, c, d) \
1000 ck_ring_enqueue_reserve_spmc_size_##name(a, b, c, d)
1001 #define CK_RING_TRYDEQUEUE_SPMC(name, a, b, c) \
1002 ck_ring_trydequeue_spmc_##name(a, b, c)
1003 #define CK_RING_DEQUEUE_SPMC(name, a, b, c) \
1004 ck_ring_dequeue_spmc_##name(a, b, c)
1005
1006 /*
1007 * Any number of concurrent producers with up to one
1008 * concurrent consumer.
1009 */
1010 #define CK_RING_ENQUEUE_MPSC(name, a, b, c) \
1011 ck_ring_enqueue_mpsc_##name(a, b, c)
1012 #define CK_RING_ENQUEUE_MPSC_SIZE(name, a, b, c, d) \
1013 ck_ring_enqueue_mpsc_size_##name(a, b, c, d)
1014 #define CK_RING_ENQUEUE_RESERVE_MPSC(name, a, b, c) \
1015 ck_ring_enqueue_reserve_mpsc_##name(a, b, c)
1016 #define CK_RING_ENQUEUE_RESERVE_MPSC_SIZE(name, a, b, c, d) \
1017 ck_ring_enqueue_reserve_mpsc_size_##name(a, b, c, d)
1018 #define CK_RING_DEQUEUE_MPSC(name, a, b, c) \
1019 ck_ring_dequeue_mpsc_##name(a, b, c)
1020
1021 /*
1022 * Any number of concurrent producers and consumers.
1023 */
1024 #define CK_RING_ENQUEUE_MPMC(name, a, b, c) \
1025 ck_ring_enqueue_mpmc_##name(a, b, c)
1026 #define CK_RING_ENQUEUE_MPMC_SIZE(name, a, b, c, d) \
1027 ck_ring_enqueue_mpmc_size_##name(a, b, c, d)
1028 #define CK_RING_ENQUEUE_RESERVE_MPMC(name, a, b, c) \
1029 ck_ring_enqueue_reserve_mpmc_##name(a, b, c)
1030 #define CK_RING_ENQUEUE_RESERVE_MPMC_SIZE(name, a, b, c, d) \
1031 ck_ring_enqueue_reserve_mpmc_size_##name(a, b, c, d)
1032 #define CK_RING_TRYDEQUEUE_MPMC(name, a, b, c) \
1033 ck_ring_trydequeue_mpmc_##name(a, b, c)
1034 #define CK_RING_DEQUEUE_MPMC(name, a, b, c) \
1035 ck_ring_dequeue_mpmc_##name(a, b, c)
1036
1037 #endif /* CK_RING_H */
1038