/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License (the "License").
 * You may not use this file except in compliance with the License.
 *
 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
 * or http://www.opensolaris.org/os/licensing.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
 * If applicable, add the following below this CDDL HEADER, with the
 * fields enclosed by brackets "[]" replaced with your own identifying
 * information: Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 */
/*
 * Copyright 2010 Sun Microsystems, Inc.  All rights reserved.
 * Use is subject to license terms.
 */
/*
 * Copyright 2012 Garrett D'Amore <garrett@damore.org>.  All rights reserved.
 * Copyright 2013 Nexenta Systems, Inc.  All rights reserved.
 */

#include <sys/taskq_impl.h>

#include <sys/class.h>
#include <sys/debug.h>
#include <sys/ksynch.h>
#include <sys/kmem.h>
#include <sys/time.h>
#include <sys/systm.h>
#include <sys/sysmacros.h>
#include <sys/unistd.h>

/* avoid <unistd.h> */
extern long sysconf(int);

/* avoiding <thread.h> */
typedef unsigned int thread_t;
typedef unsigned int thread_key_t;

extern int thr_create(void *, size_t, void *(*)(void *), void *, long,
	thread_t *);
extern int thr_join(thread_t, thread_t *, void **);

/*
 * POSIX.1c Note:
 * THR_BOUND is defined same as PTHREAD_SCOPE_SYSTEM in <pthread.h>
 * THR_DETACHED is defined same as PTHREAD_CREATE_DETACHED in <pthread.h>
 * Any changes in these definitions should be reflected in <pthread.h>
 */
#define	THR_BOUND	0x00000001	/* = PTHREAD_SCOPE_SYSTEM */
#define	THR_NEW_LWP	0x00000002
#define	THR_DETACHED	0x00000040	/* = PTHREAD_CREATE_DETACHED */
#define	THR_SUSPENDED	0x00000080
#define	THR_DAEMON	0x00000100


int taskq_now;
taskq_t *system_taskq;

#define	TASKQ_ACTIVE	0x00010000

struct taskq {
	kmutex_t	tq_lock;
	krwlock_t	tq_threadlock;
	kcondvar_t	tq_dispatch_cv;
	kcondvar_t	tq_wait_cv;
	thread_t	*tq_threadlist;
	int		tq_flags;
	int		tq_active;
	int		tq_nthreads;
	int		tq_nalloc;
	int		tq_minalloc;
	int		tq_maxalloc;
	kcondvar_t	tq_maxalloc_cv;
	int		tq_maxalloc_wait;
	taskq_ent_t	*tq_freelist;
	taskq_ent_t	tq_task;
};

static taskq_ent_t *
task_alloc(taskq_t *tq, int tqflags)
{
	taskq_ent_t *t;
	int rv;

again:	if ((t = tq->tq_freelist) != NULL && tq->tq_nalloc >= tq->tq_minalloc) {
		tq->tq_freelist = t->tqent_next;
	} else {
		if (tq->tq_nalloc >= tq->tq_maxalloc) {
			if (!(tqflags & KM_SLEEP))
				return (NULL);

			/*
			 * We don't want to exceed tq_maxalloc, but we can't
			 * wait for other tasks to complete (and thus free up
			 * task structures) without risking deadlock with
			 * the caller.  So, we just delay for one second
			 * to throttle the allocation rate.  If tasks complete
			 * before the one-second timeout expires, task_free()
			 * will signal us and we will immediately retry the
			 * allocation.
			 */
			tq->tq_maxalloc_wait++;
			rv = cv_timedwait(&tq->tq_maxalloc_cv,
			    &tq->tq_lock, ddi_get_lbolt() + hz);
			tq->tq_maxalloc_wait--;
			if (rv > 0)
				goto again;		/* signaled */
		}
		mutex_exit(&tq->tq_lock);

		t = kmem_alloc(sizeof (taskq_ent_t), tqflags);

		mutex_enter(&tq->tq_lock);
		if (t != NULL)
			tq->tq_nalloc++;
	}
	return (t);
}

static void
task_free(taskq_t *tq, taskq_ent_t *t)
{
	if (tq->tq_nalloc <= tq->tq_minalloc) {
		t->tqent_next = tq->tq_freelist;
		tq->tq_freelist = t;
	} else {
		tq->tq_nalloc--;
		mutex_exit(&tq->tq_lock);
		kmem_free(t, sizeof (taskq_ent_t));
		mutex_enter(&tq->tq_lock);
	}

	if (tq->tq_maxalloc_wait)
		cv_signal(&tq->tq_maxalloc_cv);
}

taskqid_t
taskq_dispatch(taskq_t *tq, task_func_t func, void *arg, uint_t tqflags)
{
	taskq_ent_t *t;

	if (taskq_now) {
		func(arg);
		return (1);
	}

	mutex_enter(&tq->tq_lock);
	ASSERT(tq->tq_flags & TASKQ_ACTIVE);
	if ((t = task_alloc(tq, tqflags)) == NULL) {
		mutex_exit(&tq->tq_lock);
		return (0);
	}
	if (tqflags & TQ_FRONT) {
		t->tqent_next = tq->tq_task.tqent_next;
		t->tqent_prev = &tq->tq_task;
	} else {
		t->tqent_next = &tq->tq_task;
		t->tqent_prev = tq->tq_task.tqent_prev;
	}
	t->tqent_next->tqent_prev = t;
	t->tqent_prev->tqent_next = t;
	t->tqent_func = func;
	t->tqent_arg = arg;
	t->tqent_flags = 0;
	cv_signal(&tq->tq_dispatch_cv);
	mutex_exit(&tq->tq_lock);
	return (1);
}
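
/*
 * Usage sketch (illustrative only; not part of the original file).  The
 * names my_func, my_arg, and my_fallback are hypothetical.  A TQ_SLEEP
 * dispatch blocks in task_alloc() until an entry is available and so
 * should not fail; a TQ_NOSLEEP dispatch returns 0 when no task entry
 * can be allocated, and the caller must handle that case itself:
 *
 *	if (taskq_dispatch(tq, my_func, my_arg, TQ_NOSLEEP) == 0)
 *		my_fallback(my_arg);
 */
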
void
taskq_dispatch_ent(taskq_t *tq, task_func_t func, void *arg, uint_t flags,
    taskq_ent_t *t)
{
	ASSERT(func != NULL);
	ASSERT(!(tq->tq_flags & TASKQ_DYNAMIC));

	/*
	 * Mark it as a prealloc'd task.  This is important
	 * to ensure that we don't free it later.
	 */
	t->tqent_flags |= TQENT_FLAG_PREALLOC;
	/*
	 * Enqueue the task to the underlying queue.
	 */
	mutex_enter(&tq->tq_lock);

	if (flags & TQ_FRONT) {
		t->tqent_next = tq->tq_task.tqent_next;
		t->tqent_prev = &tq->tq_task;
	} else {
		t->tqent_next = &tq->tq_task;
		t->tqent_prev = tq->tq_task.tqent_prev;
	}
	t->tqent_next->tqent_prev = t;
	t->tqent_prev->tqent_next = t;
	t->tqent_func = func;
	t->tqent_arg = arg;
	cv_signal(&tq->tq_dispatch_cv);
	mutex_exit(&tq->tq_lock);
}
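
/*
 * Usage sketch for taskq_dispatch_ent() (illustrative only; my_job_t and
 * my_func are hypothetical names, not part of this file).  Because the
 * caller supplies the taskq_ent_t, typically embedded in its own
 * structure, the dispatch itself performs no allocation and cannot fail;
 * the entry's flags must start out zeroed:
 *
 *	typedef struct my_job {
 *		taskq_ent_t	mj_tqent;
 *		void		*mj_data;
 *	} my_job_t;
 *
 *	my_job_t *job = kmem_zalloc(sizeof (my_job_t), KM_SLEEP);
 *	taskq_dispatch_ent(tq, my_func, job, 0, &job->mj_tqent);
 */
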
void
taskq_wait(taskq_t *tq)
{
	mutex_enter(&tq->tq_lock);
	while (tq->tq_task.tqent_next != &tq->tq_task || tq->tq_active != 0)
		cv_wait(&tq->tq_wait_cv, &tq->tq_lock);
	mutex_exit(&tq->tq_lock);
}

static void *
taskq_thread(void *arg)
{
	taskq_t *tq = arg;
	taskq_ent_t *t;
	boolean_t prealloc;

	mutex_enter(&tq->tq_lock);
	while (tq->tq_flags & TASKQ_ACTIVE) {
		if ((t = tq->tq_task.tqent_next) == &tq->tq_task) {
			if (--tq->tq_active == 0)
				cv_broadcast(&tq->tq_wait_cv);
			cv_wait(&tq->tq_dispatch_cv, &tq->tq_lock);
			tq->tq_active++;
			continue;
		}
		t->tqent_prev->tqent_next = t->tqent_next;
		t->tqent_next->tqent_prev = t->tqent_prev;
		t->tqent_next = NULL;
		t->tqent_prev = NULL;
		prealloc = t->tqent_flags & TQENT_FLAG_PREALLOC;
		mutex_exit(&tq->tq_lock);

		rw_enter(&tq->tq_threadlock, RW_READER);
		t->tqent_func(t->tqent_arg);
		rw_exit(&tq->tq_threadlock);

		mutex_enter(&tq->tq_lock);
		if (!prealloc)
			task_free(tq, t);
	}
	tq->tq_nthreads--;
	cv_broadcast(&tq->tq_wait_cv);
	mutex_exit(&tq->tq_lock);
	return (NULL);
}

/*ARGSUSED*/
taskq_t *
taskq_create(const char *name, int nthr, pri_t pri, int minalloc,
    int maxalloc, uint_t flags)
{
	return (taskq_create_proc(name, nthr, pri,
	    minalloc, maxalloc, NULL, flags));
}

/*ARGSUSED*/
taskq_t *
taskq_create_proc(const char *name, int nthreads, pri_t pri,
    int minalloc, int maxalloc, proc_t *proc, uint_t flags)
{
	taskq_t *tq = kmem_zalloc(sizeof (taskq_t), KM_SLEEP);
	int t;

	if (flags & TASKQ_THREADS_CPU_PCT) {
		int pct;
		ASSERT3S(nthreads, >=, 0);
		ASSERT3S(nthreads, <=, 100);
		pct = MIN(nthreads, 100);
		pct = MAX(pct, 0);

		nthreads = (sysconf(_SC_NPROCESSORS_ONLN) * pct) / 100;
		nthreads = MAX(nthreads, 1);	/* need at least 1 thread */
	} else {
		ASSERT3S(nthreads, >=, 1);
	}

	rw_init(&tq->tq_threadlock, NULL, RW_DEFAULT, NULL);
	mutex_init(&tq->tq_lock, NULL, MUTEX_DEFAULT, NULL);
	cv_init(&tq->tq_dispatch_cv, NULL, CV_DEFAULT, NULL);
	cv_init(&tq->tq_wait_cv, NULL, CV_DEFAULT, NULL);
	cv_init(&tq->tq_maxalloc_cv, NULL, CV_DEFAULT, NULL);
	tq->tq_flags = flags | TASKQ_ACTIVE;
	tq->tq_active = nthreads;
	tq->tq_nthreads = nthreads;
	tq->tq_minalloc = minalloc;
	tq->tq_maxalloc = maxalloc;
	tq->tq_task.tqent_next = &tq->tq_task;
	tq->tq_task.tqent_prev = &tq->tq_task;
	tq->tq_threadlist = kmem_alloc(nthreads * sizeof (thread_t), KM_SLEEP);

	if (flags & TASKQ_PREPOPULATE) {
		mutex_enter(&tq->tq_lock);
		while (minalloc-- > 0)
			task_free(tq, task_alloc(tq, KM_SLEEP));
		mutex_exit(&tq->tq_lock);
	}

	for (t = 0; t < nthreads; t++)
		(void) thr_create(0, 0, taskq_thread,
		    tq, THR_BOUND, &tq->tq_threadlist[t]);

	return (tq);
}
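
/*
 * Lifecycle sketch (illustrative only; the queue name, thread count, and
 * alloc bounds below are arbitrary, and my_func/my_arg are hypothetical).
 * Every queue created with taskq_create() should eventually be torn down
 * with taskq_destroy(), which drains pending work and joins the worker
 * threads:
 *
 *	taskq_t *tq = taskq_create("my_taskq", 4, minclsyspri, 2, 8,
 *	    TASKQ_PREPOPULATE);
 *	(void) taskq_dispatch(tq, my_func, my_arg, TQ_SLEEP);
 *	taskq_destroy(tq);
 */
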
void
taskq_destroy(taskq_t *tq)
{
	int t;
	int nthreads = tq->tq_nthreads;

	taskq_wait(tq);

	mutex_enter(&tq->tq_lock);

	tq->tq_flags &= ~TASKQ_ACTIVE;
	cv_broadcast(&tq->tq_dispatch_cv);

	while (tq->tq_nthreads != 0)
		cv_wait(&tq->tq_wait_cv, &tq->tq_lock);

	tq->tq_minalloc = 0;
	while (tq->tq_nalloc != 0) {
		ASSERT(tq->tq_freelist != NULL);
		task_free(tq, task_alloc(tq, KM_SLEEP));
	}

	mutex_exit(&tq->tq_lock);

	for (t = 0; t < nthreads; t++)
		(void) thr_join(tq->tq_threadlist[t], NULL, NULL);

	kmem_free(tq->tq_threadlist, nthreads * sizeof (thread_t));

	rw_destroy(&tq->tq_threadlock);
	mutex_destroy(&tq->tq_lock);
	cv_destroy(&tq->tq_dispatch_cv);
	cv_destroy(&tq->tq_wait_cv);
	cv_destroy(&tq->tq_maxalloc_cv);

	kmem_free(tq, sizeof (taskq_t));
}

int
taskq_member(taskq_t *tq, struct _kthread *t)
{
	int i;

	if (taskq_now)
		return (1);

	for (i = 0; i < tq->tq_nthreads; i++)
		if (tq->tq_threadlist[i] == (thread_t)(uintptr_t)t)
			return (1);

	return (0);
}

void
system_taskq_init(void)
{
	system_taskq = taskq_create("system_taskq", 64, minclsyspri, 4, 512,
	    TASKQ_DYNAMIC | TASKQ_PREPOPULATE);
}

void
system_taskq_fini(void)
{
	taskq_destroy(system_taskq);
	system_taskq = NULL; /* defensive */
}
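
/*
 * Usage sketch (illustrative only): taskq_member() is typically used in
 * assertions to check whether the caller is running on one of the queue's
 * worker threads.  The curthread handle below is assumed to come from the
 * consumer's environment (e.g. libzpool's zfs_context.h), not this file:
 *
 *	ASSERT(taskq_member(tq, curthread));
 */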