1 /*- 2 * Copyright (c) 2000 Doug Rabson 3 * Copyright (c) 2014 Jeff Roberson 4 * Copyright (c) 2016 Matthew Macy 5 * All rights reserved. 6 * 7 * Redistribution and use in source and binary forms, with or without 8 * modification, are permitted provided that the following conditions 9 * are met: 10 * 1. Redistributions of source code must retain the above copyright 11 * notice, this list of conditions and the following disclaimer. 12 * 2. Redistributions in binary form must reproduce the above copyright 13 * notice, this list of conditions and the following disclaimer in the 14 * documentation and/or other materials provided with the distribution. 15 * 16 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND 17 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 18 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 19 * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE 20 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL 21 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS 22 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) 23 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 24 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY 25 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF 26 * SUCH DAMAGE. 27 */ 28 29 #include <sys/cdefs.h> 30 __FBSDID("$FreeBSD$"); 31 32 #include <sys/param.h> 33 #include <sys/systm.h> 34 #include <sys/bus.h> 35 #include <sys/cpuset.h> 36 #include <sys/interrupt.h> 37 #include <sys/kernel.h> 38 #include <sys/kthread.h> 39 #include <sys/libkern.h> 40 #include <sys/limits.h> 41 #include <sys/lock.h> 42 #include <sys/malloc.h> 43 #include <sys/mutex.h> 44 #include <sys/proc.h> 45 #include <sys/sched.h> 46 #include <sys/smp.h> 47 #include <sys/gtaskqueue.h> 48 #include <sys/unistd.h> 49 #include <machine/stdarg.h> 50 51 static MALLOC_DEFINE(M_GTASKQUEUE, "taskqueue", "Task Queues"); 52 static void gtaskqueue_thread_enqueue(void *); 53 static void gtaskqueue_thread_loop(void *arg); 54 55 56 struct gtaskqueue_busy { 57 struct gtask *tb_running; 58 TAILQ_ENTRY(gtaskqueue_busy) tb_link; 59 }; 60 61 static struct gtask * const TB_DRAIN_WAITER = (struct gtask *)0x1; 62 63 struct gtaskqueue { 64 STAILQ_HEAD(, gtask) tq_queue; 65 gtaskqueue_enqueue_fn tq_enqueue; 66 void *tq_context; 67 char *tq_name; 68 TAILQ_HEAD(, gtaskqueue_busy) tq_active; 69 struct mtx tq_mutex; 70 struct thread **tq_threads; 71 int tq_tcount; 72 int tq_spin; 73 int tq_flags; 74 int tq_callouts; 75 taskqueue_callback_fn tq_callbacks[TASKQUEUE_NUM_CALLBACKS]; 76 void *tq_cb_contexts[TASKQUEUE_NUM_CALLBACKS]; 77 }; 78 79 #define TQ_FLAGS_ACTIVE (1 << 0) 80 #define TQ_FLAGS_BLOCKED (1 << 1) 81 #define TQ_FLAGS_UNLOCKED_ENQUEUE (1 << 2) 82 83 #define DT_CALLOUT_ARMED (1 << 0) 84 85 #define TQ_LOCK(tq) \ 86 do { \ 87 if ((tq)->tq_spin) \ 88 mtx_lock_spin(&(tq)->tq_mutex); \ 89 else \ 90 mtx_lock(&(tq)->tq_mutex); \ 91 } while (0) 92 #define TQ_ASSERT_LOCKED(tq) mtx_assert(&(tq)->tq_mutex, MA_OWNED) 93 94 #define TQ_UNLOCK(tq) \ 95 do { \ 96 if ((tq)->tq_spin) \ 97 mtx_unlock_spin(&(tq)->tq_mutex); \ 98 else \ 99 mtx_unlock(&(tq)->tq_mutex); \ 100 } while (0) 101 #define TQ_ASSERT_UNLOCKED(tq) mtx_assert(&(tq)->tq_mutex, MA_NOTOWNED) 102 103 static __inline int 104 TQ_SLEEP(struct gtaskqueue *tq, void *p, struct mtx *m, int pri, const char *wm, 105 int t) 106 { 107 if (tq->tq_spin) 108 return (msleep_spin(p, m, wm, t)); 109 return (msleep(p, m, pri, wm, t)); 110 } 111 112 static struct gtaskqueue * 113 _gtaskqueue_create(const char *name, int mflags, 114 taskqueue_enqueue_fn enqueue, void *context, 115 int mtxflags, const char *mtxname __unused) 116 { 117 struct gtaskqueue *queue; 118 char *tq_name; 119 120 tq_name = malloc(TASKQUEUE_NAMELEN, M_GTASKQUEUE, mflags | M_ZERO); 121 if (!tq_name) 122 return (NULL); 123 124 snprintf(tq_name, TASKQUEUE_NAMELEN, "%s", (name) ? name : "taskqueue"); 125 126 queue = malloc(sizeof(struct gtaskqueue), M_GTASKQUEUE, mflags | M_ZERO); 127 if (!queue) 128 return (NULL); 129 130 STAILQ_INIT(&queue->tq_queue); 131 TAILQ_INIT(&queue->tq_active); 132 queue->tq_enqueue = enqueue; 133 queue->tq_context = context; 134 queue->tq_name = tq_name; 135 queue->tq_spin = (mtxflags & MTX_SPIN) != 0; 136 queue->tq_flags |= TQ_FLAGS_ACTIVE; 137 if (enqueue == gtaskqueue_thread_enqueue) 138 queue->tq_flags |= TQ_FLAGS_UNLOCKED_ENQUEUE; 139 mtx_init(&queue->tq_mutex, tq_name, NULL, mtxflags); 140 141 return (queue); 142 } 143 144 145 /* 146 * Signal a taskqueue thread to terminate. 147 */ 148 static void 149 gtaskqueue_terminate(struct thread **pp, struct gtaskqueue *tq) 150 { 151 152 while (tq->tq_tcount > 0 || tq->tq_callouts > 0) { 153 wakeup(tq); 154 TQ_SLEEP(tq, pp, &tq->tq_mutex, PWAIT, "taskqueue_destroy", 0); 155 } 156 } 157 158 static void 159 gtaskqueue_free(struct gtaskqueue *queue) 160 { 161 162 TQ_LOCK(queue); 163 queue->tq_flags &= ~TQ_FLAGS_ACTIVE; 164 gtaskqueue_terminate(queue->tq_threads, queue); 165 KASSERT(TAILQ_EMPTY(&queue->tq_active), ("Tasks still running?")); 166 KASSERT(queue->tq_callouts == 0, ("Armed timeout tasks")); 167 mtx_destroy(&queue->tq_mutex); 168 free(queue->tq_threads, M_GTASKQUEUE); 169 free(queue->tq_name, M_GTASKQUEUE); 170 free(queue, M_GTASKQUEUE); 171 } 172 173 int 174 grouptaskqueue_enqueue(struct gtaskqueue *queue, struct gtask *gtask) 175 { 176 TQ_LOCK(queue); 177 if (gtask->ta_flags & TASK_ENQUEUED) { 178 TQ_UNLOCK(queue); 179 return (0); 180 } 181 STAILQ_INSERT_TAIL(&queue->tq_queue, gtask, ta_link); 182 gtask->ta_flags |= TASK_ENQUEUED; 183 TQ_UNLOCK(queue); 184 if ((queue->tq_flags & TQ_FLAGS_BLOCKED) == 0) 185 queue->tq_enqueue(queue->tq_context); 186 return (0); 187 } 188 189 static void 190 gtaskqueue_task_nop_fn(void *context) 191 { 192 } 193 194 /* 195 * Block until all currently queued tasks in this taskqueue 196 * have begun execution. Tasks queued during execution of 197 * this function are ignored. 198 */ 199 static void 200 gtaskqueue_drain_tq_queue(struct gtaskqueue *queue) 201 { 202 struct gtask t_barrier; 203 204 if (STAILQ_EMPTY(&queue->tq_queue)) 205 return; 206 207 /* 208 * Enqueue our barrier after all current tasks, but with 209 * the highest priority so that newly queued tasks cannot 210 * pass it. Because of the high priority, we can not use 211 * taskqueue_enqueue_locked directly (which drops the lock 212 * anyway) so just insert it at tail while we have the 213 * queue lock. 214 */ 215 GTASK_INIT(&t_barrier, 0, USHRT_MAX, gtaskqueue_task_nop_fn, &t_barrier); 216 STAILQ_INSERT_TAIL(&queue->tq_queue, &t_barrier, ta_link); 217 t_barrier.ta_flags |= TASK_ENQUEUED; 218 219 /* 220 * Once the barrier has executed, all previously queued tasks 221 * have completed or are currently executing. 222 */ 223 while (t_barrier.ta_flags & TASK_ENQUEUED) 224 TQ_SLEEP(queue, &t_barrier, &queue->tq_mutex, PWAIT, "-", 0); 225 } 226 227 /* 228 * Block until all currently executing tasks for this taskqueue 229 * complete. Tasks that begin execution during the execution 230 * of this function are ignored. 231 */ 232 static void 233 gtaskqueue_drain_tq_active(struct gtaskqueue *queue) 234 { 235 struct gtaskqueue_busy tb_marker, *tb_first; 236 237 if (TAILQ_EMPTY(&queue->tq_active)) 238 return; 239 240 /* Block taskq_terminate().*/ 241 queue->tq_callouts++; 242 243 /* 244 * Wait for all currently executing taskqueue threads 245 * to go idle. 246 */ 247 tb_marker.tb_running = TB_DRAIN_WAITER; 248 TAILQ_INSERT_TAIL(&queue->tq_active, &tb_marker, tb_link); 249 while (TAILQ_FIRST(&queue->tq_active) != &tb_marker) 250 TQ_SLEEP(queue, &tb_marker, &queue->tq_mutex, PWAIT, "-", 0); 251 TAILQ_REMOVE(&queue->tq_active, &tb_marker, tb_link); 252 253 /* 254 * Wakeup any other drain waiter that happened to queue up 255 * without any intervening active thread. 256 */ 257 tb_first = TAILQ_FIRST(&queue->tq_active); 258 if (tb_first != NULL && tb_first->tb_running == TB_DRAIN_WAITER) 259 wakeup(tb_first); 260 261 /* Release taskqueue_terminate(). */ 262 queue->tq_callouts--; 263 if ((queue->tq_flags & TQ_FLAGS_ACTIVE) == 0) 264 wakeup_one(queue->tq_threads); 265 } 266 267 void 268 gtaskqueue_block(struct gtaskqueue *queue) 269 { 270 271 TQ_LOCK(queue); 272 queue->tq_flags |= TQ_FLAGS_BLOCKED; 273 TQ_UNLOCK(queue); 274 } 275 276 void 277 gtaskqueue_unblock(struct gtaskqueue *queue) 278 { 279 280 TQ_LOCK(queue); 281 queue->tq_flags &= ~TQ_FLAGS_BLOCKED; 282 if (!STAILQ_EMPTY(&queue->tq_queue)) 283 queue->tq_enqueue(queue->tq_context); 284 TQ_UNLOCK(queue); 285 } 286 287 static void 288 gtaskqueue_run_locked(struct gtaskqueue *queue) 289 { 290 struct gtaskqueue_busy tb; 291 struct gtaskqueue_busy *tb_first; 292 struct gtask *gtask; 293 294 KASSERT(queue != NULL, ("tq is NULL")); 295 TQ_ASSERT_LOCKED(queue); 296 tb.tb_running = NULL; 297 298 while (STAILQ_FIRST(&queue->tq_queue)) { 299 TAILQ_INSERT_TAIL(&queue->tq_active, &tb, tb_link); 300 301 /* 302 * Carefully remove the first task from the queue and 303 * clear its TASK_ENQUEUED flag 304 */ 305 gtask = STAILQ_FIRST(&queue->tq_queue); 306 KASSERT(gtask != NULL, ("task is NULL")); 307 STAILQ_REMOVE_HEAD(&queue->tq_queue, ta_link); 308 gtask->ta_flags &= ~TASK_ENQUEUED; 309 tb.tb_running = gtask; 310 TQ_UNLOCK(queue); 311 312 KASSERT(gtask->ta_func != NULL, ("task->ta_func is NULL")); 313 gtask->ta_func(gtask->ta_context); 314 315 TQ_LOCK(queue); 316 tb.tb_running = NULL; 317 wakeup(gtask); 318 319 TAILQ_REMOVE(&queue->tq_active, &tb, tb_link); 320 tb_first = TAILQ_FIRST(&queue->tq_active); 321 if (tb_first != NULL && 322 tb_first->tb_running == TB_DRAIN_WAITER) 323 wakeup(tb_first); 324 } 325 } 326 327 static int 328 task_is_running(struct gtaskqueue *queue, struct gtask *gtask) 329 { 330 struct gtaskqueue_busy *tb; 331 332 TQ_ASSERT_LOCKED(queue); 333 TAILQ_FOREACH(tb, &queue->tq_active, tb_link) { 334 if (tb->tb_running == gtask) 335 return (1); 336 } 337 return (0); 338 } 339 340 static int 341 gtaskqueue_cancel_locked(struct gtaskqueue *queue, struct gtask *gtask) 342 { 343 344 if (gtask->ta_flags & TASK_ENQUEUED) 345 STAILQ_REMOVE(&queue->tq_queue, gtask, gtask, ta_link); 346 gtask->ta_flags &= ~TASK_ENQUEUED; 347 return (task_is_running(queue, gtask) ? EBUSY : 0); 348 } 349 350 int 351 gtaskqueue_cancel(struct gtaskqueue *queue, struct gtask *gtask) 352 { 353 int error; 354 355 TQ_LOCK(queue); 356 error = gtaskqueue_cancel_locked(queue, gtask); 357 TQ_UNLOCK(queue); 358 359 return (error); 360 } 361 362 void 363 gtaskqueue_drain(struct gtaskqueue *queue, struct gtask *gtask) 364 { 365 366 if (!queue->tq_spin) 367 WITNESS_WARN(WARN_GIANTOK | WARN_SLEEPOK, NULL, __func__); 368 369 TQ_LOCK(queue); 370 while ((gtask->ta_flags & TASK_ENQUEUED) || task_is_running(queue, gtask)) 371 TQ_SLEEP(queue, gtask, &queue->tq_mutex, PWAIT, "-", 0); 372 TQ_UNLOCK(queue); 373 } 374 375 void 376 gtaskqueue_drain_all(struct gtaskqueue *queue) 377 { 378 379 if (!queue->tq_spin) 380 WITNESS_WARN(WARN_GIANTOK | WARN_SLEEPOK, NULL, __func__); 381 382 TQ_LOCK(queue); 383 gtaskqueue_drain_tq_queue(queue); 384 gtaskqueue_drain_tq_active(queue); 385 TQ_UNLOCK(queue); 386 } 387 388 static int 389 _gtaskqueue_start_threads(struct gtaskqueue **tqp, int count, int pri, 390 cpuset_t *mask, const char *name, va_list ap) 391 { 392 char ktname[MAXCOMLEN + 1]; 393 struct thread *td; 394 struct gtaskqueue *tq; 395 int i, error; 396 397 if (count <= 0) 398 return (EINVAL); 399 400 vsnprintf(ktname, sizeof(ktname), name, ap); 401 tq = *tqp; 402 403 tq->tq_threads = malloc(sizeof(struct thread *) * count, M_GTASKQUEUE, 404 M_NOWAIT | M_ZERO); 405 if (tq->tq_threads == NULL) { 406 printf("%s: no memory for %s threads\n", __func__, ktname); 407 return (ENOMEM); 408 } 409 410 for (i = 0; i < count; i++) { 411 if (count == 1) 412 error = kthread_add(gtaskqueue_thread_loop, tqp, NULL, 413 &tq->tq_threads[i], RFSTOPPED, 0, "%s", ktname); 414 else 415 error = kthread_add(gtaskqueue_thread_loop, tqp, NULL, 416 &tq->tq_threads[i], RFSTOPPED, 0, 417 "%s_%d", ktname, i); 418 if (error) { 419 /* should be ok to continue, taskqueue_free will dtrt */ 420 printf("%s: kthread_add(%s): error %d", __func__, 421 ktname, error); 422 tq->tq_threads[i] = NULL; /* paranoid */ 423 } else 424 tq->tq_tcount++; 425 } 426 for (i = 0; i < count; i++) { 427 if (tq->tq_threads[i] == NULL) 428 continue; 429 td = tq->tq_threads[i]; 430 if (mask) { 431 error = cpuset_setthread(td->td_tid, mask); 432 /* 433 * Failing to pin is rarely an actual fatal error; 434 * it'll just affect performance. 435 */ 436 if (error) 437 printf("%s: curthread=%llu: can't pin; " 438 "error=%d\n", 439 __func__, 440 (unsigned long long) td->td_tid, 441 error); 442 } 443 thread_lock(td); 444 sched_prio(td, pri); 445 sched_add(td, SRQ_BORING); 446 thread_unlock(td); 447 } 448 449 return (0); 450 } 451 452 static int 453 gtaskqueue_start_threads(struct gtaskqueue **tqp, int count, int pri, 454 const char *name, ...) 455 { 456 va_list ap; 457 int error; 458 459 va_start(ap, name); 460 error = _gtaskqueue_start_threads(tqp, count, pri, NULL, name, ap); 461 va_end(ap); 462 return (error); 463 } 464 465 static inline void 466 gtaskqueue_run_callback(struct gtaskqueue *tq, 467 enum taskqueue_callback_type cb_type) 468 { 469 taskqueue_callback_fn tq_callback; 470 471 TQ_ASSERT_UNLOCKED(tq); 472 tq_callback = tq->tq_callbacks[cb_type]; 473 if (tq_callback != NULL) 474 tq_callback(tq->tq_cb_contexts[cb_type]); 475 } 476 477 static void 478 gtaskqueue_thread_loop(void *arg) 479 { 480 struct gtaskqueue **tqp, *tq; 481 482 tqp = arg; 483 tq = *tqp; 484 gtaskqueue_run_callback(tq, TASKQUEUE_CALLBACK_TYPE_INIT); 485 TQ_LOCK(tq); 486 while ((tq->tq_flags & TQ_FLAGS_ACTIVE) != 0) { 487 /* XXX ? */ 488 gtaskqueue_run_locked(tq); 489 /* 490 * Because taskqueue_run() can drop tq_mutex, we need to 491 * check if the TQ_FLAGS_ACTIVE flag wasn't removed in the 492 * meantime, which means we missed a wakeup. 493 */ 494 if ((tq->tq_flags & TQ_FLAGS_ACTIVE) == 0) 495 break; 496 TQ_SLEEP(tq, tq, &tq->tq_mutex, 0, "-", 0); 497 } 498 gtaskqueue_run_locked(tq); 499 /* 500 * This thread is on its way out, so just drop the lock temporarily 501 * in order to call the shutdown callback. This allows the callback 502 * to look at the taskqueue, even just before it dies. 503 */ 504 TQ_UNLOCK(tq); 505 gtaskqueue_run_callback(tq, TASKQUEUE_CALLBACK_TYPE_SHUTDOWN); 506 TQ_LOCK(tq); 507 508 /* rendezvous with thread that asked us to terminate */ 509 tq->tq_tcount--; 510 wakeup_one(tq->tq_threads); 511 TQ_UNLOCK(tq); 512 kthread_exit(); 513 } 514 515 static void 516 gtaskqueue_thread_enqueue(void *context) 517 { 518 struct gtaskqueue **tqp, *tq; 519 520 tqp = context; 521 tq = *tqp; 522 wakeup_one(tq); 523 } 524 525 526 static struct gtaskqueue * 527 gtaskqueue_create_fast(const char *name, int mflags, 528 taskqueue_enqueue_fn enqueue, void *context) 529 { 530 return _gtaskqueue_create(name, mflags, enqueue, context, 531 MTX_SPIN, "fast_taskqueue"); 532 } 533 534 535 struct taskqgroup_cpu { 536 LIST_HEAD(, grouptask) tgc_tasks; 537 struct gtaskqueue *tgc_taskq; 538 int tgc_cnt; 539 int tgc_cpu; 540 }; 541 542 struct taskqgroup { 543 struct taskqgroup_cpu tqg_queue[MAXCPU]; 544 struct mtx tqg_lock; 545 char * tqg_name; 546 int tqg_adjusting; 547 int tqg_stride; 548 int tqg_cnt; 549 }; 550 551 struct taskq_bind_task { 552 struct gtask bt_task; 553 int bt_cpuid; 554 }; 555 556 static void 557 taskqgroup_cpu_create(struct taskqgroup *qgroup, int idx) 558 { 559 struct taskqgroup_cpu *qcpu; 560 561 qcpu = &qgroup->tqg_queue[idx]; 562 LIST_INIT(&qcpu->tgc_tasks); 563 qcpu->tgc_taskq = gtaskqueue_create_fast(NULL, M_WAITOK, 564 taskqueue_thread_enqueue, &qcpu->tgc_taskq); 565 gtaskqueue_start_threads(&qcpu->tgc_taskq, 1, PI_SOFT, 566 "%s_%d", qgroup->tqg_name, idx); 567 qcpu->tgc_cpu = idx * qgroup->tqg_stride; 568 } 569 570 static void 571 taskqgroup_cpu_remove(struct taskqgroup *qgroup, int idx) 572 { 573 574 gtaskqueue_free(qgroup->tqg_queue[idx].tgc_taskq); 575 } 576 577 /* 578 * Find the taskq with least # of tasks that doesn't currently have any 579 * other queues from the uniq identifier. 580 */ 581 static int 582 taskqgroup_find(struct taskqgroup *qgroup, void *uniq) 583 { 584 struct grouptask *n; 585 int i, idx, mincnt; 586 int strict; 587 588 mtx_assert(&qgroup->tqg_lock, MA_OWNED); 589 if (qgroup->tqg_cnt == 0) 590 return (0); 591 idx = -1; 592 mincnt = INT_MAX; 593 /* 594 * Two passes; First scan for a queue with the least tasks that 595 * does not already service this uniq id. If that fails simply find 596 * the queue with the least total tasks; 597 */ 598 for (strict = 1; mincnt == INT_MAX; strict = 0) { 599 for (i = 0; i < qgroup->tqg_cnt; i++) { 600 if (qgroup->tqg_queue[i].tgc_cnt > mincnt) 601 continue; 602 if (strict) { 603 LIST_FOREACH(n, 604 &qgroup->tqg_queue[i].tgc_tasks, gt_list) 605 if (n->gt_uniq == uniq) 606 break; 607 if (n != NULL) 608 continue; 609 } 610 mincnt = qgroup->tqg_queue[i].tgc_cnt; 611 idx = i; 612 } 613 } 614 if (idx == -1) 615 panic("taskqgroup_find: Failed to pick a qid."); 616 617 return (idx); 618 } 619 620 void 621 taskqgroup_attach(struct taskqgroup *qgroup, struct grouptask *gtask, 622 void *uniq, int irq, char *name) 623 { 624 cpuset_t mask; 625 int qid; 626 627 gtask->gt_uniq = uniq; 628 gtask->gt_name = name; 629 gtask->gt_irq = irq; 630 gtask->gt_cpu = -1; 631 mtx_lock(&qgroup->tqg_lock); 632 qid = taskqgroup_find(qgroup, uniq); 633 qgroup->tqg_queue[qid].tgc_cnt++; 634 LIST_INSERT_HEAD(&qgroup->tqg_queue[qid].tgc_tasks, gtask, gt_list); 635 gtask->gt_taskqueue = qgroup->tqg_queue[qid].tgc_taskq; 636 if (irq != -1 && smp_started) { 637 CPU_ZERO(&mask); 638 CPU_SET(qgroup->tqg_queue[qid].tgc_cpu, &mask); 639 mtx_unlock(&qgroup->tqg_lock); 640 intr_setaffinity(irq, &mask); 641 } else 642 mtx_unlock(&qgroup->tqg_lock); 643 } 644 645 int 646 taskqgroup_attach_cpu(struct taskqgroup *qgroup, struct grouptask *gtask, 647 void *uniq, int cpu, int irq, char *name) 648 { 649 cpuset_t mask; 650 int i, qid; 651 652 qid = -1; 653 gtask->gt_uniq = uniq; 654 gtask->gt_name = name; 655 gtask->gt_irq = irq; 656 gtask->gt_cpu = cpu; 657 mtx_lock(&qgroup->tqg_lock); 658 if (smp_started) { 659 for (i = 0; i < qgroup->tqg_cnt; i++) 660 if (qgroup->tqg_queue[i].tgc_cpu == cpu) { 661 qid = i; 662 break; 663 } 664 if (qid == -1) { 665 mtx_unlock(&qgroup->tqg_lock); 666 return (EINVAL); 667 } 668 } else 669 qid = 0; 670 qgroup->tqg_queue[qid].tgc_cnt++; 671 LIST_INSERT_HEAD(&qgroup->tqg_queue[qid].tgc_tasks, gtask, gt_list); 672 gtask->gt_taskqueue = qgroup->tqg_queue[qid].tgc_taskq; 673 if (irq != -1 && smp_started) { 674 CPU_ZERO(&mask); 675 CPU_SET(qgroup->tqg_queue[qid].tgc_cpu, &mask); 676 mtx_unlock(&qgroup->tqg_lock); 677 intr_setaffinity(irq, &mask); 678 } else 679 mtx_unlock(&qgroup->tqg_lock); 680 return (0); 681 } 682 683 void 684 taskqgroup_detach(struct taskqgroup *qgroup, struct grouptask *gtask) 685 { 686 int i; 687 688 mtx_lock(&qgroup->tqg_lock); 689 for (i = 0; i < qgroup->tqg_cnt; i++) 690 if (qgroup->tqg_queue[i].tgc_taskq == gtask->gt_taskqueue) 691 break; 692 if (i == qgroup->tqg_cnt) 693 panic("taskqgroup_detach: task not in group\n"); 694 qgroup->tqg_queue[i].tgc_cnt--; 695 LIST_REMOVE(gtask, gt_list); 696 mtx_unlock(&qgroup->tqg_lock); 697 gtask->gt_taskqueue = NULL; 698 } 699 700 static void 701 taskqgroup_binder(void *ctx) 702 { 703 struct taskq_bind_task *gtask = (struct taskq_bind_task *)ctx; 704 cpuset_t mask; 705 int error; 706 707 CPU_ZERO(&mask); 708 CPU_SET(gtask->bt_cpuid, &mask); 709 error = cpuset_setthread(curthread->td_tid, &mask); 710 thread_lock(curthread); 711 sched_bind(curthread, gtask->bt_cpuid); 712 thread_unlock(curthread); 713 714 if (error) 715 printf("taskqgroup_binder: setaffinity failed: %d\n", 716 error); 717 free(gtask, M_DEVBUF); 718 } 719 720 static void 721 taskqgroup_bind(struct taskqgroup *qgroup) 722 { 723 struct taskq_bind_task *gtask; 724 int i; 725 726 /* 727 * Bind taskqueue threads to specific CPUs, if they have been assigned 728 * one. 729 */ 730 for (i = 0; i < qgroup->tqg_cnt; i++) { 731 gtask = malloc(sizeof (*gtask), M_DEVBUF, M_NOWAIT); 732 GTASK_INIT(>ask->bt_task, 0, 0, taskqgroup_binder, gtask); 733 gtask->bt_cpuid = qgroup->tqg_queue[i].tgc_cpu; 734 grouptaskqueue_enqueue(qgroup->tqg_queue[i].tgc_taskq, 735 >ask->bt_task); 736 } 737 } 738 739 static int 740 _taskqgroup_adjust(struct taskqgroup *qgroup, int cnt, int stride) 741 { 742 LIST_HEAD(, grouptask) gtask_head = LIST_HEAD_INITIALIZER(NULL); 743 cpuset_t mask; 744 struct grouptask *gtask; 745 int i, old_cnt, qid; 746 747 mtx_assert(&qgroup->tqg_lock, MA_OWNED); 748 749 if (cnt < 1 || cnt * stride > mp_ncpus || !smp_started) { 750 printf("taskqgroup_adjust failed cnt: %d stride: %d mp_ncpus: %d smp_started: %d\n", 751 cnt, stride, mp_ncpus, smp_started); 752 return (EINVAL); 753 } 754 if (qgroup->tqg_adjusting) { 755 printf("taskqgroup_adjust failed: adjusting\n"); 756 return (EBUSY); 757 } 758 qgroup->tqg_adjusting = 1; 759 old_cnt = qgroup->tqg_cnt; 760 mtx_unlock(&qgroup->tqg_lock); 761 /* 762 * Set up queue for tasks added before boot. 763 */ 764 if (old_cnt == 0) { 765 LIST_SWAP(>ask_head, &qgroup->tqg_queue[0].tgc_tasks, 766 grouptask, gt_list); 767 qgroup->tqg_queue[0].tgc_cnt = 0; 768 } 769 770 /* 771 * If new taskq threads have been added. 772 */ 773 for (i = old_cnt; i < cnt; i++) 774 taskqgroup_cpu_create(qgroup, i); 775 mtx_lock(&qgroup->tqg_lock); 776 qgroup->tqg_cnt = cnt; 777 qgroup->tqg_stride = stride; 778 779 /* 780 * Adjust drivers to use new taskqs. 781 */ 782 for (i = 0; i < old_cnt; i++) { 783 while ((gtask = LIST_FIRST(&qgroup->tqg_queue[i].tgc_tasks))) { 784 LIST_REMOVE(gtask, gt_list); 785 qgroup->tqg_queue[i].tgc_cnt--; 786 LIST_INSERT_HEAD(>ask_head, gtask, gt_list); 787 } 788 } 789 790 while ((gtask = LIST_FIRST(>ask_head))) { 791 LIST_REMOVE(gtask, gt_list); 792 if (gtask->gt_cpu == -1) 793 qid = taskqgroup_find(qgroup, gtask->gt_uniq); 794 else { 795 for (i = 0; i < qgroup->tqg_cnt; i++) 796 if (qgroup->tqg_queue[i].tgc_cpu == gtask->gt_cpu) { 797 qid = i; 798 break; 799 } 800 } 801 qgroup->tqg_queue[qid].tgc_cnt++; 802 LIST_INSERT_HEAD(&qgroup->tqg_queue[qid].tgc_tasks, gtask, 803 gt_list); 804 gtask->gt_taskqueue = qgroup->tqg_queue[qid].tgc_taskq; 805 } 806 /* 807 * Set new CPU and IRQ affinity 808 */ 809 for (i = 0; i < cnt; i++) { 810 qgroup->tqg_queue[i].tgc_cpu = i * qgroup->tqg_stride; 811 CPU_ZERO(&mask); 812 CPU_SET(qgroup->tqg_queue[i].tgc_cpu, &mask); 813 LIST_FOREACH(gtask, &qgroup->tqg_queue[i].tgc_tasks, gt_list) { 814 if (gtask->gt_irq == -1) 815 continue; 816 intr_setaffinity(gtask->gt_irq, &mask); 817 } 818 } 819 mtx_unlock(&qgroup->tqg_lock); 820 821 /* 822 * If taskq thread count has been reduced. 823 */ 824 for (i = cnt; i < old_cnt; i++) 825 taskqgroup_cpu_remove(qgroup, i); 826 827 mtx_lock(&qgroup->tqg_lock); 828 qgroup->tqg_adjusting = 0; 829 830 taskqgroup_bind(qgroup); 831 832 return (0); 833 } 834 835 int 836 taskqgroup_adjust(struct taskqgroup *qgroup, int cpu, int stride) 837 { 838 int error; 839 840 mtx_lock(&qgroup->tqg_lock); 841 error = _taskqgroup_adjust(qgroup, cpu, stride); 842 mtx_unlock(&qgroup->tqg_lock); 843 844 return (error); 845 } 846 847 struct taskqgroup * 848 taskqgroup_create(char *name) 849 { 850 struct taskqgroup *qgroup; 851 852 qgroup = malloc(sizeof(*qgroup), M_GTASKQUEUE, M_WAITOK | M_ZERO); 853 mtx_init(&qgroup->tqg_lock, "taskqgroup", NULL, MTX_DEF); 854 qgroup->tqg_name = name; 855 LIST_INIT(&qgroup->tqg_queue[0].tgc_tasks); 856 857 return (qgroup); 858 } 859 860 void 861 taskqgroup_destroy(struct taskqgroup *qgroup) 862 { 863 864 } 865