1 // SPDX-License-Identifier: GPL-2.0-or-later
2 /*
3 * Copyright (C) 2007-2010 Lawrence Livermore National Security, LLC.
4 * Copyright (C) 2007 The Regents of the University of California.
5 * Produced at Lawrence Livermore National Laboratory (cf, DISCLAIMER).
6 * Written by Brian Behlendorf <behlendorf1@llnl.gov>.
7 * UCRL-CODE-235197
8 *
9 * This file is part of the SPL, Solaris Porting Layer.
10 *
11 * The SPL is free software; you can redistribute it and/or modify it
12 * under the terms of the GNU General Public License as published by the
13 * Free Software Foundation; either version 2 of the License, or (at your
14 * option) any later version.
15 *
16 * The SPL is distributed in the hope that it will be useful, but WITHOUT
17 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
18 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
19 * for more details.
20 *
21 * You should have received a copy of the GNU General Public License along
22 * with the SPL. If not, see <http://www.gnu.org/licenses/>.
23 *
24 * Solaris Porting Layer (SPL) Task Queue Implementation.
25 */
26 /*
27 * Copyright (c) 2024, Klara Inc.
28 * Copyright (c) 2024, Syneto
29 */
30
31 #include <sys/timer.h>
32 #include <sys/taskq.h>
33 #include <sys/kmem.h>
34 #include <sys/tsd.h>
35 #include <sys/trace_spl.h>
36 #include <sys/time.h>
37 #include <sys/atomic.h>
38 #include <sys/kstat.h>
39 #include <linux/cpuhotplug.h>
40
41 typedef struct taskq_kstats {
42 /* static values, for completeness */
43 kstat_named_t tqks_threads_max;
44 kstat_named_t tqks_entry_pool_min;
45 kstat_named_t tqks_entry_pool_max;
46
47 /* gauges (inc/dec counters, current value) */
48 kstat_named_t tqks_threads_active;
49 kstat_named_t tqks_threads_idle;
50 kstat_named_t tqks_threads_total;
51 kstat_named_t tqks_tasks_pending;
52 kstat_named_t tqks_tasks_priority;
53 kstat_named_t tqks_tasks_total;
54 kstat_named_t tqks_tasks_delayed;
55 kstat_named_t tqks_entries_free;
56
57 /* counters (inc only, since taskq creation) */
58 kstat_named_t tqks_threads_created;
59 kstat_named_t tqks_threads_destroyed;
60 kstat_named_t tqks_tasks_dispatched;
61 kstat_named_t tqks_tasks_dispatched_delayed;
62 kstat_named_t tqks_tasks_executed_normal;
63 kstat_named_t tqks_tasks_executed_priority;
64 kstat_named_t tqks_tasks_executed;
65 kstat_named_t tqks_tasks_delayed_requeued;
66 kstat_named_t tqks_tasks_cancelled;
67 kstat_named_t tqks_thread_wakeups;
68 kstat_named_t tqks_thread_wakeups_nowork;
69 kstat_named_t tqks_thread_sleeps;
70 } taskq_kstats_t;
71
72 static taskq_kstats_t taskq_kstats_template = {
73 { "threads_max", KSTAT_DATA_UINT64 },
74 { "entry_pool_min", KSTAT_DATA_UINT64 },
75 { "entry_pool_max", KSTAT_DATA_UINT64 },
76 { "threads_active", KSTAT_DATA_UINT64 },
77 { "threads_idle", KSTAT_DATA_UINT64 },
78 { "threads_total", KSTAT_DATA_UINT64 },
79 { "tasks_pending", KSTAT_DATA_UINT64 },
80 { "tasks_priority", KSTAT_DATA_UINT64 },
81 { "tasks_total", KSTAT_DATA_UINT64 },
82 { "tasks_delayed", KSTAT_DATA_UINT64 },
83 { "entries_free", KSTAT_DATA_UINT64 },
84
85 { "threads_created", KSTAT_DATA_UINT64 },
86 { "threads_destroyed", KSTAT_DATA_UINT64 },
87 { "tasks_dispatched", KSTAT_DATA_UINT64 },
88 { "tasks_dispatched_delayed", KSTAT_DATA_UINT64 },
89 { "tasks_executed_normal", KSTAT_DATA_UINT64 },
90 { "tasks_executed_priority", KSTAT_DATA_UINT64 },
91 { "tasks_executed", KSTAT_DATA_UINT64 },
92 { "tasks_delayed_requeued", KSTAT_DATA_UINT64 },
93 { "tasks_cancelled", KSTAT_DATA_UINT64 },
94 { "thread_wakeups", KSTAT_DATA_UINT64 },
95 { "thread_wakeups_nowork", KSTAT_DATA_UINT64 },
96 { "thread_sleeps", KSTAT_DATA_UINT64 },
97 };
98
99 #define TQSTAT_INC(tq, stat) wmsum_add(&tq->tq_sums.tqs_##stat, 1)
100 #define TQSTAT_DEC(tq, stat) wmsum_add(&tq->tq_sums.tqs_##stat, -1)
101
102 #define _TQSTAT_MOD_LIST(mod, tq, t) do { \
103 switch (t->tqent_flags & TQENT_LIST_MASK) { \
104 case TQENT_LIST_NONE: ASSERT(list_empty(&t->tqent_list)); break;\
105 case TQENT_LIST_PENDING: mod(tq, tasks_pending); break; \
106 case TQENT_LIST_PRIORITY: mod(tq, tasks_priority); break; \
107 case TQENT_LIST_DELAY: mod(tq, tasks_delayed); break; \
108 } \
109 } while (0)
110 #define TQSTAT_INC_LIST(tq, t) _TQSTAT_MOD_LIST(TQSTAT_INC, tq, t)
111 #define TQSTAT_DEC_LIST(tq, t) _TQSTAT_MOD_LIST(TQSTAT_DEC, tq, t)
112
113 #define TQENT_SET_LIST(t, l) \
114 t->tqent_flags = (t->tqent_flags & ~TQENT_LIST_MASK) | l;
115
116 static int spl_taskq_thread_bind = 0;
117 module_param(spl_taskq_thread_bind, int, 0644);
118 MODULE_PARM_DESC(spl_taskq_thread_bind, "Bind taskq thread to CPU by default");
119
120 static uint_t spl_taskq_thread_timeout_ms = 5000;
121 module_param(spl_taskq_thread_timeout_ms, uint, 0644);
122 MODULE_PARM_DESC(spl_taskq_thread_timeout_ms,
123 	"Minimum idle thread exit interval for dynamic taskqs");
124
125 static int spl_taskq_thread_dynamic = 1;
126 module_param(spl_taskq_thread_dynamic, int, 0444);
127 MODULE_PARM_DESC(spl_taskq_thread_dynamic, "Allow dynamic taskq threads");
128
129 static int spl_taskq_thread_priority = 1;
130 module_param(spl_taskq_thread_priority, int, 0644);
131 MODULE_PARM_DESC(spl_taskq_thread_priority,
132 "Allow non-default priority for taskq threads");
133
134 static uint_t spl_taskq_thread_sequential = 4;
135 module_param(spl_taskq_thread_sequential, uint, 0644);
136 MODULE_PARM_DESC(spl_taskq_thread_sequential,
137 "Create new taskq threads after N sequential tasks");
138
139 /*
140 * Global system-wide dynamic task queue available for all consumers. This
141 * taskq is not intended for long-running tasks; instead, a dedicated taskq
142 * should be created.
143 */
144 taskq_t *system_taskq;
145 EXPORT_SYMBOL(system_taskq);
146 /* Global dynamic task queue for long delay */
147 taskq_t *system_delay_taskq;
148 EXPORT_SYMBOL(system_delay_taskq);
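/*
 * Illustrative sketch (not part of the build): dispatching a short-lived
 * task to the global system_taskq. The callback and its argument below are
 * hypothetical; the calls match the functions defined in this file.
 *
 *	static void my_short_task(void *arg)
 *	{
 *		// Do a small amount of work; long-running work belongs in a
 *		// dedicated taskq created with taskq_create().
 *	}
 *
 *	taskqid_t id = taskq_dispatch(system_taskq, my_short_task, arg,
 *	    TQ_SLEEP);
 *	if (id == TASKQID_INVALID)
 *		// The task was not queued; handle the failure here.
 */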
149
150 /* Private dedicated taskq for creating new taskq threads on demand. */
151 static taskq_t *dynamic_taskq;
152 static taskq_thread_t *taskq_thread_create(taskq_t *);
153
154 /* Multi-callback id for cpu hotplugging. */
155 static int spl_taskq_cpuhp_state;
156
157 /* List of all taskqs */
158 LIST_HEAD(tq_list);
159 struct rw_semaphore tq_list_sem;
160 static uint_t taskq_tsd;
161
162 static int
163 task_km_flags(uint_t flags)
164 {
165 if (flags & TQ_NOSLEEP)
166 return (KM_NOSLEEP);
167
168 if (flags & TQ_PUSHPAGE)
169 return (KM_PUSHPAGE);
170
171 return (KM_SLEEP);
172 }
173
174 /*
175 * taskq_find_by_name - Find the largest instance number of a named taskq.
176 */
177 static int
178 taskq_find_by_name(const char *name)
179 {
180 struct list_head *tql = NULL;
181 taskq_t *tq;
182
183 list_for_each_prev(tql, &tq_list) {
184 tq = list_entry(tql, taskq_t, tq_taskqs);
185 if (strcmp(name, tq->tq_name) == 0)
186 return (tq->tq_instance);
187 }
188 return (-1);
189 }
190
191 /*
192 * NOTE: Must be called with tq->tq_lock held, returns a taskq_ent_t
193 * which is not attached to the free, work, or pending taskq lists.
194 */
195 static taskq_ent_t *
196 task_alloc(taskq_t *tq, uint_t flags, unsigned long *irqflags)
197 {
198 taskq_ent_t *t;
199 int count = 0;
200
201 ASSERT(tq);
202 retry:
203 /* Acquire taskq_ent_t's from free list if available */
204 if (!list_empty(&tq->tq_free_list) && !(flags & TQ_NEW)) {
205 t = list_entry(tq->tq_free_list.next, taskq_ent_t, tqent_list);
206
207 ASSERT(!(t->tqent_flags & TQENT_FLAG_PREALLOC));
208 ASSERT(!(t->tqent_flags & TQENT_FLAG_CANCEL));
209 ASSERT(!timer_pending(&t->tqent_timer));
210
211 list_del_init(&t->tqent_list);
212 TQSTAT_DEC(tq, entries_free);
213 return (t);
214 }
215
216 /* Free list is empty and memory allocations are prohibited */
217 if (flags & TQ_NOALLOC)
218 return (NULL);
219
220 /* Hit maximum taskq_ent_t pool size */
221 if (tq->tq_nalloc >= tq->tq_maxalloc) {
222 if (flags & TQ_NOSLEEP)
223 return (NULL);
224
225 /*
226 * Sleep periodically polling the free list for an available
227 * taskq_ent_t. Dispatching with TQ_SLEEP should always succeed
228 * but we cannot block forever waiting for a taskq_ent_t to
229 * show up in the free list, otherwise a deadlock can happen.
230 *
231 * Therefore, we need to allocate a new task even if the number
232 * of allocated tasks is above tq->tq_maxalloc, but we still
233 * end up delaying the task allocation by one second, thereby
234 * throttling the task dispatch rate.
235 */
236 spin_unlock_irqrestore(&tq->tq_lock, *irqflags);
237 schedule_timeout_interruptible(HZ / 100);
238 spin_lock_irqsave_nested(&tq->tq_lock, *irqflags,
239 tq->tq_lock_class);
240 if (count < 100) {
241 count++;
242 goto retry;
243 }
244 }
245
246 spin_unlock_irqrestore(&tq->tq_lock, *irqflags);
247 t = kmem_alloc(sizeof (taskq_ent_t), task_km_flags(flags));
248 spin_lock_irqsave_nested(&tq->tq_lock, *irqflags, tq->tq_lock_class);
249
250 if (t) {
251 taskq_init_ent(t);
252 tq->tq_nalloc++;
253 }
254
255 return (t);
256 }
257
258 /*
259 * NOTE: Must be called with tq->tq_lock held, expects the taskq_ent_t
260 * to already be removed from the free, work, or pending taskq lists.
261 */
262 static void
263 task_free(taskq_t *tq, taskq_ent_t *t)
264 {
265 ASSERT(tq);
266 ASSERT(t);
267 ASSERT(list_empty(&t->tqent_list));
268 ASSERT(!timer_pending(&t->tqent_timer));
269
270 kmem_free(t, sizeof (taskq_ent_t));
271 tq->tq_nalloc--;
272 }
273
274 /*
275 * NOTE: Must be called with tq->tq_lock held, either destroys the
276 * taskq_ent_t if too many exist or moves it to the free list for later use.
277 */
278 static void
279 task_done(taskq_t *tq, taskq_ent_t *t)
280 {
281 ASSERT(tq);
282 ASSERT(t);
283 ASSERT(list_empty(&t->tqent_list));
284
285 /* Wake tasks blocked in taskq_wait_id() */
286 wake_up_all(&t->tqent_waitq);
287
288 if (tq->tq_nalloc <= tq->tq_minalloc) {
289 t->tqent_id = TASKQID_INVALID;
290 t->tqent_func = NULL;
291 t->tqent_arg = NULL;
292 t->tqent_flags = 0;
293
294 list_add_tail(&t->tqent_list, &tq->tq_free_list);
295 TQSTAT_INC(tq, entries_free);
296 } else {
297 task_free(tq, t);
298 }
299 }
300
301 /*
302 * When a delayed task timer expires, remove it from the delay list and
303 * add it to the priority list for immediate processing.
304 */
305 static void
306 task_expire_impl(taskq_ent_t *t)
307 {
308 taskq_ent_t *w;
309 taskq_t *tq = t->tqent_taskq;
310 struct list_head *l = NULL;
311 unsigned long flags;
312
313 spin_lock_irqsave_nested(&tq->tq_lock, flags, tq->tq_lock_class);
314
315 if (t->tqent_flags & TQENT_FLAG_CANCEL) {
316 ASSERT(list_empty(&t->tqent_list));
317 spin_unlock_irqrestore(&tq->tq_lock, flags);
318 return;
319 }
320
321 t->tqent_birth = jiffies;
322 DTRACE_PROBE1(taskq_ent__birth, taskq_ent_t *, t);
323
324 /*
325 * The priority list must be maintained in strict task id order
326 * from lowest to highest for lowest_id to be easily calculable.
327 */
328 list_del(&t->tqent_list);
329 list_for_each_prev(l, &tq->tq_prio_list) {
330 w = list_entry(l, taskq_ent_t, tqent_list);
331 if (w->tqent_id < t->tqent_id) {
332 list_add(&t->tqent_list, l);
333 break;
334 }
335 }
336 if (l == &tq->tq_prio_list)
337 list_add(&t->tqent_list, &tq->tq_prio_list);
338
339 spin_unlock_irqrestore(&tq->tq_lock, flags);
340
341 wake_up(&tq->tq_work_waitq);
342
343 TQSTAT_INC(tq, tasks_delayed_requeued);
344 }
345
346 static void
347 task_expire(struct timer_list *tl)
348 {
349 struct timer_list *tmr = (struct timer_list *)tl;
350 taskq_ent_t *t = from_timer(t, tmr, tqent_timer);
351 task_expire_impl(t);
352 }
353
354 /*
355 * Returns the lowest incomplete taskqid_t. The taskqid_t may
356 * be queued on the pending list, on the priority list, on the
357 * delay list, or on the work list currently being handled, but
358 * it is not 100% complete yet.
359 */
360 static taskqid_t
361 taskq_lowest_id(taskq_t *tq)
362 {
363 taskqid_t lowest_id = tq->tq_next_id;
364 taskq_ent_t *t;
365 taskq_thread_t *tqt;
366
367 if (!list_empty(&tq->tq_pend_list)) {
368 t = list_entry(tq->tq_pend_list.next, taskq_ent_t, tqent_list);
369 lowest_id = MIN(lowest_id, t->tqent_id);
370 }
371
372 if (!list_empty(&tq->tq_prio_list)) {
373 t = list_entry(tq->tq_prio_list.next, taskq_ent_t, tqent_list);
374 lowest_id = MIN(lowest_id, t->tqent_id);
375 }
376
377 if (!list_empty(&tq->tq_delay_list)) {
378 t = list_entry(tq->tq_delay_list.next, taskq_ent_t, tqent_list);
379 lowest_id = MIN(lowest_id, t->tqent_id);
380 }
381
382 if (!list_empty(&tq->tq_active_list)) {
383 tqt = list_entry(tq->tq_active_list.next, taskq_thread_t,
384 tqt_active_list);
385 ASSERT(tqt->tqt_id != TASKQID_INVALID);
386 lowest_id = MIN(lowest_id, tqt->tqt_id);
387 }
388
389 return (lowest_id);
390 }
391
392 /*
393 * Insert a task into a list keeping the list sorted by increasing taskqid.
394 */
395 static void
396 taskq_insert_in_order(taskq_t *tq, taskq_thread_t *tqt)
397 {
398 taskq_thread_t *w;
399 struct list_head *l = NULL;
400
401 ASSERT(tq);
402 ASSERT(tqt);
403
404 list_for_each_prev(l, &tq->tq_active_list) {
405 w = list_entry(l, taskq_thread_t, tqt_active_list);
406 if (w->tqt_id < tqt->tqt_id) {
407 list_add(&tqt->tqt_active_list, l);
408 break;
409 }
410 }
411 if (l == &tq->tq_active_list)
412 list_add(&tqt->tqt_active_list, &tq->tq_active_list);
413 }
414
415 /*
416 * Find and return a task from the given list if it exists. The list
417 * must be in lowest to highest task id order.
418 */
419 static taskq_ent_t *
420 taskq_find_list(taskq_t *tq, struct list_head *lh, taskqid_t id)
421 {
422 struct list_head *l = NULL;
423 taskq_ent_t *t;
424
425 list_for_each(l, lh) {
426 t = list_entry(l, taskq_ent_t, tqent_list);
427
428 if (t->tqent_id == id)
429 return (t);
430
431 if (t->tqent_id > id)
432 break;
433 }
434
435 return (NULL);
436 }
437
438 /*
439 * Find an already dispatched task given the task id regardless of what
440 * state it is in. If a task is still pending it will be returned.
441 * If a task is executing, then -EBUSY will be returned instead.
442 * If the task has already been run then NULL is returned.
443 */
444 static taskq_ent_t *
445 taskq_find(taskq_t *tq, taskqid_t id)
446 {
447 taskq_thread_t *tqt;
448 struct list_head *l = NULL;
449 taskq_ent_t *t;
450
451 t = taskq_find_list(tq, &tq->tq_delay_list, id);
452 if (t)
453 return (t);
454
455 t = taskq_find_list(tq, &tq->tq_prio_list, id);
456 if (t)
457 return (t);
458
459 t = taskq_find_list(tq, &tq->tq_pend_list, id);
460 if (t)
461 return (t);
462
463 list_for_each(l, &tq->tq_active_list) {
464 tqt = list_entry(l, taskq_thread_t, tqt_active_list);
465 if (tqt->tqt_id == id) {
466 /*
467 * Instead of returning tqt_task, we just return a non
468 * NULL value to prevent misuse, since tqt_task only
469 * has two valid fields.
470 */
471 return (ERR_PTR(-EBUSY));
472 }
473 }
474
475 return (NULL);
476 }
477
478 /*
479 * Theory for the taskq_wait_id(), taskq_wait_outstanding(), and
480 * taskq_wait() functions below.
481 *
482 * Taskq waiting is accomplished by tracking the lowest outstanding task
483 * id and the next available task id. As tasks are dispatched they are
484 * added to the tail of the pending, priority, or delay lists. As worker
485 * threads become available the tasks are removed from the heads of these
486 * lists and linked to the worker threads. This ensures the lists are
487 * kept sorted by lowest to highest task id.
488 *
489 * Therefore the lowest outstanding task id can be quickly determined by
490 * checking the head item from all of these lists. This value is stored
491 * with the taskq as the lowest id. It only needs to be recalculated when
492 * either the task with the current lowest id completes or is canceled.
493 *
494 * By blocking until the lowest task id exceeds the passed task id the
495 * taskq_wait_outstanding() function can be easily implemented. Similarly,
496 * by blocking until the lowest task id matches the next task id taskq_wait()
497 * can be implemented.
498 *
499 * Callers should be aware that when there are multiple worker threads it
500 * is possible for larger task ids to complete before smaller ones. Also
501 * when the taskq contains delay tasks with small task ids, callers may
502 * block for a considerable length of time waiting for them to expire and
503 * execute.
504 */
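/*
 * Illustrative sketch (not part of the build) of the wait primitives
 * described above; "do_work" is a hypothetical task function:
 *
 *	taskqid_t id = taskq_dispatch(tq, do_work, arg, TQ_SLEEP);
 *
 *	// Block until this specific task has completed (other ids may
 *	// still be outstanding).
 *	taskq_wait_id(tq, id);
 *
 *	// Block until every task dispatched before this call has completed.
 *	taskq_wait_outstanding(tq, 0);
 *
 *	// Block until the taskq is completely empty.
 *	taskq_wait(tq);
 */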
505 static int
506 taskq_wait_id_check(taskq_t *tq, taskqid_t id)
507 {
508 int rc;
509 unsigned long flags;
510
511 spin_lock_irqsave_nested(&tq->tq_lock, flags, tq->tq_lock_class);
512 rc = (taskq_find(tq, id) == NULL);
513 spin_unlock_irqrestore(&tq->tq_lock, flags);
514
515 return (rc);
516 }
517
518 /*
519 * The taskq_wait_id() function blocks until the passed task id completes.
520 * This does not guarantee that all lower task ids have completed.
521 */
522 void
523 taskq_wait_id(taskq_t *tq, taskqid_t id)
524 {
525 wait_event(tq->tq_wait_waitq, taskq_wait_id_check(tq, id));
526 }
527 EXPORT_SYMBOL(taskq_wait_id);
528
529 static int
530 taskq_wait_outstanding_check(taskq_t *tq, taskqid_t id)
531 {
532 int rc;
533 unsigned long flags;
534
535 spin_lock_irqsave_nested(&tq->tq_lock, flags, tq->tq_lock_class);
536 rc = (id < tq->tq_lowest_id);
537 spin_unlock_irqrestore(&tq->tq_lock, flags);
538
539 return (rc);
540 }
541
542 /*
543 * The taskq_wait_outstanding() function will block until all tasks with a
544 * lower taskqid than the passed 'id' have been completed. Note that all
545 * task ids are assigned monotonically at dispatch time. Zero may be
546 * passed for the id to indicate all tasks dispatched up to this point,
547 * but not after, should be waited for.
548 */
549 void
550 taskq_wait_outstanding(taskq_t *tq, taskqid_t id)
551 {
552 id = id ? id : tq->tq_next_id - 1;
553 wait_event(tq->tq_wait_waitq, taskq_wait_outstanding_check(tq, id));
554 }
555 EXPORT_SYMBOL(taskq_wait_outstanding);
556
557 static int
558 taskq_wait_check(taskq_t *tq)
559 {
560 int rc;
561 unsigned long flags;
562
563 spin_lock_irqsave_nested(&tq->tq_lock, flags, tq->tq_lock_class);
564 rc = (tq->tq_lowest_id == tq->tq_next_id);
565 spin_unlock_irqrestore(&tq->tq_lock, flags);
566
567 return (rc);
568 }
569
570 /*
571 * The taskq_wait() function will block until the taskq is empty.
572 * This means that if a taskq re-dispatches work to itself taskq_wait()
573 * callers will block indefinitely.
574 */
575 void
576 taskq_wait(taskq_t *tq)
577 {
578 wait_event(tq->tq_wait_waitq, taskq_wait_check(tq));
579 }
580 EXPORT_SYMBOL(taskq_wait);
581
582 int
583 taskq_member(taskq_t *tq, kthread_t *t)
584 {
585 return (tq == (taskq_t *)tsd_get_by_thread(taskq_tsd, t));
586 }
587 EXPORT_SYMBOL(taskq_member);
588
589 taskq_t *
590 taskq_of_curthread(void)
591 {
592 return (tsd_get(taskq_tsd));
593 }
594 EXPORT_SYMBOL(taskq_of_curthread);
595
596 /*
597 * Cancel an already dispatched task given the task id. Still pending tasks
598 * will be immediately canceled, and if the task is active the function will
599 * block until it completes. Preallocated tasks which are canceled must be
600 * freed by the caller.
601 */
602 int
603 taskq_cancel_id(taskq_t *tq, taskqid_t id)
604 {
605 taskq_ent_t *t;
606 int rc = ENOENT;
607 unsigned long flags;
608
609 ASSERT(tq);
610
611 spin_lock_irqsave_nested(&tq->tq_lock, flags, tq->tq_lock_class);
612 t = taskq_find(tq, id);
613 if (t && t != ERR_PTR(-EBUSY)) {
614 list_del_init(&t->tqent_list);
615 TQSTAT_DEC_LIST(tq, t);
616 TQSTAT_DEC(tq, tasks_total);
617
618 t->tqent_flags |= TQENT_FLAG_CANCEL;
619 TQSTAT_INC(tq, tasks_cancelled);
620
621 /*
622 * When canceling the lowest outstanding task id we
623 * must recalculate the new lowest outstanding id.
624 */
625 if (tq->tq_lowest_id == t->tqent_id) {
626 tq->tq_lowest_id = taskq_lowest_id(tq);
627 ASSERT3S(tq->tq_lowest_id, >, t->tqent_id);
628 }
629
630 /*
631 * The task_expire() function takes the tq->tq_lock so drop
632 * the lock before synchronously cancelling the timer.
633 */
634 if (timer_pending(&t->tqent_timer)) {
635 spin_unlock_irqrestore(&tq->tq_lock, flags);
636 del_timer_sync(&t->tqent_timer);
637 spin_lock_irqsave_nested(&tq->tq_lock, flags,
638 tq->tq_lock_class);
639 }
640
641 if (!(t->tqent_flags & TQENT_FLAG_PREALLOC))
642 task_done(tq, t);
643
644 rc = 0;
645 }
646 spin_unlock_irqrestore(&tq->tq_lock, flags);
647
648 if (t == ERR_PTR(-EBUSY)) {
649 taskq_wait_id(tq, id);
650 rc = EBUSY;
651 }
652
653 return (rc);
654 }
655 EXPORT_SYMBOL(taskq_cancel_id);
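/*
 * Illustrative sketch (not part of the build) of the return values above:
 *
 *	int error = taskq_cancel_id(tq, id);
 *	switch (error) {
 *	case 0:		// Task was still pending and has been cancelled.
 *		break;
 *	case ENOENT:	// Task already completed, or id was never dispatched.
 *		break;
 *	case EBUSY:	// Task was running; taskq_cancel_id() waited for it.
 *		break;
 *	}
 *
 * A cancelled TQENT_FLAG_PREALLOC entry is not returned to the free list
 * and remains owned by the caller.
 */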
656
657 static int taskq_thread_spawn(taskq_t *tq);
658
659 taskqid_t
660 taskq_dispatch(taskq_t *tq, task_func_t func, void *arg, uint_t flags)
661 {
662 taskq_ent_t *t;
663 taskqid_t rc = TASKQID_INVALID;
664 unsigned long irqflags;
665
666 ASSERT(tq);
667 ASSERT(func);
668
669 spin_lock_irqsave_nested(&tq->tq_lock, irqflags, tq->tq_lock_class);
670
671 /* Taskq being destroyed and all tasks drained */
672 if (!(tq->tq_flags & TASKQ_ACTIVE))
673 goto out;
674
675 /* Do not queue the task unless there is an idle thread for it */
676 ASSERT(tq->tq_nactive <= tq->tq_nthreads);
677 if ((flags & TQ_NOQUEUE) && (tq->tq_nactive == tq->tq_nthreads)) {
678 /* Dynamic taskq may be able to spawn another thread */
679 if (taskq_thread_spawn(tq) == 0)
680 goto out;
681 }
682
683 if ((t = task_alloc(tq, flags, &irqflags)) == NULL)
684 goto out;
685
686 spin_lock(&t->tqent_lock);
687
688 /* Queue to the front of the list to enforce TQ_NOQUEUE semantics */
689 if (flags & TQ_NOQUEUE) {
690 TQENT_SET_LIST(t, TQENT_LIST_PRIORITY);
691 list_add(&t->tqent_list, &tq->tq_prio_list);
692 /* Queue to the priority list instead of the pending list */
693 } else if (flags & TQ_FRONT) {
694 TQENT_SET_LIST(t, TQENT_LIST_PRIORITY);
695 list_add_tail(&t->tqent_list, &tq->tq_prio_list);
696 } else {
697 TQENT_SET_LIST(t, TQENT_LIST_PENDING);
698 list_add_tail(&t->tqent_list, &tq->tq_pend_list);
699 }
700 TQSTAT_INC_LIST(tq, t);
701 TQSTAT_INC(tq, tasks_total);
702
703 t->tqent_id = rc = tq->tq_next_id;
704 tq->tq_next_id++;
705 t->tqent_func = func;
706 t->tqent_arg = arg;
707 t->tqent_taskq = tq;
708 t->tqent_timer.function = NULL;
709 t->tqent_timer.expires = 0;
710
711 t->tqent_birth = jiffies;
712 DTRACE_PROBE1(taskq_ent__birth, taskq_ent_t *, t);
713
714 ASSERT(!(t->tqent_flags & TQENT_FLAG_PREALLOC));
715
716 spin_unlock(&t->tqent_lock);
717
718 wake_up(&tq->tq_work_waitq);
719
720 TQSTAT_INC(tq, tasks_dispatched);
721
722 /* Spawn additional taskq threads if required. */
723 if (!(flags & TQ_NOQUEUE) && tq->tq_nactive == tq->tq_nthreads)
724 (void) taskq_thread_spawn(tq);
725 out:
726 spin_unlock_irqrestore(&tq->tq_lock, irqflags);
727 return (rc);
728 }
729 EXPORT_SYMBOL(taskq_dispatch);
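/*
 * Illustrative sketch (not part of the build) of the dispatch flags handled
 * above; "do_work" is a hypothetical task function:
 *
 *	// Ordinary FIFO dispatch onto the pending list.
 *	taskq_dispatch(tq, do_work, arg, TQ_SLEEP);
 *
 *	// Jump ahead of pending tasks by queueing on the priority list.
 *	taskq_dispatch(tq, do_work, arg, TQ_FRONT);
 *
 *	// Never sleep for an entry, and only accept the task if an idle
 *	// thread (or a newly spawned one) can take it immediately.
 *	taskqid_t id = taskq_dispatch(tq, do_work, arg,
 *	    TQ_NOSLEEP | TQ_NOQUEUE);
 *	if (id == TASKQID_INVALID)
 *		// Task was rejected; the caller must handle it another way.
 */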
730
731 taskqid_t
732 taskq_dispatch_delay(taskq_t *tq, task_func_t func, void *arg,
733 uint_t flags, clock_t expire_time)
734 {
735 taskqid_t rc = TASKQID_INVALID;
736 taskq_ent_t *t;
737 unsigned long irqflags;
738
739 ASSERT(tq);
740 ASSERT(func);
741
742 spin_lock_irqsave_nested(&tq->tq_lock, irqflags, tq->tq_lock_class);
743
744 /* Taskq being destroyed and all tasks drained */
745 if (!(tq->tq_flags & TASKQ_ACTIVE))
746 goto out;
747
748 if ((t = task_alloc(tq, flags, &irqflags)) == NULL)
749 goto out;
750
751 spin_lock(&t->tqent_lock);
752
753 /* Queue to the delay list for subsequent execution */
754 list_add_tail(&t->tqent_list, &tq->tq_delay_list);
755 TQENT_SET_LIST(t, TQENT_LIST_DELAY);
756 TQSTAT_INC_LIST(tq, t);
757 TQSTAT_INC(tq, tasks_total);
758
759 t->tqent_id = rc = tq->tq_next_id;
760 tq->tq_next_id++;
761 t->tqent_func = func;
762 t->tqent_arg = arg;
763 t->tqent_taskq = tq;
764 t->tqent_timer.function = task_expire;
765 t->tqent_timer.expires = (unsigned long)expire_time;
766 add_timer(&t->tqent_timer);
767
768 ASSERT(!(t->tqent_flags & TQENT_FLAG_PREALLOC));
769
770 spin_unlock(&t->tqent_lock);
771
772 TQSTAT_INC(tq, tasks_dispatched_delayed);
773
774 /* Spawn additional taskq threads if required. */
775 if (tq->tq_nactive == tq->tq_nthreads)
776 (void) taskq_thread_spawn(tq);
777 out:
778 spin_unlock_irqrestore(&tq->tq_lock, irqflags);
779 return (rc);
780 }
781 EXPORT_SYMBOL(taskq_dispatch_delay);
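/*
 * Illustrative sketch (not part of the build): expire_time is an absolute
 * time, in jiffies, at which the task becomes runnable (it is stored
 * directly in tqent_timer.expires above). Assuming ddi_get_lbolt() returns
 * the current time in jiffies, as it does elsewhere in the SPL, and that
 * "do_work" is a hypothetical task function:
 *
 *	// Run do_work() roughly five seconds from now.
 *	taskqid_t id = taskq_dispatch_delay(tq, do_work, arg, TQ_SLEEP,
 *	    ddi_get_lbolt() + msecs_to_jiffies(5000));
 *	if (id == TASKQID_INVALID)
 *		// The task was not queued.
 */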
782
783 void
784 taskq_dispatch_ent(taskq_t *tq, task_func_t func, void *arg, uint_t flags,
785 taskq_ent_t *t)
786 {
787 unsigned long irqflags;
788 ASSERT(tq);
789 ASSERT(func);
790
791 spin_lock_irqsave_nested(&tq->tq_lock, irqflags,
792 tq->tq_lock_class);
793
794 /* Taskq being destroyed and all tasks drained */
795 if (!(tq->tq_flags & TASKQ_ACTIVE)) {
796 t->tqent_id = TASKQID_INVALID;
797 goto out;
798 }
799
800 if ((flags & TQ_NOQUEUE) && (tq->tq_nactive == tq->tq_nthreads)) {
801 /* Dynamic taskq may be able to spawn another thread */
802 if (taskq_thread_spawn(tq) == 0)
803 goto out;
804 flags |= TQ_FRONT;
805 }
806
807 spin_lock(&t->tqent_lock);
808
809 /*
810 * Make sure the entry is not on some other taskq; it is important to
811 * do this ASSERT() under the lock.
812 */
813 ASSERT(taskq_empty_ent(t));
814
815 /*
816 * Mark it as a prealloc'd task. This is important
817 * to ensure that we don't free it later.
818 */
819 t->tqent_flags |= TQENT_FLAG_PREALLOC;
820
821 /* Queue to the priority list instead of the pending list */
822 if (flags & TQ_FRONT) {
823 TQENT_SET_LIST(t, TQENT_LIST_PRIORITY);
824 list_add_tail(&t->tqent_list, &tq->tq_prio_list);
825 } else {
826 TQENT_SET_LIST(t, TQENT_LIST_PENDING);
827 list_add_tail(&t->tqent_list, &tq->tq_pend_list);
828 }
829 TQSTAT_INC_LIST(tq, t);
830 TQSTAT_INC(tq, tasks_total);
831
832 t->tqent_id = tq->tq_next_id;
833 tq->tq_next_id++;
834 t->tqent_func = func;
835 t->tqent_arg = arg;
836 t->tqent_taskq = tq;
837
838 t->tqent_birth = jiffies;
839 DTRACE_PROBE1(taskq_ent__birth, taskq_ent_t *, t);
840
841 spin_unlock(&t->tqent_lock);
842
843 wake_up(&tq->tq_work_waitq);
844
845 TQSTAT_INC(tq, tasks_dispatched);
846
847 /* Spawn additional taskq threads if required. */
848 if (tq->tq_nactive == tq->tq_nthreads)
849 (void) taskq_thread_spawn(tq);
850 out:
851 spin_unlock_irqrestore(&tq->tq_lock, irqflags);
852 }
853 EXPORT_SYMBOL(taskq_dispatch_ent);
854
855 int
856 taskq_empty_ent(taskq_ent_t *t)
857 {
858 return (list_empty(&t->tqent_list));
859 }
860 EXPORT_SYMBOL(taskq_empty_ent);
861
862 void
863 taskq_init_ent(taskq_ent_t *t)
864 {
865 spin_lock_init(&t->tqent_lock);
866 init_waitqueue_head(&t->tqent_waitq);
867 timer_setup(&t->tqent_timer, NULL, 0);
868 INIT_LIST_HEAD(&t->tqent_list);
869 t->tqent_id = 0;
870 t->tqent_func = NULL;
871 t->tqent_arg = NULL;
872 t->tqent_flags = 0;
873 t->tqent_taskq = NULL;
874 }
875 EXPORT_SYMBOL(taskq_init_ent);
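/*
 * Illustrative sketch (not part of the build): using a caller-owned,
 * preallocated entry so that dispatch never allocates memory. The embedding
 * structure and the "do_job" task function are hypothetical:
 *
 *	struct my_job {
 *		taskq_ent_t	mj_ent;
 *		int		mj_data;
 *	};
 *
 *	struct my_job *job = kmem_zalloc(sizeof (*job), KM_SLEEP);
 *	taskq_init_ent(&job->mj_ent);
 *
 *	// The entry is marked TQENT_FLAG_PREALLOC internally, so the taskq
 *	// never frees it; the caller reclaims it after the task has run.
 *	taskq_dispatch_ent(tq, do_job, job, 0, &job->mj_ent);
 */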
876
877 /*
878 * Return the next pending task, preference is given to tasks on the
879 * priority list which were dispatched with TQ_FRONT.
880 */
881 static taskq_ent_t *
882 taskq_next_ent(taskq_t *tq)
883 {
884 struct list_head *list;
885
886 if (!list_empty(&tq->tq_prio_list))
887 list = &tq->tq_prio_list;
888 else if (!list_empty(&tq->tq_pend_list))
889 list = &tq->tq_pend_list;
890 else
891 return (NULL);
892
893 return (list_entry(list->next, taskq_ent_t, tqent_list));
894 }
895
896 /*
897 * Spawns a new thread for the specified taskq.
898 */
899 static void
900 taskq_thread_spawn_task(void *arg)
901 {
902 taskq_t *tq = (taskq_t *)arg;
903 unsigned long flags;
904
905 if (taskq_thread_create(tq) == NULL) {
906 /* restore spawning count if failed */
907 spin_lock_irqsave_nested(&tq->tq_lock, flags,
908 tq->tq_lock_class);
909 tq->tq_nspawn--;
910 spin_unlock_irqrestore(&tq->tq_lock, flags);
911 }
912 }
913
914 /*
915 * Spawn additional threads for dynamic taskqs (TASKQ_DYNAMIC) when the
916 * current number of threads is insufficient to handle the pending tasks. These
917 * new threads must be created by the dedicated dynamic_taskq to avoid
918 * deadlocks between thread creation and memory reclaim. The system_taskq
919 * which is also a dynamic taskq cannot be safely used for this.
920 */
921 static int
922 taskq_thread_spawn(taskq_t *tq)
923 {
924 int spawning = 0;
925
926 if (!(tq->tq_flags & TASKQ_DYNAMIC))
927 return (0);
928
929 tq->lastspawnstop = jiffies;
930 if ((tq->tq_nthreads + tq->tq_nspawn < tq->tq_maxthreads) &&
931 (tq->tq_flags & TASKQ_ACTIVE)) {
932 spawning = (++tq->tq_nspawn);
933 taskq_dispatch(dynamic_taskq, taskq_thread_spawn_task,
934 tq, TQ_NOSLEEP);
935 }
936
937 return (spawning);
938 }
939
940 /*
941 * Threads in a dynamic taskq may exit once there is no more work to do.
942 * To prevent threads from being created and destroyed too often limit
943 * the exit rate to one per spl_taskq_thread_timeout_ms.
944 *
945 * The first thread in the thread list is treated as the primary thread.
946 * There is nothing special about the primary thread, but in order to keep
947 * all the taskq pids from changing we opt to make it long running.
948 */
949 static int
950 taskq_thread_should_stop(taskq_t *tq, taskq_thread_t *tqt)
951 {
952 ASSERT(!taskq_next_ent(tq));
953 if (!(tq->tq_flags & TASKQ_DYNAMIC) || !spl_taskq_thread_dynamic)
954 return (0);
955 if (!(tq->tq_flags & TASKQ_ACTIVE))
956 return (1);
957 if (list_first_entry(&(tq->tq_thread_list), taskq_thread_t,
958 tqt_thread_list) == tqt)
959 return (0);
960 ASSERT3U(tq->tq_nthreads, >, 1);
961 if (tq->tq_nspawn != 0)
962 return (0);
963 if (time_before(jiffies, tq->lastspawnstop +
964 msecs_to_jiffies(spl_taskq_thread_timeout_ms)))
965 return (0);
966 tq->lastspawnstop = jiffies;
967 return (1);
968 }
969
970 static int
971 taskq_thread(void *args)
972 {
973 DECLARE_WAITQUEUE(wait, current);
974 sigset_t blocked;
975 taskq_thread_t *tqt = args;
976 taskq_t *tq;
977 taskq_ent_t *t;
978 int seq_tasks = 0;
979 unsigned long flags;
980 taskq_ent_t dup_task = {};
981
982 ASSERT(tqt);
983 ASSERT(tqt->tqt_tq);
984 tq = tqt->tqt_tq;
985 current->flags |= PF_NOFREEZE;
986
987 (void) spl_fstrans_mark();
988
989 sigfillset(&blocked);
990 sigprocmask(SIG_BLOCK, &blocked, NULL);
991 flush_signals(current);
992
993 tsd_set(taskq_tsd, tq);
994 spin_lock_irqsave_nested(&tq->tq_lock, flags, tq->tq_lock_class);
995 /*
996 * If we are dynamically spawned, decrease spawning count. Note that
997 * we could be created during taskq_create, in which case we shouldn't
998 * do the decrement. But it's fine because taskq_create will reset
999 * tq_nspawn later.
1000 */
1001 if (tq->tq_flags & TASKQ_DYNAMIC)
1002 tq->tq_nspawn--;
1003
1004 /* Immediately exit if more threads than allowed were created. */
1005 if (tq->tq_nthreads >= tq->tq_maxthreads)
1006 goto error;
1007
1008 tq->tq_nthreads++;
1009 list_add_tail(&tqt->tqt_thread_list, &tq->tq_thread_list);
1010 wake_up(&tq->tq_wait_waitq);
1011 set_current_state(TASK_INTERRUPTIBLE);
1012
1013 TQSTAT_INC(tq, threads_total);
1014
1015 while (!kthread_should_stop()) {
1016
1017 if (list_empty(&tq->tq_pend_list) &&
1018 list_empty(&tq->tq_prio_list)) {
1019
1020 if (taskq_thread_should_stop(tq, tqt))
1021 break;
1022
1023 add_wait_queue_exclusive(&tq->tq_work_waitq, &wait);
1024 spin_unlock_irqrestore(&tq->tq_lock, flags);
1025
1026 TQSTAT_INC(tq, thread_sleeps);
1027 TQSTAT_INC(tq, threads_idle);
1028
1029 schedule();
1030 seq_tasks = 0;
1031
1032 TQSTAT_DEC(tq, threads_idle);
1033 TQSTAT_INC(tq, thread_wakeups);
1034
1035 spin_lock_irqsave_nested(&tq->tq_lock, flags,
1036 tq->tq_lock_class);
1037 remove_wait_queue(&tq->tq_work_waitq, &wait);
1038 } else {
1039 __set_current_state(TASK_RUNNING);
1040 }
1041
1042 if ((t = taskq_next_ent(tq)) != NULL) {
1043 list_del_init(&t->tqent_list);
1044 TQSTAT_DEC_LIST(tq, t);
1045 TQSTAT_DEC(tq, tasks_total);
1046
1047 /*
1048 * A TQENT_FLAG_PREALLOC task may be reused or freed
1049 * during the task function call. Store tqent_id and
1050 * tqent_flags here.
1051 *
1052 * Also use an on stack taskq_ent_t for tqt_task
1053 * assignment in this case; we want to make sure
1054 * to duplicate all fields, so the values are
1055 * correct when it's accessed via DTRACE_PROBE*.
1056 */
1057 tqt->tqt_id = t->tqent_id;
1058 tqt->tqt_flags = t->tqent_flags;
1059
1060 if (t->tqent_flags & TQENT_FLAG_PREALLOC) {
1061 dup_task = *t;
1062 t = &dup_task;
1063 }
1064 tqt->tqt_task = t;
1065
1066 taskq_insert_in_order(tq, tqt);
1067 tq->tq_nactive++;
1068 spin_unlock_irqrestore(&tq->tq_lock, flags);
1069
1070 TQSTAT_INC(tq, threads_active);
1071 DTRACE_PROBE1(taskq_ent__start, taskq_ent_t *, t);
1072
1073 /* Perform the requested task */
1074 t->tqent_func(t->tqent_arg);
1075
1076 DTRACE_PROBE1(taskq_ent__finish, taskq_ent_t *, t);
1077
1078 TQSTAT_DEC(tq, threads_active);
1079 if ((t->tqent_flags & TQENT_LIST_MASK) ==
1080 TQENT_LIST_PENDING)
1081 TQSTAT_INC(tq, tasks_executed_normal);
1082 else
1083 TQSTAT_INC(tq, tasks_executed_priority);
1084 TQSTAT_INC(tq, tasks_executed);
1085
1086 spin_lock_irqsave_nested(&tq->tq_lock, flags,
1087 tq->tq_lock_class);
1088
1089 tq->tq_nactive--;
1090 list_del_init(&tqt->tqt_active_list);
1091 tqt->tqt_task = NULL;
1092
1093 /* For prealloc'd tasks, we don't free anything. */
1094 if (!(tqt->tqt_flags & TQENT_FLAG_PREALLOC))
1095 task_done(tq, t);
1096
1097 /*
1098 * When the current lowest outstanding taskqid is
1099 * done, calculate the new lowest outstanding id.
1100 */
1101 if (tq->tq_lowest_id == tqt->tqt_id) {
1102 tq->tq_lowest_id = taskq_lowest_id(tq);
1103 ASSERT3S(tq->tq_lowest_id, >, tqt->tqt_id);
1104 }
1105
1106 /* Spawn additional taskq threads if required. */
1107 if ((++seq_tasks) > spl_taskq_thread_sequential &&
1108 taskq_thread_spawn(tq))
1109 seq_tasks = 0;
1110
1111 tqt->tqt_id = TASKQID_INVALID;
1112 tqt->tqt_flags = 0;
1113 wake_up_all(&tq->tq_wait_waitq);
1114 } else
1115 TQSTAT_INC(tq, thread_wakeups_nowork);
1116
1117 set_current_state(TASK_INTERRUPTIBLE);
1118
1119 }
1120
1121 __set_current_state(TASK_RUNNING);
1122 tq->tq_nthreads--;
1123 list_del_init(&tqt->tqt_thread_list);
1124
1125 TQSTAT_DEC(tq, threads_total);
1126 TQSTAT_INC(tq, threads_destroyed);
1127
1128 error:
1129 kmem_free(tqt, sizeof (taskq_thread_t));
1130 spin_unlock_irqrestore(&tq->tq_lock, flags);
1131
1132 tsd_set(taskq_tsd, NULL);
1133 thread_exit();
1134
1135 return (0);
1136 }
1137
1138 static taskq_thread_t *
1139 taskq_thread_create(taskq_t *tq)
1140 {
1141 static int last_used_cpu = 0;
1142 taskq_thread_t *tqt;
1143
1144 tqt = kmem_alloc(sizeof (*tqt), KM_PUSHPAGE);
1145 INIT_LIST_HEAD(&tqt->tqt_thread_list);
1146 INIT_LIST_HEAD(&tqt->tqt_active_list);
1147 tqt->tqt_tq = tq;
1148 tqt->tqt_id = TASKQID_INVALID;
1149
1150 tqt->tqt_thread = spl_kthread_create(taskq_thread, tqt,
1151 "%s", tq->tq_name);
1152 if (tqt->tqt_thread == NULL) {
1153 kmem_free(tqt, sizeof (taskq_thread_t));
1154 return (NULL);
1155 }
1156
1157 if (spl_taskq_thread_bind) {
1158 last_used_cpu = (last_used_cpu + 1) % num_online_cpus();
1159 kthread_bind(tqt->tqt_thread, last_used_cpu);
1160 }
1161
1162 if (spl_taskq_thread_priority)
1163 set_user_nice(tqt->tqt_thread, PRIO_TO_NICE(tq->tq_pri));
1164
1165 wake_up_process(tqt->tqt_thread);
1166
1167 TQSTAT_INC(tq, threads_created);
1168
1169 return (tqt);
1170 }
1171
1172 static void
1173 taskq_stats_init(taskq_t *tq)
1174 {
1175 taskq_sums_t *tqs = &tq->tq_sums;
1176 wmsum_init(&tqs->tqs_threads_active, 0);
1177 wmsum_init(&tqs->tqs_threads_idle, 0);
1178 wmsum_init(&tqs->tqs_threads_total, 0);
1179 wmsum_init(&tqs->tqs_tasks_pending, 0);
1180 wmsum_init(&tqs->tqs_tasks_priority, 0);
1181 wmsum_init(&tqs->tqs_tasks_total, 0);
1182 wmsum_init(&tqs->tqs_tasks_delayed, 0);
1183 wmsum_init(&tqs->tqs_entries_free, 0);
1184 wmsum_init(&tqs->tqs_threads_created, 0);
1185 wmsum_init(&tqs->tqs_threads_destroyed, 0);
1186 wmsum_init(&tqs->tqs_tasks_dispatched, 0);
1187 wmsum_init(&tqs->tqs_tasks_dispatched_delayed, 0);
1188 wmsum_init(&tqs->tqs_tasks_executed_normal, 0);
1189 wmsum_init(&tqs->tqs_tasks_executed_priority, 0);
1190 wmsum_init(&tqs->tqs_tasks_executed, 0);
1191 wmsum_init(&tqs->tqs_tasks_delayed_requeued, 0);
1192 wmsum_init(&tqs->tqs_tasks_cancelled, 0);
1193 wmsum_init(&tqs->tqs_thread_wakeups, 0);
1194 wmsum_init(&tqs->tqs_thread_wakeups_nowork, 0);
1195 wmsum_init(&tqs->tqs_thread_sleeps, 0);
1196 }
1197
1198 static void
1199 taskq_stats_fini(taskq_t *tq)
1200 {
1201 taskq_sums_t *tqs = &tq->tq_sums;
1202 wmsum_fini(&tqs->tqs_threads_active);
1203 wmsum_fini(&tqs->tqs_threads_idle);
1204 wmsum_fini(&tqs->tqs_threads_total);
1205 wmsum_fini(&tqs->tqs_tasks_pending);
1206 wmsum_fini(&tqs->tqs_tasks_priority);
1207 wmsum_fini(&tqs->tqs_tasks_total);
1208 wmsum_fini(&tqs->tqs_tasks_delayed);
1209 wmsum_fini(&tqs->tqs_entries_free);
1210 wmsum_fini(&tqs->tqs_threads_created);
1211 wmsum_fini(&tqs->tqs_threads_destroyed);
1212 wmsum_fini(&tqs->tqs_tasks_dispatched);
1213 wmsum_fini(&tqs->tqs_tasks_dispatched_delayed);
1214 wmsum_fini(&tqs->tqs_tasks_executed_normal);
1215 wmsum_fini(&tqs->tqs_tasks_executed_priority);
1216 wmsum_fini(&tqs->tqs_tasks_executed);
1217 wmsum_fini(&tqs->tqs_tasks_delayed_requeued);
1218 wmsum_fini(&tqs->tqs_tasks_cancelled);
1219 wmsum_fini(&tqs->tqs_thread_wakeups);
1220 wmsum_fini(&tqs->tqs_thread_wakeups_nowork);
1221 wmsum_fini(&tqs->tqs_thread_sleeps);
1222 }
1223
1224 static int
1225 taskq_kstats_update(kstat_t *ksp, int rw)
1226 {
1227 if (rw == KSTAT_WRITE)
1228 return (EACCES);
1229
1230 taskq_t *tq = ksp->ks_private;
1231 taskq_kstats_t *tqks = ksp->ks_data;
1232
1233 tqks->tqks_threads_max.value.ui64 = tq->tq_maxthreads;
1234 tqks->tqks_entry_pool_min.value.ui64 = tq->tq_minalloc;
1235 tqks->tqks_entry_pool_max.value.ui64 = tq->tq_maxalloc;
1236
1237 taskq_sums_t *tqs = &tq->tq_sums;
1238
1239 tqks->tqks_threads_active.value.ui64 =
1240 wmsum_value(&tqs->tqs_threads_active);
1241 tqks->tqks_threads_idle.value.ui64 =
1242 wmsum_value(&tqs->tqs_threads_idle);
1243 tqks->tqks_threads_total.value.ui64 =
1244 wmsum_value(&tqs->tqs_threads_total);
1245 tqks->tqks_tasks_pending.value.ui64 =
1246 wmsum_value(&tqs->tqs_tasks_pending);
1247 tqks->tqks_tasks_priority.value.ui64 =
1248 wmsum_value(&tqs->tqs_tasks_priority);
1249 tqks->tqks_tasks_total.value.ui64 =
1250 wmsum_value(&tqs->tqs_tasks_total);
1251 tqks->tqks_tasks_delayed.value.ui64 =
1252 wmsum_value(&tqs->tqs_tasks_delayed);
1253 tqks->tqks_entries_free.value.ui64 =
1254 wmsum_value(&tqs->tqs_entries_free);
1255 tqks->tqks_threads_created.value.ui64 =
1256 wmsum_value(&tqs->tqs_threads_created);
1257 tqks->tqks_threads_destroyed.value.ui64 =
1258 wmsum_value(&tqs->tqs_threads_destroyed);
1259 tqks->tqks_tasks_dispatched.value.ui64 =
1260 wmsum_value(&tqs->tqs_tasks_dispatched);
1261 tqks->tqks_tasks_dispatched_delayed.value.ui64 =
1262 wmsum_value(&tqs->tqs_tasks_dispatched_delayed);
1263 tqks->tqks_tasks_executed_normal.value.ui64 =
1264 wmsum_value(&tqs->tqs_tasks_executed_normal);
1265 tqks->tqks_tasks_executed_priority.value.ui64 =
1266 wmsum_value(&tqs->tqs_tasks_executed_priority);
1267 tqks->tqks_tasks_executed.value.ui64 =
1268 wmsum_value(&tqs->tqs_tasks_executed);
1269 tqks->tqks_tasks_delayed_requeued.value.ui64 =
1270 wmsum_value(&tqs->tqs_tasks_delayed_requeued);
1271 tqks->tqks_tasks_cancelled.value.ui64 =
1272 wmsum_value(&tqs->tqs_tasks_cancelled);
1273 tqks->tqks_thread_wakeups.value.ui64 =
1274 wmsum_value(&tqs->tqs_thread_wakeups);
1275 tqks->tqks_thread_wakeups_nowork.value.ui64 =
1276 wmsum_value(&tqs->tqs_thread_wakeups_nowork);
1277 tqks->tqks_thread_sleeps.value.ui64 =
1278 wmsum_value(&tqs->tqs_thread_sleeps);
1279
1280 return (0);
1281 }
1282
1283 static void
1284 taskq_kstats_init(taskq_t *tq)
1285 {
1286 char name[TASKQ_NAMELEN+5]; /* 5 for dot, 3x instance digits, null */
1287 snprintf(name, sizeof (name), "%s.%d", tq->tq_name, tq->tq_instance);
1288
1289 kstat_t *ksp = kstat_create("taskq", 0, name, "misc",
1290 KSTAT_TYPE_NAMED, sizeof (taskq_kstats_t) / sizeof (kstat_named_t),
1291 KSTAT_FLAG_VIRTUAL);
1292
1293 if (ksp == NULL)
1294 return;
1295
1296 ksp->ks_private = tq;
1297 ksp->ks_update = taskq_kstats_update;
1298 ksp->ks_data = kmem_alloc(sizeof (taskq_kstats_t), KM_SLEEP);
1299 memcpy(ksp->ks_data, &taskq_kstats_template, sizeof (taskq_kstats_t));
1300 kstat_install(ksp);
1301
1302 tq->tq_ksp = ksp;
1303 }
1304
1305 static void
1306 taskq_kstats_fini(taskq_t *tq)
1307 {
1308 if (tq->tq_ksp == NULL)
1309 return;
1310
1311 kmem_free(tq->tq_ksp->ks_data, sizeof (taskq_kstats_t));
1312 kstat_delete(tq->tq_ksp);
1313
1314 tq->tq_ksp = NULL;
1315 }
1316
1317 taskq_t *
1318 taskq_create(const char *name, int threads_arg, pri_t pri,
1319 int minalloc, int maxalloc, uint_t flags)
1320 {
1321 taskq_t *tq;
1322 taskq_thread_t *tqt;
1323 int count = 0, rc = 0, i;
1324 unsigned long irqflags;
1325 int nthreads = threads_arg;
1326
1327 ASSERT(name != NULL);
1328 ASSERT(minalloc >= 0);
1329 ASSERT(!(flags & (TASKQ_CPR_SAFE))); /* Unsupported */
1330
1331 /* Scale the number of threads using nthreads as a percentage */
1332 if (flags & TASKQ_THREADS_CPU_PCT) {
1333 ASSERT(nthreads <= 100);
1334 ASSERT(nthreads >= 0);
1335 nthreads = MIN(threads_arg, 100);
1336 nthreads = MAX(nthreads, 0);
1337 nthreads = MAX((num_online_cpus() * nthreads) / 100, 1);
1338 }
1339
1340 tq = kmem_alloc(sizeof (*tq), KM_PUSHPAGE);
1341 if (tq == NULL)
1342 return (NULL);
1343
1344 tq->tq_hp_support = B_FALSE;
1345
1346 if (flags & TASKQ_THREADS_CPU_PCT) {
1347 tq->tq_hp_support = B_TRUE;
1348 if (cpuhp_state_add_instance_nocalls(spl_taskq_cpuhp_state,
1349 &tq->tq_hp_cb_node) != 0) {
1350 kmem_free(tq, sizeof (*tq));
1351 return (NULL);
1352 }
1353 }
1354
1355 spin_lock_init(&tq->tq_lock);
1356 INIT_LIST_HEAD(&tq->tq_thread_list);
1357 INIT_LIST_HEAD(&tq->tq_active_list);
1358 tq->tq_name = kmem_strdup(name);
1359 tq->tq_nactive = 0;
1360 tq->tq_nthreads = 0;
1361 tq->tq_nspawn = 0;
1362 tq->tq_maxthreads = nthreads;
1363 tq->tq_cpu_pct = threads_arg;
1364 tq->tq_pri = pri;
1365 tq->tq_minalloc = minalloc;
1366 tq->tq_maxalloc = maxalloc;
1367 tq->tq_nalloc = 0;
1368 tq->tq_flags = (flags | TASKQ_ACTIVE);
1369 tq->tq_next_id = TASKQID_INITIAL;
1370 tq->tq_lowest_id = TASKQID_INITIAL;
1371 tq->lastspawnstop = jiffies;
1372 INIT_LIST_HEAD(&tq->tq_free_list);
1373 INIT_LIST_HEAD(&tq->tq_pend_list);
1374 INIT_LIST_HEAD(&tq->tq_prio_list);
1375 INIT_LIST_HEAD(&tq->tq_delay_list);
1376 init_waitqueue_head(&tq->tq_work_waitq);
1377 init_waitqueue_head(&tq->tq_wait_waitq);
1378 tq->tq_lock_class = TQ_LOCK_GENERAL;
1379 INIT_LIST_HEAD(&tq->tq_taskqs);
1380 taskq_stats_init(tq);
1381
1382 if (flags & TASKQ_PREPOPULATE) {
1383 spin_lock_irqsave_nested(&tq->tq_lock, irqflags,
1384 tq->tq_lock_class);
1385
1386 for (i = 0; i < minalloc; i++)
1387 task_done(tq, task_alloc(tq, TQ_PUSHPAGE | TQ_NEW,
1388 &irqflags));
1389
1390 spin_unlock_irqrestore(&tq->tq_lock, irqflags);
1391 }
1392
1393 if ((flags & TASKQ_DYNAMIC) && spl_taskq_thread_dynamic)
1394 nthreads = 1;
1395
1396 for (i = 0; i < nthreads; i++) {
1397 tqt = taskq_thread_create(tq);
1398 if (tqt == NULL)
1399 rc = 1;
1400 else
1401 count++;
1402 }
1403
1404 /* Wait for all threads to be started before potential destroy */
1405 wait_event(tq->tq_wait_waitq, tq->tq_nthreads == count);
1406 /*
1407 * taskq_thread might have touched nspawn, but we don't want them to
1408 * because they're not dynamically spawned. So we reset it to 0
1409 */
1410 tq->tq_nspawn = 0;
1411
1412 if (rc) {
1413 taskq_destroy(tq);
1414 return (NULL);
1415 }
1416
1417 down_write(&tq_list_sem);
1418 tq->tq_instance = taskq_find_by_name(name) + 1;
1419 list_add_tail(&tq->tq_taskqs, &tq_list);
1420 up_write(&tq_list_sem);
1421
1422 /* Install kstats late, because the name includes tq_instance */
1423 taskq_kstats_init(tq);
1424
1425 return (tq);
1426 }
1427 EXPORT_SYMBOL(taskq_create);
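/*
 * Illustrative sketch (not part of the build): a dynamic taskq sized to a
 * percentage of the online CPUs, matching the TASKQ_THREADS_CPU_PCT scaling
 * above (75 on an 8-CPU system yields 6 threads). The name is hypothetical:
 *
 *	taskq_t *tq = taskq_create("my_taskq", 75, maxclsyspri,
 *	    8, INT_MAX, TASKQ_PREPOPULATE | TASKQ_THREADS_CPU_PCT |
 *	    TASKQ_DYNAMIC);
 *
 *	// ... dispatch work ...
 *
 *	taskq_wait(tq);
 *	taskq_destroy(tq);
 */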
1428
1429 void
1430 taskq_destroy(taskq_t *tq)
1431 {
1432 struct task_struct *thread;
1433 taskq_thread_t *tqt;
1434 taskq_ent_t *t;
1435 unsigned long flags;
1436
1437 ASSERT(tq);
1438 spin_lock_irqsave_nested(&tq->tq_lock, flags, tq->tq_lock_class);
1439 tq->tq_flags &= ~TASKQ_ACTIVE;
1440 spin_unlock_irqrestore(&tq->tq_lock, flags);
1441
1442 if (tq->tq_hp_support) {
1443 VERIFY0(cpuhp_state_remove_instance_nocalls(
1444 spl_taskq_cpuhp_state, &tq->tq_hp_cb_node));
1445 }
1446
1447 /*
1448 * When TASKQ_ACTIVE is clear new tasks may not be added, nor may
1449 * new worker threads be spawned for a dynamic taskq.
1450 */
1451 if (dynamic_taskq != NULL)
1452 taskq_wait_outstanding(dynamic_taskq, 0);
1453
1454 taskq_wait(tq);
1455
1456 taskq_kstats_fini(tq);
1457
1458 /* remove taskq from global list used by the kstats */
1459 down_write(&tq_list_sem);
1460 list_del(&tq->tq_taskqs);
1461 up_write(&tq_list_sem);
1462
1463 spin_lock_irqsave_nested(&tq->tq_lock, flags, tq->tq_lock_class);
1464 /* wait for spawning threads to insert themselves to the list */
1465 while (tq->tq_nspawn) {
1466 spin_unlock_irqrestore(&tq->tq_lock, flags);
1467 schedule_timeout_interruptible(1);
1468 spin_lock_irqsave_nested(&tq->tq_lock, flags,
1469 tq->tq_lock_class);
1470 }
1471
1472 /*
1473 * Signal each thread to exit and block until it does. Each thread
1474 * is responsible for removing itself from the list and freeing its
1475 * taskq_thread_t. This allows for idle threads to opt to remove
1476 * themselves from the taskq. They can be recreated as needed.
1477 */
1478 while (!list_empty(&tq->tq_thread_list)) {
1479 tqt = list_entry(tq->tq_thread_list.next,
1480 taskq_thread_t, tqt_thread_list);
1481 thread = tqt->tqt_thread;
1482 spin_unlock_irqrestore(&tq->tq_lock, flags);
1483
1484 kthread_stop(thread);
1485
1486 spin_lock_irqsave_nested(&tq->tq_lock, flags,
1487 tq->tq_lock_class);
1488 }
1489
1490 while (!list_empty(&tq->tq_free_list)) {
1491 t = list_entry(tq->tq_free_list.next, taskq_ent_t, tqent_list);
1492
1493 ASSERT(!(t->tqent_flags & TQENT_FLAG_PREALLOC));
1494
1495 list_del_init(&t->tqent_list);
1496 task_free(tq, t);
1497 }
1498
1499 ASSERT0(tq->tq_nthreads);
1500 ASSERT0(tq->tq_nalloc);
1501 ASSERT0(tq->tq_nspawn);
1502 ASSERT(list_empty(&tq->tq_thread_list));
1503 ASSERT(list_empty(&tq->tq_active_list));
1504 ASSERT(list_empty(&tq->tq_free_list));
1505 ASSERT(list_empty(&tq->tq_pend_list));
1506 ASSERT(list_empty(&tq->tq_prio_list));
1507 ASSERT(list_empty(&tq->tq_delay_list));
1508
1509 spin_unlock_irqrestore(&tq->tq_lock, flags);
1510
1511 taskq_stats_fini(tq);
1512 kmem_strfree(tq->tq_name);
1513 kmem_free(tq, sizeof (taskq_t));
1514 }
1515 EXPORT_SYMBOL(taskq_destroy);
1516
1517 /*
1518 * Create a taskq with a specified number of pool threads. Allocate
1519 * and return an array of nthreads kthread_t pointers, one for each
1520 * thread in the pool. The array is not ordered and must be freed
1521 * by the caller.
1522 */
1523 taskq_t *
1524 taskq_create_synced(const char *name, int nthreads, pri_t pri,
1525 int minalloc, int maxalloc, uint_t flags, kthread_t ***ktpp)
1526 {
1527 taskq_t *tq;
1528 taskq_thread_t *tqt;
1529 int i = 0;
1530 kthread_t **kthreads = kmem_zalloc(sizeof (*kthreads) * nthreads,
1531 KM_SLEEP);
1532
1533 flags &= ~(TASKQ_DYNAMIC | TASKQ_THREADS_CPU_PCT | TASKQ_DC_BATCH);
1534
1535 /* taskq_create spawns all the threads before returning */
1536 tq = taskq_create(name, nthreads, minclsyspri, nthreads, INT_MAX,
1537 flags | TASKQ_PREPOPULATE);
1538 VERIFY(tq != NULL);
1539 VERIFY(tq->tq_nthreads == nthreads);
1540
1541 list_for_each_entry(tqt, &tq->tq_thread_list, tqt_thread_list) {
1542 kthreads[i] = tqt->tqt_thread;
1543 i++;
1544 }
1545
1546 ASSERT3S(i, ==, nthreads);
1547 *ktpp = kthreads;
1548
1549 return (tq);
1550 }
1551 EXPORT_SYMBOL(taskq_create_synced);
1552
1553 static kstat_t *taskq_summary_ksp = NULL;
1554
1555 static int
1556 spl_taskq_kstat_headers(char *buf, size_t size)
1557 {
1558 size_t n = snprintf(buf, size,
1559 "%-20s | %-17s | %-23s\n"
1560 "%-20s | %-17s | %-23s\n"
1561 "%-20s | %-17s | %-23s\n",
1562 "", "threads", "tasks on queue",
1563 "taskq name", "tot [act idl] max", " pend [ norm high] dly",
1564 "--------------------", "-----------------",
1565 "-----------------------");
1566 return (n >= size ? ENOMEM : 0);
1567 }
1568
1569 static int
1570 spl_taskq_kstat_data(char *buf, size_t size, void *data)
1571 {
1572 struct list_head *tql = NULL;
1573 taskq_t *tq;
1574 char name[TASKQ_NAMELEN+5]; /* 5 for dot, 3x instance digits, null */
1575 char threads[25];
1576 char tasks[30];
1577 size_t n;
1578 int err = 0;
1579
1580 down_read(&tq_list_sem);
1581 list_for_each_prev(tql, &tq_list) {
1582 tq = list_entry(tql, taskq_t, tq_taskqs);
1583
1584 mutex_enter(tq->tq_ksp->ks_lock);
1585 taskq_kstats_update(tq->tq_ksp, KSTAT_READ);
1586 taskq_kstats_t *tqks = tq->tq_ksp->ks_data;
1587
1588 snprintf(name, sizeof (name), "%s.%d", tq->tq_name,
1589 tq->tq_instance);
1590 snprintf(threads, sizeof (threads), "%3llu [%3llu %3llu] %3llu",
1591 tqks->tqks_threads_total.value.ui64,
1592 tqks->tqks_threads_active.value.ui64,
1593 tqks->tqks_threads_idle.value.ui64,
1594 tqks->tqks_threads_max.value.ui64);
1595 snprintf(tasks, sizeof (tasks), "%5llu [%5llu %5llu] %3llu",
1596 tqks->tqks_tasks_total.value.ui64,
1597 tqks->tqks_tasks_pending.value.ui64,
1598 tqks->tqks_tasks_priority.value.ui64,
1599 tqks->tqks_tasks_delayed.value.ui64);
1600
1601 mutex_exit(tq->tq_ksp->ks_lock);
1602
1603 n = snprintf(buf, size, "%-20s | %-17s | %-23s\n",
1604 name, threads, tasks);
1605 if (n >= size) {
1606 err = ENOMEM;
1607 break;
1608 }
1609
1610 buf = &buf[n];
1611 size -= n;
1612 }
1613
1614 up_read(&tq_list_sem);
1615
1616 return (err);
1617 }
1618
1619 static void
1620 spl_taskq_kstat_init(void)
1621 {
1622 kstat_t *ksp = kstat_create("taskq", 0, "summary", "misc",
1623 KSTAT_TYPE_RAW, 0, KSTAT_FLAG_VIRTUAL);
1624
1625 if (ksp == NULL)
1626 return;
1627
1628 ksp->ks_data = (void *)(uintptr_t)1;
1629 ksp->ks_ndata = 1;
1630 kstat_set_raw_ops(ksp, spl_taskq_kstat_headers,
1631 spl_taskq_kstat_data, NULL);
1632 kstat_install(ksp);
1633
1634 taskq_summary_ksp = ksp;
1635 }
1636
1637 static void
1638 spl_taskq_kstat_fini(void)
1639 {
1640 if (taskq_summary_ksp == NULL)
1641 return;
1642
1643 kstat_delete(taskq_summary_ksp);
1644 taskq_summary_ksp = NULL;
1645 }
1646
1647 static unsigned int spl_taskq_kick = 0;
1648
1649 /*
1650 * 2.6.36 API Change
1651 * module_param_cb is introduced to take kernel_param_ops and
1652 * module_param_call is marked as obsolete. Also set and get operations
1653 * were changed to take a 'const struct kernel_param *'.
1654 */
1655 static int
1656 #ifdef module_param_cb
1657 param_set_taskq_kick(const char *val, const struct kernel_param *kp)
1658 #else
1659 param_set_taskq_kick(const char *val, struct kernel_param *kp)
1660 #endif
1661 {
1662 int ret;
1663 taskq_t *tq = NULL;
1664 taskq_ent_t *t;
1665 unsigned long flags;
1666
1667 ret = param_set_uint(val, kp);
1668 if (ret < 0 || !spl_taskq_kick)
1669 return (ret);
1670 /* reset value */
1671 spl_taskq_kick = 0;
1672
1673 down_read(&tq_list_sem);
1674 list_for_each_entry(tq, &tq_list, tq_taskqs) {
1675 spin_lock_irqsave_nested(&tq->tq_lock, flags,
1676 tq->tq_lock_class);
1677 /* Check if the first pending is older than 5 seconds */
1678 t = taskq_next_ent(tq);
1679 if (t && time_after(jiffies, t->tqent_birth + 5*HZ)) {
1680 (void) taskq_thread_spawn(tq);
1681 printk(KERN_INFO "spl: Kicked taskq %s/%d\n",
1682 tq->tq_name, tq->tq_instance);
1683 }
1684 spin_unlock_irqrestore(&tq->tq_lock, flags);
1685 }
1686 up_read(&tq_list_sem);
1687 return (ret);
1688 }
1689
1690 #ifdef module_param_cb
1691 static const struct kernel_param_ops param_ops_taskq_kick = {
1692 .set = param_set_taskq_kick,
1693 .get = param_get_uint,
1694 };
1695 module_param_cb(spl_taskq_kick, &param_ops_taskq_kick, &spl_taskq_kick, 0644);
1696 #else
1697 module_param_call(spl_taskq_kick, param_set_taskq_kick, param_get_uint,
1698 &spl_taskq_kick, 0644);
1699 #endif
1700 MODULE_PARM_DESC(spl_taskq_kick,
1701 "Write nonzero to kick stuck taskqs to spawn more threads");
1702
1703 /*
1704 * This callback will be called exactly once for each core that comes online,
1705 * for each dynamic taskq. We attempt to expand taskqs that have
1706 * TASKQ_THREADS_CPU_PCT set. We need to redo the percentage calculation every
1707 * time, to correctly determine whether or not to add a thread.
1708 */
1709 static int
1710 spl_taskq_expand(unsigned int cpu, struct hlist_node *node)
1711 {
1712 taskq_t *tq = list_entry(node, taskq_t, tq_hp_cb_node);
1713 unsigned long flags;
1714 int err = 0;
1715
1716 ASSERT(tq);
1717 spin_lock_irqsave_nested(&tq->tq_lock, flags, tq->tq_lock_class);
1718
1719 if (!(tq->tq_flags & TASKQ_ACTIVE)) {
1720 spin_unlock_irqrestore(&tq->tq_lock, flags);
1721 return (err);
1722 }
1723
1724 ASSERT(tq->tq_flags & TASKQ_THREADS_CPU_PCT);
1725 int nthreads = MIN(tq->tq_cpu_pct, 100);
1726 nthreads = MAX(((num_online_cpus() + 1) * nthreads) / 100, 1);
1727 tq->tq_maxthreads = nthreads;
1728
1729 if (!((tq->tq_flags & TASKQ_DYNAMIC) && spl_taskq_thread_dynamic) &&
1730 tq->tq_maxthreads > tq->tq_nthreads) {
1731 spin_unlock_irqrestore(&tq->tq_lock, flags);
1732 taskq_thread_t *tqt = taskq_thread_create(tq);
1733 if (tqt == NULL)
1734 err = -1;
1735 return (err);
1736 }
1737 spin_unlock_irqrestore(&tq->tq_lock, flags);
1738 return (err);
1739 }
1740
1741 /*
1742 * While we don't support offlining CPUs, it is possible that CPUs will fail
1743 * to online successfully. We do need to be able to handle this case
1744 * gracefully.
1745 */
1746 static int
1747 spl_taskq_prepare_down(unsigned int cpu, struct hlist_node *node)
1748 {
1749 taskq_t *tq = list_entry(node, taskq_t, tq_hp_cb_node);
1750 unsigned long flags;
1751
1752 ASSERT(tq);
1753 spin_lock_irqsave_nested(&tq->tq_lock, flags, tq->tq_lock_class);
1754
1755 if (!(tq->tq_flags & TASKQ_ACTIVE))
1756 goto out;
1757
1758 ASSERT(tq->tq_flags & TASKQ_THREADS_CPU_PCT);
1759 int nthreads = MIN(tq->tq_cpu_pct, 100);
1760 nthreads = MAX(((num_online_cpus()) * nthreads) / 100, 1);
1761 tq->tq_maxthreads = nthreads;
1762
1763 if (!((tq->tq_flags & TASKQ_DYNAMIC) && spl_taskq_thread_dynamic) &&
1764 tq->tq_maxthreads < tq->tq_nthreads) {
1765 ASSERT3U(tq->tq_maxthreads, ==, tq->tq_nthreads - 1);
1766 taskq_thread_t *tqt = list_entry(tq->tq_thread_list.next,
1767 taskq_thread_t, tqt_thread_list);
1768 struct task_struct *thread = tqt->tqt_thread;
1769 spin_unlock_irqrestore(&tq->tq_lock, flags);
1770
1771 kthread_stop(thread);
1772
1773 return (0);
1774 }
1775
1776 out:
1777 spin_unlock_irqrestore(&tq->tq_lock, flags);
1778 return (0);
1779 }
1780
1781 int
1782 spl_taskq_init(void)
1783 {
1784 init_rwsem(&tq_list_sem);
1785 tsd_create(&taskq_tsd, NULL);
1786
1787 spl_taskq_cpuhp_state = cpuhp_setup_state_multi(CPUHP_AP_ONLINE_DYN,
1788 "fs/spl_taskq:online", spl_taskq_expand, spl_taskq_prepare_down);
1789
1790 system_taskq = taskq_create("spl_system_taskq", MAX(boot_ncpus, 64),
1791 maxclsyspri, boot_ncpus, INT_MAX, TASKQ_PREPOPULATE|TASKQ_DYNAMIC);
1792 if (system_taskq == NULL)
1793 return (-ENOMEM);
1794
1795 system_delay_taskq = taskq_create("spl_delay_taskq", MAX(boot_ncpus, 4),
1796 maxclsyspri, boot_ncpus, INT_MAX, TASKQ_PREPOPULATE|TASKQ_DYNAMIC);
1797 if (system_delay_taskq == NULL) {
1798 cpuhp_remove_multi_state(spl_taskq_cpuhp_state);
1799 taskq_destroy(system_taskq);
1800 return (-ENOMEM);
1801 }
1802
1803 dynamic_taskq = taskq_create("spl_dynamic_taskq", 1,
1804 maxclsyspri, boot_ncpus, INT_MAX, TASKQ_PREPOPULATE);
1805 if (dynamic_taskq == NULL) {
1806 cpuhp_remove_multi_state(spl_taskq_cpuhp_state);
1807 taskq_destroy(system_taskq);
1808 taskq_destroy(system_delay_taskq);
1809 return (-ENOMEM);
1810 }
1811
1812 /*
1813 * This is used to annotate tq_lock, so
1814 * taskq_dispatch -> taskq_thread_spawn -> taskq_dispatch
1815 * does not trigger a lockdep warning re: possible recursive locking
1816 */
1817 dynamic_taskq->tq_lock_class = TQ_LOCK_DYNAMIC;
1818
1819 spl_taskq_kstat_init();
1820
1821 return (0);
1822 }
1823
1824 void
1825 spl_taskq_fini(void)
1826 {
1827 spl_taskq_kstat_fini();
1828
1829 taskq_destroy(dynamic_taskq);
1830 dynamic_taskq = NULL;
1831
1832 taskq_destroy(system_delay_taskq);
1833 system_delay_taskq = NULL;
1834
1835 taskq_destroy(system_taskq);
1836 system_taskq = NULL;
1837
1838 tsd_destroy(&taskq_tsd);
1839
1840 cpuhp_remove_multi_state(spl_taskq_cpuhp_state);
1841 spl_taskq_cpuhp_state = 0;
1842 }
1843