1 /* 2 * CDDL HEADER START 3 * 4 * The contents of this file are subject to the terms of the 5 * Common Development and Distribution License (the "License"). 6 * You may not use this file except in compliance with the License. 7 * 8 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE 9 * or http://www.opensolaris.org/os/licensing. 10 * See the License for the specific language governing permissions 11 * and limitations under the License. 12 * 13 * When distributing Covered Code, include this CDDL HEADER in each 14 * file and include the License file at usr/src/OPENSOLARIS.LICENSE. 15 * If applicable, add the following below this CDDL HEADER, with the 16 * fields enclosed by brackets "[]" replaced with your own identifying 17 * information: Portions Copyright [yyyy] [name of copyright owner] 18 * 19 * CDDL HEADER END 20 */ 21 /* 22 * Copyright 2010 Sun Microsystems, Inc. All rights reserved. 23 * Use is subject to license terms. 24 */ 25 /* 26 * Copyright 2011 Nexenta Systems, Inc. All rights reserved. 27 * Copyright 2012 Garrett D'Amore <garrett@damore.org>. All rights reserved. 28 * Copyright (c) 2014 by Delphix. All rights reserved. 29 */ 30 31 #include <sys/zfs_context.h> 32 33 int taskq_now; 34 taskq_t *system_taskq; 35 taskq_t *system_delay_taskq; 36 37 static pthread_key_t taskq_tsd; 38 39 #define TASKQ_ACTIVE 0x00010000 40 41 static taskq_ent_t * 42 task_alloc(taskq_t *tq, int tqflags) 43 { 44 taskq_ent_t *t; 45 int rv; 46 47 again: if ((t = tq->tq_freelist) != NULL && tq->tq_nalloc >= tq->tq_minalloc) { 48 ASSERT(!(t->tqent_flags & TQENT_FLAG_PREALLOC)); 49 tq->tq_freelist = t->tqent_next; 50 } else { 51 if (tq->tq_nalloc >= tq->tq_maxalloc) { 52 if (!(tqflags & KM_SLEEP)) 53 return (NULL); 54 55 /* 56 * We don't want to exceed tq_maxalloc, but we can't 57 * wait for other tasks to complete (and thus free up 58 * task structures) without risking deadlock with 59 * the caller. So, we just delay for one second 60 * to throttle the allocation rate. If we have tasks 61 * complete before one second timeout expires then 62 * taskq_ent_free will signal us and we will 63 * immediately retry the allocation. 64 */ 65 tq->tq_maxalloc_wait++; 66 rv = cv_timedwait(&tq->tq_maxalloc_cv, 67 &tq->tq_lock, ddi_get_lbolt() + hz); 68 tq->tq_maxalloc_wait--; 69 if (rv > 0) 70 goto again; /* signaled */ 71 } 72 mutex_exit(&tq->tq_lock); 73 74 t = kmem_alloc(sizeof (taskq_ent_t), tqflags); 75 76 mutex_enter(&tq->tq_lock); 77 if (t != NULL) { 78 /* Make sure we start without any flags */ 79 t->tqent_flags = 0; 80 tq->tq_nalloc++; 81 } 82 } 83 return (t); 84 } 85 86 static void 87 task_free(taskq_t *tq, taskq_ent_t *t) 88 { 89 if (tq->tq_nalloc <= tq->tq_minalloc) { 90 t->tqent_next = tq->tq_freelist; 91 tq->tq_freelist = t; 92 } else { 93 tq->tq_nalloc--; 94 mutex_exit(&tq->tq_lock); 95 kmem_free(t, sizeof (taskq_ent_t)); 96 mutex_enter(&tq->tq_lock); 97 } 98 99 if (tq->tq_maxalloc_wait) 100 cv_signal(&tq->tq_maxalloc_cv); 101 } 102 103 taskqid_t 104 taskq_dispatch(taskq_t *tq, task_func_t func, void *arg, uint_t tqflags) 105 { 106 taskq_ent_t *t; 107 108 if (taskq_now) { 109 func(arg); 110 return (1); 111 } 112 113 mutex_enter(&tq->tq_lock); 114 ASSERT(tq->tq_flags & TASKQ_ACTIVE); 115 if ((t = task_alloc(tq, tqflags)) == NULL) { 116 mutex_exit(&tq->tq_lock); 117 return (0); 118 } 119 if (tqflags & TQ_FRONT) { 120 t->tqent_next = tq->tq_task.tqent_next; 121 t->tqent_prev = &tq->tq_task; 122 } else { 123 t->tqent_next = &tq->tq_task; 124 t->tqent_prev = tq->tq_task.tqent_prev; 125 } 126 t->tqent_next->tqent_prev = t; 127 t->tqent_prev->tqent_next = t; 128 t->tqent_func = func; 129 t->tqent_arg = arg; 130 t->tqent_flags = 0; 131 cv_signal(&tq->tq_dispatch_cv); 132 mutex_exit(&tq->tq_lock); 133 return (1); 134 } 135 136 taskqid_t 137 taskq_dispatch_delay(taskq_t *tq, task_func_t func, void *arg, uint_t tqflags, 138 clock_t expire_time) 139 { 140 return (0); 141 } 142 143 int 144 taskq_empty_ent(taskq_ent_t *t) 145 { 146 return (t->tqent_next == NULL); 147 } 148 149 void 150 taskq_init_ent(taskq_ent_t *t) 151 { 152 t->tqent_next = NULL; 153 t->tqent_prev = NULL; 154 t->tqent_func = NULL; 155 t->tqent_arg = NULL; 156 t->tqent_flags = 0; 157 } 158 159 void 160 taskq_dispatch_ent(taskq_t *tq, task_func_t func, void *arg, uint_t flags, 161 taskq_ent_t *t) 162 { 163 ASSERT(func != NULL); 164 165 /* 166 * Mark it as a prealloc'd task. This is important 167 * to ensure that we don't free it later. 168 */ 169 t->tqent_flags |= TQENT_FLAG_PREALLOC; 170 /* 171 * Enqueue the task to the underlying queue. 172 */ 173 mutex_enter(&tq->tq_lock); 174 175 if (flags & TQ_FRONT) { 176 t->tqent_next = tq->tq_task.tqent_next; 177 t->tqent_prev = &tq->tq_task; 178 } else { 179 t->tqent_next = &tq->tq_task; 180 t->tqent_prev = tq->tq_task.tqent_prev; 181 } 182 t->tqent_next->tqent_prev = t; 183 t->tqent_prev->tqent_next = t; 184 t->tqent_func = func; 185 t->tqent_arg = arg; 186 cv_signal(&tq->tq_dispatch_cv); 187 mutex_exit(&tq->tq_lock); 188 } 189 190 void 191 taskq_wait(taskq_t *tq) 192 { 193 mutex_enter(&tq->tq_lock); 194 while (tq->tq_task.tqent_next != &tq->tq_task || tq->tq_active != 0) 195 cv_wait(&tq->tq_wait_cv, &tq->tq_lock); 196 mutex_exit(&tq->tq_lock); 197 } 198 199 void 200 taskq_wait_id(taskq_t *tq, taskqid_t id) 201 { 202 taskq_wait(tq); 203 } 204 205 void 206 taskq_wait_outstanding(taskq_t *tq, taskqid_t id) 207 { 208 taskq_wait(tq); 209 } 210 211 static void 212 taskq_thread(void *arg) 213 { 214 taskq_t *tq = arg; 215 taskq_ent_t *t; 216 boolean_t prealloc; 217 218 VERIFY0(pthread_setspecific(taskq_tsd, tq)); 219 220 mutex_enter(&tq->tq_lock); 221 while (tq->tq_flags & TASKQ_ACTIVE) { 222 if ((t = tq->tq_task.tqent_next) == &tq->tq_task) { 223 if (--tq->tq_active == 0) 224 cv_broadcast(&tq->tq_wait_cv); 225 cv_wait(&tq->tq_dispatch_cv, &tq->tq_lock); 226 tq->tq_active++; 227 continue; 228 } 229 t->tqent_prev->tqent_next = t->tqent_next; 230 t->tqent_next->tqent_prev = t->tqent_prev; 231 t->tqent_next = NULL; 232 t->tqent_prev = NULL; 233 prealloc = t->tqent_flags & TQENT_FLAG_PREALLOC; 234 mutex_exit(&tq->tq_lock); 235 236 rw_enter(&tq->tq_threadlock, RW_READER); 237 t->tqent_func(t->tqent_arg); 238 rw_exit(&tq->tq_threadlock); 239 240 mutex_enter(&tq->tq_lock); 241 if (!prealloc) 242 task_free(tq, t); 243 } 244 tq->tq_nthreads--; 245 cv_broadcast(&tq->tq_wait_cv); 246 mutex_exit(&tq->tq_lock); 247 thread_exit(); 248 } 249 250 /*ARGSUSED*/ 251 taskq_t * 252 taskq_create(const char *name, int nthreads, pri_t pri, 253 int minalloc, int maxalloc, uint_t flags) 254 { 255 taskq_t *tq = kmem_zalloc(sizeof (taskq_t), KM_SLEEP); 256 int t; 257 258 if (flags & TASKQ_THREADS_CPU_PCT) { 259 int pct; 260 ASSERT3S(nthreads, >=, 0); 261 ASSERT3S(nthreads, <=, 100); 262 pct = MIN(nthreads, 100); 263 pct = MAX(pct, 0); 264 265 nthreads = (sysconf(_SC_NPROCESSORS_ONLN) * pct) / 100; 266 nthreads = MAX(nthreads, 1); /* need at least 1 thread */ 267 } else { 268 ASSERT3S(nthreads, >=, 1); 269 } 270 271 rw_init(&tq->tq_threadlock, NULL, RW_DEFAULT, NULL); 272 mutex_init(&tq->tq_lock, NULL, MUTEX_DEFAULT, NULL); 273 cv_init(&tq->tq_dispatch_cv, NULL, CV_DEFAULT, NULL); 274 cv_init(&tq->tq_wait_cv, NULL, CV_DEFAULT, NULL); 275 cv_init(&tq->tq_maxalloc_cv, NULL, CV_DEFAULT, NULL); 276 (void) strncpy(tq->tq_name, name, TASKQ_NAMELEN); 277 tq->tq_flags = flags | TASKQ_ACTIVE; 278 tq->tq_active = nthreads; 279 tq->tq_nthreads = nthreads; 280 tq->tq_minalloc = minalloc; 281 tq->tq_maxalloc = maxalloc; 282 tq->tq_task.tqent_next = &tq->tq_task; 283 tq->tq_task.tqent_prev = &tq->tq_task; 284 tq->tq_threadlist = kmem_alloc(nthreads * sizeof (kthread_t *), 285 KM_SLEEP); 286 287 if (flags & TASKQ_PREPOPULATE) { 288 mutex_enter(&tq->tq_lock); 289 while (minalloc-- > 0) 290 task_free(tq, task_alloc(tq, KM_SLEEP)); 291 mutex_exit(&tq->tq_lock); 292 } 293 294 for (t = 0; t < nthreads; t++) 295 VERIFY((tq->tq_threadlist[t] = thread_create(NULL, 0, 296 taskq_thread, tq, 0, &p0, TS_RUN, pri)) != NULL); 297 298 return (tq); 299 } 300 301 void 302 taskq_destroy(taskq_t *tq) 303 { 304 int nthreads = tq->tq_nthreads; 305 306 taskq_wait(tq); 307 308 mutex_enter(&tq->tq_lock); 309 310 tq->tq_flags &= ~TASKQ_ACTIVE; 311 cv_broadcast(&tq->tq_dispatch_cv); 312 313 while (tq->tq_nthreads != 0) 314 cv_wait(&tq->tq_wait_cv, &tq->tq_lock); 315 316 tq->tq_minalloc = 0; 317 while (tq->tq_nalloc != 0) { 318 ASSERT(tq->tq_freelist != NULL); 319 task_free(tq, task_alloc(tq, KM_SLEEP)); 320 } 321 322 mutex_exit(&tq->tq_lock); 323 324 kmem_free(tq->tq_threadlist, nthreads * sizeof (kthread_t *)); 325 326 rw_destroy(&tq->tq_threadlock); 327 mutex_destroy(&tq->tq_lock); 328 cv_destroy(&tq->tq_dispatch_cv); 329 cv_destroy(&tq->tq_wait_cv); 330 cv_destroy(&tq->tq_maxalloc_cv); 331 332 kmem_free(tq, sizeof (taskq_t)); 333 } 334 335 int 336 taskq_member(taskq_t *tq, kthread_t *t) 337 { 338 int i; 339 340 if (taskq_now) 341 return (1); 342 343 for (i = 0; i < tq->tq_nthreads; i++) 344 if (tq->tq_threadlist[i] == t) 345 return (1); 346 347 return (0); 348 } 349 350 taskq_t * 351 taskq_of_curthread(void) 352 { 353 return (pthread_getspecific(taskq_tsd)); 354 } 355 356 int 357 taskq_cancel_id(taskq_t *tq, taskqid_t id) 358 { 359 return (ENOENT); 360 } 361 362 void 363 system_taskq_init(void) 364 { 365 VERIFY0(pthread_key_create(&taskq_tsd, NULL)); 366 system_taskq = taskq_create("system_taskq", 64, maxclsyspri, 4, 512, 367 TASKQ_DYNAMIC | TASKQ_PREPOPULATE); 368 system_delay_taskq = taskq_create("delay_taskq", 4, maxclsyspri, 4, 369 512, TASKQ_DYNAMIC | TASKQ_PREPOPULATE); 370 } 371 372 void 373 system_taskq_fini(void) 374 { 375 taskq_destroy(system_taskq); 376 system_taskq = NULL; /* defensive */ 377 taskq_destroy(system_delay_taskq); 378 system_delay_taskq = NULL; 379 VERIFY0(pthread_key_delete(taskq_tsd)); 380 } 381