// SPDX-License-Identifier: GPL-2.0
/*
 * Basic worker thread pool for io_uring
 *
 * Copyright (C) 2019 Jens Axboe
 *
 */
#include <linux/kernel.h>
#include <linux/init.h>
#include <linux/errno.h>
#include <linux/sched/signal.h>
#include <linux/percpu.h>
#include <linux/slab.h>
#include <linux/rculist_nulls.h>
#include <linux/cpu.h>
#include <linux/cpuset.h>
#include <linux/task_work.h>
#include <linux/audit.h>
#include <linux/mmu_context.h>
#include <linux/sched/sysctl.h>
#include <uapi/linux/io_uring.h>

#include "io-wq.h"
#include "slist.h"
#include "io_uring.h"

#define WORKER_IDLE_TIMEOUT	(5 * HZ)
#define WORKER_INIT_LIMIT	3

enum {
	IO_WORKER_F_UP		= 0,	/* up and active */
	IO_WORKER_F_RUNNING	= 1,	/* account as running */
	IO_WORKER_F_FREE	= 2,	/* worker on free list */
};

enum {
	IO_WQ_BIT_EXIT		= 0,	/* wq exiting */
	IO_WQ_BIT_EXIT_ON_IDLE	= 1,	/* allow all workers to exit on idle */
};

enum {
	IO_ACCT_STALLED_BIT	= 0,	/* stalled on hash */
};

/*
 * One for each thread in a wq pool
 */
struct io_worker {
	refcount_t ref;
	unsigned long flags;
	struct hlist_nulls_node nulls_node;
	struct list_head all_list;
	struct task_struct *task;
	struct io_wq *wq;
	struct io_wq_acct *acct;

	struct io_wq_work *cur_work;
	raw_spinlock_t lock;

	struct completion ref_done;

	unsigned long create_state;
	struct callback_head create_work;
	int init_retries;

	union {
		struct rcu_head rcu;
		struct delayed_work work;
	};
};

#if BITS_PER_LONG == 64
#define IO_WQ_HASH_ORDER	6
#else
#define IO_WQ_HASH_ORDER	5
#endif

#define IO_WQ_NR_HASH_BUCKETS	(1u << IO_WQ_HASH_ORDER)
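/*
 * Illustrative note (not from the original source): hashed work encodes its
 * bucket number in the upper bits of work->flags, above IO_WQ_HASH_SHIFT.
 * A sketch of the round trip, matching io_wq_hash_work() and
 * __io_get_work_hash() below:
 *
 *	flags |= IO_WQ_WORK_HASHED | (bucket << IO_WQ_HASH_SHIFT);
 *	bucket = flags >> IO_WQ_HASH_SHIFT;
 *
 * With IO_WQ_HASH_ORDER 6, that gives 64 buckets on 64-bit kernels.
 */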
struct io_wq_acct {
	/**
	 * Protects access to the worker lists.
	 */
	raw_spinlock_t workers_lock;

	unsigned nr_workers;
	unsigned max_workers;
	atomic_t nr_running;

	/**
	 * The list of free workers.  Protected by #workers_lock
	 * (write) and RCU (read).
	 */
	struct hlist_nulls_head free_list;

	/**
	 * The list of all workers.  Protected by #workers_lock
	 * (write) and RCU (read).
	 */
	struct list_head all_list;

	raw_spinlock_t lock;
	struct io_wq_work_list work_list;
	unsigned long flags;
};

enum {
	IO_WQ_ACCT_BOUND,
	IO_WQ_ACCT_UNBOUND,
	IO_WQ_ACCT_NR,
};

/*
 * Per io_wq state
 */
struct io_wq {
	unsigned long state;

	struct io_wq_hash *hash;

	atomic_t worker_refs;
	struct completion worker_done;

	struct hlist_node cpuhp_node;

	struct task_struct *task;

	struct io_wq_acct acct[IO_WQ_ACCT_NR];

	struct wait_queue_entry wait;

	struct io_wq_work *hash_tail[IO_WQ_NR_HASH_BUCKETS];

	cpumask_var_t cpu_mask;
};

static enum cpuhp_state io_wq_online;

struct io_cb_cancel_data {
	work_cancel_fn *fn;
	void *data;
	int nr_running;
	int nr_pending;
	bool cancel_all;
};

static bool create_io_worker(struct io_wq *wq, struct io_wq_acct *acct);
static void io_wq_dec_running(struct io_worker *worker);
static bool io_acct_cancel_pending_work(struct io_wq *wq,
					struct io_wq_acct *acct,
					struct io_cb_cancel_data *match);
static void create_worker_cb(struct callback_head *cb);
static void io_wq_cancel_tw_create(struct io_wq *wq);

static inline unsigned int __io_get_work_hash(unsigned int work_flags)
{
	return work_flags >> IO_WQ_HASH_SHIFT;
}

static inline unsigned int io_get_work_hash(struct io_wq_work *work)
{
	return __io_get_work_hash(atomic_read(&work->flags));
}

static bool io_worker_get(struct io_worker *worker)
{
	return refcount_inc_not_zero(&worker->ref);
}

static void io_worker_release(struct io_worker *worker)
{
	if (refcount_dec_and_test(&worker->ref))
		complete(&worker->ref_done);
}
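/*
 * Illustrative note (not from the original source): worker lifetime follows
 * the common refcount-plus-completion teardown pattern. Lookups take a
 * reference only if the worker is still live, and the exit path drops its
 * own reference and then waits out any stragglers:
 *
 *	if (io_worker_get(worker)) {
 *		...use worker...
 *		io_worker_release(worker);
 *	}
 *
 *	io_worker_release(worker);		// in io_worker_exit()
 *	wait_for_completion(&worker->ref_done);	// last ref fires this
 */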
static inline struct io_wq_acct *io_get_acct(struct io_wq *wq, bool bound)
{
	return &wq->acct[bound ? IO_WQ_ACCT_BOUND : IO_WQ_ACCT_UNBOUND];
}

static inline struct io_wq_acct *io_work_get_acct(struct io_wq *wq,
						  unsigned int work_flags)
{
	return io_get_acct(wq, !(work_flags & IO_WQ_WORK_UNBOUND));
}

static inline struct io_wq_acct *io_wq_get_acct(struct io_worker *worker)
{
	return worker->acct;
}

static void io_worker_ref_put(struct io_wq *wq)
{
	if (atomic_dec_and_test(&wq->worker_refs))
		complete(&wq->worker_done);
}

bool io_wq_worker_stopped(void)
{
	struct io_worker *worker = current->worker_private;

	if (WARN_ON_ONCE(!io_wq_current_is_worker()))
		return true;

	return test_bit(IO_WQ_BIT_EXIT, &worker->wq->state);
}

static void io_worker_cancel_cb(struct io_worker *worker)
{
	struct io_wq_acct *acct = io_wq_get_acct(worker);
	struct io_wq *wq = worker->wq;

	atomic_dec(&acct->nr_running);
	raw_spin_lock(&acct->workers_lock);
	acct->nr_workers--;
	raw_spin_unlock(&acct->workers_lock);
	io_worker_ref_put(wq);
	clear_bit_unlock(0, &worker->create_state);
	io_worker_release(worker);
}

static bool io_task_worker_match(struct callback_head *cb, void *data)
{
	struct io_worker *worker;

	if (cb->func != create_worker_cb)
		return false;
	worker = container_of(cb, struct io_worker, create_work);
	return worker == data;
}

static void io_worker_exit(struct io_worker *worker)
{
	struct io_wq *wq = worker->wq;
	struct io_wq_acct *acct = io_wq_get_acct(worker);

	while (1) {
		struct callback_head *cb = task_work_cancel_match(wq->task,
						io_task_worker_match, worker);

		if (!cb)
			break;
		io_worker_cancel_cb(worker);
	}

	io_worker_release(worker);
	wait_for_completion(&worker->ref_done);

	raw_spin_lock(&acct->workers_lock);
	if (test_bit(IO_WORKER_F_FREE, &worker->flags))
		hlist_nulls_del_rcu(&worker->nulls_node);
	list_del_rcu(&worker->all_list);
	raw_spin_unlock(&acct->workers_lock);
	io_wq_dec_running(worker);
	/*
	 * this worker is a goner, clear ->worker_private to avoid any
	 * inc/dec running calls that could happen as part of exit from
	 * touching 'worker'.
	 */
	current->worker_private = NULL;

	kfree_rcu(worker, rcu);
	io_worker_ref_put(wq);
	do_exit(0);
}

static inline bool __io_acct_run_queue(struct io_wq_acct *acct)
{
	return !test_bit(IO_ACCT_STALLED_BIT, &acct->flags) &&
		!wq_list_empty(&acct->work_list);
}

/*
 * If there's work to do, returns true with acct->lock acquired. If not,
 * returns false with no lock held.
 */
static inline bool io_acct_run_queue(struct io_wq_acct *acct)
	__acquires(&acct->lock)
{
	raw_spin_lock(&acct->lock);
	if (__io_acct_run_queue(acct))
		return true;

	raw_spin_unlock(&acct->lock);
	return false;
}
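/*
 * Illustrative note (not from the original source): the conditional-lock
 * convention above is what lets the worker loop in io_wq_worker() drain the
 * queue without re-checking emptiness by hand:
 *
 *	while (io_acct_run_queue(acct))		// true => acct->lock held
 *		io_worker_handle_work(acct, worker);	// drops acct->lock
 */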
/*
 * Check head of free list for an available worker. If one isn't available,
 * caller must create one.
 */
static bool io_acct_activate_free_worker(struct io_wq_acct *acct)
	__must_hold(RCU)
{
	struct hlist_nulls_node *n;
	struct io_worker *worker;

	/*
	 * Iterate free_list and see if we can find an idle worker to
	 * activate. If a given worker is on the free_list but in the process
	 * of exiting, keep trying.
	 */
	hlist_nulls_for_each_entry_rcu(worker, n, &acct->free_list, nulls_node) {
		if (!io_worker_get(worker))
			continue;
		/*
		 * If the worker is already running, it's either already
		 * starting work or finishing work. In either case, if it does
		 * go to sleep, we'll kick off a new task for this work anyway.
		 */
		wake_up_process(worker->task);
		io_worker_release(worker);
		return true;
	}

	return false;
}

/*
 * We need a worker. If we find a free one, we're good. If not, and we're
 * below the max number of workers, create one.
 */
static bool io_wq_create_worker(struct io_wq *wq, struct io_wq_acct *acct)
{
	/*
	 * Most likely an attempt to queue unbounded work on an io_wq that
	 * wasn't setup with any unbounded workers.
	 */
	if (unlikely(!acct->max_workers))
		pr_warn_once("io-wq is not configured for unbound workers\n");

	raw_spin_lock(&acct->workers_lock);
	if (acct->nr_workers >= acct->max_workers) {
		raw_spin_unlock(&acct->workers_lock);
		return true;
	}
	acct->nr_workers++;
	raw_spin_unlock(&acct->workers_lock);
	atomic_inc(&acct->nr_running);
	atomic_inc(&wq->worker_refs);
	return create_io_worker(wq, acct);
}

static void io_wq_inc_running(struct io_worker *worker)
{
	struct io_wq_acct *acct = io_wq_get_acct(worker);

	atomic_inc(&acct->nr_running);
}

static void create_worker_cb(struct callback_head *cb)
{
	struct io_worker *worker;
	struct io_wq *wq;
	struct io_wq_acct *acct;
	bool activated_free_worker, do_create = false;

	worker = container_of(cb, struct io_worker, create_work);
	wq = worker->wq;
	acct = worker->acct;

	rcu_read_lock();
	activated_free_worker = io_acct_activate_free_worker(acct);
	rcu_read_unlock();
	if (activated_free_worker)
		goto no_need_create;

	raw_spin_lock(&acct->workers_lock);
	if (acct->nr_workers < acct->max_workers) {
		acct->nr_workers++;
		do_create = true;
	}
	raw_spin_unlock(&acct->workers_lock);
	if (do_create) {
		create_io_worker(wq, acct);
	} else {
no_need_create:
		atomic_dec(&acct->nr_running);
		io_worker_ref_put(wq);
	}
	clear_bit_unlock(0, &worker->create_state);
	io_worker_release(worker);
}
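/*
 * Illustrative note (not from the original source): create_state bit 0 acts
 * as a one-shot lock around create_work. It is claimed below with the common
 * "cheap test, then atomic test-and-set" idiom and released with
 * clear_bit_unlock() once the task_work has run or failed to queue:
 *
 *	if (test_bit(0, &worker->create_state) ||
 *	    test_and_set_bit_lock(0, &worker->create_state))
 *		goto fail_release;	// someone else owns the create item
 */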
static bool io_queue_worker_create(struct io_worker *worker,
				   struct io_wq_acct *acct,
				   task_work_func_t func)
{
	struct io_wq *wq = worker->wq;

	/* raced with exit, just ignore create call */
	if (test_bit(IO_WQ_BIT_EXIT, &wq->state))
		goto fail;
	if (!io_worker_get(worker))
		goto fail;
	/*
	 * create_state manages ownership of create_work/index. We should
	 * only need one entry per worker, as the worker going to sleep
	 * will trigger the condition, and waking will clear it once it
	 * runs the task_work.
	 */
	if (test_bit(0, &worker->create_state) ||
	    test_and_set_bit_lock(0, &worker->create_state))
		goto fail_release;

	atomic_inc(&wq->worker_refs);
	init_task_work(&worker->create_work, func);
	if (!task_work_add(wq->task, &worker->create_work, TWA_SIGNAL)) {
		/*
		 * EXIT may have been set after checking it above, check after
		 * adding the task_work and remove any creation item if it is
		 * now set. wq exit does that too, but we can have added this
		 * work item after we canceled in io_wq_exit_workers().
		 */
		if (test_bit(IO_WQ_BIT_EXIT, &wq->state))
			io_wq_cancel_tw_create(wq);
		io_worker_ref_put(wq);
		return true;
	}
	io_worker_ref_put(wq);
	clear_bit_unlock(0, &worker->create_state);
fail_release:
	io_worker_release(worker);
fail:
	atomic_dec(&acct->nr_running);
	io_worker_ref_put(wq);
	return false;
}

/* Defer if current and next work are both hashed to the same chain */
static bool io_wq_hash_defer(struct io_wq_work *work, struct io_wq_acct *acct)
{
	unsigned int hash, work_flags;
	struct io_wq_work *next;

	lockdep_assert_held(&acct->lock);

	work_flags = atomic_read(&work->flags);
	if (!__io_wq_is_hashed(work_flags))
		return false;

	/* should not happen, io_acct_run_queue() said we had work */
	if (wq_list_empty(&acct->work_list))
		return true;

	hash = __io_get_work_hash(work_flags);
	next = container_of(acct->work_list.first, struct io_wq_work, list);
	work_flags = atomic_read(&next->flags);
	if (!__io_wq_is_hashed(work_flags))
		return false;
	return hash == __io_get_work_hash(work_flags);
}

static void io_wq_dec_running(struct io_worker *worker)
{
	struct io_wq_acct *acct = io_wq_get_acct(worker);
	struct io_wq *wq = worker->wq;

	if (!test_bit(IO_WORKER_F_UP, &worker->flags))
		return;

	if (!atomic_dec_and_test(&acct->nr_running))
		return;
	if (!worker->cur_work)
		return;
	if (!io_acct_run_queue(acct))
		return;
	if (io_wq_hash_defer(worker->cur_work, acct)) {
		raw_spin_unlock(&acct->lock);
		return;
	}

	raw_spin_unlock(&acct->lock);
	atomic_inc(&acct->nr_running);
	atomic_inc(&wq->worker_refs);
	io_queue_worker_create(worker, acct, create_worker_cb);
}

/*
 * Worker will start processing some work. Move it to the busy list, if
 * it's currently on the freelist
 */
static void __io_worker_busy(struct io_wq_acct *acct, struct io_worker *worker)
{
	if (test_bit(IO_WORKER_F_FREE, &worker->flags)) {
		clear_bit(IO_WORKER_F_FREE, &worker->flags);
		raw_spin_lock(&acct->workers_lock);
		hlist_nulls_del_init_rcu(&worker->nulls_node);
		raw_spin_unlock(&acct->workers_lock);
	}
}
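/*
 * Illustrative example (not from the original source) of the hash-defer
 * check above: if the worker going to sleep holds hashed work for, say,
 * inode A, and the head of the pending list is also hashed to inode A, a
 * replacement worker could not run that item anyway (same-hash work is
 * serialized), so io_wq_dec_running() skips creating one. A mixed queue
 * such as "A-write, B-write" would still trigger worker creation.
 */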
/*
 * No work, worker going to sleep. Move to freelist.
 */
static void __io_worker_idle(struct io_wq_acct *acct, struct io_worker *worker)
	__must_hold(acct->workers_lock)
{
	if (!test_bit(IO_WORKER_F_FREE, &worker->flags)) {
		set_bit(IO_WORKER_F_FREE, &worker->flags);
		hlist_nulls_add_head_rcu(&worker->nulls_node, &acct->free_list);
	}
}

static bool io_wait_on_hash(struct io_wq *wq, unsigned int hash)
{
	bool ret = false;

	spin_lock_irq(&wq->hash->wait.lock);
	if (list_empty(&wq->wait.entry)) {
		__add_wait_queue(&wq->hash->wait, &wq->wait);
		if (!test_bit(hash, &wq->hash->map)) {
			__set_current_state(TASK_RUNNING);
			list_del_init(&wq->wait.entry);
			ret = true;
		}
	}
	spin_unlock_irq(&wq->hash->wait.lock);
	return ret;
}

static struct io_wq_work *io_get_next_work(struct io_wq_acct *acct,
					   struct io_wq *wq)
	__must_hold(acct->lock)
{
	struct io_wq_work_node *node, *prev;
	struct io_wq_work *work, *tail;
	unsigned int stall_hash = -1U;

	wq_list_for_each(node, prev, &acct->work_list) {
		unsigned int work_flags;
		unsigned int hash;

		work = container_of(node, struct io_wq_work, list);

		/* not hashed, can run anytime */
		work_flags = atomic_read(&work->flags);
		if (!__io_wq_is_hashed(work_flags)) {
			wq_list_del(&acct->work_list, node, prev);
			return work;
		}

		hash = __io_get_work_hash(work_flags);
		/* all items with this hash lie in [work, tail] */
		tail = wq->hash_tail[hash];

		/* hashed, can run if not already running */
		if (!test_and_set_bit(hash, &wq->hash->map)) {
			wq->hash_tail[hash] = NULL;
			wq_list_cut(&acct->work_list, &tail->list, prev);
			return work;
		}
		if (stall_hash == -1U)
			stall_hash = hash;
		/* fast forward to a next hash, for-each will fix up @prev */
		node = &tail->list;
	}

	if (stall_hash != -1U) {
		bool unstalled;

		/*
		 * Set this before dropping the lock to avoid racing with new
		 * work being added and clearing the stalled bit.
		 */
		set_bit(IO_ACCT_STALLED_BIT, &acct->flags);
		raw_spin_unlock(&acct->lock);
		unstalled = io_wait_on_hash(wq, stall_hash);
		raw_spin_lock(&acct->lock);
		if (unstalled) {
			clear_bit(IO_ACCT_STALLED_BIT, &acct->flags);
			if (wq_has_sleeper(&wq->hash->wait))
				wake_up(&wq->hash->wait);
		}
	}

	return NULL;
}

static void io_assign_current_work(struct io_worker *worker,
				   struct io_wq_work *work)
{
	if (work) {
		io_run_task_work();
		cond_resched();
	}

	raw_spin_lock(&worker->lock);
	worker->cur_work = work;
	raw_spin_unlock(&worker->lock);
}
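/*
 * Illustrative example (not from the original source) of the hashed-work
 * layout io_get_next_work() relies on. With two inodes A and B hashed to
 * different buckets, the pending list and tail index might look like:
 *
 *	work_list:    A1 -> A2 -> B1
 *	hash_tail[A]: A2,  hash_tail[B]: B1
 *
 * If bucket A is already executing elsewhere, the scan jumps straight from
 * A's tail to B1 instead of walking A2, so B1 can be dispatched while A's
 * chain stays queued.
 */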
/*
 * Called with acct->lock held, drops it before returning
 */
static void io_worker_handle_work(struct io_wq_acct *acct,
				  struct io_worker *worker)
	__releases(&acct->lock)
{
	struct io_wq *wq = worker->wq;

	do {
		bool do_kill = test_bit(IO_WQ_BIT_EXIT, &wq->state);
		struct io_wq_work *work;

		/*
		 * If we got some work, mark us as busy. If we didn't, but
		 * the list isn't empty, it means we stalled on hashed work.
		 * Mark us stalled so we don't keep looking for work when we
		 * can't make progress, any work completion or insertion will
		 * clear the stalled flag.
		 */
		work = io_get_next_work(acct, wq);
		if (work) {
			/*
			 * Make sure cancellation can find this, even before
			 * it becomes the active work. That avoids a window
			 * where the work has been removed from our general
			 * work list, but isn't yet discoverable as the
			 * current work item for this worker.
			 */
			raw_spin_lock(&worker->lock);
			worker->cur_work = work;
			raw_spin_unlock(&worker->lock);
		}

		raw_spin_unlock(&acct->lock);

		if (!work)
			break;

		__io_worker_busy(acct, worker);

		io_assign_current_work(worker, work);
		__set_current_state(TASK_RUNNING);

		/* handle a whole dependent link */
		do {
			struct io_wq_work *next_hashed, *linked;
			unsigned int work_flags = atomic_read(&work->flags);
			unsigned int hash = __io_wq_is_hashed(work_flags)
				? __io_get_work_hash(work_flags)
				: -1U;

			next_hashed = wq_next_work(work);

			if (do_kill &&
			    (work_flags & IO_WQ_WORK_UNBOUND))
				atomic_or(IO_WQ_WORK_CANCEL, &work->flags);
			io_wq_submit_work(work);
			io_assign_current_work(worker, NULL);

			linked = io_wq_free_work(work);
			work = next_hashed;
			if (!work && linked && !io_wq_is_hashed(linked)) {
				work = linked;
				linked = NULL;
			}
			io_assign_current_work(worker, work);
			if (linked)
				io_wq_enqueue(wq, linked);

			if (hash != -1U && !next_hashed) {
				/* serialize hash clear with wake_up() */
				spin_lock_irq(&wq->hash->wait.lock);
				clear_bit(hash, &wq->hash->map);
				clear_bit(IO_ACCT_STALLED_BIT, &acct->flags);
				spin_unlock_irq(&wq->hash->wait.lock);
				if (wq_has_sleeper(&wq->hash->wait))
					wake_up(&wq->hash->wait);
			}
		} while (work);

		if (!__io_acct_run_queue(acct))
			break;
		raw_spin_lock(&acct->lock);
	} while (1);
}

static int io_wq_worker(void *data)
{
	struct io_worker *worker = data;
	struct io_wq_acct *acct = io_wq_get_acct(worker);
	struct io_wq *wq = worker->wq;
	bool exit_mask = false, last_timeout = false;
	char buf[TASK_COMM_LEN] = {};

	set_mask_bits(&worker->flags, 0,
		      BIT(IO_WORKER_F_UP) | BIT(IO_WORKER_F_RUNNING));

	snprintf(buf, sizeof(buf), "iou-wrk-%d", wq->task->pid);
	set_task_comm(current, buf);

	while (!test_bit(IO_WQ_BIT_EXIT, &wq->state)) {
		long ret;

		set_current_state(TASK_INTERRUPTIBLE);

		/*
		 * If we have work to do, io_acct_run_queue() returns with
		 * the acct->lock held. If not, it will drop it.
		 */
		while (io_acct_run_queue(acct))
			io_worker_handle_work(acct, worker);

		raw_spin_lock(&acct->workers_lock);
		/*
		 * Last sleep timed out. Exit if we're not the last worker,
		 * or if someone modified our affinity. If wq is marked
		 * idle-exit, drop the worker as well. This is used to avoid
		 * keeping io-wq workers around for tasks that no longer have
		 * any active io_uring instances.
		 */
		if ((last_timeout && (exit_mask || acct->nr_workers > 1)) ||
		    test_bit(IO_WQ_BIT_EXIT_ON_IDLE, &wq->state)) {
			acct->nr_workers--;
			raw_spin_unlock(&acct->workers_lock);
			__set_current_state(TASK_RUNNING);
			break;
		}
		last_timeout = false;
		__io_worker_idle(acct, worker);
		raw_spin_unlock(&acct->workers_lock);
		if (io_run_task_work())
			continue;
		ret = schedule_timeout(WORKER_IDLE_TIMEOUT);
		if (signal_pending(current)) {
			struct ksignal ksig;

			if (!get_signal(&ksig))
				continue;
			break;
		}
		if (!ret) {
			last_timeout = true;
			exit_mask = !cpumask_test_cpu(raw_smp_processor_id(),
							wq->cpu_mask);
		}
	}

	if (test_bit(IO_WQ_BIT_EXIT, &wq->state) && io_acct_run_queue(acct))
		io_worker_handle_work(acct, worker);

	io_worker_exit(worker);
	return 0;
}
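/*
 * Illustrative note (not from the original source): the two hooks below are
 * the scheduler's entry points into io-wq. When the last running worker of
 * an acct blocks (say on buffered I/O), the sequence is roughly:
 *
 *	io_wq_worker_sleeping(tsk)
 *	  -> io_wq_dec_running()	// nr_running hits 0, work pending
 *	    -> io_queue_worker_create()	// queue task_work to add a worker
 *
 * so the pool keeps one runnable worker per acct without any periodic
 * polling.
 */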
/*
 * Called when a worker is scheduled in. Mark us as currently running.
 */
void io_wq_worker_running(struct task_struct *tsk)
{
	struct io_worker *worker = tsk->worker_private;

	if (!worker)
		return;
	if (!test_bit(IO_WORKER_F_UP, &worker->flags))
		return;
	if (test_bit(IO_WORKER_F_RUNNING, &worker->flags))
		return;
	set_bit(IO_WORKER_F_RUNNING, &worker->flags);
	io_wq_inc_running(worker);
}

/*
 * Called when worker is going to sleep. If there are no workers currently
 * running and we have work pending, wake up a free one or create a new one.
 */
void io_wq_worker_sleeping(struct task_struct *tsk)
{
	struct io_worker *worker = tsk->worker_private;

	if (!worker)
		return;
	if (!test_bit(IO_WORKER_F_UP, &worker->flags))
		return;
	if (!test_bit(IO_WORKER_F_RUNNING, &worker->flags))
		return;

	clear_bit(IO_WORKER_F_RUNNING, &worker->flags);
	io_wq_dec_running(worker);
}

static void io_init_new_worker(struct io_wq *wq, struct io_wq_acct *acct, struct io_worker *worker,
			       struct task_struct *tsk)
{
	tsk->worker_private = worker;
	worker->task = tsk;
	set_cpus_allowed_ptr(tsk, wq->cpu_mask);

	raw_spin_lock(&acct->workers_lock);
	hlist_nulls_add_head_rcu(&worker->nulls_node, &acct->free_list);
	list_add_tail_rcu(&worker->all_list, &acct->all_list);
	set_bit(IO_WORKER_F_FREE, &worker->flags);
	raw_spin_unlock(&acct->workers_lock);
	wake_up_new_task(tsk);
}

static bool io_wq_work_match_all(struct io_wq_work *work, void *data)
{
	return true;
}

static inline bool io_should_retry_thread(struct io_worker *worker, long err)
{
	/*
	 * Prevent perpetual task_work retry, if the task (or its group) is
	 * exiting.
	 */
	if (fatal_signal_pending(current))
		return false;

	worker->init_retries++;
	switch (err) {
	case -EAGAIN:
		return worker->init_retries <= WORKER_INIT_LIMIT;
	/* Analogous to a fork() syscall, always retry on a restartable error */
	case -ERESTARTSYS:
	case -ERESTARTNOINTR:
	case -ERESTARTNOHAND:
		return true;
	default:
		return false;
	}
}

static void queue_create_worker_retry(struct io_worker *worker)
{
	/*
	 * We only bother retrying because there's a chance that the
	 * failure to create a worker is due to some temporary condition
	 * in the forking task (e.g. outstanding signal); give the task
	 * some time to clear that condition.
	 */
	schedule_delayed_work(&worker->work,
			      msecs_to_jiffies(worker->init_retries * 5));
}
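/*
 * Illustrative note (not from the original source): the retry delay above
 * scales linearly with the attempt count, so with WORKER_INIT_LIMIT set to
 * 3, an -EAGAIN from create_io_thread() is retried after roughly 5ms, 10ms
 * and 15ms before the worker is given up on; restartable errors such as
 * -ERESTARTSYS keep retrying without that cap.
 */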
static void create_worker_cont(struct callback_head *cb)
{
	struct io_worker *worker;
	struct task_struct *tsk;
	struct io_wq *wq;
	struct io_wq_acct *acct;

	worker = container_of(cb, struct io_worker, create_work);
	clear_bit_unlock(0, &worker->create_state);
	wq = worker->wq;
	acct = io_wq_get_acct(worker);
	tsk = create_io_thread(io_wq_worker, worker, NUMA_NO_NODE);
	if (!IS_ERR(tsk)) {
		io_init_new_worker(wq, acct, worker, tsk);
		io_worker_release(worker);
		return;
	} else if (!io_should_retry_thread(worker, PTR_ERR(tsk))) {
		atomic_dec(&acct->nr_running);
		raw_spin_lock(&acct->workers_lock);
		acct->nr_workers--;
		if (!acct->nr_workers) {
			struct io_cb_cancel_data match = {
				.fn		= io_wq_work_match_all,
				.cancel_all	= true,
			};

			raw_spin_unlock(&acct->workers_lock);
			while (io_acct_cancel_pending_work(wq, acct, &match))
				;
		} else {
			raw_spin_unlock(&acct->workers_lock);
		}
		io_worker_ref_put(wq);
		kfree(worker);
		return;
	}

	/* re-create attempts grab a new worker ref, drop the existing one */
	io_worker_release(worker);
	queue_create_worker_retry(worker);
}

static void io_workqueue_create(struct work_struct *work)
{
	struct io_worker *worker = container_of(work, struct io_worker,
						work.work);
	struct io_wq_acct *acct = io_wq_get_acct(worker);

	if (!io_queue_worker_create(worker, acct, create_worker_cont))
		kfree(worker);
}

static bool create_io_worker(struct io_wq *wq, struct io_wq_acct *acct)
{
	struct io_worker *worker;
	struct task_struct *tsk;

	__set_current_state(TASK_RUNNING);

	worker = kzalloc(sizeof(*worker), GFP_KERNEL);
	if (!worker) {
fail:
		atomic_dec(&acct->nr_running);
		raw_spin_lock(&acct->workers_lock);
		acct->nr_workers--;
		raw_spin_unlock(&acct->workers_lock);
		io_worker_ref_put(wq);
		return false;
	}

	refcount_set(&worker->ref, 1);
	worker->wq = wq;
	worker->acct = acct;
	raw_spin_lock_init(&worker->lock);
	init_completion(&worker->ref_done);

	tsk = create_io_thread(io_wq_worker, worker, NUMA_NO_NODE);
	if (!IS_ERR(tsk)) {
		io_init_new_worker(wq, acct, worker, tsk);
	} else if (!io_should_retry_thread(worker, PTR_ERR(tsk))) {
		kfree(worker);
		goto fail;
	} else {
		INIT_DELAYED_WORK(&worker->work, io_workqueue_create);
		queue_create_worker_retry(worker);
	}

	return true;
}

/*
 * Iterate the passed in list and call the specific function for each
 * worker that isn't exiting
 */
static bool io_acct_for_each_worker(struct io_wq_acct *acct,
				    bool (*func)(struct io_worker *, void *),
				    void *data)
{
	struct io_worker *worker;
	bool ret = false;

	list_for_each_entry_rcu(worker, &acct->all_list, all_list) {
		if (io_worker_get(worker)) {
			/* no task if node is/was offline */
			if (worker->task)
				ret = func(worker, data);
			io_worker_release(worker);
			if (ret)
				break;
		}
	}

	return ret;
}

static void io_wq_for_each_worker(struct io_wq *wq,
				  bool (*func)(struct io_worker *, void *),
				  void *data)
{
	for (int i = 0; i < IO_WQ_ACCT_NR; i++)
		if (io_acct_for_each_worker(&wq->acct[i], func, data))
			break;
}
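/*
 * Illustrative note (not from the original source): callers of the
 * iteration helpers above only need RCU protection, not workers_lock,
 * because all_list is RCU-managed and each visited worker is pinned via
 * io_worker_get(). The typical call pattern in this file is:
 *
 *	rcu_read_lock();
 *	io_wq_for_each_worker(wq, io_wq_worker_wake, NULL);
 *	rcu_read_unlock();
 */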
static bool io_wq_worker_wake(struct io_worker *worker, void *data)
{
	__set_notify_signal(worker->task);
	wake_up_process(worker->task);
	return false;
}

void io_wq_set_exit_on_idle(struct io_wq *wq, bool enable)
{
	if (!wq->task)
		return;

	if (!enable) {
		clear_bit(IO_WQ_BIT_EXIT_ON_IDLE, &wq->state);
		return;
	}

	if (test_and_set_bit(IO_WQ_BIT_EXIT_ON_IDLE, &wq->state))
		return;

	rcu_read_lock();
	io_wq_for_each_worker(wq, io_wq_worker_wake, NULL);
	rcu_read_unlock();
}

static void io_run_cancel(struct io_wq_work *work, struct io_wq *wq)
{
	do {
		atomic_or(IO_WQ_WORK_CANCEL, &work->flags);
		io_wq_submit_work(work);
		work = io_wq_free_work(work);
	} while (work);
}

static void io_wq_insert_work(struct io_wq *wq, struct io_wq_acct *acct,
			      struct io_wq_work *work, unsigned int work_flags)
{
	unsigned int hash;
	struct io_wq_work *tail;

	if (!__io_wq_is_hashed(work_flags)) {
append:
		wq_list_add_tail(&work->list, &acct->work_list);
		return;
	}

	hash = __io_get_work_hash(work_flags);
	tail = wq->hash_tail[hash];
	wq->hash_tail[hash] = work;
	if (!tail)
		goto append;

	wq_list_add_after(&work->list, &tail->list, &acct->work_list);
}

static bool io_wq_work_match_item(struct io_wq_work *work, void *data)
{
	return work == data;
}

void io_wq_enqueue(struct io_wq *wq, struct io_wq_work *work)
{
	unsigned int work_flags = atomic_read(&work->flags);
	struct io_wq_acct *acct = io_work_get_acct(wq, work_flags);
	struct io_cb_cancel_data match = {
		.fn		= io_wq_work_match_item,
		.data		= work,
		.cancel_all	= false,
	};
	bool do_create;

	/*
	 * If io-wq is exiting for this task, or if the request has explicitly
	 * been marked as one that should not get executed, cancel it here.
	 */
	if (test_bit(IO_WQ_BIT_EXIT, &wq->state) ||
	    (work_flags & IO_WQ_WORK_CANCEL)) {
		io_run_cancel(work, wq);
		return;
	}

	raw_spin_lock(&acct->lock);
	io_wq_insert_work(wq, acct, work, work_flags);
	clear_bit(IO_ACCT_STALLED_BIT, &acct->flags);
	raw_spin_unlock(&acct->lock);

	rcu_read_lock();
	do_create = !io_acct_activate_free_worker(acct);
	rcu_read_unlock();

	if (do_create && ((work_flags & IO_WQ_WORK_CONCURRENT) ||
	    !atomic_read(&acct->nr_running))) {
		bool did_create;

		did_create = io_wq_create_worker(wq, acct);
		if (likely(did_create))
			return;

		raw_spin_lock(&acct->workers_lock);
		if (acct->nr_workers) {
			raw_spin_unlock(&acct->workers_lock);
			return;
		}
		raw_spin_unlock(&acct->workers_lock);

		/* fatal condition, failed to create the first worker */
		io_acct_cancel_pending_work(wq, acct, &match);
	}
}

/*
 * Work items that hash to the same value will not be done in parallel.
 * Used to limit concurrent writes, generally hashed by inode.
 */
void io_wq_hash_work(struct io_wq_work *work, void *val)
{
	unsigned int bit;

	bit = hash_ptr(val, IO_WQ_HASH_ORDER);
	atomic_or(IO_WQ_WORK_HASHED | (bit << IO_WQ_HASH_SHIFT), &work->flags);
}
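/*
 * Illustrative example (not from the original source): a typical submitter
 * hashes a buffered write on the target inode before queueing it, so writes
 * to the same file serialize while writes to other files proceed in
 * parallel:
 *
 *	io_wq_hash_work(&req->work, file_inode(req->file));
 *	io_wq_enqueue(wq, &req->work);
 */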
static bool __io_wq_worker_cancel(struct io_worker *worker,
				  struct io_cb_cancel_data *match,
				  struct io_wq_work *work)
{
	if (work && match->fn(work, match->data)) {
		atomic_or(IO_WQ_WORK_CANCEL, &work->flags);
		__set_notify_signal(worker->task);
		return true;
	}

	return false;
}

static bool io_wq_worker_cancel(struct io_worker *worker, void *data)
{
	struct io_cb_cancel_data *match = data;

	/*
	 * Hold the lock to avoid ->cur_work going out of scope, caller
	 * may dereference the passed in work.
	 */
	raw_spin_lock(&worker->lock);
	if (__io_wq_worker_cancel(worker, match, worker->cur_work))
		match->nr_running++;
	raw_spin_unlock(&worker->lock);

	return match->nr_running && !match->cancel_all;
}

static inline void io_wq_remove_pending(struct io_wq *wq,
					struct io_wq_acct *acct,
					struct io_wq_work *work,
					struct io_wq_work_node *prev)
{
	unsigned int hash = io_get_work_hash(work);
	struct io_wq_work *prev_work = NULL;

	if (io_wq_is_hashed(work) && work == wq->hash_tail[hash]) {
		if (prev)
			prev_work = container_of(prev, struct io_wq_work, list);
		if (prev_work && io_get_work_hash(prev_work) == hash)
			wq->hash_tail[hash] = prev_work;
		else
			wq->hash_tail[hash] = NULL;
	}
	wq_list_del(&acct->work_list, &work->list, prev);
}

static bool io_acct_cancel_pending_work(struct io_wq *wq,
					struct io_wq_acct *acct,
					struct io_cb_cancel_data *match)
{
	struct io_wq_work_node *node, *prev;
	struct io_wq_work *work;

	raw_spin_lock(&acct->lock);
	wq_list_for_each(node, prev, &acct->work_list) {
		work = container_of(node, struct io_wq_work, list);
		if (!match->fn(work, match->data))
			continue;
		io_wq_remove_pending(wq, acct, work, prev);
		raw_spin_unlock(&acct->lock);
		io_run_cancel(work, wq);
		match->nr_pending++;
		/* not safe to continue after unlock */
		return true;
	}
	raw_spin_unlock(&acct->lock);

	return false;
}

static void io_wq_cancel_pending_work(struct io_wq *wq,
				      struct io_cb_cancel_data *match)
{
	int i;
retry:
	for (i = 0; i < IO_WQ_ACCT_NR; i++) {
		struct io_wq_acct *acct = io_get_acct(wq, i == 0);

		if (io_acct_cancel_pending_work(wq, acct, match)) {
			if (match->cancel_all)
				goto retry;
			break;
		}
	}
}

static void io_acct_cancel_running_work(struct io_wq_acct *acct,
					struct io_cb_cancel_data *match)
{
	raw_spin_lock(&acct->workers_lock);
	io_acct_for_each_worker(acct, io_wq_worker_cancel, match);
	raw_spin_unlock(&acct->workers_lock);
}

static void io_wq_cancel_running_work(struct io_wq *wq,
				      struct io_cb_cancel_data *match)
{
	rcu_read_lock();

	for (int i = 0; i < IO_WQ_ACCT_NR; i++)
		io_acct_cancel_running_work(&wq->acct[i], match);

	rcu_read_unlock();
}
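/*
 * Illustrative example (not from the original source): a work_cancel_fn for
 * io_wq_cancel_cb() below is just a predicate over a work item. A
 * hypothetical matcher targeting a single known item could be:
 *
 *	static bool my_match_one(struct io_wq_work *work, void *data)
 *	{
 *		return work == data;	// same shape as io_wq_work_match_item
 *	}
 *
 *	ret = io_wq_cancel_cb(wq, my_match_one, work, false);
 */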
enum io_wq_cancel io_wq_cancel_cb(struct io_wq *wq, work_cancel_fn *cancel,
				  void *data, bool cancel_all)
{
	struct io_cb_cancel_data match = {
		.fn		= cancel,
		.data		= data,
		.cancel_all	= cancel_all,
	};

	/*
	 * First check pending list, if we're lucky we can just remove it
	 * from there. CANCEL_OK means that the work is returned as-new,
	 * no completion will be posted for it.
	 *
	 * Then check if a free (going busy) or busy worker has the work
	 * currently running. If we find it there, we'll return CANCEL_RUNNING
	 * as an indication that we attempted to signal cancellation. The
	 * completion will run normally in this case.
	 *
	 * Do both of these while holding the acct->workers_lock, to ensure
	 * that we'll find a work item regardless of state.
	 */
	io_wq_cancel_pending_work(wq, &match);
	if (match.nr_pending && !match.cancel_all)
		return IO_WQ_CANCEL_OK;

	io_wq_cancel_running_work(wq, &match);
	if (match.nr_running && !match.cancel_all)
		return IO_WQ_CANCEL_RUNNING;

	if (match.nr_running)
		return IO_WQ_CANCEL_RUNNING;
	if (match.nr_pending)
		return IO_WQ_CANCEL_OK;
	return IO_WQ_CANCEL_NOTFOUND;
}

static int io_wq_hash_wake(struct wait_queue_entry *wait, unsigned mode,
			   int sync, void *key)
{
	struct io_wq *wq = container_of(wait, struct io_wq, wait);
	int i;

	list_del_init(&wait->entry);

	rcu_read_lock();
	for (i = 0; i < IO_WQ_ACCT_NR; i++) {
		struct io_wq_acct *acct = &wq->acct[i];

		if (test_and_clear_bit(IO_ACCT_STALLED_BIT, &acct->flags))
			io_acct_activate_free_worker(acct);
	}
	rcu_read_unlock();
	return 1;
}

struct io_wq *io_wq_create(unsigned bounded, struct io_wq_data *data)
{
	int ret, i;
	struct io_wq *wq;

	if (WARN_ON_ONCE(!bounded))
		return ERR_PTR(-EINVAL);

	wq = kzalloc(sizeof(struct io_wq), GFP_KERNEL);
	if (!wq)
		return ERR_PTR(-ENOMEM);

	refcount_inc(&data->hash->refs);
	wq->hash = data->hash;

	ret = -ENOMEM;

	if (!alloc_cpumask_var(&wq->cpu_mask, GFP_KERNEL))
		goto err;
	cpuset_cpus_allowed(data->task, wq->cpu_mask);
	wq->acct[IO_WQ_ACCT_BOUND].max_workers = bounded;
	wq->acct[IO_WQ_ACCT_UNBOUND].max_workers =
				task_rlimit(current, RLIMIT_NPROC);
	INIT_LIST_HEAD(&wq->wait.entry);
	wq->wait.func = io_wq_hash_wake;
	for (i = 0; i < IO_WQ_ACCT_NR; i++) {
		struct io_wq_acct *acct = &wq->acct[i];

		atomic_set(&acct->nr_running, 0);

		raw_spin_lock_init(&acct->workers_lock);
		INIT_HLIST_NULLS_HEAD(&acct->free_list, 0);
		INIT_LIST_HEAD(&acct->all_list);

		INIT_WQ_LIST(&acct->work_list);
		raw_spin_lock_init(&acct->lock);
	}

	wq->task = get_task_struct(data->task);
	atomic_set(&wq->worker_refs, 1);
	init_completion(&wq->worker_done);
	ret = cpuhp_state_add_instance_nocalls(io_wq_online, &wq->cpuhp_node);
	if (ret) {
		put_task_struct(wq->task);
		goto err;
	}

	return wq;
err:
	io_wq_put_hash(data->hash);
	free_cpumask_var(wq->cpu_mask);
	kfree(wq);
	return ERR_PTR(ret);
}

static bool io_task_work_match(struct callback_head *cb, void *data)
{
	struct io_worker *worker;

	if (cb->func != create_worker_cb && cb->func != create_worker_cont)
		return false;
	worker = container_of(cb, struct io_worker, create_work);
	return worker->wq == data;
}

void io_wq_exit_start(struct io_wq *wq)
{
	set_bit(IO_WQ_BIT_EXIT, &wq->state);
}
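/*
 * Illustrative note (not from the original source): teardown is a two-step
 * handshake driven by the owning task, roughly:
 *
 *	io_wq_exit_start(wq);	// set IO_WQ_BIT_EXIT, returns immediately
 *	...cancel other per-task resources...
 *	io_wq_put_and_exit(wq);	// reap workers, then free the wq
 *
 * Splitting it lets the caller flag the exit early, so new enqueues start
 * cancelling while the rest of the cleanup proceeds.
 */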
static void io_wq_cancel_tw_create(struct io_wq *wq)
{
	struct callback_head *cb;

	while ((cb = task_work_cancel_match(wq->task, io_task_work_match, wq)) != NULL) {
		struct io_worker *worker;

		worker = container_of(cb, struct io_worker, create_work);
		io_worker_cancel_cb(worker);
		/*
		 * Only the worker continuation helper has worker allocated and
		 * hence needs freeing.
		 */
		if (cb->func == create_worker_cont)
			kfree(worker);
	}
}

static void io_wq_exit_workers(struct io_wq *wq)
{
	unsigned long timeout, warn_timeout;

	if (!wq->task)
		return;

	io_wq_cancel_tw_create(wq);

	rcu_read_lock();
	io_wq_for_each_worker(wq, io_wq_worker_wake, NULL);
	rcu_read_unlock();
	io_worker_ref_put(wq);

	/*
	 * Shut up hung task complaint, see for example
	 *
	 * https://lore.kernel.org/all/696fc9e7.a70a0220.111c58.0006.GAE@google.com/
	 *
	 * where completely overloading the system with tons of long running
	 * io-wq items can easily trigger the hung task timeout. Only sleep
	 * uninterruptibly for half that time, and warn if we end up waiting
	 * longer than IO_URING_EXIT_WAIT_MAX.
	 */
	timeout = sysctl_hung_task_timeout_secs * HZ / 2;
	if (!timeout)
		timeout = MAX_SCHEDULE_TIMEOUT;
	warn_timeout = jiffies + IO_URING_EXIT_WAIT_MAX;
	do {
		if (wait_for_completion_timeout(&wq->worker_done, timeout))
			break;
		WARN_ON_ONCE(time_after(jiffies, warn_timeout));
	} while (1);

	spin_lock_irq(&wq->hash->wait.lock);
	list_del_init(&wq->wait.entry);
	spin_unlock_irq(&wq->hash->wait.lock);

	put_task_struct(wq->task);
	wq->task = NULL;
}

static void io_wq_destroy(struct io_wq *wq)
{
	struct io_cb_cancel_data match = {
		.fn		= io_wq_work_match_all,
		.cancel_all	= true,
	};

	cpuhp_state_remove_instance_nocalls(io_wq_online, &wq->cpuhp_node);
	io_wq_cancel_pending_work(wq, &match);
	free_cpumask_var(wq->cpu_mask);
	io_wq_put_hash(wq->hash);
	kfree(wq);
}

void io_wq_put_and_exit(struct io_wq *wq)
{
	WARN_ON_ONCE(!test_bit(IO_WQ_BIT_EXIT, &wq->state));

	io_wq_exit_workers(wq);
	io_wq_destroy(wq);
}

struct online_data {
	unsigned int cpu;
	bool online;
};

static bool io_wq_worker_affinity(struct io_worker *worker, void *data)
{
	struct online_data *od = data;

	if (od->online)
		cpumask_set_cpu(od->cpu, worker->wq->cpu_mask);
	else
		cpumask_clear_cpu(od->cpu, worker->wq->cpu_mask);
	return false;
}

static int __io_wq_cpu_online(struct io_wq *wq, unsigned int cpu, bool online)
{
	struct online_data od = {
		.cpu = cpu,
		.online = online
	};

	rcu_read_lock();
	io_wq_for_each_worker(wq, io_wq_worker_affinity, &od);
	rcu_read_unlock();
	return 0;
}

static int io_wq_cpu_online(unsigned int cpu, struct hlist_node *node)
{
	struct io_wq *wq = hlist_entry_safe(node, struct io_wq, cpuhp_node);

	return __io_wq_cpu_online(wq, cpu, true);
}

static int io_wq_cpu_offline(unsigned int cpu, struct hlist_node *node)
{
	struct io_wq *wq = hlist_entry_safe(node, struct io_wq, cpuhp_node);

	return __io_wq_cpu_online(wq, cpu, false);
}
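/*
 * Illustrative note (not from the original source): io_wq_cpu_affinity()
 * below backs the IORING_REGISTER_IOWQ_AFF registration op. A NULL mask
 * resets workers to the task's cpuset, while an explicit mask must be a
 * subset of it. A hypothetical userspace sketch, using the raw
 * io_uring_register() syscall interface:
 *
 *	cpu_set_t set;
 *	CPU_ZERO(&set);
 *	CPU_SET(2, &set);	// pin io-wq workers to CPU 2
 *	io_uring_register(ring_fd, IORING_REGISTER_IOWQ_AFF,
 *			  &set, sizeof(set));
 */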
int io_wq_cpu_affinity(struct io_uring_task *tctx, cpumask_var_t mask)
{
	cpumask_var_t allowed_mask;
	int ret = 0;

	if (!tctx || !tctx->io_wq)
		return -EINVAL;

	if (!alloc_cpumask_var(&allowed_mask, GFP_KERNEL))
		return -ENOMEM;

	rcu_read_lock();
	cpuset_cpus_allowed(tctx->io_wq->task, allowed_mask);
	if (mask) {
		if (cpumask_subset(mask, allowed_mask))
			cpumask_copy(tctx->io_wq->cpu_mask, mask);
		else
			ret = -EINVAL;
	} else {
		cpumask_copy(tctx->io_wq->cpu_mask, allowed_mask);
	}
	rcu_read_unlock();

	free_cpumask_var(allowed_mask);
	return ret;
}

/*
 * Set max number of unbounded workers, returns old value. If new_count is 0,
 * then just return the old value.
 */
int io_wq_max_workers(struct io_wq *wq, int *new_count)
{
	struct io_wq_acct *acct;
	int prev[IO_WQ_ACCT_NR];
	int i;

	BUILD_BUG_ON((int) IO_WQ_ACCT_BOUND   != (int) IO_WQ_BOUND);
	BUILD_BUG_ON((int) IO_WQ_ACCT_UNBOUND != (int) IO_WQ_UNBOUND);
	BUILD_BUG_ON((int) IO_WQ_ACCT_NR != 2);

	for (i = 0; i < IO_WQ_ACCT_NR; i++) {
		if (new_count[i] > task_rlimit(current, RLIMIT_NPROC))
			new_count[i] = task_rlimit(current, RLIMIT_NPROC);
	}

	for (i = 0; i < IO_WQ_ACCT_NR; i++)
		prev[i] = 0;

	rcu_read_lock();

	for (i = 0; i < IO_WQ_ACCT_NR; i++) {
		acct = &wq->acct[i];
		raw_spin_lock(&acct->workers_lock);
		prev[i] = max_t(int, acct->max_workers, prev[i]);
		if (new_count[i])
			acct->max_workers = new_count[i];
		raw_spin_unlock(&acct->workers_lock);
	}
	rcu_read_unlock();

	for (i = 0; i < IO_WQ_ACCT_NR; i++)
		new_count[i] = prev[i];

	return 0;
}

static __init int io_wq_init(void)
{
	int ret;

	ret = cpuhp_setup_state_multi(CPUHP_AP_ONLINE_DYN, "io-wq/online",
					io_wq_cpu_online, io_wq_cpu_offline);
	if (ret < 0)
		return ret;
	io_wq_online = ret;
	return 0;
}
subsys_initcall(io_wq_init);
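/*
 * Illustrative note (not from the original source): io_wq_max_workers() is
 * reached via the IORING_REGISTER_IOWQ_MAX_WORKERS registration op, which
 * takes a two-element array indexed by IO_WQ_BOUND/IO_WQ_UNBOUND. Because a
 * zero entry only reads back the old limit, a caller can query without
 * changing anything (hypothetical userspace sketch):
 *
 *	unsigned int counts[2] = { 0, 0 };	// 0,0 => query only
 *	io_uring_register(ring_fd, IORING_REGISTER_IOWQ_MAX_WORKERS,
 *			  counts, 2);
 *	// counts[] now holds the previous bound/unbound limits
 */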