xref: /freebsd/sys/net/mp_ring.c (revision 734e82fe33aa764367791a7d603b383996c6b40b)
1 /*-
2  * Copyright (c) 2014 Chelsio Communications, Inc.
3  * All rights reserved.
4  * Written by: Navdeep Parhar <np@FreeBSD.org>
5  *
6  * Redistribution and use in source and binary forms, with or without
7  * modification, are permitted provided that the following conditions
8  * are met:
9  * 1. Redistributions of source code must retain the above copyright
10  *    notice, this list of conditions and the following disclaimer.
11  * 2. Redistributions in binary form must reproduce the above copyright
12  *    notice, this list of conditions and the following disclaimer in the
13  *    documentation and/or other materials provided with the distribution.
14  *
15  * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
16  * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
17  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
18  * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
19  * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
20  * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
21  * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
22  * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
23  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
24  * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
25  * SUCH DAMAGE.
26  */
27 
28 #include <sys/cdefs.h>
29 #include <sys/types.h>
30 #include <sys/param.h>
31 #include <sys/systm.h>
32 #include <sys/counter.h>
33 #include <sys/lock.h>
34 #include <sys/mutex.h>
35 #include <sys/malloc.h>
36 #include <machine/cpu.h>
37 #include <net/mp_ring.h>
38 
/*
 * Complete state of a ring, packed into 64 bits.  The four 16-bit fields
 * can be inspected individually, or the whole thing can be read, compared
 * and replaced as one word through 'state'.
 */
