xref: /illumos-gate/usr/src/lib/libzfs/common/libzfs_taskq.c (revision 5328fc53d11d7151861fa272e4fb0248b8f0e145)
1 /*
2  * CDDL HEADER START
3  *
4  * The contents of this file are subject to the terms of the
5  * Common Development and Distribution License (the "License").
6  * You may not use this file except in compliance with the License.
7  *
8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9  * or http://www.opensolaris.org/os/licensing.
10  * See the License for the specific language governing permissions
11  * and limitations under the License.
12  *
13  * When distributing Covered Code, include this CDDL HEADER in each
14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15  * If applicable, add the following below this CDDL HEADER, with the
16  * fields enclosed by brackets "[]" replaced with your own identifying
17  * information: Portions Copyright [yyyy] [name of copyright owner]
18  *
19  * CDDL HEADER END
20  */
21 /*
22  * Copyright 2010 Sun Microsystems, Inc.  All rights reserved.
23  * Use is subject to license terms.
24  */
25 /*
26  * Copyright 2011 Nexenta Systems, Inc.  All rights reserved.
27  * Copyright 2012 Garrett D'Amore <garrett@damore.org>.  All rights reserved.
28  * Copyright (c) 2014, 2018 by Delphix. All rights reserved.
29  */
30 
31 #include <thread.h>
32 #include <synch.h>
33 #include <unistd.h>
34 #include <string.h>
35 #include <errno.h>
36 #include <sys/debug.h>
37 #include <sys/sysmacros.h>
38 
39 #include "libzfs_taskq.h"
40 
41 #define	ZFS_TASKQ_ACTIVE	0x00010000
42 #define	ZFS_TASKQ_NAMELEN	31
43 
44 typedef struct zfs_taskq_ent {
45 	struct zfs_taskq_ent	*ztqent_next;
46 	struct zfs_taskq_ent	*ztqent_prev;
47 	ztask_func_t		*ztqent_func;
48 	void			*ztqent_arg;
49 	uintptr_t		ztqent_flags;
50 } zfs_taskq_ent_t;
51 
52 struct zfs_taskq {
53 	char		ztq_name[ZFS_TASKQ_NAMELEN + 1];
54 	mutex_t		ztq_lock;
55 	rwlock_t	ztq_threadlock;
56 	cond_t		ztq_dispatch_cv;
57 	cond_t		ztq_wait_cv;
58 	thread_t	*ztq_threadlist;
59 	int		ztq_flags;
60 	int		ztq_active;
61 	int		ztq_nthreads;
62 	int		ztq_nalloc;
63 	int		ztq_minalloc;
64 	int		ztq_maxalloc;
65 	cond_t		ztq_maxalloc_cv;
66 	int		ztq_maxalloc_wait;
67 	zfs_taskq_ent_t	*ztq_freelist;
68 	zfs_taskq_ent_t	ztq_task;
69 };
70 
71 static zfs_taskq_ent_t *
72 ztask_alloc(zfs_taskq_t *ztq, int ztqflags)
73 {
74 	zfs_taskq_ent_t *t;
75 	timestruc_t ts;
76 	int err;
77 
78 again:	if ((t = ztq->ztq_freelist) != NULL &&
79 	    ztq->ztq_nalloc >= ztq->ztq_minalloc) {
80 		ztq->ztq_freelist = t->ztqent_next;
81 	} else {
82 		if (ztq->ztq_nalloc >= ztq->ztq_maxalloc) {
83 			if (!(ztqflags & UMEM_NOFAIL))
84 				return (NULL);
85 
86 			/*
87 			 * We don't want to exceed ztq_maxalloc, but we can't
88 			 * wait for other tasks to complete (and thus free up
89 			 * task structures) without risking deadlock with
90 			 * the caller.  So, we just delay for one second
91 			 * to throttle the allocation rate. If we have tasks
92 			 * complete before one second timeout expires then
93 			 * zfs_taskq_ent_free will signal us and we will
94 			 * immediately retry the allocation.
95 			 */
96 			ztq->ztq_maxalloc_wait++;
97 
98 			ts.tv_sec = 1;
99 			ts.tv_nsec = 0;
100 			err = cond_reltimedwait(&ztq->ztq_maxalloc_cv,
101 			    &ztq->ztq_lock, &ts);
102 
103 			ztq->ztq_maxalloc_wait--;
104 			if (err == 0)
105 				goto again;		/* signaled */
106 		}
107 		mutex_exit(&ztq->ztq_lock);
108 
109 		t = umem_alloc(sizeof (zfs_taskq_ent_t), ztqflags);
110 
111 		mutex_enter(&ztq->ztq_lock);
112 		if (t != NULL)
113 			ztq->ztq_nalloc++;
114 	}
115 	return (t);
116 }
117 
118 static void
119 ztask_free(zfs_taskq_t *ztq, zfs_taskq_ent_t *t)
120 {
121 	if (ztq->ztq_nalloc <= ztq->ztq_minalloc) {
122 		t->ztqent_next = ztq->ztq_freelist;
123 		ztq->ztq_freelist = t;
124 	} else {
125 		ztq->ztq_nalloc--;
126 		mutex_exit(&ztq->ztq_lock);
127 		umem_free(t, sizeof (zfs_taskq_ent_t));
128 		mutex_enter(&ztq->ztq_lock);
129 	}
130 
131 	if (ztq->ztq_maxalloc_wait)
132 		VERIFY0(cond_signal(&ztq->ztq_maxalloc_cv));
133 }
134 
135 zfs_taskqid_t
136 zfs_taskq_dispatch(zfs_taskq_t *ztq, ztask_func_t func, void *arg,
137     uint_t ztqflags)
138 {
139 	zfs_taskq_ent_t *t;
140 
141 	mutex_enter(&ztq->ztq_lock);
142 	ASSERT(ztq->ztq_flags & ZFS_TASKQ_ACTIVE);
143 	if ((t = ztask_alloc(ztq, ztqflags)) == NULL) {
144 		mutex_exit(&ztq->ztq_lock);
145 		return (0);
146 	}
147 	if (ztqflags & ZFS_TQ_FRONT) {
148 		t->ztqent_next = ztq->ztq_task.ztqent_next;
149 		t->ztqent_prev = &ztq->ztq_task;
150 	} else {
151 		t->ztqent_next = &ztq->ztq_task;
152 		t->ztqent_prev = ztq->ztq_task.ztqent_prev;
153 	}
154 	t->ztqent_next->ztqent_prev = t;
155 	t->ztqent_prev->ztqent_next = t;
156 	t->ztqent_func = func;
157 	t->ztqent_arg = arg;
158 	t->ztqent_flags = 0;
159 	VERIFY0(cond_signal(&ztq->ztq_dispatch_cv));
160 	mutex_exit(&ztq->ztq_lock);
161 	return (1);
162 }
163 
164 void
165 zfs_taskq_wait(zfs_taskq_t *ztq)
166 {
167 	mutex_enter(&ztq->ztq_lock);
168 	while (ztq->ztq_task.ztqent_next != &ztq->ztq_task ||
169 	    ztq->ztq_active != 0) {
170 		int ret = cond_wait(&ztq->ztq_wait_cv, &ztq->ztq_lock);
171 		VERIFY(ret == 0 || ret == EINTR);
172 	}
173 	mutex_exit(&ztq->ztq_lock);
174 }
175 
176 static void *
177 zfs_taskq_thread(void *arg)
178 {
179 	zfs_taskq_t *ztq = arg;
180 	zfs_taskq_ent_t *t;
181 	boolean_t prealloc;
182 
183 	mutex_enter(&ztq->ztq_lock);
184 	while (ztq->ztq_flags & ZFS_TASKQ_ACTIVE) {
185 		if ((t = ztq->ztq_task.ztqent_next) == &ztq->ztq_task) {
186 			int ret;
187 			if (--ztq->ztq_active == 0)
188 				VERIFY0(cond_broadcast(&ztq->ztq_wait_cv));
189 			ret = cond_wait(&ztq->ztq_dispatch_cv, &ztq->ztq_lock);
190 			VERIFY(ret == 0 || ret == EINTR);
191 			ztq->ztq_active++;
192 			continue;
193 		}
194 		t->ztqent_prev->ztqent_next = t->ztqent_next;
195 		t->ztqent_next->ztqent_prev = t->ztqent_prev;
196 		t->ztqent_next = NULL;
197 		t->ztqent_prev = NULL;
198 		prealloc = t->ztqent_flags & ZFS_TQENT_FLAG_PREALLOC;
199 		mutex_exit(&ztq->ztq_lock);
200 
201 		VERIFY0(rw_rdlock(&ztq->ztq_threadlock));
202 		t->ztqent_func(t->ztqent_arg);
203 		VERIFY0(rw_unlock(&ztq->ztq_threadlock));
204 
205 		mutex_enter(&ztq->ztq_lock);
206 		if (!prealloc)
207 			ztask_free(ztq, t);
208 	}
209 	ztq->ztq_nthreads--;
210 	VERIFY0(cond_broadcast(&ztq->ztq_wait_cv));
211 	mutex_exit(&ztq->ztq_lock);
212 	return (NULL);
213 }
214 
215 /*ARGSUSED*/
216 zfs_taskq_t *
217 zfs_taskq_create(const char *name, int nthreads, pri_t pri, int minalloc,
218     int maxalloc, uint_t flags)
219 {
220 	zfs_taskq_t *ztq = umem_zalloc(sizeof (zfs_taskq_t), UMEM_NOFAIL);
221 	int t;
222 
223 	ASSERT3S(nthreads, >=, 1);
224 
225 	VERIFY0(rwlock_init(&ztq->ztq_threadlock, USYNC_THREAD, NULL));
226 	VERIFY0(cond_init(&ztq->ztq_dispatch_cv, USYNC_THREAD, NULL));
227 	VERIFY0(cond_init(&ztq->ztq_wait_cv, USYNC_THREAD, NULL));
228 	VERIFY0(cond_init(&ztq->ztq_maxalloc_cv, USYNC_THREAD, NULL));
229 	VERIFY0(mutex_init(
230 	    &ztq->ztq_lock, LOCK_NORMAL | LOCK_ERRORCHECK, NULL));
231 
232 	(void) strncpy(ztq->ztq_name, name, ZFS_TASKQ_NAMELEN + 1);
233 
234 	ztq->ztq_flags = flags | ZFS_TASKQ_ACTIVE;
235 	ztq->ztq_active = nthreads;
236 	ztq->ztq_nthreads = nthreads;
237 	ztq->ztq_minalloc = minalloc;
238 	ztq->ztq_maxalloc = maxalloc;
239 	ztq->ztq_task.ztqent_next = &ztq->ztq_task;
240 	ztq->ztq_task.ztqent_prev = &ztq->ztq_task;
241 	ztq->ztq_threadlist =
242 	    umem_alloc(nthreads * sizeof (thread_t), UMEM_NOFAIL);
243 
244 	if (flags & ZFS_TASKQ_PREPOPULATE) {
245 		mutex_enter(&ztq->ztq_lock);
246 		while (minalloc-- > 0)
247 			ztask_free(ztq, ztask_alloc(ztq, UMEM_NOFAIL));
248 		mutex_exit(&ztq->ztq_lock);
249 	}
250 
251 	for (t = 0; t < nthreads; t++) {
252 		(void) thr_create(0, 0, zfs_taskq_thread,
253 		    ztq, THR_BOUND, &ztq->ztq_threadlist[t]);
254 	}
255 
256 	return (ztq);
257 }
258 
259 void
260 zfs_taskq_destroy(zfs_taskq_t *ztq)
261 {
262 	int t;
263 	int nthreads = ztq->ztq_nthreads;
264 
265 	zfs_taskq_wait(ztq);
266 
267 	mutex_enter(&ztq->ztq_lock);
268 
269 	ztq->ztq_flags &= ~ZFS_TASKQ_ACTIVE;
270 	VERIFY0(cond_broadcast(&ztq->ztq_dispatch_cv));
271 
272 	while (ztq->ztq_nthreads != 0) {
273 		int ret = cond_wait(&ztq->ztq_wait_cv, &ztq->ztq_lock);
274 		VERIFY(ret == 0 || ret == EINTR);
275 	}
276 
277 	ztq->ztq_minalloc = 0;
278 	while (ztq->ztq_nalloc != 0) {
279 		ASSERT(ztq->ztq_freelist != NULL);
280 		ztask_free(ztq, ztask_alloc(ztq, UMEM_NOFAIL));
281 	}
282 
283 	mutex_exit(&ztq->ztq_lock);
284 
285 	for (t = 0; t < nthreads; t++)
286 		(void) thr_join(ztq->ztq_threadlist[t], NULL, NULL);
287 
288 	umem_free(ztq->ztq_threadlist, nthreads * sizeof (thread_t));
289 
290 	VERIFY0(rwlock_destroy(&ztq->ztq_threadlock));
291 	VERIFY0(cond_destroy(&ztq->ztq_dispatch_cv));
292 	VERIFY0(cond_destroy(&ztq->ztq_wait_cv));
293 	VERIFY0(cond_destroy(&ztq->ztq_maxalloc_cv));
294 	VERIFY0(mutex_destroy(&ztq->ztq_lock));
295 
296 	umem_free(ztq, sizeof (zfs_taskq_t));
297 }
298