// SPDX-License-Identifier: GPL-2.0
/*
 * Task work handling for io_uring
 */
#include <linux/kernel.h>
#include <linux/errno.h>
#include <linux/sched/signal.h>
#include <linux/io_uring.h>
#include <linux/indirect_call_wrapper.h>

#include "io_uring.h"
#include "tctx.h"
#include "poll.h"
#include "rw.h"
#include "eventfd.h"
#include "wait.h"

/*
 * Workqueue handler for ctx->fallback_work: runs task_work items that were
 * punted to ctx->fallback_llist because they could not be queued to the
 * task itself (see io_fallback_tw() / __io_fallback_tw()).
 *
 * Runs every queued callback under ctx->uring_lock with a ctx reference
 * held, then flushes batched completions before unlocking.
 */
void io_fallback_req_func(struct work_struct *work)
{
	struct io_ring_ctx *ctx = container_of(work, struct io_ring_ctx,
					       fallback_work.work);
	/* atomically steal the whole pending list */
	struct llist_node *node = llist_del_all(&ctx->fallback_llist);
	struct io_kiocb *req, *tmp;
	struct io_tw_state ts = {};

	percpu_ref_get(&ctx->refs);
	mutex_lock(&ctx->uring_lock);
	/* let callbacks know if the ctx is being torn down */
	ts.cancel = io_should_terminate_tw(ctx);
	llist_for_each_entry_safe(req, tmp, node, io_task_work.node)
		req->io_task_work.func((struct io_tw_req){req}, ts);
	io_submit_flush_completions(ctx);
	mutex_unlock(&ctx->uring_lock);
	percpu_ref_put(&ctx->refs);
}

/*
 * Counterpart to the lock+ref acquisition done per-ctx in
 * io_handle_tw_list(): flush batched completions, drop ctx->uring_lock
 * and the percpu ref. A NULL @ctx is a no-op so callers can use it
 * unconditionally. @tw is currently unused here; it keeps the call sites
 * uniform with the task_work state they carry.
 */
static void ctx_flush_and_put(struct io_ring_ctx *ctx, io_tw_token_t tw)
{
	if (!ctx)
		return;
	/* clear the "task work pending" hint shared with userspace */
	if (ctx->flags & IORING_SETUP_TASKRUN_FLAG)
		atomic_andnot(IORING_SQ_TASKRUN, &ctx->rings->sq_flags);

	io_submit_flush_completions(ctx);
	mutex_unlock(&ctx->uring_lock);
	percpu_ref_put(&ctx->refs);
}

/*
 * Run queued task_work, returning the number of entries processed in *count.
 * If more entries than max_entries are available, stop processing once this
 * is reached and return the rest of the list.
 */
struct llist_node *io_handle_tw_list(struct llist_node *node,
				     unsigned int *count,
				     unsigned int max_entries)
{
	struct io_ring_ctx *ctx = NULL;
	struct io_tw_state ts = { };

	do {
		struct llist_node *next = node->next;
		struct io_kiocb *req = container_of(node, struct io_kiocb,
						    io_task_work.node);

		/*
		 * The list may contain requests from multiple rings; take
		 * the lock/ref for each ctx lazily and flush the previous
		 * one when the ctx changes.
		 */
		if (req->ctx != ctx) {
			ctx_flush_and_put(ctx, ts);
			ctx = req->ctx;
			mutex_lock(&ctx->uring_lock);
			percpu_ref_get(&ctx->refs);
			ts.cancel = io_should_terminate_tw(ctx);
		}
		/* speculate on the two most common callbacks */
		INDIRECT_CALL_2(req->io_task_work.func,
				io_poll_task_func, io_req_rw_complete,
				(struct io_tw_req){req}, ts);
		node = next;
		(*count)++;
		if (unlikely(need_resched())) {
			/* drop the lock/ref before yielding the CPU */
			ctx_flush_and_put(ctx, ts);
			ctx = NULL;
			cond_resched();
		}
	} while (node && *count < max_entries);

	ctx_flush_and_put(ctx, ts);
	return node;
}

/*
 * Punt a list of requests to their respective ctx fallback workqueues,
 * batching the ctx refcount operations for runs of requests that share a
 * ctx. If @sync is true, each ctx's fallback work is flushed before its
 * reference is dropped.
 */
