/* /freebsd/sys/dev/cxgbe/t4_mp_ring.c (revision a812392203d7c4c3f0db9d8a0f3391374c49c71f) */
/*-
 * Copyright (c) 2014 Chelsio Communications, Inc.
 * All rights reserved.
 * Written by: Navdeep Parhar <np@FreeBSD.org>
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
 * SUCH DAMAGE.
 */

#include <sys/cdefs.h>
__FBSDID("$FreeBSD$");

#include <sys/types.h>
#include <sys/param.h>
#include <sys/systm.h>
#include <sys/counter.h>
#include <sys/lock.h>
#include <sys/malloc.h>
#include <machine/cpu.h>

#include "t4_mp_ring.h"

union ring_state {
	struct {
		uint16_t pidx_head;
		uint16_t pidx_tail;
		uint16_t cidx;
		uint16_t flags;
	};
	uint64_t state;
};
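
/*
 * The four 16-bit fields above are deliberately overlaid on a single 64-bit
 * word so that the entire ring state (producer head and tail, consumer index,
 * and flags) can be read and updated with one 64-bit compare-and-set, without
 * taking a lock.  pidx_head is where the next producer reservation will
 * start; pidx_tail trails it and marks the last slot that has actually been
 * filled and published to the consumer.
 */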

enum {
	IDLE = 0,	/* consumer ran to completion, nothing more to do. */
	BUSY,		/* consumer is running already, or will be shortly. */
	STALLED,	/* consumer stopped due to lack of resources. */
	ABDICATED,	/* consumer stopped even though there was work to be
			   done because it wants another thread to take over. */
};

static inline uint16_t
space_available(struct mp_ring *r, union ring_state s)
{
	uint16_t x = r->size - 1;

	if (s.cidx == s.pidx_head)
		return (x);
	else if (s.cidx > s.pidx_head)
		return (s.cidx - s.pidx_head - 1);
	else
		return (x - s.pidx_head + s.cidx);
}
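
/*
 * Worked example of the arithmetic above for a ring with size = 8 (the
 * function never reports more than size - 1 = 7 free slots; one slot is kept
 * unused so that a full ring can be told apart from an empty one):
 *
 *   cidx == pidx_head (ring empty)  ->  7 slots available.
 *   cidx = 6, pidx_head = 2         ->  6 - 2 - 1 = 3 slots available.
 *   cidx = 2, pidx_head = 6         ->  7 - 6 + 2 = 3 slots available.
 */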

static inline uint16_t
increment_idx(struct mp_ring *r, uint16_t idx, uint16_t n)
{
	int x = r->size - idx;

	MPASS(x > 0);
	return (x > n ? idx + n : n - x);
}
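
/*
 * increment_idx() is "(idx + n) % size" done with a compare instead of an
 * integer division; the result is correct as long as n does not exceed the
 * ring size, which the callers here guarantee.  With size = 8: idx = 3,
 * n = 2 gives 5 (no wrap, since 8 - 3 = 5 > 2), while idx = 6, n = 4 gives
 * 4 - (8 - 6) = 2, i.e. (6 + 4) mod 8.
 */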

/* Consumer is about to update the ring's state to s */
static inline uint16_t
state_to_flags(union ring_state s, int abdicate)
{

	if (s.cidx == s.pidx_tail)
		return (IDLE);
	else if (abdicate && s.pidx_tail != s.pidx_head)
		return (ABDICATED);

	return (BUSY);
}

/*
 * Caller passes in a state, with a guarantee that there is work to do and that
 * all items up to the pidx_tail in the state are visible.
 */
static void
drain_ring(struct mp_ring *r, union ring_state os, uint16_t prev, int budget)
{
	union ring_state ns;
	int n, pending, total;
	uint16_t cidx = os.cidx;
	uint16_t pidx = os.pidx_tail;

	MPASS(os.flags == BUSY);
	MPASS(cidx != pidx);

	if (prev == IDLE)
		counter_u64_add(r->starts, 1);
	pending = 0;
	total = 0;

	while (cidx != pidx) {

		/* Items from cidx to pidx are available for consumption. */
		n = r->drain(r, cidx, pidx);
		if (n == 0) {
			critical_enter();
			do {
				os.state = ns.state = r->state;
				ns.cidx = cidx;
				ns.flags = STALLED;
			} while (atomic_cmpset_64(&r->state, os.state,
			    ns.state) == 0);
			critical_exit();
			if (prev != STALLED)
				counter_u64_add(r->stalls, 1);
			else if (total > 0) {
				counter_u64_add(r->restarts, 1);
				counter_u64_add(r->stalls, 1);
			}
			break;
		}
		cidx = increment_idx(r, cidx, n);
		pending += n;
		total += n;

		/*
		 * We update the cidx only if we've caught up with the pidx, the
		 * real cidx is getting too far ahead of the one visible to
		 * everyone else, or we have exceeded our budget.
		 */
		if (cidx != pidx && pending < 64 && total < budget)
			continue;
		critical_enter();
		do {
			os.state = ns.state = r->state;
			ns.cidx = cidx;
			ns.flags = state_to_flags(ns, total >= budget);
		} while (atomic_cmpset_acq_64(&r->state, os.state, ns.state) == 0);
		critical_exit();

		if (ns.flags == ABDICATED)
			counter_u64_add(r->abdications, 1);
		if (ns.flags != BUSY) {
			/* Wrong loop exit if we're going to stall. */
			MPASS(ns.flags != STALLED);
			if (prev == STALLED) {
				MPASS(total > 0);
				counter_u64_add(r->restarts, 1);
			}
			break;
		}

		/*
		 * The acquire style atomic above guarantees visibility of items
		 * associated with any pidx change that we notice here.
		 */
		pidx = ns.pidx_tail;
		pending = 0;
	}
}
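
/*
 * Note on the drain callback used by the loop above: it is handed the
 * half-open range [cidx, pidx) of r->items[] and returns how many items it
 * consumed, starting at cidx.  Returning 0 means the consumer is out of
 * resources and moves the ring to STALLED.  A minimal sketch of such a
 * callback (hypothetical names, not part of this driver; the real prototype
 * is ring_drain_t in t4_mp_ring.h) might look like:
 *
 *	static u_int
 *	example_drain(struct mp_ring *r, u_int cidx, u_int pidx)
 *	{
 *		struct example_softc *sc = r->cookie;
 *		u_int n = 0;
 *
 *		while (cidx != pidx && example_hw_has_room(sc)) {
 *			example_hw_send(sc, r->items[cidx]);
 *			if (++cidx == r->size)
 *				cidx = 0;
 *			n++;
 *		}
 *		return (n);
 *	}
 */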

int
mp_ring_alloc(struct mp_ring **pr, int size, void *cookie, ring_drain_t drain,
    ring_can_drain_t can_drain, struct malloc_type *mt, int flags)
{
	struct mp_ring *r;

	/* All idx are 16b so size can be 65536 at most */
	if (pr == NULL || size < 2 || size > 65536 || drain == NULL ||
	    can_drain == NULL)
		return (EINVAL);
	*pr = NULL;
	flags &= M_NOWAIT | M_WAITOK;
	MPASS(flags != 0);

	r = malloc(__offsetof(struct mp_ring, items[size]), mt, flags | M_ZERO);
	if (r == NULL)
		return (ENOMEM);
	r->size = size;
	r->cookie = cookie;
	r->mt = mt;
	r->drain = drain;
	r->can_drain = can_drain;
	r->enqueues = counter_u64_alloc(flags);
	r->drops = counter_u64_alloc(flags);
	r->starts = counter_u64_alloc(flags);
	r->stalls = counter_u64_alloc(flags);
	r->restarts = counter_u64_alloc(flags);
	r->abdications = counter_u64_alloc(flags);
	if (r->enqueues == NULL || r->drops == NULL || r->starts == NULL ||
	    r->stalls == NULL || r->restarts == NULL ||
	    r->abdications == NULL) {
		mp_ring_free(r);
		return (ENOMEM);
	}

	*pr = r;
	return (0);
}
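
/*
 * A typical user allocates one ring per transmit queue and registers its
 * drain/can_drain handlers at that point.  Illustrative call only (the
 * callback and queue names are made up; see the callers elsewhere in the
 * cxgbe driver for the real usage):
 *
 *	rc = mp_ring_alloc(&txq->ring, ndesc, txq, example_drain,
 *	    example_can_drain, M_DEVBUF, M_WAITOK);
 *	if (rc != 0)
 *		goto fail;
 */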

void
mp_ring_free(struct mp_ring *r)
{

	if (r == NULL)
		return;

	if (r->enqueues != NULL)
		counter_u64_free(r->enqueues);
	if (r->drops != NULL)
		counter_u64_free(r->drops);
	if (r->starts != NULL)
		counter_u64_free(r->starts);
	if (r->stalls != NULL)
		counter_u64_free(r->stalls);
	if (r->restarts != NULL)
		counter_u64_free(r->restarts);
	if (r->abdications != NULL)
		counter_u64_free(r->abdications);

	free(r, r->mt);
}

/*
 * Enqueue n items and maybe drain the ring for some time.
 *
 * Returns an errno.
 */
int
mp_ring_enqueue(struct mp_ring *r, void **items, int n, int budget)
{
	union ring_state os, ns;
	uint16_t pidx_start, pidx_stop;
	int i;

	MPASS(items != NULL);
	MPASS(n > 0);

	/*
	 * Reserve room for the new items.  Our reservation, if successful, is
	 * from 'pidx_start' to 'pidx_stop'.
	 */
	for (;;) {
		os.state = r->state;
		if (n >= space_available(r, os)) {
			counter_u64_add(r->drops, n);
			MPASS(os.flags != IDLE);
			if (os.flags == STALLED)
				mp_ring_check_drainage(r, 0);
			return (ENOBUFS);
		}
		ns.state = os.state;
		ns.pidx_head = increment_idx(r, os.pidx_head, n);
		critical_enter();
		if (atomic_cmpset_64(&r->state, os.state, ns.state))
			break;
		critical_exit();
		cpu_spinwait();
	}
	pidx_start = os.pidx_head;
	pidx_stop = ns.pidx_head;

	/*
	 * Wait for other producers who got in ahead of us to enqueue their
	 * items, one producer at a time.  It is our turn when the ring's
	 * pidx_tail reaches the beginning of our reservation (pidx_start).
	 */
	while (ns.pidx_tail != pidx_start) {
		cpu_spinwait();
		ns.state = r->state;
	}

	/* Now it is our turn to fill up the area we reserved earlier. */
	i = pidx_start;
	do {
		r->items[i] = *items++;
		if (__predict_false(++i == r->size))
			i = 0;
	} while (i != pidx_stop);

	/*
	 * Update the ring's pidx_tail.  The release style atomic guarantees
	 * that the items are visible to any thread that sees the updated pidx.
	 */
	do {
		os.state = ns.state = r->state;
		ns.pidx_tail = pidx_stop;
		ns.flags = BUSY;
	} while (atomic_cmpset_rel_64(&r->state, os.state, ns.state) == 0);
	critical_exit();
	counter_u64_add(r->enqueues, n);

	/*
	 * Turn into a consumer if some other thread isn't active as a consumer
	 * already.
	 */
	if (os.flags != BUSY)
		drain_ring(r, ns, os.flags, budget);

	return (0);
}
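
/*
 * Illustrative producer-side usage of the function above (hypothetical names
 * and budget value; a sketch, not the driver's actual transmit path).  If
 * ENOBUFS is returned the items were never placed in the ring and the caller
 * still owns them, so a transmit path would typically free the mbuf:
 *
 *	rc = mp_ring_enqueue(txq->ring, (void **)&m, 1, 4096);
 *	if (__predict_false(rc != 0))
 *		m_freem(m);
 *	return (rc);
 */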

void
mp_ring_check_drainage(struct mp_ring *r, int budget)
{
	union ring_state os, ns;

	os.state = r->state;
	if (os.flags != STALLED || os.pidx_head != os.pidx_tail ||
	    r->can_drain(r) == 0)
		return;

	MPASS(os.cidx != os.pidx_tail);	/* implied by STALLED */
	ns.state = os.state;
	ns.flags = BUSY;

	/*
	 * The acquire style atomic guarantees visibility of items associated
	 * with the pidx that we read here.
	 */
	if (!atomic_cmpset_acq_64(&r->state, os.state, ns.state))
		return;

	drain_ring(r, ns, os.flags, budget);
}
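
/*
 * This is the hook for restarting a STALLED ring: the drain callback
 * returned 0 because some resource ran out, so whatever code replenishes
 * that resource (for example, the path that reclaims hardware descriptors)
 * is expected to call mp_ring_check_drainage() once progress is possible
 * again, at which point can_drain() above decides whether to resume.
 */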

void
mp_ring_reset_stats(struct mp_ring *r)
{

	counter_u64_zero(r->enqueues);
	counter_u64_zero(r->drops);
	counter_u64_zero(r->starts);
	counter_u64_zero(r->stalls);
	counter_u64_zero(r->restarts);
	counter_u64_zero(r->abdications);
}

int
mp_ring_is_idle(struct mp_ring *r)
{
	union ring_state s;

	s.state = r->state;
	if (s.pidx_head == s.pidx_tail && s.pidx_tail == s.cidx &&
	    s.flags == IDLE)
		return (1);

	return (0);
}