1 // SPDX-License-Identifier: GPL-2.0-or-later
2 /*
3  *  Copyright (C) 2007-2010 Lawrence Livermore National Security, LLC.
4  *  Copyright (C) 2007 The Regents of the University of California.
5  *  Produced at Lawrence Livermore National Laboratory (cf, DISCLAIMER).
6  *  Written by Brian Behlendorf <behlendorf1@llnl.gov>.
7  *  UCRL-CODE-235197
8  *
9  *  This file is part of the SPL, Solaris Porting Layer.
10  *
11  *  The SPL is free software; you can redistribute it and/or modify it
12  *  under the terms of the GNU General Public License as published by the
13  *  Free Software Foundation; either version 2 of the License, or (at your
14  *  option) any later version.
15  *
16  *  The SPL is distributed in the hope that it will be useful, but WITHOUT
17  *  ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
18  *  FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
19  *  for more details.
20  *
21  *  You should have received a copy of the GNU General Public License along
22  *  with the SPL.  If not, see <http://www.gnu.org/licenses/>.
23  *
24  *  Solaris Porting Layer (SPL) Task Queue Implementation.
25  */
26 /*
27  * Copyright (c) 2024, Klara Inc.
28  * Copyright (c) 2024, Syneto
29  */
30 
31 #include <sys/timer.h>
32 #include <sys/taskq.h>
33 #include <sys/kmem.h>
34 #include <sys/tsd.h>
35 #include <sys/trace_spl.h>
36 #include <sys/time.h>
37 #include <sys/atomic.h>
38 #include <sys/kstat.h>
39 #include <linux/cpuhotplug.h>
40 #include <linux/mod_compat.h>
41 
42 /* Linux 6.2 renamed del_timer_sync() to timer_delete_sync(); alias it on older kernels. */
43 #ifndef HAVE_TIMER_DELETE_SYNC
44 #define	timer_delete_sync(t)	del_timer_sync(t)
45 #endif
46 
47 typedef struct taskq_kstats {
48 	/* static values, for completeness */
49 	kstat_named_t tqks_threads_max;
50 	kstat_named_t tqks_entry_pool_min;
51 	kstat_named_t tqks_entry_pool_max;
52 
53 	/* gauges (inc/dec counters, current value) */
54 	kstat_named_t tqks_threads_active;
55 	kstat_named_t tqks_threads_idle;
56 	kstat_named_t tqks_threads_total;
57 	kstat_named_t tqks_tasks_pending;
58 	kstat_named_t tqks_tasks_priority;
59 	kstat_named_t tqks_tasks_total;
60 	kstat_named_t tqks_tasks_delayed;
61 	kstat_named_t tqks_entries_free;
62 
63 	/* counters (inc only, since taskq creation) */
64 	kstat_named_t tqks_threads_created;
65 	kstat_named_t tqks_threads_destroyed;
66 	kstat_named_t tqks_tasks_dispatched;
67 	kstat_named_t tqks_tasks_dispatched_delayed;
68 	kstat_named_t tqks_tasks_executed_normal;
69 	kstat_named_t tqks_tasks_executed_priority;
70 	kstat_named_t tqks_tasks_executed;
71 	kstat_named_t tqks_tasks_delayed_requeued;
72 	kstat_named_t tqks_tasks_cancelled;
73 	kstat_named_t tqks_thread_wakeups;
74 	kstat_named_t tqks_thread_wakeups_nowork;
75 	kstat_named_t tqks_thread_sleeps;
76 } taskq_kstats_t;
77 
78 static taskq_kstats_t taskq_kstats_template = {
79 	{ "threads_max",		KSTAT_DATA_UINT64 },
80 	{ "entry_pool_min",		KSTAT_DATA_UINT64 },
81 	{ "entry_pool_max",		KSTAT_DATA_UINT64 },
82 	{ "threads_active",		KSTAT_DATA_UINT64 },
83 	{ "threads_idle",		KSTAT_DATA_UINT64 },
84 	{ "threads_total",		KSTAT_DATA_UINT64 },
85 	{ "tasks_pending",		KSTAT_DATA_UINT64 },
86 	{ "tasks_priority",		KSTAT_DATA_UINT64 },
87 	{ "tasks_total",		KSTAT_DATA_UINT64 },
88 	{ "tasks_delayed",		KSTAT_DATA_UINT64 },
89 	{ "entries_free",		KSTAT_DATA_UINT64 },
90 
91 	{ "threads_created",		KSTAT_DATA_UINT64 },
92 	{ "threads_destroyed",		KSTAT_DATA_UINT64 },
93 	{ "tasks_dispatched",		KSTAT_DATA_UINT64 },
94 	{ "tasks_dispatched_delayed",	KSTAT_DATA_UINT64 },
95 	{ "tasks_executed_normal",	KSTAT_DATA_UINT64 },
96 	{ "tasks_executed_priority",	KSTAT_DATA_UINT64 },
97 	{ "tasks_executed",		KSTAT_DATA_UINT64 },
98 	{ "tasks_delayed_requeued",	KSTAT_DATA_UINT64 },
99 	{ "tasks_cancelled",		KSTAT_DATA_UINT64 },
100 	{ "thread_wakeups",		KSTAT_DATA_UINT64 },
101 	{ "thread_wakeups_nowork",	KSTAT_DATA_UINT64 },
102 	{ "thread_sleeps",		KSTAT_DATA_UINT64 },
103 };
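
/*
 * Illustrative note (not upstream documentation): these per-taskq kstats are
 * published through the SPL kstat framework under the "taskq" module, so on
 * Linux they typically appear as /proc/spl/kstat/taskq/<tq_name>.<instance>.
 */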
104 
105 #define	TQSTAT_INC(tq, stat)	wmsum_add(&tq->tq_sums.tqs_##stat, 1)
106 #define	TQSTAT_DEC(tq, stat)	wmsum_add(&tq->tq_sums.tqs_##stat, -1)
107 
108 #define	_TQSTAT_MOD_LIST(mod, tq, t) do { \
109 	switch (t->tqent_flags & TQENT_LIST_MASK) {			\
110 	case TQENT_LIST_NONE: ASSERT(list_empty(&t->tqent_list)); break;\
111 	case TQENT_LIST_PENDING: mod(tq, tasks_pending); break;		\
112 	case TQENT_LIST_PRIORITY: mod(tq, tasks_priority); break;	\
113 	case TQENT_LIST_DELAY: mod(tq, tasks_delayed); break;		\
114 	}								\
115 } while (0)
116 #define	TQSTAT_INC_LIST(tq, t)	_TQSTAT_MOD_LIST(TQSTAT_INC, tq, t)
117 #define	TQSTAT_DEC_LIST(tq, t)	_TQSTAT_MOD_LIST(TQSTAT_DEC, tq, t)
118 
119 #define	TQENT_SET_LIST(t, l)	\
120 	t->tqent_flags = (t->tqent_flags & ~TQENT_LIST_MASK) | l;
121 
122 static int spl_taskq_thread_bind = 0;
123 module_param(spl_taskq_thread_bind, int, 0644);
124 MODULE_PARM_DESC(spl_taskq_thread_bind, "Bind taskq thread to CPU by default");
125 
126 static uint_t spl_taskq_thread_timeout_ms = 5000;
127 module_param(spl_taskq_thread_timeout_ms, uint, 0644);
128 MODULE_PARM_DESC(spl_taskq_thread_timeout_ms,
129 	"Minimum idle threads exit interval for dynamic taskqs");
130 
131 static int spl_taskq_thread_dynamic = 1;
132 module_param(spl_taskq_thread_dynamic, int, 0444);
133 MODULE_PARM_DESC(spl_taskq_thread_dynamic, "Allow dynamic taskq threads");
134 
135 static int spl_taskq_thread_priority = 1;
136 module_param(spl_taskq_thread_priority, int, 0644);
137 MODULE_PARM_DESC(spl_taskq_thread_priority,
138 	"Allow non-default priority for taskq threads");
139 
140 static uint_t spl_taskq_thread_sequential = 4;
141 module_param(spl_taskq_thread_sequential, uint, 0644);
142 MODULE_PARM_DESC(spl_taskq_thread_sequential,
143 	"Create new taskq threads after N sequential tasks");
144 
145 /*
146  * Global system-wide dynamic task queue available for all consumers. This
147  * taskq is not intended for long-running tasks; instead, a dedicated taskq
148  * should be created.
149  */
150 taskq_t *system_taskq;
151 EXPORT_SYMBOL(system_taskq);
152 /* Global dynamic task queue for long delay */
153 taskq_t *system_delay_taskq;
154 EXPORT_SYMBOL(system_delay_taskq);
155 
156 /* Private dedicated taskq for creating new taskq threads on demand. */
157 static taskq_t *dynamic_taskq;
158 static taskq_thread_t *taskq_thread_create(taskq_t *);
159 
160 /* Multi-callback id for cpu hotplugging. */
161 static int spl_taskq_cpuhp_state;
162 
163 /* List of all taskqs */
164 LIST_HEAD(tq_list);
165 struct rw_semaphore tq_list_sem;
166 static uint_t taskq_tsd;
167 
168 static int
169 task_km_flags(uint_t flags)
170 {
171 	if (flags & TQ_NOSLEEP)
172 		return (KM_NOSLEEP);
173 
174 	if (flags & TQ_PUSHPAGE)
175 		return (KM_PUSHPAGE);
176 
177 	return (KM_SLEEP);
178 }
179 
180 /*
181  * taskq_find_by_name - Find the largest instance number of a named taskq.
182  */
183 static int
184 taskq_find_by_name(const char *name)
185 {
186 	struct list_head *tql = NULL;
187 	taskq_t *tq;
188 
189 	list_for_each_prev(tql, &tq_list) {
190 		tq = list_entry(tql, taskq_t, tq_taskqs);
191 		if (strcmp(name, tq->tq_name) == 0)
192 			return (tq->tq_instance);
193 	}
194 	return (-1);
195 }
196 
197 /*
198  * NOTE: Must be called with tq->tq_lock held, returns a list_t which
199  * is not attached to the free, work, or pending taskq lists.
200  */
201 static taskq_ent_t *
202 task_alloc(taskq_t *tq, uint_t flags, unsigned long *irqflags)
203 {
204 	taskq_ent_t *t;
205 	int count = 0;
206 
207 	ASSERT(tq);
208 retry:
209 	/* Acquire taskq_ent_t's from free list if available */
210 	if (!list_empty(&tq->tq_free_list) && !(flags & TQ_NEW)) {
211 		t = list_entry(tq->tq_free_list.next, taskq_ent_t, tqent_list);
212 
213 		ASSERT(!(t->tqent_flags & TQENT_FLAG_PREALLOC));
214 		ASSERT(!(t->tqent_flags & TQENT_FLAG_CANCEL));
215 		ASSERT(!timer_pending(&t->tqent_timer));
216 
217 		list_del_init(&t->tqent_list);
218 		TQSTAT_DEC(tq, entries_free);
219 		return (t);
220 	}
221 
222 	/* Free list is empty and memory allocations are prohibited */
223 	if (flags & TQ_NOALLOC)
224 		return (NULL);
225 
226 	/* Hit maximum taskq_ent_t pool size */
227 	if (tq->tq_nalloc >= tq->tq_maxalloc) {
228 		if (flags & TQ_NOSLEEP)
229 			return (NULL);
230 
231 		/*
232 		 * Sleep periodically polling the free list for an available
233 		 * taskq_ent_t. Dispatching with TQ_SLEEP should always succeed
234 		 * but we cannot block forever waiting for a taskq_ent_t to
235 		 * show up in the free list, otherwise a deadlock can happen.
236 		 *
237 		 * Therefore, we need to allocate a new task even if the number
238 		 * of allocated tasks is above tq->tq_maxalloc, but we still
239 		 * end up delaying the task allocation by one second, thereby
240 		 * throttling the task dispatch rate.
241 		 */
242 		spin_unlock_irqrestore(&tq->tq_lock, *irqflags);
243 		schedule_timeout_interruptible(HZ / 100);
244 		spin_lock_irqsave_nested(&tq->tq_lock, *irqflags,
245 		    tq->tq_lock_class);
246 		if (count < 100) {
247 			count++;
248 			goto retry;
249 		}
250 	}
251 
252 	spin_unlock_irqrestore(&tq->tq_lock, *irqflags);
253 	t = kmem_alloc(sizeof (taskq_ent_t), task_km_flags(flags));
254 	spin_lock_irqsave_nested(&tq->tq_lock, *irqflags, tq->tq_lock_class);
255 
256 	if (t) {
257 		taskq_init_ent(t);
258 		tq->tq_nalloc++;
259 	}
260 
261 	return (t);
262 }
263 
264 /*
265  * NOTE: Must be called with tq->tq_lock held, expects the taskq_ent_t
266  * to already be removed from the free, work, or pending taskq lists.
267  */
268 static void
269 task_free(taskq_t *tq, taskq_ent_t *t)
270 {
271 	ASSERT(tq);
272 	ASSERT(t);
273 	ASSERT(list_empty(&t->tqent_list));
274 	ASSERT(!timer_pending(&t->tqent_timer));
275 
276 	kmem_free(t, sizeof (taskq_ent_t));
277 	tq->tq_nalloc--;
278 }
279 
280 /*
281  * NOTE: Must be called with tq->tq_lock held, either destroys the
282  * taskq_ent_t if too many exist or moves it to the free list for later use.
283  */
284 static void
285 task_done(taskq_t *tq, taskq_ent_t *t)
286 {
287 	ASSERT(tq);
288 	ASSERT(t);
289 	ASSERT(list_empty(&t->tqent_list));
290 
291 	/* Wake tasks blocked in taskq_wait_id() */
292 	wake_up_all(&t->tqent_waitq);
293 
294 	if (tq->tq_nalloc <= tq->tq_minalloc) {
295 		t->tqent_id = TASKQID_INVALID;
296 		t->tqent_func = NULL;
297 		t->tqent_arg = NULL;
298 		t->tqent_flags = 0;
299 
300 		list_add_tail(&t->tqent_list, &tq->tq_free_list);
301 		TQSTAT_INC(tq, entries_free);
302 	} else {
303 		task_free(tq, t);
304 	}
305 }
306 
307 /*
308  * When a delayed task timer expires, remove it from the delay list and
309  * add it to the priority list for immediate processing.
310  */
311 static void
312 task_expire_impl(taskq_ent_t *t)
313 {
314 	taskq_ent_t *w;
315 	taskq_t *tq = t->tqent_taskq;
316 	struct list_head *l = NULL;
317 	unsigned long flags;
318 
319 	spin_lock_irqsave_nested(&tq->tq_lock, flags, tq->tq_lock_class);
320 
321 	if (t->tqent_flags & TQENT_FLAG_CANCEL) {
322 		ASSERT(list_empty(&t->tqent_list));
323 		spin_unlock_irqrestore(&tq->tq_lock, flags);
324 		return;
325 	}
326 
327 	t->tqent_birth = jiffies;
328 	DTRACE_PROBE1(taskq_ent__birth, taskq_ent_t *, t);
329 
330 	/*
331 	 * The priority list must be maintained in strict task id order
332 	 * from lowest to highest for lowest_id to be easily calculable.
333 	 */
334 	list_del(&t->tqent_list);
335 	list_for_each_prev(l, &tq->tq_prio_list) {
336 		w = list_entry(l, taskq_ent_t, tqent_list);
337 		if (w->tqent_id < t->tqent_id) {
338 			list_add(&t->tqent_list, l);
339 			break;
340 		}
341 	}
342 	if (l == &tq->tq_prio_list)
343 		list_add(&t->tqent_list, &tq->tq_prio_list);
344 
345 	spin_unlock_irqrestore(&tq->tq_lock, flags);
346 
347 	wake_up(&tq->tq_work_waitq);
348 
349 	TQSTAT_INC(tq, tasks_delayed_requeued);
350 }
351 
352 static void
353 task_expire(struct timer_list *tl)
354 {
355 	struct timer_list *tmr = (struct timer_list *)tl;
356 	taskq_ent_t *t = from_timer(t, tmr, tqent_timer);
357 	task_expire_impl(t);
358 }
359 
360 /*
361  * Returns the lowest incomplete taskqid_t.  The taskqid_t may
362  * be queued on the pending list, on the priority list, on the
363  * delay list, or on the work list currently being handled, but
364  * it is not 100% complete yet.
365  */
366 static taskqid_t
367 taskq_lowest_id(taskq_t *tq)
368 {
369 	taskqid_t lowest_id = tq->tq_next_id;
370 	taskq_ent_t *t;
371 	taskq_thread_t *tqt;
372 
373 	if (!list_empty(&tq->tq_pend_list)) {
374 		t = list_entry(tq->tq_pend_list.next, taskq_ent_t, tqent_list);
375 		lowest_id = MIN(lowest_id, t->tqent_id);
376 	}
377 
378 	if (!list_empty(&tq->tq_prio_list)) {
379 		t = list_entry(tq->tq_prio_list.next, taskq_ent_t, tqent_list);
380 		lowest_id = MIN(lowest_id, t->tqent_id);
381 	}
382 
383 	if (!list_empty(&tq->tq_delay_list)) {
384 		t = list_entry(tq->tq_delay_list.next, taskq_ent_t, tqent_list);
385 		lowest_id = MIN(lowest_id, t->tqent_id);
386 	}
387 
388 	if (!list_empty(&tq->tq_active_list)) {
389 		tqt = list_entry(tq->tq_active_list.next, taskq_thread_t,
390 		    tqt_active_list);
391 		ASSERT(tqt->tqt_id != TASKQID_INVALID);
392 		lowest_id = MIN(lowest_id, tqt->tqt_id);
393 	}
394 
395 	return (lowest_id);
396 }
397 
398 /*
399  * Insert a thread into the active list, keeping it sorted by increasing taskqid.
400  */
401 static void
402 taskq_insert_in_order(taskq_t *tq, taskq_thread_t *tqt)
403 {
404 	taskq_thread_t *w;
405 	struct list_head *l = NULL;
406 
407 	ASSERT(tq);
408 	ASSERT(tqt);
409 
410 	list_for_each_prev(l, &tq->tq_active_list) {
411 		w = list_entry(l, taskq_thread_t, tqt_active_list);
412 		if (w->tqt_id < tqt->tqt_id) {
413 			list_add(&tqt->tqt_active_list, l);
414 			break;
415 		}
416 	}
417 	if (l == &tq->tq_active_list)
418 		list_add(&tqt->tqt_active_list, &tq->tq_active_list);
419 }
420 
421 /*
422  * Find and return a task from the given list if it exists.  The list
423  * must be in lowest to highest task id order.
424  */
425 static taskq_ent_t *
426 taskq_find_list(taskq_t *tq, struct list_head *lh, taskqid_t id)
427 {
428 	struct list_head *l = NULL;
429 	taskq_ent_t *t;
430 
431 	list_for_each(l, lh) {
432 		t = list_entry(l, taskq_ent_t, tqent_list);
433 
434 		if (t->tqent_id == id)
435 			return (t);
436 
437 		if (t->tqent_id > id)
438 			break;
439 	}
440 
441 	return (NULL);
442 }
443 
444 /*
445  * Find an already dispatched task given the task id regardless of what
446  * state it is in.  If a task is still pending it will be returned.
447  * If a task is executing, then -EBUSY will be returned instead.
448  * If the task has already been run then NULL is returned.
449  */
450 static taskq_ent_t *
451 taskq_find(taskq_t *tq, taskqid_t id)
452 {
453 	taskq_thread_t *tqt;
454 	struct list_head *l = NULL;
455 	taskq_ent_t *t;
456 
457 	t = taskq_find_list(tq, &tq->tq_delay_list, id);
458 	if (t)
459 		return (t);
460 
461 	t = taskq_find_list(tq, &tq->tq_prio_list, id);
462 	if (t)
463 		return (t);
464 
465 	t = taskq_find_list(tq, &tq->tq_pend_list, id);
466 	if (t)
467 		return (t);
468 
469 	list_for_each(l, &tq->tq_active_list) {
470 		tqt = list_entry(l, taskq_thread_t, tqt_active_list);
471 		if (tqt->tqt_id == id) {
472 			/*
473 			 * Instead of returning tqt_task, we just return a non
474 			 * NULL value to prevent misuse, since tqt_task only
475 			 * has two valid fields.
476 			 */
477 			return (ERR_PTR(-EBUSY));
478 		}
479 	}
480 
481 	return (NULL);
482 }
483 
484 /*
485  * Theory for the taskq_wait_id(), taskq_wait_outstanding(), and
486  * taskq_wait() functions below.
487  *
488  * Taskq waiting is accomplished by tracking the lowest outstanding task
489  * id and the next available task id.  As tasks are dispatched they are
490  * added to the tail of the pending, priority, or delay lists.  As worker
491  * threads become available the tasks are removed from the heads of these
492  * lists and linked to the worker threads.  This ensures the lists are
493  * kept sorted by lowest to highest task id.
494  *
495  * Therefore the lowest outstanding task id can be quickly determined by
496  * checking the head item from all of these lists.  This value is stored
497  * with the taskq as the lowest id.  It only needs to be recalculated when
498  * either the task with the current lowest id completes or is canceled.
499  *
500  * By blocking until the lowest task id exceeds the passed task id the
501  * taskq_wait_outstanding() function can be easily implemented.  Similarly,
502  * by blocking until the lowest task id matches the next task id taskq_wait()
503  * can be implemented.
504  *
505  * Callers should be aware that when there are multiple worker threads it
506  * is possible for larger task ids to complete before smaller ones.  Also
507  * when the taskq contains delay tasks with small task ids callers may
508  * block for a considerable length of time waiting for them to expire and
509  * execute.
510  */
511 static int
512 taskq_wait_id_check(taskq_t *tq, taskqid_t id)
513 {
514 	int rc;
515 	unsigned long flags;
516 
517 	spin_lock_irqsave_nested(&tq->tq_lock, flags, tq->tq_lock_class);
518 	rc = (taskq_find(tq, id) == NULL);
519 	spin_unlock_irqrestore(&tq->tq_lock, flags);
520 
521 	return (rc);
522 }
523 
524 /*
525  * The taskq_wait_id() function blocks until the passed task id completes.
526  * This does not guarantee that all lower task ids have completed.
527  */
528 void
529 taskq_wait_id(taskq_t *tq, taskqid_t id)
530 {
531 	wait_event(tq->tq_wait_waitq, taskq_wait_id_check(tq, id));
532 }
533 EXPORT_SYMBOL(taskq_wait_id);
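
/*
 * Illustrative sketch (not part of the build): waiting on a single task.
 * my_func and my_arg are hypothetical names.
 *
 *	taskqid_t id = taskq_dispatch(tq, my_func, my_arg, TQ_SLEEP);
 *	if (id != TASKQID_INVALID)
 *		taskq_wait_id(tq, id);	// returns once my_func has run
 */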
534 
535 static int
536 taskq_wait_outstanding_check(taskq_t *tq, taskqid_t id)
537 {
538 	int rc;
539 	unsigned long flags;
540 
541 	spin_lock_irqsave_nested(&tq->tq_lock, flags, tq->tq_lock_class);
542 	rc = (id < tq->tq_lowest_id);
543 	spin_unlock_irqrestore(&tq->tq_lock, flags);
544 
545 	return (rc);
546 }
547 
548 /*
549  * The taskq_wait_outstanding() function will block until all tasks with a
550  * lower taskqid than the passed 'id' have been completed.  Note that all
551  * task id's are assigned monotonically at dispatch time.  Zero may be
552  * passed for the id to indicate all tasks dispatched up to this point,
553  * but not after, should be waited for.
554  */
555 void
556 taskq_wait_outstanding(taskq_t *tq, taskqid_t id)
557 {
558 	id = id ? id : tq->tq_next_id - 1;
559 	wait_event(tq->tq_wait_waitq, taskq_wait_outstanding_check(tq, id));
560 }
561 EXPORT_SYMBOL(taskq_wait_outstanding);
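
/*
 * Illustrative sketch: passing 0 waits for everything dispatched so far
 * without also blocking on tasks dispatched after this call.
 *
 *	taskq_wait_outstanding(tq, 0);
 */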
562 
563 static int
564 taskq_wait_check(taskq_t *tq)
565 {
566 	int rc;
567 	unsigned long flags;
568 
569 	spin_lock_irqsave_nested(&tq->tq_lock, flags, tq->tq_lock_class);
570 	rc = (tq->tq_lowest_id == tq->tq_next_id);
571 	spin_unlock_irqrestore(&tq->tq_lock, flags);
572 
573 	return (rc);
574 }
575 
576 /*
577  * The taskq_wait() function will block until the taskq is empty.
578  * This means that if a taskq re-dispatches work to itself taskq_wait()
579  * callers will block indefinitely.
580  */
581 void
582 taskq_wait(taskq_t *tq)
583 {
584 	wait_event(tq->tq_wait_waitq, taskq_wait_check(tq));
585 }
586 EXPORT_SYMBOL(taskq_wait);
587 
588 int
589 taskq_member(taskq_t *tq, kthread_t *t)
590 {
591 	return (tq == (taskq_t *)tsd_get_by_thread(taskq_tsd, t));
592 }
593 EXPORT_SYMBOL(taskq_member);
594 
595 taskq_t *
596 taskq_of_curthread(void)
597 {
598 	return (tsd_get(taskq_tsd));
599 }
600 EXPORT_SYMBOL(taskq_of_curthread);
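
/*
 * Illustrative sketch: these helpers are typically used to guard against
 * waiting on a taskq from one of its own threads, which would deadlock
 * (assuming the usual SPL curthread alias for the current kthread).
 *
 *	ASSERT(!taskq_member(tq, curthread));
 *	taskq_wait(tq);
 */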
601 
602 /*
603  * Cancel an already dispatched task given the task id.  Still pending tasks
604  * will be immediately canceled, and if the task is active the function will
605  * block until it completes.  Preallocated tasks which are canceled must be
606  * freed by the caller.
607  */
608 int
609 taskq_cancel_id(taskq_t *tq, taskqid_t id)
610 {
611 	taskq_ent_t *t;
612 	int rc = ENOENT;
613 	unsigned long flags;
614 
615 	ASSERT(tq);
616 
617 	spin_lock_irqsave_nested(&tq->tq_lock, flags, tq->tq_lock_class);
618 	t = taskq_find(tq, id);
619 	if (t && t != ERR_PTR(-EBUSY)) {
620 		list_del_init(&t->tqent_list);
621 		TQSTAT_DEC_LIST(tq, t);
622 		TQSTAT_DEC(tq, tasks_total);
623 
624 		t->tqent_flags |= TQENT_FLAG_CANCEL;
625 		TQSTAT_INC(tq, tasks_cancelled);
626 
627 		/*
628 		 * When canceling the lowest outstanding task id we
629 		 * must recalculate the new lowest outstanding id.
630 		 */
631 		if (tq->tq_lowest_id == t->tqent_id) {
632 			tq->tq_lowest_id = taskq_lowest_id(tq);
633 			ASSERT3S(tq->tq_lowest_id, >, t->tqent_id);
634 		}
635 
636 		/*
637 		 * The task_expire() function takes the tq->tq_lock, so drop
638 		 * the lock before synchronously cancelling the timer.
639 		 */
640 		if (timer_pending(&t->tqent_timer)) {
641 			spin_unlock_irqrestore(&tq->tq_lock, flags);
642 			timer_delete_sync(&t->tqent_timer);
643 			spin_lock_irqsave_nested(&tq->tq_lock, flags,
644 			    tq->tq_lock_class);
645 		}
646 
647 		if (!(t->tqent_flags & TQENT_FLAG_PREALLOC))
648 			task_done(tq, t);
649 
650 		rc = 0;
651 	}
652 	spin_unlock_irqrestore(&tq->tq_lock, flags);
653 
654 	if (t == ERR_PTR(-EBUSY)) {
655 		taskq_wait_id(tq, id);
656 		rc = EBUSY;
657 	}
658 
659 	return (rc);
660 }
661 EXPORT_SYMBOL(taskq_cancel_id);
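
/*
 * Illustrative sketch of the possible return values:
 *
 *	int err = taskq_cancel_id(tq, id);
 *	if (err == 0) {
 *		// task was removed before it ran; a TQENT_FLAG_PREALLOC
 *		// entry must now be freed by the caller
 *	} else if (err == EBUSY) {
 *		// task was already executing; we blocked until it finished
 *	} else {
 *		// ENOENT: task already completed or was never dispatched
 *	}
 */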
662 
663 static int taskq_thread_spawn(taskq_t *tq);
664 
665 taskqid_t
666 taskq_dispatch(taskq_t *tq, task_func_t func, void *arg, uint_t flags)
667 {
668 	taskq_ent_t *t;
669 	taskqid_t rc = TASKQID_INVALID;
670 	unsigned long irqflags;
671 
672 	ASSERT(tq);
673 	ASSERT(func);
674 
675 	spin_lock_irqsave_nested(&tq->tq_lock, irqflags, tq->tq_lock_class);
676 
677 	/* Taskq being destroyed and all tasks drained */
678 	if (!(tq->tq_flags & TASKQ_ACTIVE))
679 		goto out;
680 
681 	/* Do not queue the task unless there is an idle thread for it */
682 	ASSERT(tq->tq_nactive <= tq->tq_nthreads);
683 	if ((flags & TQ_NOQUEUE) && (tq->tq_nactive == tq->tq_nthreads)) {
684 		/* Dynamic taskq may be able to spawn another thread */
685 		if (taskq_thread_spawn(tq) == 0)
686 			goto out;
687 	}
688 
689 	if ((t = task_alloc(tq, flags, &irqflags)) == NULL)
690 		goto out;
691 
692 	spin_lock(&t->tqent_lock);
693 
694 	/* Queue to the front of the list to enforce TQ_NOQUEUE semantics */
695 	if (flags & TQ_NOQUEUE) {
696 		TQENT_SET_LIST(t, TQENT_LIST_PRIORITY);
697 		list_add(&t->tqent_list, &tq->tq_prio_list);
698 	/* Queue to the priority list instead of the pending list */
699 	} else if (flags & TQ_FRONT) {
700 		TQENT_SET_LIST(t, TQENT_LIST_PRIORITY);
701 		list_add_tail(&t->tqent_list, &tq->tq_prio_list);
702 	} else {
703 		TQENT_SET_LIST(t, TQENT_LIST_PENDING);
704 		list_add_tail(&t->tqent_list, &tq->tq_pend_list);
705 	}
706 	TQSTAT_INC_LIST(tq, t);
707 	TQSTAT_INC(tq, tasks_total);
708 
709 	t->tqent_id = rc = tq->tq_next_id;
710 	tq->tq_next_id++;
711 	t->tqent_func = func;
712 	t->tqent_arg = arg;
713 	t->tqent_taskq = tq;
714 	t->tqent_timer.function = NULL;
715 	t->tqent_timer.expires = 0;
716 
717 	t->tqent_birth = jiffies;
718 	DTRACE_PROBE1(taskq_ent__birth, taskq_ent_t *, t);
719 
720 	ASSERT(!(t->tqent_flags & TQENT_FLAG_PREALLOC));
721 
722 	spin_unlock(&t->tqent_lock);
723 
724 	wake_up(&tq->tq_work_waitq);
725 
726 	TQSTAT_INC(tq, tasks_dispatched);
727 
728 	/* Spawn additional taskq threads if required. */
729 	if (!(flags & TQ_NOQUEUE) && tq->tq_nactive == tq->tq_nthreads)
730 		(void) taskq_thread_spawn(tq);
731 out:
732 	spin_unlock_irqrestore(&tq->tq_lock, irqflags);
733 	return (rc);
734 }
735 EXPORT_SYMBOL(taskq_dispatch);
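
/*
 * Illustrative sketch (my_func/my_arg are hypothetical): a TASKQID_INVALID
 * return means the task was not queued, e.g. the taskq is shutting down or
 * a TQ_NOSLEEP allocation failed.
 *
 *	taskqid_t id = taskq_dispatch(tq, my_func, my_arg, TQ_SLEEP);
 *	if (id == TASKQID_INVALID)
 *		my_func(my_arg);	// e.g. fall back to running it inline
 */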
736 
737 taskqid_t
738 taskq_dispatch_delay(taskq_t *tq, task_func_t func, void *arg,
739     uint_t flags, clock_t expire_time)
740 {
741 	taskqid_t rc = TASKQID_INVALID;
742 	taskq_ent_t *t;
743 	unsigned long irqflags;
744 
745 	ASSERT(tq);
746 	ASSERT(func);
747 
748 	spin_lock_irqsave_nested(&tq->tq_lock, irqflags, tq->tq_lock_class);
749 
750 	/* Taskq being destroyed and all tasks drained */
751 	if (!(tq->tq_flags & TASKQ_ACTIVE))
752 		goto out;
753 
754 	if ((t = task_alloc(tq, flags, &irqflags)) == NULL)
755 		goto out;
756 
757 	spin_lock(&t->tqent_lock);
758 
759 	/* Queue to the delay list for subsequent execution */
760 	list_add_tail(&t->tqent_list, &tq->tq_delay_list);
761 	TQENT_SET_LIST(t, TQENT_LIST_DELAY);
762 	TQSTAT_INC_LIST(tq, t);
763 	TQSTAT_INC(tq, tasks_total);
764 
765 	t->tqent_id = rc = tq->tq_next_id;
766 	tq->tq_next_id++;
767 	t->tqent_func = func;
768 	t->tqent_arg = arg;
769 	t->tqent_taskq = tq;
770 	t->tqent_timer.function = task_expire;
771 	t->tqent_timer.expires = (unsigned long)expire_time;
772 	add_timer(&t->tqent_timer);
773 
774 	ASSERT(!(t->tqent_flags & TQENT_FLAG_PREALLOC));
775 
776 	spin_unlock(&t->tqent_lock);
777 
778 	TQSTAT_INC(tq, tasks_dispatched_delayed);
779 
780 	/* Spawn additional taskq threads if required. */
781 	if (tq->tq_nactive == tq->tq_nthreads)
782 		(void) taskq_thread_spawn(tq);
783 out:
784 	spin_unlock_irqrestore(&tq->tq_lock, irqflags);
785 	return (rc);
786 }
787 EXPORT_SYMBOL(taskq_dispatch_delay);
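
/*
 * Illustrative sketch: expire_time is an absolute time in jiffies, so callers
 * usually compute it from the current lbolt value (assuming the usual SPL
 * ddi_get_lbolt()/SEC_TO_TICK helpers; my_func/my_arg are hypothetical).
 *
 *	clock_t when = ddi_get_lbolt() + SEC_TO_TICK(5);
 *	taskqid_t id = taskq_dispatch_delay(tq, my_func, my_arg, TQ_SLEEP, when);
 */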
788 
789 void
790 taskq_dispatch_ent(taskq_t *tq, task_func_t func, void *arg, uint_t flags,
791     taskq_ent_t *t)
792 {
793 	unsigned long irqflags;
794 	ASSERT(tq);
795 	ASSERT(func);
796 
797 	spin_lock_irqsave_nested(&tq->tq_lock, irqflags,
798 	    tq->tq_lock_class);
799 
800 	/* Taskq being destroyed and all tasks drained */
801 	if (!(tq->tq_flags & TASKQ_ACTIVE)) {
802 		t->tqent_id = TASKQID_INVALID;
803 		goto out;
804 	}
805 
806 	if ((flags & TQ_NOQUEUE) && (tq->tq_nactive == tq->tq_nthreads)) {
807 		/* Dynamic taskq may be able to spawn another thread */
808 		if (taskq_thread_spawn(tq) == 0)
809 			goto out;
810 		flags |= TQ_FRONT;
811 	}
812 
813 	spin_lock(&t->tqent_lock);
814 
815 	/*
816 	 * Make sure the entry is not on some other taskq; it is important to
817 	 * ASSERT() under lock
818 	 */
819 	ASSERT(taskq_empty_ent(t));
820 
821 	/*
822 	 * Mark it as a prealloc'd task.  This is important
823 	 * to ensure that we don't free it later.
824 	 */
825 	t->tqent_flags |= TQENT_FLAG_PREALLOC;
826 
827 	/* Queue to the priority list instead of the pending list */
828 	if (flags & TQ_FRONT) {
829 		TQENT_SET_LIST(t, TQENT_LIST_PRIORITY);
830 		list_add_tail(&t->tqent_list, &tq->tq_prio_list);
831 	} else {
832 		TQENT_SET_LIST(t, TQENT_LIST_PENDING);
833 		list_add_tail(&t->tqent_list, &tq->tq_pend_list);
834 	}
835 	TQSTAT_INC_LIST(tq, t);
836 	TQSTAT_INC(tq, tasks_total);
837 
838 	t->tqent_id = tq->tq_next_id;
839 	tq->tq_next_id++;
840 	t->tqent_func = func;
841 	t->tqent_arg = arg;
842 	t->tqent_taskq = tq;
843 
844 	t->tqent_birth = jiffies;
845 	DTRACE_PROBE1(taskq_ent__birth, taskq_ent_t *, t);
846 
847 	spin_unlock(&t->tqent_lock);
848 
849 	wake_up(&tq->tq_work_waitq);
850 
851 	TQSTAT_INC(tq, tasks_dispatched);
852 
853 	/* Spawn additional taskq threads if required. */
854 	if (tq->tq_nactive == tq->tq_nthreads)
855 		(void) taskq_thread_spawn(tq);
856 out:
857 	spin_unlock_irqrestore(&tq->tq_lock, irqflags);
858 }
859 EXPORT_SYMBOL(taskq_dispatch_ent);
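
/*
 * Illustrative sketch: a caller-owned (prealloc'd) entry avoids the
 * task_alloc() path entirely.  tqent_id is set to TASKQID_INVALID when the
 * taskq refused the dispatch.
 *
 *	taskq_ent_t ent;	// in practice embedded in a structure that outlives the task
 *	taskq_init_ent(&ent);
 *	taskq_dispatch_ent(tq, my_func, my_arg, 0, &ent);
 *	if (ent.tqent_id == TASKQID_INVALID)
 *		// taskq is being destroyed; the task was not queued
 */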
860 
861 int
862 taskq_empty_ent(taskq_ent_t *t)
863 {
864 	return (list_empty(&t->tqent_list));
865 }
866 EXPORT_SYMBOL(taskq_empty_ent);
867 
868 void
869 taskq_init_ent(taskq_ent_t *t)
870 {
871 	spin_lock_init(&t->tqent_lock);
872 	init_waitqueue_head(&t->tqent_waitq);
873 	timer_setup(&t->tqent_timer, NULL, 0);
874 	INIT_LIST_HEAD(&t->tqent_list);
875 	t->tqent_id = 0;
876 	t->tqent_func = NULL;
877 	t->tqent_arg = NULL;
878 	t->tqent_flags = 0;
879 	t->tqent_taskq = NULL;
880 }
881 EXPORT_SYMBOL(taskq_init_ent);
882 
883 /*
884  * Return the next pending task; preference is given to tasks on the
885  * priority list which were dispatched with TQ_FRONT.
886  */
887 static taskq_ent_t *
888 taskq_next_ent(taskq_t *tq)
889 {
890 	struct list_head *list;
891 
892 	if (!list_empty(&tq->tq_prio_list))
893 		list = &tq->tq_prio_list;
894 	else if (!list_empty(&tq->tq_pend_list))
895 		list = &tq->tq_pend_list;
896 	else
897 		return (NULL);
898 
899 	return (list_entry(list->next, taskq_ent_t, tqent_list));
900 }
901 
902 /*
903  * Spawns a new thread for the specified taskq.
904  */
905 static void
906 taskq_thread_spawn_task(void *arg)
907 {
908 	taskq_t *tq = (taskq_t *)arg;
909 	unsigned long flags;
910 
911 	if (taskq_thread_create(tq) == NULL) {
912 		/* restore spawning count if failed */
913 		spin_lock_irqsave_nested(&tq->tq_lock, flags,
914 		    tq->tq_lock_class);
915 		tq->tq_nspawn--;
916 		spin_unlock_irqrestore(&tq->tq_lock, flags);
917 	}
918 }
919 
920 /*
921  * Spawn additional threads for dynamic taskqs (TASKQ_DYNAMIC) when the current
922  * number of threads is insufficient to handle the pending tasks.  These
923  * new threads must be created by the dedicated dynamic_taskq to avoid
924  * deadlocks between thread creation and memory reclaim.  The system_taskq
925  * which is also a dynamic taskq cannot be safely used for this.
926  */
927 static int
928 taskq_thread_spawn(taskq_t *tq)
929 {
930 	int spawning = 0;
931 
932 	if (!(tq->tq_flags & TASKQ_DYNAMIC))
933 		return (0);
934 
935 	tq->lastspawnstop = jiffies;
936 	if ((tq->tq_nthreads + tq->tq_nspawn < tq->tq_maxthreads) &&
937 	    (tq->tq_flags & TASKQ_ACTIVE)) {
938 		spawning = (++tq->tq_nspawn);
939 		taskq_dispatch(dynamic_taskq, taskq_thread_spawn_task,
940 		    tq, TQ_NOSLEEP);
941 	}
942 
943 	return (spawning);
944 }
945 
946 /*
947  * Threads in a dynamic taskq may exit once there is no more work to do.
948  * To prevent threads from being created and destroyed too often limit
949  * the exit rate to one per spl_taskq_thread_timeout_ms.
950  *
951  * The first thread in the thread list is treated as the primary thread.
952  * There is nothing special about the primary thread, but in order to keep
953  * all the taskq pids from changing we opt to make it long running.
954  */
955 static int
956 taskq_thread_should_stop(taskq_t *tq, taskq_thread_t *tqt)
957 {
958 	ASSERT(!taskq_next_ent(tq));
959 	if (!(tq->tq_flags & TASKQ_DYNAMIC) || !spl_taskq_thread_dynamic)
960 		return (0);
961 	if (!(tq->tq_flags & TASKQ_ACTIVE))
962 		return (1);
963 	if (list_first_entry(&(tq->tq_thread_list), taskq_thread_t,
964 	    tqt_thread_list) == tqt)
965 		return (0);
966 	ASSERT3U(tq->tq_nthreads, >, 1);
967 	if (tq->tq_nspawn != 0)
968 		return (0);
969 	if (time_before(jiffies, tq->lastspawnstop +
970 	    msecs_to_jiffies(spl_taskq_thread_timeout_ms)))
971 		return (0);
972 	tq->lastspawnstop = jiffies;
973 	return (1);
974 }
975 
976 static int
977 taskq_thread(void *args)
978 {
979 	DECLARE_WAITQUEUE(wait, current);
980 	sigset_t blocked;
981 	taskq_thread_t *tqt = args;
982 	taskq_t *tq;
983 	taskq_ent_t *t;
984 	int seq_tasks = 0;
985 	unsigned long flags;
986 	taskq_ent_t dup_task = {};
987 
988 	ASSERT(tqt);
989 	ASSERT(tqt->tqt_tq);
990 	tq = tqt->tqt_tq;
991 	current->flags |= PF_NOFREEZE;
992 
993 	(void) spl_fstrans_mark();
994 
995 	sigfillset(&blocked);
996 	sigprocmask(SIG_BLOCK, &blocked, NULL);
997 	flush_signals(current);
998 
999 	tsd_set(taskq_tsd, tq);
1000 	spin_lock_irqsave_nested(&tq->tq_lock, flags, tq->tq_lock_class);
1001 	/*
1002 	 * If we are dynamically spawned, decrease spawning count. Note that
1003 	 * we could be created during taskq_create, in which case we shouldn't
1004 	 * do the decrement. But it's fine because taskq_create will reset
1005 	 * tq_nspawn later.
1006 	 */
1007 	if (tq->tq_flags & TASKQ_DYNAMIC)
1008 		tq->tq_nspawn--;
1009 
1010 	/* Immediately exit if more threads than allowed were created. */
1011 	if (tq->tq_nthreads >= tq->tq_maxthreads)
1012 		goto error;
1013 
1014 	tq->tq_nthreads++;
1015 	list_add_tail(&tqt->tqt_thread_list, &tq->tq_thread_list);
1016 	wake_up(&tq->tq_wait_waitq);
1017 	set_current_state(TASK_INTERRUPTIBLE);
1018 
1019 	TQSTAT_INC(tq, threads_total);
1020 
1021 	while (!kthread_should_stop()) {
1022 
1023 		if (list_empty(&tq->tq_pend_list) &&
1024 		    list_empty(&tq->tq_prio_list)) {
1025 
1026 			if (taskq_thread_should_stop(tq, tqt))
1027 				break;
1028 
1029 			add_wait_queue_exclusive(&tq->tq_work_waitq, &wait);
1030 			spin_unlock_irqrestore(&tq->tq_lock, flags);
1031 
1032 			TQSTAT_INC(tq, thread_sleeps);
1033 			TQSTAT_INC(tq, threads_idle);
1034 
1035 			schedule();
1036 			seq_tasks = 0;
1037 
1038 			TQSTAT_DEC(tq, threads_idle);
1039 			TQSTAT_INC(tq, thread_wakeups);
1040 
1041 			spin_lock_irqsave_nested(&tq->tq_lock, flags,
1042 			    tq->tq_lock_class);
1043 			remove_wait_queue(&tq->tq_work_waitq, &wait);
1044 		} else {
1045 			__set_current_state(TASK_RUNNING);
1046 		}
1047 
1048 		if ((t = taskq_next_ent(tq)) != NULL) {
1049 			list_del_init(&t->tqent_list);
1050 			TQSTAT_DEC_LIST(tq, t);
1051 			TQSTAT_DEC(tq, tasks_total);
1052 
1053 			/*
1054 			 * A TQENT_FLAG_PREALLOC task may be reused or freed
1055 			 * during the task function call. Store tqent_id and
1056 			 * tqent_flags here.
1057 			 *
1058 			 * Also use an on stack taskq_ent_t for tqt_task
1059 			 * assignment in this case; we want to make sure
1060 			 * to duplicate all fields, so the values are
1061 			 * correct when it's accessed via DTRACE_PROBE*.
1062 			 */
1063 			tqt->tqt_id = t->tqent_id;
1064 			tqt->tqt_flags = t->tqent_flags;
1065 
1066 			if (t->tqent_flags & TQENT_FLAG_PREALLOC) {
1067 				dup_task = *t;
1068 				t = &dup_task;
1069 			}
1070 			tqt->tqt_task = t;
1071 
1072 			taskq_insert_in_order(tq, tqt);
1073 			tq->tq_nactive++;
1074 			spin_unlock_irqrestore(&tq->tq_lock, flags);
1075 
1076 			TQSTAT_INC(tq, threads_active);
1077 			DTRACE_PROBE1(taskq_ent__start, taskq_ent_t *, t);
1078 
1079 			/* Perform the requested task */
1080 			t->tqent_func(t->tqent_arg);
1081 
1082 			DTRACE_PROBE1(taskq_ent__finish, taskq_ent_t *, t);
1083 
1084 			TQSTAT_DEC(tq, threads_active);
1085 			if ((t->tqent_flags & TQENT_LIST_MASK) ==
1086 			    TQENT_LIST_PENDING)
1087 				TQSTAT_INC(tq, tasks_executed_normal);
1088 			else
1089 				TQSTAT_INC(tq, tasks_executed_priority);
1090 			TQSTAT_INC(tq, tasks_executed);
1091 
1092 			spin_lock_irqsave_nested(&tq->tq_lock, flags,
1093 			    tq->tq_lock_class);
1094 
1095 			tq->tq_nactive--;
1096 			list_del_init(&tqt->tqt_active_list);
1097 			tqt->tqt_task = NULL;
1098 
1099 			/* For prealloc'd tasks, we don't free anything. */
1100 			if (!(tqt->tqt_flags & TQENT_FLAG_PREALLOC))
1101 				task_done(tq, t);
1102 
1103 			/*
1104 			 * When the current lowest outstanding taskqid is
1105 			 * done calculate the new lowest outstanding id
1106 			 */
1107 			if (tq->tq_lowest_id == tqt->tqt_id) {
1108 				tq->tq_lowest_id = taskq_lowest_id(tq);
1109 				ASSERT3S(tq->tq_lowest_id, >, tqt->tqt_id);
1110 			}
1111 
1112 			/* Spawn additional taskq threads if required. */
1113 			if ((++seq_tasks) > spl_taskq_thread_sequential &&
1114 			    taskq_thread_spawn(tq))
1115 				seq_tasks = 0;
1116 
1117 			tqt->tqt_id = TASKQID_INVALID;
1118 			tqt->tqt_flags = 0;
1119 			wake_up_all(&tq->tq_wait_waitq);
1120 		} else
1121 			TQSTAT_INC(tq, thread_wakeups_nowork);
1122 
1123 		set_current_state(TASK_INTERRUPTIBLE);
1124 
1125 	}
1126 
1127 	__set_current_state(TASK_RUNNING);
1128 	tq->tq_nthreads--;
1129 	list_del_init(&tqt->tqt_thread_list);
1130 
1131 	TQSTAT_DEC(tq, threads_total);
1132 	TQSTAT_INC(tq, threads_destroyed);
1133 
1134 error:
1135 	kmem_free(tqt, sizeof (taskq_thread_t));
1136 	spin_unlock_irqrestore(&tq->tq_lock, flags);
1137 
1138 	tsd_set(taskq_tsd, NULL);
1139 	thread_exit();
1140 
1141 	return (0);
1142 }
1143 
1144 static taskq_thread_t *
1145 taskq_thread_create(taskq_t *tq)
1146 {
1147 	static int last_used_cpu = 0;
1148 	taskq_thread_t *tqt;
1149 
1150 	tqt = kmem_alloc(sizeof (*tqt), KM_PUSHPAGE);
1151 	INIT_LIST_HEAD(&tqt->tqt_thread_list);
1152 	INIT_LIST_HEAD(&tqt->tqt_active_list);
1153 	tqt->tqt_tq = tq;
1154 	tqt->tqt_id = TASKQID_INVALID;
1155 
1156 	tqt->tqt_thread = spl_kthread_create(taskq_thread, tqt,
1157 	    "%s", tq->tq_name);
1158 	if (tqt->tqt_thread == NULL) {
1159 		kmem_free(tqt, sizeof (taskq_thread_t));
1160 		return (NULL);
1161 	}
1162 
1163 	if (spl_taskq_thread_bind) {
1164 		last_used_cpu = (last_used_cpu + 1) % num_online_cpus();
1165 		kthread_bind(tqt->tqt_thread, last_used_cpu);
1166 	}
1167 
1168 	if (spl_taskq_thread_priority)
1169 		set_user_nice(tqt->tqt_thread, PRIO_TO_NICE(tq->tq_pri));
1170 
1171 	wake_up_process(tqt->tqt_thread);
1172 
1173 	TQSTAT_INC(tq, threads_created);
1174 
1175 	return (tqt);
1176 }
1177 
1178 static void
1179 taskq_stats_init(taskq_t *tq)
1180 {
1181 	taskq_sums_t *tqs = &tq->tq_sums;
1182 	wmsum_init(&tqs->tqs_threads_active, 0);
1183 	wmsum_init(&tqs->tqs_threads_idle, 0);
1184 	wmsum_init(&tqs->tqs_threads_total, 0);
1185 	wmsum_init(&tqs->tqs_tasks_pending, 0);
1186 	wmsum_init(&tqs->tqs_tasks_priority, 0);
1187 	wmsum_init(&tqs->tqs_tasks_total, 0);
1188 	wmsum_init(&tqs->tqs_tasks_delayed, 0);
1189 	wmsum_init(&tqs->tqs_entries_free, 0);
1190 	wmsum_init(&tqs->tqs_threads_created, 0);
1191 	wmsum_init(&tqs->tqs_threads_destroyed, 0);
1192 	wmsum_init(&tqs->tqs_tasks_dispatched, 0);
1193 	wmsum_init(&tqs->tqs_tasks_dispatched_delayed, 0);
1194 	wmsum_init(&tqs->tqs_tasks_executed_normal, 0);
1195 	wmsum_init(&tqs->tqs_tasks_executed_priority, 0);
1196 	wmsum_init(&tqs->tqs_tasks_executed, 0);
1197 	wmsum_init(&tqs->tqs_tasks_delayed_requeued, 0);
1198 	wmsum_init(&tqs->tqs_tasks_cancelled, 0);
1199 	wmsum_init(&tqs->tqs_thread_wakeups, 0);
1200 	wmsum_init(&tqs->tqs_thread_wakeups_nowork, 0);
1201 	wmsum_init(&tqs->tqs_thread_sleeps, 0);
1202 }
1203 
1204 static void
1205 taskq_stats_fini(taskq_t *tq)
1206 {
1207 	taskq_sums_t *tqs = &tq->tq_sums;
1208 	wmsum_fini(&tqs->tqs_threads_active);
1209 	wmsum_fini(&tqs->tqs_threads_idle);
1210 	wmsum_fini(&tqs->tqs_threads_total);
1211 	wmsum_fini(&tqs->tqs_tasks_pending);
1212 	wmsum_fini(&tqs->tqs_tasks_priority);
1213 	wmsum_fini(&tqs->tqs_tasks_total);
1214 	wmsum_fini(&tqs->tqs_tasks_delayed);
1215 	wmsum_fini(&tqs->tqs_entries_free);
1216 	wmsum_fini(&tqs->tqs_threads_created);
1217 	wmsum_fini(&tqs->tqs_threads_destroyed);
1218 	wmsum_fini(&tqs->tqs_tasks_dispatched);
1219 	wmsum_fini(&tqs->tqs_tasks_dispatched_delayed);
1220 	wmsum_fini(&tqs->tqs_tasks_executed_normal);
1221 	wmsum_fini(&tqs->tqs_tasks_executed_priority);
1222 	wmsum_fini(&tqs->tqs_tasks_executed);
1223 	wmsum_fini(&tqs->tqs_tasks_delayed_requeued);
1224 	wmsum_fini(&tqs->tqs_tasks_cancelled);
1225 	wmsum_fini(&tqs->tqs_thread_wakeups);
1226 	wmsum_fini(&tqs->tqs_thread_wakeups_nowork);
1227 	wmsum_fini(&tqs->tqs_thread_sleeps);
1228 }
1229 
1230 static int
1231 taskq_kstats_update(kstat_t *ksp, int rw)
1232 {
1233 	if (rw == KSTAT_WRITE)
1234 		return (EACCES);
1235 
1236 	taskq_t *tq = ksp->ks_private;
1237 	taskq_kstats_t *tqks = ksp->ks_data;
1238 
1239 	tqks->tqks_threads_max.value.ui64 = tq->tq_maxthreads;
1240 	tqks->tqks_entry_pool_min.value.ui64 = tq->tq_minalloc;
1241 	tqks->tqks_entry_pool_max.value.ui64 = tq->tq_maxalloc;
1242 
1243 	taskq_sums_t *tqs = &tq->tq_sums;
1244 
1245 	tqks->tqks_threads_active.value.ui64 =
1246 	    wmsum_value(&tqs->tqs_threads_active);
1247 	tqks->tqks_threads_idle.value.ui64 =
1248 	    wmsum_value(&tqs->tqs_threads_idle);
1249 	tqks->tqks_threads_total.value.ui64 =
1250 	    wmsum_value(&tqs->tqs_threads_total);
1251 	tqks->tqks_tasks_pending.value.ui64 =
1252 	    wmsum_value(&tqs->tqs_tasks_pending);
1253 	tqks->tqks_tasks_priority.value.ui64 =
1254 	    wmsum_value(&tqs->tqs_tasks_priority);
1255 	tqks->tqks_tasks_total.value.ui64 =
1256 	    wmsum_value(&tqs->tqs_tasks_total);
1257 	tqks->tqks_tasks_delayed.value.ui64 =
1258 	    wmsum_value(&tqs->tqs_tasks_delayed);
1259 	tqks->tqks_entries_free.value.ui64 =
1260 	    wmsum_value(&tqs->tqs_entries_free);
1261 	tqks->tqks_threads_created.value.ui64 =
1262 	    wmsum_value(&tqs->tqs_threads_created);
1263 	tqks->tqks_threads_destroyed.value.ui64 =
1264 	    wmsum_value(&tqs->tqs_threads_destroyed);
1265 	tqks->tqks_tasks_dispatched.value.ui64 =
1266 	    wmsum_value(&tqs->tqs_tasks_dispatched);
1267 	tqks->tqks_tasks_dispatched_delayed.value.ui64 =
1268 	    wmsum_value(&tqs->tqs_tasks_dispatched_delayed);
1269 	tqks->tqks_tasks_executed_normal.value.ui64 =
1270 	    wmsum_value(&tqs->tqs_tasks_executed_normal);
1271 	tqks->tqks_tasks_executed_priority.value.ui64 =
1272 	    wmsum_value(&tqs->tqs_tasks_executed_priority);
1273 	tqks->tqks_tasks_executed.value.ui64 =
1274 	    wmsum_value(&tqs->tqs_tasks_executed);
1275 	tqks->tqks_tasks_delayed_requeued.value.ui64 =
1276 	    wmsum_value(&tqs->tqs_tasks_delayed_requeued);
1277 	tqks->tqks_tasks_cancelled.value.ui64 =
1278 	    wmsum_value(&tqs->tqs_tasks_cancelled);
1279 	tqks->tqks_thread_wakeups.value.ui64 =
1280 	    wmsum_value(&tqs->tqs_thread_wakeups);
1281 	tqks->tqks_thread_wakeups_nowork.value.ui64 =
1282 	    wmsum_value(&tqs->tqs_thread_wakeups_nowork);
1283 	tqks->tqks_thread_sleeps.value.ui64 =
1284 	    wmsum_value(&tqs->tqs_thread_sleeps);
1285 
1286 	return (0);
1287 }
1288 
1289 static void
1290 taskq_kstats_init(taskq_t *tq)
1291 {
1292 	char name[TASKQ_NAMELEN+5]; /* 5 for dot, 3x instance digits, null */
1293 	snprintf(name, sizeof (name), "%s.%d", tq->tq_name, tq->tq_instance);
1294 
1295 	kstat_t *ksp = kstat_create("taskq", 0, name, "misc",
1296 	    KSTAT_TYPE_NAMED, sizeof (taskq_kstats_t) / sizeof (kstat_named_t),
1297 	    KSTAT_FLAG_VIRTUAL);
1298 
1299 	if (ksp == NULL)
1300 		return;
1301 
1302 	ksp->ks_private = tq;
1303 	ksp->ks_update = taskq_kstats_update;
1304 	ksp->ks_data = kmem_alloc(sizeof (taskq_kstats_t), KM_SLEEP);
1305 	memcpy(ksp->ks_data, &taskq_kstats_template, sizeof (taskq_kstats_t));
1306 	kstat_install(ksp);
1307 
1308 	tq->tq_ksp = ksp;
1309 }
1310 
1311 static void
1312 taskq_kstats_fini(taskq_t *tq)
1313 {
1314 	if (tq->tq_ksp == NULL)
1315 		return;
1316 
1317 	kmem_free(tq->tq_ksp->ks_data, sizeof (taskq_kstats_t));
1318 	kstat_delete(tq->tq_ksp);
1319 
1320 	tq->tq_ksp = NULL;
1321 }
1322 
1323 taskq_t *
1324 taskq_create(const char *name, int threads_arg, pri_t pri,
1325     int minalloc, int maxalloc, uint_t flags)
1326 {
1327 	taskq_t *tq;
1328 	taskq_thread_t *tqt;
1329 	int count = 0, rc = 0, i;
1330 	unsigned long irqflags;
1331 	int nthreads = threads_arg;
1332 
1333 	ASSERT(name != NULL);
1334 	ASSERT(minalloc >= 0);
1335 	ASSERT(!(flags & (TASKQ_CPR_SAFE))); /* Unsupported */
1336 
1337 	/* Scale the number of threads using nthreads as a percentage */
1338 	if (flags & TASKQ_THREADS_CPU_PCT) {
1339 		ASSERT(nthreads <= 100);
1340 		ASSERT(nthreads >= 0);
1341 		nthreads = MIN(threads_arg, 100);
1342 		nthreads = MAX(nthreads, 0);
1343 		nthreads = MAX((num_online_cpus() * nthreads) / 100, 1);
1344 	}
1345 
1346 	tq = kmem_alloc(sizeof (*tq), KM_PUSHPAGE);
1347 	if (tq == NULL)
1348 		return (NULL);
1349 
1350 	tq->tq_hp_support = B_FALSE;
1351 
1352 	if (flags & TASKQ_THREADS_CPU_PCT) {
1353 		tq->tq_hp_support = B_TRUE;
1354 		if (cpuhp_state_add_instance_nocalls(spl_taskq_cpuhp_state,
1355 		    &tq->tq_hp_cb_node) != 0) {
1356 			kmem_free(tq, sizeof (*tq));
1357 			return (NULL);
1358 		}
1359 	}
1360 
1361 	spin_lock_init(&tq->tq_lock);
1362 	INIT_LIST_HEAD(&tq->tq_thread_list);
1363 	INIT_LIST_HEAD(&tq->tq_active_list);
1364 	tq->tq_name = kmem_strdup(name);
1365 	tq->tq_nactive = 0;
1366 	tq->tq_nthreads = 0;
1367 	tq->tq_nspawn = 0;
1368 	tq->tq_maxthreads = nthreads;
1369 	tq->tq_cpu_pct = threads_arg;
1370 	tq->tq_pri = pri;
1371 	tq->tq_minalloc = minalloc;
1372 	tq->tq_maxalloc = maxalloc;
1373 	tq->tq_nalloc = 0;
1374 	tq->tq_flags = (flags | TASKQ_ACTIVE);
1375 	tq->tq_next_id = TASKQID_INITIAL;
1376 	tq->tq_lowest_id = TASKQID_INITIAL;
1377 	tq->lastspawnstop = jiffies;
1378 	INIT_LIST_HEAD(&tq->tq_free_list);
1379 	INIT_LIST_HEAD(&tq->tq_pend_list);
1380 	INIT_LIST_HEAD(&tq->tq_prio_list);
1381 	INIT_LIST_HEAD(&tq->tq_delay_list);
1382 	init_waitqueue_head(&tq->tq_work_waitq);
1383 	init_waitqueue_head(&tq->tq_wait_waitq);
1384 	tq->tq_lock_class = TQ_LOCK_GENERAL;
1385 	INIT_LIST_HEAD(&tq->tq_taskqs);
1386 	taskq_stats_init(tq);
1387 
1388 	if (flags & TASKQ_PREPOPULATE) {
1389 		spin_lock_irqsave_nested(&tq->tq_lock, irqflags,
1390 		    tq->tq_lock_class);
1391 
1392 		for (i = 0; i < minalloc; i++)
1393 			task_done(tq, task_alloc(tq, TQ_PUSHPAGE | TQ_NEW,
1394 			    &irqflags));
1395 
1396 		spin_unlock_irqrestore(&tq->tq_lock, irqflags);
1397 	}
1398 
1399 	if ((flags & TASKQ_DYNAMIC) && spl_taskq_thread_dynamic)
1400 		nthreads = 1;
1401 
1402 	for (i = 0; i < nthreads; i++) {
1403 		tqt = taskq_thread_create(tq);
1404 		if (tqt == NULL)
1405 			rc = 1;
1406 		else
1407 			count++;
1408 	}
1409 
1410 	/* Wait for all threads to be started before potential destroy */
1411 	wait_event(tq->tq_wait_waitq, tq->tq_nthreads == count);
1412 	/*
1413 	 * taskq_thread may have decremented tq_nspawn, but these initial threads
1414 	 * are not dynamically spawned and were never counted, so reset it to 0
1415 	 */
1416 	tq->tq_nspawn = 0;
1417 
1418 	if (rc) {
1419 		taskq_destroy(tq);
1420 		return (NULL);
1421 	}
1422 
1423 	down_write(&tq_list_sem);
1424 	tq->tq_instance = taskq_find_by_name(name) + 1;
1425 	list_add_tail(&tq->tq_taskqs, &tq_list);
1426 	up_write(&tq_list_sem);
1427 
1428 	/* Install kstats late, because the name includes tq_instance */
1429 	taskq_kstats_init(tq);
1430 
1431 	return (tq);
1432 }
1433 EXPORT_SYMBOL(taskq_create);
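
/*
 * Illustrative sketch ("my_taskq" is a hypothetical name): with
 * TASKQ_THREADS_CPU_PCT the thread count is a percentage of online CPUs,
 * so 100 here asks for roughly one thread per CPU.
 *
 *	taskq_t *tq = taskq_create("my_taskq", 100, defclsyspri,
 *	    boot_ncpus, INT_MAX, TASKQ_PREPOPULATE | TASKQ_THREADS_CPU_PCT);
 *	...
 *	taskq_wait(tq);
 *	taskq_destroy(tq);
 */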
1434 
1435 void
1436 taskq_destroy(taskq_t *tq)
1437 {
1438 	struct task_struct *thread;
1439 	taskq_thread_t *tqt;
1440 	taskq_ent_t *t;
1441 	unsigned long flags;
1442 
1443 	ASSERT(tq);
1444 	spin_lock_irqsave_nested(&tq->tq_lock, flags, tq->tq_lock_class);
1445 	tq->tq_flags &= ~TASKQ_ACTIVE;
1446 	spin_unlock_irqrestore(&tq->tq_lock, flags);
1447 
1448 	if (tq->tq_hp_support) {
1449 		VERIFY0(cpuhp_state_remove_instance_nocalls(
1450 		    spl_taskq_cpuhp_state, &tq->tq_hp_cb_node));
1451 	}
1452 
1453 	/*
1454 	 * When TASKQ_ACTIVE is clear, new tasks may not be added and no new
1455 	 * worker threads may be spawned for dynamic taskqs.
1456 	 */
1457 	if (dynamic_taskq != NULL)
1458 		taskq_wait_outstanding(dynamic_taskq, 0);
1459 
1460 	taskq_wait(tq);
1461 
1462 	taskq_kstats_fini(tq);
1463 
1464 	/* remove taskq from global list used by the kstats */
1465 	down_write(&tq_list_sem);
1466 	list_del(&tq->tq_taskqs);
1467 	up_write(&tq_list_sem);
1468 
1469 	spin_lock_irqsave_nested(&tq->tq_lock, flags, tq->tq_lock_class);
1470 	/* wait for spawning threads to insert themselves to the list */
1471 	while (tq->tq_nspawn) {
1472 		spin_unlock_irqrestore(&tq->tq_lock, flags);
1473 		schedule_timeout_interruptible(1);
1474 		spin_lock_irqsave_nested(&tq->tq_lock, flags,
1475 		    tq->tq_lock_class);
1476 	}
1477 
1478 	/*
1479 	 * Signal each thread to exit and block until it does.  Each thread
1480 	 * is responsible for removing itself from the list and freeing its
1481 	 * taskq_thread_t.  This allows for idle threads to opt to remove
1482 	 * themselves from the taskq.  They can be recreated as needed.
1483 	 */
1484 	while (!list_empty(&tq->tq_thread_list)) {
1485 		tqt = list_entry(tq->tq_thread_list.next,
1486 		    taskq_thread_t, tqt_thread_list);
1487 		thread = tqt->tqt_thread;
1488 		spin_unlock_irqrestore(&tq->tq_lock, flags);
1489 
1490 		kthread_stop(thread);
1491 
1492 		spin_lock_irqsave_nested(&tq->tq_lock, flags,
1493 		    tq->tq_lock_class);
1494 	}
1495 
1496 	while (!list_empty(&tq->tq_free_list)) {
1497 		t = list_entry(tq->tq_free_list.next, taskq_ent_t, tqent_list);
1498 
1499 		ASSERT(!(t->tqent_flags & TQENT_FLAG_PREALLOC));
1500 
1501 		list_del_init(&t->tqent_list);
1502 		task_free(tq, t);
1503 	}
1504 
1505 	ASSERT0(tq->tq_nthreads);
1506 	ASSERT0(tq->tq_nalloc);
1507 	ASSERT0(tq->tq_nspawn);
1508 	ASSERT(list_empty(&tq->tq_thread_list));
1509 	ASSERT(list_empty(&tq->tq_active_list));
1510 	ASSERT(list_empty(&tq->tq_free_list));
1511 	ASSERT(list_empty(&tq->tq_pend_list));
1512 	ASSERT(list_empty(&tq->tq_prio_list));
1513 	ASSERT(list_empty(&tq->tq_delay_list));
1514 
1515 	spin_unlock_irqrestore(&tq->tq_lock, flags);
1516 
1517 	taskq_stats_fini(tq);
1518 	kmem_strfree(tq->tq_name);
1519 	kmem_free(tq, sizeof (taskq_t));
1520 }
1521 EXPORT_SYMBOL(taskq_destroy);
1522 
1523 /*
1524  * Create a taskq with a specified number of pool threads. Allocate
1525  * and return an array of nthreads kthread_t pointers, one for each
1526  * thread in the pool. The array is not ordered and must be freed
1527  * by the caller.
1528  */
1529 taskq_t *
1530 taskq_create_synced(const char *name, int nthreads, pri_t pri,
1531     int minalloc, int maxalloc, uint_t flags, kthread_t ***ktpp)
1532 {
1533 	taskq_t *tq;
1534 	taskq_thread_t *tqt;
1535 	int i = 0;
1536 	kthread_t **kthreads = kmem_zalloc(sizeof (*kthreads) * nthreads,
1537 	    KM_SLEEP);
1538 
1539 	flags &= ~(TASKQ_DYNAMIC | TASKQ_THREADS_CPU_PCT | TASKQ_DC_BATCH);
1540 
1541 	/* taskq_create spawns all the threads before returning */
1542 	tq = taskq_create(name, nthreads, minclsyspri, nthreads, INT_MAX,
1543 	    flags | TASKQ_PREPOPULATE);
1544 	VERIFY(tq != NULL);
1545 	VERIFY(tq->tq_nthreads == nthreads);
1546 
1547 	list_for_each_entry(tqt, &tq->tq_thread_list, tqt_thread_list) {
1548 		kthreads[i] = tqt->tqt_thread;
1549 		i++;
1550 	}
1551 
1552 	ASSERT3S(i, ==, nthreads);
1553 	*ktpp = kthreads;
1554 
1555 	return (tq);
1556 }
1557 EXPORT_SYMBOL(taskq_create_synced);
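
/*
 * Illustrative sketch: the returned array holds nthreads pointers and must be
 * freed by the caller once the thread pointers are no longer needed.
 *
 *	kthread_t **threads;
 *	taskq_t *tq = taskq_create_synced("my_pool", 4, minclsyspri,
 *	    4, INT_MAX, 0, &threads);
 *	// ... use threads[0..3] ...
 *	kmem_free(threads, sizeof (kthread_t *) * 4);
 *	taskq_destroy(tq);
 */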
1558 
1559 static kstat_t *taskq_summary_ksp = NULL;
1560 
1561 static int
1562 spl_taskq_kstat_headers(char *buf, size_t size)
1563 {
1564 	size_t n = snprintf(buf, size,
1565 	    "%-20s | %-17s | %-23s\n"
1566 	    "%-20s | %-17s | %-23s\n"
1567 	    "%-20s | %-17s | %-23s\n",
1568 	    "", "threads", "tasks on queue",
1569 	    "taskq name", "tot [act idl] max", " pend [ norm  high] dly",
1570 	    "--------------------", "-----------------",
1571 	    "-----------------------");
1572 	return (n >= size ? ENOMEM : 0);
1573 }
1574 
1575 static int
1576 spl_taskq_kstat_data(char *buf, size_t size, void *data)
1577 {
1578 	struct list_head *tql = NULL;
1579 	taskq_t *tq;
1580 	char name[TASKQ_NAMELEN+5]; /* 5 for dot, 3x instance digits, null */
1581 	char threads[25];
1582 	char tasks[30];
1583 	size_t n;
1584 	int err = 0;
1585 
1586 	down_read(&tq_list_sem);
1587 	list_for_each_prev(tql, &tq_list) {
1588 		tq = list_entry(tql, taskq_t, tq_taskqs);
1589 
1590 		mutex_enter(tq->tq_ksp->ks_lock);
1591 		taskq_kstats_update(tq->tq_ksp, KSTAT_READ);
1592 		taskq_kstats_t *tqks = tq->tq_ksp->ks_data;
1593 
1594 		snprintf(name, sizeof (name), "%s.%d", tq->tq_name,
1595 		    tq->tq_instance);
1596 		snprintf(threads, sizeof (threads), "%3llu [%3llu %3llu] %3llu",
1597 		    tqks->tqks_threads_total.value.ui64,
1598 		    tqks->tqks_threads_active.value.ui64,
1599 		    tqks->tqks_threads_idle.value.ui64,
1600 		    tqks->tqks_threads_max.value.ui64);
1601 		snprintf(tasks, sizeof (tasks), "%5llu [%5llu %5llu] %3llu",
1602 		    tqks->tqks_tasks_total.value.ui64,
1603 		    tqks->tqks_tasks_pending.value.ui64,
1604 		    tqks->tqks_tasks_priority.value.ui64,
1605 		    tqks->tqks_tasks_delayed.value.ui64);
1606 
1607 		mutex_exit(tq->tq_ksp->ks_lock);
1608 
1609 		n = snprintf(buf, size, "%-20s | %-17s | %-23s\n",
1610 		    name, threads, tasks);
1611 		if (n >= size) {
1612 			err = ENOMEM;
1613 			break;
1614 		}
1615 
1616 		buf = &buf[n];
1617 		size -= n;
1618 	}
1619 
1620 	up_read(&tq_list_sem);
1621 
1622 	return (err);
1623 }
1624 
1625 static void
1626 spl_taskq_kstat_init(void)
1627 {
1628 	kstat_t *ksp = kstat_create("taskq", 0, "summary", "misc",
1629 	    KSTAT_TYPE_RAW, 0, KSTAT_FLAG_VIRTUAL);
1630 
1631 	if (ksp == NULL)
1632 		return;
1633 
1634 	ksp->ks_data = (void *)(uintptr_t)1;
1635 	ksp->ks_ndata = 1;
1636 	kstat_set_raw_ops(ksp, spl_taskq_kstat_headers,
1637 	    spl_taskq_kstat_data, NULL);
1638 	kstat_install(ksp);
1639 
1640 	taskq_summary_ksp = ksp;
1641 }
1642 
1643 static void
1644 spl_taskq_kstat_fini(void)
1645 {
1646 	if (taskq_summary_ksp == NULL)
1647 		return;
1648 
1649 	kstat_delete(taskq_summary_ksp);
1650 	taskq_summary_ksp = NULL;
1651 }
1652 
1653 static unsigned int spl_taskq_kick = 0;
1654 
1655 static int
1656 param_set_taskq_kick(const char *val, zfs_kernel_param_t *kp)
1657 {
1658 	int ret;
1659 	taskq_t *tq = NULL;
1660 	taskq_ent_t *t;
1661 	unsigned long flags;
1662 
1663 	ret = param_set_uint(val, kp);
1664 	if (ret < 0 || !spl_taskq_kick)
1665 		return (ret);
1666 	/* reset value */
1667 	spl_taskq_kick = 0;
1668 
1669 	down_read(&tq_list_sem);
1670 	list_for_each_entry(tq, &tq_list, tq_taskqs) {
1671 		spin_lock_irqsave_nested(&tq->tq_lock, flags,
1672 		    tq->tq_lock_class);
1673 		/* Check if the first pending task is older than 5 seconds */
1674 		t = taskq_next_ent(tq);
1675 		if (t && time_after(jiffies, t->tqent_birth + 5*HZ)) {
1676 			(void) taskq_thread_spawn(tq);
1677 			printk(KERN_INFO "spl: Kicked taskq %s/%d\n",
1678 			    tq->tq_name, tq->tq_instance);
1679 		}
1680 		spin_unlock_irqrestore(&tq->tq_lock, flags);
1681 	}
1682 	up_read(&tq_list_sem);
1683 	return (ret);
1684 }
1685 
1686 module_param_call(spl_taskq_kick, param_set_taskq_kick, param_get_uint,
1687 	&spl_taskq_kick, 0644);
1688 MODULE_PARM_DESC(spl_taskq_kick,
1689 	"Write nonzero to kick stuck taskqs to spawn more threads");
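
/*
 * Illustrative usage (the sysfs path may vary with how the module is built):
 *
 *	echo 1 > /sys/module/spl/parameters/spl_taskq_kick
 */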
1690 
1691 /*
1692  * This callback will be called exactly once for each core that comes online,
1693  * for each dynamic taskq. We attempt to expand taskqs that have
1694  * TASKQ_THREADS_CPU_PCT set. We need to redo the percentage calculation every
1695  * time, to correctly determine whether or not to add a thread.
1696  */
1697 static int
1698 spl_taskq_expand(unsigned int cpu, struct hlist_node *node)
1699 {
1700 	taskq_t *tq = list_entry(node, taskq_t, tq_hp_cb_node);
1701 	unsigned long flags;
1702 	int err = 0;
1703 
1704 	ASSERT(tq);
1705 	spin_lock_irqsave_nested(&tq->tq_lock, flags, tq->tq_lock_class);
1706 
1707 	if (!(tq->tq_flags & TASKQ_ACTIVE)) {
1708 		spin_unlock_irqrestore(&tq->tq_lock, flags);
1709 		return (err);
1710 	}
1711 
1712 	ASSERT(tq->tq_flags & TASKQ_THREADS_CPU_PCT);
1713 	int nthreads = MIN(tq->tq_cpu_pct, 100);
1714 	nthreads = MAX(((num_online_cpus() + 1) * nthreads) / 100, 1);
1715 	tq->tq_maxthreads = nthreads;
1716 
1717 	if (!((tq->tq_flags & TASKQ_DYNAMIC) && spl_taskq_thread_dynamic) &&
1718 	    tq->tq_maxthreads > tq->tq_nthreads) {
1719 		spin_unlock_irqrestore(&tq->tq_lock, flags);
1720 		taskq_thread_t *tqt = taskq_thread_create(tq);
1721 		if (tqt == NULL)
1722 			err = -1;
1723 		return (err);
1724 	}
1725 	spin_unlock_irqrestore(&tq->tq_lock, flags);
1726 	return (err);
1727 }
1728 
1729 /*
1730  * While we don't support offlining CPUs, it is possible that CPUs will fail
1731  * to online successfully. We do need to be able to handle this case
1732  * gracefully.
1733  */
1734 static int
1735 spl_taskq_prepare_down(unsigned int cpu, struct hlist_node *node)
1736 {
1737 	taskq_t *tq = list_entry(node, taskq_t, tq_hp_cb_node);
1738 	unsigned long flags;
1739 
1740 	ASSERT(tq);
1741 	spin_lock_irqsave_nested(&tq->tq_lock, flags, tq->tq_lock_class);
1742 
1743 	if (!(tq->tq_flags & TASKQ_ACTIVE))
1744 		goto out;
1745 
1746 	ASSERT(tq->tq_flags & TASKQ_THREADS_CPU_PCT);
1747 	int nthreads = MIN(tq->tq_cpu_pct, 100);
1748 	nthreads = MAX(((num_online_cpus()) * nthreads) / 100, 1);
1749 	tq->tq_maxthreads = nthreads;
1750 
1751 	if (!((tq->tq_flags & TASKQ_DYNAMIC) && spl_taskq_thread_dynamic) &&
1752 	    tq->tq_maxthreads < tq->tq_nthreads) {
1753 		ASSERT3U(tq->tq_maxthreads, ==, tq->tq_nthreads - 1);
1754 		taskq_thread_t *tqt = list_entry(tq->tq_thread_list.next,
1755 		    taskq_thread_t, tqt_thread_list);
1756 		struct task_struct *thread = tqt->tqt_thread;
1757 		spin_unlock_irqrestore(&tq->tq_lock, flags);
1758 
1759 		kthread_stop(thread);
1760 
1761 		return (0);
1762 	}
1763 
1764 out:
1765 	spin_unlock_irqrestore(&tq->tq_lock, flags);
1766 	return (0);
1767 }
1768 
1769 int
1770 spl_taskq_init(void)
1771 {
1772 	init_rwsem(&tq_list_sem);
1773 	tsd_create(&taskq_tsd, NULL);
1774 
1775 	spl_taskq_cpuhp_state = cpuhp_setup_state_multi(CPUHP_AP_ONLINE_DYN,
1776 	    "fs/spl_taskq:online", spl_taskq_expand, spl_taskq_prepare_down);
1777 
1778 	system_taskq = taskq_create("spl_system_taskq", MAX(boot_ncpus, 64),
1779 	    maxclsyspri, boot_ncpus, INT_MAX, TASKQ_PREPOPULATE|TASKQ_DYNAMIC);
1780 	if (system_taskq == NULL)
1781 		return (-ENOMEM);
1782 
1783 	system_delay_taskq = taskq_create("spl_delay_taskq", MAX(boot_ncpus, 4),
1784 	    maxclsyspri, boot_ncpus, INT_MAX, TASKQ_PREPOPULATE|TASKQ_DYNAMIC);
1785 	if (system_delay_taskq == NULL) {
1786 		cpuhp_remove_multi_state(spl_taskq_cpuhp_state);
1787 		taskq_destroy(system_taskq);
1788 		return (-ENOMEM);
1789 	}
1790 
1791 	dynamic_taskq = taskq_create("spl_dynamic_taskq", 1,
1792 	    maxclsyspri, boot_ncpus, INT_MAX, TASKQ_PREPOPULATE);
1793 	if (dynamic_taskq == NULL) {
1794 		cpuhp_remove_multi_state(spl_taskq_cpuhp_state);
1795 		taskq_destroy(system_taskq);
1796 		taskq_destroy(system_delay_taskq);
1797 		return (-ENOMEM);
1798 	}
1799 
1800 	/*
1801 	 * This is used to annotate tq_lock, so
1802 	 *   taskq_dispatch -> taskq_thread_spawn -> taskq_dispatch
1803 	 * does not trigger a lockdep warning re: possible recursive locking
1804 	 */
1805 	dynamic_taskq->tq_lock_class = TQ_LOCK_DYNAMIC;
1806 
1807 	spl_taskq_kstat_init();
1808 
1809 	return (0);
1810 }
1811 
1812 void
1813 spl_taskq_fini(void)
1814 {
1815 	spl_taskq_kstat_fini();
1816 
1817 	taskq_destroy(dynamic_taskq);
1818 	dynamic_taskq = NULL;
1819 
1820 	taskq_destroy(system_delay_taskq);
1821 	system_delay_taskq = NULL;
1822 
1823 	taskq_destroy(system_taskq);
1824 	system_taskq = NULL;
1825 
1826 	tsd_destroy(&taskq_tsd);
1827 
1828 	cpuhp_remove_multi_state(spl_taskq_cpuhp_state);
1829 	spl_taskq_cpuhp_state = 0;
1830 }
1831