1*fa9e4066Sahrens /* 2*fa9e4066Sahrens * CDDL HEADER START 3*fa9e4066Sahrens * 4*fa9e4066Sahrens * The contents of this file are subject to the terms of the 5*fa9e4066Sahrens * Common Development and Distribution License, Version 1.0 only 6*fa9e4066Sahrens * (the "License"). You may not use this file except in compliance 7*fa9e4066Sahrens * with the License. 8*fa9e4066Sahrens * 9*fa9e4066Sahrens * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE 10*fa9e4066Sahrens * or http://www.opensolaris.org/os/licensing. 11*fa9e4066Sahrens * See the License for the specific language governing permissions 12*fa9e4066Sahrens * and limitations under the License. 13*fa9e4066Sahrens * 14*fa9e4066Sahrens * When distributing Covered Code, include this CDDL HEADER in each 15*fa9e4066Sahrens * file and include the License file at usr/src/OPENSOLARIS.LICENSE. 16*fa9e4066Sahrens * If applicable, add the following below this CDDL HEADER, with the 17*fa9e4066Sahrens * fields enclosed by brackets "[]" replaced with your own identifying 18*fa9e4066Sahrens * information: Portions Copyright [yyyy] [name of copyright owner] 19*fa9e4066Sahrens * 20*fa9e4066Sahrens * CDDL HEADER END 21*fa9e4066Sahrens */ 22*fa9e4066Sahrens /* 23*fa9e4066Sahrens * Copyright 2005 Sun Microsystems, Inc. All rights reserved. 24*fa9e4066Sahrens * Use is subject to license terms. 25*fa9e4066Sahrens */ 26*fa9e4066Sahrens 27*fa9e4066Sahrens #pragma ident "%Z%%M% %I% %E% SMI" 28*fa9e4066Sahrens 29*fa9e4066Sahrens #include <sys/zfs_context.h> 30*fa9e4066Sahrens 31*fa9e4066Sahrens int taskq_now; 32*fa9e4066Sahrens 33*fa9e4066Sahrens typedef struct task { 34*fa9e4066Sahrens struct task *task_next; 35*fa9e4066Sahrens struct task *task_prev; 36*fa9e4066Sahrens task_func_t *task_func; 37*fa9e4066Sahrens void *task_arg; 38*fa9e4066Sahrens } task_t; 39*fa9e4066Sahrens 40*fa9e4066Sahrens #define TASKQ_ACTIVE 0x00010000 41*fa9e4066Sahrens 42*fa9e4066Sahrens struct taskq { 43*fa9e4066Sahrens kmutex_t tq_lock; 44*fa9e4066Sahrens krwlock_t tq_threadlock; 45*fa9e4066Sahrens kcondvar_t tq_dispatch_cv; 46*fa9e4066Sahrens kcondvar_t tq_wait_cv; 47*fa9e4066Sahrens thread_t *tq_threadlist; 48*fa9e4066Sahrens int tq_flags; 49*fa9e4066Sahrens int tq_active; 50*fa9e4066Sahrens int tq_nthreads; 51*fa9e4066Sahrens int tq_nalloc; 52*fa9e4066Sahrens int tq_minalloc; 53*fa9e4066Sahrens int tq_maxalloc; 54*fa9e4066Sahrens task_t *tq_freelist; 55*fa9e4066Sahrens task_t tq_task; 56*fa9e4066Sahrens }; 57*fa9e4066Sahrens 58*fa9e4066Sahrens static task_t * 59*fa9e4066Sahrens task_alloc(taskq_t *tq, int tqflags) 60*fa9e4066Sahrens { 61*fa9e4066Sahrens task_t *t; 62*fa9e4066Sahrens 63*fa9e4066Sahrens if ((t = tq->tq_freelist) != NULL && tq->tq_nalloc >= tq->tq_minalloc) { 64*fa9e4066Sahrens tq->tq_freelist = t->task_next; 65*fa9e4066Sahrens } else { 66*fa9e4066Sahrens mutex_exit(&tq->tq_lock); 67*fa9e4066Sahrens if (tq->tq_nalloc >= tq->tq_maxalloc) { 68*fa9e4066Sahrens if (!(tqflags & KM_SLEEP)) { 69*fa9e4066Sahrens mutex_enter(&tq->tq_lock); 70*fa9e4066Sahrens return (NULL); 71*fa9e4066Sahrens } 72*fa9e4066Sahrens /* 73*fa9e4066Sahrens * We don't want to exceed tq_maxalloc, but we can't 74*fa9e4066Sahrens * wait for other tasks to complete (and thus free up 75*fa9e4066Sahrens * task structures) without risking deadlock with 76*fa9e4066Sahrens * the caller. So, we just delay for one second 77*fa9e4066Sahrens * to throttle the allocation rate. 78*fa9e4066Sahrens */ 79*fa9e4066Sahrens delay(hz); 80*fa9e4066Sahrens } 81*fa9e4066Sahrens t = kmem_alloc(sizeof (task_t), tqflags); 82*fa9e4066Sahrens mutex_enter(&tq->tq_lock); 83*fa9e4066Sahrens if (t != NULL) 84*fa9e4066Sahrens tq->tq_nalloc++; 85*fa9e4066Sahrens } 86*fa9e4066Sahrens return (t); 87*fa9e4066Sahrens } 88*fa9e4066Sahrens 89*fa9e4066Sahrens static void 90*fa9e4066Sahrens task_free(taskq_t *tq, task_t *t) 91*fa9e4066Sahrens { 92*fa9e4066Sahrens if (tq->tq_nalloc <= tq->tq_minalloc) { 93*fa9e4066Sahrens t->task_next = tq->tq_freelist; 94*fa9e4066Sahrens tq->tq_freelist = t; 95*fa9e4066Sahrens } else { 96*fa9e4066Sahrens tq->tq_nalloc--; 97*fa9e4066Sahrens mutex_exit(&tq->tq_lock); 98*fa9e4066Sahrens kmem_free(t, sizeof (task_t)); 99*fa9e4066Sahrens mutex_enter(&tq->tq_lock); 100*fa9e4066Sahrens } 101*fa9e4066Sahrens } 102*fa9e4066Sahrens 103*fa9e4066Sahrens taskqid_t 104*fa9e4066Sahrens taskq_dispatch(taskq_t *tq, task_func_t func, void *arg, uint_t tqflags) 105*fa9e4066Sahrens { 106*fa9e4066Sahrens task_t *t; 107*fa9e4066Sahrens 108*fa9e4066Sahrens if (taskq_now) { 109*fa9e4066Sahrens func(arg); 110*fa9e4066Sahrens return (1); 111*fa9e4066Sahrens } 112*fa9e4066Sahrens 113*fa9e4066Sahrens mutex_enter(&tq->tq_lock); 114*fa9e4066Sahrens ASSERT(tq->tq_flags & TASKQ_ACTIVE); 115*fa9e4066Sahrens if ((t = task_alloc(tq, tqflags)) == NULL) { 116*fa9e4066Sahrens mutex_exit(&tq->tq_lock); 117*fa9e4066Sahrens return (0); 118*fa9e4066Sahrens } 119*fa9e4066Sahrens t->task_next = &tq->tq_task; 120*fa9e4066Sahrens t->task_prev = tq->tq_task.task_prev; 121*fa9e4066Sahrens t->task_next->task_prev = t; 122*fa9e4066Sahrens t->task_prev->task_next = t; 123*fa9e4066Sahrens t->task_func = func; 124*fa9e4066Sahrens t->task_arg = arg; 125*fa9e4066Sahrens cv_signal(&tq->tq_dispatch_cv); 126*fa9e4066Sahrens mutex_exit(&tq->tq_lock); 127*fa9e4066Sahrens return (1); 128*fa9e4066Sahrens } 129*fa9e4066Sahrens 130*fa9e4066Sahrens void 131*fa9e4066Sahrens taskq_wait(taskq_t *tq) 132*fa9e4066Sahrens { 133*fa9e4066Sahrens mutex_enter(&tq->tq_lock); 134*fa9e4066Sahrens while (tq->tq_task.task_next != &tq->tq_task || tq->tq_active != 0) 135*fa9e4066Sahrens cv_wait(&tq->tq_wait_cv, &tq->tq_lock); 136*fa9e4066Sahrens mutex_exit(&tq->tq_lock); 137*fa9e4066Sahrens } 138*fa9e4066Sahrens 139*fa9e4066Sahrens static void * 140*fa9e4066Sahrens taskq_thread(void *arg) 141*fa9e4066Sahrens { 142*fa9e4066Sahrens taskq_t *tq = arg; 143*fa9e4066Sahrens task_t *t; 144*fa9e4066Sahrens 145*fa9e4066Sahrens mutex_enter(&tq->tq_lock); 146*fa9e4066Sahrens while (tq->tq_flags & TASKQ_ACTIVE) { 147*fa9e4066Sahrens if ((t = tq->tq_task.task_next) == &tq->tq_task) { 148*fa9e4066Sahrens if (--tq->tq_active == 0) 149*fa9e4066Sahrens cv_broadcast(&tq->tq_wait_cv); 150*fa9e4066Sahrens cv_wait(&tq->tq_dispatch_cv, &tq->tq_lock); 151*fa9e4066Sahrens tq->tq_active++; 152*fa9e4066Sahrens continue; 153*fa9e4066Sahrens } 154*fa9e4066Sahrens t->task_prev->task_next = t->task_next; 155*fa9e4066Sahrens t->task_next->task_prev = t->task_prev; 156*fa9e4066Sahrens mutex_exit(&tq->tq_lock); 157*fa9e4066Sahrens 158*fa9e4066Sahrens rw_enter(&tq->tq_threadlock, RW_READER); 159*fa9e4066Sahrens t->task_func(t->task_arg); 160*fa9e4066Sahrens rw_exit(&tq->tq_threadlock); 161*fa9e4066Sahrens 162*fa9e4066Sahrens mutex_enter(&tq->tq_lock); 163*fa9e4066Sahrens task_free(tq, t); 164*fa9e4066Sahrens } 165*fa9e4066Sahrens tq->tq_nthreads--; 166*fa9e4066Sahrens cv_broadcast(&tq->tq_wait_cv); 167*fa9e4066Sahrens mutex_exit(&tq->tq_lock); 168*fa9e4066Sahrens return (NULL); 169*fa9e4066Sahrens } 170*fa9e4066Sahrens 171*fa9e4066Sahrens /*ARGSUSED*/ 172*fa9e4066Sahrens taskq_t * 173*fa9e4066Sahrens taskq_create(const char *name, int nthreads, pri_t pri, 174*fa9e4066Sahrens int minalloc, int maxalloc, uint_t flags) 175*fa9e4066Sahrens { 176*fa9e4066Sahrens taskq_t *tq = kmem_zalloc(sizeof (taskq_t), KM_SLEEP); 177*fa9e4066Sahrens int t; 178*fa9e4066Sahrens 179*fa9e4066Sahrens rw_init(&tq->tq_threadlock, NULL, RW_DEFAULT, NULL); 180*fa9e4066Sahrens tq->tq_flags = flags | TASKQ_ACTIVE; 181*fa9e4066Sahrens tq->tq_active = nthreads; 182*fa9e4066Sahrens tq->tq_nthreads = nthreads; 183*fa9e4066Sahrens tq->tq_minalloc = minalloc; 184*fa9e4066Sahrens tq->tq_maxalloc = maxalloc; 185*fa9e4066Sahrens tq->tq_task.task_next = &tq->tq_task; 186*fa9e4066Sahrens tq->tq_task.task_prev = &tq->tq_task; 187*fa9e4066Sahrens tq->tq_threadlist = kmem_alloc(nthreads * sizeof (thread_t), KM_SLEEP); 188*fa9e4066Sahrens 189*fa9e4066Sahrens if (flags & TASKQ_PREPOPULATE) { 190*fa9e4066Sahrens mutex_enter(&tq->tq_lock); 191*fa9e4066Sahrens while (minalloc-- > 0) 192*fa9e4066Sahrens task_free(tq, task_alloc(tq, KM_SLEEP)); 193*fa9e4066Sahrens mutex_exit(&tq->tq_lock); 194*fa9e4066Sahrens } 195*fa9e4066Sahrens 196*fa9e4066Sahrens for (t = 0; t < nthreads; t++) 197*fa9e4066Sahrens (void) thr_create(0, 0, taskq_thread, 198*fa9e4066Sahrens tq, THR_BOUND, &tq->tq_threadlist[t]); 199*fa9e4066Sahrens 200*fa9e4066Sahrens return (tq); 201*fa9e4066Sahrens } 202*fa9e4066Sahrens 203*fa9e4066Sahrens void 204*fa9e4066Sahrens taskq_destroy(taskq_t *tq) 205*fa9e4066Sahrens { 206*fa9e4066Sahrens int t; 207*fa9e4066Sahrens int nthreads = tq->tq_nthreads; 208*fa9e4066Sahrens 209*fa9e4066Sahrens taskq_wait(tq); 210*fa9e4066Sahrens 211*fa9e4066Sahrens mutex_enter(&tq->tq_lock); 212*fa9e4066Sahrens 213*fa9e4066Sahrens tq->tq_flags &= ~TASKQ_ACTIVE; 214*fa9e4066Sahrens cv_broadcast(&tq->tq_dispatch_cv); 215*fa9e4066Sahrens 216*fa9e4066Sahrens while (tq->tq_nthreads != 0) 217*fa9e4066Sahrens cv_wait(&tq->tq_wait_cv, &tq->tq_lock); 218*fa9e4066Sahrens 219*fa9e4066Sahrens tq->tq_minalloc = 0; 220*fa9e4066Sahrens while (tq->tq_nalloc != 0) { 221*fa9e4066Sahrens ASSERT(tq->tq_freelist != NULL); 222*fa9e4066Sahrens task_free(tq, task_alloc(tq, KM_SLEEP)); 223*fa9e4066Sahrens } 224*fa9e4066Sahrens 225*fa9e4066Sahrens mutex_exit(&tq->tq_lock); 226*fa9e4066Sahrens 227*fa9e4066Sahrens for (t = 0; t < nthreads; t++) 228*fa9e4066Sahrens (void) thr_join(tq->tq_threadlist[t], NULL, NULL); 229*fa9e4066Sahrens 230*fa9e4066Sahrens kmem_free(tq->tq_threadlist, nthreads * sizeof (thread_t)); 231*fa9e4066Sahrens 232*fa9e4066Sahrens rw_destroy(&tq->tq_threadlock); 233*fa9e4066Sahrens 234*fa9e4066Sahrens kmem_free(tq, sizeof (taskq_t)); 235*fa9e4066Sahrens } 236*fa9e4066Sahrens 237*fa9e4066Sahrens int 238*fa9e4066Sahrens taskq_member(taskq_t *tq, void *t) 239*fa9e4066Sahrens { 240*fa9e4066Sahrens int i; 241*fa9e4066Sahrens 242*fa9e4066Sahrens if (taskq_now) 243*fa9e4066Sahrens return (1); 244*fa9e4066Sahrens 245*fa9e4066Sahrens for (i = 0; i < tq->tq_nthreads; i++) 246*fa9e4066Sahrens if (tq->tq_threadlist[i] == (thread_t)(uintptr_t)t) 247*fa9e4066Sahrens return (1); 248*fa9e4066Sahrens 249*fa9e4066Sahrens return (0); 250*fa9e4066Sahrens } 251