xref: /freebsd/sys/contrib/ck/include/ck_ring.h (revision 18054d0220cfc8df9c9568c437bd6fbb59d53c3c)
1 /*
2  * Copyright 2009-2015 Samy Al Bahra.
3  * All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions
7  * are met:
8  * 1. Redistributions of source code must retain the above copyright
9  *    notice, this list of conditions and the following disclaimer.
10  * 2. Redistributions in binary form must reproduce the above copyright
11  *    notice, this list of conditions and the following disclaimer in the
12  *    documentation and/or other materials provided with the distribution.
13  *
14  * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
15  * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
16  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
17  * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
18  * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
19  * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
20  * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
21  * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
22  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
23  * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
24  * SUCH DAMAGE.
25  */
26 
27 #ifndef CK_RING_H
28 #define CK_RING_H
29 
30 #include <ck_cc.h>
31 #include <ck_md.h>
32 #include <ck_pr.h>
33 #include <ck_stdbool.h>
34 #include <ck_string.h>
35 
36 /*
37  * Concurrent ring buffer.
38  */
39 
40 struct ck_ring {
41 	unsigned int c_head;
42 	char pad[CK_MD_CACHELINE - sizeof(unsigned int)];
43 	unsigned int p_tail;
44 	unsigned int p_head;
45 	char _pad[CK_MD_CACHELINE - sizeof(unsigned int) * 2];
46 	unsigned int size;
47 	unsigned int mask;
48 };
49 typedef struct ck_ring ck_ring_t;
50 
/*
 * One slot of a pointer ring. The ck_ring_*_{spsc,spmc,mpsc,mpmc} functions
 * take an array of these as the ring's backing storage.
 */
typedef struct ck_ring_buffer {
	void *value;
} ck_ring_buffer_t;
55 
56 CK_CC_INLINE static unsigned int
57 ck_ring_size(const struct ck_ring *ring)
58 {
59 	unsigned int c, p;
60 
61 	c = ck_pr_load_uint(&ring->c_head);
62 	p = ck_pr_load_uint(&ring->p_tail);
63 	return (p - c) & ring->mask;
64 }
65 
66 CK_CC_INLINE static unsigned int
67 ck_ring_capacity(const struct ck_ring *ring)
68 {
69 
70 	return ring->size;
71 }
72 
73 /*
74  * This function is only safe to call when there are no concurrent operations
75  * on the ring. This is primarily meant for persistent ck_ring use-cases. The
76  * function returns true if any mutations were performed on the ring.
77  */
78 CK_CC_INLINE static bool
79 ck_ring_repair(struct ck_ring *ring)
80 {
81 	bool r = false;
82 
83 	if (ring->p_tail != ring->p_head) {
84 		ring->p_tail = ring->p_head;
85 		r = true;
86 	}
87 
88 	return r;
89 }
90 
91 /*
92  * This can be called when no concurrent updates are occurring on the ring
93  * structure to check for consistency. This is primarily meant to be used for
94  * persistent storage of the ring. If this functions returns false, the ring
95  * is in an inconsistent state.
96  */
97 CK_CC_INLINE static bool
98 ck_ring_valid(const struct ck_ring *ring)
99 {
100 	unsigned int size = ring->size;
101 	unsigned int c_head = ring->c_head;
102 	unsigned int p_head = ring->p_head;
103 
104 	/* The ring must be a power of 2. */
105 	if (size & (size - 1))
106 		return false;
107 
108 	/* The consumer counter must always be smaller than the producer. */
109 	if (c_head > p_head)
110 		return false;
111 
112 	/* The producer may only be up to size slots ahead of consumer. */
113 	if (p_head - c_head >= size)
114 		return false;
115 
116 	return true;
117 }
118 
119 CK_CC_INLINE static void
120 ck_ring_init(struct ck_ring *ring, unsigned int size)
121 {
122 
123 	ring->size = size;
124 	ring->mask = size - 1;
125 	ring->p_tail = 0;
126 	ring->p_head = 0;
127 	ring->c_head = 0;
128 	return;
129 }
130 
/*
 * The _ck_ring_* namespace is internal only and must not be used externally.
 */
