/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License, Version 1.0 only
 * (the "License"). You may not use this file except in compliance
 * with the License.
 *
 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
 * or http://www.opensolaris.org/os/licensing.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
 * If applicable, add the following below this CDDL HEADER, with the
 * fields enclosed by brackets "[]" replaced with your own identifying
 * information: Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 */
/*
 * Copyright 2005 Sun Microsystems, Inc. All rights reserved.
 * Use is subject to license terms.
 */

#pragma ident	"%Z%%M%	%I%	%E% SMI"

/*
 * Squeues - TCP/IP serialization mechanism.
 *
 * This is a general-purpose, high-performance serialization mechanism. It is
 * similar to a taskq with a single worker thread; the difference is that it
 * does not imply a context switch - the thread placing a request may actually
 * process it. It is also biased toward processing requests in interrupt
 * context.
 *
 * Each squeue has a worker thread which may optionally be bound to a CPU.
 *
 * Only one thread may process requests from a given squeue at any time. This
 * is called "entering" the squeue.
 *
 * Each dispatched request is processed either by
 *
 *	a) the dispatching thread, or
 *	b) some other thread that is processing the squeue at the time of the
 *	   request, or
 *	c) the worker thread.
 *
 * INTERFACES:
 *
 * squeue_t *squeue_create(name, bind, wait, pri)
 *
 *	name: symbolic name for the squeue.
 *	bind: preferred CPU binding for the worker thread.
 *	wait: time to wait before waking the worker thread after queueing
 *	      a request.
 *	pri:  thread priority for the worker thread.
 *
 *	This function never fails and may sleep. It returns a transparent
 *	pointer to the squeue_t structure that is passed to all other squeue
 *	operations.
 *
 * void squeue_bind(sqp, bind)
 *
 *	Bind the squeue worker thread to the CPU specified by the 'bind'
 *	argument. A 'bind' value of -1 binds to the preferred CPU specified
 *	at squeue_create() time.
 *
 *	NOTE: Any value of 'bind' other than -1 is currently unsupported, but
 *	the API is present - in the future it may be useful to specify a
 *	different binding.
 *
 * void squeue_unbind(sqp)
 *
 *	Unbind the worker thread from its preferred CPU.
 *
 * void squeue_enter(*sqp, *mp, proc, arg, tag)
 *
 *	Post a single request for processing. Each request consists of mblk
 *	'mp', a function 'proc' to execute, and an argument 'arg' to pass to
 *	that function. The function is called as (*proc)(arg, mp, sqp). The
 *	tag is an arbitrary number from 0 to 255 which is stored in mp to
 *	track the exact caller of squeue_enter. The combination of function
 *	name and tag should provide enough information to identify the caller.
 *
 *	If no one is processing the squeue, squeue_enter() calls the function
 *	immediately. Otherwise it adds the request to the queue for later
 *	processing. Once the function has executed, the thread may continue
 *	executing all other requests pending on the queue.
 *
 *	NOTE: The tagging information is only used when SQUEUE_DEBUG is set
 *	to 1.
 *	NOTE: The argument may only be a conn_t. Ideally we would like a
 *	generic argument, but we want to drop the connection reference count
 *	here - this improves tail-call optimizations.
 *	XXX: The arg should have type conn_t.
 *
 * void squeue_enter_nodrain(*sqp, *mp, proc, arg, tag)
 *
 *	Same as squeue_enter(), but the entering thread will only try to
 *	execute a single request. It will not continue executing any pending
 *	requests.
 *
 * void squeue_fill(*sqp, *mp, proc, arg, tag)
 *
 *	Just place the request on the queue without trying to execute it.
 *	Arrange for the worker thread to process the request.
 *
 * void squeue_profile_enable(sqp)
 * void squeue_profile_disable(sqp)
 *
 *	Enable or disable profiling for the specified 'sqp'. Profiling is
 *	only available when SQUEUE_PROFILE is set.
 *
 * void squeue_profile_reset(sqp)
 *
 *	Reset all profiling information to zero. Profiling is only
 *	available when SQUEUE_PROFILE is set.
 *
 * void squeue_profile_start()
 * void squeue_profile_stop()
 *
 *	Globally enable or disable profiling for all squeues.
 *
 * uintptr_t *squeue_getprivate(sqp, p)
 *
 *	Each squeue keeps a small amount of private data space available for
 *	various consumers. Current consumers include TCP and NCA. Other
 *	consumers need to add their private tag to the sqprivate_t enum. The
 *	private information is limited to a uintptr_t value. The squeue has
 *	no knowledge of its content and does not manage it in any way.
 *
 *	A typical use is a breakdown of data structures per CPU (since
 *	squeues are usually per CPU). See NCA for examples of use.
 *	Currently the only legal value of 'p' is SQPRIVATE_TCP.
 *
 * processorid_t squeue_binding(sqp)
 *
 *	Returns the CPU binding for a given squeue.
 *
 * An illustrative usage sketch appears in a comment at the end of this file.
 *
 * TUNABLES:
 *
 * squeue_intrdrain_ms: Maximum time in ms interrupts spend draining any
 *	squeue. Note that this is an approximation - squeues have no control
 *	over the time it takes to process each request. This limit is only
 *	checked between processing individual messages.
 *	Default: 20 ms.
 *
 * squeue_writerdrain_ms: Maximum time in ms non-interrupts spend draining
 *	any squeue. Note that this is an approximation - squeues have no
 *	control over the time it takes to process each request. This limit is
 *	only checked between processing individual messages.
 *	Default: 10 ms.
 *
 * squeue_workerdrain_ms: Maximum time in ms the worker thread spends
 *	draining any squeue. Note that this is an approximation - squeues
 *	have no control over the time it takes to process each request. This
 *	limit is only checked between processing individual messages.
 *	Default: 10 ms.
 *
 * squeue_workerwait_ms: When the worker thread is interrupted because
 *	squeue_workerdrain_ms expired, how long to wait before waking the
 *	worker thread again.
 *	Default: 10 ms.
 *
 * DEFINES:
 *
 * SQUEUE_DEBUG: If defined as 1, special code is compiled in that records
 *	additional debugging information in the squeue.
 *
 * SQUEUE_PROFILE: If defined as 1, special code is compiled in that collects
 *	various squeue statistics and exports them as kstats.
 *
 * Ideally we would like both SQUEUE_DEBUG and SQUEUE_PROFILE to always be
 * set, but they affect performance, so they are enabled on DEBUG kernels and
 * disabled on non-DEBUG kernels by default.
 */

#include <sys/types.h>
#include <sys/cmn_err.h>
#include <sys/debug.h>
#include <sys/kmem.h>
#include <sys/cpuvar.h>
#include <sys/condvar_impl.h>
#include <sys/systm.h>
#include <sys/callb.h>
#include <sys/sdt.h>
#include <sys/ddi.h>

#include <inet/ipclassifier.h>

/*
 * State flags.
 * Note: The MDB IP module depends on the values of these flags.
 */
#define	SQS_PROC	0x0001	/* being processed */
#define	SQS_WORKER	0x0002	/* worker thread */
#define	SQS_ENTER	0x0004	/* enter thread */
#define	SQS_FAST	0x0008	/* enter-fast thread */
#define	SQS_USER	0x0010	/* a non-interrupt user */
#define	SQS_BOUND	0x0020	/* worker thread is bound */
#define	SQS_PROFILE	0x0040	/* enable profiling */
#define	SQS_REENTER	0x0080	/* re-entered thread */
#define	SQS_TMO_PROG	0x0100	/* timeout is being set */

#ifdef DEBUG
#define	SQUEUE_DEBUG 1
#define	SQUEUE_PROFILE 1
#else
#define	SQUEUE_DEBUG 0
#define	SQUEUE_PROFILE 0
#endif

#include <sys/squeue_impl.h>

static void squeue_fire(void *);
static void squeue_drain(squeue_t *, uint_t, hrtime_t);
static void squeue_worker(squeue_t *sqp);

#if SQUEUE_PROFILE
static kmutex_t squeue_kstat_lock;
static int  squeue_kstat_update(kstat_t *, int);
#endif

kmem_cache_t *squeue_cache;

#define	SQUEUE_MSEC_TO_NSEC 1000000

int squeue_intrdrain_ms = 20;
int squeue_writerdrain_ms = 10;
int squeue_workerdrain_ms = 10;
int squeue_workerwait_ms = 10;

/* The values above converted to ticks or nanoseconds */
static int squeue_intrdrain_ns = 0;
static int squeue_writerdrain_ns = 0;
static int squeue_workerdrain_ns = 0;
static int squeue_workerwait_tick = 0;

/*
 * The minimum number of queued packets at which a drain by the worker thread
 * triggers polling (if the squeue allows it). The choice of 3 is arbitrary.
 * You definitely don't want it to be 1, since that would trigger polling on
 * very low loads as well (ssh is one such example where packet flow was very
 * low yet somehow one packet ended up getting queued, the worker thread
 * fired every 10 ms, and blanking also got triggered).
 */
int squeue_worker_poll_min = 3;

#if SQUEUE_PROFILE
/*
 * Set to B_TRUE to enable profiling.
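 *
 * The global squeue_profile flag below is flipped by squeue_profile_start()
 * and squeue_profile_stop(); per-squeue collection additionally requires the
 * SQS_PROFILE state flag, set via squeue_profile_enable().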
250 */ 251 static int squeue_profile = B_FALSE; 252 #define SQ_PROFILING(sqp) (squeue_profile && ((sqp)->sq_state & SQS_PROFILE)) 253 254 #define SQSTAT(sqp, x) ((sqp)->sq_stats.x++) 255 #define SQDELTA(sqp, x, d) ((sqp)->sq_stats.x += (d)) 256 257 struct squeue_kstat { 258 kstat_named_t sq_count; 259 kstat_named_t sq_max_qlen; 260 kstat_named_t sq_npackets_worker; 261 kstat_named_t sq_npackets_intr; 262 kstat_named_t sq_npackets_other; 263 kstat_named_t sq_nqueued_intr; 264 kstat_named_t sq_nqueued_other; 265 kstat_named_t sq_ndrains_worker; 266 kstat_named_t sq_ndrains_intr; 267 kstat_named_t sq_ndrains_other; 268 kstat_named_t sq_time_worker; 269 kstat_named_t sq_time_intr; 270 kstat_named_t sq_time_other; 271 } squeue_kstat = { 272 { "count", KSTAT_DATA_UINT64 }, 273 { "max_qlen", KSTAT_DATA_UINT64 }, 274 { "packets_worker", KSTAT_DATA_UINT64 }, 275 { "packets_intr", KSTAT_DATA_UINT64 }, 276 { "packets_other", KSTAT_DATA_UINT64 }, 277 { "queued_intr", KSTAT_DATA_UINT64 }, 278 { "queued_other", KSTAT_DATA_UINT64 }, 279 { "ndrains_worker", KSTAT_DATA_UINT64 }, 280 { "ndrains_intr", KSTAT_DATA_UINT64 }, 281 { "ndrains_other", KSTAT_DATA_UINT64 }, 282 { "time_worker", KSTAT_DATA_UINT64 }, 283 { "time_intr", KSTAT_DATA_UINT64 }, 284 { "time_other", KSTAT_DATA_UINT64 }, 285 }; 286 #endif 287 288 #define SQUEUE_WORKER_WAKEUP(sqp) { \ 289 timeout_id_t tid = (sqp)->sq_tid; \ 290 \ 291 ASSERT(MUTEX_HELD(&(sqp)->sq_lock)); \ 292 /* \ 293 * Queue isn't being processed, so take \ 294 * any post enqueue actions needed before leaving. \ 295 */ \ 296 if (tid != 0) { \ 297 /* \ 298 * Waiting for an enter() to process mblk(s). \ 299 */ \ 300 clock_t waited = lbolt - (sqp)->sq_awaken; \ 301 \ 302 if (TICK_TO_MSEC(waited) >= (sqp)->sq_wait) { \ 303 /* \ 304 * Times up and have a worker thread \ 305 * waiting for work, so schedule it. \ 306 */ \ 307 (sqp)->sq_tid = 0; \ 308 (sqp)->sq_awaken = lbolt; \ 309 cv_signal(&(sqp)->sq_async); \ 310 mutex_exit(&(sqp)->sq_lock); \ 311 (void) untimeout(tid); \ 312 return; \ 313 } \ 314 mutex_exit(&(sqp)->sq_lock); \ 315 return; \ 316 } else if ((sqp)->sq_state & SQS_TMO_PROG) { \ 317 mutex_exit(&(sqp)->sq_lock); \ 318 return; \ 319 } else if ((sqp)->sq_wait != 0) { \ 320 clock_t wait = (sqp)->sq_wait; \ 321 /* \ 322 * Wait up to sqp->sq_wait ms for an \ 323 * enter() to process this queue. We \ 324 * don't want to contend on timeout locks \ 325 * with sq_lock held for performance reasons, \ 326 * so drop the sq_lock before calling timeout \ 327 * but we need to check if timeout is required \ 328 * after re acquiring the sq_lock. Once \ 329 * the sq_lock is dropped, someone else could \ 330 * have processed the packet or the timeout could \ 331 * have already fired. \ 332 */ \ 333 (sqp)->sq_state |= SQS_TMO_PROG; \ 334 mutex_exit(&(sqp)->sq_lock); \ 335 tid = timeout(squeue_fire, (sqp), wait); \ 336 mutex_enter(&(sqp)->sq_lock); \ 337 /* Check again if we still need the timeout */ \ 338 if ((((sqp)->sq_state & (SQS_PROC|SQS_TMO_PROG)) == \ 339 SQS_TMO_PROG) && ((sqp)->sq_tid == 0) && \ 340 ((sqp)->sq_first != NULL)) { \ 341 (sqp)->sq_state &= ~SQS_TMO_PROG; \ 342 (sqp)->sq_awaken = lbolt; \ 343 (sqp)->sq_tid = tid; \ 344 mutex_exit(&(sqp)->sq_lock); \ 345 return; \ 346 } else { \ 347 if ((sqp)->sq_state & SQS_TMO_PROG) { \ 348 (sqp)->sq_state &= ~SQS_TMO_PROG; \ 349 mutex_exit(&(sqp)->sq_lock); \ 350 (void) untimeout(tid); \ 351 } else { \ 352 /* \ 353 * The timer fired before we could \ 354 * reacquire the sq_lock. 
squeue_fire \ 355 * removes the SQS_TMO_PROG flag \ 356 * and we don't need to do anything \ 357 * else. \ 358 */ \ 359 mutex_exit(&(sqp)->sq_lock); \ 360 } \ 361 } \ 362 } else { \ 363 /* \ 364 * Schedule the worker thread. \ 365 */ \ 366 (sqp)->sq_awaken = lbolt; \ 367 cv_signal(&(sqp)->sq_async); \ 368 mutex_exit(&(sqp)->sq_lock); \ 369 } \ 370 ASSERT(MUTEX_NOT_HELD(&(sqp)->sq_lock)); \ 371 } 372 373 #define ENQUEUE_MP(sqp, mp, proc, arg) { \ 374 /* \ 375 * Enque our mblk. \ 376 */ \ 377 (mp)->b_queue = NULL; \ 378 ASSERT(MUTEX_HELD(&(sqp)->sq_lock)); \ 379 ASSERT((mp)->b_prev == NULL && (mp)->b_next == NULL); \ 380 (mp)->b_queue = (queue_t *)(proc); \ 381 (mp)->b_prev = (mblk_t *)(arg); \ 382 \ 383 if ((sqp)->sq_last != NULL) \ 384 (sqp)->sq_last->b_next = (mp); \ 385 else \ 386 (sqp)->sq_first = (mp); \ 387 (sqp)->sq_last = (mp); \ 388 (sqp)->sq_count++; \ 389 ASSERT((sqp)->sq_count > 0); \ 390 DTRACE_PROBE2(squeue__enqueue, squeue_t *, sqp, \ 391 mblk_t *, mp); \ 392 } 393 394 395 #define ENQUEUE_CHAIN(sqp, mp, tail, cnt) { \ 396 /* \ 397 * Enqueue our mblk chain. \ 398 */ \ 399 ASSERT(MUTEX_HELD(&(sqp)->sq_lock)); \ 400 \ 401 if ((sqp)->sq_last != NULL) \ 402 (sqp)->sq_last->b_next = (mp); \ 403 else \ 404 (sqp)->sq_first = (mp); \ 405 (sqp)->sq_last = (tail); \ 406 (sqp)->sq_count += (cnt); \ 407 ASSERT((sqp)->sq_count > 0); \ 408 DTRACE_PROBE4(squeue__enqueuechain, squeue_t *, sqp, \ 409 mblk_t *, mp, mblk_t *, tail, int, cnt); \ 410 \ 411 } 412 413 #define SQS_POLLING_ON(sqp, rx_ring) { \ 414 ASSERT(rx_ring != NULL); \ 415 ASSERT(MUTEX_HELD(&(sqp)->sq_lock)); \ 416 rx_ring->rr_blank(rx_ring->rr_handle, \ 417 MIN((sqp->sq_avg_drain_time * sqp->sq_count), \ 418 rx_ring->rr_max_blank_time), \ 419 rx_ring->rr_max_pkt_cnt); \ 420 rx_ring->rr_poll_state |= ILL_POLLING; \ 421 rx_ring->rr_poll_time = lbolt; \ 422 } 423 424 425 #define SQS_POLLING_OFF(sqp, rx_ring) { \ 426 ASSERT(rx_ring != NULL); \ 427 ASSERT(MUTEX_HELD(&(sqp)->sq_lock)); \ 428 rx_ring->rr_blank(rx_ring->rr_handle, \ 429 rx_ring->rr_min_blank_time, \ 430 rx_ring->rr_min_pkt_cnt); \ 431 } 432 433 void 434 squeue_init(void) 435 { 436 squeue_cache = kmem_cache_create("squeue_cache", 437 sizeof (squeue_t), 64, NULL, NULL, NULL, NULL, NULL, 0); 438 439 squeue_intrdrain_ns = squeue_intrdrain_ms * SQUEUE_MSEC_TO_NSEC; 440 squeue_writerdrain_ns = squeue_writerdrain_ms * SQUEUE_MSEC_TO_NSEC; 441 squeue_workerdrain_ns = squeue_workerdrain_ms * SQUEUE_MSEC_TO_NSEC; 442 squeue_workerwait_tick = MSEC_TO_TICK_ROUNDUP(squeue_workerwait_ms); 443 } 444 445 /* ARGSUSED */ 446 squeue_t * 447 squeue_create(char *name, processorid_t bind, clock_t wait, pri_t pri) 448 { 449 squeue_t *sqp = kmem_cache_alloc(squeue_cache, KM_SLEEP); 450 451 bzero(sqp, sizeof (squeue_t)); 452 (void) strncpy(sqp->sq_name, name, SQ_NAMELEN + 1); 453 sqp->sq_name[SQ_NAMELEN] = '\0'; 454 455 sqp->sq_bind = bind; 456 sqp->sq_wait = MSEC_TO_TICK(wait); 457 sqp->sq_avg_drain_time = 458 drv_hztousec(NSEC_TO_TICK_ROUNDUP(squeue_intrdrain_ns)) / 459 NSEC_TO_TICK_ROUNDUP(squeue_intrdrain_ns); 460 461 #if SQUEUE_PROFILE 462 if ((sqp->sq_kstat = kstat_create("ip", bind, name, 463 "net", KSTAT_TYPE_NAMED, 464 sizeof (squeue_kstat) / sizeof (kstat_named_t), 465 KSTAT_FLAG_VIRTUAL)) != NULL) { 466 sqp->sq_kstat->ks_lock = &squeue_kstat_lock; 467 sqp->sq_kstat->ks_data = &squeue_kstat; 468 sqp->sq_kstat->ks_update = squeue_kstat_update; 469 sqp->sq_kstat->ks_private = sqp; 470 kstat_install(sqp->sq_kstat); 471 } 472 #endif 473 474 sqp->sq_worker = thread_create(NULL, 0, 
	    squeue_worker,
	    sqp, 0, &p0, TS_RUN, pri);

	return (sqp);
}

/* ARGSUSED */
void
squeue_bind(squeue_t *sqp, processorid_t bind)
{
	ASSERT(bind == -1);

	mutex_enter(&sqp->sq_lock);
	if (sqp->sq_state & SQS_BOUND) {
		mutex_exit(&sqp->sq_lock);
		return;
	}

	sqp->sq_state |= SQS_BOUND;
	mutex_exit(&sqp->sq_lock);

	thread_affinity_set(sqp->sq_worker, sqp->sq_bind);
}

void
squeue_unbind(squeue_t *sqp)
{
	mutex_enter(&sqp->sq_lock);
	if (!(sqp->sq_state & SQS_BOUND)) {
		mutex_exit(&sqp->sq_lock);
		return;
	}

	sqp->sq_state &= ~SQS_BOUND;
	mutex_exit(&sqp->sq_lock);

	thread_affinity_clear(sqp->sq_worker);
}

/*
 * squeue_enter_chain() - enter squeue sqp with mblk mp (which can be
 * a chain), while tail points to the end and cnt is the number of
 * mblks in the chain.
 *
 * For a chain of a single packet (i.e. mp == tail), go through the
 * fast path if no one is processing the squeue and nothing is queued.
 *
 * The proc and arg for each mblk are already stored in the mblk in
 * the appropriate places.
 */
void
squeue_enter_chain(squeue_t *sqp, mblk_t *mp, mblk_t *tail,
    uint32_t cnt, uint8_t tag)
{
	int		interrupt = servicing_interrupt();
	void		*arg;
	sqproc_t	proc;
	hrtime_t	now;
#if SQUEUE_PROFILE
	hrtime_t	start, delta;
#endif

	ASSERT(sqp != NULL);
	ASSERT(mp != NULL);
	ASSERT(tail != NULL);
	ASSERT(cnt > 0);
	ASSERT(MUTEX_NOT_HELD(&sqp->sq_lock));

	mutex_enter(&sqp->sq_lock);
	if (!(sqp->sq_state & SQS_PROC)) {
		/*
		 * See if anything is already queued. If we are the
		 * first packet, do inline processing; otherwise queue
		 * the packet and do the drain.
		 */
		sqp->sq_run = curthread;
		if (sqp->sq_first == NULL && cnt == 1) {
			/*
			 * Fast path - ok to process and nothing queued.
			 */
			sqp->sq_state |= (SQS_PROC|SQS_FAST);
			mutex_exit(&sqp->sq_lock);

			/*
			 * We are a chain of one packet, so
			 * go through this fast path.
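			 * The handler and its argument were stashed in the
			 * mblk when the request was built (b_queue holds the
			 * proc pointer, b_prev the conn_t), so recover them
			 * and clear those fields before calling the handler.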
560 */ 561 arg = mp->b_prev; 562 mp->b_prev = NULL; 563 proc = (sqproc_t)mp->b_queue; 564 mp->b_queue = NULL; 565 566 ASSERT(proc != NULL); 567 ASSERT(arg != NULL); 568 ASSERT(mp->b_next == NULL); 569 570 #if SQUEUE_DEBUG 571 sqp->sq_isintr = interrupt; 572 sqp->sq_curmp = mp; 573 sqp->sq_curproc = proc; 574 sqp->sq_connp = arg; 575 mp->b_tag = sqp->sq_tag = tag; 576 #endif 577 #if SQUEUE_PROFILE 578 if (SQ_PROFILING(sqp)) { 579 if (interrupt) 580 SQSTAT(sqp, sq_npackets_intr); 581 else 582 SQSTAT(sqp, sq_npackets_other); 583 start = gethrtime(); 584 } 585 #endif 586 ((conn_t *)arg)->conn_on_sqp = B_TRUE; 587 DTRACE_PROBE3(squeue__proc__start, squeue_t *, 588 sqp, mblk_t *, mp, conn_t *, arg); 589 (*proc)(arg, mp, sqp); 590 DTRACE_PROBE2(squeue__proc__end, squeue_t *, 591 sqp, conn_t *, arg); 592 ((conn_t *)arg)->conn_on_sqp = B_FALSE; 593 594 #if SQUEUE_PROFILE 595 if (SQ_PROFILING(sqp)) { 596 delta = gethrtime() - start; 597 if (interrupt) 598 SQDELTA(sqp, sq_time_intr, delta); 599 else 600 SQDELTA(sqp, sq_time_other, delta); 601 } 602 #endif 603 #if SQUEUE_DEBUG 604 sqp->sq_curmp = NULL; 605 sqp->sq_curproc = NULL; 606 sqp->sq_connp = NULL; 607 sqp->sq_isintr = 0; 608 #endif 609 610 CONN_DEC_REF((conn_t *)arg); 611 ASSERT(MUTEX_NOT_HELD(&sqp->sq_lock)); 612 mutex_enter(&sqp->sq_lock); 613 sqp->sq_state &= ~(SQS_PROC|SQS_FAST); 614 if (sqp->sq_first == NULL) { 615 /* 616 * We processed inline our packet and 617 * nothing new has arrived. We are done. 618 */ 619 sqp->sq_run = NULL; 620 mutex_exit(&sqp->sq_lock); 621 return; 622 } else if (sqp->sq_bind != CPU->cpu_id) { 623 /* 624 * If the current thread is not running 625 * on the CPU to which this squeue is bound, 626 * then don't allow it to drain. 627 */ 628 sqp->sq_run = NULL; 629 SQUEUE_WORKER_WAKEUP(sqp); 630 return; 631 } 632 } else { 633 ENQUEUE_CHAIN(sqp, mp, tail, cnt); 634 #if SQUEUE_DEBUG 635 mp->b_tag = tag; 636 #endif 637 #if SQUEUE_PROFILE 638 if (SQ_PROFILING(sqp)) { 639 if (servicing_interrupt()) 640 SQSTAT(sqp, sq_nqueued_intr); 641 else 642 SQSTAT(sqp, sq_nqueued_other); 643 if (sqp->sq_stats.sq_max_qlen < sqp->sq_count) 644 sqp->sq_stats.sq_max_qlen = 645 sqp->sq_count; 646 } 647 #endif 648 } 649 650 /* 651 * We are here because either we couldn't do inline 652 * processing (because something was already queued), 653 * or we had a chanin of more than one packet, 654 * or something else arrived after we were done with 655 * inline processing. 656 */ 657 ASSERT(MUTEX_HELD(&sqp->sq_lock)); 658 ASSERT(sqp->sq_first != NULL); 659 660 #if SQUEUE_PROFILE 661 if (SQ_PROFILING(sqp)) { 662 start = gethrtime(); 663 } 664 #endif 665 #if SQUEUE_DEBUG 666 sqp->sq_isintr = interrupt; 667 #endif 668 669 now = gethrtime(); 670 if (interrupt) { 671 squeue_drain(sqp, SQS_ENTER, now + 672 squeue_intrdrain_ns); 673 } else { 674 squeue_drain(sqp, SQS_USER, now + 675 squeue_writerdrain_ns); 676 } 677 678 #if SQUEUE_PROFILE 679 if (SQ_PROFILING(sqp)) { 680 delta = gethrtime() - start; 681 if (interrupt) 682 SQDELTA(sqp, sq_time_intr, delta); 683 else 684 SQDELTA(sqp, sq_time_other, delta); 685 } 686 #endif 687 #if SQUEUE_DEBUG 688 sqp->sq_isintr = 0; 689 #endif 690 691 /* 692 * If we didn't do a complete drain, the worker 693 * thread was already signalled by squeue_drain. 694 */ 695 sqp->sq_run = NULL; 696 mutex_exit(&sqp->sq_lock); 697 return; 698 } else { 699 ASSERT(sqp->sq_run != NULL); 700 /* 701 * Queue is already being processed. Just enqueue 702 * the packet and go away. 
703 */ 704 #if SQUEUE_DEBUG 705 mp->b_tag = tag; 706 #endif 707 #if SQUEUE_PROFILE 708 if (SQ_PROFILING(sqp)) { 709 if (servicing_interrupt()) 710 SQSTAT(sqp, sq_nqueued_intr); 711 else 712 SQSTAT(sqp, sq_nqueued_other); 713 if (sqp->sq_stats.sq_max_qlen < sqp->sq_count) 714 sqp->sq_stats.sq_max_qlen = sqp->sq_count; 715 } 716 #endif 717 718 ENQUEUE_CHAIN(sqp, mp, tail, cnt); 719 mutex_exit(&sqp->sq_lock); 720 return; 721 } 722 } 723 724 /* 725 * squeue_enter() - enter squeue *sqp with mblk *mp with argument of *arg. 726 */ 727 void 728 squeue_enter(squeue_t *sqp, mblk_t *mp, sqproc_t proc, void *arg, 729 uint8_t tag) 730 { 731 int interrupt = servicing_interrupt(); 732 hrtime_t now; 733 #if SQUEUE_PROFILE 734 hrtime_t start, delta; 735 #endif 736 #if SQUEUE_DEBUG 737 conn_t *connp = (conn_t *)arg; 738 ASSERT(!IPCL_IS_TCP(connp) || connp->conn_tcp->tcp_connp == connp); 739 ASSERT(!IPCL_IS_UDP(connp) || connp->conn_udp->udp_connp == connp); 740 #endif 741 742 ASSERT(proc != NULL); 743 ASSERT(sqp != NULL); 744 ASSERT(mp != NULL); 745 ASSERT(mp->b_next == NULL); 746 ASSERT(MUTEX_NOT_HELD(&sqp->sq_lock)); 747 748 mutex_enter(&sqp->sq_lock); 749 if (!(sqp->sq_state & SQS_PROC)) { 750 /* 751 * See if anything is already queued. If we are the 752 * first packet, do inline processing else queue the 753 * packet and do the drain. 754 */ 755 sqp->sq_run = curthread; 756 if (sqp->sq_first == NULL) { 757 /* 758 * Fast-path, ok to process and nothing queued. 759 */ 760 sqp->sq_state |= (SQS_PROC|SQS_FAST); 761 mutex_exit(&sqp->sq_lock); 762 763 #if SQUEUE_DEBUG 764 sqp->sq_isintr = interrupt; 765 sqp->sq_curmp = mp; 766 sqp->sq_curproc = proc; 767 sqp->sq_connp = connp; 768 mp->b_tag = sqp->sq_tag = tag; 769 #endif 770 #if SQUEUE_PROFILE 771 if (SQ_PROFILING(sqp)) { 772 if (interrupt) 773 SQSTAT(sqp, sq_npackets_intr); 774 else 775 SQSTAT(sqp, sq_npackets_other); 776 start = gethrtime(); 777 } 778 #endif 779 ((conn_t *)arg)->conn_on_sqp = B_TRUE; 780 DTRACE_PROBE3(squeue__proc__start, squeue_t *, 781 sqp, mblk_t *, mp, conn_t *, arg); 782 (*proc)(arg, mp, sqp); 783 DTRACE_PROBE2(squeue__proc__end, squeue_t *, 784 sqp, conn_t *, arg); 785 ((conn_t *)arg)->conn_on_sqp = B_FALSE; 786 787 #if SQUEUE_PROFILE 788 if (SQ_PROFILING(sqp)) { 789 delta = gethrtime() - start; 790 if (interrupt) 791 SQDELTA(sqp, sq_time_intr, delta); 792 else 793 SQDELTA(sqp, sq_time_other, delta); 794 } 795 #endif 796 #if SQUEUE_DEBUG 797 sqp->sq_curmp = NULL; 798 sqp->sq_curproc = NULL; 799 sqp->sq_connp = NULL; 800 sqp->sq_isintr = 0; 801 #endif 802 803 CONN_DEC_REF((conn_t *)arg); 804 ASSERT(MUTEX_NOT_HELD(&sqp->sq_lock)); 805 mutex_enter(&sqp->sq_lock); 806 sqp->sq_state &= ~(SQS_PROC|SQS_FAST); 807 if (sqp->sq_first == NULL) { 808 /* 809 * We processed inline our packet and 810 * nothing new has arrived. We are done. 811 */ 812 sqp->sq_run = NULL; 813 mutex_exit(&sqp->sq_lock); 814 return; 815 } else if (sqp->sq_bind != CPU->cpu_id) { 816 /* 817 * If the current thread is not running 818 * on the CPU to which this squeue is bound, 819 * then don't allow it to drain. 
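			 * SQUEUE_WORKER_WAKEUP() below either wakes the
			 * worker thread or arranges for a timed wakeup, and
			 * it drops sq_lock before returning.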
820 */ 821 sqp->sq_run = NULL; 822 SQUEUE_WORKER_WAKEUP(sqp); 823 return; 824 } 825 } else { 826 ENQUEUE_MP(sqp, mp, proc, arg); 827 #if SQUEUE_DEBUG 828 mp->b_tag = tag; 829 #endif 830 #if SQUEUE_PROFILE 831 if (SQ_PROFILING(sqp)) { 832 if (servicing_interrupt()) 833 SQSTAT(sqp, sq_nqueued_intr); 834 else 835 SQSTAT(sqp, sq_nqueued_other); 836 if (sqp->sq_stats.sq_max_qlen < sqp->sq_count) 837 sqp->sq_stats.sq_max_qlen = 838 sqp->sq_count; 839 } 840 #endif 841 } 842 843 /* 844 * We are here because either we couldn't do inline 845 * processing (because something was already queued) 846 * or something else arrived after we were done with 847 * inline processing. 848 */ 849 ASSERT(MUTEX_HELD(&sqp->sq_lock)); 850 ASSERT(sqp->sq_first != NULL); 851 852 #if SQUEUE_PROFILE 853 if (SQ_PROFILING(sqp)) { 854 start = gethrtime(); 855 } 856 #endif 857 #if SQUEUE_DEBUG 858 sqp->sq_isintr = interrupt; 859 #endif 860 861 now = gethrtime(); 862 if (interrupt) { 863 squeue_drain(sqp, SQS_ENTER, now + 864 squeue_intrdrain_ns); 865 } else { 866 squeue_drain(sqp, SQS_USER, now + 867 squeue_writerdrain_ns); 868 } 869 870 #if SQUEUE_PROFILE 871 if (SQ_PROFILING(sqp)) { 872 delta = gethrtime() - start; 873 if (interrupt) 874 SQDELTA(sqp, sq_time_intr, delta); 875 else 876 SQDELTA(sqp, sq_time_other, delta); 877 } 878 #endif 879 #if SQUEUE_DEBUG 880 sqp->sq_isintr = 0; 881 #endif 882 883 /* 884 * If we didn't do a complete drain, the worker 885 * thread was already signalled by squeue_drain. 886 */ 887 sqp->sq_run = NULL; 888 mutex_exit(&sqp->sq_lock); 889 return; 890 } else { 891 ASSERT(sqp->sq_run != NULL); 892 /* 893 * We let a thread processing a squeue reenter only 894 * once. This helps the case of incoming connection 895 * where a SYN-ACK-ACK that triggers the conn_ind 896 * doesn't have to queue the packet if listener and 897 * eager are on the same squeue. Also helps the 898 * loopback connection where the two ends are bound 899 * to the same squeue (which is typical on single 900 * CPU machines). 901 * We let the thread reenter only once for the fear 902 * of stack getting blown with multiple traversal. 903 */ 904 if (!(sqp->sq_state & SQS_REENTER) && 905 (sqp->sq_run == curthread) && 906 (((conn_t *)arg)->conn_on_sqp == B_FALSE)) { 907 sqp->sq_state |= SQS_REENTER; 908 mutex_exit(&sqp->sq_lock); 909 910 ((conn_t *)arg)->conn_on_sqp = B_TRUE; 911 DTRACE_PROBE3(squeue__proc__start, squeue_t *, 912 sqp, mblk_t *, mp, conn_t *, arg); 913 (*proc)(arg, mp, sqp); 914 DTRACE_PROBE2(squeue__proc__end, squeue_t *, 915 sqp, conn_t *, arg); 916 ((conn_t *)arg)->conn_on_sqp = B_FALSE; 917 CONN_DEC_REF((conn_t *)arg); 918 919 mutex_enter(&sqp->sq_lock); 920 sqp->sq_state &= ~SQS_REENTER; 921 mutex_exit(&sqp->sq_lock); 922 return; 923 } 924 /* 925 * Queue is already being processed. Just enqueue 926 * the packet and go away. 
927 */ 928 #if SQUEUE_DEBUG 929 mp->b_tag = tag; 930 #endif 931 #if SQUEUE_PROFILE 932 if (SQ_PROFILING(sqp)) { 933 if (servicing_interrupt()) 934 SQSTAT(sqp, sq_nqueued_intr); 935 else 936 SQSTAT(sqp, sq_nqueued_other); 937 if (sqp->sq_stats.sq_max_qlen < sqp->sq_count) 938 sqp->sq_stats.sq_max_qlen = sqp->sq_count; 939 } 940 #endif 941 942 ENQUEUE_MP(sqp, mp, proc, arg); 943 mutex_exit(&sqp->sq_lock); 944 return; 945 } 946 } 947 948 void 949 squeue_enter_nodrain(squeue_t *sqp, mblk_t *mp, sqproc_t proc, void *arg, 950 uint8_t tag) 951 { 952 int interrupt = servicing_interrupt(); 953 boolean_t being_processed; 954 #if SQUEUE_DEBUG 955 conn_t *connp = (conn_t *)arg; 956 #endif 957 #if SQUEUE_PROFILE 958 hrtime_t start, delta; 959 #endif 960 961 ASSERT(proc != NULL); 962 ASSERT(sqp != NULL); 963 ASSERT(mp != NULL); 964 ASSERT(mp->b_next == NULL); 965 ASSERT(!IPCL_IS_TCP(connp) || connp->conn_tcp->tcp_connp == connp); 966 ASSERT(!IPCL_IS_UDP(connp) || connp->conn_udp->udp_connp == connp); 967 ASSERT(MUTEX_NOT_HELD(&sqp->sq_lock)); 968 969 mutex_enter(&sqp->sq_lock); 970 971 being_processed = (sqp->sq_state & SQS_PROC); 972 if (!being_processed && (sqp->sq_first == NULL)) { 973 /* 974 * Fast-path, ok to process and nothing queued. 975 */ 976 sqp->sq_state |= (SQS_PROC|SQS_FAST); 977 sqp->sq_run = curthread; 978 mutex_exit(&sqp->sq_lock); 979 980 #if SQUEUE_DEBUG 981 sqp->sq_isintr = interrupt; 982 sqp->sq_curmp = mp; 983 sqp->sq_curproc = proc; 984 sqp->sq_connp = connp; 985 mp->b_tag = sqp->sq_tag = tag; 986 #endif 987 988 #if SQUEUE_PROFILE 989 if (SQ_PROFILING(sqp)) { 990 if (interrupt) 991 SQSTAT(sqp, sq_npackets_intr); 992 else 993 SQSTAT(sqp, sq_npackets_other); 994 start = gethrtime(); 995 } 996 #endif 997 998 ((conn_t *)arg)->conn_on_sqp = B_TRUE; 999 DTRACE_PROBE3(squeue__proc__start, squeue_t *, 1000 sqp, mblk_t *, mp, conn_t *, arg); 1001 (*proc)(arg, mp, sqp); 1002 DTRACE_PROBE2(squeue__proc__end, squeue_t *, 1003 sqp, conn_t *, arg); 1004 ((conn_t *)arg)->conn_on_sqp = B_FALSE; 1005 1006 #if SQUEUE_DEBUG 1007 sqp->sq_curmp = NULL; 1008 sqp->sq_curproc = NULL; 1009 sqp->sq_connp = NULL; 1010 sqp->sq_isintr = 0; 1011 #endif 1012 #if SQUEUE_PROFILE 1013 if (SQ_PROFILING(sqp)) { 1014 delta = gethrtime() - start; 1015 if (interrupt) 1016 SQDELTA(sqp, sq_time_intr, delta); 1017 else 1018 SQDELTA(sqp, sq_time_other, delta); 1019 } 1020 #endif 1021 1022 CONN_DEC_REF((conn_t *)arg); 1023 mutex_enter(&sqp->sq_lock); 1024 sqp->sq_state &= ~(SQS_PROC|SQS_FAST); 1025 sqp->sq_run = NULL; 1026 if (sqp->sq_first == NULL) { 1027 /* 1028 * We processed inline our packet and 1029 * nothing new has arrived. We are done. 1030 */ 1031 mutex_exit(&sqp->sq_lock); 1032 } else { 1033 SQUEUE_WORKER_WAKEUP(sqp); 1034 } 1035 return; 1036 } else { 1037 /* 1038 * We let a thread processing a squeue reenter only 1039 * once. This helps the case of incoming connection 1040 * where a SYN-ACK-ACK that triggers the conn_ind 1041 * doesn't have to queue the packet if listener and 1042 * eager are on the same squeue. Also helps the 1043 * loopback connection where the two ends are bound 1044 * to the same squeue (which is typical on single 1045 * CPU machines). 1046 * We let the thread reenter only once for the fear 1047 * of stack getting blown with multiple traversal. 
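		 * While the nested call runs, SQS_REENTER stays set on the
		 * squeue, so any further reentry attempt falls through to
		 * the ordinary enqueue path below.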
1048 */ 1049 if (being_processed && !(sqp->sq_state & SQS_REENTER) && 1050 (sqp->sq_run == curthread) && 1051 (((conn_t *)arg)->conn_on_sqp == B_FALSE)) { 1052 sqp->sq_state |= SQS_REENTER; 1053 mutex_exit(&sqp->sq_lock); 1054 1055 ((conn_t *)arg)->conn_on_sqp = B_TRUE; 1056 DTRACE_PROBE3(squeue__proc__start, squeue_t *, 1057 sqp, mblk_t *, mp, conn_t *, arg); 1058 (*proc)(arg, mp, sqp); 1059 DTRACE_PROBE2(squeue__proc__end, squeue_t *, 1060 sqp, conn_t *, arg); 1061 ((conn_t *)arg)->conn_on_sqp = B_FALSE; 1062 CONN_DEC_REF((conn_t *)arg); 1063 1064 mutex_enter(&sqp->sq_lock); 1065 sqp->sq_state &= ~SQS_REENTER; 1066 mutex_exit(&sqp->sq_lock); 1067 return; 1068 } 1069 1070 #if SQUEUE_DEBUG 1071 mp->b_tag = tag; 1072 #endif 1073 #if SQUEUE_PROFILE 1074 if (SQ_PROFILING(sqp)) { 1075 if (servicing_interrupt()) 1076 SQSTAT(sqp, sq_nqueued_intr); 1077 else 1078 SQSTAT(sqp, sq_nqueued_other); 1079 if (sqp->sq_stats.sq_max_qlen < sqp->sq_count) 1080 sqp->sq_stats.sq_max_qlen = sqp->sq_count; 1081 } 1082 #endif 1083 ENQUEUE_MP(sqp, mp, proc, arg); 1084 if (being_processed) { 1085 /* 1086 * Queue is already being processed. 1087 * No need to do anything. 1088 */ 1089 mutex_exit(&sqp->sq_lock); 1090 return; 1091 } 1092 SQUEUE_WORKER_WAKEUP(sqp); 1093 } 1094 } 1095 1096 /* 1097 * squeue_fill() - fill squeue *sqp with mblk *mp with argument of *arg 1098 * without processing the squeue. 1099 */ 1100 /* ARGSUSED */ 1101 void 1102 squeue_fill(squeue_t *sqp, mblk_t *mp, sqproc_t proc, void * arg, 1103 uint8_t tag) 1104 { 1105 #if SQUEUE_DEBUG 1106 conn_t *connp = (conn_t *)arg; 1107 #endif 1108 ASSERT(proc != NULL); 1109 ASSERT(sqp != NULL); 1110 ASSERT(mp != NULL); 1111 ASSERT(mp->b_next == NULL); 1112 ASSERT(!IPCL_IS_TCP(connp) || connp->conn_tcp->tcp_connp == connp); 1113 ASSERT(!IPCL_IS_UDP(connp) || connp->conn_udp->udp_connp == connp); 1114 1115 ASSERT(MUTEX_NOT_HELD(&sqp->sq_lock)); 1116 mutex_enter(&sqp->sq_lock); 1117 ENQUEUE_MP(sqp, mp, proc, arg); 1118 #if SQUEUE_DEBUG 1119 mp->b_tag = tag; 1120 #endif 1121 #if SQUEUE_PROFILE 1122 if (SQ_PROFILING(sqp)) { 1123 if (servicing_interrupt()) 1124 SQSTAT(sqp, sq_nqueued_intr); 1125 else 1126 SQSTAT(sqp, sq_nqueued_other); 1127 if (sqp->sq_stats.sq_max_qlen < sqp->sq_count) 1128 sqp->sq_stats.sq_max_qlen = sqp->sq_count; 1129 } 1130 #endif 1131 1132 /* 1133 * If queue is already being processed. No need to do anything. 1134 */ 1135 if (sqp->sq_state & SQS_PROC) { 1136 mutex_exit(&sqp->sq_lock); 1137 return; 1138 } 1139 1140 SQUEUE_WORKER_WAKEUP(sqp); 1141 } 1142 1143 1144 /* 1145 * PRIVATE FUNCTIONS 1146 */ 1147 1148 static void 1149 squeue_fire(void *arg) 1150 { 1151 squeue_t *sqp = arg; 1152 uint_t state; 1153 1154 mutex_enter(&sqp->sq_lock); 1155 1156 state = sqp->sq_state; 1157 if (sqp->sq_tid == 0 && !(state & SQS_TMO_PROG)) { 1158 mutex_exit(&sqp->sq_lock); 1159 return; 1160 } 1161 1162 sqp->sq_tid = 0; 1163 /* 1164 * The timeout fired before we got a chance to set it. 1165 * Process it anyway but remove the SQS_TMO_PROG so that 1166 * the guy trying to set the timeout knows that it has 1167 * already been processed. 
1168 */ 1169 if (state & SQS_TMO_PROG) 1170 sqp->sq_state &= ~SQS_TMO_PROG; 1171 1172 if (!(state & SQS_PROC)) { 1173 sqp->sq_awaken = lbolt; 1174 cv_signal(&sqp->sq_async); 1175 } 1176 mutex_exit(&sqp->sq_lock); 1177 } 1178 1179 static void 1180 squeue_drain(squeue_t *sqp, uint_t proc_type, hrtime_t expire) 1181 { 1182 mblk_t *mp; 1183 mblk_t *head; 1184 sqproc_t proc; 1185 conn_t *connp; 1186 clock_t start = lbolt; 1187 clock_t drain_time; 1188 timeout_id_t tid; 1189 uint_t cnt; 1190 uint_t total_cnt = 0; 1191 ill_rx_ring_t *sq_rx_ring = sqp->sq_rx_ring; 1192 int interrupt = servicing_interrupt(); 1193 boolean_t poll_on = B_FALSE; 1194 hrtime_t now; 1195 1196 ASSERT(mutex_owned(&sqp->sq_lock)); 1197 ASSERT(!(sqp->sq_state & SQS_PROC)); 1198 1199 #if SQUEUE_PROFILE 1200 if (SQ_PROFILING(sqp)) { 1201 if (interrupt) 1202 SQSTAT(sqp, sq_ndrains_intr); 1203 else if (!(proc_type & SQS_WORKER)) 1204 SQSTAT(sqp, sq_ndrains_other); 1205 else 1206 SQSTAT(sqp, sq_ndrains_worker); 1207 } 1208 #endif 1209 1210 if ((tid = sqp->sq_tid) != 0) 1211 sqp->sq_tid = 0; 1212 1213 sqp->sq_state |= SQS_PROC | proc_type; 1214 head = sqp->sq_first; 1215 sqp->sq_first = NULL; 1216 sqp->sq_last = NULL; 1217 cnt = sqp->sq_count; 1218 1219 /* 1220 * We have backlog built up. Switch to polling mode if the 1221 * device underneath allows it. Need to do it only for 1222 * drain by non-interrupt thread so interrupts don't 1223 * come and disrupt us in between. If its a interrupt thread, 1224 * no need because most devices will not issue another 1225 * interrupt till this one returns. 1226 */ 1227 if ((sqp->sq_state & SQS_POLL_CAPAB) && !(proc_type & SQS_ENTER) && 1228 (sqp->sq_count > squeue_worker_poll_min)) { 1229 ASSERT(sq_rx_ring != NULL); 1230 SQS_POLLING_ON(sqp, sq_rx_ring); 1231 poll_on = B_TRUE; 1232 } 1233 1234 mutex_exit(&sqp->sq_lock); 1235 1236 if (tid != 0) 1237 (void) untimeout(tid); 1238 again: 1239 while ((mp = head) != NULL) { 1240 head = mp->b_next; 1241 mp->b_next = NULL; 1242 1243 proc = (sqproc_t)mp->b_queue; 1244 mp->b_queue = NULL; 1245 connp = (conn_t *)mp->b_prev; 1246 mp->b_prev = NULL; 1247 #if SQUEUE_DEBUG 1248 sqp->sq_curmp = mp; 1249 sqp->sq_curproc = proc; 1250 sqp->sq_connp = connp; 1251 sqp->sq_tag = mp->b_tag; 1252 #endif 1253 1254 #if SQUEUE_PROFILE 1255 if (SQ_PROFILING(sqp)) { 1256 if (interrupt) 1257 SQSTAT(sqp, sq_npackets_intr); 1258 else if (!(proc_type & SQS_WORKER)) 1259 SQSTAT(sqp, sq_npackets_other); 1260 else 1261 SQSTAT(sqp, sq_npackets_worker); 1262 } 1263 #endif 1264 1265 connp->conn_on_sqp = B_TRUE; 1266 DTRACE_PROBE3(squeue__proc__start, squeue_t *, 1267 sqp, mblk_t *, mp, conn_t *, connp); 1268 (*proc)(connp, mp, sqp); 1269 DTRACE_PROBE2(squeue__proc__end, squeue_t *, 1270 sqp, conn_t *, connp); 1271 connp->conn_on_sqp = B_FALSE; 1272 CONN_DEC_REF(connp); 1273 } 1274 1275 1276 #if SQUEUE_DEBUG 1277 sqp->sq_curmp = NULL; 1278 sqp->sq_curproc = NULL; 1279 sqp->sq_connp = NULL; 1280 #endif 1281 1282 mutex_enter(&sqp->sq_lock); 1283 sqp->sq_count -= cnt; 1284 total_cnt += cnt; 1285 1286 if (sqp->sq_first != NULL) { 1287 1288 now = gethrtime(); 1289 if (!expire || (now < expire)) { 1290 /* More arrived and time not expired */ 1291 head = sqp->sq_first; 1292 sqp->sq_first = NULL; 1293 sqp->sq_last = NULL; 1294 cnt = sqp->sq_count; 1295 mutex_exit(&sqp->sq_lock); 1296 goto again; 1297 } 1298 1299 /* 1300 * If we are not worker thread and we 1301 * reached our time limit to do drain, 1302 * signal the worker thread to pick 1303 * up the work. 
1304 * If we were the worker thread, then 1305 * we take a break to allow an interrupt 1306 * or writer to pick up the load. 1307 */ 1308 if (proc_type != SQS_WORKER) { 1309 sqp->sq_awaken = lbolt; 1310 cv_signal(&sqp->sq_async); 1311 } 1312 } 1313 1314 /* 1315 * Try to see if we can get a time estimate to process a packet. 1316 * Do it only in interrupt context since less chance of context 1317 * switch or pinning etc. to get a better estimate. 1318 */ 1319 if (interrupt && ((drain_time = (lbolt - start)) > 0)) 1320 sqp->sq_avg_drain_time = ((80 * sqp->sq_avg_drain_time) + 1321 (20 * (drv_hztousec(drain_time)/total_cnt)))/100; 1322 1323 sqp->sq_state &= ~(SQS_PROC | proc_type); 1324 1325 /* 1326 * If polling was turned on, turn it off and reduce the default 1327 * interrupt blank interval as well to bring new packets in faster 1328 * (reduces the latency when there is no backlog). 1329 */ 1330 if (poll_on && (sqp->sq_state & SQS_POLL_CAPAB)) { 1331 ASSERT(sq_rx_ring != NULL); 1332 SQS_POLLING_OFF(sqp, sq_rx_ring); 1333 } 1334 } 1335 1336 static void 1337 squeue_worker(squeue_t *sqp) 1338 { 1339 kmutex_t *lock = &sqp->sq_lock; 1340 kcondvar_t *async = &sqp->sq_async; 1341 callb_cpr_t cprinfo; 1342 hrtime_t now; 1343 #if SQUEUE_PROFILE 1344 hrtime_t start; 1345 #endif 1346 1347 CALLB_CPR_INIT(&cprinfo, lock, callb_generic_cpr, "nca"); 1348 mutex_enter(lock); 1349 1350 for (;;) { 1351 while (sqp->sq_first == NULL || (sqp->sq_state & SQS_PROC)) { 1352 CALLB_CPR_SAFE_BEGIN(&cprinfo); 1353 still_wait: 1354 cv_wait(async, lock); 1355 if (sqp->sq_state & SQS_PROC) { 1356 goto still_wait; 1357 } 1358 CALLB_CPR_SAFE_END(&cprinfo, lock); 1359 } 1360 1361 #if SQUEUE_PROFILE 1362 if (SQ_PROFILING(sqp)) { 1363 start = gethrtime(); 1364 } 1365 #endif 1366 1367 ASSERT(squeue_workerdrain_ns != 0); 1368 now = gethrtime(); 1369 sqp->sq_run = curthread; 1370 squeue_drain(sqp, SQS_WORKER, now + squeue_workerdrain_ns); 1371 sqp->sq_run = NULL; 1372 1373 if (sqp->sq_first != NULL) { 1374 /* 1375 * Doing too much processing by worker thread 1376 * in presense of interrupts can be sub optimal. 1377 * Instead, once a drain is done by worker thread 1378 * for squeue_writerdrain_ns (the reason we are 1379 * here), we force wait for squeue_workerwait_tick 1380 * before doing more processing even if sq_wait is 1381 * set to 0. 1382 * 1383 * This can be counterproductive for performance 1384 * if worker thread is the only means to process 1385 * the packets (interrupts or writers are not 1386 * allowed inside the squeue). 
1387 */ 1388 if (sqp->sq_tid == 0 && 1389 !(sqp->sq_state & SQS_TMO_PROG)) { 1390 timeout_id_t tid; 1391 1392 sqp->sq_state |= SQS_TMO_PROG; 1393 mutex_exit(&sqp->sq_lock); 1394 tid = timeout(squeue_fire, sqp, 1395 squeue_workerwait_tick); 1396 mutex_enter(&sqp->sq_lock); 1397 /* 1398 * Check again if we still need 1399 * the timeout 1400 */ 1401 if (((sqp->sq_state & (SQS_TMO_PROG|SQS_PROC)) 1402 == SQS_TMO_PROG) && (sqp->sq_tid == 0) && 1403 (sqp->sq_first != NULL)) { 1404 sqp->sq_state &= ~SQS_TMO_PROG; 1405 sqp->sq_awaken = lbolt; 1406 sqp->sq_tid = tid; 1407 } else if (sqp->sq_state & SQS_TMO_PROG) { 1408 /* timeout not needed */ 1409 sqp->sq_state &= ~SQS_TMO_PROG; 1410 mutex_exit(&(sqp)->sq_lock); 1411 (void) untimeout(tid); 1412 mutex_enter(&sqp->sq_lock); 1413 } 1414 } 1415 CALLB_CPR_SAFE_BEGIN(&cprinfo); 1416 cv_wait(async, lock); 1417 CALLB_CPR_SAFE_END(&cprinfo, lock); 1418 } 1419 1420 1421 #if SQUEUE_PROFILE 1422 if (SQ_PROFILING(sqp)) { 1423 SQDELTA(sqp, sq_time_worker, gethrtime() - start); 1424 } 1425 #endif 1426 } 1427 } 1428 1429 #if SQUEUE_PROFILE 1430 static int 1431 squeue_kstat_update(kstat_t *ksp, int rw) 1432 { 1433 struct squeue_kstat *sqsp = &squeue_kstat; 1434 squeue_t *sqp = ksp->ks_private; 1435 1436 if (rw == KSTAT_WRITE) 1437 return (EACCES); 1438 1439 #if SQUEUE_DEBUG 1440 sqsp->sq_count.value.ui64 = sqp->sq_count; 1441 sqsp->sq_max_qlen.value.ui64 = sqp->sq_stats.sq_max_qlen; 1442 #endif 1443 sqsp->sq_npackets_worker.value.ui64 = sqp->sq_stats.sq_npackets_worker; 1444 sqsp->sq_npackets_intr.value.ui64 = sqp->sq_stats.sq_npackets_intr; 1445 sqsp->sq_npackets_other.value.ui64 = sqp->sq_stats.sq_npackets_other; 1446 sqsp->sq_nqueued_intr.value.ui64 = sqp->sq_stats.sq_nqueued_intr; 1447 sqsp->sq_nqueued_other.value.ui64 = sqp->sq_stats.sq_nqueued_other; 1448 sqsp->sq_ndrains_worker.value.ui64 = sqp->sq_stats.sq_ndrains_worker; 1449 sqsp->sq_ndrains_intr.value.ui64 = sqp->sq_stats.sq_ndrains_intr; 1450 sqsp->sq_ndrains_other.value.ui64 = sqp->sq_stats.sq_ndrains_other; 1451 sqsp->sq_time_worker.value.ui64 = sqp->sq_stats.sq_time_worker; 1452 sqsp->sq_time_intr.value.ui64 = sqp->sq_stats.sq_time_intr; 1453 sqsp->sq_time_other.value.ui64 = sqp->sq_stats.sq_time_other; 1454 return (0); 1455 } 1456 #endif 1457 1458 void 1459 squeue_profile_enable(squeue_t *sqp) 1460 { 1461 mutex_enter(&sqp->sq_lock); 1462 sqp->sq_state |= SQS_PROFILE; 1463 mutex_exit(&sqp->sq_lock); 1464 } 1465 1466 void 1467 squeue_profile_disable(squeue_t *sqp) 1468 { 1469 mutex_enter(&sqp->sq_lock); 1470 sqp->sq_state &= ~SQS_PROFILE; 1471 mutex_exit(&sqp->sq_lock); 1472 } 1473 1474 void 1475 squeue_profile_reset(squeue_t *sqp) 1476 { 1477 #if SQUEUE_PROFILE 1478 bzero(&sqp->sq_stats, sizeof (sqstat_t)); 1479 #endif 1480 } 1481 1482 void 1483 squeue_profile_start(void) 1484 { 1485 #if SQUEUE_PROFILE 1486 squeue_profile = B_TRUE; 1487 #endif 1488 } 1489 1490 void 1491 squeue_profile_stop(void) 1492 { 1493 #if SQUEUE_PROFILE 1494 squeue_profile = B_FALSE; 1495 #endif 1496 } 1497 1498 uintptr_t * 1499 squeue_getprivate(squeue_t *sqp, sqprivate_t p) 1500 { 1501 ASSERT(p < SQPRIVATE_MAX); 1502 1503 return (&sqp->sq_private[p]); 1504 } 1505 1506 processorid_t 1507 squeue_binding(squeue_t *sqp) 1508 { 1509 return (sqp->sq_bind); 1510 } 1511
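
/*
 * Illustrative usage sketch (not part of this file's build; all names other
 * than the squeue interfaces documented above are hypothetical). A consumer
 * typically creates one squeue per CPU, binds the worker thread, and
 * dispatches mblks with the conn_t as the argument. The squeue drops one
 * connection reference after each request is processed, so the dispatcher
 * takes one first:
 *
 *	static squeue_t *my_sqp;
 *
 *	static void
 *	my_proc(void *arg, mblk_t *mp, void *sqp)
 *	{
 *		conn_t *connp = (conn_t *)arg;
 *
 *		// Process mp on behalf of connp. Only one thread at a time
 *		// runs handlers for a given squeue, so per-connection state
 *		// needs no further serialization here.
 *	}
 *
 *	void
 *	my_init(processorid_t cpu)
 *	{
 *		// wait of 10 ms before the worker is woken for queued work
 *		my_sqp = squeue_create("my_sq", cpu, 10, minclsyspri);
 *		squeue_bind(my_sqp, -1);
 *	}
 *
 *	void
 *	my_dispatch(conn_t *connp, mblk_t *mp)
 *	{
 *		CONN_INC_REF(connp);	// dropped by the squeue after my_proc
 *		squeue_enter(my_sqp, mp, my_proc, connp, 0);
 *	}
 */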