// SPDX-License-Identifier: GPL-2.0
/*
 * Contains the core associated with submission side polling of the SQ
 * ring, offloading submissions from the application to a kernel thread.
 */
#include <linux/kernel.h>
#include <linux/errno.h>
#include <linux/file.h>
#include <linux/mm.h>
#include <linux/slab.h>
#include <linux/audit.h>
#include <linux/security.h>
#include <linux/cpuset.h>
#include <linux/sched/cputime.h>
#include <linux/io_uring.h>

#include <uapi/linux/io_uring.h>

#include "io_uring.h"
#include "tctx.h"
#include "napi.h"
#include "cancel.h"
#include "sqpoll.h"

/* Per-ring submit cap used by __io_sq_thread() when cap_entries is set */
#define IORING_SQPOLL_CAP_ENTRIES_VALUE 8
/* Limit passed to io_sq_tw() on each iteration of the SQ thread main loop */
#define IORING_TW_CAP_ENTRIES_VALUE 32

/* Bit numbers used with set_bit()/clear_bit()/test_bit() on sqd->state */
enum {
	IO_SQ_THREAD_SHOULD_STOP = 0,
	IO_SQ_THREAD_SHOULD_PARK,
};

/*
 * Undo one io_sq_thread_park(): drop this caller's park_pending count,
 * release sqd->lock and wake anyone sleeping on sqd->wait.
 *
 * Must not be called from the SQ thread itself (warns if it is).
 */
void io_sq_thread_unpark(struct io_sq_data *sqd)
	__releases(&sqd->lock)
{
	WARN_ON_ONCE(sqpoll_task_locked(sqd) == current);

	/*
	 * Do the dance but not conditional clear_bit() because it'd race with
	 * other threads incrementing park_pending and setting the bit.
	 */
	clear_bit(IO_SQ_THREAD_SHOULD_PARK, &sqd->state);
	if (atomic_dec_return(&sqd->park_pending))
		set_bit(IO_SQ_THREAD_SHOULD_PARK, &sqd->state);
	mutex_unlock(&sqd->lock);
	wake_up(&sqd->wait);
}

/*
 * Ask the SQ thread to park and take sqd->lock. Returns with sqd->lock
 * held; the matching release is io_sq_thread_unpark().
 *
 * park_pending and the SHOULD_PARK bit are raised before the lock is taken,
 * then the SQ thread (if one is still registered) is woken so it notices
 * the request.
 */
void io_sq_thread_park(struct io_sq_data *sqd)
	__acquires(&sqd->lock)
{
	struct task_struct *tsk;

	atomic_inc(&sqd->park_pending);
	set_bit(IO_SQ_THREAD_SHOULD_PARK, &sqd->state);
	mutex_lock(&sqd->lock);

	tsk = sqpoll_task_locked(sqd);
	if (tsk) {
		WARN_ON_ONCE(tsk == current);
		wake_up_process(tsk);
	}
}

/*
 * Set IO_SQ_THREAD_SHOULD_STOP, wake the SQ thread if one is still
 * registered, and block until sqd->exited has been completed.
 *
 * Warns if the stop bit was already set, or if called from the SQ thread.
 */
void io_sq_thread_stop(struct io_sq_data *sqd)
{
	struct task_struct *tsk;

	WARN_ON_ONCE(test_bit(IO_SQ_THREAD_SHOULD_STOP, &sqd->state));

	set_bit(IO_SQ_THREAD_SHOULD_STOP, &sqd->state);
	mutex_lock(&sqd->lock);
	tsk = sqpoll_task_locked(sqd);
	if (tsk) {
		WARN_ON_ONCE(tsk == current);
		wake_up_process(tsk);
	}
	mutex_unlock(&sqd->lock);
	wait_for_completion(&sqd->exited);
}

/*
 * Drop one reference on @sqd. The final put stops the SQ thread (waiting
 * for it to exit) and frees the structure. No park request is expected to
 * be outstanding at that point.
 */
