1 // SPDX-License-Identifier: GPL-2.0 2 /* 3 * Task work handling for io_uring 4 */ 5 #include <linux/kernel.h> 6 #include <linux/errno.h> 7 #include <linux/sched/signal.h> 8 #include <linux/io_uring.h> 9 #include <linux/indirect_call_wrapper.h> 10 11 #include "io_uring.h" 12 #include "tctx.h" 13 #include "poll.h" 14 #include "rw.h" 15 #include "eventfd.h" 16 #include "wait.h" 17 #include "mpscq.h" 18 19 static void ctx_flush_and_put(struct io_ring_ctx *ctx, io_tw_token_t tw) 20 { 21 if (!ctx) 22 return; 23 if (ctx->flags & IORING_SETUP_TASKRUN_FLAG) 24 atomic_andnot(IORING_SQ_TASKRUN, &ctx->rings->sq_flags); 25 26 io_submit_flush_completions(ctx); 27 mutex_unlock(&ctx->uring_lock); 28 percpu_ref_put(&ctx->refs); 29 } 30 31 void io_tctx_fallback_work(struct work_struct *work) 32 { 33 struct io_uring_task *tctx = container_of(work, struct io_uring_task, 34 fallback_work); 35 unsigned int count = 0; 36 37 /* 38 * Run the entries directly. We're in PF_KTHRED context, hence 39 * io_should_terminate_tw() is true and they will be marked as 40 * canceled. 41 */ 42 tctx_task_work_run(tctx, UINT_MAX, &count); 43 put_task_struct(tctx->task); 44 } 45 46 static void io_fallback_tw(struct io_uring_task *tctx) 47 { 48 /* 49 * The task ref both keeps ->task valid and, as __io_uring_free() is 50 * only called when the task itself is freed, ensures the tctx (and 51 * the queued work) stay around until the drain has run. 52 */ 53 get_task_struct(tctx->task); 54 if (!queue_work(system_dfl_wq, &tctx->fallback_work)) 55 put_task_struct(tctx->task); 56 } 57 58 /* 59 * Run queued task_work, processing no more than max_entries, with the number 60 * of entries processed added to *count. If more entries than max_entries are 61 * available, the remainder simply stay on the queue for the next run. 
/*
 * Run queued task_work for @tctx.
 *
 * Entries are popped and run one at a time, with their ring's ->uring_lock
 * and a ctx reference held. The lock and the reference are kept across
 * consecutive entries for the same ring. ctx_flush_and_put() releases them
 * when the ring changes, before rescheduling, and on exit.
 *
 * *@count is incremented for every entry run, and processing stops once it
 * reaches @max_entries. The bound therefore applies to the running total in
 * *@count, not to this call alone: a caller passing a non-zero *@count gets a
 * correspondingly smaller budget. Entries that are not processed remain on
 * the queue for the next run.
 *
 * Processing also stops as soon as a pop leaves the queue empty.
 */
void tctx_task_work_run(struct io_uring_task *tctx, unsigned int max_entries,
			unsigned int *count)
{
	struct io_ring_ctx *ctx = NULL;
	struct io_tw_state ts = { };

	while (*count < max_entries) {
		struct llist_node *node = mpscq_pop(&tctx->task_list,
						    &tctx->task_head);
		struct io_kiocb *req;

		if (!node) {
			if (mpscq_empty(&tctx->task_list))
				break;
			/*
			 * A producer has published a node but hasn't
			 * linked it into the queue yet (see mpscq_pop()).
			 * Give it a chance to finish rather than spinning,
			 * and don't sit on the ctx lock while doing so.
			 */
			ctx_flush_and_put(ctx, ts);
			ctx = NULL;
			cond_resched();
			continue;
		}
		req = container_of(node, struct io_kiocb, io_task_work.node);
		if (req->ctx != ctx) {
			/*
			 * The entry belongs to a different ring. Release the
			 * previous ring, then lock and pin this one. Cancel
			 * mode is decided per ring.
			 */
			ctx_flush_and_put(ctx, ts);
			ctx = req->ctx;
			mutex_lock(&ctx->uring_lock);
			percpu_ref_get(&ctx->refs);
			ts.cancel = io_should_terminate_tw(ctx);
		}
		INDIRECT_CALL_2(req->io_task_work.func,
				io_poll_task_func, io_req_rw_complete,
				(struct io_tw_req){req}, ts);
		(*count)++;
		/*
		 * Break if the most recent pop emptied the queue. This helps
		 * bound the task_work run, and also protects the regular
		 * task_work addition. io_req_normal_work_add() only schedules
		 * a new run when mpscq_push() returns true, and treats false
		 * as "run already pending".
		 */
		if (mpscq_pop_emptied(&tctx->task_list, tctx->task_head))
			break;
		if (unlikely(need_resched())) {
			/* Don't hold ->uring_lock across the reschedule. */
			ctx_flush_and_put(ctx, ts);
			ctx = NULL;
			cond_resched();
		}
	}
	ctx_flush_and_put(ctx, ts);

	/*
	 * Relaxed read is enough as only the task itself sets ->in_cancel.
	 * The tctx may also be drained by io_tctx_fallback_work(), in which
	 * case current is a kworker that has no tctx refs to drop.
	 */
	if (unlikely(atomic_read(&tctx->in_cancel)) &&
	    current->io_uring == tctx)
		io_uring_drop_tctx_refs(current);

	trace_io_uring_task_work_run(tctx, *count);
}

