xref: /linux/io_uring/tw.c (revision 23b0f90ba871f096474e1c27c3d14f455189d2d9)
// SPDX-License-Identifier: GPL-2.0
/*
 * Task work handling for io_uring
 */
#include <linux/kernel.h>
#include <linux/errno.h>
#include <linux/sched/signal.h>
#include <linux/io_uring.h>
#include <linux/indirect_call_wrapper.h>

#include "io_uring.h"
#include "tctx.h"
#include "poll.h"
#include "rw.h"
#include "eventfd.h"
#include "wait.h"

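/*
 * Fallback work item for running task_work that could not be handed to the
 * target task (see io_fallback_tw() below). Drains the ring's fallback_llist
 * and runs each request's callback under ->uring_lock, then flushes any
 * completions that were generated.
 */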
void io_fallback_req_func(struct work_struct *work)
{
	struct io_ring_ctx *ctx = container_of(work, struct io_ring_ctx,
						fallback_work.work);
	struct llist_node *node = llist_del_all(&ctx->fallback_llist);
	struct io_kiocb *req, *tmp;
	struct io_tw_state ts = {};

	percpu_ref_get(&ctx->refs);
	mutex_lock(&ctx->uring_lock);
	ts.cancel = io_should_terminate_tw(ctx);
	llist_for_each_entry_safe(req, tmp, node, io_task_work.node)
		req->io_task_work.func((struct io_tw_req){req}, ts);
	io_submit_flush_completions(ctx);
	mutex_unlock(&ctx->uring_lock);
	percpu_ref_put(&ctx->refs);
}

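/*
 * Finish a batch of task_work for @ctx: clear the TASKRUN hint if it was
 * requested, flush pending completions, and drop the ->uring_lock and ctx
 * reference taken by io_handle_tw_list() when it switched to this ring.
 */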
static void ctx_flush_and_put(struct io_ring_ctx *ctx, io_tw_token_t tw)
{
	if (!ctx)
		return;
	if (ctx->flags & IORING_SETUP_TASKRUN_FLAG)
		atomic_andnot(IORING_SQ_TASKRUN, &ctx->rings->sq_flags);

	io_submit_flush_completions(ctx);
	mutex_unlock(&ctx->uring_lock);
	percpu_ref_put(&ctx->refs);
}

/*
 * Run queued task_work, returning the number of entries processed in *count.
 * If more entries than max_entries are available, stop processing once this
 * is reached and return the rest of the list.
 */
struct llist_node *io_handle_tw_list(struct llist_node *node,
				     unsigned int *count,
				     unsigned int max_entries)
{
	struct io_ring_ctx *ctx = NULL;
	struct io_tw_state ts = { };

	do {
		struct llist_node *next = node->next;
		struct io_kiocb *req = container_of(node, struct io_kiocb,
						    io_task_work.node);

		if (req->ctx != ctx) {
			ctx_flush_and_put(ctx, ts);
			ctx = req->ctx;
			mutex_lock(&ctx->uring_lock);
			percpu_ref_get(&ctx->refs);
			ts.cancel = io_should_terminate_tw(ctx);
		}
		INDIRECT_CALL_2(req->io_task_work.func,
				io_poll_task_func, io_req_rw_complete,
				(struct io_tw_req){req}, ts);
		node = next;
		(*count)++;
		if (unlikely(need_resched())) {
			ctx_flush_and_put(ctx, ts);
			ctx = NULL;
			cond_resched();
		}
	} while (node && *count < max_entries);

	ctx_flush_and_put(ctx, ts);
	return node;
}

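/*
 * Punt a list of requests to their rings' fallback work. Each request is
 * re-queued onto its ctx's fallback_llist and the delayed fallback work is
 * scheduled; with @sync set the work is also flushed, so the entries have
 * been run by the time this returns.
 */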
static __cold void __io_fallback_tw(struct llist_node *node, bool sync)
{
	struct io_ring_ctx *last_ctx = NULL;
	struct io_kiocb *req;

	while (node) {
		req = container_of(node, struct io_kiocb, io_task_work.node);
		node = node->next;
		if (last_ctx != req->ctx) {
			if (last_ctx) {
				if (sync)
					flush_delayed_work(&last_ctx->fallback_work);
				percpu_ref_put(&last_ctx->refs);
			}
			last_ctx = req->ctx;
			percpu_ref_get(&last_ctx->refs);
		}
		if (llist_add(&req->io_task_work.node, &last_ctx->fallback_llist))
			schedule_delayed_work(&last_ctx->fallback_work, 1);
	}

	if (last_ctx) {
		if (sync)
			flush_delayed_work(&last_ctx->fallback_work);
		percpu_ref_put(&last_ctx->refs);
	}
}

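/*
 * Move all task_work queued on @tctx over to the per-ring fallback work,
 * e.g. when task_work_add() fails because the task is exiting.
 */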
static void io_fallback_tw(struct io_uring_task *tctx, bool sync)
{
	struct llist_node *node = llist_del_all(&tctx->task_list);

	__io_fallback_tw(node, sync);
}

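/*
 * Run up to @max_entries task_work items queued on @tctx, adding the number
 * processed to @count. Returns whatever part of the list was not run, or
 * NULL if everything was processed.
 */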
struct llist_node *tctx_task_work_run(struct io_uring_task *tctx,
				      unsigned int max_entries,
				      unsigned int *count)
{
	struct llist_node *node;

	node = llist_del_all(&tctx->task_list);
	if (node) {
		node = llist_reverse_order(node);
		node = io_handle_tw_list(node, count, max_entries);
	}

	/* relaxed read is enough as only the task itself sets ->in_cancel */
	if (unlikely(atomic_read(&tctx->in_cancel)))
		io_uring_drop_tctx_refs(current);

	trace_io_uring_task_work_run(tctx, *count);
	return node;
}

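/*
 * task_work callback hooked up via task_work_add() in
 * io_req_normal_work_add(). Runs everything that is pending; with no entry
 * limit, nothing can be left over, hence the WARN.
 */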
void tctx_task_work(struct callback_head *cb)
{
	struct io_uring_task *tctx;
	struct llist_node *ret;
	unsigned int count = 0;

	tctx = container_of(cb, struct io_uring_task, task_work);
	ret = tctx_task_work_run(tctx, UINT_MAX, &count);
	/* can't happen */
	WARN_ON_ONCE(ret);
}

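/*
 * Queue @req on the ring's local (DEFER_TASKRUN) work list. Wakeups of the
 * submitter task are batched: with IOU_F_TWQ_LAZY_WAKE the task is only
 * woken once enough entries have accumulated to cover the waiter's
 * ->cq_wait_nr, otherwise a waiter, if there is one, is woken right away.
 */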
void io_req_local_work_add(struct io_kiocb *req, unsigned flags)
{
	struct io_ring_ctx *ctx = req->ctx;
	unsigned nr_wait, nr_tw, nr_tw_prev;
	struct llist_node *head;

	/* See comment above IO_CQ_WAKE_INIT */
	BUILD_BUG_ON(IO_CQ_WAKE_FORCE <= IORING_MAX_CQ_ENTRIES);

	/*
	 * We don't know how many requests there are in the link and whether
	 * they can even be queued lazily, so fall back to non-lazy.
	 */
	if (req->flags & IO_REQ_LINK_FLAGS)
		flags &= ~IOU_F_TWQ_LAZY_WAKE;

	guard(rcu)();

	head = READ_ONCE(ctx->work_llist.first);
	do {
		nr_tw_prev = 0;
		if (head) {
			struct io_kiocb *first_req = container_of(head,
							struct io_kiocb,
							io_task_work.node);
			/*
			 * Might be executed at any moment, rely on
			 * SLAB_TYPESAFE_BY_RCU to keep it alive.
			 */
			nr_tw_prev = READ_ONCE(first_req->nr_tw);
		}

		/*
		 * Theoretically, it can overflow, but that's fine as one of
		 * the previous adds should've tried to wake the task.
		 */
		nr_tw = nr_tw_prev + 1;
		if (!(flags & IOU_F_TWQ_LAZY_WAKE))
			nr_tw = IO_CQ_WAKE_FORCE;

		req->nr_tw = nr_tw;
		req->io_task_work.node.next = head;
	} while (!try_cmpxchg(&ctx->work_llist.first, &head,
			      &req->io_task_work.node));

	/*
	 * cmpxchg implies a full barrier, which pairs with the barrier
	 * in set_current_state() on the io_cqring_wait() side. It's used
	 * to ensure that either we see updated ->cq_wait_nr, or waiters
	 * going to sleep will observe the work added to the list, which
	 * is similar to the wait/wake task state sync.
	 */

	if (!head) {
		if (ctx->flags & IORING_SETUP_TASKRUN_FLAG)
			atomic_or(IORING_SQ_TASKRUN, &ctx->rings->sq_flags);
		if (ctx->has_evfd)
			io_eventfd_signal(ctx, false);
	}

	nr_wait = atomic_read(&ctx->cq_wait_nr);
	/* not enough or no one is waiting */
	if (nr_tw < nr_wait)
		return;
	/* the previous add has already woken it up */
	if (nr_tw_prev >= nr_wait)
		return;
	wake_up_state(ctx->submitter_task, TASK_INTERRUPTIBLE);
}

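/*
 * Queue @req as regular task_work on the task that owns the request. SQPOLL
 * rings only need TIF_NOTIFY_SIGNAL set, as the poller thread picks the work
 * up itself; if task_work_add() fails, the request is punted to the fallback
 * workqueue path instead.
 */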
void io_req_normal_work_add(struct io_kiocb *req)
{
	struct io_uring_task *tctx = req->tctx;
	struct io_ring_ctx *ctx = req->ctx;

	/* task_work already pending, we're done */
	if (!llist_add(&req->io_task_work.node, &tctx->task_list))
		return;

	if (ctx->flags & IORING_SETUP_TASKRUN_FLAG)
		atomic_or(IORING_SQ_TASKRUN, &ctx->rings->sq_flags);

	/* SQPOLL doesn't need the task_work added, it'll run it itself */
	if (ctx->flags & IORING_SETUP_SQPOLL) {
		__set_notify_signal(tctx->task);
		return;
	}

	if (likely(!task_work_add(tctx->task, &tctx->task_work, ctx->notify_method)))
		return;

	io_fallback_tw(tctx, false);
}

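/*
 * Add task_work for a request on behalf of a remote context; only valid
 * for DEFER_TASKRUN rings, which is what the WARN enforces.
 */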
void io_req_task_work_add_remote(struct io_kiocb *req, unsigned flags)
{
	if (WARN_ON_ONCE(!(req->ctx->flags & IORING_SETUP_DEFER_TASKRUN)))
		return;
	__io_req_task_work_add(req, flags);
}

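/*
 * Hand everything pending on the ring's local work lists (including the
 * retry list) to the fallback work, so it still gets run without relying
 * on the submitter task.
 */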
void __cold io_move_task_work_from_local(struct io_ring_ctx *ctx)
{
	struct llist_node *node = llist_del_all(&ctx->work_llist);

	__io_fallback_tw(node, false);
	node = llist_del_all(&ctx->retry_llist);
	__io_fallback_tw(node, false);
}

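/*
 * Decide whether __io_run_local_work() should loop again: keep going while
 * local work is pending and fewer than @min_events entries have been run.
 * Otherwise re-arm the TASKRUN hint for the leftover work and stop.
 */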
static bool io_run_local_work_continue(struct io_ring_ctx *ctx, int events,
				       int min_events)
{
	if (!io_local_work_pending(ctx))
		return false;
	if (events < min_events)
		return true;
	if (ctx->flags & IORING_SETUP_TASKRUN_FLAG)
		atomic_or(IORING_SQ_TASKRUN, &ctx->rings->sq_flags);
	return false;
}

static int __io_run_local_work_loop(struct llist_node **node,
				    io_tw_token_t tw,
				    int events)
{
	int ret = 0;

	while (*node) {
		struct llist_node *next = (*node)->next;
		struct io_kiocb *req = container_of(*node, struct io_kiocb,
						    io_task_work.node);
		INDIRECT_CALL_2(req->io_task_work.func,
				io_poll_task_func, io_req_rw_complete,
				(struct io_tw_req){req}, tw);
		*node = next;
		if (++ret >= events)
			break;
	}

	return ret;
}

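/*
 * Run DEFER_TASKRUN work for @ctx; the caller must be the submitter task
 * and must hold ->uring_lock. Entries left over from a previous run (the
 * retry list) are handled first, then batches are pulled from ->work_llist
 * until at least @min_events have been run or @max_events is reached;
 * anything not run is parked on the retry list for next time.
 */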
static int __io_run_local_work(struct io_ring_ctx *ctx, io_tw_token_t tw,
			       int min_events, int max_events)
{
	struct llist_node *node;
	unsigned int loops = 0;
	int ret = 0;

	if (WARN_ON_ONCE(ctx->submitter_task != current))
		return -EEXIST;
	if (ctx->flags & IORING_SETUP_TASKRUN_FLAG)
		atomic_andnot(IORING_SQ_TASKRUN, &ctx->rings->sq_flags);
again:
	tw.cancel = io_should_terminate_tw(ctx);
	min_events -= ret;
	ret = __io_run_local_work_loop(&ctx->retry_llist.first, tw, max_events);
	if (ctx->retry_llist.first)
		goto retry_done;

	/*
	 * The llist is in reverse order, flip it back the right way before
	 * running the pending items.
	 */
	node = llist_reverse_order(llist_del_all(&ctx->work_llist));
	ret += __io_run_local_work_loop(&node, tw, max_events - ret);
	ctx->retry_llist.first = node;
	loops++;

	if (io_run_local_work_continue(ctx, ret, min_events))
		goto again;
retry_done:
	io_submit_flush_completions(ctx);
	if (io_run_local_work_continue(ctx, ret, min_events))
		goto again;

	trace_io_uring_local_work_run(ctx, ret, loops);
	return ret;
}

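/*
 * Run pending local work with ->uring_lock already held by the caller.
 * Returns the number of entries run, or 0 if nothing was pending.
 */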
int io_run_local_work_locked(struct io_ring_ctx *ctx, int min_events)
{
	struct io_tw_state ts = {};

	if (!io_local_work_pending(ctx))
		return 0;
	return __io_run_local_work(ctx, ts, min_events,
					max(IO_LOCAL_TW_DEFAULT_MAX, min_events));
}

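/*
 * As io_run_local_work_locked(), but takes and drops ->uring_lock around
 * the run, and with an explicit @max_events cap.
 */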
int io_run_local_work(struct io_ring_ctx *ctx, int min_events, int max_events)
{
	struct io_tw_state ts = {};
	int ret;

	mutex_lock(&ctx->uring_lock);
	ret = __io_run_local_work(ctx, ts, min_events, max_events);
	mutex_unlock(&ctx->uring_lock);
	return ret;
}
356