// SPDX-License-Identifier: GPL-2.0
/*
 * Task work handling for io_uring
 *
 * Two queues are handled here: the per-task queue (tctx->task_list), which
 * is drained from task_work or from a workqueue fallback, and the per-ring
 * queue (ctx->work_list), which is drained by __io_run_local_work() and
 * io_cancel_local_task_work().
 */
#include <linux/kernel.h>
#include <linux/errno.h>
#include <linux/sched/signal.h>
#include <linux/io_uring.h>
#include <linux/indirect_call_wrapper.h>

#include "io_uring.h"
#include "tctx.h"
#include "poll.h"
#include "rw.h"
#include "eventfd.h"
#include "wait.h"
#include "mpscq.h"

/*
 * Finish with a ring after running a batch of its task_work entries: clear
 * IORING_SQ_TASKRUN in the SQ flags if the ring uses
 * IORING_SETUP_TASKRUN_FLAG, flush completions, then release ->uring_lock
 * and drop the ctx reference. tctx_task_work_run() takes both of those when
 * it switches to a ring. A NULL ctx is a no-op, which lets callers invoke
 * this unconditionally. The tw argument is not used in this function.
 */
static void ctx_flush_and_put(struct io_ring_ctx *ctx, io_tw_token_t tw)
{
	if (!ctx)
		return;
	if (ctx->flags & IORING_SETUP_TASKRUN_FLAG)
		atomic_andnot(IORING_SQ_TASKRUN, &ctx->rings->sq_flags);

	io_submit_flush_completions(ctx);
	mutex_unlock(&ctx->uring_lock);
	percpu_ref_put(&ctx->refs);
}

/*
 * Work item handler for tctx->fallback_work, queued by io_fallback_tw().
 * Drains the whole per-task queue from the work item's context and then
 * drops the task reference that io_fallback_tw() took.
 */
void io_tctx_fallback_work(struct work_struct *work)
{
	struct io_uring_task *tctx = container_of(work, struct io_uring_task,
						  fallback_work);
	unsigned int count = 0;

	/* see tctx_task_work() - a set bit must always have a run coming */
	clear_bit(0, &tctx->tw_pending);
	smp_mb__after_atomic();

	/*
	 * Run the entries directly. We're in PF_KTHREAD context, hence
	 * io_should_terminate_tw() is true and they will be marked as
	 * canceled.
	 */
	tctx_task_work_run(tctx, UINT_MAX, &count);
	put_task_struct(tctx->task);
}

/*
 * Punt draining of the per-task queue to system_unbound_wq. Used by
 * io_req_normal_work_add() when task_work_add() fails.
 */
static void io_fallback_tw(struct io_uring_task *tctx)
{
	/*
	 * The task ref both keeps ->task valid and, as __io_uring_free() is
	 * only called when the task itself is freed, ensures the tctx (and
	 * the queued work) stay around until the drain has run.
	 */
	get_task_struct(tctx->task);
	/*
	 * NOTE(review): a false return presumably means the work item was
	 * already pending, so this call queued nothing and the reference
	 * taken above has no matching put in io_tctx_fallback_work().
	 */
	if (!queue_work(system_unbound_wq, &tctx->fallback_work))
		put_task_struct(tctx->task);
}

/*
 * Run queued task_work, processing no more than max_entries, with the number
 * of entries processed added to *count. If more entries than max_entries are
 * available, the remainder simply stay on the queue for the next run.
 *
 * Note that the loop bound compares *count itself against max_entries, so a
 * non-zero *count on entry reduces the number of entries this call will run.
 */
