// SPDX-License-Identifier: GPL-2.0
/*
 * Waiting for completion events
 */
#include <linux/kernel.h>
#include <linux/sched/signal.h>
#include <linux/io_uring.h>

#include <trace/events/io_uring.h>

#include <uapi/linux/io_uring.h>

#include "io_uring.h"
#include "napi.h"
#include "wait.h"

/*
 * Wait-queue entry callback for ctx->cq_wait. Wakes the task (and removes
 * the entry, via autoremove_wake_function()) only when enough events are
 * available or there is work the task must process; returning a negative
 * value keeps the entry queued without waking.
 */
static int io_wake_function(struct wait_queue_entry *curr, unsigned int mode,
			    int wake_flags, void *key)
{
	struct io_wait_queue *iowq = container_of(curr, struct io_wait_queue, wq);

	/*
	 * Cannot safely flush overflowed CQEs from here, ensure we wake up
	 * the task, and the next invocation will do it.
	 */
	if (io_should_wake(iowq) || io_has_work(iowq->ctx))
		return autoremove_wake_function(curr, mode, wake_flags, key);
	return -1;
}

/*
 * Run pending local and generic task_work for the current task.
 * Returns 0 if any work was run (or none was pending), -EINTR if a
 * signal is pending and nothing was processed.
 */
int io_run_task_work_sig(struct io_ring_ctx *ctx)
{
	if (io_local_work_pending(ctx)) {
		/* running local work may sleep; make sure we're runnable */
		__set_current_state(TASK_RUNNING);
		if (io_run_local_work(ctx, INT_MAX, IO_LOCAL_TW_DEFAULT_MAX) > 0)
			return 0;
	}
	if (io_run_task_work() > 0)
		return 0;
	if (task_sigpending(current))
		return -EINTR;
	return 0;
}

/* true if the current task has io_uring requests in flight */
static bool current_pending_io(void)
{
	struct io_uring_task *tctx = current->io_uring;

	if (!tctx)
		return false;
	return percpu_counter_read_positive(&tctx->inflight);
}

/*
 * hrtimer callback for the (final) wait deadline: flag that the timeout
 * was hit and wake the waiting task (stashed in iowq->wq.private).
 */
static enum hrtimer_restart io_cqring_timer_wakeup(struct hrtimer *timer)
{
	struct io_wait_queue *iowq = container_of(timer, struct io_wait_queue, t);

	WRITE_ONCE(iowq->hit_timeout, 1);
	/* min_timeout phase (if any) is over */
	iowq->min_timeout = 0;
	wake_up_process(iowq->wq.private);
	return HRTIMER_NORESTART;
}

/*
 * Doing min_timeout portion. If we saw any timeouts, events, or have work,
 * wake up. If not, and we have a normal timeout, switch to that and keep
 * sleeping.
 */
static enum hrtimer_restart io_cqring_min_timer_wakeup(struct hrtimer *timer)
{
	struct io_wait_queue *iowq = container_of(timer, struct io_wait_queue, t);
	struct io_ring_ctx *ctx = iowq->ctx;

	/* no general timeout, or shorter (or equal), we are done */
	if (iowq->timeout == KTIME_MAX ||
	    ktime_compare(iowq->min_timeout, iowq->timeout) >= 0)
		goto out_wake;
	/* work we may need to run, wake function will see if we need to wake */
	if (io_has_work(ctx))
		goto out_wake;
	/* got events since we started waiting, min timeout is done */
	scoped_guard(rcu) {
		struct io_rings *rings = io_get_rings(ctx);

		if (iowq->cq_min_tail != READ_ONCE(rings->cq.tail))
			goto out_wake;
		/* if we have any events and min timeout expired, we're done */
		if (io_cqring_events(ctx))
			goto out_wake;
	}
	/*
	 * If using deferred task_work running and application is waiting on
	 * more than one request, ensure we reset it now where we are switching
	 * to normal sleeps. Any request completion post min_wait should wake
	 * the task and return.
	 */
	if (ctx->flags & IORING_SETUP_DEFER_TASKRUN) {
		atomic_set(&ctx->cq_wait_nr, 1);
		/* order cq_wait_nr store vs the work_llist check below */
		smp_mb();
		if (!llist_empty(&ctx->work_llist))
			goto out_wake;
	}

	/* any generated CQE posted past this time should wake us up */
	iowq->cq_tail = iowq->cq_min_tail;

