xref: /freebsd/sys/compat/linuxkpi/common/src/linux_work.c (revision 99282790b7d01ec3c4072621d46a0d7302517ad4)
1 /*-
2  * Copyright (c) 2017-2019 Hans Petter Selasky
3  * All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions
7  * are met:
8  * 1. Redistributions of source code must retain the above copyright
9  *    notice unmodified, this list of conditions, and the following
10  *    disclaimer.
11  * 2. Redistributions in binary form must reproduce the above copyright
12  *    notice, this list of conditions and the following disclaimer in the
13  *    documentation and/or other materials provided with the distribution.
14  *
15  * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
16  * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
17  * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
18  * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
19  * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
20  * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
21  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
22  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
23  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
24  * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
25  */
26 
27 #include <sys/cdefs.h>
28 __FBSDID("$FreeBSD$");
29 
30 #include <linux/workqueue.h>
31 #include <linux/wait.h>
32 #include <linux/compat.h>
33 #include <linux/spinlock.h>
34 #include <linux/rcupdate.h>
35 
36 #include <sys/kernel.h>
37 
/*
 * All states a work structure can be in. The numeric values are used
 * as indices into the state transition tables passed to
 * linux_update_state(); WORK_ST_MAX is the number of states and hence
 * the size of those tables.
 */
enum {
	WORK_ST_IDLE = 0,		/* idle - not started */
	WORK_ST_TIMER = 1,		/* timer is being started */
	WORK_ST_TASK = 2,		/* taskqueue is being queued */
	WORK_ST_EXEC = 3,		/* callback is being called */
	WORK_ST_CANCEL = 4,		/* cancel is being requested */
	WORK_ST_MAX = 5,		/* number of states */
};
49 
/*
 * Workqueues owned by this file. They are created by linux_work_init()
 * and destroyed by linux_work_uninit().
 */
static struct workqueue_struct *linux_system_short_wq;
static struct workqueue_struct *linux_system_long_wq;

/*
 * Global workqueue pointers. These are set up by linux_work_init() to
 * refer to one of the two workqueues above.
 */
struct workqueue_struct *system_wq;
struct workqueue_struct *system_long_wq;
struct workqueue_struct *system_unbound_wq;
struct workqueue_struct *system_highpri_wq;
struct workqueue_struct *system_power_efficient_wq;

/*
 * Number of worker threads used when a workqueue is created with a
 * CPU count of zero. Updated by linux_work_init().
 */
static int linux_default_wq_cpus = 4;

