1 /* 2 * CDDL HEADER START 3 * 4 * The contents of this file are subject to the terms of the 5 * Common Development and Distribution License (the "License"). 6 * You may not use this file except in compliance with the License. 7 * 8 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE 9 * or http://www.opensolaris.org/os/licensing. 10 * See the License for the specific language governing permissions 11 * and limitations under the License. 12 * 13 * When distributing Covered Code, include this CDDL HEADER in each 14 * file and include the License file at usr/src/OPENSOLARIS.LICENSE. 15 * If applicable, add the following below this CDDL HEADER, with the 16 * fields enclosed by brackets "[]" replaced with your own identifying 17 * information: Portions Copyright [yyyy] [name of copyright owner] 18 * 19 * CDDL HEADER END 20 */ 21 /* 22 * Copyright 2009 Sun Microsystems, Inc. All rights reserved. 23 * Use is subject to license terms. 24 */ 25 26 /* 27 * Squeues: General purpose serialization mechanism 28 * ------------------------------------------------ 29 * 30 * Background: 31 * ----------- 32 * 33 * This is a general purpose high-performance serialization mechanism 34 * currently used by TCP/IP. It is implemented by means of a per-CPU queue, 35 * a worker thread and a polling thread which are bound to the CPU 36 * associated with the squeue. The squeue is strictly FIFO for both the read 37 * and write side and only one thread can process it at any given time. 38 * The design goal of the squeue was to offer a very high degree of 39 * parallelization (on a per H/W execution pipeline basis) with at 40 * most one queuing. 41 * 42 * Modules needing protection typically call the squeue_enter() or 43 * squeue_enter_chain() routine as soon as a thread enters the module 44 * from either direction. For each packet, the processing function 45 * and argument are stored in the mblk itself. When the packet is ready 46 * to be processed, the squeue retrieves the stored function and calls 47 * it with the supplied argument and the pointer to the packet itself. 48 * The called function can assume that no other thread is processing 49 * the squeue while it is executing. 50 * 51 * Squeue/connection binding: 52 * -------------------------- 53 * 54 * TCP/IP uses an IP classifier in conjunction with squeues, where specific 55 * connections are assigned to a specific squeue (based on various policies) 56 * at connection creation time. Once assigned, the connection to 57 * squeue mapping is never changed and all future packets for that 58 * connection are processed on that squeue. The connection ("conn") to 59 * squeue mapping is stored in the "conn_t" member "conn_sqp". 60 * 61 * Since the processing of the connection cuts across multiple layers 62 * but still allows packets for different connections to be processed on 63 * other CPUs/squeues, squeues are also termed a "Vertical Perimeter" or 64 * "Per Connection Vertical Perimeter". 65 * 66 * Processing Model: 67 * ----------------- 68 * 69 * An squeue doesn't necessarily process packets with its own worker thread. 70 * Callers can pick whether they just want to queue the packet, process 71 * their packet if nothing is queued, or drain and process. The first two 72 * modes are typically employed when the packet was generated while 73 * already doing the processing behind the squeue and the last mode (drain 74 * and process) is typically employed when the thread is entering the squeue 75 * for the first time.
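 *
 * As a rough sketch (the exact call sites vary), these modes map to the
 * process_flag values accepted by squeue_enter() later in this file:
 * SQ_FILL (just queue the packet), SQ_NODRAIN (process it inline only if
 * nothing is queued) and SQ_PROCESS (process inline and then drain). A
 * typical hand-off through the SQUEUE_ENTER_ONE() wrapper looks like
 *
 *	SQUEUE_ENTER_ONE(connp->conn_sqp, mp, proc, connp,
 *	    SQ_PROCESS, tag);
 *
 * where proc and tag stand for whatever processing function and SQTAG
 * value the caller uses.
 *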
The squeue still imposes a finite time limit 76 * for which an external thread can do processing, after which it switches 77 * processing to its own worker thread. 78 * 79 * Once created, squeues are never deleted. Hence squeue pointers are 80 * always valid. This means that functions outside the squeue can still 81 * refer safely to conn_sqp and there is no need for ref counts. 82 * 83 * Only a thread executing in the squeue can change the squeue of the 84 * connection. It does so by calling a squeue framework function to do this. 85 * After changing the squeue, the thread must leave the squeue. It must not 86 * continue to execute any code that needs squeue protection. 87 * 88 * The squeue framework, after entering the squeue, checks if the current 89 * squeue matches the conn_sqp. If the check fails, the packet is delivered 90 * to the right squeue. 91 * 92 * Polling Model: 93 * -------------- 94 * 95 * Squeues can control the rate of packet arrival into themselves from the 96 * NIC or a specific Rx ring within a NIC. As part of capability negotiation 97 * between the IP and MAC layers, an squeue is created for each TCP soft ring 98 * (or TCP Rx ring - to be implemented in the future). As part of this 99 * negotiation, squeues get a cookie for the underlying soft ring or Rx 100 * ring, a function to turn off incoming packets and a function to call 101 * to poll for packets. This helps schedule the receive side packet 102 * processing so that queue backlog doesn't build up and packet processing 103 * doesn't keep getting disturbed by high priority interrupts. As part 104 * of this mode, as soon as a backlog starts building, the squeue turns off 105 * the interrupts and switches to poll mode. In poll mode, when the poll 106 * thread goes down to retrieve packets, it retrieves them in the form of 107 * a chain, which improves performance even more. As the squeue/softring 108 * system gets more packets, it gets more efficient by switching to 109 * polling more often and dealing with larger packet chains. 110 * 111 */ 112 113 #include <sys/types.h> 114 #include <sys/cmn_err.h> 115 #include <sys/debug.h> 116 #include <sys/kmem.h> 117 #include <sys/cpuvar.h> 118 #include <sys/condvar_impl.h> 119 #include <sys/systm.h> 120 #include <sys/callb.h> 121 #include <sys/sdt.h> 122 #include <sys/ddi.h> 123 #include <sys/sunddi.h> 124 125 #include <inet/ipclassifier.h> 126 #include <inet/udp_impl.h> 127 128 #include <sys/squeue_impl.h> 129 130 static void squeue_fire(void *); 131 static void squeue_drain(squeue_t *, uint_t, hrtime_t); 132 static void squeue_worker(squeue_t *sqp); 133 static void squeue_polling_thread(squeue_t *sqp); 134 135 kmem_cache_t *squeue_cache; 136 137 #define SQUEUE_MSEC_TO_NSEC 1000000 138 139 int squeue_drain_ms = 20; 140 int squeue_workerwait_ms = 0; 141 142 /* The values above converted to ticks or nanoseconds */ 143 static int squeue_drain_ns = 0; 144 static int squeue_workerwait_tick = 0; 145 146 #define MAX_BYTES_TO_PICKUP 150000 147 148 #define ENQUEUE_CHAIN(sqp, mp, tail, cnt) { \ 149 /* \ 150 * Enqueue our mblk chain.
\ 151 */ \ 152 ASSERT(MUTEX_HELD(&(sqp)->sq_lock)); \ 153 \ 154 if ((sqp)->sq_last != NULL) \ 155 (sqp)->sq_last->b_next = (mp); \ 156 else \ 157 (sqp)->sq_first = (mp); \ 158 (sqp)->sq_last = (tail); \ 159 (sqp)->sq_count += (cnt); \ 160 ASSERT((sqp)->sq_count > 0); \ 161 DTRACE_PROBE4(squeue__enqueuechain, squeue_t *, sqp, \ 162 mblk_t *, mp, mblk_t *, tail, int, cnt); \ 163 \ 164 } 165 166 #define SQS_POLLING_ON(sqp, sq_poll_capable, rx_ring) { \ 167 ASSERT(MUTEX_HELD(&(sqp)->sq_lock)); \ 168 if (sq_poll_capable) { \ 169 ASSERT(rx_ring != NULL); \ 170 ASSERT(sqp->sq_state & SQS_POLL_CAPAB); \ 171 if (!(sqp->sq_state & SQS_POLLING)) { \ 172 sqp->sq_state |= SQS_POLLING; \ 173 rx_ring->rr_intr_disable(rx_ring->rr_intr_handle); \ 174 } \ 175 } \ 176 } 177 178 #define SQS_POLLING_OFF(sqp, sq_poll_capable, rx_ring) { \ 179 ASSERT(MUTEX_HELD(&(sqp)->sq_lock)); \ 180 if (sq_poll_capable) { \ 181 ASSERT(rx_ring != NULL); \ 182 ASSERT(sqp->sq_state & SQS_POLL_CAPAB); \ 183 if (sqp->sq_state & SQS_POLLING) { \ 184 sqp->sq_state &= ~SQS_POLLING; \ 185 rx_ring->rr_intr_enable(rx_ring->rr_intr_handle); \ 186 } \ 187 } \ 188 } 189 190 #define SQS_POLL_RING(sqp, sq_poll_capable) { \ 191 ASSERT(MUTEX_HELD(&(sqp)->sq_lock)); \ 192 if (sq_poll_capable) { \ 193 ASSERT(sqp->sq_state & SQS_POLL_CAPAB); \ 194 if (!(sqp->sq_state & SQS_GET_PKTS)) { \ 195 sqp->sq_state |= SQS_GET_PKTS; \ 196 cv_signal(&sqp->sq_poll_cv); \ 197 } \ 198 } \ 199 } 200 201 #ifdef DEBUG 202 #define SQUEUE_DBG_SET(sqp, mp, proc, connp, tag) { \ 203 (sqp)->sq_curmp = (mp); \ 204 (sqp)->sq_curproc = (proc); \ 205 (sqp)->sq_connp = (connp); \ 206 (mp)->b_tag = (sqp)->sq_tag = (tag); \ 207 } 208 209 #define SQUEUE_DBG_CLEAR(sqp) { \ 210 (sqp)->sq_curmp = NULL; \ 211 (sqp)->sq_curproc = NULL; \ 212 (sqp)->sq_connp = NULL; \ 213 } 214 #else 215 #define SQUEUE_DBG_SET(sqp, mp, proc, connp, tag) 216 #define SQUEUE_DBG_CLEAR(sqp) 217 #endif 218 219 void 220 squeue_init(void) 221 { 222 squeue_cache = kmem_cache_create("squeue_cache", 223 sizeof (squeue_t), 64, NULL, NULL, NULL, NULL, NULL, 0); 224 225 squeue_drain_ns = squeue_drain_ms * SQUEUE_MSEC_TO_NSEC; 226 squeue_workerwait_tick = MSEC_TO_TICK_ROUNDUP(squeue_workerwait_ms); 227 } 228 229 /* ARGSUSED */ 230 squeue_t * 231 squeue_create(clock_t wait, pri_t pri) 232 { 233 squeue_t *sqp = kmem_cache_alloc(squeue_cache, KM_SLEEP); 234 235 bzero(sqp, sizeof (squeue_t)); 236 sqp->sq_bind = PBIND_NONE; 237 sqp->sq_priority = pri; 238 sqp->sq_wait = MSEC_TO_TICK(wait); 239 sqp->sq_worker = thread_create(NULL, 0, squeue_worker, 240 sqp, 0, &p0, TS_RUN, pri); 241 242 sqp->sq_poll_thr = thread_create(NULL, 0, squeue_polling_thread, 243 sqp, 0, &p0, TS_RUN, pri); 244 245 sqp->sq_enter = squeue_enter; 246 sqp->sq_drain = squeue_drain; 247 248 return (sqp); 249 } 250 251 /* 252 * Bind squeue worker thread to the specified CPU, given by CPU id. 253 * If the CPU id value is -1, bind the worker thread to the value 254 * specified in sq_bind field. If a thread is already bound to a 255 * different CPU, unbind it from the old CPU and bind to the new one. 
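 *
 * A minimal usage sketch (hypothetical caller and values; squeue_bind()
 * asserts that cpu_lock is held):
 *
 *	sqp = squeue_create(squeue_workerwait_ms, minclsyspri);
 *	mutex_enter(&cpu_lock);
 *	squeue_bind(sqp, cpu_id);
 *	mutex_exit(&cpu_lock);
 *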
256 */ 257 258 void 259 squeue_bind(squeue_t *sqp, processorid_t bind) 260 { 261 mutex_enter(&sqp->sq_lock); 262 ASSERT(sqp->sq_bind != PBIND_NONE || bind != PBIND_NONE); 263 ASSERT(MUTEX_HELD(&cpu_lock)); 264 265 if (sqp->sq_state & SQS_BOUND) { 266 if (sqp->sq_bind == bind) { 267 mutex_exit(&sqp->sq_lock); 268 return; 269 } 270 thread_affinity_clear(sqp->sq_worker); 271 } else { 272 sqp->sq_state |= SQS_BOUND; 273 } 274 275 if (bind != PBIND_NONE) 276 sqp->sq_bind = bind; 277 278 thread_affinity_set(sqp->sq_worker, sqp->sq_bind); 279 mutex_exit(&sqp->sq_lock); 280 } 281 282 void 283 squeue_unbind(squeue_t *sqp) 284 { 285 mutex_enter(&sqp->sq_lock); 286 if (!(sqp->sq_state & SQS_BOUND)) { 287 mutex_exit(&sqp->sq_lock); 288 return; 289 } 290 291 sqp->sq_state &= ~SQS_BOUND; 292 thread_affinity_clear(sqp->sq_worker); 293 mutex_exit(&sqp->sq_lock); 294 } 295 296 void 297 squeue_worker_wakeup(squeue_t *sqp) 298 { 299 timeout_id_t tid = (sqp)->sq_tid; 300 301 ASSERT(MUTEX_HELD(&(sqp)->sq_lock)); 302 303 if (sqp->sq_wait == 0) { 304 ASSERT(tid == 0); 305 ASSERT(!(sqp->sq_state & SQS_TMO_PROG)); 306 sqp->sq_awaken = lbolt; 307 cv_signal(&sqp->sq_worker_cv); 308 mutex_exit(&sqp->sq_lock); 309 return; 310 } 311 312 /* 313 * Queue isn't being processed, so take 314 * any post enqueue actions needed before leaving. 315 */ 316 if (tid != 0) { 317 /* 318 * Waiting for an enter() to process mblk(s). 319 */ 320 clock_t waited = lbolt - sqp->sq_awaken; 321 322 if (TICK_TO_MSEC(waited) >= sqp->sq_wait) { 323 /* 324 * Times up and have a worker thread 325 * waiting for work, so schedule it. 326 */ 327 sqp->sq_tid = 0; 328 sqp->sq_awaken = lbolt; 329 cv_signal(&sqp->sq_worker_cv); 330 mutex_exit(&sqp->sq_lock); 331 (void) untimeout(tid); 332 return; 333 } 334 mutex_exit(&sqp->sq_lock); 335 return; 336 } else if (sqp->sq_state & SQS_TMO_PROG) { 337 mutex_exit(&sqp->sq_lock); 338 return; 339 } else { 340 clock_t wait = sqp->sq_wait; 341 /* 342 * Wait up to sqp->sq_wait ms for an 343 * enter() to process this queue. We 344 * don't want to contend on timeout locks 345 * with sq_lock held for performance reasons, 346 * so drop the sq_lock before calling timeout 347 * but we need to check if timeout is required 348 * after re acquiring the sq_lock. Once 349 * the sq_lock is dropped, someone else could 350 * have processed the packet or the timeout could 351 * have already fired. 352 */ 353 sqp->sq_state |= SQS_TMO_PROG; 354 mutex_exit(&sqp->sq_lock); 355 tid = timeout(squeue_fire, sqp, wait); 356 mutex_enter(&sqp->sq_lock); 357 /* Check again if we still need the timeout */ 358 if (((sqp->sq_state & (SQS_PROC|SQS_TMO_PROG)) == 359 SQS_TMO_PROG) && (sqp->sq_tid == 0) && 360 (sqp->sq_first != NULL)) { 361 sqp->sq_state &= ~SQS_TMO_PROG; 362 sqp->sq_tid = tid; 363 mutex_exit(&sqp->sq_lock); 364 return; 365 } else { 366 if (sqp->sq_state & SQS_TMO_PROG) { 367 sqp->sq_state &= ~SQS_TMO_PROG; 368 mutex_exit(&sqp->sq_lock); 369 (void) untimeout(tid); 370 } else { 371 /* 372 * The timer fired before we could 373 * reacquire the sq_lock. squeue_fire 374 * removes the SQS_TMO_PROG flag 375 * and we don't need to do anything 376 * else. 377 */ 378 mutex_exit(&sqp->sq_lock); 379 } 380 } 381 } 382 383 ASSERT(MUTEX_NOT_HELD(&sqp->sq_lock)); 384 } 385 386 /* 387 * squeue_enter() - enter squeue sqp with mblk mp (which can be 388 * a chain), while tail points to the end and cnt in number of 389 * mblks in the chain. 390 * 391 * For a chain of single packet (i.e. 
mp == tail), go through the 392 * fast path if no one is processing the squeue and nothing is queued. 393 * 394 * The proc and arg for each mblk is already stored in the mblk in 395 * appropriate places. 396 * 397 * The process_flag specifies if we are allowed to process the mblk 398 * and drain in the entering thread context. If process_flag is 399 * SQ_FILL, then we just queue the mblk and return (after signaling 400 * the worker thread if no one else is processing the squeue). 401 */ 402 /* ARGSUSED */ 403 void 404 squeue_enter(squeue_t *sqp, mblk_t *mp, mblk_t *tail, uint32_t cnt, 405 int process_flag, uint8_t tag) 406 { 407 conn_t *connp; 408 sqproc_t proc; 409 hrtime_t now; 410 411 ASSERT(sqp != NULL); 412 ASSERT(mp != NULL); 413 ASSERT(tail != NULL); 414 ASSERT(cnt > 0); 415 ASSERT(MUTEX_NOT_HELD(&sqp->sq_lock)); 416 417 mutex_enter(&sqp->sq_lock); 418 419 /* 420 * Try to process the packet if SQ_FILL flag is not set and 421 * we are allowed to process the squeue. The SQ_NODRAIN is 422 * ignored if the packet chain consists of more than 1 packet. 423 */ 424 if (!(sqp->sq_state & SQS_PROC) && ((process_flag == SQ_PROCESS) || 425 (process_flag == SQ_NODRAIN && sqp->sq_first == NULL))) { 426 /* 427 * See if anything is already queued. If we are the 428 * first packet, do inline processing else queue the 429 * packet and do the drain. 430 */ 431 if (sqp->sq_first == NULL && cnt == 1) { 432 /* 433 * Fast-path, ok to process and nothing queued. 434 */ 435 sqp->sq_state |= (SQS_PROC|SQS_FAST); 436 sqp->sq_run = curthread; 437 mutex_exit(&sqp->sq_lock); 438 439 /* 440 * We are the chain of 1 packet so 441 * go through this fast path. 442 */ 443 ASSERT(mp->b_prev != NULL); 444 ASSERT(mp->b_queue != NULL); 445 connp = (conn_t *)mp->b_prev; 446 mp->b_prev = NULL; 447 proc = (sqproc_t)mp->b_queue; 448 mp->b_queue = NULL; 449 ASSERT(proc != NULL && connp != NULL); 450 ASSERT(mp->b_next == NULL); 451 452 /* 453 * Handle squeue switching. More details in the 454 * block comment at the top of the file 455 */ 456 if (connp->conn_sqp == sqp) { 457 SQUEUE_DBG_SET(sqp, mp, proc, connp, 458 tag); 459 connp->conn_on_sqp = B_TRUE; 460 DTRACE_PROBE3(squeue__proc__start, squeue_t *, 461 sqp, mblk_t *, mp, conn_t *, connp); 462 (*proc)(connp, mp, sqp); 463 DTRACE_PROBE2(squeue__proc__end, squeue_t *, 464 sqp, conn_t *, connp); 465 connp->conn_on_sqp = B_FALSE; 466 SQUEUE_DBG_CLEAR(sqp); 467 CONN_DEC_REF(connp); 468 } else { 469 SQUEUE_ENTER_ONE(connp->conn_sqp, mp, proc, 470 connp, SQ_FILL, SQTAG_SQUEUE_CHANGE); 471 } 472 ASSERT(MUTEX_NOT_HELD(&sqp->sq_lock)); 473 mutex_enter(&sqp->sq_lock); 474 sqp->sq_state &= ~(SQS_PROC|SQS_FAST); 475 sqp->sq_run = NULL; 476 if (sqp->sq_first == NULL || 477 process_flag == SQ_NODRAIN) { 478 if (sqp->sq_first != NULL) { 479 squeue_worker_wakeup(sqp); 480 return; 481 } 482 /* 483 * We processed inline our packet and nothing 484 * new has arrived. We are done. In case any 485 * control actions are pending, wake up the 486 * worker. 487 */ 488 if (sqp->sq_state & SQS_WORKER_THR_CONTROL) 489 cv_signal(&sqp->sq_worker_cv); 490 mutex_exit(&sqp->sq_lock); 491 return; 492 } 493 } else { 494 ENQUEUE_CHAIN(sqp, mp, tail, cnt); 495 #ifdef DEBUG 496 mp->b_tag = tag; 497 #endif 498 } 499 /* 500 * We are here because either we couldn't do inline 501 * processing (because something was already queued), 502 * or we had a chain of more than one packet, 503 * or something else arrived after we were done with 504 * inline processing. 
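 * The drain below is bounded: it is handed an absolute deadline of
 * now + squeue_drain_ns (squeue_drain_ms converted to nanoseconds),
 * after which squeue_drain() signals the worker thread to take over
 * any remaining backlog.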
505 */ 506 ASSERT(MUTEX_HELD(&sqp->sq_lock)); 507 ASSERT(sqp->sq_first != NULL); 508 now = gethrtime(); 509 sqp->sq_drain(sqp, SQS_ENTER, now + squeue_drain_ns); 510 511 /* 512 * If we didn't do a complete drain, the worker 513 * thread was already signalled by squeue_drain. 514 * In case any control actions are pending, wake 515 * up the worker. 516 */ 517 sqp->sq_run = NULL; 518 if (sqp->sq_state & SQS_WORKER_THR_CONTROL) 519 cv_signal(&sqp->sq_worker_cv); 520 mutex_exit(&sqp->sq_lock); 521 return; 522 } else { 523 /* 524 * We let a thread processing a squeue reenter only 525 * once. This helps the case of incoming connection 526 * where a SYN-ACK-ACK that triggers the conn_ind 527 * doesn't have to queue the packet if listener and 528 * eager are on the same squeue. Also helps the 529 * loopback connection where the two ends are bound 530 * to the same squeue (which is typical on single 531 * CPU machines). 532 * 533 * We let the thread reenter only once for the fear 534 * of stack getting blown with multiple traversal. 535 */ 536 connp = (conn_t *)mp->b_prev; 537 if (!(sqp->sq_state & SQS_REENTER) && 538 (process_flag != SQ_FILL) && (sqp->sq_first == NULL) && 539 (sqp->sq_run == curthread) && (cnt == 1) && 540 (connp->conn_on_sqp == B_FALSE)) { 541 sqp->sq_state |= SQS_REENTER; 542 mutex_exit(&sqp->sq_lock); 543 544 ASSERT(mp->b_prev != NULL); 545 ASSERT(mp->b_queue != NULL); 546 547 mp->b_prev = NULL; 548 proc = (sqproc_t)mp->b_queue; 549 mp->b_queue = NULL; 550 551 /* 552 * Handle squeue switching. More details in the 553 * block comment at the top of the file 554 */ 555 if (connp->conn_sqp == sqp) { 556 connp->conn_on_sqp = B_TRUE; 557 DTRACE_PROBE3(squeue__proc__start, squeue_t *, 558 sqp, mblk_t *, mp, conn_t *, connp); 559 (*proc)(connp, mp, sqp); 560 DTRACE_PROBE2(squeue__proc__end, squeue_t *, 561 sqp, conn_t *, connp); 562 connp->conn_on_sqp = B_FALSE; 563 CONN_DEC_REF(connp); 564 } else { 565 SQUEUE_ENTER_ONE(connp->conn_sqp, mp, proc, 566 connp, SQ_FILL, SQTAG_SQUEUE_CHANGE); 567 } 568 569 mutex_enter(&sqp->sq_lock); 570 sqp->sq_state &= ~SQS_REENTER; 571 mutex_exit(&sqp->sq_lock); 572 return; 573 } 574 575 /* 576 * Queue is already being processed or there is already 577 * one or more paquets on the queue. Enqueue the 578 * packet and wakeup the squeue worker thread if the 579 * squeue is not being processed. 580 */ 581 #ifdef DEBUG 582 mp->b_tag = tag; 583 #endif 584 585 ENQUEUE_CHAIN(sqp, mp, tail, cnt); 586 if (!(sqp->sq_state & SQS_PROC)) { 587 squeue_worker_wakeup(sqp); 588 return; 589 } 590 /* 591 * In case any control actions are pending, wake 592 * up the worker. 593 */ 594 if (sqp->sq_state & SQS_WORKER_THR_CONTROL) 595 cv_signal(&sqp->sq_worker_cv); 596 mutex_exit(&sqp->sq_lock); 597 return; 598 } 599 } 600 601 /* 602 * PRIVATE FUNCTIONS 603 */ 604 605 static void 606 squeue_fire(void *arg) 607 { 608 squeue_t *sqp = arg; 609 uint_t state; 610 611 mutex_enter(&sqp->sq_lock); 612 613 state = sqp->sq_state; 614 if (sqp->sq_tid == 0 && !(state & SQS_TMO_PROG)) { 615 mutex_exit(&sqp->sq_lock); 616 return; 617 } 618 619 sqp->sq_tid = 0; 620 /* 621 * The timeout fired before we got a chance to set it. 622 * Process it anyway but remove the SQS_TMO_PROG so that 623 * the guy trying to set the timeout knows that it has 624 * already been processed. 
625 */ 626 if (state & SQS_TMO_PROG) 627 sqp->sq_state &= ~SQS_TMO_PROG; 628 629 if (!(state & SQS_PROC)) { 630 sqp->sq_awaken = lbolt; 631 cv_signal(&sqp->sq_worker_cv); 632 } 633 mutex_exit(&sqp->sq_lock); 634 } 635 636 static void 637 squeue_drain(squeue_t *sqp, uint_t proc_type, hrtime_t expire) 638 { 639 mblk_t *mp; 640 mblk_t *head; 641 sqproc_t proc; 642 conn_t *connp; 643 timeout_id_t tid; 644 ill_rx_ring_t *sq_rx_ring = sqp->sq_rx_ring; 645 hrtime_t now; 646 boolean_t did_wakeup = B_FALSE; 647 boolean_t sq_poll_capable; 648 649 sq_poll_capable = (sqp->sq_state & SQS_POLL_CAPAB) != 0; 650 again: 651 ASSERT(mutex_owned(&sqp->sq_lock)); 652 ASSERT(!(sqp->sq_state & (SQS_POLL_THR_QUIESCED | 653 SQS_POLL_QUIESCE_DONE))); 654 655 head = sqp->sq_first; 656 sqp->sq_first = NULL; 657 sqp->sq_last = NULL; 658 sqp->sq_count = 0; 659 660 if ((tid = sqp->sq_tid) != 0) 661 sqp->sq_tid = 0; 662 663 sqp->sq_state |= SQS_PROC | proc_type; 664 665 666 /* 667 * We have backlog built up. Switch to polling mode if the 668 * device underneath allows it. Need to do it so that 669 * more packets don't come in and disturb us (by contending 670 * for sq_lock or higher priority thread preempting us). 671 * 672 * The worker thread is allowed to do active polling while we 673 * just disable the interrupts for drain by non worker (kernel 674 * or userland) threads so they can peacefully process the 675 * packets during time allocated to them. 676 */ 677 SQS_POLLING_ON(sqp, sq_poll_capable, sq_rx_ring); 678 mutex_exit(&sqp->sq_lock); 679 680 if (tid != 0) 681 (void) untimeout(tid); 682 683 while ((mp = head) != NULL) { 684 685 head = mp->b_next; 686 mp->b_next = NULL; 687 688 proc = (sqproc_t)mp->b_queue; 689 mp->b_queue = NULL; 690 connp = (conn_t *)mp->b_prev; 691 mp->b_prev = NULL; 692 693 /* 694 * Handle squeue switching. More details in the 695 * block comment at the top of the file 696 */ 697 if (connp->conn_sqp == sqp) { 698 SQUEUE_DBG_SET(sqp, mp, proc, connp, 699 mp->b_tag); 700 connp->conn_on_sqp = B_TRUE; 701 DTRACE_PROBE3(squeue__proc__start, squeue_t *, 702 sqp, mblk_t *, mp, conn_t *, connp); 703 (*proc)(connp, mp, sqp); 704 DTRACE_PROBE2(squeue__proc__end, squeue_t *, 705 sqp, conn_t *, connp); 706 connp->conn_on_sqp = B_FALSE; 707 CONN_DEC_REF(connp); 708 } else { 709 SQUEUE_ENTER_ONE(connp->conn_sqp, mp, proc, connp, 710 SQ_FILL, SQTAG_SQUEUE_CHANGE); 711 } 712 } 713 714 SQUEUE_DBG_CLEAR(sqp); 715 716 mutex_enter(&sqp->sq_lock); 717 718 /* 719 * Check if there is still work to do (either more arrived or timer 720 * expired). If we are the worker thread and we are polling capable, 721 * continue doing the work since no one else is around to do the 722 * work anyway (but signal the poll thread to retrieve some packets 723 * in the meanwhile). If we are not the worker thread, just 724 * signal the worker thread to take up the work if processing time 725 * has expired. 726 */ 727 if (sqp->sq_first != NULL) { 728 /* 729 * Still more to process. If time quanta not expired, we 730 * should let the drain go on. The worker thread is allowed 731 * to drain as long as there is anything left. 732 */ 733 now = gethrtime(); 734 if ((now < expire) || (proc_type == SQS_WORKER)) { 735 /* 736 * If time not expired or we are worker thread and 737 * this squeue is polling capable, continue to do 738 * the drain. 739 * 740 * We turn off interrupts for all userland threads 741 * doing drain but we do active polling only for 742 * worker thread. 
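 * SQS_POLL_RING() only sets SQS_GET_PKTS and signals sq_poll_cv, so the
 * poll thread can refill the queue from the NIC while the worker loops
 * back to the "again" label and keeps draining.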
743 */ 744 if (proc_type == SQS_WORKER) 745 SQS_POLL_RING(sqp, sq_poll_capable); 746 goto again; 747 } else { 748 did_wakeup = B_TRUE; 749 sqp->sq_awaken = lbolt; 750 cv_signal(&sqp->sq_worker_cv); 751 } 752 } 753 754 /* 755 * If the poll thread is already running, just return. The 756 * poll thread continues to hold the proc and will finish 757 * processing. 758 */ 759 if (sqp->sq_state & SQS_GET_PKTS) { 760 ASSERT(!(sqp->sq_state & (SQS_POLL_THR_QUIESCED | 761 SQS_POLL_QUIESCE_DONE))); 762 sqp->sq_state &= ~proc_type; 763 return; 764 } 765 766 /* 767 * 768 * If we are the worker thread and no work is left, send the poll 769 * thread down once more to see if something arrived. Otherwise, 770 * turn the interrupts back on and we are done. 771 */ 772 if ((proc_type == SQS_WORKER) && 773 (sqp->sq_state & SQS_POLL_CAPAB)) { 774 /* 775 * Do one last check to see if anything arrived 776 * in the NIC. We leave the SQS_PROC set to ensure 777 * that poll thread keeps the PROC and can decide 778 * if it needs to turn polling off or continue 779 * processing. 780 * 781 * If we drop the SQS_PROC here and poll thread comes 782 * up empty handed, it can not safely turn polling off 783 * since someone else could have acquired the PROC 784 * and started draining. The previously running poll 785 * thread and the current thread doing drain would end 786 * up in a race for turning polling on/off and more 787 * complex code would be required to deal with it. 788 * 789 * Its lot simpler for drain to hand the SQS_PROC to 790 * poll thread (if running) and let poll thread finish 791 * without worrying about racing with any other thread. 792 */ 793 ASSERT(!(sqp->sq_state & (SQS_POLL_THR_QUIESCED | 794 SQS_POLL_QUIESCE_DONE))); 795 SQS_POLL_RING(sqp, sq_poll_capable); 796 sqp->sq_state &= ~proc_type; 797 } else { 798 /* 799 * The squeue is either not capable of polling or 800 * poll thread already finished processing and didn't 801 * find anything. Since there is nothing queued and 802 * we already turn polling on (for all threads doing 803 * drain), we should turn polling off and relinquish 804 * the PROC. 805 */ 806 ASSERT(!(sqp->sq_state & (SQS_POLL_THR_QUIESCED | 807 SQS_POLL_QUIESCE_DONE))); 808 SQS_POLLING_OFF(sqp, sq_poll_capable, sq_rx_ring); 809 sqp->sq_state &= ~(SQS_PROC | proc_type); 810 if (!did_wakeup && sqp->sq_first != NULL) { 811 squeue_worker_wakeup(sqp); 812 mutex_enter(&sqp->sq_lock); 813 } 814 /* 815 * If we are not the worker and there is a pending quiesce 816 * event, wake up the worker 817 */ 818 if ((proc_type != SQS_WORKER) && 819 (sqp->sq_state & SQS_WORKER_THR_CONTROL)) 820 cv_signal(&sqp->sq_worker_cv); 821 } 822 } 823 824 /* 825 * Quiesce, Restart, or Cleanup of the squeue poll thread. 826 * 827 * Quiesce and Restart: After an squeue poll thread has been quiesced, it does 828 * not attempt to poll the underlying soft ring any more. The quiesce is 829 * triggered by the mac layer when it wants to quiesce a soft ring. Typically 830 * control operations such as changing the fanout of a NIC or VNIC (dladm 831 * setlinkprop) need to quiesce data flow before changing the wiring. 832 * The operation is done by the mac layer, but it calls back into IP to 833 * quiesce the soft ring. After completing the operation (say increase or 834 * decrease of the fanout) the mac layer then calls back into IP to restart 835 * the quiesced soft ring. 836 * 837 * Cleanup: This is triggered when the squeue binding to a soft ring is 838 * removed permanently. 
Typically interface plumb and unplumb would trigger 839 * this. It can also be triggered from the mac layer when a soft ring is 840 * being deleted say as the result of a fanout reduction. Since squeues are 841 * never deleted, the cleanup marks the squeue as fit for recycling and 842 * moves it to the zeroth squeue set. 843 */ 844 static void 845 squeue_poll_thr_control(squeue_t *sqp) 846 { 847 if (sqp->sq_state & SQS_POLL_THR_RESTART) { 848 /* Restart implies a previous quiesce */ 849 ASSERT(sqp->sq_state & SQS_POLL_THR_QUIESCED); 850 sqp->sq_state &= ~(SQS_POLL_THR_QUIESCED | 851 SQS_POLL_THR_RESTART); 852 sqp->sq_state |= SQS_POLL_CAPAB; 853 cv_signal(&sqp->sq_worker_cv); 854 return; 855 } 856 857 if (sqp->sq_state & SQS_POLL_THR_QUIESCE) { 858 sqp->sq_state |= SQS_POLL_THR_QUIESCED; 859 sqp->sq_state &= ~SQS_POLL_THR_QUIESCE; 860 cv_signal(&sqp->sq_worker_cv); 861 return; 862 } 863 } 864 865 /* 866 * POLLING Notes 867 * 868 * With polling mode, we want to do as much processing as we possibly can 869 * in worker thread context. The sweet spot is worker thread keeps doing 870 * work all the time in polling mode and writers etc. keep dumping packets 871 * to worker thread. Occassionally, we send the poll thread (running at 872 * lower priority to NIC to get the chain of packets to feed to worker). 873 * Sending the poll thread down to NIC is dependant on 3 criterions 874 * 875 * 1) Its always driven from squeue_drain and only if worker thread is 876 * doing the drain. 877 * 2) We clear the backlog once and more packets arrived in between. 878 * Before starting drain again, send the poll thread down if 879 * the drain is being done by worker thread. 880 * 3) Before exiting the squeue_drain, if the poll thread is not already 881 * working and we are the worker thread, try to poll one more time. 882 * 883 * For latency sake, we do allow any thread calling squeue_enter 884 * to process its packet provided: 885 * 886 * 1) Nothing is queued 887 * 2) If more packets arrived in between, the non worker thread are allowed 888 * to do the drain till their time quanta expired provided SQS_GET_PKTS 889 * wasn't set in between. 890 * 891 * Avoiding deadlocks with interrupts 892 * ================================== 893 * 894 * One of the big problem is that we can't send poll_thr down while holding 895 * the sq_lock since the thread can block. So we drop the sq_lock before 896 * calling sq_get_pkts(). We keep holding the SQS_PROC as long as the 897 * poll thread is running so that no other thread can acquire the 898 * perimeter in between. If the squeue_drain gets done (no more work 899 * left), it leaves the SQS_PROC set if poll thread is running. 900 */ 901 902 /* 903 * This is the squeue poll thread. In poll mode, it polls the underlying 904 * TCP softring and feeds packets into the squeue. The worker thread then 905 * drains the squeue. The poll thread also responds to control signals for 906 * quiesceing, restarting, or cleanup of an squeue. These are driven by 907 * control operations like plumb/unplumb or as a result of dynamic Rx ring 908 * related operations that are driven from the mac layer. 
909 */ 910 static void 911 squeue_polling_thread(squeue_t *sqp) 912 { 913 kmutex_t *lock = &sqp->sq_lock; 914 kcondvar_t *async = &sqp->sq_poll_cv; 915 ip_mac_rx_t sq_get_pkts; 916 ip_accept_t ip_accept; 917 ill_rx_ring_t *sq_rx_ring; 918 ill_t *sq_ill; 919 mblk_t *head, *tail, *mp; 920 uint_t cnt; 921 void *sq_mac_handle; 922 callb_cpr_t cprinfo; 923 size_t bytes_to_pickup; 924 uint32_t ctl_state; 925 926 CALLB_CPR_INIT(&cprinfo, lock, callb_generic_cpr, "sq_poll"); 927 mutex_enter(lock); 928 929 for (;;) { 930 CALLB_CPR_SAFE_BEGIN(&cprinfo); 931 cv_wait(async, lock); 932 CALLB_CPR_SAFE_END(&cprinfo, lock); 933 934 ctl_state = sqp->sq_state & (SQS_POLL_THR_CONTROL | 935 SQS_POLL_THR_QUIESCED); 936 if (ctl_state != 0) { 937 /* 938 * If the squeue is quiesced, then wait for a control 939 * request. A quiesced squeue must not poll the 940 * underlying soft ring. 941 */ 942 if (ctl_state == SQS_POLL_THR_QUIESCED) 943 continue; 944 /* 945 * Act on control requests to quiesce, cleanup or 946 * restart an squeue 947 */ 948 squeue_poll_thr_control(sqp); 949 continue; 950 } 951 952 if (!(sqp->sq_state & SQS_POLL_CAPAB)) 953 continue; 954 955 ASSERT((sqp->sq_state & 956 (SQS_PROC|SQS_POLLING|SQS_GET_PKTS)) == 957 (SQS_PROC|SQS_POLLING|SQS_GET_PKTS)); 958 959 poll_again: 960 sq_rx_ring = sqp->sq_rx_ring; 961 sq_get_pkts = sq_rx_ring->rr_rx; 962 sq_mac_handle = sq_rx_ring->rr_rx_handle; 963 ip_accept = sq_rx_ring->rr_ip_accept; 964 sq_ill = sq_rx_ring->rr_ill; 965 bytes_to_pickup = MAX_BYTES_TO_PICKUP; 966 mutex_exit(lock); 967 head = sq_get_pkts(sq_mac_handle, bytes_to_pickup); 968 mp = NULL; 969 if (head != NULL) { 970 /* 971 * We got the packet chain from the mac layer. It 972 * would be nice to be able to process it inline 973 * for better performance but we need to give 974 * IP a chance to look at this chain to ensure 975 * that packets are really meant for this squeue 976 * and do the IP processing. 977 */ 978 mp = ip_accept(sq_ill, sq_rx_ring, sqp, head, 979 &tail, &cnt); 980 } 981 mutex_enter(lock); 982 if (mp != NULL) 983 ENQUEUE_CHAIN(sqp, mp, tail, cnt); 984 985 ASSERT((sqp->sq_state & 986 (SQS_PROC|SQS_POLLING|SQS_GET_PKTS)) == 987 (SQS_PROC|SQS_POLLING|SQS_GET_PKTS)); 988 989 if (sqp->sq_first != NULL && !(sqp->sq_state & SQS_WORKER)) { 990 /* 991 * We have packets to process and worker thread 992 * is not running. Check to see if poll thread is 993 * allowed to process. Let it do processing only if it 994 * picked up some packets from the NIC otherwise 995 * wakeup the worker thread. 996 */ 997 if (mp != NULL) { 998 hrtime_t now; 999 1000 now = gethrtime(); 1001 sqp->sq_run = curthread; 1002 sqp->sq_drain(sqp, SQS_POLL_PROC, now + 1003 squeue_drain_ns); 1004 sqp->sq_run = NULL; 1005 1006 if (sqp->sq_first == NULL) 1007 goto poll_again; 1008 1009 /* 1010 * Couldn't do the entire drain because the 1011 * time limit expired, let the 1012 * worker thread take over. 1013 */ 1014 } 1015 1016 sqp->sq_awaken = lbolt; 1017 /* 1018 * Put the SQS_PROC_HELD on so the worker 1019 * thread can distinguish where its called from. We 1020 * can remove the SQS_PROC flag here and turn off the 1021 * polling so that it wouldn't matter who gets the 1022 * processing but we get better performance this way 1023 * and save the cost of turn polling off and possibly 1024 * on again as soon as we start draining again. 1025 * 1026 * We can't remove the SQS_PROC flag without turning 1027 * polling off until we can guarantee that control 1028 * will return to squeue_drain immediately. 
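 *
 * The worker clears SQS_PROC_HELD once it takes over (see
 * squeue_worker()), so SQS_PROC effectively passes from the poll thread
 * to the worker without ever being dropped.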
1029 */ 1030 sqp->sq_state |= SQS_PROC_HELD; 1031 sqp->sq_state &= ~SQS_GET_PKTS; 1032 cv_signal(&sqp->sq_worker_cv); 1033 } else if (sqp->sq_first == NULL && 1034 !(sqp->sq_state & SQS_WORKER)) { 1035 /* 1036 * Nothing queued and worker thread not running. 1037 * Since we hold the proc, no other thread is 1038 * processing the squeue. This means that there 1039 * is no work to be done and nothing is queued 1040 * in squeue or in NIC. Turn polling off and go 1041 * back to interrupt mode. 1042 */ 1043 sqp->sq_state &= ~(SQS_PROC|SQS_GET_PKTS); 1044 /* LINTED: constant in conditional context */ 1045 SQS_POLLING_OFF(sqp, B_TRUE, sq_rx_ring); 1046 } else { 1047 /* 1048 * Worker thread is already running. We don't need 1049 * to do anything. Indicate that poll thread is done. 1050 */ 1051 sqp->sq_state &= ~SQS_GET_PKTS; 1052 } 1053 if (sqp->sq_state & SQS_POLL_THR_CONTROL) { 1054 /* 1055 * Act on control requests to quiesce, cleanup or 1056 * restart an squeue 1057 */ 1058 squeue_poll_thr_control(sqp); 1059 } 1060 } 1061 } 1062 1063 /* 1064 * The squeue worker thread acts on any control requests to quiesce, cleanup 1065 * or restart an ill_rx_ring_t by calling this function. The worker thread 1066 * synchronizes with the squeue poll thread to complete the request and finally 1067 * wakes up the requestor when the request is completed. 1068 */ 1069 static void 1070 squeue_worker_thr_control(squeue_t *sqp) 1071 { 1072 ill_t *ill; 1073 ill_rx_ring_t *rx_ring; 1074 1075 ASSERT(MUTEX_HELD(&sqp->sq_lock)); 1076 1077 if (sqp->sq_state & SQS_POLL_RESTART) { 1078 /* Restart implies a previous quiesce. */ 1079 ASSERT((sqp->sq_state & (SQS_PROC_HELD | 1080 SQS_POLL_QUIESCE_DONE | SQS_PROC | SQS_WORKER)) == 1081 (SQS_POLL_QUIESCE_DONE | SQS_PROC | SQS_WORKER)); 1082 /* 1083 * Request the squeue poll thread to restart and wait till 1084 * it actually restarts. 1085 */ 1086 sqp->sq_state &= ~SQS_POLL_QUIESCE_DONE; 1087 sqp->sq_state |= SQS_POLL_THR_RESTART; 1088 cv_signal(&sqp->sq_poll_cv); 1089 while (sqp->sq_state & SQS_POLL_THR_QUIESCED) 1090 cv_wait(&sqp->sq_worker_cv, &sqp->sq_lock); 1091 sqp->sq_state &= ~(SQS_POLL_RESTART | SQS_PROC | 1092 SQS_WORKER); 1093 /* 1094 * Signal any waiter that is waiting for the restart 1095 * to complete 1096 */ 1097 sqp->sq_state |= SQS_POLL_RESTART_DONE; 1098 cv_signal(&sqp->sq_ctrlop_done_cv); 1099 return; 1100 } 1101 1102 if (sqp->sq_state & SQS_PROC_HELD) { 1103 /* The squeue poll thread handed control to us */ 1104 ASSERT(sqp->sq_state & SQS_PROC); 1105 } 1106 1107 /* 1108 * Prevent any other thread from processing the squeue 1109 * until we finish the control actions by setting SQS_PROC. 1110 * But allow ourself to reenter by setting SQS_WORKER 1111 */ 1112 sqp->sq_state |= (SQS_PROC | SQS_WORKER); 1113 1114 /* Signal the squeue poll thread and wait for it to quiesce itself */ 1115 if (!(sqp->sq_state & SQS_POLL_THR_QUIESCED)) { 1116 sqp->sq_state |= SQS_POLL_THR_QUIESCE; 1117 cv_signal(&sqp->sq_poll_cv); 1118 while (!(sqp->sq_state & SQS_POLL_THR_QUIESCED)) 1119 cv_wait(&sqp->sq_worker_cv, &sqp->sq_lock); 1120 } 1121 1122 rx_ring = sqp->sq_rx_ring; 1123 ill = rx_ring->rr_ill; 1124 /* 1125 * The lock hierarchy is as follows. 
1126 * cpu_lock -> ill_lock -> sqset_lock -> sq_lock 1127 */ 1128 mutex_exit(&sqp->sq_lock); 1129 mutex_enter(&ill->ill_lock); 1130 mutex_enter(&sqp->sq_lock); 1131 1132 SQS_POLLING_OFF(sqp, (sqp->sq_state & SQS_POLL_CAPAB) != 0, 1133 sqp->sq_rx_ring); 1134 sqp->sq_state &= ~(SQS_POLL_CAPAB | SQS_GET_PKTS | SQS_PROC_HELD); 1135 if (sqp->sq_state & SQS_POLL_CLEANUP) { 1136 /* 1137 * Disassociate this squeue from its ill_rx_ring_t. 1138 * The rr_sqp, sq_rx_ring fields are protected by the 1139 * corresponding squeue, ill_lock* and sq_lock. Holding any 1140 * of them will ensure that the ring to squeue mapping does 1141 * not change. 1142 */ 1143 ASSERT(!(sqp->sq_state & SQS_DEFAULT)); 1144 1145 sqp->sq_rx_ring = NULL; 1146 rx_ring->rr_sqp = NULL; 1147 1148 sqp->sq_state &= ~(SQS_POLL_CLEANUP | SQS_POLL_THR_QUIESCED | 1149 SQS_POLL_QUIESCE_DONE); 1150 sqp->sq_ill = NULL; 1151 1152 rx_ring->rr_rx_handle = NULL; 1153 rx_ring->rr_intr_handle = NULL; 1154 rx_ring->rr_intr_enable = NULL; 1155 rx_ring->rr_intr_disable = NULL; 1156 sqp->sq_state |= SQS_POLL_CLEANUP_DONE; 1157 } else { 1158 sqp->sq_state &= ~SQS_POLL_QUIESCE; 1159 sqp->sq_state |= SQS_POLL_QUIESCE_DONE; 1160 } 1161 /* 1162 * Signal any waiter that is waiting for the quiesce or cleanup 1163 * to complete and also wait for it to actually see and reset the 1164 * SQS_POLL_CLEANUP_DONE. 1165 */ 1166 cv_signal(&sqp->sq_ctrlop_done_cv); 1167 mutex_exit(&ill->ill_lock); 1168 if (sqp->sq_state & SQS_POLL_CLEANUP_DONE) { 1169 cv_wait(&sqp->sq_worker_cv, &sqp->sq_lock); 1170 sqp->sq_state &= ~(SQS_PROC | SQS_WORKER); 1171 } 1172 } 1173 1174 static void 1175 squeue_worker(squeue_t *sqp) 1176 { 1177 kmutex_t *lock = &sqp->sq_lock; 1178 kcondvar_t *async = &sqp->sq_worker_cv; 1179 callb_cpr_t cprinfo; 1180 hrtime_t now; 1181 1182 CALLB_CPR_INIT(&cprinfo, lock, callb_generic_cpr, "sq_worker"); 1183 mutex_enter(lock); 1184 1185 for (;;) { 1186 for (;;) { 1187 /* 1188 * If the poll thread has handed control to us 1189 * we need to break out of the wait. 
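 * That hand-off is indicated by SQS_PROC_HELD; SQS_PROC itself stays
 * set across it, so no other thread can slip into the squeue before
 * the worker resumes the drain.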
1190 */ 1191 if (sqp->sq_state & SQS_PROC_HELD) 1192 break; 1193 1194 /* 1195 * If the squeue is not being processed and we either 1196 * have messages to drain or some thread has signaled 1197 * some control activity we need to break 1198 */ 1199 if (!(sqp->sq_state & SQS_PROC) && 1200 ((sqp->sq_state & SQS_WORKER_THR_CONTROL) || 1201 (sqp->sq_first != NULL))) 1202 break; 1203 1204 /* 1205 * If we have started some control action, then check 1206 * for the SQS_WORKER flag (since we don't 1207 * release the squeue) to make sure we own the squeue 1208 * and break out 1209 */ 1210 if ((sqp->sq_state & SQS_WORKER_THR_CONTROL) && 1211 (sqp->sq_state & SQS_WORKER)) 1212 break; 1213 1214 CALLB_CPR_SAFE_BEGIN(&cprinfo); 1215 cv_wait(async, lock); 1216 CALLB_CPR_SAFE_END(&cprinfo, lock); 1217 } 1218 if (sqp->sq_state & SQS_WORKER_THR_CONTROL) { 1219 squeue_worker_thr_control(sqp); 1220 continue; 1221 } 1222 ASSERT(!(sqp->sq_state & (SQS_POLL_THR_QUIESCED | 1223 SQS_POLL_CLEANUP_DONE | SQS_POLL_QUIESCE_DONE | 1224 SQS_WORKER_THR_CONTROL | SQS_POLL_THR_CONTROL))); 1225 1226 if (sqp->sq_state & SQS_PROC_HELD) 1227 sqp->sq_state &= ~SQS_PROC_HELD; 1228 1229 now = gethrtime(); 1230 sqp->sq_run = curthread; 1231 sqp->sq_drain(sqp, SQS_WORKER, now + squeue_drain_ns); 1232 sqp->sq_run = NULL; 1233 } 1234 } 1235 1236 uintptr_t * 1237 squeue_getprivate(squeue_t *sqp, sqprivate_t p) 1238 { 1239 ASSERT(p < SQPRIVATE_MAX); 1240 1241 return (&sqp->sq_private[p]); 1242 } 1243 1244 /* ARGSUSED */ 1245 void 1246 squeue_wakeup_conn(void *arg, mblk_t *mp, void *arg2) 1247 { 1248 conn_t *connp = (conn_t *)arg; 1249 squeue_t *sqp = connp->conn_sqp; 1250 1251 /* 1252 * Mark the squeue as paused before waking up the thread stuck 1253 * in squeue_synch_enter(). 1254 */ 1255 mutex_enter(&sqp->sq_lock); 1256 sqp->sq_state |= SQS_PAUSE; 1257 1258 /* 1259 * Notify the thread that it's OK to proceed; that is done by 1260 * clearing the MSGWAITSYNC flag. The synch thread will free the mblk. 1261 */ 1262 ASSERT(mp->b_flag & MSGWAITSYNC); 1263 mp->b_flag &= ~MSGWAITSYNC; 1264 cv_broadcast(&connp->conn_sq_cv); 1265 1266 /* 1267 * We are doing something on behalf of another thread, so we have to 1268 * pause and wait until it finishes. 1269 */ 1270 while (sqp->sq_state & SQS_PAUSE) { 1271 cv_wait(&sqp->sq_synch_cv, &sqp->sq_lock); 1272 } 1273 mutex_exit(&sqp->sq_lock); 1274 } 1275 1276 int 1277 squeue_synch_enter(squeue_t *sqp, conn_t *connp, mblk_t *use_mp) 1278 { 1279 mutex_enter(&sqp->sq_lock); 1280 if (sqp->sq_first == NULL && !(sqp->sq_state & SQS_PROC)) { 1281 /* 1282 * We are OK to proceed if the squeue is empty, and 1283 * no one owns the squeue. 1284 * 1285 * The caller won't own the squeue as this is called from the 1286 * application. 1287 */ 1288 ASSERT(sqp->sq_run == NULL); 1289 1290 sqp->sq_state |= SQS_PROC; 1291 sqp->sq_run = curthread; 1292 mutex_exit(&sqp->sq_lock); 1293 1294 #if SQUEUE_DEBUG 1295 sqp->sq_curmp = NULL; 1296 sqp->sq_curproc = NULL; 1297 sqp->sq_connp = connp; 1298 #endif 1299 connp->conn_on_sqp = B_TRUE; 1300 return (0); 1301 } else { 1302 mblk_t *mp; 1303 1304 mp = (use_mp == NULL) ? allocb(0, BPRI_MED) : use_mp; 1305 if (mp == NULL) { 1306 mutex_exit(&sqp->sq_lock); 1307 return (ENOMEM); 1308 } 1309 1310 /* 1311 * We mark the mblk as awaiting synchronous squeue access 1312 * by setting the MSGWAITSYNC flag. Once squeue_wakeup_conn 1313 * fires, MSGWAITSYNC is cleared, at which point we know we 1314 * have exclusive access. 
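 * squeue_wakeup_conn() also sets SQS_PAUSE and then blocks on
 * sq_synch_cv, so the squeue stays paused until the synchronous caller
 * releases it via squeue_synch_exit().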
1315 */ 1316 mp->b_flag |= MSGWAITSYNC; 1317 1318 CONN_INC_REF(connp); 1319 SET_SQUEUE(mp, squeue_wakeup_conn, connp); 1320 ENQUEUE_CHAIN(sqp, mp, mp, 1); 1321 1322 ASSERT(sqp->sq_run != curthread); 1323 1324 /* Wait until the enqueued mblk get processed. */ 1325 while (mp->b_flag & MSGWAITSYNC) 1326 cv_wait(&connp->conn_sq_cv, &sqp->sq_lock); 1327 mutex_exit(&sqp->sq_lock); 1328 1329 if (use_mp == NULL) 1330 freeb(mp); 1331 1332 return (0); 1333 } 1334 } 1335 1336 void 1337 squeue_synch_exit(squeue_t *sqp, conn_t *connp) 1338 { 1339 mutex_enter(&sqp->sq_lock); 1340 if (sqp->sq_run == curthread) { 1341 ASSERT(sqp->sq_state & SQS_PROC); 1342 1343 sqp->sq_state &= ~SQS_PROC; 1344 sqp->sq_run = NULL; 1345 connp->conn_on_sqp = B_FALSE; 1346 1347 if (sqp->sq_first == NULL) { 1348 mutex_exit(&sqp->sq_lock); 1349 } else { 1350 /* 1351 * If this was a normal thread, then it would 1352 * (most likely) continue processing the pending 1353 * requests. Since the just completed operation 1354 * was executed synchronously, the thread should 1355 * not be delayed. To compensate, wake up the 1356 * worker thread right away when there are outstanding 1357 * requests. 1358 */ 1359 sqp->sq_awaken = lbolt; 1360 cv_signal(&sqp->sq_worker_cv); 1361 mutex_exit(&sqp->sq_lock); 1362 } 1363 } else { 1364 /* 1365 * The caller doesn't own the squeue, clear the SQS_PAUSE flag, 1366 * and wake up the squeue owner, such that owner can continue 1367 * processing. 1368 */ 1369 ASSERT(sqp->sq_state & SQS_PAUSE); 1370 sqp->sq_state &= ~SQS_PAUSE; 1371 1372 /* There should be only one thread blocking on sq_synch_cv. */ 1373 cv_signal(&sqp->sq_synch_cv); 1374 mutex_exit(&sqp->sq_lock); 1375 } 1376 } 1377
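
/*
 * Usage sketch (not compiled; purely illustrative): a hypothetical
 * caller running in process context, for example a socket-layer
 * operation, can use squeue_synch_enter()/squeue_synch_exit() to get
 * exclusive, squeue-protected access to a conn_t without packaging the
 * work in an mblk. The function name my_conn_op() and the error
 * handling below are assumptions, not part of the squeue API.
 */
#ifdef notdef
static int
my_conn_op(conn_t *connp)
{
	squeue_t *sqp = connp->conn_sqp;

	/*
	 * Blocks until the squeue is idle, or until the wakeup mblk that
	 * squeue_synch_enter() enqueues has been processed. Fails only
	 * if that mblk cannot be allocated.
	 */
	if (squeue_synch_enter(sqp, connp, NULL) != 0)
		return (ENOMEM);

	/*
	 * No other thread is processing this squeue now, so conn_t state
	 * can be examined and changed safely.
	 */

	squeue_synch_exit(sqp, connp);
	return (0);
}
#endif /* notdef */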