// SPDX-License-Identifier: CDDL-1.0
/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License (the "License").
 * You may not use this file except in compliance with the License.
 *
 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
 * or https://opensource.org/licenses/CDDL-1.0.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
 * If applicable, add the following below this CDDL HEADER, with the
 * fields enclosed by brackets "[]" replaced with your own identifying
 * information: Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 */
/*
 * Copyright 2010 Sun Microsystems, Inc. All rights reserved.
 * Use is subject to license terms.
 */
/*
 * Copyright 2011 Nexenta Systems, Inc. All rights reserved.
 * Copyright 2012 Garrett D'Amore <garrett@damore.org>. All rights reserved.
 * Copyright (c) 2014 by Delphix. All rights reserved.
 */

#include <sys/sysmacros.h>
#include <sys/timer.h>
#include <sys/types.h>
#include <sys/thread.h>
#include <sys/taskq.h>
#include <sys/kmem.h>

#include <errno.h>
#include <limits.h>
#include <pthread.h>
#include <string.h>
#include <unistd.h>

static taskq_t *__system_taskq = NULL;
static taskq_t *__system_delay_taskq = NULL;

taskq_t *
_system_taskq(void)
{
	return (__system_taskq);
}

taskq_t *
_system_delay_taskq(void)
{
	return (__system_delay_taskq);
}

static pthread_key_t taskq_tsd;

#define	TASKQ_ACTIVE	0x00010000

static taskq_ent_t *
task_alloc(taskq_t *tq, int tqflags)
{
	taskq_ent_t *t;
	int rv;

again:	if ((t = tq->tq_freelist) != NULL && tq->tq_nalloc >= tq->tq_minalloc) {
		ASSERT(!(t->tqent_flags & TQENT_FLAG_PREALLOC));
		tq->tq_freelist = t->tqent_next;
	} else {
		if (tq->tq_nalloc >= tq->tq_maxalloc) {
			if (!(tqflags & KM_SLEEP))
				return (NULL);

			/*
			 * We don't want to exceed tq_maxalloc, but we can't
			 * wait for other tasks to complete (and thus free up
			 * task structures) without risking deadlock with
			 * the caller.  So, we just delay for one second
			 * to throttle the allocation rate.  If a task
			 * completes before the one-second timeout expires,
			 * task_free() will signal us and we will
			 * immediately retry the allocation.
			 */
			tq->tq_maxalloc_wait++;
			rv = cv_timedwait(&tq->tq_maxalloc_cv,
			    &tq->tq_lock, ddi_get_lbolt() + hz);
			tq->tq_maxalloc_wait--;
			if (rv > 0)
				goto again;		/* signaled */
		}
		mutex_exit(&tq->tq_lock);

		t = kmem_alloc(sizeof (taskq_ent_t), tqflags);

		mutex_enter(&tq->tq_lock);
		if (t != NULL) {
			/* Make sure we start without any flags */
			t->tqent_flags = 0;
			tq->tq_nalloc++;
		}
	}
	return (t);
}

static void
task_free(taskq_t *tq, taskq_ent_t *t)
{
	if (tq->tq_nalloc <= tq->tq_minalloc) {
		t->tqent_next = tq->tq_freelist;
		tq->tq_freelist = t;
	} else {
		tq->tq_nalloc--;
		mutex_exit(&tq->tq_lock);
		kmem_free(t, sizeof (taskq_ent_t));
		mutex_enter(&tq->tq_lock);
	}

	if (tq->tq_maxalloc_wait)
		cv_signal(&tq->tq_maxalloc_cv);
}
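/*
 * Dispatch func(arg) to the queue.  Returns 1 (a valid taskqid_t) on
 * success, or 0 if task_alloc() failed, which can only happen when
 * TQ_SLEEP/KM_SLEEP is not set.  A minimal usage sketch ("my_func" and
 * "my_arg" are illustrative, not part of this file):
 *
 *	static void my_func(void *arg) { ... do the work ... }
 *
 *	if (taskq_dispatch(tq, my_func, my_arg, TQ_SLEEP) == 0)
 *		... handle dispatch failure ...
 */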
taskqid_t
taskq_dispatch(taskq_t *tq, task_func_t func, void *arg, uint_t tqflags)
{
	taskq_ent_t *t;

	mutex_enter(&tq->tq_lock);
	ASSERT(tq->tq_flags & TASKQ_ACTIVE);
	if ((t = task_alloc(tq, tqflags)) == NULL) {
		mutex_exit(&tq->tq_lock);
		return (0);
	}
	if (tqflags & TQ_FRONT) {
		t->tqent_next = tq->tq_task.tqent_next;
		t->tqent_prev = &tq->tq_task;
	} else {
		t->tqent_next = &tq->tq_task;
		t->tqent_prev = tq->tq_task.tqent_prev;
	}
	t->tqent_next->tqent_prev = t;
	t->tqent_prev->tqent_next = t;
	t->tqent_func = func;
	t->tqent_arg = arg;
	t->tqent_flags = 0;
	cv_signal(&tq->tq_dispatch_cv);
	mutex_exit(&tq->tq_lock);
	return (1);
}

/*
 * Delayed dispatch is not implemented in this userland taskq;
 * always fail so callers fall back to other mechanisms.
 */
taskqid_t
taskq_dispatch_delay(taskq_t *tq, task_func_t func, void *arg, uint_t tqflags,
    clock_t expire_time)
{
	(void) tq, (void) func, (void) arg, (void) tqflags, (void) expire_time;
	return (0);
}

int
taskq_empty_ent(taskq_ent_t *t)
{
	return (t->tqent_next == NULL);
}

void
taskq_init_ent(taskq_ent_t *t)
{
	t->tqent_next = NULL;
	t->tqent_prev = NULL;
	t->tqent_func = NULL;
	t->tqent_arg = NULL;
	t->tqent_flags = 0;
}

void
taskq_dispatch_ent(taskq_t *tq, task_func_t func, void *arg, uint_t flags,
    taskq_ent_t *t)
{
	ASSERT(func != NULL);

	/*
	 * Mark it as a prealloc'd task.  This is important
	 * to ensure that we don't free it later.
	 */
	t->tqent_flags |= TQENT_FLAG_PREALLOC;
	/*
	 * Enqueue the task to the underlying queue.
	 */
	mutex_enter(&tq->tq_lock);

	if (flags & TQ_FRONT) {
		t->tqent_next = tq->tq_task.tqent_next;
		t->tqent_prev = &tq->tq_task;
	} else {
		t->tqent_next = &tq->tq_task;
		t->tqent_prev = tq->tq_task.tqent_prev;
	}
	t->tqent_next->tqent_prev = t;
	t->tqent_prev->tqent_next = t;
	t->tqent_func = func;
	t->tqent_arg = arg;
	cv_signal(&tq->tq_dispatch_cv);
	mutex_exit(&tq->tq_lock);
}

void
taskq_wait(taskq_t *tq)
{
	mutex_enter(&tq->tq_lock);
	while (tq->tq_task.tqent_next != &tq->tq_task || tq->tq_active != 0)
		cv_wait(&tq->tq_wait_cv, &tq->tq_lock);
	mutex_exit(&tq->tq_lock);
}

/*
 * Per-task ids are not tracked here, so taskq_wait_id() and
 * taskq_wait_outstanding() simply drain the whole queue.
 */
void
taskq_wait_id(taskq_t *tq, taskqid_t id)
{
	(void) id;
	taskq_wait(tq);
}

void
taskq_wait_outstanding(taskq_t *tq, taskqid_t id)
{
	(void) id;
	taskq_wait(tq);
}
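/*
 * Worker-thread loop.  Each thread removes the entry at the head of
 * tq_task, drops tq_lock, and runs the task while holding
 * tq_threadlock as reader, so a long-running task never blocks
 * dispatch.  When the list empties, the last thread to go idle
 * broadcasts tq_wait_cv (waking taskq_wait()) and then sleeps on
 * tq_dispatch_cv until new work arrives or the queue is destroyed.
 */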
static __attribute__((noreturn)) void
taskq_thread(void *arg)
{
	taskq_t *tq = arg;
	taskq_ent_t *t;
	boolean_t prealloc;

	VERIFY0(pthread_setspecific(taskq_tsd, tq));

	mutex_enter(&tq->tq_lock);
	while (tq->tq_flags & TASKQ_ACTIVE) {
		if ((t = tq->tq_task.tqent_next) == &tq->tq_task) {
			if (--tq->tq_active == 0)
				cv_broadcast(&tq->tq_wait_cv);
			cv_wait(&tq->tq_dispatch_cv, &tq->tq_lock);
			tq->tq_active++;
			continue;
		}
		t->tqent_prev->tqent_next = t->tqent_next;
		t->tqent_next->tqent_prev = t->tqent_prev;
		t->tqent_next = NULL;
		t->tqent_prev = NULL;
		prealloc = t->tqent_flags & TQENT_FLAG_PREALLOC;
		mutex_exit(&tq->tq_lock);

		rw_enter(&tq->tq_threadlock, RW_READER);
		t->tqent_func(t->tqent_arg);
		rw_exit(&tq->tq_threadlock);

		mutex_enter(&tq->tq_lock);
		if (!prealloc)
			task_free(tq, t);
	}
	tq->tq_nthreads--;
	cv_broadcast(&tq->tq_wait_cv);
	mutex_exit(&tq->tq_lock);
	thread_exit();
}

taskq_t *
taskq_create(const char *name, int nthreads, pri_t pri,
    int minalloc, int maxalloc, uint_t flags)
{
	(void) pri;	/* thread_create_named() may ignore pri in userland */
	taskq_t *tq = kmem_zalloc(sizeof (taskq_t), KM_SLEEP);
	int t;

	if (flags & TASKQ_THREADS_CPU_PCT) {
		int pct;
		ASSERT3S(nthreads, >=, 0);
		ASSERT3S(nthreads, <=, 100);
		pct = MIN(nthreads, 100);
		pct = MAX(pct, 0);

		nthreads = (sysconf(_SC_NPROCESSORS_ONLN) * pct) / 100;
		nthreads = MAX(nthreads, 1);	/* need at least 1 thread */
	} else {
		ASSERT3S(nthreads, >=, 1);
	}

	rw_init(&tq->tq_threadlock, NULL, RW_DEFAULT, NULL);
	mutex_init(&tq->tq_lock, NULL, MUTEX_DEFAULT, NULL);
	cv_init(&tq->tq_dispatch_cv, NULL, CV_DEFAULT, NULL);
	cv_init(&tq->tq_wait_cv, NULL, CV_DEFAULT, NULL);
	cv_init(&tq->tq_maxalloc_cv, NULL, CV_DEFAULT, NULL);
	(void) strlcpy(tq->tq_name, name, sizeof (tq->tq_name));
	tq->tq_flags = flags | TASKQ_ACTIVE;
	tq->tq_active = nthreads;
	tq->tq_nthreads = nthreads;
	tq->tq_minalloc = minalloc;
	tq->tq_maxalloc = maxalloc;
	tq->tq_task.tqent_next = &tq->tq_task;
	tq->tq_task.tqent_prev = &tq->tq_task;
	tq->tq_threadlist = kmem_alloc(nthreads * sizeof (kthread_t *),
	    KM_SLEEP);

	if (flags & TASKQ_PREPOPULATE) {
		mutex_enter(&tq->tq_lock);
		while (minalloc-- > 0)
			task_free(tq, task_alloc(tq, KM_SLEEP));
		mutex_exit(&tq->tq_lock);
	}

	for (t = 0; t < nthreads; t++)
		VERIFY((tq->tq_threadlist[t] = thread_create_named(tq->tq_name,
		    NULL, 0, taskq_thread, tq, 0, &p0, TS_RUN, pri)) != NULL);

	return (tq);
}
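/*
 * Destroy a taskq.  A minimal lifecycle sketch ("my_func" and "my_arg"
 * are hypothetical):
 *
 *	taskq_t *tq = taskq_create("my_tq", 4, maxclsyspri,
 *	    4, 64, TASKQ_PREPOPULATE);
 *	(void) taskq_dispatch(tq, my_func, my_arg, TQ_SLEEP);
 *	taskq_wait(tq);
 *	taskq_destroy(tq);
 *
 * taskq_destroy() itself begins with taskq_wait(), so any tasks still
 * pending are drained before the threads are torn down.
 */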
void
taskq_destroy(taskq_t *tq)
{
	int nthreads = tq->tq_nthreads;

	taskq_wait(tq);

	mutex_enter(&tq->tq_lock);

	tq->tq_flags &= ~TASKQ_ACTIVE;
	cv_broadcast(&tq->tq_dispatch_cv);

	while (tq->tq_nthreads != 0)
		cv_wait(&tq->tq_wait_cv, &tq->tq_lock);

	/*
	 * With tq_minalloc forced to zero, task_free() actually frees
	 * every cached entry instead of returning it to the freelist.
	 */
	tq->tq_minalloc = 0;
	while (tq->tq_nalloc != 0) {
		ASSERT(tq->tq_freelist != NULL);
		taskq_ent_t *tqent_nexttq = tq->tq_freelist->tqent_next;
		task_free(tq, tq->tq_freelist);
		tq->tq_freelist = tqent_nexttq;
	}

	mutex_exit(&tq->tq_lock);

	kmem_free(tq->tq_threadlist, nthreads * sizeof (kthread_t *));

	rw_destroy(&tq->tq_threadlock);
	mutex_destroy(&tq->tq_lock);
	cv_destroy(&tq->tq_dispatch_cv);
	cv_destroy(&tq->tq_wait_cv);
	cv_destroy(&tq->tq_maxalloc_cv);

	kmem_free(tq, sizeof (taskq_t));
}

/*
 * Create a taskq with a specified number of pool threads.  Allocate
 * and return an array of nthreads kthread_t pointers, one for each
 * thread in the pool.  The array is not ordered and must be freed
 * by the caller.
 */
taskq_t *
taskq_create_synced(const char *name, int nthreads, pri_t pri,
    int minalloc, int maxalloc, uint_t flags, kthread_t ***ktpp)
{
	taskq_t *tq;
	kthread_t **kthreads = kmem_zalloc(sizeof (*kthreads) * nthreads,
	    KM_SLEEP);

	(void) pri; (void) minalloc; (void) maxalloc;

	flags &= ~(TASKQ_DYNAMIC | TASKQ_THREADS_CPU_PCT | TASKQ_DC_BATCH);

	tq = taskq_create(name, nthreads, minclsyspri, nthreads, INT_MAX,
	    flags | TASKQ_PREPOPULATE);
	VERIFY(tq != NULL);
	VERIFY(tq->tq_nthreads == nthreads);

	for (int i = 0; i < nthreads; i++) {
		kthreads[i] = tq->tq_threadlist[i];
	}
	*ktpp = kthreads;
	return (tq);
}

int
taskq_member(taskq_t *tq, kthread_t *t)
{
	int i;

	for (i = 0; i < tq->tq_nthreads; i++)
		if (tq->tq_threadlist[i] == t)
			return (1);

	return (0);
}

taskq_t *
taskq_of_curthread(void)
{
	return (pthread_getspecific(taskq_tsd));
}

/*
 * Task cancellation is not supported; dispatched tasks always run.
 */
int
taskq_cancel_id(taskq_t *tq, taskqid_t id)
{
	(void) tq, (void) id;
	return (ENOENT);
}

void
system_taskq_init(void)
{
	VERIFY0(pthread_key_create(&taskq_tsd, NULL));
	__system_taskq = taskq_create("system_taskq", 64, maxclsyspri, 4, 512,
	    TASKQ_DYNAMIC | TASKQ_PREPOPULATE);
	__system_delay_taskq = taskq_create("delay_taskq", 4, maxclsyspri, 4,
	    512, TASKQ_DYNAMIC | TASKQ_PREPOPULATE);
}

void
system_taskq_fini(void)
{
	taskq_destroy(__system_taskq);
	__system_taskq = NULL; /* defensive */
	taskq_destroy(__system_delay_taskq);
	__system_delay_taskq = NULL;
	VERIFY0(pthread_key_delete(taskq_tsd));
}
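/*
 * End-to-end sketch of the lifecycle implemented above ("count_up"
 * and "n" are illustrative, not part of this file):
 *
 *	static void count_up(void *arg) { (*(int *)arg)++; }
 *
 *	system_taskq_init();
 *	int n = 0;
 *	VERIFY(taskq_dispatch(_system_taskq(), count_up, &n,
 *	    TQ_SLEEP) != 0);
 *	taskq_wait(_system_taskq());
 *	ASSERT3S(n, ==, 1);
 *	system_taskq_fini();
 */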