1 /* 2 * CDDL HEADER START 3 * 4 * The contents of this file are subject to the terms of the 5 * Common Development and Distribution License (the "License"). 6 * You may not use this file except in compliance with the License. 7 * 8 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE 9 * or http://www.opensolaris.org/os/licensing. 10 * See the License for the specific language governing permissions 11 * and limitations under the License. 12 * 13 * When distributing Covered Code, include this CDDL HEADER in each 14 * file and include the License file at usr/src/OPENSOLARIS.LICENSE. 15 * If applicable, add the following below this CDDL HEADER, with the 16 * fields enclosed by brackets "[]" replaced with your own identifying 17 * information: Portions Copyright [yyyy] [name of copyright owner] 18 * 19 * CDDL HEADER END 20 */ 21 /* 22 * Copyright 2009 Sun Microsystems, Inc. All rights reserved. 23 * Use is subject to license terms. 24 */ 25 26 /* 27 * Kernel task queues: general-purpose asynchronous task scheduling. 28 * 29 * A common problem in kernel programming is the need to schedule tasks 30 * to be performed later, by another thread. There are several reasons 31 * you may want or need to do this: 32 * 33 * (1) The task isn't time-critical, but your current code path is. 34 * 35 * (2) The task may require grabbing locks that you already hold. 36 * 37 * (3) The task may need to block (e.g. to wait for memory), but you 38 * cannot block in your current context. 39 * 40 * (4) Your code path can't complete because of some condition, but you can't 41 * sleep or fail, so you queue the task for later execution when condition 42 * disappears. 43 * 44 * (5) You just want a simple way to launch multiple tasks in parallel. 45 * 46 * Task queues provide such a facility. In its simplest form (used when 47 * performance is not a critical consideration) a task queue consists of a 48 * single list of tasks, together with one or more threads to service the 49 * list. There are some cases when this simple queue is not sufficient: 50 * 51 * (1) The task queues are very hot and there is a need to avoid data and lock 52 * contention over global resources. 53 * 54 * (2) Some tasks may depend on other tasks to complete, so they can't be put in 55 * the same list managed by the same thread. 56 * 57 * (3) Some tasks may block for a long time, and this should not block other 58 * tasks in the queue. 59 * 60 * To provide useful service in such cases we define a "dynamic task queue" 61 * which has an individual thread for each of the tasks. These threads are 62 * dynamically created as they are needed and destroyed when they are not in 63 * use. The API for managing task pools is the same as for managing task queues 64 * with the exception of a taskq creation flag TASKQ_DYNAMIC which tells that 65 * dynamic task pool behavior is desired. 66 * 67 * Dynamic task queues may also place tasks in the normal queue (called "backing 68 * queue") when task pool runs out of resources. Users of task queues may 69 * disallow such queued scheduling by specifying TQ_NOQUEUE in the dispatch 70 * flags. 71 * 72 * The backing task queue is also used for scheduling internal tasks needed for 73 * dynamic task queue maintenance. 74 * 75 * INTERFACES ================================================================== 76 * 77 * taskq_t *taskq_create(name, nthreads, pri_t pri, minalloc, maxall, flags); 78 * 79 * Create a taskq with specified properties. 80 * Possible 'flags': 81 * 82 * TASKQ_DYNAMIC: Create task pool for task management. If this flag is 83 * specified, 'nthreads' specifies the maximum number of threads in 84 * the task queue. Task execution order for dynamic task queues is 85 * not predictable. 86 * 87 * If this flag is not specified (default case) a 88 * single-list task queue is created with 'nthreads' threads 89 * servicing it. Entries in this queue are managed by 90 * taskq_ent_alloc() and taskq_ent_free() which try to keep the 91 * task population between 'minalloc' and 'maxalloc', but the 92 * latter limit is only advisory for TQ_SLEEP dispatches and the 93 * former limit is only advisory for TQ_NOALLOC dispatches. If 94 * TASKQ_PREPOPULATE is set in 'flags', the taskq will be 95 * prepopulated with 'minalloc' task structures. 96 * 97 * Since non-DYNAMIC taskqs are queues, tasks are guaranteed to be 98 * executed in the order they are scheduled if nthreads == 1. 99 * If nthreads > 1, task execution order is not predictable. 100 * 101 * TASKQ_PREPOPULATE: Prepopulate task queue with threads. 102 * Also prepopulate the task queue with 'minalloc' task structures. 103 * 104 * TASKQ_THREADS_CPU_PCT: This flag specifies that 'nthreads' should be 105 * interpreted as a percentage of the # of online CPUs on the 106 * system. The taskq subsystem will automatically adjust the 107 * number of threads in the taskq in response to CPU online 108 * and offline events, to keep the ratio. nthreads must be in 109 * the range [0,100]. 110 * 111 * The calculation used is: 112 * 113 * MAX((ncpus_online * percentage)/100, 1) 114 * 115 * This flag is not supported for DYNAMIC task queues. 116 * This flag is not compatible with TASKQ_CPR_SAFE. 117 * 118 * TASKQ_CPR_SAFE: This flag specifies that users of the task queue will 119 * use their own protocol for handling CPR issues. This flag is not 120 * supported for DYNAMIC task queues. This flag is not compatible 121 * with TASKQ_THREADS_CPU_PCT. 122 * 123 * The 'pri' field specifies the default priority for the threads that 124 * service all scheduled tasks. 125 * 126 * void taskq_destroy(tap): 127 * 128 * Waits for any scheduled tasks to complete, then destroys the taskq. 129 * Caller should guarantee that no new tasks are scheduled in the closing 130 * taskq. 131 * 132 * taskqid_t taskq_dispatch(tq, func, arg, flags): 133 * 134 * Dispatches the task "func(arg)" to taskq. The 'flags' indicates whether 135 * the caller is willing to block for memory. The function returns an 136 * opaque value which is zero iff dispatch fails. If flags is TQ_NOSLEEP 137 * or TQ_NOALLOC and the task can't be dispatched, taskq_dispatch() fails 138 * and returns (taskqid_t)0. 139 * 140 * ASSUMES: func != NULL. 141 * 142 * Possible flags: 143 * TQ_NOSLEEP: Do not wait for resources; may fail. 144 * 145 * TQ_NOALLOC: Do not allocate memory; may fail. May only be used with 146 * non-dynamic task queues. 147 * 148 * TQ_NOQUEUE: Do not enqueue a task if it can't dispatch it due to 149 * lack of available resources and fail. If this flag is not 150 * set, and the task pool is exhausted, the task may be scheduled 151 * in the backing queue. This flag may ONLY be used with dynamic 152 * task queues. 153 * 154 * NOTE: This flag should always be used when a task queue is used 155 * for tasks that may depend on each other for completion. 156 * Enqueueing dependent tasks may create deadlocks. 157 * 158 * TQ_SLEEP: May block waiting for resources. May still fail for 159 * dynamic task queues if TQ_NOQUEUE is also specified, otherwise 160 * always succeed. 161 * 162 * NOTE: Dynamic task queues are much more likely to fail in 163 * taskq_dispatch() (especially if TQ_NOQUEUE was specified), so it 164 * is important to have backup strategies handling such failures. 165 * 166 * void taskq_wait(tq): 167 * 168 * Waits for all previously scheduled tasks to complete. 169 * 170 * NOTE: It does not stop any new task dispatches. 171 * Do NOT call taskq_wait() from a task: it will cause deadlock. 172 * 173 * void taskq_suspend(tq) 174 * 175 * Suspend all task execution. Tasks already scheduled for a dynamic task 176 * queue will still be executed, but all new scheduled tasks will be 177 * suspended until taskq_resume() is called. 178 * 179 * int taskq_suspended(tq) 180 * 181 * Returns 1 if taskq is suspended and 0 otherwise. It is intended to 182 * ASSERT that the task queue is suspended. 183 * 184 * void taskq_resume(tq) 185 * 186 * Resume task queue execution. 187 * 188 * int taskq_member(tq, thread) 189 * 190 * Returns 1 if 'thread' belongs to taskq 'tq' and 0 otherwise. The 191 * intended use is to ASSERT that a given function is called in taskq 192 * context only. 193 * 194 * system_taskq 195 * 196 * Global system-wide dynamic task queue for common uses. It may be used by 197 * any subsystem that needs to schedule tasks and does not need to manage 198 * its own task queues. It is initialized quite early during system boot. 199 * 200 * IMPLEMENTATION ============================================================== 201 * 202 * This is schematic representation of the task queue structures. 203 * 204 * taskq: 205 * +-------------+ 206 * |tq_lock | +---< taskq_ent_free() 207 * +-------------+ | 208 * |... | | tqent: tqent: 209 * +-------------+ | +------------+ +------------+ 210 * | tq_freelist |-->| tqent_next |--> ... ->| tqent_next | 211 * +-------------+ +------------+ +------------+ 212 * |... | | ... | | ... | 213 * +-------------+ +------------+ +------------+ 214 * | tq_task | | 215 * | | +-------------->taskq_ent_alloc() 216 * +--------------------------------------------------------------------------+ 217 * | | | tqent tqent | 218 * | +---------------------+ +--> +------------+ +--> +------------+ | 219 * | | ... | | | func, arg | | | func, arg | | 220 * +>+---------------------+ <---|-+ +------------+ <---|-+ +------------+ | 221 * | tq_taskq.tqent_next | ----+ | | tqent_next | --->+ | | tqent_next |--+ 222 * +---------------------+ | +------------+ ^ | +------------+ 223 * +-| tq_task.tqent_prev | +--| tqent_prev | | +--| tqent_prev | ^ 224 * | +---------------------+ +------------+ | +------------+ | 225 * | |... | | ... | | | ... | | 226 * | +---------------------+ +------------+ | +------------+ | 227 * | ^ | | 228 * | | | | 229 * +--------------------------------------+--------------+ TQ_APPEND() -+ 230 * | | | 231 * |... | taskq_thread()-----+ 232 * +-------------+ 233 * | tq_buckets |--+-------> [ NULL ] (for regular task queues) 234 * +-------------+ | 235 * | DYNAMIC TASK QUEUES: 236 * | 237 * +-> taskq_bucket[nCPU] taskq_bucket_dispatch() 238 * +-------------------+ ^ 239 * +--->| tqbucket_lock | | 240 * | +-------------------+ +--------+ +--------+ 241 * | | tqbucket_freelist |-->| tqent |-->...| tqent | ^ 242 * | +-------------------+<--+--------+<--...+--------+ | 243 * | | ... | | thread | | thread | | 244 * | +-------------------+ +--------+ +--------+ | 245 * | +-------------------+ | 246 * taskq_dispatch()--+--->| tqbucket_lock | TQ_APPEND()------+ 247 * TQ_HASH() | +-------------------+ +--------+ +--------+ 248 * | | tqbucket_freelist |-->| tqent |-->...| tqent | 249 * | +-------------------+<--+--------+<--...+--------+ 250 * | | ... | | thread | | thread | 251 * | +-------------------+ +--------+ +--------+ 252 * +---> ... 253 * 254 * 255 * Task queues use tq_task field to link new entry in the queue. The queue is a 256 * circular doubly-linked list. Entries are put in the end of the list with 257 * TQ_APPEND() and processed from the front of the list by taskq_thread() in 258 * FIFO order. Task queue entries are cached in the free list managed by 259 * taskq_ent_alloc() and taskq_ent_free() functions. 260 * 261 * All threads used by task queues mark t_taskq field of the thread to 262 * point to the task queue. 263 * 264 * Taskq Thread Management ----------------------------------------------------- 265 * 266 * Taskq's non-dynamic threads are managed with several variables and flags: 267 * 268 * * tq_nthreads - The number of threads in taskq_thread() for the 269 * taskq. 270 * 271 * * tq_active - The number of threads not waiting on a CV in 272 * taskq_thread(); includes newly created threads 273 * not yet counted in tq_nthreads. 274 * 275 * * tq_nthreads_target 276 * - The number of threads desired for the taskq. 277 * 278 * * tq_flags & TASKQ_CHANGING 279 * - Indicates that tq_nthreads != tq_nthreads_target. 280 * 281 * * tq_flags & TASKQ_THREAD_CREATED 282 * - Indicates that a thread is being created in the taskq. 283 * 284 * During creation, tq_nthreads and tq_active are set to 0, and 285 * tq_nthreads_target is set to the number of threads desired. The 286 * TASKQ_CHANGING flag is set, and taskq_create_thread() is called to 287 * create the first thread. taskq_create_thread() increments tq_active, 288 * sets TASKQ_THREAD_CREATED, and creates the new thread. 289 * 290 * Each thread starts in taskq_thread(), clears the TASKQ_THREAD_CREATED 291 * flag, and increments tq_nthreads. It stores the new value of 292 * tq_nthreads as its "thread_id", and stores its thread pointer in the 293 * tq_threadlist at the (thread_id - 1). We keep the thread_id space 294 * densely packed by requiring that only the largest thread_id can exit during 295 * normal adjustment. The exception is during the destruction of the 296 * taskq; once tq_nthreads_target is set to zero, no new threads will be created 297 * for the taskq queue, so every thread can exit without any ordering being 298 * necessary. 299 * 300 * Threads will only process work if their thread id is <= tq_nthreads_target. 301 * 302 * When TASKQ_CHANGING is set, threads will check the current thread target 303 * whenever they wake up, and do whatever they can to apply its effects. 304 * 305 * TASKQ_THREAD_CPU_PCT -------------------------------------------------------- 306 * 307 * When a taskq is created with TASKQ_THREAD_CPU_PCT, we store their requested 308 * percentage in tq_threads_ncpus_pct, start them off with the correct thread 309 * target, and add them to the taskq_cpupct_list for later adjustment. 310 * 311 * We register taskq_cpu_setup() to be called whenever a CPU changes state. It 312 * walks the list of TASKQ_THREAD_CPU_PCT taskqs, adjusts their nthread_target 313 * if need be, and wakes up all of the threads to process the change. 314 * 315 * Dynamic Task Queues Implementation ------------------------------------------ 316 * 317 * For a dynamic task queues there is a 1-to-1 mapping between a thread and 318 * taskq_ent_structure. Each entry is serviced by its own thread and each thread 319 * is controlled by a single entry. 320 * 321 * Entries are distributed over a set of buckets. To avoid using modulo 322 * arithmetics the number of buckets is 2^n and is determined as the nearest 323 * power of two roundown of the number of CPUs in the system. Tunable 324 * variable 'taskq_maxbuckets' limits the maximum number of buckets. Each entry 325 * is attached to a bucket for its lifetime and can't migrate to other buckets. 326 * 327 * Entries that have scheduled tasks are not placed in any list. The dispatch 328 * function sets their "func" and "arg" fields and signals the corresponding 329 * thread to execute the task. Once the thread executes the task it clears the 330 * "func" field and places an entry on the bucket cache of free entries pointed 331 * by "tqbucket_freelist" field. ALL entries on the free list should have "func" 332 * field equal to NULL. The free list is a circular doubly-linked list identical 333 * in structure to the tq_task list above, but entries are taken from it in LIFO 334 * order - the last freed entry is the first to be allocated. The 335 * taskq_bucket_dispatch() function gets the most recently used entry from the 336 * free list, sets its "func" and "arg" fields and signals a worker thread. 337 * 338 * After executing each task a per-entry thread taskq_d_thread() places its 339 * entry on the bucket free list and goes to a timed sleep. If it wakes up 340 * without getting new task it removes the entry from the free list and destroys 341 * itself. The thread sleep time is controlled by a tunable variable 342 * `taskq_thread_timeout'. 343 * 344 * There are various statistics kept in the bucket which allows for later 345 * analysis of taskq usage patterns. Also, a global copy of taskq creation and 346 * death statistics is kept in the global taskq data structure. Since thread 347 * creation and death happen rarely, updating such global data does not present 348 * a performance problem. 349 * 350 * NOTE: Threads are not bound to any CPU and there is absolutely no association 351 * between the bucket and actual thread CPU, so buckets are used only to 352 * split resources and reduce resource contention. Having threads attached 353 * to the CPU denoted by a bucket may reduce number of times the job 354 * switches between CPUs. 355 * 356 * Current algorithm creates a thread whenever a bucket has no free 357 * entries. It would be nice to know how many threads are in the running 358 * state and don't create threads if all CPUs are busy with existing 359 * tasks, but it is unclear how such strategy can be implemented. 360 * 361 * Currently buckets are created statically as an array attached to task 362 * queue. On some system with nCPUs < max_ncpus it may waste system 363 * memory. One solution may be allocation of buckets when they are first 364 * touched, but it is not clear how useful it is. 365 * 366 * SUSPEND/RESUME implementation ----------------------------------------------- 367 * 368 * Before executing a task taskq_thread() (executing non-dynamic task 369 * queues) obtains taskq's thread lock as a reader. The taskq_suspend() 370 * function gets the same lock as a writer blocking all non-dynamic task 371 * execution. The taskq_resume() function releases the lock allowing 372 * taskq_thread to continue execution. 373 * 374 * For dynamic task queues, each bucket is marked as TQBUCKET_SUSPEND by 375 * taskq_suspend() function. After that taskq_bucket_dispatch() always 376 * fails, so that taskq_dispatch() will either enqueue tasks for a 377 * suspended backing queue or fail if TQ_NOQUEUE is specified in dispatch 378 * flags. 379 * 380 * NOTE: taskq_suspend() does not immediately block any tasks already 381 * scheduled for dynamic task queues. It only suspends new tasks 382 * scheduled after taskq_suspend() was called. 383 * 384 * taskq_member() function works by comparing a thread t_taskq pointer with 385 * the passed thread pointer. 386 * 387 * LOCKS and LOCK Hierarchy ---------------------------------------------------- 388 * 389 * There are three locks used in task queues: 390 * 391 * 1) The taskq_t's tq_lock, protecting global task queue state. 392 * 393 * 2) Each per-CPU bucket has a lock for bucket management. 394 * 395 * 3) The global taskq_cpupct_lock, which protects the list of 396 * TASKQ_THREADS_CPU_PCT taskqs. 397 * 398 * If both (1) and (2) are needed, tq_lock should be taken *after* the bucket 399 * lock. 400 * 401 * If both (1) and (3) are needed, tq_lock should be taken *after* 402 * taskq_cpupct_lock. 403 * 404 * DEBUG FACILITIES ------------------------------------------------------------ 405 * 406 * For DEBUG kernels it is possible to induce random failures to 407 * taskq_dispatch() function when it is given TQ_NOSLEEP argument. The value of 408 * taskq_dmtbf and taskq_smtbf tunables control the mean time between induced 409 * failures for dynamic and static task queues respectively. 410 * 411 * Setting TASKQ_STATISTIC to 0 will disable per-bucket statistics. 412 * 413 * TUNABLES -------------------------------------------------------------------- 414 * 415 * system_taskq_size - Size of the global system_taskq. 416 * This value is multiplied by nCPUs to determine 417 * actual size. 418 * Default value: 64 419 * 420 * taskq_minimum_nthreads_max 421 * - Minimum size of the thread list for a taskq. 422 * Useful for testing different thread pool 423 * sizes by overwriting tq_nthreads_target. 424 * 425 * taskq_thread_timeout - Maximum idle time for taskq_d_thread() 426 * Default value: 5 minutes 427 * 428 * taskq_maxbuckets - Maximum number of buckets in any task queue 429 * Default value: 128 430 * 431 * taskq_search_depth - Maximum # of buckets searched for a free entry 432 * Default value: 4 433 * 434 * taskq_dmtbf - Mean time between induced dispatch failures 435 * for dynamic task queues. 436 * Default value: UINT_MAX (no induced failures) 437 * 438 * taskq_smtbf - Mean time between induced dispatch failures 439 * for static task queues. 440 * Default value: UINT_MAX (no induced failures) 441 * 442 * CONDITIONAL compilation ----------------------------------------------------- 443 * 444 * TASKQ_STATISTIC - If set will enable bucket statistic (default). 445 * 446 */ 447 448 #include <sys/taskq_impl.h> 449 #include <sys/thread.h> 450 #include <sys/proc.h> 451 #include <sys/kmem.h> 452 #include <sys/vmem.h> 453 #include <sys/callb.h> 454 #include <sys/systm.h> 455 #include <sys/cmn_err.h> 456 #include <sys/debug.h> 457 #include <sys/vmsystm.h> /* For throttlefree */ 458 #include <sys/sysmacros.h> 459 #include <sys/cpuvar.h> 460 #include <sys/sdt.h> 461 #include <sys/note.h> 462 463 static kmem_cache_t *taskq_ent_cache, *taskq_cache; 464 465 /* 466 * Pseudo instance numbers for taskqs without explicitly provided instance. 467 */ 468 static vmem_t *taskq_id_arena; 469 470 /* Global system task queue for common use */ 471 taskq_t *system_taskq; 472 473 /* 474 * Maximum number of entries in global system taskq is 475 * system_taskq_size * max_ncpus 476 */ 477 #define SYSTEM_TASKQ_SIZE 64 478 int system_taskq_size = SYSTEM_TASKQ_SIZE; 479 480 /* 481 * Minimum size for tq_nthreads_max; useful for those who want to play around 482 * with increasing a taskq's tq_nthreads_target. 483 */ 484 int taskq_minimum_nthreads_max = 1; 485 486 /* Maximum percentage allowed for TASKQ_THREADS_CPU_PCT */ 487 #define TASKQ_CPUPCT_MAX_PERCENT 1000 488 int taskq_cpupct_max_percent = TASKQ_CPUPCT_MAX_PERCENT; 489 490 /* 491 * Dynamic task queue threads that don't get any work within 492 * taskq_thread_timeout destroy themselves 493 */ 494 #define TASKQ_THREAD_TIMEOUT (60 * 5) 495 int taskq_thread_timeout = TASKQ_THREAD_TIMEOUT; 496 497 #define TASKQ_MAXBUCKETS 128 498 int taskq_maxbuckets = TASKQ_MAXBUCKETS; 499 500 /* 501 * When a bucket has no available entries another buckets are tried. 502 * taskq_search_depth parameter limits the amount of buckets that we search 503 * before failing. This is mostly useful in systems with many CPUs where we may 504 * spend too much time scanning busy buckets. 505 */ 506 #define TASKQ_SEARCH_DEPTH 4 507 int taskq_search_depth = TASKQ_SEARCH_DEPTH; 508 509 /* 510 * Hashing function: mix various bits of x. May be pretty much anything. 511 */ 512 #define TQ_HASH(x) ((x) ^ ((x) >> 11) ^ ((x) >> 17) ^ ((x) ^ 27)) 513 514 /* 515 * We do not create any new threads when the system is low on memory and start 516 * throttling memory allocations. The following macro tries to estimate such 517 * condition. 518 */ 519 #define ENOUGH_MEMORY() (freemem > throttlefree) 520 521 /* 522 * Static functions. 523 */ 524 static taskq_t *taskq_create_common(const char *, int, int, pri_t, int, 525 int, uint_t); 526 static void taskq_thread(void *); 527 static void taskq_d_thread(taskq_ent_t *); 528 static void taskq_bucket_extend(void *); 529 static int taskq_constructor(void *, void *, int); 530 static void taskq_destructor(void *, void *); 531 static int taskq_ent_constructor(void *, void *, int); 532 static void taskq_ent_destructor(void *, void *); 533 static taskq_ent_t *taskq_ent_alloc(taskq_t *, int); 534 static void taskq_ent_free(taskq_t *, taskq_ent_t *); 535 static taskq_ent_t *taskq_bucket_dispatch(taskq_bucket_t *, task_func_t, 536 void *); 537 538 /* 539 * Task queues kstats. 540 */ 541 struct taskq_kstat { 542 kstat_named_t tq_tasks; 543 kstat_named_t tq_executed; 544 kstat_named_t tq_maxtasks; 545 kstat_named_t tq_totaltime; 546 kstat_named_t tq_nalloc; 547 kstat_named_t tq_nactive; 548 kstat_named_t tq_pri; 549 kstat_named_t tq_nthreads; 550 } taskq_kstat = { 551 { "tasks", KSTAT_DATA_UINT64 }, 552 { "executed", KSTAT_DATA_UINT64 }, 553 { "maxtasks", KSTAT_DATA_UINT64 }, 554 { "totaltime", KSTAT_DATA_UINT64 }, 555 { "nactive", KSTAT_DATA_UINT64 }, 556 { "nalloc", KSTAT_DATA_UINT64 }, 557 { "priority", KSTAT_DATA_UINT64 }, 558 { "threads", KSTAT_DATA_UINT64 }, 559 }; 560 561 struct taskq_d_kstat { 562 kstat_named_t tqd_pri; 563 kstat_named_t tqd_btasks; 564 kstat_named_t tqd_bexecuted; 565 kstat_named_t tqd_bmaxtasks; 566 kstat_named_t tqd_bnalloc; 567 kstat_named_t tqd_bnactive; 568 kstat_named_t tqd_btotaltime; 569 kstat_named_t tqd_hits; 570 kstat_named_t tqd_misses; 571 kstat_named_t tqd_overflows; 572 kstat_named_t tqd_tcreates; 573 kstat_named_t tqd_tdeaths; 574 kstat_named_t tqd_maxthreads; 575 kstat_named_t tqd_nomem; 576 kstat_named_t tqd_disptcreates; 577 kstat_named_t tqd_totaltime; 578 kstat_named_t tqd_nalloc; 579 kstat_named_t tqd_nfree; 580 } taskq_d_kstat = { 581 { "priority", KSTAT_DATA_UINT64 }, 582 { "btasks", KSTAT_DATA_UINT64 }, 583 { "bexecuted", KSTAT_DATA_UINT64 }, 584 { "bmaxtasks", KSTAT_DATA_UINT64 }, 585 { "bnalloc", KSTAT_DATA_UINT64 }, 586 { "bnactive", KSTAT_DATA_UINT64 }, 587 { "btotaltime", KSTAT_DATA_UINT64 }, 588 { "hits", KSTAT_DATA_UINT64 }, 589 { "misses", KSTAT_DATA_UINT64 }, 590 { "overflows", KSTAT_DATA_UINT64 }, 591 { "tcreates", KSTAT_DATA_UINT64 }, 592 { "tdeaths", KSTAT_DATA_UINT64 }, 593 { "maxthreads", KSTAT_DATA_UINT64 }, 594 { "nomem", KSTAT_DATA_UINT64 }, 595 { "disptcreates", KSTAT_DATA_UINT64 }, 596 { "totaltime", KSTAT_DATA_UINT64 }, 597 { "nalloc", KSTAT_DATA_UINT64 }, 598 { "nfree", KSTAT_DATA_UINT64 }, 599 }; 600 601 static kmutex_t taskq_kstat_lock; 602 static kmutex_t taskq_d_kstat_lock; 603 static int taskq_kstat_update(kstat_t *, int); 604 static int taskq_d_kstat_update(kstat_t *, int); 605 606 /* 607 * State for THREAD_CPU_PCT management 608 */ 609 typedef struct taskq_cpupct_ent { 610 list_node_t tp_link; 611 taskq_t *tp_taskq; 612 } taskq_cpupct_ent_t; 613 614 static kmutex_t taskq_cpupct_lock; 615 static list_t taskq_cpupct_list; 616 static int taskq_cpupct_ncpus_online; 617 618 /* 619 * Collect per-bucket statistic when TASKQ_STATISTIC is defined. 620 */ 621 #define TASKQ_STATISTIC 1 622 623 #if TASKQ_STATISTIC 624 #define TQ_STAT(b, x) b->tqbucket_stat.x++ 625 #else 626 #define TQ_STAT(b, x) 627 #endif 628 629 /* 630 * Random fault injection. 631 */ 632 uint_t taskq_random; 633 uint_t taskq_dmtbf = UINT_MAX; /* mean time between injected failures */ 634 uint_t taskq_smtbf = UINT_MAX; /* mean time between injected failures */ 635 636 /* 637 * TQ_NOSLEEP dispatches on dynamic task queues are always allowed to fail. 638 * 639 * TQ_NOSLEEP dispatches on static task queues can't arbitrarily fail because 640 * they could prepopulate the cache and make sure that they do not use more 641 * then minalloc entries. So, fault injection in this case insures that 642 * either TASKQ_PREPOPULATE is not set or there are more entries allocated 643 * than is specified by minalloc. TQ_NOALLOC dispatches are always allowed 644 * to fail, but for simplicity we treat them identically to TQ_NOSLEEP 645 * dispatches. 646 */ 647 #ifdef DEBUG 648 #define TASKQ_D_RANDOM_DISPATCH_FAILURE(tq, flag) \ 649 taskq_random = (taskq_random * 2416 + 374441) % 1771875;\ 650 if ((flag & TQ_NOSLEEP) && \ 651 taskq_random < 1771875 / taskq_dmtbf) { \ 652 return (NULL); \ 653 } 654 655 #define TASKQ_S_RANDOM_DISPATCH_FAILURE(tq, flag) \ 656 taskq_random = (taskq_random * 2416 + 374441) % 1771875;\ 657 if ((flag & (TQ_NOSLEEP | TQ_NOALLOC)) && \ 658 (!(tq->tq_flags & TASKQ_PREPOPULATE) || \ 659 (tq->tq_nalloc > tq->tq_minalloc)) && \ 660 (taskq_random < (1771875 / taskq_smtbf))) { \ 661 mutex_exit(&tq->tq_lock); \ 662 return (NULL); \ 663 } 664 #else 665 #define TASKQ_S_RANDOM_DISPATCH_FAILURE(tq, flag) 666 #define TASKQ_D_RANDOM_DISPATCH_FAILURE(tq, flag) 667 #endif 668 669 #define IS_EMPTY(l) (((l).tqent_prev == (l).tqent_next) && \ 670 ((l).tqent_prev == &(l))) 671 672 /* 673 * Append `tqe' in the end of the doubly-linked list denoted by l. 674 */ 675 #define TQ_APPEND(l, tqe) { \ 676 tqe->tqent_next = &l; \ 677 tqe->tqent_prev = l.tqent_prev; \ 678 tqe->tqent_next->tqent_prev = tqe; \ 679 tqe->tqent_prev->tqent_next = tqe; \ 680 } 681 682 /* 683 * Schedule a task specified by func and arg into the task queue entry tqe. 684 */ 685 #define TQ_ENQUEUE(tq, tqe, func, arg) { \ 686 ASSERT(MUTEX_HELD(&tq->tq_lock)); \ 687 TQ_APPEND(tq->tq_task, tqe); \ 688 tqe->tqent_func = (func); \ 689 tqe->tqent_arg = (arg); \ 690 tq->tq_tasks++; \ 691 if (tq->tq_tasks - tq->tq_executed > tq->tq_maxtasks) \ 692 tq->tq_maxtasks = tq->tq_tasks - tq->tq_executed; \ 693 cv_signal(&tq->tq_dispatch_cv); \ 694 DTRACE_PROBE2(taskq__enqueue, taskq_t *, tq, taskq_ent_t *, tqe); \ 695 } 696 697 /* 698 * Do-nothing task which may be used to prepopulate thread caches. 699 */ 700 /*ARGSUSED*/ 701 void 702 nulltask(void *unused) 703 { 704 } 705 706 707 /*ARGSUSED*/ 708 static int 709 taskq_constructor(void *buf, void *cdrarg, int kmflags) 710 { 711 taskq_t *tq = buf; 712 713 bzero(tq, sizeof (taskq_t)); 714 715 mutex_init(&tq->tq_lock, NULL, MUTEX_DEFAULT, NULL); 716 rw_init(&tq->tq_threadlock, NULL, RW_DEFAULT, NULL); 717 cv_init(&tq->tq_dispatch_cv, NULL, CV_DEFAULT, NULL); 718 cv_init(&tq->tq_exit_cv, NULL, CV_DEFAULT, NULL); 719 cv_init(&tq->tq_wait_cv, NULL, CV_DEFAULT, NULL); 720 721 tq->tq_task.tqent_next = &tq->tq_task; 722 tq->tq_task.tqent_prev = &tq->tq_task; 723 724 return (0); 725 } 726 727 /*ARGSUSED*/ 728 static void 729 taskq_destructor(void *buf, void *cdrarg) 730 { 731 taskq_t *tq = buf; 732 733 ASSERT(tq->tq_nthreads == 0); 734 ASSERT(tq->tq_buckets == NULL); 735 ASSERT(tq->tq_tcreates == 0); 736 ASSERT(tq->tq_tdeaths == 0); 737 738 mutex_destroy(&tq->tq_lock); 739 rw_destroy(&tq->tq_threadlock); 740 cv_destroy(&tq->tq_dispatch_cv); 741 cv_destroy(&tq->tq_exit_cv); 742 cv_destroy(&tq->tq_wait_cv); 743 } 744 745 /*ARGSUSED*/ 746 static int 747 taskq_ent_constructor(void *buf, void *cdrarg, int kmflags) 748 { 749 taskq_ent_t *tqe = buf; 750 751 tqe->tqent_thread = NULL; 752 cv_init(&tqe->tqent_cv, NULL, CV_DEFAULT, NULL); 753 754 return (0); 755 } 756 757 /*ARGSUSED*/ 758 static void 759 taskq_ent_destructor(void *buf, void *cdrarg) 760 { 761 taskq_ent_t *tqe = buf; 762 763 ASSERT(tqe->tqent_thread == NULL); 764 cv_destroy(&tqe->tqent_cv); 765 } 766 767 void 768 taskq_init(void) 769 { 770 taskq_ent_cache = kmem_cache_create("taskq_ent_cache", 771 sizeof (taskq_ent_t), 0, taskq_ent_constructor, 772 taskq_ent_destructor, NULL, NULL, NULL, 0); 773 taskq_cache = kmem_cache_create("taskq_cache", sizeof (taskq_t), 774 0, taskq_constructor, taskq_destructor, NULL, NULL, NULL, 0); 775 taskq_id_arena = vmem_create("taskq_id_arena", 776 (void *)1, INT32_MAX, 1, NULL, NULL, NULL, 0, 777 VM_SLEEP | VMC_IDENTIFIER); 778 779 list_create(&taskq_cpupct_list, sizeof (taskq_cpupct_ent_t), 780 offsetof(taskq_cpupct_ent_t, tp_link)); 781 } 782 783 /*ARGSUSED*/ 784 static int 785 taskq_cpu_setup(cpu_setup_t what, int id, void *arg) 786 { 787 taskq_cpupct_ent_t *tpp; 788 int cpus_online = ncpus_online; 789 790 /* offlines are called *before* the cpu is offlined. */ 791 if (what == CPU_OFF) 792 cpus_online--; 793 if (cpus_online < 1) 794 cpus_online = 1; 795 796 mutex_enter(&taskq_cpupct_lock); 797 if (cpus_online == taskq_cpupct_ncpus_online) { 798 mutex_exit(&taskq_cpupct_lock); 799 return (0); 800 } 801 802 for (tpp = list_head(&taskq_cpupct_list); tpp != NULL; 803 tpp = list_next(&taskq_cpupct_list, tpp)) { 804 taskq_t *tq = tpp->tp_taskq; 805 int newtarget; 806 807 mutex_enter(&tq->tq_lock); 808 newtarget = 809 TASKQ_THREADS_PCT(cpus_online, tq->tq_threads_ncpus_pct); 810 ASSERT3S(newtarget, <=, tq->tq_nthreads_max); 811 if (newtarget != tq->tq_nthreads_target) { 812 /* The taskq must not be exiting */ 813 ASSERT3S(tq->tq_nthreads_target, !=, 0); 814 tq->tq_flags |= TASKQ_CHANGING; 815 tq->tq_nthreads_target = newtarget; 816 cv_broadcast(&tq->tq_dispatch_cv); 817 cv_broadcast(&tq->tq_exit_cv); 818 } 819 mutex_exit(&tq->tq_lock); 820 } 821 822 taskq_cpupct_ncpus_online = cpus_online; 823 mutex_exit(&taskq_cpupct_lock); 824 return (0); 825 } 826 827 void 828 taskq_mp_init(void) 829 { 830 mutex_enter(&cpu_lock); 831 register_cpu_setup_func(taskq_cpu_setup, NULL); 832 (void) taskq_cpu_setup(CPU_ON, 0, NULL); 833 mutex_exit(&cpu_lock); 834 } 835 836 /* 837 * Create global system dynamic task queue. 838 */ 839 void 840 system_taskq_init(void) 841 { 842 system_taskq = taskq_create_common("system_taskq", 0, 843 system_taskq_size * max_ncpus, minclsyspri, 4, 512, 844 TASKQ_DYNAMIC | TASKQ_PREPOPULATE); 845 } 846 847 /* 848 * taskq_ent_alloc() 849 * 850 * Allocates a new taskq_ent_t structure either from the free list or from the 851 * cache. Returns NULL if it can't be allocated. 852 * 853 * Assumes: tq->tq_lock is held. 854 */ 855 static taskq_ent_t * 856 taskq_ent_alloc(taskq_t *tq, int flags) 857 { 858 int kmflags = (flags & TQ_NOSLEEP) ? KM_NOSLEEP : KM_SLEEP; 859 860 taskq_ent_t *tqe; 861 862 ASSERT(MUTEX_HELD(&tq->tq_lock)); 863 864 /* 865 * TQ_NOALLOC allocations are allowed to use the freelist, even if 866 * we are below tq_minalloc. 867 */ 868 if ((tqe = tq->tq_freelist) != NULL && 869 ((flags & TQ_NOALLOC) || tq->tq_nalloc >= tq->tq_minalloc)) { 870 tq->tq_freelist = tqe->tqent_next; 871 } else { 872 if (flags & TQ_NOALLOC) 873 return (NULL); 874 875 mutex_exit(&tq->tq_lock); 876 if (tq->tq_nalloc >= tq->tq_maxalloc) { 877 if (kmflags & KM_NOSLEEP) { 878 mutex_enter(&tq->tq_lock); 879 return (NULL); 880 } 881 /* 882 * We don't want to exceed tq_maxalloc, but we can't 883 * wait for other tasks to complete (and thus free up 884 * task structures) without risking deadlock with 885 * the caller. So, we just delay for one second 886 * to throttle the allocation rate. 887 */ 888 delay(hz); 889 } 890 tqe = kmem_cache_alloc(taskq_ent_cache, kmflags); 891 mutex_enter(&tq->tq_lock); 892 if (tqe != NULL) 893 tq->tq_nalloc++; 894 } 895 return (tqe); 896 } 897 898 /* 899 * taskq_ent_free() 900 * 901 * Free taskq_ent_t structure by either putting it on the free list or freeing 902 * it to the cache. 903 * 904 * Assumes: tq->tq_lock is held. 905 */ 906 static void 907 taskq_ent_free(taskq_t *tq, taskq_ent_t *tqe) 908 { 909 ASSERT(MUTEX_HELD(&tq->tq_lock)); 910 911 if (tq->tq_nalloc <= tq->tq_minalloc) { 912 tqe->tqent_next = tq->tq_freelist; 913 tq->tq_freelist = tqe; 914 } else { 915 tq->tq_nalloc--; 916 mutex_exit(&tq->tq_lock); 917 kmem_cache_free(taskq_ent_cache, tqe); 918 mutex_enter(&tq->tq_lock); 919 } 920 } 921 922 /* 923 * Dispatch a task "func(arg)" to a free entry of bucket b. 924 * 925 * Assumes: no bucket locks is held. 926 * 927 * Returns: a pointer to an entry if dispatch was successful. 928 * NULL if there are no free entries or if the bucket is suspended. 929 */ 930 static taskq_ent_t * 931 taskq_bucket_dispatch(taskq_bucket_t *b, task_func_t func, void *arg) 932 { 933 taskq_ent_t *tqe; 934 935 ASSERT(MUTEX_NOT_HELD(&b->tqbucket_lock)); 936 ASSERT(func != NULL); 937 938 mutex_enter(&b->tqbucket_lock); 939 940 ASSERT(b->tqbucket_nfree != 0 || IS_EMPTY(b->tqbucket_freelist)); 941 ASSERT(b->tqbucket_nfree == 0 || !IS_EMPTY(b->tqbucket_freelist)); 942 943 /* 944 * Get en entry from the freelist if there is one. 945 * Schedule task into the entry. 946 */ 947 if ((b->tqbucket_nfree != 0) && 948 !(b->tqbucket_flags & TQBUCKET_SUSPEND)) { 949 tqe = b->tqbucket_freelist.tqent_prev; 950 951 ASSERT(tqe != &b->tqbucket_freelist); 952 ASSERT(tqe->tqent_thread != NULL); 953 954 tqe->tqent_prev->tqent_next = tqe->tqent_next; 955 tqe->tqent_next->tqent_prev = tqe->tqent_prev; 956 b->tqbucket_nalloc++; 957 b->tqbucket_nfree--; 958 tqe->tqent_func = func; 959 tqe->tqent_arg = arg; 960 TQ_STAT(b, tqs_hits); 961 cv_signal(&tqe->tqent_cv); 962 DTRACE_PROBE2(taskq__d__enqueue, taskq_bucket_t *, b, 963 taskq_ent_t *, tqe); 964 } else { 965 tqe = NULL; 966 TQ_STAT(b, tqs_misses); 967 } 968 mutex_exit(&b->tqbucket_lock); 969 return (tqe); 970 } 971 972 /* 973 * Dispatch a task. 974 * 975 * Assumes: func != NULL 976 * 977 * Returns: NULL if dispatch failed. 978 * non-NULL if task dispatched successfully. 979 * Actual return value is the pointer to taskq entry that was used to 980 * dispatch a task. This is useful for debugging. 981 */ 982 /* ARGSUSED */ 983 taskqid_t 984 taskq_dispatch(taskq_t *tq, task_func_t func, void *arg, uint_t flags) 985 { 986 taskq_bucket_t *bucket = NULL; /* Which bucket needs extension */ 987 taskq_ent_t *tqe = NULL; 988 taskq_ent_t *tqe1; 989 uint_t bsize; 990 991 ASSERT(tq != NULL); 992 ASSERT(func != NULL); 993 994 if (!(tq->tq_flags & TASKQ_DYNAMIC)) { 995 /* 996 * TQ_NOQUEUE flag can't be used with non-dynamic task queues. 997 */ 998 ASSERT(! (flags & TQ_NOQUEUE)); 999 /* 1000 * Enqueue the task to the underlying queue. 1001 */ 1002 mutex_enter(&tq->tq_lock); 1003 1004 TASKQ_S_RANDOM_DISPATCH_FAILURE(tq, flags); 1005 1006 if ((tqe = taskq_ent_alloc(tq, flags)) == NULL) { 1007 mutex_exit(&tq->tq_lock); 1008 return (NULL); 1009 } 1010 TQ_ENQUEUE(tq, tqe, func, arg); 1011 mutex_exit(&tq->tq_lock); 1012 return ((taskqid_t)tqe); 1013 } 1014 1015 /* 1016 * Dynamic taskq dispatching. 1017 */ 1018 ASSERT(!(flags & TQ_NOALLOC)); 1019 TASKQ_D_RANDOM_DISPATCH_FAILURE(tq, flags); 1020 1021 bsize = tq->tq_nbuckets; 1022 1023 if (bsize == 1) { 1024 /* 1025 * In a single-CPU case there is only one bucket, so get 1026 * entry directly from there. 1027 */ 1028 if ((tqe = taskq_bucket_dispatch(tq->tq_buckets, func, arg)) 1029 != NULL) 1030 return ((taskqid_t)tqe); /* Fastpath */ 1031 bucket = tq->tq_buckets; 1032 } else { 1033 int loopcount; 1034 taskq_bucket_t *b; 1035 uintptr_t h = ((uintptr_t)CPU + (uintptr_t)arg) >> 3; 1036 1037 h = TQ_HASH(h); 1038 1039 /* 1040 * The 'bucket' points to the original bucket that we hit. If we 1041 * can't allocate from it, we search other buckets, but only 1042 * extend this one. 1043 */ 1044 b = &tq->tq_buckets[h & (bsize - 1)]; 1045 ASSERT(b->tqbucket_taskq == tq); /* Sanity check */ 1046 1047 /* 1048 * Do a quick check before grabbing the lock. If the bucket does 1049 * not have free entries now, chances are very small that it 1050 * will after we take the lock, so we just skip it. 1051 */ 1052 if (b->tqbucket_nfree != 0) { 1053 if ((tqe = taskq_bucket_dispatch(b, func, arg)) != NULL) 1054 return ((taskqid_t)tqe); /* Fastpath */ 1055 } else { 1056 TQ_STAT(b, tqs_misses); 1057 } 1058 1059 bucket = b; 1060 loopcount = MIN(taskq_search_depth, bsize); 1061 /* 1062 * If bucket dispatch failed, search loopcount number of buckets 1063 * before we give up and fail. 1064 */ 1065 do { 1066 b = &tq->tq_buckets[++h & (bsize - 1)]; 1067 ASSERT(b->tqbucket_taskq == tq); /* Sanity check */ 1068 loopcount--; 1069 1070 if (b->tqbucket_nfree != 0) { 1071 tqe = taskq_bucket_dispatch(b, func, arg); 1072 } else { 1073 TQ_STAT(b, tqs_misses); 1074 } 1075 } while ((tqe == NULL) && (loopcount > 0)); 1076 } 1077 1078 /* 1079 * At this point we either scheduled a task and (tqe != NULL) or failed 1080 * (tqe == NULL). Try to recover from fails. 1081 */ 1082 1083 /* 1084 * For KM_SLEEP dispatches, try to extend the bucket and retry dispatch. 1085 */ 1086 if ((tqe == NULL) && !(flags & TQ_NOSLEEP)) { 1087 /* 1088 * taskq_bucket_extend() may fail to do anything, but this is 1089 * fine - we deal with it later. If the bucket was successfully 1090 * extended, there is a good chance that taskq_bucket_dispatch() 1091 * will get this new entry, unless someone is racing with us and 1092 * stealing the new entry from under our nose. 1093 * taskq_bucket_extend() may sleep. 1094 */ 1095 taskq_bucket_extend(bucket); 1096 TQ_STAT(bucket, tqs_disptcreates); 1097 if ((tqe = taskq_bucket_dispatch(bucket, func, arg)) != NULL) 1098 return ((taskqid_t)tqe); 1099 } 1100 1101 ASSERT(bucket != NULL); 1102 /* 1103 * Since there are not enough free entries in the bucket, extend it 1104 * in the background using backing queue. 1105 */ 1106 mutex_enter(&tq->tq_lock); 1107 if ((tqe1 = taskq_ent_alloc(tq, TQ_NOSLEEP)) != NULL) { 1108 TQ_ENQUEUE(tq, tqe1, taskq_bucket_extend, 1109 bucket); 1110 } else { 1111 TQ_STAT(bucket, tqs_nomem); 1112 } 1113 1114 /* 1115 * Dispatch failed and we can't find an entry to schedule a task. 1116 * Revert to the backing queue unless TQ_NOQUEUE was asked. 1117 */ 1118 if ((tqe == NULL) && !(flags & TQ_NOQUEUE)) { 1119 if ((tqe = taskq_ent_alloc(tq, flags)) != NULL) { 1120 TQ_ENQUEUE(tq, tqe, func, arg); 1121 } else { 1122 TQ_STAT(bucket, tqs_nomem); 1123 } 1124 } 1125 mutex_exit(&tq->tq_lock); 1126 1127 return ((taskqid_t)tqe); 1128 } 1129 1130 /* 1131 * Wait for all pending tasks to complete. 1132 * Calling taskq_wait from a task will cause deadlock. 1133 */ 1134 void 1135 taskq_wait(taskq_t *tq) 1136 { 1137 ASSERT(tq != curthread->t_taskq); 1138 1139 mutex_enter(&tq->tq_lock); 1140 while (tq->tq_task.tqent_next != &tq->tq_task || tq->tq_active != 0) 1141 cv_wait(&tq->tq_wait_cv, &tq->tq_lock); 1142 mutex_exit(&tq->tq_lock); 1143 1144 if (tq->tq_flags & TASKQ_DYNAMIC) { 1145 taskq_bucket_t *b = tq->tq_buckets; 1146 int bid = 0; 1147 for (; (b != NULL) && (bid < tq->tq_nbuckets); b++, bid++) { 1148 mutex_enter(&b->tqbucket_lock); 1149 while (b->tqbucket_nalloc > 0) 1150 cv_wait(&b->tqbucket_cv, &b->tqbucket_lock); 1151 mutex_exit(&b->tqbucket_lock); 1152 } 1153 } 1154 } 1155 1156 /* 1157 * Suspend execution of tasks. 1158 * 1159 * Tasks in the queue part will be suspended immediately upon return from this 1160 * function. Pending tasks in the dynamic part will continue to execute, but all 1161 * new tasks will be suspended. 1162 */ 1163 void 1164 taskq_suspend(taskq_t *tq) 1165 { 1166 rw_enter(&tq->tq_threadlock, RW_WRITER); 1167 1168 if (tq->tq_flags & TASKQ_DYNAMIC) { 1169 taskq_bucket_t *b = tq->tq_buckets; 1170 int bid = 0; 1171 for (; (b != NULL) && (bid < tq->tq_nbuckets); b++, bid++) { 1172 mutex_enter(&b->tqbucket_lock); 1173 b->tqbucket_flags |= TQBUCKET_SUSPEND; 1174 mutex_exit(&b->tqbucket_lock); 1175 } 1176 } 1177 /* 1178 * Mark task queue as being suspended. Needed for taskq_suspended(). 1179 */ 1180 mutex_enter(&tq->tq_lock); 1181 ASSERT(!(tq->tq_flags & TASKQ_SUSPENDED)); 1182 tq->tq_flags |= TASKQ_SUSPENDED; 1183 mutex_exit(&tq->tq_lock); 1184 } 1185 1186 /* 1187 * returns: 1 if tq is suspended, 0 otherwise. 1188 */ 1189 int 1190 taskq_suspended(taskq_t *tq) 1191 { 1192 return ((tq->tq_flags & TASKQ_SUSPENDED) != 0); 1193 } 1194 1195 /* 1196 * Resume taskq execution. 1197 */ 1198 void 1199 taskq_resume(taskq_t *tq) 1200 { 1201 ASSERT(RW_WRITE_HELD(&tq->tq_threadlock)); 1202 1203 if (tq->tq_flags & TASKQ_DYNAMIC) { 1204 taskq_bucket_t *b = tq->tq_buckets; 1205 int bid = 0; 1206 for (; (b != NULL) && (bid < tq->tq_nbuckets); b++, bid++) { 1207 mutex_enter(&b->tqbucket_lock); 1208 b->tqbucket_flags &= ~TQBUCKET_SUSPEND; 1209 mutex_exit(&b->tqbucket_lock); 1210 } 1211 } 1212 mutex_enter(&tq->tq_lock); 1213 ASSERT(tq->tq_flags & TASKQ_SUSPENDED); 1214 tq->tq_flags &= ~TASKQ_SUSPENDED; 1215 mutex_exit(&tq->tq_lock); 1216 1217 rw_exit(&tq->tq_threadlock); 1218 } 1219 1220 int 1221 taskq_member(taskq_t *tq, kthread_t *thread) 1222 { 1223 return (thread->t_taskq == tq); 1224 } 1225 1226 static void 1227 taskq_thread_create(taskq_t *tq) 1228 { 1229 kthread_t *t; 1230 1231 ASSERT(MUTEX_HELD(&tq->tq_lock)); 1232 ASSERT(!(tq->tq_flags & TASKQ_THREAD_CREATED)); 1233 1234 tq->tq_flags |= TASKQ_THREAD_CREATED; 1235 tq->tq_active++; 1236 t = thread_create(NULL, 0, taskq_thread, tq, 0, &p0, TS_RUN, 1237 tq->tq_pri); 1238 t->t_taskq = tq; 1239 } 1240 1241 static void 1242 taskq_thread_wait(taskq_t *tq, kcondvar_t *cv, callb_cpr_t *cprinfo) 1243 { 1244 if (tq->tq_flags & TASKQ_CPR_SAFE) { 1245 cv_wait(cv, &tq->tq_lock); 1246 } else { 1247 CALLB_CPR_SAFE_BEGIN(cprinfo); 1248 cv_wait(cv, &tq->tq_lock); 1249 CALLB_CPR_SAFE_END(cprinfo, &tq->tq_lock); 1250 } 1251 } 1252 1253 /* 1254 * Worker thread for processing task queue. 1255 */ 1256 static void 1257 taskq_thread(void *arg) 1258 { 1259 int thread_id; 1260 1261 taskq_t *tq = arg; 1262 taskq_ent_t *tqe; 1263 callb_cpr_t cprinfo; 1264 hrtime_t start, end; 1265 1266 if (tq->tq_flags & TASKQ_CPR_SAFE) { 1267 CALLB_CPR_INIT_SAFE(curthread, tq->tq_name); 1268 } else { 1269 CALLB_CPR_INIT(&cprinfo, &tq->tq_lock, callb_generic_cpr, 1270 tq->tq_name); 1271 } 1272 mutex_enter(&tq->tq_lock); 1273 thread_id = ++tq->tq_nthreads; 1274 ASSERT(tq->tq_flags & TASKQ_THREAD_CREATED); 1275 tq->tq_flags &= ~TASKQ_THREAD_CREATED; 1276 1277 VERIFY3S(thread_id, <=, tq->tq_nthreads_max); 1278 1279 if (tq->tq_nthreads_max == 1) 1280 tq->tq_thread = curthread; 1281 else 1282 tq->tq_threadlist[thread_id - 1] = curthread; 1283 1284 for (;;) { 1285 if (tq->tq_flags & TASKQ_CHANGING) { 1286 /* we're done; clear the CHANGING flag */ 1287 if (tq->tq_nthreads == tq->tq_nthreads_target) { 1288 tq->tq_flags &= ~TASKQ_CHANGING; 1289 continue; 1290 } 1291 /* We're low on threads and none have been created */ 1292 if (tq->tq_nthreads < tq->tq_nthreads_target && 1293 !(tq->tq_flags & TASKQ_THREAD_CREATED)) { 1294 taskq_thread_create(tq); 1295 continue; 1296 } 1297 /* We're no longer needed */ 1298 if (thread_id > tq->tq_nthreads_target) { 1299 /* 1300 * To preserve the one-to-one mapping between 1301 * thread_id and thread, we must exit from 1302 * highest thread ID to least. 1303 * 1304 * However, if everyone is exiting, the order 1305 * doesn't matter, so just exit immediately. 1306 * (this is safe, since you must wait for 1307 * nthreads to reach 0 after setting 1308 * tq_nthreads_target to 0) 1309 */ 1310 if (thread_id == tq->tq_nthreads || 1311 tq->tq_nthreads_target == 0) 1312 break; 1313 1314 /* Wait for higher thread_ids to exit */ 1315 taskq_thread_wait(tq, &tq->tq_exit_cv, 1316 &cprinfo); 1317 continue; 1318 } 1319 } 1320 if ((tqe = tq->tq_task.tqent_next) == &tq->tq_task) { 1321 if (--tq->tq_active == 0) 1322 cv_broadcast(&tq->tq_wait_cv); 1323 taskq_thread_wait(tq, &tq->tq_dispatch_cv, &cprinfo); 1324 tq->tq_active++; 1325 continue; 1326 } 1327 tqe->tqent_prev->tqent_next = tqe->tqent_next; 1328 tqe->tqent_next->tqent_prev = tqe->tqent_prev; 1329 mutex_exit(&tq->tq_lock); 1330 1331 rw_enter(&tq->tq_threadlock, RW_READER); 1332 start = gethrtime(); 1333 DTRACE_PROBE2(taskq__exec__start, taskq_t *, tq, 1334 taskq_ent_t *, tqe); 1335 tqe->tqent_func(tqe->tqent_arg); 1336 DTRACE_PROBE2(taskq__exec__end, taskq_t *, tq, 1337 taskq_ent_t *, tqe); 1338 end = gethrtime(); 1339 rw_exit(&tq->tq_threadlock); 1340 1341 mutex_enter(&tq->tq_lock); 1342 tq->tq_totaltime += end - start; 1343 tq->tq_executed++; 1344 1345 taskq_ent_free(tq, tqe); 1346 } 1347 1348 if (tq->tq_nthreads_max == 1) 1349 tq->tq_thread = NULL; 1350 else 1351 tq->tq_threadlist[thread_id - 1] = NULL; 1352 1353 ASSERT(tq->tq_nthreads > 0); 1354 if (--tq->tq_nthreads == 0) 1355 cv_broadcast(&tq->tq_wait_cv); 1356 1357 /* let the other threads which need to exit know we're done */ 1358 cv_broadcast(&tq->tq_exit_cv); 1359 1360 /* We're exiting, and therefore no longer active */ 1361 tq->tq_active--; 1362 1363 ASSERT(!(tq->tq_flags & TASKQ_CPR_SAFE)); 1364 CALLB_CPR_EXIT(&cprinfo); 1365 thread_exit(); 1366 } 1367 1368 /* 1369 * Worker per-entry thread for dynamic dispatches. 1370 */ 1371 static void 1372 taskq_d_thread(taskq_ent_t *tqe) 1373 { 1374 taskq_bucket_t *bucket = tqe->tqent_bucket; 1375 taskq_t *tq = bucket->tqbucket_taskq; 1376 kmutex_t *lock = &bucket->tqbucket_lock; 1377 kcondvar_t *cv = &tqe->tqent_cv; 1378 callb_cpr_t cprinfo; 1379 clock_t w; 1380 1381 CALLB_CPR_INIT(&cprinfo, lock, callb_generic_cpr, tq->tq_name); 1382 1383 mutex_enter(lock); 1384 1385 for (;;) { 1386 /* 1387 * If a task is scheduled (func != NULL), execute it, otherwise 1388 * sleep, waiting for a job. 1389 */ 1390 if (tqe->tqent_func != NULL) { 1391 hrtime_t start; 1392 hrtime_t end; 1393 1394 ASSERT(bucket->tqbucket_nalloc > 0); 1395 1396 /* 1397 * It is possible to free the entry right away before 1398 * actually executing the task so that subsequent 1399 * dispatches may immediately reuse it. But this, 1400 * effectively, creates a two-length queue in the entry 1401 * and may lead to a deadlock if the execution of the 1402 * current task depends on the execution of the next 1403 * scheduled task. So, we keep the entry busy until the 1404 * task is processed. 1405 */ 1406 1407 mutex_exit(lock); 1408 start = gethrtime(); 1409 DTRACE_PROBE3(taskq__d__exec__start, taskq_t *, tq, 1410 taskq_bucket_t *, bucket, taskq_ent_t *, tqe); 1411 tqe->tqent_func(tqe->tqent_arg); 1412 DTRACE_PROBE3(taskq__d__exec__end, taskq_t *, tq, 1413 taskq_bucket_t *, bucket, taskq_ent_t *, tqe); 1414 end = gethrtime(); 1415 mutex_enter(lock); 1416 bucket->tqbucket_totaltime += end - start; 1417 1418 /* 1419 * Return the entry to the bucket free list. 1420 */ 1421 tqe->tqent_func = NULL; 1422 TQ_APPEND(bucket->tqbucket_freelist, tqe); 1423 bucket->tqbucket_nalloc--; 1424 bucket->tqbucket_nfree++; 1425 ASSERT(!IS_EMPTY(bucket->tqbucket_freelist)); 1426 /* 1427 * taskq_wait() waits for nalloc to drop to zero on 1428 * tqbucket_cv. 1429 */ 1430 cv_signal(&bucket->tqbucket_cv); 1431 } 1432 1433 /* 1434 * At this point the entry must be in the bucket free list - 1435 * either because it was there initially or because it just 1436 * finished executing a task and put itself on the free list. 1437 */ 1438 ASSERT(bucket->tqbucket_nfree > 0); 1439 /* 1440 * Go to sleep unless we are closing. 1441 * If a thread is sleeping too long, it dies. 1442 */ 1443 if (! (bucket->tqbucket_flags & TQBUCKET_CLOSE)) { 1444 CALLB_CPR_SAFE_BEGIN(&cprinfo); 1445 w = cv_timedwait(cv, lock, lbolt + 1446 taskq_thread_timeout * hz); 1447 CALLB_CPR_SAFE_END(&cprinfo, lock); 1448 } 1449 1450 /* 1451 * At this point we may be in two different states: 1452 * 1453 * (1) tqent_func is set which means that a new task is 1454 * dispatched and we need to execute it. 1455 * 1456 * (2) Thread is sleeping for too long or we are closing. In 1457 * both cases destroy the thread and the entry. 1458 */ 1459 1460 /* If func is NULL we should be on the freelist. */ 1461 ASSERT((tqe->tqent_func != NULL) || 1462 (bucket->tqbucket_nfree > 0)); 1463 /* If func is non-NULL we should be allocated */ 1464 ASSERT((tqe->tqent_func == NULL) || 1465 (bucket->tqbucket_nalloc > 0)); 1466 1467 /* Check freelist consistency */ 1468 ASSERT((bucket->tqbucket_nfree > 0) || 1469 IS_EMPTY(bucket->tqbucket_freelist)); 1470 ASSERT((bucket->tqbucket_nfree == 0) || 1471 !IS_EMPTY(bucket->tqbucket_freelist)); 1472 1473 if ((tqe->tqent_func == NULL) && 1474 ((w == -1) || (bucket->tqbucket_flags & TQBUCKET_CLOSE))) { 1475 /* 1476 * This thread is sleeping for too long or we are 1477 * closing - time to die. 1478 * Thread creation/destruction happens rarely, 1479 * so grabbing the lock is not a big performance issue. 1480 * The bucket lock is dropped by CALLB_CPR_EXIT(). 1481 */ 1482 1483 /* Remove the entry from the free list. */ 1484 tqe->tqent_prev->tqent_next = tqe->tqent_next; 1485 tqe->tqent_next->tqent_prev = tqe->tqent_prev; 1486 ASSERT(bucket->tqbucket_nfree > 0); 1487 bucket->tqbucket_nfree--; 1488 1489 TQ_STAT(bucket, tqs_tdeaths); 1490 cv_signal(&bucket->tqbucket_cv); 1491 tqe->tqent_thread = NULL; 1492 mutex_enter(&tq->tq_lock); 1493 tq->tq_tdeaths++; 1494 mutex_exit(&tq->tq_lock); 1495 CALLB_CPR_EXIT(&cprinfo); 1496 kmem_cache_free(taskq_ent_cache, tqe); 1497 thread_exit(); 1498 } 1499 } 1500 } 1501 1502 1503 /* 1504 * Taskq creation. May sleep for memory. 1505 * Always use automatically generated instances to avoid kstat name space 1506 * collisions. 1507 */ 1508 1509 taskq_t * 1510 taskq_create(const char *name, int nthreads, pri_t pri, int minalloc, 1511 int maxalloc, uint_t flags) 1512 { 1513 return taskq_create_common(name, 0, nthreads, pri, minalloc, 1514 maxalloc, flags | TASKQ_NOINSTANCE); 1515 } 1516 1517 /* 1518 * Create an instance of task queue. It is legal to create task queues with the 1519 * same name and different instances. 1520 * 1521 * taskq_create_instance is used by ddi_taskq_create() where it gets the 1522 * instance from ddi_get_instance(). In some cases the instance is not 1523 * initialized and is set to -1. This case is handled as if no instance was 1524 * passed at all. 1525 */ 1526 taskq_t * 1527 taskq_create_instance(const char *name, int instance, int nthreads, pri_t pri, 1528 int minalloc, int maxalloc, uint_t flags) 1529 { 1530 ASSERT((instance >= 0) || (instance == -1)); 1531 1532 if (instance < 0) { 1533 flags |= TASKQ_NOINSTANCE; 1534 } 1535 1536 return (taskq_create_common(name, instance, nthreads, 1537 pri, minalloc, maxalloc, flags)); 1538 } 1539 1540 static taskq_t * 1541 taskq_create_common(const char *name, int instance, int nthreads, pri_t pri, 1542 int minalloc, int maxalloc, uint_t flags) 1543 { 1544 taskq_t *tq = kmem_cache_alloc(taskq_cache, KM_SLEEP); 1545 uint_t ncpus = ((boot_max_ncpus == -1) ? max_ncpus : boot_max_ncpus); 1546 uint_t bsize; /* # of buckets - always power of 2 */ 1547 int max_nthreads; 1548 1549 /* 1550 * TASKQ_DYNAMIC is incompatible with TASKQ_CPR_SAFE and 1551 * TASKQ_THREADS_CPU_PCT. 1552 */ 1553 ASSERT(!(flags & TASKQ_DYNAMIC) || 1554 !(flags & (TASKQ_CPR_SAFE | TASKQ_THREADS_CPU_PCT))); 1555 /* TASKQ_CPR_SAFE is incompatible with TASKQ_THREADS_CPU_PCT */ 1556 1557 ASSERT(!(flags & TASKQ_CPR_SAFE) || !(flags & TASKQ_THREADS_CPU_PCT)); 1558 1559 bsize = 1 << (highbit(ncpus) - 1); 1560 ASSERT(bsize >= 1); 1561 bsize = MIN(bsize, taskq_maxbuckets); 1562 1563 if (flags & TASKQ_DYNAMIC) { 1564 ASSERT3S(nthreads, >=, 1); 1565 tq->tq_maxsize = nthreads; 1566 1567 /* For dynamic task queues use just one backup thread */ 1568 nthreads = max_nthreads = 1; 1569 1570 } else if (!(flags & TASKQ_THREADS_CPU_PCT)) { 1571 ASSERT3S(nthreads, >=, 1); 1572 max_nthreads = nthreads; 1573 } else { 1574 uint_t pct; 1575 ASSERT3S(nthreads, >=, 0); 1576 pct = nthreads; 1577 1578 if (pct > taskq_cpupct_max_percent) 1579 pct = taskq_cpupct_max_percent; 1580 1581 tq->tq_threads_ncpus_pct = pct; 1582 nthreads = TASKQ_THREADS_PCT(ncpus_online, pct); 1583 max_nthreads = TASKQ_THREADS_PCT(max_ncpus, pct); 1584 } 1585 1586 if (max_nthreads < taskq_minimum_nthreads_max) 1587 max_nthreads = taskq_minimum_nthreads_max; 1588 1589 /* 1590 * Make sure the name is 0-terminated, and conforms to the rules for 1591 * C indentifiers 1592 */ 1593 (void) strncpy(tq->tq_name, name, TASKQ_NAMELEN + 1); 1594 strident_canon(tq->tq_name, TASKQ_NAMELEN + 1); 1595 1596 tq->tq_flags = flags | TASKQ_CHANGING; 1597 tq->tq_active = 0; 1598 tq->tq_instance = instance; 1599 tq->tq_nthreads_target = nthreads; 1600 tq->tq_nthreads_max = max_nthreads; 1601 tq->tq_minalloc = minalloc; 1602 tq->tq_maxalloc = maxalloc; 1603 tq->tq_nbuckets = bsize; 1604 tq->tq_pri = pri; 1605 1606 if (max_nthreads > 1) 1607 tq->tq_threadlist = kmem_alloc( 1608 sizeof (kthread_t *) * max_nthreads, KM_SLEEP); 1609 1610 /* Add the taskq to the list of CPU_PCT taskqs */ 1611 if (flags & TASKQ_THREADS_CPU_PCT) { 1612 taskq_cpupct_ent_t *tpp = kmem_zalloc(sizeof (*tpp), KM_SLEEP); 1613 1614 list_link_init(&tpp->tp_link); 1615 tpp->tp_taskq = tq; 1616 1617 mutex_enter(&taskq_cpupct_lock); 1618 list_insert_tail(&taskq_cpupct_list, tpp); 1619 /* reset our target, to avoid race conditions */ 1620 tq->tq_nthreads_target = TASKQ_THREADS_PCT(ncpus_online, 1621 tq->tq_threads_ncpus_pct); 1622 mutex_exit(&taskq_cpupct_lock); 1623 } 1624 1625 mutex_enter(&tq->tq_lock); 1626 if (flags & TASKQ_PREPOPULATE) { 1627 while (minalloc-- > 0) 1628 taskq_ent_free(tq, taskq_ent_alloc(tq, TQ_SLEEP)); 1629 } 1630 1631 /* create the first thread; if more are needed, it'll create them */ 1632 taskq_thread_create(tq); 1633 mutex_exit(&tq->tq_lock); 1634 1635 if (flags & TASKQ_DYNAMIC) { 1636 taskq_bucket_t *bucket = kmem_zalloc(sizeof (taskq_bucket_t) * 1637 bsize, KM_SLEEP); 1638 int b_id; 1639 1640 tq->tq_buckets = bucket; 1641 1642 /* Initialize each bucket */ 1643 for (b_id = 0; b_id < bsize; b_id++, bucket++) { 1644 mutex_init(&bucket->tqbucket_lock, NULL, MUTEX_DEFAULT, 1645 NULL); 1646 cv_init(&bucket->tqbucket_cv, NULL, CV_DEFAULT, NULL); 1647 bucket->tqbucket_taskq = tq; 1648 bucket->tqbucket_freelist.tqent_next = 1649 bucket->tqbucket_freelist.tqent_prev = 1650 &bucket->tqbucket_freelist; 1651 if (flags & TASKQ_PREPOPULATE) 1652 taskq_bucket_extend(bucket); 1653 } 1654 } 1655 1656 /* 1657 * Install kstats. 1658 * We have two cases: 1659 * 1) Instance is provided to taskq_create_instance(). In this case it 1660 * should be >= 0 and we use it. 1661 * 1662 * 2) Instance is not provided and is automatically generated 1663 */ 1664 if (flags & TASKQ_NOINSTANCE) { 1665 instance = tq->tq_instance = 1666 (int)(uintptr_t)vmem_alloc(taskq_id_arena, 1, VM_SLEEP); 1667 } 1668 1669 if (flags & TASKQ_DYNAMIC) { 1670 if ((tq->tq_kstat = kstat_create("unix", instance, 1671 tq->tq_name, "taskq_d", KSTAT_TYPE_NAMED, 1672 sizeof (taskq_d_kstat) / sizeof (kstat_named_t), 1673 KSTAT_FLAG_VIRTUAL)) != NULL) { 1674 tq->tq_kstat->ks_lock = &taskq_d_kstat_lock; 1675 tq->tq_kstat->ks_data = &taskq_d_kstat; 1676 tq->tq_kstat->ks_update = taskq_d_kstat_update; 1677 tq->tq_kstat->ks_private = tq; 1678 kstat_install(tq->tq_kstat); 1679 } 1680 } else { 1681 if ((tq->tq_kstat = kstat_create("unix", instance, tq->tq_name, 1682 "taskq", KSTAT_TYPE_NAMED, 1683 sizeof (taskq_kstat) / sizeof (kstat_named_t), 1684 KSTAT_FLAG_VIRTUAL)) != NULL) { 1685 tq->tq_kstat->ks_lock = &taskq_kstat_lock; 1686 tq->tq_kstat->ks_data = &taskq_kstat; 1687 tq->tq_kstat->ks_update = taskq_kstat_update; 1688 tq->tq_kstat->ks_private = tq; 1689 kstat_install(tq->tq_kstat); 1690 } 1691 } 1692 1693 return (tq); 1694 } 1695 1696 /* 1697 * taskq_destroy(). 1698 * 1699 * Assumes: by the time taskq_destroy is called no one will use this task queue 1700 * in any way and no one will try to dispatch entries in it. 1701 */ 1702 void 1703 taskq_destroy(taskq_t *tq) 1704 { 1705 taskq_bucket_t *b = tq->tq_buckets; 1706 int bid = 0; 1707 1708 ASSERT(! (tq->tq_flags & TASKQ_CPR_SAFE)); 1709 1710 /* 1711 * Destroy kstats. 1712 */ 1713 if (tq->tq_kstat != NULL) { 1714 kstat_delete(tq->tq_kstat); 1715 tq->tq_kstat = NULL; 1716 } 1717 1718 /* 1719 * Destroy instance if needed. 1720 */ 1721 if (tq->tq_flags & TASKQ_NOINSTANCE) { 1722 vmem_free(taskq_id_arena, (void *)(uintptr_t)(tq->tq_instance), 1723 1); 1724 tq->tq_instance = 0; 1725 } 1726 1727 /* 1728 * Unregister from the cpupct list. 1729 */ 1730 if (tq->tq_flags & TASKQ_THREADS_CPU_PCT) { 1731 taskq_cpupct_ent_t *tpp; 1732 1733 mutex_enter(&taskq_cpupct_lock); 1734 for (tpp = list_head(&taskq_cpupct_list); tpp != NULL; 1735 tpp = list_next(&taskq_cpupct_list, tpp)) { 1736 if (tpp->tp_taskq == tq) 1737 break; 1738 } 1739 ASSERT3P(tpp, !=, NULL); 1740 1741 list_remove(&taskq_cpupct_list, tpp); 1742 mutex_exit(&taskq_cpupct_lock); 1743 1744 kmem_free(tpp, sizeof (*tpp)); 1745 } 1746 1747 /* 1748 * Wait for any pending entries to complete. 1749 */ 1750 taskq_wait(tq); 1751 1752 mutex_enter(&tq->tq_lock); 1753 ASSERT((tq->tq_task.tqent_next == &tq->tq_task) && 1754 (tq->tq_active == 0)); 1755 1756 /* notify all the threads that they need to exit */ 1757 tq->tq_nthreads_target = 0; 1758 1759 tq->tq_flags |= TASKQ_CHANGING; 1760 cv_broadcast(&tq->tq_dispatch_cv); 1761 cv_broadcast(&tq->tq_exit_cv); 1762 1763 while (tq->tq_nthreads != 0) 1764 cv_wait(&tq->tq_wait_cv, &tq->tq_lock); 1765 1766 if (tq->tq_nthreads_max != 1) 1767 kmem_free(tq->tq_threadlist, sizeof (kthread_t *) * 1768 tq->tq_nthreads_max); 1769 1770 tq->tq_minalloc = 0; 1771 while (tq->tq_nalloc != 0) 1772 taskq_ent_free(tq, taskq_ent_alloc(tq, TQ_SLEEP)); 1773 1774 mutex_exit(&tq->tq_lock); 1775 1776 /* 1777 * Mark each bucket as closing and wakeup all sleeping threads. 1778 */ 1779 for (; (b != NULL) && (bid < tq->tq_nbuckets); b++, bid++) { 1780 taskq_ent_t *tqe; 1781 1782 mutex_enter(&b->tqbucket_lock); 1783 1784 b->tqbucket_flags |= TQBUCKET_CLOSE; 1785 /* Wakeup all sleeping threads */ 1786 1787 for (tqe = b->tqbucket_freelist.tqent_next; 1788 tqe != &b->tqbucket_freelist; tqe = tqe->tqent_next) 1789 cv_signal(&tqe->tqent_cv); 1790 1791 ASSERT(b->tqbucket_nalloc == 0); 1792 1793 /* 1794 * At this point we waited for all pending jobs to complete (in 1795 * both the task queue and the bucket and no new jobs should 1796 * arrive. Wait for all threads to die. 1797 */ 1798 while (b->tqbucket_nfree > 0) 1799 cv_wait(&b->tqbucket_cv, &b->tqbucket_lock); 1800 mutex_exit(&b->tqbucket_lock); 1801 mutex_destroy(&b->tqbucket_lock); 1802 cv_destroy(&b->tqbucket_cv); 1803 } 1804 1805 if (tq->tq_buckets != NULL) { 1806 ASSERT(tq->tq_flags & TASKQ_DYNAMIC); 1807 kmem_free(tq->tq_buckets, 1808 sizeof (taskq_bucket_t) * tq->tq_nbuckets); 1809 1810 /* Cleanup fields before returning tq to the cache */ 1811 tq->tq_buckets = NULL; 1812 tq->tq_tcreates = 0; 1813 tq->tq_tdeaths = 0; 1814 } else { 1815 ASSERT(!(tq->tq_flags & TASKQ_DYNAMIC)); 1816 } 1817 1818 tq->tq_threads_ncpus_pct = 0; 1819 tq->tq_totaltime = 0; 1820 tq->tq_tasks = 0; 1821 tq->tq_maxtasks = 0; 1822 tq->tq_executed = 0; 1823 kmem_cache_free(taskq_cache, tq); 1824 } 1825 1826 /* 1827 * Extend a bucket with a new entry on the free list and attach a worker thread 1828 * to it. 1829 * 1830 * Argument: pointer to the bucket. 1831 * 1832 * This function may quietly fail. It is only used by taskq_dispatch() which 1833 * handles such failures properly. 1834 */ 1835 static void 1836 taskq_bucket_extend(void *arg) 1837 { 1838 taskq_ent_t *tqe; 1839 taskq_bucket_t *b = (taskq_bucket_t *)arg; 1840 taskq_t *tq = b->tqbucket_taskq; 1841 int nthreads; 1842 1843 if (! ENOUGH_MEMORY()) { 1844 TQ_STAT(b, tqs_nomem); 1845 return; 1846 } 1847 1848 mutex_enter(&tq->tq_lock); 1849 1850 /* 1851 * Observe global taskq limits on the number of threads. 1852 */ 1853 if (tq->tq_tcreates++ - tq->tq_tdeaths > tq->tq_maxsize) { 1854 tq->tq_tcreates--; 1855 mutex_exit(&tq->tq_lock); 1856 return; 1857 } 1858 mutex_exit(&tq->tq_lock); 1859 1860 tqe = kmem_cache_alloc(taskq_ent_cache, KM_NOSLEEP); 1861 1862 if (tqe == NULL) { 1863 mutex_enter(&tq->tq_lock); 1864 TQ_STAT(b, tqs_nomem); 1865 tq->tq_tcreates--; 1866 mutex_exit(&tq->tq_lock); 1867 return; 1868 } 1869 1870 ASSERT(tqe->tqent_thread == NULL); 1871 1872 tqe->tqent_bucket = b; 1873 1874 /* 1875 * Create a thread in a TS_STOPPED state first. If it is successfully 1876 * created, place the entry on the free list and start the thread. 1877 */ 1878 tqe->tqent_thread = thread_create(NULL, 0, taskq_d_thread, tqe, 1879 0, &p0, TS_STOPPED, tq->tq_pri); 1880 1881 /* 1882 * Once the entry is ready, link it to the the bucket free list. 1883 */ 1884 mutex_enter(&b->tqbucket_lock); 1885 tqe->tqent_func = NULL; 1886 TQ_APPEND(b->tqbucket_freelist, tqe); 1887 b->tqbucket_nfree++; 1888 TQ_STAT(b, tqs_tcreates); 1889 1890 #if TASKQ_STATISTIC 1891 nthreads = b->tqbucket_stat.tqs_tcreates - 1892 b->tqbucket_stat.tqs_tdeaths; 1893 b->tqbucket_stat.tqs_maxthreads = MAX(nthreads, 1894 b->tqbucket_stat.tqs_maxthreads); 1895 #endif 1896 1897 mutex_exit(&b->tqbucket_lock); 1898 /* 1899 * Start the stopped thread. 1900 */ 1901 thread_lock(tqe->tqent_thread); 1902 tqe->tqent_thread->t_taskq = tq; 1903 tqe->tqent_thread->t_schedflag |= TS_ALLSTART; 1904 setrun_locked(tqe->tqent_thread); 1905 thread_unlock(tqe->tqent_thread); 1906 } 1907 1908 static int 1909 taskq_kstat_update(kstat_t *ksp, int rw) 1910 { 1911 struct taskq_kstat *tqsp = &taskq_kstat; 1912 taskq_t *tq = ksp->ks_private; 1913 1914 if (rw == KSTAT_WRITE) 1915 return (EACCES); 1916 1917 tqsp->tq_tasks.value.ui64 = tq->tq_tasks; 1918 tqsp->tq_executed.value.ui64 = tq->tq_executed; 1919 tqsp->tq_maxtasks.value.ui64 = tq->tq_maxtasks; 1920 tqsp->tq_totaltime.value.ui64 = tq->tq_totaltime; 1921 tqsp->tq_nactive.value.ui64 = tq->tq_active; 1922 tqsp->tq_nalloc.value.ui64 = tq->tq_nalloc; 1923 tqsp->tq_pri.value.ui64 = tq->tq_pri; 1924 tqsp->tq_nthreads.value.ui64 = tq->tq_nthreads; 1925 return (0); 1926 } 1927 1928 static int 1929 taskq_d_kstat_update(kstat_t *ksp, int rw) 1930 { 1931 struct taskq_d_kstat *tqsp = &taskq_d_kstat; 1932 taskq_t *tq = ksp->ks_private; 1933 taskq_bucket_t *b = tq->tq_buckets; 1934 int bid = 0; 1935 1936 if (rw == KSTAT_WRITE) 1937 return (EACCES); 1938 1939 ASSERT(tq->tq_flags & TASKQ_DYNAMIC); 1940 1941 tqsp->tqd_btasks.value.ui64 = tq->tq_tasks; 1942 tqsp->tqd_bexecuted.value.ui64 = tq->tq_executed; 1943 tqsp->tqd_bmaxtasks.value.ui64 = tq->tq_maxtasks; 1944 tqsp->tqd_bnalloc.value.ui64 = tq->tq_nalloc; 1945 tqsp->tqd_bnactive.value.ui64 = tq->tq_active; 1946 tqsp->tqd_btotaltime.value.ui64 = tq->tq_totaltime; 1947 tqsp->tqd_pri.value.ui64 = tq->tq_pri; 1948 1949 tqsp->tqd_hits.value.ui64 = 0; 1950 tqsp->tqd_misses.value.ui64 = 0; 1951 tqsp->tqd_overflows.value.ui64 = 0; 1952 tqsp->tqd_tcreates.value.ui64 = 0; 1953 tqsp->tqd_tdeaths.value.ui64 = 0; 1954 tqsp->tqd_maxthreads.value.ui64 = 0; 1955 tqsp->tqd_nomem.value.ui64 = 0; 1956 tqsp->tqd_disptcreates.value.ui64 = 0; 1957 tqsp->tqd_totaltime.value.ui64 = 0; 1958 tqsp->tqd_nalloc.value.ui64 = 0; 1959 tqsp->tqd_nfree.value.ui64 = 0; 1960 1961 for (; (b != NULL) && (bid < tq->tq_nbuckets); b++, bid++) { 1962 tqsp->tqd_hits.value.ui64 += b->tqbucket_stat.tqs_hits; 1963 tqsp->tqd_misses.value.ui64 += b->tqbucket_stat.tqs_misses; 1964 tqsp->tqd_overflows.value.ui64 += b->tqbucket_stat.tqs_overflow; 1965 tqsp->tqd_tcreates.value.ui64 += b->tqbucket_stat.tqs_tcreates; 1966 tqsp->tqd_tdeaths.value.ui64 += b->tqbucket_stat.tqs_tdeaths; 1967 tqsp->tqd_maxthreads.value.ui64 += 1968 b->tqbucket_stat.tqs_maxthreads; 1969 tqsp->tqd_nomem.value.ui64 += b->tqbucket_stat.tqs_nomem; 1970 tqsp->tqd_disptcreates.value.ui64 += 1971 b->tqbucket_stat.tqs_disptcreates; 1972 tqsp->tqd_totaltime.value.ui64 += b->tqbucket_totaltime; 1973 tqsp->tqd_nalloc.value.ui64 += b->tqbucket_nalloc; 1974 tqsp->tqd_nfree.value.ui64 += b->tqbucket_nfree; 1975 } 1976 return (0); 1977 } 1978