union ring_state {
	struct {
		/* End of the region producers have reserved so far. */
		uint16_t pidx_head;
		/* End of the region producers have finished filling in. */
		uint16_t pidx_tail;
		/* Consumer index: the next item to be drained. */
		uint16_t cidx;
		/* One of IDLE, BUSY, STALLED or ABDICATED. */
		uint16_t flags;
	};
	/* All of the above viewed as a single 64-bit value. */
	uint64_t state;
};
48 
/* Values kept in the 'flags' field of union ring_state. */
enum {
	/* Consumer ran to completion, nothing more to do. */
	IDLE = 0,
	/* Consumer is running already, or will be shortly. */
	BUSY = 1,
	/* Consumer stopped due to lack of resources. */
	STALLED = 2,
	/*
	 * Consumer stopped even though there was work to be done because it
	 * wants another thread to take over.
	 */
	ABDICATED = 3,
};
56 
57 static inline uint16_t
58 space_available(struct ifmp_ring *r, union ring_state s)
59 {
60 	uint16_t x = r->size - 1;
61 
62 	if (s.cidx == s.pidx_head)
63 		return (x);
64 	else if (s.cidx > s.pidx_head)
65 		return (s.cidx - s.pidx_head - 1);
66 	else
67 		return (x - s.pidx_head + s.cidx);
68 }
69 
70 static inline uint16_t
71 increment_idx(struct ifmp_ring *r, uint16_t idx, uint16_t n)
72 {
73 	int x = r->size - idx;
74 
75 	MPASS(x > 0);
76 	return (x > n ? idx + n : n - x);
77 }
78 
79 /* Consumer is about to update the ring's state to s */
80 static inline uint16_t
81 state_to_flags(union ring_state s, int abdicate)
82 {
83 
84 	if (s.cidx == s.pidx_tail)
85 		return (IDLE);
86 	else if (abdicate && s.pidx_tail != s.pidx_head)
87 		return (ABDICATED);
88 
89 	return (BUSY);
90 }
91 
92 #ifdef MP_RING_NO_64BIT_ATOMICS
93 static void
94 drain_ring_locked(struct ifmp_ring *r, union ring_state os, uint16_t prev, int budget)
95 {
96 	union ring_state ns;
97 	int n, pending, total;
98 	uint16_t cidx = os.cidx;
99 	uint16_t pidx = os.pidx_tail;
100 
101 	MPASS(os.flags == BUSY);
102 	MPASS(cidx != pidx);
103 
104 	if (prev == IDLE)
105 		counter_u64_add(r->starts, 1);
106 	pending = 0;
107 	total = 0;
108 
109 	while (cidx != pidx) {
110 		/* Items from cidx to pidx are available for consumption. */
111 		n = r->drain(r, cidx, pidx);
112 		if (n == 0) {
113 			os.state = ns.state = r->state;
114 			ns.cidx = cidx;
115 			ns.flags = STALLED;
116 			r->state = ns.state;
117 			if (prev != STALLED)
118 				counter_u64_add(r->stalls, 1);
119 			else if (total > 0) {
120 				counter_u64_add(r->restarts, 1);
121 				counter_u64_add(r->stalls, 1);
122 			}
123 			break;
124 		}
125 		cidx = increment_idx(r, cidx, n);
126 		pending += n;
127 		total += n;
128 
129 		/*
130 		 * We update the cidx only if we've caught up with the pidx, the
131 		 * real cidx is getting too far ahead of the one visible to
132 		 * everyone else, or we have exceeded our budget.
133 		 */
134 		if (cidx != pidx && pending < 64 && total < budget)
135 			continue;
136 
137 		os.state = ns.state = r->state;
138 		ns.cidx = cidx;
139 		ns.flags = state_to_flags(ns, total >= budget);
140 		r->state = ns.state;
141 
142 		if (ns.flags == ABDICATED)
143 			counter_u64_add(r->abdications, 1);
144 		if (ns.flags != BUSY) {
145 			/* Wrong loop exit if we're going to stall. */
146 			MPASS(ns.flags != STALLED);
147 			if (prev == STALLED) {
148 				MPASS(total > 0);
149 				counter_u64_add(r->restarts, 1);
150 			}
151 			break;
152 		}
153 
154 		/*
155 		 * The acquire style atomic above guarantees visibility of items
156 		 * associated with any pidx change that we notice here.
157 		 */
158 		pidx = ns.pidx_tail;
159 		pending = 0;
160 	}
161 }
162 #else
163 /*
164  * Caller passes in a state, with a guarantee that there is work to do and that
165  * all items up to the pidx_tail in the state are visible.
166  */
167 static void
168 drain_ring_lockless(struct ifmp_ring *r, union ring_state os, uint16_t prev, int budget)
169 {
170 	union ring_state ns;
171 	int n, pending, total;
172 	uint16_t cidx = os.cidx;
173 	uint16_t pidx = os.pidx_tail;
174 
175 	MPASS(os.flags == BUSY);
176 	MPASS(cidx != pidx);
177 
178 	if (prev == IDLE)
179 		counter_u64_add(r->starts, 1);
180 	pending = 0;
181 	total = 0;
182 
183 	while (cidx != pidx) {
184 		/* Items from cidx to pidx are available for consumption. */
185 		n = r->drain(r, cidx, pidx);
186 		if (n == 0) {
187 			critical_enter();
188 			os.state = r->state;
189 			do {
190 				ns.state = os.state;
191 				ns.cidx = cidx;
192 				ns.flags = STALLED;
193 			} while (atomic_fcmpset_64(&r->state, &os.state,
194 			    ns.state) == 0);
195 			critical_exit();
196 			if (prev != STALLED)
197 				counter_u64_add(r->stalls, 1);
198 			else if (total > 0) {
199 				counter_u64_add(r->restarts, 1);
200 				counter_u64_add(r->stalls, 1);
201 			}
202 			break;
203 		}
204 		cidx = increment_idx(r, cidx, n);
205 		pending += n;
206 		total += n;
207 
208 		/*
209 		 * We update the cidx only if we've caught up with the pidx, the
210 		 * real cidx is getting too far ahead of the one visible to
211 		 * everyone else, or we have exceeded our budget.
212 		 */
213 		if (cidx != pidx && pending < 64 && total < budget)
214 			continue;
215 		critical_enter();
216 		os.state = r->state;
217 		do {
218 			ns.state = os.state;
219 			ns.cidx = cidx;
220 			ns.flags = state_to_flags(ns, total >= budget);
221 		} while (atomic_fcmpset_acq_64(&r->state, &os.state,
222 		    ns.state) == 0);
223 		critical_exit();
224 
225 		if (ns.flags == ABDICATED)
226 			counter_u64_add(r->abdications, 1);
227 		if (ns.flags != BUSY) {
228 			/* Wrong loop exit if we're going to stall. */
229 			MPASS(ns.flags != STALLED);
230 			if (prev == STALLED) {
231 				MPASS(total > 0);
232 				counter_u64_add(r->restarts, 1);
233 			}
234 			break;
235 		}
236 
237 		/*
238 		 * The acquire style atomic above guarantees visibility of items
239 		 * associated with any pidx change that we notice here.
240 		 */
241 		pidx = ns.pidx_tail;
242 		pending = 0;
243 	}
244 }
245 #endif
246 
247 int
248 ifmp_ring_alloc(struct ifmp_ring **pr, int size, void *cookie, mp_ring_drain_t drain,
249     mp_ring_can_drain_t can_drain, struct malloc_type *mt, int flags)
250 {
251 	struct ifmp_ring *r;
252 
253 	/* All idx are 16b so size can be 65536 at most */
254 	if (pr == NULL || size < 2 || size > 65536 || drain == NULL ||
255 	    can_drain == NULL)
256 		return (EINVAL);
257 	*pr = NULL;
258 	flags &= M_NOWAIT | M_WAITOK;
259 	MPASS(flags != 0);
260 
261 	r = malloc(__offsetof(struct ifmp_ring, items[size]), mt, flags | M_ZERO);
262 	if (r == NULL)
263 		return (ENOMEM);
264 	r->size = size;
265 	r->cookie = cookie;
266 	r->mt = mt;
267 	r->drain = drain;
268 	r->can_drain = can_drain;
269 	r->enqueues = counter_u64_alloc(flags);
270 	r->drops = counter_u64_alloc(flags);
271 	r->starts = counter_u64_alloc(flags);
272 	r->stalls = counter_u64_alloc(flags);
273 	r->restarts = counter_u64_alloc(flags);
274 	r->abdications = counter_u64_alloc(flags);
275 	if (r->enqueues == NULL || r->drops == NULL || r->starts == NULL ||
276 	    r->stalls == NULL || r->restarts == NULL ||
277 	    r->abdications == NULL) {
278 		ifmp_ring_free(r);
279 		return (ENOMEM);
280 	}
281 
282 	*pr = r;
283 #ifdef MP_RING_NO_64BIT_ATOMICS
284 	mtx_init(&r->lock, "mp_ring lock", NULL, MTX_DEF);
285 #endif
286 	return (0);
287 }
288 
289 void
290 ifmp_ring_free(struct ifmp_ring *r)
291 {
292 
293 	if (r == NULL)
294 		return;
295 
296 	if (r->enqueues != NULL)
297 		counter_u64_free(r->enqueues);
298 	if (r->drops != NULL)
299 		counter_u64_free(r->drops);
300 	if (r->starts != NULL)
301 		counter_u64_free(r->starts);
302 	if (r->stalls != NULL)
303 		counter_u64_free(r->stalls);
304 	if (r->restarts != NULL)
305 		counter_u64_free(r->restarts);
306 	if (r->abdications != NULL)
307 		counter_u64_free(r->abdications);
308 
309 	free(r, r->mt);
310 }
311 
312 /*
313  * Enqueue n items and maybe drain the ring for some time.
314  *
315  * Returns an errno.
316  */
317 #ifdef MP_RING_NO_64BIT_ATOMICS
318 int
319 ifmp_ring_enqueue(struct ifmp_ring *r, void **items, int n, int budget, int abdicate)
320 {
321 	union ring_state os, ns;
322 	uint16_t pidx_start, pidx_stop;
323 	int i;
324 
325 	MPASS(items != NULL);
326 	MPASS(n > 0);
327 
328 	mtx_lock(&r->lock);
329 	/*
330 	 * Reserve room for the new items.  Our reservation, if successful, is
331 	 * from 'pidx_start' to 'pidx_stop'.
332 	 */
333 	os.state = r->state;
334 	if (n >= space_available(r, os)) {
335 		counter_u64_add(r->drops, n);
336 		MPASS(os.flags != IDLE);
337 		mtx_unlock(&r->lock);
338 		if (os.flags == STALLED)
339 			ifmp_ring_check_drainage(r, 0);
340 		return (ENOBUFS);
341 	}
342 	ns.state = os.state;
343 	ns.pidx_head = increment_idx(r, os.pidx_head, n);
344 	r->state = ns.state;
345 	pidx_start = os.pidx_head;
346 	pidx_stop = ns.pidx_head;
347 
348 	/*
349 	 * Wait for other producers who got in ahead of us to enqueue their
350 	 * items, one producer at a time.  It is our turn when the ring's
351 	 * pidx_tail reaches the beginning of our reservation (pidx_start).
352 	 */
353 	while (ns.pidx_tail != pidx_start) {
354 		cpu_spinwait();
355 		ns.state = r->state;
356 	}
357 
358 	/* Now it is our turn to fill up the area we reserved earlier. */
359 	i = pidx_start;
360 	do {
361 		r->items[i] = *items++;
362 		if (__predict_false(++i == r->size))
363 			i = 0;
364 	} while (i != pidx_stop);
365 
366 	/*
367 	 * Update the ring's pidx_tail.  The release style atomic guarantees
368 	 * that the items are visible to any thread that sees the updated pidx.
369 	 */
370 	os.state = ns.state = r->state;
371 	ns.pidx_tail = pidx_stop;
372 	if (abdicate) {
373 		if (os.flags == IDLE)
374 			ns.flags = ABDICATED;
375 	} else
376 		ns.flags = BUSY;
377 	r->state = ns.state;
378 	counter_u64_add(r->enqueues, n);
379 
380 	if (!abdicate) {
381 		/*
382 		 * Turn into a consumer if some other thread isn't active as a consumer
383 		 * already.
384 		 */
385 		if (os.flags != BUSY)
386 			drain_ring_locked(r, ns, os.flags, budget);
387 	}
388 
389 	mtx_unlock(&r->lock);
390 	return (0);
391 }
392 #else
393 int
394 ifmp_ring_enqueue(struct ifmp_ring *r, void **items, int n, int budget, int abdicate)
395 {
396 	union ring_state os, ns;
397 	uint16_t pidx_start, pidx_stop;
398 	int i;
399 
400 	MPASS(items != NULL);
401 	MPASS(n > 0);
402 
403 	/*
404 	 * Reserve room for the new items.  Our reservation, if successful, is
405 	 * from 'pidx_start' to 'pidx_stop'.
406 	 */
407 	os.state = r->state;
408 	for (;;) {
409 		if (n >= space_available(r, os)) {
410 			counter_u64_add(r->drops, n);
411 			MPASS(os.flags != IDLE);
412 			if (os.flags == STALLED)
413 				ifmp_ring_check_drainage(r, 0);
414 			return (ENOBUFS);
415 		}
416 		ns.state = os.state;
417 		ns.pidx_head = increment_idx(r, os.pidx_head, n);
418 		critical_enter();
419 		if (atomic_fcmpset_64(&r->state, &os.state, ns.state))
420 			break;
421 		critical_exit();
422 		cpu_spinwait();
423 	}
424 	pidx_start = os.pidx_head;
425 	pidx_stop = ns.pidx_head;
426 
427 	/*
428 	 * Wait for other producers who got in ahead of us to enqueue their
429 	 * items, one producer at a time.  It is our turn when the ring's
430 	 * pidx_tail reaches the beginning of our reservation (pidx_start).
431 	 */
432 	while (ns.pidx_tail != pidx_start) {
433 		cpu_spinwait();
434 		ns.state = r->state;
435 	}
436 
437 	/* Now it is our turn to fill up the area we reserved earlier. */
438 	i = pidx_start;
439 	do {
440 		r->items[i] = *items++;
441 		if (__predict_false(++i == r->size))
442 			i = 0;
443 	} while (i != pidx_stop);
444 
445 	/*
446 	 * Update the ring's pidx_tail.  The release style atomic guarantees
447 	 * that the items are visible to any thread that sees the updated pidx.
448 	 */
449 	os.state = r->state;
450 	do {
451 		ns.state = os.state;
452 		ns.pidx_tail = pidx_stop;
453 		if (abdicate) {
454 			if (os.flags == IDLE)
455 				ns.flags = ABDICATED;
456 		} else
457 			ns.flags = BUSY;
458 	} while (atomic_fcmpset_rel_64(&r->state, &os.state, ns.state) == 0);
459 	critical_exit();
460 	counter_u64_add(r->enqueues, n);
461 
462 	if (!abdicate) {
463 		/*
464 		 * Turn into a consumer if some other thread isn't active as a consumer
465 		 * already.
466 		 */
467 		if (os.flags != BUSY)
468 			drain_ring_lockless(r, ns, os.flags, budget);
469 	}
470 
471 	return (0);
472 }
473 #endif
474 
475 void
476 ifmp_ring_check_drainage(struct ifmp_ring *r, int budget)
477 {
478 	union ring_state os, ns;
479 
480 	os.state = r->state;
481 	if ((os.flags != STALLED && os.flags != ABDICATED) ||	// Only continue in STALLED and ABDICATED
482 	    os.pidx_head != os.pidx_tail ||			// Require work to be available
483 	    (os.flags != ABDICATED && r->can_drain(r) == 0))	// Can either drain, or everyone left
484 		return;
485 
486 	MPASS(os.cidx != os.pidx_tail);	/* implied by STALLED */
487 	ns.state = os.state;
488 	ns.flags = BUSY;
489 
490 #ifdef MP_RING_NO_64BIT_ATOMICS
491 	mtx_lock(&r->lock);
492 	if (r->state != os.state) {
493 		mtx_unlock(&r->lock);
494 		return;
495 	}
496 	r->state = ns.state;
497 	drain_ring_locked(r, ns, os.flags, budget);
498 	mtx_unlock(&r->lock);
499 #else
500 	/*
501 	 * The acquire style atomic guarantees visibility of items associated
502 	 * with the pidx that we read here.
503 	 */
504 	if (!atomic_cmpset_acq_64(&r->state, os.state, ns.state))
505 		return;
506 
507 	drain_ring_lockless(r, ns, os.flags, budget);
508 #endif
509 }
510 
511 void
512 ifmp_ring_reset_stats(struct ifmp_ring *r)
513 {
514 
515 	counter_u64_zero(r->enqueues);
516 	counter_u64_zero(r->drops);
517 	counter_u64_zero(r->starts);
518 	counter_u64_zero(r->stalls);
519 	counter_u64_zero(r->restarts);
520 	counter_u64_zero(r->abdications);
521 }
522 
523 int
524 ifmp_ring_is_idle(struct ifmp_ring *r)
525 {
526 	union ring_state s;
527 
528 	s.state = r->state;
529 	if (s.pidx_head == s.pidx_tail && s.pidx_tail == s.cidx &&
530 	    s.flags == IDLE)
531 		return (1);
532 
533 	return (0);
534 }
535 
536 int
537 ifmp_ring_is_stalled(struct ifmp_ring *r)
538 {
539 	union ring_state s;
540 
541 	s.state = r->state;
542 	if (s.pidx_head == s.pidx_tail && s.flags == STALLED)
543 		return (1);
544 
545 	return (0);
546 }
547