void io_put_sq_data(struct io_sq_data *sqd)
{
	if (refcount_dec_and_test(&sqd->refs)) {
		WARN_ON_ONCE(atomic_read(&sqd->park_pending));

		io_sq_thread_stop(sqd);
		kfree(sqd);
	}
}

/*
 * Recompute sqd->sq_thread_idle as the largest sq_thread_idle of all rings
 * on sqd->ctx_list (0 when the list is empty).
 */
static __cold void io_sqd_update_thread_idle(struct io_sq_data *sqd)
{
	struct io_ring_ctx *ctx;
	unsigned sq_thread_idle = 0;

	list_for_each_entry(ctx, &sqd->ctx_list, sqd_list)
		sq_thread_idle = max(sq_thread_idle, ctx->sq_thread_idle);
	sqd->sq_thread_idle = sq_thread_idle;
}

/*
 * Detach @ctx from its io_sq_data, if it has one: with the SQ thread
 * parked, unlink the ring from the sqd's ctx list and refresh the shared
 * idle time, then drop the ring's sqd reference and clear ctx->sq_data.
 */
void io_sq_thread_finish(struct io_ring_ctx *ctx)
{
	struct io_sq_data *sqd = ctx->sq_data;

	if (sqd) {
		io_sq_thread_park(sqd);
		list_del_init(&ctx->sqd_list);
		io_sqd_update_thread_idle(sqd);
		io_sq_thread_unpark(sqd);

		io_put_sq_data(sqd);
		ctx->sq_data = NULL;
	}
}

/*
 * Resolve p->wq_fd to an existing ring and take a reference on that ring's
 * io_sq_data.
 *
 * Returns the sqd with its refcount incremented, or an ERR_PTR:
 *   -ENXIO  fd_empty() reports no file for p->wq_fd
 *   -EINVAL the file is not an io_uring file, or the ring has no sq_data
 *   -EPERM  the sqd was created by a different thread group than current
 */
static struct io_sq_data *io_attach_sq_data(struct io_uring_params *p)
{
	struct io_ring_ctx *ctx_attach;
	struct io_sq_data *sqd;
	CLASS(fd, f)(p->wq_fd);

	if (fd_empty(f))
		return ERR_PTR(-ENXIO);
	if (!io_is_uring_fops(fd_file(f)))
		return ERR_PTR(-EINVAL);

	ctx_attach = fd_file(f)->private_data;
	sqd = ctx_attach->sq_data;
	if (!sqd)
		return ERR_PTR(-EINVAL);
	if (sqd->task_tgid != current->tgid)
		return ERR_PTR(-EPERM);

	refcount_inc(&sqd->refs);
	return sqd;
}

/*
 * Obtain the io_sq_data for a ring being set up.
 *
 * With IORING_SETUP_ATTACH_WQ, first try to share the sqd of the ring named
 * by p->wq_fd; on success *attached is set to true. An -EPERM failure from
 * the attach falls through to allocating a fresh sqd; any other attach
 * error is returned as-is.
 *
 * Otherwise (or after -EPERM) a new zeroed sqd is allocated and initialised
 * with a single reference; *attached stays false. Returns ERR_PTR(-ENOMEM)
 * if the allocation fails.
 */
static struct io_sq_data *io_get_sq_data(struct io_uring_params *p,
					 bool *attached)
{
	struct io_sq_data *sqd;

	*attached = false;
	if (p->flags & IORING_SETUP_ATTACH_WQ) {
		sqd = io_attach_sq_data(p);
		if (!IS_ERR(sqd)) {
			*attached = true;
			return sqd;
		}
		/* fall through for EPERM case, setup new sqd/task */
		if (PTR_ERR(sqd) != -EPERM)
			return sqd;
	}

	sqd = kzalloc_obj(*sqd);
	if (!sqd)
		return ERR_PTR(-ENOMEM);

	atomic_set(&sqd->park_pending, 0);
	refcount_set(&sqd->refs, 1);
	INIT_LIST_HEAD(&sqd->ctx_list);
	mutex_init(&sqd->lock);
	init_waitqueue_head(&sqd->wait);
	init_completion(&sqd->exited);
	return sqd;
}

