1 /*- 2 * Copyright (c) 2000 Doug Rabson 3 * All rights reserved. 4 * 5 * Redistribution and use in source and binary forms, with or without 6 * modification, are permitted provided that the following conditions 7 * are met: 8 * 1. Redistributions of source code must retain the above copyright 9 * notice, this list of conditions and the following disclaimer. 10 * 2. Redistributions in binary form must reproduce the above copyright 11 * notice, this list of conditions and the following disclaimer in the 12 * documentation and/or other materials provided with the distribution. 13 * 14 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND 15 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 16 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 17 * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE 18 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL 19 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS 20 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) 21 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 22 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY 23 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF 24 * SUCH DAMAGE. 25 */ 26 27 #include <sys/cdefs.h> 28 __FBSDID("$FreeBSD$"); 29 30 #include <sys/param.h> 31 #include <sys/systm.h> 32 #include <sys/bus.h> 33 #include <sys/interrupt.h> 34 #include <sys/kernel.h> 35 #include <sys/lock.h> 36 #include <sys/malloc.h> 37 #include <sys/mutex.h> 38 #include <sys/taskqueue.h> 39 #include <sys/kthread.h> 40 #include <sys/unistd.h> 41 42 static MALLOC_DEFINE(M_TASKQUEUE, "taskqueue", "Task Queues"); 43 44 static STAILQ_HEAD(taskqueue_list, taskqueue) taskqueue_queues; 45 46 static void *taskqueue_ih; 47 static void *taskqueue_giant_ih; 48 static struct mtx taskqueue_queues_mutex; 49 static struct proc *taskqueue_thread_proc; 50 51 struct taskqueue { 52 STAILQ_ENTRY(taskqueue) tq_link; 53 STAILQ_HEAD(, task) tq_queue; 54 const char *tq_name; 55 taskqueue_enqueue_fn tq_enqueue; 56 void *tq_context; 57 int tq_draining; 58 struct mtx tq_mutex; 59 }; 60 61 static void init_taskqueue_list(void *data); 62 63 static void 64 init_taskqueue_list(void *data __unused) 65 { 66 67 mtx_init(&taskqueue_queues_mutex, "taskqueue list", NULL, MTX_DEF); 68 STAILQ_INIT(&taskqueue_queues); 69 } 70 SYSINIT(taskqueue_list, SI_SUB_INTRINSIC, SI_ORDER_ANY, init_taskqueue_list, 71 NULL); 72 73 struct taskqueue * 74 taskqueue_create(const char *name, int mflags, 75 taskqueue_enqueue_fn enqueue, void *context) 76 { 77 struct taskqueue *queue; 78 79 queue = malloc(sizeof(struct taskqueue), M_TASKQUEUE, mflags | M_ZERO); 80 if (!queue) 81 return 0; 82 83 STAILQ_INIT(&queue->tq_queue); 84 queue->tq_name = name; 85 queue->tq_enqueue = enqueue; 86 queue->tq_context = context; 87 queue->tq_draining = 0; 88 mtx_init(&queue->tq_mutex, "taskqueue", NULL, MTX_DEF); 89 90 mtx_lock(&taskqueue_queues_mutex); 91 STAILQ_INSERT_TAIL(&taskqueue_queues, queue, tq_link); 92 mtx_unlock(&taskqueue_queues_mutex); 93 94 return queue; 95 } 96 97 void 98 taskqueue_free(struct taskqueue *queue) 99 { 100 101 mtx_lock(&queue->tq_mutex); 102 KASSERT(queue->tq_draining == 0, ("free'ing a draining taskqueue")); 103 queue->tq_draining = 1; 104 mtx_unlock(&queue->tq_mutex); 105 106 taskqueue_run(queue); 107 108 mtx_lock(&taskqueue_queues_mutex); 109 STAILQ_REMOVE(&taskqueue_queues, queue, taskqueue, tq_link); 110 mtx_unlock(&taskqueue_queues_mutex); 111 112 mtx_destroy(&queue->tq_mutex); 113 free(queue, M_TASKQUEUE); 114 } 115 116 /* 117 * Returns with the taskqueue locked. 118 */ 119 struct taskqueue * 120 taskqueue_find(const char *name) 121 { 122 struct taskqueue *queue; 123 124 mtx_lock(&taskqueue_queues_mutex); 125 STAILQ_FOREACH(queue, &taskqueue_queues, tq_link) { 126 mtx_lock(&queue->tq_mutex); 127 if (!strcmp(queue->tq_name, name)) { 128 mtx_unlock(&taskqueue_queues_mutex); 129 return queue; 130 } 131 mtx_unlock(&queue->tq_mutex); 132 } 133 mtx_unlock(&taskqueue_queues_mutex); 134 return 0; 135 } 136 137 int 138 taskqueue_enqueue(struct taskqueue *queue, struct task *task) 139 { 140 struct task *ins; 141 struct task *prev; 142 143 mtx_lock(&queue->tq_mutex); 144 145 /* 146 * Don't allow new tasks on a queue which is being freed. 147 */ 148 if (queue->tq_draining) { 149 mtx_unlock(&queue->tq_mutex); 150 return EPIPE; 151 } 152 153 /* 154 * Count multiple enqueues. 155 */ 156 if (task->ta_pending) { 157 task->ta_pending++; 158 mtx_unlock(&queue->tq_mutex); 159 return 0; 160 } 161 162 /* 163 * Optimise the case when all tasks have the same priority. 164 */ 165 prev = STAILQ_LAST(&queue->tq_queue, task, ta_link); 166 if (!prev || prev->ta_priority >= task->ta_priority) { 167 STAILQ_INSERT_TAIL(&queue->tq_queue, task, ta_link); 168 } else { 169 prev = 0; 170 for (ins = STAILQ_FIRST(&queue->tq_queue); ins; 171 prev = ins, ins = STAILQ_NEXT(ins, ta_link)) 172 if (ins->ta_priority < task->ta_priority) 173 break; 174 175 if (prev) 176 STAILQ_INSERT_AFTER(&queue->tq_queue, prev, task, ta_link); 177 else 178 STAILQ_INSERT_HEAD(&queue->tq_queue, task, ta_link); 179 } 180 181 task->ta_pending = 1; 182 if (queue->tq_enqueue) 183 queue->tq_enqueue(queue->tq_context); 184 185 mtx_unlock(&queue->tq_mutex); 186 187 return 0; 188 } 189 190 void 191 taskqueue_run(struct taskqueue *queue) 192 { 193 struct task *task; 194 int pending; 195 196 mtx_lock(&queue->tq_mutex); 197 while (STAILQ_FIRST(&queue->tq_queue)) { 198 /* 199 * Carefully remove the first task from the queue and 200 * zero its pending count. 201 */ 202 task = STAILQ_FIRST(&queue->tq_queue); 203 STAILQ_REMOVE_HEAD(&queue->tq_queue, ta_link); 204 pending = task->ta_pending; 205 task->ta_pending = 0; 206 mtx_unlock(&queue->tq_mutex); 207 208 task->ta_func(task->ta_context, pending); 209 210 mtx_lock(&queue->tq_mutex); 211 } 212 mtx_unlock(&queue->tq_mutex); 213 } 214 215 static void 216 taskqueue_swi_enqueue(void *context) 217 { 218 swi_sched(taskqueue_ih, 0); 219 } 220 221 static void 222 taskqueue_swi_run(void *dummy) 223 { 224 taskqueue_run(taskqueue_swi); 225 } 226 227 static void 228 taskqueue_swi_giant_enqueue(void *context) 229 { 230 swi_sched(taskqueue_giant_ih, 0); 231 } 232 233 static void 234 taskqueue_swi_giant_run(void *dummy) 235 { 236 taskqueue_run(taskqueue_swi_giant); 237 } 238 239 static void 240 taskqueue_kthread(void *arg) 241 { 242 struct mtx kthread_mutex; 243 244 bzero(&kthread_mutex, sizeof(kthread_mutex)); 245 246 mtx_init(&kthread_mutex, "taskqueue kthread", NULL, MTX_DEF); 247 248 mtx_lock(&kthread_mutex); 249 250 for (;;) { 251 mtx_unlock(&kthread_mutex); 252 taskqueue_run(taskqueue_thread); 253 mtx_lock(&kthread_mutex); 254 msleep(&taskqueue_thread, &kthread_mutex, PWAIT, "tqthr", 0); 255 } 256 } 257 258 static void 259 taskqueue_thread_enqueue(void *context) 260 { 261 wakeup(&taskqueue_thread); 262 } 263 264 TASKQUEUE_DEFINE(swi, taskqueue_swi_enqueue, 0, 265 swi_add(NULL, "task queue", taskqueue_swi_run, NULL, SWI_TQ, 266 INTR_MPSAFE, &taskqueue_ih)); 267 268 TASKQUEUE_DEFINE(swi_giant, taskqueue_swi_giant_enqueue, 0, 269 swi_add(NULL, "Giant task queue", taskqueue_swi_giant_run, 270 NULL, SWI_TQ_GIANT, 0, &taskqueue_giant_ih)); 271 272 TASKQUEUE_DEFINE(thread, taskqueue_thread_enqueue, 0, 273 kthread_create(taskqueue_kthread, NULL, 274 &taskqueue_thread_proc, RFNOWAIT, 0, "taskqueue")); 275 276 int 277 taskqueue_enqueue_fast(struct taskqueue *queue, struct task *task) 278 { 279 struct task *ins; 280 struct task *prev; 281 282 mtx_lock_spin(&queue->tq_mutex); 283 284 /* 285 * Don't allow new tasks on a queue which is being freed. 286 */ 287 if (queue->tq_draining) { 288 mtx_unlock_spin(&queue->tq_mutex); 289 return EPIPE; 290 } 291 292 /* 293 * Count multiple enqueues. 294 */ 295 if (task->ta_pending) { 296 task->ta_pending++; 297 mtx_unlock_spin(&queue->tq_mutex); 298 return 0; 299 } 300 301 /* 302 * Optimise the case when all tasks have the same priority. 303 */ 304 prev = STAILQ_LAST(&queue->tq_queue, task, ta_link); 305 if (!prev || prev->ta_priority >= task->ta_priority) { 306 STAILQ_INSERT_TAIL(&queue->tq_queue, task, ta_link); 307 } else { 308 prev = 0; 309 for (ins = STAILQ_FIRST(&queue->tq_queue); ins; 310 prev = ins, ins = STAILQ_NEXT(ins, ta_link)) 311 if (ins->ta_priority < task->ta_priority) 312 break; 313 314 if (prev) 315 STAILQ_INSERT_AFTER(&queue->tq_queue, prev, task, ta_link); 316 else 317 STAILQ_INSERT_HEAD(&queue->tq_queue, task, ta_link); 318 } 319 320 task->ta_pending = 1; 321 if (queue->tq_enqueue) 322 queue->tq_enqueue(queue->tq_context); 323 324 mtx_unlock_spin(&queue->tq_mutex); 325 326 return 0; 327 } 328 329 static void 330 taskqueue_run_fast(struct taskqueue *queue) 331 { 332 struct task *task; 333 int pending; 334 335 mtx_lock_spin(&queue->tq_mutex); 336 while (STAILQ_FIRST(&queue->tq_queue)) { 337 /* 338 * Carefully remove the first task from the queue and 339 * zero its pending count. 340 */ 341 task = STAILQ_FIRST(&queue->tq_queue); 342 STAILQ_REMOVE_HEAD(&queue->tq_queue, ta_link); 343 pending = task->ta_pending; 344 task->ta_pending = 0; 345 mtx_unlock_spin(&queue->tq_mutex); 346 347 task->ta_func(task->ta_context, pending); 348 349 mtx_lock_spin(&queue->tq_mutex); 350 } 351 mtx_unlock_spin(&queue->tq_mutex); 352 } 353 354 struct taskqueue *taskqueue_fast; 355 static void *taskqueue_fast_ih; 356 357 static void 358 taskqueue_fast_schedule(void *context) 359 { 360 swi_sched(taskqueue_fast_ih, 0); 361 } 362 363 static void 364 taskqueue_fast_run(void *dummy) 365 { 366 taskqueue_run_fast(taskqueue_fast); 367 } 368 369 static void 370 taskqueue_define_fast(void *arg) 371 { 372 taskqueue_fast = malloc(sizeof(struct taskqueue), 373 M_TASKQUEUE, M_NOWAIT | M_ZERO); 374 if (!taskqueue_fast) { 375 printf("%s: Unable to allocate fast task queue!\n", __func__); 376 return; 377 } 378 379 STAILQ_INIT(&taskqueue_fast->tq_queue); 380 taskqueue_fast->tq_name = "fast"; 381 taskqueue_fast->tq_enqueue = taskqueue_fast_schedule; 382 mtx_init(&taskqueue_fast->tq_mutex, "taskqueue_fast", NULL, MTX_SPIN); 383 384 mtx_lock(&taskqueue_queues_mutex); 385 STAILQ_INSERT_TAIL(&taskqueue_queues, taskqueue_fast, tq_link); 386 mtx_unlock(&taskqueue_queues_mutex); 387 388 swi_add(NULL, "Fast task queue", taskqueue_fast_run, 389 NULL, SWI_TQ_FAST, 0, &taskqueue_fast_ih); 390 } 391 SYSINIT(taskqueue_fast, SI_SUB_CONFIGURE, SI_ORDER_SECOND, 392 taskqueue_define_fast, NULL); 393