/*
 * task_work callback for a tctx. io_req_normal_work_add() queues
 * tctx->task_work with task_work_add(). Runs the queued entries with a
 * budget of UINT_MAX.
 */
void tctx_task_work(struct callback_head *cb)
{
	struct io_uring_task *tctx;
	unsigned int count = 0;

	tctx = container_of(cb, struct io_uring_task, task_work);
	tctx_task_work_run(tctx, UINT_MAX, &count);
}

/*
 * Sets IORING_SQ_TASKRUN in the sq_flags shared with userspace, using the
 * RCU protected rings pointer to be safe against concurrent ring resizing.
 * Does nothing unless the ring was set up with IORING_SETUP_TASKRUN_FLAG.
 */
static void io_ctx_mark_taskrun(struct io_ring_ctx *ctx)
{
	if (ctx->flags & IORING_SETUP_TASKRUN_FLAG) {
		struct io_rings *rings;

		guard(rcu)();
		rings = rcu_dereference(ctx->rings_rcu);
		atomic_or(IORING_SQ_TASKRUN, &rings->sq_flags);
	}
}

/*
 * Queue @req on its ring's ->work_list. Wake ctx->submitter_task if a waiter
 * (see io_cqring_wait()) needs it.
 *
 * When mpscq_push() returns true, this also sets IORING_SQ_TASKRUN and, if
 * the ring has an eventfd registered, signals it. That push is presumably
 * the one that found the list empty.
 *
 * How ->cq_wait_nr is used here:
 *  - A value <= 0 means no one is waiting (IO_CQ_WAKE_INIT), or this wait
 *    cycle's wake up has already been issued.
 *  - A lazy add (IOU_F_TWQ_LAZY_WAKE) counts it down, and only the add that
 *    takes it to zero wakes the waiter.
 *  - A forced add swaps in IO_CQ_WAKE_INIT and only wakes the waiter if it
 *    observed a positive value.
 */
void io_req_local_work_add(struct io_kiocb *req, unsigned flags)
{
	struct io_ring_ctx *ctx = req->ctx;
	int nr_wait;

	/*
	 * We don't know how many requests there are in the link and whether
	 * they can even be queued lazily, fall back to non-lazy.
	 */
	if (req->flags & IO_REQ_LINK_FLAGS)
		flags &= ~IOU_F_TWQ_LAZY_WAKE;

	/*
	 * The xchg() in mpscq_push() implies a full barrier, which pairs with
	 * the barrier in set_current_state() on the io_cqring_wait() side. This
	 * ensures that either we see the updated ->cq_wait_nr, or waiters going
	 * to sleep will observe the work added to the list, which is similar to
	 * the wait/wake task state sync.
	 */
	if (mpscq_push(&ctx->work_list, &req->io_task_work.node)) {
		io_ctx_mark_taskrun(ctx);
		if (data_race(ctx->int_flags) & IO_RING_F_HAS_EVFD)
			io_eventfd_signal(ctx, false);
	}

	/*
	 * No one is waiting (IO_CQ_WAKE_INIT), or this cycle's wake up has
	 * already been issued (zero or negative, see below).
	 */
	nr_wait = atomic_read(&ctx->cq_wait_nr);
	if (nr_wait <= 0)
		return;
	if (flags & IOU_F_TWQ_LAZY_WAKE) {
		/*
		 * ->cq_wait_nr counts down the number of lazy adds, once it
		 * hits zero we're good to wake the waiter. A producer that
		 * gets delayed between pushing its entry and getting here
		 * may count down a later wait cycle. That's OK, it'll be an
		 * early wake, not a lost one.
		 */
		if (!atomic_dec_and_test(&ctx->cq_wait_nr))
			return;
	} else if (atomic_xchg(&ctx->cq_wait_nr, IO_CQ_WAKE_INIT) <= 0) {
		/*
		 * Potentially raced with lazy add, claim the wake. A value
		 * <= 0 means a lazy add hit zero or another forced add
		 * claimed IO_CQ_WAKE_INIT. Either way, the wake up for this
		 * wait cycle has already been done.
		 */
		return;
	}
	wake_up_state(ctx->submitter_task, TASK_INTERRUPTIBLE);
}

