1 /*- 2 * SPDX-License-Identifier: BSD-2-Clause 3 * 4 * Copyright (c) 2000 Doug Rabson 5 * All rights reserved. 6 * 7 * Redistribution and use in source and binary forms, with or without 8 * modification, are permitted provided that the following conditions 9 * are met: 10 * 1. Redistributions of source code must retain the above copyright 11 * notice, this list of conditions and the following disclaimer. 12 * 2. Redistributions in binary form must reproduce the above copyright 13 * notice, this list of conditions and the following disclaimer in the 14 * documentation and/or other materials provided with the distribution. 15 * 16 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND 17 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 18 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 19 * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE 20 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL 21 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS 22 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) 23 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 24 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY 25 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF 26 * SUCH DAMAGE. 27 */ 28 29 #include <sys/cdefs.h> 30 __FBSDID("$FreeBSD$"); 31 32 #include <sys/param.h> 33 #include <sys/systm.h> 34 #include <sys/bus.h> 35 #include <sys/cpuset.h> 36 #include <sys/interrupt.h> 37 #include <sys/kernel.h> 38 #include <sys/kthread.h> 39 #include <sys/libkern.h> 40 #include <sys/limits.h> 41 #include <sys/lock.h> 42 #include <sys/malloc.h> 43 #include <sys/mutex.h> 44 #include <sys/proc.h> 45 #include <sys/epoch.h> 46 #include <sys/sched.h> 47 #include <sys/smp.h> 48 #include <sys/taskqueue.h> 49 #include <sys/unistd.h> 50 #include <machine/stdarg.h> 51 52 static MALLOC_DEFINE(M_TASKQUEUE, "taskqueue", "Task Queues"); 53 static void *taskqueue_giant_ih; 54 static void *taskqueue_ih; 55 static void taskqueue_fast_enqueue(void *); 56 static void taskqueue_swi_enqueue(void *); 57 static void taskqueue_swi_giant_enqueue(void *); 58 59 struct taskqueue_busy { 60 struct task *tb_running; 61 u_int tb_seq; 62 bool tb_canceling; 63 LIST_ENTRY(taskqueue_busy) tb_link; 64 }; 65 66 struct taskqueue { 67 STAILQ_HEAD(, task) tq_queue; 68 LIST_HEAD(, taskqueue_busy) tq_active; 69 struct task *tq_hint; 70 u_int tq_seq; 71 int tq_callouts; 72 struct mtx_padalign tq_mutex; 73 taskqueue_enqueue_fn tq_enqueue; 74 void *tq_context; 75 char *tq_name; 76 struct thread **tq_threads; 77 int tq_tcount; 78 int tq_spin; 79 int tq_flags; 80 taskqueue_callback_fn tq_callbacks[TASKQUEUE_NUM_CALLBACKS]; 81 void *tq_cb_contexts[TASKQUEUE_NUM_CALLBACKS]; 82 }; 83 84 #define TQ_FLAGS_ACTIVE (1 << 0) 85 #define TQ_FLAGS_BLOCKED (1 << 1) 86 #define TQ_FLAGS_UNLOCKED_ENQUEUE (1 << 2) 87 88 #define DT_CALLOUT_ARMED (1 << 0) 89 #define DT_DRAIN_IN_PROGRESS (1 << 1) 90 91 #define TQ_LOCK(tq) \ 92 do { \ 93 if ((tq)->tq_spin) \ 94 mtx_lock_spin(&(tq)->tq_mutex); \ 95 else \ 96 mtx_lock(&(tq)->tq_mutex); \ 97 } while (0) 98 #define TQ_ASSERT_LOCKED(tq) mtx_assert(&(tq)->tq_mutex, MA_OWNED) 99 100 #define TQ_UNLOCK(tq) \ 101 do { \ 102 if ((tq)->tq_spin) \ 103 mtx_unlock_spin(&(tq)->tq_mutex); \ 104 else \ 105 mtx_unlock(&(tq)->tq_mutex); \ 106 } while (0) 107 #define TQ_ASSERT_UNLOCKED(tq) mtx_assert(&(tq)->tq_mutex, MA_NOTOWNED) 108 109 void 110 _timeout_task_init(struct taskqueue *queue, struct timeout_task *timeout_task, 111 int priority, task_fn_t func, void *context) 112 { 113 114 TASK_INIT(&timeout_task->t, priority, func, context); 115 callout_init_mtx(&timeout_task->c, &queue->tq_mutex, 116 CALLOUT_RETURNUNLOCKED); 117 timeout_task->q = queue; 118 timeout_task->f = 0; 119 } 120 121 static __inline int 122 TQ_SLEEP(struct taskqueue *tq, void *p, const char *wm) 123 { 124 if (tq->tq_spin) 125 return (msleep_spin(p, (struct mtx *)&tq->tq_mutex, wm, 0)); 126 return (msleep(p, &tq->tq_mutex, 0, wm, 0)); 127 } 128 129 static struct taskqueue_busy * 130 task_get_busy(struct taskqueue *queue, struct task *task) 131 { 132 struct taskqueue_busy *tb; 133 134 TQ_ASSERT_LOCKED(queue); 135 LIST_FOREACH(tb, &queue->tq_active, tb_link) { 136 if (tb->tb_running == task) 137 return (tb); 138 } 139 return (NULL); 140 } 141 142 static struct taskqueue * 143 _taskqueue_create(const char *name, int mflags, 144 taskqueue_enqueue_fn enqueue, void *context, 145 int mtxflags, const char *mtxname __unused) 146 { 147 struct taskqueue *queue; 148 char *tq_name; 149 150 tq_name = malloc(TASKQUEUE_NAMELEN, M_TASKQUEUE, mflags | M_ZERO); 151 if (tq_name == NULL) 152 return (NULL); 153 154 queue = malloc(sizeof(struct taskqueue), M_TASKQUEUE, mflags | M_ZERO); 155 if (queue == NULL) { 156 free(tq_name, M_TASKQUEUE); 157 return (NULL); 158 } 159 160 snprintf(tq_name, TASKQUEUE_NAMELEN, "%s", (name) ? name : "taskqueue"); 161 162 STAILQ_INIT(&queue->tq_queue); 163 LIST_INIT(&queue->tq_active); 164 queue->tq_enqueue = enqueue; 165 queue->tq_context = context; 166 queue->tq_name = tq_name; 167 queue->tq_spin = (mtxflags & MTX_SPIN) != 0; 168 queue->tq_flags |= TQ_FLAGS_ACTIVE; 169 if (enqueue == taskqueue_fast_enqueue || 170 enqueue == taskqueue_swi_enqueue || 171 enqueue == taskqueue_swi_giant_enqueue || 172 enqueue == taskqueue_thread_enqueue) 173 queue->tq_flags |= TQ_FLAGS_UNLOCKED_ENQUEUE; 174 mtx_init(&queue->tq_mutex, tq_name, NULL, mtxflags); 175 176 return (queue); 177 } 178 179 struct taskqueue * 180 taskqueue_create(const char *name, int mflags, 181 taskqueue_enqueue_fn enqueue, void *context) 182 { 183 184 return _taskqueue_create(name, mflags, enqueue, context, 185 MTX_DEF, name); 186 } 187 188 void 189 taskqueue_set_callback(struct taskqueue *queue, 190 enum taskqueue_callback_type cb_type, taskqueue_callback_fn callback, 191 void *context) 192 { 193 194 KASSERT(((cb_type >= TASKQUEUE_CALLBACK_TYPE_MIN) && 195 (cb_type <= TASKQUEUE_CALLBACK_TYPE_MAX)), 196 ("Callback type %d not valid, must be %d-%d", cb_type, 197 TASKQUEUE_CALLBACK_TYPE_MIN, TASKQUEUE_CALLBACK_TYPE_MAX)); 198 KASSERT((queue->tq_callbacks[cb_type] == NULL), 199 ("Re-initialization of taskqueue callback?")); 200 201 queue->tq_callbacks[cb_type] = callback; 202 queue->tq_cb_contexts[cb_type] = context; 203 } 204 205 /* 206 * Signal a taskqueue thread to terminate. 207 */ 208 static void 209 taskqueue_terminate(struct thread **pp, struct taskqueue *tq) 210 { 211 212 while (tq->tq_tcount > 0 || tq->tq_callouts > 0) { 213 wakeup(tq); 214 TQ_SLEEP(tq, pp, "tq_destroy"); 215 } 216 } 217 218 void 219 taskqueue_free(struct taskqueue *queue) 220 { 221 222 TQ_LOCK(queue); 223 queue->tq_flags &= ~TQ_FLAGS_ACTIVE; 224 taskqueue_terminate(queue->tq_threads, queue); 225 KASSERT(LIST_EMPTY(&queue->tq_active), ("Tasks still running?")); 226 KASSERT(queue->tq_callouts == 0, ("Armed timeout tasks")); 227 mtx_destroy(&queue->tq_mutex); 228 free(queue->tq_threads, M_TASKQUEUE); 229 free(queue->tq_name, M_TASKQUEUE); 230 free(queue, M_TASKQUEUE); 231 } 232 233 static int 234 taskqueue_enqueue_locked(struct taskqueue *queue, struct task *task, int flags) 235 { 236 struct task *ins; 237 struct task *prev; 238 struct taskqueue_busy *tb; 239 240 KASSERT(task->ta_func != NULL, ("enqueueing task with NULL func")); 241 /* 242 * Ignore canceling task if requested. 243 */ 244 if (__predict_false((flags & TASKQUEUE_FAIL_IF_CANCELING) != 0)) { 245 tb = task_get_busy(queue, task); 246 if (tb != NULL && tb->tb_canceling) { 247 TQ_UNLOCK(queue); 248 return (ECANCELED); 249 } 250 } 251 252 /* 253 * Count multiple enqueues. 254 */ 255 if (task->ta_pending) { 256 if (__predict_false((flags & TASKQUEUE_FAIL_IF_PENDING) != 0)) { 257 TQ_UNLOCK(queue); 258 return (EEXIST); 259 } 260 if (task->ta_pending < USHRT_MAX) 261 task->ta_pending++; 262 TQ_UNLOCK(queue); 263 return (0); 264 } 265 266 /* 267 * Optimise cases when all tasks use small set of priorities. 268 * In case of only one priority we always insert at the end. 269 * In case of two tq_hint typically gives the insertion point. 270 * In case of more then two tq_hint should halve the search. 271 */ 272 prev = STAILQ_LAST(&queue->tq_queue, task, ta_link); 273 if (!prev || prev->ta_priority >= task->ta_priority) { 274 STAILQ_INSERT_TAIL(&queue->tq_queue, task, ta_link); 275 } else { 276 prev = queue->tq_hint; 277 if (prev && prev->ta_priority >= task->ta_priority) { 278 ins = STAILQ_NEXT(prev, ta_link); 279 } else { 280 prev = NULL; 281 ins = STAILQ_FIRST(&queue->tq_queue); 282 } 283 for (; ins; prev = ins, ins = STAILQ_NEXT(ins, ta_link)) 284 if (ins->ta_priority < task->ta_priority) 285 break; 286 287 if (prev) { 288 STAILQ_INSERT_AFTER(&queue->tq_queue, prev, task, ta_link); 289 queue->tq_hint = task; 290 } else 291 STAILQ_INSERT_HEAD(&queue->tq_queue, task, ta_link); 292 } 293 294 task->ta_pending = 1; 295 if ((queue->tq_flags & TQ_FLAGS_UNLOCKED_ENQUEUE) != 0) 296 TQ_UNLOCK(queue); 297 if ((queue->tq_flags & TQ_FLAGS_BLOCKED) == 0) 298 queue->tq_enqueue(queue->tq_context); 299 if ((queue->tq_flags & TQ_FLAGS_UNLOCKED_ENQUEUE) == 0) 300 TQ_UNLOCK(queue); 301 302 /* Return with lock released. */ 303 return (0); 304 } 305 306 int 307 taskqueue_enqueue_flags(struct taskqueue *queue, struct task *task, int flags) 308 { 309 int res; 310 311 TQ_LOCK(queue); 312 res = taskqueue_enqueue_locked(queue, task, flags); 313 /* The lock is released inside. */ 314 315 return (res); 316 } 317 318 int 319 taskqueue_enqueue(struct taskqueue *queue, struct task *task) 320 { 321 return (taskqueue_enqueue_flags(queue, task, 0)); 322 } 323 324 static void 325 taskqueue_timeout_func(void *arg) 326 { 327 struct taskqueue *queue; 328 struct timeout_task *timeout_task; 329 330 timeout_task = arg; 331 queue = timeout_task->q; 332 KASSERT((timeout_task->f & DT_CALLOUT_ARMED) != 0, ("Stray timeout")); 333 timeout_task->f &= ~DT_CALLOUT_ARMED; 334 queue->tq_callouts--; 335 taskqueue_enqueue_locked(timeout_task->q, &timeout_task->t, 0); 336 /* The lock is released inside. */ 337 } 338 339 int 340 taskqueue_enqueue_timeout_sbt(struct taskqueue *queue, 341 struct timeout_task *timeout_task, sbintime_t sbt, sbintime_t pr, int flags) 342 { 343 int res; 344 345 TQ_LOCK(queue); 346 KASSERT(timeout_task->q == NULL || timeout_task->q == queue, 347 ("Migrated queue")); 348 timeout_task->q = queue; 349 res = timeout_task->t.ta_pending; 350 if (timeout_task->f & DT_DRAIN_IN_PROGRESS) { 351 /* Do nothing */ 352 TQ_UNLOCK(queue); 353 res = -1; 354 } else if (sbt == 0) { 355 taskqueue_enqueue_locked(queue, &timeout_task->t, 0); 356 /* The lock is released inside. */ 357 } else { 358 if ((timeout_task->f & DT_CALLOUT_ARMED) != 0) { 359 res++; 360 } else { 361 queue->tq_callouts++; 362 timeout_task->f |= DT_CALLOUT_ARMED; 363 if (sbt < 0) 364 sbt = -sbt; /* Ignore overflow. */ 365 } 366 if (sbt > 0) { 367 if (queue->tq_spin) 368 flags |= C_DIRECT_EXEC; 369 callout_reset_sbt(&timeout_task->c, sbt, pr, 370 taskqueue_timeout_func, timeout_task, flags); 371 } 372 TQ_UNLOCK(queue); 373 } 374 return (res); 375 } 376 377 int 378 taskqueue_enqueue_timeout(struct taskqueue *queue, 379 struct timeout_task *ttask, int ticks) 380 { 381 382 return (taskqueue_enqueue_timeout_sbt(queue, ttask, ticks * tick_sbt, 383 0, C_HARDCLOCK)); 384 } 385 386 static void 387 taskqueue_task_nop_fn(void *context, int pending) 388 { 389 } 390 391 /* 392 * Block until all currently queued tasks in this taskqueue 393 * have begun execution. Tasks queued during execution of 394 * this function are ignored. 395 */ 396 static int 397 taskqueue_drain_tq_queue(struct taskqueue *queue) 398 { 399 struct task t_barrier; 400 401 if (STAILQ_EMPTY(&queue->tq_queue)) 402 return (0); 403 404 /* 405 * Enqueue our barrier after all current tasks, but with 406 * the highest priority so that newly queued tasks cannot 407 * pass it. Because of the high priority, we can not use 408 * taskqueue_enqueue_locked directly (which drops the lock 409 * anyway) so just insert it at tail while we have the 410 * queue lock. 411 */ 412 TASK_INIT(&t_barrier, UCHAR_MAX, taskqueue_task_nop_fn, &t_barrier); 413 STAILQ_INSERT_TAIL(&queue->tq_queue, &t_barrier, ta_link); 414 queue->tq_hint = &t_barrier; 415 t_barrier.ta_pending = 1; 416 417 /* 418 * Once the barrier has executed, all previously queued tasks 419 * have completed or are currently executing. 420 */ 421 while (t_barrier.ta_pending != 0) 422 TQ_SLEEP(queue, &t_barrier, "tq_qdrain"); 423 return (1); 424 } 425 426 /* 427 * Block until all currently executing tasks for this taskqueue 428 * complete. Tasks that begin execution during the execution 429 * of this function are ignored. 430 */ 431 static int 432 taskqueue_drain_tq_active(struct taskqueue *queue) 433 { 434 struct taskqueue_busy *tb; 435 u_int seq; 436 437 if (LIST_EMPTY(&queue->tq_active)) 438 return (0); 439 440 /* Block taskq_terminate().*/ 441 queue->tq_callouts++; 442 443 /* Wait for any active task with sequence from the past. */ 444 seq = queue->tq_seq; 445 restart: 446 LIST_FOREACH(tb, &queue->tq_active, tb_link) { 447 if ((int)(tb->tb_seq - seq) <= 0) { 448 TQ_SLEEP(queue, tb->tb_running, "tq_adrain"); 449 goto restart; 450 } 451 } 452 453 /* Release taskqueue_terminate(). */ 454 queue->tq_callouts--; 455 if ((queue->tq_flags & TQ_FLAGS_ACTIVE) == 0) 456 wakeup_one(queue->tq_threads); 457 return (1); 458 } 459 460 void 461 taskqueue_block(struct taskqueue *queue) 462 { 463 464 TQ_LOCK(queue); 465 queue->tq_flags |= TQ_FLAGS_BLOCKED; 466 TQ_UNLOCK(queue); 467 } 468 469 void 470 taskqueue_unblock(struct taskqueue *queue) 471 { 472 473 TQ_LOCK(queue); 474 queue->tq_flags &= ~TQ_FLAGS_BLOCKED; 475 if (!STAILQ_EMPTY(&queue->tq_queue)) 476 queue->tq_enqueue(queue->tq_context); 477 TQ_UNLOCK(queue); 478 } 479 480 static void 481 taskqueue_run_locked(struct taskqueue *queue) 482 { 483 struct epoch_tracker et; 484 struct taskqueue_busy tb; 485 struct task *task; 486 bool in_net_epoch; 487 int pending; 488 489 KASSERT(queue != NULL, ("tq is NULL")); 490 TQ_ASSERT_LOCKED(queue); 491 tb.tb_running = NULL; 492 LIST_INSERT_HEAD(&queue->tq_active, &tb, tb_link); 493 in_net_epoch = false; 494 495 while ((task = STAILQ_FIRST(&queue->tq_queue)) != NULL) { 496 STAILQ_REMOVE_HEAD(&queue->tq_queue, ta_link); 497 if (queue->tq_hint == task) 498 queue->tq_hint = NULL; 499 pending = task->ta_pending; 500 task->ta_pending = 0; 501 tb.tb_running = task; 502 tb.tb_seq = ++queue->tq_seq; 503 tb.tb_canceling = false; 504 TQ_UNLOCK(queue); 505 506 KASSERT(task->ta_func != NULL, ("task->ta_func is NULL")); 507 if (!in_net_epoch && TASK_IS_NET(task)) { 508 in_net_epoch = true; 509 NET_EPOCH_ENTER(et); 510 } else if (in_net_epoch && !TASK_IS_NET(task)) { 511 NET_EPOCH_EXIT(et); 512 in_net_epoch = false; 513 } 514 task->ta_func(task->ta_context, pending); 515 516 TQ_LOCK(queue); 517 wakeup(task); 518 } 519 if (in_net_epoch) 520 NET_EPOCH_EXIT(et); 521 LIST_REMOVE(&tb, tb_link); 522 } 523 524 void 525 taskqueue_run(struct taskqueue *queue) 526 { 527 528 TQ_LOCK(queue); 529 taskqueue_run_locked(queue); 530 TQ_UNLOCK(queue); 531 } 532 533 /* 534 * Only use this function in single threaded contexts. It returns 535 * non-zero if the given task is either pending or running. Else the 536 * task is idle and can be queued again or freed. 537 */ 538 int 539 taskqueue_poll_is_busy(struct taskqueue *queue, struct task *task) 540 { 541 int retval; 542 543 TQ_LOCK(queue); 544 retval = task->ta_pending > 0 || task_get_busy(queue, task) != NULL; 545 TQ_UNLOCK(queue); 546 547 return (retval); 548 } 549 550 static int 551 taskqueue_cancel_locked(struct taskqueue *queue, struct task *task, 552 u_int *pendp) 553 { 554 struct taskqueue_busy *tb; 555 int retval = 0; 556 557 if (task->ta_pending > 0) { 558 STAILQ_REMOVE(&queue->tq_queue, task, task, ta_link); 559 if (queue->tq_hint == task) 560 queue->tq_hint = NULL; 561 } 562 if (pendp != NULL) 563 *pendp = task->ta_pending; 564 task->ta_pending = 0; 565 tb = task_get_busy(queue, task); 566 if (tb != NULL) { 567 tb->tb_canceling = true; 568 retval = EBUSY; 569 } 570 571 return (retval); 572 } 573 574 int 575 taskqueue_cancel(struct taskqueue *queue, struct task *task, u_int *pendp) 576 { 577 int error; 578 579 TQ_LOCK(queue); 580 error = taskqueue_cancel_locked(queue, task, pendp); 581 TQ_UNLOCK(queue); 582 583 return (error); 584 } 585 586 int 587 taskqueue_cancel_timeout(struct taskqueue *queue, 588 struct timeout_task *timeout_task, u_int *pendp) 589 { 590 u_int pending, pending1; 591 int error; 592 593 TQ_LOCK(queue); 594 pending = !!(callout_stop(&timeout_task->c) > 0); 595 error = taskqueue_cancel_locked(queue, &timeout_task->t, &pending1); 596 if ((timeout_task->f & DT_CALLOUT_ARMED) != 0) { 597 timeout_task->f &= ~DT_CALLOUT_ARMED; 598 queue->tq_callouts--; 599 } 600 TQ_UNLOCK(queue); 601 602 if (pendp != NULL) 603 *pendp = pending + pending1; 604 return (error); 605 } 606 607 void 608 taskqueue_drain(struct taskqueue *queue, struct task *task) 609 { 610 611 if (!queue->tq_spin) 612 WITNESS_WARN(WARN_GIANTOK | WARN_SLEEPOK, NULL, __func__); 613 614 TQ_LOCK(queue); 615 while (task->ta_pending != 0 || task_get_busy(queue, task) != NULL) 616 TQ_SLEEP(queue, task, "tq_drain"); 617 TQ_UNLOCK(queue); 618 } 619 620 void 621 taskqueue_drain_all(struct taskqueue *queue) 622 { 623 624 if (!queue->tq_spin) 625 WITNESS_WARN(WARN_GIANTOK | WARN_SLEEPOK, NULL, __func__); 626 627 TQ_LOCK(queue); 628 (void)taskqueue_drain_tq_queue(queue); 629 (void)taskqueue_drain_tq_active(queue); 630 TQ_UNLOCK(queue); 631 } 632 633 void 634 taskqueue_drain_timeout(struct taskqueue *queue, 635 struct timeout_task *timeout_task) 636 { 637 638 /* 639 * Set flag to prevent timer from re-starting during drain: 640 */ 641 TQ_LOCK(queue); 642 KASSERT((timeout_task->f & DT_DRAIN_IN_PROGRESS) == 0, 643 ("Drain already in progress")); 644 timeout_task->f |= DT_DRAIN_IN_PROGRESS; 645 TQ_UNLOCK(queue); 646 647 callout_drain(&timeout_task->c); 648 taskqueue_drain(queue, &timeout_task->t); 649 650 /* 651 * Clear flag to allow timer to re-start: 652 */ 653 TQ_LOCK(queue); 654 timeout_task->f &= ~DT_DRAIN_IN_PROGRESS; 655 TQ_UNLOCK(queue); 656 } 657 658 void 659 taskqueue_quiesce(struct taskqueue *queue) 660 { 661 int ret; 662 663 TQ_LOCK(queue); 664 do { 665 ret = taskqueue_drain_tq_queue(queue); 666 if (ret == 0) 667 ret = taskqueue_drain_tq_active(queue); 668 } while (ret != 0); 669 TQ_UNLOCK(queue); 670 } 671 672 static void 673 taskqueue_swi_enqueue(void *context) 674 { 675 swi_sched(taskqueue_ih, 0); 676 } 677 678 static void 679 taskqueue_swi_run(void *dummy) 680 { 681 taskqueue_run(taskqueue_swi); 682 } 683 684 static void 685 taskqueue_swi_giant_enqueue(void *context) 686 { 687 swi_sched(taskqueue_giant_ih, 0); 688 } 689 690 static void 691 taskqueue_swi_giant_run(void *dummy) 692 { 693 taskqueue_run(taskqueue_swi_giant); 694 } 695 696 static int 697 _taskqueue_start_threads(struct taskqueue **tqp, int count, int pri, 698 cpuset_t *mask, struct proc *p, const char *name, va_list ap) 699 { 700 char ktname[MAXCOMLEN + 1]; 701 struct thread *td; 702 struct taskqueue *tq; 703 int i, error; 704 705 if (count <= 0) 706 return (EINVAL); 707 708 vsnprintf(ktname, sizeof(ktname), name, ap); 709 tq = *tqp; 710 711 tq->tq_threads = malloc(sizeof(struct thread *) * count, M_TASKQUEUE, 712 M_NOWAIT | M_ZERO); 713 if (tq->tq_threads == NULL) { 714 printf("%s: no memory for %s threads\n", __func__, ktname); 715 return (ENOMEM); 716 } 717 718 for (i = 0; i < count; i++) { 719 if (count == 1) 720 error = kthread_add(taskqueue_thread_loop, tqp, p, 721 &tq->tq_threads[i], RFSTOPPED, 0, "%s", ktname); 722 else 723 error = kthread_add(taskqueue_thread_loop, tqp, p, 724 &tq->tq_threads[i], RFSTOPPED, 0, 725 "%s_%d", ktname, i); 726 if (error) { 727 /* should be ok to continue, taskqueue_free will dtrt */ 728 printf("%s: kthread_add(%s): error %d", __func__, 729 ktname, error); 730 tq->tq_threads[i] = NULL; /* paranoid */ 731 } else 732 tq->tq_tcount++; 733 } 734 if (tq->tq_tcount == 0) { 735 free(tq->tq_threads, M_TASKQUEUE); 736 tq->tq_threads = NULL; 737 return (ENOMEM); 738 } 739 for (i = 0; i < count; i++) { 740 if (tq->tq_threads[i] == NULL) 741 continue; 742 td = tq->tq_threads[i]; 743 if (mask) { 744 error = cpuset_setthread(td->td_tid, mask); 745 /* 746 * Failing to pin is rarely an actual fatal error; 747 * it'll just affect performance. 748 */ 749 if (error) 750 printf("%s: curthread=%llu: can't pin; " 751 "error=%d\n", 752 __func__, 753 (unsigned long long) td->td_tid, 754 error); 755 } 756 thread_lock(td); 757 sched_prio(td, pri); 758 sched_add(td, SRQ_BORING); 759 } 760 761 return (0); 762 } 763 764 int 765 taskqueue_start_threads(struct taskqueue **tqp, int count, int pri, 766 const char *name, ...) 767 { 768 va_list ap; 769 int error; 770 771 va_start(ap, name); 772 error = _taskqueue_start_threads(tqp, count, pri, NULL, NULL, name, ap); 773 va_end(ap); 774 return (error); 775 } 776 777 int 778 taskqueue_start_threads_in_proc(struct taskqueue **tqp, int count, int pri, 779 struct proc *proc, const char *name, ...) 780 { 781 va_list ap; 782 int error; 783 784 va_start(ap, name); 785 error = _taskqueue_start_threads(tqp, count, pri, NULL, proc, name, ap); 786 va_end(ap); 787 return (error); 788 } 789 790 int 791 taskqueue_start_threads_cpuset(struct taskqueue **tqp, int count, int pri, 792 cpuset_t *mask, const char *name, ...) 793 { 794 va_list ap; 795 int error; 796 797 va_start(ap, name); 798 error = _taskqueue_start_threads(tqp, count, pri, mask, NULL, name, ap); 799 va_end(ap); 800 return (error); 801 } 802 803 static inline void 804 taskqueue_run_callback(struct taskqueue *tq, 805 enum taskqueue_callback_type cb_type) 806 { 807 taskqueue_callback_fn tq_callback; 808 809 TQ_ASSERT_UNLOCKED(tq); 810 tq_callback = tq->tq_callbacks[cb_type]; 811 if (tq_callback != NULL) 812 tq_callback(tq->tq_cb_contexts[cb_type]); 813 } 814 815 void 816 taskqueue_thread_loop(void *arg) 817 { 818 struct taskqueue **tqp, *tq; 819 820 tqp = arg; 821 tq = *tqp; 822 taskqueue_run_callback(tq, TASKQUEUE_CALLBACK_TYPE_INIT); 823 TQ_LOCK(tq); 824 while ((tq->tq_flags & TQ_FLAGS_ACTIVE) != 0) { 825 /* XXX ? */ 826 taskqueue_run_locked(tq); 827 /* 828 * Because taskqueue_run() can drop tq_mutex, we need to 829 * check if the TQ_FLAGS_ACTIVE flag wasn't removed in the 830 * meantime, which means we missed a wakeup. 831 */ 832 if ((tq->tq_flags & TQ_FLAGS_ACTIVE) == 0) 833 break; 834 TQ_SLEEP(tq, tq, "-"); 835 } 836 taskqueue_run_locked(tq); 837 /* 838 * This thread is on its way out, so just drop the lock temporarily 839 * in order to call the shutdown callback. This allows the callback 840 * to look at the taskqueue, even just before it dies. 841 */ 842 TQ_UNLOCK(tq); 843 taskqueue_run_callback(tq, TASKQUEUE_CALLBACK_TYPE_SHUTDOWN); 844 TQ_LOCK(tq); 845 846 /* rendezvous with thread that asked us to terminate */ 847 tq->tq_tcount--; 848 wakeup_one(tq->tq_threads); 849 TQ_UNLOCK(tq); 850 kthread_exit(); 851 } 852 853 void 854 taskqueue_thread_enqueue(void *context) 855 { 856 struct taskqueue **tqp, *tq; 857 858 tqp = context; 859 tq = *tqp; 860 wakeup_any(tq); 861 } 862 863 TASKQUEUE_DEFINE(swi, taskqueue_swi_enqueue, NULL, 864 swi_add(NULL, "task queue", taskqueue_swi_run, NULL, SWI_TQ, 865 INTR_MPSAFE, &taskqueue_ih)); 866 867 TASKQUEUE_DEFINE(swi_giant, taskqueue_swi_giant_enqueue, NULL, 868 swi_add(NULL, "Giant taskq", taskqueue_swi_giant_run, 869 NULL, SWI_TQ_GIANT, 0, &taskqueue_giant_ih)); 870 871 TASKQUEUE_DEFINE_THREAD(thread); 872 873 struct taskqueue * 874 taskqueue_create_fast(const char *name, int mflags, 875 taskqueue_enqueue_fn enqueue, void *context) 876 { 877 return _taskqueue_create(name, mflags, enqueue, context, 878 MTX_SPIN, "fast_taskqueue"); 879 } 880 881 static void *taskqueue_fast_ih; 882 883 static void 884 taskqueue_fast_enqueue(void *context) 885 { 886 swi_sched(taskqueue_fast_ih, 0); 887 } 888 889 static void 890 taskqueue_fast_run(void *dummy) 891 { 892 taskqueue_run(taskqueue_fast); 893 } 894 895 TASKQUEUE_FAST_DEFINE(fast, taskqueue_fast_enqueue, NULL, 896 swi_add(NULL, "fast taskq", taskqueue_fast_run, NULL, 897 SWI_TQ_FAST, INTR_MPSAFE, &taskqueue_fast_ih)); 898 899 int 900 taskqueue_member(struct taskqueue *queue, struct thread *td) 901 { 902 int i, j, ret = 0; 903 904 for (i = 0, j = 0; ; i++) { 905 if (queue->tq_threads[i] == NULL) 906 continue; 907 if (queue->tq_threads[i] == td) { 908 ret = 1; 909 break; 910 } 911 if (++j >= queue->tq_tcount) 912 break; 913 } 914 return (ret); 915 } 916