1*7fd79137SRobert Mustacchi /* 2*7fd79137SRobert Mustacchi * This file and its contents are supplied under the terms of the 3*7fd79137SRobert Mustacchi * Common Development and Distribution License ("CDDL"), version 1.0. 4*7fd79137SRobert Mustacchi * You may only use this file in accordance with the terms of version 5*7fd79137SRobert Mustacchi * 1.0 of the CDDL. 6*7fd79137SRobert Mustacchi * 7*7fd79137SRobert Mustacchi * A full copy of the text of the CDDL should have accompanied this 8*7fd79137SRobert Mustacchi * source. A copy of the CDDL is also available via the Internet at 9*7fd79137SRobert Mustacchi * http://www.illumos.org/license/CDDL. 10*7fd79137SRobert Mustacchi */ 11*7fd79137SRobert Mustacchi 12*7fd79137SRobert Mustacchi /* 13*7fd79137SRobert Mustacchi * Copyright 2015 Joyent, Inc. 14*7fd79137SRobert Mustacchi */ 15*7fd79137SRobert Mustacchi 16*7fd79137SRobert Mustacchi /* 17*7fd79137SRobert Mustacchi * Work queue 18*7fd79137SRobert Mustacchi * 19*7fd79137SRobert Mustacchi * A multi-threaded work queue. 20*7fd79137SRobert Mustacchi * 21*7fd79137SRobert Mustacchi * The general design of this is to add a fixed number of items to the queue and 22*7fd79137SRobert Mustacchi * then drain them with the specified number of threads. 23*7fd79137SRobert Mustacchi */ 24*7fd79137SRobert Mustacchi 25*7fd79137SRobert Mustacchi #include <strings.h> 26*7fd79137SRobert Mustacchi #include <sys/debug.h> 27*7fd79137SRobert Mustacchi #include <thread.h> 28*7fd79137SRobert Mustacchi #include <synch.h> 29*7fd79137SRobert Mustacchi #include <errno.h> 30*7fd79137SRobert Mustacchi #include <limits.h> 31*7fd79137SRobert Mustacchi #include <stdlib.h> 32*7fd79137SRobert Mustacchi 33*7fd79137SRobert Mustacchi #include "workq.h" 34*7fd79137SRobert Mustacchi 35*7fd79137SRobert Mustacchi struct workq { 36*7fd79137SRobert Mustacchi mutex_t wq_lock; /* Protects below items */ 37*7fd79137SRobert Mustacchi cond_t wq_cond; /* Condition variable */ 38*7fd79137SRobert Mustacchi void **wq_items; /* Array of items to process */ 39*7fd79137SRobert Mustacchi size_t wq_nitems; /* Number of items in queue */ 40*7fd79137SRobert Mustacchi size_t wq_cap; /* Queue capacity */ 41*7fd79137SRobert Mustacchi size_t wq_next; /* Next item to process */ 42*7fd79137SRobert Mustacchi uint_t wq_ndthreads; /* Desired number of threads */ 43*7fd79137SRobert Mustacchi thread_t *wq_thrs; /* Actual threads */ 44*7fd79137SRobert Mustacchi workq_proc_f *wq_func; /* Processing function */ 45*7fd79137SRobert Mustacchi void *wq_arg; /* Argument for processing */ 46*7fd79137SRobert Mustacchi boolean_t wq_working; /* Are we actively using it? */ 47*7fd79137SRobert Mustacchi boolean_t wq_iserror; /* Have we encountered an error? */ 48*7fd79137SRobert Mustacchi int wq_error; /* Error value, if any */ 49*7fd79137SRobert Mustacchi }; 50*7fd79137SRobert Mustacchi 51*7fd79137SRobert Mustacchi #define WORKQ_DEFAULT_CAP 64 52*7fd79137SRobert Mustacchi 53*7fd79137SRobert Mustacchi static int 54*7fd79137SRobert Mustacchi workq_error(int err) 55*7fd79137SRobert Mustacchi { 56*7fd79137SRobert Mustacchi VERIFY(err != 0); 57*7fd79137SRobert Mustacchi errno = err; 58*7fd79137SRobert Mustacchi return (WORKQ_ERROR); 59*7fd79137SRobert Mustacchi } 60*7fd79137SRobert Mustacchi 61*7fd79137SRobert Mustacchi void 62*7fd79137SRobert Mustacchi workq_fini(workq_t *wqp) 63*7fd79137SRobert Mustacchi { 64*7fd79137SRobert Mustacchi if (wqp == NULL) 65*7fd79137SRobert Mustacchi return; 66*7fd79137SRobert Mustacchi 67*7fd79137SRobert Mustacchi VERIFY(wqp->wq_working != B_TRUE); 68*7fd79137SRobert Mustacchi VERIFY0(mutex_destroy(&wqp->wq_lock)); 69*7fd79137SRobert Mustacchi VERIFY0(cond_destroy(&wqp->wq_cond)); 70*7fd79137SRobert Mustacchi if (wqp->wq_cap > 0) 71*7fd79137SRobert Mustacchi workq_free(wqp->wq_items, sizeof (void *) * wqp->wq_cap); 72*7fd79137SRobert Mustacchi if (wqp->wq_ndthreads > 0) 73*7fd79137SRobert Mustacchi workq_free(wqp->wq_thrs, sizeof (thread_t) * wqp->wq_ndthreads); 74*7fd79137SRobert Mustacchi workq_free(wqp, sizeof (workq_t)); 75*7fd79137SRobert Mustacchi } 76*7fd79137SRobert Mustacchi 77*7fd79137SRobert Mustacchi int 78*7fd79137SRobert Mustacchi workq_init(workq_t **outp, uint_t nthrs) 79*7fd79137SRobert Mustacchi { 80*7fd79137SRobert Mustacchi int ret; 81*7fd79137SRobert Mustacchi workq_t *wqp; 82*7fd79137SRobert Mustacchi 83*7fd79137SRobert Mustacchi wqp = workq_alloc(sizeof (workq_t)); 84*7fd79137SRobert Mustacchi if (wqp == NULL) 85*7fd79137SRobert Mustacchi return (workq_error(ENOMEM)); 86*7fd79137SRobert Mustacchi 87*7fd79137SRobert Mustacchi bzero(wqp, sizeof (workq_t)); 88*7fd79137SRobert Mustacchi wqp->wq_items = workq_alloc(sizeof (void *) * WORKQ_DEFAULT_CAP); 89*7fd79137SRobert Mustacchi if (wqp->wq_items == NULL) { 90*7fd79137SRobert Mustacchi workq_free(wqp, sizeof (workq_t)); 91*7fd79137SRobert Mustacchi return (workq_error(ENOMEM)); 92*7fd79137SRobert Mustacchi } 93*7fd79137SRobert Mustacchi bzero(wqp->wq_items, sizeof (void *) * WORKQ_DEFAULT_CAP); 94*7fd79137SRobert Mustacchi 95*7fd79137SRobert Mustacchi wqp->wq_ndthreads = nthrs - 1; 96*7fd79137SRobert Mustacchi if (wqp->wq_ndthreads > 0) { 97*7fd79137SRobert Mustacchi wqp->wq_thrs = workq_alloc(sizeof (thread_t) * 98*7fd79137SRobert Mustacchi wqp->wq_ndthreads); 99*7fd79137SRobert Mustacchi if (wqp->wq_thrs == NULL) { 100*7fd79137SRobert Mustacchi workq_free(wqp->wq_items, sizeof (void *) * 101*7fd79137SRobert Mustacchi WORKQ_DEFAULT_CAP); 102*7fd79137SRobert Mustacchi workq_free(wqp, sizeof (workq_t)); 103*7fd79137SRobert Mustacchi return (workq_error(ENOMEM)); 104*7fd79137SRobert Mustacchi } 105*7fd79137SRobert Mustacchi } 106*7fd79137SRobert Mustacchi 107*7fd79137SRobert Mustacchi if ((ret = mutex_init(&wqp->wq_lock, USYNC_THREAD | LOCK_ERRORCHECK, 108*7fd79137SRobert Mustacchi NULL)) != 0) { 109*7fd79137SRobert Mustacchi if (wqp->wq_ndthreads > 0) { 110*7fd79137SRobert Mustacchi workq_free(wqp->wq_thrs, 111*7fd79137SRobert Mustacchi sizeof (thread_t) * wqp->wq_ndthreads); 112*7fd79137SRobert Mustacchi } 113*7fd79137SRobert Mustacchi workq_free(wqp->wq_items, sizeof (void *) * WORKQ_DEFAULT_CAP); 114*7fd79137SRobert Mustacchi workq_free(wqp, sizeof (workq_t)); 115*7fd79137SRobert Mustacchi return (workq_error(ret)); 116*7fd79137SRobert Mustacchi } 117*7fd79137SRobert Mustacchi 118*7fd79137SRobert Mustacchi if ((ret = cond_init(&wqp->wq_cond, USYNC_THREAD, NULL)) != 0) { 119*7fd79137SRobert Mustacchi VERIFY0(mutex_destroy(&wqp->wq_lock)); 120*7fd79137SRobert Mustacchi if (wqp->wq_ndthreads > 0) { 121*7fd79137SRobert Mustacchi workq_free(wqp->wq_thrs, 122*7fd79137SRobert Mustacchi sizeof (thread_t) * wqp->wq_ndthreads); 123*7fd79137SRobert Mustacchi } 124*7fd79137SRobert Mustacchi workq_free(wqp->wq_items, sizeof (void *) * WORKQ_DEFAULT_CAP); 125*7fd79137SRobert Mustacchi workq_free(wqp, sizeof (workq_t)); 126*7fd79137SRobert Mustacchi return (workq_error(ret)); 127*7fd79137SRobert Mustacchi } 128*7fd79137SRobert Mustacchi 129*7fd79137SRobert Mustacchi wqp->wq_cap = WORKQ_DEFAULT_CAP; 130*7fd79137SRobert Mustacchi *outp = wqp; 131*7fd79137SRobert Mustacchi return (0); 132*7fd79137SRobert Mustacchi } 133*7fd79137SRobert Mustacchi 134*7fd79137SRobert Mustacchi static void 135*7fd79137SRobert Mustacchi workq_reset(workq_t *wqp) 136*7fd79137SRobert Mustacchi { 137*7fd79137SRobert Mustacchi VERIFY(MUTEX_HELD(&wqp->wq_lock)); 138*7fd79137SRobert Mustacchi VERIFY(wqp->wq_working == B_FALSE); 139*7fd79137SRobert Mustacchi if (wqp->wq_cap > 0) 140*7fd79137SRobert Mustacchi bzero(wqp->wq_items, sizeof (void *) * wqp->wq_cap); 141*7fd79137SRobert Mustacchi wqp->wq_nitems = 0; 142*7fd79137SRobert Mustacchi wqp->wq_next = 0; 143*7fd79137SRobert Mustacchi wqp->wq_func = NULL; 144*7fd79137SRobert Mustacchi wqp->wq_arg = NULL; 145*7fd79137SRobert Mustacchi wqp->wq_iserror = B_FALSE; 146*7fd79137SRobert Mustacchi wqp->wq_error = 0; 147*7fd79137SRobert Mustacchi } 148*7fd79137SRobert Mustacchi 149*7fd79137SRobert Mustacchi static int 150*7fd79137SRobert Mustacchi workq_grow(workq_t *wqp) 151*7fd79137SRobert Mustacchi { 152*7fd79137SRobert Mustacchi size_t ncap; 153*7fd79137SRobert Mustacchi void **items; 154*7fd79137SRobert Mustacchi 155*7fd79137SRobert Mustacchi VERIFY(MUTEX_HELD(&wqp->wq_lock)); 156*7fd79137SRobert Mustacchi VERIFY(wqp->wq_working == B_FALSE); 157*7fd79137SRobert Mustacchi 158*7fd79137SRobert Mustacchi if (SIZE_MAX - wqp->wq_cap < WORKQ_DEFAULT_CAP) 159*7fd79137SRobert Mustacchi return (ENOSPC); 160*7fd79137SRobert Mustacchi 161*7fd79137SRobert Mustacchi ncap = wqp->wq_cap + WORKQ_DEFAULT_CAP; 162*7fd79137SRobert Mustacchi items = workq_alloc(ncap * sizeof (void *)); 163*7fd79137SRobert Mustacchi if (items == NULL) 164*7fd79137SRobert Mustacchi return (ENOMEM); 165*7fd79137SRobert Mustacchi 166*7fd79137SRobert Mustacchi bzero(items, ncap * sizeof (void *)); 167*7fd79137SRobert Mustacchi bcopy(wqp->wq_items, items, wqp->wq_cap * sizeof (void *)); 168*7fd79137SRobert Mustacchi workq_free(wqp->wq_items, sizeof (void *) * wqp->wq_cap); 169*7fd79137SRobert Mustacchi wqp->wq_items = items; 170*7fd79137SRobert Mustacchi wqp->wq_cap = ncap; 171*7fd79137SRobert Mustacchi return (0); 172*7fd79137SRobert Mustacchi } 173*7fd79137SRobert Mustacchi 174*7fd79137SRobert Mustacchi int 175*7fd79137SRobert Mustacchi workq_add(workq_t *wqp, void *item) 176*7fd79137SRobert Mustacchi { 177*7fd79137SRobert Mustacchi VERIFY0(mutex_lock(&wqp->wq_lock)); 178*7fd79137SRobert Mustacchi if (wqp->wq_working == B_TRUE) { 179*7fd79137SRobert Mustacchi VERIFY0(mutex_unlock(&wqp->wq_lock)); 180*7fd79137SRobert Mustacchi return (workq_error(ENXIO)); 181*7fd79137SRobert Mustacchi } 182*7fd79137SRobert Mustacchi 183*7fd79137SRobert Mustacchi if (wqp->wq_nitems == wqp->wq_cap) { 184*7fd79137SRobert Mustacchi int ret; 185*7fd79137SRobert Mustacchi 186*7fd79137SRobert Mustacchi if ((ret = workq_grow(wqp)) != 0) { 187*7fd79137SRobert Mustacchi VERIFY0(mutex_unlock(&wqp->wq_lock)); 188*7fd79137SRobert Mustacchi return (workq_error(ret)); 189*7fd79137SRobert Mustacchi } 190*7fd79137SRobert Mustacchi } 191*7fd79137SRobert Mustacchi 192*7fd79137SRobert Mustacchi wqp->wq_items[wqp->wq_nitems] = item; 193*7fd79137SRobert Mustacchi wqp->wq_nitems++; 194*7fd79137SRobert Mustacchi 195*7fd79137SRobert Mustacchi VERIFY0(mutex_unlock(&wqp->wq_lock)); 196*7fd79137SRobert Mustacchi 197*7fd79137SRobert Mustacchi return (0); 198*7fd79137SRobert Mustacchi } 199*7fd79137SRobert Mustacchi 200*7fd79137SRobert Mustacchi static void * 201*7fd79137SRobert Mustacchi workq_pop(workq_t *wqp) 202*7fd79137SRobert Mustacchi { 203*7fd79137SRobert Mustacchi void *out; 204*7fd79137SRobert Mustacchi 205*7fd79137SRobert Mustacchi VERIFY(MUTEX_HELD(&wqp->wq_lock)); 206*7fd79137SRobert Mustacchi VERIFY(wqp->wq_next < wqp->wq_nitems); 207*7fd79137SRobert Mustacchi 208*7fd79137SRobert Mustacchi out = wqp->wq_items[wqp->wq_next]; 209*7fd79137SRobert Mustacchi wqp->wq_items[wqp->wq_next] = NULL; 210*7fd79137SRobert Mustacchi wqp->wq_next++; 211*7fd79137SRobert Mustacchi 212*7fd79137SRobert Mustacchi return (out); 213*7fd79137SRobert Mustacchi } 214*7fd79137SRobert Mustacchi 215*7fd79137SRobert Mustacchi static void * 216*7fd79137SRobert Mustacchi workq_thr_work(void *arg) 217*7fd79137SRobert Mustacchi { 218*7fd79137SRobert Mustacchi workq_t *wqp = arg; 219*7fd79137SRobert Mustacchi 220*7fd79137SRobert Mustacchi VERIFY0(mutex_lock(&wqp->wq_lock)); 221*7fd79137SRobert Mustacchi VERIFY(wqp->wq_working == B_TRUE); 222*7fd79137SRobert Mustacchi 223*7fd79137SRobert Mustacchi for (;;) { 224*7fd79137SRobert Mustacchi int ret; 225*7fd79137SRobert Mustacchi void *item; 226*7fd79137SRobert Mustacchi 227*7fd79137SRobert Mustacchi if (wqp->wq_iserror == B_TRUE || 228*7fd79137SRobert Mustacchi wqp->wq_next == wqp->wq_nitems) { 229*7fd79137SRobert Mustacchi VERIFY0(mutex_unlock(&wqp->wq_lock)); 230*7fd79137SRobert Mustacchi return (NULL); 231*7fd79137SRobert Mustacchi } 232*7fd79137SRobert Mustacchi 233*7fd79137SRobert Mustacchi item = workq_pop(wqp); 234*7fd79137SRobert Mustacchi 235*7fd79137SRobert Mustacchi VERIFY0(mutex_unlock(&wqp->wq_lock)); 236*7fd79137SRobert Mustacchi ret = wqp->wq_func(item, wqp->wq_arg); 237*7fd79137SRobert Mustacchi VERIFY0(mutex_lock(&wqp->wq_lock)); 238*7fd79137SRobert Mustacchi 239*7fd79137SRobert Mustacchi if (ret != 0) { 240*7fd79137SRobert Mustacchi if (wqp->wq_iserror == B_FALSE) { 241*7fd79137SRobert Mustacchi wqp->wq_iserror = B_TRUE; 242*7fd79137SRobert Mustacchi wqp->wq_error = ret; 243*7fd79137SRobert Mustacchi } 244*7fd79137SRobert Mustacchi VERIFY0(mutex_unlock(&wqp->wq_lock)); 245*7fd79137SRobert Mustacchi return (NULL); 246*7fd79137SRobert Mustacchi } 247*7fd79137SRobert Mustacchi } 248*7fd79137SRobert Mustacchi } 249*7fd79137SRobert Mustacchi 250*7fd79137SRobert Mustacchi int 251*7fd79137SRobert Mustacchi workq_work(workq_t *wqp, workq_proc_f *func, void *arg, int *errp) 252*7fd79137SRobert Mustacchi { 253*7fd79137SRobert Mustacchi int i, ret; 254*7fd79137SRobert Mustacchi boolean_t seterr = B_FALSE; 255*7fd79137SRobert Mustacchi 256*7fd79137SRobert Mustacchi if (wqp == NULL || func == NULL) 257*7fd79137SRobert Mustacchi return (workq_error(EINVAL)); 258*7fd79137SRobert Mustacchi 259*7fd79137SRobert Mustacchi VERIFY0(mutex_lock(&wqp->wq_lock)); 260*7fd79137SRobert Mustacchi if (wqp->wq_working == B_TRUE) { 261*7fd79137SRobert Mustacchi VERIFY0(mutex_unlock(&wqp->wq_lock)); 262*7fd79137SRobert Mustacchi return (workq_error(EBUSY)); 263*7fd79137SRobert Mustacchi } 264*7fd79137SRobert Mustacchi 265*7fd79137SRobert Mustacchi if (wqp->wq_nitems == 0) { 266*7fd79137SRobert Mustacchi workq_reset(wqp); 267*7fd79137SRobert Mustacchi VERIFY0(mutex_unlock(&wqp->wq_lock)); 268*7fd79137SRobert Mustacchi return (0); 269*7fd79137SRobert Mustacchi } 270*7fd79137SRobert Mustacchi 271*7fd79137SRobert Mustacchi wqp->wq_func = func; 272*7fd79137SRobert Mustacchi wqp->wq_arg = arg; 273*7fd79137SRobert Mustacchi wqp->wq_next = 0; 274*7fd79137SRobert Mustacchi wqp->wq_working = B_TRUE; 275*7fd79137SRobert Mustacchi 276*7fd79137SRobert Mustacchi ret = 0; 277*7fd79137SRobert Mustacchi for (i = 0; i < wqp->wq_ndthreads; i++) { 278*7fd79137SRobert Mustacchi ret = thr_create(NULL, 0, workq_thr_work, wqp, 0, 279*7fd79137SRobert Mustacchi &wqp->wq_thrs[i]); 280*7fd79137SRobert Mustacchi if (ret != 0) { 281*7fd79137SRobert Mustacchi wqp->wq_iserror = B_TRUE; 282*7fd79137SRobert Mustacchi } 283*7fd79137SRobert Mustacchi } 284*7fd79137SRobert Mustacchi 285*7fd79137SRobert Mustacchi VERIFY0(mutex_unlock(&wqp->wq_lock)); 286*7fd79137SRobert Mustacchi if (ret == 0) 287*7fd79137SRobert Mustacchi (void) workq_thr_work(wqp); 288*7fd79137SRobert Mustacchi 289*7fd79137SRobert Mustacchi for (i = 0; i < wqp->wq_ndthreads; i++) { 290*7fd79137SRobert Mustacchi VERIFY0(thr_join(wqp->wq_thrs[i], NULL, NULL)); 291*7fd79137SRobert Mustacchi } 292*7fd79137SRobert Mustacchi 293*7fd79137SRobert Mustacchi VERIFY0(mutex_lock(&wqp->wq_lock)); 294*7fd79137SRobert Mustacchi wqp->wq_working = B_FALSE; 295*7fd79137SRobert Mustacchi if (ret == 0 && wqp->wq_iserror == B_TRUE) { 296*7fd79137SRobert Mustacchi ret = WORKQ_UERROR; 297*7fd79137SRobert Mustacchi if (errp != NULL) 298*7fd79137SRobert Mustacchi *errp = wqp->wq_error; 299*7fd79137SRobert Mustacchi } else if (ret != 0) { 300*7fd79137SRobert Mustacchi VERIFY(wqp->wq_iserror == B_FALSE); 301*7fd79137SRobert Mustacchi seterr = B_TRUE; 302*7fd79137SRobert Mustacchi } 303*7fd79137SRobert Mustacchi 304*7fd79137SRobert Mustacchi workq_reset(wqp); 305*7fd79137SRobert Mustacchi VERIFY0(mutex_unlock(&wqp->wq_lock)); 306*7fd79137SRobert Mustacchi 307*7fd79137SRobert Mustacchi if (seterr == B_TRUE) 308*7fd79137SRobert Mustacchi return (workq_error(ret)); 309*7fd79137SRobert Mustacchi 310*7fd79137SRobert Mustacchi return (ret); 311*7fd79137SRobert Mustacchi } 312