xref: /linux/io_uring/wait.c (revision 37a93dd5c49b5fda807fd204edf2547c3493319c)
1 // SPDX-License-Identifier: GPL-2.0
2 /*
3  * Waiting for completion events
4  */
5 #include <linux/kernel.h>
6 #include <linux/sched/signal.h>
7 #include <linux/io_uring.h>
8 
9 #include <trace/events/io_uring.h>
10 
11 #include <uapi/linux/io_uring.h>
12 
13 #include "io_uring.h"
14 #include "napi.h"
15 #include "wait.h"
16 
17 static int io_wake_function(struct wait_queue_entry *curr, unsigned int mode,
18 			    int wake_flags, void *key)
19 {
20 	struct io_wait_queue *iowq = container_of(curr, struct io_wait_queue, wq);
21 
22 	/*
23 	 * Cannot safely flush overflowed CQEs from here, ensure we wake up
24 	 * the task, and the next invocation will do it.
25 	 */
26 	if (io_should_wake(iowq) || io_has_work(iowq->ctx))
27 		return autoremove_wake_function(curr, mode, wake_flags, key);
28 	return -1;
29 }
30 
31 int io_run_task_work_sig(struct io_ring_ctx *ctx)
32 {
33 	if (io_local_work_pending(ctx)) {
34 		__set_current_state(TASK_RUNNING);
35 		if (io_run_local_work(ctx, INT_MAX, IO_LOCAL_TW_DEFAULT_MAX) > 0)
36 			return 0;
37 	}
38 	if (io_run_task_work() > 0)
39 		return 0;
40 	if (task_sigpending(current))
41 		return -EINTR;
42 	return 0;
43 }
44 
45 static bool current_pending_io(void)
46 {
47 	struct io_uring_task *tctx = current->io_uring;
48 
49 	if (!tctx)
50 		return false;
51 	return percpu_counter_read_positive(&tctx->inflight);
52 }
53 
54 static enum hrtimer_restart io_cqring_timer_wakeup(struct hrtimer *timer)
55 {
56 	struct io_wait_queue *iowq = container_of(timer, struct io_wait_queue, t);
57 
58 	WRITE_ONCE(iowq->hit_timeout, 1);
59 	iowq->min_timeout = 0;
60 	wake_up_process(iowq->wq.private);
61 	return HRTIMER_NORESTART;
62 }
63 
64 /*
65  * Doing min_timeout portion. If we saw any timeouts, events, or have work,
66  * wake up. If not, and we have a normal timeout, switch to that and keep
67  * sleeping.
68  */
69 static enum hrtimer_restart io_cqring_min_timer_wakeup(struct hrtimer *timer)
70 {
71 	struct io_wait_queue *iowq = container_of(timer, struct io_wait_queue, t);
72 	struct io_ring_ctx *ctx = iowq->ctx;
73 
74 	/* no general timeout, or shorter (or equal), we are done */
75 	if (iowq->timeout == KTIME_MAX ||
76 	    ktime_compare(iowq->min_timeout, iowq->timeout) >= 0)
77 		goto out_wake;
78 	/* work we may need to run, wake function will see if we need to wake */
79 	if (io_has_work(ctx))
80 		goto out_wake;
81 	/* got events since we started waiting, min timeout is done */
82 	if (iowq->cq_min_tail != READ_ONCE(ctx->rings->cq.tail))
83 		goto out_wake;
84 	/* if we have any events and min timeout expired, we're done */
85 	if (io_cqring_events(ctx))
86 		goto out_wake;
87 
88 	/*
89 	 * If using deferred task_work running and application is waiting on
90 	 * more than one request, ensure we reset it now where we are switching
91 	 * to normal sleeps. Any request completion post min_wait should wake
92 	 * the task and return.
93 	 */
94 	if (ctx->flags & IORING_SETUP_DEFER_TASKRUN) {
95 		atomic_set(&ctx->cq_wait_nr, 1);
96 		smp_mb();
97 		if (!llist_empty(&ctx->work_llist))
98 			goto out_wake;
99 	}
100 
101 	/* any generated CQE posted past this time should wake us up */
102 	iowq->cq_tail = iowq->cq_min_tail;
103 
104 	hrtimer_update_function(&iowq->t, io_cqring_timer_wakeup);
105 	hrtimer_set_expires(timer, iowq->timeout);
106 	return HRTIMER_RESTART;
107 out_wake:
108 	return io_cqring_timer_wakeup(timer);
109 }
110 
111 static int io_cqring_schedule_timeout(struct io_wait_queue *iowq,
112 				      clockid_t clock_id, ktime_t start_time)
113 {
114 	ktime_t timeout;
115 
116 	if (iowq->min_timeout) {
117 		timeout = ktime_add_ns(iowq->min_timeout, start_time);
118 		hrtimer_setup_on_stack(&iowq->t, io_cqring_min_timer_wakeup, clock_id,
119 				       HRTIMER_MODE_ABS);
120 	} else {
121 		timeout = iowq->timeout;
122 		hrtimer_setup_on_stack(&iowq->t, io_cqring_timer_wakeup, clock_id,
123 				       HRTIMER_MODE_ABS);
124 	}
125 
126 	hrtimer_set_expires_range_ns(&iowq->t, timeout, 0);
127 	hrtimer_start_expires(&iowq->t, HRTIMER_MODE_ABS);
128 
129 	if (!READ_ONCE(iowq->hit_timeout))
130 		schedule();
131 
132 	hrtimer_cancel(&iowq->t);
133 	destroy_hrtimer_on_stack(&iowq->t);
134 	__set_current_state(TASK_RUNNING);
135 
136 	return READ_ONCE(iowq->hit_timeout) ? -ETIME : 0;
137 }
138 
139 static int __io_cqring_wait_schedule(struct io_ring_ctx *ctx,
140 				     struct io_wait_queue *iowq,
141 				     struct ext_arg *ext_arg,
142 				     ktime_t start_time)
143 {
144 	int ret = 0;
145 
146 	/*
147 	 * Mark us as being in io_wait if we have pending requests, so cpufreq
148 	 * can take into account that the task is waiting for IO - turns out
149 	 * to be important for low QD IO.
150 	 */
151 	if (ext_arg->iowait && current_pending_io())
152 		current->in_iowait = 1;
153 	if (iowq->timeout != KTIME_MAX || iowq->min_timeout)
154 		ret = io_cqring_schedule_timeout(iowq, ctx->clockid, start_time);
155 	else
156 		schedule();
157 	current->in_iowait = 0;
158 	return ret;
159 }
160 
161 /* If this returns > 0, the caller should retry */
162 static inline int io_cqring_wait_schedule(struct io_ring_ctx *ctx,
163 					  struct io_wait_queue *iowq,
164 					  struct ext_arg *ext_arg,
165 					  ktime_t start_time)
166 {
167 	if (unlikely(READ_ONCE(ctx->check_cq)))
168 		return 1;
169 	if (unlikely(io_local_work_pending(ctx)))
170 		return 1;
171 	if (unlikely(task_work_pending(current)))
172 		return 1;
173 	if (unlikely(task_sigpending(current)))
174 		return -EINTR;
175 	if (unlikely(io_should_wake(iowq)))
176 		return 0;
177 
178 	return __io_cqring_wait_schedule(ctx, iowq, ext_arg, start_time);
179 }
180 
181 /*
182  * Wait until events become available, if we don't already have some. The
183  * application must reap them itself, as they reside on the shared cq ring.
184  */
185 int io_cqring_wait(struct io_ring_ctx *ctx, int min_events, u32 flags,
186 		   struct ext_arg *ext_arg)
187 {
188 	struct io_wait_queue iowq;
189 	struct io_rings *rings = ctx->rings;
190 	ktime_t start_time;
191 	int ret;
192 
193 	min_events = min_t(int, min_events, ctx->cq_entries);
194 
195 	if (!io_allowed_run_tw(ctx))
196 		return -EEXIST;
197 	if (io_local_work_pending(ctx))
198 		io_run_local_work(ctx, min_events,
199 				  max(IO_LOCAL_TW_DEFAULT_MAX, min_events));
200 	io_run_task_work();
201 
202 	if (unlikely(test_bit(IO_CHECK_CQ_OVERFLOW_BIT, &ctx->check_cq)))
203 		io_cqring_do_overflow_flush(ctx);
204 	if (__io_cqring_events_user(ctx) >= min_events)
205 		return 0;
206 
207 	init_waitqueue_func_entry(&iowq.wq, io_wake_function);
208 	iowq.wq.private = current;
209 	INIT_LIST_HEAD(&iowq.wq.entry);
210 	iowq.ctx = ctx;
211 	iowq.cq_tail = READ_ONCE(ctx->rings->cq.head) + min_events;
212 	iowq.cq_min_tail = READ_ONCE(ctx->rings->cq.tail);
213 	iowq.nr_timeouts = atomic_read(&ctx->cq_timeouts);
214 	iowq.hit_timeout = 0;
215 	iowq.min_timeout = ext_arg->min_time;
216 	iowq.timeout = KTIME_MAX;
217 	start_time = io_get_time(ctx);
218 
219 	if (ext_arg->ts_set) {
220 		iowq.timeout = timespec64_to_ktime(ext_arg->ts);
221 		if (!(flags & IORING_ENTER_ABS_TIMER))
222 			iowq.timeout = ktime_add(iowq.timeout, start_time);
223 	}
224 
225 	if (ext_arg->sig) {
226 #ifdef CONFIG_COMPAT
227 		if (in_compat_syscall())
228 			ret = set_compat_user_sigmask((const compat_sigset_t __user *)ext_arg->sig,
229 						      ext_arg->argsz);
230 		else
231 #endif
232 			ret = set_user_sigmask(ext_arg->sig, ext_arg->argsz);
233 
234 		if (ret)
235 			return ret;
236 	}
237 
238 	io_napi_busy_loop(ctx, &iowq);
239 
240 	trace_io_uring_cqring_wait(ctx, min_events);
241 	do {
242 		unsigned long check_cq;
243 		int nr_wait;
244 
245 		/* if min timeout has been hit, don't reset wait count */
246 		if (!iowq.hit_timeout)
247 			nr_wait = (int) iowq.cq_tail -
248 					READ_ONCE(ctx->rings->cq.tail);
249 		else
250 			nr_wait = 1;
251 
252 		if (ctx->flags & IORING_SETUP_DEFER_TASKRUN) {
253 			atomic_set(&ctx->cq_wait_nr, nr_wait);
254 			set_current_state(TASK_INTERRUPTIBLE);
255 		} else {
256 			prepare_to_wait_exclusive(&ctx->cq_wait, &iowq.wq,
257 							TASK_INTERRUPTIBLE);
258 		}
259 
260 		ret = io_cqring_wait_schedule(ctx, &iowq, ext_arg, start_time);
261 		__set_current_state(TASK_RUNNING);
262 		atomic_set(&ctx->cq_wait_nr, IO_CQ_WAKE_INIT);
263 
264 		/*
265 		 * Run task_work after scheduling and before io_should_wake().
266 		 * If we got woken because of task_work being processed, run it
267 		 * now rather than let the caller do another wait loop.
268 		 */
269 		if (io_local_work_pending(ctx))
270 			io_run_local_work(ctx, nr_wait, nr_wait);
271 		io_run_task_work();
272 
273 		/*
274 		 * Non-local task_work will be run on exit to userspace, but
275 		 * if we're using DEFER_TASKRUN, then we could have waited
276 		 * with a timeout for a number of requests. If the timeout
277 		 * hits, we could have some requests ready to process. Ensure
278 		 * this break is _after_ we have run task_work, to avoid
279 		 * deferring running potentially pending requests until the
280 		 * next time we wait for events.
281 		 */
282 		if (ret < 0)
283 			break;
284 
285 		check_cq = READ_ONCE(ctx->check_cq);
286 		if (unlikely(check_cq)) {
287 			/* let the caller flush overflows, retry */
288 			if (check_cq & BIT(IO_CHECK_CQ_OVERFLOW_BIT))
289 				io_cqring_do_overflow_flush(ctx);
290 			if (check_cq & BIT(IO_CHECK_CQ_DROPPED_BIT)) {
291 				ret = -EBADR;
292 				break;
293 			}
294 		}
295 
296 		if (io_should_wake(&iowq)) {
297 			ret = 0;
298 			break;
299 		}
300 		cond_resched();
301 	} while (1);
302 
303 	if (!(ctx->flags & IORING_SETUP_DEFER_TASKRUN))
304 		finish_wait(&ctx->cq_wait, &iowq.wq);
305 	restore_saved_sigmask_unless(ret == -EINTR);
306 
307 	return READ_ONCE(rings->cq.head) == READ_ONCE(rings->cq.tail) ? ret : 0;
308 }
309