1 // SPDX-License-Identifier: GPL-2.0
2 /*
3 * Waiting for completion events
4 */
5 #include <linux/kernel.h>
6 #include <linux/sched/signal.h>
7 #include <linux/io_uring.h>
8 #include <linux/time_namespace.h>
9
10 #include <trace/events/io_uring.h>
11
12 #include <uapi/linux/io_uring.h>
13
14 #include "io_uring.h"
15 #include "napi.h"
16 #include "wait.h"
17
io_wake_function(struct wait_queue_entry * curr,unsigned int mode,int wake_flags,void * key)18 static int io_wake_function(struct wait_queue_entry *curr, unsigned int mode,
19 int wake_flags, void *key)
20 {
21 struct io_wait_queue *iowq = container_of(curr, struct io_wait_queue, wq);
22
23 /*
24 * Cannot safely flush overflowed CQEs from here, ensure we wake up
25 * the task, and the next invocation will do it.
26 */
27 if (io_should_wake(iowq) || io_has_work(iowq->ctx))
28 return autoremove_wake_function(curr, mode, wake_flags, key);
29 return -1;
30 }
31
io_run_task_work_sig(struct io_ring_ctx * ctx)32 int io_run_task_work_sig(struct io_ring_ctx *ctx)
33 {
34 if (io_local_work_pending(ctx)) {
35 __set_current_state(TASK_RUNNING);
36 if (io_run_local_work(ctx, INT_MAX, IO_LOCAL_TW_DEFAULT_MAX) > 0)
37 return 0;
38 }
39 if (io_run_task_work() > 0)
40 return 0;
41 if (task_sigpending(current))
42 return -EINTR;
43 return 0;
44 }
45
current_pending_io(void)46 static bool current_pending_io(void)
47 {
48 struct io_uring_task *tctx = current->io_uring;
49
50 if (!tctx)
51 return false;
52 return percpu_counter_read_positive(&tctx->inflight);
53 }
54
io_cqring_timer_wakeup(struct hrtimer * timer)55 static enum hrtimer_restart io_cqring_timer_wakeup(struct hrtimer *timer)
56 {
57 struct io_wait_queue *iowq = container_of(timer, struct io_wait_queue, t);
58
59 WRITE_ONCE(iowq->hit_timeout, 1);
60 iowq->min_timeout = 0;
61 wake_up_process(iowq->wq.private);
62 return HRTIMER_NORESTART;
63 }
64
65 /*
66 * Doing min_timeout portion. If we saw any timeouts, events, or have work,
67 * wake up. If not, and we have a normal timeout, switch to that and keep
68 * sleeping.
69 */
io_cqring_min_timer_wakeup(struct hrtimer * timer)70 static enum hrtimer_restart io_cqring_min_timer_wakeup(struct hrtimer *timer)
71 {
72 struct io_wait_queue *iowq = container_of(timer, struct io_wait_queue, t);
73 struct io_ring_ctx *ctx = iowq->ctx;
74
75 /* no general timeout, or shorter (or equal), we are done */
76 if (iowq->timeout == KTIME_MAX ||
77 ktime_compare(iowq->min_timeout, iowq->timeout) >= 0)
78 goto out_wake;
79 /* work we may need to run, wake function will see if we need to wake */
80 if (io_has_work(ctx))
81 goto out_wake;
82 /* got events since we started waiting, min timeout is done */
83 scoped_guard(rcu) {
84 struct io_rings *rings = io_get_rings(ctx);
85
86 if (iowq->cq_min_tail != READ_ONCE(rings->cq.tail))
87 goto out_wake;
88 /* if we have any events and min timeout expired, we're done */
89 if (io_cqring_events(ctx))
90 goto out_wake;
91 }
92 /*
93 * If using deferred task_work running and application is waiting on
94 * more than one request, ensure we reset it now where we are switching
95 * to normal sleeps. Any request completion post min_wait should wake
96 * the task and return.
97 */
98 if (ctx->flags & IORING_SETUP_DEFER_TASKRUN) {
99 atomic_set(&ctx->cq_wait_nr, 1);
100 smp_mb();
101 if (!llist_empty(&ctx->work_llist))
102 goto out_wake;
103 }
104
105 /* any generated CQE posted past this time should wake us up */
106 iowq->cq_tail = iowq->cq_min_tail;
107
108 hrtimer_update_function(&iowq->t, io_cqring_timer_wakeup);
109 hrtimer_set_expires(timer, iowq->timeout);
110 return HRTIMER_RESTART;
111 out_wake:
112 return io_cqring_timer_wakeup(timer);
113 }
114
/*
 * Sleep with an on-stack, absolute-mode hrtimer armed.
 *
 * If a min timeout is set, the timer first expires at start_time plus
 * min_timeout and runs io_cqring_min_timer_wakeup(). Otherwise it expires at
 * iowq->timeout and runs io_cqring_timer_wakeup().
 *
 * Returns -ETIME if the timeout was flagged as hit, 0 otherwise.
 */
