/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License (the "License").
 * You may not use this file except in compliance with the License.
 *
 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
 * or http://www.opensolaris.org/os/licensing.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
 * If applicable, add the following below this CDDL HEADER, with the
 * fields enclosed by brackets "[]" replaced with your own identifying
 * information: Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 */
/*
 * Copyright 2010 Sun Microsystems, Inc.  All rights reserved.
 * Use is subject to license terms.
 */
/*
 * Copyright 2011 Nexenta Systems, Inc.  All rights reserved.
 */

#include <sys/zfs_context.h>

int taskq_now;
taskq_t *system_taskq;

#define	TASKQ_ACTIVE	0x00010000

struct taskq {
	kmutex_t	tq_lock;
	krwlock_t	tq_threadlock;
	kcondvar_t	tq_dispatch_cv;
	kcondvar_t	tq_wait_cv;
	thread_t	*tq_threadlist;
	int		tq_flags;
	int		tq_active;
	int		tq_nthreads;
	int		tq_nalloc;
	int		tq_minalloc;
	int		tq_maxalloc;
	kcondvar_t	tq_maxalloc_cv;
	int		tq_maxalloc_wait;
	taskq_ent_t	*tq_freelist;
	taskq_ent_t	tq_task;
};
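
/*
 * Pending tasks hang off tq_task, the sentinel of a circular
 * doubly-linked list; the queue is empty when the sentinel points at
 * itself.  tq_freelist caches freed entries, and tq_nalloc counts every
 * entry currently kmem_alloc()ed, whether cached or in use.
 *
 * task_alloc() and task_free() below are called with tq_lock held, and
 * may drop and reacquire it when they have to call into kmem.
 */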

static taskq_ent_t *
task_alloc(taskq_t *tq, int tqflags)
{
	taskq_ent_t *t;
	int rv;

again:	if ((t = tq->tq_freelist) != NULL && tq->tq_nalloc >= tq->tq_minalloc) {
		tq->tq_freelist = t->tqent_next;
	} else {
		if (tq->tq_nalloc >= tq->tq_maxalloc) {
			if (!(tqflags & KM_SLEEP))
				return (NULL);

			/*
			 * We don't want to exceed tq_maxalloc, but we can't
			 * wait for other tasks to complete (and thus free up
			 * task structures) without risking deadlock with
			 * the caller.  So, we just delay for one second
			 * to throttle the allocation rate.  If tasks
			 * complete before the one-second timeout expires,
			 * task_free() will signal us and we will
			 * immediately retry the allocation.
			 */
			tq->tq_maxalloc_wait++;
			rv = cv_timedwait(&tq->tq_maxalloc_cv,
			    &tq->tq_lock, ddi_get_lbolt() + hz);
			tq->tq_maxalloc_wait--;
			if (rv > 0)
				goto again;		/* signaled */
		}
		mutex_exit(&tq->tq_lock);

		t = kmem_alloc(sizeof (taskq_ent_t), tqflags);

		mutex_enter(&tq->tq_lock);
		if (t != NULL)
			tq->tq_nalloc++;
	}
	return (t);
}

static void
task_free(taskq_t *tq, taskq_ent_t *t)
{
	if (tq->tq_nalloc <= tq->tq_minalloc) {
		t->tqent_next = tq->tq_freelist;
		tq->tq_freelist = t;
	} else {
		tq->tq_nalloc--;
		mutex_exit(&tq->tq_lock);
		kmem_free(t, sizeof (taskq_ent_t));
		mutex_enter(&tq->tq_lock);
	}

	if (tq->tq_maxalloc_wait)
		cv_signal(&tq->tq_maxalloc_cv);
}

taskqid_t
taskq_dispatch(taskq_t *tq, task_func_t func, void *arg, uint_t tqflags)
{
	taskq_ent_t *t;

	if (taskq_now) {
		func(arg);
		return (1);
	}

	mutex_enter(&tq->tq_lock);
	ASSERT(tq->tq_flags & TASKQ_ACTIVE);
	if ((t = task_alloc(tq, tqflags)) == NULL) {
		mutex_exit(&tq->tq_lock);
		return (0);
	}
	if (tqflags & TQ_FRONT) {
		t->tqent_next = tq->tq_task.tqent_next;
		t->tqent_prev = &tq->tq_task;
	} else {
		t->tqent_next = &tq->tq_task;
		t->tqent_prev = tq->tq_task.tqent_prev;
	}
	t->tqent_next->tqent_prev = t;
	t->tqent_prev->tqent_next = t;
	t->tqent_func = func;
	t->tqent_arg = arg;
	cv_signal(&tq->tq_dispatch_cv);
	mutex_exit(&tq->tq_lock);
	return (1);
}
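
/*
 * Like taskq_dispatch(), but with a caller-supplied taskq_ent_t: no
 * allocation is needed, so this variant cannot fail.  The entry is
 * flagged TQENT_FLAG_PREALLOC so that the worker thread will not hand
 * it to task_free() once the function has run.
 */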
void
taskq_dispatch_ent(taskq_t *tq, task_func_t func, void *arg, uint_t flags,
    taskq_ent_t *t)
{
	ASSERT(func != NULL);
	ASSERT(!(tq->tq_flags & TASKQ_DYNAMIC));

	/*
	 * Mark it as a prealloc'd task.  This is important
	 * to ensure that we don't free it later.
	 */
	t->tqent_flags |= TQENT_FLAG_PREALLOC;
	/*
	 * Enqueue the task to the underlying queue.
	 */
	mutex_enter(&tq->tq_lock);

	if (flags & TQ_FRONT) {
		t->tqent_next = tq->tq_task.tqent_next;
		t->tqent_prev = &tq->tq_task;
	} else {
		t->tqent_next = &tq->tq_task;
		t->tqent_prev = tq->tq_task.tqent_prev;
	}
	t->tqent_next->tqent_prev = t;
	t->tqent_prev->tqent_next = t;
	t->tqent_func = func;
	t->tqent_arg = arg;
	cv_signal(&tq->tq_dispatch_cv);
	mutex_exit(&tq->tq_lock);
}

void
taskq_wait(taskq_t *tq)
{
	mutex_enter(&tq->tq_lock);
	while (tq->tq_task.tqent_next != &tq->tq_task || tq->tq_active != 0)
		cv_wait(&tq->tq_wait_cv, &tq->tq_lock);
	mutex_exit(&tq->tq_lock);
}

static void *
taskq_thread(void *arg)
{
	taskq_t *tq = arg;
	taskq_ent_t *t;
	boolean_t prealloc;

	mutex_enter(&tq->tq_lock);
	while (tq->tq_flags & TASKQ_ACTIVE) {
		if ((t = tq->tq_task.tqent_next) == &tq->tq_task) {
			if (--tq->tq_active == 0)
				cv_broadcast(&tq->tq_wait_cv);
			cv_wait(&tq->tq_dispatch_cv, &tq->tq_lock);
			tq->tq_active++;
			continue;
		}
		t->tqent_prev->tqent_next = t->tqent_next;
		t->tqent_next->tqent_prev = t->tqent_prev;
		t->tqent_next = NULL;
		t->tqent_prev = NULL;
		prealloc = t->tqent_flags & TQENT_FLAG_PREALLOC;
		mutex_exit(&tq->tq_lock);

		rw_enter(&tq->tq_threadlock, RW_READER);
		t->tqent_func(t->tqent_arg);
		rw_exit(&tq->tq_threadlock);

		mutex_enter(&tq->tq_lock);
		if (!prealloc)
			task_free(tq, t);
	}
	tq->tq_nthreads--;
	cv_broadcast(&tq->tq_wait_cv);
	mutex_exit(&tq->tq_lock);
	return (NULL);
}

/*ARGSUSED*/
taskq_t *
taskq_create(const char *name, int nthreads, pri_t pri,
    int minalloc, int maxalloc, uint_t flags)
{
	taskq_t *tq = kmem_zalloc(sizeof (taskq_t), KM_SLEEP);
	int t;

	if (flags & TASKQ_THREADS_CPU_PCT) {
		int pct;
		ASSERT3S(nthreads, >=, 0);
		ASSERT3S(nthreads, <=, 100);
		pct = MIN(nthreads, 100);
		pct = MAX(pct, 0);

		nthreads = (sysconf(_SC_NPROCESSORS_ONLN) * pct) / 100;
		nthreads = MAX(nthreads, 1);	/* need at least 1 thread */
	} else {
		ASSERT3S(nthreads, >=, 1);
	}

	rw_init(&tq->tq_threadlock, NULL, RW_DEFAULT, NULL);
	mutex_init(&tq->tq_lock, NULL, MUTEX_DEFAULT, NULL);
	cv_init(&tq->tq_dispatch_cv, NULL, CV_DEFAULT, NULL);
	cv_init(&tq->tq_wait_cv, NULL, CV_DEFAULT, NULL);
	cv_init(&tq->tq_maxalloc_cv, NULL, CV_DEFAULT, NULL);
	tq->tq_flags = flags | TASKQ_ACTIVE;
	tq->tq_active = nthreads;
	tq->tq_nthreads = nthreads;
	tq->tq_minalloc = minalloc;
	tq->tq_maxalloc = maxalloc;
	tq->tq_task.tqent_next = &tq->tq_task;
	tq->tq_task.tqent_prev = &tq->tq_task;
	tq->tq_threadlist = kmem_alloc(nthreads * sizeof (thread_t), KM_SLEEP);

	if (flags & TASKQ_PREPOPULATE) {
		mutex_enter(&tq->tq_lock);
		while (minalloc-- > 0)
			task_free(tq, task_alloc(tq, KM_SLEEP));
		mutex_exit(&tq->tq_lock);
	}

	for (t = 0; t < nthreads; t++)
		(void) thr_create(0, 0, taskq_thread,
		    tq, THR_BOUND, &tq->tq_threadlist[t]);

	return (tq);
}
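
/*
 * Tear down the taskq: wait for pending tasks, let every worker see
 * TASKQ_ACTIVE drop and exit, then release all cached task entries.
 * With tq_minalloc forced to zero, each task_free(task_alloc()) round
 * below pops one entry off the freelist and kmem_free()s it.
 */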
void
taskq_destroy(taskq_t *tq)
{
	int t;
	int nthreads = tq->tq_nthreads;

	taskq_wait(tq);

	mutex_enter(&tq->tq_lock);

	tq->tq_flags &= ~TASKQ_ACTIVE;
	cv_broadcast(&tq->tq_dispatch_cv);

	while (tq->tq_nthreads != 0)
		cv_wait(&tq->tq_wait_cv, &tq->tq_lock);

	tq->tq_minalloc = 0;
	while (tq->tq_nalloc != 0) {
		ASSERT(tq->tq_freelist != NULL);
		task_free(tq, task_alloc(tq, KM_SLEEP));
	}

	mutex_exit(&tq->tq_lock);

	for (t = 0; t < nthreads; t++)
		(void) thr_join(tq->tq_threadlist[t], NULL, NULL);

	kmem_free(tq->tq_threadlist, nthreads * sizeof (thread_t));

	rw_destroy(&tq->tq_threadlock);
	mutex_destroy(&tq->tq_lock);
	cv_destroy(&tq->tq_dispatch_cv);
	cv_destroy(&tq->tq_wait_cv);
	cv_destroy(&tq->tq_maxalloc_cv);

	kmem_free(tq, sizeof (taskq_t));
}

int
taskq_member(taskq_t *tq, void *t)
{
	int i;

	if (taskq_now)
		return (1);

	for (i = 0; i < tq->tq_nthreads; i++)
		if (tq->tq_threadlist[i] == (thread_t)(uintptr_t)t)
			return (1);

	return (0);
}

void
system_taskq_init(void)
{
	system_taskq = taskq_create("system_taskq", 64, minclsyspri, 4, 512,
	    TASKQ_DYNAMIC | TASKQ_PREPOPULATE);
}

void
system_taskq_fini(void)
{
	taskq_destroy(system_taskq);
	system_taskq = NULL;	/* defensive */
}
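
/*
 * Example usage (an illustrative sketch; my_func and my_arg are
 * hypothetical placeholders, not part of this file):
 *
 *	taskq_t *tq = taskq_create("example", 4, minclsyspri, 4, 64,
 *	    TASKQ_PREPOPULATE);
 *	(void) taskq_dispatch(tq, my_func, my_arg, KM_SLEEP);
 *	taskq_wait(tq);		(blocks until my_func has run)
 *	taskq_destroy(tq);
 */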