/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License (the "License").
 * You may not use this file except in compliance with the License.
 *
 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
 * or http://www.opensolaris.org/os/licensing.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
 * If applicable, add the following below this CDDL HEADER, with the
 * fields enclosed by brackets "[]" replaced with your own identifying
 * information: Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 */
/*
 * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
 */

/*
 * Squeues: General purpose serialization mechanism
 * ------------------------------------------------
 *
 * Background:
 * -----------
 *
 * This is a general purpose high-performance serialization mechanism
 * currently used by TCP/IP. It is implemented by means of a per-CPU queue,
 * a worker thread and a polling thread, which are bound to the CPU
 * associated with the squeue. The squeue is strictly FIFO for both the read
 * and write side and only one thread can process it at any given time.
 * The design goal of squeue was to offer a very high degree of
 * parallelization (on a per H/W execution pipeline basis) with at
 * most one queuing.
 *
 * The modules needing protection typically call the SQUEUE_ENTER_ONE() or
 * SQUEUE_ENTER() macro as soon as a thread enters the module
 * from either direction. For each packet, the processing function
 * and argument are stored in the mblk itself. When the packet is ready
 * to be processed, the squeue retrieves the stored function and calls
 * it with the supplied argument and the pointer to the packet itself.
 * The called function can assume that no other thread is processing
 * the squeue when it is executing.
 *
 * Squeue/connection binding:
 * --------------------------
 *
 * TCP/IP uses an IP classifier in conjunction with squeues: specific
 * connections are assigned to specific squeues (based on various policies)
 * at connection creation time. Once assigned, the connection to
 * squeue mapping is never changed and all future packets for that
 * connection are processed on that squeue. The connection ("conn") to
 * squeue mapping is stored in the "conn_t" member "conn_sqp".
 *
 * Since the processing of the connection cuts across multiple layers
 * but still allows packets for different connections to be processed on
 * other CPUs/squeues, squeues are also termed "Vertical Perimeters" or
 * "Per Connection Vertical Perimeters".
 *
 * Processing Model:
 * -----------------
 *
 * An squeue doesn't necessarily process packets with its own worker thread.
 * The callers can pick if they just want to queue the packet, process
 * their packet if nothing is queued, or drain and process. The first two
 * modes are typically employed when the packet was generated while
 * already doing the processing behind the squeue and the last mode (drain
 * and process) is typically employed when the thread is entering the squeue
 * for the first time. The squeue still imposes a finite time limit
 * for which an external thread can do processing, after which it switches
 * processing to its own worker thread.
 *
 * Once created, squeues are never deleted. Hence squeue pointers are
 * always valid. This means that functions outside the squeue can still
 * refer safely to conn_sqp and there is no need for ref counts.
 *
 * Only a thread executing in the squeue can change the squeue of the
 * connection. It does so by calling a squeue framework function to do this.
 * After changing the squeue, the thread must leave the squeue. It must not
 * continue to execute any code that needs squeue protection.
 *
 * The squeue framework, after entering the squeue, checks if the current
 * squeue matches the conn_sqp. If the check fails, the packet is delivered
 * to the right squeue.
 *
 * Polling Model:
 * --------------
 *
 * Squeues can control the rate of packet arrival into themselves from the
 * NIC or a specific Rx ring within a NIC. As part of capability negotiation
 * between IP and the MAC layer, an squeue is created for each TCP soft ring
 * (or TCP Rx ring - to be implemented in the future). As part of this
 * negotiation, squeues get a cookie for the underlying soft ring or Rx
 * ring, a function to turn off incoming packets and a function to call
 * to poll for packets. This helps schedule the receive side packet
 * processing so that queue backlog doesn't build up and packet processing
 * doesn't keep getting disturbed by high priority interrupts. As part
 * of this mode, as soon as a backlog starts building, the squeue turns off
 * the interrupts and switches to poll mode. In poll mode, when the poll
 * thread goes down to retrieve packets, it retrieves them in the form of
 * a chain, which improves performance even more. As the squeue/softring
 * system gets more packets, it gets more efficient by switching to
 * polling more often and dealing with larger packet chains.
 *
 */

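/*
 * A minimal usage sketch of the model described above. This block is
 * illustrative only and is never compiled (note the #if 0); the helper
 * names my_module_input() and my_conn_recv() are hypothetical. A caller
 * takes a reference on the conn and hands the packet, together with its
 * processing function and the conn, to the connection's squeue with
 * SQUEUE_ENTER_ONE(); the function later runs with the squeue held and
 * the squeue drops the conn reference when the function returns.
 */
#if 0
static void
my_conn_recv(void *arg, mblk_t *mp, void *arg2, ip_recv_attr_t *ira)
{
	conn_t	*connp = (conn_t *)arg;

	/*
	 * No other thread is processing this squeue while we run, so the
	 * squeue-protected state of connp can be touched safely here.
	 */
	freemsg(mp);
}

static void
my_module_input(conn_t *connp, mblk_t *mp, ip_recv_attr_t *ira)
{
	CONN_INC_REF(connp);	/* dropped by the squeue after processing */

	/* SQ_FILL: just queue it for the worker; 0 is the debug tag. */
	SQUEUE_ENTER_ONE(connp->conn_sqp, mp, my_conn_recv, connp, ira,
	    SQ_FILL, 0);
}
#endif
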
#include <sys/types.h>
#include <sys/cmn_err.h>
#include <sys/debug.h>
#include <sys/kmem.h>
#include <sys/cpuvar.h>
#include <sys/condvar_impl.h>
#include <sys/systm.h>
#include <sys/callb.h>
#include <sys/sdt.h>
#include <sys/ddi.h>
#include <sys/sunddi.h>

#include <inet/ipclassifier.h>
#include <inet/udp_impl.h>

#include <sys/squeue_impl.h>

static void squeue_fire(void *);
static void squeue_drain(squeue_t *, uint_t, hrtime_t);
static void squeue_worker(squeue_t *sqp);
static void squeue_polling_thread(squeue_t *sqp);

kmem_cache_t *squeue_cache;

#define	SQUEUE_MSEC_TO_NSEC 1000000

int squeue_drain_ms = 20;
int squeue_workerwait_ms = 0;

/* The values above converted to ticks or nanoseconds */
static int squeue_drain_ns = 0;
static int squeue_workerwait_tick = 0;

#define	MAX_BYTES_TO_PICKUP	150000

#define	ENQUEUE_CHAIN(sqp, mp, tail, cnt) {			\
	/*							\
	 * Enqueue our mblk chain.				\
	 */							\
	ASSERT(MUTEX_HELD(&(sqp)->sq_lock));			\
								\
	if ((sqp)->sq_last != NULL)				\
		(sqp)->sq_last->b_next = (mp);			\
	else							\
		(sqp)->sq_first = (mp);				\
	(sqp)->sq_last = (tail);				\
	(sqp)->sq_count += (cnt);				\
	ASSERT((sqp)->sq_count > 0);				\
	DTRACE_PROBE4(squeue__enqueuechain, squeue_t *, sqp,	\
	    mblk_t *, mp, mblk_t *, tail, int, cnt);		\
								\
}

/*
 * Blank the receive ring (in this case it is the soft ring).
When 167 * blanked, the soft ring will not send any more packets up. 168 * Blanking may not succeed when there is a CPU already in the soft 169 * ring sending packets up. In that case, SQS_POLLING will not be 170 * set. 171 */ 172 #define SQS_POLLING_ON(sqp, sq_poll_capable, rx_ring) { \ 173 ASSERT(MUTEX_HELD(&(sqp)->sq_lock)); \ 174 if (sq_poll_capable) { \ 175 ASSERT(rx_ring != NULL); \ 176 ASSERT(sqp->sq_state & SQS_POLL_CAPAB); \ 177 if (!(sqp->sq_state & SQS_POLLING)) { \ 178 if (rx_ring->rr_intr_disable(rx_ring->rr_intr_handle)) \ 179 sqp->sq_state |= SQS_POLLING; \ 180 } \ 181 } \ 182 } 183 184 #define SQS_POLLING_OFF(sqp, sq_poll_capable, rx_ring) { \ 185 ASSERT(MUTEX_HELD(&(sqp)->sq_lock)); \ 186 if (sq_poll_capable) { \ 187 ASSERT(rx_ring != NULL); \ 188 ASSERT(sqp->sq_state & SQS_POLL_CAPAB); \ 189 if (sqp->sq_state & SQS_POLLING) { \ 190 sqp->sq_state &= ~SQS_POLLING; \ 191 rx_ring->rr_intr_enable(rx_ring->rr_intr_handle); \ 192 } \ 193 } \ 194 } 195 196 /* Wakeup poll thread only if SQS_POLLING is set */ 197 #define SQS_POLL_RING(sqp) { \ 198 ASSERT(MUTEX_HELD(&(sqp)->sq_lock)); \ 199 if (sqp->sq_state & SQS_POLLING) { \ 200 ASSERT(sqp->sq_state & SQS_POLL_CAPAB); \ 201 if (!(sqp->sq_state & SQS_GET_PKTS)) { \ 202 sqp->sq_state |= SQS_GET_PKTS; \ 203 cv_signal(&sqp->sq_poll_cv); \ 204 } \ 205 } \ 206 } 207 208 #ifdef DEBUG 209 #define SQUEUE_DBG_SET(sqp, mp, proc, connp, tag) { \ 210 (sqp)->sq_curmp = (mp); \ 211 (sqp)->sq_curproc = (proc); \ 212 (sqp)->sq_connp = (connp); \ 213 (mp)->b_tag = (sqp)->sq_tag = (tag); \ 214 } 215 216 #define SQUEUE_DBG_CLEAR(sqp) { \ 217 (sqp)->sq_curmp = NULL; \ 218 (sqp)->sq_curproc = NULL; \ 219 (sqp)->sq_connp = NULL; \ 220 } 221 #else 222 #define SQUEUE_DBG_SET(sqp, mp, proc, connp, tag) 223 #define SQUEUE_DBG_CLEAR(sqp) 224 #endif 225 226 void 227 squeue_init(void) 228 { 229 squeue_cache = kmem_cache_create("squeue_cache", 230 sizeof (squeue_t), 64, NULL, NULL, NULL, NULL, NULL, 0); 231 232 squeue_drain_ns = squeue_drain_ms * SQUEUE_MSEC_TO_NSEC; 233 squeue_workerwait_tick = MSEC_TO_TICK_ROUNDUP(squeue_workerwait_ms); 234 } 235 236 /* ARGSUSED */ 237 squeue_t * 238 squeue_create(clock_t wait, pri_t pri) 239 { 240 squeue_t *sqp = kmem_cache_alloc(squeue_cache, KM_SLEEP); 241 242 bzero(sqp, sizeof (squeue_t)); 243 sqp->sq_bind = PBIND_NONE; 244 sqp->sq_priority = pri; 245 sqp->sq_wait = MSEC_TO_TICK(wait); 246 sqp->sq_worker = thread_create(NULL, 0, squeue_worker, 247 sqp, 0, &p0, TS_RUN, pri); 248 249 sqp->sq_poll_thr = thread_create(NULL, 0, squeue_polling_thread, 250 sqp, 0, &p0, TS_RUN, pri); 251 252 sqp->sq_enter = squeue_enter; 253 sqp->sq_drain = squeue_drain; 254 255 return (sqp); 256 } 257 258 /* 259 * Bind squeue worker thread to the specified CPU, given by CPU id. 260 * If the CPU id value is -1, bind the worker thread to the value 261 * specified in sq_bind field. If a thread is already bound to a 262 * different CPU, unbind it from the old CPU and bind to the new one. 
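 *
 * A minimal usage sketch (illustrative; "cpu_id" is a hypothetical
 * variable). The ASSERT in squeue_bind() requires cpu_lock to be held
 * across the call:
 *
 *	mutex_enter(&cpu_lock);
 *	squeue_bind(sqp, cpu_id);
 *	mutex_exit(&cpu_lock);
 *
 * The binding is dropped again with squeue_unbind(sqp).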
263 */ 264 265 void 266 squeue_bind(squeue_t *sqp, processorid_t bind) 267 { 268 mutex_enter(&sqp->sq_lock); 269 ASSERT(sqp->sq_bind != PBIND_NONE || bind != PBIND_NONE); 270 ASSERT(MUTEX_HELD(&cpu_lock)); 271 272 if (sqp->sq_state & SQS_BOUND) { 273 if (sqp->sq_bind == bind) { 274 mutex_exit(&sqp->sq_lock); 275 return; 276 } 277 thread_affinity_clear(sqp->sq_worker); 278 } else { 279 sqp->sq_state |= SQS_BOUND; 280 } 281 282 if (bind != PBIND_NONE) 283 sqp->sq_bind = bind; 284 285 thread_affinity_set(sqp->sq_worker, sqp->sq_bind); 286 mutex_exit(&sqp->sq_lock); 287 } 288 289 void 290 squeue_unbind(squeue_t *sqp) 291 { 292 mutex_enter(&sqp->sq_lock); 293 if (!(sqp->sq_state & SQS_BOUND)) { 294 mutex_exit(&sqp->sq_lock); 295 return; 296 } 297 298 sqp->sq_state &= ~SQS_BOUND; 299 thread_affinity_clear(sqp->sq_worker); 300 mutex_exit(&sqp->sq_lock); 301 } 302 303 void 304 squeue_worker_wakeup(squeue_t *sqp) 305 { 306 timeout_id_t tid = (sqp)->sq_tid; 307 308 ASSERT(MUTEX_HELD(&(sqp)->sq_lock)); 309 310 if (sqp->sq_wait == 0) { 311 ASSERT(tid == 0); 312 ASSERT(!(sqp->sq_state & SQS_TMO_PROG)); 313 sqp->sq_awaken = ddi_get_lbolt(); 314 cv_signal(&sqp->sq_worker_cv); 315 mutex_exit(&sqp->sq_lock); 316 return; 317 } 318 319 /* 320 * Queue isn't being processed, so take 321 * any post enqueue actions needed before leaving. 322 */ 323 if (tid != 0) { 324 /* 325 * Waiting for an enter() to process mblk(s). 326 */ 327 clock_t now = ddi_get_lbolt(); 328 clock_t waited = now - sqp->sq_awaken; 329 330 if (TICK_TO_MSEC(waited) >= sqp->sq_wait) { 331 /* 332 * Times up and have a worker thread 333 * waiting for work, so schedule it. 334 */ 335 sqp->sq_tid = 0; 336 sqp->sq_awaken = now; 337 cv_signal(&sqp->sq_worker_cv); 338 mutex_exit(&sqp->sq_lock); 339 (void) untimeout(tid); 340 return; 341 } 342 mutex_exit(&sqp->sq_lock); 343 return; 344 } else if (sqp->sq_state & SQS_TMO_PROG) { 345 mutex_exit(&sqp->sq_lock); 346 return; 347 } else { 348 clock_t wait = sqp->sq_wait; 349 /* 350 * Wait up to sqp->sq_wait ms for an 351 * enter() to process this queue. We 352 * don't want to contend on timeout locks 353 * with sq_lock held for performance reasons, 354 * so drop the sq_lock before calling timeout 355 * but we need to check if timeout is required 356 * after re acquiring the sq_lock. Once 357 * the sq_lock is dropped, someone else could 358 * have processed the packet or the timeout could 359 * have already fired. 360 */ 361 sqp->sq_state |= SQS_TMO_PROG; 362 mutex_exit(&sqp->sq_lock); 363 tid = timeout(squeue_fire, sqp, wait); 364 mutex_enter(&sqp->sq_lock); 365 /* Check again if we still need the timeout */ 366 if (((sqp->sq_state & (SQS_PROC|SQS_TMO_PROG)) == 367 SQS_TMO_PROG) && (sqp->sq_tid == 0) && 368 (sqp->sq_first != NULL)) { 369 sqp->sq_state &= ~SQS_TMO_PROG; 370 sqp->sq_tid = tid; 371 mutex_exit(&sqp->sq_lock); 372 return; 373 } else { 374 if (sqp->sq_state & SQS_TMO_PROG) { 375 sqp->sq_state &= ~SQS_TMO_PROG; 376 mutex_exit(&sqp->sq_lock); 377 (void) untimeout(tid); 378 } else { 379 /* 380 * The timer fired before we could 381 * reacquire the sq_lock. squeue_fire 382 * removes the SQS_TMO_PROG flag 383 * and we don't need to do anything 384 * else. 385 */ 386 mutex_exit(&sqp->sq_lock); 387 } 388 } 389 } 390 391 ASSERT(MUTEX_NOT_HELD(&sqp->sq_lock)); 392 } 393 394 /* 395 * squeue_enter() - enter squeue sqp with mblk mp (which can be 396 * a chain), while tail points to the end and cnt in number of 397 * mblks in the chain. 398 * 399 * For a chain of single packet (i.e. 
mp == tail), go through the 400 * fast path if no one is processing the squeue and nothing is queued. 401 * 402 * The proc and arg for each mblk is already stored in the mblk in 403 * appropriate places. 404 * 405 * The process_flag specifies if we are allowed to process the mblk 406 * and drain in the entering thread context. If process_flag is 407 * SQ_FILL, then we just queue the mblk and return (after signaling 408 * the worker thread if no one else is processing the squeue). 409 * 410 * The ira argument can be used when the count is one. 411 * For a chain the caller needs to prepend any needed mblks from 412 * ip_recv_attr_to_mblk(). 413 */ 414 /* ARGSUSED */ 415 void 416 squeue_enter(squeue_t *sqp, mblk_t *mp, mblk_t *tail, uint32_t cnt, 417 ip_recv_attr_t *ira, int process_flag, uint8_t tag) 418 { 419 conn_t *connp; 420 sqproc_t proc; 421 hrtime_t now; 422 423 ASSERT(sqp != NULL); 424 ASSERT(mp != NULL); 425 ASSERT(tail != NULL); 426 ASSERT(cnt > 0); 427 ASSERT(MUTEX_NOT_HELD(&sqp->sq_lock)); 428 ASSERT(ira == NULL || cnt == 1); 429 430 mutex_enter(&sqp->sq_lock); 431 432 /* 433 * Try to process the packet if SQ_FILL flag is not set and 434 * we are allowed to process the squeue. The SQ_NODRAIN is 435 * ignored if the packet chain consists of more than 1 packet. 436 */ 437 if (!(sqp->sq_state & SQS_PROC) && ((process_flag == SQ_PROCESS) || 438 (process_flag == SQ_NODRAIN && sqp->sq_first == NULL))) { 439 /* 440 * See if anything is already queued. If we are the 441 * first packet, do inline processing else queue the 442 * packet and do the drain. 443 */ 444 if (sqp->sq_first == NULL && cnt == 1) { 445 /* 446 * Fast-path, ok to process and nothing queued. 447 */ 448 sqp->sq_state |= (SQS_PROC|SQS_FAST); 449 sqp->sq_run = curthread; 450 mutex_exit(&sqp->sq_lock); 451 452 /* 453 * We are the chain of 1 packet so 454 * go through this fast path. 455 */ 456 ASSERT(mp->b_prev != NULL); 457 ASSERT(mp->b_queue != NULL); 458 connp = (conn_t *)mp->b_prev; 459 mp->b_prev = NULL; 460 proc = (sqproc_t)mp->b_queue; 461 mp->b_queue = NULL; 462 ASSERT(proc != NULL && connp != NULL); 463 ASSERT(mp->b_next == NULL); 464 465 /* 466 * Handle squeue switching. More details in the 467 * block comment at the top of the file 468 */ 469 if (connp->conn_sqp == sqp) { 470 SQUEUE_DBG_SET(sqp, mp, proc, connp, 471 tag); 472 connp->conn_on_sqp = B_TRUE; 473 DTRACE_PROBE3(squeue__proc__start, squeue_t *, 474 sqp, mblk_t *, mp, conn_t *, connp); 475 (*proc)(connp, mp, sqp, ira); 476 DTRACE_PROBE2(squeue__proc__end, squeue_t *, 477 sqp, conn_t *, connp); 478 connp->conn_on_sqp = B_FALSE; 479 SQUEUE_DBG_CLEAR(sqp); 480 CONN_DEC_REF(connp); 481 } else { 482 SQUEUE_ENTER_ONE(connp->conn_sqp, mp, proc, 483 connp, ira, SQ_FILL, SQTAG_SQUEUE_CHANGE); 484 } 485 ASSERT(MUTEX_NOT_HELD(&sqp->sq_lock)); 486 mutex_enter(&sqp->sq_lock); 487 sqp->sq_state &= ~(SQS_PROC|SQS_FAST); 488 sqp->sq_run = NULL; 489 if (sqp->sq_first == NULL || 490 process_flag == SQ_NODRAIN) { 491 if (sqp->sq_first != NULL) { 492 squeue_worker_wakeup(sqp); 493 return; 494 } 495 /* 496 * We processed inline our packet and nothing 497 * new has arrived. We are done. In case any 498 * control actions are pending, wake up the 499 * worker. 
500 */ 501 if (sqp->sq_state & SQS_WORKER_THR_CONTROL) 502 cv_signal(&sqp->sq_worker_cv); 503 mutex_exit(&sqp->sq_lock); 504 return; 505 } 506 } else { 507 if (ira != NULL) { 508 mblk_t *attrmp; 509 510 ASSERT(cnt == 1); 511 attrmp = ip_recv_attr_to_mblk(ira); 512 if (attrmp == NULL) { 513 mutex_exit(&sqp->sq_lock); 514 ip_drop_input("squeue: " 515 "ip_recv_attr_to_mblk", 516 mp, NULL); 517 /* Caller already set b_prev/b_next */ 518 mp->b_prev = mp->b_next = NULL; 519 freemsg(mp); 520 return; 521 } 522 ASSERT(attrmp->b_cont == NULL); 523 attrmp->b_cont = mp; 524 /* Move connp and func to new */ 525 attrmp->b_queue = mp->b_queue; 526 mp->b_queue = NULL; 527 attrmp->b_prev = mp->b_prev; 528 mp->b_prev = NULL; 529 530 ASSERT(mp == tail); 531 tail = mp = attrmp; 532 } 533 534 ENQUEUE_CHAIN(sqp, mp, tail, cnt); 535 #ifdef DEBUG 536 mp->b_tag = tag; 537 #endif 538 } 539 /* 540 * We are here because either we couldn't do inline 541 * processing (because something was already queued), 542 * or we had a chain of more than one packet, 543 * or something else arrived after we were done with 544 * inline processing. 545 */ 546 ASSERT(MUTEX_HELD(&sqp->sq_lock)); 547 ASSERT(sqp->sq_first != NULL); 548 now = gethrtime(); 549 sqp->sq_drain(sqp, SQS_ENTER, now + squeue_drain_ns); 550 551 /* 552 * If we didn't do a complete drain, the worker 553 * thread was already signalled by squeue_drain. 554 * In case any control actions are pending, wake 555 * up the worker. 556 */ 557 sqp->sq_run = NULL; 558 if (sqp->sq_state & SQS_WORKER_THR_CONTROL) 559 cv_signal(&sqp->sq_worker_cv); 560 mutex_exit(&sqp->sq_lock); 561 return; 562 } else { 563 /* 564 * We let a thread processing a squeue reenter only 565 * once. This helps the case of incoming connection 566 * where a SYN-ACK-ACK that triggers the conn_ind 567 * doesn't have to queue the packet if listener and 568 * eager are on the same squeue. Also helps the 569 * loopback connection where the two ends are bound 570 * to the same squeue (which is typical on single 571 * CPU machines). 572 * 573 * We let the thread reenter only once for the fear 574 * of stack getting blown with multiple traversal. 575 */ 576 connp = (conn_t *)mp->b_prev; 577 if (!(sqp->sq_state & SQS_REENTER) && 578 (process_flag != SQ_FILL) && (sqp->sq_first == NULL) && 579 (sqp->sq_run == curthread) && (cnt == 1) && 580 (connp->conn_on_sqp == B_FALSE)) { 581 sqp->sq_state |= SQS_REENTER; 582 mutex_exit(&sqp->sq_lock); 583 584 ASSERT(mp->b_prev != NULL); 585 ASSERT(mp->b_queue != NULL); 586 587 mp->b_prev = NULL; 588 proc = (sqproc_t)mp->b_queue; 589 mp->b_queue = NULL; 590 591 /* 592 * Handle squeue switching. More details in the 593 * block comment at the top of the file 594 */ 595 if (connp->conn_sqp == sqp) { 596 connp->conn_on_sqp = B_TRUE; 597 DTRACE_PROBE3(squeue__proc__start, squeue_t *, 598 sqp, mblk_t *, mp, conn_t *, connp); 599 (*proc)(connp, mp, sqp, ira); 600 DTRACE_PROBE2(squeue__proc__end, squeue_t *, 601 sqp, conn_t *, connp); 602 connp->conn_on_sqp = B_FALSE; 603 CONN_DEC_REF(connp); 604 } else { 605 SQUEUE_ENTER_ONE(connp->conn_sqp, mp, proc, 606 connp, ira, SQ_FILL, SQTAG_SQUEUE_CHANGE); 607 } 608 609 mutex_enter(&sqp->sq_lock); 610 sqp->sq_state &= ~SQS_REENTER; 611 mutex_exit(&sqp->sq_lock); 612 return; 613 } 614 615 /* 616 * Queue is already being processed or there is already 617 * one or more paquets on the queue. Enqueue the 618 * packet and wakeup the squeue worker thread if the 619 * squeue is not being processed. 
620 */ 621 #ifdef DEBUG 622 mp->b_tag = tag; 623 #endif 624 if (ira != NULL) { 625 mblk_t *attrmp; 626 627 ASSERT(cnt == 1); 628 attrmp = ip_recv_attr_to_mblk(ira); 629 if (attrmp == NULL) { 630 mutex_exit(&sqp->sq_lock); 631 ip_drop_input("squeue: ip_recv_attr_to_mblk", 632 mp, NULL); 633 /* Caller already set b_prev/b_next */ 634 mp->b_prev = mp->b_next = NULL; 635 freemsg(mp); 636 return; 637 } 638 ASSERT(attrmp->b_cont == NULL); 639 attrmp->b_cont = mp; 640 /* Move connp and func to new */ 641 attrmp->b_queue = mp->b_queue; 642 mp->b_queue = NULL; 643 attrmp->b_prev = mp->b_prev; 644 mp->b_prev = NULL; 645 646 ASSERT(mp == tail); 647 tail = mp = attrmp; 648 } 649 ENQUEUE_CHAIN(sqp, mp, tail, cnt); 650 if (!(sqp->sq_state & SQS_PROC)) { 651 squeue_worker_wakeup(sqp); 652 return; 653 } 654 /* 655 * In case any control actions are pending, wake 656 * up the worker. 657 */ 658 if (sqp->sq_state & SQS_WORKER_THR_CONTROL) 659 cv_signal(&sqp->sq_worker_cv); 660 mutex_exit(&sqp->sq_lock); 661 return; 662 } 663 } 664 665 /* 666 * PRIVATE FUNCTIONS 667 */ 668 669 static void 670 squeue_fire(void *arg) 671 { 672 squeue_t *sqp = arg; 673 uint_t state; 674 675 mutex_enter(&sqp->sq_lock); 676 677 state = sqp->sq_state; 678 if (sqp->sq_tid == 0 && !(state & SQS_TMO_PROG)) { 679 mutex_exit(&sqp->sq_lock); 680 return; 681 } 682 683 sqp->sq_tid = 0; 684 /* 685 * The timeout fired before we got a chance to set it. 686 * Process it anyway but remove the SQS_TMO_PROG so that 687 * the guy trying to set the timeout knows that it has 688 * already been processed. 689 */ 690 if (state & SQS_TMO_PROG) 691 sqp->sq_state &= ~SQS_TMO_PROG; 692 693 if (!(state & SQS_PROC)) { 694 sqp->sq_awaken = ddi_get_lbolt(); 695 cv_signal(&sqp->sq_worker_cv); 696 } 697 mutex_exit(&sqp->sq_lock); 698 } 699 700 static void 701 squeue_drain(squeue_t *sqp, uint_t proc_type, hrtime_t expire) 702 { 703 mblk_t *mp; 704 mblk_t *head; 705 sqproc_t proc; 706 conn_t *connp; 707 timeout_id_t tid; 708 ill_rx_ring_t *sq_rx_ring = sqp->sq_rx_ring; 709 hrtime_t now; 710 boolean_t did_wakeup = B_FALSE; 711 boolean_t sq_poll_capable; 712 ip_recv_attr_t *ira, iras; 713 714 sq_poll_capable = (sqp->sq_state & SQS_POLL_CAPAB) != 0; 715 again: 716 ASSERT(mutex_owned(&sqp->sq_lock)); 717 ASSERT(!(sqp->sq_state & (SQS_POLL_THR_QUIESCED | 718 SQS_POLL_QUIESCE_DONE))); 719 720 head = sqp->sq_first; 721 sqp->sq_first = NULL; 722 sqp->sq_last = NULL; 723 sqp->sq_count = 0; 724 725 if ((tid = sqp->sq_tid) != 0) 726 sqp->sq_tid = 0; 727 728 sqp->sq_state |= SQS_PROC | proc_type; 729 730 /* 731 * We have backlog built up. Switch to polling mode if the 732 * device underneath allows it. Need to do it so that 733 * more packets don't come in and disturb us (by contending 734 * for sq_lock or higher priority thread preempting us). 735 * 736 * The worker thread is allowed to do active polling while we 737 * just disable the interrupts for drain by non worker (kernel 738 * or userland) threads so they can peacefully process the 739 * packets during time allocated to them. 740 */ 741 SQS_POLLING_ON(sqp, sq_poll_capable, sq_rx_ring); 742 mutex_exit(&sqp->sq_lock); 743 744 if (tid != 0) 745 (void) untimeout(tid); 746 747 while ((mp = head) != NULL) { 748 749 head = mp->b_next; 750 mp->b_next = NULL; 751 752 proc = (sqproc_t)mp->b_queue; 753 mp->b_queue = NULL; 754 connp = (conn_t *)mp->b_prev; 755 mp->b_prev = NULL; 756 757 /* Is there an ip_recv_attr_t to handle? 
*/ 758 if (ip_recv_attr_is_mblk(mp)) { 759 mblk_t *attrmp = mp; 760 761 ASSERT(attrmp->b_cont != NULL); 762 763 mp = attrmp->b_cont; 764 attrmp->b_cont = NULL; 765 ASSERT(mp->b_queue == NULL); 766 ASSERT(mp->b_prev == NULL); 767 768 if (!ip_recv_attr_from_mblk(attrmp, &iras)) { 769 /* The ill or ip_stack_t disappeared on us */ 770 ip_drop_input("ip_recv_attr_from_mblk", 771 mp, NULL); 772 ira_cleanup(&iras, B_TRUE); 773 CONN_DEC_REF(connp); 774 continue; 775 } 776 ira = &iras; 777 } else { 778 ira = NULL; 779 } 780 781 782 /* 783 * Handle squeue switching. More details in the 784 * block comment at the top of the file 785 */ 786 if (connp->conn_sqp == sqp) { 787 SQUEUE_DBG_SET(sqp, mp, proc, connp, 788 mp->b_tag); 789 connp->conn_on_sqp = B_TRUE; 790 DTRACE_PROBE3(squeue__proc__start, squeue_t *, 791 sqp, mblk_t *, mp, conn_t *, connp); 792 (*proc)(connp, mp, sqp, ira); 793 DTRACE_PROBE2(squeue__proc__end, squeue_t *, 794 sqp, conn_t *, connp); 795 connp->conn_on_sqp = B_FALSE; 796 CONN_DEC_REF(connp); 797 } else { 798 SQUEUE_ENTER_ONE(connp->conn_sqp, mp, proc, connp, ira, 799 SQ_FILL, SQTAG_SQUEUE_CHANGE); 800 } 801 if (ira != NULL) 802 ira_cleanup(ira, B_TRUE); 803 } 804 805 SQUEUE_DBG_CLEAR(sqp); 806 807 mutex_enter(&sqp->sq_lock); 808 809 /* 810 * Check if there is still work to do (either more arrived or timer 811 * expired). If we are the worker thread and we are polling capable, 812 * continue doing the work since no one else is around to do the 813 * work anyway (but signal the poll thread to retrieve some packets 814 * in the meanwhile). If we are not the worker thread, just 815 * signal the worker thread to take up the work if processing time 816 * has expired. 817 */ 818 if (sqp->sq_first != NULL) { 819 /* 820 * Still more to process. If time quanta not expired, we 821 * should let the drain go on. The worker thread is allowed 822 * to drain as long as there is anything left. 823 */ 824 now = gethrtime(); 825 if ((now < expire) || (proc_type == SQS_WORKER)) { 826 /* 827 * If time not expired or we are worker thread and 828 * this squeue is polling capable, continue to do 829 * the drain. 830 * 831 * We turn off interrupts for all userland threads 832 * doing drain but we do active polling only for 833 * worker thread. 834 * 835 * Calling SQS_POLL_RING() even in the case of 836 * SQS_POLLING_ON() not succeeding is ok as 837 * SQS_POLL_RING() will not wake up poll thread 838 * if SQS_POLLING bit is not set. 839 */ 840 if (proc_type == SQS_WORKER) 841 SQS_POLL_RING(sqp); 842 goto again; 843 } else { 844 did_wakeup = B_TRUE; 845 sqp->sq_awaken = ddi_get_lbolt(); 846 cv_signal(&sqp->sq_worker_cv); 847 } 848 } 849 850 /* 851 * If the poll thread is already running, just return. The 852 * poll thread continues to hold the proc and will finish 853 * processing. 854 */ 855 if (sqp->sq_state & SQS_GET_PKTS) { 856 ASSERT(!(sqp->sq_state & (SQS_POLL_THR_QUIESCED | 857 SQS_POLL_QUIESCE_DONE))); 858 sqp->sq_state &= ~proc_type; 859 return; 860 } 861 862 /* 863 * 864 * If we are the worker thread and no work is left, send the poll 865 * thread down once more to see if something arrived. Otherwise, 866 * turn the interrupts back on and we are done. 867 */ 868 if ((proc_type == SQS_WORKER) && (sqp->sq_state & SQS_POLLING)) { 869 /* 870 * Do one last check to see if anything arrived 871 * in the NIC. We leave the SQS_PROC set to ensure 872 * that poll thread keeps the PROC and can decide 873 * if it needs to turn polling off or continue 874 * processing. 
		 *
		 * If we drop the SQS_PROC here and the poll thread comes
		 * up empty-handed, it cannot safely turn polling off
		 * since someone else could have acquired the PROC
		 * and started draining. The previously running poll
		 * thread and the current thread doing drain would end
		 * up in a race for turning polling on/off and more
		 * complex code would be required to deal with it.
		 *
		 * It's a lot simpler for drain to hand the SQS_PROC to the
		 * poll thread (if running) and let the poll thread finish
		 * without worrying about racing with any other thread.
		 */
		ASSERT(!(sqp->sq_state & (SQS_POLL_THR_QUIESCED |
		    SQS_POLL_QUIESCE_DONE)));
		SQS_POLL_RING(sqp);
		sqp->sq_state &= ~proc_type;
	} else {
		/*
		 * The squeue is either not capable of polling or the
		 * attempt to blank (i.e., SQS_POLLING_ON()) was
		 * unsuccessful, or the poll thread already finished
		 * processing and didn't find anything. Since there
		 * is nothing queued and we already turned polling on
		 * (for all threads doing drain), we should turn
		 * polling off and relinquish the PROC.
		 */
		ASSERT(!(sqp->sq_state & (SQS_POLL_THR_QUIESCED |
		    SQS_POLL_QUIESCE_DONE)));
		SQS_POLLING_OFF(sqp, sq_poll_capable, sq_rx_ring);
		sqp->sq_state &= ~(SQS_PROC | proc_type);
		if (!did_wakeup && sqp->sq_first != NULL) {
			squeue_worker_wakeup(sqp);
			mutex_enter(&sqp->sq_lock);
		}
		/*
		 * If we are not the worker and there is a pending quiesce
		 * event, wake up the worker.
		 */
		if ((proc_type != SQS_WORKER) &&
		    (sqp->sq_state & SQS_WORKER_THR_CONTROL))
			cv_signal(&sqp->sq_worker_cv);
	}
}

/*
 * Quiesce, Restart, or Cleanup of the squeue poll thread.
 *
 * Quiesce and Restart: After an squeue poll thread has been quiesced, it does
 * not attempt to poll the underlying soft ring any more. The quiesce is
 * triggered by the mac layer when it wants to quiesce a soft ring. Typically
 * control operations such as changing the fanout of a NIC or VNIC (dladm
 * setlinkprop) need to quiesce data flow before changing the wiring.
 * The operation is done by the mac layer, but it calls back into IP to
 * quiesce the soft ring. After completing the operation (say an increase or
 * decrease of the fanout) the mac layer then calls back into IP to restart
 * the quiesced soft ring.
 *
 * Cleanup: This is triggered when the squeue binding to a soft ring is
 * removed permanently. Typically interface plumb and unplumb would trigger
 * this. It can also be triggered from the mac layer when a soft ring is
 * being deleted, say as the result of a fanout reduction. Since squeues are
 * never deleted, the cleanup marks the squeue as fit for recycling and
 * moves it to the zeroth squeue set.
 */
static void
squeue_poll_thr_control(squeue_t *sqp)
{
	if (sqp->sq_state & SQS_POLL_THR_RESTART) {
		/* Restart implies a previous quiesce */
		ASSERT(sqp->sq_state & SQS_POLL_THR_QUIESCED);
		sqp->sq_state &= ~(SQS_POLL_THR_QUIESCED |
		    SQS_POLL_THR_RESTART);
		sqp->sq_state |= SQS_POLL_CAPAB;
		cv_signal(&sqp->sq_worker_cv);
		return;
	}

	if (sqp->sq_state & SQS_POLL_THR_QUIESCE) {
		sqp->sq_state |= SQS_POLL_THR_QUIESCED;
		sqp->sq_state &= ~SQS_POLL_THR_QUIESCE;
		cv_signal(&sqp->sq_worker_cv);
		return;
	}
}

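/*
 * A sketch of the three ring-side hooks that the polling model relies on.
 * This block is illustrative only and is never compiled (note the #if 0);
 * the my_ring_* names and the helpers they call are hypothetical, and the
 * real prototypes are fixed by the ill_rx_ring_t definition. The only
 * contracts assumed here are the ones visible in this file: the interrupt
 * disable hook reports success with a non-zero return value (see
 * SQS_POLLING_ON()), and the poll hook returns a b_next chain of mblks
 * limited by the byte budget it is given (see squeue_polling_thread()).
 */
#if 0
static boolean_t
my_ring_intr_disable(void *handle)
{
	/* Fails if the ring is currently delivering packets via interrupt. */
	return (my_ring_try_mask_intr(handle));
}

static void
my_ring_intr_enable(void *handle)
{
	my_ring_unmask_intr(handle);
}

static mblk_t *
my_ring_poll(void *handle, size_t bytes_to_pickup)
{
	/* Return up to bytes_to_pickup bytes worth of queued packets. */
	return (my_ring_dequeue_chain(handle, bytes_to_pickup));
}
#endif
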
/*
 * POLLING Notes
 *
 * With polling mode, we want to do as much processing as we possibly can
 * in worker thread context. The sweet spot is to have the worker thread
 * doing work all the time in polling mode while writers etc. keep dumping
 * packets to the worker thread. Occasionally, we send the poll thread
 * (running at lower priority) down to the NIC to get a chain of packets to
 * feed to the worker. Sending the poll thread down to the NIC is dependent
 * on 3 criteria:
 *
 * 1) It's always driven from squeue_drain and only if the worker thread is
 *    doing the drain.
 * 2) We cleared the backlog once and more packets arrived in between.
 *    Before starting the drain again, send the poll thread down if
 *    the drain is being done by the worker thread.
 * 3) Before exiting squeue_drain, if the poll thread is not already
 *    working and we are the worker thread, try to poll one more time.
 *
 * For latency's sake, we do allow any thread calling squeue_enter
 * to process its packet provided:
 *
 * 1) Nothing is queued
 * 2) If more packets arrived in between, the non-worker threads are allowed
 *    to do the drain till their time quantum expires, provided SQS_GET_PKTS
 *    wasn't set in between.
 *
 * Avoiding deadlocks with interrupts
 * ==================================
 *
 * One of the big problems is that we can't send the poll_thr down while
 * holding the sq_lock since the thread can block. So we drop the sq_lock
 * before calling sq_get_pkts(). We keep holding the SQS_PROC as long as the
 * poll thread is running so that no other thread can acquire the
 * perimeter in between. If the squeue_drain gets done (no more work
 * left), it leaves the SQS_PROC set if the poll thread is running.
 */

/*
 * This is the squeue poll thread. In poll mode, it polls the underlying
 * TCP softring and feeds packets into the squeue. The worker thread then
 * drains the squeue. The poll thread also responds to control signals for
 * quiescing, restarting, or cleanup of an squeue. These are driven by
 * control operations like plumb/unplumb or as a result of dynamic Rx ring
 * related operations that are driven from the mac layer.
 */
static void
squeue_polling_thread(squeue_t *sqp)
{
	kmutex_t *lock = &sqp->sq_lock;
	kcondvar_t *async = &sqp->sq_poll_cv;
	ip_mac_rx_t sq_get_pkts;
	ip_accept_t ip_accept;
	ill_rx_ring_t *sq_rx_ring;
	ill_t *sq_ill;
	mblk_t *head, *tail, *mp;
	uint_t cnt;
	void *sq_mac_handle;
	callb_cpr_t cprinfo;
	size_t bytes_to_pickup;
	uint32_t ctl_state;

	CALLB_CPR_INIT(&cprinfo, lock, callb_generic_cpr, "sq_poll");
	mutex_enter(lock);

	for (;;) {
		CALLB_CPR_SAFE_BEGIN(&cprinfo);
		cv_wait(async, lock);
		CALLB_CPR_SAFE_END(&cprinfo, lock);

		ctl_state = sqp->sq_state & (SQS_POLL_THR_CONTROL |
		    SQS_POLL_THR_QUIESCED);
		if (ctl_state != 0) {
			/*
			 * If the squeue is quiesced, then wait for a control
			 * request. A quiesced squeue must not poll the
			 * underlying soft ring.
1037 */ 1038 if (ctl_state == SQS_POLL_THR_QUIESCED) 1039 continue; 1040 /* 1041 * Act on control requests to quiesce, cleanup or 1042 * restart an squeue 1043 */ 1044 squeue_poll_thr_control(sqp); 1045 continue; 1046 } 1047 1048 if (!(sqp->sq_state & SQS_POLL_CAPAB)) 1049 continue; 1050 1051 ASSERT((sqp->sq_state & 1052 (SQS_PROC|SQS_POLLING|SQS_GET_PKTS)) == 1053 (SQS_PROC|SQS_POLLING|SQS_GET_PKTS)); 1054 1055 poll_again: 1056 sq_rx_ring = sqp->sq_rx_ring; 1057 sq_get_pkts = sq_rx_ring->rr_rx; 1058 sq_mac_handle = sq_rx_ring->rr_rx_handle; 1059 ip_accept = sq_rx_ring->rr_ip_accept; 1060 sq_ill = sq_rx_ring->rr_ill; 1061 bytes_to_pickup = MAX_BYTES_TO_PICKUP; 1062 mutex_exit(lock); 1063 head = sq_get_pkts(sq_mac_handle, bytes_to_pickup); 1064 mp = NULL; 1065 if (head != NULL) { 1066 /* 1067 * We got the packet chain from the mac layer. It 1068 * would be nice to be able to process it inline 1069 * for better performance but we need to give 1070 * IP a chance to look at this chain to ensure 1071 * that packets are really meant for this squeue 1072 * and do the IP processing. 1073 */ 1074 mp = ip_accept(sq_ill, sq_rx_ring, sqp, head, 1075 &tail, &cnt); 1076 } 1077 mutex_enter(lock); 1078 if (mp != NULL) { 1079 /* 1080 * The ip_accept function has already added an 1081 * ip_recv_attr_t mblk if that is needed. 1082 */ 1083 ENQUEUE_CHAIN(sqp, mp, tail, cnt); 1084 } 1085 ASSERT((sqp->sq_state & 1086 (SQS_PROC|SQS_POLLING|SQS_GET_PKTS)) == 1087 (SQS_PROC|SQS_POLLING|SQS_GET_PKTS)); 1088 1089 if (sqp->sq_first != NULL && !(sqp->sq_state & SQS_WORKER)) { 1090 /* 1091 * We have packets to process and worker thread 1092 * is not running. Check to see if poll thread is 1093 * allowed to process. Let it do processing only if it 1094 * picked up some packets from the NIC otherwise 1095 * wakeup the worker thread. 1096 */ 1097 if (mp != NULL) { 1098 hrtime_t now; 1099 1100 now = gethrtime(); 1101 sqp->sq_run = curthread; 1102 sqp->sq_drain(sqp, SQS_POLL_PROC, now + 1103 squeue_drain_ns); 1104 sqp->sq_run = NULL; 1105 1106 if (sqp->sq_first == NULL) 1107 goto poll_again; 1108 1109 /* 1110 * Couldn't do the entire drain because the 1111 * time limit expired, let the 1112 * worker thread take over. 1113 */ 1114 } 1115 1116 sqp->sq_awaken = ddi_get_lbolt(); 1117 /* 1118 * Put the SQS_PROC_HELD on so the worker 1119 * thread can distinguish where its called from. We 1120 * can remove the SQS_PROC flag here and turn off the 1121 * polling so that it wouldn't matter who gets the 1122 * processing but we get better performance this way 1123 * and save the cost of turn polling off and possibly 1124 * on again as soon as we start draining again. 1125 * 1126 * We can't remove the SQS_PROC flag without turning 1127 * polling off until we can guarantee that control 1128 * will return to squeue_drain immediately. 1129 */ 1130 sqp->sq_state |= SQS_PROC_HELD; 1131 sqp->sq_state &= ~SQS_GET_PKTS; 1132 cv_signal(&sqp->sq_worker_cv); 1133 } else if (sqp->sq_first == NULL && 1134 !(sqp->sq_state & SQS_WORKER)) { 1135 /* 1136 * Nothing queued and worker thread not running. 1137 * Since we hold the proc, no other thread is 1138 * processing the squeue. This means that there 1139 * is no work to be done and nothing is queued 1140 * in squeue or in NIC. Turn polling off and go 1141 * back to interrupt mode. 
1142 */ 1143 sqp->sq_state &= ~(SQS_PROC|SQS_GET_PKTS); 1144 /* LINTED: constant in conditional context */ 1145 SQS_POLLING_OFF(sqp, B_TRUE, sq_rx_ring); 1146 1147 /* 1148 * If there is a pending control operation 1149 * wake up the worker, since it is currently 1150 * not running. 1151 */ 1152 if (sqp->sq_state & SQS_WORKER_THR_CONTROL) 1153 cv_signal(&sqp->sq_worker_cv); 1154 } else { 1155 /* 1156 * Worker thread is already running. We don't need 1157 * to do anything. Indicate that poll thread is done. 1158 */ 1159 sqp->sq_state &= ~SQS_GET_PKTS; 1160 } 1161 if (sqp->sq_state & SQS_POLL_THR_CONTROL) { 1162 /* 1163 * Act on control requests to quiesce, cleanup or 1164 * restart an squeue 1165 */ 1166 squeue_poll_thr_control(sqp); 1167 } 1168 } 1169 } 1170 1171 /* 1172 * The squeue worker thread acts on any control requests to quiesce, cleanup 1173 * or restart an ill_rx_ring_t by calling this function. The worker thread 1174 * synchronizes with the squeue poll thread to complete the request and finally 1175 * wakes up the requestor when the request is completed. 1176 */ 1177 static void 1178 squeue_worker_thr_control(squeue_t *sqp) 1179 { 1180 ill_t *ill; 1181 ill_rx_ring_t *rx_ring; 1182 1183 ASSERT(MUTEX_HELD(&sqp->sq_lock)); 1184 1185 if (sqp->sq_state & SQS_POLL_RESTART) { 1186 /* Restart implies a previous quiesce. */ 1187 ASSERT((sqp->sq_state & (SQS_PROC_HELD | 1188 SQS_POLL_QUIESCE_DONE | SQS_PROC | SQS_WORKER)) == 1189 (SQS_POLL_QUIESCE_DONE | SQS_PROC | SQS_WORKER)); 1190 /* 1191 * Request the squeue poll thread to restart and wait till 1192 * it actually restarts. 1193 */ 1194 sqp->sq_state &= ~SQS_POLL_QUIESCE_DONE; 1195 sqp->sq_state |= SQS_POLL_THR_RESTART; 1196 cv_signal(&sqp->sq_poll_cv); 1197 while (sqp->sq_state & SQS_POLL_THR_QUIESCED) 1198 cv_wait(&sqp->sq_worker_cv, &sqp->sq_lock); 1199 sqp->sq_state &= ~(SQS_POLL_RESTART | SQS_PROC | 1200 SQS_WORKER); 1201 /* 1202 * Signal any waiter that is waiting for the restart 1203 * to complete 1204 */ 1205 sqp->sq_state |= SQS_POLL_RESTART_DONE; 1206 cv_signal(&sqp->sq_ctrlop_done_cv); 1207 return; 1208 } 1209 1210 if (sqp->sq_state & SQS_PROC_HELD) { 1211 /* The squeue poll thread handed control to us */ 1212 ASSERT(sqp->sq_state & SQS_PROC); 1213 } 1214 1215 /* 1216 * Prevent any other thread from processing the squeue 1217 * until we finish the control actions by setting SQS_PROC. 1218 * But allow ourself to reenter by setting SQS_WORKER 1219 */ 1220 sqp->sq_state |= (SQS_PROC | SQS_WORKER); 1221 1222 /* Signal the squeue poll thread and wait for it to quiesce itself */ 1223 if (!(sqp->sq_state & SQS_POLL_THR_QUIESCED)) { 1224 sqp->sq_state |= SQS_POLL_THR_QUIESCE; 1225 cv_signal(&sqp->sq_poll_cv); 1226 while (!(sqp->sq_state & SQS_POLL_THR_QUIESCED)) 1227 cv_wait(&sqp->sq_worker_cv, &sqp->sq_lock); 1228 } 1229 1230 rx_ring = sqp->sq_rx_ring; 1231 ill = rx_ring->rr_ill; 1232 /* 1233 * The lock hierarchy is as follows. 1234 * cpu_lock -> ill_lock -> sqset_lock -> sq_lock 1235 */ 1236 mutex_exit(&sqp->sq_lock); 1237 mutex_enter(&ill->ill_lock); 1238 mutex_enter(&sqp->sq_lock); 1239 1240 SQS_POLLING_OFF(sqp, (sqp->sq_state & SQS_POLL_CAPAB) != 0, 1241 sqp->sq_rx_ring); 1242 sqp->sq_state &= ~(SQS_POLL_CAPAB | SQS_GET_PKTS | SQS_PROC_HELD); 1243 if (sqp->sq_state & SQS_POLL_CLEANUP) { 1244 /* 1245 * Disassociate this squeue from its ill_rx_ring_t. 1246 * The rr_sqp, sq_rx_ring fields are protected by the 1247 * corresponding squeue, ill_lock* and sq_lock. 
Holding any 1248 * of them will ensure that the ring to squeue mapping does 1249 * not change. 1250 */ 1251 ASSERT(!(sqp->sq_state & SQS_DEFAULT)); 1252 1253 sqp->sq_rx_ring = NULL; 1254 rx_ring->rr_sqp = NULL; 1255 1256 sqp->sq_state &= ~(SQS_POLL_CLEANUP | SQS_POLL_THR_QUIESCED | 1257 SQS_POLL_QUIESCE_DONE); 1258 sqp->sq_ill = NULL; 1259 1260 rx_ring->rr_rx_handle = NULL; 1261 rx_ring->rr_intr_handle = NULL; 1262 rx_ring->rr_intr_enable = NULL; 1263 rx_ring->rr_intr_disable = NULL; 1264 sqp->sq_state |= SQS_POLL_CLEANUP_DONE; 1265 } else { 1266 sqp->sq_state &= ~SQS_POLL_QUIESCE; 1267 sqp->sq_state |= SQS_POLL_QUIESCE_DONE; 1268 } 1269 /* 1270 * Signal any waiter that is waiting for the quiesce or cleanup 1271 * to complete and also wait for it to actually see and reset the 1272 * SQS_POLL_CLEANUP_DONE. 1273 */ 1274 cv_signal(&sqp->sq_ctrlop_done_cv); 1275 mutex_exit(&ill->ill_lock); 1276 if (sqp->sq_state & SQS_POLL_CLEANUP_DONE) { 1277 cv_wait(&sqp->sq_worker_cv, &sqp->sq_lock); 1278 sqp->sq_state &= ~(SQS_PROC | SQS_WORKER); 1279 } 1280 } 1281 1282 static void 1283 squeue_worker(squeue_t *sqp) 1284 { 1285 kmutex_t *lock = &sqp->sq_lock; 1286 kcondvar_t *async = &sqp->sq_worker_cv; 1287 callb_cpr_t cprinfo; 1288 hrtime_t now; 1289 1290 CALLB_CPR_INIT(&cprinfo, lock, callb_generic_cpr, "sq_worker"); 1291 mutex_enter(lock); 1292 1293 for (;;) { 1294 for (;;) { 1295 /* 1296 * If the poll thread has handed control to us 1297 * we need to break out of the wait. 1298 */ 1299 if (sqp->sq_state & SQS_PROC_HELD) 1300 break; 1301 1302 /* 1303 * If the squeue is not being processed and we either 1304 * have messages to drain or some thread has signaled 1305 * some control activity we need to break 1306 */ 1307 if (!(sqp->sq_state & SQS_PROC) && 1308 ((sqp->sq_state & SQS_WORKER_THR_CONTROL) || 1309 (sqp->sq_first != NULL))) 1310 break; 1311 1312 /* 1313 * If we have started some control action, then check 1314 * for the SQS_WORKER flag (since we don't 1315 * release the squeue) to make sure we own the squeue 1316 * and break out 1317 */ 1318 if ((sqp->sq_state & SQS_WORKER_THR_CONTROL) && 1319 (sqp->sq_state & SQS_WORKER)) 1320 break; 1321 1322 CALLB_CPR_SAFE_BEGIN(&cprinfo); 1323 cv_wait(async, lock); 1324 CALLB_CPR_SAFE_END(&cprinfo, lock); 1325 } 1326 if (sqp->sq_state & SQS_WORKER_THR_CONTROL) { 1327 squeue_worker_thr_control(sqp); 1328 continue; 1329 } 1330 ASSERT(!(sqp->sq_state & (SQS_POLL_THR_QUIESCED | 1331 SQS_POLL_CLEANUP_DONE | SQS_POLL_QUIESCE_DONE | 1332 SQS_WORKER_THR_CONTROL | SQS_POLL_THR_CONTROL))); 1333 1334 if (sqp->sq_state & SQS_PROC_HELD) 1335 sqp->sq_state &= ~SQS_PROC_HELD; 1336 1337 now = gethrtime(); 1338 sqp->sq_run = curthread; 1339 sqp->sq_drain(sqp, SQS_WORKER, now + squeue_drain_ns); 1340 sqp->sq_run = NULL; 1341 } 1342 } 1343 1344 uintptr_t * 1345 squeue_getprivate(squeue_t *sqp, sqprivate_t p) 1346 { 1347 ASSERT(p < SQPRIVATE_MAX); 1348 1349 return (&sqp->sq_private[p]); 1350 } 1351 1352 /* ARGSUSED */ 1353 void 1354 squeue_wakeup_conn(void *arg, mblk_t *mp, void *arg2, ip_recv_attr_t *dummy) 1355 { 1356 conn_t *connp = (conn_t *)arg; 1357 squeue_t *sqp = connp->conn_sqp; 1358 1359 /* 1360 * Mark the squeue as paused before waking up the thread stuck 1361 * in squeue_synch_enter(). 1362 */ 1363 mutex_enter(&sqp->sq_lock); 1364 sqp->sq_state |= SQS_PAUSE; 1365 1366 /* 1367 * Notify the thread that it's OK to proceed; that is done by 1368 * clearing the MSGWAITSYNC flag. The synch thread will free the mblk. 
1369 */ 1370 ASSERT(mp->b_flag & MSGWAITSYNC); 1371 mp->b_flag &= ~MSGWAITSYNC; 1372 cv_broadcast(&connp->conn_sq_cv); 1373 1374 /* 1375 * We are doing something on behalf of another thread, so we have to 1376 * pause and wait until it finishes. 1377 */ 1378 while (sqp->sq_state & SQS_PAUSE) { 1379 cv_wait(&sqp->sq_synch_cv, &sqp->sq_lock); 1380 } 1381 mutex_exit(&sqp->sq_lock); 1382 } 1383 1384 int 1385 squeue_synch_enter(conn_t *connp, mblk_t *use_mp) 1386 { 1387 squeue_t *sqp; 1388 1389 again: 1390 sqp = connp->conn_sqp; 1391 1392 mutex_enter(&sqp->sq_lock); 1393 if (sqp->sq_first == NULL && !(sqp->sq_state & SQS_PROC)) { 1394 /* 1395 * We are OK to proceed if the squeue is empty, and 1396 * no one owns the squeue. 1397 * 1398 * The caller won't own the squeue as this is called from the 1399 * application. 1400 */ 1401 ASSERT(sqp->sq_run == NULL); 1402 1403 sqp->sq_state |= SQS_PROC; 1404 sqp->sq_run = curthread; 1405 mutex_exit(&sqp->sq_lock); 1406 1407 /* 1408 * Handle squeue switching. The conn's squeue can only change 1409 * while there is a thread in the squeue, which is why we do 1410 * the check after entering the squeue. If it has changed, exit 1411 * this squeue and redo everything with the new sqeueue. 1412 */ 1413 if (sqp != connp->conn_sqp) { 1414 mutex_enter(&sqp->sq_lock); 1415 sqp->sq_state &= ~SQS_PROC; 1416 sqp->sq_run = NULL; 1417 mutex_exit(&sqp->sq_lock); 1418 goto again; 1419 } 1420 #if SQUEUE_DEBUG 1421 sqp->sq_curmp = NULL; 1422 sqp->sq_curproc = NULL; 1423 sqp->sq_connp = connp; 1424 #endif 1425 connp->conn_on_sqp = B_TRUE; 1426 return (0); 1427 } else { 1428 mblk_t *mp; 1429 1430 mp = (use_mp == NULL) ? allocb(0, BPRI_MED) : use_mp; 1431 if (mp == NULL) { 1432 mutex_exit(&sqp->sq_lock); 1433 return (ENOMEM); 1434 } 1435 1436 /* 1437 * We mark the mblk as awaiting synchronous squeue access 1438 * by setting the MSGWAITSYNC flag. Once squeue_wakeup_conn 1439 * fires, MSGWAITSYNC is cleared, at which point we know we 1440 * have exclusive access. 1441 */ 1442 mp->b_flag |= MSGWAITSYNC; 1443 1444 CONN_INC_REF(connp); 1445 SET_SQUEUE(mp, squeue_wakeup_conn, connp); 1446 ENQUEUE_CHAIN(sqp, mp, mp, 1); 1447 1448 ASSERT(sqp->sq_run != curthread); 1449 1450 /* Wait until the enqueued mblk get processed. */ 1451 while (mp->b_flag & MSGWAITSYNC) 1452 cv_wait(&connp->conn_sq_cv, &sqp->sq_lock); 1453 mutex_exit(&sqp->sq_lock); 1454 1455 if (use_mp == NULL) 1456 freeb(mp); 1457 1458 return (0); 1459 } 1460 } 1461 1462 void 1463 squeue_synch_exit(conn_t *connp) 1464 { 1465 squeue_t *sqp = connp->conn_sqp; 1466 1467 mutex_enter(&sqp->sq_lock); 1468 if (sqp->sq_run == curthread) { 1469 ASSERT(sqp->sq_state & SQS_PROC); 1470 1471 sqp->sq_state &= ~SQS_PROC; 1472 sqp->sq_run = NULL; 1473 connp->conn_on_sqp = B_FALSE; 1474 1475 if (sqp->sq_first == NULL) { 1476 mutex_exit(&sqp->sq_lock); 1477 } else { 1478 /* 1479 * If this was a normal thread, then it would 1480 * (most likely) continue processing the pending 1481 * requests. Since the just completed operation 1482 * was executed synchronously, the thread should 1483 * not be delayed. To compensate, wake up the 1484 * worker thread right away when there are outstanding 1485 * requests. 1486 */ 1487 sqp->sq_awaken = ddi_get_lbolt(); 1488 cv_signal(&sqp->sq_worker_cv); 1489 mutex_exit(&sqp->sq_lock); 1490 } 1491 } else { 1492 /* 1493 * The caller doesn't own the squeue, clear the SQS_PAUSE flag, 1494 * and wake up the squeue owner, such that owner can continue 1495 * processing. 
1496 */ 1497 ASSERT(sqp->sq_state & SQS_PAUSE); 1498 sqp->sq_state &= ~SQS_PAUSE; 1499 1500 /* There should be only one thread blocking on sq_synch_cv. */ 1501 cv_signal(&sqp->sq_synch_cv); 1502 mutex_exit(&sqp->sq_lock); 1503 } 1504 } 1505
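/*
 * A minimal sketch of the synchronous access pattern built on
 * squeue_synch_enter() and squeue_synch_exit() above. This block is
 * illustrative only and is never compiled (note the #if 0);
 * my_conn_getinfo() and my_conn_state() are hypothetical. The point is the
 * pattern: enter blocks until the caller has exclusive access to the
 * conn's squeue-protected state, and exit releases it (waking the worker
 * if packets queued up in the meantime).
 */
#if 0
static int
my_conn_getinfo(conn_t *connp, uint_t *infop)
{
	int error;

	error = squeue_synch_enter(connp, NULL);
	if (error != 0)
		return (error);		/* ENOMEM */

	/* The squeue-protected state of connp can be read safely here. */
	*infop = my_conn_state(connp);

	squeue_synch_exit(connp);
	return (0);
}
#endif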