// SPDX-License-Identifier: GPL-2.0
/*
 * Contains the core associated with submission side polling of the SQ
 * ring, offloading submissions from the application to a kernel thread.
 */
#include <linux/kernel.h>
#include <linux/errno.h>
#include <linux/file.h>
#include <linux/mm.h>
#include <linux/slab.h>
#include <linux/audit.h>
#include <linux/security.h>
#include <linux/cpuset.h>
#include <linux/sched/cputime.h>
#include <linux/io_uring.h>

#include <uapi/linux/io_uring.h>

#include "io_uring.h"
#include "tctx.h"
#include "napi.h"
#include "cancel.h"
#include "sqpoll.h"

#define IORING_SQPOLL_CAP_ENTRIES_VALUE	8
#define IORING_TW_CAP_ENTRIES_VALUE	32

enum {
	IO_SQ_THREAD_SHOULD_STOP = 0,
	IO_SQ_THREAD_SHOULD_PARK,
};

void io_sq_thread_unpark(struct io_sq_data *sqd)
	__releases(&sqd->lock)
{
	WARN_ON_ONCE(sqpoll_task_locked(sqd) == current);

	/*
	 * Do the dance but not conditional clear_bit() because it'd race with
	 * other threads incrementing park_pending and setting the bit.
	 */
	clear_bit(IO_SQ_THREAD_SHOULD_PARK, &sqd->state);
	if (atomic_dec_return(&sqd->park_pending))
		set_bit(IO_SQ_THREAD_SHOULD_PARK, &sqd->state);
	mutex_unlock(&sqd->lock);
	wake_up(&sqd->wait);
}

void io_sq_thread_park(struct io_sq_data *sqd)
	__acquires(&sqd->lock)
{
	struct task_struct *tsk;

	atomic_inc(&sqd->park_pending);
	set_bit(IO_SQ_THREAD_SHOULD_PARK, &sqd->state);
	mutex_lock(&sqd->lock);

	tsk = sqpoll_task_locked(sqd);
	if (tsk) {
		WARN_ON_ONCE(tsk == current);
		wake_up_process(tsk);
	}
}
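/*
 * A minimal sketch of the caller pattern the park/unpark helpers above
 * assume (io_sq_thread_finish() and io_sqpoll_wq_cpu_affinity() in this file
 * are real users): park the SQPOLL thread, update shared sqd/ctx state while
 * holding sqd->lock, then unpark to drop the lock and let the thread resume.
 *
 *	io_sq_thread_park(sqd);
 *	... update sqd->ctx_list or other state the thread reads ...
 *	io_sq_thread_unpark(sqd);
 */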
void io_sq_thread_stop(struct io_sq_data *sqd)
{
	struct task_struct *tsk;

	WARN_ON_ONCE(test_bit(IO_SQ_THREAD_SHOULD_STOP, &sqd->state));

	set_bit(IO_SQ_THREAD_SHOULD_STOP, &sqd->state);
	mutex_lock(&sqd->lock);
	tsk = sqpoll_task_locked(sqd);
	if (tsk) {
		WARN_ON_ONCE(tsk == current);
		wake_up_process(tsk);
	}
	mutex_unlock(&sqd->lock);
	wait_for_completion(&sqd->exited);
}

void io_put_sq_data(struct io_sq_data *sqd)
{
	if (refcount_dec_and_test(&sqd->refs)) {
		WARN_ON_ONCE(atomic_read(&sqd->park_pending));

		io_sq_thread_stop(sqd);
		kfree(sqd);
	}
}

static __cold void io_sqd_update_thread_idle(struct io_sq_data *sqd)
{
	struct io_ring_ctx *ctx;
	unsigned sq_thread_idle = 0;

	list_for_each_entry(ctx, &sqd->ctx_list, sqd_list)
		sq_thread_idle = max(sq_thread_idle, ctx->sq_thread_idle);
	sqd->sq_thread_idle = sq_thread_idle;
}

void io_sq_thread_finish(struct io_ring_ctx *ctx)
{
	struct io_sq_data *sqd = ctx->sq_data;

	if (sqd) {
		io_sq_thread_park(sqd);
		list_del_init(&ctx->sqd_list);
		io_sqd_update_thread_idle(sqd);
		io_sq_thread_unpark(sqd);

		io_put_sq_data(sqd);
		ctx->sq_data = NULL;
	}
}

static struct io_sq_data *io_attach_sq_data(struct io_uring_params *p)
{
	struct io_ring_ctx *ctx_attach;
	struct io_sq_data *sqd;
	CLASS(fd, f)(p->wq_fd);

	if (fd_empty(f))
		return ERR_PTR(-ENXIO);
	if (!io_is_uring_fops(fd_file(f)))
		return ERR_PTR(-EINVAL);

	ctx_attach = fd_file(f)->private_data;
	sqd = ctx_attach->sq_data;
	if (!sqd)
		return ERR_PTR(-EINVAL);
	if (sqd->task_tgid != current->tgid)
		return ERR_PTR(-EPERM);

	refcount_inc(&sqd->refs);
	return sqd;
}

static struct io_sq_data *io_get_sq_data(struct io_uring_params *p,
					 bool *attached)
{
	struct io_sq_data *sqd;

	*attached = false;
	if (p->flags & IORING_SETUP_ATTACH_WQ) {
		sqd = io_attach_sq_data(p);
		if (!IS_ERR(sqd)) {
			*attached = true;
			return sqd;
		}
		/* fall through for EPERM case, setup new sqd/task */
		if (PTR_ERR(sqd) != -EPERM)
			return sqd;
	}

	sqd = kzalloc(sizeof(*sqd), GFP_KERNEL);
	if (!sqd)
		return ERR_PTR(-ENOMEM);

	atomic_set(&sqd->park_pending, 0);
	refcount_set(&sqd->refs, 1);
	INIT_LIST_HEAD(&sqd->ctx_list);
	mutex_init(&sqd->lock);
	init_waitqueue_head(&sqd->wait);
	init_completion(&sqd->exited);
	return sqd;
}
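/*
 * Illustrative userspace sketch of the attach path handled above (values and
 * variable names are assumptions, not kernel API): a second ring shares an
 * existing ring's SQPOLL thread by passing that ring's fd in wq_fd.
 *
 *	struct io_uring_params p = {
 *		.flags	= IORING_SETUP_SQPOLL | IORING_SETUP_ATTACH_WQ,
 *		.wq_fd	= existing_ring_fd,
 *	};
 *	new_ring_fd = io_uring_setup(64, &p);
 *
 * Attaching is only permitted within the same thread group; the -EPERM
 * fallback above transparently creates a private sqd instead.
 */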
static inline bool io_sqd_events_pending(struct io_sq_data *sqd)
{
	return READ_ONCE(sqd->state);
}

struct io_sq_time {
	bool started;
	u64 usec;
};

/* Cumulative system CPU time consumed by @tsk, converted to microseconds. */
u64 io_sq_cpu_usec(struct task_struct *tsk)
{
	u64 utime, stime;

	task_cputime_adjusted(tsk, &utime, &stime);
	do_div(stime, 1000);
	return stime;
}

static void io_sq_update_worktime(struct io_sq_data *sqd, struct io_sq_time *ist)
{
	if (!ist->started)
		return;
	ist->started = false;
	sqd->work_time += io_sq_cpu_usec(current) - ist->usec;
}

static void io_sq_start_worktime(struct io_sq_time *ist)
{
	if (ist->started)
		return;
	ist->started = true;
	ist->usec = io_sq_cpu_usec(current);
}

static int __io_sq_thread(struct io_ring_ctx *ctx, struct io_sq_data *sqd,
			  bool cap_entries, struct io_sq_time *ist)
{
	unsigned int to_submit;
	int ret = 0;

	to_submit = io_sqring_entries(ctx);
	/* if we're handling multiple rings, cap submit size for fairness */
	if (cap_entries && to_submit > IORING_SQPOLL_CAP_ENTRIES_VALUE)
		to_submit = IORING_SQPOLL_CAP_ENTRIES_VALUE;

	if (to_submit || !wq_list_empty(&ctx->iopoll_list)) {
		const struct cred *creds = NULL;

		io_sq_start_worktime(ist);

		if (ctx->sq_creds != current_cred())
			creds = override_creds(ctx->sq_creds);

		mutex_lock(&ctx->uring_lock);
		if (!wq_list_empty(&ctx->iopoll_list))
			io_do_iopoll(ctx, true);

		/*
		 * Don't submit if refs are dying, good for io_uring_register(),
		 * but also it is relied upon by io_ring_exit_work()
		 */
		if (to_submit && likely(!percpu_ref_is_dying(&ctx->refs)) &&
		    !(ctx->flags & IORING_SETUP_R_DISABLED))
			ret = io_submit_sqes(ctx, to_submit);
		mutex_unlock(&ctx->uring_lock);

		if (to_submit && wq_has_sleeper(&ctx->sqo_sq_wait))
			wake_up(&ctx->sqo_sq_wait);
		if (creds)
			revert_creds(creds);
	}

	return ret;
}

static bool io_sqd_handle_event(struct io_sq_data *sqd)
{
	bool did_sig = false;
	struct ksignal ksig;

	if (test_bit(IO_SQ_THREAD_SHOULD_PARK, &sqd->state) ||
	    signal_pending(current)) {
		mutex_unlock(&sqd->lock);
		if (signal_pending(current))
			did_sig = get_signal(&ksig);
		wait_event(sqd->wait, !atomic_read(&sqd->park_pending));
		mutex_lock(&sqd->lock);
		sqd->sq_cpu = raw_smp_processor_id();
	}
	return did_sig || test_bit(IO_SQ_THREAD_SHOULD_STOP, &sqd->state);
}

/*
 * Run task_work, processing the retry_list first. The retry_list holds
 * entries that we passed on in the previous run, if we had more task_work
 * than we were asked to process. Newly queued task_work isn't run until the
 * retry list has been fully processed.
 */
static unsigned int io_sq_tw(struct llist_node **retry_list, int max_entries)
{
	struct io_uring_task *tctx = current->io_uring;
	unsigned int count = 0;

	if (*retry_list) {
		*retry_list = io_handle_tw_list(*retry_list, &count, max_entries);
		if (count >= max_entries)
			goto out;
		max_entries -= count;
	}
	*retry_list = tctx_task_work_run(tctx, max_entries, &count);
out:
	if (task_work_pending(current))
		task_work_run();
	return count;
}

static bool io_sq_tw_pending(struct llist_node *retry_list)
{
	struct io_uring_task *tctx = current->io_uring;

	return retry_list || !llist_empty(&tctx->task_list);
}
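/*
 * Idle/wakeup handshake implemented by the main loop below, sketched from the
 * submitter's side (direct syscall use is an assumption; liburing wraps this):
 * before sleeping, the thread sets IORING_SQ_NEED_WAKEUP in sq_flags, and a
 * submitter that observes the flag must kick the thread explicitly:
 *
 *	if (*sq_flags & IORING_SQ_NEED_WAKEUP)
 *		syscall(__NR_io_uring_enter, ring_fd, to_submit, 0,
 *			IORING_ENTER_SQ_WAKEUP, NULL, 0);
 *
 * The flag is cleared again once the thread is awake and polling.
 */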
static int io_sq_thread(void *data)
{
	struct llist_node *retry_list = NULL;
	struct io_sq_data *sqd = data;
	struct io_ring_ctx *ctx;
	unsigned long timeout = 0;
	char buf[TASK_COMM_LEN] = {};
	DEFINE_WAIT(wait);

	/* offload context creation failed, just exit */
	if (!current->io_uring) {
		mutex_lock(&sqd->lock);
		rcu_assign_pointer(sqd->thread, NULL);
		put_task_struct(current);
		mutex_unlock(&sqd->lock);
		goto err_out;
	}

	snprintf(buf, sizeof(buf), "iou-sqp-%d", sqd->task_pid);
	set_task_comm(current, buf);

	/* reset to our pid after we've set task_comm, for fdinfo */
	sqd->task_pid = current->pid;

	if (sqd->sq_cpu != -1) {
		set_cpus_allowed_ptr(current, cpumask_of(sqd->sq_cpu));
	} else {
		set_cpus_allowed_ptr(current, cpu_online_mask);
		sqd->sq_cpu = raw_smp_processor_id();
	}

	/*
	 * Force audit context to get setup, in case we do prep side async
	 * operations that would trigger an audit call before any issue side
	 * audit has been done.
	 */
	audit_uring_entry(IORING_OP_NOP);
	audit_uring_exit(true, 0);

	mutex_lock(&sqd->lock);
	while (1) {
		bool cap_entries, sqt_spin = false;
		struct io_sq_time ist = { };

		if (io_sqd_events_pending(sqd) || signal_pending(current)) {
			if (io_sqd_handle_event(sqd))
				break;
			timeout = jiffies + sqd->sq_thread_idle;
		}

		cap_entries = !list_is_singular(&sqd->ctx_list);
		list_for_each_entry(ctx, &sqd->ctx_list, sqd_list) {
			int ret = __io_sq_thread(ctx, sqd, cap_entries, &ist);

			if (!sqt_spin && (ret > 0 || !wq_list_empty(&ctx->iopoll_list)))
				sqt_spin = true;
		}
		if (io_sq_tw(&retry_list, IORING_TW_CAP_ENTRIES_VALUE))
			sqt_spin = true;

		list_for_each_entry(ctx, &sqd->ctx_list, sqd_list) {
			if (io_napi(ctx)) {
				io_sq_start_worktime(&ist);
				io_napi_sqpoll_busy_poll(ctx);
			}
		}

		io_sq_update_worktime(sqd, &ist);

		if (sqt_spin || !time_after(jiffies, timeout)) {
			if (sqt_spin)
				timeout = jiffies + sqd->sq_thread_idle;
			if (unlikely(need_resched())) {
				mutex_unlock(&sqd->lock);
				cond_resched();
				mutex_lock(&sqd->lock);
				sqd->sq_cpu = raw_smp_processor_id();
			}
			continue;
		}

		prepare_to_wait(&sqd->wait, &wait, TASK_INTERRUPTIBLE);
		if (!io_sqd_events_pending(sqd) && !io_sq_tw_pending(retry_list)) {
			bool needs_sched = true;

			list_for_each_entry(ctx, &sqd->ctx_list, sqd_list) {
				atomic_or(IORING_SQ_NEED_WAKEUP,
					  &ctx->rings->sq_flags);
				if ((ctx->flags & IORING_SETUP_IOPOLL) &&
				    !wq_list_empty(&ctx->iopoll_list)) {
					needs_sched = false;
					break;
				}

				/*
				 * Ensure the store of the wakeup flag is not
				 * reordered with the load of the SQ tail
				 */
				smp_mb__after_atomic();

				if (io_sqring_entries(ctx)) {
					needs_sched = false;
					break;
				}
			}

			if (needs_sched) {
				mutex_unlock(&sqd->lock);
				schedule();
				mutex_lock(&sqd->lock);
				sqd->sq_cpu = raw_smp_processor_id();
			}
			list_for_each_entry(ctx, &sqd->ctx_list, sqd_list)
				atomic_andnot(IORING_SQ_NEED_WAKEUP,
					      &ctx->rings->sq_flags);
		}

		finish_wait(&sqd->wait, &wait);
		timeout = jiffies + sqd->sq_thread_idle;
	}

	if (retry_list)
		io_sq_tw(&retry_list, UINT_MAX);

	io_uring_cancel_generic(true, sqd);
	rcu_assign_pointer(sqd->thread, NULL);
	put_task_struct(current);
	list_for_each_entry(ctx, &sqd->ctx_list, sqd_list)
		atomic_or(IORING_SQ_NEED_WAKEUP, &ctx->rings->sq_flags);
	io_run_task_work();
	mutex_unlock(&sqd->lock);
err_out:
	complete(&sqd->exited);
	do_exit(0);
}

/*
 * Wait for SQ ring space to free up. The submitting task ends up here via
 * io_uring_enter(2) with IORING_ENTER_SQ_WAIT when the SQ ring is full.
 */
void io_sqpoll_wait_sq(struct io_ring_ctx *ctx)
{
	DEFINE_WAIT(wait);

	do {
		if (!io_sqring_full(ctx))
			break;
		prepare_to_wait(&ctx->sqo_sq_wait, &wait, TASK_INTERRUPTIBLE);

		if (!io_sqring_full(ctx))
			break;
		schedule();
	} while (!signal_pending(current));

	finish_wait(&ctx->sqo_sq_wait, &wait);
}
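/*
 * Illustrative setup sketch for the parameters consumed below (the values are
 * arbitrary examples, not defaults): with IORING_SETUP_SQ_AFF the thread is
 * pinned to sq_thread_cpu, and sq_thread_idle bounds how long it busy-polls
 * (in milliseconds) before going to sleep.
 *
 *	struct io_uring_params p = {
 *		.flags		= IORING_SETUP_SQPOLL | IORING_SETUP_SQ_AFF,
 *		.sq_thread_cpu	= 3,
 *		.sq_thread_idle	= 2000,
 *	};
 *	ring_fd = io_uring_setup(64, &p);
 */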
__cold int io_sq_offload_create(struct io_ring_ctx *ctx,
				struct io_uring_params *p)
{
	int ret;

	/* Retain compatibility with failing for an invalid attach attempt */
	if ((ctx->flags & (IORING_SETUP_ATTACH_WQ | IORING_SETUP_SQPOLL)) ==
				IORING_SETUP_ATTACH_WQ) {
		CLASS(fd, f)(p->wq_fd);
		if (fd_empty(f))
			return -ENXIO;
		if (!io_is_uring_fops(fd_file(f)))
			return -EINVAL;
	}
	if (ctx->flags & IORING_SETUP_SQPOLL) {
		struct task_struct *tsk;
		struct io_sq_data *sqd;
		bool attached;

		ret = security_uring_sqpoll();
		if (ret)
			return ret;

		sqd = io_get_sq_data(p, &attached);
		if (IS_ERR(sqd)) {
			ret = PTR_ERR(sqd);
			goto err;
		}

		ctx->sq_creds = get_current_cred();
		ctx->sq_data = sqd;
		ctx->sq_thread_idle = msecs_to_jiffies(p->sq_thread_idle);
		if (!ctx->sq_thread_idle)
			ctx->sq_thread_idle = HZ;

		io_sq_thread_park(sqd);
		list_add(&ctx->sqd_list, &sqd->ctx_list);
		io_sqd_update_thread_idle(sqd);
		/* don't attach to a dying SQPOLL thread, would be racy */
		ret = (attached && !sqd->thread) ? -ENXIO : 0;
		io_sq_thread_unpark(sqd);

		if (ret < 0)
			goto err;
		if (attached)
			return 0;

		if (p->flags & IORING_SETUP_SQ_AFF) {
			cpumask_var_t allowed_mask;
			int cpu = p->sq_thread_cpu;

			ret = -EINVAL;
			if (cpu >= nr_cpu_ids || !cpu_online(cpu))
				goto err_sqpoll;
			ret = -ENOMEM;
			if (!alloc_cpumask_var(&allowed_mask, GFP_KERNEL))
				goto err_sqpoll;
			ret = -EINVAL;
			cpuset_cpus_allowed(current, allowed_mask);
			if (!cpumask_test_cpu(cpu, allowed_mask)) {
				free_cpumask_var(allowed_mask);
				goto err_sqpoll;
			}
			free_cpumask_var(allowed_mask);
			sqd->sq_cpu = cpu;
		} else {
			sqd->sq_cpu = -1;
		}

		sqd->task_pid = current->pid;
		sqd->task_tgid = current->tgid;
		tsk = create_io_thread(io_sq_thread, sqd, NUMA_NO_NODE);
		if (IS_ERR(tsk)) {
			ret = PTR_ERR(tsk);
			goto err_sqpoll;
		}

		mutex_lock(&sqd->lock);
		rcu_assign_pointer(sqd->thread, tsk);
		mutex_unlock(&sqd->lock);

		get_task_struct(tsk);
		ret = io_uring_alloc_task_context(tsk, ctx);
		wake_up_new_task(tsk);
		if (ret)
			goto err;
	} else if (p->flags & IORING_SETUP_SQ_AFF) {
		/* Can't have SQ_AFF without SQPOLL */
		ret = -EINVAL;
		goto err;
	}
	return 0;
err_sqpoll:
	complete(&ctx->sq_data->exited);
err:
	io_sq_thread_finish(ctx);
	return ret;
}

__cold int io_sqpoll_wq_cpu_affinity(struct io_ring_ctx *ctx,
				     cpumask_var_t mask)
{
	struct io_sq_data *sqd = ctx->sq_data;
	int ret = -EINVAL;

	if (sqd) {
		struct task_struct *tsk;

		io_sq_thread_park(sqd);
		/* Don't set affinity for a dying thread */
		tsk = sqpoll_task_locked(sqd);
		if (tsk)
			ret = io_wq_cpu_affinity(tsk->io_uring, mask);
		io_sq_thread_unpark(sqd);
	}

	return ret;
}