17642e668SJens Axboe // SPDX-License-Identifier: GPL-2.0 27642e668SJens Axboe /* 37642e668SJens Axboe * Task work handling for io_uring 47642e668SJens Axboe */ 57642e668SJens Axboe #include <linux/kernel.h> 67642e668SJens Axboe #include <linux/errno.h> 77642e668SJens Axboe #include <linux/sched/signal.h> 87642e668SJens Axboe #include <linux/io_uring.h> 97642e668SJens Axboe #include <linux/indirect_call_wrapper.h> 107642e668SJens Axboe 117642e668SJens Axboe #include "io_uring.h" 127642e668SJens Axboe #include "tctx.h" 137642e668SJens Axboe #include "poll.h" 147642e668SJens Axboe #include "rw.h" 157642e668SJens Axboe #include "eventfd.h" 160105b056SJens Axboe #include "wait.h" 177642e668SJens Axboe 187642e668SJens Axboe void io_fallback_req_func(struct work_struct *work) 197642e668SJens Axboe { 207642e668SJens Axboe struct io_ring_ctx *ctx = container_of(work, struct io_ring_ctx, 217642e668SJens Axboe fallback_work.work); 227642e668SJens Axboe struct llist_node *node = llist_del_all(&ctx->fallback_llist); 237642e668SJens Axboe struct io_kiocb *req, *tmp; 247642e668SJens Axboe struct io_tw_state ts = {}; 257642e668SJens Axboe 267642e668SJens Axboe percpu_ref_get(&ctx->refs); 277642e668SJens Axboe mutex_lock(&ctx->uring_lock); 287642e668SJens Axboe ts.cancel = io_should_terminate_tw(ctx); 297642e668SJens Axboe llist_for_each_entry_safe(req, tmp, node, io_task_work.node) 307642e668SJens Axboe req->io_task_work.func((struct io_tw_req){req}, ts); 317642e668SJens Axboe io_submit_flush_completions(ctx); 327642e668SJens Axboe mutex_unlock(&ctx->uring_lock); 337642e668SJens Axboe percpu_ref_put(&ctx->refs); 347642e668SJens Axboe } 357642e668SJens Axboe 367642e668SJens Axboe static void ctx_flush_and_put(struct io_ring_ctx *ctx, io_tw_token_t tw) 377642e668SJens Axboe { 387642e668SJens Axboe if (!ctx) 397642e668SJens Axboe return; 407642e668SJens Axboe if (ctx->flags & IORING_SETUP_TASKRUN_FLAG) 417642e668SJens Axboe atomic_andnot(IORING_SQ_TASKRUN, &ctx->rings->sq_flags); 427642e668SJens Axboe 437642e668SJens Axboe io_submit_flush_completions(ctx); 447642e668SJens Axboe mutex_unlock(&ctx->uring_lock); 457642e668SJens Axboe percpu_ref_put(&ctx->refs); 467642e668SJens Axboe } 477642e668SJens Axboe 487642e668SJens Axboe /* 497642e668SJens Axboe * Run queued task_work, returning the number of entries processed in *count. 507642e668SJens Axboe * If more entries than max_entries are available, stop processing once this 517642e668SJens Axboe * is reached and return the rest of the list. 527642e668SJens Axboe */ 537642e668SJens Axboe struct llist_node *io_handle_tw_list(struct llist_node *node, 547642e668SJens Axboe unsigned int *count, 557642e668SJens Axboe unsigned int max_entries) 567642e668SJens Axboe { 577642e668SJens Axboe struct io_ring_ctx *ctx = NULL; 587642e668SJens Axboe struct io_tw_state ts = { }; 597642e668SJens Axboe 607642e668SJens Axboe do { 617642e668SJens Axboe struct llist_node *next = node->next; 627642e668SJens Axboe struct io_kiocb *req = container_of(node, struct io_kiocb, 637642e668SJens Axboe io_task_work.node); 647642e668SJens Axboe 657642e668SJens Axboe if (req->ctx != ctx) { 667642e668SJens Axboe ctx_flush_and_put(ctx, ts); 677642e668SJens Axboe ctx = req->ctx; 687642e668SJens Axboe mutex_lock(&ctx->uring_lock); 697642e668SJens Axboe percpu_ref_get(&ctx->refs); 707642e668SJens Axboe ts.cancel = io_should_terminate_tw(ctx); 717642e668SJens Axboe } 727642e668SJens Axboe INDIRECT_CALL_2(req->io_task_work.func, 737642e668SJens Axboe io_poll_task_func, io_req_rw_complete, 747642e668SJens Axboe (struct io_tw_req){req}, ts); 757642e668SJens Axboe node = next; 767642e668SJens Axboe (*count)++; 777642e668SJens Axboe if (unlikely(need_resched())) { 787642e668SJens Axboe ctx_flush_and_put(ctx, ts); 797642e668SJens Axboe ctx = NULL; 807642e668SJens Axboe cond_resched(); 817642e668SJens Axboe } 827642e668SJens Axboe } while (node && *count < max_entries); 837642e668SJens Axboe 847642e668SJens Axboe ctx_flush_and_put(ctx, ts); 857642e668SJens Axboe return node; 867642e668SJens Axboe } 877642e668SJens Axboe 887642e668SJens Axboe static __cold void __io_fallback_tw(struct llist_node *node, bool sync) 897642e668SJens Axboe { 907642e668SJens Axboe struct io_ring_ctx *last_ctx = NULL; 917642e668SJens Axboe struct io_kiocb *req; 927642e668SJens Axboe 937642e668SJens Axboe while (node) { 947642e668SJens Axboe req = container_of(node, struct io_kiocb, io_task_work.node); 957642e668SJens Axboe node = node->next; 967642e668SJens Axboe if (last_ctx != req->ctx) { 977642e668SJens Axboe if (last_ctx) { 987642e668SJens Axboe if (sync) 997642e668SJens Axboe flush_delayed_work(&last_ctx->fallback_work); 1007642e668SJens Axboe percpu_ref_put(&last_ctx->refs); 1017642e668SJens Axboe } 1027642e668SJens Axboe last_ctx = req->ctx; 1037642e668SJens Axboe percpu_ref_get(&last_ctx->refs); 1047642e668SJens Axboe } 1057642e668SJens Axboe if (llist_add(&req->io_task_work.node, &last_ctx->fallback_llist)) 1067642e668SJens Axboe schedule_delayed_work(&last_ctx->fallback_work, 1); 1077642e668SJens Axboe } 1087642e668SJens Axboe 1097642e668SJens Axboe if (last_ctx) { 1107642e668SJens Axboe if (sync) 1117642e668SJens Axboe flush_delayed_work(&last_ctx->fallback_work); 1127642e668SJens Axboe percpu_ref_put(&last_ctx->refs); 1137642e668SJens Axboe } 1147642e668SJens Axboe } 1157642e668SJens Axboe 1167642e668SJens Axboe static void io_fallback_tw(struct io_uring_task *tctx, bool sync) 1177642e668SJens Axboe { 1187642e668SJens Axboe struct llist_node *node = llist_del_all(&tctx->task_list); 1197642e668SJens Axboe 1207642e668SJens Axboe __io_fallback_tw(node, sync); 1217642e668SJens Axboe } 1227642e668SJens Axboe 1237642e668SJens Axboe struct llist_node *tctx_task_work_run(struct io_uring_task *tctx, 1247642e668SJens Axboe unsigned int max_entries, 1257642e668SJens Axboe unsigned int *count) 1267642e668SJens Axboe { 1277642e668SJens Axboe struct llist_node *node; 1287642e668SJens Axboe 1297642e668SJens Axboe node = llist_del_all(&tctx->task_list); 1307642e668SJens Axboe if (node) { 1317642e668SJens Axboe node = llist_reverse_order(node); 1327642e668SJens Axboe node = io_handle_tw_list(node, count, max_entries); 1337642e668SJens Axboe } 1347642e668SJens Axboe 1357642e668SJens Axboe /* relaxed read is enough as only the task itself sets ->in_cancel */ 1367642e668SJens Axboe if (unlikely(atomic_read(&tctx->in_cancel))) 1377642e668SJens Axboe io_uring_drop_tctx_refs(current); 1387642e668SJens Axboe 1397642e668SJens Axboe trace_io_uring_task_work_run(tctx, *count); 1407642e668SJens Axboe return node; 1417642e668SJens Axboe } 1427642e668SJens Axboe 1437642e668SJens Axboe void tctx_task_work(struct callback_head *cb) 1447642e668SJens Axboe { 1457642e668SJens Axboe struct io_uring_task *tctx; 1467642e668SJens Axboe struct llist_node *ret; 1477642e668SJens Axboe unsigned int count = 0; 1487642e668SJens Axboe 1497642e668SJens Axboe tctx = container_of(cb, struct io_uring_task, task_work); 1507642e668SJens Axboe ret = tctx_task_work_run(tctx, UINT_MAX, &count); 1517642e668SJens Axboe /* can't happen */ 1527642e668SJens Axboe WARN_ON_ONCE(ret); 1537642e668SJens Axboe } 1547642e668SJens Axboe 155*96189080SJens Axboe /* 156*96189080SJens Axboe * Sets IORING_SQ_TASKRUN in the sq_flags shared with userspace, using the 157*96189080SJens Axboe * RCU protected rings pointer to be safe against concurrent ring resizing. 158*96189080SJens Axboe */ 159*96189080SJens Axboe static void io_ctx_mark_taskrun(struct io_ring_ctx *ctx) 160*96189080SJens Axboe { 161*96189080SJens Axboe lockdep_assert_in_rcu_read_lock(); 162*96189080SJens Axboe 163*96189080SJens Axboe if (ctx->flags & IORING_SETUP_TASKRUN_FLAG) { 164*96189080SJens Axboe struct io_rings *rings = rcu_dereference(ctx->rings_rcu); 165*96189080SJens Axboe 166*96189080SJens Axboe atomic_or(IORING_SQ_TASKRUN, &rings->sq_flags); 167*96189080SJens Axboe } 168*96189080SJens Axboe } 169*96189080SJens Axboe 1707642e668SJens Axboe void io_req_local_work_add(struct io_kiocb *req, unsigned flags) 1717642e668SJens Axboe { 1727642e668SJens Axboe struct io_ring_ctx *ctx = req->ctx; 1737642e668SJens Axboe unsigned nr_wait, nr_tw, nr_tw_prev; 1747642e668SJens Axboe struct llist_node *head; 1757642e668SJens Axboe 1767642e668SJens Axboe /* See comment above IO_CQ_WAKE_INIT */ 1777642e668SJens Axboe BUILD_BUG_ON(IO_CQ_WAKE_FORCE <= IORING_MAX_CQ_ENTRIES); 1787642e668SJens Axboe 1797642e668SJens Axboe /* 1807642e668SJens Axboe * We don't know how many requests there are in the link and whether 1817642e668SJens Axboe * they can even be queued lazily, fall back to non-lazy. 1827642e668SJens Axboe */ 1837642e668SJens Axboe if (req->flags & IO_REQ_LINK_FLAGS) 1847642e668SJens Axboe flags &= ~IOU_F_TWQ_LAZY_WAKE; 1857642e668SJens Axboe 1867642e668SJens Axboe guard(rcu)(); 1877642e668SJens Axboe 1887642e668SJens Axboe head = READ_ONCE(ctx->work_llist.first); 1897642e668SJens Axboe do { 1907642e668SJens Axboe nr_tw_prev = 0; 1917642e668SJens Axboe if (head) { 1927642e668SJens Axboe struct io_kiocb *first_req = container_of(head, 1937642e668SJens Axboe struct io_kiocb, 1947642e668SJens Axboe io_task_work.node); 1957642e668SJens Axboe /* 1967642e668SJens Axboe * Might be executed at any moment, rely on 1977642e668SJens Axboe * SLAB_TYPESAFE_BY_RCU to keep it alive. 1987642e668SJens Axboe */ 1997642e668SJens Axboe nr_tw_prev = READ_ONCE(first_req->nr_tw); 2007642e668SJens Axboe } 2017642e668SJens Axboe 2027642e668SJens Axboe /* 2037642e668SJens Axboe * Theoretically, it can overflow, but that's fine as one of 2047642e668SJens Axboe * previous adds should've tried to wake the task. 2057642e668SJens Axboe */ 2067642e668SJens Axboe nr_tw = nr_tw_prev + 1; 2077642e668SJens Axboe if (!(flags & IOU_F_TWQ_LAZY_WAKE)) 2087642e668SJens Axboe nr_tw = IO_CQ_WAKE_FORCE; 2097642e668SJens Axboe 2107642e668SJens Axboe req->nr_tw = nr_tw; 2117642e668SJens Axboe req->io_task_work.node.next = head; 2127642e668SJens Axboe } while (!try_cmpxchg(&ctx->work_llist.first, &head, 2137642e668SJens Axboe &req->io_task_work.node)); 2147642e668SJens Axboe 2157642e668SJens Axboe /* 2167642e668SJens Axboe * cmpxchg implies a full barrier, which pairs with the barrier 2177642e668SJens Axboe * in set_current_state() on the io_cqring_wait() side. It's used 2187642e668SJens Axboe * to ensure that either we see updated ->cq_wait_nr, or waiters 2197642e668SJens Axboe * going to sleep will observe the work added to the list, which 2207642e668SJens Axboe * is similar to the wait/wawke task state sync. 2217642e668SJens Axboe */ 2227642e668SJens Axboe 2237642e668SJens Axboe if (!head) { 224*96189080SJens Axboe io_ctx_mark_taskrun(ctx); 2257642e668SJens Axboe if (ctx->has_evfd) 2267642e668SJens Axboe io_eventfd_signal(ctx, false); 2277642e668SJens Axboe } 2287642e668SJens Axboe 2297642e668SJens Axboe nr_wait = atomic_read(&ctx->cq_wait_nr); 2307642e668SJens Axboe /* not enough or no one is waiting */ 2317642e668SJens Axboe if (nr_tw < nr_wait) 2327642e668SJens Axboe return; 2337642e668SJens Axboe /* the previous add has already woken it up */ 2347642e668SJens Axboe if (nr_tw_prev >= nr_wait) 2357642e668SJens Axboe return; 2367642e668SJens Axboe wake_up_state(ctx->submitter_task, TASK_INTERRUPTIBLE); 2377642e668SJens Axboe } 2387642e668SJens Axboe 2397642e668SJens Axboe void io_req_normal_work_add(struct io_kiocb *req) 2407642e668SJens Axboe { 2417642e668SJens Axboe struct io_uring_task *tctx = req->tctx; 2427642e668SJens Axboe struct io_ring_ctx *ctx = req->ctx; 2437642e668SJens Axboe 2447642e668SJens Axboe /* task_work already pending, we're done */ 2457642e668SJens Axboe if (!llist_add(&req->io_task_work.node, &tctx->task_list)) 2467642e668SJens Axboe return; 2477642e668SJens Axboe 248*96189080SJens Axboe /* 249*96189080SJens Axboe * Doesn't need to use ->rings_rcu, as resizing isn't supported for 250*96189080SJens Axboe * !DEFER_TASKRUN. 251*96189080SJens Axboe */ 2527642e668SJens Axboe if (ctx->flags & IORING_SETUP_TASKRUN_FLAG) 2537642e668SJens Axboe atomic_or(IORING_SQ_TASKRUN, &ctx->rings->sq_flags); 2547642e668SJens Axboe 2557642e668SJens Axboe /* SQPOLL doesn't need the task_work added, it'll run it itself */ 2567642e668SJens Axboe if (ctx->flags & IORING_SETUP_SQPOLL) { 2577642e668SJens Axboe __set_notify_signal(tctx->task); 2587642e668SJens Axboe return; 2597642e668SJens Axboe } 2607642e668SJens Axboe 2617642e668SJens Axboe if (likely(!task_work_add(tctx->task, &tctx->task_work, ctx->notify_method))) 2627642e668SJens Axboe return; 2637642e668SJens Axboe 2647642e668SJens Axboe io_fallback_tw(tctx, false); 2657642e668SJens Axboe } 2667642e668SJens Axboe 2677642e668SJens Axboe void io_req_task_work_add_remote(struct io_kiocb *req, unsigned flags) 2687642e668SJens Axboe { 2697642e668SJens Axboe if (WARN_ON_ONCE(!(req->ctx->flags & IORING_SETUP_DEFER_TASKRUN))) 2707642e668SJens Axboe return; 2717642e668SJens Axboe __io_req_task_work_add(req, flags); 2727642e668SJens Axboe } 2737642e668SJens Axboe 2747642e668SJens Axboe void __cold io_move_task_work_from_local(struct io_ring_ctx *ctx) 2757642e668SJens Axboe { 2767642e668SJens Axboe struct llist_node *node = llist_del_all(&ctx->work_llist); 2777642e668SJens Axboe 2787642e668SJens Axboe __io_fallback_tw(node, false); 2797642e668SJens Axboe node = llist_del_all(&ctx->retry_llist); 2807642e668SJens Axboe __io_fallback_tw(node, false); 2817642e668SJens Axboe } 2827642e668SJens Axboe 2837642e668SJens Axboe static bool io_run_local_work_continue(struct io_ring_ctx *ctx, int events, 2847642e668SJens Axboe int min_events) 2857642e668SJens Axboe { 2867642e668SJens Axboe if (!io_local_work_pending(ctx)) 2877642e668SJens Axboe return false; 2887642e668SJens Axboe if (events < min_events) 2897642e668SJens Axboe return true; 2907642e668SJens Axboe if (ctx->flags & IORING_SETUP_TASKRUN_FLAG) 2917642e668SJens Axboe atomic_or(IORING_SQ_TASKRUN, &ctx->rings->sq_flags); 2927642e668SJens Axboe return false; 2937642e668SJens Axboe } 2947642e668SJens Axboe 2957642e668SJens Axboe static int __io_run_local_work_loop(struct llist_node **node, 2967642e668SJens Axboe io_tw_token_t tw, 2977642e668SJens Axboe int events) 2987642e668SJens Axboe { 2997642e668SJens Axboe int ret = 0; 3007642e668SJens Axboe 3017642e668SJens Axboe while (*node) { 3027642e668SJens Axboe struct llist_node *next = (*node)->next; 3037642e668SJens Axboe struct io_kiocb *req = container_of(*node, struct io_kiocb, 3047642e668SJens Axboe io_task_work.node); 3057642e668SJens Axboe INDIRECT_CALL_2(req->io_task_work.func, 3067642e668SJens Axboe io_poll_task_func, io_req_rw_complete, 3077642e668SJens Axboe (struct io_tw_req){req}, tw); 3087642e668SJens Axboe *node = next; 3097642e668SJens Axboe if (++ret >= events) 3107642e668SJens Axboe break; 3117642e668SJens Axboe } 3127642e668SJens Axboe 3137642e668SJens Axboe return ret; 3147642e668SJens Axboe } 3157642e668SJens Axboe 3167642e668SJens Axboe static int __io_run_local_work(struct io_ring_ctx *ctx, io_tw_token_t tw, 3177642e668SJens Axboe int min_events, int max_events) 3187642e668SJens Axboe { 3197642e668SJens Axboe struct llist_node *node; 3207642e668SJens Axboe unsigned int loops = 0; 3217642e668SJens Axboe int ret = 0; 3227642e668SJens Axboe 3237642e668SJens Axboe if (WARN_ON_ONCE(ctx->submitter_task != current)) 3247642e668SJens Axboe return -EEXIST; 3257642e668SJens Axboe if (ctx->flags & IORING_SETUP_TASKRUN_FLAG) 3267642e668SJens Axboe atomic_andnot(IORING_SQ_TASKRUN, &ctx->rings->sq_flags); 3277642e668SJens Axboe again: 3287642e668SJens Axboe tw.cancel = io_should_terminate_tw(ctx); 3297642e668SJens Axboe min_events -= ret; 3307642e668SJens Axboe ret = __io_run_local_work_loop(&ctx->retry_llist.first, tw, max_events); 3317642e668SJens Axboe if (ctx->retry_llist.first) 3327642e668SJens Axboe goto retry_done; 3337642e668SJens Axboe 3347642e668SJens Axboe /* 3357642e668SJens Axboe * llists are in reverse order, flip it back the right way before 3367642e668SJens Axboe * running the pending items. 3377642e668SJens Axboe */ 3387642e668SJens Axboe node = llist_reverse_order(llist_del_all(&ctx->work_llist)); 3397642e668SJens Axboe ret += __io_run_local_work_loop(&node, tw, max_events - ret); 3407642e668SJens Axboe ctx->retry_llist.first = node; 3417642e668SJens Axboe loops++; 3427642e668SJens Axboe 3437642e668SJens Axboe if (io_run_local_work_continue(ctx, ret, min_events)) 3447642e668SJens Axboe goto again; 3457642e668SJens Axboe retry_done: 3467642e668SJens Axboe io_submit_flush_completions(ctx); 3477642e668SJens Axboe if (io_run_local_work_continue(ctx, ret, min_events)) 3487642e668SJens Axboe goto again; 3497642e668SJens Axboe 3507642e668SJens Axboe trace_io_uring_local_work_run(ctx, ret, loops); 3517642e668SJens Axboe return ret; 3527642e668SJens Axboe } 3537642e668SJens Axboe 3547642e668SJens Axboe int io_run_local_work_locked(struct io_ring_ctx *ctx, int min_events) 3557642e668SJens Axboe { 3567642e668SJens Axboe struct io_tw_state ts = {}; 3577642e668SJens Axboe 3587642e668SJens Axboe if (!io_local_work_pending(ctx)) 3597642e668SJens Axboe return 0; 3607642e668SJens Axboe return __io_run_local_work(ctx, ts, min_events, 3617642e668SJens Axboe max(IO_LOCAL_TW_DEFAULT_MAX, min_events)); 3627642e668SJens Axboe } 3637642e668SJens Axboe 3647642e668SJens Axboe int io_run_local_work(struct io_ring_ctx *ctx, int min_events, int max_events) 3657642e668SJens Axboe { 3667642e668SJens Axboe struct io_tw_state ts = {}; 3677642e668SJens Axboe int ret; 3687642e668SJens Axboe 3697642e668SJens Axboe mutex_lock(&ctx->uring_lock); 3707642e668SJens Axboe ret = __io_run_local_work(ctx, ts, min_events, max_events); 3717642e668SJens Axboe mutex_unlock(&ctx->uring_lock); 3727642e668SJens Axboe return ret; 3737642e668SJens Axboe } 374