/* Callout handler for delayed work, defined further down. */
static void linux_delayed_work_timer_fn(void *arg);
65 
66 /*
67  * This function atomically updates the work state and returns the
68  * previous state at the time of update.
69  */
70 static uint8_t
71 linux_update_state(atomic_t *v, const uint8_t *pstate)
72 {
73 	int c, old;
74 
75 	c = v->counter;
76 
77 	while ((old = atomic_cmpxchg(v, c, pstate[c])) != c)
78 		c = old;
79 
80 	return (c);
81 }
82 
83 /*
84  * A LinuxKPI task is allowed to free itself inside the callback function
85  * and cannot safely be referred after the callback function has
86  * completed. This function gives the linux_work_fn() function a hint,
87  * that the task is not going away and can have its state checked
88  * again. Without this extra hint LinuxKPI tasks cannot be serialized
89  * accross multiple worker threads.
90  */
91 static bool
92 linux_work_exec_unblock(struct work_struct *work)
93 {
94 	struct workqueue_struct *wq;
95 	struct work_exec *exec;
96 	bool retval = false;
97 
98 	wq = work->work_queue;
99 	if (unlikely(wq == NULL))
100 		goto done;
101 
102 	WQ_EXEC_LOCK(wq);
103 	TAILQ_FOREACH(exec, &wq->exec_head, entry) {
104 		if (exec->target == work) {
105 			exec->target = NULL;
106 			retval = true;
107 			break;
108 		}
109 	}
110 	WQ_EXEC_UNLOCK(wq);
111 done:
112 	return (retval);
113 }
114 
115 static void
116 linux_delayed_work_enqueue(struct delayed_work *dwork)
117 {
118 	struct taskqueue *tq;
119 
120 	tq = dwork->work.work_queue->taskqueue;
121 	taskqueue_enqueue(tq, &dwork->work.work_task);
122 }
123 
124 /*
125  * This function queues the given work structure on the given
126  * workqueue. It returns non-zero if the work was successfully
127  * [re-]queued. Else the work is already pending for completion.
128  */
129 bool
130 linux_queue_work_on(int cpu __unused, struct workqueue_struct *wq,
131     struct work_struct *work)
132 {
133 	static const uint8_t states[WORK_ST_MAX] __aligned(8) = {
134 		[WORK_ST_IDLE] = WORK_ST_TASK,		/* start queuing task */
135 		[WORK_ST_TIMER] = WORK_ST_TIMER,	/* NOP */
136 		[WORK_ST_TASK] = WORK_ST_TASK,		/* NOP */
137 		[WORK_ST_EXEC] = WORK_ST_TASK,		/* queue task another time */
138 		[WORK_ST_CANCEL] = WORK_ST_TASK,	/* start queuing task again */
139 	};
140 
141 	if (atomic_read(&wq->draining) != 0)
142 		return (!work_pending(work));
143 
144 	switch (linux_update_state(&work->state, states)) {
145 	case WORK_ST_EXEC:
146 	case WORK_ST_CANCEL:
147 		if (linux_work_exec_unblock(work) != 0)
148 			return (true);
149 		/* FALLTHROUGH */
150 	case WORK_ST_IDLE:
151 		work->work_queue = wq;
152 		taskqueue_enqueue(wq->taskqueue, &work->work_task);
153 		return (true);
154 	default:
155 		return (false);		/* already on a queue */
156 	}
157 }
158 
159 /*
160  * Callback func for linux_queue_rcu_work
161  */
162 static void
163 rcu_work_func(struct rcu_head *rcu)
164 {
165 	struct rcu_work *rwork;
166 
167 	rwork = container_of(rcu, struct rcu_work, rcu);
168 	linux_queue_work_on(WORK_CPU_UNBOUND, rwork->wq, &rwork->work);
169 }
170 
171 /*
172  * This function queue a work after a grace period
173  * If the work was already pending it returns false,
174  * if not it calls call_rcu and returns true.
175  */
176 bool
177 linux_queue_rcu_work(struct workqueue_struct *wq, struct rcu_work *rwork)
178 {
179 
180 	if (!linux_work_pending(&rwork->work)) {
181 		rwork->wq = wq;
182 		linux_call_rcu(RCU_TYPE_REGULAR, &rwork->rcu, rcu_work_func);
183 		return (true);
184 	}
185 	return (false);
186 }
187 
188 /*
189  * This function waits for the last execution of a work and then
190  * flush the work.
191  * It returns true if the work was pending and we waited, it returns
192  * false otherwise.
193  */
194 bool
195 linux_flush_rcu_work(struct rcu_work *rwork)
196 {
197 
198 	if (linux_work_pending(&rwork->work)) {
199 		linux_rcu_barrier(RCU_TYPE_REGULAR);
200 		linux_flush_work(&rwork->work);
201 		return (true);
202 	}
203 	return (linux_flush_work(&rwork->work));
204 }
205 
206 /*
207  * This function queues the given work structure on the given
208  * workqueue after a given delay in ticks. It returns non-zero if the
209  * work was successfully [re-]queued. Else the work is already pending
210  * for completion.
211  */
212 bool
213 linux_queue_delayed_work_on(int cpu, struct workqueue_struct *wq,
214     struct delayed_work *dwork, unsigned delay)
215 {
216 	static const uint8_t states[WORK_ST_MAX] __aligned(8) = {
217 		[WORK_ST_IDLE] = WORK_ST_TIMER,		/* start timeout */
218 		[WORK_ST_TIMER] = WORK_ST_TIMER,	/* NOP */
219 		[WORK_ST_TASK] = WORK_ST_TASK,		/* NOP */
220 		[WORK_ST_EXEC] = WORK_ST_TIMER,		/* start timeout */
221 		[WORK_ST_CANCEL] = WORK_ST_TIMER,	/* start timeout */
222 	};
223 
224 	if (atomic_read(&wq->draining) != 0)
225 		return (!work_pending(&dwork->work));
226 
227 	switch (linux_update_state(&dwork->work.state, states)) {
228 	case WORK_ST_EXEC:
229 	case WORK_ST_CANCEL:
230 		if (delay == 0 && linux_work_exec_unblock(&dwork->work) != 0) {
231 			dwork->timer.expires = jiffies;
232 			return (true);
233 		}
234 		/* FALLTHROUGH */
235 	case WORK_ST_IDLE:
236 		dwork->work.work_queue = wq;
237 		dwork->timer.expires = jiffies + delay;
238 
239 		if (delay == 0) {
240 			linux_delayed_work_enqueue(dwork);
241 		} else if (unlikely(cpu != WORK_CPU_UNBOUND)) {
242 			mtx_lock(&dwork->timer.mtx);
243 			callout_reset_on(&dwork->timer.callout, delay,
244 			    &linux_delayed_work_timer_fn, dwork, cpu);
245 			mtx_unlock(&dwork->timer.mtx);
246 		} else {
247 			mtx_lock(&dwork->timer.mtx);
248 			callout_reset(&dwork->timer.callout, delay,
249 			    &linux_delayed_work_timer_fn, dwork);
250 			mtx_unlock(&dwork->timer.mtx);
251 		}
252 		return (true);
253 	default:
254 		return (false);		/* already on a queue */
255 	}
256 }
257 
258 void
259 linux_work_fn(void *context, int pending)
260 {
261 	static const uint8_t states[WORK_ST_MAX] __aligned(8) = {
262 		[WORK_ST_IDLE] = WORK_ST_IDLE,		/* NOP */
263 		[WORK_ST_TIMER] = WORK_ST_EXEC,		/* delayed work w/o timeout */
264 		[WORK_ST_TASK] = WORK_ST_EXEC,		/* call callback */
265 		[WORK_ST_EXEC] = WORK_ST_IDLE,		/* complete callback */
266 		[WORK_ST_CANCEL] = WORK_ST_EXEC,	/* failed to cancel */
267 	};
268 	struct work_struct *work;
269 	struct workqueue_struct *wq;
270 	struct work_exec exec;
271 	struct task_struct *task;
272 
273 	task = current;
274 
275 	/* setup local variables */
276 	work = context;
277 	wq = work->work_queue;
278 
279 	/* store target pointer */
280 	exec.target = work;
281 
282 	/* insert executor into list */
283 	WQ_EXEC_LOCK(wq);
284 	TAILQ_INSERT_TAIL(&wq->exec_head, &exec, entry);
285 	while (1) {
286 		switch (linux_update_state(&work->state, states)) {
287 		case WORK_ST_TIMER:
288 		case WORK_ST_TASK:
289 		case WORK_ST_CANCEL:
290 			WQ_EXEC_UNLOCK(wq);
291 
292 			/* set current work structure */
293 			task->work = work;
294 
295 			/* call work function */
296 			work->func(work);
297 
298 			/* set current work structure */
299 			task->work = NULL;
300 
301 			WQ_EXEC_LOCK(wq);
302 			/* check if unblocked */
303 			if (exec.target != work) {
304 				/* reapply block */
305 				exec.target = work;
306 				break;
307 			}
308 			/* FALLTHROUGH */
309 		default:
310 			goto done;
311 		}
312 	}
313 done:
314 	/* remove executor from list */
315 	TAILQ_REMOVE(&wq->exec_head, &exec, entry);
316 	WQ_EXEC_UNLOCK(wq);
317 }
318 
319 void
320 linux_delayed_work_fn(void *context, int pending)
321 {
322 	struct delayed_work *dwork = context;
323 
324 	/*
325 	 * Make sure the timer belonging to the delayed work gets
326 	 * drained before invoking the work function. Else the timer
327 	 * mutex may still be in use which can lead to use-after-free
328 	 * situations, because the work function might free the work
329 	 * structure before returning.
330 	 */
331 	callout_drain(&dwork->timer.callout);
332 
333 	linux_work_fn(&dwork->work, pending);
334 }
335 
336 static void
337 linux_delayed_work_timer_fn(void *arg)
338 {
339 	static const uint8_t states[WORK_ST_MAX] __aligned(8) = {
340 		[WORK_ST_IDLE] = WORK_ST_IDLE,		/* NOP */
341 		[WORK_ST_TIMER] = WORK_ST_TASK,		/* start queueing task */
342 		[WORK_ST_TASK] = WORK_ST_TASK,		/* NOP */
343 		[WORK_ST_EXEC] = WORK_ST_EXEC,		/* NOP */
344 		[WORK_ST_CANCEL] = WORK_ST_TASK,	/* failed to cancel */
345 	};
346 	struct delayed_work *dwork = arg;
347 
348 	switch (linux_update_state(&dwork->work.state, states)) {
349 	case WORK_ST_TIMER:
350 	case WORK_ST_CANCEL:
351 		linux_delayed_work_enqueue(dwork);
352 		break;
353 	default:
354 		break;
355 	}
356 }
357 
358 /*
359  * This function cancels the given work structure in a synchronous
360  * fashion. It returns non-zero if the work was successfully
361  * cancelled. Else the work was already cancelled.
362  */
363 bool
364 linux_cancel_work_sync(struct work_struct *work)
365 {
366 	static const uint8_t states[WORK_ST_MAX] __aligned(8) = {
367 		[WORK_ST_IDLE] = WORK_ST_IDLE,		/* NOP */
368 		[WORK_ST_TIMER] = WORK_ST_TIMER,	/* can't happen */
369 		[WORK_ST_TASK] = WORK_ST_IDLE,		/* cancel and drain */
370 		[WORK_ST_EXEC] = WORK_ST_IDLE,		/* too late, drain */
371 		[WORK_ST_CANCEL] = WORK_ST_IDLE,	/* cancel and drain */
372 	};
373 	struct taskqueue *tq;
374 	bool retval = false;
375 
376 	WITNESS_WARN(WARN_GIANTOK | WARN_SLEEPOK, NULL,
377 	    "linux_cancel_work_sync() might sleep");
378 retry:
379 	switch (linux_update_state(&work->state, states)) {
380 	case WORK_ST_IDLE:
381 	case WORK_ST_TIMER:
382 		return (retval);
383 	case WORK_ST_EXEC:
384 		tq = work->work_queue->taskqueue;
385 		if (taskqueue_cancel(tq, &work->work_task, NULL) != 0)
386 			taskqueue_drain(tq, &work->work_task);
387 		goto retry;	/* work may have restarted itself */
388 	default:
389 		tq = work->work_queue->taskqueue;
390 		if (taskqueue_cancel(tq, &work->work_task, NULL) != 0)
391 			taskqueue_drain(tq, &work->work_task);
392 		retval = true;
393 		goto retry;
394 	}
395 }
396 
397 /*
398  * This function atomically stops the timer and callback. The timer
399  * callback will not be called after this function returns. This
400  * functions returns true when the timeout was cancelled. Else the
401  * timeout was not started or has already been called.
402  */
403 static inline bool
404 linux_cancel_timer(struct delayed_work *dwork, bool drain)
405 {
406 	bool cancelled;
407 
408 	mtx_lock(&dwork->timer.mtx);
409 	cancelled = (callout_stop(&dwork->timer.callout) == 1);
410 	mtx_unlock(&dwork->timer.mtx);
411 
412 	/* check if we should drain */
413 	if (drain)
414 		callout_drain(&dwork->timer.callout);
415 	return (cancelled);
416 }
417 
418 /*
419  * This function cancels the given delayed work structure in a
420  * non-blocking fashion. It returns non-zero if the work was
421  * successfully cancelled. Else the work may still be busy or already
422  * cancelled.
423  */
424 bool
425 linux_cancel_delayed_work(struct delayed_work *dwork)
426 {
427 	static const uint8_t states[WORK_ST_MAX] __aligned(8) = {
428 		[WORK_ST_IDLE] = WORK_ST_IDLE,		/* NOP */
429 		[WORK_ST_TIMER] = WORK_ST_CANCEL,	/* try to cancel */
430 		[WORK_ST_TASK] = WORK_ST_CANCEL,	/* try to cancel */
431 		[WORK_ST_EXEC] = WORK_ST_EXEC,		/* NOP */
432 		[WORK_ST_CANCEL] = WORK_ST_CANCEL,	/* NOP */
433 	};
434 	struct taskqueue *tq;
435 
436 	switch (linux_update_state(&dwork->work.state, states)) {
437 	case WORK_ST_TIMER:
438 	case WORK_ST_CANCEL:
439 		if (linux_cancel_timer(dwork, 0)) {
440 			atomic_cmpxchg(&dwork->work.state,
441 			    WORK_ST_CANCEL, WORK_ST_IDLE);
442 			return (true);
443 		}
444 		/* FALLTHROUGH */
445 	case WORK_ST_TASK:
446 		tq = dwork->work.work_queue->taskqueue;
447 		if (taskqueue_cancel(tq, &dwork->work.work_task, NULL) == 0) {
448 			atomic_cmpxchg(&dwork->work.state,
449 			    WORK_ST_CANCEL, WORK_ST_IDLE);
450 			return (true);
451 		}
452 		/* FALLTHROUGH */
453 	default:
454 		return (false);
455 	}
456 }
457 
458 /*
459  * This function cancels the given work structure in a synchronous
460  * fashion. It returns non-zero if the work was successfully
461  * cancelled. Else the work was already cancelled.
462  */
463 bool
464 linux_cancel_delayed_work_sync(struct delayed_work *dwork)
465 {
466 	static const uint8_t states[WORK_ST_MAX] __aligned(8) = {
467 		[WORK_ST_IDLE] = WORK_ST_IDLE,		/* NOP */
468 		[WORK_ST_TIMER] = WORK_ST_IDLE,		/* cancel and drain */
469 		[WORK_ST_TASK] = WORK_ST_IDLE,		/* cancel and drain */
470 		[WORK_ST_EXEC] = WORK_ST_IDLE,		/* too late, drain */
471 		[WORK_ST_CANCEL] = WORK_ST_IDLE,	/* cancel and drain */
472 	};
473 	struct taskqueue *tq;
474 	bool retval = false;
475 
476 	WITNESS_WARN(WARN_GIANTOK | WARN_SLEEPOK, NULL,
477 	    "linux_cancel_delayed_work_sync() might sleep");
478 retry:
479 	switch (linux_update_state(&dwork->work.state, states)) {
480 	case WORK_ST_IDLE:
481 		return (retval);
482 	case WORK_ST_EXEC:
483 		tq = dwork->work.work_queue->taskqueue;
484 		if (taskqueue_cancel(tq, &dwork->work.work_task, NULL) != 0)
485 			taskqueue_drain(tq, &dwork->work.work_task);
486 		goto retry;	/* work may have restarted itself */
487 	case WORK_ST_TIMER:
488 	case WORK_ST_CANCEL:
489 		if (linux_cancel_timer(dwork, 1)) {
490 			/*
491 			 * Make sure taskqueue is also drained before
492 			 * returning:
493 			 */
494 			tq = dwork->work.work_queue->taskqueue;
495 			taskqueue_drain(tq, &dwork->work.work_task);
496 			retval = true;
497 			goto retry;
498 		}
499 		/* FALLTHROUGH */
500 	default:
501 		tq = dwork->work.work_queue->taskqueue;
502 		if (taskqueue_cancel(tq, &dwork->work.work_task, NULL) != 0)
503 			taskqueue_drain(tq, &dwork->work.work_task);
504 		retval = true;
505 		goto retry;
506 	}
507 }
508 
509 /*
510  * This function waits until the given work structure is completed.
511  * It returns non-zero if the work was successfully
512  * waited for. Else the work was not waited for.
513  */
514 bool
515 linux_flush_work(struct work_struct *work)
516 {
517 	struct taskqueue *tq;
518 	bool retval;
519 
520 	WITNESS_WARN(WARN_GIANTOK | WARN_SLEEPOK, NULL,
521 	    "linux_flush_work() might sleep");
522 
523 	switch (atomic_read(&work->state)) {
524 	case WORK_ST_IDLE:
525 		return (false);
526 	default:
527 		tq = work->work_queue->taskqueue;
528 		retval = taskqueue_poll_is_busy(tq, &work->work_task);
529 		taskqueue_drain(tq, &work->work_task);
530 		return (retval);
531 	}
532 }
533 
534 /*
535  * This function waits until the given delayed work structure is
536  * completed. It returns non-zero if the work was successfully waited
537  * for. Else the work was not waited for.
538  */
539 bool
540 linux_flush_delayed_work(struct delayed_work *dwork)
541 {
542 	struct taskqueue *tq;
543 	bool retval;
544 
545 	WITNESS_WARN(WARN_GIANTOK | WARN_SLEEPOK, NULL,
546 	    "linux_flush_delayed_work() might sleep");
547 
548 	switch (atomic_read(&dwork->work.state)) {
549 	case WORK_ST_IDLE:
550 		return (false);
551 	case WORK_ST_TIMER:
552 		if (linux_cancel_timer(dwork, 1))
553 			linux_delayed_work_enqueue(dwork);
554 		/* FALLTHROUGH */
555 	default:
556 		tq = dwork->work.work_queue->taskqueue;
557 		retval = taskqueue_poll_is_busy(tq, &dwork->work.work_task);
558 		taskqueue_drain(tq, &dwork->work.work_task);
559 		return (retval);
560 	}
561 }
562 
563 /*
564  * This function returns true if the given work is pending, and not
565  * yet executing:
566  */
567 bool
568 linux_work_pending(struct work_struct *work)
569 {
570 	switch (atomic_read(&work->state)) {
571 	case WORK_ST_TIMER:
572 	case WORK_ST_TASK:
573 	case WORK_ST_CANCEL:
574 		return (true);
575 	default:
576 		return (false);
577 	}
578 }
579 
580 /*
581  * This function returns true if the given work is busy.
582  */
583 bool
584 linux_work_busy(struct work_struct *work)
585 {
586 	struct taskqueue *tq;
587 
588 	switch (atomic_read(&work->state)) {
589 	case WORK_ST_IDLE:
590 		return (false);
591 	case WORK_ST_EXEC:
592 		tq = work->work_queue->taskqueue;
593 		return (taskqueue_poll_is_busy(tq, &work->work_task));
594 	default:
595 		return (true);
596 	}
597 }
598 
599 struct workqueue_struct *
600 linux_create_workqueue_common(const char *name, int cpus)
601 {
602 	struct workqueue_struct *wq;
603 
604 	/*
605 	 * If zero CPUs are specified use the default number of CPUs:
606 	 */
607 	if (cpus == 0)
608 		cpus = linux_default_wq_cpus;
609 
610 	wq = kmalloc(sizeof(*wq), M_WAITOK | M_ZERO);
611 	wq->taskqueue = taskqueue_create(name, M_WAITOK,
612 	    taskqueue_thread_enqueue, &wq->taskqueue);
613 	atomic_set(&wq->draining, 0);
614 	taskqueue_start_threads(&wq->taskqueue, cpus, PWAIT, "%s", name);
615 	TAILQ_INIT(&wq->exec_head);
616 	mtx_init(&wq->exec_mtx, "linux_wq_exec", NULL, MTX_DEF);
617 
618 	return (wq);
619 }
620 
/*
 * Tear down the given workqueue and release its memory. The
 * workqueue must not be used after this function returns.
 */
