1fa9e4066Sahrens /* 2fa9e4066Sahrens * CDDL HEADER START 3fa9e4066Sahrens * 4fa9e4066Sahrens * The contents of this file are subject to the terms of the 5c25056deSgw25295 * Common Development and Distribution License (the "License"). 6c25056deSgw25295 * You may not use this file except in compliance with the License. 7fa9e4066Sahrens * 8fa9e4066Sahrens * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE 9fa9e4066Sahrens * or http://www.opensolaris.org/os/licensing. 10fa9e4066Sahrens * See the License for the specific language governing permissions 11fa9e4066Sahrens * and limitations under the License. 12fa9e4066Sahrens * 13fa9e4066Sahrens * When distributing Covered Code, include this CDDL HEADER in each 14fa9e4066Sahrens * file and include the License file at usr/src/OPENSOLARIS.LICENSE. 15fa9e4066Sahrens * If applicable, add the following below this CDDL HEADER, with the 16fa9e4066Sahrens * fields enclosed by brackets "[]" replaced with your own identifying 17fa9e4066Sahrens * information: Portions Copyright [yyyy] [name of copyright owner] 18fa9e4066Sahrens * 19fa9e4066Sahrens * CDDL HEADER END 20fa9e4066Sahrens */ 21fa9e4066Sahrens /* 2264109744SChris Horne * Copyright 2010 Sun Microsystems, Inc. All rights reserved. 23fa9e4066Sahrens * Use is subject to license terms. 24fa9e4066Sahrens */ 255aeb9474SGarrett D'Amore /* 265aeb9474SGarrett D'Amore * Copyright 2011 Nexenta Systems, Inc. All rights reserved. 27*aa846ad9SGarrett D'Amore * Copyright 2012 Garrett D'Amore <garrett@damore.org>. All rights reserved. 285aeb9474SGarrett D'Amore */ 29fa9e4066Sahrens 30fa9e4066Sahrens #include <sys/zfs_context.h> 31fa9e4066Sahrens 32fa9e4066Sahrens int taskq_now; 3388b7b0f2SMatthew Ahrens taskq_t *system_taskq; 34fa9e4066Sahrens 35fa9e4066Sahrens #define TASKQ_ACTIVE 0x00010000 36fa9e4066Sahrens 37fa9e4066Sahrens struct taskq { 38fa9e4066Sahrens kmutex_t tq_lock; 39fa9e4066Sahrens krwlock_t tq_threadlock; 40fa9e4066Sahrens kcondvar_t tq_dispatch_cv; 41fa9e4066Sahrens kcondvar_t tq_wait_cv; 42fa9e4066Sahrens thread_t *tq_threadlist; 43fa9e4066Sahrens int tq_flags; 44fa9e4066Sahrens int tq_active; 45fa9e4066Sahrens int tq_nthreads; 46fa9e4066Sahrens int tq_nalloc; 47fa9e4066Sahrens int tq_minalloc; 48fa9e4066Sahrens int tq_maxalloc; 4964109744SChris Horne kcondvar_t tq_maxalloc_cv; 5064109744SChris Horne int tq_maxalloc_wait; 515aeb9474SGarrett D'Amore taskq_ent_t *tq_freelist; 525aeb9474SGarrett D'Amore taskq_ent_t tq_task; 53fa9e4066Sahrens }; 54fa9e4066Sahrens 555aeb9474SGarrett D'Amore static taskq_ent_t * 56fa9e4066Sahrens task_alloc(taskq_t *tq, int tqflags) 57fa9e4066Sahrens { 585aeb9474SGarrett D'Amore taskq_ent_t *t; 5964109744SChris Horne int rv; 60fa9e4066Sahrens 6164109744SChris Horne again: if ((t = tq->tq_freelist) != NULL && tq->tq_nalloc >= tq->tq_minalloc) { 625aeb9474SGarrett D'Amore tq->tq_freelist = t->tqent_next; 63fa9e4066Sahrens } else { 64fa9e4066Sahrens if (tq->tq_nalloc >= tq->tq_maxalloc) { 6564109744SChris Horne if (!(tqflags & KM_SLEEP)) 66fa9e4066Sahrens return (NULL); 6764109744SChris Horne 68fa9e4066Sahrens /* 69fa9e4066Sahrens * We don't want to exceed tq_maxalloc, but we can't 70fa9e4066Sahrens * wait for other tasks to complete (and thus free up 71fa9e4066Sahrens * task structures) without risking deadlock with 72fa9e4066Sahrens * the caller. So, we just delay for one second 7364109744SChris Horne * to throttle the allocation rate. If we have tasks 7464109744SChris Horne * complete before one second timeout expires then 7564109744SChris Horne * taskq_ent_free will signal us and we will 7664109744SChris Horne * immediately retry the allocation. 77fa9e4066Sahrens */ 7864109744SChris Horne tq->tq_maxalloc_wait++; 7964109744SChris Horne rv = cv_timedwait(&tq->tq_maxalloc_cv, 8064109744SChris Horne &tq->tq_lock, ddi_get_lbolt() + hz); 8164109744SChris Horne tq->tq_maxalloc_wait--; 8264109744SChris Horne if (rv > 0) 8364109744SChris Horne goto again; /* signaled */ 84fa9e4066Sahrens } 8564109744SChris Horne mutex_exit(&tq->tq_lock); 8664109744SChris Horne 875aeb9474SGarrett D'Amore t = kmem_alloc(sizeof (taskq_ent_t), tqflags); 8864109744SChris Horne 89fa9e4066Sahrens mutex_enter(&tq->tq_lock); 90fa9e4066Sahrens if (t != NULL) 91fa9e4066Sahrens tq->tq_nalloc++; 92fa9e4066Sahrens } 93fa9e4066Sahrens return (t); 94fa9e4066Sahrens } 95fa9e4066Sahrens 96fa9e4066Sahrens static void 975aeb9474SGarrett D'Amore task_free(taskq_t *tq, taskq_ent_t *t) 98fa9e4066Sahrens { 99fa9e4066Sahrens if (tq->tq_nalloc <= tq->tq_minalloc) { 1005aeb9474SGarrett D'Amore t->tqent_next = tq->tq_freelist; 101fa9e4066Sahrens tq->tq_freelist = t; 102fa9e4066Sahrens } else { 103fa9e4066Sahrens tq->tq_nalloc--; 104fa9e4066Sahrens mutex_exit(&tq->tq_lock); 1055aeb9474SGarrett D'Amore kmem_free(t, sizeof (taskq_ent_t)); 106fa9e4066Sahrens mutex_enter(&tq->tq_lock); 107fa9e4066Sahrens } 10864109744SChris Horne 10964109744SChris Horne if (tq->tq_maxalloc_wait) 11064109744SChris Horne cv_signal(&tq->tq_maxalloc_cv); 111fa9e4066Sahrens } 112fa9e4066Sahrens 113fa9e4066Sahrens taskqid_t 114fa9e4066Sahrens taskq_dispatch(taskq_t *tq, task_func_t func, void *arg, uint_t tqflags) 115fa9e4066Sahrens { 1165aeb9474SGarrett D'Amore taskq_ent_t *t; 117fa9e4066Sahrens 118fa9e4066Sahrens if (taskq_now) { 119fa9e4066Sahrens func(arg); 120fa9e4066Sahrens return (1); 121fa9e4066Sahrens } 122fa9e4066Sahrens 123fa9e4066Sahrens mutex_enter(&tq->tq_lock); 124fa9e4066Sahrens ASSERT(tq->tq_flags & TASKQ_ACTIVE); 125fa9e4066Sahrens if ((t = task_alloc(tq, tqflags)) == NULL) { 126fa9e4066Sahrens mutex_exit(&tq->tq_lock); 127fa9e4066Sahrens return (0); 128fa9e4066Sahrens } 12935a5a358SJonathan Adams if (tqflags & TQ_FRONT) { 1305aeb9474SGarrett D'Amore t->tqent_next = tq->tq_task.tqent_next; 1315aeb9474SGarrett D'Amore t->tqent_prev = &tq->tq_task; 13235a5a358SJonathan Adams } else { 1335aeb9474SGarrett D'Amore t->tqent_next = &tq->tq_task; 1345aeb9474SGarrett D'Amore t->tqent_prev = tq->tq_task.tqent_prev; 13535a5a358SJonathan Adams } 1365aeb9474SGarrett D'Amore t->tqent_next->tqent_prev = t; 1375aeb9474SGarrett D'Amore t->tqent_prev->tqent_next = t; 1385aeb9474SGarrett D'Amore t->tqent_func = func; 1395aeb9474SGarrett D'Amore t->tqent_arg = arg; 140*aa846ad9SGarrett D'Amore t->tqent_flags = 0; 141fa9e4066Sahrens cv_signal(&tq->tq_dispatch_cv); 142fa9e4066Sahrens mutex_exit(&tq->tq_lock); 143fa9e4066Sahrens return (1); 144fa9e4066Sahrens } 145fa9e4066Sahrens 146fa9e4066Sahrens void 1475aeb9474SGarrett D'Amore taskq_dispatch_ent(taskq_t *tq, task_func_t func, void *arg, uint_t flags, 1485aeb9474SGarrett D'Amore taskq_ent_t *t) 1495aeb9474SGarrett D'Amore { 1505aeb9474SGarrett D'Amore ASSERT(func != NULL); 1515aeb9474SGarrett D'Amore ASSERT(!(tq->tq_flags & TASKQ_DYNAMIC)); 1525aeb9474SGarrett D'Amore 1535aeb9474SGarrett D'Amore /* 1545aeb9474SGarrett D'Amore * Mark it as a prealloc'd task. This is important 1555aeb9474SGarrett D'Amore * to ensure that we don't free it later. 1565aeb9474SGarrett D'Amore */ 1575aeb9474SGarrett D'Amore t->tqent_flags |= TQENT_FLAG_PREALLOC; 1585aeb9474SGarrett D'Amore /* 1595aeb9474SGarrett D'Amore * Enqueue the task to the underlying queue. 1605aeb9474SGarrett D'Amore */ 1615aeb9474SGarrett D'Amore mutex_enter(&tq->tq_lock); 1625aeb9474SGarrett D'Amore 1635aeb9474SGarrett D'Amore if (flags & TQ_FRONT) { 1645aeb9474SGarrett D'Amore t->tqent_next = tq->tq_task.tqent_next; 1655aeb9474SGarrett D'Amore t->tqent_prev = &tq->tq_task; 1665aeb9474SGarrett D'Amore } else { 1675aeb9474SGarrett D'Amore t->tqent_next = &tq->tq_task; 1685aeb9474SGarrett D'Amore t->tqent_prev = tq->tq_task.tqent_prev; 1695aeb9474SGarrett D'Amore } 1705aeb9474SGarrett D'Amore t->tqent_next->tqent_prev = t; 1715aeb9474SGarrett D'Amore t->tqent_prev->tqent_next = t; 1725aeb9474SGarrett D'Amore t->tqent_func = func; 1735aeb9474SGarrett D'Amore t->tqent_arg = arg; 1745aeb9474SGarrett D'Amore cv_signal(&tq->tq_dispatch_cv); 1755aeb9474SGarrett D'Amore mutex_exit(&tq->tq_lock); 1765aeb9474SGarrett D'Amore } 1775aeb9474SGarrett D'Amore 1785aeb9474SGarrett D'Amore void 179fa9e4066Sahrens taskq_wait(taskq_t *tq) 180fa9e4066Sahrens { 181fa9e4066Sahrens mutex_enter(&tq->tq_lock); 1825aeb9474SGarrett D'Amore while (tq->tq_task.tqent_next != &tq->tq_task || tq->tq_active != 0) 183fa9e4066Sahrens cv_wait(&tq->tq_wait_cv, &tq->tq_lock); 184fa9e4066Sahrens mutex_exit(&tq->tq_lock); 185fa9e4066Sahrens } 186fa9e4066Sahrens 187fa9e4066Sahrens static void * 188fa9e4066Sahrens taskq_thread(void *arg) 189fa9e4066Sahrens { 190fa9e4066Sahrens taskq_t *tq = arg; 1915aeb9474SGarrett D'Amore taskq_ent_t *t; 1925aeb9474SGarrett D'Amore boolean_t prealloc; 193fa9e4066Sahrens 194fa9e4066Sahrens mutex_enter(&tq->tq_lock); 195fa9e4066Sahrens while (tq->tq_flags & TASKQ_ACTIVE) { 1965aeb9474SGarrett D'Amore if ((t = tq->tq_task.tqent_next) == &tq->tq_task) { 197fa9e4066Sahrens if (--tq->tq_active == 0) 198fa9e4066Sahrens cv_broadcast(&tq->tq_wait_cv); 199fa9e4066Sahrens cv_wait(&tq->tq_dispatch_cv, &tq->tq_lock); 200fa9e4066Sahrens tq->tq_active++; 201fa9e4066Sahrens continue; 202fa9e4066Sahrens } 2035aeb9474SGarrett D'Amore t->tqent_prev->tqent_next = t->tqent_next; 2045aeb9474SGarrett D'Amore t->tqent_next->tqent_prev = t->tqent_prev; 2055aeb9474SGarrett D'Amore t->tqent_next = NULL; 2065aeb9474SGarrett D'Amore t->tqent_prev = NULL; 2075aeb9474SGarrett D'Amore prealloc = t->tqent_flags & TQENT_FLAG_PREALLOC; 208fa9e4066Sahrens mutex_exit(&tq->tq_lock); 209fa9e4066Sahrens 210fa9e4066Sahrens rw_enter(&tq->tq_threadlock, RW_READER); 2115aeb9474SGarrett D'Amore t->tqent_func(t->tqent_arg); 212fa9e4066Sahrens rw_exit(&tq->tq_threadlock); 213fa9e4066Sahrens 214fa9e4066Sahrens mutex_enter(&tq->tq_lock); 2155aeb9474SGarrett D'Amore if (!prealloc) 216fa9e4066Sahrens task_free(tq, t); 217fa9e4066Sahrens } 218fa9e4066Sahrens tq->tq_nthreads--; 219fa9e4066Sahrens cv_broadcast(&tq->tq_wait_cv); 220fa9e4066Sahrens mutex_exit(&tq->tq_lock); 221fa9e4066Sahrens return (NULL); 222fa9e4066Sahrens } 223fa9e4066Sahrens 224fa9e4066Sahrens /*ARGSUSED*/ 225fa9e4066Sahrens taskq_t * 226fa9e4066Sahrens taskq_create(const char *name, int nthreads, pri_t pri, 227fa9e4066Sahrens int minalloc, int maxalloc, uint_t flags) 228fa9e4066Sahrens { 229fa9e4066Sahrens taskq_t *tq = kmem_zalloc(sizeof (taskq_t), KM_SLEEP); 230fa9e4066Sahrens int t; 231fa9e4066Sahrens 2322e0c549eSJonathan Adams if (flags & TASKQ_THREADS_CPU_PCT) { 2332e0c549eSJonathan Adams int pct; 2342e0c549eSJonathan Adams ASSERT3S(nthreads, >=, 0); 2352e0c549eSJonathan Adams ASSERT3S(nthreads, <=, 100); 2362e0c549eSJonathan Adams pct = MIN(nthreads, 100); 2372e0c549eSJonathan Adams pct = MAX(pct, 0); 2382e0c549eSJonathan Adams 2392e0c549eSJonathan Adams nthreads = (sysconf(_SC_NPROCESSORS_ONLN) * pct) / 100; 2402e0c549eSJonathan Adams nthreads = MAX(nthreads, 1); /* need at least 1 thread */ 2412e0c549eSJonathan Adams } else { 2422e0c549eSJonathan Adams ASSERT3S(nthreads, >=, 1); 2432e0c549eSJonathan Adams } 2442e0c549eSJonathan Adams 245fa9e4066Sahrens rw_init(&tq->tq_threadlock, NULL, RW_DEFAULT, NULL); 246c25056deSgw25295 mutex_init(&tq->tq_lock, NULL, MUTEX_DEFAULT, NULL); 247c25056deSgw25295 cv_init(&tq->tq_dispatch_cv, NULL, CV_DEFAULT, NULL); 248c25056deSgw25295 cv_init(&tq->tq_wait_cv, NULL, CV_DEFAULT, NULL); 24964109744SChris Horne cv_init(&tq->tq_maxalloc_cv, NULL, CV_DEFAULT, NULL); 250fa9e4066Sahrens tq->tq_flags = flags | TASKQ_ACTIVE; 251fa9e4066Sahrens tq->tq_active = nthreads; 252fa9e4066Sahrens tq->tq_nthreads = nthreads; 253fa9e4066Sahrens tq->tq_minalloc = minalloc; 254fa9e4066Sahrens tq->tq_maxalloc = maxalloc; 2555aeb9474SGarrett D'Amore tq->tq_task.tqent_next = &tq->tq_task; 2565aeb9474SGarrett D'Amore tq->tq_task.tqent_prev = &tq->tq_task; 257fa9e4066Sahrens tq->tq_threadlist = kmem_alloc(nthreads * sizeof (thread_t), KM_SLEEP); 258fa9e4066Sahrens 259fa9e4066Sahrens if (flags & TASKQ_PREPOPULATE) { 260fa9e4066Sahrens mutex_enter(&tq->tq_lock); 261fa9e4066Sahrens while (minalloc-- > 0) 262fa9e4066Sahrens task_free(tq, task_alloc(tq, KM_SLEEP)); 263fa9e4066Sahrens mutex_exit(&tq->tq_lock); 264fa9e4066Sahrens } 265fa9e4066Sahrens 266fa9e4066Sahrens for (t = 0; t < nthreads; t++) 267fa9e4066Sahrens (void) thr_create(0, 0, taskq_thread, 268fa9e4066Sahrens tq, THR_BOUND, &tq->tq_threadlist[t]); 269fa9e4066Sahrens 270fa9e4066Sahrens return (tq); 271fa9e4066Sahrens } 272fa9e4066Sahrens 273fa9e4066Sahrens void 274fa9e4066Sahrens taskq_destroy(taskq_t *tq) 275fa9e4066Sahrens { 276fa9e4066Sahrens int t; 277fa9e4066Sahrens int nthreads = tq->tq_nthreads; 278fa9e4066Sahrens 279fa9e4066Sahrens taskq_wait(tq); 280fa9e4066Sahrens 281fa9e4066Sahrens mutex_enter(&tq->tq_lock); 282fa9e4066Sahrens 283fa9e4066Sahrens tq->tq_flags &= ~TASKQ_ACTIVE; 284fa9e4066Sahrens cv_broadcast(&tq->tq_dispatch_cv); 285fa9e4066Sahrens 286fa9e4066Sahrens while (tq->tq_nthreads != 0) 287fa9e4066Sahrens cv_wait(&tq->tq_wait_cv, &tq->tq_lock); 288fa9e4066Sahrens 289fa9e4066Sahrens tq->tq_minalloc = 0; 290fa9e4066Sahrens while (tq->tq_nalloc != 0) { 291fa9e4066Sahrens ASSERT(tq->tq_freelist != NULL); 292fa9e4066Sahrens task_free(tq, task_alloc(tq, KM_SLEEP)); 293fa9e4066Sahrens } 294fa9e4066Sahrens 295fa9e4066Sahrens mutex_exit(&tq->tq_lock); 296fa9e4066Sahrens 297fa9e4066Sahrens for (t = 0; t < nthreads; t++) 298fa9e4066Sahrens (void) thr_join(tq->tq_threadlist[t], NULL, NULL); 299fa9e4066Sahrens 300fa9e4066Sahrens kmem_free(tq->tq_threadlist, nthreads * sizeof (thread_t)); 301fa9e4066Sahrens 302fa9e4066Sahrens rw_destroy(&tq->tq_threadlock); 303c25056deSgw25295 mutex_destroy(&tq->tq_lock); 304c25056deSgw25295 cv_destroy(&tq->tq_dispatch_cv); 305c25056deSgw25295 cv_destroy(&tq->tq_wait_cv); 30664109744SChris Horne cv_destroy(&tq->tq_maxalloc_cv); 307fa9e4066Sahrens 308fa9e4066Sahrens kmem_free(tq, sizeof (taskq_t)); 309fa9e4066Sahrens } 310fa9e4066Sahrens 311fa9e4066Sahrens int 312fa9e4066Sahrens taskq_member(taskq_t *tq, void *t) 313fa9e4066Sahrens { 314fa9e4066Sahrens int i; 315fa9e4066Sahrens 316fa9e4066Sahrens if (taskq_now) 317fa9e4066Sahrens return (1); 318fa9e4066Sahrens 319fa9e4066Sahrens for (i = 0; i < tq->tq_nthreads; i++) 320fa9e4066Sahrens if (tq->tq_threadlist[i] == (thread_t)(uintptr_t)t) 321fa9e4066Sahrens return (1); 322fa9e4066Sahrens 323fa9e4066Sahrens return (0); 324fa9e4066Sahrens } 32588b7b0f2SMatthew Ahrens 32688b7b0f2SMatthew Ahrens void 32788b7b0f2SMatthew Ahrens system_taskq_init(void) 32888b7b0f2SMatthew Ahrens { 32988b7b0f2SMatthew Ahrens system_taskq = taskq_create("system_taskq", 64, minclsyspri, 4, 512, 33088b7b0f2SMatthew Ahrens TASKQ_DYNAMIC | TASKQ_PREPOPULATE); 33188b7b0f2SMatthew Ahrens } 332d20e665cSRicardo M. Correia 333d20e665cSRicardo M. Correia void 334d20e665cSRicardo M. Correia system_taskq_fini(void) 335d20e665cSRicardo M. Correia { 336d20e665cSRicardo M. Correia taskq_destroy(system_taskq); 337d20e665cSRicardo M. Correia system_taskq = NULL; /* defensive */ 338d20e665cSRicardo M. Correia } 339