/* True if any bit (stop or park) is currently set in sqd->state */
static inline bool io_sqd_events_pending(struct io_sq_data *sqd)
{
	return READ_ONCE(sqd->state);
}

/* Per-iteration bookkeeping for accumulating sqd->work_time */
struct io_sq_time {
	bool started;	/* true once usec holds a start sample */
	u64 usec;	/* io_sq_cpu_usec(current) when timing started */
};

/*
 * Return @tsk's adjusted system time divided by 1000. The user time is
 * fetched as well but not used.
 *
 * NOTE(review): presumably this converts nanoseconds to microseconds, as
 * the function name suggests - confirm against task_cputime_adjusted().
 */
u64 io_sq_cpu_usec(struct task_struct *tsk)
{
	u64 utime, stime;

	task_cputime_adjusted(tsk, &utime, &stime);
	do_div(stime, 1000);
	return stime;
}

/*
 * If timing was started for this iteration, add the time elapsed since the
 * start sample to sqd->work_time and mark the timer as stopped.
 */
static void io_sq_update_worktime(struct io_sq_data *sqd, struct io_sq_time *ist)
{
	if (!ist->started)
		return;
	ist->started = false;
	sqd->work_time += io_sq_cpu_usec(current) - ist->usec;
}

/* Take the start sample for this iteration, unless already taken */
static void io_sq_start_worktime(struct io_sq_time *ist)
{
	if (ist->started)
		return;
	ist->started = true;
	ist->usec = io_sq_cpu_usec(current);
}

/*
 * Do one round of work for @ctx on behalf of the SQ thread: run iopoll if
 * the ring has entries on its iopoll list, and submit pending SQEs.
 *
 * @cap_entries: limit the submission to IORING_SQPOLL_CAP_ENTRIES_VALUE
 * @ist:         work-time tracker, started if there is anything to do
 *
 * Returns the result of io_submit_sqes(), or 0 if no submission was
 * attempted.
 */
static int __io_sq_thread(struct io_ring_ctx *ctx, struct io_sq_data *sqd,
			  bool cap_entries, struct io_sq_time *ist)
{
	unsigned int to_submit;
	int ret = 0;

	to_submit = io_sqring_entries(ctx);
	/* if we're handling multiple rings, cap submit size for fairness */
	if (cap_entries && to_submit > IORING_SQPOLL_CAP_ENTRIES_VALUE)
		to_submit = IORING_SQPOLL_CAP_ENTRIES_VALUE;

	if (to_submit || !list_empty(&ctx->iopoll_list)) {
		const struct cred *creds = NULL;

		io_sq_start_worktime(ist);

		/* run with the credentials captured when the ring was set up */
		if (ctx->sq_creds != current_cred())
			creds = override_creds(ctx->sq_creds);

		mutex_lock(&ctx->uring_lock);
		if (!list_empty(&ctx->iopoll_list))
			io_do_iopoll(ctx, true);

		/*
		 * Don't submit if refs are dying, good for io_uring_register(),
		 * but also it is relied upon by io_ring_exit_work()
		 */
		if (to_submit && likely(!percpu_ref_is_dying(&ctx->refs)) &&
		    !(ctx->flags & IORING_SETUP_R_DISABLED))
			ret = io_submit_sqes(ctx, to_submit);
		mutex_unlock(&ctx->uring_lock);

		/* wake tasks sleeping in io_sqpoll_wait_sq() */
		if (to_submit && wq_has_sleeper(&ctx->sqo_sq_wait))
			wake_up(&ctx->sqo_sq_wait);
		if (creds)
			revert_creds(creds);
	}

	return ret;
}

