1 /* 2 * CDDL HEADER START 3 * 4 * The contents of this file are subject to the terms of the 5 * Common Development and Distribution License, Version 1.0 only 6 * (the "License"). You may not use this file except in compliance 7 * with the License. 8 * 9 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE 10 * or http://www.opensolaris.org/os/licensing. 11 * See the License for the specific language governing permissions 12 * and limitations under the License. 13 * 14 * When distributing Covered Code, include this CDDL HEADER in each 15 * file and include the License file at usr/src/OPENSOLARIS.LICENSE. 16 * If applicable, add the following below this CDDL HEADER, with the 17 * fields enclosed by brackets "[]" replaced with your own identifying 18 * information: Portions Copyright [yyyy] [name of copyright owner] 19 * 20 * CDDL HEADER END 21 */ 22 /* 23 * Copyright 2005 Sun Microsystems, Inc. All rights reserved. 24 * Use is subject to license terms. 25 */ 26 27 #pragma ident "%Z%%M% %I% %E% SMI" 28 29 /* 30 * Squeues - TCP/IP serialization mechanism. 31 * 32 * This is a general purpose high-performance serialization mechanism. It is 33 * similar to a taskq with a single worker thread, the difference is that it 34 * does not imply a context switch - the thread placing a request may actually 35 * process it. It is also biased for processing requests in interrupt context. 36 * 37 * Each squeue has a worker thread which may optionally be bound to a CPU. 38 * 39 * Only one thread may process requests from a given squeue at any time. This is 40 * called "entering" squeue. 41 * 42 * Each dispatched request is processed either by 43 * 44 * a) Dispatching thread or 45 * b) Some other thread that is currently processing squeue at the time of 46 * request or 47 * c) worker thread. 48 * 49 * INTERFACES: 50 * 51 * squeue_t *squeue_create(name, bind, wait, pri) 52 * 53 * name: symbolic name for squeue. 
 * wait: time to wait before waking the worker thread after queueing
 * request.
 * bind: preferred CPU binding for the worker thread.
 * pri: thread priority for the worker thread.
 *
 * This function never fails and may sleep. It returns a transparent pointer
 * to the squeue_t structure that is passed to all other squeue operations.
 *
 * void squeue_bind(sqp, bind)
 *
 * Bind squeue worker thread to a CPU specified by the 'bind' argument. The
 * 'bind' value of -1 binds to the preferred thread specified for
 * squeue_create.
 *
 * NOTE: Any value of 'bind' other than -1 is not supported currently, but the
 * API is present - in the future it may be useful to specify different
 * binding.
 *
 * void squeue_unbind(sqp)
 *
 * Unbind the worker thread from its preferred CPU.
 *
 * void squeue_enter(*sqp, *mp, proc, arg, tag)
 *
 * Post a single request for processing. Each request consists of mblock 'mp',
 * function 'proc' to execute and an argument 'arg' to pass to this
 * function. The function is called as (*proc)(arg, mp, sqp); The tag is an
 * arbitrary number from 0 to 255 which will be stored in mp to track exact
 * caller of squeue_enter. The combination of function name and the tag should
 * provide enough information to identify the caller.
 *
 * If no one is processing the squeue, squeue_enter() will call the function
 * immediately. Otherwise it will add the request to the queue for later
 * processing. Once the function is executed, the thread may continue
 * executing all other requests pending on the queue.
 *
 * NOTE: The tagging information is only used when SQUEUE_DEBUG is set to 1.
 * NOTE: The argument can be conn_t only. Ideally we'd like to have generic
 * argument, but we want to drop connection reference count here - this
 * improves tail-call optimizations.
 * XXX: The arg should have type conn_t.
 *
 * void squeue_enter_nodrain(*sqp, *mp, proc, arg, tag)
 *
 * Same as squeue_enter(), but the entering thread will only try to execute a
 * single request. It will not continue executing any pending requests.
 *
 * void squeue_fill(*sqp, *mp, proc, arg, tag)
 *
 * Just place the request on the queue without trying to execute it. Arrange
 * for the worker thread to process the request.
 *
 * void squeue_profile_enable(sqp)
 * void squeue_profile_disable(sqp)
 *
 * Enable or disable profiling for specified 'sqp'. Profiling is only
 * available when SQUEUE_PROFILE is set.
 *
 * void squeue_profile_reset(sqp)
 *
 * Reset all profiling information to zero. Profiling is only
 * available when SQUEUE_PROFILE is set.
 *
 * void squeue_profile_start()
 * void squeue_profile_stop()
 *
 * Globally enable or disable profiling for all squeues.
 *
 * uintptr_t *squeue_getprivate(sqp, p)
 *
 * Each squeue keeps a small amount of private data space available for
 * various consumers. Current consumers include TCP and NCA. Other consumers
 * need to add their private tag to the sqprivate_t enum. The private
 * information is limited to an uintptr_t value. The squeue has no knowledge
 * of its content and does not manage it in any way.
 *
 * The typical use may be a breakdown of data structures per CPU (since
 * squeues are usually per CPU). See NCA for examples of use.
 * Currently 'p' may have one legal value SQPRIVATE_TCP.
 *
 * processorid_t squeue_binding(sqp)
 *
 * Returns the CPU binding for a given squeue.
 *
 * TUNABLES:
 *
 * squeue_intrdrain_ms: Maximum time in ms interrupts spend draining any
 * squeue. Note that this is an approximation - squeues have no control on the
 * time it takes to process each request. This limit is only checked
 * between processing individual messages.
 * Default: 20 ms.
