xref: /linux/io_uring/tw.c (revision 7642e668606009fcd3fe1fc161a79ef90403d507)
// SPDX-License-Identifier: GPL-2.0
/*
 * Task work handling for io_uring
 */
#include <linux/kernel.h>
#include <linux/errno.h>
#include <linux/sched/signal.h>
#include <linux/io_uring.h>
#include <linux/indirect_call_wrapper.h>

#include "io_uring.h"
#include "tctx.h"
#include "poll.h"
#include "rw.h"
#include "eventfd.h"

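/*
 * Fallback path: requests whose task_work could not be handed to the owning
 * task (see io_fallback_tw()) end up on ctx->fallback_llist and are run here
 * from the system workqueue, under the ring's uring_lock.
 */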
void io_fallback_req_func(struct work_struct *work)
{
	struct io_ring_ctx *ctx = container_of(work, struct io_ring_ctx,
						fallback_work.work);
	struct llist_node *node = llist_del_all(&ctx->fallback_llist);
	struct io_kiocb *req, *tmp;
	struct io_tw_state ts = {};

	percpu_ref_get(&ctx->refs);
	mutex_lock(&ctx->uring_lock);
	ts.cancel = io_should_terminate_tw(ctx);
	llist_for_each_entry_safe(req, tmp, node, io_task_work.node)
		req->io_task_work.func((struct io_tw_req){req}, ts);
	io_submit_flush_completions(ctx);
	mutex_unlock(&ctx->uring_lock);
	percpu_ref_put(&ctx->refs);
}

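/*
 * Flush batched completions and drop the uring_lock and ring reference that
 * io_handle_tw_list() took when it switched to this ctx, clearing the
 * IORING_SQ_TASKRUN hint for IORING_SETUP_TASKRUN_FLAG rings. A NULL ctx
 * (nothing accumulated yet) is a no-op.
 */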
static void ctx_flush_and_put(struct io_ring_ctx *ctx, io_tw_token_t tw)
{
	if (!ctx)
		return;
	if (ctx->flags & IORING_SETUP_TASKRUN_FLAG)
		atomic_andnot(IORING_SQ_TASKRUN, &ctx->rings->sq_flags);

	io_submit_flush_completions(ctx);
	mutex_unlock(&ctx->uring_lock);
	percpu_ref_put(&ctx->refs);
}

/*
 * Run queued task_work, returning the number of entries processed in *count.
 * If more entries than max_entries are available, stop once that limit is
 * reached and return the remainder of the list.
 */
struct llist_node *io_handle_tw_list(struct llist_node *node,
				     unsigned int *count,
				     unsigned int max_entries)
{
	struct io_ring_ctx *ctx = NULL;
	struct io_tw_state ts = { };

	do {
		struct llist_node *next = node->next;
		struct io_kiocb *req = container_of(node, struct io_kiocb,
						    io_task_work.node);

		if (req->ctx != ctx) {
			ctx_flush_and_put(ctx, ts);
			ctx = req->ctx;
			mutex_lock(&ctx->uring_lock);
			percpu_ref_get(&ctx->refs);
			ts.cancel = io_should_terminate_tw(ctx);
		}
		INDIRECT_CALL_2(req->io_task_work.func,
				io_poll_task_func, io_req_rw_complete,
				(struct io_tw_req){req}, ts);
		node = next;
		(*count)++;
		if (unlikely(need_resched())) {
			ctx_flush_and_put(ctx, ts);
			ctx = NULL;
			cond_resched();
		}
	} while (node && *count < max_entries);

	ctx_flush_and_put(ctx, ts);
	return node;
}

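/*
 * Hand a list of requests over to their rings' fallback work. A reference is
 * held on each ring while its requests are being added, and with @sync the
 * fallback work is flushed before that reference is dropped.
 */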
static __cold void __io_fallback_tw(struct llist_node *node, bool sync)
{
	struct io_ring_ctx *last_ctx = NULL;
	struct io_kiocb *req;

	while (node) {
		req = container_of(node, struct io_kiocb, io_task_work.node);
		node = node->next;
		if (last_ctx != req->ctx) {
			if (last_ctx) {
				if (sync)
					flush_delayed_work(&last_ctx->fallback_work);
				percpu_ref_put(&last_ctx->refs);
			}
			last_ctx = req->ctx;
			percpu_ref_get(&last_ctx->refs);
		}
		if (llist_add(&req->io_task_work.node, &last_ctx->fallback_llist))
			schedule_delayed_work(&last_ctx->fallback_work, 1);
	}

	if (last_ctx) {
		if (sync)
			flush_delayed_work(&last_ctx->fallback_work);
		percpu_ref_put(&last_ctx->refs);
	}
}

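/*
 * Punt everything currently queued on this task's task_work list to the
 * fallback workqueue, e.g. because task_work_add() failed due to the task
 * exiting.
 */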
static void io_fallback_tw(struct io_uring_task *tctx, bool sync)
{
	struct llist_node *node = llist_del_all(&tctx->task_list);

	__io_fallback_tw(node, sync);
}

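/*
 * Grab everything queued for this task, restore submission (FIFO) order and
 * run up to max_entries of it, returning whatever is left unprocessed.
 */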
struct llist_node *tctx_task_work_run(struct io_uring_task *tctx,
				      unsigned int max_entries,
				      unsigned int *count)
{
	struct llist_node *node;

	node = llist_del_all(&tctx->task_list);
	if (node) {
		node = llist_reverse_order(node);
		node = io_handle_tw_list(node, count, max_entries);
	}

	/* relaxed read is enough as only the task itself sets ->in_cancel */
	if (unlikely(atomic_read(&tctx->in_cancel)))
		io_uring_drop_tctx_refs(current);

	trace_io_uring_task_work_run(tctx, *count);
	return node;
}

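/* task_work callback: run all pending entries, with no cap on the count */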
void tctx_task_work(struct callback_head *cb)
{
	struct io_uring_task *tctx;
	struct llist_node *ret;
	unsigned int count = 0;

	tctx = container_of(cb, struct io_uring_task, task_work);
	ret = tctx_task_work_run(tctx, UINT_MAX, &count);
	/* can't happen */
	WARN_ON_ONCE(ret);
}

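/*
 * Queue a request for DEFER_TASKRUN processing: push it onto the ring's
 * work_llist with a lock-free cmpxchg loop, then wake the submitter task if
 * it is waiting and enough work has accumulated for it (non-lazy adds always
 * count as enough).
 */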
void io_req_local_work_add(struct io_kiocb *req, unsigned flags)
{
	struct io_ring_ctx *ctx = req->ctx;
	unsigned nr_wait, nr_tw, nr_tw_prev;
	struct llist_node *head;

	/* See comment above IO_CQ_WAKE_INIT */
	BUILD_BUG_ON(IO_CQ_WAKE_FORCE <= IORING_MAX_CQ_ENTRIES);

	/*
	 * We don't know how many requests there are in the link and whether
	 * they can even be queued lazily, so fall back to non-lazy.
	 */
	if (req->flags & IO_REQ_LINK_FLAGS)
		flags &= ~IOU_F_TWQ_LAZY_WAKE;

	guard(rcu)();

	head = READ_ONCE(ctx->work_llist.first);
	do {
		nr_tw_prev = 0;
		if (head) {
			struct io_kiocb *first_req = container_of(head,
							struct io_kiocb,
							io_task_work.node);
			/*
			 * Might be executed at any moment, rely on
			 * SLAB_TYPESAFE_BY_RCU to keep it alive.
			 */
			nr_tw_prev = READ_ONCE(first_req->nr_tw);
		}

		/*
		 * Theoretically, it can overflow, but that's fine as one of
		 * the previous adds should've tried to wake the task.
		 */
		nr_tw = nr_tw_prev + 1;
		if (!(flags & IOU_F_TWQ_LAZY_WAKE))
			nr_tw = IO_CQ_WAKE_FORCE;

		req->nr_tw = nr_tw;
		req->io_task_work.node.next = head;
	} while (!try_cmpxchg(&ctx->work_llist.first, &head,
			      &req->io_task_work.node));

	/*
	 * cmpxchg implies a full barrier, which pairs with the barrier
	 * in set_current_state() on the io_cqring_wait() side. It's used
	 * to ensure that either we see the updated ->cq_wait_nr, or waiters
	 * going to sleep will observe the work added to the list, similar
	 * to the wait/wake task state sync.
	 */

	if (!head) {
		if (ctx->flags & IORING_SETUP_TASKRUN_FLAG)
			atomic_or(IORING_SQ_TASKRUN, &ctx->rings->sq_flags);
		if (ctx->has_evfd)
			io_eventfd_signal(ctx, false);
	}

	nr_wait = atomic_read(&ctx->cq_wait_nr);
	/* not enough or no one is waiting */
	if (nr_tw < nr_wait)
		return;
	/* the previous add has already woken it up */
	if (nr_tw_prev >= nr_wait)
		return;
	wake_up_state(ctx->submitter_task, TASK_INTERRUPTIBLE);
}

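/*
 * Queue a request on the owning task's task_work list and notify the task.
 * SQPOLL rings only need the notification flag set, as the poller runs the
 * work itself; if task_work_add() fails (the task is exiting), punt to the
 * fallback workqueue.
 */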
void io_req_normal_work_add(struct io_kiocb *req)
{
	struct io_uring_task *tctx = req->tctx;
	struct io_ring_ctx *ctx = req->ctx;

	/* task_work already pending, we're done */
	if (!llist_add(&req->io_task_work.node, &tctx->task_list))
		return;

	if (ctx->flags & IORING_SETUP_TASKRUN_FLAG)
		atomic_or(IORING_SQ_TASKRUN, &ctx->rings->sq_flags);

	/* SQPOLL doesn't need the task_work added, it'll run it itself */
	if (ctx->flags & IORING_SETUP_SQPOLL) {
		__set_notify_signal(tctx->task);
		return;
	}

	if (likely(!task_work_add(tctx->task, &tctx->task_work, ctx->notify_method)))
		return;

	io_fallback_tw(tctx, false);
}

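/*
 * Variant for adding task_work from a task other than the one the request
 * belongs to; only valid for DEFER_TASKRUN rings, where the work is queued
 * to the ctx rather than to a specific task.
 */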
void io_req_task_work_add_remote(struct io_kiocb *req, unsigned flags)
{
	if (WARN_ON_ONCE(!(req->ctx->flags & IORING_SETUP_DEFER_TASKRUN)))
		return;
	__io_req_task_work_add(req, flags);
}

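/*
 * Move any pending DEFER_TASKRUN work (both the main and the retry lists)
 * over to the fallback workqueue, so it still gets run when the submitter
 * task can no longer do so, e.g. during ring teardown.
 */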
void __cold io_move_task_work_from_local(struct io_ring_ctx *ctx)
{
	struct llist_node *node = llist_del_all(&ctx->work_llist);

	__io_fallback_tw(node, false);
	node = llist_del_all(&ctx->retry_llist);
	__io_fallback_tw(node, false);
}

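/*
 * Decide whether __io_run_local_work() should do another pass: keep going
 * while work is pending and fewer than min_events entries have been run.
 * When stopping with work still queued, re-set the IORING_SQ_TASKRUN hint
 * for IORING_SETUP_TASKRUN_FLAG rings so the pending work stays visible.
 */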
static bool io_run_local_work_continue(struct io_ring_ctx *ctx, int events,
				       int min_events)
{
	if (!io_local_work_pending(ctx))
		return false;
	if (events < min_events)
		return true;
	if (ctx->flags & IORING_SETUP_TASKRUN_FLAG)
		atomic_or(IORING_SQ_TASKRUN, &ctx->rings->sq_flags);
	return false;
}

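/*
 * Run at most 'events' entries from the list at *node, advancing *node as
 * entries are processed so the caller can stash any leftovers on the retry
 * list. Returns the number of entries run.
 */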
static int __io_run_local_work_loop(struct llist_node **node,
				    io_tw_token_t tw,
				    int events)
{
	int ret = 0;

	while (*node) {
		struct llist_node *next = (*node)->next;
		struct io_kiocb *req = container_of(*node, struct io_kiocb,
						    io_task_work.node);
		INDIRECT_CALL_2(req->io_task_work.func,
				io_poll_task_func, io_req_rw_complete,
				(struct io_tw_req){req}, tw);
		*node = next;
		if (++ret >= events)
			break;
	}

	return ret;
}

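/*
 * Run DEFER_TASKRUN work for the ring. Must be called by the submitter task
 * with ctx->uring_lock held. Entries left over from an earlier capped run
 * (retry_llist) are processed before freshly queued work, looping until at
 * least min_events entries have run or nothing is pending.
 */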
static int __io_run_local_work(struct io_ring_ctx *ctx, io_tw_token_t tw,
			       int min_events, int max_events)
{
	struct llist_node *node;
	unsigned int loops = 0;
	int ret = 0;

	if (WARN_ON_ONCE(ctx->submitter_task != current))
		return -EEXIST;
	if (ctx->flags & IORING_SETUP_TASKRUN_FLAG)
		atomic_andnot(IORING_SQ_TASKRUN, &ctx->rings->sq_flags);
again:
	tw.cancel = io_should_terminate_tw(ctx);
	min_events -= ret;
	ret = __io_run_local_work_loop(&ctx->retry_llist.first, tw, max_events);
	if (ctx->retry_llist.first)
		goto retry_done;

	/*
	 * llists are in reverse order, flip it back the right way before
	 * running the pending items.
	 */
	node = llist_reverse_order(llist_del_all(&ctx->work_llist));
	ret += __io_run_local_work_loop(&node, tw, max_events - ret);
	ctx->retry_llist.first = node;
	loops++;

	if (io_run_local_work_continue(ctx, ret, min_events))
		goto again;
retry_done:
	io_submit_flush_completions(ctx);
	if (io_run_local_work_continue(ctx, ret, min_events))
		goto again;

	trace_io_uring_local_work_run(ctx, ret, loops);
	return ret;
}

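/* Run local work with ctx->uring_lock already held by the caller. */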
int io_run_local_work_locked(struct io_ring_ctx *ctx, int min_events)
{
	struct io_tw_state ts = {};

	if (!io_local_work_pending(ctx))
		return 0;
	return __io_run_local_work(ctx, ts, min_events,
					max(IO_LOCAL_TW_DEFAULT_MAX, min_events));
}

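/* Like io_run_local_work_locked(), but takes and drops ctx->uring_lock itself. */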
int io_run_local_work(struct io_ring_ctx *ctx, int min_events, int max_events)
{
	struct io_tw_state ts = {};
	int ret;

	mutex_lock(&ctx->uring_lock);
	ret = __io_run_local_work(ctx, ts, min_events, max_events);
	mutex_unlock(&ctx->uring_lock);
	return ret;
}
355