/*
 * Handle a pending park request and/or signal for the SQ thread.
 *
 * Called with sqd->lock held. If a park was requested or a signal is
 * pending, the lock is dropped, the signal (if any) is fetched, and the
 * thread waits on sqd->wait until park_pending reaches zero before taking
 * the lock again and refreshing sqd->sq_cpu.
 *
 * Returns true if the thread should exit: get_signal() returned nonzero or
 * IO_SQ_THREAD_SHOULD_STOP is set.
 */
static bool io_sqd_handle_event(struct io_sq_data *sqd)
{
	bool did_sig = false;
	struct ksignal ksig;

	if (test_bit(IO_SQ_THREAD_SHOULD_PARK, &sqd->state) ||
	    signal_pending(current)) {
		mutex_unlock(&sqd->lock);
		if (signal_pending(current))
			did_sig = get_signal(&ksig);
		wait_event(sqd->wait, !atomic_read(&sqd->park_pending));
		mutex_lock(&sqd->lock);
		sqd->sq_cpu = raw_smp_processor_id();
	}
	return did_sig || test_bit(IO_SQ_THREAD_SHOULD_STOP, &sqd->state);
}

/*
 * Run task_work, processing no more than max_entries at a time. If more
 * than that is pending, it simply stays on the queue for the next run.
 *
 * Generic task_work pending on current is run afterwards as well. Returns
 * the count reported by tctx_task_work_run().
 *
 * NOTE(review): io_sq_thread() calls this with UINT_MAX although the
 * parameter is a plain int - confirm the intended type against
 * tctx_task_work_run().
 */
static unsigned int io_sq_tw(int max_entries)
{
	struct io_uring_task *tctx = current->io_uring;
	unsigned int count = 0;

	tctx_task_work_run(tctx, max_entries, &count);
	if (task_work_pending(current))
		task_work_run();
	return count;
}

/* True if the current task's io_uring task_list is not empty */
static bool io_sq_tw_pending(void)
{
	struct io_uring_task *tctx = current->io_uring;

	return !mpscq_empty(&tctx->task_list);
}

/*
 * Main function of the SQ polling kernel thread; @data is the io_sq_data.
 *
 * Loops over every ring attached to the sqd, submitting SQEs, reaping
 * iopoll completions, running task_work and NAPI busy polling. While there
 * is activity, or the idle window (sqd->sq_thread_idle jiffies) has not yet
 * expired, it keeps spinning; after that it sets IORING_SQ_NEED_WAKEUP on
 * the rings and sleeps on sqd->wait.
 *
 * sqd->lock is held while looping and dropped only around rescheduling,
 * sleeping and park/signal handling. The thread leaves the loop when
 * io_sqd_handle_event() reports a stop request or a fetched signal, and
 * always completes sqd->exited before exiting.
 */
