// SPDX-License-Identifier: GPL-2.0
/*
 * Waiting for completion events
 */
#include <linux/kernel.h>
#include <linux/sched/signal.h>
#include <linux/io_uring.h>
#include <linux/time_namespace.h>

#include <trace/events/io_uring.h>

#include <uapi/linux/io_uring.h>

#include "io_uring.h"
#include "napi.h"
#include "wait.h"

/*
 * Wait queue callback installed on the waiter's entry by io_cqring_wait().
 *
 * Hands the wakeup to autoremove_wake_function() only when io_should_wake()
 * or io_has_work() reports that the waiter has something to do. Otherwise
 * returns -1 and the default wake function is not called.
 */
static int io_wake_function(struct wait_queue_entry *curr, unsigned int mode,
			    int wake_flags, void *key)
{
	struct io_wait_queue *iowq = container_of(curr, struct io_wait_queue, wq);

	/*
	 * Cannot safely flush overflowed CQEs from here, ensure we wake up
	 * the task, and the next invocation will do it.
	 */
	if (io_should_wake(iowq) || io_has_work(iowq->ctx))
		return autoremove_wake_function(curr, mode, wake_flags, key);
	return -1;
}

/*
 * Run pending task_work for the current task, the ring's local work first.
 *
 * Returns 0 if either runner reported that it ran work, or if nothing ran and
 * no signal is pending. Returns -EINTR if nothing ran and the current task
 * has a signal pending.
 */
int io_run_task_work_sig(struct io_ring_ctx *ctx)
{
	if (io_local_work_pending(ctx)) {
		/*
		 * NOTE(review): the task state is forced to TASK_RUNNING
		 * before local work runs, presumably because callers may get
		 * here after having set a sleeping state - confirm against
		 * the callers.
		 */
		__set_current_state(TASK_RUNNING);
		if (io_run_local_work(ctx, INT_MAX, IO_LOCAL_TW_DEFAULT_MAX) > 0)
			return 0;
	}
	if (io_run_task_work() > 0)
		return 0;
	if (task_sigpending(current))
		return -EINTR;
	return 0;
}

/*
 * Returns true if the current task has an io_uring task context and its
 * inflight counter reads as non-zero.
 */
static bool current_pending_io(void)
{
	struct io_uring_task *tctx = current->io_uring;

	if (!tctx)
		return false;
	return percpu_counter_read_positive(&tctx->inflight);
}

/*
 * hrtimer callback that ends a timed wait: flag that the timeout was hit,
 * clear min_timeout, and wake the task stored in the wait queue entry. The
 * timer is not re-armed.
 *
 * Also used as the "wake now" exit of io_cqring_min_timer_wakeup().
 */
static enum hrtimer_restart io_cqring_timer_wakeup(struct hrtimer *timer)
{
	struct io_wait_queue *iowq = container_of(timer, struct io_wait_queue, t);

	/* paired with the READ_ONCE() checks in io_cqring_schedule_timeout() */
	WRITE_ONCE(iowq->hit_timeout, 1);
	iowq->min_timeout = 0;
	wake_up_process(iowq->wq.private);
	return HRTIMER_NORESTART;
}

/*
 * Doing min_timeout portion. If we saw any timeouts, events, or have work,
 * wake up. If not, and we have a normal timeout, switch to that and keep
 * sleeping.
 *
 * Returns HRTIMER_RESTART after re-arming the timer for iowq->timeout with
 * io_cqring_timer_wakeup() as its callback; every other path goes through
 * io_cqring_timer_wakeup() directly and returns its result.
 */
static enum hrtimer_restart io_cqring_min_timer_wakeup(struct hrtimer *timer)
{
	struct io_wait_queue *iowq = container_of(timer, struct io_wait_queue, t);
	struct io_ring_ctx *ctx = iowq->ctx;