134 
135 /*
136  * This function will return a region of memory to write for the next value
137  * for a single producer.
138  */
139 CK_CC_FORCE_INLINE static void *
140 _ck_ring_enqueue_reserve_sp(struct ck_ring *ring,
141     void *CK_CC_RESTRICT buffer,
142     unsigned int ts,
143     unsigned int *size)
144 {
145 	const unsigned int mask = ring->mask;
146 	unsigned int consumer, producer, delta;
147 
148 	consumer = ck_pr_load_uint(&ring->c_head);
149 	producer = ring->p_tail;
150 	delta = producer + 1;
151 	if (size != NULL)
152 		*size = (producer - consumer) & mask;
153 
154 	if (CK_CC_UNLIKELY((delta & mask) == (consumer & mask)))
155 		return NULL;
156 
157 	return (char *)buffer + ts * (producer & mask);
158 }
159 
160 /*
161  * This is to be called to commit and make visible a region of previously
162  * reserved with reverse_sp.
163  */
164 CK_CC_FORCE_INLINE static void
165 _ck_ring_enqueue_commit_sp(struct ck_ring *ring)
166 {
167 
168 	ck_pr_fence_store();
169 	ck_pr_store_uint(&ring->p_tail, ring->p_tail + 1);
170 	return;
171 }
172 
173 CK_CC_FORCE_INLINE static bool
174 _ck_ring_enqueue_sp(struct ck_ring *ring,
175     void *CK_CC_RESTRICT buffer,
176     const void *CK_CC_RESTRICT entry,
177     unsigned int ts,
178     unsigned int *size)
179 {
180 	const unsigned int mask = ring->mask;
181 	unsigned int consumer, producer, delta;
182 
183 	consumer = ck_pr_load_uint(&ring->c_head);
184 	producer = ring->p_tail;
185 	delta = producer + 1;
186 	if (size != NULL)
187 		*size = (producer - consumer) & mask;
188 
189 	if (CK_CC_UNLIKELY((delta & mask) == (consumer & mask)))
190 		return false;
191 
192 	buffer = (char *)buffer + ts * (producer & mask);
193 	memcpy(buffer, entry, ts);
194 
195 	/*
196 	 * Make sure to update slot value before indicating
197 	 * that the slot is available for consumption.
198 	 */
199 	ck_pr_fence_store();
200 	ck_pr_store_uint(&ring->p_tail, delta);
201 	return true;
202 }
203 
204 CK_CC_FORCE_INLINE static bool
205 _ck_ring_enqueue_sp_size(struct ck_ring *ring,
206     void *CK_CC_RESTRICT buffer,
207     const void *CK_CC_RESTRICT entry,
208     unsigned int ts,
209     unsigned int *size)
210 {
211 	unsigned int sz;
212 	bool r;
213 
214 	r = _ck_ring_enqueue_sp(ring, buffer, entry, ts, &sz);
215 	*size = sz;
216 	return r;
217 }
218 
219 CK_CC_FORCE_INLINE static bool
220 _ck_ring_dequeue_sc(struct ck_ring *ring,
221     const void *CK_CC_RESTRICT buffer,
222     void *CK_CC_RESTRICT target,
223     unsigned int size)
224 {
225 	const unsigned int mask = ring->mask;
226 	unsigned int consumer, producer;
227 
228 	consumer = ring->c_head;
229 	producer = ck_pr_load_uint(&ring->p_tail);
230 
231 	if (CK_CC_UNLIKELY(consumer == producer))
232 		return false;
233 
234 	/*
235 	 * Make sure to serialize with respect to our snapshot
236 	 * of the producer counter.
237 	 */
238 	ck_pr_fence_load();
239 
240 	buffer = (const char *)buffer + size * (consumer & mask);
241 	memcpy(target, buffer, size);
242 
243 	/*
244 	 * Make sure copy is completed with respect to consumer
245 	 * update.
246 	 */
247 	ck_pr_fence_store();
248 	ck_pr_store_uint(&ring->c_head, consumer + 1);
249 	return true;
250 }
251 
252 CK_CC_FORCE_INLINE static void *
253 _ck_ring_enqueue_reserve_mp(struct ck_ring *ring,
254     void *buffer,
255     unsigned int ts,
256     unsigned int *ticket,
257     unsigned int *size)
258 {
259 	const unsigned int mask = ring->mask;
260 	unsigned int producer, consumer, delta;
261 
262 	producer = ck_pr_load_uint(&ring->p_head);
263 
264 	for (;;) {
265 		ck_pr_fence_load();
266 		consumer = ck_pr_load_uint(&ring->c_head);
267 
268 		delta = producer + 1;
269 
270 		if (CK_CC_LIKELY((producer - consumer) < mask)) {
271 			if (ck_pr_cas_uint_value(&ring->p_head,
272 			    producer, delta, &producer) == true) {
273 				break;
274 			}
275 		} else {
276 			unsigned int new_producer;
277 
278 			ck_pr_fence_load();
279 			new_producer = ck_pr_load_uint(&ring->p_head);
280 
281 			if (producer == new_producer) {
282 				if (size != NULL)
283 					*size = (producer - consumer) & mask;
284 
285 				return false;
286 			}
287 
288 			producer = new_producer;
289 		}
290 	}
291 
292 	*ticket = producer;
293 	if (size != NULL)
294 		*size = (producer - consumer) & mask;
295 
296 	return (char *)buffer + ts * (producer & mask);
297 }
298 
299 CK_CC_FORCE_INLINE static void
300 _ck_ring_enqueue_commit_mp(struct ck_ring *ring, unsigned int producer)
301 {
302 
303 	while (ck_pr_load_uint(&ring->p_tail) != producer)
304 		ck_pr_stall();
305 
306 	ck_pr_fence_store();
307 	ck_pr_store_uint(&ring->p_tail, producer + 1);
308 	return;
309 }
310 
311 CK_CC_FORCE_INLINE static bool
312 _ck_ring_enqueue_mp(struct ck_ring *ring,
313     void *buffer,
314     const void *entry,
315     unsigned int ts,
316     unsigned int *size)
317 {
318 	const unsigned int mask = ring->mask;
319 	unsigned int producer, consumer, delta;
320 	bool r = true;
321 
322 	producer = ck_pr_load_uint(&ring->p_head);
323 
324 	for (;;) {
325 		/*
326 		 * The snapshot of producer must be up to date with respect to
327 		 * consumer.
328 		 */
329 		ck_pr_fence_load();
330 		consumer = ck_pr_load_uint(&ring->c_head);
331 
332 		delta = producer + 1;
333 
334 		/*
335 		 * Only try to CAS if the producer is not clearly stale (not
336 		 * less than consumer) and the buffer is definitely not full.
337 		 */
338 		if (CK_CC_LIKELY((producer - consumer) < mask)) {
339 			if (ck_pr_cas_uint_value(&ring->p_head,
340 			    producer, delta, &producer) == true) {
341 				break;
342 			}
343 		} else {
344 			unsigned int new_producer;
345 
346 			/*
347 			 * Slow path.  Either the buffer is full or we have a
348 			 * stale snapshot of p_head.  Execute a second read of
349 			 * p_read that must be ordered wrt the snapshot of
350 			 * c_head.
351 			 */
352 			ck_pr_fence_load();
353 			new_producer = ck_pr_load_uint(&ring->p_head);
354 
355 			/*
356 			 * Only fail if we haven't made forward progress in
357 			 * production: the buffer must have been full when we
358 			 * read new_producer (or we wrapped around UINT_MAX
359 			 * during this iteration).
360 			 */
361 			if (producer == new_producer) {
362 				r = false;
363 				goto leave;
364 			}
365 
366 			/*
367 			 * p_head advanced during this iteration. Try again.
368 			 */
369 			producer = new_producer;
370 		}
371 	}
372 
373 	buffer = (char *)buffer + ts * (producer & mask);
374 	memcpy(buffer, entry, ts);
375 
376 	/*
377 	 * Wait until all concurrent producers have completed writing
378 	 * their data into the ring buffer.
379 	 */
380 	while (ck_pr_load_uint(&ring->p_tail) != producer)
381 		ck_pr_stall();
382 
383 	/*
384 	 * Ensure that copy is completed before updating shared producer
385 	 * counter.
386 	 */
387 	ck_pr_fence_store();
388 	ck_pr_store_uint(&ring->p_tail, delta);
389 
390 leave:
391 	if (size != NULL)
392 		*size = (producer - consumer) & mask;
393 
394 	return r;
395 }
396 
397 CK_CC_FORCE_INLINE static bool
398 _ck_ring_enqueue_mp_size(struct ck_ring *ring,
399     void *buffer,
400     const void *entry,
401     unsigned int ts,
402     unsigned int *size)
403 {
404 	unsigned int sz;
405 	bool r;
406 
407 	r = _ck_ring_enqueue_mp(ring, buffer, entry, ts, &sz);
408 	*size = sz;
409 	return r;
410 }
411 
412 CK_CC_FORCE_INLINE static bool
413 _ck_ring_trydequeue_mc(struct ck_ring *ring,
414     const void *buffer,
415     void *data,
416     unsigned int size)
417 {
418 	const unsigned int mask = ring->mask;
419 	unsigned int consumer, producer;
420 
421 	consumer = ck_pr_load_uint(&ring->c_head);
422 	ck_pr_fence_load();
423 	producer = ck_pr_load_uint(&ring->p_tail);
424 
425 	if (CK_CC_UNLIKELY(consumer == producer))
426 		return false;
427 
428 	ck_pr_fence_load();
429 
430 	buffer = (const char *)buffer + size * (consumer & mask);
431 	memcpy(data, buffer, size);
432 
433 	ck_pr_fence_store_atomic();
434 	return ck_pr_cas_uint(&ring->c_head, consumer, consumer + 1);
435 }
436 
437 CK_CC_FORCE_INLINE static bool
438 _ck_ring_dequeue_mc(struct ck_ring *ring,
439     const void *buffer,
440     void *data,
441     unsigned int ts)
442 {
443 	const unsigned int mask = ring->mask;
444 	unsigned int consumer, producer;
445 
446 	consumer = ck_pr_load_uint(&ring->c_head);
447 
448 	do {
449 		const char *target;
450 
451 		/*
452 		 * Producer counter must represent state relative to
453 		 * our latest consumer snapshot.
454 		 */
455 		ck_pr_fence_load();
456 		producer = ck_pr_load_uint(&ring->p_tail);
457 
458 		if (CK_CC_UNLIKELY(consumer == producer))
459 			return false;
460 
461 		ck_pr_fence_load();
462 
463 		target = (const char *)buffer + ts * (consumer & mask);
464 		memcpy(data, target, ts);
465 
466 		/* Serialize load with respect to head update. */
467 		ck_pr_fence_store_atomic();
468 	} while (ck_pr_cas_uint_value(&ring->c_head,
469 				      consumer,
470 				      consumer + 1,
471 				      &consumer) == false);
472 
473 	return true;
474 }
475 
476 /*
477  * The ck_ring_*_spsc namespace is the public interface for interacting with a
478  * ring buffer containing pointers. Correctness is only provided if there is up
479  * to one concurrent consumer and up to one concurrent producer.
480  */
481 CK_CC_INLINE static bool
482 ck_ring_enqueue_spsc_size(struct ck_ring *ring,
483     struct ck_ring_buffer *buffer,
484     const void *entry,
485     unsigned int *size)
486 {
487 
488 	return _ck_ring_enqueue_sp_size(ring, buffer, &entry,
489 	    sizeof(entry), size);
490 }
491 
492 CK_CC_INLINE static bool
493 ck_ring_enqueue_spsc(struct ck_ring *ring,
494     struct ck_ring_buffer *buffer,
495     const void *entry)
496 {
497 
498 	return _ck_ring_enqueue_sp(ring, buffer,
499 	    &entry, sizeof(entry), NULL);
500 }
501 
502 CK_CC_INLINE static void *
503 ck_ring_enqueue_reserve_spsc_size(struct ck_ring *ring,
504     struct ck_ring_buffer *buffer,
505     unsigned int *size)
506 {
507 
508 	return _ck_ring_enqueue_reserve_sp(ring, buffer, sizeof(void *),
509 	    size);
510 }
511 
512 CK_CC_INLINE static void *
513 ck_ring_enqueue_reserve_spsc(struct ck_ring *ring,
514     struct ck_ring_buffer *buffer)
515 {
516 
517 	return _ck_ring_enqueue_reserve_sp(ring, buffer, sizeof(void *),
518 	    NULL);
519 }
520 
521 CK_CC_INLINE static void
522 ck_ring_enqueue_commit_spsc(struct ck_ring *ring)
523 {
524 
525 	_ck_ring_enqueue_commit_sp(ring);
526 	return;
527 }
528 
529 CK_CC_INLINE static bool
530 ck_ring_dequeue_spsc(struct ck_ring *ring,
531     const struct ck_ring_buffer *buffer,
532     void *data)
533 {
534 
535 	return _ck_ring_dequeue_sc(ring, buffer,
536 	    (void **)data, sizeof(void *));
537 }
538 
539 /*
540  * The ck_ring_*_mpmc namespace is the public interface for interacting with a
541  * ring buffer containing pointers. Correctness is provided for any number of
542  * producers and consumers.
543  */
544 CK_CC_INLINE static bool
545 ck_ring_enqueue_mpmc(struct ck_ring *ring,
546     struct ck_ring_buffer *buffer,
547     const void *entry)
548 {
549 
550 	return _ck_ring_enqueue_mp(ring, buffer, &entry, sizeof(entry), NULL);
551 }
552 
553 CK_CC_INLINE static bool
554 ck_ring_enqueue_mpmc_size(struct ck_ring *ring,
555     struct ck_ring_buffer *buffer,
556     const void *entry,
557     unsigned int *size)
558 {
559 
560 	return _ck_ring_enqueue_mp_size(ring, buffer, &entry, sizeof(entry),
561 	    size);
562 }
563 
564 CK_CC_INLINE static void *
565 ck_ring_enqueue_reserve_mpmc(struct ck_ring *ring,
566     struct ck_ring_buffer *buffer,
567     unsigned int *ticket)
568 {
569 
570 	return _ck_ring_enqueue_reserve_mp(ring, buffer, sizeof(void *),
571 	    ticket, NULL);
572 }
573 
574 CK_CC_INLINE static void *
575 ck_ring_enqueue_reserve_mpmc_size(struct ck_ring *ring,
576     struct ck_ring_buffer *buffer,
577     unsigned int *ticket,
578     unsigned int *size)
579 {
580 
581 	return _ck_ring_enqueue_reserve_mp(ring, buffer, sizeof(void *),
582 	    ticket, size);
583 }
584 
585 CK_CC_INLINE static void
586 ck_ring_enqueue_commit_mpmc(struct ck_ring *ring, unsigned int ticket)
587 {
588 
589 	_ck_ring_enqueue_commit_mp(ring, ticket);
590 	return;
591 }
592 
593 CK_CC_INLINE static bool
594 ck_ring_trydequeue_mpmc(struct ck_ring *ring,
595     const struct ck_ring_buffer *buffer,
596     void *data)
597 {
598 
599 	return _ck_ring_trydequeue_mc(ring,
600 	    buffer, (void **)data, sizeof(void *));
601 }
602 
603 CK_CC_INLINE static bool
604 ck_ring_dequeue_mpmc(struct ck_ring *ring,
605     const struct ck_ring_buffer *buffer,
606     void *data)
607 {
608 
609 	return _ck_ring_dequeue_mc(ring, buffer, (void **)data,
610 	    sizeof(void *));
611 }
612 
613 /*
614  * The ck_ring_*_spmc namespace is the public interface for interacting with a
615  * ring buffer containing pointers. Correctness is provided for any number of
616  * consumers with up to one concurrent producer.
617  */
618 CK_CC_INLINE static void *
619 ck_ring_enqueue_reserve_spmc_size(struct ck_ring *ring,
620     struct ck_ring_buffer *buffer,
621     unsigned int *size)
622 {
623 
624 	return _ck_ring_enqueue_reserve_sp(ring, buffer, sizeof(void *), size);
625 }
626 
627 CK_CC_INLINE static void *
628 ck_ring_enqueue_reserve_spmc(struct ck_ring *ring,
629     struct ck_ring_buffer *buffer)
630 {
631 
632 	return _ck_ring_enqueue_reserve_sp(ring, buffer, sizeof(void *), NULL);
633 }
634 
635 CK_CC_INLINE static void
636 ck_ring_enqueue_commit_spmc(struct ck_ring *ring)
637 {
638 
639 	_ck_ring_enqueue_commit_sp(ring);
640 	return;
641 }
642 
643 CK_CC_INLINE static bool
644 ck_ring_enqueue_spmc_size(struct ck_ring *ring,
645     struct ck_ring_buffer *buffer,
646     const void *entry,
647     unsigned int *size)
648 {
649 
650 	return _ck_ring_enqueue_sp_size(ring, buffer, &entry,
651 	    sizeof(entry), size);
652 }
653 
654 CK_CC_INLINE static bool
655 ck_ring_enqueue_spmc(struct ck_ring *ring,
656     struct ck_ring_buffer *buffer,
657     const void *entry)
658 {
659 
660 	return _ck_ring_enqueue_sp(ring, buffer, &entry,
661 	    sizeof(entry), NULL);
662 }
663 
664 CK_CC_INLINE static bool
665 ck_ring_trydequeue_spmc(struct ck_ring *ring,
666     const struct ck_ring_buffer *buffer,
667     void *data)
668 {
669 
670 	return _ck_ring_trydequeue_mc(ring, buffer, (void **)data, sizeof(void *));
671 }
672 
673 CK_CC_INLINE static bool
674 ck_ring_dequeue_spmc(struct ck_ring *ring,
675     const struct ck_ring_buffer *buffer,
676     void *data)
677 {
678 
679 	return _ck_ring_dequeue_mc(ring, buffer, (void **)data, sizeof(void *));
680 }
681 
/*
 * The ck_ring_*_mpsc namespace is the public interface for interacting with a
 * ring buffer containing pointers. Correctness is provided for any number of
 * producers with up to one concurrent consumer.
 */