static int io_sq_thread(void *data)
{
	struct io_sq_data *sqd = data;
	struct io_ring_ctx *ctx;
	unsigned long timeout = 0;
	char buf[TASK_COMM_LEN] = {};
	DEFINE_WAIT(wait);

	/* offload context creation failed, just exit */
	if (!current->io_uring) {
		mutex_lock(&sqd->lock);
		rcu_assign_pointer(sqd->thread, NULL);
		put_task_struct(current);
		mutex_unlock(&sqd->lock);
		goto err_out;
	}

	/* at this point task_pid still holds the creator's pid */
	snprintf(buf, sizeof(buf), "iou-sqp-%d", sqd->task_pid);
	set_task_comm(current, buf);

	/* reset to our pid after we've set task_comm, for fdinfo */
	sqd->task_pid = current->pid;

	/* pin to the requested CPU, or record whichever one we are on */
	if (sqd->sq_cpu != -1) {
		set_cpus_allowed_ptr(current, cpumask_of(sqd->sq_cpu));
	} else {
		set_cpus_allowed_ptr(current, cpu_online_mask);
		sqd->sq_cpu = raw_smp_processor_id();
	}

	/*
	 * Force audit context to get setup, in case we do prep side async
	 * operations that would trigger an audit call before any issue side
	 * audit has been done.
	 */
	audit_uring_entry(IORING_OP_NOP);
	audit_uring_exit(true, 0);

	mutex_lock(&sqd->lock);
	while (1) {
		bool cap_entries, sqt_spin = false;
		struct io_sq_time ist = { };

		if (io_sqd_events_pending(sqd) || signal_pending(current)) {
			if (io_sqd_handle_event(sqd))
				break;
			/* restart the idle window after a park or signal */
			timeout = jiffies + sqd->sq_thread_idle;
		}

		/* cap per-ring submissions unless exactly one ring is attached */
		cap_entries = !list_is_singular(&sqd->ctx_list);
		list_for_each_entry(ctx, &sqd->ctx_list, sqd_list) {
			int ret = __io_sq_thread(ctx, sqd, cap_entries, &ist);

			/* submitted SQEs or outstanding iopoll count as activity */
			if (!sqt_spin && (ret > 0 || !list_empty(&ctx->iopoll_list)))
				sqt_spin = true;
		}
		/* so does any task_work that was run */
		if (io_sq_tw(IORING_TW_CAP_ENTRIES_VALUE))
			sqt_spin = true;

		list_for_each_entry(ctx, &sqd->ctx_list, sqd_list) {
			if (io_napi(ctx)) {
				io_sq_start_worktime(&ist);
				io_napi_sqpoll_busy_poll(ctx);
			}
		}

		io_sq_update_worktime(sqd, &ist);

		/* active, or idle window not yet expired: keep spinning */
		if (sqt_spin || !time_after(jiffies, timeout)) {
			if (sqt_spin)
				timeout = jiffies + sqd->sq_thread_idle;
			if (unlikely(need_resched())) {
				/* don't hold sqd->lock across the reschedule */
				mutex_unlock(&sqd->lock);
				cond_resched();
				mutex_lock(&sqd->lock);
				sqd->sq_cpu = raw_smp_processor_id();
			}
			continue;
		}

		/*
		 * Idle window expired: get on the wait queue first, then
		 * re-check for events and task_work so a wakeup that arrives
		 * in between is not missed.
		 */
		prepare_to_wait(&sqd->wait, &wait, TASK_INTERRUPTIBLE);
		if (!io_sqd_events_pending(sqd) && !io_sq_tw_pending()) {
			bool needs_sched = true;

			list_for_each_entry(ctx, &sqd->ctx_list, sqd_list) {
				atomic_or(IORING_SQ_NEED_WAKEUP,
						&ctx->rings->sq_flags);
				/* pending iopoll work: don't sleep */
				if ((ctx->flags & IORING_SETUP_IOPOLL) &&
				    !list_empty(&ctx->iopoll_list)) {
					needs_sched = false;
					break;
				}

				/*
				 * Ensure the store of the wakeup flag is not
				 * reordered with the load of the SQ tail
				 */
				smp_mb__after_atomic();

				/* new SQEs showed up: don't sleep */
				if (io_sqring_entries(ctx)) {
					needs_sched = false;
					break;
				}
			}

			if (needs_sched) {
				mutex_unlock(&sqd->lock);
				schedule();
				mutex_lock(&sqd->lock);
				sqd->sq_cpu = raw_smp_processor_id();
			}
			/* about to poll again, so clear the wakeup request */
			list_for_each_entry(ctx, &sqd->ctx_list, sqd_list)
				atomic_andnot(IORING_SQ_NEED_WAKEUP,
						&ctx->rings->sq_flags);
		}

		finish_wait(&sqd->wait, &wait);
		timeout = jiffies + sqd->sq_thread_idle;
	}

	/* exiting: flush whatever task_work is still queued, uncapped */
	if (io_sq_tw_pending())
		io_sq_tw(UINT_MAX);

	io_uring_cancel_generic(true, sqd);
	/* unpublish the thread; the put pairs with get_task_struct() at creation */
	rcu_assign_pointer(sqd->thread, NULL);
	put_task_struct(current);
	/* leave NEED_WAKEUP set on every ring still attached */
	list_for_each_entry(ctx, &sqd->ctx_list, sqd_list)
		atomic_or(IORING_SQ_NEED_WAKEUP, &ctx->rings->sq_flags);
	io_run_task_work();
	mutex_unlock(&sqd->lock);
err_out:
	/* releases io_sq_thread_stop(), which waits on this completion */
	complete(&sqd->exited);
	do_exit(0);
}

