xref: /linux/io_uring/tw.c (revision 96189080265e6bb5dde3a4afbaf947af493e3f82)
17642e668SJens Axboe // SPDX-License-Identifier: GPL-2.0
27642e668SJens Axboe /*
37642e668SJens Axboe  * Task work handling for io_uring
47642e668SJens Axboe  */
57642e668SJens Axboe #include <linux/kernel.h>
67642e668SJens Axboe #include <linux/errno.h>
77642e668SJens Axboe #include <linux/sched/signal.h>
87642e668SJens Axboe #include <linux/io_uring.h>
97642e668SJens Axboe #include <linux/indirect_call_wrapper.h>
107642e668SJens Axboe 
117642e668SJens Axboe #include "io_uring.h"
127642e668SJens Axboe #include "tctx.h"
137642e668SJens Axboe #include "poll.h"
147642e668SJens Axboe #include "rw.h"
157642e668SJens Axboe #include "eventfd.h"
160105b056SJens Axboe #include "wait.h"
177642e668SJens Axboe 
187642e668SJens Axboe void io_fallback_req_func(struct work_struct *work)
197642e668SJens Axboe {
207642e668SJens Axboe 	struct io_ring_ctx *ctx = container_of(work, struct io_ring_ctx,
217642e668SJens Axboe 						fallback_work.work);
227642e668SJens Axboe 	struct llist_node *node = llist_del_all(&ctx->fallback_llist);
237642e668SJens Axboe 	struct io_kiocb *req, *tmp;
247642e668SJens Axboe 	struct io_tw_state ts = {};
257642e668SJens Axboe 
267642e668SJens Axboe 	percpu_ref_get(&ctx->refs);
277642e668SJens Axboe 	mutex_lock(&ctx->uring_lock);
287642e668SJens Axboe 	ts.cancel = io_should_terminate_tw(ctx);
297642e668SJens Axboe 	llist_for_each_entry_safe(req, tmp, node, io_task_work.node)
307642e668SJens Axboe 		req->io_task_work.func((struct io_tw_req){req}, ts);
317642e668SJens Axboe 	io_submit_flush_completions(ctx);
327642e668SJens Axboe 	mutex_unlock(&ctx->uring_lock);
337642e668SJens Axboe 	percpu_ref_put(&ctx->refs);
347642e668SJens Axboe }
357642e668SJens Axboe 
367642e668SJens Axboe static void ctx_flush_and_put(struct io_ring_ctx *ctx, io_tw_token_t tw)
377642e668SJens Axboe {
387642e668SJens Axboe 	if (!ctx)
397642e668SJens Axboe 		return;
407642e668SJens Axboe 	if (ctx->flags & IORING_SETUP_TASKRUN_FLAG)
417642e668SJens Axboe 		atomic_andnot(IORING_SQ_TASKRUN, &ctx->rings->sq_flags);
427642e668SJens Axboe 
437642e668SJens Axboe 	io_submit_flush_completions(ctx);
447642e668SJens Axboe 	mutex_unlock(&ctx->uring_lock);
457642e668SJens Axboe 	percpu_ref_put(&ctx->refs);
467642e668SJens Axboe }
477642e668SJens Axboe 
487642e668SJens Axboe /*
497642e668SJens Axboe  * Run queued task_work, returning the number of entries processed in *count.
507642e668SJens Axboe  * If more entries than max_entries are available, stop processing once this
517642e668SJens Axboe  * is reached and return the rest of the list.
527642e668SJens Axboe  */
537642e668SJens Axboe struct llist_node *io_handle_tw_list(struct llist_node *node,
547642e668SJens Axboe 				     unsigned int *count,
557642e668SJens Axboe 				     unsigned int max_entries)
567642e668SJens Axboe {
577642e668SJens Axboe 	struct io_ring_ctx *ctx = NULL;
587642e668SJens Axboe 	struct io_tw_state ts = { };
597642e668SJens Axboe 
607642e668SJens Axboe 	do {
617642e668SJens Axboe 		struct llist_node *next = node->next;
627642e668SJens Axboe 		struct io_kiocb *req = container_of(node, struct io_kiocb,
637642e668SJens Axboe 						    io_task_work.node);
647642e668SJens Axboe 
657642e668SJens Axboe 		if (req->ctx != ctx) {
667642e668SJens Axboe 			ctx_flush_and_put(ctx, ts);
677642e668SJens Axboe 			ctx = req->ctx;
687642e668SJens Axboe 			mutex_lock(&ctx->uring_lock);
697642e668SJens Axboe 			percpu_ref_get(&ctx->refs);
707642e668SJens Axboe 			ts.cancel = io_should_terminate_tw(ctx);
717642e668SJens Axboe 		}
727642e668SJens Axboe 		INDIRECT_CALL_2(req->io_task_work.func,
737642e668SJens Axboe 				io_poll_task_func, io_req_rw_complete,
747642e668SJens Axboe 				(struct io_tw_req){req}, ts);
757642e668SJens Axboe 		node = next;
767642e668SJens Axboe 		(*count)++;
777642e668SJens Axboe 		if (unlikely(need_resched())) {
787642e668SJens Axboe 			ctx_flush_and_put(ctx, ts);
797642e668SJens Axboe 			ctx = NULL;
807642e668SJens Axboe 			cond_resched();
817642e668SJens Axboe 		}
827642e668SJens Axboe 	} while (node && *count < max_entries);
837642e668SJens Axboe 
847642e668SJens Axboe 	ctx_flush_and_put(ctx, ts);
857642e668SJens Axboe 	return node;
867642e668SJens Axboe }
877642e668SJens Axboe 
887642e668SJens Axboe static __cold void __io_fallback_tw(struct llist_node *node, bool sync)
897642e668SJens Axboe {
907642e668SJens Axboe 	struct io_ring_ctx *last_ctx = NULL;
917642e668SJens Axboe 	struct io_kiocb *req;
927642e668SJens Axboe 
937642e668SJens Axboe 	while (node) {
947642e668SJens Axboe 		req = container_of(node, struct io_kiocb, io_task_work.node);
957642e668SJens Axboe 		node = node->next;
967642e668SJens Axboe 		if (last_ctx != req->ctx) {
977642e668SJens Axboe 			if (last_ctx) {
987642e668SJens Axboe 				if (sync)
997642e668SJens Axboe 					flush_delayed_work(&last_ctx->fallback_work);
1007642e668SJens Axboe 				percpu_ref_put(&last_ctx->refs);
1017642e668SJens Axboe 			}
1027642e668SJens Axboe 			last_ctx = req->ctx;
1037642e668SJens Axboe 			percpu_ref_get(&last_ctx->refs);
1047642e668SJens Axboe 		}
1057642e668SJens Axboe 		if (llist_add(&req->io_task_work.node, &last_ctx->fallback_llist))
1067642e668SJens Axboe 			schedule_delayed_work(&last_ctx->fallback_work, 1);
1077642e668SJens Axboe 	}
1087642e668SJens Axboe 
1097642e668SJens Axboe 	if (last_ctx) {
1107642e668SJens Axboe 		if (sync)
1117642e668SJens Axboe 			flush_delayed_work(&last_ctx->fallback_work);
1127642e668SJens Axboe 		percpu_ref_put(&last_ctx->refs);
1137642e668SJens Axboe 	}
1147642e668SJens Axboe }
1157642e668SJens Axboe 
1167642e668SJens Axboe static void io_fallback_tw(struct io_uring_task *tctx, bool sync)
1177642e668SJens Axboe {
1187642e668SJens Axboe 	struct llist_node *node = llist_del_all(&tctx->task_list);
1197642e668SJens Axboe 
1207642e668SJens Axboe 	__io_fallback_tw(node, sync);
1217642e668SJens Axboe }
1227642e668SJens Axboe 
1237642e668SJens Axboe struct llist_node *tctx_task_work_run(struct io_uring_task *tctx,
1247642e668SJens Axboe 				      unsigned int max_entries,
1257642e668SJens Axboe 				      unsigned int *count)
1267642e668SJens Axboe {
1277642e668SJens Axboe 	struct llist_node *node;
1287642e668SJens Axboe 
1297642e668SJens Axboe 	node = llist_del_all(&tctx->task_list);
1307642e668SJens Axboe 	if (node) {
1317642e668SJens Axboe 		node = llist_reverse_order(node);
1327642e668SJens Axboe 		node = io_handle_tw_list(node, count, max_entries);
1337642e668SJens Axboe 	}
1347642e668SJens Axboe 
1357642e668SJens Axboe 	/* relaxed read is enough as only the task itself sets ->in_cancel */
1367642e668SJens Axboe 	if (unlikely(atomic_read(&tctx->in_cancel)))
1377642e668SJens Axboe 		io_uring_drop_tctx_refs(current);
1387642e668SJens Axboe 
1397642e668SJens Axboe 	trace_io_uring_task_work_run(tctx, *count);
1407642e668SJens Axboe 	return node;
1417642e668SJens Axboe }
1427642e668SJens Axboe 
1437642e668SJens Axboe void tctx_task_work(struct callback_head *cb)
1447642e668SJens Axboe {
1457642e668SJens Axboe 	struct io_uring_task *tctx;
1467642e668SJens Axboe 	struct llist_node *ret;
1477642e668SJens Axboe 	unsigned int count = 0;
1487642e668SJens Axboe 
1497642e668SJens Axboe 	tctx = container_of(cb, struct io_uring_task, task_work);
1507642e668SJens Axboe 	ret = tctx_task_work_run(tctx, UINT_MAX, &count);
1517642e668SJens Axboe 	/* can't happen */
1527642e668SJens Axboe 	WARN_ON_ONCE(ret);
1537642e668SJens Axboe }
1547642e668SJens Axboe 
155*96189080SJens Axboe /*
156*96189080SJens Axboe  * Sets IORING_SQ_TASKRUN in the sq_flags shared with userspace, using the
157*96189080SJens Axboe  * RCU protected rings pointer to be safe against concurrent ring resizing.
158*96189080SJens Axboe  */
159*96189080SJens Axboe static void io_ctx_mark_taskrun(struct io_ring_ctx *ctx)
160*96189080SJens Axboe {
161*96189080SJens Axboe 	lockdep_assert_in_rcu_read_lock();
162*96189080SJens Axboe 
163*96189080SJens Axboe 	if (ctx->flags & IORING_SETUP_TASKRUN_FLAG) {
164*96189080SJens Axboe 		struct io_rings *rings = rcu_dereference(ctx->rings_rcu);
165*96189080SJens Axboe 
166*96189080SJens Axboe 		atomic_or(IORING_SQ_TASKRUN, &rings->sq_flags);
167*96189080SJens Axboe 	}
168*96189080SJens Axboe }
169*96189080SJens Axboe 
1707642e668SJens Axboe void io_req_local_work_add(struct io_kiocb *req, unsigned flags)
1717642e668SJens Axboe {
1727642e668SJens Axboe 	struct io_ring_ctx *ctx = req->ctx;
1737642e668SJens Axboe 	unsigned nr_wait, nr_tw, nr_tw_prev;
1747642e668SJens Axboe 	struct llist_node *head;
1757642e668SJens Axboe 
1767642e668SJens Axboe 	/* See comment above IO_CQ_WAKE_INIT */
1777642e668SJens Axboe 	BUILD_BUG_ON(IO_CQ_WAKE_FORCE <= IORING_MAX_CQ_ENTRIES);
1787642e668SJens Axboe 
1797642e668SJens Axboe 	/*
1807642e668SJens Axboe 	 * We don't know how many requests there are in the link and whether
1817642e668SJens Axboe 	 * they can even be queued lazily, fall back to non-lazy.
1827642e668SJens Axboe 	 */
1837642e668SJens Axboe 	if (req->flags & IO_REQ_LINK_FLAGS)
1847642e668SJens Axboe 		flags &= ~IOU_F_TWQ_LAZY_WAKE;
1857642e668SJens Axboe 
1867642e668SJens Axboe 	guard(rcu)();
1877642e668SJens Axboe 
1887642e668SJens Axboe 	head = READ_ONCE(ctx->work_llist.first);
1897642e668SJens Axboe 	do {
1907642e668SJens Axboe 		nr_tw_prev = 0;
1917642e668SJens Axboe 		if (head) {
1927642e668SJens Axboe 			struct io_kiocb *first_req = container_of(head,
1937642e668SJens Axboe 							struct io_kiocb,
1947642e668SJens Axboe 							io_task_work.node);
1957642e668SJens Axboe 			/*
1967642e668SJens Axboe 			 * Might be executed at any moment, rely on
1977642e668SJens Axboe 			 * SLAB_TYPESAFE_BY_RCU to keep it alive.
1987642e668SJens Axboe 			 */
1997642e668SJens Axboe 			nr_tw_prev = READ_ONCE(first_req->nr_tw);
2007642e668SJens Axboe 		}
2017642e668SJens Axboe 
2027642e668SJens Axboe 		/*
2037642e668SJens Axboe 		 * Theoretically, it can overflow, but that's fine as one of
2047642e668SJens Axboe 		 * previous adds should've tried to wake the task.
2057642e668SJens Axboe 		 */
2067642e668SJens Axboe 		nr_tw = nr_tw_prev + 1;
2077642e668SJens Axboe 		if (!(flags & IOU_F_TWQ_LAZY_WAKE))
2087642e668SJens Axboe 			nr_tw = IO_CQ_WAKE_FORCE;
2097642e668SJens Axboe 
2107642e668SJens Axboe 		req->nr_tw = nr_tw;
2117642e668SJens Axboe 		req->io_task_work.node.next = head;
2127642e668SJens Axboe 	} while (!try_cmpxchg(&ctx->work_llist.first, &head,
2137642e668SJens Axboe 			      &req->io_task_work.node));
2147642e668SJens Axboe 
2157642e668SJens Axboe 	/*
2167642e668SJens Axboe 	 * cmpxchg implies a full barrier, which pairs with the barrier
2177642e668SJens Axboe 	 * in set_current_state() on the io_cqring_wait() side. It's used
2187642e668SJens Axboe 	 * to ensure that either we see updated ->cq_wait_nr, or waiters
2197642e668SJens Axboe 	 * going to sleep will observe the work added to the list, which
2207642e668SJens Axboe 	 * is similar to the wait/wawke task state sync.
2217642e668SJens Axboe 	 */
2227642e668SJens Axboe 
2237642e668SJens Axboe 	if (!head) {
224*96189080SJens Axboe 		io_ctx_mark_taskrun(ctx);
2257642e668SJens Axboe 		if (ctx->has_evfd)
2267642e668SJens Axboe 			io_eventfd_signal(ctx, false);
2277642e668SJens Axboe 	}
2287642e668SJens Axboe 
2297642e668SJens Axboe 	nr_wait = atomic_read(&ctx->cq_wait_nr);
2307642e668SJens Axboe 	/* not enough or no one is waiting */
2317642e668SJens Axboe 	if (nr_tw < nr_wait)
2327642e668SJens Axboe 		return;
2337642e668SJens Axboe 	/* the previous add has already woken it up */
2347642e668SJens Axboe 	if (nr_tw_prev >= nr_wait)
2357642e668SJens Axboe 		return;
2367642e668SJens Axboe 	wake_up_state(ctx->submitter_task, TASK_INTERRUPTIBLE);
2377642e668SJens Axboe }
2387642e668SJens Axboe 
2397642e668SJens Axboe void io_req_normal_work_add(struct io_kiocb *req)
2407642e668SJens Axboe {
2417642e668SJens Axboe 	struct io_uring_task *tctx = req->tctx;
2427642e668SJens Axboe 	struct io_ring_ctx *ctx = req->ctx;
2437642e668SJens Axboe 
2447642e668SJens Axboe 	/* task_work already pending, we're done */
2457642e668SJens Axboe 	if (!llist_add(&req->io_task_work.node, &tctx->task_list))
2467642e668SJens Axboe 		return;
2477642e668SJens Axboe 
248*96189080SJens Axboe 	/*
249*96189080SJens Axboe 	 * Doesn't need to use ->rings_rcu, as resizing isn't supported for
250*96189080SJens Axboe 	 * !DEFER_TASKRUN.
251*96189080SJens Axboe 	 */
2527642e668SJens Axboe 	if (ctx->flags & IORING_SETUP_TASKRUN_FLAG)
2537642e668SJens Axboe 		atomic_or(IORING_SQ_TASKRUN, &ctx->rings->sq_flags);
2547642e668SJens Axboe 
2557642e668SJens Axboe 	/* SQPOLL doesn't need the task_work added, it'll run it itself */
2567642e668SJens Axboe 	if (ctx->flags & IORING_SETUP_SQPOLL) {
2577642e668SJens Axboe 		__set_notify_signal(tctx->task);
2587642e668SJens Axboe 		return;
2597642e668SJens Axboe 	}
2607642e668SJens Axboe 
2617642e668SJens Axboe 	if (likely(!task_work_add(tctx->task, &tctx->task_work, ctx->notify_method)))
2627642e668SJens Axboe 		return;
2637642e668SJens Axboe 
2647642e668SJens Axboe 	io_fallback_tw(tctx, false);
2657642e668SJens Axboe }
2667642e668SJens Axboe 
2677642e668SJens Axboe void io_req_task_work_add_remote(struct io_kiocb *req, unsigned flags)
2687642e668SJens Axboe {
2697642e668SJens Axboe 	if (WARN_ON_ONCE(!(req->ctx->flags & IORING_SETUP_DEFER_TASKRUN)))
2707642e668SJens Axboe 		return;
2717642e668SJens Axboe 	__io_req_task_work_add(req, flags);
2727642e668SJens Axboe }
2737642e668SJens Axboe 
2747642e668SJens Axboe void __cold io_move_task_work_from_local(struct io_ring_ctx *ctx)
2757642e668SJens Axboe {
2767642e668SJens Axboe 	struct llist_node *node = llist_del_all(&ctx->work_llist);
2777642e668SJens Axboe 
2787642e668SJens Axboe 	__io_fallback_tw(node, false);
2797642e668SJens Axboe 	node = llist_del_all(&ctx->retry_llist);
2807642e668SJens Axboe 	__io_fallback_tw(node, false);
2817642e668SJens Axboe }
2827642e668SJens Axboe 
2837642e668SJens Axboe static bool io_run_local_work_continue(struct io_ring_ctx *ctx, int events,
2847642e668SJens Axboe 				       int min_events)
2857642e668SJens Axboe {
2867642e668SJens Axboe 	if (!io_local_work_pending(ctx))
2877642e668SJens Axboe 		return false;
2887642e668SJens Axboe 	if (events < min_events)
2897642e668SJens Axboe 		return true;
2907642e668SJens Axboe 	if (ctx->flags & IORING_SETUP_TASKRUN_FLAG)
2917642e668SJens Axboe 		atomic_or(IORING_SQ_TASKRUN, &ctx->rings->sq_flags);
2927642e668SJens Axboe 	return false;
2937642e668SJens Axboe }
2947642e668SJens Axboe 
2957642e668SJens Axboe static int __io_run_local_work_loop(struct llist_node **node,
2967642e668SJens Axboe 				    io_tw_token_t tw,
2977642e668SJens Axboe 				    int events)
2987642e668SJens Axboe {
2997642e668SJens Axboe 	int ret = 0;
3007642e668SJens Axboe 
3017642e668SJens Axboe 	while (*node) {
3027642e668SJens Axboe 		struct llist_node *next = (*node)->next;
3037642e668SJens Axboe 		struct io_kiocb *req = container_of(*node, struct io_kiocb,
3047642e668SJens Axboe 						    io_task_work.node);
3057642e668SJens Axboe 		INDIRECT_CALL_2(req->io_task_work.func,
3067642e668SJens Axboe 				io_poll_task_func, io_req_rw_complete,
3077642e668SJens Axboe 				(struct io_tw_req){req}, tw);
3087642e668SJens Axboe 		*node = next;
3097642e668SJens Axboe 		if (++ret >= events)
3107642e668SJens Axboe 			break;
3117642e668SJens Axboe 	}
3127642e668SJens Axboe 
3137642e668SJens Axboe 	return ret;
3147642e668SJens Axboe }
3157642e668SJens Axboe 
3167642e668SJens Axboe static int __io_run_local_work(struct io_ring_ctx *ctx, io_tw_token_t tw,
3177642e668SJens Axboe 			       int min_events, int max_events)
3187642e668SJens Axboe {
3197642e668SJens Axboe 	struct llist_node *node;
3207642e668SJens Axboe 	unsigned int loops = 0;
3217642e668SJens Axboe 	int ret = 0;
3227642e668SJens Axboe 
3237642e668SJens Axboe 	if (WARN_ON_ONCE(ctx->submitter_task != current))
3247642e668SJens Axboe 		return -EEXIST;
3257642e668SJens Axboe 	if (ctx->flags & IORING_SETUP_TASKRUN_FLAG)
3267642e668SJens Axboe 		atomic_andnot(IORING_SQ_TASKRUN, &ctx->rings->sq_flags);
3277642e668SJens Axboe again:
3287642e668SJens Axboe 	tw.cancel = io_should_terminate_tw(ctx);
3297642e668SJens Axboe 	min_events -= ret;
3307642e668SJens Axboe 	ret = __io_run_local_work_loop(&ctx->retry_llist.first, tw, max_events);
3317642e668SJens Axboe 	if (ctx->retry_llist.first)
3327642e668SJens Axboe 		goto retry_done;
3337642e668SJens Axboe 
3347642e668SJens Axboe 	/*
3357642e668SJens Axboe 	 * llists are in reverse order, flip it back the right way before
3367642e668SJens Axboe 	 * running the pending items.
3377642e668SJens Axboe 	 */
3387642e668SJens Axboe 	node = llist_reverse_order(llist_del_all(&ctx->work_llist));
3397642e668SJens Axboe 	ret += __io_run_local_work_loop(&node, tw, max_events - ret);
3407642e668SJens Axboe 	ctx->retry_llist.first = node;
3417642e668SJens Axboe 	loops++;
3427642e668SJens Axboe 
3437642e668SJens Axboe 	if (io_run_local_work_continue(ctx, ret, min_events))
3447642e668SJens Axboe 		goto again;
3457642e668SJens Axboe retry_done:
3467642e668SJens Axboe 	io_submit_flush_completions(ctx);
3477642e668SJens Axboe 	if (io_run_local_work_continue(ctx, ret, min_events))
3487642e668SJens Axboe 		goto again;
3497642e668SJens Axboe 
3507642e668SJens Axboe 	trace_io_uring_local_work_run(ctx, ret, loops);
3517642e668SJens Axboe 	return ret;
3527642e668SJens Axboe }
3537642e668SJens Axboe 
3547642e668SJens Axboe int io_run_local_work_locked(struct io_ring_ctx *ctx, int min_events)
3557642e668SJens Axboe {
3567642e668SJens Axboe 	struct io_tw_state ts = {};
3577642e668SJens Axboe 
3587642e668SJens Axboe 	if (!io_local_work_pending(ctx))
3597642e668SJens Axboe 		return 0;
3607642e668SJens Axboe 	return __io_run_local_work(ctx, ts, min_events,
3617642e668SJens Axboe 					max(IO_LOCAL_TW_DEFAULT_MAX, min_events));
3627642e668SJens Axboe }
3637642e668SJens Axboe 
3647642e668SJens Axboe int io_run_local_work(struct io_ring_ctx *ctx, int min_events, int max_events)
3657642e668SJens Axboe {
3667642e668SJens Axboe 	struct io_tw_state ts = {};
3677642e668SJens Axboe 	int ret;
3687642e668SJens Axboe 
3697642e668SJens Axboe 	mutex_lock(&ctx->uring_lock);
3707642e668SJens Axboe 	ret = __io_run_local_work(ctx, ts, min_events, max_events);
3717642e668SJens Axboe 	mutex_unlock(&ctx->uring_lock);
3727642e668SJens Axboe 	return ret;
3737642e668SJens Axboe }
374