xref: /freebsd/sys/contrib/ck/include/ck_ring.h (revision 8df8b2d3e51d1b816201d8a1fe8bc29fe192e562)
1 /*
2  * Copyright 2009-2015 Samy Al Bahra.
3  * All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions
7  * are met:
8  * 1. Redistributions of source code must retain the above copyright
9  *    notice, this list of conditions and the following disclaimer.
10  * 2. Redistributions in binary form must reproduce the above copyright
11  *    notice, this list of conditions and the following disclaimer in the
12  *    documentation and/or other materials provided with the distribution.
13  *
14  * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
15  * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
16  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
17  * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
18  * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
19  * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
20  * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
21  * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
22  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
23  * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
24  * SUCH DAMAGE.
25  */
26 
27 #ifndef CK_RING_H
28 #define CK_RING_H
29 
30 #include <ck_cc.h>
31 #include <ck_md.h>
32 #include <ck_pr.h>
33 #include <ck_stdbool.h>
34 #include <ck_string.h>
35 
36 /*
37  * Concurrent ring buffer.
38  */
39 
40 struct ck_ring {
41 	unsigned int c_head;
42 	char pad[CK_MD_CACHELINE - sizeof(unsigned int)];
43 	unsigned int p_tail;
44 	unsigned int p_head;
45 	char _pad[CK_MD_CACHELINE - sizeof(unsigned int) * 2];
46 	unsigned int size;
47 	unsigned int mask;
48 };
49 typedef struct ck_ring ck_ring_t;
50 
/*
 * One slot of a pointer ring: the ck_ring_*_{spsc,spmc,mpsc,mpmc}
 * functions copy sizeof(void *) bytes into and out of these slots.
 */
typedef struct ck_ring_buffer {
	void *value;
} ck_ring_buffer_t;
55 
56 CK_CC_INLINE static unsigned int
57 ck_ring_size(const struct ck_ring *ring)
58 {
59 	unsigned int c, p;
60 
61 	c = ck_pr_load_uint(&ring->c_head);
62 	p = ck_pr_load_uint(&ring->p_tail);
63 	return (p - c) & ring->mask;
64 }
65 
66 CK_CC_INLINE static unsigned int
67 ck_ring_capacity(const struct ck_ring *ring)
68 {
69 	return ring->size;
70 }
71 
72 CK_CC_INLINE static void
73 ck_ring_init(struct ck_ring *ring, unsigned int size)
74 {
75 
76 	ring->size = size;
77 	ring->mask = size - 1;
78 	ring->p_tail = 0;
79 	ring->p_head = 0;
80 	ring->c_head = 0;
81 	return;
82 }
83 
/*
 * The _ck_ring_* namespace is internal only and must not be used externally.
 */
