// SPDX-License-Identifier: GPL-2.0
/*
 * Basic worker thread pool for io_uring
 *
 * Copyright (C) 2019 Jens Axboe
 *
 */
#include <linux/kernel.h>
#include <linux/init.h>
#include <linux/errno.h>
#include <linux/sched/signal.h>
#include <linux/percpu.h>
#include <linux/slab.h>
#include <linux/rculist_nulls.h>
#include <linux/cpu.h>
#include <linux/cpuset.h>
#include <linux/task_work.h>
#include <linux/audit.h>
#include <linux/mmu_context.h>
#include <uapi/linux/io_uring.h>

#include "io-wq.h"
#include "slist.h"
#include "io_uring.h"

#define WORKER_IDLE_TIMEOUT	(5 * HZ)
#define WORKER_INIT_LIMIT	3

enum {
	IO_WORKER_F_UP		= 0,	/* up and active */
	IO_WORKER_F_RUNNING	= 1,	/* account as running */
	IO_WORKER_F_FREE	= 2,	/* worker on free list */
	IO_WORKER_F_BOUND	= 3,	/* is doing bounded work */
};

enum {
	IO_WQ_BIT_EXIT		= 0,	/* wq exiting */
};

enum {
	IO_ACCT_STALLED_BIT	= 0,	/* stalled on hash */
};

/*
 * One for each thread in a wq pool
 */
struct io_worker {
	refcount_t ref;
	int create_index;
	unsigned long flags;
	struct hlist_nulls_node nulls_node;
	struct list_head all_list;
	struct task_struct *task;
	struct io_wq *wq;

	struct io_wq_work *cur_work;
	raw_spinlock_t lock;

	struct completion ref_done;

	unsigned long create_state;
	struct callback_head create_work;
	int init_retries;

	union {
		struct rcu_head rcu;
		struct work_struct work;
	};
};

#if BITS_PER_LONG == 64
#define IO_WQ_HASH_ORDER	6
#else
#define IO_WQ_HASH_ORDER	5
#endif

#define IO_WQ_NR_HASH_BUCKETS	(1u << IO_WQ_HASH_ORDER)

struct io_wq_acct {
	unsigned nr_workers;
	unsigned max_workers;
	int index;
	atomic_t nr_running;
	raw_spinlock_t lock;
	struct io_wq_work_list work_list;
	unsigned long flags;
};

enum {
	IO_WQ_ACCT_BOUND,
	IO_WQ_ACCT_UNBOUND,
	IO_WQ_ACCT_NR,
};

/*
 * Per io_wq state
 */
struct io_wq {
	unsigned long state;

	free_work_fn *free_work;
	io_wq_work_fn *do_work;

	struct io_wq_hash *hash;

	atomic_t worker_refs;
	struct completion worker_done;

	struct hlist_node cpuhp_node;

	struct task_struct *task;

	struct io_wq_acct acct[IO_WQ_ACCT_NR];

	/* lock protects access to elements below */
	raw_spinlock_t lock;

	struct hlist_nulls_head free_list;
	struct list_head all_list;

	struct wait_queue_entry wait;

	struct io_wq_work *hash_tail[IO_WQ_NR_HASH_BUCKETS];

	cpumask_var_t cpu_mask;
};

static enum cpuhp_state io_wq_online;

struct io_cb_cancel_data {
	work_cancel_fn *fn;
	void *data;
	int nr_running;
	int nr_pending;
	bool cancel_all;
};

static bool create_io_worker(struct io_wq *wq, int index);
static void io_wq_dec_running(struct io_worker *worker);
static bool io_acct_cancel_pending_work(struct io_wq *wq,
					struct io_wq_acct *acct,
					struct io_cb_cancel_data *match);
static void create_worker_cb(struct callback_head *cb);
static void io_wq_cancel_tw_create(struct io_wq *wq);

static bool io_worker_get(struct io_worker *worker)
{
	return refcount_inc_not_zero(&worker->ref);
}

static void io_worker_release(struct io_worker *worker)
{
	if (refcount_dec_and_test(&worker->ref))
		complete(&worker->ref_done);
}

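/*
 * Return the accounting state for @bound: one set of bookkeeping exists for
 * bounded work and one for unbounded work.
 */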
static inline struct io_wq_acct *io_get_acct(struct io_wq *wq, bool bound)
{
	return &wq->acct[bound ? IO_WQ_ACCT_BOUND : IO_WQ_ACCT_UNBOUND];
}

static inline struct io_wq_acct *io_work_get_acct(struct io_wq *wq,
						  struct io_wq_work *work)
{
	return io_get_acct(wq, !(atomic_read(&work->flags) & IO_WQ_WORK_UNBOUND));
}

static inline struct io_wq_acct *io_wq_get_acct(struct io_worker *worker)
{
	return io_get_acct(worker->wq, test_bit(IO_WORKER_F_BOUND, &worker->flags));
}

static void io_worker_ref_put(struct io_wq *wq)
{
	if (atomic_dec_and_test(&wq->worker_refs))
		complete(&wq->worker_done);
}

bool io_wq_worker_stopped(void)
{
	struct io_worker *worker = current->worker_private;

	if (WARN_ON_ONCE(!io_wq_current_is_worker()))
		return true;

	return test_bit(IO_WQ_BIT_EXIT, &worker->wq->state);
}

static void io_worker_cancel_cb(struct io_worker *worker)
{
	struct io_wq_acct *acct = io_wq_get_acct(worker);
	struct io_wq *wq = worker->wq;

	atomic_dec(&acct->nr_running);
	raw_spin_lock(&wq->lock);
	acct->nr_workers--;
	raw_spin_unlock(&wq->lock);
	io_worker_ref_put(wq);
	clear_bit_unlock(0, &worker->create_state);
	io_worker_release(worker);
}

static bool io_task_worker_match(struct callback_head *cb, void *data)
{
	struct io_worker *worker;

	if (cb->func != create_worker_cb)
		return false;
	worker = container_of(cb, struct io_worker, create_work);
	return worker == data;
}

static void io_worker_exit(struct io_worker *worker)
{
	struct io_wq *wq = worker->wq;

	while (1) {
		struct callback_head *cb = task_work_cancel_match(wq->task,
						io_task_worker_match, worker);

		if (!cb)
			break;
		io_worker_cancel_cb(worker);
	}

	io_worker_release(worker);
	wait_for_completion(&worker->ref_done);

	raw_spin_lock(&wq->lock);
	if (test_bit(IO_WORKER_F_FREE, &worker->flags))
		hlist_nulls_del_rcu(&worker->nulls_node);
	list_del_rcu(&worker->all_list);
	raw_spin_unlock(&wq->lock);
	io_wq_dec_running(worker);
	/*
	 * this worker is a goner, clear ->worker_private to avoid any
	 * inc/dec running calls that could happen as part of exit from
	 * touching 'worker'.
	 */
	current->worker_private = NULL;

	kfree_rcu(worker, rcu);
	io_worker_ref_put(wq);
	do_exit(0);
}

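/*
 * An accounting class has runnable work if it isn't currently stalled on
 * hashed work and its pending list is non-empty.
 */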
static inline bool __io_acct_run_queue(struct io_wq_acct *acct)
{
	return !test_bit(IO_ACCT_STALLED_BIT, &acct->flags) &&
		!wq_list_empty(&acct->work_list);
}

/*
 * If there's work to do, returns true with acct->lock acquired. If not,
 * returns false with no lock held.
 */
static inline bool io_acct_run_queue(struct io_wq_acct *acct)
	__acquires(&acct->lock)
{
	raw_spin_lock(&acct->lock);
	if (__io_acct_run_queue(acct))
		return true;

	raw_spin_unlock(&acct->lock);
	return false;
}

/*
 * Check head of free list for an available worker. If one isn't available,
 * caller must create one.
 */
static bool io_wq_activate_free_worker(struct io_wq *wq,
					struct io_wq_acct *acct)
	__must_hold(RCU)
{
	struct hlist_nulls_node *n;
	struct io_worker *worker;

	/*
	 * Iterate free_list and see if we can find an idle worker to
	 * activate. If a given worker is on the free_list but in the process
	 * of exiting, keep trying.
	 */
	hlist_nulls_for_each_entry_rcu(worker, n, &wq->free_list, nulls_node) {
		if (!io_worker_get(worker))
			continue;
		if (io_wq_get_acct(worker) != acct) {
			io_worker_release(worker);
			continue;
		}
		/*
		 * If the worker is already running, it's either already
		 * starting work or finishing work. In either case, if it does
		 * go to sleep, we'll kick off a new task for this work anyway.
		 */
		wake_up_process(worker->task);
		io_worker_release(worker);
		return true;
	}

	return false;
}

/*
 * We need a worker. If we find a free one, we're good. If not, and we're
 * below the max number of workers, create one.
 */
static bool io_wq_create_worker(struct io_wq *wq, struct io_wq_acct *acct)
{
	/*
	 * Most likely an attempt to queue unbounded work on an io_wq that
	 * wasn't setup with any unbounded workers.
	 */
	if (unlikely(!acct->max_workers))
		pr_warn_once("io-wq is not configured for unbound workers\n");

	raw_spin_lock(&wq->lock);
	if (acct->nr_workers >= acct->max_workers) {
		raw_spin_unlock(&wq->lock);
		return true;
	}
	acct->nr_workers++;
	raw_spin_unlock(&wq->lock);
	atomic_inc(&acct->nr_running);
	atomic_inc(&wq->worker_refs);
	return create_io_worker(wq, acct->index);
}

static void io_wq_inc_running(struct io_worker *worker)
{
	struct io_wq_acct *acct = io_wq_get_acct(worker);

	atomic_inc(&acct->nr_running);
}

static void create_worker_cb(struct callback_head *cb)
{
	struct io_worker *worker;
	struct io_wq *wq;

	struct io_wq_acct *acct;
	bool do_create = false;

	worker = container_of(cb, struct io_worker, create_work);
	wq = worker->wq;
	acct = &wq->acct[worker->create_index];
	raw_spin_lock(&wq->lock);

	if (acct->nr_workers < acct->max_workers) {
		acct->nr_workers++;
		do_create = true;
	}
	raw_spin_unlock(&wq->lock);
	if (do_create) {
		create_io_worker(wq, worker->create_index);
	} else {
		atomic_dec(&acct->nr_running);
		io_worker_ref_put(wq);
	}
	clear_bit_unlock(0, &worker->create_state);
	io_worker_release(worker);
}

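/*
 * Queue a task_work item on the wq's owning task that will create a new
 * worker. Returns false if it could not be queued, in which case the
 * caller's nr_running and worker references are dropped.
 */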
static bool io_queue_worker_create(struct io_worker *worker,
				   struct io_wq_acct *acct,
				   task_work_func_t func)
{
	struct io_wq *wq = worker->wq;

	/* raced with exit, just ignore create call */
	if (test_bit(IO_WQ_BIT_EXIT, &wq->state))
		goto fail;
	if (!io_worker_get(worker))
		goto fail;
	/*
	 * create_state manages ownership of create_work/index. We should
	 * only need one entry per worker, as the worker going to sleep
	 * will trigger the condition, and waking will clear it once it
	 * runs the task_work.
	 */
	if (test_bit(0, &worker->create_state) ||
	    test_and_set_bit_lock(0, &worker->create_state))
		goto fail_release;

	atomic_inc(&wq->worker_refs);
	init_task_work(&worker->create_work, func);
	worker->create_index = acct->index;
	if (!task_work_add(wq->task, &worker->create_work, TWA_SIGNAL)) {
		/*
		 * EXIT may have been set after checking it above, check after
		 * adding the task_work and remove any creation item if it is
		 * now set. wq exit does that too, but we can have added this
		 * work item after we canceled in io_wq_exit_workers().
		 */
		if (test_bit(IO_WQ_BIT_EXIT, &wq->state))
			io_wq_cancel_tw_create(wq);
		io_worker_ref_put(wq);
		return true;
	}
	io_worker_ref_put(wq);
	clear_bit_unlock(0, &worker->create_state);
fail_release:
	io_worker_release(worker);
fail:
	atomic_dec(&acct->nr_running);
	io_worker_ref_put(wq);
	return false;
}

static void io_wq_dec_running(struct io_worker *worker)
{
	struct io_wq_acct *acct = io_wq_get_acct(worker);
	struct io_wq *wq = worker->wq;

	if (!test_bit(IO_WORKER_F_UP, &worker->flags))
		return;

	if (!atomic_dec_and_test(&acct->nr_running))
		return;
	if (!io_acct_run_queue(acct))
		return;

	raw_spin_unlock(&acct->lock);
	atomic_inc(&acct->nr_running);
	atomic_inc(&wq->worker_refs);
	io_queue_worker_create(worker, acct, create_worker_cb);
}

/*
 * Worker will start processing some work. Move it to the busy list, if
 * it's currently on the freelist
 */
static void __io_worker_busy(struct io_wq *wq, struct io_worker *worker)
{
	if (test_bit(IO_WORKER_F_FREE, &worker->flags)) {
		clear_bit(IO_WORKER_F_FREE, &worker->flags);
		raw_spin_lock(&wq->lock);
		hlist_nulls_del_init_rcu(&worker->nulls_node);
		raw_spin_unlock(&wq->lock);
	}
}

/*
 * No work, worker going to sleep. Move to freelist.
 */
static void __io_worker_idle(struct io_wq *wq, struct io_worker *worker)
	__must_hold(wq->lock)
{
	if (!test_bit(IO_WORKER_F_FREE, &worker->flags)) {
		set_bit(IO_WORKER_F_FREE, &worker->flags);
		hlist_nulls_add_head_rcu(&worker->nulls_node, &wq->free_list);
	}
}

static inline unsigned int io_get_work_hash(struct io_wq_work *work)
{
	return atomic_read(&work->flags) >> IO_WQ_HASH_SHIFT;
}

static bool io_wait_on_hash(struct io_wq *wq, unsigned int hash)
{
	bool ret = false;

	spin_lock_irq(&wq->hash->wait.lock);
	if (list_empty(&wq->wait.entry)) {
		__add_wait_queue(&wq->hash->wait, &wq->wait);
		if (!test_bit(hash, &wq->hash->map)) {
			__set_current_state(TASK_RUNNING);
			list_del_init(&wq->wait.entry);
			ret = true;
		}
	}
	spin_unlock_irq(&wq->hash->wait.lock);
	return ret;
}

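/*
 * Pick the next runnable work item for this accounting class. Hashed work
 * whose hash bucket is already being executed is skipped; if only such work
 * remains, mark the acct as stalled and wait for the hash bit to clear.
 */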
static struct io_wq_work *io_get_next_work(struct io_wq_acct *acct,
					   struct io_worker *worker)
	__must_hold(acct->lock)
{
	struct io_wq_work_node *node, *prev;
	struct io_wq_work *work, *tail;
	unsigned int stall_hash = -1U;
	struct io_wq *wq = worker->wq;

	wq_list_for_each(node, prev, &acct->work_list) {
		unsigned int hash;

		work = container_of(node, struct io_wq_work, list);

		/* not hashed, can run anytime */
		if (!io_wq_is_hashed(work)) {
			wq_list_del(&acct->work_list, node, prev);
			return work;
		}

		hash = io_get_work_hash(work);
		/* all items with this hash lie in [work, tail] */
		tail = wq->hash_tail[hash];

		/* hashed, can run if not already running */
		if (!test_and_set_bit(hash, &wq->hash->map)) {
			wq->hash_tail[hash] = NULL;
			wq_list_cut(&acct->work_list, &tail->list, prev);
			return work;
		}
		if (stall_hash == -1U)
			stall_hash = hash;
		/* fast forward to a next hash, for-each will fix up @prev */
		node = &tail->list;
	}

	if (stall_hash != -1U) {
		bool unstalled;

		/*
		 * Set this before dropping the lock to avoid racing with new
		 * work being added and clearing the stalled bit.
		 */
		set_bit(IO_ACCT_STALLED_BIT, &acct->flags);
		raw_spin_unlock(&acct->lock);
		unstalled = io_wait_on_hash(wq, stall_hash);
		raw_spin_lock(&acct->lock);
		if (unstalled) {
			clear_bit(IO_ACCT_STALLED_BIT, &acct->flags);
			if (wq_has_sleeper(&wq->hash->wait))
				wake_up(&wq->hash->wait);
		}
	}

	return NULL;
}

static void io_assign_current_work(struct io_worker *worker,
				   struct io_wq_work *work)
{
	if (work) {
		io_run_task_work();
		cond_resched();
	}

	raw_spin_lock(&worker->lock);
	worker->cur_work = work;
	raw_spin_unlock(&worker->lock);
}

/*
 * Called with acct->lock held, drops it before returning
 */
static void io_worker_handle_work(struct io_wq_acct *acct,
				  struct io_worker *worker)
	__releases(&acct->lock)
{
	struct io_wq *wq = worker->wq;
	bool do_kill = test_bit(IO_WQ_BIT_EXIT, &wq->state);

	do {
		struct io_wq_work *work;

		/*
		 * If we got some work, mark us as busy. If we didn't, but
		 * the list isn't empty, it means we stalled on hashed work.
		 * Mark us stalled so we don't keep looking for work when we
		 * can't make progress, any work completion or insertion will
		 * clear the stalled flag.
		 */
		work = io_get_next_work(acct, worker);
		if (work) {
			/*
			 * Make sure cancelation can find this, even before
			 * it becomes the active work. That avoids a window
			 * where the work has been removed from our general
			 * work list, but isn't yet discoverable as the
			 * current work item for this worker.
			 */
			raw_spin_lock(&worker->lock);
			worker->cur_work = work;
			raw_spin_unlock(&worker->lock);
		}

		raw_spin_unlock(&acct->lock);

		if (!work)
			break;

		__io_worker_busy(wq, worker);

		io_assign_current_work(worker, work);
		__set_current_state(TASK_RUNNING);

		/* handle a whole dependent link */
		do {
			struct io_wq_work *next_hashed, *linked;
			unsigned int hash = io_get_work_hash(work);

			next_hashed = wq_next_work(work);

			if (do_kill &&
			    (atomic_read(&work->flags) & IO_WQ_WORK_UNBOUND))
				atomic_or(IO_WQ_WORK_CANCEL, &work->flags);
			wq->do_work(work);
			io_assign_current_work(worker, NULL);

			linked = wq->free_work(work);
			work = next_hashed;
			if (!work && linked && !io_wq_is_hashed(linked)) {
				work = linked;
				linked = NULL;
			}
			io_assign_current_work(worker, work);
			if (linked)
				io_wq_enqueue(wq, linked);

			if (hash != -1U && !next_hashed) {
				/* serialize hash clear with wake_up() */
				spin_lock_irq(&wq->hash->wait.lock);
				clear_bit(hash, &wq->hash->map);
				clear_bit(IO_ACCT_STALLED_BIT, &acct->flags);
				spin_unlock_irq(&wq->hash->wait.lock);
				if (wq_has_sleeper(&wq->hash->wait))
					wake_up(&wq->hash->wait);
			}
		} while (work);

		if (!__io_acct_run_queue(acct))
			break;
		raw_spin_lock(&acct->lock);
	} while (1);
}

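/*
 * Main loop for a worker thread: run any pending work for our accounting
 * class, then idle on the free list until more work arrives, a signal is
 * received, or the idle timeout expires.
 */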
static int io_wq_worker(void *data)
{
	struct io_worker *worker = data;
	struct io_wq_acct *acct = io_wq_get_acct(worker);
	struct io_wq *wq = worker->wq;
	bool exit_mask = false, last_timeout = false;
	char buf[TASK_COMM_LEN];

	set_mask_bits(&worker->flags, 0,
		      BIT(IO_WORKER_F_UP) | BIT(IO_WORKER_F_RUNNING));

	snprintf(buf, sizeof(buf), "iou-wrk-%d", wq->task->pid);
	set_task_comm(current, buf);

	while (!test_bit(IO_WQ_BIT_EXIT, &wq->state)) {
		long ret;

		set_current_state(TASK_INTERRUPTIBLE);

		/*
		 * If we have work to do, io_acct_run_queue() returns with
		 * the acct->lock held. If not, it will drop it.
		 */
		while (io_acct_run_queue(acct))
			io_worker_handle_work(acct, worker);

		raw_spin_lock(&wq->lock);
		/*
		 * Last sleep timed out. Exit if we're not the last worker,
		 * or if someone modified our affinity.
		 */
		if (last_timeout && (exit_mask || acct->nr_workers > 1)) {
			acct->nr_workers--;
			raw_spin_unlock(&wq->lock);
			__set_current_state(TASK_RUNNING);
			break;
		}
		last_timeout = false;
		__io_worker_idle(wq, worker);
		raw_spin_unlock(&wq->lock);
		if (io_run_task_work())
			continue;
		ret = schedule_timeout(WORKER_IDLE_TIMEOUT);
		if (signal_pending(current)) {
			struct ksignal ksig;

			if (!get_signal(&ksig))
				continue;
			break;
		}
		if (!ret) {
			last_timeout = true;
			exit_mask = !cpumask_test_cpu(raw_smp_processor_id(),
							wq->cpu_mask);
		}
	}

	if (test_bit(IO_WQ_BIT_EXIT, &wq->state) && io_acct_run_queue(acct))
		io_worker_handle_work(acct, worker);

	io_worker_exit(worker);
	return 0;
}

/*
 * Called when a worker is scheduled in. Mark us as currently running.
 */
void io_wq_worker_running(struct task_struct *tsk)
{
	struct io_worker *worker = tsk->worker_private;

	if (!worker)
		return;
	if (!test_bit(IO_WORKER_F_UP, &worker->flags))
		return;
	if (test_bit(IO_WORKER_F_RUNNING, &worker->flags))
		return;
	set_bit(IO_WORKER_F_RUNNING, &worker->flags);
	io_wq_inc_running(worker);
}

/*
 * Called when worker is going to sleep. If there are no workers currently
 * running and we have work pending, wake up a free one or create a new one.
 */
void io_wq_worker_sleeping(struct task_struct *tsk)
{
	struct io_worker *worker = tsk->worker_private;

	if (!worker)
		return;
	if (!test_bit(IO_WORKER_F_UP, &worker->flags))
		return;
	if (!test_bit(IO_WORKER_F_RUNNING, &worker->flags))
		return;

	clear_bit(IO_WORKER_F_RUNNING, &worker->flags);
	io_wq_dec_running(worker);
}

static void io_init_new_worker(struct io_wq *wq, struct io_worker *worker,
			       struct task_struct *tsk)
{
	tsk->worker_private = worker;
	worker->task = tsk;
	set_cpus_allowed_ptr(tsk, wq->cpu_mask);

	raw_spin_lock(&wq->lock);
	hlist_nulls_add_head_rcu(&worker->nulls_node, &wq->free_list);
	list_add_tail_rcu(&worker->all_list, &wq->all_list);
	set_bit(IO_WORKER_F_FREE, &worker->flags);
	raw_spin_unlock(&wq->lock);
	wake_up_new_task(tsk);
}

static bool io_wq_work_match_all(struct io_wq_work *work, void *data)
{
	return true;
}

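/*
 * Decide whether a failed attempt at creating a worker thread should be
 * retried: only transient errors qualify, and only up to WORKER_INIT_LIMIT
 * attempts per worker.
 */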
static inline bool io_should_retry_thread(struct io_worker *worker, long err)
{
	/*
	 * Prevent perpetual task_work retry, if the task (or its group) is
	 * exiting.
	 */
	if (fatal_signal_pending(current))
		return false;
	if (worker->init_retries++ >= WORKER_INIT_LIMIT)
		return false;

	switch (err) {
	case -EAGAIN:
	case -ERESTARTSYS:
	case -ERESTARTNOINTR:
	case -ERESTARTNOHAND:
		return true;
	default:
		return false;
	}
}

static void create_worker_cont(struct callback_head *cb)
{
	struct io_worker *worker;
	struct task_struct *tsk;
	struct io_wq *wq;

	worker = container_of(cb, struct io_worker, create_work);
	clear_bit_unlock(0, &worker->create_state);
	wq = worker->wq;
	tsk = create_io_thread(io_wq_worker, worker, NUMA_NO_NODE);
	if (!IS_ERR(tsk)) {
		io_init_new_worker(wq, worker, tsk);
		io_worker_release(worker);
		return;
	} else if (!io_should_retry_thread(worker, PTR_ERR(tsk))) {
		struct io_wq_acct *acct = io_wq_get_acct(worker);

		atomic_dec(&acct->nr_running);
		raw_spin_lock(&wq->lock);
		acct->nr_workers--;
		if (!acct->nr_workers) {
			struct io_cb_cancel_data match = {
				.fn		= io_wq_work_match_all,
				.cancel_all	= true,
			};

			raw_spin_unlock(&wq->lock);
			while (io_acct_cancel_pending_work(wq, acct, &match))
				;
		} else {
			raw_spin_unlock(&wq->lock);
		}
		io_worker_ref_put(wq);
		kfree(worker);
		return;
	}

	/* re-create attempts grab a new worker ref, drop the existing one */
	io_worker_release(worker);
	schedule_work(&worker->work);
}

static void io_workqueue_create(struct work_struct *work)
{
	struct io_worker *worker = container_of(work, struct io_worker, work);
	struct io_wq_acct *acct = io_wq_get_acct(worker);

	if (!io_queue_worker_create(worker, acct, create_worker_cont))
		kfree(worker);
}

static bool create_io_worker(struct io_wq *wq, int index)
{
	struct io_wq_acct *acct = &wq->acct[index];
	struct io_worker *worker;
	struct task_struct *tsk;

	__set_current_state(TASK_RUNNING);

	worker = kzalloc(sizeof(*worker), GFP_KERNEL);
	if (!worker) {
fail:
		atomic_dec(&acct->nr_running);
		raw_spin_lock(&wq->lock);
		acct->nr_workers--;
		raw_spin_unlock(&wq->lock);
		io_worker_ref_put(wq);
		return false;
	}

	refcount_set(&worker->ref, 1);
	worker->wq = wq;
	raw_spin_lock_init(&worker->lock);
	init_completion(&worker->ref_done);

	if (index == IO_WQ_ACCT_BOUND)
		set_bit(IO_WORKER_F_BOUND, &worker->flags);

	tsk = create_io_thread(io_wq_worker, worker, NUMA_NO_NODE);
	if (!IS_ERR(tsk)) {
		io_init_new_worker(wq, worker, tsk);
	} else if (!io_should_retry_thread(worker, PTR_ERR(tsk))) {
		kfree(worker);
		goto fail;
	} else {
		INIT_WORK(&worker->work, io_workqueue_create);
		schedule_work(&worker->work);
	}

	return true;
}

/*
 * Iterate the passed in list and call the specific function for each
 * worker that isn't exiting
 */
static bool io_wq_for_each_worker(struct io_wq *wq,
				  bool (*func)(struct io_worker *, void *),
				  void *data)
{
	struct io_worker *worker;
	bool ret = false;

	list_for_each_entry_rcu(worker, &wq->all_list, all_list) {
		if (io_worker_get(worker)) {
			/* no task if node is/was offline */
			if (worker->task)
				ret = func(worker, data);
			io_worker_release(worker);
			if (ret)
				break;
		}
	}

	return ret;
}

static bool io_wq_worker_wake(struct io_worker *worker, void *data)
{
	__set_notify_signal(worker->task);
	wake_up_process(worker->task);
	return false;
}

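/*
 * Cancel a work item and any items linked to it: mark each one
 * IO_WQ_WORK_CANCEL, invoke ->do_work() so the submitter can complete it,
 * and continue with whatever ->free_work() hands back.
 */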
static void io_run_cancel(struct io_wq_work *work, struct io_wq *wq)
{
	do {
		atomic_or(IO_WQ_WORK_CANCEL, &work->flags);
		wq->do_work(work);
		work = wq->free_work(work);
	} while (work);
}

static void io_wq_insert_work(struct io_wq *wq, struct io_wq_work *work)
{
	struct io_wq_acct *acct = io_work_get_acct(wq, work);
	unsigned int hash;
	struct io_wq_work *tail;

	if (!io_wq_is_hashed(work)) {
append:
		wq_list_add_tail(&work->list, &acct->work_list);
		return;
	}

	hash = io_get_work_hash(work);
	tail = wq->hash_tail[hash];
	wq->hash_tail[hash] = work;
	if (!tail)
		goto append;

	wq_list_add_after(&work->list, &tail->list, &acct->work_list);
}

static bool io_wq_work_match_item(struct io_wq_work *work, void *data)
{
	return work == data;
}

void io_wq_enqueue(struct io_wq *wq, struct io_wq_work *work)
{
	struct io_wq_acct *acct = io_work_get_acct(wq, work);
	unsigned int work_flags = atomic_read(&work->flags);
	struct io_cb_cancel_data match = {
		.fn		= io_wq_work_match_item,
		.data		= work,
		.cancel_all	= false,
	};
	bool do_create;

	/*
	 * If io-wq is exiting for this task, or if the request has explicitly
	 * been marked as one that should not get executed, cancel it here.
	 */
	if (test_bit(IO_WQ_BIT_EXIT, &wq->state) ||
	    (work_flags & IO_WQ_WORK_CANCEL)) {
		io_run_cancel(work, wq);
		return;
	}

	raw_spin_lock(&acct->lock);
	io_wq_insert_work(wq, work);
	clear_bit(IO_ACCT_STALLED_BIT, &acct->flags);
	raw_spin_unlock(&acct->lock);

	rcu_read_lock();
	do_create = !io_wq_activate_free_worker(wq, acct);
	rcu_read_unlock();

	if (do_create && ((work_flags & IO_WQ_WORK_CONCURRENT) ||
	    !atomic_read(&acct->nr_running))) {
		bool did_create;

		did_create = io_wq_create_worker(wq, acct);
		if (likely(did_create))
			return;

		raw_spin_lock(&wq->lock);
		if (acct->nr_workers) {
			raw_spin_unlock(&wq->lock);
			return;
		}
		raw_spin_unlock(&wq->lock);

		/* fatal condition, failed to create the first worker */
		io_acct_cancel_pending_work(wq, acct, &match);
	}
}

/*
 * Work items that hash to the same value will not be done in parallel.
 * Used to limit concurrent writes, generally hashed by inode.
 */
void io_wq_hash_work(struct io_wq_work *work, void *val)
{
	unsigned int bit;

	bit = hash_ptr(val, IO_WQ_HASH_ORDER);
	atomic_or(IO_WQ_WORK_HASHED | (bit << IO_WQ_HASH_SHIFT), &work->flags);
}

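/*
 * If the given work matches the cancel criteria, flag it as canceled and
 * nudge the worker that is executing it.
 */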
static bool __io_wq_worker_cancel(struct io_worker *worker,
				  struct io_cb_cancel_data *match,
				  struct io_wq_work *work)
{
	if (work && match->fn(work, match->data)) {
		atomic_or(IO_WQ_WORK_CANCEL, &work->flags);
		__set_notify_signal(worker->task);
		return true;
	}

	return false;
}

static bool io_wq_worker_cancel(struct io_worker *worker, void *data)
{
	struct io_cb_cancel_data *match = data;

	/*
	 * Hold the lock to avoid ->cur_work going out of scope, caller
	 * may dereference the passed in work.
	 */
	raw_spin_lock(&worker->lock);
	if (__io_wq_worker_cancel(worker, match, worker->cur_work))
		match->nr_running++;
	raw_spin_unlock(&worker->lock);

	return match->nr_running && !match->cancel_all;
}

static inline void io_wq_remove_pending(struct io_wq *wq,
					struct io_wq_work *work,
					struct io_wq_work_node *prev)
{
	struct io_wq_acct *acct = io_work_get_acct(wq, work);
	unsigned int hash = io_get_work_hash(work);
	struct io_wq_work *prev_work = NULL;

	if (io_wq_is_hashed(work) && work == wq->hash_tail[hash]) {
		if (prev)
			prev_work = container_of(prev, struct io_wq_work, list);
		if (prev_work && io_get_work_hash(prev_work) == hash)
			wq->hash_tail[hash] = prev_work;
		else
			wq->hash_tail[hash] = NULL;
	}
	wq_list_del(&acct->work_list, &work->list, prev);
}

static bool io_acct_cancel_pending_work(struct io_wq *wq,
					struct io_wq_acct *acct,
					struct io_cb_cancel_data *match)
{
	struct io_wq_work_node *node, *prev;
	struct io_wq_work *work;

	raw_spin_lock(&acct->lock);
	wq_list_for_each(node, prev, &acct->work_list) {
		work = container_of(node, struct io_wq_work, list);
		if (!match->fn(work, match->data))
			continue;
		io_wq_remove_pending(wq, work, prev);
		raw_spin_unlock(&acct->lock);
		io_run_cancel(work, wq);
		match->nr_pending++;
		/* not safe to continue after unlock */
		return true;
	}
	raw_spin_unlock(&acct->lock);

	return false;
}

static void io_wq_cancel_pending_work(struct io_wq *wq,
				      struct io_cb_cancel_data *match)
{
	int i;
retry:
	for (i = 0; i < IO_WQ_ACCT_NR; i++) {
		struct io_wq_acct *acct = io_get_acct(wq, i == 0);

		if (io_acct_cancel_pending_work(wq, acct, match)) {
			if (match->cancel_all)
				goto retry;
			break;
		}
	}
}

static void io_wq_cancel_running_work(struct io_wq *wq,
				      struct io_cb_cancel_data *match)
{
	rcu_read_lock();
	io_wq_for_each_worker(wq, io_wq_worker_cancel, match);
	rcu_read_unlock();
}

enum io_wq_cancel io_wq_cancel_cb(struct io_wq *wq, work_cancel_fn *cancel,
				  void *data, bool cancel_all)
{
	struct io_cb_cancel_data match = {
		.fn		= cancel,
		.data		= data,
		.cancel_all	= cancel_all,
	};

	/*
	 * First check pending list, if we're lucky we can just remove it
	 * from there. CANCEL_OK means that the work is returned as-new,
	 * no completion will be posted for it.
	 *
	 * Then check if a free (going busy) or busy worker has the work
	 * currently running. If we find it there, we'll return CANCEL_RUNNING
	 * as an indication that we attempt to signal cancellation. The
	 * completion will run normally in this case.
	 *
	 * Do both of these while holding the wq->lock, to ensure that
	 * we'll find a work item regardless of state.
	 */
	io_wq_cancel_pending_work(wq, &match);
	if (match.nr_pending && !match.cancel_all)
		return IO_WQ_CANCEL_OK;

	raw_spin_lock(&wq->lock);
	io_wq_cancel_running_work(wq, &match);
	raw_spin_unlock(&wq->lock);
	if (match.nr_running && !match.cancel_all)
		return IO_WQ_CANCEL_RUNNING;

	if (match.nr_running)
		return IO_WQ_CANCEL_RUNNING;
	if (match.nr_pending)
		return IO_WQ_CANCEL_OK;
	return IO_WQ_CANCEL_NOTFOUND;
}

static int io_wq_hash_wake(struct wait_queue_entry *wait, unsigned mode,
			   int sync, void *key)
{
	struct io_wq *wq = container_of(wait, struct io_wq, wait);
	int i;

	list_del_init(&wait->entry);

	rcu_read_lock();
	for (i = 0; i < IO_WQ_ACCT_NR; i++) {
		struct io_wq_acct *acct = &wq->acct[i];

		if (test_and_clear_bit(IO_ACCT_STALLED_BIT, &acct->flags))
			io_wq_activate_free_worker(wq, acct);
	}
	rcu_read_unlock();
	return 1;
}

struct io_wq *io_wq_create(unsigned bounded, struct io_wq_data *data)
{
	int ret, i;
	struct io_wq *wq;

	if (WARN_ON_ONCE(!data->free_work || !data->do_work))
		return ERR_PTR(-EINVAL);
	if (WARN_ON_ONCE(!bounded))
		return ERR_PTR(-EINVAL);

	wq = kzalloc(sizeof(struct io_wq), GFP_KERNEL);
	if (!wq)
		return ERR_PTR(-ENOMEM);

	refcount_inc(&data->hash->refs);
	wq->hash = data->hash;
	wq->free_work = data->free_work;
	wq->do_work = data->do_work;

	ret = -ENOMEM;

	if (!alloc_cpumask_var(&wq->cpu_mask, GFP_KERNEL))
		goto err;
	cpuset_cpus_allowed(data->task, wq->cpu_mask);
	wq->acct[IO_WQ_ACCT_BOUND].max_workers = bounded;
	wq->acct[IO_WQ_ACCT_UNBOUND].max_workers =
				task_rlimit(current, RLIMIT_NPROC);
	INIT_LIST_HEAD(&wq->wait.entry);
	wq->wait.func = io_wq_hash_wake;
	for (i = 0; i < IO_WQ_ACCT_NR; i++) {
		struct io_wq_acct *acct = &wq->acct[i];

		acct->index = i;
		atomic_set(&acct->nr_running, 0);
		INIT_WQ_LIST(&acct->work_list);
		raw_spin_lock_init(&acct->lock);
	}

	raw_spin_lock_init(&wq->lock);
	INIT_HLIST_NULLS_HEAD(&wq->free_list, 0);
	INIT_LIST_HEAD(&wq->all_list);

	wq->task = get_task_struct(data->task);
	atomic_set(&wq->worker_refs, 1);
	init_completion(&wq->worker_done);
	ret = cpuhp_state_add_instance_nocalls(io_wq_online, &wq->cpuhp_node);
	if (ret)
		goto err;

	return wq;
err:
	io_wq_put_hash(data->hash);
	free_cpumask_var(wq->cpu_mask);
	kfree(wq);
	return ERR_PTR(ret);
}

static bool io_task_work_match(struct callback_head *cb, void *data)
{
	struct io_worker *worker;

	if (cb->func != create_worker_cb && cb->func != create_worker_cont)
		return false;
	worker = container_of(cb, struct io_worker, create_work);
	return worker->wq == data;
}

void io_wq_exit_start(struct io_wq *wq)
{
	set_bit(IO_WQ_BIT_EXIT, &wq->state);
}

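/*
 * Cancel any still-pending worker-creation task_work for this wq and drop
 * the references those items held.
 */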
static void io_wq_cancel_tw_create(struct io_wq *wq)
{
	struct callback_head *cb;

	while ((cb = task_work_cancel_match(wq->task, io_task_work_match, wq)) != NULL) {
		struct io_worker *worker;

		worker = container_of(cb, struct io_worker, create_work);
		io_worker_cancel_cb(worker);
		/*
		 * Only the worker continuation helper has worker allocated and
		 * hence needs freeing.
		 */
		if (cb->func == create_worker_cont)
			kfree(worker);
	}
}

static void io_wq_exit_workers(struct io_wq *wq)
{
	if (!wq->task)
		return;

	io_wq_cancel_tw_create(wq);

	rcu_read_lock();
	io_wq_for_each_worker(wq, io_wq_worker_wake, NULL);
	rcu_read_unlock();
	io_worker_ref_put(wq);
	wait_for_completion(&wq->worker_done);

	spin_lock_irq(&wq->hash->wait.lock);
	list_del_init(&wq->wait.entry);
	spin_unlock_irq(&wq->hash->wait.lock);

	put_task_struct(wq->task);
	wq->task = NULL;
}

static void io_wq_destroy(struct io_wq *wq)
{
	struct io_cb_cancel_data match = {
		.fn		= io_wq_work_match_all,
		.cancel_all	= true,
	};

	cpuhp_state_remove_instance_nocalls(io_wq_online, &wq->cpuhp_node);
	io_wq_cancel_pending_work(wq, &match);
	free_cpumask_var(wq->cpu_mask);
	io_wq_put_hash(wq->hash);
	kfree(wq);
}

void io_wq_put_and_exit(struct io_wq *wq)
{
	WARN_ON_ONCE(!test_bit(IO_WQ_BIT_EXIT, &wq->state));

	io_wq_exit_workers(wq);
	io_wq_destroy(wq);
}

struct online_data {
	unsigned int cpu;
	bool online;
};

static bool io_wq_worker_affinity(struct io_worker *worker, void *data)
{
	struct online_data *od = data;

	if (od->online)
		cpumask_set_cpu(od->cpu, worker->wq->cpu_mask);
	else
		cpumask_clear_cpu(od->cpu, worker->wq->cpu_mask);
	return false;
}

static int __io_wq_cpu_online(struct io_wq *wq, unsigned int cpu, bool online)
{
	struct online_data od = {
		.cpu = cpu,
		.online = online
	};

	rcu_read_lock();
	io_wq_for_each_worker(wq, io_wq_worker_affinity, &od);
	rcu_read_unlock();
	return 0;
}

static int io_wq_cpu_online(unsigned int cpu, struct hlist_node *node)
{
	struct io_wq *wq = hlist_entry_safe(node, struct io_wq, cpuhp_node);

	return __io_wq_cpu_online(wq, cpu, true);
}

static int io_wq_cpu_offline(unsigned int cpu, struct hlist_node *node)
{
	struct io_wq *wq = hlist_entry_safe(node, struct io_wq, cpuhp_node);

	return __io_wq_cpu_online(wq, cpu, false);
}

int io_wq_cpu_affinity(struct io_uring_task *tctx, cpumask_var_t mask)
{
	cpumask_var_t allowed_mask;
	int ret = 0;

	if (!tctx || !tctx->io_wq)
		return -EINVAL;

	if (!alloc_cpumask_var(&allowed_mask, GFP_KERNEL))
		return -ENOMEM;

	rcu_read_lock();
	cpuset_cpus_allowed(tctx->io_wq->task, allowed_mask);
	if (mask) {
		if (cpumask_subset(mask, allowed_mask))
			cpumask_copy(tctx->io_wq->cpu_mask, mask);
		else
			ret = -EINVAL;
	} else {
		cpumask_copy(tctx->io_wq->cpu_mask, allowed_mask);
	}
	rcu_read_unlock();

	free_cpumask_var(allowed_mask);
	return ret;
}

/*
 * Set max number of unbounded workers, returns old value. If new_count is 0,
 * then just return the old value.
 */
int io_wq_max_workers(struct io_wq *wq, int *new_count)
{
	struct io_wq_acct *acct;
	int prev[IO_WQ_ACCT_NR];
	int i;

	BUILD_BUG_ON((int) IO_WQ_ACCT_BOUND != (int) IO_WQ_BOUND);
	BUILD_BUG_ON((int) IO_WQ_ACCT_UNBOUND != (int) IO_WQ_UNBOUND);
	BUILD_BUG_ON((int) IO_WQ_ACCT_NR != 2);

	for (i = 0; i < IO_WQ_ACCT_NR; i++) {
		if (new_count[i] > task_rlimit(current, RLIMIT_NPROC))
			new_count[i] = task_rlimit(current, RLIMIT_NPROC);
	}

	for (i = 0; i < IO_WQ_ACCT_NR; i++)
		prev[i] = 0;

	rcu_read_lock();

	raw_spin_lock(&wq->lock);
	for (i = 0; i < IO_WQ_ACCT_NR; i++) {
		acct = &wq->acct[i];
		prev[i] = max_t(int, acct->max_workers, prev[i]);
		if (new_count[i])
			acct->max_workers = new_count[i];
	}
	raw_spin_unlock(&wq->lock);
	rcu_read_unlock();

	for (i = 0; i < IO_WQ_ACCT_NR; i++)
		new_count[i] = prev[i];

	return 0;
}

static __init int io_wq_init(void)
{
	int ret;

	ret = cpuhp_setup_state_multi(CPUHP_AP_ONLINE_DYN, "io-wq/online",
					io_wq_cpu_online, io_wq_cpu_offline);
	if (ret < 0)
		return ret;
	io_wq_online = ret;
	return 0;
}
subsys_initcall(io_wq_init);