1 /* 2 * CDDL HEADER START 3 * 4 * The contents of this file are subject to the terms of the 5 * Common Development and Distribution License (the "License"). 6 * You may not use this file except in compliance with the License. 7 * 8 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE 9 * or http://www.opensolaris.org/os/licensing. 10 * See the License for the specific language governing permissions 11 * and limitations under the License. 12 * 13 * When distributing Covered Code, include this CDDL HEADER in each 14 * file and include the License file at usr/src/OPENSOLARIS.LICENSE. 15 * If applicable, add the following below this CDDL HEADER, with the 16 * fields enclosed by brackets "[]" replaced with your own identifying 17 * information: Portions Copyright [yyyy] [name of copyright owner] 18 * 19 * CDDL HEADER END 20 */ 21 /* 22 * Copyright 2010 Sun Microsystems, Inc. All rights reserved. 23 * Use is subject to license terms. 24 */ 25 /* 26 * Copyright 2012 Garrett D'Amore <garrett@damore.org>. All rights reserved. 27 * Copyright 2013 Nexenta Systems, Inc. All rights reserved. 28 * Copyright 2017 RackTop Systems. 29 * Copyright 2018, Joyent, Inc. 30 */ 31 32 #include <sys/taskq_impl.h> 33 34 #include <sys/class.h> 35 #include <sys/debug.h> 36 #include <sys/ksynch.h> 37 #include <sys/kmem.h> 38 #include <sys/time.h> 39 #include <sys/systm.h> 40 #include <sys/sysmacros.h> 41 #include <sys/unistd.h> 42 43 /* avoid <sys/disp.h> */ 44 #define maxclsyspri 99 45 46 /* avoid <unistd.h> */ 47 extern long sysconf(int); 48 49 /* avoiding <thread.h> */ 50 typedef unsigned int thread_t; 51 typedef unsigned int thread_key_t; 52 53 extern int thr_create(void *, size_t, void *(*)(void *), void *, long, 54 thread_t *); 55 extern int thr_join(thread_t, thread_t *, void **); 56 57 /* 58 * POSIX.1c Note: 59 * THR_BOUND is defined same as PTHREAD_SCOPE_SYSTEM in <pthread.h> 60 * THR_DETACHED is defined same as PTHREAD_CREATE_DETACHED in <pthread.h> 61 * Any changes in these definitions should be reflected in <pthread.h> 62 */ 63 #define THR_BOUND 0x00000001 /* = PTHREAD_SCOPE_SYSTEM */ 64 #define THR_NEW_LWP 0x00000002 65 #define THR_DETACHED 0x00000040 /* = PTHREAD_CREATE_DETACHED */ 66 #define THR_SUSPENDED 0x00000080 67 #define THR_DAEMON 0x00000100 68 69 70 int taskq_now; 71 taskq_t *system_taskq; 72 73 #define TASKQ_ACTIVE 0x00010000 74 75 struct taskq { 76 kmutex_t tq_lock; 77 krwlock_t tq_threadlock; 78 kcondvar_t tq_dispatch_cv; 79 kcondvar_t tq_wait_cv; 80 thread_t *tq_threadlist; 81 int tq_flags; 82 int tq_active; 83 int tq_nthreads; 84 int tq_nalloc; 85 int tq_minalloc; 86 int tq_maxalloc; 87 kcondvar_t tq_maxalloc_cv; 88 int tq_maxalloc_wait; 89 taskq_ent_t *tq_freelist; 90 taskq_ent_t tq_task; 91 }; 92 93 static taskq_ent_t * 94 task_alloc(taskq_t *tq, int tqflags) 95 { 96 taskq_ent_t *t; 97 int rv; 98 99 again: if ((t = tq->tq_freelist) != NULL && tq->tq_nalloc >= tq->tq_minalloc) { 100 tq->tq_freelist = t->tqent_next; 101 } else { 102 if (tq->tq_nalloc >= tq->tq_maxalloc) { 103 if (!(tqflags & KM_SLEEP)) 104 return (NULL); 105 106 /* 107 * We don't want to exceed tq_maxalloc, but we can't 108 * wait for other tasks to complete (and thus free up 109 * task structures) without risking deadlock with 110 * the caller. So, we just delay for one second 111 * to throttle the allocation rate. If we have tasks 112 * complete before one second timeout expires then 113 * taskq_ent_free will signal us and we will 114 * immediately retry the allocation. 115 */ 116 tq->tq_maxalloc_wait++; 117 rv = cv_timedwait(&tq->tq_maxalloc_cv, 118 &tq->tq_lock, ddi_get_lbolt() + hz); 119 tq->tq_maxalloc_wait--; 120 if (rv > 0) 121 goto again; /* signaled */ 122 } 123 mutex_exit(&tq->tq_lock); 124 125 t = kmem_alloc(sizeof (taskq_ent_t), tqflags); 126 127 mutex_enter(&tq->tq_lock); 128 if (t != NULL) 129 tq->tq_nalloc++; 130 } 131 return (t); 132 } 133 134 static void 135 task_free(taskq_t *tq, taskq_ent_t *t) 136 { 137 if (tq->tq_nalloc <= tq->tq_minalloc) { 138 t->tqent_next = tq->tq_freelist; 139 tq->tq_freelist = t; 140 } else { 141 tq->tq_nalloc--; 142 mutex_exit(&tq->tq_lock); 143 kmem_free(t, sizeof (taskq_ent_t)); 144 mutex_enter(&tq->tq_lock); 145 } 146 147 if (tq->tq_maxalloc_wait) 148 cv_signal(&tq->tq_maxalloc_cv); 149 } 150 151 taskqid_t 152 taskq_dispatch(taskq_t *tq, task_func_t func, void *arg, uint_t tqflags) 153 { 154 taskq_ent_t *t; 155 156 if (taskq_now) { 157 func(arg); 158 return (1); 159 } 160 161 mutex_enter(&tq->tq_lock); 162 ASSERT(tq->tq_flags & TASKQ_ACTIVE); 163 if ((t = task_alloc(tq, tqflags)) == NULL) { 164 mutex_exit(&tq->tq_lock); 165 return (0); 166 } 167 if (tqflags & TQ_FRONT) { 168 t->tqent_next = tq->tq_task.tqent_next; 169 t->tqent_prev = &tq->tq_task; 170 } else { 171 t->tqent_next = &tq->tq_task; 172 t->tqent_prev = tq->tq_task.tqent_prev; 173 } 174 t->tqent_next->tqent_prev = t; 175 t->tqent_prev->tqent_next = t; 176 t->tqent_func = func; 177 t->tqent_arg = arg; 178 t->tqent_flags = 0; 179 cv_signal(&tq->tq_dispatch_cv); 180 mutex_exit(&tq->tq_lock); 181 return (1); 182 } 183 184 void 185 taskq_dispatch_ent(taskq_t *tq, task_func_t func, void *arg, uint_t flags, 186 taskq_ent_t *t) 187 { 188 ASSERT(func != NULL); 189 ASSERT(!(tq->tq_flags & TASKQ_DYNAMIC)); 190 191 /* 192 * Mark it as a prealloc'd task. This is important 193 * to ensure that we don't free it later. 194 */ 195 t->tqent_flags |= TQENT_FLAG_PREALLOC; 196 /* 197 * Enqueue the task to the underlying queue. 198 */ 199 mutex_enter(&tq->tq_lock); 200 201 if (flags & TQ_FRONT) { 202 t->tqent_next = tq->tq_task.tqent_next; 203 t->tqent_prev = &tq->tq_task; 204 } else { 205 t->tqent_next = &tq->tq_task; 206 t->tqent_prev = tq->tq_task.tqent_prev; 207 } 208 t->tqent_next->tqent_prev = t; 209 t->tqent_prev->tqent_next = t; 210 t->tqent_func = func; 211 t->tqent_arg = arg; 212 cv_signal(&tq->tq_dispatch_cv); 213 mutex_exit(&tq->tq_lock); 214 } 215 216 boolean_t 217 taskq_empty(taskq_t *tq) 218 { 219 boolean_t rv; 220 221 mutex_enter(&tq->tq_lock); 222 rv = (tq->tq_task.tqent_next == &tq->tq_task) && (tq->tq_active == 0); 223 mutex_exit(&tq->tq_lock); 224 225 return (rv); 226 } 227 228 void 229 taskq_wait(taskq_t *tq) 230 { 231 mutex_enter(&tq->tq_lock); 232 while (tq->tq_task.tqent_next != &tq->tq_task || tq->tq_active != 0) 233 cv_wait(&tq->tq_wait_cv, &tq->tq_lock); 234 mutex_exit(&tq->tq_lock); 235 } 236 237 static void * 238 taskq_thread(void *arg) 239 { 240 taskq_t *tq = arg; 241 taskq_ent_t *t; 242 boolean_t prealloc; 243 244 mutex_enter(&tq->tq_lock); 245 while (tq->tq_flags & TASKQ_ACTIVE) { 246 if ((t = tq->tq_task.tqent_next) == &tq->tq_task) { 247 if (--tq->tq_active == 0) 248 cv_broadcast(&tq->tq_wait_cv); 249 cv_wait(&tq->tq_dispatch_cv, &tq->tq_lock); 250 tq->tq_active++; 251 continue; 252 } 253 t->tqent_prev->tqent_next = t->tqent_next; 254 t->tqent_next->tqent_prev = t->tqent_prev; 255 t->tqent_next = NULL; 256 t->tqent_prev = NULL; 257 prealloc = t->tqent_flags & TQENT_FLAG_PREALLOC; 258 mutex_exit(&tq->tq_lock); 259 260 rw_enter(&tq->tq_threadlock, RW_READER); 261 t->tqent_func(t->tqent_arg); 262 rw_exit(&tq->tq_threadlock); 263 264 mutex_enter(&tq->tq_lock); 265 if (!prealloc) 266 task_free(tq, t); 267 } 268 tq->tq_nthreads--; 269 cv_broadcast(&tq->tq_wait_cv); 270 mutex_exit(&tq->tq_lock); 271 return (NULL); 272 } 273 274 /*ARGSUSED*/ 275 taskq_t * 276 taskq_create(const char *name, int nthr, pri_t pri, int minalloc, 277 int maxalloc, uint_t flags) 278 { 279 return (taskq_create_proc(name, nthr, pri, 280 minalloc, maxalloc, NULL, flags)); 281 } 282 283 /*ARGSUSED*/ 284 taskq_t * 285 taskq_create_sysdc(const char *name, int nthr, int minalloc, 286 int maxalloc, proc_t *proc, uint_t dc, uint_t flags) 287 { 288 return (taskq_create_proc(name, nthr, maxclsyspri, 289 minalloc, maxalloc, proc, flags)); 290 } 291 292 /*ARGSUSED*/ 293 taskq_t * 294 taskq_create_proc(const char *name, int nthreads, pri_t pri, 295 int minalloc, int maxalloc, proc_t *proc, uint_t flags) 296 { 297 taskq_t *tq = kmem_zalloc(sizeof (taskq_t), KM_SLEEP); 298 int t; 299 300 if (flags & TASKQ_THREADS_CPU_PCT) { 301 int pct; 302 ASSERT3S(nthreads, >=, 0); 303 ASSERT3S(nthreads, <=, 100); 304 pct = MIN(nthreads, 100); 305 pct = MAX(pct, 0); 306 307 nthreads = (sysconf(_SC_NPROCESSORS_ONLN) * pct) / 100; 308 nthreads = MAX(nthreads, 1); /* need at least 1 thread */ 309 } else { 310 ASSERT3S(nthreads, >=, 1); 311 } 312 313 rw_init(&tq->tq_threadlock, NULL, RW_DEFAULT, NULL); 314 mutex_init(&tq->tq_lock, NULL, MUTEX_DEFAULT, NULL); 315 cv_init(&tq->tq_dispatch_cv, NULL, CV_DEFAULT, NULL); 316 cv_init(&tq->tq_wait_cv, NULL, CV_DEFAULT, NULL); 317 cv_init(&tq->tq_maxalloc_cv, NULL, CV_DEFAULT, NULL); 318 tq->tq_flags = flags | TASKQ_ACTIVE; 319 tq->tq_active = nthreads; 320 tq->tq_nthreads = nthreads; 321 tq->tq_minalloc = minalloc; 322 tq->tq_maxalloc = maxalloc; 323 tq->tq_task.tqent_next = &tq->tq_task; 324 tq->tq_task.tqent_prev = &tq->tq_task; 325 tq->tq_threadlist = kmem_alloc(nthreads * sizeof (thread_t), KM_SLEEP); 326 327 if (flags & TASKQ_PREPOPULATE) { 328 mutex_enter(&tq->tq_lock); 329 while (minalloc-- > 0) 330 task_free(tq, task_alloc(tq, KM_SLEEP)); 331 mutex_exit(&tq->tq_lock); 332 } 333 334 for (t = 0; t < nthreads; t++) 335 (void) thr_create(0, 0, taskq_thread, 336 tq, THR_BOUND, &tq->tq_threadlist[t]); 337 338 return (tq); 339 } 340 341 void 342 taskq_destroy(taskq_t *tq) 343 { 344 int t; 345 int nthreads = tq->tq_nthreads; 346 347 taskq_wait(tq); 348 349 mutex_enter(&tq->tq_lock); 350 351 tq->tq_flags &= ~TASKQ_ACTIVE; 352 cv_broadcast(&tq->tq_dispatch_cv); 353 354 while (tq->tq_nthreads != 0) 355 cv_wait(&tq->tq_wait_cv, &tq->tq_lock); 356 357 tq->tq_minalloc = 0; 358 while (tq->tq_nalloc != 0) { 359 ASSERT(tq->tq_freelist != NULL); 360 task_free(tq, task_alloc(tq, KM_SLEEP)); 361 } 362 363 mutex_exit(&tq->tq_lock); 364 365 for (t = 0; t < nthreads; t++) 366 (void) thr_join(tq->tq_threadlist[t], NULL, NULL); 367 368 kmem_free(tq->tq_threadlist, nthreads * sizeof (thread_t)); 369 370 rw_destroy(&tq->tq_threadlock); 371 mutex_destroy(&tq->tq_lock); 372 cv_destroy(&tq->tq_dispatch_cv); 373 cv_destroy(&tq->tq_wait_cv); 374 cv_destroy(&tq->tq_maxalloc_cv); 375 376 kmem_free(tq, sizeof (taskq_t)); 377 } 378 379 int 380 taskq_member(taskq_t *tq, struct _kthread *t) 381 { 382 int i; 383 384 if (taskq_now) 385 return (1); 386 387 for (i = 0; i < tq->tq_nthreads; i++) 388 if (tq->tq_threadlist[i] == (thread_t)(uintptr_t)t) 389 return (1); 390 391 return (0); 392 } 393 394 void 395 system_taskq_init(void) 396 { 397 system_taskq = taskq_create("system_taskq", 64, minclsyspri, 4, 512, 398 TASKQ_DYNAMIC | TASKQ_PREPOPULATE); 399 } 400 401 void 402 system_taskq_fini(void) 403 { 404 taskq_destroy(system_taskq); 405 system_taskq = NULL; /* defensive */ 406 } 407