1 /* 2 * CDDL HEADER START 3 * 4 * The contents of this file are subject to the terms of the 5 * Common Development and Distribution License (the "License"). 6 * You may not use this file except in compliance with the License. 7 * 8 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE 9 * or http://www.opensolaris.org/os/licensing. 10 * See the License for the specific language governing permissions 11 * and limitations under the License. 12 * 13 * When distributing Covered Code, include this CDDL HEADER in each 14 * file and include the License file at usr/src/OPENSOLARIS.LICENSE. 15 * If applicable, add the following below this CDDL HEADER, with the 16 * fields enclosed by brackets "[]" replaced with your own identifying 17 * information: Portions Copyright [yyyy] [name of copyright owner] 18 * 19 * CDDL HEADER END 20 */ 21 /* 22 * Copyright 2009 Sun Microsystems, Inc. All rights reserved. 23 * Use is subject to license terms. 24 */ 25 26 /* 27 * Squeues: General purpose serialization mechanism 28 * ------------------------------------------------ 29 * 30 * Background: 31 * ----------- 32 * 33 * This is a general purpose high-performance serialization mechanism 34 * currently used by TCP/IP. It is implement by means of a per CPU queue, 35 * a worker thread and a polling thread with are bound to the CPU 36 * associated with the squeue. The squeue is strictly FIFO for both read 37 * and write side and only one thread can process it at any given time. 38 * The design goal of squeue was to offer a very high degree of 39 * parallelization (on a per H/W execution pipeline basis) with at 40 * most one queuing. 41 * 42 * The modules needing protection typically calls squeue_enter() or 43 * squeue_enter_chain() routine as soon as a thread enter the module 44 * from either direction. For each packet, the processing function 45 * and argument is stored in the mblk itself. When the packet is ready 46 * to be processed, the squeue retrieves the stored function and calls 47 * it with the supplied argument and the pointer to the packet itself. 48 * The called function can assume that no other thread is processing 49 * the squeue when it is executing. 50 * 51 * Squeue/connection binding: 52 * -------------------------- 53 * 54 * TCP/IP uses an IP classifier in conjunction with squeue where specific 55 * connections are assigned to specific squeue (based on various policies), 56 * at the connection creation time. Once assigned, the connection to 57 * squeue mapping is never changed and all future packets for that 58 * connection are processed on that squeue. The connection ("conn") to 59 * squeue mapping is stored in "conn_t" member "conn_sqp". 60 * 61 * Since the processing of the connection cuts across multiple layers 62 * but still allows packets for different connnection to be processed on 63 * other CPU/squeues, squeues are also termed as "Vertical Perimeter" or 64 * "Per Connection Vertical Perimeter". 65 * 66 * Processing Model: 67 * ----------------- 68 * 69 * Squeue doesn't necessary processes packets with its own worker thread. 70 * The callers can pick if they just want to queue the packet, process 71 * their packet if nothing is queued or drain and process. The first two 72 * modes are typically employed when the packet was generated while 73 * already doing the processing behind the squeue and last mode (drain 74 * and process) is typically employed when the thread is entering squeue 75 * for the first time. The squeue still imposes a finite time limit 76 * for which a external thread can do processing after which it switches 77 * processing to its own worker thread. 78 * 79 * Once created, squeues are never deleted. Hence squeue pointers are 80 * always valid. This means that functions outside the squeue can still 81 * refer safely to conn_sqp and their is no need for ref counts. 82 * 83 * Only a thread executing in the squeue can change the squeue of the 84 * connection. It does so by calling a squeue framework function to do this. 85 * After changing the squeue, the thread must leave the squeue. It must not 86 * continue to execute any code that needs squeue protection. 87 * 88 * The squeue framework, after entering the squeue, checks if the current 89 * squeue matches the conn_sqp. If the check fails, the packet is delivered 90 * to right squeue. 91 * 92 * Polling Model: 93 * -------------- 94 * 95 * Squeues can control the rate of packet arrival into itself from the 96 * NIC or specific Rx ring within a NIC. As part of capability negotiation 97 * between IP and MAC layer, squeue are created for each TCP soft ring 98 * (or TCP Rx ring - to be implemented in future). As part of this 99 * negotiation, squeues get a cookie for underlying soft ring or Rx 100 * ring, a function to turn off incoming packets and a function to call 101 * to poll for packets. This helps schedule the receive side packet 102 * processing so that queue backlog doesn't build up and packet processing 103 * doesn't keep getting disturbed by high priority interrupts. As part 104 * of this mode, as soon as a backlog starts building, squeue turns off 105 * the interrupts and switches to poll mode. In poll mode, when poll 106 * thread goes down to retrieve packets, it retrieves them in the form of 107 * a chain which improves performance even more. As the squeue/softring 108 * system gets more packets, it gets more efficient by switching to 109 * polling more often and dealing with larger packet chains. 110 * 111 */ 112 113 #include <sys/types.h> 114 #include <sys/cmn_err.h> 115 #include <sys/debug.h> 116 #include <sys/kmem.h> 117 #include <sys/cpuvar.h> 118 #include <sys/condvar_impl.h> 119 #include <sys/systm.h> 120 #include <sys/callb.h> 121 #include <sys/sdt.h> 122 #include <sys/ddi.h> 123 #include <sys/sunddi.h> 124 125 #include <inet/ipclassifier.h> 126 #include <inet/udp_impl.h> 127 128 #include <sys/squeue_impl.h> 129 130 static void squeue_fire(void *); 131 static void squeue_drain(squeue_t *, uint_t, hrtime_t); 132 static void squeue_worker(squeue_t *sqp); 133 static void squeue_polling_thread(squeue_t *sqp); 134 135 kmem_cache_t *squeue_cache; 136 137 #define SQUEUE_MSEC_TO_NSEC 1000000 138 139 int squeue_drain_ms = 20; 140 int squeue_workerwait_ms = 0; 141 142 /* The values above converted to ticks or nano seconds */ 143 static int squeue_drain_ns = 0; 144 static int squeue_workerwait_tick = 0; 145 146 #define MAX_BYTES_TO_PICKUP 150000 147 148 #define ENQUEUE_CHAIN(sqp, mp, tail, cnt) { \ 149 /* \ 150 * Enqueue our mblk chain. \ 151 */ \ 152 ASSERT(MUTEX_HELD(&(sqp)->sq_lock)); \ 153 \ 154 if ((sqp)->sq_last != NULL) \ 155 (sqp)->sq_last->b_next = (mp); \ 156 else \ 157 (sqp)->sq_first = (mp); \ 158 (sqp)->sq_last = (tail); \ 159 (sqp)->sq_count += (cnt); \ 160 ASSERT((sqp)->sq_count > 0); \ 161 DTRACE_PROBE4(squeue__enqueuechain, squeue_t *, sqp, \ 162 mblk_t *, mp, mblk_t *, tail, int, cnt); \ 163 \ 164 } 165 166 /* 167 * Blank the receive ring (in this case it is the soft ring). When 168 * blanked, the soft ring will not send any more packets up. 169 * Blanking may not succeed when there is a CPU already in the soft 170 * ring sending packets up. In that case, SQS_POLLING will not be 171 * set. 172 */ 173 #define SQS_POLLING_ON(sqp, sq_poll_capable, rx_ring) { \ 174 ASSERT(MUTEX_HELD(&(sqp)->sq_lock)); \ 175 if (sq_poll_capable) { \ 176 ASSERT(rx_ring != NULL); \ 177 ASSERT(sqp->sq_state & SQS_POLL_CAPAB); \ 178 if (!(sqp->sq_state & SQS_POLLING)) { \ 179 if (rx_ring->rr_intr_disable(rx_ring->rr_intr_handle)) \ 180 sqp->sq_state |= SQS_POLLING; \ 181 } \ 182 } \ 183 } 184 185 #define SQS_POLLING_OFF(sqp, sq_poll_capable, rx_ring) { \ 186 ASSERT(MUTEX_HELD(&(sqp)->sq_lock)); \ 187 if (sq_poll_capable) { \ 188 ASSERT(rx_ring != NULL); \ 189 ASSERT(sqp->sq_state & SQS_POLL_CAPAB); \ 190 if (sqp->sq_state & SQS_POLLING) { \ 191 sqp->sq_state &= ~SQS_POLLING; \ 192 rx_ring->rr_intr_enable(rx_ring->rr_intr_handle); \ 193 } \ 194 } \ 195 } 196 197 /* Wakeup poll thread only if SQS_POLLING is set */ 198 #define SQS_POLL_RING(sqp) { \ 199 ASSERT(MUTEX_HELD(&(sqp)->sq_lock)); \ 200 if (sqp->sq_state & SQS_POLLING) { \ 201 ASSERT(sqp->sq_state & SQS_POLL_CAPAB); \ 202 if (!(sqp->sq_state & SQS_GET_PKTS)) { \ 203 sqp->sq_state |= SQS_GET_PKTS; \ 204 cv_signal(&sqp->sq_poll_cv); \ 205 } \ 206 } \ 207 } 208 209 #ifdef DEBUG 210 #define SQUEUE_DBG_SET(sqp, mp, proc, connp, tag) { \ 211 (sqp)->sq_curmp = (mp); \ 212 (sqp)->sq_curproc = (proc); \ 213 (sqp)->sq_connp = (connp); \ 214 (mp)->b_tag = (sqp)->sq_tag = (tag); \ 215 } 216 217 #define SQUEUE_DBG_CLEAR(sqp) { \ 218 (sqp)->sq_curmp = NULL; \ 219 (sqp)->sq_curproc = NULL; \ 220 (sqp)->sq_connp = NULL; \ 221 } 222 #else 223 #define SQUEUE_DBG_SET(sqp, mp, proc, connp, tag) 224 #define SQUEUE_DBG_CLEAR(sqp) 225 #endif 226 227 void 228 squeue_init(void) 229 { 230 squeue_cache = kmem_cache_create("squeue_cache", 231 sizeof (squeue_t), 64, NULL, NULL, NULL, NULL, NULL, 0); 232 233 squeue_drain_ns = squeue_drain_ms * SQUEUE_MSEC_TO_NSEC; 234 squeue_workerwait_tick = MSEC_TO_TICK_ROUNDUP(squeue_workerwait_ms); 235 } 236 237 /* ARGSUSED */ 238 squeue_t * 239 squeue_create(clock_t wait, pri_t pri) 240 { 241 squeue_t *sqp = kmem_cache_alloc(squeue_cache, KM_SLEEP); 242 243 bzero(sqp, sizeof (squeue_t)); 244 sqp->sq_bind = PBIND_NONE; 245 sqp->sq_priority = pri; 246 sqp->sq_wait = MSEC_TO_TICK(wait); 247 sqp->sq_worker = thread_create(NULL, 0, squeue_worker, 248 sqp, 0, &p0, TS_RUN, pri); 249 250 sqp->sq_poll_thr = thread_create(NULL, 0, squeue_polling_thread, 251 sqp, 0, &p0, TS_RUN, pri); 252 253 sqp->sq_enter = squeue_enter; 254 sqp->sq_drain = squeue_drain; 255 256 return (sqp); 257 } 258 259 /* 260 * Bind squeue worker thread to the specified CPU, given by CPU id. 261 * If the CPU id value is -1, bind the worker thread to the value 262 * specified in sq_bind field. If a thread is already bound to a 263 * different CPU, unbind it from the old CPU and bind to the new one. 264 */ 265 266 void 267 squeue_bind(squeue_t *sqp, processorid_t bind) 268 { 269 mutex_enter(&sqp->sq_lock); 270 ASSERT(sqp->sq_bind != PBIND_NONE || bind != PBIND_NONE); 271 ASSERT(MUTEX_HELD(&cpu_lock)); 272 273 if (sqp->sq_state & SQS_BOUND) { 274 if (sqp->sq_bind == bind) { 275 mutex_exit(&sqp->sq_lock); 276 return; 277 } 278 thread_affinity_clear(sqp->sq_worker); 279 } else { 280 sqp->sq_state |= SQS_BOUND; 281 } 282 283 if (bind != PBIND_NONE) 284 sqp->sq_bind = bind; 285 286 thread_affinity_set(sqp->sq_worker, sqp->sq_bind); 287 mutex_exit(&sqp->sq_lock); 288 } 289 290 void 291 squeue_unbind(squeue_t *sqp) 292 { 293 mutex_enter(&sqp->sq_lock); 294 if (!(sqp->sq_state & SQS_BOUND)) { 295 mutex_exit(&sqp->sq_lock); 296 return; 297 } 298 299 sqp->sq_state &= ~SQS_BOUND; 300 thread_affinity_clear(sqp->sq_worker); 301 mutex_exit(&sqp->sq_lock); 302 } 303 304 void 305 squeue_worker_wakeup(squeue_t *sqp) 306 { 307 timeout_id_t tid = (sqp)->sq_tid; 308 309 ASSERT(MUTEX_HELD(&(sqp)->sq_lock)); 310 311 if (sqp->sq_wait == 0) { 312 ASSERT(tid == 0); 313 ASSERT(!(sqp->sq_state & SQS_TMO_PROG)); 314 sqp->sq_awaken = lbolt; 315 cv_signal(&sqp->sq_worker_cv); 316 mutex_exit(&sqp->sq_lock); 317 return; 318 } 319 320 /* 321 * Queue isn't being processed, so take 322 * any post enqueue actions needed before leaving. 323 */ 324 if (tid != 0) { 325 /* 326 * Waiting for an enter() to process mblk(s). 327 */ 328 clock_t waited = lbolt - sqp->sq_awaken; 329 330 if (TICK_TO_MSEC(waited) >= sqp->sq_wait) { 331 /* 332 * Times up and have a worker thread 333 * waiting for work, so schedule it. 334 */ 335 sqp->sq_tid = 0; 336 sqp->sq_awaken = lbolt; 337 cv_signal(&sqp->sq_worker_cv); 338 mutex_exit(&sqp->sq_lock); 339 (void) untimeout(tid); 340 return; 341 } 342 mutex_exit(&sqp->sq_lock); 343 return; 344 } else if (sqp->sq_state & SQS_TMO_PROG) { 345 mutex_exit(&sqp->sq_lock); 346 return; 347 } else { 348 clock_t wait = sqp->sq_wait; 349 /* 350 * Wait up to sqp->sq_wait ms for an 351 * enter() to process this queue. We 352 * don't want to contend on timeout locks 353 * with sq_lock held for performance reasons, 354 * so drop the sq_lock before calling timeout 355 * but we need to check if timeout is required 356 * after re acquiring the sq_lock. Once 357 * the sq_lock is dropped, someone else could 358 * have processed the packet or the timeout could 359 * have already fired. 360 */ 361 sqp->sq_state |= SQS_TMO_PROG; 362 mutex_exit(&sqp->sq_lock); 363 tid = timeout(squeue_fire, sqp, wait); 364 mutex_enter(&sqp->sq_lock); 365 /* Check again if we still need the timeout */ 366 if (((sqp->sq_state & (SQS_PROC|SQS_TMO_PROG)) == 367 SQS_TMO_PROG) && (sqp->sq_tid == 0) && 368 (sqp->sq_first != NULL)) { 369 sqp->sq_state &= ~SQS_TMO_PROG; 370 sqp->sq_tid = tid; 371 mutex_exit(&sqp->sq_lock); 372 return; 373 } else { 374 if (sqp->sq_state & SQS_TMO_PROG) { 375 sqp->sq_state &= ~SQS_TMO_PROG; 376 mutex_exit(&sqp->sq_lock); 377 (void) untimeout(tid); 378 } else { 379 /* 380 * The timer fired before we could 381 * reacquire the sq_lock. squeue_fire 382 * removes the SQS_TMO_PROG flag 383 * and we don't need to do anything 384 * else. 385 */ 386 mutex_exit(&sqp->sq_lock); 387 } 388 } 389 } 390 391 ASSERT(MUTEX_NOT_HELD(&sqp->sq_lock)); 392 } 393 394 /* 395 * squeue_enter() - enter squeue sqp with mblk mp (which can be 396 * a chain), while tail points to the end and cnt in number of 397 * mblks in the chain. 398 * 399 * For a chain of single packet (i.e. mp == tail), go through the 400 * fast path if no one is processing the squeue and nothing is queued. 401 * 402 * The proc and arg for each mblk is already stored in the mblk in 403 * appropriate places. 404 * 405 * The process_flag specifies if we are allowed to process the mblk 406 * and drain in the entering thread context. If process_flag is 407 * SQ_FILL, then we just queue the mblk and return (after signaling 408 * the worker thread if no one else is processing the squeue). 409 */ 410 /* ARGSUSED */ 411 void 412 squeue_enter(squeue_t *sqp, mblk_t *mp, mblk_t *tail, uint32_t cnt, 413 int process_flag, uint8_t tag) 414 { 415 conn_t *connp; 416 sqproc_t proc; 417 hrtime_t now; 418 419 ASSERT(sqp != NULL); 420 ASSERT(mp != NULL); 421 ASSERT(tail != NULL); 422 ASSERT(cnt > 0); 423 ASSERT(MUTEX_NOT_HELD(&sqp->sq_lock)); 424 425 mutex_enter(&sqp->sq_lock); 426 427 /* 428 * Try to process the packet if SQ_FILL flag is not set and 429 * we are allowed to process the squeue. The SQ_NODRAIN is 430 * ignored if the packet chain consists of more than 1 packet. 431 */ 432 if (!(sqp->sq_state & SQS_PROC) && ((process_flag == SQ_PROCESS) || 433 (process_flag == SQ_NODRAIN && sqp->sq_first == NULL))) { 434 /* 435 * See if anything is already queued. If we are the 436 * first packet, do inline processing else queue the 437 * packet and do the drain. 438 */ 439 if (sqp->sq_first == NULL && cnt == 1) { 440 /* 441 * Fast-path, ok to process and nothing queued. 442 */ 443 sqp->sq_state |= (SQS_PROC|SQS_FAST); 444 sqp->sq_run = curthread; 445 mutex_exit(&sqp->sq_lock); 446 447 /* 448 * We are the chain of 1 packet so 449 * go through this fast path. 450 */ 451 ASSERT(mp->b_prev != NULL); 452 ASSERT(mp->b_queue != NULL); 453 connp = (conn_t *)mp->b_prev; 454 mp->b_prev = NULL; 455 proc = (sqproc_t)mp->b_queue; 456 mp->b_queue = NULL; 457 ASSERT(proc != NULL && connp != NULL); 458 ASSERT(mp->b_next == NULL); 459 460 /* 461 * Handle squeue switching. More details in the 462 * block comment at the top of the file 463 */ 464 if (connp->conn_sqp == sqp) { 465 SQUEUE_DBG_SET(sqp, mp, proc, connp, 466 tag); 467 connp->conn_on_sqp = B_TRUE; 468 DTRACE_PROBE3(squeue__proc__start, squeue_t *, 469 sqp, mblk_t *, mp, conn_t *, connp); 470 (*proc)(connp, mp, sqp); 471 DTRACE_PROBE2(squeue__proc__end, squeue_t *, 472 sqp, conn_t *, connp); 473 connp->conn_on_sqp = B_FALSE; 474 SQUEUE_DBG_CLEAR(sqp); 475 CONN_DEC_REF(connp); 476 } else { 477 SQUEUE_ENTER_ONE(connp->conn_sqp, mp, proc, 478 connp, SQ_FILL, SQTAG_SQUEUE_CHANGE); 479 } 480 ASSERT(MUTEX_NOT_HELD(&sqp->sq_lock)); 481 mutex_enter(&sqp->sq_lock); 482 sqp->sq_state &= ~(SQS_PROC|SQS_FAST); 483 sqp->sq_run = NULL; 484 if (sqp->sq_first == NULL || 485 process_flag == SQ_NODRAIN) { 486 if (sqp->sq_first != NULL) { 487 squeue_worker_wakeup(sqp); 488 return; 489 } 490 /* 491 * We processed inline our packet and nothing 492 * new has arrived. We are done. In case any 493 * control actions are pending, wake up the 494 * worker. 495 */ 496 if (sqp->sq_state & SQS_WORKER_THR_CONTROL) 497 cv_signal(&sqp->sq_worker_cv); 498 mutex_exit(&sqp->sq_lock); 499 return; 500 } 501 } else { 502 ENQUEUE_CHAIN(sqp, mp, tail, cnt); 503 #ifdef DEBUG 504 mp->b_tag = tag; 505 #endif 506 } 507 /* 508 * We are here because either we couldn't do inline 509 * processing (because something was already queued), 510 * or we had a chain of more than one packet, 511 * or something else arrived after we were done with 512 * inline processing. 513 */ 514 ASSERT(MUTEX_HELD(&sqp->sq_lock)); 515 ASSERT(sqp->sq_first != NULL); 516 now = gethrtime(); 517 sqp->sq_drain(sqp, SQS_ENTER, now + squeue_drain_ns); 518 519 /* 520 * If we didn't do a complete drain, the worker 521 * thread was already signalled by squeue_drain. 522 * In case any control actions are pending, wake 523 * up the worker. 524 */ 525 sqp->sq_run = NULL; 526 if (sqp->sq_state & SQS_WORKER_THR_CONTROL) 527 cv_signal(&sqp->sq_worker_cv); 528 mutex_exit(&sqp->sq_lock); 529 return; 530 } else { 531 /* 532 * We let a thread processing a squeue reenter only 533 * once. This helps the case of incoming connection 534 * where a SYN-ACK-ACK that triggers the conn_ind 535 * doesn't have to queue the packet if listener and 536 * eager are on the same squeue. Also helps the 537 * loopback connection where the two ends are bound 538 * to the same squeue (which is typical on single 539 * CPU machines). 540 * 541 * We let the thread reenter only once for the fear 542 * of stack getting blown with multiple traversal. 543 */ 544 connp = (conn_t *)mp->b_prev; 545 if (!(sqp->sq_state & SQS_REENTER) && 546 (process_flag != SQ_FILL) && (sqp->sq_first == NULL) && 547 (sqp->sq_run == curthread) && (cnt == 1) && 548 (connp->conn_on_sqp == B_FALSE)) { 549 sqp->sq_state |= SQS_REENTER; 550 mutex_exit(&sqp->sq_lock); 551 552 ASSERT(mp->b_prev != NULL); 553 ASSERT(mp->b_queue != NULL); 554 555 mp->b_prev = NULL; 556 proc = (sqproc_t)mp->b_queue; 557 mp->b_queue = NULL; 558 559 /* 560 * Handle squeue switching. More details in the 561 * block comment at the top of the file 562 */ 563 if (connp->conn_sqp == sqp) { 564 connp->conn_on_sqp = B_TRUE; 565 DTRACE_PROBE3(squeue__proc__start, squeue_t *, 566 sqp, mblk_t *, mp, conn_t *, connp); 567 (*proc)(connp, mp, sqp); 568 DTRACE_PROBE2(squeue__proc__end, squeue_t *, 569 sqp, conn_t *, connp); 570 connp->conn_on_sqp = B_FALSE; 571 CONN_DEC_REF(connp); 572 } else { 573 SQUEUE_ENTER_ONE(connp->conn_sqp, mp, proc, 574 connp, SQ_FILL, SQTAG_SQUEUE_CHANGE); 575 } 576 577 mutex_enter(&sqp->sq_lock); 578 sqp->sq_state &= ~SQS_REENTER; 579 mutex_exit(&sqp->sq_lock); 580 return; 581 } 582 583 /* 584 * Queue is already being processed or there is already 585 * one or more paquets on the queue. Enqueue the 586 * packet and wakeup the squeue worker thread if the 587 * squeue is not being processed. 588 */ 589 #ifdef DEBUG 590 mp->b_tag = tag; 591 #endif 592 593 ENQUEUE_CHAIN(sqp, mp, tail, cnt); 594 if (!(sqp->sq_state & SQS_PROC)) { 595 squeue_worker_wakeup(sqp); 596 return; 597 } 598 /* 599 * In case any control actions are pending, wake 600 * up the worker. 601 */ 602 if (sqp->sq_state & SQS_WORKER_THR_CONTROL) 603 cv_signal(&sqp->sq_worker_cv); 604 mutex_exit(&sqp->sq_lock); 605 return; 606 } 607 } 608 609 /* 610 * PRIVATE FUNCTIONS 611 */ 612 613 static void 614 squeue_fire(void *arg) 615 { 616 squeue_t *sqp = arg; 617 uint_t state; 618 619 mutex_enter(&sqp->sq_lock); 620 621 state = sqp->sq_state; 622 if (sqp->sq_tid == 0 && !(state & SQS_TMO_PROG)) { 623 mutex_exit(&sqp->sq_lock); 624 return; 625 } 626 627 sqp->sq_tid = 0; 628 /* 629 * The timeout fired before we got a chance to set it. 630 * Process it anyway but remove the SQS_TMO_PROG so that 631 * the guy trying to set the timeout knows that it has 632 * already been processed. 633 */ 634 if (state & SQS_TMO_PROG) 635 sqp->sq_state &= ~SQS_TMO_PROG; 636 637 if (!(state & SQS_PROC)) { 638 sqp->sq_awaken = lbolt; 639 cv_signal(&sqp->sq_worker_cv); 640 } 641 mutex_exit(&sqp->sq_lock); 642 } 643 644 static void 645 squeue_drain(squeue_t *sqp, uint_t proc_type, hrtime_t expire) 646 { 647 mblk_t *mp; 648 mblk_t *head; 649 sqproc_t proc; 650 conn_t *connp; 651 timeout_id_t tid; 652 ill_rx_ring_t *sq_rx_ring = sqp->sq_rx_ring; 653 hrtime_t now; 654 boolean_t did_wakeup = B_FALSE; 655 boolean_t sq_poll_capable; 656 657 sq_poll_capable = (sqp->sq_state & SQS_POLL_CAPAB) != 0; 658 again: 659 ASSERT(mutex_owned(&sqp->sq_lock)); 660 ASSERT(!(sqp->sq_state & (SQS_POLL_THR_QUIESCED | 661 SQS_POLL_QUIESCE_DONE))); 662 663 head = sqp->sq_first; 664 sqp->sq_first = NULL; 665 sqp->sq_last = NULL; 666 sqp->sq_count = 0; 667 668 if ((tid = sqp->sq_tid) != 0) 669 sqp->sq_tid = 0; 670 671 sqp->sq_state |= SQS_PROC | proc_type; 672 673 /* 674 * We have backlog built up. Switch to polling mode if the 675 * device underneath allows it. Need to do it so that 676 * more packets don't come in and disturb us (by contending 677 * for sq_lock or higher priority thread preempting us). 678 * 679 * The worker thread is allowed to do active polling while we 680 * just disable the interrupts for drain by non worker (kernel 681 * or userland) threads so they can peacefully process the 682 * packets during time allocated to them. 683 */ 684 SQS_POLLING_ON(sqp, sq_poll_capable, sq_rx_ring); 685 mutex_exit(&sqp->sq_lock); 686 687 if (tid != 0) 688 (void) untimeout(tid); 689 690 while ((mp = head) != NULL) { 691 692 head = mp->b_next; 693 mp->b_next = NULL; 694 695 proc = (sqproc_t)mp->b_queue; 696 mp->b_queue = NULL; 697 connp = (conn_t *)mp->b_prev; 698 mp->b_prev = NULL; 699 700 /* 701 * Handle squeue switching. More details in the 702 * block comment at the top of the file 703 */ 704 if (connp->conn_sqp == sqp) { 705 SQUEUE_DBG_SET(sqp, mp, proc, connp, 706 mp->b_tag); 707 connp->conn_on_sqp = B_TRUE; 708 DTRACE_PROBE3(squeue__proc__start, squeue_t *, 709 sqp, mblk_t *, mp, conn_t *, connp); 710 (*proc)(connp, mp, sqp); 711 DTRACE_PROBE2(squeue__proc__end, squeue_t *, 712 sqp, conn_t *, connp); 713 connp->conn_on_sqp = B_FALSE; 714 CONN_DEC_REF(connp); 715 } else { 716 SQUEUE_ENTER_ONE(connp->conn_sqp, mp, proc, connp, 717 SQ_FILL, SQTAG_SQUEUE_CHANGE); 718 } 719 } 720 721 SQUEUE_DBG_CLEAR(sqp); 722 723 mutex_enter(&sqp->sq_lock); 724 725 /* 726 * Check if there is still work to do (either more arrived or timer 727 * expired). If we are the worker thread and we are polling capable, 728 * continue doing the work since no one else is around to do the 729 * work anyway (but signal the poll thread to retrieve some packets 730 * in the meanwhile). If we are not the worker thread, just 731 * signal the worker thread to take up the work if processing time 732 * has expired. 733 */ 734 if (sqp->sq_first != NULL) { 735 /* 736 * Still more to process. If time quanta not expired, we 737 * should let the drain go on. The worker thread is allowed 738 * to drain as long as there is anything left. 739 */ 740 now = gethrtime(); 741 if ((now < expire) || (proc_type == SQS_WORKER)) { 742 /* 743 * If time not expired or we are worker thread and 744 * this squeue is polling capable, continue to do 745 * the drain. 746 * 747 * We turn off interrupts for all userland threads 748 * doing drain but we do active polling only for 749 * worker thread. 750 * 751 * Calling SQS_POLL_RING() even in the case of 752 * SQS_POLLING_ON() not succeeding is ok as 753 * SQS_POLL_RING() will not wake up poll thread 754 * if SQS_POLLING bit is not set. 755 */ 756 if (proc_type == SQS_WORKER) 757 SQS_POLL_RING(sqp); 758 goto again; 759 } else { 760 did_wakeup = B_TRUE; 761 sqp->sq_awaken = lbolt; 762 cv_signal(&sqp->sq_worker_cv); 763 } 764 } 765 766 /* 767 * If the poll thread is already running, just return. The 768 * poll thread continues to hold the proc and will finish 769 * processing. 770 */ 771 if (sqp->sq_state & SQS_GET_PKTS) { 772 ASSERT(!(sqp->sq_state & (SQS_POLL_THR_QUIESCED | 773 SQS_POLL_QUIESCE_DONE))); 774 sqp->sq_state &= ~proc_type; 775 return; 776 } 777 778 /* 779 * 780 * If we are the worker thread and no work is left, send the poll 781 * thread down once more to see if something arrived. Otherwise, 782 * turn the interrupts back on and we are done. 783 */ 784 if ((proc_type == SQS_WORKER) && (sqp->sq_state & SQS_POLLING)) { 785 /* 786 * Do one last check to see if anything arrived 787 * in the NIC. We leave the SQS_PROC set to ensure 788 * that poll thread keeps the PROC and can decide 789 * if it needs to turn polling off or continue 790 * processing. 791 * 792 * If we drop the SQS_PROC here and poll thread comes 793 * up empty handed, it can not safely turn polling off 794 * since someone else could have acquired the PROC 795 * and started draining. The previously running poll 796 * thread and the current thread doing drain would end 797 * up in a race for turning polling on/off and more 798 * complex code would be required to deal with it. 799 * 800 * Its lot simpler for drain to hand the SQS_PROC to 801 * poll thread (if running) and let poll thread finish 802 * without worrying about racing with any other thread. 803 */ 804 ASSERT(!(sqp->sq_state & (SQS_POLL_THR_QUIESCED | 805 SQS_POLL_QUIESCE_DONE))); 806 SQS_POLL_RING(sqp); 807 sqp->sq_state &= ~proc_type; 808 } else { 809 /* 810 * The squeue is either not capable of polling or the 811 * attempt to blank (i.e., turn SQS_POLLING_ON()) was 812 * unsuccessful or poll thread already finished 813 * processing and didn't find anything. Since there 814 * is nothing queued and we already turn polling on 815 * (for all threads doing drain), we should turn 816 * polling off and relinquish the PROC. 817 */ 818 ASSERT(!(sqp->sq_state & (SQS_POLL_THR_QUIESCED | 819 SQS_POLL_QUIESCE_DONE))); 820 SQS_POLLING_OFF(sqp, sq_poll_capable, sq_rx_ring); 821 sqp->sq_state &= ~(SQS_PROC | proc_type); 822 if (!did_wakeup && sqp->sq_first != NULL) { 823 squeue_worker_wakeup(sqp); 824 mutex_enter(&sqp->sq_lock); 825 } 826 /* 827 * If we are not the worker and there is a pending quiesce 828 * event, wake up the worker 829 */ 830 if ((proc_type != SQS_WORKER) && 831 (sqp->sq_state & SQS_WORKER_THR_CONTROL)) 832 cv_signal(&sqp->sq_worker_cv); 833 } 834 } 835 836 /* 837 * Quiesce, Restart, or Cleanup of the squeue poll thread. 838 * 839 * Quiesce and Restart: After an squeue poll thread has been quiesced, it does 840 * not attempt to poll the underlying soft ring any more. The quiesce is 841 * triggered by the mac layer when it wants to quiesce a soft ring. Typically 842 * control operations such as changing the fanout of a NIC or VNIC (dladm 843 * setlinkprop) need to quiesce data flow before changing the wiring. 844 * The operation is done by the mac layer, but it calls back into IP to 845 * quiesce the soft ring. After completing the operation (say increase or 846 * decrease of the fanout) the mac layer then calls back into IP to restart 847 * the quiesced soft ring. 848 * 849 * Cleanup: This is triggered when the squeue binding to a soft ring is 850 * removed permanently. Typically interface plumb and unplumb would trigger 851 * this. It can also be triggered from the mac layer when a soft ring is 852 * being deleted say as the result of a fanout reduction. Since squeues are 853 * never deleted, the cleanup marks the squeue as fit for recycling and 854 * moves it to the zeroth squeue set. 855 */ 856 static void 857 squeue_poll_thr_control(squeue_t *sqp) 858 { 859 if (sqp->sq_state & SQS_POLL_THR_RESTART) { 860 /* Restart implies a previous quiesce */ 861 ASSERT(sqp->sq_state & SQS_POLL_THR_QUIESCED); 862 sqp->sq_state &= ~(SQS_POLL_THR_QUIESCED | 863 SQS_POLL_THR_RESTART); 864 sqp->sq_state |= SQS_POLL_CAPAB; 865 cv_signal(&sqp->sq_worker_cv); 866 return; 867 } 868 869 if (sqp->sq_state & SQS_POLL_THR_QUIESCE) { 870 sqp->sq_state |= SQS_POLL_THR_QUIESCED; 871 sqp->sq_state &= ~SQS_POLL_THR_QUIESCE; 872 cv_signal(&sqp->sq_worker_cv); 873 return; 874 } 875 } 876 877 /* 878 * POLLING Notes 879 * 880 * With polling mode, we want to do as much processing as we possibly can 881 * in worker thread context. The sweet spot is worker thread keeps doing 882 * work all the time in polling mode and writers etc. keep dumping packets 883 * to worker thread. Occassionally, we send the poll thread (running at 884 * lower priority to NIC to get the chain of packets to feed to worker). 885 * Sending the poll thread down to NIC is dependant on 3 criterions 886 * 887 * 1) Its always driven from squeue_drain and only if worker thread is 888 * doing the drain. 889 * 2) We clear the backlog once and more packets arrived in between. 890 * Before starting drain again, send the poll thread down if 891 * the drain is being done by worker thread. 892 * 3) Before exiting the squeue_drain, if the poll thread is not already 893 * working and we are the worker thread, try to poll one more time. 894 * 895 * For latency sake, we do allow any thread calling squeue_enter 896 * to process its packet provided: 897 * 898 * 1) Nothing is queued 899 * 2) If more packets arrived in between, the non worker thread are allowed 900 * to do the drain till their time quanta expired provided SQS_GET_PKTS 901 * wasn't set in between. 902 * 903 * Avoiding deadlocks with interrupts 904 * ================================== 905 * 906 * One of the big problem is that we can't send poll_thr down while holding 907 * the sq_lock since the thread can block. So we drop the sq_lock before 908 * calling sq_get_pkts(). We keep holding the SQS_PROC as long as the 909 * poll thread is running so that no other thread can acquire the 910 * perimeter in between. If the squeue_drain gets done (no more work 911 * left), it leaves the SQS_PROC set if poll thread is running. 912 */ 913 914 /* 915 * This is the squeue poll thread. In poll mode, it polls the underlying 916 * TCP softring and feeds packets into the squeue. The worker thread then 917 * drains the squeue. The poll thread also responds to control signals for 918 * quiesceing, restarting, or cleanup of an squeue. These are driven by 919 * control operations like plumb/unplumb or as a result of dynamic Rx ring 920 * related operations that are driven from the mac layer. 921 */ 922 static void 923 squeue_polling_thread(squeue_t *sqp) 924 { 925 kmutex_t *lock = &sqp->sq_lock; 926 kcondvar_t *async = &sqp->sq_poll_cv; 927 ip_mac_rx_t sq_get_pkts; 928 ip_accept_t ip_accept; 929 ill_rx_ring_t *sq_rx_ring; 930 ill_t *sq_ill; 931 mblk_t *head, *tail, *mp; 932 uint_t cnt; 933 void *sq_mac_handle; 934 callb_cpr_t cprinfo; 935 size_t bytes_to_pickup; 936 uint32_t ctl_state; 937 938 CALLB_CPR_INIT(&cprinfo, lock, callb_generic_cpr, "sq_poll"); 939 mutex_enter(lock); 940 941 for (;;) { 942 CALLB_CPR_SAFE_BEGIN(&cprinfo); 943 cv_wait(async, lock); 944 CALLB_CPR_SAFE_END(&cprinfo, lock); 945 946 ctl_state = sqp->sq_state & (SQS_POLL_THR_CONTROL | 947 SQS_POLL_THR_QUIESCED); 948 if (ctl_state != 0) { 949 /* 950 * If the squeue is quiesced, then wait for a control 951 * request. A quiesced squeue must not poll the 952 * underlying soft ring. 953 */ 954 if (ctl_state == SQS_POLL_THR_QUIESCED) 955 continue; 956 /* 957 * Act on control requests to quiesce, cleanup or 958 * restart an squeue 959 */ 960 squeue_poll_thr_control(sqp); 961 continue; 962 } 963 964 if (!(sqp->sq_state & SQS_POLL_CAPAB)) 965 continue; 966 967 ASSERT((sqp->sq_state & 968 (SQS_PROC|SQS_POLLING|SQS_GET_PKTS)) == 969 (SQS_PROC|SQS_POLLING|SQS_GET_PKTS)); 970 971 poll_again: 972 sq_rx_ring = sqp->sq_rx_ring; 973 sq_get_pkts = sq_rx_ring->rr_rx; 974 sq_mac_handle = sq_rx_ring->rr_rx_handle; 975 ip_accept = sq_rx_ring->rr_ip_accept; 976 sq_ill = sq_rx_ring->rr_ill; 977 bytes_to_pickup = MAX_BYTES_TO_PICKUP; 978 mutex_exit(lock); 979 head = sq_get_pkts(sq_mac_handle, bytes_to_pickup); 980 mp = NULL; 981 if (head != NULL) { 982 /* 983 * We got the packet chain from the mac layer. It 984 * would be nice to be able to process it inline 985 * for better performance but we need to give 986 * IP a chance to look at this chain to ensure 987 * that packets are really meant for this squeue 988 * and do the IP processing. 989 */ 990 mp = ip_accept(sq_ill, sq_rx_ring, sqp, head, 991 &tail, &cnt); 992 } 993 mutex_enter(lock); 994 if (mp != NULL) 995 ENQUEUE_CHAIN(sqp, mp, tail, cnt); 996 997 ASSERT((sqp->sq_state & 998 (SQS_PROC|SQS_POLLING|SQS_GET_PKTS)) == 999 (SQS_PROC|SQS_POLLING|SQS_GET_PKTS)); 1000 1001 if (sqp->sq_first != NULL && !(sqp->sq_state & SQS_WORKER)) { 1002 /* 1003 * We have packets to process and worker thread 1004 * is not running. Check to see if poll thread is 1005 * allowed to process. Let it do processing only if it 1006 * picked up some packets from the NIC otherwise 1007 * wakeup the worker thread. 1008 */ 1009 if (mp != NULL) { 1010 hrtime_t now; 1011 1012 now = gethrtime(); 1013 sqp->sq_run = curthread; 1014 sqp->sq_drain(sqp, SQS_POLL_PROC, now + 1015 squeue_drain_ns); 1016 sqp->sq_run = NULL; 1017 1018 if (sqp->sq_first == NULL) 1019 goto poll_again; 1020 1021 /* 1022 * Couldn't do the entire drain because the 1023 * time limit expired, let the 1024 * worker thread take over. 1025 */ 1026 } 1027 1028 sqp->sq_awaken = lbolt; 1029 /* 1030 * Put the SQS_PROC_HELD on so the worker 1031 * thread can distinguish where its called from. We 1032 * can remove the SQS_PROC flag here and turn off the 1033 * polling so that it wouldn't matter who gets the 1034 * processing but we get better performance this way 1035 * and save the cost of turn polling off and possibly 1036 * on again as soon as we start draining again. 1037 * 1038 * We can't remove the SQS_PROC flag without turning 1039 * polling off until we can guarantee that control 1040 * will return to squeue_drain immediately. 1041 */ 1042 sqp->sq_state |= SQS_PROC_HELD; 1043 sqp->sq_state &= ~SQS_GET_PKTS; 1044 cv_signal(&sqp->sq_worker_cv); 1045 } else if (sqp->sq_first == NULL && 1046 !(sqp->sq_state & SQS_WORKER)) { 1047 /* 1048 * Nothing queued and worker thread not running. 1049 * Since we hold the proc, no other thread is 1050 * processing the squeue. This means that there 1051 * is no work to be done and nothing is queued 1052 * in squeue or in NIC. Turn polling off and go 1053 * back to interrupt mode. 1054 */ 1055 sqp->sq_state &= ~(SQS_PROC|SQS_GET_PKTS); 1056 /* LINTED: constant in conditional context */ 1057 SQS_POLLING_OFF(sqp, B_TRUE, sq_rx_ring); 1058 1059 /* 1060 * If there is a pending control operation 1061 * wake up the worker, since it is currently 1062 * not running. 1063 */ 1064 if (sqp->sq_state & SQS_WORKER_THR_CONTROL) 1065 cv_signal(&sqp->sq_worker_cv); 1066 } else { 1067 /* 1068 * Worker thread is already running. We don't need 1069 * to do anything. Indicate that poll thread is done. 1070 */ 1071 sqp->sq_state &= ~SQS_GET_PKTS; 1072 } 1073 if (sqp->sq_state & SQS_POLL_THR_CONTROL) { 1074 /* 1075 * Act on control requests to quiesce, cleanup or 1076 * restart an squeue 1077 */ 1078 squeue_poll_thr_control(sqp); 1079 } 1080 } 1081 } 1082 1083 /* 1084 * The squeue worker thread acts on any control requests to quiesce, cleanup 1085 * or restart an ill_rx_ring_t by calling this function. The worker thread 1086 * synchronizes with the squeue poll thread to complete the request and finally 1087 * wakes up the requestor when the request is completed. 1088 */ 1089 static void 1090 squeue_worker_thr_control(squeue_t *sqp) 1091 { 1092 ill_t *ill; 1093 ill_rx_ring_t *rx_ring; 1094 1095 ASSERT(MUTEX_HELD(&sqp->sq_lock)); 1096 1097 if (sqp->sq_state & SQS_POLL_RESTART) { 1098 /* Restart implies a previous quiesce. */ 1099 ASSERT((sqp->sq_state & (SQS_PROC_HELD | 1100 SQS_POLL_QUIESCE_DONE | SQS_PROC | SQS_WORKER)) == 1101 (SQS_POLL_QUIESCE_DONE | SQS_PROC | SQS_WORKER)); 1102 /* 1103 * Request the squeue poll thread to restart and wait till 1104 * it actually restarts. 1105 */ 1106 sqp->sq_state &= ~SQS_POLL_QUIESCE_DONE; 1107 sqp->sq_state |= SQS_POLL_THR_RESTART; 1108 cv_signal(&sqp->sq_poll_cv); 1109 while (sqp->sq_state & SQS_POLL_THR_QUIESCED) 1110 cv_wait(&sqp->sq_worker_cv, &sqp->sq_lock); 1111 sqp->sq_state &= ~(SQS_POLL_RESTART | SQS_PROC | 1112 SQS_WORKER); 1113 /* 1114 * Signal any waiter that is waiting for the restart 1115 * to complete 1116 */ 1117 sqp->sq_state |= SQS_POLL_RESTART_DONE; 1118 cv_signal(&sqp->sq_ctrlop_done_cv); 1119 return; 1120 } 1121 1122 if (sqp->sq_state & SQS_PROC_HELD) { 1123 /* The squeue poll thread handed control to us */ 1124 ASSERT(sqp->sq_state & SQS_PROC); 1125 } 1126 1127 /* 1128 * Prevent any other thread from processing the squeue 1129 * until we finish the control actions by setting SQS_PROC. 1130 * But allow ourself to reenter by setting SQS_WORKER 1131 */ 1132 sqp->sq_state |= (SQS_PROC | SQS_WORKER); 1133 1134 /* Signal the squeue poll thread and wait for it to quiesce itself */ 1135 if (!(sqp->sq_state & SQS_POLL_THR_QUIESCED)) { 1136 sqp->sq_state |= SQS_POLL_THR_QUIESCE; 1137 cv_signal(&sqp->sq_poll_cv); 1138 while (!(sqp->sq_state & SQS_POLL_THR_QUIESCED)) 1139 cv_wait(&sqp->sq_worker_cv, &sqp->sq_lock); 1140 } 1141 1142 rx_ring = sqp->sq_rx_ring; 1143 ill = rx_ring->rr_ill; 1144 /* 1145 * The lock hierarchy is as follows. 1146 * cpu_lock -> ill_lock -> sqset_lock -> sq_lock 1147 */ 1148 mutex_exit(&sqp->sq_lock); 1149 mutex_enter(&ill->ill_lock); 1150 mutex_enter(&sqp->sq_lock); 1151 1152 SQS_POLLING_OFF(sqp, (sqp->sq_state & SQS_POLL_CAPAB) != 0, 1153 sqp->sq_rx_ring); 1154 sqp->sq_state &= ~(SQS_POLL_CAPAB | SQS_GET_PKTS | SQS_PROC_HELD); 1155 if (sqp->sq_state & SQS_POLL_CLEANUP) { 1156 /* 1157 * Disassociate this squeue from its ill_rx_ring_t. 1158 * The rr_sqp, sq_rx_ring fields are protected by the 1159 * corresponding squeue, ill_lock* and sq_lock. Holding any 1160 * of them will ensure that the ring to squeue mapping does 1161 * not change. 1162 */ 1163 ASSERT(!(sqp->sq_state & SQS_DEFAULT)); 1164 1165 sqp->sq_rx_ring = NULL; 1166 rx_ring->rr_sqp = NULL; 1167 1168 sqp->sq_state &= ~(SQS_POLL_CLEANUP | SQS_POLL_THR_QUIESCED | 1169 SQS_POLL_QUIESCE_DONE); 1170 sqp->sq_ill = NULL; 1171 1172 rx_ring->rr_rx_handle = NULL; 1173 rx_ring->rr_intr_handle = NULL; 1174 rx_ring->rr_intr_enable = NULL; 1175 rx_ring->rr_intr_disable = NULL; 1176 sqp->sq_state |= SQS_POLL_CLEANUP_DONE; 1177 } else { 1178 sqp->sq_state &= ~SQS_POLL_QUIESCE; 1179 sqp->sq_state |= SQS_POLL_QUIESCE_DONE; 1180 } 1181 /* 1182 * Signal any waiter that is waiting for the quiesce or cleanup 1183 * to complete and also wait for it to actually see and reset the 1184 * SQS_POLL_CLEANUP_DONE. 1185 */ 1186 cv_signal(&sqp->sq_ctrlop_done_cv); 1187 mutex_exit(&ill->ill_lock); 1188 if (sqp->sq_state & SQS_POLL_CLEANUP_DONE) { 1189 cv_wait(&sqp->sq_worker_cv, &sqp->sq_lock); 1190 sqp->sq_state &= ~(SQS_PROC | SQS_WORKER); 1191 } 1192 } 1193 1194 static void 1195 squeue_worker(squeue_t *sqp) 1196 { 1197 kmutex_t *lock = &sqp->sq_lock; 1198 kcondvar_t *async = &sqp->sq_worker_cv; 1199 callb_cpr_t cprinfo; 1200 hrtime_t now; 1201 1202 CALLB_CPR_INIT(&cprinfo, lock, callb_generic_cpr, "sq_worker"); 1203 mutex_enter(lock); 1204 1205 for (;;) { 1206 for (;;) { 1207 /* 1208 * If the poll thread has handed control to us 1209 * we need to break out of the wait. 1210 */ 1211 if (sqp->sq_state & SQS_PROC_HELD) 1212 break; 1213 1214 /* 1215 * If the squeue is not being processed and we either 1216 * have messages to drain or some thread has signaled 1217 * some control activity we need to break 1218 */ 1219 if (!(sqp->sq_state & SQS_PROC) && 1220 ((sqp->sq_state & SQS_WORKER_THR_CONTROL) || 1221 (sqp->sq_first != NULL))) 1222 break; 1223 1224 /* 1225 * If we have started some control action, then check 1226 * for the SQS_WORKER flag (since we don't 1227 * release the squeue) to make sure we own the squeue 1228 * and break out 1229 */ 1230 if ((sqp->sq_state & SQS_WORKER_THR_CONTROL) && 1231 (sqp->sq_state & SQS_WORKER)) 1232 break; 1233 1234 CALLB_CPR_SAFE_BEGIN(&cprinfo); 1235 cv_wait(async, lock); 1236 CALLB_CPR_SAFE_END(&cprinfo, lock); 1237 } 1238 if (sqp->sq_state & SQS_WORKER_THR_CONTROL) { 1239 squeue_worker_thr_control(sqp); 1240 continue; 1241 } 1242 ASSERT(!(sqp->sq_state & (SQS_POLL_THR_QUIESCED | 1243 SQS_POLL_CLEANUP_DONE | SQS_POLL_QUIESCE_DONE | 1244 SQS_WORKER_THR_CONTROL | SQS_POLL_THR_CONTROL))); 1245 1246 if (sqp->sq_state & SQS_PROC_HELD) 1247 sqp->sq_state &= ~SQS_PROC_HELD; 1248 1249 now = gethrtime(); 1250 sqp->sq_run = curthread; 1251 sqp->sq_drain(sqp, SQS_WORKER, now + squeue_drain_ns); 1252 sqp->sq_run = NULL; 1253 } 1254 } 1255 1256 uintptr_t * 1257 squeue_getprivate(squeue_t *sqp, sqprivate_t p) 1258 { 1259 ASSERT(p < SQPRIVATE_MAX); 1260 1261 return (&sqp->sq_private[p]); 1262 } 1263 1264 /* ARGSUSED */ 1265 void 1266 squeue_wakeup_conn(void *arg, mblk_t *mp, void *arg2) 1267 { 1268 conn_t *connp = (conn_t *)arg; 1269 squeue_t *sqp = connp->conn_sqp; 1270 1271 /* 1272 * Mark the squeue as paused before waking up the thread stuck 1273 * in squeue_synch_enter(). 1274 */ 1275 mutex_enter(&sqp->sq_lock); 1276 sqp->sq_state |= SQS_PAUSE; 1277 1278 /* 1279 * Notify the thread that it's OK to proceed; that is done by 1280 * clearing the MSGWAITSYNC flag. The synch thread will free the mblk. 1281 */ 1282 ASSERT(mp->b_flag & MSGWAITSYNC); 1283 mp->b_flag &= ~MSGWAITSYNC; 1284 cv_broadcast(&connp->conn_sq_cv); 1285 1286 /* 1287 * We are doing something on behalf of another thread, so we have to 1288 * pause and wait until it finishes. 1289 */ 1290 while (sqp->sq_state & SQS_PAUSE) { 1291 cv_wait(&sqp->sq_synch_cv, &sqp->sq_lock); 1292 } 1293 mutex_exit(&sqp->sq_lock); 1294 } 1295 1296 int 1297 squeue_synch_enter(squeue_t *sqp, conn_t *connp, mblk_t *use_mp) 1298 { 1299 mutex_enter(&sqp->sq_lock); 1300 if (sqp->sq_first == NULL && !(sqp->sq_state & SQS_PROC)) { 1301 /* 1302 * We are OK to proceed if the squeue is empty, and 1303 * no one owns the squeue. 1304 * 1305 * The caller won't own the squeue as this is called from the 1306 * application. 1307 */ 1308 ASSERT(sqp->sq_run == NULL); 1309 1310 sqp->sq_state |= SQS_PROC; 1311 sqp->sq_run = curthread; 1312 mutex_exit(&sqp->sq_lock); 1313 1314 #if SQUEUE_DEBUG 1315 sqp->sq_curmp = NULL; 1316 sqp->sq_curproc = NULL; 1317 sqp->sq_connp = connp; 1318 #endif 1319 connp->conn_on_sqp = B_TRUE; 1320 return (0); 1321 } else { 1322 mblk_t *mp; 1323 1324 mp = (use_mp == NULL) ? allocb(0, BPRI_MED) : use_mp; 1325 if (mp == NULL) { 1326 mutex_exit(&sqp->sq_lock); 1327 return (ENOMEM); 1328 } 1329 1330 /* 1331 * We mark the mblk as awaiting synchronous squeue access 1332 * by setting the MSGWAITSYNC flag. Once squeue_wakeup_conn 1333 * fires, MSGWAITSYNC is cleared, at which point we know we 1334 * have exclusive access. 1335 */ 1336 mp->b_flag |= MSGWAITSYNC; 1337 1338 CONN_INC_REF(connp); 1339 SET_SQUEUE(mp, squeue_wakeup_conn, connp); 1340 ENQUEUE_CHAIN(sqp, mp, mp, 1); 1341 1342 ASSERT(sqp->sq_run != curthread); 1343 1344 /* Wait until the enqueued mblk get processed. */ 1345 while (mp->b_flag & MSGWAITSYNC) 1346 cv_wait(&connp->conn_sq_cv, &sqp->sq_lock); 1347 mutex_exit(&sqp->sq_lock); 1348 1349 if (use_mp == NULL) 1350 freeb(mp); 1351 1352 return (0); 1353 } 1354 } 1355 1356 void 1357 squeue_synch_exit(squeue_t *sqp, conn_t *connp) 1358 { 1359 mutex_enter(&sqp->sq_lock); 1360 if (sqp->sq_run == curthread) { 1361 ASSERT(sqp->sq_state & SQS_PROC); 1362 1363 sqp->sq_state &= ~SQS_PROC; 1364 sqp->sq_run = NULL; 1365 connp->conn_on_sqp = B_FALSE; 1366 1367 if (sqp->sq_first == NULL) { 1368 mutex_exit(&sqp->sq_lock); 1369 } else { 1370 /* 1371 * If this was a normal thread, then it would 1372 * (most likely) continue processing the pending 1373 * requests. Since the just completed operation 1374 * was executed synchronously, the thread should 1375 * not be delayed. To compensate, wake up the 1376 * worker thread right away when there are outstanding 1377 * requests. 1378 */ 1379 sqp->sq_awaken = lbolt; 1380 cv_signal(&sqp->sq_worker_cv); 1381 mutex_exit(&sqp->sq_lock); 1382 } 1383 } else { 1384 /* 1385 * The caller doesn't own the squeue, clear the SQS_PAUSE flag, 1386 * and wake up the squeue owner, such that owner can continue 1387 * processing. 1388 */ 1389 ASSERT(sqp->sq_state & SQS_PAUSE); 1390 sqp->sq_state &= ~SQS_PAUSE; 1391 1392 /* There should be only one thread blocking on sq_synch_cv. */ 1393 cv_signal(&sqp->sq_synch_cv); 1394 mutex_exit(&sqp->sq_lock); 1395 } 1396 } 1397