xref: /freebsd/sys/net/mp_ring.c (revision 9f44a47fd07924afc035991af15d84e6585dea4f)
1 /*-
2  * Copyright (c) 2014 Chelsio Communications, Inc.
3  * All rights reserved.
4  * Written by: Navdeep Parhar <np@FreeBSD.org>
5  *
6  * Redistribution and use in source and binary forms, with or without
7  * modification, are permitted provided that the following conditions
8  * are met:
9  * 1. Redistributions of source code must retain the above copyright
10  *    notice, this list of conditions and the following disclaimer.
11  * 2. Redistributions in binary form must reproduce the above copyright
12  *    notice, this list of conditions and the following disclaimer in the
13  *    documentation and/or other materials provided with the distribution.
14  *
15  * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
16  * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
17  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
18  * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
19  * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
20  * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
21  * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
22  * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
23  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
24  * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
25  * SUCH DAMAGE.
26  */
27 
28 #include <sys/cdefs.h>
29 __FBSDID("$FreeBSD$");
30 
31 #include <sys/types.h>
32 #include <sys/param.h>
33 #include <sys/systm.h>
34 #include <sys/counter.h>
35 #include <sys/lock.h>
36 #include <sys/mutex.h>
37 #include <sys/malloc.h>
38 #include <machine/cpu.h>
39 #include <net/mp_ring.h>
40 
/*
 * Complete state of a ring, packed so that all four 16-bit fields can be
 * read, compared and swapped together as the single 64-bit word 'state'.
 */