687 CK_CC_INLINE static void *
688 ck_ring_enqueue_reserve_mpsc(struct ck_ring *ring,
689     struct ck_ring_buffer *buffer,
690     unsigned int *ticket)
691 {
692 
693 	return _ck_ring_enqueue_reserve_mp(ring, buffer, sizeof(void *),
694 	    ticket, NULL);
695 }
696 
697 CK_CC_INLINE static void *
698 ck_ring_enqueue_reserve_mpsc_size(struct ck_ring *ring,
699     struct ck_ring_buffer *buffer,
700     unsigned int *ticket,
701     unsigned int *size)
702 {
703 
704 	return _ck_ring_enqueue_reserve_mp(ring, buffer, sizeof(void *),
705 	    ticket, size);
706 }
707 
708 CK_CC_INLINE static void
709 ck_ring_enqueue_commit_mpsc(struct ck_ring *ring, unsigned int ticket)
710 {
711 
712 	_ck_ring_enqueue_commit_mp(ring, ticket);
713 	return;
714 }
715 
716 CK_CC_INLINE static bool
717 ck_ring_enqueue_mpsc(struct ck_ring *ring,
718     struct ck_ring_buffer *buffer,
719     const void *entry)
720 {
721 
722 	return _ck_ring_enqueue_mp(ring, buffer, &entry,
723 	    sizeof(entry), NULL);
724 }
725 
726 CK_CC_INLINE static bool
727 ck_ring_enqueue_mpsc_size(struct ck_ring *ring,
728     struct ck_ring_buffer *buffer,
729     const void *entry,
730     unsigned int *size)
731 {
732 
733 	return _ck_ring_enqueue_mp_size(ring, buffer, &entry,
734 	    sizeof(entry), size);
735 }
736 
737 CK_CC_INLINE static bool
738 ck_ring_dequeue_mpsc(struct ck_ring *ring,
739     const struct ck_ring_buffer *buffer,
740     void *data)
741 {
742 
743 	return _ck_ring_dequeue_sc(ring, buffer, (void **)data,
744 	    sizeof(void *));
745 }
746 
747 /*
748  * CK_RING_PROTOTYPE is used to define a type-safe interface for inlining
749  * values of a particular type in the ring the buffer.
750  */
751 #define CK_RING_PROTOTYPE(name, type)				\
752 CK_CC_INLINE static struct type *				\
753 ck_ring_enqueue_reserve_spsc_##name(struct ck_ring *a,		\
754     struct type *b)						\
755 {								\
756 								\
757 	return _ck_ring_enqueue_reserve_sp(a, b, 		\
758 	    sizeof(struct type), NULL);				\
759 }								\
760 								\
761 CK_CC_INLINE static struct type *				\
762 ck_ring_enqueue_reserve_spsc_size_##name(struct ck_ring *a,	\
763     struct type *b,						\
764     unsigned int *c)						\
765 {								\
766 								\
767 	return _ck_ring_enqueue_reserve_sp(a, b, 		\
768 	    sizeof(struct type), c);				\
769 }								\
770 								\
771 CK_CC_INLINE static bool					\
772 ck_ring_enqueue_spsc_size_##name(struct ck_ring *a,		\
773     struct type *b,						\
774     struct type *c,						\
775     unsigned int *d)						\
776 {								\
777 								\
778 	return _ck_ring_enqueue_sp_size(a, b, c,		\
779 	    sizeof(struct type), d);				\
780 }								\
781 								\
782 CK_CC_INLINE static bool					\
783 ck_ring_enqueue_spsc_##name(struct ck_ring *a,			\
784     struct type *b,						\
785     struct type *c)						\
786 {								\
787 								\
788 	return _ck_ring_enqueue_sp(a, b, c,			\
789 	    sizeof(struct type), NULL);				\
790 }								\
791 								\
792 CK_CC_INLINE static bool					\
793 ck_ring_dequeue_spsc_##name(struct ck_ring *a,			\
794     struct type *b,						\
795     struct type *c)						\
796 {								\
797 								\
798 	return _ck_ring_dequeue_sc(a, b, c,			\
799 	    sizeof(struct type));				\
800 }								\
801 								\
802 CK_CC_INLINE static struct type *				\
803 ck_ring_enqueue_reserve_spmc_##name(struct ck_ring *a,		\
804     struct type *b)						\
805 {								\
806 								\
807 	return _ck_ring_enqueue_reserve_sp(a, b, 		\
808 	    sizeof(struct type), NULL);				\
809 }								\
810 								\
811 CK_CC_INLINE static struct type *				\
812 ck_ring_enqueue_reserve_spmc_size_##name(struct ck_ring *a,	\
813     struct type *b,						\
814     unsigned int *c)						\
815 {								\
816 								\
817 	return _ck_ring_enqueue_reserve_sp(a, b, 		\
818 	    sizeof(struct type), c);				\
819 }								\
820 								\
821 CK_CC_INLINE static bool					\
822 ck_ring_enqueue_spmc_size_##name(struct ck_ring *a,		\
823     struct type *b,						\
824     struct type *c,						\
825     unsigned int *d)						\
826 {								\
827 								\
828 	return _ck_ring_enqueue_sp_size(a, b, c,		\
829 	    sizeof(struct type), d);				\
830 }								\
831 								\
832 CK_CC_INLINE static bool					\
833 ck_ring_enqueue_spmc_##name(struct ck_ring *a,			\
834     struct type *b,						\
835     struct type *c)						\
836 {								\
837 								\
838 	return _ck_ring_enqueue_sp(a, b, c,			\
839 	    sizeof(struct type), NULL);				\
840 }								\
841 								\
842 CK_CC_INLINE static bool					\
843 ck_ring_trydequeue_spmc_##name(struct ck_ring *a,		\
844     struct type *b,						\
845     struct type *c)						\
846 {								\
847 								\
848 	return _ck_ring_trydequeue_mc(a,			\
849 	    b, c, sizeof(struct type));				\
850 }								\
851 								\
852 CK_CC_INLINE static bool					\
853 ck_ring_dequeue_spmc_##name(struct ck_ring *a,			\
854     struct type *b,						\
855     struct type *c)						\
856 {								\
857 								\
858 	return _ck_ring_dequeue_mc(a, b, c,			\
859 	    sizeof(struct type));				\
860 }								\
861 								\
862 CK_CC_INLINE static struct type *				\
863 ck_ring_enqueue_reserve_mpsc_##name(struct ck_ring *a,		\
864     struct type *b,						\
865     unsigned int *c)						\
866 {								\
867 								\
868 	return _ck_ring_enqueue_reserve_mp(a, b, 		\
869 	    sizeof(struct type), c, NULL);			\
870 }								\
871 								\
872 CK_CC_INLINE static struct type *				\
873 ck_ring_enqueue_reserve_mpsc_size_##name(struct ck_ring *a,	\
874     struct type *b,						\
875     unsigned int *c,						\
876     unsigned int *d)						\
877 {								\
878 								\
879 	return _ck_ring_enqueue_reserve_mp(a, b, 		\
880 	    sizeof(struct type), c, d);				\
881 }								\
882 								\
883 CK_CC_INLINE static bool					\
884 ck_ring_enqueue_mpsc_##name(struct ck_ring *a,			\
885     struct type *b,						\
886     struct type *c)						\
887 {								\
888 								\
889 	return _ck_ring_enqueue_mp(a, b, c,			\
890 	    sizeof(struct type), NULL);				\
891 }								\
892 								\
893 CK_CC_INLINE static bool					\
894 ck_ring_enqueue_mpsc_size_##name(struct ck_ring *a,		\
895     struct type *b,						\
896     struct type *c,						\
897     unsigned int *d)						\
898 {								\
899 								\
900 	return _ck_ring_enqueue_mp_size(a, b, c,		\
901 	    sizeof(struct type), d);				\
902 }								\
903 								\
904 CK_CC_INLINE static bool					\
905 ck_ring_dequeue_mpsc_##name(struct ck_ring *a,			\
906     struct type *b,						\
907     struct type *c)						\
908 {								\
909 								\
910 	return _ck_ring_dequeue_sc(a, b, c,			\
911 	    sizeof(struct type));				\
912 }								\
913 								\
914 CK_CC_INLINE static struct type *				\
915 ck_ring_enqueue_reserve_mpmc_##name(struct ck_ring *a,		\
916     struct type *b,						\
917     unsigned int *c)						\
918 {								\
919 								\
920 	return _ck_ring_enqueue_reserve_mp(a, b, 		\
921 	    sizeof(struct type), c, NULL);			\
922 }								\
923 								\
924 CK_CC_INLINE static struct type *				\
925 ck_ring_enqueue_reserve_mpmc_size_##name(struct ck_ring *a,	\
926     struct type *b,						\
927     unsigned int *c,						\
928     unsigned int *d)						\
929 {								\
930 								\
931 	return _ck_ring_enqueue_reserve_mp(a, b, 		\
932 	    sizeof(struct type), c, d);				\
933 }								\
934 								\
935 CK_CC_INLINE static bool					\
936 ck_ring_enqueue_mpmc_size_##name(struct ck_ring *a,		\
937     struct type *b,						\
938     struct type *c,						\
939     unsigned int *d)						\
940 {								\
941 								\
942 	return _ck_ring_enqueue_mp_size(a, b, c,		\
943 	    sizeof(struct type), d);				\
944 }								\
945 								\
946 CK_CC_INLINE static bool					\
947 ck_ring_enqueue_mpmc_##name(struct ck_ring *a,			\
948     struct type *b,						\
949     struct type *c)						\
950 {								\
951 								\
952 	return _ck_ring_enqueue_mp(a, b, c,			\
953 	    sizeof(struct type), NULL);				\
954 }								\
955 								\
956 CK_CC_INLINE static bool					\
957 ck_ring_trydequeue_mpmc_##name(struct ck_ring *a,		\
958     struct type *b,						\
959     struct type *c)						\
960 {								\
961 								\
962 	return _ck_ring_trydequeue_mc(a,			\
963 	    b, c, sizeof(struct type));				\
964 }								\
965 								\
966 CK_CC_INLINE static bool					\
967 ck_ring_dequeue_mpmc_##name(struct ck_ring *a,			\
968     struct type *b,						\
969     struct type *c)						\
970 {								\
971 								\
972 	return _ck_ring_dequeue_mc(a, b, c,			\
973 	    sizeof(struct type));				\
974 }
975 
/*
 * A single producer with one concurrent consumer.
 *
 * Each macro forwards to the function of the same name generated by
 * CK_RING_PROTOTYPE(name, ...): a is the ring, b the slot array and c the
 * entry (enqueue) or destination (dequeue).
 *
 * The RESERVE macros carry exactly the arguments of the generated
 * single-producer reserve functions: (ring, slot array) and
 * (ring, slot array, size pointer). They previously forwarded one argument
 * more than those functions accept and so could not be compiled when used.
 */
