/*-
 * Copyright (c) 2000 Doug Rabson
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
 * SUCH DAMAGE.
 */

#include <sys/cdefs.h>
__FBSDID("$FreeBSD$");

#include <sys/param.h>
#include <sys/systm.h>
#include <sys/bus.h>
#include <sys/cpuset.h>
#include <sys/interrupt.h>
#include <sys/kernel.h>
#include <sys/kthread.h>
#include <sys/libkern.h>
#include <sys/limits.h>
#include <sys/lock.h>
#include <sys/malloc.h>
#include <sys/mutex.h>
#include <sys/proc.h>
#include <sys/sched.h>
#include <sys/smp.h>
#include <sys/taskqueue.h>
#include <sys/unistd.h>
#include <machine/stdarg.h>

static MALLOC_DEFINE(M_TASKQUEUE, "taskqueue", "Task Queues");
static void	*taskqueue_giant_ih;
static void	*taskqueue_ih;
static void	 taskqueue_fast_enqueue(void *);
static void	 taskqueue_swi_enqueue(void *);
static void	 taskqueue_swi_giant_enqueue(void *);

struct taskqueue_busy {
	struct task	*tb_running;
	TAILQ_ENTRY(taskqueue_busy) tb_link;
};

struct task * const TB_DRAIN_WAITER = (struct task *)0x1;

struct taskqueue {
	STAILQ_HEAD(, task)	tq_queue;
	taskqueue_enqueue_fn	tq_enqueue;
	void			*tq_context;
	char			*tq_name;
	TAILQ_HEAD(, taskqueue_busy) tq_active;
	struct mtx		tq_mutex;
	struct thread		**tq_threads;
	struct thread		*tq_curthread;
	int			tq_tcount;
	int			tq_spin;
	int			tq_flags;
	int			tq_callouts;
	taskqueue_callback_fn	tq_callbacks[TASKQUEUE_NUM_CALLBACKS];
	void			*tq_cb_contexts[TASKQUEUE_NUM_CALLBACKS];
};

#define	TQ_FLAGS_ACTIVE		(1 << 0)
#define	TQ_FLAGS_BLOCKED	(1 << 1)
#define	TQ_FLAGS_UNLOCKED_ENQUEUE	(1 << 2)

#define	DT_CALLOUT_ARMED	(1 << 0)

#define	TQ_LOCK(tq)						\
	do {							\
		if ((tq)->tq_spin)				\
			mtx_lock_spin(&(tq)->tq_mutex);		\
		else						\
			mtx_lock(&(tq)->tq_mutex);		\
	} while (0)
#define	TQ_ASSERT_LOCKED(tq)	mtx_assert(&(tq)->tq_mutex, MA_OWNED)

#define	TQ_UNLOCK(tq)						\
	do {							\
		if ((tq)->tq_spin)				\
			mtx_unlock_spin(&(tq)->tq_mutex);	\
		else						\
			mtx_unlock(&(tq)->tq_mutex);		\
	} while (0)
#define	TQ_ASSERT_UNLOCKED(tq)	mtx_assert(&(tq)->tq_mutex, MA_NOTOWNED)

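/*
 * For reference, the basic consumer pattern implemented by this file is
 * the deferred function call: initialize a task once, enqueue it from
 * almost any context, and drain it before freeing whatever it points
 * at.  A minimal sketch (taskqueue_thread is the system queue defined
 * below; my_handler() and "sc" are placeholder consumer names):
 *
 *	TASK_INIT(&sc->sc_task, 0, my_handler, sc);
 *	taskqueue_enqueue(taskqueue_thread, &sc->sc_task);
 *	...
 *	taskqueue_drain(taskqueue_thread, &sc->sc_task);
 *
 * my_handler() is called with the context pointer and the number of
 * times the task was enqueued since it last ran.
 */
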
void
_timeout_task_init(struct taskqueue *queue, struct timeout_task *timeout_task,
    int priority, task_fn_t func, void *context)
{

	TASK_INIT(&timeout_task->t, priority, func, context);
	callout_init_mtx(&timeout_task->c, &queue->tq_mutex,
	    CALLOUT_RETURNUNLOCKED);
	timeout_task->q = queue;
	timeout_task->f = 0;
}

static __inline int
TQ_SLEEP(struct taskqueue *tq, void *p, struct mtx *m, int pri, const char *wm,
    int t)
{
	if (tq->tq_spin)
		return (msleep_spin(p, m, wm, t));
	return (msleep(p, m, pri, wm, t));
}

static struct taskqueue *
_taskqueue_create(const char *name, int mflags,
		 taskqueue_enqueue_fn enqueue, void *context,
		 int mtxflags, const char *mtxname __unused)
{
	struct taskqueue *queue;
	char *tq_name = NULL;

	if (name != NULL)
		tq_name = strndup(name, 32, M_TASKQUEUE);
	if (tq_name == NULL)
		tq_name = "taskqueue";

	queue = malloc(sizeof(struct taskqueue), M_TASKQUEUE, mflags | M_ZERO);
	if (!queue)
		return NULL;

	STAILQ_INIT(&queue->tq_queue);
	TAILQ_INIT(&queue->tq_active);
	queue->tq_enqueue = enqueue;
	queue->tq_context = context;
	queue->tq_name = tq_name;
	queue->tq_spin = (mtxflags & MTX_SPIN) != 0;
	queue->tq_flags |= TQ_FLAGS_ACTIVE;
	if (enqueue == taskqueue_fast_enqueue ||
	    enqueue == taskqueue_swi_enqueue ||
	    enqueue == taskqueue_swi_giant_enqueue ||
	    enqueue == taskqueue_thread_enqueue)
		queue->tq_flags |= TQ_FLAGS_UNLOCKED_ENQUEUE;
	mtx_init(&queue->tq_mutex, tq_name, NULL, mtxflags);

	return queue;
}

struct taskqueue *
taskqueue_create(const char *name, int mflags,
		 taskqueue_enqueue_fn enqueue, void *context)
{

	return _taskqueue_create(name, mflags, enqueue, context,
			MTX_DEF, name);
}

void
taskqueue_set_callback(struct taskqueue *queue,
    enum taskqueue_callback_type cb_type, taskqueue_callback_fn callback,
    void *context)
{

	KASSERT(((cb_type >= TASKQUEUE_CALLBACK_TYPE_MIN) &&
	    (cb_type <= TASKQUEUE_CALLBACK_TYPE_MAX)),
	    ("Callback type %d not valid, must be %d-%d", cb_type,
	    TASKQUEUE_CALLBACK_TYPE_MIN, TASKQUEUE_CALLBACK_TYPE_MAX));
	KASSERT((queue->tq_callbacks[cb_type] == NULL),
	    ("Re-initialization of taskqueue callback?"));

	queue->tq_callbacks[cb_type] = callback;
	queue->tq_cb_contexts[cb_type] = context;
}

/*
 * Signal a taskqueue thread to terminate.
 */
static void
taskqueue_terminate(struct thread **pp, struct taskqueue *tq)
{

	while (tq->tq_tcount > 0 || tq->tq_callouts > 0) {
		wakeup(tq);
		TQ_SLEEP(tq, pp, &tq->tq_mutex, PWAIT, "taskqueue_destroy", 0);
	}
}

void
taskqueue_free(struct taskqueue *queue)
{

	TQ_LOCK(queue);
	queue->tq_flags &= ~TQ_FLAGS_ACTIVE;
	taskqueue_terminate(queue->tq_threads, queue);
	KASSERT(TAILQ_EMPTY(&queue->tq_active), ("Tasks still running?"));
	KASSERT(queue->tq_callouts == 0, ("Armed timeout tasks"));
	mtx_destroy(&queue->tq_mutex);
	free(queue->tq_threads, M_TASKQUEUE);
	free(queue->tq_name, M_TASKQUEUE);
	free(queue, M_TASKQUEUE);
}

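/*
 * Insert the task into the queue in priority order: higher ta_priority
 * runs first, FIFO among equal priorities.  The caller must hold the
 * queue lock; the lock is released before returning.  If the task is
 * already pending, only its pending count is bumped (capped at
 * UCHAR_MAX).
 */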
static int
taskqueue_enqueue_locked(struct taskqueue *queue, struct task *task)
{
	struct task *ins;
	struct task *prev;

	KASSERT(task->ta_func != NULL, ("enqueueing task with NULL func"));
	/*
	 * Count multiple enqueues.
	 */
	if (task->ta_pending) {
		if (task->ta_pending < UCHAR_MAX)
			task->ta_pending++;
		TQ_UNLOCK(queue);
		return (0);
	}

	/*
	 * Optimise the case when all tasks have the same priority.
	 */
	prev = STAILQ_LAST(&queue->tq_queue, task, ta_link);
	if (!prev || prev->ta_priority >= task->ta_priority) {
		STAILQ_INSERT_TAIL(&queue->tq_queue, task, ta_link);
	} else {
		prev = NULL;
		for (ins = STAILQ_FIRST(&queue->tq_queue); ins;
		     prev = ins, ins = STAILQ_NEXT(ins, ta_link))
			if (ins->ta_priority < task->ta_priority)
				break;

		if (prev)
			STAILQ_INSERT_AFTER(&queue->tq_queue, prev, task, ta_link);
		else
			STAILQ_INSERT_HEAD(&queue->tq_queue, task, ta_link);
	}

	task->ta_pending = 1;
	if ((queue->tq_flags & TQ_FLAGS_UNLOCKED_ENQUEUE) != 0)
		TQ_UNLOCK(queue);
	if ((queue->tq_flags & TQ_FLAGS_BLOCKED) == 0)
		queue->tq_enqueue(queue->tq_context);
	if ((queue->tq_flags & TQ_FLAGS_UNLOCKED_ENQUEUE) == 0)
		TQ_UNLOCK(queue);

	/* Return with lock released. */
	return (0);
}

int
grouptaskqueue_enqueue(struct taskqueue *queue, struct task *task)
{
	TQ_LOCK(queue);
	if (task->ta_pending) {
		TQ_UNLOCK(queue);
		return (0);
	}
	STAILQ_INSERT_TAIL(&queue->tq_queue, task, ta_link);
	task->ta_pending = 1;
	TQ_UNLOCK(queue);
	if ((queue->tq_flags & TQ_FLAGS_BLOCKED) == 0)
		queue->tq_enqueue(queue->tq_context);
	return (0);
}

int
taskqueue_enqueue(struct taskqueue *queue, struct task *task)
{
	int res;

	TQ_LOCK(queue);
	res = taskqueue_enqueue_locked(queue, task);
	/* The lock is released inside. */

	return (res);
}

static void
taskqueue_timeout_func(void *arg)
{
	struct taskqueue *queue;
	struct timeout_task *timeout_task;

	timeout_task = arg;
	queue = timeout_task->q;
	KASSERT((timeout_task->f & DT_CALLOUT_ARMED) != 0, ("Stray timeout"));
	timeout_task->f &= ~DT_CALLOUT_ARMED;
	queue->tq_callouts--;
	taskqueue_enqueue_locked(timeout_task->q, &timeout_task->t);
	/* The lock is released inside. */
}

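/*
 * Schedule the timeout task to run after at least 'ticks' ticks.  The
 * previous pending count is returned.  A ticks value of zero enqueues
 * the task immediately; a negative value arms the callout only if it
 * is not already armed, leaving an existing timer untouched.
 */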
int
taskqueue_enqueue_timeout(struct taskqueue *queue,
    struct timeout_task *timeout_task, int ticks)
{
	int res;

	TQ_LOCK(queue);
	KASSERT(timeout_task->q == NULL || timeout_task->q == queue,
	    ("Migrated queue"));
	KASSERT(!queue->tq_spin, ("Timeout for spin-queue"));
	timeout_task->q = queue;
	res = timeout_task->t.ta_pending;
	if (ticks == 0) {
		taskqueue_enqueue_locked(queue, &timeout_task->t);
		/* The lock is released inside. */
	} else {
		if ((timeout_task->f & DT_CALLOUT_ARMED) != 0) {
			res++;
		} else {
			queue->tq_callouts++;
			timeout_task->f |= DT_CALLOUT_ARMED;
			if (ticks < 0)
				ticks = -ticks; /* Ignore overflow. */
		}
		if (ticks > 0) {
			callout_reset(&timeout_task->c, ticks,
			    taskqueue_timeout_func, timeout_task);
		}
		TQ_UNLOCK(queue);
	}
	return (res);
}

static void
taskqueue_task_nop_fn(void *context, int pending)
{
}

/*
 * Block until all currently queued tasks in this taskqueue
 * have begun execution.  Tasks queued during execution of
 * this function are ignored.
 */
static void
taskqueue_drain_tq_queue(struct taskqueue *queue)
{
	struct task t_barrier;

	if (STAILQ_EMPTY(&queue->tq_queue))
		return;

	/*
	 * Enqueue our barrier after all current tasks, but with
	 * the highest priority so that newly queued tasks cannot
	 * pass it.  Because of the high priority, we can not use
	 * taskqueue_enqueue_locked directly (which drops the lock
	 * anyway) so just insert it at tail while we have the
	 * queue lock.
	 */
	TASK_INIT(&t_barrier, USHRT_MAX, taskqueue_task_nop_fn, &t_barrier);
	STAILQ_INSERT_TAIL(&queue->tq_queue, &t_barrier, ta_link);
	t_barrier.ta_pending = 1;

	/*
	 * Once the barrier has executed, all previously queued tasks
	 * have completed or are currently executing.
	 */
	while (t_barrier.ta_pending != 0)
		TQ_SLEEP(queue, &t_barrier, &queue->tq_mutex, PWAIT, "-", 0);
}

/*
 * Block until all currently executing tasks for this taskqueue
 * complete.  Tasks that begin execution during the execution
 * of this function are ignored.
 */
static void
taskqueue_drain_tq_active(struct taskqueue *queue)
{
	struct taskqueue_busy tb_marker, *tb_first;

	if (TAILQ_EMPTY(&queue->tq_active))
		return;

	/* Block taskq_terminate().*/
	queue->tq_callouts++;

	/*
	 * Wait for all currently executing taskqueue threads
	 * to go idle.
	 */
	tb_marker.tb_running = TB_DRAIN_WAITER;
	TAILQ_INSERT_TAIL(&queue->tq_active, &tb_marker, tb_link);
	while (TAILQ_FIRST(&queue->tq_active) != &tb_marker)
		TQ_SLEEP(queue, &tb_marker, &queue->tq_mutex, PWAIT, "-", 0);
	TAILQ_REMOVE(&queue->tq_active, &tb_marker, tb_link);

	/*
	 * Wakeup any other drain waiter that happened to queue up
	 * without any intervening active thread.
	 */
	tb_first = TAILQ_FIRST(&queue->tq_active);
	if (tb_first != NULL && tb_first->tb_running == TB_DRAIN_WAITER)
		wakeup(tb_first);

	/* Release taskqueue_terminate(). */
	queue->tq_callouts--;
	if ((queue->tq_flags & TQ_FLAGS_ACTIVE) == 0)
		wakeup_one(queue->tq_threads);
}

void
taskqueue_block(struct taskqueue *queue)
{

	TQ_LOCK(queue);
	queue->tq_flags |= TQ_FLAGS_BLOCKED;
	TQ_UNLOCK(queue);
}

void
taskqueue_unblock(struct taskqueue *queue)
{

	TQ_LOCK(queue);
	queue->tq_flags &= ~TQ_FLAGS_BLOCKED;
	if (!STAILQ_EMPTY(&queue->tq_queue))
		queue->tq_enqueue(queue->tq_context);
	TQ_UNLOCK(queue);
}

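/*
 * Run all tasks currently on the queue.  Called with the queue lock
 * held; the lock is dropped around each task handler and reacquired
 * afterwards, and the running task is tracked on tq_active so that
 * cancel and drain can detect it.
 */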
static void
taskqueue_run_locked(struct taskqueue *queue)
{
	struct taskqueue_busy tb;
	struct taskqueue_busy *tb_first;
	struct task *task;
	int pending;

	KASSERT(queue != NULL, ("tq is NULL"));
	TQ_ASSERT_LOCKED(queue);
	tb.tb_running = NULL;

	while (STAILQ_FIRST(&queue->tq_queue)) {
		TAILQ_INSERT_TAIL(&queue->tq_active, &tb, tb_link);

		/*
		 * Carefully remove the first task from the queue and
		 * zero its pending count.
		 */
		task = STAILQ_FIRST(&queue->tq_queue);
		KASSERT(task != NULL, ("task is NULL"));
		STAILQ_REMOVE_HEAD(&queue->tq_queue, ta_link);
		pending = task->ta_pending;
		task->ta_pending = 0;
		tb.tb_running = task;
		TQ_UNLOCK(queue);

		KASSERT(task->ta_func != NULL, ("task->ta_func is NULL"));
		task->ta_func(task->ta_context, pending);

		TQ_LOCK(queue);
		tb.tb_running = NULL;
		if ((task->ta_flags & TASK_SKIP_WAKEUP) == 0)
			wakeup(task);

		TAILQ_REMOVE(&queue->tq_active, &tb, tb_link);
		tb_first = TAILQ_FIRST(&queue->tq_active);
		if (tb_first != NULL &&
		    tb_first->tb_running == TB_DRAIN_WAITER)
			wakeup(tb_first);
	}
}

void
taskqueue_run(struct taskqueue *queue)
{

	TQ_LOCK(queue);
	queue->tq_curthread = curthread;
	taskqueue_run_locked(queue);
	queue->tq_curthread = NULL;
	TQ_UNLOCK(queue);
}

static int
task_is_running(struct taskqueue *queue, struct task *task)
{
	struct taskqueue_busy *tb;

	TQ_ASSERT_LOCKED(queue);
	TAILQ_FOREACH(tb, &queue->tq_active, tb_link) {
		if (tb->tb_running == task)
			return (1);
	}
	return (0);
}

static int
taskqueue_cancel_locked(struct taskqueue *queue, struct task *task,
    u_int *pendp)
{

	if (task->ta_pending > 0)
		STAILQ_REMOVE(&queue->tq_queue, task, task, ta_link);
	if (pendp != NULL)
		*pendp = task->ta_pending;
	task->ta_pending = 0;
	return (task_is_running(queue, task) ? EBUSY : 0);
}

int
taskqueue_cancel(struct taskqueue *queue, struct task *task, u_int *pendp)
{
	int error;

	TQ_LOCK(queue);
	error = taskqueue_cancel_locked(queue, task, pendp);
	TQ_UNLOCK(queue);

	return (error);
}

int
taskqueue_cancel_timeout(struct taskqueue *queue,
    struct timeout_task *timeout_task, u_int *pendp)
{
	u_int pending, pending1;
	int error;

	TQ_LOCK(queue);
	pending = !!(callout_stop(&timeout_task->c) > 0);
	error = taskqueue_cancel_locked(queue, &timeout_task->t, &pending1);
	if ((timeout_task->f & DT_CALLOUT_ARMED) != 0) {
		timeout_task->f &= ~DT_CALLOUT_ARMED;
		queue->tq_callouts--;
	}
	TQ_UNLOCK(queue);

	if (pendp != NULL)
		*pendp = pending + pending1;
	return (error);
}

void
taskqueue_drain(struct taskqueue *queue, struct task *task)
{

	if (!queue->tq_spin)
		WITNESS_WARN(WARN_GIANTOK | WARN_SLEEPOK, NULL, __func__);

	TQ_LOCK(queue);
	while (task->ta_pending != 0 || task_is_running(queue, task))
		TQ_SLEEP(queue, task, &queue->tq_mutex, PWAIT, "-", 0);
	TQ_UNLOCK(queue);
}

void
taskqueue_drain_all(struct taskqueue *queue)
{

	if (!queue->tq_spin)
		WITNESS_WARN(WARN_GIANTOK | WARN_SLEEPOK, NULL, __func__);

	TQ_LOCK(queue);
	taskqueue_drain_tq_queue(queue);
	taskqueue_drain_tq_active(queue);
	TQ_UNLOCK(queue);
}

void
taskqueue_drain_timeout(struct taskqueue *queue,
    struct timeout_task *timeout_task)
{

	callout_drain(&timeout_task->c);
	taskqueue_drain(queue, &timeout_task->t);
}

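/*
 * A minimal sketch of the usual teardown order for a consumer of the
 * drain/free API above ("sc", sc_callout and sc_task are placeholder
 * consumer names, not part of this file): first stop the sources of
 * new enqueues, then wait for anything in flight, then free.
 *
 *	callout_drain(&sc->sc_callout);
 *	taskqueue_drain(sc->sc_tq, &sc->sc_task);
 *	taskqueue_free(sc->sc_tq);
 *
 * taskqueue_free() runs any remaining tasks before its threads exit,
 * so the explicit drain mainly guarantees that the handler is no
 * longer touching the softc while the rest of the state is torn down.
 */
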
static void
taskqueue_swi_enqueue(void *context)
{
	swi_sched(taskqueue_ih, 0);
}

static void
taskqueue_swi_run(void *dummy)
{
	taskqueue_run(taskqueue_swi);
}

static void
taskqueue_swi_giant_enqueue(void *context)
{
	swi_sched(taskqueue_giant_ih, 0);
}

static void
taskqueue_swi_giant_run(void *dummy)
{
	taskqueue_run(taskqueue_swi_giant);
}

static int
_taskqueue_start_threads(struct taskqueue **tqp, int count, int pri,
    cpuset_t *mask, const char *name, va_list ap)
{
	char ktname[MAXCOMLEN + 1];
	struct thread *td;
	struct taskqueue *tq;
	int i, error;

	if (count <= 0)
		return (EINVAL);

	vsnprintf(ktname, sizeof(ktname), name, ap);
	tq = *tqp;

	tq->tq_threads = malloc(sizeof(struct thread *) * count, M_TASKQUEUE,
	    M_NOWAIT | M_ZERO);
	if (tq->tq_threads == NULL) {
		printf("%s: no memory for %s threads\n", __func__, ktname);
		return (ENOMEM);
	}

	for (i = 0; i < count; i++) {
		if (count == 1)
			error = kthread_add(taskqueue_thread_loop, tqp, NULL,
			    &tq->tq_threads[i], RFSTOPPED, 0, "%s", ktname);
		else
			error = kthread_add(taskqueue_thread_loop, tqp, NULL,
			    &tq->tq_threads[i], RFSTOPPED, 0,
			    "%s_%d", ktname, i);
		if (error) {
			/* should be ok to continue, taskqueue_free will dtrt */
			printf("%s: kthread_add(%s): error %d", __func__,
			    ktname, error);
			tq->tq_threads[i] = NULL;	/* paranoid */
		} else
			tq->tq_tcount++;
	}
	for (i = 0; i < count; i++) {
		if (tq->tq_threads[i] == NULL)
			continue;
		td = tq->tq_threads[i];
		if (mask) {
			error = cpuset_setthread(td->td_tid, mask);
			/*
			 * Failing to pin is rarely an actual fatal error;
			 * it'll just affect performance.
			 */
			if (error)
				printf("%s: curthread=%llu: can't pin; "
				    "error=%d\n",
				    __func__,
				    (unsigned long long) td->td_tid,
				    error);
		}
		thread_lock(td);
		sched_prio(td, pri);
		sched_add(td, SRQ_BORING);
		thread_unlock(td);
	}

	return (0);
}

int
taskqueue_start_threads(struct taskqueue **tqp, int count, int pri,
    const char *name, ...)
{
	va_list ap;
	int error;

	va_start(ap, name);
	error = _taskqueue_start_threads(tqp, count, pri, NULL, name, ap);
	va_end(ap);
	return (error);
}

int
taskqueue_start_threads_cpuset(struct taskqueue **tqp, int count, int pri,
    cpuset_t *mask, const char *name, ...)
{
	va_list ap;
	int error;

	va_start(ap, name);
	error = _taskqueue_start_threads(tqp, count, pri, mask, name, ap);
	va_end(ap);
	return (error);
}

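/*
 * Typical creation of a private, thread-backed taskqueue, for
 * reference (a sketch only; "sc", the queue name and the priority are
 * supplied by the consumer):
 *
 *	sc->sc_tq = taskqueue_create("mydev_tq", M_WAITOK,
 *	    taskqueue_thread_enqueue, &sc->sc_tq);
 *	taskqueue_start_threads(&sc->sc_tq, 1, PI_NET, "%s taskq",
 *	    device_get_nameunit(sc->sc_dev));
 *
 * Passing &sc->sc_tq as the enqueue context is what allows
 * taskqueue_thread_enqueue() below to find the queue to wake up.
 */
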
static inline void
taskqueue_run_callback(struct taskqueue *tq,
    enum taskqueue_callback_type cb_type)
{
	taskqueue_callback_fn tq_callback;

	TQ_ASSERT_UNLOCKED(tq);
	tq_callback = tq->tq_callbacks[cb_type];
	if (tq_callback != NULL)
		tq_callback(tq->tq_cb_contexts[cb_type]);
}

void
taskqueue_thread_loop(void *arg)
{
	struct taskqueue **tqp, *tq;

	tqp = arg;
	tq = *tqp;
	taskqueue_run_callback(tq, TASKQUEUE_CALLBACK_TYPE_INIT);
	TQ_LOCK(tq);
	tq->tq_curthread = curthread;
	while ((tq->tq_flags & TQ_FLAGS_ACTIVE) != 0) {
		/* XXX ? */
		taskqueue_run_locked(tq);
		/*
		 * Because taskqueue_run() can drop tq_mutex, we need to
		 * check whether the TQ_FLAGS_ACTIVE flag was cleared in
		 * the meantime, which would mean we missed a wakeup.
		 */
		if ((tq->tq_flags & TQ_FLAGS_ACTIVE) == 0)
			break;
		TQ_SLEEP(tq, tq, &tq->tq_mutex, 0, "-", 0);
	}
	taskqueue_run_locked(tq);
	tq->tq_curthread = NULL;
	/*
	 * This thread is on its way out, so just drop the lock temporarily
	 * in order to call the shutdown callback.  This allows the callback
	 * to look at the taskqueue, even just before it dies.
	 */
	TQ_UNLOCK(tq);
	taskqueue_run_callback(tq, TASKQUEUE_CALLBACK_TYPE_SHUTDOWN);
	TQ_LOCK(tq);

	/* rendezvous with thread that asked us to terminate */
	tq->tq_tcount--;
	wakeup_one(tq->tq_threads);
	TQ_UNLOCK(tq);
	kthread_exit();
}

void
taskqueue_thread_enqueue(void *context)
{
	struct taskqueue **tqp, *tq;

	tqp = context;
	tq = *tqp;
	if (tq->tq_curthread != curthread)
		wakeup_one(tq);
}

TASKQUEUE_DEFINE(swi, taskqueue_swi_enqueue, NULL,
		 swi_add(NULL, "task queue", taskqueue_swi_run, NULL, SWI_TQ,
		     INTR_MPSAFE, &taskqueue_ih));

TASKQUEUE_DEFINE(swi_giant, taskqueue_swi_giant_enqueue, NULL,
		 swi_add(NULL, "Giant taskq", taskqueue_swi_giant_run,
		     NULL, SWI_TQ_GIANT, 0, &taskqueue_giant_ih));

TASKQUEUE_DEFINE_THREAD(thread);

struct taskqueue *
taskqueue_create_fast(const char *name, int mflags,
		 taskqueue_enqueue_fn enqueue, void *context)
{
	return _taskqueue_create(name, mflags, enqueue, context,
			MTX_SPIN, "fast_taskqueue");
}

static void	*taskqueue_fast_ih;

static void
taskqueue_fast_enqueue(void *context)
{
	swi_sched(taskqueue_fast_ih, 0);
}

static void
taskqueue_fast_run(void *dummy)
{
	taskqueue_run(taskqueue_fast);
}

TASKQUEUE_FAST_DEFINE(fast, taskqueue_fast_enqueue, NULL,
	swi_add(NULL, "fast taskq", taskqueue_fast_run, NULL,
	SWI_TQ_FAST, INTR_MPSAFE, &taskqueue_fast_ih));

int
taskqueue_member(struct taskqueue *queue, struct thread *td)
{
	int i, j, ret = 0;

	for (i = 0, j = 0; ; i++) {
		if (queue->tq_threads[i] == NULL)
			continue;
		if (queue->tq_threads[i] == td) {
			ret = 1;
			break;
		}
		if (++j >= queue->tq_tcount)
			break;
	}
	return (ret);
}

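/*
 * Group taskqueues: a set of per-CPU, spin-mutex backed taskqueues with
 * one thread each.  A grouptask is attached to the least loaded queue
 * (or to a specific CPU) and stays there, which lets consumers such as
 * network drivers spread deferred work across CPUs and keep it close
 * to the interrupt that produces it.
 */
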
struct taskqgroup_cpu {
	LIST_HEAD(, grouptask)	tgc_tasks;
	struct taskqueue	*tgc_taskq;
	int	tgc_cnt;
	int	tgc_cpu;
};

struct taskqgroup {
	struct taskqgroup_cpu tqg_queue[MAXCPU];
	struct mtx	tqg_lock;
	char *		tqg_name;
	int		tqg_adjusting;
	int		tqg_stride;
	int		tqg_cnt;
};

struct taskq_bind_task {
	struct task bt_task;
	int	bt_cpuid;
};

static void
taskqgroup_cpu_create(struct taskqgroup *qgroup, int idx)
{
	struct taskqgroup_cpu *qcpu;

	qcpu = &qgroup->tqg_queue[idx];
	LIST_INIT(&qcpu->tgc_tasks);
	qcpu->tgc_taskq = taskqueue_create_fast(NULL, M_WAITOK,
	    taskqueue_thread_enqueue, &qcpu->tgc_taskq);
	taskqueue_start_threads(&qcpu->tgc_taskq, 1, PI_SOFT,
	    "%s_%d", qgroup->tqg_name, idx);
	qcpu->tgc_cpu = idx * qgroup->tqg_stride;
}

static void
taskqgroup_cpu_remove(struct taskqgroup *qgroup, int idx)
{

	taskqueue_free(qgroup->tqg_queue[idx].tgc_taskq);
}

/*
 * Find the taskq with the fewest tasks that does not already service
 * this uniq identifier.
 */
static int
taskqgroup_find(struct taskqgroup *qgroup, void *uniq)
{
	struct grouptask *n;
	int i, idx, mincnt;
	int strict;

	mtx_assert(&qgroup->tqg_lock, MA_OWNED);
	if (qgroup->tqg_cnt == 0)
		return (0);
	idx = -1;
	mincnt = INT_MAX;
	/*
	 * Two passes: first scan for a queue with the fewest tasks that
	 * does not already service this uniq id.  If that fails, simply
	 * pick the queue with the fewest total tasks.
	 */
	for (strict = 1; mincnt == INT_MAX; strict = 0) {
		for (i = 0; i < qgroup->tqg_cnt; i++) {
			if (qgroup->tqg_queue[i].tgc_cnt > mincnt)
				continue;
			if (strict) {
				LIST_FOREACH(n,
				    &qgroup->tqg_queue[i].tgc_tasks, gt_list)
					if (n->gt_uniq == uniq)
						break;
				if (n != NULL)
					continue;
			}
			mincnt = qgroup->tqg_queue[i].tgc_cnt;
			idx = i;
		}
	}
	if (idx == -1)
		panic("taskqgroup_find: Failed to pick a qid.");

	return (idx);
}

void
taskqgroup_attach(struct taskqgroup *qgroup, struct grouptask *gtask,
    void *uniq, int irq, char *name)
{
	cpuset_t mask;
	int qid;

	gtask->gt_uniq = uniq;
	gtask->gt_name = name;
	gtask->gt_irq = irq;
	gtask->gt_cpu = -1;
	mtx_lock(&qgroup->tqg_lock);
	qid = taskqgroup_find(qgroup, uniq);
	qgroup->tqg_queue[qid].tgc_cnt++;
	LIST_INSERT_HEAD(&qgroup->tqg_queue[qid].tgc_tasks, gtask, gt_list);
	gtask->gt_taskqueue = qgroup->tqg_queue[qid].tgc_taskq;
	if (irq != -1 && smp_started) {
		CPU_ZERO(&mask);
		CPU_SET(qgroup->tqg_queue[qid].tgc_cpu, &mask);
		mtx_unlock(&qgroup->tqg_lock);
		intr_setaffinity(irq, &mask);
	} else
		mtx_unlock(&qgroup->tqg_lock);
}

int
taskqgroup_attach_cpu(struct taskqgroup *qgroup, struct grouptask *gtask,
	void *uniq, int cpu, int irq, char *name)
{
	cpuset_t mask;
	int i, qid;

	qid = -1;
	gtask->gt_uniq = uniq;
	gtask->gt_name = name;
	gtask->gt_irq = irq;
	gtask->gt_cpu = cpu;
	mtx_lock(&qgroup->tqg_lock);
	if (smp_started) {
		for (i = 0; i < qgroup->tqg_cnt; i++)
			if (qgroup->tqg_queue[i].tgc_cpu == cpu) {
				qid = i;
				break;
			}
		if (qid == -1) {
			mtx_unlock(&qgroup->tqg_lock);
			return (EINVAL);
		}
	} else
		qid = 0;
	qgroup->tqg_queue[qid].tgc_cnt++;
	LIST_INSERT_HEAD(&qgroup->tqg_queue[qid].tgc_tasks, gtask, gt_list);
	gtask->gt_taskqueue = qgroup->tqg_queue[qid].tgc_taskq;
	if (irq != -1 && smp_started) {
		CPU_ZERO(&mask);
		CPU_SET(qgroup->tqg_queue[qid].tgc_cpu, &mask);
		mtx_unlock(&qgroup->tqg_lock);
		intr_setaffinity(irq, &mask);
	} else
		mtx_unlock(&qgroup->tqg_lock);
	return (0);
}

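/*
 * A sketch of how a driver might attach a group task, assuming the
 * GROUPTASK_INIT() initializer from <sys/taskqueue.h>; "my_qgroup",
 * "qs", "sc" and my_rx_task() are placeholders, not part of this file:
 *
 *	GROUPTASK_INIT(&qs->rx_gtask, 0, my_rx_task, qs);
 *	taskqgroup_attach(my_qgroup, &qs->rx_gtask, sc, -1, "rx");
 *
 * Passing the softc as the "uniq" value spreads that device's group
 * tasks across different queues where possible; an irq of -1 skips
 * interrupt affinity.
 */
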
void
taskqgroup_detach(struct taskqgroup *qgroup, struct grouptask *gtask)
{
	int i;

	mtx_lock(&qgroup->tqg_lock);
	for (i = 0; i < qgroup->tqg_cnt; i++)
		if (qgroup->tqg_queue[i].tgc_taskq == gtask->gt_taskqueue)
			break;
	if (i == qgroup->tqg_cnt)
		panic("taskqgroup_detach: task not in group\n");
	qgroup->tqg_queue[i].tgc_cnt--;
	LIST_REMOVE(gtask, gt_list);
	mtx_unlock(&qgroup->tqg_lock);
	gtask->gt_taskqueue = NULL;
}

static void
taskqgroup_binder(void *ctx, int pending)
{
	struct taskq_bind_task *task = (struct taskq_bind_task *)ctx;
	cpuset_t mask;
	int error;

	CPU_ZERO(&mask);
	CPU_SET(task->bt_cpuid, &mask);
	error = cpuset_setthread(curthread->td_tid, &mask);
	thread_lock(curthread);
	sched_bind(curthread, task->bt_cpuid);
	thread_unlock(curthread);

	if (error)
		printf("taskqgroup_binder: setaffinity failed: %d\n",
		    error);
	free(task, M_DEVBUF);
}

static void
taskqgroup_bind(struct taskqgroup *qgroup)
{
	struct taskq_bind_task *task;
	int i;

	/*
	 * Bind taskqueue threads to specific CPUs, if they have been assigned
	 * one.
	 */
	for (i = 0; i < qgroup->tqg_cnt; i++) {
		task = malloc(sizeof (*task), M_DEVBUF, M_NOWAIT);
		TASK_INIT(&task->bt_task, 0, taskqgroup_binder, task);
		task->bt_cpuid = qgroup->tqg_queue[i].tgc_cpu;
		taskqueue_enqueue(qgroup->tqg_queue[i].tgc_taskq,
		    &task->bt_task);
	}
}

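/*
 * Resize the group to 'cnt' queues spaced 'stride' CPUs apart,
 * redistribute all attached group tasks across the new queues and
 * re-establish CPU and interrupt affinity.  Called with the group lock
 * held; the lock is dropped while queues are created or torn down.
 */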
static int
_taskqgroup_adjust(struct taskqgroup *qgroup, int cnt, int stride)
{
	LIST_HEAD(, grouptask) gtask_head = LIST_HEAD_INITIALIZER(NULL);
	cpuset_t mask;
	struct grouptask *gtask;
	int i, old_cnt, qid;

	mtx_assert(&qgroup->tqg_lock, MA_OWNED);

	if (cnt < 1 || cnt * stride > mp_ncpus || !smp_started) {
		printf("taskqgroup_adjust failed cnt: %d stride: %d mp_ncpus: %d smp_started: %d\n",
		    cnt, stride, mp_ncpus, smp_started);
		return (EINVAL);
	}
	if (qgroup->tqg_adjusting) {
		printf("taskqgroup_adjust failed: adjusting\n");
		return (EBUSY);
	}
	qgroup->tqg_adjusting = 1;
	old_cnt = qgroup->tqg_cnt;
	mtx_unlock(&qgroup->tqg_lock);
	/*
	 * Set up queue for tasks added before boot.
	 */
	if (old_cnt == 0) {
		LIST_SWAP(&gtask_head, &qgroup->tqg_queue[0].tgc_tasks,
		    grouptask, gt_list);
		qgroup->tqg_queue[0].tgc_cnt = 0;
	}

	/*
	 * If new taskq threads have been added.
	 */
	for (i = old_cnt; i < cnt; i++)
		taskqgroup_cpu_create(qgroup, i);
	mtx_lock(&qgroup->tqg_lock);
	qgroup->tqg_cnt = cnt;
	qgroup->tqg_stride = stride;

	/*
	 * Adjust drivers to use new taskqs.
	 */
	for (i = 0; i < old_cnt; i++) {
		while ((gtask = LIST_FIRST(&qgroup->tqg_queue[i].tgc_tasks))) {
			LIST_REMOVE(gtask, gt_list);
			qgroup->tqg_queue[i].tgc_cnt--;
			LIST_INSERT_HEAD(&gtask_head, gtask, gt_list);
		}
	}

	while ((gtask = LIST_FIRST(&gtask_head))) {
		LIST_REMOVE(gtask, gt_list);
		if (gtask->gt_cpu == -1)
			qid = taskqgroup_find(qgroup, gtask->gt_uniq);
		else {
			for (i = 0; i < qgroup->tqg_cnt; i++)
				if (qgroup->tqg_queue[i].tgc_cpu == gtask->gt_cpu) {
					qid = i;
					break;
				}
		}
		qgroup->tqg_queue[qid].tgc_cnt++;
		LIST_INSERT_HEAD(&qgroup->tqg_queue[qid].tgc_tasks, gtask,
		    gt_list);
		gtask->gt_taskqueue = qgroup->tqg_queue[qid].tgc_taskq;
	}
	/*
	 * Set new CPU and IRQ affinity
	 */
	for (i = 0; i < cnt; i++) {
		qgroup->tqg_queue[i].tgc_cpu = i * qgroup->tqg_stride;
		CPU_ZERO(&mask);
		CPU_SET(qgroup->tqg_queue[i].tgc_cpu, &mask);
		LIST_FOREACH(gtask, &qgroup->tqg_queue[i].tgc_tasks, gt_list) {
			if (gtask->gt_irq == -1)
				continue;
			intr_setaffinity(gtask->gt_irq, &mask);
		}
	}
	mtx_unlock(&qgroup->tqg_lock);

	/*
	 * If taskq thread count has been reduced.
	 */
	for (i = cnt; i < old_cnt; i++)
		taskqgroup_cpu_remove(qgroup, i);

	mtx_lock(&qgroup->tqg_lock);
	qgroup->tqg_adjusting = 0;

	taskqgroup_bind(qgroup);

	return (0);
}

int
taskqgroup_adjust(struct taskqgroup *qgroup, int cpu, int stride)
{
	int error;

	mtx_lock(&qgroup->tqg_lock);
	error = _taskqgroup_adjust(qgroup, cpu, stride);
	mtx_unlock(&qgroup->tqg_lock);

	return (error);
}

struct taskqgroup *
taskqgroup_create(char *name)
{
	struct taskqgroup *qgroup;

	qgroup = malloc(sizeof(*qgroup), M_TASKQUEUE, M_WAITOK | M_ZERO);
	mtx_init(&qgroup->tqg_lock, "taskqgroup", NULL, MTX_DEF);
	qgroup->tqg_name = name;
	LIST_INIT(&qgroup->tqg_queue[0].tgc_tasks);

	return (qgroup);
}

void
taskqgroup_destroy(struct taskqgroup *qgroup)
{

}