static __cold void __io_fallback_tw(struct llist_node *node, bool sync)
{
	struct io_ring_ctx *last_ctx = NULL;
	struct io_kiocb *req;

	while (node) {
		req = container_of(node, struct io_kiocb, io_task_work.node);
		node = node->next;
		if (last_ctx != req->ctx) {
			if (last_ctx) {
				if (sync)
					flush_delayed_work(&last_ctx->fallback_work);
				percpu_ref_put(&last_ctx->refs);
			}
			last_ctx = req->ctx;
			percpu_ref_get(&last_ctx->refs);
		}
		/* llist_add() returns true if the list was empty: first
		 * entry schedules the work, later ones just queue up. */
		if (llist_add(&req->io_task_work.node, &last_ctx->fallback_llist))
			schedule_delayed_work(&last_ctx->fallback_work, 1);
	}

	/* drop the reference held for the final ctx, if any */
	if (last_ctx) {
		if (sync)
			flush_delayed_work(&last_ctx->fallback_work);
		percpu_ref_put(&last_ctx->refs);
	}
}

/*
 * Punt all task_work currently queued on @tctx to the fallback workqueue
 * path, e.g. when task_work_add() failed because the task is exiting.
 */
static void io_fallback_tw(struct io_uring_task *tctx, bool sync)
{
	struct llist_node *node = llist_del_all(&tctx->task_list);

	__io_fallback_tw(node, sync);
}

/*
 * Grab all task_work queued on @tctx and run up to @max_entries of it,
 * accumulating the number processed in *count. Returns the unprocessed
 * remainder of the list, or NULL if everything was run.
 */
struct llist_node *tctx_task_work_run(struct io_uring_task *tctx,
				      unsigned int max_entries,
				      unsigned int *count)
{
	struct llist_node *node;

	node =
llist_del_all(&tctx->task_list);
	if (node) {
		/*
		 * llist_add() builds the list LIFO; flip it so the work
		 * runs in queueing order.
		 */
		node = llist_reverse_order(node);
		node = io_handle_tw_list(node, count, max_entries);
	}

	/* relaxed read is enough as only the task itself sets ->in_cancel */
	if (unlikely(atomic_read(&tctx->in_cancel)))
		io_uring_drop_tctx_refs(current);

	trace_io_uring_task_work_run(tctx, *count);
	return node;
}

/*
 * task_work callback registered via task_work_add() in
 * io_req_normal_work_add(). Runs everything that is queued (no entry
 * limit), so nothing can be left over.
 */
void tctx_task_work(struct callback_head *cb)
{
	struct io_uring_task *tctx;
	struct llist_node *ret;
	unsigned int count = 0;

	tctx = container_of(cb, struct io_uring_task, task_work);
	ret = tctx_task_work_run(tctx, UINT_MAX, &count);
	/* can't happen */
	WARN_ON_ONCE(ret);
}

/*
 * Sets IORING_SQ_TASKRUN in the sq_flags shared with userspace, using the
 * RCU protected rings pointer to be safe against concurrent ring resizing.
 */
static void io_ctx_mark_taskrun(struct io_ring_ctx *ctx)
{
	lockdep_assert_in_rcu_read_lock();

	if (ctx->flags & IORING_SETUP_TASKRUN_FLAG) {
		struct io_rings *rings = rcu_dereference(ctx->rings_rcu);

		atomic_or(IORING_SQ_TASKRUN, &rings->sq_flags);
	}
}

/*
 * Queue task_work for a DEFER_TASKRUN ring onto ctx->work_llist via a
 * lockless cmpxchg loop, and decide whether the submitter task needs to
 * be woken. @flags may carry IOU_F_TWQ_LAZY_WAKE, in which case the wake
 * is deferred until enough entries (->cq_wait_nr) have accumulated.
 */
