/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License (the "License").
 * You may not use this file except in compliance with the License.
 *
 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
 * or http://www.opensolaris.org/os/licensing.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
 * If applicable, add the following below this CDDL HEADER, with the
 * fields enclosed by brackets "[]" replaced with your own identifying
 * information: Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 */
/*
 * Copyright 2007 Sun Microsystems, Inc.  All rights reserved.
 * Use is subject to license terms.
 */

#pragma ident	"%Z%%M%	%I%	%E% SMI"

/*
 * Squeues - TCP/IP serialization mechanism.
 *
 * This is a general purpose high-performance serialization mechanism. It is
 * similar to a taskq with a single worker thread, the difference being that
 * it does not imply a context switch - the thread placing a request may
 * actually process it. It is also biased towards processing requests in
 * interrupt context.
 *
 * Each squeue has a worker thread which may optionally be bound to a CPU.
 *
 * Only one thread may process requests from a given squeue at any time. This
 * is called "entering" the squeue.
 *
 * Each dispatched request is processed either by
 *
 *	a) the dispatching thread, or
 *	b) some other thread that is currently processing the squeue at the
 *	   time of the request, or
 *	c) the worker thread.
 *
 * INTERFACES:
 *
 * squeue_t *squeue_create(name, bind, wait, pri)
 *
 *	name: symbolic name for the squeue.
 *	wait: time to wait before waking the worker thread after queueing
 *		a request.
 *	bind: preferred CPU binding for the worker thread.
 *	pri:  thread priority for the worker thread.
 *
 *   This function never fails and may sleep. It returns a transparent pointer
 *   to the squeue_t structure that is passed to all other squeue operations.
 *
 * void squeue_bind(sqp, bind)
 *
 *   Bind the squeue worker thread to the CPU specified by the 'bind'
 *   argument. A 'bind' value of -1 binds to the preferred CPU specified at
 *   squeue_create() time.
 *
 *   NOTE: Any value of 'bind' other than -1 is not supported currently, but
 *	 the API is present - in the future it may be useful to specify a
 *	 different binding.
 *
 * void squeue_unbind(sqp)
 *
 *   Unbind the worker thread from its preferred CPU.
 *
 * void squeue_enter(*sqp, *mp, proc, arg, tag)
 *
 *   Post a single request for processing. Each request consists of mblock
 *   'mp', function 'proc' to execute and an argument 'arg' to pass to this
 *   function. The function is called as (*proc)(arg, mp, sqp); The tag is an
 *   arbitrary number from 0 to 255 which will be stored in mp to track the
 *   exact caller of squeue_enter. The combination of function name and the
 *   tag should provide enough information to identify the caller.
 *
 *   If no one is processing the squeue, squeue_enter() will call the function
 *   immediately. Otherwise it will add the request to the queue for later
 *   processing. Once the function is executed, the thread may continue
 *   executing all other requests pending on the queue.
 *
 * NOTE: The tagging information is only used when SQUEUE_DEBUG is set to 1.
 * NOTE: The argument can be a conn_t only. Ideally we'd like to have a
 *	 generic argument, but we want to drop the connection reference count
 *	 here - this improves tail-call optimizations.
 *	 XXX: The arg should have type conn_t.
 *
 * void squeue_enter_nodrain(*sqp, *mp, proc, arg, tag)
 *
 *   Same as squeue_enter(), but the entering thread will only try to execute
 *   a single request. It will not continue executing any pending requests.
 *
 * void squeue_fill(*sqp, *mp, proc, arg, tag)
 *
 *   Just place the request on the queue without trying to execute it. Arrange
 *   for the worker thread to process the request.
 *
 * void squeue_profile_enable(sqp)
 * void squeue_profile_disable(sqp)
 *
 *   Enable or disable profiling for the specified 'sqp'. Profiling is only
 *   available when SQUEUE_PROFILE is set.
 *
 * void squeue_profile_reset(sqp)
 *
 *   Reset all profiling information to zero. Profiling is only
 *   available when SQUEUE_PROFILE is set.
 *
 * void squeue_profile_start()
 * void squeue_profile_stop()
 *
 *   Globally enable or disable profiling for all squeues.
 *
 * uintptr_t *squeue_getprivate(sqp, p)
 *
 *   Each squeue keeps a small amount of private data space available for
 *   various consumers. Current consumers include TCP and NCA. Other consumers
 *   need to add their private tag to the sqprivate_t enum. The private
 *   information is limited to an uintptr_t value. The squeue has no knowledge
 *   of its content and does not manage it in any way.
 *
 *   The typical use may be a breakdown of data structures per CPU (since
 *   squeues are usually per CPU). See NCA for examples of use.
 *   Currently 'p' may have one legal value, SQPRIVATE_TCP.
 *
 * processorid_t squeue_binding(sqp)
 *
 *   Returns the CPU binding for a given squeue.
 *
 * TUNABLES:
 *
 * squeue_intrdrain_ms: Maximum time in ms interrupts spend draining any
 *	squeue. Note that this is an approximation - squeues have no control
 *	over the time it takes to process each request. This limit is only
 *	checked between processing individual messages.
 *	Default: 20 ms.
 *
 * squeue_writerdrain_ms: Maximum time in ms non-interrupts spend draining any
 *	squeue. Note that this is an approximation - squeues have no control
 *	over the time it takes to process each request. This limit is only
 *	checked between processing individual messages.
 *	Default: 10 ms.
 *
 * squeue_workerdrain_ms: Maximum time in ms the worker thread spends draining
 *	any squeue. Note that this is an approximation - squeues have no
 *	control over the time it takes to process each request. This limit is
 *	only checked between processing individual messages.
 *	Default: 10 ms.
 *
 * squeue_workerwait_ms: When the worker thread is interrupted because
 *	workerdrain expired, how much time to wait before waking the worker
 *	thread again.
 *	Default: 10 ms.
 */

#include <sys/types.h>
#include <sys/cmn_err.h>
#include <sys/debug.h>
#include <sys/kmem.h>
#include <sys/cpuvar.h>
#include <sys/condvar_impl.h>
#include <sys/systm.h>
#include <sys/callb.h>
#include <sys/sdt.h>
#include <sys/ddi.h>

#include <inet/ipclassifier.h>
#include <inet/udp_impl.h>
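/*
 * Illustrative usage sketch (not part of the original interface description
 * above): a consumer might create a squeue and dispatch a request roughly as
 * follows. The names my_sq, my_proc and connp are hypothetical, and the bind,
 * wait and priority values are arbitrary; the callback signature and the
 * conn_t reference handling follow the squeue_enter() description above
 * (the squeue drops one conn_t reference once the callback has run).
 *
 *	static void
 *	my_proc(void *arg, mblk_t *mp, void *arg2)
 *	{
 *		conn_t *connp = (conn_t *)arg;
 *
 *		(process mp on behalf of connp here; only one thread at a
 *		time runs callbacks for a given squeue)
 *	}
 *
 *	squeue_t *my_sq = squeue_create("my_sq", 0, 10, minclsyspri);
 *
 *	CONN_INC_REF(connp);
 *	squeue_enter(my_sq, mp, my_proc, connp, 0);
 */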
/*
 * State flags.
 * Note: The MDB IP module depends on the values of these flags.
 */
#define	SQS_PROC	0x0001	/* being processed */
#define	SQS_WORKER	0x0002	/* worker thread */
#define	SQS_ENTER	0x0004	/* enter thread */
#define	SQS_FAST	0x0008	/* enter-fast thread */
#define	SQS_USER	0x0010	/* A non interrupt user */
#define	SQS_BOUND	0x0020	/* Worker thread is bound */
#define	SQS_PROFILE	0x0040	/* Enable profiling */
#define	SQS_REENTER	0x0080	/* Re-entered thread */
#define	SQS_TMO_PROG	0x0100	/* Timeout is being set */

#include <sys/squeue_impl.h>

static void squeue_fire(void *);
static void squeue_drain(squeue_t *, uint_t, hrtime_t);
static void squeue_worker(squeue_t *sqp);

#if SQUEUE_PROFILE
static kmutex_t	squeue_kstat_lock;
static int	squeue_kstat_update(kstat_t *, int);
#endif

kmem_cache_t *squeue_cache;

#define	SQUEUE_MSEC_TO_NSEC	1000000

int squeue_intrdrain_ms = 20;
int squeue_writerdrain_ms = 10;
int squeue_workerdrain_ms = 10;
int squeue_workerwait_ms = 10;

/* The values above converted to ticks or nanoseconds */
static int squeue_intrdrain_ns = 0;
static int squeue_writerdrain_ns = 0;
static int squeue_workerdrain_ns = 0;
static int squeue_workerwait_tick = 0;

/*
 * The minimum number of packets queued that, when the worker thread is doing
 * the drain, triggers polling (if the squeue allows it). The choice of 3 is
 * arbitrary. You definitely don't want it to be 1, since that would trigger
 * polling on very low loads as well (ssh seems to be one such example, where
 * the packet flow was very low yet somehow 1 packet ended up getting queued,
 * so the worker thread fired every 10ms and blanking also got triggered).
 */
int squeue_worker_poll_min = 3;

#if SQUEUE_PROFILE
/*
 * Set to B_TRUE to enable profiling.
 */
static int squeue_profile = B_FALSE;
#define	SQ_PROFILING(sqp) (squeue_profile && ((sqp)->sq_state & SQS_PROFILE))

#define	SQSTAT(sqp, x) ((sqp)->sq_stats.x++)
#define	SQDELTA(sqp, x, d) ((sqp)->sq_stats.x += (d))

struct squeue_kstat {
	kstat_named_t	sq_count;
	kstat_named_t	sq_max_qlen;
	kstat_named_t	sq_npackets_worker;
	kstat_named_t	sq_npackets_intr;
	kstat_named_t	sq_npackets_other;
	kstat_named_t	sq_nqueued_intr;
	kstat_named_t	sq_nqueued_other;
	kstat_named_t	sq_ndrains_worker;
	kstat_named_t	sq_ndrains_intr;
	kstat_named_t	sq_ndrains_other;
	kstat_named_t	sq_time_worker;
	kstat_named_t	sq_time_intr;
	kstat_named_t	sq_time_other;
} squeue_kstat = {
	{ "count",		KSTAT_DATA_UINT64 },
	{ "max_qlen",		KSTAT_DATA_UINT64 },
	{ "packets_worker",	KSTAT_DATA_UINT64 },
	{ "packets_intr",	KSTAT_DATA_UINT64 },
	{ "packets_other",	KSTAT_DATA_UINT64 },
	{ "queued_intr",	KSTAT_DATA_UINT64 },
	{ "queued_other",	KSTAT_DATA_UINT64 },
	{ "ndrains_worker",	KSTAT_DATA_UINT64 },
	{ "ndrains_intr",	KSTAT_DATA_UINT64 },
	{ "ndrains_other",	KSTAT_DATA_UINT64 },
	{ "time_worker",	KSTAT_DATA_UINT64 },
	{ "time_intr",		KSTAT_DATA_UINT64 },
	{ "time_other",		KSTAT_DATA_UINT64 },
};
#endif

#define	SQUEUE_WORKER_WAKEUP(sqp) {					\
	timeout_id_t tid = (sqp)->sq_tid;				\
									\
	ASSERT(MUTEX_HELD(&(sqp)->sq_lock));				\
	/*								\
	 * Queue isn't being processed, so take				\
	 * any post enqueue actions needed before leaving.		\
	 */								\
	if (tid != 0) {							\
		/*							\
		 * Waiting for an enter() to process mblk(s).		\
		 */							\
		clock_t	waited = lbolt - (sqp)->sq_awaken;		\
									\
		if (TICK_TO_MSEC(waited) >= (sqp)->sq_wait) {		\
			/*						\
			 * Time's up and we have a worker thread	\
			 * waiting for work, so schedule it.		\
			 */						\
			(sqp)->sq_tid = 0;				\
			(sqp)->sq_awaken = lbolt;			\
			cv_signal(&(sqp)->sq_async);			\
			mutex_exit(&(sqp)->sq_lock);			\
			(void) untimeout(tid);				\
			return;						\
		}							\
		mutex_exit(&(sqp)->sq_lock);				\
		return;							\
	} else if ((sqp)->sq_state & SQS_TMO_PROG) {			\
		mutex_exit(&(sqp)->sq_lock);				\
		return;							\
	} else if ((sqp)->sq_wait != 0) {				\
		clock_t	wait = (sqp)->sq_wait;				\
		/*							\
		 * Wait up to sqp->sq_wait ms for an			\
		 * enter() to process this queue. We			\
		 * don't want to contend on timeout locks		\
		 * with sq_lock held for performance reasons,		\
		 * so drop the sq_lock before calling timeout		\
		 * but we need to check if timeout is required		\
		 * after reacquiring the sq_lock. Once			\
		 * the sq_lock is dropped, someone else could		\
		 * have processed the packet or the timeout could	\
		 * have already fired.					\
		 */							\
		(sqp)->sq_state |= SQS_TMO_PROG;			\
		mutex_exit(&(sqp)->sq_lock);				\
		tid = timeout(squeue_fire, (sqp), wait);		\
		mutex_enter(&(sqp)->sq_lock);				\
		/* Check again if we still need the timeout */		\
		if ((((sqp)->sq_state & (SQS_PROC|SQS_TMO_PROG)) ==	\
		    SQS_TMO_PROG) && ((sqp)->sq_tid == 0) &&		\
		    ((sqp)->sq_first != NULL)) {			\
			(sqp)->sq_state &= ~SQS_TMO_PROG;		\
			(sqp)->sq_awaken = lbolt;			\
			(sqp)->sq_tid = tid;				\
			mutex_exit(&(sqp)->sq_lock);			\
			return;						\
		} else {						\
			if ((sqp)->sq_state & SQS_TMO_PROG) {		\
				(sqp)->sq_state &= ~SQS_TMO_PROG;	\
				mutex_exit(&(sqp)->sq_lock);		\
				(void) untimeout(tid);			\
			} else {					\
				/*					\
				 * The timer fired before we could	\
				 * reacquire the sq_lock. squeue_fire	\
				 * removes the SQS_TMO_PROG flag	\
				 * and we don't need to do anything	\
				 * else.				\
				 */					\
				mutex_exit(&(sqp)->sq_lock);		\
			}						\
		}							\
	} else {							\
		/*							\
		 * Schedule the worker thread.				\
		 */							\
		(sqp)->sq_awaken = lbolt;				\
		cv_signal(&(sqp)->sq_async);				\
		mutex_exit(&(sqp)->sq_lock);				\
	}								\
	ASSERT(MUTEX_NOT_HELD(&(sqp)->sq_lock));			\
}

#define	ENQUEUE_MP(sqp, mp, proc, arg) {			\
	/*							\
	 * Enqueue our mblk.					\
	 */							\
	(mp)->b_queue = NULL;					\
	ASSERT(MUTEX_HELD(&(sqp)->sq_lock));			\
	ASSERT((mp)->b_prev == NULL && (mp)->b_next == NULL);	\
	(mp)->b_queue = (queue_t *)(proc);			\
	(mp)->b_prev = (mblk_t *)(arg);				\
								\
	if ((sqp)->sq_last != NULL)				\
		(sqp)->sq_last->b_next = (mp);			\
	else							\
		(sqp)->sq_first = (mp);				\
	(sqp)->sq_last = (mp);					\
	(sqp)->sq_count++;					\
	ASSERT((sqp)->sq_count > 0);				\
	DTRACE_PROBE2(squeue__enqueue, squeue_t *, sqp,		\
	    mblk_t *, mp);					\
}


#define	ENQUEUE_CHAIN(sqp, mp, tail, cnt) {			\
	/*							\
	 * Enqueue our mblk chain.				\
	 */							\
	ASSERT(MUTEX_HELD(&(sqp)->sq_lock));			\
								\
	if ((sqp)->sq_last != NULL)				\
		(sqp)->sq_last->b_next = (mp);			\
	else							\
		(sqp)->sq_first = (mp);				\
	(sqp)->sq_last = (tail);				\
	(sqp)->sq_count += (cnt);				\
	ASSERT((sqp)->sq_count > 0);				\
	DTRACE_PROBE4(squeue__enqueuechain, squeue_t *, sqp,	\
	    mblk_t *, mp, mblk_t *, tail, int, cnt);		\
								\
}

#define	SQS_POLLING_ON(sqp, rx_ring) {				\
	ASSERT(rx_ring != NULL);				\
	ASSERT(MUTEX_HELD(&(sqp)->sq_lock));			\
	rx_ring->rr_blank(rx_ring->rr_handle,			\
	    MIN((sqp->sq_avg_drain_time * sqp->sq_count),	\
	    rx_ring->rr_max_blank_time),			\
	    rx_ring->rr_max_pkt_cnt);				\
	rx_ring->rr_poll_state |= ILL_POLLING;			\
	rx_ring->rr_poll_time = lbolt;				\
}


#define	SQS_POLLING_OFF(sqp, rx_ring) {				\
	ASSERT(rx_ring != NULL);				\
	ASSERT(MUTEX_HELD(&(sqp)->sq_lock));			\
	rx_ring->rr_blank(rx_ring->rr_handle,			\
	    rx_ring->rr_min_blank_time,				\
	    rx_ring->rr_min_pkt_cnt);				\
}

void
squeue_init(void)
{
	squeue_cache = kmem_cache_create("squeue_cache",
	    sizeof (squeue_t), 64, NULL, NULL, NULL, NULL, NULL, 0);

	squeue_intrdrain_ns = squeue_intrdrain_ms * SQUEUE_MSEC_TO_NSEC;
	squeue_writerdrain_ns = squeue_writerdrain_ms * SQUEUE_MSEC_TO_NSEC;
	squeue_workerdrain_ns = squeue_workerdrain_ms * SQUEUE_MSEC_TO_NSEC;
	squeue_workerwait_tick = MSEC_TO_TICK_ROUNDUP(squeue_workerwait_ms);
}

/* ARGSUSED */
squeue_t *
squeue_create(char *name, processorid_t bind, clock_t wait, pri_t pri)
{
	squeue_t *sqp = kmem_cache_alloc(squeue_cache, KM_SLEEP);

	bzero(sqp, sizeof (squeue_t));
	(void) strncpy(sqp->sq_name, name, SQ_NAMELEN + 1);
	sqp->sq_name[SQ_NAMELEN] = '\0';

	sqp->sq_bind = bind;
	sqp->sq_wait = MSEC_TO_TICK(wait);
	sqp->sq_avg_drain_time =
	    drv_hztousec(NSEC_TO_TICK_ROUNDUP(squeue_intrdrain_ns)) /
	    NSEC_TO_TICK_ROUNDUP(squeue_intrdrain_ns);

#if SQUEUE_PROFILE
	if ((sqp->sq_kstat = kstat_create("ip", bind, name,
	    "net", KSTAT_TYPE_NAMED,
	    sizeof (squeue_kstat) / sizeof (kstat_named_t),
	    KSTAT_FLAG_VIRTUAL)) != NULL) {
		sqp->sq_kstat->ks_lock = &squeue_kstat_lock;
		sqp->sq_kstat->ks_data = &squeue_kstat;
		sqp->sq_kstat->ks_update = squeue_kstat_update;
		sqp->sq_kstat->ks_private = sqp;
		kstat_install(sqp->sq_kstat);
	}
#endif

	sqp->sq_worker = thread_create(NULL, 0, squeue_worker,
	    sqp, 0, &p0, TS_RUN, pri);

	return (sqp);
}

/* ARGSUSED */
void
squeue_bind(squeue_t *sqp, processorid_t bind)
{
	ASSERT(bind == -1);

	mutex_enter(&sqp->sq_lock);
	if (sqp->sq_state & SQS_BOUND) {
		mutex_exit(&sqp->sq_lock);
		return;
	}

	sqp->sq_state |= SQS_BOUND;
	mutex_exit(&sqp->sq_lock);

	thread_affinity_set(sqp->sq_worker, sqp->sq_bind);
}

void
squeue_unbind(squeue_t *sqp)
{
	mutex_enter(&sqp->sq_lock);
	if (!(sqp->sq_state & SQS_BOUND)) {
		mutex_exit(&sqp->sq_lock);
		return;
	}

	sqp->sq_state &= ~SQS_BOUND;
	mutex_exit(&sqp->sq_lock);

	thread_affinity_clear(sqp->sq_worker);
}

/*
 * squeue_enter_chain() - enter squeue sqp with mblk mp (which can be
 * a chain), while tail points to the end and cnt is the number of
 * mblks in the chain.
 *
 * For a chain of a single packet (i.e. mp == tail), go through the
 * fast path if no one is processing the squeue and nothing is queued.
 *
 * The proc and arg for each mblk are already stored in the mblk in
 * appropriate places.
 */
void
squeue_enter_chain(squeue_t *sqp, mblk_t *mp, mblk_t *tail,
    uint32_t cnt, uint8_t tag)
{
	int		interrupt = servicing_interrupt();
	void		*arg;
	sqproc_t	proc;
	hrtime_t	now;
#if SQUEUE_PROFILE
	hrtime_t	start, delta;
#endif

	ASSERT(sqp != NULL);
	ASSERT(mp != NULL);
	ASSERT(tail != NULL);
	ASSERT(cnt > 0);
	ASSERT(MUTEX_NOT_HELD(&sqp->sq_lock));

	mutex_enter(&sqp->sq_lock);
	if (!(sqp->sq_state & SQS_PROC)) {
		/*
		 * See if anything is already queued. If we are the
		 * first packet, do inline processing, else queue the
		 * packet and do the drain.
		 */
		sqp->sq_run = curthread;
		if (sqp->sq_first == NULL && cnt == 1) {
			/*
			 * Fast-path, ok to process and nothing queued.
			 */
			sqp->sq_state |= (SQS_PROC|SQS_FAST);
			mutex_exit(&sqp->sq_lock);

			/*
			 * We are a chain of 1 packet so
			 * go through this fast path.
			 */
			arg = mp->b_prev;
			mp->b_prev = NULL;
			proc = (sqproc_t)mp->b_queue;
			mp->b_queue = NULL;

			ASSERT(proc != NULL);
			ASSERT(arg != NULL);
			ASSERT(mp->b_next == NULL);

#if SQUEUE_DEBUG
			sqp->sq_isintr = interrupt;
			sqp->sq_curmp = mp;
			sqp->sq_curproc = proc;
			sqp->sq_connp = arg;
			mp->b_tag = sqp->sq_tag = tag;
#endif
#if SQUEUE_PROFILE
			if (SQ_PROFILING(sqp)) {
				if (interrupt)
					SQSTAT(sqp, sq_npackets_intr);
				else
					SQSTAT(sqp, sq_npackets_other);
				start = gethrtime();
			}
#endif
			((conn_t *)arg)->conn_on_sqp = B_TRUE;
			DTRACE_PROBE3(squeue__proc__start, squeue_t *,
			    sqp, mblk_t *, mp, conn_t *, arg);
			(*proc)(arg, mp, sqp);
			DTRACE_PROBE2(squeue__proc__end, squeue_t *,
			    sqp, conn_t *, arg);
			((conn_t *)arg)->conn_on_sqp = B_FALSE;

#if SQUEUE_PROFILE
			if (SQ_PROFILING(sqp)) {
				delta = gethrtime() - start;
				if (interrupt)
					SQDELTA(sqp, sq_time_intr, delta);
				else
					SQDELTA(sqp, sq_time_other, delta);
			}
#endif
#if SQUEUE_DEBUG
			sqp->sq_curmp = NULL;
			sqp->sq_curproc = NULL;
			sqp->sq_connp = NULL;
			sqp->sq_isintr = 0;
#endif

			CONN_DEC_REF((conn_t *)arg);
			ASSERT(MUTEX_NOT_HELD(&sqp->sq_lock));
			mutex_enter(&sqp->sq_lock);
			sqp->sq_state &= ~(SQS_PROC|SQS_FAST);
			if (sqp->sq_first == NULL) {
				/*
				 * We processed our packet inline and
				 * nothing new has arrived. We are done.
				 */
				sqp->sq_run = NULL;
				mutex_exit(&sqp->sq_lock);
				return;
			} else if (sqp->sq_bind != CPU->cpu_id) {
				/*
				 * If the current thread is not running
				 * on the CPU to which this squeue is bound,
				 * then don't allow it to drain.
				 */
				sqp->sq_run = NULL;
				SQUEUE_WORKER_WAKEUP(sqp);
				return;
			}
		} else {
			ENQUEUE_CHAIN(sqp, mp, tail, cnt);
#if SQUEUE_DEBUG
			mp->b_tag = tag;
#endif
#if SQUEUE_PROFILE
			if (SQ_PROFILING(sqp)) {
				if (servicing_interrupt())
					SQSTAT(sqp, sq_nqueued_intr);
				else
					SQSTAT(sqp, sq_nqueued_other);
				if (sqp->sq_stats.sq_max_qlen < sqp->sq_count)
					sqp->sq_stats.sq_max_qlen =
					    sqp->sq_count;
			}
#endif
		}

		/*
		 * We are here because either we couldn't do inline
		 * processing (because something was already queued),
		 * or we had a chain of more than one packet,
		 * or something else arrived after we were done with
		 * inline processing.
		 */
		ASSERT(MUTEX_HELD(&sqp->sq_lock));
		ASSERT(sqp->sq_first != NULL);

#if SQUEUE_PROFILE
		if (SQ_PROFILING(sqp)) {
			start = gethrtime();
		}
#endif
#if SQUEUE_DEBUG
		sqp->sq_isintr = interrupt;
#endif

		now = gethrtime();
		if (interrupt) {
			squeue_drain(sqp, SQS_ENTER, now +
			    squeue_intrdrain_ns);
		} else {
			squeue_drain(sqp, SQS_USER, now +
			    squeue_writerdrain_ns);
		}

#if SQUEUE_PROFILE
		if (SQ_PROFILING(sqp)) {
			delta = gethrtime() - start;
			if (interrupt)
				SQDELTA(sqp, sq_time_intr, delta);
			else
				SQDELTA(sqp, sq_time_other, delta);
		}
#endif
#if SQUEUE_DEBUG
		sqp->sq_isintr = 0;
#endif

		/*
		 * If we didn't do a complete drain, the worker
		 * thread was already signalled by squeue_drain.
		 */
		sqp->sq_run = NULL;
		mutex_exit(&sqp->sq_lock);
		return;
	} else {
		ASSERT(sqp->sq_run != NULL);
		/*
		 * Queue is already being processed. Just enqueue
		 * the packet and go away.
		 */
#if SQUEUE_DEBUG
		mp->b_tag = tag;
#endif
#if SQUEUE_PROFILE
		if (SQ_PROFILING(sqp)) {
			if (servicing_interrupt())
				SQSTAT(sqp, sq_nqueued_intr);
			else
				SQSTAT(sqp, sq_nqueued_other);
			if (sqp->sq_stats.sq_max_qlen < sqp->sq_count)
				sqp->sq_stats.sq_max_qlen = sqp->sq_count;
		}
#endif

		ENQUEUE_CHAIN(sqp, mp, tail, cnt);
		mutex_exit(&sqp->sq_lock);
		return;
	}
}

/*
 * squeue_enter() - enter squeue *sqp with mblk *mp with argument of *arg.
 */
void
squeue_enter(squeue_t *sqp, mblk_t *mp, sqproc_t proc, void *arg,
    uint8_t tag)
{
	int		interrupt = servicing_interrupt();
	hrtime_t	now;
#if SQUEUE_PROFILE
	hrtime_t	start, delta;
#endif
#if SQUEUE_DEBUG
	conn_t		*connp = (conn_t *)arg;
	ASSERT(!IPCL_IS_TCP(connp) || connp->conn_tcp->tcp_connp == connp);
	ASSERT(!IPCL_IS_UDP(connp) || connp->conn_udp->udp_connp == connp);
#endif

	ASSERT(proc != NULL);
	ASSERT(sqp != NULL);
	ASSERT(mp != NULL);
	ASSERT(mp->b_next == NULL);
	ASSERT(MUTEX_NOT_HELD(&sqp->sq_lock));

	mutex_enter(&sqp->sq_lock);
	if (!(sqp->sq_state & SQS_PROC)) {
		/*
		 * See if anything is already queued. If we are the
		 * first packet, do inline processing, else queue the
		 * packet and do the drain.
		 */
		sqp->sq_run = curthread;
		if (sqp->sq_first == NULL) {
			/*
			 * Fast-path, ok to process and nothing queued.
			 */
			sqp->sq_state |= (SQS_PROC|SQS_FAST);
			mutex_exit(&sqp->sq_lock);

#if SQUEUE_DEBUG
			sqp->sq_isintr = interrupt;
			sqp->sq_curmp = mp;
			sqp->sq_curproc = proc;
			sqp->sq_connp = connp;
			mp->b_tag = sqp->sq_tag = tag;
#endif
#if SQUEUE_PROFILE
			if (SQ_PROFILING(sqp)) {
				if (interrupt)
					SQSTAT(sqp, sq_npackets_intr);
				else
					SQSTAT(sqp, sq_npackets_other);
				start = gethrtime();
			}
#endif
			((conn_t *)arg)->conn_on_sqp = B_TRUE;
			DTRACE_PROBE3(squeue__proc__start, squeue_t *,
			    sqp, mblk_t *, mp, conn_t *, arg);
			(*proc)(arg, mp, sqp);
			DTRACE_PROBE2(squeue__proc__end, squeue_t *,
			    sqp, conn_t *, arg);
			((conn_t *)arg)->conn_on_sqp = B_FALSE;

#if SQUEUE_PROFILE
			if (SQ_PROFILING(sqp)) {
				delta = gethrtime() - start;
				if (interrupt)
					SQDELTA(sqp, sq_time_intr, delta);
				else
					SQDELTA(sqp, sq_time_other, delta);
			}
#endif
#if SQUEUE_DEBUG
			sqp->sq_curmp = NULL;
			sqp->sq_curproc = NULL;
			sqp->sq_connp = NULL;
			sqp->sq_isintr = 0;
#endif

			CONN_DEC_REF((conn_t *)arg);
			ASSERT(MUTEX_NOT_HELD(&sqp->sq_lock));
			mutex_enter(&sqp->sq_lock);
			sqp->sq_state &= ~(SQS_PROC|SQS_FAST);
			if (sqp->sq_first == NULL) {
				/*
				 * We processed our packet inline and
				 * nothing new has arrived. We are done.
				 */
				sqp->sq_run = NULL;
				mutex_exit(&sqp->sq_lock);
				return;
			} else if (sqp->sq_bind != CPU->cpu_id) {
				/*
				 * If the current thread is not running
				 * on the CPU to which this squeue is bound,
				 * then don't allow it to drain.
				 */
				sqp->sq_run = NULL;
				SQUEUE_WORKER_WAKEUP(sqp);
				return;
			}
		} else {
			ENQUEUE_MP(sqp, mp, proc, arg);
#if SQUEUE_DEBUG
			mp->b_tag = tag;
#endif
#if SQUEUE_PROFILE
			if (SQ_PROFILING(sqp)) {
				if (servicing_interrupt())
					SQSTAT(sqp, sq_nqueued_intr);
				else
					SQSTAT(sqp, sq_nqueued_other);
				if (sqp->sq_stats.sq_max_qlen < sqp->sq_count)
					sqp->sq_stats.sq_max_qlen =
					    sqp->sq_count;
			}
#endif
		}

		/*
		 * We are here because either we couldn't do inline
		 * processing (because something was already queued)
		 * or something else arrived after we were done with
		 * inline processing.
		 */
		ASSERT(MUTEX_HELD(&sqp->sq_lock));
		ASSERT(sqp->sq_first != NULL);

#if SQUEUE_PROFILE
		if (SQ_PROFILING(sqp)) {
			start = gethrtime();
		}
#endif
#if SQUEUE_DEBUG
		sqp->sq_isintr = interrupt;
#endif

		now = gethrtime();
		if (interrupt) {
			squeue_drain(sqp, SQS_ENTER, now +
			    squeue_intrdrain_ns);
		} else {
			squeue_drain(sqp, SQS_USER, now +
			    squeue_writerdrain_ns);
		}

#if SQUEUE_PROFILE
		if (SQ_PROFILING(sqp)) {
			delta = gethrtime() - start;
			if (interrupt)
				SQDELTA(sqp, sq_time_intr, delta);
			else
				SQDELTA(sqp, sq_time_other, delta);
		}
#endif
#if SQUEUE_DEBUG
		sqp->sq_isintr = 0;
#endif

		/*
		 * If we didn't do a complete drain, the worker
		 * thread was already signalled by squeue_drain.
		 */
		sqp->sq_run = NULL;
		mutex_exit(&sqp->sq_lock);
		return;
	} else {
		ASSERT(sqp->sq_run != NULL);
		/*
		 * We let a thread processing a squeue reenter only
		 * once. This helps the case of an incoming connection
		 * where a SYN-ACK-ACK that triggers the conn_ind
		 * doesn't have to queue the packet if the listener and
		 * eager are on the same squeue. It also helps the
		 * loopback connection where the two ends are bound
		 * to the same squeue (which is typical on single
		 * CPU machines).
		 * We let the thread reenter only once for fear
		 * of blowing the stack with multiple traversals.
		 */
		if (!(sqp->sq_state & SQS_REENTER) &&
		    (sqp->sq_run == curthread) && sqp->sq_first == NULL &&
		    (((conn_t *)arg)->conn_on_sqp == B_FALSE)) {
			sqp->sq_state |= SQS_REENTER;
			mutex_exit(&sqp->sq_lock);

			((conn_t *)arg)->conn_on_sqp = B_TRUE;
			DTRACE_PROBE3(squeue__proc__start, squeue_t *,
			    sqp, mblk_t *, mp, conn_t *, arg);
			(*proc)(arg, mp, sqp);
			DTRACE_PROBE2(squeue__proc__end, squeue_t *,
			    sqp, conn_t *, arg);
			((conn_t *)arg)->conn_on_sqp = B_FALSE;
			CONN_DEC_REF((conn_t *)arg);

			mutex_enter(&sqp->sq_lock);
			sqp->sq_state &= ~SQS_REENTER;
			mutex_exit(&sqp->sq_lock);
			return;
		}
		/*
		 * Queue is already being processed. Just enqueue
		 * the packet and go away.
		 */
#if SQUEUE_DEBUG
		mp->b_tag = tag;
#endif
#if SQUEUE_PROFILE
		if (SQ_PROFILING(sqp)) {
			if (servicing_interrupt())
				SQSTAT(sqp, sq_nqueued_intr);
			else
				SQSTAT(sqp, sq_nqueued_other);
			if (sqp->sq_stats.sq_max_qlen < sqp->sq_count)
				sqp->sq_stats.sq_max_qlen = sqp->sq_count;
		}
#endif

		ENQUEUE_MP(sqp, mp, proc, arg);
		mutex_exit(&sqp->sq_lock);
		return;
	}
}

void
squeue_enter_nodrain(squeue_t *sqp, mblk_t *mp, sqproc_t proc, void *arg,
    uint8_t tag)
{
	int		interrupt = servicing_interrupt();
	boolean_t	being_processed;
#if SQUEUE_DEBUG
	conn_t		*connp = (conn_t *)arg;
#endif
#if SQUEUE_PROFILE
	hrtime_t	start, delta;
#endif

	ASSERT(proc != NULL);
	ASSERT(sqp != NULL);
	ASSERT(mp != NULL);
	ASSERT(mp->b_next == NULL);
	ASSERT(!IPCL_IS_TCP(connp) || connp->conn_tcp->tcp_connp == connp);
	ASSERT(!IPCL_IS_UDP(connp) || connp->conn_udp->udp_connp == connp);
	ASSERT(MUTEX_NOT_HELD(&sqp->sq_lock));

	mutex_enter(&sqp->sq_lock);

	being_processed = (sqp->sq_state & SQS_PROC);
	if (!being_processed && (sqp->sq_first == NULL)) {
		/*
		 * Fast-path, ok to process and nothing queued.
		 */
		sqp->sq_state |= (SQS_PROC|SQS_FAST);
		sqp->sq_run = curthread;
		mutex_exit(&sqp->sq_lock);

#if SQUEUE_DEBUG
		sqp->sq_isintr = interrupt;
		sqp->sq_curmp = mp;
		sqp->sq_curproc = proc;
		sqp->sq_connp = connp;
		mp->b_tag = sqp->sq_tag = tag;
#endif

#if SQUEUE_PROFILE
		if (SQ_PROFILING(sqp)) {
			if (interrupt)
				SQSTAT(sqp, sq_npackets_intr);
			else
				SQSTAT(sqp, sq_npackets_other);
			start = gethrtime();
		}
#endif

		((conn_t *)arg)->conn_on_sqp = B_TRUE;
		DTRACE_PROBE3(squeue__proc__start, squeue_t *,
		    sqp, mblk_t *, mp, conn_t *, arg);
		(*proc)(arg, mp, sqp);
		DTRACE_PROBE2(squeue__proc__end, squeue_t *,
		    sqp, conn_t *, arg);
		((conn_t *)arg)->conn_on_sqp = B_FALSE;

#if SQUEUE_DEBUG
		sqp->sq_curmp = NULL;
		sqp->sq_curproc = NULL;
		sqp->sq_connp = NULL;
		sqp->sq_isintr = 0;
#endif
#if SQUEUE_PROFILE
		if (SQ_PROFILING(sqp)) {
			delta = gethrtime() - start;
			if (interrupt)
				SQDELTA(sqp, sq_time_intr, delta);
			else
				SQDELTA(sqp, sq_time_other, delta);
		}
#endif

		CONN_DEC_REF((conn_t *)arg);
		mutex_enter(&sqp->sq_lock);
		sqp->sq_state &= ~(SQS_PROC|SQS_FAST);
		sqp->sq_run = NULL;
		if (sqp->sq_first == NULL) {
			/*
			 * We processed our packet inline and
			 * nothing new has arrived. We are done.
			 */
			mutex_exit(&sqp->sq_lock);
		} else {
			SQUEUE_WORKER_WAKEUP(sqp);
		}
		return;
	} else {
		/*
		 * We let a thread processing a squeue reenter only
		 * once. This helps the case of an incoming connection
		 * where a SYN-ACK-ACK that triggers the conn_ind
		 * doesn't have to queue the packet if the listener and
		 * eager are on the same squeue. It also helps the
		 * loopback connection where the two ends are bound
		 * to the same squeue (which is typical on single
		 * CPU machines).
		 * We let the thread reenter only once for fear
		 * of blowing the stack with multiple traversals.
		 */
		if (being_processed && !(sqp->sq_state & SQS_REENTER) &&
		    (sqp->sq_run == curthread) && sqp->sq_first == NULL &&
		    (((conn_t *)arg)->conn_on_sqp == B_FALSE)) {
			sqp->sq_state |= SQS_REENTER;
			mutex_exit(&sqp->sq_lock);

			((conn_t *)arg)->conn_on_sqp = B_TRUE;
			DTRACE_PROBE3(squeue__proc__start, squeue_t *,
			    sqp, mblk_t *, mp, conn_t *, arg);
			(*proc)(arg, mp, sqp);
			DTRACE_PROBE2(squeue__proc__end, squeue_t *,
			    sqp, conn_t *, arg);
			((conn_t *)arg)->conn_on_sqp = B_FALSE;
			CONN_DEC_REF((conn_t *)arg);

			mutex_enter(&sqp->sq_lock);
			sqp->sq_state &= ~SQS_REENTER;
			mutex_exit(&sqp->sq_lock);
			return;
		}

#if SQUEUE_DEBUG
		mp->b_tag = tag;
#endif
#if SQUEUE_PROFILE
		if (SQ_PROFILING(sqp)) {
			if (servicing_interrupt())
				SQSTAT(sqp, sq_nqueued_intr);
			else
				SQSTAT(sqp, sq_nqueued_other);
			if (sqp->sq_stats.sq_max_qlen < sqp->sq_count)
				sqp->sq_stats.sq_max_qlen = sqp->sq_count;
		}
#endif
		ENQUEUE_MP(sqp, mp, proc, arg);
		if (being_processed) {
			/*
			 * Queue is already being processed.
			 * No need to do anything.
			 */
			mutex_exit(&sqp->sq_lock);
			return;
		}
		SQUEUE_WORKER_WAKEUP(sqp);
	}
}

/*
 * squeue_fill() - fill squeue *sqp with mblk *mp with argument of *arg
 * without processing the squeue.
 */
/* ARGSUSED */
void
squeue_fill(squeue_t *sqp, mblk_t *mp, sqproc_t proc, void *arg,
    uint8_t tag)
{
#if SQUEUE_DEBUG
	conn_t *connp = (conn_t *)arg;
#endif
	ASSERT(proc != NULL);
	ASSERT(sqp != NULL);
	ASSERT(mp != NULL);
	ASSERT(mp->b_next == NULL);
	ASSERT(!IPCL_IS_TCP(connp) || connp->conn_tcp->tcp_connp == connp);
	ASSERT(!IPCL_IS_UDP(connp) || connp->conn_udp->udp_connp == connp);

	ASSERT(MUTEX_NOT_HELD(&sqp->sq_lock));
	mutex_enter(&sqp->sq_lock);
	ENQUEUE_MP(sqp, mp, proc, arg);
#if SQUEUE_DEBUG
	mp->b_tag = tag;
#endif
#if SQUEUE_PROFILE
	if (SQ_PROFILING(sqp)) {
		if (servicing_interrupt())
			SQSTAT(sqp, sq_nqueued_intr);
		else
			SQSTAT(sqp, sq_nqueued_other);
		if (sqp->sq_stats.sq_max_qlen < sqp->sq_count)
			sqp->sq_stats.sq_max_qlen = sqp->sq_count;
	}
#endif

	/*
	 * If the queue is already being processed, there is no need to
	 * do anything.
	 */
	if (sqp->sq_state & SQS_PROC) {
		mutex_exit(&sqp->sq_lock);
		return;
	}

	SQUEUE_WORKER_WAKEUP(sqp);
}


/*
 * PRIVATE FUNCTIONS
 */

static void
squeue_fire(void *arg)
{
	squeue_t	*sqp = arg;
	uint_t		state;

	mutex_enter(&sqp->sq_lock);

	state = sqp->sq_state;
	if (sqp->sq_tid == 0 && !(state & SQS_TMO_PROG)) {
		mutex_exit(&sqp->sq_lock);
		return;
	}

	sqp->sq_tid = 0;
	/*
	 * The timeout fired before we got a chance to set it.
	 * Process it anyway but remove the SQS_TMO_PROG so that
	 * the thread trying to set the timeout knows that it has
	 * already been processed.
	 */
	if (state & SQS_TMO_PROG)
		sqp->sq_state &= ~SQS_TMO_PROG;

	if (!(state & SQS_PROC)) {
		sqp->sq_awaken = lbolt;
		cv_signal(&sqp->sq_async);
	}
	mutex_exit(&sqp->sq_lock);
}

static void
squeue_drain(squeue_t *sqp, uint_t proc_type, hrtime_t expire)
{
	mblk_t		*mp;
	mblk_t		*head;
	sqproc_t	proc;
	conn_t		*connp;
	clock_t		start = lbolt;
	clock_t		drain_time;
	timeout_id_t	tid;
	uint_t		cnt;
	uint_t		total_cnt = 0;
	ill_rx_ring_t	*sq_rx_ring = sqp->sq_rx_ring;
	int		interrupt = servicing_interrupt();
	boolean_t	poll_on = B_FALSE;
	hrtime_t	now;

	ASSERT(mutex_owned(&sqp->sq_lock));
	ASSERT(!(sqp->sq_state & SQS_PROC));

#if SQUEUE_PROFILE
	if (SQ_PROFILING(sqp)) {
		if (interrupt)
			SQSTAT(sqp, sq_ndrains_intr);
		else if (!(proc_type & SQS_WORKER))
			SQSTAT(sqp, sq_ndrains_other);
		else
			SQSTAT(sqp, sq_ndrains_worker);
	}
#endif

	if ((tid = sqp->sq_tid) != 0)
		sqp->sq_tid = 0;

	sqp->sq_state |= SQS_PROC | proc_type;
	head = sqp->sq_first;
	sqp->sq_first = NULL;
	sqp->sq_last = NULL;
	cnt = sqp->sq_count;

	/*
	 * We have a backlog built up. Switch to polling mode if the
	 * device underneath allows it. We need to do it only for
	 * drains by a non-interrupt thread, so that interrupts don't
	 * come in and disrupt us in between. If it's an interrupt
	 * thread, there is no need because most devices will not issue
	 * another interrupt till this one returns.
	 */
	if ((sqp->sq_state & SQS_POLL_CAPAB) && !(proc_type & SQS_ENTER) &&
	    (sqp->sq_count > squeue_worker_poll_min)) {
		ASSERT(sq_rx_ring != NULL);
		SQS_POLLING_ON(sqp, sq_rx_ring);
		poll_on = B_TRUE;
	}

	mutex_exit(&sqp->sq_lock);

	if (tid != 0)
		(void) untimeout(tid);
again:
	while ((mp = head) != NULL) {
		head = mp->b_next;
		mp->b_next = NULL;

		proc = (sqproc_t)mp->b_queue;
		mp->b_queue = NULL;
		connp = (conn_t *)mp->b_prev;
		mp->b_prev = NULL;
#if SQUEUE_DEBUG
		sqp->sq_curmp = mp;
		sqp->sq_curproc = proc;
		sqp->sq_connp = connp;
		sqp->sq_tag = mp->b_tag;
#endif

#if SQUEUE_PROFILE
		if (SQ_PROFILING(sqp)) {
			if (interrupt)
				SQSTAT(sqp, sq_npackets_intr);
			else if (!(proc_type & SQS_WORKER))
				SQSTAT(sqp, sq_npackets_other);
			else
				SQSTAT(sqp, sq_npackets_worker);
		}
#endif

		connp->conn_on_sqp = B_TRUE;
		DTRACE_PROBE3(squeue__proc__start, squeue_t *,
		    sqp, mblk_t *, mp, conn_t *, connp);
		(*proc)(connp, mp, sqp);
		DTRACE_PROBE2(squeue__proc__end, squeue_t *,
		    sqp, conn_t *, connp);
		connp->conn_on_sqp = B_FALSE;
		CONN_DEC_REF(connp);
	}


#if SQUEUE_DEBUG
	sqp->sq_curmp = NULL;
	sqp->sq_curproc = NULL;
	sqp->sq_connp = NULL;
#endif

	mutex_enter(&sqp->sq_lock);
	sqp->sq_count -= cnt;
	total_cnt += cnt;

	if (sqp->sq_first != NULL) {

		now = gethrtime();
		if (!expire || (now < expire)) {
			/* More arrived and time not expired */
			head = sqp->sq_first;
			sqp->sq_first = NULL;
			sqp->sq_last = NULL;
			cnt = sqp->sq_count;
			mutex_exit(&sqp->sq_lock);
			goto again;
		}

		/*
		 * If we are not the worker thread and we
		 * reached our time limit for the drain,
		 * signal the worker thread to pick
		 * up the work.
		 * If we were the worker thread, then
		 * we take a break to allow an interrupt
		 * or writer to pick up the load.
		 */
		if (proc_type != SQS_WORKER) {
			sqp->sq_awaken = lbolt;
			cv_signal(&sqp->sq_async);
		}
	}

	/*
	 * Try to get a time estimate for processing a packet. Do it only
	 * in interrupt context, since there is less chance of a context
	 * switch or pinning etc. that would skew the estimate.
	 */
	if (interrupt && ((drain_time = (lbolt - start)) > 0))
		sqp->sq_avg_drain_time = ((80 * sqp->sq_avg_drain_time) +
		    (20 * (drv_hztousec(drain_time)/total_cnt)))/100;

	sqp->sq_state &= ~(SQS_PROC | proc_type);

	/*
	 * If polling was turned on, turn it off and reduce the default
	 * interrupt blank interval as well to bring new packets in faster
	 * (reduces the latency when there is no backlog).
	 */
	if (poll_on && (sqp->sq_state & SQS_POLL_CAPAB)) {
		ASSERT(sq_rx_ring != NULL);
		SQS_POLLING_OFF(sqp, sq_rx_ring);
	}
}

static void
squeue_worker(squeue_t *sqp)
{
	kmutex_t	*lock = &sqp->sq_lock;
	kcondvar_t	*async = &sqp->sq_async;
	callb_cpr_t	cprinfo;
	hrtime_t	now;
#if SQUEUE_PROFILE
	hrtime_t	start;
#endif

	CALLB_CPR_INIT(&cprinfo, lock, callb_generic_cpr, "nca");
	mutex_enter(lock);

	for (;;) {
		while (sqp->sq_first == NULL || (sqp->sq_state & SQS_PROC)) {
			CALLB_CPR_SAFE_BEGIN(&cprinfo);
still_wait:
			cv_wait(async, lock);
			if (sqp->sq_state & SQS_PROC) {
				goto still_wait;
			}
			CALLB_CPR_SAFE_END(&cprinfo, lock);
		}

#if SQUEUE_PROFILE
		if (SQ_PROFILING(sqp)) {
			start = gethrtime();
		}
#endif

		ASSERT(squeue_workerdrain_ns != 0);
		now = gethrtime();
		sqp->sq_run = curthread;
		squeue_drain(sqp, SQS_WORKER, now + squeue_workerdrain_ns);
		sqp->sq_run = NULL;

		if (sqp->sq_first != NULL) {
			/*
			 * Doing too much processing by the worker thread
			 * in the presence of interrupts can be suboptimal.
			 * Instead, once a drain is done by the worker thread
			 * for squeue_workerdrain_ns (the reason we are
			 * here), we force a wait of squeue_workerwait_tick
			 * before doing more processing even if sq_wait is
			 * set to 0.
			 *
			 * This can be counterproductive for performance
			 * if the worker thread is the only means to process
			 * the packets (interrupts or writers are not
			 * allowed inside the squeue).
			 */
			if (sqp->sq_tid == 0 &&
			    !(sqp->sq_state & SQS_TMO_PROG)) {
				timeout_id_t	tid;

				sqp->sq_state |= SQS_TMO_PROG;
				mutex_exit(&sqp->sq_lock);
				tid = timeout(squeue_fire, sqp,
				    squeue_workerwait_tick);
				mutex_enter(&sqp->sq_lock);
				/*
				 * Check again if we still need
				 * the timeout
				 */
				if (((sqp->sq_state & (SQS_TMO_PROG|SQS_PROC))
				    == SQS_TMO_PROG) && (sqp->sq_tid == 0) &&
				    (sqp->sq_first != NULL)) {
					sqp->sq_state &= ~SQS_TMO_PROG;
					sqp->sq_awaken = lbolt;
					sqp->sq_tid = tid;
				} else if (sqp->sq_state & SQS_TMO_PROG) {
					/* timeout not needed */
					sqp->sq_state &= ~SQS_TMO_PROG;
					mutex_exit(&(sqp)->sq_lock);
					(void) untimeout(tid);
					mutex_enter(&sqp->sq_lock);
				}
			}
			CALLB_CPR_SAFE_BEGIN(&cprinfo);
			cv_wait(async, lock);
			CALLB_CPR_SAFE_END(&cprinfo, lock);
		}


#if SQUEUE_PROFILE
		if (SQ_PROFILING(sqp)) {
			SQDELTA(sqp, sq_time_worker, gethrtime() - start);
		}
#endif
	}
}

#if SQUEUE_PROFILE
static int
squeue_kstat_update(kstat_t *ksp, int rw)
{
	struct squeue_kstat *sqsp = &squeue_kstat;
	squeue_t *sqp = ksp->ks_private;

	if (rw == KSTAT_WRITE)
		return (EACCES);

#if SQUEUE_DEBUG
	sqsp->sq_count.value.ui64 = sqp->sq_count;
	sqsp->sq_max_qlen.value.ui64 = sqp->sq_stats.sq_max_qlen;
#endif
	sqsp->sq_npackets_worker.value.ui64 = sqp->sq_stats.sq_npackets_worker;
	sqsp->sq_npackets_intr.value.ui64 = sqp->sq_stats.sq_npackets_intr;
	sqsp->sq_npackets_other.value.ui64 = sqp->sq_stats.sq_npackets_other;
	sqsp->sq_nqueued_intr.value.ui64 = sqp->sq_stats.sq_nqueued_intr;
	sqsp->sq_nqueued_other.value.ui64 = sqp->sq_stats.sq_nqueued_other;
	sqsp->sq_ndrains_worker.value.ui64 = sqp->sq_stats.sq_ndrains_worker;
	sqsp->sq_ndrains_intr.value.ui64 = sqp->sq_stats.sq_ndrains_intr;
	sqsp->sq_ndrains_other.value.ui64 = sqp->sq_stats.sq_ndrains_other;
	sqsp->sq_time_worker.value.ui64 = sqp->sq_stats.sq_time_worker;
	sqsp->sq_time_intr.value.ui64 = sqp->sq_stats.sq_time_intr;
	sqsp->sq_time_other.value.ui64 = sqp->sq_stats.sq_time_other;
	return (0);
}
#endif

void
squeue_profile_enable(squeue_t *sqp)
{
	mutex_enter(&sqp->sq_lock);
	sqp->sq_state |= SQS_PROFILE;
	mutex_exit(&sqp->sq_lock);
}

void
squeue_profile_disable(squeue_t *sqp)
{
	mutex_enter(&sqp->sq_lock);
	sqp->sq_state &= ~SQS_PROFILE;
	mutex_exit(&sqp->sq_lock);
}

void
squeue_profile_reset(squeue_t *sqp)
{
#if SQUEUE_PROFILE
	bzero(&sqp->sq_stats, sizeof (sqstat_t));
#endif
}

void
squeue_profile_start(void)
{
#if SQUEUE_PROFILE
	squeue_profile = B_TRUE;
#endif
}

void
squeue_profile_stop(void)
{
#if SQUEUE_PROFILE
	squeue_profile = B_FALSE;
#endif
}

uintptr_t *
squeue_getprivate(squeue_t *sqp, sqprivate_t p)
{
	ASSERT(p < SQPRIVATE_MAX);

	return (&sqp->sq_private[p]);
}

processorid_t
squeue_binding(squeue_t *sqp)
{
	return (sqp->sq_bind);
}
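/*
 * Illustrative sketch only (not part of this file): a consumer registered in
 * the sqprivate_t enum can hang per-squeue state off squeue_getprivate(), as
 * the interface comment at the top of this file describes. The my_state_t
 * type below is hypothetical; SQPRIVATE_TCP is the one slot currently
 * defined.
 *
 *	uintptr_t *slot = squeue_getprivate(sqp, SQPRIVATE_TCP);
 *
 *	if (*slot == 0)
 *		*slot = (uintptr_t)kmem_zalloc(sizeof (my_state_t), KM_SLEEP);
 *	((my_state_t *)*slot)->counter++;
 *
 * The squeue itself never interprets this value; each consumer owns the
 * single uintptr_t slot it registered.
 */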