1 /*
2 * This file and its contents are supplied under the terms of the
3 * Common Development and Distribution License ("CDDL"), version 1.0.
4 * You may only use this file in accordance with the terms of version
5 * 1.0 of the CDDL.
6 *
7 * A full copy of the text of the CDDL should have accompanied this
8 * source. A copy of the CDDL is also available via the Internet at
9 * http://www.illumos.org/license/CDDL.
10 */
11
12 /*
13 * Copyright 2015 Joyent, Inc.
14 */
15
16 /*
17 * Work queue
18 *
19 * A multi-threaded work queue.
20 *
21 * The general design of this is to add a fixed number of items to the queue and
22 * then drain them with the specified number of threads.
23 */
24
25 #include <strings.h>
26 #include <sys/debug.h>
27 #include <thread.h>
28 #include <synch.h>
29 #include <errno.h>
30 #include <limits.h>
31 #include <stdlib.h>
32
33 #include "workq.h"
34
35 struct workq {
36 mutex_t wq_lock; /* Protects below items */
37 cond_t wq_cond; /* Condition variable */
38 void **wq_items; /* Array of items to process */
39 size_t wq_nitems; /* Number of items in queue */
40 size_t wq_cap; /* Queue capacity */
41 size_t wq_next; /* Next item to process */
42 uint_t wq_ndthreads; /* Desired number of threads */
43 thread_t *wq_thrs; /* Actual threads */
44 workq_proc_f *wq_func; /* Processing function */
45 void *wq_arg; /* Argument for processing */
46 boolean_t wq_working; /* Are we actively using it? */
47 boolean_t wq_iserror; /* Have we encountered an error? */
48 int wq_error; /* Error value, if any */
49 };
50
51 #define WORKQ_DEFAULT_CAP 64
52
53 static int
workq_error(int err)54 workq_error(int err)
55 {
56 VERIFY(err != 0);
57 errno = err;
58 return (WORKQ_ERROR);
59 }
60
61 void
workq_fini(workq_t * wqp)62 workq_fini(workq_t *wqp)
63 {
64 if (wqp == NULL)
65 return;
66
67 VERIFY(wqp->wq_working != B_TRUE);
68 VERIFY0(mutex_destroy(&wqp->wq_lock));
69 VERIFY0(cond_destroy(&wqp->wq_cond));
70 if (wqp->wq_cap > 0)
71 workq_free(wqp->wq_items, sizeof (void *) * wqp->wq_cap);
72 if (wqp->wq_ndthreads > 0)
73 workq_free(wqp->wq_thrs, sizeof (thread_t) * wqp->wq_ndthreads);
74 workq_free(wqp, sizeof (workq_t));
75 }
76
77 int
workq_init(workq_t ** outp,uint_t nthrs)78 workq_init(workq_t **outp, uint_t nthrs)
79 {
80 int ret;
81 workq_t *wqp;
82
83 wqp = workq_alloc(sizeof (workq_t));
84 if (wqp == NULL)
85 return (workq_error(ENOMEM));
86
87 bzero(wqp, sizeof (workq_t));
88 wqp->wq_items = workq_alloc(sizeof (void *) * WORKQ_DEFAULT_CAP);
89 if (wqp->wq_items == NULL) {
90 workq_free(wqp, sizeof (workq_t));
91 return (workq_error(ENOMEM));
92 }
93 bzero(wqp->wq_items, sizeof (void *) * WORKQ_DEFAULT_CAP);
94
95 wqp->wq_ndthreads = nthrs - 1;
96 if (wqp->wq_ndthreads > 0) {
97 wqp->wq_thrs = workq_alloc(sizeof (thread_t) *
98 wqp->wq_ndthreads);
99 if (wqp->wq_thrs == NULL) {
100 workq_free(wqp->wq_items, sizeof (void *) *
101 WORKQ_DEFAULT_CAP);
102 workq_free(wqp, sizeof (workq_t));
103 return (workq_error(ENOMEM));
104 }
105 }
106
107 if ((ret = mutex_init(&wqp->wq_lock, USYNC_THREAD | LOCK_ERRORCHECK,
108 NULL)) != 0) {
109 if (wqp->wq_ndthreads > 0) {
110 workq_free(wqp->wq_thrs,
111 sizeof (thread_t) * wqp->wq_ndthreads);
112 }
113 workq_free(wqp->wq_items, sizeof (void *) * WORKQ_DEFAULT_CAP);
114 workq_free(wqp, sizeof (workq_t));
115 return (workq_error(ret));
116 }
117
118 if ((ret = cond_init(&wqp->wq_cond, USYNC_THREAD, NULL)) != 0) {
119 VERIFY0(mutex_destroy(&wqp->wq_lock));
120 if (wqp->wq_ndthreads > 0) {
121 workq_free(wqp->wq_thrs,
122 sizeof (thread_t) * wqp->wq_ndthreads);
123 }
124 workq_free(wqp->wq_items, sizeof (void *) * WORKQ_DEFAULT_CAP);
125 workq_free(wqp, sizeof (workq_t));
126 return (workq_error(ret));
127 }
128
129 wqp->wq_cap = WORKQ_DEFAULT_CAP;
130 *outp = wqp;
131 return (0);
132 }
133
134 static void
workq_reset(workq_t * wqp)135 workq_reset(workq_t *wqp)
136 {
137 VERIFY(MUTEX_HELD(&wqp->wq_lock));
138 VERIFY(wqp->wq_working == B_FALSE);
139 if (wqp->wq_cap > 0)
140 bzero(wqp->wq_items, sizeof (void *) * wqp->wq_cap);
141 wqp->wq_nitems = 0;
142 wqp->wq_next = 0;
143 wqp->wq_func = NULL;
144 wqp->wq_arg = NULL;
145 wqp->wq_iserror = B_FALSE;
146 wqp->wq_error = 0;
147 }
148
149 static int
workq_grow(workq_t * wqp)150 workq_grow(workq_t *wqp)
151 {
152 size_t ncap;
153 void **items;
154
155 VERIFY(MUTEX_HELD(&wqp->wq_lock));
156 VERIFY(wqp->wq_working == B_FALSE);
157
158 if (SIZE_MAX - wqp->wq_cap < WORKQ_DEFAULT_CAP)
159 return (ENOSPC);
160
161 ncap = wqp->wq_cap + WORKQ_DEFAULT_CAP;
162 items = workq_alloc(ncap * sizeof (void *));
163 if (items == NULL)
164 return (ENOMEM);
165
166 bzero(items, ncap * sizeof (void *));
167 bcopy(wqp->wq_items, items, wqp->wq_cap * sizeof (void *));
168 workq_free(wqp->wq_items, sizeof (void *) * wqp->wq_cap);
169 wqp->wq_items = items;
170 wqp->wq_cap = ncap;
171 return (0);
172 }
173
174 int
workq_add(workq_t * wqp,void * item)175 workq_add(workq_t *wqp, void *item)
176 {
177 VERIFY0(mutex_lock(&wqp->wq_lock));
178 if (wqp->wq_working == B_TRUE) {
179 VERIFY0(mutex_unlock(&wqp->wq_lock));
180 return (workq_error(ENXIO));
181 }
182
183 if (wqp->wq_nitems == wqp->wq_cap) {
184 int ret;
185
186 if ((ret = workq_grow(wqp)) != 0) {
187 VERIFY0(mutex_unlock(&wqp->wq_lock));
188 return (workq_error(ret));
189 }
190 }
191
192 wqp->wq_items[wqp->wq_nitems] = item;
193 wqp->wq_nitems++;
194
195 VERIFY0(mutex_unlock(&wqp->wq_lock));
196
197 return (0);
198 }
199
200 static void *
workq_pop(workq_t * wqp)201 workq_pop(workq_t *wqp)
202 {
203 void *out;
204
205 VERIFY(MUTEX_HELD(&wqp->wq_lock));
206 VERIFY(wqp->wq_next < wqp->wq_nitems);
207
208 out = wqp->wq_items[wqp->wq_next];
209 wqp->wq_items[wqp->wq_next] = NULL;
210 wqp->wq_next++;
211
212 return (out);
213 }
214
215 static void *
workq_thr_work(void * arg)216 workq_thr_work(void *arg)
217 {
218 workq_t *wqp = arg;
219
220 VERIFY0(mutex_lock(&wqp->wq_lock));
221 VERIFY(wqp->wq_working == B_TRUE);
222
223 for (;;) {
224 int ret;
225 void *item;
226
227 if (wqp->wq_iserror == B_TRUE ||
228 wqp->wq_next == wqp->wq_nitems) {
229 VERIFY0(mutex_unlock(&wqp->wq_lock));
230 return (NULL);
231 }
232
233 item = workq_pop(wqp);
234
235 VERIFY0(mutex_unlock(&wqp->wq_lock));
236 ret = wqp->wq_func(item, wqp->wq_arg);
237 VERIFY0(mutex_lock(&wqp->wq_lock));
238
239 if (ret != 0) {
240 if (wqp->wq_iserror == B_FALSE) {
241 wqp->wq_iserror = B_TRUE;
242 wqp->wq_error = ret;
243 }
244 VERIFY0(mutex_unlock(&wqp->wq_lock));
245 return (NULL);
246 }
247 }
248 }
249
250 int
workq_work(workq_t * wqp,workq_proc_f * func,void * arg,int * errp)251 workq_work(workq_t *wqp, workq_proc_f *func, void *arg, int *errp)
252 {
253 int i, ret;
254 boolean_t seterr = B_FALSE;
255
256 if (wqp == NULL || func == NULL)
257 return (workq_error(EINVAL));
258
259 VERIFY0(mutex_lock(&wqp->wq_lock));
260 if (wqp->wq_working == B_TRUE) {
261 VERIFY0(mutex_unlock(&wqp->wq_lock));
262 return (workq_error(EBUSY));
263 }
264
265 if (wqp->wq_nitems == 0) {
266 workq_reset(wqp);
267 VERIFY0(mutex_unlock(&wqp->wq_lock));
268 return (0);
269 }
270
271 wqp->wq_func = func;
272 wqp->wq_arg = arg;
273 wqp->wq_next = 0;
274 wqp->wq_working = B_TRUE;
275
276 ret = 0;
277 for (i = 0; i < wqp->wq_ndthreads; i++) {
278 ret = thr_create(NULL, 0, workq_thr_work, wqp, 0,
279 &wqp->wq_thrs[i]);
280 if (ret != 0) {
281 wqp->wq_iserror = B_TRUE;
282 }
283 }
284
285 VERIFY0(mutex_unlock(&wqp->wq_lock));
286 if (ret == 0)
287 (void) workq_thr_work(wqp);
288
289 for (i = 0; i < wqp->wq_ndthreads; i++) {
290 VERIFY0(thr_join(wqp->wq_thrs[i], NULL, NULL));
291 }
292
293 VERIFY0(mutex_lock(&wqp->wq_lock));
294 wqp->wq_working = B_FALSE;
295 if (ret == 0 && wqp->wq_iserror == B_TRUE) {
296 ret = WORKQ_UERROR;
297 if (errp != NULL)
298 *errp = wqp->wq_error;
299 } else if (ret != 0) {
300 VERIFY(wqp->wq_iserror == B_FALSE);
301 seterr = B_TRUE;
302 }
303
304 workq_reset(wqp);
305 VERIFY0(mutex_unlock(&wqp->wq_lock));
306
307 if (seterr == B_TRUE)
308 return (workq_error(ret));
309
310 return (ret);
311 }
312