void io_req_local_work_add(struct io_kiocb *req, unsigned flags)
{
	struct io_ring_ctx *ctx = req->ctx;
	unsigned nr_wait, nr_tw, nr_tw_prev;
	struct llist_node *head;

	/* See comment above IO_CQ_WAKE_INIT */
	BUILD_BUG_ON(IO_CQ_WAKE_FORCE <= IORING_MAX_CQ_ENTRIES);

	/*
	 * We don't know how many requests there are in the link and whether
	 * they can even be queued lazily, fall back to non-lazy.
	 */
	if (req->flags & IO_REQ_LINK_FLAGS)
		flags &= ~IOU_F_TWQ_LAZY_WAKE;

	guard(rcu)();

	head = READ_ONCE(ctx->work_llist.first);
	do {
		nr_tw_prev = 0;
		if (head) {
			struct io_kiocb *first_req = container_of(head,
							struct io_kiocb,
							io_task_work.node);
			/*
			 * Might be executed at any moment, rely on
			 * SLAB_TYPESAFE_BY_RCU to keep it alive.
			 */
			nr_tw_prev = READ_ONCE(first_req->nr_tw);
		}

		/*
		 * Theoretically, it can overflow, but that's fine as one of
		 * previous adds should've tried to wake the task.
		 */
		nr_tw = nr_tw_prev + 1;
		/* a non-lazy add must always force a wakeup attempt */
		if (!(flags & IOU_F_TWQ_LAZY_WAKE))
			nr_tw = IO_CQ_WAKE_FORCE;

		req->nr_tw = nr_tw;
		req->io_task_work.node.next = head;
	} while (!try_cmpxchg(&ctx->work_llist.first, &head,
			      &req->io_task_work.node));

	/*
	 * cmpxchg implies a full barrier, which pairs with the barrier
	 * in set_current_state() on the io_cqring_wait() side. It's used
	 * to ensure that either we see updated ->cq_wait_nr, or waiters
	 * going to sleep will observe the work added to the list, which
	 * is similar to the wait/wake task state sync.
	 */

	/* first entry on a previously-empty list: publish the hint */
	if (!head) {
		io_ctx_mark_taskrun(ctx);
		if (ctx->has_evfd)
			io_eventfd_signal(ctx, false);
	}

	nr_wait = atomic_read(&ctx->cq_wait_nr);
	/* not enough or no one is waiting */
	if (nr_tw < nr_wait)
		return;
	/* the previous add has already woken it up */
	if (nr_tw_prev >= nr_wait)
		return;
	wake_up_state(ctx->submitter_task, TASK_INTERRUPTIBLE);
}

/*
 * Queue task_work for a non-DEFER_TASKRUN ring: add the request to the
 * per-task list and notify the task via task_work_add() (or just a signal
 * notification for SQPOLL). Falls back to the workqueue path if the task
 * can no longer accept task_work.
 */
void io_req_normal_work_add(struct io_kiocb *req)
{
	struct io_uring_task *tctx = req->tctx;
	struct io_ring_ctx *ctx = req->ctx;

	/* task_work already pending, we're done */
	if (!llist_add(&req->io_task_work.node, &tctx->task_list))
		return;

	/*
	 * Doesn't need to use ->rings_rcu, as resizing isn't supported for
	 * !DEFER_TASKRUN.
	 */
	if (ctx->flags & IORING_SETUP_TASKRUN_FLAG)
		atomic_or(IORING_SQ_TASKRUN, &ctx->rings->sq_flags);

	/* SQPOLL doesn't need the task_work added, it'll run it itself */
	if (ctx->flags & IORING_SETUP_SQPOLL) {
		__set_notify_signal(tctx->task);
		return;
	}

	if (likely(!task_work_add(tctx->task, &tctx->task_work, ctx->notify_method)))
		return;

	/* task_work_add() failed (task presumably exiting); punt to the
	 * workqueue fallback so the request still gets completed. */
	io_fallback_tw(tctx, false);
}

/*
 * Add task_work for @req from a task other than the ring owner. Only
 * meaningful for DEFER_TASKRUN rings; WARNs and drops the request's
 * task_work otherwise.
 */
void io_req_task_work_add_remote(struct io_kiocb *req, unsigned flags)
{
	if (WARN_ON_ONCE(!(req->ctx->flags & IORING_SETUP_DEFER_TASKRUN)))
		return;
	__io_req_task_work_add(req, flags);
}

/*
 * Move all pending DEFER_TASKRUN work — both the main work_llist and the
 * retry list — over to the fallback workqueue. Cold path; NOTE(review):
 * presumably used when the submitter task can no longer run local work
 * (e.g. ring/task teardown) — confirm against callers.
 */
void __cold io_move_task_work_from_local(struct io_ring_ctx *ctx)
{
	struct llist_node *node = llist_del_all(&ctx->work_llist);

	__io_fallback_tw(node, false);
	node = llist_del_all(&ctx->retry_llist);
	__io_fallback_tw(node, false);
}