/*
 * Sleep interruptibly on ctx->sqo_sq_wait until the SQ ring is no longer
 * full or a signal is pending for the calling task.
 */
void io_sqpoll_wait_sq(struct io_ring_ctx *ctx)
{
	DEFINE_WAIT(wait);

	do {
		if (!io_sqring_full(ctx))
			break;
		prepare_to_wait(&ctx->sqo_sq_wait, &wait, TASK_INTERRUPTIBLE);

		/* re-check after queueing ourselves to avoid a missed wakeup */
		if (!io_sqring_full(ctx))
			break;
		schedule();
	} while (!signal_pending(current));

	finish_wait(&ctx->sqo_sq_wait, &wait);
}

/*
 * Set up submission-queue offload for @ctx according to its flags and @p.
 *
 * For IORING_SETUP_SQPOLL this obtains an io_sq_data (shared with the ring
 * named by p->wq_fd when attaching, otherwise new), links the ring into it
 * and, for a new sqd, creates and starts the SQ thread, optionally bound to
 * p->sq_thread_cpu when IORING_SETUP_SQ_AFF is given.
 *
 * Returns 0 on success or a negative errno. Visible failure cases:
 *   -ENXIO   invalid p->wq_fd, or attaching to an sqd whose thread is gone
 *   -EINVAL  p->wq_fd is not an io_uring file, SQ_AFF without SQPOLL, or
 *            the requested CPU is out of range, offline or not allowed
 *   -ENOMEM  cpumask allocation failed
 * plus whatever security_uring_sqpoll(), io_get_sq_data(),
 * create_io_thread() or io_uring_alloc_task_context() report.
 */
__cold int io_sq_offload_create(struct io_ring_ctx *ctx,
				struct io_uring_params *p)
{
	int ret;

