xref: /linux/io_uring/wait.c (revision 8be01e1280912a84f6bcf963ceed6c9f13ba1986)
1 // SPDX-License-Identifier: GPL-2.0
2 /*
3  * Waiting for completion events
4  */
5 #include <linux/kernel.h>
6 #include <linux/sched/signal.h>
7 #include <linux/io_uring.h>
8 #include <linux/time_namespace.h>
9 
10 #include <trace/events/io_uring.h>
11 
12 #include <uapi/linux/io_uring.h>
13 
14 #include "io_uring.h"
15 #include "napi.h"
16 #include "wait.h"
17 
io_wake_function(struct wait_queue_entry * curr,unsigned int mode,int wake_flags,void * key)18 static int io_wake_function(struct wait_queue_entry *curr, unsigned int mode,
19 			    int wake_flags, void *key)
20 {
21 	struct io_wait_queue *iowq = container_of(curr, struct io_wait_queue, wq);
22 
23 	/*
24 	 * Cannot safely flush overflowed CQEs from here, ensure we wake up
25 	 * the task, and the next invocation will do it.
26 	 */
27 	if (io_should_wake(iowq) || io_has_work(iowq->ctx))
28 		return autoremove_wake_function(curr, mode, wake_flags, key);
29 	return -1;
30 }
31 
io_run_task_work_sig(struct io_ring_ctx * ctx)32 int io_run_task_work_sig(struct io_ring_ctx *ctx)
33 {
34 	if (io_local_work_pending(ctx)) {
35 		__set_current_state(TASK_RUNNING);
36 		if (io_run_local_work(ctx, INT_MAX, IO_LOCAL_TW_DEFAULT_MAX) > 0)
37 			return 0;
38 	}
39 	if (io_run_task_work() > 0)
40 		return 0;
41 	if (task_sigpending(current))
42 		return -EINTR;
43 	return 0;
44 }
45 
current_pending_io(void)46 static bool current_pending_io(void)
47 {
48 	struct io_uring_task *tctx = current->io_uring;
49 
50 	if (!tctx)
51 		return false;
52 	return percpu_counter_read_positive(&tctx->inflight);
53 }
54 
io_cqring_timer_wakeup(struct hrtimer * timer)55 static enum hrtimer_restart io_cqring_timer_wakeup(struct hrtimer *timer)
56 {
57 	struct io_wait_queue *iowq = container_of(timer, struct io_wait_queue, t);
58 
59 	WRITE_ONCE(iowq->hit_timeout, 1);
60 	iowq->min_timeout = 0;
61 	wake_up_process(iowq->wq.private);
62 	return HRTIMER_NORESTART;
63 }
64 
65 /*
66  * Doing min_timeout portion. If we saw any timeouts, events, or have work,
67  * wake up. If not, and we have a normal timeout, switch to that and keep
68  * sleeping.
69  */
io_cqring_min_timer_wakeup(struct hrtimer * timer)70 static enum hrtimer_restart io_cqring_min_timer_wakeup(struct hrtimer *timer)
71 {
72 	struct io_wait_queue *iowq = container_of(timer, struct io_wait_queue, t);
73 	struct io_ring_ctx *ctx = iowq->ctx;
74 
75 	/* no general timeout, or shorter (or equal), we are done */
76 	if (iowq->timeout == KTIME_MAX ||
77 	    ktime_compare(iowq->min_timeout, iowq->timeout) >= 0)
78 		goto out_wake;
79 	/* work we may need to run, wake function will see if we need to wake */
80 	if (io_has_work(ctx))
81 		goto out_wake;
82 	/* got events since we started waiting, min timeout is done */
83 	scoped_guard(rcu) {
84 		struct io_rings *rings = io_get_rings(ctx);
85 
86 		if (iowq->cq_min_tail != READ_ONCE(rings->cq.tail))
87 			goto out_wake;
88 		/* if we have any events and min timeout expired, we're done */
89 		if (io_cqring_events(ctx))
90 			goto out_wake;
91 	}
92 	/*
93 	 * If using deferred task_work running and application is waiting on
94 	 * more than one request, ensure we reset it now where we are switching
95 	 * to normal sleeps. Any request completion post min_wait should wake
96 	 * the task and return.
97 	 */
98 	if (ctx->flags & IORING_SETUP_DEFER_TASKRUN) {
99 		atomic_set(&ctx->cq_wait_nr, 1);
100 		smp_mb();
101 		if (!llist_empty(&ctx->work_llist))
102 			goto out_wake;
103 	}
104 
105 	/* any generated CQE posted past this time should wake us up */
106 	iowq->cq_tail = iowq->cq_min_tail;
107 
108 	hrtimer_update_function(&iowq->t, io_cqring_timer_wakeup);
109 	hrtimer_set_expires(timer, iowq->timeout);
110 	return HRTIMER_RESTART;
111 out_wake:
112 	return io_cqring_timer_wakeup(timer);
113 }
114 
/*
 * Sleep with an on-stack hrtimer armed in absolute mode on @clock_id.
 *
 * If a min_timeout is set, the timer first expires at @start_time plus
 * min_timeout and is handled by io_cqring_min_timer_wakeup(); otherwise it
 * expires at iowq->timeout and is handled by io_cqring_timer_wakeup().
 *
 * Returns -ETIME if the timer callback flagged a timeout, 0 otherwise.
 */
