// SPDX-License-Identifier: GPL-2.0
/*
 * Contains the core associated with submission side polling of the SQ
 * ring, offloading submissions from the application to a kernel thread.
 */
#include <linux/kernel.h>
#include <linux/errno.h>
#include <linux/file.h>
#include <linux/mm.h>
#include <linux/slab.h>
#include <linux/audit.h>
#include <linux/security.h>
#include <linux/cpuset.h>
#include <linux/sched/cputime.h>
#include <linux/io_uring.h>

#include <uapi/linux/io_uring.h>

#include "io_uring.h"
#include "tctx.h"
#include "napi.h"
#include "cancel.h"
#include "sqpoll.h"

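/*
 * When multiple rings share one SQPOLL thread, cap how many SQEs we
 * submit per ring per loop iteration so one ring can't starve the
 * others; likewise cap how many task_work items are run per iteration.
 */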
#define IORING_SQPOLL_CAP_ENTRIES_VALUE 8
#define IORING_TW_CAP_ENTRIES_VALUE 32

enum {
	IO_SQ_THREAD_SHOULD_STOP = 0,
	IO_SQ_THREAD_SHOULD_PARK,
};

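/*
 * Parking temporarily stalls the SQPOLL thread: io_sq_thread_park() sets
 * IO_SQ_THREAD_SHOULD_PARK, wakes the thread and returns with sqd->lock
 * held so the caller can safely modify sqd state (e.g. the ctx_list).
 * park_pending counts concurrent parkers so io_sq_thread_unpark() doesn't
 * clear the PARK bit while another parker still needs it.
 */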
void io_sq_thread_unpark(struct io_sq_data *sqd)
	__releases(&sqd->lock)
{
	WARN_ON_ONCE(sqpoll_task_locked(sqd) == current);

	/*
	 * Do the dance but not conditional clear_bit() because it'd race with
	 * other threads incrementing park_pending and setting the bit.
	 */
	clear_bit(IO_SQ_THREAD_SHOULD_PARK, &sqd->state);
	if (atomic_dec_return(&sqd->park_pending))
		set_bit(IO_SQ_THREAD_SHOULD_PARK, &sqd->state);
	mutex_unlock(&sqd->lock);
	wake_up(&sqd->wait);
}

void io_sq_thread_park(struct io_sq_data *sqd)
	__acquires(&sqd->lock)
{
	struct task_struct *tsk;

	atomic_inc(&sqd->park_pending);
	set_bit(IO_SQ_THREAD_SHOULD_PARK, &sqd->state);
	mutex_lock(&sqd->lock);

	tsk = sqpoll_task_locked(sqd);
	if (tsk) {
		WARN_ON_ONCE(tsk == current);
		wake_up_process(tsk);
	}
}

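/*
 * Tell the SQPOLL thread to exit: set IO_SQ_THREAD_SHOULD_STOP, wake the
 * thread if it's still around, and wait for it to complete sqd->exited.
 */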
void io_sq_thread_stop(struct io_sq_data *sqd)
{
	struct task_struct *tsk;

	WARN_ON_ONCE(test_bit(IO_SQ_THREAD_SHOULD_STOP, &sqd->state));

	set_bit(IO_SQ_THREAD_SHOULD_STOP, &sqd->state);
	mutex_lock(&sqd->lock);
	tsk = sqpoll_task_locked(sqd);
	if (tsk) {
		WARN_ON_ONCE(tsk == current);
		wake_up_process(tsk);
	}
	mutex_unlock(&sqd->lock);
	wait_for_completion(&sqd->exited);
}

void io_put_sq_data(struct io_sq_data *sqd)
{
	if (refcount_dec_and_test(&sqd->refs)) {
		WARN_ON_ONCE(atomic_read(&sqd->park_pending));

		io_sq_thread_stop(sqd);
		kfree(sqd);
	}
}

static __cold void io_sqd_update_thread_idle(struct io_sq_data *sqd)
{
	struct io_ring_ctx *ctx;
	unsigned sq_thread_idle = 0;

	list_for_each_entry(ctx, &sqd->ctx_list, sqd_list)
		sq_thread_idle = max(sq_thread_idle, ctx->sq_thread_idle);
	sqd->sq_thread_idle = sq_thread_idle;
}

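/*
 * Detach a ring from its SQPOLL backend: unlink it from the sqd ctx_list
 * while parked, recompute the idle timeout for the remaining rings, and
 * drop this ring's reference on the sqd.
 */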
void io_sq_thread_finish(struct io_ring_ctx *ctx)
{
	struct io_sq_data *sqd = ctx->sq_data;

	if (sqd) {
		io_sq_thread_park(sqd);
		list_del_init(&ctx->sqd_list);
		io_sqd_update_thread_idle(sqd);
		io_sq_thread_unpark(sqd);

		io_put_sq_data(sqd);
		ctx->sq_data = NULL;
	}
}

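/*
 * IORING_SETUP_ATTACH_WQ: share the SQPOLL backend of the ring referred
 * to by p->wq_fd. Only tasks in the thread group that set up the SQPOLL
 * data may attach.
 */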
static struct io_sq_data *io_attach_sq_data(struct io_uring_params *p)
{
	struct io_ring_ctx *ctx_attach;
	struct io_sq_data *sqd;
	CLASS(fd, f)(p->wq_fd);

	if (fd_empty(f))
		return ERR_PTR(-ENXIO);
	if (!io_is_uring_fops(fd_file(f)))
		return ERR_PTR(-EINVAL);

	ctx_attach = fd_file(f)->private_data;
	sqd = ctx_attach->sq_data;
	if (!sqd)
		return ERR_PTR(-EINVAL);
	if (sqd->task_tgid != current->tgid)
		return ERR_PTR(-EPERM);

	refcount_inc(&sqd->refs);
	return sqd;
}

static struct io_sq_data *io_get_sq_data(struct io_uring_params *p,
					 bool *attached)
{
	struct io_sq_data *sqd;

	*attached = false;
	if (p->flags & IORING_SETUP_ATTACH_WQ) {
		sqd = io_attach_sq_data(p);
		if (!IS_ERR(sqd)) {
			*attached = true;
			return sqd;
		}
		/* fall through for EPERM case, setup new sqd/task */
		if (PTR_ERR(sqd) != -EPERM)
			return sqd;
	}

	sqd = kzalloc(sizeof(*sqd), GFP_KERNEL);
	if (!sqd)
		return ERR_PTR(-ENOMEM);

	atomic_set(&sqd->park_pending, 0);
	refcount_set(&sqd->refs, 1);
	INIT_LIST_HEAD(&sqd->ctx_list);
	mutex_init(&sqd->lock);
	init_waitqueue_head(&sqd->wait);
	init_completion(&sqd->exited);
	return sqd;
}

static inline bool io_sqd_events_pending(struct io_sq_data *sqd)
{
	return READ_ONCE(sqd->state);
}

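/*
 * Busy-time accounting for the SQPOLL thread: io_sq_start_worktime()
 * samples the thread's CPU time (in usecs) when real work is found in a
 * loop iteration, and io_sq_update_worktime() adds the elapsed delta to
 * sqd->work_time at the end of the iteration.
 */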
struct io_sq_time {
	bool started;
	u64 usec;
};

u64 io_sq_cpu_usec(struct task_struct *tsk)
{
	u64 utime, stime;

	task_cputime_adjusted(tsk, &utime, &stime);
	do_div(stime, 1000);
	return stime;
}

static void io_sq_update_worktime(struct io_sq_data *sqd, struct io_sq_time *ist)
{
	if (!ist->started)
		return;
	ist->started = false;
	sqd->work_time += io_sq_cpu_usec(current) - ist->usec;
}

static void io_sq_start_worktime(struct io_sq_time *ist)
{
	if (ist->started)
		return;
	ist->started = true;
	ist->usec = io_sq_cpu_usec(current);
}

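/*
 * One submission pass for a single ring: run pending iopoll completions
 * and submit up to 'to_submit' SQEs under the ring's uring_lock, using
 * the credentials the ring was created with. Returns the number of SQEs
 * submitted.
 */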
static int __io_sq_thread(struct io_ring_ctx *ctx, struct io_sq_data *sqd,
			  bool cap_entries, struct io_sq_time *ist)
{
	unsigned int to_submit;
	int ret = 0;

	to_submit = io_sqring_entries(ctx);
	/* if we're handling multiple rings, cap submit size for fairness */
	if (cap_entries && to_submit > IORING_SQPOLL_CAP_ENTRIES_VALUE)
		to_submit = IORING_SQPOLL_CAP_ENTRIES_VALUE;

	if (to_submit || !wq_list_empty(&ctx->iopoll_list)) {
		const struct cred *creds = NULL;

		io_sq_start_worktime(ist);

		if (ctx->sq_creds != current_cred())
			creds = override_creds(ctx->sq_creds);

		mutex_lock(&ctx->uring_lock);
		if (!wq_list_empty(&ctx->iopoll_list))
			io_do_iopoll(ctx, true);

		/*
		 * Don't submit if refs are dying, good for io_uring_register(),
		 * but also it is relied upon by io_ring_exit_work()
		 */
		if (to_submit && likely(!percpu_ref_is_dying(&ctx->refs)) &&
		    !(ctx->flags & IORING_SETUP_R_DISABLED))
			ret = io_submit_sqes(ctx, to_submit);
		mutex_unlock(&ctx->uring_lock);

		if (to_submit && wq_has_sleeper(&ctx->sqo_sq_wait))
			wake_up(&ctx->sqo_sq_wait);
		if (creds)
			revert_creds(creds);
	}

	return ret;
}

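/*
 * Handle park/stop/signal events for the SQPOLL thread. If parked, drop
 * sqd->lock and wait until all parkers are done before re-taking it.
 * Returns true if the thread should exit (a signal was delivered or
 * IO_SQ_THREAD_SHOULD_STOP is set).
 */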
static bool io_sqd_handle_event(struct io_sq_data *sqd)
{
	bool did_sig = false;
	struct ksignal ksig;

	if (test_bit(IO_SQ_THREAD_SHOULD_PARK, &sqd->state) ||
	    signal_pending(current)) {
		mutex_unlock(&sqd->lock);
		if (signal_pending(current))
			did_sig = get_signal(&ksig);
		wait_event(sqd->wait, !atomic_read(&sqd->park_pending));
		mutex_lock(&sqd->lock);
		sqd->sq_cpu = raw_smp_processor_id();
	}
	return did_sig || test_bit(IO_SQ_THREAD_SHOULD_STOP, &sqd->state);
}

/*
 * Run task_work, processing the retry_list first. The retry_list holds
 * entries that we passed on in the previous run, if we had more task_work
 * than we were asked to process. Newly queued task_work isn't run until the
 * retry list has been fully processed.
 */
static unsigned int io_sq_tw(struct llist_node **retry_list, int max_entries)
{
	struct io_uring_task *tctx = current->io_uring;
	unsigned int count = 0;

	if (*retry_list) {
		*retry_list = io_handle_tw_list(*retry_list, &count, max_entries);
		if (count >= max_entries)
			goto out;
		max_entries -= count;
	}
	*retry_list = tctx_task_work_run(tctx, max_entries, &count);
out:
	if (task_work_pending(current))
		task_work_run();
	return count;
}

static bool io_sq_tw_pending(struct llist_node *retry_list)
{
	struct io_uring_task *tctx = current->io_uring;

	return retry_list || !llist_empty(&tctx->task_list);
}

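/*
 * Main loop of the SQPOLL kernel thread. For each ring attached to the
 * sqd it submits pending SQEs, runs iopoll and task_work, and busy-polls
 * NAPI where enabled. Once nothing has been found to do for longer than
 * sq_thread_idle, it sets IORING_SQ_NEED_WAKEUP in the SQ ring flags and
 * sleeps until woken again (e.g. by io_uring_enter(2) with
 * IORING_ENTER_SQ_WAKEUP, or by park/stop).
 */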
static int io_sq_thread(void *data)
{
	struct llist_node *retry_list = NULL;
	struct io_sq_data *sqd = data;
	struct io_ring_ctx *ctx;
	unsigned long timeout = 0;
	char buf[TASK_COMM_LEN] = {};
	DEFINE_WAIT(wait);

	/* offload context creation failed, just exit */
	if (!current->io_uring) {
		mutex_lock(&sqd->lock);
		rcu_assign_pointer(sqd->thread, NULL);
		put_task_struct(current);
		mutex_unlock(&sqd->lock);
		goto err_out;
	}

	snprintf(buf, sizeof(buf), "iou-sqp-%d", sqd->task_pid);
	set_task_comm(current, buf);

	/* reset to our pid after we've set task_comm, for fdinfo */
	sqd->task_pid = current->pid;

	if (sqd->sq_cpu != -1) {
		set_cpus_allowed_ptr(current, cpumask_of(sqd->sq_cpu));
	} else {
		set_cpus_allowed_ptr(current, cpu_online_mask);
		sqd->sq_cpu = raw_smp_processor_id();
	}

	/*
	 * Force audit context to get setup, in case we do prep side async
	 * operations that would trigger an audit call before any issue side
	 * audit has been done.
	 */
	audit_uring_entry(IORING_OP_NOP);
	audit_uring_exit(true, 0);

	mutex_lock(&sqd->lock);
	while (1) {
		bool cap_entries, sqt_spin = false;
		struct io_sq_time ist = { };

		if (io_sqd_events_pending(sqd) || signal_pending(current)) {
			if (io_sqd_handle_event(sqd))
				break;
			timeout = jiffies + sqd->sq_thread_idle;
		}

		cap_entries = !list_is_singular(&sqd->ctx_list);
		list_for_each_entry(ctx, &sqd->ctx_list, sqd_list) {
			int ret = __io_sq_thread(ctx, sqd, cap_entries, &ist);

			if (!sqt_spin && (ret > 0 || !wq_list_empty(&ctx->iopoll_list)))
				sqt_spin = true;
		}
		if (io_sq_tw(&retry_list, IORING_TW_CAP_ENTRIES_VALUE))
			sqt_spin = true;

		list_for_each_entry(ctx, &sqd->ctx_list, sqd_list) {
			if (io_napi(ctx)) {
				io_sq_start_worktime(&ist);
				io_napi_sqpoll_busy_poll(ctx);
			}
		}

		io_sq_update_worktime(sqd, &ist);

		if (sqt_spin || !time_after(jiffies, timeout)) {
			if (sqt_spin)
				timeout = jiffies + sqd->sq_thread_idle;
			if (unlikely(need_resched())) {
				mutex_unlock(&sqd->lock);
				cond_resched();
				mutex_lock(&sqd->lock);
				sqd->sq_cpu = raw_smp_processor_id();
			}
			continue;
		}

		prepare_to_wait(&sqd->wait, &wait, TASK_INTERRUPTIBLE);
		if (!io_sqd_events_pending(sqd) && !io_sq_tw_pending(retry_list)) {
			bool needs_sched = true;

			list_for_each_entry(ctx, &sqd->ctx_list, sqd_list) {
				atomic_or(IORING_SQ_NEED_WAKEUP,
					  &ctx->rings->sq_flags);
				if ((ctx->flags & IORING_SETUP_IOPOLL) &&
				    !wq_list_empty(&ctx->iopoll_list)) {
					needs_sched = false;
					break;
				}

				/*
				 * Ensure the store of the wakeup flag is not
				 * reordered with the load of the SQ tail
				 */
				smp_mb__after_atomic();

				if (io_sqring_entries(ctx)) {
					needs_sched = false;
					break;
				}
			}

			if (needs_sched) {
				mutex_unlock(&sqd->lock);
				schedule();
				mutex_lock(&sqd->lock);
				sqd->sq_cpu = raw_smp_processor_id();
			}
			list_for_each_entry(ctx, &sqd->ctx_list, sqd_list)
				atomic_andnot(IORING_SQ_NEED_WAKEUP,
					      &ctx->rings->sq_flags);
		}

		finish_wait(&sqd->wait, &wait);
		timeout = jiffies + sqd->sq_thread_idle;
	}

	if (retry_list)
		io_sq_tw(&retry_list, UINT_MAX);

	io_uring_cancel_generic(true, sqd);
	rcu_assign_pointer(sqd->thread, NULL);
	put_task_struct(current);
	list_for_each_entry(ctx, &sqd->ctx_list, sqd_list)
		atomic_or(IORING_SQ_NEED_WAKEUP, &ctx->rings->sq_flags);
	io_run_task_work();
	mutex_unlock(&sqd->lock);
err_out:
	complete(&sqd->exited);
	do_exit(0);
}

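/*
 * Wait for space to open up in the SQ ring: sleep on sqo_sq_wait until
 * the SQPOLL thread has consumed entries (waiters are woken from
 * __io_sq_thread()) or a signal is pending.
 */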
void io_sqpoll_wait_sq(struct io_ring_ctx *ctx)
{
	DEFINE_WAIT(wait);

	do {
		if (!io_sqring_full(ctx))
			break;
		prepare_to_wait(&ctx->sqo_sq_wait, &wait, TASK_INTERRUPTIBLE);

		if (!io_sqring_full(ctx))
			break;
		schedule();
	} while (!signal_pending(current));

	finish_wait(&ctx->sqo_sq_wait, &wait);
}

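/*
 * Set up SQPOLL offload at ring creation: either attach to an existing
 * io_sq_data (IORING_SETUP_ATTACH_WQ) or allocate a new one and spawn the
 * "iou-sqp-<pid>" kernel thread, honouring IORING_SETUP_SQ_AFF CPU
 * affinity if requested.
 */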
__cold int io_sq_offload_create(struct io_ring_ctx *ctx,
				struct io_uring_params *p)
{
	int ret;

	/* Retain compatibility with failing for an invalid attach attempt */
	if ((ctx->flags & (IORING_SETUP_ATTACH_WQ | IORING_SETUP_SQPOLL)) ==
				IORING_SETUP_ATTACH_WQ) {
		CLASS(fd, f)(p->wq_fd);
		if (fd_empty(f))
			return -ENXIO;
		if (!io_is_uring_fops(fd_file(f)))
			return -EINVAL;
	}
	if (ctx->flags & IORING_SETUP_SQPOLL) {
		struct task_struct *tsk;
		struct io_sq_data *sqd;
		bool attached;

		ret = security_uring_sqpoll();
		if (ret)
			return ret;

		sqd = io_get_sq_data(p, &attached);
		if (IS_ERR(sqd)) {
			ret = PTR_ERR(sqd);
			goto err;
		}

		ctx->sq_creds = get_current_cred();
		ctx->sq_data = sqd;
		ctx->sq_thread_idle = msecs_to_jiffies(p->sq_thread_idle);
		if (!ctx->sq_thread_idle)
			ctx->sq_thread_idle = HZ;

		io_sq_thread_park(sqd);
		list_add(&ctx->sqd_list, &sqd->ctx_list);
		io_sqd_update_thread_idle(sqd);
		/* don't attach to a dying SQPOLL thread, would be racy */
		ret = (attached && !sqd->thread) ? -ENXIO : 0;
		io_sq_thread_unpark(sqd);

		if (ret < 0)
			goto err;
		if (attached)
			return 0;

		if (p->flags & IORING_SETUP_SQ_AFF) {
			cpumask_var_t allowed_mask;
			int cpu = p->sq_thread_cpu;

			ret = -EINVAL;
			if (cpu >= nr_cpu_ids || !cpu_online(cpu))
				goto err_sqpoll;
			ret = -ENOMEM;
			if (!alloc_cpumask_var(&allowed_mask, GFP_KERNEL))
				goto err_sqpoll;
			ret = -EINVAL;
			cpuset_cpus_allowed(current, allowed_mask);
			if (!cpumask_test_cpu(cpu, allowed_mask)) {
				free_cpumask_var(allowed_mask);
				goto err_sqpoll;
			}
			free_cpumask_var(allowed_mask);
			sqd->sq_cpu = cpu;
		} else {
			sqd->sq_cpu = -1;
		}

		sqd->task_pid = current->pid;
		sqd->task_tgid = current->tgid;
		tsk = create_io_thread(io_sq_thread, sqd, NUMA_NO_NODE);
		if (IS_ERR(tsk)) {
			ret = PTR_ERR(tsk);
			goto err_sqpoll;
		}

		mutex_lock(&sqd->lock);
		rcu_assign_pointer(sqd->thread, tsk);
		mutex_unlock(&sqd->lock);

		get_task_struct(tsk);
		ret = io_uring_alloc_task_context(tsk, ctx);
		wake_up_new_task(tsk);
		if (ret)
			goto err;
	} else if (p->flags & IORING_SETUP_SQ_AFF) {
		/* Can't have SQ_AFF without SQPOLL */
		ret = -EINVAL;
		goto err;
	}
	return 0;
err_sqpoll:
	complete(&ctx->sq_data->exited);
err:
	io_sq_thread_finish(ctx);
	return ret;
}

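/*
 * Apply a CPU affinity mask to the io-wq backing the SQPOLL task; the
 * thread is parked so it can't exit while we update it.
 */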
__cold int io_sqpoll_wq_cpu_affinity(struct io_ring_ctx *ctx,
				     cpumask_var_t mask)
{
	struct io_sq_data *sqd = ctx->sq_data;
	int ret = -EINVAL;

	if (sqd) {
		struct task_struct *tsk;

		io_sq_thread_park(sqd);
		/* Don't set affinity for a dying thread */
		tsk = sqpoll_task_locked(sqd);
		if (tsk)
			ret = io_wq_cpu_affinity(tsk->io_uring, mask);
		io_sq_thread_unpark(sqd);
	}

	return ret;
}