87 CK_CC_FORCE_INLINE static bool
88 _ck_ring_enqueue_sp(struct ck_ring *ring,
89     void *CK_CC_RESTRICT buffer,
90     const void *CK_CC_RESTRICT entry,
91     unsigned int ts,
92     unsigned int *size)
93 {
94 	const unsigned int mask = ring->mask;
95 	unsigned int consumer, producer, delta;
96 
97 	consumer = ck_pr_load_uint(&ring->c_head);
98 	producer = ring->p_tail;
99 	delta = producer + 1;
100 	if (size != NULL)
101 		*size = (producer - consumer) & mask;
102 
103 	if (CK_CC_UNLIKELY((delta & mask) == (consumer & mask)))
104 		return false;
105 
106 	buffer = (char *)buffer + ts * (producer & mask);
107 	memcpy(buffer, entry, ts);
108 
109 	/*
110 	 * Make sure to update slot value before indicating
111 	 * that the slot is available for consumption.
112 	 */
113 	ck_pr_fence_store();
114 	ck_pr_store_uint(&ring->p_tail, delta);
115 	return true;
116 }
117 
118 CK_CC_FORCE_INLINE static bool
119 _ck_ring_enqueue_sp_size(struct ck_ring *ring,
120     void *CK_CC_RESTRICT buffer,
121     const void *CK_CC_RESTRICT entry,
122     unsigned int ts,
123     unsigned int *size)
124 {
125 	unsigned int sz;
126 	bool r;
127 
128 	r = _ck_ring_enqueue_sp(ring, buffer, entry, ts, &sz);
129 	*size = sz;
130 	return r;
131 }
132 
133 CK_CC_FORCE_INLINE static bool
134 _ck_ring_dequeue_sc(struct ck_ring *ring,
135     const void *CK_CC_RESTRICT buffer,
136     void *CK_CC_RESTRICT target,
137     unsigned int size)
138 {
139 	const unsigned int mask = ring->mask;
140 	unsigned int consumer, producer;
141 
142 	consumer = ring->c_head;
143 	producer = ck_pr_load_uint(&ring->p_tail);
144 
145 	if (CK_CC_UNLIKELY(consumer == producer))
146 		return false;
147 
148 	/*
149 	 * Make sure to serialize with respect to our snapshot
150 	 * of the producer counter.
151 	 */
152 	ck_pr_fence_load();
153 
154 	buffer = (const char *)buffer + size * (consumer & mask);
155 	memcpy(target, buffer, size);
156 
157 	/*
158 	 * Make sure copy is completed with respect to consumer
159 	 * update.
160 	 */
161 	ck_pr_fence_store();
162 	ck_pr_store_uint(&ring->c_head, consumer + 1);
163 	return true;
164 }
165 
166 CK_CC_FORCE_INLINE static bool
167 _ck_ring_enqueue_mp(struct ck_ring *ring,
168     void *buffer,
169     const void *entry,
170     unsigned int ts,
171     unsigned int *size)
172 {
173 	const unsigned int mask = ring->mask;
174 	unsigned int producer, consumer, delta;
175 	bool r = true;
176 
177 	producer = ck_pr_load_uint(&ring->p_head);
178 
179 	for (;;) {
180 		/*
181 		 * The snapshot of producer must be up to date with respect to
182 		 * consumer.
183 		 */
184 		ck_pr_fence_load();
185 		consumer = ck_pr_load_uint(&ring->c_head);
186 
187 		delta = producer + 1;
188 
189 		/*
190 		 * Only try to CAS if the producer is not clearly stale (not
191 		 * less than consumer) and the buffer is definitely not full.
192 		 */
193 		if (CK_CC_LIKELY((producer - consumer) < mask)) {
194 			if (ck_pr_cas_uint_value(&ring->p_head,
195 			    producer, delta, &producer) == true) {
196 				break;
197 			}
198 		} else {
199 			unsigned int new_producer;
200 
201 			/*
202 			 * Slow path.  Either the buffer is full or we have a
203 			 * stale snapshot of p_head.  Execute a second read of
204 			 * p_read that must be ordered wrt the snapshot of
205 			 * c_head.
206 			 */
207 			ck_pr_fence_load();
208 			new_producer = ck_pr_load_uint(&ring->p_head);
209 
210 			/*
211 			 * Only fail if we haven't made forward progress in
212 			 * production: the buffer must have been full when we
213 			 * read new_producer (or we wrapped around UINT_MAX
214 			 * during this iteration).
215 			 */
216 			if (producer == new_producer) {
217 				r = false;
218 				goto leave;
219 			}
220 
221 			/*
222 			 * p_head advanced during this iteration. Try again.
223 			 */
224 			producer = new_producer;
225 		}
226 	}
227 
228 	buffer = (char *)buffer + ts * (producer & mask);
229 	memcpy(buffer, entry, ts);
230 
231 	/*
232 	 * Wait until all concurrent producers have completed writing
233 	 * their data into the ring buffer.
234 	 */
235 	while (ck_pr_load_uint(&ring->p_tail) != producer)
236 		ck_pr_stall();
237 
238 	/*
239 	 * Ensure that copy is completed before updating shared producer
240 	 * counter.
241 	 */
242 	ck_pr_fence_store();
243 	ck_pr_store_uint(&ring->p_tail, delta);
244 
245 leave:
246 	if (size != NULL)
247 		*size = (producer - consumer) & mask;
248 
249 	return r;
250 }
251 
252 CK_CC_FORCE_INLINE static bool
253 _ck_ring_enqueue_mp_size(struct ck_ring *ring,
254     void *buffer,
255     const void *entry,
256     unsigned int ts,
257     unsigned int *size)
258 {
259 	unsigned int sz;
260 	bool r;
261 
262 	r = _ck_ring_enqueue_mp(ring, buffer, entry, ts, &sz);
263 	*size = sz;
264 	return r;
265 }
266 
267 CK_CC_FORCE_INLINE static bool
268 _ck_ring_trydequeue_mc(struct ck_ring *ring,
269     const void *buffer,
270     void *data,
271     unsigned int size)
272 {
273 	const unsigned int mask = ring->mask;
274 	unsigned int consumer, producer;
275 
276 	consumer = ck_pr_load_uint(&ring->c_head);
277 	ck_pr_fence_load();
278 	producer = ck_pr_load_uint(&ring->p_tail);
279 
280 	if (CK_CC_UNLIKELY(consumer == producer))
281 		return false;
282 
283 	ck_pr_fence_load();
284 
285 	buffer = (const char *)buffer + size * (consumer & mask);
286 	memcpy(data, buffer, size);
287 
288 	ck_pr_fence_store_atomic();
289 	return ck_pr_cas_uint(&ring->c_head, consumer, consumer + 1);
290 }
291 
292 CK_CC_FORCE_INLINE static bool
293 _ck_ring_dequeue_mc(struct ck_ring *ring,
294     const void *buffer,
295     void *data,
296     unsigned int ts)
297 {
298 	const unsigned int mask = ring->mask;
299 	unsigned int consumer, producer;
300 
301 	consumer = ck_pr_load_uint(&ring->c_head);
302 
303 	do {
304 		const char *target;
305 
306 		/*
307 		 * Producer counter must represent state relative to
308 		 * our latest consumer snapshot.
309 		 */
310 		ck_pr_fence_load();
311 		producer = ck_pr_load_uint(&ring->p_tail);
312 
313 		if (CK_CC_UNLIKELY(consumer == producer))
314 			return false;
315 
316 		ck_pr_fence_load();
317 
318 		target = (const char *)buffer + ts * (consumer & mask);
319 		memcpy(data, target, ts);
320 
321 		/* Serialize load with respect to head update. */
322 		ck_pr_fence_store_atomic();
323 	} while (ck_pr_cas_uint_value(&ring->c_head,
324 				      consumer,
325 				      consumer + 1,
326 				      &consumer) == false);
327 
328 	return true;
329 }
330 
331 /*
332  * The ck_ring_*_spsc namespace is the public interface for interacting with a
333  * ring buffer containing pointers. Correctness is only provided if there is up
334  * to one concurrent consumer and up to one concurrent producer.
335  */
336 CK_CC_INLINE static bool
337 ck_ring_enqueue_spsc_size(struct ck_ring *ring,
338     struct ck_ring_buffer *buffer,
339     const void *entry,
340     unsigned int *size)
341 {
342 
343 	return _ck_ring_enqueue_sp_size(ring, buffer, &entry,
344 	    sizeof(entry), size);
345 }
346 
347 CK_CC_INLINE static bool
348 ck_ring_enqueue_spsc(struct ck_ring *ring,
349     struct ck_ring_buffer *buffer,
350     const void *entry)
351 {
352 
353 	return _ck_ring_enqueue_sp(ring, buffer,
354 	    &entry, sizeof(entry), NULL);
355 }
356 
357 CK_CC_INLINE static bool
358 ck_ring_dequeue_spsc(struct ck_ring *ring,
359     const struct ck_ring_buffer *buffer,
360     void *data)
361 {
362 
363 	return _ck_ring_dequeue_sc(ring, buffer,
364 	    (void **)data, sizeof(void *));
365 }
366 
367 /*
368  * The ck_ring_*_mpmc namespace is the public interface for interacting with a
369  * ring buffer containing pointers. Correctness is provided for any number of
370  * producers and consumers.
371  */
372 CK_CC_INLINE static bool
373 ck_ring_enqueue_mpmc(struct ck_ring *ring,
374     struct ck_ring_buffer *buffer,
375     const void *entry)
376 {
377 
378 	return _ck_ring_enqueue_mp(ring, buffer, &entry,
379 	    sizeof(entry), NULL);
380 }
381 
382 CK_CC_INLINE static bool
383 ck_ring_enqueue_mpmc_size(struct ck_ring *ring,
384     struct ck_ring_buffer *buffer,
385     const void *entry,
386     unsigned int *size)
387 {
388 
389 	return _ck_ring_enqueue_mp_size(ring, buffer, &entry,
390 	    sizeof(entry), size);
391 }
392 
393 CK_CC_INLINE static bool
394 ck_ring_trydequeue_mpmc(struct ck_ring *ring,
395     const struct ck_ring_buffer *buffer,
396     void *data)
397 {
398 
399 	return _ck_ring_trydequeue_mc(ring,
400 	    buffer, (void **)data, sizeof(void *));
401 }
402 
403 CK_CC_INLINE static bool
404 ck_ring_dequeue_mpmc(struct ck_ring *ring,
405     const struct ck_ring_buffer *buffer,
406     void *data)
407 {
408 
409 	return _ck_ring_dequeue_mc(ring, buffer, (void **)data,
410 	    sizeof(void *));
411 }
412 
413 /*
414  * The ck_ring_*_spmc namespace is the public interface for interacting with a
415  * ring buffer containing pointers. Correctness is provided for any number of
416  * consumers with up to one concurrent producer.
417  */
418 CK_CC_INLINE static bool
419 ck_ring_enqueue_spmc_size(struct ck_ring *ring,
420     struct ck_ring_buffer *buffer,
421     const void *entry,
422     unsigned int *size)
423 {
424 
425 	return _ck_ring_enqueue_sp_size(ring, buffer, &entry,
426 	    sizeof(entry), size);
427 }
428 
429 CK_CC_INLINE static bool
430 ck_ring_enqueue_spmc(struct ck_ring *ring,
431     struct ck_ring_buffer *buffer,
432     const void *entry)
433 {
434 
435 	return _ck_ring_enqueue_sp(ring, buffer, &entry,
436 	    sizeof(entry), NULL);
437 }
438 
439 CK_CC_INLINE static bool
440 ck_ring_trydequeue_spmc(struct ck_ring *ring,
441     const struct ck_ring_buffer *buffer,
442     void *data)
443 {
444 
445 	return _ck_ring_trydequeue_mc(ring, buffer, (void **)data, sizeof(void *));
446 }
447 
448 CK_CC_INLINE static bool
449 ck_ring_dequeue_spmc(struct ck_ring *ring,
450     const struct ck_ring_buffer *buffer,
451     void *data)
452 {
453 
454 	return _ck_ring_dequeue_mc(ring, buffer, (void **)data, sizeof(void *));
455 }
456 
457 /*
 * The ck_ring_*_mpsc namespace is the public interface for interacting with a
 * ring buffer containing pointers. Correctness is provided for any number of
 * producers with up to one concurrent consumer.
461  */
462 CK_CC_INLINE static bool
463 ck_ring_enqueue_mpsc(struct ck_ring *ring,
464     struct ck_ring_buffer *buffer,
465     const void *entry)
466 {
467 
468 	return _ck_ring_enqueue_mp(ring, buffer, &entry,
469 	    sizeof(entry), NULL);
470 }
471 
472 CK_CC_INLINE static bool
473 ck_ring_enqueue_mpsc_size(struct ck_ring *ring,
474     struct ck_ring_buffer *buffer,
475     const void *entry,
476     unsigned int *size)
477 {
478 
479 	return _ck_ring_enqueue_mp_size(ring, buffer, &entry,
480 	    sizeof(entry), size);
481 }
482 
483 CK_CC_INLINE static bool
484 ck_ring_dequeue_mpsc(struct ck_ring *ring,
485     const struct ck_ring_buffer *buffer,
486     void *data)
487 {
488 
489 	return _ck_ring_dequeue_sc(ring, buffer, (void **)data,
490 	    sizeof(void *));
491 }
492 
493 /*
494  * CK_RING_PROTOTYPE is used to define a type-safe interface for inlining
495  * values of a particular type in the ring the buffer.
496  */
497 #define CK_RING_PROTOTYPE(name, type)			\
498 CK_CC_INLINE static bool				\
499 ck_ring_enqueue_spsc_size_##name(struct ck_ring *a,	\
500     struct type *b,					\
501     struct type *c,					\
502     unsigned int *d)					\
503 {							\
504 							\
505 	return _ck_ring_enqueue_sp_size(a, b, c,	\
506 	    sizeof(struct type), d);			\
507 }							\
508 							\
509 CK_CC_INLINE static bool				\
510 ck_ring_enqueue_spsc_##name(struct ck_ring *a,		\
511     struct type *b,					\
512     struct type *c)					\
513 {							\
514 							\
515 	return _ck_ring_enqueue_sp(a, b, c,		\
516 	    sizeof(struct type), NULL);			\
517 }							\
518 							\
519 CK_CC_INLINE static bool				\
520 ck_ring_dequeue_spsc_##name(struct ck_ring *a,		\
521     struct type *b,					\
522     struct type *c)					\
523 {							\
524 							\
525 	return _ck_ring_dequeue_sc(a, b, c,		\
526 	    sizeof(struct type));			\
527 }							\
528 							\
529 CK_CC_INLINE static bool				\
530 ck_ring_enqueue_spmc_size_##name(struct ck_ring *a,	\
531     struct type *b,					\
532     struct type *c,					\
533     unsigned int *d)					\
534 {							\
535 							\
536 	return _ck_ring_enqueue_sp_size(a, b, c,	\
537 	    sizeof(struct type), d);			\
538 }							\
539 							\
540 CK_CC_INLINE static bool				\
541 ck_ring_enqueue_spmc_##name(struct ck_ring *a,		\
542     struct type *b,					\
543     struct type *c)					\
544 {							\
545 							\
546 	return _ck_ring_enqueue_sp(a, b, c,		\
547 	    sizeof(struct type), NULL);			\
548 }							\
549 							\
550 CK_CC_INLINE static bool				\
551 ck_ring_trydequeue_spmc_##name(struct ck_ring *a,	\
552     struct type *b,					\
553     struct type *c)					\
554 {							\
555 							\
556 	return _ck_ring_trydequeue_mc(a,		\
557 	    b, c, sizeof(struct type));			\
558 }							\
559 							\
560 CK_CC_INLINE static bool				\
561 ck_ring_dequeue_spmc_##name(struct ck_ring *a,		\
562     struct type *b,					\
563     struct type *c)					\
564 {							\
565 							\
566 	return _ck_ring_dequeue_mc(a, b, c,		\
567 	    sizeof(struct type));			\
568 }							\
569 							\
570 CK_CC_INLINE static bool				\
571 ck_ring_enqueue_mpsc_##name(struct ck_ring *a,		\
572     struct type *b,					\
573     struct type *c)					\
574 {							\
575 							\
576 	return _ck_ring_enqueue_mp(a, b, c,		\
577 	    sizeof(struct type), NULL);			\
578 }							\
579 							\
580 CK_CC_INLINE static bool				\
581 ck_ring_enqueue_mpsc_size_##name(struct ck_ring *a,	\
582     struct type *b,					\
583     struct type *c,					\
584     unsigned int *d)					\
585 {							\
586 							\
587 	return _ck_ring_enqueue_mp_size(a, b, c,	\
588 	    sizeof(struct type), d);			\
589 }							\
590 							\
591 CK_CC_INLINE static bool				\
592 ck_ring_dequeue_mpsc_##name(struct ck_ring *a,		\
593     struct type *b,					\
594     struct type *c)					\
595 {							\
596 							\
597 	return _ck_ring_dequeue_sc(a, b, c,		\
598 	    sizeof(struct type));			\
599 }							\
600 							\
601 CK_CC_INLINE static bool				\
602 ck_ring_enqueue_mpmc_size_##name(struct ck_ring *a,	\
603     struct type *b,					\
604     struct type *c,					\
605     unsigned int *d)					\
606 {							\
607 							\
608 	return _ck_ring_enqueue_mp_size(a, b, c,	\
609 	    sizeof(struct type), d);			\
610 }							\
611 							\
612 CK_CC_INLINE static bool				\
613 ck_ring_enqueue_mpmc_##name(struct ck_ring *a,		\
614     struct type *b,					\
615     struct type *c)					\
616 {							\
617 							\
618 	return _ck_ring_enqueue_mp(a, b, c,		\
619 	    sizeof(struct type), NULL);			\
620 }							\
621 							\
622 CK_CC_INLINE static bool				\
623 ck_ring_trydequeue_mpmc_##name(struct ck_ring *a,	\
624     struct type *b,					\
625     struct type *c)					\
626 {							\
627 							\
628 	return _ck_ring_trydequeue_mc(a,		\
629 	    b, c, sizeof(struct type));			\
630 }							\
631 							\
632 CK_CC_INLINE static bool				\
633 ck_ring_dequeue_mpmc_##name(struct ck_ring *a,		\
634     struct type *b,					\
635     struct type *c)					\
636 {							\
637 							\
638 	return _ck_ring_dequeue_mc(a, b, c,		\
639 	    sizeof(struct type));			\
640 }
641 
/*
 * Convenience macros that select the function generated by
 * CK_RING_PROTOTYPE(name, ...) for a given operation and mode. The
 * arguments after `name` are forwarded unchanged: ring, slot array,
 * element pointer and, for the _SIZE variants, the occupancy output.
 */