/*
 * Decide whether __io_run_local_work() should loop again: true if work is
 * still pending and fewer than @min_events have been processed. When
 * stopping with work still pending, re-set the IORING_SQ_TASKRUN hint so
 * userspace knows to come back.
 */
static bool io_run_local_work_continue(struct io_ring_ctx *ctx, int events,
				       int min_events)
{
	if (!io_local_work_pending(ctx))
		return false;
	if (events < min_events)
		return true;
	if (ctx->flags & IORING_SETUP_TASKRUN_FLAG)
		atomic_or(IORING_SQ_TASKRUN, &ctx->rings->sq_flags);
	return false;
}

/*
 * Run up to @events callbacks starting at *node, advancing *node past the
 * entries that were processed. Returns the number of callbacks run; on
 * return, *node points at the first unprocessed entry (or NULL).
 */
static int __io_run_local_work_loop(struct llist_node **node,
				    io_tw_token_t tw,
				    int events)
{
	int ret = 0;

	while (*node) {
		struct llist_node *next = (*node)->next;
		struct io_kiocb *req = container_of(*node, struct io_kiocb,
						    io_task_work.node);
		/* speculate on the two most common callbacks */
		INDIRECT_CALL_2(req->io_task_work.func,
				io_poll_task_func, io_req_rw_complete,
				(struct io_tw_req){req}, tw);
		*node = next;
		if (++ret >= events)
			break;
	}

	return ret;
}

/*
 * Core DEFER_TASKRUN work runner: drain the retry list first, then the
 * main work_llist, looping until @min_events have been processed or no
 * work remains. Entries that exceed @max_events in a pass are parked on
 * ctx->retry_llist for the next invocation. Must be called by the
 * submitter task with ctx->uring_lock held (see the callers below).
 * Returns the number of entries processed, or -EEXIST if called from the
 * wrong task.
 */
static int __io_run_local_work(struct io_ring_ctx *ctx, io_tw_token_t tw,
			       int min_events, int max_events)
{
	struct llist_node *node;
	unsigned int loops = 0;
	int ret = 0;

	if (WARN_ON_ONCE(ctx->submitter_task != current))
		return -EEXIST;
	/* we're about to run the work, clear the userspace-visible hint */
	if (ctx->flags & IORING_SETUP_TASKRUN_FLAG)
		atomic_andnot(IORING_SQ_TASKRUN, &ctx->rings->sq_flags);
again:
	tw.cancel = io_should_terminate_tw(ctx);
	/* account what the previous pass already did towards the target */
	min_events -= ret;
	/* leftovers from an earlier capped run go first, in order */
	ret = __io_run_local_work_loop(&ctx->retry_llist.first, tw, max_events);
	if (ctx->retry_llist.first)
		goto retry_done;

	/*
	 * llists are in reverse order, flip it back the right way before
	 * running the pending items.
	 */
	node = llist_reverse_order(llist_del_all(&ctx->work_llist));
	ret += __io_run_local_work_loop(&node, tw, max_events - ret);
	/* anything not processed this pass is retried next time */
	ctx->retry_llist.first = node;
	loops++;

	if (io_run_local_work_continue(ctx, ret, min_events))
		goto again;
retry_done:
	io_submit_flush_completions(ctx);
	/* flushing completions may have queued more work; recheck */
	if (io_run_local_work_continue(ctx, ret, min_events))
		goto again;

	trace_io_uring_local_work_run(ctx, ret, loops);
	return ret;
}

/*
 * Run pending local work; the caller is expected to already hold
 * ctx->uring_lock (contrast with io_run_local_work() below, which takes
 * it). Returns 0 immediately if nothing is pending.
 */
int io_run_local_work_locked(struct io_ring_ctx *ctx, int min_events)
{
	struct io_tw_state ts = {};

	if (!io_local_work_pending(ctx))
		return 0;
	return __io_run_local_work(ctx, ts, min_events,
				   max(IO_LOCAL_TW_DEFAULT_MAX, min_events));
}

/*
 * Take ctx->uring_lock and run pending local work. Returns the number of
 * entries processed (or a negative error from __io_run_local_work()).
 */
int io_run_local_work(struct io_ring_ctx *ctx, int min_events, int max_events)
{
	struct io_tw_state ts = {};
	int ret;

	mutex_lock(&ctx->uring_lock);
	ret = __io_run_local_work(ctx, ts, min_events, max_events);
	mutex_unlock(&ctx->uring_lock);
	return ret;
}