// SPDX-License-Identifier: GPL-2.0
/*
 * Basic worker thread pool for io_uring
 *
 * Copyright (C) 2019 Jens Axboe
 *
 */
#include <linux/kernel.h>
#include <linux/init.h>
#include <linux/errno.h>
#include <linux/sched/signal.h>
#include <linux/percpu.h>
#include <linux/slab.h>
#include <linux/rculist_nulls.h>
#include <linux/cpu.h>
#include <linux/cpuset.h>
#include <linux/task_work.h>
#include <linux/audit.h>
#include <linux/mmu_context.h>
#include <uapi/linux/io_uring.h>

#include "io-wq.h"
#include "slist.h"
#include "io_uring.h"

#define WORKER_IDLE_TIMEOUT	(5 * HZ)
#define WORKER_INIT_LIMIT	3

enum {
	IO_WORKER_F_UP		= 0,	/* up and active */
	IO_WORKER_F_RUNNING	= 1,	/* account as running */
	IO_WORKER_F_FREE	= 2,	/* worker on free list */
};

enum {
	IO_WQ_BIT_EXIT		= 0,	/* wq exiting */
};

enum {
	IO_ACCT_STALLED_BIT	= 0,	/* stalled on hash */
};

/*
 * One for each thread in a wq pool
 */
struct io_worker {
	refcount_t ref;
	unsigned long flags;
	struct hlist_nulls_node nulls_node;
	struct list_head all_list;
	struct task_struct *task;
	struct io_wq *wq;
	struct io_wq_acct *acct;

	struct io_wq_work *cur_work;
	raw_spinlock_t lock;

	struct completion ref_done;

	unsigned long create_state;
	struct callback_head create_work;
	int init_retries;

	union {
		struct rcu_head rcu;
		struct delayed_work work;
	};
};

#if BITS_PER_LONG == 64
#define IO_WQ_HASH_ORDER	6
#else
#define IO_WQ_HASH_ORDER	5
#endif

#define IO_WQ_NR_HASH_BUCKETS	(1u << IO_WQ_HASH_ORDER)

struct io_wq_acct {
	/**
	 * Protects access to the worker lists.
	 */
	raw_spinlock_t workers_lock;

	unsigned nr_workers;
	unsigned max_workers;
	atomic_t nr_running;

	/**
	 * The list of free workers. Protected by #workers_lock
	 * (write) and RCU (read).
	 */
	struct hlist_nulls_head free_list;

	/**
	 * The list of all workers. Protected by #workers_lock
	 * (write) and RCU (read).
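	 * Workers are added in io_init_new_worker() and removed again
	 * in io_worker_exit().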
	 */
	struct list_head all_list;

	raw_spinlock_t lock;
	struct io_wq_work_list work_list;
	unsigned long flags;
};

enum {
	IO_WQ_ACCT_BOUND,
	IO_WQ_ACCT_UNBOUND,
	IO_WQ_ACCT_NR,
};

/*
 * Per io_wq state
 */
struct io_wq {
	unsigned long state;

	struct io_wq_hash *hash;

	atomic_t worker_refs;
	struct completion worker_done;

	struct hlist_node cpuhp_node;

	struct task_struct *task;

	struct io_wq_acct acct[IO_WQ_ACCT_NR];

	struct wait_queue_entry wait;

	struct io_wq_work *hash_tail[IO_WQ_NR_HASH_BUCKETS];

	cpumask_var_t cpu_mask;
};

static enum cpuhp_state io_wq_online;

struct io_cb_cancel_data {
	work_cancel_fn *fn;
	void *data;
	int nr_running;
	int nr_pending;
	bool cancel_all;
};

static bool create_io_worker(struct io_wq *wq, struct io_wq_acct *acct);
static void io_wq_dec_running(struct io_worker *worker);
static bool io_acct_cancel_pending_work(struct io_wq *wq,
					struct io_wq_acct *acct,
					struct io_cb_cancel_data *match);
static void create_worker_cb(struct callback_head *cb);
static void io_wq_cancel_tw_create(struct io_wq *wq);

static inline unsigned int __io_get_work_hash(unsigned int work_flags)
{
	return work_flags >> IO_WQ_HASH_SHIFT;
}

static inline unsigned int io_get_work_hash(struct io_wq_work *work)
{
	return __io_get_work_hash(atomic_read(&work->flags));
}

static bool io_worker_get(struct io_worker *worker)
{
	return refcount_inc_not_zero(&worker->ref);
}

static void io_worker_release(struct io_worker *worker)
{
	if (refcount_dec_and_test(&worker->ref))
		complete(&worker->ref_done);
}

static inline struct io_wq_acct *io_get_acct(struct io_wq *wq, bool bound)
{
	return &wq->acct[bound ? IO_WQ_ACCT_BOUND : IO_WQ_ACCT_UNBOUND];
}

static inline struct io_wq_acct *io_work_get_acct(struct io_wq *wq,
						  unsigned int work_flags)
{
	return io_get_acct(wq, !(work_flags & IO_WQ_WORK_UNBOUND));
}

static inline struct io_wq_acct *io_wq_get_acct(struct io_worker *worker)
{
	return worker->acct;
}

static void io_worker_ref_put(struct io_wq *wq)
{
	if (atomic_dec_and_test(&wq->worker_refs))
		complete(&wq->worker_done);
}

bool io_wq_worker_stopped(void)
{
	struct io_worker *worker = current->worker_private;

	if (WARN_ON_ONCE(!io_wq_current_is_worker()))
		return true;

	return test_bit(IO_WQ_BIT_EXIT, &worker->wq->state);
}

static void io_worker_cancel_cb(struct io_worker *worker)
{
	struct io_wq_acct *acct = io_wq_get_acct(worker);
	struct io_wq *wq = worker->wq;

	atomic_dec(&acct->nr_running);
	raw_spin_lock(&acct->workers_lock);
	acct->nr_workers--;
	raw_spin_unlock(&acct->workers_lock);
	io_worker_ref_put(wq);
	clear_bit_unlock(0, &worker->create_state);
	io_worker_release(worker);
}

static bool io_task_worker_match(struct callback_head *cb, void *data)
{
	struct io_worker *worker;

	if (cb->func != create_worker_cb)
		return false;
	worker = container_of(cb, struct io_worker, create_work);
	return worker == data;
}

static void io_worker_exit(struct io_worker *worker)
{
	struct io_wq *wq = worker->wq;
	struct io_wq_acct *acct = io_wq_get_acct(worker);

	while (1) {
		struct callback_head *cb = task_work_cancel_match(wq->task,
						io_task_worker_match, worker);

		if (!cb)
			break;
		io_worker_cancel_cb(worker);
	}

	io_worker_release(worker);
	wait_for_completion(&worker->ref_done);

	raw_spin_lock(&acct->workers_lock);
	if (test_bit(IO_WORKER_F_FREE, &worker->flags))
		hlist_nulls_del_rcu(&worker->nulls_node);
	list_del_rcu(&worker->all_list);
	raw_spin_unlock(&acct->workers_lock);
	io_wq_dec_running(worker);
	/*
	 * this worker is a goner, clear ->worker_private to avoid any
	 * inc/dec running calls that could happen as part of exit from
	 * touching 'worker'.
	 */
	current->worker_private = NULL;

	kfree_rcu(worker, rcu);
	io_worker_ref_put(wq);
	do_exit(0);
}

static inline bool __io_acct_run_queue(struct io_wq_acct *acct)
{
	return !test_bit(IO_ACCT_STALLED_BIT, &acct->flags) &&
		!wq_list_empty(&acct->work_list);
}

/*
 * If there's work to do, returns true with acct->lock acquired. If not,
 * returns false with no lock held.
 */
static inline bool io_acct_run_queue(struct io_wq_acct *acct)
	__acquires(&acct->lock)
{
	raw_spin_lock(&acct->lock);
	if (__io_acct_run_queue(acct))
		return true;

	raw_spin_unlock(&acct->lock);
	return false;
}

/*
 * Check head of free list for an available worker. If one isn't available,
 * caller must create one.
 */
static bool io_acct_activate_free_worker(struct io_wq_acct *acct)
	__must_hold(RCU)
{
	struct hlist_nulls_node *n;
	struct io_worker *worker;

	/*
	 * Iterate free_list and see if we can find an idle worker to
	 * activate. If a given worker is on the free_list but in the process
	 * of exiting, keep trying.
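	 * An exiting worker has dropped its last reference, so the
	 * io_worker_get() check below fails for it and we move on.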
	 */
	hlist_nulls_for_each_entry_rcu(worker, n, &acct->free_list, nulls_node) {
		if (!io_worker_get(worker))
			continue;
		/*
		 * If the worker is already running, it's either already
		 * starting work or finishing work. In either case, if it does
		 * go to sleep, we'll kick off a new task for this work anyway.
		 */
		wake_up_process(worker->task);
		io_worker_release(worker);
		return true;
	}

	return false;
}

/*
 * We need a worker. If we find a free one, we're good. If not, and we're
 * below the max number of workers, create one.
 */
static bool io_wq_create_worker(struct io_wq *wq, struct io_wq_acct *acct)
{
	/*
	 * Most likely an attempt to queue unbounded work on an io_wq that
	 * wasn't setup with any unbounded workers.
	 */
	if (unlikely(!acct->max_workers))
		pr_warn_once("io-wq is not configured for unbound workers");

	raw_spin_lock(&acct->workers_lock);
	if (acct->nr_workers >= acct->max_workers) {
		raw_spin_unlock(&acct->workers_lock);
		return true;
	}
	acct->nr_workers++;
	raw_spin_unlock(&acct->workers_lock);
	atomic_inc(&acct->nr_running);
	atomic_inc(&wq->worker_refs);
	return create_io_worker(wq, acct);
}

static void io_wq_inc_running(struct io_worker *worker)
{
	struct io_wq_acct *acct = io_wq_get_acct(worker);

	atomic_inc(&acct->nr_running);
}

static void create_worker_cb(struct callback_head *cb)
{
	struct io_worker *worker;
	struct io_wq *wq;

	struct io_wq_acct *acct;
	bool activated_free_worker, do_create = false;

	worker = container_of(cb, struct io_worker, create_work);
	wq = worker->wq;
	acct = worker->acct;

	rcu_read_lock();
	activated_free_worker = io_acct_activate_free_worker(acct);
	rcu_read_unlock();
	if (activated_free_worker)
		goto no_need_create;

	raw_spin_lock(&acct->workers_lock);

	if (acct->nr_workers < acct->max_workers) {
		acct->nr_workers++;
		do_create = true;
	}
	raw_spin_unlock(&acct->workers_lock);
	if (do_create) {
		create_io_worker(wq, acct);
	} else {
no_need_create:
		atomic_dec(&acct->nr_running);
		io_worker_ref_put(wq);
	}
	clear_bit_unlock(0, &worker->create_state);
	io_worker_release(worker);
}

static bool io_queue_worker_create(struct io_worker *worker,
				   struct io_wq_acct *acct,
				   task_work_func_t func)
{
	struct io_wq *wq = worker->wq;

	/* raced with exit, just ignore create call */
	if (test_bit(IO_WQ_BIT_EXIT, &wq->state))
		goto fail;
	if (!io_worker_get(worker))
		goto fail;
	/*
	 * create_state manages ownership of create_work/index. We should
	 * only need one entry per worker, as the worker going to sleep
	 * will trigger the condition, and waking will clear it once it
	 * runs the task_work.
	 */
	if (test_bit(0, &worker->create_state) ||
	    test_and_set_bit_lock(0, &worker->create_state))
		goto fail_release;

	atomic_inc(&wq->worker_refs);
	init_task_work(&worker->create_work, func);
	if (!task_work_add(wq->task, &worker->create_work, TWA_SIGNAL)) {
		/*
		 * EXIT may have been set after checking it above, check after
		 * adding the task_work and remove any creation item if it is
		 * now set. wq exit does that too, but we can have added this
		 * work item after we canceled in io_wq_exit_workers().
		 */
		if (test_bit(IO_WQ_BIT_EXIT, &wq->state))
			io_wq_cancel_tw_create(wq);
		io_worker_ref_put(wq);
		return true;
	}
	io_worker_ref_put(wq);
	clear_bit_unlock(0, &worker->create_state);
fail_release:
	io_worker_release(worker);
fail:
	atomic_dec(&acct->nr_running);
	io_worker_ref_put(wq);
	return false;
}

/* Defer if current and next work are both hashed to the same chain */
static bool io_wq_hash_defer(struct io_wq_work *work, struct io_wq_acct *acct)
{
	unsigned int hash, work_flags;
	struct io_wq_work *next;

	lockdep_assert_held(&acct->lock);

	work_flags = atomic_read(&work->flags);
	if (!__io_wq_is_hashed(work_flags))
		return false;

	/* should not happen, io_acct_run_queue() said we had work */
	if (wq_list_empty(&acct->work_list))
		return true;

	hash = __io_get_work_hash(work_flags);
	next = container_of(acct->work_list.first, struct io_wq_work, list);
	work_flags = atomic_read(&next->flags);
	if (!__io_wq_is_hashed(work_flags))
		return false;
	return hash == __io_get_work_hash(work_flags);
}

static void io_wq_dec_running(struct io_worker *worker)
{
	struct io_wq_acct *acct = io_wq_get_acct(worker);
	struct io_wq *wq = worker->wq;

	if (!test_bit(IO_WORKER_F_UP, &worker->flags))
		return;

	if (!atomic_dec_and_test(&acct->nr_running))
		return;
	if (!worker->cur_work)
		return;
	if (!io_acct_run_queue(acct))
		return;
	if (io_wq_hash_defer(worker->cur_work, acct)) {
		raw_spin_unlock(&acct->lock);
		return;
	}

	raw_spin_unlock(&acct->lock);
	atomic_inc(&acct->nr_running);
	atomic_inc(&wq->worker_refs);
	io_queue_worker_create(worker, acct, create_worker_cb);
}

/*
 * Worker will start processing some work. Move it to the busy list, if
 * it's currently on the freelist
 */
static void __io_worker_busy(struct io_wq_acct *acct, struct io_worker *worker)
{
	if (test_bit(IO_WORKER_F_FREE, &worker->flags)) {
		clear_bit(IO_WORKER_F_FREE, &worker->flags);
		raw_spin_lock(&acct->workers_lock);
		hlist_nulls_del_init_rcu(&worker->nulls_node);
		raw_spin_unlock(&acct->workers_lock);
	}
}

/*
 * No work, worker going to sleep. Move to freelist.
 */
static void __io_worker_idle(struct io_wq_acct *acct, struct io_worker *worker)
	__must_hold(acct->workers_lock)
{
	if (!test_bit(IO_WORKER_F_FREE, &worker->flags)) {
		set_bit(IO_WORKER_F_FREE, &worker->flags);
		hlist_nulls_add_head_rcu(&worker->nulls_node, &acct->free_list);
	}
}

static bool io_wait_on_hash(struct io_wq *wq, unsigned int hash)
{
	bool ret = false;

	spin_lock_irq(&wq->hash->wait.lock);
	if (list_empty(&wq->wait.entry)) {
		__add_wait_queue(&wq->hash->wait, &wq->wait);
		if (!test_bit(hash, &wq->hash->map)) {
			__set_current_state(TASK_RUNNING);
			list_del_init(&wq->wait.entry);
			ret = true;
		}
	}
	spin_unlock_irq(&wq->hash->wait.lock);
	return ret;
}

static struct io_wq_work *io_get_next_work(struct io_wq_acct *acct,
					   struct io_wq *wq)
	__must_hold(acct->lock)
{
	struct io_wq_work_node *node, *prev;
	struct io_wq_work *work, *tail;
	unsigned int stall_hash = -1U;

	wq_list_for_each(node, prev, &acct->work_list) {
		unsigned int work_flags;
		unsigned int hash;

		work = container_of(node, struct io_wq_work, list);

		/* not hashed, can run anytime */
		work_flags = atomic_read(&work->flags);
		if (!__io_wq_is_hashed(work_flags)) {
			wq_list_del(&acct->work_list, node, prev);
			return work;
		}

		hash = __io_get_work_hash(work_flags);
		/* all items with this hash lie in [work, tail] */
		tail = wq->hash_tail[hash];

		/* hashed, can run if not already running */
		if (!test_and_set_bit(hash, &wq->hash->map)) {
			wq->hash_tail[hash] = NULL;
			wq_list_cut(&acct->work_list, &tail->list, prev);
			return work;
		}
		if (stall_hash == -1U)
			stall_hash = hash;
		/* fast forward to a next hash, for-each will fix up @prev */
		node = &tail->list;
	}

	if (stall_hash != -1U) {
		bool unstalled;

		/*
		 * Set this before dropping the lock to avoid racing with new
		 * work being added and clearing the stalled bit.
		 */
		set_bit(IO_ACCT_STALLED_BIT, &acct->flags);
		raw_spin_unlock(&acct->lock);
		unstalled = io_wait_on_hash(wq, stall_hash);
		raw_spin_lock(&acct->lock);
		if (unstalled) {
			clear_bit(IO_ACCT_STALLED_BIT, &acct->flags);
			if (wq_has_sleeper(&wq->hash->wait))
				wake_up(&wq->hash->wait);
		}
	}

	return NULL;
}

static void io_assign_current_work(struct io_worker *worker,
				   struct io_wq_work *work)
{
	if (work) {
		io_run_task_work();
		cond_resched();
	}

	raw_spin_lock(&worker->lock);
	worker->cur_work = work;
	raw_spin_unlock(&worker->lock);
}

/*
 * Called with acct->lock held, drops it before returning
 */
static void io_worker_handle_work(struct io_wq_acct *acct,
				  struct io_worker *worker)
	__releases(&acct->lock)
{
	struct io_wq *wq = worker->wq;
	bool do_kill = test_bit(IO_WQ_BIT_EXIT, &wq->state);

	do {
		struct io_wq_work *work;

		/*
		 * If we got some work, mark us as busy. If we didn't, but
		 * the list isn't empty, it means we stalled on hashed work.
		 * Mark us stalled so we don't keep looking for work when we
		 * can't make progress, any work completion or insertion will
		 * clear the stalled flag.
		 */
		work = io_get_next_work(acct, wq);
		if (work) {
			/*
			 * Make sure cancelation can find this, even before
			 * it becomes the active work. That avoids a window
			 * where the work has been removed from our general
			 * work list, but isn't yet discoverable as the
			 * current work item for this worker.
			 */
			raw_spin_lock(&worker->lock);
			worker->cur_work = work;
			raw_spin_unlock(&worker->lock);
		}

		raw_spin_unlock(&acct->lock);

		if (!work)
			break;

		__io_worker_busy(acct, worker);

		io_assign_current_work(worker, work);
		__set_current_state(TASK_RUNNING);

		/* handle a whole dependent link */
		do {
			struct io_wq_work *next_hashed, *linked;
			unsigned int work_flags = atomic_read(&work->flags);
			unsigned int hash = __io_wq_is_hashed(work_flags)
				? __io_get_work_hash(work_flags)
				: -1U;

			next_hashed = wq_next_work(work);

			if (do_kill &&
			    (work_flags & IO_WQ_WORK_UNBOUND))
				atomic_or(IO_WQ_WORK_CANCEL, &work->flags);
			io_wq_submit_work(work);
			io_assign_current_work(worker, NULL);

			linked = io_wq_free_work(work);
			work = next_hashed;
			if (!work && linked && !io_wq_is_hashed(linked)) {
				work = linked;
				linked = NULL;
			}
			io_assign_current_work(worker, work);
			if (linked)
				io_wq_enqueue(wq, linked);

			if (hash != -1U && !next_hashed) {
				/* serialize hash clear with wake_up() */
				spin_lock_irq(&wq->hash->wait.lock);
				clear_bit(hash, &wq->hash->map);
				clear_bit(IO_ACCT_STALLED_BIT, &acct->flags);
				spin_unlock_irq(&wq->hash->wait.lock);
				if (wq_has_sleeper(&wq->hash->wait))
					wake_up(&wq->hash->wait);
			}
		} while (work);

		if (!__io_acct_run_queue(acct))
			break;
		raw_spin_lock(&acct->lock);
	} while (1);
}

static int io_wq_worker(void *data)
{
	struct io_worker *worker = data;
	struct io_wq_acct *acct = io_wq_get_acct(worker);
	struct io_wq *wq = worker->wq;
	bool exit_mask = false, last_timeout = false;
	char buf[TASK_COMM_LEN] = {};

	set_mask_bits(&worker->flags, 0,
		      BIT(IO_WORKER_F_UP) | BIT(IO_WORKER_F_RUNNING));

	snprintf(buf, sizeof(buf), "iou-wrk-%d", wq->task->pid);
	set_task_comm(current, buf);

	while (!test_bit(IO_WQ_BIT_EXIT, &wq->state)) {
		long ret;

		set_current_state(TASK_INTERRUPTIBLE);

		/*
		 * If we have work to do, io_acct_run_queue() returns with
		 * the acct->lock held. If not, it will drop it.
		 */
		while (io_acct_run_queue(acct))
			io_worker_handle_work(acct, worker);

		raw_spin_lock(&acct->workers_lock);
		/*
		 * Last sleep timed out. Exit if we're not the last worker,
		 * or if someone modified our affinity.
		 */
		if (last_timeout && (exit_mask || acct->nr_workers > 1)) {
			acct->nr_workers--;
			raw_spin_unlock(&acct->workers_lock);
			__set_current_state(TASK_RUNNING);
			break;
		}
		last_timeout = false;
		__io_worker_idle(acct, worker);
		raw_spin_unlock(&acct->workers_lock);
		if (io_run_task_work())
			continue;
		ret = schedule_timeout(WORKER_IDLE_TIMEOUT);
		if (signal_pending(current)) {
			struct ksignal ksig;

			if (!get_signal(&ksig))
				continue;
			break;
		}
		if (!ret) {
			last_timeout = true;
			exit_mask = !cpumask_test_cpu(raw_smp_processor_id(),
							wq->cpu_mask);
		}
	}

	if (test_bit(IO_WQ_BIT_EXIT, &wq->state) && io_acct_run_queue(acct))
		io_worker_handle_work(acct, worker);

	io_worker_exit(worker);
	return 0;
}

/*
 * Called when a worker is scheduled in. Mark us as currently running.
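 * Only the worker's own flags and acct->nr_running are updated here.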
 */
void io_wq_worker_running(struct task_struct *tsk)
{
	struct io_worker *worker = tsk->worker_private;

	if (!worker)
		return;
	if (!test_bit(IO_WORKER_F_UP, &worker->flags))
		return;
	if (test_bit(IO_WORKER_F_RUNNING, &worker->flags))
		return;
	set_bit(IO_WORKER_F_RUNNING, &worker->flags);
	io_wq_inc_running(worker);
}

/*
 * Called when worker is going to sleep. If there are no workers currently
 * running and we have work pending, wake up a free one or create a new one.
 */
void io_wq_worker_sleeping(struct task_struct *tsk)
{
	struct io_worker *worker = tsk->worker_private;

	if (!worker)
		return;
	if (!test_bit(IO_WORKER_F_UP, &worker->flags))
		return;
	if (!test_bit(IO_WORKER_F_RUNNING, &worker->flags))
		return;

	clear_bit(IO_WORKER_F_RUNNING, &worker->flags);
	io_wq_dec_running(worker);
}

static void io_init_new_worker(struct io_wq *wq, struct io_wq_acct *acct, struct io_worker *worker,
			       struct task_struct *tsk)
{
	tsk->worker_private = worker;
	worker->task = tsk;
	set_cpus_allowed_ptr(tsk, wq->cpu_mask);

	raw_spin_lock(&acct->workers_lock);
	hlist_nulls_add_head_rcu(&worker->nulls_node, &acct->free_list);
	list_add_tail_rcu(&worker->all_list, &acct->all_list);
	set_bit(IO_WORKER_F_FREE, &worker->flags);
	raw_spin_unlock(&acct->workers_lock);
	wake_up_new_task(tsk);
}

static bool io_wq_work_match_all(struct io_wq_work *work, void *data)
{
	return true;
}

static inline bool io_should_retry_thread(struct io_worker *worker, long err)
{
	/*
	 * Prevent perpetual task_work retry, if the task (or its group) is
	 * exiting.
	 */
	if (fatal_signal_pending(current))
		return false;

	worker->init_retries++;
	switch (err) {
	case -EAGAIN:
		return worker->init_retries <= WORKER_INIT_LIMIT;
	/* Analogous to a fork() syscall, always retry on a restartable error */
	case -ERESTARTSYS:
	case -ERESTARTNOINTR:
	case -ERESTARTNOHAND:
		return true;
	default:
		return false;
	}
}

static void queue_create_worker_retry(struct io_worker *worker)
{
	/*
	 * We only bother retrying because there's a chance that the
	 * failure to create a worker is due to some temporary condition
	 * in the forking task (e.g. outstanding signal); give the task
	 * some time to clear that condition.
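	 * The delay is init_retries * 5 msecs, so each retry backs off a
	 * little further.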
	 */
	schedule_delayed_work(&worker->work,
			      msecs_to_jiffies(worker->init_retries * 5));
}

static void create_worker_cont(struct callback_head *cb)
{
	struct io_worker *worker;
	struct task_struct *tsk;
	struct io_wq *wq;
	struct io_wq_acct *acct;

	worker = container_of(cb, struct io_worker, create_work);
	clear_bit_unlock(0, &worker->create_state);
	wq = worker->wq;
	acct = io_wq_get_acct(worker);
	tsk = create_io_thread(io_wq_worker, worker, NUMA_NO_NODE);
	if (!IS_ERR(tsk)) {
		io_init_new_worker(wq, acct, worker, tsk);
		io_worker_release(worker);
		return;
	} else if (!io_should_retry_thread(worker, PTR_ERR(tsk))) {
		atomic_dec(&acct->nr_running);
		raw_spin_lock(&acct->workers_lock);
		acct->nr_workers--;
		if (!acct->nr_workers) {
			struct io_cb_cancel_data match = {
				.fn		= io_wq_work_match_all,
				.cancel_all	= true,
			};

			raw_spin_unlock(&acct->workers_lock);
			while (io_acct_cancel_pending_work(wq, acct, &match))
				;
		} else {
			raw_spin_unlock(&acct->workers_lock);
		}
		io_worker_ref_put(wq);
		kfree(worker);
		return;
	}

	/* re-create attempts grab a new worker ref, drop the existing one */
	io_worker_release(worker);
	queue_create_worker_retry(worker);
}

static void io_workqueue_create(struct work_struct *work)
{
	struct io_worker *worker = container_of(work, struct io_worker,
						work.work);
	struct io_wq_acct *acct = io_wq_get_acct(worker);

	if (!io_queue_worker_create(worker, acct, create_worker_cont))
		kfree(worker);
}

static bool create_io_worker(struct io_wq *wq, struct io_wq_acct *acct)
{
	struct io_worker *worker;
	struct task_struct *tsk;

	__set_current_state(TASK_RUNNING);

	worker = kzalloc(sizeof(*worker), GFP_KERNEL);
	if (!worker) {
fail:
		atomic_dec(&acct->nr_running);
		raw_spin_lock(&acct->workers_lock);
		acct->nr_workers--;
		raw_spin_unlock(&acct->workers_lock);
		io_worker_ref_put(wq);
		return false;
	}

	refcount_set(&worker->ref, 1);
	worker->wq = wq;
	worker->acct = acct;
	raw_spin_lock_init(&worker->lock);
	init_completion(&worker->ref_done);

	tsk = create_io_thread(io_wq_worker, worker, NUMA_NO_NODE);
	if (!IS_ERR(tsk)) {
		io_init_new_worker(wq, acct, worker, tsk);
	} else if (!io_should_retry_thread(worker, PTR_ERR(tsk))) {
		kfree(worker);
		goto fail;
	} else {
		INIT_DELAYED_WORK(&worker->work, io_workqueue_create);
		queue_create_worker_retry(worker);
	}

	return true;
}

/*
 * Iterate the passed in list and call the specific function for each
 * worker that isn't exiting
 */
static bool io_acct_for_each_worker(struct io_wq_acct *acct,
				    bool (*func)(struct io_worker *, void *),
				    void *data)
{
	struct io_worker *worker;
	bool ret = false;

	list_for_each_entry_rcu(worker, &acct->all_list, all_list) {
		if (io_worker_get(worker)) {
			/* no task if node is/was offline */
			if (worker->task)
				ret = func(worker, data);
			io_worker_release(worker);
			if (ret)
				break;
		}
	}

	return ret;
}

static bool io_wq_for_each_worker(struct io_wq *wq,
				  bool (*func)(struct io_worker *, void *),
				  void *data)
{
	for (int i = 0; i < IO_WQ_ACCT_NR; i++) {
		if (!io_acct_for_each_worker(&wq->acct[i], func, data))
			return false;
	}

	return true;
}

static bool io_wq_worker_wake(struct io_worker *worker, void *data)
{
	__set_notify_signal(worker->task);
	wake_up_process(worker->task);
	return false;
}

static void io_run_cancel(struct io_wq_work *work, struct io_wq *wq)
{
	do {
		atomic_or(IO_WQ_WORK_CANCEL, &work->flags);
		io_wq_submit_work(work);
		work = io_wq_free_work(work);
	} while (work);
}

static void io_wq_insert_work(struct io_wq *wq, struct io_wq_acct *acct,
			      struct io_wq_work *work, unsigned int work_flags)
{
	unsigned int hash;
	struct io_wq_work *tail;

	if (!__io_wq_is_hashed(work_flags)) {
append:
		wq_list_add_tail(&work->list, &acct->work_list);
		return;
	}

	hash = __io_get_work_hash(work_flags);
	tail = wq->hash_tail[hash];
	wq->hash_tail[hash] = work;
	if (!tail)
		goto append;

	wq_list_add_after(&work->list, &tail->list, &acct->work_list);
}

static bool io_wq_work_match_item(struct io_wq_work *work, void *data)
{
	return work == data;
}

void io_wq_enqueue(struct io_wq *wq, struct io_wq_work *work)
{
	unsigned int work_flags = atomic_read(&work->flags);
	struct io_wq_acct *acct = io_work_get_acct(wq, work_flags);
	struct io_cb_cancel_data match = {
		.fn		= io_wq_work_match_item,
		.data		= work,
		.cancel_all	= false,
	};
	bool do_create;

	/*
	 * If io-wq is exiting for this task, or if the request has explicitly
	 * been marked as one that should not get executed, cancel it here.
	 */
	if (test_bit(IO_WQ_BIT_EXIT, &wq->state) ||
	    (work_flags & IO_WQ_WORK_CANCEL)) {
		io_run_cancel(work, wq);
		return;
	}

	raw_spin_lock(&acct->lock);
	io_wq_insert_work(wq, acct, work, work_flags);
	clear_bit(IO_ACCT_STALLED_BIT, &acct->flags);
	raw_spin_unlock(&acct->lock);

	rcu_read_lock();
	do_create = !io_acct_activate_free_worker(acct);
	rcu_read_unlock();

	if (do_create && ((work_flags & IO_WQ_WORK_CONCURRENT) ||
	    !atomic_read(&acct->nr_running))) {
		bool did_create;

		did_create = io_wq_create_worker(wq, acct);
		if (likely(did_create))
			return;

		raw_spin_lock(&acct->workers_lock);
		if (acct->nr_workers) {
			raw_spin_unlock(&acct->workers_lock);
			return;
		}
		raw_spin_unlock(&acct->workers_lock);

		/* fatal condition, failed to create the first worker */
		io_acct_cancel_pending_work(wq, acct, &match);
	}
}

/*
 * Work items that hash to the same value will not be done in parallel.
 * Used to limit concurrent writes, generally hashed by inode.
 */
void io_wq_hash_work(struct io_wq_work *work, void *val)
{
	unsigned int bit;

	bit = hash_ptr(val, IO_WQ_HASH_ORDER);
	atomic_or(IO_WQ_WORK_HASHED | (bit << IO_WQ_HASH_SHIFT), &work->flags);
}

static bool __io_wq_worker_cancel(struct io_worker *worker,
				  struct io_cb_cancel_data *match,
				  struct io_wq_work *work)
{
	if (work && match->fn(work, match->data)) {
		atomic_or(IO_WQ_WORK_CANCEL, &work->flags);
		__set_notify_signal(worker->task);
		return true;
	}

	return false;
}

static bool io_wq_worker_cancel(struct io_worker *worker, void *data)
{
	struct io_cb_cancel_data *match = data;

	/*
	 * Hold the lock to avoid ->cur_work going out of scope, caller
	 * may dereference the passed in work.
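	 * On a match, the work is flagged IO_WQ_WORK_CANCEL and the worker
	 * is notified via __set_notify_signal().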
	 */
	raw_spin_lock(&worker->lock);
	if (__io_wq_worker_cancel(worker, match, worker->cur_work))
		match->nr_running++;
	raw_spin_unlock(&worker->lock);

	return match->nr_running && !match->cancel_all;
}

static inline void io_wq_remove_pending(struct io_wq *wq,
					struct io_wq_acct *acct,
					struct io_wq_work *work,
					struct io_wq_work_node *prev)
{
	unsigned int hash = io_get_work_hash(work);
	struct io_wq_work *prev_work = NULL;

	if (io_wq_is_hashed(work) && work == wq->hash_tail[hash]) {
		if (prev)
			prev_work = container_of(prev, struct io_wq_work, list);
		if (prev_work && io_get_work_hash(prev_work) == hash)
			wq->hash_tail[hash] = prev_work;
		else
			wq->hash_tail[hash] = NULL;
	}
	wq_list_del(&acct->work_list, &work->list, prev);
}

static bool io_acct_cancel_pending_work(struct io_wq *wq,
					struct io_wq_acct *acct,
					struct io_cb_cancel_data *match)
{
	struct io_wq_work_node *node, *prev;
	struct io_wq_work *work;

	raw_spin_lock(&acct->lock);
	wq_list_for_each(node, prev, &acct->work_list) {
		work = container_of(node, struct io_wq_work, list);
		if (!match->fn(work, match->data))
			continue;
		io_wq_remove_pending(wq, acct, work, prev);
		raw_spin_unlock(&acct->lock);
		io_run_cancel(work, wq);
		match->nr_pending++;
		/* not safe to continue after unlock */
		return true;
	}
	raw_spin_unlock(&acct->lock);

	return false;
}

static void io_wq_cancel_pending_work(struct io_wq *wq,
				      struct io_cb_cancel_data *match)
{
	int i;
retry:
	for (i = 0; i < IO_WQ_ACCT_NR; i++) {
		struct io_wq_acct *acct = io_get_acct(wq, i == 0);

		if (io_acct_cancel_pending_work(wq, acct, match)) {
			if (match->cancel_all)
				goto retry;
			break;
		}
	}
}

static void io_acct_cancel_running_work(struct io_wq_acct *acct,
					struct io_cb_cancel_data *match)
{
	raw_spin_lock(&acct->workers_lock);
	io_acct_for_each_worker(acct, io_wq_worker_cancel, match);
	raw_spin_unlock(&acct->workers_lock);
}

static void io_wq_cancel_running_work(struct io_wq *wq,
				      struct io_cb_cancel_data *match)
{
	rcu_read_lock();

	for (int i = 0; i < IO_WQ_ACCT_NR; i++)
		io_acct_cancel_running_work(&wq->acct[i], match);

	rcu_read_unlock();
}

enum io_wq_cancel io_wq_cancel_cb(struct io_wq *wq, work_cancel_fn *cancel,
				  void *data, bool cancel_all)
{
	struct io_cb_cancel_data match = {
		.fn		= cancel,
		.data		= data,
		.cancel_all	= cancel_all,
	};

	/*
	 * First check pending list, if we're lucky we can just remove it
	 * from there. CANCEL_OK means that the work is returned as-new,
	 * no completion will be posted for it.
	 *
	 * Then check if a free (going busy) or busy worker has the work
	 * currently running. If we find it there, we'll return CANCEL_RUNNING
	 * as an indication that we attempt to signal cancellation. The
	 * completion will run normally in this case.
	 *
	 * Do both of these while holding the acct->workers_lock, to ensure that
	 * we'll find a work item regardless of state.
	 */
	io_wq_cancel_pending_work(wq, &match);
	if (match.nr_pending && !match.cancel_all)
		return IO_WQ_CANCEL_OK;

	io_wq_cancel_running_work(wq, &match);
	if (match.nr_running && !match.cancel_all)
		return IO_WQ_CANCEL_RUNNING;

	if (match.nr_running)
		return IO_WQ_CANCEL_RUNNING;
	if (match.nr_pending)
		return IO_WQ_CANCEL_OK;
	return IO_WQ_CANCEL_NOTFOUND;
}

static int io_wq_hash_wake(struct wait_queue_entry *wait, unsigned mode,
			   int sync, void *key)
{
	struct io_wq *wq = container_of(wait, struct io_wq, wait);
	int i;

	list_del_init(&wait->entry);

	rcu_read_lock();
	for (i = 0; i < IO_WQ_ACCT_NR; i++) {
		struct io_wq_acct *acct = &wq->acct[i];

		if (test_and_clear_bit(IO_ACCT_STALLED_BIT, &acct->flags))
			io_acct_activate_free_worker(acct);
	}
	rcu_read_unlock();
	return 1;
}

struct io_wq *io_wq_create(unsigned bounded, struct io_wq_data *data)
{
	int ret, i;
	struct io_wq *wq;

	if (WARN_ON_ONCE(!bounded))
		return ERR_PTR(-EINVAL);

	wq = kzalloc(sizeof(struct io_wq), GFP_KERNEL);
	if (!wq)
		return ERR_PTR(-ENOMEM);

	refcount_inc(&data->hash->refs);
	wq->hash = data->hash;

	ret = -ENOMEM;

	if (!alloc_cpumask_var(&wq->cpu_mask, GFP_KERNEL))
		goto err;
	cpuset_cpus_allowed(data->task, wq->cpu_mask);
	wq->acct[IO_WQ_ACCT_BOUND].max_workers = bounded;
	wq->acct[IO_WQ_ACCT_UNBOUND].max_workers =
				task_rlimit(current, RLIMIT_NPROC);
	INIT_LIST_HEAD(&wq->wait.entry);
	wq->wait.func = io_wq_hash_wake;
	for (i = 0; i < IO_WQ_ACCT_NR; i++) {
		struct io_wq_acct *acct = &wq->acct[i];

		atomic_set(&acct->nr_running, 0);

		raw_spin_lock_init(&acct->workers_lock);
		INIT_HLIST_NULLS_HEAD(&acct->free_list, 0);
		INIT_LIST_HEAD(&acct->all_list);

		INIT_WQ_LIST(&acct->work_list);
		raw_spin_lock_init(&acct->lock);
	}

	wq->task = get_task_struct(data->task);
	atomic_set(&wq->worker_refs, 1);
	init_completion(&wq->worker_done);
	ret = cpuhp_state_add_instance_nocalls(io_wq_online, &wq->cpuhp_node);
	if (ret) {
		put_task_struct(wq->task);
		goto err;
	}

	return wq;
err:
	io_wq_put_hash(data->hash);
	free_cpumask_var(wq->cpu_mask);
	kfree(wq);
	return ERR_PTR(ret);
}

static bool io_task_work_match(struct callback_head *cb, void *data)
{
	struct io_worker *worker;

	if (cb->func != create_worker_cb && cb->func != create_worker_cont)
		return false;
	worker = container_of(cb, struct io_worker, create_work);
	return worker->wq == data;
}

void io_wq_exit_start(struct io_wq *wq)
{
	set_bit(IO_WQ_BIT_EXIT, &wq->state);
}

static void io_wq_cancel_tw_create(struct io_wq *wq)
{
	struct callback_head *cb;

	while ((cb = task_work_cancel_match(wq->task, io_task_work_match, wq)) != NULL) {
		struct io_worker *worker;

		worker = container_of(cb, struct io_worker, create_work);
		io_worker_cancel_cb(worker);
		/*
		 * Only the worker continuation helper has worker allocated and
		 * hence needs freeing.
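		 * create_worker_cb() was queued by an already running worker,
		 * which must not be freed here.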
1311 */ 1312 if (cb->func == create_worker_cont) 1313 kfree(worker); 1314 } 1315 } 1316 1317 static void io_wq_exit_workers(struct io_wq *wq) 1318 { 1319 if (!wq->task) 1320 return; 1321 1322 io_wq_cancel_tw_create(wq); 1323 1324 rcu_read_lock(); 1325 io_wq_for_each_worker(wq, io_wq_worker_wake, NULL); 1326 rcu_read_unlock(); 1327 io_worker_ref_put(wq); 1328 wait_for_completion(&wq->worker_done); 1329 1330 spin_lock_irq(&wq->hash->wait.lock); 1331 list_del_init(&wq->wait.entry); 1332 spin_unlock_irq(&wq->hash->wait.lock); 1333 1334 put_task_struct(wq->task); 1335 wq->task = NULL; 1336 } 1337 1338 static void io_wq_destroy(struct io_wq *wq) 1339 { 1340 struct io_cb_cancel_data match = { 1341 .fn = io_wq_work_match_all, 1342 .cancel_all = true, 1343 }; 1344 1345 cpuhp_state_remove_instance_nocalls(io_wq_online, &wq->cpuhp_node); 1346 io_wq_cancel_pending_work(wq, &match); 1347 free_cpumask_var(wq->cpu_mask); 1348 io_wq_put_hash(wq->hash); 1349 kfree(wq); 1350 } 1351 1352 void io_wq_put_and_exit(struct io_wq *wq) 1353 { 1354 WARN_ON_ONCE(!test_bit(IO_WQ_BIT_EXIT, &wq->state)); 1355 1356 io_wq_exit_workers(wq); 1357 io_wq_destroy(wq); 1358 } 1359 1360 struct online_data { 1361 unsigned int cpu; 1362 bool online; 1363 }; 1364 1365 static bool io_wq_worker_affinity(struct io_worker *worker, void *data) 1366 { 1367 struct online_data *od = data; 1368 1369 if (od->online) 1370 cpumask_set_cpu(od->cpu, worker->wq->cpu_mask); 1371 else 1372 cpumask_clear_cpu(od->cpu, worker->wq->cpu_mask); 1373 return false; 1374 } 1375 1376 static int __io_wq_cpu_online(struct io_wq *wq, unsigned int cpu, bool online) 1377 { 1378 struct online_data od = { 1379 .cpu = cpu, 1380 .online = online 1381 }; 1382 1383 rcu_read_lock(); 1384 io_wq_for_each_worker(wq, io_wq_worker_affinity, &od); 1385 rcu_read_unlock(); 1386 return 0; 1387 } 1388 1389 static int io_wq_cpu_online(unsigned int cpu, struct hlist_node *node) 1390 { 1391 struct io_wq *wq = hlist_entry_safe(node, struct io_wq, cpuhp_node); 1392 1393 return __io_wq_cpu_online(wq, cpu, true); 1394 } 1395 1396 static int io_wq_cpu_offline(unsigned int cpu, struct hlist_node *node) 1397 { 1398 struct io_wq *wq = hlist_entry_safe(node, struct io_wq, cpuhp_node); 1399 1400 return __io_wq_cpu_online(wq, cpu, false); 1401 } 1402 1403 int io_wq_cpu_affinity(struct io_uring_task *tctx, cpumask_var_t mask) 1404 { 1405 cpumask_var_t allowed_mask; 1406 int ret = 0; 1407 1408 if (!tctx || !tctx->io_wq) 1409 return -EINVAL; 1410 1411 if (!alloc_cpumask_var(&allowed_mask, GFP_KERNEL)) 1412 return -ENOMEM; 1413 1414 rcu_read_lock(); 1415 cpuset_cpus_allowed(tctx->io_wq->task, allowed_mask); 1416 if (mask) { 1417 if (cpumask_subset(mask, allowed_mask)) 1418 cpumask_copy(tctx->io_wq->cpu_mask, mask); 1419 else 1420 ret = -EINVAL; 1421 } else { 1422 cpumask_copy(tctx->io_wq->cpu_mask, allowed_mask); 1423 } 1424 rcu_read_unlock(); 1425 1426 free_cpumask_var(allowed_mask); 1427 return ret; 1428 } 1429 1430 /* 1431 * Set max number of unbounded workers, returns old value. If new_count is 0, 1432 * then just return the old value. 
 */
int io_wq_max_workers(struct io_wq *wq, int *new_count)
{
	struct io_wq_acct *acct;
	int prev[IO_WQ_ACCT_NR];
	int i;

	BUILD_BUG_ON((int) IO_WQ_ACCT_BOUND   != (int) IO_WQ_BOUND);
	BUILD_BUG_ON((int) IO_WQ_ACCT_UNBOUND != (int) IO_WQ_UNBOUND);
	BUILD_BUG_ON((int) IO_WQ_ACCT_NR      != 2);

	for (i = 0; i < IO_WQ_ACCT_NR; i++) {
		if (new_count[i] > task_rlimit(current, RLIMIT_NPROC))
			new_count[i] = task_rlimit(current, RLIMIT_NPROC);
	}

	for (i = 0; i < IO_WQ_ACCT_NR; i++)
		prev[i] = 0;

	rcu_read_lock();

	for (i = 0; i < IO_WQ_ACCT_NR; i++) {
		acct = &wq->acct[i];
		raw_spin_lock(&acct->workers_lock);
		prev[i] = max_t(int, acct->max_workers, prev[i]);
		if (new_count[i])
			acct->max_workers = new_count[i];
		raw_spin_unlock(&acct->workers_lock);
	}
	rcu_read_unlock();

	for (i = 0; i < IO_WQ_ACCT_NR; i++)
		new_count[i] = prev[i];

	return 0;
}

static __init int io_wq_init(void)
{
	int ret;

	ret = cpuhp_setup_state_multi(CPUHP_AP_ONLINE_DYN, "io-wq/online",
					io_wq_cpu_online, io_wq_cpu_offline);
	if (ret < 0)
		return ret;
	io_wq_online = ret;
	return 0;
}
subsys_initcall(io_wq_init);