/*
 * SPSC: a single producer with one concurrent consumer.
 */
#define CK_RING_ENQUEUE_SPSC(name, r, buf, elem)		\
	ck_ring_enqueue_spsc_##name(r, buf, elem)
#define CK_RING_ENQUEUE_SPSC_SIZE(name, r, buf, elem, len)	\
	ck_ring_enqueue_spsc_size_##name(r, buf, elem, len)
#define CK_RING_DEQUEUE_SPSC(name, r, buf, elem)		\
	ck_ring_dequeue_spsc_##name(r, buf, elem)

/*
 * SPMC: a single producer with any number of concurrent consumers.
 */
#define CK_RING_ENQUEUE_SPMC(name, r, buf, elem)		\
	ck_ring_enqueue_spmc_##name(r, buf, elem)
#define CK_RING_ENQUEUE_SPMC_SIZE(name, r, buf, elem, len)	\
	ck_ring_enqueue_spmc_size_##name(r, buf, elem, len)
#define CK_RING_TRYDEQUEUE_SPMC(name, r, buf, elem)		\
	ck_ring_trydequeue_spmc_##name(r, buf, elem)
#define CK_RING_DEQUEUE_SPMC(name, r, buf, elem)		\
	ck_ring_dequeue_spmc_##name(r, buf, elem)