void
linux_destroy_workqueue(struct workqueue_struct *wq)
{
	/*
	 * Mark the workqueue as draining. The queueing functions check
	 * this flag and do not queue anything while it is non-zero.
	 */
	atomic_inc(&wq->draining);
	/* NOTE(review): presumably waits for already queued work - confirm */
	drain_workqueue(wq);
	taskqueue_free(wq->taskqueue);
	mtx_destroy(&wq->exec_mtx);
	kfree(wq);
}
630 
631 void
632 linux_init_delayed_work(struct delayed_work *dwork, work_func_t func)
633 {
634 	memset(dwork, 0, sizeof(*dwork));
635 	dwork->work.func = func;
636 	TASK_INIT(&dwork->work.work_task, 0, linux_delayed_work_fn, dwork);
637 	mtx_init(&dwork->timer.mtx, spin_lock_name("lkpi-dwork"), NULL,
638 	    MTX_DEF | MTX_NOWITNESS);
639 	callout_init_mtx(&dwork->timer.callout, &dwork->timer.mtx, 0);
640 }
641 
642 struct work_struct *
643 linux_current_work(void)
644 {
645 	return (current->work);
646 }
647 
648 static void
649 linux_work_init(void *arg)
650 {
651 	int max_wq_cpus = mp_ncpus + 1;
652 
653 	/* avoid deadlock when there are too few threads */
654 	if (max_wq_cpus < 4)
655 		max_wq_cpus = 4;
656 
657 	/* set default number of CPUs */
658 	linux_default_wq_cpus = max_wq_cpus;
659 
660 	linux_system_short_wq = alloc_workqueue("linuxkpi_short_wq", 0, max_wq_cpus);
661 	linux_system_long_wq = alloc_workqueue("linuxkpi_long_wq", 0, max_wq_cpus);
662 
663 	/* populate the workqueue pointers */
664 	system_long_wq = linux_system_long_wq;
665 	system_wq = linux_system_short_wq;
666 	system_power_efficient_wq = linux_system_short_wq;
667 	system_unbound_wq = linux_system_short_wq;
668 	system_highpri_wq = linux_system_short_wq;
669 }
670 SYSINIT(linux_work_init, SI_SUB_TASKQ, SI_ORDER_THIRD, linux_work_init, NULL);
671 
672 static void
673 linux_work_uninit(void *arg)
674 {
675 	destroy_workqueue(linux_system_short_wq);
676 	destroy_workqueue(linux_system_long_wq);
677 
678 	/* clear workqueue pointers */
679 	system_long_wq = NULL;
680 	system_wq = NULL;
681 	system_power_efficient_wq = NULL;
682 	system_unbound_wq = NULL;
683 	system_highpri_wq = NULL;
684 }
685 SYSUNINIT(linux_work_uninit, SI_SUB_TASKQ, SI_ORDER_THIRD, linux_work_uninit, NULL);
686