	/* no general timeout, or shorter (or equal), we are done */
	if (iowq->timeout == KTIME_MAX ||
	    ktime_compare(iowq->min_timeout, iowq->timeout) >= 0)
		goto out_wake;
	/* work we may need to run, wake function will see if we need to wake */
	if (io_has_work(ctx))
		goto out_wake;
	/* got events since we started waiting, min timeout is done */
	/* the rings pointer from io_get_rings() is only used under RCU */
	scoped_guard(rcu) {
		struct io_rings *rings = io_get_rings(ctx);

		if (iowq->cq_min_tail != READ_ONCE(rings->cq.tail))
			goto out_wake;
		/* if we have any events and min timeout expired, we're done */
		if (io_cqring_events(ctx))
			goto out_wake;
	}
	/*
	 * If using deferred task_work running and application is waiting on
	 * more than one request, ensure we reset it now where we are switching
	 * to normal sleeps. Any request completion post min_wait should wake
	 * the task and return.
	 */
	if (ctx->flags & IORING_SETUP_DEFER_TASKRUN) {
		atomic_set(&ctx->cq_wait_nr, 1);
		/*
		 * Order the cq_wait_nr store before the work list check, so
		 * work queued around the time of the update is not missed.
		 */
		smp_mb();
		if (!llist_empty(&ctx->work_llist))
			goto out_wake;
	}

	/* any generated CQE posted past this time should wake us up */
	iowq->cq_tail = iowq->cq_min_tail;

	/* switch the timer over to the normal timeout and its callback */
	hrtimer_update_function(&iowq->t, io_cqring_timer_wakeup);
	hrtimer_set_expires(timer, iowq->timeout);
	return HRTIMER_RESTART;
out_wake:
	return io_cqring_timer_wakeup(timer);
}

/*
 * Sleep with an on-stack hrtimer armed in absolute mode.
 *
 * If iowq->min_timeout is set, the timer first expires at
 * start_time + min_timeout and runs io_cqring_min_timer_wakeup(). Otherwise
 * it expires at iowq->timeout and runs io_cqring_timer_wakeup().
 *
 * The caller is expected to have set the task state before calling; this
 * function resets it to TASK_RUNNING before returning.
 *
 * Returns -ETIME if a timer callback flagged hit_timeout, 0 otherwise.
 */
static int io_cqring_schedule_timeout(struct io_wait_queue *iowq,
				      clockid_t clock_id, ktime_t start_time)
{
	ktime_t timeout;

	if (iowq->min_timeout) {
		timeout = ktime_add_ns(iowq->min_timeout, start_time);
		hrtimer_setup_on_stack(&iowq->t, io_cqring_min_timer_wakeup, clock_id,
				       HRTIMER_MODE_ABS);
	} else {
		timeout = iowq->timeout;
		hrtimer_setup_on_stack(&iowq->t, io_cqring_timer_wakeup, clock_id,
				       HRTIMER_MODE_ABS);
	}

	/* zero range: no slack is added to the expiry */
	hrtimer_set_expires_range_ns(&iowq->t, timeout, 0);
	hrtimer_start_expires(&iowq->t, HRTIMER_MODE_ABS);

	/* the timer may already have fired; don't go to sleep in that case */
	if (!READ_ONCE(iowq->hit_timeout))
		schedule();

	/* the timer lives on this stack frame, tear it down before returning */
	hrtimer_cancel(&iowq->t);
	destroy_hrtimer_on_stack(&iowq->t);
	__set_current_state(TASK_RUNNING);

	return READ_ONCE(iowq->hit_timeout) ? -ETIME : 0;
}

/*
 * Put the task to sleep, with a timer if either a timeout or a min_timeout
 * is configured, and without one otherwise.
 *
 * Returns the result of io_cqring_schedule_timeout() for timed waits, and 0
 * for untimed ones.
 */
static int __io_cqring_wait_schedule(struct io_ring_ctx *ctx,
				     struct io_wait_queue *iowq,
				     struct ext_arg *ext_arg,
				     ktime_t start_time)
{
	int ret = 0;

	/*
	 * Mark us as being in io_wait if we have pending requests, so cpufreq
	 * can take into account that the task is waiting for IO - turns out
	 * to be important for low QD IO.
	 */
	if (ext_arg->iowait && current_pending_io())
		current->in_iowait = 1;
	if (iowq->timeout != KTIME_MAX || iowq->min_timeout)
		ret = io_cqring_schedule_timeout(iowq, ctx->clockid, start_time);
	else
		schedule();
	/* cleared unconditionally, whether or not it was set above */
	current->in_iowait = 0;
	return ret;
}

