/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License (the "License").
 * You may not use this file except in compliance with the License.
 *
 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
 * or http://www.opensolaris.org/os/licensing.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
 * If applicable, add the following below this CDDL HEADER, with the
 * fields enclosed by brackets "[]" replaced with your own identifying
 * information: Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 */
/*
 * Copyright 2010 Sun Microsystems, Inc.  All rights reserved.
 * Use is subject to license terms.
 */

/*
 * Copyright 2015 Nexenta Systems, Inc.  All rights reserved.
 * Copyright (c) 2017 by Delphix. All rights reserved.
 * Copyright 2018, Joyent, Inc.
 * Copyright 2023-2024 RackTop Systems, Inc.
 */

/*
 * Kernel task queues: general-purpose asynchronous task scheduling.
 *
 * A common problem in kernel programming is the need to schedule tasks
 * to be performed later, by another thread. There are several reasons
 * you may want or need to do this:
 *
 * (1) The task isn't time-critical, but your current code path is.
 *
 * (2) The task may require grabbing locks that you already hold.
 *
 * (3) The task may need to block (e.g. to wait for memory), but you
 *     cannot block in your current context.
 *
 * (4) Your code path can't complete because of some condition, but you can't
 *     sleep or fail, so you queue the task for later execution when the
 *     condition disappears.
 *
 * (5) You just want a simple way to launch multiple tasks in parallel.
 *
 * Task queues provide such a facility. In its simplest form (used when
 * performance is not a critical consideration) a task queue consists of a
 * single list of tasks, together with one or more threads to service the
 * list. There are some cases when this simple queue is not sufficient:
 *
 * (1) The task queues are very hot and there is a need to avoid data and lock
 *     contention over global resources.
 *
 * (2) Some tasks may depend on other tasks to complete, so they can't be put
 *     in the same list managed by the same thread.
 *
 * (3) Some tasks may block for a long time, and this should not block other
 *     tasks in the queue.
 *
 * To provide useful service in such cases we define a "dynamic task queue"
 * which has an individual thread for each of the tasks. These threads are
 * dynamically created as they are needed and destroyed when they are not in
 * use. The API for managing task pools is the same as for managing task
 * queues, with the exception of the taskq creation flag TASKQ_DYNAMIC, which
 * indicates that dynamic task pool behavior is desired.
 *
 * Dynamic task queues may also place tasks in a "backlog" when a taskq is
 * resource constrained. Users of task queues may prevent tasks from being
 * enqueued in the backlog by passing TQ_NOQUEUE in the dispatch call.
 *
 * See "Dynamic Task Queues" below for more details.
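 *
 * As a brief orientation (a minimal usage sketch only; the callback name,
 * its argument, and the sizing values here are hypothetical and not part
 * of this file), a simple non-dynamic consumer looks like:
 *
 *	static void
 *	my_callback(void *arg)
 *	{
 *		... deferred work on 'arg' ...
 *	}
 *
 *	taskq_t *tq = taskq_create("my_tq", 1, minclsyspri, 4, 8, 0);
 *
 *	if (taskq_dispatch(tq, my_callback, arg, TQ_NOSLEEP) ==
 *	    TASKQID_INVALID) {
 *		... handle failure, or dispatch again with TQ_SLEEP ...
 *	}
 *
 *	taskq_wait(tq);		... wait for dispatched tasks to finish ...
 *	taskq_destroy(tq);
 *
 * The individual interfaces and their flags are described in detail below.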
79 * 80 * INTERFACES ================================================================== 81 * 82 * taskq_t *taskq_create(name, nthreads, pri, minalloc, maxalloc, flags); 83 * 84 * Create a taskq with specified properties. 85 * Possible 'flags': 86 * 87 * TASKQ_DYNAMIC: Create task pool for task management. If this flag is 88 * specified, 'nthreads' specifies the maximum number of threads in 89 * the task queue. Task execution order for dynamic task queues is 90 * not predictable. 91 * 92 * If this flag is not specified (default case) a 93 * single-list task queue is created with 'nthreads' threads 94 * servicing it. Entries in this queue are managed by 95 * taskq_ent_alloc() and taskq_ent_free() which try to keep the 96 * task population between 'minalloc' and 'maxalloc', but the 97 * latter limit is only advisory for TQ_SLEEP dispatches and the 98 * former limit is only advisory for TQ_NOALLOC dispatches. If 99 * TASKQ_PREPOPULATE is set in 'flags', the taskq will be 100 * prepopulated with 'minalloc' task structures. 101 * 102 * Since non-DYNAMIC taskqs are queues, tasks are guaranteed to be 103 * executed in the order they are scheduled if nthreads == 1. 104 * If nthreads > 1, task execution order is not predictable. 105 * 106 * TASKQ_PREPOPULATE: Prepopulate task queue with threads. 107 * Also prepopulate the task queue with 'minalloc' task structures. 108 * 109 * TASKQ_THREADS_CPU_PCT: This flag specifies that 'nthreads' should be 110 * interpreted as a percentage of the # of online CPUs on the 111 * system. The taskq subsystem will automatically adjust the 112 * number of threads in the taskq in response to CPU online 113 * and offline events, to keep the ratio. nthreads must be in 114 * the range [0,100]. 115 * 116 * The calculation used is: 117 * 118 * MAX((ncpus_online * percentage)/100, 1) 119 * 120 * This flag is not supported for DYNAMIC task queues. 121 * This flag is not compatible with TASKQ_CPR_SAFE. 122 * 123 * TASKQ_CPR_SAFE: This flag specifies that users of the task queue will 124 * use their own protocol for handling CPR issues. This flag is not 125 * supported for DYNAMIC task queues. This flag is not compatible 126 * with TASKQ_THREADS_CPU_PCT. 127 * 128 * The 'pri' field specifies the default priority for the threads that 129 * service all scheduled tasks. 130 * 131 * taskq_t *taskq_create_instance(name, instance, nthreads, pri, minalloc, 132 * maxalloc, flags); 133 * 134 * Like taskq_create(), but takes an instance number (or -1 to indicate 135 * no instance). 136 * 137 * taskq_t *taskq_create_proc(name, nthreads, pri, minalloc, maxalloc, proc, 138 * flags); 139 * 140 * Like taskq_create(), but creates the taskq threads in the specified 141 * system process. If proc != &p0, this must be called from a thread 142 * in that process. 143 * 144 * taskq_t *taskq_create_sysdc(name, nthreads, minalloc, maxalloc, proc, 145 * dc, flags); 146 * 147 * Like taskq_create_proc(), but the taskq threads will use the 148 * System Duty Cycle (SDC) scheduling class with a duty cycle of dc. 149 * 150 * void taskq_destroy(tap): 151 * 152 * Waits for any scheduled tasks to complete, then destroys the taskq. 153 * Caller should guarantee that no new tasks are scheduled in the closing 154 * taskq. 155 * 156 * taskqid_t taskq_dispatch(tq, func, arg, flags): 157 * 158 * Dispatches the task "func(arg)" to taskq. The 'flags' indicates whether 159 * the caller is willing to block for memory. The function returns an 160 * opaque value which is zero iff dispatch fails. 
If flags is TQ_NOSLEEP 161 * or TQ_NOALLOC and the task can't be dispatched, taskq_dispatch() fails 162 * and returns TASKQID_INVALID. 163 * 164 * ASSUMES: func != NULL. 165 * 166 * Possible flags: 167 * TQ_NOSLEEP: Do not wait for resources; may fail. 168 * 169 * TQ_NOALLOC: Do not allocate memory; may fail. May only be used with 170 * non-dynamic task queues. 171 * 172 * TQ_NOQUEUE: Do not enqueue a task if it can't dispatch it due to 173 * lack of available resources and fail. If this flag is not 174 * set, and the task pool is exhausted, the task may be scheduled 175 * in the backing queue. This flag may ONLY be used with dynamic 176 * task queues. 177 * 178 * NOTE: This flag should always be used when a task queue is used 179 * for tasks that may depend on each other for completion. 180 * Enqueueing dependent tasks may create deadlocks. 181 * 182 * TQ_SLEEP: May block waiting for resources. May still fail for 183 * dynamic task queues if TQ_NOQUEUE is also specified, otherwise 184 * always succeed. 185 * 186 * TQ_FRONT: Puts the new task at the front of the queue. Be careful. 187 * 188 * NOTE: Dynamic task queues are much more likely to fail in 189 * taskq_dispatch() (especially if TQ_NOQUEUE was specified), so it 190 * is important to have backup strategies handling such failures. 191 * 192 * void taskq_dispatch_ent(tq, func, arg, flags, tqent) 193 * 194 * This is a light-weight form of taskq_dispatch(), that uses a 195 * preallocated taskq_ent_t structure for scheduling. As a 196 * result, it does not perform allocations and cannot ever fail. 197 * Note especially that it cannot be used with TASKQ_DYNAMIC 198 * taskqs. The memory for the tqent must not be modified or used 199 * until the function (func) is called. (However, func itself 200 * may safely modify or free this memory, once it is called.) 201 * Note that the taskq framework will NOT free this memory. 202 * 203 * boolean_t taskq_empty(tq) 204 * 205 * Queries if there are tasks pending on the queue. 206 * 207 * void taskq_wait(tq): 208 * 209 * Waits for all previously scheduled tasks to complete. 210 * 211 * NOTE: It does not stop any new task dispatches. 212 * Do NOT call taskq_wait() from a task: it will cause deadlock. 213 * 214 * void taskq_suspend(tq) 215 * 216 * Suspend all task execution. Tasks already scheduled for a dynamic task 217 * queue will still be executed, but all new scheduled tasks will be 218 * suspended until taskq_resume() is called. 219 * 220 * int taskq_suspended(tq) 221 * 222 * Returns 1 if taskq is suspended and 0 otherwise. It is intended to 223 * ASSERT that the task queue is suspended. 224 * 225 * void taskq_resume(tq) 226 * 227 * Resume task queue execution. 228 * 229 * int taskq_member(tq, thread) 230 * 231 * Returns 1 if 'thread' belongs to taskq 'tq' and 0 otherwise. The 232 * intended use is to ASSERT that a given function is called in taskq 233 * context only. 234 * 235 * system_taskq 236 * 237 * Global system-wide dynamic task queue for common uses. It may be used by 238 * any subsystem that needs to schedule tasks and does not need to manage 239 * its own task queues. It is initialized quite early during system boot. 240 * 241 * IMPLEMENTATION ============================================================== 242 * 243 * This is schematic representation of the task queue structures. 244 * 245 * taskq: 246 * +-------------+ 247 * | tq_lock | +---< taskq_ent_free() 248 * +-------------+ | 249 * |... 
| | tqent: tqent: 250 * +-------------+ | +------------+ +------------+ 251 * | tq_freelist |-->| tqent_next |--> ... ->| tqent_next | 252 * +-------------+ +------------+ +------------+ 253 * |... | | ... | | ... | 254 * +-------------+ +------------+ +------------+ 255 * | tq_task | | 256 * | | +-------------->taskq_ent_alloc() 257 * +--------------------------------------------------------------------------+ 258 * | | | tqent tqent | 259 * | +---------------------+ +--> +------------+ +--> +------------+ | 260 * | | ... | | | func, arg | | | func, arg | | 261 * +>+---------------------+ <---|-+ +------------+ <---|-+ +------------+ | 262 * | tq_task.tqent_next | ----+ | | tqent_next | --->+ | | tqent_next |--+ 263 * +---------------------+ | +------------+ ^ | +------------+ 264 * +-| tq_task.tqent_prev | +--| tqent_prev | | +--| tqent_prev | ^ 265 * | +---------------------+ +------------+ | +------------+ | 266 * | |... | | ... | | | ... | | 267 * | +---------------------+ +------------+ | +------------+ | 268 * | ^ | | 269 * | | | | 270 * +--------------------------------------+--------------+ TQ_APPEND() -+ 271 * | | | 272 * |... | taskq_thread()-----+ 273 * +-------------+ 274 * | tq_buckets |--+-------> [ NULL ] (for regular task queues) 275 * +-------------+ | 276 * | DYNAMIC TASK QUEUES: 277 * | 278 * +-> taskq_idlebucket taskq_idlebucket_dispatch() 279 * +-> taskq_bucket[nCPU] taskq_bucket_dispatch() 280 * +-------------------+ ^ 281 * +--->| tqbucket_lock | | 282 * | +-------------------+ +--------+ +--------+ 283 * | | tqbucket_freelist |-->| tqent |-->...| tqent | 284 * | +-------------------+<--+--------+<--...+--------+ 285 * | | | | thread | | thread | 286 * | | ... | +--------+ +--------+ 287 * | | | 288 * | +-------------------+ +--------+ +--------+ 289 * | | tqbucket_backlog |-->| tqent |-->...| tqent | 290 * | +-------------------+<--+--------+<--...+--------+ 291 * | | ... | (no thread) 292 * | +-------------------+ 293 * | 294 * | +-------------------+ 295 * taskq_dispatch()--+--->| tqbucket_lock | 296 * TQ_HASH() | +-------------------+ +--------+ +--------+ 297 * | | tqbucket_freelist |-->| tqent |-->...| tqent | 298 * | +-------------------+<--+--------+<--...+--------+ 299 * | | | | thread | | thread | 300 * | | ... | +--------+ +--------+ 301 * | | | 302 * | +-------------------+ +--------+ +--------+ 303 * | | tqbucket_backlog |-->| tqent |-->...| tqent | 304 * | +-------------------+<--+--------+<--...+--------+ 305 * | | ... | (no thread) 306 * | +-------------------+ 307 * | 308 * +---> ... 309 * 310 * 311 * Task queues use tq_task field to link new entry in the queue. The queue is a 312 * circular doubly-linked list. Entries are put in the end of the list with 313 * TQ_APPEND() and processed from the front of the list by taskq_thread() in 314 * FIFO order. Task queue entries are cached in the free list managed by 315 * taskq_ent_alloc() and taskq_ent_free() functions. 316 * 317 * All threads used by task queues mark t_taskq field of the thread to 318 * point to the task queue. 319 * 320 * Taskq Thread Management ----------------------------------------------------- 321 * 322 * Taskq's non-dynamic threads are managed with several variables and flags: 323 * 324 * * tq_nthreads - The number of threads in taskq_thread() for the 325 * taskq. 326 * 327 * * tq_active - The number of threads not waiting on a CV in 328 * taskq_thread(); includes newly created threads 329 * not yet counted in tq_nthreads. 
 *
 *  * tq_nthreads_target
 *		- The number of threads desired for the taskq.
 *
 *  * tq_flags & TASKQ_CHANGING
 *		- Indicates that tq_nthreads != tq_nthreads_target.
 *
 *  * tq_flags & TASKQ_THREAD_CREATED
 *		- Indicates that a thread is being created in the taskq.
 *
 * During creation, tq_nthreads and tq_active are set to 0, and
 * tq_nthreads_target is set to the number of threads desired.  The
 * TASKQ_CHANGING flag is set, and taskq_thread_create() is called to
 * create the first thread. taskq_thread_create() increments tq_active,
 * sets TASKQ_THREAD_CREATED, and creates the new thread.
 *
 * Each thread starts in taskq_thread(), clears the TASKQ_THREAD_CREATED
 * flag, and increments tq_nthreads.  It stores the new value of
 * tq_nthreads as its "thread_id", and stores its thread pointer in the
 * tq_threadlist at (thread_id - 1).  We keep the thread_id space
 * densely packed by requiring that only the largest thread_id can exit
 * during normal adjustment.  The exception is during the destruction of
 * the taskq; once tq_nthreads_target is set to zero, no new threads will
 * be created for the taskq, so every thread can exit without any ordering
 * being necessary.
 *
 * Threads will only process work if their thread id is <= tq_nthreads_target.
 *
 * When TASKQ_CHANGING is set, threads will check the current thread target
 * whenever they wake up, and do whatever they can to apply its effects.
 *
 * TASKQ_THREADS_CPU_PCT -------------------------------------------------------
 *
 * When a taskq is created with TASKQ_THREADS_CPU_PCT, we store the requested
 * percentage in tq_threads_ncpus_pct, start it off with the correct thread
 * target, and add it to the taskq_cpupct_list for later adjustment.
 *
 * We register taskq_cpu_setup() to be called whenever a CPU changes state.  It
 * walks the list of TASKQ_THREADS_CPU_PCT taskqs, adjusts their nthreads_target
 * if need be, and wakes up all of the threads to process the change.
 *
 * Dynamic Task Queues Implementation ------------------------------------------
 *
 * For a dynamic task queue, the set of worker threads expands and contracts
 * based on the workload presented via taskq_dispatch calls. The work of a
 * dynamic task queue is distributed across an array of "buckets" to reduce
 * lock contention, with distribution determined via a hash (See TQ_HASH).
 * The array of buckets is sized based on the number of CPUs in the system.
 * The tunable 'taskq_maxbuckets' limits the maximum number of buckets.
 * One additional bucket is used as the "idle bucket" (details below).
 *
 * Each bucket also has a "backlog" list, used to store pending jobs,
 * which are taskq_ent_t objects with no associated thread. The total of
 * backlogged work is distributed through the array of buckets, so that as
 * threads become available in each bucket, they begin work on the backlog
 * in parallel. In order to ensure progress on the backlog, some care is
 * taken to avoid buckets with a backlog but no threads.
 *
 * Each bucket usually has some worker threads ready to accept new work,
 * represented by a taskq_ent_t on the tqbucket_freelist.
 * In addition to that array of buckets there is one more bucket called
 * the "idle bucket", used as a place to put idle threads that might be
 * moved to a regular bucket when that bucket needs another worker thread.
 * When a dispatch call (one willing to sleep) finds no free thread in
 * either the hashed bucket free list nor in the idle bucket, it will
 * attempt to create a new thread in the hashed bucket
 * (see taskq_bucket_extend).
 *
 * Dispatch first tries a bucket chosen by hash, then the idle bucket.
 * If the dispatch call allows sleeping, it then attempts to extend the
 * bucket chosen by hash, and makes a dispatch attempt on that bucket.
 * If that all fails, and if the dispatch call allows a queued task,
 * an entry is placed on a per-bucket backlog queue. The backlog is
 * serviced as soon as other bucket threads become available.
 *
 * Worker threads wait a "short" time (taskq_thread_bucket_wait) on the
 * free list for the bucket in which they were dispatched, and if no new
 * work takes them off the free list before the expiration of the "short"
 * wait, the thread takes itself off that bucket free list and moves to
 * the "idle bucket", where it waits longer (taskq_thread_timeout) before
 * giving up waiting for work and exiting.
 *
 * New threads normally start life in one of the buckets (chosen by hash)
 * and stay there while there's work for that bucket. After a thread
 * waits in a bucket for a short time (taskq_thread_bucket_wait) without
 * having any task assigned, it migrates to the idle bucket. An exception
 * is made for TASKQ_PREPOPULATE, in which case threads start out in
 * the idle bucket.
 *
 * Running taskq_ent_t entries are not on any list. The dispatch function
 * sets their "func" and "arg" fields and signals the corresponding thread to
 * execute the task. Once the thread executes the task, it clears the "func"
 * field and places an entry on the per-bucket "tqbucket_freelist" which is
 * used as a short-term cache of threads available for that bucket. All
 * entries on the free list should have the "func" field equal to NULL.
 * The free list is a circular doubly-linked list identical in structure to
 * the tq_task list above, but entries are taken from it in LIFO order so
 * that threads seeing no work for a while can move to the idle bucket.
 *
 * The taskq_bucket_dispatch() function gets the most recently used entry
 * from the free list, sets its "func" and "arg" fields and signals a worker
 * thread. Dispatch first tries a bucket selected via hash, then the idle
 * bucket. If both of those fail (and depending on options) an attempt to
 * add threads to the bucket is made.
 *
 * After executing each task, a per-entry thread taskq_d_thread() places its
 * entry on the bucket free list and goes to a (short) timed sleep. If it
 * wakes up without getting a new task, it removes the entry from the
 * free list and "migrates" to the "idle bucket" for a longer wait.
 * If that longer wait expires without work arriving, the thread exits.
 * The thread sleep time is controlled by a tunable `taskq_thread_timeout'.
 * A thread may be dispatched work from the idle bucket (e.g. when dispatch
 * fails to find a free entry in the hashed buckets). When a thread is
 * dispatched from the idle bucket, it moves to the bucket that the hash
 * initially selected.
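 *
 * As a rough sketch (a simplified summary of the dynamic path of
 * taskq_dispatch() below, not its exact control flow), the fallback
 * order for a dynamic dispatch is:
 *
 *	b = &tq->tq_buckets[TQ_HASH(arg, CPUHINT()) & (nbuckets - 1)];
 *	if ((tqe = taskq_bucket_dispatch(b, func, arg)) != NULL)
 *		return (tqe);	... free thread in the hashed bucket ...
 *	if ((tqe = taskq_idlebucket_dispatch(b, func, arg)) != NULL)
 *		return (tqe);	... migrate a thread from the idle bucket ...
 *	if sleeping is allowed, taskq_bucket_extend(b) and retry
 *		taskq_bucket_dispatch(b, func, arg)
 *	if TQ_NOQUEUE is not set, taskq_backlog_dispatch(b, func, arg, flags)
 *	finally, queue taskq_bucket_overflow(b) on the backing queue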
 *
 * Dynamic task queues make limited use of the "backing queue", which is
 * the same taskq->tq_task list used by ordinary (non-dynamic) task queues.
 * The only taskq entries placed on this list are for taskq_bucket_overflow
 * calls, used to request thread creation for some bucket after a dispatch
 * call fails to find a ready thread in some bucket.  There is only one
 * thread servicing this backing queue, so these jobs should only sleep
 * for memory allocation, and should not run jobs that block indefinitely.
 *
 * There are various statistics kept in the bucket which allow for later
 * analysis of taskq usage patterns. Also, a global copy of taskq creation and
 * death statistics is kept in the global taskq data structure. Since thread
 * creation and death happen rarely, updating such global data does not
 * present a performance problem.
 *
 * NOTE: Threads are not bound to any CPU and there is absolutely no
 * association between the bucket and actual thread CPU, so buckets are used
 * only to split resources and reduce resource contention. Having threads
 * attached to the CPU denoted by a bucket may reduce the number of times the
 * job switches between CPUs.
 *
 * The current algorithm creates a thread whenever a bucket has no free
 * entries. It would be nice to know how many threads are in the running
 * state and not create threads if all CPUs are busy with existing
 * tasks, but it is unclear how such a strategy can be implemented.
 *
 * Currently buckets are created statically as an array attached to the task
 * queue. On a system with nCPUs < max_ncpus this may waste system
 * memory. One solution may be allocation of buckets when they are first
 * touched, but it is not clear how useful it is.
 *
 * SUSPEND/RESUME implementation -----------------------------------------------
 *
 * Before executing a task, taskq_thread() (executing non-dynamic task
 * queues) obtains the taskq's thread lock as a reader. The taskq_suspend()
 * function gets the same lock as a writer, blocking all non-dynamic task
 * execution. The taskq_resume() function releases the lock, allowing
 * taskq_thread to continue execution.
 *
 * For dynamic task queues, each bucket is marked as TQBUCKET_SUSPEND by
 * the taskq_suspend() function. After that taskq_bucket_dispatch() always
 * fails, so that taskq_dispatch() will either enqueue tasks for a
 * suspended backing queue or fail if TQ_NOQUEUE is specified in dispatch
 * flags.
 *
 * NOTE: taskq_suspend() does not immediately block any tasks already
 *	 scheduled for dynamic task queues. It only suspends new tasks
 *	 scheduled after taskq_suspend() was called.
 *
 * The taskq_member() function works by comparing a thread's t_taskq pointer
 * with the passed thread pointer.
 *
 * LOCKS and LOCK Order -------------------------------------------------------
 *
 * There are four locks used in task queues:
 *
 *   1a) The idle bucket lock for bucket management.
 *   1b) The hashed bucket locks for bucket management.
 *
 *   2) The global taskq_cpupct_lock, which protects the list of
 *      TASKQ_THREADS_CPU_PCT taskqs.
 *
 *   3) The taskq_t's tq_lock, protecting global task queue state.
 *
 * There are a few cases where two of these are entered, and when that
 * happens the lock entries are in the order they are listed here.
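 *
 * For example, when taskq_idlebucket_dispatch() (below) migrates a thread
 * out of the idle bucket, it enters the idle bucket lock (1a) first and
 * then the destination hashed bucket lock (1b) while still holding (1a),
 * matching the order above:
 *
 *	mutex_enter(&idleb->tqbucket_lock);	... lock 1a
 *	...
 *	mutex_enter(&b->tqbucket_lock);		... lock 1b
 *	b->tqbucket_nalloc++;
 *	mutex_exit(&b->tqbucket_lock);
 *	...
 *	mutex_exit(&idleb->tqbucket_lock);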
510 * 511 * DEBUG FACILITIES ------------------------------------------------------------ 512 * 513 * For DEBUG kernels it is possible to induce random failures to 514 * taskq_dispatch() function when it is given TQ_NOSLEEP argument. The value of 515 * taskq_dmtbf and taskq_smtbf tunables control the mean time between induced 516 * failures for dynamic and static task queues respectively. 517 * 518 * Setting TASKQ_STATISTIC to 0 will disable per-bucket statistics. 519 * 520 * TUNABLES -------------------------------------------------------------------- 521 * 522 * system_taskq_size - Size of the global system_taskq. 523 * This value is multiplied by nCPUs to determine 524 * actual size. 525 * Default value: 64 526 * 527 * taskq_minimum_nthreads_max 528 * - Minimum size of the thread list for a taskq. 529 * Useful for testing different thread pool 530 * sizes by overwriting tq_nthreads_target. 531 * 532 * taskq_thread_timeout - Maximum idle time for taskq_d_thread() 533 * Default value: 5 minutes 534 * 535 * taskq_maxbuckets - Maximum number of buckets in any task queue 536 * Default value: 128 537 * 538 * taskq_dmtbf - Mean time between induced dispatch failures 539 * for dynamic task queues. 540 * Default value: UINT_MAX (no induced failures) 541 * 542 * taskq_smtbf - Mean time between induced dispatch failures 543 * for static task queues. 544 * Default value: UINT_MAX (no induced failures) 545 * 546 * CONDITIONAL compilation ----------------------------------------------------- 547 * 548 * TASKQ_STATISTIC - If set will enable bucket statistic (default). 549 * 550 */ 551 552 #include <sys/taskq_impl.h> 553 #include <sys/thread.h> 554 #include <sys/proc.h> 555 #include <sys/kmem.h> 556 #include <sys/vmem.h> 557 #include <sys/callb.h> 558 #include <sys/class.h> 559 #include <sys/systm.h> 560 #include <sys/cmn_err.h> 561 #include <sys/debug.h> 562 #include <sys/vmsystm.h> /* For throttlefree */ 563 #include <sys/sysmacros.h> 564 #include <sys/cpuvar.h> 565 #include <sys/cpupart.h> 566 #include <sys/sdt.h> 567 #include <sys/sysdc.h> 568 #include <sys/note.h> 569 570 static kmem_cache_t *taskq_ent_cache, *taskq_cache; 571 572 /* 573 * Pseudo instance numbers for taskqs without explicitly provided instance. 574 */ 575 static vmem_t *taskq_id_arena; 576 577 /* Global system task queue for common use */ 578 taskq_t *system_taskq; 579 580 /* 581 * Maximum number of entries in global system taskq is 582 * system_taskq_size * max_ncpus 583 */ 584 #define SYSTEM_TASKQ_SIZE 64 585 int system_taskq_size = SYSTEM_TASKQ_SIZE; 586 587 /* 588 * Minimum size for tq_nthreads_max; useful for those who want to play around 589 * with increasing a taskq's tq_nthreads_target. 590 */ 591 int taskq_minimum_nthreads_max = 1; 592 593 /* 594 * We want to ensure that when taskq_create() returns, there is at least 595 * one thread ready to handle requests. To guarantee this, we have to wait 596 * for the second thread, since the first one cannot process requests until 597 * the second thread has been created. 
 */
#define	TASKQ_CREATE_ACTIVE_THREADS	2

/* Maximum percentage allowed for TASKQ_THREADS_CPU_PCT */
#define	TASKQ_CPUPCT_MAX_PERCENT	1000
int taskq_cpupct_max_percent = TASKQ_CPUPCT_MAX_PERCENT;

/*
 * Dynamic task queue threads that don't get any work within
 * taskq_thread_timeout destroy themselves
 */
#define	TASKQ_THREAD_TIMEOUT	(60 * 5)
int taskq_thread_timeout = TASKQ_THREAD_TIMEOUT;

/*
 * Dynamic taskq threads stay in an empty bucket for only a
 * relatively short time before moving to the "idle bucket".
 */
int taskq_thread_bucket_wait = 500;	/* mSec. */

/*
 * A counter for debug and testing. See the increment site below.
 */
uint64_t taskq_disptcreates_lost = 0;

/*
 * Upper and lower limits on the number of buckets for a dynamic taskq.
 * Must be a power of two. Dynamic taskqs should have more than one bucket.
 * The floor of four is chosen somewhat arbitrarily, based on the
 * smallest number of CPUs found in modern systems.
 */
#define	TASKQ_MINBUCKETS	4
int taskq_minbuckets = TASKQ_MINBUCKETS;
#define	TASKQ_MAXBUCKETS	128
int taskq_maxbuckets = TASKQ_MAXBUCKETS;

/*
 * Hashing function: mix various bits of x and CPUHINT
 *
 * This hash is applied to the "arg" address supplied to taskq_dispatch.
 * The distribution of objects in memory for that address is generally
 * whatever the memory allocation system provides. We know only that they
 * will be aligned to whatever minimum alignment is provided, and that the
 * sizes of these objects will vary. Due to the known alignment, this hash
 * function puts the CPU index in the least significant bits. Other bits
 * are simply combined via XOR using a (low-cost) byte-access-compatible
 * set of shifts. Empirical results show that this hash produces fairly
 * even distribution for the consumers in this system.
 */
#define	TQ_HASH(x, c)	((c) ^ (x) ^ ((x) >> 8) ^ ((x) >> 16) ^ ((x) >> 24))

/*
 * Get an index for the current CPU, used in the hash to spread
 * work among buckets based on what CPU is running this.
 */
#define	CPUHINT()	((uintptr_t)(CPU->cpu_seqid))

/*
 * We do not create any new threads when the system is low on memory and
 * starts throttling memory allocations. The following macro tries to
 * estimate such a condition.
 */
#define	ENOUGH_MEMORY()	(freemem > throttlefree)

/*
 * Static functions.
664 */ 665 static taskq_t *taskq_create_common(const char *, int, int, pri_t, int, 666 int, proc_t *, uint_t, uint_t); 667 static void taskq_thread(void *); 668 static void taskq_d_thread(taskq_ent_t *); 669 static void taskq_d_migrate(void *); 670 static void taskq_d_redirect(void *); 671 static void taskq_bucket_overflow(void *); 672 static taskq_ent_t *taskq_bucket_extend(taskq_bucket_t *); 673 static void taskq_bucket_redist(taskq_bucket_t *); 674 static int taskq_constructor(void *, void *, int); 675 static void taskq_destructor(void *, void *); 676 static int taskq_ent_constructor(void *, void *, int); 677 static void taskq_ent_destructor(void *, void *); 678 static taskq_ent_t *taskq_ent_alloc(taskq_t *, int); 679 static void taskq_ent_free(taskq_t *, taskq_ent_t *); 680 static int taskq_ent_exists(taskq_t *, task_func_t, void *); 681 static taskq_ent_t *taskq_bucket_dispatch(taskq_bucket_t *, task_func_t, 682 void *); 683 static void taskq_backlog_enqueue(taskq_bucket_t *, 684 taskq_ent_t *tqe, int flags); 685 686 /* 687 * Task queues kstats. 688 */ 689 struct taskq_kstat { 690 kstat_named_t tq_pid; 691 kstat_named_t tq_tasks; 692 kstat_named_t tq_executed; 693 kstat_named_t tq_maxtasks; 694 kstat_named_t tq_totaltime; 695 kstat_named_t tq_nalloc; 696 kstat_named_t tq_nactive; 697 kstat_named_t tq_pri; 698 kstat_named_t tq_nthreads; 699 kstat_named_t tq_nomem; 700 } taskq_kstat = { 701 { "pid", KSTAT_DATA_UINT64 }, 702 { "tasks", KSTAT_DATA_UINT64 }, 703 { "executed", KSTAT_DATA_UINT64 }, 704 { "maxtasks", KSTAT_DATA_UINT64 }, 705 { "totaltime", KSTAT_DATA_UINT64 }, 706 { "nalloc", KSTAT_DATA_UINT64 }, 707 { "nactive", KSTAT_DATA_UINT64 }, 708 { "priority", KSTAT_DATA_UINT64 }, 709 { "threads", KSTAT_DATA_UINT64 }, 710 { "nomem", KSTAT_DATA_UINT64 }, 711 }; 712 713 struct taskq_d_kstat { 714 kstat_named_t tqd_pri; 715 kstat_named_t tqd_hits; 716 kstat_named_t tqd_misses; 717 kstat_named_t tqd_ihits; /* idle bucket hits */ 718 kstat_named_t tqd_imisses; /* idle bucket misses */ 719 kstat_named_t tqd_overflows; 720 kstat_named_t tqd_tcreates; 721 kstat_named_t tqd_tdeaths; 722 kstat_named_t tqd_maxthreads; 723 kstat_named_t tqd_nomem; 724 kstat_named_t tqd_disptcreates; 725 kstat_named_t tqd_totaltime; 726 kstat_named_t tqd_nalloc; 727 kstat_named_t tqd_nfree; 728 kstat_named_t tqd_nbacklog; 729 kstat_named_t tqd_maxbacklog; 730 } taskq_d_kstat = { 731 { "priority", KSTAT_DATA_UINT64 }, 732 { "hits", KSTAT_DATA_UINT64 }, 733 { "misses", KSTAT_DATA_UINT64 }, 734 { "ihits", KSTAT_DATA_UINT64 }, 735 { "imisses", KSTAT_DATA_UINT64 }, 736 { "overflows", KSTAT_DATA_UINT64 }, 737 { "tcreates", KSTAT_DATA_UINT64 }, 738 { "tdeaths", KSTAT_DATA_UINT64 }, 739 { "maxthreads", KSTAT_DATA_UINT64 }, 740 { "nomem", KSTAT_DATA_UINT64 }, 741 { "disptcreates", KSTAT_DATA_UINT64 }, 742 { "totaltime", KSTAT_DATA_UINT64 }, 743 { "nalloc", KSTAT_DATA_UINT64 }, 744 { "nfree", KSTAT_DATA_UINT64 }, 745 { "nbacklog", KSTAT_DATA_UINT64 }, 746 { "maxbacklog", KSTAT_DATA_UINT64 }, 747 }; 748 749 static kmutex_t taskq_kstat_lock; 750 static kmutex_t taskq_d_kstat_lock; 751 static int taskq_kstat_update(kstat_t *, int); 752 static int taskq_d_kstat_update(kstat_t *, int); 753 754 /* 755 * List of all TASKQ_THREADS_CPU_PCT taskqs. 756 */ 757 static list_t taskq_cpupct_list; /* protected by cpu_lock */ 758 759 /* 760 * Collect per-bucket statistic when TASKQ_STATISTIC is defined. 
 */
#define	TASKQ_STATISTIC	1

#if TASKQ_STATISTIC
#define	TQ_STAT(b, x)	b->tqbucket_stat.x++
#else
#define	TQ_STAT(b, x)
#endif

/*
 * Random fault injection.
 */
uint_t taskq_random;
uint_t taskq_dmtbf = UINT_MAX;	/* mean time between injected failures */
uint_t taskq_smtbf = UINT_MAX;	/* mean time between injected failures */

/*
 * TQ_NOSLEEP dispatches on dynamic task queues are always allowed to fail.
 *
 * TQ_NOSLEEP dispatches on static task queues can't arbitrarily fail because
 * they could prepopulate the cache and make sure that they do not use more
 * than minalloc entries.  So, fault injection in this case ensures that
 * either TASKQ_PREPOPULATE is not set or there are more entries allocated
 * than is specified by minalloc.  TQ_NOALLOC dispatches are always allowed
 * to fail, but for simplicity we treat them identically to TQ_NOSLEEP
 * dispatches.
 */
#ifdef DEBUG
#define	TASKQ_D_RANDOM_DISPATCH_FAILURE(tq, flag)		\
	taskq_random = (taskq_random * 2416 + 374441) % 1771875;\
	if ((flag & TQ_NOSLEEP) &&				\
	    taskq_random < 1771875 / taskq_dmtbf) {		\
		return (TASKQID_INVALID);			\
	}

#define	TASKQ_S_RANDOM_DISPATCH_FAILURE(tq, flag)		\
	taskq_random = (taskq_random * 2416 + 374441) % 1771875;\
	if ((flag & (TQ_NOSLEEP | TQ_NOALLOC)) &&		\
	    (!(tq->tq_flags & TASKQ_PREPOPULATE) ||		\
	    (tq->tq_nalloc > tq->tq_minalloc)) &&		\
	    (taskq_random < (1771875 / taskq_smtbf))) {		\
		mutex_exit(&tq->tq_lock);			\
		return (TASKQID_INVALID);			\
	}
#else
#define	TASKQ_S_RANDOM_DISPATCH_FAILURE(tq, flag)
#define	TASKQ_D_RANDOM_DISPATCH_FAILURE(tq, flag)
#endif

#define	IS_EMPTY(l)	(((l).tqent_prev == (l).tqent_next) &&	\
	((l).tqent_prev == &(l)))

/*
 * Initialize 'tqe' list head
 */
#define	TQ_LIST_INIT(l) {					\
	l.tqent_next = &l;					\
	l.tqent_prev = &l;					\
}
/*
 * Append `tqe' at the end of the doubly-linked list denoted by l.
 */
#define	TQ_APPEND(l, tqe) {					\
	tqe->tqent_next = &l;					\
	tqe->tqent_prev = l.tqent_prev;				\
	tqe->tqent_next->tqent_prev = tqe;			\
	tqe->tqent_prev->tqent_next = tqe;			\
}
/*
 * Prepend 'tqe' to the beginning of l
 */
#define	TQ_PREPEND(l, tqe) {					\
	tqe->tqent_next = l.tqent_next;				\
	tqe->tqent_prev = &l;					\
	tqe->tqent_next->tqent_prev = tqe;			\
	tqe->tqent_prev->tqent_next = tqe;			\
}
/*
 * Remove 'tqe' from some list
 */
#define	TQ_REMOVE(tqe) {					\
	tqe->tqent_prev->tqent_next = tqe->tqent_next;		\
	tqe->tqent_next->tqent_prev = tqe->tqent_prev;		\
	tqe->tqent_next = NULL;					\
	tqe->tqent_prev = NULL;					\
}

/*
 * Schedule a task specified by func and arg into the task queue entry tqe.
850 */ 851 #define TQ_DO_ENQUEUE(tq, tqe, func, arg, front) { \ 852 ASSERT(MUTEX_HELD(&tq->tq_lock)); \ 853 _NOTE(CONSTCOND) \ 854 if (front) { \ 855 TQ_PREPEND(tq->tq_task, tqe); \ 856 } else { \ 857 TQ_APPEND(tq->tq_task, tqe); \ 858 } \ 859 tqe->tqent_func = (func); \ 860 tqe->tqent_arg = (arg); \ 861 tq->tq_tasks++; \ 862 if (tq->tq_tasks - tq->tq_executed > tq->tq_maxtasks) \ 863 tq->tq_maxtasks = tq->tq_tasks - tq->tq_executed; \ 864 cv_signal(&tq->tq_dispatch_cv); \ 865 DTRACE_PROBE2(taskq__enqueue, taskq_t *, tq, taskq_ent_t *, tqe); \ 866 } 867 868 #define TQ_ENQUEUE(tq, tqe, func, arg) \ 869 TQ_DO_ENQUEUE(tq, tqe, func, arg, 0) 870 871 #define TQ_ENQUEUE_FRONT(tq, tqe, func, arg) \ 872 TQ_DO_ENQUEUE(tq, tqe, func, arg, 1) 873 874 /* 875 * Do-nothing task which may be used to prepopulate thread caches. 876 */ 877 /*ARGSUSED*/ 878 void 879 nulltask(void *unused) 880 { 881 } 882 883 /*ARGSUSED*/ 884 static int 885 taskq_constructor(void *buf, void *cdrarg, int kmflags) 886 { 887 taskq_t *tq = buf; 888 889 bzero(tq, sizeof (taskq_t)); 890 891 mutex_init(&tq->tq_lock, NULL, MUTEX_DEFAULT, NULL); 892 rw_init(&tq->tq_threadlock, NULL, RW_DEFAULT, NULL); 893 cv_init(&tq->tq_dispatch_cv, NULL, CV_DEFAULT, NULL); 894 cv_init(&tq->tq_exit_cv, NULL, CV_DEFAULT, NULL); 895 cv_init(&tq->tq_wait_cv, NULL, CV_DEFAULT, NULL); 896 cv_init(&tq->tq_maxalloc_cv, NULL, CV_DEFAULT, NULL); 897 898 tq->tq_task.tqent_next = &tq->tq_task; 899 tq->tq_task.tqent_prev = &tq->tq_task; 900 901 return (0); 902 } 903 904 /*ARGSUSED*/ 905 static void 906 taskq_destructor(void *buf, void *cdrarg) 907 { 908 taskq_t *tq = buf; 909 910 ASSERT(tq->tq_nthreads == 0); 911 ASSERT(tq->tq_buckets == NULL); 912 ASSERT(tq->tq_dnthreads == 0); 913 914 mutex_destroy(&tq->tq_lock); 915 rw_destroy(&tq->tq_threadlock); 916 cv_destroy(&tq->tq_dispatch_cv); 917 cv_destroy(&tq->tq_exit_cv); 918 cv_destroy(&tq->tq_wait_cv); 919 cv_destroy(&tq->tq_maxalloc_cv); 920 } 921 922 /*ARGSUSED*/ 923 static int 924 taskq_ent_constructor(void *buf, void *cdrarg, int kmflags) 925 { 926 taskq_ent_t *tqe = buf; 927 928 tqe->tqent_thread = NULL; 929 cv_init(&tqe->tqent_cv, NULL, CV_DEFAULT, NULL); 930 931 return (0); 932 } 933 934 /*ARGSUSED*/ 935 static void 936 taskq_ent_destructor(void *buf, void *cdrarg) 937 { 938 taskq_ent_t *tqe = buf; 939 940 ASSERT(tqe->tqent_thread == NULL); 941 cv_destroy(&tqe->tqent_cv); 942 } 943 944 void 945 taskq_init(void) 946 { 947 taskq_ent_cache = kmem_cache_create("taskq_ent_cache", 948 sizeof (taskq_ent_t), 0, taskq_ent_constructor, 949 taskq_ent_destructor, NULL, NULL, NULL, 0); 950 taskq_cache = kmem_cache_create("taskq_cache", sizeof (taskq_t), 951 0, taskq_constructor, taskq_destructor, NULL, NULL, NULL, 0); 952 taskq_id_arena = vmem_create("taskq_id_arena", 953 (void *)1, INT32_MAX, 1, NULL, NULL, NULL, 0, 954 VM_SLEEP | VMC_IDENTIFIER); 955 956 list_create(&taskq_cpupct_list, sizeof (taskq_t), 957 offsetof(taskq_t, tq_cpupct_link)); 958 } 959 960 static void 961 taskq_update_nthreads(taskq_t *tq, uint_t ncpus) 962 { 963 uint_t newtarget = TASKQ_THREADS_PCT(ncpus, tq->tq_threads_ncpus_pct); 964 965 ASSERT(MUTEX_HELD(&cpu_lock)); 966 ASSERT(MUTEX_HELD(&tq->tq_lock)); 967 968 /* We must be going from non-zero to non-zero; no exiting. 
*/ 969 ASSERT3U(tq->tq_nthreads_target, !=, 0); 970 ASSERT3U(newtarget, !=, 0); 971 972 ASSERT3U(newtarget, <=, tq->tq_nthreads_max); 973 if (newtarget != tq->tq_nthreads_target) { 974 tq->tq_flags |= TASKQ_CHANGING; 975 tq->tq_nthreads_target = newtarget; 976 cv_broadcast(&tq->tq_dispatch_cv); 977 cv_broadcast(&tq->tq_exit_cv); 978 } 979 } 980 981 /* called during task queue creation */ 982 static void 983 taskq_cpupct_install(taskq_t *tq, cpupart_t *cpup) 984 { 985 ASSERT(tq->tq_flags & TASKQ_THREADS_CPU_PCT); 986 987 mutex_enter(&cpu_lock); 988 mutex_enter(&tq->tq_lock); 989 tq->tq_cpupart = cpup->cp_id; 990 taskq_update_nthreads(tq, cpup->cp_ncpus); 991 mutex_exit(&tq->tq_lock); 992 993 list_insert_tail(&taskq_cpupct_list, tq); 994 mutex_exit(&cpu_lock); 995 } 996 997 static void 998 taskq_cpupct_remove(taskq_t *tq) 999 { 1000 ASSERT(tq->tq_flags & TASKQ_THREADS_CPU_PCT); 1001 1002 mutex_enter(&cpu_lock); 1003 list_remove(&taskq_cpupct_list, tq); 1004 mutex_exit(&cpu_lock); 1005 } 1006 1007 /*ARGSUSED*/ 1008 static int 1009 taskq_cpu_setup(cpu_setup_t what, int id, void *arg) 1010 { 1011 taskq_t *tq; 1012 cpupart_t *cp = cpu[id]->cpu_part; 1013 uint_t ncpus = cp->cp_ncpus; 1014 1015 ASSERT(MUTEX_HELD(&cpu_lock)); 1016 ASSERT(ncpus > 0); 1017 1018 switch (what) { 1019 case CPU_OFF: 1020 case CPU_CPUPART_OUT: 1021 /* offlines are called *before* the cpu is offlined. */ 1022 if (ncpus > 1) 1023 ncpus--; 1024 break; 1025 1026 case CPU_ON: 1027 case CPU_CPUPART_IN: 1028 break; 1029 1030 default: 1031 return (0); /* doesn't affect cpu count */ 1032 } 1033 1034 for (tq = list_head(&taskq_cpupct_list); tq != NULL; 1035 tq = list_next(&taskq_cpupct_list, tq)) { 1036 1037 mutex_enter(&tq->tq_lock); 1038 /* 1039 * If the taskq is part of the cpuset which is changing, 1040 * update its nthreads_target. 1041 */ 1042 if (tq->tq_cpupart == cp->cp_id) { 1043 taskq_update_nthreads(tq, ncpus); 1044 } 1045 mutex_exit(&tq->tq_lock); 1046 } 1047 return (0); 1048 } 1049 1050 void 1051 taskq_mp_init(void) 1052 { 1053 mutex_enter(&cpu_lock); 1054 register_cpu_setup_func(taskq_cpu_setup, NULL); 1055 /* 1056 * Make sure we're up to date. At this point in boot, there is only 1057 * one processor set, so we only have to update the current CPU. 1058 */ 1059 (void) taskq_cpu_setup(CPU_ON, CPU->cpu_id, NULL); 1060 mutex_exit(&cpu_lock); 1061 } 1062 1063 /* 1064 * Create global system dynamic task queue. 1065 */ 1066 void 1067 system_taskq_init(void) 1068 { 1069 system_taskq = taskq_create_common("system_taskq", 0, 1070 system_taskq_size * max_ncpus, minclsyspri, 4, 512, &p0, 0, 1071 TASKQ_DYNAMIC | TASKQ_PREPOPULATE); 1072 } 1073 1074 /* 1075 * taskq_ent_alloc() 1076 * 1077 * Allocates a new taskq_ent_t structure either from the free list or from the 1078 * cache. Returns NULL if it can't be allocated. 1079 * 1080 * Assumes: tq->tq_lock is held. 1081 */ 1082 static taskq_ent_t * 1083 taskq_ent_alloc(taskq_t *tq, int flags) 1084 { 1085 int kmflags = (flags & TQ_NOSLEEP) ? KM_NOSLEEP : KM_SLEEP; 1086 taskq_ent_t *tqe; 1087 clock_t wait_time; 1088 clock_t wait_rv; 1089 1090 ASSERT(MUTEX_HELD(&tq->tq_lock)); 1091 1092 /* 1093 * TQ_NOALLOC allocations are allowed to use the freelist, even if 1094 * we are below tq_minalloc. 
1095 */ 1096 again: if ((tqe = tq->tq_freelist) != NULL && 1097 ((flags & TQ_NOALLOC) || tq->tq_nalloc >= tq->tq_minalloc)) { 1098 tq->tq_freelist = tqe->tqent_next; 1099 } else { 1100 if (flags & TQ_NOALLOC) 1101 return (NULL); 1102 1103 if (tq->tq_nalloc >= tq->tq_maxalloc) { 1104 if (kmflags & KM_NOSLEEP) 1105 return (NULL); 1106 1107 /* 1108 * We don't want to exceed tq_maxalloc, but we can't 1109 * wait for other tasks to complete (and thus free up 1110 * task structures) without risking deadlock with 1111 * the caller. So, we just delay for one second 1112 * to throttle the allocation rate. If we have tasks 1113 * complete before one second timeout expires then 1114 * taskq_ent_free will signal us and we will 1115 * immediately retry the allocation (reap free). 1116 */ 1117 wait_time = ddi_get_lbolt() + hz; 1118 while (tq->tq_freelist == NULL) { 1119 tq->tq_maxalloc_wait++; 1120 wait_rv = cv_timedwait(&tq->tq_maxalloc_cv, 1121 &tq->tq_lock, wait_time); 1122 tq->tq_maxalloc_wait--; 1123 if (wait_rv == -1) 1124 break; 1125 } 1126 if (tq->tq_freelist) 1127 goto again; /* reap freelist */ 1128 1129 } 1130 mutex_exit(&tq->tq_lock); 1131 1132 tqe = kmem_cache_alloc(taskq_ent_cache, kmflags); 1133 1134 mutex_enter(&tq->tq_lock); 1135 if (tqe != NULL) 1136 tq->tq_nalloc++; 1137 } 1138 return (tqe); 1139 } 1140 1141 /* 1142 * taskq_ent_free() 1143 * 1144 * Free taskq_ent_t structure by either putting it on the free list or freeing 1145 * it to the cache. 1146 * 1147 * Assumes: tq->tq_lock is held. 1148 */ 1149 static void 1150 taskq_ent_free(taskq_t *tq, taskq_ent_t *tqe) 1151 { 1152 ASSERT(MUTEX_HELD(&tq->tq_lock)); 1153 1154 if (tq->tq_nalloc <= tq->tq_minalloc) { 1155 tqe->tqent_next = tq->tq_freelist; 1156 tq->tq_freelist = tqe; 1157 } else { 1158 tq->tq_nalloc--; 1159 mutex_exit(&tq->tq_lock); 1160 kmem_cache_free(taskq_ent_cache, tqe); 1161 mutex_enter(&tq->tq_lock); 1162 } 1163 1164 if (tq->tq_maxalloc_wait) 1165 cv_signal(&tq->tq_maxalloc_cv); 1166 } 1167 1168 /* 1169 * taskq_ent_exists() 1170 * 1171 * Return 1 if taskq already has entry for calling 'func(arg)'. 1172 * 1173 * Assumes: tq->tq_lock is held. 1174 */ 1175 static int 1176 taskq_ent_exists(taskq_t *tq, task_func_t func, void *arg) 1177 { 1178 taskq_ent_t *tqe; 1179 1180 ASSERT(MUTEX_HELD(&tq->tq_lock)); 1181 1182 for (tqe = tq->tq_task.tqent_next; tqe != &tq->tq_task; 1183 tqe = tqe->tqent_next) 1184 if ((tqe->tqent_func == func) && (tqe->tqent_arg == arg)) 1185 return (1); 1186 return (0); 1187 } 1188 1189 /* 1190 * Dispatch a task "func(arg)" to a free entry of bucket b. 1191 * 1192 * Assumes: no bucket locks is held. 1193 * 1194 * Returns: a pointer to an entry if dispatch was successful. 1195 * NULL if there are no free entries or if the bucket is suspended. 1196 */ 1197 static taskq_ent_t * 1198 taskq_bucket_dispatch(taskq_bucket_t *b, task_func_t func, void *arg) 1199 { 1200 taskq_ent_t *tqe; 1201 taskq_t *tq = b->tqbucket_taskq; 1202 taskq_bucket_t *idleb = &tq->tq_buckets[tq->tq_nbuckets]; 1203 1204 ASSERT(MUTEX_NOT_HELD(&b->tqbucket_lock)); 1205 ASSERT(func != NULL); 1206 VERIFY(b >= tq->tq_buckets && b < idleb); 1207 1208 mutex_enter(&b->tqbucket_lock); 1209 1210 ASSERT(b->tqbucket_nfree != 0 || IS_EMPTY(b->tqbucket_freelist)); 1211 ASSERT(b->tqbucket_nfree == 0 || !IS_EMPTY(b->tqbucket_freelist)); 1212 1213 /* 1214 * Get en entry from the freelist if there is one. 1215 * Schedule task into the entry. 
1216 */ 1217 if ((b->tqbucket_nfree != 0) && 1218 !(b->tqbucket_flags & TQBUCKET_SUSPEND)) { 1219 tqe = b->tqbucket_freelist.tqent_prev; 1220 1221 ASSERT(tqe != &b->tqbucket_freelist); 1222 ASSERT(tqe->tqent_thread != NULL); 1223 1224 TQ_REMOVE(tqe); 1225 b->tqbucket_nfree--; 1226 tqe->tqent_func = func; 1227 tqe->tqent_arg = arg; 1228 b->tqbucket_nalloc++; 1229 DTRACE_PROBE2(taskq__d__enqueue, taskq_bucket_t *, b, 1230 taskq_ent_t *, tqe); 1231 cv_signal(&tqe->tqent_cv); 1232 TQ_STAT(b, tqs_hits); 1233 } else { 1234 tqe = NULL; 1235 TQ_STAT(b, tqs_misses); 1236 } 1237 mutex_exit(&b->tqbucket_lock); 1238 return (tqe); 1239 } 1240 1241 /* 1242 * Dispatch a task "func(arg)" using a free entry from the "idle" bucket. 1243 * If we succeed finding a free entry, migrate that thread from the "idle" 1244 * bucket to the bucket passed (b). 1245 * 1246 * Assumes: no bucket locks is held. 1247 * 1248 * Returns: a pointer to an entry if dispatch was successful. 1249 * NULL if there are no free entries or if the bucket is suspended. 1250 */ 1251 static taskq_ent_t * 1252 taskq_idlebucket_dispatch(taskq_bucket_t *b, task_func_t func, void *arg) 1253 { 1254 taskq_ent_t *tqe; 1255 taskq_t *tq = b->tqbucket_taskq; 1256 taskq_bucket_t *idleb = &tq->tq_buckets[tq->tq_nbuckets]; 1257 1258 ASSERT(func != NULL); 1259 ASSERT(b != idleb); 1260 ASSERT(MUTEX_NOT_HELD(&b->tqbucket_lock)); 1261 ASSERT(MUTEX_NOT_HELD(&idleb->tqbucket_lock)); 1262 1263 /* 1264 * Get out quickly (without locks) if unlikely to succeed. 1265 */ 1266 if (idleb->tqbucket_nfree == 0) { 1267 TQ_STAT(idleb, tqs_misses); 1268 return (NULL); 1269 } 1270 1271 /* 1272 * Need the mutex on both the idle bucket (idleb) and bucket (b) 1273 * entered below. See Locks and Lock Order in the top comments. 1274 */ 1275 mutex_enter(&idleb->tqbucket_lock); 1276 1277 IMPLY(idleb->tqbucket_nfree == 0, IS_EMPTY(idleb->tqbucket_freelist)); 1278 IMPLY(idleb->tqbucket_nfree != 0, !IS_EMPTY(idleb->tqbucket_freelist)); 1279 1280 /* 1281 * Get an entry from the idle bucket freelist if there is one. 1282 * Schedule task into the entry. 1283 */ 1284 if ((idleb->tqbucket_nfree != 0) && 1285 !(idleb->tqbucket_flags & TQBUCKET_SUSPEND)) { 1286 tqe = idleb->tqbucket_freelist.tqent_prev; 1287 1288 ASSERT(tqe != &idleb->tqbucket_freelist); 1289 ASSERT(tqe->tqent_thread != NULL); 1290 1291 TQ_REMOVE(tqe); 1292 idleb->tqbucket_nfree--; 1293 1294 tqe->tqent_func = func; 1295 tqe->tqent_arg = arg; 1296 1297 /* 1298 * Note move TQE to new bucket here! 1299 * See reaction in taskq_d_thread 1300 */ 1301 tqe->tqent_un.tqent_bucket = b; 1302 1303 /* 1304 * Track the "alloc" on the bucket moved to, 1305 * as if this tqe were dispatched from there. 1306 */ 1307 mutex_enter(&b->tqbucket_lock); 1308 b->tqbucket_nalloc++; 1309 mutex_exit(&b->tqbucket_lock); 1310 1311 DTRACE_PROBE2(taskq__d__enqueue, taskq_bucket_t *, b, 1312 taskq_ent_t *, tqe); 1313 1314 /* Let the tqe thread run. */ 1315 cv_signal(&tqe->tqent_cv); 1316 1317 /* Count this as a "hit" on the idle bucket. */ 1318 TQ_STAT(idleb, tqs_hits); 1319 } else { 1320 tqe = NULL; 1321 TQ_STAT(idleb, tqs_misses); 1322 } 1323 1324 mutex_exit(&idleb->tqbucket_lock); 1325 1326 return (tqe); 1327 } 1328 1329 /* 1330 * Enqueue a taskq job on the per-bucket backlog. 1331 */ 1332 static taskq_ent_t * 1333 taskq_backlog_dispatch(taskq_bucket_t *bucket, task_func_t func, void *arg, 1334 int flags) 1335 { 1336 taskq_ent_t *tqe; 1337 int kmflags = (flags & TQ_NOSLEEP) ? 
KM_NOSLEEP : KM_SLEEP; 1338 1339 tqe = kmem_cache_alloc(taskq_ent_cache, kmflags); 1340 if (tqe == NULL) 1341 return (tqe); 1342 1343 tqe->tqent_func = func; 1344 tqe->tqent_arg = arg; 1345 1346 mutex_enter(&bucket->tqbucket_lock); 1347 taskq_backlog_enqueue(bucket, tqe, flags); 1348 mutex_exit(&bucket->tqbucket_lock); 1349 1350 return (tqe); 1351 } 1352 1353 static void 1354 taskq_backlog_enqueue(taskq_bucket_t *bucket, taskq_ent_t *tqe, int flags) 1355 { 1356 1357 ASSERT(MUTEX_HELD(&bucket->tqbucket_lock)); 1358 1359 tqe->tqent_un.tqent_bucket = bucket; 1360 if ((flags & TQ_FRONT) != 0) { 1361 TQ_PREPEND(bucket->tqbucket_backlog, tqe); 1362 } else { 1363 TQ_APPEND(bucket->tqbucket_backlog, tqe); 1364 } 1365 bucket->tqbucket_nbacklog++; 1366 /* See membar_consumer in taskq_d_thread(). */ 1367 membar_producer(); 1368 DTRACE_PROBE2(taskq__d__enqueue, 1369 taskq_bucket_t *, bucket, 1370 taskq_ent_t *, tqe); 1371 TQ_STAT(bucket, tqs_overflow); 1372 #if TASKQ_STATISTIC 1373 if (bucket->tqbucket_stat.tqs_maxbacklog < 1374 bucket->tqbucket_nbacklog) { 1375 bucket->tqbucket_stat.tqs_maxbacklog = 1376 bucket->tqbucket_nbacklog; 1377 } 1378 #endif 1379 /* 1380 * Before this function is called, the caller has tried 1381 * taskq_bucket_dispatch, taskq_idlebucket_dispatch, and 1382 * not found any idle TQE. The bucket lock is dropped 1383 * between those calls and this, so it's possible that a 1384 * TQE worker became idle before we entered the mutex. 1385 * Check for that here and wake an idle thread so it 1386 * will re-check the backlog. 1387 */ 1388 if (bucket->tqbucket_nfree != 0) { 1389 taskq_ent_t *itqe; 1390 itqe = bucket->tqbucket_freelist.tqent_prev; 1391 cv_signal(&itqe->tqent_cv); 1392 } 1393 } 1394 1395 /* 1396 * Dispatch a task. 1397 * 1398 * Assumes: func != NULL 1399 * 1400 * Returns: NULL if dispatch failed. 1401 * non-NULL if task dispatched successfully. 1402 * Actual return value is the pointer to taskq entry that was used to 1403 * dispatch a task. This is useful for debugging. 1404 */ 1405 taskqid_t 1406 taskq_dispatch(taskq_t *tq, task_func_t func, void *arg, uint_t flags) 1407 { 1408 taskq_bucket_t *bucket = NULL; /* Which bucket needs extension */ 1409 taskq_ent_t *tqe = NULL; 1410 uint_t bsize; 1411 1412 ASSERT(tq != NULL); 1413 ASSERT(func != NULL); 1414 1415 if ((tq->tq_flags & TASKQ_DYNAMIC) == 0) { 1416 /* 1417 * TQ_NOQUEUE flag can't be used with non-dynamic task queues. 1418 */ 1419 ASSERT(!(flags & TQ_NOQUEUE)); 1420 /* 1421 * Enqueue the task to the underlying queue. 1422 */ 1423 mutex_enter(&tq->tq_lock); 1424 1425 TASKQ_S_RANDOM_DISPATCH_FAILURE(tq, flags); 1426 1427 if ((tqe = taskq_ent_alloc(tq, flags)) == NULL) { 1428 tq->tq_nomem++; 1429 mutex_exit(&tq->tq_lock); 1430 return ((taskqid_t)tqe); 1431 } 1432 /* Make sure we start without any flags */ 1433 tqe->tqent_un.tqent_flags = 0; 1434 1435 if (flags & TQ_FRONT) { 1436 TQ_ENQUEUE_FRONT(tq, tqe, func, arg); 1437 } else { 1438 TQ_ENQUEUE(tq, tqe, func, arg); 1439 } 1440 mutex_exit(&tq->tq_lock); 1441 return ((taskqid_t)tqe); 1442 } 1443 1444 /* 1445 * Dynamic taskq dispatching. 1446 */ 1447 ASSERT(!(flags & (TQ_NOALLOC | TQ_FRONT))); 1448 TASKQ_D_RANDOM_DISPATCH_FAILURE(tq, flags); 1449 1450 ASSERT(func != taskq_d_migrate); 1451 ASSERT(func != taskq_d_redirect); 1452 1453 bsize = tq->tq_nbuckets; 1454 1455 if (bsize == 1) { 1456 /* 1457 * In a single-CPU case there is only one bucket, so get 1458 * entry directly from there. 
		 */
		tqe = taskq_bucket_dispatch(tq->tq_buckets, func, arg);
		if (tqe != NULL)
			return ((taskqid_t)tqe);	/* Fastpath */
		bucket = tq->tq_buckets;
	} else {
		uintptr_t h = TQ_HASH((uintptr_t)arg, CPUHINT());

		bucket = &tq->tq_buckets[h & (bsize - 1)];
		ASSERT(bucket->tqbucket_taskq == tq);	/* Sanity check */

		/*
		 * Do a quick check before grabbing the lock. If the bucket
		 * does not have free entries now, chances are very small
		 * that it will after we take the lock, so we just skip it.
		 */
		if (bucket->tqbucket_nfree != 0) {
			tqe = taskq_bucket_dispatch(bucket, func, arg);
			if (tqe != NULL)
				return ((taskqid_t)tqe);	/* Fastpath */
		} else {
			TQ_STAT(bucket, tqs_misses);
		}
	}

	/*
	 * Try the "idle" bucket, which if successful, will
	 * migrate an idle thread into this bucket.
	 */
	tqe = taskq_idlebucket_dispatch(bucket, func, arg);
	if (tqe != NULL)
		return ((taskqid_t)tqe);

	/*
	 * At this point we have failed to dispatch (tqe == NULL).
	 * Try more expensive measures, if appropriate.
	 */
	ASSERT(tqe == NULL);

	/*
	 * For sleeping (TQ_SLEEP) dispatches, try to extend the bucket and
	 * retry dispatch.
	 *
	 * taskq_bucket_extend() may fail to do anything, but this is
	 * fine - we deal with it later. If the bucket was successfully
	 * extended, there is a good chance that taskq_bucket_dispatch()
	 * will get this new entry, unless another dispatch is racing with
	 * this one and steals the new entry from under us. In that (rare)
	 * case, repeat the taskq_bucket_extend() call. Keep a count of
	 * the "lost the race" events just for debug and testing.
	 */
	if ((flags & TQ_NOSLEEP) == 0) {
		while (taskq_bucket_extend(bucket) != NULL) {
			TQ_STAT(bucket, tqs_disptcreates);
			tqe = taskq_bucket_dispatch(bucket, func, arg);
			if (tqe != NULL) {
				return ((taskqid_t)tqe);
			}
			taskq_disptcreates_lost++;
		}
	}

	/*
	 * Dispatch failed and we can't find an entry to schedule a task.
	 * Use the per-bucket backlog queue unless TQ_NOQUEUE was requested.
	 * Whether or not this succeeds, we'll schedule an asynchronous
	 * task to try to extend (add a thread to) this bucket.
	 */
	if ((flags & TQ_NOQUEUE) == 0) {
		tqe = taskq_backlog_dispatch(bucket, func, arg, flags);
	}

	/*
	 * Since there are not enough free entries in the bucket, add a
	 * taskq entry to the backing queue to extend it in the background
	 * (unless we already have a taskq entry to perform that work).
	 *
	 * Note that this is the ONLY case where dynamic taskqs use the
	 * (single threaded) tq->tq_task dispatch mechanism.
	 */
	mutex_enter(&tq->tq_lock);
	if (!taskq_ent_exists(tq, taskq_bucket_overflow, bucket)) {
		taskq_ent_t *tqe1;
		if ((tqe1 = taskq_ent_alloc(tq, flags)) != NULL) {
			TQ_ENQUEUE(tq, tqe1, taskq_bucket_overflow, bucket);
		} else {
			tq->tq_nomem++;
		}
	}
	mutex_exit(&tq->tq_lock);

	return ((taskqid_t)tqe);
}

void
taskq_dispatch_ent(taskq_t *tq, task_func_t func, void *arg, uint_t flags,
    taskq_ent_t *tqe)
{
	ASSERT(func != NULL);
	ASSERT(!(tq->tq_flags & TASKQ_DYNAMIC));

	/*
	 * Mark it as a prealloc'd task. This is important
	 * to ensure that we don't free it later.
1562 */ 1563 tqe->tqent_un.tqent_flags |= TQENT_FLAG_PREALLOC; 1564 /* 1565 * Enqueue the task to the underlying queue. 1566 */ 1567 mutex_enter(&tq->tq_lock); 1568 1569 if (flags & TQ_FRONT) { 1570 TQ_ENQUEUE_FRONT(tq, tqe, func, arg); 1571 } else { 1572 TQ_ENQUEUE(tq, tqe, func, arg); 1573 } 1574 mutex_exit(&tq->tq_lock); 1575 } 1576 1577 /* 1578 * Allow our caller to ask if there are tasks pending on the queue. 1579 */ 1580 boolean_t 1581 taskq_empty(taskq_t *tq) 1582 { 1583 boolean_t rv; 1584 1585 ASSERT3P(tq, !=, curthread->t_taskq); 1586 mutex_enter(&tq->tq_lock); 1587 rv = (tq->tq_task.tqent_next == &tq->tq_task) && (tq->tq_active == 0); 1588 mutex_exit(&tq->tq_lock); 1589 1590 return (rv); 1591 } 1592 1593 /* 1594 * Wait for all pending tasks to complete. 1595 * Calling taskq_wait from a task will cause deadlock. 1596 */ 1597 void 1598 taskq_wait(taskq_t *tq) 1599 { 1600 ASSERT(tq != curthread->t_taskq); 1601 1602 mutex_enter(&tq->tq_lock); 1603 while (tq->tq_task.tqent_next != &tq->tq_task || tq->tq_active != 0) 1604 cv_wait(&tq->tq_wait_cv, &tq->tq_lock); 1605 mutex_exit(&tq->tq_lock); 1606 1607 if (tq->tq_flags & TASKQ_DYNAMIC) { 1608 taskq_bucket_t *b = tq->tq_buckets; 1609 int bid = 0; 1610 for (; (b != NULL) && (bid <= tq->tq_nbuckets); b++, bid++) { 1611 mutex_enter(&b->tqbucket_lock); 1612 while (b->tqbucket_nalloc > 0 || 1613 b->tqbucket_nbacklog > 0) 1614 cv_wait(&b->tqbucket_cv, &b->tqbucket_lock); 1615 mutex_exit(&b->tqbucket_lock); 1616 } 1617 } 1618 } 1619 1620 void 1621 taskq_wait_id(taskq_t *tq, taskqid_t id __unused) 1622 { 1623 taskq_wait(tq); 1624 } 1625 1626 /* 1627 * Suspend execution of tasks. 1628 * 1629 * Tasks in the queue part will be suspended immediately upon return from this 1630 * function. Pending tasks in the dynamic part will continue to execute, but all 1631 * new tasks will be suspended. 1632 */ 1633 void 1634 taskq_suspend(taskq_t *tq) 1635 { 1636 rw_enter(&tq->tq_threadlock, RW_WRITER); 1637 1638 if (tq->tq_flags & TASKQ_DYNAMIC) { 1639 taskq_bucket_t *b = tq->tq_buckets; 1640 int bid = 0; 1641 for (; (b != NULL) && (bid <= tq->tq_nbuckets); b++, bid++) { 1642 mutex_enter(&b->tqbucket_lock); 1643 b->tqbucket_flags |= TQBUCKET_SUSPEND; 1644 mutex_exit(&b->tqbucket_lock); 1645 } 1646 } 1647 /* 1648 * Mark task queue as being suspended. Needed for taskq_suspended(). 1649 */ 1650 mutex_enter(&tq->tq_lock); 1651 ASSERT(!(tq->tq_flags & TASKQ_SUSPENDED)); 1652 tq->tq_flags |= TASKQ_SUSPENDED; 1653 mutex_exit(&tq->tq_lock); 1654 } 1655 1656 /* 1657 * returns: 1 if tq is suspended, 0 otherwise. 1658 */ 1659 int 1660 taskq_suspended(taskq_t *tq) 1661 { 1662 return ((tq->tq_flags & TASKQ_SUSPENDED) != 0); 1663 } 1664 1665 /* 1666 * Resume taskq execution. 1667 */ 1668 void 1669 taskq_resume(taskq_t *tq) 1670 { 1671 ASSERT(RW_WRITE_HELD(&tq->tq_threadlock)); 1672 1673 if (tq->tq_flags & TASKQ_DYNAMIC) { 1674 taskq_bucket_t *b = tq->tq_buckets; 1675 int bid = 0; 1676 for (; (b != NULL) && (bid <= tq->tq_nbuckets); b++, bid++) { 1677 mutex_enter(&b->tqbucket_lock); 1678 b->tqbucket_flags &= ~TQBUCKET_SUSPEND; 1679 mutex_exit(&b->tqbucket_lock); 1680 } 1681 } 1682 mutex_enter(&tq->tq_lock); 1683 ASSERT(tq->tq_flags & TASKQ_SUSPENDED); 1684 tq->tq_flags &= ~TASKQ_SUSPENDED; 1685 mutex_exit(&tq->tq_lock); 1686 1687 rw_exit(&tq->tq_threadlock); 1688 } 1689 1690 int 1691 taskq_member(taskq_t *tq, kthread_t *thread) 1692 { 1693 return (thread->t_taskq == tq); 1694 } 1695 1696 /* 1697 * Creates a thread in the taskq. 
We only allow one outstanding create at 1698 * a time. We drop and reacquire the tq_lock in order to avoid blocking other 1699 * taskq activity while thread_create() or lwp_kernel_create() run. 1700 * 1701 * The first time we're called, we do some additional setup, and do not 1702 * return until there are enough threads to start servicing requests. 1703 */ 1704 static void 1705 taskq_thread_create(taskq_t *tq) 1706 { 1707 kthread_t *t; 1708 const boolean_t first = (tq->tq_nthreads == 0); 1709 1710 ASSERT(MUTEX_HELD(&tq->tq_lock)); 1711 ASSERT(tq->tq_flags & TASKQ_CHANGING); 1712 ASSERT(tq->tq_nthreads < tq->tq_nthreads_target); 1713 ASSERT(!(tq->tq_flags & TASKQ_THREAD_CREATED)); 1714 1715 1716 tq->tq_flags |= TASKQ_THREAD_CREATED; 1717 tq->tq_active++; 1718 mutex_exit(&tq->tq_lock); 1719 1720 /* 1721 * With TASKQ_DUTY_CYCLE the new thread must have an LWP 1722 * as explained in ../disp/sysdc.c (for the msacct data). 1723 * Normally simple kthreads are preferred, unless the 1724 * caller has asked for LWPs for other reasons. 1725 */ 1726 if ((tq->tq_flags & (TASKQ_DUTY_CYCLE | TASKQ_THREADS_LWP)) != 0) { 1727 /* Enforced in taskq_create_common */ 1728 ASSERT3P(tq->tq_proc, !=, &p0); 1729 t = lwp_kernel_create(tq->tq_proc, taskq_thread, tq, TS_RUN, 1730 tq->tq_pri); 1731 } else { 1732 t = thread_create(NULL, 0, taskq_thread, tq, 0, tq->tq_proc, 1733 TS_RUN, tq->tq_pri); 1734 } 1735 1736 if (!first) { 1737 mutex_enter(&tq->tq_lock); 1738 return; 1739 } 1740 1741 /* 1742 * We know the thread cannot go away, since tq cannot be 1743 * destroyed until creation has completed. We can therefore 1744 * safely dereference t. 1745 */ 1746 if (tq->tq_flags & TASKQ_THREADS_CPU_PCT) { 1747 taskq_cpupct_install(tq, t->t_cpupart); 1748 } 1749 mutex_enter(&tq->tq_lock); 1750 1751 /* Wait until we can service requests. */ 1752 while (tq->tq_nthreads != tq->tq_nthreads_target && 1753 tq->tq_nthreads < TASKQ_CREATE_ACTIVE_THREADS) { 1754 cv_wait(&tq->tq_wait_cv, &tq->tq_lock); 1755 } 1756 } 1757 1758 /* 1759 * Common "sleep taskq thread" function, which handles CPR stuff, as well 1760 * as giving a nice common point for debuggers to find inactive threads. 1761 */ 1762 static clock_t 1763 taskq_thread_wait(taskq_t *tq, kmutex_t *mx, kcondvar_t *cv, 1764 callb_cpr_t *cprinfo, clock_t timeout) 1765 { 1766 clock_t ret = 0; 1767 1768 ASSERT(MUTEX_HELD(mx)); 1769 if (!(tq->tq_flags & TASKQ_CPR_SAFE)) { 1770 CALLB_CPR_SAFE_BEGIN(cprinfo); 1771 } 1772 if (timeout < 0) 1773 cv_wait(cv, mx); 1774 else 1775 ret = cv_reltimedwait(cv, mx, timeout, TR_CLOCK_TICK); 1776 1777 if (!(tq->tq_flags & TASKQ_CPR_SAFE)) { 1778 CALLB_CPR_SAFE_END(cprinfo, mx); 1779 } 1780 1781 return (ret); 1782 } 1783 1784 /* 1785 * Worker thread for processing task queue. 1786 */ 1787 static void 1788 taskq_thread(void *arg) 1789 { 1790 int thread_id; 1791 1792 taskq_t *tq = arg; 1793 taskq_ent_t *tqe; 1794 callb_cpr_t cprinfo; 1795 hrtime_t start, end; 1796 boolean_t freeit; 1797 1798 curthread->t_taskq = tq; /* mark ourselves for taskq_member() */ 1799 1800 if (curproc != &p0 && (tq->tq_flags & TASKQ_DUTY_CYCLE)) { 1801 sysdc_thread_enter(curthread, tq->tq_DC, 1802 (tq->tq_flags & TASKQ_DC_BATCH) ? 
SYSDC_THREAD_BATCH : 0); 1803 } 1804 1805 if (tq->tq_flags & TASKQ_CPR_SAFE) { 1806 CALLB_CPR_INIT_SAFE(curthread, tq->tq_name); 1807 } else { 1808 CALLB_CPR_INIT(&cprinfo, &tq->tq_lock, callb_generic_cpr, 1809 tq->tq_name); 1810 } 1811 mutex_enter(&tq->tq_lock); 1812 thread_id = ++tq->tq_nthreads; 1813 ASSERT(tq->tq_flags & TASKQ_THREAD_CREATED); 1814 ASSERT(tq->tq_flags & TASKQ_CHANGING); 1815 tq->tq_flags &= ~TASKQ_THREAD_CREATED; 1816 1817 VERIFY3S(thread_id, <=, tq->tq_nthreads_max); 1818 1819 if (tq->tq_nthreads_max == 1) 1820 tq->tq_thread = curthread; 1821 else 1822 tq->tq_threadlist[thread_id - 1] = curthread; 1823 1824 /* Allow taskq_create_common()'s taskq_thread_create() to return. */ 1825 if (tq->tq_nthreads == TASKQ_CREATE_ACTIVE_THREADS) 1826 cv_broadcast(&tq->tq_wait_cv); 1827 1828 for (;;) { 1829 if (tq->tq_flags & TASKQ_CHANGING) { 1830 /* See if we're no longer needed */ 1831 if (thread_id > tq->tq_nthreads_target) { 1832 /* 1833 * To preserve the one-to-one mapping between 1834 * thread_id and thread, we must exit from 1835 * highest thread ID to least. 1836 * 1837 * However, if everyone is exiting, the order 1838 * doesn't matter, so just exit immediately. 1839 * (this is safe, since you must wait for 1840 * nthreads to reach 0 after setting 1841 * tq_nthreads_target to 0) 1842 */ 1843 if (thread_id == tq->tq_nthreads || 1844 tq->tq_nthreads_target == 0) 1845 break; 1846 1847 /* Wait for higher thread_ids to exit */ 1848 (void) taskq_thread_wait(tq, &tq->tq_lock, 1849 &tq->tq_exit_cv, &cprinfo, -1); 1850 continue; 1851 } 1852 1853 /* 1854 * If no thread is starting taskq_thread(), we can 1855 * do some bookkeeping. 1856 */ 1857 if (!(tq->tq_flags & TASKQ_THREAD_CREATED)) { 1858 /* Check if we've reached our target */ 1859 if (tq->tq_nthreads == tq->tq_nthreads_target) { 1860 tq->tq_flags &= ~TASKQ_CHANGING; 1861 cv_broadcast(&tq->tq_wait_cv); 1862 } 1863 /* Check if we need to create a thread */ 1864 if (tq->tq_nthreads < tq->tq_nthreads_target) { 1865 taskq_thread_create(tq); 1866 continue; /* tq_lock was dropped */ 1867 } 1868 } 1869 } 1870 if ((tqe = tq->tq_task.tqent_next) == &tq->tq_task) { 1871 if (--tq->tq_active == 0) 1872 cv_broadcast(&tq->tq_wait_cv); 1873 (void) taskq_thread_wait(tq, &tq->tq_lock, 1874 &tq->tq_dispatch_cv, &cprinfo, -1); 1875 tq->tq_active++; 1876 continue; 1877 } 1878 1879 TQ_REMOVE(tqe); 1880 mutex_exit(&tq->tq_lock); 1881 1882 /* 1883 * For prealloc'd tasks, we don't free anything. We 1884 * have to check this now, because once we call the 1885 * function for a prealloc'd task, we can't touch the 1886 * tqent any longer (calling the function returns the 1887 * ownership of the tqent back to the caller of 1888 * taskq_dispatch.)
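 *
 * Illustration (hypothetical caller code, a sketch only): a dispatched
 * function is free to release the object that embeds its prealloc'd
 * entry, e.g.
 *
 *	static void
 *	my_job_func(void *arg)
 *	{
 *		my_job_t *jp = arg;	<- embeds the taskq_ent_t
 *		... work ...
 *		kmem_free(jp, sizeof (*jp));
 *	}
 *
 * so the tqe may already be freed memory by the time tqent_func
 * returns, which is why 'freeit' is decided before the call below.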
1889 */ 1890 if ((!(tq->tq_flags & TASKQ_DYNAMIC)) && 1891 (tqe->tqent_un.tqent_flags & TQENT_FLAG_PREALLOC)) { 1892 /* clear pointers to assist assertion checks */ 1893 tqe->tqent_next = tqe->tqent_prev = NULL; 1894 freeit = B_FALSE; 1895 } else { 1896 freeit = B_TRUE; 1897 } 1898 1899 rw_enter(&tq->tq_threadlock, RW_READER); 1900 start = gethrtime(); 1901 DTRACE_PROBE2(taskq__exec__start, taskq_t *, tq, 1902 taskq_ent_t *, tqe); 1903 tqe->tqent_func(tqe->tqent_arg); 1904 DTRACE_PROBE2(taskq__exec__end, taskq_t *, tq, 1905 taskq_ent_t *, tqe); 1906 end = gethrtime(); 1907 rw_exit(&tq->tq_threadlock); 1908 1909 mutex_enter(&tq->tq_lock); 1910 tq->tq_totaltime += end - start; 1911 tq->tq_executed++; 1912 1913 if (freeit) 1914 taskq_ent_free(tq, tqe); 1915 } 1916 1917 if (tq->tq_nthreads_max == 1) 1918 tq->tq_thread = NULL; 1919 else 1920 tq->tq_threadlist[thread_id - 1] = NULL; 1921 1922 /* We're exiting, and therefore no longer active */ 1923 ASSERT(tq->tq_active > 0); 1924 tq->tq_active--; 1925 1926 ASSERT(tq->tq_nthreads > 0); 1927 tq->tq_nthreads--; 1928 1929 /* Wake up anyone waiting for us to exit */ 1930 cv_broadcast(&tq->tq_exit_cv); 1931 if (tq->tq_nthreads == tq->tq_nthreads_target) { 1932 if (!(tq->tq_flags & TASKQ_THREAD_CREATED)) 1933 tq->tq_flags &= ~TASKQ_CHANGING; 1934 1935 cv_broadcast(&tq->tq_wait_cv); 1936 } 1937 1938 ASSERT(!(tq->tq_flags & TASKQ_CPR_SAFE)); 1939 CALLB_CPR_EXIT(&cprinfo); /* drops tq->tq_lock */ 1940 if (curthread->t_lwp != NULL) { 1941 mutex_enter(&curproc->p_lock); 1942 lwp_exit(); 1943 } else { 1944 thread_exit(); 1945 } 1946 } 1947 1948 /* 1949 * Sentinel function to help with thread migration. 1950 * We never actually run this function. 1951 * 1952 * When a thread becomes idle in one bucket and goes in search of another 1953 * bucket to service, it's not on any free list. For consistency with the 1954 * various assertions, we want the tqent_func to be non-NULL, so in such 1955 * cases it points to this function. 1956 */ 1957 static void 1958 taskq_d_migrate(void *arg __unused) 1959 { 1960 ASSERT(0); 1961 } 1962 1963 /* 1964 * Sentinel function to help with thread redistribution (forced migration). 1965 * We never actually run this function. 1966 * 1967 * When taskq_bucket_redist needs to direct a thread from one bucket 1968 * to another, this function is dispatched into the bucket that will 1969 * donate the thread, with the arg pointing to the bucket that will 1970 * receive the thread. See checks for this sentinel in the functions 1971 * taskq_d_svc_bucket, taskq_d_thread. 1972 */ 1973 static void 1974 taskq_d_redirect(void *arg __unused) 1975 { 1976 ASSERT(0); 1977 } 1978 1979 /* 1980 * Helper for taskq_d_thread() -- service a bucket 1981 */ 1982 static void 1983 taskq_d_svc_bucket(taskq_ent_t *tqe, 1984 taskq_bucket_t *bucket, taskq_t *tq) 1985 { 1986 kmutex_t *lock = &bucket->tqbucket_lock; 1987 clock_t w = 0; 1988 clock_t tmo = MSEC_TO_TICK(taskq_thread_bucket_wait); 1989 1990 mutex_enter(lock); 1991 1992 /* 1993 * After this thread is started by taskq_bucket_extend(), 1994 * we may be on the free list (func == NULL) or we may have 1995 * been given a task to run. If we have a task, start at 1996 * the top of the for loop, otherwise start in "the middle", 1997 * where we would be after finishing some task. 1998 */ 1999 if (tqe->tqent_func == NULL) { 2000 /* We started on the bucket free list.
*/ 2001 ASSERT(tqe->tqent_prev != NULL); 2002 ASSERT(bucket->tqbucket_nfree > 0); 2003 2004 /* 2005 * If we have a backlog, take off free list and 2006 * start working on the backlog. 2007 */ 2008 if (bucket->tqbucket_nbacklog > 0) { 2009 TQ_REMOVE(tqe); 2010 bucket->tqbucket_nfree--; 2011 tqe->tqent_func = taskq_d_migrate; 2012 bucket->tqbucket_nalloc++; 2013 goto entry_backlog; 2014 } 2015 /* 2016 * We're already on the free list, so start where 2017 * we'd wait just after going onto the free list. 2018 */ 2019 goto entry_freelist; 2020 } 2021 2022 /* 2023 * After a forced migration, clear the REDIRECT flag, 2024 * then continue as if voluntary migration. 2025 */ 2026 if (tqe->tqent_func == taskq_d_redirect) { 2027 bucket->tqbucket_flags &= ~TQBUCKET_REDIRECT; 2028 tqe->tqent_func = taskq_d_migrate; 2029 } 2030 2031 /* 2032 * Migration to a new bucket (forced or voluntary). 2033 * We're not on any free list. Enter middle of loop, 2034 * but first adjust nalloc as if we were dispatched. 2035 * Adjustment of nfree-- happened during return from 2036 * this function after servicing another bucket. 2037 */ 2038 if (tqe->tqent_func == taskq_d_migrate) { 2039 bucket->tqbucket_nalloc++; 2040 goto entry_backlog; 2041 } 2042 2043 for (;;) { 2044 /* 2045 * If a task is scheduled (func != NULL), execute it. 2046 */ 2047 if (tqe->tqent_func != NULL) { 2048 hrtime_t start; 2049 hrtime_t end; 2050 2051 /* Should not be on free list. */ 2052 ASSERT(tqe->tqent_prev == NULL); 2053 ASSERT(bucket->tqbucket_nalloc > 0); 2054 2055 /* 2056 * Check for redirect (forced migration) 2057 * Skip going on free list. Just return. 2058 */ 2059 if (tqe->tqent_func == taskq_d_redirect) { 2060 bucket->tqbucket_nalloc--; 2061 goto unlock_out; 2062 } 2063 2064 /* 2065 * Run the job. 2066 */ 2067 mutex_exit(lock); 2068 start = gethrtime(); 2069 DTRACE_PROBE3(taskq__d__exec__start, taskq_t *, tq, 2070 taskq_bucket_t *, bucket, taskq_ent_t *, tqe); 2071 tqe->tqent_func(tqe->tqent_arg); 2072 DTRACE_PROBE3(taskq__d__exec__end, taskq_t *, tq, 2073 taskq_bucket_t *, bucket, taskq_ent_t *, tqe); 2074 end = gethrtime(); 2075 mutex_enter(lock); 2076 bucket->tqbucket_totaltime += end - start; 2077 } 2078 2079 entry_backlog: 2080 /* 2081 * If there's a backlog, consume the head of the 2082 * backlog like taskq_bucket_dispatch, then let the 2083 * normal execution code path run it. 2084 */ 2085 if (bucket->tqbucket_nbacklog > 0) { 2086 taskq_ent_t *bltqe; 2087 2088 /* 2089 * Should not be on free list. 2090 * May enter here from the top. 2091 */ 2092 ASSERT(tqe->tqent_prev == NULL); 2093 ASSERT(bucket->tqbucket_nalloc > 0); 2094 2095 ASSERT(!IS_EMPTY(bucket->tqbucket_backlog)); 2096 bltqe = bucket->tqbucket_backlog.tqent_next; 2097 TQ_REMOVE(bltqe); 2098 bucket->tqbucket_nbacklog--; 2099 2100 DTRACE_PROBE2(taskq__x__backlog, 2101 taskq_bucket_t *, bucket, 2102 taskq_ent_t *, bltqe); 2103 2104 /* 2105 * Copy the backlog entry to the tqe 2106 * and free the backlog entry. 2107 */ 2108 tqe->tqent_func = bltqe->tqent_func; 2109 tqe->tqent_arg = bltqe->tqent_arg; 2110 kmem_cache_free(taskq_ent_cache, bltqe); 2111 2112 /* Run as usual. */ 2113 continue; 2114 } 2115 2116 DTRACE_PROBE2(taskq__d__wait1, 2117 taskq_t *, tq, taskq_ent_t *, tqe); 2118 2119 /* 2120 * We've run out of work in this bucket. 2121 * Put our TQE on the free list and wait. 
2122 */ 2123 ASSERT(tqe->tqent_prev == NULL); 2124 ASSERT(bucket->tqbucket_nalloc > 0); 2125 bucket->tqbucket_nalloc--; 2126 tqe->tqent_func = NULL; 2127 TQ_APPEND(bucket->tqbucket_freelist, tqe); 2128 bucket->tqbucket_nfree++; 2129 2130 /* 2131 * taskq_wait() waits for nalloc to drop to zero on 2132 * tqbucket_cv. 2133 */ 2134 cv_signal(&bucket->tqbucket_cv); 2135 2136 entry_freelist: 2137 /* 2138 * Note: may enter here from the top. 2139 * We're on the free list. Wait for work. 2140 */ 2141 ASSERT(tqe->tqent_func == NULL); 2142 ASSERT(tqe->tqent_prev != NULL); 2143 ASSERT(MUTEX_HELD(lock)); 2144 2145 /* 2146 * If we're closing, finish. 2147 */ 2148 if ((bucket->tqbucket_flags & TQBUCKET_CLOSE) != 0) 2149 break; 2150 2151 /* 2152 * Go to sleep waiting for work to arrive. 2153 * Sleep only briefly here on the bucket. 2154 * If no work lands in the bucket, return and 2155 * the caller will put this TQE on the common 2156 * list of idle threads and do the long wait. 2157 */ 2158 w = cv_reltimedwait(&tqe->tqent_cv, lock, tmo, TR_CLOCK_TICK); 2159 2160 /* 2161 * At this point we may be in two different states: 2162 * 2163 * (1) tqent_func is set which means that a new task is 2164 * dispatched and we need to execute it. 2165 * The dispatch took us off the free list. 2166 * 2167 * (2) Thread is sleeping for too long, or closing. 2168 * We're done servicing this bucket. 2169 * 2170 * Some consistency checks: 2171 * func == NULL implies on free list 2172 * func != NULL implies not on free list 2173 */ 2174 if (tqe->tqent_func == NULL) { 2175 /* Should be on the free list. */ 2176 ASSERT(tqe->tqent_prev != NULL); 2177 ASSERT(bucket->tqbucket_nfree > 0); 2178 if (w < 0) { 2179 /* slept too long */ 2180 break; 2181 } 2182 2183 /* 2184 * We may have been signaled if we finished a job 2185 * and got on the free list just before a call to 2186 * taskq_backlog_dispatch took the lock. In that 2187 * case resume working on the backlog. 2188 */ 2189 if (bucket->tqbucket_nbacklog > 0) { 2190 TQ_REMOVE(tqe); 2191 bucket->tqbucket_nfree--; 2192 tqe->tqent_func = taskq_d_migrate; 2193 bucket->tqbucket_nalloc++; 2194 goto entry_backlog; 2195 } 2196 2197 /* 2198 * Woken for some other reason. 2199 * Still on the free list, lock held. 2200 * Just wait again. 2201 */ 2202 goto entry_freelist; 2203 } 2204 2205 /* 2206 * taskq_bucket_dispatch has set tqent_func 2207 * and taken us off the free list. 2208 */ 2209 ASSERT(tqe->tqent_func != NULL); 2210 ASSERT(tqe->tqent_prev == NULL); 2211 /* Back to the top (continue) */ 2212 } 2213 2214 /* 2215 * Remove the entry from the free list. 2216 * Will migrate to another bucket. 2217 * See taskq_d_migrate above. 2218 * 2219 * Note: nalloc++ happens after we return to taskq_d_thread 2220 * and enter the mutex for the next bucket we serve. 
2221 */ 2222 TQ_REMOVE(tqe); 2223 tqe->tqent_func = taskq_d_migrate; 2224 ASSERT(bucket->tqbucket_nfree > 0); 2225 bucket->tqbucket_nfree--; 2226 cv_signal(&bucket->tqbucket_cv); 2227 2228 unlock_out: 2229 mutex_exit(lock); 2230 } 2231 2232 /* 2233 * Worker thread for dynamic taskq's 2234 */ 2235 static void 2236 taskq_d_thread(taskq_ent_t *tqe) 2237 { 2238 callb_cpr_t cprinfo; 2239 taskq_bucket_t *b; 2240 taskq_bucket_t *bucket = tqe->tqent_un.tqent_bucket; 2241 taskq_t *tq = bucket->tqbucket_taskq; 2242 taskq_bucket_t *idle_bucket = &tq->tq_buckets[tq->tq_nbuckets]; 2243 kmutex_t *idle_lock = &idle_bucket->tqbucket_lock; 2244 clock_t tmo, w = 0; 2245 2246 CALLB_CPR_INIT(&cprinfo, idle_lock, callb_generic_cpr, tq->tq_name); 2247 2248 /* 2249 * Note that taskq_idlebucket_dispatch can change 2250 * tqent_bucket when we're on the free list. Hold 2251 * idle_lock to synchronize with those changes. 2252 */ 2253 mutex_enter(idle_lock); 2254 bucket = tqe->tqent_un.tqent_bucket; 2255 2256 /* 2257 * If we were started for TASKQ_PREPOPULATE, 2258 * we'll be on the idle bucket free list. 2259 * In that case start in the middle. 2260 */ 2261 if (bucket == idle_bucket) { 2262 ASSERT(tqe->tqent_func == NULL); 2263 ASSERT(tqe->tqent_prev != NULL); 2264 goto entry_freelist; 2265 } 2266 2267 /* Not on the idle_bucket free list. */ 2268 mutex_exit(idle_lock); 2269 2270 for (;;) { 2271 continue_2: 2272 2273 /* 2274 * Service the bucket pointed to by the TQE. 2275 * We are NOT on the idle_bucket free list. 2276 * We may or may not be on the bucket free list. 2277 */ 2278 ASSERT(MUTEX_NOT_HELD(idle_lock)); 2279 bucket = tqe->tqent_un.tqent_bucket; 2280 VERIFY3P(bucket, >=, tq->tq_buckets); 2281 VERIFY3P(bucket, <, idle_bucket); 2282 2283 /* Enters/exits bucket->tqbucket_lock */ 2284 taskq_d_svc_bucket(tqe, bucket, tq); 2285 2286 /* 2287 * Finished servicing a bucket where we became idle. 2288 * Not on any free list. Migrate to another bucket. 2289 * With "redirect" (forced migration) we move to the 2290 * bucket indicated by the arg. 2291 */ 2292 ASSERT(tqe->tqent_prev == NULL); 2293 if (tqe->tqent_func == taskq_d_redirect) { 2294 /* 2295 * Migrate to this bucket. 2296 * See: taskq_d_redirect() 2297 */ 2298 tqe->tqent_un.tqent_bucket = tqe->tqent_arg; 2299 DTRACE_PROBE2(taskq__d__redirect, 2300 taskq_t *, tq, taskq_ent_t *, tqe); 2301 continue; 2302 } 2303 2304 /* 2305 * Look for buckets with backlog and if found, migrate 2306 * to that bucket. Search starting at the next bucket 2307 * after the current one so the search starting points 2308 * will be distributed. 2309 * 2310 * Unlocked access is OK here. A bucket may be missed 2311 * due to a stale (cached) nbacklog value, but another 2312 * idle thread will see the updated value soon. If we 2313 * visit a bucket needlessly, the visit will be short. 2314 * There's a membar_producer after tqbucket_nbacklog is 2315 * updated, which should ensure visibility of updates 2316 * soon enough so buckets needing attention will get a 2317 * visit by threads passing through here. 2318 */ 2319 check_backlog: 2320 ASSERT(tqe->tqent_func == taskq_d_migrate); 2321 VERIFY3P(bucket, >=, tq->tq_buckets); 2322 VERIFY3P(bucket, <, idle_bucket); 2323 membar_consumer(); 2324 b = bucket; 2325 do { 2326 /* Next bucket */ 2327 if (++b == idle_bucket) 2328 b = tq->tq_buckets; 2329 2330 if (b->tqbucket_nbacklog > 0) { 2331 /* 2332 * Migrate to this bucket. 
2333 * See: taskq_d_migrate() 2334 */ 2335 tqe->tqent_un.tqent_bucket = b; 2336 DTRACE_PROBE2(taskq__d__migration, 2337 taskq_t *, tq, taskq_ent_t *, tqe); 2338 goto continue_2; 2339 } 2340 } while (b != bucket); 2341 2342 DTRACE_PROBE2(taskq__d__wait2, 2343 taskq_t *, tq, taskq_ent_t *, tqe); 2344 2345 /* 2346 * Migrate to the idle bucket, put this TQE on 2347 * the free list for that bucket, then wait. 2348 */ 2349 ASSERT(tqe->tqent_prev == NULL); 2350 tqe->tqent_un.tqent_bucket = idle_bucket; 2351 mutex_enter(idle_lock); 2352 tqe->tqent_func = NULL; 2353 TQ_APPEND(idle_bucket->tqbucket_freelist, tqe); 2354 idle_bucket->tqbucket_nfree++; 2355 2356 entry_freelist: 2357 /* 2358 * Note: may enter here from the top. 2359 * We're on the free list. Wait for work. 2360 */ 2361 ASSERT(tqe->tqent_func == NULL); 2362 ASSERT(tqe->tqent_prev != NULL); 2363 ASSERT(idle_bucket->tqbucket_nfree > 0); 2364 ASSERT(MUTEX_HELD(idle_lock)); 2365 ASSERT3P(tqe->tqent_un.tqent_bucket, ==, idle_bucket); 2366 2367 /* 2368 * If we're closing, finish. 2369 */ 2370 if ((idle_bucket->tqbucket_flags & TQBUCKET_CLOSE) != 0) 2371 break; 2372 2373 /* 2374 * Go to sleep waiting for work to arrive. 2375 * If a thread is sleeping too long, it dies. 2376 * If this is the last thread, no timeout. 2377 */ 2378 if (idle_bucket->tqbucket_nfree == 1) { 2379 tmo = -1; 2380 } else { 2381 tmo = SEC_TO_TICK(taskq_thread_timeout); 2382 } 2383 w = taskq_thread_wait(tq, idle_lock, 2384 &tqe->tqent_cv, &cprinfo, tmo); 2385 2386 /* 2387 * At this point we may be in two different states: 2388 * 2389 * (1) tqent_func is set which means that a new task is 2390 * dispatched and we need to execute it. 2391 * The dispatch took us off the free list. 2392 * Migrate to the new bucket. 2393 * 2394 * (2) Thread is sleeping for too long -- return 2395 * 2396 * Some consistency checks: 2397 * func == NULL implies on free list 2398 * func != NULL implies not on free list 2399 */ 2400 if (tqe->tqent_func == NULL) { 2401 /* Should be on the free list. */ 2402 ASSERT(tqe->tqent_prev != NULL); 2403 if (w < 0 && idle_bucket->tqbucket_nfree > 1) { 2404 /* 2405 * taskq_thread_wait timed out. 2406 * If not last thread, exit. 2407 */ 2408 break; 2409 } 2410 2411 /* 2412 * Woken for some other reason, one of: 2413 * Last thread - stick around longer 2414 * Destroying, out via CLOSE above 2415 * taskq_bucket_redist signaled 2416 * 2417 * Still on the free list, lock held. Continue 2418 * back at the re-check for backlog work, 2419 * which means coming off the free list. 2420 * 2421 * Note that tqent_bucket is the idle bucket 2422 * at this point, which is not valid above, 2423 * so pretend we just finished servicing the 2424 * first bucket. This happens rarely. 2425 */ 2426 bucket = tq->tq_buckets; 2427 TQ_REMOVE(tqe); 2428 idle_bucket->tqbucket_nfree--; 2429 tqe->tqent_func = taskq_d_migrate; 2430 tqe->tqent_un.tqent_bucket = bucket; 2431 mutex_exit(idle_lock); 2432 goto check_backlog; 2433 } 2434 2435 /* 2436 * taskq_idlebucket_dispatch will have moved this 2437 * taskq_ent_t from the idle bucket (idleb) to a 2438 * new bucket (newb). 
In detail, it has: 2439 * Removed this TQE from idleb->tqbucket_freelist 2440 * decremented idleb->tqbucket_nfree 2441 * Set tqent_bucket = newb 2442 * Set tqent_func, tqent_arg 2443 * incremented newb->tqbucket_nalloc 2444 */ 2445 ASSERT(tqe->tqent_func != NULL); 2446 ASSERT(tqe->tqent_prev == NULL); 2447 ASSERT(tqe->tqent_un.tqent_bucket != idle_bucket); 2448 DTRACE_PROBE2(taskq__d__idledisp, 2449 taskq_t *, tq, taskq_ent_t *, tqe); 2450 mutex_exit(idle_lock); 2451 /* Back to the top (continue) */ 2452 } 2453 ASSERT(MUTEX_HELD(idle_lock)); 2454 ASSERT(tqe->tqent_prev != NULL); 2455 2456 /* 2457 * Thread creation/destruction happens rarely, 2458 * so grabbing the lock is not a big performance issue. 2459 * The bucket lock is dropped by CALLB_CPR_EXIT(). 2460 */ 2461 2462 /* Remove the entry from the free list. */ 2463 TQ_REMOVE(tqe); 2464 ASSERT(idle_bucket->tqbucket_nfree > 0); 2465 idle_bucket->tqbucket_nfree--; 2466 2467 /* Note: Creates and deaths are on the idle bucket. */ 2468 TQ_STAT(idle_bucket, tqs_tdeaths); 2469 cv_signal(&idle_bucket->tqbucket_cv); 2470 2471 /* 2472 * When destroying, wake the next thread, if any. 2473 * See thundering herd comment in taskq_destroy. 2474 */ 2475 if ((idle_bucket->tqbucket_flags & TQBUCKET_CLOSE) != 0 && 2476 (idle_bucket->tqbucket_nfree > 0)) { 2477 taskq_ent_t *ntqe; 2478 ASSERT(!IS_EMPTY(idle_bucket->tqbucket_freelist)); 2479 ntqe = idle_bucket->tqbucket_freelist.tqent_next; 2480 cv_signal(&ntqe->tqent_cv); 2481 } 2482 2483 tqe->tqent_thread = NULL; 2484 mutex_enter(&tq->tq_lock); 2485 tq->tq_dnthreads--; 2486 cv_broadcast(&tq->tq_exit_cv); 2487 mutex_exit(&tq->tq_lock); 2488 2489 CALLB_CPR_EXIT(&cprinfo); /* mutex_exit(idle_lock) */ 2490 2491 kmem_cache_free(taskq_ent_cache, tqe); 2492 2493 if (curthread->t_lwp != NULL) { 2494 mutex_enter(&curproc->p_lock); 2495 lwp_exit(); /* noreturn. drops p_lock */ 2496 } else { 2497 thread_exit(); 2498 } 2499 } 2500 2501 2502 /* 2503 * Taskq creation. May sleep for memory. 2504 * Always use automatically generated instances to avoid kstat name space 2505 * collisions. 2506 */ 2507 2508 taskq_t * 2509 taskq_create(const char *name, int nthreads, pri_t pri, int minalloc, 2510 int maxalloc, uint_t flags) 2511 { 2512 ASSERT((flags & ~TASKQ_INTERFACE_FLAGS) == 0); 2513 2514 return (taskq_create_common(name, 0, nthreads, pri, minalloc, 2515 maxalloc, &p0, 0, flags | TASKQ_NOINSTANCE)); 2516 } 2517 2518 /* 2519 * Create an instance of a task queue. It is legal to create task queues with the 2520 * same name and different instances. 2521 * 2522 * taskq_create_instance is used by ddi_taskq_create() where it gets the 2523 * instance from ddi_get_instance(). In some cases the instance is not 2524 * initialized and is set to -1. This case is handled as if no instance was 2525 * passed at all.
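 *
 * For illustration (a sketch only; "mydrv" and the tuning values are
 * hypothetical), a driver attach routine might do:
 *
 *	tq = taskq_create_instance("mydrv_taskq", ddi_get_instance(dip),
 *	    1, minclsyspri, 8, 16, TASKQ_PREPOPULATE);
 *
 * Two instances of "mydrv" then get separately numbered kstats, while
 * passing -1 (instance unknown) falls back to the TASKQ_NOINSTANCE
 * behavior described above. Most drivers reach this indirectly via
 * ddi_taskq_create(9F) rather than calling it themselves.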
2526 */ 2527 taskq_t * 2528 taskq_create_instance(const char *name, int instance, int nthreads, pri_t pri, 2529 int minalloc, int maxalloc, uint_t flags) 2530 { 2531 ASSERT((flags & ~TASKQ_INTERFACE_FLAGS) == 0); 2532 ASSERT((instance >= 0) || (instance == -1)); 2533 2534 if (instance < 0) { 2535 flags |= TASKQ_NOINSTANCE; 2536 } 2537 2538 return (taskq_create_common(name, instance, nthreads, 2539 pri, minalloc, maxalloc, &p0, 0, flags)); 2540 } 2541 2542 taskq_t * 2543 taskq_create_proc(const char *name, int nthreads, pri_t pri, int minalloc, 2544 int maxalloc, proc_t *proc, uint_t flags) 2545 { 2546 ASSERT((flags & ~TASKQ_INTERFACE_FLAGS) == 0); 2547 ASSERT(proc->p_flag & SSYS); 2548 2549 return (taskq_create_common(name, 0, nthreads, pri, minalloc, 2550 maxalloc, proc, 0, flags | TASKQ_NOINSTANCE)); 2551 } 2552 2553 taskq_t * 2554 taskq_create_sysdc(const char *name, int nthreads, int minalloc, 2555 int maxalloc, proc_t *proc, uint_t dc, uint_t flags) 2556 { 2557 ASSERT((flags & ~TASKQ_INTERFACE_FLAGS) == 0); 2558 ASSERT(proc->p_flag & SSYS); 2559 2560 return (taskq_create_common(name, 0, nthreads, minclsyspri, minalloc, 2561 maxalloc, proc, dc, flags | TASKQ_NOINSTANCE | TASKQ_DUTY_CYCLE)); 2562 } 2563 2564 static taskq_t * 2565 taskq_create_common(const char *name, int instance, int nthreads, pri_t pri, 2566 int minalloc, int maxalloc, proc_t *proc, uint_t dc, uint_t flags) 2567 { 2568 taskq_t *tq = kmem_cache_alloc(taskq_cache, KM_SLEEP); 2569 uint_t ncpus = ((boot_max_ncpus == -1) ? max_ncpus : boot_max_ncpus); 2570 uint_t bsize; /* # of buckets - always power of 2 */ 2571 int max_nthreads; 2572 2573 /* 2574 * TASKQ_DYNAMIC, TASKQ_CPR_SAFE and TASKQ_THREADS_CPU_PCT are all 2575 * mutually incompatible. 2576 */ 2577 IMPLY((flags & TASKQ_DYNAMIC), !(flags & TASKQ_CPR_SAFE)); 2578 IMPLY((flags & TASKQ_DYNAMIC), !(flags & TASKQ_THREADS_CPU_PCT)); 2579 IMPLY((flags & TASKQ_CPR_SAFE), !(flags & TASKQ_THREADS_CPU_PCT)); 2580 2581 /* Cannot have DYNAMIC with DUTY_CYCLE */ 2582 IMPLY((flags & TASKQ_DYNAMIC), !(flags & TASKQ_DUTY_CYCLE)); 2583 2584 /* Cannot have DUTY_CYCLE with a p0 kernel process */ 2585 IMPLY((flags & TASKQ_DUTY_CYCLE), proc != &p0); 2586 2587 /* Cannot have THREADS_LWP with a p0 kernel process */ 2588 IMPLY((flags & TASKQ_THREADS_LWP), proc != &p0); 2589 2590 /* Cannot have DC_BATCH without DUTY_CYCLE */ 2591 ASSERT((flags & (TASKQ_DUTY_CYCLE|TASKQ_DC_BATCH)) != TASKQ_DC_BATCH); 2592 2593 ASSERT(proc != NULL); 2594 2595 bsize = 1 << (highbit(ncpus) - 1); 2596 ASSERT(bsize >= 1); 2597 bsize = MAX(bsize, taskq_minbuckets); 2598 bsize = MIN(bsize, taskq_maxbuckets); 2599 2600 if (flags & TASKQ_DYNAMIC) { 2601 ASSERT3S(nthreads, >=, 1); 2602 /* Need at least (bsize + 1) threads */ 2603 tq->tq_maxsize = MAX(nthreads, bsize + 1); 2604 /* See taskq_bucket_redist(). */ 2605 tq->tq_atpb = tq->tq_maxsize / bsize; 2606 ASSERT(tq->tq_atpb != 0); 2607 2608 /* For dynamic task queues use just one backing thread */ 2609 nthreads = max_nthreads = 1; 2610 2611 } else if (flags & TASKQ_THREADS_CPU_PCT) { 2612 uint_t pct; 2613 ASSERT3S(nthreads, >=, 0); 2614 pct = nthreads; 2615 2616 if (pct > taskq_cpupct_max_percent) 2617 pct = taskq_cpupct_max_percent; 2618 2619 /* 2620 * If you're using THREADS_CPU_PCT, the process for the 2621 * taskq threads must be curproc. This allows any pset 2622 * binding to be inherited correctly. If proc is &p0, 2623 * we won't be creating LWPs, so new threads will be assigned 2624 * to the default processor set. 
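 *
 * Worked illustration (assumed machine sizes, not normative): a caller
 * passing nthreads = 50 with TASKQ_THREADS_CPU_PCT stores
 * tq_threads_ncpus_pct = 50; on a system where max_ncpus is 64 the code
 * below sizes the thread array for TASKQ_THREADS_PCT(64, 50) = 32
 * threads, while the actual thread count starts at one and is corrected
 * later in taskq_thread_create(). A percentage above
 * taskq_cpupct_max_percent is silently clamped to that limit.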
2625 */ 2626 ASSERT(curproc == proc || proc == &p0); 2627 tq->tq_threads_ncpus_pct = pct; 2628 nthreads = 1; /* corrected in taskq_thread_create() */ 2629 max_nthreads = TASKQ_THREADS_PCT(max_ncpus, pct); 2630 2631 } else { 2632 ASSERT3S(nthreads, >=, 1); 2633 max_nthreads = nthreads; 2634 } 2635 2636 if (max_nthreads < taskq_minimum_nthreads_max) 2637 max_nthreads = taskq_minimum_nthreads_max; 2638 2639 /* 2640 * Make sure the name is 0-terminated, and conforms to the rules for 2641 * C identifiers 2642 */ 2643 (void) strncpy(tq->tq_name, name, TASKQ_NAMELEN + 1); 2644 strident_canon(tq->tq_name, TASKQ_NAMELEN + 1); 2645 2646 tq->tq_flags = flags | TASKQ_CHANGING; 2647 tq->tq_active = 0; 2648 tq->tq_instance = instance; 2649 tq->tq_nthreads_target = nthreads; 2650 tq->tq_nthreads_max = max_nthreads; 2651 tq->tq_minalloc = minalloc; 2652 tq->tq_maxalloc = maxalloc; 2653 tq->tq_nbuckets = bsize; 2654 tq->tq_proc = proc; 2655 tq->tq_pri = pri; 2656 tq->tq_DC = dc; 2657 list_link_init(&tq->tq_cpupct_link); 2658 2659 if (max_nthreads > 1) 2660 tq->tq_threadlist = kmem_alloc( 2661 sizeof (kthread_t *) * max_nthreads, KM_SLEEP); 2662 2663 mutex_enter(&tq->tq_lock); 2664 if (flags & TASKQ_PREPOPULATE) { 2665 while (minalloc-- > 0) 2666 taskq_ent_free(tq, taskq_ent_alloc(tq, TQ_SLEEP)); 2667 } 2668 2669 /* 2670 * Before we start creating threads for this taskq, take a 2671 * zone hold so the zone can't go away before taskq_destroy 2672 * makes sure all the taskq threads are gone. This hold is 2673 * similar in purpose to those taken by zthread_create(). 2674 */ 2675 zone_hold(tq->tq_proc->p_zone); 2676 2677 /* 2678 * Create the first thread, which will create any other threads 2679 * necessary. taskq_thread_create will not return until we have 2680 * enough threads to be able to process requests. 2681 */ 2682 taskq_thread_create(tq); 2683 mutex_exit(&tq->tq_lock); 2684 2685 /* 2686 * For dynamic taskq, create the array of buckets, PLUS ONE 2687 * for the bucket used as the "idle bucket". 2688 */ 2689 if (flags & TASKQ_DYNAMIC) { 2690 taskq_bucket_t *bucket = kmem_zalloc(sizeof (taskq_bucket_t) * 2691 (bsize + 1), KM_SLEEP); 2692 taskq_bucket_t *idle_bucket = &bucket[bsize]; 2693 int b_id; 2694 2695 tq->tq_buckets = bucket; 2696 2697 /* Initialize each bucket */ 2698 for (b_id = 0; b_id < (bsize + 1); b_id++, bucket++) { 2699 mutex_init(&bucket->tqbucket_lock, NULL, MUTEX_DEFAULT, 2700 NULL); 2701 cv_init(&bucket->tqbucket_cv, NULL, CV_DEFAULT, NULL); 2702 bucket->tqbucket_taskq = tq; 2703 TQ_LIST_INIT(bucket->tqbucket_freelist); 2704 TQ_LIST_INIT(bucket->tqbucket_backlog); 2705 } 2706 /* 2707 * Always create at least one idle bucket thread. 2708 * That can't fail because we're at nthreads=0. 2709 * If pre-populating, create more (nbuckets) threads. 2710 * That can fail, in which case we'll just try later. 2711 */ 2712 (void) taskq_bucket_extend(idle_bucket); 2713 if (flags & TASKQ_PREPOPULATE) { 2714 int i; 2715 for (i = 1; i < bsize; i++) { 2716 (void) taskq_bucket_extend(idle_bucket); 2717 } 2718 } 2719 } 2720 2721 /* 2722 * Install kstats. 2723 * We have two cases: 2724 * 1) Instance is provided to taskq_create_instance(). In this case it 2725 * should be >= 0 and we use it.
2726 * 2727 * 2) Instance is not provided and is automatically generated 2728 */ 2729 if (flags & TASKQ_NOINSTANCE) { 2730 instance = tq->tq_instance = 2731 (int)(uintptr_t)vmem_alloc(taskq_id_arena, 1, VM_SLEEP); 2732 } 2733 2734 if (flags & TASKQ_DYNAMIC) { 2735 if ((tq->tq_kstat = kstat_create("unix", instance, 2736 tq->tq_name, "taskq_d", KSTAT_TYPE_NAMED, 2737 sizeof (taskq_d_kstat) / sizeof (kstat_named_t), 2738 KSTAT_FLAG_VIRTUAL)) != NULL) { 2739 tq->tq_kstat->ks_lock = &taskq_d_kstat_lock; 2740 tq->tq_kstat->ks_data = &taskq_d_kstat; 2741 tq->tq_kstat->ks_update = taskq_d_kstat_update; 2742 tq->tq_kstat->ks_private = tq; 2743 kstat_install(tq->tq_kstat); 2744 } 2745 } else { 2746 if ((tq->tq_kstat = kstat_create("unix", instance, tq->tq_name, 2747 "taskq", KSTAT_TYPE_NAMED, 2748 sizeof (taskq_kstat) / sizeof (kstat_named_t), 2749 KSTAT_FLAG_VIRTUAL)) != NULL) { 2750 tq->tq_kstat->ks_lock = &taskq_kstat_lock; 2751 tq->tq_kstat->ks_data = &taskq_kstat; 2752 tq->tq_kstat->ks_update = taskq_kstat_update; 2753 tq->tq_kstat->ks_private = tq; 2754 kstat_install(tq->tq_kstat); 2755 } 2756 } 2757 2758 return (tq); 2759 } 2760 2761 /* 2762 * taskq_destroy(). 2763 * 2764 * Assumes: by the time taskq_destroy is called no one will use this task queue 2765 * in any way and no one will try to dispatch entries in it. 2766 */ 2767 void 2768 taskq_destroy(taskq_t *tq) 2769 { 2770 2771 ASSERT(! (tq->tq_flags & TASKQ_CPR_SAFE)); 2772 2773 /* 2774 * Destroy kstats. 2775 */ 2776 if (tq->tq_kstat != NULL) { 2777 kstat_delete(tq->tq_kstat); 2778 tq->tq_kstat = NULL; 2779 } 2780 2781 /* 2782 * Destroy instance if needed. 2783 */ 2784 if (tq->tq_flags & TASKQ_NOINSTANCE) { 2785 vmem_free(taskq_id_arena, (void *)(uintptr_t)(tq->tq_instance), 2786 1); 2787 tq->tq_instance = 0; 2788 } 2789 2790 /* 2791 * Unregister from the cpupct list. 2792 */ 2793 if (tq->tq_flags & TASKQ_THREADS_CPU_PCT) { 2794 taskq_cpupct_remove(tq); 2795 } 2796 2797 /* 2798 * Wait for any pending entries to complete. 2799 */ 2800 taskq_wait(tq); 2801 2802 mutex_enter(&tq->tq_lock); 2803 ASSERT((tq->tq_task.tqent_next == &tq->tq_task) && 2804 (tq->tq_active == 0)); 2805 2806 /* notify all the threads that they need to exit */ 2807 tq->tq_nthreads_target = 0; 2808 2809 tq->tq_flags |= TASKQ_CHANGING; 2810 cv_broadcast(&tq->tq_dispatch_cv); 2811 cv_broadcast(&tq->tq_exit_cv); 2812 2813 while (tq->tq_nthreads != 0) 2814 cv_wait(&tq->tq_wait_cv, &tq->tq_lock); 2815 2816 if (tq->tq_nthreads_max != 1) 2817 kmem_free(tq->tq_threadlist, sizeof (kthread_t *) * 2818 tq->tq_nthreads_max); 2819 2820 tq->tq_minalloc = 0; 2821 while (tq->tq_nalloc != 0) 2822 taskq_ent_free(tq, taskq_ent_alloc(tq, TQ_SLEEP)); 2823 2824 mutex_exit(&tq->tq_lock); 2825 2826 /* 2827 * For dynamic taskq: 2828 * Mark each bucket as closing and wakeup all sleeping threads. 2829 * Two passes: 1st mark & wake all; 2nd wait for thread exits. 2830 * Include the idle bucket here. 2831 */ 2832 if (tq->tq_buckets != NULL) { 2833 taskq_bucket_t *b; 2834 uint_t bid = 0; 2835 2836 ASSERT((tq->tq_flags & TASKQ_DYNAMIC) != 0); 2837 2838 for (bid = 0, b = tq->tq_buckets; 2839 bid <= tq->tq_nbuckets; 2840 b++, bid++) { 2841 2842 taskq_ent_t *tqe; 2843 2844 mutex_enter(&b->tqbucket_lock); 2845 2846 /* We called taskq_wait() above. */ 2847 ASSERT(b->tqbucket_nalloc == 0); 2848 2849 /* 2850 * Wakeup all sleeping threads. 2851 * 2852 * The idle bucket may have many threads. 
2853 * Avoid a "thundering herd" of calls into 2854 * taskq_thread_wait() / cv_reltimedwait() 2855 * thrashing mutexes in callout teardown, 2856 * and just wake the first idle thread, 2857 * letting it wake the next. 2858 * See cv_signal near end of taskq_d_thread 2859 * In other buckets, wake all threads. 2860 */ 2861 b->tqbucket_flags |= TQBUCKET_CLOSE; 2862 for (tqe = b->tqbucket_freelist.tqent_next; 2863 tqe != &b->tqbucket_freelist; 2864 tqe = tqe->tqent_next) { 2865 2866 cv_signal(&tqe->tqent_cv); 2867 2868 if (bid == tq->tq_nbuckets) { 2869 /* idle bucket; just wake one. */ 2870 break; 2871 } 2872 } 2873 mutex_exit(&b->tqbucket_lock); 2874 } 2875 2876 for (bid = 0, b = tq->tq_buckets; 2877 bid <= tq->tq_nbuckets; 2878 b++, bid++) { 2879 /* 2880 * Wait for tqbucket_freelist threads to exit. 2881 */ 2882 mutex_enter(&b->tqbucket_lock); 2883 while (b->tqbucket_nfree > 0) 2884 cv_wait(&b->tqbucket_cv, &b->tqbucket_lock); 2885 mutex_exit(&b->tqbucket_lock); 2886 } 2887 2888 /* 2889 * Threads that are migrating between buckets could be 2890 * missed by the waits on tqbucket_nfree, so also wait 2891 * for the total thread count to go to zero. 2892 */ 2893 mutex_enter(&tq->tq_lock); 2894 while (tq->tq_dnthreads > 0) { 2895 cv_wait(&tq->tq_exit_cv, &tq->tq_lock); 2896 } 2897 mutex_exit(&tq->tq_lock); 2898 2899 /* 2900 * Destroy all buckets 2901 */ 2902 for (bid = 0, b = tq->tq_buckets; 2903 bid <= tq->tq_nbuckets; 2904 b++, bid++) { 2905 mutex_destroy(&b->tqbucket_lock); 2906 cv_destroy(&b->tqbucket_cv); 2907 } 2908 2909 kmem_free(tq->tq_buckets, 2910 sizeof (taskq_bucket_t) * (tq->tq_nbuckets + 1)); 2911 2912 /* Cleanup fields before returning tq to the cache */ 2913 tq->tq_buckets = NULL; 2914 tq->tq_dnthreads = 0; 2915 } else { 2916 ASSERT((tq->tq_flags & TASKQ_DYNAMIC) == 0); 2917 } 2918 2919 /* 2920 * Now that all the taskq threads are gone, we can 2921 * drop the zone hold taken in taskq_create_common 2922 */ 2923 zone_rele(tq->tq_proc->p_zone); 2924 2925 tq->tq_threads_ncpus_pct = 0; 2926 tq->tq_totaltime = 0; 2927 tq->tq_tasks = 0; 2928 tq->tq_maxtasks = 0; 2929 tq->tq_executed = 0; 2930 kmem_cache_free(taskq_cache, tq); 2931 } 2932 2933 /* 2934 * This is called asynchronously after taskq_dispatch has failed to 2935 * find a free thread. Try to create a thread (taskq_bucket_extend) 2936 * and if that fails, make sure the bucket has at least one thread, 2937 * redirecting a thread from another bucket if necessary. 2938 */ 2939 static void 2940 taskq_bucket_overflow(void *arg) 2941 { 2942 taskq_bucket_t *b = arg; 2943 2944 if (taskq_bucket_extend(b) == NULL) { 2945 taskq_bucket_redist(b); 2946 } 2947 } 2948 2949 /* 2950 * Extend a bucket with a new entry on the free list and attach a worker 2951 * thread to it. This is called from a context where sleep is allowed. 2952 * This function may quietly fail. Callers deal with the possibility 2953 * that this might not have created a thread for some reason, eg. 2954 * lack of resources or limits on the number of threads. 2955 * 2956 * Argument: pointer to the bucket. 2957 * Return: pointer to new taskq_ent_t if we created a thread, else NULL 2958 */ 2959 static taskq_ent_t * 2960 taskq_bucket_extend(taskq_bucket_t *b) 2961 { 2962 taskq_ent_t *tqe; 2963 taskq_t *tq = b->tqbucket_taskq; 2964 taskq_bucket_t *idleb = &tq->tq_buckets[tq->tq_nbuckets]; 2965 kthread_t *t; 2966 int nthreads; 2967 2968 /* How many threads currently in this bucket? 
*/ 2969 mutex_enter(&b->tqbucket_lock); 2970 nthreads = b->tqbucket_nalloc + b->tqbucket_nfree; 2971 mutex_exit(&b->tqbucket_lock); 2972 2973 mutex_enter(&tq->tq_lock); 2974 2975 /* 2976 * When there are no threads in this bucket, this call should 2977 * "try harder", so continue even if short on memory. 2978 */ 2979 if (! ENOUGH_MEMORY() && (nthreads > 0)) { 2980 tq->tq_nomem++; 2981 mutex_exit(&tq->tq_lock); 2982 return (NULL); 2983 } 2984 2985 /* 2986 * Observe global taskq limits on the number of threads. 2987 */ 2988 if ((tq->tq_dnthreads + 1) > tq->tq_maxsize) { 2989 mutex_exit(&tq->tq_lock); 2990 return (NULL); 2991 } 2992 tq->tq_dnthreads++; 2993 mutex_exit(&tq->tq_lock); 2994 2995 tqe = kmem_cache_alloc(taskq_ent_cache, KM_SLEEP); 2996 2997 ASSERT(tqe->tqent_thread == NULL); 2998 2999 tqe->tqent_un.tqent_bucket = b; 3000 3001 /* 3002 * Create a thread in a TS_STOPPED state first. If it is successfully 3003 * created, place the entry on the free list and start the thread. 3004 */ 3005 if ((tq->tq_flags & TASKQ_THREADS_LWP) != 0) { 3006 /* Enforced in taskq_create_common */ 3007 ASSERT3P(tq->tq_proc, !=, &p0); 3008 t = lwp_kernel_create(tq->tq_proc, taskq_d_thread, 3009 tqe, TS_STOPPED, tq->tq_pri); 3010 } else { 3011 t = thread_create(NULL, 0, taskq_d_thread, tqe, 3012 0, tq->tq_proc, TS_STOPPED, tq->tq_pri); 3013 } 3014 tqe->tqent_thread = t; 3015 t->t_taskq = tq; /* mark thread as a taskq_member() */ 3016 3017 /* 3018 * Once the entry is ready, link it to the bucket free list. 3019 */ 3020 mutex_enter(&b->tqbucket_lock); 3021 tqe->tqent_func = NULL; 3022 TQ_APPEND(b->tqbucket_freelist, tqe); 3023 b->tqbucket_nfree++; 3024 mutex_exit(&b->tqbucket_lock); 3025 3026 /* 3027 * Account for creates in the idle bucket, because 3028 * the deaths will be accounted there. 3029 */ 3030 mutex_enter(&idleb->tqbucket_lock); 3031 TQ_STAT(idleb, tqs_tcreates); 3032 #if TASKQ_STATISTIC 3033 nthreads = idleb->tqbucket_stat.tqs_tcreates - 3034 idleb->tqbucket_stat.tqs_tdeaths; 3035 idleb->tqbucket_stat.tqs_maxthreads = MAX(nthreads, 3036 idleb->tqbucket_stat.tqs_maxthreads); 3037 #endif 3038 mutex_exit(&idleb->tqbucket_lock); 3039 3040 /* 3041 * Start the stopped thread. 3042 */ 3043 if (t->t_lwp != NULL) { 3044 proc_t *p = tq->tq_proc; 3045 mutex_enter(&p->p_lock); 3046 t->t_proc_flag &= ~TP_HOLDLWP; 3047 lwp_create_done(t); /* Sets TS_ALLSTART etc. */ 3048 mutex_exit(&p->p_lock); 3049 } else { 3050 thread_lock(t); 3051 t->t_schedflag |= TS_ALLSTART; 3052 setrun_locked(t); 3053 thread_unlock(t); 3054 } 3055 3056 return (tqe); 3057 } 3058 3059 /* 3060 * This is called after taskq_dispatch failed to find a free thread and 3061 * also failed to create a new thread. This usually means the taskq has 3062 * as many threads as we're allowed to create, but can also happen when 3063 * dispatch has TQ_NOQUEUE, or (rarely) we created a thread but lost the 3064 * new thread to another racing dispatch call. If this bucket has a 3065 * backlog and no threads, then redistribute threads by moving one 3066 * from another bucket (the donor bucket) into this one. A thread in 3067 * the donor bucket is redirected by dispatching the special function 3068 * taskq_d_redirect in the donor bucket. As soon as some thread in the 3069 * donor bucket completes, it will find taskq_d_redirect in the backlog 3070 * and move to the recipient bucket (the bucket arg here).
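 *
 * Worked illustration (assumed numbers): a TASKQ_DYNAMIC taskq created
 * with nthreads = 16 on a system that gets bsize = 8 buckets has
 * tq_maxsize = 16 and tq_atpb = 16 / 8 = 2, so only a donor bucket
 * currently holding more than two threads (nalloc + nfree >= 3) will
 * have the taskq_d_redirect entry queued on its backlog below.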
3071 */ 3072 static void 3073 taskq_bucket_redist(taskq_bucket_t *bucket) 3074 { 3075 taskq_t *tq = bucket->tqbucket_taskq; 3076 taskq_bucket_t *idle_bucket = &tq->tq_buckets[tq->tq_nbuckets]; 3077 taskq_bucket_t *db; /* donor bucket candidate */ 3078 taskq_ent_t *tqe = NULL; 3079 uint_t nthreads; 3080 3081 VERIFY3P(bucket, >=, tq->tq_buckets); 3082 VERIFY3P(bucket, <, idle_bucket); 3083 3084 /* 3085 * This makes no sense with a single bucket. 3086 * Someone patched taskq_minbuckets? 3087 */ 3088 if (tq->tq_nbuckets == 1) 3089 goto out; 3090 3091 /* 3092 * Only redirect when there's a backlog and no threads, 3093 * and we have not already redirected a thread. 3094 */ 3095 mutex_enter(&bucket->tqbucket_lock); 3096 nthreads = bucket->tqbucket_nalloc + bucket->tqbucket_nfree; 3097 if (nthreads > 0 || bucket->tqbucket_nbacklog == 0 || 3098 (bucket->tqbucket_flags & TQBUCKET_REDIRECT) != 0) { 3099 mutex_exit(&bucket->tqbucket_lock); 3100 goto out; 3101 } 3102 /* Clear this later if we fail to redirect a thread. */ 3103 bucket->tqbucket_flags |= TQBUCKET_REDIRECT; 3104 mutex_exit(&bucket->tqbucket_lock); 3105 3106 /* 3107 * Need a tqe for taskq_backlog_enqueue 3108 */ 3109 tqe = kmem_cache_alloc(taskq_ent_cache, KM_SLEEP); 3110 ASSERT(tqe->tqent_thread == NULL); 3111 tqe->tqent_func = taskq_d_redirect; 3112 tqe->tqent_arg = bucket; /* redirected to */ 3113 3114 /* 3115 * Find a "donor bucket" (db) that can afford to lose a thread. 3116 * Search starting at the next bucket after the passed in one. 3117 * There should be some buckets with more threads than average 3118 * because the recipient bucket has no threads. 3119 */ 3120 db = bucket; 3121 for (;;) { 3122 /* Next bucket */ 3123 if (++db == idle_bucket) 3124 db = tq->tq_buckets; 3125 if (db == bucket) 3126 break; 3127 3128 mutex_enter(&db->tqbucket_lock); 3129 nthreads = db->tqbucket_nalloc + db->tqbucket_nfree; 3130 if (nthreads > tq->tq_atpb) { 3131 taskq_backlog_enqueue(db, tqe, TQ_FRONT); 3132 mutex_exit(&db->tqbucket_lock); 3133 goto out; 3134 } 3135 mutex_exit(&db->tqbucket_lock); 3136 } 3137 /* 3138 * No bucket with more than an average number of threads. 3139 * Free the tqe; undo the redirect flag. 3140 */ 3141 DTRACE_PROBE2(taskq__redist__fails, taskq_t *, tq, 3142 taskq_bucket_t *, bucket); 3143 kmem_cache_free(taskq_ent_cache, tqe); 3144 tqe = NULL; 3145 mutex_enter(&bucket->tqbucket_lock); 3146 bucket->tqbucket_flags &= ~TQBUCKET_REDIRECT; 3147 mutex_exit(&bucket->tqbucket_lock); 3148 3149 out: 3150 /* 3151 * We're usually here because some backlog work exists. 3152 * In case a thread became idle just before a backlog 3153 * was added to some bucket, wake an idle thread. 
3154 */ 3155 mutex_enter(&idle_bucket->tqbucket_lock); 3156 if (idle_bucket->tqbucket_nfree != 0) { 3157 taskq_ent_t *itqe; 3158 itqe = idle_bucket->tqbucket_freelist.tqent_prev; 3159 cv_signal(&itqe->tqent_cv); 3160 } 3161 mutex_exit(&idle_bucket->tqbucket_lock); 3162 3163 DTRACE_PROBE3(taskq__bucket__redist__ret, taskq_t *, tq, 3164 taskq_bucket_t *, bucket, taskq_ent_t *, tqe); 3165 } 3166 3167 static int 3168 taskq_kstat_update(kstat_t *ksp, int rw) 3169 { 3170 struct taskq_kstat *tqsp = &taskq_kstat; 3171 taskq_t *tq = ksp->ks_private; 3172 3173 if (rw == KSTAT_WRITE) 3174 return (EACCES); 3175 3176 tqsp->tq_pid.value.ui64 = tq->tq_proc->p_pid; 3177 tqsp->tq_tasks.value.ui64 = tq->tq_tasks; 3178 tqsp->tq_executed.value.ui64 = tq->tq_executed; 3179 tqsp->tq_maxtasks.value.ui64 = tq->tq_maxtasks; 3180 tqsp->tq_totaltime.value.ui64 = tq->tq_totaltime; 3181 tqsp->tq_nactive.value.ui64 = tq->tq_active; 3182 tqsp->tq_nalloc.value.ui64 = tq->tq_nalloc; 3183 tqsp->tq_pri.value.ui64 = tq->tq_pri; 3184 tqsp->tq_nthreads.value.ui64 = tq->tq_nthreads; 3185 tqsp->tq_nomem.value.ui64 = tq->tq_nomem; 3186 return (0); 3187 } 3188 3189 static int 3190 taskq_d_kstat_update(kstat_t *ksp, int rw) 3191 { 3192 struct taskq_d_kstat *tqsp = &taskq_d_kstat; 3193 taskq_t *tq = ksp->ks_private; 3194 taskq_bucket_t *b; 3195 int bid; 3196 3197 if (rw == KSTAT_WRITE) 3198 return (EACCES); 3199 3200 ASSERT(tq->tq_flags & TASKQ_DYNAMIC); 3201 3202 tqsp->tqd_pri.value.ui64 = tq->tq_pri; 3203 tqsp->tqd_nomem.value.ui64 = tq->tq_nomem; 3204 3205 /* 3206 * Accumulate tqbucket_nalloc etc., tqbucket_stats 3207 */ 3208 tqsp->tqd_nalloc.value.ui64 = 0; 3209 tqsp->tqd_nbacklog.value.ui64 = 0; 3210 tqsp->tqd_nfree.value.ui64 = 0; 3211 tqsp->tqd_totaltime.value.ui64 = 0; 3212 3213 tqsp->tqd_hits.value.ui64 = 0; 3214 tqsp->tqd_misses.value.ui64 = 0; 3215 tqsp->tqd_ihits.value.ui64 = 0; 3216 tqsp->tqd_imisses.value.ui64 = 0; 3217 tqsp->tqd_overflows.value.ui64 = 0; 3218 tqsp->tqd_maxbacklog.value.ui64 = 0; 3219 tqsp->tqd_tcreates.value.ui64 = 0; 3220 tqsp->tqd_tdeaths.value.ui64 = 0; 3221 tqsp->tqd_maxthreads.value.ui64 = 0; 3222 tqsp->tqd_disptcreates.value.ui64 = 0; 3223 3224 /* Apparently this can be called when... */ 3225 if ((b = tq->tq_buckets) == NULL) 3226 return (0); 3227 3228 for (bid = 0; bid <= tq->tq_nbuckets; b++, bid++) { 3229 3230 tqsp->tqd_nalloc.value.ui64 += b->tqbucket_nalloc; 3231 tqsp->tqd_nbacklog.value.ui64 += b->tqbucket_nbacklog; 3232 tqsp->tqd_nfree.value.ui64 += b->tqbucket_nfree; 3233 tqsp->tqd_totaltime.value.ui64 += b->tqbucket_totaltime; 3234 3235 /* 3236 * For regular buckets, update hits, misses. 3237 * For the idle bucket, update ihits, imisses 3238 */ 3239 if (bid < tq->tq_nbuckets) { 3240 tqsp->tqd_hits.value.ui64 += 3241 b->tqbucket_stat.tqs_hits; 3242 tqsp->tqd_misses.value.ui64 += 3243 b->tqbucket_stat.tqs_misses; 3244 } else { 3245 tqsp->tqd_ihits.value.ui64 += 3246 b->tqbucket_stat.tqs_hits; 3247 tqsp->tqd_imisses.value.ui64 += 3248 b->tqbucket_stat.tqs_misses; 3249 } 3250 3251 tqsp->tqd_overflows.value.ui64 += 3252 b->tqbucket_stat.tqs_overflow; 3253 tqsp->tqd_maxbacklog.value.ui64 += 3254 b->tqbucket_stat.tqs_maxbacklog; 3255 tqsp->tqd_tcreates.value.ui64 += 3256 b->tqbucket_stat.tqs_tcreates; 3257 tqsp->tqd_tdeaths.value.ui64 += 3258 b->tqbucket_stat.tqs_tdeaths; 3259 tqsp->tqd_maxthreads.value.ui64 += 3260 b->tqbucket_stat.tqs_maxthreads; 3261 tqsp->tqd_disptcreates.value.ui64 += 3262 b->tqbucket_stat.tqs_disptcreates; 3263 } 3264 3265 return (0); 3266 } 3267
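
/*
 * Usage illustration (editorial sketch, not part of the taskq API proper;
 * names such as mydrv_tq and mydrv_work are hypothetical): callers of a
 * TASKQ_DYNAMIC taskq should expect TQ_NOSLEEP dispatches to fail once
 * the paths above have run out of free entries, threads and memory, and
 * should fall back accordingly, e.g.:
 *
 *	taskq_t *mydrv_tq = taskq_create("mydrv_tq", 8, minclsyspri,
 *	    8, 128, TASKQ_DYNAMIC | TASKQ_PREPOPULATE);
 *
 *	if (taskq_dispatch(mydrv_tq, mydrv_work, arg, TQ_NOSLEEP) == 0) {
 *		... too busy: retry later, do the work inline, or
 *		    dispatch again with TQ_SLEEP from a context that
 *		    may block ...
 *	}
 *
 *	taskq_wait(mydrv_tq);
 *	taskq_destroy(mydrv_tq);
 */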