// SPDX-License-Identifier: CDDL-1.0
/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License (the "License").
 * You may not use this file except in compliance with the License.
 *
 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
 * or https://opensource.org/licenses/CDDL-1.0.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
 * If applicable, add the following below this CDDL HEADER, with the
 * fields enclosed by brackets "[]" replaced with your own identifying
 * information: Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 */
/*
 * Copyright 2010 Sun Microsystems, Inc. All rights reserved.
 * Use is subject to license terms.
 */
/*
 * Copyright 2011 Nexenta Systems, Inc. All rights reserved.
 * Copyright 2012 Garrett D'Amore <garrett@damore.org>. All rights reserved.
 * Copyright (c) 2014 by Delphix. All rights reserved.
 */

/*
 * Userland implementation of the kernel taskq API, built on pthreads.
 */

#include <sys/zfs_context.h>

int taskq_now;
taskq_t *system_taskq;
taskq_t *system_delay_taskq;

static pthread_key_t taskq_tsd;

#define	TASKQ_ACTIVE	0x00010000

/*
 * Allocate a task entry, taking it from the free list when the list
 * holds more than tq_minalloc entries.  Called with tq_lock held.
 */
static taskq_ent_t *
task_alloc(taskq_t *tq, int tqflags)
{
	taskq_ent_t *t;
	int rv;

again:	if ((t = tq->tq_freelist) != NULL && tq->tq_nalloc >= tq->tq_minalloc) {
		ASSERT(!(t->tqent_flags & TQENT_FLAG_PREALLOC));
		tq->tq_freelist = t->tqent_next;
	} else {
		if (tq->tq_nalloc >= tq->tq_maxalloc) {
			if (!(tqflags & KM_SLEEP))
				return (NULL);

			/*
			 * We don't want to exceed tq_maxalloc, but we can't
			 * wait for other tasks to complete (and thus free up
			 * task structures) without risking deadlock with
			 * the caller.  So, we just delay for one second
			 * to throttle the allocation rate.  If tasks
			 * complete before the one-second timeout expires,
			 * task_free() will signal us and we will
			 * immediately retry the allocation.
			 */
			tq->tq_maxalloc_wait++;
			rv = cv_timedwait(&tq->tq_maxalloc_cv,
			    &tq->tq_lock, ddi_get_lbolt() + hz);
			tq->tq_maxalloc_wait--;
			if (rv > 0)
				goto again;		/* signaled */
		}
		mutex_exit(&tq->tq_lock);

		t = kmem_alloc(sizeof (taskq_ent_t), tqflags);

		mutex_enter(&tq->tq_lock);
		if (t != NULL) {
			/* Make sure we start without any flags */
			t->tqent_flags = 0;
			tq->tq_nalloc++;
		}
	}
	return (t);
}

/*
 * Return a task entry to the free list, or free it outright once the
 * taskq already holds tq_minalloc entries.  Called with tq_lock held.
 */
static void
task_free(taskq_t *tq, taskq_ent_t *t)
{
	if (tq->tq_nalloc <= tq->tq_minalloc) {
		t->tqent_next = tq->tq_freelist;
		tq->tq_freelist = t;
	} else {
		tq->tq_nalloc--;
		mutex_exit(&tq->tq_lock);
		kmem_free(t, sizeof (taskq_ent_t));
		mutex_enter(&tq->tq_lock);
	}

	if (tq->tq_maxalloc_wait)
		cv_signal(&tq->tq_maxalloc_cv);
}

taskqid_t
taskq_dispatch(taskq_t *tq, task_func_t func, void *arg, uint_t tqflags)
{
	taskq_ent_t *t;

	if (taskq_now) {
		func(arg);
		return (1);
	}

	mutex_enter(&tq->tq_lock);
	ASSERT(tq->tq_flags & TASKQ_ACTIVE);
	if ((t = task_alloc(tq, tqflags)) == NULL) {
		mutex_exit(&tq->tq_lock);
		return (0);
	}
	if (tqflags & TQ_FRONT) {
		t->tqent_next = tq->tq_task.tqent_next;
		t->tqent_prev = &tq->tq_task;
	} else {
		t->tqent_next = &tq->tq_task;
		t->tqent_prev = tq->tq_task.tqent_prev;
	}
	t->tqent_next->tqent_prev = t;
	t->tqent_prev->tqent_next = t;
	t->tqent_func = func;
	t->tqent_arg = arg;
	t->tqent_flags = 0;
	cv_signal(&tq->tq_dispatch_cv);
	mutex_exit(&tq->tq_lock);
	return (1);
}
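
/*
 * Usage sketch (illustrative only, not compiled): the basic
 * create/dispatch/wait/destroy lifecycle.  The queue name, worker
 * function, and counter below are hypothetical.
 *
 *	static void
 *	my_worker(void *arg)
 *	{
 *		int *counter = arg;
 *		(*counter)++;
 *	}
 *
 *	taskq_t *tq = taskq_create("example_tq", 4, maxclsyspri,
 *	    4, 16, TASKQ_PREPOPULATE);
 *	int counter = 0;
 *
 *	VERIFY(taskq_dispatch(tq, my_worker, &counter, KM_SLEEP) != 0);
 *	taskq_wait(tq);		at this point counter == 1
 *	taskq_destroy(tq);
 */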

/*
 * Delayed dispatch is not implemented in this userland taskq; it
 * ignores its arguments and always reports failure.
 */
taskqid_t
taskq_dispatch_delay(taskq_t *tq, task_func_t func, void *arg, uint_t tqflags,
    clock_t expire_time)
{
	(void) tq, (void) func, (void) arg, (void) tqflags, (void) expire_time;
	return (0);
}

int
taskq_empty_ent(taskq_ent_t *t)
{
	return (t->tqent_next == NULL);
}

void
taskq_init_ent(taskq_ent_t *t)
{
	t->tqent_next = NULL;
	t->tqent_prev = NULL;
	t->tqent_func = NULL;
	t->tqent_arg = NULL;
	t->tqent_flags = 0;
}

void
taskq_dispatch_ent(taskq_t *tq, task_func_t func, void *arg, uint_t flags,
    taskq_ent_t *t)
{
	ASSERT(func != NULL);

	/*
	 * Mark it as a prealloc'd task.  This is important
	 * to ensure that we don't free it later.
	 */
	t->tqent_flags |= TQENT_FLAG_PREALLOC;
	/*
	 * Enqueue the task to the underlying queue.
	 */
	mutex_enter(&tq->tq_lock);

	if (flags & TQ_FRONT) {
		t->tqent_next = tq->tq_task.tqent_next;
		t->tqent_prev = &tq->tq_task;
	} else {
		t->tqent_next = &tq->tq_task;
		t->tqent_prev = tq->tq_task.tqent_prev;
	}
	t->tqent_next->tqent_prev = t;
	t->tqent_prev->tqent_next = t;
	t->tqent_func = func;
	t->tqent_arg = arg;
	cv_signal(&tq->tq_dispatch_cv);
	mutex_exit(&tq->tq_lock);
}
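
/*
 * Usage sketch (illustrative only, not compiled): dispatch via a
 * caller-owned, prealloc'd entry.  taskq_dispatch_ent() never fails
 * and never allocates; because TQENT_FLAG_PREALLOC is set, the worker
 * thread will not task_free() the entry, so its storage must remain
 * valid until the task has run.  my_worker() and counter are the same
 * hypothetical names used in the sketch after taskq_dispatch() above.
 *
 *	taskq_ent_t ent;
 *
 *	taskq_init_ent(&ent);
 *	taskq_dispatch_ent(tq, my_worker, &counter, 0, &ent);
 *	taskq_wait(tq);		ent must stay valid until here
 */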

void
taskq_wait(taskq_t *tq)
{
	mutex_enter(&tq->tq_lock);
	while (tq->tq_task.tqent_next != &tq->tq_task || tq->tq_active != 0)
		cv_wait(&tq->tq_wait_cv, &tq->tq_lock);
	mutex_exit(&tq->tq_lock);
}

void
taskq_wait_id(taskq_t *tq, taskqid_t id)
{
	(void) id;
	taskq_wait(tq);
}

void
taskq_wait_outstanding(taskq_t *tq, taskqid_t id)
{
	(void) id;
	taskq_wait(tq);
}

/*
 * Worker thread: pull entries off the queue and run them until
 * taskq_destroy() clears TASKQ_ACTIVE.
 */
static __attribute__((noreturn)) void
taskq_thread(void *arg)
{
	taskq_t *tq = arg;
	taskq_ent_t *t;
	boolean_t prealloc;

	VERIFY0(pthread_setspecific(taskq_tsd, tq));

	mutex_enter(&tq->tq_lock);
	while (tq->tq_flags & TASKQ_ACTIVE) {
		if ((t = tq->tq_task.tqent_next) == &tq->tq_task) {
			if (--tq->tq_active == 0)
				cv_broadcast(&tq->tq_wait_cv);
			cv_wait(&tq->tq_dispatch_cv, &tq->tq_lock);
			tq->tq_active++;
			continue;
		}
		t->tqent_prev->tqent_next = t->tqent_next;
		t->tqent_next->tqent_prev = t->tqent_prev;
		t->tqent_next = NULL;
		t->tqent_prev = NULL;
		prealloc = t->tqent_flags & TQENT_FLAG_PREALLOC;
		mutex_exit(&tq->tq_lock);

		rw_enter(&tq->tq_threadlock, RW_READER);
		t->tqent_func(t->tqent_arg);
		rw_exit(&tq->tq_threadlock);

		mutex_enter(&tq->tq_lock);
		if (!prealloc)
			task_free(tq, t);
	}
	tq->tq_nthreads--;
	cv_broadcast(&tq->tq_wait_cv);
	mutex_exit(&tq->tq_lock);
	thread_exit();
}

taskq_t *
taskq_create(const char *name, int nthreads, pri_t pri,
    int minalloc, int maxalloc, uint_t flags)
{
	(void) pri;
	taskq_t *tq = kmem_zalloc(sizeof (taskq_t), KM_SLEEP);
	int t;

	if (flags & TASKQ_THREADS_CPU_PCT) {
		int pct;
		ASSERT3S(nthreads, >=, 0);
		ASSERT3S(nthreads, <=, 100);
		pct = MIN(nthreads, 100);
		pct = MAX(pct, 0);

		nthreads = (sysconf(_SC_NPROCESSORS_ONLN) * pct) / 100;
		nthreads = MAX(nthreads, 1);	/* need at least 1 thread */
	} else {
		ASSERT3S(nthreads, >=, 1);
	}

	rw_init(&tq->tq_threadlock, NULL, RW_DEFAULT, NULL);
	mutex_init(&tq->tq_lock, NULL, MUTEX_DEFAULT, NULL);
	cv_init(&tq->tq_dispatch_cv, NULL, CV_DEFAULT, NULL);
	cv_init(&tq->tq_wait_cv, NULL, CV_DEFAULT, NULL);
	cv_init(&tq->tq_maxalloc_cv, NULL, CV_DEFAULT, NULL);
	(void) strlcpy(tq->tq_name, name, sizeof (tq->tq_name));
	tq->tq_flags = flags | TASKQ_ACTIVE;
	tq->tq_active = nthreads;
	tq->tq_nthreads = nthreads;
	tq->tq_minalloc = minalloc;
	tq->tq_maxalloc = maxalloc;
	tq->tq_task.tqent_next = &tq->tq_task;
	tq->tq_task.tqent_prev = &tq->tq_task;
	tq->tq_threadlist = kmem_alloc(nthreads * sizeof (kthread_t *),
	    KM_SLEEP);

	if (flags & TASKQ_PREPOPULATE) {
		mutex_enter(&tq->tq_lock);
		while (minalloc-- > 0)
			task_free(tq, task_alloc(tq, KM_SLEEP));
		mutex_exit(&tq->tq_lock);
	}

	for (t = 0; t < nthreads; t++)
		VERIFY((tq->tq_threadlist[t] = thread_create_named(tq->tq_name,
		    NULL, 0, taskq_thread, tq, 0, &p0, TS_RUN, pri)) != NULL);

	return (tq);
}
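
/*
 * Worked example of the TASKQ_THREADS_CPU_PCT path above: nthreads is
 * interpreted as a percentage of online CPUs, so nthreads = 75 on a
 * machine where sysconf(_SC_NPROCESSORS_ONLN) returns 8 yields
 * (8 * 75) / 100 = 6 pool threads, and any percentage that rounds
 * down to zero is clamped to a single thread.
 */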

void
taskq_destroy(taskq_t *tq)
{
	int nthreads = tq->tq_nthreads;

	taskq_wait(tq);

	mutex_enter(&tq->tq_lock);

	tq->tq_flags &= ~TASKQ_ACTIVE;
	cv_broadcast(&tq->tq_dispatch_cv);

	while (tq->tq_nthreads != 0)
		cv_wait(&tq->tq_wait_cv, &tq->tq_lock);

	tq->tq_minalloc = 0;
	while (tq->tq_nalloc != 0) {
		ASSERT(tq->tq_freelist != NULL);
		taskq_ent_t *tqent_nexttq = tq->tq_freelist->tqent_next;
		task_free(tq, tq->tq_freelist);
		tq->tq_freelist = tqent_nexttq;
	}

	mutex_exit(&tq->tq_lock);

	kmem_free(tq->tq_threadlist, nthreads * sizeof (kthread_t *));

	rw_destroy(&tq->tq_threadlock);
	mutex_destroy(&tq->tq_lock);
	cv_destroy(&tq->tq_dispatch_cv);
	cv_destroy(&tq->tq_wait_cv);
	cv_destroy(&tq->tq_maxalloc_cv);

	kmem_free(tq, sizeof (taskq_t));
}

/*
 * Create a taskq with a specified number of pool threads.  Allocate
 * and return an array of nthreads kthread_t pointers, one for each
 * thread in the pool.  The array is not ordered and must be freed
 * by the caller.  A usage sketch appears at the end of this file.
 */
taskq_t *
taskq_create_synced(const char *name, int nthreads, pri_t pri,
    int minalloc, int maxalloc, uint_t flags, kthread_t ***ktpp)
{
	taskq_t *tq;
	kthread_t **kthreads = kmem_zalloc(sizeof (*kthreads) * nthreads,
	    KM_SLEEP);

	(void) pri; (void) minalloc; (void) maxalloc;

	flags &= ~(TASKQ_DYNAMIC | TASKQ_THREADS_CPU_PCT | TASKQ_DC_BATCH);

	tq = taskq_create(name, nthreads, minclsyspri, nthreads, INT_MAX,
	    flags | TASKQ_PREPOPULATE);
	VERIFY(tq != NULL);
	VERIFY(tq->tq_nthreads == nthreads);

	for (int i = 0; i < nthreads; i++) {
		kthreads[i] = tq->tq_threadlist[i];
	}
	*ktpp = kthreads;
	return (tq);
}

int
taskq_member(taskq_t *tq, kthread_t *t)
{
	int i;

	if (taskq_now)
		return (1);

	for (i = 0; i < tq->tq_nthreads; i++)
		if (tq->tq_threadlist[i] == t)
			return (1);

	return (0);
}

taskq_t *
taskq_of_curthread(void)
{
	return (pthread_getspecific(taskq_tsd));
}

int
taskq_cancel_id(taskq_t *tq, taskqid_t id)
{
	(void) tq, (void) id;
	return (ENOENT);
}

void
system_taskq_init(void)
{
	VERIFY0(pthread_key_create(&taskq_tsd, NULL));
	system_taskq = taskq_create("system_taskq", 64, maxclsyspri, 4, 512,
	    TASKQ_DYNAMIC | TASKQ_PREPOPULATE);
	system_delay_taskq = taskq_create("delay_taskq", 4, maxclsyspri, 4,
	    512, TASKQ_DYNAMIC | TASKQ_PREPOPULATE);
}

void
system_taskq_fini(void)
{
	taskq_destroy(system_taskq);
	system_taskq = NULL; /* defensive */
	taskq_destroy(system_delay_taskq);
	system_delay_taskq = NULL;
	VERIFY0(pthread_key_delete(taskq_tsd));
}
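
/*
 * Usage sketch (illustrative only, not compiled) for
 * taskq_create_synced(), referenced from its comment block above: the
 * caller receives the pool's thread pointers and owns that array.
 * The names below are hypothetical.
 *
 *	kthread_t **threads;
 *	taskq_t *tq = taskq_create_synced("synced_tq", 4, minclsyspri,
 *	    0, 0, 0, &threads);
 *
 *	... use taskq_member(tq, threads[i]) etc. ...
 *
 *	kmem_free(threads, 4 * sizeof (kthread_t *));
 *	taskq_destroy(tq);
 */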