/*
 * Final checks before sleeping; only sleeps if none of them trigger.
 *
 * If this returns > 0, the caller should retry. Returns -EINTR if a signal
 * is pending, 0 if io_should_wake() already holds, and otherwise whatever
 * __io_cqring_wait_schedule() returns.
 */
static inline int io_cqring_wait_schedule(struct io_ring_ctx *ctx,
					  struct io_wait_queue *iowq,
					  struct ext_arg *ext_arg,
					  ktime_t start_time)
{
	/* CQ state flags, local work or task_work pending: go around again */
	if (unlikely(READ_ONCE(ctx->check_cq)))
		return 1;
	if (unlikely(io_local_work_pending(ctx)))
		return 1;
	if (unlikely(task_work_pending(current)))
		return 1;
	if (unlikely(task_sigpending(current)))
		return -EINTR;
	if (unlikely(io_should_wake(iowq)))
		return 0;

	return __io_cqring_wait_schedule(ctx, iowq, ext_arg, start_time);
}

/*
 * Wait until events become available, if we don't already have some. The
 * application must reap them itself, as they reside on the shared cq ring.
 *
 * @ctx:	ring to wait on
 * @min_events:	number of completions to wait for, clamped to cq_entries
 * @flags:	enter flags; only IORING_ENTER_ABS_TIMER is looked at here
 * @ext_arg:	optional timeout (ts/ts_set), min_time, signal mask
 *		(sig/argsz) and the iowait hint
 *
 * Returns 0 if enough events were already available or the wait was
 * satisfied, -EEXIST if io_allowed_run_tw() refuses the caller, the error
 * from installing the signal mask, or the error that ended the wait loop
 * (-EINTR and -ETIME from scheduling, -EBADR if the dropped-CQE bit is set).
 * An error that ended the wait loop is only reported while the CQ ring is
 * empty; if head != tail on exit, 0 is returned instead.
 */
int io_cqring_wait(struct io_ring_ctx *ctx, int min_events, u32 flags,
		   struct ext_arg *ext_arg)
{
	struct io_wait_queue iowq;
	struct io_rings *rings;
	ktime_t start_time;
	int ret, nr_wait;

	min_events = min_t(int, min_events, ctx->cq_entries);

	if (!io_allowed_run_tw(ctx))
		return -EEXIST;
	/* run whatever is already pending before deciding whether to wait */
	if (io_local_work_pending(ctx))
		io_run_local_work(ctx, min_events,
				  max(IO_LOCAL_TW_DEFAULT_MAX, min_events));
	io_run_task_work();

	if (unlikely(test_bit(IO_CHECK_CQ_OVERFLOW_BIT, &ctx->check_cq)))
		io_cqring_do_overflow_flush(ctx);

	/* the rings pointer is only dereferenced inside this RCU section */
	rcu_read_lock();
	rings = io_get_rings(ctx);
	if (__io_cqring_events_user(ctx) >= min_events) {
		rcu_read_unlock();
		return 0;
	}

	init_waitqueue_func_entry(&iowq.wq, io_wake_function);
	iowq.wq.private = current;
	INIT_LIST_HEAD(&iowq.wq.entry);
	iowq.ctx = ctx;
	/* target tail: the current head plus the number of events asked for */
	iowq.cq_tail = READ_ONCE(rings->cq.head) + min_events;
	/* tail at wait start, used by the min timer to detect new events */
	iowq.cq_min_tail = READ_ONCE(rings->cq.tail);
	nr_wait = (int) iowq.cq_tail - READ_ONCE(rings->cq.tail);
	rcu_read_unlock();
	/* NOTE(review): presumably cleared so it can't be used outside RCU */
	rings = NULL;
	iowq.nr_timeouts = atomic_read(&ctx->cq_timeouts);
	iowq.hit_timeout = 0;
	iowq.min_timeout = ext_arg->min_time;
	iowq.timeout = KTIME_MAX;
	start_time = io_get_time(ctx);

