// SPDX-License-Identifier: GPL-2.0
/*
 * Basic worker thread pool for io_uring
 *
 * Copyright (C) 2019 Jens Axboe
 *
 */
#include <linux/kernel.h>
#include <linux/init.h>
#include <linux/errno.h>
#include <linux/sched/signal.h>
#include <linux/percpu.h>
#include <linux/slab.h>
#include <linux/rculist_nulls.h>
#include <linux/cpu.h>
#include <linux/cpuset.h>
#include <linux/task_work.h>
#include <linux/audit.h>
#include <linux/mmu_context.h>
#include <uapi/linux/io_uring.h>

#include "io-wq.h"
#include "slist.h"
#include "io_uring.h"

#define WORKER_IDLE_TIMEOUT	(5 * HZ)
#define WORKER_INIT_LIMIT	3

enum {
	IO_WORKER_F_UP		= 0,	/* up and active */
	IO_WORKER_F_RUNNING	= 1,	/* account as running */
	IO_WORKER_F_FREE	= 2,	/* worker on free list */
};

enum {
	IO_WQ_BIT_EXIT		= 0,	/* wq exiting */
};

enum {
	IO_ACCT_STALLED_BIT	= 0,	/* stalled on hash */
};

/*
 * One for each thread in a wq pool
 */
struct io_worker {
	refcount_t ref;
	unsigned long flags;
	struct hlist_nulls_node nulls_node;
	struct list_head all_list;
	struct task_struct *task;
	struct io_wq *wq;
	struct io_wq_acct *acct;

	struct io_wq_work *cur_work;
	raw_spinlock_t lock;

	struct completion ref_done;

	unsigned long create_state;
	struct callback_head create_work;
	int init_retries;

	union {
		struct rcu_head rcu;
		struct delayed_work work;
	};
};

#if BITS_PER_LONG == 64
#define IO_WQ_HASH_ORDER	6
#else
#define IO_WQ_HASH_ORDER	5
#endif

#define IO_WQ_NR_HASH_BUCKETS	(1u << IO_WQ_HASH_ORDER)

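/*
 * Informational note on the hash layout (derived from io_wq_hash_work() and
 * __io_get_work_hash() below, plus the flag definitions in io-wq.h): a hashed
 * work item carries its bucket number in the upper bits of work->flags as
 * IO_WQ_WORK_HASHED | (bucket << IO_WQ_HASH_SHIFT), and the bucket is
 * recovered with a plain shift. With IO_WQ_HASH_ORDER == 6 on 64-bit that
 * gives 64 serialization chains, e.g. hash_ptr() of an inode selects one of
 * 64 buckets.
 */
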
struct io_wq_acct {
	/**
	 * Protects access to the worker lists.
	 */
	raw_spinlock_t workers_lock;

	unsigned nr_workers;
	unsigned max_workers;
	atomic_t nr_running;

	/**
	 * The list of free workers. Protected by #workers_lock
	 * (write) and RCU (read).
	 */
	struct hlist_nulls_head free_list;

	/**
	 * The list of all workers. Protected by #workers_lock
	 * (write) and RCU (read).
	 */
	struct list_head all_list;

	raw_spinlock_t lock;
	struct io_wq_work_list work_list;
	unsigned long flags;
};

enum {
	IO_WQ_ACCT_BOUND,
	IO_WQ_ACCT_UNBOUND,
	IO_WQ_ACCT_NR,
};

/*
 * Per io_wq state
 */
struct io_wq {
	unsigned long state;

	struct io_wq_hash *hash;

	atomic_t worker_refs;
	struct completion worker_done;

	struct hlist_node cpuhp_node;

	struct task_struct *task;

	struct io_wq_acct acct[IO_WQ_ACCT_NR];

	struct wait_queue_entry wait;

	struct io_wq_work *hash_tail[IO_WQ_NR_HASH_BUCKETS];

	cpumask_var_t cpu_mask;
};

static enum cpuhp_state io_wq_online;

struct io_cb_cancel_data {
	work_cancel_fn *fn;
	void *data;
	int nr_running;
	int nr_pending;
	bool cancel_all;
};

static bool create_io_worker(struct io_wq *wq, struct io_wq_acct *acct);
static void io_wq_dec_running(struct io_worker *worker);
static bool io_acct_cancel_pending_work(struct io_wq *wq,
					struct io_wq_acct *acct,
					struct io_cb_cancel_data *match);
static void create_worker_cb(struct callback_head *cb);
static void io_wq_cancel_tw_create(struct io_wq *wq);

static inline unsigned int __io_get_work_hash(unsigned int work_flags)
{
	return work_flags >> IO_WQ_HASH_SHIFT;
}

static inline unsigned int io_get_work_hash(struct io_wq_work *work)
{
	return __io_get_work_hash(atomic_read(&work->flags));
}

static bool io_worker_get(struct io_worker *worker)
{
	return refcount_inc_not_zero(&worker->ref);
}

static void io_worker_release(struct io_worker *worker)
{
	if (refcount_dec_and_test(&worker->ref))
		complete(&worker->ref_done);
}

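/*
 * Informational: each work item is accounted as either bound or unbound,
 * selected by IO_WQ_WORK_UNBOUND in work->flags (see io_work_get_acct()
 * below), e.g.
 *
 *	acct = io_work_get_acct(wq, atomic_read(&work->flags));
 *
 * Bounded workers handle work whose execution time is bounded (e.g. regular
 * file I/O), unbounded workers handle work that may block for an arbitrary
 * amount of time; the two pools are sized and accounted independently via
 * wq->acct[].
 */
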
static inline struct io_wq_acct *io_get_acct(struct io_wq *wq, bool bound)
{
	return &wq->acct[bound ? IO_WQ_ACCT_BOUND : IO_WQ_ACCT_UNBOUND];
}

static inline struct io_wq_acct *io_work_get_acct(struct io_wq *wq,
						  unsigned int work_flags)
{
	return io_get_acct(wq, !(work_flags & IO_WQ_WORK_UNBOUND));
}

static inline struct io_wq_acct *io_wq_get_acct(struct io_worker *worker)
{
	return worker->acct;
}

static void io_worker_ref_put(struct io_wq *wq)
{
	if (atomic_dec_and_test(&wq->worker_refs))
		complete(&wq->worker_done);
}

bool io_wq_worker_stopped(void)
{
	struct io_worker *worker = current->worker_private;

	if (WARN_ON_ONCE(!io_wq_current_is_worker()))
		return true;

	return test_bit(IO_WQ_BIT_EXIT, &worker->wq->state);
}

static void io_worker_cancel_cb(struct io_worker *worker)
{
	struct io_wq_acct *acct = io_wq_get_acct(worker);
	struct io_wq *wq = worker->wq;

	atomic_dec(&acct->nr_running);
	raw_spin_lock(&acct->workers_lock);
	acct->nr_workers--;
	raw_spin_unlock(&acct->workers_lock);
	io_worker_ref_put(wq);
	clear_bit_unlock(0, &worker->create_state);
	io_worker_release(worker);
}

static bool io_task_worker_match(struct callback_head *cb, void *data)
{
	struct io_worker *worker;

	if (cb->func != create_worker_cb)
		return false;
	worker = container_of(cb, struct io_worker, create_work);
	return worker == data;
}

static void io_worker_exit(struct io_worker *worker)
{
	struct io_wq *wq = worker->wq;
	struct io_wq_acct *acct = io_wq_get_acct(worker);

	while (1) {
		struct callback_head *cb = task_work_cancel_match(wq->task,
						io_task_worker_match, worker);

		if (!cb)
			break;
		io_worker_cancel_cb(worker);
	}

	io_worker_release(worker);
	wait_for_completion(&worker->ref_done);

	raw_spin_lock(&acct->workers_lock);
	if (test_bit(IO_WORKER_F_FREE, &worker->flags))
		hlist_nulls_del_rcu(&worker->nulls_node);
	list_del_rcu(&worker->all_list);
	raw_spin_unlock(&acct->workers_lock);
	io_wq_dec_running(worker);
	/*
	 * this worker is a goner, clear ->worker_private to avoid any
	 * inc/dec running calls that could happen as part of exit from
	 * touching 'worker'.
	 */
	current->worker_private = NULL;

	kfree_rcu(worker, rcu);
	io_worker_ref_put(wq);
	do_exit(0);
}

static inline bool __io_acct_run_queue(struct io_wq_acct *acct)
{
	return !test_bit(IO_ACCT_STALLED_BIT, &acct->flags) &&
		!wq_list_empty(&acct->work_list);
}

/*
 * If there's work to do, returns true with acct->lock acquired. If not,
 * returns false with no lock held.
 */
static inline bool io_acct_run_queue(struct io_wq_acct *acct)
	__acquires(&acct->lock)
{
	raw_spin_lock(&acct->lock);
	if (__io_acct_run_queue(acct))
		return true;

	raw_spin_unlock(&acct->lock);
	return false;
}

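/*
 * A minimal sketch of the calling convention above (it mirrors the loop in
 * io_wq_worker() further down): acct->lock is only held on the true path,
 * and io_worker_handle_work() is what drops it again.
 *
 *	while (io_acct_run_queue(acct))
 *		io_worker_handle_work(acct, worker);
 */
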
/*
 * Check head of free list for an available worker. If one isn't available,
 * caller must create one.
 */
static bool io_acct_activate_free_worker(struct io_wq_acct *acct)
	__must_hold(RCU)
{
	struct hlist_nulls_node *n;
	struct io_worker *worker;

	/*
	 * Iterate free_list and see if we can find an idle worker to
	 * activate. If a given worker is on the free_list but in the process
	 * of exiting, keep trying.
	 */
	hlist_nulls_for_each_entry_rcu(worker, n, &acct->free_list, nulls_node) {
		if (!io_worker_get(worker))
			continue;
		/*
		 * If the worker is already running, it's either already
		 * starting work or finishing work. In either case, if it does
		 * go to sleep, we'll kick off a new task for this work anyway.
		 */
		wake_up_process(worker->task);
		io_worker_release(worker);
		return true;
	}

	return false;
}

/*
 * We need a worker. If we find a free one, we're good. If not, and we're
 * below the max number of workers, create one.
 */
static bool io_wq_create_worker(struct io_wq *wq, struct io_wq_acct *acct)
{
	/*
	 * Most likely an attempt to queue unbounded work on an io_wq that
	 * wasn't setup with any unbounded workers.
	 */
	if (unlikely(!acct->max_workers))
		pr_warn_once("io-wq is not configured for unbound workers");

	raw_spin_lock(&acct->workers_lock);
	if (acct->nr_workers >= acct->max_workers) {
		raw_spin_unlock(&acct->workers_lock);
		return true;
	}
	acct->nr_workers++;
	raw_spin_unlock(&acct->workers_lock);
	atomic_inc(&acct->nr_running);
	atomic_inc(&wq->worker_refs);
	return create_io_worker(wq, acct);
}

static void io_wq_inc_running(struct io_worker *worker)
{
	struct io_wq_acct *acct = io_wq_get_acct(worker);

	atomic_inc(&acct->nr_running);
}

static void create_worker_cb(struct callback_head *cb)
{
	struct io_worker *worker;
	struct io_wq *wq;

	struct io_wq_acct *acct;
	bool do_create = false;

	worker = container_of(cb, struct io_worker, create_work);
	wq = worker->wq;
	acct = worker->acct;

	rcu_read_lock();
	do_create = !io_acct_activate_free_worker(acct);
	rcu_read_unlock();
	if (!do_create)
		goto no_need_create;

	raw_spin_lock(&acct->workers_lock);

	if (acct->nr_workers < acct->max_workers) {
		acct->nr_workers++;
		do_create = true;
	}
	raw_spin_unlock(&acct->workers_lock);
	if (do_create) {
		create_io_worker(wq, acct);
	} else {
no_need_create:
		atomic_dec(&acct->nr_running);
		io_worker_ref_put(wq);
	}
	clear_bit_unlock(0, &worker->create_state);
	io_worker_release(worker);
}

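/*
 * Informational: create_worker_cb() runs as task_work on the io-wq owner
 * task (wq->task). It is queued from io_wq_dec_running() when the last
 * running worker of an acct goes to sleep while work is still pending, so
 * that the replacement worker thread is forked from the owning task's
 * context.
 */
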
static bool io_queue_worker_create(struct io_worker *worker,
				   struct io_wq_acct *acct,
				   task_work_func_t func)
{
	struct io_wq *wq = worker->wq;

	/* raced with exit, just ignore create call */
	if (test_bit(IO_WQ_BIT_EXIT, &wq->state))
		goto fail;
	if (!io_worker_get(worker))
		goto fail;
	/*
	 * create_state manages ownership of create_work/index. We should
	 * only need one entry per worker, as the worker going to sleep
	 * will trigger the condition, and waking will clear it once it
	 * runs the task_work.
	 */
	if (test_bit(0, &worker->create_state) ||
	    test_and_set_bit_lock(0, &worker->create_state))
		goto fail_release;

	atomic_inc(&wq->worker_refs);
	init_task_work(&worker->create_work, func);
	if (!task_work_add(wq->task, &worker->create_work, TWA_SIGNAL)) {
		/*
		 * EXIT may have been set after checking it above, check after
		 * adding the task_work and remove any creation item if it is
		 * now set. wq exit does that too, but we can have added this
		 * work item after we canceled in io_wq_exit_workers().
		 */
		if (test_bit(IO_WQ_BIT_EXIT, &wq->state))
			io_wq_cancel_tw_create(wq);
		io_worker_ref_put(wq);
		return true;
	}
	io_worker_ref_put(wq);
	clear_bit_unlock(0, &worker->create_state);
fail_release:
	io_worker_release(worker);
fail:
	atomic_dec(&acct->nr_running);
	io_worker_ref_put(wq);
	return false;
}

/* Defer if current and next work are both hashed to the same chain */
static bool io_wq_hash_defer(struct io_wq_work *work, struct io_wq_acct *acct)
{
	unsigned int hash, work_flags;
	struct io_wq_work *next;

	lockdep_assert_held(&acct->lock);

	work_flags = atomic_read(&work->flags);
	if (!__io_wq_is_hashed(work_flags))
		return false;

	/* should not happen, io_acct_run_queue() said we had work */
	if (wq_list_empty(&acct->work_list))
		return true;

	hash = __io_get_work_hash(work_flags);
	next = container_of(acct->work_list.first, struct io_wq_work, list);
	work_flags = atomic_read(&next->flags);
	if (!__io_wq_is_hashed(work_flags))
		return false;
	return hash == __io_get_work_hash(work_flags);
}

static void io_wq_dec_running(struct io_worker *worker)
{
	struct io_wq_acct *acct = io_wq_get_acct(worker);
	struct io_wq *wq = worker->wq;

	if (!test_bit(IO_WORKER_F_UP, &worker->flags))
		return;

	if (!atomic_dec_and_test(&acct->nr_running))
		return;
	if (!worker->cur_work)
		return;
	if (!io_acct_run_queue(acct))
		return;
	if (io_wq_hash_defer(worker->cur_work, acct)) {
		raw_spin_unlock(&acct->lock);
		return;
	}

	raw_spin_unlock(&acct->lock);
	atomic_inc(&acct->nr_running);
	atomic_inc(&wq->worker_refs);
	io_queue_worker_create(worker, acct, create_worker_cb);
}

/*
 * Worker will start processing some work. Move it to the busy list, if
 * it's currently on the freelist
 */
static void __io_worker_busy(struct io_wq_acct *acct, struct io_worker *worker)
{
	if (test_bit(IO_WORKER_F_FREE, &worker->flags)) {
		clear_bit(IO_WORKER_F_FREE, &worker->flags);
		raw_spin_lock(&acct->workers_lock);
		hlist_nulls_del_init_rcu(&worker->nulls_node);
		raw_spin_unlock(&acct->workers_lock);
	}
}

/*
 * No work, worker going to sleep. Move to freelist.
 */
static void __io_worker_idle(struct io_wq_acct *acct, struct io_worker *worker)
	__must_hold(acct->workers_lock)
{
	if (!test_bit(IO_WORKER_F_FREE, &worker->flags)) {
		set_bit(IO_WORKER_F_FREE, &worker->flags);
		hlist_nulls_add_head_rcu(&worker->nulls_node, &acct->free_list);
	}
}

static bool io_wait_on_hash(struct io_wq *wq, unsigned int hash)
{
	bool ret = false;

	spin_lock_irq(&wq->hash->wait.lock);
	if (list_empty(&wq->wait.entry)) {
		__add_wait_queue(&wq->hash->wait, &wq->wait);
		if (!test_bit(hash, &wq->hash->map)) {
			__set_current_state(TASK_RUNNING);
			list_del_init(&wq->wait.entry);
			ret = true;
		}
	}
	spin_unlock_irq(&wq->hash->wait.lock);
	return ret;
}

static struct io_wq_work *io_get_next_work(struct io_wq_acct *acct,
					   struct io_wq *wq)
	__must_hold(acct->lock)
{
	struct io_wq_work_node *node, *prev;
	struct io_wq_work *work, *tail;
	unsigned int stall_hash = -1U;

	wq_list_for_each(node, prev, &acct->work_list) {
		unsigned int work_flags;
		unsigned int hash;

		work = container_of(node, struct io_wq_work, list);

		/* not hashed, can run anytime */
		work_flags = atomic_read(&work->flags);
		if (!__io_wq_is_hashed(work_flags)) {
			wq_list_del(&acct->work_list, node, prev);
			return work;
		}

		hash = __io_get_work_hash(work_flags);
		/* all items with this hash lie in [work, tail] */
		tail = wq->hash_tail[hash];

		/* hashed, can run if not already running */
		if (!test_and_set_bit(hash, &wq->hash->map)) {
			wq->hash_tail[hash] = NULL;
			wq_list_cut(&acct->work_list, &tail->list, prev);
			return work;
		}
		if (stall_hash == -1U)
			stall_hash = hash;
		/* fast forward to a next hash, for-each will fix up @prev */
		node = &tail->list;
	}

	if (stall_hash != -1U) {
		bool unstalled;

		/*
		 * Set this before dropping the lock to avoid racing with new
		 * work being added and clearing the stalled bit.
		 */
		set_bit(IO_ACCT_STALLED_BIT, &acct->flags);
		raw_spin_unlock(&acct->lock);
		unstalled = io_wait_on_hash(wq, stall_hash);
		raw_spin_lock(&acct->lock);
		if (unstalled) {
			clear_bit(IO_ACCT_STALLED_BIT, &acct->flags);
			if (wq_has_sleeper(&wq->hash->wait))
				wake_up(&wq->hash->wait);
		}
	}

	return NULL;
}

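/*
 * Worked example for the hashed path above (informational): writes hashed to
 * the same inode share one bucket. The first worker to claim the bucket wins
 * test_and_set_bit() on wq->hash->map and takes the whole [work, tail] chain.
 * Work for that bucket queued afterwards is skipped here while the bit is
 * still set and the acct is marked IO_ACCT_STALLED_BIT; it becomes runnable
 * again once io_worker_handle_work() clears the bit and wakes wq->hash->wait.
 */
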
static void io_assign_current_work(struct io_worker *worker,
				   struct io_wq_work *work)
{
	if (work) {
		io_run_task_work();
		cond_resched();
	}

	raw_spin_lock(&worker->lock);
	worker->cur_work = work;
	raw_spin_unlock(&worker->lock);
}

/*
 * Called with acct->lock held, drops it before returning
 */
static void io_worker_handle_work(struct io_wq_acct *acct,
				  struct io_worker *worker)
	__releases(&acct->lock)
{
	struct io_wq *wq = worker->wq;
	bool do_kill = test_bit(IO_WQ_BIT_EXIT, &wq->state);

	do {
		struct io_wq_work *work;

		/*
		 * If we got some work, mark us as busy. If we didn't, but
		 * the list isn't empty, it means we stalled on hashed work.
		 * Mark us stalled so we don't keep looking for work when we
		 * can't make progress, any work completion or insertion will
		 * clear the stalled flag.
		 */
		work = io_get_next_work(acct, wq);
		if (work) {
			/*
			 * Make sure cancelation can find this, even before
			 * it becomes the active work. That avoids a window
			 * where the work has been removed from our general
			 * work list, but isn't yet discoverable as the
			 * current work item for this worker.
			 */
			raw_spin_lock(&worker->lock);
			worker->cur_work = work;
			raw_spin_unlock(&worker->lock);
		}

		raw_spin_unlock(&acct->lock);

		if (!work)
			break;

		__io_worker_busy(acct, worker);

		io_assign_current_work(worker, work);
		__set_current_state(TASK_RUNNING);

		/* handle a whole dependent link */
		do {
			struct io_wq_work *next_hashed, *linked;
			unsigned int work_flags = atomic_read(&work->flags);
			unsigned int hash = __io_wq_is_hashed(work_flags)
				? __io_get_work_hash(work_flags)
				: -1U;

			next_hashed = wq_next_work(work);

			if (do_kill &&
			    (work_flags & IO_WQ_WORK_UNBOUND))
				atomic_or(IO_WQ_WORK_CANCEL, &work->flags);
			io_wq_submit_work(work);
			io_assign_current_work(worker, NULL);

			linked = io_wq_free_work(work);
			work = next_hashed;
			if (!work && linked && !io_wq_is_hashed(linked)) {
				work = linked;
				linked = NULL;
			}
			io_assign_current_work(worker, work);
			if (linked)
				io_wq_enqueue(wq, linked);

			if (hash != -1U && !next_hashed) {
				/* serialize hash clear with wake_up() */
				spin_lock_irq(&wq->hash->wait.lock);
				clear_bit(hash, &wq->hash->map);
				clear_bit(IO_ACCT_STALLED_BIT, &acct->flags);
				spin_unlock_irq(&wq->hash->wait.lock);
				if (wq_has_sleeper(&wq->hash->wait))
					wake_up(&wq->hash->wait);
			}
		} while (work);

		if (!__io_acct_run_queue(acct))
			break;
		raw_spin_lock(&acct->lock);
	} while (1);
}

static int io_wq_worker(void *data)
{
	struct io_worker *worker = data;
	struct io_wq_acct *acct = io_wq_get_acct(worker);
	struct io_wq *wq = worker->wq;
	bool exit_mask = false, last_timeout = false;
	char buf[TASK_COMM_LEN] = {};

	set_mask_bits(&worker->flags, 0,
		      BIT(IO_WORKER_F_UP) | BIT(IO_WORKER_F_RUNNING));

	snprintf(buf, sizeof(buf), "iou-wrk-%d", wq->task->pid);
	set_task_comm(current, buf);

	while (!test_bit(IO_WQ_BIT_EXIT, &wq->state)) {
		long ret;

		set_current_state(TASK_INTERRUPTIBLE);

		/*
		 * If we have work to do, io_acct_run_queue() returns with
		 * the acct->lock held. If not, it will drop it.
		 */
		while (io_acct_run_queue(acct))
			io_worker_handle_work(acct, worker);

		raw_spin_lock(&acct->workers_lock);
		/*
		 * Last sleep timed out. Exit if we're not the last worker,
		 * or if someone modified our affinity.
		 */
		if (last_timeout && (exit_mask || acct->nr_workers > 1)) {
			acct->nr_workers--;
			raw_spin_unlock(&acct->workers_lock);
			__set_current_state(TASK_RUNNING);
			break;
		}
		last_timeout = false;
		__io_worker_idle(acct, worker);
		raw_spin_unlock(&acct->workers_lock);
		if (io_run_task_work())
			continue;
		ret = schedule_timeout(WORKER_IDLE_TIMEOUT);
		if (signal_pending(current)) {
			struct ksignal ksig;

			if (!get_signal(&ksig))
				continue;
			break;
		}
		if (!ret) {
			last_timeout = true;
			exit_mask = !cpumask_test_cpu(raw_smp_processor_id(),
							wq->cpu_mask);
		}
	}

	if (test_bit(IO_WQ_BIT_EXIT, &wq->state) && io_acct_run_queue(acct))
		io_worker_handle_work(acct, worker);

	io_worker_exit(worker);
	return 0;
}

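/*
 * Informational: the two hooks below are invoked from the scheduler when a
 * PF_IO_WORKER task is scheduled back in or is about to block. That is what
 * drives the nr_running accounting and, via io_wq_dec_running(), on-demand
 * creation of a replacement worker when the last running worker blocks with
 * work still pending.
 */
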
/*
 * Called when a worker is scheduled in. Mark us as currently running.
 */
void io_wq_worker_running(struct task_struct *tsk)
{
	struct io_worker *worker = tsk->worker_private;

	if (!worker)
		return;
	if (!test_bit(IO_WORKER_F_UP, &worker->flags))
		return;
	if (test_bit(IO_WORKER_F_RUNNING, &worker->flags))
		return;
	set_bit(IO_WORKER_F_RUNNING, &worker->flags);
	io_wq_inc_running(worker);
}

/*
 * Called when worker is going to sleep. If there are no workers currently
 * running and we have work pending, wake up a free one or create a new one.
 */
void io_wq_worker_sleeping(struct task_struct *tsk)
{
	struct io_worker *worker = tsk->worker_private;

	if (!worker)
		return;
	if (!test_bit(IO_WORKER_F_UP, &worker->flags))
		return;
	if (!test_bit(IO_WORKER_F_RUNNING, &worker->flags))
		return;

	clear_bit(IO_WORKER_F_RUNNING, &worker->flags);
	io_wq_dec_running(worker);
}

static void io_init_new_worker(struct io_wq *wq, struct io_wq_acct *acct, struct io_worker *worker,
			       struct task_struct *tsk)
{
	tsk->worker_private = worker;
	worker->task = tsk;
	set_cpus_allowed_ptr(tsk, wq->cpu_mask);

	raw_spin_lock(&acct->workers_lock);
	hlist_nulls_add_head_rcu(&worker->nulls_node, &acct->free_list);
	list_add_tail_rcu(&worker->all_list, &acct->all_list);
	set_bit(IO_WORKER_F_FREE, &worker->flags);
	raw_spin_unlock(&acct->workers_lock);
	wake_up_new_task(tsk);
}

static bool io_wq_work_match_all(struct io_wq_work *work, void *data)
{
	return true;
}

static inline bool io_should_retry_thread(struct io_worker *worker, long err)
{
	/*
	 * Prevent perpetual task_work retry, if the task (or its group) is
	 * exiting.
	 */
	if (fatal_signal_pending(current))
		return false;
	if (worker->init_retries++ >= WORKER_INIT_LIMIT)
		return false;

	switch (err) {
	case -EAGAIN:
	case -ERESTARTSYS:
	case -ERESTARTNOINTR:
	case -ERESTARTNOHAND:
		return true;
	default:
		return false;
	}
}

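/*
 * Retry policy in numbers (derived from the helpers above and below): thread
 * creation is retried only for the transient errors listed above, at most
 * WORKER_INIT_LIMIT (3) times, with a small growing delay of
 * init_retries * 5 msecs between attempts.
 */
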
static void queue_create_worker_retry(struct io_worker *worker)
{
	/*
	 * We only bother retrying because there's a chance that the
	 * failure to create a worker is due to some temporary condition
	 * in the forking task (e.g. outstanding signal); give the task
	 * some time to clear that condition.
	 */
	schedule_delayed_work(&worker->work,
			      msecs_to_jiffies(worker->init_retries * 5));
}

static void create_worker_cont(struct callback_head *cb)
{
	struct io_worker *worker;
	struct task_struct *tsk;
	struct io_wq *wq;
	struct io_wq_acct *acct;

	worker = container_of(cb, struct io_worker, create_work);
	clear_bit_unlock(0, &worker->create_state);
	wq = worker->wq;
	acct = io_wq_get_acct(worker);
	tsk = create_io_thread(io_wq_worker, worker, NUMA_NO_NODE);
	if (!IS_ERR(tsk)) {
		io_init_new_worker(wq, acct, worker, tsk);
		io_worker_release(worker);
		return;
	} else if (!io_should_retry_thread(worker, PTR_ERR(tsk))) {
		atomic_dec(&acct->nr_running);
		raw_spin_lock(&acct->workers_lock);
		acct->nr_workers--;
		if (!acct->nr_workers) {
			struct io_cb_cancel_data match = {
				.fn		= io_wq_work_match_all,
				.cancel_all	= true,
			};

			raw_spin_unlock(&acct->workers_lock);
			while (io_acct_cancel_pending_work(wq, acct, &match))
				;
		} else {
			raw_spin_unlock(&acct->workers_lock);
		}
		io_worker_ref_put(wq);
		kfree(worker);
		return;
	}

	/* re-create attempts grab a new worker ref, drop the existing one */
	io_worker_release(worker);
	queue_create_worker_retry(worker);
}

static void io_workqueue_create(struct work_struct *work)
{
	struct io_worker *worker = container_of(work, struct io_worker,
						work.work);
	struct io_wq_acct *acct = io_wq_get_acct(worker);

	if (!io_queue_worker_create(worker, acct, create_worker_cont))
		kfree(worker);
}

static bool create_io_worker(struct io_wq *wq, struct io_wq_acct *acct)
{
	struct io_worker *worker;
	struct task_struct *tsk;

	__set_current_state(TASK_RUNNING);

	worker = kzalloc(sizeof(*worker), GFP_KERNEL);
	if (!worker) {
fail:
		atomic_dec(&acct->nr_running);
		raw_spin_lock(&acct->workers_lock);
		acct->nr_workers--;
		raw_spin_unlock(&acct->workers_lock);
		io_worker_ref_put(wq);
		return false;
	}

	refcount_set(&worker->ref, 1);
	worker->wq = wq;
	worker->acct = acct;
	raw_spin_lock_init(&worker->lock);
	init_completion(&worker->ref_done);

	tsk = create_io_thread(io_wq_worker, worker, NUMA_NO_NODE);
	if (!IS_ERR(tsk)) {
		io_init_new_worker(wq, acct, worker, tsk);
	} else if (!io_should_retry_thread(worker, PTR_ERR(tsk))) {
		kfree(worker);
		goto fail;
	} else {
		INIT_DELAYED_WORK(&worker->work, io_workqueue_create);
		queue_create_worker_retry(worker);
	}

	return true;
}

/*
 * Iterate the passed in list and call the specific function for each
 * worker that isn't exiting
 */
static bool io_acct_for_each_worker(struct io_wq_acct *acct,
				    bool (*func)(struct io_worker *, void *),
				    void *data)
{
	struct io_worker *worker;
	bool ret = false;

	list_for_each_entry_rcu(worker, &acct->all_list, all_list) {
		if (io_worker_get(worker)) {
			/* no task if node is/was offline */
			if (worker->task)
				ret = func(worker, data);
			io_worker_release(worker);
			if (ret)
				break;
		}
	}

	return ret;
}

static bool io_wq_for_each_worker(struct io_wq *wq,
				  bool (*func)(struct io_worker *, void *),
				  void *data)
{
	for (int i = 0; i < IO_WQ_ACCT_NR; i++) {
		if (!io_acct_for_each_worker(&wq->acct[i], func, data))
			return false;
	}

	return true;
}

static bool io_wq_worker_wake(struct io_worker *worker, void *data)
{
	__set_notify_signal(worker->task);
	wake_up_process(worker->task);
	return false;
}

static void io_run_cancel(struct io_wq_work *work, struct io_wq *wq)
{
	do {
		atomic_or(IO_WQ_WORK_CANCEL, &work->flags);
		io_wq_submit_work(work);
		work = io_wq_free_work(work);
	} while (work);
}

static void io_wq_insert_work(struct io_wq *wq, struct io_wq_acct *acct,
			      struct io_wq_work *work, unsigned int work_flags)
{
	unsigned int hash;
	struct io_wq_work *tail;

	if (!__io_wq_is_hashed(work_flags)) {
append:
		wq_list_add_tail(&work->list, &acct->work_list);
		return;
	}

	hash = __io_get_work_hash(work_flags);
	tail = wq->hash_tail[hash];
	wq->hash_tail[hash] = work;
	if (!tail)
		goto append;

	wq_list_add_after(&work->list, &tail->list, &acct->work_list);
}

static bool io_wq_work_match_item(struct io_wq_work *work, void *data)
{
	return work == data;
}

void io_wq_enqueue(struct io_wq *wq, struct io_wq_work *work)
{
	unsigned int work_flags = atomic_read(&work->flags);
	struct io_wq_acct *acct = io_work_get_acct(wq, work_flags);
	struct io_cb_cancel_data match = {
		.fn		= io_wq_work_match_item,
		.data		= work,
		.cancel_all	= false,
	};
	bool do_create;

	/*
	 * If io-wq is exiting for this task, or if the request has explicitly
	 * been marked as one that should not get executed, cancel it here.
	 */
	if (test_bit(IO_WQ_BIT_EXIT, &wq->state) ||
	    (work_flags & IO_WQ_WORK_CANCEL)) {
		io_run_cancel(work, wq);
		return;
	}

	raw_spin_lock(&acct->lock);
	io_wq_insert_work(wq, acct, work, work_flags);
	clear_bit(IO_ACCT_STALLED_BIT, &acct->flags);
	raw_spin_unlock(&acct->lock);

	rcu_read_lock();
	do_create = !io_acct_activate_free_worker(acct);
	rcu_read_unlock();

	if (do_create && ((work_flags & IO_WQ_WORK_CONCURRENT) ||
	    !atomic_read(&acct->nr_running))) {
		bool did_create;

		did_create = io_wq_create_worker(wq, acct);
		if (likely(did_create))
			return;

		raw_spin_lock(&acct->workers_lock);
		if (acct->nr_workers) {
			raw_spin_unlock(&acct->workers_lock);
			return;
		}
		raw_spin_unlock(&acct->workers_lock);

		/* fatal condition, failed to create the first worker */
		io_acct_cancel_pending_work(wq, acct, &match);
	}
}

/*
 * Work items that hash to the same value will not be done in parallel.
 * Used to limit concurrent writes, generally hashed by inode.
 */
void io_wq_hash_work(struct io_wq_work *work, void *val)
{
	unsigned int bit;

	bit = hash_ptr(val, IO_WQ_HASH_ORDER);
	atomic_or(IO_WQ_WORK_HASHED | (bit << IO_WQ_HASH_SHIFT), &work->flags);
}

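/*
 * Illustrative caller-side sketch (request/field names are hypothetical, not
 * part of this file): serializing buffered writes against the same inode
 * before queueing them:
 *
 *	io_wq_hash_work(&req->work, file_inode(req->file));
 *	io_wq_enqueue(tctx->io_wq, &req->work);
 *
 * Items hashed to the same bucket are executed one at a time, in queue order.
 */
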
static bool __io_wq_worker_cancel(struct io_worker *worker,
				  struct io_cb_cancel_data *match,
				  struct io_wq_work *work)
{
	if (work && match->fn(work, match->data)) {
		atomic_or(IO_WQ_WORK_CANCEL, &work->flags);
		__set_notify_signal(worker->task);
		return true;
	}

	return false;
}

static bool io_wq_worker_cancel(struct io_worker *worker, void *data)
{
	struct io_cb_cancel_data *match = data;

	/*
	 * Hold the lock to avoid ->cur_work going out of scope, caller
	 * may dereference the passed in work.
	 */
	raw_spin_lock(&worker->lock);
	if (__io_wq_worker_cancel(worker, match, worker->cur_work))
		match->nr_running++;
	raw_spin_unlock(&worker->lock);

	return match->nr_running && !match->cancel_all;
}

static inline void io_wq_remove_pending(struct io_wq *wq,
					struct io_wq_acct *acct,
					struct io_wq_work *work,
					struct io_wq_work_node *prev)
{
	unsigned int hash = io_get_work_hash(work);
	struct io_wq_work *prev_work = NULL;

	if (io_wq_is_hashed(work) && work == wq->hash_tail[hash]) {
		if (prev)
			prev_work = container_of(prev, struct io_wq_work, list);
		if (prev_work && io_get_work_hash(prev_work) == hash)
			wq->hash_tail[hash] = prev_work;
		else
			wq->hash_tail[hash] = NULL;
	}
	wq_list_del(&acct->work_list, &work->list, prev);
}

static bool io_acct_cancel_pending_work(struct io_wq *wq,
					struct io_wq_acct *acct,
					struct io_cb_cancel_data *match)
{
	struct io_wq_work_node *node, *prev;
	struct io_wq_work *work;

	raw_spin_lock(&acct->lock);
	wq_list_for_each(node, prev, &acct->work_list) {
		work = container_of(node, struct io_wq_work, list);
		if (!match->fn(work, match->data))
			continue;
		io_wq_remove_pending(wq, acct, work, prev);
		raw_spin_unlock(&acct->lock);
		io_run_cancel(work, wq);
		match->nr_pending++;
		/* not safe to continue after unlock */
		return true;
	}
	raw_spin_unlock(&acct->lock);

	return false;
}

static void io_wq_cancel_pending_work(struct io_wq *wq,
				      struct io_cb_cancel_data *match)
{
	int i;
retry:
	for (i = 0; i < IO_WQ_ACCT_NR; i++) {
		struct io_wq_acct *acct = io_get_acct(wq, i == 0);

		if (io_acct_cancel_pending_work(wq, acct, match)) {
			if (match->cancel_all)
				goto retry;
			break;
		}
	}
}

static void io_acct_cancel_running_work(struct io_wq_acct *acct,
					struct io_cb_cancel_data *match)
{
	raw_spin_lock(&acct->workers_lock);
	io_acct_for_each_worker(acct, io_wq_worker_cancel, match);
	raw_spin_unlock(&acct->workers_lock);
}

static void io_wq_cancel_running_work(struct io_wq *wq,
				      struct io_cb_cancel_data *match)
{
	rcu_read_lock();

	for (int i = 0; i < IO_WQ_ACCT_NR; i++)
		io_acct_cancel_running_work(&wq->acct[i], match);

	rcu_read_unlock();
}

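/*
 * Minimal caller-side sketch for io_wq_cancel_cb() below (the helper name is
 * hypothetical): cancel one specific, known work item.
 *
 *	static bool match_one(struct io_wq_work *work, void *data)
 *	{
 *		return work == data;
 *	}
 *
 *	ret = io_wq_cancel_cb(wq, match_one, work, false);
 *
 * With cancel_all == false the search stops at the first match; with true,
 * every work item the callback accepts is cancelled.
 */
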
enum io_wq_cancel io_wq_cancel_cb(struct io_wq *wq, work_cancel_fn *cancel,
				  void *data, bool cancel_all)
{
	struct io_cb_cancel_data match = {
		.fn		= cancel,
		.data		= data,
		.cancel_all	= cancel_all,
	};

	/*
	 * First check pending list, if we're lucky we can just remove it
	 * from there. CANCEL_OK means that the work is returned as-new,
	 * no completion will be posted for it.
	 *
	 * Then check if a free (going busy) or busy worker has the work
	 * currently running. If we find it there, we'll return CANCEL_RUNNING
	 * as an indication that we attempt to signal cancellation. The
	 * completion will run normally in this case.
	 *
	 * Do both of these while holding the acct->workers_lock, to ensure that
	 * we'll find a work item regardless of state.
	 */
	io_wq_cancel_pending_work(wq, &match);
	if (match.nr_pending && !match.cancel_all)
		return IO_WQ_CANCEL_OK;

	io_wq_cancel_running_work(wq, &match);
	if (match.nr_running && !match.cancel_all)
		return IO_WQ_CANCEL_RUNNING;

	if (match.nr_running)
		return IO_WQ_CANCEL_RUNNING;
	if (match.nr_pending)
		return IO_WQ_CANCEL_OK;
	return IO_WQ_CANCEL_NOTFOUND;
}

static int io_wq_hash_wake(struct wait_queue_entry *wait, unsigned mode,
			   int sync, void *key)
{
	struct io_wq *wq = container_of(wait, struct io_wq, wait);
	int i;

	list_del_init(&wait->entry);

	rcu_read_lock();
	for (i = 0; i < IO_WQ_ACCT_NR; i++) {
		struct io_wq_acct *acct = &wq->acct[i];

		if (test_and_clear_bit(IO_ACCT_STALLED_BIT, &acct->flags))
			io_acct_activate_free_worker(acct);
	}
	rcu_read_unlock();
	return 1;
}

struct io_wq *io_wq_create(unsigned bounded, struct io_wq_data *data)
{
	int ret, i;
	struct io_wq *wq;

	if (WARN_ON_ONCE(!bounded))
		return ERR_PTR(-EINVAL);

	wq = kzalloc(sizeof(struct io_wq), GFP_KERNEL);
	if (!wq)
		return ERR_PTR(-ENOMEM);

	refcount_inc(&data->hash->refs);
	wq->hash = data->hash;

	ret = -ENOMEM;

	if (!alloc_cpumask_var(&wq->cpu_mask, GFP_KERNEL))
		goto err;
	cpuset_cpus_allowed(data->task, wq->cpu_mask);
	wq->acct[IO_WQ_ACCT_BOUND].max_workers = bounded;
	wq->acct[IO_WQ_ACCT_UNBOUND].max_workers =
				task_rlimit(current, RLIMIT_NPROC);
	INIT_LIST_HEAD(&wq->wait.entry);
	wq->wait.func = io_wq_hash_wake;
	for (i = 0; i < IO_WQ_ACCT_NR; i++) {
		struct io_wq_acct *acct = &wq->acct[i];

		atomic_set(&acct->nr_running, 0);

		raw_spin_lock_init(&acct->workers_lock);
		INIT_HLIST_NULLS_HEAD(&acct->free_list, 0);
		INIT_LIST_HEAD(&acct->all_list);

		INIT_WQ_LIST(&acct->work_list);
		raw_spin_lock_init(&acct->lock);
	}

	wq->task = get_task_struct(data->task);
	atomic_set(&wq->worker_refs, 1);
	init_completion(&wq->worker_done);
	ret = cpuhp_state_add_instance_nocalls(io_wq_online, &wq->cpuhp_node);
	if (ret) {
		put_task_struct(wq->task);
		goto err;
	}

	return wq;
err:
	io_wq_put_hash(data->hash);
	free_cpumask_var(wq->cpu_mask);
	kfree(wq);
	return ERR_PTR(ret);
}

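/*
 * Caller-side sketch for io_wq_create() above (variable names are
 * hypothetical): the io_uring task context fills in an io_wq_data with a
 * shared hash map and the owning task, plus a bounded-worker limit:
 *
 *	struct io_wq_data data = {
 *		.hash	= hash,
 *		.task	= task,
 *	};
 *
 *	wq = io_wq_create(bounded_concurrency, &data);
 *	if (IS_ERR(wq))
 *		return PTR_ERR(wq);
 */
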
static bool io_task_work_match(struct callback_head *cb, void *data)
{
	struct io_worker *worker;

	if (cb->func != create_worker_cb && cb->func != create_worker_cont)
		return false;
	worker = container_of(cb, struct io_worker, create_work);
	return worker->wq == data;
}

void io_wq_exit_start(struct io_wq *wq)
{
	set_bit(IO_WQ_BIT_EXIT, &wq->state);
}

static void io_wq_cancel_tw_create(struct io_wq *wq)
{
	struct callback_head *cb;

	while ((cb = task_work_cancel_match(wq->task, io_task_work_match, wq)) != NULL) {
		struct io_worker *worker;

		worker = container_of(cb, struct io_worker, create_work);
		io_worker_cancel_cb(worker);
		/*
		 * Only the worker continuation helper has worker allocated and
		 * hence needs freeing.
		 */
		if (cb->func == create_worker_cont)
			kfree(worker);
	}
}

static void io_wq_exit_workers(struct io_wq *wq)
{
	if (!wq->task)
		return;

	io_wq_cancel_tw_create(wq);

	rcu_read_lock();
	io_wq_for_each_worker(wq, io_wq_worker_wake, NULL);
	rcu_read_unlock();
	io_worker_ref_put(wq);
	wait_for_completion(&wq->worker_done);

	spin_lock_irq(&wq->hash->wait.lock);
	list_del_init(&wq->wait.entry);
	spin_unlock_irq(&wq->hash->wait.lock);

	put_task_struct(wq->task);
	wq->task = NULL;
}

static void io_wq_destroy(struct io_wq *wq)
{
	struct io_cb_cancel_data match = {
		.fn		= io_wq_work_match_all,
		.cancel_all	= true,
	};

	cpuhp_state_remove_instance_nocalls(io_wq_online, &wq->cpuhp_node);
	io_wq_cancel_pending_work(wq, &match);
	free_cpumask_var(wq->cpu_mask);
	io_wq_put_hash(wq->hash);
	kfree(wq);
}

void io_wq_put_and_exit(struct io_wq *wq)
{
	WARN_ON_ONCE(!test_bit(IO_WQ_BIT_EXIT, &wq->state));

	io_wq_exit_workers(wq);
	io_wq_destroy(wq);
}

struct online_data {
	unsigned int cpu;
	bool online;
};

static bool io_wq_worker_affinity(struct io_worker *worker, void *data)
{
	struct online_data *od = data;

	if (od->online)
		cpumask_set_cpu(od->cpu, worker->wq->cpu_mask);
	else
		cpumask_clear_cpu(od->cpu, worker->wq->cpu_mask);
	return false;
}

static int __io_wq_cpu_online(struct io_wq *wq, unsigned int cpu, bool online)
{
	struct online_data od = {
		.cpu = cpu,
		.online = online
	};

	rcu_read_lock();
	io_wq_for_each_worker(wq, io_wq_worker_affinity, &od);
	rcu_read_unlock();
	return 0;
}

static int io_wq_cpu_online(unsigned int cpu, struct hlist_node *node)
{
	struct io_wq *wq = hlist_entry_safe(node, struct io_wq, cpuhp_node);

	return __io_wq_cpu_online(wq, cpu, true);
}

static int io_wq_cpu_offline(unsigned int cpu, struct hlist_node *node)
{
	struct io_wq *wq = hlist_entry_safe(node, struct io_wq, cpuhp_node);

	return __io_wq_cpu_online(wq, cpu, false);
}

int io_wq_cpu_affinity(struct io_uring_task *tctx, cpumask_var_t mask)
{
	cpumask_var_t allowed_mask;
	int ret = 0;

	if (!tctx || !tctx->io_wq)
		return -EINVAL;

	if (!alloc_cpumask_var(&allowed_mask, GFP_KERNEL))
		return -ENOMEM;

	rcu_read_lock();
	cpuset_cpus_allowed(tctx->io_wq->task, allowed_mask);
	if (mask) {
		if (cpumask_subset(mask, allowed_mask))
			cpumask_copy(tctx->io_wq->cpu_mask, mask);
		else
			ret = -EINVAL;
	} else {
		cpumask_copy(tctx->io_wq->cpu_mask, allowed_mask);
	}
	rcu_read_unlock();

	free_cpumask_var(allowed_mask);
	return ret;
}

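/*
 * Informational: io_wq_cpu_affinity() above and io_wq_max_workers() below
 * back the IORING_REGISTER_IOWQ_AFF and IORING_REGISTER_IOWQ_MAX_WORKERS
 * io_uring_register() opcodes, respectively.
 */
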
/*
 * Set max number of unbounded workers, returns old value. If new_count is 0,
 * then just return the old value.
 */
int io_wq_max_workers(struct io_wq *wq, int *new_count)
{
	struct io_wq_acct *acct;
	int prev[IO_WQ_ACCT_NR];
	int i;

	BUILD_BUG_ON((int) IO_WQ_ACCT_BOUND   != (int) IO_WQ_BOUND);
	BUILD_BUG_ON((int) IO_WQ_ACCT_UNBOUND != (int) IO_WQ_UNBOUND);
	BUILD_BUG_ON((int) IO_WQ_ACCT_NR      != 2);

	for (i = 0; i < IO_WQ_ACCT_NR; i++) {
		if (new_count[i] > task_rlimit(current, RLIMIT_NPROC))
			new_count[i] = task_rlimit(current, RLIMIT_NPROC);
	}

	for (i = 0; i < IO_WQ_ACCT_NR; i++)
		prev[i] = 0;

	rcu_read_lock();

	for (i = 0; i < IO_WQ_ACCT_NR; i++) {
		acct = &wq->acct[i];
		raw_spin_lock(&acct->workers_lock);
		prev[i] = max_t(int, acct->max_workers, prev[i]);
		if (new_count[i])
			acct->max_workers = new_count[i];
		raw_spin_unlock(&acct->workers_lock);
	}
	rcu_read_unlock();

	for (i = 0; i < IO_WQ_ACCT_NR; i++)
		new_count[i] = prev[i];

	return 0;
}

static __init int io_wq_init(void)
{
	int ret;

	ret = cpuhp_setup_state_multi(CPUHP_AP_ONLINE_DYN, "io-wq/online",
					io_wq_cpu_online, io_wq_cpu_offline);
	if (ret < 0)
		return ret;
	io_wq_online = ret;
	return 0;
}
subsys_initcall(io_wq_init);