1 /*
2 * Copyright (C) 2007-2010 Lawrence Livermore National Security, LLC.
3 * Copyright (C) 2007 The Regents of the University of California.
4 * Produced at Lawrence Livermore National Laboratory (cf, DISCLAIMER).
5 * Written by Brian Behlendorf <behlendorf1@llnl.gov>.
6 * UCRL-CODE-235197
7 *
8 * This file is part of the SPL, Solaris Porting Layer.
9 *
10 * The SPL is free software; you can redistribute it and/or modify it
11 * under the terms of the GNU General Public License as published by the
12 * Free Software Foundation; either version 2 of the License, or (at your
13 * option) any later version.
14 *
15 * The SPL is distributed in the hope that it will be useful, but WITHOUT
16 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
17 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
18 * for more details.
19 *
20 * You should have received a copy of the GNU General Public License along
21 * with the SPL. If not, see <http://www.gnu.org/licenses/>.
22 *
23 * Solaris Porting Layer (SPL) Task Queue Implementation.
24 */
25 /*
26 * Copyright (c) 2024, Klara Inc.
27 * Copyright (c) 2024, Syneto
28 */
29
30 #include <sys/timer.h>
31 #include <sys/taskq.h>
32 #include <sys/kmem.h>
33 #include <sys/tsd.h>
34 #include <sys/trace_spl.h>
35 #include <sys/time.h>
36 #include <sys/atomic.h>
37 #include <sys/kstat.h>
38 #include <linux/cpuhotplug.h>
39
40 typedef struct taskq_kstats {
41 /* static values, for completeness */
42 kstat_named_t tqks_threads_max;
43 kstat_named_t tqks_entry_pool_min;
44 kstat_named_t tqks_entry_pool_max;
45
46 /* gauges (inc/dec counters, current value) */
47 kstat_named_t tqks_threads_active;
48 kstat_named_t tqks_threads_idle;
49 kstat_named_t tqks_threads_total;
50 kstat_named_t tqks_tasks_pending;
51 kstat_named_t tqks_tasks_priority;
52 kstat_named_t tqks_tasks_total;
53 kstat_named_t tqks_tasks_delayed;
54 kstat_named_t tqks_entries_free;
55
56 /* counters (inc only, since taskq creation) */
57 kstat_named_t tqks_threads_created;
58 kstat_named_t tqks_threads_destroyed;
59 kstat_named_t tqks_tasks_dispatched;
60 kstat_named_t tqks_tasks_dispatched_delayed;
61 kstat_named_t tqks_tasks_executed_normal;
62 kstat_named_t tqks_tasks_executed_priority;
63 kstat_named_t tqks_tasks_executed;
64 kstat_named_t tqks_tasks_delayed_requeued;
65 kstat_named_t tqks_tasks_cancelled;
66 kstat_named_t tqks_thread_wakeups;
67 kstat_named_t tqks_thread_wakeups_nowork;
68 kstat_named_t tqks_thread_sleeps;
69 } taskq_kstats_t;
70
71 static taskq_kstats_t taskq_kstats_template = {
72 { "threads_max", KSTAT_DATA_UINT64 },
73 { "entry_pool_min", KSTAT_DATA_UINT64 },
74 { "entry_pool_max", KSTAT_DATA_UINT64 },
75 { "threads_active", KSTAT_DATA_UINT64 },
76 { "threads_idle", KSTAT_DATA_UINT64 },
77 { "threads_total", KSTAT_DATA_UINT64 },
78 { "tasks_pending", KSTAT_DATA_UINT64 },
79 { "tasks_priority", KSTAT_DATA_UINT64 },
80 { "tasks_total", KSTAT_DATA_UINT64 },
81 { "tasks_delayed", KSTAT_DATA_UINT64 },
82 { "entries_free", KSTAT_DATA_UINT64 },
83
84 { "threads_created", KSTAT_DATA_UINT64 },
85 { "threads_destroyed", KSTAT_DATA_UINT64 },
86 { "tasks_dispatched", KSTAT_DATA_UINT64 },
87 { "tasks_dispatched_delayed", KSTAT_DATA_UINT64 },
88 { "tasks_executed_normal", KSTAT_DATA_UINT64 },
89 { "tasks_executed_priority", KSTAT_DATA_UINT64 },
90 { "tasks_executed", KSTAT_DATA_UINT64 },
91 { "tasks_delayed_requeued", KSTAT_DATA_UINT64 },
92 { "tasks_cancelled", KSTAT_DATA_UINT64 },
93 { "thread_wakeups", KSTAT_DATA_UINT64 },
94 { "thread_wakeups_nowork", KSTAT_DATA_UINT64 },
95 { "thread_sleeps", KSTAT_DATA_UINT64 },
96 };
97
98 #define TQSTAT_INC(tq, stat) wmsum_add(&tq->tq_sums.tqs_##stat, 1)
99 #define TQSTAT_DEC(tq, stat) wmsum_add(&tq->tq_sums.tqs_##stat, -1)
100
101 #define _TQSTAT_MOD_LIST(mod, tq, t) do { \
102 switch (t->tqent_flags & TQENT_LIST_MASK) { \
103 case TQENT_LIST_NONE: ASSERT(list_empty(&t->tqent_list)); break;\
104 case TQENT_LIST_PENDING: mod(tq, tasks_pending); break; \
105 case TQENT_LIST_PRIORITY: mod(tq, tasks_priority); break; \
106 case TQENT_LIST_DELAY: mod(tq, tasks_delayed); break; \
107 } \
108 } while (0)
109 #define TQSTAT_INC_LIST(tq, t) _TQSTAT_MOD_LIST(TQSTAT_INC, tq, t)
110 #define TQSTAT_DEC_LIST(tq, t) _TQSTAT_MOD_LIST(TQSTAT_DEC, tq, t)
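/*
 * These helpers accumulate per-taskq statistics into the wmsum counters in
 * tq->tq_sums; taskq_kstats_update() later folds them into the published
 * kstats.  For example, TQSTAT_INC(tq, tasks_total) expands to
 * wmsum_add(&tq->tq_sums.tqs_tasks_total, 1).
 */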
111
112 #define TQENT_SET_LIST(t, l) \
113 t->tqent_flags = (t->tqent_flags & ~TQENT_LIST_MASK) | l;
114
115 static int spl_taskq_thread_bind = 0;
116 module_param(spl_taskq_thread_bind, int, 0644);
117 MODULE_PARM_DESC(spl_taskq_thread_bind, "Bind taskq thread to CPU by default");
118
119 static uint_t spl_taskq_thread_timeout_ms = 5000;
120 module_param(spl_taskq_thread_timeout_ms, uint, 0644);
121 MODULE_PARM_DESC(spl_taskq_thread_timeout_ms,
122 "Minimum idle threads exit interval for dynamic taskqs");
123
124 static int spl_taskq_thread_dynamic = 1;
125 module_param(spl_taskq_thread_dynamic, int, 0444);
126 MODULE_PARM_DESC(spl_taskq_thread_dynamic, "Allow dynamic taskq threads");
127
128 static int spl_taskq_thread_priority = 1;
129 module_param(spl_taskq_thread_priority, int, 0644);
130 MODULE_PARM_DESC(spl_taskq_thread_priority,
131 "Allow non-default priority for taskq threads");
132
133 static uint_t spl_taskq_thread_sequential = 4;
134 module_param(spl_taskq_thread_sequential, uint, 0644);
135 MODULE_PARM_DESC(spl_taskq_thread_sequential,
136 "Create new taskq threads after N sequential tasks");
137
138 /*
139 * Global system-wide dynamic task queue available for all consumers. This
140 * taskq is not intended for long-running tasks; instead, a dedicated taskq
141 * should be created.
142 */
143 taskq_t *system_taskq;
144 EXPORT_SYMBOL(system_taskq);
145 /* Global dynamic task queue for long delay */
146 taskq_t *system_delay_taskq;
147 EXPORT_SYMBOL(system_delay_taskq);
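/*
 * Illustrative sketch (not part of this file): a consumer dispatches a
 * short-lived task to system_taskq and optionally waits for it.  The names
 * "my_func" and "my_arg" are hypothetical placeholders.
 *
 *	taskqid_t id = taskq_dispatch(system_taskq, my_func, my_arg, TQ_SLEEP);
 *	if (id != TASKQID_INVALID)
 *		taskq_wait_id(system_taskq, id);
 */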
148
149 /* Private dedicated taskq for creating new taskq threads on demand. */
150 static taskq_t *dynamic_taskq;
151 static taskq_thread_t *taskq_thread_create(taskq_t *);
152
153 /* Multi-callback id for cpu hotplugging. */
154 static int spl_taskq_cpuhp_state;
155
156 /* List of all taskqs */
157 LIST_HEAD(tq_list);
158 struct rw_semaphore tq_list_sem;
159 static uint_t taskq_tsd;
160
161 static int
162 task_km_flags(uint_t flags)
163 {
164 if (flags & TQ_NOSLEEP)
165 return (KM_NOSLEEP);
166
167 if (flags & TQ_PUSHPAGE)
168 return (KM_PUSHPAGE);
169
170 return (KM_SLEEP);
171 }
172
173 /*
174 * taskq_find_by_name - Find the largest instance number of a named taskq.
175 */
176 static int
177 taskq_find_by_name(const char *name)
178 {
179 struct list_head *tql = NULL;
180 taskq_t *tq;
181
182 list_for_each_prev(tql, &tq_list) {
183 tq = list_entry(tql, taskq_t, tq_taskqs);
184 if (strcmp(name, tq->tq_name) == 0)
185 return (tq->tq_instance);
186 }
187 return (-1);
188 }
189
190 /*
191 * NOTE: Must be called with tq->tq_lock held, returns a taskq_ent_t which
192 * is not attached to the free, work, or pending taskq lists.
193 */
194 static taskq_ent_t *
195 task_alloc(taskq_t *tq, uint_t flags, unsigned long *irqflags)
196 {
197 taskq_ent_t *t;
198 int count = 0;
199
200 ASSERT(tq);
201 retry:
202 /* Acquire taskq_ent_t's from free list if available */
203 if (!list_empty(&tq->tq_free_list) && !(flags & TQ_NEW)) {
204 t = list_entry(tq->tq_free_list.next, taskq_ent_t, tqent_list);
205
206 ASSERT(!(t->tqent_flags & TQENT_FLAG_PREALLOC));
207 ASSERT(!(t->tqent_flags & TQENT_FLAG_CANCEL));
208 ASSERT(!timer_pending(&t->tqent_timer));
209
210 list_del_init(&t->tqent_list);
211 TQSTAT_DEC(tq, entries_free);
212 return (t);
213 }
214
215 /* Free list is empty and memory allocations are prohibited */
216 if (flags & TQ_NOALLOC)
217 return (NULL);
218
219 /* Hit maximum taskq_ent_t pool size */
220 if (tq->tq_nalloc >= tq->tq_maxalloc) {
221 if (flags & TQ_NOSLEEP)
222 return (NULL);
223
224 /*
225 * Sleep periodically polling the free list for an available
226 * taskq_ent_t. Dispatching with TQ_SLEEP should always succeed
227 * but we cannot block forever waiting for a taskq_ent_t to
228 * show up in the free list, otherwise a deadlock can happen.
229 *
230 * Therefore, we need to allocate a new task even if the number
231 * of allocated tasks is above tq->tq_maxalloc, but we still
232 * end up delaying the task allocation by one second, thereby
233 * throttling the task dispatch rate.
234 */
235 spin_unlock_irqrestore(&tq->tq_lock, *irqflags);
236 schedule_timeout_interruptible(HZ / 100);
237 spin_lock_irqsave_nested(&tq->tq_lock, *irqflags,
238 tq->tq_lock_class);
239 if (count < 100) {
240 count++;
241 goto retry;
242 }
243 }
244
245 spin_unlock_irqrestore(&tq->tq_lock, *irqflags);
246 t = kmem_alloc(sizeof (taskq_ent_t), task_km_flags(flags));
247 spin_lock_irqsave_nested(&tq->tq_lock, *irqflags, tq->tq_lock_class);
248
249 if (t) {
250 taskq_init_ent(t);
251 tq->tq_nalloc++;
252 }
253
254 return (t);
255 }
256
257 /*
258 * NOTE: Must be called with tq->tq_lock held, expects the taskq_ent_t
259 * to already be removed from the free, work, or pending taskq lists.
260 */
261 static void
262 task_free(taskq_t *tq, taskq_ent_t *t)
263 {
264 ASSERT(tq);
265 ASSERT(t);
266 ASSERT(list_empty(&t->tqent_list));
267 ASSERT(!timer_pending(&t->tqent_timer));
268
269 kmem_free(t, sizeof (taskq_ent_t));
270 tq->tq_nalloc--;
271 }
272
273 /*
274 * NOTE: Must be called with tq->tq_lock held, either destroys the
275 * taskq_ent_t if too many exist or moves it to the free list for later use.
276 */
277 static void
278 task_done(taskq_t *tq, taskq_ent_t *t)
279 {
280 ASSERT(tq);
281 ASSERT(t);
282 ASSERT(list_empty(&t->tqent_list));
283
284 /* Wake tasks blocked in taskq_wait_id() */
285 wake_up_all(&t->tqent_waitq);
286
287 if (tq->tq_nalloc <= tq->tq_minalloc) {
288 t->tqent_id = TASKQID_INVALID;
289 t->tqent_func = NULL;
290 t->tqent_arg = NULL;
291 t->tqent_flags = 0;
292
293 list_add_tail(&t->tqent_list, &tq->tq_free_list);
294 TQSTAT_INC(tq, entries_free);
295 } else {
296 task_free(tq, t);
297 }
298 }
299
300 /*
301 * When a delayed task timer expires, remove it from the delay list and
302 * add it to the priority list for immediate processing.
303 */
304 static void
305 task_expire_impl(taskq_ent_t *t)
306 {
307 taskq_ent_t *w;
308 taskq_t *tq = t->tqent_taskq;
309 struct list_head *l = NULL;
310 unsigned long flags;
311
312 spin_lock_irqsave_nested(&tq->tq_lock, flags, tq->tq_lock_class);
313
314 if (t->tqent_flags & TQENT_FLAG_CANCEL) {
315 ASSERT(list_empty(&t->tqent_list));
316 spin_unlock_irqrestore(&tq->tq_lock, flags);
317 return;
318 }
319
320 t->tqent_birth = jiffies;
321 DTRACE_PROBE1(taskq_ent__birth, taskq_ent_t *, t);
322
323 /*
324 * The priority list must be maintained in strict task id order
325 * from lowest to highest for lowest_id to be easily calculable.
326 */
327 list_del(&t->tqent_list);
328 list_for_each_prev(l, &tq->tq_prio_list) {
329 w = list_entry(l, taskq_ent_t, tqent_list);
330 if (w->tqent_id < t->tqent_id) {
331 list_add(&t->tqent_list, l);
332 break;
333 }
334 }
335 if (l == &tq->tq_prio_list)
336 list_add(&t->tqent_list, &tq->tq_prio_list);
337
338 spin_unlock_irqrestore(&tq->tq_lock, flags);
339
340 wake_up(&tq->tq_work_waitq);
341
342 TQSTAT_INC(tq, tasks_delayed_requeued);
343 }
344
345 static void
346 task_expire(struct timer_list *tl)
347 {
348 struct timer_list *tmr = (struct timer_list *)tl;
349 taskq_ent_t *t = from_timer(t, tmr, tqent_timer);
350 task_expire_impl(t);
351 }
352
353 /*
354 * Returns the lowest incomplete taskqid_t. The taskqid_t may
355 * be queued on the pending list, on the priority list, on the
356 * delay list, or on the work list currently being handled, but
357 * it is not 100% complete yet.
358 */
359 static taskqid_t
360 taskq_lowest_id(taskq_t *tq)
361 {
362 taskqid_t lowest_id = tq->tq_next_id;
363 taskq_ent_t *t;
364 taskq_thread_t *tqt;
365
366 if (!list_empty(&tq->tq_pend_list)) {
367 t = list_entry(tq->tq_pend_list.next, taskq_ent_t, tqent_list);
368 lowest_id = MIN(lowest_id, t->tqent_id);
369 }
370
371 if (!list_empty(&tq->tq_prio_list)) {
372 t = list_entry(tq->tq_prio_list.next, taskq_ent_t, tqent_list);
373 lowest_id = MIN(lowest_id, t->tqent_id);
374 }
375
376 if (!list_empty(&tq->tq_delay_list)) {
377 t = list_entry(tq->tq_delay_list.next, taskq_ent_t, tqent_list);
378 lowest_id = MIN(lowest_id, t->tqent_id);
379 }
380
381 if (!list_empty(&tq->tq_active_list)) {
382 tqt = list_entry(tq->tq_active_list.next, taskq_thread_t,
383 tqt_active_list);
384 ASSERT(tqt->tqt_id != TASKQID_INVALID);
385 lowest_id = MIN(lowest_id, tqt->tqt_id);
386 }
387
388 return (lowest_id);
389 }
390
391 /*
392 * Insert a taskq thread into the active list, keeping it sorted by
392 * increasing taskqid of the task each thread is handling.
393 */
394 static void
395 taskq_insert_in_order(taskq_t *tq, taskq_thread_t *tqt)
396 {
397 taskq_thread_t *w;
398 struct list_head *l = NULL;
399
400 ASSERT(tq);
401 ASSERT(tqt);
402
403 list_for_each_prev(l, &tq->tq_active_list) {
404 w = list_entry(l, taskq_thread_t, tqt_active_list);
405 if (w->tqt_id < tqt->tqt_id) {
406 list_add(&tqt->tqt_active_list, l);
407 break;
408 }
409 }
410 if (l == &tq->tq_active_list)
411 list_add(&tqt->tqt_active_list, &tq->tq_active_list);
412 }
413
414 /*
415 * Find and return a task from the given list if it exists. The list
416 * must be in lowest to highest task id order.
417 */
418 static taskq_ent_t *
419 taskq_find_list(taskq_t *tq, struct list_head *lh, taskqid_t id)
420 {
421 struct list_head *l = NULL;
422 taskq_ent_t *t;
423
424 list_for_each(l, lh) {
425 t = list_entry(l, taskq_ent_t, tqent_list);
426
427 if (t->tqent_id == id)
428 return (t);
429
430 if (t->tqent_id > id)
431 break;
432 }
433
434 return (NULL);
435 }
436
437 /*
438 * Find an already dispatched task given the task id regardless of what
439 * state it is in. If a task is still pending it will be returned.
440 * If a task is executing, then -EBUSY will be returned instead.
441 * If the task has already been run then NULL is returned.
442 */
443 static taskq_ent_t *
444 taskq_find(taskq_t *tq, taskqid_t id)
445 {
446 taskq_thread_t *tqt;
447 struct list_head *l = NULL;
448 taskq_ent_t *t;
449
450 t = taskq_find_list(tq, &tq->tq_delay_list, id);
451 if (t)
452 return (t);
453
454 t = taskq_find_list(tq, &tq->tq_prio_list, id);
455 if (t)
456 return (t);
457
458 t = taskq_find_list(tq, &tq->tq_pend_list, id);
459 if (t)
460 return (t);
461
462 list_for_each(l, &tq->tq_active_list) {
463 tqt = list_entry(l, taskq_thread_t, tqt_active_list);
464 if (tqt->tqt_id == id) {
465 /*
466 * Instead of returning tqt_task, we just return a non
467 * NULL value to prevent misuse, since tqt_task only
468 * has two valid fields.
469 */
470 return (ERR_PTR(-EBUSY));
471 }
472 }
473
474 return (NULL);
475 }
476
477 /*
478 * Theory for the taskq_wait_id(), taskq_wait_outstanding(), and
479 * taskq_wait() functions below.
480 *
481 * Taskq waiting is accomplished by tracking the lowest outstanding task
482 * id and the next available task id. As tasks are dispatched they are
483 * added to the tail of the pending, priority, or delay lists. As worker
484 * threads become available the tasks are removed from the heads of these
485 * lists and linked to the worker threads. This ensures the lists are
486 * kept sorted by lowest to highest task id.
487 *
488 * Therefore the lowest outstanding task id can be quickly determined by
489 * checking the head item from all of these lists. This value is stored
490 * with the taskq as the lowest id. It only needs to be recalculated when
491 * either the task with the current lowest id completes or is canceled.
492 *
493 * By blocking until the lowest task id exceeds the passed task id the
494 * taskq_wait_outstanding() function can be easily implemented. Similarly,
495 * by blocking until the lowest task id matches the next task id taskq_wait()
496 * can be implemented.
497 *
498 * Callers should be aware that when there are multiple worker threads it
499 * is possible for larger task ids to complete before smaller ones. Also
500 * when the taskq contains delay tasks with small task ids callers may
501 * block for a considerable length of time waiting for them to expire and
502 * execute.
503 */
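/*
 * Worked example of the scheme above (task ids are illustrative): if tasks
 * 7, 8 and 9 are outstanding and task 7 completes first, tq_lowest_id is
 * recalculated to 8.  taskq_wait_outstanding(tq, 7) may then return while
 * 8 and 9 are still running, whereas taskq_wait(tq) returns only once
 * tq_lowest_id catches up with tq_next_id, i.e. the taskq is empty.
 */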
504 static int
505 taskq_wait_id_check(taskq_t *tq, taskqid_t id)
506 {
507 int rc;
508 unsigned long flags;
509
510 spin_lock_irqsave_nested(&tq->tq_lock, flags, tq->tq_lock_class);
511 rc = (taskq_find(tq, id) == NULL);
512 spin_unlock_irqrestore(&tq->tq_lock, flags);
513
514 return (rc);
515 }
516
517 /*
518 * The taskq_wait_id() function blocks until the passed task id completes.
519 * This does not guarantee that all lower task ids have completed.
520 */
521 void
522 taskq_wait_id(taskq_t *tq, taskqid_t id)
523 {
524 wait_event(tq->tq_wait_waitq, taskq_wait_id_check(tq, id));
525 }
526 EXPORT_SYMBOL(taskq_wait_id);
527
528 static int
529 taskq_wait_outstanding_check(taskq_t *tq, taskqid_t id)
530 {
531 int rc;
532 unsigned long flags;
533
534 spin_lock_irqsave_nested(&tq->tq_lock, flags, tq->tq_lock_class);
535 rc = (id < tq->tq_lowest_id);
536 spin_unlock_irqrestore(&tq->tq_lock, flags);
537
538 return (rc);
539 }
540
541 /*
542 * The taskq_wait_outstanding() function will block until all tasks with a
543 * taskqid up to and including the passed 'id' have completed. Note that
544 * all task ids are assigned monotonically at dispatch time. Zero may be
545 * passed for the id to indicate that all tasks dispatched up to this point,
546 * but not after, should be waited for.
547 */
548 void
549 taskq_wait_outstanding(taskq_t *tq, taskqid_t id)
550 {
551 id = id ? id : tq->tq_next_id - 1;
552 wait_event(tq->tq_wait_waitq, taskq_wait_outstanding_check(tq, id));
553 }
554 EXPORT_SYMBOL(taskq_wait_outstanding);
555
556 static int
557 taskq_wait_check(taskq_t *tq)
558 {
559 int rc;
560 unsigned long flags;
561
562 spin_lock_irqsave_nested(&tq->tq_lock, flags, tq->tq_lock_class);
563 rc = (tq->tq_lowest_id == tq->tq_next_id);
564 spin_unlock_irqrestore(&tq->tq_lock, flags);
565
566 return (rc);
567 }
568
569 /*
570 * The taskq_wait() function will block until the taskq is empty.
571 * This means that if a taskq re-dispatches work to itself taskq_wait()
572 * callers will block indefinitely.
573 */
574 void
575 taskq_wait(taskq_t *tq)
576 {
577 wait_event(tq->tq_wait_waitq, taskq_wait_check(tq));
578 }
579 EXPORT_SYMBOL(taskq_wait);
580
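/*
 * Return non-zero if the given thread is a worker thread of the specified
 * taskq, as recorded in its thread-specific data by taskq_thread().
 */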
581 int
582 taskq_member(taskq_t *tq, kthread_t *t)
583 {
584 return (tq == (taskq_t *)tsd_get_by_thread(taskq_tsd, t));
585 }
586 EXPORT_SYMBOL(taskq_member);
587
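/*
 * Return the taskq serviced by the current thread, or NULL if the current
 * thread is not a taskq worker thread.
 */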
588 taskq_t *
589 taskq_of_curthread(void)
590 {
591 return (tsd_get(taskq_tsd));
592 }
593 EXPORT_SYMBOL(taskq_of_curthread);
594
595 /*
596 * Cancel an already dispatched task given the task id. Still pending tasks
597 * will be immediately canceled, and if the task is active the function will
598 * block until it completes. Preallocated tasks which are canceled must be
599 * freed by the caller.
600 */
601 int
602 taskq_cancel_id(taskq_t *tq, taskqid_t id)
603 {
604 taskq_ent_t *t;
605 int rc = ENOENT;
606 unsigned long flags;
607
608 ASSERT(tq);
609
610 spin_lock_irqsave_nested(&tq->tq_lock, flags, tq->tq_lock_class);
611 t = taskq_find(tq, id);
612 if (t && t != ERR_PTR(-EBUSY)) {
613 list_del_init(&t->tqent_list);
614 TQSTAT_DEC_LIST(tq, t);
615 TQSTAT_DEC(tq, tasks_total);
616
617 t->tqent_flags |= TQENT_FLAG_CANCEL;
618 TQSTAT_INC(tq, tasks_cancelled);
619
620 /*
621 * When canceling the lowest outstanding task id we
622 * must recalculate the new lowest outstanding id.
623 */
624 if (tq->tq_lowest_id == t->tqent_id) {
625 tq->tq_lowest_id = taskq_lowest_id(tq);
626 ASSERT3S(tq->tq_lowest_id, >, t->tqent_id);
627 }
628
629 /*
630 * The task_expire() function takes the tq->tq_lock so drop the
631 * lock before synchronously cancelling the timer.
632 */
633 if (timer_pending(&t->tqent_timer)) {
634 spin_unlock_irqrestore(&tq->tq_lock, flags);
635 del_timer_sync(&t->tqent_timer);
636 spin_lock_irqsave_nested(&tq->tq_lock, flags,
637 tq->tq_lock_class);
638 }
639
640 if (!(t->tqent_flags & TQENT_FLAG_PREALLOC))
641 task_done(tq, t);
642
643 rc = 0;
644 }
645 spin_unlock_irqrestore(&tq->tq_lock, flags);
646
647 if (t == ERR_PTR(-EBUSY)) {
648 taskq_wait_id(tq, id);
649 rc = EBUSY;
650 }
651
652 return (rc);
653 }
654 EXPORT_SYMBOL(taskq_cancel_id);
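/*
 * Illustrative sketch: the return values follow the implementation above.
 * 0 means a pending task was cancelled before running, ENOENT means it had
 * already run (or never existed), and EBUSY means it was executing, in
 * which case taskq_cancel_id() has already waited for it to finish.
 *
 *	int rc = taskq_cancel_id(tq, id);
 *	if (rc == 0)
 *		... the task never ran, free any associated state ...
 *	else if (rc == EBUSY)
 *		... the task ran to completion before it could be cancelled ...
 */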
655
656 static int taskq_thread_spawn(taskq_t *tq);
657
658 taskqid_t
659 taskq_dispatch(taskq_t *tq, task_func_t func, void *arg, uint_t flags)
660 {
661 taskq_ent_t *t;
662 taskqid_t rc = TASKQID_INVALID;
663 unsigned long irqflags;
664
665 ASSERT(tq);
666 ASSERT(func);
667
668 spin_lock_irqsave_nested(&tq->tq_lock, irqflags, tq->tq_lock_class);
669
670 /* Taskq being destroyed and all tasks drained */
671 if (!(tq->tq_flags & TASKQ_ACTIVE))
672 goto out;
673
674 /* Do not queue the task unless there is an idle thread for it */
675 ASSERT(tq->tq_nactive <= tq->tq_nthreads);
676 if ((flags & TQ_NOQUEUE) && (tq->tq_nactive == tq->tq_nthreads)) {
677 /* Dynamic taskq may be able to spawn another thread */
678 if (taskq_thread_spawn(tq) == 0)
679 goto out;
680 }
681
682 if ((t = task_alloc(tq, flags, &irqflags)) == NULL)
683 goto out;
684
685 spin_lock(&t->tqent_lock);
686
687 /* Queue to the front of the list to enforce TQ_NOQUEUE semantics */
688 if (flags & TQ_NOQUEUE) {
689 TQENT_SET_LIST(t, TQENT_LIST_PRIORITY);
690 list_add(&t->tqent_list, &tq->tq_prio_list);
691 /* Queue to the priority list instead of the pending list */
692 } else if (flags & TQ_FRONT) {
693 TQENT_SET_LIST(t, TQENT_LIST_PRIORITY);
694 list_add_tail(&t->tqent_list, &tq->tq_prio_list);
695 } else {
696 TQENT_SET_LIST(t, TQENT_LIST_PENDING);
697 list_add_tail(&t->tqent_list, &tq->tq_pend_list);
698 }
699 TQSTAT_INC_LIST(tq, t);
700 TQSTAT_INC(tq, tasks_total);
701
702 t->tqent_id = rc = tq->tq_next_id;
703 tq->tq_next_id++;
704 t->tqent_func = func;
705 t->tqent_arg = arg;
706 t->tqent_taskq = tq;
707 t->tqent_timer.function = NULL;
708 t->tqent_timer.expires = 0;
709
710 t->tqent_birth = jiffies;
711 DTRACE_PROBE1(taskq_ent__birth, taskq_ent_t *, t);
712
713 ASSERT(!(t->tqent_flags & TQENT_FLAG_PREALLOC));
714
715 spin_unlock(&t->tqent_lock);
716
717 wake_up(&tq->tq_work_waitq);
718
719 TQSTAT_INC(tq, tasks_dispatched);
720
721 /* Spawn additional taskq threads if required. */
722 if (!(flags & TQ_NOQUEUE) && tq->tq_nactive == tq->tq_nthreads)
723 (void) taskq_thread_spawn(tq);
724 out:
725 spin_unlock_irqrestore(&tq->tq_lock, irqflags);
726 return (rc);
727 }
728 EXPORT_SYMBOL(taskq_dispatch);
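/*
 * Illustrative sketch: a typical dispatch to a private taskq.  "my_tq",
 * "my_func" and "my_arg" are hypothetical.  With TQ_NOSLEEP the internal
 * entry allocation may fail, so the caller must handle TASKQID_INVALID.
 *
 *	taskqid_t id = taskq_dispatch(my_tq, my_func, my_arg, TQ_NOSLEEP);
 *	if (id == TASKQID_INVALID)
 *		... fall back, or retry with TQ_SLEEP from a sleepable context ...
 */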
729
730 taskqid_t
731 taskq_dispatch_delay(taskq_t *tq, task_func_t func, void *arg,
732 uint_t flags, clock_t expire_time)
733 {
734 taskqid_t rc = TASKQID_INVALID;
735 taskq_ent_t *t;
736 unsigned long irqflags;
737
738 ASSERT(tq);
739 ASSERT(func);
740
741 spin_lock_irqsave_nested(&tq->tq_lock, irqflags, tq->tq_lock_class);
742
743 /* Taskq being destroyed and all tasks drained */
744 if (!(tq->tq_flags & TASKQ_ACTIVE))
745 goto out;
746
747 if ((t = task_alloc(tq, flags, &irqflags)) == NULL)
748 goto out;
749
750 spin_lock(&t->tqent_lock);
751
752 /* Queue to the delay list for subsequent execution */
753 list_add_tail(&t->tqent_list, &tq->tq_delay_list);
754 TQENT_SET_LIST(t, TQENT_LIST_DELAY);
755 TQSTAT_INC_LIST(tq, t);
756 TQSTAT_INC(tq, tasks_total);
757
758 t->tqent_id = rc = tq->tq_next_id;
759 tq->tq_next_id++;
760 t->tqent_func = func;
761 t->tqent_arg = arg;
762 t->tqent_taskq = tq;
763 t->tqent_timer.function = task_expire;
764 t->tqent_timer.expires = (unsigned long)expire_time;
765 add_timer(&t->tqent_timer);
766
767 ASSERT(!(t->tqent_flags & TQENT_FLAG_PREALLOC));
768
769 spin_unlock(&t->tqent_lock);
770
771 TQSTAT_INC(tq, tasks_dispatched_delayed);
772
773 /* Spawn additional taskq threads if required. */
774 if (tq->tq_nactive == tq->tq_nthreads)
775 (void) taskq_thread_spawn(tq);
776 out:
777 spin_unlock_irqrestore(&tq->tq_lock, irqflags);
778 return (rc);
779 }
780 EXPORT_SYMBOL(taskq_dispatch_delay);
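/*
 * Illustrative sketch: expire_time is an absolute time expressed in
 * jiffies, so a relative delay is normally computed from the current
 * clock (here assuming the usual SPL helpers ddi_get_lbolt() and
 * MSEC_TO_TICK(); "my_tq", "my_func" and "my_arg" are hypothetical).
 *
 *	clock_t when = ddi_get_lbolt() + MSEC_TO_TICK(500);
 *	taskqid_t id = taskq_dispatch_delay(my_tq, my_func, my_arg,
 *	    TQ_SLEEP, when);
 */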
781
782 void
783 taskq_dispatch_ent(taskq_t *tq, task_func_t func, void *arg, uint_t flags,
784 taskq_ent_t *t)
785 {
786 unsigned long irqflags;
787 ASSERT(tq);
788 ASSERT(func);
789
790 spin_lock_irqsave_nested(&tq->tq_lock, irqflags,
791 tq->tq_lock_class);
792
793 /* Taskq being destroyed and all tasks drained */
794 if (!(tq->tq_flags & TASKQ_ACTIVE)) {
795 t->tqent_id = TASKQID_INVALID;
796 goto out;
797 }
798
799 if ((flags & TQ_NOQUEUE) && (tq->tq_nactive == tq->tq_nthreads)) {
800 /* Dynamic taskq may be able to spawn another thread */
801 if (taskq_thread_spawn(tq) == 0)
802 goto out;
803 flags |= TQ_FRONT;
804 }
805
806 spin_lock(&t->tqent_lock);
807
808 /*
809 * Make sure the entry is not on some other taskq; it is important to
810 * ASSERT() under lock
811 */
812 ASSERT(taskq_empty_ent(t));
813
814 /*
815 * Mark it as a prealloc'd task. This is important
816 * to ensure that we don't free it later.
817 */
818 t->tqent_flags |= TQENT_FLAG_PREALLOC;
819
820 /* Queue to the priority list instead of the pending list */
821 if (flags & TQ_FRONT) {
822 TQENT_SET_LIST(t, TQENT_LIST_PRIORITY);
823 list_add_tail(&t->tqent_list, &tq->tq_prio_list);
824 } else {
825 TQENT_SET_LIST(t, TQENT_LIST_PENDING);
826 list_add_tail(&t->tqent_list, &tq->tq_pend_list);
827 }
828 TQSTAT_INC_LIST(tq, t);
829 TQSTAT_INC(tq, tasks_total);
830
831 t->tqent_id = tq->tq_next_id;
832 tq->tq_next_id++;
833 t->tqent_func = func;
834 t->tqent_arg = arg;
835 t->tqent_taskq = tq;
836
837 t->tqent_birth = jiffies;
838 DTRACE_PROBE1(taskq_ent__birth, taskq_ent_t *, t);
839
840 spin_unlock(&t->tqent_lock);
841
842 wake_up(&tq->tq_work_waitq);
843
844 TQSTAT_INC(tq, tasks_dispatched);
845
846 /* Spawn additional taskq threads if required. */
847 if (tq->tq_nactive == tq->tq_nthreads)
848 (void) taskq_thread_spawn(tq);
849 out:
850 spin_unlock_irqrestore(&tq->tq_lock, irqflags);
851 }
852 EXPORT_SYMBOL(taskq_dispatch_ent);
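/*
 * Illustrative sketch: a caller that embeds a taskq_ent_t in its own object
 * can dispatch without any allocation.  "obj" and its fields are
 * hypothetical; the entry must first be prepared with taskq_init_ent() and
 * must not be freed while it is queued or executing.
 *
 *	taskq_init_ent(&obj->obj_tqent);
 *	...
 *	taskq_dispatch_ent(my_tq, my_func, obj, 0, &obj->obj_tqent);
 */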
853
854 int
855 taskq_empty_ent(taskq_ent_t *t)
856 {
857 return (list_empty(&t->tqent_list));
858 }
859 EXPORT_SYMBOL(taskq_empty_ent);
860
861 void
862 taskq_init_ent(taskq_ent_t *t)
863 {
864 spin_lock_init(&t->tqent_lock);
865 init_waitqueue_head(&t->tqent_waitq);
866 timer_setup(&t->tqent_timer, NULL, 0);
867 INIT_LIST_HEAD(&t->tqent_list);
868 t->tqent_id = 0;
869 t->tqent_func = NULL;
870 t->tqent_arg = NULL;
871 t->tqent_flags = 0;
872 t->tqent_taskq = NULL;
873 }
874 EXPORT_SYMBOL(taskq_init_ent);
875
876 /*
877 * Return the next pending task; preference is given to tasks on the
878 * priority list, which were dispatched with TQ_FRONT.
879 */
880 static taskq_ent_t *
881 taskq_next_ent(taskq_t *tq)
882 {
883 struct list_head *list;
884
885 if (!list_empty(&tq->tq_prio_list))
886 list = &tq->tq_prio_list;
887 else if (!list_empty(&tq->tq_pend_list))
888 list = &tq->tq_pend_list;
889 else
890 return (NULL);
891
892 return (list_entry(list->next, taskq_ent_t, tqent_list));
893 }
894
895 /*
896 * Spawns a new thread for the specified taskq.
897 */
898 static void
899 taskq_thread_spawn_task(void *arg)
900 {
901 taskq_t *tq = (taskq_t *)arg;
902 unsigned long flags;
903
904 if (taskq_thread_create(tq) == NULL) {
905 /* restore spawning count if failed */
906 spin_lock_irqsave_nested(&tq->tq_lock, flags,
907 tq->tq_lock_class);
908 tq->tq_nspawn--;
909 spin_unlock_irqrestore(&tq->tq_lock, flags);
910 }
911 }
912
913 /*
914 * Spawn additional threads for dynamic taskqs (TASKQ_DYNAMIC) when the
915 * current number of threads is insufficient to handle the pending tasks.
916 * These new threads must be created by the dedicated dynamic_taskq to avoid
917 * deadlocks between thread creation and memory reclaim. The system_taskq
918 * which is also a dynamic taskq cannot be safely used for this.
919 */
920 static int
921 taskq_thread_spawn(taskq_t *tq)
922 {
923 int spawning = 0;
924
925 if (!(tq->tq_flags & TASKQ_DYNAMIC))
926 return (0);
927
928 tq->lastspawnstop = jiffies;
929 if ((tq->tq_nthreads + tq->tq_nspawn < tq->tq_maxthreads) &&
930 (tq->tq_flags & TASKQ_ACTIVE)) {
931 spawning = (++tq->tq_nspawn);
932 taskq_dispatch(dynamic_taskq, taskq_thread_spawn_task,
933 tq, TQ_NOSLEEP);
934 }
935
936 return (spawning);
937 }
938
939 /*
940 * Threads in a dynamic taskq may exit once there is no more work to do.
941 * To prevent threads from being created and destroyed too often, limit
942 * the exit rate to one per spl_taskq_thread_timeout_ms.
943 *
944 * The first thread in the thread list is treated as the primary thread.
945 * There is nothing special about the primary thread, but to avoid all of
946 * the taskq pids changing we opt to make it long running.
947 */
948 static int
949 taskq_thread_should_stop(taskq_t *tq, taskq_thread_t *tqt)
950 {
951 ASSERT(!taskq_next_ent(tq));
952 if (!(tq->tq_flags & TASKQ_DYNAMIC) || !spl_taskq_thread_dynamic)
953 return (0);
954 if (!(tq->tq_flags & TASKQ_ACTIVE))
955 return (1);
956 if (list_first_entry(&(tq->tq_thread_list), taskq_thread_t,
957 tqt_thread_list) == tqt)
958 return (0);
959 ASSERT3U(tq->tq_nthreads, >, 1);
960 if (tq->tq_nspawn != 0)
961 return (0);
962 if (time_before(jiffies, tq->lastspawnstop +
963 msecs_to_jiffies(spl_taskq_thread_timeout_ms)))
964 return (0);
965 tq->lastspawnstop = jiffies;
966 return (1);
967 }
968
969 static int
970 taskq_thread(void *args)
971 {
972 DECLARE_WAITQUEUE(wait, current);
973 sigset_t blocked;
974 taskq_thread_t *tqt = args;
975 taskq_t *tq;
976 taskq_ent_t *t;
977 int seq_tasks = 0;
978 unsigned long flags;
979 taskq_ent_t dup_task = {};
980
981 ASSERT(tqt);
982 ASSERT(tqt->tqt_tq);
983 tq = tqt->tqt_tq;
984 current->flags |= PF_NOFREEZE;
985
986 (void) spl_fstrans_mark();
987
988 sigfillset(&blocked);
989 sigprocmask(SIG_BLOCK, &blocked, NULL);
990 flush_signals(current);
991
992 tsd_set(taskq_tsd, tq);
993 spin_lock_irqsave_nested(&tq->tq_lock, flags, tq->tq_lock_class);
994 /*
995 * If we are dynamically spawned, decrease spawning count. Note that
996 * we could be created during taskq_create, in which case we shouldn't
997 * do the decrement. But it's fine because taskq_create will reset
998 * tq_nspawn later.
999 */
1000 if (tq->tq_flags & TASKQ_DYNAMIC)
1001 tq->tq_nspawn--;
1002
1003 /* Immediately exit if more threads than allowed were created. */
1004 if (tq->tq_nthreads >= tq->tq_maxthreads)
1005 goto error;
1006
1007 tq->tq_nthreads++;
1008 list_add_tail(&tqt->tqt_thread_list, &tq->tq_thread_list);
1009 wake_up(&tq->tq_wait_waitq);
1010 set_current_state(TASK_INTERRUPTIBLE);
1011
1012 TQSTAT_INC(tq, threads_total);
1013
1014 while (!kthread_should_stop()) {
1015
1016 if (list_empty(&tq->tq_pend_list) &&
1017 list_empty(&tq->tq_prio_list)) {
1018
1019 if (taskq_thread_should_stop(tq, tqt))
1020 break;
1021
1022 add_wait_queue_exclusive(&tq->tq_work_waitq, &wait);
1023 spin_unlock_irqrestore(&tq->tq_lock, flags);
1024
1025 TQSTAT_INC(tq, thread_sleeps);
1026 TQSTAT_INC(tq, threads_idle);
1027
1028 schedule();
1029 seq_tasks = 0;
1030
1031 TQSTAT_DEC(tq, threads_idle);
1032 TQSTAT_INC(tq, thread_wakeups);
1033
1034 spin_lock_irqsave_nested(&tq->tq_lock, flags,
1035 tq->tq_lock_class);
1036 remove_wait_queue(&tq->tq_work_waitq, &wait);
1037 } else {
1038 __set_current_state(TASK_RUNNING);
1039 }
1040
1041 if ((t = taskq_next_ent(tq)) != NULL) {
1042 list_del_init(&t->tqent_list);
1043 TQSTAT_DEC_LIST(tq, t);
1044 TQSTAT_DEC(tq, tasks_total);
1045
1046 /*
1047 * A TQENT_FLAG_PREALLOC task may be reused or freed
1048 * during the task function call. Store tqent_id and
1049 * tqent_flags here.
1050 *
1051 * Also use an on stack taskq_ent_t for tqt_task
1052 * assignment in this case; we want to make sure
1053 * to duplicate all fields, so the values are
1054 * correct when it's accessed via DTRACE_PROBE*.
1055 */
1056 tqt->tqt_id = t->tqent_id;
1057 tqt->tqt_flags = t->tqent_flags;
1058
1059 if (t->tqent_flags & TQENT_FLAG_PREALLOC) {
1060 dup_task = *t;
1061 t = &dup_task;
1062 }
1063 tqt->tqt_task = t;
1064
1065 taskq_insert_in_order(tq, tqt);
1066 tq->tq_nactive++;
1067 spin_unlock_irqrestore(&tq->tq_lock, flags);
1068
1069 TQSTAT_INC(tq, threads_active);
1070 DTRACE_PROBE1(taskq_ent__start, taskq_ent_t *, t);
1071
1072 /* Perform the requested task */
1073 t->tqent_func(t->tqent_arg);
1074
1075 DTRACE_PROBE1(taskq_ent__finish, taskq_ent_t *, t);
1076
1077 TQSTAT_DEC(tq, threads_active);
1078 if ((t->tqent_flags & TQENT_LIST_MASK) ==
1079 TQENT_LIST_PENDING)
1080 TQSTAT_INC(tq, tasks_executed_normal);
1081 else
1082 TQSTAT_INC(tq, tasks_executed_priority);
1083 TQSTAT_INC(tq, tasks_executed);
1084
1085 spin_lock_irqsave_nested(&tq->tq_lock, flags,
1086 tq->tq_lock_class);
1087
1088 tq->tq_nactive--;
1089 list_del_init(&tqt->tqt_active_list);
1090 tqt->tqt_task = NULL;
1091
1092 /* For prealloc'd tasks, we don't free anything. */
1093 if (!(tqt->tqt_flags & TQENT_FLAG_PREALLOC))
1094 task_done(tq, t);
1095
1096 /*
1097 * When the current lowest outstanding taskqid is
1098 * done, recalculate the new lowest outstanding id.
1099 */
1100 if (tq->tq_lowest_id == tqt->tqt_id) {
1101 tq->tq_lowest_id = taskq_lowest_id(tq);
1102 ASSERT3S(tq->tq_lowest_id, >, tqt->tqt_id);
1103 }
1104
1105 /* Spawn additional taskq threads if required. */
1106 if ((++seq_tasks) > spl_taskq_thread_sequential &&
1107 taskq_thread_spawn(tq))
1108 seq_tasks = 0;
1109
1110 tqt->tqt_id = TASKQID_INVALID;
1111 tqt->tqt_flags = 0;
1112 wake_up_all(&tq->tq_wait_waitq);
1113 } else
1114 TQSTAT_INC(tq, thread_wakeups_nowork);
1115
1116 set_current_state(TASK_INTERRUPTIBLE);
1117
1118 }
1119
1120 __set_current_state(TASK_RUNNING);
1121 tq->tq_nthreads--;
1122 list_del_init(&tqt->tqt_thread_list);
1123
1124 TQSTAT_DEC(tq, threads_total);
1125 TQSTAT_INC(tq, threads_destroyed);
1126
1127 error:
1128 kmem_free(tqt, sizeof (taskq_thread_t));
1129 spin_unlock_irqrestore(&tq->tq_lock, flags);
1130
1131 tsd_set(taskq_tsd, NULL);
1132 thread_exit();
1133
1134 return (0);
1135 }
1136
1137 static taskq_thread_t *
1138 taskq_thread_create(taskq_t *tq)
1139 {
1140 static int last_used_cpu = 0;
1141 taskq_thread_t *tqt;
1142
1143 tqt = kmem_alloc(sizeof (*tqt), KM_PUSHPAGE);
1144 INIT_LIST_HEAD(&tqt->tqt_thread_list);
1145 INIT_LIST_HEAD(&tqt->tqt_active_list);
1146 tqt->tqt_tq = tq;
1147 tqt->tqt_id = TASKQID_INVALID;
1148
1149 tqt->tqt_thread = spl_kthread_create(taskq_thread, tqt,
1150 "%s", tq->tq_name);
1151 if (tqt->tqt_thread == NULL) {
1152 kmem_free(tqt, sizeof (taskq_thread_t));
1153 return (NULL);
1154 }
1155
1156 if (spl_taskq_thread_bind) {
1157 last_used_cpu = (last_used_cpu + 1) % num_online_cpus();
1158 kthread_bind(tqt->tqt_thread, last_used_cpu);
1159 }
1160
1161 if (spl_taskq_thread_priority)
1162 set_user_nice(tqt->tqt_thread, PRIO_TO_NICE(tq->tq_pri));
1163
1164 wake_up_process(tqt->tqt_thread);
1165
1166 TQSTAT_INC(tq, threads_created);
1167
1168 return (tqt);
1169 }
1170
1171 static void
1172 taskq_stats_init(taskq_t *tq)
1173 {
1174 taskq_sums_t *tqs = &tq->tq_sums;
1175 wmsum_init(&tqs->tqs_threads_active, 0);
1176 wmsum_init(&tqs->tqs_threads_idle, 0);
1177 wmsum_init(&tqs->tqs_threads_total, 0);
1178 wmsum_init(&tqs->tqs_tasks_pending, 0);
1179 wmsum_init(&tqs->tqs_tasks_priority, 0);
1180 wmsum_init(&tqs->tqs_tasks_total, 0);
1181 wmsum_init(&tqs->tqs_tasks_delayed, 0);
1182 wmsum_init(&tqs->tqs_entries_free, 0);
1183 wmsum_init(&tqs->tqs_threads_created, 0);
1184 wmsum_init(&tqs->tqs_threads_destroyed, 0);
1185 wmsum_init(&tqs->tqs_tasks_dispatched, 0);
1186 wmsum_init(&tqs->tqs_tasks_dispatched_delayed, 0);
1187 wmsum_init(&tqs->tqs_tasks_executed_normal, 0);
1188 wmsum_init(&tqs->tqs_tasks_executed_priority, 0);
1189 wmsum_init(&tqs->tqs_tasks_executed, 0);
1190 wmsum_init(&tqs->tqs_tasks_delayed_requeued, 0);
1191 wmsum_init(&tqs->tqs_tasks_cancelled, 0);
1192 wmsum_init(&tqs->tqs_thread_wakeups, 0);
1193 wmsum_init(&tqs->tqs_thread_wakeups_nowork, 0);
1194 wmsum_init(&tqs->tqs_thread_sleeps, 0);
1195 }
1196
1197 static void
1198 taskq_stats_fini(taskq_t *tq)
1199 {
1200 taskq_sums_t *tqs = &tq->tq_sums;
1201 wmsum_fini(&tqs->tqs_threads_active);
1202 wmsum_fini(&tqs->tqs_threads_idle);
1203 wmsum_fini(&tqs->tqs_threads_total);
1204 wmsum_fini(&tqs->tqs_tasks_pending);
1205 wmsum_fini(&tqs->tqs_tasks_priority);
1206 wmsum_fini(&tqs->tqs_tasks_total);
1207 wmsum_fini(&tqs->tqs_tasks_delayed);
1208 wmsum_fini(&tqs->tqs_entries_free);
1209 wmsum_fini(&tqs->tqs_threads_created);
1210 wmsum_fini(&tqs->tqs_threads_destroyed);
1211 wmsum_fini(&tqs->tqs_tasks_dispatched);
1212 wmsum_fini(&tqs->tqs_tasks_dispatched_delayed);
1213 wmsum_fini(&tqs->tqs_tasks_executed_normal);
1214 wmsum_fini(&tqs->tqs_tasks_executed_priority);
1215 wmsum_fini(&tqs->tqs_tasks_executed);
1216 wmsum_fini(&tqs->tqs_tasks_delayed_requeued);
1217 wmsum_fini(&tqs->tqs_tasks_cancelled);
1218 wmsum_fini(&tqs->tqs_thread_wakeups);
1219 wmsum_fini(&tqs->tqs_thread_wakeups_nowork);
1220 wmsum_fini(&tqs->tqs_thread_sleeps);
1221 }
1222
1223 static int
1224 taskq_kstats_update(kstat_t *ksp, int rw)
1225 {
1226 if (rw == KSTAT_WRITE)
1227 return (EACCES);
1228
1229 taskq_t *tq = ksp->ks_private;
1230 taskq_kstats_t *tqks = ksp->ks_data;
1231
1232 tqks->tqks_threads_max.value.ui64 = tq->tq_maxthreads;
1233 tqks->tqks_entry_pool_min.value.ui64 = tq->tq_minalloc;
1234 tqks->tqks_entry_pool_max.value.ui64 = tq->tq_maxalloc;
1235
1236 taskq_sums_t *tqs = &tq->tq_sums;
1237
1238 tqks->tqks_threads_active.value.ui64 =
1239 wmsum_value(&tqs->tqs_threads_active);
1240 tqks->tqks_threads_idle.value.ui64 =
1241 wmsum_value(&tqs->tqs_threads_idle);
1242 tqks->tqks_threads_total.value.ui64 =
1243 wmsum_value(&tqs->tqs_threads_total);
1244 tqks->tqks_tasks_pending.value.ui64 =
1245 wmsum_value(&tqs->tqs_tasks_pending);
1246 tqks->tqks_tasks_priority.value.ui64 =
1247 wmsum_value(&tqs->tqs_tasks_priority);
1248 tqks->tqks_tasks_total.value.ui64 =
1249 wmsum_value(&tqs->tqs_tasks_total);
1250 tqks->tqks_tasks_delayed.value.ui64 =
1251 wmsum_value(&tqs->tqs_tasks_delayed);
1252 tqks->tqks_entries_free.value.ui64 =
1253 wmsum_value(&tqs->tqs_entries_free);
1254 tqks->tqks_threads_created.value.ui64 =
1255 wmsum_value(&tqs->tqs_threads_created);
1256 tqks->tqks_threads_destroyed.value.ui64 =
1257 wmsum_value(&tqs->tqs_threads_destroyed);
1258 tqks->tqks_tasks_dispatched.value.ui64 =
1259 wmsum_value(&tqs->tqs_tasks_dispatched);
1260 tqks->tqks_tasks_dispatched_delayed.value.ui64 =
1261 wmsum_value(&tqs->tqs_tasks_dispatched_delayed);
1262 tqks->tqks_tasks_executed_normal.value.ui64 =
1263 wmsum_value(&tqs->tqs_tasks_executed_normal);
1264 tqks->tqks_tasks_executed_priority.value.ui64 =
1265 wmsum_value(&tqs->tqs_tasks_executed_priority);
1266 tqks->tqks_tasks_executed.value.ui64 =
1267 wmsum_value(&tqs->tqs_tasks_executed);
1268 tqks->tqks_tasks_delayed_requeued.value.ui64 =
1269 wmsum_value(&tqs->tqs_tasks_delayed_requeued);
1270 tqks->tqks_tasks_cancelled.value.ui64 =
1271 wmsum_value(&tqs->tqs_tasks_cancelled);
1272 tqks->tqks_thread_wakeups.value.ui64 =
1273 wmsum_value(&tqs->tqs_thread_wakeups);
1274 tqks->tqks_thread_wakeups_nowork.value.ui64 =
1275 wmsum_value(&tqs->tqs_thread_wakeups_nowork);
1276 tqks->tqks_thread_sleeps.value.ui64 =
1277 wmsum_value(&tqs->tqs_thread_sleeps);
1278
1279 return (0);
1280 }
1281
1282 static void
1283 taskq_kstats_init(taskq_t *tq)
1284 {
1285 char name[TASKQ_NAMELEN+5]; /* 5 for dot, 3x instance digits, null */
1286 snprintf(name, sizeof (name), "%s.%d", tq->tq_name, tq->tq_instance);
1287
1288 kstat_t *ksp = kstat_create("taskq", 0, name, "misc",
1289 KSTAT_TYPE_NAMED, sizeof (taskq_kstats_t) / sizeof (kstat_named_t),
1290 KSTAT_FLAG_VIRTUAL);
1291
1292 if (ksp == NULL)
1293 return;
1294
1295 ksp->ks_private = tq;
1296 ksp->ks_update = taskq_kstats_update;
1297 ksp->ks_data = kmem_alloc(sizeof (taskq_kstats_t), KM_SLEEP);
1298 memcpy(ksp->ks_data, &taskq_kstats_template, sizeof (taskq_kstats_t));
1299 kstat_install(ksp);
1300
1301 tq->tq_ksp = ksp;
1302 }
1303
1304 static void
1305 taskq_kstats_fini(taskq_t *tq)
1306 {
1307 if (tq->tq_ksp == NULL)
1308 return;
1309
1310 kmem_free(tq->tq_ksp->ks_data, sizeof (taskq_kstats_t));
1311 kstat_delete(tq->tq_ksp);
1312
1313 tq->tq_ksp = NULL;
1314 }
1315
1316 taskq_t *
1317 taskq_create(const char *name, int threads_arg, pri_t pri,
1318 int minalloc, int maxalloc, uint_t flags)
1319 {
1320 taskq_t *tq;
1321 taskq_thread_t *tqt;
1322 int count = 0, rc = 0, i;
1323 unsigned long irqflags;
1324 int nthreads = threads_arg;
1325
1326 ASSERT(name != NULL);
1327 ASSERT(minalloc >= 0);
1328 ASSERT(!(flags & (TASKQ_CPR_SAFE))); /* Unsupported */
1329
1330 /* Scale the number of threads using nthreads as a percentage */
1331 if (flags & TASKQ_THREADS_CPU_PCT) {
1332 ASSERT(nthreads <= 100);
1333 ASSERT(nthreads >= 0);
1334 nthreads = MIN(threads_arg, 100);
1335 nthreads = MAX(nthreads, 0);
1336 nthreads = MAX((num_online_cpus() * nthreads) / 100, 1);
1337 }
1338
1339 tq = kmem_alloc(sizeof (*tq), KM_PUSHPAGE);
1340 if (tq == NULL)
1341 return (NULL);
1342
1343 tq->tq_hp_support = B_FALSE;
1344
1345 if (flags & TASKQ_THREADS_CPU_PCT) {
1346 tq->tq_hp_support = B_TRUE;
1347 if (cpuhp_state_add_instance_nocalls(spl_taskq_cpuhp_state,
1348 &tq->tq_hp_cb_node) != 0) {
1349 kmem_free(tq, sizeof (*tq));
1350 return (NULL);
1351 }
1352 }
1353
1354 spin_lock_init(&tq->tq_lock);
1355 INIT_LIST_HEAD(&tq->tq_thread_list);
1356 INIT_LIST_HEAD(&tq->tq_active_list);
1357 tq->tq_name = kmem_strdup(name);
1358 tq->tq_nactive = 0;
1359 tq->tq_nthreads = 0;
1360 tq->tq_nspawn = 0;
1361 tq->tq_maxthreads = nthreads;
1362 tq->tq_cpu_pct = threads_arg;
1363 tq->tq_pri = pri;
1364 tq->tq_minalloc = minalloc;
1365 tq->tq_maxalloc = maxalloc;
1366 tq->tq_nalloc = 0;
1367 tq->tq_flags = (flags | TASKQ_ACTIVE);
1368 tq->tq_next_id = TASKQID_INITIAL;
1369 tq->tq_lowest_id = TASKQID_INITIAL;
1370 tq->lastspawnstop = jiffies;
1371 INIT_LIST_HEAD(&tq->tq_free_list);
1372 INIT_LIST_HEAD(&tq->tq_pend_list);
1373 INIT_LIST_HEAD(&tq->tq_prio_list);
1374 INIT_LIST_HEAD(&tq->tq_delay_list);
1375 init_waitqueue_head(&tq->tq_work_waitq);
1376 init_waitqueue_head(&tq->tq_wait_waitq);
1377 tq->tq_lock_class = TQ_LOCK_GENERAL;
1378 INIT_LIST_HEAD(&tq->tq_taskqs);
1379 taskq_stats_init(tq);
1380
1381 if (flags & TASKQ_PREPOPULATE) {
1382 spin_lock_irqsave_nested(&tq->tq_lock, irqflags,
1383 tq->tq_lock_class);
1384
1385 for (i = 0; i < minalloc; i++)
1386 task_done(tq, task_alloc(tq, TQ_PUSHPAGE | TQ_NEW,
1387 &irqflags));
1388
1389 spin_unlock_irqrestore(&tq->tq_lock, irqflags);
1390 }
1391
1392 if ((flags & TASKQ_DYNAMIC) && spl_taskq_thread_dynamic)
1393 nthreads = 1;
1394
1395 for (i = 0; i < nthreads; i++) {
1396 tqt = taskq_thread_create(tq);
1397 if (tqt == NULL)
1398 rc = 1;
1399 else
1400 count++;
1401 }
1402
1403 /* Wait for all threads to be started before potential destroy */
1404 wait_event(tq->tq_wait_waitq, tq->tq_nthreads == count);
1405 /*
1406 * taskq_thread() may have decremented tq_nspawn, but these threads were
1407 * not dynamically spawned, so reset it to 0.
1408 */
1409 tq->tq_nspawn = 0;
1410
1411 if (rc) {
1412 taskq_destroy(tq);
1413 return (NULL);
1414 }
1415
1416 down_write(&tq_list_sem);
1417 tq->tq_instance = taskq_find_by_name(name) + 1;
1418 list_add_tail(&tq->tq_taskqs, &tq_list);
1419 up_write(&tq_list_sem);
1420
1421 /* Install kstats late, because the name includes tq_instance */
1422 taskq_kstats_init(tq);
1423
1424 return (tq);
1425 }
1426 EXPORT_SYMBOL(taskq_create);
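/*
 * Illustrative sketch: creating and destroying a private taskq.  With
 * TASKQ_THREADS_CPU_PCT the thread count argument is a percentage of the
 * online CPUs (see the scaling above); otherwise it is an absolute count.
 * The taskq name is hypothetical.
 *
 *	taskq_t *tq = taskq_create("my_taskq", 100, defclsyspri,
 *	    boot_ncpus, INT_MAX, TASKQ_THREADS_CPU_PCT | TASKQ_PREPOPULATE);
 *	...
 *	taskq_destroy(tq);
 */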
1427
1428 void
1429 taskq_destroy(taskq_t *tq)
1430 {
1431 struct task_struct *thread;
1432 taskq_thread_t *tqt;
1433 taskq_ent_t *t;
1434 unsigned long flags;
1435
1436 ASSERT(tq);
1437 spin_lock_irqsave_nested(&tq->tq_lock, flags, tq->tq_lock_class);
1438 tq->tq_flags &= ~TASKQ_ACTIVE;
1439 spin_unlock_irqrestore(&tq->tq_lock, flags);
1440
1441 if (tq->tq_hp_support) {
1442 VERIFY0(cpuhp_state_remove_instance_nocalls(
1443 spl_taskq_cpuhp_state, &tq->tq_hp_cb_node));
1444 }
1445
1446 /*
1447 * When TASKQ_ACTIVE is clear new tasks may not be added nor may
1448 * new worker threads be spawned for a dynamic taskq.
1449 */
1450 if (dynamic_taskq != NULL)
1451 taskq_wait_outstanding(dynamic_taskq, 0);
1452
1453 taskq_wait(tq);
1454
1455 taskq_kstats_fini(tq);
1456
1457 /* remove taskq from global list used by the kstats */
1458 down_write(&tq_list_sem);
1459 list_del(&tq->tq_taskqs);
1460 up_write(&tq_list_sem);
1461
1462 spin_lock_irqsave_nested(&tq->tq_lock, flags, tq->tq_lock_class);
1463 /* wait for spawning threads to insert themselves to the list */
1464 while (tq->tq_nspawn) {
1465 spin_unlock_irqrestore(&tq->tq_lock, flags);
1466 schedule_timeout_interruptible(1);
1467 spin_lock_irqsave_nested(&tq->tq_lock, flags,
1468 tq->tq_lock_class);
1469 }
1470
1471 /*
1472 * Signal each thread to exit and block until it does. Each thread
1473 * is responsible for removing itself from the list and freeing its
1474 * taskq_thread_t. This allows for idle threads to opt to remove
1475 * themselves from the taskq. They can be recreated as needed.
1476 */
1477 while (!list_empty(&tq->tq_thread_list)) {
1478 tqt = list_entry(tq->tq_thread_list.next,
1479 taskq_thread_t, tqt_thread_list);
1480 thread = tqt->tqt_thread;
1481 spin_unlock_irqrestore(&tq->tq_lock, flags);
1482
1483 kthread_stop(thread);
1484
1485 spin_lock_irqsave_nested(&tq->tq_lock, flags,
1486 tq->tq_lock_class);
1487 }
1488
1489 while (!list_empty(&tq->tq_free_list)) {
1490 t = list_entry(tq->tq_free_list.next, taskq_ent_t, tqent_list);
1491
1492 ASSERT(!(t->tqent_flags & TQENT_FLAG_PREALLOC));
1493
1494 list_del_init(&t->tqent_list);
1495 task_free(tq, t);
1496 }
1497
1498 ASSERT0(tq->tq_nthreads);
1499 ASSERT0(tq->tq_nalloc);
1500 ASSERT0(tq->tq_nspawn);
1501 ASSERT(list_empty(&tq->tq_thread_list));
1502 ASSERT(list_empty(&tq->tq_active_list));
1503 ASSERT(list_empty(&tq->tq_free_list));
1504 ASSERT(list_empty(&tq->tq_pend_list));
1505 ASSERT(list_empty(&tq->tq_prio_list));
1506 ASSERT(list_empty(&tq->tq_delay_list));
1507
1508 spin_unlock_irqrestore(&tq->tq_lock, flags);
1509
1510 taskq_stats_fini(tq);
1511 kmem_strfree(tq->tq_name);
1512 kmem_free(tq, sizeof (taskq_t));
1513 }
1514 EXPORT_SYMBOL(taskq_destroy);
1515
1516 /*
1517 * Create a taskq with a specified number of pool threads. Allocate
1518 * and return an array of nthreads kthread_t pointers, one for each
1519 * thread in the pool. The array is not ordered and must be freed
1520 * by the caller.
1521 */
1522 taskq_t *
1523 taskq_create_synced(const char *name, int nthreads, pri_t pri,
1524 int minalloc, int maxalloc, uint_t flags, kthread_t ***ktpp)
1525 {
1526 taskq_t *tq;
1527 taskq_thread_t *tqt;
1528 int i = 0;
1529 kthread_t **kthreads = kmem_zalloc(sizeof (*kthreads) * nthreads,
1530 KM_SLEEP);
1531
1532 flags &= ~(TASKQ_DYNAMIC | TASKQ_THREADS_CPU_PCT | TASKQ_DC_BATCH);
1533
1534 /* taskq_create spawns all the threads before returning */
1535 tq = taskq_create(name, nthreads, minclsyspri, nthreads, INT_MAX,
1536 flags | TASKQ_PREPOPULATE);
1537 VERIFY(tq != NULL);
1538 VERIFY(tq->tq_nthreads == nthreads);
1539
1540 list_for_each_entry(tqt, &tq->tq_thread_list, tqt_thread_list) {
1541 kthreads[i] = tqt->tqt_thread;
1542 i++;
1543 }
1544
1545 ASSERT3S(i, ==, nthreads);
1546 *ktpp = kthreads;
1547
1548 return (tq);
1549 }
1550 EXPORT_SYMBOL(taskq_create_synced);
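/*
 * Illustrative sketch: callers that need the pool's kthread_t pointers
 * (for example to adjust scheduling properties) use the synced variant and
 * must free the returned array themselves.  The names are hypothetical.
 *
 *	kthread_t **kthreads;
 *	taskq_t *tq = taskq_create_synced("my_pool", 4, minclsyspri,
 *	    4, INT_MAX, 0, &kthreads);
 *	...
 *	kmem_free(kthreads, sizeof (*kthreads) * 4);
 *	taskq_destroy(tq);
 */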
1551
1552 static kstat_t *taskq_summary_ksp = NULL;
1553
1554 static int
1555 spl_taskq_kstat_headers(char *buf, size_t size)
1556 {
1557 size_t n = snprintf(buf, size,
1558 "%-20s | %-17s | %-23s\n"
1559 "%-20s | %-17s | %-23s\n"
1560 "%-20s | %-17s | %-23s\n",
1561 "", "threads", "tasks on queue",
1562 "taskq name", "tot [act idl] max", " pend [ norm high] dly",
1563 "--------------------", "-----------------",
1564 "-----------------------");
1565 return (n >= size ? ENOMEM : 0);
1566 }
1567
1568 static int
1569 spl_taskq_kstat_data(char *buf, size_t size, void *data)
1570 {
1571 struct list_head *tql = NULL;
1572 taskq_t *tq;
1573 char name[TASKQ_NAMELEN+5]; /* 5 for dot, 3x instance digits, null */
1574 char threads[25];
1575 char tasks[30];
1576 size_t n;
1577 int err = 0;
1578
1579 down_read(&tq_list_sem);
1580 list_for_each_prev(tql, &tq_list) {
1581 tq = list_entry(tql, taskq_t, tq_taskqs);
1582
1583 mutex_enter(tq->tq_ksp->ks_lock);
1584 taskq_kstats_update(tq->tq_ksp, KSTAT_READ);
1585 taskq_kstats_t *tqks = tq->tq_ksp->ks_data;
1586
1587 snprintf(name, sizeof (name), "%s.%d", tq->tq_name,
1588 tq->tq_instance);
1589 snprintf(threads, sizeof (threads), "%3llu [%3llu %3llu] %3llu",
1590 tqks->tqks_threads_total.value.ui64,
1591 tqks->tqks_threads_active.value.ui64,
1592 tqks->tqks_threads_idle.value.ui64,
1593 tqks->tqks_threads_max.value.ui64);
1594 snprintf(tasks, sizeof (tasks), "%5llu [%5llu %5llu] %3llu",
1595 tqks->tqks_tasks_total.value.ui64,
1596 tqks->tqks_tasks_pending.value.ui64,
1597 tqks->tqks_tasks_priority.value.ui64,
1598 tqks->tqks_tasks_delayed.value.ui64);
1599
1600 mutex_exit(tq->tq_ksp->ks_lock);
1601
1602 n = snprintf(buf, size, "%-20s | %-17s | %-23s\n",
1603 name, threads, tasks);
1604 if (n >= size) {
1605 err = ENOMEM;
1606 break;
1607 }
1608
1609 buf = &buf[n];
1610 size -= n;
1611 }
1612
1613 up_read(&tq_list_sem);
1614
1615 return (err);
1616 }
1617
1618 static void
1619 spl_taskq_kstat_init(void)
1620 {
1621 kstat_t *ksp = kstat_create("taskq", 0, "summary", "misc",
1622 KSTAT_TYPE_RAW, 0, KSTAT_FLAG_VIRTUAL);
1623
1624 if (ksp == NULL)
1625 return;
1626
1627 ksp->ks_data = (void *)(uintptr_t)1;
1628 ksp->ks_ndata = 1;
1629 kstat_set_raw_ops(ksp, spl_taskq_kstat_headers,
1630 spl_taskq_kstat_data, NULL);
1631 kstat_install(ksp);
1632
1633 taskq_summary_ksp = ksp;
1634 }
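/*
 * With the usual SPL kstat plumbing these nodes are expected to appear
 * under /proc/spl/kstat/taskq/: "summary" for the table produced above and
 * "<name>.<instance>" for the per-taskq statistics (the exact path depends
 * on how the kstat module is configured).
 */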
1635
1636 static void
1637 spl_taskq_kstat_fini(void)
1638 {
1639 if (taskq_summary_ksp == NULL)
1640 return;
1641
1642 kstat_delete(taskq_summary_ksp);
1643 taskq_summary_ksp = NULL;
1644 }
1645
1646 static unsigned int spl_taskq_kick = 0;
1647
1648 /*
1649 * 2.6.36 API Change
1650 * module_param_cb is introduced to take kernel_param_ops and
1651 * module_param_call is marked as obsolete. Also set and get operations
1652 * were changed to take a 'const struct kernel_param *'.
1653 */
1654 static int
1655 #ifdef module_param_cb
1656 param_set_taskq_kick(const char *val, const struct kernel_param *kp)
1657 #else
1658 param_set_taskq_kick(const char *val, struct kernel_param *kp)
1659 #endif
1660 {
1661 int ret;
1662 taskq_t *tq = NULL;
1663 taskq_ent_t *t;
1664 unsigned long flags;
1665
1666 ret = param_set_uint(val, kp);
1667 if (ret < 0 || !spl_taskq_kick)
1668 return (ret);
1669 /* reset value */
1670 spl_taskq_kick = 0;
1671
1672 down_read(&tq_list_sem);
1673 list_for_each_entry(tq, &tq_list, tq_taskqs) {
1674 spin_lock_irqsave_nested(&tq->tq_lock, flags,
1675 tq->tq_lock_class);
1676 /* Check if the first pending is older than 5 seconds */
1677 t = taskq_next_ent(tq);
1678 if (t && time_after(jiffies, t->tqent_birth + 5*HZ)) {
1679 (void) taskq_thread_spawn(tq);
1680 printk(KERN_INFO "spl: Kicked taskq %s/%d\n",
1681 tq->tq_name, tq->tq_instance);
1682 }
1683 spin_unlock_irqrestore(&tq->tq_lock, flags);
1684 }
1685 up_read(&tq_list_sem);
1686 return (ret);
1687 }
1688
1689 #ifdef module_param_cb
1690 static const struct kernel_param_ops param_ops_taskq_kick = {
1691 .set = param_set_taskq_kick,
1692 .get = param_get_uint,
1693 };
1694 module_param_cb(spl_taskq_kick, &param_ops_taskq_kick, &spl_taskq_kick, 0644);
1695 #else
1696 module_param_call(spl_taskq_kick, param_set_taskq_kick, param_get_uint,
1697 &spl_taskq_kick, 0644);
1698 #endif
1699 MODULE_PARM_DESC(spl_taskq_kick,
1700 "Write nonzero to kick stuck taskqs to spawn more threads");
1701
1702 /*
1703 * This callback will be called exactly once for each core that comes online,
1704 * for each dynamic taskq. We attempt to expand taskqs that have
1705 * TASKQ_THREADS_CPU_PCT set. We need to redo the percentage calculation every
1706 * time, to correctly determine whether or not to add a thread.
1707 */
1708 static int
1709 spl_taskq_expand(unsigned int cpu, struct hlist_node *node)
1710 {
1711 taskq_t *tq = list_entry(node, taskq_t, tq_hp_cb_node);
1712 unsigned long flags;
1713 int err = 0;
1714
1715 ASSERT(tq);
1716 spin_lock_irqsave_nested(&tq->tq_lock, flags, tq->tq_lock_class);
1717
1718 if (!(tq->tq_flags & TASKQ_ACTIVE)) {
1719 spin_unlock_irqrestore(&tq->tq_lock, flags);
1720 return (err);
1721 }
1722
1723 ASSERT(tq->tq_flags & TASKQ_THREADS_CPU_PCT);
1724 int nthreads = MIN(tq->tq_cpu_pct, 100);
1725 nthreads = MAX(((num_online_cpus() + 1) * nthreads) / 100, 1);
1726 tq->tq_maxthreads = nthreads;
1727
1728 if (!((tq->tq_flags & TASKQ_DYNAMIC) && spl_taskq_thread_dynamic) &&
1729 tq->tq_maxthreads > tq->tq_nthreads) {
1730 spin_unlock_irqrestore(&tq->tq_lock, flags);
1731 taskq_thread_t *tqt = taskq_thread_create(tq);
1732 if (tqt == NULL)
1733 err = -1;
1734 return (err);
1735 }
1736 spin_unlock_irqrestore(&tq->tq_lock, flags);
1737 return (err);
1738 }
1739
1740 /*
1741 * While we don't support offlining CPUs, it is possible that CPUs will fail
1742 * to online successfully. We do need to be able to handle this case
1743 * gracefully.
1744 */
1745 static int
1746 spl_taskq_prepare_down(unsigned int cpu, struct hlist_node *node)
1747 {
1748 taskq_t *tq = list_entry(node, taskq_t, tq_hp_cb_node);
1749 unsigned long flags;
1750
1751 ASSERT(tq);
1752 spin_lock_irqsave_nested(&tq->tq_lock, flags, tq->tq_lock_class);
1753
1754 if (!(tq->tq_flags & TASKQ_ACTIVE))
1755 goto out;
1756
1757 ASSERT(tq->tq_flags & TASKQ_THREADS_CPU_PCT);
1758 int nthreads = MIN(tq->tq_cpu_pct, 100);
1759 nthreads = MAX(((num_online_cpus()) * nthreads) / 100, 1);
1760 tq->tq_maxthreads = nthreads;
1761
1762 if (!((tq->tq_flags & TASKQ_DYNAMIC) && spl_taskq_thread_dynamic) &&
1763 tq->tq_maxthreads < tq->tq_nthreads) {
1764 ASSERT3U(tq->tq_maxthreads, ==, tq->tq_nthreads - 1);
1765 taskq_thread_t *tqt = list_entry(tq->tq_thread_list.next,
1766 taskq_thread_t, tqt_thread_list);
1767 struct task_struct *thread = tqt->tqt_thread;
1768 spin_unlock_irqrestore(&tq->tq_lock, flags);
1769
1770 kthread_stop(thread);
1771
1772 return (0);
1773 }
1774
1775 out:
1776 spin_unlock_irqrestore(&tq->tq_lock, flags);
1777 return (0);
1778 }
1779
1780 int
1781 spl_taskq_init(void)
1782 {
1783 init_rwsem(&tq_list_sem);
1784 tsd_create(&taskq_tsd, NULL);
1785
1786 spl_taskq_cpuhp_state = cpuhp_setup_state_multi(CPUHP_AP_ONLINE_DYN,
1787 "fs/spl_taskq:online", spl_taskq_expand, spl_taskq_prepare_down);
1788
1789 system_taskq = taskq_create("spl_system_taskq", MAX(boot_ncpus, 64),
1790 maxclsyspri, boot_ncpus, INT_MAX, TASKQ_PREPOPULATE|TASKQ_DYNAMIC);
1791 if (system_taskq == NULL)
1792 return (-ENOMEM);
1793
1794 system_delay_taskq = taskq_create("spl_delay_taskq", MAX(boot_ncpus, 4),
1795 maxclsyspri, boot_ncpus, INT_MAX, TASKQ_PREPOPULATE|TASKQ_DYNAMIC);
1796 if (system_delay_taskq == NULL) {
1797 cpuhp_remove_multi_state(spl_taskq_cpuhp_state);
1798 taskq_destroy(system_taskq);
1799 return (-ENOMEM);
1800 }
1801
1802 dynamic_taskq = taskq_create("spl_dynamic_taskq", 1,
1803 maxclsyspri, boot_ncpus, INT_MAX, TASKQ_PREPOPULATE);
1804 if (dynamic_taskq == NULL) {
1805 cpuhp_remove_multi_state(spl_taskq_cpuhp_state);
1806 taskq_destroy(system_taskq);
1807 taskq_destroy(system_delay_taskq);
1808 return (-ENOMEM);
1809 }
1810
1811 /*
1812 * This is used to annotate tq_lock, so
1813 * taskq_dispatch -> taskq_thread_spawn -> taskq_dispatch
1814 * does not trigger a lockdep warning re: possible recursive locking
1815 */
1816 dynamic_taskq->tq_lock_class = TQ_LOCK_DYNAMIC;
1817
1818 spl_taskq_kstat_init();
1819
1820 return (0);
1821 }
1822
1823 void
1824 spl_taskq_fini(void)
1825 {
1826 spl_taskq_kstat_fini();
1827
1828 taskq_destroy(dynamic_taskq);
1829 dynamic_taskq = NULL;
1830
1831 taskq_destroy(system_delay_taskq);
1832 system_delay_taskq = NULL;
1833
1834 taskq_destroy(system_taskq);
1835 system_taskq = NULL;
1836
1837 tsd_destroy(&taskq_tsd);
1838
1839 cpuhp_remove_multi_state(spl_taskq_cpuhp_state);
1840 spl_taskq_cpuhp_state = 0;
1841 }
1842