static int io_cqring_schedule_timeout(struct io_wait_queue *iowq,
				      clockid_t clock_id, ktime_t start_time)
{
	enum hrtimer_restart (*wake_fn)(struct hrtimer *);
	ktime_t expires;

	if (iowq->min_timeout) {
		expires = ktime_add_ns(iowq->min_timeout, start_time);
		wake_fn = io_cqring_min_timer_wakeup;
	} else {
		expires = iowq->timeout;
		wake_fn = io_cqring_timer_wakeup;
	}

	hrtimer_setup_on_stack(&iowq->t, wake_fn, clock_id, HRTIMER_MODE_ABS);
	hrtimer_set_expires_range_ns(&iowq->t, expires, 0);
	hrtimer_start_expires(&iowq->t, HRTIMER_MODE_ABS);

	/* the timer may already have fired, in which case do not sleep */
	if (!READ_ONCE(iowq->hit_timeout))
		schedule();

	hrtimer_cancel(&iowq->t);
	destroy_hrtimer_on_stack(&iowq->t);
	__set_current_state(TASK_RUNNING);

	if (READ_ONCE(iowq->hit_timeout))
		return -ETIME;
	return 0;
}
142 
/*
 * Put the current task to sleep, with a timer if either a regular timeout
 * or a min_timeout is set, and without one otherwise.
 *
 * Returns the result of io_cqring_schedule_timeout() for timed waits and 0
 * for untimed ones.
 */
