xref: /linux/io_uring/wait.c (revision 35c2c39832e569449b9192fa1afbbc4c66227af7)
1 // SPDX-License-Identifier: GPL-2.0
2 /*
3  * Waiting for completion events
4  */
5 #include <linux/kernel.h>
6 #include <linux/sched/signal.h>
7 #include <linux/io_uring.h>
8 
9 #include <trace/events/io_uring.h>
10 
11 #include <uapi/linux/io_uring.h>
12 
13 #include "io_uring.h"
14 #include "napi.h"
15 #include "wait.h"
16 
17 static int io_wake_function(struct wait_queue_entry *curr, unsigned int mode,
18 			    int wake_flags, void *key)
19 {
20 	struct io_wait_queue *iowq = container_of(curr, struct io_wait_queue, wq);
21 
22 	/*
23 	 * Cannot safely flush overflowed CQEs from here, ensure we wake up
24 	 * the task, and the next invocation will do it.
25 	 */
26 	if (io_should_wake(iowq) || io_has_work(iowq->ctx))
27 		return autoremove_wake_function(curr, mode, wake_flags, key);
28 	return -1;
29 }
30 
31 int io_run_task_work_sig(struct io_ring_ctx *ctx)
32 {
33 	if (io_local_work_pending(ctx)) {
34 		__set_current_state(TASK_RUNNING);
35 		if (io_run_local_work(ctx, INT_MAX, IO_LOCAL_TW_DEFAULT_MAX) > 0)
36 			return 0;
37 	}
38 	if (io_run_task_work() > 0)
39 		return 0;
40 	if (task_sigpending(current))
41 		return -EINTR;
42 	return 0;
43 }
44 
45 static bool current_pending_io(void)
46 {
47 	struct io_uring_task *tctx = current->io_uring;
48 
49 	if (!tctx)
50 		return false;
51 	return percpu_counter_read_positive(&tctx->inflight);
52 }
53 
54 static enum hrtimer_restart io_cqring_timer_wakeup(struct hrtimer *timer)
55 {
56 	struct io_wait_queue *iowq = container_of(timer, struct io_wait_queue, t);
57 
58 	WRITE_ONCE(iowq->hit_timeout, 1);
59 	iowq->min_timeout = 0;
60 	wake_up_process(iowq->wq.private);
61 	return HRTIMER_NORESTART;
62 }
63 
64 /*
65  * Doing min_timeout portion. If we saw any timeouts, events, or have work,
66  * wake up. If not, and we have a normal timeout, switch to that and keep
67  * sleeping.
68  */
69 static enum hrtimer_restart io_cqring_min_timer_wakeup(struct hrtimer *timer)
70 {
71 	struct io_wait_queue *iowq = container_of(timer, struct io_wait_queue, t);
72 	struct io_ring_ctx *ctx = iowq->ctx;
73 
74 	/* no general timeout, or shorter (or equal), we are done */
75 	if (iowq->timeout == KTIME_MAX ||
76 	    ktime_compare(iowq->min_timeout, iowq->timeout) >= 0)
77 		goto out_wake;
78 	/* work we may need to run, wake function will see if we need to wake */
79 	if (io_has_work(ctx))
80 		goto out_wake;
81 	/* got events since we started waiting, min timeout is done */
82 	scoped_guard(rcu) {
83 		struct io_rings *rings = io_get_rings(ctx);
84 
85 		if (iowq->cq_min_tail != READ_ONCE(rings->cq.tail))
86 			goto out_wake;
87 		/* if we have any events and min timeout expired, we're done */
88 		if (io_cqring_events(ctx))
89 			goto out_wake;
90 	}
91 	/*
92 	 * If using deferred task_work running and application is waiting on
93 	 * more than one request, ensure we reset it now where we are switching
94 	 * to normal sleeps. Any request completion post min_wait should wake
95 	 * the task and return.
96 	 */
97 	if (ctx->flags & IORING_SETUP_DEFER_TASKRUN) {
98 		atomic_set(&ctx->cq_wait_nr, 1);
99 		smp_mb();
100 		if (!llist_empty(&ctx->work_llist))
101 			goto out_wake;
102 	}
103 
104 	/* any generated CQE posted past this time should wake us up */
105 	iowq->cq_tail = iowq->cq_min_tail;
106 
107 	hrtimer_update_function(&iowq->t, io_cqring_timer_wakeup);
108 	hrtimer_set_expires(timer, iowq->timeout);
109 	return HRTIMER_RESTART;
110 out_wake:
111 	return io_cqring_timer_wakeup(timer);
112 }
113 
114 static int io_cqring_schedule_timeout(struct io_wait_queue *iowq,
115 				      clockid_t clock_id, ktime_t start_time)
116 {
117 	ktime_t timeout;
118 
119 	if (iowq->min_timeout) {
120 		timeout = ktime_add_ns(iowq->min_timeout, start_time);
121 		hrtimer_setup_on_stack(&iowq->t, io_cqring_min_timer_wakeup, clock_id,
122 				       HRTIMER_MODE_ABS);
123 	} else {
124 		timeout = iowq->timeout;
125 		hrtimer_setup_on_stack(&iowq->t, io_cqring_timer_wakeup, clock_id,
126 				       HRTIMER_MODE_ABS);
127 	}
128 
129 	hrtimer_set_expires_range_ns(&iowq->t, timeout, 0);
130 	hrtimer_start_expires(&iowq->t, HRTIMER_MODE_ABS);
131 
132 	if (!READ_ONCE(iowq->hit_timeout))
133 		schedule();
134 
135 	hrtimer_cancel(&iowq->t);
136 	destroy_hrtimer_on_stack(&iowq->t);
137 	__set_current_state(TASK_RUNNING);
138 
139 	return READ_ONCE(iowq->hit_timeout) ? -ETIME : 0;
140 }
141 
142 static int __io_cqring_wait_schedule(struct io_ring_ctx *ctx,
143 				     struct io_wait_queue *iowq,
144 				     struct ext_arg *ext_arg,
145 				     ktime_t start_time)
146 {
147 	int ret = 0;
148 
149 	/*
150 	 * Mark us as being in io_wait if we have pending requests, so cpufreq
151 	 * can take into account that the task is waiting for IO - turns out
152 	 * to be important for low QD IO.
153 	 */
154 	if (ext_arg->iowait && current_pending_io())
155 		current->in_iowait = 1;
156 	if (iowq->timeout != KTIME_MAX || iowq->min_timeout)
157 		ret = io_cqring_schedule_timeout(iowq, ctx->clockid, start_time);
158 	else
159 		schedule();
160 	current->in_iowait = 0;
161 	return ret;
162 }
163 
164 /* If this returns > 0, the caller should retry */
165 static inline int io_cqring_wait_schedule(struct io_ring_ctx *ctx,
166 					  struct io_wait_queue *iowq,
167 					  struct ext_arg *ext_arg,
168 					  ktime_t start_time)
169 {
170 	if (unlikely(READ_ONCE(ctx->check_cq)))
171 		return 1;
172 	if (unlikely(io_local_work_pending(ctx)))
173 		return 1;
174 	if (unlikely(task_work_pending(current)))
175 		return 1;
176 	if (unlikely(task_sigpending(current)))
177 		return -EINTR;
178 	if (unlikely(io_should_wake(iowq)))
179 		return 0;
180 
181 	return __io_cqring_wait_schedule(ctx, iowq, ext_arg, start_time);
182 }
183 
184 /*
185  * Wait until events become available, if we don't already have some. The
186  * application must reap them itself, as they reside on the shared cq ring.
187  */
188 int io_cqring_wait(struct io_ring_ctx *ctx, int min_events, u32 flags,
189 		   struct ext_arg *ext_arg)
190 {
191 	struct io_wait_queue iowq;
192 	struct io_rings *rings;
193 	ktime_t start_time;
194 	int ret, nr_wait;
195 
196 	min_events = min_t(int, min_events, ctx->cq_entries);
197 
198 	if (!io_allowed_run_tw(ctx))
199 		return -EEXIST;
200 	if (io_local_work_pending(ctx))
201 		io_run_local_work(ctx, min_events,
202 				  max(IO_LOCAL_TW_DEFAULT_MAX, min_events));
203 	io_run_task_work();
204 
205 	if (unlikely(test_bit(IO_CHECK_CQ_OVERFLOW_BIT, &ctx->check_cq)))
206 		io_cqring_do_overflow_flush(ctx);
207 
208 	rcu_read_lock();
209 	rings = io_get_rings(ctx);
210 	if (__io_cqring_events_user(ctx) >= min_events) {
211 		rcu_read_unlock();
212 		return 0;
213 	}
214 
215 	init_waitqueue_func_entry(&iowq.wq, io_wake_function);
216 	iowq.wq.private = current;
217 	INIT_LIST_HEAD(&iowq.wq.entry);
218 	iowq.ctx = ctx;
219 	iowq.cq_tail = READ_ONCE(rings->cq.head) + min_events;
220 	iowq.cq_min_tail = READ_ONCE(rings->cq.tail);
221 	nr_wait = (int) iowq.cq_tail - READ_ONCE(rings->cq.tail);
222 	rcu_read_unlock();
223 	rings = NULL;
224 	iowq.nr_timeouts = atomic_read(&ctx->cq_timeouts);
225 	iowq.hit_timeout = 0;
226 	iowq.min_timeout = ext_arg->min_time;
227 	iowq.timeout = KTIME_MAX;
228 	start_time = io_get_time(ctx);
229 
230 	if (ext_arg->ts_set) {
231 		iowq.timeout = timespec64_to_ktime(ext_arg->ts);
232 		if (!(flags & IORING_ENTER_ABS_TIMER))
233 			iowq.timeout = ktime_add(iowq.timeout, start_time);
234 	}
235 
236 	if (ext_arg->sig) {
237 #ifdef CONFIG_COMPAT
238 		if (in_compat_syscall())
239 			ret = set_compat_user_sigmask((const compat_sigset_t __user *)ext_arg->sig,
240 						      ext_arg->argsz);
241 		else
242 #endif
243 			ret = set_user_sigmask(ext_arg->sig, ext_arg->argsz);
244 
245 		if (ret)
246 			return ret;
247 	}
248 
249 	io_napi_busy_loop(ctx, &iowq);
250 
251 	trace_io_uring_cqring_wait(ctx, min_events);
252 	do {
253 		unsigned long check_cq;
254 
255 		if (ctx->flags & IORING_SETUP_DEFER_TASKRUN) {
256 			atomic_set(&ctx->cq_wait_nr, nr_wait);
257 			set_current_state(TASK_INTERRUPTIBLE);
258 		} else {
259 			prepare_to_wait_exclusive(&ctx->cq_wait, &iowq.wq,
260 							TASK_INTERRUPTIBLE);
261 		}
262 
263 		ret = io_cqring_wait_schedule(ctx, &iowq, ext_arg, start_time);
264 		__set_current_state(TASK_RUNNING);
265 		atomic_set(&ctx->cq_wait_nr, IO_CQ_WAKE_INIT);
266 
267 		/*
268 		 * Run task_work after scheduling and before io_should_wake().
269 		 * If we got woken because of task_work being processed, run it
270 		 * now rather than let the caller do another wait loop.
271 		 */
272 		if (io_local_work_pending(ctx))
273 			io_run_local_work(ctx, nr_wait, nr_wait);
274 		io_run_task_work();
275 
276 		/*
277 		 * Non-local task_work will be run on exit to userspace, but
278 		 * if we're using DEFER_TASKRUN, then we could have waited
279 		 * with a timeout for a number of requests. If the timeout
280 		 * hits, we could have some requests ready to process. Ensure
281 		 * this break is _after_ we have run task_work, to avoid
282 		 * deferring running potentially pending requests until the
283 		 * next time we wait for events.
284 		 */
285 		if (ret < 0)
286 			break;
287 
288 		check_cq = READ_ONCE(ctx->check_cq);
289 		if (unlikely(check_cq)) {
290 			/* let the caller flush overflows, retry */
291 			if (check_cq & BIT(IO_CHECK_CQ_OVERFLOW_BIT))
292 				io_cqring_do_overflow_flush(ctx);
293 			if (check_cq & BIT(IO_CHECK_CQ_DROPPED_BIT)) {
294 				ret = -EBADR;
295 				break;
296 			}
297 		}
298 
299 		if (io_should_wake(&iowq)) {
300 			ret = 0;
301 			break;
302 		}
303 		cond_resched();
304 
305 		/* if min timeout has been hit, don't reset wait count */
306 		if (!iowq.hit_timeout)
307 			scoped_guard(rcu)
308 				nr_wait = (int) iowq.cq_tail -
309 						READ_ONCE(io_get_rings(ctx)->cq.tail);
310 		else
311 			nr_wait = 1;
312 	} while (1);
313 
314 	if (!(ctx->flags & IORING_SETUP_DEFER_TASKRUN))
315 		finish_wait(&ctx->cq_wait, &iowq.wq);
316 	restore_saved_sigmask_unless(ret == -EINTR);
317 
318 	guard(rcu)();
319 	return READ_ONCE(io_get_rings(ctx)->cq.head) == READ_ONCE(io_get_rings(ctx)->cq.tail) ? ret : 0;
320 }
321