/*-
 * Copyright (c) 2000 Doug Rabson
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
 * SUCH DAMAGE.
 */

#include <sys/cdefs.h>
__FBSDID("$FreeBSD$");

#include <sys/param.h>
#include <sys/systm.h>
#include <sys/bus.h>
#include <sys/cpuset.h>
#include <sys/interrupt.h>
#include <sys/kernel.h>
#include <sys/kthread.h>
#include <sys/libkern.h>
#include <sys/limits.h>
#include <sys/lock.h>
#include <sys/malloc.h>
#include <sys/mutex.h>
#include <sys/proc.h>
#include <sys/sched.h>
#include <sys/smp.h>
#include <sys/taskqueue.h>
#include <sys/unistd.h>
#include <machine/stdarg.h>

static MALLOC_DEFINE(M_TASKQUEUE, "taskqueue", "Task Queues");
static void	*taskqueue_giant_ih;
static void	*taskqueue_ih;
static void	 taskqueue_fast_enqueue(void *);
static void	 taskqueue_swi_enqueue(void *);
static void	 taskqueue_swi_giant_enqueue(void *);

struct taskqueue_busy {
	struct task	*tb_running;
	TAILQ_ENTRY(taskqueue_busy) tb_link;
};

struct task * const TB_DRAIN_WAITER = (struct task *)0x1;

struct taskqueue {
	STAILQ_HEAD(, task)	tq_queue;
	taskqueue_enqueue_fn	tq_enqueue;
	void			*tq_context;
	char			*tq_name;
	TAILQ_HEAD(, taskqueue_busy) tq_active;
	struct mtx		tq_mutex;
	struct thread		**tq_threads;
	int			tq_tcount;
	int			tq_spin;
	int			tq_flags;
	int			tq_callouts;
	taskqueue_callback_fn	tq_callbacks[TASKQUEUE_NUM_CALLBACKS];
	void			*tq_cb_contexts[TASKQUEUE_NUM_CALLBACKS];
};

#define	TQ_FLAGS_ACTIVE		(1 << 0)
#define	TQ_FLAGS_BLOCKED	(1 << 1)
#define	TQ_FLAGS_UNLOCKED_ENQUEUE	(1 << 2)

#define	DT_CALLOUT_ARMED	(1 << 0)

#define	TQ_LOCK(tq)						\
	do {							\
		if ((tq)->tq_spin)				\
			mtx_lock_spin(&(tq)->tq_mutex);		\
		else						\
			mtx_lock(&(tq)->tq_mutex);		\
	} while (0)
#define	TQ_ASSERT_LOCKED(tq)	mtx_assert(&(tq)->tq_mutex, MA_OWNED)

#define	TQ_UNLOCK(tq)						\
	do {							\
		if ((tq)->tq_spin)				\
			mtx_unlock_spin(&(tq)->tq_mutex);	\
		else						\
			mtx_unlock(&(tq)->tq_mutex);		\
	} while (0)
#define	TQ_ASSERT_UNLOCKED(tq)	mtx_assert(&(tq)->tq_mutex, MA_NOTOWNED)
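
/*
 * Example (illustrative sketch, not part of the original file): the basic
 * consumer pattern is to initialize a struct task once with TASK_INIT()
 * and hand it to taskqueue_enqueue() as often as needed; enqueues of a
 * still-pending task are coalesced via ta_pending.  The names foo_task,
 * foo_task_fn, foo_init and foo_kick are hypothetical; taskqueue_thread
 * is the system queue defined later in this file.
 */
#if 0
static struct task foo_task;

static void
foo_task_fn(void *context, int pending)
{
	/* "pending" is how many enqueues were folded into this single run. */
}

static void
foo_init(void)
{
	TASK_INIT(&foo_task, 0, foo_task_fn, NULL);
}

static void
foo_kick(void)
{
	/* Cheap to call; the work itself runs later in a kernel thread. */
	taskqueue_enqueue(taskqueue_thread, &foo_task);
}
#endif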
void
_timeout_task_init(struct taskqueue *queue, struct timeout_task *timeout_task,
    int priority, task_fn_t func, void *context)
{

	TASK_INIT(&timeout_task->t, priority, func, context);
	callout_init_mtx(&timeout_task->c, &queue->tq_mutex,
	    CALLOUT_RETURNUNLOCKED);
	timeout_task->q = queue;
	timeout_task->f = 0;
}

static __inline int
TQ_SLEEP(struct taskqueue *tq, void *p, struct mtx *m, int pri, const char *wm,
    int t)
{
	if (tq->tq_spin)
		return (msleep_spin(p, m, wm, t));
	return (msleep(p, m, pri, wm, t));
}

static struct taskqueue *
_taskqueue_create(const char *name, int mflags,
		 taskqueue_enqueue_fn enqueue, void *context,
		 int mtxflags, const char *mtxname __unused)
{
	struct taskqueue *queue;
	char *tq_name;

	tq_name = malloc(TASKQUEUE_NAMELEN, M_TASKQUEUE, mflags | M_ZERO);
	if (tq_name == NULL)
		return (NULL);

	queue = malloc(sizeof(struct taskqueue), M_TASKQUEUE, mflags | M_ZERO);
	if (queue == NULL) {
		free(tq_name, M_TASKQUEUE);
		return (NULL);
	}

	snprintf(tq_name, TASKQUEUE_NAMELEN, "%s", (name) ? name : "taskqueue");

	STAILQ_INIT(&queue->tq_queue);
	TAILQ_INIT(&queue->tq_active);
	queue->tq_enqueue = enqueue;
	queue->tq_context = context;
	queue->tq_name = tq_name;
	queue->tq_spin = (mtxflags & MTX_SPIN) != 0;
	queue->tq_flags |= TQ_FLAGS_ACTIVE;
	if (enqueue == taskqueue_fast_enqueue ||
	    enqueue == taskqueue_swi_enqueue ||
	    enqueue == taskqueue_swi_giant_enqueue ||
	    enqueue == taskqueue_thread_enqueue)
		queue->tq_flags |= TQ_FLAGS_UNLOCKED_ENQUEUE;
	mtx_init(&queue->tq_mutex, tq_name, NULL, mtxflags);

	return (queue);
}

struct taskqueue *
taskqueue_create(const char *name, int mflags,
		 taskqueue_enqueue_fn enqueue, void *context)
{

	return _taskqueue_create(name, mflags, enqueue, context,
	    MTX_DEF, name);
}

void
taskqueue_set_callback(struct taskqueue *queue,
    enum taskqueue_callback_type cb_type, taskqueue_callback_fn callback,
    void *context)
{

	KASSERT(((cb_type >= TASKQUEUE_CALLBACK_TYPE_MIN) &&
	    (cb_type <= TASKQUEUE_CALLBACK_TYPE_MAX)),
	    ("Callback type %d not valid, must be %d-%d", cb_type,
	    TASKQUEUE_CALLBACK_TYPE_MIN, TASKQUEUE_CALLBACK_TYPE_MAX));
	KASSERT((queue->tq_callbacks[cb_type] == NULL),
	    ("Re-initialization of taskqueue callback?"));

	queue->tq_callbacks[cb_type] = callback;
	queue->tq_cb_contexts[cb_type] = context;
}

/*
 * Signal a taskqueue thread to terminate.
 */
static void
taskqueue_terminate(struct thread **pp, struct taskqueue *tq)
{

	while (tq->tq_tcount > 0 || tq->tq_callouts > 0) {
		wakeup(tq);
		TQ_SLEEP(tq, pp, &tq->tq_mutex, PWAIT, "taskqueue_destroy", 0);
	}
}

void
taskqueue_free(struct taskqueue *queue)
{

	TQ_LOCK(queue);
	queue->tq_flags &= ~TQ_FLAGS_ACTIVE;
	taskqueue_terminate(queue->tq_threads, queue);
	KASSERT(TAILQ_EMPTY(&queue->tq_active), ("Tasks still running?"));
	KASSERT(queue->tq_callouts == 0, ("Armed timeout tasks"));
	mtx_destroy(&queue->tq_mutex);
	free(queue->tq_threads, M_TASKQUEUE);
	free(queue->tq_name, M_TASKQUEUE);
	free(queue, M_TASKQUEUE);
}
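
/*
 * Example (illustrative sketch, not part of the original file): a private
 * queue serviced by its own kernel thread, with a shutdown callback and a
 * matching teardown.  foo_tq, foo_shutdown_cb and the thread name are
 * hypothetical.
 */
#if 0
static struct taskqueue *foo_tq;

static void
foo_shutdown_cb(void *arg)
{
	/* Runs in the worker thread just before it exits. */
}

static int
foo_tq_create(void)
{
	foo_tq = taskqueue_create("foo_tq", M_WAITOK,
	    taskqueue_thread_enqueue, &foo_tq);
	taskqueue_set_callback(foo_tq, TASKQUEUE_CALLBACK_TYPE_SHUTDOWN,
	    foo_shutdown_cb, NULL);
	return (taskqueue_start_threads(&foo_tq, 1, PWAIT, "foo taskq"));
}

static void
foo_tq_destroy(void)
{
	/* Blocks until queued tasks, armed timeouts and threads are gone. */
	taskqueue_free(foo_tq);
	foo_tq = NULL;
}
#endif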
static int
taskqueue_enqueue_locked(struct taskqueue *queue, struct task *task)
{
	struct task *ins;
	struct task *prev;

	KASSERT(task->ta_func != NULL, ("enqueueing task with NULL func"));
	/*
	 * Count multiple enqueues.
	 */
	if (task->ta_pending) {
		if (task->ta_pending < USHRT_MAX)
			task->ta_pending++;
		TQ_UNLOCK(queue);
		return (0);
	}

	/*
	 * Optimise the case when all tasks have the same priority.
	 */
	prev = STAILQ_LAST(&queue->tq_queue, task, ta_link);
	if (!prev || prev->ta_priority >= task->ta_priority) {
		STAILQ_INSERT_TAIL(&queue->tq_queue, task, ta_link);
	} else {
		prev = NULL;
		for (ins = STAILQ_FIRST(&queue->tq_queue); ins;
		     prev = ins, ins = STAILQ_NEXT(ins, ta_link))
			if (ins->ta_priority < task->ta_priority)
				break;

		if (prev)
			STAILQ_INSERT_AFTER(&queue->tq_queue, prev, task, ta_link);
		else
			STAILQ_INSERT_HEAD(&queue->tq_queue, task, ta_link);
	}

	task->ta_pending = 1;
	if ((queue->tq_flags & TQ_FLAGS_UNLOCKED_ENQUEUE) != 0)
		TQ_UNLOCK(queue);
	if ((queue->tq_flags & TQ_FLAGS_BLOCKED) == 0)
		queue->tq_enqueue(queue->tq_context);
	if ((queue->tq_flags & TQ_FLAGS_UNLOCKED_ENQUEUE) == 0)
		TQ_UNLOCK(queue);

	/* Return with lock released. */
	return (0);
}

int
grouptaskqueue_enqueue(struct taskqueue *queue, struct task *task)
{
	TQ_LOCK(queue);
	if (task->ta_pending) {
		TQ_UNLOCK(queue);
		return (0);
	}
	STAILQ_INSERT_TAIL(&queue->tq_queue, task, ta_link);
	task->ta_pending = 1;
	TQ_UNLOCK(queue);
	if ((queue->tq_flags & TQ_FLAGS_BLOCKED) == 0)
		queue->tq_enqueue(queue->tq_context);
	return (0);
}

int
taskqueue_enqueue(struct taskqueue *queue, struct task *task)
{
	int res;

	TQ_LOCK(queue);
	res = taskqueue_enqueue_locked(queue, task);
	/* The lock is released inside. */

	return (res);
}

static void
taskqueue_timeout_func(void *arg)
{
	struct taskqueue *queue;
	struct timeout_task *timeout_task;

	timeout_task = arg;
	queue = timeout_task->q;
	KASSERT((timeout_task->f & DT_CALLOUT_ARMED) != 0, ("Stray timeout"));
	timeout_task->f &= ~DT_CALLOUT_ARMED;
	queue->tq_callouts--;
	taskqueue_enqueue_locked(timeout_task->q, &timeout_task->t);
	/* The lock is released inside. */
}

int
taskqueue_enqueue_timeout(struct taskqueue *queue,
    struct timeout_task *timeout_task, int ticks)
{
	int res;

	TQ_LOCK(queue);
	KASSERT(timeout_task->q == NULL || timeout_task->q == queue,
	    ("Migrated queue"));
	KASSERT(!queue->tq_spin, ("Timeout for spin-queue"));
	timeout_task->q = queue;
	res = timeout_task->t.ta_pending;
	if (ticks == 0) {
		taskqueue_enqueue_locked(queue, &timeout_task->t);
		/* The lock is released inside. */
	} else {
		if ((timeout_task->f & DT_CALLOUT_ARMED) != 0) {
			res++;
		} else {
			queue->tq_callouts++;
			timeout_task->f |= DT_CALLOUT_ARMED;
			if (ticks < 0)
				ticks = -ticks; /* Ignore overflow. */
		}
		if (ticks > 0) {
			callout_reset(&timeout_task->c, ticks,
			    taskqueue_timeout_func, timeout_task);
		}
		TQ_UNLOCK(queue);
	}
	return (res);
}
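
/*
 * Example (illustrative sketch, not part of the original file): deferring
 * work by a number of ticks with a timeout task.  The foo_tmo names are
 * hypothetical; TIMEOUT_TASK_INIT() is the <sys/taskqueue.h> wrapper
 * around _timeout_task_init() above.
 */
#if 0
static struct timeout_task foo_tmo;

static void
foo_tmo_fn(void *context, int pending)
{
	/* Runs on taskqueue_thread roughly "hz" ticks after being armed. */
}

static void
foo_tmo_setup(void)
{
	TIMEOUT_TASK_INIT(taskqueue_thread, &foo_tmo, 0, foo_tmo_fn, NULL);
}

static void
foo_tmo_arm(void)
{
	taskqueue_enqueue_timeout(taskqueue_thread, &foo_tmo, hz);
}

static void
foo_tmo_stop(void)
{
	/* Stop the callout; wait if the handler has already started. */
	if (taskqueue_cancel_timeout(taskqueue_thread, &foo_tmo, NULL) != 0)
		taskqueue_drain_timeout(taskqueue_thread, &foo_tmo);
}
#endif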
static void
taskqueue_task_nop_fn(void *context, int pending)
{
}

/*
 * Block until all currently queued tasks in this taskqueue
 * have begun execution.  Tasks queued during execution of
 * this function are ignored.
 */
static void
taskqueue_drain_tq_queue(struct taskqueue *queue)
{
	struct task t_barrier;

	if (STAILQ_EMPTY(&queue->tq_queue))
		return;

	/*
	 * Enqueue our barrier after all current tasks, but with
	 * the highest priority so that newly queued tasks cannot
	 * pass it.  Because of the high priority, we can not use
	 * taskqueue_enqueue_locked directly (which drops the lock
	 * anyway) so just insert it at tail while we have the
	 * queue lock.
	 */
	TASK_INIT(&t_barrier, USHRT_MAX, taskqueue_task_nop_fn, &t_barrier);
	STAILQ_INSERT_TAIL(&queue->tq_queue, &t_barrier, ta_link);
	t_barrier.ta_pending = 1;

	/*
	 * Once the barrier has executed, all previously queued tasks
	 * have completed or are currently executing.
	 */
	while (t_barrier.ta_pending != 0)
		TQ_SLEEP(queue, &t_barrier, &queue->tq_mutex, PWAIT, "-", 0);
}

/*
 * Block until all currently executing tasks for this taskqueue
 * complete.  Tasks that begin execution during the execution
 * of this function are ignored.
 */
static void
taskqueue_drain_tq_active(struct taskqueue *queue)
{
	struct taskqueue_busy tb_marker, *tb_first;

	if (TAILQ_EMPTY(&queue->tq_active))
		return;

	/* Block taskqueue_terminate(). */
	queue->tq_callouts++;

	/*
	 * Wait for all currently executing taskqueue threads
	 * to go idle.
	 */
	tb_marker.tb_running = TB_DRAIN_WAITER;
	TAILQ_INSERT_TAIL(&queue->tq_active, &tb_marker, tb_link);
	while (TAILQ_FIRST(&queue->tq_active) != &tb_marker)
		TQ_SLEEP(queue, &tb_marker, &queue->tq_mutex, PWAIT, "-", 0);
	TAILQ_REMOVE(&queue->tq_active, &tb_marker, tb_link);

	/*
	 * Wakeup any other drain waiter that happened to queue up
	 * without any intervening active thread.
	 */
	tb_first = TAILQ_FIRST(&queue->tq_active);
	if (tb_first != NULL && tb_first->tb_running == TB_DRAIN_WAITER)
		wakeup(tb_first);

	/* Release taskqueue_terminate(). */
	queue->tq_callouts--;
	if ((queue->tq_flags & TQ_FLAGS_ACTIVE) == 0)
		wakeup_one(queue->tq_threads);
}

void
taskqueue_block(struct taskqueue *queue)
{

	TQ_LOCK(queue);
	queue->tq_flags |= TQ_FLAGS_BLOCKED;
	TQ_UNLOCK(queue);
}

void
taskqueue_unblock(struct taskqueue *queue)
{

	TQ_LOCK(queue);
	queue->tq_flags &= ~TQ_FLAGS_BLOCKED;
	if (!STAILQ_EMPTY(&queue->tq_queue))
		queue->tq_enqueue(queue->tq_context);
	TQ_UNLOCK(queue);
}
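
/*
 * Example (illustrative sketch, not part of the original file): using
 * block/unblock around a reconfiguration.  While blocked, enqueued tasks
 * are held pending rather than dispatched; note that taskqueue_block()
 * does not wait for a handler that is already running (use
 * taskqueue_drain() for that).  foo_tq and foo_reconfigure() are
 * hypothetical.
 */
#if 0
static void foo_reconfigure(void);

static void
foo_reconfigure_quiesced(struct taskqueue *foo_tq)
{
	taskqueue_block(foo_tq);	/* Hold new dispatches. */
	foo_reconfigure();
	taskqueue_unblock(foo_tq);	/* Kick anything queued meanwhile. */
}
#endif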
static void
taskqueue_run_locked(struct taskqueue *queue)
{
	struct taskqueue_busy tb;
	struct taskqueue_busy *tb_first;
	struct task *task;
	int pending;

	KASSERT(queue != NULL, ("tq is NULL"));
	TQ_ASSERT_LOCKED(queue);
	tb.tb_running = NULL;

	while (STAILQ_FIRST(&queue->tq_queue)) {
		TAILQ_INSERT_TAIL(&queue->tq_active, &tb, tb_link);

		/*
		 * Carefully remove the first task from the queue and
		 * zero its pending count.
		 */
		task = STAILQ_FIRST(&queue->tq_queue);
		KASSERT(task != NULL, ("task is NULL"));
		STAILQ_REMOVE_HEAD(&queue->tq_queue, ta_link);
		pending = task->ta_pending;
		task->ta_pending = 0;
		tb.tb_running = task;
		TQ_UNLOCK(queue);

		KASSERT(task->ta_func != NULL, ("task->ta_func is NULL"));
		task->ta_func(task->ta_context, pending);

		TQ_LOCK(queue);
		tb.tb_running = NULL;
		wakeup(task);

		TAILQ_REMOVE(&queue->tq_active, &tb, tb_link);
		tb_first = TAILQ_FIRST(&queue->tq_active);
		if (tb_first != NULL &&
		    tb_first->tb_running == TB_DRAIN_WAITER)
			wakeup(tb_first);
	}
}

void
taskqueue_run(struct taskqueue *queue)
{

	TQ_LOCK(queue);
	taskqueue_run_locked(queue);
	TQ_UNLOCK(queue);
}

static int
task_is_running(struct taskqueue *queue, struct task *task)
{
	struct taskqueue_busy *tb;

	TQ_ASSERT_LOCKED(queue);
	TAILQ_FOREACH(tb, &queue->tq_active, tb_link) {
		if (tb->tb_running == task)
			return (1);
	}
	return (0);
}

static int
taskqueue_cancel_locked(struct taskqueue *queue, struct task *task,
    u_int *pendp)
{

	if (task->ta_pending > 0)
		STAILQ_REMOVE(&queue->tq_queue, task, task, ta_link);
	if (pendp != NULL)
		*pendp = task->ta_pending;
	task->ta_pending = 0;
	return (task_is_running(queue, task) ? EBUSY : 0);
}

int
taskqueue_cancel(struct taskqueue *queue, struct task *task, u_int *pendp)
{
	int error;

	TQ_LOCK(queue);
	error = taskqueue_cancel_locked(queue, task, pendp);
	TQ_UNLOCK(queue);

	return (error);
}

int
taskqueue_cancel_timeout(struct taskqueue *queue,
    struct timeout_task *timeout_task, u_int *pendp)
{
	u_int pending, pending1;
	int error;

	TQ_LOCK(queue);
	pending = !!(callout_stop(&timeout_task->c) > 0);
	error = taskqueue_cancel_locked(queue, &timeout_task->t, &pending1);
	if ((timeout_task->f & DT_CALLOUT_ARMED) != 0) {
		timeout_task->f &= ~DT_CALLOUT_ARMED;
		queue->tq_callouts--;
	}
	TQ_UNLOCK(queue);

	if (pendp != NULL)
		*pendp = pending + pending1;
	return (error);
}

void
taskqueue_drain(struct taskqueue *queue, struct task *task)
{

	if (!queue->tq_spin)
		WITNESS_WARN(WARN_GIANTOK | WARN_SLEEPOK, NULL, __func__);

	TQ_LOCK(queue);
	while (task->ta_pending != 0 || task_is_running(queue, task))
		TQ_SLEEP(queue, task, &queue->tq_mutex, PWAIT, "-", 0);
	TQ_UNLOCK(queue);
}

void
taskqueue_drain_all(struct taskqueue *queue)
{

	if (!queue->tq_spin)
		WITNESS_WARN(WARN_GIANTOK | WARN_SLEEPOK, NULL, __func__);

	TQ_LOCK(queue);
	taskqueue_drain_tq_queue(queue);
	taskqueue_drain_tq_active(queue);
	TQ_UNLOCK(queue);
}

void
taskqueue_drain_timeout(struct taskqueue *queue,
    struct timeout_task *timeout_task)
{

	callout_drain(&timeout_task->c);
	taskqueue_drain(queue, &timeout_task->t);
}

static void
taskqueue_swi_enqueue(void *context)
{
	swi_sched(taskqueue_ih, 0);
}

static void
taskqueue_swi_run(void *dummy)
{
	taskqueue_run(taskqueue_swi);
}

static void
taskqueue_swi_giant_enqueue(void *context)
{
	swi_sched(taskqueue_giant_ih, 0);
}

static void
taskqueue_swi_giant_run(void *dummy)
{
	taskqueue_run(taskqueue_swi_giant);
}
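
/*
 * Example (illustrative sketch, not part of the original file): the usual
 * teardown order before freeing a task's context.  taskqueue_cancel()
 * returns EBUSY while the handler is running, in which case the caller
 * must drain.  foo_stop_task and its arguments are hypothetical.
 */
#if 0
static void
foo_stop_task(struct taskqueue *foo_tq, struct task *task)
{
	if (taskqueue_cancel(foo_tq, task, NULL) != 0)
		taskqueue_drain(foo_tq, task);
	/* The handler is now neither queued nor running. */
}
#endif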
static int
_taskqueue_start_threads(struct taskqueue **tqp, int count, int pri,
    cpuset_t *mask, const char *name, va_list ap)
{
	char ktname[MAXCOMLEN + 1];
	struct thread *td;
	struct taskqueue *tq;
	int i, error;

	if (count <= 0)
		return (EINVAL);

	vsnprintf(ktname, sizeof(ktname), name, ap);
	tq = *tqp;

	tq->tq_threads = malloc(sizeof(struct thread *) * count, M_TASKQUEUE,
	    M_NOWAIT | M_ZERO);
	if (tq->tq_threads == NULL) {
		printf("%s: no memory for %s threads\n", __func__, ktname);
		return (ENOMEM);
	}

	for (i = 0; i < count; i++) {
		if (count == 1)
			error = kthread_add(taskqueue_thread_loop, tqp, NULL,
			    &tq->tq_threads[i], RFSTOPPED, 0, "%s", ktname);
		else
			error = kthread_add(taskqueue_thread_loop, tqp, NULL,
			    &tq->tq_threads[i], RFSTOPPED, 0,
			    "%s_%d", ktname, i);
		if (error) {
			/* should be ok to continue, taskqueue_free will dtrt */
			printf("%s: kthread_add(%s): error %d\n", __func__,
			    ktname, error);
			tq->tq_threads[i] = NULL;	/* paranoid */
		} else
			tq->tq_tcount++;
	}
	for (i = 0; i < count; i++) {
		if (tq->tq_threads[i] == NULL)
			continue;
		td = tq->tq_threads[i];
		if (mask) {
			error = cpuset_setthread(td->td_tid, mask);
			/*
			 * Failing to pin is rarely an actual fatal error;
			 * it'll just affect performance.
			 */
			if (error)
				printf("%s: curthread=%llu: can't pin; "
				    "error=%d\n",
				    __func__,
				    (unsigned long long) td->td_tid,
				    error);
		}
		thread_lock(td);
		sched_prio(td, pri);
		sched_add(td, SRQ_BORING);
		thread_unlock(td);
	}

	return (0);
}

int
taskqueue_start_threads(struct taskqueue **tqp, int count, int pri,
    const char *name, ...)
{
	va_list ap;
	int error;

	va_start(ap, name);
	error = _taskqueue_start_threads(tqp, count, pri, NULL, name, ap);
	va_end(ap);
	return (error);
}

int
taskqueue_start_threads_cpuset(struct taskqueue **tqp, int count, int pri,
    cpuset_t *mask, const char *name, ...)
{
	va_list ap;
	int error;

	va_start(ap, name);
	error = _taskqueue_start_threads(tqp, count, pri, mask, name, ap);
	va_end(ap);
	return (error);
}

static inline void
taskqueue_run_callback(struct taskqueue *tq,
    enum taskqueue_callback_type cb_type)
{
	taskqueue_callback_fn tq_callback;

	TQ_ASSERT_UNLOCKED(tq);
	tq_callback = tq->tq_callbacks[cb_type];
	if (tq_callback != NULL)
		tq_callback(tq->tq_cb_contexts[cb_type]);
}
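
/*
 * Example (illustrative sketch, not part of the original file): starting two
 * worker threads for a private queue and restricting them to CPU 0.  The
 * queue pointer foo_tqp and the thread name are hypothetical.
 */
#if 0
static void
foo_start_pinned(struct taskqueue **foo_tqp)
{
	cpuset_t mask;

	CPU_ZERO(&mask);
	CPU_SET(0, &mask);
	/* With count > 1 the thread names become "foo taskq_0", "foo taskq_1". */
	taskqueue_start_threads_cpuset(foo_tqp, 2, PWAIT, &mask, "foo taskq");
}
#endif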
void
taskqueue_thread_loop(void *arg)
{
	struct taskqueue **tqp, *tq;

	tqp = arg;
	tq = *tqp;
	taskqueue_run_callback(tq, TASKQUEUE_CALLBACK_TYPE_INIT);
	TQ_LOCK(tq);
	while ((tq->tq_flags & TQ_FLAGS_ACTIVE) != 0) {
		/* XXX ? */
		taskqueue_run_locked(tq);
		/*
		 * Because taskqueue_run_locked() can drop tq_mutex, we need
		 * to check if the TQ_FLAGS_ACTIVE flag wasn't removed in the
		 * meantime, which means we missed a wakeup.
		 */
		if ((tq->tq_flags & TQ_FLAGS_ACTIVE) == 0)
			break;
		TQ_SLEEP(tq, tq, &tq->tq_mutex, 0, "-", 0);
	}
	taskqueue_run_locked(tq);
	/*
	 * This thread is on its way out, so just drop the lock temporarily
	 * in order to call the shutdown callback.  This allows the callback
	 * to look at the taskqueue, even just before it dies.
	 */
	TQ_UNLOCK(tq);
	taskqueue_run_callback(tq, TASKQUEUE_CALLBACK_TYPE_SHUTDOWN);
	TQ_LOCK(tq);

	/* rendezvous with thread that asked us to terminate */
	tq->tq_tcount--;
	wakeup_one(tq->tq_threads);
	TQ_UNLOCK(tq);
	kthread_exit();
}

void
taskqueue_thread_enqueue(void *context)
{
	struct taskqueue **tqp, *tq;

	tqp = context;
	tq = *tqp;
	wakeup_one(tq);
}

TASKQUEUE_DEFINE(swi, taskqueue_swi_enqueue, NULL,
    swi_add(NULL, "task queue", taskqueue_swi_run, NULL, SWI_TQ,
    INTR_MPSAFE, &taskqueue_ih));

TASKQUEUE_DEFINE(swi_giant, taskqueue_swi_giant_enqueue, NULL,
    swi_add(NULL, "Giant taskq", taskqueue_swi_giant_run,
    NULL, SWI_TQ_GIANT, 0, &taskqueue_giant_ih));

TASKQUEUE_DEFINE_THREAD(thread);

struct taskqueue *
taskqueue_create_fast(const char *name, int mflags,
		 taskqueue_enqueue_fn enqueue, void *context)
{
	return _taskqueue_create(name, mflags, enqueue, context,
	    MTX_SPIN, "fast_taskqueue");
}

static void	*taskqueue_fast_ih;

static void
taskqueue_fast_enqueue(void *context)
{
	swi_sched(taskqueue_fast_ih, 0);
}

static void
taskqueue_fast_run(void *dummy)
{
	taskqueue_run(taskqueue_fast);
}

TASKQUEUE_FAST_DEFINE(fast, taskqueue_fast_enqueue, NULL,
    swi_add(NULL, "fast taskq", taskqueue_fast_run, NULL,
    SWI_TQ_FAST, INTR_MPSAFE, &taskqueue_fast_ih));

int
taskqueue_member(struct taskqueue *queue, struct thread *td)
{
	int i, j, ret = 0;

	for (i = 0, j = 0; ; i++) {
		if (queue->tq_threads[i] == NULL)
			continue;
		if (queue->tq_threads[i] == td) {
			ret = 1;
			break;
		}
		if (++j >= queue->tq_tcount)
			break;
	}
	return (ret);
}

struct taskqgroup_cpu {
	LIST_HEAD(, grouptask)	tgc_tasks;
	struct taskqueue	*tgc_taskq;
	int	tgc_cnt;
	int	tgc_cpu;
};

struct taskqgroup {
	struct taskqgroup_cpu tqg_queue[MAXCPU];
	struct mtx	tqg_lock;
	char *		tqg_name;
	int		tqg_adjusting;
	int		tqg_stride;
	int		tqg_cnt;
};

struct taskq_bind_task {
	struct task bt_task;
	int	bt_cpuid;
};

static void
taskqgroup_cpu_create(struct taskqgroup *qgroup, int idx)
{
	struct taskqgroup_cpu *qcpu;

	qcpu = &qgroup->tqg_queue[idx];
	LIST_INIT(&qcpu->tgc_tasks);
	qcpu->tgc_taskq = taskqueue_create_fast(NULL, M_WAITOK,
	    taskqueue_thread_enqueue, &qcpu->tgc_taskq);
	taskqueue_start_threads(&qcpu->tgc_taskq, 1, PI_SOFT,
	    "%s_%d", qgroup->tqg_name, idx);
	qcpu->tgc_cpu = idx * qgroup->tqg_stride;
}

static void
taskqgroup_cpu_remove(struct taskqgroup *qgroup, int idx)
{

	taskqueue_free(qgroup->tqg_queue[idx].tgc_taskq);
}
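
/*
 * Example (illustrative sketch, not part of the original file): a handler
 * that must only run from its queue's own worker threads can assert that
 * with taskqueue_member().  Passing the queue as the task context here is
 * purely for illustration.
 */
#if 0
static void
foo_checked_task_fn(void *context, int pending)
{
	struct taskqueue *foo_tq = context;

	KASSERT(taskqueue_member(foo_tq, curthread),
	    ("%s: running outside its taskqueue", __func__));
	/* ... actual work ... */
}
#endif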
/*
 * Find the taskq with the fewest tasks that does not already service
 * this uniq identifier.
 */
static int
taskqgroup_find(struct taskqgroup *qgroup, void *uniq)
{
	struct grouptask *n;
	int i, idx, mincnt;
	int strict;

	mtx_assert(&qgroup->tqg_lock, MA_OWNED);
	if (qgroup->tqg_cnt == 0)
		return (0);
	idx = -1;
	mincnt = INT_MAX;
	/*
	 * Two passes: first scan for a queue with the fewest tasks that
	 * does not already service this uniq id.  If that fails, simply
	 * find the queue with the fewest total tasks.
	 */
	for (strict = 1; mincnt == INT_MAX; strict = 0) {
		for (i = 0; i < qgroup->tqg_cnt; i++) {
			if (qgroup->tqg_queue[i].tgc_cnt > mincnt)
				continue;
			if (strict) {
				LIST_FOREACH(n,
				    &qgroup->tqg_queue[i].tgc_tasks, gt_list)
					if (n->gt_uniq == uniq)
						break;
				if (n != NULL)
					continue;
			}
			mincnt = qgroup->tqg_queue[i].tgc_cnt;
			idx = i;
		}
	}
	if (idx == -1)
		panic("taskqgroup_find: Failed to pick a qid.");

	return (idx);
}

void
taskqgroup_attach(struct taskqgroup *qgroup, struct grouptask *gtask,
    void *uniq, int irq, char *name)
{
	cpuset_t mask;
	int qid;

	gtask->gt_uniq = uniq;
	gtask->gt_name = name;
	gtask->gt_irq = irq;
	gtask->gt_cpu = -1;
	mtx_lock(&qgroup->tqg_lock);
	qid = taskqgroup_find(qgroup, uniq);
	qgroup->tqg_queue[qid].tgc_cnt++;
	LIST_INSERT_HEAD(&qgroup->tqg_queue[qid].tgc_tasks, gtask, gt_list);
	gtask->gt_taskqueue = qgroup->tqg_queue[qid].tgc_taskq;
	if (irq != -1 && smp_started) {
		CPU_ZERO(&mask);
		CPU_SET(qgroup->tqg_queue[qid].tgc_cpu, &mask);
		mtx_unlock(&qgroup->tqg_lock);
		intr_setaffinity(irq, &mask);
	} else
		mtx_unlock(&qgroup->tqg_lock);
}

int
taskqgroup_attach_cpu(struct taskqgroup *qgroup, struct grouptask *gtask,
    void *uniq, int cpu, int irq, char *name)
{
	cpuset_t mask;
	int i, qid;

	qid = -1;
	gtask->gt_uniq = uniq;
	gtask->gt_name = name;
	gtask->gt_irq = irq;
	gtask->gt_cpu = cpu;
	mtx_lock(&qgroup->tqg_lock);
	if (smp_started) {
		for (i = 0; i < qgroup->tqg_cnt; i++)
			if (qgroup->tqg_queue[i].tgc_cpu == cpu) {
				qid = i;
				break;
			}
		if (qid == -1) {
			mtx_unlock(&qgroup->tqg_lock);
			return (EINVAL);
		}
	} else
		qid = 0;
	qgroup->tqg_queue[qid].tgc_cnt++;
	LIST_INSERT_HEAD(&qgroup->tqg_queue[qid].tgc_tasks, gtask, gt_list);
	gtask->gt_taskqueue = qgroup->tqg_queue[qid].tgc_taskq;
	if (irq != -1 && smp_started) {
		CPU_ZERO(&mask);
		CPU_SET(qgroup->tqg_queue[qid].tgc_cpu, &mask);
		mtx_unlock(&qgroup->tqg_lock);
		intr_setaffinity(irq, &mask);
	} else
		mtx_unlock(&qgroup->tqg_lock);
	return (0);
}

void
taskqgroup_detach(struct taskqgroup *qgroup, struct grouptask *gtask)
{
	int i;

	mtx_lock(&qgroup->tqg_lock);
	for (i = 0; i < qgroup->tqg_cnt; i++)
		if (qgroup->tqg_queue[i].tgc_taskq == gtask->gt_taskqueue)
			break;
	if (i == qgroup->tqg_cnt)
		panic("taskqgroup_detach: task not in group\n");
	qgroup->tqg_queue[i].tgc_cnt--;
	LIST_REMOVE(gtask, gt_list);
	mtx_unlock(&qgroup->tqg_lock);
	gtask->gt_taskqueue = NULL;
}
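
/*
 * Example (illustrative sketch, not part of the original file): attaching a
 * per-interrupt grouptask.  GROUPTASK_INIT() and GROUPTASK_ENQUEUE() are
 * assumed to be the convenience macros from the matching <sys/taskqueue.h>;
 * foo_tqg, foo_gtask and the driver callbacks are hypothetical.
 */
#if 0
static struct taskqgroup *foo_tqg;	/* e.g. created at boot */
static struct grouptask foo_gtask;

static void
foo_gtask_fn(void *context, int pending)
{
	/* Deferred half of a hypothetical interrupt handler. */
}

static void
foo_attach_gtask(void *softc, int irq)
{
	GROUPTASK_INIT(&foo_gtask, 0, foo_gtask_fn, softc);
	/* Spread by "uniq" (softc) and steer the IRQ toward the chosen CPU. */
	taskqgroup_attach(foo_tqg, &foo_gtask, softc, irq, "foo gtask");
}

static void
foo_intr(void *arg)
{
	GROUPTASK_ENQUEUE(&foo_gtask);
}
#endif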
static void
taskqgroup_binder(void *ctx, int pending)
{
	struct taskq_bind_task *task = (struct taskq_bind_task *)ctx;
	cpuset_t mask;
	int error;

	CPU_ZERO(&mask);
	CPU_SET(task->bt_cpuid, &mask);
	error = cpuset_setthread(curthread->td_tid, &mask);
	thread_lock(curthread);
	sched_bind(curthread, task->bt_cpuid);
	thread_unlock(curthread);

	if (error)
		printf("taskqgroup_binder: setaffinity failed: %d\n",
		    error);
	free(task, M_DEVBUF);
}

static void
taskqgroup_bind(struct taskqgroup *qgroup)
{
	struct taskq_bind_task *task;
	int i;

	/*
	 * Bind taskqueue threads to specific CPUs, if they have been assigned
	 * one.
	 */
	for (i = 0; i < qgroup->tqg_cnt; i++) {
		task = malloc(sizeof (*task), M_DEVBUF, M_NOWAIT);
		if (task == NULL) {
			/* Skip binding this queue; losing affinity is not fatal. */
			continue;
		}
		TASK_INIT(&task->bt_task, 0, taskqgroup_binder, task);
		task->bt_cpuid = qgroup->tqg_queue[i].tgc_cpu;
		taskqueue_enqueue(qgroup->tqg_queue[i].tgc_taskq,
		    &task->bt_task);
	}
}
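
/*
 * Example (illustrative sketch, not part of the original file): a group is
 * typically created early and then sized once SMP is up, for instance from
 * a SI_SUB_SMP SYSINIT.  The group name and sizing below are arbitrary.
 */
#if 0
static struct taskqgroup *foo_tqg;

static void
foo_tqg_init(void *arg __unused)
{
	foo_tqg = taskqgroup_create("foo");
	/* One queue per CPU, stride 1; requires smp_started to be true. */
	taskqgroup_adjust(foo_tqg, mp_ncpus, 1);
}
SYSINIT(foo_tqg, SI_SUB_SMP, SI_ORDER_ANY, foo_tqg_init, NULL);
#endif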
static int
_taskqgroup_adjust(struct taskqgroup *qgroup, int cnt, int stride)
{
	LIST_HEAD(, grouptask) gtask_head = LIST_HEAD_INITIALIZER(NULL);
	cpuset_t mask;
	struct grouptask *gtask;
	int i, old_cnt, qid;

	mtx_assert(&qgroup->tqg_lock, MA_OWNED);

	if (cnt < 1 || cnt * stride > mp_ncpus || !smp_started) {
		printf("taskqgroup_adjust failed cnt: %d stride: %d mp_ncpus: %d smp_started: %d\n",
		    cnt, stride, mp_ncpus, smp_started);
		return (EINVAL);
	}
	if (qgroup->tqg_adjusting) {
		printf("taskqgroup_adjust failed: adjusting\n");
		return (EBUSY);
	}
	qgroup->tqg_adjusting = 1;
	old_cnt = qgroup->tqg_cnt;
	mtx_unlock(&qgroup->tqg_lock);
	/*
	 * Set up queue for tasks added before boot.
	 */
	if (old_cnt == 0) {
		LIST_SWAP(&gtask_head, &qgroup->tqg_queue[0].tgc_tasks,
		    grouptask, gt_list);
		qgroup->tqg_queue[0].tgc_cnt = 0;
	}

	/*
	 * If new taskq threads have been added.
	 */
	for (i = old_cnt; i < cnt; i++)
		taskqgroup_cpu_create(qgroup, i);
	mtx_lock(&qgroup->tqg_lock);
	qgroup->tqg_cnt = cnt;
	qgroup->tqg_stride = stride;

	/*
	 * Adjust drivers to use new taskqs.
	 */
	for (i = 0; i < old_cnt; i++) {
		while ((gtask = LIST_FIRST(&qgroup->tqg_queue[i].tgc_tasks))) {
			LIST_REMOVE(gtask, gt_list);
			qgroup->tqg_queue[i].tgc_cnt--;
			LIST_INSERT_HEAD(&gtask_head, gtask, gt_list);
		}
	}

	while ((gtask = LIST_FIRST(&gtask_head))) {
		LIST_REMOVE(gtask, gt_list);
		if (gtask->gt_cpu == -1)
			qid = taskqgroup_find(qgroup, gtask->gt_uniq);
		else {
			for (i = 0; i < qgroup->tqg_cnt; i++)
				if (qgroup->tqg_queue[i].tgc_cpu == gtask->gt_cpu) {
					qid = i;
					break;
				}
		}
		qgroup->tqg_queue[qid].tgc_cnt++;
		LIST_INSERT_HEAD(&qgroup->tqg_queue[qid].tgc_tasks, gtask,
		    gt_list);
		gtask->gt_taskqueue = qgroup->tqg_queue[qid].tgc_taskq;
	}
	/*
	 * Set new CPU and IRQ affinity.
	 */
	for (i = 0; i < cnt; i++) {
		qgroup->tqg_queue[i].tgc_cpu = i * qgroup->tqg_stride;
		CPU_ZERO(&mask);
		CPU_SET(qgroup->tqg_queue[i].tgc_cpu, &mask);
		LIST_FOREACH(gtask, &qgroup->tqg_queue[i].tgc_tasks, gt_list) {
			if (gtask->gt_irq == -1)
				continue;
			intr_setaffinity(gtask->gt_irq, &mask);
		}
	}
	mtx_unlock(&qgroup->tqg_lock);

	/*
	 * If taskq thread count has been reduced.
	 */
	for (i = cnt; i < old_cnt; i++)
		taskqgroup_cpu_remove(qgroup, i);

	mtx_lock(&qgroup->tqg_lock);
	qgroup->tqg_adjusting = 0;

	taskqgroup_bind(qgroup);

	return (0);
}

int
taskqgroup_adjust(struct taskqgroup *qgroup, int cpu, int stride)
{
	int error;

	mtx_lock(&qgroup->tqg_lock);
	error = _taskqgroup_adjust(qgroup, cpu, stride);
	mtx_unlock(&qgroup->tqg_lock);

	return (error);
}

struct taskqgroup *
taskqgroup_create(char *name)
{
	struct taskqgroup *qgroup;

	qgroup = malloc(sizeof(*qgroup), M_TASKQUEUE, M_WAITOK | M_ZERO);
	mtx_init(&qgroup->tqg_lock, "taskqgroup", NULL, MTX_DEF);
	qgroup->tqg_name = name;
	LIST_INIT(&qgroup->tqg_queue[0].tgc_tasks);

	return (qgroup);
}

void
taskqgroup_destroy(struct taskqgroup *qgroup)
{

}