xref: /linux/io_uring/sqpoll.c (revision 2fe3c78a2c26dd5ee811024a1b7d6cfb4d654319)
// SPDX-License-Identifier: GPL-2.0
/*
 * Contains the core logic for submission side polling of the SQ
 * ring, offloading submissions from the application to a kernel thread.
 */
#include <linux/kernel.h>
#include <linux/errno.h>
#include <linux/file.h>
#include <linux/mm.h>
#include <linux/slab.h>
#include <linux/audit.h>
#include <linux/security.h>
#include <linux/cpuset.h>
#include <linux/io_uring.h>

#include <uapi/linux/io_uring.h>

#include "io_uring.h"
#include "napi.h"
#include "sqpoll.h"

#define IORING_SQPOLL_CAP_ENTRIES_VALUE 8
#define IORING_TW_CAP_ENTRIES_VALUE	8

enum {
	IO_SQ_THREAD_SHOULD_STOP = 0,
	IO_SQ_THREAD_SHOULD_PARK,
};

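/*
 * Release a park request taken via io_sq_thread_park() and drop the sqd
 * lock. IO_SQ_THREAD_SHOULD_PARK is re-set if other parkers are still
 * pending.
 */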
void io_sq_thread_unpark(struct io_sq_data *sqd)
	__releases(&sqd->lock)
{
	WARN_ON_ONCE(sqd->thread == current);

	/*
	 * Do the clear/re-set dance rather than a conditional clear_bit(),
	 * because a conditional clear would race with other threads
	 * incrementing park_pending and setting the bit.
	 */
	clear_bit(IO_SQ_THREAD_SHOULD_PARK, &sqd->state);
	if (atomic_dec_return(&sqd->park_pending))
		set_bit(IO_SQ_THREAD_SHOULD_PARK, &sqd->state);
	mutex_unlock(&sqd->lock);
}

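/*
 * Park the SQPOLL thread: note the request in park_pending, set the
 * IO_SQ_THREAD_SHOULD_PARK bit and take the sqd lock, waking the thread
 * so it notices the request. Must be paired with io_sq_thread_unpark().
 */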
void io_sq_thread_park(struct io_sq_data *sqd)
	__acquires(&sqd->lock)
{
	WARN_ON_ONCE(data_race(sqd->thread) == current);

	atomic_inc(&sqd->park_pending);
	set_bit(IO_SQ_THREAD_SHOULD_PARK, &sqd->state);
	mutex_lock(&sqd->lock);
	if (sqd->thread)
		wake_up_process(sqd->thread);
}

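/*
 * Tell the SQPOLL thread to exit and wait for it to do so. Must not be
 * called from the thread itself, and only once per io_sq_data.
 */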
void io_sq_thread_stop(struct io_sq_data *sqd)
{
	WARN_ON_ONCE(sqd->thread == current);
	WARN_ON_ONCE(test_bit(IO_SQ_THREAD_SHOULD_STOP, &sqd->state));

	set_bit(IO_SQ_THREAD_SHOULD_STOP, &sqd->state);
	mutex_lock(&sqd->lock);
	if (sqd->thread)
		wake_up_process(sqd->thread);
	mutex_unlock(&sqd->lock);
	wait_for_completion(&sqd->exited);
}

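/*
 * Drop a reference to the io_sq_data, stopping the SQPOLL thread and
 * freeing the structure once the last reference is gone.
 */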
void io_put_sq_data(struct io_sq_data *sqd)
{
	if (refcount_dec_and_test(&sqd->refs)) {
		WARN_ON_ONCE(atomic_read(&sqd->park_pending));

		io_sq_thread_stop(sqd);
		kfree(sqd);
	}
}

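/* The idle period is the maximum sq_thread_idle of all attached rings */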
static __cold void io_sqd_update_thread_idle(struct io_sq_data *sqd)
{
	struct io_ring_ctx *ctx;
	unsigned sq_thread_idle = 0;

	list_for_each_entry(ctx, &sqd->ctx_list, sqd_list)
		sq_thread_idle = max(sq_thread_idle, ctx->sq_thread_idle);
	sqd->sq_thread_idle = sq_thread_idle;
}

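/*
 * Detach a ring from its io_sq_data: unlink it from the ctx list while the
 * thread is parked, refresh the idle period and drop the ring's reference.
 */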
void io_sq_thread_finish(struct io_ring_ctx *ctx)
{
	struct io_sq_data *sqd = ctx->sq_data;

	if (sqd) {
		io_sq_thread_park(sqd);
		list_del_init(&ctx->sqd_list);
		io_sqd_update_thread_idle(sqd);
		io_sq_thread_unpark(sqd);

		io_put_sq_data(sqd);
		ctx->sq_data = NULL;
	}
}

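/*
 * For IORING_SETUP_ATTACH_WQ, look up the io_sq_data of the ring referenced
 * by p->wq_fd and take a reference to it. Attaching is only allowed within
 * the thread group that owns the original ring.
 */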
static struct io_sq_data *io_attach_sq_data(struct io_uring_params *p)
{
	struct io_ring_ctx *ctx_attach;
	struct io_sq_data *sqd;
	struct fd f;

	f = fdget(p->wq_fd);
	if (!f.file)
		return ERR_PTR(-ENXIO);
	if (!io_is_uring_fops(f.file)) {
		fdput(f);
		return ERR_PTR(-EINVAL);
	}

	ctx_attach = f.file->private_data;
	sqd = ctx_attach->sq_data;
	if (!sqd) {
		fdput(f);
		return ERR_PTR(-EINVAL);
	}
	if (sqd->task_tgid != current->tgid) {
		fdput(f);
		return ERR_PTR(-EPERM);
	}

	refcount_inc(&sqd->refs);
	fdput(f);
	return sqd;
}

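/*
 * Find the io_sq_data for this ring: attach to an existing one if
 * IORING_SETUP_ATTACH_WQ was requested and is permitted, otherwise
 * allocate and initialize a new instance.
 */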
static struct io_sq_data *io_get_sq_data(struct io_uring_params *p,
					 bool *attached)
{
	struct io_sq_data *sqd;

	*attached = false;
	if (p->flags & IORING_SETUP_ATTACH_WQ) {
		sqd = io_attach_sq_data(p);
		if (!IS_ERR(sqd)) {
			*attached = true;
			return sqd;
		}
		/* fall through for the EPERM case, set up a new sqd/task */
		if (PTR_ERR(sqd) != -EPERM)
			return sqd;
	}

	sqd = kzalloc(sizeof(*sqd), GFP_KERNEL);
	if (!sqd)
		return ERR_PTR(-ENOMEM);

	atomic_set(&sqd->park_pending, 0);
	refcount_set(&sqd->refs, 1);
	INIT_LIST_HEAD(&sqd->ctx_list);
	mutex_init(&sqd->lock);
	init_waitqueue_head(&sqd->wait);
	init_completion(&sqd->exited);
	return sqd;
}

static inline bool io_sqd_events_pending(struct io_sq_data *sqd)
{
	return READ_ONCE(sqd->state);
}

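/*
 * Do one round of submission side work for a single ring: reap IOPOLL
 * completions, submit pending SQEs (capped for fairness when several rings
 * share the thread) and run NAPI busy polling. The return value reflects
 * how much work was done; the caller keeps busy looping while any ring
 * reports progress.
 */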
static int __io_sq_thread(struct io_ring_ctx *ctx, bool cap_entries)
{
	unsigned int to_submit;
	int ret = 0;

	to_submit = io_sqring_entries(ctx);
	/* if we're handling multiple rings, cap submit size for fairness */
	if (cap_entries && to_submit > IORING_SQPOLL_CAP_ENTRIES_VALUE)
		to_submit = IORING_SQPOLL_CAP_ENTRIES_VALUE;

	if (to_submit || !wq_list_empty(&ctx->iopoll_list)) {
		const struct cred *creds = NULL;

		if (ctx->sq_creds != current_cred())
			creds = override_creds(ctx->sq_creds);

		mutex_lock(&ctx->uring_lock);
		if (!wq_list_empty(&ctx->iopoll_list))
			io_do_iopoll(ctx, true);

		/*
		 * Don't submit if refs are dying. This is good for
		 * io_uring_register(), and io_ring_exit_work() relies on it.
		 */
		if (to_submit && likely(!percpu_ref_is_dying(&ctx->refs)) &&
		    !(ctx->flags & IORING_SETUP_R_DISABLED))
			ret = io_submit_sqes(ctx, to_submit);
		mutex_unlock(&ctx->uring_lock);

		if (io_napi(ctx))
			ret += io_napi_sqpoll_busy_poll(ctx);

		if (to_submit && wq_has_sleeper(&ctx->sqo_sq_wait))
			wake_up(&ctx->sqo_sq_wait);
		if (creds)
			revert_creds(creds);
	}

	return ret;
}

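/*
 * Handle park and stop requests as well as pending signals, dropping and
 * reacquiring the sqd lock so that a parked caller can make progress.
 * Returns true if the SQPOLL thread should exit.
 */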
static bool io_sqd_handle_event(struct io_sq_data *sqd)
{
	bool did_sig = false;
	struct ksignal ksig;

	if (test_bit(IO_SQ_THREAD_SHOULD_PARK, &sqd->state) ||
	    signal_pending(current)) {
		mutex_unlock(&sqd->lock);
		if (signal_pending(current))
			did_sig = get_signal(&ksig);
		cond_resched();
		mutex_lock(&sqd->lock);
		sqd->sq_cpu = raw_smp_processor_id();
	}
	return did_sig || test_bit(IO_SQ_THREAD_SHOULD_STOP, &sqd->state);
}

/*
 * Run task_work, processing the retry_list first. The retry_list holds
 * entries that we passed on in the previous run, if we had more task_work
 * than we were asked to process. Newly queued task_work isn't run until the
 * retry list has been fully processed.
 */
static unsigned int io_sq_tw(struct llist_node **retry_list, int max_entries)
{
	struct io_uring_task *tctx = current->io_uring;
	unsigned int count = 0;

	if (*retry_list) {
		*retry_list = io_handle_tw_list(*retry_list, &count, max_entries);
		if (count >= max_entries)
			goto out;
		max_entries -= count;
	}
	*retry_list = tctx_task_work_run(tctx, max_entries, &count);
out:
	if (task_work_pending(current))
		task_work_run();
	return count;
}

static bool io_sq_tw_pending(struct llist_node *retry_list)
{
	struct io_uring_task *tctx = current->io_uring;

	return retry_list || !llist_empty(&tctx->task_list);
}

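/*
 * Add the system CPU time consumed since @start to sqd->work_time,
 * accounted in microseconds.
 */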
static void io_sq_update_worktime(struct io_sq_data *sqd, struct rusage *start)
{
	struct rusage end;

	getrusage(current, RUSAGE_SELF, &end);
	end.ru_stime.tv_sec -= start->ru_stime.tv_sec;
	end.ru_stime.tv_usec -= start->ru_stime.tv_usec;

	sqd->work_time += end.ru_stime.tv_usec + end.ru_stime.tv_sec * 1000000;
}

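/*
 * Main loop of the SQPOLL kernel thread. For every ring attached to this
 * io_sq_data it submits SQEs, reaps IOPOLL completions and runs task_work,
 * busy looping while there is work and going to sleep (after setting
 * IORING_SQ_NEED_WAKEUP) once the idle period has expired.
 */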
static int io_sq_thread(void *data)
{
	struct llist_node *retry_list = NULL;
	struct io_sq_data *sqd = data;
	struct io_ring_ctx *ctx;
	struct rusage start;
	unsigned long timeout = 0;
	char buf[TASK_COMM_LEN];
	DEFINE_WAIT(wait);

	/* offload context creation failed, just exit */
	if (!current->io_uring)
		goto err_out;

	snprintf(buf, sizeof(buf), "iou-sqp-%d", sqd->task_pid);
	set_task_comm(current, buf);

	/* reset to our pid after we've set task_comm, for fdinfo */
	sqd->task_pid = current->pid;

	if (sqd->sq_cpu != -1) {
		set_cpus_allowed_ptr(current, cpumask_of(sqd->sq_cpu));
	} else {
		set_cpus_allowed_ptr(current, cpu_online_mask);
		sqd->sq_cpu = raw_smp_processor_id();
	}

	/*
	 * Force the audit context to get set up, in case we do prep side async
	 * operations that would trigger an audit call before any issue side
	 * audit has been done.
	 */
	audit_uring_entry(IORING_OP_NOP);
	audit_uring_exit(true, 0);

	mutex_lock(&sqd->lock);
	while (1) {
		bool cap_entries, sqt_spin = false;

		if (io_sqd_events_pending(sqd) || signal_pending(current)) {
			if (io_sqd_handle_event(sqd))
				break;
			timeout = jiffies + sqd->sq_thread_idle;
		}

		cap_entries = !list_is_singular(&sqd->ctx_list);
		getrusage(current, RUSAGE_SELF, &start);
		list_for_each_entry(ctx, &sqd->ctx_list, sqd_list) {
			int ret = __io_sq_thread(ctx, cap_entries);

			if (!sqt_spin && (ret > 0 || !wq_list_empty(&ctx->iopoll_list)))
				sqt_spin = true;
		}
		if (io_sq_tw(&retry_list, IORING_TW_CAP_ENTRIES_VALUE))
			sqt_spin = true;

		if (sqt_spin || !time_after(jiffies, timeout)) {
			if (sqt_spin) {
				io_sq_update_worktime(sqd, &start);
				timeout = jiffies + sqd->sq_thread_idle;
			}
			if (unlikely(need_resched())) {
				mutex_unlock(&sqd->lock);
				cond_resched();
				mutex_lock(&sqd->lock);
				sqd->sq_cpu = raw_smp_processor_id();
			}
			continue;
		}

		prepare_to_wait(&sqd->wait, &wait, TASK_INTERRUPTIBLE);
		if (!io_sqd_events_pending(sqd) && !io_sq_tw_pending(retry_list)) {
			bool needs_sched = true;

			list_for_each_entry(ctx, &sqd->ctx_list, sqd_list) {
				atomic_or(IORING_SQ_NEED_WAKEUP,
						&ctx->rings->sq_flags);
				if ((ctx->flags & IORING_SETUP_IOPOLL) &&
				    !wq_list_empty(&ctx->iopoll_list)) {
					needs_sched = false;
					break;
				}

				/*
				 * Ensure the store of the wakeup flag is not
				 * reordered with the load of the SQ tail
				 */
				smp_mb__after_atomic();

				if (io_sqring_entries(ctx)) {
					needs_sched = false;
					break;
				}
			}

			if (needs_sched) {
				mutex_unlock(&sqd->lock);
				schedule();
				mutex_lock(&sqd->lock);
				sqd->sq_cpu = raw_smp_processor_id();
			}
			list_for_each_entry(ctx, &sqd->ctx_list, sqd_list)
				atomic_andnot(IORING_SQ_NEED_WAKEUP,
						&ctx->rings->sq_flags);
		}

		finish_wait(&sqd->wait, &wait);
		timeout = jiffies + sqd->sq_thread_idle;
	}

	if (retry_list)
		io_sq_tw(&retry_list, UINT_MAX);

	io_uring_cancel_generic(true, sqd);
	sqd->thread = NULL;
	list_for_each_entry(ctx, &sqd->ctx_list, sqd_list)
		atomic_or(IORING_SQ_NEED_WAKEUP, &ctx->rings->sq_flags);
	io_run_task_work();
	mutex_unlock(&sqd->lock);
err_out:
	complete(&sqd->exited);
	do_exit(0);
}

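/*
 * Wait until the SQ ring has room for another submission, or until a
 * signal is pending. Called on behalf of IORING_ENTER_SQ_WAIT.
 */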
void io_sqpoll_wait_sq(struct io_ring_ctx *ctx)
{
	DEFINE_WAIT(wait);

	do {
		if (!io_sqring_full(ctx))
			break;
		prepare_to_wait(&ctx->sqo_sq_wait, &wait, TASK_INTERRUPTIBLE);

		if (!io_sqring_full(ctx))
			break;
		schedule();
	} while (!signal_pending(current));

	finish_wait(&ctx->sqo_sq_wait, &wait);
}

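/*
 * Set up SQPOLL offload for a ring being created: find or create the
 * io_sq_data, apply idle and CPU affinity settings and start the iou-sqp
 * kernel thread, unless we attached to one that is already running.
 */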
__cold int io_sq_offload_create(struct io_ring_ctx *ctx,
				struct io_uring_params *p)
{
	int ret;

	/* Retain compatibility with failing for an invalid attach attempt */
	if ((ctx->flags & (IORING_SETUP_ATTACH_WQ | IORING_SETUP_SQPOLL)) ==
				IORING_SETUP_ATTACH_WQ) {
		struct fd f;

		f = fdget(p->wq_fd);
		if (!f.file)
			return -ENXIO;
		if (!io_is_uring_fops(f.file)) {
			fdput(f);
			return -EINVAL;
		}
		fdput(f);
	}
	if (ctx->flags & IORING_SETUP_SQPOLL) {
		struct task_struct *tsk;
		struct io_sq_data *sqd;
		bool attached;

		ret = security_uring_sqpoll();
		if (ret)
			return ret;

		sqd = io_get_sq_data(p, &attached);
		if (IS_ERR(sqd)) {
			ret = PTR_ERR(sqd);
			goto err;
		}

		ctx->sq_creds = get_current_cred();
		ctx->sq_data = sqd;
		ctx->sq_thread_idle = msecs_to_jiffies(p->sq_thread_idle);
		if (!ctx->sq_thread_idle)
			ctx->sq_thread_idle = HZ;

		io_sq_thread_park(sqd);
		list_add(&ctx->sqd_list, &sqd->ctx_list);
		io_sqd_update_thread_idle(sqd);
		/* don't attach to a dying SQPOLL thread, would be racy */
		ret = (attached && !sqd->thread) ? -ENXIO : 0;
		io_sq_thread_unpark(sqd);

		if (ret < 0)
			goto err;
		if (attached)
			return 0;

		if (p->flags & IORING_SETUP_SQ_AFF) {
			struct cpumask allowed_mask;
			int cpu = p->sq_thread_cpu;

			ret = -EINVAL;
			cpuset_cpus_allowed(current, &allowed_mask);
			if (!cpumask_test_cpu(cpu, &allowed_mask))
				goto err_sqpoll;
			sqd->sq_cpu = cpu;
		} else {
			sqd->sq_cpu = -1;
		}

		sqd->task_pid = current->pid;
		sqd->task_tgid = current->tgid;
		tsk = create_io_thread(io_sq_thread, sqd, NUMA_NO_NODE);
		if (IS_ERR(tsk)) {
			ret = PTR_ERR(tsk);
			goto err_sqpoll;
		}

		sqd->thread = tsk;
		ret = io_uring_alloc_task_context(tsk, ctx);
		wake_up_new_task(tsk);
		if (ret)
			goto err;
	} else if (p->flags & IORING_SETUP_SQ_AFF) {
		/* Can't have SQ_AFF without SQPOLL */
		ret = -EINVAL;
		goto err;
	}

	return 0;
err_sqpoll:
	complete(&ctx->sq_data->exited);
err:
	io_sq_thread_finish(ctx);
	return ret;
}

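/*
 * Apply an io-wq CPU affinity mask on behalf of the SQPOLL thread's
 * io_uring context, parking the thread while the update takes place.
 */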
__cold int io_sqpoll_wq_cpu_affinity(struct io_ring_ctx *ctx,
				     cpumask_var_t mask)
{
	struct io_sq_data *sqd = ctx->sq_data;
	int ret = -EINVAL;

	if (sqd) {
		io_sq_thread_park(sqd);
		/* Don't set affinity for a dying thread */
		if (sqd->thread)
			ret = io_wq_cpu_affinity(sqd->thread->io_uring, mask);
		io_sq_thread_unpark(sqd);
	}

	return ret;
}