/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License (the "License").
 * You may not use this file except in compliance with the License.
 *
 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
 * or http://www.opensolaris.org/os/licensing.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
 * If applicable, add the following below this CDDL HEADER, with the
 * fields enclosed by brackets "[]" replaced with your own identifying
 * information: Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 */
/*
 * Copyright 2010 Sun Microsystems, Inc. All rights reserved.
 * Use is subject to license terms.
 */
/*
 * Copyright 2011 Nexenta Systems, Inc. All rights reserved.
 * Copyright 2012 Garrett D'Amore <garrett@damore.org>. All rights reserved.
 * Copyright (c) 2014, 2018 by Delphix. All rights reserved.
29 */ 30 31 #include <thread.h> 32 #include <synch.h> 33 #include <unistd.h> 34 #include <string.h> 35 #include <errno.h> 36 #include <sys/debug.h> 37 #include <sys/sysmacros.h> 38 39 #include "libzfs_taskq.h" 40 41 #define ZFS_TASKQ_ACTIVE 0x00010000 42 #define ZFS_TASKQ_NAMELEN 31 43 44 typedef struct zfs_taskq_ent { 45 struct zfs_taskq_ent *ztqent_next; 46 struct zfs_taskq_ent *ztqent_prev; 47 ztask_func_t *ztqent_func; 48 void *ztqent_arg; 49 uintptr_t ztqent_flags; 50 } zfs_taskq_ent_t; 51 52 struct zfs_taskq { 53 char ztq_name[ZFS_TASKQ_NAMELEN + 1]; 54 mutex_t ztq_lock; 55 rwlock_t ztq_threadlock; 56 cond_t ztq_dispatch_cv; 57 cond_t ztq_wait_cv; 58 thread_t *ztq_threadlist; 59 int ztq_flags; 60 int ztq_active; 61 int ztq_nthreads; 62 int ztq_nalloc; 63 int ztq_minalloc; 64 int ztq_maxalloc; 65 cond_t ztq_maxalloc_cv; 66 int ztq_maxalloc_wait; 67 zfs_taskq_ent_t *ztq_freelist; 68 zfs_taskq_ent_t ztq_task; 69 }; 70 71 static zfs_taskq_ent_t * 72 ztask_alloc(zfs_taskq_t *ztq, int ztqflags) 73 { 74 zfs_taskq_ent_t *t; 75 timestruc_t ts; 76 int err; 77 78 again: if ((t = ztq->ztq_freelist) != NULL && 79 ztq->ztq_nalloc >= ztq->ztq_minalloc) { 80 ztq->ztq_freelist = t->ztqent_next; 81 } else { 82 if (ztq->ztq_nalloc >= ztq->ztq_maxalloc) { 83 if (!(ztqflags & UMEM_NOFAIL)) 84 return (NULL); 85 86 /* 87 * We don't want to exceed ztq_maxalloc, but we can't 88 * wait for other tasks to complete (and thus free up 89 * task structures) without risking deadlock with 90 * the caller. So, we just delay for one second 91 * to throttle the allocation rate. If we have tasks 92 * complete before one second timeout expires then 93 * zfs_taskq_ent_free will signal us and we will 94 * immediately retry the allocation. 
95 */ 96 ztq->ztq_maxalloc_wait++; 97 98 ts.tv_sec = 1; 99 ts.tv_nsec = 0; 100 err = cond_reltimedwait(&ztq->ztq_maxalloc_cv, 101 &ztq->ztq_lock, &ts); 102 103 ztq->ztq_maxalloc_wait--; 104 if (err == 0) 105 goto again; /* signaled */ 106 } 107 mutex_exit(&ztq->ztq_lock); 108 109 t = umem_alloc(sizeof (zfs_taskq_ent_t), ztqflags); 110 111 mutex_enter(&ztq->ztq_lock); 112 if (t != NULL) 113 ztq->ztq_nalloc++; 114 } 115 return (t); 116 } 117 118 static void 119 ztask_free(zfs_taskq_t *ztq, zfs_taskq_ent_t *t) 120 { 121 if (ztq->ztq_nalloc <= ztq->ztq_minalloc) { 122 t->ztqent_next = ztq->ztq_freelist; 123 ztq->ztq_freelist = t; 124 } else { 125 ztq->ztq_nalloc--; 126 mutex_exit(&ztq->ztq_lock); 127 umem_free(t, sizeof (zfs_taskq_ent_t)); 128 mutex_enter(&ztq->ztq_lock); 129 } 130 131 if (ztq->ztq_maxalloc_wait) 132 VERIFY0(cond_signal(&ztq->ztq_maxalloc_cv)); 133 } 134 135 zfs_taskqid_t 136 zfs_taskq_dispatch(zfs_taskq_t *ztq, ztask_func_t func, void *arg, 137 uint_t ztqflags) 138 { 139 zfs_taskq_ent_t *t; 140 141 mutex_enter(&ztq->ztq_lock); 142 ASSERT(ztq->ztq_flags & ZFS_TASKQ_ACTIVE); 143 if ((t = ztask_alloc(ztq, ztqflags)) == NULL) { 144 mutex_exit(&ztq->ztq_lock); 145 return (0); 146 } 147 if (ztqflags & ZFS_TQ_FRONT) { 148 t->ztqent_next = ztq->ztq_task.ztqent_next; 149 t->ztqent_prev = &ztq->ztq_task; 150 } else { 151 t->ztqent_next = &ztq->ztq_task; 152 t->ztqent_prev = ztq->ztq_task.ztqent_prev; 153 } 154 t->ztqent_next->ztqent_prev = t; 155 t->ztqent_prev->ztqent_next = t; 156 t->ztqent_func = func; 157 t->ztqent_arg = arg; 158 t->ztqent_flags = 0; 159 VERIFY0(cond_signal(&ztq->ztq_dispatch_cv)); 160 mutex_exit(&ztq->ztq_lock); 161 return (1); 162 } 163 164 void 165 zfs_taskq_wait(zfs_taskq_t *ztq) 166 { 167 mutex_enter(&ztq->ztq_lock); 168 while (ztq->ztq_task.ztqent_next != &ztq->ztq_task || 169 ztq->ztq_active != 0) { 170 int ret = cond_wait(&ztq->ztq_wait_cv, &ztq->ztq_lock); 171 VERIFY(ret == 0 || ret == EINTR); 172 } 173 mutex_exit(&ztq->ztq_lock); 
174 } 175 176 static void * 177 zfs_taskq_thread(void *arg) 178 { 179 zfs_taskq_t *ztq = arg; 180 zfs_taskq_ent_t *t; 181 boolean_t prealloc; 182 183 mutex_enter(&ztq->ztq_lock); 184 while (ztq->ztq_flags & ZFS_TASKQ_ACTIVE) { 185 if ((t = ztq->ztq_task.ztqent_next) == &ztq->ztq_task) { 186 int ret; 187 if (--ztq->ztq_active == 0) 188 VERIFY0(cond_broadcast(&ztq->ztq_wait_cv)); 189 ret = cond_wait(&ztq->ztq_dispatch_cv, &ztq->ztq_lock); 190 VERIFY(ret == 0 || ret == EINTR); 191 ztq->ztq_active++; 192 continue; 193 } 194 t->ztqent_prev->ztqent_next = t->ztqent_next; 195 t->ztqent_next->ztqent_prev = t->ztqent_prev; 196 t->ztqent_next = NULL; 197 t->ztqent_prev = NULL; 198 prealloc = t->ztqent_flags & ZFS_TQENT_FLAG_PREALLOC; 199 mutex_exit(&ztq->ztq_lock); 200 201 VERIFY0(rw_rdlock(&ztq->ztq_threadlock)); 202 t->ztqent_func(t->ztqent_arg); 203 VERIFY0(rw_unlock(&ztq->ztq_threadlock)); 204 205 mutex_enter(&ztq->ztq_lock); 206 if (!prealloc) 207 ztask_free(ztq, t); 208 } 209 ztq->ztq_nthreads--; 210 VERIFY0(cond_broadcast(&ztq->ztq_wait_cv)); 211 mutex_exit(&ztq->ztq_lock); 212 return (NULL); 213 } 214 215 /*ARGSUSED*/ 216 zfs_taskq_t * 217 zfs_taskq_create(const char *name, int nthreads, pri_t pri, int minalloc, 218 int maxalloc, uint_t flags) 219 { 220 zfs_taskq_t *ztq = umem_zalloc(sizeof (zfs_taskq_t), UMEM_NOFAIL); 221 int t; 222 223 ASSERT3S(nthreads, >=, 1); 224 225 VERIFY0(rwlock_init(&ztq->ztq_threadlock, USYNC_THREAD, NULL)); 226 VERIFY0(cond_init(&ztq->ztq_dispatch_cv, USYNC_THREAD, NULL)); 227 VERIFY0(cond_init(&ztq->ztq_wait_cv, USYNC_THREAD, NULL)); 228 VERIFY0(cond_init(&ztq->ztq_maxalloc_cv, USYNC_THREAD, NULL)); 229 VERIFY0(mutex_init( 230 &ztq->ztq_lock, LOCK_NORMAL | LOCK_ERRORCHECK, NULL)); 231 232 (void) strncpy(ztq->ztq_name, name, ZFS_TASKQ_NAMELEN + 1); 233 234 ztq->ztq_flags = flags | ZFS_TASKQ_ACTIVE; 235 ztq->ztq_active = nthreads; 236 ztq->ztq_nthreads = nthreads; 237 ztq->ztq_minalloc = minalloc; 238 ztq->ztq_maxalloc = maxalloc; 239 
ztq->ztq_task.ztqent_next = &ztq->ztq_task; 240 ztq->ztq_task.ztqent_prev = &ztq->ztq_task; 241 ztq->ztq_threadlist = 242 umem_alloc(nthreads * sizeof (thread_t), UMEM_NOFAIL); 243 244 if (flags & ZFS_TASKQ_PREPOPULATE) { 245 mutex_enter(&ztq->ztq_lock); 246 while (minalloc-- > 0) 247 ztask_free(ztq, ztask_alloc(ztq, UMEM_NOFAIL)); 248 mutex_exit(&ztq->ztq_lock); 249 } 250 251 for (t = 0; t < nthreads; t++) { 252 (void) thr_create(0, 0, zfs_taskq_thread, 253 ztq, THR_BOUND, &ztq->ztq_threadlist[t]); 254 } 255 256 return (ztq); 257 } 258 259 void 260 zfs_taskq_destroy(zfs_taskq_t *ztq) 261 { 262 int t; 263 int nthreads = ztq->ztq_nthreads; 264 265 zfs_taskq_wait(ztq); 266 267 mutex_enter(&ztq->ztq_lock); 268 269 ztq->ztq_flags &= ~ZFS_TASKQ_ACTIVE; 270 VERIFY0(cond_broadcast(&ztq->ztq_dispatch_cv)); 271 272 while (ztq->ztq_nthreads != 0) { 273 int ret = cond_wait(&ztq->ztq_wait_cv, &ztq->ztq_lock); 274 VERIFY(ret == 0 || ret == EINTR); 275 } 276 277 ztq->ztq_minalloc = 0; 278 while (ztq->ztq_nalloc != 0) { 279 ASSERT(ztq->ztq_freelist != NULL); 280 ztask_free(ztq, ztask_alloc(ztq, UMEM_NOFAIL)); 281 } 282 283 mutex_exit(&ztq->ztq_lock); 284 285 for (t = 0; t < nthreads; t++) 286 (void) thr_join(ztq->ztq_threadlist[t], NULL, NULL); 287 288 umem_free(ztq->ztq_threadlist, nthreads * sizeof (thread_t)); 289 290 VERIFY0(rwlock_destroy(&ztq->ztq_threadlock)); 291 VERIFY0(cond_destroy(&ztq->ztq_dispatch_cv)); 292 VERIFY0(cond_destroy(&ztq->ztq_wait_cv)); 293 VERIFY0(cond_destroy(&ztq->ztq_maxalloc_cv)); 294 VERIFY0(mutex_destroy(&ztq->ztq_lock)); 295 296 umem_free(ztq, sizeof (zfs_taskq_t)); 297 } 298