// SPDX-License-Identifier: GPL-2.0
/*
 * Basic worker thread pool for io_uring
 *
 * Copyright (C) 2019 Jens Axboe
 *
 */
#include <linux/kernel.h>
#include <linux/init.h>
#include <linux/errno.h>
#include <linux/sched/signal.h>
#include <linux/percpu.h>
#include <linux/slab.h>
#include <linux/rculist_nulls.h>
#include <linux/cpu.h>
#include <linux/cpuset.h>
#include <linux/task_work.h>
#include <linux/audit.h>
#include <linux/mmu_context.h>
#include <linux/sched/sysctl.h>
#include <uapi/linux/io_uring.h>

#include "io-wq.h"
#include "slist.h"
#include "io_uring.h"

#define WORKER_IDLE_TIMEOUT	(5 * HZ)
#define WORKER_INIT_LIMIT	3

enum {
	IO_WORKER_F_UP		= 0,	/* up and active */
	IO_WORKER_F_RUNNING	= 1,	/* account as running */
	IO_WORKER_F_FREE	= 2,	/* worker on free list */
};

enum {
	IO_WQ_BIT_EXIT		= 0,	/* wq exiting */
	IO_WQ_BIT_EXIT_ON_IDLE	= 1,	/* allow all workers to exit on idle */
};

enum {
	IO_ACCT_STALLED_BIT	= 0,	/* stalled on hash */
};

/*
 * One for each thread in a wq pool
 */
struct io_worker {
	refcount_t ref;
	unsigned long flags;
	struct hlist_nulls_node nulls_node;
	struct list_head all_list;
	struct task_struct *task;
	struct io_wq *wq;
	struct io_wq_acct *acct;

	struct io_wq_work *cur_work;
	raw_spinlock_t lock;

	struct completion ref_done;

	unsigned long create_state;
	struct callback_head create_work;
	int init_retries;

	union {
		struct rcu_head rcu;
		struct delayed_work work;
	};
};

#if BITS_PER_LONG == 64
#define IO_WQ_HASH_ORDER	6
#else
#define IO_WQ_HASH_ORDER	5
#endif

#define IO_WQ_NR_HASH_BUCKETS	(1u << IO_WQ_HASH_ORDER)

struct io_wq_acct {
	/**
	 * Protects access to the worker lists.
	 */
	raw_spinlock_t workers_lock;

	unsigned nr_workers;
	unsigned max_workers;
	atomic_t nr_running;

	/**
	 * The list of free workers. Protected by #workers_lock
	 * (write) and RCU (read).
	 */
	struct hlist_nulls_head free_list;

	/**
	 * The list of all workers. Protected by #workers_lock
	 * (write) and RCU (read).
	 */
	struct list_head all_list;

	raw_spinlock_t lock;
	struct io_wq_work_list work_list;
	unsigned long flags;
};

enum {
	IO_WQ_ACCT_BOUND,
	IO_WQ_ACCT_UNBOUND,
	IO_WQ_ACCT_NR,
};

/*
 * Per io_wq state
 */
struct io_wq {
	unsigned long state;

	struct io_wq_hash *hash;

	atomic_t worker_refs;
	struct completion worker_done;

	struct hlist_node cpuhp_node;

	struct task_struct *task;

	struct io_wq_acct acct[IO_WQ_ACCT_NR];

	struct wait_queue_entry wait;

	struct io_wq_work *hash_tail[IO_WQ_NR_HASH_BUCKETS];

	cpumask_var_t cpu_mask;
};

static enum cpuhp_state io_wq_online;

struct io_cb_cancel_data {
	work_cancel_fn *fn;
	void *data;
	int nr_running;
	int nr_pending;
	bool cancel_all;
};

static bool create_io_worker(struct io_wq *wq, struct io_wq_acct *acct);
static void io_wq_dec_running(struct io_worker *worker);
static bool io_acct_cancel_pending_work(struct io_wq *wq,
					struct io_wq_acct *acct,
					struct io_cb_cancel_data *match);
static void create_worker_cb(struct callback_head *cb);
static void io_wq_cancel_tw_create(struct io_wq *wq);

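/*
 * A work item's hash, if any, lives in the upper bits of its flags word,
 * above IO_WQ_HASH_SHIFT. These helpers extract it from a raw flags
 * snapshot or from the work item itself.
 */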
static inline unsigned int __io_get_work_hash(unsigned int work_flags)
{
	return work_flags >> IO_WQ_HASH_SHIFT;
}

static inline unsigned int io_get_work_hash(struct io_wq_work *work)
{
	return __io_get_work_hash(atomic_read(&work->flags));
}

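/*
 * Workers are reference counted; the final io_worker_release() signals
 * ref_done, which io_worker_exit() waits on before freeing the worker.
 */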
static bool io_worker_get(struct io_worker *worker)
{
	return refcount_inc_not_zero(&worker->ref);
}

static void io_worker_release(struct io_worker *worker)
{
	if (refcount_dec_and_test(&worker->ref))
		complete(&worker->ref_done);
}

static inline struct io_wq_acct *io_get_acct(struct io_wq *wq, bool bound)
{
	return &wq->acct[bound ? IO_WQ_ACCT_BOUND : IO_WQ_ACCT_UNBOUND];
}

static inline struct io_wq_acct *io_work_get_acct(struct io_wq *wq,
						  unsigned int work_flags)
{
	return io_get_acct(wq, !(work_flags & IO_WQ_WORK_UNBOUND));
}

static inline struct io_wq_acct *io_wq_get_acct(struct io_worker *worker)
{
	return worker->acct;
}

static void io_worker_ref_put(struct io_wq *wq)
{
	if (atomic_dec_and_test(&wq->worker_refs))
		complete(&wq->worker_done);
}

bool io_wq_worker_stopped(void)
{
	struct io_worker *worker = current->worker_private;

	if (WARN_ON_ONCE(!io_wq_current_is_worker()))
		return true;

	return test_bit(IO_WQ_BIT_EXIT, &worker->wq->state);
}

static void io_worker_cancel_cb(struct io_worker *worker)
{
	struct io_wq_acct *acct = io_wq_get_acct(worker);
	struct io_wq *wq = worker->wq;

	atomic_dec(&acct->nr_running);
	raw_spin_lock(&acct->workers_lock);
	acct->nr_workers--;
	raw_spin_unlock(&acct->workers_lock);
	io_worker_ref_put(wq);
	clear_bit_unlock(0, &worker->create_state);
	io_worker_release(worker);
}

static bool io_task_worker_match(struct callback_head *cb, void *data)
{
	struct io_worker *worker;

	if (cb->func != create_worker_cb)
		return false;
	worker = container_of(cb, struct io_worker, create_work);
	return worker == data;
}

static void io_worker_exit(struct io_worker *worker)
{
	struct io_wq *wq = worker->wq;
	struct io_wq_acct *acct = io_wq_get_acct(worker);

	while (1) {
		struct callback_head *cb = task_work_cancel_match(wq->task,
						io_task_worker_match, worker);

		if (!cb)
			break;
		io_worker_cancel_cb(worker);
	}

	io_worker_release(worker);
	wait_for_completion(&worker->ref_done);

	raw_spin_lock(&acct->workers_lock);
	if (test_bit(IO_WORKER_F_FREE, &worker->flags))
		hlist_nulls_del_rcu(&worker->nulls_node);
	list_del_rcu(&worker->all_list);
	raw_spin_unlock(&acct->workers_lock);
	io_wq_dec_running(worker);
	/*
	 * this worker is a goner, clear ->worker_private to avoid any
	 * inc/dec running calls that could happen as part of exit from
	 * touching 'worker'.
	 */
	current->worker_private = NULL;

	kfree_rcu(worker, rcu);
	io_worker_ref_put(wq);
	do_exit(0);
}

static inline bool __io_acct_run_queue(struct io_wq_acct *acct)
{
	return !test_bit(IO_ACCT_STALLED_BIT, &acct->flags) &&
		!wq_list_empty(&acct->work_list);
}

/*
 * If there's work to do, returns true with acct->lock acquired. If not,
 * returns false with no lock held.
 */
static inline bool io_acct_run_queue(struct io_wq_acct *acct)
	__acquires(&acct->lock)
{
	raw_spin_lock(&acct->lock);
	if (__io_acct_run_queue(acct))
		return true;

	raw_spin_unlock(&acct->lock);
	return false;
}

/*
 * Check head of free list for an available worker. If one isn't available,
 * caller must create one.
 */
static bool io_acct_activate_free_worker(struct io_wq_acct *acct)
	__must_hold(RCU)
{
	struct hlist_nulls_node *n;
	struct io_worker *worker;

	/*
	 * Iterate free_list and see if we can find an idle worker to
	 * activate. If a given worker is on the free_list but in the process
	 * of exiting, keep trying.
	 */
	hlist_nulls_for_each_entry_rcu(worker, n, &acct->free_list, nulls_node) {
		if (!io_worker_get(worker))
			continue;
		/*
		 * If the worker is already running, it's either already
		 * starting work or finishing work. In either case, if it
		 * does go to sleep, we'll kick off a new task for this
		 * work anyway.
		 */
		wake_up_process(worker->task);
		io_worker_release(worker);
		return true;
	}

	return false;
}

/*
 * We need a worker. If we find a free one, we're good. If not, and we're
 * below the max number of workers, create one.
 */
static bool io_wq_create_worker(struct io_wq *wq, struct io_wq_acct *acct)
{
	/*
	 * Most likely an attempt to queue unbounded work on an io_wq that
	 * wasn't setup with any unbounded workers.
	 */
	if (unlikely(!acct->max_workers))
		pr_warn_once("io-wq is not configured for unbound workers");

	raw_spin_lock(&acct->workers_lock);
	if (acct->nr_workers >= acct->max_workers) {
		raw_spin_unlock(&acct->workers_lock);
		return true;
	}
	acct->nr_workers++;
	raw_spin_unlock(&acct->workers_lock);
	atomic_inc(&acct->nr_running);
	atomic_inc(&wq->worker_refs);
	return create_io_worker(wq, acct);
}

static void io_wq_inc_running(struct io_worker *worker)
{
	struct io_wq_acct *acct = io_wq_get_acct(worker);

	atomic_inc(&acct->nr_running);
}

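/*
 * task_work callback, run in the context of the task that owns the wq.
 * Creates a new worker for @acct, unless a free one could be activated or
 * the worker limit has been reached in the meantime.
 */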
static void create_worker_cb(struct callback_head *cb)
{
	struct io_worker *worker;
	struct io_wq *wq;
	struct io_wq_acct *acct;
	bool activated_free_worker, do_create = false;

	worker = container_of(cb, struct io_worker, create_work);
	wq = worker->wq;
	acct = worker->acct;

	rcu_read_lock();
	activated_free_worker = io_acct_activate_free_worker(acct);
	rcu_read_unlock();
	if (activated_free_worker)
		goto no_need_create;

	raw_spin_lock(&acct->workers_lock);

	if (acct->nr_workers < acct->max_workers) {
		acct->nr_workers++;
		do_create = true;
	}
	raw_spin_unlock(&acct->workers_lock);
	if (do_create) {
		create_io_worker(wq, acct);
	} else {
no_need_create:
		atomic_dec(&acct->nr_running);
		io_worker_ref_put(wq);
	}
	clear_bit_unlock(0, &worker->create_state);
	io_worker_release(worker);
}

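/*
 * Queue @func as task_work on the wq owner task, so the new worker thread
 * gets forked from the right context. Bit 0 of create_state serializes
 * this, ensuring each worker has at most one creation request in flight.
 */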
static bool io_queue_worker_create(struct io_worker *worker,
				   struct io_wq_acct *acct,
				   task_work_func_t func)
{
	struct io_wq *wq = worker->wq;

	/* raced with exit, just ignore create call */
	if (test_bit(IO_WQ_BIT_EXIT, &wq->state))
		goto fail;
	if (!io_worker_get(worker))
		goto fail;
	/*
	 * create_state manages ownership of create_work/index. We should
	 * only need one entry per worker, as the worker going to sleep
	 * will trigger the condition, and waking will clear it once it
	 * runs the task_work.
	 */
	if (test_bit(0, &worker->create_state) ||
	    test_and_set_bit_lock(0, &worker->create_state))
		goto fail_release;

	atomic_inc(&wq->worker_refs);
	init_task_work(&worker->create_work, func);
	if (!task_work_add(wq->task, &worker->create_work, TWA_SIGNAL)) {
		/*
		 * EXIT may have been set after checking it above, check after
		 * adding the task_work and remove any creation item if it is
		 * now set. wq exit does that too, but we can have added this
		 * work item after we canceled in io_wq_exit_workers().
		 */
		if (test_bit(IO_WQ_BIT_EXIT, &wq->state))
			io_wq_cancel_tw_create(wq);
		io_worker_ref_put(wq);
		return true;
	}
	io_worker_ref_put(wq);
	clear_bit_unlock(0, &worker->create_state);
fail_release:
	io_worker_release(worker);
fail:
	atomic_dec(&acct->nr_running);
	io_worker_ref_put(wq);
	return false;
}

/* Defer if current and next work are both hashed to the same chain */
static bool io_wq_hash_defer(struct io_wq_work *work, struct io_wq_acct *acct)
{
	unsigned int hash, work_flags;
	struct io_wq_work *next;

	lockdep_assert_held(&acct->lock);

	work_flags = atomic_read(&work->flags);
	if (!__io_wq_is_hashed(work_flags))
		return false;

	/* should not happen, io_acct_run_queue() said we had work */
	if (wq_list_empty(&acct->work_list))
		return true;

	hash = __io_get_work_hash(work_flags);
	next = container_of(acct->work_list.first, struct io_wq_work, list);
	work_flags = atomic_read(&next->flags);
	if (!__io_wq_is_hashed(work_flags))
		return false;
	return hash == __io_get_work_hash(work_flags);
}

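/*
 * A worker transitioned from runnable to sleeping. If it was the last
 * running worker for this acct and runnable work is still queued (that
 * isn't just more of the same hash chain), queue creation of a
 * replacement worker so the pending work doesn't sit idle.
 */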
static void io_wq_dec_running(struct io_worker *worker)
{
	struct io_wq_acct *acct = io_wq_get_acct(worker);
	struct io_wq *wq = worker->wq;

	if (!test_bit(IO_WORKER_F_UP, &worker->flags))
		return;

	if (!atomic_dec_and_test(&acct->nr_running))
		return;
	if (!worker->cur_work)
		return;
	if (!io_acct_run_queue(acct))
		return;
	if (io_wq_hash_defer(worker->cur_work, acct)) {
		raw_spin_unlock(&acct->lock);
		return;
	}

	raw_spin_unlock(&acct->lock);
	atomic_inc(&acct->nr_running);
	atomic_inc(&wq->worker_refs);
	io_queue_worker_create(worker, acct, create_worker_cb);
}

/*
 * Worker will start processing some work. Move it to the busy list, if
 * it's currently on the freelist
 */
static void __io_worker_busy(struct io_wq_acct *acct, struct io_worker *worker)
{
	if (test_bit(IO_WORKER_F_FREE, &worker->flags)) {
		clear_bit(IO_WORKER_F_FREE, &worker->flags);
		raw_spin_lock(&acct->workers_lock);
		hlist_nulls_del_init_rcu(&worker->nulls_node);
		raw_spin_unlock(&acct->workers_lock);
	}
}

/*
 * No work, worker going to sleep. Move to freelist.
 */
static void __io_worker_idle(struct io_wq_acct *acct, struct io_worker *worker)
	__must_hold(acct->workers_lock)
{
	if (!test_bit(IO_WORKER_F_FREE, &worker->flags)) {
		set_bit(IO_WORKER_F_FREE, &worker->flags);
		hlist_nulls_add_head_rcu(&worker->nulls_node, &acct->free_list);
	}
}

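/*
 * Stalled on a hashed work item: register on the hash waitqueue so we get
 * woken when the hash bit clears. Returns true if the bit was already
 * clear again by the time we registered, in which case the caller can
 * retry immediately instead of sleeping.
 */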
static bool io_wait_on_hash(struct io_wq *wq, unsigned int hash)
{
	bool ret = false;

	spin_lock_irq(&wq->hash->wait.lock);
	if (list_empty(&wq->wait.entry)) {
		__add_wait_queue(&wq->hash->wait, &wq->wait);
		if (!test_bit(hash, &wq->hash->map)) {
			__set_current_state(TASK_RUNNING);
			list_del_init(&wq->wait.entry);
			ret = true;
		}
	}
	spin_unlock_irq(&wq->hash->wait.lock);
	return ret;
}

static struct io_wq_work *io_get_next_work(struct io_wq_acct *acct,
					   struct io_wq *wq)
	__must_hold(acct->lock)
{
	struct io_wq_work_node *node, *prev;
	struct io_wq_work *work, *tail;
	unsigned int stall_hash = -1U;

	wq_list_for_each(node, prev, &acct->work_list) {
		unsigned int work_flags;
		unsigned int hash;

		work = container_of(node, struct io_wq_work, list);

		/* not hashed, can run anytime */
		work_flags = atomic_read(&work->flags);
		if (!__io_wq_is_hashed(work_flags)) {
			wq_list_del(&acct->work_list, node, prev);
			return work;
		}

		hash = __io_get_work_hash(work_flags);
		/* all items with this hash lie in [work, tail] */
		tail = wq->hash_tail[hash];

		/* hashed, can run if not already running */
		if (!test_and_set_bit(hash, &wq->hash->map)) {
			wq->hash_tail[hash] = NULL;
			wq_list_cut(&acct->work_list, &tail->list, prev);
			return work;
		}
		if (stall_hash == -1U)
			stall_hash = hash;
		/* fast forward to a next hash, for-each will fix up @prev */
		node = &tail->list;
	}

	if (stall_hash != -1U) {
		bool unstalled;

		/*
		 * Set this before dropping the lock to avoid racing with new
		 * work being added and clearing the stalled bit.
		 */
		set_bit(IO_ACCT_STALLED_BIT, &acct->flags);
		raw_spin_unlock(&acct->lock);
		unstalled = io_wait_on_hash(wq, stall_hash);
		raw_spin_lock(&acct->lock);
		if (unstalled) {
			clear_bit(IO_ACCT_STALLED_BIT, &acct->flags);
			if (wq_has_sleeper(&wq->hash->wait))
				wake_up(&wq->hash->wait);
		}
	}

	return NULL;
}

static void io_assign_current_work(struct io_worker *worker,
				   struct io_wq_work *work)
{
	if (work) {
		io_run_task_work();
		cond_resched();
	}

	raw_spin_lock(&worker->lock);
	worker->cur_work = work;
	raw_spin_unlock(&worker->lock);
}

/*
 * Called with acct->lock held, drops it before returning
 */
static void io_worker_handle_work(struct io_wq_acct *acct,
				  struct io_worker *worker)
	__releases(&acct->lock)
{
	struct io_wq *wq = worker->wq;

	do {
		bool do_kill = test_bit(IO_WQ_BIT_EXIT, &wq->state);
		struct io_wq_work *work;

		/*
		 * If we got some work, mark us as busy. If we didn't, but
		 * the list isn't empty, it means we stalled on hashed work.
		 * Mark us stalled so we don't keep looking for work when we
		 * can't make progress, any work completion or insertion will
		 * clear the stalled flag.
		 */
		work = io_get_next_work(acct, wq);
		if (work) {
			/*
			 * Make sure cancelation can find this, even before
			 * it becomes the active work. That avoids a window
			 * where the work has been removed from our general
			 * work list, but isn't yet discoverable as the
			 * current work item for this worker.
			 */
			raw_spin_lock(&worker->lock);
			worker->cur_work = work;
			raw_spin_unlock(&worker->lock);
		}

		raw_spin_unlock(&acct->lock);

		if (!work)
			break;

		__io_worker_busy(acct, worker);

		io_assign_current_work(worker, work);
		__set_current_state(TASK_RUNNING);

		/* handle a whole dependent link */
		do {
			struct io_wq_work *next_hashed, *linked;
			unsigned int work_flags = atomic_read(&work->flags);
			unsigned int hash = __io_wq_is_hashed(work_flags)
				? __io_get_work_hash(work_flags)
				: -1U;

			next_hashed = wq_next_work(work);

			if (do_kill &&
			    (work_flags & IO_WQ_WORK_UNBOUND))
				atomic_or(IO_WQ_WORK_CANCEL, &work->flags);
			io_wq_submit_work(work);
			io_assign_current_work(worker, NULL);

			linked = io_wq_free_work(work);
			work = next_hashed;
			if (!work && linked && !io_wq_is_hashed(linked)) {
				work = linked;
				linked = NULL;
			}
			io_assign_current_work(worker, work);
			if (linked)
				io_wq_enqueue(wq, linked);

			if (hash != -1U && !next_hashed) {
				/* serialize hash clear with wake_up() */
				spin_lock_irq(&wq->hash->wait.lock);
				clear_bit(hash, &wq->hash->map);
				clear_bit(IO_ACCT_STALLED_BIT, &acct->flags);
				spin_unlock_irq(&wq->hash->wait.lock);
				if (wq_has_sleeper(&wq->hash->wait))
					wake_up(&wq->hash->wait);
			}
		} while (work);

		if (!__io_acct_run_queue(acct))
			break;
		raw_spin_lock(&acct->lock);
	} while (1);
}

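/*
 * Main loop for an io-wq worker thread: pull and run work until the
 * queue is drained, then idle on the free list until more work arrives
 * or an idle timeout / exit condition ends the worker.
 */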
static int io_wq_worker(void *data)
{
	struct io_worker *worker = data;
	struct io_wq_acct *acct = io_wq_get_acct(worker);
	struct io_wq *wq = worker->wq;
	bool exit_mask = false, last_timeout = false;
	char buf[TASK_COMM_LEN] = {};

	set_mask_bits(&worker->flags, 0,
		      BIT(IO_WORKER_F_UP) | BIT(IO_WORKER_F_RUNNING));

	snprintf(buf, sizeof(buf), "iou-wrk-%d", wq->task->pid);
	set_task_comm(current, buf);

	while (!test_bit(IO_WQ_BIT_EXIT, &wq->state)) {
		long ret;

		set_current_state(TASK_INTERRUPTIBLE);

		/*
		 * If we have work to do, io_acct_run_queue() returns with
		 * the acct->lock held. If not, it will drop it.
		 */
		while (io_acct_run_queue(acct))
			io_worker_handle_work(acct, worker);

		raw_spin_lock(&acct->workers_lock);
		/*
		 * Last sleep timed out. Exit if we're not the last worker,
		 * or if someone modified our affinity. If wq is marked
		 * idle-exit, drop the worker as well. This is used to avoid
		 * keeping io-wq workers around for tasks that no longer have
		 * any active io_uring instances.
		 */
		if ((last_timeout && (exit_mask || acct->nr_workers > 1)) ||
		    test_bit(IO_WQ_BIT_EXIT_ON_IDLE, &wq->state)) {
			acct->nr_workers--;
			raw_spin_unlock(&acct->workers_lock);
			__set_current_state(TASK_RUNNING);
			break;
		}
		last_timeout = false;
		__io_worker_idle(acct, worker);
		raw_spin_unlock(&acct->workers_lock);
		if (io_run_task_work())
			continue;
		ret = schedule_timeout(WORKER_IDLE_TIMEOUT);
		if (signal_pending(current)) {
			struct ksignal ksig;

			if (!get_signal(&ksig))
				continue;
			break;
		}
		if (!ret) {
			last_timeout = true;
			exit_mask = !cpumask_test_cpu(raw_smp_processor_id(),
							wq->cpu_mask);
		}
	}

	if (test_bit(IO_WQ_BIT_EXIT, &wq->state) && io_acct_run_queue(acct))
		io_worker_handle_work(acct, worker);

	io_worker_exit(worker);
	return 0;
}

/*
 * Called when a worker is scheduled in. Mark us as currently running.
 */
void io_wq_worker_running(struct task_struct *tsk)
{
	struct io_worker *worker = tsk->worker_private;

	if (!worker)
		return;
	if (!test_bit(IO_WORKER_F_UP, &worker->flags))
		return;
	if (test_bit(IO_WORKER_F_RUNNING, &worker->flags))
		return;
	set_bit(IO_WORKER_F_RUNNING, &worker->flags);
	io_wq_inc_running(worker);
}

/*
 * Called when worker is going to sleep. If there are no workers currently
 * running and we have work pending, wake up a free one or create a new one.
 */
void io_wq_worker_sleeping(struct task_struct *tsk)
{
	struct io_worker *worker = tsk->worker_private;

	if (!worker)
		return;
	if (!test_bit(IO_WORKER_F_UP, &worker->flags))
		return;
	if (!test_bit(IO_WORKER_F_RUNNING, &worker->flags))
		return;

	clear_bit(IO_WORKER_F_RUNNING, &worker->flags);
	io_wq_dec_running(worker);
}

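/*
 * Finish setting up a freshly forked worker task: pin it to the wq's
 * allowed CPUs, publish it on the free and all-worker lists, and let it
 * run.
 */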
static void io_init_new_worker(struct io_wq *wq, struct io_wq_acct *acct,
			       struct io_worker *worker, struct task_struct *tsk)
{
	tsk->worker_private = worker;
	worker->task = tsk;
	set_cpus_allowed_ptr(tsk, wq->cpu_mask);

	raw_spin_lock(&acct->workers_lock);
	hlist_nulls_add_head_rcu(&worker->nulls_node, &acct->free_list);
	list_add_tail_rcu(&worker->all_list, &acct->all_list);
	set_bit(IO_WORKER_F_FREE, &worker->flags);
	raw_spin_unlock(&acct->workers_lock);
	wake_up_new_task(tsk);
}

static bool io_wq_work_match_all(struct io_wq_work *work, void *data)
{
	return true;
}

static inline bool io_should_retry_thread(struct io_worker *worker, long err)
{
	/*
	 * Prevent perpetual task_work retry, if the task (or its group) is
	 * exiting.
	 */
	if (fatal_signal_pending(current))
		return false;

	worker->init_retries++;
	switch (err) {
	case -EAGAIN:
		return worker->init_retries <= WORKER_INIT_LIMIT;
	/* Analogous to a fork() syscall, always retry on a restartable error */
	case -ERESTARTSYS:
	case -ERESTARTNOINTR:
	case -ERESTARTNOHAND:
		return true;
	default:
		return false;
	}
}

static void queue_create_worker_retry(struct io_worker *worker)
{
	/*
	 * We only bother retrying because there's a chance that the
	 * failure to create a worker is due to some temporary condition
	 * in the forking task (e.g. outstanding signal); give the task
	 * some time to clear that condition.
	 */
	schedule_delayed_work(&worker->work,
			      msecs_to_jiffies(worker->init_retries * 5));
}

static void create_worker_cont(struct callback_head *cb)
{
	struct io_worker *worker;
	struct task_struct *tsk;
	struct io_wq *wq;
	struct io_wq_acct *acct;

	worker = container_of(cb, struct io_worker, create_work);
	clear_bit_unlock(0, &worker->create_state);
	wq = worker->wq;
	acct = io_wq_get_acct(worker);
	tsk = create_io_thread(io_wq_worker, worker, NUMA_NO_NODE);
	if (!IS_ERR(tsk)) {
		io_init_new_worker(wq, acct, worker, tsk);
		io_worker_release(worker);
		return;
	} else if (!io_should_retry_thread(worker, PTR_ERR(tsk))) {
		atomic_dec(&acct->nr_running);
		raw_spin_lock(&acct->workers_lock);
		acct->nr_workers--;
		if (!acct->nr_workers) {
			struct io_cb_cancel_data match = {
				.fn		= io_wq_work_match_all,
				.cancel_all	= true,
			};

			raw_spin_unlock(&acct->workers_lock);
			while (io_acct_cancel_pending_work(wq, acct, &match))
				;
		} else {
			raw_spin_unlock(&acct->workers_lock);
		}
		io_worker_ref_put(wq);
		kfree(worker);
		return;
	}

	/* re-create attempts grab a new worker ref, drop the existing one */
	io_worker_release(worker);
	queue_create_worker_retry(worker);
}

static void io_workqueue_create(struct work_struct *work)
{
	struct io_worker *worker = container_of(work, struct io_worker,
						work.work);
	struct io_wq_acct *acct = io_wq_get_acct(worker);

	if (!io_queue_worker_create(worker, acct, create_worker_cont))
		kfree(worker);
}

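/*
 * Allocate a new worker and fork a task for it. On fork failure the
 * attempt is retried for transient errors, otherwise the accounting done
 * by io_wq_create_worker() is unwound.
 */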
static bool create_io_worker(struct io_wq *wq, struct io_wq_acct *acct)
{
	struct io_worker *worker;
	struct task_struct *tsk;

	__set_current_state(TASK_RUNNING);

	worker = kzalloc_obj(*worker);
	if (!worker) {
fail:
		atomic_dec(&acct->nr_running);
		raw_spin_lock(&acct->workers_lock);
		acct->nr_workers--;
		raw_spin_unlock(&acct->workers_lock);
		io_worker_ref_put(wq);
		return false;
	}

	refcount_set(&worker->ref, 1);
	worker->wq = wq;
	worker->acct = acct;
	raw_spin_lock_init(&worker->lock);
	init_completion(&worker->ref_done);

	tsk = create_io_thread(io_wq_worker, worker, NUMA_NO_NODE);
	if (!IS_ERR(tsk)) {
		io_init_new_worker(wq, acct, worker, tsk);
	} else if (!io_should_retry_thread(worker, PTR_ERR(tsk))) {
		kfree(worker);
		goto fail;
	} else {
		INIT_DELAYED_WORK(&worker->work, io_workqueue_create);
		queue_create_worker_retry(worker);
	}

	return true;
}

/*
 * Iterate the passed in list and call the specific function for each
 * worker that isn't exiting
 */
static bool io_acct_for_each_worker(struct io_wq_acct *acct,
				    bool (*func)(struct io_worker *, void *),
				    void *data)
{
	struct io_worker *worker;
	bool ret = false;

	list_for_each_entry_rcu(worker, &acct->all_list, all_list) {
		if (io_worker_get(worker)) {
			/* no task if node is/was offline */
			if (worker->task)
				ret = func(worker, data);
			io_worker_release(worker);
			if (ret)
				break;
		}
	}

	return ret;
}

static void io_wq_for_each_worker(struct io_wq *wq,
				  bool (*func)(struct io_worker *, void *),
				  void *data)
{
	for (int i = 0; i < IO_WQ_ACCT_NR; i++)
		if (io_acct_for_each_worker(&wq->acct[i], func, data))
			break;
}

static bool io_wq_worker_wake(struct io_worker *worker, void *data)
{
	__set_notify_signal(worker->task);
	wake_up_process(worker->task);
	return false;
}

void io_wq_set_exit_on_idle(struct io_wq *wq, bool enable)
{
	if (!wq->task)
		return;

	if (!enable) {
		clear_bit(IO_WQ_BIT_EXIT_ON_IDLE, &wq->state);
		return;
	}

	if (test_and_set_bit(IO_WQ_BIT_EXIT_ON_IDLE, &wq->state))
		return;

	rcu_read_lock();
	io_wq_for_each_worker(wq, io_wq_worker_wake, NULL);
	rcu_read_unlock();
}

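/*
 * Cancel a work item, and any dependent work linked behind it, by running
 * it with IO_WQ_WORK_CANCEL set so the submission path completes it with
 * an error instead of executing it.
 */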
static void io_run_cancel(struct io_wq_work *work, struct io_wq *wq)
{
	do {
		atomic_or(IO_WQ_WORK_CANCEL, &work->flags);
		io_wq_submit_work(work);
		work = io_wq_free_work(work);
	} while (work);
}

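/*
 * Add a work item to the acct's pending list. Hashed work is chained
 * behind the current tail for its hash bucket, so items with the same
 * hash stay contiguous and can be dequeued (and serialized) as a group.
 */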
static void io_wq_insert_work(struct io_wq *wq, struct io_wq_acct *acct,
			      struct io_wq_work *work, unsigned int work_flags)
{
	unsigned int hash;
	struct io_wq_work *tail;

	if (!__io_wq_is_hashed(work_flags)) {
append:
		wq_list_add_tail(&work->list, &acct->work_list);
		return;
	}

	hash = __io_get_work_hash(work_flags);
	tail = wq->hash_tail[hash];
	wq->hash_tail[hash] = work;
	if (!tail)
		goto append;

	wq_list_add_after(&work->list, &tail->list, &acct->work_list);
}

static bool io_wq_work_match_item(struct io_wq_work *work, void *data)
{
	return work == data;
}

void io_wq_enqueue(struct io_wq *wq, struct io_wq_work *work)
{
	unsigned int work_flags = atomic_read(&work->flags);
	struct io_wq_acct *acct = io_work_get_acct(wq, work_flags);
	struct io_cb_cancel_data match = {
		.fn		= io_wq_work_match_item,
		.data		= work,
		.cancel_all	= false,
	};
	bool do_create;

	/*
	 * If io-wq is exiting for this task, or if the request has explicitly
	 * been marked as one that should not get executed, cancel it here.
	 */
	if (test_bit(IO_WQ_BIT_EXIT, &wq->state) ||
	    (work_flags & IO_WQ_WORK_CANCEL)) {
		io_run_cancel(work, wq);
		return;
	}

	raw_spin_lock(&acct->lock);
	io_wq_insert_work(wq, acct, work, work_flags);
	clear_bit(IO_ACCT_STALLED_BIT, &acct->flags);
	raw_spin_unlock(&acct->lock);

	rcu_read_lock();
	do_create = !io_acct_activate_free_worker(acct);
	rcu_read_unlock();

	if (do_create && ((work_flags & IO_WQ_WORK_CONCURRENT) ||
	    !atomic_read(&acct->nr_running))) {
		bool did_create;

		did_create = io_wq_create_worker(wq, acct);
		if (likely(did_create))
			return;

		raw_spin_lock(&acct->workers_lock);
		if (acct->nr_workers) {
			raw_spin_unlock(&acct->workers_lock);
			return;
		}
		raw_spin_unlock(&acct->workers_lock);

		/* fatal condition, failed to create the first worker */
		io_acct_cancel_pending_work(wq, acct, &match);
	}
}

/*
 * Work items that hash to the same value will not be done in parallel.
 * Used to limit concurrent writes, generally hashed by inode.
 */
void io_wq_hash_work(struct io_wq_work *work, void *val)
{
	unsigned int bit;

	bit = hash_ptr(val, IO_WQ_HASH_ORDER);
	atomic_or(IO_WQ_WORK_HASHED | (bit << IO_WQ_HASH_SHIFT), &work->flags);
}
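
/*
 * As a sketch of typical usage (hypothetical caller, not part of this
 * file): a submission path that must serialize buffered writes to the
 * same file could do
 *
 *	io_wq_hash_work(&req->work, file_inode(req->file));
 *	io_wq_enqueue(wq, &req->work);
 *
 * so that two writes hashing to the same inode never run concurrently.
 */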

static bool __io_wq_worker_cancel(struct io_worker *worker,
				  struct io_cb_cancel_data *match,
				  struct io_wq_work *work)
{
	if (work && match->fn(work, match->data)) {
		atomic_or(IO_WQ_WORK_CANCEL, &work->flags);
		__set_notify_signal(worker->task);
		return true;
	}

	return false;
}

static bool io_wq_worker_cancel(struct io_worker *worker, void *data)
{
	struct io_cb_cancel_data *match = data;

	/*
	 * Hold the lock to avoid ->cur_work going out of scope, caller
	 * may dereference the passed in work.
	 */
	raw_spin_lock(&worker->lock);
	if (__io_wq_worker_cancel(worker, match, worker->cur_work))
		match->nr_running++;
	raw_spin_unlock(&worker->lock);

	return match->nr_running && !match->cancel_all;
}

static inline void io_wq_remove_pending(struct io_wq *wq,
					struct io_wq_acct *acct,
					struct io_wq_work *work,
					struct io_wq_work_node *prev)
{
	unsigned int hash = io_get_work_hash(work);
	struct io_wq_work *prev_work = NULL;

	if (io_wq_is_hashed(work) && work == wq->hash_tail[hash]) {
		if (prev)
			prev_work = container_of(prev, struct io_wq_work, list);
		if (prev_work && io_get_work_hash(prev_work) == hash)
			wq->hash_tail[hash] = prev_work;
		else
			wq->hash_tail[hash] = NULL;
	}
	wq_list_del(&acct->work_list, &work->list, prev);
}

static bool io_acct_cancel_pending_work(struct io_wq *wq,
					struct io_wq_acct *acct,
					struct io_cb_cancel_data *match)
{
	struct io_wq_work_node *node, *prev;
	struct io_wq_work *work;

	raw_spin_lock(&acct->lock);
	wq_list_for_each(node, prev, &acct->work_list) {
		work = container_of(node, struct io_wq_work, list);
		if (!match->fn(work, match->data))
			continue;
		io_wq_remove_pending(wq, acct, work, prev);
		raw_spin_unlock(&acct->lock);
		io_run_cancel(work, wq);
		match->nr_pending++;
		/* not safe to continue after unlock */
		return true;
	}
	raw_spin_unlock(&acct->lock);

	return false;
}

static void io_wq_cancel_pending_work(struct io_wq *wq,
				      struct io_cb_cancel_data *match)
{
	int i;
retry:
	for (i = 0; i < IO_WQ_ACCT_NR; i++) {
		struct io_wq_acct *acct = io_get_acct(wq, i == 0);

		if (io_acct_cancel_pending_work(wq, acct, match)) {
			if (match->cancel_all)
				goto retry;
			break;
		}
	}
}

static void io_acct_cancel_running_work(struct io_wq_acct *acct,
					struct io_cb_cancel_data *match)
{
	raw_spin_lock(&acct->workers_lock);
	io_acct_for_each_worker(acct, io_wq_worker_cancel, match);
	raw_spin_unlock(&acct->workers_lock);
}

static void io_wq_cancel_running_work(struct io_wq *wq,
				      struct io_cb_cancel_data *match)
{
	rcu_read_lock();

	for (int i = 0; i < IO_WQ_ACCT_NR; i++)
		io_acct_cancel_running_work(&wq->acct[i], match);

	rcu_read_unlock();
}

enum io_wq_cancel io_wq_cancel_cb(struct io_wq *wq, work_cancel_fn *cancel,
				  void *data, bool cancel_all)
{
	struct io_cb_cancel_data match = {
		.fn		= cancel,
		.data		= data,
		.cancel_all	= cancel_all,
	};

	/*
	 * First check pending list, if we're lucky we can just remove it
	 * from there. CANCEL_OK means that the work is returned as-new,
	 * no completion will be posted for it.
	 *
	 * Then check if a free (going busy) or busy worker has the work
	 * currently running. If we find it there, we'll return CANCEL_RUNNING
	 * as an indication that we attempt to signal cancellation. The
	 * completion will run normally in this case.
	 *
	 * Do both of these while holding the acct->workers_lock, to ensure that
	 * we'll find a work item regardless of state.
	 */
	io_wq_cancel_pending_work(wq, &match);
	if (match.nr_pending && !match.cancel_all)
		return IO_WQ_CANCEL_OK;

	io_wq_cancel_running_work(wq, &match);
	if (match.nr_running && !match.cancel_all)
		return IO_WQ_CANCEL_RUNNING;

	if (match.nr_running)
		return IO_WQ_CANCEL_RUNNING;
	if (match.nr_pending)
		return IO_WQ_CANCEL_OK;
	return IO_WQ_CANCEL_NOTFOUND;
}

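/*
 * Waitqueue callback, invoked when a hash bit is cleared. Takes the wq
 * off the hash waitqueue and kicks any accts that were stalled on hashed
 * work, so a free worker can pick the work back up.
 */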
static int io_wq_hash_wake(struct wait_queue_entry *wait, unsigned mode,
			   int sync, void *key)
{
	struct io_wq *wq = container_of(wait, struct io_wq, wait);
	int i;

	list_del_init(&wait->entry);

	rcu_read_lock();
	for (i = 0; i < IO_WQ_ACCT_NR; i++) {
		struct io_wq_acct *acct = &wq->acct[i];

		if (test_and_clear_bit(IO_ACCT_STALLED_BIT, &acct->flags))
			io_acct_activate_free_worker(acct);
	}
	rcu_read_unlock();
	return 1;
}

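/*
 * Set up an io_wq for the given task, with at most @bounded workers for
 * bounded work; unbound workers are capped by RLIMIT_NPROC. Workers are
 * created lazily as work is enqueued.
 */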
struct io_wq *io_wq_create(unsigned bounded, struct io_wq_data *data)
{
	int ret, i;
	struct io_wq *wq;

	if (WARN_ON_ONCE(!bounded))
		return ERR_PTR(-EINVAL);

	wq = kzalloc_obj(struct io_wq);
	if (!wq)
		return ERR_PTR(-ENOMEM);

	refcount_inc(&data->hash->refs);
	wq->hash = data->hash;

	ret = -ENOMEM;

	if (!alloc_cpumask_var(&wq->cpu_mask, GFP_KERNEL))
		goto err;
	cpuset_cpus_allowed(data->task, wq->cpu_mask);
	wq->acct[IO_WQ_ACCT_BOUND].max_workers = bounded;
	wq->acct[IO_WQ_ACCT_UNBOUND].max_workers =
				task_rlimit(current, RLIMIT_NPROC);
	INIT_LIST_HEAD(&wq->wait.entry);
	wq->wait.func = io_wq_hash_wake;
	for (i = 0; i < IO_WQ_ACCT_NR; i++) {
		struct io_wq_acct *acct = &wq->acct[i];

		atomic_set(&acct->nr_running, 0);

		raw_spin_lock_init(&acct->workers_lock);
		INIT_HLIST_NULLS_HEAD(&acct->free_list, 0);
		INIT_LIST_HEAD(&acct->all_list);

		INIT_WQ_LIST(&acct->work_list);
		raw_spin_lock_init(&acct->lock);
	}

	wq->task = get_task_struct(data->task);
	atomic_set(&wq->worker_refs, 1);
	init_completion(&wq->worker_done);
	ret = cpuhp_state_add_instance_nocalls(io_wq_online, &wq->cpuhp_node);
	if (ret) {
		put_task_struct(wq->task);
		goto err;
	}

	return wq;
err:
	io_wq_put_hash(data->hash);
	free_cpumask_var(wq->cpu_mask);
	kfree(wq);
	return ERR_PTR(ret);
}

static bool io_task_work_match(struct callback_head *cb, void *data)
{
	struct io_worker *worker;

	if (cb->func != create_worker_cb && cb->func != create_worker_cont)
		return false;
	worker = container_of(cb, struct io_worker, create_work);
	return worker->wq == data;
}

void io_wq_exit_start(struct io_wq *wq)
{
	set_bit(IO_WQ_BIT_EXIT, &wq->state);
}

static void io_wq_cancel_tw_create(struct io_wq *wq)
{
	struct callback_head *cb;

	while ((cb = task_work_cancel_match(wq->task, io_task_work_match, wq)) != NULL) {
		struct io_worker *worker;

		worker = container_of(cb, struct io_worker, create_work);
		io_worker_cancel_cb(worker);
		/*
		 * Only the worker continuation helper has worker allocated and
		 * hence needs freeing.
		 */
		if (cb->func == create_worker_cont)
			kfree(worker);
	}
}

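/*
 * Tear down all workers: cancel any pending creation task_work, wake
 * every worker so it notices IO_WQ_BIT_EXIT, then wait for the last
 * worker reference to be dropped.
 */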
static void io_wq_exit_workers(struct io_wq *wq)
{
	unsigned long timeout, warn_timeout;

	if (!wq->task)
		return;

	io_wq_cancel_tw_create(wq);

	rcu_read_lock();
	io_wq_for_each_worker(wq, io_wq_worker_wake, NULL);
	rcu_read_unlock();
	io_worker_ref_put(wq);

	/*
	 * Shut up hung task complaint, see for example
	 *
	 * https://lore.kernel.org/all/696fc9e7.a70a0220.111c58.0006.GAE@google.com/
	 *
	 * where completely overloading the system with tons of long running
	 * io-wq items can easily trigger the hung task timeout. Only sleep
	 * uninterruptibly for half that time, and warn if we end up waiting
	 * more than IO_URING_EXIT_WAIT_MAX.
	 */
	timeout = sysctl_hung_task_timeout_secs * HZ / 2;
	if (!timeout)
		timeout = MAX_SCHEDULE_TIMEOUT;
	warn_timeout = jiffies + IO_URING_EXIT_WAIT_MAX;
	do {
		if (wait_for_completion_timeout(&wq->worker_done, timeout))
			break;
		WARN_ON_ONCE(time_after(jiffies, warn_timeout));
	} while (1);

	spin_lock_irq(&wq->hash->wait.lock);
	list_del_init(&wq->wait.entry);
	spin_unlock_irq(&wq->hash->wait.lock);

	put_task_struct(wq->task);
	wq->task = NULL;
}

static void io_wq_destroy(struct io_wq *wq)
{
	struct io_cb_cancel_data match = {
		.fn		= io_wq_work_match_all,
		.cancel_all	= true,
	};

	cpuhp_state_remove_instance_nocalls(io_wq_online, &wq->cpuhp_node);
	io_wq_cancel_pending_work(wq, &match);
	free_cpumask_var(wq->cpu_mask);
	io_wq_put_hash(wq->hash);
	kfree(wq);
}

void io_wq_put_and_exit(struct io_wq *wq)
{
	WARN_ON_ONCE(!test_bit(IO_WQ_BIT_EXIT, &wq->state));

	io_wq_exit_workers(wq);
	io_wq_destroy(wq);
}

struct online_data {
	unsigned int cpu;
	bool online;
};

static bool io_wq_worker_affinity(struct io_worker *worker, void *data)
{
	struct online_data *od = data;

	if (od->online)
		cpumask_set_cpu(od->cpu, worker->wq->cpu_mask);
	else
		cpumask_clear_cpu(od->cpu, worker->wq->cpu_mask);
	return false;
}

static int __io_wq_cpu_online(struct io_wq *wq, unsigned int cpu, bool online)
{
	struct online_data od = {
		.cpu = cpu,
		.online = online
	};

	rcu_read_lock();
	io_wq_for_each_worker(wq, io_wq_worker_affinity, &od);
	rcu_read_unlock();
	return 0;
}

static int io_wq_cpu_online(unsigned int cpu, struct hlist_node *node)
{
	struct io_wq *wq = hlist_entry_safe(node, struct io_wq, cpuhp_node);

	return __io_wq_cpu_online(wq, cpu, true);
}

static int io_wq_cpu_offline(unsigned int cpu, struct hlist_node *node)
{
	struct io_wq *wq = hlist_entry_safe(node, struct io_wq, cpuhp_node);

	return __io_wq_cpu_online(wq, cpu, false);
}

int io_wq_cpu_affinity(struct io_uring_task *tctx, cpumask_var_t mask)
{
	cpumask_var_t allowed_mask;
	int ret = 0;

	if (!tctx || !tctx->io_wq)
		return -EINVAL;

	if (!alloc_cpumask_var(&allowed_mask, GFP_KERNEL))
		return -ENOMEM;

	rcu_read_lock();
	cpuset_cpus_allowed(tctx->io_wq->task, allowed_mask);
	if (mask) {
		if (cpumask_subset(mask, allowed_mask))
			cpumask_copy(tctx->io_wq->cpu_mask, mask);
		else
			ret = -EINVAL;
	} else {
		cpumask_copy(tctx->io_wq->cpu_mask, allowed_mask);
	}
	rcu_read_unlock();

	free_cpumask_var(allowed_mask);
	return ret;
}

/*
 * Set max number of unbounded workers, returns old value. If new_count is 0,
 * then just return the old value.
 */
int io_wq_max_workers(struct io_wq *wq, int *new_count)
{
	struct io_wq_acct *acct;
	int prev[IO_WQ_ACCT_NR];
	int i;

	BUILD_BUG_ON((int) IO_WQ_ACCT_BOUND   != (int) IO_WQ_BOUND);
	BUILD_BUG_ON((int) IO_WQ_ACCT_UNBOUND != (int) IO_WQ_UNBOUND);
	BUILD_BUG_ON((int) IO_WQ_ACCT_NR      != 2);

	for (i = 0; i < IO_WQ_ACCT_NR; i++) {
		if (new_count[i] > task_rlimit(current, RLIMIT_NPROC))
			new_count[i] = task_rlimit(current, RLIMIT_NPROC);
	}

	for (i = 0; i < IO_WQ_ACCT_NR; i++)
		prev[i] = 0;

	rcu_read_lock();

	for (i = 0; i < IO_WQ_ACCT_NR; i++) {
		acct = &wq->acct[i];
		raw_spin_lock(&acct->workers_lock);
		prev[i] = max_t(int, acct->max_workers, prev[i]);
		if (new_count[i])
			acct->max_workers = new_count[i];
		raw_spin_unlock(&acct->workers_lock);
	}
	rcu_read_unlock();

	for (i = 0; i < IO_WQ_ACCT_NR; i++)
		new_count[i] = prev[i];

	return 0;
}

static __init int io_wq_init(void)
{
	int ret;

	ret = cpuhp_setup_state_multi(CPUHP_AP_ONLINE_DYN, "io-wq/online",
					io_wq_cpu_online, io_wq_cpu_offline);
	if (ret < 0)
		return ret;
	io_wq_online = ret;
	return 0;
}
subsys_initcall(io_wq_init);