void tctx_task_work_run(struct io_uring_task *tctx, unsigned int max_entries,
			unsigned int *count)
{
	struct io_ring_ctx *ctx = NULL;
	struct io_tw_state ts = { };

	while (*count < max_entries) {
		struct llist_node *node = mpscq_pop(&tctx->task_list,
						    &tctx->task_head);
		struct io_kiocb *req;

		if (!node) {
			if (mpscq_empty(&tctx->task_list))
				break;
			/*
			 * A producer has published a node but hasn't
			 * linked it into the queue yet (see mpscq_pop()).
			 * Give it a chance to finish rather than spinning,
			 * and don't sit on the ctx lock while doing so.
			 */
			ctx_flush_and_put(ctx, ts);
			ctx = NULL;
			cond_resched();
			continue;
		}
		req = container_of(node, struct io_kiocb, io_task_work.node);
		/*
		 * Entries for different rings may be interleaved on the
		 * per-task queue. Only when the ring changes do we release
		 * the previous one and lock/pin the new one; consecutive
		 * entries for the same ring reuse the held lock and ref.
		 */
		if (req->ctx != ctx) {
			ctx_flush_and_put(ctx, ts);
			ctx = req->ctx;
			mutex_lock(&ctx->uring_lock);
			percpu_ref_get(&ctx->refs);
			ts.cancel = io_should_terminate_tw(ctx);
		}
		INDIRECT_CALL_2(req->io_task_work.func,
				io_poll_task_func, io_req_rw_complete,
				(struct io_tw_req){req}, ts);
		(*count)++;
		/* Let go of the ring before yielding the CPU */
		if (unlikely(need_resched())) {
			ctx_flush_and_put(ctx, ts);
			ctx = NULL;
			cond_resched();
		}
	}
	/* Release whichever ring is still held, if any */
	ctx_flush_and_put(ctx, ts);

	/*
	 * Relaxed read is enough as only the task itself sets ->in_cancel.
	 * The tctx may also be drained by io_tctx_fallback_work(), in which
	 * case current is a kworker that has no tctx refs to drop.
	 */
	if (unlikely(atomic_read(&tctx->in_cancel)) &&
	    current->io_uring == tctx)
		io_uring_drop_tctx_refs(current);

	trace_io_uring_task_work_run(tctx, *count);
}

/*
 * task_work callback for tctx->task_work, added by
 * io_req_normal_work_add(). Drains everything on the per-task queue.
 */
void tctx_task_work(struct callback_head *cb)
{
	struct io_uring_task *tctx;
	unsigned int count = 0;

	tctx = container_of(cb, struct io_uring_task, task_work);
	/*
	 * Clear the pending bit before looking at the queue.
	 * NOTE(review): the barrier presumably pairs with the
	 * push-then-test_and_set_bit() sequence in io_req_normal_work_add(),
	 * so that either this run sees a newly pushed entry or the producer
	 * sees the bit clear and schedules another run - confirm.
	 */
	clear_bit(0, &tctx->tw_pending);
	smp_mb__after_atomic();
	tctx_task_work_run(tctx, UINT_MAX, &count);
}

/*
 * Sets IORING_SQ_TASKRUN in the sq_flags shared with userspace, using the
 * RCU protected rings pointer to be safe against concurrent ring resizing.
 * Does nothing unless the ring was set up with IORING_SETUP_TASKRUN_FLAG.
 */
static void io_ctx_mark_taskrun(struct io_ring_ctx *ctx)
{
	if (ctx->flags & IORING_SETUP_TASKRUN_FLAG) {
		struct io_rings *rings;

		guard(rcu)();
		rings = rcu_dereference(ctx->rings_rcu);
		atomic_or(IORING_SQ_TASKRUN, &rings->sq_flags);
	}
}

/*
 * Queue req on its ring's ->work_list and, if a waiter has armed
 * ->cq_wait_nr, decide whether this add is the one that should wake
 * ctx->submitter_task.
 *
 * flags may carry IOU_F_TWQ_LAZY_WAKE to request a lazy wakeup, where the
 * waiter is only woken once enough lazy adds have counted ->cq_wait_nr down
 * to zero. Without it the wakeup is forced.
 */