	/* Retain compatibility with failing for an invalid attach attempt */
	if ((ctx->flags & (IORING_SETUP_ATTACH_WQ | IORING_SETUP_SQPOLL)) ==
				IORING_SETUP_ATTACH_WQ) {
		CLASS(fd, f)(p->wq_fd);
		if (fd_empty(f))
			return -ENXIO;
		if (!io_is_uring_fops(fd_file(f)))
			return -EINVAL;
	}
	if (ctx->flags & IORING_SETUP_SQPOLL) {
		struct io_uring_task *tctx;
		struct task_struct *tsk;
		struct io_sq_data *sqd;
		bool attached;

		ret = security_uring_sqpoll();
		if (ret)
			return ret;

		sqd = io_get_sq_data(p, &attached);
		if (IS_ERR(sqd)) {
			ret = PTR_ERR(sqd);
			goto err;
		}

		ctx->sq_creds = get_current_cred();
		ctx->sq_data = sqd;
		ctx->sq_thread_idle = msecs_to_jiffies(p->sq_thread_idle);
		/* default idle period when none (or one rounding to 0) is given */
		if (!ctx->sq_thread_idle)
			ctx->sq_thread_idle = HZ;

		/* link the ring into the sqd with the SQ thread parked */
		io_sq_thread_park(sqd);
		list_add(&ctx->sqd_list, &sqd->ctx_list);
		io_sqd_update_thread_idle(sqd);
		/* don't attach to a dying SQPOLL thread, would be racy */
		ret = (attached && !sqd->thread) ? -ENXIO : 0;
		io_sq_thread_unpark(sqd);

		if (ret < 0)
			goto err;
		/* an attached ring reuses the existing thread; nothing to create */
		if (attached)
			return 0;

		if (p->flags & IORING_SETUP_SQ_AFF) {
			cpumask_var_t allowed_mask;
			int cpu = p->sq_thread_cpu;

			ret = -EINVAL;
			if (cpu >= nr_cpu_ids || !cpu_online(cpu))
				goto err_sqpoll;
			ret = -ENOMEM;
			if (!alloc_cpumask_var(&allowed_mask, GFP_KERNEL))
				goto err_sqpoll;
			ret = -EINVAL;
			/* the CPU must be within the creating task's cpuset */
			cpuset_cpus_allowed(current, allowed_mask);
			if (!cpumask_test_cpu(cpu, allowed_mask)) {
				free_cpumask_var(allowed_mask);
				goto err_sqpoll;
			}
			free_cpumask_var(allowed_mask);
			sqd->sq_cpu = cpu;
		} else {
			/* -1: no affinity requested, see io_sq_thread() */
			sqd->sq_cpu = -1;
		}

		sqd->task_pid = current->pid;
		sqd->task_tgid = current->tgid;
		tsk = create_io_thread(io_sq_thread, sqd, NUMA_NO_NODE);
		if (IS_ERR(tsk)) {
			ret = PTR_ERR(tsk);
			goto err_sqpoll;
		}

		mutex_lock(&sqd->lock);
		rcu_assign_pointer(sqd->thread, tsk);
		mutex_unlock(&sqd->lock);

		ret = 0;
		/* reference dropped by the thread itself via put_task_struct() */
		get_task_struct(tsk);
		tctx = io_uring_alloc_task_context(tsk, ctx);
		if (!IS_ERR(tctx))
			tsk->io_uring = tctx;
		else
			ret = PTR_ERR(tctx);
		/*
		 * Start the thread even on failure: with no io_uring context it
		 * takes the early-exit path and completes sqd->exited.
		 */
		wake_up_new_task(tsk);
		if (ret)
			goto err;
	} else if (p->flags & IORING_SETUP_SQ_AFF) {
		/* Can't have SQ_AFF without SQPOLL */
		ret = -EINVAL;
		goto err;
	}
	return 0;
err_sqpoll:
	/*
	 * No thread was started, so complete 'exited' here; otherwise the
	 * io_sq_thread_stop() reached through io_sq_thread_finish() below
	 * would wait on it forever.
	 */
	complete(&ctx->sq_data->exited);
err:
	io_sq_thread_finish(ctx);
	return ret;
}

/*
 * Apply @mask as the io-wq CPU affinity of the SQ thread serving @ctx.
 *
 * Returns -EINVAL if the ring has no io_sq_data or the SQ thread is no
 * longer registered, otherwise the result of io_wq_cpu_affinity(). The SQ
 * thread is parked for the duration of the call.
 */
__cold int io_sqpoll_wq_cpu_affinity(struct io_ring_ctx *ctx,
				     cpumask_var_t mask)
{
	struct io_sq_data *sqd = ctx->sq_data;
	int ret = -EINVAL;

	if (sqd) {
		struct task_struct *tsk;

		io_sq_thread_park(sqd);
		/* Don't set affinity for a dying thread */
		tsk = sqpoll_task_locked(sqd);
		if (tsk)
			ret = io_wq_cpu_affinity(tsk->io_uring, mask);
		io_sq_thread_unpark(sqd);
	}

	return ret;
}