xref: /freebsd/sys/net/mp_ring.c (revision 036d2e814bf0f5d88ffb4b24c159320894541757)
1 /*-
2  * Copyright (c) 2014 Chelsio Communications, Inc.
3  * All rights reserved.
4  * Written by: Navdeep Parhar <np@FreeBSD.org>
5  *
6  * Redistribution and use in source and binary forms, with or without
7  * modification, are permitted provided that the following conditions
8  * are met:
9  * 1. Redistributions of source code must retain the above copyright
10  *    notice, this list of conditions and the following disclaimer.
11  * 2. Redistributions in binary form must reproduce the above copyright
12  *    notice, this list of conditions and the following disclaimer in the
13  *    documentation and/or other materials provided with the distribution.
14  *
15  * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
16  * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
17  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
18  * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
19  * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
20  * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
21  * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
22  * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
23  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
24  * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
25  * SUCH DAMAGE.
26  */
27 
28 #include <sys/cdefs.h>
29 __FBSDID("$FreeBSD$");
30 
31 #include <sys/types.h>
32 #include <sys/param.h>
33 #include <sys/systm.h>
34 #include <sys/counter.h>
35 #include <sys/lock.h>
36 #include <sys/mutex.h>
37 #include <sys/malloc.h>
38 #include <machine/cpu.h>
39 #include <net/mp_ring.h>
40 
41 union ring_state {
42 	struct {
43 		uint16_t pidx_head;
44 		uint16_t pidx_tail;
45 		uint16_t cidx;
46 		uint16_t flags;
47 	};
48 	uint64_t state;
49 };
50 
51 enum {
52 	IDLE = 0,	/* consumer ran to completion, nothing more to do. */
53 	BUSY,		/* consumer is running already, or will be shortly. */
54 	STALLED,	/* consumer stopped due to lack of resources. */
55 	ABDICATED,	/* consumer stopped even though there was work to be
56 			   done because it wants another thread to take over. */
57 };
58 
59 static inline uint16_t
60 space_available(struct ifmp_ring *r, union ring_state s)
61 {
62 	uint16_t x = r->size - 1;
63 
64 	if (s.cidx == s.pidx_head)
65 		return (x);
66 	else if (s.cidx > s.pidx_head)
67 		return (s.cidx - s.pidx_head - 1);
68 	else
69 		return (x - s.pidx_head + s.cidx);
70 }
71 
72 static inline uint16_t
73 increment_idx(struct ifmp_ring *r, uint16_t idx, uint16_t n)
74 {
75 	int x = r->size - idx;
76 
77 	MPASS(x > 0);
78 	return (x > n ? idx + n : n - x);
79 }
80 
81 /* Consumer is about to update the ring's state to s */
82 static inline uint16_t
83 state_to_flags(union ring_state s, int abdicate)
84 {
85 
86 	if (s.cidx == s.pidx_tail)
87 		return (IDLE);
88 	else if (abdicate && s.pidx_tail != s.pidx_head)
89 		return (ABDICATED);
90 
91 	return (BUSY);
92 }
93 
94 #ifdef MP_RING_NO_64BIT_ATOMICS
95 static void
96 drain_ring_locked(struct ifmp_ring *r, union ring_state os, uint16_t prev, int budget)
97 {
98 	union ring_state ns;
99 	int n, pending, total;
100 	uint16_t cidx = os.cidx;
101 	uint16_t pidx = os.pidx_tail;
102 
103 	MPASS(os.flags == BUSY);
104 	MPASS(cidx != pidx);
105 
106 	if (prev == IDLE)
107 		counter_u64_add(r->starts, 1);
108 	pending = 0;
109 	total = 0;
110 
111 	while (cidx != pidx) {
112 
113 		/* Items from cidx to pidx are available for consumption. */
114 		n = r->drain(r, cidx, pidx);
115 		if (n == 0) {
116 			os.state = ns.state = r->state;
117 			ns.cidx = cidx;
118 			ns.flags = STALLED;
119 			r->state = ns.state;
120 			if (prev != STALLED)
121 				counter_u64_add(r->stalls, 1);
122 			else if (total > 0) {
123 				counter_u64_add(r->restarts, 1);
124 				counter_u64_add(r->stalls, 1);
125 			}
126 			break;
127 		}
128 		cidx = increment_idx(r, cidx, n);
129 		pending += n;
130 		total += n;
131 
132 		/*
133 		 * We update the cidx only if we've caught up with the pidx, the
134 		 * real cidx is getting too far ahead of the one visible to
135 		 * everyone else, or we have exceeded our budget.
136 		 */
137 		if (cidx != pidx && pending < 64 && total < budget)
138 			continue;
139 
140 		os.state = ns.state = r->state;
141 		ns.cidx = cidx;
142 		ns.flags = state_to_flags(ns, total >= budget);
143 		r->state = ns.state;
144 
145 		if (ns.flags == ABDICATED)
146 			counter_u64_add(r->abdications, 1);
147 		if (ns.flags != BUSY) {
148 			/* Wrong loop exit if we're going to stall. */
149 			MPASS(ns.flags != STALLED);
150 			if (prev == STALLED) {
151 				MPASS(total > 0);
152 				counter_u64_add(r->restarts, 1);
153 			}
154 			break;
155 		}
156 
157 		/*
158 		 * The acquire style atomic above guarantees visibility of items
159 		 * associated with any pidx change that we notice here.
160 		 */
161 		pidx = ns.pidx_tail;
162 		pending = 0;
163 	}
164 }
165 #else
166 /*
167  * Caller passes in a state, with a guarantee that there is work to do and that
168  * all items up to the pidx_tail in the state are visible.
169  */
170 static void
171 drain_ring_lockless(struct ifmp_ring *r, union ring_state os, uint16_t prev, int budget)
172 {
173 	union ring_state ns;
174 	int n, pending, total;
175 	uint16_t cidx = os.cidx;
176 	uint16_t pidx = os.pidx_tail;
177 
178 	MPASS(os.flags == BUSY);
179 	MPASS(cidx != pidx);
180 
181 	if (prev == IDLE)
182 		counter_u64_add(r->starts, 1);
183 	pending = 0;
184 	total = 0;
185 
186 	while (cidx != pidx) {
187 
188 		/* Items from cidx to pidx are available for consumption. */
189 		n = r->drain(r, cidx, pidx);
190 		if (n == 0) {
191 			critical_enter();
192 			os.state = r->state;
193 			do {
194 				ns.state = os.state;
195 				ns.cidx = cidx;
196 				ns.flags = STALLED;
197 			} while (atomic_fcmpset_64(&r->state, &os.state,
198 			    ns.state) == 0);
199 			critical_exit();
200 			if (prev != STALLED)
201 				counter_u64_add(r->stalls, 1);
202 			else if (total > 0) {
203 				counter_u64_add(r->restarts, 1);
204 				counter_u64_add(r->stalls, 1);
205 			}
206 			break;
207 		}
208 		cidx = increment_idx(r, cidx, n);
209 		pending += n;
210 		total += n;
211 
212 		/*
213 		 * We update the cidx only if we've caught up with the pidx, the
214 		 * real cidx is getting too far ahead of the one visible to
215 		 * everyone else, or we have exceeded our budget.
216 		 */
217 		if (cidx != pidx && pending < 64 && total < budget)
218 			continue;
219 		critical_enter();
220 		os.state = r->state;
221 		do {
222 			ns.state = os.state;
223 			ns.cidx = cidx;
224 			ns.flags = state_to_flags(ns, total >= budget);
225 		} while (atomic_fcmpset_acq_64(&r->state, &os.state,
226 		    ns.state) == 0);
227 		critical_exit();
228 
229 		if (ns.flags == ABDICATED)
230 			counter_u64_add(r->abdications, 1);
231 		if (ns.flags != BUSY) {
232 			/* Wrong loop exit if we're going to stall. */
233 			MPASS(ns.flags != STALLED);
234 			if (prev == STALLED) {
235 				MPASS(total > 0);
236 				counter_u64_add(r->restarts, 1);
237 			}
238 			break;
239 		}
240 
241 		/*
242 		 * The acquire style atomic above guarantees visibility of items
243 		 * associated with any pidx change that we notice here.
244 		 */
245 		pidx = ns.pidx_tail;
246 		pending = 0;
247 	}
248 }
249 #endif
250 
251 int
252 ifmp_ring_alloc(struct ifmp_ring **pr, int size, void *cookie, mp_ring_drain_t drain,
253     mp_ring_can_drain_t can_drain, struct malloc_type *mt, int flags)
254 {
255 	struct ifmp_ring *r;
256 
257 	/* All idx are 16b so size can be 65536 at most */
258 	if (pr == NULL || size < 2 || size > 65536 || drain == NULL ||
259 	    can_drain == NULL)
260 		return (EINVAL);
261 	*pr = NULL;
262 	flags &= M_NOWAIT | M_WAITOK;
263 	MPASS(flags != 0);
264 
265 	r = malloc(__offsetof(struct ifmp_ring, items[size]), mt, flags | M_ZERO);
266 	if (r == NULL)
267 		return (ENOMEM);
268 	r->size = size;
269 	r->cookie = cookie;
270 	r->mt = mt;
271 	r->drain = drain;
272 	r->can_drain = can_drain;
273 	r->enqueues = counter_u64_alloc(flags);
274 	r->drops = counter_u64_alloc(flags);
275 	r->starts = counter_u64_alloc(flags);
276 	r->stalls = counter_u64_alloc(flags);
277 	r->restarts = counter_u64_alloc(flags);
278 	r->abdications = counter_u64_alloc(flags);
279 	if (r->enqueues == NULL || r->drops == NULL || r->starts == NULL ||
280 	    r->stalls == NULL || r->restarts == NULL ||
281 	    r->abdications == NULL) {
282 		ifmp_ring_free(r);
283 		return (ENOMEM);
284 	}
285 
286 	*pr = r;
287 #ifdef MP_RING_NO_64BIT_ATOMICS
288 	mtx_init(&r->lock, "mp_ring lock", NULL, MTX_DEF);
289 #endif
290 	return (0);
291 }
292 
293 void
294 ifmp_ring_free(struct ifmp_ring *r)
295 {
296 
297 	if (r == NULL)
298 		return;
299 
300 	if (r->enqueues != NULL)
301 		counter_u64_free(r->enqueues);
302 	if (r->drops != NULL)
303 		counter_u64_free(r->drops);
304 	if (r->starts != NULL)
305 		counter_u64_free(r->starts);
306 	if (r->stalls != NULL)
307 		counter_u64_free(r->stalls);
308 	if (r->restarts != NULL)
309 		counter_u64_free(r->restarts);
310 	if (r->abdications != NULL)
311 		counter_u64_free(r->abdications);
312 
313 	free(r, r->mt);
314 }
315 
316 /*
317  * Enqueue n items and maybe drain the ring for some time.
318  *
319  * Returns an errno.
320  */
321 #ifdef MP_RING_NO_64BIT_ATOMICS
322 int
323 ifmp_ring_enqueue(struct ifmp_ring *r, void **items, int n, int budget, int abdicate)
324 {
325 	union ring_state os, ns;
326 	uint16_t pidx_start, pidx_stop;
327 	int i;
328 
329 	MPASS(items != NULL);
330 	MPASS(n > 0);
331 
332 	mtx_lock(&r->lock);
333 	/*
334 	 * Reserve room for the new items.  Our reservation, if successful, is
335 	 * from 'pidx_start' to 'pidx_stop'.
336 	 */
337 	os.state = r->state;
338 	if (n >= space_available(r, os)) {
339 		counter_u64_add(r->drops, n);
340 		MPASS(os.flags != IDLE);
341 		mtx_unlock(&r->lock);
342 		if (os.flags == STALLED)
343 			ifmp_ring_check_drainage(r, 0);
344 		return (ENOBUFS);
345 	}
346 	ns.state = os.state;
347 	ns.pidx_head = increment_idx(r, os.pidx_head, n);
348 	r->state = ns.state;
349 	pidx_start = os.pidx_head;
350 	pidx_stop = ns.pidx_head;
351 
352 	/*
353 	 * Wait for other producers who got in ahead of us to enqueue their
354 	 * items, one producer at a time.  It is our turn when the ring's
355 	 * pidx_tail reaches the beginning of our reservation (pidx_start).
356 	 */
357 	while (ns.pidx_tail != pidx_start) {
358 		cpu_spinwait();
359 		ns.state = r->state;
360 	}
361 
362 	/* Now it is our turn to fill up the area we reserved earlier. */
363 	i = pidx_start;
364 	do {
365 		r->items[i] = *items++;
366 		if (__predict_false(++i == r->size))
367 			i = 0;
368 	} while (i != pidx_stop);
369 
370 	/*
371 	 * Update the ring's pidx_tail.  The release style atomic guarantees
372 	 * that the items are visible to any thread that sees the updated pidx.
373 	 */
374 	os.state = ns.state = r->state;
375 	ns.pidx_tail = pidx_stop;
376 	if (abdicate) {
377 		if (os.flags == IDLE)
378 			ns.flags = ABDICATED;
379 	} else
380 		ns.flags = BUSY;
381 	r->state = ns.state;
382 	counter_u64_add(r->enqueues, n);
383 
384 	if (!abdicate) {
385 		/*
386 		 * Turn into a consumer if some other thread isn't active as a consumer
387 		 * already.
388 		 */
389 		if (os.flags != BUSY)
390 			drain_ring_locked(r, ns, os.flags, budget);
391 	}
392 
393 	mtx_unlock(&r->lock);
394 	return (0);
395 }
396 #else
397 int
398 ifmp_ring_enqueue(struct ifmp_ring *r, void **items, int n, int budget, int abdicate)
399 {
400 	union ring_state os, ns;
401 	uint16_t pidx_start, pidx_stop;
402 	int i;
403 
404 	MPASS(items != NULL);
405 	MPASS(n > 0);
406 
407 	/*
408 	 * Reserve room for the new items.  Our reservation, if successful, is
409 	 * from 'pidx_start' to 'pidx_stop'.
410 	 */
411 	os.state = r->state;
412 	for (;;) {
413 		if (n >= space_available(r, os)) {
414 			counter_u64_add(r->drops, n);
415 			MPASS(os.flags != IDLE);
416 			if (os.flags == STALLED)
417 				ifmp_ring_check_drainage(r, 0);
418 			return (ENOBUFS);
419 		}
420 		ns.state = os.state;
421 		ns.pidx_head = increment_idx(r, os.pidx_head, n);
422 		critical_enter();
423 		if (atomic_fcmpset_64(&r->state, &os.state, ns.state))
424 			break;
425 		critical_exit();
426 		cpu_spinwait();
427 	}
428 	pidx_start = os.pidx_head;
429 	pidx_stop = ns.pidx_head;
430 
431 	/*
432 	 * Wait for other producers who got in ahead of us to enqueue their
433 	 * items, one producer at a time.  It is our turn when the ring's
434 	 * pidx_tail reaches the beginning of our reservation (pidx_start).
435 	 */
436 	while (ns.pidx_tail != pidx_start) {
437 		cpu_spinwait();
438 		ns.state = r->state;
439 	}
440 
441 	/* Now it is our turn to fill up the area we reserved earlier. */
442 	i = pidx_start;
443 	do {
444 		r->items[i] = *items++;
445 		if (__predict_false(++i == r->size))
446 			i = 0;
447 	} while (i != pidx_stop);
448 
449 	/*
450 	 * Update the ring's pidx_tail.  The release style atomic guarantees
451 	 * that the items are visible to any thread that sees the updated pidx.
452 	 */
453 	os.state = r->state;
454 	do {
455 		ns.state = os.state;
456 		ns.pidx_tail = pidx_stop;
457 		if (abdicate) {
458 			if (os.flags == IDLE)
459 				ns.flags = ABDICATED;
460 		} else
461 			ns.flags = BUSY;
462 	} while (atomic_fcmpset_rel_64(&r->state, &os.state, ns.state) == 0);
463 	critical_exit();
464 	counter_u64_add(r->enqueues, n);
465 
466 	if (!abdicate) {
467 		/*
468 		 * Turn into a consumer if some other thread isn't active as a consumer
469 		 * already.
470 		 */
471 		if (os.flags != BUSY)
472 			drain_ring_lockless(r, ns, os.flags, budget);
473 	}
474 
475 	return (0);
476 }
477 #endif
478 
479 void
480 ifmp_ring_check_drainage(struct ifmp_ring *r, int budget)
481 {
482 	union ring_state os, ns;
483 
484 	os.state = r->state;
485 	if ((os.flags != STALLED && os.flags != ABDICATED) ||	// Only continue in STALLED and ABDICATED
486 	    os.pidx_head != os.pidx_tail ||			// Require work to be available
487 	    (os.flags != ABDICATED && r->can_drain(r) == 0))	// Can either drain, or everyone left
488 		return;
489 
490 	MPASS(os.cidx != os.pidx_tail);	/* implied by STALLED */
491 	ns.state = os.state;
492 	ns.flags = BUSY;
493 
494 
495 #ifdef MP_RING_NO_64BIT_ATOMICS
496 	mtx_lock(&r->lock);
497 	if (r->state != os.state) {
498 		mtx_unlock(&r->lock);
499 		return;
500 	}
501 	r->state = ns.state;
502 	drain_ring_locked(r, ns, os.flags, budget);
503 	mtx_unlock(&r->lock);
504 #else
505 	/*
506 	 * The acquire style atomic guarantees visibility of items associated
507 	 * with the pidx that we read here.
508 	 */
509 	if (!atomic_cmpset_acq_64(&r->state, os.state, ns.state))
510 		return;
511 
512 
513 	drain_ring_lockless(r, ns, os.flags, budget);
514 #endif
515 }
516 
517 void
518 ifmp_ring_reset_stats(struct ifmp_ring *r)
519 {
520 
521 	counter_u64_zero(r->enqueues);
522 	counter_u64_zero(r->drops);
523 	counter_u64_zero(r->starts);
524 	counter_u64_zero(r->stalls);
525 	counter_u64_zero(r->restarts);
526 	counter_u64_zero(r->abdications);
527 }
528 
529 int
530 ifmp_ring_is_idle(struct ifmp_ring *r)
531 {
532 	union ring_state s;
533 
534 	s.state = r->state;
535 	if (s.pidx_head == s.pidx_tail && s.pidx_tail == s.cidx &&
536 	    s.flags == IDLE)
537 		return (1);
538 
539 	return (0);
540 }
541 
542 int
543 ifmp_ring_is_stalled(struct ifmp_ring *r)
544 {
545 	union ring_state s;
546 
547 	s.state = r->state;
548 	if (s.pidx_head == s.pidx_tail && s.flags == STALLED)
549 		return (1);
550 
551 	return (0);
552 }
553