xref: /titanic_52/usr/src/lib/mergeq/workq.c (revision 7fd791373689a6af05e27efec3b1ab556e02aa23)
1*7fd79137SRobert Mustacchi /*
2*7fd79137SRobert Mustacchi  * This file and its contents are supplied under the terms of the
3*7fd79137SRobert Mustacchi  * Common Development and Distribution License ("CDDL"), version 1.0.
4*7fd79137SRobert Mustacchi  * You may only use this file in accordance with the terms of version
5*7fd79137SRobert Mustacchi  * 1.0 of the CDDL.
6*7fd79137SRobert Mustacchi  *
7*7fd79137SRobert Mustacchi  * A full copy of the text of the CDDL should have accompanied this
8*7fd79137SRobert Mustacchi  * source.  A copy of the CDDL is also available via the Internet at
9*7fd79137SRobert Mustacchi  * http://www.illumos.org/license/CDDL.
10*7fd79137SRobert Mustacchi  */
11*7fd79137SRobert Mustacchi 
12*7fd79137SRobert Mustacchi /*
13*7fd79137SRobert Mustacchi  * Copyright 2015 Joyent, Inc.
14*7fd79137SRobert Mustacchi  */
15*7fd79137SRobert Mustacchi 
16*7fd79137SRobert Mustacchi /*
17*7fd79137SRobert Mustacchi  * Work queue
18*7fd79137SRobert Mustacchi  *
19*7fd79137SRobert Mustacchi  * A multi-threaded work queue.
20*7fd79137SRobert Mustacchi  *
21*7fd79137SRobert Mustacchi  * The general design of this is to add a fixed number of items to the queue and
22*7fd79137SRobert Mustacchi  * then drain them with the specified number of threads.
23*7fd79137SRobert Mustacchi  */
24*7fd79137SRobert Mustacchi 
25*7fd79137SRobert Mustacchi #include <strings.h>
26*7fd79137SRobert Mustacchi #include <sys/debug.h>
27*7fd79137SRobert Mustacchi #include <thread.h>
28*7fd79137SRobert Mustacchi #include <synch.h>
29*7fd79137SRobert Mustacchi #include <errno.h>
30*7fd79137SRobert Mustacchi #include <limits.h>
31*7fd79137SRobert Mustacchi #include <stdlib.h>
32*7fd79137SRobert Mustacchi 
33*7fd79137SRobert Mustacchi #include "workq.h"
34*7fd79137SRobert Mustacchi 
35*7fd79137SRobert Mustacchi struct workq {
36*7fd79137SRobert Mustacchi 	mutex_t wq_lock;	/* Protects below items */
37*7fd79137SRobert Mustacchi 	cond_t wq_cond;		/* Condition variable */
38*7fd79137SRobert Mustacchi 	void **wq_items;	/* Array of items to process */
39*7fd79137SRobert Mustacchi 	size_t wq_nitems;	/* Number of items in queue */
40*7fd79137SRobert Mustacchi 	size_t wq_cap;		/* Queue capacity */
41*7fd79137SRobert Mustacchi 	size_t wq_next;		/* Next item to process */
42*7fd79137SRobert Mustacchi 	uint_t wq_ndthreads;	/* Desired number of threads */
43*7fd79137SRobert Mustacchi 	thread_t *wq_thrs;	/* Actual threads */
44*7fd79137SRobert Mustacchi 	workq_proc_f *wq_func;	/* Processing function */
45*7fd79137SRobert Mustacchi 	void *wq_arg;		/* Argument for processing */
46*7fd79137SRobert Mustacchi 	boolean_t wq_working;	/* Are we actively using it? */
47*7fd79137SRobert Mustacchi 	boolean_t wq_iserror;	/* Have we encountered an error? */
48*7fd79137SRobert Mustacchi 	int wq_error;		/* Error value, if any */
49*7fd79137SRobert Mustacchi };
50*7fd79137SRobert Mustacchi 
51*7fd79137SRobert Mustacchi #define	WORKQ_DEFAULT_CAP	64
52*7fd79137SRobert Mustacchi 
53*7fd79137SRobert Mustacchi static int
54*7fd79137SRobert Mustacchi workq_error(int err)
55*7fd79137SRobert Mustacchi {
56*7fd79137SRobert Mustacchi 	VERIFY(err != 0);
57*7fd79137SRobert Mustacchi 	errno = err;
58*7fd79137SRobert Mustacchi 	return (WORKQ_ERROR);
59*7fd79137SRobert Mustacchi }
60*7fd79137SRobert Mustacchi 
61*7fd79137SRobert Mustacchi void
62*7fd79137SRobert Mustacchi workq_fini(workq_t *wqp)
63*7fd79137SRobert Mustacchi {
64*7fd79137SRobert Mustacchi 	if (wqp == NULL)
65*7fd79137SRobert Mustacchi 		return;
66*7fd79137SRobert Mustacchi 
67*7fd79137SRobert Mustacchi 	VERIFY(wqp->wq_working != B_TRUE);
68*7fd79137SRobert Mustacchi 	VERIFY0(mutex_destroy(&wqp->wq_lock));
69*7fd79137SRobert Mustacchi 	VERIFY0(cond_destroy(&wqp->wq_cond));
70*7fd79137SRobert Mustacchi 	if (wqp->wq_cap > 0)
71*7fd79137SRobert Mustacchi 		workq_free(wqp->wq_items, sizeof (void *) * wqp->wq_cap);
72*7fd79137SRobert Mustacchi 	if (wqp->wq_ndthreads > 0)
73*7fd79137SRobert Mustacchi 		workq_free(wqp->wq_thrs, sizeof (thread_t) * wqp->wq_ndthreads);
74*7fd79137SRobert Mustacchi 	workq_free(wqp, sizeof (workq_t));
75*7fd79137SRobert Mustacchi }
76*7fd79137SRobert Mustacchi 
77*7fd79137SRobert Mustacchi int
78*7fd79137SRobert Mustacchi workq_init(workq_t **outp, uint_t nthrs)
79*7fd79137SRobert Mustacchi {
80*7fd79137SRobert Mustacchi 	int ret;
81*7fd79137SRobert Mustacchi 	workq_t *wqp;
82*7fd79137SRobert Mustacchi 
83*7fd79137SRobert Mustacchi 	wqp = workq_alloc(sizeof (workq_t));
84*7fd79137SRobert Mustacchi 	if (wqp == NULL)
85*7fd79137SRobert Mustacchi 		return (workq_error(ENOMEM));
86*7fd79137SRobert Mustacchi 
87*7fd79137SRobert Mustacchi 	bzero(wqp, sizeof (workq_t));
88*7fd79137SRobert Mustacchi 	wqp->wq_items = workq_alloc(sizeof (void *) * WORKQ_DEFAULT_CAP);
89*7fd79137SRobert Mustacchi 	if (wqp->wq_items == NULL) {
90*7fd79137SRobert Mustacchi 		workq_free(wqp, sizeof (workq_t));
91*7fd79137SRobert Mustacchi 		return (workq_error(ENOMEM));
92*7fd79137SRobert Mustacchi 	}
93*7fd79137SRobert Mustacchi 	bzero(wqp->wq_items, sizeof (void *) * WORKQ_DEFAULT_CAP);
94*7fd79137SRobert Mustacchi 
95*7fd79137SRobert Mustacchi 	wqp->wq_ndthreads = nthrs - 1;
96*7fd79137SRobert Mustacchi 	if (wqp->wq_ndthreads > 0) {
97*7fd79137SRobert Mustacchi 		wqp->wq_thrs = workq_alloc(sizeof (thread_t) *
98*7fd79137SRobert Mustacchi 		    wqp->wq_ndthreads);
99*7fd79137SRobert Mustacchi 		if (wqp->wq_thrs == NULL) {
100*7fd79137SRobert Mustacchi 			workq_free(wqp->wq_items, sizeof (void *) *
101*7fd79137SRobert Mustacchi 			    WORKQ_DEFAULT_CAP);
102*7fd79137SRobert Mustacchi 			workq_free(wqp, sizeof (workq_t));
103*7fd79137SRobert Mustacchi 			return (workq_error(ENOMEM));
104*7fd79137SRobert Mustacchi 		}
105*7fd79137SRobert Mustacchi 	}
106*7fd79137SRobert Mustacchi 
107*7fd79137SRobert Mustacchi 	if ((ret = mutex_init(&wqp->wq_lock, USYNC_THREAD | LOCK_ERRORCHECK,
108*7fd79137SRobert Mustacchi 	    NULL)) != 0) {
109*7fd79137SRobert Mustacchi 		if (wqp->wq_ndthreads > 0) {
110*7fd79137SRobert Mustacchi 			workq_free(wqp->wq_thrs,
111*7fd79137SRobert Mustacchi 			    sizeof (thread_t) * wqp->wq_ndthreads);
112*7fd79137SRobert Mustacchi 		}
113*7fd79137SRobert Mustacchi 		workq_free(wqp->wq_items, sizeof (void *) * WORKQ_DEFAULT_CAP);
114*7fd79137SRobert Mustacchi 		workq_free(wqp, sizeof (workq_t));
115*7fd79137SRobert Mustacchi 		return (workq_error(ret));
116*7fd79137SRobert Mustacchi 	}
117*7fd79137SRobert Mustacchi 
118*7fd79137SRobert Mustacchi 	if ((ret = cond_init(&wqp->wq_cond, USYNC_THREAD, NULL)) != 0) {
119*7fd79137SRobert Mustacchi 		VERIFY0(mutex_destroy(&wqp->wq_lock));
120*7fd79137SRobert Mustacchi 		if (wqp->wq_ndthreads > 0) {
121*7fd79137SRobert Mustacchi 			workq_free(wqp->wq_thrs,
122*7fd79137SRobert Mustacchi 			    sizeof (thread_t) * wqp->wq_ndthreads);
123*7fd79137SRobert Mustacchi 		}
124*7fd79137SRobert Mustacchi 		workq_free(wqp->wq_items, sizeof (void *) * WORKQ_DEFAULT_CAP);
125*7fd79137SRobert Mustacchi 		workq_free(wqp, sizeof (workq_t));
126*7fd79137SRobert Mustacchi 		return (workq_error(ret));
127*7fd79137SRobert Mustacchi 	}
128*7fd79137SRobert Mustacchi 
129*7fd79137SRobert Mustacchi 	wqp->wq_cap = WORKQ_DEFAULT_CAP;
130*7fd79137SRobert Mustacchi 	*outp = wqp;
131*7fd79137SRobert Mustacchi 	return (0);
132*7fd79137SRobert Mustacchi }
133*7fd79137SRobert Mustacchi 
134*7fd79137SRobert Mustacchi static void
135*7fd79137SRobert Mustacchi workq_reset(workq_t *wqp)
136*7fd79137SRobert Mustacchi {
137*7fd79137SRobert Mustacchi 	VERIFY(MUTEX_HELD(&wqp->wq_lock));
138*7fd79137SRobert Mustacchi 	VERIFY(wqp->wq_working == B_FALSE);
139*7fd79137SRobert Mustacchi 	if (wqp->wq_cap > 0)
140*7fd79137SRobert Mustacchi 		bzero(wqp->wq_items, sizeof (void *) * wqp->wq_cap);
141*7fd79137SRobert Mustacchi 	wqp->wq_nitems = 0;
142*7fd79137SRobert Mustacchi 	wqp->wq_next = 0;
143*7fd79137SRobert Mustacchi 	wqp->wq_func = NULL;
144*7fd79137SRobert Mustacchi 	wqp->wq_arg = NULL;
145*7fd79137SRobert Mustacchi 	wqp->wq_iserror = B_FALSE;
146*7fd79137SRobert Mustacchi 	wqp->wq_error = 0;
147*7fd79137SRobert Mustacchi }
148*7fd79137SRobert Mustacchi 
149*7fd79137SRobert Mustacchi static int
150*7fd79137SRobert Mustacchi workq_grow(workq_t *wqp)
151*7fd79137SRobert Mustacchi {
152*7fd79137SRobert Mustacchi 	size_t ncap;
153*7fd79137SRobert Mustacchi 	void **items;
154*7fd79137SRobert Mustacchi 
155*7fd79137SRobert Mustacchi 	VERIFY(MUTEX_HELD(&wqp->wq_lock));
156*7fd79137SRobert Mustacchi 	VERIFY(wqp->wq_working == B_FALSE);
157*7fd79137SRobert Mustacchi 
158*7fd79137SRobert Mustacchi 	if (SIZE_MAX - wqp->wq_cap < WORKQ_DEFAULT_CAP)
159*7fd79137SRobert Mustacchi 		return (ENOSPC);
160*7fd79137SRobert Mustacchi 
161*7fd79137SRobert Mustacchi 	ncap = wqp->wq_cap + WORKQ_DEFAULT_CAP;
162*7fd79137SRobert Mustacchi 	items = workq_alloc(ncap * sizeof (void *));
163*7fd79137SRobert Mustacchi 	if (items == NULL)
164*7fd79137SRobert Mustacchi 		return (ENOMEM);
165*7fd79137SRobert Mustacchi 
166*7fd79137SRobert Mustacchi 	bzero(items, ncap * sizeof (void *));
167*7fd79137SRobert Mustacchi 	bcopy(wqp->wq_items, items, wqp->wq_cap * sizeof (void *));
168*7fd79137SRobert Mustacchi 	workq_free(wqp->wq_items, sizeof (void *) * wqp->wq_cap);
169*7fd79137SRobert Mustacchi 	wqp->wq_items = items;
170*7fd79137SRobert Mustacchi 	wqp->wq_cap = ncap;
171*7fd79137SRobert Mustacchi 	return (0);
172*7fd79137SRobert Mustacchi }
173*7fd79137SRobert Mustacchi 
174*7fd79137SRobert Mustacchi int
175*7fd79137SRobert Mustacchi workq_add(workq_t *wqp, void *item)
176*7fd79137SRobert Mustacchi {
177*7fd79137SRobert Mustacchi 	VERIFY0(mutex_lock(&wqp->wq_lock));
178*7fd79137SRobert Mustacchi 	if (wqp->wq_working == B_TRUE) {
179*7fd79137SRobert Mustacchi 		VERIFY0(mutex_unlock(&wqp->wq_lock));
180*7fd79137SRobert Mustacchi 		return (workq_error(ENXIO));
181*7fd79137SRobert Mustacchi 	}
182*7fd79137SRobert Mustacchi 
183*7fd79137SRobert Mustacchi 	if (wqp->wq_nitems == wqp->wq_cap) {
184*7fd79137SRobert Mustacchi 		int ret;
185*7fd79137SRobert Mustacchi 
186*7fd79137SRobert Mustacchi 		if ((ret = workq_grow(wqp)) != 0) {
187*7fd79137SRobert Mustacchi 			VERIFY0(mutex_unlock(&wqp->wq_lock));
188*7fd79137SRobert Mustacchi 			return (workq_error(ret));
189*7fd79137SRobert Mustacchi 		}
190*7fd79137SRobert Mustacchi 	}
191*7fd79137SRobert Mustacchi 
192*7fd79137SRobert Mustacchi 	wqp->wq_items[wqp->wq_nitems] = item;
193*7fd79137SRobert Mustacchi 	wqp->wq_nitems++;
194*7fd79137SRobert Mustacchi 
195*7fd79137SRobert Mustacchi 	VERIFY0(mutex_unlock(&wqp->wq_lock));
196*7fd79137SRobert Mustacchi 
197*7fd79137SRobert Mustacchi 	return (0);
198*7fd79137SRobert Mustacchi }
199*7fd79137SRobert Mustacchi 
200*7fd79137SRobert Mustacchi static void *
201*7fd79137SRobert Mustacchi workq_pop(workq_t *wqp)
202*7fd79137SRobert Mustacchi {
203*7fd79137SRobert Mustacchi 	void *out;
204*7fd79137SRobert Mustacchi 
205*7fd79137SRobert Mustacchi 	VERIFY(MUTEX_HELD(&wqp->wq_lock));
206*7fd79137SRobert Mustacchi 	VERIFY(wqp->wq_next < wqp->wq_nitems);
207*7fd79137SRobert Mustacchi 
208*7fd79137SRobert Mustacchi 	out = wqp->wq_items[wqp->wq_next];
209*7fd79137SRobert Mustacchi 	wqp->wq_items[wqp->wq_next] = NULL;
210*7fd79137SRobert Mustacchi 	wqp->wq_next++;
211*7fd79137SRobert Mustacchi 
212*7fd79137SRobert Mustacchi 	return (out);
213*7fd79137SRobert Mustacchi }
214*7fd79137SRobert Mustacchi 
215*7fd79137SRobert Mustacchi static void *
216*7fd79137SRobert Mustacchi workq_thr_work(void *arg)
217*7fd79137SRobert Mustacchi {
218*7fd79137SRobert Mustacchi 	workq_t *wqp = arg;
219*7fd79137SRobert Mustacchi 
220*7fd79137SRobert Mustacchi 	VERIFY0(mutex_lock(&wqp->wq_lock));
221*7fd79137SRobert Mustacchi 	VERIFY(wqp->wq_working == B_TRUE);
222*7fd79137SRobert Mustacchi 
223*7fd79137SRobert Mustacchi 	for (;;) {
224*7fd79137SRobert Mustacchi 		int ret;
225*7fd79137SRobert Mustacchi 		void *item;
226*7fd79137SRobert Mustacchi 
227*7fd79137SRobert Mustacchi 		if (wqp->wq_iserror == B_TRUE ||
228*7fd79137SRobert Mustacchi 		    wqp->wq_next == wqp->wq_nitems) {
229*7fd79137SRobert Mustacchi 			VERIFY0(mutex_unlock(&wqp->wq_lock));
230*7fd79137SRobert Mustacchi 			return (NULL);
231*7fd79137SRobert Mustacchi 		}
232*7fd79137SRobert Mustacchi 
233*7fd79137SRobert Mustacchi 		item = workq_pop(wqp);
234*7fd79137SRobert Mustacchi 
235*7fd79137SRobert Mustacchi 		VERIFY0(mutex_unlock(&wqp->wq_lock));
236*7fd79137SRobert Mustacchi 		ret = wqp->wq_func(item, wqp->wq_arg);
237*7fd79137SRobert Mustacchi 		VERIFY0(mutex_lock(&wqp->wq_lock));
238*7fd79137SRobert Mustacchi 
239*7fd79137SRobert Mustacchi 		if (ret != 0) {
240*7fd79137SRobert Mustacchi 			if (wqp->wq_iserror == B_FALSE) {
241*7fd79137SRobert Mustacchi 				wqp->wq_iserror = B_TRUE;
242*7fd79137SRobert Mustacchi 				wqp->wq_error = ret;
243*7fd79137SRobert Mustacchi 			}
244*7fd79137SRobert Mustacchi 			VERIFY0(mutex_unlock(&wqp->wq_lock));
245*7fd79137SRobert Mustacchi 			return (NULL);
246*7fd79137SRobert Mustacchi 		}
247*7fd79137SRobert Mustacchi 	}
248*7fd79137SRobert Mustacchi }
249*7fd79137SRobert Mustacchi 
250*7fd79137SRobert Mustacchi int
251*7fd79137SRobert Mustacchi workq_work(workq_t *wqp, workq_proc_f *func, void *arg, int *errp)
252*7fd79137SRobert Mustacchi {
253*7fd79137SRobert Mustacchi 	int i, ret;
254*7fd79137SRobert Mustacchi 	boolean_t seterr = B_FALSE;
255*7fd79137SRobert Mustacchi 
256*7fd79137SRobert Mustacchi 	if (wqp == NULL || func == NULL)
257*7fd79137SRobert Mustacchi 		return (workq_error(EINVAL));
258*7fd79137SRobert Mustacchi 
259*7fd79137SRobert Mustacchi 	VERIFY0(mutex_lock(&wqp->wq_lock));
260*7fd79137SRobert Mustacchi 	if (wqp->wq_working == B_TRUE) {
261*7fd79137SRobert Mustacchi 		VERIFY0(mutex_unlock(&wqp->wq_lock));
262*7fd79137SRobert Mustacchi 		return (workq_error(EBUSY));
263*7fd79137SRobert Mustacchi 	}
264*7fd79137SRobert Mustacchi 
265*7fd79137SRobert Mustacchi 	if (wqp->wq_nitems == 0) {
266*7fd79137SRobert Mustacchi 		workq_reset(wqp);
267*7fd79137SRobert Mustacchi 		VERIFY0(mutex_unlock(&wqp->wq_lock));
268*7fd79137SRobert Mustacchi 		return (0);
269*7fd79137SRobert Mustacchi 	}
270*7fd79137SRobert Mustacchi 
271*7fd79137SRobert Mustacchi 	wqp->wq_func = func;
272*7fd79137SRobert Mustacchi 	wqp->wq_arg = arg;
273*7fd79137SRobert Mustacchi 	wqp->wq_next = 0;
274*7fd79137SRobert Mustacchi 	wqp->wq_working = B_TRUE;
275*7fd79137SRobert Mustacchi 
276*7fd79137SRobert Mustacchi 	ret = 0;
277*7fd79137SRobert Mustacchi 	for (i = 0; i < wqp->wq_ndthreads; i++) {
278*7fd79137SRobert Mustacchi 		ret = thr_create(NULL, 0, workq_thr_work, wqp, 0,
279*7fd79137SRobert Mustacchi 		    &wqp->wq_thrs[i]);
280*7fd79137SRobert Mustacchi 		if (ret != 0) {
281*7fd79137SRobert Mustacchi 			wqp->wq_iserror = B_TRUE;
282*7fd79137SRobert Mustacchi 		}
283*7fd79137SRobert Mustacchi 	}
284*7fd79137SRobert Mustacchi 
285*7fd79137SRobert Mustacchi 	VERIFY0(mutex_unlock(&wqp->wq_lock));
286*7fd79137SRobert Mustacchi 	if (ret == 0)
287*7fd79137SRobert Mustacchi 		(void) workq_thr_work(wqp);
288*7fd79137SRobert Mustacchi 
289*7fd79137SRobert Mustacchi 	for (i = 0; i < wqp->wq_ndthreads; i++) {
290*7fd79137SRobert Mustacchi 		VERIFY0(thr_join(wqp->wq_thrs[i], NULL, NULL));
291*7fd79137SRobert Mustacchi 	}
292*7fd79137SRobert Mustacchi 
293*7fd79137SRobert Mustacchi 	VERIFY0(mutex_lock(&wqp->wq_lock));
294*7fd79137SRobert Mustacchi 	wqp->wq_working = B_FALSE;
295*7fd79137SRobert Mustacchi 	if (ret == 0 && wqp->wq_iserror == B_TRUE) {
296*7fd79137SRobert Mustacchi 		ret = WORKQ_UERROR;
297*7fd79137SRobert Mustacchi 		if (errp != NULL)
298*7fd79137SRobert Mustacchi 			*errp = wqp->wq_error;
299*7fd79137SRobert Mustacchi 	} else if (ret != 0) {
300*7fd79137SRobert Mustacchi 		VERIFY(wqp->wq_iserror == B_FALSE);
301*7fd79137SRobert Mustacchi 		seterr = B_TRUE;
302*7fd79137SRobert Mustacchi 	}
303*7fd79137SRobert Mustacchi 
304*7fd79137SRobert Mustacchi 	workq_reset(wqp);
305*7fd79137SRobert Mustacchi 	VERIFY0(mutex_unlock(&wqp->wq_lock));
306*7fd79137SRobert Mustacchi 
307*7fd79137SRobert Mustacchi 	if (seterr == B_TRUE)
308*7fd79137SRobert Mustacchi 		return (workq_error(ret));
309*7fd79137SRobert Mustacchi 
310*7fd79137SRobert Mustacchi 	return (ret);
311*7fd79137SRobert Mustacchi }
312