1 /* 2 * Copyright (C) 2007-2010 Lawrence Livermore National Security, LLC. 3 * Copyright (C) 2007 The Regents of the University of California. 4 * Produced at Lawrence Livermore National Laboratory (cf, DISCLAIMER). 5 * Written by Brian Behlendorf <behlendorf1@llnl.gov>. 6 * UCRL-CODE-235197 7 * 8 * This file is part of the SPL, Solaris Porting Layer. 9 * 10 * The SPL is free software; you can redistribute it and/or modify it 11 * under the terms of the GNU General Public License as published by the 12 * Free Software Foundation; either version 2 of the License, or (at your 13 * option) any later version. 14 * 15 * The SPL is distributed in the hope that it will be useful, but WITHOUT 16 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 17 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 18 * for more details. 19 * 20 * You should have received a copy of the GNU General Public License along 21 * with the SPL. If not, see <http://www.gnu.org/licenses/>. 22 * 23 * Solaris Porting Layer (SPL) Task Queue Implementation. 24 */ 25 26 #include <sys/timer.h> 27 #include <sys/taskq.h> 28 #include <sys/kmem.h> 29 #include <sys/tsd.h> 30 #include <sys/trace_spl.h> 31 #ifdef HAVE_CPU_HOTPLUG 32 #include <linux/cpuhotplug.h> 33 #endif 34 35 int spl_taskq_thread_bind = 0; 36 module_param(spl_taskq_thread_bind, int, 0644); 37 MODULE_PARM_DESC(spl_taskq_thread_bind, "Bind taskq thread to CPU by default"); 38 39 40 int spl_taskq_thread_dynamic = 1; 41 module_param(spl_taskq_thread_dynamic, int, 0444); 42 MODULE_PARM_DESC(spl_taskq_thread_dynamic, "Allow dynamic taskq threads"); 43 44 int spl_taskq_thread_priority = 1; 45 module_param(spl_taskq_thread_priority, int, 0644); 46 MODULE_PARM_DESC(spl_taskq_thread_priority, 47 "Allow non-default priority for taskq threads"); 48 49 int spl_taskq_thread_sequential = 4; 50 module_param(spl_taskq_thread_sequential, int, 0644); 51 MODULE_PARM_DESC(spl_taskq_thread_sequential, 52 "Create new taskq threads after N sequential tasks"); 53 54 /* Global system-wide dynamic task queue available for all consumers */ 55 taskq_t *system_taskq; 56 EXPORT_SYMBOL(system_taskq); 57 /* Global dynamic task queue for long delay */ 58 taskq_t *system_delay_taskq; 59 EXPORT_SYMBOL(system_delay_taskq); 60 61 /* Private dedicated taskq for creating new taskq threads on demand. */ 62 static taskq_t *dynamic_taskq; 63 static taskq_thread_t *taskq_thread_create(taskq_t *); 64 65 #ifdef HAVE_CPU_HOTPLUG 66 /* Multi-callback id for cpu hotplugging. */ 67 static int spl_taskq_cpuhp_state; 68 #endif 69 70 /* List of all taskqs */ 71 LIST_HEAD(tq_list); 72 struct rw_semaphore tq_list_sem; 73 static uint_t taskq_tsd; 74 75 static int 76 task_km_flags(uint_t flags) 77 { 78 if (flags & TQ_NOSLEEP) 79 return (KM_NOSLEEP); 80 81 if (flags & TQ_PUSHPAGE) 82 return (KM_PUSHPAGE); 83 84 return (KM_SLEEP); 85 } 86 87 /* 88 * taskq_find_by_name - Find the largest instance number of a named taskq. 89 */ 90 static int 91 taskq_find_by_name(const char *name) 92 { 93 struct list_head *tql = NULL; 94 taskq_t *tq; 95 96 list_for_each_prev(tql, &tq_list) { 97 tq = list_entry(tql, taskq_t, tq_taskqs); 98 if (strcmp(name, tq->tq_name) == 0) 99 return (tq->tq_instance); 100 } 101 return (-1); 102 } 103 104 /* 105 * NOTE: Must be called with tq->tq_lock held, returns a list_t which 106 * is not attached to the free, work, or pending taskq lists. 
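 *
 * The caller's saved IRQ flags are passed by pointer because task_alloc()
 * may drop and re-acquire tq->tq_lock, using those flags, while polling
 * the free list or calling kmem_alloc().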
107 */ 108 static taskq_ent_t * 109 task_alloc(taskq_t *tq, uint_t flags, unsigned long *irqflags) 110 { 111 taskq_ent_t *t; 112 int count = 0; 113 114 ASSERT(tq); 115 retry: 116 /* Acquire taskq_ent_t's from free list if available */ 117 if (!list_empty(&tq->tq_free_list) && !(flags & TQ_NEW)) { 118 t = list_entry(tq->tq_free_list.next, taskq_ent_t, tqent_list); 119 120 ASSERT(!(t->tqent_flags & TQENT_FLAG_PREALLOC)); 121 ASSERT(!(t->tqent_flags & TQENT_FLAG_CANCEL)); 122 ASSERT(!timer_pending(&t->tqent_timer)); 123 124 list_del_init(&t->tqent_list); 125 return (t); 126 } 127 128 /* Free list is empty and memory allocations are prohibited */ 129 if (flags & TQ_NOALLOC) 130 return (NULL); 131 132 /* Hit maximum taskq_ent_t pool size */ 133 if (tq->tq_nalloc >= tq->tq_maxalloc) { 134 if (flags & TQ_NOSLEEP) 135 return (NULL); 136 137 /* 138 * Sleep periodically polling the free list for an available 139 * taskq_ent_t. Dispatching with TQ_SLEEP should always succeed 140 * but we cannot block forever waiting for a taskq_ent_t to 141 * show up in the free list, otherwise a deadlock can happen. 142 * 143 * Therefore, we need to allocate a new task even if the number 144 * of allocated tasks is above tq->tq_maxalloc, but we still 145 * end up delaying the task allocation by one second, thereby 146 * throttling the task dispatch rate. 147 */ 148 spin_unlock_irqrestore(&tq->tq_lock, *irqflags); 149 schedule_timeout(HZ / 100); 150 spin_lock_irqsave_nested(&tq->tq_lock, *irqflags, 151 tq->tq_lock_class); 152 if (count < 100) { 153 count++; 154 goto retry; 155 } 156 } 157 158 spin_unlock_irqrestore(&tq->tq_lock, *irqflags); 159 t = kmem_alloc(sizeof (taskq_ent_t), task_km_flags(flags)); 160 spin_lock_irqsave_nested(&tq->tq_lock, *irqflags, tq->tq_lock_class); 161 162 if (t) { 163 taskq_init_ent(t); 164 tq->tq_nalloc++; 165 } 166 167 return (t); 168 } 169 170 /* 171 * NOTE: Must be called with tq->tq_lock held, expects the taskq_ent_t 172 * to already be removed from the free, work, or pending taskq lists. 173 */ 174 static void 175 task_free(taskq_t *tq, taskq_ent_t *t) 176 { 177 ASSERT(tq); 178 ASSERT(t); 179 ASSERT(list_empty(&t->tqent_list)); 180 ASSERT(!timer_pending(&t->tqent_timer)); 181 182 kmem_free(t, sizeof (taskq_ent_t)); 183 tq->tq_nalloc--; 184 } 185 186 /* 187 * NOTE: Must be called with tq->tq_lock held, either destroys the 188 * taskq_ent_t if too many exist or moves it to the free list for later use. 189 */ 190 static void 191 task_done(taskq_t *tq, taskq_ent_t *t) 192 { 193 ASSERT(tq); 194 ASSERT(t); 195 196 /* Wake tasks blocked in taskq_wait_id() */ 197 wake_up_all(&t->tqent_waitq); 198 199 list_del_init(&t->tqent_list); 200 201 if (tq->tq_nalloc <= tq->tq_minalloc) { 202 t->tqent_id = TASKQID_INVALID; 203 t->tqent_func = NULL; 204 t->tqent_arg = NULL; 205 t->tqent_flags = 0; 206 207 list_add_tail(&t->tqent_list, &tq->tq_free_list); 208 } else { 209 task_free(tq, t); 210 } 211 } 212 213 /* 214 * When a delayed task timer expires remove it from the delay list and 215 * add it to the priority list for immediate processing.
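 *
 * Delayed tasks reach this path via taskq_dispatch_delay(), which arms
 * tqent_timer with an absolute expiration time. Illustrative sketch (not
 * from this file; my_func and my_arg are placeholders):
 *
 *	taskq_dispatch_delay(tq, my_func, my_arg, TQ_SLEEP, jiffies + 5 * HZ);
 *
 * would run my_func through this expiration path roughly five seconds later.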
216 */ 217 static void 218 task_expire_impl(taskq_ent_t *t) 219 { 220 taskq_ent_t *w; 221 taskq_t *tq = t->tqent_taskq; 222 struct list_head *l = NULL; 223 unsigned long flags; 224 225 spin_lock_irqsave_nested(&tq->tq_lock, flags, tq->tq_lock_class); 226 227 if (t->tqent_flags & TQENT_FLAG_CANCEL) { 228 ASSERT(list_empty(&t->tqent_list)); 229 spin_unlock_irqrestore(&tq->tq_lock, flags); 230 return; 231 } 232 233 t->tqent_birth = jiffies; 234 DTRACE_PROBE1(taskq_ent__birth, taskq_ent_t *, t); 235 236 /* 237 * The priority list must be maintained in strict task id order 238 * from lowest to highest for lowest_id to be easily calculable. 239 */ 240 list_del(&t->tqent_list); 241 list_for_each_prev(l, &tq->tq_prio_list) { 242 w = list_entry(l, taskq_ent_t, tqent_list); 243 if (w->tqent_id < t->tqent_id) { 244 list_add(&t->tqent_list, l); 245 break; 246 } 247 } 248 if (l == &tq->tq_prio_list) 249 list_add(&t->tqent_list, &tq->tq_prio_list); 250 251 spin_unlock_irqrestore(&tq->tq_lock, flags); 252 253 wake_up(&tq->tq_work_waitq); 254 } 255 256 static void 257 task_expire(spl_timer_list_t tl) 258 { 259 struct timer_list *tmr = (struct timer_list *)tl; 260 taskq_ent_t *t = from_timer(t, tmr, tqent_timer); 261 task_expire_impl(t); 262 } 263 264 /* 265 * Returns the lowest incomplete taskqid_t. The taskqid_t may 266 * be queued on the pending list, on the priority list, on the 267 * delay list, or on the work list currently being handled, but 268 * it is not 100% complete yet. 269 */ 270 static taskqid_t 271 taskq_lowest_id(taskq_t *tq) 272 { 273 taskqid_t lowest_id = tq->tq_next_id; 274 taskq_ent_t *t; 275 taskq_thread_t *tqt; 276 277 ASSERT(tq); 278 279 if (!list_empty(&tq->tq_pend_list)) { 280 t = list_entry(tq->tq_pend_list.next, taskq_ent_t, tqent_list); 281 lowest_id = MIN(lowest_id, t->tqent_id); 282 } 283 284 if (!list_empty(&tq->tq_prio_list)) { 285 t = list_entry(tq->tq_prio_list.next, taskq_ent_t, tqent_list); 286 lowest_id = MIN(lowest_id, t->tqent_id); 287 } 288 289 if (!list_empty(&tq->tq_delay_list)) { 290 t = list_entry(tq->tq_delay_list.next, taskq_ent_t, tqent_list); 291 lowest_id = MIN(lowest_id, t->tqent_id); 292 } 293 294 if (!list_empty(&tq->tq_active_list)) { 295 tqt = list_entry(tq->tq_active_list.next, taskq_thread_t, 296 tqt_active_list); 297 ASSERT(tqt->tqt_id != TASKQID_INVALID); 298 lowest_id = MIN(lowest_id, tqt->tqt_id); 299 } 300 301 return (lowest_id); 302 } 303 304 /* 305 * Insert a task into a list keeping the list sorted by increasing taskqid. 306 */ 307 static void 308 taskq_insert_in_order(taskq_t *tq, taskq_thread_t *tqt) 309 { 310 taskq_thread_t *w; 311 struct list_head *l = NULL; 312 313 ASSERT(tq); 314 ASSERT(tqt); 315 316 list_for_each_prev(l, &tq->tq_active_list) { 317 w = list_entry(l, taskq_thread_t, tqt_active_list); 318 if (w->tqt_id < tqt->tqt_id) { 319 list_add(&tqt->tqt_active_list, l); 320 break; 321 } 322 } 323 if (l == &tq->tq_active_list) 324 list_add(&tqt->tqt_active_list, &tq->tq_active_list); 325 } 326 327 /* 328 * Find and return a task from the given list if it exists. The list 329 * must be in lowest to highest task id order. 
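 *
 * Because of that ordering the walk below stops as soon as it sees an
 * entry with a larger id, so a miss never scans past the requested id.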
330 */ 331 static taskq_ent_t * 332 taskq_find_list(taskq_t *tq, struct list_head *lh, taskqid_t id) 333 { 334 struct list_head *l = NULL; 335 taskq_ent_t *t; 336 337 list_for_each(l, lh) { 338 t = list_entry(l, taskq_ent_t, tqent_list); 339 340 if (t->tqent_id == id) 341 return (t); 342 343 if (t->tqent_id > id) 344 break; 345 } 346 347 return (NULL); 348 } 349 350 /* 351 * Find an already dispatched task given the task id regardless of what 352 * state it is in. If a task is still pending it will be returned. 353 * If a task is executing, then -EBUSY will be returned instead. 354 * If the task has already been run then NULL is returned. 355 */ 356 static taskq_ent_t * 357 taskq_find(taskq_t *tq, taskqid_t id) 358 { 359 taskq_thread_t *tqt; 360 struct list_head *l = NULL; 361 taskq_ent_t *t; 362 363 t = taskq_find_list(tq, &tq->tq_delay_list, id); 364 if (t) 365 return (t); 366 367 t = taskq_find_list(tq, &tq->tq_prio_list, id); 368 if (t) 369 return (t); 370 371 t = taskq_find_list(tq, &tq->tq_pend_list, id); 372 if (t) 373 return (t); 374 375 list_for_each(l, &tq->tq_active_list) { 376 tqt = list_entry(l, taskq_thread_t, tqt_active_list); 377 if (tqt->tqt_id == id) { 378 /* 379 * Instead of returning tqt_task, we just return a non 380 * NULL value to prevent misuse, since tqt_task only 381 * has two valid fields. 382 */ 383 return (ERR_PTR(-EBUSY)); 384 } 385 } 386 387 return (NULL); 388 } 389 390 /* 391 * Theory for the taskq_wait_id(), taskq_wait_outstanding(), and 392 * taskq_wait() functions below. 393 * 394 * Taskq waiting is accomplished by tracking the lowest outstanding task 395 * id and the next available task id. As tasks are dispatched they are 396 * added to the tail of the pending, priority, or delay lists. As worker 397 * threads become available the tasks are removed from the heads of these 398 * lists and linked to the worker threads. This ensures the lists are 399 * kept sorted by lowest to highest task id. 400 * 401 * Therefore the lowest outstanding task id can be quickly determined by 402 * checking the head item from all of these lists. This value is stored 403 * with the taskq as the lowest id. It only needs to be recalculated when 404 * either the task with the current lowest id completes or is canceled. 405 * 406 * By blocking until the lowest task id exceeds the passed task id the 407 * taskq_wait_outstanding() function can be easily implemented. Similarly, 408 * by blocking until the lowest task id matches the next task id taskq_wait() 409 * can be implemented. 410 * 411 * Callers should be aware that when there are multiple worker threads it 412 * is possible for larger task ids to complete before smaller ones. Also 413 * when the taskq contains delay tasks with small task ids callers may 414 * block for a considerable length of time waiting for them to expire and 415 * execute. 416 */ 417 static int 418 taskq_wait_id_check(taskq_t *tq, taskqid_t id) 419 { 420 int rc; 421 unsigned long flags; 422 423 spin_lock_irqsave_nested(&tq->tq_lock, flags, tq->tq_lock_class); 424 rc = (taskq_find(tq, id) == NULL); 425 spin_unlock_irqrestore(&tq->tq_lock, flags); 426 427 return (rc); 428 } 429 430 /* 431 * The taskq_wait_id() function blocks until the passed task id completes. 432 * This does not guarantee that all lower task ids have completed.
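 *
 * Illustrative sketch (not from this file; my_func and my_arg are
 * placeholders) of waiting for one specific dispatch to finish:
 *
 *	taskqid_t id = taskq_dispatch(tq, my_func, my_arg, TQ_SLEEP);
 *	if (id != TASKQID_INVALID)
 *		taskq_wait_id(tq, id);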
433 */ 434 void 435 taskq_wait_id(taskq_t *tq, taskqid_t id) 436 { 437 wait_event(tq->tq_wait_waitq, taskq_wait_id_check(tq, id)); 438 } 439 EXPORT_SYMBOL(taskq_wait_id); 440 441 static int 442 taskq_wait_outstanding_check(taskq_t *tq, taskqid_t id) 443 { 444 int rc; 445 unsigned long flags; 446 447 spin_lock_irqsave_nested(&tq->tq_lock, flags, tq->tq_lock_class); 448 rc = (id < tq->tq_lowest_id); 449 spin_unlock_irqrestore(&tq->tq_lock, flags); 450 451 return (rc); 452 } 453 454 /* 455 * The taskq_wait_outstanding() function will block until all tasks with a 456 * lower taskqid than the passed 'id' have been completed. Note that all 457 * task ids are assigned monotonically at dispatch time. Zero may be 458 * passed for the id to indicate all tasks dispatched up to this point, 459 * but not after, should be waited for. 460 */ 461 void 462 taskq_wait_outstanding(taskq_t *tq, taskqid_t id) 463 { 464 id = id ? id : tq->tq_next_id - 1; 465 wait_event(tq->tq_wait_waitq, taskq_wait_outstanding_check(tq, id)); 466 } 467 EXPORT_SYMBOL(taskq_wait_outstanding); 468 469 static int 470 taskq_wait_check(taskq_t *tq) 471 { 472 int rc; 473 unsigned long flags; 474 475 spin_lock_irqsave_nested(&tq->tq_lock, flags, tq->tq_lock_class); 476 rc = (tq->tq_lowest_id == tq->tq_next_id); 477 spin_unlock_irqrestore(&tq->tq_lock, flags); 478 479 return (rc); 480 } 481 482 /* 483 * The taskq_wait() function will block until the taskq is empty. 484 * This means that if a taskq re-dispatches work to itself taskq_wait() 485 * callers will block indefinitely. 486 */ 487 void 488 taskq_wait(taskq_t *tq) 489 { 490 wait_event(tq->tq_wait_waitq, taskq_wait_check(tq)); 491 } 492 EXPORT_SYMBOL(taskq_wait); 493 494 int 495 taskq_member(taskq_t *tq, kthread_t *t) 496 { 497 return (tq == (taskq_t *)tsd_get_by_thread(taskq_tsd, t)); 498 } 499 EXPORT_SYMBOL(taskq_member); 500 501 taskq_t * 502 taskq_of_curthread(void) 503 { 504 return (tsd_get(taskq_tsd)); 505 } 506 EXPORT_SYMBOL(taskq_of_curthread); 507 508 /* 509 * Cancel an already dispatched task given the task id. Still pending tasks 510 * will be immediately canceled, and if the task is active the function will 511 * block until it completes. Preallocated tasks which are canceled must be 512 * freed by the caller. 513 */ 514 int 515 taskq_cancel_id(taskq_t *tq, taskqid_t id) 516 { 517 taskq_ent_t *t; 518 int rc = ENOENT; 519 unsigned long flags; 520 521 ASSERT(tq); 522 523 spin_lock_irqsave_nested(&tq->tq_lock, flags, tq->tq_lock_class); 524 t = taskq_find(tq, id); 525 if (t && t != ERR_PTR(-EBUSY)) { 526 list_del_init(&t->tqent_list); 527 t->tqent_flags |= TQENT_FLAG_CANCEL; 528 529 /* 530 * When canceling the lowest outstanding task id we 531 * must recalculate the new lowest outstanding id. 532 */ 533 if (tq->tq_lowest_id == t->tqent_id) { 534 tq->tq_lowest_id = taskq_lowest_id(tq); 535 ASSERT3S(tq->tq_lowest_id, >, t->tqent_id); 536 } 537 538 /* 539 * The task_expire() function takes the tq->tq_lock so drop 540 * the lock before synchronously cancelling the timer.
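 * Calling del_timer_sync() with tq_lock held could otherwise deadlock
 * against a concurrently running task_expire() callback spinning on the
 * same lock.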
541 */ 542 if (timer_pending(&t->tqent_timer)) { 543 spin_unlock_irqrestore(&tq->tq_lock, flags); 544 del_timer_sync(&t->tqent_timer); 545 spin_lock_irqsave_nested(&tq->tq_lock, flags, 546 tq->tq_lock_class); 547 } 548 549 if (!(t->tqent_flags & TQENT_FLAG_PREALLOC)) 550 task_done(tq, t); 551 552 rc = 0; 553 } 554 spin_unlock_irqrestore(&tq->tq_lock, flags); 555 556 if (t == ERR_PTR(-EBUSY)) { 557 taskq_wait_id(tq, id); 558 rc = EBUSY; 559 } 560 561 return (rc); 562 } 563 EXPORT_SYMBOL(taskq_cancel_id); 564 565 static int taskq_thread_spawn(taskq_t *tq); 566 567 taskqid_t 568 taskq_dispatch(taskq_t *tq, task_func_t func, void *arg, uint_t flags) 569 { 570 taskq_ent_t *t; 571 taskqid_t rc = TASKQID_INVALID; 572 unsigned long irqflags; 573 574 ASSERT(tq); 575 ASSERT(func); 576 577 spin_lock_irqsave_nested(&tq->tq_lock, irqflags, tq->tq_lock_class); 578 579 /* Taskq being destroyed and all tasks drained */ 580 if (!(tq->tq_flags & TASKQ_ACTIVE)) 581 goto out; 582 583 /* Do not queue the task unless there is an idle thread for it */ 584 ASSERT(tq->tq_nactive <= tq->tq_nthreads); 585 if ((flags & TQ_NOQUEUE) && (tq->tq_nactive == tq->tq_nthreads)) { 586 /* Dynamic taskq may be able to spawn another thread */ 587 if (!(tq->tq_flags & TASKQ_DYNAMIC) || 588 taskq_thread_spawn(tq) == 0) 589 goto out; 590 } 591 592 if ((t = task_alloc(tq, flags, &irqflags)) == NULL) 593 goto out; 594 595 spin_lock(&t->tqent_lock); 596 597 /* Queue to the front of the list to enforce TQ_NOQUEUE semantics */ 598 if (flags & TQ_NOQUEUE) 599 list_add(&t->tqent_list, &tq->tq_prio_list); 600 /* Queue to the priority list instead of the pending list */ 601 else if (flags & TQ_FRONT) 602 list_add_tail(&t->tqent_list, &tq->tq_prio_list); 603 else 604 list_add_tail(&t->tqent_list, &tq->tq_pend_list); 605 606 t->tqent_id = rc = tq->tq_next_id; 607 tq->tq_next_id++; 608 t->tqent_func = func; 609 t->tqent_arg = arg; 610 t->tqent_taskq = tq; 611 t->tqent_timer.function = NULL; 612 t->tqent_timer.expires = 0; 613 614 t->tqent_birth = jiffies; 615 DTRACE_PROBE1(taskq_ent__birth, taskq_ent_t *, t); 616 617 ASSERT(!(t->tqent_flags & TQENT_FLAG_PREALLOC)); 618 619 spin_unlock(&t->tqent_lock); 620 621 wake_up(&tq->tq_work_waitq); 622 out: 623 /* Spawn additional taskq threads if required.
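 * For a TASKQ_DYNAMIC taskq, taskq_thread_spawn() only queues the actual
 * thread creation on dynamic_taskq; for non-dynamic taskqs it is a no-op.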
*/ 624 if (!(flags & TQ_NOQUEUE) && tq->tq_nactive == tq->tq_nthreads) 625 (void) taskq_thread_spawn(tq); 626 627 spin_unlock_irqrestore(&tq->tq_lock, irqflags); 628 return (rc); 629 } 630 EXPORT_SYMBOL(taskq_dispatch); 631 632 taskqid_t 633 taskq_dispatch_delay(taskq_t *tq, task_func_t func, void *arg, 634 uint_t flags, clock_t expire_time) 635 { 636 taskqid_t rc = TASKQID_INVALID; 637 taskq_ent_t *t; 638 unsigned long irqflags; 639 640 ASSERT(tq); 641 ASSERT(func); 642 643 spin_lock_irqsave_nested(&tq->tq_lock, irqflags, tq->tq_lock_class); 644 645 /* Taskq being destroyed and all tasks drained */ 646 if (!(tq->tq_flags & TASKQ_ACTIVE)) 647 goto out; 648 649 if ((t = task_alloc(tq, flags, &irqflags)) == NULL) 650 goto out; 651 652 spin_lock(&t->tqent_lock); 653 654 /* Queue to the delay list for subsequent execution */ 655 list_add_tail(&t->tqent_list, &tq->tq_delay_list); 656 657 t->tqent_id = rc = tq->tq_next_id; 658 tq->tq_next_id++; 659 t->tqent_func = func; 660 t->tqent_arg = arg; 661 t->tqent_taskq = tq; 662 t->tqent_timer.function = task_expire; 663 t->tqent_timer.expires = (unsigned long)expire_time; 664 add_timer(&t->tqent_timer); 665 666 ASSERT(!(t->tqent_flags & TQENT_FLAG_PREALLOC)); 667 668 spin_unlock(&t->tqent_lock); 669 out: 670 /* Spawn additional taskq threads if required. */ 671 if (tq->tq_nactive == tq->tq_nthreads) 672 (void) taskq_thread_spawn(tq); 673 spin_unlock_irqrestore(&tq->tq_lock, irqflags); 674 return (rc); 675 } 676 EXPORT_SYMBOL(taskq_dispatch_delay); 677 678 void 679 taskq_dispatch_ent(taskq_t *tq, task_func_t func, void *arg, uint_t flags, 680 taskq_ent_t *t) 681 { 682 unsigned long irqflags; 683 ASSERT(tq); 684 ASSERT(func); 685 686 spin_lock_irqsave_nested(&tq->tq_lock, irqflags, 687 tq->tq_lock_class); 688 689 /* Taskq being destroyed and all tasks drained */ 690 if (!(tq->tq_flags & TASKQ_ACTIVE)) { 691 t->tqent_id = TASKQID_INVALID; 692 goto out; 693 } 694 695 if ((flags & TQ_NOQUEUE) && (tq->tq_nactive == tq->tq_nthreads)) { 696 /* Dynamic taskq may be able to spawn another thread */ 697 if (!(tq->tq_flags & TASKQ_DYNAMIC) || 698 taskq_thread_spawn(tq) == 0) 699 goto out2; 700 flags |= TQ_FRONT; 701 } 702 703 spin_lock(&t->tqent_lock); 704 705 /* 706 * Make sure the entry is not on some other taskq; it is important to 707 * ASSERT() under lock 708 */ 709 ASSERT(taskq_empty_ent(t)); 710 711 /* 712 * Mark it as a prealloc'd task. This is important 713 * to ensure that we don't free it later. 714 */ 715 t->tqent_flags |= TQENT_FLAG_PREALLOC; 716 717 /* Queue to the priority list instead of the pending list */ 718 if (flags & TQ_FRONT) 719 list_add_tail(&t->tqent_list, &tq->tq_prio_list); 720 else 721 list_add_tail(&t->tqent_list, &tq->tq_pend_list); 722 723 t->tqent_id = tq->tq_next_id; 724 tq->tq_next_id++; 725 t->tqent_func = func; 726 t->tqent_arg = arg; 727 t->tqent_taskq = tq; 728 729 t->tqent_birth = jiffies; 730 DTRACE_PROBE1(taskq_ent__birth, taskq_ent_t *, t); 731 732 spin_unlock(&t->tqent_lock); 733 734 wake_up(&tq->tq_work_waitq); 735 out: 736 /* Spawn additional taskq threads if required. 
*/ 737 if (tq->tq_nactive == tq->tq_nthreads) 738 (void) taskq_thread_spawn(tq); 739 out2: 740 spin_unlock_irqrestore(&tq->tq_lock, irqflags); 741 } 742 EXPORT_SYMBOL(taskq_dispatch_ent); 743 744 int 745 taskq_empty_ent(taskq_ent_t *t) 746 { 747 return (list_empty(&t->tqent_list)); 748 } 749 EXPORT_SYMBOL(taskq_empty_ent); 750 751 void 752 taskq_init_ent(taskq_ent_t *t) 753 { 754 spin_lock_init(&t->tqent_lock); 755 init_waitqueue_head(&t->tqent_waitq); 756 timer_setup(&t->tqent_timer, NULL, 0); 757 INIT_LIST_HEAD(&t->tqent_list); 758 t->tqent_id = 0; 759 t->tqent_func = NULL; 760 t->tqent_arg = NULL; 761 t->tqent_flags = 0; 762 t->tqent_taskq = NULL; 763 } 764 EXPORT_SYMBOL(taskq_init_ent); 765 766 /* 767 * Return the next pending task, preference is given to tasks on the 768 * priority list which were dispatched with TQ_FRONT. 769 */ 770 static taskq_ent_t * 771 taskq_next_ent(taskq_t *tq) 772 { 773 struct list_head *list; 774 775 if (!list_empty(&tq->tq_prio_list)) 776 list = &tq->tq_prio_list; 777 else if (!list_empty(&tq->tq_pend_list)) 778 list = &tq->tq_pend_list; 779 else 780 return (NULL); 781 782 return (list_entry(list->next, taskq_ent_t, tqent_list)); 783 } 784 785 /* 786 * Spawns a new thread for the specified taskq. 787 */ 788 static void 789 taskq_thread_spawn_task(void *arg) 790 { 791 taskq_t *tq = (taskq_t *)arg; 792 unsigned long flags; 793 794 if (taskq_thread_create(tq) == NULL) { 795 /* restore spawning count if failed */ 796 spin_lock_irqsave_nested(&tq->tq_lock, flags, 797 tq->tq_lock_class); 798 tq->tq_nspawn--; 799 spin_unlock_irqrestore(&tq->tq_lock, flags); 800 } 801 } 802 803 /* 804 * Spawn additional threads for dynamic taskqs (TASKQ_DYNAMIC) when the current 805 * number of threads is insufficient to handle the pending tasks. These 806 * new threads must be created by the dedicated dynamic_taskq to avoid 807 * deadlocks between thread creation and memory reclaim. The system_taskq 808 * which is also a dynamic taskq cannot be safely used for this. 809 */ 810 static int 811 taskq_thread_spawn(taskq_t *tq) 812 { 813 int spawning = 0; 814 815 if (!(tq->tq_flags & TASKQ_DYNAMIC)) 816 return (0); 817 818 if ((tq->tq_nthreads + tq->tq_nspawn < tq->tq_maxthreads) && 819 (tq->tq_flags & TASKQ_ACTIVE)) { 820 spawning = (++tq->tq_nspawn); 821 taskq_dispatch(dynamic_taskq, taskq_thread_spawn_task, 822 tq, TQ_NOSLEEP); 823 } 824 825 return (spawning); 826 } 827 828 /* 829 * Threads in a dynamic taskq should only exit once it has been completely 830 * drained and no other threads are actively servicing tasks. This prevents 831 * threads from being created and destroyed more than is required. 832 * 833 * The first thread in the thread list is treated as the primary thread. 834 * There is nothing special about the primary thread but in order to keep 835 * all the taskq pids from changing we opt to make it long running.
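 *
 * In short, the checks below let a non-primary worker exit only when
 * nothing is being spawned, nothing is active or pending, more than one
 * thread remains, and spl_taskq_thread_dynamic is enabled.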
836 */ 837 static int 838 taskq_thread_should_stop(taskq_t *tq, taskq_thread_t *tqt) 839 { 840 if (!(tq->tq_flags & TASKQ_DYNAMIC)) 841 return (0); 842 843 if (list_first_entry(&(tq->tq_thread_list), taskq_thread_t, 844 tqt_thread_list) == tqt) 845 return (0); 846 847 return 848 ((tq->tq_nspawn == 0) && /* No threads are being spawned */ 849 (tq->tq_nactive == 0) && /* No threads are handling tasks */ 850 (tq->tq_nthreads > 1) && /* More than 1 thread is running */ 851 (!taskq_next_ent(tq)) && /* There are no pending tasks */ 852 (spl_taskq_thread_dynamic)); /* Dynamic taskqs are allowed */ 853 } 854 855 static int 856 taskq_thread(void *args) 857 { 858 DECLARE_WAITQUEUE(wait, current); 859 sigset_t blocked; 860 taskq_thread_t *tqt = args; 861 taskq_t *tq; 862 taskq_ent_t *t; 863 int seq_tasks = 0; 864 unsigned long flags; 865 taskq_ent_t dup_task = {}; 866 867 ASSERT(tqt); 868 ASSERT(tqt->tqt_tq); 869 tq = tqt->tqt_tq; 870 current->flags |= PF_NOFREEZE; 871 872 (void) spl_fstrans_mark(); 873 874 sigfillset(&blocked); 875 sigprocmask(SIG_BLOCK, &blocked, NULL); 876 flush_signals(current); 877 878 tsd_set(taskq_tsd, tq); 879 spin_lock_irqsave_nested(&tq->tq_lock, flags, tq->tq_lock_class); 880 /* 881 * If we are dynamically spawned, decrease spawning count. Note that 882 * we could be created during taskq_create, in which case we shouldn't 883 * do the decrement. But it's fine because taskq_create will reset 884 * tq_nspawn later. 885 */ 886 if (tq->tq_flags & TASKQ_DYNAMIC) 887 tq->tq_nspawn--; 888 889 /* Immediately exit if more threads than allowed were created. */ 890 if (tq->tq_nthreads >= tq->tq_maxthreads) 891 goto error; 892 893 tq->tq_nthreads++; 894 list_add_tail(&tqt->tqt_thread_list, &tq->tq_thread_list); 895 wake_up(&tq->tq_wait_waitq); 896 set_current_state(TASK_INTERRUPTIBLE); 897 898 while (!kthread_should_stop()) { 899 900 if (list_empty(&tq->tq_pend_list) && 901 list_empty(&tq->tq_prio_list)) { 902 903 if (taskq_thread_should_stop(tq, tqt)) { 904 wake_up_all(&tq->tq_wait_waitq); 905 break; 906 } 907 908 add_wait_queue_exclusive(&tq->tq_work_waitq, &wait); 909 spin_unlock_irqrestore(&tq->tq_lock, flags); 910 911 schedule(); 912 seq_tasks = 0; 913 914 spin_lock_irqsave_nested(&tq->tq_lock, flags, 915 tq->tq_lock_class); 916 remove_wait_queue(&tq->tq_work_waitq, &wait); 917 } else { 918 __set_current_state(TASK_RUNNING); 919 } 920 921 if ((t = taskq_next_ent(tq)) != NULL) { 922 list_del_init(&t->tqent_list); 923 924 /* 925 * A TQENT_FLAG_PREALLOC task may be reused or freed 926 * during the task function call. Store tqent_id and 927 * tqent_flags here. 928 * 929 * Also use an on stack taskq_ent_t for tqt_task 930 * assignment in this case; we want to make sure 931 * to duplicate all fields, so the values are 932 * correct when it's accessed via DTRACE_PROBE*. 933 */ 934 tqt->tqt_id = t->tqent_id; 935 tqt->tqt_flags = t->tqent_flags; 936 937 if (t->tqent_flags & TQENT_FLAG_PREALLOC) { 938 dup_task = *t; 939 t = &dup_task; 940 } 941 tqt->tqt_task = t; 942 943 taskq_insert_in_order(tq, tqt); 944 tq->tq_nactive++; 945 spin_unlock_irqrestore(&tq->tq_lock, flags); 946 947 DTRACE_PROBE1(taskq_ent__start, taskq_ent_t *, t); 948 949 /* Perform the requested task */ 950 t->tqent_func(t->tqent_arg); 951 952 DTRACE_PROBE1(taskq_ent__finish, taskq_ent_t *, t); 953 954 spin_lock_irqsave_nested(&tq->tq_lock, flags, 955 tq->tq_lock_class); 956 tq->tq_nactive--; 957 list_del_init(&tqt->tqt_active_list); 958 tqt->tqt_task = NULL; 959 960 /* For prealloc'd tasks, we don't free anything. 
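 * The entry is owned by whoever dispatched it with taskq_dispatch_ent()
 * and may already have been reused or freed by the task function itself.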
*/ 961 if (!(tqt->tqt_flags & TQENT_FLAG_PREALLOC)) 962 task_done(tq, t); 963 964 /* 965 * When the current lowest outstanding taskqid is 966 * done calculate the new lowest outstanding id 967 */ 968 if (tq->tq_lowest_id == tqt->tqt_id) { 969 tq->tq_lowest_id = taskq_lowest_id(tq); 970 ASSERT3S(tq->tq_lowest_id, >, tqt->tqt_id); 971 } 972 973 /* Spawn additional taskq threads if required. */ 974 if ((++seq_tasks) > spl_taskq_thread_sequential && 975 taskq_thread_spawn(tq)) 976 seq_tasks = 0; 977 978 tqt->tqt_id = TASKQID_INVALID; 979 tqt->tqt_flags = 0; 980 wake_up_all(&tq->tq_wait_waitq); 981 } else { 982 if (taskq_thread_should_stop(tq, tqt)) 983 break; 984 } 985 986 set_current_state(TASK_INTERRUPTIBLE); 987 988 } 989 990 __set_current_state(TASK_RUNNING); 991 tq->tq_nthreads--; 992 list_del_init(&tqt->tqt_thread_list); 993 error: 994 kmem_free(tqt, sizeof (taskq_thread_t)); 995 spin_unlock_irqrestore(&tq->tq_lock, flags); 996 997 tsd_set(taskq_tsd, NULL); 998 999 return (0); 1000 } 1001 1002 static taskq_thread_t * 1003 taskq_thread_create(taskq_t *tq) 1004 { 1005 static int last_used_cpu = 0; 1006 taskq_thread_t *tqt; 1007 1008 tqt = kmem_alloc(sizeof (*tqt), KM_PUSHPAGE); 1009 INIT_LIST_HEAD(&tqt->tqt_thread_list); 1010 INIT_LIST_HEAD(&tqt->tqt_active_list); 1011 tqt->tqt_tq = tq; 1012 tqt->tqt_id = TASKQID_INVALID; 1013 1014 tqt->tqt_thread = spl_kthread_create(taskq_thread, tqt, 1015 "%s", tq->tq_name); 1016 if (tqt->tqt_thread == NULL) { 1017 kmem_free(tqt, sizeof (taskq_thread_t)); 1018 return (NULL); 1019 } 1020 1021 if (spl_taskq_thread_bind) { 1022 last_used_cpu = (last_used_cpu + 1) % num_online_cpus(); 1023 kthread_bind(tqt->tqt_thread, last_used_cpu); 1024 } 1025 1026 if (spl_taskq_thread_priority) 1027 set_user_nice(tqt->tqt_thread, PRIO_TO_NICE(tq->tq_pri)); 1028 1029 wake_up_process(tqt->tqt_thread); 1030 1031 return (tqt); 1032 } 1033 1034 taskq_t * 1035 taskq_create(const char *name, int threads_arg, pri_t pri, 1036 int minalloc, int maxalloc, uint_t flags) 1037 { 1038 taskq_t *tq; 1039 taskq_thread_t *tqt; 1040 int count = 0, rc = 0, i; 1041 unsigned long irqflags; 1042 int nthreads = threads_arg; 1043 1044 ASSERT(name != NULL); 1045 ASSERT(minalloc >= 0); 1046 ASSERT(maxalloc <= INT_MAX); 1047 ASSERT(!(flags & (TASKQ_CPR_SAFE))); /* Unsupported */ 1048 1049 /* Scale the number of threads using nthreads as a percentage */ 1050 if (flags & TASKQ_THREADS_CPU_PCT) { 1051 ASSERT(nthreads <= 100); 1052 ASSERT(nthreads >= 0); 1053 nthreads = MIN(threads_arg, 100); 1054 nthreads = MAX(nthreads, 0); 1055 nthreads = MAX((num_online_cpus() * nthreads) /100, 1); 1056 } 1057 1058 tq = kmem_alloc(sizeof (*tq), KM_PUSHPAGE); 1059 if (tq == NULL) 1060 return (NULL); 1061 1062 tq->tq_hp_support = B_FALSE; 1063 #ifdef HAVE_CPU_HOTPLUG 1064 if (flags & TASKQ_THREADS_CPU_PCT) { 1065 tq->tq_hp_support = B_TRUE; 1066 if (cpuhp_state_add_instance_nocalls(spl_taskq_cpuhp_state, 1067 &tq->tq_hp_cb_node) != 0) { 1068 kmem_free(tq, sizeof (*tq)); 1069 return (NULL); 1070 } 1071 } 1072 #endif 1073 1074 spin_lock_init(&tq->tq_lock); 1075 INIT_LIST_HEAD(&tq->tq_thread_list); 1076 INIT_LIST_HEAD(&tq->tq_active_list); 1077 tq->tq_name = kmem_strdup(name); 1078 tq->tq_nactive = 0; 1079 tq->tq_nthreads = 0; 1080 tq->tq_nspawn = 0; 1081 tq->tq_maxthreads = nthreads; 1082 tq->tq_cpu_pct = threads_arg; 1083 tq->tq_pri = pri; 1084 tq->tq_minalloc = minalloc; 1085 tq->tq_maxalloc = maxalloc; 1086 tq->tq_nalloc = 0; 1087 tq->tq_flags = (flags | TASKQ_ACTIVE); 1088 tq->tq_next_id = TASKQID_INITIAL; 1089 
tq->tq_lowest_id = TASKQID_INITIAL; 1090 INIT_LIST_HEAD(&tq->tq_free_list); 1091 INIT_LIST_HEAD(&tq->tq_pend_list); 1092 INIT_LIST_HEAD(&tq->tq_prio_list); 1093 INIT_LIST_HEAD(&tq->tq_delay_list); 1094 init_waitqueue_head(&tq->tq_work_waitq); 1095 init_waitqueue_head(&tq->tq_wait_waitq); 1096 tq->tq_lock_class = TQ_LOCK_GENERAL; 1097 INIT_LIST_HEAD(&tq->tq_taskqs); 1098 1099 if (flags & TASKQ_PREPOPULATE) { 1100 spin_lock_irqsave_nested(&tq->tq_lock, irqflags, 1101 tq->tq_lock_class); 1102 1103 for (i = 0; i < minalloc; i++) 1104 task_done(tq, task_alloc(tq, TQ_PUSHPAGE | TQ_NEW, 1105 &irqflags)); 1106 1107 spin_unlock_irqrestore(&tq->tq_lock, irqflags); 1108 } 1109 1110 if ((flags & TASKQ_DYNAMIC) && spl_taskq_thread_dynamic) 1111 nthreads = 1; 1112 1113 for (i = 0; i < nthreads; i++) { 1114 tqt = taskq_thread_create(tq); 1115 if (tqt == NULL) 1116 rc = 1; 1117 else 1118 count++; 1119 } 1120 1121 /* Wait for all threads to be started before potential destroy */ 1122 wait_event(tq->tq_wait_waitq, tq->tq_nthreads == count); 1123 /* 1124 * taskq_thread might have touched nspawn, but we don't want them to 1125 * because they're not dynamically spawned. So we reset it to 0 1126 */ 1127 tq->tq_nspawn = 0; 1128 1129 if (rc) { 1130 taskq_destroy(tq); 1131 tq = NULL; 1132 } else { 1133 down_write(&tq_list_sem); 1134 tq->tq_instance = taskq_find_by_name(name) + 1; 1135 list_add_tail(&tq->tq_taskqs, &tq_list); 1136 up_write(&tq_list_sem); 1137 } 1138 1139 return (tq); 1140 } 1141 EXPORT_SYMBOL(taskq_create); 1142 1143 void 1144 taskq_destroy(taskq_t *tq) 1145 { 1146 struct task_struct *thread; 1147 taskq_thread_t *tqt; 1148 taskq_ent_t *t; 1149 unsigned long flags; 1150 1151 ASSERT(tq); 1152 spin_lock_irqsave_nested(&tq->tq_lock, flags, tq->tq_lock_class); 1153 tq->tq_flags &= ~TASKQ_ACTIVE; 1154 spin_unlock_irqrestore(&tq->tq_lock, flags); 1155 1156 #ifdef HAVE_CPU_HOTPLUG 1157 if (tq->tq_hp_support) { 1158 VERIFY0(cpuhp_state_remove_instance_nocalls( 1159 spl_taskq_cpuhp_state, &tq->tq_hp_cb_node)); 1160 } 1161 #endif 1162 /* 1163 * When TASKQ_ACTIVE is clear new tasks may not be added nor may 1164 * new worker threads be spawned for dynamic taskq. 1165 */ 1166 if (dynamic_taskq != NULL) 1167 taskq_wait_outstanding(dynamic_taskq, 0); 1168 1169 taskq_wait(tq); 1170 1171 /* remove taskq from global list used by the kstats */ 1172 down_write(&tq_list_sem); 1173 list_del(&tq->tq_taskqs); 1174 up_write(&tq_list_sem); 1175 1176 spin_lock_irqsave_nested(&tq->tq_lock, flags, tq->tq_lock_class); 1177 /* wait for spawning threads to insert themselves to the list */ 1178 while (tq->tq_nspawn) { 1179 spin_unlock_irqrestore(&tq->tq_lock, flags); 1180 schedule_timeout_interruptible(1); 1181 spin_lock_irqsave_nested(&tq->tq_lock, flags, 1182 tq->tq_lock_class); 1183 } 1184 1185 /* 1186 * Signal each thread to exit and block until it does. Each thread 1187 * is responsible for removing itself from the list and freeing its 1188 * taskq_thread_t. This allows for idle threads to opt to remove 1189 * themselves from the taskq. They can be recreated as needed. 
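 * tq_lock is dropped around each kthread_stop() below because the exiting
 * thread must retake it to unlink itself and free its taskq_thread_t.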
1190 */ 1191 while (!list_empty(&tq->tq_thread_list)) { 1192 tqt = list_entry(tq->tq_thread_list.next, 1193 taskq_thread_t, tqt_thread_list); 1194 thread = tqt->tqt_thread; 1195 spin_unlock_irqrestore(&tq->tq_lock, flags); 1196 1197 kthread_stop(thread); 1198 1199 spin_lock_irqsave_nested(&tq->tq_lock, flags, 1200 tq->tq_lock_class); 1201 } 1202 1203 while (!list_empty(&tq->tq_free_list)) { 1204 t = list_entry(tq->tq_free_list.next, taskq_ent_t, tqent_list); 1205 1206 ASSERT(!(t->tqent_flags & TQENT_FLAG_PREALLOC)); 1207 1208 list_del_init(&t->tqent_list); 1209 task_free(tq, t); 1210 } 1211 1212 ASSERT0(tq->tq_nthreads); 1213 ASSERT0(tq->tq_nalloc); 1214 ASSERT0(tq->tq_nspawn); 1215 ASSERT(list_empty(&tq->tq_thread_list)); 1216 ASSERT(list_empty(&tq->tq_active_list)); 1217 ASSERT(list_empty(&tq->tq_free_list)); 1218 ASSERT(list_empty(&tq->tq_pend_list)); 1219 ASSERT(list_empty(&tq->tq_prio_list)); 1220 ASSERT(list_empty(&tq->tq_delay_list)); 1221 1222 spin_unlock_irqrestore(&tq->tq_lock, flags); 1223 1224 kmem_strfree(tq->tq_name); 1225 kmem_free(tq, sizeof (taskq_t)); 1226 } 1227 EXPORT_SYMBOL(taskq_destroy); 1228 1229 static unsigned int spl_taskq_kick = 0; 1230 1231 /* 1232 * 2.6.36 API Change 1233 * module_param_cb is introduced to take kernel_param_ops and 1234 * module_param_call is marked as obsolete. Also set and get operations 1235 * were changed to take a 'const struct kernel_param *'. 1236 */ 1237 static int 1238 #ifdef module_param_cb 1239 param_set_taskq_kick(const char *val, const struct kernel_param *kp) 1240 #else 1241 param_set_taskq_kick(const char *val, struct kernel_param *kp) 1242 #endif 1243 { 1244 int ret; 1245 taskq_t *tq = NULL; 1246 taskq_ent_t *t; 1247 unsigned long flags; 1248 1249 ret = param_set_uint(val, kp); 1250 if (ret < 0 || !spl_taskq_kick) 1251 return (ret); 1252 /* reset value */ 1253 spl_taskq_kick = 0; 1254 1255 down_read(&tq_list_sem); 1256 list_for_each_entry(tq, &tq_list, tq_taskqs) { 1257 spin_lock_irqsave_nested(&tq->tq_lock, flags, 1258 tq->tq_lock_class); 1259 /* Check if the first pending is older than 5 seconds */ 1260 t = taskq_next_ent(tq); 1261 if (t && time_after(jiffies, t->tqent_birth + 5*HZ)) { 1262 (void) taskq_thread_spawn(tq); 1263 printk(KERN_INFO "spl: Kicked taskq %s/%d\n", 1264 tq->tq_name, tq->tq_instance); 1265 } 1266 spin_unlock_irqrestore(&tq->tq_lock, flags); 1267 } 1268 up_read(&tq_list_sem); 1269 return (ret); 1270 } 1271 1272 #ifdef module_param_cb 1273 static const struct kernel_param_ops param_ops_taskq_kick = { 1274 .set = param_set_taskq_kick, 1275 .get = param_get_uint, 1276 }; 1277 module_param_cb(spl_taskq_kick, &param_ops_taskq_kick, &spl_taskq_kick, 0644); 1278 #else 1279 module_param_call(spl_taskq_kick, param_set_taskq_kick, param_get_uint, 1280 &spl_taskq_kick, 0644); 1281 #endif 1282 MODULE_PARM_DESC(spl_taskq_kick, 1283 "Write nonzero to kick stuck taskqs to spawn more threads"); 1284 1285 #ifdef HAVE_CPU_HOTPLUG 1286 /* 1287 * This callback will be called exactly once for each core that comes online, 1288 * for each dynamic taskq. We attempt to expand taskqs that have 1289 * TASKQ_THREADS_CPU_PCT set. We need to redo the percentage calculation every 1290 * time, to correctly determine whether or not to add a thread.
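 *
 * Worked example (illustrative arithmetic only): a taskq created with
 * TASKQ_THREADS_CPU_PCT and tq_cpu_pct = 75, with eight CPUs already
 * online when a ninth comes up, recomputes tq_maxthreads as
 * MAX(((8 + 1) * 75) / 100, 1) = 6; a thread is then added only if the
 * taskq is not dynamically managed (TASKQ_DYNAMIC with
 * spl_taskq_thread_dynamic set) and tq_nthreads is below the new maximum.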
1291 */ 1292 static int 1293 spl_taskq_expand(unsigned int cpu, struct hlist_node *node) 1294 { 1295 taskq_t *tq = list_entry(node, taskq_t, tq_hp_cb_node); 1296 unsigned long flags; 1297 int err = 0; 1298 1299 ASSERT(tq); 1300 spin_lock_irqsave_nested(&tq->tq_lock, flags, tq->tq_lock_class); 1301 1302 if (!(tq->tq_flags & TASKQ_ACTIVE)) 1303 goto out; 1304 1305 ASSERT(tq->tq_flags & TASKQ_THREADS_CPU_PCT); 1306 int nthreads = MIN(tq->tq_cpu_pct, 100); 1307 nthreads = MAX(((num_online_cpus() + 1) * nthreads) / 100, 1); 1308 tq->tq_maxthreads = nthreads; 1309 1310 if (!((tq->tq_flags & TASKQ_DYNAMIC) && spl_taskq_thread_dynamic) && 1311 tq->tq_maxthreads > tq->tq_nthreads) { 1312 ASSERT3U(tq->tq_maxthreads, ==, tq->tq_nthreads + 1); 1313 taskq_thread_t *tqt = taskq_thread_create(tq); 1314 if (tqt == NULL) 1315 err = -1; 1316 } 1317 1318 out: 1319 spin_unlock_irqrestore(&tq->tq_lock, flags); 1320 return (err); 1321 } 1322 1323 /* 1324 * While we don't support offlining CPUs, it is possible that CPUs will fail 1325 * to online successfully. We do need to be able to handle this case 1326 * gracefully. 1327 */ 1328 static int 1329 spl_taskq_prepare_down(unsigned int cpu, struct hlist_node *node) 1330 { 1331 taskq_t *tq = list_entry(node, taskq_t, tq_hp_cb_node); 1332 unsigned long flags; 1333 1334 ASSERT(tq); 1335 spin_lock_irqsave_nested(&tq->tq_lock, flags, tq->tq_lock_class); 1336 1337 if (!(tq->tq_flags & TASKQ_ACTIVE)) 1338 goto out; 1339 1340 ASSERT(tq->tq_flags & TASKQ_THREADS_CPU_PCT); 1341 int nthreads = MIN(tq->tq_cpu_pct, 100); 1342 nthreads = MAX(((num_online_cpus()) * nthreads) / 100, 1); 1343 tq->tq_maxthreads = nthreads; 1344 1345 if (!((tq->tq_flags & TASKQ_DYNAMIC) && spl_taskq_thread_dynamic) && 1346 tq->tq_maxthreads < tq->tq_nthreads) { 1347 ASSERT3U(tq->tq_maxthreads, ==, tq->tq_nthreads - 1); 1348 taskq_thread_t *tqt = list_entry(tq->tq_thread_list.next, 1349 taskq_thread_t, tqt_thread_list); 1350 struct task_struct *thread = tqt->tqt_thread; 1351 spin_unlock_irqrestore(&tq->tq_lock, flags); 1352 1353 kthread_stop(thread); 1354 1355 return (0); 1356 } 1357 1358 out: 1359 spin_unlock_irqrestore(&tq->tq_lock, flags); 1360 return (0); 1361 } 1362 #endif 1363 1364 int 1365 spl_taskq_init(void) 1366 { 1367 init_rwsem(&tq_list_sem); 1368 tsd_create(&taskq_tsd, NULL); 1369 1370 #ifdef HAVE_CPU_HOTPLUG 1371 spl_taskq_cpuhp_state = cpuhp_setup_state_multi(CPUHP_AP_ONLINE_DYN, 1372 "fs/spl_taskq:online", spl_taskq_expand, spl_taskq_prepare_down); 1373 #endif 1374 1375 system_taskq = taskq_create("spl_system_taskq", MAX(boot_ncpus, 64), 1376 maxclsyspri, boot_ncpus, INT_MAX, TASKQ_PREPOPULATE|TASKQ_DYNAMIC); 1377 if (system_taskq == NULL) 1378 return (1); 1379 1380 system_delay_taskq = taskq_create("spl_delay_taskq", MAX(boot_ncpus, 4), 1381 maxclsyspri, boot_ncpus, INT_MAX, TASKQ_PREPOPULATE|TASKQ_DYNAMIC); 1382 if (system_delay_taskq == NULL) { 1383 #ifdef HAVE_CPU_HOTPLUG 1384 cpuhp_remove_multi_state(spl_taskq_cpuhp_state); 1385 #endif 1386 taskq_destroy(system_taskq); 1387 return (1); 1388 } 1389 1390 dynamic_taskq = taskq_create("spl_dynamic_taskq", 1, 1391 maxclsyspri, boot_ncpus, INT_MAX, TASKQ_PREPOPULATE); 1392 if (dynamic_taskq == NULL) { 1393 #ifdef HAVE_CPU_HOTPLUG 1394 cpuhp_remove_multi_state(spl_taskq_cpuhp_state); 1395 #endif 1396 taskq_destroy(system_taskq); 1397 taskq_destroy(system_delay_taskq); 1398 return (1); 1399 } 1400 1401 /* 1402 * This is used to annotate tq_lock, so 1403 * taskq_dispatch -> taskq_thread_spawn -> taskq_dispatch 1404 * does not trigger a 
lockdep warning re: possible recursive locking 1405 */ 1406 dynamic_taskq->tq_lock_class = TQ_LOCK_DYNAMIC; 1407 1408 return (0); 1409 } 1410 1411 void 1412 spl_taskq_fini(void) 1413 { 1414 taskq_destroy(dynamic_taskq); 1415 dynamic_taskq = NULL; 1416 1417 taskq_destroy(system_delay_taskq); 1418 system_delay_taskq = NULL; 1419 1420 taskq_destroy(system_taskq); 1421 system_taskq = NULL; 1422 1423 tsd_destroy(&taskq_tsd); 1424 1425 #ifdef HAVE_CPU_HOTPLUG 1426 cpuhp_remove_multi_state(spl_taskq_cpuhp_state); 1427 spl_taskq_cpuhp_state = 0; 1428 #endif 1429 } 1430
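/*
 * Illustrative consumer-side sketch (not part of the original file;
 * tq_example, my_func, and my_arg are placeholder names) of the typical
 * life cycle exported above:
 *
 *	taskq_t *tq_example = taskq_create("example", 4, maxclsyspri,
 *	    4, INT_MAX, TASKQ_PREPOPULATE);
 *	(void) taskq_dispatch(tq_example, my_func, my_arg, TQ_SLEEP);
 *	taskq_wait(tq_example);
 *	taskq_destroy(tq_example);
 */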