// SPDX-License-Identifier: GPL-2.0
/*
 * Basic worker thread pool for io_uring
 *
 * Copyright (C) 2019 Jens Axboe
 *
 */
#include <linux/kernel.h>
#include <linux/init.h>
#include <linux/errno.h>
#include <linux/sched/signal.h>
#include <linux/percpu.h>
#include <linux/slab.h>
#include <linux/rculist_nulls.h>
#include <linux/cpu.h>
#include <linux/cpuset.h>
#include <linux/task_work.h>
#include <linux/audit.h>
#include <linux/mmu_context.h>
#include <uapi/linux/io_uring.h>

#include "io-wq.h"
#include "slist.h"
#include "io_uring.h"

#define WORKER_IDLE_TIMEOUT	(5 * HZ)
#define WORKER_INIT_LIMIT	3

enum {
	IO_WORKER_F_UP = 0,	/* up and active */
	IO_WORKER_F_RUNNING = 1,	/* account as running */
	IO_WORKER_F_FREE = 2,	/* worker on free list */
	IO_WORKER_F_BOUND = 3,	/* is doing bounded work */
};

enum {
	IO_WQ_BIT_EXIT = 0,	/* wq exiting */
};

enum {
	IO_ACCT_STALLED_BIT = 0,	/* stalled on hash */
};

/*
 * One for each thread in a wq pool
 */
struct io_worker {
	refcount_t ref;
	int create_index;
	unsigned long flags;
	struct hlist_nulls_node nulls_node;
	struct list_head all_list;
	struct task_struct *task;
	struct io_wq *wq;

	struct io_wq_work *cur_work;
	raw_spinlock_t lock;

	struct completion ref_done;

	unsigned long create_state;
	struct callback_head create_work;
	int init_retries;

	union {
		struct rcu_head rcu;
		struct delayed_work work;
	};
};

#if BITS_PER_LONG == 64
#define IO_WQ_HASH_ORDER	6
#else
#define IO_WQ_HASH_ORDER	5
#endif

#define IO_WQ_NR_HASH_BUCKETS	(1u << IO_WQ_HASH_ORDER)

struct io_wq_acct {
	unsigned nr_workers;
	unsigned max_workers;
	int index;
	atomic_t nr_running;
	raw_spinlock_t lock;
	struct io_wq_work_list work_list;
	unsigned long flags;
};

enum {
	IO_WQ_ACCT_BOUND,
	IO_WQ_ACCT_UNBOUND,
	IO_WQ_ACCT_NR,
};

/*
 * Per io_wq state
 */
struct io_wq {
	unsigned long state;

	free_work_fn *free_work;
	io_wq_work_fn *do_work;

	struct io_wq_hash *hash;

	atomic_t worker_refs;
	struct completion worker_done;

	struct hlist_node cpuhp_node;

	struct task_struct *task;

	struct io_wq_acct acct[IO_WQ_ACCT_NR];

	/* lock protects access to elements below */
	raw_spinlock_t lock;

	struct hlist_nulls_head free_list;
	struct list_head all_list;

	struct wait_queue_entry wait;

	struct io_wq_work *hash_tail[IO_WQ_NR_HASH_BUCKETS];

	cpumask_var_t cpu_mask;
};

static enum cpuhp_state io_wq_online;

struct io_cb_cancel_data {
	work_cancel_fn *fn;
	void *data;
	int nr_running;
	int nr_pending;
	bool cancel_all;
};

static bool create_io_worker(struct io_wq *wq, int index);
static void io_wq_dec_running(struct io_worker *worker);
static bool io_acct_cancel_pending_work(struct io_wq *wq,
					struct io_wq_acct *acct,
					struct io_cb_cancel_data *match);
static void create_worker_cb(struct callback_head *cb);
static void io_wq_cancel_tw_create(struct io_wq *wq);

static bool io_worker_get(struct io_worker *worker)
{
	return refcount_inc_not_zero(&worker->ref);
}

static void io_worker_release(struct io_worker *worker)
{
	if (refcount_dec_and_test(&worker->ref))
		complete(&worker->ref_done);
}

static inline struct io_wq_acct *io_get_acct(struct io_wq *wq, bool bound)
{
	return &wq->acct[bound ? IO_WQ_ACCT_BOUND : IO_WQ_ACCT_UNBOUND];
}
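
/*
 * Work is accounted as either bounded (e.g. regular file I/O) or unbounded
 * (e.g. network I/O that may block indefinitely). A sketch of how a caller
 * might route work to the unbounded class before queueing it (illustrative
 * only; the real flag setup lives in the io_uring core, not in this file):
 *
 *	atomic_or(IO_WQ_WORK_UNBOUND, &work->flags);
 *	io_wq_enqueue(wq, work);
 *
 * io_work_get_acct() below keys off that flag to pick the accounting class.
 */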

static inline struct io_wq_acct *io_work_get_acct(struct io_wq *wq,
						  struct io_wq_work *work)
{
	return io_get_acct(wq, !(atomic_read(&work->flags) & IO_WQ_WORK_UNBOUND));
}

static inline struct io_wq_acct *io_wq_get_acct(struct io_worker *worker)
{
	return io_get_acct(worker->wq, test_bit(IO_WORKER_F_BOUND, &worker->flags));
}

static void io_worker_ref_put(struct io_wq *wq)
{
	if (atomic_dec_and_test(&wq->worker_refs))
		complete(&wq->worker_done);
}

bool io_wq_worker_stopped(void)
{
	struct io_worker *worker = current->worker_private;

	if (WARN_ON_ONCE(!io_wq_current_is_worker()))
		return true;

	return test_bit(IO_WQ_BIT_EXIT, &worker->wq->state);
}

static void io_worker_cancel_cb(struct io_worker *worker)
{
	struct io_wq_acct *acct = io_wq_get_acct(worker);
	struct io_wq *wq = worker->wq;

	atomic_dec(&acct->nr_running);
	raw_spin_lock(&wq->lock);
	acct->nr_workers--;
	raw_spin_unlock(&wq->lock);
	io_worker_ref_put(wq);
	clear_bit_unlock(0, &worker->create_state);
	io_worker_release(worker);
}

static bool io_task_worker_match(struct callback_head *cb, void *data)
{
	struct io_worker *worker;

	if (cb->func != create_worker_cb)
		return false;
	worker = container_of(cb, struct io_worker, create_work);
	return worker == data;
}

static void io_worker_exit(struct io_worker *worker)
{
	struct io_wq *wq = worker->wq;

	while (1) {
		struct callback_head *cb = task_work_cancel_match(wq->task,
						io_task_worker_match, worker);

		if (!cb)
			break;
		io_worker_cancel_cb(worker);
	}

	io_worker_release(worker);
	wait_for_completion(&worker->ref_done);

	raw_spin_lock(&wq->lock);
	if (test_bit(IO_WORKER_F_FREE, &worker->flags))
		hlist_nulls_del_rcu(&worker->nulls_node);
	list_del_rcu(&worker->all_list);
	raw_spin_unlock(&wq->lock);
	io_wq_dec_running(worker);
	/*
	 * this worker is a goner, clear ->worker_private to avoid any
	 * inc/dec running calls that could happen as part of exit from
	 * touching 'worker'.
	 */
	current->worker_private = NULL;

	kfree_rcu(worker, rcu);
	io_worker_ref_put(wq);
	do_exit(0);
}

static inline bool __io_acct_run_queue(struct io_wq_acct *acct)
{
	return !test_bit(IO_ACCT_STALLED_BIT, &acct->flags) &&
		!wq_list_empty(&acct->work_list);
}

/*
 * If there's work to do, returns true with acct->lock acquired. If not,
 * returns false with no lock held.
 */
static inline bool io_acct_run_queue(struct io_wq_acct *acct)
	__acquires(&acct->lock)
{
	raw_spin_lock(&acct->lock);
	if (__io_acct_run_queue(acct))
		return true;

	raw_spin_unlock(&acct->lock);
	return false;
}

/*
 * Check head of free list for an available worker. If one isn't available,
 * caller must create one.
 */
static bool io_wq_activate_free_worker(struct io_wq *wq,
				       struct io_wq_acct *acct)
	__must_hold(RCU)
{
	struct hlist_nulls_node *n;
	struct io_worker *worker;

	/*
	 * Iterate free_list and see if we can find an idle worker to
	 * activate. If a given worker is on the free_list but in the process
	 * of exiting, keep trying.
	 */
	hlist_nulls_for_each_entry_rcu(worker, n, &wq->free_list, nulls_node) {
		if (!io_worker_get(worker))
			continue;
		if (io_wq_get_acct(worker) != acct) {
			io_worker_release(worker);
			continue;
		}
		/*
		 * If the worker is already running, it's either already
		 * starting work or finishing work. In either case, if it does
		 * go to sleep, we'll kick off a new task for this work anyway.
		 */
		wake_up_process(worker->task);
		io_worker_release(worker);
		return true;
	}

	return false;
}
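
/*
 * A simplified sketch of how the queueing path uses the two helpers around
 * this point: try to wake an idle worker under RCU first, and only fall back
 * to creating a new one if nothing could be woken (see io_wq_enqueue()):
 *
 *	rcu_read_lock();
 *	do_create = !io_wq_activate_free_worker(wq, acct);
 *	rcu_read_unlock();
 *	if (do_create)
 *		io_wq_create_worker(wq, acct);
 */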

/*
 * We need a worker. If we find a free one, we're good. If not, and we're
 * below the max number of workers, create one.
 */
static bool io_wq_create_worker(struct io_wq *wq, struct io_wq_acct *acct)
{
	/*
	 * Most likely an attempt to queue unbounded work on an io_wq that
	 * wasn't setup with any unbounded workers.
	 */
	if (unlikely(!acct->max_workers))
		pr_warn_once("io-wq is not configured for unbound workers");

	raw_spin_lock(&wq->lock);
	if (acct->nr_workers >= acct->max_workers) {
		raw_spin_unlock(&wq->lock);
		return true;
	}
	acct->nr_workers++;
	raw_spin_unlock(&wq->lock);
	atomic_inc(&acct->nr_running);
	atomic_inc(&wq->worker_refs);
	return create_io_worker(wq, acct->index);
}

static void io_wq_inc_running(struct io_worker *worker)
{
	struct io_wq_acct *acct = io_wq_get_acct(worker);

	atomic_inc(&acct->nr_running);
}

static void create_worker_cb(struct callback_head *cb)
{
	struct io_worker *worker;
	struct io_wq *wq;

	struct io_wq_acct *acct;
	bool do_create = false;

	worker = container_of(cb, struct io_worker, create_work);
	wq = worker->wq;
	acct = &wq->acct[worker->create_index];
	raw_spin_lock(&wq->lock);

	if (acct->nr_workers < acct->max_workers) {
		acct->nr_workers++;
		do_create = true;
	}
	raw_spin_unlock(&wq->lock);
	if (do_create) {
		create_io_worker(wq, worker->create_index);
	} else {
		atomic_dec(&acct->nr_running);
		io_worker_ref_put(wq);
	}
	clear_bit_unlock(0, &worker->create_state);
	io_worker_release(worker);
}

static bool io_queue_worker_create(struct io_worker *worker,
				   struct io_wq_acct *acct,
				   task_work_func_t func)
{
	struct io_wq *wq = worker->wq;

	/* raced with exit, just ignore create call */
	if (test_bit(IO_WQ_BIT_EXIT, &wq->state))
		goto fail;
	if (!io_worker_get(worker))
		goto fail;
	/*
	 * create_state manages ownership of create_work/index. We should
	 * only need one entry per worker, as the worker going to sleep
	 * will trigger the condition, and waking will clear it once it
	 * runs the task_work.
	 */
	if (test_bit(0, &worker->create_state) ||
	    test_and_set_bit_lock(0, &worker->create_state))
		goto fail_release;

	atomic_inc(&wq->worker_refs);
	init_task_work(&worker->create_work, func);
	worker->create_index = acct->index;
	if (!task_work_add(wq->task, &worker->create_work, TWA_SIGNAL)) {
		/*
		 * EXIT may have been set after checking it above, check after
		 * adding the task_work and remove any creation item if it is
		 * now set. wq exit does that too, but we can have added this
		 * work item after we canceled in io_wq_exit_workers().
		 */
		if (test_bit(IO_WQ_BIT_EXIT, &wq->state))
			io_wq_cancel_tw_create(wq);
		io_worker_ref_put(wq);
		return true;
	}
	io_worker_ref_put(wq);
	clear_bit_unlock(0, &worker->create_state);
fail_release:
	io_worker_release(worker);
fail:
	atomic_dec(&acct->nr_running);
	io_worker_ref_put(wq);
	return false;
}
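
/*
 * Note on the creation path above: a new worker is not forked here directly.
 * Instead a task_work item is queued on the original io_uring task, so that
 * create_io_thread() runs in that task's context and the worker inherits the
 * right mm and credentials. The create_state bit makes sure only one such
 * item is in flight per worker at a time.
 */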

static void io_wq_dec_running(struct io_worker *worker)
{
	struct io_wq_acct *acct = io_wq_get_acct(worker);
	struct io_wq *wq = worker->wq;

	if (!test_bit(IO_WORKER_F_UP, &worker->flags))
		return;

	if (!atomic_dec_and_test(&acct->nr_running))
		return;
	if (!io_acct_run_queue(acct))
		return;

	raw_spin_unlock(&acct->lock);
	atomic_inc(&acct->nr_running);
	atomic_inc(&wq->worker_refs);
	io_queue_worker_create(worker, acct, create_worker_cb);
}

/*
 * Worker will start processing some work. Move it to the busy list, if
 * it's currently on the freelist
 */
static void __io_worker_busy(struct io_wq *wq, struct io_worker *worker)
{
	if (test_bit(IO_WORKER_F_FREE, &worker->flags)) {
		clear_bit(IO_WORKER_F_FREE, &worker->flags);
		raw_spin_lock(&wq->lock);
		hlist_nulls_del_init_rcu(&worker->nulls_node);
		raw_spin_unlock(&wq->lock);
	}
}

/*
 * No work, worker going to sleep. Move to freelist.
 */
static void __io_worker_idle(struct io_wq *wq, struct io_worker *worker)
	__must_hold(wq->lock)
{
	if (!test_bit(IO_WORKER_F_FREE, &worker->flags)) {
		set_bit(IO_WORKER_F_FREE, &worker->flags);
		hlist_nulls_add_head_rcu(&worker->nulls_node, &wq->free_list);
	}
}

static inline unsigned int io_get_work_hash(struct io_wq_work *work)
{
	return atomic_read(&work->flags) >> IO_WQ_HASH_SHIFT;
}

static bool io_wait_on_hash(struct io_wq *wq, unsigned int hash)
{
	bool ret = false;

	spin_lock_irq(&wq->hash->wait.lock);
	if (list_empty(&wq->wait.entry)) {
		__add_wait_queue(&wq->hash->wait, &wq->wait);
		if (!test_bit(hash, &wq->hash->map)) {
			__set_current_state(TASK_RUNNING);
			list_del_init(&wq->wait.entry);
			ret = true;
		}
	}
	spin_unlock_irq(&wq->hash->wait.lock);
	return ret;
}

static struct io_wq_work *io_get_next_work(struct io_wq_acct *acct,
					   struct io_worker *worker)
	__must_hold(acct->lock)
{
	struct io_wq_work_node *node, *prev;
	struct io_wq_work *work, *tail;
	unsigned int stall_hash = -1U;
	struct io_wq *wq = worker->wq;

	wq_list_for_each(node, prev, &acct->work_list) {
		unsigned int hash;

		work = container_of(node, struct io_wq_work, list);

		/* not hashed, can run anytime */
		if (!io_wq_is_hashed(work)) {
			wq_list_del(&acct->work_list, node, prev);
			return work;
		}

		hash = io_get_work_hash(work);
		/* all items with this hash lie in [work, tail] */
		tail = wq->hash_tail[hash];

		/* hashed, can run if not already running */
		if (!test_and_set_bit(hash, &wq->hash->map)) {
			wq->hash_tail[hash] = NULL;
			wq_list_cut(&acct->work_list, &tail->list, prev);
			return work;
		}
		if (stall_hash == -1U)
			stall_hash = hash;
		/* fast forward to the next hash, for-each will fix up @prev */
		node = &tail->list;
	}

	if (stall_hash != -1U) {
		bool unstalled;

		/*
		 * Set this before dropping the lock to avoid racing with new
		 * work being added and clearing the stalled bit.
		 */
		set_bit(IO_ACCT_STALLED_BIT, &acct->flags);
		raw_spin_unlock(&acct->lock);
		unstalled = io_wait_on_hash(wq, stall_hash);
		raw_spin_lock(&acct->lock);
		if (unstalled) {
			clear_bit(IO_ACCT_STALLED_BIT, &acct->flags);
			if (wq_has_sleeper(&wq->hash->wait))
				wake_up(&wq->hash->wait);
		}
	}

	return NULL;
}
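
/*
 * Hashed work recap for the lookup above: all pending items that hash to the
 * same bucket are kept adjacent in the list, with wq->hash_tail[] pointing at
 * the last one. A set bit in wq->hash->map means that bucket is already being
 * executed, so the whole run is skipped. If nothing else is runnable, the
 * acct is marked stalled and the worker waits on hash->wait until the bit is
 * cleared in io_worker_handle_work().
 */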

static void io_assign_current_work(struct io_worker *worker,
				   struct io_wq_work *work)
{
	if (work) {
		io_run_task_work();
		cond_resched();
	}

	raw_spin_lock(&worker->lock);
	worker->cur_work = work;
	raw_spin_unlock(&worker->lock);
}

/*
 * Called with acct->lock held, drops it before returning
 */
static void io_worker_handle_work(struct io_wq_acct *acct,
				  struct io_worker *worker)
	__releases(&acct->lock)
{
	struct io_wq *wq = worker->wq;
	bool do_kill = test_bit(IO_WQ_BIT_EXIT, &wq->state);

	do {
		struct io_wq_work *work;

		/*
		 * If we got some work, mark us as busy. If we didn't, but
		 * the list isn't empty, it means we stalled on hashed work.
		 * Mark us stalled so we don't keep looking for work when we
		 * can't make progress, any work completion or insertion will
		 * clear the stalled flag.
		 */
		work = io_get_next_work(acct, worker);
		if (work) {
			/*
			 * Make sure cancelation can find this, even before
			 * it becomes the active work. That avoids a window
			 * where the work has been removed from our general
			 * work list, but isn't yet discoverable as the
			 * current work item for this worker.
			 */
			raw_spin_lock(&worker->lock);
			worker->cur_work = work;
			raw_spin_unlock(&worker->lock);
		}

		raw_spin_unlock(&acct->lock);

		if (!work)
			break;

		__io_worker_busy(wq, worker);

		io_assign_current_work(worker, work);
		__set_current_state(TASK_RUNNING);

		/* handle a whole dependent link */
		do {
			struct io_wq_work *next_hashed, *linked;
			unsigned int hash = io_get_work_hash(work);

			next_hashed = wq_next_work(work);

			if (do_kill &&
			    (atomic_read(&work->flags) & IO_WQ_WORK_UNBOUND))
				atomic_or(IO_WQ_WORK_CANCEL, &work->flags);
			wq->do_work(work);
			io_assign_current_work(worker, NULL);

			linked = wq->free_work(work);
			work = next_hashed;
			if (!work && linked && !io_wq_is_hashed(linked)) {
				work = linked;
				linked = NULL;
			}
			io_assign_current_work(worker, work);
			if (linked)
				io_wq_enqueue(wq, linked);

			if (hash != -1U && !next_hashed) {
				/* serialize hash clear with wake_up() */
				spin_lock_irq(&wq->hash->wait.lock);
				clear_bit(hash, &wq->hash->map);
				clear_bit(IO_ACCT_STALLED_BIT, &acct->flags);
				spin_unlock_irq(&wq->hash->wait.lock);
				if (wq_has_sleeper(&wq->hash->wait))
					wake_up(&wq->hash->wait);
			}
		} while (work);

		if (!__io_acct_run_queue(acct))
			break;
		raw_spin_lock(&acct->lock);
	} while (1);
}

static int io_wq_worker(void *data)
{
	struct io_worker *worker = data;
	struct io_wq_acct *acct = io_wq_get_acct(worker);
	struct io_wq *wq = worker->wq;
	bool exit_mask = false, last_timeout = false;
	char buf[TASK_COMM_LEN] = {};

	set_mask_bits(&worker->flags, 0,
		      BIT(IO_WORKER_F_UP) | BIT(IO_WORKER_F_RUNNING));

	snprintf(buf, sizeof(buf), "iou-wrk-%d", wq->task->pid);
	set_task_comm(current, buf);

	while (!test_bit(IO_WQ_BIT_EXIT, &wq->state)) {
		long ret;
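
		/*
		 * Flag ourselves as about to sleep before checking for more
		 * work, so a wakeup from io_wq_enqueue() after the final
		 * queue check cannot be lost.
		 */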
		set_current_state(TASK_INTERRUPTIBLE);

		/*
		 * If we have work to do, io_acct_run_queue() returns with
		 * the acct->lock held. If not, it will drop it.
		 */
		while (io_acct_run_queue(acct))
			io_worker_handle_work(acct, worker);

		raw_spin_lock(&wq->lock);
		/*
		 * Last sleep timed out. Exit if we're not the last worker,
		 * or if someone modified our affinity.
		 */
		if (last_timeout && (exit_mask || acct->nr_workers > 1)) {
			acct->nr_workers--;
			raw_spin_unlock(&wq->lock);
			__set_current_state(TASK_RUNNING);
			break;
		}
		last_timeout = false;
		__io_worker_idle(wq, worker);
		raw_spin_unlock(&wq->lock);
		if (io_run_task_work())
			continue;
		ret = schedule_timeout(WORKER_IDLE_TIMEOUT);
		if (signal_pending(current)) {
			struct ksignal ksig;

			if (!get_signal(&ksig))
				continue;
			break;
		}
		if (!ret) {
			last_timeout = true;
			exit_mask = !cpumask_test_cpu(raw_smp_processor_id(),
						      wq->cpu_mask);
		}
	}

	if (test_bit(IO_WQ_BIT_EXIT, &wq->state) && io_acct_run_queue(acct))
		io_worker_handle_work(acct, worker);

	io_worker_exit(worker);
	return 0;
}

/*
 * Called when a worker is scheduled in. Mark us as currently running.
 */
void io_wq_worker_running(struct task_struct *tsk)
{
	struct io_worker *worker = tsk->worker_private;

	if (!worker)
		return;
	if (!test_bit(IO_WORKER_F_UP, &worker->flags))
		return;
	if (test_bit(IO_WORKER_F_RUNNING, &worker->flags))
		return;
	set_bit(IO_WORKER_F_RUNNING, &worker->flags);
	io_wq_inc_running(worker);
}

/*
 * Called when worker is going to sleep. If there are no workers currently
 * running and we have work pending, wake up a free one or create a new one.
 */
void io_wq_worker_sleeping(struct task_struct *tsk)
{
	struct io_worker *worker = tsk->worker_private;

	if (!worker)
		return;
	if (!test_bit(IO_WORKER_F_UP, &worker->flags))
		return;
	if (!test_bit(IO_WORKER_F_RUNNING, &worker->flags))
		return;

	clear_bit(IO_WORKER_F_RUNNING, &worker->flags);
	io_wq_dec_running(worker);
}

static void io_init_new_worker(struct io_wq *wq, struct io_worker *worker,
			       struct task_struct *tsk)
{
	tsk->worker_private = worker;
	worker->task = tsk;
	set_cpus_allowed_ptr(tsk, wq->cpu_mask);

	raw_spin_lock(&wq->lock);
	hlist_nulls_add_head_rcu(&worker->nulls_node, &wq->free_list);
	list_add_tail_rcu(&worker->all_list, &wq->all_list);
	set_bit(IO_WORKER_F_FREE, &worker->flags);
	raw_spin_unlock(&wq->lock);
	wake_up_new_task(tsk);
}

static bool io_wq_work_match_all(struct io_wq_work *work, void *data)
{
	return true;
}

static inline bool io_should_retry_thread(struct io_worker *worker, long err)
{
	/*
	 * Prevent perpetual task_work retry, if the task (or its group) is
	 * exiting.
	 */
	if (fatal_signal_pending(current))
		return false;
	if (worker->init_retries++ >= WORKER_INIT_LIMIT)
		return false;

	switch (err) {
	case -EAGAIN:
	case -ERESTARTSYS:
	case -ERESTARTNOINTR:
	case -ERESTARTNOHAND:
		return true;
	default:
		return false;
	}
}

static void queue_create_worker_retry(struct io_worker *worker)
{
	/*
	 * We only bother retrying because there's a chance that the
	 * failure to create a worker is due to some temporary condition
	 * in the forking task (e.g. outstanding signal); give the task
	 * some time to clear that condition.
	 */
	schedule_delayed_work(&worker->work,
			      msecs_to_jiffies(worker->init_retries * 5));
}
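
/*
 * Retries are spaced out with a small linear backoff: the first retry is
 * queued ~5 msec out, the second ~10 msec, and so on, up to
 * WORKER_INIT_LIMIT attempts (see io_should_retry_thread()).
 */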

static void create_worker_cont(struct callback_head *cb)
{
	struct io_worker *worker;
	struct task_struct *tsk;
	struct io_wq *wq;

	worker = container_of(cb, struct io_worker, create_work);
	clear_bit_unlock(0, &worker->create_state);
	wq = worker->wq;
	tsk = create_io_thread(io_wq_worker, worker, NUMA_NO_NODE);
	if (!IS_ERR(tsk)) {
		io_init_new_worker(wq, worker, tsk);
		io_worker_release(worker);
		return;
	} else if (!io_should_retry_thread(worker, PTR_ERR(tsk))) {
		struct io_wq_acct *acct = io_wq_get_acct(worker);

		atomic_dec(&acct->nr_running);
		raw_spin_lock(&wq->lock);
		acct->nr_workers--;
		if (!acct->nr_workers) {
			struct io_cb_cancel_data match = {
				.fn = io_wq_work_match_all,
				.cancel_all = true,
			};

			raw_spin_unlock(&wq->lock);
			while (io_acct_cancel_pending_work(wq, acct, &match))
				;
		} else {
			raw_spin_unlock(&wq->lock);
		}
		io_worker_ref_put(wq);
		kfree(worker);
		return;
	}

	/* re-create attempts grab a new worker ref, drop the existing one */
	io_worker_release(worker);
	queue_create_worker_retry(worker);
}

static void io_workqueue_create(struct work_struct *work)
{
	struct io_worker *worker = container_of(work, struct io_worker,
						work.work);
	struct io_wq_acct *acct = io_wq_get_acct(worker);

	if (!io_queue_worker_create(worker, acct, create_worker_cont))
		kfree(worker);
}

static bool create_io_worker(struct io_wq *wq, int index)
{
	struct io_wq_acct *acct = &wq->acct[index];
	struct io_worker *worker;
	struct task_struct *tsk;

	__set_current_state(TASK_RUNNING);

	worker = kzalloc(sizeof(*worker), GFP_KERNEL);
	if (!worker) {
fail:
		atomic_dec(&acct->nr_running);
		raw_spin_lock(&wq->lock);
		acct->nr_workers--;
		raw_spin_unlock(&wq->lock);
		io_worker_ref_put(wq);
		return false;
	}

	refcount_set(&worker->ref, 1);
	worker->wq = wq;
	raw_spin_lock_init(&worker->lock);
	init_completion(&worker->ref_done);

	if (index == IO_WQ_ACCT_BOUND)
		set_bit(IO_WORKER_F_BOUND, &worker->flags);

	tsk = create_io_thread(io_wq_worker, worker, NUMA_NO_NODE);
	if (!IS_ERR(tsk)) {
		io_init_new_worker(wq, worker, tsk);
	} else if (!io_should_retry_thread(worker, PTR_ERR(tsk))) {
		kfree(worker);
		goto fail;
	} else {
		INIT_DELAYED_WORK(&worker->work, io_workqueue_create);
		queue_create_worker_retry(worker);
	}

	return true;
}

/*
 * Iterate the passed in list and call the specific function for each
 * worker that isn't exiting
 */
static bool io_wq_for_each_worker(struct io_wq *wq,
				  bool (*func)(struct io_worker *, void *),
				  void *data)
{
	struct io_worker *worker;
	bool ret = false;

	list_for_each_entry_rcu(worker, &wq->all_list, all_list) {
		if (io_worker_get(worker)) {
			/* no task if node is/was offline */
			if (worker->task)
				ret = func(worker, data);
			io_worker_release(worker);
			if (ret)
				break;
		}
	}

	return ret;
}

static bool io_wq_worker_wake(struct io_worker *worker, void *data)
{
	__set_notify_signal(worker->task);
	wake_up_process(worker->task);
	return false;
}

static void io_run_cancel(struct io_wq_work *work, struct io_wq *wq)
{
	do {
		atomic_or(IO_WQ_WORK_CANCEL, &work->flags);
		wq->do_work(work);
		work = wq->free_work(work);
	} while (work);
}
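
/*
 * Insertion keeps all work for one hash bucket contiguous: a new hashed item
 * is linked right after the current tail for that bucket rather than at the
 * end of the list, which is what lets io_get_next_work() grab or skip an
 * entire bucket in one step.
 */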

static void io_wq_insert_work(struct io_wq *wq, struct io_wq_work *work)
{
	struct io_wq_acct *acct = io_work_get_acct(wq, work);
	unsigned int hash;
	struct io_wq_work *tail;

	if (!io_wq_is_hashed(work)) {
append:
		wq_list_add_tail(&work->list, &acct->work_list);
		return;
	}

	hash = io_get_work_hash(work);
	tail = wq->hash_tail[hash];
	wq->hash_tail[hash] = work;
	if (!tail)
		goto append;

	wq_list_add_after(&work->list, &tail->list, &acct->work_list);
}

static bool io_wq_work_match_item(struct io_wq_work *work, void *data)
{
	return work == data;
}

void io_wq_enqueue(struct io_wq *wq, struct io_wq_work *work)
{
	struct io_wq_acct *acct = io_work_get_acct(wq, work);
	unsigned int work_flags = atomic_read(&work->flags);
	struct io_cb_cancel_data match = {
		.fn = io_wq_work_match_item,
		.data = work,
		.cancel_all = false,
	};
	bool do_create;

	/*
	 * If io-wq is exiting for this task, or if the request has explicitly
	 * been marked as one that should not get executed, cancel it here.
	 */
	if (test_bit(IO_WQ_BIT_EXIT, &wq->state) ||
	    (work_flags & IO_WQ_WORK_CANCEL)) {
		io_run_cancel(work, wq);
		return;
	}

	raw_spin_lock(&acct->lock);
	io_wq_insert_work(wq, work);
	clear_bit(IO_ACCT_STALLED_BIT, &acct->flags);
	raw_spin_unlock(&acct->lock);

	rcu_read_lock();
	do_create = !io_wq_activate_free_worker(wq, acct);
	rcu_read_unlock();

	if (do_create && ((work_flags & IO_WQ_WORK_CONCURRENT) ||
	    !atomic_read(&acct->nr_running))) {
		bool did_create;

		did_create = io_wq_create_worker(wq, acct);
		if (likely(did_create))
			return;

		raw_spin_lock(&wq->lock);
		if (acct->nr_workers) {
			raw_spin_unlock(&wq->lock);
			return;
		}
		raw_spin_unlock(&wq->lock);

		/* fatal condition, failed to create the first worker */
		io_acct_cancel_pending_work(wq, acct, &match);
	}
}

/*
 * Work items that hash to the same value will not be done in parallel.
 * Used to limit concurrent writes, generally hashed by inode.
 */
void io_wq_hash_work(struct io_wq_work *work, void *val)
{
	unsigned int bit;

	bit = hash_ptr(val, IO_WQ_HASH_ORDER);
	atomic_or(IO_WQ_WORK_HASHED | (bit << IO_WQ_HASH_SHIFT), &work->flags);
}

static bool __io_wq_worker_cancel(struct io_worker *worker,
				  struct io_cb_cancel_data *match,
				  struct io_wq_work *work)
{
	if (work && match->fn(work, match->data)) {
		atomic_or(IO_WQ_WORK_CANCEL, &work->flags);
		__set_notify_signal(worker->task);
		return true;
	}

	return false;
}

static bool io_wq_worker_cancel(struct io_worker *worker, void *data)
{
	struct io_cb_cancel_data *match = data;

	/*
	 * Hold the lock to avoid ->cur_work going out of scope, caller
	 * may dereference the passed in work.
	 */
	raw_spin_lock(&worker->lock);
	if (__io_wq_worker_cancel(worker, match, worker->cur_work))
		match->nr_running++;
	raw_spin_unlock(&worker->lock);

	return match->nr_running && !match->cancel_all;
}
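
/*
 * Pending (not yet running) work is cancelled by unlinking it below; since
 * hashed items are chained through wq->hash_tail[], removing the tail of a
 * bucket has to re-point hash_tail at the previous item with the same hash,
 * or clear it entirely.
 */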

static inline void io_wq_remove_pending(struct io_wq *wq,
					struct io_wq_work *work,
					struct io_wq_work_node *prev)
{
	struct io_wq_acct *acct = io_work_get_acct(wq, work);
	unsigned int hash = io_get_work_hash(work);
	struct io_wq_work *prev_work = NULL;

	if (io_wq_is_hashed(work) && work == wq->hash_tail[hash]) {
		if (prev)
			prev_work = container_of(prev, struct io_wq_work, list);
		if (prev_work && io_get_work_hash(prev_work) == hash)
			wq->hash_tail[hash] = prev_work;
		else
			wq->hash_tail[hash] = NULL;
	}
	wq_list_del(&acct->work_list, &work->list, prev);
}

static bool io_acct_cancel_pending_work(struct io_wq *wq,
					struct io_wq_acct *acct,
					struct io_cb_cancel_data *match)
{
	struct io_wq_work_node *node, *prev;
	struct io_wq_work *work;

	raw_spin_lock(&acct->lock);
	wq_list_for_each(node, prev, &acct->work_list) {
		work = container_of(node, struct io_wq_work, list);
		if (!match->fn(work, match->data))
			continue;
		io_wq_remove_pending(wq, work, prev);
		raw_spin_unlock(&acct->lock);
		io_run_cancel(work, wq);
		match->nr_pending++;
		/* not safe to continue after unlock */
		return true;
	}
	raw_spin_unlock(&acct->lock);

	return false;
}

static void io_wq_cancel_pending_work(struct io_wq *wq,
				      struct io_cb_cancel_data *match)
{
	int i;
retry:
	for (i = 0; i < IO_WQ_ACCT_NR; i++) {
		struct io_wq_acct *acct = io_get_acct(wq, i == 0);

		if (io_acct_cancel_pending_work(wq, acct, match)) {
			if (match->cancel_all)
				goto retry;
			break;
		}
	}
}

static void io_wq_cancel_running_work(struct io_wq *wq,
				      struct io_cb_cancel_data *match)
{
	rcu_read_lock();
	io_wq_for_each_worker(wq, io_wq_worker_cancel, match);
	rcu_read_unlock();
}

enum io_wq_cancel io_wq_cancel_cb(struct io_wq *wq, work_cancel_fn *cancel,
				  void *data, bool cancel_all)
{
	struct io_cb_cancel_data match = {
		.fn = cancel,
		.data = data,
		.cancel_all = cancel_all,
	};

	/*
	 * First check pending list, if we're lucky we can just remove it
	 * from there. CANCEL_OK means that the work is returned as-new,
	 * no completion will be posted for it.
	 *
	 * Then check if a free (going busy) or busy worker has the work
	 * currently running. If we find it there, we'll return CANCEL_RUNNING
	 * as an indication that we attempt to signal cancellation. The
	 * completion will run normally in this case.
	 *
	 * Do both of these while holding the wq->lock, to ensure that
	 * we'll find a work item regardless of state.
	 */
	io_wq_cancel_pending_work(wq, &match);
	if (match.nr_pending && !match.cancel_all)
		return IO_WQ_CANCEL_OK;

	raw_spin_lock(&wq->lock);
	io_wq_cancel_running_work(wq, &match);
	raw_spin_unlock(&wq->lock);
	if (match.nr_running && !match.cancel_all)
		return IO_WQ_CANCEL_RUNNING;

	if (match.nr_running)
		return IO_WQ_CANCEL_RUNNING;
	if (match.nr_pending)
		return IO_WQ_CANCEL_OK;
	return IO_WQ_CANCEL_NOTFOUND;
}
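
/*
 * Example caller pattern for io_wq_cancel_cb() (a sketch, not code from this
 * file): cancel everything that a caller-defined predicate matches, e.g. all
 * work belonging to one context:
 *
 *	static bool match_ctx(struct io_wq_work *work, void *data)
 *	{
 *		return <work belongs to the context in data>;
 *	}
 *
 *	ret = io_wq_cancel_cb(wq, match_ctx, ctx, true);
 *
 * With cancel_all == true both the pending and running lists are swept in
 * full rather than stopping at the first match.
 */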

static int io_wq_hash_wake(struct wait_queue_entry *wait, unsigned mode,
			   int sync, void *key)
{
	struct io_wq *wq = container_of(wait, struct io_wq, wait);
	int i;

	list_del_init(&wait->entry);

	rcu_read_lock();
	for (i = 0; i < IO_WQ_ACCT_NR; i++) {
		struct io_wq_acct *acct = &wq->acct[i];

		if (test_and_clear_bit(IO_ACCT_STALLED_BIT, &acct->flags))
			io_wq_activate_free_worker(wq, acct);
	}
	rcu_read_unlock();
	return 1;
}

struct io_wq *io_wq_create(unsigned bounded, struct io_wq_data *data)
{
	int ret, i;
	struct io_wq *wq;

	if (WARN_ON_ONCE(!data->free_work || !data->do_work))
		return ERR_PTR(-EINVAL);
	if (WARN_ON_ONCE(!bounded))
		return ERR_PTR(-EINVAL);

	wq = kzalloc(sizeof(struct io_wq), GFP_KERNEL);
	if (!wq)
		return ERR_PTR(-ENOMEM);

	refcount_inc(&data->hash->refs);
	wq->hash = data->hash;
	wq->free_work = data->free_work;
	wq->do_work = data->do_work;

	ret = -ENOMEM;

	if (!alloc_cpumask_var(&wq->cpu_mask, GFP_KERNEL))
		goto err;
	cpuset_cpus_allowed(data->task, wq->cpu_mask);
	wq->acct[IO_WQ_ACCT_BOUND].max_workers = bounded;
	wq->acct[IO_WQ_ACCT_UNBOUND].max_workers =
				task_rlimit(current, RLIMIT_NPROC);
	INIT_LIST_HEAD(&wq->wait.entry);
	wq->wait.func = io_wq_hash_wake;
	for (i = 0; i < IO_WQ_ACCT_NR; i++) {
		struct io_wq_acct *acct = &wq->acct[i];

		acct->index = i;
		atomic_set(&acct->nr_running, 0);
		INIT_WQ_LIST(&acct->work_list);
		raw_spin_lock_init(&acct->lock);
	}

	raw_spin_lock_init(&wq->lock);
	INIT_HLIST_NULLS_HEAD(&wq->free_list, 0);
	INIT_LIST_HEAD(&wq->all_list);

	wq->task = get_task_struct(data->task);
	atomic_set(&wq->worker_refs, 1);
	init_completion(&wq->worker_done);
	ret = cpuhp_state_add_instance_nocalls(io_wq_online, &wq->cpuhp_node);
	if (ret)
		goto err;

	return wq;
err:
	io_wq_put_hash(data->hash);
	free_cpumask_var(wq->cpu_mask);
	kfree(wq);
	return ERR_PTR(ret);
}

static bool io_task_work_match(struct callback_head *cb, void *data)
{
	struct io_worker *worker;

	if (cb->func != create_worker_cb && cb->func != create_worker_cont)
		return false;
	worker = container_of(cb, struct io_worker, create_work);
	return worker->wq == data;
}

void io_wq_exit_start(struct io_wq *wq)
{
	set_bit(IO_WQ_BIT_EXIT, &wq->state);
}

static void io_wq_cancel_tw_create(struct io_wq *wq)
{
	struct callback_head *cb;

	while ((cb = task_work_cancel_match(wq->task, io_task_work_match, wq)) != NULL) {
		struct io_worker *worker;

		worker = container_of(cb, struct io_worker, create_work);
		io_worker_cancel_cb(worker);
		/*
		 * Only the worker continuation helper has worker allocated and
		 * hence needs freeing.
		 */
		if (cb->func == create_worker_cont)
			kfree(worker);
	}
}
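
/*
 * Teardown order below: first cancel any pending worker-creation task_work,
 * then poke every worker so it notices IO_WQ_BIT_EXIT, drop the initial
 * worker reference, and wait for worker_done before the hash wait entry and
 * the task reference are released.
 */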

static void io_wq_exit_workers(struct io_wq *wq)
{
	if (!wq->task)
		return;

	io_wq_cancel_tw_create(wq);

	rcu_read_lock();
	io_wq_for_each_worker(wq, io_wq_worker_wake, NULL);
	rcu_read_unlock();
	io_worker_ref_put(wq);
	wait_for_completion(&wq->worker_done);

	spin_lock_irq(&wq->hash->wait.lock);
	list_del_init(&wq->wait.entry);
	spin_unlock_irq(&wq->hash->wait.lock);

	put_task_struct(wq->task);
	wq->task = NULL;
}

static void io_wq_destroy(struct io_wq *wq)
{
	struct io_cb_cancel_data match = {
		.fn = io_wq_work_match_all,
		.cancel_all = true,
	};

	cpuhp_state_remove_instance_nocalls(io_wq_online, &wq->cpuhp_node);
	io_wq_cancel_pending_work(wq, &match);
	free_cpumask_var(wq->cpu_mask);
	io_wq_put_hash(wq->hash);
	kfree(wq);
}

void io_wq_put_and_exit(struct io_wq *wq)
{
	WARN_ON_ONCE(!test_bit(IO_WQ_BIT_EXIT, &wq->state));

	io_wq_exit_workers(wq);
	io_wq_destroy(wq);
}

struct online_data {
	unsigned int cpu;
	bool online;
};

static bool io_wq_worker_affinity(struct io_worker *worker, void *data)
{
	struct online_data *od = data;

	if (od->online)
		cpumask_set_cpu(od->cpu, worker->wq->cpu_mask);
	else
		cpumask_clear_cpu(od->cpu, worker->wq->cpu_mask);
	return false;
}

static int __io_wq_cpu_online(struct io_wq *wq, unsigned int cpu, bool online)
{
	struct online_data od = {
		.cpu = cpu,
		.online = online
	};

	rcu_read_lock();
	io_wq_for_each_worker(wq, io_wq_worker_affinity, &od);
	rcu_read_unlock();
	return 0;
}

static int io_wq_cpu_online(unsigned int cpu, struct hlist_node *node)
{
	struct io_wq *wq = hlist_entry_safe(node, struct io_wq, cpuhp_node);

	return __io_wq_cpu_online(wq, cpu, true);
}

static int io_wq_cpu_offline(unsigned int cpu, struct hlist_node *node)
{
	struct io_wq *wq = hlist_entry_safe(node, struct io_wq, cpuhp_node);

	return __io_wq_cpu_online(wq, cpu, false);
}

int io_wq_cpu_affinity(struct io_uring_task *tctx, cpumask_var_t mask)
{
	cpumask_var_t allowed_mask;
	int ret = 0;

	if (!tctx || !tctx->io_wq)
		return -EINVAL;

	if (!alloc_cpumask_var(&allowed_mask, GFP_KERNEL))
		return -ENOMEM;

	rcu_read_lock();
	cpuset_cpus_allowed(tctx->io_wq->task, allowed_mask);
	if (mask) {
		if (cpumask_subset(mask, allowed_mask))
			cpumask_copy(tctx->io_wq->cpu_mask, mask);
		else
			ret = -EINVAL;
	} else {
		cpumask_copy(tctx->io_wq->cpu_mask, allowed_mask);
	}
	rcu_read_unlock();

	free_cpumask_var(allowed_mask);
	return ret;
}

/*
 * Set max number of unbounded workers, returns old value. If new_count is 0,
 * then just return the old value.
 */
int io_wq_max_workers(struct io_wq *wq, int *new_count)
{
	struct io_wq_acct *acct;
	int prev[IO_WQ_ACCT_NR];
	int i;

	BUILD_BUG_ON((int) IO_WQ_ACCT_BOUND != (int) IO_WQ_BOUND);
	BUILD_BUG_ON((int) IO_WQ_ACCT_UNBOUND != (int) IO_WQ_UNBOUND);
	BUILD_BUG_ON((int) IO_WQ_ACCT_NR != 2);

	for (i = 0; i < IO_WQ_ACCT_NR; i++) {
		if (new_count[i] > task_rlimit(current, RLIMIT_NPROC))
			new_count[i] = task_rlimit(current, RLIMIT_NPROC);
	}

	for (i = 0; i < IO_WQ_ACCT_NR; i++)
		prev[i] = 0;

	rcu_read_lock();

	raw_spin_lock(&wq->lock);
	for (i = 0; i < IO_WQ_ACCT_NR; i++) {
		acct = &wq->acct[i];
		prev[i] = max_t(int, acct->max_workers, prev[i]);
		if (new_count[i])
			acct->max_workers = new_count[i];
	}
	raw_spin_unlock(&wq->lock);
	rcu_read_unlock();

	for (i = 0; i < IO_WQ_ACCT_NR; i++)
		new_count[i] = prev[i];

	return 0;
}

static __init int io_wq_init(void)
{
	int ret;

	ret = cpuhp_setup_state_multi(CPUHP_AP_ONLINE_DYN, "io-wq/online",
					io_wq_cpu_online, io_wq_cpu_offline);
	if (ret < 0)
		return ret;
	io_wq_online = ret;
	return 0;
}
subsys_initcall(io_wq_init);