	/* switch to the regular deadline for the remainder of the wait */
	hrtimer_update_function(&iowq->t, io_cqring_timer_wakeup);
	hrtimer_set_expires(timer, iowq->timeout);
	return HRTIMER_RESTART;
out_wake:
	return io_cqring_timer_wakeup(timer);
}

/*
 * Arm an on-stack hrtimer for the wait and sleep until it fires or we get
 * woken for another reason. Returns -ETIME if the timeout was hit, else 0.
 */
static int io_cqring_schedule_timeout(struct io_wait_queue *iowq,
				      clockid_t clock_id, ktime_t start_time)
{
	ktime_t timeout;

	if (iowq->min_timeout) {
		/* min_timeout is relative to when we started waiting */
		timeout = ktime_add_ns(iowq->min_timeout, start_time);
		hrtimer_setup_on_stack(&iowq->t, io_cqring_min_timer_wakeup, clock_id,
				       HRTIMER_MODE_ABS);
	} else {
		timeout = iowq->timeout;
		hrtimer_setup_on_stack(&iowq->t, io_cqring_timer_wakeup, clock_id,
				       HRTIMER_MODE_ABS);
	}

	hrtimer_set_expires_range_ns(&iowq->t, timeout, 0);
	hrtimer_start_expires(&iowq->t, HRTIMER_MODE_ABS);

	/* timer may already have fired before we got here */
	if (!READ_ONCE(iowq->hit_timeout))
		schedule();

	hrtimer_cancel(&iowq->t);
	destroy_hrtimer_on_stack(&iowq->t);
	__set_current_state(TASK_RUNNING);

	return READ_ONCE(iowq->hit_timeout) ? -ETIME : 0;
}

/*
 * Actually go to sleep, with or without a timeout. Returns 0, or -ETIME if
 * a configured timeout expired.
 */
static int __io_cqring_wait_schedule(struct io_ring_ctx *ctx,
				     struct io_wait_queue *iowq,
				     struct ext_arg *ext_arg,
				     ktime_t start_time)
{
	int ret = 0;

	/*
	 * Mark us as being in io_wait if we have pending requests, so cpufreq
	 * can take into account that the task is waiting for IO - turns out
	 * to be important for low QD IO.
	 */
	if (ext_arg->iowait && current_pending_io())
		current->in_iowait = 1;
	if (iowq->timeout != KTIME_MAX || iowq->min_timeout)
		ret = io_cqring_schedule_timeout(iowq, ctx->clockid, start_time);
	else
		schedule();
	current->in_iowait = 0;
	return ret;
}

/* If this returns > 0, the caller should retry */
static inline int io_cqring_wait_schedule(struct io_ring_ctx *ctx,
					  struct io_wait_queue *iowq,
					  struct ext_arg *ext_arg,
					  ktime_t start_time)
{
	/* CQ state to handle, or task_work pending - retry without sleeping */
	if (unlikely(READ_ONCE(ctx->check_cq)))
		return 1;
	if (unlikely(io_local_work_pending(ctx)))
		return 1;
	if (unlikely(task_work_pending(current)))
		return 1;
	if (unlikely(task_sigpending(current)))
		return -EINTR;
	/* already have enough events, no need to sleep */
	if (unlikely(io_should_wake(iowq)))
		return 0;

	return __io_cqring_wait_schedule(ctx, iowq, ext_arg, start_time);
}

/*
 * Wait until events become available, if we don't already have some. The
 * application must reap them itself, as they reside on the shared cq ring.
 */
int io_cqring_wait(struct io_ring_ctx *ctx, int min_events, u32 flags,
		   struct ext_arg *ext_arg)
{
	struct io_wait_queue iowq;
	struct io_rings *rings;
	ktime_t start_time;
	int ret, nr_wait;

	/* can't usefully wait for more events than the CQ ring can hold */
	min_events = min_t(int, min_events, ctx->cq_entries);

	if (!io_allowed_run_tw(ctx))
		return -EEXIST;
	if (io_local_work_pending(ctx))
		io_run_local_work(ctx, min_events,
				  max(IO_LOCAL_TW_DEFAULT_MAX, min_events));
	io_run_task_work();

	if (unlikely(test_bit(IO_CHECK_CQ_OVERFLOW_BIT, &ctx->check_cq)))
		io_cqring_do_overflow_flush(ctx);

	rcu_read_lock();
	rings = io_get_rings(ctx);
	/* enough CQEs already posted? Then there's nothing to wait for */
	if (__io_cqring_events_user(ctx) >= min_events) {
		rcu_read_unlock();
		return 0;
	}

