xref: /freebsd/sys/contrib/openzfs/module/os/linux/spl/spl-taskq.c (revision 7a7741af18d6c8a804cc643cb7ecda9d730c6aa6)
1 /*
2  *  Copyright (C) 2007-2010 Lawrence Livermore National Security, LLC.
3  *  Copyright (C) 2007 The Regents of the University of California.
4  *  Produced at Lawrence Livermore National Laboratory (cf, DISCLAIMER).
5  *  Written by Brian Behlendorf <behlendorf1@llnl.gov>.
6  *  UCRL-CODE-235197
7  *
8  *  This file is part of the SPL, Solaris Porting Layer.
9  *
10  *  The SPL is free software; you can redistribute it and/or modify it
11  *  under the terms of the GNU General Public License as published by the
12  *  Free Software Foundation; either version 2 of the License, or (at your
13  *  option) any later version.
14  *
15  *  The SPL is distributed in the hope that it will be useful, but WITHOUT
16  *  ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
17  *  FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
18  *  for more details.
19  *
20  *  You should have received a copy of the GNU General Public License along
21  *  with the SPL.  If not, see <http://www.gnu.org/licenses/>.
22  *
23  *  Solaris Porting Layer (SPL) Task Queue Implementation.
24  */
25 /*
26  * Copyright (c) 2024, Klara Inc.
27  * Copyright (c) 2024, Syneto
28  */
29 
30 #include <sys/timer.h>
31 #include <sys/taskq.h>
32 #include <sys/kmem.h>
33 #include <sys/tsd.h>
34 #include <sys/trace_spl.h>
35 #include <sys/time.h>
36 #include <sys/atomic.h>
37 #include <sys/kstat.h>
38 #include <linux/cpuhotplug.h>
39 
40 typedef struct taskq_kstats {
41 	/* static values, for completeness */
42 	kstat_named_t tqks_threads_max;
43 	kstat_named_t tqks_entry_pool_min;
44 	kstat_named_t tqks_entry_pool_max;
45 
46 	/* gauges (inc/dec counters, current value) */
47 	kstat_named_t tqks_threads_active;
48 	kstat_named_t tqks_threads_idle;
49 	kstat_named_t tqks_threads_total;
50 	kstat_named_t tqks_tasks_pending;
51 	kstat_named_t tqks_tasks_priority;
52 	kstat_named_t tqks_tasks_total;
53 	kstat_named_t tqks_tasks_delayed;
54 	kstat_named_t tqks_entries_free;
55 
56 	/* counters (inc only, since taskq creation) */
57 	kstat_named_t tqks_threads_created;
58 	kstat_named_t tqks_threads_destroyed;
59 	kstat_named_t tqks_tasks_dispatched;
60 	kstat_named_t tqks_tasks_dispatched_delayed;
61 	kstat_named_t tqks_tasks_executed_normal;
62 	kstat_named_t tqks_tasks_executed_priority;
63 	kstat_named_t tqks_tasks_executed;
64 	kstat_named_t tqks_tasks_delayed_requeued;
65 	kstat_named_t tqks_tasks_cancelled;
66 	kstat_named_t tqks_thread_wakeups;
67 	kstat_named_t tqks_thread_wakeups_nowork;
68 	kstat_named_t tqks_thread_sleeps;
69 } taskq_kstats_t;
70 
71 static taskq_kstats_t taskq_kstats_template = {
72 	{ "threads_max",		KSTAT_DATA_UINT64 },
73 	{ "entry_pool_min",		KSTAT_DATA_UINT64 },
74 	{ "entry_pool_max",		KSTAT_DATA_UINT64 },
75 	{ "threads_active",		KSTAT_DATA_UINT64 },
76 	{ "threads_idle",		KSTAT_DATA_UINT64 },
77 	{ "threads_total",		KSTAT_DATA_UINT64 },
78 	{ "tasks_pending",		KSTAT_DATA_UINT64 },
79 	{ "tasks_priority",		KSTAT_DATA_UINT64 },
80 	{ "tasks_total",		KSTAT_DATA_UINT64 },
81 	{ "tasks_delayed",		KSTAT_DATA_UINT64 },
82 	{ "entries_free",		KSTAT_DATA_UINT64 },
83 
84 	{ "threads_created",		KSTAT_DATA_UINT64 },
85 	{ "threads_destroyed",		KSTAT_DATA_UINT64 },
86 	{ "tasks_dispatched",		KSTAT_DATA_UINT64 },
87 	{ "tasks_dispatched_delayed",	KSTAT_DATA_UINT64 },
88 	{ "tasks_executed_normal",	KSTAT_DATA_UINT64 },
89 	{ "tasks_executed_priority",	KSTAT_DATA_UINT64 },
90 	{ "tasks_executed",		KSTAT_DATA_UINT64 },
91 	{ "tasks_delayed_requeued",	KSTAT_DATA_UINT64 },
92 	{ "tasks_cancelled",		KSTAT_DATA_UINT64 },
93 	{ "thread_wakeups",		KSTAT_DATA_UINT64 },
94 	{ "thread_wakeups_nowork",	KSTAT_DATA_UINT64 },
95 	{ "thread_sleeps",		KSTAT_DATA_UINT64 },
96 };
97 
98 #define	TQSTAT_INC(tq, stat)	wmsum_add(&tq->tq_sums.tqs_##stat, 1)
99 #define	TQSTAT_DEC(tq, stat)	wmsum_add(&tq->tq_sums.tqs_##stat, -1)
100 
101 #define	_TQSTAT_MOD_LIST(mod, tq, t) do { \
102 	switch (t->tqent_flags & TQENT_LIST_MASK) {			\
103 	case TQENT_LIST_NONE: ASSERT(list_empty(&t->tqent_list)); break;\
104 	case TQENT_LIST_PENDING: mod(tq, tasks_pending); break;		\
105 	case TQENT_LIST_PRIORITY: mod(tq, tasks_priority); break;	\
106 	case TQENT_LIST_DELAY: mod(tq, tasks_delayed); break;		\
107 	}								\
108 } while (0)
109 #define	TQSTAT_INC_LIST(tq, t)	_TQSTAT_MOD_LIST(TQSTAT_INC, tq, t)
110 #define	TQSTAT_DEC_LIST(tq, t)	_TQSTAT_MOD_LIST(TQSTAT_DEC, tq, t)
111 
112 #define	TQENT_SET_LIST(t, l)	\
113 	t->tqent_flags = (t->tqent_flags & ~TQENT_LIST_MASK) | l;
114 
115 static int spl_taskq_thread_bind = 0;
116 module_param(spl_taskq_thread_bind, int, 0644);
117 MODULE_PARM_DESC(spl_taskq_thread_bind, "Bind taskq thread to CPU by default");
118 
119 static uint_t spl_taskq_thread_timeout_ms = 5000;
120 /* BEGIN CSTYLED */
121 module_param(spl_taskq_thread_timeout_ms, uint, 0644);
122 /* END CSTYLED */
123 MODULE_PARM_DESC(spl_taskq_thread_timeout_ms,
124 	"Minimum idle threads exit interval for dynamic taskqs");
125 
126 static int spl_taskq_thread_dynamic = 1;
127 module_param(spl_taskq_thread_dynamic, int, 0444);
128 MODULE_PARM_DESC(spl_taskq_thread_dynamic, "Allow dynamic taskq threads");
129 
130 static int spl_taskq_thread_priority = 1;
131 module_param(spl_taskq_thread_priority, int, 0644);
132 MODULE_PARM_DESC(spl_taskq_thread_priority,
133 	"Allow non-default priority for taskq threads");
134 
135 static uint_t spl_taskq_thread_sequential = 4;
136 /* BEGIN CSTYLED */
137 module_param(spl_taskq_thread_sequential, uint, 0644);
138 /* END CSTYLED */
139 MODULE_PARM_DESC(spl_taskq_thread_sequential,
140 	"Create new taskq threads after N sequential tasks");
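/*
 * Illustrative note (not part of the build): the parameters above are exposed
 * through sysfs and can typically be inspected or tuned at runtime from a
 * root shell; the exact path depends on how the SPL module is packaged.
 *
 *	# cat /sys/module/spl/parameters/spl_taskq_thread_sequential
 *	# echo 8 > /sys/module/spl/parameters/spl_taskq_thread_sequential
 */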
141 
142 /*
143  * Global system-wide dynamic task queue available for all consumers. This
144  * taskq is not intended for long-running tasks; instead, a dedicated taskq
145  * should be created.
146  */
147 taskq_t *system_taskq;
148 EXPORT_SYMBOL(system_taskq);
149 /* Global dynamic task queue for long delay */
150 taskq_t *system_delay_taskq;
151 EXPORT_SYMBOL(system_delay_taskq);
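/*
 * Hypothetical usage sketch for these global taskqs: a consumer with a short,
 * non-blocking callback (my_func and my_arg are placeholders) dispatches to
 * system_taskq, while delay-tolerant or long-running work belongs on
 * system_delay_taskq or on a dedicated taskq.
 *
 *	taskqid_t id = taskq_dispatch(system_taskq, my_func, my_arg, TQ_SLEEP);
 *	if (id == TASKQID_INVALID)
 *		cmn_err(CE_WARN, "taskq_dispatch() failed");
 */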
152 
153 /* Private dedicated taskq for creating new taskq threads on demand. */
154 static taskq_t *dynamic_taskq;
155 static taskq_thread_t *taskq_thread_create(taskq_t *);
156 
157 /* Multi-callback id for cpu hotplugging. */
158 static int spl_taskq_cpuhp_state;
159 
160 /* List of all taskqs */
161 LIST_HEAD(tq_list);
162 struct rw_semaphore tq_list_sem;
163 static uint_t taskq_tsd;
164 
165 static int
166 task_km_flags(uint_t flags)
167 {
168 	if (flags & TQ_NOSLEEP)
169 		return (KM_NOSLEEP);
170 
171 	if (flags & TQ_PUSHPAGE)
172 		return (KM_PUSHPAGE);
173 
174 	return (KM_SLEEP);
175 }
176 
177 /*
178  * taskq_find_by_name - Find the largest instance number of a named taskq.
179  */
180 static int
181 taskq_find_by_name(const char *name)
182 {
183 	struct list_head *tql = NULL;
184 	taskq_t *tq;
185 
186 	list_for_each_prev(tql, &tq_list) {
187 		tq = list_entry(tql, taskq_t, tq_taskqs);
188 		if (strcmp(name, tq->tq_name) == 0)
189 			return (tq->tq_instance);
190 	}
191 	return (-1);
192 }
193 
194 /*
195  * NOTE: Must be called with tq->tq_lock held, returns a taskq_ent_t
196  * which is not attached to the free, work, or pending taskq lists.
197  */
198 static taskq_ent_t *
199 task_alloc(taskq_t *tq, uint_t flags, unsigned long *irqflags)
200 {
201 	taskq_ent_t *t;
202 	int count = 0;
203 
204 	ASSERT(tq);
205 retry:
206 	/* Acquire taskq_ent_t's from free list if available */
207 	if (!list_empty(&tq->tq_free_list) && !(flags & TQ_NEW)) {
208 		t = list_entry(tq->tq_free_list.next, taskq_ent_t, tqent_list);
209 
210 		ASSERT(!(t->tqent_flags & TQENT_FLAG_PREALLOC));
211 		ASSERT(!(t->tqent_flags & TQENT_FLAG_CANCEL));
212 		ASSERT(!timer_pending(&t->tqent_timer));
213 
214 		list_del_init(&t->tqent_list);
215 		TQSTAT_DEC(tq, entries_free);
216 		return (t);
217 	}
218 
219 	/* Free list is empty and memory allocations are prohibited */
220 	if (flags & TQ_NOALLOC)
221 		return (NULL);
222 
223 	/* Hit maximum taskq_ent_t pool size */
224 	if (tq->tq_nalloc >= tq->tq_maxalloc) {
225 		if (flags & TQ_NOSLEEP)
226 			return (NULL);
227 
228 		/*
229 		 * Sleep periodically polling the free list for an available
230 		 * taskq_ent_t. Dispatching with TQ_SLEEP should always succeed
231 		 * but we cannot block forever waiting for a taskq_ent_t to
232 		 * show up in the free list, otherwise a deadlock can happen.
233 		 *
234 		 * Therefore, we need to allocate a new task even if the number
235 		 * of allocated tasks is above tq->tq_maxalloc, but we still
236 		 * end up delaying the task allocation by one second, thereby
237 		 * throttling the task dispatch rate.
238 		 */
239 		spin_unlock_irqrestore(&tq->tq_lock, *irqflags);
240 		schedule_timeout_interruptible(HZ / 100);
241 		spin_lock_irqsave_nested(&tq->tq_lock, *irqflags,
242 		    tq->tq_lock_class);
243 		if (count < 100) {
244 			count++;
245 			goto retry;
246 		}
247 	}
248 
249 	spin_unlock_irqrestore(&tq->tq_lock, *irqflags);
250 	t = kmem_alloc(sizeof (taskq_ent_t), task_km_flags(flags));
251 	spin_lock_irqsave_nested(&tq->tq_lock, *irqflags, tq->tq_lock_class);
252 
253 	if (t) {
254 		taskq_init_ent(t);
255 		tq->tq_nalloc++;
256 	}
257 
258 	return (t);
259 }
260 
261 /*
262  * NOTE: Must be called with tq->tq_lock held, expects the taskq_ent_t
263  * to already be removed from the free, work, or pending taskq lists.
264  */
265 static void
266 task_free(taskq_t *tq, taskq_ent_t *t)
267 {
268 	ASSERT(tq);
269 	ASSERT(t);
270 	ASSERT(list_empty(&t->tqent_list));
271 	ASSERT(!timer_pending(&t->tqent_timer));
272 
273 	kmem_free(t, sizeof (taskq_ent_t));
274 	tq->tq_nalloc--;
275 }
276 
277 /*
278  * NOTE: Must be called with tq->tq_lock held, either destroys the
279  * taskq_ent_t if too many exist or moves it to the free list for later use.
280  */
281 static void
282 task_done(taskq_t *tq, taskq_ent_t *t)
283 {
284 	ASSERT(tq);
285 	ASSERT(t);
286 	ASSERT(list_empty(&t->tqent_list));
287 
288 	/* Wake tasks blocked in taskq_wait_id() */
289 	wake_up_all(&t->tqent_waitq);
290 
291 	if (tq->tq_nalloc <= tq->tq_minalloc) {
292 		t->tqent_id = TASKQID_INVALID;
293 		t->tqent_func = NULL;
294 		t->tqent_arg = NULL;
295 		t->tqent_flags = 0;
296 
297 		list_add_tail(&t->tqent_list, &tq->tq_free_list);
298 		TQSTAT_INC(tq, entries_free);
299 	} else {
300 		task_free(tq, t);
301 	}
302 }
303 
304 /*
305  * When a delayed task timer expires, remove it from the delay list and
306  * add it to the priority list for immediate processing.
307  */
308 static void
309 task_expire_impl(taskq_ent_t *t)
310 {
311 	taskq_ent_t *w;
312 	taskq_t *tq = t->tqent_taskq;
313 	struct list_head *l = NULL;
314 	unsigned long flags;
315 
316 	spin_lock_irqsave_nested(&tq->tq_lock, flags, tq->tq_lock_class);
317 
318 	if (t->tqent_flags & TQENT_FLAG_CANCEL) {
319 		ASSERT(list_empty(&t->tqent_list));
320 		spin_unlock_irqrestore(&tq->tq_lock, flags);
321 		return;
322 	}
323 
324 	t->tqent_birth = jiffies;
325 	DTRACE_PROBE1(taskq_ent__birth, taskq_ent_t *, t);
326 
327 	/*
328 	 * The priority list must be maintained in strict task id order
329 	 * from lowest to highest for lowest_id to be easily calculable.
330 	 */
331 	list_del(&t->tqent_list);
332 	list_for_each_prev(l, &tq->tq_prio_list) {
333 		w = list_entry(l, taskq_ent_t, tqent_list);
334 		if (w->tqent_id < t->tqent_id) {
335 			list_add(&t->tqent_list, l);
336 			break;
337 		}
338 	}
339 	if (l == &tq->tq_prio_list)
340 		list_add(&t->tqent_list, &tq->tq_prio_list);
341 
342 	spin_unlock_irqrestore(&tq->tq_lock, flags);
343 
344 	wake_up(&tq->tq_work_waitq);
345 
346 	TQSTAT_INC(tq, tasks_delayed_requeued);
347 }
348 
349 static void
350 task_expire(struct timer_list *tl)
351 {
352 	struct timer_list *tmr = (struct timer_list *)tl;
353 	taskq_ent_t *t = from_timer(t, tmr, tqent_timer);
354 	task_expire_impl(t);
355 }
356 
357 /*
358  * Returns the lowest incomplete taskqid_t.  The taskqid_t may
359  * be queued on the pending list, on the priority list, on the
360  * delay list, or on the work list currently being handled, but
361  * it is not 100% complete yet.
362  */
363 static taskqid_t
364 taskq_lowest_id(taskq_t *tq)
365 {
366 	taskqid_t lowest_id = tq->tq_next_id;
367 	taskq_ent_t *t;
368 	taskq_thread_t *tqt;
369 
370 	if (!list_empty(&tq->tq_pend_list)) {
371 		t = list_entry(tq->tq_pend_list.next, taskq_ent_t, tqent_list);
372 		lowest_id = MIN(lowest_id, t->tqent_id);
373 	}
374 
375 	if (!list_empty(&tq->tq_prio_list)) {
376 		t = list_entry(tq->tq_prio_list.next, taskq_ent_t, tqent_list);
377 		lowest_id = MIN(lowest_id, t->tqent_id);
378 	}
379 
380 	if (!list_empty(&tq->tq_delay_list)) {
381 		t = list_entry(tq->tq_delay_list.next, taskq_ent_t, tqent_list);
382 		lowest_id = MIN(lowest_id, t->tqent_id);
383 	}
384 
385 	if (!list_empty(&tq->tq_active_list)) {
386 		tqt = list_entry(tq->tq_active_list.next, taskq_thread_t,
387 		    tqt_active_list);
388 		ASSERT(tqt->tqt_id != TASKQID_INVALID);
389 		lowest_id = MIN(lowest_id, tqt->tqt_id);
390 	}
391 
392 	return (lowest_id);
393 }
394 
395 /*
396  * Insert a taskq thread into the active list, keeping it sorted by taskqid.
397  */
398 static void
399 taskq_insert_in_order(taskq_t *tq, taskq_thread_t *tqt)
400 {
401 	taskq_thread_t *w;
402 	struct list_head *l = NULL;
403 
404 	ASSERT(tq);
405 	ASSERT(tqt);
406 
407 	list_for_each_prev(l, &tq->tq_active_list) {
408 		w = list_entry(l, taskq_thread_t, tqt_active_list);
409 		if (w->tqt_id < tqt->tqt_id) {
410 			list_add(&tqt->tqt_active_list, l);
411 			break;
412 		}
413 	}
414 	if (l == &tq->tq_active_list)
415 		list_add(&tqt->tqt_active_list, &tq->tq_active_list);
416 }
417 
418 /*
419  * Find and return a task from the given list if it exists.  The list
420  * must be in lowest to highest task id order.
421  */
422 static taskq_ent_t *
423 taskq_find_list(taskq_t *tq, struct list_head *lh, taskqid_t id)
424 {
425 	struct list_head *l = NULL;
426 	taskq_ent_t *t;
427 
428 	list_for_each(l, lh) {
429 		t = list_entry(l, taskq_ent_t, tqent_list);
430 
431 		if (t->tqent_id == id)
432 			return (t);
433 
434 		if (t->tqent_id > id)
435 			break;
436 	}
437 
438 	return (NULL);
439 }
440 
441 /*
442  * Find an already dispatched task given the task id regardless of what
443  * state it is in.  If a task is still pending it will be returned.
444  * If a task is executing, then -EBUSY will be returned instead.
445  * If the task has already been run then NULL is returned.
446  */
447 static taskq_ent_t *
448 taskq_find(taskq_t *tq, taskqid_t id)
449 {
450 	taskq_thread_t *tqt;
451 	struct list_head *l = NULL;
452 	taskq_ent_t *t;
453 
454 	t = taskq_find_list(tq, &tq->tq_delay_list, id);
455 	if (t)
456 		return (t);
457 
458 	t = taskq_find_list(tq, &tq->tq_prio_list, id);
459 	if (t)
460 		return (t);
461 
462 	t = taskq_find_list(tq, &tq->tq_pend_list, id);
463 	if (t)
464 		return (t);
465 
466 	list_for_each(l, &tq->tq_active_list) {
467 		tqt = list_entry(l, taskq_thread_t, tqt_active_list);
468 		if (tqt->tqt_id == id) {
469 			/*
470 			 * Instead of returning tqt_task, we just return a
471 			 * non-NULL value to prevent misuse, since tqt_task only
472 			 * has two valid fields.
473 			 */
474 			return (ERR_PTR(-EBUSY));
475 		}
476 	}
477 
478 	return (NULL);
479 }
480 
481 /*
482  * Theory for the taskq_wait_id(), taskq_wait_outstanding(), and
483  * taskq_wait() functions below.
484  *
485  * Taskq waiting is accomplished by tracking the lowest outstanding task
486  * id and the next available task id.  As tasks are dispatched they are
487  * added to the tail of the pending, priority, or delay lists.  As worker
488  * threads become available the tasks are removed from the heads of these
489  * lists and linked to the worker threads.  This ensures the lists are
490  * kept sorted by lowest to highest task id.
491  *
492  * Therefore the lowest outstanding task id can be quickly determined by
493  * checking the head item from all of these lists.  This value is stored
494  * with the taskq as the lowest id.  It only needs to be recalculated when
495  * either the task with the current lowest id completes or is canceled.
496  *
497  * By blocking until the lowest task id exceeds the passed task id the
498  * taskq_wait_outstanding() function can be easily implemented.  Similarly,
499  * by blocking until the lowest task id matches the next task id taskq_wait()
500  * can be implemented.
501  *
502  * Callers should be aware that when there are multiple worker threads it
503  * is possible for larger task ids to complete before smaller ones.  Also
504  * when the taskq contains delay tasks with small task ids callers may
505  * block for a considerable length of time waiting for them to expire and
506  * execute.
507  */
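/*
 * Illustrative sketch of the wait primitives described above; tq, my_func
 * and my_arg are placeholders, not part of this file:
 *
 *	taskqid_t id = taskq_dispatch(tq, my_func, my_arg, TQ_SLEEP);
 *	taskq_wait_id(tq, id);		(wait for this one task only)
 *	taskq_wait_outstanding(tq, id);	(wait for tasks dispatched up to id)
 *	taskq_wait(tq);			(wait for the taskq to drain completely)
 */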
508 static int
509 taskq_wait_id_check(taskq_t *tq, taskqid_t id)
510 {
511 	int rc;
512 	unsigned long flags;
513 
514 	spin_lock_irqsave_nested(&tq->tq_lock, flags, tq->tq_lock_class);
515 	rc = (taskq_find(tq, id) == NULL);
516 	spin_unlock_irqrestore(&tq->tq_lock, flags);
517 
518 	return (rc);
519 }
520 
521 /*
522  * The taskq_wait_id() function blocks until the passed task id completes.
523  * This does not guarantee that all lower task ids have completed.
524  */
525 void
526 taskq_wait_id(taskq_t *tq, taskqid_t id)
527 {
528 	wait_event(tq->tq_wait_waitq, taskq_wait_id_check(tq, id));
529 }
530 EXPORT_SYMBOL(taskq_wait_id);
531 
532 static int
533 taskq_wait_outstanding_check(taskq_t *tq, taskqid_t id)
534 {
535 	int rc;
536 	unsigned long flags;
537 
538 	spin_lock_irqsave_nested(&tq->tq_lock, flags, tq->tq_lock_class);
539 	rc = (id < tq->tq_lowest_id);
540 	spin_unlock_irqrestore(&tq->tq_lock, flags);
541 
542 	return (rc);
543 }
544 
545 /*
546  * The taskq_wait_outstanding() function will block until all tasks with a
547  * lower taskqid than the passed 'id' have been completed.  Note that all
548  * task ids are assigned monotonically at dispatch time.  Zero may be
549  * passed for the id to indicate that all tasks dispatched up to this
550  * point, but not after, should be waited for.
551  */
552 void
553 taskq_wait_outstanding(taskq_t *tq, taskqid_t id)
554 {
555 	id = id ? id : tq->tq_next_id - 1;
556 	wait_event(tq->tq_wait_waitq, taskq_wait_outstanding_check(tq, id));
557 }
558 EXPORT_SYMBOL(taskq_wait_outstanding);
559 
560 static int
561 taskq_wait_check(taskq_t *tq)
562 {
563 	int rc;
564 	unsigned long flags;
565 
566 	spin_lock_irqsave_nested(&tq->tq_lock, flags, tq->tq_lock_class);
567 	rc = (tq->tq_lowest_id == tq->tq_next_id);
568 	spin_unlock_irqrestore(&tq->tq_lock, flags);
569 
570 	return (rc);
571 }
572 
573 /*
574  * The taskq_wait() function will block until the taskq is empty.
575  * This means that if a taskq re-dispatches work to itself taskq_wait()
576  * callers will block indefinitely.
577  */
578 void
579 taskq_wait(taskq_t *tq)
580 {
581 	wait_event(tq->tq_wait_waitq, taskq_wait_check(tq));
582 }
583 EXPORT_SYMBOL(taskq_wait);
584 
585 int
586 taskq_member(taskq_t *tq, kthread_t *t)
587 {
588 	return (tq == (taskq_t *)tsd_get_by_thread(taskq_tsd, t));
589 }
590 EXPORT_SYMBOL(taskq_member);
591 
592 taskq_t *
593 taskq_of_curthread(void)
594 {
595 	return (tsd_get(taskq_tsd));
596 }
597 EXPORT_SYMBOL(taskq_of_curthread);
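/*
 * Minimal sketch, assuming a callback that must not wait on its own taskq
 * (a common deadlock pattern); either check below is sufficient on its own,
 * and tq is a placeholder:
 *
 *	if (taskq_member(tq, curthread) || taskq_of_curthread() == tq)
 *		return;		(already running inside tq, do not block)
 *	taskq_wait(tq);
 */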
598 
599 /*
600  * Cancel an already dispatched task given the task id.  Still pending tasks
601  * will be immediately canceled, and if the task is active the function will
602  * block until it completes.  Preallocated tasks which are canceled must be
603  * freed by the caller.
604  */
605 int
606 taskq_cancel_id(taskq_t *tq, taskqid_t id)
607 {
608 	taskq_ent_t *t;
609 	int rc = ENOENT;
610 	unsigned long flags;
611 
612 	ASSERT(tq);
613 
614 	spin_lock_irqsave_nested(&tq->tq_lock, flags, tq->tq_lock_class);
615 	t = taskq_find(tq, id);
616 	if (t && t != ERR_PTR(-EBUSY)) {
617 		list_del_init(&t->tqent_list);
618 		TQSTAT_DEC_LIST(tq, t);
619 		TQSTAT_DEC(tq, tasks_total);
620 
621 		t->tqent_flags |= TQENT_FLAG_CANCEL;
622 		TQSTAT_INC(tq, tasks_cancelled);
623 
624 		/*
625 		 * When canceling the lowest outstanding task id we
626 		 * must recalculate the new lowest outstanding id.
627 		 */
628 		if (tq->tq_lowest_id == t->tqent_id) {
629 			tq->tq_lowest_id = taskq_lowest_id(tq);
630 			ASSERT3S(tq->tq_lowest_id, >, t->tqent_id);
631 		}
632 
633 		/*
634 		 * The task_expire() function takes the tq->tq_lock so drop
635 		 * the lock before synchronously cancelling the timer.
636 		 */
637 		if (timer_pending(&t->tqent_timer)) {
638 			spin_unlock_irqrestore(&tq->tq_lock, flags);
639 			del_timer_sync(&t->tqent_timer);
640 			spin_lock_irqsave_nested(&tq->tq_lock, flags,
641 			    tq->tq_lock_class);
642 		}
643 
644 		if (!(t->tqent_flags & TQENT_FLAG_PREALLOC))
645 			task_done(tq, t);
646 
647 		rc = 0;
648 	}
649 	spin_unlock_irqrestore(&tq->tq_lock, flags);
650 
651 	if (t == ERR_PTR(-EBUSY)) {
652 		taskq_wait_id(tq, id);
653 		rc = EBUSY;
654 	}
655 
656 	return (rc);
657 }
658 EXPORT_SYMBOL(taskq_cancel_id);
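/*
 * Illustrative sketch of the cancellation contract, assuming id came from a
 * prior taskq_dispatch(): 0 means the pending task was removed, EBUSY means
 * it was already executing (and has completed by the time this returns), and
 * ENOENT means it already finished or was never dispatched.
 *
 *	int error = taskq_cancel_id(tq, id);
 *	if (error == 0 || error == EBUSY || error == ENOENT)
 *		;	(in all three cases the task is no longer running)
 */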
659 
660 static int taskq_thread_spawn(taskq_t *tq);
661 
662 taskqid_t
663 taskq_dispatch(taskq_t *tq, task_func_t func, void *arg, uint_t flags)
664 {
665 	taskq_ent_t *t;
666 	taskqid_t rc = TASKQID_INVALID;
667 	unsigned long irqflags;
668 
669 	ASSERT(tq);
670 	ASSERT(func);
671 
672 	spin_lock_irqsave_nested(&tq->tq_lock, irqflags, tq->tq_lock_class);
673 
674 	/* Taskq being destroyed and all tasks drained */
675 	if (!(tq->tq_flags & TASKQ_ACTIVE))
676 		goto out;
677 
678 	/* Do not queue the task unless there is an idle thread for it */
679 	ASSERT(tq->tq_nactive <= tq->tq_nthreads);
680 	if ((flags & TQ_NOQUEUE) && (tq->tq_nactive == tq->tq_nthreads)) {
681 		/* Dynamic taskq may be able to spawn another thread */
682 		if (taskq_thread_spawn(tq) == 0)
683 			goto out;
684 	}
685 
686 	if ((t = task_alloc(tq, flags, &irqflags)) == NULL)
687 		goto out;
688 
689 	spin_lock(&t->tqent_lock);
690 
691 	/* Queue to the front of the list to enforce TQ_NOQUEUE semantics */
692 	if (flags & TQ_NOQUEUE) {
693 		TQENT_SET_LIST(t, TQENT_LIST_PRIORITY);
694 		list_add(&t->tqent_list, &tq->tq_prio_list);
695 	/* Queue to the priority list instead of the pending list */
696 	} else if (flags & TQ_FRONT) {
697 		TQENT_SET_LIST(t, TQENT_LIST_PRIORITY);
698 		list_add_tail(&t->tqent_list, &tq->tq_prio_list);
699 	} else {
700 		TQENT_SET_LIST(t, TQENT_LIST_PENDING);
701 		list_add_tail(&t->tqent_list, &tq->tq_pend_list);
702 	}
703 	TQSTAT_INC_LIST(tq, t);
704 	TQSTAT_INC(tq, tasks_total);
705 
706 	t->tqent_id = rc = tq->tq_next_id;
707 	tq->tq_next_id++;
708 	t->tqent_func = func;
709 	t->tqent_arg = arg;
710 	t->tqent_taskq = tq;
711 	t->tqent_timer.function = NULL;
712 	t->tqent_timer.expires = 0;
713 
714 	t->tqent_birth = jiffies;
715 	DTRACE_PROBE1(taskq_ent__birth, taskq_ent_t *, t);
716 
717 	ASSERT(!(t->tqent_flags & TQENT_FLAG_PREALLOC));
718 
719 	spin_unlock(&t->tqent_lock);
720 
721 	wake_up(&tq->tq_work_waitq);
722 
723 	TQSTAT_INC(tq, tasks_dispatched);
724 
725 	/* Spawn additional taskq threads if required. */
726 	if (!(flags & TQ_NOQUEUE) && tq->tq_nactive == tq->tq_nthreads)
727 		(void) taskq_thread_spawn(tq);
728 out:
729 	spin_unlock_irqrestore(&tq->tq_lock, irqflags);
730 	return (rc);
731 }
732 EXPORT_SYMBOL(taskq_dispatch);
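/*
 * Hypothetical caller sketch: a dispatch from a context which must not sleep
 * passes TQ_NOSLEEP and must be prepared for TASKQID_INVALID, since entry
 * allocation may fail; my_func and my_arg are placeholders.
 *
 *	taskqid_t id = taskq_dispatch(tq, my_func, my_arg, TQ_NOSLEEP);
 *	if (id == TASKQID_INVALID)
 *		;	(fall back, e.g. run my_func(my_arg) inline or retry)
 */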
733 
734 taskqid_t
735 taskq_dispatch_delay(taskq_t *tq, task_func_t func, void *arg,
736     uint_t flags, clock_t expire_time)
737 {
738 	taskqid_t rc = TASKQID_INVALID;
739 	taskq_ent_t *t;
740 	unsigned long irqflags;
741 
742 	ASSERT(tq);
743 	ASSERT(func);
744 
745 	spin_lock_irqsave_nested(&tq->tq_lock, irqflags, tq->tq_lock_class);
746 
747 	/* Taskq being destroyed and all tasks drained */
748 	if (!(tq->tq_flags & TASKQ_ACTIVE))
749 		goto out;
750 
751 	if ((t = task_alloc(tq, flags, &irqflags)) == NULL)
752 		goto out;
753 
754 	spin_lock(&t->tqent_lock);
755 
756 	/* Queue to the delay list for subsequent execution */
757 	list_add_tail(&t->tqent_list, &tq->tq_delay_list);
758 	TQENT_SET_LIST(t, TQENT_LIST_DELAY);
759 	TQSTAT_INC_LIST(tq, t);
760 	TQSTAT_INC(tq, tasks_total);
761 
762 	t->tqent_id = rc = tq->tq_next_id;
763 	tq->tq_next_id++;
764 	t->tqent_func = func;
765 	t->tqent_arg = arg;
766 	t->tqent_taskq = tq;
767 	t->tqent_timer.function = task_expire;
768 	t->tqent_timer.expires = (unsigned long)expire_time;
769 	add_timer(&t->tqent_timer);
770 
771 	ASSERT(!(t->tqent_flags & TQENT_FLAG_PREALLOC));
772 
773 	spin_unlock(&t->tqent_lock);
774 
775 	TQSTAT_INC(tq, tasks_dispatched_delayed);
776 
777 	/* Spawn additional taskq threads if required. */
778 	if (tq->tq_nactive == tq->tq_nthreads)
779 		(void) taskq_thread_spawn(tq);
780 out:
781 	spin_unlock_irqrestore(&tq->tq_lock, irqflags);
782 	return (rc);
783 }
784 EXPORT_SYMBOL(taskq_dispatch_delay);
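/*
 * Minimal sketch, assuming the caller wants the task to run roughly two
 * seconds from now.  The expire_time argument is an absolute time in jiffies,
 * so it is built from the current value; my_func and my_arg are placeholders.
 *
 *	taskqid_t id = taskq_dispatch_delay(tq, my_func, my_arg, TQ_SLEEP,
 *	    ddi_get_lbolt() + 2 * HZ);
 *	if (id != TASKQID_INVALID)
 *		(void) taskq_cancel_id(tq, id);	(optional: cancel before expiry)
 */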
785 
786 void
787 taskq_dispatch_ent(taskq_t *tq, task_func_t func, void *arg, uint_t flags,
788     taskq_ent_t *t)
789 {
790 	unsigned long irqflags;
791 	ASSERT(tq);
792 	ASSERT(func);
793 
794 	spin_lock_irqsave_nested(&tq->tq_lock, irqflags,
795 	    tq->tq_lock_class);
796 
797 	/* Taskq being destroyed and all tasks drained */
798 	if (!(tq->tq_flags & TASKQ_ACTIVE)) {
799 		t->tqent_id = TASKQID_INVALID;
800 		goto out;
801 	}
802 
803 	if ((flags & TQ_NOQUEUE) && (tq->tq_nactive == tq->tq_nthreads)) {
804 		/* Dynamic taskq may be able to spawn another thread */
805 		if (taskq_thread_spawn(tq) == 0)
806 			goto out;
807 		flags |= TQ_FRONT;
808 	}
809 
810 	spin_lock(&t->tqent_lock);
811 
812 	/*
813 	 * Make sure the entry is not on some other taskq; it is important to
814 	 * ASSERT() under lock
815 	 */
816 	ASSERT(taskq_empty_ent(t));
817 
818 	/*
819 	 * Mark it as a prealloc'd task.  This is important
820 	 * to ensure that we don't free it later.
821 	 */
822 	t->tqent_flags |= TQENT_FLAG_PREALLOC;
823 
824 	/* Queue to the priority list instead of the pending list */
825 	if (flags & TQ_FRONT) {
826 		TQENT_SET_LIST(t, TQENT_LIST_PRIORITY);
827 		list_add_tail(&t->tqent_list, &tq->tq_prio_list);
828 	} else {
829 		TQENT_SET_LIST(t, TQENT_LIST_PENDING);
830 		list_add_tail(&t->tqent_list, &tq->tq_pend_list);
831 	}
832 	TQSTAT_INC_LIST(tq, t);
833 	TQSTAT_INC(tq, tasks_total);
834 
835 	t->tqent_id = tq->tq_next_id;
836 	tq->tq_next_id++;
837 	t->tqent_func = func;
838 	t->tqent_arg = arg;
839 	t->tqent_taskq = tq;
840 
841 	t->tqent_birth = jiffies;
842 	DTRACE_PROBE1(taskq_ent__birth, taskq_ent_t *, t);
843 
844 	spin_unlock(&t->tqent_lock);
845 
846 	wake_up(&tq->tq_work_waitq);
847 
848 	TQSTAT_INC(tq, tasks_dispatched);
849 
850 	/* Spawn additional taskq threads if required. */
851 	if (tq->tq_nactive == tq->tq_nthreads)
852 		(void) taskq_thread_spawn(tq);
853 out:
854 	spin_unlock_irqrestore(&tq->tq_lock, irqflags);
855 }
856 EXPORT_SYMBOL(taskq_dispatch_ent);
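/*
 * Illustrative sketch of the preallocated-entry path, which avoids allocation
 * at dispatch time.  The caller embeds a taskq_ent_t in its own long-lived
 * structure, initializes it once, and may reuse it after the function has
 * run; struct my_work, mw and my_func are placeholders.
 *
 *	struct my_work {
 *		taskq_ent_t	mw_ent;
 *	} *mw;
 *
 *	taskq_init_ent(&mw->mw_ent);
 *	taskq_dispatch_ent(tq, my_func, mw, 0, &mw->mw_ent);
 *	if (mw->mw_ent.tqent_id == TASKQID_INVALID)
 *		;	(taskq is shutting down, handle mw inline)
 */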
857 
858 int
859 taskq_empty_ent(taskq_ent_t *t)
860 {
861 	return (list_empty(&t->tqent_list));
862 }
863 EXPORT_SYMBOL(taskq_empty_ent);
864 
865 void
866 taskq_init_ent(taskq_ent_t *t)
867 {
868 	spin_lock_init(&t->tqent_lock);
869 	init_waitqueue_head(&t->tqent_waitq);
870 	timer_setup(&t->tqent_timer, NULL, 0);
871 	INIT_LIST_HEAD(&t->tqent_list);
872 	t->tqent_id = 0;
873 	t->tqent_func = NULL;
874 	t->tqent_arg = NULL;
875 	t->tqent_flags = 0;
876 	t->tqent_taskq = NULL;
877 }
878 EXPORT_SYMBOL(taskq_init_ent);
879 
880 /*
881  * Return the next pending task; preference is given to tasks on the
882  * priority list which were dispatched with TQ_FRONT.
883  */
884 static taskq_ent_t *
885 taskq_next_ent(taskq_t *tq)
886 {
887 	struct list_head *list;
888 
889 	if (!list_empty(&tq->tq_prio_list))
890 		list = &tq->tq_prio_list;
891 	else if (!list_empty(&tq->tq_pend_list))
892 		list = &tq->tq_pend_list;
893 	else
894 		return (NULL);
895 
896 	return (list_entry(list->next, taskq_ent_t, tqent_list));
897 }
898 
899 /*
900  * Spawns a new thread for the specified taskq.
901  */
902 static void
903 taskq_thread_spawn_task(void *arg)
904 {
905 	taskq_t *tq = (taskq_t *)arg;
906 	unsigned long flags;
907 
908 	if (taskq_thread_create(tq) == NULL) {
909 		/* restore spawning count if failed */
910 		spin_lock_irqsave_nested(&tq->tq_lock, flags,
911 		    tq->tq_lock_class);
912 		tq->tq_nspawn--;
913 		spin_unlock_irqrestore(&tq->tq_lock, flags);
914 	}
915 }
916 
917 /*
918  * Spawn additional threads for dynamic taskqs (TASKQ_DYNAMIC) when the
919  * current number of threads is insufficient to handle pending tasks.  These
920  * new threads must be created by the dedicated dynamic_taskq to avoid
921  * deadlocks between thread creation and memory reclaim.  The system_taskq
922  * which is also a dynamic taskq cannot be safely used for this.
923  */
924 static int
925 taskq_thread_spawn(taskq_t *tq)
926 {
927 	int spawning = 0;
928 
929 	if (!(tq->tq_flags & TASKQ_DYNAMIC))
930 		return (0);
931 
932 	tq->lastspawnstop = jiffies;
933 	if ((tq->tq_nthreads + tq->tq_nspawn < tq->tq_maxthreads) &&
934 	    (tq->tq_flags & TASKQ_ACTIVE)) {
935 		spawning = (++tq->tq_nspawn);
936 		taskq_dispatch(dynamic_taskq, taskq_thread_spawn_task,
937 		    tq, TQ_NOSLEEP);
938 	}
939 
940 	return (spawning);
941 }
942 
943 /*
944  * Threads in a dynamic taskq may exit once there is no more work to do.
945  * To prevent threads from being created and destroyed too often limit
946  * the exit rate to one per spl_taskq_thread_timeout_ms.
947  *
948  * The first thread is the thread list is treated as the primary thread.
949  * The first thread in the thread list is treated as the primary thread.
950  * There is nothing special about the primary thread, but to keep all of
951  * the taskq pids from changing we opt to make it long running.
952 static int
953 taskq_thread_should_stop(taskq_t *tq, taskq_thread_t *tqt)
954 {
955 	ASSERT(!taskq_next_ent(tq));
956 	if (!(tq->tq_flags & TASKQ_DYNAMIC) || !spl_taskq_thread_dynamic)
957 		return (0);
958 	if (!(tq->tq_flags & TASKQ_ACTIVE))
959 		return (1);
960 	if (list_first_entry(&(tq->tq_thread_list), taskq_thread_t,
961 	    tqt_thread_list) == tqt)
962 		return (0);
963 	ASSERT3U(tq->tq_nthreads, >, 1);
964 	if (tq->tq_nspawn != 0)
965 		return (0);
966 	if (time_before(jiffies, tq->lastspawnstop +
967 	    msecs_to_jiffies(spl_taskq_thread_timeout_ms)))
968 		return (0);
969 	tq->lastspawnstop = jiffies;
970 	return (1);
971 }
972 
973 static int
974 taskq_thread(void *args)
975 {
976 	DECLARE_WAITQUEUE(wait, current);
977 	sigset_t blocked;
978 	taskq_thread_t *tqt = args;
979 	taskq_t *tq;
980 	taskq_ent_t *t;
981 	int seq_tasks = 0;
982 	unsigned long flags;
983 	taskq_ent_t dup_task = {};
984 
985 	ASSERT(tqt);
986 	ASSERT(tqt->tqt_tq);
987 	tq = tqt->tqt_tq;
988 	current->flags |= PF_NOFREEZE;
989 
990 	(void) spl_fstrans_mark();
991 
992 	sigfillset(&blocked);
993 	sigprocmask(SIG_BLOCK, &blocked, NULL);
994 	flush_signals(current);
995 
996 	tsd_set(taskq_tsd, tq);
997 	spin_lock_irqsave_nested(&tq->tq_lock, flags, tq->tq_lock_class);
998 	/*
999 	 * If we are dynamically spawned, decrease spawning count. Note that
1000 	 * we could be created during taskq_create, in which case we shouldn't
1001 	 * do the decrement. But it's fine because taskq_create will reset
1002 	 * tq_nspawn later.
1003 	 */
1004 	if (tq->tq_flags & TASKQ_DYNAMIC)
1005 		tq->tq_nspawn--;
1006 
1007 	/* Immediately exit if more threads than allowed were created. */
1008 	if (tq->tq_nthreads >= tq->tq_maxthreads)
1009 		goto error;
1010 
1011 	tq->tq_nthreads++;
1012 	list_add_tail(&tqt->tqt_thread_list, &tq->tq_thread_list);
1013 	wake_up(&tq->tq_wait_waitq);
1014 	set_current_state(TASK_INTERRUPTIBLE);
1015 
1016 	TQSTAT_INC(tq, threads_total);
1017 
1018 	while (!kthread_should_stop()) {
1019 
1020 		if (list_empty(&tq->tq_pend_list) &&
1021 		    list_empty(&tq->tq_prio_list)) {
1022 
1023 			if (taskq_thread_should_stop(tq, tqt))
1024 				break;
1025 
1026 			add_wait_queue_exclusive(&tq->tq_work_waitq, &wait);
1027 			spin_unlock_irqrestore(&tq->tq_lock, flags);
1028 
1029 			TQSTAT_INC(tq, thread_sleeps);
1030 			TQSTAT_INC(tq, threads_idle);
1031 
1032 			schedule();
1033 			seq_tasks = 0;
1034 
1035 			TQSTAT_DEC(tq, threads_idle);
1036 			TQSTAT_INC(tq, thread_wakeups);
1037 
1038 			spin_lock_irqsave_nested(&tq->tq_lock, flags,
1039 			    tq->tq_lock_class);
1040 			remove_wait_queue(&tq->tq_work_waitq, &wait);
1041 		} else {
1042 			__set_current_state(TASK_RUNNING);
1043 		}
1044 
1045 		if ((t = taskq_next_ent(tq)) != NULL) {
1046 			list_del_init(&t->tqent_list);
1047 			TQSTAT_DEC_LIST(tq, t);
1048 			TQSTAT_DEC(tq, tasks_total);
1049 
1050 			/*
1051 			 * A TQENT_FLAG_PREALLOC task may be reused or freed
1052 			 * during the task function call. Store tqent_id and
1053 			 * tqent_flags here.
1054 			 *
1055 			 * Also use an on stack taskq_ent_t for tqt_task
1056 			 * assignment in this case; we want to make sure
1057 			 * to duplicate all fields, so the values are
1058 			 * correct when it's accessed via DTRACE_PROBE*.
1059 			 */
1060 			tqt->tqt_id = t->tqent_id;
1061 			tqt->tqt_flags = t->tqent_flags;
1062 
1063 			if (t->tqent_flags & TQENT_FLAG_PREALLOC) {
1064 				dup_task = *t;
1065 				t = &dup_task;
1066 			}
1067 			tqt->tqt_task = t;
1068 
1069 			taskq_insert_in_order(tq, tqt);
1070 			tq->tq_nactive++;
1071 			spin_unlock_irqrestore(&tq->tq_lock, flags);
1072 
1073 			TQSTAT_INC(tq, threads_active);
1074 			DTRACE_PROBE1(taskq_ent__start, taskq_ent_t *, t);
1075 
1076 			/* Perform the requested task */
1077 			t->tqent_func(t->tqent_arg);
1078 
1079 			DTRACE_PROBE1(taskq_ent__finish, taskq_ent_t *, t);
1080 
1081 			TQSTAT_DEC(tq, threads_active);
1082 			if ((t->tqent_flags & TQENT_LIST_MASK) ==
1083 			    TQENT_LIST_PENDING)
1084 				TQSTAT_INC(tq, tasks_executed_normal);
1085 			else
1086 				TQSTAT_INC(tq, tasks_executed_priority);
1087 			TQSTAT_INC(tq, tasks_executed);
1088 
1089 			spin_lock_irqsave_nested(&tq->tq_lock, flags,
1090 			    tq->tq_lock_class);
1091 
1092 			tq->tq_nactive--;
1093 			list_del_init(&tqt->tqt_active_list);
1094 			tqt->tqt_task = NULL;
1095 
1096 			/* For prealloc'd tasks, we don't free anything. */
1097 			if (!(tqt->tqt_flags & TQENT_FLAG_PREALLOC))
1098 				task_done(tq, t);
1099 
1100 			/*
1101 			 * When the current lowest outstanding taskqid is
1102 			 * done, calculate the new lowest outstanding id.
1103 			 */
1104 			if (tq->tq_lowest_id == tqt->tqt_id) {
1105 				tq->tq_lowest_id = taskq_lowest_id(tq);
1106 				ASSERT3S(tq->tq_lowest_id, >, tqt->tqt_id);
1107 			}
1108 
1109 			/* Spawn additional taskq threads if required. */
1110 			if ((++seq_tasks) > spl_taskq_thread_sequential &&
1111 			    taskq_thread_spawn(tq))
1112 				seq_tasks = 0;
1113 
1114 			tqt->tqt_id = TASKQID_INVALID;
1115 			tqt->tqt_flags = 0;
1116 			wake_up_all(&tq->tq_wait_waitq);
1117 		} else
1118 			TQSTAT_INC(tq, thread_wakeups_nowork);
1119 
1120 		set_current_state(TASK_INTERRUPTIBLE);
1121 
1122 	}
1123 
1124 	__set_current_state(TASK_RUNNING);
1125 	tq->tq_nthreads--;
1126 	list_del_init(&tqt->tqt_thread_list);
1127 
1128 	TQSTAT_DEC(tq, threads_total);
1129 	TQSTAT_INC(tq, threads_destroyed);
1130 
1131 error:
1132 	kmem_free(tqt, sizeof (taskq_thread_t));
1133 	spin_unlock_irqrestore(&tq->tq_lock, flags);
1134 
1135 	tsd_set(taskq_tsd, NULL);
1136 	thread_exit();
1137 
1138 	return (0);
1139 }
1140 
1141 static taskq_thread_t *
1142 taskq_thread_create(taskq_t *tq)
1143 {
1144 	static int last_used_cpu = 0;
1145 	taskq_thread_t *tqt;
1146 
1147 	tqt = kmem_alloc(sizeof (*tqt), KM_PUSHPAGE);
1148 	INIT_LIST_HEAD(&tqt->tqt_thread_list);
1149 	INIT_LIST_HEAD(&tqt->tqt_active_list);
1150 	tqt->tqt_tq = tq;
1151 	tqt->tqt_id = TASKQID_INVALID;
1152 
1153 	tqt->tqt_thread = spl_kthread_create(taskq_thread, tqt,
1154 	    "%s", tq->tq_name);
1155 	if (tqt->tqt_thread == NULL) {
1156 		kmem_free(tqt, sizeof (taskq_thread_t));
1157 		return (NULL);
1158 	}
1159 
1160 	if (spl_taskq_thread_bind) {
1161 		last_used_cpu = (last_used_cpu + 1) % num_online_cpus();
1162 		kthread_bind(tqt->tqt_thread, last_used_cpu);
1163 	}
1164 
1165 	if (spl_taskq_thread_priority)
1166 		set_user_nice(tqt->tqt_thread, PRIO_TO_NICE(tq->tq_pri));
1167 
1168 	wake_up_process(tqt->tqt_thread);
1169 
1170 	TQSTAT_INC(tq, threads_created);
1171 
1172 	return (tqt);
1173 }
1174 
1175 static void
1176 taskq_stats_init(taskq_t *tq)
1177 {
1178 	taskq_sums_t *tqs = &tq->tq_sums;
1179 	wmsum_init(&tqs->tqs_threads_active, 0);
1180 	wmsum_init(&tqs->tqs_threads_idle, 0);
1181 	wmsum_init(&tqs->tqs_threads_total, 0);
1182 	wmsum_init(&tqs->tqs_tasks_pending, 0);
1183 	wmsum_init(&tqs->tqs_tasks_priority, 0);
1184 	wmsum_init(&tqs->tqs_tasks_total, 0);
1185 	wmsum_init(&tqs->tqs_tasks_delayed, 0);
1186 	wmsum_init(&tqs->tqs_entries_free, 0);
1187 	wmsum_init(&tqs->tqs_threads_created, 0);
1188 	wmsum_init(&tqs->tqs_threads_destroyed, 0);
1189 	wmsum_init(&tqs->tqs_tasks_dispatched, 0);
1190 	wmsum_init(&tqs->tqs_tasks_dispatched_delayed, 0);
1191 	wmsum_init(&tqs->tqs_tasks_executed_normal, 0);
1192 	wmsum_init(&tqs->tqs_tasks_executed_priority, 0);
1193 	wmsum_init(&tqs->tqs_tasks_executed, 0);
1194 	wmsum_init(&tqs->tqs_tasks_delayed_requeued, 0);
1195 	wmsum_init(&tqs->tqs_tasks_cancelled, 0);
1196 	wmsum_init(&tqs->tqs_thread_wakeups, 0);
1197 	wmsum_init(&tqs->tqs_thread_wakeups_nowork, 0);
1198 	wmsum_init(&tqs->tqs_thread_sleeps, 0);
1199 }
1200 
1201 static void
1202 taskq_stats_fini(taskq_t *tq)
1203 {
1204 	taskq_sums_t *tqs = &tq->tq_sums;
1205 	wmsum_fini(&tqs->tqs_threads_active);
1206 	wmsum_fini(&tqs->tqs_threads_idle);
1207 	wmsum_fini(&tqs->tqs_threads_total);
1208 	wmsum_fini(&tqs->tqs_tasks_pending);
1209 	wmsum_fini(&tqs->tqs_tasks_priority);
1210 	wmsum_fini(&tqs->tqs_tasks_total);
1211 	wmsum_fini(&tqs->tqs_tasks_delayed);
1212 	wmsum_fini(&tqs->tqs_entries_free);
1213 	wmsum_fini(&tqs->tqs_threads_created);
1214 	wmsum_fini(&tqs->tqs_threads_destroyed);
1215 	wmsum_fini(&tqs->tqs_tasks_dispatched);
1216 	wmsum_fini(&tqs->tqs_tasks_dispatched_delayed);
1217 	wmsum_fini(&tqs->tqs_tasks_executed_normal);
1218 	wmsum_fini(&tqs->tqs_tasks_executed_priority);
1219 	wmsum_fini(&tqs->tqs_tasks_executed);
1220 	wmsum_fini(&tqs->tqs_tasks_delayed_requeued);
1221 	wmsum_fini(&tqs->tqs_tasks_cancelled);
1222 	wmsum_fini(&tqs->tqs_thread_wakeups);
1223 	wmsum_fini(&tqs->tqs_thread_wakeups_nowork);
1224 	wmsum_fini(&tqs->tqs_thread_sleeps);
1225 }
1226 
1227 static int
1228 taskq_kstats_update(kstat_t *ksp, int rw)
1229 {
1230 	if (rw == KSTAT_WRITE)
1231 		return (EACCES);
1232 
1233 	taskq_t *tq = ksp->ks_private;
1234 	taskq_kstats_t *tqks = ksp->ks_data;
1235 
1236 	tqks->tqks_threads_max.value.ui64 = tq->tq_maxthreads;
1237 	tqks->tqks_entry_pool_min.value.ui64 = tq->tq_minalloc;
1238 	tqks->tqks_entry_pool_max.value.ui64 = tq->tq_maxalloc;
1239 
1240 	taskq_sums_t *tqs = &tq->tq_sums;
1241 
1242 	tqks->tqks_threads_active.value.ui64 =
1243 	    wmsum_value(&tqs->tqs_threads_active);
1244 	tqks->tqks_threads_idle.value.ui64 =
1245 	    wmsum_value(&tqs->tqs_threads_idle);
1246 	tqks->tqks_threads_total.value.ui64 =
1247 	    wmsum_value(&tqs->tqs_threads_total);
1248 	tqks->tqks_tasks_pending.value.ui64 =
1249 	    wmsum_value(&tqs->tqs_tasks_pending);
1250 	tqks->tqks_tasks_priority.value.ui64 =
1251 	    wmsum_value(&tqs->tqs_tasks_priority);
1252 	tqks->tqks_tasks_total.value.ui64 =
1253 	    wmsum_value(&tqs->tqs_tasks_total);
1254 	tqks->tqks_tasks_delayed.value.ui64 =
1255 	    wmsum_value(&tqs->tqs_tasks_delayed);
1256 	tqks->tqks_entries_free.value.ui64 =
1257 	    wmsum_value(&tqs->tqs_entries_free);
1258 	tqks->tqks_threads_created.value.ui64 =
1259 	    wmsum_value(&tqs->tqs_threads_created);
1260 	tqks->tqks_threads_destroyed.value.ui64 =
1261 	    wmsum_value(&tqs->tqs_threads_destroyed);
1262 	tqks->tqks_tasks_dispatched.value.ui64 =
1263 	    wmsum_value(&tqs->tqs_tasks_dispatched);
1264 	tqks->tqks_tasks_dispatched_delayed.value.ui64 =
1265 	    wmsum_value(&tqs->tqs_tasks_dispatched_delayed);
1266 	tqks->tqks_tasks_executed_normal.value.ui64 =
1267 	    wmsum_value(&tqs->tqs_tasks_executed_normal);
1268 	tqks->tqks_tasks_executed_priority.value.ui64 =
1269 	    wmsum_value(&tqs->tqs_tasks_executed_priority);
1270 	tqks->tqks_tasks_executed.value.ui64 =
1271 	    wmsum_value(&tqs->tqs_tasks_executed);
1272 	tqks->tqks_tasks_delayed_requeued.value.ui64 =
1273 	    wmsum_value(&tqs->tqs_tasks_delayed_requeued);
1274 	tqks->tqks_tasks_cancelled.value.ui64 =
1275 	    wmsum_value(&tqs->tqs_tasks_cancelled);
1276 	tqks->tqks_thread_wakeups.value.ui64 =
1277 	    wmsum_value(&tqs->tqs_thread_wakeups);
1278 	tqks->tqks_thread_wakeups_nowork.value.ui64 =
1279 	    wmsum_value(&tqs->tqs_thread_wakeups_nowork);
1280 	tqks->tqks_thread_sleeps.value.ui64 =
1281 	    wmsum_value(&tqs->tqs_thread_sleeps);
1282 
1283 	return (0);
1284 }
1285 
1286 static void
1287 taskq_kstats_init(taskq_t *tq)
1288 {
1289 	char name[TASKQ_NAMELEN+5]; /* 5 for dot, 3x instance digits, null */
1290 	snprintf(name, sizeof (name), "%s.%d", tq->tq_name, tq->tq_instance);
1291 
1292 	kstat_t *ksp = kstat_create("taskq", 0, name, "misc",
1293 	    KSTAT_TYPE_NAMED, sizeof (taskq_kstats_t) / sizeof (kstat_named_t),
1294 	    KSTAT_FLAG_VIRTUAL);
1295 
1296 	if (ksp == NULL)
1297 		return;
1298 
1299 	ksp->ks_private = tq;
1300 	ksp->ks_update = taskq_kstats_update;
1301 	ksp->ks_data = kmem_alloc(sizeof (taskq_kstats_t), KM_SLEEP);
1302 	memcpy(ksp->ks_data, &taskq_kstats_template, sizeof (taskq_kstats_t));
1303 	kstat_install(ksp);
1304 
1305 	tq->tq_ksp = ksp;
1306 }
1307 
1308 static void
1309 taskq_kstats_fini(taskq_t *tq)
1310 {
1311 	if (tq->tq_ksp == NULL)
1312 		return;
1313 
1314 	kmem_free(tq->tq_ksp->ks_data, sizeof (taskq_kstats_t));
1315 	kstat_delete(tq->tq_ksp);
1316 
1317 	tq->tq_ksp = NULL;
1318 }
1319 
1320 taskq_t *
1321 taskq_create(const char *name, int threads_arg, pri_t pri,
1322     int minalloc, int maxalloc, uint_t flags)
1323 {
1324 	taskq_t *tq;
1325 	taskq_thread_t *tqt;
1326 	int count = 0, rc = 0, i;
1327 	unsigned long irqflags;
1328 	int nthreads = threads_arg;
1329 
1330 	ASSERT(name != NULL);
1331 	ASSERT(minalloc >= 0);
1332 	ASSERT(!(flags & (TASKQ_CPR_SAFE))); /* Unsupported */
1333 
1334 	/* Scale the number of threads using nthreads as a percentage */
1335 	if (flags & TASKQ_THREADS_CPU_PCT) {
1336 		ASSERT(nthreads <= 100);
1337 		ASSERT(nthreads >= 0);
1338 		nthreads = MIN(threads_arg, 100);
1339 		nthreads = MAX(nthreads, 0);
1340 		nthreads = MAX((num_online_cpus() * nthreads) /100, 1);
1341 	}
1342 
1343 	tq = kmem_alloc(sizeof (*tq), KM_PUSHPAGE);
1344 	if (tq == NULL)
1345 		return (NULL);
1346 
1347 	tq->tq_hp_support = B_FALSE;
1348 
1349 	if (flags & TASKQ_THREADS_CPU_PCT) {
1350 		tq->tq_hp_support = B_TRUE;
1351 		if (cpuhp_state_add_instance_nocalls(spl_taskq_cpuhp_state,
1352 		    &tq->tq_hp_cb_node) != 0) {
1353 			kmem_free(tq, sizeof (*tq));
1354 			return (NULL);
1355 		}
1356 	}
1357 
1358 	spin_lock_init(&tq->tq_lock);
1359 	INIT_LIST_HEAD(&tq->tq_thread_list);
1360 	INIT_LIST_HEAD(&tq->tq_active_list);
1361 	tq->tq_name = kmem_strdup(name);
1362 	tq->tq_nactive = 0;
1363 	tq->tq_nthreads = 0;
1364 	tq->tq_nspawn = 0;
1365 	tq->tq_maxthreads = nthreads;
1366 	tq->tq_cpu_pct = threads_arg;
1367 	tq->tq_pri = pri;
1368 	tq->tq_minalloc = minalloc;
1369 	tq->tq_maxalloc = maxalloc;
1370 	tq->tq_nalloc = 0;
1371 	tq->tq_flags = (flags | TASKQ_ACTIVE);
1372 	tq->tq_next_id = TASKQID_INITIAL;
1373 	tq->tq_lowest_id = TASKQID_INITIAL;
1374 	tq->lastspawnstop = jiffies;
1375 	INIT_LIST_HEAD(&tq->tq_free_list);
1376 	INIT_LIST_HEAD(&tq->tq_pend_list);
1377 	INIT_LIST_HEAD(&tq->tq_prio_list);
1378 	INIT_LIST_HEAD(&tq->tq_delay_list);
1379 	init_waitqueue_head(&tq->tq_work_waitq);
1380 	init_waitqueue_head(&tq->tq_wait_waitq);
1381 	tq->tq_lock_class = TQ_LOCK_GENERAL;
1382 	INIT_LIST_HEAD(&tq->tq_taskqs);
1383 	taskq_stats_init(tq);
1384 
1385 	if (flags & TASKQ_PREPOPULATE) {
1386 		spin_lock_irqsave_nested(&tq->tq_lock, irqflags,
1387 		    tq->tq_lock_class);
1388 
1389 		for (i = 0; i < minalloc; i++)
1390 			task_done(tq, task_alloc(tq, TQ_PUSHPAGE | TQ_NEW,
1391 			    &irqflags));
1392 
1393 		spin_unlock_irqrestore(&tq->tq_lock, irqflags);
1394 	}
1395 
1396 	if ((flags & TASKQ_DYNAMIC) && spl_taskq_thread_dynamic)
1397 		nthreads = 1;
1398 
1399 	for (i = 0; i < nthreads; i++) {
1400 		tqt = taskq_thread_create(tq);
1401 		if (tqt == NULL)
1402 			rc = 1;
1403 		else
1404 			count++;
1405 	}
1406 
1407 	/* Wait for all threads to be started before potential destroy */
1408 	wait_event(tq->tq_wait_waitq, tq->tq_nthreads == count);
1409 	/*
1410 	 * The taskq_thread threads may have decremented tq_nspawn, but since
1411 	 * they were not dynamically spawned we reset the count to zero here.
1412 	 */
1413 	tq->tq_nspawn = 0;
1414 
1415 	if (rc) {
1416 		taskq_destroy(tq);
1417 		return (NULL);
1418 	}
1419 
1420 	down_write(&tq_list_sem);
1421 	tq->tq_instance = taskq_find_by_name(name) + 1;
1422 	list_add_tail(&tq->tq_taskqs, &tq_list);
1423 	up_write(&tq_list_sem);
1424 
1425 	/* Install kstats late, because the name includes tq_instance */
1426 	taskq_kstats_init(tq);
1427 
1428 	return (tq);
1429 }
1430 EXPORT_SYMBOL(taskq_create);
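/*
 * Hypothetical creation sketch: a consumer sizing its taskq to 75% of the
 * online CPUs, with dynamic threads and a prepopulated entry pool.  The name
 * and pool sizes are illustrative only.
 *
 *	taskq_t *tq = taskq_create("my_taskq", 75, maxclsyspri, 32, INT_MAX,
 *	    TASKQ_PREPOPULATE | TASKQ_DYNAMIC | TASKQ_THREADS_CPU_PCT);
 *	...
 *	taskq_destroy(tq);
 */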
1431 
1432 void
1433 taskq_destroy(taskq_t *tq)
1434 {
1435 	struct task_struct *thread;
1436 	taskq_thread_t *tqt;
1437 	taskq_ent_t *t;
1438 	unsigned long flags;
1439 
1440 	ASSERT(tq);
1441 	spin_lock_irqsave_nested(&tq->tq_lock, flags, tq->tq_lock_class);
1442 	tq->tq_flags &= ~TASKQ_ACTIVE;
1443 	spin_unlock_irqrestore(&tq->tq_lock, flags);
1444 
1445 	if (tq->tq_hp_support) {
1446 		VERIFY0(cpuhp_state_remove_instance_nocalls(
1447 		    spl_taskq_cpuhp_state, &tq->tq_hp_cb_node));
1448 	}
1449 
1450 	/*
1451 	 * When TASKQ_ACTIVE is clear new tasks may not be added, nor may
1452 	 * new worker threads be spawned for dynamic taskqs.
1453 	 */
1454 	if (dynamic_taskq != NULL)
1455 		taskq_wait_outstanding(dynamic_taskq, 0);
1456 
1457 	taskq_wait(tq);
1458 
1459 	taskq_kstats_fini(tq);
1460 
1461 	/* remove taskq from global list used by the kstats */
1462 	down_write(&tq_list_sem);
1463 	list_del(&tq->tq_taskqs);
1464 	up_write(&tq_list_sem);
1465 
1466 	spin_lock_irqsave_nested(&tq->tq_lock, flags, tq->tq_lock_class);
1467 	/* wait for spawning threads to insert themselves into the list */
1468 	while (tq->tq_nspawn) {
1469 		spin_unlock_irqrestore(&tq->tq_lock, flags);
1470 		schedule_timeout_interruptible(1);
1471 		spin_lock_irqsave_nested(&tq->tq_lock, flags,
1472 		    tq->tq_lock_class);
1473 	}
1474 
1475 	/*
1476 	 * Signal each thread to exit and block until it does.  Each thread
1477 	 * is responsible for removing itself from the list and freeing its
1478 	 * taskq_thread_t.  This allows for idle threads to opt to remove
1479 	 * themselves from the taskq.  They can be recreated as needed.
1480 	 */
1481 	while (!list_empty(&tq->tq_thread_list)) {
1482 		tqt = list_entry(tq->tq_thread_list.next,
1483 		    taskq_thread_t, tqt_thread_list);
1484 		thread = tqt->tqt_thread;
1485 		spin_unlock_irqrestore(&tq->tq_lock, flags);
1486 
1487 		kthread_stop(thread);
1488 
1489 		spin_lock_irqsave_nested(&tq->tq_lock, flags,
1490 		    tq->tq_lock_class);
1491 	}
1492 
1493 	while (!list_empty(&tq->tq_free_list)) {
1494 		t = list_entry(tq->tq_free_list.next, taskq_ent_t, tqent_list);
1495 
1496 		ASSERT(!(t->tqent_flags & TQENT_FLAG_PREALLOC));
1497 
1498 		list_del_init(&t->tqent_list);
1499 		task_free(tq, t);
1500 	}
1501 
1502 	ASSERT0(tq->tq_nthreads);
1503 	ASSERT0(tq->tq_nalloc);
1504 	ASSERT0(tq->tq_nspawn);
1505 	ASSERT(list_empty(&tq->tq_thread_list));
1506 	ASSERT(list_empty(&tq->tq_active_list));
1507 	ASSERT(list_empty(&tq->tq_free_list));
1508 	ASSERT(list_empty(&tq->tq_pend_list));
1509 	ASSERT(list_empty(&tq->tq_prio_list));
1510 	ASSERT(list_empty(&tq->tq_delay_list));
1511 
1512 	spin_unlock_irqrestore(&tq->tq_lock, flags);
1513 
1514 	taskq_stats_fini(tq);
1515 	kmem_strfree(tq->tq_name);
1516 	kmem_free(tq, sizeof (taskq_t));
1517 }
1518 EXPORT_SYMBOL(taskq_destroy);
1519 
1520 /*
1521  * Create a taskq with a specified number of pool threads. Allocate
1522  * and return an array of nthreads kthread_t pointers, one for each
1523  * thread in the pool. The array is not ordered and must be freed
1524  * by the caller.
1525  */
1526 taskq_t *
1527 taskq_create_synced(const char *name, int nthreads, pri_t pri,
1528     int minalloc, int maxalloc, uint_t flags, kthread_t ***ktpp)
1529 {
1530 	taskq_t *tq;
1531 	taskq_thread_t *tqt;
1532 	int i = 0;
1533 	kthread_t **kthreads = kmem_zalloc(sizeof (*kthreads) * nthreads,
1534 	    KM_SLEEP);
1535 
1536 	flags &= ~(TASKQ_DYNAMIC | TASKQ_THREADS_CPU_PCT | TASKQ_DC_BATCH);
1537 
1538 	/* taskq_create spawns all the threads before returning */
1539 	tq = taskq_create(name, nthreads, minclsyspri, nthreads, INT_MAX,
1540 	    flags | TASKQ_PREPOPULATE);
1541 	VERIFY(tq != NULL);
1542 	VERIFY(tq->tq_nthreads == nthreads);
1543 
1544 	list_for_each_entry(tqt, &tq->tq_thread_list, tqt_thread_list) {
1545 		kthreads[i] = tqt->tqt_thread;
1546 		i++;
1547 	}
1548 
1549 	ASSERT3S(i, ==, nthreads);
1550 	*ktpp = kthreads;
1551 
1552 	return (tq);
1553 }
1554 EXPORT_SYMBOL(taskq_create_synced);
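/*
 * Minimal usage sketch: the returned array holds one kthread_t pointer per
 * pool thread and must be freed by the caller once it is no longer needed;
 * "my_pool" and nthreads are placeholders.
 *
 *	kthread_t **kthreads;
 *	taskq_t *tq = taskq_create_synced("my_pool", nthreads, minclsyspri,
 *	    0, INT_MAX, 0, &kthreads);
 *	...
 *	kmem_free(kthreads, sizeof (kthread_t *) * nthreads);
 *	taskq_destroy(tq);
 */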
1555 
1556 static kstat_t *taskq_summary_ksp = NULL;
1557 
1558 static int
1559 spl_taskq_kstat_headers(char *buf, size_t size)
1560 {
1561 	size_t n = snprintf(buf, size,
1562 	    "%-20s | %-17s | %-23s\n"
1563 	    "%-20s | %-17s | %-23s\n"
1564 	    "%-20s | %-17s | %-23s\n",
1565 	    "", "threads", "tasks on queue",
1566 	    "taskq name", "tot [act idl] max", " pend [ norm  high] dly",
1567 	    "--------------------", "-----------------",
1568 	    "-----------------------");
1569 	return (n >= size ? ENOMEM : 0);
1570 }
1571 
1572 static int
1573 spl_taskq_kstat_data(char *buf, size_t size, void *data)
1574 {
1575 	struct list_head *tql = NULL;
1576 	taskq_t *tq;
1577 	char name[TASKQ_NAMELEN+5]; /* 5 for dot, 3x instance digits, null */
1578 	char threads[25];
1579 	char tasks[30];
1580 	size_t n;
1581 	int err = 0;
1582 
1583 	down_read(&tq_list_sem);
1584 	list_for_each_prev(tql, &tq_list) {
1585 		tq = list_entry(tql, taskq_t, tq_taskqs);
1586 
1587 		mutex_enter(tq->tq_ksp->ks_lock);
1588 		taskq_kstats_update(tq->tq_ksp, KSTAT_READ);
1589 		taskq_kstats_t *tqks = tq->tq_ksp->ks_data;
1590 
1591 		snprintf(name, sizeof (name), "%s.%d", tq->tq_name,
1592 		    tq->tq_instance);
1593 		snprintf(threads, sizeof (threads), "%3llu [%3llu %3llu] %3llu",
1594 		    tqks->tqks_threads_total.value.ui64,
1595 		    tqks->tqks_threads_active.value.ui64,
1596 		    tqks->tqks_threads_idle.value.ui64,
1597 		    tqks->tqks_threads_max.value.ui64);
1598 		snprintf(tasks, sizeof (tasks), "%5llu [%5llu %5llu] %3llu",
1599 		    tqks->tqks_tasks_total.value.ui64,
1600 		    tqks->tqks_tasks_pending.value.ui64,
1601 		    tqks->tqks_tasks_priority.value.ui64,
1602 		    tqks->tqks_tasks_delayed.value.ui64);
1603 
1604 		mutex_exit(tq->tq_ksp->ks_lock);
1605 
1606 		n = snprintf(buf, size, "%-20s | %-17s | %-23s\n",
1607 		    name, threads, tasks);
1608 		if (n >= size) {
1609 			err = ENOMEM;
1610 			break;
1611 		}
1612 
1613 		buf = &buf[n];
1614 		size -= n;
1615 	}
1616 
1617 	up_read(&tq_list_sem);
1618 
1619 	return (err);
1620 }
1621 
1622 static void
1623 spl_taskq_kstat_init(void)
1624 {
1625 	kstat_t *ksp = kstat_create("taskq", 0, "summary", "misc",
1626 	    KSTAT_TYPE_RAW, 0, KSTAT_FLAG_VIRTUAL);
1627 
1628 	if (ksp == NULL)
1629 		return;
1630 
1631 	ksp->ks_data = (void *)(uintptr_t)1;
1632 	ksp->ks_ndata = 1;
1633 	kstat_set_raw_ops(ksp, spl_taskq_kstat_headers,
1634 	    spl_taskq_kstat_data, NULL);
1635 	kstat_install(ksp);
1636 
1637 	taskq_summary_ksp = ksp;
1638 }
1639 
1640 static void
1641 spl_taskq_kstat_fini(void)
1642 {
1643 	if (taskq_summary_ksp == NULL)
1644 		return;
1645 
1646 	kstat_delete(taskq_summary_ksp);
1647 	taskq_summary_ksp = NULL;
1648 }
1649 
1650 static unsigned int spl_taskq_kick = 0;
1651 
1652 /*
1653  * 2.6.36 API Change
1654  * module_param_cb is introduced to take kernel_param_ops and
1655  * module_param_call is marked as obsolete. Also set and get operations
1656  * were changed to take a 'const struct kernel_param *'.
1657  */
1658 static int
1659 #ifdef module_param_cb
1660 param_set_taskq_kick(const char *val, const struct kernel_param *kp)
1661 #else
1662 param_set_taskq_kick(const char *val, struct kernel_param *kp)
1663 #endif
1664 {
1665 	int ret;
1666 	taskq_t *tq = NULL;
1667 	taskq_ent_t *t;
1668 	unsigned long flags;
1669 
1670 	ret = param_set_uint(val, kp);
1671 	if (ret < 0 || !spl_taskq_kick)
1672 		return (ret);
1673 	/* reset value */
1674 	spl_taskq_kick = 0;
1675 
1676 	down_read(&tq_list_sem);
1677 	list_for_each_entry(tq, &tq_list, tq_taskqs) {
1678 		spin_lock_irqsave_nested(&tq->tq_lock, flags,
1679 		    tq->tq_lock_class);
1680 		/* Check if the first pending task is older than 5 seconds */
1681 		t = taskq_next_ent(tq);
1682 		if (t && time_after(jiffies, t->tqent_birth + 5*HZ)) {
1683 			(void) taskq_thread_spawn(tq);
1684 			printk(KERN_INFO "spl: Kicked taskq %s/%d\n",
1685 			    tq->tq_name, tq->tq_instance);
1686 		}
1687 		spin_unlock_irqrestore(&tq->tq_lock, flags);
1688 	}
1689 	up_read(&tq_list_sem);
1690 	return (ret);
1691 }
1692 
1693 #ifdef module_param_cb
1694 static const struct kernel_param_ops param_ops_taskq_kick = {
1695 	.set = param_set_taskq_kick,
1696 	.get = param_get_uint,
1697 };
1698 module_param_cb(spl_taskq_kick, &param_ops_taskq_kick, &spl_taskq_kick, 0644);
1699 #else
1700 module_param_call(spl_taskq_kick, param_set_taskq_kick, param_get_uint,
1701 	&spl_taskq_kick, 0644);
1702 #endif
1703 MODULE_PARM_DESC(spl_taskq_kick,
1704 	"Write nonzero to kick stuck taskqs to spawn more threads");
1705 
1706 /*
1707  * This callback will be called exactly once for each core that comes online,
1708  * for each dynamic taskq. We attempt to expand taskqs that have
1709  * TASKQ_THREADS_CPU_PCT set. We need to redo the percentage calculation every
1710  * time, to correctly determine whether or not to add a thread.
1711  */
1712 static int
1713 spl_taskq_expand(unsigned int cpu, struct hlist_node *node)
1714 {
1715 	taskq_t *tq = list_entry(node, taskq_t, tq_hp_cb_node);
1716 	unsigned long flags;
1717 	int err = 0;
1718 
1719 	ASSERT(tq);
1720 	spin_lock_irqsave_nested(&tq->tq_lock, flags, tq->tq_lock_class);
1721 
1722 	if (!(tq->tq_flags & TASKQ_ACTIVE)) {
1723 		spin_unlock_irqrestore(&tq->tq_lock, flags);
1724 		return (err);
1725 	}
1726 
1727 	ASSERT(tq->tq_flags & TASKQ_THREADS_CPU_PCT);
1728 	int nthreads = MIN(tq->tq_cpu_pct, 100);
1729 	nthreads = MAX(((num_online_cpus() + 1) * nthreads) / 100, 1);
1730 	tq->tq_maxthreads = nthreads;
1731 
1732 	if (!((tq->tq_flags & TASKQ_DYNAMIC) && spl_taskq_thread_dynamic) &&
1733 	    tq->tq_maxthreads > tq->tq_nthreads) {
1734 		spin_unlock_irqrestore(&tq->tq_lock, flags);
1735 		taskq_thread_t *tqt = taskq_thread_create(tq);
1736 		if (tqt == NULL)
1737 			err = -1;
1738 		return (err);
1739 	}
1740 	spin_unlock_irqrestore(&tq->tq_lock, flags);
1741 	return (err);
1742 }
1743 
1744 /*
1745  * While we don't support offlining CPUs, it is possible that CPUs will fail
1746  * to online successfully. We do need to be able to handle this case
1747  * gracefully.
1748  */
1749 static int
1750 spl_taskq_prepare_down(unsigned int cpu, struct hlist_node *node)
1751 {
1752 	taskq_t *tq = list_entry(node, taskq_t, tq_hp_cb_node);
1753 	unsigned long flags;
1754 
1755 	ASSERT(tq);
1756 	spin_lock_irqsave_nested(&tq->tq_lock, flags, tq->tq_lock_class);
1757 
1758 	if (!(tq->tq_flags & TASKQ_ACTIVE))
1759 		goto out;
1760 
1761 	ASSERT(tq->tq_flags & TASKQ_THREADS_CPU_PCT);
1762 	int nthreads = MIN(tq->tq_cpu_pct, 100);
1763 	nthreads = MAX(((num_online_cpus()) * nthreads) / 100, 1);
1764 	tq->tq_maxthreads = nthreads;
1765 
1766 	if (!((tq->tq_flags & TASKQ_DYNAMIC) && spl_taskq_thread_dynamic) &&
1767 	    tq->tq_maxthreads < tq->tq_nthreads) {
1768 		ASSERT3U(tq->tq_maxthreads, ==, tq->tq_nthreads - 1);
1769 		taskq_thread_t *tqt = list_entry(tq->tq_thread_list.next,
1770 		    taskq_thread_t, tqt_thread_list);
1771 		struct task_struct *thread = tqt->tqt_thread;
1772 		spin_unlock_irqrestore(&tq->tq_lock, flags);
1773 
1774 		kthread_stop(thread);
1775 
1776 		return (0);
1777 	}
1778 
1779 out:
1780 	spin_unlock_irqrestore(&tq->tq_lock, flags);
1781 	return (0);
1782 }
1783 
1784 int
1785 spl_taskq_init(void)
1786 {
1787 	init_rwsem(&tq_list_sem);
1788 	tsd_create(&taskq_tsd, NULL);
1789 
1790 	spl_taskq_cpuhp_state = cpuhp_setup_state_multi(CPUHP_AP_ONLINE_DYN,
1791 	    "fs/spl_taskq:online", spl_taskq_expand, spl_taskq_prepare_down);
1792 
1793 	system_taskq = taskq_create("spl_system_taskq", MAX(boot_ncpus, 64),
1794 	    maxclsyspri, boot_ncpus, INT_MAX, TASKQ_PREPOPULATE|TASKQ_DYNAMIC);
1795 	if (system_taskq == NULL)
1796 		return (-ENOMEM);
1797 
1798 	system_delay_taskq = taskq_create("spl_delay_taskq", MAX(boot_ncpus, 4),
1799 	    maxclsyspri, boot_ncpus, INT_MAX, TASKQ_PREPOPULATE|TASKQ_DYNAMIC);
1800 	if (system_delay_taskq == NULL) {
1801 		cpuhp_remove_multi_state(spl_taskq_cpuhp_state);
1802 		taskq_destroy(system_taskq);
1803 		return (-ENOMEM);
1804 	}
1805 
1806 	dynamic_taskq = taskq_create("spl_dynamic_taskq", 1,
1807 	    maxclsyspri, boot_ncpus, INT_MAX, TASKQ_PREPOPULATE);
1808 	if (dynamic_taskq == NULL) {
1809 		cpuhp_remove_multi_state(spl_taskq_cpuhp_state);
1810 		taskq_destroy(system_taskq);
1811 		taskq_destroy(system_delay_taskq);
1812 		return (-ENOMEM);
1813 	}
1814 
1815 	/*
1816 	 * This is used to annotate tq_lock, so
1817 	 *   taskq_dispatch -> taskq_thread_spawn -> taskq_dispatch
1818 	 * does not trigger a lockdep warning re: possible recursive locking
1819 	 */
1820 	dynamic_taskq->tq_lock_class = TQ_LOCK_DYNAMIC;
1821 
1822 	spl_taskq_kstat_init();
1823 
1824 	return (0);
1825 }
1826 
1827 void
1828 spl_taskq_fini(void)
1829 {
1830 	spl_taskq_kstat_fini();
1831 
1832 	taskq_destroy(dynamic_taskq);
1833 	dynamic_taskq = NULL;
1834 
1835 	taskq_destroy(system_delay_taskq);
1836 	system_delay_taskq = NULL;
1837 
1838 	taskq_destroy(system_taskq);
1839 	system_taskq = NULL;
1840 
1841 	tsd_destroy(&taskq_tsd);
1842 
1843 	cpuhp_remove_multi_state(spl_taskq_cpuhp_state);
1844 	spl_taskq_cpuhp_state = 0;
1845 }
1846