static int __io_cqring_wait_schedule(struct io_ring_ctx *ctx,
				     struct io_wait_queue *iowq,
				     struct ext_arg *ext_arg,
				     ktime_t start_time)
{
	int ret = 0;

	/*
	 * If requests are pending, flag the task as being in io_wait so that
	 * cpufreq can account for it waiting on IO - this turns out to matter
	 * for low queue depth IO.
	 */
	if (ext_arg->iowait && current_pending_io())
		current->in_iowait = 1;

	if (iowq->timeout == KTIME_MAX && !iowq->min_timeout)
		schedule();
	else
		ret = io_cqring_schedule_timeout(iowq, ctx->clockid, start_time);

	current->in_iowait = 0;
	return ret;
}
164 
165 /* If this returns > 0, the caller should retry */
io_cqring_wait_schedule(struct io_ring_ctx * ctx,struct io_wait_queue * iowq,struct ext_arg * ext_arg,ktime_t start_time)166 static inline int io_cqring_wait_schedule(struct io_ring_ctx *ctx,
167 					  struct io_wait_queue *iowq,
168 					  struct ext_arg *ext_arg,
169 					  ktime_t start_time)
170 {
171 	if (unlikely(READ_ONCE(ctx->check_cq)))
172 		return 1;
173 	if (unlikely(io_local_work_pending(ctx)))
174 		return 1;
175 	if (unlikely(task_work_pending(current)))
176 		return 1;
177 	if (unlikely(task_sigpending(current)))
178 		return -EINTR;
179 	if (unlikely(io_should_wake(iowq)))
180 		return 0;
181 
182 	return __io_cqring_wait_schedule(ctx, iowq, ext_arg, start_time);
183 }
184 
185 /*
186  * Wait until events become available, if we don't already have some. The
187  * application must reap them itself, as they reside on the shared cq ring.
188  */
io_cqring_wait(struct io_ring_ctx * ctx,int min_events,u32 flags,struct ext_arg * ext_arg)189 int io_cqring_wait(struct io_ring_ctx *ctx, int min_events, u32 flags,
190 		   struct ext_arg *ext_arg)
191 {
192 	struct io_wait_queue iowq;
193 	struct io_rings *rings;
194 	ktime_t start_time;
195 	int ret, nr_wait;
196 
197 	min_events = min_t(int, min_events, ctx->cq_entries);
198 
199 	if (!io_allowed_run_tw(ctx))
200 		return -EEXIST;
201 	if (io_local_work_pending(ctx))
202 		io_run_local_work(ctx, min_events,
203 				  max(IO_LOCAL_TW_DEFAULT_MAX, min_events));
204 	io_run_task_work();
205 
206 	if (unlikely(test_bit(IO_CHECK_CQ_OVERFLOW_BIT, &ctx->check_cq)))
207 		io_cqring_do_overflow_flush(ctx);
208 
209 	rcu_read_lock();
210 	rings = io_get_rings(ctx);
211 	if (__io_cqring_events_user(ctx) >= min_events) {
212 		rcu_read_unlock();
213 		return 0;
214 	}
215 
216 	init_waitqueue_func_entry(&iowq.wq, io_wake_function);
217 	iowq.wq.private = current;
218 	INIT_LIST_HEAD(&iowq.wq.entry);
219 	iowq.ctx = ctx;
220 	iowq.cq_tail = READ_ONCE(rings->cq.head) + min_events;
221 	iowq.cq_min_tail = READ_ONCE(rings->cq.tail);
222 	nr_wait = (int) iowq.cq_tail - READ_ONCE(rings->cq.tail);
223 	rcu_read_unlock();
224 	rings = NULL;
225 	iowq.nr_timeouts = atomic_read(&ctx->cq_timeouts);
226 	iowq.hit_timeout = 0;
227 	iowq.min_timeout = ext_arg->min_time;
228 	iowq.timeout = KTIME_MAX;
229 	start_time = io_get_time(ctx);
230 
231 	if (ext_arg->ts_set) {
232 		iowq.timeout = timespec64_to_ktime(ext_arg->ts);
233 		if (flags & IORING_ENTER_ABS_TIMER)
234 			iowq.timeout = timens_ktime_to_host(ctx->clockid,
235 							    iowq.timeout);
236 		else
237 			iowq.timeout = ktime_add(iowq.timeout, start_time);
238 	}
239 
240 	if (ext_arg->sig) {
241 #ifdef CONFIG_COMPAT
242 		if (in_compat_syscall())
243 			ret = set_compat_user_sigmask((const compat_sigset_t __user *)ext_arg->sig,
244 						      ext_arg->argsz);
245 		else
246 #endif
247 			ret = set_user_sigmask(ext_arg->sig, ext_arg->argsz);
248 
249 		if (ret)
250 			return ret;
251 	}
252 
253 	io_napi_busy_loop(ctx, &iowq);
254 
255 	trace_io_uring_cqring_wait(ctx, min_events);
256 	do {
257 		unsigned long check_cq;
258 
259 		if (ctx->flags & IORING_SETUP_DEFER_TASKRUN) {
260 			atomic_set(&ctx->cq_wait_nr, nr_wait);
261 			set_current_state(TASK_INTERRUPTIBLE);
262 		} else {
263 			prepare_to_wait_exclusive(&ctx->cq_wait, &iowq.wq,
264 							TASK_INTERRUPTIBLE);
265 		}
266 
267 		ret = io_cqring_wait_schedule(ctx, &iowq, ext_arg, start_time);
268 		__set_current_state(TASK_RUNNING);
269 		atomic_set(&ctx->cq_wait_nr, IO_CQ_WAKE_INIT);
270 
271 		/*
272 		 * Run task_work after scheduling and before io_should_wake().
273 		 * If we got woken because of task_work being processed, run it
274 		 * now rather than let the caller do another wait loop.
275 		 */
276 		if (io_local_work_pending(ctx))
277 			io_run_local_work(ctx, nr_wait, nr_wait);
278 		io_run_task_work();
279 
280 		/*
281 		 * Non-local task_work will be run on exit to userspace, but
282 		 * if we're using DEFER_TASKRUN, then we could have waited
283 		 * with a timeout for a number of requests. If the timeout
284 		 * hits, we could have some requests ready to process. Ensure
285 		 * this break is _after_ we have run task_work, to avoid
286 		 * deferring running potentially pending requests until the
287 		 * next time we wait for events.
288 		 */
289 		if (ret < 0)
290 			break;
291 
292 		check_cq = READ_ONCE(ctx->check_cq);
293 		if (unlikely(check_cq)) {
294 			/* let the caller flush overflows, retry */
295 			if (check_cq & BIT(IO_CHECK_CQ_OVERFLOW_BIT))
296 				io_cqring_do_overflow_flush(ctx);
297 			if (check_cq & BIT(IO_CHECK_CQ_DROPPED_BIT)) {
298 				ret = -EBADR;
299 				break;
300 			}
301 		}
302 
303 		if (io_should_wake(&iowq)) {
304 			ret = 0;
305 			break;
306 		}
307 		cond_resched();
308 
309 		/* if min timeout has been hit, don't reset wait count */
310 		if (!iowq.hit_timeout)
311 			scoped_guard(rcu)
312 				nr_wait = (int) iowq.cq_tail -
313 						READ_ONCE(io_get_rings(ctx)->cq.tail);
314 		else
315 			nr_wait = 1;
316 	} while (1);
317 
318 	if (!(ctx->flags & IORING_SETUP_DEFER_TASKRUN))
319 		finish_wait(&ctx->cq_wait, &iowq.wq);
320 	restore_saved_sigmask_unless(ret == -EINTR);
321 
322 	guard(rcu)();
323 	return READ_ONCE(io_get_rings(ctx)->cq.head) == READ_ONCE(io_get_rings(ctx)->cq.tail) ? ret : 0;
324 }
325