145 * 146 * squeue_writerdrain_ms: Maximum time in ms non-interrupts spend draining any 147 * squeue. Note that this is approximation - squeues have no control on the 148 * time it takes to process each request. This limit is only checked 149 * between processing individual messages. 150 * Default: 10 ms. 151 * 152 * squeue_workerdrain_ms: Maximum time in ms worker thread spends draining any 153 * squeue. Note that this is approximation - squeues have no control on the 154 * time it takes to process each request. This limit is only checked 155 * between processing individual messages. 156 * Default: 10 ms. 157 * 158 * squeue_workerwait_ms: When worker thread is interrupted because workerdrain 159 * expired, how much time to wait before waking worker thread again. 160 * Default: 10 ms. 161 * 162 * DEFINES: 163 * 164 * SQUEUE_DEBUG: If defined as 1, special code is compiled in which records 165 * additional information aiding debugging is recorded in squeue. 166 * 167 * SQUEUE_PROFILE: If defined as 1, special code is compiled in which collects 168 * various squeue statistics and exports them as kstats. 169 * 170 * Ideally we would like both SQUEUE_DEBUG and SQUEUE_PROFILE to be always set, 171 * but it affects performance, so they are enabled on DEBUG kernels and disabled 172 * on non-DEBUG by default. 173 */ 174 175 #include <sys/types.h> 176 #include <sys/cmn_err.h> 177 #include <sys/debug.h> 178 #include <sys/kmem.h> 179 #include <sys/cpuvar.h> 180 #include <sys/condvar_impl.h> 181 #include <sys/systm.h> 182 #include <sys/callb.h> 183 #include <sys/sdt.h> 184 #include <sys/ddi.h> 185 186 #include <inet/ipclassifier.h> 187 188 /* 189 * State flags. 190 * Note: The MDB IP module depends on the values of these flags. 
191 */ 192 #define SQS_PROC 0x0001 /* being processed */ 193 #define SQS_WORKER 0x0002 /* worker thread */ 194 #define SQS_ENTER 0x0004 /* enter thread */ 195 #define SQS_FAST 0x0008 /* enter-fast thread */ 196 #define SQS_USER 0x0010 /* A non interrupt user */ 197 #define SQS_BOUND 0x0020 /* Worker thread is bound */ 198 #define SQS_PROFILE 0x0040 /* Enable profiling */ 199 #define SQS_REENTER 0x0080 /* Re entered thread */ 200 #define SQS_TMO_PROG 0x0100 /* Timeout is being set */ 201 202 #ifdef DEBUG 203 #define SQUEUE_DEBUG 1 204 #define SQUEUE_PROFILE 1 205 #else 206 #define SQUEUE_DEBUG 0 207 #define SQUEUE_PROFILE 0 208 #endif 209 210 #include <sys/squeue_impl.h> 211 212 static void squeue_fire(void *); 213 static void squeue_drain(squeue_t *, uint_t, clock_t); 214 static void squeue_worker(squeue_t *sqp); 215 216 #if SQUEUE_PROFILE 217 static kmutex_t squeue_kstat_lock; 218 static int squeue_kstat_update(kstat_t *, int); 219 #endif 220 221 kmem_cache_t *squeue_cache; 222 223 int squeue_intrdrain_ms = 20; 224 int squeue_writerdrain_ms = 10; 225 int squeue_workerdrain_ms = 10; 226 int squeue_workerwait_ms = 10; 227 228 /* The values above converted to ticks */ 229 static int squeue_intrdrain_tick = 0; 230 static int squeue_writerdrain_tick = 0; 231 static int squeue_workerdrain_tick = 0; 232 static int squeue_workerwait_tick = 0; 233 234 /* 235 * The minimum packet queued when worker thread doing the drain triggers 236 * polling (if squeue allows it). The choice of 3 is arbitrary. You 237 * definitely don't want it to be 1 since that will trigger polling 238 * on very low loads as well (ssh seems to do be one such example 239 * where packet flow was very low yet somehow 1 packet ended up getting 240 * queued and worker thread fires every 10ms and blanking also gets 241 * triggered. 242 */ 243 int squeue_worker_poll_min = 3; 244 245 #if SQUEUE_PROFILE 246 /* 247 * Set to B_TRUE to enable profiling. 
248 */ 249 static int squeue_profile = B_FALSE; 250 #define SQ_PROFILING(sqp) (squeue_profile && ((sqp)->sq_state & SQS_PROFILE)) 251 252 #define SQSTAT(sqp, x) ((sqp)->sq_stats.x++) 253 #define SQDELTA(sqp, x, d) ((sqp)->sq_stats.x += (d)) 254 255 struct squeue_kstat { 256 kstat_named_t sq_count; 257 kstat_named_t sq_max_qlen; 258 kstat_named_t sq_npackets_worker; 259 kstat_named_t sq_npackets_intr; 260 kstat_named_t sq_npackets_other; 261 kstat_named_t sq_nqueued_intr; 262 kstat_named_t sq_nqueued_other; 263 kstat_named_t sq_ndrains_worker; 264 kstat_named_t sq_ndrains_intr; 265 kstat_named_t sq_ndrains_other; 266 kstat_named_t sq_time_worker; 267 kstat_named_t sq_time_intr; 268 kstat_named_t sq_time_other; 269 } squeue_kstat = { 270 { "count", KSTAT_DATA_UINT64 }, 271 { "max_qlen", KSTAT_DATA_UINT64 }, 272 { "packets_worker", KSTAT_DATA_UINT64 }, 273 { "packets_intr", KSTAT_DATA_UINT64 }, 274 { "packets_other", KSTAT_DATA_UINT64 }, 275 { "queued_intr", KSTAT_DATA_UINT64 }, 276 { "queued_other", KSTAT_DATA_UINT64 }, 277 { "ndrains_worker", KSTAT_DATA_UINT64 }, 278 { "ndrains_intr", KSTAT_DATA_UINT64 }, 279 { "ndrains_other", KSTAT_DATA_UINT64 }, 280 { "time_worker", KSTAT_DATA_UINT64 }, 281 { "time_intr", KSTAT_DATA_UINT64 }, 282 { "time_other", KSTAT_DATA_UINT64 }, 283 }; 284 #endif 285 286 #define SQUEUE_WORKER_WAKEUP(sqp) { \ 287 timeout_id_t tid = (sqp)->sq_tid; \ 288 \ 289 ASSERT(MUTEX_HELD(&(sqp)->sq_lock)); \ 290 /* \ 291 * Queue isn't being processed, so take \ 292 * any post enqueue actions needed before leaving. \ 293 */ \ 294 if (tid != 0) { \ 295 /* \ 296 * Waiting for an enter() to process mblk(s). \ 297 */ \ 298 clock_t waited = lbolt - (sqp)->sq_awaken; \ 299 \ 300 if (TICK_TO_MSEC(waited) >= (sqp)->sq_wait) { \ 301 /* \ 302 * Times up and have a worker thread \ 303 * waiting for work, so schedule it. 
\ 304 */ \ 305 (sqp)->sq_tid = 0; \ 306 (sqp)->sq_awaken = lbolt; \ 307 cv_signal(&(sqp)->sq_async); \ 308 mutex_exit(&(sqp)->sq_lock); \ 309 (void) untimeout(tid); \ 310 return; \ 311 } \ 312 mutex_exit(&(sqp)->sq_lock); \ 313 return; \ 314 } else if ((sqp)->sq_state & SQS_TMO_PROG) { \ 315 mutex_exit(&(sqp)->sq_lock); \ 316 return; \ 317 } else if ((sqp)->sq_wait != 0) { \ 318 clock_t wait = (sqp)->sq_wait; \ 319 /* \ 320 * Wait up to sqp->sq_wait ms for an \ 321 * enter() to process this queue. We \ 322 * don't want to contend on timeout locks \ 323 * with sq_lock held for performance reasons, \ 324 * so drop the sq_lock before calling timeout \ 325 * but we need to check if timeout is required \ 326 * after re acquiring the sq_lock. Once \ 327 * the sq_lock is dropped, someone else could \ 328 * have processed the packet or the timeout could \ 329 * have already fired. \ 330 */ \ 331 (sqp)->sq_state |= SQS_TMO_PROG; \ 332 mutex_exit(&(sqp)->sq_lock); \ 333 tid = timeout(squeue_fire, (sqp), wait); \ 334 mutex_enter(&(sqp)->sq_lock); \ 335 /* Check again if we still need the timeout */ \ 336 if ((((sqp)->sq_state & (SQS_PROC|SQS_TMO_PROG)) == \ 337 SQS_TMO_PROG) && ((sqp)->sq_tid == 0) && \ 338 ((sqp)->sq_first != NULL)) { \ 339 (sqp)->sq_state &= ~SQS_TMO_PROG; \ 340 (sqp)->sq_awaken = lbolt; \ 341 (sqp)->sq_tid = tid; \ 342 mutex_exit(&(sqp)->sq_lock); \ 343 return; \ 344 } else { \ 345 if ((sqp)->sq_state & SQS_TMO_PROG) { \ 346 (sqp)->sq_state &= ~SQS_TMO_PROG; \ 347 mutex_exit(&(sqp)->sq_lock); \ 348 (void) untimeout(tid); \ 349 } else { \ 350 /* \ 351 * The timer fired before we could \ 352 * reacquire the sq_lock. squeue_fire \ 353 * removes the SQS_TMO_PROG flag \ 354 * and we don't need to do anything \ 355 * else. \ 356 */ \ 357 mutex_exit(&(sqp)->sq_lock); \ 358 } \ 359 } \ 360 } else { \ 361 /* \ 362 * Schedule the worker thread. 
\ 363 */ \ 364 (sqp)->sq_awaken = lbolt; \ 365 cv_signal(&(sqp)->sq_async); \ 366 mutex_exit(&(sqp)->sq_lock); \ 367 } \ 368 ASSERT(MUTEX_NOT_HELD(&(sqp)->sq_lock)); \ 369 } 370 371 #define ENQUEUE_MP(sqp, mp, proc, arg) { \ 372 /* \ 373 * Enque our mblk. \ 374 */ \ 375 (mp)->b_queue = NULL; \ 376 ASSERT(MUTEX_HELD(&(sqp)->sq_lock)); \ 377 ASSERT((mp)->b_prev == NULL && (mp)->b_next == NULL); \ 378 (mp)->b_queue = (queue_t *)(proc); \ 379 (mp)->b_prev = (mblk_t *)(arg); \ 380 \ 381 if ((sqp)->sq_last != NULL) \ 382 (sqp)->sq_last->b_next = (mp); \ 383 else \ 384 (sqp)->sq_first = (mp); \ 385 (sqp)->sq_last = (mp); \ 386 (sqp)->sq_count++; \ 387 ASSERT((sqp)->sq_count > 0); \ 388 DTRACE_PROBE2(squeue__enqueue, squeue_t *, sqp, \ 389 mblk_t *, mp); \ 390 } 391 392 393 #define ENQUEUE_CHAIN(sqp, mp, tail, cnt) { \ 394 /* \ 395 * Enqueue our mblk chain. \ 396 */ \ 397 ASSERT(MUTEX_HELD(&(sqp)->sq_lock)); \ 398 \ 399 if ((sqp)->sq_last != NULL) \ 400 (sqp)->sq_last->b_next = (mp); \ 401 else \ 402 (sqp)->sq_first = (mp); \ 403 (sqp)->sq_last = (tail); \ 404 (sqp)->sq_count += (cnt); \ 405 ASSERT((sqp)->sq_count > 0); \ 406 DTRACE_PROBE4(squeue__enqueuechain, squeue_t *, sqp, \ 407 mblk_t *, mp, mblk_t *, tail, int, cnt); \ 408 \ 409 } 410 411 #define SQS_POLLING_ON(sqp, rx_ring) { \ 412 ASSERT(rx_ring != NULL); \ 413 ASSERT(MUTEX_HELD(&(sqp)->sq_lock)); \ 414 rx_ring->rr_blank(rx_ring->rr_handle, \ 415 MIN((sqp->sq_avg_drain_time * sqp->sq_count), \ 416 rx_ring->rr_max_blank_time), \ 417 rx_ring->rr_max_pkt_cnt); \ 418 rx_ring->rr_poll_state |= ILL_POLLING; \ 419 rx_ring->rr_poll_time = lbolt; \ 420 } 421 422 423 #define SQS_POLLING_OFF(sqp, rx_ring) { \ 424 ASSERT(rx_ring != NULL); \ 425 ASSERT(MUTEX_HELD(&(sqp)->sq_lock)); \ 426 rx_ring->rr_blank(rx_ring->rr_handle, \ 427 rx_ring->rr_min_blank_time, \ 428 rx_ring->rr_min_pkt_cnt); \ 429 } 430 431 void 432 squeue_init(void) 433 { 434 squeue_cache = kmem_cache_create("squeue_cache", 435 sizeof (squeue_t), 64, NULL, 
NULL, NULL, NULL, NULL, 0); 436 437 squeue_intrdrain_tick = MSEC_TO_TICK_ROUNDUP(squeue_intrdrain_ms); 438 squeue_writerdrain_tick = MSEC_TO_TICK_ROUNDUP(squeue_writerdrain_ms); 439 squeue_workerdrain_tick = MSEC_TO_TICK_ROUNDUP(squeue_workerdrain_ms); 440 squeue_workerwait_tick = MSEC_TO_TICK_ROUNDUP(squeue_workerwait_ms); 441 } 442 443 /* ARGSUSED */ 444 squeue_t * 445 squeue_create(char *name, processorid_t bind, clock_t wait, pri_t pri) 446 { 447 squeue_t *sqp = kmem_cache_alloc(squeue_cache, KM_SLEEP); 448 449 bzero(sqp, sizeof (squeue_t)); 450 (void) strncpy(sqp->sq_name, name, SQ_NAMELEN + 1); 451 sqp->sq_name[SQ_NAMELEN] = '\0'; 452 453 sqp->sq_bind = bind; 454 sqp->sq_wait = MSEC_TO_TICK(wait); 455 sqp->sq_avg_drain_time = 456 drv_hztousec(squeue_intrdrain_tick)/squeue_intrdrain_tick; 457 458 #if SQUEUE_PROFILE 459 if ((sqp->sq_kstat = kstat_create("ip", bind, name, 460 "net", KSTAT_TYPE_NAMED, 461 sizeof (squeue_kstat) / sizeof (kstat_named_t), 462 KSTAT_FLAG_VIRTUAL)) != NULL) { 463 sqp->sq_kstat->ks_lock = &squeue_kstat_lock; 464 sqp->sq_kstat->ks_data = &squeue_kstat; 465 sqp->sq_kstat->ks_update = squeue_kstat_update; 466 sqp->sq_kstat->ks_private = sqp; 467 kstat_install(sqp->sq_kstat); 468 } 469 #endif 470 471 sqp->sq_worker = thread_create(NULL, 0, squeue_worker, 472 sqp, 0, &p0, TS_RUN, pri); 473 474 return (sqp); 475 } 476 477 /* ARGSUSED */ 478 void 479 squeue_bind(squeue_t *sqp, processorid_t bind) 480 { 481 ASSERT(bind == -1); 482 483 mutex_enter(&sqp->sq_lock); 484 if (sqp->sq_state & SQS_BOUND) { 485 mutex_exit(&sqp->sq_lock); 486 return; 487 } 488 489 sqp->sq_state |= SQS_BOUND; 490 mutex_exit(&sqp->sq_lock); 491 492 thread_affinity_set(sqp->sq_worker, sqp->sq_bind); 493 } 494 495 void 496 squeue_unbind(squeue_t *sqp) 497 { 498 mutex_enter(&sqp->sq_lock); 499 if (!(sqp->sq_state & SQS_BOUND)) { 500 mutex_exit(&sqp->sq_lock); 501 return; 502 } 503 504 sqp->sq_state &= ~SQS_BOUND; 505 mutex_exit(&sqp->sq_lock); 506 507 
thread_affinity_clear(sqp->sq_worker); 508 } 509 510 /* 511 * squeue_enter() - enter squeue sqp with mblk mp (which can be 512 * a chain), while tail points to the end and cnt in number of 513 * mblks in the chain. 514 * 515 * For a chain of single packet (i.e. mp == tail), go through the 516 * fast path if no one is processing the squeue and nothing is queued. 517 * 518 * The proc and arg for each mblk is already stored in the mblk in 519 * appropriate places. 520 */ 521 void 522 squeue_enter_chain(squeue_t *sqp, mblk_t *mp, mblk_t *tail, 523 uint32_t cnt, uint8_t tag) 524 { 525 int interrupt = servicing_interrupt(); 526 void *arg; 527 sqproc_t proc; 528 #if SQUEUE_PROFILE 529 hrtime_t start, delta; 530 #endif 531 532 ASSERT(sqp != NULL); 533 ASSERT(mp != NULL); 534 ASSERT(tail != NULL); 535 ASSERT(cnt > 0); 536 ASSERT(MUTEX_NOT_HELD(&sqp->sq_lock)); 537 538 mutex_enter(&sqp->sq_lock); 539 if (!(sqp->sq_state & SQS_PROC)) { 540 /* 541 * See if anything is already queued. If we are the 542 * first packet, do inline processing else queue the 543 * packet and do the drain. 544 */ 545 sqp->sq_run = curthread; 546 if (sqp->sq_first == NULL && cnt == 1) { 547 /* 548 * Fast-path, ok to process and nothing queued. 549 */ 550 sqp->sq_state |= (SQS_PROC|SQS_FAST); 551 mutex_exit(&sqp->sq_lock); 552 553 /* 554 * We are the chain of 1 packet so 555 * go through this fast path. 
556 */ 557 arg = mp->b_prev; 558 mp->b_prev = NULL; 559 proc = (sqproc_t)mp->b_queue; 560 mp->b_queue = NULL; 561 562 ASSERT(proc != NULL); 563 ASSERT(arg != NULL); 564 ASSERT(mp->b_next == NULL); 565 566 #if SQUEUE_DEBUG 567 sqp->sq_isintr = interrupt; 568 sqp->sq_curmp = mp; 569 sqp->sq_curproc = proc; 570 sqp->sq_connp = arg; 571 mp->b_tag = sqp->sq_tag = tag; 572 #endif 573 #if SQUEUE_PROFILE 574 if (SQ_PROFILING(sqp)) { 575 if (interrupt) 576 SQSTAT(sqp, sq_npackets_intr); 577 else 578 SQSTAT(sqp, sq_npackets_other); 579 start = gethrtime(); 580 } 581 #endif 582 ((conn_t *)arg)->conn_on_sqp = B_TRUE; 583 DTRACE_PROBE3(squeue__proc__start, squeue_t *, 584 sqp, mblk_t *, mp, conn_t *, arg); 585 (*proc)(arg, mp, sqp); 586 DTRACE_PROBE2(squeue__proc__end, squeue_t *, 587 sqp, conn_t *, arg); 588 ((conn_t *)arg)->conn_on_sqp = B_FALSE; 589 590 #if SQUEUE_PROFILE 591 if (SQ_PROFILING(sqp)) { 592 delta = gethrtime() - start; 593 if (interrupt) 594 SQDELTA(sqp, sq_time_intr, delta); 595 else 596 SQDELTA(sqp, sq_time_other, delta); 597 } 598 #endif 599 #if SQUEUE_DEBUG 600 sqp->sq_curmp = NULL; 601 sqp->sq_curproc = NULL; 602 sqp->sq_connp = NULL; 603 sqp->sq_isintr = 0; 604 #endif 605 606 CONN_DEC_REF((conn_t *)arg); 607 ASSERT(MUTEX_NOT_HELD(&sqp->sq_lock)); 608 mutex_enter(&sqp->sq_lock); 609 sqp->sq_state &= ~(SQS_PROC|SQS_FAST); 610 if (sqp->sq_first == NULL) { 611 /* 612 * We processed inline our packet and 613 * nothing new has arrived. We are done. 614 */ 615 sqp->sq_run = NULL; 616 mutex_exit(&sqp->sq_lock); 617 return; 618 } else if (sqp->sq_bind != CPU->cpu_id) { 619 /* 620 * If the current thread is not running 621 * on the CPU to which this squeue is bound, 622 * then don't allow it to drain. 
623 */ 624 sqp->sq_run = NULL; 625 SQUEUE_WORKER_WAKEUP(sqp); 626 return; 627 } 628 } else { 629 ENQUEUE_CHAIN(sqp, mp, tail, cnt); 630 #if SQUEUE_DEBUG 631 mp->b_tag = tag; 632 #endif 633 #if SQUEUE_PROFILE 634 if (SQ_PROFILING(sqp)) { 635 if (servicing_interrupt()) 636 SQSTAT(sqp, sq_nqueued_intr); 637 else 638 SQSTAT(sqp, sq_nqueued_other); 639 if (sqp->sq_stats.sq_max_qlen < sqp->sq_count) 640 sqp->sq_stats.sq_max_qlen = 641 sqp->sq_count; 642 } 643 #endif 644 } 645 646 /* 647 * We are here because either we couldn't do inline 648 * processing (because something was already queued), 649 * or we had a chanin of more than one packet, 650 * or something else arrived after we were done with 651 * inline processing. 652 */ 653 ASSERT(MUTEX_HELD(&sqp->sq_lock)); 654 ASSERT(sqp->sq_first != NULL); 655 656 #if SQUEUE_PROFILE 657 if (SQ_PROFILING(sqp)) { 658 start = gethrtime(); 659 } 660 #endif 661 #if SQUEUE_DEBUG 662 sqp->sq_isintr = interrupt; 663 #endif 664 665 if (interrupt) { 666 squeue_drain(sqp, SQS_ENTER, lbolt + 667 squeue_intrdrain_tick); 668 } else { 669 squeue_drain(sqp, SQS_USER, lbolt + 670 squeue_writerdrain_tick); 671 } 672 673 #if SQUEUE_PROFILE 674 if (SQ_PROFILING(sqp)) { 675 delta = gethrtime() - start; 676 if (interrupt) 677 SQDELTA(sqp, sq_time_intr, delta); 678 else 679 SQDELTA(sqp, sq_time_other, delta); 680 } 681 #endif 682 #if SQUEUE_DEBUG 683 sqp->sq_isintr = 0; 684 #endif 685 686 /* 687 * If we didn't do a complete drain, the worker 688 * thread was already signalled by squeue_drain. 689 */ 690 sqp->sq_run = NULL; 691 mutex_exit(&sqp->sq_lock); 692 return; 693 } else { 694 ASSERT(sqp->sq_run != NULL); 695 /* 696 * Queue is already being processed. Just enqueue 697 * the packet and go away. 
698 */ 699 #if SQUEUE_DEBUG 700 mp->b_tag = tag; 701 #endif 702 #if SQUEUE_PROFILE 703 if (SQ_PROFILING(sqp)) { 704 if (servicing_interrupt()) 705 SQSTAT(sqp, sq_nqueued_intr); 706 else 707 SQSTAT(sqp, sq_nqueued_other); 708 if (sqp->sq_stats.sq_max_qlen < sqp->sq_count) 709 sqp->sq_stats.sq_max_qlen = sqp->sq_count; 710 } 711 #endif 712 713 ENQUEUE_CHAIN(sqp, mp, tail, cnt); 714 mutex_exit(&sqp->sq_lock); 715 return; 716 } 717 } 718 719 /* 720 * squeue_enter() - enter squeue *sqp with mblk *mp with argument of *arg. 721 */ 722 void 723 squeue_enter(squeue_t *sqp, mblk_t *mp, sqproc_t proc, void *arg, 724 uint8_t tag) 725 { 726 int interrupt = servicing_interrupt(); 727 #if SQUEUE_PROFILE 728 hrtime_t start, delta; 729 #endif 730 #if SQUEUE_DEBUG 731 conn_t *connp = (conn_t *)arg; 732 ASSERT(!IPCL_IS_TCP(connp) || connp->conn_tcp->tcp_connp == connp); 733 ASSERT(!IPCL_IS_UDP(connp) || connp->conn_udp->udp_connp == connp); 734 #endif 735 736 ASSERT(proc != NULL); 737 ASSERT(sqp != NULL); 738 ASSERT(mp != NULL); 739 ASSERT(mp->b_next == NULL); 740 ASSERT(MUTEX_NOT_HELD(&sqp->sq_lock)); 741 742 mutex_enter(&sqp->sq_lock); 743 if (!(sqp->sq_state & SQS_PROC)) { 744 /* 745 * See if anything is already queued. If we are the 746 * first packet, do inline processing else queue the 747 * packet and do the drain. 748 */ 749 sqp->sq_run = curthread; 750 if (sqp->sq_first == NULL) { 751 /* 752 * Fast-path, ok to process and nothing queued. 
753 */ 754 sqp->sq_state |= (SQS_PROC|SQS_FAST); 755 mutex_exit(&sqp->sq_lock); 756 757 #if SQUEUE_DEBUG 758 sqp->sq_isintr = interrupt; 759 sqp->sq_curmp = mp; 760 sqp->sq_curproc = proc; 761 sqp->sq_connp = connp; 762 mp->b_tag = sqp->sq_tag = tag; 763 #endif 764 #if SQUEUE_PROFILE 765 if (SQ_PROFILING(sqp)) { 766 if (interrupt) 767 SQSTAT(sqp, sq_npackets_intr); 768 else 769 SQSTAT(sqp, sq_npackets_other); 770 start = gethrtime(); 771 } 772 #endif 773 ((conn_t *)arg)->conn_on_sqp = B_TRUE; 774 DTRACE_PROBE3(squeue__proc__start, squeue_t *, 775 sqp, mblk_t *, mp, conn_t *, arg); 776 (*proc)(arg, mp, sqp); 777 DTRACE_PROBE2(squeue__proc__end, squeue_t *, 778 sqp, conn_t *, arg); 779 ((conn_t *)arg)->conn_on_sqp = B_FALSE; 780 781 #if SQUEUE_PROFILE 782 if (SQ_PROFILING(sqp)) { 783 delta = gethrtime() - start; 784 if (interrupt) 785 SQDELTA(sqp, sq_time_intr, delta); 786 else 787 SQDELTA(sqp, sq_time_other, delta); 788 } 789 #endif 790 #if SQUEUE_DEBUG 791 sqp->sq_curmp = NULL; 792 sqp->sq_curproc = NULL; 793 sqp->sq_connp = NULL; 794 sqp->sq_isintr = 0; 795 #endif 796 797 CONN_DEC_REF((conn_t *)arg); 798 ASSERT(MUTEX_NOT_HELD(&sqp->sq_lock)); 799 mutex_enter(&sqp->sq_lock); 800 sqp->sq_state &= ~(SQS_PROC|SQS_FAST); 801 if (sqp->sq_first == NULL) { 802 /* 803 * We processed inline our packet and 804 * nothing new has arrived. We are done. 805 */ 806 sqp->sq_run = NULL; 807 mutex_exit(&sqp->sq_lock); 808 return; 809 } else if (sqp->sq_bind != CPU->cpu_id) { 810 /* 811 * If the current thread is not running 812 * on the CPU to which this squeue is bound, 813 * then don't allow it to drain. 
814 */ 815 sqp->sq_run = NULL; 816 SQUEUE_WORKER_WAKEUP(sqp); 817 return; 818 } 819 } else { 820 ENQUEUE_MP(sqp, mp, proc, arg); 821 #if SQUEUE_DEBUG 822 mp->b_tag = tag; 823 #endif 824 #if SQUEUE_PROFILE 825 if (SQ_PROFILING(sqp)) { 826 if (servicing_interrupt()) 827 SQSTAT(sqp, sq_nqueued_intr); 828 else 829 SQSTAT(sqp, sq_nqueued_other); 830 if (sqp->sq_stats.sq_max_qlen < sqp->sq_count) 831 sqp->sq_stats.sq_max_qlen = 832 sqp->sq_count; 833 } 834 #endif 835 } 836 837 /* 838 * We are here because either we couldn't do inline 839 * processing (because something was already queued) 840 * or something else arrived after we were done with 841 * inline processing. 842 */ 843 ASSERT(MUTEX_HELD(&sqp->sq_lock)); 844 ASSERT(sqp->sq_first != NULL); 845 846 #if SQUEUE_PROFILE 847 if (SQ_PROFILING(sqp)) { 848 start = gethrtime(); 849 } 850 #endif 851 #if SQUEUE_DEBUG 852 sqp->sq_isintr = interrupt; 853 #endif 854 855 if (interrupt) { 856 squeue_drain(sqp, SQS_ENTER, lbolt + 857 squeue_intrdrain_tick); 858 } else { 859 squeue_drain(sqp, SQS_USER, lbolt + 860 squeue_writerdrain_tick); 861 } 862 863 #if SQUEUE_PROFILE 864 if (SQ_PROFILING(sqp)) { 865 delta = gethrtime() - start; 866 if (interrupt) 867 SQDELTA(sqp, sq_time_intr, delta); 868 else 869 SQDELTA(sqp, sq_time_other, delta); 870 } 871 #endif 872 #if SQUEUE_DEBUG 873 sqp->sq_isintr = 0; 874 #endif 875 876 /* 877 * If we didn't do a complete drain, the worker 878 * thread was already signalled by squeue_drain. 879 */ 880 sqp->sq_run = NULL; 881 mutex_exit(&sqp->sq_lock); 882 return; 883 } else { 884 ASSERT(sqp->sq_run != NULL); 885 /* 886 * We let a thread processing a squeue reenter only 887 * once. This helps the case of incoming connection 888 * where a SYN-ACK-ACK that triggers the conn_ind 889 * doesn't have to queue the packet if listener and 890 * eager are on the same squeue. 
Also helps the 891 * loopback connection where the two ends are bound 892 * to the same squeue (which is typical on single 893 * CPU machines). 894 * We let the thread reenter only once for the fear 895 * of stack getting blown with multiple traversal. 896 */ 897 if (!(sqp->sq_state & SQS_REENTER) && 898 (sqp->sq_run == curthread) && 899 (((conn_t *)arg)->conn_on_sqp == B_FALSE)) { 900 sqp->sq_state |= SQS_REENTER; 901 mutex_exit(&sqp->sq_lock); 902 903 ((conn_t *)arg)->conn_on_sqp = B_TRUE; 904 DTRACE_PROBE3(squeue__proc__start, squeue_t *, 905 sqp, mblk_t *, mp, conn_t *, arg); 906 (*proc)(arg, mp, sqp); 907 DTRACE_PROBE2(squeue__proc__end, squeue_t *, 908 sqp, conn_t *, arg); 909 ((conn_t *)arg)->conn_on_sqp = B_FALSE; 910 CONN_DEC_REF((conn_t *)arg); 911 912 mutex_enter(&sqp->sq_lock); 913 sqp->sq_state &= ~SQS_REENTER; 914 mutex_exit(&sqp->sq_lock); 915 return; 916 } 917 /* 918 * Queue is already being processed. Just enqueue 919 * the packet and go away. 920 */ 921 #if SQUEUE_DEBUG 922 mp->b_tag = tag; 923 #endif 924 #if SQUEUE_PROFILE 925 if (SQ_PROFILING(sqp)) { 926 if (servicing_interrupt()) 927 SQSTAT(sqp, sq_nqueued_intr); 928 else 929 SQSTAT(sqp, sq_nqueued_other); 930 if (sqp->sq_stats.sq_max_qlen < sqp->sq_count) 931 sqp->sq_stats.sq_max_qlen = sqp->sq_count; 932 } 933 #endif 934 935 ENQUEUE_MP(sqp, mp, proc, arg); 936 mutex_exit(&sqp->sq_lock); 937 return; 938 } 939 } 940 941 void 942 squeue_enter_nodrain(squeue_t *sqp, mblk_t *mp, sqproc_t proc, void *arg, 943 uint8_t tag) 944 { 945 int interrupt = servicing_interrupt(); 946 boolean_t being_processed; 947 #if SQUEUE_DEBUG 948 conn_t *connp = (conn_t *)arg; 949 #endif 950 #if SQUEUE_PROFILE 951 hrtime_t start, delta; 952 #endif 953 954 ASSERT(proc != NULL); 955 ASSERT(sqp != NULL); 956 ASSERT(mp != NULL); 957 ASSERT(mp->b_next == NULL); 958 ASSERT(!IPCL_IS_TCP(connp) || connp->conn_tcp->tcp_connp == connp); 959 ASSERT(!IPCL_IS_UDP(connp) || connp->conn_udp->udp_connp == connp); 960 
ASSERT(MUTEX_NOT_HELD(&sqp->sq_lock)); 961 962 mutex_enter(&sqp->sq_lock); 963 964 being_processed = (sqp->sq_state & SQS_PROC); 965 if (!being_processed && (sqp->sq_first == NULL)) { 966 /* 967 * Fast-path, ok to process and nothing queued. 968 */ 969 sqp->sq_state |= (SQS_PROC|SQS_FAST); 970 sqp->sq_run = curthread; 971 mutex_exit(&sqp->sq_lock); 972 973 #if SQUEUE_DEBUG 974 sqp->sq_isintr = interrupt; 975 sqp->sq_curmp = mp; 976 sqp->sq_curproc = proc; 977 sqp->sq_connp = connp; 978 mp->b_tag = sqp->sq_tag = tag; 979 #endif 980 981 #if SQUEUE_PROFILE 982 if (SQ_PROFILING(sqp)) { 983 if (interrupt) 984 SQSTAT(sqp, sq_npackets_intr); 985 else 986 SQSTAT(sqp, sq_npackets_other); 987 start = gethrtime(); 988 } 989 #endif 990 991 ((conn_t *)arg)->conn_on_sqp = B_TRUE; 992 DTRACE_PROBE3(squeue__proc__start, squeue_t *, 993 sqp, mblk_t *, mp, conn_t *, arg); 994 (*proc)(arg, mp, sqp); 995 DTRACE_PROBE2(squeue__proc__end, squeue_t *, 996 sqp, conn_t *, arg); 997 ((conn_t *)arg)->conn_on_sqp = B_FALSE; 998 999 #if SQUEUE_DEBUG 1000 sqp->sq_curmp = NULL; 1001 sqp->sq_curproc = NULL; 1002 sqp->sq_connp = NULL; 1003 sqp->sq_isintr = 0; 1004 #endif 1005 #if SQUEUE_PROFILE 1006 if (SQ_PROFILING(sqp)) { 1007 delta = gethrtime() - start; 1008 if (interrupt) 1009 SQDELTA(sqp, sq_time_intr, delta); 1010 else 1011 SQDELTA(sqp, sq_time_other, delta); 1012 } 1013 #endif 1014 1015 CONN_DEC_REF((conn_t *)arg); 1016 mutex_enter(&sqp->sq_lock); 1017 sqp->sq_state &= ~(SQS_PROC|SQS_FAST); 1018 sqp->sq_run = NULL; 1019 if (sqp->sq_first == NULL) { 1020 /* 1021 * We processed inline our packet and 1022 * nothing new has arrived. We are done. 1023 */ 1024 mutex_exit(&sqp->sq_lock); 1025 } else { 1026 SQUEUE_WORKER_WAKEUP(sqp); 1027 } 1028 return; 1029 } else { 1030 /* 1031 * We let a thread processing a squeue reenter only 1032 * once. 
This helps the case of incoming connection 1033 * where a SYN-ACK-ACK that triggers the conn_ind 1034 * doesn't have to queue the packet if listener and 1035 * eager are on the same squeue. Also helps the 1036 * loopback connection where the two ends are bound 1037 * to the same squeue (which is typical on single 1038 * CPU machines). 1039 * We let the thread reenter only once for the fear 1040 * of stack getting blown with multiple traversal. 1041 */ 1042 if (being_processed && !(sqp->sq_state & SQS_REENTER) && 1043 (sqp->sq_run == curthread) && 1044 (((conn_t *)arg)->conn_on_sqp == B_FALSE)) { 1045 sqp->sq_state |= SQS_REENTER; 1046 mutex_exit(&sqp->sq_lock); 1047 1048 ((conn_t *)arg)->conn_on_sqp = B_TRUE; 1049 DTRACE_PROBE3(squeue__proc__start, squeue_t *, 1050 sqp, mblk_t *, mp, conn_t *, arg); 1051 (*proc)(arg, mp, sqp); 1052 DTRACE_PROBE2(squeue__proc__end, squeue_t *, 1053 sqp, conn_t *, arg); 1054 ((conn_t *)arg)->conn_on_sqp = B_FALSE; 1055 CONN_DEC_REF((conn_t *)arg); 1056 1057 mutex_enter(&sqp->sq_lock); 1058 sqp->sq_state &= ~SQS_REENTER; 1059 mutex_exit(&sqp->sq_lock); 1060 return; 1061 } 1062 1063 #if SQUEUE_DEBUG 1064 mp->b_tag = tag; 1065 #endif 1066 #if SQUEUE_PROFILE 1067 if (SQ_PROFILING(sqp)) { 1068 if (servicing_interrupt()) 1069 SQSTAT(sqp, sq_nqueued_intr); 1070 else 1071 SQSTAT(sqp, sq_nqueued_other); 1072 if (sqp->sq_stats.sq_max_qlen < sqp->sq_count) 1073 sqp->sq_stats.sq_max_qlen = sqp->sq_count; 1074 } 1075 #endif 1076 ENQUEUE_MP(sqp, mp, proc, arg); 1077 if (being_processed) { 1078 /* 1079 * Queue is already being processed. 1080 * No need to do anything. 1081 */ 1082 mutex_exit(&sqp->sq_lock); 1083 return; 1084 } 1085 SQUEUE_WORKER_WAKEUP(sqp); 1086 } 1087 } 1088 1089 /* 1090 * squeue_fill() - fill squeue *sqp with mblk *mp with argument of *arg 1091 * without processing the squeue. 
 */
/* ARGSUSED */
void
squeue_fill(squeue_t *sqp, mblk_t *mp, sqproc_t proc, void * arg,
    uint8_t tag)
{
#if SQUEUE_DEBUG
	conn_t *connp = (conn_t *)arg;
#endif
	ASSERT(proc != NULL);
	ASSERT(sqp != NULL);
	ASSERT(mp != NULL);
	ASSERT(mp->b_next == NULL);
	/* A TCP/UDP conn_t must point back at itself through its protocol */
	ASSERT(!IPCL_IS_TCP(connp) || connp->conn_tcp->tcp_connp == connp);
	ASSERT(!IPCL_IS_UDP(connp) || connp->conn_udp->udp_connp == connp);

	ASSERT(MUTEX_NOT_HELD(&sqp->sq_lock));
	mutex_enter(&sqp->sq_lock);
	/* Queue the request; unlike squeue_enter() we never process inline */
	ENQUEUE_MP(sqp, mp, proc, arg);
#if SQUEUE_DEBUG
	mp->b_tag = tag;
#endif
#if SQUEUE_PROFILE
	if (SQ_PROFILING(sqp)) {
		if (servicing_interrupt())
			SQSTAT(sqp, sq_nqueued_intr);
		else
			SQSTAT(sqp, sq_nqueued_other);
		if (sqp->sq_stats.sq_max_qlen < sqp->sq_count)
			sqp->sq_stats.sq_max_qlen = sqp->sq_count;
	}
#endif

	/*
	 * If the queue is already being processed, no need to do anything:
	 * whoever holds SQS_PROC will pick up the request we just queued.
	 */
	if (sqp->sq_state & SQS_PROC) {
		mutex_exit(&sqp->sq_lock);
		return;
	}

	/* Nobody is processing - wake (or schedule a wakeup of) the worker */
	SQUEUE_WORKER_WAKEUP(sqp);
}


/*
 * PRIVATE FUNCTIONS
 */

/*
 * squeue_fire() - timeout(9F) callback armed by squeue_worker() (and by
 * SQUEUE_WORKER_WAKEUP).  If the squeue still has pending work and nobody
 * is currently processing it, signal the worker thread.  Also clears
 * SQS_TMO_PROG when the timeout fires before the arming thread managed to
 * record the timeout id, so the armer knows it has already run.
 */
static void
squeue_fire(void *arg)
{
	squeue_t	*sqp = arg;
	uint_t	state;

	mutex_enter(&sqp->sq_lock);

	state = sqp->sq_state;
	/* No timeout recorded and none being armed: stale callback, bail */
	if (sqp->sq_tid == 0 && !(state & SQS_TMO_PROG)) {
		mutex_exit(&sqp->sq_lock);
		return;
	}

	sqp->sq_tid = 0;
	/*
	 * The timeout fired before we got a chance to set it.
	 * Process it anyway but remove the SQS_TMO_PROG so that
	 * the thread trying to set the timeout knows that it has
	 * already been processed.
	 */
	if (state & SQS_TMO_PROG)
		sqp->sq_state &= ~SQS_TMO_PROG;

	/* Only wake the worker if nobody is already draining the squeue */
	if (!(state & SQS_PROC)) {
		sqp->sq_awaken = lbolt;
		cv_signal(&sqp->sq_async);
	}
	mutex_exit(&sqp->sq_lock);
}

/*
 * squeue_drain() - run every queued request until the queue is empty or
 * the 'expire' deadline (in lbolt ticks; 0 means no deadline) passes.
 *
 * Entry: sq_lock held, SQS_PROC clear (asserted below).  SQS_PROC plus
 * 'proc_type' (SQS_ENTER / SQS_WORKER / ...) are set in sq_state for the
 * duration so other threads queue instead of processing.  The lock is
 * dropped while each request's handler runs and reacquired afterwards to
 * pick up newly arrived work.  Exit: sq_lock held, SQS_PROC cleared.
 */
static void
squeue_drain(squeue_t *sqp, uint_t proc_type, clock_t expire)
{
	mblk_t	*mp;
	mblk_t	*head;
	sqproc_t	proc;
	conn_t	*connp;
	clock_t	start = lbolt;
	clock_t	drain_time;
	timeout_id_t	tid;
	uint_t	cnt;
	uint_t	total_cnt = 0;
	ill_rx_ring_t	*sq_rx_ring = sqp->sq_rx_ring;
	int	interrupt = servicing_interrupt();
	boolean_t	poll_on = B_FALSE;

	ASSERT(mutex_owned(&sqp->sq_lock));
	ASSERT(!(sqp->sq_state & SQS_PROC));

#if SQUEUE_PROFILE
	if (SQ_PROFILING(sqp)) {
		if (interrupt)
			SQSTAT(sqp, sq_ndrains_intr);
		else if (!(proc_type & SQS_WORKER))
			SQSTAT(sqp, sq_ndrains_other);
		else
			SQSTAT(sqp, sq_ndrains_worker);
	}
#endif

	/* We are going to drain; a pending wakeup timeout is now redundant */
	if ((tid = sqp->sq_tid) != 0)
		sqp->sq_tid = 0;

	sqp->sq_state |= SQS_PROC | proc_type;
	/* Detach the whole pending chain so we can process it unlocked */
	head = sqp->sq_first;
	sqp->sq_first = NULL;
	sqp->sq_last = NULL;
	cnt = sqp->sq_count;

	/*
	 * We have backlog built up. Switch to polling mode if the
	 * device underneath allows it. Need to do it only for
	 * drain by non-interrupt thread so interrupts don't
	 * come and disrupt us in between. If it's an interrupt thread,
	 * no need because most devices will not issue another
	 * interrupt till this one returns.
	 */
	if ((sqp->sq_state & SQS_POLL_CAPAB) && !(proc_type & SQS_ENTER) &&
		(sqp->sq_count > squeue_worker_poll_min)) {
		ASSERT(sq_rx_ring != NULL);
		SQS_POLLING_ON(sqp, sq_rx_ring);
		poll_on = B_TRUE;
	}

	mutex_exit(&sqp->sq_lock);

	/* Cancel the redundant timeout now that the lock is dropped */
	if (tid != 0)
		(void) untimeout(tid);
again:
	while ((mp = head) != NULL) {
		head = mp->b_next;
		mp->b_next = NULL;

		/* ENQUEUE_MP stashed proc in b_queue and the conn in b_prev */
		proc = (sqproc_t)mp->b_queue;
		mp->b_queue = NULL;
		connp = (conn_t *)mp->b_prev;
		mp->b_prev = NULL;
#if SQUEUE_DEBUG
		sqp->sq_curmp = mp;
		sqp->sq_curproc = proc;
		sqp->sq_connp = connp;
		sqp->sq_tag = mp->b_tag;
#endif

#if SQUEUE_PROFILE
		if (SQ_PROFILING(sqp)) {
			if (interrupt)
				SQSTAT(sqp, sq_npackets_intr);
			else if (!(proc_type & SQS_WORKER))
				SQSTAT(sqp, sq_npackets_other);
			else
				SQSTAT(sqp, sq_npackets_worker);
		}
#endif

		/*
		 * Run the handler with sq_lock dropped; conn_on_sqp marks
		 * the conn as "being processed on its squeue" and the
		 * queuing reference is dropped once the handler returns.
		 */
		connp->conn_on_sqp = B_TRUE;
		DTRACE_PROBE3(squeue__proc__start, squeue_t *,
		    sqp, mblk_t *, mp, conn_t *, connp);
		(*proc)(connp, mp, sqp);
		DTRACE_PROBE2(squeue__proc__end, squeue_t *,
		    sqp, conn_t *, connp);
		connp->conn_on_sqp = B_FALSE;
		CONN_DEC_REF(connp);
	}


#if SQUEUE_DEBUG
	sqp->sq_curmp = NULL;
	sqp->sq_curproc = NULL;
	sqp->sq_connp = NULL;
#endif

	mutex_enter(&sqp->sq_lock);
	sqp->sq_count -= cnt;
	total_cnt += cnt;

	if (sqp->sq_first != NULL) {
		if (!expire || (lbolt < expire)) {
			/* More arrived and time not expired */
			head = sqp->sq_first;
			sqp->sq_first = NULL;
			sqp->sq_last = NULL;
			cnt = sqp->sq_count;
			mutex_exit(&sqp->sq_lock);
			goto again;
		}

		/*
		 * If we are not the worker thread and we
		 * reached our time limit to do drain,
		 * signal the worker thread to pick
		 * up the work.
		 * If we were the worker thread, then
		 * we take a break to allow an interrupt
		 * or writer to pick up the load.
		 */
		if (proc_type != SQS_WORKER) {
			sqp->sq_awaken = lbolt;
			cv_signal(&sqp->sq_async);
		}
	}

	/*
	 * Try to see if we can get a time estimate to process a packet.
	 * Do it only in interrupt context since less chance of context
	 * switch or pinning etc. to get a better estimate.
	 * The average is an 80/20 exponentially weighted moving average.
	 * NOTE(review): the division assumes total_cnt != 0, i.e. that
	 * squeue_drain() is only entered with at least one queued request;
	 * confirm against callers.
	 */
	if (interrupt && ((drain_time = (lbolt - start)) > 0))
		sqp->sq_avg_drain_time = ((80 * sqp->sq_avg_drain_time) +
		    (20 * (drv_hztousec(drain_time)/total_cnt)))/100;

	sqp->sq_state &= ~(SQS_PROC | proc_type);

	/*
	 * If polling was turned on, turn it off and reduce the default
	 * interrupt blank interval as well to bring new packets in faster
	 * (reduces the latency when there is no backlog).
	 */
	if (poll_on && (sqp->sq_state & SQS_POLL_CAPAB)) {
		ASSERT(sq_rx_ring != NULL);
		SQS_POLLING_OFF(sqp, sq_rx_ring);
	}
}

/*
 * squeue_worker() - body of the per-squeue worker thread.  Loops forever:
 * sleeps on sq_async until there is queued work and no other thread holds
 * SQS_PROC, then drains for at most squeue_workerdrain_tick ticks before
 * deliberately pausing (see comment below).  Never returns.
 */
static void
squeue_worker(squeue_t *sqp)
{
	kmutex_t *lock = &sqp->sq_lock;
	kcondvar_t *async = &sqp->sq_async;
	callb_cpr_t cprinfo;
#if SQUEUE_PROFILE
	hrtime_t start;
#endif

	CALLB_CPR_INIT(&cprinfo, lock, callb_generic_cpr, "nca");
	mutex_enter(lock);

	for (;;) {
		/*
		 * Wait for work we are allowed to take: the queue must be
		 * non-empty and no one else may be processing it.
		 * NOTE(review): the still_wait path re-enters cv_wait
		 * without a matching CALLB_CPR_SAFE_END/SAFE_BEGIN pair;
		 * this appears to rely on the CPR-safe state persisting
		 * across waits - confirm against callb(9F)/CPR docs.
		 */
		while (sqp->sq_first == NULL || (sqp->sq_state & SQS_PROC)) {
			CALLB_CPR_SAFE_BEGIN(&cprinfo);
still_wait:
			cv_wait(async, lock);
			if (sqp->sq_state & SQS_PROC) {
				goto still_wait;
			}
			CALLB_CPR_SAFE_END(&cprinfo, lock);
		}

#if SQUEUE_PROFILE
		if (SQ_PROFILING(sqp)) {
			start = gethrtime();
		}
#endif

		ASSERT(squeue_workerdrain_tick != 0);
		sqp->sq_run = curthread;
		/* Drain with a deadline so the worker cannot hog the squeue */
		squeue_drain(sqp, SQS_WORKER, lbolt + squeue_workerdrain_tick);
		sqp->sq_run = NULL;

		if (sqp->sq_first != NULL) {
			/*
			 * Doing too much processing by the worker thread
			 * in the presence of interrupts can be sub-optimal.
			 * Instead, once a drain is done by the worker thread
			 * for squeue_writerdrain_ms (the reason we are
			 * here), we force a wait for squeue_workerwait_tick
			 * before doing more processing even if sq_wait is
			 * set to 0.
			 *
			 * This can be counterproductive for performance
			 * if the worker thread is the only means to process
			 * the packets (interrupts or writers are not
			 * allowed inside the squeue).
			 */
			if (sqp->sq_tid == 0 &&
			    !(sqp->sq_state & SQS_TMO_PROG)) {
				timeout_id_t	tid;

				/*
				 * SQS_TMO_PROG marks a timeout as "being
				 * armed" so squeue_fire() can tell us if it
				 * fires before we record the id below.
				 */
				sqp->sq_state |= SQS_TMO_PROG;
				mutex_exit(&sqp->sq_lock);
				tid = timeout(squeue_fire, sqp,
				    squeue_workerwait_tick);
				mutex_enter(&sqp->sq_lock);
				/*
				 * Check again if we still need
				 * the timeout
				 */
				if (((sqp->sq_state & (SQS_TMO_PROG|SQS_PROC))
				    == SQS_TMO_PROG) && (sqp->sq_tid == 0) &&
				    (sqp->sq_first != NULL)) {
					sqp->sq_state &= ~SQS_TMO_PROG;
					sqp->sq_awaken = lbolt;
					sqp->sq_tid = tid;
				} else if (sqp->sq_state & SQS_TMO_PROG) {
					/* timeout not needed */
					sqp->sq_state &= ~SQS_TMO_PROG;
					mutex_exit(&(sqp)->sq_lock);
					(void) untimeout(tid);
					mutex_enter(&sqp->sq_lock);
				}
			}
			/* Forced pause; squeue_fire()/others wake us later */
			CALLB_CPR_SAFE_BEGIN(&cprinfo);
			cv_wait(async, lock);
			CALLB_CPR_SAFE_END(&cprinfo, lock);
		}


#if SQUEUE_PROFILE
		if (SQ_PROFILING(sqp)) {
			SQDELTA(sqp, sq_time_worker, gethrtime() - start);
		}
#endif
	}
}

#if SQUEUE_PROFILE
/*
 * squeue_kstat_update() - kstat snapshot callback: copies the per-squeue
 * profiling counters (from ksp->ks_private) into the global squeue_kstat
 * structure.  Read-only; writes are rejected with EACCES.
 */
static int
squeue_kstat_update(kstat_t *ksp, int rw)
{
	struct squeue_kstat *sqsp = &squeue_kstat;
	squeue_t *sqp = ksp->ks_private;

	if (rw == KSTAT_WRITE)
		return (EACCES);

#if SQUEUE_DEBUG
	sqsp->sq_count.value.ui64 = sqp->sq_count;
	sqsp->sq_max_qlen.value.ui64 = sqp->sq_stats.sq_max_qlen;
#endif
	sqsp->sq_npackets_worker.value.ui64 = sqp->sq_stats.sq_npackets_worker;
	sqsp->sq_npackets_intr.value.ui64 = sqp->sq_stats.sq_npackets_intr;
	sqsp->sq_npackets_other.value.ui64 = sqp->sq_stats.sq_npackets_other;
	sqsp->sq_nqueued_intr.value.ui64 = sqp->sq_stats.sq_nqueued_intr;
	sqsp->sq_nqueued_other.value.ui64 = sqp->sq_stats.sq_nqueued_other;
	sqsp->sq_ndrains_worker.value.ui64 = sqp->sq_stats.sq_ndrains_worker;
	sqsp->sq_ndrains_intr.value.ui64 = sqp->sq_stats.sq_ndrains_intr;
	sqsp->sq_ndrains_other.value.ui64 = sqp->sq_stats.sq_ndrains_other;
	sqsp->sq_time_worker.value.ui64 = sqp->sq_stats.sq_time_worker;
	sqsp->sq_time_intr.value.ui64 = sqp->sq_stats.sq_time_intr;
	sqsp->sq_time_other.value.ui64 = sqp->sq_stats.sq_time_other;
	return (0);
}
#endif

/*
 * squeue_profile_enable() - turn on per-squeue profiling (SQS_PROFILE).
 */
void
squeue_profile_enable(squeue_t *sqp)
{
	mutex_enter(&sqp->sq_lock);
	sqp->sq_state |= SQS_PROFILE;
	mutex_exit(&sqp->sq_lock);
}

/*
 * squeue_profile_disable() - turn off per-squeue profiling.
 */
void
squeue_profile_disable(squeue_t *sqp)
{
	mutex_enter(&sqp->sq_lock);
	sqp->sq_state &= ~SQS_PROFILE;
	mutex_exit(&sqp->sq_lock);
}

/*
 * squeue_profile_reset() - zero the accumulated profiling statistics.
 * No-op unless compiled with SQUEUE_PROFILE.
 */
void
squeue_profile_reset(squeue_t *sqp)
{
#if SQUEUE_PROFILE
	bzero(&sqp->sq_stats, sizeof (sqstat_t));
#endif
}

/*
 * squeue_profile_start() - globally enable squeue profiling.
 * No-op unless compiled with SQUEUE_PROFILE.
 */
void
squeue_profile_start(void)
{
#if SQUEUE_PROFILE
	squeue_profile = B_TRUE;
#endif
}

/*
 * squeue_profile_stop() - globally disable squeue profiling.
 * No-op unless compiled with SQUEUE_PROFILE.
 */
void
squeue_profile_stop(void)
{
#if SQUEUE_PROFILE
	squeue_profile = B_FALSE;
#endif
}

/*
 * squeue_getprivate() - return the address of the client-private slot 'p'
 * in the squeue (callers may read or write through it).
 */
uintptr_t *
squeue_getprivate(squeue_t *sqp, sqprivate_t p)
{
	ASSERT(p < SQPRIVATE_MAX);

	return (&sqp->sq_private[p]);
}

/*
 * squeue_binding() - return the CPU id the squeue's worker is bound to
 * (sq_bind), as set up by squeue_create()/squeue_bind().
 */
processorid_t
squeue_binding(squeue_t *sqp)
{
	return (sqp->sq_bind);
}