	init_waitqueue_func_entry(&iowq.wq, io_wake_function);
	iowq.wq.private = current;
	INIT_LIST_HEAD(&iowq.wq.entry);
	iowq.ctx = ctx;
	/* wake when the CQ tail reaches current head + min_events */
	iowq.cq_tail = READ_ONCE(rings->cq.head) + min_events;
	iowq.cq_min_tail = READ_ONCE(rings->cq.tail);
	nr_wait = (int) iowq.cq_tail - READ_ONCE(rings->cq.tail);
	rcu_read_unlock();
	/* rings only valid under RCU above; don't use the stale pointer */
	rings = NULL;
	iowq.nr_timeouts = atomic_read(&ctx->cq_timeouts);
	iowq.hit_timeout = 0;
	iowq.min_timeout = ext_arg->min_time;
	iowq.timeout = KTIME_MAX;
	start_time = io_get_time(ctx);

	if (ext_arg->ts_set) {
		iowq.timeout = timespec64_to_ktime(ext_arg->ts);
		/* timeout is relative to now, unless an absolute timer was asked for */
		if (!(flags & IORING_ENTER_ABS_TIMER))
			iowq.timeout = ktime_add(iowq.timeout, start_time);
	}

	if (ext_arg->sig) {
#ifdef CONFIG_COMPAT
		if (in_compat_syscall())
			ret = set_compat_user_sigmask((const compat_sigset_t __user *)ext_arg->sig,
						      ext_arg->argsz);
		else
#endif
			ret = set_user_sigmask(ext_arg->sig, ext_arg->argsz);

		if (ret)
			return ret;
	}

	io_napi_busy_loop(ctx, &iowq);

	trace_io_uring_cqring_wait(ctx, min_events);
	do {
		unsigned long check_cq;

		if (ctx->flags & IORING_SETUP_DEFER_TASKRUN) {
			/* tell completers how many CQEs we're still short */
			atomic_set(&ctx->cq_wait_nr, nr_wait);
			set_current_state(TASK_INTERRUPTIBLE);
		} else {
			prepare_to_wait_exclusive(&ctx->cq_wait, &iowq.wq,
						  TASK_INTERRUPTIBLE);
		}

		ret = io_cqring_wait_schedule(ctx, &iowq, ext_arg, start_time);
		__set_current_state(TASK_RUNNING);
		atomic_set(&ctx->cq_wait_nr, IO_CQ_WAKE_INIT);

		/*
		 * Run task_work after scheduling and before io_should_wake().
		 * If we got woken because of task_work being processed, run it
		 * now rather than let the caller do another wait loop.
		 */
		if (io_local_work_pending(ctx))
			io_run_local_work(ctx, nr_wait, nr_wait);
		io_run_task_work();

		/*
		 * Non-local task_work will be run on exit to userspace, but
		 * if we're using DEFER_TASKRUN, then we could have waited
		 * with a timeout for a number of requests. If the timeout
		 * hits, we could have some requests ready to process. Ensure
		 * this break is _after_ we have run task_work, to avoid
		 * deferring running potentially pending requests until the
		 * next time we wait for events.
		 */
		if (ret < 0)
			break;

		check_cq = READ_ONCE(ctx->check_cq);
		if (unlikely(check_cq)) {
			/* let the caller flush overflows, retry */
			if (check_cq & BIT(IO_CHECK_CQ_OVERFLOW_BIT))
				io_cqring_do_overflow_flush(ctx);
			if (check_cq & BIT(IO_CHECK_CQ_DROPPED_BIT)) {
				ret = -EBADR;
				break;
			}
		}

		if (io_should_wake(&iowq)) {
			ret = 0;
			break;
		}
		cond_resched();

		/* if min timeout has been hit, don't reset wait count */
		if (!iowq.hit_timeout)
			scoped_guard(rcu)
				nr_wait = (int) iowq.cq_tail -
					READ_ONCE(io_get_rings(ctx)->cq.tail);
		else
			nr_wait = 1;
	} while (1);

	if (!(ctx->flags & IORING_SETUP_DEFER_TASKRUN))
		finish_wait(&ctx->cq_wait, &iowq.wq);
	restore_saved_sigmask_unless(ret == -EINTR);

	/* if any events are available for the app to reap, report success */
	guard(rcu)();
	return READ_ONCE(io_get_rings(ctx)->cq.head) == READ_ONCE(io_get_rings(ctx)->cq.tail) ? ret : 0;
}