1 /*- 2 * Copyright (c) 2000 Doug Rabson 3 * Copyright (c) 2014 Jeff Roberson 4 * Copyright (c) 2016 Matthew Macy 5 * All rights reserved. 6 * 7 * Redistribution and use in source and binary forms, with or without 8 * modification, are permitted provided that the following conditions 9 * are met: 10 * 1. Redistributions of source code must retain the above copyright 11 * notice, this list of conditions and the following disclaimer. 12 * 2. Redistributions in binary form must reproduce the above copyright 13 * notice, this list of conditions and the following disclaimer in the 14 * documentation and/or other materials provided with the distribution. 15 * 16 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND 17 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 18 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 19 * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE 20 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL 21 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS 22 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) 23 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 24 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY 25 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF 26 * SUCH DAMAGE. 27 */ 28 29 #include <sys/cdefs.h> 30 __FBSDID("$FreeBSD$"); 31 32 #include <sys/param.h> 33 #include <sys/systm.h> 34 #include <sys/bus.h> 35 #include <sys/cpuset.h> 36 #include <sys/interrupt.h> 37 #include <sys/kernel.h> 38 #include <sys/kthread.h> 39 #include <sys/libkern.h> 40 #include <sys/limits.h> 41 #include <sys/lock.h> 42 #include <sys/malloc.h> 43 #include <sys/mutex.h> 44 #include <sys/proc.h> 45 #include <sys/sched.h> 46 #include <sys/smp.h> 47 #include <sys/gtaskqueue.h> 48 #include <sys/unistd.h> 49 #include <machine/stdarg.h> 50 51 static MALLOC_DEFINE(M_GTASKQUEUE, "taskqueue", "Task Queues"); 52 static void gtaskqueue_thread_enqueue(void *); 53 static void gtaskqueue_thread_loop(void *arg); 54 55 struct gtaskqueue_busy { 56 struct gtask *tb_running; 57 TAILQ_ENTRY(gtaskqueue_busy) tb_link; 58 }; 59 60 static struct gtask * const TB_DRAIN_WAITER = (struct gtask *)0x1; 61 62 struct gtaskqueue { 63 STAILQ_HEAD(, gtask) tq_queue; 64 gtaskqueue_enqueue_fn tq_enqueue; 65 void *tq_context; 66 char *tq_name; 67 TAILQ_HEAD(, gtaskqueue_busy) tq_active; 68 struct mtx tq_mutex; 69 struct thread **tq_threads; 70 int tq_tcount; 71 int tq_spin; 72 int tq_flags; 73 int tq_callouts; 74 taskqueue_callback_fn tq_callbacks[TASKQUEUE_NUM_CALLBACKS]; 75 void *tq_cb_contexts[TASKQUEUE_NUM_CALLBACKS]; 76 }; 77 78 #define TQ_FLAGS_ACTIVE (1 << 0) 79 #define TQ_FLAGS_BLOCKED (1 << 1) 80 #define TQ_FLAGS_UNLOCKED_ENQUEUE (1 << 2) 81 82 #define DT_CALLOUT_ARMED (1 << 0) 83 84 #define TQ_LOCK(tq) \ 85 do { \ 86 if ((tq)->tq_spin) \ 87 mtx_lock_spin(&(tq)->tq_mutex); \ 88 else \ 89 mtx_lock(&(tq)->tq_mutex); \ 90 } while (0) 91 #define TQ_ASSERT_LOCKED(tq) mtx_assert(&(tq)->tq_mutex, MA_OWNED) 92 93 #define TQ_UNLOCK(tq) \ 94 do { \ 95 if ((tq)->tq_spin) \ 96 mtx_unlock_spin(&(tq)->tq_mutex); \ 97 else \ 98 mtx_unlock(&(tq)->tq_mutex); \ 99 } while (0) 100 #define TQ_ASSERT_UNLOCKED(tq) mtx_assert(&(tq)->tq_mutex, MA_NOTOWNED) 101 102 static __inline int 103 TQ_SLEEP(struct gtaskqueue *tq, void *p, struct mtx *m, int pri, const char *wm, 104 int t) 105 { 106 if (tq->tq_spin) 107 return (msleep_spin(p, m, wm, t)); 108 return (msleep(p, m, pri, wm, t)); 109 } 110 111 static struct gtaskqueue * 112 _gtaskqueue_create(const char *name, int mflags, 113 taskqueue_enqueue_fn enqueue, void *context, 114 int mtxflags, const char *mtxname __unused) 115 { 116 struct gtaskqueue *queue; 117 char *tq_name; 118 119 tq_name = malloc(TASKQUEUE_NAMELEN, M_GTASKQUEUE, mflags | M_ZERO); 120 if (!tq_name) 121 return (NULL); 122 123 snprintf(tq_name, TASKQUEUE_NAMELEN, "%s", (name) ? name : "taskqueue"); 124 125 queue = malloc(sizeof(struct gtaskqueue), M_GTASKQUEUE, mflags | M_ZERO); 126 if (!queue) 127 return (NULL); 128 129 STAILQ_INIT(&queue->tq_queue); 130 TAILQ_INIT(&queue->tq_active); 131 queue->tq_enqueue = enqueue; 132 queue->tq_context = context; 133 queue->tq_name = tq_name; 134 queue->tq_spin = (mtxflags & MTX_SPIN) != 0; 135 queue->tq_flags |= TQ_FLAGS_ACTIVE; 136 if (enqueue == gtaskqueue_thread_enqueue) 137 queue->tq_flags |= TQ_FLAGS_UNLOCKED_ENQUEUE; 138 mtx_init(&queue->tq_mutex, tq_name, NULL, mtxflags); 139 140 return (queue); 141 } 142 143 144 /* 145 * Signal a taskqueue thread to terminate. 146 */ 147 static void 148 gtaskqueue_terminate(struct thread **pp, struct gtaskqueue *tq) 149 { 150 151 while (tq->tq_tcount > 0 || tq->tq_callouts > 0) { 152 wakeup(tq); 153 TQ_SLEEP(tq, pp, &tq->tq_mutex, PWAIT, "taskqueue_destroy", 0); 154 } 155 } 156 157 static void 158 gtaskqueue_free(struct gtaskqueue *queue) 159 { 160 161 TQ_LOCK(queue); 162 queue->tq_flags &= ~TQ_FLAGS_ACTIVE; 163 gtaskqueue_terminate(queue->tq_threads, queue); 164 KASSERT(TAILQ_EMPTY(&queue->tq_active), ("Tasks still running?")); 165 KASSERT(queue->tq_callouts == 0, ("Armed timeout tasks")); 166 mtx_destroy(&queue->tq_mutex); 167 free(queue->tq_threads, M_GTASKQUEUE); 168 free(queue->tq_name, M_GTASKQUEUE); 169 free(queue, M_GTASKQUEUE); 170 } 171 172 int 173 grouptaskqueue_enqueue(struct gtaskqueue *queue, struct gtask *gtask) 174 { 175 TQ_LOCK(queue); 176 if (gtask->ta_flags & TASK_ENQUEUED) { 177 TQ_UNLOCK(queue); 178 return (0); 179 } 180 STAILQ_INSERT_TAIL(&queue->tq_queue, gtask, ta_link); 181 gtask->ta_flags |= TASK_ENQUEUED; 182 TQ_UNLOCK(queue); 183 if ((queue->tq_flags & TQ_FLAGS_BLOCKED) == 0) 184 queue->tq_enqueue(queue->tq_context); 185 return (0); 186 } 187 188 static void 189 gtaskqueue_task_nop_fn(void *context) 190 { 191 } 192 193 /* 194 * Block until all currently queued tasks in this taskqueue 195 * have begun execution. Tasks queued during execution of 196 * this function are ignored. 197 */ 198 static void 199 gtaskqueue_drain_tq_queue(struct gtaskqueue *queue) 200 { 201 struct gtask t_barrier; 202 203 if (STAILQ_EMPTY(&queue->tq_queue)) 204 return; 205 206 /* 207 * Enqueue our barrier after all current tasks, but with 208 * the highest priority so that newly queued tasks cannot 209 * pass it. Because of the high priority, we can not use 210 * taskqueue_enqueue_locked directly (which drops the lock 211 * anyway) so just insert it at tail while we have the 212 * queue lock. 213 */ 214 GTASK_INIT(&t_barrier, 0, USHRT_MAX, gtaskqueue_task_nop_fn, &t_barrier); 215 STAILQ_INSERT_TAIL(&queue->tq_queue, &t_barrier, ta_link); 216 t_barrier.ta_flags |= TASK_ENQUEUED; 217 218 /* 219 * Once the barrier has executed, all previously queued tasks 220 * have completed or are currently executing. 221 */ 222 while (t_barrier.ta_flags & TASK_ENQUEUED) 223 TQ_SLEEP(queue, &t_barrier, &queue->tq_mutex, PWAIT, "-", 0); 224 } 225 226 /* 227 * Block until all currently executing tasks for this taskqueue 228 * complete. Tasks that begin execution during the execution 229 * of this function are ignored. 230 */ 231 static void 232 gtaskqueue_drain_tq_active(struct gtaskqueue *queue) 233 { 234 struct gtaskqueue_busy tb_marker, *tb_first; 235 236 if (TAILQ_EMPTY(&queue->tq_active)) 237 return; 238 239 /* Block taskq_terminate().*/ 240 queue->tq_callouts++; 241 242 /* 243 * Wait for all currently executing taskqueue threads 244 * to go idle. 245 */ 246 tb_marker.tb_running = TB_DRAIN_WAITER; 247 TAILQ_INSERT_TAIL(&queue->tq_active, &tb_marker, tb_link); 248 while (TAILQ_FIRST(&queue->tq_active) != &tb_marker) 249 TQ_SLEEP(queue, &tb_marker, &queue->tq_mutex, PWAIT, "-", 0); 250 TAILQ_REMOVE(&queue->tq_active, &tb_marker, tb_link); 251 252 /* 253 * Wakeup any other drain waiter that happened to queue up 254 * without any intervening active thread. 255 */ 256 tb_first = TAILQ_FIRST(&queue->tq_active); 257 if (tb_first != NULL && tb_first->tb_running == TB_DRAIN_WAITER) 258 wakeup(tb_first); 259 260 /* Release taskqueue_terminate(). */ 261 queue->tq_callouts--; 262 if ((queue->tq_flags & TQ_FLAGS_ACTIVE) == 0) 263 wakeup_one(queue->tq_threads); 264 } 265 266 void 267 gtaskqueue_block(struct gtaskqueue *queue) 268 { 269 270 TQ_LOCK(queue); 271 queue->tq_flags |= TQ_FLAGS_BLOCKED; 272 TQ_UNLOCK(queue); 273 } 274 275 void 276 gtaskqueue_unblock(struct gtaskqueue *queue) 277 { 278 279 TQ_LOCK(queue); 280 queue->tq_flags &= ~TQ_FLAGS_BLOCKED; 281 if (!STAILQ_EMPTY(&queue->tq_queue)) 282 queue->tq_enqueue(queue->tq_context); 283 TQ_UNLOCK(queue); 284 } 285 286 static void 287 gtaskqueue_run_locked(struct gtaskqueue *queue) 288 { 289 struct gtaskqueue_busy tb; 290 struct gtaskqueue_busy *tb_first; 291 struct gtask *gtask; 292 293 KASSERT(queue != NULL, ("tq is NULL")); 294 TQ_ASSERT_LOCKED(queue); 295 tb.tb_running = NULL; 296 297 while (STAILQ_FIRST(&queue->tq_queue)) { 298 TAILQ_INSERT_TAIL(&queue->tq_active, &tb, tb_link); 299 300 /* 301 * Carefully remove the first task from the queue and 302 * clear its TASK_ENQUEUED flag 303 */ 304 gtask = STAILQ_FIRST(&queue->tq_queue); 305 KASSERT(gtask != NULL, ("task is NULL")); 306 STAILQ_REMOVE_HEAD(&queue->tq_queue, ta_link); 307 gtask->ta_flags &= ~TASK_ENQUEUED; 308 tb.tb_running = gtask; 309 TQ_UNLOCK(queue); 310 311 KASSERT(gtask->ta_func != NULL, ("task->ta_func is NULL")); 312 gtask->ta_func(gtask->ta_context); 313 314 TQ_LOCK(queue); 315 tb.tb_running = NULL; 316 wakeup(gtask); 317 318 TAILQ_REMOVE(&queue->tq_active, &tb, tb_link); 319 tb_first = TAILQ_FIRST(&queue->tq_active); 320 if (tb_first != NULL && 321 tb_first->tb_running == TB_DRAIN_WAITER) 322 wakeup(tb_first); 323 } 324 } 325 326 static int 327 task_is_running(struct gtaskqueue *queue, struct gtask *gtask) 328 { 329 struct gtaskqueue_busy *tb; 330 331 TQ_ASSERT_LOCKED(queue); 332 TAILQ_FOREACH(tb, &queue->tq_active, tb_link) { 333 if (tb->tb_running == gtask) 334 return (1); 335 } 336 return (0); 337 } 338 339 static int 340 gtaskqueue_cancel_locked(struct gtaskqueue *queue, struct gtask *gtask) 341 { 342 343 if (gtask->ta_flags & TASK_ENQUEUED) 344 STAILQ_REMOVE(&queue->tq_queue, gtask, gtask, ta_link); 345 gtask->ta_flags &= ~TASK_ENQUEUED; 346 return (task_is_running(queue, gtask) ? EBUSY : 0); 347 } 348 349 int 350 gtaskqueue_cancel(struct gtaskqueue *queue, struct gtask *gtask) 351 { 352 int error; 353 354 TQ_LOCK(queue); 355 error = gtaskqueue_cancel_locked(queue, gtask); 356 TQ_UNLOCK(queue); 357 358 return (error); 359 } 360 361 void 362 gtaskqueue_drain(struct gtaskqueue *queue, struct gtask *gtask) 363 { 364 365 if (!queue->tq_spin) 366 WITNESS_WARN(WARN_GIANTOK | WARN_SLEEPOK, NULL, __func__); 367 368 TQ_LOCK(queue); 369 while ((gtask->ta_flags & TASK_ENQUEUED) || task_is_running(queue, gtask)) 370 TQ_SLEEP(queue, gtask, &queue->tq_mutex, PWAIT, "-", 0); 371 TQ_UNLOCK(queue); 372 } 373 374 void 375 gtaskqueue_drain_all(struct gtaskqueue *queue) 376 { 377 378 if (!queue->tq_spin) 379 WITNESS_WARN(WARN_GIANTOK | WARN_SLEEPOK, NULL, __func__); 380 381 TQ_LOCK(queue); 382 gtaskqueue_drain_tq_queue(queue); 383 gtaskqueue_drain_tq_active(queue); 384 TQ_UNLOCK(queue); 385 } 386 387 static int 388 _gtaskqueue_start_threads(struct gtaskqueue **tqp, int count, int pri, 389 cpuset_t *mask, const char *name, va_list ap) 390 { 391 char ktname[MAXCOMLEN + 1]; 392 struct thread *td; 393 struct gtaskqueue *tq; 394 int i, error; 395 396 if (count <= 0) 397 return (EINVAL); 398 399 vsnprintf(ktname, sizeof(ktname), name, ap); 400 tq = *tqp; 401 402 tq->tq_threads = malloc(sizeof(struct thread *) * count, M_GTASKQUEUE, 403 M_NOWAIT | M_ZERO); 404 if (tq->tq_threads == NULL) { 405 printf("%s: no memory for %s threads\n", __func__, ktname); 406 return (ENOMEM); 407 } 408 409 for (i = 0; i < count; i++) { 410 if (count == 1) 411 error = kthread_add(gtaskqueue_thread_loop, tqp, NULL, 412 &tq->tq_threads[i], RFSTOPPED, 0, "%s", ktname); 413 else 414 error = kthread_add(gtaskqueue_thread_loop, tqp, NULL, 415 &tq->tq_threads[i], RFSTOPPED, 0, 416 "%s_%d", ktname, i); 417 if (error) { 418 /* should be ok to continue, taskqueue_free will dtrt */ 419 printf("%s: kthread_add(%s): error %d", __func__, 420 ktname, error); 421 tq->tq_threads[i] = NULL; /* paranoid */ 422 } else 423 tq->tq_tcount++; 424 } 425 for (i = 0; i < count; i++) { 426 if (tq->tq_threads[i] == NULL) 427 continue; 428 td = tq->tq_threads[i]; 429 if (mask) { 430 error = cpuset_setthread(td->td_tid, mask); 431 /* 432 * Failing to pin is rarely an actual fatal error; 433 * it'll just affect performance. 434 */ 435 if (error) 436 printf("%s: curthread=%llu: can't pin; " 437 "error=%d\n", 438 __func__, 439 (unsigned long long) td->td_tid, 440 error); 441 } 442 thread_lock(td); 443 sched_prio(td, pri); 444 sched_add(td, SRQ_BORING); 445 thread_unlock(td); 446 } 447 448 return (0); 449 } 450 451 static int 452 gtaskqueue_start_threads(struct gtaskqueue **tqp, int count, int pri, 453 const char *name, ...) 454 { 455 va_list ap; 456 int error; 457 458 va_start(ap, name); 459 error = _gtaskqueue_start_threads(tqp, count, pri, NULL, name, ap); 460 va_end(ap); 461 return (error); 462 } 463 464 static inline void 465 gtaskqueue_run_callback(struct gtaskqueue *tq, 466 enum taskqueue_callback_type cb_type) 467 { 468 taskqueue_callback_fn tq_callback; 469 470 TQ_ASSERT_UNLOCKED(tq); 471 tq_callback = tq->tq_callbacks[cb_type]; 472 if (tq_callback != NULL) 473 tq_callback(tq->tq_cb_contexts[cb_type]); 474 } 475 476 static void 477 gtaskqueue_thread_loop(void *arg) 478 { 479 struct gtaskqueue **tqp, *tq; 480 481 tqp = arg; 482 tq = *tqp; 483 gtaskqueue_run_callback(tq, TASKQUEUE_CALLBACK_TYPE_INIT); 484 TQ_LOCK(tq); 485 while ((tq->tq_flags & TQ_FLAGS_ACTIVE) != 0) { 486 /* XXX ? */ 487 gtaskqueue_run_locked(tq); 488 /* 489 * Because taskqueue_run() can drop tq_mutex, we need to 490 * check if the TQ_FLAGS_ACTIVE flag wasn't removed in the 491 * meantime, which means we missed a wakeup. 492 */ 493 if ((tq->tq_flags & TQ_FLAGS_ACTIVE) == 0) 494 break; 495 TQ_SLEEP(tq, tq, &tq->tq_mutex, 0, "-", 0); 496 } 497 gtaskqueue_run_locked(tq); 498 /* 499 * This thread is on its way out, so just drop the lock temporarily 500 * in order to call the shutdown callback. This allows the callback 501 * to look at the taskqueue, even just before it dies. 502 */ 503 TQ_UNLOCK(tq); 504 gtaskqueue_run_callback(tq, TASKQUEUE_CALLBACK_TYPE_SHUTDOWN); 505 TQ_LOCK(tq); 506 507 /* rendezvous with thread that asked us to terminate */ 508 tq->tq_tcount--; 509 wakeup_one(tq->tq_threads); 510 TQ_UNLOCK(tq); 511 kthread_exit(); 512 } 513 514 static void 515 gtaskqueue_thread_enqueue(void *context) 516 { 517 struct gtaskqueue **tqp, *tq; 518 519 tqp = context; 520 tq = *tqp; 521 wakeup_one(tq); 522 } 523 524 525 static struct gtaskqueue * 526 gtaskqueue_create_fast(const char *name, int mflags, 527 taskqueue_enqueue_fn enqueue, void *context) 528 { 529 return _gtaskqueue_create(name, mflags, enqueue, context, 530 MTX_SPIN, "fast_taskqueue"); 531 } 532 533 534 struct taskqgroup_cpu { 535 LIST_HEAD(, grouptask) tgc_tasks; 536 struct gtaskqueue *tgc_taskq; 537 int tgc_cnt; 538 int tgc_cpu; 539 }; 540 541 struct taskqgroup { 542 struct taskqgroup_cpu tqg_queue[MAXCPU]; 543 struct mtx tqg_lock; 544 char * tqg_name; 545 int tqg_adjusting; 546 int tqg_stride; 547 int tqg_cnt; 548 }; 549 550 struct taskq_bind_task { 551 struct gtask bt_task; 552 int bt_cpuid; 553 }; 554 555 static void 556 taskqgroup_cpu_create(struct taskqgroup *qgroup, int idx, int cpu) 557 { 558 struct taskqgroup_cpu *qcpu; 559 560 qcpu = &qgroup->tqg_queue[idx]; 561 LIST_INIT(&qcpu->tgc_tasks); 562 qcpu->tgc_taskq = gtaskqueue_create_fast(NULL, M_WAITOK, 563 taskqueue_thread_enqueue, &qcpu->tgc_taskq); 564 gtaskqueue_start_threads(&qcpu->tgc_taskq, 1, PI_SOFT, 565 "%s_%d", qgroup->tqg_name, idx); 566 qcpu->tgc_cpu = cpu; 567 } 568 569 static void 570 taskqgroup_cpu_remove(struct taskqgroup *qgroup, int idx) 571 { 572 573 gtaskqueue_free(qgroup->tqg_queue[idx].tgc_taskq); 574 } 575 576 /* 577 * Find the taskq with least # of tasks that doesn't currently have any 578 * other queues from the uniq identifier. 579 */ 580 static int 581 taskqgroup_find(struct taskqgroup *qgroup, void *uniq) 582 { 583 struct grouptask *n; 584 int i, idx, mincnt; 585 int strict; 586 587 mtx_assert(&qgroup->tqg_lock, MA_OWNED); 588 if (qgroup->tqg_cnt == 0) 589 return (0); 590 idx = -1; 591 mincnt = INT_MAX; 592 /* 593 * Two passes; First scan for a queue with the least tasks that 594 * does not already service this uniq id. If that fails simply find 595 * the queue with the least total tasks; 596 */ 597 for (strict = 1; mincnt == INT_MAX; strict = 0) { 598 for (i = 0; i < qgroup->tqg_cnt; i++) { 599 if (qgroup->tqg_queue[i].tgc_cnt > mincnt) 600 continue; 601 if (strict) { 602 LIST_FOREACH(n, 603 &qgroup->tqg_queue[i].tgc_tasks, gt_list) 604 if (n->gt_uniq == uniq) 605 break; 606 if (n != NULL) 607 continue; 608 } 609 mincnt = qgroup->tqg_queue[i].tgc_cnt; 610 idx = i; 611 } 612 } 613 if (idx == -1) 614 panic("taskqgroup_find: Failed to pick a qid."); 615 616 return (idx); 617 } 618 619 void 620 taskqgroup_attach(struct taskqgroup *qgroup, struct grouptask *gtask, 621 void *uniq, int irq, char *name) 622 { 623 cpuset_t mask; 624 int qid; 625 626 gtask->gt_uniq = uniq; 627 gtask->gt_name = name; 628 gtask->gt_irq = irq; 629 gtask->gt_cpu = -1; 630 mtx_lock(&qgroup->tqg_lock); 631 qid = taskqgroup_find(qgroup, uniq); 632 qgroup->tqg_queue[qid].tgc_cnt++; 633 LIST_INSERT_HEAD(&qgroup->tqg_queue[qid].tgc_tasks, gtask, gt_list); 634 gtask->gt_taskqueue = qgroup->tqg_queue[qid].tgc_taskq; 635 if (irq != -1 && smp_started) { 636 gtask->gt_cpu = qgroup->tqg_queue[qid].tgc_cpu; 637 CPU_ZERO(&mask); 638 CPU_SET(qgroup->tqg_queue[qid].tgc_cpu, &mask); 639 mtx_unlock(&qgroup->tqg_lock); 640 intr_setaffinity(irq, &mask); 641 } else 642 mtx_unlock(&qgroup->tqg_lock); 643 } 644 645 static void 646 taskqgroup_attach_deferred(struct taskqgroup *qgroup, struct grouptask *gtask) 647 { 648 cpuset_t mask; 649 int qid, cpu; 650 651 mtx_lock(&qgroup->tqg_lock); 652 qid = taskqgroup_find(qgroup, gtask->gt_uniq); 653 cpu = qgroup->tqg_queue[qid].tgc_cpu; 654 if (gtask->gt_irq != -1) { 655 mtx_unlock(&qgroup->tqg_lock); 656 657 CPU_ZERO(&mask); 658 CPU_SET(cpu, &mask); 659 intr_setaffinity(gtask->gt_irq, &mask); 660 661 mtx_lock(&qgroup->tqg_lock); 662 } 663 qgroup->tqg_queue[qid].tgc_cnt++; 664 665 LIST_INSERT_HEAD(&qgroup->tqg_queue[qid].tgc_tasks, gtask, 666 gt_list); 667 MPASS(qgroup->tqg_queue[qid].tgc_taskq != NULL); 668 gtask->gt_taskqueue = qgroup->tqg_queue[qid].tgc_taskq; 669 mtx_unlock(&qgroup->tqg_lock); 670 } 671 672 int 673 taskqgroup_attach_cpu(struct taskqgroup *qgroup, struct grouptask *gtask, 674 void *uniq, int cpu, int irq, char *name) 675 { 676 cpuset_t mask; 677 int i, qid; 678 679 qid = -1; 680 gtask->gt_uniq = uniq; 681 gtask->gt_name = name; 682 gtask->gt_irq = irq; 683 gtask->gt_cpu = cpu; 684 mtx_lock(&qgroup->tqg_lock); 685 if (smp_started) { 686 for (i = 0; i < qgroup->tqg_cnt; i++) 687 if (qgroup->tqg_queue[i].tgc_cpu == cpu) { 688 qid = i; 689 break; 690 } 691 if (qid == -1) { 692 mtx_unlock(&qgroup->tqg_lock); 693 return (EINVAL); 694 } 695 } else 696 qid = 0; 697 qgroup->tqg_queue[qid].tgc_cnt++; 698 LIST_INSERT_HEAD(&qgroup->tqg_queue[qid].tgc_tasks, gtask, gt_list); 699 gtask->gt_taskqueue = qgroup->tqg_queue[qid].tgc_taskq; 700 cpu = qgroup->tqg_queue[qid].tgc_cpu; 701 mtx_unlock(&qgroup->tqg_lock); 702 703 CPU_ZERO(&mask); 704 CPU_SET(cpu, &mask); 705 if (irq != -1 && smp_started) 706 intr_setaffinity(irq, &mask); 707 return (0); 708 } 709 710 static int 711 taskqgroup_attach_cpu_deferred(struct taskqgroup *qgroup, struct grouptask *gtask) 712 { 713 cpuset_t mask; 714 int i, qid, irq, cpu; 715 716 qid = -1; 717 irq = gtask->gt_irq; 718 cpu = gtask->gt_cpu; 719 MPASS(smp_started); 720 mtx_lock(&qgroup->tqg_lock); 721 for (i = 0; i < qgroup->tqg_cnt; i++) 722 if (qgroup->tqg_queue[i].tgc_cpu == cpu) { 723 qid = i; 724 break; 725 } 726 if (qid == -1) { 727 mtx_unlock(&qgroup->tqg_lock); 728 return (EINVAL); 729 } 730 qgroup->tqg_queue[qid].tgc_cnt++; 731 LIST_INSERT_HEAD(&qgroup->tqg_queue[qid].tgc_tasks, gtask, gt_list); 732 MPASS(qgroup->tqg_queue[qid].tgc_taskq != NULL); 733 gtask->gt_taskqueue = qgroup->tqg_queue[qid].tgc_taskq; 734 mtx_unlock(&qgroup->tqg_lock); 735 736 CPU_ZERO(&mask); 737 CPU_SET(cpu, &mask); 738 739 if (irq != -1) 740 intr_setaffinity(irq, &mask); 741 return (0); 742 } 743 744 void 745 taskqgroup_detach(struct taskqgroup *qgroup, struct grouptask *gtask) 746 { 747 int i; 748 749 mtx_lock(&qgroup->tqg_lock); 750 for (i = 0; i < qgroup->tqg_cnt; i++) 751 if (qgroup->tqg_queue[i].tgc_taskq == gtask->gt_taskqueue) 752 break; 753 if (i == qgroup->tqg_cnt) 754 panic("taskqgroup_detach: task not in group\n"); 755 qgroup->tqg_queue[i].tgc_cnt--; 756 LIST_REMOVE(gtask, gt_list); 757 mtx_unlock(&qgroup->tqg_lock); 758 gtask->gt_taskqueue = NULL; 759 } 760 761 static void 762 taskqgroup_binder(void *ctx) 763 { 764 struct taskq_bind_task *gtask = (struct taskq_bind_task *)ctx; 765 cpuset_t mask; 766 int error; 767 768 CPU_ZERO(&mask); 769 CPU_SET(gtask->bt_cpuid, &mask); 770 error = cpuset_setthread(curthread->td_tid, &mask); 771 thread_lock(curthread); 772 sched_bind(curthread, gtask->bt_cpuid); 773 thread_unlock(curthread); 774 775 if (error) 776 printf("taskqgroup_binder: setaffinity failed: %d\n", 777 error); 778 free(gtask, M_DEVBUF); 779 } 780 781 static void 782 taskqgroup_bind(struct taskqgroup *qgroup) 783 { 784 struct taskq_bind_task *gtask; 785 int i; 786 787 /* 788 * Bind taskqueue threads to specific CPUs, if they have been assigned 789 * one. 790 */ 791 if (qgroup->tqg_cnt == 1) 792 return; 793 794 for (i = 0; i < qgroup->tqg_cnt; i++) { 795 gtask = malloc(sizeof (*gtask), M_DEVBUF, M_WAITOK); 796 GTASK_INIT(>ask->bt_task, 0, 0, taskqgroup_binder, gtask); 797 gtask->bt_cpuid = qgroup->tqg_queue[i].tgc_cpu; 798 grouptaskqueue_enqueue(qgroup->tqg_queue[i].tgc_taskq, 799 >ask->bt_task); 800 } 801 } 802 803 static int 804 _taskqgroup_adjust(struct taskqgroup *qgroup, int cnt, int stride) 805 { 806 LIST_HEAD(, grouptask) gtask_head = LIST_HEAD_INITIALIZER(NULL); 807 struct grouptask *gtask; 808 int i, k, old_cnt, old_cpu, cpu; 809 810 mtx_assert(&qgroup->tqg_lock, MA_OWNED); 811 812 if (cnt < 1 || cnt * stride > mp_ncpus || !smp_started) { 813 printf("taskqgroup_adjust failed cnt: %d stride: %d mp_ncpus: %d smp_started: %d\n", 814 cnt, stride, mp_ncpus, smp_started); 815 return (EINVAL); 816 } 817 if (qgroup->tqg_adjusting) { 818 printf("taskqgroup_adjust failed: adjusting\n"); 819 return (EBUSY); 820 } 821 qgroup->tqg_adjusting = 1; 822 old_cnt = qgroup->tqg_cnt; 823 old_cpu = 0; 824 if (old_cnt < cnt) 825 old_cpu = qgroup->tqg_queue[old_cnt].tgc_cpu; 826 mtx_unlock(&qgroup->tqg_lock); 827 /* 828 * Set up queue for tasks added before boot. 829 */ 830 if (old_cnt == 0) { 831 LIST_SWAP(>ask_head, &qgroup->tqg_queue[0].tgc_tasks, 832 grouptask, gt_list); 833 qgroup->tqg_queue[0].tgc_cnt = 0; 834 } 835 836 /* 837 * If new taskq threads have been added. 838 */ 839 cpu = old_cpu; 840 for (i = old_cnt; i < cnt; i++) { 841 taskqgroup_cpu_create(qgroup, i, cpu); 842 843 for (k = 0; k < stride; k++) 844 cpu = CPU_NEXT(cpu); 845 } 846 mtx_lock(&qgroup->tqg_lock); 847 qgroup->tqg_cnt = cnt; 848 qgroup->tqg_stride = stride; 849 850 /* 851 * Adjust drivers to use new taskqs. 852 */ 853 for (i = 0; i < old_cnt; i++) { 854 while ((gtask = LIST_FIRST(&qgroup->tqg_queue[i].tgc_tasks))) { 855 LIST_REMOVE(gtask, gt_list); 856 qgroup->tqg_queue[i].tgc_cnt--; 857 LIST_INSERT_HEAD(>ask_head, gtask, gt_list); 858 } 859 } 860 mtx_unlock(&qgroup->tqg_lock); 861 862 while ((gtask = LIST_FIRST(>ask_head))) { 863 LIST_REMOVE(gtask, gt_list); 864 if (gtask->gt_cpu == -1) 865 taskqgroup_attach_deferred(qgroup, gtask); 866 else if (taskqgroup_attach_cpu_deferred(qgroup, gtask)) 867 taskqgroup_attach_deferred(qgroup, gtask); 868 } 869 870 #ifdef INVARIANTS 871 mtx_lock(&qgroup->tqg_lock); 872 for (i = 0; i < qgroup->tqg_cnt; i++) { 873 MPASS(qgroup->tqg_queue[i].tgc_taskq != NULL); 874 LIST_FOREACH(gtask, &qgroup->tqg_queue[i].tgc_tasks, gt_list) 875 MPASS(gtask->gt_taskqueue != NULL); 876 } 877 mtx_unlock(&qgroup->tqg_lock); 878 #endif 879 /* 880 * If taskq thread count has been reduced. 881 */ 882 for (i = cnt; i < old_cnt; i++) 883 taskqgroup_cpu_remove(qgroup, i); 884 885 taskqgroup_bind(qgroup); 886 887 mtx_lock(&qgroup->tqg_lock); 888 qgroup->tqg_adjusting = 0; 889 890 return (0); 891 } 892 893 int 894 taskqgroup_adjust(struct taskqgroup *qgroup, int cnt, int stride) 895 { 896 int error; 897 898 mtx_lock(&qgroup->tqg_lock); 899 error = _taskqgroup_adjust(qgroup, cnt, stride); 900 mtx_unlock(&qgroup->tqg_lock); 901 902 return (error); 903 } 904 905 struct taskqgroup * 906 taskqgroup_create(char *name) 907 { 908 struct taskqgroup *qgroup; 909 910 qgroup = malloc(sizeof(*qgroup), M_GTASKQUEUE, M_WAITOK | M_ZERO); 911 mtx_init(&qgroup->tqg_lock, "taskqgroup", NULL, MTX_DEF); 912 qgroup->tqg_name = name; 913 LIST_INIT(&qgroup->tqg_queue[0].tgc_tasks); 914 915 return (qgroup); 916 } 917 918 void 919 taskqgroup_destroy(struct taskqgroup *qgroup) 920 { 921 922 } 923