	if (ext_arg->ts_set) {
		iowq.timeout = timespec64_to_ktime(ext_arg->ts);
		/*
		 * An absolute timeout is passed through
		 * timens_ktime_to_host(); a relative one is made absolute by
		 * adding the wait start time.
		 */
		if (flags & IORING_ENTER_ABS_TIMER)
			iowq.timeout = timens_ktime_to_host(ctx->clockid,
							    iowq.timeout);
		else
			iowq.timeout = ktime_add(iowq.timeout, start_time);
	}

	if (ext_arg->sig) {
#ifdef CONFIG_COMPAT
		if (in_compat_syscall())
			ret = set_compat_user_sigmask((const compat_sigset_t __user *)ext_arg->sig,
						      ext_arg->argsz);
		else
#endif
			ret = set_user_sigmask(ext_arg->sig, ext_arg->argsz);

		if (ret)
			return ret;
	}

	io_napi_busy_loop(ctx, &iowq);

	trace_io_uring_cqring_wait(ctx, min_events);
	do {
		unsigned long check_cq;

		/*
		 * DEFER_TASKRUN rings do not queue on ctx->cq_wait; they
		 * publish the number of completions still wanted in
		 * cq_wait_nr instead. In both cases the task state is set
		 * before the conditions are re-checked in
		 * io_cqring_wait_schedule().
		 */
		if (ctx->flags & IORING_SETUP_DEFER_TASKRUN) {
			atomic_set(&ctx->cq_wait_nr, nr_wait);
			set_current_state(TASK_INTERRUPTIBLE);
		} else {
			prepare_to_wait_exclusive(&ctx->cq_wait, &iowq.wq,
						  TASK_INTERRUPTIBLE);
		}

		ret = io_cqring_wait_schedule(ctx, &iowq, ext_arg, start_time);
		__set_current_state(TASK_RUNNING);
		atomic_set(&ctx->cq_wait_nr, IO_CQ_WAKE_INIT);

		/*
		 * Run task_work after scheduling and before io_should_wake().
		 * If we got woken because of task_work being processed, run it
		 * now rather than let the caller do another wait loop.
		 */
		if (io_local_work_pending(ctx))
			io_run_local_work(ctx, nr_wait, nr_wait);
		io_run_task_work();

		/*
		 * Non-local task_work will be run on exit to userspace, but
		 * if we're using DEFER_TASKRUN, then we could have waited
		 * with a timeout for a number of requests. If the timeout
		 * hits, we could have some requests ready to process. Ensure
		 * this break is _after_ we have run task_work, to avoid
		 * deferring running potentially pending requests until the
		 * next time we wait for events.
		 */
		if (ret < 0)
			break;

		check_cq = READ_ONCE(ctx->check_cq);
		if (unlikely(check_cq)) {
			/*
			 * Flush overflowed CQEs here, then fall through to
			 * the wake check below. If the dropped bit is set,
			 * fail the wait with -EBADR.
			 */
			if (check_cq & BIT(IO_CHECK_CQ_OVERFLOW_BIT))
				io_cqring_do_overflow_flush(ctx);
			if (check_cq & BIT(IO_CHECK_CQ_DROPPED_BIT)) {
				ret = -EBADR;
				break;
			}
		}

		if (io_should_wake(&iowq)) {
			ret = 0;
			break;
		}
		cond_resched();

		/* if min timeout has been hit, don't reset wait count */
		if (!iowq.hit_timeout)
			scoped_guard(rcu)
				nr_wait = (int) iowq.cq_tail -
						READ_ONCE(io_get_rings(ctx)->cq.tail);
		else
			nr_wait = 1;
	} while (1);

	/* only the non-DEFER_TASKRUN path queued itself on ctx->cq_wait */
	if (!(ctx->flags & IORING_SETUP_DEFER_TASKRUN))
		finish_wait(&ctx->cq_wait, &iowq.wq);
	restore_saved_sigmask_unless(ret == -EINTR);

	/* report the loop's result only while the CQ ring is empty */
	guard(rcu)();
	return READ_ONCE(io_get_rings(ctx)->cq.head) ==
	       READ_ONCE(io_get_rings(ctx)->cq.tail) ? ret : 0;
}