// SPDX-License-Identifier: GPL-2.0
/*
 * Basic worker thread pool for io_uring
 *
 * Copyright (C) 2019 Jens Axboe
 *
 */
#include <linux/kernel.h>
#include <linux/init.h>
#include <linux/errno.h>
#include <linux/sched/signal.h>
#include <linux/percpu.h>
#include <linux/slab.h>
#include <linux/rculist_nulls.h>
#include <linux/cpu.h>
#include <linux/cpuset.h>
#include <linux/task_work.h>
#include <linux/audit.h>
#include <linux/mmu_context.h>
#include <uapi/linux/io_uring.h>

#include "io-wq.h"
#include "slist.h"
#include "io_uring.h"

#define WORKER_IDLE_TIMEOUT	(5 * HZ)
#define WORKER_INIT_LIMIT	3

enum {
	IO_WORKER_F_UP		= 0,	/* up and active */
	IO_WORKER_F_RUNNING	= 1,	/* account as running */
	IO_WORKER_F_FREE	= 2,	/* worker on free list */
};

enum {
	IO_WQ_BIT_EXIT		= 0,	/* wq exiting */
};

enum {
	IO_ACCT_STALLED_BIT	= 0,	/* stalled on hash */
};

/*
 * One for each thread in a wq pool
 */
struct io_worker {
	refcount_t ref;
	unsigned long flags;
	struct hlist_nulls_node nulls_node;
	struct list_head all_list;
	struct task_struct *task;
	struct io_wq *wq;
	struct io_wq_acct *acct;

	struct io_wq_work *cur_work;
	raw_spinlock_t lock;

	struct completion ref_done;

	unsigned long create_state;
	struct callback_head create_work;
	int init_retries;

	union {
		struct rcu_head rcu;
		struct delayed_work work;
	};
};

#if BITS_PER_LONG == 64
#define IO_WQ_HASH_ORDER	6
#else
#define IO_WQ_HASH_ORDER	5
#endif

#define IO_WQ_NR_HASH_BUCKETS	(1u << IO_WQ_HASH_ORDER)

struct io_wq_acct {
	/**
	 * Protects access to the worker lists.
	 */
	raw_spinlock_t workers_lock;

	unsigned nr_workers;
	unsigned max_workers;
	atomic_t nr_running;

	/**
	 * The list of free workers. Protected by #workers_lock
	 * (write) and RCU (read).
	 */
	struct hlist_nulls_head free_list;

	/**
	 * The list of all workers. Protected by #workers_lock
	 * (write) and RCU (read).
	 */
	struct list_head all_list;

	raw_spinlock_t lock;
	struct io_wq_work_list work_list;
	unsigned long flags;
};

enum {
	IO_WQ_ACCT_BOUND,
	IO_WQ_ACCT_UNBOUND,
	IO_WQ_ACCT_NR,
};

/*
 * Per io_wq state
 */
struct io_wq {
	unsigned long state;

	free_work_fn *free_work;
	io_wq_work_fn *do_work;

	struct io_wq_hash *hash;

	atomic_t worker_refs;
	struct completion worker_done;

	struct hlist_node cpuhp_node;

	struct task_struct *task;

	struct io_wq_acct acct[IO_WQ_ACCT_NR];

	struct wait_queue_entry wait;

	struct io_wq_work *hash_tail[IO_WQ_NR_HASH_BUCKETS];

	cpumask_var_t cpu_mask;
};

static enum cpuhp_state io_wq_online;

struct io_cb_cancel_data {
	work_cancel_fn *fn;
	void *data;
	int nr_running;
	int nr_pending;
	bool cancel_all;
};

static bool create_io_worker(struct io_wq *wq, struct io_wq_acct *acct);
static void io_wq_dec_running(struct io_worker *worker);
static bool io_acct_cancel_pending_work(struct io_wq *wq,
					struct io_wq_acct *acct,
					struct io_cb_cancel_data *match);
static void create_worker_cb(struct callback_head *cb);
static void io_wq_cancel_tw_create(struct io_wq *wq);

static bool io_worker_get(struct io_worker *worker)
{
	return refcount_inc_not_zero(&worker->ref);
}

static void io_worker_release(struct io_worker *worker)
{
	if (refcount_dec_and_test(&worker->ref))
		complete(&worker->ref_done);
}

static inline struct io_wq_acct *io_get_acct(struct io_wq *wq, bool bound)
{
	return &wq->acct[bound ? IO_WQ_ACCT_BOUND : IO_WQ_ACCT_UNBOUND];
}

static inline struct io_wq_acct *io_work_get_acct(struct io_wq *wq,
						  unsigned int work_flags)
{
	return io_get_acct(wq, !(work_flags & IO_WQ_WORK_UNBOUND));
}

static inline struct io_wq_acct *io_wq_get_acct(struct io_worker *worker)
{
	return worker->acct;
}

static void io_worker_ref_put(struct io_wq *wq)
{
	if (atomic_dec_and_test(&wq->worker_refs))
		complete(&wq->worker_done);
}

bool io_wq_worker_stopped(void)
{
	struct io_worker *worker = current->worker_private;

	if (WARN_ON_ONCE(!io_wq_current_is_worker()))
		return true;

	return test_bit(IO_WQ_BIT_EXIT, &worker->wq->state);
}

static void io_worker_cancel_cb(struct io_worker *worker)
{
	struct io_wq_acct *acct = io_wq_get_acct(worker);
	struct io_wq *wq = worker->wq;

	atomic_dec(&acct->nr_running);
	raw_spin_lock(&acct->workers_lock);
	acct->nr_workers--;
	raw_spin_unlock(&acct->workers_lock);
	io_worker_ref_put(wq);
	clear_bit_unlock(0, &worker->create_state);
	io_worker_release(worker);
}

static bool io_task_worker_match(struct callback_head *cb, void *data)
{
	struct io_worker *worker;

	if (cb->func != create_worker_cb)
		return false;
	worker = container_of(cb, struct io_worker, create_work);
	return worker == data;
}

static void io_worker_exit(struct io_worker *worker)
{
	struct io_wq *wq = worker->wq;
	struct io_wq_acct *acct = io_wq_get_acct(worker);

	while (1) {
		struct callback_head *cb = task_work_cancel_match(wq->task,
						io_task_worker_match, worker);

		if (!cb)
			break;
		io_worker_cancel_cb(worker);
	}

	io_worker_release(worker);
	wait_for_completion(&worker->ref_done);

	raw_spin_lock(&acct->workers_lock);
	if (test_bit(IO_WORKER_F_FREE, &worker->flags))
		hlist_nulls_del_rcu(&worker->nulls_node);
	list_del_rcu(&worker->all_list);
	raw_spin_unlock(&acct->workers_lock);
	io_wq_dec_running(worker);
	/*
	 * this worker is a goner, clear ->worker_private to avoid any
	 * inc/dec running calls that could happen as part of exit from
	 * touching 'worker'.
	 */
	current->worker_private = NULL;

	kfree_rcu(worker, rcu);
	io_worker_ref_put(wq);
	do_exit(0);
}

static inline bool __io_acct_run_queue(struct io_wq_acct *acct)
{
	return !test_bit(IO_ACCT_STALLED_BIT, &acct->flags) &&
		!wq_list_empty(&acct->work_list);
}

/*
 * If there's work to do, returns true with acct->lock acquired. If not,
 * returns false with no lock held.
 */
static inline bool io_acct_run_queue(struct io_wq_acct *acct)
	__acquires(&acct->lock)
{
	raw_spin_lock(&acct->lock);
	if (__io_acct_run_queue(acct))
		return true;

	raw_spin_unlock(&acct->lock);
	return false;
}

/*
 * Check head of free list for an available worker. If one isn't available,
 * caller must create one.
 */
static bool io_acct_activate_free_worker(struct io_wq_acct *acct)
	__must_hold(RCU)
{
	struct hlist_nulls_node *n;
	struct io_worker *worker;

	/*
	 * Iterate free_list and see if we can find an idle worker to
	 * activate. If a given worker is on the free_list but in the process
	 * of exiting, keep trying.
	 */
	hlist_nulls_for_each_entry_rcu(worker, n, &acct->free_list, nulls_node) {
		if (!io_worker_get(worker))
			continue;
		/*
		 * If the worker is already running, it's either already
		 * starting work or finishing work. In either case, if it does
		 * go to sleep, we'll kick off a new task for this work anyway.
		 */
		wake_up_process(worker->task);
		io_worker_release(worker);
		return true;
	}

	return false;
}

/*
 * We need a worker. If we find a free one, we're good. If not, and we're
 * below the max number of workers, create one.
 */
static bool io_wq_create_worker(struct io_wq *wq, struct io_wq_acct *acct)
{
	/*
	 * Most likely an attempt to queue unbounded work on an io_wq that
	 * wasn't set up with any unbounded workers.
	 */
	if (unlikely(!acct->max_workers))
		pr_warn_once("io-wq is not configured for unbound workers");

	raw_spin_lock(&acct->workers_lock);
	if (acct->nr_workers >= acct->max_workers) {
		raw_spin_unlock(&acct->workers_lock);
		return true;
	}
	acct->nr_workers++;
	raw_spin_unlock(&acct->workers_lock);
	atomic_inc(&acct->nr_running);
	atomic_inc(&wq->worker_refs);
	return create_io_worker(wq, acct);
}

static void io_wq_inc_running(struct io_worker *worker)
{
	struct io_wq_acct *acct = io_wq_get_acct(worker);

	atomic_inc(&acct->nr_running);
}

static void create_worker_cb(struct callback_head *cb)
{
	struct io_worker *worker;
	struct io_wq *wq;

	struct io_wq_acct *acct;
	bool do_create = false;

	worker = container_of(cb, struct io_worker, create_work);
	wq = worker->wq;
	acct = worker->acct;
	raw_spin_lock(&acct->workers_lock);

	if (acct->nr_workers < acct->max_workers) {
		acct->nr_workers++;
		do_create = true;
	}
	raw_spin_unlock(&acct->workers_lock);
	if (do_create) {
		create_io_worker(wq, acct);
	} else {
		atomic_dec(&acct->nr_running);
		io_worker_ref_put(wq);
	}
	clear_bit_unlock(0, &worker->create_state);
	io_worker_release(worker);
}

static bool io_queue_worker_create(struct io_worker *worker,
				   struct io_wq_acct *acct,
				   task_work_func_t func)
{
	struct io_wq *wq = worker->wq;

	/* raced with exit, just ignore create call */
	if (test_bit(IO_WQ_BIT_EXIT, &wq->state))
		goto fail;
	if (!io_worker_get(worker))
		goto fail;
	/*
	 * create_state manages ownership of create_work/index. We should
	 * only need one entry per worker, as the worker going to sleep
	 * will trigger the condition, and waking will clear it once it
	 * runs the task_work.
	 */
	if (test_bit(0, &worker->create_state) ||
	    test_and_set_bit_lock(0, &worker->create_state))
		goto fail_release;

	atomic_inc(&wq->worker_refs);
	init_task_work(&worker->create_work, func);
	if (!task_work_add(wq->task, &worker->create_work, TWA_SIGNAL)) {
		/*
		 * EXIT may have been set after checking it above, check after
		 * adding the task_work and remove any creation item if it is
		 * now set. wq exit does that too, but we can have added this
		 * work item after we canceled in io_wq_exit_workers().
		 */
		if (test_bit(IO_WQ_BIT_EXIT, &wq->state))
			io_wq_cancel_tw_create(wq);
		io_worker_ref_put(wq);
		return true;
	}
	io_worker_ref_put(wq);
	clear_bit_unlock(0, &worker->create_state);
fail_release:
	io_worker_release(worker);
fail:
	atomic_dec(&acct->nr_running);
	io_worker_ref_put(wq);
	return false;
}

static void io_wq_dec_running(struct io_worker *worker)
{
	struct io_wq_acct *acct = io_wq_get_acct(worker);
	struct io_wq *wq = worker->wq;

	if (!test_bit(IO_WORKER_F_UP, &worker->flags))
		return;

	if (!atomic_dec_and_test(&acct->nr_running))
		return;
	if (!io_acct_run_queue(acct))
		return;

	raw_spin_unlock(&acct->lock);
	atomic_inc(&acct->nr_running);
	atomic_inc(&wq->worker_refs);
	io_queue_worker_create(worker, acct, create_worker_cb);
}

/*
 * Worker will start processing some work. Move it to the busy list, if
 * it's currently on the freelist
 */
static void __io_worker_busy(struct io_wq_acct *acct, struct io_worker *worker)
{
	if (test_bit(IO_WORKER_F_FREE, &worker->flags)) {
		clear_bit(IO_WORKER_F_FREE, &worker->flags);
		raw_spin_lock(&acct->workers_lock);
		hlist_nulls_del_init_rcu(&worker->nulls_node);
		raw_spin_unlock(&acct->workers_lock);
	}
}

/*
 * No work, worker going to sleep. Move to freelist.
 */
static void __io_worker_idle(struct io_wq_acct *acct, struct io_worker *worker)
	__must_hold(acct->workers_lock)
{
	if (!test_bit(IO_WORKER_F_FREE, &worker->flags)) {
		set_bit(IO_WORKER_F_FREE, &worker->flags);
		hlist_nulls_add_head_rcu(&worker->nulls_node, &acct->free_list);
	}
}

static inline unsigned int __io_get_work_hash(unsigned int work_flags)
{
	return work_flags >> IO_WQ_HASH_SHIFT;
}

static inline unsigned int io_get_work_hash(struct io_wq_work *work)
{
	return __io_get_work_hash(atomic_read(&work->flags));
}

static bool io_wait_on_hash(struct io_wq *wq, unsigned int hash)
{
	bool ret = false;

	spin_lock_irq(&wq->hash->wait.lock);
	if (list_empty(&wq->wait.entry)) {
		__add_wait_queue(&wq->hash->wait, &wq->wait);
		if (!test_bit(hash, &wq->hash->map)) {
			__set_current_state(TASK_RUNNING);
			list_del_init(&wq->wait.entry);
			ret = true;
		}
	}
	spin_unlock_irq(&wq->hash->wait.lock);
	return ret;
}

static struct io_wq_work *io_get_next_work(struct io_wq_acct *acct,
					   struct io_wq *wq)
	__must_hold(acct->lock)
{
	struct io_wq_work_node *node, *prev;
	struct io_wq_work *work, *tail;
	unsigned int stall_hash = -1U;

	wq_list_for_each(node, prev, &acct->work_list) {
		unsigned int work_flags;
		unsigned int hash;

		work = container_of(node, struct io_wq_work, list);

		/* not hashed, can run anytime */
		work_flags = atomic_read(&work->flags);
		if (!__io_wq_is_hashed(work_flags)) {
			wq_list_del(&acct->work_list, node, prev);
			return work;
		}

		hash = __io_get_work_hash(work_flags);
		/* all items with this hash lie in [work, tail] */
		tail = wq->hash_tail[hash];

		/* hashed, can run if not already running */
		if (!test_and_set_bit(hash, &wq->hash->map)) {
			wq->hash_tail[hash] = NULL;
			wq_list_cut(&acct->work_list, &tail->list, prev);
			return work;
		}
		if (stall_hash == -1U)
			stall_hash = hash;
		/* fast forward to a next hash, for-each will fix up @prev */
		node = &tail->list;
	}

	if (stall_hash != -1U) {
		bool unstalled;

		/*
		 * Set this before dropping the lock to avoid racing with new
		 * work being added and clearing the stalled bit.
		 */
		set_bit(IO_ACCT_STALLED_BIT, &acct->flags);
		raw_spin_unlock(&acct->lock);
		unstalled = io_wait_on_hash(wq, stall_hash);
		raw_spin_lock(&acct->lock);
		if (unstalled) {
			clear_bit(IO_ACCT_STALLED_BIT, &acct->flags);
			if (wq_has_sleeper(&wq->hash->wait))
				wake_up(&wq->hash->wait);
		}
	}

	return NULL;
}
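
/*
 * Example of the hashed-work layout that io_get_next_work() relies on
 * (hash values A, B and C below are hypothetical): io_wq_insert_work()
 * keeps items with the same hash adjacent in the pending list, with
 * wq->hash_tail[] pointing at the last item of each run:
 *
 *	work_list:  A0 -> A1 -> B0 -> C0 -> C1 -> C2
 *	hash_tail:  [A] = A1, [B] = B0, [C] = C2
 *
 * A free hash has its whole [work, tail] run cut out with a single
 * wq_list_cut(); a busy hash is skipped by jumping straight to its tail
 * rather than walking every item in the run.
 */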

static void io_assign_current_work(struct io_worker *worker,
				   struct io_wq_work *work)
{
	if (work) {
		io_run_task_work();
		cond_resched();
	}

	raw_spin_lock(&worker->lock);
	worker->cur_work = work;
	raw_spin_unlock(&worker->lock);
}

/*
 * Called with acct->lock held, drops it before returning
 */
static void io_worker_handle_work(struct io_wq_acct *acct,
				  struct io_worker *worker)
	__releases(&acct->lock)
{
	struct io_wq *wq = worker->wq;
	bool do_kill = test_bit(IO_WQ_BIT_EXIT, &wq->state);

	do {
		struct io_wq_work *work;

		/*
		 * If we got some work, mark us as busy. If we didn't, but
		 * the list isn't empty, it means we stalled on hashed work.
		 * Mark us stalled so we don't keep looking for work when we
		 * can't make progress, any work completion or insertion will
		 * clear the stalled flag.
		 */
		work = io_get_next_work(acct, wq);
		if (work) {
			/*
			 * Make sure cancelation can find this, even before
			 * it becomes the active work. That avoids a window
			 * where the work has been removed from our general
			 * work list, but isn't yet discoverable as the
			 * current work item for this worker.
			 */
			raw_spin_lock(&worker->lock);
			worker->cur_work = work;
			raw_spin_unlock(&worker->lock);
		}

		raw_spin_unlock(&acct->lock);

		if (!work)
			break;

		__io_worker_busy(acct, worker);

		io_assign_current_work(worker, work);
		__set_current_state(TASK_RUNNING);

		/* handle a whole dependent link */
		do {
			struct io_wq_work *next_hashed, *linked;
			unsigned int work_flags = atomic_read(&work->flags);
			unsigned int hash = __io_wq_is_hashed(work_flags)
				? __io_get_work_hash(work_flags)
				: -1U;

			next_hashed = wq_next_work(work);

			if (do_kill &&
			    (work_flags & IO_WQ_WORK_UNBOUND))
				atomic_or(IO_WQ_WORK_CANCEL, &work->flags);
			wq->do_work(work);
			io_assign_current_work(worker, NULL);

			linked = wq->free_work(work);
			work = next_hashed;
			if (!work && linked && !io_wq_is_hashed(linked)) {
				work = linked;
				linked = NULL;
			}
			io_assign_current_work(worker, work);
			if (linked)
				io_wq_enqueue(wq, linked);

			if (hash != -1U && !next_hashed) {
				/* serialize hash clear with wake_up() */
				spin_lock_irq(&wq->hash->wait.lock);
				clear_bit(hash, &wq->hash->map);
				clear_bit(IO_ACCT_STALLED_BIT, &acct->flags);
				spin_unlock_irq(&wq->hash->wait.lock);
				if (wq_has_sleeper(&wq->hash->wait))
					wake_up(&wq->hash->wait);
			}
		} while (work);

		if (!__io_acct_run_queue(acct))
			break;
		raw_spin_lock(&acct->lock);
	} while (1);
}

static int io_wq_worker(void *data)
{
	struct io_worker *worker = data;
	struct io_wq_acct *acct = io_wq_get_acct(worker);
	struct io_wq *wq = worker->wq;
	bool exit_mask = false, last_timeout = false;
	char buf[TASK_COMM_LEN] = {};

	set_mask_bits(&worker->flags, 0,
		      BIT(IO_WORKER_F_UP) | BIT(IO_WORKER_F_RUNNING));

	snprintf(buf, sizeof(buf), "iou-wrk-%d", wq->task->pid);
	set_task_comm(current, buf);

	while (!test_bit(IO_WQ_BIT_EXIT, &wq->state)) {
		long ret;

		set_current_state(TASK_INTERRUPTIBLE);

		/*
		 * If we have work to do, io_acct_run_queue() returns with
		 * the acct->lock held. If not, it will drop it.
		 */
		while (io_acct_run_queue(acct))
			io_worker_handle_work(acct, worker);

		raw_spin_lock(&acct->workers_lock);
		/*
		 * Last sleep timed out. Exit if we're not the last worker,
		 * or if someone modified our affinity.
		 */
		if (last_timeout && (exit_mask || acct->nr_workers > 1)) {
			acct->nr_workers--;
			raw_spin_unlock(&acct->workers_lock);
			__set_current_state(TASK_RUNNING);
			break;
		}
		last_timeout = false;
		__io_worker_idle(acct, worker);
		raw_spin_unlock(&acct->workers_lock);
		if (io_run_task_work())
			continue;
		ret = schedule_timeout(WORKER_IDLE_TIMEOUT);
		if (signal_pending(current)) {
			struct ksignal ksig;

			if (!get_signal(&ksig))
				continue;
			break;
		}
		if (!ret) {
			last_timeout = true;
			exit_mask = !cpumask_test_cpu(raw_smp_processor_id(),
							wq->cpu_mask);
		}
	}

	if (test_bit(IO_WQ_BIT_EXIT, &wq->state) && io_acct_run_queue(acct))
		io_worker_handle_work(acct, worker);

	io_worker_exit(worker);
	return 0;
}

/*
 * Called when a worker is scheduled in. Mark us as currently running.
 */
void io_wq_worker_running(struct task_struct *tsk)
{
	struct io_worker *worker = tsk->worker_private;

	if (!worker)
		return;
	if (!test_bit(IO_WORKER_F_UP, &worker->flags))
		return;
	if (test_bit(IO_WORKER_F_RUNNING, &worker->flags))
		return;
	set_bit(IO_WORKER_F_RUNNING, &worker->flags);
	io_wq_inc_running(worker);
}

/*
 * Called when worker is going to sleep. If there are no workers currently
 * running and we have work pending, wake up a free one or create a new one.
 */
void io_wq_worker_sleeping(struct task_struct *tsk)
{
	struct io_worker *worker = tsk->worker_private;

	if (!worker)
		return;
	if (!test_bit(IO_WORKER_F_UP, &worker->flags))
		return;
	if (!test_bit(IO_WORKER_F_RUNNING, &worker->flags))
		return;

	clear_bit(IO_WORKER_F_RUNNING, &worker->flags);
	io_wq_dec_running(worker);
}

static void io_init_new_worker(struct io_wq *wq, struct io_wq_acct *acct, struct io_worker *worker,
			       struct task_struct *tsk)
{
	tsk->worker_private = worker;
	worker->task = tsk;
	set_cpus_allowed_ptr(tsk, wq->cpu_mask);

	raw_spin_lock(&acct->workers_lock);
	hlist_nulls_add_head_rcu(&worker->nulls_node, &acct->free_list);
	list_add_tail_rcu(&worker->all_list, &acct->all_list);
	set_bit(IO_WORKER_F_FREE, &worker->flags);
	raw_spin_unlock(&acct->workers_lock);
	wake_up_new_task(tsk);
}

static bool io_wq_work_match_all(struct io_wq_work *work, void *data)
{
	return true;
}

static inline bool io_should_retry_thread(struct io_worker *worker, long err)
{
	/*
	 * Prevent perpetual task_work retry, if the task (or its group) is
	 * exiting.
	 */
	if (fatal_signal_pending(current))
		return false;
	if (worker->init_retries++ >= WORKER_INIT_LIMIT)
		return false;

	switch (err) {
	case -EAGAIN:
	case -ERESTARTSYS:
	case -ERESTARTNOINTR:
	case -ERESTARTNOHAND:
		return true;
	default:
		return false;
	}
}

static void queue_create_worker_retry(struct io_worker *worker)
{
	/*
	 * We only bother retrying because there's a chance that the
	 * failure to create a worker is due to some temporary condition
	 * in the forking task (e.g. outstanding signal); give the task
	 * some time to clear that condition.
	 */
	schedule_delayed_work(&worker->work,
			      msecs_to_jiffies(worker->init_retries * 5));
}

static void create_worker_cont(struct callback_head *cb)
{
	struct io_worker *worker;
	struct task_struct *tsk;
	struct io_wq *wq;
	struct io_wq_acct *acct;

	worker = container_of(cb, struct io_worker, create_work);
	clear_bit_unlock(0, &worker->create_state);
	wq = worker->wq;
	acct = io_wq_get_acct(worker);
	tsk = create_io_thread(io_wq_worker, worker, NUMA_NO_NODE);
	if (!IS_ERR(tsk)) {
		io_init_new_worker(wq, acct, worker, tsk);
		io_worker_release(worker);
		return;
	} else if (!io_should_retry_thread(worker, PTR_ERR(tsk))) {
		atomic_dec(&acct->nr_running);
		raw_spin_lock(&acct->workers_lock);
		acct->nr_workers--;
		if (!acct->nr_workers) {
			struct io_cb_cancel_data match = {
				.fn		= io_wq_work_match_all,
				.cancel_all	= true,
			};

			raw_spin_unlock(&acct->workers_lock);
			while (io_acct_cancel_pending_work(wq, acct, &match))
				;
		} else {
			raw_spin_unlock(&acct->workers_lock);
		}
		io_worker_ref_put(wq);
		kfree(worker);
		return;
	}

	/* re-create attempts grab a new worker ref, drop the existing one */
	io_worker_release(worker);
	queue_create_worker_retry(worker);
}

static void io_workqueue_create(struct work_struct *work)
{
	struct io_worker *worker = container_of(work, struct io_worker,
						work.work);
	struct io_wq_acct *acct = io_wq_get_acct(worker);

	if (!io_queue_worker_create(worker, acct, create_worker_cont))
		kfree(worker);
}

static bool create_io_worker(struct io_wq *wq, struct io_wq_acct *acct)
{
	struct io_worker *worker;
	struct task_struct *tsk;

	__set_current_state(TASK_RUNNING);

	worker = kzalloc(sizeof(*worker), GFP_KERNEL);
	if (!worker) {
fail:
		atomic_dec(&acct->nr_running);
		raw_spin_lock(&acct->workers_lock);
		acct->nr_workers--;
		raw_spin_unlock(&acct->workers_lock);
		io_worker_ref_put(wq);
		return false;
	}

	refcount_set(&worker->ref, 1);
	worker->wq = wq;
	worker->acct = acct;
	raw_spin_lock_init(&worker->lock);
	init_completion(&worker->ref_done);

	tsk = create_io_thread(io_wq_worker, worker, NUMA_NO_NODE);
	if (!IS_ERR(tsk)) {
		io_init_new_worker(wq, acct, worker, tsk);
	} else if (!io_should_retry_thread(worker, PTR_ERR(tsk))) {
		kfree(worker);
		goto fail;
	} else {
		INIT_DELAYED_WORK(&worker->work, io_workqueue_create);
		queue_create_worker_retry(worker);
	}

	return true;
}

/*
 * Iterate the passed in list and call the specific function for each
 * worker that isn't exiting
 */
static bool io_acct_for_each_worker(struct io_wq_acct *acct,
				    bool (*func)(struct io_worker *, void *),
				    void *data)
{
	struct io_worker *worker;
	bool ret = false;

	list_for_each_entry_rcu(worker, &acct->all_list, all_list) {
		if (io_worker_get(worker)) {
			/* no task if node is/was offline */
			if (worker->task)
				ret = func(worker, data);
			io_worker_release(worker);
			if (ret)
				break;
		}
	}

	return ret;
}

static bool io_wq_for_each_worker(struct io_wq *wq,
				  bool (*func)(struct io_worker *, void *),
				  void *data)
{
	for (int i = 0; i < IO_WQ_ACCT_NR; i++) {
		if (!io_acct_for_each_worker(&wq->acct[i], func, data))
			return false;
	}

	return true;
}

static bool io_wq_worker_wake(struct io_worker *worker, void *data)
{
	__set_notify_signal(worker->task);
	wake_up_process(worker->task);
	return false;
}

static void io_run_cancel(struct io_wq_work *work, struct io_wq *wq)
{
	do {
		atomic_or(IO_WQ_WORK_CANCEL, &work->flags);
		wq->do_work(work);
		work = wq->free_work(work);
	} while (work);
}

static void io_wq_insert_work(struct io_wq *wq, struct io_wq_acct *acct,
			      struct io_wq_work *work, unsigned int work_flags)
{
	unsigned int hash;
	struct io_wq_work *tail;

	if (!__io_wq_is_hashed(work_flags)) {
append:
		wq_list_add_tail(&work->list, &acct->work_list);
		return;
	}

	hash = __io_get_work_hash(work_flags);
	tail = wq->hash_tail[hash];
	wq->hash_tail[hash] = work;
	if (!tail)
		goto append;

	wq_list_add_after(&work->list, &tail->list, &acct->work_list);
}

static bool io_wq_work_match_item(struct io_wq_work *work, void *data)
{
	return work == data;
}

void io_wq_enqueue(struct io_wq *wq, struct io_wq_work *work)
{
	unsigned int work_flags = atomic_read(&work->flags);
	struct io_wq_acct *acct = io_work_get_acct(wq, work_flags);
	struct io_cb_cancel_data match = {
		.fn		= io_wq_work_match_item,
		.data		= work,
		.cancel_all	= false,
	};
	bool do_create;

	/*
	 * If io-wq is exiting for this task, or if the request has explicitly
	 * been marked as one that should not get executed, cancel it here.
	 */
	if (test_bit(IO_WQ_BIT_EXIT, &wq->state) ||
	    (work_flags & IO_WQ_WORK_CANCEL)) {
		io_run_cancel(work, wq);
		return;
	}

	raw_spin_lock(&acct->lock);
	io_wq_insert_work(wq, acct, work, work_flags);
	clear_bit(IO_ACCT_STALLED_BIT, &acct->flags);
	raw_spin_unlock(&acct->lock);

	rcu_read_lock();
	do_create = !io_acct_activate_free_worker(acct);
	rcu_read_unlock();

	if (do_create && ((work_flags & IO_WQ_WORK_CONCURRENT) ||
	    !atomic_read(&acct->nr_running))) {
		bool did_create;

		did_create = io_wq_create_worker(wq, acct);
		if (likely(did_create))
			return;

		raw_spin_lock(&acct->workers_lock);
		if (acct->nr_workers) {
			raw_spin_unlock(&acct->workers_lock);
			return;
		}
		raw_spin_unlock(&acct->workers_lock);

		/* fatal condition, failed to create the first worker */
		io_acct_cancel_pending_work(wq, acct, &match);
	}
}

/*
 * Work items that hash to the same value will not be done in parallel.
 * Used to limit concurrent writes, generally hashed by inode.
 */
void io_wq_hash_work(struct io_wq_work *work, void *val)
{
	unsigned int bit;

	bit = hash_ptr(val, IO_WQ_HASH_ORDER);
	atomic_or(IO_WQ_WORK_HASHED | (bit << IO_WQ_HASH_SHIFT), &work->flags);
}
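
/*
 * Example caller, sketched from the io_uring core (io_prep_async_work()
 * hashes writes to regular files by inode, io_queue_iowq() then queues
 * the request):
 *
 *	io_wq_hash_work(&req->work, file_inode(req->file));
 *	io_wq_enqueue(tctx->io_wq, &req->work);
 *
 * All work items hashed to the same inode are then executed by one
 * worker at a time, limiting concurrent writes to that file.
 */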

static bool __io_wq_worker_cancel(struct io_worker *worker,
				  struct io_cb_cancel_data *match,
				  struct io_wq_work *work)
{
	if (work && match->fn(work, match->data)) {
		atomic_or(IO_WQ_WORK_CANCEL, &work->flags);
		__set_notify_signal(worker->task);
		return true;
	}

	return false;
}

static bool io_wq_worker_cancel(struct io_worker *worker, void *data)
{
	struct io_cb_cancel_data *match = data;

	/*
	 * Hold the lock to avoid ->cur_work going out of scope, caller
	 * may dereference the passed in work.
	 */
	raw_spin_lock(&worker->lock);
	if (__io_wq_worker_cancel(worker, match, worker->cur_work))
		match->nr_running++;
	raw_spin_unlock(&worker->lock);

	return match->nr_running && !match->cancel_all;
}

static inline void io_wq_remove_pending(struct io_wq *wq,
					struct io_wq_acct *acct,
					struct io_wq_work *work,
					struct io_wq_work_node *prev)
{
	unsigned int hash = io_get_work_hash(work);
	struct io_wq_work *prev_work = NULL;

	if (io_wq_is_hashed(work) && work == wq->hash_tail[hash]) {
		if (prev)
			prev_work = container_of(prev, struct io_wq_work, list);
		if (prev_work && io_get_work_hash(prev_work) == hash)
			wq->hash_tail[hash] = prev_work;
		else
			wq->hash_tail[hash] = NULL;
	}
	wq_list_del(&acct->work_list, &work->list, prev);
}

static bool io_acct_cancel_pending_work(struct io_wq *wq,
					struct io_wq_acct *acct,
					struct io_cb_cancel_data *match)
{
	struct io_wq_work_node *node, *prev;
	struct io_wq_work *work;

	raw_spin_lock(&acct->lock);
	wq_list_for_each(node, prev, &acct->work_list) {
		work = container_of(node, struct io_wq_work, list);
		if (!match->fn(work, match->data))
			continue;
		io_wq_remove_pending(wq, acct, work, prev);
		raw_spin_unlock(&acct->lock);
		io_run_cancel(work, wq);
		match->nr_pending++;
		/* not safe to continue after unlock */
		return true;
	}
	raw_spin_unlock(&acct->lock);

	return false;
}

static void io_wq_cancel_pending_work(struct io_wq *wq,
				      struct io_cb_cancel_data *match)
{
	int i;
retry:
	for (i = 0; i < IO_WQ_ACCT_NR; i++) {
		struct io_wq_acct *acct = io_get_acct(wq, i == 0);

		if (io_acct_cancel_pending_work(wq, acct, match)) {
			if (match->cancel_all)
				goto retry;
			break;
		}
	}
}

static void io_acct_cancel_running_work(struct io_wq_acct *acct,
					struct io_cb_cancel_data *match)
{
	raw_spin_lock(&acct->workers_lock);
	io_acct_for_each_worker(acct, io_wq_worker_cancel, match);
	raw_spin_unlock(&acct->workers_lock);
}

static void io_wq_cancel_running_work(struct io_wq *wq,
				      struct io_cb_cancel_data *match)
{
	rcu_read_lock();

	for (int i = 0; i < IO_WQ_ACCT_NR; i++)
		io_acct_cancel_running_work(&wq->acct[i], match);

	rcu_read_unlock();
}

enum io_wq_cancel io_wq_cancel_cb(struct io_wq *wq, work_cancel_fn *cancel,
				  void *data, bool cancel_all)
{
	struct io_cb_cancel_data match = {
		.fn		= cancel,
		.data		= data,
		.cancel_all	= cancel_all,
	};

	/*
	 * First check pending list, if we're lucky we can just remove it
	 * from there. CANCEL_OK means that the work is returned as-new,
	 * no completion will be posted for it.
	 *
	 * Then check if a free (going busy) or busy worker has the work
	 * currently running. If we find it there, we'll return CANCEL_RUNNING
	 * as an indication that we attempt to signal cancellation. The
	 * completion will run normally in this case.
	 *
	 * Do both of these while holding the acct->workers_lock, to ensure that
	 * we'll find a work item regardless of state.
	 */
	io_wq_cancel_pending_work(wq, &match);
	if (match.nr_pending && !match.cancel_all)
		return IO_WQ_CANCEL_OK;

	io_wq_cancel_running_work(wq, &match);
	if (match.nr_running && !match.cancel_all)
		return IO_WQ_CANCEL_RUNNING;

	if (match.nr_running)
		return IO_WQ_CANCEL_RUNNING;
	if (match.nr_pending)
		return IO_WQ_CANCEL_OK;
	return IO_WQ_CANCEL_NOTFOUND;
}

static int io_wq_hash_wake(struct wait_queue_entry *wait, unsigned mode,
			   int sync, void *key)
{
	struct io_wq *wq = container_of(wait, struct io_wq, wait);
	int i;

	list_del_init(&wait->entry);

	rcu_read_lock();
	for (i = 0; i < IO_WQ_ACCT_NR; i++) {
		struct io_wq_acct *acct = &wq->acct[i];

		if (test_and_clear_bit(IO_ACCT_STALLED_BIT, &acct->flags))
			io_acct_activate_free_worker(acct);
	}
	rcu_read_unlock();
	return 1;
}

struct io_wq *io_wq_create(unsigned bounded, struct io_wq_data *data)
{
	int ret, i;
	struct io_wq *wq;

	if (WARN_ON_ONCE(!data->free_work || !data->do_work))
		return ERR_PTR(-EINVAL);
	if (WARN_ON_ONCE(!bounded))
		return ERR_PTR(-EINVAL);

	wq = kzalloc(sizeof(struct io_wq), GFP_KERNEL);
	if (!wq)
		return ERR_PTR(-ENOMEM);

	refcount_inc(&data->hash->refs);
	wq->hash = data->hash;
	wq->free_work = data->free_work;
	wq->do_work = data->do_work;

	ret = -ENOMEM;

	if (!alloc_cpumask_var(&wq->cpu_mask, GFP_KERNEL))
		goto err;
	cpuset_cpus_allowed(data->task, wq->cpu_mask);
	wq->acct[IO_WQ_ACCT_BOUND].max_workers = bounded;
	wq->acct[IO_WQ_ACCT_UNBOUND].max_workers =
				task_rlimit(current, RLIMIT_NPROC);
	INIT_LIST_HEAD(&wq->wait.entry);
	wq->wait.func = io_wq_hash_wake;
	for (i = 0; i < IO_WQ_ACCT_NR; i++) {
		struct io_wq_acct *acct = &wq->acct[i];

		atomic_set(&acct->nr_running, 0);

		raw_spin_lock_init(&acct->workers_lock);
		INIT_HLIST_NULLS_HEAD(&acct->free_list, 0);
		INIT_LIST_HEAD(&acct->all_list);

		INIT_WQ_LIST(&acct->work_list);
		raw_spin_lock_init(&acct->lock);
	}

	wq->task = get_task_struct(data->task);
	atomic_set(&wq->worker_refs, 1);
	init_completion(&wq->worker_done);
	ret = cpuhp_state_add_instance_nocalls(io_wq_online, &wq->cpuhp_node);
	if (ret)
		goto err;

	return wq;
err:
	io_wq_put_hash(data->hash);
	free_cpumask_var(wq->cpu_mask);
	kfree(wq);
	return ERR_PTR(ret);
}
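
/*
 * Typical setup, sketched from the io_uring side (io_init_wq_offload()):
 * the caller fills an io_wq_data with the shared hash map, the owning
 * task and the two work callbacks, then picks a bounded-worker limit:
 *
 *	struct io_wq_data data = {
 *		.hash		= hash,
 *		.task		= task,
 *		.free_work	= io_wq_free_work,
 *		.do_work	= io_wq_submit_work,
 *	};
 *
 *	wq = io_wq_create(concurrency, &data);
 *
 * The callback names above are io_uring's; any non-NULL pair plus a
 * non-zero bounded count satisfies the checks in io_wq_create().
 */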

static bool io_task_work_match(struct callback_head *cb, void *data)
{
	struct io_worker *worker;

	if (cb->func != create_worker_cb && cb->func != create_worker_cont)
		return false;
	worker = container_of(cb, struct io_worker, create_work);
	return worker->wq == data;
}

void io_wq_exit_start(struct io_wq *wq)
{
	set_bit(IO_WQ_BIT_EXIT, &wq->state);
}

static void io_wq_cancel_tw_create(struct io_wq *wq)
{
	struct callback_head *cb;

	while ((cb = task_work_cancel_match(wq->task, io_task_work_match, wq)) != NULL) {
		struct io_worker *worker;

		worker = container_of(cb, struct io_worker, create_work);
		io_worker_cancel_cb(worker);
		/*
		 * Only the worker continuation helper has worker allocated and
		 * hence needs freeing.
		 */
		if (cb->func == create_worker_cont)
			kfree(worker);
	}
}

static void io_wq_exit_workers(struct io_wq *wq)
{
	if (!wq->task)
		return;

	io_wq_cancel_tw_create(wq);

	rcu_read_lock();
	io_wq_for_each_worker(wq, io_wq_worker_wake, NULL);
	rcu_read_unlock();
	io_worker_ref_put(wq);
	wait_for_completion(&wq->worker_done);

	spin_lock_irq(&wq->hash->wait.lock);
	list_del_init(&wq->wait.entry);
	spin_unlock_irq(&wq->hash->wait.lock);

	put_task_struct(wq->task);
	wq->task = NULL;
}

static void io_wq_destroy(struct io_wq *wq)
{
	struct io_cb_cancel_data match = {
		.fn		= io_wq_work_match_all,
		.cancel_all	= true,
	};

	cpuhp_state_remove_instance_nocalls(io_wq_online, &wq->cpuhp_node);
	io_wq_cancel_pending_work(wq, &match);
	free_cpumask_var(wq->cpu_mask);
	io_wq_put_hash(wq->hash);
	kfree(wq);
}

void io_wq_put_and_exit(struct io_wq *wq)
{
	WARN_ON_ONCE(!test_bit(IO_WQ_BIT_EXIT, &wq->state));

	io_wq_exit_workers(wq);
	io_wq_destroy(wq);
}

struct online_data {
	unsigned int cpu;
	bool online;
};

static bool io_wq_worker_affinity(struct io_worker *worker, void *data)
{
	struct online_data *od = data;

	if (od->online)
		cpumask_set_cpu(od->cpu, worker->wq->cpu_mask);
	else
		cpumask_clear_cpu(od->cpu, worker->wq->cpu_mask);
	return false;
}

static int __io_wq_cpu_online(struct io_wq *wq, unsigned int cpu, bool online)
{
	struct online_data od = {
		.cpu = cpu,
		.online = online
	};

	rcu_read_lock();
	io_wq_for_each_worker(wq, io_wq_worker_affinity, &od);
	rcu_read_unlock();
	return 0;
}

static int io_wq_cpu_online(unsigned int cpu, struct hlist_node *node)
{
	struct io_wq *wq = hlist_entry_safe(node, struct io_wq, cpuhp_node);

	return __io_wq_cpu_online(wq, cpu, true);
}

static int io_wq_cpu_offline(unsigned int cpu, struct hlist_node *node)
{
	struct io_wq *wq = hlist_entry_safe(node, struct io_wq, cpuhp_node);

	return __io_wq_cpu_online(wq, cpu, false);
}

int io_wq_cpu_affinity(struct io_uring_task *tctx, cpumask_var_t mask)
{
	cpumask_var_t allowed_mask;
	int ret = 0;

	if (!tctx || !tctx->io_wq)
		return -EINVAL;

	if (!alloc_cpumask_var(&allowed_mask, GFP_KERNEL))
		return -ENOMEM;

	rcu_read_lock();
	cpuset_cpus_allowed(tctx->io_wq->task, allowed_mask);
	if (mask) {
		if (cpumask_subset(mask, allowed_mask))
			cpumask_copy(tctx->io_wq->cpu_mask, mask);
		else
			ret = -EINVAL;
	} else {
		cpumask_copy(tctx->io_wq->cpu_mask, allowed_mask);
	}
	rcu_read_unlock();

	free_cpumask_var(allowed_mask);
	return ret;
}
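
/*
 * Both the affinity and the worker limits are normally driven from
 * userspace through io_uring_register(2): IORING_REGISTER_IOWQ_AFF ends
 * up in io_wq_cpu_affinity() above, and IORING_REGISTER_IOWQ_MAX_WORKERS
 * in io_wq_max_workers() below (an array of two counts, bounded then
 * unbounded, where 0 means "leave unchanged and report the old value").
 */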

/*
 * Set max number of workers, returns old value. If new_count is 0,
 * then just return the old value.
 */
int io_wq_max_workers(struct io_wq *wq, int *new_count)
{
	struct io_wq_acct *acct;
	int prev[IO_WQ_ACCT_NR];
	int i;

	BUILD_BUG_ON((int) IO_WQ_ACCT_BOUND   != (int) IO_WQ_BOUND);
	BUILD_BUG_ON((int) IO_WQ_ACCT_UNBOUND != (int) IO_WQ_UNBOUND);
	BUILD_BUG_ON((int) IO_WQ_ACCT_NR      != 2);

	for (i = 0; i < IO_WQ_ACCT_NR; i++) {
		if (new_count[i] > task_rlimit(current, RLIMIT_NPROC))
			new_count[i] = task_rlimit(current, RLIMIT_NPROC);
	}

	for (i = 0; i < IO_WQ_ACCT_NR; i++)
		prev[i] = 0;

	rcu_read_lock();

	for (i = 0; i < IO_WQ_ACCT_NR; i++) {
		acct = &wq->acct[i];
		raw_spin_lock(&acct->workers_lock);
		prev[i] = max_t(int, acct->max_workers, prev[i]);
		if (new_count[i])
			acct->max_workers = new_count[i];
		raw_spin_unlock(&acct->workers_lock);
	}
	rcu_read_unlock();

	for (i = 0; i < IO_WQ_ACCT_NR; i++)
		new_count[i] = prev[i];

	return 0;
}

static __init int io_wq_init(void)
{
	int ret;

	ret = cpuhp_setup_state_multi(CPUHP_AP_ONLINE_DYN, "io-wq/online",
					io_wq_cpu_online, io_wq_cpu_offline);
	if (ret < 0)
		return ret;
	io_wq_online = ret;
	return 0;
}
subsys_initcall(io_wq_init);