xref: /linux/io_uring/tw.c (revision 576cce91480a949f5b83578300f37023b933e0a2)
1 // SPDX-License-Identifier: GPL-2.0
2 /*
3  * Task work handling for io_uring
4  */
5 #include <linux/kernel.h>
6 #include <linux/errno.h>
7 #include <linux/sched/signal.h>
8 #include <linux/io_uring.h>
9 #include <linux/indirect_call_wrapper.h>
10 
11 #include "io_uring.h"
12 #include "tctx.h"
13 #include "poll.h"
14 #include "rw.h"
15 #include "eventfd.h"
16 #include "wait.h"
17 #include "mpscq.h"
18 
19 static void ctx_flush_and_put(struct io_ring_ctx *ctx, io_tw_token_t tw)
20 {
21 	if (!ctx)
22 		return;
23 	if (ctx->flags & IORING_SETUP_TASKRUN_FLAG)
24 		atomic_andnot(IORING_SQ_TASKRUN, &ctx->rings->sq_flags);
25 
26 	io_submit_flush_completions(ctx);
27 	mutex_unlock(&ctx->uring_lock);
28 	percpu_ref_put(&ctx->refs);
29 }
30 
31 void io_tctx_fallback_work(struct work_struct *work)
32 {
33 	struct io_uring_task *tctx = container_of(work, struct io_uring_task,
34 						  fallback_work);
35 	unsigned int count = 0;
36 
37 	/* see tctx_task_work() - a set bit must always have a run coming */
38 	clear_bit(0, &tctx->tw_pending);
39 	smp_mb__after_atomic();
40 
41 	/*
42 	 * Run the entries directly. We're in PF_KTHRED context, hence
43 	 * io_should_terminate_tw() is true and they will be marked as
44 	 * canceled.
45 	 */
46 	tctx_task_work_run(tctx, UINT_MAX, &count);
47 	put_task_struct(tctx->task);
48 }
49 
50 static void io_fallback_tw(struct io_uring_task *tctx)
51 {
52 	/*
53 	 * The task ref both keeps ->task valid and, as __io_uring_free() is
54 	 * only called when the task itself is freed, ensures the tctx (and
55 	 * the queued work) stay around until the drain has run.
56 	 */
57 	get_task_struct(tctx->task);
58 	if (!queue_work(system_unbound_wq, &tctx->fallback_work))
59 		put_task_struct(tctx->task);
60 }
61 
62 /*
63  * Run queued task_work, processing no more than max_entries, with the number
64  * of entries processed added to *count. If more entries than max_entries are
65  * available, the remainder simply stay on the queue for the next run.
66  */
67 void tctx_task_work_run(struct io_uring_task *tctx, unsigned int max_entries,
68 			unsigned int *count)
69 {
70 	struct io_ring_ctx *ctx = NULL;
71 	struct io_tw_state ts = { };
72 
73 	while (*count < max_entries) {
74 		struct llist_node *node = mpscq_pop(&tctx->task_list,
75 						    &tctx->task_head);
76 		struct io_kiocb *req;
77 
78 		if (!node) {
79 			if (mpscq_empty(&tctx->task_list))
80 				break;
81 			/*
82 			 * A producer has published a node but hasn't
83 			 * linked it into the queue yet (see mpscq_pop()).
84 			 * Give it a chance to finish rather than spinning,
85 			 * and don't sit on the ctx lock while doing so.
86 			 */
87 			ctx_flush_and_put(ctx, ts);
88 			ctx = NULL;
89 			cond_resched();
90 			continue;
91 		}
92 		req = container_of(node, struct io_kiocb, io_task_work.node);
93 		if (req->ctx != ctx) {
94 			ctx_flush_and_put(ctx, ts);
95 			ctx = req->ctx;
96 			mutex_lock(&ctx->uring_lock);
97 			percpu_ref_get(&ctx->refs);
98 			ts.cancel = io_should_terminate_tw(ctx);
99 		}
100 		INDIRECT_CALL_2(req->io_task_work.func,
101 				io_poll_task_func, io_req_rw_complete,
102 				(struct io_tw_req){req}, ts);
103 		(*count)++;
104 		if (unlikely(need_resched())) {
105 			ctx_flush_and_put(ctx, ts);
106 			ctx = NULL;
107 			cond_resched();
108 		}
109 	}
110 	ctx_flush_and_put(ctx, ts);
111 
112 	/*
113 	 * Relaxed read is enough as only the task itself sets ->in_cancel.
114 	 * The tctx may also be drained by io_tctx_fallback_work(), in which
115 	 * case current is a kworker that has no tctx refs to drop.
116 	 */
117 	if (unlikely(atomic_read(&tctx->in_cancel)) &&
118 	    current->io_uring == tctx)
119 		io_uring_drop_tctx_refs(current);
120 
121 	trace_io_uring_task_work_run(tctx, *count);
122 }
123 
124 void tctx_task_work(struct callback_head *cb)
125 {
126 	struct io_uring_task *tctx;
127 	unsigned int count = 0;
128 
129 	tctx = container_of(cb, struct io_uring_task, task_work);
130 	clear_bit(0, &tctx->tw_pending);
131 	smp_mb__after_atomic();
132 	tctx_task_work_run(tctx, UINT_MAX, &count);
133 }
134 
135 /*
136  * Sets IORING_SQ_TASKRUN in the sq_flags shared with userspace, using the
137  * RCU protected rings pointer to be safe against concurrent ring resizing.
138  */
139 static void io_ctx_mark_taskrun(struct io_ring_ctx *ctx)
140 {
141 	if (ctx->flags & IORING_SETUP_TASKRUN_FLAG) {
142 		struct io_rings *rings;
143 
144 		guard(rcu)();
145 		rings = rcu_dereference(ctx->rings_rcu);
146 		atomic_or(IORING_SQ_TASKRUN, &rings->sq_flags);
147 	}
148 }
149 
150 void io_req_local_work_add(struct io_kiocb *req, unsigned flags)
151 {
152 	struct io_ring_ctx *ctx = req->ctx;
153 	int nr_wait;
154 
155 	/*
156 	 * We don't know how many requests there are in the link and whether
157 	 * they can even be queued lazily, fall back to non-lazy.
158 	 */
159 	if (req->flags & IO_REQ_LINK_FLAGS)
160 		flags &= ~IOU_F_TWQ_LAZY_WAKE;
161 
162 	/*
163 	 * The xchg() in mpscq_push() implies a full barrier, which pairs with
164 	 * the barrier in set_current_state() on the io_cqring_wait() side. This
165 	 * ensures that either we see the updated ->cq_wait_nr, or waiters going
166 	 * to sleep will observe the work added to the list, which is similar to
167 	 * the wait/wake task state sync.
168 	 */
169 	if (mpscq_push(&ctx->work_list, &req->io_task_work.node)) {
170 		io_ctx_mark_taskrun(ctx);
171 		if (data_race(ctx->int_flags) & IO_RING_F_HAS_EVFD)
172 			io_eventfd_signal(ctx, false);
173 	}
174 
175 	/*
176 	 * No one is waiting (IO_CQ_WAKE_INIT), or this cycle's wake up has
177 	 * already been issued (zero or negative, see below).
178 	 */
179 	nr_wait = atomic_read(&ctx->cq_wait_nr);
180 	if (nr_wait <= 0)
181 		return;
182 	if (flags & IOU_F_TWQ_LAZY_WAKE) {
183 		/*
184 		 * ->cq_wait_nr counts down the number of lazy adds, once it
185 		 * hits zero we're good to wake the waiter. A producer that
186 		 * gets delayed between pushing its entry and getting here
187 		 * may count down a later wait cycle. That's OK, it'll be an
188 		 * early wake, not a lost one.
189 		 */
190 		if (!atomic_dec_and_test(&ctx->cq_wait_nr))
191 			return;
192 	} else if (atomic_xchg(&ctx->cq_wait_nr, IO_CQ_WAKE_INIT) <= 0) {
193 		/*
194 		 * Potentially raced with lazy add, claim the wake. A value
195 		 * <= 0 means a lazy add hit zero or another forced add
196 		 * claimed IO_CQ_WAKE_INIT. Either way, the wake up for this
197 		 * wait cycle has already been done.
198 		 */
199 		return;
200 	}
201 	wake_up_state(ctx->submitter_task, TASK_INTERRUPTIBLE);
202 }
203 
204 void io_req_normal_work_add(struct io_kiocb *req)
205 {
206 	struct io_uring_task *tctx = req->tctx;
207 	struct io_ring_ctx *ctx = req->ctx;
208 
209 	/* task_work already pending, we're done */
210 	if (!mpscq_push(&tctx->task_list, &req->io_task_work.node))
211 		return;
212 
213 	/*
214 	 * Doesn't need to use ->rings_rcu, as resizing isn't supported for
215 	 * !DEFER_TASKRUN.
216 	 */
217 	if (ctx->flags & IORING_SETUP_TASKRUN_FLAG)
218 		atomic_or(IORING_SQ_TASKRUN, &ctx->rings->sq_flags);
219 
220 	/* SQPOLL doesn't need the task_work added, it'll run it itself */
221 	if (ctx->flags & IORING_SETUP_SQPOLL) {
222 		__set_notify_signal(tctx->task);
223 		return;
224 	}
225 
226 	/* task_work must only be added once */
227 	if (test_and_set_bit(0, &tctx->tw_pending))
228 		return;
229 
230 	if (likely(!task_work_add(tctx->task, &tctx->task_work, ctx->notify_method)))
231 		return;
232 
233 	io_fallback_tw(tctx);
234 }
235 
236 void io_req_task_work_add_remote(struct io_kiocb *req, unsigned flags)
237 {
238 	if (WARN_ON_ONCE(!(req->ctx->flags & IORING_SETUP_DEFER_TASKRUN)))
239 		return;
240 	__io_req_task_work_add(req, flags);
241 }
242 
243 void __cold io_cancel_local_task_work(struct io_ring_ctx *ctx)
244 {
245 	struct io_tw_state ts = { .cancel = true };
246 	struct llist_node *node;
247 
248 	/*
249 	 * The work list consumer side is serialized by ->uring_lock, see
250 	 * __io_run_local_work(). Grab it to guard against racing with normal
251 	 * task_work running, as the task may be exiting. The ring is going
252 	 * away, run the entries in cancel mode right here - the callers
253 	 * provide the same process context the per-ctx fallback work that
254 	 * they were previously punted to ran in.
255 	 */
256 	guard(mutex)(&ctx->uring_lock);
257 
258 	while (!mpscq_empty(&ctx->work_list)) {
259 		struct io_kiocb *req;
260 
261 		node = mpscq_pop(&ctx->work_list, &ctx->work_head);
262 		if (!node) {
263 			/* a producer is mid-push, wait for it to link */
264 			cond_resched();
265 			continue;
266 		}
267 		req = container_of(node, struct io_kiocb, io_task_work.node);
268 		req->io_task_work.func((struct io_tw_req){req}, ts);
269 	}
270 	io_submit_flush_completions(ctx);
271 }
272 
273 static bool io_run_local_work_continue(struct io_ring_ctx *ctx, int events,
274 				       int min_events)
275 {
276 	if (!io_local_work_pending(ctx))
277 		return false;
278 	if (events < min_events)
279 		return true;
280 	if (ctx->flags & IORING_SETUP_TASKRUN_FLAG)
281 		atomic_or(IORING_SQ_TASKRUN, &ctx->rings->sq_flags);
282 	return false;
283 }
284 
285 static int __io_run_local_work_loop(struct io_ring_ctx *ctx,
286 				    io_tw_token_t tw,
287 				    int events)
288 {
289 	int ret = 0;
290 
291 	while (ret < events) {
292 		struct llist_node *node = mpscq_pop(&ctx->work_list, &ctx->work_head);
293 		struct io_kiocb *req;
294 
295 		if (!node)
296 			break;
297 		req = container_of(node, struct io_kiocb, io_task_work.node);
298 		INDIRECT_CALL_2(req->io_task_work.func,
299 				io_poll_task_func, io_req_rw_complete,
300 				(struct io_tw_req){req}, tw);
301 		ret++;
302 	}
303 
304 	return ret;
305 }
306 
307 static int __io_run_local_work(struct io_ring_ctx *ctx, io_tw_token_t tw,
308 			       int min_events, int max_events)
309 {
310 	unsigned int loops = 0;
311 	int ret = 0;
312 
313 	if (WARN_ON_ONCE(ctx->submitter_task != current))
314 		return -EEXIST;
315 	if (ctx->flags & IORING_SETUP_TASKRUN_FLAG)
316 		atomic_andnot(IORING_SQ_TASKRUN, &ctx->rings->sq_flags);
317 again:
318 	/*
319 	 * If the last loop made no progress while work is still pending,
320 	 * a producer has published a node but hasn't linked it into the
321 	 * queue yet (see mpscq_pop()). Give it a chance to finish rather
322 	 * than spinning on the queue.
323 	 */
324 	if (unlikely(loops && !ret))
325 		cond_resched();
326 	tw.cancel = io_should_terminate_tw(ctx);
327 	min_events -= ret;
328 	ret = __io_run_local_work_loop(ctx, tw, max_events);
329 	loops++;
330 
331 	if (io_run_local_work_continue(ctx, ret, min_events))
332 		goto again;
333 	io_submit_flush_completions(ctx);
334 	if (io_run_local_work_continue(ctx, ret, min_events))
335 		goto again;
336 
337 	trace_io_uring_local_work_run(ctx, ret, loops);
338 	return ret;
339 }
340 
341 int io_run_local_work_locked(struct io_ring_ctx *ctx, int min_events)
342 {
343 	struct io_tw_state ts = {};
344 
345 	if (!io_local_work_pending(ctx))
346 		return 0;
347 	return __io_run_local_work(ctx, ts, min_events,
348 					max(IO_LOCAL_TW_DEFAULT_MAX, min_events));
349 }
350 
351 int io_run_local_work(struct io_ring_ctx *ctx, int min_events, int max_events)
352 {
353 	struct io_tw_state ts = {};
354 	int ret;
355 
356 	mutex_lock(&ctx->uring_lock);
357 	ret = __io_run_local_work(ctx, ts, min_events, max_events);
358 	mutex_unlock(&ctx->uring_lock);
359 	return ret;
360 }
361