void io_req_local_work_add(struct io_kiocb *req, unsigned flags)
{
	struct io_ring_ctx *ctx = req->ctx;
	int nr_wait;

	/*
	 * We don't know how many requests there are in the link and whether
	 * they can even be queued lazily, fall back to non-lazy.
	 */
	if (req->flags & IO_REQ_LINK_FLAGS)
		flags &= ~IOU_F_TWQ_LAZY_WAKE;

	/*
	 * The xchg() in mpscq_push() implies a full barrier, which pairs with
	 * the barrier in set_current_state() on the io_cqring_wait() side. This
	 * ensures that either we see the updated ->cq_wait_nr, or waiters going
	 * to sleep will observe the work added to the list, which is similar to
	 * the wait/wake task state sync.
	 *
	 * NOTE(review): a true return from mpscq_push() presumably means the
	 * queue was empty before this push; the userspace-visible flag and
	 * the eventfd are only signalled in that case.
	 */
	if (mpscq_push(&ctx->work_list, &req->io_task_work.node)) {
		io_ctx_mark_taskrun(ctx);
		if (data_race(ctx->int_flags) & IO_RING_F_HAS_EVFD)
			io_eventfd_signal(ctx, false);
	}

	/*
	 * No one is waiting (IO_CQ_WAKE_INIT), or this cycle's wake up has
	 * already been issued (zero or negative, see below).
	 */
	nr_wait = atomic_read(&ctx->cq_wait_nr);
	if (nr_wait <= 0)
		return;
	if (flags & IOU_F_TWQ_LAZY_WAKE) {
		/*
		 * ->cq_wait_nr counts down the number of lazy adds, once it
		 * hits zero we're good to wake the waiter. A producer that
		 * gets delayed between pushing its entry and getting here
		 * may count down a later wait cycle. That's OK, it'll be an
		 * early wake, not a lost one.
		 */
		if (!atomic_dec_and_test(&ctx->cq_wait_nr))
			return;
	} else if (atomic_xchg(&ctx->cq_wait_nr, IO_CQ_WAKE_INIT) <= 0) {
		/*
		 * Potentially raced with lazy add, claim the wake. A value
		 * <= 0 means a lazy add hit zero or another forced add
		 * claimed IO_CQ_WAKE_INIT. Either way, the wake up for this
		 * wait cycle has already been done.
		 */
		return;
	}
	wake_up_state(ctx->submitter_task, TASK_INTERRUPTIBLE);
}

/*
 * Queue req on its task's ->task_list and make sure something will run it:
 * a notify signal for SQPOLL rings, otherwise tctx->task_work via
 * task_work_add(), with the workqueue fallback if that fails.
 */
void io_req_normal_work_add(struct io_kiocb *req)
{
	struct io_uring_task *tctx = req->tctx;
	struct io_ring_ctx *ctx = req->ctx;

	/* task_work already pending, we're done */
	if (!mpscq_push(&tctx->task_list, &req->io_task_work.node))
		return;

	/*
	 * Doesn't need to use ->rings_rcu, as resizing isn't supported for
	 * !DEFER_TASKRUN.
	 */
	if (ctx->flags & IORING_SETUP_TASKRUN_FLAG)
		atomic_or(IORING_SQ_TASKRUN, &ctx->rings->sq_flags);

	/* SQPOLL doesn't need the task_work added, it'll run it itself */
	if (ctx->flags & IORING_SETUP_SQPOLL) {
		__set_notify_signal(tctx->task);
		return;
	}

	/* task_work must only be added once */
	if (test_and_set_bit(0, &tctx->tw_pending))
		return;

	if (likely(!task_work_add(tctx->task, &tctx->task_work, ctx->notify_method)))
		return;

	/* task_work_add() failed, drain from a workqueue instead */
	io_fallback_tw(tctx);
}

/*
 * Add task_work for req via __io_req_task_work_add(). Only permitted for
 * rings set up with IORING_SETUP_DEFER_TASKRUN; for any other ring this
 * warns once and returns without queueing the request.
 */
void io_req_task_work_add_remote(struct io_kiocb *req, unsigned flags)
{
	if (WARN_ON_ONCE(!(req->ctx->flags & IORING_SETUP_DEFER_TASKRUN)))
		return;
	__io_req_task_work_add(req, flags);
}

/*
 * Drain ctx->work_list, invoking every entry's handler with ts.cancel set,
 * and flush the resulting completions. ->uring_lock is held for the whole
 * drain and released automatically on return.
 */
void __cold io_cancel_local_task_work(struct io_ring_ctx *ctx)
{
	struct io_tw_state ts = { .cancel = true };
	struct llist_node *node;

	/*
	 * The work list consumer side is serialized by ->uring_lock, see
	 * __io_run_local_work(). Grab it to guard against racing with normal
	 * task_work running, as the task may be exiting. The ring is going
	 * away, run the entries in cancel mode right here - the callers
	 * provide the same process context the per-ctx fallback work that
	 * they were previously punted to ran in.
	 */
	guard(mutex)(&ctx->uring_lock);

	while (!mpscq_empty(&ctx->work_list)) {
		struct io_kiocb *req;

		node = mpscq_pop(&ctx->work_list, &ctx->work_head);
		if (!node) {
			/* a producer is mid-push, wait for it to link */
			cond_resched();
			continue;
		}
		req = container_of(node, struct io_kiocb, io_task_work.node);
		req->io_task_work.func((struct io_tw_req){req}, ts);
	}
	io_submit_flush_completions(ctx);
}