static int io_cqring_schedule_timeout(struct io_wait_queue *iowq,
				      clockid_t clock_id, ktime_t start_time)
{
	enum hrtimer_restart (*expire_fn)(struct hrtimer *);
	ktime_t expires;

	if (iowq->min_timeout) {
		expire_fn = io_cqring_min_timer_wakeup;
		expires = ktime_add_ns(iowq->min_timeout, start_time);
	} else {
		expire_fn = io_cqring_timer_wakeup;
		expires = iowq->timeout;
	}

	hrtimer_setup_on_stack(&iowq->t, expire_fn, clock_id, HRTIMER_MODE_ABS);
	hrtimer_set_expires_range_ns(&iowq->t, expires, 0);
	hrtimer_start_expires(&iowq->t, HRTIMER_MODE_ABS);

	/* the timer may already have fired, in which case don't sleep */
	if (!READ_ONCE(iowq->hit_timeout))
		schedule();

	hrtimer_cancel(&iowq->t);
	destroy_hrtimer_on_stack(&iowq->t);
	__set_current_state(TASK_RUNNING);

	if (READ_ONCE(iowq->hit_timeout))
		return -ETIME;
	return 0;
}
142
/*
 * Put the task to sleep, with a timer if either a timeout or a min timeout
 * is set, and without one otherwise. Returns the result of the timed sleep,
 * or 0 for the untimed one.
 */
