1 /* 2 * CDDL HEADER START 3 * 4 * The contents of this file are subject to the terms of the 5 * Common Development and Distribution License, Version 1.0 only 6 * (the "License"). You may not use this file except in compliance 7 * with the License. 8 * 9 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE 10 * or http://www.opensolaris.org/os/licensing. 11 * See the License for the specific language governing permissions 12 * and limitations under the License. 13 * 14 * When distributing Covered Code, include this CDDL HEADER in each 15 * file and include the License file at usr/src/OPENSOLARIS.LICENSE. 16 * If applicable, add the following below this CDDL HEADER, with the 17 * fields enclosed by brackets "[]" replaced with your own identifying 18 * information: Portions Copyright [yyyy] [name of copyright owner] 19 * 20 * CDDL HEADER END 21 */ 22 /* 23 * Copyright 2005 Sun Microsystems, Inc. All rights reserved. 24 * Use is subject to license terms. 25 */ 26 27 #pragma ident "%Z%%M% %I% %E% SMI" 28 29 /* 30 * Squeues - TCP/IP serialization mechanism. 31 * 32 * This is a general purpose high-performance serialization mechanism. It is 33 * similar to a taskq with a single worker thread, the difference is that it 34 * does not imply a context switch - the thread placing a request may actually 35 * process it. It is also biased for processing requests in interrupt context. 36 * 37 * Each squeue has a worker thread which may optionally be bound to a CPU. 38 * 39 * Only one thread may process requests from a given squeue at any time. This is 40 * called "entering" squeue. 41 * 42 * Each dispatched request is processed either by 43 * 44 * a) Dispatching thread or 45 * b) Some other thread that is currently processing squeue at the time of 46 * request or 47 * c) worker thread. 48 * 49 * INTERFACES: 50 * 51 * squeue_t *squeue_create(name, bind, wait, pri) 52 * 53 * name: symbolic name for squeue. 
 *	wait: time to wait before waking the worker thread after queueing
 *		request.
 *	bind: preferred CPU binding for the worker thread.
 *	pri:  thread priority for the worker thread.
 *
 *   This function never fails and may sleep. It returns a transparent pointer
 *   to the squeue_t structure that is passed to all other squeue operations.
 *
 * void squeue_bind(sqp, bind)
 *
 *   Bind squeue worker thread to a CPU specified by the 'bind' argument. The
 *   'bind' value of -1 binds to the preferred CPU specified for
 *   squeue_create.
 *
 *   NOTE: Any value of 'bind' other than -1 is not supported currently, but the
 *	 API is present - in the future it may be useful to specify different
 *	 binding.
 *
 * void squeue_unbind(sqp)
 *
 *   Unbind the worker thread from its preferred CPU.
 *
 * void squeue_enter(*sqp, *mp, proc, arg, tag)
 *
 *   Post a single request for processing. Each request consists of mblock 'mp',
 *   function 'proc' to execute and an argument 'arg' to pass to this
 *   function. The function is called as (*proc)(arg, mp, sqp); The tag is an
 *   arbitrary number from 0 to 255 which will be stored in mp to track exact
 *   caller of squeue_enter. The combination of function name and the tag should
 *   provide enough information to identify the caller.
 *
 *   If no one is processing the squeue, squeue_enter() will call the function
 *   immediately. Otherwise it will add the request to the queue for later
 *   processing. Once the function is executed, the thread may continue
 *   executing all other requests pending on the queue.
 *
 *   NOTE: The tagging information is only used when SQUEUE_DEBUG is set to 1.
 *   NOTE: The argument can be conn_t only. Ideally we'd like to have generic
 *	   argument, but we want to drop connection reference count here - this
 *	   improves tail-call optimizations.
 *	   XXX: The arg should have type conn_t.
 *
 * void squeue_enter_nodrain(*sqp, *mp, proc, arg, tag)
 *
 *   Same as squeue_enter(), but the entering thread will only try to execute a
 *   single request. It will not continue executing any pending requests.
 *
 * void squeue_fill(*sqp, *mp, proc, arg, tag)
 *
 *   Just place the request on the queue without trying to execute it. Arrange
 *   for the worker thread to process the request.
 *
 * void squeue_profile_enable(sqp)
 * void squeue_profile_disable(sqp)
 *
 *    Enable or disable profiling for specified 'sqp'. Profiling is only
 *    available when SQUEUE_PROFILE is set.
 *
 * void squeue_profile_reset(sqp)
 *
 *    Reset all profiling information to zero. Profiling is only
 *    available when SQUEUE_PROFILE is set.
 *
 * void squeue_profile_start()
 * void squeue_profile_stop()
 *
 *    Globally enable or disable profiling for all squeues.
 *
 * uintptr_t *squeue_getprivate(sqp, p)
 *
 *    Each squeue keeps a small amount of private data space available for
 *    various consumers. Current consumers include TCP and NCA. Other consumers
 *    need to add their private tag to the sqprivate_t enum. The private
 *    information is limited to an uintptr_t value. The squeue has no knowledge
 *    of its content and does not manage it in any way.
 *
 *    The typical use may be a breakdown of data structures per CPU (since
 *    squeues are usually per CPU). See NCA for examples of use.
 *    Currently 'p' may have one legal value SQPRIVATE_TCP.
 *
 * processorid_t squeue_binding(sqp)
 *
 *    Returns the CPU binding for a given squeue.
 *
 * TUNABLES:
 *
 * squeue_intrdrain_ms: Maximum time in ms interrupts spend draining any
 *	squeue. Note that this is an approximation - squeues have no control on
 *	the time it takes to process each request. This limit is only checked
 *	between processing individual messages.
 *    Default: 20 ms.
 *
 * squeue_writerdrain_ms: Maximum time in ms non-interrupts spend draining any
 *	squeue. Note that this is an approximation - squeues have no control on
 *	the time it takes to process each request. This limit is only checked
 *	between processing individual messages.
 *    Default: 10 ms.
 *
 * squeue_workerdrain_ms: Maximum time in ms worker thread spends draining any
 *	squeue. Note that this is an approximation - squeues have no control on
 *	the time it takes to process each request. This limit is only checked
 *	between processing individual messages.
 *    Default: 10 ms.
 *
 * squeue_workerwait_ms: When worker thread is interrupted because workerdrain
 *	expired, how much time to wait before waking worker thread again.
 *    Default: 10 ms.
 *
 * DEFINES:
 *
 * SQUEUE_DEBUG: If defined as 1, special code is compiled in which records
 *	additional information aiding debugging in the squeue.
 *
 * SQUEUE_PROFILE: If defined as 1, special code is compiled in which collects
 *	various squeue statistics and exports them as kstats.
 *
 * Ideally we would like both SQUEUE_DEBUG and SQUEUE_PROFILE to be always set,
 * but it affects performance, so they are enabled on DEBUG kernels and disabled
 * on non-DEBUG by default.
 */

#include <sys/types.h>
#include <sys/cmn_err.h>
#include <sys/debug.h>
#include <sys/kmem.h>
#include <sys/cpuvar.h>
#include <sys/condvar_impl.h>
#include <sys/systm.h>
#include <sys/callb.h>
#include <sys/sdt.h>
#include <sys/ddi.h>

#include <inet/ipclassifier.h>

/*
 * State flags.
 * Note: The MDB IP module depends on the values of these flags.
 */
