/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License, Version 1.0 only
 * (the "License").  You may not use this file except in compliance
 * with the License.
 *
 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
 * or http://www.opensolaris.org/os/licensing.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
 * If applicable, add the following below this CDDL HEADER, with the
 * fields enclosed by brackets "[]" replaced with your own identifying
 * information: Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 */
/*
 * Copyright 2006 Sun Microsystems, Inc.  All rights reserved.
 * Use is subject to license terms.
 */

#pragma ident	"%Z%%M%	%I%	%E% SMI"

/*
 * Squeues - TCP/IP serialization mechanism.
 *
 * This is a general purpose high-performance serialization mechanism. It is
 * similar to a taskq with a single worker thread; the difference is that it
 * does not imply a context switch - the thread placing a request may actually
 * process it. It is also biased toward processing requests in interrupt
 * context.
 *
 * Each squeue has a worker thread which may optionally be bound to a CPU.
 *
 * Only one thread may process requests from a given squeue at any time. This
 * is called "entering" the squeue.
 *
 * Each dispatched request is processed either by
 *
 *	a) the dispatching thread, or
 *	b) some other thread that is currently processing the squeue at the
 *	   time of the request, or
 *	c) the worker thread.
 *
 * INTERFACES:
 *
 * squeue_t *squeue_create(name, bind, wait, pri)
 *
 *	name: symbolic name for the squeue.
 *	bind: preferred CPU binding for the worker thread.
 *	wait: time to wait before waking the worker thread after queueing
 *	      a request.
 *	pri:  thread priority for the worker thread.
 *
 *	This function never fails and may sleep. It returns a transparent
 *	pointer to the squeue_t structure that is passed to all other squeue
 *	operations.
 *
 * void squeue_bind(sqp, bind)
 *
 *	Bind the squeue worker thread to a CPU specified by the 'bind'
 *	argument. A 'bind' value of -1 binds to the preferred CPU specified
 *	at squeue_create() time.
 *
 *	NOTE: Any value of 'bind' other than -1 is not supported currently,
 *	but the API is present - in the future it may be useful to specify a
 *	different binding.
 *
 * void squeue_unbind(sqp)
 *
 *	Unbind the worker thread from its preferred CPU.
 *
 * void squeue_enter(*sqp, *mp, proc, arg, tag)
 *
 *	Post a single request for processing. Each request consists of an
 *	mblk 'mp', a function 'proc' to execute and an argument 'arg' to pass
 *	to this function. The function is called as (*proc)(arg, mp, sqp).
 *	The tag is an arbitrary number from 0 to 255 which will be stored in
 *	mp to track the exact caller of squeue_enter. The combination of the
 *	function name and the tag should provide enough information to
 *	identify the caller.
 *
 *	If no one is processing the squeue, squeue_enter() will call the
 *	function immediately. Otherwise it will add the request to the queue
 *	for later processing.
 *	Once the function is executed, the thread may continue executing all
 *	other requests pending on the queue.
 *
 *	NOTE: The tagging information is only used when SQUEUE_DEBUG is set
 *	to 1.
 *	NOTE: The argument can only be a conn_t. Ideally we would like a
 *	generic argument, but we want to drop the connection reference count
 *	here - this improves tail-call optimizations.
 *	XXX: The arg should have type conn_t.
 *
 * void squeue_enter_nodrain(*sqp, *mp, proc, arg, tag)
 *
 *	Same as squeue_enter(), but the entering thread will only try to
 *	execute a single request. It will not continue executing any pending
 *	requests.
 *
 * void squeue_fill(*sqp, *mp, proc, arg, tag)
 *
 *	Just place the request on the queue without trying to execute it.
 *	Arrange for the worker thread to process the request.
 *
 * void squeue_profile_enable(sqp)
 * void squeue_profile_disable(sqp)
 *
 *	Enable or disable profiling for the specified 'sqp'. Profiling is
 *	only available when SQUEUE_PROFILE is set.
 *
 * void squeue_profile_reset(sqp)
 *
 *	Reset all profiling information to zero. Profiling is only available
 *	when SQUEUE_PROFILE is set.
 *
 * void squeue_profile_start()
 * void squeue_profile_stop()
 *
 *	Globally enable or disable profiling for all squeues.
 *
 * uintptr_t *squeue_getprivate(sqp, p)
 *
 *	Each squeue keeps a small amount of private data space available for
 *	various consumers. Current consumers include TCP and NCA. Other
 *	consumers need to add their private tag to the sqprivate_t enum. The
 *	private information is limited to a uintptr_t value. The squeue has
 *	no knowledge of its content and does not manage it in any way.
 *
 *	A typical use is a per-CPU breakdown of data structures (since
 *	squeues are usually per CPU). See NCA for examples of use.
 *	Currently 'p' may have one legal value, SQPRIVATE_TCP.
 *
 * processorid_t squeue_binding(sqp)
 *
 *	Returns the CPU binding for a given squeue.
 *
 *	An illustrative usage sketch of these interfaces appears in the
 *	EXAMPLE section below.
 *
 * TUNABLES:
 *
 *	squeue_intrdrain_ms: Maximum time in ms that interrupt threads spend
 *	draining any squeue. Note that this is an approximation - squeues
 *	have no control over the time it takes to process each request. This
 *	limit is only checked between processing individual messages.
 *	Default: 20 ms.
 *
 *	squeue_writerdrain_ms: Maximum time in ms that non-interrupt threads
 *	spend draining any squeue. Note that this is an approximation -
 *	squeues have no control over the time it takes to process each
 *	request. This limit is only checked between processing individual
 *	messages.
 *	Default: 10 ms.
 *
 *	squeue_workerdrain_ms: Maximum time in ms that the worker thread
 *	spends draining any squeue. Note that this is an approximation -
 *	squeues have no control over the time it takes to process each
 *	request. This limit is only checked between processing individual
 *	messages.
 *	Default: 10 ms.
 *
 *	squeue_workerwait_ms: When the worker thread is interrupted because
 *	squeue_workerdrain_ms expired, how long to wait before waking the
 *	worker thread again.
 *	Default: 10 ms.
 *
 * DEFINES:
 *
 *	SQUEUE_DEBUG: If defined as 1, special code is compiled in that
 *	records additional debugging information in the squeue.
 *
 *	SQUEUE_PROFILE: If defined as 1, special code is compiled in that
 *	collects various squeue statistics and exports them as kstats.
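 *
 * EXAMPLE:
 *
 *	The sketch below is purely illustrative and does not correspond to
 *	code elsewhere in this file: the callback my_conn_input(), the tag
 *	SQTAG_MY_MODULE and the variables bound_cpu, wait_ms and connp are
 *	hypothetical. It only shows the calling convention described above.
 *	Note that the squeue drops the conn_t reference after the callback
 *	returns, so the caller must hold a reference before entering the
 *	squeue.
 *
 *	static void
 *	my_conn_input(void *arg, mblk_t *mp, void *arg2)
 *	{
 *		conn_t	*connp = (conn_t *)arg;
 *
 *		(process mp for connp here; requests for this squeue are
 *		serialized, so no extra locking is needed for state that is
 *		only touched from within the squeue)
 *	}
 *
 *	squeue_t *sqp = squeue_create("sq_example", bound_cpu, wait_ms,
 *	    minclsyspri);
 *
 *	CONN_INC_REF(connp);		(reference consumed by the squeue)
 *	squeue_enter(sqp, mp, my_conn_input, connp, SQTAG_MY_MODULE);
 *
 *	To hand the request to the worker thread without attempting to
 *	process it in the calling thread, use squeue_fill() with the same
 *	arguments.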
 *
 * Ideally we would like both SQUEUE_DEBUG and SQUEUE_PROFILE to always be
 * set, but it affects performance, so they are enabled on DEBUG kernels and
 * disabled on non-DEBUG kernels by default.
 */

#include <sys/types.h>
#include <sys/cmn_err.h>
#include <sys/debug.h>
#include <sys/kmem.h>
#include <sys/cpuvar.h>
#include <sys/condvar_impl.h>
#include <sys/systm.h>
#include <sys/callb.h>
#include <sys/sdt.h>
#include <sys/ddi.h>

#include <inet/ipclassifier.h>
#include <inet/udp_impl.h>

/*
 * State flags.
 * Note: The MDB IP module depends on the values of these flags.
 */
#define	SQS_PROC	0x0001	/* being processed */
#define	SQS_WORKER	0x0002	/* worker thread */
#define	SQS_ENTER	0x0004	/* enter thread */
#define	SQS_FAST	0x0008	/* enter-fast thread */
#define	SQS_USER	0x0010	/* A non-interrupt user */
#define	SQS_BOUND	0x0020	/* Worker thread is bound */
#define	SQS_PROFILE	0x0040	/* Enable profiling */
#define	SQS_REENTER	0x0080	/* Re-entered thread */
#define	SQS_TMO_PROG	0x0100	/* Timeout is being set */

#ifdef DEBUG
#define	SQUEUE_DEBUG 1
#define	SQUEUE_PROFILE 1
#else
#define	SQUEUE_DEBUG 0
#define	SQUEUE_PROFILE 0
#endif

#include <sys/squeue_impl.h>

static void squeue_fire(void *);
static void squeue_drain(squeue_t *, uint_t, hrtime_t);
static void squeue_worker(squeue_t *sqp);

#if SQUEUE_PROFILE
static kmutex_t squeue_kstat_lock;
static int  squeue_kstat_update(kstat_t *, int);
#endif

kmem_cache_t *squeue_cache;

#define	SQUEUE_MSEC_TO_NSEC 1000000

int squeue_intrdrain_ms = 20;
int squeue_writerdrain_ms = 10;
int squeue_workerdrain_ms = 10;
int squeue_workerwait_ms = 10;

/* The values above converted to ticks or nanoseconds */
static int squeue_intrdrain_ns = 0;
static int squeue_writerdrain_ns = 0;
static int squeue_workerdrain_ns = 0;
static int squeue_workerwait_tick = 0;

/*
 * The minimum number of packets queued for a drain by the worker thread to
 * trigger polling (if the squeue allows it). The choice of 3 is arbitrary.
 * You definitely don't want it to be 1, since that would trigger polling on
 * very low loads as well (ssh seems to be one such example, where the packet
 * flow was very low yet somehow one packet ended up getting queued, the
 * worker thread fired every 10 ms and blanking also got triggered).
 */
int squeue_worker_poll_min = 3;

#if SQUEUE_PROFILE
/*
 * Set to B_TRUE to enable profiling.
 */
static int squeue_profile = B_FALSE;
#define	SQ_PROFILING(sqp) (squeue_profile && ((sqp)->sq_state & SQS_PROFILE))

#define	SQSTAT(sqp, x) ((sqp)->sq_stats.x++)
#define	SQDELTA(sqp, x, d) ((sqp)->sq_stats.x += (d))

struct squeue_kstat {
	kstat_named_t	sq_count;
	kstat_named_t	sq_max_qlen;
	kstat_named_t	sq_npackets_worker;
	kstat_named_t	sq_npackets_intr;
	kstat_named_t	sq_npackets_other;
	kstat_named_t	sq_nqueued_intr;
	kstat_named_t	sq_nqueued_other;
	kstat_named_t	sq_ndrains_worker;
	kstat_named_t	sq_ndrains_intr;
	kstat_named_t	sq_ndrains_other;
	kstat_named_t	sq_time_worker;
	kstat_named_t	sq_time_intr;
	kstat_named_t	sq_time_other;
} squeue_kstat = {
	{ "count",		KSTAT_DATA_UINT64 },
	{ "max_qlen",		KSTAT_DATA_UINT64 },
	{ "packets_worker",	KSTAT_DATA_UINT64 },
	{ "packets_intr",	KSTAT_DATA_UINT64 },
	{ "packets_other",	KSTAT_DATA_UINT64 },
	{ "queued_intr",	KSTAT_DATA_UINT64 },
	{ "queued_other",	KSTAT_DATA_UINT64 },
	{ "ndrains_worker",	KSTAT_DATA_UINT64 },
	{ "ndrains_intr",	KSTAT_DATA_UINT64 },
	{ "ndrains_other",	KSTAT_DATA_UINT64 },
	{ "time_worker",	KSTAT_DATA_UINT64 },
	{ "time_intr",		KSTAT_DATA_UINT64 },
	{ "time_other",		KSTAT_DATA_UINT64 },
};
#endif

#define	SQUEUE_WORKER_WAKEUP(sqp) {	\
	timeout_id_t tid = (sqp)->sq_tid;	\
	\
	ASSERT(MUTEX_HELD(&(sqp)->sq_lock));	\
	/*	\
	 * Queue isn't being processed, so take	\
	 * any post-enqueue actions needed before leaving.	\
	 */	\
	if (tid != 0) {	\
		/*	\
		 * Waiting for an enter() to process mblk(s).	\
		 */	\
		clock_t	waited = lbolt - (sqp)->sq_awaken;	\
	\
		if (TICK_TO_MSEC(waited) >= (sqp)->sq_wait) {	\
			/*	\
			 * Time's up and we have a worker thread	\
			 * waiting for work, so schedule it.	\
			 */	\
			(sqp)->sq_tid = 0;	\
			(sqp)->sq_awaken = lbolt;	\
			cv_signal(&(sqp)->sq_async);	\
			mutex_exit(&(sqp)->sq_lock);	\
			(void) untimeout(tid);	\
			return;	\
		}	\
		mutex_exit(&(sqp)->sq_lock);	\
		return;	\
	} else if ((sqp)->sq_state & SQS_TMO_PROG) {	\
		mutex_exit(&(sqp)->sq_lock);	\
		return;	\
	} else if ((sqp)->sq_wait != 0) {	\
		clock_t	wait = (sqp)->sq_wait;	\
		/*	\
		 * Wait up to sqp->sq_wait ms for an	\
		 * enter() to process this queue. We	\
		 * don't want to contend on timeout locks	\
		 * with sq_lock held for performance reasons,	\
		 * so drop the sq_lock before calling timeout	\
		 * but we need to check if timeout is required	\
		 * after reacquiring the sq_lock. Once	\
		 * the sq_lock is dropped, someone else could	\
		 * have processed the packet or the timeout could	\
		 * have already fired.	\
		 */	\
		(sqp)->sq_state |= SQS_TMO_PROG;	\
		mutex_exit(&(sqp)->sq_lock);	\
		tid = timeout(squeue_fire, (sqp), wait);	\
		mutex_enter(&(sqp)->sq_lock);	\
		/* Check again if we still need the timeout */	\
		if ((((sqp)->sq_state & (SQS_PROC|SQS_TMO_PROG)) ==	\
		    SQS_TMO_PROG) && ((sqp)->sq_tid == 0) &&	\
		    ((sqp)->sq_first != NULL)) {	\
			(sqp)->sq_state &= ~SQS_TMO_PROG;	\
			(sqp)->sq_awaken = lbolt;	\
			(sqp)->sq_tid = tid;	\
			mutex_exit(&(sqp)->sq_lock);	\
			return;	\
		} else {	\
			if ((sqp)->sq_state & SQS_TMO_PROG) {	\
				(sqp)->sq_state &= ~SQS_TMO_PROG;	\
				mutex_exit(&(sqp)->sq_lock);	\
				(void) untimeout(tid);	\
			} else {	\
				/*	\
				 * The timer fired before we could	\
				 * reacquire the sq_lock. squeue_fire	\
				 * removes the SQS_TMO_PROG flag	\
				 * and we don't need to do anything	\
				 * else.	\
				 */	\
				mutex_exit(&(sqp)->sq_lock);	\
			}	\
		}	\
	} else {	\
		/*	\
		 * Schedule the worker thread.	\
		 */	\
		(sqp)->sq_awaken = lbolt;	\
		cv_signal(&(sqp)->sq_async);	\
		mutex_exit(&(sqp)->sq_lock);	\
	}	\
	ASSERT(MUTEX_NOT_HELD(&(sqp)->sq_lock));	\
}

#define	ENQUEUE_MP(sqp, mp, proc, arg) {	\
	/*	\
	 * Enqueue our mblk.	\
	 */	\
	(mp)->b_queue = NULL;	\
	ASSERT(MUTEX_HELD(&(sqp)->sq_lock));	\
	ASSERT((mp)->b_prev == NULL && (mp)->b_next == NULL);	\
	(mp)->b_queue = (queue_t *)(proc);	\
	(mp)->b_prev = (mblk_t *)(arg);	\
	\
	if ((sqp)->sq_last != NULL)	\
		(sqp)->sq_last->b_next = (mp);	\
	else	\
		(sqp)->sq_first = (mp);	\
	(sqp)->sq_last = (mp);	\
	(sqp)->sq_count++;	\
	ASSERT((sqp)->sq_count > 0);	\
	DTRACE_PROBE2(squeue__enqueue, squeue_t *, sqp,	\
	    mblk_t *, mp);	\
}


#define	ENQUEUE_CHAIN(sqp, mp, tail, cnt) {	\
	/*	\
	 * Enqueue our mblk chain.	\
	 */	\
	ASSERT(MUTEX_HELD(&(sqp)->sq_lock));	\
	\
	if ((sqp)->sq_last != NULL)	\
		(sqp)->sq_last->b_next = (mp);	\
	else	\
		(sqp)->sq_first = (mp);	\
	(sqp)->sq_last = (tail);	\
	(sqp)->sq_count += (cnt);	\
	ASSERT((sqp)->sq_count > 0);	\
	DTRACE_PROBE4(squeue__enqueuechain, squeue_t *, sqp,	\
	    mblk_t *, mp, mblk_t *, tail, int, cnt);	\
	\
}

#define	SQS_POLLING_ON(sqp, rx_ring) {	\
	ASSERT(rx_ring != NULL);	\
	ASSERT(MUTEX_HELD(&(sqp)->sq_lock));	\
	rx_ring->rr_blank(rx_ring->rr_handle,	\
	    MIN((sqp->sq_avg_drain_time * sqp->sq_count),	\
	    rx_ring->rr_max_blank_time),	\
	    rx_ring->rr_max_pkt_cnt);	\
	rx_ring->rr_poll_state |= ILL_POLLING;	\
	rx_ring->rr_poll_time = lbolt;	\
}


#define	SQS_POLLING_OFF(sqp, rx_ring) {	\
	ASSERT(rx_ring != NULL);	\
	ASSERT(MUTEX_HELD(&(sqp)->sq_lock));	\
	rx_ring->rr_blank(rx_ring->rr_handle,	\
	    rx_ring->rr_min_blank_time,	\
	    rx_ring->rr_min_pkt_cnt);	\
}

void
squeue_init(void)
{
	squeue_cache = kmem_cache_create("squeue_cache",
	    sizeof (squeue_t), 64, NULL, NULL, NULL, NULL, NULL, 0);

	squeue_intrdrain_ns = squeue_intrdrain_ms * SQUEUE_MSEC_TO_NSEC;
	squeue_writerdrain_ns = squeue_writerdrain_ms * SQUEUE_MSEC_TO_NSEC;
	squeue_workerdrain_ns = squeue_workerdrain_ms * SQUEUE_MSEC_TO_NSEC;
	squeue_workerwait_tick = MSEC_TO_TICK_ROUNDUP(squeue_workerwait_ms);
}

/* ARGSUSED */
squeue_t *
squeue_create(char *name, processorid_t bind, clock_t wait, pri_t pri)
{
	squeue_t *sqp = kmem_cache_alloc(squeue_cache, KM_SLEEP);

	bzero(sqp, sizeof (squeue_t));
	(void) strncpy(sqp->sq_name, name, SQ_NAMELEN + 1);
	sqp->sq_name[SQ_NAMELEN] = '\0';

	sqp->sq_bind = bind;
	sqp->sq_wait = MSEC_TO_TICK(wait);
	sqp->sq_avg_drain_time =
	    drv_hztousec(NSEC_TO_TICK_ROUNDUP(squeue_intrdrain_ns)) /
	    NSEC_TO_TICK_ROUNDUP(squeue_intrdrain_ns);

#if SQUEUE_PROFILE
	if ((sqp->sq_kstat = kstat_create("ip", bind, name,
	    "net", KSTAT_TYPE_NAMED,
	    sizeof (squeue_kstat) / sizeof (kstat_named_t),
	    KSTAT_FLAG_VIRTUAL)) != NULL) {
		sqp->sq_kstat->ks_lock = &squeue_kstat_lock;
		sqp->sq_kstat->ks_data = &squeue_kstat;
		sqp->sq_kstat->ks_update = squeue_kstat_update;
		sqp->sq_kstat->ks_private = sqp;
		kstat_install(sqp->sq_kstat);
	}
#endif

	sqp->sq_worker = thread_create(NULL, 0,
squeue_worker, 476 sqp, 0, &p0, TS_RUN, pri); 477 478 return (sqp); 479 } 480 481 /* ARGSUSED */ 482 void 483 squeue_bind(squeue_t *sqp, processorid_t bind) 484 { 485 ASSERT(bind == -1); 486 487 mutex_enter(&sqp->sq_lock); 488 if (sqp->sq_state & SQS_BOUND) { 489 mutex_exit(&sqp->sq_lock); 490 return; 491 } 492 493 sqp->sq_state |= SQS_BOUND; 494 mutex_exit(&sqp->sq_lock); 495 496 thread_affinity_set(sqp->sq_worker, sqp->sq_bind); 497 } 498 499 void 500 squeue_unbind(squeue_t *sqp) 501 { 502 mutex_enter(&sqp->sq_lock); 503 if (!(sqp->sq_state & SQS_BOUND)) { 504 mutex_exit(&sqp->sq_lock); 505 return; 506 } 507 508 sqp->sq_state &= ~SQS_BOUND; 509 mutex_exit(&sqp->sq_lock); 510 511 thread_affinity_clear(sqp->sq_worker); 512 } 513 514 /* 515 * squeue_enter() - enter squeue sqp with mblk mp (which can be 516 * a chain), while tail points to the end and cnt in number of 517 * mblks in the chain. 518 * 519 * For a chain of single packet (i.e. mp == tail), go through the 520 * fast path if no one is processing the squeue and nothing is queued. 521 * 522 * The proc and arg for each mblk is already stored in the mblk in 523 * appropriate places. 524 */ 525 void 526 squeue_enter_chain(squeue_t *sqp, mblk_t *mp, mblk_t *tail, 527 uint32_t cnt, uint8_t tag) 528 { 529 int interrupt = servicing_interrupt(); 530 void *arg; 531 sqproc_t proc; 532 hrtime_t now; 533 #if SQUEUE_PROFILE 534 hrtime_t start, delta; 535 #endif 536 537 ASSERT(sqp != NULL); 538 ASSERT(mp != NULL); 539 ASSERT(tail != NULL); 540 ASSERT(cnt > 0); 541 ASSERT(MUTEX_NOT_HELD(&sqp->sq_lock)); 542 543 mutex_enter(&sqp->sq_lock); 544 if (!(sqp->sq_state & SQS_PROC)) { 545 /* 546 * See if anything is already queued. If we are the 547 * first packet, do inline processing else queue the 548 * packet and do the drain. 549 */ 550 sqp->sq_run = curthread; 551 if (sqp->sq_first == NULL && cnt == 1) { 552 /* 553 * Fast-path, ok to process and nothing queued. 554 */ 555 sqp->sq_state |= (SQS_PROC|SQS_FAST); 556 mutex_exit(&sqp->sq_lock); 557 558 /* 559 * We are the chain of 1 packet so 560 * go through this fast path. 
561 */ 562 arg = mp->b_prev; 563 mp->b_prev = NULL; 564 proc = (sqproc_t)mp->b_queue; 565 mp->b_queue = NULL; 566 567 ASSERT(proc != NULL); 568 ASSERT(arg != NULL); 569 ASSERT(mp->b_next == NULL); 570 571 #if SQUEUE_DEBUG 572 sqp->sq_isintr = interrupt; 573 sqp->sq_curmp = mp; 574 sqp->sq_curproc = proc; 575 sqp->sq_connp = arg; 576 mp->b_tag = sqp->sq_tag = tag; 577 #endif 578 #if SQUEUE_PROFILE 579 if (SQ_PROFILING(sqp)) { 580 if (interrupt) 581 SQSTAT(sqp, sq_npackets_intr); 582 else 583 SQSTAT(sqp, sq_npackets_other); 584 start = gethrtime(); 585 } 586 #endif 587 ((conn_t *)arg)->conn_on_sqp = B_TRUE; 588 DTRACE_PROBE3(squeue__proc__start, squeue_t *, 589 sqp, mblk_t *, mp, conn_t *, arg); 590 (*proc)(arg, mp, sqp); 591 DTRACE_PROBE2(squeue__proc__end, squeue_t *, 592 sqp, conn_t *, arg); 593 ((conn_t *)arg)->conn_on_sqp = B_FALSE; 594 595 #if SQUEUE_PROFILE 596 if (SQ_PROFILING(sqp)) { 597 delta = gethrtime() - start; 598 if (interrupt) 599 SQDELTA(sqp, sq_time_intr, delta); 600 else 601 SQDELTA(sqp, sq_time_other, delta); 602 } 603 #endif 604 #if SQUEUE_DEBUG 605 sqp->sq_curmp = NULL; 606 sqp->sq_curproc = NULL; 607 sqp->sq_connp = NULL; 608 sqp->sq_isintr = 0; 609 #endif 610 611 CONN_DEC_REF((conn_t *)arg); 612 ASSERT(MUTEX_NOT_HELD(&sqp->sq_lock)); 613 mutex_enter(&sqp->sq_lock); 614 sqp->sq_state &= ~(SQS_PROC|SQS_FAST); 615 if (sqp->sq_first == NULL) { 616 /* 617 * We processed inline our packet and 618 * nothing new has arrived. We are done. 619 */ 620 sqp->sq_run = NULL; 621 mutex_exit(&sqp->sq_lock); 622 return; 623 } else if (sqp->sq_bind != CPU->cpu_id) { 624 /* 625 * If the current thread is not running 626 * on the CPU to which this squeue is bound, 627 * then don't allow it to drain. 628 */ 629 sqp->sq_run = NULL; 630 SQUEUE_WORKER_WAKEUP(sqp); 631 return; 632 } 633 } else { 634 ENQUEUE_CHAIN(sqp, mp, tail, cnt); 635 #if SQUEUE_DEBUG 636 mp->b_tag = tag; 637 #endif 638 #if SQUEUE_PROFILE 639 if (SQ_PROFILING(sqp)) { 640 if (servicing_interrupt()) 641 SQSTAT(sqp, sq_nqueued_intr); 642 else 643 SQSTAT(sqp, sq_nqueued_other); 644 if (sqp->sq_stats.sq_max_qlen < sqp->sq_count) 645 sqp->sq_stats.sq_max_qlen = 646 sqp->sq_count; 647 } 648 #endif 649 } 650 651 /* 652 * We are here because either we couldn't do inline 653 * processing (because something was already queued), 654 * or we had a chanin of more than one packet, 655 * or something else arrived after we were done with 656 * inline processing. 657 */ 658 ASSERT(MUTEX_HELD(&sqp->sq_lock)); 659 ASSERT(sqp->sq_first != NULL); 660 661 #if SQUEUE_PROFILE 662 if (SQ_PROFILING(sqp)) { 663 start = gethrtime(); 664 } 665 #endif 666 #if SQUEUE_DEBUG 667 sqp->sq_isintr = interrupt; 668 #endif 669 670 now = gethrtime(); 671 if (interrupt) { 672 squeue_drain(sqp, SQS_ENTER, now + 673 squeue_intrdrain_ns); 674 } else { 675 squeue_drain(sqp, SQS_USER, now + 676 squeue_writerdrain_ns); 677 } 678 679 #if SQUEUE_PROFILE 680 if (SQ_PROFILING(sqp)) { 681 delta = gethrtime() - start; 682 if (interrupt) 683 SQDELTA(sqp, sq_time_intr, delta); 684 else 685 SQDELTA(sqp, sq_time_other, delta); 686 } 687 #endif 688 #if SQUEUE_DEBUG 689 sqp->sq_isintr = 0; 690 #endif 691 692 /* 693 * If we didn't do a complete drain, the worker 694 * thread was already signalled by squeue_drain. 695 */ 696 sqp->sq_run = NULL; 697 mutex_exit(&sqp->sq_lock); 698 return; 699 } else { 700 ASSERT(sqp->sq_run != NULL); 701 /* 702 * Queue is already being processed. Just enqueue 703 * the packet and go away. 
704 */ 705 #if SQUEUE_DEBUG 706 mp->b_tag = tag; 707 #endif 708 #if SQUEUE_PROFILE 709 if (SQ_PROFILING(sqp)) { 710 if (servicing_interrupt()) 711 SQSTAT(sqp, sq_nqueued_intr); 712 else 713 SQSTAT(sqp, sq_nqueued_other); 714 if (sqp->sq_stats.sq_max_qlen < sqp->sq_count) 715 sqp->sq_stats.sq_max_qlen = sqp->sq_count; 716 } 717 #endif 718 719 ENQUEUE_CHAIN(sqp, mp, tail, cnt); 720 mutex_exit(&sqp->sq_lock); 721 return; 722 } 723 } 724 725 /* 726 * squeue_enter() - enter squeue *sqp with mblk *mp with argument of *arg. 727 */ 728 void 729 squeue_enter(squeue_t *sqp, mblk_t *mp, sqproc_t proc, void *arg, 730 uint8_t tag) 731 { 732 int interrupt = servicing_interrupt(); 733 hrtime_t now; 734 #if SQUEUE_PROFILE 735 hrtime_t start, delta; 736 #endif 737 #if SQUEUE_DEBUG 738 conn_t *connp = (conn_t *)arg; 739 ASSERT(!IPCL_IS_TCP(connp) || connp->conn_tcp->tcp_connp == connp); 740 ASSERT(!IPCL_IS_UDP(connp) || connp->conn_udp->udp_connp == connp); 741 #endif 742 743 ASSERT(proc != NULL); 744 ASSERT(sqp != NULL); 745 ASSERT(mp != NULL); 746 ASSERT(mp->b_next == NULL); 747 ASSERT(MUTEX_NOT_HELD(&sqp->sq_lock)); 748 749 mutex_enter(&sqp->sq_lock); 750 if (!(sqp->sq_state & SQS_PROC)) { 751 /* 752 * See if anything is already queued. If we are the 753 * first packet, do inline processing else queue the 754 * packet and do the drain. 755 */ 756 sqp->sq_run = curthread; 757 if (sqp->sq_first == NULL) { 758 /* 759 * Fast-path, ok to process and nothing queued. 760 */ 761 sqp->sq_state |= (SQS_PROC|SQS_FAST); 762 mutex_exit(&sqp->sq_lock); 763 764 #if SQUEUE_DEBUG 765 sqp->sq_isintr = interrupt; 766 sqp->sq_curmp = mp; 767 sqp->sq_curproc = proc; 768 sqp->sq_connp = connp; 769 mp->b_tag = sqp->sq_tag = tag; 770 #endif 771 #if SQUEUE_PROFILE 772 if (SQ_PROFILING(sqp)) { 773 if (interrupt) 774 SQSTAT(sqp, sq_npackets_intr); 775 else 776 SQSTAT(sqp, sq_npackets_other); 777 start = gethrtime(); 778 } 779 #endif 780 ((conn_t *)arg)->conn_on_sqp = B_TRUE; 781 DTRACE_PROBE3(squeue__proc__start, squeue_t *, 782 sqp, mblk_t *, mp, conn_t *, arg); 783 (*proc)(arg, mp, sqp); 784 DTRACE_PROBE2(squeue__proc__end, squeue_t *, 785 sqp, conn_t *, arg); 786 ((conn_t *)arg)->conn_on_sqp = B_FALSE; 787 788 #if SQUEUE_PROFILE 789 if (SQ_PROFILING(sqp)) { 790 delta = gethrtime() - start; 791 if (interrupt) 792 SQDELTA(sqp, sq_time_intr, delta); 793 else 794 SQDELTA(sqp, sq_time_other, delta); 795 } 796 #endif 797 #if SQUEUE_DEBUG 798 sqp->sq_curmp = NULL; 799 sqp->sq_curproc = NULL; 800 sqp->sq_connp = NULL; 801 sqp->sq_isintr = 0; 802 #endif 803 804 CONN_DEC_REF((conn_t *)arg); 805 ASSERT(MUTEX_NOT_HELD(&sqp->sq_lock)); 806 mutex_enter(&sqp->sq_lock); 807 sqp->sq_state &= ~(SQS_PROC|SQS_FAST); 808 if (sqp->sq_first == NULL) { 809 /* 810 * We processed inline our packet and 811 * nothing new has arrived. We are done. 812 */ 813 sqp->sq_run = NULL; 814 mutex_exit(&sqp->sq_lock); 815 return; 816 } else if (sqp->sq_bind != CPU->cpu_id) { 817 /* 818 * If the current thread is not running 819 * on the CPU to which this squeue is bound, 820 * then don't allow it to drain. 
821 */ 822 sqp->sq_run = NULL; 823 SQUEUE_WORKER_WAKEUP(sqp); 824 return; 825 } 826 } else { 827 ENQUEUE_MP(sqp, mp, proc, arg); 828 #if SQUEUE_DEBUG 829 mp->b_tag = tag; 830 #endif 831 #if SQUEUE_PROFILE 832 if (SQ_PROFILING(sqp)) { 833 if (servicing_interrupt()) 834 SQSTAT(sqp, sq_nqueued_intr); 835 else 836 SQSTAT(sqp, sq_nqueued_other); 837 if (sqp->sq_stats.sq_max_qlen < sqp->sq_count) 838 sqp->sq_stats.sq_max_qlen = 839 sqp->sq_count; 840 } 841 #endif 842 } 843 844 /* 845 * We are here because either we couldn't do inline 846 * processing (because something was already queued) 847 * or something else arrived after we were done with 848 * inline processing. 849 */ 850 ASSERT(MUTEX_HELD(&sqp->sq_lock)); 851 ASSERT(sqp->sq_first != NULL); 852 853 #if SQUEUE_PROFILE 854 if (SQ_PROFILING(sqp)) { 855 start = gethrtime(); 856 } 857 #endif 858 #if SQUEUE_DEBUG 859 sqp->sq_isintr = interrupt; 860 #endif 861 862 now = gethrtime(); 863 if (interrupt) { 864 squeue_drain(sqp, SQS_ENTER, now + 865 squeue_intrdrain_ns); 866 } else { 867 squeue_drain(sqp, SQS_USER, now + 868 squeue_writerdrain_ns); 869 } 870 871 #if SQUEUE_PROFILE 872 if (SQ_PROFILING(sqp)) { 873 delta = gethrtime() - start; 874 if (interrupt) 875 SQDELTA(sqp, sq_time_intr, delta); 876 else 877 SQDELTA(sqp, sq_time_other, delta); 878 } 879 #endif 880 #if SQUEUE_DEBUG 881 sqp->sq_isintr = 0; 882 #endif 883 884 /* 885 * If we didn't do a complete drain, the worker 886 * thread was already signalled by squeue_drain. 887 */ 888 sqp->sq_run = NULL; 889 mutex_exit(&sqp->sq_lock); 890 return; 891 } else { 892 ASSERT(sqp->sq_run != NULL); 893 /* 894 * We let a thread processing a squeue reenter only 895 * once. This helps the case of incoming connection 896 * where a SYN-ACK-ACK that triggers the conn_ind 897 * doesn't have to queue the packet if listener and 898 * eager are on the same squeue. Also helps the 899 * loopback connection where the two ends are bound 900 * to the same squeue (which is typical on single 901 * CPU machines). 902 * We let the thread reenter only once for the fear 903 * of stack getting blown with multiple traversal. 904 */ 905 if (!(sqp->sq_state & SQS_REENTER) && 906 (sqp->sq_run == curthread) && 907 (((conn_t *)arg)->conn_on_sqp == B_FALSE)) { 908 sqp->sq_state |= SQS_REENTER; 909 mutex_exit(&sqp->sq_lock); 910 911 ((conn_t *)arg)->conn_on_sqp = B_TRUE; 912 DTRACE_PROBE3(squeue__proc__start, squeue_t *, 913 sqp, mblk_t *, mp, conn_t *, arg); 914 (*proc)(arg, mp, sqp); 915 DTRACE_PROBE2(squeue__proc__end, squeue_t *, 916 sqp, conn_t *, arg); 917 ((conn_t *)arg)->conn_on_sqp = B_FALSE; 918 CONN_DEC_REF((conn_t *)arg); 919 920 mutex_enter(&sqp->sq_lock); 921 sqp->sq_state &= ~SQS_REENTER; 922 mutex_exit(&sqp->sq_lock); 923 return; 924 } 925 /* 926 * Queue is already being processed. Just enqueue 927 * the packet and go away. 
928 */ 929 #if SQUEUE_DEBUG 930 mp->b_tag = tag; 931 #endif 932 #if SQUEUE_PROFILE 933 if (SQ_PROFILING(sqp)) { 934 if (servicing_interrupt()) 935 SQSTAT(sqp, sq_nqueued_intr); 936 else 937 SQSTAT(sqp, sq_nqueued_other); 938 if (sqp->sq_stats.sq_max_qlen < sqp->sq_count) 939 sqp->sq_stats.sq_max_qlen = sqp->sq_count; 940 } 941 #endif 942 943 ENQUEUE_MP(sqp, mp, proc, arg); 944 mutex_exit(&sqp->sq_lock); 945 return; 946 } 947 } 948 949 void 950 squeue_enter_nodrain(squeue_t *sqp, mblk_t *mp, sqproc_t proc, void *arg, 951 uint8_t tag) 952 { 953 int interrupt = servicing_interrupt(); 954 boolean_t being_processed; 955 #if SQUEUE_DEBUG 956 conn_t *connp = (conn_t *)arg; 957 #endif 958 #if SQUEUE_PROFILE 959 hrtime_t start, delta; 960 #endif 961 962 ASSERT(proc != NULL); 963 ASSERT(sqp != NULL); 964 ASSERT(mp != NULL); 965 ASSERT(mp->b_next == NULL); 966 ASSERT(!IPCL_IS_TCP(connp) || connp->conn_tcp->tcp_connp == connp); 967 ASSERT(!IPCL_IS_UDP(connp) || connp->conn_udp->udp_connp == connp); 968 ASSERT(MUTEX_NOT_HELD(&sqp->sq_lock)); 969 970 mutex_enter(&sqp->sq_lock); 971 972 being_processed = (sqp->sq_state & SQS_PROC); 973 if (!being_processed && (sqp->sq_first == NULL)) { 974 /* 975 * Fast-path, ok to process and nothing queued. 976 */ 977 sqp->sq_state |= (SQS_PROC|SQS_FAST); 978 sqp->sq_run = curthread; 979 mutex_exit(&sqp->sq_lock); 980 981 #if SQUEUE_DEBUG 982 sqp->sq_isintr = interrupt; 983 sqp->sq_curmp = mp; 984 sqp->sq_curproc = proc; 985 sqp->sq_connp = connp; 986 mp->b_tag = sqp->sq_tag = tag; 987 #endif 988 989 #if SQUEUE_PROFILE 990 if (SQ_PROFILING(sqp)) { 991 if (interrupt) 992 SQSTAT(sqp, sq_npackets_intr); 993 else 994 SQSTAT(sqp, sq_npackets_other); 995 start = gethrtime(); 996 } 997 #endif 998 999 ((conn_t *)arg)->conn_on_sqp = B_TRUE; 1000 DTRACE_PROBE3(squeue__proc__start, squeue_t *, 1001 sqp, mblk_t *, mp, conn_t *, arg); 1002 (*proc)(arg, mp, sqp); 1003 DTRACE_PROBE2(squeue__proc__end, squeue_t *, 1004 sqp, conn_t *, arg); 1005 ((conn_t *)arg)->conn_on_sqp = B_FALSE; 1006 1007 #if SQUEUE_DEBUG 1008 sqp->sq_curmp = NULL; 1009 sqp->sq_curproc = NULL; 1010 sqp->sq_connp = NULL; 1011 sqp->sq_isintr = 0; 1012 #endif 1013 #if SQUEUE_PROFILE 1014 if (SQ_PROFILING(sqp)) { 1015 delta = gethrtime() - start; 1016 if (interrupt) 1017 SQDELTA(sqp, sq_time_intr, delta); 1018 else 1019 SQDELTA(sqp, sq_time_other, delta); 1020 } 1021 #endif 1022 1023 CONN_DEC_REF((conn_t *)arg); 1024 mutex_enter(&sqp->sq_lock); 1025 sqp->sq_state &= ~(SQS_PROC|SQS_FAST); 1026 sqp->sq_run = NULL; 1027 if (sqp->sq_first == NULL) { 1028 /* 1029 * We processed inline our packet and 1030 * nothing new has arrived. We are done. 1031 */ 1032 mutex_exit(&sqp->sq_lock); 1033 } else { 1034 SQUEUE_WORKER_WAKEUP(sqp); 1035 } 1036 return; 1037 } else { 1038 /* 1039 * We let a thread processing a squeue reenter only 1040 * once. This helps the case of incoming connection 1041 * where a SYN-ACK-ACK that triggers the conn_ind 1042 * doesn't have to queue the packet if listener and 1043 * eager are on the same squeue. Also helps the 1044 * loopback connection where the two ends are bound 1045 * to the same squeue (which is typical on single 1046 * CPU machines). 1047 * We let the thread reenter only once for the fear 1048 * of stack getting blown with multiple traversal. 
1049 */ 1050 if (being_processed && !(sqp->sq_state & SQS_REENTER) && 1051 (sqp->sq_run == curthread) && 1052 (((conn_t *)arg)->conn_on_sqp == B_FALSE)) { 1053 sqp->sq_state |= SQS_REENTER; 1054 mutex_exit(&sqp->sq_lock); 1055 1056 ((conn_t *)arg)->conn_on_sqp = B_TRUE; 1057 DTRACE_PROBE3(squeue__proc__start, squeue_t *, 1058 sqp, mblk_t *, mp, conn_t *, arg); 1059 (*proc)(arg, mp, sqp); 1060 DTRACE_PROBE2(squeue__proc__end, squeue_t *, 1061 sqp, conn_t *, arg); 1062 ((conn_t *)arg)->conn_on_sqp = B_FALSE; 1063 CONN_DEC_REF((conn_t *)arg); 1064 1065 mutex_enter(&sqp->sq_lock); 1066 sqp->sq_state &= ~SQS_REENTER; 1067 mutex_exit(&sqp->sq_lock); 1068 return; 1069 } 1070 1071 #if SQUEUE_DEBUG 1072 mp->b_tag = tag; 1073 #endif 1074 #if SQUEUE_PROFILE 1075 if (SQ_PROFILING(sqp)) { 1076 if (servicing_interrupt()) 1077 SQSTAT(sqp, sq_nqueued_intr); 1078 else 1079 SQSTAT(sqp, sq_nqueued_other); 1080 if (sqp->sq_stats.sq_max_qlen < sqp->sq_count) 1081 sqp->sq_stats.sq_max_qlen = sqp->sq_count; 1082 } 1083 #endif 1084 ENQUEUE_MP(sqp, mp, proc, arg); 1085 if (being_processed) { 1086 /* 1087 * Queue is already being processed. 1088 * No need to do anything. 1089 */ 1090 mutex_exit(&sqp->sq_lock); 1091 return; 1092 } 1093 SQUEUE_WORKER_WAKEUP(sqp); 1094 } 1095 } 1096 1097 /* 1098 * squeue_fill() - fill squeue *sqp with mblk *mp with argument of *arg 1099 * without processing the squeue. 1100 */ 1101 /* ARGSUSED */ 1102 void 1103 squeue_fill(squeue_t *sqp, mblk_t *mp, sqproc_t proc, void * arg, 1104 uint8_t tag) 1105 { 1106 #if SQUEUE_DEBUG 1107 conn_t *connp = (conn_t *)arg; 1108 #endif 1109 ASSERT(proc != NULL); 1110 ASSERT(sqp != NULL); 1111 ASSERT(mp != NULL); 1112 ASSERT(mp->b_next == NULL); 1113 ASSERT(!IPCL_IS_TCP(connp) || connp->conn_tcp->tcp_connp == connp); 1114 ASSERT(!IPCL_IS_UDP(connp) || connp->conn_udp->udp_connp == connp); 1115 1116 ASSERT(MUTEX_NOT_HELD(&sqp->sq_lock)); 1117 mutex_enter(&sqp->sq_lock); 1118 ENQUEUE_MP(sqp, mp, proc, arg); 1119 #if SQUEUE_DEBUG 1120 mp->b_tag = tag; 1121 #endif 1122 #if SQUEUE_PROFILE 1123 if (SQ_PROFILING(sqp)) { 1124 if (servicing_interrupt()) 1125 SQSTAT(sqp, sq_nqueued_intr); 1126 else 1127 SQSTAT(sqp, sq_nqueued_other); 1128 if (sqp->sq_stats.sq_max_qlen < sqp->sq_count) 1129 sqp->sq_stats.sq_max_qlen = sqp->sq_count; 1130 } 1131 #endif 1132 1133 /* 1134 * If queue is already being processed. No need to do anything. 1135 */ 1136 if (sqp->sq_state & SQS_PROC) { 1137 mutex_exit(&sqp->sq_lock); 1138 return; 1139 } 1140 1141 SQUEUE_WORKER_WAKEUP(sqp); 1142 } 1143 1144 1145 /* 1146 * PRIVATE FUNCTIONS 1147 */ 1148 1149 static void 1150 squeue_fire(void *arg) 1151 { 1152 squeue_t *sqp = arg; 1153 uint_t state; 1154 1155 mutex_enter(&sqp->sq_lock); 1156 1157 state = sqp->sq_state; 1158 if (sqp->sq_tid == 0 && !(state & SQS_TMO_PROG)) { 1159 mutex_exit(&sqp->sq_lock); 1160 return; 1161 } 1162 1163 sqp->sq_tid = 0; 1164 /* 1165 * The timeout fired before we got a chance to set it. 1166 * Process it anyway but remove the SQS_TMO_PROG so that 1167 * the guy trying to set the timeout knows that it has 1168 * already been processed. 
1169 */ 1170 if (state & SQS_TMO_PROG) 1171 sqp->sq_state &= ~SQS_TMO_PROG; 1172 1173 if (!(state & SQS_PROC)) { 1174 sqp->sq_awaken = lbolt; 1175 cv_signal(&sqp->sq_async); 1176 } 1177 mutex_exit(&sqp->sq_lock); 1178 } 1179 1180 static void 1181 squeue_drain(squeue_t *sqp, uint_t proc_type, hrtime_t expire) 1182 { 1183 mblk_t *mp; 1184 mblk_t *head; 1185 sqproc_t proc; 1186 conn_t *connp; 1187 clock_t start = lbolt; 1188 clock_t drain_time; 1189 timeout_id_t tid; 1190 uint_t cnt; 1191 uint_t total_cnt = 0; 1192 ill_rx_ring_t *sq_rx_ring = sqp->sq_rx_ring; 1193 int interrupt = servicing_interrupt(); 1194 boolean_t poll_on = B_FALSE; 1195 hrtime_t now; 1196 1197 ASSERT(mutex_owned(&sqp->sq_lock)); 1198 ASSERT(!(sqp->sq_state & SQS_PROC)); 1199 1200 #if SQUEUE_PROFILE 1201 if (SQ_PROFILING(sqp)) { 1202 if (interrupt) 1203 SQSTAT(sqp, sq_ndrains_intr); 1204 else if (!(proc_type & SQS_WORKER)) 1205 SQSTAT(sqp, sq_ndrains_other); 1206 else 1207 SQSTAT(sqp, sq_ndrains_worker); 1208 } 1209 #endif 1210 1211 if ((tid = sqp->sq_tid) != 0) 1212 sqp->sq_tid = 0; 1213 1214 sqp->sq_state |= SQS_PROC | proc_type; 1215 head = sqp->sq_first; 1216 sqp->sq_first = NULL; 1217 sqp->sq_last = NULL; 1218 cnt = sqp->sq_count; 1219 1220 /* 1221 * We have backlog built up. Switch to polling mode if the 1222 * device underneath allows it. Need to do it only for 1223 * drain by non-interrupt thread so interrupts don't 1224 * come and disrupt us in between. If its a interrupt thread, 1225 * no need because most devices will not issue another 1226 * interrupt till this one returns. 1227 */ 1228 if ((sqp->sq_state & SQS_POLL_CAPAB) && !(proc_type & SQS_ENTER) && 1229 (sqp->sq_count > squeue_worker_poll_min)) { 1230 ASSERT(sq_rx_ring != NULL); 1231 SQS_POLLING_ON(sqp, sq_rx_ring); 1232 poll_on = B_TRUE; 1233 } 1234 1235 mutex_exit(&sqp->sq_lock); 1236 1237 if (tid != 0) 1238 (void) untimeout(tid); 1239 again: 1240 while ((mp = head) != NULL) { 1241 head = mp->b_next; 1242 mp->b_next = NULL; 1243 1244 proc = (sqproc_t)mp->b_queue; 1245 mp->b_queue = NULL; 1246 connp = (conn_t *)mp->b_prev; 1247 mp->b_prev = NULL; 1248 #if SQUEUE_DEBUG 1249 sqp->sq_curmp = mp; 1250 sqp->sq_curproc = proc; 1251 sqp->sq_connp = connp; 1252 sqp->sq_tag = mp->b_tag; 1253 #endif 1254 1255 #if SQUEUE_PROFILE 1256 if (SQ_PROFILING(sqp)) { 1257 if (interrupt) 1258 SQSTAT(sqp, sq_npackets_intr); 1259 else if (!(proc_type & SQS_WORKER)) 1260 SQSTAT(sqp, sq_npackets_other); 1261 else 1262 SQSTAT(sqp, sq_npackets_worker); 1263 } 1264 #endif 1265 1266 connp->conn_on_sqp = B_TRUE; 1267 DTRACE_PROBE3(squeue__proc__start, squeue_t *, 1268 sqp, mblk_t *, mp, conn_t *, connp); 1269 (*proc)(connp, mp, sqp); 1270 DTRACE_PROBE2(squeue__proc__end, squeue_t *, 1271 sqp, conn_t *, connp); 1272 connp->conn_on_sqp = B_FALSE; 1273 CONN_DEC_REF(connp); 1274 } 1275 1276 1277 #if SQUEUE_DEBUG 1278 sqp->sq_curmp = NULL; 1279 sqp->sq_curproc = NULL; 1280 sqp->sq_connp = NULL; 1281 #endif 1282 1283 mutex_enter(&sqp->sq_lock); 1284 sqp->sq_count -= cnt; 1285 total_cnt += cnt; 1286 1287 if (sqp->sq_first != NULL) { 1288 1289 now = gethrtime(); 1290 if (!expire || (now < expire)) { 1291 /* More arrived and time not expired */ 1292 head = sqp->sq_first; 1293 sqp->sq_first = NULL; 1294 sqp->sq_last = NULL; 1295 cnt = sqp->sq_count; 1296 mutex_exit(&sqp->sq_lock); 1297 goto again; 1298 } 1299 1300 /* 1301 * If we are not worker thread and we 1302 * reached our time limit to do drain, 1303 * signal the worker thread to pick 1304 * up the work. 
1305 * If we were the worker thread, then 1306 * we take a break to allow an interrupt 1307 * or writer to pick up the load. 1308 */ 1309 if (proc_type != SQS_WORKER) { 1310 sqp->sq_awaken = lbolt; 1311 cv_signal(&sqp->sq_async); 1312 } 1313 } 1314 1315 /* 1316 * Try to see if we can get a time estimate to process a packet. 1317 * Do it only in interrupt context since less chance of context 1318 * switch or pinning etc. to get a better estimate. 1319 */ 1320 if (interrupt && ((drain_time = (lbolt - start)) > 0)) 1321 sqp->sq_avg_drain_time = ((80 * sqp->sq_avg_drain_time) + 1322 (20 * (drv_hztousec(drain_time)/total_cnt)))/100; 1323 1324 sqp->sq_state &= ~(SQS_PROC | proc_type); 1325 1326 /* 1327 * If polling was turned on, turn it off and reduce the default 1328 * interrupt blank interval as well to bring new packets in faster 1329 * (reduces the latency when there is no backlog). 1330 */ 1331 if (poll_on && (sqp->sq_state & SQS_POLL_CAPAB)) { 1332 ASSERT(sq_rx_ring != NULL); 1333 SQS_POLLING_OFF(sqp, sq_rx_ring); 1334 } 1335 } 1336 1337 static void 1338 squeue_worker(squeue_t *sqp) 1339 { 1340 kmutex_t *lock = &sqp->sq_lock; 1341 kcondvar_t *async = &sqp->sq_async; 1342 callb_cpr_t cprinfo; 1343 hrtime_t now; 1344 #if SQUEUE_PROFILE 1345 hrtime_t start; 1346 #endif 1347 1348 CALLB_CPR_INIT(&cprinfo, lock, callb_generic_cpr, "nca"); 1349 mutex_enter(lock); 1350 1351 for (;;) { 1352 while (sqp->sq_first == NULL || (sqp->sq_state & SQS_PROC)) { 1353 CALLB_CPR_SAFE_BEGIN(&cprinfo); 1354 still_wait: 1355 cv_wait(async, lock); 1356 if (sqp->sq_state & SQS_PROC) { 1357 goto still_wait; 1358 } 1359 CALLB_CPR_SAFE_END(&cprinfo, lock); 1360 } 1361 1362 #if SQUEUE_PROFILE 1363 if (SQ_PROFILING(sqp)) { 1364 start = gethrtime(); 1365 } 1366 #endif 1367 1368 ASSERT(squeue_workerdrain_ns != 0); 1369 now = gethrtime(); 1370 sqp->sq_run = curthread; 1371 squeue_drain(sqp, SQS_WORKER, now + squeue_workerdrain_ns); 1372 sqp->sq_run = NULL; 1373 1374 if (sqp->sq_first != NULL) { 1375 /* 1376 * Doing too much processing by worker thread 1377 * in presense of interrupts can be sub optimal. 1378 * Instead, once a drain is done by worker thread 1379 * for squeue_writerdrain_ns (the reason we are 1380 * here), we force wait for squeue_workerwait_tick 1381 * before doing more processing even if sq_wait is 1382 * set to 0. 1383 * 1384 * This can be counterproductive for performance 1385 * if worker thread is the only means to process 1386 * the packets (interrupts or writers are not 1387 * allowed inside the squeue). 
1388 */ 1389 if (sqp->sq_tid == 0 && 1390 !(sqp->sq_state & SQS_TMO_PROG)) { 1391 timeout_id_t tid; 1392 1393 sqp->sq_state |= SQS_TMO_PROG; 1394 mutex_exit(&sqp->sq_lock); 1395 tid = timeout(squeue_fire, sqp, 1396 squeue_workerwait_tick); 1397 mutex_enter(&sqp->sq_lock); 1398 /* 1399 * Check again if we still need 1400 * the timeout 1401 */ 1402 if (((sqp->sq_state & (SQS_TMO_PROG|SQS_PROC)) 1403 == SQS_TMO_PROG) && (sqp->sq_tid == 0) && 1404 (sqp->sq_first != NULL)) { 1405 sqp->sq_state &= ~SQS_TMO_PROG; 1406 sqp->sq_awaken = lbolt; 1407 sqp->sq_tid = tid; 1408 } else if (sqp->sq_state & SQS_TMO_PROG) { 1409 /* timeout not needed */ 1410 sqp->sq_state &= ~SQS_TMO_PROG; 1411 mutex_exit(&(sqp)->sq_lock); 1412 (void) untimeout(tid); 1413 mutex_enter(&sqp->sq_lock); 1414 } 1415 } 1416 CALLB_CPR_SAFE_BEGIN(&cprinfo); 1417 cv_wait(async, lock); 1418 CALLB_CPR_SAFE_END(&cprinfo, lock); 1419 } 1420 1421 1422 #if SQUEUE_PROFILE 1423 if (SQ_PROFILING(sqp)) { 1424 SQDELTA(sqp, sq_time_worker, gethrtime() - start); 1425 } 1426 #endif 1427 } 1428 } 1429 1430 #if SQUEUE_PROFILE 1431 static int 1432 squeue_kstat_update(kstat_t *ksp, int rw) 1433 { 1434 struct squeue_kstat *sqsp = &squeue_kstat; 1435 squeue_t *sqp = ksp->ks_private; 1436 1437 if (rw == KSTAT_WRITE) 1438 return (EACCES); 1439 1440 #if SQUEUE_DEBUG 1441 sqsp->sq_count.value.ui64 = sqp->sq_count; 1442 sqsp->sq_max_qlen.value.ui64 = sqp->sq_stats.sq_max_qlen; 1443 #endif 1444 sqsp->sq_npackets_worker.value.ui64 = sqp->sq_stats.sq_npackets_worker; 1445 sqsp->sq_npackets_intr.value.ui64 = sqp->sq_stats.sq_npackets_intr; 1446 sqsp->sq_npackets_other.value.ui64 = sqp->sq_stats.sq_npackets_other; 1447 sqsp->sq_nqueued_intr.value.ui64 = sqp->sq_stats.sq_nqueued_intr; 1448 sqsp->sq_nqueued_other.value.ui64 = sqp->sq_stats.sq_nqueued_other; 1449 sqsp->sq_ndrains_worker.value.ui64 = sqp->sq_stats.sq_ndrains_worker; 1450 sqsp->sq_ndrains_intr.value.ui64 = sqp->sq_stats.sq_ndrains_intr; 1451 sqsp->sq_ndrains_other.value.ui64 = sqp->sq_stats.sq_ndrains_other; 1452 sqsp->sq_time_worker.value.ui64 = sqp->sq_stats.sq_time_worker; 1453 sqsp->sq_time_intr.value.ui64 = sqp->sq_stats.sq_time_intr; 1454 sqsp->sq_time_other.value.ui64 = sqp->sq_stats.sq_time_other; 1455 return (0); 1456 } 1457 #endif 1458 1459 void 1460 squeue_profile_enable(squeue_t *sqp) 1461 { 1462 mutex_enter(&sqp->sq_lock); 1463 sqp->sq_state |= SQS_PROFILE; 1464 mutex_exit(&sqp->sq_lock); 1465 } 1466 1467 void 1468 squeue_profile_disable(squeue_t *sqp) 1469 { 1470 mutex_enter(&sqp->sq_lock); 1471 sqp->sq_state &= ~SQS_PROFILE; 1472 mutex_exit(&sqp->sq_lock); 1473 } 1474 1475 void 1476 squeue_profile_reset(squeue_t *sqp) 1477 { 1478 #if SQUEUE_PROFILE 1479 bzero(&sqp->sq_stats, sizeof (sqstat_t)); 1480 #endif 1481 } 1482 1483 void 1484 squeue_profile_start(void) 1485 { 1486 #if SQUEUE_PROFILE 1487 squeue_profile = B_TRUE; 1488 #endif 1489 } 1490 1491 void 1492 squeue_profile_stop(void) 1493 { 1494 #if SQUEUE_PROFILE 1495 squeue_profile = B_FALSE; 1496 #endif 1497 } 1498 1499 uintptr_t * 1500 squeue_getprivate(squeue_t *sqp, sqprivate_t p) 1501 { 1502 ASSERT(p < SQPRIVATE_MAX); 1503 1504 return (&sqp->sq_private[p]); 1505 } 1506 1507 processorid_t 1508 squeue_binding(squeue_t *sqp) 1509 { 1510 return (sqp->sq_bind); 1511 } 1512
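
/*
 * Illustrative note only (not code used by this file): a consumer such as
 * TCP keeps a pointer to its own per-squeue state in the private slot
 * returned by squeue_getprivate(). The my_sq_private_t type below is
 * hypothetical and only demonstrates the calling convention; the squeue
 * itself never interprets the stored uintptr_t.
 *
 *	uintptr_t *slotp = squeue_getprivate(sqp, SQPRIVATE_TCP);
 *
 *	if (*slotp == 0)
 *		*slotp = (uintptr_t)kmem_zalloc(sizeof (my_sq_private_t),
 *		    KM_SLEEP);
 */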