1 /*- 2 * SPDX-License-Identifier: BSD-2-Clause 3 * 4 * Copyright (c) 2000 Doug Rabson 5 * All rights reserved. 6 * 7 * Redistribution and use in source and binary forms, with or without 8 * modification, are permitted provided that the following conditions 9 * are met: 10 * 1. Redistributions of source code must retain the above copyright 11 * notice, this list of conditions and the following disclaimer. 12 * 2. Redistributions in binary form must reproduce the above copyright 13 * notice, this list of conditions and the following disclaimer in the 14 * documentation and/or other materials provided with the distribution. 15 * 16 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND 17 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 18 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 19 * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE 20 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL 21 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS 22 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) 23 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 24 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY 25 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF 26 * SUCH DAMAGE. 27 */ 28 29 #include <sys/cdefs.h> 30 #include <sys/param.h> 31 #include <sys/systm.h> 32 #include <sys/bus.h> 33 #include <sys/cpuset.h> 34 #include <sys/interrupt.h> 35 #include <sys/kernel.h> 36 #include <sys/kthread.h> 37 #include <sys/libkern.h> 38 #include <sys/limits.h> 39 #include <sys/lock.h> 40 #include <sys/malloc.h> 41 #include <sys/mutex.h> 42 #include <sys/proc.h> 43 #include <sys/epoch.h> 44 #include <sys/sched.h> 45 #include <sys/smp.h> 46 #include <sys/taskqueue.h> 47 #include <sys/unistd.h> 48 #include <machine/stdarg.h> 49 50 static MALLOC_DEFINE(M_TASKQUEUE, "taskqueue", "Task Queues"); 51 static void *taskqueue_giant_ih; 52 static void *taskqueue_ih; 53 static void taskqueue_fast_enqueue(void *); 54 static void taskqueue_swi_enqueue(void *); 55 static void taskqueue_swi_giant_enqueue(void *); 56 57 struct taskqueue_busy { 58 struct task *tb_running; 59 u_int tb_seq; 60 bool tb_canceling; 61 LIST_ENTRY(taskqueue_busy) tb_link; 62 }; 63 64 struct taskqueue { 65 STAILQ_HEAD(, task) tq_queue; 66 LIST_HEAD(, taskqueue_busy) tq_active; 67 struct task *tq_hint; 68 u_int tq_seq; 69 int tq_callouts; 70 struct mtx_padalign tq_mutex; 71 taskqueue_enqueue_fn tq_enqueue; 72 void *tq_context; 73 char *tq_name; 74 struct thread **tq_threads; 75 int tq_tcount; 76 int tq_spin; 77 int tq_flags; 78 taskqueue_callback_fn tq_callbacks[TASKQUEUE_NUM_CALLBACKS]; 79 void *tq_cb_contexts[TASKQUEUE_NUM_CALLBACKS]; 80 }; 81 82 #define TQ_FLAGS_ACTIVE (1 << 0) 83 #define TQ_FLAGS_BLOCKED (1 << 1) 84 #define TQ_FLAGS_UNLOCKED_ENQUEUE (1 << 2) 85 86 #define DT_CALLOUT_ARMED (1 << 0) 87 #define DT_DRAIN_IN_PROGRESS (1 << 1) 88 89 #define TQ_LOCK(tq) \ 90 do { \ 91 if ((tq)->tq_spin) \ 92 mtx_lock_spin(&(tq)->tq_mutex); \ 93 else \ 94 mtx_lock(&(tq)->tq_mutex); \ 95 } while (0) 96 #define TQ_ASSERT_LOCKED(tq) mtx_assert(&(tq)->tq_mutex, MA_OWNED) 97 98 #define TQ_UNLOCK(tq) \ 99 do { \ 100 if ((tq)->tq_spin) \ 101 mtx_unlock_spin(&(tq)->tq_mutex); \ 102 else \ 103 mtx_unlock(&(tq)->tq_mutex); \ 104 } while (0) 105 #define TQ_ASSERT_UNLOCKED(tq) mtx_assert(&(tq)->tq_mutex, MA_NOTOWNED) 106 107 void 108 _timeout_task_init(struct taskqueue *queue, struct timeout_task *timeout_task, 109 int priority, task_fn_t func, void *context) 110 { 111 112 TASK_INIT(&timeout_task->t, priority, func, context); 113 callout_init_mtx(&timeout_task->c, &queue->tq_mutex, 114 CALLOUT_RETURNUNLOCKED); 115 timeout_task->q = queue; 116 timeout_task->f = 0; 117 } 118 119 static __inline int 120 TQ_SLEEP(struct taskqueue *tq, void *p, const char *wm) 121 { 122 if (tq->tq_spin) 123 return (msleep_spin(p, (struct mtx *)&tq->tq_mutex, wm, 0)); 124 return (msleep(p, &tq->tq_mutex, 0, wm, 0)); 125 } 126 127 static struct taskqueue_busy * 128 task_get_busy(struct taskqueue *queue, struct task *task) 129 { 130 struct taskqueue_busy *tb; 131 132 TQ_ASSERT_LOCKED(queue); 133 LIST_FOREACH(tb, &queue->tq_active, tb_link) { 134 if (tb->tb_running == task) 135 return (tb); 136 } 137 return (NULL); 138 } 139 140 static struct taskqueue * 141 _taskqueue_create(const char *name, int mflags, 142 taskqueue_enqueue_fn enqueue, void *context, 143 int mtxflags, const char *mtxname __unused) 144 { 145 struct taskqueue *queue; 146 char *tq_name; 147 148 tq_name = malloc(TASKQUEUE_NAMELEN, M_TASKQUEUE, mflags | M_ZERO); 149 if (tq_name == NULL) 150 return (NULL); 151 152 queue = malloc(sizeof(struct taskqueue), M_TASKQUEUE, mflags | M_ZERO); 153 if (queue == NULL) { 154 free(tq_name, M_TASKQUEUE); 155 return (NULL); 156 } 157 158 snprintf(tq_name, TASKQUEUE_NAMELEN, "%s", (name) ? name : "taskqueue"); 159 160 STAILQ_INIT(&queue->tq_queue); 161 LIST_INIT(&queue->tq_active); 162 queue->tq_enqueue = enqueue; 163 queue->tq_context = context; 164 queue->tq_name = tq_name; 165 queue->tq_spin = (mtxflags & MTX_SPIN) != 0; 166 queue->tq_flags |= TQ_FLAGS_ACTIVE; 167 if (enqueue == taskqueue_fast_enqueue || 168 enqueue == taskqueue_swi_enqueue || 169 enqueue == taskqueue_swi_giant_enqueue || 170 enqueue == taskqueue_thread_enqueue) 171 queue->tq_flags |= TQ_FLAGS_UNLOCKED_ENQUEUE; 172 mtx_init(&queue->tq_mutex, tq_name, NULL, mtxflags); 173 174 return (queue); 175 } 176 177 struct taskqueue * 178 taskqueue_create(const char *name, int mflags, 179 taskqueue_enqueue_fn enqueue, void *context) 180 { 181 182 return _taskqueue_create(name, mflags, enqueue, context, 183 MTX_DEF, name); 184 } 185 186 void 187 taskqueue_set_callback(struct taskqueue *queue, 188 enum taskqueue_callback_type cb_type, taskqueue_callback_fn callback, 189 void *context) 190 { 191 192 KASSERT(((cb_type >= TASKQUEUE_CALLBACK_TYPE_MIN) && 193 (cb_type <= TASKQUEUE_CALLBACK_TYPE_MAX)), 194 ("Callback type %d not valid, must be %d-%d", cb_type, 195 TASKQUEUE_CALLBACK_TYPE_MIN, TASKQUEUE_CALLBACK_TYPE_MAX)); 196 KASSERT((queue->tq_callbacks[cb_type] == NULL), 197 ("Re-initialization of taskqueue callback?")); 198 199 queue->tq_callbacks[cb_type] = callback; 200 queue->tq_cb_contexts[cb_type] = context; 201 } 202 203 /* 204 * Signal a taskqueue thread to terminate. 205 */ 206 static void 207 taskqueue_terminate(struct thread **pp, struct taskqueue *tq) 208 { 209 210 while (tq->tq_tcount > 0 || tq->tq_callouts > 0) { 211 wakeup(tq); 212 TQ_SLEEP(tq, pp, "tq_destroy"); 213 } 214 } 215 216 void 217 taskqueue_free(struct taskqueue *queue) 218 { 219 220 TQ_LOCK(queue); 221 queue->tq_flags &= ~TQ_FLAGS_ACTIVE; 222 taskqueue_terminate(queue->tq_threads, queue); 223 KASSERT(LIST_EMPTY(&queue->tq_active), ("Tasks still running?")); 224 KASSERT(queue->tq_callouts == 0, ("Armed timeout tasks")); 225 mtx_destroy(&queue->tq_mutex); 226 free(queue->tq_threads, M_TASKQUEUE); 227 free(queue->tq_name, M_TASKQUEUE); 228 free(queue, M_TASKQUEUE); 229 } 230 231 static int 232 taskqueue_enqueue_locked(struct taskqueue *queue, struct task *task, int flags) 233 { 234 struct task *ins; 235 struct task *prev; 236 struct taskqueue_busy *tb; 237 238 KASSERT(task->ta_func != NULL, ("enqueueing task with NULL func")); 239 /* 240 * Ignore canceling task if requested. 241 */ 242 if (__predict_false((flags & TASKQUEUE_FAIL_IF_CANCELING) != 0)) { 243 tb = task_get_busy(queue, task); 244 if (tb != NULL && tb->tb_canceling) { 245 TQ_UNLOCK(queue); 246 return (ECANCELED); 247 } 248 } 249 250 /* 251 * Count multiple enqueues. 252 */ 253 if (task->ta_pending) { 254 if (__predict_false((flags & TASKQUEUE_FAIL_IF_PENDING) != 0)) { 255 TQ_UNLOCK(queue); 256 return (EEXIST); 257 } 258 if (task->ta_pending < USHRT_MAX) 259 task->ta_pending++; 260 TQ_UNLOCK(queue); 261 return (0); 262 } 263 264 /* 265 * Optimise cases when all tasks use small set of priorities. 266 * In case of only one priority we always insert at the end. 267 * In case of two tq_hint typically gives the insertion point. 268 * In case of more then two tq_hint should halve the search. 269 */ 270 prev = STAILQ_LAST(&queue->tq_queue, task, ta_link); 271 if (!prev || prev->ta_priority >= task->ta_priority) { 272 STAILQ_INSERT_TAIL(&queue->tq_queue, task, ta_link); 273 } else { 274 prev = queue->tq_hint; 275 if (prev && prev->ta_priority >= task->ta_priority) { 276 ins = STAILQ_NEXT(prev, ta_link); 277 } else { 278 prev = NULL; 279 ins = STAILQ_FIRST(&queue->tq_queue); 280 } 281 for (; ins; prev = ins, ins = STAILQ_NEXT(ins, ta_link)) 282 if (ins->ta_priority < task->ta_priority) 283 break; 284 285 if (prev) { 286 STAILQ_INSERT_AFTER(&queue->tq_queue, prev, task, ta_link); 287 queue->tq_hint = task; 288 } else 289 STAILQ_INSERT_HEAD(&queue->tq_queue, task, ta_link); 290 } 291 292 task->ta_pending = 1; 293 if ((queue->tq_flags & TQ_FLAGS_UNLOCKED_ENQUEUE) != 0) 294 TQ_UNLOCK(queue); 295 if ((queue->tq_flags & TQ_FLAGS_BLOCKED) == 0) 296 queue->tq_enqueue(queue->tq_context); 297 if ((queue->tq_flags & TQ_FLAGS_UNLOCKED_ENQUEUE) == 0) 298 TQ_UNLOCK(queue); 299 300 /* Return with lock released. */ 301 return (0); 302 } 303 304 int 305 taskqueue_enqueue_flags(struct taskqueue *queue, struct task *task, int flags) 306 { 307 int res; 308 309 TQ_LOCK(queue); 310 res = taskqueue_enqueue_locked(queue, task, flags); 311 /* The lock is released inside. */ 312 313 return (res); 314 } 315 316 int 317 taskqueue_enqueue(struct taskqueue *queue, struct task *task) 318 { 319 return (taskqueue_enqueue_flags(queue, task, 0)); 320 } 321 322 static void 323 taskqueue_timeout_func(void *arg) 324 { 325 struct taskqueue *queue; 326 struct timeout_task *timeout_task; 327 328 timeout_task = arg; 329 queue = timeout_task->q; 330 KASSERT((timeout_task->f & DT_CALLOUT_ARMED) != 0, ("Stray timeout")); 331 timeout_task->f &= ~DT_CALLOUT_ARMED; 332 queue->tq_callouts--; 333 taskqueue_enqueue_locked(timeout_task->q, &timeout_task->t, 0); 334 /* The lock is released inside. */ 335 } 336 337 int 338 taskqueue_enqueue_timeout_sbt(struct taskqueue *queue, 339 struct timeout_task *timeout_task, sbintime_t sbt, sbintime_t pr, int flags) 340 { 341 int res; 342 343 TQ_LOCK(queue); 344 KASSERT(timeout_task->q == NULL || timeout_task->q == queue, 345 ("Migrated queue")); 346 timeout_task->q = queue; 347 res = timeout_task->t.ta_pending; 348 if (timeout_task->f & DT_DRAIN_IN_PROGRESS) { 349 /* Do nothing */ 350 TQ_UNLOCK(queue); 351 res = -1; 352 } else if (sbt == 0) { 353 taskqueue_enqueue_locked(queue, &timeout_task->t, 0); 354 /* The lock is released inside. */ 355 } else { 356 if ((timeout_task->f & DT_CALLOUT_ARMED) != 0) { 357 res++; 358 } else { 359 queue->tq_callouts++; 360 timeout_task->f |= DT_CALLOUT_ARMED; 361 if (sbt < 0) 362 sbt = -sbt; /* Ignore overflow. */ 363 } 364 if (sbt > 0) { 365 if (queue->tq_spin) 366 flags |= C_DIRECT_EXEC; 367 callout_reset_sbt(&timeout_task->c, sbt, pr, 368 taskqueue_timeout_func, timeout_task, flags); 369 } 370 TQ_UNLOCK(queue); 371 } 372 return (res); 373 } 374 375 int 376 taskqueue_enqueue_timeout(struct taskqueue *queue, 377 struct timeout_task *ttask, int ticks) 378 { 379 380 return (taskqueue_enqueue_timeout_sbt(queue, ttask, ticks * tick_sbt, 381 0, C_HARDCLOCK)); 382 } 383 384 static void 385 taskqueue_task_nop_fn(void *context, int pending) 386 { 387 } 388 389 /* 390 * Block until all currently queued tasks in this taskqueue 391 * have begun execution. Tasks queued during execution of 392 * this function are ignored. 393 */ 394 static int 395 taskqueue_drain_tq_queue(struct taskqueue *queue) 396 { 397 struct task t_barrier; 398 399 if (STAILQ_EMPTY(&queue->tq_queue)) 400 return (0); 401 402 /* 403 * Enqueue our barrier after all current tasks, but with 404 * the highest priority so that newly queued tasks cannot 405 * pass it. Because of the high priority, we can not use 406 * taskqueue_enqueue_locked directly (which drops the lock 407 * anyway) so just insert it at tail while we have the 408 * queue lock. 409 */ 410 TASK_INIT(&t_barrier, UCHAR_MAX, taskqueue_task_nop_fn, &t_barrier); 411 STAILQ_INSERT_TAIL(&queue->tq_queue, &t_barrier, ta_link); 412 queue->tq_hint = &t_barrier; 413 t_barrier.ta_pending = 1; 414 415 /* 416 * Once the barrier has executed, all previously queued tasks 417 * have completed or are currently executing. 418 */ 419 while (t_barrier.ta_pending != 0) 420 TQ_SLEEP(queue, &t_barrier, "tq_qdrain"); 421 return (1); 422 } 423 424 /* 425 * Block until all currently executing tasks for this taskqueue 426 * complete. Tasks that begin execution during the execution 427 * of this function are ignored. 428 */ 429 static int 430 taskqueue_drain_tq_active(struct taskqueue *queue) 431 { 432 struct taskqueue_busy *tb; 433 u_int seq; 434 435 if (LIST_EMPTY(&queue->tq_active)) 436 return (0); 437 438 /* Block taskq_terminate().*/ 439 queue->tq_callouts++; 440 441 /* Wait for any active task with sequence from the past. */ 442 seq = queue->tq_seq; 443 restart: 444 LIST_FOREACH(tb, &queue->tq_active, tb_link) { 445 if ((int)(tb->tb_seq - seq) <= 0) { 446 TQ_SLEEP(queue, tb->tb_running, "tq_adrain"); 447 goto restart; 448 } 449 } 450 451 /* Release taskqueue_terminate(). */ 452 queue->tq_callouts--; 453 if ((queue->tq_flags & TQ_FLAGS_ACTIVE) == 0) 454 wakeup_one(queue->tq_threads); 455 return (1); 456 } 457 458 void 459 taskqueue_block(struct taskqueue *queue) 460 { 461 462 TQ_LOCK(queue); 463 queue->tq_flags |= TQ_FLAGS_BLOCKED; 464 TQ_UNLOCK(queue); 465 } 466 467 void 468 taskqueue_unblock(struct taskqueue *queue) 469 { 470 471 TQ_LOCK(queue); 472 queue->tq_flags &= ~TQ_FLAGS_BLOCKED; 473 if (!STAILQ_EMPTY(&queue->tq_queue)) 474 queue->tq_enqueue(queue->tq_context); 475 TQ_UNLOCK(queue); 476 } 477 478 static void 479 taskqueue_run_locked(struct taskqueue *queue) 480 { 481 struct epoch_tracker et; 482 struct taskqueue_busy tb; 483 struct task *task; 484 bool in_net_epoch; 485 int pending; 486 487 KASSERT(queue != NULL, ("tq is NULL")); 488 TQ_ASSERT_LOCKED(queue); 489 tb.tb_running = NULL; 490 LIST_INSERT_HEAD(&queue->tq_active, &tb, tb_link); 491 in_net_epoch = false; 492 493 while ((task = STAILQ_FIRST(&queue->tq_queue)) != NULL) { 494 STAILQ_REMOVE_HEAD(&queue->tq_queue, ta_link); 495 if (queue->tq_hint == task) 496 queue->tq_hint = NULL; 497 pending = task->ta_pending; 498 task->ta_pending = 0; 499 tb.tb_running = task; 500 tb.tb_seq = ++queue->tq_seq; 501 tb.tb_canceling = false; 502 TQ_UNLOCK(queue); 503 504 KASSERT(task->ta_func != NULL, ("task->ta_func is NULL")); 505 if (!in_net_epoch && TASK_IS_NET(task)) { 506 in_net_epoch = true; 507 NET_EPOCH_ENTER(et); 508 } else if (in_net_epoch && !TASK_IS_NET(task)) { 509 NET_EPOCH_EXIT(et); 510 in_net_epoch = false; 511 } 512 task->ta_func(task->ta_context, pending); 513 514 TQ_LOCK(queue); 515 wakeup(task); 516 } 517 if (in_net_epoch) 518 NET_EPOCH_EXIT(et); 519 LIST_REMOVE(&tb, tb_link); 520 } 521 522 void 523 taskqueue_run(struct taskqueue *queue) 524 { 525 526 TQ_LOCK(queue); 527 taskqueue_run_locked(queue); 528 TQ_UNLOCK(queue); 529 } 530 531 /* 532 * Only use this function in single threaded contexts. It returns 533 * non-zero if the given task is either pending or running. Else the 534 * task is idle and can be queued again or freed. 535 */ 536 int 537 taskqueue_poll_is_busy(struct taskqueue *queue, struct task *task) 538 { 539 int retval; 540 541 TQ_LOCK(queue); 542 retval = task->ta_pending > 0 || task_get_busy(queue, task) != NULL; 543 TQ_UNLOCK(queue); 544 545 return (retval); 546 } 547 548 static int 549 taskqueue_cancel_locked(struct taskqueue *queue, struct task *task, 550 u_int *pendp) 551 { 552 struct taskqueue_busy *tb; 553 int retval = 0; 554 555 if (task->ta_pending > 0) { 556 STAILQ_REMOVE(&queue->tq_queue, task, task, ta_link); 557 if (queue->tq_hint == task) 558 queue->tq_hint = NULL; 559 } 560 if (pendp != NULL) 561 *pendp = task->ta_pending; 562 task->ta_pending = 0; 563 tb = task_get_busy(queue, task); 564 if (tb != NULL) { 565 tb->tb_canceling = true; 566 retval = EBUSY; 567 } 568 569 return (retval); 570 } 571 572 int 573 taskqueue_cancel(struct taskqueue *queue, struct task *task, u_int *pendp) 574 { 575 int error; 576 577 TQ_LOCK(queue); 578 error = taskqueue_cancel_locked(queue, task, pendp); 579 TQ_UNLOCK(queue); 580 581 return (error); 582 } 583 584 int 585 taskqueue_cancel_timeout(struct taskqueue *queue, 586 struct timeout_task *timeout_task, u_int *pendp) 587 { 588 u_int pending, pending1; 589 int error; 590 591 TQ_LOCK(queue); 592 pending = !!(callout_stop(&timeout_task->c) > 0); 593 error = taskqueue_cancel_locked(queue, &timeout_task->t, &pending1); 594 if ((timeout_task->f & DT_CALLOUT_ARMED) != 0) { 595 timeout_task->f &= ~DT_CALLOUT_ARMED; 596 queue->tq_callouts--; 597 } 598 TQ_UNLOCK(queue); 599 600 if (pendp != NULL) 601 *pendp = pending + pending1; 602 return (error); 603 } 604 605 void 606 taskqueue_drain(struct taskqueue *queue, struct task *task) 607 { 608 609 if (!queue->tq_spin) 610 WITNESS_WARN(WARN_GIANTOK | WARN_SLEEPOK, NULL, __func__); 611 612 TQ_LOCK(queue); 613 while (task->ta_pending != 0 || task_get_busy(queue, task) != NULL) 614 TQ_SLEEP(queue, task, "tq_drain"); 615 TQ_UNLOCK(queue); 616 } 617 618 void 619 taskqueue_drain_all(struct taskqueue *queue) 620 { 621 622 if (!queue->tq_spin) 623 WITNESS_WARN(WARN_GIANTOK | WARN_SLEEPOK, NULL, __func__); 624 625 TQ_LOCK(queue); 626 (void)taskqueue_drain_tq_queue(queue); 627 (void)taskqueue_drain_tq_active(queue); 628 TQ_UNLOCK(queue); 629 } 630 631 void 632 taskqueue_drain_timeout(struct taskqueue *queue, 633 struct timeout_task *timeout_task) 634 { 635 636 /* 637 * Set flag to prevent timer from re-starting during drain: 638 */ 639 TQ_LOCK(queue); 640 KASSERT((timeout_task->f & DT_DRAIN_IN_PROGRESS) == 0, 641 ("Drain already in progress")); 642 timeout_task->f |= DT_DRAIN_IN_PROGRESS; 643 TQ_UNLOCK(queue); 644 645 callout_drain(&timeout_task->c); 646 taskqueue_drain(queue, &timeout_task->t); 647 648 /* 649 * Clear flag to allow timer to re-start: 650 */ 651 TQ_LOCK(queue); 652 timeout_task->f &= ~DT_DRAIN_IN_PROGRESS; 653 TQ_UNLOCK(queue); 654 } 655 656 void 657 taskqueue_quiesce(struct taskqueue *queue) 658 { 659 int ret; 660 661 TQ_LOCK(queue); 662 do { 663 ret = taskqueue_drain_tq_queue(queue); 664 if (ret == 0) 665 ret = taskqueue_drain_tq_active(queue); 666 } while (ret != 0); 667 TQ_UNLOCK(queue); 668 } 669 670 static void 671 taskqueue_swi_enqueue(void *context) 672 { 673 swi_sched(taskqueue_ih, 0); 674 } 675 676 static void 677 taskqueue_swi_run(void *dummy) 678 { 679 taskqueue_run(taskqueue_swi); 680 } 681 682 static void 683 taskqueue_swi_giant_enqueue(void *context) 684 { 685 swi_sched(taskqueue_giant_ih, 0); 686 } 687 688 static void 689 taskqueue_swi_giant_run(void *dummy) 690 { 691 taskqueue_run(taskqueue_swi_giant); 692 } 693 694 static int 695 _taskqueue_start_threads(struct taskqueue **tqp, int count, int pri, 696 cpuset_t *mask, struct proc *p, const char *name, va_list ap) 697 { 698 char ktname[MAXCOMLEN + 1]; 699 struct thread *td; 700 struct taskqueue *tq; 701 int i, error; 702 703 if (count <= 0) 704 return (EINVAL); 705 706 vsnprintf(ktname, sizeof(ktname), name, ap); 707 tq = *tqp; 708 709 tq->tq_threads = malloc(sizeof(struct thread *) * count, M_TASKQUEUE, 710 M_NOWAIT | M_ZERO); 711 if (tq->tq_threads == NULL) { 712 printf("%s: no memory for %s threads\n", __func__, ktname); 713 return (ENOMEM); 714 } 715 716 for (i = 0; i < count; i++) { 717 if (count == 1) 718 error = kthread_add(taskqueue_thread_loop, tqp, p, 719 &tq->tq_threads[i], RFSTOPPED, 0, "%s", ktname); 720 else 721 error = kthread_add(taskqueue_thread_loop, tqp, p, 722 &tq->tq_threads[i], RFSTOPPED, 0, 723 "%s_%d", ktname, i); 724 if (error) { 725 /* should be ok to continue, taskqueue_free will dtrt */ 726 printf("%s: kthread_add(%s): error %d", __func__, 727 ktname, error); 728 tq->tq_threads[i] = NULL; /* paranoid */ 729 } else 730 tq->tq_tcount++; 731 } 732 if (tq->tq_tcount == 0) { 733 free(tq->tq_threads, M_TASKQUEUE); 734 tq->tq_threads = NULL; 735 return (ENOMEM); 736 } 737 for (i = 0; i < count; i++) { 738 if (tq->tq_threads[i] == NULL) 739 continue; 740 td = tq->tq_threads[i]; 741 if (mask) { 742 error = cpuset_setthread(td->td_tid, mask); 743 /* 744 * Failing to pin is rarely an actual fatal error; 745 * it'll just affect performance. 746 */ 747 if (error) 748 printf("%s: curthread=%llu: can't pin; " 749 "error=%d\n", 750 __func__, 751 (unsigned long long) td->td_tid, 752 error); 753 } 754 thread_lock(td); 755 sched_prio(td, pri); 756 sched_add(td, SRQ_BORING); 757 } 758 759 return (0); 760 } 761 762 int 763 taskqueue_start_threads(struct taskqueue **tqp, int count, int pri, 764 const char *name, ...) 765 { 766 va_list ap; 767 int error; 768 769 va_start(ap, name); 770 error = _taskqueue_start_threads(tqp, count, pri, NULL, NULL, name, ap); 771 va_end(ap); 772 return (error); 773 } 774 775 int 776 taskqueue_start_threads_in_proc(struct taskqueue **tqp, int count, int pri, 777 struct proc *proc, const char *name, ...) 778 { 779 va_list ap; 780 int error; 781 782 va_start(ap, name); 783 error = _taskqueue_start_threads(tqp, count, pri, NULL, proc, name, ap); 784 va_end(ap); 785 return (error); 786 } 787 788 int 789 taskqueue_start_threads_cpuset(struct taskqueue **tqp, int count, int pri, 790 cpuset_t *mask, const char *name, ...) 791 { 792 va_list ap; 793 int error; 794 795 va_start(ap, name); 796 error = _taskqueue_start_threads(tqp, count, pri, mask, NULL, name, ap); 797 va_end(ap); 798 return (error); 799 } 800 801 static inline void 802 taskqueue_run_callback(struct taskqueue *tq, 803 enum taskqueue_callback_type cb_type) 804 { 805 taskqueue_callback_fn tq_callback; 806 807 TQ_ASSERT_UNLOCKED(tq); 808 tq_callback = tq->tq_callbacks[cb_type]; 809 if (tq_callback != NULL) 810 tq_callback(tq->tq_cb_contexts[cb_type]); 811 } 812 813 void 814 taskqueue_thread_loop(void *arg) 815 { 816 struct taskqueue **tqp, *tq; 817 818 tqp = arg; 819 tq = *tqp; 820 taskqueue_run_callback(tq, TASKQUEUE_CALLBACK_TYPE_INIT); 821 TQ_LOCK(tq); 822 while ((tq->tq_flags & TQ_FLAGS_ACTIVE) != 0) { 823 /* XXX ? */ 824 taskqueue_run_locked(tq); 825 /* 826 * Because taskqueue_run() can drop tq_mutex, we need to 827 * check if the TQ_FLAGS_ACTIVE flag wasn't removed in the 828 * meantime, which means we missed a wakeup. 829 */ 830 if ((tq->tq_flags & TQ_FLAGS_ACTIVE) == 0) 831 break; 832 TQ_SLEEP(tq, tq, "-"); 833 } 834 taskqueue_run_locked(tq); 835 /* 836 * This thread is on its way out, so just drop the lock temporarily 837 * in order to call the shutdown callback. This allows the callback 838 * to look at the taskqueue, even just before it dies. 839 */ 840 TQ_UNLOCK(tq); 841 taskqueue_run_callback(tq, TASKQUEUE_CALLBACK_TYPE_SHUTDOWN); 842 TQ_LOCK(tq); 843 844 /* rendezvous with thread that asked us to terminate */ 845 tq->tq_tcount--; 846 wakeup_one(tq->tq_threads); 847 TQ_UNLOCK(tq); 848 kthread_exit(); 849 } 850 851 void 852 taskqueue_thread_enqueue(void *context) 853 { 854 struct taskqueue **tqp, *tq; 855 856 tqp = context; 857 tq = *tqp; 858 wakeup_any(tq); 859 } 860 861 TASKQUEUE_DEFINE(swi, taskqueue_swi_enqueue, NULL, 862 swi_add(NULL, "task queue", taskqueue_swi_run, NULL, SWI_TQ, 863 INTR_MPSAFE, &taskqueue_ih)); 864 865 TASKQUEUE_DEFINE(swi_giant, taskqueue_swi_giant_enqueue, NULL, 866 swi_add(NULL, "Giant taskq", taskqueue_swi_giant_run, 867 NULL, SWI_TQ_GIANT, 0, &taskqueue_giant_ih)); 868 869 TASKQUEUE_DEFINE_THREAD(thread); 870 871 struct taskqueue * 872 taskqueue_create_fast(const char *name, int mflags, 873 taskqueue_enqueue_fn enqueue, void *context) 874 { 875 return _taskqueue_create(name, mflags, enqueue, context, 876 MTX_SPIN, "fast_taskqueue"); 877 } 878 879 static void *taskqueue_fast_ih; 880 881 static void 882 taskqueue_fast_enqueue(void *context) 883 { 884 swi_sched(taskqueue_fast_ih, 0); 885 } 886 887 static void 888 taskqueue_fast_run(void *dummy) 889 { 890 taskqueue_run(taskqueue_fast); 891 } 892 893 TASKQUEUE_FAST_DEFINE(fast, taskqueue_fast_enqueue, NULL, 894 swi_add(NULL, "fast taskq", taskqueue_fast_run, NULL, 895 SWI_TQ_FAST, INTR_MPSAFE, &taskqueue_fast_ih)); 896 897 int 898 taskqueue_member(struct taskqueue *queue, struct thread *td) 899 { 900 int i, j, ret = 0; 901 902 for (i = 0, j = 0; ; i++) { 903 if (queue->tq_threads[i] == NULL) 904 continue; 905 if (queue->tq_threads[i] == td) { 906 ret = 1; 907 break; 908 } 909 if (++j >= queue->tq_tcount) 910 break; 911 } 912 return (ret); 913 } 914