static int __io_cqring_wait_schedule(struct io_ring_ctx *ctx,
				     struct io_wait_queue *iowq,
				     struct ext_arg *ext_arg,
				     ktime_t start_time)
{
	int ret;

	/*
	 * If we have pending requests, flag the task as being in io_wait so
	 * cpufreq can account for the task waiting on IO. This turns out to
	 * matter for low queue depth IO.
	 */
	if (ext_arg->iowait && current_pending_io())
		current->in_iowait = 1;

	if (iowq->timeout == KTIME_MAX && !iowq->min_timeout) {
		schedule();
		ret = 0;
	} else {
		ret = io_cqring_schedule_timeout(iowq, ctx->clockid, start_time);
	}

	current->in_iowait = 0;
	return ret;
}
164
165 /* If this returns > 0, the caller should retry */
io_cqring_wait_schedule(struct io_ring_ctx * ctx,struct io_wait_queue * iowq,struct ext_arg * ext_arg,ktime_t start_time)166 static inline int io_cqring_wait_schedule(struct io_ring_ctx *ctx,
167 struct io_wait_queue *iowq,
168 struct ext_arg *ext_arg,
169 ktime_t start_time)
170 {
171 if (unlikely(READ_ONCE(ctx->check_cq)))
172 return 1;
173 if (unlikely(io_local_work_pending(ctx)))
174 return 1;
175 if (unlikely(task_work_pending(current)))
176 return 1;
177 if (unlikely(task_sigpending(current)))
178 return -EINTR;
179 if (unlikely(io_should_wake(iowq)))
180 return 0;
181
182 return __io_cqring_wait_schedule(ctx, iowq, ext_arg, start_time);
183 }
184
185 /*
186 * Wait until events become available, if we don't already have some. The
187 * application must reap them itself, as they reside on the shared cq ring.
188 */
io_cqring_wait(struct io_ring_ctx * ctx,int min_events,u32 flags,struct ext_arg * ext_arg)189 int io_cqring_wait(struct io_ring_ctx *ctx, int min_events, u32 flags,
190 struct ext_arg *ext_arg)
191 {
192 struct io_wait_queue iowq;
193 struct io_rings *rings;
194 ktime_t start_time;
195 int ret, nr_wait;
196
197 min_events = min_t(int, min_events, ctx->cq_entries);
198
199 if (!io_allowed_run_tw(ctx))
200 return -EEXIST;
201 if (io_local_work_pending(ctx))
202 io_run_local_work(ctx, min_events,
203 max(IO_LOCAL_TW_DEFAULT_MAX, min_events));
204 io_run_task_work();
205
206 if (unlikely(test_bit(IO_CHECK_CQ_OVERFLOW_BIT, &ctx->check_cq)))
207 io_cqring_do_overflow_flush(ctx);
208
209 rcu_read_lock();
210 rings = io_get_rings(ctx);
211 if (__io_cqring_events_user(ctx) >= min_events) {
212 rcu_read_unlock();
213 return 0;
214 }
215
216 init_waitqueue_func_entry(&iowq.wq, io_wake_function);
217 iowq.wq.private = current;
218 INIT_LIST_HEAD(&iowq.wq.entry);
219 iowq.ctx = ctx;
220 iowq.cq_tail = READ_ONCE(rings->cq.head) + min_events;
221 iowq.cq_min_tail = READ_ONCE(rings->cq.tail);
222 nr_wait = (int) iowq.cq_tail - READ_ONCE(rings->cq.tail);
223 rcu_read_unlock();
224 rings = NULL;
225 iowq.nr_timeouts = atomic_read(&ctx->cq_timeouts);
226 iowq.hit_timeout = 0;
227 iowq.min_timeout = ext_arg->min_time;
228 iowq.timeout = KTIME_MAX;
229 start_time = io_get_time(ctx);
230
231 if (ext_arg->ts_set) {
232 iowq.timeout = timespec64_to_ktime(ext_arg->ts);
233 if (flags & IORING_ENTER_ABS_TIMER)
234 iowq.timeout = timens_ktime_to_host(ctx->clockid,
235 iowq.timeout);
236 else
237 iowq.timeout = ktime_add(iowq.timeout, start_time);
238 }
239
240 if (ext_arg->sig) {
241 #ifdef CONFIG_COMPAT
242 if (in_compat_syscall())
243 ret = set_compat_user_sigmask((const compat_sigset_t __user *)ext_arg->sig,
244 ext_arg->argsz);
245 else
246 #endif
247 ret = set_user_sigmask(ext_arg->sig, ext_arg->argsz);
248
249 if (ret)
250 return ret;
251 }
252
253 io_napi_busy_loop(ctx, &iowq);
254
255 trace_io_uring_cqring_wait(ctx, min_events);
256 do {
257 unsigned long check_cq;
258
259 if (ctx->flags & IORING_SETUP_DEFER_TASKRUN) {
260 atomic_set(&ctx->cq_wait_nr, nr_wait);
261 set_current_state(TASK_INTERRUPTIBLE);
262 } else {
263 prepare_to_wait_exclusive(&ctx->cq_wait, &iowq.wq,
264 TASK_INTERRUPTIBLE);
265 }
266
267 ret = io_cqring_wait_schedule(ctx, &iowq, ext_arg, start_time);
268 __set_current_state(TASK_RUNNING);
269 atomic_set(&ctx->cq_wait_nr, IO_CQ_WAKE_INIT);
270
271 /*
272 * Run task_work after scheduling and before io_should_wake().
273 * If we got woken because of task_work being processed, run it
274 * now rather than let the caller do another wait loop.
275 */
276 if (io_local_work_pending(ctx))
277 io_run_local_work(ctx, nr_wait, nr_wait);
278 io_run_task_work();
279
280 /*
281 * Non-local task_work will be run on exit to userspace, but
282 * if we're using DEFER_TASKRUN, then we could have waited
283 * with a timeout for a number of requests. If the timeout
284 * hits, we could have some requests ready to process. Ensure
285 * this break is _after_ we have run task_work, to avoid
286 * deferring running potentially pending requests until the
287 * next time we wait for events.
288 */
289 if (ret < 0)
290 break;
291
292 check_cq = READ_ONCE(ctx->check_cq);
293 if (unlikely(check_cq)) {
294 /* let the caller flush overflows, retry */
295 if (check_cq & BIT(IO_CHECK_CQ_OVERFLOW_BIT))
296 io_cqring_do_overflow_flush(ctx);
297 if (check_cq & BIT(IO_CHECK_CQ_DROPPED_BIT)) {
298 ret = -EBADR;
299 break;
300 }
301 }
302
303 if (io_should_wake(&iowq)) {
304 ret = 0;
305 break;
306 }
307 cond_resched();
308
309 /* if min timeout has been hit, don't reset wait count */
310 if (!iowq.hit_timeout)
311 scoped_guard(rcu)
312 nr_wait = (int) iowq.cq_tail -
313 READ_ONCE(io_get_rings(ctx)->cq.tail);
314 else
315 nr_wait = 1;
316 } while (1);
317
318 if (!(ctx->flags & IORING_SETUP_DEFER_TASKRUN))
319 finish_wait(&ctx->cq_wait, &iowq.wq);
320 restore_saved_sigmask_unless(ret == -EINTR);
321
322 guard(rcu)();
323 return READ_ONCE(io_get_rings(ctx)->cq.head) == READ_ONCE(io_get_rings(ctx)->cq.tail) ? ret : 0;
324 }
325