// SPDX-License-Identifier: GPL-2.0
/*
 * Contains the core associated with submission side polling of the SQ
 * ring, offloading submissions from the application to a kernel thread.
 */
#include <linux/kernel.h>
#include <linux/errno.h>
#include <linux/file.h>
#include <linux/mm.h>
#include <linux/slab.h>
#include <linux/audit.h>
#include <linux/security.h>
#include <linux/cpuset.h>
#include <linux/sched/cputime.h>
#include <linux/io_uring.h>

#include <uapi/linux/io_uring.h>

#include "io_uring.h"
#include "tctx.h"
#include "napi.h"
#include "sqpoll.h"

#define IORING_SQPOLL_CAP_ENTRIES_VALUE 8
#define IORING_TW_CAP_ENTRIES_VALUE	32

enum {
	IO_SQ_THREAD_SHOULD_STOP = 0,
	IO_SQ_THREAD_SHOULD_PARK,
};

void io_sq_thread_unpark(struct io_sq_data *sqd)
	__releases(&sqd->lock)
{
	WARN_ON_ONCE(sqpoll_task_locked(sqd) == current);

	/*
	 * Do the dance but not conditional clear_bit() because it'd race with
	 * other threads incrementing park_pending and setting the bit.
	 */
	clear_bit(IO_SQ_THREAD_SHOULD_PARK, &sqd->state);
	if (atomic_dec_return(&sqd->park_pending))
		set_bit(IO_SQ_THREAD_SHOULD_PARK, &sqd->state);
	mutex_unlock(&sqd->lock);
	wake_up(&sqd->wait);
}

void io_sq_thread_park(struct io_sq_data *sqd)
	__acquires(&sqd->lock)
{
	struct task_struct *tsk;

	atomic_inc(&sqd->park_pending);
	set_bit(IO_SQ_THREAD_SHOULD_PARK, &sqd->state);
	mutex_lock(&sqd->lock);

	tsk = sqpoll_task_locked(sqd);
	if (tsk) {
		WARN_ON_ONCE(tsk == current);
		wake_up_process(tsk);
	}
}

void io_sq_thread_stop(struct io_sq_data *sqd)
{
	struct task_struct *tsk;

	WARN_ON_ONCE(test_bit(IO_SQ_THREAD_SHOULD_STOP, &sqd->state));

	set_bit(IO_SQ_THREAD_SHOULD_STOP, &sqd->state);
	mutex_lock(&sqd->lock);
	tsk = sqpoll_task_locked(sqd);
	if (tsk) {
		WARN_ON_ONCE(tsk == current);
		wake_up_process(tsk);
	}
	mutex_unlock(&sqd->lock);
	wait_for_completion(&sqd->exited);
}

void io_put_sq_data(struct io_sq_data *sqd)
{
	if (refcount_dec_and_test(&sqd->refs)) {
		WARN_ON_ONCE(atomic_read(&sqd->park_pending));

		io_sq_thread_stop(sqd);
		kfree(sqd);
	}
}

static __cold void io_sqd_update_thread_idle(struct io_sq_data *sqd)
{
	struct io_ring_ctx *ctx;
	unsigned sq_thread_idle = 0;

	list_for_each_entry(ctx, &sqd->ctx_list, sqd_list)
		sq_thread_idle = max(sq_thread_idle, ctx->sq_thread_idle);
	sqd->sq_thread_idle = sq_thread_idle;
}

void io_sq_thread_finish(struct io_ring_ctx *ctx)
{
	struct io_sq_data *sqd = ctx->sq_data;

	if (sqd) {
		io_sq_thread_park(sqd);
		list_del_init(&ctx->sqd_list);
		io_sqd_update_thread_idle(sqd);
		io_sq_thread_unpark(sqd);

		io_put_sq_data(sqd);
		ctx->sq_data = NULL;
	}
}

static struct io_sq_data *io_attach_sq_data(struct io_uring_params *p)
{
	struct io_ring_ctx *ctx_attach;
	struct io_sq_data *sqd;
	CLASS(fd, f)(p->wq_fd);

	if (fd_empty(f))
		return ERR_PTR(-ENXIO);
	if (!io_is_uring_fops(fd_file(f)))
		return ERR_PTR(-EINVAL);

	ctx_attach = fd_file(f)->private_data;
	sqd = ctx_attach->sq_data;
	if (!sqd)
		return ERR_PTR(-EINVAL);
	if (sqd->task_tgid != current->tgid)
		return ERR_PTR(-EPERM);

	refcount_inc(&sqd->refs);
	return sqd;
}

static struct io_sq_data *io_get_sq_data(struct io_uring_params *p,
					 bool *attached)
{
	struct io_sq_data *sqd;

	*attached = false;
	if (p->flags & IORING_SETUP_ATTACH_WQ) {
		sqd = io_attach_sq_data(p);
		if (!IS_ERR(sqd)) {
			*attached = true;
			return sqd;
		}
		/* fall through for EPERM case, setup new sqd/task */
		if (PTR_ERR(sqd) != -EPERM)
			return sqd;
	}

	sqd = kzalloc(sizeof(*sqd), GFP_KERNEL);
	if (!sqd)
		return ERR_PTR(-ENOMEM);

	atomic_set(&sqd->park_pending, 0);
	refcount_set(&sqd->refs, 1);
	INIT_LIST_HEAD(&sqd->ctx_list);
	mutex_init(&sqd->lock);
	init_waitqueue_head(&sqd->wait);
	init_completion(&sqd->exited);
	return sqd;
}

static inline bool io_sqd_events_pending(struct io_sq_data *sqd)
{
	return READ_ONCE(sqd->state);
}

struct io_sq_time {
	bool started;
	u64 usec;
};

u64 io_sq_cpu_usec(struct task_struct *tsk)
{
	u64 utime, stime;

	task_cputime_adjusted(tsk, &utime, &stime);
	do_div(stime, 1000);
	return stime;
}

static void io_sq_update_worktime(struct io_sq_data *sqd, struct io_sq_time *ist)
{
	if (!ist->started)
		return;
	ist->started = false;
	sqd->work_time += io_sq_cpu_usec(current) - ist->usec;
}

static void io_sq_start_worktime(struct io_sq_time *ist)
{
	if (ist->started)
		return;
	ist->started = true;
	ist->usec = io_sq_cpu_usec(current);
}

static int __io_sq_thread(struct io_ring_ctx *ctx, struct io_sq_data *sqd,
			  bool cap_entries, struct io_sq_time *ist)
{
	unsigned int to_submit;
	int ret = 0;

	to_submit = io_sqring_entries(ctx);
	/* if we're handling multiple rings, cap submit size for fairness */
	if (cap_entries && to_submit > IORING_SQPOLL_CAP_ENTRIES_VALUE)
		to_submit = IORING_SQPOLL_CAP_ENTRIES_VALUE;

	if (to_submit || !wq_list_empty(&ctx->iopoll_list)) {
		const struct cred *creds = NULL;

		io_sq_start_worktime(ist);

		if (ctx->sq_creds != current_cred())
			creds = override_creds(ctx->sq_creds);

		mutex_lock(&ctx->uring_lock);
		if (!wq_list_empty(&ctx->iopoll_list))
			io_do_iopoll(ctx, true);

		/*
		 * Don't submit if refs are dying, good for io_uring_register(),
		 * but also it is relied upon by io_ring_exit_work()
		 */
		if (to_submit && likely(!percpu_ref_is_dying(&ctx->refs)) &&
		    !(ctx->flags & IORING_SETUP_R_DISABLED))
			ret = io_submit_sqes(ctx, to_submit);
		mutex_unlock(&ctx->uring_lock);

		if (to_submit && wq_has_sleeper(&ctx->sqo_sq_wait))
			wake_up(&ctx->sqo_sq_wait);
		if (creds)
			revert_creds(creds);
	}

	return ret;
}

static bool io_sqd_handle_event(struct io_sq_data *sqd)
{
	bool did_sig = false;
	struct ksignal ksig;

	if (test_bit(IO_SQ_THREAD_SHOULD_PARK, &sqd->state) ||
	    signal_pending(current)) {
		mutex_unlock(&sqd->lock);
		if (signal_pending(current))
			did_sig = get_signal(&ksig);
		wait_event(sqd->wait, !atomic_read(&sqd->park_pending));
		mutex_lock(&sqd->lock);
		sqd->sq_cpu = raw_smp_processor_id();
	}
	return did_sig || test_bit(IO_SQ_THREAD_SHOULD_STOP, &sqd->state);
}

/*
 * Run task_work, processing the retry_list first. The retry_list holds
 * entries that we passed on in the previous run, if we had more task_work
 * than we were asked to process. Newly queued task_work isn't run until the
 * retry list has been fully processed.
 */
static unsigned int io_sq_tw(struct llist_node **retry_list, int max_entries)
{
	struct io_uring_task *tctx = current->io_uring;
	unsigned int count = 0;

	if (*retry_list) {
		*retry_list = io_handle_tw_list(*retry_list, &count, max_entries);
		if (count >= max_entries)
			goto out;
		max_entries -= count;
	}
	*retry_list = tctx_task_work_run(tctx, max_entries, &count);
out:
	if (task_work_pending(current))
		task_work_run();
	return count;
}

static bool io_sq_tw_pending(struct llist_node *retry_list)
{
	struct io_uring_task *tctx = current->io_uring;

	return retry_list || !llist_empty(&tctx->task_list);
}

static int io_sq_thread(void *data)
{
	struct llist_node *retry_list = NULL;
	struct io_sq_data *sqd = data;
	struct io_ring_ctx *ctx;
	unsigned long timeout = 0;
	char buf[TASK_COMM_LEN] = {};
	DEFINE_WAIT(wait);

	/* offload context creation failed, just exit */
	if (!current->io_uring) {
		mutex_lock(&sqd->lock);
		rcu_assign_pointer(sqd->thread, NULL);
		put_task_struct(current);
		mutex_unlock(&sqd->lock);
		goto err_out;
	}

	snprintf(buf, sizeof(buf), "iou-sqp-%d", sqd->task_pid);
	set_task_comm(current, buf);

	/* reset to our pid after we've set task_comm, for fdinfo */
	sqd->task_pid = current->pid;

	if (sqd->sq_cpu != -1) {
		set_cpus_allowed_ptr(current, cpumask_of(sqd->sq_cpu));
	} else {
		set_cpus_allowed_ptr(current, cpu_online_mask);
		sqd->sq_cpu = raw_smp_processor_id();
	}

	/*
	 * Force audit context to get setup, in case we do prep side async
	 * operations that would trigger an audit call before any issue side
	 * audit has been done.
	 */
	audit_uring_entry(IORING_OP_NOP);
	audit_uring_exit(true, 0);

	mutex_lock(&sqd->lock);
	while (1) {
		bool cap_entries, sqt_spin = false;
		struct io_sq_time ist = { };

		if (io_sqd_events_pending(sqd) || signal_pending(current)) {
			if (io_sqd_handle_event(sqd))
				break;
			timeout = jiffies + sqd->sq_thread_idle;
		}

		cap_entries = !list_is_singular(&sqd->ctx_list);
		list_for_each_entry(ctx, &sqd->ctx_list, sqd_list) {
			int ret = __io_sq_thread(ctx, sqd, cap_entries, &ist);

			if (!sqt_spin && (ret > 0 || !wq_list_empty(&ctx->iopoll_list)))
				sqt_spin = true;
		}
		if (io_sq_tw(&retry_list, IORING_TW_CAP_ENTRIES_VALUE))
			sqt_spin = true;

		list_for_each_entry(ctx, &sqd->ctx_list, sqd_list) {
			if (io_napi(ctx)) {
				io_sq_start_worktime(&ist);
				io_napi_sqpoll_busy_poll(ctx);
			}
		}

		io_sq_update_worktime(sqd, &ist);

		if (sqt_spin || !time_after(jiffies, timeout)) {
			if (sqt_spin)
				timeout = jiffies + sqd->sq_thread_idle;
			if (unlikely(need_resched())) {
				mutex_unlock(&sqd->lock);
				cond_resched();
				mutex_lock(&sqd->lock);
				sqd->sq_cpu = raw_smp_processor_id();
			}
			continue;
		}

		prepare_to_wait(&sqd->wait, &wait, TASK_INTERRUPTIBLE);
		if (!io_sqd_events_pending(sqd) && !io_sq_tw_pending(retry_list)) {
			bool needs_sched = true;

			list_for_each_entry(ctx, &sqd->ctx_list, sqd_list) {
				atomic_or(IORING_SQ_NEED_WAKEUP,
					  &ctx->rings->sq_flags);
				if ((ctx->flags & IORING_SETUP_IOPOLL) &&
				    !wq_list_empty(&ctx->iopoll_list)) {
					needs_sched = false;
					break;
				}

				/*
				 * Ensure the store of the wakeup flag is not
				 * reordered with the load of the SQ tail
				 */
				smp_mb__after_atomic();

				if (io_sqring_entries(ctx)) {
					needs_sched = false;
					break;
				}
			}

			if (needs_sched) {
				mutex_unlock(&sqd->lock);
				schedule();
				mutex_lock(&sqd->lock);
				sqd->sq_cpu = raw_smp_processor_id();
			}
			list_for_each_entry(ctx, &sqd->ctx_list, sqd_list)
				atomic_andnot(IORING_SQ_NEED_WAKEUP,
					      &ctx->rings->sq_flags);
		}

		finish_wait(&sqd->wait, &wait);
		timeout = jiffies + sqd->sq_thread_idle;
	}

	if (retry_list)
		io_sq_tw(&retry_list, UINT_MAX);

	io_uring_cancel_generic(true, sqd);
	rcu_assign_pointer(sqd->thread, NULL);
	put_task_struct(current);
	list_for_each_entry(ctx, &sqd->ctx_list, sqd_list)
		atomic_or(IORING_SQ_NEED_WAKEUP, &ctx->rings->sq_flags);
	io_run_task_work();
	mutex_unlock(&sqd->lock);
err_out:
	complete(&sqd->exited);
	do_exit(0);
}

void io_sqpoll_wait_sq(struct io_ring_ctx *ctx)
{
	DEFINE_WAIT(wait);

	do {
		if (!io_sqring_full(ctx))
			break;
		prepare_to_wait(&ctx->sqo_sq_wait, &wait, TASK_INTERRUPTIBLE);

		if (!io_sqring_full(ctx))
			break;
		schedule();
	} while (!signal_pending(current));

	finish_wait(&ctx->sqo_sq_wait, &wait);
}

__cold int io_sq_offload_create(struct io_ring_ctx *ctx,
				struct io_uring_params *p)
{
	int ret;

	/* Retain compatibility with failing for an invalid attach attempt */
	if ((ctx->flags & (IORING_SETUP_ATTACH_WQ | IORING_SETUP_SQPOLL)) ==
				IORING_SETUP_ATTACH_WQ) {
		CLASS(fd, f)(p->wq_fd);
		if (fd_empty(f))
			return -ENXIO;
		if (!io_is_uring_fops(fd_file(f)))
			return -EINVAL;
	}
	if (ctx->flags & IORING_SETUP_SQPOLL) {
		struct task_struct *tsk;
		struct io_sq_data *sqd;
		bool attached;

		ret = security_uring_sqpoll();
		if (ret)
			return ret;

		sqd = io_get_sq_data(p, &attached);
		if (IS_ERR(sqd)) {
			ret = PTR_ERR(sqd);
			goto err;
		}

		ctx->sq_creds = get_current_cred();
		ctx->sq_data = sqd;
		ctx->sq_thread_idle = msecs_to_jiffies(p->sq_thread_idle);
		if (!ctx->sq_thread_idle)
			ctx->sq_thread_idle = HZ;

		io_sq_thread_park(sqd);
		list_add(&ctx->sqd_list, &sqd->ctx_list);
		io_sqd_update_thread_idle(sqd);
		/* don't attach to a dying SQPOLL thread, would be racy */
		ret = (attached && !sqd->thread) ? -ENXIO : 0;
		io_sq_thread_unpark(sqd);

		if (ret < 0)
			goto err;
		if (attached)
			return 0;

		if (p->flags & IORING_SETUP_SQ_AFF) {
			cpumask_var_t allowed_mask;
			int cpu = p->sq_thread_cpu;

			ret = -EINVAL;
			if (cpu >= nr_cpu_ids || !cpu_online(cpu))
				goto err_sqpoll;
			ret = -ENOMEM;
			if (!alloc_cpumask_var(&allowed_mask, GFP_KERNEL))
				goto err_sqpoll;
			ret = -EINVAL;
			cpuset_cpus_allowed(current, allowed_mask);
			if (!cpumask_test_cpu(cpu, allowed_mask)) {
				free_cpumask_var(allowed_mask);
				goto err_sqpoll;
			}
			free_cpumask_var(allowed_mask);
			sqd->sq_cpu = cpu;
		} else {
			sqd->sq_cpu = -1;
		}

		sqd->task_pid = current->pid;
		sqd->task_tgid = current->tgid;
		tsk = create_io_thread(io_sq_thread, sqd, NUMA_NO_NODE);
		if (IS_ERR(tsk)) {
			ret = PTR_ERR(tsk);
			goto err_sqpoll;
		}

		mutex_lock(&sqd->lock);
		rcu_assign_pointer(sqd->thread, tsk);
		mutex_unlock(&sqd->lock);

		get_task_struct(tsk);
		ret = io_uring_alloc_task_context(tsk, ctx);
		wake_up_new_task(tsk);
		if (ret)
			goto err;
	} else if (p->flags & IORING_SETUP_SQ_AFF) {
		/* Can't have SQ_AFF without SQPOLL */
		ret = -EINVAL;
		goto err;
	}
	return 0;
err_sqpoll:
	complete(&ctx->sq_data->exited);
err:
	io_sq_thread_finish(ctx);
	return ret;
}

__cold int io_sqpoll_wq_cpu_affinity(struct io_ring_ctx *ctx,
				     cpumask_var_t mask)
{
	struct io_sq_data *sqd = ctx->sq_data;
	int ret = -EINVAL;

	if (sqd) {
		struct task_struct *tsk;

		io_sq_thread_park(sqd);
		/* Don't set affinity for a dying thread */
		tsk = sqpoll_task_locked(sqd);
		if (tsk)
			ret = io_wq_cpu_affinity(tsk->io_uring, mask);
		io_sq_thread_unpark(sqd);
	}

	return ret;
}
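
/*
 * Illustrative userspace sketch (not part of this file, and an assumption
 * rather than a prescribed usage): how the io_uring_params fields consumed
 * by io_sq_offload_create() above are typically filled in from an
 * application. Flag and field names come from <linux/io_uring.h>; the entry
 * count (64) and CPU number (3) are arbitrary example values. As a real
 * program it would also need <unistd.h> and <sys/syscall.h>.
 *
 *	struct io_uring_params p = { };
 *
 *	p.flags = IORING_SETUP_SQPOLL | IORING_SETUP_SQ_AFF;
 *	p.sq_thread_cpu = 3;		// must be online and in the task's cpuset
 *	p.sq_thread_idle = 2000;	// msec of idle busy-polling before the
 *					// thread sleeps; 0 defaults to HZ (~1s)
 *	int ring_fd = syscall(__NR_io_uring_setup, 64, &p);
 *
 *	// A second ring created by the same process can share the SQPOLL
 *	// thread by attaching to the first ring's fd:
 *	struct io_uring_params p2 = { };
 *
 *	p2.flags = IORING_SETUP_SQPOLL | IORING_SETUP_ATTACH_WQ;
 *	p2.wq_fd = ring_fd;
 *	int ring_fd2 = syscall(__NR_io_uring_setup, 64, &p2);
 */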