xref: /linux/io_uring/tw.c (revision ff3ac1a0bd75400375678c0f81c2b613cbc03e14)
1 // SPDX-License-Identifier: GPL-2.0
2 /*
3  * Task work handling for io_uring
4  */
5 #include <linux/kernel.h>
6 #include <linux/errno.h>
7 #include <linux/sched/signal.h>
8 #include <linux/io_uring.h>
9 #include <linux/indirect_call_wrapper.h>
10 
11 #include "io_uring.h"
12 #include "tctx.h"
13 #include "poll.h"
14 #include "rw.h"
15 #include "eventfd.h"
16 #include "wait.h"
17 #include "mpscq.h"
18 
19 static void ctx_flush_and_put(struct io_ring_ctx *ctx, io_tw_token_t tw)
20 {
21 	if (!ctx)
22 		return;
23 	if (ctx->flags & IORING_SETUP_TASKRUN_FLAG)
24 		atomic_andnot(IORING_SQ_TASKRUN, &ctx->rings->sq_flags);
25 
26 	io_submit_flush_completions(ctx);
27 	mutex_unlock(&ctx->uring_lock);
28 	percpu_ref_put(&ctx->refs);
29 }
30 
31 void io_tctx_fallback_work(struct work_struct *work)
32 {
33 	struct io_uring_task *tctx = container_of(work, struct io_uring_task,
34 						  fallback_work);
35 	unsigned int count = 0;
36 
37 	/*
38 	 * Run the entries directly. We're in PF_KTHRED context, hence
39 	 * io_should_terminate_tw() is true and they will be marked as
40 	 * canceled.
41 	 */
42 	tctx_task_work_run(tctx, UINT_MAX, &count);
43 	put_task_struct(tctx->task);
44 }
45 
46 static void io_fallback_tw(struct io_uring_task *tctx)
47 {
48 	/*
49 	 * The task ref both keeps ->task valid and, as __io_uring_free() is
50 	 * only called when the task itself is freed, ensures the tctx (and
51 	 * the queued work) stay around until the drain has run.
52 	 */
53 	get_task_struct(tctx->task);
54 	if (!queue_work(system_dfl_wq, &tctx->fallback_work))
55 		put_task_struct(tctx->task);
56 }
57 
58 /*
59  * Run queued task_work, processing no more than max_entries, with the number
60  * of entries processed added to *count. If more entries than max_entries are
61  * available, the remainder simply stay on the queue for the next run.
62  */
63 void tctx_task_work_run(struct io_uring_task *tctx, unsigned int max_entries,
64 			unsigned int *count)
65 {
66 	struct io_ring_ctx *ctx = NULL;
67 	struct io_tw_state ts = { };
68 
69 	while (*count < max_entries) {
70 		struct llist_node *node = mpscq_pop(&tctx->task_list,
71 						    &tctx->task_head);
72 		struct io_kiocb *req;
73 
74 		if (!node) {
75 			if (mpscq_empty(&tctx->task_list))
76 				break;
77 			/*
78 			 * A producer has published a node but hasn't
79 			 * linked it into the queue yet (see mpscq_pop()).
80 			 * Give it a chance to finish rather than spinning,
81 			 * and don't sit on the ctx lock while doing so.
82 			 */
83 			ctx_flush_and_put(ctx, ts);
84 			ctx = NULL;
85 			cond_resched();
86 			continue;
87 		}
88 		req = container_of(node, struct io_kiocb, io_task_work.node);
89 		if (req->ctx != ctx) {
90 			ctx_flush_and_put(ctx, ts);
91 			ctx = req->ctx;
92 			mutex_lock(&ctx->uring_lock);
93 			percpu_ref_get(&ctx->refs);
94 			ts.cancel = io_should_terminate_tw(ctx);
95 		}
96 		INDIRECT_CALL_2(req->io_task_work.func,
97 				io_poll_task_func, io_req_rw_complete,
98 				(struct io_tw_req){req}, ts);
99 		(*count)++;
100 		/*
101 		 * Break if most recent pop emptied the queue. This helps
102 		 * bound task_work run, and also protects the regular
103 		 * task_work addition.
104 		 */
105 		if (mpscq_pop_emptied(&tctx->task_list, tctx->task_head))
106 			break;
107 		if (unlikely(need_resched())) {
108 			ctx_flush_and_put(ctx, ts);
109 			ctx = NULL;
110 			cond_resched();
111 		}
112 	}
113 	ctx_flush_and_put(ctx, ts);
114 
115 	/*
116 	 * Relaxed read is enough as only the task itself sets ->in_cancel.
117 	 * The tctx may also be drained by io_tctx_fallback_work(), in which
118 	 * case current is a kworker that has no tctx refs to drop.
119 	 */
120 	if (unlikely(atomic_read(&tctx->in_cancel)) &&
121 	    current->io_uring == tctx)
122 		io_uring_drop_tctx_refs(current);
123 
124 	trace_io_uring_task_work_run(tctx, *count);
125 }
126 
127 void tctx_task_work(struct callback_head *cb)
128 {
129 	struct io_uring_task *tctx;
130 	unsigned int count = 0;
131 
132 	tctx = container_of(cb, struct io_uring_task, task_work);
133 	tctx_task_work_run(tctx, UINT_MAX, &count);
134 }
135 
136 /*
137  * Sets IORING_SQ_TASKRUN in the sq_flags shared with userspace, using the
138  * RCU protected rings pointer to be safe against concurrent ring resizing.
139  */
140 static void io_ctx_mark_taskrun(struct io_ring_ctx *ctx)
141 {
142 	if (ctx->flags & IORING_SETUP_TASKRUN_FLAG) {
143 		struct io_rings *rings;
144 
145 		guard(rcu)();
146 		rings = rcu_dereference(ctx->rings_rcu);
147 		atomic_or(IORING_SQ_TASKRUN, &rings->sq_flags);
148 	}
149 }
150 
151 void io_req_local_work_add(struct io_kiocb *req, unsigned flags)
152 {
153 	struct io_ring_ctx *ctx = req->ctx;
154 	int nr_wait;
155 
156 	/*
157 	 * We don't know how many requests there are in the link and whether
158 	 * they can even be queued lazily, fall back to non-lazy.
159 	 */
160 	if (req->flags & IO_REQ_LINK_FLAGS)
161 		flags &= ~IOU_F_TWQ_LAZY_WAKE;
162 
163 	/*
164 	 * The xchg() in mpscq_push() implies a full barrier, which pairs with
165 	 * the barrier in set_current_state() on the io_cqring_wait() side. This
166 	 * ensures that either we see the updated ->cq_wait_nr, or waiters going
167 	 * to sleep will observe the work added to the list, which is similar to
168 	 * the wait/wake task state sync.
169 	 */
170 	if (mpscq_push(&ctx->work_list, &req->io_task_work.node)) {
171 		io_ctx_mark_taskrun(ctx);
172 		if (data_race(ctx->int_flags) & IO_RING_F_HAS_EVFD)
173 			io_eventfd_signal(ctx, false);
174 	}
175 
176 	/*
177 	 * No one is waiting (IO_CQ_WAKE_INIT), or this cycle's wake up has
178 	 * already been issued (zero or negative, see below).
179 	 */
180 	nr_wait = atomic_read(&ctx->cq_wait_nr);
181 	if (nr_wait <= 0)
182 		return;
183 	if (flags & IOU_F_TWQ_LAZY_WAKE) {
184 		/*
185 		 * ->cq_wait_nr counts down the number of lazy adds, once it
186 		 * hits zero we're good to wake the waiter. A producer that
187 		 * gets delayed between pushing its entry and getting here
188 		 * may count down a later wait cycle. That's OK, it'll be an
189 		 * early wake, not a lost one.
190 		 */
191 		if (!atomic_dec_and_test(&ctx->cq_wait_nr))
192 			return;
193 	} else if (atomic_xchg(&ctx->cq_wait_nr, IO_CQ_WAKE_INIT) <= 0) {
194 		/*
195 		 * Potentially raced with lazy add, claim the wake. A value
196 		 * <= 0 means a lazy add hit zero or another forced add
197 		 * claimed IO_CQ_WAKE_INIT. Either way, the wake up for this
198 		 * wait cycle has already been done.
199 		 */
200 		return;
201 	}
202 	wake_up_state(ctx->submitter_task, TASK_INTERRUPTIBLE);
203 }
204 
205 void io_req_normal_work_add(struct io_kiocb *req)
206 {
207 	struct io_uring_task *tctx = req->tctx;
208 	struct io_ring_ctx *ctx = req->ctx;
209 
210 	/* tw run already pending, nothing else to do */
211 	if (!mpscq_push(&tctx->task_list, &req->io_task_work.node))
212 		return;
213 
214 	/*
215 	 * Doesn't need to use ->rings_rcu, as resizing isn't supported for
216 	 * !DEFER_TASKRUN.
217 	 */
218 	if (ctx->flags & IORING_SETUP_TASKRUN_FLAG)
219 		atomic_or(IORING_SQ_TASKRUN, &ctx->rings->sq_flags);
220 
221 	/* SQPOLL doesn't need the task_work added, it'll run it itself */
222 	if (ctx->flags & IORING_SETUP_SQPOLL) {
223 		__set_notify_signal(tctx->task);
224 		return;
225 	}
226 
227 	if (likely(!task_work_add(tctx->task, &tctx->task_work, ctx->notify_method)))
228 		return;
229 
230 	io_fallback_tw(tctx);
231 }
232 
233 void io_req_task_work_add_remote(struct io_kiocb *req, unsigned flags)
234 {
235 	if (WARN_ON_ONCE(!(req->ctx->flags & IORING_SETUP_DEFER_TASKRUN)))
236 		return;
237 	__io_req_task_work_add(req, flags);
238 }
239 
240 void __cold io_cancel_local_task_work(struct io_ring_ctx *ctx)
241 {
242 	struct io_tw_state ts = { .cancel = true };
243 	struct llist_node *node;
244 
245 	/*
246 	 * The work list consumer side is serialized by ->uring_lock, see
247 	 * __io_run_local_work(). Grab it to guard against racing with normal
248 	 * task_work running, as the task may be exiting. The ring is going
249 	 * away, run the entries in cancel mode right here - the callers
250 	 * provide the same process context the per-ctx fallback work that
251 	 * they were previously punted to ran in.
252 	 */
253 	guard(mutex)(&ctx->uring_lock);
254 
255 	while (!mpscq_empty(&ctx->work_list)) {
256 		struct io_kiocb *req;
257 
258 		node = mpscq_pop(&ctx->work_list, &ctx->work_head);
259 		if (!node) {
260 			/* a producer is mid-push, wait for it to link */
261 			cond_resched();
262 			continue;
263 		}
264 		req = container_of(node, struct io_kiocb, io_task_work.node);
265 		req->io_task_work.func((struct io_tw_req){req}, ts);
266 	}
267 	io_submit_flush_completions(ctx);
268 }
269 
270 static bool io_run_local_work_continue(struct io_ring_ctx *ctx, int events,
271 				       int min_events)
272 {
273 	if (!io_local_work_pending(ctx))
274 		return false;
275 	if (events < min_events)
276 		return true;
277 	if (ctx->flags & IORING_SETUP_TASKRUN_FLAG)
278 		atomic_or(IORING_SQ_TASKRUN, &ctx->rings->sq_flags);
279 	return false;
280 }
281 
282 static int __io_run_local_work_loop(struct io_ring_ctx *ctx,
283 				    io_tw_token_t tw,
284 				    int events)
285 {
286 	int ret = 0;
287 
288 	while (ret < events) {
289 		struct llist_node *node = mpscq_pop(&ctx->work_list, &ctx->work_head);
290 		struct io_kiocb *req;
291 
292 		if (!node)
293 			break;
294 		req = container_of(node, struct io_kiocb, io_task_work.node);
295 		INDIRECT_CALL_2(req->io_task_work.func,
296 				io_poll_task_func, io_req_rw_complete,
297 				(struct io_tw_req){req}, tw);
298 		ret++;
299 	}
300 
301 	return ret;
302 }
303 
304 static int __io_run_local_work(struct io_ring_ctx *ctx, io_tw_token_t tw,
305 			       int min_events, int max_events)
306 {
307 	unsigned int loops = 0;
308 	int ret = 0;
309 
310 	if (WARN_ON_ONCE(ctx->submitter_task != current))
311 		return -EEXIST;
312 	if (ctx->flags & IORING_SETUP_TASKRUN_FLAG)
313 		atomic_andnot(IORING_SQ_TASKRUN, &ctx->rings->sq_flags);
314 again:
315 	/*
316 	 * If the last loop made no progress while work is still pending,
317 	 * a producer has published a node but hasn't linked it into the
318 	 * queue yet (see mpscq_pop()). Give it a chance to finish rather
319 	 * than spinning on the queue.
320 	 */
321 	if (unlikely(loops && !ret))
322 		cond_resched();
323 	tw.cancel = io_should_terminate_tw(ctx);
324 	min_events -= ret;
325 	ret = __io_run_local_work_loop(ctx, tw, max_events);
326 	loops++;
327 
328 	if (io_run_local_work_continue(ctx, ret, min_events))
329 		goto again;
330 	io_submit_flush_completions(ctx);
331 	if (io_run_local_work_continue(ctx, ret, min_events))
332 		goto again;
333 
334 	trace_io_uring_local_work_run(ctx, ret, loops);
335 	return ret;
336 }
337 
338 int io_run_local_work_locked(struct io_ring_ctx *ctx, int min_events)
339 {
340 	struct io_tw_state ts = {};
341 
342 	if (!io_local_work_pending(ctx))
343 		return 0;
344 	return __io_run_local_work(ctx, ts, min_events,
345 					max(IO_LOCAL_TW_DEFAULT_MAX, min_events));
346 }
347 
348 int io_run_local_work(struct io_ring_ctx *ctx, int min_events, int max_events)
349 {
350 	struct io_tw_state ts = {};
351 	int ret;
352 
353 	mutex_lock(&ctx->uring_lock);
354 	ret = __io_run_local_work(ctx, ts, min_events, max_events);
355 	mutex_unlock(&ctx->uring_lock);
356 	return ret;
357 }
358