xref: /freebsd/sys/net/mp_ring.c (revision abd872540f24cfc7dbd1ea29b6918c7082a22108)
1 /*-
2  * Copyright (c) 2014 Chelsio Communications, Inc.
3  * All rights reserved.
4  * Written by: Navdeep Parhar <np@FreeBSD.org>
5  *
6  * Redistribution and use in source and binary forms, with or without
7  * modification, are permitted provided that the following conditions
8  * are met:
9  * 1. Redistributions of source code must retain the above copyright
10  *    notice, this list of conditions and the following disclaimer.
11  * 2. Redistributions in binary form must reproduce the above copyright
12  *    notice, this list of conditions and the following disclaimer in the
13  *    documentation and/or other materials provided with the distribution.
14  *
15  * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
16  * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
17  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
18  * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
19  * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
20  * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
21  * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
22  * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
23  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
24  * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
25  * SUCH DAMAGE.
26  */
27 
28 #include <sys/types.h>
29 #include <sys/param.h>
30 #include <sys/systm.h>
31 #include <sys/counter.h>
32 #include <sys/lock.h>
33 #include <sys/mutex.h>
34 #include <sys/malloc.h>
35 #include <machine/cpu.h>
36 #include <net/mp_ring.h>
37 
/*
 * Complete state of a ring, packed so that all four fields can be read and
 * replaced together through the single 64-bit 'state' word.
 */
union ring_state {
	struct {
		uint16_t pidx_head;	/* advanced when a producer reserves slots */
		uint16_t pidx_tail;	/* advanced when a producer publishes its items */
		uint16_t cidx;		/* next slot the consumer will drain */
		uint16_t flags;		/* IDLE, BUSY, STALLED or ABDICATED */
	};
	uint64_t state;
};

/*
 * Every update of the ring goes through 'state' as one 64-bit quantity, so
 * the four 16-bit fields must overlay it exactly.
 */
_Static_assert(sizeof(union ring_state) == sizeof(uint64_t),
    "union ring_state must be exactly 64 bits wide");
