// SPDX-License-Identifier: GPL-2.0
/*
 * Contains the core associated with submission side polling of the SQ
 * ring, offloading submissions from the application to a kernel thread.
 */
#include <linux/kernel.h>
#include <linux/errno.h>
#include <linux/file.h>
#include <linux/mm.h>
#include <linux/slab.h>
#include <linux/audit.h>
#include <linux/security.h>
#include <linux/cpuset.h>
#include <linux/io_uring.h>

#include <uapi/linux/io_uring.h>

#include "io_uring.h"
#include "napi.h"
#include "sqpoll.h"

#define IORING_SQPOLL_CAP_ENTRIES_VALUE 8
#define IORING_TW_CAP_ENTRIES_VALUE	32

enum {
	IO_SQ_THREAD_SHOULD_STOP = 0,
	IO_SQ_THREAD_SHOULD_PARK,
};

void io_sq_thread_unpark(struct io_sq_data *sqd)
	__releases(&sqd->lock)
{
	WARN_ON_ONCE(sqpoll_task_locked(sqd) == current);

	/*
	 * Do the dance but not conditional clear_bit() because it'd race with
	 * other threads incrementing park_pending and setting the bit.
	 */
	clear_bit(IO_SQ_THREAD_SHOULD_PARK, &sqd->state);
	if (atomic_dec_return(&sqd->park_pending))
		set_bit(IO_SQ_THREAD_SHOULD_PARK, &sqd->state);
	mutex_unlock(&sqd->lock);
	wake_up(&sqd->wait);
}

void io_sq_thread_park(struct io_sq_data *sqd)
	__acquires(&sqd->lock)
{
	struct task_struct *tsk;

	atomic_inc(&sqd->park_pending);
	set_bit(IO_SQ_THREAD_SHOULD_PARK, &sqd->state);
	mutex_lock(&sqd->lock);

	tsk = sqpoll_task_locked(sqd);
	if (tsk) {
		WARN_ON_ONCE(tsk == current);
		wake_up_process(tsk);
	}
}

void io_sq_thread_stop(struct io_sq_data *sqd)
{
	struct task_struct *tsk;

	WARN_ON_ONCE(test_bit(IO_SQ_THREAD_SHOULD_STOP, &sqd->state));

	set_bit(IO_SQ_THREAD_SHOULD_STOP, &sqd->state);
	mutex_lock(&sqd->lock);
	tsk = sqpoll_task_locked(sqd);
	if (tsk) {
		WARN_ON_ONCE(tsk == current);
		wake_up_process(tsk);
	}
	mutex_unlock(&sqd->lock);
	wait_for_completion(&sqd->exited);
}

void io_put_sq_data(struct io_sq_data *sqd)
{
	if (refcount_dec_and_test(&sqd->refs)) {
		WARN_ON_ONCE(atomic_read(&sqd->park_pending));

		io_sq_thread_stop(sqd);
		kfree(sqd);
	}
}

static __cold void io_sqd_update_thread_idle(struct io_sq_data *sqd)
{
	struct io_ring_ctx *ctx;
	unsigned sq_thread_idle = 0;

	list_for_each_entry(ctx, &sqd->ctx_list, sqd_list)
		sq_thread_idle = max(sq_thread_idle, ctx->sq_thread_idle);
	sqd->sq_thread_idle = sq_thread_idle;
}

void io_sq_thread_finish(struct io_ring_ctx *ctx)
{
	struct io_sq_data *sqd = ctx->sq_data;

	if (sqd) {
		io_sq_thread_park(sqd);
		list_del_init(&ctx->sqd_list);
		io_sqd_update_thread_idle(sqd);
		io_sq_thread_unpark(sqd);

		io_put_sq_data(sqd);
		ctx->sq_data = NULL;
	}
}

static struct io_sq_data *io_attach_sq_data(struct io_uring_params *p)
{
	struct io_ring_ctx *ctx_attach;
	struct io_sq_data *sqd;
	CLASS(fd, f)(p->wq_fd);

	if (fd_empty(f))
		return ERR_PTR(-ENXIO);
	if (!io_is_uring_fops(fd_file(f)))
		return ERR_PTR(-EINVAL);

	ctx_attach = fd_file(f)->private_data;
	sqd = ctx_attach->sq_data;
	if (!sqd)
		return ERR_PTR(-EINVAL);
	if (sqd->task_tgid != current->tgid)
		return ERR_PTR(-EPERM);

	refcount_inc(&sqd->refs);
	return sqd;
}
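/*
 * Find or allocate the io_sq_data backing this ring. With
 * IORING_SETUP_ATTACH_WQ we try to share the SQPOLL backend of the ring
 * given in p->wq_fd, provided it belongs to the same thread group. If
 * attaching fails with -EPERM (or the flag isn't set at all), a fresh
 * io_sq_data is allocated instead.
 */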
static struct io_sq_data *io_get_sq_data(struct io_uring_params *p,
					 bool *attached)
{
	struct io_sq_data *sqd;

	*attached = false;
	if (p->flags & IORING_SETUP_ATTACH_WQ) {
		sqd = io_attach_sq_data(p);
		if (!IS_ERR(sqd)) {
			*attached = true;
			return sqd;
		}
		/* fall through for EPERM case, setup new sqd/task */
		if (PTR_ERR(sqd) != -EPERM)
			return sqd;
	}

	sqd = kzalloc(sizeof(*sqd), GFP_KERNEL);
	if (!sqd)
		return ERR_PTR(-ENOMEM);

	atomic_set(&sqd->park_pending, 0);
	refcount_set(&sqd->refs, 1);
	INIT_LIST_HEAD(&sqd->ctx_list);
	mutex_init(&sqd->lock);
	init_waitqueue_head(&sqd->wait);
	init_completion(&sqd->exited);
	return sqd;
}

static inline bool io_sqd_events_pending(struct io_sq_data *sqd)
{
	return READ_ONCE(sqd->state);
}

static int __io_sq_thread(struct io_ring_ctx *ctx, bool cap_entries)
{
	unsigned int to_submit;
	int ret = 0;

	to_submit = io_sqring_entries(ctx);
	/* if we're handling multiple rings, cap submit size for fairness */
	if (cap_entries && to_submit > IORING_SQPOLL_CAP_ENTRIES_VALUE)
		to_submit = IORING_SQPOLL_CAP_ENTRIES_VALUE;

	if (to_submit || !wq_list_empty(&ctx->iopoll_list)) {
		const struct cred *creds = NULL;

		if (ctx->sq_creds != current_cred())
			creds = override_creds(ctx->sq_creds);

		mutex_lock(&ctx->uring_lock);
		if (!wq_list_empty(&ctx->iopoll_list))
			io_do_iopoll(ctx, true);

		/*
		 * Don't submit if refs are dying, good for io_uring_register(),
		 * but also it is relied upon by io_ring_exit_work()
		 */
		if (to_submit && likely(!percpu_ref_is_dying(&ctx->refs)) &&
		    !(ctx->flags & IORING_SETUP_R_DISABLED))
			ret = io_submit_sqes(ctx, to_submit);
		mutex_unlock(&ctx->uring_lock);

		if (to_submit && wq_has_sleeper(&ctx->sqo_sq_wait))
			wake_up(&ctx->sqo_sq_wait);
		if (creds)
			revert_creds(creds);
	}

	return ret;
}
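/*
 * Handle park/stop requests and pending signals for the SQPOLL thread.
 * If parking was requested, drop sqd->lock and wait until all parkers
 * have unparked us before continuing. Returns true if the thread should
 * terminate, i.e. on a fatal signal or an explicit stop request.
 */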
static bool io_sqd_handle_event(struct io_sq_data *sqd)
{
	bool did_sig = false;
	struct ksignal ksig;

	if (test_bit(IO_SQ_THREAD_SHOULD_PARK, &sqd->state) ||
	    signal_pending(current)) {
		mutex_unlock(&sqd->lock);
		if (signal_pending(current))
			did_sig = get_signal(&ksig);
		wait_event(sqd->wait, !atomic_read(&sqd->park_pending));
		mutex_lock(&sqd->lock);
		sqd->sq_cpu = raw_smp_processor_id();
	}
	return did_sig || test_bit(IO_SQ_THREAD_SHOULD_STOP, &sqd->state);
}

/*
 * Run task_work, processing the retry_list first. The retry_list holds
 * entries that we passed on in the previous run, if we had more task_work
 * than we were asked to process. Newly queued task_work isn't run until the
 * retry list has been fully processed.
 */
static unsigned int io_sq_tw(struct llist_node **retry_list, int max_entries)
{
	struct io_uring_task *tctx = current->io_uring;
	unsigned int count = 0;

	if (*retry_list) {
		*retry_list = io_handle_tw_list(*retry_list, &count, max_entries);
		if (count >= max_entries)
			goto out;
		max_entries -= count;
	}
	*retry_list = tctx_task_work_run(tctx, max_entries, &count);
out:
	if (task_work_pending(current))
		task_work_run();
	return count;
}

static bool io_sq_tw_pending(struct llist_node *retry_list)
{
	struct io_uring_task *tctx = current->io_uring;

	return retry_list || !llist_empty(&tctx->task_list);
}

static void io_sq_update_worktime(struct io_sq_data *sqd, struct rusage *start)
{
	struct rusage end;

	getrusage(current, RUSAGE_SELF, &end);
	end.ru_stime.tv_sec -= start->ru_stime.tv_sec;
	end.ru_stime.tv_usec -= start->ru_stime.tv_usec;

	sqd->work_time += end.ru_stime.tv_usec + end.ru_stime.tv_sec * 1000000;
}
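/*
 * Main loop of the SQPOLL kernel thread ("iou-sqp-<pid>"): submit SQEs
 * and run task_work for every ring attached to this io_sq_data, busy
 * looping while there is work to do. Once sq_thread_idle expires with
 * nothing to do, IORING_SQ_NEED_WAKEUP is set so userspace knows it has
 * to wake the thread via io_uring_enter(), and the thread goes to sleep.
 */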
static int io_sq_thread(void *data)
{
	struct llist_node *retry_list = NULL;
	struct io_sq_data *sqd = data;
	struct io_ring_ctx *ctx;
	struct rusage start;
	unsigned long timeout = 0;
	char buf[TASK_COMM_LEN] = {};
	DEFINE_WAIT(wait);

	/* offload context creation failed, just exit */
	if (!current->io_uring) {
		mutex_lock(&sqd->lock);
		rcu_assign_pointer(sqd->thread, NULL);
		put_task_struct(current);
		mutex_unlock(&sqd->lock);
		goto err_out;
	}

	snprintf(buf, sizeof(buf), "iou-sqp-%d", sqd->task_pid);
	set_task_comm(current, buf);

	/* reset to our pid after we've set task_comm, for fdinfo */
	sqd->task_pid = current->pid;

	if (sqd->sq_cpu != -1) {
		set_cpus_allowed_ptr(current, cpumask_of(sqd->sq_cpu));
	} else {
		set_cpus_allowed_ptr(current, cpu_online_mask);
		sqd->sq_cpu = raw_smp_processor_id();
	}

	/*
	 * Force audit context to get setup, in case we do prep side async
	 * operations that would trigger an audit call before any issue side
	 * audit has been done.
	 */
	audit_uring_entry(IORING_OP_NOP);
	audit_uring_exit(true, 0);

	mutex_lock(&sqd->lock);
	while (1) {
		bool cap_entries, sqt_spin = false;

		if (io_sqd_events_pending(sqd) || signal_pending(current)) {
			if (io_sqd_handle_event(sqd))
				break;
			timeout = jiffies + sqd->sq_thread_idle;
		}

		cap_entries = !list_is_singular(&sqd->ctx_list);
		getrusage(current, RUSAGE_SELF, &start);
		list_for_each_entry(ctx, &sqd->ctx_list, sqd_list) {
			int ret = __io_sq_thread(ctx, cap_entries);

			if (!sqt_spin && (ret > 0 || !wq_list_empty(&ctx->iopoll_list)))
				sqt_spin = true;
		}
		if (io_sq_tw(&retry_list, IORING_TW_CAP_ENTRIES_VALUE))
			sqt_spin = true;

		list_for_each_entry(ctx, &sqd->ctx_list, sqd_list)
			if (io_napi(ctx))
				io_napi_sqpoll_busy_poll(ctx);

		if (sqt_spin || !time_after(jiffies, timeout)) {
			if (sqt_spin) {
				io_sq_update_worktime(sqd, &start);
				timeout = jiffies + sqd->sq_thread_idle;
			}
			if (unlikely(need_resched())) {
				mutex_unlock(&sqd->lock);
				cond_resched();
				mutex_lock(&sqd->lock);
				sqd->sq_cpu = raw_smp_processor_id();
			}
			continue;
		}

		prepare_to_wait(&sqd->wait, &wait, TASK_INTERRUPTIBLE);
		if (!io_sqd_events_pending(sqd) && !io_sq_tw_pending(retry_list)) {
			bool needs_sched = true;

			list_for_each_entry(ctx, &sqd->ctx_list, sqd_list) {
				atomic_or(IORING_SQ_NEED_WAKEUP,
					  &ctx->rings->sq_flags);
				if ((ctx->flags & IORING_SETUP_IOPOLL) &&
				    !wq_list_empty(&ctx->iopoll_list)) {
					needs_sched = false;
					break;
				}

				/*
				 * Ensure the store of the wakeup flag is not
				 * reordered with the load of the SQ tail
				 */
				smp_mb__after_atomic();

				if (io_sqring_entries(ctx)) {
					needs_sched = false;
					break;
				}
			}

			if (needs_sched) {
				mutex_unlock(&sqd->lock);
				schedule();
				mutex_lock(&sqd->lock);
				sqd->sq_cpu = raw_smp_processor_id();
			}
			list_for_each_entry(ctx, &sqd->ctx_list, sqd_list)
				atomic_andnot(IORING_SQ_NEED_WAKEUP,
					      &ctx->rings->sq_flags);
		}

		finish_wait(&sqd->wait, &wait);
		timeout = jiffies + sqd->sq_thread_idle;
	}

	if (retry_list)
		io_sq_tw(&retry_list, UINT_MAX);

	io_uring_cancel_generic(true, sqd);
	rcu_assign_pointer(sqd->thread, NULL);
	put_task_struct(current);
	list_for_each_entry(ctx, &sqd->ctx_list, sqd_list)
		atomic_or(IORING_SQ_NEED_WAKEUP, &ctx->rings->sq_flags);
	io_run_task_work();
	mutex_unlock(&sqd->lock);
err_out:
	complete(&sqd->exited);
	do_exit(0);
}

void io_sqpoll_wait_sq(struct io_ring_ctx *ctx)
{
	DEFINE_WAIT(wait);

	do {
		if (!io_sqring_full(ctx))
			break;
		prepare_to_wait(&ctx->sqo_sq_wait, &wait, TASK_INTERRUPTIBLE);

		if (!io_sqring_full(ctx))
			break;
		schedule();
	} while (!signal_pending(current));

	finish_wait(&ctx->sqo_sq_wait, &wait);
}
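/*
 * Set up SQPOLL offload for this ring: either attach to the io_sq_data
 * of an existing ring (IORING_SETUP_ATTACH_WQ) or allocate a new one and
 * spawn its kernel thread, applying IORING_SETUP_SQ_AFF CPU affinity if
 * requested.
 */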
__cold int io_sq_offload_create(struct io_ring_ctx *ctx,
				struct io_uring_params *p)
{
	struct task_struct *task_to_put = NULL;
	int ret;

	/* Retain compatibility with failing for an invalid attach attempt */
	if ((ctx->flags & (IORING_SETUP_ATTACH_WQ | IORING_SETUP_SQPOLL)) ==
				IORING_SETUP_ATTACH_WQ) {
		CLASS(fd, f)(p->wq_fd);
		if (fd_empty(f))
			return -ENXIO;
		if (!io_is_uring_fops(fd_file(f)))
			return -EINVAL;
	}
	if (ctx->flags & IORING_SETUP_SQPOLL) {
		struct task_struct *tsk;
		struct io_sq_data *sqd;
		bool attached;

		ret = security_uring_sqpoll();
		if (ret)
			return ret;

		sqd = io_get_sq_data(p, &attached);
		if (IS_ERR(sqd)) {
			ret = PTR_ERR(sqd);
			goto err;
		}

		ctx->sq_creds = get_current_cred();
		ctx->sq_data = sqd;
		ctx->sq_thread_idle = msecs_to_jiffies(p->sq_thread_idle);
		if (!ctx->sq_thread_idle)
			ctx->sq_thread_idle = HZ;

		io_sq_thread_park(sqd);
		list_add(&ctx->sqd_list, &sqd->ctx_list);
		io_sqd_update_thread_idle(sqd);
		/* don't attach to a dying SQPOLL thread, would be racy */
		ret = (attached && !sqd->thread) ? -ENXIO : 0;
		io_sq_thread_unpark(sqd);

		if (ret < 0)
			goto err;
		if (attached)
			return 0;

		if (p->flags & IORING_SETUP_SQ_AFF) {
			cpumask_var_t allowed_mask;
			int cpu = p->sq_thread_cpu;

			ret = -EINVAL;
			if (cpu >= nr_cpu_ids || !cpu_online(cpu))
				goto err_sqpoll;
			ret = -ENOMEM;
			if (!alloc_cpumask_var(&allowed_mask, GFP_KERNEL))
				goto err_sqpoll;
			ret = -EINVAL;
			cpuset_cpus_allowed(current, allowed_mask);
			if (!cpumask_test_cpu(cpu, allowed_mask)) {
				free_cpumask_var(allowed_mask);
				goto err_sqpoll;
			}
			free_cpumask_var(allowed_mask);
			sqd->sq_cpu = cpu;
		} else {
			sqd->sq_cpu = -1;
		}

		sqd->task_pid = current->pid;
		sqd->task_tgid = current->tgid;
		tsk = create_io_thread(io_sq_thread, sqd, NUMA_NO_NODE);
		if (IS_ERR(tsk)) {
			ret = PTR_ERR(tsk);
			goto err_sqpoll;
		}

		mutex_lock(&sqd->lock);
		rcu_assign_pointer(sqd->thread, tsk);
		mutex_unlock(&sqd->lock);

		task_to_put = get_task_struct(tsk);
		ret = io_uring_alloc_task_context(tsk, ctx);
		wake_up_new_task(tsk);
		if (ret)
			goto err;
	} else if (p->flags & IORING_SETUP_SQ_AFF) {
		/* Can't have SQ_AFF without SQPOLL */
		ret = -EINVAL;
		goto err;
	}
	return 0;
err_sqpoll:
	complete(&ctx->sq_data->exited);
err:
	io_sq_thread_finish(ctx);
	if (task_to_put)
		put_task_struct(task_to_put);
	return ret;
}

__cold int io_sqpoll_wq_cpu_affinity(struct io_ring_ctx *ctx,
				     cpumask_var_t mask)
{
	struct io_sq_data *sqd = ctx->sq_data;
	int ret = -EINVAL;

	if (sqd) {
		struct task_struct *tsk;

		io_sq_thread_park(sqd);
		/* Don't set affinity for a dying thread */
		tsk = sqpoll_task_locked(sqd);
		if (tsk)
			ret = io_wq_cpu_affinity(tsk->io_uring, mask);
		io_sq_thread_unpark(sqd);
	}

	return ret;
}