#define	SQS_PROC	0x0001	/* being processed */
#define	SQS_WORKER	0x0002	/* worker thread */
#define	SQS_ENTER	0x0004	/* enter thread */
#define	SQS_FAST	0x0008	/* enter-fast thread */
#define	SQS_USER	0x0010	/* A non interrupt user */
#define	SQS_BOUND	0x0020	/* Worker thread is bound */
#define	SQS_PROFILE	0x0040	/* Enable profiling */
#define	SQS_REENTER	0x0080	/* Re entered thread */
#define	SQS_TMO_PROG	0x0100	/* Timeout is being set */

/*
 * Debugging and profiling support is compiled in on DEBUG kernels only.
 */
#ifdef DEBUG
#define	SQUEUE_DEBUG 1
#define	SQUEUE_PROFILE 1
#else
#define	SQUEUE_DEBUG 0
#define	SQUEUE_PROFILE 0
#endif

#include <sys/squeue_impl.h>

/* Forward declarations of the file-private routines. */
static void squeue_fire(void *);
static void squeue_drain(squeue_t *, uint_t, clock_t);
static void squeue_worker(squeue_t *sqp);

#if SQUEUE_PROFILE
static kmutex_t squeue_kstat_lock;
static int  squeue_kstat_update(kstat_t *, int);
#endif

/* kmem cache from which squeue_t structures are allocated. */
kmem_cache_t *squeue_cache;

/* Tunables, expressed in milliseconds. */
int squeue_intrdrain_ms = 20;
int squeue_writerdrain_ms = 10;
int squeue_workerdrain_ms = 10;
int squeue_workerwait_ms = 10;

/* The values above converted to ticks */
static int squeue_intrdrain_tick = 0;
static int squeue_writerdrain_tick = 0;
static int squeue_workerdrain_tick = 0;
static int squeue_workerwait_tick = 0;

/*
 * The minimum number of packets queued at which a drain by the worker
 * thread triggers polling (if the squeue allows it). The choice of 3 is
 * arbitrary. You definitely don't want it to be 1 since that will trigger
 * polling on very low loads as well (ssh seems to be one such example,
 * where packet flow was very low yet somehow 1 packet ended up getting
 * queued, the worker thread fired every 10ms and blanking also got
 * triggered).
 */
int squeue_worker_poll_min = 3;

