// SPDX-License-Identifier: GPL-2.0
/*
 * Contains the core associated with submission side polling of the SQ
 * ring, offloading submissions from the application to a kernel thread.
 */
#include <linux/kernel.h>
#include <linux/errno.h>
#include <linux/file.h>
#include <linux/mm.h>
#include <linux/slab.h>
#include <linux/audit.h>
#include <linux/security.h>
#include <linux/cpuset.h>
#include <linux/sched/cputime.h>
#include <linux/io_uring.h>

#include <uapi/linux/io_uring.h>

#include "io_uring.h"
#include "tctx.h"
#include "napi.h"
#include "sqpoll.h"

#define IORING_SQPOLL_CAP_ENTRIES_VALUE	8
#define IORING_TW_CAP_ENTRIES_VALUE	32

enum {
	IO_SQ_THREAD_SHOULD_STOP = 0,
	IO_SQ_THREAD_SHOULD_PARK,
};

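/*
 * Undo a previous io_sq_thread_park(): drop sqd->lock and, once no other
 * parkers are pending, clear the park request and wake the SQPOLL thread.
 */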
void io_sq_thread_unpark(struct io_sq_data *sqd)
	__releases(&sqd->lock)
{
	WARN_ON_ONCE(sqpoll_task_locked(sqd) == current);

	/*
	 * Do the dance but not conditional clear_bit() because it'd race with
	 * other threads incrementing park_pending and setting the bit.
	 */
	clear_bit(IO_SQ_THREAD_SHOULD_PARK, &sqd->state);
	if (atomic_dec_return(&sqd->park_pending))
		set_bit(IO_SQ_THREAD_SHOULD_PARK, &sqd->state);
	mutex_unlock(&sqd->lock);
	wake_up(&sqd->wait);
}

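/*
 * Ask the SQPOLL thread to park and take sqd->lock. The thread is woken so it
 * can observe the park request; callers must pair this with
 * io_sq_thread_unpark().
 */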
void io_sq_thread_park(struct io_sq_data *sqd)
	__acquires(&sqd->lock)
{
	struct task_struct *tsk;

	atomic_inc(&sqd->park_pending);
	set_bit(IO_SQ_THREAD_SHOULD_PARK, &sqd->state);
	mutex_lock(&sqd->lock);

	tsk = sqpoll_task_locked(sqd);
	if (tsk) {
		WARN_ON_ONCE(tsk == current);
		wake_up_process(tsk);
	}
}

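/*
 * Tell the SQPOLL thread to exit, wake it up, and wait until it has completed
 * its exit path.
 */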
void io_sq_thread_stop(struct io_sq_data *sqd)
{
	struct task_struct *tsk;

	WARN_ON_ONCE(test_bit(IO_SQ_THREAD_SHOULD_STOP, &sqd->state));

	set_bit(IO_SQ_THREAD_SHOULD_STOP, &sqd->state);
	mutex_lock(&sqd->lock);
	tsk = sqpoll_task_locked(sqd);
	if (tsk) {
		WARN_ON_ONCE(tsk == current);
		wake_up_process(tsk);
	}
	mutex_unlock(&sqd->lock);
	wait_for_completion(&sqd->exited);
}

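/* Drop a reference to the sq_data; the last put stops the thread and frees it. */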
void io_put_sq_data(struct io_sq_data *sqd)
{
	if (refcount_dec_and_test(&sqd->refs)) {
		WARN_ON_ONCE(atomic_read(&sqd->park_pending));

		io_sq_thread_stop(sqd);
		kfree(sqd);
	}
}

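/* Recompute the idle timeout as the max over all rings sharing this thread. */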
static __cold void io_sqd_update_thread_idle(struct io_sq_data *sqd)
{
	struct io_ring_ctx *ctx;
	unsigned sq_thread_idle = 0;

	list_for_each_entry(ctx, &sqd->ctx_list, sqd_list)
		sq_thread_idle = max(sq_thread_idle, ctx->sq_thread_idle);
	sqd->sq_thread_idle = sq_thread_idle;
}

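/* Detach a ring from its sq_data and drop the reference taken at setup time. */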
void io_sq_thread_finish(struct io_ring_ctx *ctx)
{
	struct io_sq_data *sqd = ctx->sq_data;

	if (sqd) {
		io_sq_thread_park(sqd);
		list_del_init(&ctx->sqd_list);
		io_sqd_update_thread_idle(sqd);
		io_sq_thread_unpark(sqd);

		io_put_sq_data(sqd);
		ctx->sq_data = NULL;
	}
}

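/*
 * IORING_SETUP_ATTACH_WQ: share the sq_data (and thus the SQPOLL thread) of
 * the ring referred to by p->wq_fd, provided it belongs to the same thread
 * group.
 */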
static struct io_sq_data *io_attach_sq_data(struct io_uring_params *p)
{
	struct io_ring_ctx *ctx_attach;
	struct io_sq_data *sqd;
	CLASS(fd, f)(p->wq_fd);

	if (fd_empty(f))
		return ERR_PTR(-ENXIO);
	if (!io_is_uring_fops(fd_file(f)))
		return ERR_PTR(-EINVAL);

	ctx_attach = fd_file(f)->private_data;
	sqd = ctx_attach->sq_data;
	if (!sqd)
		return ERR_PTR(-EINVAL);
	if (sqd->task_tgid != current->tgid)
		return ERR_PTR(-EPERM);

	refcount_inc(&sqd->refs);
	return sqd;
}

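/* Find an existing sq_data to attach to, or allocate and initialize a new one. */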
static struct io_sq_data *io_get_sq_data(struct io_uring_params *p,
					 bool *attached)
{
	struct io_sq_data *sqd;

	*attached = false;
	if (p->flags & IORING_SETUP_ATTACH_WQ) {
		sqd = io_attach_sq_data(p);
		if (!IS_ERR(sqd)) {
			*attached = true;
			return sqd;
		}
		/* fall through for EPERM case, setup new sqd/task */
		if (PTR_ERR(sqd) != -EPERM)
			return sqd;
	}

	sqd = kzalloc(sizeof(*sqd), GFP_KERNEL);
	if (!sqd)
		return ERR_PTR(-ENOMEM);

	atomic_set(&sqd->park_pending, 0);
	refcount_set(&sqd->refs, 1);
	INIT_LIST_HEAD(&sqd->ctx_list);
	mutex_init(&sqd->lock);
	init_waitqueue_head(&sqd->wait);
	init_completion(&sqd->exited);
	return sqd;
}

static inline bool io_sqd_events_pending(struct io_sq_data *sqd)
{
	return READ_ONCE(sqd->state);
}

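/* Accounting for one iteration's on-CPU work time of the SQPOLL thread. */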
struct io_sq_time {
	bool started;
	u64 usec;
};

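/* System CPU time consumed by @tsk, in microseconds. */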
u64 io_sq_cpu_usec(struct task_struct *tsk)
{
	u64 utime, stime;

	task_cputime_adjusted(tsk, &utime, &stime);
	do_div(stime, 1000);
	return stime;
}

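/* Close the currently open accounting window and add it to sqd->work_time. */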
static void io_sq_update_worktime(struct io_sq_data *sqd, struct io_sq_time *ist)
{
	if (!ist->started)
		return;
	ist->started = false;
	sqd->work_time += io_sq_cpu_usec(current) - ist->usec;
}

static void io_sq_start_worktime(struct io_sq_time *ist)
{
	if (ist->started)
		return;
	ist->started = true;
	ist->usec = io_sq_cpu_usec(current);
}

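/*
 * Do one round of work for a single ring: reap pending iopoll completions and
 * submit queued SQEs, capped for fairness if multiple rings share this thread.
 * Returns the number of SQEs submitted.
 */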
static int __io_sq_thread(struct io_ring_ctx *ctx, struct io_sq_data *sqd,
			  bool cap_entries, struct io_sq_time *ist)
{
	unsigned int to_submit;
	int ret = 0;

	to_submit = io_sqring_entries(ctx);
	/* if we're handling multiple rings, cap submit size for fairness */
	if (cap_entries && to_submit > IORING_SQPOLL_CAP_ENTRIES_VALUE)
		to_submit = IORING_SQPOLL_CAP_ENTRIES_VALUE;

	if (to_submit || !wq_list_empty(&ctx->iopoll_list)) {
		const struct cred *creds = NULL;

		io_sq_start_worktime(ist);

		if (ctx->sq_creds != current_cred())
			creds = override_creds(ctx->sq_creds);

		mutex_lock(&ctx->uring_lock);
		if (!wq_list_empty(&ctx->iopoll_list))
			io_do_iopoll(ctx, true);

		/*
		 * Don't submit if refs are dying, good for io_uring_register(),
		 * but also it is relied upon by io_ring_exit_work()
		 */
		if (to_submit && likely(!percpu_ref_is_dying(&ctx->refs)) &&
		    !(ctx->flags & IORING_SETUP_R_DISABLED))
			ret = io_submit_sqes(ctx, to_submit);
		mutex_unlock(&ctx->uring_lock);

		if (to_submit && wq_has_sleeper(&ctx->sqo_sq_wait))
			wake_up(&ctx->sqo_sq_wait);
		if (creds)
			revert_creds(creds);
	}

	return ret;
}

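/*
 * Handle park/stop requests and pending signals. If a park was requested, drop
 * sqd->lock and wait until all parkers have called io_sq_thread_unpark().
 * Returns true if the thread should exit.
 */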
static bool io_sqd_handle_event(struct io_sq_data *sqd)
{
	bool did_sig = false;
	struct ksignal ksig;

	if (test_bit(IO_SQ_THREAD_SHOULD_PARK, &sqd->state) ||
	    signal_pending(current)) {
		mutex_unlock(&sqd->lock);
		if (signal_pending(current))
			did_sig = get_signal(&ksig);
		wait_event(sqd->wait, !atomic_read(&sqd->park_pending));
		mutex_lock(&sqd->lock);
		sqd->sq_cpu = raw_smp_processor_id();
	}
	return did_sig || test_bit(IO_SQ_THREAD_SHOULD_STOP, &sqd->state);
}

/*
 * Run task_work, processing the retry_list first. The retry_list holds
 * entries that we passed on in the previous run, if we had more task_work
 * than we were asked to process. Newly queued task_work isn't run until the
 * retry list has been fully processed.
 */
static unsigned int io_sq_tw(struct llist_node **retry_list, int max_entries)
{
	struct io_uring_task *tctx = current->io_uring;
	unsigned int count = 0;

	if (*retry_list) {
		*retry_list = io_handle_tw_list(*retry_list, &count, max_entries);
		if (count >= max_entries)
			goto out;
		max_entries -= count;
	}
	*retry_list = tctx_task_work_run(tctx, max_entries, &count);
out:
	if (task_work_pending(current))
		task_work_run();
	return count;
}

static bool io_sq_tw_pending(struct llist_node *retry_list)
{
	struct io_uring_task *tctx = current->io_uring;

	return retry_list || !llist_empty(&tctx->task_list);
}

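/*
 * Main loop of the SQPOLL kernel thread: submit SQEs and reap iopoll
 * completions for every attached ring and run task_work. Once sq_thread_idle
 * passes with no work found, set IORING_SQ_NEED_WAKEUP and sleep until the
 * application wakes us via io_uring_enter().
 */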
static int io_sq_thread(void *data)
{
	struct llist_node *retry_list = NULL;
	struct io_sq_data *sqd = data;
	struct io_ring_ctx *ctx;
	unsigned long timeout = 0;
	char buf[TASK_COMM_LEN] = {};
	DEFINE_WAIT(wait);

	/* offload context creation failed, just exit */
	if (!current->io_uring) {
		mutex_lock(&sqd->lock);
		rcu_assign_pointer(sqd->thread, NULL);
		put_task_struct(current);
		mutex_unlock(&sqd->lock);
		goto err_out;
	}

	snprintf(buf, sizeof(buf), "iou-sqp-%d", sqd->task_pid);
	set_task_comm(current, buf);

	/* reset to our pid after we've set task_comm, for fdinfo */
	sqd->task_pid = current->pid;

	if (sqd->sq_cpu != -1) {
		set_cpus_allowed_ptr(current, cpumask_of(sqd->sq_cpu));
	} else {
		set_cpus_allowed_ptr(current, cpu_online_mask);
		sqd->sq_cpu = raw_smp_processor_id();
	}

	/*
	 * Force audit context to get setup, in case we do prep side async
	 * operations that would trigger an audit call before any issue side
	 * audit has been done.
	 */
	audit_uring_entry(IORING_OP_NOP);
	audit_uring_exit(true, 0);

	mutex_lock(&sqd->lock);
	while (1) {
		bool cap_entries, sqt_spin = false;
		struct io_sq_time ist = { };

		if (io_sqd_events_pending(sqd) || signal_pending(current)) {
			if (io_sqd_handle_event(sqd))
				break;
			timeout = jiffies + sqd->sq_thread_idle;
		}

		cap_entries = !list_is_singular(&sqd->ctx_list);
		list_for_each_entry(ctx, &sqd->ctx_list, sqd_list) {
			int ret = __io_sq_thread(ctx, sqd, cap_entries, &ist);

			if (!sqt_spin && (ret > 0 || !wq_list_empty(&ctx->iopoll_list)))
				sqt_spin = true;
		}
		if (io_sq_tw(&retry_list, IORING_TW_CAP_ENTRIES_VALUE))
			sqt_spin = true;

		list_for_each_entry(ctx, &sqd->ctx_list, sqd_list) {
			if (io_napi(ctx)) {
				io_sq_start_worktime(&ist);
				io_napi_sqpoll_busy_poll(ctx);
			}
		}

		io_sq_update_worktime(sqd, &ist);

		if (sqt_spin || !time_after(jiffies, timeout)) {
			if (sqt_spin)
				timeout = jiffies + sqd->sq_thread_idle;
			if (unlikely(need_resched())) {
				mutex_unlock(&sqd->lock);
				cond_resched();
				mutex_lock(&sqd->lock);
				sqd->sq_cpu = raw_smp_processor_id();
			}
			continue;
		}

		prepare_to_wait(&sqd->wait, &wait, TASK_INTERRUPTIBLE);
		if (!io_sqd_events_pending(sqd) && !io_sq_tw_pending(retry_list)) {
			bool needs_sched = true;

			list_for_each_entry(ctx, &sqd->ctx_list, sqd_list) {
				atomic_or(IORING_SQ_NEED_WAKEUP,
					  &ctx->rings->sq_flags);
				if ((ctx->flags & IORING_SETUP_IOPOLL) &&
				    !wq_list_empty(&ctx->iopoll_list)) {
					needs_sched = false;
					break;
				}

				/*
				 * Ensure the store of the wakeup flag is not
				 * reordered with the load of the SQ tail
				 */
				smp_mb__after_atomic();

				if (io_sqring_entries(ctx)) {
					needs_sched = false;
					break;
				}
			}

			if (needs_sched) {
				mutex_unlock(&sqd->lock);
				schedule();
				mutex_lock(&sqd->lock);
				sqd->sq_cpu = raw_smp_processor_id();
			}
			list_for_each_entry(ctx, &sqd->ctx_list, sqd_list)
				atomic_andnot(IORING_SQ_NEED_WAKEUP,
					      &ctx->rings->sq_flags);
		}

		finish_wait(&sqd->wait, &wait);
		timeout = jiffies + sqd->sq_thread_idle;
	}

	if (retry_list)
		io_sq_tw(&retry_list, UINT_MAX);

	io_uring_cancel_generic(true, sqd);
	rcu_assign_pointer(sqd->thread, NULL);
	put_task_struct(current);
	list_for_each_entry(ctx, &sqd->ctx_list, sqd_list)
		atomic_or(IORING_SQ_NEED_WAKEUP, &ctx->rings->sq_flags);
	io_run_task_work();
	mutex_unlock(&sqd->lock);
err_out:
	complete(&sqd->exited);
	do_exit(0);
}

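/* Wait until the SQ ring has free space again, or a signal is pending. */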
void io_sqpoll_wait_sq(struct io_ring_ctx *ctx)
{
	DEFINE_WAIT(wait);

	do {
		if (!io_sqring_full(ctx))
			break;
		prepare_to_wait(&ctx->sqo_sq_wait, &wait, TASK_INTERRUPTIBLE);

		if (!io_sqring_full(ctx))
			break;
		schedule();
	} while (!signal_pending(current));

	finish_wait(&ctx->sqo_sq_wait, &wait);
}

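/*
 * Set up SQPOLL offload for a ring: attach to an existing sq_data (for
 * IORING_SETUP_ATTACH_WQ) or allocate a new one, apply the requested CPU
 * affinity if IORING_SETUP_SQ_AFF is set, and spawn the io_sq_thread() task.
 */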
__cold int io_sq_offload_create(struct io_ring_ctx *ctx,
				struct io_uring_params *p)
{
	int ret;

	/* Retain compatibility with failing for an invalid attach attempt */
	if ((ctx->flags & (IORING_SETUP_ATTACH_WQ | IORING_SETUP_SQPOLL)) ==
				IORING_SETUP_ATTACH_WQ) {
		CLASS(fd, f)(p->wq_fd);
		if (fd_empty(f))
			return -ENXIO;
		if (!io_is_uring_fops(fd_file(f)))
			return -EINVAL;
	}
	if (ctx->flags & IORING_SETUP_SQPOLL) {
		struct task_struct *tsk;
		struct io_sq_data *sqd;
		bool attached;

		ret = security_uring_sqpoll();
		if (ret)
			return ret;

		sqd = io_get_sq_data(p, &attached);
		if (IS_ERR(sqd)) {
			ret = PTR_ERR(sqd);
			goto err;
		}

		ctx->sq_creds = get_current_cred();
		ctx->sq_data = sqd;
		ctx->sq_thread_idle = msecs_to_jiffies(p->sq_thread_idle);
		if (!ctx->sq_thread_idle)
			ctx->sq_thread_idle = HZ;

		io_sq_thread_park(sqd);
		list_add(&ctx->sqd_list, &sqd->ctx_list);
		io_sqd_update_thread_idle(sqd);
		/* don't attach to a dying SQPOLL thread, would be racy */
		ret = (attached && !sqd->thread) ? -ENXIO : 0;
		io_sq_thread_unpark(sqd);

		if (ret < 0)
			goto err;
		if (attached)
			return 0;

		if (p->flags & IORING_SETUP_SQ_AFF) {
			cpumask_var_t allowed_mask;
			int cpu = p->sq_thread_cpu;

			ret = -EINVAL;
			if (cpu >= nr_cpu_ids || !cpu_online(cpu))
				goto err_sqpoll;
			ret = -ENOMEM;
			if (!alloc_cpumask_var(&allowed_mask, GFP_KERNEL))
				goto err_sqpoll;
			ret = -EINVAL;
			cpuset_cpus_allowed(current, allowed_mask);
			if (!cpumask_test_cpu(cpu, allowed_mask)) {
				free_cpumask_var(allowed_mask);
				goto err_sqpoll;
			}
			free_cpumask_var(allowed_mask);
			sqd->sq_cpu = cpu;
		} else {
			sqd->sq_cpu = -1;
		}

		sqd->task_pid = current->pid;
		sqd->task_tgid = current->tgid;
		tsk = create_io_thread(io_sq_thread, sqd, NUMA_NO_NODE);
		if (IS_ERR(tsk)) {
			ret = PTR_ERR(tsk);
			goto err_sqpoll;
		}

		mutex_lock(&sqd->lock);
		rcu_assign_pointer(sqd->thread, tsk);
		mutex_unlock(&sqd->lock);

		get_task_struct(tsk);
		ret = io_uring_alloc_task_context(tsk, ctx);
		wake_up_new_task(tsk);
		if (ret)
			goto err;
	} else if (p->flags & IORING_SETUP_SQ_AFF) {
		/* Can't have SQ_AFF without SQPOLL */
		ret = -EINVAL;
		goto err;
	}
	return 0;
err_sqpoll:
	complete(&ctx->sq_data->exited);
err:
	io_sq_thread_finish(ctx);
	return ret;
}

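/* Apply a CPU affinity mask to the io-wq of the SQPOLL task, if one exists. */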
__cold int io_sqpoll_wq_cpu_affinity(struct io_ring_ctx *ctx,
				     cpumask_var_t mask)
{
	struct io_sq_data *sqd = ctx->sq_data;
	int ret = -EINVAL;

	if (sqd) {
		struct task_struct *tsk;

		io_sq_thread_park(sqd);
		/* Don't set affinity for a dying thread */
		tsk = sqpoll_task_locked(sqd);
		if (tsk)
			ret = io_wq_cpu_affinity(tsk->io_uring, mask);
		io_sq_thread_unpark(sqd);
	}

	return ret;
}