/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License (the "License").
 * You may not use this file except in compliance with the License.
 *
 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
 * or http://www.opensolaris.org/os/licensing.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
 * If applicable, add the following below this CDDL HEADER, with the
 * fields enclosed by brackets "[]" replaced with your own identifying
 * information: Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 */
/*
 * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
 */

/*
 * Copyright 2012 Joyent, Inc. All rights reserved.
 */

/*
 * Squeues: General purpose serialization mechanism
 * ------------------------------------------------
 *
 * Background:
 * -----------
 *
 * This is a general purpose high-performance serialization mechanism
 * currently used by TCP/IP. It is implemented by means of a per-CPU queue,
 * a worker thread and a polling thread which are bound to the CPU
 * associated with the squeue. The squeue is strictly FIFO for both read
 * and write side and only one thread can process it at any given time.
 * The design goal of squeue was to offer a very high degree of
 * parallelization (on a per H/W execution pipeline basis) with at
 * most one queuing.
 *
 * The modules needing protection typically call the SQUEUE_ENTER_ONE() or
 * SQUEUE_ENTER() macro as soon as a thread enters the module
 * from either direction. For each packet, the processing function
 * and argument are stored in the mblk itself. When the packet is ready
 * to be processed, the squeue retrieves the stored function and calls
 * it with the supplied argument and the pointer to the packet itself.
 * The called function can assume that no other thread is processing
 * the squeue when it is executing.
 *
 * Squeue/connection binding:
 * --------------------------
 *
 * TCP/IP uses an IP classifier in conjunction with squeues, where specific
 * connections are assigned to specific squeues (based on various policies)
 * at connection creation time. Once assigned, the connection-to-squeue
 * mapping is never changed and all future packets for that
 * connection are processed on that squeue. The connection ("conn") to
 * squeue mapping is stored in the "conn_t" member "conn_sqp".
 *
 * Since the processing of the connection cuts across multiple layers
 * but still allows packets for different connections to be processed on
 * other CPUs/squeues, squeues are also termed a "Vertical Perimeter" or
 * "Per Connection Vertical Perimeter".
 *
 * Processing Model:
 * -----------------
 *
 * An squeue doesn't necessarily process packets with its own worker thread.
 * The callers can pick if they just want to queue the packet, process
 * their packet if nothing is queued, or drain and process. The first two
 * modes are typically employed when the packet was generated while
 * already doing the processing behind the squeue, and the last mode (drain
 * and process) is typically employed when the thread is entering the squeue
 * for the first time. The squeue still imposes a finite time limit
 * for which an external thread can do processing, after which it switches
 * processing to its own worker thread.
 *
 * Once created, squeues are never deleted. Hence squeue pointers are
 * always valid. This means that functions outside the squeue can still
 * refer safely to conn_sqp and there is no need for ref counts.
 *
 * Only a thread executing in the squeue can change the squeue of the
 * connection. It does so by calling a squeue framework function to do this.
 * After changing the squeue, the thread must leave the squeue. It must not
 * continue to execute any code that needs squeue protection.
 *
 * The squeue framework, after entering the squeue, checks if the current
 * squeue matches the conn_sqp. If the check fails, the packet is delivered
 * to the right squeue.
 *
 * Polling Model:
 * --------------
 *
 * Squeues can control the rate of packet arrival into themselves from the
 * NIC or a specific Rx ring within a NIC. As part of capability negotiation
 * between IP and the MAC layer, an squeue is created for each TCP soft ring
 * (or TCP Rx ring - to be implemented in future). As part of this
 * negotiation, squeues get a cookie for the underlying soft ring or Rx
 * ring, a function to turn off incoming packets and a function to call
 * to poll for packets. This helps schedule the receive side packet
 * processing so that queue backlog doesn't build up and packet processing
 * doesn't keep getting disturbed by high priority interrupts. As part
 * of this mode, as soon as a backlog starts building, the squeue turns off
 * the interrupts and switches to poll mode. In poll mode, when the poll
 * thread goes down to retrieve packets, it retrieves them in the form of
 * a chain which improves performance even more. As the squeue/softring
 * system gets more packets, it gets more efficient by switching to
 * polling more often and dealing with larger packet chains.
 *
 */

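/*
 * Illustrative sketch (not part of the implementation): a caller that has
 * already classified a packet to a conn_t stashes the processing function
 * and the conn in the mblk and hands the packet to the conn's squeue via
 * SQUEUE_ENTER_ONE(). The helper, proc name and tag below are hypothetical
 * stand-ins; real callers pass their protocol input function and one of the
 * SQTAG_* constants (SQTAG_SQUEUE_CHANGE is the one used in this file).
 *
 *    static void
 *    example_deliver(conn_t *connp, mblk_t *mp, ip_recv_attr_t *ira)
 *    {
 *        // proc and connp ride in mp->b_queue and mp->b_prev; the
 *        // squeue fast path or drain loop retrieves them before calling
 *        // (*proc)(connp, mp, sqp, ira).
 *        SQUEUE_ENTER_ONE(connp->conn_sqp, mp, example_input_proc, connp,
 *            ira, SQ_FILL, SQTAG_EXAMPLE);
 *    }
 *
 * SQ_FILL just queues and signals the worker; SQ_PROCESS or SQ_NODRAIN
 * would let the calling thread process inline when the squeue is idle.
 */
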
#include <sys/types.h>
#include <sys/cmn_err.h>
#include <sys/debug.h>
#include <sys/kmem.h>
#include <sys/cpuvar.h>
#include <sys/condvar_impl.h>
#include <sys/systm.h>
#include <sys/callb.h>
#include <sys/sdt.h>
#include <sys/ddi.h>
#include <sys/sunddi.h>
#include <sys/stack.h>
#include <sys/archsystm.h>

#include <inet/ipclassifier.h>
#include <inet/udp_impl.h>

#include <sys/squeue_impl.h>

static void squeue_fire(void *);
static void squeue_drain(squeue_t *, uint_t, hrtime_t);
static void squeue_worker(squeue_t *sqp);
static void squeue_polling_thread(squeue_t *sqp);

kmem_cache_t *squeue_cache;

#define SQUEUE_MSEC_TO_NSEC 1000000

int squeue_drain_ms = 20;
int squeue_workerwait_ms = 0;

/* The values above converted to ticks or nanoseconds */
static int squeue_drain_ns = 0;
static int squeue_workerwait_tick = 0;

uintptr_t squeue_drain_stack_needed = 10240;
uint_t squeue_drain_stack_toodeep;

#define MAX_BYTES_TO_PICKUP 150000

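/*
 * Note on the tunables above: squeue_drain_ms bounds how long a single
 * drain pass may run. squeue_init() converts it once to nanoseconds
 * (squeue_drain_ns = squeue_drain_ms * SQUEUE_MSEC_TO_NSEC) and every
 * caller of sq_drain() passes "gethrtime() + squeue_drain_ns" as the
 * expiry deadline. MAX_BYTES_TO_PICKUP caps how many bytes a single
 * poll of the soft ring may pull up.
 */
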
#define ENQUEUE_CHAIN(sqp, mp, tail, cnt) { \
    /* \
     * Enqueue our mblk chain. \
     */ \
    ASSERT(MUTEX_HELD(&(sqp)->sq_lock)); \
    \
    if ((sqp)->sq_last != NULL) \
        (sqp)->sq_last->b_next = (mp); \
    else \
        (sqp)->sq_first = (mp); \
    (sqp)->sq_last = (tail); \
    (sqp)->sq_count += (cnt); \
    ASSERT((sqp)->sq_count > 0); \
    DTRACE_PROBE4(squeue__enqueuechain, squeue_t *, sqp, \
        mblk_t *, mp, mblk_t *, tail, int, cnt); \
    \
}

/*
 * Blank the receive ring (in this case it is the soft ring). When
 * blanked, the soft ring will not send any more packets up.
 * Blanking may not succeed when there is a CPU already in the soft
 * ring sending packets up. In that case, SQS_POLLING will not be
 * set.
 */
#define SQS_POLLING_ON(sqp, sq_poll_capable, rx_ring) { \
    ASSERT(MUTEX_HELD(&(sqp)->sq_lock)); \
    if (sq_poll_capable) { \
        ASSERT(rx_ring != NULL); \
        ASSERT(sqp->sq_state & SQS_POLL_CAPAB); \
        if (!(sqp->sq_state & SQS_POLLING)) { \
            if (rx_ring->rr_intr_disable(rx_ring->rr_intr_handle)) \
                sqp->sq_state |= SQS_POLLING; \
        } \
    } \
}

#define SQS_POLLING_OFF(sqp, sq_poll_capable, rx_ring) { \
    ASSERT(MUTEX_HELD(&(sqp)->sq_lock)); \
    if (sq_poll_capable) { \
        ASSERT(rx_ring != NULL); \
        ASSERT(sqp->sq_state & SQS_POLL_CAPAB); \
        if (sqp->sq_state & SQS_POLLING) { \
            sqp->sq_state &= ~SQS_POLLING; \
            rx_ring->rr_intr_enable(rx_ring->rr_intr_handle); \
        } \
    } \
}

/* Wakeup poll thread only if SQS_POLLING is set */
#define SQS_POLL_RING(sqp) { \
    ASSERT(MUTEX_HELD(&(sqp)->sq_lock)); \
    if (sqp->sq_state & SQS_POLLING) { \
        ASSERT(sqp->sq_state & SQS_POLL_CAPAB); \
        if (!(sqp->sq_state & SQS_GET_PKTS)) { \
            sqp->sq_state |= SQS_GET_PKTS; \
            cv_signal(&sqp->sq_poll_cv); \
        } \
    } \
}

#ifdef DEBUG
#define SQUEUE_DBG_SET(sqp, mp, proc, connp, tag) { \
    (sqp)->sq_curmp = (mp); \
    (sqp)->sq_curproc = (proc); \
    (sqp)->sq_connp = (connp); \
    (mp)->b_tag = (sqp)->sq_tag = (tag); \
}

#define SQUEUE_DBG_CLEAR(sqp) { \
    (sqp)->sq_curmp = NULL; \
    (sqp)->sq_curproc = NULL; \
    (sqp)->sq_connp = NULL; \
}
#else
#define SQUEUE_DBG_SET(sqp, mp, proc, connp, tag)
#define SQUEUE_DBG_CLEAR(sqp)
#endif

void
squeue_init(void)
{
    squeue_cache = kmem_cache_create("squeue_cache",
        sizeof (squeue_t), 64, NULL, NULL, NULL, NULL, NULL, 0);

    squeue_drain_ns = squeue_drain_ms * SQUEUE_MSEC_TO_NSEC;
    squeue_workerwait_tick = MSEC_TO_TICK_ROUNDUP(squeue_workerwait_ms);
}

/* ARGSUSED */
squeue_t *
squeue_create(clock_t wait, pri_t pri)
{
    squeue_t *sqp = kmem_cache_alloc(squeue_cache, KM_SLEEP);

    bzero(sqp, sizeof (squeue_t));
    sqp->sq_bind = PBIND_NONE;
    sqp->sq_priority = pri;
    sqp->sq_wait = MSEC_TO_TICK(wait);
    sqp->sq_worker = thread_create(NULL, 0, squeue_worker,
        sqp, 0, &p0, TS_RUN, pri);

    sqp->sq_poll_thr = thread_create(NULL, 0, squeue_polling_thread,
        sqp, 0, &p0, TS_RUN, pri);

    sqp->sq_enter = squeue_enter;
    sqp->sq_drain = squeue_drain;

    return (sqp);
}

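/*
 * Illustrative sketch (not part of the implementation): the squeue set code
 * in ip_squeue.c is what actually calls squeue_create() and squeue_bind();
 * the hypothetical helper below only shows the calling conventions expected
 * by this file: squeue_bind() must be called with cpu_lock held, and binding
 * with PBIND_NONE re-binds to the previously recorded sq_bind value.
 *
 *    static squeue_t *
 *    example_squeue_for_cpu(clock_t wait, pri_t pri, processorid_t cpuid)
 *    {
 *        squeue_t *sqp;
 *
 *        // Create the squeue (worker and poll threads start unbound).
 *        sqp = squeue_create(wait, pri);
 *
 *        // Bind the worker thread to the chosen CPU under cpu_lock.
 *        mutex_enter(&cpu_lock);
 *        squeue_bind(sqp, cpuid);
 *        mutex_exit(&cpu_lock);
 *        return (sqp);
 *    }
 */
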
/*
 * Bind squeue worker thread to the specified CPU, given by CPU id.
 * If the CPU id value is -1, bind the worker thread to the value
 * specified in the sq_bind field. If a thread is already bound to a
 * different CPU, unbind it from the old CPU and bind to the new one.
 */

void
squeue_bind(squeue_t *sqp, processorid_t bind)
{
    mutex_enter(&sqp->sq_lock);
    ASSERT(sqp->sq_bind != PBIND_NONE || bind != PBIND_NONE);
    ASSERT(MUTEX_HELD(&cpu_lock));

    if (sqp->sq_state & SQS_BOUND) {
        if (sqp->sq_bind == bind) {
            mutex_exit(&sqp->sq_lock);
            return;
        }
        thread_affinity_clear(sqp->sq_worker);
    } else {
        sqp->sq_state |= SQS_BOUND;
    }

    if (bind != PBIND_NONE)
        sqp->sq_bind = bind;

    thread_affinity_set(sqp->sq_worker, sqp->sq_bind);
    mutex_exit(&sqp->sq_lock);
}

void
squeue_unbind(squeue_t *sqp)
{
    mutex_enter(&sqp->sq_lock);
    if (!(sqp->sq_state & SQS_BOUND)) {
        mutex_exit(&sqp->sq_lock);
        return;
    }

    sqp->sq_state &= ~SQS_BOUND;
    thread_affinity_clear(sqp->sq_worker);
    mutex_exit(&sqp->sq_lock);
}

void
squeue_worker_wakeup(squeue_t *sqp)
{
    timeout_id_t tid = (sqp)->sq_tid;

    ASSERT(MUTEX_HELD(&(sqp)->sq_lock));

    if (sqp->sq_wait == 0) {
        ASSERT(tid == 0);
        ASSERT(!(sqp->sq_state & SQS_TMO_PROG));
        sqp->sq_awaken = ddi_get_lbolt();
        cv_signal(&sqp->sq_worker_cv);
        mutex_exit(&sqp->sq_lock);
        return;
    }

    /*
     * Queue isn't being processed, so take
     * any post enqueue actions needed before leaving.
     */
    if (tid != 0) {
        /*
         * Waiting for an enter() to process mblk(s).
         */
        clock_t now = ddi_get_lbolt();
        clock_t waited = now - sqp->sq_awaken;

        if (TICK_TO_MSEC(waited) >= sqp->sq_wait) {
            /*
             * Time's up and we have a worker thread
             * waiting for work, so schedule it.
             */
            sqp->sq_tid = 0;
            sqp->sq_awaken = now;
            cv_signal(&sqp->sq_worker_cv);
            mutex_exit(&sqp->sq_lock);
            (void) untimeout(tid);
            return;
        }
        mutex_exit(&sqp->sq_lock);
        return;
    } else if (sqp->sq_state & SQS_TMO_PROG) {
        mutex_exit(&sqp->sq_lock);
        return;
    } else {
        clock_t wait = sqp->sq_wait;
        /*
         * Wait up to sqp->sq_wait ms for an
         * enter() to process this queue. We
         * don't want to contend on timeout locks
         * with sq_lock held for performance reasons,
         * so drop the sq_lock before calling timeout(),
         * but we need to check if the timeout is still required
         * after reacquiring the sq_lock. Once
         * the sq_lock is dropped, someone else could
         * have processed the packet or the timeout could
         * have already fired.
         */
        sqp->sq_state |= SQS_TMO_PROG;
        mutex_exit(&sqp->sq_lock);
        tid = timeout(squeue_fire, sqp, wait);
        mutex_enter(&sqp->sq_lock);
        /* Check again if we still need the timeout */
        if (((sqp->sq_state & (SQS_PROC|SQS_TMO_PROG)) ==
            SQS_TMO_PROG) && (sqp->sq_tid == 0) &&
            (sqp->sq_first != NULL)) {
            sqp->sq_state &= ~SQS_TMO_PROG;
            sqp->sq_tid = tid;
            mutex_exit(&sqp->sq_lock);
            return;
        } else {
            if (sqp->sq_state & SQS_TMO_PROG) {
                sqp->sq_state &= ~SQS_TMO_PROG;
                mutex_exit(&sqp->sq_lock);
                (void) untimeout(tid);
            } else {
                /*
                 * The timer fired before we could
                 * reacquire the sq_lock. squeue_fire
                 * removes the SQS_TMO_PROG flag
                 * and we don't need to do anything
                 * else.
                 */
                mutex_exit(&sqp->sq_lock);
            }
        }
    }

    ASSERT(MUTEX_NOT_HELD(&sqp->sq_lock));
}

/*
 * squeue_enter() - enter squeue sqp with mblk mp (which can be
 * a chain), while tail points to the end and cnt is the number of
 * mblks in the chain.
 *
 * For a chain of a single packet (i.e. mp == tail), go through the
 * fast path if no one is processing the squeue and nothing is queued.
 *
 * The proc and arg for each mblk are already stored in the mblk in
 * appropriate places.
 *
 * The process_flag specifies if we are allowed to process the mblk
 * and drain in the entering thread context. If process_flag is
 * SQ_FILL, then we just queue the mblk and return (after signaling
 * the worker thread if no one else is processing the squeue).
 *
 * The ira argument can be used when the count is one.
 * For a chain the caller needs to prepend any needed mblks from
 * ip_recv_attr_to_mblk().
 */
/* ARGSUSED */
void
squeue_enter(squeue_t *sqp, mblk_t *mp, mblk_t *tail, uint32_t cnt,
    ip_recv_attr_t *ira, int process_flag, uint8_t tag)
{
    conn_t *connp;
    sqproc_t proc;
    hrtime_t now;

    ASSERT(sqp != NULL);
    ASSERT(mp != NULL);
    ASSERT(tail != NULL);
    ASSERT(cnt > 0);
    ASSERT(MUTEX_NOT_HELD(&sqp->sq_lock));
    ASSERT(ira == NULL || cnt == 1);

    mutex_enter(&sqp->sq_lock);

    /*
     * Try to process the packet if SQ_FILL flag is not set and
     * we are allowed to process the squeue. The SQ_NODRAIN is
     * ignored if the packet chain consists of more than 1 packet.
     */
    if (!(sqp->sq_state & SQS_PROC) && ((process_flag == SQ_PROCESS) ||
        (process_flag == SQ_NODRAIN && sqp->sq_first == NULL))) {
        /*
         * See if anything is already queued. If we are the
         * first packet, do inline processing else queue the
         * packet and do the drain.
         */
        if (sqp->sq_first == NULL && cnt == 1) {
            /*
             * Fast-path, ok to process and nothing queued.
             */
            sqp->sq_state |= (SQS_PROC|SQS_FAST);
            sqp->sq_run = curthread;
            mutex_exit(&sqp->sq_lock);

            /*
             * We are the chain of 1 packet so
             * go through this fast path.
             */
            ASSERT(mp->b_prev != NULL);
            ASSERT(mp->b_queue != NULL);
            connp = (conn_t *)mp->b_prev;
            mp->b_prev = NULL;
            proc = (sqproc_t)mp->b_queue;
            mp->b_queue = NULL;
            ASSERT(proc != NULL && connp != NULL);
            ASSERT(mp->b_next == NULL);

            /*
             * Handle squeue switching. More details in the
             * block comment at the top of the file
             */
            if (connp->conn_sqp == sqp) {
                SQUEUE_DBG_SET(sqp, mp, proc, connp,
                    tag);
                connp->conn_on_sqp = B_TRUE;
                DTRACE_PROBE3(squeue__proc__start, squeue_t *,
                    sqp, mblk_t *, mp, conn_t *, connp);
                (*proc)(connp, mp, sqp, ira);
                DTRACE_PROBE2(squeue__proc__end, squeue_t *,
                    sqp, conn_t *, connp);
                connp->conn_on_sqp = B_FALSE;
                SQUEUE_DBG_CLEAR(sqp);
                CONN_DEC_REF(connp);
            } else {
                SQUEUE_ENTER_ONE(connp->conn_sqp, mp, proc,
                    connp, ira, SQ_FILL, SQTAG_SQUEUE_CHANGE);
            }
            ASSERT(MUTEX_NOT_HELD(&sqp->sq_lock));
            mutex_enter(&sqp->sq_lock);
            sqp->sq_state &= ~(SQS_PROC|SQS_FAST);
            sqp->sq_run = NULL;
            if (sqp->sq_first == NULL ||
                process_flag == SQ_NODRAIN) {
                if (sqp->sq_first != NULL) {
                    squeue_worker_wakeup(sqp);
                    return;
                }
                /*
                 * We processed our packet inline and nothing
                 * new has arrived. We are done. In case any
                 * control actions are pending, wake up the
                 * worker.
                 */
                if (sqp->sq_state & SQS_WORKER_THR_CONTROL)
                    cv_signal(&sqp->sq_worker_cv);
                mutex_exit(&sqp->sq_lock);
                return;
            }
        } else {
            if (ira != NULL) {
                mblk_t *attrmp;

                ASSERT(cnt == 1);
                attrmp = ip_recv_attr_to_mblk(ira);
                if (attrmp == NULL) {
                    mutex_exit(&sqp->sq_lock);
                    ip_drop_input("squeue: "
                        "ip_recv_attr_to_mblk",
                        mp, NULL);
                    /* Caller already set b_prev/b_next */
                    mp->b_prev = mp->b_next = NULL;
                    freemsg(mp);
                    return;
                }
                ASSERT(attrmp->b_cont == NULL);
                attrmp->b_cont = mp;
                /* Move connp and func to new */
                attrmp->b_queue = mp->b_queue;
                mp->b_queue = NULL;
                attrmp->b_prev = mp->b_prev;
                mp->b_prev = NULL;

                ASSERT(mp == tail);
                tail = mp = attrmp;
            }

            ENQUEUE_CHAIN(sqp, mp, tail, cnt);
#ifdef DEBUG
            mp->b_tag = tag;
#endif
        }
        /*
         * We are here because either we couldn't do inline
         * processing (because something was already queued),
         * or we had a chain of more than one packet,
         * or something else arrived after we were done with
         * inline processing.
         */
        ASSERT(MUTEX_HELD(&sqp->sq_lock));
        ASSERT(sqp->sq_first != NULL);
        now = gethrtime();
        sqp->sq_run = curthread;
        sqp->sq_drain(sqp, SQS_ENTER, now + squeue_drain_ns);

        /*
         * If we didn't do a complete drain, the worker
         * thread was already signalled by squeue_drain.
         * In case any control actions are pending, wake
         * up the worker.
         */
        sqp->sq_run = NULL;
        if (sqp->sq_state & SQS_WORKER_THR_CONTROL)
            cv_signal(&sqp->sq_worker_cv);
        mutex_exit(&sqp->sq_lock);
        return;
    } else {
        /*
         * We let a thread processing a squeue reenter only
         * once. This helps the case of an incoming connection
         * where a SYN-ACK-ACK that triggers the conn_ind
         * doesn't have to queue the packet if listener and
         * eager are on the same squeue. It also helps the
         * loopback connection where the two ends are bound
         * to the same squeue (which is typical on single
         * CPU machines).
         *
         * We let the thread reenter only once for fear
         * of the stack getting blown with multiple traversals.
         */
        connp = (conn_t *)mp->b_prev;
        if (!(sqp->sq_state & SQS_REENTER) &&
            (process_flag != SQ_FILL) && (sqp->sq_first == NULL) &&
            (sqp->sq_run == curthread) && (cnt == 1) &&
            (connp->conn_on_sqp == B_FALSE)) {
            sqp->sq_state |= SQS_REENTER;
            mutex_exit(&sqp->sq_lock);

            ASSERT(mp->b_prev != NULL);
            ASSERT(mp->b_queue != NULL);

            mp->b_prev = NULL;
            proc = (sqproc_t)mp->b_queue;
            mp->b_queue = NULL;

            /*
             * Handle squeue switching. More details in the
             * block comment at the top of the file
             */
            if (connp->conn_sqp == sqp) {
                connp->conn_on_sqp = B_TRUE;
                DTRACE_PROBE3(squeue__proc__start, squeue_t *,
                    sqp, mblk_t *, mp, conn_t *, connp);
                (*proc)(connp, mp, sqp, ira);
                DTRACE_PROBE2(squeue__proc__end, squeue_t *,
                    sqp, conn_t *, connp);
                connp->conn_on_sqp = B_FALSE;
                CONN_DEC_REF(connp);
            } else {
                SQUEUE_ENTER_ONE(connp->conn_sqp, mp, proc,
                    connp, ira, SQ_FILL, SQTAG_SQUEUE_CHANGE);
            }

            mutex_enter(&sqp->sq_lock);
            sqp->sq_state &= ~SQS_REENTER;
            mutex_exit(&sqp->sq_lock);
            return;
        }

        /*
         * Queue is already being processed or there is already
         * one or more packets on the queue. Enqueue the
         * packet and wake up the squeue worker thread if the
         * squeue is not being processed.
         */
#ifdef DEBUG
        mp->b_tag = tag;
#endif
        if (ira != NULL) {
            mblk_t *attrmp;

            ASSERT(cnt == 1);
            attrmp = ip_recv_attr_to_mblk(ira);
            if (attrmp == NULL) {
                mutex_exit(&sqp->sq_lock);
                ip_drop_input("squeue: ip_recv_attr_to_mblk",
                    mp, NULL);
                /* Caller already set b_prev/b_next */
                mp->b_prev = mp->b_next = NULL;
                freemsg(mp);
                return;
            }
            ASSERT(attrmp->b_cont == NULL);
            attrmp->b_cont = mp;
            /* Move connp and func to new */
            attrmp->b_queue = mp->b_queue;
            mp->b_queue = NULL;
            attrmp->b_prev = mp->b_prev;
            mp->b_prev = NULL;

            ASSERT(mp == tail);
            tail = mp = attrmp;
        }
        ENQUEUE_CHAIN(sqp, mp, tail, cnt);
        if (!(sqp->sq_state & SQS_PROC)) {
            squeue_worker_wakeup(sqp);
            return;
        }
        /*
         * In case any control actions are pending, wake
         * up the worker.
         */
        if (sqp->sq_state & SQS_WORKER_THR_CONTROL)
            cv_signal(&sqp->sq_worker_cv);
        mutex_exit(&sqp->sq_lock);
        return;
    }
}

/*
 * PRIVATE FUNCTIONS
 */

static void
squeue_fire(void *arg)
{
    squeue_t *sqp = arg;
    uint_t state;

    mutex_enter(&sqp->sq_lock);

    state = sqp->sq_state;
    if (sqp->sq_tid == 0 && !(state & SQS_TMO_PROG)) {
        mutex_exit(&sqp->sq_lock);
        return;
    }

    sqp->sq_tid = 0;
    /*
     * The timeout fired before we got a chance to set it.
     * Process it anyway but remove the SQS_TMO_PROG so that
     * the thread trying to set the timeout knows that it has
     * already been processed.
     */
    if (state & SQS_TMO_PROG)
        sqp->sq_state &= ~SQS_TMO_PROG;

    if (!(state & SQS_PROC)) {
        sqp->sq_awaken = ddi_get_lbolt();
        cv_signal(&sqp->sq_worker_cv);
    }
    mutex_exit(&sqp->sq_lock);
}

static void
squeue_drain(squeue_t *sqp, uint_t proc_type, hrtime_t expire)
{
    mblk_t *mp;
    mblk_t *head;
    sqproc_t proc;
    conn_t *connp;
    timeout_id_t tid;
    ill_rx_ring_t *sq_rx_ring = sqp->sq_rx_ring;
    hrtime_t now;
    boolean_t did_wakeup = B_FALSE;
    boolean_t sq_poll_capable;
    ip_recv_attr_t *ira, iras;

    /*
     * Before doing any work, check our stack depth; if we're not a
     * worker thread for this squeue and we're beginning to get tight
     * on stack, kick the worker, bump a counter and return.
     */
    if (proc_type != SQS_WORKER && STACK_BIAS + (uintptr_t)getfp() -
        (uintptr_t)curthread->t_stkbase < squeue_drain_stack_needed) {
        ASSERT(mutex_owned(&sqp->sq_lock));
        sqp->sq_awaken = ddi_get_lbolt();
        cv_signal(&sqp->sq_worker_cv);
        squeue_drain_stack_toodeep++;
        return;
    }

    sq_poll_capable = (sqp->sq_state & SQS_POLL_CAPAB) != 0;
again:
    ASSERT(mutex_owned(&sqp->sq_lock));
    ASSERT(!(sqp->sq_state & (SQS_POLL_THR_QUIESCED |
        SQS_POLL_QUIESCE_DONE)));

    head = sqp->sq_first;
    sqp->sq_first = NULL;
    sqp->sq_last = NULL;
    sqp->sq_count = 0;

    if ((tid = sqp->sq_tid) != 0)
        sqp->sq_tid = 0;

    sqp->sq_state |= SQS_PROC | proc_type;

    /*
     * We have backlog built up. Switch to polling mode if the
     * device underneath allows it. We need to do this so that
     * more packets don't come in and disturb us (by contending
     * for sq_lock or by a higher priority thread preempting us).
     *
     * The worker thread is allowed to do active polling, while we
     * just disable the interrupts for drain by non-worker (kernel
     * or userland) threads so they can peacefully process the
     * packets during the time allocated to them.
     */
    SQS_POLLING_ON(sqp, sq_poll_capable, sq_rx_ring);
    mutex_exit(&sqp->sq_lock);

    if (tid != 0)
        (void) untimeout(tid);

    while ((mp = head) != NULL) {

        head = mp->b_next;
        mp->b_next = NULL;

        proc = (sqproc_t)mp->b_queue;
        mp->b_queue = NULL;
        connp = (conn_t *)mp->b_prev;
        mp->b_prev = NULL;

        /* Is there an ip_recv_attr_t to handle? */
        if (ip_recv_attr_is_mblk(mp)) {
            mblk_t *attrmp = mp;

            ASSERT(attrmp->b_cont != NULL);

            mp = attrmp->b_cont;
            attrmp->b_cont = NULL;
            ASSERT(mp->b_queue == NULL);
            ASSERT(mp->b_prev == NULL);

            if (!ip_recv_attr_from_mblk(attrmp, &iras)) {
                /* The ill or ip_stack_t disappeared on us */
                ip_drop_input("ip_recv_attr_from_mblk",
                    mp, NULL);
                ira_cleanup(&iras, B_TRUE);
                CONN_DEC_REF(connp);
                continue;
            }
            ira = &iras;
        } else {
            ira = NULL;
        }

        /*
         * Handle squeue switching. More details in the
         * block comment at the top of the file
         */
        if (connp->conn_sqp == sqp) {
            SQUEUE_DBG_SET(sqp, mp, proc, connp,
                mp->b_tag);
            connp->conn_on_sqp = B_TRUE;
            DTRACE_PROBE3(squeue__proc__start, squeue_t *,
                sqp, mblk_t *, mp, conn_t *, connp);
            (*proc)(connp, mp, sqp, ira);
            DTRACE_PROBE2(squeue__proc__end, squeue_t *,
                sqp, conn_t *, connp);
            connp->conn_on_sqp = B_FALSE;
            CONN_DEC_REF(connp);
        } else {
            SQUEUE_ENTER_ONE(connp->conn_sqp, mp, proc, connp, ira,
                SQ_FILL, SQTAG_SQUEUE_CHANGE);
        }
        if (ira != NULL)
            ira_cleanup(ira, B_TRUE);
    }

    SQUEUE_DBG_CLEAR(sqp);

    mutex_enter(&sqp->sq_lock);

    /*
     * Check if there is still work to do (either more arrived or timer
     * expired). If we are the worker thread and we are polling capable,
     * continue doing the work since no one else is around to do the
     * work anyway (but signal the poll thread to retrieve some packets
     * in the meanwhile). If we are not the worker thread, just
     * signal the worker thread to take up the work if processing time
     * has expired.
     */
    if (sqp->sq_first != NULL) {
        /*
         * Still more to process. If the time quanta has not expired, we
         * should let the drain go on. The worker thread is allowed
         * to drain as long as there is anything left.
         */
        now = gethrtime();
        if ((now < expire) || (proc_type == SQS_WORKER)) {
            /*
             * If time not expired or we are worker thread and
             * this squeue is polling capable, continue to do
             * the drain.
             *
             * We turn off interrupts for all userland threads
             * doing drain but we do active polling only for
             * worker thread.
             *
             * Calling SQS_POLL_RING() even in the case of
             * SQS_POLLING_ON() not succeeding is ok as
             * SQS_POLL_RING() will not wake up poll thread
             * if SQS_POLLING bit is not set.
             */
            if (proc_type == SQS_WORKER)
                SQS_POLL_RING(sqp);
            goto again;
        } else {
            did_wakeup = B_TRUE;
            sqp->sq_awaken = ddi_get_lbolt();
            cv_signal(&sqp->sq_worker_cv);
        }
    }

    /*
     * If the poll thread is already running, just return. The
     * poll thread continues to hold the proc and will finish
     * processing.
     */
    if (sqp->sq_state & SQS_GET_PKTS) {
        ASSERT(!(sqp->sq_state & (SQS_POLL_THR_QUIESCED |
            SQS_POLL_QUIESCE_DONE)));
        sqp->sq_state &= ~proc_type;
        return;
    }

    /*
     * If we are the worker thread and no work is left, send the poll
     * thread down once more to see if something arrived. Otherwise,
     * turn the interrupts back on and we are done.
     */
    if ((proc_type == SQS_WORKER) && (sqp->sq_state & SQS_POLLING)) {
        /*
         * Do one last check to see if anything arrived
         * in the NIC. We leave the SQS_PROC set to ensure
         * that the poll thread keeps the PROC and can decide
         * if it needs to turn polling off or continue
         * processing.
         *
         * If we drop the SQS_PROC here and the poll thread comes
         * up empty handed, it cannot safely turn polling off
         * since someone else could have acquired the PROC
         * and started draining. The previously running poll
         * thread and the current thread doing drain would end
         * up in a race for turning polling on/off and more
         * complex code would be required to deal with it.
         *
         * It's a lot simpler for the drain to hand the SQS_PROC to
         * the poll thread (if running) and let the poll thread finish
         * without worrying about racing with any other thread.
         */
        ASSERT(!(sqp->sq_state & (SQS_POLL_THR_QUIESCED |
            SQS_POLL_QUIESCE_DONE)));
        SQS_POLL_RING(sqp);
        sqp->sq_state &= ~proc_type;
    } else {
        /*
         * The squeue is either not capable of polling, or the
         * attempt to blank (i.e., turn SQS_POLLING_ON()) was
         * unsuccessful, or the poll thread already finished
         * processing and didn't find anything. Since there
         * is nothing queued and we already turned polling on
         * (for all threads doing drain), we should turn
         * polling off and relinquish the PROC.
         */
        ASSERT(!(sqp->sq_state & (SQS_POLL_THR_QUIESCED |
            SQS_POLL_QUIESCE_DONE)));
        SQS_POLLING_OFF(sqp, sq_poll_capable, sq_rx_ring);
        sqp->sq_state &= ~(SQS_PROC | proc_type);
        if (!did_wakeup && sqp->sq_first != NULL) {
            squeue_worker_wakeup(sqp);
            mutex_enter(&sqp->sq_lock);
        }
        /*
         * If we are not the worker and there is a pending quiesce
         * event, wake up the worker
         */
        if ((proc_type != SQS_WORKER) &&
            (sqp->sq_state & SQS_WORKER_THR_CONTROL))
            cv_signal(&sqp->sq_worker_cv);
    }
}

/*
 * Quiesce, Restart, or Cleanup of the squeue poll thread.
 *
 * Quiesce and Restart: After an squeue poll thread has been quiesced, it does
 * not attempt to poll the underlying soft ring any more. The quiesce is
 * triggered by the mac layer when it wants to quiesce a soft ring. Typically
 * control operations such as changing the fanout of a NIC or VNIC (dladm
 * setlinkprop) need to quiesce data flow before changing the wiring.
 * The operation is done by the mac layer, but it calls back into IP to
 * quiesce the soft ring. After completing the operation (say increase or
 * decrease of the fanout) the mac layer then calls back into IP to restart
 * the quiesced soft ring.
 *
 * Cleanup: This is triggered when the squeue binding to a soft ring is
 * removed permanently. Typically interface plumb and unplumb would trigger
 * this. It can also be triggered from the mac layer when a soft ring is
 * being deleted say as the result of a fanout reduction. Since squeues are
 * never deleted, the cleanup marks the squeue as fit for recycling and
 * moves it to the zeroth squeue set.
 */
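/*
 * Illustrative sketch (not part of the implementation): the requestor side
 * of these control operations lives in ip_squeue.c. The hypothetical helper
 * below only shows the shape of the handshake that squeue_poll_thr_control()
 * and squeue_worker_thr_control() complete: set a control flag, wake the
 * worker, and wait on sq_ctrlop_done_cv until the matching *_DONE flag
 * appears. Whether SQS_POLL_QUIESCE is the exact flag a real requestor sets
 * is an assumption here; see ip_squeue.c for the authoritative flow.
 *
 *    static void
 *    example_request_poll_quiesce(squeue_t *sqp)
 *    {
 *        mutex_enter(&sqp->sq_lock);
 *        // Request a quiesce; the worker notices the control flag,
 *        // calls squeue_worker_thr_control() and quiesces the poll thread.
 *        sqp->sq_state |= SQS_POLL_QUIESCE;
 *        cv_signal(&sqp->sq_worker_cv);
 *        while (!(sqp->sq_state & SQS_POLL_QUIESCE_DONE))
 *            cv_wait(&sqp->sq_ctrlop_done_cv, &sqp->sq_lock);
 *        mutex_exit(&sqp->sq_lock);
 *    }
 */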
static void
squeue_poll_thr_control(squeue_t *sqp)
{
    if (sqp->sq_state & SQS_POLL_THR_RESTART) {
        /* Restart implies a previous quiesce */
        ASSERT(sqp->sq_state & SQS_POLL_THR_QUIESCED);
        sqp->sq_state &= ~(SQS_POLL_THR_QUIESCED |
            SQS_POLL_THR_RESTART);
        sqp->sq_state |= SQS_POLL_CAPAB;
        cv_signal(&sqp->sq_worker_cv);
        return;
    }

    if (sqp->sq_state & SQS_POLL_THR_QUIESCE) {
        sqp->sq_state |= SQS_POLL_THR_QUIESCED;
        sqp->sq_state &= ~SQS_POLL_THR_QUIESCE;
        cv_signal(&sqp->sq_worker_cv);
        return;
    }
}

/*
 * POLLING Notes
 *
 * With polling mode, we want to do as much processing as we possibly can
 * in worker thread context. The sweet spot is a worker thread that keeps
 * doing work all the time in polling mode, with writers etc. dumping
 * packets to the worker thread. Occasionally, we send the poll thread
 * (running at a lower priority) to the NIC to get a chain of packets to
 * feed to the worker. Sending the poll thread down to the NIC depends on
 * three criteria:
 *
 * 1) It's always driven from squeue_drain and only if the worker thread is
 *	doing the drain.
 * 2) We cleared the backlog once and more packets arrived in between.
 *	Before starting drain again, send the poll thread down if
 *	the drain is being done by the worker thread.
 * 3) Before exiting the squeue_drain, if the poll thread is not already
 *	working and we are the worker thread, try to poll one more time.
 *
 * For latency's sake, we do allow any thread calling squeue_enter
 * to process its packet provided:
 *
 * 1) Nothing is queued
 * 2) If more packets arrived in between, the non-worker threads are allowed
 *	to do the drain till their time quanta expired provided SQS_GET_PKTS
 *	wasn't set in between.
 *
 * Avoiding deadlocks with interrupts
 * ==================================
 *
 * One of the big problems is that we can't send the poll thread down while
 * holding the sq_lock since the thread can block. So we drop the sq_lock
 * before calling sq_get_pkts(). We keep holding the SQS_PROC as long as the
 * poll thread is running so that no other thread can acquire the
 * perimeter in between. If the squeue_drain gets done (no more work
 * left), it leaves the SQS_PROC set if the poll thread is running.
 */

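/*
 * Illustrative note (not part of the implementation): the poll thread below
 * relies only on the following subset of the ill_rx_ring_t contract; the
 * full definition lives in <inet/ip.h>, and this summary is abridged from
 * how the fields are used in this file rather than from their declarations.
 *
 *    // rr_intr_disable(rr_intr_handle) != 0 : ring blanked, SQS_POLLING set
 *    // rr_rx(rr_rx_handle, max_bytes)       : returns an mblk chain or NULL
 *    // rr_ip_accept(ill, ring, sqp, head, &tail, &cnt)
 *    //                                      : IP-checked chain to enqueue
 *    // rr_intr_enable(rr_intr_handle)       : leave poll mode
 */
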
/*
 * This is the squeue poll thread. In poll mode, it polls the underlying
 * TCP softring and feeds packets into the squeue. The worker thread then
 * drains the squeue. The poll thread also responds to control signals for
 * quiescing, restarting, or cleanup of an squeue. These are driven by
 * control operations like plumb/unplumb or as a result of dynamic Rx ring
 * related operations that are driven from the mac layer.
 */
static void
squeue_polling_thread(squeue_t *sqp)
{
    kmutex_t *lock = &sqp->sq_lock;
    kcondvar_t *async = &sqp->sq_poll_cv;
    ip_mac_rx_t sq_get_pkts;
    ip_accept_t ip_accept;
    ill_rx_ring_t *sq_rx_ring;
    ill_t *sq_ill;
    mblk_t *head, *tail, *mp;
    uint_t cnt;
    void *sq_mac_handle;
    callb_cpr_t cprinfo;
    size_t bytes_to_pickup;
    uint32_t ctl_state;

    CALLB_CPR_INIT(&cprinfo, lock, callb_generic_cpr, "sq_poll");
    mutex_enter(lock);

    for (;;) {
        CALLB_CPR_SAFE_BEGIN(&cprinfo);
        cv_wait(async, lock);
        CALLB_CPR_SAFE_END(&cprinfo, lock);

        ctl_state = sqp->sq_state & (SQS_POLL_THR_CONTROL |
            SQS_POLL_THR_QUIESCED);
        if (ctl_state != 0) {
            /*
             * If the squeue is quiesced, then wait for a control
             * request. A quiesced squeue must not poll the
             * underlying soft ring.
             */
            if (ctl_state == SQS_POLL_THR_QUIESCED)
                continue;
            /*
             * Act on control requests to quiesce, cleanup or
             * restart an squeue
             */
            squeue_poll_thr_control(sqp);
            continue;
        }

        if (!(sqp->sq_state & SQS_POLL_CAPAB))
            continue;

        ASSERT((sqp->sq_state &
            (SQS_PROC|SQS_POLLING|SQS_GET_PKTS)) ==
            (SQS_PROC|SQS_POLLING|SQS_GET_PKTS));

poll_again:
        sq_rx_ring = sqp->sq_rx_ring;
        sq_get_pkts = sq_rx_ring->rr_rx;
        sq_mac_handle = sq_rx_ring->rr_rx_handle;
        ip_accept = sq_rx_ring->rr_ip_accept;
        sq_ill = sq_rx_ring->rr_ill;
        bytes_to_pickup = MAX_BYTES_TO_PICKUP;
        mutex_exit(lock);
        head = sq_get_pkts(sq_mac_handle, bytes_to_pickup);
        mp = NULL;
        if (head != NULL) {
            /*
             * We got the packet chain from the mac layer. It
             * would be nice to be able to process it inline
             * for better performance but we need to give
             * IP a chance to look at this chain to ensure
             * that packets are really meant for this squeue
             * and do the IP processing.
             */
            mp = ip_accept(sq_ill, sq_rx_ring, sqp, head,
                &tail, &cnt);
        }
        mutex_enter(lock);
        if (mp != NULL) {
            /*
             * The ip_accept function has already added an
             * ip_recv_attr_t mblk if that is needed.
             */
            ENQUEUE_CHAIN(sqp, mp, tail, cnt);
        }
        ASSERT((sqp->sq_state &
            (SQS_PROC|SQS_POLLING|SQS_GET_PKTS)) ==
            (SQS_PROC|SQS_POLLING|SQS_GET_PKTS));

        if (sqp->sq_first != NULL && !(sqp->sq_state & SQS_WORKER)) {
            /*
             * We have packets to process and the worker thread
             * is not running. Check to see if the poll thread is
             * allowed to process. Let it do processing only if it
             * picked up some packets from the NIC; otherwise
             * wake up the worker thread.
             */
            if (mp != NULL) {
                hrtime_t now;

                now = gethrtime();
                sqp->sq_run = curthread;
                sqp->sq_drain(sqp, SQS_POLL_PROC, now +
                    squeue_drain_ns);
                sqp->sq_run = NULL;

                if (sqp->sq_first == NULL)
                    goto poll_again;

                /*
                 * Couldn't do the entire drain because the
                 * time limit expired, let the
                 * worker thread take over.
                 */
            }

            sqp->sq_awaken = ddi_get_lbolt();
We 1144 * can remove the SQS_PROC flag here and turn off the 1145 * polling so that it wouldn't matter who gets the 1146 * processing but we get better performance this way 1147 * and save the cost of turn polling off and possibly 1148 * on again as soon as we start draining again. 1149 * 1150 * We can't remove the SQS_PROC flag without turning 1151 * polling off until we can guarantee that control 1152 * will return to squeue_drain immediately. 1153 */ 1154 sqp->sq_state |= SQS_PROC_HELD; 1155 sqp->sq_state &= ~SQS_GET_PKTS; 1156 cv_signal(&sqp->sq_worker_cv); 1157 } else if (sqp->sq_first == NULL && 1158 !(sqp->sq_state & SQS_WORKER)) { 1159 /* 1160 * Nothing queued and worker thread not running. 1161 * Since we hold the proc, no other thread is 1162 * processing the squeue. This means that there 1163 * is no work to be done and nothing is queued 1164 * in squeue or in NIC. Turn polling off and go 1165 * back to interrupt mode. 1166 */ 1167 sqp->sq_state &= ~(SQS_PROC|SQS_GET_PKTS); 1168 /* LINTED: constant in conditional context */ 1169 SQS_POLLING_OFF(sqp, B_TRUE, sq_rx_ring); 1170 1171 /* 1172 * If there is a pending control operation 1173 * wake up the worker, since it is currently 1174 * not running. 1175 */ 1176 if (sqp->sq_state & SQS_WORKER_THR_CONTROL) 1177 cv_signal(&sqp->sq_worker_cv); 1178 } else { 1179 /* 1180 * Worker thread is already running. We don't need 1181 * to do anything. Indicate that poll thread is done. 1182 */ 1183 sqp->sq_state &= ~SQS_GET_PKTS; 1184 } 1185 if (sqp->sq_state & SQS_POLL_THR_CONTROL) { 1186 /* 1187 * Act on control requests to quiesce, cleanup or 1188 * restart an squeue 1189 */ 1190 squeue_poll_thr_control(sqp); 1191 } 1192 } 1193 } 1194 1195 /* 1196 * The squeue worker thread acts on any control requests to quiesce, cleanup 1197 * or restart an ill_rx_ring_t by calling this function. The worker thread 1198 * synchronizes with the squeue poll thread to complete the request and finally 1199 * wakes up the requestor when the request is completed. 1200 */ 1201 static void 1202 squeue_worker_thr_control(squeue_t *sqp) 1203 { 1204 ill_t *ill; 1205 ill_rx_ring_t *rx_ring; 1206 1207 ASSERT(MUTEX_HELD(&sqp->sq_lock)); 1208 1209 if (sqp->sq_state & SQS_POLL_RESTART) { 1210 /* Restart implies a previous quiesce. */ 1211 ASSERT((sqp->sq_state & (SQS_PROC_HELD | 1212 SQS_POLL_QUIESCE_DONE | SQS_PROC | SQS_WORKER)) == 1213 (SQS_POLL_QUIESCE_DONE | SQS_PROC | SQS_WORKER)); 1214 /* 1215 * Request the squeue poll thread to restart and wait till 1216 * it actually restarts. 1217 */ 1218 sqp->sq_state &= ~SQS_POLL_QUIESCE_DONE; 1219 sqp->sq_state |= SQS_POLL_THR_RESTART; 1220 cv_signal(&sqp->sq_poll_cv); 1221 while (sqp->sq_state & SQS_POLL_THR_QUIESCED) 1222 cv_wait(&sqp->sq_worker_cv, &sqp->sq_lock); 1223 sqp->sq_state &= ~(SQS_POLL_RESTART | SQS_PROC | 1224 SQS_WORKER); 1225 /* 1226 * Signal any waiter that is waiting for the restart 1227 * to complete 1228 */ 1229 sqp->sq_state |= SQS_POLL_RESTART_DONE; 1230 cv_signal(&sqp->sq_ctrlop_done_cv); 1231 return; 1232 } 1233 1234 if (sqp->sq_state & SQS_PROC_HELD) { 1235 /* The squeue poll thread handed control to us */ 1236 ASSERT(sqp->sq_state & SQS_PROC); 1237 } 1238 1239 /* 1240 * Prevent any other thread from processing the squeue 1241 * until we finish the control actions by setting SQS_PROC. 
    sqp->sq_state |= (SQS_PROC | SQS_WORKER);

    /* Signal the squeue poll thread and wait for it to quiesce itself */
    if (!(sqp->sq_state & SQS_POLL_THR_QUIESCED)) {
        sqp->sq_state |= SQS_POLL_THR_QUIESCE;
        cv_signal(&sqp->sq_poll_cv);
        while (!(sqp->sq_state & SQS_POLL_THR_QUIESCED))
            cv_wait(&sqp->sq_worker_cv, &sqp->sq_lock);
    }

    rx_ring = sqp->sq_rx_ring;
    ill = rx_ring->rr_ill;
    /*
     * The lock hierarchy is as follows.
     * cpu_lock -> ill_lock -> sqset_lock -> sq_lock
     */
    mutex_exit(&sqp->sq_lock);
    mutex_enter(&ill->ill_lock);
    mutex_enter(&sqp->sq_lock);

    SQS_POLLING_OFF(sqp, (sqp->sq_state & SQS_POLL_CAPAB) != 0,
        sqp->sq_rx_ring);
    sqp->sq_state &= ~(SQS_POLL_CAPAB | SQS_GET_PKTS | SQS_PROC_HELD);
    if (sqp->sq_state & SQS_POLL_CLEANUP) {
        /*
         * Disassociate this squeue from its ill_rx_ring_t.
         * The rr_sqp, sq_rx_ring fields are protected by the
         * corresponding squeue, ill_lock* and sq_lock. Holding any
         * of them will ensure that the ring to squeue mapping does
         * not change.
         */
        ASSERT(!(sqp->sq_state & SQS_DEFAULT));

        sqp->sq_rx_ring = NULL;
        rx_ring->rr_sqp = NULL;

        sqp->sq_state &= ~(SQS_POLL_CLEANUP | SQS_POLL_THR_QUIESCED |
            SQS_POLL_QUIESCE_DONE);
        sqp->sq_ill = NULL;

        rx_ring->rr_rx_handle = NULL;
        rx_ring->rr_intr_handle = NULL;
        rx_ring->rr_intr_enable = NULL;
        rx_ring->rr_intr_disable = NULL;
        sqp->sq_state |= SQS_POLL_CLEANUP_DONE;
    } else {
        sqp->sq_state &= ~SQS_POLL_QUIESCE;
        sqp->sq_state |= SQS_POLL_QUIESCE_DONE;
    }
    /*
     * Signal any waiter that is waiting for the quiesce or cleanup
     * to complete and also wait for it to actually see and reset the
     * SQS_POLL_CLEANUP_DONE.
     */
    cv_signal(&sqp->sq_ctrlop_done_cv);
    mutex_exit(&ill->ill_lock);
    if (sqp->sq_state & SQS_POLL_CLEANUP_DONE) {
        cv_wait(&sqp->sq_worker_cv, &sqp->sq_lock);
        sqp->sq_state &= ~(SQS_PROC | SQS_WORKER);
    }
}

static void
squeue_worker(squeue_t *sqp)
{
    kmutex_t *lock = &sqp->sq_lock;
    kcondvar_t *async = &sqp->sq_worker_cv;
    callb_cpr_t cprinfo;
    hrtime_t now;

    CALLB_CPR_INIT(&cprinfo, lock, callb_generic_cpr, "sq_worker");
    mutex_enter(lock);

    for (;;) {
        for (;;) {
            /*
             * If the poll thread has handed control to us
             * we need to break out of the wait.
             */
            if (sqp->sq_state & SQS_PROC_HELD)
                break;

            /*
             * If the squeue is not being processed and we either
             * have messages to drain or some thread has signaled
             * some control activity we need to break
             */
            if (!(sqp->sq_state & SQS_PROC) &&
                ((sqp->sq_state & SQS_WORKER_THR_CONTROL) ||
                (sqp->sq_first != NULL)))
                break;

            /*
             * If we have started some control action, then check
             * for the SQS_WORKER flag (since we don't
             * release the squeue) to make sure we own the squeue
             * and break out
             */
            if ((sqp->sq_state & SQS_WORKER_THR_CONTROL) &&
                (sqp->sq_state & SQS_WORKER))
                break;

            CALLB_CPR_SAFE_BEGIN(&cprinfo);
            cv_wait(async, lock);
            CALLB_CPR_SAFE_END(&cprinfo, lock);
        }
        if (sqp->sq_state & SQS_WORKER_THR_CONTROL) {
            squeue_worker_thr_control(sqp);
            continue;
        }
        ASSERT(!(sqp->sq_state & (SQS_POLL_THR_QUIESCED |
            SQS_POLL_CLEANUP_DONE | SQS_POLL_QUIESCE_DONE |
            SQS_WORKER_THR_CONTROL | SQS_POLL_THR_CONTROL)));

        if (sqp->sq_state & SQS_PROC_HELD)
            sqp->sq_state &= ~SQS_PROC_HELD;

        now = gethrtime();
        sqp->sq_run = curthread;
        sqp->sq_drain(sqp, SQS_WORKER, now + squeue_drain_ns);
        sqp->sq_run = NULL;
    }
}

uintptr_t *
squeue_getprivate(squeue_t *sqp, sqprivate_t p)
{
    ASSERT(p < SQPRIVATE_MAX);

    return (&sqp->sq_private[p]);
}

/* ARGSUSED */
void
squeue_wakeup_conn(void *arg, mblk_t *mp, void *arg2, ip_recv_attr_t *dummy)
{
    conn_t *connp = (conn_t *)arg;
    squeue_t *sqp = connp->conn_sqp;

    /*
     * Mark the squeue as paused before waking up the thread stuck
     * in squeue_synch_enter().
     */
    mutex_enter(&sqp->sq_lock);
    sqp->sq_state |= SQS_PAUSE;

    /*
     * Notify the thread that it's OK to proceed; that is done by
     * clearing the MSGWAITSYNC flag. The synch thread will free the mblk.
     */
    ASSERT(mp->b_flag & MSGWAITSYNC);
    mp->b_flag &= ~MSGWAITSYNC;
    cv_broadcast(&connp->conn_sq_cv);

    /*
     * We are doing something on behalf of another thread, so we have to
     * pause and wait until it finishes.
     */
    while (sqp->sq_state & SQS_PAUSE) {
        cv_wait(&sqp->sq_synch_cv, &sqp->sq_lock);
    }
    mutex_exit(&sqp->sq_lock);
}

int
squeue_synch_enter(conn_t *connp, mblk_t *use_mp)
{
    squeue_t *sqp;

again:
    sqp = connp->conn_sqp;

    mutex_enter(&sqp->sq_lock);
    if (sqp->sq_first == NULL && !(sqp->sq_state & SQS_PROC)) {
        /*
         * We are OK to proceed if the squeue is empty, and
         * no one owns the squeue.
         *
         * The caller won't own the squeue as this is called from the
         * application.
         */
        ASSERT(sqp->sq_run == NULL);

        sqp->sq_state |= SQS_PROC;
        sqp->sq_run = curthread;
        mutex_exit(&sqp->sq_lock);

        /*
         * Handle squeue switching. The conn's squeue can only change
         * while there is a thread in the squeue, which is why we do
         * the check after entering the squeue. If it has changed, exit
         * this squeue and redo everything with the new squeue.
         */
        if (sqp != connp->conn_sqp) {
            mutex_enter(&sqp->sq_lock);
            sqp->sq_state &= ~SQS_PROC;
            sqp->sq_run = NULL;
            mutex_exit(&sqp->sq_lock);
            goto again;
        }
#if SQUEUE_DEBUG
        sqp->sq_curmp = NULL;
        sqp->sq_curproc = NULL;
        sqp->sq_connp = connp;
#endif
        connp->conn_on_sqp = B_TRUE;
        return (0);
    } else {
        mblk_t *mp;

        mp = (use_mp == NULL) ? allocb(0, BPRI_MED) : use_mp;
        if (mp == NULL) {
            mutex_exit(&sqp->sq_lock);
            return (ENOMEM);
        }

        /*
         * We mark the mblk as awaiting synchronous squeue access
         * by setting the MSGWAITSYNC flag. Once squeue_wakeup_conn
         * fires, MSGWAITSYNC is cleared, at which point we know we
         * have exclusive access.
         */
        mp->b_flag |= MSGWAITSYNC;

        CONN_INC_REF(connp);
        SET_SQUEUE(mp, squeue_wakeup_conn, connp);
        ENQUEUE_CHAIN(sqp, mp, mp, 1);

        ASSERT(sqp->sq_run != curthread);

        /* Wait until the enqueued mblk gets processed. */
        while (mp->b_flag & MSGWAITSYNC)
            cv_wait(&connp->conn_sq_cv, &sqp->sq_lock);
        mutex_exit(&sqp->sq_lock);

        if (use_mp == NULL)
            freeb(mp);

        return (0);
    }
}

void
squeue_synch_exit(conn_t *connp)
{
    squeue_t *sqp = connp->conn_sqp;

    mutex_enter(&sqp->sq_lock);
    if (sqp->sq_run == curthread) {
        ASSERT(sqp->sq_state & SQS_PROC);

        sqp->sq_state &= ~SQS_PROC;
        sqp->sq_run = NULL;
        connp->conn_on_sqp = B_FALSE;

        if (sqp->sq_first == NULL) {
            mutex_exit(&sqp->sq_lock);
        } else {
            /*
             * If this were a normal thread, then it would
             * (most likely) continue processing the pending
             * requests. Since the just completed operation
             * was executed synchronously, the thread should
             * not be delayed. To compensate, wake up the
             * worker thread right away when there are outstanding
             * requests.
             */
            sqp->sq_awaken = ddi_get_lbolt();
            cv_signal(&sqp->sq_worker_cv);
            mutex_exit(&sqp->sq_lock);
        }
    } else {
        /*
         * The caller doesn't own the squeue, clear the SQS_PAUSE flag,
         * and wake up the squeue owner so that the owner can continue
         * processing.
         */
        ASSERT(sqp->sq_state & SQS_PAUSE);
        sqp->sq_state &= ~SQS_PAUSE;

        /* There should be only one thread blocking on sq_synch_cv. */
        cv_signal(&sqp->sq_synch_cv);
        mutex_exit(&sqp->sq_lock);
    }
}
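
/*
 * Illustrative sketch (not part of the implementation): a typical caller of
 * the synchronous entry points above (for example, code operating on a conn
 * on behalf of an application thread) brackets its work as shown by this
 * hypothetical helper. squeue_synch_enter() blocks until the caller has
 * exclusive access to the conn's squeue; squeue_synch_exit() releases it and
 * kicks the worker if packets queued up in the meantime.
 *
 *    static int
 *    example_synch_op(conn_t *connp)
 *    {
 *        // NULL asks squeue_synch_enter() to allocate its own marker mblk;
 *        // ENOMEM is returned if that allocation fails.
 *        if (squeue_synch_enter(connp, NULL) == ENOMEM)
 *            return (ENOMEM);
 *
 *        // ... operate on connp with squeue protection ...
 *
 *        squeue_synch_exit(connp);
 *        return (0);
 *    }
 */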