// SPDX-License-Identifier: GPL-2.0
/*
 * Task work handling for io_uring
 */
#include <linux/kernel.h>
#include <linux/errno.h>
#include <linux/sched/signal.h>
#include <linux/io_uring.h>
#include <linux/indirect_call_wrapper.h>

#include "io_uring.h"
#include "tctx.h"
#include "poll.h"
#include "rw.h"
#include "eventfd.h"
#include "wait.h"

void io_fallback_req_func(struct work_struct *work)
{
	struct io_ring_ctx *ctx = container_of(work, struct io_ring_ctx,
						fallback_work.work);
	struct llist_node *node = llist_del_all(&ctx->fallback_llist);
	struct io_kiocb *req, *tmp;
	struct io_tw_state ts = {};

	percpu_ref_get(&ctx->refs);
	mutex_lock(&ctx->uring_lock);
	ts.cancel = io_should_terminate_tw(ctx);
	llist_for_each_entry_safe(req, tmp, node, io_task_work.node)
		req->io_task_work.func((struct io_tw_req){req}, ts);
	io_submit_flush_completions(ctx);
	mutex_unlock(&ctx->uring_lock);
	percpu_ref_put(&ctx->refs);
}

static void ctx_flush_and_put(struct io_ring_ctx *ctx, io_tw_token_t tw)
{
	if (!ctx)
		return;
	if (ctx->flags & IORING_SETUP_TASKRUN_FLAG)
		atomic_andnot(IORING_SQ_TASKRUN, &ctx->rings->sq_flags);

	io_submit_flush_completions(ctx);
	mutex_unlock(&ctx->uring_lock);
	percpu_ref_put(&ctx->refs);
}

/*
 * Run queued task_work, returning the number of entries processed in *count.
 * If more entries than max_entries are available, stop processing once this
 * is reached and return the rest of the list.
 */
struct llist_node *io_handle_tw_list(struct llist_node *node,
				     unsigned int *count,
				     unsigned int max_entries)
{
	struct io_ring_ctx *ctx = NULL;
	struct io_tw_state ts = { };

	do {
		struct llist_node *next = node->next;
		struct io_kiocb *req = container_of(node, struct io_kiocb,
						    io_task_work.node);

		if (req->ctx != ctx) {
			ctx_flush_and_put(ctx, ts);
			ctx = req->ctx;
			mutex_lock(&ctx->uring_lock);
			percpu_ref_get(&ctx->refs);
			ts.cancel = io_should_terminate_tw(ctx);
		}
		INDIRECT_CALL_2(req->io_task_work.func,
				io_poll_task_func, io_req_rw_complete,
				(struct io_tw_req){req}, ts);
		node = next;
		(*count)++;
		if (unlikely(need_resched())) {
			ctx_flush_and_put(ctx, ts);
			ctx = NULL;
			cond_resched();
		}
	} while (node && *count < max_entries);

	ctx_flush_and_put(ctx, ts);
	return node;
}

static __cold void __io_fallback_tw(struct llist_node *node, bool sync)
{
	struct io_ring_ctx *last_ctx = NULL;
	struct io_kiocb *req;

	while (node) {
		req = container_of(node, struct io_kiocb, io_task_work.node);
		node = node->next;
		if (last_ctx != req->ctx) {
			if (last_ctx) {
				if (sync)
					flush_delayed_work(&last_ctx->fallback_work);
				percpu_ref_put(&last_ctx->refs);
			}
			last_ctx = req->ctx;
			percpu_ref_get(&last_ctx->refs);
		}
		if (llist_add(&req->io_task_work.node, &last_ctx->fallback_llist))
			schedule_delayed_work(&last_ctx->fallback_work, 1);
	}

	if (last_ctx) {
		if (sync)
			flush_delayed_work(&last_ctx->fallback_work);
		percpu_ref_put(&last_ctx->refs);
	}
}

static void io_fallback_tw(struct io_uring_task *tctx, bool sync)
{
	struct llist_node *node = llist_del_all(&tctx->task_list);

	__io_fallback_tw(node, sync);
}

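/*
 * Grab and run the task_work queued on this tctx, processing at most
 * max_entries entries and accounting them in *count. Entries that were
 * not run are returned to the caller.
 */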
struct llist_node *tctx_task_work_run(struct io_uring_task *tctx,
				      unsigned int max_entries,
				      unsigned int *count)
{
	struct llist_node *node;

	node = llist_del_all(&tctx->task_list);
	if (node) {
		node = llist_reverse_order(node);
		node = io_handle_tw_list(node, count, max_entries);
	}

	/* relaxed read is enough as only the task itself sets ->in_cancel */
	if (unlikely(atomic_read(&tctx->in_cancel)))
		io_uring_drop_tctx_refs(current);

	trace_io_uring_task_work_run(tctx, *count);
	return node;
}

void tctx_task_work(struct callback_head *cb)
{
	struct io_uring_task *tctx;
	struct llist_node *ret;
	unsigned int count = 0;

	tctx = container_of(cb, struct io_uring_task, task_work);
	ret = tctx_task_work_run(tctx, UINT_MAX, &count);
	/* can't happen */
	WARN_ON_ONCE(ret);
}

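/*
 * Add a request to the ring's local (deferred) task_work list, as used with
 * IORING_SETUP_DEFER_TASKRUN. The first entry queued flags the ring as
 * having pending task_work (IORING_SQ_TASKRUN / eventfd); the submitter
 * task is only woken once enough entries have accumulated to satisfy a
 * waiter in io_cqring_wait().
 */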
void io_req_local_work_add(struct io_kiocb *req, unsigned flags)
{
	struct io_ring_ctx *ctx = req->ctx;
	unsigned nr_wait, nr_tw, nr_tw_prev;
	struct llist_node *head;

	/* See comment above IO_CQ_WAKE_INIT */
	BUILD_BUG_ON(IO_CQ_WAKE_FORCE <= IORING_MAX_CQ_ENTRIES);

	/*
	 * We don't know how many requests there are in the link and whether
	 * they can even be queued lazily, fall back to non-lazy.
	 */
	if (req->flags & IO_REQ_LINK_FLAGS)
		flags &= ~IOU_F_TWQ_LAZY_WAKE;

	guard(rcu)();

	head = READ_ONCE(ctx->work_llist.first);
	do {
		nr_tw_prev = 0;
		if (head) {
			struct io_kiocb *first_req = container_of(head,
							struct io_kiocb,
							io_task_work.node);
			/*
			 * Might be executed at any moment, rely on
			 * SLAB_TYPESAFE_BY_RCU to keep it alive.
			 */
			nr_tw_prev = READ_ONCE(first_req->nr_tw);
		}

		/*
		 * Theoretically, it can overflow, but that's fine as one of
		 * previous adds should've tried to wake the task.
		 */
		nr_tw = nr_tw_prev + 1;
		if (!(flags & IOU_F_TWQ_LAZY_WAKE))
			nr_tw = IO_CQ_WAKE_FORCE;

		req->nr_tw = nr_tw;
		req->io_task_work.node.next = head;
	} while (!try_cmpxchg(&ctx->work_llist.first, &head,
			      &req->io_task_work.node));

	/*
	 * cmpxchg implies a full barrier, which pairs with the barrier
	 * in set_current_state() on the io_cqring_wait() side. It's used
	 * to ensure that either we see updated ->cq_wait_nr, or waiters
	 * going to sleep will observe the work added to the list, which
	 * is similar to the wait/wake task state sync.
	 */

	if (!head) {
		if (ctx->flags & IORING_SETUP_TASKRUN_FLAG)
			atomic_or(IORING_SQ_TASKRUN, &ctx->rings->sq_flags);
		if (ctx->has_evfd)
			io_eventfd_signal(ctx, false);
	}

	nr_wait = atomic_read(&ctx->cq_wait_nr);
	/* not enough or no one is waiting */
	if (nr_tw < nr_wait)
		return;
	/* the previous add has already woken it up */
	if (nr_tw_prev >= nr_wait)
		return;
	wake_up_state(ctx->submitter_task, TASK_INTERRUPTIBLE);
}

void io_req_normal_work_add(struct io_kiocb *req)
{
	struct io_uring_task *tctx = req->tctx;
	struct io_ring_ctx *ctx = req->ctx;

	/* task_work already pending, we're done */
	if (!llist_add(&req->io_task_work.node, &tctx->task_list))
		return;

	if (ctx->flags & IORING_SETUP_TASKRUN_FLAG)
		atomic_or(IORING_SQ_TASKRUN, &ctx->rings->sq_flags);

	/* SQPOLL doesn't need the task_work added, it'll run it itself */
	if (ctx->flags & IORING_SETUP_SQPOLL) {
		__set_notify_signal(tctx->task);
		return;
	}

	if (likely(!task_work_add(tctx->task, &tctx->task_work, ctx->notify_method)))
		return;

	io_fallback_tw(tctx, false);
}

void io_req_task_work_add_remote(struct io_kiocb *req, unsigned flags)
{
	if (WARN_ON_ONCE(!(req->ctx->flags & IORING_SETUP_DEFER_TASKRUN)))
		return;
	__io_req_task_work_add(req, flags);
}

void __cold io_move_task_work_from_local(struct io_ring_ctx *ctx)
{
	struct llist_node *node = llist_del_all(&ctx->work_llist);

	__io_fallback_tw(node, false);
	node = llist_del_all(&ctx->retry_llist);
	__io_fallback_tw(node, false);
}

static bool io_run_local_work_continue(struct io_ring_ctx *ctx, int events,
				       int min_events)
{
	if (!io_local_work_pending(ctx))
		return false;
	if (events < min_events)
		return true;
	if (ctx->flags & IORING_SETUP_TASKRUN_FLAG)
		atomic_or(IORING_SQ_TASKRUN, &ctx->rings->sq_flags);
	return false;
}

static int __io_run_local_work_loop(struct llist_node **node,
				    io_tw_token_t tw,
				    int events)
{
	int ret = 0;

	while (*node) {
		struct llist_node *next = (*node)->next;
		struct io_kiocb *req = container_of(*node, struct io_kiocb,
						    io_task_work.node);
		INDIRECT_CALL_2(req->io_task_work.func,
				io_poll_task_func, io_req_rw_complete,
				(struct io_tw_req){req}, tw);
		*node = next;
		if (++ret >= events)
			break;
	}

	return ret;
}

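/*
 * Run this ring's local task_work. Entries left over from a previous run
 * (retry_llist) are processed first, then the pending work_llist is flipped
 * back into queue order and run. The loop continues until at least
 * min_events entries have been handled or no more work is pending, and
 * completions are flushed before returning.
 */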
static int __io_run_local_work(struct io_ring_ctx *ctx, io_tw_token_t tw,
			       int min_events, int max_events)
{
	struct llist_node *node;
	unsigned int loops = 0;
	int ret = 0;

	if (WARN_ON_ONCE(ctx->submitter_task != current))
		return -EEXIST;
	if (ctx->flags & IORING_SETUP_TASKRUN_FLAG)
		atomic_andnot(IORING_SQ_TASKRUN, &ctx->rings->sq_flags);
again:
	tw.cancel = io_should_terminate_tw(ctx);
	min_events -= ret;
	ret = __io_run_local_work_loop(&ctx->retry_llist.first, tw, max_events);
	if (ctx->retry_llist.first)
		goto retry_done;

	/*
	 * llists are in reverse order, flip it back the right way before
	 * running the pending items.
	 */
	node = llist_reverse_order(llist_del_all(&ctx->work_llist));
	ret += __io_run_local_work_loop(&node, tw, max_events - ret);
	ctx->retry_llist.first = node;
	loops++;

	if (io_run_local_work_continue(ctx, ret, min_events))
		goto again;
retry_done:
	io_submit_flush_completions(ctx);
	if (io_run_local_work_continue(ctx, ret, min_events))
		goto again;

	trace_io_uring_local_work_run(ctx, ret, loops);
	return ret;
}

int io_run_local_work_locked(struct io_ring_ctx *ctx, int min_events)
{
	struct io_tw_state ts = {};

	if (!io_local_work_pending(ctx))
		return 0;
	return __io_run_local_work(ctx, ts, min_events,
				   max(IO_LOCAL_TW_DEFAULT_MAX, min_events));
}

int io_run_local_work(struct io_ring_ctx *ctx, int min_events, int max_events)
{
	struct io_tw_state ts = {};
	int ret;

	mutex_lock(&ctx->uring_lock);
	ret = __io_run_local_work(ctx, ts, min_events, max_events);
	mutex_unlock(&ctx->uring_lock);
	return ret;
}