1 /* 2 * CDDL HEADER START 3 * 4 * The contents of this file are subject to the terms of the 5 * Common Development and Distribution License, Version 1.0 only 6 * (the "License"). You may not use this file except in compliance 7 * with the License. 8 * 9 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE 10 * or http://www.opensolaris.org/os/licensing. 11 * See the License for the specific language governing permissions 12 * and limitations under the License. 13 * 14 * When distributing Covered Code, include this CDDL HEADER in each 15 * file and include the License file at usr/src/OPENSOLARIS.LICENSE. 16 * If applicable, add the following below this CDDL HEADER, with the 17 * fields enclosed by brackets "[]" replaced with your own identifying 18 * information: Portions Copyright [yyyy] [name of copyright owner] 19 * 20 * CDDL HEADER END 21 */ 22 /* 23 * Copyright 2005 Sun Microsystems, Inc. All rights reserved. 24 * Use is subject to license terms. 25 */ 26 27 #pragma ident "%Z%%M% %I% %E% SMI" 28 29 #include <sys/zfs_context.h> 30 31 int taskq_now; 32 33 typedef struct task { 34 struct task *task_next; 35 struct task *task_prev; 36 task_func_t *task_func; 37 void *task_arg; 38 } task_t; 39 40 #define TASKQ_ACTIVE 0x00010000 41 42 struct taskq { 43 kmutex_t tq_lock; 44 krwlock_t tq_threadlock; 45 kcondvar_t tq_dispatch_cv; 46 kcondvar_t tq_wait_cv; 47 thread_t *tq_threadlist; 48 int tq_flags; 49 int tq_active; 50 int tq_nthreads; 51 int tq_nalloc; 52 int tq_minalloc; 53 int tq_maxalloc; 54 task_t *tq_freelist; 55 task_t tq_task; 56 }; 57 58 static task_t * 59 task_alloc(taskq_t *tq, int tqflags) 60 { 61 task_t *t; 62 63 if ((t = tq->tq_freelist) != NULL && tq->tq_nalloc >= tq->tq_minalloc) { 64 tq->tq_freelist = t->task_next; 65 } else { 66 mutex_exit(&tq->tq_lock); 67 if (tq->tq_nalloc >= tq->tq_maxalloc) { 68 if (!(tqflags & KM_SLEEP)) { 69 mutex_enter(&tq->tq_lock); 70 return (NULL); 71 } 72 /* 73 * We don't want to exceed tq_maxalloc, but we can't 74 * wait for other tasks to complete (and thus free up 75 * task structures) without risking deadlock with 76 * the caller. So, we just delay for one second 77 * to throttle the allocation rate. 78 */ 79 delay(hz); 80 } 81 t = kmem_alloc(sizeof (task_t), tqflags); 82 mutex_enter(&tq->tq_lock); 83 if (t != NULL) 84 tq->tq_nalloc++; 85 } 86 return (t); 87 } 88 89 static void 90 task_free(taskq_t *tq, task_t *t) 91 { 92 if (tq->tq_nalloc <= tq->tq_minalloc) { 93 t->task_next = tq->tq_freelist; 94 tq->tq_freelist = t; 95 } else { 96 tq->tq_nalloc--; 97 mutex_exit(&tq->tq_lock); 98 kmem_free(t, sizeof (task_t)); 99 mutex_enter(&tq->tq_lock); 100 } 101 } 102 103 taskqid_t 104 taskq_dispatch(taskq_t *tq, task_func_t func, void *arg, uint_t tqflags) 105 { 106 task_t *t; 107 108 if (taskq_now) { 109 func(arg); 110 return (1); 111 } 112 113 mutex_enter(&tq->tq_lock); 114 ASSERT(tq->tq_flags & TASKQ_ACTIVE); 115 if ((t = task_alloc(tq, tqflags)) == NULL) { 116 mutex_exit(&tq->tq_lock); 117 return (0); 118 } 119 t->task_next = &tq->tq_task; 120 t->task_prev = tq->tq_task.task_prev; 121 t->task_next->task_prev = t; 122 t->task_prev->task_next = t; 123 t->task_func = func; 124 t->task_arg = arg; 125 cv_signal(&tq->tq_dispatch_cv); 126 mutex_exit(&tq->tq_lock); 127 return (1); 128 } 129 130 void 131 taskq_wait(taskq_t *tq) 132 { 133 mutex_enter(&tq->tq_lock); 134 while (tq->tq_task.task_next != &tq->tq_task || tq->tq_active != 0) 135 cv_wait(&tq->tq_wait_cv, &tq->tq_lock); 136 mutex_exit(&tq->tq_lock); 137 } 138 139 static void * 140 taskq_thread(void *arg) 141 { 142 taskq_t *tq = arg; 143 task_t *t; 144 145 mutex_enter(&tq->tq_lock); 146 while (tq->tq_flags & TASKQ_ACTIVE) { 147 if ((t = tq->tq_task.task_next) == &tq->tq_task) { 148 if (--tq->tq_active == 0) 149 cv_broadcast(&tq->tq_wait_cv); 150 cv_wait(&tq->tq_dispatch_cv, &tq->tq_lock); 151 tq->tq_active++; 152 continue; 153 } 154 t->task_prev->task_next = t->task_next; 155 t->task_next->task_prev = t->task_prev; 156 mutex_exit(&tq->tq_lock); 157 158 rw_enter(&tq->tq_threadlock, RW_READER); 159 t->task_func(t->task_arg); 160 rw_exit(&tq->tq_threadlock); 161 162 mutex_enter(&tq->tq_lock); 163 task_free(tq, t); 164 } 165 tq->tq_nthreads--; 166 cv_broadcast(&tq->tq_wait_cv); 167 mutex_exit(&tq->tq_lock); 168 return (NULL); 169 } 170 171 /*ARGSUSED*/ 172 taskq_t * 173 taskq_create(const char *name, int nthreads, pri_t pri, 174 int minalloc, int maxalloc, uint_t flags) 175 { 176 taskq_t *tq = kmem_zalloc(sizeof (taskq_t), KM_SLEEP); 177 int t; 178 179 rw_init(&tq->tq_threadlock, NULL, RW_DEFAULT, NULL); 180 tq->tq_flags = flags | TASKQ_ACTIVE; 181 tq->tq_active = nthreads; 182 tq->tq_nthreads = nthreads; 183 tq->tq_minalloc = minalloc; 184 tq->tq_maxalloc = maxalloc; 185 tq->tq_task.task_next = &tq->tq_task; 186 tq->tq_task.task_prev = &tq->tq_task; 187 tq->tq_threadlist = kmem_alloc(nthreads * sizeof (thread_t), KM_SLEEP); 188 189 if (flags & TASKQ_PREPOPULATE) { 190 mutex_enter(&tq->tq_lock); 191 while (minalloc-- > 0) 192 task_free(tq, task_alloc(tq, KM_SLEEP)); 193 mutex_exit(&tq->tq_lock); 194 } 195 196 for (t = 0; t < nthreads; t++) 197 (void) thr_create(0, 0, taskq_thread, 198 tq, THR_BOUND, &tq->tq_threadlist[t]); 199 200 return (tq); 201 } 202 203 void 204 taskq_destroy(taskq_t *tq) 205 { 206 int t; 207 int nthreads = tq->tq_nthreads; 208 209 taskq_wait(tq); 210 211 mutex_enter(&tq->tq_lock); 212 213 tq->tq_flags &= ~TASKQ_ACTIVE; 214 cv_broadcast(&tq->tq_dispatch_cv); 215 216 while (tq->tq_nthreads != 0) 217 cv_wait(&tq->tq_wait_cv, &tq->tq_lock); 218 219 tq->tq_minalloc = 0; 220 while (tq->tq_nalloc != 0) { 221 ASSERT(tq->tq_freelist != NULL); 222 task_free(tq, task_alloc(tq, KM_SLEEP)); 223 } 224 225 mutex_exit(&tq->tq_lock); 226 227 for (t = 0; t < nthreads; t++) 228 (void) thr_join(tq->tq_threadlist[t], NULL, NULL); 229 230 kmem_free(tq->tq_threadlist, nthreads * sizeof (thread_t)); 231 232 rw_destroy(&tq->tq_threadlock); 233 234 kmem_free(tq, sizeof (taskq_t)); 235 } 236 237 int 238 taskq_member(taskq_t *tq, void *t) 239 { 240 int i; 241 242 if (taskq_now) 243 return (1); 244 245 for (i = 0; i < tq->tq_nthreads; i++) 246 if (tq->tq_threadlist[i] == (thread_t)(uintptr_t)t) 247 return (1); 248 249 return (0); 250 } 251