1 /* 2 * Copyright (C) 2007-2010 Lawrence Livermore National Security, LLC. 3 * Copyright (C) 2007 The Regents of the University of California. 4 * Produced at Lawrence Livermore National Laboratory (cf, DISCLAIMER). 5 * Written by Brian Behlendorf <behlendorf1@llnl.gov>. 6 * UCRL-CODE-235197 7 * 8 * This file is part of the SPL, Solaris Porting Layer. 9 * 10 * The SPL is free software; you can redistribute it and/or modify it 11 * under the terms of the GNU General Public License as published by the 12 * Free Software Foundation; either version 2 of the License, or (at your 13 * option) any later version. 14 * 15 * The SPL is distributed in the hope that it will be useful, but WITHOUT 16 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 17 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 18 * for more details. 19 * 20 * You should have received a copy of the GNU General Public License along 21 * with the SPL. If not, see <http://www.gnu.org/licenses/>. 22 * 23 * Solaris Porting Layer (SPL) Task Queue Implementation. 
*/

#include <sys/timer.h>
#include <sys/taskq.h>
#include <sys/kmem.h>
#include <sys/tsd.h>
#include <sys/trace_spl.h>
#ifdef HAVE_CPU_HOTPLUG
#include <linux/cpuhotplug.h>
#endif

/*
 * When non-zero, taskq_thread_create() binds every new worker thread to a
 * CPU, advancing through the online CPUs one thread at a time.
 */
int spl_taskq_thread_bind = 0;
module_param(spl_taskq_thread_bind, int, 0644);
MODULE_PARM_DESC(spl_taskq_thread_bind, "Bind taskq thread to CPU by default");


/*
 * When non-zero, a TASKQ_DYNAMIC taskq is created with a single thread and
 * idle surplus threads are allowed to exit.  When zero, taskq_create() starts
 * the full thread count up front and taskq_thread_should_stop() never
 * retires a thread.
 */
int spl_taskq_thread_dynamic = 1;
module_param(spl_taskq_thread_dynamic, int, 0444);
MODULE_PARM_DESC(spl_taskq_thread_dynamic, "Allow dynamic taskq threads");

/*
 * When non-zero, taskq_thread_create() applies the taskq's tq_pri to each
 * new thread via set_user_nice().
 */
int spl_taskq_thread_priority = 1;
module_param(spl_taskq_thread_priority, int, 0644);
MODULE_PARM_DESC(spl_taskq_thread_priority,
	"Allow non-default priority for taskq threads");

/*
 * A worker thread that has executed more than this many tasks without going
 * back to sleep asks for an additional thread to be spawned.
 */
int spl_taskq_thread_sequential = 4;
module_param(spl_taskq_thread_sequential, int, 0644);
MODULE_PARM_DESC(spl_taskq_thread_sequential,
	"Create new taskq threads after N sequential tasks");

/*
 * Global system-wide dynamic task queue available for all consumers. This
 * taskq is not intended for long-running tasks; instead, a dedicated taskq
 * should be created.
 */
taskq_t *system_taskq;
EXPORT_SYMBOL(system_taskq);
/* Global dynamic task queue for long delay */
taskq_t *system_delay_taskq;
EXPORT_SYMBOL(system_delay_taskq);

/* Private dedicated taskq for creating new taskq threads on demand. */
static taskq_t *dynamic_taskq;
/* Forward declaration; the definition follows the worker thread function. */
static taskq_thread_t *taskq_thread_create(taskq_t *);