/*
 * Queue @req on its task's tctx->task_list.
 *
 * If mpscq_push() returns false, a run is already pending and nothing else
 * is needed. Otherwise set IORING_SQ_TASKRUN (if enabled) and arrange a run:
 *  - SQPOLL rings only get a notify signal on the task, as the SQPOLL side
 *    runs the list itself.
 *  - Other rings get tctx->task_work added. If task_work_add() fails, this
 *    falls back to io_fallback_tw().
 */
void io_req_normal_work_add(struct io_kiocb *req)
{
	struct io_uring_task *tctx = req->tctx;
	struct io_ring_ctx *ctx = req->ctx;

	/* tw run already pending, nothing else to do */
	if (!mpscq_push(&tctx->task_list, &req->io_task_work.node))
		return;

	/*
	 * Doesn't need to use ->rings_rcu, as resizing isn't supported for
	 * !DEFER_TASKRUN.
	 */
	if (ctx->flags & IORING_SETUP_TASKRUN_FLAG)
		atomic_or(IORING_SQ_TASKRUN, &ctx->rings->sq_flags);

	/* SQPOLL doesn't need the task_work added, it'll run it itself */
	if (ctx->flags & IORING_SETUP_SQPOLL) {
		__set_notify_signal(tctx->task);
		return;
	}

	if (likely(!task_work_add(tctx->task, &tctx->task_work, ctx->notify_method)))
		return;

	io_fallback_tw(tctx);
}

/*
 * Queue task_work for @req through __io_req_task_work_add(). Only supported
 * for IORING_SETUP_DEFER_TASKRUN rings. For any other ring this warns once
 * and returns without queueing anything.
 */
void io_req_task_work_add_remote(struct io_kiocb *req, unsigned flags)
{
	if (WARN_ON_ONCE(!(req->ctx->flags & IORING_SETUP_DEFER_TASKRUN)))
		return;
	__io_req_task_work_add(req, flags);
}

/*
 * Run everything left on @ctx's local ->work_list in cancel mode, then flush
 * the resulting completions. ->uring_lock is held, via a scope guard, for the
 * whole function, including the final flush. The loop keeps going until
 * mpscq_empty() reports the list empty, so entries that handlers add while
 * it runs are also processed.
 */
void __cold io_cancel_local_task_work(struct io_ring_ctx *ctx)
{
	struct io_tw_state ts = { .cancel = true };
	struct llist_node *node;

	/*
	 * The work list consumer side is serialized by ->uring_lock, see
	 * __io_run_local_work(). Grab it to guard against racing with normal
	 * task_work running, as the task may be exiting. The ring is going
	 * away, so run the entries in cancel mode right here.
	 *
	 * Callers must be in process context, since this sleeps on the mutex
	 * and may cond_resched(). That is the same kind of context the per-ctx
	 * fallback work, which these entries used to be punted to, ran in.
	 */
	guard(mutex)(&ctx->uring_lock);

	while (!mpscq_empty(&ctx->work_list)) {
		struct io_kiocb *req;

		node = mpscq_pop(&ctx->work_list, &ctx->work_head);
		if (!node) {
			/* a producer is mid-push, wait for it to link */
			cond_resched();
			continue;
		}
		req = container_of(node, struct io_kiocb, io_task_work.node);
		req->io_task_work.func((struct io_tw_req){req}, ts);
	}
	io_submit_flush_completions(ctx);
}

/*
 * Decide whether __io_run_local_work() should go around again.
 *
 * @events is the number of entries run by the last pass. @min_events is the
 * remaining target. Returns true only when local work is still pending and
 * @events is below @min_events.
 *
 * If work is still pending but the target has been met, set
 * IORING_SQ_TASKRUN (if enabled) before returning false. This leaves the
 * flag visible in the shared sq_flags.
 */
