/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License (the "License").
 * You may not use this file except in compliance with the License.
 *
 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
 * or http://www.opensolaris.org/os/licensing.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
 * If applicable, add the following below this CDDL HEADER, with the
 * fields enclosed by brackets "[]" replaced with your own identifying
 * information: Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 */
/*
 * Copyright 2010 Sun Microsystems, Inc.  All rights reserved.
 * Use is subject to license terms.
 */
/*
 * Copyright 2011 Nexenta Systems, Inc.  All rights reserved.
 * Copyright 2012 Garrett D'Amore <garrett@damore.org>.  All rights reserved.
 */

#include <sys/zfs_context.h>

int taskq_now;
taskq_t *system_taskq;

#define	TASKQ_ACTIVE	0x00010000

struct taskq {
	kmutex_t	tq_lock;
	krwlock_t	tq_threadlock;
	kcondvar_t	tq_dispatch_cv;
	kcondvar_t	tq_wait_cv;
	thread_t	*tq_threadlist;
	int		tq_flags;
	int		tq_active;
	int		tq_nthreads;
	int		tq_nalloc;
	int		tq_minalloc;
	int		tq_maxalloc;
	kcondvar_t	tq_maxalloc_cv;
	int		tq_maxalloc_wait;
	taskq_ent_t	*tq_freelist;
	taskq_ent_t	tq_task;
};

static taskq_ent_t *
task_alloc(taskq_t *tq, int tqflags)
{
	taskq_ent_t *t;
	int rv;

again:	if ((t = tq->tq_freelist) != NULL && tq->tq_nalloc >= tq->tq_minalloc) {
		tq->tq_freelist = t->tqent_next;
	} else {
		if (tq->tq_nalloc >= tq->tq_maxalloc) {
			if (!(tqflags & KM_SLEEP))
				return (NULL);

			/*
			 * We don't want to exceed tq_maxalloc, but we can't
			 * wait for other tasks to complete (and thus free up
			 * task structures) without risking deadlock with
			 * the caller.  So, we just delay for one second
			 * to throttle the allocation rate.  If tasks
			 * complete before the one-second timeout expires,
			 * task_free() will signal us and we will
			 * immediately retry the allocation.
			 */
			tq->tq_maxalloc_wait++;
			rv = cv_timedwait(&tq->tq_maxalloc_cv,
			    &tq->tq_lock, ddi_get_lbolt() + hz);
			tq->tq_maxalloc_wait--;
			if (rv > 0)
				goto again;		/* signaled */
		}
		mutex_exit(&tq->tq_lock);

		t = kmem_alloc(sizeof (taskq_ent_t), tqflags);

		mutex_enter(&tq->tq_lock);
		if (t != NULL)
			tq->tq_nalloc++;
	}
	return (t);
}
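/*
 * Free a task entry allocated by task_alloc().  While the total number of
 * allocated entries is at or below tq_minalloc, the entry is cached on
 * tq_freelist; otherwise it is returned to the allocator, with tq_lock
 * dropped around the kmem_free() call.  A task_alloc() caller throttled
 * on tq_maxalloc, if any, is signaled so it can retry immediately.
 */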
static void
task_free(taskq_t *tq, taskq_ent_t *t)
{
	if (tq->tq_nalloc <= tq->tq_minalloc) {
		t->tqent_next = tq->tq_freelist;
		tq->tq_freelist = t;
	} else {
		tq->tq_nalloc--;
		mutex_exit(&tq->tq_lock);
		kmem_free(t, sizeof (taskq_ent_t));
		mutex_enter(&tq->tq_lock);
	}

	if (tq->tq_maxalloc_wait)
		cv_signal(&tq->tq_maxalloc_cv);
}

taskqid_t
taskq_dispatch(taskq_t *tq, task_func_t func, void *arg, uint_t tqflags)
{
	taskq_ent_t *t;

	if (taskq_now) {
		func(arg);
		return (1);
	}

	mutex_enter(&tq->tq_lock);
	ASSERT(tq->tq_flags & TASKQ_ACTIVE);
	if ((t = task_alloc(tq, tqflags)) == NULL) {
		mutex_exit(&tq->tq_lock);
		return (0);
	}
	if (tqflags & TQ_FRONT) {
		t->tqent_next = tq->tq_task.tqent_next;
		t->tqent_prev = &tq->tq_task;
	} else {
		t->tqent_next = &tq->tq_task;
		t->tqent_prev = tq->tq_task.tqent_prev;
	}
	t->tqent_next->tqent_prev = t;
	t->tqent_prev->tqent_next = t;
	t->tqent_func = func;
	t->tqent_arg = arg;
	t->tqent_flags = 0;
	cv_signal(&tq->tq_dispatch_cv);
	mutex_exit(&tq->tq_lock);
	return (1);
}

void
taskq_dispatch_ent(taskq_t *tq, task_func_t func, void *arg, uint_t flags,
    taskq_ent_t *t)
{
	ASSERT(func != NULL);
	ASSERT(!(tq->tq_flags & TASKQ_DYNAMIC));

	/*
	 * Mark it as a prealloc'd task.  This is important
	 * to ensure that we don't free it later.
	 */
	t->tqent_flags |= TQENT_FLAG_PREALLOC;
	/*
	 * Enqueue the task to the underlying queue.
	 */
	mutex_enter(&tq->tq_lock);

	if (flags & TQ_FRONT) {
		t->tqent_next = tq->tq_task.tqent_next;
		t->tqent_prev = &tq->tq_task;
	} else {
		t->tqent_next = &tq->tq_task;
		t->tqent_prev = tq->tq_task.tqent_prev;
	}
	t->tqent_next->tqent_prev = t;
	t->tqent_prev->tqent_next = t;
	t->tqent_func = func;
	t->tqent_arg = arg;
	cv_signal(&tq->tq_dispatch_cv);
	mutex_exit(&tq->tq_lock);
}
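/*
 * Illustrative sketch (not compiled): dispatching with a caller-supplied,
 * preallocated taskq_ent_t, as taskq_dispatch_ent() above expects.  The
 * my_job_t structure and my_job_func() are hypothetical names, not part
 * of this file.  Because the entry is embedded in the caller's own
 * allocation, this dispatch path never sleeps or fails in task_alloc().
 */
#if 0
typedef struct my_job {
	taskq_ent_t	mj_ent;		/* embedded entry; must start zeroed */
	int		mj_input;	/* hypothetical payload */
} my_job_t;

static void
my_job_func(void *arg)
{
	my_job_t *mj = arg;

	/* ... consume mj->mj_input, then kmem_free(mj, sizeof (*mj)) ... */
}

static void
example_dispatch_ent(taskq_t *tq)
{
	/* kmem_zalloc() leaves mj_ent.tqent_flags zeroed, as required */
	my_job_t *mj = kmem_zalloc(sizeof (my_job_t), KM_SLEEP);

	mj->mj_input = 42;
	taskq_dispatch_ent(tq, my_job_func, mj, 0, &mj->mj_ent);
}
#endif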
void
taskq_wait(taskq_t *tq)
{
	mutex_enter(&tq->tq_lock);
	while (tq->tq_task.tqent_next != &tq->tq_task || tq->tq_active != 0)
		cv_wait(&tq->tq_wait_cv, &tq->tq_lock);
	mutex_exit(&tq->tq_lock);
}

static void *
taskq_thread(void *arg)
{
	taskq_t *tq = arg;
	taskq_ent_t *t;
	boolean_t prealloc;

	mutex_enter(&tq->tq_lock);
	while (tq->tq_flags & TASKQ_ACTIVE) {
		if ((t = tq->tq_task.tqent_next) == &tq->tq_task) {
			if (--tq->tq_active == 0)
				cv_broadcast(&tq->tq_wait_cv);
			cv_wait(&tq->tq_dispatch_cv, &tq->tq_lock);
			tq->tq_active++;
			continue;
		}
		t->tqent_prev->tqent_next = t->tqent_next;
		t->tqent_next->tqent_prev = t->tqent_prev;
		t->tqent_next = NULL;
		t->tqent_prev = NULL;
		prealloc = t->tqent_flags & TQENT_FLAG_PREALLOC;
		mutex_exit(&tq->tq_lock);

		rw_enter(&tq->tq_threadlock, RW_READER);
		t->tqent_func(t->tqent_arg);
		rw_exit(&tq->tq_threadlock);

		mutex_enter(&tq->tq_lock);
		if (!prealloc)
			task_free(tq, t);
	}
	tq->tq_nthreads--;
	cv_broadcast(&tq->tq_wait_cv);
	mutex_exit(&tq->tq_lock);
	return (NULL);
}

/*ARGSUSED*/
taskq_t *
taskq_create(const char *name, int nthreads, pri_t pri,
    int minalloc, int maxalloc, uint_t flags)
{
	taskq_t *tq = kmem_zalloc(sizeof (taskq_t), KM_SLEEP);
	int t;

	if (flags & TASKQ_THREADS_CPU_PCT) {
		int pct;
		ASSERT3S(nthreads, >=, 0);
		ASSERT3S(nthreads, <=, 100);
		pct = MIN(nthreads, 100);
		pct = MAX(pct, 0);

		nthreads = (sysconf(_SC_NPROCESSORS_ONLN) * pct) / 100;
		nthreads = MAX(nthreads, 1);	/* need at least 1 thread */
	} else {
		ASSERT3S(nthreads, >=, 1);
	}

	rw_init(&tq->tq_threadlock, NULL, RW_DEFAULT, NULL);
	mutex_init(&tq->tq_lock, NULL, MUTEX_DEFAULT, NULL);
	cv_init(&tq->tq_dispatch_cv, NULL, CV_DEFAULT, NULL);
	cv_init(&tq->tq_wait_cv, NULL, CV_DEFAULT, NULL);
	cv_init(&tq->tq_maxalloc_cv, NULL, CV_DEFAULT, NULL);
	tq->tq_flags = flags | TASKQ_ACTIVE;
	tq->tq_active = nthreads;
	tq->tq_nthreads = nthreads;
	tq->tq_minalloc = minalloc;
	tq->tq_maxalloc = maxalloc;
	tq->tq_task.tqent_next = &tq->tq_task;
	tq->tq_task.tqent_prev = &tq->tq_task;
	tq->tq_threadlist = kmem_alloc(nthreads * sizeof (thread_t), KM_SLEEP);

	if (flags & TASKQ_PREPOPULATE) {
		mutex_enter(&tq->tq_lock);
		while (minalloc-- > 0)
			task_free(tq, task_alloc(tq, KM_SLEEP));
		mutex_exit(&tq->tq_lock);
	}

	for (t = 0; t < nthreads; t++)
		(void) thr_create(0, 0, taskq_thread,
		    tq, THR_BOUND, &tq->tq_threadlist[t]);

	return (tq);
}
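/*
 * Illustrative sketch (not compiled): the typical create/dispatch/wait/
 * destroy lifecycle.  my_func and my_arg are hypothetical, and TQ_SLEEP
 * is assumed to map to KM_SLEEP as in the usual libzpool zfs_context.h,
 * so this dispatch throttles in task_alloc() rather than returning 0.
 */
#if 0
static void
example_lifecycle(void)
{
	taskq_t *tq;

	tq = taskq_create("example_tq", 4, minclsyspri, 2, 8,
	    TASKQ_PREPOPULATE);
	VERIFY(taskq_dispatch(tq, my_func, my_arg, TQ_SLEEP) != 0);
	taskq_wait(tq);			/* drain every dispatched task */
	taskq_destroy(tq);
}
#endif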
void
taskq_destroy(taskq_t *tq)
{
	int t;
	int nthreads = tq->tq_nthreads;

	taskq_wait(tq);

	mutex_enter(&tq->tq_lock);

	tq->tq_flags &= ~TASKQ_ACTIVE;
	cv_broadcast(&tq->tq_dispatch_cv);

	while (tq->tq_nthreads != 0)
		cv_wait(&tq->tq_wait_cv, &tq->tq_lock);

	tq->tq_minalloc = 0;
	while (tq->tq_nalloc != 0) {
		ASSERT(tq->tq_freelist != NULL);
		task_free(tq, task_alloc(tq, KM_SLEEP));
	}

	mutex_exit(&tq->tq_lock);

	for (t = 0; t < nthreads; t++)
		(void) thr_join(tq->tq_threadlist[t], NULL, NULL);

	kmem_free(tq->tq_threadlist, nthreads * sizeof (thread_t));

	rw_destroy(&tq->tq_threadlock);
	mutex_destroy(&tq->tq_lock);
	cv_destroy(&tq->tq_dispatch_cv);
	cv_destroy(&tq->tq_wait_cv);
	cv_destroy(&tq->tq_maxalloc_cv);

	kmem_free(tq, sizeof (taskq_t));
}

int
taskq_member(taskq_t *tq, void *t)
{
	int i;

	if (taskq_now)
		return (1);

	for (i = 0; i < tq->tq_nthreads; i++)
		if (tq->tq_threadlist[i] == (thread_t)(uintptr_t)t)
			return (1);

	return (0);
}

void
system_taskq_init(void)
{
	system_taskq = taskq_create("system_taskq", 64, minclsyspri, 4, 512,
	    TASKQ_DYNAMIC | TASKQ_PREPOPULATE);
}

void
system_taskq_fini(void)
{
	taskq_destroy(system_taskq);
	system_taskq = NULL; /* defensive */
}
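/*
 * Illustrative sketch (not compiled): consumers typically initialize the
 * shared system_taskq once at startup, dispatch against it, and tear it
 * down at exit.  my_func and my_arg are hypothetical names.
 */
#if 0
static void
example_system_taskq(void)
{
	system_taskq_init();
	(void) taskq_dispatch(system_taskq, my_func, my_arg, TQ_SLEEP);
	taskq_wait(system_taskq);
	system_taskq_fini();
}
#endif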