#ifdef HAVE_CPU_HOTPLUG
/* Multi-callback id for cpu hotplugging.
*/ 71 static int spl_taskq_cpuhp_state; 72 #endif 73 74 /* List of all taskqs */ 75 LIST_HEAD(tq_list); 76 struct rw_semaphore tq_list_sem; 77 static uint_t taskq_tsd; 78 79 static int 80 task_km_flags(uint_t flags) 81 { 82 if (flags & TQ_NOSLEEP) 83 return (KM_NOSLEEP); 84 85 if (flags & TQ_PUSHPAGE) 86 return (KM_PUSHPAGE); 87 88 return (KM_SLEEP); 89 } 90 91 /* 92 * taskq_find_by_name - Find the largest instance number of a named taskq. 93 */ 94 static int 95 taskq_find_by_name(const char *name) 96 { 97 struct list_head *tql = NULL; 98 taskq_t *tq; 99 100 list_for_each_prev(tql, &tq_list) { 101 tq = list_entry(tql, taskq_t, tq_taskqs); 102 if (strcmp(name, tq->tq_name) == 0) 103 return (tq->tq_instance); 104 } 105 return (-1); 106 } 107 108 /* 109 * NOTE: Must be called with tq->tq_lock held, returns a list_t which 110 * is not attached to the free, work, or pending taskq lists. 111 */ 112 static taskq_ent_t * 113 task_alloc(taskq_t *tq, uint_t flags, unsigned long *irqflags) 114 { 115 taskq_ent_t *t; 116 int count = 0; 117 118 ASSERT(tq); 119 retry: 120 /* Acquire taskq_ent_t's from free list if available */ 121 if (!list_empty(&tq->tq_free_list) && !(flags & TQ_NEW)) { 122 t = list_entry(tq->tq_free_list.next, taskq_ent_t, tqent_list); 123 124 ASSERT(!(t->tqent_flags & TQENT_FLAG_PREALLOC)); 125 ASSERT(!(t->tqent_flags & TQENT_FLAG_CANCEL)); 126 ASSERT(!timer_pending(&t->tqent_timer)); 127 128 list_del_init(&t->tqent_list); 129 return (t); 130 } 131 132 /* Free list is empty and memory allocations are prohibited */ 133 if (flags & TQ_NOALLOC) 134 return (NULL); 135 136 /* Hit maximum taskq_ent_t pool size */ 137 if (tq->tq_nalloc >= tq->tq_maxalloc) { 138 if (flags & TQ_NOSLEEP) 139 return (NULL); 140 141 /* 142 * Sleep periodically polling the free list for an available 143 * taskq_ent_t. 
Dispatching with TQ_SLEEP should always succeed 144 * but we cannot block forever waiting for an taskq_ent_t to 145 * show up in the free list, otherwise a deadlock can happen. 146 * 147 * Therefore, we need to allocate a new task even if the number 148 * of allocated tasks is above tq->tq_maxalloc, but we still 149 * end up delaying the task allocation by one second, thereby 150 * throttling the task dispatch rate. 151 */ 152 spin_unlock_irqrestore(&tq->tq_lock, *irqflags); 153 schedule_timeout(HZ / 100); 154 spin_lock_irqsave_nested(&tq->tq_lock, *irqflags, 155 tq->tq_lock_class); 156 if (count < 100) { 157 count++; 158 goto retry; 159 } 160 } 161 162 spin_unlock_irqrestore(&tq->tq_lock, *irqflags); 163 t = kmem_alloc(sizeof (taskq_ent_t), task_km_flags(flags)); 164 spin_lock_irqsave_nested(&tq->tq_lock, *irqflags, tq->tq_lock_class); 165 166 if (t) { 167 taskq_init_ent(t); 168 tq->tq_nalloc++; 169 } 170 171 return (t); 172 } 173 174 /* 175 * NOTE: Must be called with tq->tq_lock held, expects the taskq_ent_t 176 * to already be removed from the free, work, or pending taskq lists. 177 */ 178 static void 179 task_free(taskq_t *tq, taskq_ent_t *t) 180 { 181 ASSERT(tq); 182 ASSERT(t); 183 ASSERT(list_empty(&t->tqent_list)); 184 ASSERT(!timer_pending(&t->tqent_timer)); 185 186 kmem_free(t, sizeof (taskq_ent_t)); 187 tq->tq_nalloc--; 188 } 189 190 /* 191 * NOTE: Must be called with tq->tq_lock held, either destroys the 192 * taskq_ent_t if too many exist or moves it to the free list for later use. 
193 */ 194 static void 195 task_done(taskq_t *tq, taskq_ent_t *t) 196 { 197 ASSERT(tq); 198 ASSERT(t); 199 200 /* Wake tasks blocked in taskq_wait_id() */ 201 wake_up_all(&t->tqent_waitq); 202 203 list_del_init(&t->tqent_list); 204 205 if (tq->tq_nalloc <= tq->tq_minalloc) { 206 t->tqent_id = TASKQID_INVALID; 207 t->tqent_func = NULL; 208 t->tqent_arg = NULL; 209 t->tqent_flags = 0; 210 211 list_add_tail(&t->tqent_list, &tq->tq_free_list); 212 } else { 213 task_free(tq, t); 214 } 215 } 216 217 /* 218 * When a delayed task timer expires remove it from the delay list and 219 * add it to the priority list in order for immediate processing. 220 */ 221 static void 222 task_expire_impl(taskq_ent_t *t) 223 { 224 taskq_ent_t *w; 225 taskq_t *tq = t->tqent_taskq; 226 struct list_head *l = NULL; 227 unsigned long flags; 228 229 spin_lock_irqsave_nested(&tq->tq_lock, flags, tq->tq_lock_class); 230 231 if (t->tqent_flags & TQENT_FLAG_CANCEL) { 232 ASSERT(list_empty(&t->tqent_list)); 233 spin_unlock_irqrestore(&tq->tq_lock, flags); 234 return; 235 } 236 237 t->tqent_birth = jiffies; 238 DTRACE_PROBE1(taskq_ent__birth, taskq_ent_t *, t); 239 240 /* 241 * The priority list must be maintained in strict task id order 242 * from lowest to highest for lowest_id to be easily calculable. 243 */ 244 list_del(&t->tqent_list); 245 list_for_each_prev(l, &tq->tq_prio_list) { 246 w = list_entry(l, taskq_ent_t, tqent_list); 247 if (w->tqent_id < t->tqent_id) { 248 list_add(&t->tqent_list, l); 249 break; 250 } 251 } 252 if (l == &tq->tq_prio_list) 253 list_add(&t->tqent_list, &tq->tq_prio_list); 254 255 spin_unlock_irqrestore(&tq->tq_lock, flags); 256 257 wake_up(&tq->tq_work_waitq); 258 } 259 260 static void 261 task_expire(spl_timer_list_t tl) 262 { 263 struct timer_list *tmr = (struct timer_list *)tl; 264 taskq_ent_t *t = from_timer(t, tmr, tqent_timer); 265 task_expire_impl(t); 266 } 267 268 /* 269 * Returns the lowest incomplete taskqid_t. 
The taskqid_t may 270 * be queued on the pending list, on the priority list, on the 271 * delay list, or on the work list currently being handled, but 272 * it is not 100% complete yet. 273 */ 274 static taskqid_t 275 taskq_lowest_id(taskq_t *tq) 276 { 277 taskqid_t lowest_id = tq->tq_next_id; 278 taskq_ent_t *t; 279 taskq_thread_t *tqt; 280 281 if (!list_empty(&tq->tq_pend_list)) { 282 t = list_entry(tq->tq_pend_list.next, taskq_ent_t, tqent_list); 283 lowest_id = MIN(lowest_id, t->tqent_id); 284 } 285 286 if (!list_empty(&tq->tq_prio_list)) { 287 t = list_entry(tq->tq_prio_list.next, taskq_ent_t, tqent_list); 288 lowest_id = MIN(lowest_id, t->tqent_id); 289 } 290 291 if (!list_empty(&tq->tq_delay_list)) { 292 t = list_entry(tq->tq_delay_list.next, taskq_ent_t, tqent_list); 293 lowest_id = MIN(lowest_id, t->tqent_id); 294 } 295 296 if (!list_empty(&tq->tq_active_list)) { 297 tqt = list_entry(tq->tq_active_list.next, taskq_thread_t, 298 tqt_active_list); 299 ASSERT(tqt->tqt_id != TASKQID_INVALID); 300 lowest_id = MIN(lowest_id, tqt->tqt_id); 301 } 302 303 return (lowest_id); 304 } 305 306 /* 307 * Insert a task into a list keeping the list sorted by increasing taskqid. 308 */ 309 static void 310 taskq_insert_in_order(taskq_t *tq, taskq_thread_t *tqt) 311 { 312 taskq_thread_t *w; 313 struct list_head *l = NULL; 314 315 ASSERT(tq); 316 ASSERT(tqt); 317 318 list_for_each_prev(l, &tq->tq_active_list) { 319 w = list_entry(l, taskq_thread_t, tqt_active_list); 320 if (w->tqt_id < tqt->tqt_id) { 321 list_add(&tqt->tqt_active_list, l); 322 break; 323 } 324 } 325 if (l == &tq->tq_active_list) 326 list_add(&tqt->tqt_active_list, &tq->tq_active_list); 327 } 328 329 /* 330 * Find and return a task from the given list if it exists. The list 331 * must be in lowest to highest task id order. 
332 */ 333 static taskq_ent_t * 334 taskq_find_list(taskq_t *tq, struct list_head *lh, taskqid_t id) 335 { 336 struct list_head *l = NULL; 337 taskq_ent_t *t; 338 339 list_for_each(l, lh) { 340 t = list_entry(l, taskq_ent_t, tqent_list); 341 342 if (t->tqent_id == id) 343 return (t); 344 345 if (t->tqent_id > id) 346 break; 347 } 348 349 return (NULL); 350 } 351 352 /* 353 * Find an already dispatched task given the task id regardless of what 354 * state it is in. If a task is still pending it will be returned. 355 * If a task is executing, then -EBUSY will be returned instead. 356 * If the task has already been run then NULL is returned. 357 */ 358 static taskq_ent_t * 359 taskq_find(taskq_t *tq, taskqid_t id) 360 { 361 taskq_thread_t *tqt; 362 struct list_head *l = NULL; 363 taskq_ent_t *t; 364 365 t = taskq_find_list(tq, &tq->tq_delay_list, id); 366 if (t) 367 return (t); 368 369 t = taskq_find_list(tq, &tq->tq_prio_list, id); 370 if (t) 371 return (t); 372 373 t = taskq_find_list(tq, &tq->tq_pend_list, id); 374 if (t) 375 return (t); 376 377 list_for_each(l, &tq->tq_active_list) { 378 tqt = list_entry(l, taskq_thread_t, tqt_active_list); 379 if (tqt->tqt_id == id) { 380 /* 381 * Instead of returning tqt_task, we just return a non 382 * NULL value to prevent misuse, since tqt_task only 383 * has two valid fields. 384 */ 385 return (ERR_PTR(-EBUSY)); 386 } 387 } 388 389 return (NULL); 390 } 391 392 /* 393 * Theory for the taskq_wait_id(), taskq_wait_outstanding(), and 394 * taskq_wait() functions below. 395 * 396 * Taskq waiting is accomplished by tracking the lowest outstanding task 397 * id and the next available task id. As tasks are dispatched they are 398 * added to the tail of the pending, priority, or delay lists. As worker 399 * threads become available the tasks are removed from the heads of these 400 * lists and linked to the worker threads. This ensures the lists are 401 * kept sorted by lowest to highest task id. 
402 * 403 * Therefore the lowest outstanding task id can be quickly determined by 404 * checking the head item from all of these lists. This value is stored 405 * with the taskq as the lowest id. It only needs to be recalculated when 406 * either the task with the current lowest id completes or is canceled. 407 * 408 * By blocking until the lowest task id exceeds the passed task id the 409 * taskq_wait_outstanding() function can be easily implemented. Similarly, 410 * by blocking until the lowest task id matches the next task id taskq_wait() 411 * can be implemented. 412 * 413 * Callers should be aware that when there are multiple worked threads it 414 * is possible for larger task ids to complete before smaller ones. Also 415 * when the taskq contains delay tasks with small task ids callers may 416 * block for a considerable length of time waiting for them to expire and 417 * execute. 418 */ 419 static int 420 taskq_wait_id_check(taskq_t *tq, taskqid_t id) 421 { 422 int rc; 423 unsigned long flags; 424 425 spin_lock_irqsave_nested(&tq->tq_lock, flags, tq->tq_lock_class); 426 rc = (taskq_find(tq, id) == NULL); 427 spin_unlock_irqrestore(&tq->tq_lock, flags); 428 429 return (rc); 430 } 431 432 /* 433 * The taskq_wait_id() function blocks until the passed task id completes. 434 * This does not guarantee that all lower task ids have completed. 435 */ 436 void 437 taskq_wait_id(taskq_t *tq, taskqid_t id) 438 { 439 wait_event(tq->tq_wait_waitq, taskq_wait_id_check(tq, id)); 440 } 441 EXPORT_SYMBOL(taskq_wait_id); 442 443 static int 444 taskq_wait_outstanding_check(taskq_t *tq, taskqid_t id) 445 { 446 int rc; 447 unsigned long flags; 448 449 spin_lock_irqsave_nested(&tq->tq_lock, flags, tq->tq_lock_class); 450 rc = (id < tq->tq_lowest_id); 451 spin_unlock_irqrestore(&tq->tq_lock, flags); 452 453 return (rc); 454 } 455 456 /* 457 * The taskq_wait_outstanding() function will block until all tasks with a 458 * lower taskqid than the passed 'id' have been completed. 
Note that all 459 * task id's are assigned monotonically at dispatch time. Zero may be 460 * passed for the id to indicate all tasks dispatch up to this point, 461 * but not after, should be waited for. 462 */ 463 void 464 taskq_wait_outstanding(taskq_t *tq, taskqid_t id) 465 { 466 id = id ? id : tq->tq_next_id - 1; 467 wait_event(tq->tq_wait_waitq, taskq_wait_outstanding_check(tq, id)); 468 } 469 EXPORT_SYMBOL(taskq_wait_outstanding); 470 471 static int 472 taskq_wait_check(taskq_t *tq) 473 { 474 int rc; 475 unsigned long flags; 476 477 spin_lock_irqsave_nested(&tq->tq_lock, flags, tq->tq_lock_class); 478 rc = (tq->tq_lowest_id == tq->tq_next_id); 479 spin_unlock_irqrestore(&tq->tq_lock, flags); 480 481 return (rc); 482 } 483 484 /* 485 * The taskq_wait() function will block until the taskq is empty. 486 * This means that if a taskq re-dispatches work to itself taskq_wait() 487 * callers will block indefinitely. 488 */ 489 void 490 taskq_wait(taskq_t *tq) 491 { 492 wait_event(tq->tq_wait_waitq, taskq_wait_check(tq)); 493 } 494 EXPORT_SYMBOL(taskq_wait); 495 496 int 497 taskq_member(taskq_t *tq, kthread_t *t) 498 { 499 return (tq == (taskq_t *)tsd_get_by_thread(taskq_tsd, t)); 500 } 501 EXPORT_SYMBOL(taskq_member); 502 503 taskq_t * 504 taskq_of_curthread(void) 505 { 506 return (tsd_get(taskq_tsd)); 507 } 508 EXPORT_SYMBOL(taskq_of_curthread); 509 510 /* 511 * Cancel an already dispatched task given the task id. Still pending tasks 512 * will be immediately canceled, and if the task is active the function will 513 * block until it completes. Preallocated tasks which are canceled must be 514 * freed by the caller. 
515 */ 516 int 517 taskq_cancel_id(taskq_t *tq, taskqid_t id) 518 { 519 taskq_ent_t *t; 520 int rc = ENOENT; 521 unsigned long flags; 522 523 ASSERT(tq); 524 525 spin_lock_irqsave_nested(&tq->tq_lock, flags, tq->tq_lock_class); 526 t = taskq_find(tq, id); 527 if (t && t != ERR_PTR(-EBUSY)) { 528 list_del_init(&t->tqent_list); 529 t->tqent_flags |= TQENT_FLAG_CANCEL; 530 531 /* 532 * When canceling the lowest outstanding task id we 533 * must recalculate the new lowest outstanding id. 534 */ 535 if (tq->tq_lowest_id == t->tqent_id) { 536 tq->tq_lowest_id = taskq_lowest_id(tq); 537 ASSERT3S(tq->tq_lowest_id, >, t->tqent_id); 538 } 539 540 /* 541 * The task_expire() function takes the tq->tq_lock so drop 542 * drop the lock before synchronously cancelling the timer. 543 */ 544 if (timer_pending(&t->tqent_timer)) { 545 spin_unlock_irqrestore(&tq->tq_lock, flags); 546 del_timer_sync(&t->tqent_timer); 547 spin_lock_irqsave_nested(&tq->tq_lock, flags, 548 tq->tq_lock_class); 549 } 550 551 if (!(t->tqent_flags & TQENT_FLAG_PREALLOC)) 552 task_done(tq, t); 553 554 rc = 0; 555 } 556 spin_unlock_irqrestore(&tq->tq_lock, flags); 557 558 if (t == ERR_PTR(-EBUSY)) { 559 taskq_wait_id(tq, id); 560 rc = EBUSY; 561 } 562 563 return (rc); 564 } 565 EXPORT_SYMBOL(taskq_cancel_id); 566 567 static int taskq_thread_spawn(taskq_t *tq); 568 569 taskqid_t 570 taskq_dispatch(taskq_t *tq, task_func_t func, void *arg, uint_t flags) 571 { 572 taskq_ent_t *t; 573 taskqid_t rc = TASKQID_INVALID; 574 unsigned long irqflags; 575 576 ASSERT(tq); 577 ASSERT(func); 578 579 spin_lock_irqsave_nested(&tq->tq_lock, irqflags, tq->tq_lock_class); 580 581 /* Taskq being destroyed and all tasks drained */ 582 if (!(tq->tq_flags & TASKQ_ACTIVE)) 583 goto out; 584 585 /* Do not queue the task unless there is idle thread for it */ 586 ASSERT(tq->tq_nactive <= tq->tq_nthreads); 587 if ((flags & TQ_NOQUEUE) && (tq->tq_nactive == tq->tq_nthreads)) { 588 /* Dynamic taskq may be able to spawn another thread */ 589 
if (!(tq->tq_flags & TASKQ_DYNAMIC) || 590 taskq_thread_spawn(tq) == 0) 591 goto out; 592 } 593 594 if ((t = task_alloc(tq, flags, &irqflags)) == NULL) 595 goto out; 596 597 spin_lock(&t->tqent_lock); 598 599 /* Queue to the front of the list to enforce TQ_NOQUEUE semantics */ 600 if (flags & TQ_NOQUEUE) 601 list_add(&t->tqent_list, &tq->tq_prio_list); 602 /* Queue to the priority list instead of the pending list */ 603 else if (flags & TQ_FRONT) 604 list_add_tail(&t->tqent_list, &tq->tq_prio_list); 605 else 606 list_add_tail(&t->tqent_list, &tq->tq_pend_list); 607 608 t->tqent_id = rc = tq->tq_next_id; 609 tq->tq_next_id++; 610 t->tqent_func = func; 611 t->tqent_arg = arg; 612 t->tqent_taskq = tq; 613 t->tqent_timer.function = NULL; 614 t->tqent_timer.expires = 0; 615 616 t->tqent_birth = jiffies; 617 DTRACE_PROBE1(taskq_ent__birth, taskq_ent_t *, t); 618 619 ASSERT(!(t->tqent_flags & TQENT_FLAG_PREALLOC)); 620 621 spin_unlock(&t->tqent_lock); 622 623 wake_up(&tq->tq_work_waitq); 624 out: 625 /* Spawn additional taskq threads if required. 
*/ 626 if (!(flags & TQ_NOQUEUE) && tq->tq_nactive == tq->tq_nthreads) 627 (void) taskq_thread_spawn(tq); 628 629 spin_unlock_irqrestore(&tq->tq_lock, irqflags); 630 return (rc); 631 } 632 EXPORT_SYMBOL(taskq_dispatch); 633 634 taskqid_t 635 taskq_dispatch_delay(taskq_t *tq, task_func_t func, void *arg, 636 uint_t flags, clock_t expire_time) 637 { 638 taskqid_t rc = TASKQID_INVALID; 639 taskq_ent_t *t; 640 unsigned long irqflags; 641 642 ASSERT(tq); 643 ASSERT(func); 644 645 spin_lock_irqsave_nested(&tq->tq_lock, irqflags, tq->tq_lock_class); 646 647 /* Taskq being destroyed and all tasks drained */ 648 if (!(tq->tq_flags & TASKQ_ACTIVE)) 649 goto out; 650 651 if ((t = task_alloc(tq, flags, &irqflags)) == NULL) 652 goto out; 653 654 spin_lock(&t->tqent_lock); 655 656 /* Queue to the delay list for subsequent execution */ 657 list_add_tail(&t->tqent_list, &tq->tq_delay_list); 658 659 t->tqent_id = rc = tq->tq_next_id; 660 tq->tq_next_id++; 661 t->tqent_func = func; 662 t->tqent_arg = arg; 663 t->tqent_taskq = tq; 664 t->tqent_timer.function = task_expire; 665 t->tqent_timer.expires = (unsigned long)expire_time; 666 add_timer(&t->tqent_timer); 667 668 ASSERT(!(t->tqent_flags & TQENT_FLAG_PREALLOC)); 669 670 spin_unlock(&t->tqent_lock); 671 out: 672 /* Spawn additional taskq threads if required. 
*/ 673 if (tq->tq_nactive == tq->tq_nthreads) 674 (void) taskq_thread_spawn(tq); 675 spin_unlock_irqrestore(&tq->tq_lock, irqflags); 676 return (rc); 677 } 678 EXPORT_SYMBOL(taskq_dispatch_delay); 679 680 void 681 taskq_dispatch_ent(taskq_t *tq, task_func_t func, void *arg, uint_t flags, 682 taskq_ent_t *t) 683 { 684 unsigned long irqflags; 685 ASSERT(tq); 686 ASSERT(func); 687 688 spin_lock_irqsave_nested(&tq->tq_lock, irqflags, 689 tq->tq_lock_class); 690 691 /* Taskq being destroyed and all tasks drained */ 692 if (!(tq->tq_flags & TASKQ_ACTIVE)) { 693 t->tqent_id = TASKQID_INVALID; 694 goto out; 695 } 696 697 if ((flags & TQ_NOQUEUE) && (tq->tq_nactive == tq->tq_nthreads)) { 698 /* Dynamic taskq may be able to spawn another thread */ 699 if (!(tq->tq_flags & TASKQ_DYNAMIC) || 700 taskq_thread_spawn(tq) == 0) 701 goto out2; 702 flags |= TQ_FRONT; 703 } 704 705 spin_lock(&t->tqent_lock); 706 707 /* 708 * Make sure the entry is not on some other taskq; it is important to 709 * ASSERT() under lock 710 */ 711 ASSERT(taskq_empty_ent(t)); 712 713 /* 714 * Mark it as a prealloc'd task. This is important 715 * to ensure that we don't free it later. 716 */ 717 t->tqent_flags |= TQENT_FLAG_PREALLOC; 718 719 /* Queue to the priority list instead of the pending list */ 720 if (flags & TQ_FRONT) 721 list_add_tail(&t->tqent_list, &tq->tq_prio_list); 722 else 723 list_add_tail(&t->tqent_list, &tq->tq_pend_list); 724 725 t->tqent_id = tq->tq_next_id; 726 tq->tq_next_id++; 727 t->tqent_func = func; 728 t->tqent_arg = arg; 729 t->tqent_taskq = tq; 730 731 t->tqent_birth = jiffies; 732 DTRACE_PROBE1(taskq_ent__birth, taskq_ent_t *, t); 733 734 spin_unlock(&t->tqent_lock); 735 736 wake_up(&tq->tq_work_waitq); 737 out: 738 /* Spawn additional taskq threads if required. 
*/ 739 if (tq->tq_nactive == tq->tq_nthreads) 740 (void) taskq_thread_spawn(tq); 741 out2: 742 spin_unlock_irqrestore(&tq->tq_lock, irqflags); 743 } 744 EXPORT_SYMBOL(taskq_dispatch_ent); 745 746 int 747 taskq_empty_ent(taskq_ent_t *t) 748 { 749 return (list_empty(&t->tqent_list)); 750 } 751 EXPORT_SYMBOL(taskq_empty_ent); 752 753 void 754 taskq_init_ent(taskq_ent_t *t) 755 { 756 spin_lock_init(&t->tqent_lock); 757 init_waitqueue_head(&t->tqent_waitq); 758 timer_setup(&t->tqent_timer, NULL, 0); 759 INIT_LIST_HEAD(&t->tqent_list); 760 t->tqent_id = 0; 761 t->tqent_func = NULL; 762 t->tqent_arg = NULL; 763 t->tqent_flags = 0; 764 t->tqent_taskq = NULL; 765 } 766 EXPORT_SYMBOL(taskq_init_ent); 767 768 /* 769 * Return the next pending task, preference is given to tasks on the 770 * priority list which were dispatched with TQ_FRONT. 771 */ 772 static taskq_ent_t * 773 taskq_next_ent(taskq_t *tq) 774 { 775 struct list_head *list; 776 777 if (!list_empty(&tq->tq_prio_list)) 778 list = &tq->tq_prio_list; 779 else if (!list_empty(&tq->tq_pend_list)) 780 list = &tq->tq_pend_list; 781 else 782 return (NULL); 783 784 return (list_entry(list->next, taskq_ent_t, tqent_list)); 785 } 786 787 /* 788 * Spawns a new thread for the specified taskq. 789 */ 790 static void 791 taskq_thread_spawn_task(void *arg) 792 { 793 taskq_t *tq = (taskq_t *)arg; 794 unsigned long flags; 795 796 if (taskq_thread_create(tq) == NULL) { 797 /* restore spawning count if failed */ 798 spin_lock_irqsave_nested(&tq->tq_lock, flags, 799 tq->tq_lock_class); 800 tq->tq_nspawn--; 801 spin_unlock_irqrestore(&tq->tq_lock, flags); 802 } 803 } 804 805 /* 806 * Spawn addition threads for dynamic taskqs (TASKQ_DYNAMIC) the current 807 * number of threads is insufficient to handle the pending tasks. These 808 * new threads must be created by the dedicated dynamic_taskq to avoid 809 * deadlocks between thread creation and memory reclaim. 
The system_taskq 810 * which is also a dynamic taskq cannot be safely used for this. 811 */ 812 static int 813 taskq_thread_spawn(taskq_t *tq) 814 { 815 int spawning = 0; 816 817 if (!(tq->tq_flags & TASKQ_DYNAMIC)) 818 return (0); 819 820 if ((tq->tq_nthreads + tq->tq_nspawn < tq->tq_maxthreads) && 821 (tq->tq_flags & TASKQ_ACTIVE)) { 822 spawning = (++tq->tq_nspawn); 823 taskq_dispatch(dynamic_taskq, taskq_thread_spawn_task, 824 tq, TQ_NOSLEEP); 825 } 826 827 return (spawning); 828 } 829 830 /* 831 * Threads in a dynamic taskq should only exit once it has been completely 832 * drained and no other threads are actively servicing tasks. This prevents 833 * threads from being created and destroyed more than is required. 834 * 835 * The first thread is the thread list is treated as the primary thread. 836 * There is nothing special about the primary thread but in order to avoid 837 * all the taskq pids from changing we opt to make it long running. 838 */ 839 static int 840 taskq_thread_should_stop(taskq_t *tq, taskq_thread_t *tqt) 841 { 842 if (!(tq->tq_flags & TASKQ_DYNAMIC)) 843 return (0); 844 845 if (list_first_entry(&(tq->tq_thread_list), taskq_thread_t, 846 tqt_thread_list) == tqt) 847 return (0); 848 849 return 850 ((tq->tq_nspawn == 0) && /* No threads are being spawned */ 851 (tq->tq_nactive == 0) && /* No threads are handling tasks */ 852 (tq->tq_nthreads > 1) && /* More than 1 thread is running */ 853 (!taskq_next_ent(tq)) && /* There are no pending tasks */ 854 (spl_taskq_thread_dynamic)); /* Dynamic taskqs are allowed */ 855 } 856 857 static int 858 taskq_thread(void *args) 859 { 860 DECLARE_WAITQUEUE(wait, current); 861 sigset_t blocked; 862 taskq_thread_t *tqt = args; 863 taskq_t *tq; 864 taskq_ent_t *t; 865 int seq_tasks = 0; 866 unsigned long flags; 867 taskq_ent_t dup_task = {}; 868 869 ASSERT(tqt); 870 ASSERT(tqt->tqt_tq); 871 tq = tqt->tqt_tq; 872 current->flags |= PF_NOFREEZE; 873 874 (void) spl_fstrans_mark(); 875 876 sigfillset(&blocked); 877 
sigprocmask(SIG_BLOCK, &blocked, NULL); 878 flush_signals(current); 879 880 tsd_set(taskq_tsd, tq); 881 spin_lock_irqsave_nested(&tq->tq_lock, flags, tq->tq_lock_class); 882 /* 883 * If we are dynamically spawned, decrease spawning count. Note that 884 * we could be created during taskq_create, in which case we shouldn't 885 * do the decrement. But it's fine because taskq_create will reset 886 * tq_nspawn later. 887 */ 888 if (tq->tq_flags & TASKQ_DYNAMIC) 889 tq->tq_nspawn--; 890 891 /* Immediately exit if more threads than allowed were created. */ 892 if (tq->tq_nthreads >= tq->tq_maxthreads) 893 goto error; 894 895 tq->tq_nthreads++; 896 list_add_tail(&tqt->tqt_thread_list, &tq->tq_thread_list); 897 wake_up(&tq->tq_wait_waitq); 898 set_current_state(TASK_INTERRUPTIBLE); 899 900 while (!kthread_should_stop()) { 901 902 if (list_empty(&tq->tq_pend_list) && 903 list_empty(&tq->tq_prio_list)) { 904 905 if (taskq_thread_should_stop(tq, tqt)) { 906 wake_up_all(&tq->tq_wait_waitq); 907 break; 908 } 909 910 add_wait_queue_exclusive(&tq->tq_work_waitq, &wait); 911 spin_unlock_irqrestore(&tq->tq_lock, flags); 912 913 schedule(); 914 seq_tasks = 0; 915 916 spin_lock_irqsave_nested(&tq->tq_lock, flags, 917 tq->tq_lock_class); 918 remove_wait_queue(&tq->tq_work_waitq, &wait); 919 } else { 920 __set_current_state(TASK_RUNNING); 921 } 922 923 if ((t = taskq_next_ent(tq)) != NULL) { 924 list_del_init(&t->tqent_list); 925 926 /* 927 * A TQENT_FLAG_PREALLOC task may be reused or freed 928 * during the task function call. Store tqent_id and 929 * tqent_flags here. 930 * 931 * Also use an on stack taskq_ent_t for tqt_task 932 * assignment in this case; we want to make sure 933 * to duplicate all fields, so the values are 934 * correct when it's accessed via DTRACE_PROBE*. 
935 */ 936 tqt->tqt_id = t->tqent_id; 937 tqt->tqt_flags = t->tqent_flags; 938 939 if (t->tqent_flags & TQENT_FLAG_PREALLOC) { 940 dup_task = *t; 941 t = &dup_task; 942 } 943 tqt->tqt_task = t; 944 945 taskq_insert_in_order(tq, tqt); 946 tq->tq_nactive++; 947 spin_unlock_irqrestore(&tq->tq_lock, flags); 948 949 DTRACE_PROBE1(taskq_ent__start, taskq_ent_t *, t); 950 951 /* Perform the requested task */ 952 t->tqent_func(t->tqent_arg); 953 954 DTRACE_PROBE1(taskq_ent__finish, taskq_ent_t *, t); 955 956 spin_lock_irqsave_nested(&tq->tq_lock, flags, 957 tq->tq_lock_class); 958 tq->tq_nactive--; 959 list_del_init(&tqt->tqt_active_list); 960 tqt->tqt_task = NULL; 961 962 /* For prealloc'd tasks, we don't free anything. */ 963 if (!(tqt->tqt_flags & TQENT_FLAG_PREALLOC)) 964 task_done(tq, t); 965 966 /* 967 * When the current lowest outstanding taskqid is 968 * done calculate the new lowest outstanding id 969 */ 970 if (tq->tq_lowest_id == tqt->tqt_id) { 971 tq->tq_lowest_id = taskq_lowest_id(tq); 972 ASSERT3S(tq->tq_lowest_id, >, tqt->tqt_id); 973 } 974 975 /* Spawn additional taskq threads if required. 
*/ 976 if ((++seq_tasks) > spl_taskq_thread_sequential && 977 taskq_thread_spawn(tq)) 978 seq_tasks = 0; 979 980 tqt->tqt_id = TASKQID_INVALID; 981 tqt->tqt_flags = 0; 982 wake_up_all(&tq->tq_wait_waitq); 983 } else { 984 if (taskq_thread_should_stop(tq, tqt)) 985 break; 986 } 987 988 set_current_state(TASK_INTERRUPTIBLE); 989 990 } 991 992 __set_current_state(TASK_RUNNING); 993 tq->tq_nthreads--; 994 list_del_init(&tqt->tqt_thread_list); 995 error: 996 kmem_free(tqt, sizeof (taskq_thread_t)); 997 spin_unlock_irqrestore(&tq->tq_lock, flags); 998 999 tsd_set(taskq_tsd, NULL); 1000 thread_exit(); 1001 1002 return (0); 1003 } 1004 1005 static taskq_thread_t * 1006 taskq_thread_create(taskq_t *tq) 1007 { 1008 static int last_used_cpu = 0; 1009 taskq_thread_t *tqt; 1010 1011 tqt = kmem_alloc(sizeof (*tqt), KM_PUSHPAGE); 1012 INIT_LIST_HEAD(&tqt->tqt_thread_list); 1013 INIT_LIST_HEAD(&tqt->tqt_active_list); 1014 tqt->tqt_tq = tq; 1015 tqt->tqt_id = TASKQID_INVALID; 1016 1017 tqt->tqt_thread = spl_kthread_create(taskq_thread, tqt, 1018 "%s", tq->tq_name); 1019 if (tqt->tqt_thread == NULL) { 1020 kmem_free(tqt, sizeof (taskq_thread_t)); 1021 return (NULL); 1022 } 1023 1024 if (spl_taskq_thread_bind) { 1025 last_used_cpu = (last_used_cpu + 1) % num_online_cpus(); 1026 kthread_bind(tqt->tqt_thread, last_used_cpu); 1027 } 1028 1029 if (spl_taskq_thread_priority) 1030 set_user_nice(tqt->tqt_thread, PRIO_TO_NICE(tq->tq_pri)); 1031 1032 wake_up_process(tqt->tqt_thread); 1033 1034 return (tqt); 1035 } 1036 1037 taskq_t * 1038 taskq_create(const char *name, int threads_arg, pri_t pri, 1039 int minalloc, int maxalloc, uint_t flags) 1040 { 1041 taskq_t *tq; 1042 taskq_thread_t *tqt; 1043 int count = 0, rc = 0, i; 1044 unsigned long irqflags; 1045 int nthreads = threads_arg; 1046 1047 ASSERT(name != NULL); 1048 ASSERT(minalloc >= 0); 1049 ASSERT(maxalloc <= INT_MAX); 1050 ASSERT(!(flags & (TASKQ_CPR_SAFE))); /* Unsupported */ 1051 1052 /* Scale the number of threads using nthreads as 
a percentage */ 1053 if (flags & TASKQ_THREADS_CPU_PCT) { 1054 ASSERT(nthreads <= 100); 1055 ASSERT(nthreads >= 0); 1056 nthreads = MIN(threads_arg, 100); 1057 nthreads = MAX(nthreads, 0); 1058 nthreads = MAX((num_online_cpus() * nthreads) /100, 1); 1059 } 1060 1061 tq = kmem_alloc(sizeof (*tq), KM_PUSHPAGE); 1062 if (tq == NULL) 1063 return (NULL); 1064 1065 tq->tq_hp_support = B_FALSE; 1066 #ifdef HAVE_CPU_HOTPLUG 1067 if (flags & TASKQ_THREADS_CPU_PCT) { 1068 tq->tq_hp_support = B_TRUE; 1069 if (cpuhp_state_add_instance_nocalls(spl_taskq_cpuhp_state, 1070 &tq->tq_hp_cb_node) != 0) { 1071 kmem_free(tq, sizeof (*tq)); 1072 return (NULL); 1073 } 1074 } 1075 #endif 1076 1077 spin_lock_init(&tq->tq_lock); 1078 INIT_LIST_HEAD(&tq->tq_thread_list); 1079 INIT_LIST_HEAD(&tq->tq_active_list); 1080 tq->tq_name = kmem_strdup(name); 1081 tq->tq_nactive = 0; 1082 tq->tq_nthreads = 0; 1083 tq->tq_nspawn = 0; 1084 tq->tq_maxthreads = nthreads; 1085 tq->tq_cpu_pct = threads_arg; 1086 tq->tq_pri = pri; 1087 tq->tq_minalloc = minalloc; 1088 tq->tq_maxalloc = maxalloc; 1089 tq->tq_nalloc = 0; 1090 tq->tq_flags = (flags | TASKQ_ACTIVE); 1091 tq->tq_next_id = TASKQID_INITIAL; 1092 tq->tq_lowest_id = TASKQID_INITIAL; 1093 INIT_LIST_HEAD(&tq->tq_free_list); 1094 INIT_LIST_HEAD(&tq->tq_pend_list); 1095 INIT_LIST_HEAD(&tq->tq_prio_list); 1096 INIT_LIST_HEAD(&tq->tq_delay_list); 1097 init_waitqueue_head(&tq->tq_work_waitq); 1098 init_waitqueue_head(&tq->tq_wait_waitq); 1099 tq->tq_lock_class = TQ_LOCK_GENERAL; 1100 INIT_LIST_HEAD(&tq->tq_taskqs); 1101 1102 if (flags & TASKQ_PREPOPULATE) { 1103 spin_lock_irqsave_nested(&tq->tq_lock, irqflags, 1104 tq->tq_lock_class); 1105 1106 for (i = 0; i < minalloc; i++) 1107 task_done(tq, task_alloc(tq, TQ_PUSHPAGE | TQ_NEW, 1108 &irqflags)); 1109 1110 spin_unlock_irqrestore(&tq->tq_lock, irqflags); 1111 } 1112 1113 if ((flags & TASKQ_DYNAMIC) && spl_taskq_thread_dynamic) 1114 nthreads = 1; 1115 1116 for (i = 0; i < nthreads; i++) { 1117 tqt = 
taskq_thread_create(tq); 1118 if (tqt == NULL) 1119 rc = 1; 1120 else 1121 count++; 1122 } 1123 1124 /* Wait for all threads to be started before potential destroy */ 1125 wait_event(tq->tq_wait_waitq, tq->tq_nthreads == count); 1126 /* 1127 * taskq_thread might have touched nspawn, but we don't want them to 1128 * because they're not dynamically spawned. So we reset it to 0 1129 */ 1130 tq->tq_nspawn = 0; 1131 1132 if (rc) { 1133 taskq_destroy(tq); 1134 tq = NULL; 1135 } else { 1136 down_write(&tq_list_sem); 1137 tq->tq_instance = taskq_find_by_name(name) + 1; 1138 list_add_tail(&tq->tq_taskqs, &tq_list); 1139 up_write(&tq_list_sem); 1140 } 1141 1142 return (tq); 1143 } 1144 EXPORT_SYMBOL(taskq_create); 1145 1146 void 1147 taskq_destroy(taskq_t *tq) 1148 { 1149 struct task_struct *thread; 1150 taskq_thread_t *tqt; 1151 taskq_ent_t *t; 1152 unsigned long flags; 1153 1154 ASSERT(tq); 1155 spin_lock_irqsave_nested(&tq->tq_lock, flags, tq->tq_lock_class); 1156 tq->tq_flags &= ~TASKQ_ACTIVE; 1157 spin_unlock_irqrestore(&tq->tq_lock, flags); 1158 1159 #ifdef HAVE_CPU_HOTPLUG 1160 if (tq->tq_hp_support) { 1161 VERIFY0(cpuhp_state_remove_instance_nocalls( 1162 spl_taskq_cpuhp_state, &tq->tq_hp_cb_node)); 1163 } 1164 #endif 1165 /* 1166 * When TASKQ_ACTIVE is clear new tasks may not be added nor may 1167 * new worker threads be spawned for dynamic taskq. 
1168 */ 1169 if (dynamic_taskq != NULL) 1170 taskq_wait_outstanding(dynamic_taskq, 0); 1171 1172 taskq_wait(tq); 1173 1174 /* remove taskq from global list used by the kstats */ 1175 down_write(&tq_list_sem); 1176 list_del(&tq->tq_taskqs); 1177 up_write(&tq_list_sem); 1178 1179 spin_lock_irqsave_nested(&tq->tq_lock, flags, tq->tq_lock_class); 1180 /* wait for spawning threads to insert themselves to the list */ 1181 while (tq->tq_nspawn) { 1182 spin_unlock_irqrestore(&tq->tq_lock, flags); 1183 schedule_timeout_interruptible(1); 1184 spin_lock_irqsave_nested(&tq->tq_lock, flags, 1185 tq->tq_lock_class); 1186 } 1187 1188 /* 1189 * Signal each thread to exit and block until it does. Each thread 1190 * is responsible for removing itself from the list and freeing its 1191 * taskq_thread_t. This allows for idle threads to opt to remove 1192 * themselves from the taskq. They can be recreated as needed. 1193 */ 1194 while (!list_empty(&tq->tq_thread_list)) { 1195 tqt = list_entry(tq->tq_thread_list.next, 1196 taskq_thread_t, tqt_thread_list); 1197 thread = tqt->tqt_thread; 1198 spin_unlock_irqrestore(&tq->tq_lock, flags); 1199 1200 kthread_stop(thread); 1201 1202 spin_lock_irqsave_nested(&tq->tq_lock, flags, 1203 tq->tq_lock_class); 1204 } 1205 1206 while (!list_empty(&tq->tq_free_list)) { 1207 t = list_entry(tq->tq_free_list.next, taskq_ent_t, tqent_list); 1208 1209 ASSERT(!(t->tqent_flags & TQENT_FLAG_PREALLOC)); 1210 1211 list_del_init(&t->tqent_list); 1212 task_free(tq, t); 1213 } 1214 1215 ASSERT0(tq->tq_nthreads); 1216 ASSERT0(tq->tq_nalloc); 1217 ASSERT0(tq->tq_nspawn); 1218 ASSERT(list_empty(&tq->tq_thread_list)); 1219 ASSERT(list_empty(&tq->tq_active_list)); 1220 ASSERT(list_empty(&tq->tq_free_list)); 1221 ASSERT(list_empty(&tq->tq_pend_list)); 1222 ASSERT(list_empty(&tq->tq_prio_list)); 1223 ASSERT(list_empty(&tq->tq_delay_list)); 1224 1225 spin_unlock_irqrestore(&tq->tq_lock, flags); 1226 1227 kmem_strfree(tq->tq_name); 1228 kmem_free(tq, sizeof (taskq_t)); 1229 
} 1230 EXPORT_SYMBOL(taskq_destroy); 1231 1232 static unsigned int spl_taskq_kick = 0; 1233 1234 /* 1235 * 2.6.36 API Change 1236 * module_param_cb is introduced to take kernel_param_ops and 1237 * module_param_call is marked as obsolete. Also set and get operations 1238 * were changed to take a 'const struct kernel_param *'. 1239 */ 1240 static int 1241 #ifdef module_param_cb 1242 param_set_taskq_kick(const char *val, const struct kernel_param *kp) 1243 #else 1244 param_set_taskq_kick(const char *val, struct kernel_param *kp) 1245 #endif 1246 { 1247 int ret; 1248 taskq_t *tq = NULL; 1249 taskq_ent_t *t; 1250 unsigned long flags; 1251 1252 ret = param_set_uint(val, kp); 1253 if (ret < 0 || !spl_taskq_kick) 1254 return (ret); 1255 /* reset value */ 1256 spl_taskq_kick = 0; 1257 1258 down_read(&tq_list_sem); 1259 list_for_each_entry(tq, &tq_list, tq_taskqs) { 1260 spin_lock_irqsave_nested(&tq->tq_lock, flags, 1261 tq->tq_lock_class); 1262 /* Check if the first pending is older than 5 seconds */ 1263 t = taskq_next_ent(tq); 1264 if (t && time_after(jiffies, t->tqent_birth + 5*HZ)) { 1265 (void) taskq_thread_spawn(tq); 1266 printk(KERN_INFO "spl: Kicked taskq %s/%d\n", 1267 tq->tq_name, tq->tq_instance); 1268 } 1269 spin_unlock_irqrestore(&tq->tq_lock, flags); 1270 } 1271 up_read(&tq_list_sem); 1272 return (ret); 1273 } 1274 1275 #ifdef module_param_cb 1276 static const struct kernel_param_ops param_ops_taskq_kick = { 1277 .set = param_set_taskq_kick, 1278 .get = param_get_uint, 1279 }; 1280 module_param_cb(spl_taskq_kick, ¶m_ops_taskq_kick, &spl_taskq_kick, 0644); 1281 #else 1282 module_param_call(spl_taskq_kick, param_set_taskq_kick, param_get_uint, 1283 &spl_taskq_kick, 0644); 1284 #endif 1285 MODULE_PARM_DESC(spl_taskq_kick, 1286 "Write nonzero to kick stuck taskqs to spawn more threads"); 1287 1288 #ifdef HAVE_CPU_HOTPLUG 1289 /* 1290 * This callback will be called exactly once for each core that comes online, 1291 * for each dynamic taskq. 
We attempt to expand taskqs that have 1292 * TASKQ_THREADS_CPU_PCT set. We need to redo the percentage calculation every 1293 * time, to correctly determine whether or not to add a thread. 1294 */ 1295 static int 1296 spl_taskq_expand(unsigned int cpu, struct hlist_node *node) 1297 { 1298 taskq_t *tq = list_entry(node, taskq_t, tq_hp_cb_node); 1299 unsigned long flags; 1300 int err = 0; 1301 1302 ASSERT(tq); 1303 spin_lock_irqsave_nested(&tq->tq_lock, flags, tq->tq_lock_class); 1304 1305 if (!(tq->tq_flags & TASKQ_ACTIVE)) { 1306 spin_unlock_irqrestore(&tq->tq_lock, flags); 1307 return (err); 1308 } 1309 1310 ASSERT(tq->tq_flags & TASKQ_THREADS_CPU_PCT); 1311 int nthreads = MIN(tq->tq_cpu_pct, 100); 1312 nthreads = MAX(((num_online_cpus() + 1) * nthreads) / 100, 1); 1313 tq->tq_maxthreads = nthreads; 1314 1315 if (!((tq->tq_flags & TASKQ_DYNAMIC) && spl_taskq_thread_dynamic) && 1316 tq->tq_maxthreads > tq->tq_nthreads) { 1317 spin_unlock_irqrestore(&tq->tq_lock, flags); 1318 taskq_thread_t *tqt = taskq_thread_create(tq); 1319 if (tqt == NULL) 1320 err = -1; 1321 return (err); 1322 } 1323 spin_unlock_irqrestore(&tq->tq_lock, flags); 1324 return (err); 1325 } 1326 1327 /* 1328 * While we don't support offlining CPUs, it is possible that CPUs will fail 1329 * to online successfully. We do need to be able to handle this case 1330 * gracefully. 
1331 */ 1332 static int 1333 spl_taskq_prepare_down(unsigned int cpu, struct hlist_node *node) 1334 { 1335 taskq_t *tq = list_entry(node, taskq_t, tq_hp_cb_node); 1336 unsigned long flags; 1337 1338 ASSERT(tq); 1339 spin_lock_irqsave_nested(&tq->tq_lock, flags, tq->tq_lock_class); 1340 1341 if (!(tq->tq_flags & TASKQ_ACTIVE)) 1342 goto out; 1343 1344 ASSERT(tq->tq_flags & TASKQ_THREADS_CPU_PCT); 1345 int nthreads = MIN(tq->tq_cpu_pct, 100); 1346 nthreads = MAX(((num_online_cpus()) * nthreads) / 100, 1); 1347 tq->tq_maxthreads = nthreads; 1348 1349 if (!((tq->tq_flags & TASKQ_DYNAMIC) && spl_taskq_thread_dynamic) && 1350 tq->tq_maxthreads < tq->tq_nthreads) { 1351 ASSERT3U(tq->tq_maxthreads, ==, tq->tq_nthreads - 1); 1352 taskq_thread_t *tqt = list_entry(tq->tq_thread_list.next, 1353 taskq_thread_t, tqt_thread_list); 1354 struct task_struct *thread = tqt->tqt_thread; 1355 spin_unlock_irqrestore(&tq->tq_lock, flags); 1356 1357 kthread_stop(thread); 1358 1359 return (0); 1360 } 1361 1362 out: 1363 spin_unlock_irqrestore(&tq->tq_lock, flags); 1364 return (0); 1365 } 1366 #endif 1367 1368 int 1369 spl_taskq_init(void) 1370 { 1371 init_rwsem(&tq_list_sem); 1372 tsd_create(&taskq_tsd, NULL); 1373 1374 #ifdef HAVE_CPU_HOTPLUG 1375 spl_taskq_cpuhp_state = cpuhp_setup_state_multi(CPUHP_AP_ONLINE_DYN, 1376 "fs/spl_taskq:online", spl_taskq_expand, spl_taskq_prepare_down); 1377 #endif 1378 1379 system_taskq = taskq_create("spl_system_taskq", MAX(boot_ncpus, 64), 1380 maxclsyspri, boot_ncpus, INT_MAX, TASKQ_PREPOPULATE|TASKQ_DYNAMIC); 1381 if (system_taskq == NULL) 1382 return (1); 1383 1384 system_delay_taskq = taskq_create("spl_delay_taskq", MAX(boot_ncpus, 4), 1385 maxclsyspri, boot_ncpus, INT_MAX, TASKQ_PREPOPULATE|TASKQ_DYNAMIC); 1386 if (system_delay_taskq == NULL) { 1387 #ifdef HAVE_CPU_HOTPLUG 1388 cpuhp_remove_multi_state(spl_taskq_cpuhp_state); 1389 #endif 1390 taskq_destroy(system_taskq); 1391 return (1); 1392 } 1393 1394 dynamic_taskq = 
taskq_create("spl_dynamic_taskq", 1, 1395 maxclsyspri, boot_ncpus, INT_MAX, TASKQ_PREPOPULATE); 1396 if (dynamic_taskq == NULL) { 1397 #ifdef HAVE_CPU_HOTPLUG 1398 cpuhp_remove_multi_state(spl_taskq_cpuhp_state); 1399 #endif 1400 taskq_destroy(system_taskq); 1401 taskq_destroy(system_delay_taskq); 1402 return (1); 1403 } 1404 1405 /* 1406 * This is used to annotate tq_lock, so 1407 * taskq_dispatch -> taskq_thread_spawn -> taskq_dispatch 1408 * does not trigger a lockdep warning re: possible recursive locking 1409 */ 1410 dynamic_taskq->tq_lock_class = TQ_LOCK_DYNAMIC; 1411 1412 return (0); 1413 } 1414 1415 void 1416 spl_taskq_fini(void) 1417 { 1418 taskq_destroy(dynamic_taskq); 1419 dynamic_taskq = NULL; 1420 1421 taskq_destroy(system_delay_taskq); 1422 system_delay_taskq = NULL; 1423 1424 taskq_destroy(system_taskq); 1425 system_taskq = NULL; 1426 1427 tsd_destroy(&taskq_tsd); 1428 1429 #ifdef HAVE_CPU_HOTPLUG 1430 cpuhp_remove_multi_state(spl_taskq_cpuhp_state); 1431 spl_taskq_cpuhp_state = 0; 1432 #endif 1433 } 1434