static bool io_run_local_work_continue(struct io_ring_ctx *ctx, int events,
				       int min_events)
{
	if (!io_local_work_pending(ctx))
		return false;
	if (events < min_events)
		return true;
	if (ctx->flags & IORING_SETUP_TASKRUN_FLAG)
		atomic_or(IORING_SQ_TASKRUN, &ctx->rings->sq_flags);
	return false;
}

/*
 * Pop and run up to @events entries from @ctx's local ->work_list. Stops
 * early when mpscq_pop() returns NULL, which can mean the list is empty or
 * that a producer hasn't linked its node yet. Nothing runs if @events <= 0.
 * Returns the number of entries run.
 */
static int __io_run_local_work_loop(struct io_ring_ctx *ctx,
				    io_tw_token_t tw,
				    int events)
{
	int ret = 0;

	while (ret < events) {
		struct llist_node *node = mpscq_pop(&ctx->work_list, &ctx->work_head);
		struct io_kiocb *req;

		if (!node)
			break;
		req = container_of(node, struct io_kiocb, io_task_work.node);
		INDIRECT_CALL_2(req->io_task_work.func,
				io_poll_task_func, io_req_rw_complete,
				(struct io_tw_req){req}, tw);
		ret++;
	}

	return ret;
}

/*
 * Run @ctx's local task_work.
 *
 * Expects ->uring_lock to be held: io_run_local_work() takes it around this
 * call, and io_run_local_work_locked() presumably relies on its caller for
 * it. Must be called by ctx->submitter_task; otherwise it warns once and
 * returns -EEXIST.
 *
 * Clears IORING_SQ_TASKRUN up front. It then runs passes of up to
 * @max_events entries until io_run_local_work_continue() says to stop, with
 * a completion flush before the final check. @min_events is reduced by each
 * pass's count.
 *
 * Note that the return value is the number of entries run by the last pass
 * only, not the total across passes.
 */
static int __io_run_local_work(struct io_ring_ctx *ctx, io_tw_token_t tw,
			       int min_events, int max_events)
{
	unsigned int loops = 0;
	int ret = 0;

	if (WARN_ON_ONCE(ctx->submitter_task != current))
		return -EEXIST;
	if (ctx->flags & IORING_SETUP_TASKRUN_FLAG)
		atomic_andnot(IORING_SQ_TASKRUN, &ctx->rings->sq_flags);
again:
	/*
	 * If the last loop made no progress while work is still pending,
	 * a producer has published a node but hasn't linked it into the
	 * queue yet (see mpscq_pop()). Give it a chance to finish rather
	 * than spinning on the queue.
	 */
	if (unlikely(loops && !ret))
		cond_resched();
	/* Cancel mode is re-evaluated on every pass. */
	tw.cancel = io_should_terminate_tw(ctx);
	min_events -= ret;
	ret = __io_run_local_work_loop(ctx, tw, max_events);
	loops++;

	if (io_run_local_work_continue(ctx, ret, min_events))
		goto again;
	io_submit_flush_completions(ctx);
	if (io_run_local_work_continue(ctx, ret, min_events))
		goto again;

	trace_io_uring_local_work_run(ctx, ret, loops);
	return ret;
}

/*
 * Run local task_work without taking ->uring_lock. Unlike
 * io_run_local_work(), this does not lock it, so the caller presumably holds
 * it. Returns 0 straight away if nothing is pending. Otherwise each pass gets
 * a budget of at least IO_LOCAL_TW_DEFAULT_MAX entries.
 */
int io_run_local_work_locked(struct io_ring_ctx *ctx, int min_events)
{
	struct io_tw_state ts = {};

	if (!io_local_work_pending(ctx))
		return 0;
	return __io_run_local_work(ctx, ts, min_events,
					max(IO_LOCAL_TW_DEFAULT_MAX, min_events));
}

/*
 * Take ->uring_lock and run local task_work. See __io_run_local_work() for
 * the meaning of @min_events, @max_events and the return value.
 */
int io_run_local_work(struct io_ring_ctx *ctx, int min_events, int max_events)
{
	struct io_tw_state ts = {};
	int ret;

	mutex_lock(&ctx->uring_lock);
	ret = __io_run_local_work(ctx, ts, min_events, max_events);
	mutex_unlock(&ctx->uring_lock);
	return ret;
}