/*
 * Decide whether __io_run_local_work() should make another pass.
 *
 * Returns true only when local work is still pending and fewer than
 * min_events entries were handled. If work is pending but enough events
 * were handled, IORING_SQ_TASKRUN is set again (when the ring uses
 * IORING_SETUP_TASKRUN_FLAG) before returning false.
 */
static bool io_run_local_work_continue(struct io_ring_ctx *ctx, int events,
				       int min_events)
{
	if (!io_local_work_pending(ctx))
		return false;
	if (events < min_events)
		return true;
	if (ctx->flags & IORING_SETUP_TASKRUN_FLAG)
		atomic_or(IORING_SQ_TASKRUN, &ctx->rings->sq_flags);
	return false;
}

/*
 * Pop and run up to events entries from ctx->work_list. Stops early as soon
 * as mpscq_pop() yields nothing. Returns the number of entries run.
 */
static int __io_run_local_work_loop(struct io_ring_ctx *ctx,
				    io_tw_token_t tw,
				    int events)
{
	int ret = 0;

	while (ret < events) {
		struct llist_node *node = mpscq_pop(&ctx->work_list, &ctx->work_head);
		struct io_kiocb *req;

		if (!node)
			break;
		req = container_of(node, struct io_kiocb, io_task_work.node);
		INDIRECT_CALL_2(req->io_task_work.func,
				io_poll_task_func, io_req_rw_complete,
				(struct io_tw_req){req}, tw);
		ret++;
	}

	return ret;
}

/*
 * Run the ring's local task_work from the submitter task.
 *
 * Each pass runs at most max_events entries. Passes repeat while work is
 * pending and the remaining min_events target has not been met. Returns
 * -EEXIST (with a one-time warning) if current is not ctx->submitter_task.
 * Otherwise returns the number of entries run by the final pass; ret is
 * overwritten on every pass, so it is not a cumulative total.
 *
 * The callers in this file either take ->uring_lock around the call or, for
 * io_run_local_work_locked(), presumably rely on their own caller holding it.
 */
static int __io_run_local_work(struct io_ring_ctx *ctx, io_tw_token_t tw,
			       int min_events, int max_events)
{
	unsigned int loops = 0;
	int ret = 0;

	if (WARN_ON_ONCE(ctx->submitter_task != current))
		return -EEXIST;
	if (ctx->flags & IORING_SETUP_TASKRUN_FLAG)
		atomic_andnot(IORING_SQ_TASKRUN, &ctx->rings->sq_flags);
again:
	/*
	 * If the last loop made no progress while work is still pending,
	 * a producer has published a node but hasn't linked it into the
	 * queue yet (see mpscq_pop()). Give it a chance to finish rather
	 * than spinning on the queue.
	 */
	if (unlikely(loops && !ret))
		cond_resched();
	tw.cancel = io_should_terminate_tw(ctx);
	/* Discount what the previous pass handled from the remaining minimum */
	min_events -= ret;
	ret = __io_run_local_work_loop(ctx, tw, max_events);
	loops++;

	if (io_run_local_work_continue(ctx, ret, min_events))
		goto again;
	/* Re-check after flushing, as the check is repeated below */
	io_submit_flush_completions(ctx);
	if (io_run_local_work_continue(ctx, ret, min_events))
		goto again;

	trace_io_uring_local_work_run(ctx, ret, loops);
	return ret;
}

/*
 * Run local task_work if any is pending, returning 0 right away when there
 * is none. The per-pass limit is the larger of IO_LOCAL_TW_DEFAULT_MAX and
 * min_events. NOTE(review): going by the name, the caller is expected to
 * already hold ->uring_lock; nothing here takes or checks it.
 */
int io_run_local_work_locked(struct io_ring_ctx *ctx, int min_events)
{
	struct io_tw_state ts = {};

	if (!io_local_work_pending(ctx))
		return 0;
	return __io_run_local_work(ctx, ts, min_events,
				   max(IO_LOCAL_TW_DEFAULT_MAX, min_events));
}

/*
 * Run local task_work with ->uring_lock taken around the run. Unlike
 * io_run_local_work_locked(), there is no early return when nothing is
 * pending. Returns whatever __io_run_local_work() returns.
 */
int io_run_local_work(struct io_ring_ctx *ctx, int min_events, int max_events)
{
	struct io_tw_state ts = {};
	int ret;

	mutex_lock(&ctx->uring_lock);
	ret = __io_run_local_work(ctx, ts, min_events, max_events);
	mutex_unlock(&ctx->uring_lock);
	return ret;
}