1 // SPDX-License-Identifier: GPL-2.0-or-later
2 /*
3 * Copyright (C) 2007-2010 Lawrence Livermore National Security, LLC.
4 * Copyright (C) 2007 The Regents of the University of California.
5 * Produced at Lawrence Livermore National Laboratory (cf, DISCLAIMER).
6 * Written by Brian Behlendorf <behlendorf1@llnl.gov>.
7 * UCRL-CODE-235197
8 *
9 * This file is part of the SPL, Solaris Porting Layer.
10 *
11 * The SPL is free software; you can redistribute it and/or modify it
12 * under the terms of the GNU General Public License as published by the
13 * Free Software Foundation; either version 2 of the License, or (at your
14 * option) any later version.
15 *
16 * The SPL is distributed in the hope that it will be useful, but WITHOUT
17 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
18 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
19 * for more details.
20 *
21 * You should have received a copy of the GNU General Public License along
22 * with the SPL. If not, see <http://www.gnu.org/licenses/>.
23 *
24 * Solaris Porting Layer (SPL) Task Queue Implementation.
25 */
26 /*
27 * Copyright (c) 2024, Klara Inc.
28 * Copyright (c) 2024, Syneto
29 */
30
31 #include <sys/timer.h>
32 #include <sys/taskq.h>
33 #include <sys/kmem.h>
34 #include <sys/tsd.h>
35 #include <sys/trace_spl.h>
36 #include <sys/time.h>
37 #include <sys/atomic.h>
38 #include <sys/kstat.h>
39 #include <linux/cpuhotplug.h>
40 #include <linux/mod_compat.h>
41
42 /* Linux 6.2 renamed del_timer_sync() to timer_delete_sync(); map the new name to the old one on kernels that lack it. */
43 #ifndef HAVE_TIMER_DELETE_SYNC
44 #define timer_delete_sync(t) del_timer_sync(t)
45 #endif
46
47 typedef struct taskq_kstats {
48 /* static values, for completeness */
49 kstat_named_t tqks_threads_max;
50 kstat_named_t tqks_entry_pool_min;
51 kstat_named_t tqks_entry_pool_max;
52
53 /* gauges (inc/dec counters, current value) */
54 kstat_named_t tqks_threads_active;
55 kstat_named_t tqks_threads_idle;
56 kstat_named_t tqks_threads_total;
57 kstat_named_t tqks_tasks_pending;
58 kstat_named_t tqks_tasks_priority;
59 kstat_named_t tqks_tasks_total;
60 kstat_named_t tqks_tasks_delayed;
61 kstat_named_t tqks_entries_free;
62
63 /* counters (inc only, since taskq creation) */
64 kstat_named_t tqks_threads_created;
65 kstat_named_t tqks_threads_destroyed;
66 kstat_named_t tqks_tasks_dispatched;
67 kstat_named_t tqks_tasks_dispatched_delayed;
68 kstat_named_t tqks_tasks_executed_normal;
69 kstat_named_t tqks_tasks_executed_priority;
70 kstat_named_t tqks_tasks_executed;
71 kstat_named_t tqks_tasks_delayed_requeued;
72 kstat_named_t tqks_tasks_cancelled;
73 kstat_named_t tqks_thread_wakeups;
74 kstat_named_t tqks_thread_wakeups_nowork;
75 kstat_named_t tqks_thread_sleeps;
76 } taskq_kstats_t;
77
78 static taskq_kstats_t taskq_kstats_template = {
79 { "threads_max", KSTAT_DATA_UINT64 },
80 { "entry_pool_min", KSTAT_DATA_UINT64 },
81 { "entry_pool_max", KSTAT_DATA_UINT64 },
82 { "threads_active", KSTAT_DATA_UINT64 },
83 { "threads_idle", KSTAT_DATA_UINT64 },
84 { "threads_total", KSTAT_DATA_UINT64 },
85 { "tasks_pending", KSTAT_DATA_UINT64 },
86 { "tasks_priority", KSTAT_DATA_UINT64 },
87 { "tasks_total", KSTAT_DATA_UINT64 },
88 { "tasks_delayed", KSTAT_DATA_UINT64 },
89 { "entries_free", KSTAT_DATA_UINT64 },
90
91 { "threads_created", KSTAT_DATA_UINT64 },
92 { "threads_destroyed", KSTAT_DATA_UINT64 },
93 { "tasks_dispatched", KSTAT_DATA_UINT64 },
94 { "tasks_dispatched_delayed", KSTAT_DATA_UINT64 },
95 { "tasks_executed_normal", KSTAT_DATA_UINT64 },
96 { "tasks_executed_priority", KSTAT_DATA_UINT64 },
97 { "tasks_executed", KSTAT_DATA_UINT64 },
98 { "tasks_delayed_requeued", KSTAT_DATA_UINT64 },
99 { "tasks_cancelled", KSTAT_DATA_UINT64 },
100 { "thread_wakeups", KSTAT_DATA_UINT64 },
101 { "thread_wakeups_nowork", KSTAT_DATA_UINT64 },
102 { "thread_sleeps", KSTAT_DATA_UINT64 },
103 };
104
105 #define TQSTAT_INC(tq, stat) wmsum_add(&tq->tq_sums.tqs_##stat, 1)
106 #define TQSTAT_DEC(tq, stat) wmsum_add(&tq->tq_sums.tqs_##stat, -1)
107
108 #define _TQSTAT_MOD_LIST(mod, tq, t) do { \
109 switch (t->tqent_flags & TQENT_LIST_MASK) { \
110 case TQENT_LIST_NONE: ASSERT(list_empty(&t->tqent_list)); break;\
111 case TQENT_LIST_PENDING: mod(tq, tasks_pending); break; \
112 case TQENT_LIST_PRIORITY: mod(tq, tasks_priority); break; \
113 case TQENT_LIST_DELAY: mod(tq, tasks_delayed); break; \
114 } \
115 } while (0)
116 #define TQSTAT_INC_LIST(tq, t) _TQSTAT_MOD_LIST(TQSTAT_INC, tq, t)
117 #define TQSTAT_DEC_LIST(tq, t) _TQSTAT_MOD_LIST(TQSTAT_DEC, tq, t)
118
119 #define TQENT_SET_LIST(t, l) \
120 t->tqent_flags = (t->tqent_flags & ~TQENT_LIST_MASK) | l;
121
122 static int spl_taskq_thread_bind = 0;
123 module_param(spl_taskq_thread_bind, int, 0644);
124 MODULE_PARM_DESC(spl_taskq_thread_bind, "Bind taskq thread to CPU by default");
125
126 static uint_t spl_taskq_thread_timeout_ms = 5000;
127 module_param(spl_taskq_thread_timeout_ms, uint, 0644);
128 MODULE_PARM_DESC(spl_taskq_thread_timeout_ms,
129 "Minimum idle threads exit interval for dynamic taskqs");
130
131 static int spl_taskq_thread_dynamic = 1;
132 module_param(spl_taskq_thread_dynamic, int, 0444);
133 MODULE_PARM_DESC(spl_taskq_thread_dynamic, "Allow dynamic taskq threads");
134
135 static int spl_taskq_thread_priority = 1;
136 module_param(spl_taskq_thread_priority, int, 0644);
137 MODULE_PARM_DESC(spl_taskq_thread_priority,
138 "Allow non-default priority for taskq threads");
139
140 static uint_t spl_taskq_thread_sequential = 4;
141 module_param(spl_taskq_thread_sequential, uint, 0644);
142 MODULE_PARM_DESC(spl_taskq_thread_sequential,
143 "Create new taskq threads after N sequential tasks");
144
145 /*
146 * Global system-wide dynamic task queue available for all consumers. This
147 * taskq is not intended for long-running tasks; instead, a dedicated taskq
148 * should be created.
149 */
150 taskq_t *system_taskq;
151 EXPORT_SYMBOL(system_taskq);
152 /* Global dynamic task queue for long-delay tasks */
153 taskq_t *system_delay_taskq;
154 EXPORT_SYMBOL(system_delay_taskq);
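
/*
 * Illustrative sketch (not part of the build): a consumer typically hands a
 * short-lived task to the shared queue roughly like this, where my_func and
 * my_arg are hypothetical:
 *
 *	taskqid_t id = taskq_dispatch(system_taskq, my_func, my_arg, TQ_SLEEP);
 *	if (id == TASKQID_INVALID)
 *		// dispatch failed: run my_func(my_arg) inline or retry
 *
 * Long-running or delayed work belongs on a dedicated taskq or on
 * system_delay_taskq instead.
 */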
155
156 /* Private dedicated taskq for creating new taskq threads on demand. */
157 static taskq_t *dynamic_taskq;
158 static taskq_thread_t *taskq_thread_create(taskq_t *);
159
160 /* Multi-callback id for cpu hotplugging. */
161 static int spl_taskq_cpuhp_state;
162
163 /* List of all taskqs */
164 LIST_HEAD(tq_list);
165 struct rw_semaphore tq_list_sem;
166 static uint_t taskq_tsd;
167
168 static int
169 task_km_flags(uint_t flags)
170 {
171 if (flags & TQ_NOSLEEP)
172 return (KM_NOSLEEP);
173
174 if (flags & TQ_PUSHPAGE)
175 return (KM_PUSHPAGE);
176
177 return (KM_SLEEP);
178 }
179
180 /*
181 * taskq_find_by_name - Find the largest instance number of a named taskq.
182 */
183 static int
184 taskq_find_by_name(const char *name)
185 {
186 struct list_head *tql = NULL;
187 taskq_t *tq;
188
189 list_for_each_prev(tql, &tq_list) {
190 tq = list_entry(tql, taskq_t, tq_taskqs);
191 if (strcmp(name, tq->tq_name) == 0)
192 return (tq->tq_instance);
193 }
194 return (-1);
195 }
196
197 /*
198 * NOTE: Must be called with tq->tq_lock held. Returns a taskq_ent_t that
199 * is not attached to the free, work, or pending taskq lists.
200 */
201 static taskq_ent_t *
202 task_alloc(taskq_t *tq, uint_t flags, unsigned long *irqflags)
203 {
204 taskq_ent_t *t;
205 int count = 0;
206
207 ASSERT(tq);
208 retry:
209 /* Acquire taskq_ent_t's from free list if available */
210 if (!list_empty(&tq->tq_free_list) && !(flags & TQ_NEW)) {
211 t = list_entry(tq->tq_free_list.next, taskq_ent_t, tqent_list);
212
213 ASSERT(!(t->tqent_flags & TQENT_FLAG_PREALLOC));
214 ASSERT(!(t->tqent_flags & TQENT_FLAG_CANCEL));
215 ASSERT(!timer_pending(&t->tqent_timer));
216
217 list_del_init(&t->tqent_list);
218 TQSTAT_DEC(tq, entries_free);
219 return (t);
220 }
221
222 /* Free list is empty and memory allocations are prohibited */
223 if (flags & TQ_NOALLOC)
224 return (NULL);
225
226 /* Hit maximum taskq_ent_t pool size */
227 if (tq->tq_nalloc >= tq->tq_maxalloc) {
228 if (flags & TQ_NOSLEEP)
229 return (NULL);
230
231 /*
232 * Sleep periodically polling the free list for an available
233 * taskq_ent_t. Dispatching with TQ_SLEEP should always succeed
234 * but we cannot block forever waiting for a taskq_ent_t to
235 * show up in the free list, otherwise a deadlock can happen.
236 *
237 * Therefore, we need to allocate a new task even if the number
238 * of allocated tasks is above tq->tq_maxalloc, but we still
239 * end up delaying the task allocation by up to one second, thereby
240 * throttling the task dispatch rate.
241 */
242 spin_unlock_irqrestore(&tq->tq_lock, *irqflags);
243 schedule_timeout_interruptible(HZ / 100);
244 spin_lock_irqsave_nested(&tq->tq_lock, *irqflags,
245 tq->tq_lock_class);
246 if (count < 100) {
247 count++;
248 goto retry;
249 }
250 }
251
252 spin_unlock_irqrestore(&tq->tq_lock, *irqflags);
253 t = kmem_alloc(sizeof (taskq_ent_t), task_km_flags(flags));
254 spin_lock_irqsave_nested(&tq->tq_lock, *irqflags, tq->tq_lock_class);
255
256 if (t) {
257 taskq_init_ent(t);
258 tq->tq_nalloc++;
259 }
260
261 return (t);
262 }
263
264 /*
265 * NOTE: Must be called with tq->tq_lock held, expects the taskq_ent_t
266 * to already be removed from the free, work, or pending taskq lists.
267 */
268 static void
269 task_free(taskq_t *tq, taskq_ent_t *t)
270 {
271 ASSERT(tq);
272 ASSERT(t);
273 ASSERT(list_empty(&t->tqent_list));
274 ASSERT(!timer_pending(&t->tqent_timer));
275
276 kmem_free(t, sizeof (taskq_ent_t));
277 tq->tq_nalloc--;
278 }
279
280 /*
281 * NOTE: Must be called with tq->tq_lock held, either destroys the
282 * taskq_ent_t if too many exist or moves it to the free list for later use.
283 */
284 static void
285 task_done(taskq_t *tq, taskq_ent_t *t)
286 {
287 ASSERT(tq);
288 ASSERT(t);
289 ASSERT(list_empty(&t->tqent_list));
290
291 /* Wake tasks blocked in taskq_wait_id() */
292 wake_up_all(&t->tqent_waitq);
293
294 if (tq->tq_nalloc <= tq->tq_minalloc) {
295 t->tqent_id = TASKQID_INVALID;
296 t->tqent_func = NULL;
297 t->tqent_arg = NULL;
298 t->tqent_flags = 0;
299
300 list_add_tail(&t->tqent_list, &tq->tq_free_list);
301 TQSTAT_INC(tq, entries_free);
302 } else {
303 task_free(tq, t);
304 }
305 }
306
307 /*
308 * When a delayed task timer expires, remove it from the delay list and
309 * add it to the priority list for immediate processing.
310 */
311 static void
312 task_expire_impl(taskq_ent_t *t)
313 {
314 taskq_ent_t *w;
315 taskq_t *tq = t->tqent_taskq;
316 struct list_head *l = NULL;
317 unsigned long flags;
318
319 spin_lock_irqsave_nested(&tq->tq_lock, flags, tq->tq_lock_class);
320
321 if (t->tqent_flags & TQENT_FLAG_CANCEL) {
322 ASSERT(list_empty(&t->tqent_list));
323 spin_unlock_irqrestore(&tq->tq_lock, flags);
324 return;
325 }
326
327 t->tqent_birth = jiffies;
328 DTRACE_PROBE1(taskq_ent__birth, taskq_ent_t *, t);
329
330 /*
331 * The priority list must be maintained in strict task id order
332 * from lowest to highest for lowest_id to be easily calculable.
333 */
334 list_del(&t->tqent_list);
335 list_for_each_prev(l, &tq->tq_prio_list) {
336 w = list_entry(l, taskq_ent_t, tqent_list);
337 if (w->tqent_id < t->tqent_id) {
338 list_add(&t->tqent_list, l);
339 break;
340 }
341 }
342 if (l == &tq->tq_prio_list)
343 list_add(&t->tqent_list, &tq->tq_prio_list);
344
345 spin_unlock_irqrestore(&tq->tq_lock, flags);
346
347 wake_up(&tq->tq_work_waitq);
348
349 TQSTAT_INC(tq, tasks_delayed_requeued);
350 }
351
352 static void
353 task_expire(struct timer_list *tl)
354 {
355 struct timer_list *tmr = (struct timer_list *)tl;
356 taskq_ent_t *t = from_timer(t, tmr, tqent_timer);
357 task_expire_impl(t);
358 }
359
360 /*
361 * Returns the lowest incomplete taskqid_t. The taskqid_t may
362 * be queued on the pending list, on the priority list, on the
363 * delay list, or on the work list currently being handled, but
364 * it is not 100% complete yet.
365 */
366 static taskqid_t
367 taskq_lowest_id(taskq_t *tq)
368 {
369 taskqid_t lowest_id = tq->tq_next_id;
370 taskq_ent_t *t;
371 taskq_thread_t *tqt;
372
373 if (!list_empty(&tq->tq_pend_list)) {
374 t = list_entry(tq->tq_pend_list.next, taskq_ent_t, tqent_list);
375 lowest_id = MIN(lowest_id, t->tqent_id);
376 }
377
378 if (!list_empty(&tq->tq_prio_list)) {
379 t = list_entry(tq->tq_prio_list.next, taskq_ent_t, tqent_list);
380 lowest_id = MIN(lowest_id, t->tqent_id);
381 }
382
383 if (!list_empty(&tq->tq_delay_list)) {
384 t = list_entry(tq->tq_delay_list.next, taskq_ent_t, tqent_list);
385 lowest_id = MIN(lowest_id, t->tqent_id);
386 }
387
388 if (!list_empty(&tq->tq_active_list)) {
389 tqt = list_entry(tq->tq_active_list.next, taskq_thread_t,
390 tqt_active_list);
391 ASSERT(tqt->tqt_id != TASKQID_INVALID);
392 lowest_id = MIN(lowest_id, tqt->tqt_id);
393 }
394
395 return (lowest_id);
396 }
397
398 /*
399 * Insert an active thread into the active list, keeping it sorted by increasing taskqid.
400 */
401 static void
402 taskq_insert_in_order(taskq_t *tq, taskq_thread_t *tqt)
403 {
404 taskq_thread_t *w;
405 struct list_head *l = NULL;
406
407 ASSERT(tq);
408 ASSERT(tqt);
409
410 list_for_each_prev(l, &tq->tq_active_list) {
411 w = list_entry(l, taskq_thread_t, tqt_active_list);
412 if (w->tqt_id < tqt->tqt_id) {
413 list_add(&tqt->tqt_active_list, l);
414 break;
415 }
416 }
417 if (l == &tq->tq_active_list)
418 list_add(&tqt->tqt_active_list, &tq->tq_active_list);
419 }
420
421 /*
422 * Find and return a task from the given list if it exists. The list
423 * must be in lowest to highest task id order.
424 */
425 static taskq_ent_t *
426 taskq_find_list(taskq_t *tq, struct list_head *lh, taskqid_t id)
427 {
428 struct list_head *l = NULL;
429 taskq_ent_t *t;
430
431 list_for_each(l, lh) {
432 t = list_entry(l, taskq_ent_t, tqent_list);
433
434 if (t->tqent_id == id)
435 return (t);
436
437 if (t->tqent_id > id)
438 break;
439 }
440
441 return (NULL);
442 }
443
444 /*
445 * Find an already dispatched task given the task id regardless of what
446 * state it is in. If a task is still pending it will be returned.
447 * If a task is executing, then -EBUSY will be returned instead.
448 * If the task has already been run then NULL is returned.
449 */
450 static taskq_ent_t *
451 taskq_find(taskq_t *tq, taskqid_t id)
452 {
453 taskq_thread_t *tqt;
454 struct list_head *l = NULL;
455 taskq_ent_t *t;
456
457 t = taskq_find_list(tq, &tq->tq_delay_list, id);
458 if (t)
459 return (t);
460
461 t = taskq_find_list(tq, &tq->tq_prio_list, id);
462 if (t)
463 return (t);
464
465 t = taskq_find_list(tq, &tq->tq_pend_list, id);
466 if (t)
467 return (t);
468
469 list_for_each(l, &tq->tq_active_list) {
470 tqt = list_entry(l, taskq_thread_t, tqt_active_list);
471 if (tqt->tqt_id == id) {
472 /*
473 * Instead of returning tqt_task, we just return a non
474 * NULL value to prevent misuse, since tqt_task only
475 * has two valid fields.
476 */
477 return (ERR_PTR(-EBUSY));
478 }
479 }
480
481 return (NULL);
482 }
483
484 /*
485 * Theory for the taskq_wait_id(), taskq_wait_outstanding(), and
486 * taskq_wait() functions below.
487 *
488 * Taskq waiting is accomplished by tracking the lowest outstanding task
489 * id and the next available task id. As tasks are dispatched they are
490 * added to the tail of the pending, priority, or delay lists. As worker
491 * threads become available the tasks are removed from the heads of these
492 * lists and linked to the worker threads. This ensures the lists are
493 * kept sorted by lowest to highest task id.
494 *
495 * Therefore the lowest outstanding task id can be quickly determined by
496 * checking the head item from all of these lists. This value is stored
497 * with the taskq as the lowest id. It only needs to be recalculated when
498 * either the task with the current lowest id completes or is canceled.
499 *
500 * By blocking until the lowest task id exceeds the passed task id the
501 * taskq_wait_outstanding() function can be easily implemented. Similarly,
502 * by blocking until the lowest task id matches the next task id taskq_wait()
503 * can be implemented.
504 *
505 * Callers should be aware that when there are multiple worker threads it
506 * is possible for larger task ids to complete before smaller ones. Also
507 * when the taskq contains delay tasks with small task ids callers may
508 * block for a considerable length of time waiting for them to expire and
509 * execute.
510 */
511 static int
512 taskq_wait_id_check(taskq_t *tq, taskqid_t id)
513 {
514 int rc;
515 unsigned long flags;
516
517 spin_lock_irqsave_nested(&tq->tq_lock, flags, tq->tq_lock_class);
518 rc = (taskq_find(tq, id) == NULL);
519 spin_unlock_irqrestore(&tq->tq_lock, flags);
520
521 return (rc);
522 }
523
524 /*
525 * The taskq_wait_id() function blocks until the passed task id completes.
526 * This does not guarantee that all lower task ids have completed.
527 */
528 void
529 taskq_wait_id(taskq_t *tq, taskqid_t id)
530 {
531 wait_event(tq->tq_wait_waitq, taskq_wait_id_check(tq, id));
532 }
533 EXPORT_SYMBOL(taskq_wait_id);
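
/*
 * Sketch of the single-id wait pattern described above (tq, my_func and
 * my_arg are hypothetical):
 *
 *	taskqid_t id = taskq_dispatch(tq, my_func, my_arg, TQ_SLEEP);
 *	if (id != TASKQID_INVALID)
 *		taskq_wait_id(tq, id);
 *
 * Only the completion of 'id' is guaranteed; tasks with lower ids may still
 * be pending or running when taskq_wait_id() returns.
 */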
534
535 static int
536 taskq_wait_outstanding_check(taskq_t *tq, taskqid_t id)
537 {
538 int rc;
539 unsigned long flags;
540
541 spin_lock_irqsave_nested(&tq->tq_lock, flags, tq->tq_lock_class);
542 rc = (id < tq->tq_lowest_id);
543 spin_unlock_irqrestore(&tq->tq_lock, flags);
544
545 return (rc);
546 }
547
548 /*
549 * The taskq_wait_outstanding() function will block until all tasks with a
550 * lower taskqid than the passed 'id' have been completed. Note that all
551 * task id's are assigned monotonically at dispatch time. Zero may be
552 * passed for the id to indicate all tasks dispatch up to this point,
553 * but not after, should be waited for.
554 */
555 void
556 taskq_wait_outstanding(taskq_t *tq, taskqid_t id)
557 {
558 id = id ? id : tq->tq_next_id - 1;
559 wait_event(tq->tq_wait_waitq, taskq_wait_outstanding_check(tq, id));
560 }
561 EXPORT_SYMBOL(taskq_wait_outstanding);
562
563 static int
564 taskq_wait_check(taskq_t *tq)
565 {
566 int rc;
567 unsigned long flags;
568
569 spin_lock_irqsave_nested(&tq->tq_lock, flags, tq->tq_lock_class);
570 rc = (tq->tq_lowest_id == tq->tq_next_id);
571 spin_unlock_irqrestore(&tq->tq_lock, flags);
572
573 return (rc);
574 }
575
576 /*
577 * The taskq_wait() function will block until the taskq is empty.
578 * This means that if a taskq re-dispatches work to itself taskq_wait()
579 * callers will block indefinitely.
580 */
581 void
582 taskq_wait(taskq_t *tq)
583 {
584 wait_event(tq->tq_wait_waitq, taskq_wait_check(tq));
585 }
586 EXPORT_SYMBOL(taskq_wait);
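
/*
 * Sketch: a common teardown sequence is to stop queueing new work and then
 * drain everything already dispatched before destroying the queue (tq is
 * hypothetical):
 *
 *	taskq_wait(tq);		// returns once tq_lowest_id == tq_next_id
 *	taskq_destroy(tq);
 *
 * taskq_wait_outstanding(tq, 0) may be used instead to wait only for tasks
 * dispatched before the call, letting later dispatches proceed.
 */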
587
588 int
589 taskq_member(taskq_t *tq, kthread_t *t)
590 {
591 return (tq == (taskq_t *)tsd_get_by_thread(taskq_tsd, t));
592 }
593 EXPORT_SYMBOL(taskq_member);
594
595 taskq_t *
596 taskq_of_curthread(void)
597 {
598 return (tsd_get(taskq_tsd));
599 }
600 EXPORT_SYMBOL(taskq_of_curthread);
601
602 /*
603 * Cancel an already dispatched task given the task id. Still pending tasks
604 * will be immediately canceled, and if the task is active the function will
605 * block until it completes. Preallocated tasks which are canceled must be
606 * freed by the caller.
607 */
608 int
609 taskq_cancel_id(taskq_t *tq, taskqid_t id)
610 {
611 taskq_ent_t *t;
612 int rc = ENOENT;
613 unsigned long flags;
614
615 ASSERT(tq);
616
617 spin_lock_irqsave_nested(&tq->tq_lock, flags, tq->tq_lock_class);
618 t = taskq_find(tq, id);
619 if (t && t != ERR_PTR(-EBUSY)) {
620 list_del_init(&t->tqent_list);
621 TQSTAT_DEC_LIST(tq, t);
622 TQSTAT_DEC(tq, tasks_total);
623
624 t->tqent_flags |= TQENT_FLAG_CANCEL;
625 TQSTAT_INC(tq, tasks_cancelled);
626
627 /*
628 * When canceling the lowest outstanding task id we
629 * must recalculate the new lowest outstanding id.
630 */
631 if (tq->tq_lowest_id == t->tqent_id) {
632 tq->tq_lowest_id = taskq_lowest_id(tq);
633 ASSERT3S(tq->tq_lowest_id, >, t->tqent_id);
634 }
635
636 /*
637 * The task_expire() function takes the tq->tq_lock, so drop
638 * the lock before synchronously cancelling the timer.
639 */
640 if (timer_pending(&t->tqent_timer)) {
641 spin_unlock_irqrestore(&tq->tq_lock, flags);
642 timer_delete_sync(&t->tqent_timer);
643 spin_lock_irqsave_nested(&tq->tq_lock, flags,
644 tq->tq_lock_class);
645 }
646
647 if (!(t->tqent_flags & TQENT_FLAG_PREALLOC))
648 task_done(tq, t);
649
650 rc = 0;
651 }
652 spin_unlock_irqrestore(&tq->tq_lock, flags);
653
654 if (t == ERR_PTR(-EBUSY)) {
655 taskq_wait_id(tq, id);
656 rc = EBUSY;
657 }
658
659 return (rc);
660 }
661 EXPORT_SYMBOL(taskq_cancel_id);
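
/*
 * Sketch of the cancellation contract described above, with 'id' coming
 * from a hypothetical earlier taskq_dispatch():
 *
 *	int error = taskq_cancel_id(tq, id);
 *	switch (error) {
 *	case 0:		// cancelled while pending; the function will not run
 *	case ENOENT:	// already completed (or never dispatched)
 *	case EBUSY:	// was executing; taskq_cancel_id() waited for it
 *		break;
 *	}
 *
 * A cancelled TQENT_FLAG_PREALLOC entry is not returned to the free list;
 * it remains the caller's to reuse or free.
 */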
662
663 static int taskq_thread_spawn(taskq_t *tq);
664
665 taskqid_t
666 taskq_dispatch(taskq_t *tq, task_func_t func, void *arg, uint_t flags)
667 {
668 taskq_ent_t *t;
669 taskqid_t rc = TASKQID_INVALID;
670 unsigned long irqflags;
671
672 ASSERT(tq);
673 ASSERT(func);
674
675 spin_lock_irqsave_nested(&tq->tq_lock, irqflags, tq->tq_lock_class);
676
677 /* Taskq being destroyed and all tasks drained */
678 if (!(tq->tq_flags & TASKQ_ACTIVE))
679 goto out;
680
681 /* Do not queue the task unless there is an idle thread for it */
682 ASSERT(tq->tq_nactive <= tq->tq_nthreads);
683 if ((flags & TQ_NOQUEUE) && (tq->tq_nactive == tq->tq_nthreads)) {
684 /* Dynamic taskq may be able to spawn another thread */
685 if (taskq_thread_spawn(tq) == 0)
686 goto out;
687 }
688
689 if ((t = task_alloc(tq, flags, &irqflags)) == NULL)
690 goto out;
691
692 spin_lock(&t->tqent_lock);
693
694 /* Queue to the front of the list to enforce TQ_NOQUEUE semantics */
695 if (flags & TQ_NOQUEUE) {
696 TQENT_SET_LIST(t, TQENT_LIST_PRIORITY);
697 list_add(&t->tqent_list, &tq->tq_prio_list);
698 /* Queue to the priority list instead of the pending list */
699 } else if (flags & TQ_FRONT) {
700 TQENT_SET_LIST(t, TQENT_LIST_PRIORITY);
701 list_add_tail(&t->tqent_list, &tq->tq_prio_list);
702 } else {
703 TQENT_SET_LIST(t, TQENT_LIST_PENDING);
704 list_add_tail(&t->tqent_list, &tq->tq_pend_list);
705 }
706 TQSTAT_INC_LIST(tq, t);
707 TQSTAT_INC(tq, tasks_total);
708
709 t->tqent_id = rc = tq->tq_next_id;
710 tq->tq_next_id++;
711 t->tqent_func = func;
712 t->tqent_arg = arg;
713 t->tqent_taskq = tq;
714 t->tqent_timer.function = NULL;
715 t->tqent_timer.expires = 0;
716
717 t->tqent_birth = jiffies;
718 DTRACE_PROBE1(taskq_ent__birth, taskq_ent_t *, t);
719
720 ASSERT(!(t->tqent_flags & TQENT_FLAG_PREALLOC));
721
722 spin_unlock(&t->tqent_lock);
723
724 wake_up(&tq->tq_work_waitq);
725
726 TQSTAT_INC(tq, tasks_dispatched);
727
728 /* Spawn additional taskq threads if required. */
729 if (!(flags & TQ_NOQUEUE) && tq->tq_nactive == tq->tq_nthreads)
730 (void) taskq_thread_spawn(tq);
731 out:
732 spin_unlock_irqrestore(&tq->tq_lock, irqflags);
733 return (rc);
734 }
735 EXPORT_SYMBOL(taskq_dispatch);
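
/*
 * Sketch: the dispatch flags select the queue used above (tq, my_func and
 * my_arg are hypothetical):
 *
 *	// ordinary task, appended to the pending list
 *	taskq_dispatch(tq, my_func, my_arg, TQ_SLEEP);
 *
 *	// urgent task, appended to the priority list
 *	taskq_dispatch(tq, my_func, my_arg, TQ_SLEEP | TQ_FRONT);
 *
 *	// run only if an idle thread exists or can be spawned; otherwise
 *	// TASKQID_INVALID is returned and nothing is queued
 *	taskq_dispatch(tq, my_func, my_arg, TQ_NOSLEEP | TQ_NOQUEUE);
 */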
736
737 taskqid_t
738 taskq_dispatch_delay(taskq_t *tq, task_func_t func, void *arg,
739 uint_t flags, clock_t expire_time)
740 {
741 taskqid_t rc = TASKQID_INVALID;
742 taskq_ent_t *t;
743 unsigned long irqflags;
744
745 ASSERT(tq);
746 ASSERT(func);
747
748 spin_lock_irqsave_nested(&tq->tq_lock, irqflags, tq->tq_lock_class);
749
750 /* Taskq being destroyed and all tasks drained */
751 if (!(tq->tq_flags & TASKQ_ACTIVE))
752 goto out;
753
754 if ((t = task_alloc(tq, flags, &irqflags)) == NULL)
755 goto out;
756
757 spin_lock(&t->tqent_lock);
758
759 /* Queue to the delay list for subsequent execution */
760 list_add_tail(&t->tqent_list, &tq->tq_delay_list);
761 TQENT_SET_LIST(t, TQENT_LIST_DELAY);
762 TQSTAT_INC_LIST(tq, t);
763 TQSTAT_INC(tq, tasks_total);
764
765 t->tqent_id = rc = tq->tq_next_id;
766 tq->tq_next_id++;
767 t->tqent_func = func;
768 t->tqent_arg = arg;
769 t->tqent_taskq = tq;
770 t->tqent_timer.function = task_expire;
771 t->tqent_timer.expires = (unsigned long)expire_time;
772 add_timer(&t->tqent_timer);
773
774 ASSERT(!(t->tqent_flags & TQENT_FLAG_PREALLOC));
775
776 spin_unlock(&t->tqent_lock);
777
778 TQSTAT_INC(tq, tasks_dispatched_delayed);
779
780 /* Spawn additional taskq threads if required. */
781 if (tq->tq_nactive == tq->tq_nthreads)
782 (void) taskq_thread_spawn(tq);
783 out:
784 spin_unlock_irqrestore(&tq->tq_lock, irqflags);
785 return (rc);
786 }
787 EXPORT_SYMBOL(taskq_dispatch_delay);
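
/*
 * Sketch: expire_time is an absolute time in jiffies, not a relative delay.
 * Scheduling a hypothetical my_func() roughly two seconds from now:
 *
 *	taskqid_t id = taskq_dispatch_delay(tq, my_func, my_arg, TQ_SLEEP,
 *	    jiffies + 2 * HZ);
 *
 * The entry waits on tq_delay_list until the timer fires and task_expire()
 * moves it to the priority list; taskq_cancel_id(tq, id) can still cancel
 * it while it is delayed.
 */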
788
789 void
790 taskq_dispatch_ent(taskq_t *tq, task_func_t func, void *arg, uint_t flags,
791 taskq_ent_t *t)
792 {
793 unsigned long irqflags;
794 ASSERT(tq);
795 ASSERT(func);
796
797 spin_lock_irqsave_nested(&tq->tq_lock, irqflags,
798 tq->tq_lock_class);
799
800 /* Taskq being destroyed and all tasks drained */
801 if (!(tq->tq_flags & TASKQ_ACTIVE)) {
802 t->tqent_id = TASKQID_INVALID;
803 goto out;
804 }
805
806 if ((flags & TQ_NOQUEUE) && (tq->tq_nactive == tq->tq_nthreads)) {
807 /* Dynamic taskq may be able to spawn another thread */
808 if (taskq_thread_spawn(tq) == 0)
809 goto out;
810 flags |= TQ_FRONT;
811 }
812
813 spin_lock(&t->tqent_lock);
814
815 /*
816 * Make sure the entry is not on some other taskq; it is important to
817 * ASSERT() under lock
818 */
819 ASSERT(taskq_empty_ent(t));
820
821 /*
822 * Mark it as a prealloc'd task. This is important
823 * to ensure that we don't free it later.
824 */
825 t->tqent_flags |= TQENT_FLAG_PREALLOC;
826
827 /* Queue to the priority list instead of the pending list */
828 if (flags & TQ_FRONT) {
829 TQENT_SET_LIST(t, TQENT_LIST_PRIORITY);
830 list_add_tail(&t->tqent_list, &tq->tq_prio_list);
831 } else {
832 TQENT_SET_LIST(t, TQENT_LIST_PENDING);
833 list_add_tail(&t->tqent_list, &tq->tq_pend_list);
834 }
835 TQSTAT_INC_LIST(tq, t);
836 TQSTAT_INC(tq, tasks_total);
837
838 t->tqent_id = tq->tq_next_id;
839 tq->tq_next_id++;
840 t->tqent_func = func;
841 t->tqent_arg = arg;
842 t->tqent_taskq = tq;
843
844 t->tqent_birth = jiffies;
845 DTRACE_PROBE1(taskq_ent__birth, taskq_ent_t *, t);
846
847 spin_unlock(&t->tqent_lock);
848
849 wake_up(&tq->tq_work_waitq);
850
851 TQSTAT_INC(tq, tasks_dispatched);
852
853 /* Spawn additional taskq threads if required. */
854 if (tq->tq_nactive == tq->tq_nthreads)
855 (void) taskq_thread_spawn(tq);
856 out:
857 spin_unlock_irqrestore(&tq->tq_lock, irqflags);
858 }
859 EXPORT_SYMBOL(taskq_dispatch_ent);
860
861 int
862 taskq_empty_ent(taskq_ent_t *t)
863 {
864 return (list_empty(&t->tqent_list));
865 }
866 EXPORT_SYMBOL(taskq_empty_ent);
867
868 void
869 taskq_init_ent(taskq_ent_t *t)
870 {
871 spin_lock_init(&t->tqent_lock);
872 init_waitqueue_head(&t->tqent_waitq);
873 timer_setup(&t->tqent_timer, NULL, 0);
874 INIT_LIST_HEAD(&t->tqent_list);
875 t->tqent_id = 0;
876 t->tqent_func = NULL;
877 t->tqent_arg = NULL;
878 t->tqent_flags = 0;
879 t->tqent_taskq = NULL;
880 }
881 EXPORT_SYMBOL(taskq_init_ent);
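
/*
 * Sketch: a caller that embeds a taskq_ent_t in its own object (avoiding
 * allocation at dispatch time) initializes it once and reuses it; my_obj_t
 * and my_func are hypothetical:
 *
 *	my_obj_t *obj = ...;
 *	taskq_init_ent(&obj->obj_tqent);
 *	...
 *	if (taskq_empty_ent(&obj->obj_tqent))
 *		taskq_dispatch_ent(tq, my_func, obj, 0, &obj->obj_tqent);
 *
 * taskq_dispatch_ent() marks the entry TQENT_FLAG_PREALLOC so the taskq
 * never frees it; a tqent_id of TASKQID_INVALID afterwards indicates the
 * taskq was no longer active.
 */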
882
883 /*
884 * Return the next pending task; preference is given to tasks on the
885 * priority list which were dispatched with TQ_FRONT.
886 */
887 static taskq_ent_t *
888 taskq_next_ent(taskq_t *tq)
889 {
890 struct list_head *list;
891
892 if (!list_empty(&tq->tq_prio_list))
893 list = &tq->tq_prio_list;
894 else if (!list_empty(&tq->tq_pend_list))
895 list = &tq->tq_pend_list;
896 else
897 return (NULL);
898
899 return (list_entry(list->next, taskq_ent_t, tqent_list));
900 }
901
902 /*
903 * Spawns a new thread for the specified taskq.
904 */
905 static void
906 taskq_thread_spawn_task(void *arg)
907 {
908 taskq_t *tq = (taskq_t *)arg;
909 unsigned long flags;
910
911 if (taskq_thread_create(tq) == NULL) {
912 /* restore spawning count if failed */
913 spin_lock_irqsave_nested(&tq->tq_lock, flags,
914 tq->tq_lock_class);
915 tq->tq_nspawn--;
916 spin_unlock_irqrestore(&tq->tq_lock, flags);
917 }
918 }
919
920 /*
921 * Spawn additional threads for dynamic taskqs (TASKQ_DYNAMIC) when the
922 * current number of threads is insufficient to handle pending tasks. These
923 * new threads must be created by the dedicated dynamic_taskq to avoid
924 * deadlocks between thread creation and memory reclaim. The system_taskq
925 * which is also a dynamic taskq cannot be safely used for this.
926 */
927 static int
928 taskq_thread_spawn(taskq_t *tq)
929 {
930 int spawning = 0;
931
932 if (!(tq->tq_flags & TASKQ_DYNAMIC))
933 return (0);
934
935 tq->lastspawnstop = jiffies;
936 if ((tq->tq_nthreads + tq->tq_nspawn < tq->tq_maxthreads) &&
937 (tq->tq_flags & TASKQ_ACTIVE)) {
938 spawning = (++tq->tq_nspawn);
939 taskq_dispatch(dynamic_taskq, taskq_thread_spawn_task,
940 tq, TQ_NOSLEEP);
941 }
942
943 return (spawning);
944 }
945
946 /*
947 * Threads in a dynamic taskq may exit once there is no more work to do.
948 * To prevent threads from being created and destroyed too often limit
949 * the exit rate to one per spl_taskq_thread_timeout_ms.
950 *
951 * The first thread in the thread list is treated as the primary thread.
952 * There is nothing special about the primary thread but in order to avoid
953 * all the taskq pids from changing we opt to make it long running.
954 */
955 static int
956 taskq_thread_should_stop(taskq_t *tq, taskq_thread_t *tqt)
957 {
958 ASSERT(!taskq_next_ent(tq));
959 if (!(tq->tq_flags & TASKQ_DYNAMIC) || !spl_taskq_thread_dynamic)
960 return (0);
961 if (!(tq->tq_flags & TASKQ_ACTIVE))
962 return (1);
963 if (list_first_entry(&(tq->tq_thread_list), taskq_thread_t,
964 tqt_thread_list) == tqt)
965 return (0);
966 ASSERT3U(tq->tq_nthreads, >, 1);
967 if (tq->tq_nspawn != 0)
968 return (0);
969 if (time_before(jiffies, tq->lastspawnstop +
970 msecs_to_jiffies(spl_taskq_thread_timeout_ms)))
971 return (0);
972 tq->lastspawnstop = jiffies;
973 return (1);
974 }
975
976 static int
977 taskq_thread(void *args)
978 {
979 DECLARE_WAITQUEUE(wait, current);
980 sigset_t blocked;
981 taskq_thread_t *tqt = args;
982 taskq_t *tq;
983 taskq_ent_t *t;
984 int seq_tasks = 0;
985 unsigned long flags;
986 taskq_ent_t dup_task = {};
987
988 ASSERT(tqt);
989 ASSERT(tqt->tqt_tq);
990 tq = tqt->tqt_tq;
991 current->flags |= PF_NOFREEZE;
992
993 (void) spl_fstrans_mark();
994
995 sigfillset(&blocked);
996 sigprocmask(SIG_BLOCK, &blocked, NULL);
997 flush_signals(current);
998
999 tsd_set(taskq_tsd, tq);
1000 spin_lock_irqsave_nested(&tq->tq_lock, flags, tq->tq_lock_class);
1001 /*
1002 * If we are dynamically spawned, decrease spawning count. Note that
1003 * we could be created during taskq_create, in which case we shouldn't
1004 * do the decrement. But it's fine because taskq_create will reset
1005 * tq_nspawn later.
1006 */
1007 if (tq->tq_flags & TASKQ_DYNAMIC)
1008 tq->tq_nspawn--;
1009
1010 /* Immediately exit if more threads than allowed were created. */
1011 if (tq->tq_nthreads >= tq->tq_maxthreads)
1012 goto error;
1013
1014 tq->tq_nthreads++;
1015 list_add_tail(&tqt->tqt_thread_list, &tq->tq_thread_list);
1016 wake_up(&tq->tq_wait_waitq);
1017 set_current_state(TASK_INTERRUPTIBLE);
1018
1019 TQSTAT_INC(tq, threads_total);
1020
1021 while (!kthread_should_stop()) {
1022
1023 if (list_empty(&tq->tq_pend_list) &&
1024 list_empty(&tq->tq_prio_list)) {
1025
1026 if (taskq_thread_should_stop(tq, tqt))
1027 break;
1028
1029 add_wait_queue_exclusive(&tq->tq_work_waitq, &wait);
1030 spin_unlock_irqrestore(&tq->tq_lock, flags);
1031
1032 TQSTAT_INC(tq, thread_sleeps);
1033 TQSTAT_INC(tq, threads_idle);
1034
1035 schedule();
1036 seq_tasks = 0;
1037
1038 TQSTAT_DEC(tq, threads_idle);
1039 TQSTAT_INC(tq, thread_wakeups);
1040
1041 spin_lock_irqsave_nested(&tq->tq_lock, flags,
1042 tq->tq_lock_class);
1043 remove_wait_queue(&tq->tq_work_waitq, &wait);
1044 } else {
1045 __set_current_state(TASK_RUNNING);
1046 }
1047
1048 if ((t = taskq_next_ent(tq)) != NULL) {
1049 list_del_init(&t->tqent_list);
1050 TQSTAT_DEC_LIST(tq, t);
1051 TQSTAT_DEC(tq, tasks_total);
1052
1053 /*
1054 * A TQENT_FLAG_PREALLOC task may be reused or freed
1055 * during the task function call. Store tqent_id and
1056 * tqent_flags here.
1057 *
1058 * Also use an on stack taskq_ent_t for tqt_task
1059 * assignment in this case; we want to make sure
1060 * to duplicate all fields, so the values are
1061 * correct when it's accessed via DTRACE_PROBE*.
1062 */
1063 tqt->tqt_id = t->tqent_id;
1064 tqt->tqt_flags = t->tqent_flags;
1065
1066 if (t->tqent_flags & TQENT_FLAG_PREALLOC) {
1067 dup_task = *t;
1068 t = &dup_task;
1069 }
1070 tqt->tqt_task = t;
1071
1072 taskq_insert_in_order(tq, tqt);
1073 tq->tq_nactive++;
1074 spin_unlock_irqrestore(&tq->tq_lock, flags);
1075
1076 TQSTAT_INC(tq, threads_active);
1077 DTRACE_PROBE1(taskq_ent__start, taskq_ent_t *, t);
1078
1079 /* Perform the requested task */
1080 t->tqent_func(t->tqent_arg);
1081
1082 DTRACE_PROBE1(taskq_ent__finish, taskq_ent_t *, t);
1083
1084 TQSTAT_DEC(tq, threads_active);
1085 if ((t->tqent_flags & TQENT_LIST_MASK) ==
1086 TQENT_LIST_PENDING)
1087 TQSTAT_INC(tq, tasks_executed_normal);
1088 else
1089 TQSTAT_INC(tq, tasks_executed_priority);
1090 TQSTAT_INC(tq, tasks_executed);
1091
1092 spin_lock_irqsave_nested(&tq->tq_lock, flags,
1093 tq->tq_lock_class);
1094
1095 tq->tq_nactive--;
1096 list_del_init(&tqt->tqt_active_list);
1097 tqt->tqt_task = NULL;
1098
1099 /* For prealloc'd tasks, we don't free anything. */
1100 if (!(tqt->tqt_flags & TQENT_FLAG_PREALLOC))
1101 task_done(tq, t);
1102
1103 /*
1104 * When the current lowest outstanding taskqid is
1105 * done calculate the new lowest outstanding id
1106 */
1107 if (tq->tq_lowest_id == tqt->tqt_id) {
1108 tq->tq_lowest_id = taskq_lowest_id(tq);
1109 ASSERT3S(tq->tq_lowest_id, >, tqt->tqt_id);
1110 }
1111
1112 /* Spawn additional taskq threads if required. */
1113 if ((++seq_tasks) > spl_taskq_thread_sequential &&
1114 taskq_thread_spawn(tq))
1115 seq_tasks = 0;
1116
1117 tqt->tqt_id = TASKQID_INVALID;
1118 tqt->tqt_flags = 0;
1119 wake_up_all(&tq->tq_wait_waitq);
1120 } else
1121 TQSTAT_INC(tq, thread_wakeups_nowork);
1122
1123 set_current_state(TASK_INTERRUPTIBLE);
1124
1125 }
1126
1127 __set_current_state(TASK_RUNNING);
1128 tq->tq_nthreads--;
1129 list_del_init(&tqt->tqt_thread_list);
1130
1131 TQSTAT_DEC(tq, threads_total);
1132 TQSTAT_INC(tq, threads_destroyed);
1133
1134 error:
1135 kmem_free(tqt, sizeof (taskq_thread_t));
1136 spin_unlock_irqrestore(&tq->tq_lock, flags);
1137
1138 tsd_set(taskq_tsd, NULL);
1139 thread_exit();
1140
1141 return (0);
1142 }
1143
1144 static taskq_thread_t *
1145 taskq_thread_create(taskq_t *tq)
1146 {
1147 static int last_used_cpu = 0;
1148 taskq_thread_t *tqt;
1149
1150 tqt = kmem_alloc(sizeof (*tqt), KM_PUSHPAGE);
1151 INIT_LIST_HEAD(&tqt->tqt_thread_list);
1152 INIT_LIST_HEAD(&tqt->tqt_active_list);
1153 tqt->tqt_tq = tq;
1154 tqt->tqt_id = TASKQID_INVALID;
1155
1156 tqt->tqt_thread = spl_kthread_create(taskq_thread, tqt,
1157 "%s", tq->tq_name);
1158 if (tqt->tqt_thread == NULL) {
1159 kmem_free(tqt, sizeof (taskq_thread_t));
1160 return (NULL);
1161 }
1162
1163 if (spl_taskq_thread_bind) {
1164 last_used_cpu = (last_used_cpu + 1) % num_online_cpus();
1165 kthread_bind(tqt->tqt_thread, last_used_cpu);
1166 }
1167
1168 if (spl_taskq_thread_priority)
1169 set_user_nice(tqt->tqt_thread, PRIO_TO_NICE(tq->tq_pri));
1170
1171 wake_up_process(tqt->tqt_thread);
1172
1173 TQSTAT_INC(tq, threads_created);
1174
1175 return (tqt);
1176 }
1177
1178 static void
1179 taskq_stats_init(taskq_t *tq)
1180 {
1181 taskq_sums_t *tqs = &tq->tq_sums;
1182 wmsum_init(&tqs->tqs_threads_active, 0);
1183 wmsum_init(&tqs->tqs_threads_idle, 0);
1184 wmsum_init(&tqs->tqs_threads_total, 0);
1185 wmsum_init(&tqs->tqs_tasks_pending, 0);
1186 wmsum_init(&tqs->tqs_tasks_priority, 0);
1187 wmsum_init(&tqs->tqs_tasks_total, 0);
1188 wmsum_init(&tqs->tqs_tasks_delayed, 0);
1189 wmsum_init(&tqs->tqs_entries_free, 0);
1190 wmsum_init(&tqs->tqs_threads_created, 0);
1191 wmsum_init(&tqs->tqs_threads_destroyed, 0);
1192 wmsum_init(&tqs->tqs_tasks_dispatched, 0);
1193 wmsum_init(&tqs->tqs_tasks_dispatched_delayed, 0);
1194 wmsum_init(&tqs->tqs_tasks_executed_normal, 0);
1195 wmsum_init(&tqs->tqs_tasks_executed_priority, 0);
1196 wmsum_init(&tqs->tqs_tasks_executed, 0);
1197 wmsum_init(&tqs->tqs_tasks_delayed_requeued, 0);
1198 wmsum_init(&tqs->tqs_tasks_cancelled, 0);
1199 wmsum_init(&tqs->tqs_thread_wakeups, 0);
1200 wmsum_init(&tqs->tqs_thread_wakeups_nowork, 0);
1201 wmsum_init(&tqs->tqs_thread_sleeps, 0);
1202 }
1203
1204 static void
1205 taskq_stats_fini(taskq_t *tq)
1206 {
1207 taskq_sums_t *tqs = &tq->tq_sums;
1208 wmsum_fini(&tqs->tqs_threads_active);
1209 wmsum_fini(&tqs->tqs_threads_idle);
1210 wmsum_fini(&tqs->tqs_threads_total);
1211 wmsum_fini(&tqs->tqs_tasks_pending);
1212 wmsum_fini(&tqs->tqs_tasks_priority);
1213 wmsum_fini(&tqs->tqs_tasks_total);
1214 wmsum_fini(&tqs->tqs_tasks_delayed);
1215 wmsum_fini(&tqs->tqs_entries_free);
1216 wmsum_fini(&tqs->tqs_threads_created);
1217 wmsum_fini(&tqs->tqs_threads_destroyed);
1218 wmsum_fini(&tqs->tqs_tasks_dispatched);
1219 wmsum_fini(&tqs->tqs_tasks_dispatched_delayed);
1220 wmsum_fini(&tqs->tqs_tasks_executed_normal);
1221 wmsum_fini(&tqs->tqs_tasks_executed_priority);
1222 wmsum_fini(&tqs->tqs_tasks_executed);
1223 wmsum_fini(&tqs->tqs_tasks_delayed_requeued);
1224 wmsum_fini(&tqs->tqs_tasks_cancelled);
1225 wmsum_fini(&tqs->tqs_thread_wakeups);
1226 wmsum_fini(&tqs->tqs_thread_wakeups_nowork);
1227 wmsum_fini(&tqs->tqs_thread_sleeps);
1228 }
1229
1230 static int
1231 taskq_kstats_update(kstat_t *ksp, int rw)
1232 {
1233 if (rw == KSTAT_WRITE)
1234 return (EACCES);
1235
1236 taskq_t *tq = ksp->ks_private;
1237 taskq_kstats_t *tqks = ksp->ks_data;
1238
1239 tqks->tqks_threads_max.value.ui64 = tq->tq_maxthreads;
1240 tqks->tqks_entry_pool_min.value.ui64 = tq->tq_minalloc;
1241 tqks->tqks_entry_pool_max.value.ui64 = tq->tq_maxalloc;
1242
1243 taskq_sums_t *tqs = &tq->tq_sums;
1244
1245 tqks->tqks_threads_active.value.ui64 =
1246 wmsum_value(&tqs->tqs_threads_active);
1247 tqks->tqks_threads_idle.value.ui64 =
1248 wmsum_value(&tqs->tqs_threads_idle);
1249 tqks->tqks_threads_total.value.ui64 =
1250 wmsum_value(&tqs->tqs_threads_total);
1251 tqks->tqks_tasks_pending.value.ui64 =
1252 wmsum_value(&tqs->tqs_tasks_pending);
1253 tqks->tqks_tasks_priority.value.ui64 =
1254 wmsum_value(&tqs->tqs_tasks_priority);
1255 tqks->tqks_tasks_total.value.ui64 =
1256 wmsum_value(&tqs->tqs_tasks_total);
1257 tqks->tqks_tasks_delayed.value.ui64 =
1258 wmsum_value(&tqs->tqs_tasks_delayed);
1259 tqks->tqks_entries_free.value.ui64 =
1260 wmsum_value(&tqs->tqs_entries_free);
1261 tqks->tqks_threads_created.value.ui64 =
1262 wmsum_value(&tqs->tqs_threads_created);
1263 tqks->tqks_threads_destroyed.value.ui64 =
1264 wmsum_value(&tqs->tqs_threads_destroyed);
1265 tqks->tqks_tasks_dispatched.value.ui64 =
1266 wmsum_value(&tqs->tqs_tasks_dispatched);
1267 tqks->tqks_tasks_dispatched_delayed.value.ui64 =
1268 wmsum_value(&tqs->tqs_tasks_dispatched_delayed);
1269 tqks->tqks_tasks_executed_normal.value.ui64 =
1270 wmsum_value(&tqs->tqs_tasks_executed_normal);
1271 tqks->tqks_tasks_executed_priority.value.ui64 =
1272 wmsum_value(&tqs->tqs_tasks_executed_priority);
1273 tqks->tqks_tasks_executed.value.ui64 =
1274 wmsum_value(&tqs->tqs_tasks_executed);
1275 tqks->tqks_tasks_delayed_requeued.value.ui64 =
1276 wmsum_value(&tqs->tqs_tasks_delayed_requeued);
1277 tqks->tqks_tasks_cancelled.value.ui64 =
1278 wmsum_value(&tqs->tqs_tasks_cancelled);
1279 tqks->tqks_thread_wakeups.value.ui64 =
1280 wmsum_value(&tqs->tqs_thread_wakeups);
1281 tqks->tqks_thread_wakeups_nowork.value.ui64 =
1282 wmsum_value(&tqs->tqs_thread_wakeups_nowork);
1283 tqks->tqks_thread_sleeps.value.ui64 =
1284 wmsum_value(&tqs->tqs_thread_sleeps);
1285
1286 return (0);
1287 }
1288
1289 static void
1290 taskq_kstats_init(taskq_t *tq)
1291 {
1292 char name[TASKQ_NAMELEN+5]; /* 5 for dot, 3x instance digits, null */
1293 snprintf(name, sizeof (name), "%s.%d", tq->tq_name, tq->tq_instance);
1294
1295 kstat_t *ksp = kstat_create("taskq", 0, name, "misc",
1296 KSTAT_TYPE_NAMED, sizeof (taskq_kstats_t) / sizeof (kstat_named_t),
1297 KSTAT_FLAG_VIRTUAL);
1298
1299 if (ksp == NULL)
1300 return;
1301
1302 ksp->ks_private = tq;
1303 ksp->ks_update = taskq_kstats_update;
1304 ksp->ks_data = kmem_alloc(sizeof (taskq_kstats_t), KM_SLEEP);
1305 memcpy(ksp->ks_data, &taskq_kstats_template, sizeof (taskq_kstats_t));
1306 kstat_install(ksp);
1307
1308 tq->tq_ksp = ksp;
1309 }
1310
1311 static void
1312 taskq_kstats_fini(taskq_t *tq)
1313 {
1314 if (tq->tq_ksp == NULL)
1315 return;
1316
1317 kmem_free(tq->tq_ksp->ks_data, sizeof (taskq_kstats_t));
1318 kstat_delete(tq->tq_ksp);
1319
1320 tq->tq_ksp = NULL;
1321 }
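
/*
 * The per-taskq kstat registered above is exposed by the SPL kstat layer;
 * on a typical system it can be read from procfs, e.g. (taskq name and
 * instance are examples):
 *
 *	cat /proc/spl/kstat/taskq/z_wr_iss.0
 *
 * The system-wide summary installed by spl_taskq_kstat_init() further down
 * appears as /proc/spl/kstat/taskq/summary.
 */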
1322
1323 taskq_t *
1324 taskq_create(const char *name, int threads_arg, pri_t pri,
1325 int minalloc, int maxalloc, uint_t flags)
1326 {
1327 taskq_t *tq;
1328 taskq_thread_t *tqt;
1329 int count = 0, rc = 0, i;
1330 unsigned long irqflags;
1331 int nthreads = threads_arg;
1332
1333 ASSERT(name != NULL);
1334 ASSERT(minalloc >= 0);
1335 ASSERT(!(flags & (TASKQ_CPR_SAFE))); /* Unsupported */
1336
1337 /* Scale the number of threads using nthreads as a percentage */
1338 if (flags & TASKQ_THREADS_CPU_PCT) {
1339 ASSERT(nthreads <= 100);
1340 ASSERT(nthreads >= 0);
1341 nthreads = MIN(threads_arg, 100);
1342 nthreads = MAX(nthreads, 0);
1343 nthreads = MAX((num_online_cpus() * nthreads) /100, 1);
1344 }
1345
1346 tq = kmem_alloc(sizeof (*tq), KM_PUSHPAGE);
1347 if (tq == NULL)
1348 return (NULL);
1349
1350 tq->tq_hp_support = B_FALSE;
1351
1352 if (flags & TASKQ_THREADS_CPU_PCT) {
1353 tq->tq_hp_support = B_TRUE;
1354 if (cpuhp_state_add_instance_nocalls(spl_taskq_cpuhp_state,
1355 &tq->tq_hp_cb_node) != 0) {
1356 kmem_free(tq, sizeof (*tq));
1357 return (NULL);
1358 }
1359 }
1360
1361 spin_lock_init(&tq->tq_lock);
1362 INIT_LIST_HEAD(&tq->tq_thread_list);
1363 INIT_LIST_HEAD(&tq->tq_active_list);
1364 tq->tq_name = kmem_strdup(name);
1365 tq->tq_nactive = 0;
1366 tq->tq_nthreads = 0;
1367 tq->tq_nspawn = 0;
1368 tq->tq_maxthreads = nthreads;
1369 tq->tq_cpu_pct = threads_arg;
1370 tq->tq_pri = pri;
1371 tq->tq_minalloc = minalloc;
1372 tq->tq_maxalloc = maxalloc;
1373 tq->tq_nalloc = 0;
1374 tq->tq_flags = (flags | TASKQ_ACTIVE);
1375 tq->tq_next_id = TASKQID_INITIAL;
1376 tq->tq_lowest_id = TASKQID_INITIAL;
1377 tq->lastspawnstop = jiffies;
1378 INIT_LIST_HEAD(&tq->tq_free_list);
1379 INIT_LIST_HEAD(&tq->tq_pend_list);
1380 INIT_LIST_HEAD(&tq->tq_prio_list);
1381 INIT_LIST_HEAD(&tq->tq_delay_list);
1382 init_waitqueue_head(&tq->tq_work_waitq);
1383 init_waitqueue_head(&tq->tq_wait_waitq);
1384 tq->tq_lock_class = TQ_LOCK_GENERAL;
1385 INIT_LIST_HEAD(&tq->tq_taskqs);
1386 taskq_stats_init(tq);
1387
1388 if (flags & TASKQ_PREPOPULATE) {
1389 spin_lock_irqsave_nested(&tq->tq_lock, irqflags,
1390 tq->tq_lock_class);
1391
1392 for (i = 0; i < minalloc; i++)
1393 task_done(tq, task_alloc(tq, TQ_PUSHPAGE | TQ_NEW,
1394 &irqflags));
1395
1396 spin_unlock_irqrestore(&tq->tq_lock, irqflags);
1397 }
1398
1399 if ((flags & TASKQ_DYNAMIC) && spl_taskq_thread_dynamic)
1400 nthreads = 1;
1401
1402 for (i = 0; i < nthreads; i++) {
1403 tqt = taskq_thread_create(tq);
1404 if (tqt == NULL)
1405 rc = 1;
1406 else
1407 count++;
1408 }
1409
1410 /* Wait for all threads to be started before potential destroy */
1411 wait_event(tq->tq_wait_waitq, tq->tq_nthreads == count);
1412 /*
1413 * taskq_thread() might have touched tq_nspawn, but these threads are not
1414 * dynamically spawned, so we reset it to 0 here.
1415 */
1416 tq->tq_nspawn = 0;
1417
1418 if (rc) {
1419 taskq_destroy(tq);
1420 return (NULL);
1421 }
1422
1423 down_write(&tq_list_sem);
1424 tq->tq_instance = taskq_find_by_name(name) + 1;
1425 list_add_tail(&tq->tq_taskqs, &tq_list);
1426 up_write(&tq_list_sem);
1427
1428 /* Install kstats late, because the name includes tq_instance */
1429 taskq_kstats_init(tq);
1430
1431 return (tq);
1432 }
1433 EXPORT_SYMBOL(taskq_create);
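
/*
 * Sketch: creating a private queue sized as a percentage of the online
 * CPUs ("my_taskq" and the 75% figure are arbitrary):
 *
 *	taskq_t *tq = taskq_create("my_taskq", 75, maxclsyspri,
 *	    50, INT_MAX, TASKQ_PREPOPULATE | TASKQ_THREADS_CPU_PCT);
 *
 * With TASKQ_THREADS_CPU_PCT the second argument is a percentage (here 75%
 * of num_online_cpus(), minimum one thread) that the CPU hotplug callbacks
 * below recalculate as CPUs come and go; without it the argument is an
 * absolute thread count.
 */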
1434
1435 void
1436 taskq_destroy(taskq_t *tq)
1437 {
1438 struct task_struct *thread;
1439 taskq_thread_t *tqt;
1440 taskq_ent_t *t;
1441 unsigned long flags;
1442
1443 ASSERT(tq);
1444 spin_lock_irqsave_nested(&tq->tq_lock, flags, tq->tq_lock_class);
1445 tq->tq_flags &= ~TASKQ_ACTIVE;
1446 spin_unlock_irqrestore(&tq->tq_lock, flags);
1447
1448 if (tq->tq_hp_support) {
1449 VERIFY0(cpuhp_state_remove_instance_nocalls(
1450 spl_taskq_cpuhp_state, &tq->tq_hp_cb_node));
1451 }
1452
1453 /*
1454 * When TASKQ_ACTIVE is clear, new tasks may not be added and new
1455 * worker threads may not be spawned for dynamic taskqs.
1456 */
1457 if (dynamic_taskq != NULL)
1458 taskq_wait_outstanding(dynamic_taskq, 0);
1459
1460 taskq_wait(tq);
1461
1462 taskq_kstats_fini(tq);
1463
1464 /* remove taskq from global list used by the kstats */
1465 down_write(&tq_list_sem);
1466 list_del(&tq->tq_taskqs);
1467 up_write(&tq_list_sem);
1468
1469 spin_lock_irqsave_nested(&tq->tq_lock, flags, tq->tq_lock_class);
1470 /* wait for spawning threads to insert themselves to the list */
1471 while (tq->tq_nspawn) {
1472 spin_unlock_irqrestore(&tq->tq_lock, flags);
1473 schedule_timeout_interruptible(1);
1474 spin_lock_irqsave_nested(&tq->tq_lock, flags,
1475 tq->tq_lock_class);
1476 }
1477
1478 /*
1479 * Signal each thread to exit and block until it does. Each thread
1480 * is responsible for removing itself from the list and freeing its
1481 * taskq_thread_t. This allows for idle threads to opt to remove
1482 * themselves from the taskq. They can be recreated as needed.
1483 */
1484 while (!list_empty(&tq->tq_thread_list)) {
1485 tqt = list_entry(tq->tq_thread_list.next,
1486 taskq_thread_t, tqt_thread_list);
1487 thread = tqt->tqt_thread;
1488 spin_unlock_irqrestore(&tq->tq_lock, flags);
1489
1490 kthread_stop(thread);
1491
1492 spin_lock_irqsave_nested(&tq->tq_lock, flags,
1493 tq->tq_lock_class);
1494 }
1495
1496 while (!list_empty(&tq->tq_free_list)) {
1497 t = list_entry(tq->tq_free_list.next, taskq_ent_t, tqent_list);
1498
1499 ASSERT(!(t->tqent_flags & TQENT_FLAG_PREALLOC));
1500
1501 list_del_init(&t->tqent_list);
1502 task_free(tq, t);
1503 }
1504
1505 ASSERT0(tq->tq_nthreads);
1506 ASSERT0(tq->tq_nalloc);
1507 ASSERT0(tq->tq_nspawn);
1508 ASSERT(list_empty(&tq->tq_thread_list));
1509 ASSERT(list_empty(&tq->tq_active_list));
1510 ASSERT(list_empty(&tq->tq_free_list));
1511 ASSERT(list_empty(&tq->tq_pend_list));
1512 ASSERT(list_empty(&tq->tq_prio_list));
1513 ASSERT(list_empty(&tq->tq_delay_list));
1514
1515 spin_unlock_irqrestore(&tq->tq_lock, flags);
1516
1517 taskq_stats_fini(tq);
1518 kmem_strfree(tq->tq_name);
1519 kmem_free(tq, sizeof (taskq_t));
1520 }
1521 EXPORT_SYMBOL(taskq_destroy);
1522
1523 /*
1524 * Create a taskq with a specified number of pool threads. Allocate
1525 * and return an array of nthreads kthread_t pointers, one for each
1526 * thread in the pool. The array is not ordered and must be freed
1527 * by the caller.
1528 */
1529 taskq_t *
1530 taskq_create_synced(const char *name, int nthreads, pri_t pri,
1531 int minalloc, int maxalloc, uint_t flags, kthread_t ***ktpp)
1532 {
1533 taskq_t *tq;
1534 taskq_thread_t *tqt;
1535 int i = 0;
1536 kthread_t **kthreads = kmem_zalloc(sizeof (*kthreads) * nthreads,
1537 KM_SLEEP);
1538
1539 flags &= ~(TASKQ_DYNAMIC | TASKQ_THREADS_CPU_PCT | TASKQ_DC_BATCH);
1540
1541 /* taskq_create spawns all the threads before returning */
1542 tq = taskq_create(name, nthreads, minclsyspri, nthreads, INT_MAX,
1543 flags | TASKQ_PREPOPULATE);
1544 VERIFY(tq != NULL);
1545 VERIFY(tq->tq_nthreads == nthreads);
1546
1547 list_for_each_entry(tqt, &tq->tq_thread_list, tqt_thread_list) {
1548 kthreads[i] = tqt->tqt_thread;
1549 i++;
1550 }
1551
1552 ASSERT3S(i, ==, nthreads);
1553 *ktpp = kthreads;
1554
1555 return (tq);
1556 }
1557 EXPORT_SYMBOL(taskq_create_synced);
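
/*
 * Sketch: callers that need the pool's kthread_t pointers (for example to
 * set affinities) use the synced variant; nthreads and my_setup() are
 * hypothetical:
 *
 *	kthread_t **threads;
 *	taskq_t *tq = taskq_create_synced("my_pool", nthreads, minclsyspri,
 *	    nthreads, INT_MAX, 0, &threads);
 *	for (int i = 0; i < nthreads; i++)
 *		my_setup(threads[i]);
 *	kmem_free(threads, sizeof (*threads) * nthreads);
 */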
1558
1559 static kstat_t *taskq_summary_ksp = NULL;
1560
1561 static int
1562 spl_taskq_kstat_headers(char *buf, size_t size)
1563 {
1564 size_t n = snprintf(buf, size,
1565 "%-20s | %-17s | %-23s\n"
1566 "%-20s | %-17s | %-23s\n"
1567 "%-20s | %-17s | %-23s\n",
1568 "", "threads", "tasks on queue",
1569 "taskq name", "tot [act idl] max", " pend [ norm high] dly",
1570 "--------------------", "-----------------",
1571 "-----------------------");
1572 return (n >= size ? ENOMEM : 0);
1573 }
1574
1575 static int
1576 spl_taskq_kstat_data(char *buf, size_t size, void *data)
1577 {
1578 struct list_head *tql = NULL;
1579 taskq_t *tq;
1580 char name[TASKQ_NAMELEN+5]; /* 5 for dot, 3x instance digits, null */
1581 char threads[25];
1582 char tasks[30];
1583 size_t n;
1584 int err = 0;
1585
1586 down_read(&tq_list_sem);
1587 list_for_each_prev(tql, &tq_list) {
1588 tq = list_entry(tql, taskq_t, tq_taskqs);
1589
1590 mutex_enter(tq->tq_ksp->ks_lock);
1591 taskq_kstats_update(tq->tq_ksp, KSTAT_READ);
1592 taskq_kstats_t *tqks = tq->tq_ksp->ks_data;
1593
1594 snprintf(name, sizeof (name), "%s.%d", tq->tq_name,
1595 tq->tq_instance);
1596 snprintf(threads, sizeof (threads), "%3llu [%3llu %3llu] %3llu",
1597 tqks->tqks_threads_total.value.ui64,
1598 tqks->tqks_threads_active.value.ui64,
1599 tqks->tqks_threads_idle.value.ui64,
1600 tqks->tqks_threads_max.value.ui64);
1601 snprintf(tasks, sizeof (tasks), "%5llu [%5llu %5llu] %3llu",
1602 tqks->tqks_tasks_total.value.ui64,
1603 tqks->tqks_tasks_pending.value.ui64,
1604 tqks->tqks_tasks_priority.value.ui64,
1605 tqks->tqks_tasks_delayed.value.ui64);
1606
1607 mutex_exit(tq->tq_ksp->ks_lock);
1608
1609 n = snprintf(buf, size, "%-20s | %-17s | %-23s\n",
1610 name, threads, tasks);
1611 if (n >= size) {
1612 err = ENOMEM;
1613 break;
1614 }
1615
1616 buf = &buf[n];
1617 size -= n;
1618 }
1619
1620 up_read(&tq_list_sem);
1621
1622 return (err);
1623 }
1624
1625 static void
1626 spl_taskq_kstat_init(void)
1627 {
1628 kstat_t *ksp = kstat_create("taskq", 0, "summary", "misc",
1629 KSTAT_TYPE_RAW, 0, KSTAT_FLAG_VIRTUAL);
1630
1631 if (ksp == NULL)
1632 return;
1633
1634 ksp->ks_data = (void *)(uintptr_t)1;
1635 ksp->ks_ndata = 1;
1636 kstat_set_raw_ops(ksp, spl_taskq_kstat_headers,
1637 spl_taskq_kstat_data, NULL);
1638 kstat_install(ksp);
1639
1640 taskq_summary_ksp = ksp;
1641 }
1642
1643 static void
1644 spl_taskq_kstat_fini(void)
1645 {
1646 if (taskq_summary_ksp == NULL)
1647 return;
1648
1649 kstat_delete(taskq_summary_ksp);
1650 taskq_summary_ksp = NULL;
1651 }
1652
1653 static unsigned int spl_taskq_kick = 0;
1654
1655 static int
1656 param_set_taskq_kick(const char *val, zfs_kernel_param_t *kp)
1657 {
1658 int ret;
1659 taskq_t *tq = NULL;
1660 taskq_ent_t *t;
1661 unsigned long flags;
1662
1663 ret = param_set_uint(val, kp);
1664 if (ret < 0 || !spl_taskq_kick)
1665 return (ret);
1666 /* reset value */
1667 spl_taskq_kick = 0;
1668
1669 down_read(&tq_list_sem);
1670 list_for_each_entry(tq, &tq_list, tq_taskqs) {
1671 spin_lock_irqsave_nested(&tq->tq_lock, flags,
1672 tq->tq_lock_class);
1673 /* Check if the first pending task is older than 5 seconds */
1674 t = taskq_next_ent(tq);
1675 if (t && time_after(jiffies, t->tqent_birth + 5*HZ)) {
1676 (void) taskq_thread_spawn(tq);
1677 printk(KERN_INFO "spl: Kicked taskq %s/%d\n",
1678 tq->tq_name, tq->tq_instance);
1679 }
1680 spin_unlock_irqrestore(&tq->tq_lock, flags);
1681 }
1682 up_read(&tq_list_sem);
1683 return (ret);
1684 }
1685
1686 module_param_call(spl_taskq_kick, param_set_taskq_kick, param_get_uint,
1687 &spl_taskq_kick, 0644);
1688 MODULE_PARM_DESC(spl_taskq_kick,
1689 "Write nonzero to kick stuck taskqs to spawn more threads");
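
/*
 * Sketch: an administrator who suspects a taskq is stuck behind a blocked
 * thread can trigger the spawn path from userspace via the module
 * parameter registered above (path assuming the usual sysfs layout):
 *
 *	echo 1 > /sys/module/spl/parameters/spl_taskq_kick
 *
 * Any taskq whose oldest pending task has waited more than five seconds
 * gets an extra thread spawned and a message logged.
 */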
1690
1691 /*
1692 * This callback will be called exactly once for each core that comes online,
1693 * for each dynamic taskq. We attempt to expand taskqs that have
1694 * TASKQ_THREADS_CPU_PCT set. We need to redo the percentage calculation every
1695 * time, to correctly determine whether or not to add a thread.
1696 */
1697 static int
1698 spl_taskq_expand(unsigned int cpu, struct hlist_node *node)
1699 {
1700 taskq_t *tq = list_entry(node, taskq_t, tq_hp_cb_node);
1701 unsigned long flags;
1702 int err = 0;
1703
1704 ASSERT(tq);
1705 spin_lock_irqsave_nested(&tq->tq_lock, flags, tq->tq_lock_class);
1706
1707 if (!(tq->tq_flags & TASKQ_ACTIVE)) {
1708 spin_unlock_irqrestore(&tq->tq_lock, flags);
1709 return (err);
1710 }
1711
1712 ASSERT(tq->tq_flags & TASKQ_THREADS_CPU_PCT);
1713 int nthreads = MIN(tq->tq_cpu_pct, 100);
1714 nthreads = MAX(((num_online_cpus() + 1) * nthreads) / 100, 1);
1715 tq->tq_maxthreads = nthreads;
1716
1717 if (!((tq->tq_flags & TASKQ_DYNAMIC) && spl_taskq_thread_dynamic) &&
1718 tq->tq_maxthreads > tq->tq_nthreads) {
1719 spin_unlock_irqrestore(&tq->tq_lock, flags);
1720 taskq_thread_t *tqt = taskq_thread_create(tq);
1721 if (tqt == NULL)
1722 err = -1;
1723 return (err);
1724 }
1725 spin_unlock_irqrestore(&tq->tq_lock, flags);
1726 return (err);
1727 }
1728
1729 /*
1730 * While we don't support offlining CPUs, it is possible that CPUs will fail
1731 * to online successfully. We do need to be able to handle this case
1732 * gracefully.
1733 */
1734 static int
1735 spl_taskq_prepare_down(unsigned int cpu, struct hlist_node *node)
1736 {
1737 taskq_t *tq = list_entry(node, taskq_t, tq_hp_cb_node);
1738 unsigned long flags;
1739
1740 ASSERT(tq);
1741 spin_lock_irqsave_nested(&tq->tq_lock, flags, tq->tq_lock_class);
1742
1743 if (!(tq->tq_flags & TASKQ_ACTIVE))
1744 goto out;
1745
1746 ASSERT(tq->tq_flags & TASKQ_THREADS_CPU_PCT);
1747 int nthreads = MIN(tq->tq_cpu_pct, 100);
1748 nthreads = MAX(((num_online_cpus()) * nthreads) / 100, 1);
1749 tq->tq_maxthreads = nthreads;
1750
1751 if (!((tq->tq_flags & TASKQ_DYNAMIC) && spl_taskq_thread_dynamic) &&
1752 tq->tq_maxthreads < tq->tq_nthreads) {
1753 ASSERT3U(tq->tq_maxthreads, ==, tq->tq_nthreads - 1);
1754 taskq_thread_t *tqt = list_entry(tq->tq_thread_list.next,
1755 taskq_thread_t, tqt_thread_list);
1756 struct task_struct *thread = tqt->tqt_thread;
1757 spin_unlock_irqrestore(&tq->tq_lock, flags);
1758
1759 kthread_stop(thread);
1760
1761 return (0);
1762 }
1763
1764 out:
1765 spin_unlock_irqrestore(&tq->tq_lock, flags);
1766 return (0);
1767 }
1768
1769 int
1770 spl_taskq_init(void)
1771 {
1772 init_rwsem(&tq_list_sem);
1773 tsd_create(&taskq_tsd, NULL);
1774
1775 spl_taskq_cpuhp_state = cpuhp_setup_state_multi(CPUHP_AP_ONLINE_DYN,
1776 "fs/spl_taskq:online", spl_taskq_expand, spl_taskq_prepare_down);
1777
1778 system_taskq = taskq_create("spl_system_taskq", MAX(boot_ncpus, 64),
1779 maxclsyspri, boot_ncpus, INT_MAX, TASKQ_PREPOPULATE|TASKQ_DYNAMIC);
1780 if (system_taskq == NULL)
1781 return (-ENOMEM);
1782
1783 system_delay_taskq = taskq_create("spl_delay_taskq", MAX(boot_ncpus, 4),
1784 maxclsyspri, boot_ncpus, INT_MAX, TASKQ_PREPOPULATE|TASKQ_DYNAMIC);
1785 if (system_delay_taskq == NULL) {
1786 cpuhp_remove_multi_state(spl_taskq_cpuhp_state);
1787 taskq_destroy(system_taskq);
1788 return (-ENOMEM);
1789 }
1790
1791 dynamic_taskq = taskq_create("spl_dynamic_taskq", 1,
1792 maxclsyspri, boot_ncpus, INT_MAX, TASKQ_PREPOPULATE);
1793 if (dynamic_taskq == NULL) {
1794 cpuhp_remove_multi_state(spl_taskq_cpuhp_state);
1795 taskq_destroy(system_taskq);
1796 taskq_destroy(system_delay_taskq);
1797 return (-ENOMEM);
1798 }
1799
1800 /*
1801 * This is used to annotate tq_lock, so
1802 * taskq_dispatch -> taskq_thread_spawn -> taskq_dispatch
1803 * does not trigger a lockdep warning re: possible recursive locking
1804 */
1805 dynamic_taskq->tq_lock_class = TQ_LOCK_DYNAMIC;
1806
1807 spl_taskq_kstat_init();
1808
1809 return (0);
1810 }
1811
1812 void
1813 spl_taskq_fini(void)
1814 {
1815 spl_taskq_kstat_fini();
1816
1817 taskq_destroy(dynamic_taskq);
1818 dynamic_taskq = NULL;
1819
1820 taskq_destroy(system_delay_taskq);
1821 system_delay_taskq = NULL;
1822
1823 taskq_destroy(system_taskq);
1824 system_taskq = NULL;
1825
1826 tsd_destroy(&taskq_tsd);
1827
1828 cpuhp_remove_multi_state(spl_taskq_cpuhp_state);
1829 spl_taskq_cpuhp_state = 0;
1830 }
1831