#define CK_RING_ENQUEUE_SPSC(name, a, b, c)			\
	ck_ring_enqueue_spsc_##name(a, b, c)
#define CK_RING_ENQUEUE_SPSC_SIZE(name, a, b, c, d)		\
	ck_ring_enqueue_spsc_size_##name(a, b, c, d)
#define CK_RING_ENQUEUE_RESERVE_SPSC(name, a, b)		\
	ck_ring_enqueue_reserve_spsc_##name(a, b)
#define CK_RING_ENQUEUE_RESERVE_SPSC_SIZE(name, a, b, c)	\
	ck_ring_enqueue_reserve_spsc_size_##name(a, b, c)
#define CK_RING_DEQUEUE_SPSC(name, a, b, c)			\
	ck_ring_dequeue_spsc_##name(a, b, c)
989 
/*
 * A single producer with any number of concurrent consumers.
 *
 * Each macro forwards to the function of the same name generated by
 * CK_RING_PROTOTYPE(name, ...): a is the ring, b the slot array and c the
 * entry (enqueue) or destination (dequeue).
 *
 * The RESERVE macros carry exactly the arguments of the generated
 * single-producer reserve functions: (ring, slot array) and
 * (ring, slot array, size pointer). They previously forwarded one argument
 * more than those functions accept and so could not be compiled when used.
 */
