// SPDX-License-Identifier: GPL-2.0
/*
 * Contains the core associated with submission side polling of the SQ
 * ring, offloading submissions from the application to a kernel thread.
 */
#include <linux/kernel.h>
#include <linux/errno.h>
#include <linux/file.h>
#include <linux/mm.h>
#include <linux/slab.h>
#include <linux/audit.h>
#include <linux/security.h>
#include <linux/cpuset.h>
#include <linux/io_uring.h>

#include <uapi/linux/io_uring.h>

#include "io_uring.h"
#include "napi.h"
#include "sqpoll.h"

#define IORING_SQPOLL_CAP_ENTRIES_VALUE	8
#define IORING_TW_CAP_ENTRIES_VALUE	8

enum {
	IO_SQ_THREAD_SHOULD_STOP = 0,
	IO_SQ_THREAD_SHOULD_PARK,
};

void io_sq_thread_unpark(struct io_sq_data *sqd)
	__releases(&sqd->lock)
{
	WARN_ON_ONCE(sqd->thread == current);

	/*
	 * Do the dance but not conditional clear_bit() because it'd race with
	 * other threads incrementing park_pending and setting the bit.
	 */
	clear_bit(IO_SQ_THREAD_SHOULD_PARK, &sqd->state);
	if (atomic_dec_return(&sqd->park_pending))
		set_bit(IO_SQ_THREAD_SHOULD_PARK, &sqd->state);
	mutex_unlock(&sqd->lock);
}

void io_sq_thread_park(struct io_sq_data *sqd)
	__acquires(&sqd->lock)
{
	WARN_ON_ONCE(data_race(sqd->thread) == current);

	atomic_inc(&sqd->park_pending);
	set_bit(IO_SQ_THREAD_SHOULD_PARK, &sqd->state);
	mutex_lock(&sqd->lock);
	if (sqd->thread)
		wake_up_process(sqd->thread);
}

void io_sq_thread_stop(struct io_sq_data *sqd)
{
	WARN_ON_ONCE(sqd->thread == current);
	WARN_ON_ONCE(test_bit(IO_SQ_THREAD_SHOULD_STOP, &sqd->state));

	set_bit(IO_SQ_THREAD_SHOULD_STOP, &sqd->state);
	mutex_lock(&sqd->lock);
	if (sqd->thread)
		wake_up_process(sqd->thread);
	mutex_unlock(&sqd->lock);
	wait_for_completion(&sqd->exited);
}

void io_put_sq_data(struct io_sq_data *sqd)
{
	if (refcount_dec_and_test(&sqd->refs)) {
		WARN_ON_ONCE(atomic_read(&sqd->park_pending));

		io_sq_thread_stop(sqd);
		kfree(sqd);
	}
}

static __cold void io_sqd_update_thread_idle(struct io_sq_data *sqd)
{
	struct io_ring_ctx *ctx;
	unsigned sq_thread_idle = 0;

	list_for_each_entry(ctx, &sqd->ctx_list, sqd_list)
		sq_thread_idle = max(sq_thread_idle, ctx->sq_thread_idle);
	sqd->sq_thread_idle = sq_thread_idle;
}

void io_sq_thread_finish(struct io_ring_ctx *ctx)
{
	struct io_sq_data *sqd = ctx->sq_data;

	if (sqd) {
		io_sq_thread_park(sqd);
		list_del_init(&ctx->sqd_list);
		io_sqd_update_thread_idle(sqd);
		io_sq_thread_unpark(sqd);

		io_put_sq_data(sqd);
		ctx->sq_data = NULL;
	}
}

static struct io_sq_data *io_attach_sq_data(struct io_uring_params *p)
{
	struct io_ring_ctx *ctx_attach;
	struct io_sq_data *sqd;
	struct fd f;

	f = fdget(p->wq_fd);
	if (!fd_file(f))
		return ERR_PTR(-ENXIO);
	if (!io_is_uring_fops(fd_file(f))) {
		fdput(f);
		return ERR_PTR(-EINVAL);
	}

	ctx_attach = fd_file(f)->private_data;
	sqd = ctx_attach->sq_data;
	if (!sqd) {
		fdput(f);
		return ERR_PTR(-EINVAL);
	}
	if (sqd->task_tgid != current->tgid) {
		fdput(f);
		return ERR_PTR(-EPERM);
	}

	refcount_inc(&sqd->refs);
	fdput(f);
	return sqd;
}

static struct io_sq_data *io_get_sq_data(struct io_uring_params *p,
					 bool *attached)
{
	struct io_sq_data *sqd;

	*attached = false;
	if (p->flags & IORING_SETUP_ATTACH_WQ) {
		sqd = io_attach_sq_data(p);
		if (!IS_ERR(sqd)) {
			*attached = true;
			return sqd;
		}
		/* fall through for EPERM case, setup new sqd/task */
		if (PTR_ERR(sqd) != -EPERM)
			return sqd;
	}

	sqd = kzalloc(sizeof(*sqd), GFP_KERNEL);
	if (!sqd)
		return ERR_PTR(-ENOMEM);

	atomic_set(&sqd->park_pending, 0);
	refcount_set(&sqd->refs, 1);
	INIT_LIST_HEAD(&sqd->ctx_list);
	mutex_init(&sqd->lock);
	init_waitqueue_head(&sqd->wait);
	init_completion(&sqd->exited);
	return sqd;
}

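/*
 * Illustrative sketch only (not part of this file): roughly how an
 * application might ask for an SQPOLL thread and then share it with a
 * second ring via IORING_SETUP_ATTACH_WQ. Real code would normally use
 * liburing; the raw syscall form below just shows which io_uring_params
 * fields the helpers above consume (flags, sq_thread_idle, sq_thread_cpu,
 * wq_fd). Entry counts and field values are made up for the example.
 *
 *	struct io_uring_params p = {
 *		.flags		= IORING_SETUP_SQPOLL | IORING_SETUP_SQ_AFF,
 *		.sq_thread_idle	= 2000,	// idle timeout in msecs
 *		.sq_thread_cpu	= 1,	// pin the iou-sqp thread to CPU 1
 *	};
 *	int fd = syscall(__NR_io_uring_setup, 64, &p);
 *
 *	struct io_uring_params p2 = {
 *		.flags	= IORING_SETUP_SQPOLL | IORING_SETUP_ATTACH_WQ,
 *		.wq_fd	= fd,	// reuse fd's SQPOLL thread
 *	};
 *	int fd2 = syscall(__NR_io_uring_setup, 64, &p2);
 *
 * Attaching only succeeds within the same thread group, see the task_tgid
 * check in io_attach_sq_data() above.
 */
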
static inline bool io_sqd_events_pending(struct io_sq_data *sqd)
{
	return READ_ONCE(sqd->state);
}

static int __io_sq_thread(struct io_ring_ctx *ctx, bool cap_entries)
{
	unsigned int to_submit;
	int ret = 0;

	to_submit = io_sqring_entries(ctx);
	/* if we're handling multiple rings, cap submit size for fairness */
	if (cap_entries && to_submit > IORING_SQPOLL_CAP_ENTRIES_VALUE)
		to_submit = IORING_SQPOLL_CAP_ENTRIES_VALUE;

	if (to_submit || !wq_list_empty(&ctx->iopoll_list)) {
		const struct cred *creds = NULL;

		if (ctx->sq_creds != current_cred())
			creds = override_creds(ctx->sq_creds);

		mutex_lock(&ctx->uring_lock);
		if (!wq_list_empty(&ctx->iopoll_list))
			io_do_iopoll(ctx, true);

		/*
		 * Don't submit if refs are dying, good for io_uring_register(),
		 * but also it is relied upon by io_ring_exit_work()
		 */
		if (to_submit && likely(!percpu_ref_is_dying(&ctx->refs)) &&
		    !(ctx->flags & IORING_SETUP_R_DISABLED))
			ret = io_submit_sqes(ctx, to_submit);
		mutex_unlock(&ctx->uring_lock);

		if (to_submit && wq_has_sleeper(&ctx->sqo_sq_wait))
			wake_up(&ctx->sqo_sq_wait);
		if (creds)
			revert_creds(creds);
	}

	return ret;
}

static bool io_sqd_handle_event(struct io_sq_data *sqd)
{
	bool did_sig = false;
	struct ksignal ksig;

	if (test_bit(IO_SQ_THREAD_SHOULD_PARK, &sqd->state) ||
	    signal_pending(current)) {
		mutex_unlock(&sqd->lock);
		if (signal_pending(current))
			did_sig = get_signal(&ksig);
		cond_resched();
		mutex_lock(&sqd->lock);
		sqd->sq_cpu = raw_smp_processor_id();
	}
	return did_sig || test_bit(IO_SQ_THREAD_SHOULD_STOP, &sqd->state);
}

/*
 * Run task_work, processing the retry_list first. The retry_list holds
 * entries that we passed on in the previous run, if we had more task_work
 * than we were asked to process. Newly queued task_work isn't run until the
 * retry list has been fully processed.
 */
static unsigned int io_sq_tw(struct llist_node **retry_list, int max_entries)
{
	struct io_uring_task *tctx = current->io_uring;
	unsigned int count = 0;

	if (*retry_list) {
		*retry_list = io_handle_tw_list(*retry_list, &count, max_entries);
		if (count >= max_entries)
			goto out;
		max_entries -= count;
	}
	*retry_list = tctx_task_work_run(tctx, max_entries, &count);
out:
	if (task_work_pending(current))
		task_work_run();
	return count;
}

static bool io_sq_tw_pending(struct llist_node *retry_list)
{
	struct io_uring_task *tctx = current->io_uring;

	return retry_list || !llist_empty(&tctx->task_list);
}

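/*
 * Accumulate the system CPU time consumed by this busy iteration into
 * sqd->work_time, in microseconds, so the total SQPOLL busy time can be
 * reported to userspace (e.g. through the ring fd's fdinfo output).
 */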
static void io_sq_update_worktime(struct io_sq_data *sqd, struct rusage *start)
{
	struct rusage end;

	getrusage(current, RUSAGE_SELF, &end);
	end.ru_stime.tv_sec -= start->ru_stime.tv_sec;
	end.ru_stime.tv_usec -= start->ru_stime.tv_usec;

	sqd->work_time += end.ru_stime.tv_usec + end.ru_stime.tv_sec * 1000000;
}

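/*
 * A note on the idle handshake, with a hedged userspace-side sketch (this
 * is not kernel code; the sq.ktail/sq.kflags/ring_fd names are placeholders
 * for the mmap'ed SQ ring pointers and the barrier helpers are the
 * liburing-style ones): once io_sq_thread() below has been idle for longer
 * than sq_thread_idle, it sets IORING_SQ_NEED_WAKEUP in the SQ ring flags
 * before scheduling out. After publishing a new SQ tail, the submitter is
 * expected to check that flag and, if set, kick the thread:
 *
 *	// fill in new SQEs, then publish the new tail with a release store
 *	io_uring_smp_store_release(sq.ktail, new_tail);
 *	// full barrier so the tail store isn't reordered past the flags load
 *	io_uring_smp_mb();
 *	if (IO_URING_READ_ONCE(*sq.kflags) & IORING_SQ_NEED_WAKEUP)
 *		io_uring_enter(ring_fd, 0, 0, IORING_ENTER_SQ_WAKEUP, NULL);
 *
 * The smp_mb__after_atomic() in the wait path below is the kernel half of
 * that flag-store vs. tail-load ordering.
 */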
static int io_sq_thread(void *data)
{
	struct llist_node *retry_list = NULL;
	struct io_sq_data *sqd = data;
	struct io_ring_ctx *ctx;
	struct rusage start;
	unsigned long timeout = 0;
	char buf[TASK_COMM_LEN];
	DEFINE_WAIT(wait);

	/* offload context creation failed, just exit */
	if (!current->io_uring)
		goto err_out;

	snprintf(buf, sizeof(buf), "iou-sqp-%d", sqd->task_pid);
	set_task_comm(current, buf);

	/* reset to our pid after we've set task_comm, for fdinfo */
	sqd->task_pid = current->pid;

	if (sqd->sq_cpu != -1) {
		set_cpus_allowed_ptr(current, cpumask_of(sqd->sq_cpu));
	} else {
		set_cpus_allowed_ptr(current, cpu_online_mask);
		sqd->sq_cpu = raw_smp_processor_id();
	}

	/*
	 * Force audit context to get setup, in case we do prep side async
	 * operations that would trigger an audit call before any issue side
	 * audit has been done.
	 */
	audit_uring_entry(IORING_OP_NOP);
	audit_uring_exit(true, 0);

	mutex_lock(&sqd->lock);
	while (1) {
		bool cap_entries, sqt_spin = false;

		if (io_sqd_events_pending(sqd) || signal_pending(current)) {
			if (io_sqd_handle_event(sqd))
				break;
			timeout = jiffies + sqd->sq_thread_idle;
		}

		cap_entries = !list_is_singular(&sqd->ctx_list);
		getrusage(current, RUSAGE_SELF, &start);
		list_for_each_entry(ctx, &sqd->ctx_list, sqd_list) {
			int ret = __io_sq_thread(ctx, cap_entries);

			if (!sqt_spin && (ret > 0 || !wq_list_empty(&ctx->iopoll_list)))
				sqt_spin = true;
		}
		if (io_sq_tw(&retry_list, IORING_TW_CAP_ENTRIES_VALUE))
			sqt_spin = true;

		list_for_each_entry(ctx, &sqd->ctx_list, sqd_list)
			if (io_napi(ctx))
				io_napi_sqpoll_busy_poll(ctx);

		if (sqt_spin || !time_after(jiffies, timeout)) {
			if (sqt_spin) {
				io_sq_update_worktime(sqd, &start);
				timeout = jiffies + sqd->sq_thread_idle;
			}
			if (unlikely(need_resched())) {
				mutex_unlock(&sqd->lock);
				cond_resched();
				mutex_lock(&sqd->lock);
				sqd->sq_cpu = raw_smp_processor_id();
			}
			continue;
		}

		prepare_to_wait(&sqd->wait, &wait, TASK_INTERRUPTIBLE);
		if (!io_sqd_events_pending(sqd) && !io_sq_tw_pending(retry_list)) {
			bool needs_sched = true;

			list_for_each_entry(ctx, &sqd->ctx_list, sqd_list) {
				atomic_or(IORING_SQ_NEED_WAKEUP,
					  &ctx->rings->sq_flags);
				if ((ctx->flags & IORING_SETUP_IOPOLL) &&
				    !wq_list_empty(&ctx->iopoll_list)) {
					needs_sched = false;
					break;
				}

				/*
				 * Ensure the store of the wakeup flag is not
				 * reordered with the load of the SQ tail
				 */
				smp_mb__after_atomic();

				if (io_sqring_entries(ctx)) {
					needs_sched = false;
					break;
				}
			}

			if (needs_sched) {
				mutex_unlock(&sqd->lock);
				schedule();
				mutex_lock(&sqd->lock);
				sqd->sq_cpu = raw_smp_processor_id();
			}
			list_for_each_entry(ctx, &sqd->ctx_list, sqd_list)
				atomic_andnot(IORING_SQ_NEED_WAKEUP,
					      &ctx->rings->sq_flags);
		}

		finish_wait(&sqd->wait, &wait);
		timeout = jiffies + sqd->sq_thread_idle;
	}

	if (retry_list)
		io_sq_tw(&retry_list, UINT_MAX);

	io_uring_cancel_generic(true, sqd);
	sqd->thread = NULL;
	list_for_each_entry(ctx, &sqd->ctx_list, sqd_list)
		atomic_or(IORING_SQ_NEED_WAKEUP, &ctx->rings->sq_flags);
	io_run_task_work();
	mutex_unlock(&sqd->lock);
err_out:
	complete(&sqd->exited);
	do_exit(0);
}

void io_sqpoll_wait_sq(struct io_ring_ctx *ctx)
{
	DEFINE_WAIT(wait);

	do {
		if (!io_sqring_full(ctx))
			break;
		prepare_to_wait(&ctx->sqo_sq_wait, &wait, TASK_INTERRUPTIBLE);

		if (!io_sqring_full(ctx))
			break;
		schedule();
	} while (!signal_pending(current));

	finish_wait(&ctx->sqo_sq_wait, &wait);
}

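/*
 * Set up SQPOLL offload for a ring being created via io_uring_setup():
 * validate a plain IORING_SETUP_ATTACH_WQ request, create or attach to an
 * io_sq_data, apply the idle timeout and optional CPU affinity, and spawn
 * the "iou-sqp" kernel thread when a new sq_data was created.
 */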
__cold int io_sq_offload_create(struct io_ring_ctx *ctx,
				struct io_uring_params *p)
{
	int ret;

	/* Retain compatibility with failing for an invalid attach attempt */
	if ((ctx->flags & (IORING_SETUP_ATTACH_WQ | IORING_SETUP_SQPOLL)) ==
				IORING_SETUP_ATTACH_WQ) {
		struct fd f;

		f = fdget(p->wq_fd);
		if (!fd_file(f))
			return -ENXIO;
		if (!io_is_uring_fops(fd_file(f))) {
			fdput(f);
			return -EINVAL;
		}
		fdput(f);
	}
	if (ctx->flags & IORING_SETUP_SQPOLL) {
		struct task_struct *tsk;
		struct io_sq_data *sqd;
		bool attached;

		ret = security_uring_sqpoll();
		if (ret)
			return ret;

		sqd = io_get_sq_data(p, &attached);
		if (IS_ERR(sqd)) {
			ret = PTR_ERR(sqd);
			goto err;
		}

		ctx->sq_creds = get_current_cred();
		ctx->sq_data = sqd;
		ctx->sq_thread_idle = msecs_to_jiffies(p->sq_thread_idle);
		if (!ctx->sq_thread_idle)
			ctx->sq_thread_idle = HZ;

		io_sq_thread_park(sqd);
		list_add(&ctx->sqd_list, &sqd->ctx_list);
		io_sqd_update_thread_idle(sqd);
		/* don't attach to a dying SQPOLL thread, would be racy */
		ret = (attached && !sqd->thread) ? -ENXIO : 0;
		io_sq_thread_unpark(sqd);

		if (ret < 0)
			goto err;
		if (attached)
			return 0;

		if (p->flags & IORING_SETUP_SQ_AFF) {
			cpumask_var_t allowed_mask;
			int cpu = p->sq_thread_cpu;

			ret = -EINVAL;
			if (cpu >= nr_cpu_ids || !cpu_online(cpu))
				goto err_sqpoll;
			ret = -ENOMEM;
			if (!alloc_cpumask_var(&allowed_mask, GFP_KERNEL))
				goto err_sqpoll;
			ret = -EINVAL;
			cpuset_cpus_allowed(current, allowed_mask);
			if (!cpumask_test_cpu(cpu, allowed_mask)) {
				free_cpumask_var(allowed_mask);
				goto err_sqpoll;
			}
			free_cpumask_var(allowed_mask);
			sqd->sq_cpu = cpu;
		} else {
			sqd->sq_cpu = -1;
		}

		sqd->task_pid = current->pid;
		sqd->task_tgid = current->tgid;
		tsk = create_io_thread(io_sq_thread, sqd, NUMA_NO_NODE);
		if (IS_ERR(tsk)) {
			ret = PTR_ERR(tsk);
			goto err_sqpoll;
		}

		sqd->thread = tsk;
		ret = io_uring_alloc_task_context(tsk, ctx);
		wake_up_new_task(tsk);
		if (ret)
			goto err;
	} else if (p->flags & IORING_SETUP_SQ_AFF) {
		/* Can't have SQ_AFF without SQPOLL */
		ret = -EINVAL;
		goto err;
	}

	return 0;
err_sqpoll:
	complete(&ctx->sq_data->exited);
err:
	io_sq_thread_finish(ctx);
	return ret;
}

__cold int io_sqpoll_wq_cpu_affinity(struct io_ring_ctx *ctx,
				     cpumask_var_t mask)
{
	struct io_sq_data *sqd = ctx->sq_data;
	int ret = -EINVAL;

	if (sqd) {
		io_sq_thread_park(sqd);
		/* Don't set affinity for a dying thread */
		if (sqd->thread)
			ret = io_wq_cpu_affinity(sqd->thread->io_uring, mask);
		io_sq_thread_unpark(sqd);
	}

	return ret;
}