#if SQUEUE_PROFILE
/*
 * Set to B_TRUE to enable profiling.
248 */ 249 static int squeue_profile = B_FALSE; 250 #define SQ_PROFILING(sqp) (squeue_profile && ((sqp)->sq_state & SQS_PROFILE)) 251 252 #define SQSTAT(sqp, x) ((sqp)->sq_stats.x++) 253 #define SQDELTA(sqp, x, d) ((sqp)->sq_stats.x += (d)) 254 255 struct squeue_kstat { 256 kstat_named_t sq_count; 257 kstat_named_t sq_max_qlen; 258 kstat_named_t sq_npackets_worker; 259 kstat_named_t sq_npackets_intr; 260 kstat_named_t sq_npackets_other; 261 kstat_named_t sq_nqueued_intr; 262 kstat_named_t sq_nqueued_other; 263 kstat_named_t sq_ndrains_worker; 264 kstat_named_t sq_ndrains_intr; 265 kstat_named_t sq_ndrains_other; 266 kstat_named_t sq_time_worker; 267 kstat_named_t sq_time_intr; 268 kstat_named_t sq_time_other; 269 } squeue_kstat = { 270 { "count", KSTAT_DATA_UINT64 }, 271 { "max_qlen", KSTAT_DATA_UINT64 }, 272 { "packets_worker", KSTAT_DATA_UINT64 }, 273 { "packets_intr", KSTAT_DATA_UINT64 }, 274 { "packets_other", KSTAT_DATA_UINT64 }, 275 { "queued_intr", KSTAT_DATA_UINT64 }, 276 { "queued_other", KSTAT_DATA_UINT64 }, 277 { "ndrains_worker", KSTAT_DATA_UINT64 }, 278 { "ndrains_intr", KSTAT_DATA_UINT64 }, 279 { "ndrains_other", KSTAT_DATA_UINT64 }, 280 { "time_worker", KSTAT_DATA_UINT64 }, 281 { "time_intr", KSTAT_DATA_UINT64 }, 282 { "time_other", KSTAT_DATA_UINT64 }, 283 }; 284 #endif 285 286 #define SQUEUE_WORKER_WAKEUP(sqp) { \ 287 timeout_id_t tid = (sqp)->sq_tid; \ 288 \ 289 ASSERT(MUTEX_HELD(&(sqp)->sq_lock)); \ 290 /* \ 291 * Queue isn't being processed, so take \ 292 * any post enqueue actions needed before leaving. \ 293 */ \ 294 if (tid != 0) { \ 295 /* \ 296 * Waiting for an enter() to process mblk(s). \ 297 */ \ 298 clock_t waited = lbolt - (sqp)->sq_awaken; \ 299 \ 300 if (TICK_TO_MSEC(waited) >= (sqp)->sq_wait) { \ 301 /* \ 302 * Times up and have a worker thread \ 303 * waiting for work, so schedule it. 
\ 304 */ \ 305 (sqp)->sq_tid = 0; \ 306 (sqp)->sq_awaken = lbolt; \ 307 cv_signal(&(sqp)->sq_async); \ 308 mutex_exit(&(sqp)->sq_lock); \ 309 (void) untimeout(tid); \ 310 return; \ 311 } \ 312 mutex_exit(&(sqp)->sq_lock); \ 313 return; \ 314 } else if ((sqp)->sq_state & SQS_TMO_PROG) { \ 315 mutex_exit(&(sqp)->sq_lock); \ 316 return; \ 317 } else if ((sqp)->sq_wait != 0) { \ 318 clock_t wait = (sqp)->sq_wait; \ 319 /* \ 320 * Wait up to sqp->sq_wait ms for an \ 321 * enter() to process this queue. We \ 322 * don't want to contend on timeout locks \ 323 * with sq_lock held for performance reasons, \ 324 * so drop the sq_lock before calling timeout \ 325 * but we need to check if timeout is required \ 326 * after re acquiring the sq_lock. Once \ 327 * the sq_lock is dropped, someone else could \ 328 * have processed the packet or the timeout could \ 329 * have already fired. \ 330 */ \ 331 (sqp)->sq_state |= SQS_TMO_PROG; \ 332 mutex_exit(&(sqp)->sq_lock); \ 333 tid = timeout(squeue_fire, (sqp), wait); \ 334 mutex_enter(&(sqp)->sq_lock); \ 335 /* Check again if we still need the timeout */ \ 336 if ((((sqp)->sq_state & (SQS_PROC|SQS_TMO_PROG)) == \ 337 SQS_TMO_PROG) && ((sqp)->sq_tid == 0) && \ 338 ((sqp)->sq_first != NULL)) { \ 339 (sqp)->sq_state &= ~SQS_TMO_PROG; \ 340 (sqp)->sq_awaken = lbolt; \ 341 (sqp)->sq_tid = tid; \ 342 mutex_exit(&(sqp)->sq_lock); \ 343 return; \ 344 } else { \ 345 if ((sqp)->sq_state & SQS_TMO_PROG) { \ 346 (sqp)->sq_state &= ~SQS_TMO_PROG; \ 347 mutex_exit(&(sqp)->sq_lock); \ 348 (void) untimeout(tid); \ 349 } else { \ 350 /* \ 351 * The timer fired before we could \ 352 * reacquire the sq_lock. squeue_fire \ 353 * removes the SQS_TMO_PROG flag \ 354 * and we don't need to do anything \ 355 * else. \ 356 */ \ 357 mutex_exit(&(sqp)->sq_lock); \ 358 } \ 359 } \ 360 } else { \ 361 /* \ 362 * Schedule the worker thread. 
\ 363 */ \ 364 (sqp)->sq_awaken = lbolt; \ 365 cv_signal(&(sqp)->sq_async); \ 366 mutex_exit(&(sqp)->sq_lock); \ 367 } \ 368 ASSERT(MUTEX_NOT_HELD(&(sqp)->sq_lock)); \ 369 } 370 371 #define ENQUEUE_MP(sqp, mp, proc, arg) { \ 372 /* \ 373 * Enque our mblk. \ 374 */ \ 375 (mp)->b_queue = NULL; \ 376 ASSERT(MUTEX_HELD(&(sqp)->sq_lock)); \ 377 ASSERT((mp)->b_prev == NULL && (mp)->b_next == NULL); \ 378 (mp)->b_queue = (queue_t *)(proc); \ 379 (mp)->b_prev = (mblk_t *)(arg); \ 380 \ 381 if ((sqp)->sq_last != NULL) \ 382 (sqp)->sq_last->b_next = (mp); \ 383 else \ 384 (sqp)->sq_first = (mp); \ 385 (sqp)->sq_last = (mp); \ 386 (sqp)->sq_count++; \ 387 ASSERT((sqp)->sq_count > 0); \ 388 DTRACE_PROBE2(squeue__enqueue, squeue_t *, sqp, \ 389 mblk_t *, mp); \ 390 } 391 392 393 #define ENQUEUE_CHAIN(sqp, mp, tail, cnt) { \ 394 /* \ 395 * Enqueue our mblk chain. \ 396 */ \ 397 ASSERT(MUTEX_HELD(&(sqp)->sq_lock)); \ 398 \ 399 if ((sqp)->sq_last != NULL) \ 400 (sqp)->sq_last->b_next = (mp); \ 401 else \ 402 (sqp)->sq_first = (mp); \ 403 (sqp)->sq_last = (tail); \ 404 (sqp)->sq_count += (cnt); \ 405 ASSERT((sqp)->sq_count > 0); \ 406 DTRACE_PROBE4(squeue__enqueuechain, squeue_t *, sqp, \ 407 mblk_t *, mp, mblk_t *, tail, int, cnt); \ 408 \ 409 } 410 411 #define SQS_POLLING_ON(sqp, rx_ring) { \ 412 ASSERT(rx_ring != NULL); \ 413 ASSERT(MUTEX_HELD(&(sqp)->sq_lock)); \ 414 rx_ring->rr_blank(rx_ring->rr_handle, \ 415 MIN((sqp->sq_avg_drain_time * sqp->sq_count), \ 416 rx_ring->rr_max_blank_time), \ 417 rx_ring->rr_max_pkt_cnt); \ 418 rx_ring->rr_poll_state |= ILL_POLLING; \ 419 rx_ring->rr_poll_time = lbolt; \ 420 } 421 422 423 #define SQS_POLLING_OFF(sqp, rx_ring) { \ 424 ASSERT(rx_ring != NULL); \ 425 ASSERT(MUTEX_HELD(&(sqp)->sq_lock)); \ 426 rx_ring->rr_blank(rx_ring->rr_handle, \ 427 rx_ring->rr_min_blank_time, \ 428 rx_ring->rr_min_pkt_cnt); \ 429 } 430 431 void 432 squeue_init(void) 433 { 434 squeue_cache = kmem_cache_create("squeue_cache", 435 sizeof (squeue_t), 64, NULL, 
NULL, NULL, NULL, NULL, 0); 436 437 squeue_intrdrain_tick = MSEC_TO_TICK_ROUNDUP(squeue_intrdrain_ms); 438 squeue_writerdrain_tick = MSEC_TO_TICK_ROUNDUP(squeue_writerdrain_ms); 439 squeue_workerdrain_tick = MSEC_TO_TICK_ROUNDUP(squeue_workerdrain_ms); 440 squeue_workerwait_tick = MSEC_TO_TICK_ROUNDUP(squeue_workerwait_ms); 441 } 442 443 /* ARGSUSED */ 444 squeue_t * 445 squeue_create(char *name, processorid_t bind, clock_t wait, pri_t pri) 446 { 447 squeue_t *sqp = kmem_cache_alloc(squeue_cache, KM_SLEEP); 448 449 bzero(sqp, sizeof (squeue_t)); 450 (void) strncpy(sqp->sq_name, name, SQ_NAMELEN + 1); 451 sqp->sq_name[SQ_NAMELEN] = '\0'; 452 453 sqp->sq_bind = bind; 454 sqp->sq_wait = MSEC_TO_TICK(wait); 455 sqp->sq_avg_drain_time = 456 drv_hztousec(squeue_intrdrain_tick)/squeue_intrdrain_tick; 457 458 #if SQUEUE_PROFILE 459 if ((sqp->sq_kstat = kstat_create("ip", bind, name, 460 "net", KSTAT_TYPE_NAMED, 461 sizeof (squeue_kstat) / sizeof (kstat_named_t), 462 KSTAT_FLAG_VIRTUAL)) != NULL) { 463 sqp->sq_kstat->ks_lock = &squeue_kstat_lock; 464 sqp->sq_kstat->ks_data = &squeue_kstat; 465 sqp->sq_kstat->ks_update = squeue_kstat_update; 466 sqp->sq_kstat->ks_private = sqp; 467 kstat_install(sqp->sq_kstat); 468 } 469 #endif 470 471 sqp->sq_worker = thread_create(NULL, 0, squeue_worker, 472 sqp, 0, &p0, TS_RUN, pri); 473 474 return (sqp); 475 } 476 477 /* ARGSUSED */ 478 void 479 squeue_bind(squeue_t *sqp, processorid_t bind) 480 { 481 ASSERT(bind == -1); 482 483 mutex_enter(&sqp->sq_lock); 484 if (sqp->sq_state & SQS_BOUND) { 485 mutex_exit(&sqp->sq_lock); 486 return; 487 } 488 489 sqp->sq_state |= SQS_BOUND; 490 mutex_exit(&sqp->sq_lock); 491 492 thread_affinity_set(sqp->sq_worker, sqp->sq_bind); 493 } 494 495 void 496 squeue_unbind(squeue_t *sqp) 497 { 498 mutex_enter(&sqp->sq_lock); 499 if (!(sqp->sq_state & SQS_BOUND)) { 500 mutex_exit(&sqp->sq_lock); 501 return; 502 } 503 504 sqp->sq_state &= ~SQS_BOUND; 505 mutex_exit(&sqp->sq_lock); 506 507 
thread_affinity_clear(sqp->sq_worker); 508 } 509 510 /* 511 * squeue_enter() - enter squeue sqp with mblk mp (which can be 512 * a chain), while tail points to the end and cnt in number of 513 * mblks in the chain. 514 * 515 * For a chain of single packet (i.e. mp == tail), go through the 516 * fast path if no one is processing the squeue and nothing is queued. 517 * 518 * The proc and arg for each mblk is already stored in the mblk in 519 * appropriate places. 520 */ 521 void 522 squeue_enter_chain(squeue_t *sqp, mblk_t *mp, mblk_t *tail, 523 uint32_t cnt, uint8_t tag) 524 { 525 int interrupt = servicing_interrupt(); 526 void *arg; 527 sqproc_t proc; 528 #if SQUEUE_PROFILE 529 hrtime_t start, delta; 530 #endif 531 532 ASSERT(sqp != NULL); 533 ASSERT(mp != NULL); 534 ASSERT(tail != NULL); 535 ASSERT(cnt > 0); 536 ASSERT(MUTEX_NOT_HELD(&sqp->sq_lock)); 537 538 mutex_enter(&sqp->sq_lock); 539 if (!(sqp->sq_state & SQS_PROC)) { 540 /* 541 * See if anything is already queued. If we are the 542 * first packet, do inline processing else queue the 543 * packet and do the drain. 544 */ 545 sqp->sq_run = curthread; 546 if (sqp->sq_first == NULL && cnt == 1) { 547 /* 548 * Fast-path, ok to process and nothing queued. 549 */ 550 sqp->sq_state |= (SQS_PROC|SQS_FAST); 551 mutex_exit(&sqp->sq_lock); 552 553 /* 554 * We are the chain of 1 packet so 555 * go through this fast path. 
556 */ 557 arg = mp->b_prev; 558 mp->b_prev = NULL; 559 proc = (sqproc_t)mp->b_queue; 560 mp->b_queue = NULL; 561 562 ASSERT(proc != NULL); 563 ASSERT(arg != NULL); 564 ASSERT(mp->b_next == NULL); 565 566 #if SQUEUE_DEBUG 567 sqp->sq_isintr = interrupt; 568 sqp->sq_curmp = mp; 569 sqp->sq_curproc = proc; 570 sqp->sq_connp = arg; 571 mp->b_tag = sqp->sq_tag = tag; 572 #endif 573 #if SQUEUE_PROFILE 574 if (SQ_PROFILING(sqp)) { 575 if (interrupt) 576 SQSTAT(sqp, sq_npackets_intr); 577 else 578 SQSTAT(sqp, sq_npackets_other); 579 start = gethrtime(); 580 } 581 #endif 582 ((conn_t *)arg)->conn_on_sqp = B_TRUE; 583 DTRACE_PROBE3(squeue__proc__start, squeue_t *, 584 sqp, mblk_t *, mp, conn_t *, arg); 585 (*proc)(arg, mp, sqp); 586 DTRACE_PROBE2(squeue__proc__end, squeue_t *, 587 sqp, conn_t *, arg); 588 ((conn_t *)arg)->conn_on_sqp = B_FALSE; 589 590 #if SQUEUE_PROFILE 591 if (SQ_PROFILING(sqp)) { 592 delta = gethrtime() - start; 593 if (interrupt) 594 SQDELTA(sqp, sq_time_intr, delta); 595 else 596 SQDELTA(sqp, sq_time_other, delta); 597 } 598 #endif 599 #if SQUEUE_DEBUG 600 sqp->sq_curmp = NULL; 601 sqp->sq_curproc = NULL; 602 sqp->sq_connp = NULL; 603 sqp->sq_isintr = 0; 604 #endif 605 606 CONN_DEC_REF((conn_t *)arg); 607 ASSERT(MUTEX_NOT_HELD(&sqp->sq_lock)); 608 mutex_enter(&sqp->sq_lock); 609 sqp->sq_state &= ~(SQS_PROC|SQS_FAST); 610 if (sqp->sq_first == NULL) { 611 /* 612 * We processed inline our packet and 613 * nothing new has arrived. We are done. 614 */ 615 sqp->sq_run = NULL; 616 mutex_exit(&sqp->sq_lock); 617 return; 618 } else if (sqp->sq_bind != CPU->cpu_id) { 619 /* 620 * If the current thread is not running 621 * on the CPU to which this squeue is bound, 622 * then don't allow it to drain. 
623 */ 624 sqp->sq_run = NULL; 625 SQUEUE_WORKER_WAKEUP(sqp); 626 return; 627 } 628 } else { 629 ENQUEUE_CHAIN(sqp, mp, tail, cnt); 630 #if SQUEUE_DEBUG 631 mp->b_tag = tag; 632 #endif 633 #if SQUEUE_PROFILE 634 if (SQ_PROFILING(sqp)) { 635 if (servicing_interrupt()) 636 SQSTAT(sqp, sq_nqueued_intr); 637 else 638 SQSTAT(sqp, sq_nqueued_other); 639 if (sqp->sq_stats.sq_max_qlen < sqp->sq_count) 640 sqp->sq_stats.sq_max_qlen = 641 sqp->sq_count; 642 } 643 #endif 644 } 645 646 /* 647 * We are here because either we couldn't do inline 648 * processing (because something was already queued), 649 * or we had a chanin of more than one packet, 650 * or something else arrived after we were done with 651 * inline processing. 652 */ 653 ASSERT(MUTEX_HELD(&sqp->sq_lock)); 654 ASSERT(sqp->sq_first != NULL); 655 656 #if SQUEUE_PROFILE 657 if (SQ_PROFILING(sqp)) { 658 start = gethrtime(); 659 } 660 #endif 661 #if SQUEUE_DEBUG 662 sqp->sq_isintr = interrupt; 663 #endif 664 665 if (interrupt) { 666 squeue_drain(sqp, SQS_ENTER, lbolt + 667 squeue_intrdrain_tick); 668 } else { 669 squeue_drain(sqp, SQS_USER, lbolt + 670 squeue_writerdrain_tick); 671 } 672 673 #if SQUEUE_PROFILE 674 if (SQ_PROFILING(sqp)) { 675 delta = gethrtime() - start; 676 if (interrupt) 677 SQDELTA(sqp, sq_time_intr, delta); 678 else 679 SQDELTA(sqp, sq_time_other, delta); 680 } 681 #endif 682 #if SQUEUE_DEBUG 683 sqp->sq_isintr = 0; 684 #endif 685 686 /* 687 * If we didn't do a complete drain, the worker 688 * thread was already signalled by squeue_drain. 689 */ 690 sqp->sq_run = NULL; 691 mutex_exit(&sqp->sq_lock); 692 return; 693 } else { 694 ASSERT(sqp->sq_run != NULL); 695 /* 696 * Queue is already being processed. Just enqueue 697 * the packet and go away. 
698 */ 699 #if SQUEUE_DEBUG 700 mp->b_tag = tag; 701 #endif 702 #if SQUEUE_PROFILE 703 if (SQ_PROFILING(sqp)) { 704 if (servicing_interrupt()) 705 SQSTAT(sqp, sq_nqueued_intr); 706 else 707 SQSTAT(sqp, sq_nqueued_other); 708 if (sqp->sq_stats.sq_max_qlen < sqp->sq_count) 709 sqp->sq_stats.sq_max_qlen = sqp->sq_count; 710 } 711 #endif 712 713 ENQUEUE_CHAIN(sqp, mp, tail, cnt); 714 mutex_exit(&sqp->sq_lock); 715 return; 716 } 717 } 718 719 /* 720 * squeue_enter() - enter squeue *sqp with mblk *mp with argument of *arg. 721 */ 722 void 723 squeue_enter(squeue_t *sqp, mblk_t *mp, sqproc_t proc, void *arg, 724 uint8_t tag) 725 { 726 int interrupt = servicing_interrupt(); 727 #if SQUEUE_PROFILE 728 hrtime_t start, delta; 729 #endif 730 #if SQUEUE_DEBUG 731 conn_t *connp = (conn_t *)arg; 732 ASSERT(connp->conn_tcp->tcp_connp == connp); 733 #endif 734 735 ASSERT(proc != NULL); 736 ASSERT(sqp != NULL); 737 ASSERT(mp != NULL); 738 ASSERT(mp->b_next == NULL); 739 ASSERT(MUTEX_NOT_HELD(&sqp->sq_lock)); 740 741 mutex_enter(&sqp->sq_lock); 742 if (!(sqp->sq_state & SQS_PROC)) { 743 /* 744 * See if anything is already queued. If we are the 745 * first packet, do inline processing else queue the 746 * packet and do the drain. 747 */ 748 sqp->sq_run = curthread; 749 if (sqp->sq_first == NULL) { 750 /* 751 * Fast-path, ok to process and nothing queued. 
752 */ 753 sqp->sq_state |= (SQS_PROC|SQS_FAST); 754 mutex_exit(&sqp->sq_lock); 755 756 #if SQUEUE_DEBUG 757 sqp->sq_isintr = interrupt; 758 sqp->sq_curmp = mp; 759 sqp->sq_curproc = proc; 760 sqp->sq_connp = connp; 761 mp->b_tag = sqp->sq_tag = tag; 762 #endif 763 #if SQUEUE_PROFILE 764 if (SQ_PROFILING(sqp)) { 765 if (interrupt) 766 SQSTAT(sqp, sq_npackets_intr); 767 else 768 SQSTAT(sqp, sq_npackets_other); 769 start = gethrtime(); 770 } 771 #endif 772 ((conn_t *)arg)->conn_on_sqp = B_TRUE; 773 DTRACE_PROBE3(squeue__proc__start, squeue_t *, 774 sqp, mblk_t *, mp, conn_t *, arg); 775 (*proc)(arg, mp, sqp); 776 DTRACE_PROBE2(squeue__proc__end, squeue_t *, 777 sqp, conn_t *, arg); 778 ((conn_t *)arg)->conn_on_sqp = B_FALSE; 779 780 #if SQUEUE_PROFILE 781 if (SQ_PROFILING(sqp)) { 782 delta = gethrtime() - start; 783 if (interrupt) 784 SQDELTA(sqp, sq_time_intr, delta); 785 else 786 SQDELTA(sqp, sq_time_other, delta); 787 } 788 #endif 789 #if SQUEUE_DEBUG 790 sqp->sq_curmp = NULL; 791 sqp->sq_curproc = NULL; 792 sqp->sq_connp = NULL; 793 sqp->sq_isintr = 0; 794 #endif 795 796 CONN_DEC_REF((conn_t *)arg); 797 ASSERT(MUTEX_NOT_HELD(&sqp->sq_lock)); 798 mutex_enter(&sqp->sq_lock); 799 sqp->sq_state &= ~(SQS_PROC|SQS_FAST); 800 if (sqp->sq_first == NULL) { 801 /* 802 * We processed inline our packet and 803 * nothing new has arrived. We are done. 804 */ 805 sqp->sq_run = NULL; 806 mutex_exit(&sqp->sq_lock); 807 return; 808 } else if (sqp->sq_bind != CPU->cpu_id) { 809 /* 810 * If the current thread is not running 811 * on the CPU to which this squeue is bound, 812 * then don't allow it to drain. 
813 */ 814 sqp->sq_run = NULL; 815 SQUEUE_WORKER_WAKEUP(sqp); 816 return; 817 } 818 } else { 819 ENQUEUE_MP(sqp, mp, proc, arg); 820 #if SQUEUE_DEBUG 821 mp->b_tag = tag; 822 #endif 823 #if SQUEUE_PROFILE 824 if (SQ_PROFILING(sqp)) { 825 if (servicing_interrupt()) 826 SQSTAT(sqp, sq_nqueued_intr); 827 else 828 SQSTAT(sqp, sq_nqueued_other); 829 if (sqp->sq_stats.sq_max_qlen < sqp->sq_count) 830 sqp->sq_stats.sq_max_qlen = 831 sqp->sq_count; 832 } 833 #endif 834 } 835 836 /* 837 * We are here because either we couldn't do inline 838 * processing (because something was already queued) 839 * or something else arrived after we were done with 840 * inline processing. 841 */ 842 ASSERT(MUTEX_HELD(&sqp->sq_lock)); 843 ASSERT(sqp->sq_first != NULL); 844 845 #if SQUEUE_PROFILE 846 if (SQ_PROFILING(sqp)) { 847 start = gethrtime(); 848 } 849 #endif 850 #if SQUEUE_DEBUG 851 sqp->sq_isintr = interrupt; 852 #endif 853 854 if (interrupt) { 855 squeue_drain(sqp, SQS_ENTER, lbolt + 856 squeue_intrdrain_tick); 857 } else { 858 squeue_drain(sqp, SQS_USER, lbolt + 859 squeue_writerdrain_tick); 860 } 861 862 #if SQUEUE_PROFILE 863 if (SQ_PROFILING(sqp)) { 864 delta = gethrtime() - start; 865 if (interrupt) 866 SQDELTA(sqp, sq_time_intr, delta); 867 else 868 SQDELTA(sqp, sq_time_other, delta); 869 } 870 #endif 871 #if SQUEUE_DEBUG 872 sqp->sq_isintr = 0; 873 #endif 874 875 /* 876 * If we didn't do a complete drain, the worker 877 * thread was already signalled by squeue_drain. 878 */ 879 sqp->sq_run = NULL; 880 mutex_exit(&sqp->sq_lock); 881 return; 882 } else { 883 ASSERT(sqp->sq_run != NULL); 884 /* 885 * We let a thread processing a squeue reenter only 886 * once. This helps the case of incoming connection 887 * where a SYN-ACK-ACK that triggers the conn_ind 888 * doesn't have to queue the packet if listener and 889 * eager are on the same squeue. 
Also helps the 890 * loopback connection where the two ends are bound 891 * to the same squeue (which is typical on single 892 * CPU machines). 893 * We let the thread reenter only once for the fear 894 * of stack getting blown with multiple traversal. 895 */ 896 if (!(sqp->sq_state & SQS_REENTER) && 897 (sqp->sq_run == curthread) && 898 (((conn_t *)arg)->conn_on_sqp == B_FALSE)) { 899 sqp->sq_state |= SQS_REENTER; 900 mutex_exit(&sqp->sq_lock); 901 902 ((conn_t *)arg)->conn_on_sqp = B_TRUE; 903 DTRACE_PROBE3(squeue__proc__start, squeue_t *, 904 sqp, mblk_t *, mp, conn_t *, arg); 905 (*proc)(arg, mp, sqp); 906 DTRACE_PROBE2(squeue__proc__end, squeue_t *, 907 sqp, conn_t *, arg); 908 ((conn_t *)arg)->conn_on_sqp = B_FALSE; 909 CONN_DEC_REF((conn_t *)arg); 910 911 mutex_enter(&sqp->sq_lock); 912 sqp->sq_state &= ~SQS_REENTER; 913 mutex_exit(&sqp->sq_lock); 914 return; 915 } 916 /* 917 * Queue is already being processed. Just enqueue 918 * the packet and go away. 919 */ 920 #if SQUEUE_DEBUG 921 mp->b_tag = tag; 922 #endif 923 #if SQUEUE_PROFILE 924 if (SQ_PROFILING(sqp)) { 925 if (servicing_interrupt()) 926 SQSTAT(sqp, sq_nqueued_intr); 927 else 928 SQSTAT(sqp, sq_nqueued_other); 929 if (sqp->sq_stats.sq_max_qlen < sqp->sq_count) 930 sqp->sq_stats.sq_max_qlen = sqp->sq_count; 931 } 932 #endif 933 934 ENQUEUE_MP(sqp, mp, proc, arg); 935 mutex_exit(&sqp->sq_lock); 936 return; 937 } 938 } 939 940 void 941 squeue_enter_nodrain(squeue_t *sqp, mblk_t *mp, sqproc_t proc, void *arg, 942 uint8_t tag) 943 { 944 int interrupt = servicing_interrupt(); 945 boolean_t being_processed; 946 #if SQUEUE_DEBUG 947 conn_t *connp = (conn_t *)arg; 948 #endif 949 #if SQUEUE_PROFILE 950 hrtime_t start, delta; 951 #endif 952 953 ASSERT(proc != NULL); 954 ASSERT(sqp != NULL); 955 ASSERT(mp != NULL); 956 ASSERT(mp->b_next == NULL); 957 ASSERT(connp->conn_tcp->tcp_connp == connp); 958 959 ASSERT(MUTEX_NOT_HELD(&sqp->sq_lock)); 960 mutex_enter(&sqp->sq_lock); 961 962 being_processed = 
(sqp->sq_state & SQS_PROC); 963 if (!being_processed && (sqp->sq_first == NULL)) { 964 /* 965 * Fast-path, ok to process and nothing queued. 966 */ 967 sqp->sq_state |= (SQS_PROC|SQS_FAST); 968 sqp->sq_run = curthread; 969 mutex_exit(&sqp->sq_lock); 970 971 #if SQUEUE_DEBUG 972 sqp->sq_isintr = interrupt; 973 sqp->sq_curmp = mp; 974 sqp->sq_curproc = proc; 975 sqp->sq_connp = connp; 976 mp->b_tag = sqp->sq_tag = tag; 977 #endif 978 979 #if SQUEUE_PROFILE 980 if (SQ_PROFILING(sqp)) { 981 if (interrupt) 982 SQSTAT(sqp, sq_npackets_intr); 983 else 984 SQSTAT(sqp, sq_npackets_other); 985 start = gethrtime(); 986 } 987 #endif 988 989 ((conn_t *)arg)->conn_on_sqp = B_TRUE; 990 DTRACE_PROBE3(squeue__proc__start, squeue_t *, 991 sqp, mblk_t *, mp, conn_t *, arg); 992 (*proc)(arg, mp, sqp); 993 DTRACE_PROBE2(squeue__proc__end, squeue_t *, 994 sqp, conn_t *, arg); 995 ((conn_t *)arg)->conn_on_sqp = B_FALSE; 996 997 #if SQUEUE_DEBUG 998 sqp->sq_curmp = NULL; 999 sqp->sq_curproc = NULL; 1000 sqp->sq_connp = NULL; 1001 sqp->sq_isintr = 0; 1002 #endif 1003 #if SQUEUE_PROFILE 1004 if (SQ_PROFILING(sqp)) { 1005 delta = gethrtime() - start; 1006 if (interrupt) 1007 SQDELTA(sqp, sq_time_intr, delta); 1008 else 1009 SQDELTA(sqp, sq_time_other, delta); 1010 } 1011 #endif 1012 1013 CONN_DEC_REF((conn_t *)arg); 1014 mutex_enter(&sqp->sq_lock); 1015 sqp->sq_state &= ~(SQS_PROC|SQS_FAST); 1016 sqp->sq_run = NULL; 1017 if (sqp->sq_first == NULL) { 1018 /* 1019 * We processed inline our packet and 1020 * nothing new has arrived. We are done. 1021 */ 1022 mutex_exit(&sqp->sq_lock); 1023 } else { 1024 SQUEUE_WORKER_WAKEUP(sqp); 1025 } 1026 return; 1027 } else { 1028 /* 1029 * We let a thread processing a squeue reenter only 1030 * once. This helps the case of incoming connection 1031 * where a SYN-ACK-ACK that triggers the conn_ind 1032 * doesn't have to queue the packet if listener and 1033 * eager are on the same squeue. 
Also helps the 1034 * loopback connection where the two ends are bound 1035 * to the same squeue (which is typical on single 1036 * CPU machines). 1037 * We let the thread reenter only once for the fear 1038 * of stack getting blown with multiple traversal. 1039 */ 1040 if (being_processed && !(sqp->sq_state & SQS_REENTER) && 1041 (sqp->sq_run == curthread) && 1042 (((conn_t *)arg)->conn_on_sqp == B_FALSE)) { 1043 sqp->sq_state |= SQS_REENTER; 1044 mutex_exit(&sqp->sq_lock); 1045 1046 ((conn_t *)arg)->conn_on_sqp = B_TRUE; 1047 DTRACE_PROBE3(squeue__proc__start, squeue_t *, 1048 sqp, mblk_t *, mp, conn_t *, arg); 1049 (*proc)(arg, mp, sqp); 1050 DTRACE_PROBE2(squeue__proc__end, squeue_t *, 1051 sqp, conn_t *, arg); 1052 ((conn_t *)arg)->conn_on_sqp = B_FALSE; 1053 CONN_DEC_REF((conn_t *)arg); 1054 1055 mutex_enter(&sqp->sq_lock); 1056 sqp->sq_state &= ~SQS_REENTER; 1057 mutex_exit(&sqp->sq_lock); 1058 return; 1059 } 1060 1061 #if SQUEUE_DEBUG 1062 mp->b_tag = tag; 1063 #endif 1064 #if SQUEUE_PROFILE 1065 if (SQ_PROFILING(sqp)) { 1066 if (servicing_interrupt()) 1067 SQSTAT(sqp, sq_nqueued_intr); 1068 else 1069 SQSTAT(sqp, sq_nqueued_other); 1070 if (sqp->sq_stats.sq_max_qlen < sqp->sq_count) 1071 sqp->sq_stats.sq_max_qlen = sqp->sq_count; 1072 } 1073 #endif 1074 ENQUEUE_MP(sqp, mp, proc, arg); 1075 if (being_processed) { 1076 /* 1077 * Queue is already being processed. 1078 * No need to do anything. 1079 */ 1080 mutex_exit(&sqp->sq_lock); 1081 return; 1082 } 1083 SQUEUE_WORKER_WAKEUP(sqp); 1084 } 1085 } 1086 1087 /* 1088 * squeue_fill() - fill squeue *sqp with mblk *mp with argument of *arg 1089 * without processing the squeue. 
1090 */ 1091 /* ARGSUSED */ 1092 void 1093 squeue_fill(squeue_t *sqp, mblk_t *mp, sqproc_t proc, void * arg, 1094 uint8_t tag) 1095 { 1096 #if SQUEUE_DEBUG 1097 conn_t *connp = (conn_t *)arg; 1098 #endif 1099 ASSERT(proc != NULL); 1100 ASSERT(sqp != NULL); 1101 ASSERT(mp != NULL); 1102 ASSERT(mp->b_next == NULL); 1103 ASSERT(connp->conn_tcp->tcp_connp == connp); 1104 1105 ASSERT(MUTEX_NOT_HELD(&sqp->sq_lock)); 1106 mutex_enter(&sqp->sq_lock); 1107 ENQUEUE_MP(sqp, mp, proc, arg); 1108 #if SQUEUE_DEBUG 1109 mp->b_tag = tag; 1110 #endif 1111 #if SQUEUE_PROFILE 1112 if (SQ_PROFILING(sqp)) { 1113 if (servicing_interrupt()) 1114 SQSTAT(sqp, sq_nqueued_intr); 1115 else 1116 SQSTAT(sqp, sq_nqueued_other); 1117 if (sqp->sq_stats.sq_max_qlen < sqp->sq_count) 1118 sqp->sq_stats.sq_max_qlen = sqp->sq_count; 1119 } 1120 #endif 1121 1122 /* 1123 * If queue is already being processed. No need to do anything. 1124 */ 1125 if (sqp->sq_state & SQS_PROC) { 1126 mutex_exit(&sqp->sq_lock); 1127 return; 1128 } 1129 1130 SQUEUE_WORKER_WAKEUP(sqp); 1131 } 1132 1133 1134 /* 1135 * PRIVATE FUNCTIONS 1136 */ 1137 1138 static void 1139 squeue_fire(void *arg) 1140 { 1141 squeue_t *sqp = arg; 1142 uint_t state; 1143 1144 mutex_enter(&sqp->sq_lock); 1145 1146 state = sqp->sq_state; 1147 if (sqp->sq_tid == 0 && !(state & SQS_TMO_PROG)) { 1148 mutex_exit(&sqp->sq_lock); 1149 return; 1150 } 1151 1152 sqp->sq_tid = 0; 1153 /* 1154 * The timeout fired before we got a chance to set it. 1155 * Process it anyway but remove the SQS_TMO_PROG so that 1156 * the guy trying to set the timeout knows that it has 1157 * already been processed. 
1158 */ 1159 if (state & SQS_TMO_PROG) 1160 sqp->sq_state &= ~SQS_TMO_PROG; 1161 1162 if (!(state & SQS_PROC)) { 1163 sqp->sq_awaken = lbolt; 1164 cv_signal(&sqp->sq_async); 1165 } 1166 mutex_exit(&sqp->sq_lock); 1167 } 1168 1169 static void 1170 squeue_drain(squeue_t *sqp, uint_t proc_type, clock_t expire) 1171 { 1172 mblk_t *mp; 1173 mblk_t *head; 1174 sqproc_t proc; 1175 conn_t *connp; 1176 clock_t start = lbolt; 1177 clock_t drain_time; 1178 timeout_id_t tid; 1179 uint_t cnt; 1180 uint_t total_cnt = 0; 1181 ill_rx_ring_t *sq_rx_ring = sqp->sq_rx_ring; 1182 int interrupt = servicing_interrupt(); 1183 boolean_t poll_on = B_FALSE; 1184 1185 ASSERT(mutex_owned(&sqp->sq_lock)); 1186 ASSERT(!(sqp->sq_state & SQS_PROC)); 1187 1188 #if SQUEUE_PROFILE 1189 if (SQ_PROFILING(sqp)) { 1190 if (interrupt) 1191 SQSTAT(sqp, sq_ndrains_intr); 1192 else if (!(proc_type & SQS_WORKER)) 1193 SQSTAT(sqp, sq_ndrains_other); 1194 else 1195 SQSTAT(sqp, sq_ndrains_worker); 1196 } 1197 #endif 1198 1199 if ((tid = sqp->sq_tid) != 0) 1200 sqp->sq_tid = 0; 1201 1202 sqp->sq_state |= SQS_PROC | proc_type; 1203 head = sqp->sq_first; 1204 sqp->sq_first = NULL; 1205 sqp->sq_last = NULL; 1206 cnt = sqp->sq_count; 1207 1208 /* 1209 * We have backlog built up. Switch to polling mode if the 1210 * device underneath allows it. Need to do it only for 1211 * drain by non-interrupt thread so interrupts don't 1212 * come and disrupt us in between. If its a interrupt thread, 1213 * no need because most devices will not issue another 1214 * interrupt till this one returns. 
1215 */ 1216 if ((sqp->sq_state & SQS_POLL_CAPAB) && !(proc_type & SQS_ENTER) && 1217 (sqp->sq_count > squeue_worker_poll_min)) { 1218 ASSERT(sq_rx_ring != NULL); 1219 SQS_POLLING_ON(sqp, sq_rx_ring); 1220 poll_on = B_TRUE; 1221 } 1222 1223 mutex_exit(&sqp->sq_lock); 1224 1225 if (tid != 0) 1226 (void) untimeout(tid); 1227 again: 1228 while ((mp = head) != NULL) { 1229 head = mp->b_next; 1230 mp->b_next = NULL; 1231 1232 proc = (sqproc_t)mp->b_queue; 1233 mp->b_queue = NULL; 1234 connp = (conn_t *)mp->b_prev; 1235 mp->b_prev = NULL; 1236 #if SQUEUE_DEBUG 1237 sqp->sq_curmp = mp; 1238 sqp->sq_curproc = proc; 1239 sqp->sq_connp = connp; 1240 sqp->sq_tag = mp->b_tag; 1241 #endif 1242 1243 #if SQUEUE_PROFILE 1244 if (SQ_PROFILING(sqp)) { 1245 if (interrupt) 1246 SQSTAT(sqp, sq_npackets_intr); 1247 else if (!(proc_type & SQS_WORKER)) 1248 SQSTAT(sqp, sq_npackets_other); 1249 else 1250 SQSTAT(sqp, sq_npackets_worker); 1251 } 1252 #endif 1253 1254 connp->conn_on_sqp = B_TRUE; 1255 DTRACE_PROBE3(squeue__proc__start, squeue_t *, 1256 sqp, mblk_t *, mp, conn_t *, connp); 1257 (*proc)(connp, mp, sqp); 1258 DTRACE_PROBE2(squeue__proc__end, squeue_t *, 1259 sqp, conn_t *, connp); 1260 connp->conn_on_sqp = B_FALSE; 1261 CONN_DEC_REF(connp); 1262 } 1263 1264 1265 #if SQUEUE_DEBUG 1266 sqp->sq_curmp = NULL; 1267 sqp->sq_curproc = NULL; 1268 sqp->sq_connp = NULL; 1269 #endif 1270 1271 mutex_enter(&sqp->sq_lock); 1272 sqp->sq_count -= cnt; 1273 total_cnt += cnt; 1274 1275 if (sqp->sq_first != NULL) { 1276 if (!expire || (lbolt < expire)) { 1277 /* More arrived and time not expired */ 1278 head = sqp->sq_first; 1279 sqp->sq_first = NULL; 1280 sqp->sq_last = NULL; 1281 cnt = sqp->sq_count; 1282 mutex_exit(&sqp->sq_lock); 1283 goto again; 1284 } 1285 1286 /* 1287 * If we are not worker thread and we 1288 * reached our time limit to do drain, 1289 * signal the worker thread to pick 1290 * up the work. 
1291 * If we were the worker thread, then 1292 * we take a break to allow an interrupt 1293 * or writer to pick up the load. 1294 */ 1295 if (proc_type != SQS_WORKER) { 1296 sqp->sq_awaken = lbolt; 1297 cv_signal(&sqp->sq_async); 1298 } 1299 } 1300 1301 /* 1302 * Try to see if we can get a time estimate to process a packet. 1303 * Do it only in interrupt context since less chance of context 1304 * switch or pinning etc. to get a better estimate. 1305 */ 1306 if (interrupt && ((drain_time = (lbolt - start)) > 0)) 1307 sqp->sq_avg_drain_time = ((80 * sqp->sq_avg_drain_time) + 1308 (20 * (drv_hztousec(drain_time)/total_cnt)))/100; 1309 1310 sqp->sq_state &= ~(SQS_PROC | proc_type); 1311 1312 /* 1313 * If polling was turned on, turn it off and reduce the default 1314 * interrupt blank interval as well to bring new packets in faster 1315 * (reduces the latency when there is no backlog). 1316 */ 1317 if (poll_on && (sqp->sq_state & SQS_POLL_CAPAB)) { 1318 ASSERT(sq_rx_ring != NULL); 1319 SQS_POLLING_OFF(sqp, sq_rx_ring); 1320 } 1321 } 1322 1323 static void 1324 squeue_worker(squeue_t *sqp) 1325 { 1326 kmutex_t *lock = &sqp->sq_lock; 1327 kcondvar_t *async = &sqp->sq_async; 1328 callb_cpr_t cprinfo; 1329 #if SQUEUE_PROFILE 1330 hrtime_t start; 1331 #endif 1332 1333 CALLB_CPR_INIT(&cprinfo, lock, callb_generic_cpr, "nca"); 1334 mutex_enter(lock); 1335 1336 for (;;) { 1337 while (sqp->sq_first == NULL || (sqp->sq_state & SQS_PROC)) { 1338 CALLB_CPR_SAFE_BEGIN(&cprinfo); 1339 still_wait: 1340 cv_wait(async, lock); 1341 if (sqp->sq_state & SQS_PROC) { 1342 goto still_wait; 1343 } 1344 CALLB_CPR_SAFE_END(&cprinfo, lock); 1345 } 1346 1347 #if SQUEUE_PROFILE 1348 if (SQ_PROFILING(sqp)) { 1349 start = gethrtime(); 1350 } 1351 #endif 1352 1353 ASSERT(squeue_workerdrain_tick != 0); 1354 sqp->sq_run = curthread; 1355 squeue_drain(sqp, SQS_WORKER, lbolt + squeue_workerdrain_tick); 1356 sqp->sq_run = NULL; 1357 1358 if (sqp->sq_first != NULL) { 1359 /* 1360 * Doing too much processing 
by worker thread 1361 * in presense of interrupts can be sub optimal. 1362 * Instead, once a drain is done by worker thread 1363 * for squeue_writerdrain_ms (the reason we are 1364 * here), we force wait for squeue_workerwait_tick 1365 * before doing more processing even if sq_wait is 1366 * set to 0. 1367 * 1368 * This can be counterproductive for performance 1369 * if worker thread is the only means to process 1370 * the packets (interrupts or writers are not 1371 * allowed inside the squeue). 1372 */ 1373 if (sqp->sq_tid == 0 && 1374 !(sqp->sq_state & SQS_TMO_PROG)) { 1375 timeout_id_t tid; 1376 1377 sqp->sq_state |= SQS_TMO_PROG; 1378 mutex_exit(&sqp->sq_lock); 1379 tid = timeout(squeue_fire, sqp, 1380 squeue_workerwait_tick); 1381 mutex_enter(&sqp->sq_lock); 1382 /* 1383 * Check again if we still need 1384 * the timeout 1385 */ 1386 if (((sqp->sq_state & (SQS_TMO_PROG|SQS_PROC)) 1387 == SQS_TMO_PROG) && (sqp->sq_tid == 0) && 1388 (sqp->sq_first != NULL)) { 1389 sqp->sq_state &= ~SQS_TMO_PROG; 1390 sqp->sq_awaken = lbolt; 1391 sqp->sq_tid = tid; 1392 } else if (sqp->sq_state & SQS_TMO_PROG) { 1393 /* timeout not needed */ 1394 sqp->sq_state &= ~SQS_TMO_PROG; 1395 mutex_exit(&(sqp)->sq_lock); 1396 (void) untimeout(tid); 1397 mutex_enter(&sqp->sq_lock); 1398 } 1399 } 1400 CALLB_CPR_SAFE_BEGIN(&cprinfo); 1401 cv_wait(async, lock); 1402 CALLB_CPR_SAFE_END(&cprinfo, lock); 1403 } 1404 1405 1406 #if SQUEUE_PROFILE 1407 if (SQ_PROFILING(sqp)) { 1408 SQDELTA(sqp, sq_time_worker, gethrtime() - start); 1409 } 1410 #endif 1411 } 1412 } 1413 1414 #if SQUEUE_PROFILE 1415 static int 1416 squeue_kstat_update(kstat_t *ksp, int rw) 1417 { 1418 struct squeue_kstat *sqsp = &squeue_kstat; 1419 squeue_t *sqp = ksp->ks_private; 1420 1421 if (rw == KSTAT_WRITE) 1422 return (EACCES); 1423 1424 #if SQUEUE_DEBUG 1425 sqsp->sq_count.value.ui64 = sqp->sq_count; 1426 sqsp->sq_max_qlen.value.ui64 = sqp->sq_stats.sq_max_qlen; 1427 #endif 1428 sqsp->sq_npackets_worker.value.ui64 = 
sqp->sq_stats.sq_npackets_worker; 1429 sqsp->sq_npackets_intr.value.ui64 = sqp->sq_stats.sq_npackets_intr; 1430 sqsp->sq_npackets_other.value.ui64 = sqp->sq_stats.sq_npackets_other; 1431 sqsp->sq_nqueued_intr.value.ui64 = sqp->sq_stats.sq_nqueued_intr; 1432 sqsp->sq_nqueued_other.value.ui64 = sqp->sq_stats.sq_nqueued_other; 1433 sqsp->sq_ndrains_worker.value.ui64 = sqp->sq_stats.sq_ndrains_worker; 1434 sqsp->sq_ndrains_intr.value.ui64 = sqp->sq_stats.sq_ndrains_intr; 1435 sqsp->sq_ndrains_other.value.ui64 = sqp->sq_stats.sq_ndrains_other; 1436 sqsp->sq_time_worker.value.ui64 = sqp->sq_stats.sq_time_worker; 1437 sqsp->sq_time_intr.value.ui64 = sqp->sq_stats.sq_time_intr; 1438 sqsp->sq_time_other.value.ui64 = sqp->sq_stats.sq_time_other; 1439 return (0); 1440 } 1441 #endif 1442 1443 void 1444 squeue_profile_enable(squeue_t *sqp) 1445 { 1446 mutex_enter(&sqp->sq_lock); 1447 sqp->sq_state |= SQS_PROFILE; 1448 mutex_exit(&sqp->sq_lock); 1449 } 1450 1451 void 1452 squeue_profile_disable(squeue_t *sqp) 1453 { 1454 mutex_enter(&sqp->sq_lock); 1455 sqp->sq_state &= ~SQS_PROFILE; 1456 mutex_exit(&sqp->sq_lock); 1457 } 1458 1459 void 1460 squeue_profile_reset(squeue_t *sqp) 1461 { 1462 #if SQUEUE_PROFILE 1463 bzero(&sqp->sq_stats, sizeof (sqstat_t)); 1464 #endif 1465 } 1466 1467 void 1468 squeue_profile_start(void) 1469 { 1470 #if SQUEUE_PROFILE 1471 squeue_profile = B_TRUE; 1472 #endif 1473 } 1474 1475 void 1476 squeue_profile_stop(void) 1477 { 1478 #if SQUEUE_PROFILE 1479 squeue_profile = B_FALSE; 1480 #endif 1481 } 1482 1483 uintptr_t * 1484 squeue_getprivate(squeue_t *sqp, sqprivate_t p) 1485 { 1486 ASSERT(p < SQPRIVATE_MAX); 1487 1488 return (&sqp->sq_private[p]); 1489 } 1490 1491 processorid_t 1492 squeue_binding(squeue_t *sqp) 1493 { 1494 return (sqp->sq_bind); 1495 } 1496