#define CK_RING_ENQUEUE_SPMC(name, a, b, c)			\
	ck_ring_enqueue_spmc_##name(a, b, c)
#define CK_RING_ENQUEUE_SPMC_SIZE(name, a, b, c, d)		\
	ck_ring_enqueue_spmc_size_##name(a, b, c, d)
#define CK_RING_ENQUEUE_RESERVE_SPMC(name, a, b)		\
	ck_ring_enqueue_reserve_spmc_##name(a, b)
#define CK_RING_ENQUEUE_RESERVE_SPMC_SIZE(name, a, b, c)	\
	ck_ring_enqueue_reserve_spmc_size_##name(a, b, c)
#define CK_RING_TRYDEQUEUE_SPMC(name, a, b, c)			\
	ck_ring_trydequeue_spmc_##name(a, b, c)
#define CK_RING_DEQUEUE_SPMC(name, a, b, c)			\
	ck_ring_dequeue_spmc_##name(a, b, c)
1005 
/*
 * Any number of concurrent producers with up to one concurrent consumer.
 *
 * Each macro forwards its arguments, in order, to the function of the same
 * name generated by CK_RING_PROTOTYPE(name, ...).
 */
#define CK_RING_ENQUEUE_MPSC(name, ring, buffer, entry)		\
	ck_ring_enqueue_mpsc_##name(ring, buffer, entry)