union ring_state {
	struct {
		/* Producers reserve slots by advancing this index. */
		uint16_t pidx_head;
		/* Advanced to the end of a reservation once its items are stored. */
		uint16_t pidx_tail;
		/* Consumer index as last published by the draining thread. */
		uint16_t cidx;
		/* One of IDLE, BUSY, STALLED or ABDICATED. */
		uint16_t flags;
	};
	/* The same four fields viewed as one word. */
	uint64_t state;
};
50 
/* Values stored in the 'flags' field of union ring_state. */
enum {
	/* Consumer ran to completion, nothing more to do. */
	IDLE = 0,
	/* Consumer is running already, or will be shortly. */
	BUSY = 1,
	/* Consumer stopped due to lack of resources. */
	STALLED = 2,
	/*
	 * Consumer stopped even though there was work to be done because it
	 * wants another thread to take over.
	 */
	ABDICATED = 3,
};
58 
59 static inline uint16_t
60 space_available(struct ifmp_ring *r, union ring_state s)
61 {
62 	uint16_t x = r->size - 1;
63 
64 	if (s.cidx == s.pidx_head)
65 		return (x);
66 	else if (s.cidx > s.pidx_head)
67 		return (s.cidx - s.pidx_head - 1);
68 	else
69 		return (x - s.pidx_head + s.cidx);
70 }
71 
72 static inline uint16_t
73 increment_idx(struct ifmp_ring *r, uint16_t idx, uint16_t n)
74 {
75 	int x = r->size - idx;
76 
77 	MPASS(x > 0);
78 	return (x > n ? idx + n : n - x);
79 }
80 
81 /* Consumer is about to update the ring's state to s */
82 static inline uint16_t
83 state_to_flags(union ring_state s, int abdicate)
84 {
85 
86 	if (s.cidx == s.pidx_tail)
87 		return (IDLE);
88 	else if (abdicate && s.pidx_tail != s.pidx_head)
89 		return (ABDICATED);
90 
91 	return (BUSY);
92 }
93 
94 #ifdef MP_RING_NO_64BIT_ATOMICS
95 static void
96 drain_ring_locked(struct ifmp_ring *r, union ring_state os, uint16_t prev, int budget)
97 {
98 	union ring_state ns;
99 	int n, pending, total;
100 	uint16_t cidx = os.cidx;
101 	uint16_t pidx = os.pidx_tail;
102 
103 	MPASS(os.flags == BUSY);
104 	MPASS(cidx != pidx);
105 
106 	if (prev == IDLE)
107 		counter_u64_add(r->starts, 1);
108 	pending = 0;
109 	total = 0;
110 
111 	while (cidx != pidx) {
112 		/* Items from cidx to pidx are available for consumption. */
113 		n = r->drain(r, cidx, pidx);
114 		if (n == 0) {
115 			os.state = ns.state = r->state;
116 			ns.cidx = cidx;
117 			ns.flags = STALLED;
118 			r->state = ns.state;
119 			if (prev != STALLED)
120 				counter_u64_add(r->stalls, 1);
121 			else if (total > 0) {
122 				counter_u64_add(r->restarts, 1);
123 				counter_u64_add(r->stalls, 1);
124 			}
125 			break;
126 		}
127 		cidx = increment_idx(r, cidx, n);
128 		pending += n;
129 		total += n;
130 
131 		/*
132 		 * We update the cidx only if we've caught up with the pidx, the
133 		 * real cidx is getting too far ahead of the one visible to
134 		 * everyone else, or we have exceeded our budget.
135 		 */
136 		if (cidx != pidx && pending < 64 && total < budget)
137 			continue;
138 
139 		os.state = ns.state = r->state;
140 		ns.cidx = cidx;
141 		ns.flags = state_to_flags(ns, total >= budget);
142 		r->state = ns.state;
143 
144 		if (ns.flags == ABDICATED)
145 			counter_u64_add(r->abdications, 1);
146 		if (ns.flags != BUSY) {
147 			/* Wrong loop exit if we're going to stall. */
148 			MPASS(ns.flags != STALLED);
149 			if (prev == STALLED) {
150 				MPASS(total > 0);
151 				counter_u64_add(r->restarts, 1);
152 			}
153 			break;
154 		}
155 
156 		/*
157 		 * The acquire style atomic above guarantees visibility of items
158 		 * associated with any pidx change that we notice here.
159 		 */
160 		pidx = ns.pidx_tail;
161 		pending = 0;
162 	}
163 }
164 #else
165 /*
166  * Caller passes in a state, with a guarantee that there is work to do and that
167  * all items up to the pidx_tail in the state are visible.
168  */
169 static void
170 drain_ring_lockless(struct ifmp_ring *r, union ring_state os, uint16_t prev, int budget)
171 {
172 	union ring_state ns;
173 	int n, pending, total;
174 	uint16_t cidx = os.cidx;
175 	uint16_t pidx = os.pidx_tail;
176 
177 	MPASS(os.flags == BUSY);
178 	MPASS(cidx != pidx);
179 
180 	if (prev == IDLE)
181 		counter_u64_add(r->starts, 1);
182 	pending = 0;
183 	total = 0;
184 
185 	while (cidx != pidx) {
186 		/* Items from cidx to pidx are available for consumption. */
187 		n = r->drain(r, cidx, pidx);
188 		if (n == 0) {
189 			critical_enter();
190 			os.state = r->state;
191 			do {
192 				ns.state = os.state;
193 				ns.cidx = cidx;
194 				ns.flags = STALLED;
195 			} while (atomic_fcmpset_64(&r->state, &os.state,
196 			    ns.state) == 0);
197 			critical_exit();
198 			if (prev != STALLED)
199 				counter_u64_add(r->stalls, 1);
200 			else if (total > 0) {
201 				counter_u64_add(r->restarts, 1);
202 				counter_u64_add(r->stalls, 1);
203 			}
204 			break;
205 		}
206 		cidx = increment_idx(r, cidx, n);
207 		pending += n;
208 		total += n;
209 
210 		/*
211 		 * We update the cidx only if we've caught up with the pidx, the
212 		 * real cidx is getting too far ahead of the one visible to
213 		 * everyone else, or we have exceeded our budget.
214 		 */
215 		if (cidx != pidx && pending < 64 && total < budget)
216 			continue;
217 		critical_enter();
218 		os.state = r->state;
219 		do {
220 			ns.state = os.state;
221 			ns.cidx = cidx;
222 			ns.flags = state_to_flags(ns, total >= budget);
223 		} while (atomic_fcmpset_acq_64(&r->state, &os.state,
224 		    ns.state) == 0);
225 		critical_exit();
226 
227 		if (ns.flags == ABDICATED)
228 			counter_u64_add(r->abdications, 1);
229 		if (ns.flags != BUSY) {
230 			/* Wrong loop exit if we're going to stall. */
231 			MPASS(ns.flags != STALLED);
232 			if (prev == STALLED) {
233 				MPASS(total > 0);
234 				counter_u64_add(r->restarts, 1);
235 			}
236 			break;
237 		}
238 
239 		/*
240 		 * The acquire style atomic above guarantees visibility of items
241 		 * associated with any pidx change that we notice here.
242 		 */
243 		pidx = ns.pidx_tail;
244 		pending = 0;
245 	}
246 }
247 #endif
248 
249 int
250 ifmp_ring_alloc(struct ifmp_ring **pr, int size, void *cookie, mp_ring_drain_t drain,
251     mp_ring_can_drain_t can_drain, struct malloc_type *mt, int flags)
252 {
253 	struct ifmp_ring *r;
254 
255 	/* All idx are 16b so size can be 65536 at most */
256 	if (pr == NULL || size < 2 || size > 65536 || drain == NULL ||
257 	    can_drain == NULL)
258 		return (EINVAL);
259 	*pr = NULL;
260 	flags &= M_NOWAIT | M_WAITOK;
261 	MPASS(flags != 0);
262 
263 	r = malloc(__offsetof(struct ifmp_ring, items[size]), mt, flags | M_ZERO);
264 	if (r == NULL)
265 		return (ENOMEM);
266 	r->size = size;
267 	r->cookie = cookie;
268 	r->mt = mt;
269 	r->drain = drain;
270 	r->can_drain = can_drain;
271 	r->enqueues = counter_u64_alloc(flags);
272 	r->drops = counter_u64_alloc(flags);
273 	r->starts = counter_u64_alloc(flags);
274 	r->stalls = counter_u64_alloc(flags);
275 	r->restarts = counter_u64_alloc(flags);
276 	r->abdications = counter_u64_alloc(flags);
277 	if (r->enqueues == NULL || r->drops == NULL || r->starts == NULL ||
278 	    r->stalls == NULL || r->restarts == NULL ||
279 	    r->abdications == NULL) {
280 		ifmp_ring_free(r);
281 		return (ENOMEM);
282 	}
283 
284 	*pr = r;
285 #ifdef MP_RING_NO_64BIT_ATOMICS
286 	mtx_init(&r->lock, "mp_ring lock", NULL, MTX_DEF);
287 #endif
288 	return (0);
289 }
290 
291 void
292 ifmp_ring_free(struct ifmp_ring *r)
293 {
294 
295 	if (r == NULL)
296 		return;
297 
298 	if (r->enqueues != NULL)
299 		counter_u64_free(r->enqueues);
300 	if (r->drops != NULL)
301 		counter_u64_free(r->drops);
302 	if (r->starts != NULL)
303 		counter_u64_free(r->starts);
304 	if (r->stalls != NULL)
305 		counter_u64_free(r->stalls);
306 	if (r->restarts != NULL)
307 		counter_u64_free(r->restarts);
308 	if (r->abdications != NULL)
309 		counter_u64_free(r->abdications);
310 
311 	free(r, r->mt);
312 }
313 
314 /*
315  * Enqueue n items and maybe drain the ring for some time.
316  *
317  * Returns an errno.
318  */
319 #ifdef MP_RING_NO_64BIT_ATOMICS
320 int
321 ifmp_ring_enqueue(struct ifmp_ring *r, void **items, int n, int budget, int abdicate)
322 {
323 	union ring_state os, ns;
324 	uint16_t pidx_start, pidx_stop;
325 	int i;
326 
327 	MPASS(items != NULL);
328 	MPASS(n > 0);
329 
330 	mtx_lock(&r->lock);
331 	/*
332 	 * Reserve room for the new items.  Our reservation, if successful, is
333 	 * from 'pidx_start' to 'pidx_stop'.
334 	 */
335 	os.state = r->state;
336 	if (n >= space_available(r, os)) {
337 		counter_u64_add(r->drops, n);
338 		MPASS(os.flags != IDLE);
339 		mtx_unlock(&r->lock);
340 		if (os.flags == STALLED)
341 			ifmp_ring_check_drainage(r, 0);
342 		return (ENOBUFS);
343 	}
344 	ns.state = os.state;
345 	ns.pidx_head = increment_idx(r, os.pidx_head, n);
346 	r->state = ns.state;
347 	pidx_start = os.pidx_head;
348 	pidx_stop = ns.pidx_head;
349 
350 	/*
351 	 * Wait for other producers who got in ahead of us to enqueue their
352 	 * items, one producer at a time.  It is our turn when the ring's
353 	 * pidx_tail reaches the beginning of our reservation (pidx_start).
354 	 */
355 	while (ns.pidx_tail != pidx_start) {
356 		cpu_spinwait();
357 		ns.state = r->state;
358 	}
359 
360 	/* Now it is our turn to fill up the area we reserved earlier. */
361 	i = pidx_start;
362 	do {
363 		r->items[i] = *items++;
364 		if (__predict_false(++i == r->size))
365 			i = 0;
366 	} while (i != pidx_stop);
367 
368 	/*
369 	 * Update the ring's pidx_tail.  The release style atomic guarantees
370 	 * that the items are visible to any thread that sees the updated pidx.
371 	 */
372 	os.state = ns.state = r->state;
373 	ns.pidx_tail = pidx_stop;
374 	if (abdicate) {
375 		if (os.flags == IDLE)
376 			ns.flags = ABDICATED;
377 	} else
378 		ns.flags = BUSY;
379 	r->state = ns.state;
380 	counter_u64_add(r->enqueues, n);
381 
382 	if (!abdicate) {
383 		/*
384 		 * Turn into a consumer if some other thread isn't active as a consumer
385 		 * already.
386 		 */
387 		if (os.flags != BUSY)
388 			drain_ring_locked(r, ns, os.flags, budget);
389 	}
390 
391 	mtx_unlock(&r->lock);
392 	return (0);
393 }
394 #else
395 int
396 ifmp_ring_enqueue(struct ifmp_ring *r, void **items, int n, int budget, int abdicate)
397 {
398 	union ring_state os, ns;
399 	uint16_t pidx_start, pidx_stop;
400 	int i;
401 
402 	MPASS(items != NULL);
403 	MPASS(n > 0);
404 
405 	/*
406 	 * Reserve room for the new items.  Our reservation, if successful, is
407 	 * from 'pidx_start' to 'pidx_stop'.
408 	 */
409 	os.state = r->state;
410 	for (;;) {
411 		if (n >= space_available(r, os)) {
412 			counter_u64_add(r->drops, n);
413 			MPASS(os.flags != IDLE);
414 			if (os.flags == STALLED)
415 				ifmp_ring_check_drainage(r, 0);
416 			return (ENOBUFS);
417 		}
418 		ns.state = os.state;
419 		ns.pidx_head = increment_idx(r, os.pidx_head, n);
420 		critical_enter();
421 		if (atomic_fcmpset_64(&r->state, &os.state, ns.state))
422 			break;
423 		critical_exit();
424 		cpu_spinwait();
425 	}
426 	pidx_start = os.pidx_head;
427 	pidx_stop = ns.pidx_head;
428 
429 	/*
430 	 * Wait for other producers who got in ahead of us to enqueue their
431 	 * items, one producer at a time.  It is our turn when the ring's
432 	 * pidx_tail reaches the beginning of our reservation (pidx_start).
433 	 */
434 	while (ns.pidx_tail != pidx_start) {
435 		cpu_spinwait();
436 		ns.state = r->state;
437 	}
438 
439 	/* Now it is our turn to fill up the area we reserved earlier. */
440 	i = pidx_start;
441 	do {
442 		r->items[i] = *items++;
443 		if (__predict_false(++i == r->size))
444 			i = 0;
445 	} while (i != pidx_stop);
446 
447 	/*
448 	 * Update the ring's pidx_tail.  The release style atomic guarantees
449 	 * that the items are visible to any thread that sees the updated pidx.
450 	 */
451 	os.state = r->state;
452 	do {
453 		ns.state = os.state;
454 		ns.pidx_tail = pidx_stop;
455 		if (abdicate) {
456 			if (os.flags == IDLE)
457 				ns.flags = ABDICATED;
458 		} else
459 			ns.flags = BUSY;
460 	} while (atomic_fcmpset_rel_64(&r->state, &os.state, ns.state) == 0);
461 	critical_exit();
462 	counter_u64_add(r->enqueues, n);
463 
464 	if (!abdicate) {
465 		/*
466 		 * Turn into a consumer if some other thread isn't active as a consumer
467 		 * already.
468 		 */
469 		if (os.flags != BUSY)
470 			drain_ring_lockless(r, ns, os.flags, budget);
471 	}
472 
473 	return (0);
474 }
475 #endif
476 
477 void
478 ifmp_ring_check_drainage(struct ifmp_ring *r, int budget)
479 {
480 	union ring_state os, ns;
481 
482 	os.state = r->state;
483 	if ((os.flags != STALLED && os.flags != ABDICATED) ||	// Only continue in STALLED and ABDICATED
484 	    os.pidx_head != os.pidx_tail ||			// Require work to be available
485 	    (os.flags != ABDICATED && r->can_drain(r) == 0))	// Can either drain, or everyone left
486 		return;
487 
488 	MPASS(os.cidx != os.pidx_tail);	/* implied by STALLED */
489 	ns.state = os.state;
490 	ns.flags = BUSY;
491 
492 #ifdef MP_RING_NO_64BIT_ATOMICS
493 	mtx_lock(&r->lock);
494 	if (r->state != os.state) {
495 		mtx_unlock(&r->lock);
496 		return;
497 	}
498 	r->state = ns.state;
499 	drain_ring_locked(r, ns, os.flags, budget);
500 	mtx_unlock(&r->lock);
501 #else
502 	/*
503 	 * The acquire style atomic guarantees visibility of items associated
504 	 * with the pidx that we read here.
505 	 */
506 	if (!atomic_cmpset_acq_64(&r->state, os.state, ns.state))
507 		return;
508 
509 	drain_ring_lockless(r, ns, os.flags, budget);
510 #endif
511 }
512 
513 void
514 ifmp_ring_reset_stats(struct ifmp_ring *r)
515 {
516 
517 	counter_u64_zero(r->enqueues);
518 	counter_u64_zero(r->drops);
519 	counter_u64_zero(r->starts);
520 	counter_u64_zero(r->stalls);
521 	counter_u64_zero(r->restarts);
522 	counter_u64_zero(r->abdications);
523 }
524 
525 int
526 ifmp_ring_is_idle(struct ifmp_ring *r)
527 {
528 	union ring_state s;
529 
530 	s.state = r->state;
531 	if (s.pidx_head == s.pidx_tail && s.pidx_tail == s.cidx &&
532 	    s.flags == IDLE)
533 		return (1);
534 
535 	return (0);
536 }
537 
538 int
539 ifmp_ring_is_stalled(struct ifmp_ring *r)
540 {
541 	union ring_state s;
542 
543 	s.state = r->state;
544 	if (s.pidx_head == s.pidx_tail && s.flags == STALLED)
545 		return (1);
546 
547 	return (0);
548 }
549