47 
/* Values stored in the 'flags' field of union ring_state. */
enum {
	/* Consumer ran to completion, nothing more to do. */
	IDLE = 0,
	/* Consumer is running already, or will be shortly. */
	BUSY = 1,
	/* Consumer stopped due to lack of resources. */
	STALLED = 2,
	/*
	 * Consumer stopped even though there was work to be done because it
	 * wants another thread to take over.
	 */
	ABDICATED = 3,
};
55 
56 static inline uint16_t
57 space_available(struct ifmp_ring *r, union ring_state s)
58 {
59 	uint16_t x = r->size - 1;
60 
61 	if (s.cidx == s.pidx_head)
62 		return (x);
63 	else if (s.cidx > s.pidx_head)
64 		return (s.cidx - s.pidx_head - 1);
65 	else
66 		return (x - s.pidx_head + s.cidx);
67 }
68 
69 static inline uint16_t
70 increment_idx(struct ifmp_ring *r, uint16_t idx, uint16_t n)
71 {
72 	int x = r->size - idx;
73 
74 	MPASS(x > 0);
75 	return (x > n ? idx + n : n - x);
76 }
77 
78 /* Consumer is about to update the ring's state to s */
79 static inline uint16_t
80 state_to_flags(union ring_state s, int abdicate)
81 {
82 
83 	if (s.cidx == s.pidx_tail)
84 		return (IDLE);
85 	else if (abdicate && s.pidx_tail != s.pidx_head)
86 		return (ABDICATED);
87 
88 	return (BUSY);
89 }
90 
91 #ifdef MP_RING_NO_64BIT_ATOMICS
92 static void
93 drain_ring_locked(struct ifmp_ring *r, union ring_state os, uint16_t prev, int budget)
94 {
95 	union ring_state ns;
96 	int n, pending, total;
97 	uint16_t cidx = os.cidx;
98 	uint16_t pidx = os.pidx_tail;
99 
100 	MPASS(os.flags == BUSY);
101 	MPASS(cidx != pidx);
102 
103 	if (prev == IDLE)
104 		counter_u64_add(r->starts, 1);
105 	pending = 0;
106 	total = 0;
107 
108 	while (cidx != pidx) {
109 		/* Items from cidx to pidx are available for consumption. */
110 		n = r->drain(r, cidx, pidx);
111 		if (n == 0) {
112 			os.state = ns.state = r->state;
113 			ns.cidx = cidx;
114 			ns.flags = STALLED;
115 			r->state = ns.state;
116 			if (prev != STALLED)
117 				counter_u64_add(r->stalls, 1);
118 			else if (total > 0) {
119 				counter_u64_add(r->restarts, 1);
120 				counter_u64_add(r->stalls, 1);
121 			}
122 			break;
123 		}
124 		cidx = increment_idx(r, cidx, n);
125 		pending += n;
126 		total += n;
127 
128 		/*
129 		 * We update the cidx only if we've caught up with the pidx, the
130 		 * real cidx is getting too far ahead of the one visible to
131 		 * everyone else, or we have exceeded our budget.
132 		 */
133 		if (cidx != pidx && pending < 64 && total < budget)
134 			continue;
135 
136 		os.state = ns.state = r->state;
137 		ns.cidx = cidx;
138 		ns.flags = state_to_flags(ns, total >= budget);
139 		r->state = ns.state;
140 
141 		if (ns.flags == ABDICATED)
142 			counter_u64_add(r->abdications, 1);
143 		if (ns.flags != BUSY) {
144 			/* Wrong loop exit if we're going to stall. */
145 			MPASS(ns.flags != STALLED);
146 			if (prev == STALLED) {
147 				MPASS(total > 0);
148 				counter_u64_add(r->restarts, 1);
149 			}
150 			break;
151 		}
152 
153 		/*
154 		 * The acquire style atomic above guarantees visibility of items
155 		 * associated with any pidx change that we notice here.
156 		 */
157 		pidx = ns.pidx_tail;
158 		pending = 0;
159 	}
160 }
161 #else
162 /*
163  * Caller passes in a state, with a guarantee that there is work to do and that
164  * all items up to the pidx_tail in the state are visible.
165  */
166 static void
167 drain_ring_lockless(struct ifmp_ring *r, union ring_state os, uint16_t prev, int budget)
168 {
169 	union ring_state ns;
170 	int n, pending, total;
171 	uint16_t cidx = os.cidx;
172 	uint16_t pidx = os.pidx_tail;
173 
174 	MPASS(os.flags == BUSY);
175 	MPASS(cidx != pidx);
176 
177 	if (prev == IDLE)
178 		counter_u64_add(r->starts, 1);
179 	pending = 0;
180 	total = 0;
181 
182 	while (cidx != pidx) {
183 		/* Items from cidx to pidx are available for consumption. */
184 		n = r->drain(r, cidx, pidx);
185 		if (n == 0) {
186 			critical_enter();
187 			os.state = r->state;
188 			do {
189 				ns.state = os.state;
190 				ns.cidx = cidx;
191 				ns.flags = STALLED;
192 			} while (atomic_fcmpset_64(&r->state, &os.state,
193 			    ns.state) == 0);
194 			critical_exit();
195 			if (prev != STALLED)
196 				counter_u64_add(r->stalls, 1);
197 			else if (total > 0) {
198 				counter_u64_add(r->restarts, 1);
199 				counter_u64_add(r->stalls, 1);
200 			}
201 			break;
202 		}
203 		cidx = increment_idx(r, cidx, n);
204 		pending += n;
205 		total += n;
206 
207 		/*
208 		 * We update the cidx only if we've caught up with the pidx, the
209 		 * real cidx is getting too far ahead of the one visible to
210 		 * everyone else, or we have exceeded our budget.
211 		 */
212 		if (cidx != pidx && pending < 64 && total < budget)
213 			continue;
214 		critical_enter();
215 		os.state = r->state;
216 		do {
217 			ns.state = os.state;
218 			ns.cidx = cidx;
219 			ns.flags = state_to_flags(ns, total >= budget);
220 		} while (atomic_fcmpset_acq_64(&r->state, &os.state,
221 		    ns.state) == 0);
222 		critical_exit();
223 
224 		if (ns.flags == ABDICATED)
225 			counter_u64_add(r->abdications, 1);
226 		if (ns.flags != BUSY) {
227 			/* Wrong loop exit if we're going to stall. */
228 			MPASS(ns.flags != STALLED);
229 			if (prev == STALLED) {
230 				MPASS(total > 0);
231 				counter_u64_add(r->restarts, 1);
232 			}
233 			break;
234 		}
235 
236 		/*
237 		 * The acquire style atomic above guarantees visibility of items
238 		 * associated with any pidx change that we notice here.
239 		 */
240 		pidx = ns.pidx_tail;
241 		pending = 0;
242 	}
243 }
244 #endif
245 
246 int
247 ifmp_ring_alloc(struct ifmp_ring **pr, int size, void *cookie, mp_ring_drain_t drain,
248     mp_ring_can_drain_t can_drain, struct malloc_type *mt, int flags)
249 {
250 	struct ifmp_ring *r;
251 
252 	/* All idx are 16b so size can be 65536 at most */
253 	if (pr == NULL || size < 2 || size > 65536 || drain == NULL ||
254 	    can_drain == NULL)
255 		return (EINVAL);
256 	*pr = NULL;
257 	flags &= M_NOWAIT | M_WAITOK;
258 	MPASS(flags != 0);
259 
260 	r = malloc(__offsetof(struct ifmp_ring, items[size]), mt, flags | M_ZERO);
261 	if (r == NULL)
262 		return (ENOMEM);
263 	r->size = size;
264 	r->cookie = cookie;
265 	r->mt = mt;
266 	r->drain = drain;
267 	r->can_drain = can_drain;
268 	r->enqueues = counter_u64_alloc(flags);
269 	r->drops = counter_u64_alloc(flags);
270 	r->starts = counter_u64_alloc(flags);
271 	r->stalls = counter_u64_alloc(flags);
272 	r->restarts = counter_u64_alloc(flags);
273 	r->abdications = counter_u64_alloc(flags);
274 	if (r->enqueues == NULL || r->drops == NULL || r->starts == NULL ||
275 	    r->stalls == NULL || r->restarts == NULL ||
276 	    r->abdications == NULL) {
277 		ifmp_ring_free(r);
278 		return (ENOMEM);
279 	}
280 
281 	*pr = r;
282 #ifdef MP_RING_NO_64BIT_ATOMICS
283 	mtx_init(&r->lock, "mp_ring lock", NULL, MTX_DEF);
284 #endif
285 	return (0);
286 }
287 
288 void
289 ifmp_ring_free(struct ifmp_ring *r)
290 {
291 
292 	if (r == NULL)
293 		return;
294 
295 	if (r->enqueues != NULL)
296 		counter_u64_free(r->enqueues);
297 	if (r->drops != NULL)
298 		counter_u64_free(r->drops);
299 	if (r->starts != NULL)
300 		counter_u64_free(r->starts);
301 	if (r->stalls != NULL)
302 		counter_u64_free(r->stalls);
303 	if (r->restarts != NULL)
304 		counter_u64_free(r->restarts);
305 	if (r->abdications != NULL)
306 		counter_u64_free(r->abdications);
307 
308 	free(r, r->mt);
309 }
310 
311 /*
312  * Enqueue n items and maybe drain the ring for some time.
313  *
314  * Returns an errno.
315  */
316 #ifdef MP_RING_NO_64BIT_ATOMICS
317 int
318 ifmp_ring_enqueue(struct ifmp_ring *r, void **items, int n, int budget, int abdicate)
319 {
320 	union ring_state os, ns;
321 	uint16_t pidx_start, pidx_stop;
322 	int i;
323 
324 	MPASS(items != NULL);
325 	MPASS(n > 0);
326 
327 	mtx_lock(&r->lock);
328 	/*
329 	 * Reserve room for the new items.  Our reservation, if successful, is
330 	 * from 'pidx_start' to 'pidx_stop'.
331 	 */
332 	os.state = r->state;
333 	if (n >= space_available(r, os)) {
334 		counter_u64_add(r->drops, n);
335 		MPASS(os.flags != IDLE);
336 		mtx_unlock(&r->lock);
337 		if (os.flags == STALLED)
338 			ifmp_ring_check_drainage(r, 0);
339 		return (ENOBUFS);
340 	}
341 	ns.state = os.state;
342 	ns.pidx_head = increment_idx(r, os.pidx_head, n);
343 	r->state = ns.state;
344 	pidx_start = os.pidx_head;
345 	pidx_stop = ns.pidx_head;
346 
347 	/*
348 	 * Wait for other producers who got in ahead of us to enqueue their
349 	 * items, one producer at a time.  It is our turn when the ring's
350 	 * pidx_tail reaches the beginning of our reservation (pidx_start).
351 	 */
352 	while (ns.pidx_tail != pidx_start) {
353 		cpu_spinwait();
354 		ns.state = r->state;
355 	}
356 
357 	/* Now it is our turn to fill up the area we reserved earlier. */
358 	i = pidx_start;
359 	do {
360 		r->items[i] = *items++;
361 		if (__predict_false(++i == r->size))
362 			i = 0;
363 	} while (i != pidx_stop);
364 
365 	/*
366 	 * Update the ring's pidx_tail.  The release style atomic guarantees
367 	 * that the items are visible to any thread that sees the updated pidx.
368 	 */
369 	os.state = ns.state = r->state;
370 	ns.pidx_tail = pidx_stop;
371 	if (abdicate) {
372 		if (os.flags == IDLE)
373 			ns.flags = ABDICATED;
374 	} else
375 		ns.flags = BUSY;
376 	r->state = ns.state;
377 	counter_u64_add(r->enqueues, n);
378 
379 	if (!abdicate) {
380 		/*
381 		 * Turn into a consumer if some other thread isn't active as a consumer
382 		 * already.
383 		 */
384 		if (os.flags != BUSY)
385 			drain_ring_locked(r, ns, os.flags, budget);
386 	}
387 
388 	mtx_unlock(&r->lock);
389 	return (0);
390 }
391 #else
392 int
393 ifmp_ring_enqueue(struct ifmp_ring *r, void **items, int n, int budget, int abdicate)
394 {
395 	union ring_state os, ns;
396 	uint16_t pidx_start, pidx_stop;
397 	int i;
398 
399 	MPASS(items != NULL);
400 	MPASS(n > 0);
401 
402 	/*
403 	 * Reserve room for the new items.  Our reservation, if successful, is
404 	 * from 'pidx_start' to 'pidx_stop'.
405 	 */
406 	os.state = r->state;
407 	for (;;) {
408 		if (n >= space_available(r, os)) {
409 			counter_u64_add(r->drops, n);
410 			MPASS(os.flags != IDLE);
411 			if (os.flags == STALLED)
412 				ifmp_ring_check_drainage(r, 0);
413 			return (ENOBUFS);
414 		}
415 		ns.state = os.state;
416 		ns.pidx_head = increment_idx(r, os.pidx_head, n);
417 		critical_enter();
418 		if (atomic_fcmpset_64(&r->state, &os.state, ns.state))
419 			break;
420 		critical_exit();
421 		cpu_spinwait();
422 	}
423 	pidx_start = os.pidx_head;
424 	pidx_stop = ns.pidx_head;
425 
426 	/*
427 	 * Wait for other producers who got in ahead of us to enqueue their
428 	 * items, one producer at a time.  It is our turn when the ring's
429 	 * pidx_tail reaches the beginning of our reservation (pidx_start).
430 	 */
431 	while (ns.pidx_tail != pidx_start) {
432 		cpu_spinwait();
433 		ns.state = r->state;
434 	}
435 
436 	/* Now it is our turn to fill up the area we reserved earlier. */
437 	i = pidx_start;
438 	do {
439 		r->items[i] = *items++;
440 		if (__predict_false(++i == r->size))
441 			i = 0;
442 	} while (i != pidx_stop);
443 
444 	/*
445 	 * Update the ring's pidx_tail.  The release style atomic guarantees
446 	 * that the items are visible to any thread that sees the updated pidx.
447 	 */
448 	os.state = r->state;
449 	do {
450 		ns.state = os.state;
451 		ns.pidx_tail = pidx_stop;
452 		if (abdicate) {
453 			if (os.flags == IDLE)
454 				ns.flags = ABDICATED;
455 		} else
456 			ns.flags = BUSY;
457 	} while (atomic_fcmpset_rel_64(&r->state, &os.state, ns.state) == 0);
458 	critical_exit();
459 	counter_u64_add(r->enqueues, n);
460 
461 	if (!abdicate) {
462 		/*
463 		 * Turn into a consumer if some other thread isn't active as a consumer
464 		 * already.
465 		 */
466 		if (os.flags != BUSY)
467 			drain_ring_lockless(r, ns, os.flags, budget);
468 	}
469 
470 	return (0);
471 }
472 #endif
473 
474 void
475 ifmp_ring_check_drainage(struct ifmp_ring *r, int budget)
476 {
477 	union ring_state os, ns;
478 
479 	os.state = r->state;
480 	if ((os.flags != STALLED && os.flags != ABDICATED) ||	// Only continue in STALLED and ABDICATED
481 	    os.pidx_head != os.pidx_tail ||			// Require work to be available
482 	    (os.flags != ABDICATED && r->can_drain(r) == 0))	// Can either drain, or everyone left
483 		return;
484 
485 	MPASS(os.cidx != os.pidx_tail);	/* implied by STALLED */
486 	ns.state = os.state;
487 	ns.flags = BUSY;
488 
489 #ifdef MP_RING_NO_64BIT_ATOMICS
490 	mtx_lock(&r->lock);
491 	if (r->state != os.state) {
492 		mtx_unlock(&r->lock);
493 		return;
494 	}
495 	r->state = ns.state;
496 	drain_ring_locked(r, ns, os.flags, budget);
497 	mtx_unlock(&r->lock);
498 #else
499 	/*
500 	 * The acquire style atomic guarantees visibility of items associated
501 	 * with the pidx that we read here.
502 	 */
503 	if (!atomic_cmpset_acq_64(&r->state, os.state, ns.state))
504 		return;
505 
506 	drain_ring_lockless(r, ns, os.flags, budget);
507 #endif
508 }
509 
510 void
511 ifmp_ring_reset_stats(struct ifmp_ring *r)
512 {
513 
514 	counter_u64_zero(r->enqueues);
515 	counter_u64_zero(r->drops);
516 	counter_u64_zero(r->starts);
517 	counter_u64_zero(r->stalls);
518 	counter_u64_zero(r->restarts);
519 	counter_u64_zero(r->abdications);
520 }
521 
522 int
523 ifmp_ring_is_idle(struct ifmp_ring *r)
524 {
525 	union ring_state s;
526 
527 	s.state = r->state;
528 	if (s.pidx_head == s.pidx_tail && s.pidx_tail == s.cidx &&
529 	    s.flags == IDLE)
530 		return (1);
531 
532 	return (0);
533 }
534 
535 int
536 ifmp_ring_is_stalled(struct ifmp_ring *r)
537 {
538 	union ring_state s;
539 
540 	s.state = r->state;
541 	if (s.pidx_head == s.pidx_tail && s.flags == STALLED)
542 		return (1);
543 
544 	return (0);
545 }
546