// SPDX-License-Identifier: GPL-2.0
/*
 * Basic worker thread pool for io_uring
 *
 * Copyright (C) 2019 Jens Axboe
 *
 */
#include <linux/kernel.h>
#include <linux/init.h>
#include <linux/errno.h>
#include <linux/sched/signal.h>
#include <linux/percpu.h>
#include <linux/slab.h>
#include <linux/rculist_nulls.h>
#include <linux/cpu.h>
#include <linux/cpuset.h>
#include <linux/task_work.h>
#include <linux/audit.h>
#include <linux/mmu_context.h>
#include <uapi/linux/io_uring.h>

#include "io-wq.h"
#include "slist.h"
#include "io_uring.h"

#define WORKER_IDLE_TIMEOUT	(5 * HZ)
#define WORKER_INIT_LIMIT	3

enum {
	IO_WORKER_F_UP		= 0,	/* up and active */
	IO_WORKER_F_RUNNING	= 1,	/* account as running */
	IO_WORKER_F_FREE	= 2,	/* worker on free list */
};

enum {
	IO_WQ_BIT_EXIT		= 0,	/* wq exiting */
};

enum {
	IO_ACCT_STALLED_BIT	= 0,	/* stalled on hash */
};

/*
 * One for each thread in a wq pool
 */
struct io_worker {
	refcount_t ref;
	unsigned long flags;
	struct hlist_nulls_node nulls_node;
	struct list_head all_list;
	struct task_struct *task;
	struct io_wq *wq;
	struct io_wq_acct *acct;

	struct io_wq_work *cur_work;
	raw_spinlock_t lock;

	struct completion ref_done;

	unsigned long create_state;
	struct callback_head create_work;
	int init_retries;

	union {
		struct rcu_head rcu;
		struct delayed_work work;
	};
};

#if BITS_PER_LONG == 64
#define IO_WQ_HASH_ORDER	6
#else
#define IO_WQ_HASH_ORDER	5
#endif

#define IO_WQ_NR_HASH_BUCKETS	(1u << IO_WQ_HASH_ORDER)

struct io_wq_acct {
	/**
	 * Protects access to the worker lists.
	 */
	raw_spinlock_t workers_lock;

	unsigned nr_workers;
	unsigned max_workers;
	atomic_t nr_running;

	/**
	 * The list of free workers. Protected by #workers_lock
	 * (write) and RCU (read).
	 */
	struct hlist_nulls_head free_list;

	/**
	 * The list of all workers. Protected by #workers_lock
	 * (write) and RCU (read).
	 */
	struct list_head all_list;

	raw_spinlock_t lock;
	struct io_wq_work_list work_list;
	unsigned long flags;
};

enum {
	IO_WQ_ACCT_BOUND,
	IO_WQ_ACCT_UNBOUND,
	IO_WQ_ACCT_NR,
};

/*
 * Per io_wq state
 */
struct io_wq {
	unsigned long state;

	struct io_wq_hash *hash;

	atomic_t worker_refs;
	struct completion worker_done;

	struct hlist_node cpuhp_node;

	struct task_struct *task;

	struct io_wq_acct acct[IO_WQ_ACCT_NR];

	struct wait_queue_entry wait;

	struct io_wq_work *hash_tail[IO_WQ_NR_HASH_BUCKETS];

	cpumask_var_t cpu_mask;
};

static enum cpuhp_state io_wq_online;

struct io_cb_cancel_data {
	work_cancel_fn *fn;
	void *data;
	int nr_running;
	int nr_pending;
	bool cancel_all;
};

static bool create_io_worker(struct io_wq *wq, struct io_wq_acct *acct);
static void io_wq_dec_running(struct io_worker *worker);
static bool io_acct_cancel_pending_work(struct io_wq *wq,
					struct io_wq_acct *acct,
					struct io_cb_cancel_data *match);
static void create_worker_cb(struct callback_head *cb);
static void io_wq_cancel_tw_create(struct io_wq *wq);

static inline unsigned int __io_get_work_hash(unsigned int work_flags)
{
	return work_flags >> IO_WQ_HASH_SHIFT;
}

static inline unsigned int io_get_work_hash(struct io_wq_work *work)
{
	return __io_get_work_hash(atomic_read(&work->flags));
}

static bool io_worker_get(struct io_worker *worker)
{
	return refcount_inc_not_zero(&worker->ref);
}

static void io_worker_release(struct io_worker *worker)
{
	if (refcount_dec_and_test(&worker->ref))
		complete(&worker->ref_done);
}

static inline struct io_wq_acct *io_get_acct(struct io_wq *wq, bool bound)
{
	return &wq->acct[bound ?
			 IO_WQ_ACCT_BOUND : IO_WQ_ACCT_UNBOUND];
}

static inline struct io_wq_acct *io_work_get_acct(struct io_wq *wq,
						  unsigned int work_flags)
{
	return io_get_acct(wq, !(work_flags & IO_WQ_WORK_UNBOUND));
}

static inline struct io_wq_acct *io_wq_get_acct(struct io_worker *worker)
{
	return worker->acct;
}

static void io_worker_ref_put(struct io_wq *wq)
{
	if (atomic_dec_and_test(&wq->worker_refs))
		complete(&wq->worker_done);
}

bool io_wq_worker_stopped(void)
{
	struct io_worker *worker = current->worker_private;

	if (WARN_ON_ONCE(!io_wq_current_is_worker()))
		return true;

	return test_bit(IO_WQ_BIT_EXIT, &worker->wq->state);
}

static void io_worker_cancel_cb(struct io_worker *worker)
{
	struct io_wq_acct *acct = io_wq_get_acct(worker);
	struct io_wq *wq = worker->wq;

	atomic_dec(&acct->nr_running);
	raw_spin_lock(&acct->workers_lock);
	acct->nr_workers--;
	raw_spin_unlock(&acct->workers_lock);
	io_worker_ref_put(wq);
	clear_bit_unlock(0, &worker->create_state);
	io_worker_release(worker);
}

static bool io_task_worker_match(struct callback_head *cb, void *data)
{
	struct io_worker *worker;

	if (cb->func != create_worker_cb)
		return false;
	worker = container_of(cb, struct io_worker, create_work);
	return worker == data;
}

static void io_worker_exit(struct io_worker *worker)
{
	struct io_wq *wq = worker->wq;
	struct io_wq_acct *acct = io_wq_get_acct(worker);

	while (1) {
		struct callback_head *cb = task_work_cancel_match(wq->task,
				io_task_worker_match, worker);

		if (!cb)
			break;
		io_worker_cancel_cb(worker);
	}

	io_worker_release(worker);
	wait_for_completion(&worker->ref_done);

	raw_spin_lock(&acct->workers_lock);
	if (test_bit(IO_WORKER_F_FREE, &worker->flags))
		hlist_nulls_del_rcu(&worker->nulls_node);
	list_del_rcu(&worker->all_list);
	raw_spin_unlock(&acct->workers_lock);
	io_wq_dec_running(worker);
	/*
	 * this worker is a goner, clear ->worker_private to avoid any
	 * inc/dec running calls that could happen as part of exit from
	 * touching 'worker'.
	 */
	current->worker_private = NULL;

	kfree_rcu(worker, rcu);
	io_worker_ref_put(wq);
	do_exit(0);
}

static inline bool __io_acct_run_queue(struct io_wq_acct *acct)
{
	return !test_bit(IO_ACCT_STALLED_BIT, &acct->flags) &&
		!wq_list_empty(&acct->work_list);
}

/*
 * If there's work to do, returns true with acct->lock acquired. If not,
 * returns false with no lock held.
 */
static inline bool io_acct_run_queue(struct io_wq_acct *acct)
	__acquires(&acct->lock)
{
	raw_spin_lock(&acct->lock);
	if (__io_acct_run_queue(acct))
		return true;

	raw_spin_unlock(&acct->lock);
	return false;
}

/*
 * Check head of free list for an available worker. If one isn't available,
 * caller must create one.
 */
static bool io_acct_activate_free_worker(struct io_wq_acct *acct)
	__must_hold(RCU)
{
	struct hlist_nulls_node *n;
	struct io_worker *worker;

	/*
	 * Iterate free_list and see if we can find an idle worker to
	 * activate. If a given worker is on the free_list but in the process
	 * of exiting, keep trying.
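	 * (A worker that fails io_worker_get() below has already dropped its
	 *  last reference and is exiting, so it is skipped and the next
	 *  entry is tried.)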
	 */
	hlist_nulls_for_each_entry_rcu(worker, n, &acct->free_list, nulls_node) {
		if (!io_worker_get(worker))
			continue;
		/*
		 * If the worker is already running, it's either already
		 * starting work or finishing work. In either case, if it does
		 * go to sleep, we'll kick off a new task for this work anyway.
		 */
		wake_up_process(worker->task);
		io_worker_release(worker);
		return true;
	}

	return false;
}

/*
 * We need a worker. If we find a free one, we're good. If not, and we're
 * below the max number of workers, create one.
 */
static bool io_wq_create_worker(struct io_wq *wq, struct io_wq_acct *acct)
{
	/*
	 * Most likely an attempt to queue unbounded work on an io_wq that
	 * wasn't setup with any unbounded workers.
	 */
	if (unlikely(!acct->max_workers))
		pr_warn_once("io-wq is not configured for unbound workers");

	raw_spin_lock(&acct->workers_lock);
	if (acct->nr_workers >= acct->max_workers) {
		raw_spin_unlock(&acct->workers_lock);
		return true;
	}
	acct->nr_workers++;
	raw_spin_unlock(&acct->workers_lock);
	atomic_inc(&acct->nr_running);
	atomic_inc(&wq->worker_refs);
	return create_io_worker(wq, acct);
}

static void io_wq_inc_running(struct io_worker *worker)
{
	struct io_wq_acct *acct = io_wq_get_acct(worker);

	atomic_inc(&acct->nr_running);
}

static void create_worker_cb(struct callback_head *cb)
{
	struct io_worker *worker;
	struct io_wq *wq;

	struct io_wq_acct *acct;
	bool do_create = false;

	worker = container_of(cb, struct io_worker, create_work);
	wq = worker->wq;
	acct = worker->acct;
	raw_spin_lock(&acct->workers_lock);

	if (acct->nr_workers < acct->max_workers) {
		acct->nr_workers++;
		do_create = true;
	}
	raw_spin_unlock(&acct->workers_lock);
	if (do_create) {
		create_io_worker(wq, acct);
	} else {
		atomic_dec(&acct->nr_running);
		io_worker_ref_put(wq);
	}
	clear_bit_unlock(0, &worker->create_state);
	io_worker_release(worker);
}

static bool io_queue_worker_create(struct io_worker *worker,
				   struct io_wq_acct *acct,
				   task_work_func_t func)
{
	struct io_wq *wq = worker->wq;

	/* raced with exit, just ignore create call */
	if (test_bit(IO_WQ_BIT_EXIT, &wq->state))
		goto fail;
	if (!io_worker_get(worker))
		goto fail;
	/*
	 * create_state manages ownership of create_work/index. We should
	 * only need one entry per worker, as the worker going to sleep
	 * will trigger the condition, and waking will clear it once it
	 * runs the task_work.
	 */
	if (test_bit(0, &worker->create_state) ||
	    test_and_set_bit_lock(0, &worker->create_state))
		goto fail_release;

	atomic_inc(&wq->worker_refs);
	init_task_work(&worker->create_work, func);
	if (!task_work_add(wq->task, &worker->create_work, TWA_SIGNAL)) {
		/*
		 * EXIT may have been set after checking it above, check after
		 * adding the task_work and remove any creation item if it is
		 * now set. wq exit does that too, but we can have added this
		 * work item after we canceled in io_wq_exit_workers().
		 */
		if (test_bit(IO_WQ_BIT_EXIT, &wq->state))
			io_wq_cancel_tw_create(wq);
		io_worker_ref_put(wq);
		return true;
	}
	io_worker_ref_put(wq);
	clear_bit_unlock(0, &worker->create_state);
fail_release:
	io_worker_release(worker);
fail:
	atomic_dec(&acct->nr_running);
	io_worker_ref_put(wq);
	return false;
}

/* Defer if current and next work are both hashed to the same chain */
static bool io_wq_hash_defer(struct io_wq_work *work, struct io_wq_acct *acct)
{
	unsigned int hash, work_flags;
	struct io_wq_work *next;

	lockdep_assert_held(&acct->lock);

	work_flags = atomic_read(&work->flags);
	if (!__io_wq_is_hashed(work_flags))
		return false;

	/* should not happen, io_acct_run_queue() said we had work */
	if (wq_list_empty(&acct->work_list))
		return true;

	hash = __io_get_work_hash(work_flags);
	next = container_of(acct->work_list.first, struct io_wq_work, list);
	work_flags = atomic_read(&next->flags);
	if (!__io_wq_is_hashed(work_flags))
		return false;
	return hash == __io_get_work_hash(work_flags);
}

static void io_wq_dec_running(struct io_worker *worker)
{
	struct io_wq_acct *acct = io_wq_get_acct(worker);
	struct io_wq *wq = worker->wq;

	if (!test_bit(IO_WORKER_F_UP, &worker->flags))
		return;

	if (!atomic_dec_and_test(&acct->nr_running))
		return;
	if (!worker->cur_work)
		return;
	if (!io_acct_run_queue(acct))
		return;
	if (io_wq_hash_defer(worker->cur_work, acct)) {
		raw_spin_unlock(&acct->lock);
		return;
	}

	raw_spin_unlock(&acct->lock);
	atomic_inc(&acct->nr_running);
	atomic_inc(&wq->worker_refs);
	io_queue_worker_create(worker, acct, create_worker_cb);
}

/*
 * Worker will start processing some work. Move it to the busy list, if
 * it's currently on the freelist
 */
static void __io_worker_busy(struct io_wq_acct *acct, struct io_worker *worker)
{
	if (test_bit(IO_WORKER_F_FREE, &worker->flags)) {
		clear_bit(IO_WORKER_F_FREE, &worker->flags);
		raw_spin_lock(&acct->workers_lock);
		hlist_nulls_del_init_rcu(&worker->nulls_node);
		raw_spin_unlock(&acct->workers_lock);
	}
}

/*
 * No work, worker going to sleep. Move to freelist.
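 * From there it can be found and woken again by
 * io_acct_activate_free_worker().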
 */
static void __io_worker_idle(struct io_wq_acct *acct, struct io_worker *worker)
	__must_hold(acct->workers_lock)
{
	if (!test_bit(IO_WORKER_F_FREE, &worker->flags)) {
		set_bit(IO_WORKER_F_FREE, &worker->flags);
		hlist_nulls_add_head_rcu(&worker->nulls_node, &acct->free_list);
	}
}

static bool io_wait_on_hash(struct io_wq *wq, unsigned int hash)
{
	bool ret = false;

	spin_lock_irq(&wq->hash->wait.lock);
	if (list_empty(&wq->wait.entry)) {
		__add_wait_queue(&wq->hash->wait, &wq->wait);
		if (!test_bit(hash, &wq->hash->map)) {
			__set_current_state(TASK_RUNNING);
			list_del_init(&wq->wait.entry);
			ret = true;
		}
	}
	spin_unlock_irq(&wq->hash->wait.lock);
	return ret;
}

static struct io_wq_work *io_get_next_work(struct io_wq_acct *acct,
					   struct io_wq *wq)
	__must_hold(acct->lock)
{
	struct io_wq_work_node *node, *prev;
	struct io_wq_work *work, *tail;
	unsigned int stall_hash = -1U;

	wq_list_for_each(node, prev, &acct->work_list) {
		unsigned int work_flags;
		unsigned int hash;

		work = container_of(node, struct io_wq_work, list);

		/* not hashed, can run anytime */
		work_flags = atomic_read(&work->flags);
		if (!__io_wq_is_hashed(work_flags)) {
			wq_list_del(&acct->work_list, node, prev);
			return work;
		}

		hash = __io_get_work_hash(work_flags);
		/* all items with this hash lie in [work, tail] */
		tail = wq->hash_tail[hash];

		/* hashed, can run if not already running */
		if (!test_and_set_bit(hash, &wq->hash->map)) {
			wq->hash_tail[hash] = NULL;
			wq_list_cut(&acct->work_list, &tail->list, prev);
			return work;
		}
		if (stall_hash == -1U)
			stall_hash = hash;
		/* fast forward to a next hash, for-each will fix up @prev */
		node = &tail->list;
	}

	if (stall_hash != -1U) {
		bool unstalled;

		/*
		 * Set this before dropping the lock to avoid racing with new
		 * work being added and clearing the stalled bit.
		 */
		set_bit(IO_ACCT_STALLED_BIT, &acct->flags);
		raw_spin_unlock(&acct->lock);
		unstalled = io_wait_on_hash(wq, stall_hash);
		raw_spin_lock(&acct->lock);
		if (unstalled) {
			clear_bit(IO_ACCT_STALLED_BIT, &acct->flags);
			if (wq_has_sleeper(&wq->hash->wait))
				wake_up(&wq->hash->wait);
		}
	}

	return NULL;
}

static void io_assign_current_work(struct io_worker *worker,
				   struct io_wq_work *work)
{
	if (work) {
		io_run_task_work();
		cond_resched();
	}

	raw_spin_lock(&worker->lock);
	worker->cur_work = work;
	raw_spin_unlock(&worker->lock);
}

/*
 * Called with acct->lock held, drops it before returning
 */
static void io_worker_handle_work(struct io_wq_acct *acct,
				  struct io_worker *worker)
	__releases(&acct->lock)
{
	struct io_wq *wq = worker->wq;
	bool do_kill = test_bit(IO_WQ_BIT_EXIT, &wq->state);

	do {
		struct io_wq_work *work;

		/*
		 * If we got some work, mark us as busy. If we didn't, but
		 * the list isn't empty, it means we stalled on hashed work.
		 * Mark us stalled so we don't keep looking for work when we
		 * can't make progress, any work completion or insertion will
		 * clear the stalled flag.
		 */
		work = io_get_next_work(acct, wq);
		if (work) {
			/*
			 * Make sure cancelation can find this, even before
			 * it becomes the active work.
			 * That avoids a window
			 * where the work has been removed from our general
			 * work list, but isn't yet discoverable as the
			 * current work item for this worker.
			 */
			raw_spin_lock(&worker->lock);
			worker->cur_work = work;
			raw_spin_unlock(&worker->lock);
		}

		raw_spin_unlock(&acct->lock);

		if (!work)
			break;

		__io_worker_busy(acct, worker);

		io_assign_current_work(worker, work);
		__set_current_state(TASK_RUNNING);

		/* handle a whole dependent link */
		do {
			struct io_wq_work *next_hashed, *linked;
			unsigned int work_flags = atomic_read(&work->flags);
			unsigned int hash = __io_wq_is_hashed(work_flags)
				? __io_get_work_hash(work_flags)
				: -1U;

			next_hashed = wq_next_work(work);

			if (do_kill &&
			    (work_flags & IO_WQ_WORK_UNBOUND))
				atomic_or(IO_WQ_WORK_CANCEL, &work->flags);
			io_wq_submit_work(work);
			io_assign_current_work(worker, NULL);

			linked = io_wq_free_work(work);
			work = next_hashed;
			if (!work && linked && !io_wq_is_hashed(linked)) {
				work = linked;
				linked = NULL;
			}
			io_assign_current_work(worker, work);
			if (linked)
				io_wq_enqueue(wq, linked);

			if (hash != -1U && !next_hashed) {
				/* serialize hash clear with wake_up() */
				spin_lock_irq(&wq->hash->wait.lock);
				clear_bit(hash, &wq->hash->map);
				clear_bit(IO_ACCT_STALLED_BIT, &acct->flags);
				spin_unlock_irq(&wq->hash->wait.lock);
				if (wq_has_sleeper(&wq->hash->wait))
					wake_up(&wq->hash->wait);
			}
		} while (work);

		if (!__io_acct_run_queue(acct))
			break;
		raw_spin_lock(&acct->lock);
	} while (1);
}

static int io_wq_worker(void *data)
{
	struct io_worker *worker = data;
	struct io_wq_acct *acct = io_wq_get_acct(worker);
	struct io_wq *wq = worker->wq;
	bool exit_mask = false, last_timeout = false;
	char buf[TASK_COMM_LEN] = {};

	set_mask_bits(&worker->flags, 0,
		      BIT(IO_WORKER_F_UP) | BIT(IO_WORKER_F_RUNNING));

	snprintf(buf, sizeof(buf), "iou-wrk-%d", wq->task->pid);
	set_task_comm(current, buf);

	while (!test_bit(IO_WQ_BIT_EXIT, &wq->state)) {
		long ret;

		set_current_state(TASK_INTERRUPTIBLE);

		/*
		 * If we have work to do, io_acct_run_queue() returns with
		 * the acct->lock held. If not, it will drop it.
		 */
		while (io_acct_run_queue(acct))
			io_worker_handle_work(acct, worker);

		raw_spin_lock(&acct->workers_lock);
		/*
		 * Last sleep timed out. Exit if we're not the last worker,
		 * or if someone modified our affinity.
		 */
		if (last_timeout && (exit_mask || acct->nr_workers > 1)) {
			acct->nr_workers--;
			raw_spin_unlock(&acct->workers_lock);
			__set_current_state(TASK_RUNNING);
			break;
		}
		last_timeout = false;
		__io_worker_idle(acct, worker);
		raw_spin_unlock(&acct->workers_lock);
		if (io_run_task_work())
			continue;
		ret = schedule_timeout(WORKER_IDLE_TIMEOUT);
		if (signal_pending(current)) {
			struct ksignal ksig;

			if (!get_signal(&ksig))
				continue;
			break;
		}
		if (!ret) {
			last_timeout = true;
			exit_mask = !cpumask_test_cpu(raw_smp_processor_id(),
							wq->cpu_mask);
		}
	}

	if (test_bit(IO_WQ_BIT_EXIT, &wq->state) && io_acct_run_queue(acct))
		io_worker_handle_work(acct, worker);

	io_worker_exit(worker);
	return 0;
}

/*
 * Called when a worker is scheduled in. Mark us as currently running.
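 * This is the scheduled-in counterpart of io_wq_worker_sleeping() below.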
 */
void io_wq_worker_running(struct task_struct *tsk)
{
	struct io_worker *worker = tsk->worker_private;

	if (!worker)
		return;
	if (!test_bit(IO_WORKER_F_UP, &worker->flags))
		return;
	if (test_bit(IO_WORKER_F_RUNNING, &worker->flags))
		return;
	set_bit(IO_WORKER_F_RUNNING, &worker->flags);
	io_wq_inc_running(worker);
}

/*
 * Called when worker is going to sleep. If there are no workers currently
 * running and we have work pending, wake up a free one or create a new one.
 */
void io_wq_worker_sleeping(struct task_struct *tsk)
{
	struct io_worker *worker = tsk->worker_private;

	if (!worker)
		return;
	if (!test_bit(IO_WORKER_F_UP, &worker->flags))
		return;
	if (!test_bit(IO_WORKER_F_RUNNING, &worker->flags))
		return;

	clear_bit(IO_WORKER_F_RUNNING, &worker->flags);
	io_wq_dec_running(worker);
}

static void io_init_new_worker(struct io_wq *wq, struct io_wq_acct *acct, struct io_worker *worker,
			       struct task_struct *tsk)
{
	tsk->worker_private = worker;
	worker->task = tsk;
	set_cpus_allowed_ptr(tsk, wq->cpu_mask);

	raw_spin_lock(&acct->workers_lock);
	hlist_nulls_add_head_rcu(&worker->nulls_node, &acct->free_list);
	list_add_tail_rcu(&worker->all_list, &acct->all_list);
	set_bit(IO_WORKER_F_FREE, &worker->flags);
	raw_spin_unlock(&acct->workers_lock);
	wake_up_new_task(tsk);
}

static bool io_wq_work_match_all(struct io_wq_work *work, void *data)
{
	return true;
}

static inline bool io_should_retry_thread(struct io_worker *worker, long err)
{
	/*
	 * Prevent perpetual task_work retry, if the task (or its group) is
	 * exiting.
	 */
	if (fatal_signal_pending(current))
		return false;
	if (worker->init_retries++ >= WORKER_INIT_LIMIT)
		return false;

	switch (err) {
	case -EAGAIN:
	case -ERESTARTSYS:
	case -ERESTARTNOINTR:
	case -ERESTARTNOHAND:
		return true;
	default:
		return false;
	}
}

static void queue_create_worker_retry(struct io_worker *worker)
{
	/*
	 * We only bother retrying because there's a chance that the
	 * failure to create a worker is due to some temporary condition
	 * in the forking task (e.g. outstanding signal); give the task
	 * some time to clear that condition.
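	 * The delay grows with the number of attempts made so far
	 * (5 msec per retry, see below).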
	 */
	schedule_delayed_work(&worker->work,
			      msecs_to_jiffies(worker->init_retries * 5));
}

static void create_worker_cont(struct callback_head *cb)
{
	struct io_worker *worker;
	struct task_struct *tsk;
	struct io_wq *wq;
	struct io_wq_acct *acct;

	worker = container_of(cb, struct io_worker, create_work);
	clear_bit_unlock(0, &worker->create_state);
	wq = worker->wq;
	acct = io_wq_get_acct(worker);
	tsk = create_io_thread(io_wq_worker, worker, NUMA_NO_NODE);
	if (!IS_ERR(tsk)) {
		io_init_new_worker(wq, acct, worker, tsk);
		io_worker_release(worker);
		return;
	} else if (!io_should_retry_thread(worker, PTR_ERR(tsk))) {
		atomic_dec(&acct->nr_running);
		raw_spin_lock(&acct->workers_lock);
		acct->nr_workers--;
		if (!acct->nr_workers) {
			struct io_cb_cancel_data match = {
				.fn		= io_wq_work_match_all,
				.cancel_all	= true,
			};

			raw_spin_unlock(&acct->workers_lock);
			while (io_acct_cancel_pending_work(wq, acct, &match))
				;
		} else {
			raw_spin_unlock(&acct->workers_lock);
		}
		io_worker_ref_put(wq);
		kfree(worker);
		return;
	}

	/* re-create attempts grab a new worker ref, drop the existing one */
	io_worker_release(worker);
	queue_create_worker_retry(worker);
}

static void io_workqueue_create(struct work_struct *work)
{
	struct io_worker *worker = container_of(work, struct io_worker,
						work.work);
	struct io_wq_acct *acct = io_wq_get_acct(worker);

	if (!io_queue_worker_create(worker, acct, create_worker_cont))
		kfree(worker);
}

static bool create_io_worker(struct io_wq *wq, struct io_wq_acct *acct)
{
	struct io_worker *worker;
	struct task_struct *tsk;

	__set_current_state(TASK_RUNNING);

	worker = kzalloc(sizeof(*worker), GFP_KERNEL);
	if (!worker) {
fail:
		atomic_dec(&acct->nr_running);
		raw_spin_lock(&acct->workers_lock);
		acct->nr_workers--;
		raw_spin_unlock(&acct->workers_lock);
		io_worker_ref_put(wq);
		return false;
	}

	refcount_set(&worker->ref, 1);
	worker->wq = wq;
	worker->acct = acct;
	raw_spin_lock_init(&worker->lock);
	init_completion(&worker->ref_done);

	tsk = create_io_thread(io_wq_worker, worker, NUMA_NO_NODE);
	if (!IS_ERR(tsk)) {
		io_init_new_worker(wq, acct, worker, tsk);
	} else if (!io_should_retry_thread(worker, PTR_ERR(tsk))) {
		kfree(worker);
		goto fail;
	} else {
		INIT_DELAYED_WORK(&worker->work, io_workqueue_create);
		queue_create_worker_retry(worker);
	}

	return true;
}

/*
 * Iterate the passed in list and call the specific function for each
 * worker that isn't exiting
 */
static bool io_acct_for_each_worker(struct io_wq_acct *acct,
				    bool (*func)(struct io_worker *, void *),
				    void *data)
{
	struct io_worker *worker;
	bool ret = false;

	list_for_each_entry_rcu(worker, &acct->all_list, all_list) {
		if (io_worker_get(worker)) {
			/* no task if node is/was offline */
			if (worker->task)
				ret = func(worker, data);
			io_worker_release(worker);
			if (ret)
				break;
		}
	}

	return ret;
}

static bool io_wq_for_each_worker(struct io_wq *wq,
				  bool (*func)(struct io_worker *, void *),
				  void *data)
{
	for (int i = 0; i < IO_WQ_ACCT_NR; i++) {
		if (!io_acct_for_each_worker(&wq->acct[i], func, data))
			return false;
	}

	return true;
}

static bool
io_wq_worker_wake(struct io_worker *worker, void *data)
{
	__set_notify_signal(worker->task);
	wake_up_process(worker->task);
	return false;
}

static void io_run_cancel(struct io_wq_work *work, struct io_wq *wq)
{
	do {
		atomic_or(IO_WQ_WORK_CANCEL, &work->flags);
		io_wq_submit_work(work);
		work = io_wq_free_work(work);
	} while (work);
}

static void io_wq_insert_work(struct io_wq *wq, struct io_wq_acct *acct,
			      struct io_wq_work *work, unsigned int work_flags)
{
	unsigned int hash;
	struct io_wq_work *tail;

	if (!__io_wq_is_hashed(work_flags)) {
append:
		wq_list_add_tail(&work->list, &acct->work_list);
		return;
	}

	hash = __io_get_work_hash(work_flags);
	tail = wq->hash_tail[hash];
	wq->hash_tail[hash] = work;
	if (!tail)
		goto append;

	wq_list_add_after(&work->list, &tail->list, &acct->work_list);
}

static bool io_wq_work_match_item(struct io_wq_work *work, void *data)
{
	return work == data;
}

void io_wq_enqueue(struct io_wq *wq, struct io_wq_work *work)
{
	unsigned int work_flags = atomic_read(&work->flags);
	struct io_wq_acct *acct = io_work_get_acct(wq, work_flags);
	struct io_cb_cancel_data match = {
		.fn		= io_wq_work_match_item,
		.data		= work,
		.cancel_all	= false,
	};
	bool do_create;

	/*
	 * If io-wq is exiting for this task, or if the request has explicitly
	 * been marked as one that should not get executed, cancel it here.
	 */
	if (test_bit(IO_WQ_BIT_EXIT, &wq->state) ||
	    (work_flags & IO_WQ_WORK_CANCEL)) {
		io_run_cancel(work, wq);
		return;
	}

	raw_spin_lock(&acct->lock);
	io_wq_insert_work(wq, acct, work, work_flags);
	clear_bit(IO_ACCT_STALLED_BIT, &acct->flags);
	raw_spin_unlock(&acct->lock);

	rcu_read_lock();
	do_create = !io_acct_activate_free_worker(acct);
	rcu_read_unlock();

	if (do_create && ((work_flags & IO_WQ_WORK_CONCURRENT) ||
	    !atomic_read(&acct->nr_running))) {
		bool did_create;

		did_create = io_wq_create_worker(wq, acct);
		if (likely(did_create))
			return;

		raw_spin_lock(&acct->workers_lock);
		if (acct->nr_workers) {
			raw_spin_unlock(&acct->workers_lock);
			return;
		}
		raw_spin_unlock(&acct->workers_lock);

		/* fatal condition, failed to create the first worker */
		io_acct_cancel_pending_work(wq, acct, &match);
	}
}

/*
 * Work items that hash to the same value will not be done in parallel.
 * Used to limit concurrent writes, generally hashed by inode.
 */
void io_wq_hash_work(struct io_wq_work *work, void *val)
{
	unsigned int bit;

	bit = hash_ptr(val, IO_WQ_HASH_ORDER);
	atomic_or(IO_WQ_WORK_HASHED | (bit << IO_WQ_HASH_SHIFT), &work->flags);
}

static bool __io_wq_worker_cancel(struct io_worker *worker,
				  struct io_cb_cancel_data *match,
				  struct io_wq_work *work)
{
	if (work && match->fn(work, match->data)) {
		atomic_or(IO_WQ_WORK_CANCEL, &work->flags);
		__set_notify_signal(worker->task);
		return true;
	}

	return false;
}

static bool io_wq_worker_cancel(struct io_worker *worker, void *data)
{
	struct io_cb_cancel_data *match = data;

	/*
	 * Hold the lock to avoid ->cur_work going out of scope, caller
	 * may dereference the passed in work.
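	 * The same lock is taken by io_assign_current_work() whenever
	 * ->cur_work changes.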
	 */
	raw_spin_lock(&worker->lock);
	if (__io_wq_worker_cancel(worker, match, worker->cur_work))
		match->nr_running++;
	raw_spin_unlock(&worker->lock);

	return match->nr_running && !match->cancel_all;
}

static inline void io_wq_remove_pending(struct io_wq *wq,
					struct io_wq_acct *acct,
					struct io_wq_work *work,
					struct io_wq_work_node *prev)
{
	unsigned int hash = io_get_work_hash(work);
	struct io_wq_work *prev_work = NULL;

	if (io_wq_is_hashed(work) && work == wq->hash_tail[hash]) {
		if (prev)
			prev_work = container_of(prev, struct io_wq_work, list);
		if (prev_work && io_get_work_hash(prev_work) == hash)
			wq->hash_tail[hash] = prev_work;
		else
			wq->hash_tail[hash] = NULL;
	}
	wq_list_del(&acct->work_list, &work->list, prev);
}

static bool io_acct_cancel_pending_work(struct io_wq *wq,
					struct io_wq_acct *acct,
					struct io_cb_cancel_data *match)
{
	struct io_wq_work_node *node, *prev;
	struct io_wq_work *work;

	raw_spin_lock(&acct->lock);
	wq_list_for_each(node, prev, &acct->work_list) {
		work = container_of(node, struct io_wq_work, list);
		if (!match->fn(work, match->data))
			continue;
		io_wq_remove_pending(wq, acct, work, prev);
		raw_spin_unlock(&acct->lock);
		io_run_cancel(work, wq);
		match->nr_pending++;
		/* not safe to continue after unlock */
		return true;
	}
	raw_spin_unlock(&acct->lock);

	return false;
}

static void io_wq_cancel_pending_work(struct io_wq *wq,
				      struct io_cb_cancel_data *match)
{
	int i;
retry:
	for (i = 0; i < IO_WQ_ACCT_NR; i++) {
		struct io_wq_acct *acct = io_get_acct(wq, i == 0);

		if (io_acct_cancel_pending_work(wq, acct, match)) {
			if (match->cancel_all)
				goto retry;
			break;
		}
	}
}

static void io_acct_cancel_running_work(struct io_wq_acct *acct,
					struct io_cb_cancel_data *match)
{
	raw_spin_lock(&acct->workers_lock);
	io_acct_for_each_worker(acct, io_wq_worker_cancel, match);
	raw_spin_unlock(&acct->workers_lock);
}

static void io_wq_cancel_running_work(struct io_wq *wq,
				      struct io_cb_cancel_data *match)
{
	rcu_read_lock();

	for (int i = 0; i < IO_WQ_ACCT_NR; i++)
		io_acct_cancel_running_work(&wq->acct[i], match);

	rcu_read_unlock();
}

enum io_wq_cancel io_wq_cancel_cb(struct io_wq *wq, work_cancel_fn *cancel,
				  void *data, bool cancel_all)
{
	struct io_cb_cancel_data match = {
		.fn		= cancel,
		.data		= data,
		.cancel_all	= cancel_all,
	};

	/*
	 * First check pending list, if we're lucky we can just remove it
	 * from there. CANCEL_OK means that the work is returned as-new,
	 * no completion will be posted for it.
	 *
	 * Then check if a free (going busy) or busy worker has the work
	 * currently running. If we find it there, we'll return CANCEL_RUNNING
	 * as an indication that we attempt to signal cancellation. The
	 * completion will run normally in this case.
	 *
	 * Do both of these while holding the acct->workers_lock, to ensure that
	 * we'll find a work item regardless of state.
	 */
	io_wq_cancel_pending_work(wq, &match);
	if (match.nr_pending && !match.cancel_all)
		return IO_WQ_CANCEL_OK;

	io_wq_cancel_running_work(wq, &match);
	if (match.nr_running && !match.cancel_all)
		return IO_WQ_CANCEL_RUNNING;

	if (match.nr_running)
		return IO_WQ_CANCEL_RUNNING;
	if (match.nr_pending)
		return IO_WQ_CANCEL_OK;
	return IO_WQ_CANCEL_NOTFOUND;
}

static int io_wq_hash_wake(struct wait_queue_entry *wait, unsigned mode,
			   int sync, void *key)
{
	struct io_wq *wq = container_of(wait, struct io_wq, wait);
	int i;

	list_del_init(&wait->entry);

	rcu_read_lock();
	for (i = 0; i < IO_WQ_ACCT_NR; i++) {
		struct io_wq_acct *acct = &wq->acct[i];

		if (test_and_clear_bit(IO_ACCT_STALLED_BIT, &acct->flags))
			io_acct_activate_free_worker(acct);
	}
	rcu_read_unlock();
	return 1;
}

struct io_wq *io_wq_create(unsigned bounded, struct io_wq_data *data)
{
	int ret, i;
	struct io_wq *wq;

	if (WARN_ON_ONCE(!bounded))
		return ERR_PTR(-EINVAL);

	wq = kzalloc(sizeof(struct io_wq), GFP_KERNEL);
	if (!wq)
		return ERR_PTR(-ENOMEM);

	refcount_inc(&data->hash->refs);
	wq->hash = data->hash;

	ret = -ENOMEM;

	if (!alloc_cpumask_var(&wq->cpu_mask, GFP_KERNEL))
		goto err;
	cpuset_cpus_allowed(data->task, wq->cpu_mask);
	wq->acct[IO_WQ_ACCT_BOUND].max_workers = bounded;
	wq->acct[IO_WQ_ACCT_UNBOUND].max_workers =
				task_rlimit(current, RLIMIT_NPROC);
	INIT_LIST_HEAD(&wq->wait.entry);
	wq->wait.func = io_wq_hash_wake;
	for (i = 0; i < IO_WQ_ACCT_NR; i++) {
		struct io_wq_acct *acct = &wq->acct[i];

		atomic_set(&acct->nr_running, 0);

		raw_spin_lock_init(&acct->workers_lock);
		INIT_HLIST_NULLS_HEAD(&acct->free_list, 0);
		INIT_LIST_HEAD(&acct->all_list);

		INIT_WQ_LIST(&acct->work_list);
		raw_spin_lock_init(&acct->lock);
	}

	wq->task = get_task_struct(data->task);
	atomic_set(&wq->worker_refs, 1);
	init_completion(&wq->worker_done);
	ret = cpuhp_state_add_instance_nocalls(io_wq_online, &wq->cpuhp_node);
	if (ret) {
		put_task_struct(wq->task);
		goto err;
	}

	return wq;
err:
	io_wq_put_hash(data->hash);
	free_cpumask_var(wq->cpu_mask);
	kfree(wq);
	return ERR_PTR(ret);
}

static bool io_task_work_match(struct callback_head *cb, void *data)
{
	struct io_worker *worker;

	if (cb->func != create_worker_cb && cb->func != create_worker_cont)
		return false;
	worker = container_of(cb, struct io_worker, create_work);
	return worker->wq == data;
}

void io_wq_exit_start(struct io_wq *wq)
{
	set_bit(IO_WQ_BIT_EXIT, &wq->state);
}

static void io_wq_cancel_tw_create(struct io_wq *wq)
{
	struct callback_head *cb;

	while ((cb = task_work_cancel_match(wq->task, io_task_work_match, wq)) != NULL) {
		struct io_worker *worker;

		worker = container_of(cb, struct io_worker, create_work);
		io_worker_cancel_cb(worker);
		/*
		 * Only the worker continuation helper has worker allocated and
		 * hence needs freeing.
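		 * A create_worker_cb callback belongs to an already running
		 * worker, which frees itself through its normal exit path.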
		 */
		if (cb->func == create_worker_cont)
			kfree(worker);
	}
}

static void io_wq_exit_workers(struct io_wq *wq)
{
	if (!wq->task)
		return;

	io_wq_cancel_tw_create(wq);

	rcu_read_lock();
	io_wq_for_each_worker(wq, io_wq_worker_wake, NULL);
	rcu_read_unlock();
	io_worker_ref_put(wq);
	wait_for_completion(&wq->worker_done);

	spin_lock_irq(&wq->hash->wait.lock);
	list_del_init(&wq->wait.entry);
	spin_unlock_irq(&wq->hash->wait.lock);

	put_task_struct(wq->task);
	wq->task = NULL;
}

static void io_wq_destroy(struct io_wq *wq)
{
	struct io_cb_cancel_data match = {
		.fn		= io_wq_work_match_all,
		.cancel_all	= true,
	};

	cpuhp_state_remove_instance_nocalls(io_wq_online, &wq->cpuhp_node);
	io_wq_cancel_pending_work(wq, &match);
	free_cpumask_var(wq->cpu_mask);
	io_wq_put_hash(wq->hash);
	kfree(wq);
}

void io_wq_put_and_exit(struct io_wq *wq)
{
	WARN_ON_ONCE(!test_bit(IO_WQ_BIT_EXIT, &wq->state));

	io_wq_exit_workers(wq);
	io_wq_destroy(wq);
}

struct online_data {
	unsigned int cpu;
	bool online;
};

static bool io_wq_worker_affinity(struct io_worker *worker, void *data)
{
	struct online_data *od = data;

	if (od->online)
		cpumask_set_cpu(od->cpu, worker->wq->cpu_mask);
	else
		cpumask_clear_cpu(od->cpu, worker->wq->cpu_mask);
	return false;
}

static int __io_wq_cpu_online(struct io_wq *wq, unsigned int cpu, bool online)
{
	struct online_data od = {
		.cpu = cpu,
		.online = online
	};

	rcu_read_lock();
	io_wq_for_each_worker(wq, io_wq_worker_affinity, &od);
	rcu_read_unlock();
	return 0;
}

static int io_wq_cpu_online(unsigned int cpu, struct hlist_node *node)
{
	struct io_wq *wq = hlist_entry_safe(node, struct io_wq, cpuhp_node);

	return __io_wq_cpu_online(wq, cpu, true);
}

static int io_wq_cpu_offline(unsigned int cpu, struct hlist_node *node)
{
	struct io_wq *wq = hlist_entry_safe(node, struct io_wq, cpuhp_node);

	return __io_wq_cpu_online(wq, cpu, false);
}

int io_wq_cpu_affinity(struct io_uring_task *tctx, cpumask_var_t mask)
{
	cpumask_var_t allowed_mask;
	int ret = 0;

	if (!tctx || !tctx->io_wq)
		return -EINVAL;

	if (!alloc_cpumask_var(&allowed_mask, GFP_KERNEL))
		return -ENOMEM;

	rcu_read_lock();
	cpuset_cpus_allowed(tctx->io_wq->task, allowed_mask);
	if (mask) {
		if (cpumask_subset(mask, allowed_mask))
			cpumask_copy(tctx->io_wq->cpu_mask, mask);
		else
			ret = -EINVAL;
	} else {
		cpumask_copy(tctx->io_wq->cpu_mask, allowed_mask);
	}
	rcu_read_unlock();

	free_cpumask_var(allowed_mask);
	return ret;
}

/*
 * Set max number of unbounded workers, returns old value. If new_count is 0,
 * then just return the old value.
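 * new_count[] is indexed by IO_WQ_ACCT_BOUND/IO_WQ_ACCT_UNBOUND and requested
 * values are clamped to RLIMIT_NPROC.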
 */
int io_wq_max_workers(struct io_wq *wq, int *new_count)
{
	struct io_wq_acct *acct;
	int prev[IO_WQ_ACCT_NR];
	int i;

	BUILD_BUG_ON((int) IO_WQ_ACCT_BOUND   != (int) IO_WQ_BOUND);
	BUILD_BUG_ON((int) IO_WQ_ACCT_UNBOUND != (int) IO_WQ_UNBOUND);
	BUILD_BUG_ON((int) IO_WQ_ACCT_NR      != 2);

	for (i = 0; i < IO_WQ_ACCT_NR; i++) {
		if (new_count[i] > task_rlimit(current, RLIMIT_NPROC))
			new_count[i] = task_rlimit(current, RLIMIT_NPROC);
	}

	for (i = 0; i < IO_WQ_ACCT_NR; i++)
		prev[i] = 0;

	rcu_read_lock();

	for (i = 0; i < IO_WQ_ACCT_NR; i++) {
		acct = &wq->acct[i];
		raw_spin_lock(&acct->workers_lock);
		prev[i] = max_t(int, acct->max_workers, prev[i]);
		if (new_count[i])
			acct->max_workers = new_count[i];
		raw_spin_unlock(&acct->workers_lock);
	}
	rcu_read_unlock();

	for (i = 0; i < IO_WQ_ACCT_NR; i++)
		new_count[i] = prev[i];

	return 0;
}

static __init int io_wq_init(void)
{
	int ret;

	ret = cpuhp_setup_state_multi(CPUHP_AP_ONLINE_DYN, "io-wq/online",
					io_wq_cpu_online, io_wq_cpu_offline);
	if (ret < 0)
		return ret;
	io_wq_online = ret;
	return 0;
}
subsys_initcall(io_wq_init);