/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License (the "License").
 * You may not use this file except in compliance with the License.
 *
 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
 * or http://www.opensolaris.org/os/licensing.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
 * If applicable, add the following below this CDDL HEADER, with the
 * fields enclosed by brackets "[]" replaced with your own identifying
 * information: Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 */
/*
 * Copyright 2010 Sun Microsystems, Inc. All rights reserved.
 * Use is subject to license terms.
 */
/*
 * Copyright 2011 Nexenta Systems, Inc. All rights reserved.
 * Copyright 2012 Garrett D'Amore <garrett@damore.org>. All rights reserved.
 * Copyright (c) 2014, 2018 by Delphix. All rights reserved.
 */

#include <thread.h>
#include <synch.h>
#include <unistd.h>
#include <string.h>
#include <errno.h>
#include <umem.h>
#include <sys/debug.h>
#include <sys/sysmacros.h>

#include "libzfs_taskq.h"

#define	ZFS_TASKQ_ACTIVE	0x00010000
#define	ZFS_TASKQ_NAMELEN	31

typedef struct zfs_taskq_ent {
	struct zfs_taskq_ent	*ztqent_next;
	struct zfs_taskq_ent	*ztqent_prev;
	ztask_func_t		*ztqent_func;
	void			*ztqent_arg;
	uintptr_t		ztqent_flags;
} zfs_taskq_ent_t;

struct zfs_taskq {
	char		ztq_name[ZFS_TASKQ_NAMELEN + 1];
	mutex_t		ztq_lock;
	rwlock_t	ztq_threadlock;
	cond_t		ztq_dispatch_cv;
	cond_t		ztq_wait_cv;
	thread_t	*ztq_threadlist;
	int		ztq_flags;
	int		ztq_active;
	int		ztq_nthreads;
	int		ztq_nalloc;
	int		ztq_minalloc;
	int		ztq_maxalloc;
	cond_t		ztq_maxalloc_cv;
	int		ztq_maxalloc_wait;
	zfs_taskq_ent_t	*ztq_freelist;
	zfs_taskq_ent_t	ztq_task;	/* sentinel of the pending task list */
};

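/*
 * Allocate a task entry, taking one from the taskq's free list when
 * possible and falling back to umem_alloc().  Expects ztq_lock to be
 * held on entry; the lock is dropped and reacquired around the umem
 * call, so callers must not assume the queue state is unchanged.
 */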
static zfs_taskq_ent_t *
ztask_alloc(zfs_taskq_t *ztq, int ztqflags)
{
	zfs_taskq_ent_t *t;
	timestruc_t ts;
	int err;

again:	if ((t = ztq->ztq_freelist) != NULL &&
	    ztq->ztq_nalloc >= ztq->ztq_minalloc) {
		ztq->ztq_freelist = t->ztqent_next;
	} else {
		if (ztq->ztq_nalloc >= ztq->ztq_maxalloc) {
			if (!(ztqflags & UMEM_NOFAIL))
				return (NULL);

			/*
			 * We don't want to exceed ztq_maxalloc, but we can't
			 * wait for other tasks to complete (and thus free up
			 * task structures) without risking deadlock with
			 * the caller.  So, we wait up to one second to
			 * throttle the allocation rate.  If tasks complete
			 * before the one-second timeout expires, ztask_free()
			 * will signal us and we will immediately retry the
			 * allocation.
			 */
			ztq->ztq_maxalloc_wait++;

			ts.tv_sec = 1;
			ts.tv_nsec = 0;
			err = cond_reltimedwait(&ztq->ztq_maxalloc_cv,
			    &ztq->ztq_lock, &ts);

			ztq->ztq_maxalloc_wait--;
			if (err == 0)
				goto again;	/* signaled */
		}
		mutex_exit(&ztq->ztq_lock);

		t = umem_alloc(sizeof (zfs_taskq_ent_t), ztqflags);

		mutex_enter(&ztq->ztq_lock);
		if (t != NULL)
			ztq->ztq_nalloc++;
	}
	return (t);
}

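/*
 * Return a task entry to the free list while the cache holds no more
 * than ztq_minalloc entries; otherwise hand it back to umem_free().
 * Wakes any dispatcher throttled in ztask_alloc() waiting for an entry.
 * Expects ztq_lock to be held; it is dropped around umem_free().
 */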
static void
ztask_free(zfs_taskq_t *ztq, zfs_taskq_ent_t *t)
{
	if (ztq->ztq_nalloc <= ztq->ztq_minalloc) {
		t->ztqent_next = ztq->ztq_freelist;
		ztq->ztq_freelist = t;
	} else {
		ztq->ztq_nalloc--;
		mutex_exit(&ztq->ztq_lock);
		umem_free(t, sizeof (zfs_taskq_ent_t));
		mutex_enter(&ztq->ztq_lock);
	}

	if (ztq->ztq_maxalloc_wait)
		VERIFY0(cond_signal(&ztq->ztq_maxalloc_cv));
}

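/*
 * Dispatch func(arg) to the taskq.  Tasks are normally appended to the
 * tail of the pending list; ZFS_TQ_FRONT inserts at the head so the
 * task runs next.  Returns a nonzero taskqid on success and 0 if no
 * task entry could be allocated (only possible without UMEM_NOFAIL).
 */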
zfs_taskqid_t
zfs_taskq_dispatch(zfs_taskq_t *ztq, ztask_func_t func, void *arg,
    uint_t ztqflags)
{
	zfs_taskq_ent_t *t;

	mutex_enter(&ztq->ztq_lock);
	ASSERT(ztq->ztq_flags & ZFS_TASKQ_ACTIVE);
	if ((t = ztask_alloc(ztq, ztqflags)) == NULL) {
		mutex_exit(&ztq->ztq_lock);
		return (0);
	}
	if (ztqflags & ZFS_TQ_FRONT) {
		t->ztqent_next = ztq->ztq_task.ztqent_next;
		t->ztqent_prev = &ztq->ztq_task;
	} else {
		t->ztqent_next = &ztq->ztq_task;
		t->ztqent_prev = ztq->ztq_task.ztqent_prev;
	}
	t->ztqent_next->ztqent_prev = t;
	t->ztqent_prev->ztqent_next = t;
	t->ztqent_func = func;
	t->ztqent_arg = arg;
	t->ztqent_flags = 0;
	VERIFY0(cond_signal(&ztq->ztq_dispatch_cv));
	mutex_exit(&ztq->ztq_lock);
	return (1);
}

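/*
 * Block until the pending list is empty and no worker thread is busy,
 * i.e. until all previously dispatched tasks have completed.
 */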
void
zfs_taskq_wait(zfs_taskq_t *ztq)
{
	mutex_enter(&ztq->ztq_lock);
	while (ztq->ztq_task.ztqent_next != &ztq->ztq_task ||
	    ztq->ztq_active != 0) {
		int ret = cond_wait(&ztq->ztq_wait_cv, &ztq->ztq_lock);
		VERIFY(ret == 0 || ret == EINTR);
	}
	mutex_exit(&ztq->ztq_lock);
}

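/*
 * Worker thread: pull tasks off the head of the pending list and run
 * them until the taskq is marked inactive by zfs_taskq_destroy().
 * ztq_threadlock is held as reader for the duration of each task
 * function call.
 */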
static void *
zfs_taskq_thread(void *arg)
{
	zfs_taskq_t *ztq = arg;
	zfs_taskq_ent_t *t;
	boolean_t prealloc;

	mutex_enter(&ztq->ztq_lock);
	while (ztq->ztq_flags & ZFS_TASKQ_ACTIVE) {
		if ((t = ztq->ztq_task.ztqent_next) == &ztq->ztq_task) {
			int ret;
			if (--ztq->ztq_active == 0)
				VERIFY0(cond_broadcast(&ztq->ztq_wait_cv));
			ret = cond_wait(&ztq->ztq_dispatch_cv, &ztq->ztq_lock);
			VERIFY(ret == 0 || ret == EINTR);
			ztq->ztq_active++;
			continue;
		}
		t->ztqent_prev->ztqent_next = t->ztqent_next;
		t->ztqent_next->ztqent_prev = t->ztqent_prev;
		t->ztqent_next = NULL;
		t->ztqent_prev = NULL;
		prealloc = t->ztqent_flags & ZFS_TQENT_FLAG_PREALLOC;
		mutex_exit(&ztq->ztq_lock);

		VERIFY0(rw_rdlock(&ztq->ztq_threadlock));
		t->ztqent_func(t->ztqent_arg);
		VERIFY0(rw_unlock(&ztq->ztq_threadlock));

		mutex_enter(&ztq->ztq_lock);
		if (!prealloc)
			ztask_free(ztq, t);
	}
	ztq->ztq_nthreads--;
	VERIFY0(cond_broadcast(&ztq->ztq_wait_cv));
	mutex_exit(&ztq->ztq_lock);
	return (NULL);
}

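/*
 * Create a taskq with nthreads bound worker threads.  minalloc and
 * maxalloc bound the number of cached and outstanding task entries;
 * ZFS_TASKQ_PREPOPULATE pre-allocates minalloc entries onto the free
 * list.  The pri argument is unused in this userland implementation
 * (hence the ARGSUSED below) and is accepted for interface symmetry
 * with the kernel taskq_create().
 */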
/*ARGSUSED*/
zfs_taskq_t *
zfs_taskq_create(const char *name, int nthreads, pri_t pri, int minalloc,
    int maxalloc, uint_t flags)
{
	zfs_taskq_t *ztq = umem_zalloc(sizeof (zfs_taskq_t), UMEM_NOFAIL);
	int t;

	ASSERT3S(nthreads, >=, 1);

	VERIFY0(rwlock_init(&ztq->ztq_threadlock, USYNC_THREAD, NULL));
	VERIFY0(cond_init(&ztq->ztq_dispatch_cv, USYNC_THREAD, NULL));
	VERIFY0(cond_init(&ztq->ztq_wait_cv, USYNC_THREAD, NULL));
	VERIFY0(cond_init(&ztq->ztq_maxalloc_cv, USYNC_THREAD, NULL));
	VERIFY0(mutex_init(
	    &ztq->ztq_lock, LOCK_NORMAL | LOCK_ERRORCHECK, NULL));

	/* strlcpy guarantees NUL termination even when name is truncated */
	(void) strlcpy(ztq->ztq_name, name, sizeof (ztq->ztq_name));

	ztq->ztq_flags = flags | ZFS_TASKQ_ACTIVE;
	ztq->ztq_active = nthreads;
	ztq->ztq_nthreads = nthreads;
	ztq->ztq_minalloc = minalloc;
	ztq->ztq_maxalloc = maxalloc;
	ztq->ztq_task.ztqent_next = &ztq->ztq_task;
	ztq->ztq_task.ztqent_prev = &ztq->ztq_task;
	ztq->ztq_threadlist =
	    umem_alloc(nthreads * sizeof (thread_t), UMEM_NOFAIL);

	if (flags & ZFS_TASKQ_PREPOPULATE) {
		mutex_enter(&ztq->ztq_lock);
		while (minalloc-- > 0)
			ztask_free(ztq, ztask_alloc(ztq, UMEM_NOFAIL));
		mutex_exit(&ztq->ztq_lock);
	}

	/*
	 * Worker creation must succeed; a silently missing worker would
	 * leave ztq_nthreads overcounted and wedge zfs_taskq_destroy().
	 */
	for (t = 0; t < nthreads; t++) {
		VERIFY0(thr_create(0, 0, zfs_taskq_thread,
		    ztq, THR_BOUND, &ztq->ztq_threadlist[t]));
	}

	return (ztq);
}

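/*
 * Tear down the taskq: wait for outstanding tasks, tell the workers to
 * exit, reap them with thr_join(), then release all cached task entries
 * and synchronization objects.  The taskq must not be used afterwards.
 */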
void
zfs_taskq_destroy(zfs_taskq_t *ztq)
{
	int t;
	int nthreads = ztq->ztq_nthreads;

	zfs_taskq_wait(ztq);

	mutex_enter(&ztq->ztq_lock);

	ztq->ztq_flags &= ~ZFS_TASKQ_ACTIVE;
	VERIFY0(cond_broadcast(&ztq->ztq_dispatch_cv));

	while (ztq->ztq_nthreads != 0) {
		int ret = cond_wait(&ztq->ztq_wait_cv, &ztq->ztq_lock);
		VERIFY(ret == 0 || ret == EINTR);
	}

	ztq->ztq_minalloc = 0;
	while (ztq->ztq_nalloc != 0) {
		ASSERT(ztq->ztq_freelist != NULL);
		ztask_free(ztq, ztask_alloc(ztq, UMEM_NOFAIL));
	}

	mutex_exit(&ztq->ztq_lock);

	for (t = 0; t < nthreads; t++)
		(void) thr_join(ztq->ztq_threadlist[t], NULL, NULL);

	umem_free(ztq->ztq_threadlist, nthreads * sizeof (thread_t));

	VERIFY0(rwlock_destroy(&ztq->ztq_threadlock));
	VERIFY0(cond_destroy(&ztq->ztq_dispatch_cv));
	VERIFY0(cond_destroy(&ztq->ztq_wait_cv));
	VERIFY0(cond_destroy(&ztq->ztq_maxalloc_cv));
	VERIFY0(mutex_destroy(&ztq->ztq_lock));

	umem_free(ztq, sizeof (zfs_taskq_t));
}
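/*
 * Illustrative usage sketch (an assumption, not shipped code): my_task
 * and done are placeholders.  Dispatching with UMEM_NOFAIL makes
 * ztask_alloc() throttle rather than fail once maxalloc task entries
 * are outstanding.
 *
 *	static void
 *	my_task(void *arg)
 *	{
 *		int *donep = arg;
 *		*donep = 1;
 *	}
 *
 *	int done = 0;
 *	zfs_taskq_t *ztq = zfs_taskq_create("example", 4, 0, 8, 128,
 *	    ZFS_TASKQ_PREPOPULATE);
 *	(void) zfs_taskq_dispatch(ztq, my_task, &done, UMEM_NOFAIL);
 *	zfs_taskq_wait(ztq);
 *	ASSERT(done == 1);
 *	zfs_taskq_destroy(ztq);
 */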