#define CK_RING_ENQUEUE_MPSC_SIZE(name, ring, buffer, entry, size) \
	ck_ring_enqueue_mpsc_size_##name(ring, buffer, entry, size)
#define CK_RING_ENQUEUE_RESERVE_MPSC(name, ring, buffer, ticket) \
	ck_ring_enqueue_reserve_mpsc_##name(ring, buffer, ticket)
#define CK_RING_ENQUEUE_RESERVE_MPSC_SIZE(name, ring, buffer, ticket, size) \
	ck_ring_enqueue_reserve_mpsc_size_##name(ring, buffer, ticket, size)
#define CK_RING_DEQUEUE_MPSC(name, ring, buffer, out)		\
	ck_ring_dequeue_mpsc_##name(ring, buffer, out)
1020 
/*
 * Any number of concurrent producers and consumers.
 *
 * Each macro forwards its arguments, in order, to the function of the same
 * name generated by CK_RING_PROTOTYPE(name, ...).
 */
#define CK_RING_ENQUEUE_MPMC(name, ring, buffer, entry)		\
	ck_ring_enqueue_mpmc_##name(ring, buffer, entry)
#define CK_RING_ENQUEUE_MPMC_SIZE(name, ring, buffer, entry, size) \
	ck_ring_enqueue_mpmc_size_##name(ring, buffer, entry, size)
#define CK_RING_ENQUEUE_RESERVE_MPMC(name, ring, buffer, ticket) \
	ck_ring_enqueue_reserve_mpmc_##name(ring, buffer, ticket)
#define CK_RING_ENQUEUE_RESERVE_MPMC_SIZE(name, ring, buffer, ticket, size) \
	ck_ring_enqueue_reserve_mpmc_size_##name(ring, buffer, ticket, size)
#define CK_RING_TRYDEQUEUE_MPMC(name, ring, buffer, out)	\
	ck_ring_trydequeue_mpmc_##name(ring, buffer, out)
#define CK_RING_DEQUEUE_MPMC(name, ring, buffer, out)		\
	ck_ring_dequeue_mpmc_##name(ring, buffer, out)
1036 
1037 #endif /* CK_RING_H */
1038