1 /*- 2 * Copyright (c) 2000 Doug Rabson 3 * All rights reserved. 4 * 5 * Redistribution and use in source and binary forms, with or without 6 * modification, are permitted provided that the following conditions 7 * are met: 8 * 1. Redistributions of source code must retain the above copyright 9 * notice, this list of conditions and the following disclaimer. 10 * 2. Redistributions in binary form must reproduce the above copyright 11 * notice, this list of conditions and the following disclaimer in the 12 * documentation and/or other materials provided with the distribution. 13 * 14 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND 15 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 16 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 17 * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE 18 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL 19 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS 20 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) 21 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 22 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY 23 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF 24 * SUCH DAMAGE. 25 */ 26 27 #include <sys/cdefs.h> 28 __FBSDID("$FreeBSD$"); 29 30 #include <sys/param.h> 31 #include <sys/systm.h> 32 #include <sys/bus.h> 33 #include <sys/interrupt.h> 34 #include <sys/kernel.h> 35 #include <sys/kthread.h> 36 #include <sys/lock.h> 37 #include <sys/malloc.h> 38 #include <sys/mutex.h> 39 #include <sys/taskqueue.h> 40 #include <sys/unistd.h> 41 42 static MALLOC_DEFINE(M_TASKQUEUE, "taskqueue", "Task Queues"); 43 static void *taskqueue_giant_ih; 44 static void *taskqueue_ih; 45 static STAILQ_HEAD(taskqueue_list, taskqueue) taskqueue_queues; 46 static struct mtx taskqueue_queues_mutex; 47 static struct proc *taskqueue_thread_proc; 48 49 struct taskqueue { 50 STAILQ_ENTRY(taskqueue) tq_link; 51 STAILQ_HEAD(, task) tq_queue; 52 const char *tq_name; 53 taskqueue_enqueue_fn tq_enqueue; 54 void *tq_context; 55 int tq_draining; 56 struct mtx tq_mutex; 57 }; 58 59 static void init_taskqueue_list(void *data); 60 61 static void 62 init_taskqueue_list(void *data __unused) 63 { 64 65 mtx_init(&taskqueue_queues_mutex, "taskqueue list", NULL, MTX_DEF); 66 STAILQ_INIT(&taskqueue_queues); 67 } 68 SYSINIT(taskqueue_list, SI_SUB_INTRINSIC, SI_ORDER_ANY, init_taskqueue_list, 69 NULL); 70 71 struct taskqueue * 72 taskqueue_create(const char *name, int mflags, 73 taskqueue_enqueue_fn enqueue, void *context) 74 { 75 struct taskqueue *queue; 76 77 queue = malloc(sizeof(struct taskqueue), M_TASKQUEUE, mflags | M_ZERO); 78 if (!queue) 79 return 0; 80 81 STAILQ_INIT(&queue->tq_queue); 82 queue->tq_name = name; 83 queue->tq_enqueue = enqueue; 84 queue->tq_context = context; 85 queue->tq_draining = 0; 86 mtx_init(&queue->tq_mutex, "taskqueue", NULL, MTX_DEF); 87 88 mtx_lock(&taskqueue_queues_mutex); 89 STAILQ_INSERT_TAIL(&taskqueue_queues, queue, tq_link); 90 mtx_unlock(&taskqueue_queues_mutex); 91 92 return queue; 93 } 94 95 void 96 taskqueue_free(struct taskqueue *queue) 97 { 98 99 mtx_lock(&queue->tq_mutex); 100 KASSERT(queue->tq_draining == 0, ("free'ing a draining taskqueue")); 101 queue->tq_draining = 1; 102 mtx_unlock(&queue->tq_mutex); 103 104 taskqueue_run(queue); 105 106 mtx_lock(&taskqueue_queues_mutex); 107 STAILQ_REMOVE(&taskqueue_queues, queue, taskqueue, tq_link); 108 mtx_unlock(&taskqueue_queues_mutex); 109 110 mtx_destroy(&queue->tq_mutex); 111 free(queue, M_TASKQUEUE); 112 } 113 114 /* 115 * Returns with the taskqueue locked. 116 */ 117 struct taskqueue * 118 taskqueue_find(const char *name) 119 { 120 struct taskqueue *queue; 121 122 mtx_lock(&taskqueue_queues_mutex); 123 STAILQ_FOREACH(queue, &taskqueue_queues, tq_link) { 124 mtx_lock(&queue->tq_mutex); 125 if (strcmp(queue->tq_name, name) == 0) { 126 mtx_unlock(&taskqueue_queues_mutex); 127 return queue; 128 } 129 mtx_unlock(&queue->tq_mutex); 130 } 131 mtx_unlock(&taskqueue_queues_mutex); 132 return NULL; 133 } 134 135 int 136 taskqueue_enqueue(struct taskqueue *queue, struct task *task) 137 { 138 struct task *ins; 139 struct task *prev; 140 141 mtx_lock(&queue->tq_mutex); 142 143 /* 144 * Don't allow new tasks on a queue which is being freed. 145 */ 146 if (queue->tq_draining) { 147 mtx_unlock(&queue->tq_mutex); 148 return EPIPE; 149 } 150 151 /* 152 * Count multiple enqueues. 153 */ 154 if (task->ta_pending) { 155 task->ta_pending++; 156 mtx_unlock(&queue->tq_mutex); 157 return 0; 158 } 159 160 /* 161 * Optimise the case when all tasks have the same priority. 162 */ 163 prev = STAILQ_LAST(&queue->tq_queue, task, ta_link); 164 if (!prev || prev->ta_priority >= task->ta_priority) { 165 STAILQ_INSERT_TAIL(&queue->tq_queue, task, ta_link); 166 } else { 167 prev = 0; 168 for (ins = STAILQ_FIRST(&queue->tq_queue); ins; 169 prev = ins, ins = STAILQ_NEXT(ins, ta_link)) 170 if (ins->ta_priority < task->ta_priority) 171 break; 172 173 if (prev) 174 STAILQ_INSERT_AFTER(&queue->tq_queue, prev, task, ta_link); 175 else 176 STAILQ_INSERT_HEAD(&queue->tq_queue, task, ta_link); 177 } 178 179 task->ta_pending = 1; 180 if (queue->tq_enqueue) 181 queue->tq_enqueue(queue->tq_context); 182 183 mtx_unlock(&queue->tq_mutex); 184 185 return 0; 186 } 187 188 void 189 taskqueue_run(struct taskqueue *queue) 190 { 191 struct task *task; 192 int pending; 193 194 mtx_lock(&queue->tq_mutex); 195 while (STAILQ_FIRST(&queue->tq_queue)) { 196 /* 197 * Carefully remove the first task from the queue and 198 * zero its pending count. 199 */ 200 task = STAILQ_FIRST(&queue->tq_queue); 201 STAILQ_REMOVE_HEAD(&queue->tq_queue, ta_link); 202 pending = task->ta_pending; 203 task->ta_pending = 0; 204 mtx_unlock(&queue->tq_mutex); 205 206 task->ta_func(task->ta_context, pending); 207 208 mtx_lock(&queue->tq_mutex); 209 } 210 mtx_unlock(&queue->tq_mutex); 211 } 212 213 static void 214 taskqueue_swi_enqueue(void *context) 215 { 216 swi_sched(taskqueue_ih, 0); 217 } 218 219 static void 220 taskqueue_swi_run(void *dummy) 221 { 222 taskqueue_run(taskqueue_swi); 223 } 224 225 static void 226 taskqueue_swi_giant_enqueue(void *context) 227 { 228 swi_sched(taskqueue_giant_ih, 0); 229 } 230 231 static void 232 taskqueue_swi_giant_run(void *dummy) 233 { 234 taskqueue_run(taskqueue_swi_giant); 235 } 236 237 static void 238 taskqueue_thread_loop(void *arg) 239 { 240 for (;;) { 241 mtx_lock(&taskqueue_thread->tq_mutex); 242 while (STAILQ_EMPTY(&taskqueue_thread->tq_queue)) 243 msleep(taskqueue_thread, &taskqueue_thread->tq_mutex, 244 PWAIT, "-", 0); 245 mtx_unlock(&taskqueue_thread->tq_mutex); 246 taskqueue_run(taskqueue_thread); 247 } 248 } 249 250 static void 251 taskqueue_thread_enqueue(void *context) 252 { 253 mtx_assert(&taskqueue_thread->tq_mutex, MA_OWNED); 254 wakeup(taskqueue_thread); 255 } 256 257 TASKQUEUE_DEFINE(swi, taskqueue_swi_enqueue, 0, 258 swi_add(NULL, "task queue", taskqueue_swi_run, NULL, SWI_TQ, 259 INTR_MPSAFE, &taskqueue_ih)); 260 261 TASKQUEUE_DEFINE(swi_giant, taskqueue_swi_giant_enqueue, 0, 262 swi_add(NULL, "Giant task queue", taskqueue_swi_giant_run, 263 NULL, SWI_TQ_GIANT, 0, &taskqueue_giant_ih)); 264 265 TASKQUEUE_DEFINE(thread, taskqueue_thread_enqueue, 0, 266 kthread_create(taskqueue_thread_loop, NULL, 267 &taskqueue_thread_proc, 0, 0, "taskqueue")); 268 269 int 270 taskqueue_enqueue_fast(struct taskqueue *queue, struct task *task) 271 { 272 struct task *ins; 273 struct task *prev; 274 275 mtx_lock_spin(&queue->tq_mutex); 276 277 /* 278 * Don't allow new tasks on a queue which is being freed. 279 */ 280 if (queue->tq_draining) { 281 mtx_unlock_spin(&queue->tq_mutex); 282 return EPIPE; 283 } 284 285 /* 286 * Count multiple enqueues. 287 */ 288 if (task->ta_pending) { 289 task->ta_pending++; 290 mtx_unlock_spin(&queue->tq_mutex); 291 return 0; 292 } 293 294 /* 295 * Optimise the case when all tasks have the same priority. 296 */ 297 prev = STAILQ_LAST(&queue->tq_queue, task, ta_link); 298 if (!prev || prev->ta_priority >= task->ta_priority) { 299 STAILQ_INSERT_TAIL(&queue->tq_queue, task, ta_link); 300 } else { 301 prev = 0; 302 for (ins = STAILQ_FIRST(&queue->tq_queue); ins; 303 prev = ins, ins = STAILQ_NEXT(ins, ta_link)) 304 if (ins->ta_priority < task->ta_priority) 305 break; 306 307 if (prev) 308 STAILQ_INSERT_AFTER(&queue->tq_queue, prev, task, ta_link); 309 else 310 STAILQ_INSERT_HEAD(&queue->tq_queue, task, ta_link); 311 } 312 313 task->ta_pending = 1; 314 if (queue->tq_enqueue) 315 queue->tq_enqueue(queue->tq_context); 316 317 mtx_unlock_spin(&queue->tq_mutex); 318 319 return 0; 320 } 321 322 static void 323 taskqueue_run_fast(struct taskqueue *queue) 324 { 325 struct task *task; 326 int pending; 327 328 mtx_lock_spin(&queue->tq_mutex); 329 while (STAILQ_FIRST(&queue->tq_queue)) { 330 /* 331 * Carefully remove the first task from the queue and 332 * zero its pending count. 333 */ 334 task = STAILQ_FIRST(&queue->tq_queue); 335 STAILQ_REMOVE_HEAD(&queue->tq_queue, ta_link); 336 pending = task->ta_pending; 337 task->ta_pending = 0; 338 mtx_unlock_spin(&queue->tq_mutex); 339 340 task->ta_func(task->ta_context, pending); 341 342 mtx_lock_spin(&queue->tq_mutex); 343 } 344 mtx_unlock_spin(&queue->tq_mutex); 345 } 346 347 struct taskqueue *taskqueue_fast; 348 static void *taskqueue_fast_ih; 349 350 static void 351 taskqueue_fast_schedule(void *context) 352 { 353 swi_sched(taskqueue_fast_ih, 0); 354 } 355 356 static void 357 taskqueue_fast_run(void *dummy) 358 { 359 taskqueue_run_fast(taskqueue_fast); 360 } 361 362 static void 363 taskqueue_define_fast(void *arg) 364 { 365 taskqueue_fast = malloc(sizeof(struct taskqueue), 366 M_TASKQUEUE, M_NOWAIT | M_ZERO); 367 if (!taskqueue_fast) { 368 printf("%s: Unable to allocate fast task queue!\n", __func__); 369 return; 370 } 371 372 STAILQ_INIT(&taskqueue_fast->tq_queue); 373 taskqueue_fast->tq_name = "fast"; 374 taskqueue_fast->tq_enqueue = taskqueue_fast_schedule; 375 mtx_init(&taskqueue_fast->tq_mutex, "taskqueue_fast", NULL, MTX_SPIN); 376 377 mtx_lock(&taskqueue_queues_mutex); 378 STAILQ_INSERT_TAIL(&taskqueue_queues, taskqueue_fast, tq_link); 379 mtx_unlock(&taskqueue_queues_mutex); 380 381 swi_add(NULL, "Fast task queue", taskqueue_fast_run, 382 NULL, SWI_TQ_FAST, 0, &taskqueue_fast_ih); 383 } 384 SYSINIT(taskqueue_fast, SI_SUB_CONFIGURE, SI_ORDER_SECOND, 385 taskqueue_define_fast, NULL); 386