// SPDX-License-Identifier: GPL-2.0
/*
 * Basic worker thread pool for io_uring
 *
 * Copyright (C) 2019 Jens Axboe
 *
 */
#include <linux/kernel.h>
#include <linux/init.h>
#include <linux/errno.h>
#include <linux/sched/signal.h>
#include <linux/percpu.h>
#include <linux/slab.h>
#include <linux/rculist_nulls.h>
#include <linux/cpu.h>
#include <linux/task_work.h>
#include <linux/audit.h>
#include <linux/mmu_context.h>
#include <uapi/linux/io_uring.h>

#include "io-wq.h"
#include "slist.h"
#include "io_uring.h"

#define WORKER_IDLE_TIMEOUT	(5 * HZ)

enum {
	IO_WORKER_F_UP		= 0,	/* up and active */
	IO_WORKER_F_RUNNING	= 1,	/* account as running */
	IO_WORKER_F_FREE	= 2,	/* worker on free list */
	IO_WORKER_F_BOUND	= 3,	/* is doing bounded work */
};

enum {
	IO_WQ_BIT_EXIT		= 0,	/* wq exiting */
};

enum {
	IO_ACCT_STALLED_BIT	= 0,	/* stalled on hash */
};

/*
 * One for each thread in a wq pool
 */
struct io_worker {
	refcount_t ref;
	int create_index;
	unsigned long flags;
	struct hlist_nulls_node nulls_node;
	struct list_head all_list;
	struct task_struct *task;
	struct io_wq *wq;

	struct io_wq_work *cur_work;
	raw_spinlock_t lock;

	struct completion ref_done;

	unsigned long create_state;
	struct callback_head create_work;

	union {
		struct rcu_head rcu;
		struct work_struct work;
	};
};

#if BITS_PER_LONG == 64
#define IO_WQ_HASH_ORDER	6
#else
#define IO_WQ_HASH_ORDER	5
#endif

#define IO_WQ_NR_HASH_BUCKETS	(1u << IO_WQ_HASH_ORDER)

struct io_wq_acct {
	unsigned nr_workers;
	unsigned max_workers;
	int index;
	atomic_t nr_running;
	raw_spinlock_t lock;
	struct io_wq_work_list work_list;
	unsigned long flags;
};

enum {
	IO_WQ_ACCT_BOUND,
	IO_WQ_ACCT_UNBOUND,
	IO_WQ_ACCT_NR,
};

/*
 * Per io_wq state
 */
struct io_wq {
	unsigned long state;

	free_work_fn *free_work;
	io_wq_work_fn *do_work;

	struct io_wq_hash *hash;

	atomic_t worker_refs;
	struct completion worker_done;

	struct hlist_node cpuhp_node;

	struct task_struct *task;

	struct io_wq_acct acct[IO_WQ_ACCT_NR];

	/* lock protects access to elements below */
	raw_spinlock_t lock;

	struct hlist_nulls_head free_list;
	struct list_head all_list;

	struct wait_queue_entry wait;

	struct io_wq_work *hash_tail[IO_WQ_NR_HASH_BUCKETS];

	cpumask_var_t cpu_mask;
};

static enum cpuhp_state io_wq_online;

struct io_cb_cancel_data {
	work_cancel_fn *fn;
	void *data;
	int nr_running;
	int nr_pending;
	bool cancel_all;
};

static bool create_io_worker(struct io_wq *wq, int index);
static void io_wq_dec_running(struct io_worker *worker);
static bool io_acct_cancel_pending_work(struct io_wq *wq,
					struct io_wq_acct *acct,
					struct io_cb_cancel_data *match);
static void create_worker_cb(struct callback_head *cb);
static void io_wq_cancel_tw_create(struct io_wq *wq);

static bool io_worker_get(struct io_worker *worker)
{
	return refcount_inc_not_zero(&worker->ref);
}

static void io_worker_release(struct io_worker *worker)
{
	if (refcount_dec_and_test(&worker->ref))
		complete(&worker->ref_done);
}

static inline struct io_wq_acct *io_get_acct(struct io_wq *wq, bool bound)
{
	return &wq->acct[bound ? IO_WQ_ACCT_BOUND : IO_WQ_ACCT_UNBOUND];
}
}

static inline struct io_wq_acct *io_work_get_acct(struct io_wq *wq,
						  struct io_wq_work *work)
{
	return io_get_acct(wq, !(work->flags & IO_WQ_WORK_UNBOUND));
}

static inline struct io_wq_acct *io_wq_get_acct(struct io_worker *worker)
{
	return io_get_acct(worker->wq, test_bit(IO_WORKER_F_BOUND, &worker->flags));
}

static void io_worker_ref_put(struct io_wq *wq)
{
	if (atomic_dec_and_test(&wq->worker_refs))
		complete(&wq->worker_done);
}

bool io_wq_worker_stopped(void)
{
	struct io_worker *worker = current->worker_private;

	if (WARN_ON_ONCE(!io_wq_current_is_worker()))
		return true;

	return test_bit(IO_WQ_BIT_EXIT, &worker->wq->state);
}

static void io_worker_cancel_cb(struct io_worker *worker)
{
	struct io_wq_acct *acct = io_wq_get_acct(worker);
	struct io_wq *wq = worker->wq;

	atomic_dec(&acct->nr_running);
	raw_spin_lock(&wq->lock);
	acct->nr_workers--;
	raw_spin_unlock(&wq->lock);
	io_worker_ref_put(wq);
	clear_bit_unlock(0, &worker->create_state);
	io_worker_release(worker);
}

static bool io_task_worker_match(struct callback_head *cb, void *data)
{
	struct io_worker *worker;

	if (cb->func != create_worker_cb)
		return false;
	worker = container_of(cb, struct io_worker, create_work);
	return worker == data;
}

static void io_worker_exit(struct io_worker *worker)
{
	struct io_wq *wq = worker->wq;

	while (1) {
		struct callback_head *cb = task_work_cancel_match(wq->task,
						io_task_worker_match, worker);

		if (!cb)
			break;
		io_worker_cancel_cb(worker);
	}

	io_worker_release(worker);
	wait_for_completion(&worker->ref_done);

	raw_spin_lock(&wq->lock);
	if (test_bit(IO_WORKER_F_FREE, &worker->flags))
		hlist_nulls_del_rcu(&worker->nulls_node);
	list_del_rcu(&worker->all_list);
	raw_spin_unlock(&wq->lock);
	io_wq_dec_running(worker);
	/*
	 * this worker is a goner, clear ->worker_private to avoid any
	 * inc/dec running calls that could happen as part of exit from
	 * touching 'worker'.
	 */
	current->worker_private = NULL;

	kfree_rcu(worker, rcu);
	io_worker_ref_put(wq);
	do_exit(0);
}

static inline bool __io_acct_run_queue(struct io_wq_acct *acct)
{
	return !test_bit(IO_ACCT_STALLED_BIT, &acct->flags) &&
		!wq_list_empty(&acct->work_list);
}

/*
 * If there's work to do, returns true with acct->lock acquired. If not,
 * returns false with no lock held.
 */
static inline bool io_acct_run_queue(struct io_wq_acct *acct)
	__acquires(&acct->lock)
{
	raw_spin_lock(&acct->lock);
	if (__io_acct_run_queue(acct))
		return true;

	raw_spin_unlock(&acct->lock);
	return false;
}

/*
 * Check head of free list for an available worker. If one isn't available,
 * caller must create one.
 */
static bool io_wq_activate_free_worker(struct io_wq *wq,
					struct io_wq_acct *acct)
	__must_hold(RCU)
{
	struct hlist_nulls_node *n;
	struct io_worker *worker;

	/*
	 * Iterate free_list and see if we can find an idle worker to
	 * activate. If a given worker is on the free_list but in the process
	 * of exiting, keep trying.
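	 * (io_worker_get() failing below means the worker has already
	 * dropped its last reference and is exiting, so just skip it.)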
	 */
	hlist_nulls_for_each_entry_rcu(worker, n, &wq->free_list, nulls_node) {
		if (!io_worker_get(worker))
			continue;
		if (io_wq_get_acct(worker) != acct) {
			io_worker_release(worker);
			continue;
		}
		/*
		 * If the worker is already running, it's either already
		 * starting work or finishing work. In either case, if it does
		 * go to sleep, we'll kick off a new task for this work anyway.
		 */
		wake_up_process(worker->task);
		io_worker_release(worker);
		return true;
	}

	return false;
}

/*
 * We need a worker. If we find a free one, we're good. If not, and we're
 * below the max number of workers, create one.
 */
static bool io_wq_create_worker(struct io_wq *wq, struct io_wq_acct *acct)
{
	/*
	 * Most likely an attempt to queue unbounded work on an io_wq that
	 * wasn't set up with any unbounded workers.
	 */
	if (unlikely(!acct->max_workers))
		pr_warn_once("io-wq is not configured for unbound workers");

	raw_spin_lock(&wq->lock);
	if (acct->nr_workers >= acct->max_workers) {
		raw_spin_unlock(&wq->lock);
		return true;
	}
	acct->nr_workers++;
	raw_spin_unlock(&wq->lock);
	atomic_inc(&acct->nr_running);
	atomic_inc(&wq->worker_refs);
	return create_io_worker(wq, acct->index);
}

static void io_wq_inc_running(struct io_worker *worker)
{
	struct io_wq_acct *acct = io_wq_get_acct(worker);

	atomic_inc(&acct->nr_running);
}

static void create_worker_cb(struct callback_head *cb)
{
	struct io_worker *worker;
	struct io_wq *wq;

	struct io_wq_acct *acct;
	bool do_create = false;

	worker = container_of(cb, struct io_worker, create_work);
	wq = worker->wq;
	acct = &wq->acct[worker->create_index];
	raw_spin_lock(&wq->lock);

	if (acct->nr_workers < acct->max_workers) {
		acct->nr_workers++;
		do_create = true;
	}
	raw_spin_unlock(&wq->lock);
	if (do_create) {
		create_io_worker(wq, worker->create_index);
	} else {
		atomic_dec(&acct->nr_running);
		io_worker_ref_put(wq);
	}
	clear_bit_unlock(0, &worker->create_state);
	io_worker_release(worker);
}

static bool io_queue_worker_create(struct io_worker *worker,
				   struct io_wq_acct *acct,
				   task_work_func_t func)
{
	struct io_wq *wq = worker->wq;

	/* raced with exit, just ignore create call */
	if (test_bit(IO_WQ_BIT_EXIT, &wq->state))
		goto fail;
	if (!io_worker_get(worker))
		goto fail;
	/*
	 * create_state manages ownership of create_work/index. We should
	 * only need one entry per worker, as the worker going to sleep
	 * will trigger the condition, and waking will clear it once it
	 * runs the task_work.
	 */
	if (test_bit(0, &worker->create_state) ||
	    test_and_set_bit_lock(0, &worker->create_state))
		goto fail_release;

	atomic_inc(&wq->worker_refs);
	init_task_work(&worker->create_work, func);
	worker->create_index = acct->index;
	if (!task_work_add(wq->task, &worker->create_work, TWA_SIGNAL)) {
		/*
		 * EXIT may have been set after checking it above, check after
		 * adding the task_work and remove any creation item if it is
		 * now set. wq exit does that too, but we can have added this
		 * work item after we canceled in io_wq_exit_workers().
		 */
		if (test_bit(IO_WQ_BIT_EXIT, &wq->state))
			io_wq_cancel_tw_create(wq);
		io_worker_ref_put(wq);
		return true;
	}
	io_worker_ref_put(wq);
	clear_bit_unlock(0, &worker->create_state);
fail_release:
	io_worker_release(worker);
fail:
	atomic_dec(&acct->nr_running);
	io_worker_ref_put(wq);
	return false;
}

static void io_wq_dec_running(struct io_worker *worker)
{
	struct io_wq_acct *acct = io_wq_get_acct(worker);
	struct io_wq *wq = worker->wq;

	if (!test_bit(IO_WORKER_F_UP, &worker->flags))
		return;

	if (!atomic_dec_and_test(&acct->nr_running))
		return;
	if (!io_acct_run_queue(acct))
		return;

	raw_spin_unlock(&acct->lock);
	atomic_inc(&acct->nr_running);
	atomic_inc(&wq->worker_refs);
	io_queue_worker_create(worker, acct, create_worker_cb);
}

/*
 * Worker will start processing some work. Move it to the busy list, if
 * it's currently on the freelist
 */
static void __io_worker_busy(struct io_wq *wq, struct io_worker *worker)
{
	if (test_bit(IO_WORKER_F_FREE, &worker->flags)) {
		clear_bit(IO_WORKER_F_FREE, &worker->flags);
		raw_spin_lock(&wq->lock);
		hlist_nulls_del_init_rcu(&worker->nulls_node);
		raw_spin_unlock(&wq->lock);
	}
}

/*
 * No work, worker going to sleep. Move to freelist.
 */
static void __io_worker_idle(struct io_wq *wq, struct io_worker *worker)
	__must_hold(wq->lock)
{
	if (!test_bit(IO_WORKER_F_FREE, &worker->flags)) {
		set_bit(IO_WORKER_F_FREE, &worker->flags);
		hlist_nulls_add_head_rcu(&worker->nulls_node, &wq->free_list);
	}
}

static inline unsigned int io_get_work_hash(struct io_wq_work *work)
{
	return work->flags >> IO_WQ_HASH_SHIFT;
}

static bool io_wait_on_hash(struct io_wq *wq, unsigned int hash)
{
	bool ret = false;

	spin_lock_irq(&wq->hash->wait.lock);
	if (list_empty(&wq->wait.entry)) {
		__add_wait_queue(&wq->hash->wait, &wq->wait);
		if (!test_bit(hash, &wq->hash->map)) {
			__set_current_state(TASK_RUNNING);
			list_del_init(&wq->wait.entry);
			ret = true;
		}
	}
	spin_unlock_irq(&wq->hash->wait.lock);
	return ret;
}

static struct io_wq_work *io_get_next_work(struct io_wq_acct *acct,
					   struct io_worker *worker)
	__must_hold(acct->lock)
{
	struct io_wq_work_node *node, *prev;
	struct io_wq_work *work, *tail;
	unsigned int stall_hash = -1U;
	struct io_wq *wq = worker->wq;

	wq_list_for_each(node, prev, &acct->work_list) {
		unsigned int hash;

		work = container_of(node, struct io_wq_work, list);

		/* not hashed, can run anytime */
		if (!io_wq_is_hashed(work)) {
			wq_list_del(&acct->work_list, node, prev);
			return work;
		}

		hash = io_get_work_hash(work);
		/* all items with this hash lie in [work, tail] */
		tail = wq->hash_tail[hash];

		/* hashed, can run if not already running */
		if (!test_and_set_bit(hash, &wq->hash->map)) {
			wq->hash_tail[hash] = NULL;
			wq_list_cut(&acct->work_list, &tail->list, prev);
			return work;
		}
		if (stall_hash == -1U)
			stall_hash = hash;
		/* fast forward to a next hash, for-each will fix up @prev */
		node = &tail->list;
	}

	if (stall_hash != -1U) {
		bool unstalled;

		/*
		 * Set this before dropping the lock to avoid racing with new
		 * work being added and clearing the stalled bit.
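		 * (io_wq_enqueue() clears IO_ACCT_STALLED_BIT under
		 * acct->lock right after inserting new work.)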
		 */
		set_bit(IO_ACCT_STALLED_BIT, &acct->flags);
		raw_spin_unlock(&acct->lock);
		unstalled = io_wait_on_hash(wq, stall_hash);
		raw_spin_lock(&acct->lock);
		if (unstalled) {
			clear_bit(IO_ACCT_STALLED_BIT, &acct->flags);
			if (wq_has_sleeper(&wq->hash->wait))
				wake_up(&wq->hash->wait);
		}
	}

	return NULL;
}

static void io_assign_current_work(struct io_worker *worker,
				   struct io_wq_work *work)
{
	if (work) {
		io_run_task_work();
		cond_resched();
	}

	raw_spin_lock(&worker->lock);
	worker->cur_work = work;
	raw_spin_unlock(&worker->lock);
}

/*
 * Called with acct->lock held, drops it before returning
 */
static void io_worker_handle_work(struct io_wq_acct *acct,
				  struct io_worker *worker)
	__releases(&acct->lock)
{
	struct io_wq *wq = worker->wq;
	bool do_kill = test_bit(IO_WQ_BIT_EXIT, &wq->state);

	do {
		struct io_wq_work *work;

		/*
		 * If we got some work, mark us as busy. If we didn't, but
		 * the list isn't empty, it means we stalled on hashed work.
		 * Mark us stalled so we don't keep looking for work when we
		 * can't make progress, any work completion or insertion will
		 * clear the stalled flag.
		 */
		work = io_get_next_work(acct, worker);
		if (work) {
			/*
			 * Make sure cancelation can find this, even before
			 * it becomes the active work. That avoids a window
			 * where the work has been removed from our general
			 * work list, but isn't yet discoverable as the
			 * current work item for this worker.
			 */
			raw_spin_lock(&worker->lock);
			worker->cur_work = work;
			raw_spin_unlock(&worker->lock);
		}

		raw_spin_unlock(&acct->lock);

		if (!work)
			break;

		__io_worker_busy(wq, worker);

		io_assign_current_work(worker, work);
		__set_current_state(TASK_RUNNING);

		/* handle a whole dependent link */
		do {
			struct io_wq_work *next_hashed, *linked;
			unsigned int hash = io_get_work_hash(work);

			next_hashed = wq_next_work(work);

			if (unlikely(do_kill) && (work->flags & IO_WQ_WORK_UNBOUND))
				work->flags |= IO_WQ_WORK_CANCEL;
			wq->do_work(work);
			io_assign_current_work(worker, NULL);

			linked = wq->free_work(work);
			work = next_hashed;
			if (!work && linked && !io_wq_is_hashed(linked)) {
				work = linked;
				linked = NULL;
			}
			io_assign_current_work(worker, work);
			if (linked)
				io_wq_enqueue(wq, linked);

			if (hash != -1U && !next_hashed) {
				/* serialize hash clear with wake_up() */
				spin_lock_irq(&wq->hash->wait.lock);
				clear_bit(hash, &wq->hash->map);
				clear_bit(IO_ACCT_STALLED_BIT, &acct->flags);
				spin_unlock_irq(&wq->hash->wait.lock);
				if (wq_has_sleeper(&wq->hash->wait))
					wake_up(&wq->hash->wait);
			}
		} while (work);

		if (!__io_acct_run_queue(acct))
			break;
		raw_spin_lock(&acct->lock);
	} while (1);
}

static int io_wq_worker(void *data)
{
	struct io_worker *worker = data;
	struct io_wq_acct *acct = io_wq_get_acct(worker);
	struct io_wq *wq = worker->wq;
	bool exit_mask = false, last_timeout = false;
	char buf[TASK_COMM_LEN];

	set_mask_bits(&worker->flags, 0,
		      BIT(IO_WORKER_F_UP) | BIT(IO_WORKER_F_RUNNING));

	snprintf(buf, sizeof(buf), "iou-wrk-%d", wq->task->pid);
	set_task_comm(current, buf);

	while (!test_bit(IO_WQ_BIT_EXIT, &wq->state)) {
		long ret;

		set_current_state(TASK_INTERRUPTIBLE);

		/*
		 * If we have work to do, io_acct_run_queue() returns with
		 * the acct->lock held. If not, it will drop it.
		 */
		while (io_acct_run_queue(acct))
			io_worker_handle_work(acct, worker);

		raw_spin_lock(&wq->lock);
		/*
		 * Last sleep timed out. Exit if we're not the last worker,
		 * or if someone modified our affinity.
		 */
		if (last_timeout && (exit_mask || acct->nr_workers > 1)) {
			acct->nr_workers--;
			raw_spin_unlock(&wq->lock);
			__set_current_state(TASK_RUNNING);
			break;
		}
		last_timeout = false;
		__io_worker_idle(wq, worker);
		raw_spin_unlock(&wq->lock);
		if (io_run_task_work())
			continue;
		ret = schedule_timeout(WORKER_IDLE_TIMEOUT);
		if (signal_pending(current)) {
			struct ksignal ksig;

			if (!get_signal(&ksig))
				continue;
			break;
		}
		if (!ret) {
			last_timeout = true;
			exit_mask = !cpumask_test_cpu(raw_smp_processor_id(),
							wq->cpu_mask);
		}
	}

	if (test_bit(IO_WQ_BIT_EXIT, &wq->state) && io_acct_run_queue(acct))
		io_worker_handle_work(acct, worker);

	io_worker_exit(worker);
	return 0;
}

/*
 * Called when a worker is scheduled in. Mark us as currently running.
 */
void io_wq_worker_running(struct task_struct *tsk)
{
	struct io_worker *worker = tsk->worker_private;

	if (!worker)
		return;
	if (!test_bit(IO_WORKER_F_UP, &worker->flags))
		return;
	if (test_bit(IO_WORKER_F_RUNNING, &worker->flags))
		return;
	set_bit(IO_WORKER_F_RUNNING, &worker->flags);
	io_wq_inc_running(worker);
}

/*
 * Called when worker is going to sleep. If there are no workers currently
 * running and we have work pending, wake up a free one or create a new one.
 */
void io_wq_worker_sleeping(struct task_struct *tsk)
{
	struct io_worker *worker = tsk->worker_private;

	if (!worker)
		return;
	if (!test_bit(IO_WORKER_F_UP, &worker->flags))
		return;
	if (!test_bit(IO_WORKER_F_RUNNING, &worker->flags))
		return;

	clear_bit(IO_WORKER_F_RUNNING, &worker->flags);
	io_wq_dec_running(worker);
}

static void io_init_new_worker(struct io_wq *wq, struct io_worker *worker,
			       struct task_struct *tsk)
{
	tsk->worker_private = worker;
	worker->task = tsk;
	set_cpus_allowed_ptr(tsk, wq->cpu_mask);

	raw_spin_lock(&wq->lock);
	hlist_nulls_add_head_rcu(&worker->nulls_node, &wq->free_list);
	list_add_tail_rcu(&worker->all_list, &wq->all_list);
	set_bit(IO_WORKER_F_FREE, &worker->flags);
	raw_spin_unlock(&wq->lock);
	wake_up_new_task(tsk);
}

static bool io_wq_work_match_all(struct io_wq_work *work, void *data)
{
	return true;
}

static inline bool io_should_retry_thread(long err)
{
	/*
	 * Prevent perpetual task_work retry if the task (or its group) is
	 * exiting.
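	 * Transient errors (-EAGAIN and the -ERESTART* family) are worth
	 * retrying; anything else is treated as a hard failure.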
	 */
	if (fatal_signal_pending(current))
		return false;

	switch (err) {
	case -EAGAIN:
	case -ERESTARTSYS:
	case -ERESTARTNOINTR:
	case -ERESTARTNOHAND:
		return true;
	default:
		return false;
	}
}

static void create_worker_cont(struct callback_head *cb)
{
	struct io_worker *worker;
	struct task_struct *tsk;
	struct io_wq *wq;

	worker = container_of(cb, struct io_worker, create_work);
	clear_bit_unlock(0, &worker->create_state);
	wq = worker->wq;
	tsk = create_io_thread(io_wq_worker, worker, NUMA_NO_NODE);
	if (!IS_ERR(tsk)) {
		io_init_new_worker(wq, worker, tsk);
		io_worker_release(worker);
		return;
	} else if (!io_should_retry_thread(PTR_ERR(tsk))) {
		struct io_wq_acct *acct = io_wq_get_acct(worker);

		atomic_dec(&acct->nr_running);
		raw_spin_lock(&wq->lock);
		acct->nr_workers--;
		if (!acct->nr_workers) {
			struct io_cb_cancel_data match = {
				.fn		= io_wq_work_match_all,
				.cancel_all	= true,
			};

			raw_spin_unlock(&wq->lock);
			while (io_acct_cancel_pending_work(wq, acct, &match))
				;
		} else {
			raw_spin_unlock(&wq->lock);
		}
		io_worker_ref_put(wq);
		kfree(worker);
		return;
	}

	/* re-create attempts grab a new worker ref, drop the existing one */
	io_worker_release(worker);
	schedule_work(&worker->work);
}

static void io_workqueue_create(struct work_struct *work)
{
	struct io_worker *worker = container_of(work, struct io_worker, work);
	struct io_wq_acct *acct = io_wq_get_acct(worker);

	if (!io_queue_worker_create(worker, acct, create_worker_cont))
		kfree(worker);
}

static bool create_io_worker(struct io_wq *wq, int index)
{
	struct io_wq_acct *acct = &wq->acct[index];
	struct io_worker *worker;
	struct task_struct *tsk;

	__set_current_state(TASK_RUNNING);

	worker = kzalloc(sizeof(*worker), GFP_KERNEL);
	if (!worker) {
fail:
		atomic_dec(&acct->nr_running);
		raw_spin_lock(&wq->lock);
		acct->nr_workers--;
		raw_spin_unlock(&wq->lock);
		io_worker_ref_put(wq);
		return false;
	}

	refcount_set(&worker->ref, 1);
	worker->wq = wq;
	raw_spin_lock_init(&worker->lock);
	init_completion(&worker->ref_done);

	if (index == IO_WQ_ACCT_BOUND)
		set_bit(IO_WORKER_F_BOUND, &worker->flags);

	tsk = create_io_thread(io_wq_worker, worker, NUMA_NO_NODE);
	if (!IS_ERR(tsk)) {
		io_init_new_worker(wq, worker, tsk);
	} else if (!io_should_retry_thread(PTR_ERR(tsk))) {
		kfree(worker);
		goto fail;
	} else {
		INIT_WORK(&worker->work, io_workqueue_create);
		schedule_work(&worker->work);
	}

	return true;
}

/*
 * Iterate the passed in list and call the specific function for each
 * worker that isn't exiting
 */
static bool io_wq_for_each_worker(struct io_wq *wq,
				  bool (*func)(struct io_worker *, void *),
				  void *data)
{
	struct io_worker *worker;
	bool ret = false;

	list_for_each_entry_rcu(worker, &wq->all_list, all_list) {
		if (io_worker_get(worker)) {
			/* no task if node is/was offline */
			if (worker->task)
				ret = func(worker, data);
			io_worker_release(worker);
			if (ret)
				break;
		}
	}

	return ret;
}

static bool io_wq_worker_wake(struct io_worker *worker, void *data)
{
	__set_notify_signal(worker->task);
	wake_up_process(worker->task);
	return false;
}

static void io_run_cancel(struct io_wq_work *work, struct io_wq *wq)
{
	do {
		work->flags |= IO_WQ_WORK_CANCEL;
		wq->do_work(work);
		work = wq->free_work(work);
	} while (work);
}

static void io_wq_insert_work(struct io_wq *wq, struct io_wq_work *work)
{
	struct io_wq_acct *acct = io_work_get_acct(wq, work);
	unsigned int hash;
	struct io_wq_work *tail;

	if (!io_wq_is_hashed(work)) {
append:
		wq_list_add_tail(&work->list, &acct->work_list);
		return;
	}

	hash = io_get_work_hash(work);
	tail = wq->hash_tail[hash];
	wq->hash_tail[hash] = work;
	if (!tail)
		goto append;

	wq_list_add_after(&work->list, &tail->list, &acct->work_list);
}

static bool io_wq_work_match_item(struct io_wq_work *work, void *data)
{
	return work == data;
}

void io_wq_enqueue(struct io_wq *wq, struct io_wq_work *work)
{
	struct io_wq_acct *acct = io_work_get_acct(wq, work);
	unsigned long work_flags = work->flags;
	struct io_cb_cancel_data match = {
		.fn		= io_wq_work_match_item,
		.data		= work,
		.cancel_all	= false,
	};
	bool do_create;

	/*
	 * If io-wq is exiting for this task, or if the request has explicitly
	 * been marked as one that should not get executed, cancel it here.
	 */
	if (test_bit(IO_WQ_BIT_EXIT, &wq->state) ||
	    (work->flags & IO_WQ_WORK_CANCEL)) {
		io_run_cancel(work, wq);
		return;
	}

	raw_spin_lock(&acct->lock);
	io_wq_insert_work(wq, work);
	clear_bit(IO_ACCT_STALLED_BIT, &acct->flags);
	raw_spin_unlock(&acct->lock);

	rcu_read_lock();
	do_create = !io_wq_activate_free_worker(wq, acct);
	rcu_read_unlock();

	if (do_create && ((work_flags & IO_WQ_WORK_CONCURRENT) ||
	    !atomic_read(&acct->nr_running))) {
		bool did_create;

		did_create = io_wq_create_worker(wq, acct);
		if (likely(did_create))
			return;

		raw_spin_lock(&wq->lock);
		if (acct->nr_workers) {
			raw_spin_unlock(&wq->lock);
			return;
		}
		raw_spin_unlock(&wq->lock);

		/* fatal condition, failed to create the first worker */
		io_acct_cancel_pending_work(wq, acct, &match);
	}
}

/*
 * Work items that hash to the same value will not be done in parallel.
 * Used to limit concurrent writes, generally hashed by inode.
 */
void io_wq_hash_work(struct io_wq_work *work, void *val)
{
	unsigned int bit;

	bit = hash_ptr(val, IO_WQ_HASH_ORDER);
	work->flags |= (IO_WQ_WORK_HASHED | (bit << IO_WQ_HASH_SHIFT));
}

static bool __io_wq_worker_cancel(struct io_worker *worker,
				  struct io_cb_cancel_data *match,
				  struct io_wq_work *work)
{
	if (work && match->fn(work, match->data)) {
		work->flags |= IO_WQ_WORK_CANCEL;
		__set_notify_signal(worker->task);
		return true;
	}

	return false;
}

static bool io_wq_worker_cancel(struct io_worker *worker, void *data)
{
	struct io_cb_cancel_data *match = data;

	/*
	 * Hold the lock to avoid ->cur_work going out of scope, caller
	 * may dereference the passed in work.
	 */
	raw_spin_lock(&worker->lock);
	if (__io_wq_worker_cancel(worker, match, worker->cur_work))
		match->nr_running++;
	raw_spin_unlock(&worker->lock);

	return match->nr_running && !match->cancel_all;
}

static inline void io_wq_remove_pending(struct io_wq *wq,
					struct io_wq_work *work,
					struct io_wq_work_node *prev)
{
	struct io_wq_acct *acct = io_work_get_acct(wq, work);
	unsigned int hash = io_get_work_hash(work);
	struct io_wq_work *prev_work = NULL;

	if (io_wq_is_hashed(work) && work == wq->hash_tail[hash]) {
		if (prev)
			prev_work = container_of(prev, struct io_wq_work, list);
		if (prev_work && io_get_work_hash(prev_work) == hash)
			wq->hash_tail[hash] = prev_work;
		else
			wq->hash_tail[hash] = NULL;
	}
	wq_list_del(&acct->work_list, &work->list, prev);
}

static bool io_acct_cancel_pending_work(struct io_wq *wq,
					struct io_wq_acct *acct,
					struct io_cb_cancel_data *match)
{
	struct io_wq_work_node *node, *prev;
	struct io_wq_work *work;

	raw_spin_lock(&acct->lock);
	wq_list_for_each(node, prev, &acct->work_list) {
		work = container_of(node, struct io_wq_work, list);
		if (!match->fn(work, match->data))
			continue;
		io_wq_remove_pending(wq, work, prev);
		raw_spin_unlock(&acct->lock);
		io_run_cancel(work, wq);
		match->nr_pending++;
		/* not safe to continue after unlock */
		return true;
	}
	raw_spin_unlock(&acct->lock);

	return false;
}

static void io_wq_cancel_pending_work(struct io_wq *wq,
				      struct io_cb_cancel_data *match)
{
	int i;
retry:
	for (i = 0; i < IO_WQ_ACCT_NR; i++) {
		struct io_wq_acct *acct = io_get_acct(wq, i == 0);

		if (io_acct_cancel_pending_work(wq, acct, match)) {
			if (match->cancel_all)
				goto retry;
			break;
		}
	}
}

static void io_wq_cancel_running_work(struct io_wq *wq,
				      struct io_cb_cancel_data *match)
{
	rcu_read_lock();
	io_wq_for_each_worker(wq, io_wq_worker_cancel, match);
	rcu_read_unlock();
}

enum io_wq_cancel io_wq_cancel_cb(struct io_wq *wq, work_cancel_fn *cancel,
				  void *data, bool cancel_all)
{
	struct io_cb_cancel_data match = {
		.fn		= cancel,
		.data		= data,
		.cancel_all	= cancel_all,
	};

	/*
	 * First check pending list, if we're lucky we can just remove it
	 * from there. CANCEL_OK means that the work is returned as-new,
	 * no completion will be posted for it.
	 *
	 * Then check if a free (going busy) or busy worker has the work
	 * currently running. If we find it there, we'll return CANCEL_RUNNING
	 * as an indication that we attempt to signal cancellation. The
	 * completion will run normally in this case.
	 *
	 * Do both of these while holding the wq->lock, to ensure that
	 * we'll find a work item regardless of state.
	 */
	io_wq_cancel_pending_work(wq, &match);
	if (match.nr_pending && !match.cancel_all)
		return IO_WQ_CANCEL_OK;

	raw_spin_lock(&wq->lock);
	io_wq_cancel_running_work(wq, &match);
	raw_spin_unlock(&wq->lock);
	if (match.nr_running && !match.cancel_all)
		return IO_WQ_CANCEL_RUNNING;

	if (match.nr_running)
		return IO_WQ_CANCEL_RUNNING;
	if (match.nr_pending)
		return IO_WQ_CANCEL_OK;
	return IO_WQ_CANCEL_NOTFOUND;
}

static int io_wq_hash_wake(struct wait_queue_entry *wait, unsigned mode,
			   int sync, void *key)
{
	struct io_wq *wq = container_of(wait, struct io_wq, wait);
	int i;

	list_del_init(&wait->entry);

	rcu_read_lock();
	for (i = 0; i < IO_WQ_ACCT_NR; i++) {
		struct io_wq_acct *acct = &wq->acct[i];

		if (test_and_clear_bit(IO_ACCT_STALLED_BIT, &acct->flags))
			io_wq_activate_free_worker(wq, acct);
	}
	rcu_read_unlock();
	return 1;
}

struct io_wq *io_wq_create(unsigned bounded, struct io_wq_data *data)
{
	int ret, i;
	struct io_wq *wq;

	if (WARN_ON_ONCE(!data->free_work || !data->do_work))
		return ERR_PTR(-EINVAL);
	if (WARN_ON_ONCE(!bounded))
		return ERR_PTR(-EINVAL);

	wq = kzalloc(sizeof(struct io_wq), GFP_KERNEL);
	if (!wq)
		return ERR_PTR(-ENOMEM);

	refcount_inc(&data->hash->refs);
	wq->hash = data->hash;
	wq->free_work = data->free_work;
	wq->do_work = data->do_work;

	ret = -ENOMEM;

	if (!alloc_cpumask_var(&wq->cpu_mask, GFP_KERNEL))
		goto err;
	cpumask_copy(wq->cpu_mask, cpu_possible_mask);
	wq->acct[IO_WQ_ACCT_BOUND].max_workers = bounded;
	wq->acct[IO_WQ_ACCT_UNBOUND].max_workers =
				task_rlimit(current, RLIMIT_NPROC);
	INIT_LIST_HEAD(&wq->wait.entry);
	wq->wait.func = io_wq_hash_wake;
	for (i = 0; i < IO_WQ_ACCT_NR; i++) {
		struct io_wq_acct *acct = &wq->acct[i];

		acct->index = i;
		atomic_set(&acct->nr_running, 0);
		INIT_WQ_LIST(&acct->work_list);
		raw_spin_lock_init(&acct->lock);
	}

	raw_spin_lock_init(&wq->lock);
	INIT_HLIST_NULLS_HEAD(&wq->free_list, 0);
	INIT_LIST_HEAD(&wq->all_list);

	wq->task = get_task_struct(data->task);
	atomic_set(&wq->worker_refs, 1);
	init_completion(&wq->worker_done);
	ret = cpuhp_state_add_instance_nocalls(io_wq_online, &wq->cpuhp_node);
	if (ret)
		goto err;

	return wq;
err:
	io_wq_put_hash(data->hash);
	free_cpumask_var(wq->cpu_mask);
	kfree(wq);
	return ERR_PTR(ret);
}

static bool io_task_work_match(struct callback_head *cb, void *data)
{
	struct io_worker *worker;

	if (cb->func != create_worker_cb && cb->func != create_worker_cont)
		return false;
	worker = container_of(cb, struct io_worker, create_work);
	return worker->wq == data;
}

void io_wq_exit_start(struct io_wq *wq)
{
	set_bit(IO_WQ_BIT_EXIT, &wq->state);
}

static void io_wq_cancel_tw_create(struct io_wq *wq)
{
	struct callback_head *cb;

	while ((cb = task_work_cancel_match(wq->task, io_task_work_match, wq)) != NULL) {
		struct io_worker *worker;

		worker = container_of(cb, struct io_worker, create_work);
		io_worker_cancel_cb(worker);
		/*
		 * Only the worker continuation helper has worker allocated and
		 * hence needs freeing.
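		 * (A create_worker_cb entry belongs to an already-running
		 * worker, which frees itself via io_worker_exit().)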
		 */
		if (cb->func == create_worker_cont)
			kfree(worker);
	}
}

static void io_wq_exit_workers(struct io_wq *wq)
{
	if (!wq->task)
		return;

	io_wq_cancel_tw_create(wq);

	rcu_read_lock();
	io_wq_for_each_worker(wq, io_wq_worker_wake, NULL);
	rcu_read_unlock();
	io_worker_ref_put(wq);
	wait_for_completion(&wq->worker_done);

	spin_lock_irq(&wq->hash->wait.lock);
	list_del_init(&wq->wait.entry);
	spin_unlock_irq(&wq->hash->wait.lock);

	put_task_struct(wq->task);
	wq->task = NULL;
}

static void io_wq_destroy(struct io_wq *wq)
{
	struct io_cb_cancel_data match = {
		.fn		= io_wq_work_match_all,
		.cancel_all	= true,
	};

	cpuhp_state_remove_instance_nocalls(io_wq_online, &wq->cpuhp_node);
	io_wq_cancel_pending_work(wq, &match);
	free_cpumask_var(wq->cpu_mask);
	io_wq_put_hash(wq->hash);
	kfree(wq);
}

void io_wq_put_and_exit(struct io_wq *wq)
{
	WARN_ON_ONCE(!test_bit(IO_WQ_BIT_EXIT, &wq->state));

	io_wq_exit_workers(wq);
	io_wq_destroy(wq);
}

struct online_data {
	unsigned int cpu;
	bool online;
};

static bool io_wq_worker_affinity(struct io_worker *worker, void *data)
{
	struct online_data *od = data;

	if (od->online)
		cpumask_set_cpu(od->cpu, worker->wq->cpu_mask);
	else
		cpumask_clear_cpu(od->cpu, worker->wq->cpu_mask);
	return false;
}

static int __io_wq_cpu_online(struct io_wq *wq, unsigned int cpu, bool online)
{
	struct online_data od = {
		.cpu = cpu,
		.online = online
	};

	rcu_read_lock();
	io_wq_for_each_worker(wq, io_wq_worker_affinity, &od);
	rcu_read_unlock();
	return 0;
}

static int io_wq_cpu_online(unsigned int cpu, struct hlist_node *node)
{
	struct io_wq *wq = hlist_entry_safe(node, struct io_wq, cpuhp_node);

	return __io_wq_cpu_online(wq, cpu, true);
}

static int io_wq_cpu_offline(unsigned int cpu, struct hlist_node *node)
{
	struct io_wq *wq = hlist_entry_safe(node, struct io_wq, cpuhp_node);

	return __io_wq_cpu_online(wq, cpu, false);
}

int io_wq_cpu_affinity(struct io_uring_task *tctx, cpumask_var_t mask)
{
	if (!tctx || !tctx->io_wq)
		return -EINVAL;

	rcu_read_lock();
	if (mask)
		cpumask_copy(tctx->io_wq->cpu_mask, mask);
	else
		cpumask_copy(tctx->io_wq->cpu_mask, cpu_possible_mask);
	rcu_read_unlock();

	return 0;
}

/*
 * Set max number of unbounded workers, returns old value. If new_count is 0,
 * then just return the old value.
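 * (new_count[] is indexed by IO_WQ_ACCT_BOUND/IO_WQ_ACCT_UNBOUND; a zero
 * entry leaves that limit unchanged.)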
 */
int io_wq_max_workers(struct io_wq *wq, int *new_count)
{
	struct io_wq_acct *acct;
	int prev[IO_WQ_ACCT_NR];
	int i;

	BUILD_BUG_ON((int) IO_WQ_ACCT_BOUND   != (int) IO_WQ_BOUND);
	BUILD_BUG_ON((int) IO_WQ_ACCT_UNBOUND != (int) IO_WQ_UNBOUND);
	BUILD_BUG_ON((int) IO_WQ_ACCT_NR      != 2);

	for (i = 0; i < IO_WQ_ACCT_NR; i++) {
		if (new_count[i] > task_rlimit(current, RLIMIT_NPROC))
			new_count[i] = task_rlimit(current, RLIMIT_NPROC);
	}

	for (i = 0; i < IO_WQ_ACCT_NR; i++)
		prev[i] = 0;

	rcu_read_lock();

	raw_spin_lock(&wq->lock);
	for (i = 0; i < IO_WQ_ACCT_NR; i++) {
		acct = &wq->acct[i];
		prev[i] = max_t(int, acct->max_workers, prev[i]);
		if (new_count[i])
			acct->max_workers = new_count[i];
	}
	raw_spin_unlock(&wq->lock);
	rcu_read_unlock();

	for (i = 0; i < IO_WQ_ACCT_NR; i++)
		new_count[i] = prev[i];

	return 0;
}

static __init int io_wq_init(void)
{
	int ret;

	ret = cpuhp_setup_state_multi(CPUHP_AP_ONLINE_DYN, "io-wq/online",
					io_wq_cpu_online, io_wq_cpu_offline);
	if (ret < 0)
		return ret;
	io_wq_online = ret;
	return 0;
}
subsys_initcall(io_wq_init);