// SPDX-License-Identifier: GPL-2.0
/*
 * Task work handling for io_uring
 */
#include <linux/kernel.h>
#include <linux/errno.h>
#include <linux/sched/signal.h>
#include <linux/io_uring.h>
#include <linux/indirect_call_wrapper.h>

#include "io_uring.h"
#include "tctx.h"
#include "poll.h"
#include "rw.h"
#include "eventfd.h"

void io_fallback_req_func(struct work_struct *work)
{
	struct io_ring_ctx *ctx = container_of(work, struct io_ring_ctx,
						fallback_work.work);
	struct llist_node *node = llist_del_all(&ctx->fallback_llist);
	struct io_kiocb *req, *tmp;
	struct io_tw_state ts = {};

	percpu_ref_get(&ctx->refs);
	mutex_lock(&ctx->uring_lock);
	ts.cancel = io_should_terminate_tw(ctx);
	llist_for_each_entry_safe(req, tmp, node, io_task_work.node)
		req->io_task_work.func((struct io_tw_req){req}, ts);
	io_submit_flush_completions(ctx);
	mutex_unlock(&ctx->uring_lock);
	percpu_ref_put(&ctx->refs);
}

static void ctx_flush_and_put(struct io_ring_ctx *ctx, io_tw_token_t tw)
{
	if (!ctx)
		return;
	if (ctx->flags & IORING_SETUP_TASKRUN_FLAG)
		atomic_andnot(IORING_SQ_TASKRUN, &ctx->rings->sq_flags);

	io_submit_flush_completions(ctx);
	mutex_unlock(&ctx->uring_lock);
	percpu_ref_put(&ctx->refs);
}

/*
 * Run queued task_work, returning the number of entries processed in *count.
 * If more entries than max_entries are available, stop processing once this
 * is reached and return the rest of the list.
 */
struct llist_node *io_handle_tw_list(struct llist_node *node,
				     unsigned int *count,
				     unsigned int max_entries)
{
	struct io_ring_ctx *ctx = NULL;
	struct io_tw_state ts = { };

	do {
		struct llist_node *next = node->next;
		struct io_kiocb *req = container_of(node, struct io_kiocb,
						    io_task_work.node);

		if (req->ctx != ctx) {
			ctx_flush_and_put(ctx, ts);
			ctx = req->ctx;
			mutex_lock(&ctx->uring_lock);
			percpu_ref_get(&ctx->refs);
			ts.cancel = io_should_terminate_tw(ctx);
		}
		INDIRECT_CALL_2(req->io_task_work.func,
				io_poll_task_func, io_req_rw_complete,
				(struct io_tw_req){req}, ts);
		node = next;
		(*count)++;
		if (unlikely(need_resched())) {
			ctx_flush_and_put(ctx, ts);
			ctx = NULL;
			cond_resched();
		}
	} while (node && *count < max_entries);

	ctx_flush_and_put(ctx, ts);
	return node;
}

static __cold void __io_fallback_tw(struct llist_node *node, bool sync)
{
	struct io_ring_ctx *last_ctx = NULL;
	struct io_kiocb *req;

	while (node) {
		req = container_of(node, struct io_kiocb, io_task_work.node);
		node = node->next;
		if (last_ctx != req->ctx) {
			if (last_ctx) {
				if (sync)
					flush_delayed_work(&last_ctx->fallback_work);
				percpu_ref_put(&last_ctx->refs);
			}
			last_ctx = req->ctx;
			percpu_ref_get(&last_ctx->refs);
		}
		if (llist_add(&req->io_task_work.node, &last_ctx->fallback_llist))
			schedule_delayed_work(&last_ctx->fallback_work, 1);
	}

	if (last_ctx) {
		if (sync)
			flush_delayed_work(&last_ctx->fallback_work);
		percpu_ref_put(&last_ctx->refs);
	}
}

static void io_fallback_tw(struct io_uring_task *tctx, bool sync)
{
	struct llist_node *node = llist_del_all(&tctx->task_list);

	__io_fallback_tw(node, sync);
}
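/*
 * Process task_work queued on @tctx. The pending list is snapshotted and
 * flipped back into queueing order, then up to @max_entries entries are run
 * via io_handle_tw_list(). Any unprocessed remainder is returned to the
 * caller.
 */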
struct llist_node *tctx_task_work_run(struct io_uring_task *tctx,
				      unsigned int max_entries,
				      unsigned int *count)
{
	struct llist_node *node;

	node = llist_del_all(&tctx->task_list);
	if (node) {
		node = llist_reverse_order(node);
		node = io_handle_tw_list(node, count, max_entries);
	}

	/* relaxed read is enough as only the task itself sets ->in_cancel */
	if (unlikely(atomic_read(&tctx->in_cancel)))
		io_uring_drop_tctx_refs(current);

	trace_io_uring_task_work_run(tctx, *count);
	return node;
}

void tctx_task_work(struct callback_head *cb)
{
	struct io_uring_task *tctx;
	struct llist_node *ret;
	unsigned int count = 0;

	tctx = container_of(cb, struct io_uring_task, task_work);
	ret = tctx_task_work_run(tctx, UINT_MAX, &count);
	/* can't happen */
	WARN_ON_ONCE(ret);
}
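/*
 * Queue a request onto the DEFER_TASKRUN work list for the ring's submitter
 * task. ->nr_tw tracks how many entries are pending so that lazy wakeups can
 * be batched: the submitter is only woken once enough work has accumulated
 * to satisfy its ->cq_wait_nr wait threshold, or immediately for non-lazy
 * adds via IO_CQ_WAKE_FORCE.
 */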
void io_req_local_work_add(struct io_kiocb *req, unsigned flags)
{
	struct io_ring_ctx *ctx = req->ctx;
	unsigned nr_wait, nr_tw, nr_tw_prev;
	struct llist_node *head;

	/* See comment above IO_CQ_WAKE_INIT */
	BUILD_BUG_ON(IO_CQ_WAKE_FORCE <= IORING_MAX_CQ_ENTRIES);

	/*
	 * We don't know how many requests there are in the link and whether
	 * they can even be queued lazily, fall back to non-lazy.
	 */
	if (req->flags & IO_REQ_LINK_FLAGS)
		flags &= ~IOU_F_TWQ_LAZY_WAKE;

	guard(rcu)();

	head = READ_ONCE(ctx->work_llist.first);
	do {
		nr_tw_prev = 0;
		if (head) {
			struct io_kiocb *first_req = container_of(head,
							struct io_kiocb,
							io_task_work.node);
			/*
			 * Might be executed at any moment, rely on
			 * SLAB_TYPESAFE_BY_RCU to keep it alive.
			 */
			nr_tw_prev = READ_ONCE(first_req->nr_tw);
		}

		/*
		 * Theoretically, it can overflow, but that's fine as one of
		 * previous adds should've tried to wake the task.
		 */
		nr_tw = nr_tw_prev + 1;
		if (!(flags & IOU_F_TWQ_LAZY_WAKE))
			nr_tw = IO_CQ_WAKE_FORCE;

		req->nr_tw = nr_tw;
		req->io_task_work.node.next = head;
	} while (!try_cmpxchg(&ctx->work_llist.first, &head,
			      &req->io_task_work.node));

	/*
	 * cmpxchg implies a full barrier, which pairs with the barrier
	 * in set_current_state() on the io_cqring_wait() side. It's used
	 * to ensure that either we see updated ->cq_wait_nr, or waiters
	 * going to sleep will observe the work added to the list, which
	 * is similar to the wait/wake task state sync.
	 */

	if (!head) {
		if (ctx->flags & IORING_SETUP_TASKRUN_FLAG)
			atomic_or(IORING_SQ_TASKRUN, &ctx->rings->sq_flags);
		if (ctx->has_evfd)
			io_eventfd_signal(ctx, false);
	}

	nr_wait = atomic_read(&ctx->cq_wait_nr);
	/* not enough or no one is waiting */
	if (nr_tw < nr_wait)
		return;
	/* the previous add has already woken it up */
	if (nr_tw_prev >= nr_wait)
		return;
	wake_up_state(ctx->submitter_task, TASK_INTERRUPTIBLE);
}

void io_req_normal_work_add(struct io_kiocb *req)
{
	struct io_uring_task *tctx = req->tctx;
	struct io_ring_ctx *ctx = req->ctx;

	/* task_work already pending, we're done */
	if (!llist_add(&req->io_task_work.node, &tctx->task_list))
		return;

	if (ctx->flags & IORING_SETUP_TASKRUN_FLAG)
		atomic_or(IORING_SQ_TASKRUN, &ctx->rings->sq_flags);

	/* SQPOLL doesn't need the task_work added, it'll run it itself */
	if (ctx->flags & IORING_SETUP_SQPOLL) {
		__set_notify_signal(tctx->task);
		return;
	}

	if (likely(!task_work_add(tctx->task, &tctx->task_work, ctx->notify_method)))
		return;

	io_fallback_tw(tctx, false);
}

void io_req_task_work_add_remote(struct io_kiocb *req, unsigned flags)
{
	if (WARN_ON_ONCE(!(req->ctx->flags & IORING_SETUP_DEFER_TASKRUN)))
		return;
	__io_req_task_work_add(req, flags);
}

void __cold io_move_task_work_from_local(struct io_ring_ctx *ctx)
{
	struct llist_node *node = llist_del_all(&ctx->work_llist);

	__io_fallback_tw(node, false);
	node = llist_del_all(&ctx->retry_llist);
	__io_fallback_tw(node, false);
}

static bool io_run_local_work_continue(struct io_ring_ctx *ctx, int events,
				       int min_events)
{
	if (!io_local_work_pending(ctx))
		return false;
	if (events < min_events)
		return true;
	if (ctx->flags & IORING_SETUP_TASKRUN_FLAG)
		atomic_or(IORING_SQ_TASKRUN, &ctx->rings->sq_flags);
	return false;
}

static int __io_run_local_work_loop(struct llist_node **node,
				    io_tw_token_t tw,
				    int events)
{
	int ret = 0;

	while (*node) {
		struct llist_node *next = (*node)->next;
		struct io_kiocb *req = container_of(*node, struct io_kiocb,
						    io_task_work.node);
		INDIRECT_CALL_2(req->io_task_work.func,
				io_poll_task_func, io_req_rw_complete,
				(struct io_tw_req){req}, tw);
		*node = next;
		if (++ret >= events)
			break;
	}

	return ret;
}
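/*
 * Run DEFER_TASKRUN work for @ctx on the submitter task. Entries left over
 * from a previous run (ctx->retry_llist) are handled first, then work_llist
 * is drained in passes of at most @max_events, looping until at least
 * @min_events entries have been processed or no work remains.
 */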
static int __io_run_local_work(struct io_ring_ctx *ctx, io_tw_token_t tw,
			       int min_events, int max_events)
{
	struct llist_node *node;
	unsigned int loops = 0;
	int ret = 0;

	if (WARN_ON_ONCE(ctx->submitter_task != current))
		return -EEXIST;
	if (ctx->flags & IORING_SETUP_TASKRUN_FLAG)
		atomic_andnot(IORING_SQ_TASKRUN, &ctx->rings->sq_flags);
again:
	tw.cancel = io_should_terminate_tw(ctx);
	min_events -= ret;
	ret = __io_run_local_work_loop(&ctx->retry_llist.first, tw, max_events);
	if (ctx->retry_llist.first)
		goto retry_done;

	/*
	 * llists are in reverse order, flip it back the right way before
	 * running the pending items.
	 */
	node = llist_reverse_order(llist_del_all(&ctx->work_llist));
	ret += __io_run_local_work_loop(&node, tw, max_events - ret);
	ctx->retry_llist.first = node;
	loops++;

	if (io_run_local_work_continue(ctx, ret, min_events))
		goto again;
retry_done:
	io_submit_flush_completions(ctx);
	if (io_run_local_work_continue(ctx, ret, min_events))
		goto again;

	trace_io_uring_local_work_run(ctx, ret, loops);
	return ret;
}

int io_run_local_work_locked(struct io_ring_ctx *ctx, int min_events)
{
	struct io_tw_state ts = {};

	if (!io_local_work_pending(ctx))
		return 0;
	return __io_run_local_work(ctx, ts, min_events,
				   max(IO_LOCAL_TW_DEFAULT_MAX, min_events));
}

int io_run_local_work(struct io_ring_ctx *ctx, int min_events, int max_events)
{
	struct io_tw_state ts = {};
	int ret;

	mutex_lock(&ctx->uring_lock);
	ret = __io_run_local_work(ctx, ts, min_events, max_events);
	mutex_unlock(&ctx->uring_lock);
	return ret;
}