/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License (the "License").
 * You may not use this file except in compliance with the License.
 *
 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
 * or https://opensource.org/licenses/CDDL-1.0.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
 * If applicable, add the following below this CDDL HEADER, with the
 * fields enclosed by brackets "[]" replaced with your own identifying
 * information: Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 */
/*
 * Copyright 2010 Sun Microsystems, Inc.  All rights reserved.
 * Use is subject to license terms.
 */
/*
 * Copyright 2011 Nexenta Systems, Inc.  All rights reserved.
 * Copyright 2012 Garrett D'Amore <garrett@damore.org>.  All rights reserved.
 * Copyright (c) 2014 by Delphix. All rights reserved.
 */

#include <sys/zfs_context.h>

int taskq_now;
taskq_t *system_taskq;
taskq_t *system_delay_taskq;

static pthread_key_t taskq_tsd;

#define	TASKQ_ACTIVE	0x00010000

/*
 * Allocate a task entry, preferring the queue's freelist.  Called with
 * tq_lock held; may drop and reacquire it around kmem_alloc().
 */
static taskq_ent_t *
task_alloc(taskq_t *tq, int tqflags)
{
	taskq_ent_t *t;
	int rv;

again:	if ((t = tq->tq_freelist) != NULL && tq->tq_nalloc >= tq->tq_minalloc) {
		ASSERT(!(t->tqent_flags & TQENT_FLAG_PREALLOC));
		tq->tq_freelist = t->tqent_next;
	} else {
		if (tq->tq_nalloc >= tq->tq_maxalloc) {
			if (!(tqflags & KM_SLEEP))
				return (NULL);

			/*
			 * We don't want to exceed tq_maxalloc, but we can't
			 * wait for other tasks to complete (and thus free up
			 * task structures) without risking deadlock with
			 * the caller.  So, we just delay for one second
			 * to throttle the allocation rate.  If tasks
			 * complete before the one second timeout expires,
			 * task_free() will signal us and we will
			 * immediately retry the allocation.
			 */
			tq->tq_maxalloc_wait++;
			rv = cv_timedwait(&tq->tq_maxalloc_cv,
			    &tq->tq_lock, ddi_get_lbolt() + hz);
			tq->tq_maxalloc_wait--;
			if (rv > 0)
				goto again;		/* signaled */
		}
		mutex_exit(&tq->tq_lock);

		t = kmem_alloc(sizeof (taskq_ent_t), tqflags);

		mutex_enter(&tq->tq_lock);
		if (t != NULL) {
			/* Make sure we start without any flags */
			t->tqent_flags = 0;
			tq->tq_nalloc++;
		}
	}
	return (t);
}

/*
 * Return a task entry to the freelist while no more than tq_minalloc
 * entries are allocated; otherwise free it.  Called with tq_lock held.
 */
static void
task_free(taskq_t *tq, taskq_ent_t *t)
{
	if (tq->tq_nalloc <= tq->tq_minalloc) {
		t->tqent_next = tq->tq_freelist;
		tq->tq_freelist = t;
	} else {
		tq->tq_nalloc--;
		mutex_exit(&tq->tq_lock);
		kmem_free(t, sizeof (taskq_ent_t));
		mutex_enter(&tq->tq_lock);
	}

	if (tq->tq_maxalloc_wait)
		cv_signal(&tq->tq_maxalloc_cv);
}

taskqid_t
taskq_dispatch(taskq_t *tq, task_func_t func, void *arg, uint_t tqflags)
{
	taskq_ent_t *t;

	if (taskq_now) {
		func(arg);
		return (1);
	}

	mutex_enter(&tq->tq_lock);
	ASSERT(tq->tq_flags & TASKQ_ACTIVE);
	if ((t = task_alloc(tq, tqflags)) == NULL) {
		mutex_exit(&tq->tq_lock);
		return (0);
	}
	if (tqflags & TQ_FRONT) {
		t->tqent_next = tq->tq_task.tqent_next;
		t->tqent_prev = &tq->tq_task;
	} else {
		t->tqent_next = &tq->tq_task;
		t->tqent_prev = tq->tq_task.tqent_prev;
	}
	t->tqent_next->tqent_prev = t;
	t->tqent_prev->tqent_next = t;
	t->tqent_func = func;
	t->tqent_arg = arg;
	t->tqent_flags = 0;
	cv_signal(&tq->tq_dispatch_cv);
	mutex_exit(&tq->tq_lock);
	return (1);
}
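
/*
 * A minimal usage sketch for the queue-managed dispatch path above;
 * my_func and my_arg are hypothetical names, not part of this file:
 *
 *	taskq_t *tq = taskq_create("my_tq", 4, maxclsyspri, 4, 64,
 *	    TASKQ_PREPOPULATE);
 *	VERIFY(taskq_dispatch(tq, my_func, my_arg, KM_SLEEP) != 0);
 *	taskq_wait(tq);
 *	taskq_destroy(tq);
 *
 * With KM_SLEEP set, task_alloc() throttles rather than fails, so the
 * dispatch above cannot return 0; without it, a failed entry allocation
 * makes taskq_dispatch() return 0 and the caller must handle that.
 */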

taskqid_t
taskq_dispatch_delay(taskq_t *tq, task_func_t func, void *arg, uint_t tqflags,
    clock_t expire_time)
{
	/*
	 * Delayed dispatch is not implemented in this userland taskq;
	 * callers always see a failed dispatch.
	 */
	(void) tq, (void) func, (void) arg, (void) tqflags, (void) expire_time;
	return (0);
}

int
taskq_empty_ent(taskq_ent_t *t)
{
	return (t->tqent_next == NULL);
}

void
taskq_init_ent(taskq_ent_t *t)
{
	t->tqent_next = NULL;
	t->tqent_prev = NULL;
	t->tqent_func = NULL;
	t->tqent_arg = NULL;
	t->tqent_flags = 0;
}

void
taskq_dispatch_ent(taskq_t *tq, task_func_t func, void *arg, uint_t flags,
    taskq_ent_t *t)
{
	ASSERT(func != NULL);

	/*
	 * Mark it as a prealloc'd task.  This is important
	 * to ensure that we don't free it later.
	 */
	t->tqent_flags |= TQENT_FLAG_PREALLOC;
	/*
	 * Enqueue the task to the underlying queue.
	 */
	mutex_enter(&tq->tq_lock);

	if (flags & TQ_FRONT) {
		t->tqent_next = tq->tq_task.tqent_next;
		t->tqent_prev = &tq->tq_task;
	} else {
		t->tqent_next = &tq->tq_task;
		t->tqent_prev = tq->tq_task.tqent_prev;
	}
	t->tqent_next->tqent_prev = t;
	t->tqent_prev->tqent_next = t;
	t->tqent_func = func;
	t->tqent_arg = arg;
	cv_signal(&tq->tq_dispatch_cv);
	mutex_exit(&tq->tq_lock);
}
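
/*
 * Sketch of the preallocated-entry path, assuming a caller that embeds
 * the taskq_ent_t in its own structure (my_job_t and my_job_func are
 * hypothetical):
 *
 *	typedef struct my_job {
 *		taskq_ent_t	job_tqent;
 *		void		*job_data;
 *	} my_job_t;
 *
 *	my_job_t *job = kmem_zalloc(sizeof (*job), KM_SLEEP);
 *	taskq_init_ent(&job->job_tqent);
 *	taskq_dispatch_ent(tq, my_job_func, job, 0, &job->job_tqent);
 *
 * Because the entry carries TQENT_FLAG_PREALLOC, taskq_thread() skips
 * task_free() for it; the caller owns the entry's lifetime and can use
 * taskq_empty_ent() to check that it is no longer queued.
 */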

void
taskq_wait(taskq_t *tq)
{
	mutex_enter(&tq->tq_lock);
	while (tq->tq_task.tqent_next != &tq->tq_task || tq->tq_active != 0)
		cv_wait(&tq->tq_wait_cv, &tq->tq_lock);
	mutex_exit(&tq->tq_lock);
}

void
taskq_wait_id(taskq_t *tq, taskqid_t id)
{
	(void) id;
	taskq_wait(tq);
}

void
taskq_wait_outstanding(taskq_t *tq, taskqid_t id)
{
	(void) id;
	taskq_wait(tq);
}

/*
 * Worker loop: each pool thread pulls entries off the queue and runs
 * them until TASKQ_ACTIVE is cleared by taskq_destroy().
 */
static __attribute__((noreturn)) void
taskq_thread(void *arg)
{
	taskq_t *tq = arg;
	taskq_ent_t *t;
	boolean_t prealloc;

	VERIFY0(pthread_setspecific(taskq_tsd, tq));

	mutex_enter(&tq->tq_lock);
	while (tq->tq_flags & TASKQ_ACTIVE) {
		if ((t = tq->tq_task.tqent_next) == &tq->tq_task) {
			if (--tq->tq_active == 0)
				cv_broadcast(&tq->tq_wait_cv);
			cv_wait(&tq->tq_dispatch_cv, &tq->tq_lock);
			tq->tq_active++;
			continue;
		}
		t->tqent_prev->tqent_next = t->tqent_next;
		t->tqent_next->tqent_prev = t->tqent_prev;
		t->tqent_next = NULL;
		t->tqent_prev = NULL;
		prealloc = t->tqent_flags & TQENT_FLAG_PREALLOC;
		mutex_exit(&tq->tq_lock);

		rw_enter(&tq->tq_threadlock, RW_READER);
		t->tqent_func(t->tqent_arg);
		rw_exit(&tq->tq_threadlock);

		mutex_enter(&tq->tq_lock);
		if (!prealloc)
			task_free(tq, t);
	}
	tq->tq_nthreads--;
	cv_broadcast(&tq->tq_wait_cv);
	mutex_exit(&tq->tq_lock);
	thread_exit();
}

taskq_t *
taskq_create(const char *name, int nthreads, pri_t pri,
    int minalloc, int maxalloc, uint_t flags)
{
	(void) pri;
	taskq_t *tq = kmem_zalloc(sizeof (taskq_t), KM_SLEEP);
	int t;

	if (flags & TASKQ_THREADS_CPU_PCT) {
		int pct;
		ASSERT3S(nthreads, >=, 0);
		ASSERT3S(nthreads, <=, 100);
		pct = MIN(nthreads, 100);
		pct = MAX(pct, 0);

		nthreads = (sysconf(_SC_NPROCESSORS_ONLN) * pct) / 100;
		nthreads = MAX(nthreads, 1);	/* need at least 1 thread */
	} else {
		ASSERT3S(nthreads, >=, 1);
	}

	rw_init(&tq->tq_threadlock, NULL, RW_DEFAULT, NULL);
	mutex_init(&tq->tq_lock, NULL, MUTEX_DEFAULT, NULL);
	cv_init(&tq->tq_dispatch_cv, NULL, CV_DEFAULT, NULL);
	cv_init(&tq->tq_wait_cv, NULL, CV_DEFAULT, NULL);
	cv_init(&tq->tq_maxalloc_cv, NULL, CV_DEFAULT, NULL);
	(void) strlcpy(tq->tq_name, name, sizeof (tq->tq_name));
	tq->tq_flags = flags | TASKQ_ACTIVE;
	tq->tq_active = nthreads;
	tq->tq_nthreads = nthreads;
	tq->tq_minalloc = minalloc;
	tq->tq_maxalloc = maxalloc;
	tq->tq_task.tqent_next = &tq->tq_task;
	tq->tq_task.tqent_prev = &tq->tq_task;
	tq->tq_threadlist = kmem_alloc(nthreads * sizeof (kthread_t *),
	    KM_SLEEP);

	if (flags & TASKQ_PREPOPULATE) {
		mutex_enter(&tq->tq_lock);
		while (minalloc-- > 0)
			task_free(tq, task_alloc(tq, KM_SLEEP));
		mutex_exit(&tq->tq_lock);
	}

	for (t = 0; t < nthreads; t++)
		VERIFY((tq->tq_threadlist[t] = thread_create_named(tq->tq_name,
		    NULL, 0, taskq_thread, tq, 0, &p0, TS_RUN, pri)) != NULL);

	return (tq);
}
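
/*
 * A minimal sizing sketch, assuming an 8-CPU system: with
 * TASKQ_THREADS_CPU_PCT, nthreads is interpreted as a percentage of
 * online CPUs rather than an absolute thread count, so this creates
 * MAX(8 * 75 / 100, 1) = 6 pool threads:
 *
 *	tq = taskq_create("pct_tq", 75, maxclsyspri, 4, 64,
 *	    TASKQ_THREADS_CPU_PCT);
 */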

void
taskq_destroy(taskq_t *tq)
{
	int nthreads = tq->tq_nthreads;

	taskq_wait(tq);

	mutex_enter(&tq->tq_lock);

	tq->tq_flags &= ~TASKQ_ACTIVE;
	cv_broadcast(&tq->tq_dispatch_cv);

	while (tq->tq_nthreads != 0)
		cv_wait(&tq->tq_wait_cv, &tq->tq_lock);

	tq->tq_minalloc = 0;
	while (tq->tq_nalloc != 0) {
		ASSERT(tq->tq_freelist != NULL);
		taskq_ent_t *tqent_nexttq = tq->tq_freelist->tqent_next;
		task_free(tq, tq->tq_freelist);
		tq->tq_freelist = tqent_nexttq;
	}

	mutex_exit(&tq->tq_lock);

	kmem_free(tq->tq_threadlist, nthreads * sizeof (kthread_t *));

	rw_destroy(&tq->tq_threadlock);
	mutex_destroy(&tq->tq_lock);
	cv_destroy(&tq->tq_dispatch_cv);
	cv_destroy(&tq->tq_wait_cv);
	cv_destroy(&tq->tq_maxalloc_cv);

	kmem_free(tq, sizeof (taskq_t));
}

/*
 * Create a taskq with a specified number of pool threads.  Allocate
 * and return an array of nthreads kthread_t pointers, one for each
 * thread in the pool.  The array is not ordered and must be freed
 * by the caller.
 */
taskq_t *
taskq_create_synced(const char *name, int nthreads, pri_t pri,
    int minalloc, int maxalloc, uint_t flags, kthread_t ***ktpp)
{
	taskq_t *tq;
	kthread_t **kthreads = kmem_zalloc(sizeof (*kthreads) * nthreads,
	    KM_SLEEP);

	(void) pri; (void) minalloc; (void) maxalloc;

	flags &= ~(TASKQ_DYNAMIC | TASKQ_THREADS_CPU_PCT | TASKQ_DC_BATCH);

	tq = taskq_create(name, nthreads, minclsyspri, nthreads, INT_MAX,
	    flags | TASKQ_PREPOPULATE);
	VERIFY(tq != NULL);
	VERIFY(tq->tq_nthreads == nthreads);

	for (int i = 0; i < nthreads; i++) {
		kthreads[i] = tq->tq_threadlist[i];
	}
	*ktpp = kthreads;
	return (tq);
}

int
taskq_member(taskq_t *tq, kthread_t *t)
{
	int i;

	if (taskq_now)
		return (1);

	for (i = 0; i < tq->tq_nthreads; i++)
		if (tq->tq_threadlist[i] == t)
			return (1);

	return (0);
}

taskq_t *
taskq_of_curthread(void)
{
	return (pthread_getspecific(taskq_tsd));
}

int
taskq_cancel_id(taskq_t *tq, taskqid_t id)
{
	(void) tq, (void) id;
	return (ENOENT);
}

void
system_taskq_init(void)
{
	VERIFY0(pthread_key_create(&taskq_tsd, NULL));
	system_taskq = taskq_create("system_taskq", 64, maxclsyspri, 4, 512,
	    TASKQ_DYNAMIC | TASKQ_PREPOPULATE);
	system_delay_taskq = taskq_create("delay_taskq", 4, maxclsyspri, 4,
	    512, TASKQ_DYNAMIC | TASKQ_PREPOPULATE);
}

void
system_taskq_fini(void)
{
	taskq_destroy(system_taskq);
	system_taskq = NULL; /* defensive */
	taskq_destroy(system_delay_taskq);
	system_delay_taskq = NULL;
	VERIFY0(pthread_key_delete(taskq_tsd));
}
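
/*
 * After system_taskq_init(), code may dispatch onto the shared queues;
 * a brief sketch, where my_cb is a hypothetical callback:
 *
 *	system_taskq_init();
 *	VERIFY(taskq_dispatch(system_taskq, my_cb, NULL, KM_SLEEP) != 0);
 *	taskq_wait(system_taskq);
 *	system_taskq_fini();
 */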