1 /* 2 * CDDL HEADER START 3 * 4 * The contents of this file are subject to the terms of the 5 * Common Development and Distribution License (the "License"). 6 * You may not use this file except in compliance with the License. 7 * 8 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE 9 * or http://www.opensolaris.org/os/licensing. 10 * See the License for the specific language governing permissions 11 * and limitations under the License. 12 * 13 * When distributing Covered Code, include this CDDL HEADER in each 14 * file and include the License file at usr/src/OPENSOLARIS.LICENSE. 15 * If applicable, add the following below this CDDL HEADER, with the 16 * fields enclosed by brackets "[]" replaced with your own identifying 17 * information: Portions Copyright [yyyy] [name of copyright owner] 18 * 19 * CDDL HEADER END 20 */ 21 /* 22 * Copyright 2009 Sun Microsystems, Inc. All rights reserved. 23 * Use is subject to license terms. 24 */ 25 26 /* 27 * Kernel task queues: general-purpose asynchronous task scheduling. 28 * 29 * A common problem in kernel programming is the need to schedule tasks 30 * to be performed later, by another thread. There are several reasons 31 * you may want or need to do this: 32 * 33 * (1) The task isn't time-critical, but your current code path is. 34 * 35 * (2) The task may require grabbing locks that you already hold. 36 * 37 * (3) The task may need to block (e.g. to wait for memory), but you 38 * cannot block in your current context. 39 * 40 * (4) Your code path can't complete because of some condition, but you can't 41 * sleep or fail, so you queue the task for later execution when the condition 42 * disappears. 43 * 44 * (5) You just want a simple way to launch multiple tasks in parallel. 45 * 46 * Task queues provide such a facility. In its simplest form (used when 47 * performance is not a critical consideration) a task queue consists of a 48 * single list of tasks, together with one or more threads to service the 49 * list. There are some cases when this simple queue is not sufficient: 50 * 51 * (1) The task queues are very hot and there is a need to avoid data and lock 52 * contention over global resources. 53 * 54 * (2) Some tasks may depend on other tasks to complete, so they can't be put in 55 * the same list managed by the same thread. 56 * 57 * (3) Some tasks may block for a long time, and this should not block other 58 * tasks in the queue. 59 * 60 * To provide useful service in such cases we define a "dynamic task queue" 61 * which has an individual thread for each of the tasks. These threads are 62 * dynamically created as they are needed and destroyed when they are not in 63 * use. The API for managing task pools is the same as for managing task queues 64 * with the exception of the taskq creation flag TASKQ_DYNAMIC, which indicates 65 * that dynamic task pool behavior is desired. 66 * 67 * Dynamic task queues may also place tasks in the normal queue (called "backing 68 * queue") when the task pool runs out of resources. Users of task queues may 69 * disallow such queued scheduling by specifying TQ_NOQUEUE in the dispatch 70 * flags. 71 * 72 * The backing task queue is also used for scheduling internal tasks needed for 73 * dynamic task queue maintenance. 74 * 75 * INTERFACES ================================================================== 76 * 77 * taskq_t *taskq_create(name, nthreads, pri_t pri, minalloc, maxalloc, flags); 78 * 79 * Create a taskq with the specified properties. 
80 * Possible 'flags': 81 * 82 * TASKQ_DYNAMIC: Create task pool for task management. If this flag is 83 * specified, 'nthreads' specifies the maximum number of threads in 84 * the task queue. Task execution order for dynamic task queues is 85 * not predictable. 86 * 87 * If this flag is not specified (the default case), a 88 * single-list task queue is created with 'nthreads' threads 89 * servicing it. Entries in this queue are managed by 90 * taskq_ent_alloc() and taskq_ent_free() which try to keep the 91 * task population between 'minalloc' and 'maxalloc', but the 92 * latter limit is only advisory for TQ_SLEEP dispatches and the 93 * former limit is only advisory for TQ_NOALLOC dispatches. If 94 * TASKQ_PREPOPULATE is set in 'flags', the taskq will be 95 * prepopulated with 'minalloc' task structures. 96 * 97 * Since non-DYNAMIC taskqs are queues, tasks are guaranteed to be 98 * executed in the order they are scheduled if nthreads == 1. 99 * If nthreads > 1, task execution order is not predictable. 100 * 101 * TASKQ_PREPOPULATE: Prepopulate task queue with threads. 102 * Also prepopulate the task queue with 'minalloc' task structures. 103 * 104 * TASKQ_THREADS_CPU_PCT: This flag specifies that 'nthreads' should be 105 * interpreted as a percentage of the # of online CPUs on the 106 * system. The taskq subsystem will automatically adjust the 107 * number of threads in the taskq in response to CPU online 108 * and offline events, to keep the ratio. nthreads must be in 109 * the range [0,100]. 110 * 111 * The calculation used is: 112 * 113 * MAX((ncpus_online * percentage)/100, 1) 114 * 115 * This flag is not supported for DYNAMIC task queues. 116 * This flag is not compatible with TASKQ_CPR_SAFE. 117 * 118 * TASKQ_CPR_SAFE: This flag specifies that users of the task queue will 119 * use their own protocol for handling CPR issues. This flag is not 120 * supported for DYNAMIC task queues. This flag is not compatible 121 * with TASKQ_THREADS_CPU_PCT. 122 * 123 * The 'pri' field specifies the default priority for the threads that 124 * service all scheduled tasks. 125 * 126 * void taskq_destroy(tq): 127 * 128 * Waits for any scheduled tasks to complete, then destroys the taskq. 129 * The caller should guarantee that no new tasks are scheduled in the closing 130 * taskq. 131 * 132 * taskqid_t taskq_dispatch(tq, func, arg, flags): 133 * 134 * Dispatches the task "func(arg)" to the taskq. The 'flags' argument indicates 135 * whether the caller is willing to block for memory. The function returns an 136 * opaque value which is zero iff dispatch fails. If flags is TQ_NOSLEEP 137 * or TQ_NOALLOC and the task can't be dispatched, taskq_dispatch() fails 138 * and returns (taskqid_t)0. 139 * 140 * ASSUMES: func != NULL. 141 * 142 * Possible flags: 143 * TQ_NOSLEEP: Do not wait for resources; may fail. 144 * 145 * TQ_NOALLOC: Do not allocate memory; may fail. May only be used with 146 * non-dynamic task queues. 147 * 148 * TQ_NOQUEUE: Do not enqueue a task if it can't be dispatched due to 149 * lack of available resources; fail instead. If this flag is not 150 * set, and the task pool is exhausted, the task may be scheduled 151 * in the backing queue. This flag may ONLY be used with dynamic 152 * task queues. 153 * 154 * NOTE: This flag should always be used when a task queue is used 155 * for tasks that may depend on each other for completion. 156 * Enqueueing dependent tasks may create deadlocks. 157 * 158 * TQ_SLEEP: May block waiting for resources. 
May still fail for 159 * dynamic task queues if TQ_NOQUEUE is also specified, otherwise 160 * always succeed. 161 * 162 * NOTE: Dynamic task queues are much more likely to fail in 163 * taskq_dispatch() (especially if TQ_NOQUEUE was specified), so it 164 * is important to have backup strategies handling such failures. 165 * 166 * void taskq_wait(tq): 167 * 168 * Waits for all previously scheduled tasks to complete. 169 * 170 * NOTE: It does not stop any new task dispatches. 171 * Do NOT call taskq_wait() from a task: it will cause deadlock. 172 * 173 * void taskq_suspend(tq) 174 * 175 * Suspend all task execution. Tasks already scheduled for a dynamic task 176 * queue will still be executed, but all new scheduled tasks will be 177 * suspended until taskq_resume() is called. 178 * 179 * int taskq_suspended(tq) 180 * 181 * Returns 1 if taskq is suspended and 0 otherwise. It is intended to 182 * ASSERT that the task queue is suspended. 183 * 184 * void taskq_resume(tq) 185 * 186 * Resume task queue execution. 187 * 188 * int taskq_member(tq, thread) 189 * 190 * Returns 1 if 'thread' belongs to taskq 'tq' and 0 otherwise. The 191 * intended use is to ASSERT that a given function is called in taskq 192 * context only. 193 * 194 * system_taskq 195 * 196 * Global system-wide dynamic task queue for common uses. It may be used by 197 * any subsystem that needs to schedule tasks and does not need to manage 198 * its own task queues. It is initialized quite early during system boot. 199 * 200 * IMPLEMENTATION ============================================================== 201 * 202 * This is schematic representation of the task queue structures. 203 * 204 * taskq: 205 * +-------------+ 206 * | tq_lock | +---< taskq_ent_free() 207 * +-------------+ | 208 * |... | | tqent: tqent: 209 * +-------------+ | +------------+ +------------+ 210 * | tq_freelist |-->| tqent_next |--> ... ->| tqent_next | 211 * +-------------+ +------------+ +------------+ 212 * |... | | ... | | ... | 213 * +-------------+ +------------+ +------------+ 214 * | tq_task | | 215 * | | +-------------->taskq_ent_alloc() 216 * +--------------------------------------------------------------------------+ 217 * | | | tqent tqent | 218 * | +---------------------+ +--> +------------+ +--> +------------+ | 219 * | | ... | | | func, arg | | | func, arg | | 220 * +>+---------------------+ <---|-+ +------------+ <---|-+ +------------+ | 221 * | tq_taskq.tqent_next | ----+ | | tqent_next | --->+ | | tqent_next |--+ 222 * +---------------------+ | +------------+ ^ | +------------+ 223 * +-| tq_task.tqent_prev | +--| tqent_prev | | +--| tqent_prev | ^ 224 * | +---------------------+ +------------+ | +------------+ | 225 * | |... | | ... | | | ... | | 226 * | +---------------------+ +------------+ | +------------+ | 227 * | ^ | | 228 * | | | | 229 * +--------------------------------------+--------------+ TQ_APPEND() -+ 230 * | | | 231 * |... | taskq_thread()-----+ 232 * +-------------+ 233 * | tq_buckets |--+-------> [ NULL ] (for regular task queues) 234 * +-------------+ | 235 * | DYNAMIC TASK QUEUES: 236 * | 237 * +-> taskq_bucket[nCPU] taskq_bucket_dispatch() 238 * +-------------------+ ^ 239 * +--->| tqbucket_lock | | 240 * | +-------------------+ +--------+ +--------+ 241 * | | tqbucket_freelist |-->| tqent |-->...| tqent | ^ 242 * | +-------------------+<--+--------+<--...+--------+ | 243 * | | ... 
| | thread | | thread | | 244 * | +-------------------+ +--------+ +--------+ | 245 * | +-------------------+ | 246 * taskq_dispatch()--+--->| tqbucket_lock | TQ_APPEND()------+ 247 * TQ_HASH() | +-------------------+ +--------+ +--------+ 248 * | | tqbucket_freelist |-->| tqent |-->...| tqent | 249 * | +-------------------+<--+--------+<--...+--------+ 250 * | | ... | | thread | | thread | 251 * | +-------------------+ +--------+ +--------+ 252 * +---> ... 253 * 254 * 255 * Task queues use tq_task field to link new entry in the queue. The queue is a 256 * circular doubly-linked list. Entries are put in the end of the list with 257 * TQ_APPEND() and processed from the front of the list by taskq_thread() in 258 * FIFO order. Task queue entries are cached in the free list managed by 259 * taskq_ent_alloc() and taskq_ent_free() functions. 260 * 261 * All threads used by task queues mark t_taskq field of the thread to 262 * point to the task queue. 263 * 264 * Taskq Thread Management ----------------------------------------------------- 265 * 266 * Taskq's non-dynamic threads are managed with several variables and flags: 267 * 268 * * tq_nthreads - The number of threads in taskq_thread() for the 269 * taskq. 270 * 271 * * tq_active - The number of threads not waiting on a CV in 272 * taskq_thread(); includes newly created threads 273 * not yet counted in tq_nthreads. 274 * 275 * * tq_nthreads_target 276 * - The number of threads desired for the taskq. 277 * 278 * * tq_flags & TASKQ_CHANGING 279 * - Indicates that tq_nthreads != tq_nthreads_target. 280 * 281 * * tq_flags & TASKQ_THREAD_CREATED 282 * - Indicates that a thread is being created in the taskq. 283 * 284 * During creation, tq_nthreads and tq_active are set to 0, and 285 * tq_nthreads_target is set to the number of threads desired. The 286 * TASKQ_CHANGING flag is set, and taskq_create_thread() is called to 287 * create the first thread. taskq_create_thread() increments tq_active, 288 * sets TASKQ_THREAD_CREATED, and creates the new thread. 289 * 290 * Each thread starts in taskq_thread(), clears the TASKQ_THREAD_CREATED 291 * flag, and increments tq_nthreads. It stores the new value of 292 * tq_nthreads as its "thread_id", and stores its thread pointer in the 293 * tq_threadlist at the (thread_id - 1). We keep the thread_id space 294 * densely packed by requiring that only the largest thread_id can exit during 295 * normal adjustment. The exception is during the destruction of the 296 * taskq; once tq_nthreads_target is set to zero, no new threads will be created 297 * for the taskq queue, so every thread can exit without any ordering being 298 * necessary. 299 * 300 * Threads will only process work if their thread id is <= tq_nthreads_target. 301 * 302 * When TASKQ_CHANGING is set, threads will check the current thread target 303 * whenever they wake up, and do whatever they can to apply its effects. 304 * 305 * TASKQ_THREAD_CPU_PCT -------------------------------------------------------- 306 * 307 * When a taskq is created with TASKQ_THREAD_CPU_PCT, we store their requested 308 * percentage in tq_threads_ncpus_pct, start them off with the correct thread 309 * target, and add them to the taskq_cpupct_list for later adjustment. 310 * 311 * We register taskq_cpu_setup() to be called whenever a CPU changes state. It 312 * walks the list of TASKQ_THREAD_CPU_PCT taskqs, adjusts their nthread_target 313 * if need be, and wakes up all of the threads to process the change. 
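 *
 * As a worked example of the percentage calculation documented above
 * (illustrative only; TASKQ_THREADS_PCT() is used by the code below and is
 * assumed here to implement MAX((ncpus_online * percentage) / 100, 1)): a
 * taskq created with nthreads = 50 and TASKQ_THREADS_CPU_PCT on an 8-CPU
 * system gets a thread target of MAX((8 * 50) / 100, 1) = 4 threads; if 6
 * of those CPUs are later offlined, taskq_cpu_setup() re-targets it to
 * MAX((2 * 50) / 100, 1) = 1 thread.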
314 * 315 * Dynamic Task Queues Implementation ------------------------------------------ 316 * 317 * For dynamic task queues there is a 1-to-1 mapping between a thread and a 318 * taskq_ent_t structure. Each entry is serviced by its own thread and each thread 319 * is controlled by a single entry. 320 * 321 * Entries are distributed over a set of buckets. To avoid modulo 322 * arithmetic the number of buckets is 2^n and is determined by rounding the 323 * number of CPUs in the system down to the nearest power of two. The tunable 324 * 'taskq_maxbuckets' limits the maximum number of buckets. Each entry 325 * is attached to a bucket for its lifetime and can't migrate to other buckets. 326 * 327 * Entries that have scheduled tasks are not placed in any list. The dispatch 328 * function sets their "func" and "arg" fields and signals the corresponding 329 * thread to execute the task. Once the thread executes the task it clears the 330 * "func" field and places the entry on the bucket's cache of free entries pointed 331 * to by the "tqbucket_freelist" field. ALL entries on the free list should have "func" 332 * fields equal to NULL. The free list is a circular doubly-linked list identical 333 * in structure to the tq_task list above, but entries are taken from it in LIFO 334 * order - the last freed entry is the first to be allocated. The 335 * taskq_bucket_dispatch() function gets the most recently used entry from the 336 * free list, sets its "func" and "arg" fields and signals a worker thread. 337 * 338 * After executing each task a per-entry thread taskq_d_thread() places its 339 * entry on the bucket free list and goes to a timed sleep. If it wakes up 340 * without getting a new task it removes the entry from the free list and destroys 341 * itself. The thread sleep time is controlled by the tunable variable 342 * `taskq_thread_timeout'. 343 * 344 * There are various statistics kept in the bucket which allow for later 345 * analysis of taskq usage patterns. Also, a global copy of taskq creation and 346 * death statistics is kept in the global taskq data structure. Since thread 347 * creation and death happen rarely, updating such global data does not present 348 * a performance problem. 349 * 350 * NOTE: Threads are not bound to any CPU and there is absolutely no association 351 * between the bucket and the actual thread CPU, so buckets are used only to 352 * split resources and reduce resource contention. Having threads attached 353 * to the CPU denoted by a bucket may reduce the number of times the job 354 * switches between CPUs. 355 * 356 * The current algorithm creates a thread whenever a bucket has no free 357 * entries. It would be nice to know how many threads are in the running 358 * state and not create threads if all CPUs are busy with existing 359 * tasks, but it is unclear how such a strategy can be implemented. 360 * 361 * Currently buckets are created statically as an array attached to the task 362 * queue. On systems with nCPUs < max_ncpus this may waste system 363 * memory. One solution may be allocation of buckets when they are first 364 * touched, but it is not clear how useful it is. 365 * 366 * SUSPEND/RESUME implementation ----------------------------------------------- 367 * 368 * Before executing a task, taskq_thread() (which services non-dynamic task 369 * queues) obtains the taskq's thread lock as a reader. The taskq_suspend() 370 * function takes the same lock as a writer, blocking all non-dynamic task 371 * execution. 
The taskq_resume() function releases the lock allowing 372 * taskq_thread to continue execution. 373 * 374 * For dynamic task queues, each bucket is marked as TQBUCKET_SUSPEND by 375 * taskq_suspend() function. After that taskq_bucket_dispatch() always 376 * fails, so that taskq_dispatch() will either enqueue tasks for a 377 * suspended backing queue or fail if TQ_NOQUEUE is specified in dispatch 378 * flags. 379 * 380 * NOTE: taskq_suspend() does not immediately block any tasks already 381 * scheduled for dynamic task queues. It only suspends new tasks 382 * scheduled after taskq_suspend() was called. 383 * 384 * taskq_member() function works by comparing a thread t_taskq pointer with 385 * the passed thread pointer. 386 * 387 * LOCKS and LOCK Hierarchy ---------------------------------------------------- 388 * 389 * There are three locks used in task queues: 390 * 391 * 1) The taskq_t's tq_lock, protecting global task queue state. 392 * 393 * 2) Each per-CPU bucket has a lock for bucket management. 394 * 395 * 3) The global taskq_cpupct_lock, which protects the list of 396 * TASKQ_THREADS_CPU_PCT taskqs. 397 * 398 * If both (1) and (2) are needed, tq_lock should be taken *after* the bucket 399 * lock. 400 * 401 * If both (1) and (3) are needed, tq_lock should be taken *after* 402 * taskq_cpupct_lock. 403 * 404 * DEBUG FACILITIES ------------------------------------------------------------ 405 * 406 * For DEBUG kernels it is possible to induce random failures to 407 * taskq_dispatch() function when it is given TQ_NOSLEEP argument. The value of 408 * taskq_dmtbf and taskq_smtbf tunables control the mean time between induced 409 * failures for dynamic and static task queues respectively. 410 * 411 * Setting TASKQ_STATISTIC to 0 will disable per-bucket statistics. 412 * 413 * TUNABLES -------------------------------------------------------------------- 414 * 415 * system_taskq_size - Size of the global system_taskq. 416 * This value is multiplied by nCPUs to determine 417 * actual size. 418 * Default value: 64 419 * 420 * taskq_minimum_nthreads_max 421 * - Minimum size of the thread list for a taskq. 422 * Useful for testing different thread pool 423 * sizes by overwriting tq_nthreads_target. 424 * 425 * taskq_thread_timeout - Maximum idle time for taskq_d_thread() 426 * Default value: 5 minutes 427 * 428 * taskq_maxbuckets - Maximum number of buckets in any task queue 429 * Default value: 128 430 * 431 * taskq_search_depth - Maximum # of buckets searched for a free entry 432 * Default value: 4 433 * 434 * taskq_dmtbf - Mean time between induced dispatch failures 435 * for dynamic task queues. 436 * Default value: UINT_MAX (no induced failures) 437 * 438 * taskq_smtbf - Mean time between induced dispatch failures 439 * for static task queues. 440 * Default value: UINT_MAX (no induced failures) 441 * 442 * CONDITIONAL compilation ----------------------------------------------------- 443 * 444 * TASKQ_STATISTIC - If set will enable bucket statistic (default). 
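 *
 * EXAMPLE USAGE ---------------------------------------------------------------
 *
 * The following is an illustrative sketch of the basic interface described
 * above; it is not taken from this file, and the callback 'my_cb' and its
 * 'arg' are hypothetical. A failed TQ_NOSLEEP dispatch is handled by running
 * the task directly, and the queue is drained with taskq_wait() before it is
 * destroyed:
 *
 *	taskq_t *tq = taskq_create("my_taskq", 4, minclsyspri, 4, 64,
 *	    TASKQ_PREPOPULATE);
 *
 *	if (taskq_dispatch(tq, my_cb, arg, TQ_NOSLEEP) == (taskqid_t)0)
 *		my_cb(arg);
 *
 *	taskq_wait(tq);
 *	taskq_destroy(tq);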
445 * 446 */ 447 448 #include <sys/taskq_impl.h> 449 #include <sys/thread.h> 450 #include <sys/proc.h> 451 #include <sys/kmem.h> 452 #include <sys/vmem.h> 453 #include <sys/callb.h> 454 #include <sys/systm.h> 455 #include <sys/cmn_err.h> 456 #include <sys/debug.h> 457 #include <sys/vmsystm.h> /* For throttlefree */ 458 #include <sys/sysmacros.h> 459 #include <sys/cpuvar.h> 460 #include <sys/sdt.h> 461 #include <sys/note.h> 462 463 static kmem_cache_t *taskq_ent_cache, *taskq_cache; 464 465 /* 466 * Pseudo instance numbers for taskqs without explicitly provided instance. 467 */ 468 static vmem_t *taskq_id_arena; 469 470 /* Global system task queue for common use */ 471 taskq_t *system_taskq; 472 473 /* 474 * Maximum number of entries in global system taskq is 475 * system_taskq_size * max_ncpus 476 */ 477 #define SYSTEM_TASKQ_SIZE 64 478 int system_taskq_size = SYSTEM_TASKQ_SIZE; 479 480 /* 481 * Minimum size for tq_nthreads_max; useful for those who want to play around 482 * with increasing a taskq's tq_nthreads_target. 483 */ 484 int taskq_minimum_nthreads_max = 1; 485 486 /* Maximum percentage allowed for TASKQ_THREADS_CPU_PCT */ 487 #define TASKQ_CPUPCT_MAX_PERCENT 1000 488 int taskq_cpupct_max_percent = TASKQ_CPUPCT_MAX_PERCENT; 489 490 /* 491 * Dynamic task queue threads that don't get any work within 492 * taskq_thread_timeout destroy themselves 493 */ 494 #define TASKQ_THREAD_TIMEOUT (60 * 5) 495 int taskq_thread_timeout = TASKQ_THREAD_TIMEOUT; 496 497 #define TASKQ_MAXBUCKETS 128 498 int taskq_maxbuckets = TASKQ_MAXBUCKETS; 499 500 /* 501 * When a bucket has no available entries another buckets are tried. 502 * taskq_search_depth parameter limits the amount of buckets that we search 503 * before failing. This is mostly useful in systems with many CPUs where we may 504 * spend too much time scanning busy buckets. 505 */ 506 #define TASKQ_SEARCH_DEPTH 4 507 int taskq_search_depth = TASKQ_SEARCH_DEPTH; 508 509 /* 510 * Hashing function: mix various bits of x. May be pretty much anything. 511 */ 512 #define TQ_HASH(x) ((x) ^ ((x) >> 11) ^ ((x) >> 17) ^ ((x) ^ 27)) 513 514 /* 515 * We do not create any new threads when the system is low on memory and start 516 * throttling memory allocations. The following macro tries to estimate such 517 * condition. 518 */ 519 #define ENOUGH_MEMORY() (freemem > throttlefree) 520 521 /* 522 * Static functions. 523 */ 524 static taskq_t *taskq_create_common(const char *, int, int, pri_t, int, 525 int, uint_t); 526 static void taskq_thread(void *); 527 static void taskq_d_thread(taskq_ent_t *); 528 static void taskq_bucket_extend(void *); 529 static int taskq_constructor(void *, void *, int); 530 static void taskq_destructor(void *, void *); 531 static int taskq_ent_constructor(void *, void *, int); 532 static void taskq_ent_destructor(void *, void *); 533 static taskq_ent_t *taskq_ent_alloc(taskq_t *, int); 534 static void taskq_ent_free(taskq_t *, taskq_ent_t *); 535 static taskq_ent_t *taskq_bucket_dispatch(taskq_bucket_t *, task_func_t, 536 void *); 537 538 /* 539 * Task queues kstats. 
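 * These are exported via the kstat framework under module "unix", with class
 * "taskq" (or "taskq_d" for dynamic taskqs), as set up by kstat_create() in
 * taskq_create_common() below. For example (illustrative), the stats for the
 * global system taskq can be read from userland with kstat(1M):
 *
 *	# kstat -m unix -n system_taskq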
540 */ 541 struct taskq_kstat { 542 kstat_named_t tq_tasks; 543 kstat_named_t tq_executed; 544 kstat_named_t tq_maxtasks; 545 kstat_named_t tq_totaltime; 546 kstat_named_t tq_nalloc; 547 kstat_named_t tq_nactive; 548 kstat_named_t tq_pri; 549 kstat_named_t tq_nthreads; 550 } taskq_kstat = { 551 { "tasks", KSTAT_DATA_UINT64 }, 552 { "executed", KSTAT_DATA_UINT64 }, 553 { "maxtasks", KSTAT_DATA_UINT64 }, 554 { "totaltime", KSTAT_DATA_UINT64 }, 555 { "nactive", KSTAT_DATA_UINT64 }, 556 { "nalloc", KSTAT_DATA_UINT64 }, 557 { "priority", KSTAT_DATA_UINT64 }, 558 { "threads", KSTAT_DATA_UINT64 }, 559 }; 560 561 struct taskq_d_kstat { 562 kstat_named_t tqd_pri; 563 kstat_named_t tqd_btasks; 564 kstat_named_t tqd_bexecuted; 565 kstat_named_t tqd_bmaxtasks; 566 kstat_named_t tqd_bnalloc; 567 kstat_named_t tqd_bnactive; 568 kstat_named_t tqd_btotaltime; 569 kstat_named_t tqd_hits; 570 kstat_named_t tqd_misses; 571 kstat_named_t tqd_overflows; 572 kstat_named_t tqd_tcreates; 573 kstat_named_t tqd_tdeaths; 574 kstat_named_t tqd_maxthreads; 575 kstat_named_t tqd_nomem; 576 kstat_named_t tqd_disptcreates; 577 kstat_named_t tqd_totaltime; 578 kstat_named_t tqd_nalloc; 579 kstat_named_t tqd_nfree; 580 } taskq_d_kstat = { 581 { "priority", KSTAT_DATA_UINT64 }, 582 { "btasks", KSTAT_DATA_UINT64 }, 583 { "bexecuted", KSTAT_DATA_UINT64 }, 584 { "bmaxtasks", KSTAT_DATA_UINT64 }, 585 { "bnalloc", KSTAT_DATA_UINT64 }, 586 { "bnactive", KSTAT_DATA_UINT64 }, 587 { "btotaltime", KSTAT_DATA_UINT64 }, 588 { "hits", KSTAT_DATA_UINT64 }, 589 { "misses", KSTAT_DATA_UINT64 }, 590 { "overflows", KSTAT_DATA_UINT64 }, 591 { "tcreates", KSTAT_DATA_UINT64 }, 592 { "tdeaths", KSTAT_DATA_UINT64 }, 593 { "maxthreads", KSTAT_DATA_UINT64 }, 594 { "nomem", KSTAT_DATA_UINT64 }, 595 { "disptcreates", KSTAT_DATA_UINT64 }, 596 { "totaltime", KSTAT_DATA_UINT64 }, 597 { "nalloc", KSTAT_DATA_UINT64 }, 598 { "nfree", KSTAT_DATA_UINT64 }, 599 }; 600 601 static kmutex_t taskq_kstat_lock; 602 static kmutex_t taskq_d_kstat_lock; 603 static int taskq_kstat_update(kstat_t *, int); 604 static int taskq_d_kstat_update(kstat_t *, int); 605 606 /* 607 * State for THREAD_CPU_PCT management 608 */ 609 typedef struct taskq_cpupct_ent { 610 list_node_t tp_link; 611 taskq_t *tp_taskq; 612 } taskq_cpupct_ent_t; 613 614 static kmutex_t taskq_cpupct_lock; 615 static list_t taskq_cpupct_list; 616 static int taskq_cpupct_ncpus_online; 617 618 /* 619 * Collect per-bucket statistic when TASKQ_STATISTIC is defined. 620 */ 621 #define TASKQ_STATISTIC 1 622 623 #if TASKQ_STATISTIC 624 #define TQ_STAT(b, x) b->tqbucket_stat.x++ 625 #else 626 #define TQ_STAT(b, x) 627 #endif 628 629 /* 630 * Random fault injection. 631 */ 632 uint_t taskq_random; 633 uint_t taskq_dmtbf = UINT_MAX; /* mean time between injected failures */ 634 uint_t taskq_smtbf = UINT_MAX; /* mean time between injected failures */ 635 636 /* 637 * TQ_NOSLEEP dispatches on dynamic task queues are always allowed to fail. 638 * 639 * TQ_NOSLEEP dispatches on static task queues can't arbitrarily fail because 640 * they could prepopulate the cache and make sure that they do not use more 641 * then minalloc entries. So, fault injection in this case insures that 642 * either TASKQ_PREPOPULATE is not set or there are more entries allocated 643 * than is specified by minalloc. TQ_NOALLOC dispatches are always allowed 644 * to fail, but for simplicity we treat them identically to TQ_NOSLEEP 645 * dispatches. 
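 *
 * As a worked illustration of these tunables (assuming taskq_random is
 * roughly uniform over [0, 1771875), which the linear congruential update
 * in the macros below provides): setting taskq_dmtbf to N makes the test
 * "taskq_random < 1771875 / N" true for about 1 out of every N eligible
 * TQ_NOSLEEP dispatches, i.e. the mean time between induced failures is
 * about N dispatches.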
646 */ 647 #ifdef DEBUG 648 #define TASKQ_D_RANDOM_DISPATCH_FAILURE(tq, flag) \ 649 taskq_random = (taskq_random * 2416 + 374441) % 1771875;\ 650 if ((flag & TQ_NOSLEEP) && \ 651 taskq_random < 1771875 / taskq_dmtbf) { \ 652 return (NULL); \ 653 } 654 655 #define TASKQ_S_RANDOM_DISPATCH_FAILURE(tq, flag) \ 656 taskq_random = (taskq_random * 2416 + 374441) % 1771875;\ 657 if ((flag & (TQ_NOSLEEP | TQ_NOALLOC)) && \ 658 (!(tq->tq_flags & TASKQ_PREPOPULATE) || \ 659 (tq->tq_nalloc > tq->tq_minalloc)) && \ 660 (taskq_random < (1771875 / taskq_smtbf))) { \ 661 mutex_exit(&tq->tq_lock); \ 662 return (NULL); \ 663 } 664 #else 665 #define TASKQ_S_RANDOM_DISPATCH_FAILURE(tq, flag) 666 #define TASKQ_D_RANDOM_DISPATCH_FAILURE(tq, flag) 667 #endif 668 669 #define IS_EMPTY(l) (((l).tqent_prev == (l).tqent_next) && \ 670 ((l).tqent_prev == &(l))) 671 672 /* 673 * Append `tqe' in the end of the doubly-linked list denoted by l. 674 */ 675 #define TQ_APPEND(l, tqe) { \ 676 tqe->tqent_next = &l; \ 677 tqe->tqent_prev = l.tqent_prev; \ 678 tqe->tqent_next->tqent_prev = tqe; \ 679 tqe->tqent_prev->tqent_next = tqe; \ 680 } 681 682 /* 683 * Schedule a task specified by func and arg into the task queue entry tqe. 684 */ 685 #define TQ_ENQUEUE(tq, tqe, func, arg) { \ 686 ASSERT(MUTEX_HELD(&tq->tq_lock)); \ 687 TQ_APPEND(tq->tq_task, tqe); \ 688 tqe->tqent_func = (func); \ 689 tqe->tqent_arg = (arg); \ 690 tq->tq_tasks++; \ 691 if (tq->tq_tasks - tq->tq_executed > tq->tq_maxtasks) \ 692 tq->tq_maxtasks = tq->tq_tasks - tq->tq_executed; \ 693 cv_signal(&tq->tq_dispatch_cv); \ 694 DTRACE_PROBE2(taskq__enqueue, taskq_t *, tq, taskq_ent_t *, tqe); \ 695 } 696 697 /* 698 * Do-nothing task which may be used to prepopulate thread caches. 699 */ 700 /*ARGSUSED*/ 701 void 702 nulltask(void *unused) 703 { 704 } 705 706 707 /*ARGSUSED*/ 708 static int 709 taskq_constructor(void *buf, void *cdrarg, int kmflags) 710 { 711 taskq_t *tq = buf; 712 713 bzero(tq, sizeof (taskq_t)); 714 715 mutex_init(&tq->tq_lock, NULL, MUTEX_DEFAULT, NULL); 716 rw_init(&tq->tq_threadlock, NULL, RW_DEFAULT, NULL); 717 cv_init(&tq->tq_dispatch_cv, NULL, CV_DEFAULT, NULL); 718 cv_init(&tq->tq_exit_cv, NULL, CV_DEFAULT, NULL); 719 cv_init(&tq->tq_wait_cv, NULL, CV_DEFAULT, NULL); 720 721 tq->tq_task.tqent_next = &tq->tq_task; 722 tq->tq_task.tqent_prev = &tq->tq_task; 723 724 return (0); 725 } 726 727 /*ARGSUSED*/ 728 static void 729 taskq_destructor(void *buf, void *cdrarg) 730 { 731 taskq_t *tq = buf; 732 733 ASSERT(tq->tq_nthreads == 0); 734 ASSERT(tq->tq_buckets == NULL); 735 ASSERT(tq->tq_tcreates == 0); 736 ASSERT(tq->tq_tdeaths == 0); 737 738 mutex_destroy(&tq->tq_lock); 739 rw_destroy(&tq->tq_threadlock); 740 cv_destroy(&tq->tq_dispatch_cv); 741 cv_destroy(&tq->tq_exit_cv); 742 cv_destroy(&tq->tq_wait_cv); 743 } 744 745 /*ARGSUSED*/ 746 static int 747 taskq_ent_constructor(void *buf, void *cdrarg, int kmflags) 748 { 749 taskq_ent_t *tqe = buf; 750 751 tqe->tqent_thread = NULL; 752 cv_init(&tqe->tqent_cv, NULL, CV_DEFAULT, NULL); 753 754 return (0); 755 } 756 757 /*ARGSUSED*/ 758 static void 759 taskq_ent_destructor(void *buf, void *cdrarg) 760 { 761 taskq_ent_t *tqe = buf; 762 763 ASSERT(tqe->tqent_thread == NULL); 764 cv_destroy(&tqe->tqent_cv); 765 } 766 767 void 768 taskq_init(void) 769 { 770 taskq_ent_cache = kmem_cache_create("taskq_ent_cache", 771 sizeof (taskq_ent_t), 0, taskq_ent_constructor, 772 taskq_ent_destructor, NULL, NULL, NULL, 0); 773 taskq_cache = kmem_cache_create("taskq_cache", sizeof (taskq_t), 774 0, taskq_constructor, 
taskq_destructor, NULL, NULL, NULL, 0); 775 taskq_id_arena = vmem_create("taskq_id_arena", 776 (void *)1, INT32_MAX, 1, NULL, NULL, NULL, 0, 777 VM_SLEEP | VMC_IDENTIFIER); 778 779 list_create(&taskq_cpupct_list, sizeof (taskq_cpupct_ent_t), 780 offsetof(taskq_cpupct_ent_t, tp_link)); 781 } 782 783 /*ARGSUSED*/ 784 static int 785 taskq_cpu_setup(cpu_setup_t what, int id, void *arg) 786 { 787 taskq_cpupct_ent_t *tpp; 788 int cpus_online = ncpus_online; 789 790 /* offlines are called *before* the cpu is offlined. */ 791 if (what == CPU_OFF) 792 cpus_online--; 793 if (cpus_online < 1) 794 cpus_online = 1; 795 796 mutex_enter(&taskq_cpupct_lock); 797 if (cpus_online == taskq_cpupct_ncpus_online) { 798 mutex_exit(&taskq_cpupct_lock); 799 return (0); 800 } 801 802 for (tpp = list_head(&taskq_cpupct_list); tpp != NULL; 803 tpp = list_next(&taskq_cpupct_list, tpp)) { 804 taskq_t *tq = tpp->tp_taskq; 805 int newtarget; 806 807 mutex_enter(&tq->tq_lock); 808 newtarget = 809 TASKQ_THREADS_PCT(cpus_online, tq->tq_threads_ncpus_pct); 810 ASSERT3S(newtarget, <=, tq->tq_nthreads_max); 811 if (newtarget != tq->tq_nthreads_target) { 812 /* The taskq must not be exiting */ 813 ASSERT3S(tq->tq_nthreads_target, !=, 0); 814 tq->tq_flags |= TASKQ_CHANGING; 815 tq->tq_nthreads_target = newtarget; 816 cv_broadcast(&tq->tq_dispatch_cv); 817 cv_broadcast(&tq->tq_exit_cv); 818 } 819 mutex_exit(&tq->tq_lock); 820 } 821 822 taskq_cpupct_ncpus_online = cpus_online; 823 mutex_exit(&taskq_cpupct_lock); 824 return (0); 825 } 826 827 void 828 taskq_mp_init(void) 829 { 830 mutex_enter(&cpu_lock); 831 register_cpu_setup_func(taskq_cpu_setup, NULL); 832 (void) taskq_cpu_setup(CPU_ON, 0, NULL); 833 mutex_exit(&cpu_lock); 834 } 835 836 /* 837 * Create global system dynamic task queue. 838 */ 839 void 840 system_taskq_init(void) 841 { 842 system_taskq = taskq_create_common("system_taskq", 0, 843 system_taskq_size * max_ncpus, minclsyspri, 4, 512, 844 TASKQ_DYNAMIC | TASKQ_PREPOPULATE); 845 } 846 847 /* 848 * taskq_ent_alloc() 849 * 850 * Allocates a new taskq_ent_t structure either from the free list or from the 851 * cache. Returns NULL if it can't be allocated. 852 * 853 * Assumes: tq->tq_lock is held. 854 */ 855 static taskq_ent_t * 856 taskq_ent_alloc(taskq_t *tq, int flags) 857 { 858 int kmflags = (flags & TQ_NOSLEEP) ? KM_NOSLEEP : KM_SLEEP; 859 860 taskq_ent_t *tqe; 861 862 ASSERT(MUTEX_HELD(&tq->tq_lock)); 863 864 /* 865 * TQ_NOALLOC allocations are allowed to use the freelist, even if 866 * we are below tq_minalloc. 867 */ 868 if ((tqe = tq->tq_freelist) != NULL && 869 ((flags & TQ_NOALLOC) || tq->tq_nalloc >= tq->tq_minalloc)) { 870 tq->tq_freelist = tqe->tqent_next; 871 } else { 872 if (flags & TQ_NOALLOC) 873 return (NULL); 874 875 mutex_exit(&tq->tq_lock); 876 if (tq->tq_nalloc >= tq->tq_maxalloc) { 877 if (kmflags & KM_NOSLEEP) { 878 mutex_enter(&tq->tq_lock); 879 return (NULL); 880 } 881 /* 882 * We don't want to exceed tq_maxalloc, but we can't 883 * wait for other tasks to complete (and thus free up 884 * task structures) without risking deadlock with 885 * the caller. So, we just delay for one second 886 * to throttle the allocation rate. 887 */ 888 delay(hz); 889 } 890 tqe = kmem_cache_alloc(taskq_ent_cache, kmflags); 891 mutex_enter(&tq->tq_lock); 892 if (tqe != NULL) 893 tq->tq_nalloc++; 894 } 895 return (tqe); 896 } 897 898 /* 899 * taskq_ent_free() 900 * 901 * Free taskq_ent_t structure by either putting it on the free list or freeing 902 * it to the cache. 903 * 904 * Assumes: tq->tq_lock is held. 
905 */ 906 static void 907 taskq_ent_free(taskq_t *tq, taskq_ent_t *tqe) 908 { 909 ASSERT(MUTEX_HELD(&tq->tq_lock)); 910 911 if (tq->tq_nalloc <= tq->tq_minalloc) { 912 tqe->tqent_next = tq->tq_freelist; 913 tq->tq_freelist = tqe; 914 } else { 915 tq->tq_nalloc--; 916 mutex_exit(&tq->tq_lock); 917 kmem_cache_free(taskq_ent_cache, tqe); 918 mutex_enter(&tq->tq_lock); 919 } 920 } 921 922 /* 923 * Dispatch a task "func(arg)" to a free entry of bucket b. 924 * 925 * Assumes: no bucket locks is held. 926 * 927 * Returns: a pointer to an entry if dispatch was successful. 928 * NULL if there are no free entries or if the bucket is suspended. 929 */ 930 static taskq_ent_t * 931 taskq_bucket_dispatch(taskq_bucket_t *b, task_func_t func, void *arg) 932 { 933 taskq_ent_t *tqe; 934 935 ASSERT(MUTEX_NOT_HELD(&b->tqbucket_lock)); 936 ASSERT(func != NULL); 937 938 mutex_enter(&b->tqbucket_lock); 939 940 ASSERT(b->tqbucket_nfree != 0 || IS_EMPTY(b->tqbucket_freelist)); 941 ASSERT(b->tqbucket_nfree == 0 || !IS_EMPTY(b->tqbucket_freelist)); 942 943 /* 944 * Get en entry from the freelist if there is one. 945 * Schedule task into the entry. 946 */ 947 if ((b->tqbucket_nfree != 0) && 948 !(b->tqbucket_flags & TQBUCKET_SUSPEND)) { 949 tqe = b->tqbucket_freelist.tqent_prev; 950 951 ASSERT(tqe != &b->tqbucket_freelist); 952 ASSERT(tqe->tqent_thread != NULL); 953 954 tqe->tqent_prev->tqent_next = tqe->tqent_next; 955 tqe->tqent_next->tqent_prev = tqe->tqent_prev; 956 b->tqbucket_nalloc++; 957 b->tqbucket_nfree--; 958 tqe->tqent_func = func; 959 tqe->tqent_arg = arg; 960 TQ_STAT(b, tqs_hits); 961 cv_signal(&tqe->tqent_cv); 962 DTRACE_PROBE2(taskq__d__enqueue, taskq_bucket_t *, b, 963 taskq_ent_t *, tqe); 964 } else { 965 tqe = NULL; 966 TQ_STAT(b, tqs_misses); 967 } 968 mutex_exit(&b->tqbucket_lock); 969 return (tqe); 970 } 971 972 /* 973 * Dispatch a task. 974 * 975 * Assumes: func != NULL 976 * 977 * Returns: NULL if dispatch failed. 978 * non-NULL if task dispatched successfully. 979 * Actual return value is the pointer to taskq entry that was used to 980 * dispatch a task. This is useful for debugging. 981 */ 982 /* ARGSUSED */ 983 taskqid_t 984 taskq_dispatch(taskq_t *tq, task_func_t func, void *arg, uint_t flags) 985 { 986 taskq_bucket_t *bucket = NULL; /* Which bucket needs extension */ 987 taskq_ent_t *tqe = NULL; 988 taskq_ent_t *tqe1; 989 uint_t bsize; 990 991 ASSERT(tq != NULL); 992 ASSERT(func != NULL); 993 994 if (!(tq->tq_flags & TASKQ_DYNAMIC)) { 995 /* 996 * TQ_NOQUEUE flag can't be used with non-dynamic task queues. 997 */ 998 ASSERT(! (flags & TQ_NOQUEUE)); 999 /* 1000 * Enqueue the task to the underlying queue. 1001 */ 1002 mutex_enter(&tq->tq_lock); 1003 1004 TASKQ_S_RANDOM_DISPATCH_FAILURE(tq, flags); 1005 1006 if ((tqe = taskq_ent_alloc(tq, flags)) == NULL) { 1007 mutex_exit(&tq->tq_lock); 1008 return (NULL); 1009 } 1010 TQ_ENQUEUE(tq, tqe, func, arg); 1011 mutex_exit(&tq->tq_lock); 1012 return ((taskqid_t)tqe); 1013 } 1014 1015 /* 1016 * Dynamic taskq dispatching. 1017 */ 1018 ASSERT(!(flags & TQ_NOALLOC)); 1019 TASKQ_D_RANDOM_DISPATCH_FAILURE(tq, flags); 1020 1021 bsize = tq->tq_nbuckets; 1022 1023 if (bsize == 1) { 1024 /* 1025 * In a single-CPU case there is only one bucket, so get 1026 * entry directly from there. 
1027 */ 1028 if ((tqe = taskq_bucket_dispatch(tq->tq_buckets, func, arg)) 1029 != NULL) 1030 return ((taskqid_t)tqe); /* Fastpath */ 1031 bucket = tq->tq_buckets; 1032 } else { 1033 int loopcount; 1034 taskq_bucket_t *b; 1035 uintptr_t h = ((uintptr_t)CPU + (uintptr_t)arg) >> 3; 1036 1037 h = TQ_HASH(h); 1038 1039 /* 1040 * The 'bucket' points to the original bucket that we hit. If we 1041 * can't allocate from it, we search other buckets, but only 1042 * extend this one. 1043 */ 1044 b = &tq->tq_buckets[h & (bsize - 1)]; 1045 ASSERT(b->tqbucket_taskq == tq); /* Sanity check */ 1046 1047 /* 1048 * Do a quick check before grabbing the lock. If the bucket does 1049 * not have free entries now, chances are very small that it 1050 * will after we take the lock, so we just skip it. 1051 */ 1052 if (b->tqbucket_nfree != 0) { 1053 if ((tqe = taskq_bucket_dispatch(b, func, arg)) != NULL) 1054 return ((taskqid_t)tqe); /* Fastpath */ 1055 } else { 1056 TQ_STAT(b, tqs_misses); 1057 } 1058 1059 bucket = b; 1060 loopcount = MIN(taskq_search_depth, bsize); 1061 /* 1062 * If bucket dispatch failed, search loopcount number of buckets 1063 * before we give up and fail. 1064 */ 1065 do { 1066 b = &tq->tq_buckets[++h & (bsize - 1)]; 1067 ASSERT(b->tqbucket_taskq == tq); /* Sanity check */ 1068 loopcount--; 1069 1070 if (b->tqbucket_nfree != 0) { 1071 tqe = taskq_bucket_dispatch(b, func, arg); 1072 } else { 1073 TQ_STAT(b, tqs_misses); 1074 } 1075 } while ((tqe == NULL) && (loopcount > 0)); 1076 } 1077 1078 /* 1079 * At this point we either scheduled a task and (tqe != NULL) or failed 1080 * (tqe == NULL). Try to recover from fails. 1081 */ 1082 1083 /* 1084 * For KM_SLEEP dispatches, try to extend the bucket and retry dispatch. 1085 */ 1086 if ((tqe == NULL) && !(flags & TQ_NOSLEEP)) { 1087 /* 1088 * taskq_bucket_extend() may fail to do anything, but this is 1089 * fine - we deal with it later. If the bucket was successfully 1090 * extended, there is a good chance that taskq_bucket_dispatch() 1091 * will get this new entry, unless someone is racing with us and 1092 * stealing the new entry from under our nose. 1093 * taskq_bucket_extend() may sleep. 1094 */ 1095 taskq_bucket_extend(bucket); 1096 TQ_STAT(bucket, tqs_disptcreates); 1097 if ((tqe = taskq_bucket_dispatch(bucket, func, arg)) != NULL) 1098 return ((taskqid_t)tqe); 1099 } 1100 1101 ASSERT(bucket != NULL); 1102 /* 1103 * Since there are not enough free entries in the bucket, extend it 1104 * in the background using backing queue. 1105 */ 1106 mutex_enter(&tq->tq_lock); 1107 if ((tqe1 = taskq_ent_alloc(tq, TQ_NOSLEEP)) != NULL) { 1108 TQ_ENQUEUE(tq, tqe1, taskq_bucket_extend, 1109 bucket); 1110 } else { 1111 TQ_STAT(bucket, tqs_nomem); 1112 } 1113 1114 /* 1115 * Dispatch failed and we can't find an entry to schedule a task. 1116 * Revert to the backing queue unless TQ_NOQUEUE was asked. 1117 */ 1118 if ((tqe == NULL) && !(flags & TQ_NOQUEUE)) { 1119 if ((tqe = taskq_ent_alloc(tq, flags)) != NULL) { 1120 TQ_ENQUEUE(tq, tqe, func, arg); 1121 } else { 1122 TQ_STAT(bucket, tqs_nomem); 1123 } 1124 } 1125 mutex_exit(&tq->tq_lock); 1126 1127 return ((taskqid_t)tqe); 1128 } 1129 1130 /* 1131 * Wait for all pending tasks to complete. 1132 * Calling taskq_wait from a task will cause deadlock. 
1133 */ 1134 void 1135 taskq_wait(taskq_t *tq) 1136 { 1137 ASSERT(tq != curthread->t_taskq); 1138 1139 mutex_enter(&tq->tq_lock); 1140 while (tq->tq_task.tqent_next != &tq->tq_task || tq->tq_active != 0) 1141 cv_wait(&tq->tq_wait_cv, &tq->tq_lock); 1142 mutex_exit(&tq->tq_lock); 1143 1144 if (tq->tq_flags & TASKQ_DYNAMIC) { 1145 taskq_bucket_t *b = tq->tq_buckets; 1146 int bid = 0; 1147 for (; (b != NULL) && (bid < tq->tq_nbuckets); b++, bid++) { 1148 mutex_enter(&b->tqbucket_lock); 1149 while (b->tqbucket_nalloc > 0) 1150 cv_wait(&b->tqbucket_cv, &b->tqbucket_lock); 1151 mutex_exit(&b->tqbucket_lock); 1152 } 1153 } 1154 } 1155 1156 /* 1157 * Suspend execution of tasks. 1158 * 1159 * Tasks in the queue part will be suspended immediately upon return from this 1160 * function. Pending tasks in the dynamic part will continue to execute, but all 1161 * new tasks will be suspended. 1162 */ 1163 void 1164 taskq_suspend(taskq_t *tq) 1165 { 1166 rw_enter(&tq->tq_threadlock, RW_WRITER); 1167 1168 if (tq->tq_flags & TASKQ_DYNAMIC) { 1169 taskq_bucket_t *b = tq->tq_buckets; 1170 int bid = 0; 1171 for (; (b != NULL) && (bid < tq->tq_nbuckets); b++, bid++) { 1172 mutex_enter(&b->tqbucket_lock); 1173 b->tqbucket_flags |= TQBUCKET_SUSPEND; 1174 mutex_exit(&b->tqbucket_lock); 1175 } 1176 } 1177 /* 1178 * Mark task queue as being suspended. Needed for taskq_suspended(). 1179 */ 1180 mutex_enter(&tq->tq_lock); 1181 ASSERT(!(tq->tq_flags & TASKQ_SUSPENDED)); 1182 tq->tq_flags |= TASKQ_SUSPENDED; 1183 mutex_exit(&tq->tq_lock); 1184 } 1185 1186 /* 1187 * returns: 1 if tq is suspended, 0 otherwise. 1188 */ 1189 int 1190 taskq_suspended(taskq_t *tq) 1191 { 1192 return ((tq->tq_flags & TASKQ_SUSPENDED) != 0); 1193 } 1194 1195 /* 1196 * Resume taskq execution. 1197 */ 1198 void 1199 taskq_resume(taskq_t *tq) 1200 { 1201 ASSERT(RW_WRITE_HELD(&tq->tq_threadlock)); 1202 1203 if (tq->tq_flags & TASKQ_DYNAMIC) { 1204 taskq_bucket_t *b = tq->tq_buckets; 1205 int bid = 0; 1206 for (; (b != NULL) && (bid < tq->tq_nbuckets); b++, bid++) { 1207 mutex_enter(&b->tqbucket_lock); 1208 b->tqbucket_flags &= ~TQBUCKET_SUSPEND; 1209 mutex_exit(&b->tqbucket_lock); 1210 } 1211 } 1212 mutex_enter(&tq->tq_lock); 1213 ASSERT(tq->tq_flags & TASKQ_SUSPENDED); 1214 tq->tq_flags &= ~TASKQ_SUSPENDED; 1215 mutex_exit(&tq->tq_lock); 1216 1217 rw_exit(&tq->tq_threadlock); 1218 } 1219 1220 int 1221 taskq_member(taskq_t *tq, kthread_t *thread) 1222 { 1223 return (thread->t_taskq == tq); 1224 } 1225 1226 static void 1227 taskq_thread_create(taskq_t *tq) 1228 { 1229 kthread_t *t; 1230 1231 ASSERT(MUTEX_HELD(&tq->tq_lock)); 1232 ASSERT(!(tq->tq_flags & TASKQ_THREAD_CREATED)); 1233 1234 tq->tq_flags |= TASKQ_THREAD_CREATED; 1235 tq->tq_active++; 1236 t = thread_create(NULL, 0, taskq_thread, tq, 0, &p0, TS_RUN, 1237 tq->tq_pri); 1238 t->t_taskq = tq; 1239 } 1240 1241 /* 1242 * Common "sleep taskq thread" function, which handles CPR stuff, as well 1243 * as giving a nice common point for debuggers to find inactive threads. 
1244 */ 1245 static clock_t 1246 taskq_thread_wait(taskq_t *tq, kmutex_t *mx, kcondvar_t *cv, 1247 callb_cpr_t *cprinfo, clock_t timeout) 1248 { 1249 clock_t ret = 0; 1250 1251 if (!(tq->tq_flags & TASKQ_CPR_SAFE)) { 1252 CALLB_CPR_SAFE_BEGIN(cprinfo); 1253 } 1254 if (timeout < 0) 1255 cv_wait(cv, mx); 1256 else 1257 ret = cv_reltimedwait(cv, mx, timeout, TR_CLOCK_TICK); 1258 1259 if (!(tq->tq_flags & TASKQ_CPR_SAFE)) { 1260 CALLB_CPR_SAFE_END(cprinfo, mx); 1261 } 1262 1263 return (ret); 1264 } 1265 1266 /* 1267 * Worker thread for processing task queue. 1268 */ 1269 static void 1270 taskq_thread(void *arg) 1271 { 1272 int thread_id; 1273 1274 taskq_t *tq = arg; 1275 taskq_ent_t *tqe; 1276 callb_cpr_t cprinfo; 1277 hrtime_t start, end; 1278 1279 if (tq->tq_flags & TASKQ_CPR_SAFE) { 1280 CALLB_CPR_INIT_SAFE(curthread, tq->tq_name); 1281 } else { 1282 CALLB_CPR_INIT(&cprinfo, &tq->tq_lock, callb_generic_cpr, 1283 tq->tq_name); 1284 } 1285 mutex_enter(&tq->tq_lock); 1286 thread_id = ++tq->tq_nthreads; 1287 ASSERT(tq->tq_flags & TASKQ_THREAD_CREATED); 1288 tq->tq_flags &= ~TASKQ_THREAD_CREATED; 1289 1290 VERIFY3S(thread_id, <=, tq->tq_nthreads_max); 1291 1292 if (tq->tq_nthreads_max == 1) 1293 tq->tq_thread = curthread; 1294 else 1295 tq->tq_threadlist[thread_id - 1] = curthread; 1296 1297 for (;;) { 1298 if (tq->tq_flags & TASKQ_CHANGING) { 1299 /* we're done; clear the CHANGING flag */ 1300 if (tq->tq_nthreads == tq->tq_nthreads_target) { 1301 tq->tq_flags &= ~TASKQ_CHANGING; 1302 continue; 1303 } 1304 /* We're low on threads and none have been created */ 1305 if (tq->tq_nthreads < tq->tq_nthreads_target && 1306 !(tq->tq_flags & TASKQ_THREAD_CREATED)) { 1307 taskq_thread_create(tq); 1308 continue; 1309 } 1310 /* We're no longer needed */ 1311 if (thread_id > tq->tq_nthreads_target) { 1312 /* 1313 * To preserve the one-to-one mapping between 1314 * thread_id and thread, we must exit from 1315 * highest thread ID to least. 1316 * 1317 * However, if everyone is exiting, the order 1318 * doesn't matter, so just exit immediately. 
1319 * (this is safe, since you must wait for 1320 * nthreads to reach 0 after setting 1321 * tq_nthreads_target to 0) 1322 */ 1323 if (thread_id == tq->tq_nthreads || 1324 tq->tq_nthreads_target == 0) 1325 break; 1326 1327 /* Wait for higher thread_ids to exit */ 1328 (void) taskq_thread_wait(tq, &tq->tq_lock, 1329 &tq->tq_exit_cv, &cprinfo, -1); 1330 continue; 1331 } 1332 } 1333 if ((tqe = tq->tq_task.tqent_next) == &tq->tq_task) { 1334 if (--tq->tq_active == 0) 1335 cv_broadcast(&tq->tq_wait_cv); 1336 (void) taskq_thread_wait(tq, &tq->tq_lock, 1337 &tq->tq_dispatch_cv, &cprinfo, -1); 1338 tq->tq_active++; 1339 continue; 1340 } 1341 tqe->tqent_prev->tqent_next = tqe->tqent_next; 1342 tqe->tqent_next->tqent_prev = tqe->tqent_prev; 1343 mutex_exit(&tq->tq_lock); 1344 1345 rw_enter(&tq->tq_threadlock, RW_READER); 1346 start = gethrtime(); 1347 DTRACE_PROBE2(taskq__exec__start, taskq_t *, tq, 1348 taskq_ent_t *, tqe); 1349 tqe->tqent_func(tqe->tqent_arg); 1350 DTRACE_PROBE2(taskq__exec__end, taskq_t *, tq, 1351 taskq_ent_t *, tqe); 1352 end = gethrtime(); 1353 rw_exit(&tq->tq_threadlock); 1354 1355 mutex_enter(&tq->tq_lock); 1356 tq->tq_totaltime += end - start; 1357 tq->tq_executed++; 1358 1359 taskq_ent_free(tq, tqe); 1360 } 1361 1362 if (tq->tq_nthreads_max == 1) 1363 tq->tq_thread = NULL; 1364 else 1365 tq->tq_threadlist[thread_id - 1] = NULL; 1366 1367 ASSERT(tq->tq_nthreads > 0); 1368 if (--tq->tq_nthreads == 0) 1369 cv_broadcast(&tq->tq_wait_cv); 1370 1371 /* let the other threads which need to exit know we're done */ 1372 cv_broadcast(&tq->tq_exit_cv); 1373 1374 /* We're exiting, and therefore no longer active */ 1375 tq->tq_active--; 1376 1377 ASSERT(!(tq->tq_flags & TASKQ_CPR_SAFE)); 1378 CALLB_CPR_EXIT(&cprinfo); 1379 thread_exit(); 1380 } 1381 1382 /* 1383 * Worker per-entry thread for dynamic dispatches. 1384 */ 1385 static void 1386 taskq_d_thread(taskq_ent_t *tqe) 1387 { 1388 taskq_bucket_t *bucket = tqe->tqent_bucket; 1389 taskq_t *tq = bucket->tqbucket_taskq; 1390 kmutex_t *lock = &bucket->tqbucket_lock; 1391 kcondvar_t *cv = &tqe->tqent_cv; 1392 callb_cpr_t cprinfo; 1393 clock_t w; 1394 1395 CALLB_CPR_INIT(&cprinfo, lock, callb_generic_cpr, tq->tq_name); 1396 1397 mutex_enter(lock); 1398 1399 for (;;) { 1400 /* 1401 * If a task is scheduled (func != NULL), execute it, otherwise 1402 * sleep, waiting for a job. 1403 */ 1404 if (tqe->tqent_func != NULL) { 1405 hrtime_t start; 1406 hrtime_t end; 1407 1408 ASSERT(bucket->tqbucket_nalloc > 0); 1409 1410 /* 1411 * It is possible to free the entry right away before 1412 * actually executing the task so that subsequent 1413 * dispatches may immediately reuse it. But this, 1414 * effectively, creates a two-length queue in the entry 1415 * and may lead to a deadlock if the execution of the 1416 * current task depends on the execution of the next 1417 * scheduled task. So, we keep the entry busy until the 1418 * task is processed. 1419 */ 1420 1421 mutex_exit(lock); 1422 start = gethrtime(); 1423 DTRACE_PROBE3(taskq__d__exec__start, taskq_t *, tq, 1424 taskq_bucket_t *, bucket, taskq_ent_t *, tqe); 1425 tqe->tqent_func(tqe->tqent_arg); 1426 DTRACE_PROBE3(taskq__d__exec__end, taskq_t *, tq, 1427 taskq_bucket_t *, bucket, taskq_ent_t *, tqe); 1428 end = gethrtime(); 1429 mutex_enter(lock); 1430 bucket->tqbucket_totaltime += end - start; 1431 1432 /* 1433 * Return the entry to the bucket free list. 
1434 */ 1435 tqe->tqent_func = NULL; 1436 TQ_APPEND(bucket->tqbucket_freelist, tqe); 1437 bucket->tqbucket_nalloc--; 1438 bucket->tqbucket_nfree++; 1439 ASSERT(!IS_EMPTY(bucket->tqbucket_freelist)); 1440 /* 1441 * taskq_wait() waits for nalloc to drop to zero on 1442 * tqbucket_cv. 1443 */ 1444 cv_signal(&bucket->tqbucket_cv); 1445 } 1446 1447 /* 1448 * At this point the entry must be in the bucket free list - 1449 * either because it was there initially or because it just 1450 * finished executing a task and put itself on the free list. 1451 */ 1452 ASSERT(bucket->tqbucket_nfree > 0); 1453 /* 1454 * Go to sleep unless we are closing. 1455 * If a thread is sleeping too long, it dies. 1456 */ 1457 if (! (bucket->tqbucket_flags & TQBUCKET_CLOSE)) { 1458 w = taskq_thread_wait(tq, lock, cv, 1459 &cprinfo, taskq_thread_timeout * hz); 1460 } 1461 1462 /* 1463 * At this point we may be in two different states: 1464 * 1465 * (1) tqent_func is set which means that a new task is 1466 * dispatched and we need to execute it. 1467 * 1468 * (2) Thread is sleeping for too long or we are closing. In 1469 * both cases destroy the thread and the entry. 1470 */ 1471 1472 /* If func is NULL we should be on the freelist. */ 1473 ASSERT((tqe->tqent_func != NULL) || 1474 (bucket->tqbucket_nfree > 0)); 1475 /* If func is non-NULL we should be allocated */ 1476 ASSERT((tqe->tqent_func == NULL) || 1477 (bucket->tqbucket_nalloc > 0)); 1478 1479 /* Check freelist consistency */ 1480 ASSERT((bucket->tqbucket_nfree > 0) || 1481 IS_EMPTY(bucket->tqbucket_freelist)); 1482 ASSERT((bucket->tqbucket_nfree == 0) || 1483 !IS_EMPTY(bucket->tqbucket_freelist)); 1484 1485 if ((tqe->tqent_func == NULL) && 1486 ((w == -1) || (bucket->tqbucket_flags & TQBUCKET_CLOSE))) { 1487 /* 1488 * This thread is sleeping for too long or we are 1489 * closing - time to die. 1490 * Thread creation/destruction happens rarely, 1491 * so grabbing the lock is not a big performance issue. 1492 * The bucket lock is dropped by CALLB_CPR_EXIT(). 1493 */ 1494 1495 /* Remove the entry from the free list. */ 1496 tqe->tqent_prev->tqent_next = tqe->tqent_next; 1497 tqe->tqent_next->tqent_prev = tqe->tqent_prev; 1498 ASSERT(bucket->tqbucket_nfree > 0); 1499 bucket->tqbucket_nfree--; 1500 1501 TQ_STAT(bucket, tqs_tdeaths); 1502 cv_signal(&bucket->tqbucket_cv); 1503 tqe->tqent_thread = NULL; 1504 mutex_enter(&tq->tq_lock); 1505 tq->tq_tdeaths++; 1506 mutex_exit(&tq->tq_lock); 1507 CALLB_CPR_EXIT(&cprinfo); 1508 kmem_cache_free(taskq_ent_cache, tqe); 1509 thread_exit(); 1510 } 1511 } 1512 } 1513 1514 1515 /* 1516 * Taskq creation. May sleep for memory. 1517 * Always use automatically generated instances to avoid kstat name space 1518 * collisions. 1519 */ 1520 1521 taskq_t * 1522 taskq_create(const char *name, int nthreads, pri_t pri, int minalloc, 1523 int maxalloc, uint_t flags) 1524 { 1525 return taskq_create_common(name, 0, nthreads, pri, minalloc, 1526 maxalloc, flags | TASKQ_NOINSTANCE); 1527 } 1528 1529 /* 1530 * Create an instance of task queue. It is legal to create task queues with the 1531 * same name and different instances. 1532 * 1533 * taskq_create_instance is used by ddi_taskq_create() where it gets the 1534 * instance from ddi_get_instance(). In some cases the instance is not 1535 * initialized and is set to -1. This case is handled as if no instance was 1536 * passed at all. 
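 *
 * Illustrative sketch of the ddi-style usage described above (not taken from
 * this file; 'dip' is a hypothetical dev_info_t pointer and "mydrv" a
 * hypothetical driver name):
 *
 *	tq = taskq_create_instance("mydrv_taskq", ddi_get_instance(dip),
 *	    1, minclsyspri, 8, 16, 0);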
1537 */ 1538 taskq_t * 1539 taskq_create_instance(const char *name, int instance, int nthreads, pri_t pri, 1540 int minalloc, int maxalloc, uint_t flags) 1541 { 1542 ASSERT((instance >= 0) || (instance == -1)); 1543 1544 if (instance < 0) { 1545 flags |= TASKQ_NOINSTANCE; 1546 } 1547 1548 return (taskq_create_common(name, instance, nthreads, 1549 pri, minalloc, maxalloc, flags)); 1550 } 1551 1552 static taskq_t * 1553 taskq_create_common(const char *name, int instance, int nthreads, pri_t pri, 1554 int minalloc, int maxalloc, uint_t flags) 1555 { 1556 taskq_t *tq = kmem_cache_alloc(taskq_cache, KM_SLEEP); 1557 uint_t ncpus = ((boot_max_ncpus == -1) ? max_ncpus : boot_max_ncpus); 1558 uint_t bsize; /* # of buckets - always power of 2 */ 1559 int max_nthreads; 1560 1561 /* 1562 * TASKQ_DYNAMIC is incompatible with TASKQ_CPR_SAFE and 1563 * TASKQ_THREADS_CPU_PCT. 1564 */ 1565 ASSERT(!(flags & TASKQ_DYNAMIC) || 1566 !(flags & (TASKQ_CPR_SAFE | TASKQ_THREADS_CPU_PCT))); 1567 /* TASKQ_CPR_SAFE is incompatible with TASKQ_THREADS_CPU_PCT */ 1568 1569 ASSERT(!(flags & TASKQ_CPR_SAFE) || !(flags & TASKQ_THREADS_CPU_PCT)); 1570 1571 bsize = 1 << (highbit(ncpus) - 1); 1572 ASSERT(bsize >= 1); 1573 bsize = MIN(bsize, taskq_maxbuckets); 1574 1575 if (flags & TASKQ_DYNAMIC) { 1576 ASSERT3S(nthreads, >=, 1); 1577 tq->tq_maxsize = nthreads; 1578 1579 /* For dynamic task queues use just one backup thread */ 1580 nthreads = max_nthreads = 1; 1581 1582 } else if (!(flags & TASKQ_THREADS_CPU_PCT)) { 1583 ASSERT3S(nthreads, >=, 1); 1584 max_nthreads = nthreads; 1585 } else { 1586 uint_t pct; 1587 ASSERT3S(nthreads, >=, 0); 1588 pct = nthreads; 1589 1590 if (pct > taskq_cpupct_max_percent) 1591 pct = taskq_cpupct_max_percent; 1592 1593 tq->tq_threads_ncpus_pct = pct; 1594 nthreads = TASKQ_THREADS_PCT(ncpus_online, pct); 1595 max_nthreads = TASKQ_THREADS_PCT(max_ncpus, pct); 1596 } 1597 1598 if (max_nthreads < taskq_minimum_nthreads_max) 1599 max_nthreads = taskq_minimum_nthreads_max; 1600 1601 /* 1602 * Make sure the name is 0-terminated, and conforms to the rules for 1603 * C indentifiers 1604 */ 1605 (void) strncpy(tq->tq_name, name, TASKQ_NAMELEN + 1); 1606 strident_canon(tq->tq_name, TASKQ_NAMELEN + 1); 1607 1608 tq->tq_flags = flags | TASKQ_CHANGING; 1609 tq->tq_active = 0; 1610 tq->tq_instance = instance; 1611 tq->tq_nthreads_target = nthreads; 1612 tq->tq_nthreads_max = max_nthreads; 1613 tq->tq_minalloc = minalloc; 1614 tq->tq_maxalloc = maxalloc; 1615 tq->tq_nbuckets = bsize; 1616 tq->tq_pri = pri; 1617 1618 if (max_nthreads > 1) 1619 tq->tq_threadlist = kmem_alloc( 1620 sizeof (kthread_t *) * max_nthreads, KM_SLEEP); 1621 1622 /* Add the taskq to the list of CPU_PCT taskqs */ 1623 if (flags & TASKQ_THREADS_CPU_PCT) { 1624 taskq_cpupct_ent_t *tpp = kmem_zalloc(sizeof (*tpp), KM_SLEEP); 1625 1626 list_link_init(&tpp->tp_link); 1627 tpp->tp_taskq = tq; 1628 1629 mutex_enter(&taskq_cpupct_lock); 1630 list_insert_tail(&taskq_cpupct_list, tpp); 1631 /* reset our target, to avoid race conditions */ 1632 tq->tq_nthreads_target = TASKQ_THREADS_PCT(ncpus_online, 1633 tq->tq_threads_ncpus_pct); 1634 mutex_exit(&taskq_cpupct_lock); 1635 } 1636 1637 mutex_enter(&tq->tq_lock); 1638 if (flags & TASKQ_PREPOPULATE) { 1639 while (minalloc-- > 0) 1640 taskq_ent_free(tq, taskq_ent_alloc(tq, TQ_SLEEP)); 1641 } 1642 1643 /* create the first thread; if more are needed, it'll create them */ 1644 taskq_thread_create(tq); 1645 mutex_exit(&tq->tq_lock); 1646 1647 if (flags & TASKQ_DYNAMIC) { 1648 taskq_bucket_t *bucket = 
kmem_zalloc(sizeof (taskq_bucket_t) * 1649 bsize, KM_SLEEP); 1650 int b_id; 1651 1652 tq->tq_buckets = bucket; 1653 1654 /* Initialize each bucket */ 1655 for (b_id = 0; b_id < bsize; b_id++, bucket++) { 1656 mutex_init(&bucket->tqbucket_lock, NULL, MUTEX_DEFAULT, 1657 NULL); 1658 cv_init(&bucket->tqbucket_cv, NULL, CV_DEFAULT, NULL); 1659 bucket->tqbucket_taskq = tq; 1660 bucket->tqbucket_freelist.tqent_next = 1661 bucket->tqbucket_freelist.tqent_prev = 1662 &bucket->tqbucket_freelist; 1663 if (flags & TASKQ_PREPOPULATE) 1664 taskq_bucket_extend(bucket); 1665 } 1666 } 1667 1668 /* 1669 * Install kstats. 1670 * We have two cases: 1671 * 1) Instance is provided to taskq_create_instance(). In this case it 1672 * should be >= 0 and we use it. 1673 * 1674 * 2) Instance is not provided and is automatically generated 1675 */ 1676 if (flags & TASKQ_NOINSTANCE) { 1677 instance = tq->tq_instance = 1678 (int)(uintptr_t)vmem_alloc(taskq_id_arena, 1, VM_SLEEP); 1679 } 1680 1681 if (flags & TASKQ_DYNAMIC) { 1682 if ((tq->tq_kstat = kstat_create("unix", instance, 1683 tq->tq_name, "taskq_d", KSTAT_TYPE_NAMED, 1684 sizeof (taskq_d_kstat) / sizeof (kstat_named_t), 1685 KSTAT_FLAG_VIRTUAL)) != NULL) { 1686 tq->tq_kstat->ks_lock = &taskq_d_kstat_lock; 1687 tq->tq_kstat->ks_data = &taskq_d_kstat; 1688 tq->tq_kstat->ks_update = taskq_d_kstat_update; 1689 tq->tq_kstat->ks_private = tq; 1690 kstat_install(tq->tq_kstat); 1691 } 1692 } else { 1693 if ((tq->tq_kstat = kstat_create("unix", instance, tq->tq_name, 1694 "taskq", KSTAT_TYPE_NAMED, 1695 sizeof (taskq_kstat) / sizeof (kstat_named_t), 1696 KSTAT_FLAG_VIRTUAL)) != NULL) { 1697 tq->tq_kstat->ks_lock = &taskq_kstat_lock; 1698 tq->tq_kstat->ks_data = &taskq_kstat; 1699 tq->tq_kstat->ks_update = taskq_kstat_update; 1700 tq->tq_kstat->ks_private = tq; 1701 kstat_install(tq->tq_kstat); 1702 } 1703 } 1704 1705 return (tq); 1706 } 1707 1708 /* 1709 * taskq_destroy(). 1710 * 1711 * Assumes: by the time taskq_destroy is called no one will use this task queue 1712 * in any way and no one will try to dispatch entries in it. 1713 */ 1714 void 1715 taskq_destroy(taskq_t *tq) 1716 { 1717 taskq_bucket_t *b = tq->tq_buckets; 1718 int bid = 0; 1719 1720 ASSERT(! (tq->tq_flags & TASKQ_CPR_SAFE)); 1721 1722 /* 1723 * Destroy kstats. 1724 */ 1725 if (tq->tq_kstat != NULL) { 1726 kstat_delete(tq->tq_kstat); 1727 tq->tq_kstat = NULL; 1728 } 1729 1730 /* 1731 * Destroy instance if needed. 1732 */ 1733 if (tq->tq_flags & TASKQ_NOINSTANCE) { 1734 vmem_free(taskq_id_arena, (void *)(uintptr_t)(tq->tq_instance), 1735 1); 1736 tq->tq_instance = 0; 1737 } 1738 1739 /* 1740 * Unregister from the cpupct list. 1741 */ 1742 if (tq->tq_flags & TASKQ_THREADS_CPU_PCT) { 1743 taskq_cpupct_ent_t *tpp; 1744 1745 mutex_enter(&taskq_cpupct_lock); 1746 for (tpp = list_head(&taskq_cpupct_list); tpp != NULL; 1747 tpp = list_next(&taskq_cpupct_list, tpp)) { 1748 if (tpp->tp_taskq == tq) 1749 break; 1750 } 1751 ASSERT3P(tpp, !=, NULL); 1752 1753 list_remove(&taskq_cpupct_list, tpp); 1754 mutex_exit(&taskq_cpupct_lock); 1755 1756 kmem_free(tpp, sizeof (*tpp)); 1757 } 1758 1759 /* 1760 * Wait for any pending entries to complete. 
/*
 * taskq_destroy().
 *
 * Assumes: by the time taskq_destroy is called no one will use this task queue
 * in any way and no one will try to dispatch entries into it.
 */
void
taskq_destroy(taskq_t *tq)
{
	taskq_bucket_t *b = tq->tq_buckets;
	int bid = 0;

	ASSERT(! (tq->tq_flags & TASKQ_CPR_SAFE));

	/*
	 * Destroy kstats.
	 */
	if (tq->tq_kstat != NULL) {
		kstat_delete(tq->tq_kstat);
		tq->tq_kstat = NULL;
	}

	/*
	 * Destroy instance if needed.
	 */
	if (tq->tq_flags & TASKQ_NOINSTANCE) {
		vmem_free(taskq_id_arena, (void *)(uintptr_t)(tq->tq_instance),
		    1);
		tq->tq_instance = 0;
	}

	/*
	 * Unregister from the cpupct list.
	 */
	if (tq->tq_flags & TASKQ_THREADS_CPU_PCT) {
		taskq_cpupct_ent_t *tpp;

		mutex_enter(&taskq_cpupct_lock);
		for (tpp = list_head(&taskq_cpupct_list); tpp != NULL;
		    tpp = list_next(&taskq_cpupct_list, tpp)) {
			if (tpp->tp_taskq == tq)
				break;
		}
		ASSERT3P(tpp, !=, NULL);

		list_remove(&taskq_cpupct_list, tpp);
		mutex_exit(&taskq_cpupct_lock);

		kmem_free(tpp, sizeof (*tpp));
	}

	/*
	 * Wait for any pending entries to complete.
	 */
	taskq_wait(tq);

	mutex_enter(&tq->tq_lock);
	ASSERT((tq->tq_task.tqent_next == &tq->tq_task) &&
	    (tq->tq_active == 0));

	/* notify all the threads that they need to exit */
	tq->tq_nthreads_target = 0;

	tq->tq_flags |= TASKQ_CHANGING;
	cv_broadcast(&tq->tq_dispatch_cv);
	cv_broadcast(&tq->tq_exit_cv);

	while (tq->tq_nthreads != 0)
		cv_wait(&tq->tq_wait_cv, &tq->tq_lock);

	if (tq->tq_nthreads_max != 1)
		kmem_free(tq->tq_threadlist, sizeof (kthread_t *) *
		    tq->tq_nthreads_max);

	tq->tq_minalloc = 0;
	while (tq->tq_nalloc != 0)
		taskq_ent_free(tq, taskq_ent_alloc(tq, TQ_SLEEP));

	mutex_exit(&tq->tq_lock);

	/*
	 * Mark each bucket as closing and wake up all sleeping threads.
	 */
	for (; (b != NULL) && (bid < tq->tq_nbuckets); b++, bid++) {
		taskq_ent_t *tqe;

		mutex_enter(&b->tqbucket_lock);

		b->tqbucket_flags |= TQBUCKET_CLOSE;

		/* Wake up all sleeping threads */
		for (tqe = b->tqbucket_freelist.tqent_next;
		    tqe != &b->tqbucket_freelist; tqe = tqe->tqent_next)
			cv_signal(&tqe->tqent_cv);

		ASSERT(b->tqbucket_nalloc == 0);

		/*
		 * At this point we waited for all pending jobs to complete
		 * (in both the task queue and the bucket) and no new jobs
		 * should arrive. Wait for all threads to die.
		 */
		while (b->tqbucket_nfree > 0)
			cv_wait(&b->tqbucket_cv, &b->tqbucket_lock);
		mutex_exit(&b->tqbucket_lock);
		mutex_destroy(&b->tqbucket_lock);
		cv_destroy(&b->tqbucket_cv);
	}

	if (tq->tq_buckets != NULL) {
		ASSERT(tq->tq_flags & TASKQ_DYNAMIC);
		kmem_free(tq->tq_buckets,
		    sizeof (taskq_bucket_t) * tq->tq_nbuckets);

		/* Cleanup fields before returning tq to the cache */
		tq->tq_buckets = NULL;
		tq->tq_tcreates = 0;
		tq->tq_tdeaths = 0;
	} else {
		ASSERT(!(tq->tq_flags & TASKQ_DYNAMIC));
	}

	tq->tq_threads_ncpus_pct = 0;
	tq->tq_totaltime = 0;
	tq->tq_tasks = 0;
	tq->tq_maxtasks = 0;
	tq->tq_executed = 0;
	kmem_cache_free(taskq_cache, tq);
}

/*
 * Extend a bucket with a new entry on the free list and attach a worker thread
 * to it.
 *
 * Argument: pointer to the bucket.
 *
 * This function may quietly fail. It is only used by taskq_dispatch(), which
 * handles such failures properly.
 */
static void
taskq_bucket_extend(void *arg)
{
	taskq_ent_t *tqe;
	taskq_bucket_t *b = (taskq_bucket_t *)arg;
	taskq_t *tq = b->tqbucket_taskq;
	int nthreads;

	if (! ENOUGH_MEMORY()) {
		TQ_STAT(b, tqs_nomem);
		return;
	}

	mutex_enter(&tq->tq_lock);

	/*
	 * Observe global taskq limits on the number of threads.
	 */
	if (tq->tq_tcreates++ - tq->tq_tdeaths > tq->tq_maxsize) {
		tq->tq_tcreates--;
		mutex_exit(&tq->tq_lock);
		return;
	}
	mutex_exit(&tq->tq_lock);

	tqe = kmem_cache_alloc(taskq_ent_cache, KM_NOSLEEP);

	if (tqe == NULL) {
		mutex_enter(&tq->tq_lock);
		TQ_STAT(b, tqs_nomem);
		tq->tq_tcreates--;
		mutex_exit(&tq->tq_lock);
		return;
	}

	ASSERT(tqe->tqent_thread == NULL);

	tqe->tqent_bucket = b;

	/*
	 * Create a thread in a TS_STOPPED state first. If it is successfully
	 * created, place the entry on the free list and start the thread.
	 */
	tqe->tqent_thread = thread_create(NULL, 0, taskq_d_thread, tqe,
	    0, &p0, TS_STOPPED, tq->tq_pri);

	/*
	 * Once the entry is ready, link it to the bucket free list.
	 */
	mutex_enter(&b->tqbucket_lock);
	tqe->tqent_func = NULL;
	TQ_APPEND(b->tqbucket_freelist, tqe);
	b->tqbucket_nfree++;
	TQ_STAT(b, tqs_tcreates);

#if TASKQ_STATISTIC
	nthreads = b->tqbucket_stat.tqs_tcreates -
	    b->tqbucket_stat.tqs_tdeaths;
	b->tqbucket_stat.tqs_maxthreads = MAX(nthreads,
	    b->tqbucket_stat.tqs_maxthreads);
#endif

	mutex_exit(&b->tqbucket_lock);

	/*
	 * Start the stopped thread.
	 */
	thread_lock(tqe->tqent_thread);
	tqe->tqent_thread->t_taskq = tq;
	tqe->tqent_thread->t_schedflag |= TS_ALLSTART;
	setrun_locked(tqe->tqent_thread);
	thread_unlock(tqe->tqent_thread);
}

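/*
 * kstat update routines for the named kstats installed by
 * taskq_create_common() (module "unix", class "taskq" for regular task
 * queues and "taskq_d" for dynamic ones). Both are read-only; attempts to
 * write return EACCES. The dynamic variant reports the backing-queue
 * counters and additionally sums the per-bucket counters across all
 * buckets.
 */
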
static int
taskq_kstat_update(kstat_t *ksp, int rw)
{
	struct taskq_kstat *tqsp = &taskq_kstat;
	taskq_t *tq = ksp->ks_private;

	if (rw == KSTAT_WRITE)
		return (EACCES);

	tqsp->tq_tasks.value.ui64 = tq->tq_tasks;
	tqsp->tq_executed.value.ui64 = tq->tq_executed;
	tqsp->tq_maxtasks.value.ui64 = tq->tq_maxtasks;
	tqsp->tq_totaltime.value.ui64 = tq->tq_totaltime;
	tqsp->tq_nactive.value.ui64 = tq->tq_active;
	tqsp->tq_nalloc.value.ui64 = tq->tq_nalloc;
	tqsp->tq_pri.value.ui64 = tq->tq_pri;
	tqsp->tq_nthreads.value.ui64 = tq->tq_nthreads;
	return (0);
}

static int
taskq_d_kstat_update(kstat_t *ksp, int rw)
{
	struct taskq_d_kstat *tqsp = &taskq_d_kstat;
	taskq_t *tq = ksp->ks_private;
	taskq_bucket_t *b = tq->tq_buckets;
	int bid = 0;

	if (rw == KSTAT_WRITE)
		return (EACCES);

	ASSERT(tq->tq_flags & TASKQ_DYNAMIC);

	tqsp->tqd_btasks.value.ui64 = tq->tq_tasks;
	tqsp->tqd_bexecuted.value.ui64 = tq->tq_executed;
	tqsp->tqd_bmaxtasks.value.ui64 = tq->tq_maxtasks;
	tqsp->tqd_bnalloc.value.ui64 = tq->tq_nalloc;
	tqsp->tqd_bnactive.value.ui64 = tq->tq_active;
	tqsp->tqd_btotaltime.value.ui64 = tq->tq_totaltime;
	tqsp->tqd_pri.value.ui64 = tq->tq_pri;

	tqsp->tqd_hits.value.ui64 = 0;
	tqsp->tqd_misses.value.ui64 = 0;
	tqsp->tqd_overflows.value.ui64 = 0;
	tqsp->tqd_tcreates.value.ui64 = 0;
	tqsp->tqd_tdeaths.value.ui64 = 0;
	tqsp->tqd_maxthreads.value.ui64 = 0;
	tqsp->tqd_nomem.value.ui64 = 0;
	tqsp->tqd_disptcreates.value.ui64 = 0;
	tqsp->tqd_totaltime.value.ui64 = 0;
	tqsp->tqd_nalloc.value.ui64 = 0;
	tqsp->tqd_nfree.value.ui64 = 0;

	for (; (b != NULL) && (bid < tq->tq_nbuckets); b++, bid++) {
		tqsp->tqd_hits.value.ui64 += b->tqbucket_stat.tqs_hits;
		tqsp->tqd_misses.value.ui64 += b->tqbucket_stat.tqs_misses;
		tqsp->tqd_overflows.value.ui64 += b->tqbucket_stat.tqs_overflow;
		tqsp->tqd_tcreates.value.ui64 += b->tqbucket_stat.tqs_tcreates;
		tqsp->tqd_tdeaths.value.ui64 += b->tqbucket_stat.tqs_tdeaths;
		tqsp->tqd_maxthreads.value.ui64 +=
		    b->tqbucket_stat.tqs_maxthreads;
		tqsp->tqd_nomem.value.ui64 += b->tqbucket_stat.tqs_nomem;
		tqsp->tqd_disptcreates.value.ui64 +=
		    b->tqbucket_stat.tqs_disptcreates;
		tqsp->tqd_totaltime.value.ui64 += b->tqbucket_totaltime;
		tqsp->tqd_nalloc.value.ui64 += b->tqbucket_nalloc;
		tqsp->tqd_nfree.value.ui64 += b->tqbucket_nfree;
	}
	return (0);
}
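
/*
 * The counters above can be examined from userland with kstat(1M); an
 * illustrative invocation would be "kstat -m unix -c taskq" (or class
 * "taskq_d" for dynamic queues). Note that the per-bucket tqs_* statistics
 * summed in taskq_d_kstat_update() are gathered under TASKQ_STATISTIC (see
 * the #if block in taskq_bucket_extend()), so the corresponding tqd_*
 * values may read as zero on builds where that is compiled out.
 */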