1 // SPDX-License-Identifier: GPL-2.0 2 /* 3 * Contains the core associated with submission side polling of the SQ 4 * ring, offloading submissions from the application to a kernel thread. 5 */ 6 #include <linux/kernel.h> 7 #include <linux/errno.h> 8 #include <linux/file.h> 9 #include <linux/mm.h> 10 #include <linux/slab.h> 11 #include <linux/audit.h> 12 #include <linux/security.h> 13 #include <linux/io_uring.h> 14 15 #include <uapi/linux/io_uring.h> 16 17 #include "io_uring.h" 18 #include "napi.h" 19 #include "sqpoll.h" 20 21 #define IORING_SQPOLL_CAP_ENTRIES_VALUE 8 22 #define IORING_TW_CAP_ENTRIES_VALUE 8 23 24 enum { 25 IO_SQ_THREAD_SHOULD_STOP = 0, 26 IO_SQ_THREAD_SHOULD_PARK, 27 }; 28 29 void io_sq_thread_unpark(struct io_sq_data *sqd) 30 __releases(&sqd->lock) 31 { 32 WARN_ON_ONCE(sqd->thread == current); 33 34 /* 35 * Do the dance but not conditional clear_bit() because it'd race with 36 * other threads incrementing park_pending and setting the bit. 37 */ 38 clear_bit(IO_SQ_THREAD_SHOULD_PARK, &sqd->state); 39 if (atomic_dec_return(&sqd->park_pending)) 40 set_bit(IO_SQ_THREAD_SHOULD_PARK, &sqd->state); 41 mutex_unlock(&sqd->lock); 42 } 43 44 void io_sq_thread_park(struct io_sq_data *sqd) 45 __acquires(&sqd->lock) 46 { 47 WARN_ON_ONCE(sqd->thread == current); 48 49 atomic_inc(&sqd->park_pending); 50 set_bit(IO_SQ_THREAD_SHOULD_PARK, &sqd->state); 51 mutex_lock(&sqd->lock); 52 if (sqd->thread) 53 wake_up_process(sqd->thread); 54 } 55 56 void io_sq_thread_stop(struct io_sq_data *sqd) 57 { 58 WARN_ON_ONCE(sqd->thread == current); 59 WARN_ON_ONCE(test_bit(IO_SQ_THREAD_SHOULD_STOP, &sqd->state)); 60 61 set_bit(IO_SQ_THREAD_SHOULD_STOP, &sqd->state); 62 mutex_lock(&sqd->lock); 63 if (sqd->thread) 64 wake_up_process(sqd->thread); 65 mutex_unlock(&sqd->lock); 66 wait_for_completion(&sqd->exited); 67 } 68 69 void io_put_sq_data(struct io_sq_data *sqd) 70 { 71 if (refcount_dec_and_test(&sqd->refs)) { 72 WARN_ON_ONCE(atomic_read(&sqd->park_pending)); 73 74 io_sq_thread_stop(sqd); 75 kfree(sqd); 76 } 77 } 78 79 static __cold void io_sqd_update_thread_idle(struct io_sq_data *sqd) 80 { 81 struct io_ring_ctx *ctx; 82 unsigned sq_thread_idle = 0; 83 84 list_for_each_entry(ctx, &sqd->ctx_list, sqd_list) 85 sq_thread_idle = max(sq_thread_idle, ctx->sq_thread_idle); 86 sqd->sq_thread_idle = sq_thread_idle; 87 } 88 89 void io_sq_thread_finish(struct io_ring_ctx *ctx) 90 { 91 struct io_sq_data *sqd = ctx->sq_data; 92 93 if (sqd) { 94 io_sq_thread_park(sqd); 95 list_del_init(&ctx->sqd_list); 96 io_sqd_update_thread_idle(sqd); 97 io_sq_thread_unpark(sqd); 98 99 io_put_sq_data(sqd); 100 ctx->sq_data = NULL; 101 } 102 } 103 104 static struct io_sq_data *io_attach_sq_data(struct io_uring_params *p) 105 { 106 struct io_ring_ctx *ctx_attach; 107 struct io_sq_data *sqd; 108 struct fd f; 109 110 f = fdget(p->wq_fd); 111 if (!f.file) 112 return ERR_PTR(-ENXIO); 113 if (!io_is_uring_fops(f.file)) { 114 fdput(f); 115 return ERR_PTR(-EINVAL); 116 } 117 118 ctx_attach = f.file->private_data; 119 sqd = ctx_attach->sq_data; 120 if (!sqd) { 121 fdput(f); 122 return ERR_PTR(-EINVAL); 123 } 124 if (sqd->task_tgid != current->tgid) { 125 fdput(f); 126 return ERR_PTR(-EPERM); 127 } 128 129 refcount_inc(&sqd->refs); 130 fdput(f); 131 return sqd; 132 } 133 134 static struct io_sq_data *io_get_sq_data(struct io_uring_params *p, 135 bool *attached) 136 { 137 struct io_sq_data *sqd; 138 139 *attached = false; 140 if (p->flags & IORING_SETUP_ATTACH_WQ) { 141 sqd = io_attach_sq_data(p); 142 if (!IS_ERR(sqd)) { 143 *attached = true; 144 return sqd; 145 } 146 /* fall through for EPERM case, setup new sqd/task */ 147 if (PTR_ERR(sqd) != -EPERM) 148 return sqd; 149 } 150 151 sqd = kzalloc(sizeof(*sqd), GFP_KERNEL); 152 if (!sqd) 153 return ERR_PTR(-ENOMEM); 154 155 atomic_set(&sqd->park_pending, 0); 156 refcount_set(&sqd->refs, 1); 157 INIT_LIST_HEAD(&sqd->ctx_list); 158 mutex_init(&sqd->lock); 159 init_waitqueue_head(&sqd->wait); 160 init_completion(&sqd->exited); 161 return sqd; 162 } 163 164 static inline bool io_sqd_events_pending(struct io_sq_data *sqd) 165 { 166 return READ_ONCE(sqd->state); 167 } 168 169 static int __io_sq_thread(struct io_ring_ctx *ctx, bool cap_entries) 170 { 171 unsigned int to_submit; 172 int ret = 0; 173 174 to_submit = io_sqring_entries(ctx); 175 /* if we're handling multiple rings, cap submit size for fairness */ 176 if (cap_entries && to_submit > IORING_SQPOLL_CAP_ENTRIES_VALUE) 177 to_submit = IORING_SQPOLL_CAP_ENTRIES_VALUE; 178 179 if (!wq_list_empty(&ctx->iopoll_list) || to_submit) { 180 const struct cred *creds = NULL; 181 182 if (ctx->sq_creds != current_cred()) 183 creds = override_creds(ctx->sq_creds); 184 185 mutex_lock(&ctx->uring_lock); 186 if (!wq_list_empty(&ctx->iopoll_list)) 187 io_do_iopoll(ctx, true); 188 189 /* 190 * Don't submit if refs are dying, good for io_uring_register(), 191 * but also it is relied upon by io_ring_exit_work() 192 */ 193 if (to_submit && likely(!percpu_ref_is_dying(&ctx->refs)) && 194 !(ctx->flags & IORING_SETUP_R_DISABLED)) 195 ret = io_submit_sqes(ctx, to_submit); 196 mutex_unlock(&ctx->uring_lock); 197 198 if (io_napi(ctx)) 199 ret += io_napi_sqpoll_busy_poll(ctx); 200 201 if (to_submit && wq_has_sleeper(&ctx->sqo_sq_wait)) 202 wake_up(&ctx->sqo_sq_wait); 203 if (creds) 204 revert_creds(creds); 205 } 206 207 return ret; 208 } 209 210 static bool io_sqd_handle_event(struct io_sq_data *sqd) 211 { 212 bool did_sig = false; 213 struct ksignal ksig; 214 215 if (test_bit(IO_SQ_THREAD_SHOULD_PARK, &sqd->state) || 216 signal_pending(current)) { 217 mutex_unlock(&sqd->lock); 218 if (signal_pending(current)) 219 did_sig = get_signal(&ksig); 220 cond_resched(); 221 mutex_lock(&sqd->lock); 222 sqd->sq_cpu = raw_smp_processor_id(); 223 } 224 return did_sig || test_bit(IO_SQ_THREAD_SHOULD_STOP, &sqd->state); 225 } 226 227 /* 228 * Run task_work, processing the retry_list first. The retry_list holds 229 * entries that we passed on in the previous run, if we had more task_work 230 * than we were asked to process. Newly queued task_work isn't run until the 231 * retry list has been fully processed. 232 */ 233 static unsigned int io_sq_tw(struct llist_node **retry_list, int max_entries) 234 { 235 struct io_uring_task *tctx = current->io_uring; 236 unsigned int count = 0; 237 238 if (*retry_list) { 239 *retry_list = io_handle_tw_list(*retry_list, &count, max_entries); 240 if (count >= max_entries) 241 goto out; 242 max_entries -= count; 243 } 244 *retry_list = tctx_task_work_run(tctx, max_entries, &count); 245 out: 246 if (task_work_pending(current)) 247 task_work_run(); 248 return count; 249 } 250 251 static bool io_sq_tw_pending(struct llist_node *retry_list) 252 { 253 struct io_uring_task *tctx = current->io_uring; 254 255 return retry_list || !llist_empty(&tctx->task_list); 256 } 257 258 static void io_sq_update_worktime(struct io_sq_data *sqd, struct rusage *start) 259 { 260 struct rusage end; 261 262 getrusage(current, RUSAGE_SELF, &end); 263 end.ru_stime.tv_sec -= start->ru_stime.tv_sec; 264 end.ru_stime.tv_usec -= start->ru_stime.tv_usec; 265 266 sqd->work_time += end.ru_stime.tv_usec + end.ru_stime.tv_sec * 1000000; 267 } 268 269 static int io_sq_thread(void *data) 270 { 271 struct llist_node *retry_list = NULL; 272 struct io_sq_data *sqd = data; 273 struct io_ring_ctx *ctx; 274 struct rusage start; 275 unsigned long timeout = 0; 276 char buf[TASK_COMM_LEN]; 277 DEFINE_WAIT(wait); 278 279 /* offload context creation failed, just exit */ 280 if (!current->io_uring) 281 goto err_out; 282 283 snprintf(buf, sizeof(buf), "iou-sqp-%d", sqd->task_pid); 284 set_task_comm(current, buf); 285 286 /* reset to our pid after we've set task_comm, for fdinfo */ 287 sqd->task_pid = current->pid; 288 289 if (sqd->sq_cpu != -1) { 290 set_cpus_allowed_ptr(current, cpumask_of(sqd->sq_cpu)); 291 } else { 292 set_cpus_allowed_ptr(current, cpu_online_mask); 293 sqd->sq_cpu = raw_smp_processor_id(); 294 } 295 296 /* 297 * Force audit context to get setup, in case we do prep side async 298 * operations that would trigger an audit call before any issue side 299 * audit has been done. 300 */ 301 audit_uring_entry(IORING_OP_NOP); 302 audit_uring_exit(true, 0); 303 304 mutex_lock(&sqd->lock); 305 while (1) { 306 bool cap_entries, sqt_spin = false; 307 308 if (io_sqd_events_pending(sqd) || signal_pending(current)) { 309 if (io_sqd_handle_event(sqd)) 310 break; 311 timeout = jiffies + sqd->sq_thread_idle; 312 } 313 314 cap_entries = !list_is_singular(&sqd->ctx_list); 315 getrusage(current, RUSAGE_SELF, &start); 316 list_for_each_entry(ctx, &sqd->ctx_list, sqd_list) { 317 int ret = __io_sq_thread(ctx, cap_entries); 318 319 if (!sqt_spin && (ret > 0 || !wq_list_empty(&ctx->iopoll_list))) 320 sqt_spin = true; 321 } 322 if (io_sq_tw(&retry_list, IORING_TW_CAP_ENTRIES_VALUE)) 323 sqt_spin = true; 324 325 if (sqt_spin || !time_after(jiffies, timeout)) { 326 if (sqt_spin) { 327 io_sq_update_worktime(sqd, &start); 328 timeout = jiffies + sqd->sq_thread_idle; 329 } 330 if (unlikely(need_resched())) { 331 mutex_unlock(&sqd->lock); 332 cond_resched(); 333 mutex_lock(&sqd->lock); 334 sqd->sq_cpu = raw_smp_processor_id(); 335 } 336 continue; 337 } 338 339 prepare_to_wait(&sqd->wait, &wait, TASK_INTERRUPTIBLE); 340 if (!io_sqd_events_pending(sqd) && !io_sq_tw_pending(retry_list)) { 341 bool needs_sched = true; 342 343 list_for_each_entry(ctx, &sqd->ctx_list, sqd_list) { 344 atomic_or(IORING_SQ_NEED_WAKEUP, 345 &ctx->rings->sq_flags); 346 if ((ctx->flags & IORING_SETUP_IOPOLL) && 347 !wq_list_empty(&ctx->iopoll_list)) { 348 needs_sched = false; 349 break; 350 } 351 352 /* 353 * Ensure the store of the wakeup flag is not 354 * reordered with the load of the SQ tail 355 */ 356 smp_mb__after_atomic(); 357 358 if (io_sqring_entries(ctx)) { 359 needs_sched = false; 360 break; 361 } 362 } 363 364 if (needs_sched) { 365 mutex_unlock(&sqd->lock); 366 schedule(); 367 mutex_lock(&sqd->lock); 368 sqd->sq_cpu = raw_smp_processor_id(); 369 } 370 list_for_each_entry(ctx, &sqd->ctx_list, sqd_list) 371 atomic_andnot(IORING_SQ_NEED_WAKEUP, 372 &ctx->rings->sq_flags); 373 } 374 375 finish_wait(&sqd->wait, &wait); 376 timeout = jiffies + sqd->sq_thread_idle; 377 } 378 379 if (retry_list) 380 io_sq_tw(&retry_list, UINT_MAX); 381 382 io_uring_cancel_generic(true, sqd); 383 sqd->thread = NULL; 384 list_for_each_entry(ctx, &sqd->ctx_list, sqd_list) 385 atomic_or(IORING_SQ_NEED_WAKEUP, &ctx->rings->sq_flags); 386 io_run_task_work(); 387 mutex_unlock(&sqd->lock); 388 err_out: 389 complete(&sqd->exited); 390 do_exit(0); 391 } 392 393 void io_sqpoll_wait_sq(struct io_ring_ctx *ctx) 394 { 395 DEFINE_WAIT(wait); 396 397 do { 398 if (!io_sqring_full(ctx)) 399 break; 400 prepare_to_wait(&ctx->sqo_sq_wait, &wait, TASK_INTERRUPTIBLE); 401 402 if (!io_sqring_full(ctx)) 403 break; 404 schedule(); 405 } while (!signal_pending(current)); 406 407 finish_wait(&ctx->sqo_sq_wait, &wait); 408 } 409 410 __cold int io_sq_offload_create(struct io_ring_ctx *ctx, 411 struct io_uring_params *p) 412 { 413 int ret; 414 415 /* Retain compatibility with failing for an invalid attach attempt */ 416 if ((ctx->flags & (IORING_SETUP_ATTACH_WQ | IORING_SETUP_SQPOLL)) == 417 IORING_SETUP_ATTACH_WQ) { 418 struct fd f; 419 420 f = fdget(p->wq_fd); 421 if (!f.file) 422 return -ENXIO; 423 if (!io_is_uring_fops(f.file)) { 424 fdput(f); 425 return -EINVAL; 426 } 427 fdput(f); 428 } 429 if (ctx->flags & IORING_SETUP_SQPOLL) { 430 struct task_struct *tsk; 431 struct io_sq_data *sqd; 432 bool attached; 433 434 ret = security_uring_sqpoll(); 435 if (ret) 436 return ret; 437 438 sqd = io_get_sq_data(p, &attached); 439 if (IS_ERR(sqd)) { 440 ret = PTR_ERR(sqd); 441 goto err; 442 } 443 444 ctx->sq_creds = get_current_cred(); 445 ctx->sq_data = sqd; 446 ctx->sq_thread_idle = msecs_to_jiffies(p->sq_thread_idle); 447 if (!ctx->sq_thread_idle) 448 ctx->sq_thread_idle = HZ; 449 450 io_sq_thread_park(sqd); 451 list_add(&ctx->sqd_list, &sqd->ctx_list); 452 io_sqd_update_thread_idle(sqd); 453 /* don't attach to a dying SQPOLL thread, would be racy */ 454 ret = (attached && !sqd->thread) ? -ENXIO : 0; 455 io_sq_thread_unpark(sqd); 456 457 if (ret < 0) 458 goto err; 459 if (attached) 460 return 0; 461 462 if (p->flags & IORING_SETUP_SQ_AFF) { 463 int cpu = p->sq_thread_cpu; 464 465 ret = -EINVAL; 466 if (cpu >= nr_cpu_ids || !cpu_online(cpu)) 467 goto err_sqpoll; 468 sqd->sq_cpu = cpu; 469 } else { 470 sqd->sq_cpu = -1; 471 } 472 473 sqd->task_pid = current->pid; 474 sqd->task_tgid = current->tgid; 475 tsk = create_io_thread(io_sq_thread, sqd, NUMA_NO_NODE); 476 if (IS_ERR(tsk)) { 477 ret = PTR_ERR(tsk); 478 goto err_sqpoll; 479 } 480 481 sqd->thread = tsk; 482 ret = io_uring_alloc_task_context(tsk, ctx); 483 wake_up_new_task(tsk); 484 if (ret) 485 goto err; 486 } else if (p->flags & IORING_SETUP_SQ_AFF) { 487 /* Can't have SQ_AFF without SQPOLL */ 488 ret = -EINVAL; 489 goto err; 490 } 491 492 return 0; 493 err_sqpoll: 494 complete(&ctx->sq_data->exited); 495 err: 496 io_sq_thread_finish(ctx); 497 return ret; 498 } 499 500 __cold int io_sqpoll_wq_cpu_affinity(struct io_ring_ctx *ctx, 501 cpumask_var_t mask) 502 { 503 struct io_sq_data *sqd = ctx->sq_data; 504 int ret = -EINVAL; 505 506 if (sqd) { 507 io_sq_thread_park(sqd); 508 /* Don't set affinity for a dying thread */ 509 if (sqd->thread) 510 ret = io_wq_cpu_affinity(sqd->thread->io_uring, mask); 511 io_sq_thread_unpark(sqd); 512 } 513 514 return ret; 515 } 516