/*
 * MPSC: any number of concurrent producers with up to one
 * concurrent consumer.
 */
#define CK_RING_ENQUEUE_MPSC(name, r, buf, elem)		\
	ck_ring_enqueue_mpsc_##name(r, buf, elem)
#define CK_RING_ENQUEUE_MPSC_SIZE(name, r, buf, elem, len)	\
	ck_ring_enqueue_mpsc_size_##name(r, buf, elem, len)
#define CK_RING_DEQUEUE_MPSC(name, r, buf, elem)		\
	ck_ring_dequeue_mpsc_##name(r, buf, elem)

/*
 * MPMC: any number of concurrent producers and consumers.
 */
#define CK_RING_ENQUEUE_MPMC(name, r, buf, elem)		\
	ck_ring_enqueue_mpmc_##name(r, buf, elem)
#define CK_RING_ENQUEUE_MPMC_SIZE(name, r, buf, elem, len)	\
	ck_ring_enqueue_mpmc_size_##name(r, buf, elem, len)
#define CK_RING_TRYDEQUEUE_MPMC(name, r, buf, elem)		\
	ck_ring_trydequeue_mpmc_##name(r, buf, elem)
#define CK_RING_DEQUEUE_MPMC(name, r, buf, elem)		\
	ck_ring_dequeue_mpmc_##name(r, buf, elem)
686 
687 #endif /* CK_RING_H */
688