1 /* 2 * This file and its contents are supplied under the terms of the 3 * Common Development and Distribution License ("CDDL"), version 1.0. 4 * You may only use this file in accordance with the terms of version 5 * 1.0 of the CDDL. 6 * 7 * A full copy of the text of the CDDL should have accompanied this 8 * source. A copy of the CDDL is also available via the Internet at 9 * http://www.illumos.org/license/CDDL. 10 */ 11 12 /* 13 * Copyright 2015 Joyent, Inc. 14 */ 15 16 /* 17 * Work queue 18 * 19 * A multi-threaded work queue. 20 * 21 * The general design of this is to add a fixed number of items to the queue and 22 * then drain them with the specified number of threads. 23 */ 24 25 #include <strings.h> 26 #include <sys/debug.h> 27 #include <thread.h> 28 #include <synch.h> 29 #include <errno.h> 30 #include <limits.h> 31 #include <stdlib.h> 32 33 #include "workq.h" 34 35 struct workq { 36 mutex_t wq_lock; /* Protects below items */ 37 cond_t wq_cond; /* Condition variable */ 38 void **wq_items; /* Array of items to process */ 39 size_t wq_nitems; /* Number of items in queue */ 40 size_t wq_cap; /* Queue capacity */ 41 size_t wq_next; /* Next item to process */ 42 uint_t wq_ndthreads; /* Desired number of threads */ 43 thread_t *wq_thrs; /* Actual threads */ 44 workq_proc_f *wq_func; /* Processing function */ 45 void *wq_arg; /* Argument for processing */ 46 boolean_t wq_working; /* Are we actively using it? */ 47 boolean_t wq_iserror; /* Have we encountered an error? */ 48 int wq_error; /* Error value, if any */ 49 }; 50 51 #define WORKQ_DEFAULT_CAP 64 52 53 static int 54 workq_error(int err) 55 { 56 VERIFY(err != 0); 57 errno = err; 58 return (WORKQ_ERROR); 59 } 60 61 void 62 workq_fini(workq_t *wqp) 63 { 64 if (wqp == NULL) 65 return; 66 67 VERIFY(wqp->wq_working != B_TRUE); 68 VERIFY0(mutex_destroy(&wqp->wq_lock)); 69 VERIFY0(cond_destroy(&wqp->wq_cond)); 70 if (wqp->wq_cap > 0) 71 workq_free(wqp->wq_items, sizeof (void *) * wqp->wq_cap); 72 if (wqp->wq_ndthreads > 0) 73 workq_free(wqp->wq_thrs, sizeof (thread_t) * wqp->wq_ndthreads); 74 workq_free(wqp, sizeof (workq_t)); 75 } 76 77 int 78 workq_init(workq_t **outp, uint_t nthrs) 79 { 80 int ret; 81 workq_t *wqp; 82 83 wqp = workq_alloc(sizeof (workq_t)); 84 if (wqp == NULL) 85 return (workq_error(ENOMEM)); 86 87 bzero(wqp, sizeof (workq_t)); 88 wqp->wq_items = workq_alloc(sizeof (void *) * WORKQ_DEFAULT_CAP); 89 if (wqp->wq_items == NULL) { 90 workq_free(wqp, sizeof (workq_t)); 91 return (workq_error(ENOMEM)); 92 } 93 bzero(wqp->wq_items, sizeof (void *) * WORKQ_DEFAULT_CAP); 94 95 wqp->wq_ndthreads = nthrs - 1; 96 if (wqp->wq_ndthreads > 0) { 97 wqp->wq_thrs = workq_alloc(sizeof (thread_t) * 98 wqp->wq_ndthreads); 99 if (wqp->wq_thrs == NULL) { 100 workq_free(wqp->wq_items, sizeof (void *) * 101 WORKQ_DEFAULT_CAP); 102 workq_free(wqp, sizeof (workq_t)); 103 return (workq_error(ENOMEM)); 104 } 105 } 106 107 if ((ret = mutex_init(&wqp->wq_lock, USYNC_THREAD | LOCK_ERRORCHECK, 108 NULL)) != 0) { 109 if (wqp->wq_ndthreads > 0) { 110 workq_free(wqp->wq_thrs, 111 sizeof (thread_t) * wqp->wq_ndthreads); 112 } 113 workq_free(wqp->wq_items, sizeof (void *) * WORKQ_DEFAULT_CAP); 114 workq_free(wqp, sizeof (workq_t)); 115 return (workq_error(ret)); 116 } 117 118 if ((ret = cond_init(&wqp->wq_cond, USYNC_THREAD, NULL)) != 0) { 119 VERIFY0(mutex_destroy(&wqp->wq_lock)); 120 if (wqp->wq_ndthreads > 0) { 121 workq_free(wqp->wq_thrs, 122 sizeof (thread_t) * wqp->wq_ndthreads); 123 } 124 workq_free(wqp->wq_items, sizeof (void *) * WORKQ_DEFAULT_CAP); 125 workq_free(wqp, sizeof (workq_t)); 126 return (workq_error(ret)); 127 } 128 129 wqp->wq_cap = WORKQ_DEFAULT_CAP; 130 *outp = wqp; 131 return (0); 132 } 133 134 static void 135 workq_reset(workq_t *wqp) 136 { 137 VERIFY(MUTEX_HELD(&wqp->wq_lock)); 138 VERIFY(wqp->wq_working == B_FALSE); 139 if (wqp->wq_cap > 0) 140 bzero(wqp->wq_items, sizeof (void *) * wqp->wq_cap); 141 wqp->wq_nitems = 0; 142 wqp->wq_next = 0; 143 wqp->wq_func = NULL; 144 wqp->wq_arg = NULL; 145 wqp->wq_iserror = B_FALSE; 146 wqp->wq_error = 0; 147 } 148 149 static int 150 workq_grow(workq_t *wqp) 151 { 152 size_t ncap; 153 void **items; 154 155 VERIFY(MUTEX_HELD(&wqp->wq_lock)); 156 VERIFY(wqp->wq_working == B_FALSE); 157 158 if (SIZE_MAX - wqp->wq_cap < WORKQ_DEFAULT_CAP) 159 return (ENOSPC); 160 161 ncap = wqp->wq_cap + WORKQ_DEFAULT_CAP; 162 items = workq_alloc(ncap * sizeof (void *)); 163 if (items == NULL) 164 return (ENOMEM); 165 166 bzero(items, ncap * sizeof (void *)); 167 bcopy(wqp->wq_items, items, wqp->wq_cap * sizeof (void *)); 168 workq_free(wqp->wq_items, sizeof (void *) * wqp->wq_cap); 169 wqp->wq_items = items; 170 wqp->wq_cap = ncap; 171 return (0); 172 } 173 174 int 175 workq_add(workq_t *wqp, void *item) 176 { 177 VERIFY0(mutex_lock(&wqp->wq_lock)); 178 if (wqp->wq_working == B_TRUE) { 179 VERIFY0(mutex_unlock(&wqp->wq_lock)); 180 return (workq_error(ENXIO)); 181 } 182 183 if (wqp->wq_nitems == wqp->wq_cap) { 184 int ret; 185 186 if ((ret = workq_grow(wqp)) != 0) { 187 VERIFY0(mutex_unlock(&wqp->wq_lock)); 188 return (workq_error(ret)); 189 } 190 } 191 192 wqp->wq_items[wqp->wq_nitems] = item; 193 wqp->wq_nitems++; 194 195 VERIFY0(mutex_unlock(&wqp->wq_lock)); 196 197 return (0); 198 } 199 200 static void * 201 workq_pop(workq_t *wqp) 202 { 203 void *out; 204 205 VERIFY(MUTEX_HELD(&wqp->wq_lock)); 206 VERIFY(wqp->wq_next < wqp->wq_nitems); 207 208 out = wqp->wq_items[wqp->wq_next]; 209 wqp->wq_items[wqp->wq_next] = NULL; 210 wqp->wq_next++; 211 212 return (out); 213 } 214 215 static void * 216 workq_thr_work(void *arg) 217 { 218 workq_t *wqp = arg; 219 220 VERIFY0(mutex_lock(&wqp->wq_lock)); 221 VERIFY(wqp->wq_working == B_TRUE); 222 223 for (;;) { 224 int ret; 225 void *item; 226 227 if (wqp->wq_iserror == B_TRUE || 228 wqp->wq_next == wqp->wq_nitems) { 229 VERIFY0(mutex_unlock(&wqp->wq_lock)); 230 return (NULL); 231 } 232 233 item = workq_pop(wqp); 234 235 VERIFY0(mutex_unlock(&wqp->wq_lock)); 236 ret = wqp->wq_func(item, wqp->wq_arg); 237 VERIFY0(mutex_lock(&wqp->wq_lock)); 238 239 if (ret != 0) { 240 if (wqp->wq_iserror == B_FALSE) { 241 wqp->wq_iserror = B_TRUE; 242 wqp->wq_error = ret; 243 } 244 VERIFY0(mutex_unlock(&wqp->wq_lock)); 245 return (NULL); 246 } 247 } 248 } 249 250 int 251 workq_work(workq_t *wqp, workq_proc_f *func, void *arg, int *errp) 252 { 253 int i, ret; 254 boolean_t seterr = B_FALSE; 255 256 if (wqp == NULL || func == NULL) 257 return (workq_error(EINVAL)); 258 259 VERIFY0(mutex_lock(&wqp->wq_lock)); 260 if (wqp->wq_working == B_TRUE) { 261 VERIFY0(mutex_unlock(&wqp->wq_lock)); 262 return (workq_error(EBUSY)); 263 } 264 265 if (wqp->wq_nitems == 0) { 266 workq_reset(wqp); 267 VERIFY0(mutex_unlock(&wqp->wq_lock)); 268 return (0); 269 } 270 271 wqp->wq_func = func; 272 wqp->wq_arg = arg; 273 wqp->wq_next = 0; 274 wqp->wq_working = B_TRUE; 275 276 ret = 0; 277 for (i = 0; i < wqp->wq_ndthreads; i++) { 278 ret = thr_create(NULL, 0, workq_thr_work, wqp, 0, 279 &wqp->wq_thrs[i]); 280 if (ret != 0) { 281 wqp->wq_iserror = B_TRUE; 282 } 283 } 284 285 VERIFY0(mutex_unlock(&wqp->wq_lock)); 286 if (ret == 0) 287 (void) workq_thr_work(wqp); 288 289 for (i = 0; i < wqp->wq_ndthreads; i++) { 290 VERIFY0(thr_join(wqp->wq_thrs[i], NULL, NULL)); 291 } 292 293 VERIFY0(mutex_lock(&wqp->wq_lock)); 294 wqp->wq_working = B_FALSE; 295 if (ret == 0 && wqp->wq_iserror == B_TRUE) { 296 ret = WORKQ_UERROR; 297 if (errp != NULL) 298 *errp = wqp->wq_error; 299 } else if (ret != 0) { 300 VERIFY(wqp->wq_iserror == B_FALSE); 301 seterr = B_TRUE; 302 } 303 304 workq_reset(wqp); 305 VERIFY0(mutex_unlock(&wqp->wq_lock)); 306 307 if (seterr == B_TRUE) 308 return (workq_error(ret)); 309 310 return (ret); 311 } 312