/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License (the "License").
 * You may not use this file except in compliance with the License.
 *
 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
 * or http://www.opensolaris.org/os/licensing.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
 * If applicable, add the following below this CDDL HEADER, with the
 * fields enclosed by brackets "[]" replaced with your own identifying
 * information: Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 */
/*
 * Copyright 2009 Sun Microsystems, Inc.  All rights reserved.
 * Use is subject to license terms.
 */

/*
 * Squeues: General purpose serialization mechanism
 * ------------------------------------------------
 *
 * Background:
 * -----------
 *
 * This is a general purpose, high-performance serialization mechanism
 * currently used by TCP/IP. It is implemented by means of a per-CPU queue,
 * a worker thread and a polling thread, which are bound to the CPU
 * associated with the squeue. The squeue is strictly FIFO for both the read
 * and the write side, and only one thread can process it at any given time.
 * The design goal of the squeue was to offer a very high degree of
 * parallelization (on a per H/W execution pipeline basis) with at
 * most one queuing.
 *
 * Modules needing protection typically call the SQUEUE_ENTER_ONE() or
 * SQUEUE_ENTER() macro as soon as a thread enters the module
 * from either direction. For each packet, the processing function
 * and argument are stored in the mblk itself. When the packet is ready
 * to be processed, the squeue retrieves the stored function and calls
 * it with the supplied argument and the pointer to the packet itself.
 * The called function can assume that no other thread is processing
 * the squeue while it is executing. (An illustrative usage sketch
 * follows this block comment.)
 *
 * Squeue/connection binding:
 * --------------------------
 *
 * TCP/IP uses an IP classifier in conjunction with squeues: specific
 * connections are assigned to a specific squeue (based on various policies)
 * at connection creation time. Once assigned, the connection-to-squeue
 * mapping is never changed, and all future packets for that
 * connection are processed on that squeue. The connection ("conn") to
 * squeue mapping is stored in the "conn_t" member "conn_sqp".
 *
 * Since the processing of a connection cuts across multiple layers
 * but still allows packets for different connections to be processed on
 * other CPUs/squeues, squeues are also termed a "Vertical Perimeter" or
 * "Per Connection Vertical Perimeter".
 *
 * Processing Model:
 * -----------------
 *
 * An squeue does not necessarily process packets with its own worker thread.
 * Callers can choose to just queue the packet, to process
 * their packet if nothing is queued, or to drain and process. The first two
 * modes are typically employed when the packet was generated while
 * already doing the processing behind the squeue, and the last mode (drain
 * and process) is typically employed when the thread is entering the squeue
 * for the first time. The squeue still imposes a finite time limit
 * for which an external thread can do processing, after which it switches
 * processing to its own worker thread.
 *
 * Once created, squeues are never deleted. Hence squeue pointers are
 * always valid. This means that functions outside the squeue can still
 * refer safely to conn_sqp, and there is no need for ref counts.
 *
 * Only a thread executing in the squeue can change the squeue of the
 * connection. It does so by calling a squeue framework function to do this.
 * After changing the squeue, the thread must leave the squeue. It must not
 * continue to execute any code that needs squeue protection.
 *
 * The squeue framework, after entering the squeue, checks if the current
 * squeue matches the conn_sqp. If the check fails, the packet is delivered
 * to the right squeue.
 *
 * Polling Model:
 * --------------
 *
 * An squeue can control the rate of packet arrival into itself from the
 * NIC or a specific Rx ring within a NIC. As part of capability negotiation
 * between IP and the MAC layer, squeues are created for each TCP soft ring
 * (or TCP Rx ring - to be implemented in future). As part of this
 * negotiation, squeues get a cookie for the underlying soft ring or Rx
 * ring, a function to turn off incoming packets and a function to call
 * to poll for packets. This helps schedule the receive side packet
 * processing so that queue backlog doesn't build up and packet processing
 * doesn't keep getting disturbed by high priority interrupts. As part
 * of this mode, as soon as a backlog starts building, the squeue turns off
 * the interrupts and switches to poll mode. In poll mode, when the poll
 * thread goes down to retrieve packets, it retrieves them in the form of
 * a chain, which improves performance even more. As the squeue/softring
 * system gets more packets, it gets more efficient by switching to
 * polling more often and dealing with larger packet chains.
 */
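
/*
 * Illustrative usage sketch (hypothetical caller, not part of this file):
 * a module that holds a conn_t and a packet typically hands the packet to
 * the conn's squeue with SQUEUE_ENTER_ONE(), naming the function that the
 * squeue should invoke once it owns the perimeter. The handler name below
 * is made up for illustration; the macro arguments mirror the calls made
 * elsewhere in this file.
 *
 *	void my_conn_input(void *connp, mblk_t *mp, void *sqp,
 *	    ip_recv_attr_t *ira);	(runs with the squeue held)
 *
 *	SQUEUE_ENTER_ONE(connp->conn_sqp, mp, my_conn_input, connp, ira,
 *	    SQ_FILL, tag);
 *
 * With SQ_PROCESS or SQ_NODRAIN the entering thread may process the packet
 * inline (see squeue_enter() below); with SQ_FILL it only queues the packet
 * and lets the worker thread drain it later.
 */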

#include <sys/types.h>
#include <sys/cmn_err.h>
#include <sys/debug.h>
#include <sys/kmem.h>
#include <sys/cpuvar.h>
#include <sys/condvar_impl.h>
#include <sys/systm.h>
#include <sys/callb.h>
#include <sys/sdt.h>
#include <sys/ddi.h>
#include <sys/sunddi.h>

#include <inet/ipclassifier.h>
#include <inet/udp_impl.h>

#include <sys/squeue_impl.h>

static void squeue_fire(void *);
static void squeue_drain(squeue_t *, uint_t, hrtime_t);
static void squeue_worker(squeue_t *sqp);
static void squeue_polling_thread(squeue_t *sqp);

kmem_cache_t *squeue_cache;

#define	SQUEUE_MSEC_TO_NSEC	1000000

int squeue_drain_ms = 20;
int squeue_workerwait_ms = 0;

/* The values above converted to ticks or nanoseconds */
static int squeue_drain_ns = 0;
static int squeue_workerwait_tick = 0;

#define	MAX_BYTES_TO_PICKUP	150000

#define	ENQUEUE_CHAIN(sqp, mp, tail, cnt) {			\
	/*							\
	 * Enqueue our mblk chain.				\
	 */							\
	ASSERT(MUTEX_HELD(&(sqp)->sq_lock));			\
								\
	if ((sqp)->sq_last != NULL)				\
		(sqp)->sq_last->b_next = (mp);			\
	else							\
		(sqp)->sq_first = (mp);				\
	(sqp)->sq_last = (tail);				\
	(sqp)->sq_count += (cnt);				\
	ASSERT((sqp)->sq_count > 0);				\
	DTRACE_PROBE4(squeue__enqueuechain, squeue_t *, sqp,	\
	    mblk_t *, mp, mblk_t *, tail, int, cnt);		\
}
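
/*
 * Illustrative sketch: ENQUEUE_CHAIN() is always invoked with sq_lock held
 * and with a b_next-linked chain whose head is mp, whose last mblk is tail
 * and whose length is cnt, e.g.
 *
 *	mutex_enter(&sqp->sq_lock);
 *	ENQUEUE_CHAIN(sqp, mp, tail, cnt);
 *	mutex_exit(&sqp->sq_lock);
 *
 * Afterwards sq_first/sq_last bracket the combined list and sq_count holds
 * the new total; squeue_drain() later consumes the list in FIFO order.
 */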

/*
 * Blank the receive ring (in this case it is the soft ring). When
 * blanked, the soft ring will not send any more packets up.
 * Blanking may not succeed when there is a CPU already in the soft
 * ring sending packets up. In that case, SQS_POLLING will not be
 * set.
 */
#define	SQS_POLLING_ON(sqp, sq_poll_capable, rx_ring) {	\
	ASSERT(MUTEX_HELD(&(sqp)->sq_lock));	\
	if (sq_poll_capable) {	\
		ASSERT(rx_ring != NULL);	\
		ASSERT(sqp->sq_state & SQS_POLL_CAPAB);	\
		if (!(sqp->sq_state & SQS_POLLING)) {	\
			if (rx_ring->rr_intr_disable(rx_ring->rr_intr_handle)) \
				sqp->sq_state |= SQS_POLLING;	\
		}	\
	}	\
}

#define	SQS_POLLING_OFF(sqp, sq_poll_capable, rx_ring) {	\
	ASSERT(MUTEX_HELD(&(sqp)->sq_lock));	\
	if (sq_poll_capable) {	\
		ASSERT(rx_ring != NULL);	\
		ASSERT(sqp->sq_state & SQS_POLL_CAPAB);	\
		if (sqp->sq_state & SQS_POLLING) {	\
			sqp->sq_state &= ~SQS_POLLING;	\
			rx_ring->rr_intr_enable(rx_ring->rr_intr_handle); \
		}	\
	}	\
}

/* Wakeup poll thread only if SQS_POLLING is set */
#define	SQS_POLL_RING(sqp) {	\
	ASSERT(MUTEX_HELD(&(sqp)->sq_lock));	\
	if (sqp->sq_state & SQS_POLLING) {	\
		ASSERT(sqp->sq_state & SQS_POLL_CAPAB);	\
		if (!(sqp->sq_state & SQS_GET_PKTS)) {	\
			sqp->sq_state |= SQS_GET_PKTS;	\
			cv_signal(&sqp->sq_poll_cv);	\
		}	\
	}	\
}

#ifdef DEBUG
#define	SQUEUE_DBG_SET(sqp, mp, proc, connp, tag) {	\
	(sqp)->sq_curmp = (mp);	\
	(sqp)->sq_curproc = (proc);	\
	(sqp)->sq_connp = (connp);	\
	(mp)->b_tag = (sqp)->sq_tag = (tag);	\
}

#define	SQUEUE_DBG_CLEAR(sqp) {	\
	(sqp)->sq_curmp = NULL;	\
	(sqp)->sq_curproc = NULL;	\
	(sqp)->sq_connp = NULL;	\
}
#else
#define	SQUEUE_DBG_SET(sqp, mp, proc, connp, tag)
#define	SQUEUE_DBG_CLEAR(sqp)
#endif

void
squeue_init(void)
{
	squeue_cache = kmem_cache_create("squeue_cache",
	    sizeof (squeue_t), 64, NULL, NULL, NULL, NULL, NULL, 0);

	squeue_drain_ns = squeue_drain_ms * SQUEUE_MSEC_TO_NSEC;
	squeue_workerwait_tick = MSEC_TO_TICK_ROUNDUP(squeue_workerwait_ms);
}

/* ARGSUSED */
squeue_t *
squeue_create(clock_t wait, pri_t pri)
{
	squeue_t *sqp = kmem_cache_alloc(squeue_cache, KM_SLEEP);

	bzero(sqp, sizeof (squeue_t));
	sqp->sq_bind = PBIND_NONE;
	sqp->sq_priority = pri;
	sqp->sq_wait = MSEC_TO_TICK(wait);
	sqp->sq_worker = thread_create(NULL, 0, squeue_worker,
	    sqp, 0, &p0, TS_RUN, pri);

	sqp->sq_poll_thr = thread_create(NULL, 0, squeue_polling_thread,
	    sqp, 0, &p0, TS_RUN, pri);

	sqp->sq_enter = squeue_enter;
	sqp->sq_drain = squeue_drain;

	return (sqp);
}
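
/*
 * Illustrative sketch (hypothetical caller): creating an squeue and binding
 * its worker thread to a CPU. squeue_bind() asserts that cpu_lock is held,
 * so a caller would wrap the bind in mutex_enter(&cpu_lock)/mutex_exit().
 * The wait time, priority and CPU id below are placeholders.
 *
 *	squeue_t *sqp = squeue_create(squeue_workerwait_ms, maxclsyspri);
 *
 *	mutex_enter(&cpu_lock);
 *	squeue_bind(sqp, cpu_id);	(or PBIND_NONE to rebind to sq_bind)
 *	mutex_exit(&cpu_lock);
 */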

/*
 * Bind the squeue worker thread to the specified CPU, given by CPU id.
 * If the CPU id value is -1, bind the worker thread to the value
 * specified in the sq_bind field. If a thread is already bound to a
 * different CPU, unbind it from the old CPU and bind it to the new one.
 */
void
squeue_bind(squeue_t *sqp, processorid_t bind)
{
	mutex_enter(&sqp->sq_lock);
	ASSERT(sqp->sq_bind != PBIND_NONE || bind != PBIND_NONE);
	ASSERT(MUTEX_HELD(&cpu_lock));

	if (sqp->sq_state & SQS_BOUND) {
		if (sqp->sq_bind == bind) {
			mutex_exit(&sqp->sq_lock);
			return;
		}
		thread_affinity_clear(sqp->sq_worker);
	} else {
		sqp->sq_state |= SQS_BOUND;
	}

	if (bind != PBIND_NONE)
		sqp->sq_bind = bind;

	thread_affinity_set(sqp->sq_worker, sqp->sq_bind);
	mutex_exit(&sqp->sq_lock);
}

void
squeue_unbind(squeue_t *sqp)
{
	mutex_enter(&sqp->sq_lock);
	if (!(sqp->sq_state & SQS_BOUND)) {
		mutex_exit(&sqp->sq_lock);
		return;
	}

	sqp->sq_state &= ~SQS_BOUND;
	thread_affinity_clear(sqp->sq_worker);
	mutex_exit(&sqp->sq_lock);
}

void
squeue_worker_wakeup(squeue_t *sqp)
{
	timeout_id_t tid = (sqp)->sq_tid;

	ASSERT(MUTEX_HELD(&(sqp)->sq_lock));

	if (sqp->sq_wait == 0) {
		ASSERT(tid == 0);
		ASSERT(!(sqp->sq_state & SQS_TMO_PROG));
		sqp->sq_awaken = ddi_get_lbolt();
		cv_signal(&sqp->sq_worker_cv);
		mutex_exit(&sqp->sq_lock);
		return;
	}

	/*
	 * The queue isn't being processed, so take
	 * any post-enqueue actions needed before leaving.
	 */
	if (tid != 0) {
		/*
		 * Waiting for an enter() to process mblk(s).
		 */
		clock_t now = ddi_get_lbolt();
		clock_t waited = now - sqp->sq_awaken;

		if (TICK_TO_MSEC(waited) >= sqp->sq_wait) {
			/*
			 * Time's up and we have a worker thread
			 * waiting for work, so schedule it.
			 */
			sqp->sq_tid = 0;
			sqp->sq_awaken = now;
			cv_signal(&sqp->sq_worker_cv);
			mutex_exit(&sqp->sq_lock);
			(void) untimeout(tid);
			return;
		}
		mutex_exit(&sqp->sq_lock);
		return;
	} else if (sqp->sq_state & SQS_TMO_PROG) {
		mutex_exit(&sqp->sq_lock);
		return;
	} else {
		clock_t	wait = sqp->sq_wait;
		/*
		 * Wait up to sqp->sq_wait ms for an
		 * enter() to process this queue. We
		 * don't want to contend on timeout locks
		 * with sq_lock held for performance reasons,
		 * so drop the sq_lock before calling timeout(),
		 * but we need to check if the timeout is still
		 * required after reacquiring the sq_lock. Once
		 * the sq_lock is dropped, someone else could
		 * have processed the packet or the timeout could
		 * have already fired.
		 */
		sqp->sq_state |= SQS_TMO_PROG;
		mutex_exit(&sqp->sq_lock);
		tid = timeout(squeue_fire, sqp, wait);
		mutex_enter(&sqp->sq_lock);
		/* Check again if we still need the timeout */
		if (((sqp->sq_state & (SQS_PROC|SQS_TMO_PROG)) ==
		    SQS_TMO_PROG) && (sqp->sq_tid == 0) &&
		    (sqp->sq_first != NULL)) {
			sqp->sq_state &= ~SQS_TMO_PROG;
			sqp->sq_tid = tid;
			mutex_exit(&sqp->sq_lock);
			return;
		} else {
			if (sqp->sq_state & SQS_TMO_PROG) {
				sqp->sq_state &= ~SQS_TMO_PROG;
				mutex_exit(&sqp->sq_lock);
				(void) untimeout(tid);
			} else {
				/*
				 * The timer fired before we could
				 * reacquire the sq_lock. squeue_fire
				 * removes the SQS_TMO_PROG flag
				 * and we don't need to do anything
				 * else.
				 */
				mutex_exit(&sqp->sq_lock);
			}
		}
	}

	ASSERT(MUTEX_NOT_HELD(&sqp->sq_lock));
}

/*
 * squeue_enter() - enter squeue sqp with mblk mp (which can be
 * a chain), while tail points to the end and cnt is the number of
 * mblks in the chain.
 *
 * For a chain of a single packet (i.e. mp == tail), go through the
 * fast path if no one is processing the squeue and nothing is queued.
 *
 * The proc and arg for each mblk are already stored in the mblk in
 * the appropriate places.
 *
 * The process_flag specifies if we are allowed to process the mblk
 * and drain in the entering thread context. If process_flag is
 * SQ_FILL, then we just queue the mblk and return (after signaling
 * the worker thread if no one else is processing the squeue).
 *
 * The ira argument can be used when the count is one.
 * For a chain the caller needs to prepend any needed mblks from
 * ip_recv_attr_to_mblk().
 */
/* ARGSUSED */
void
squeue_enter(squeue_t *sqp, mblk_t *mp, mblk_t *tail, uint32_t cnt,
    ip_recv_attr_t *ira, int process_flag, uint8_t tag)
{
	conn_t		*connp;
	sqproc_t	proc;
	hrtime_t	now;

	ASSERT(sqp != NULL);
	ASSERT(mp != NULL);
	ASSERT(tail != NULL);
	ASSERT(cnt > 0);
	ASSERT(MUTEX_NOT_HELD(&sqp->sq_lock));
	ASSERT(ira == NULL || cnt == 1);

	mutex_enter(&sqp->sq_lock);

	/*
	 * Try to process the packet if the SQ_FILL flag is not set and
	 * we are allowed to process the squeue. SQ_NODRAIN is
	 * ignored if the packet chain consists of more than 1 packet.
	 */
	if (!(sqp->sq_state & SQS_PROC) && ((process_flag == SQ_PROCESS) ||
	    (process_flag == SQ_NODRAIN && sqp->sq_first == NULL))) {
		/*
		 * See if anything is already queued. If we are the
		 * first packet, do inline processing, else queue the
		 * packet and do the drain.
		 */
		if (sqp->sq_first == NULL && cnt == 1) {
			/*
			 * Fast-path, ok to process and nothing queued.
			 */
			sqp->sq_state |= (SQS_PROC|SQS_FAST);
			sqp->sq_run = curthread;
			mutex_exit(&sqp->sq_lock);

			/*
			 * We are a chain of 1 packet so
			 * go through this fast path.
			 */
			ASSERT(mp->b_prev != NULL);
			ASSERT(mp->b_queue != NULL);
			connp = (conn_t *)mp->b_prev;
			mp->b_prev = NULL;
			proc = (sqproc_t)mp->b_queue;
			mp->b_queue = NULL;
			ASSERT(proc != NULL && connp != NULL);
			ASSERT(mp->b_next == NULL);

			/*
			 * Handle squeue switching. More details in the
			 * block comment at the top of the file.
			 */
			if (connp->conn_sqp == sqp) {
				SQUEUE_DBG_SET(sqp, mp, proc, connp,
				    tag);
				connp->conn_on_sqp = B_TRUE;
				DTRACE_PROBE3(squeue__proc__start, squeue_t *,
				    sqp, mblk_t *, mp, conn_t *, connp);
				(*proc)(connp, mp, sqp, ira);
				DTRACE_PROBE2(squeue__proc__end, squeue_t *,
				    sqp, conn_t *, connp);
				connp->conn_on_sqp = B_FALSE;
				SQUEUE_DBG_CLEAR(sqp);
				CONN_DEC_REF(connp);
			} else {
				SQUEUE_ENTER_ONE(connp->conn_sqp, mp, proc,
				    connp, ira, SQ_FILL, SQTAG_SQUEUE_CHANGE);
			}
			ASSERT(MUTEX_NOT_HELD(&sqp->sq_lock));
			mutex_enter(&sqp->sq_lock);
			sqp->sq_state &= ~(SQS_PROC|SQS_FAST);
			sqp->sq_run = NULL;
			if (sqp->sq_first == NULL ||
			    process_flag == SQ_NODRAIN) {
				if (sqp->sq_first != NULL) {
					squeue_worker_wakeup(sqp);
					return;
				}
				/*
				 * We processed our packet inline and nothing
				 * new has arrived. We are done. In case any
				 * control actions are pending, wake up the
				 * worker.
				 */
				if (sqp->sq_state & SQS_WORKER_THR_CONTROL)
					cv_signal(&sqp->sq_worker_cv);
				mutex_exit(&sqp->sq_lock);
				return;
			}
		} else {
			if (ira != NULL) {
				mblk_t	*attrmp;

				ASSERT(cnt == 1);
				attrmp = ip_recv_attr_to_mblk(ira);
				if (attrmp == NULL) {
					mutex_exit(&sqp->sq_lock);
					ip_drop_input("squeue: "
					    "ip_recv_attr_to_mblk",
					    mp, NULL);
					/* Caller already set b_prev/b_next */
					mp->b_prev = mp->b_next = NULL;
					freemsg(mp);
					return;
				}
				ASSERT(attrmp->b_cont == NULL);
				attrmp->b_cont = mp;
				/* Move connp and func to the new mblk */
				attrmp->b_queue = mp->b_queue;
				mp->b_queue = NULL;
				attrmp->b_prev = mp->b_prev;
				mp->b_prev = NULL;

				ASSERT(mp == tail);
				tail = mp = attrmp;
			}

			ENQUEUE_CHAIN(sqp, mp, tail, cnt);
#ifdef DEBUG
			mp->b_tag = tag;
#endif
		}
		/*
		 * We are here because either we couldn't do inline
		 * processing (because something was already queued),
		 * or we had a chain of more than one packet,
		 * or something else arrived after we were done with
		 * inline processing.
		 */
		ASSERT(MUTEX_HELD(&sqp->sq_lock));
		ASSERT(sqp->sq_first != NULL);
		now = gethrtime();
		sqp->sq_drain(sqp, SQS_ENTER, now + squeue_drain_ns);

		/*
		 * If we didn't do a complete drain, the worker
		 * thread was already signalled by squeue_drain.
		 * In case any control actions are pending, wake
		 * up the worker.
		 */
		sqp->sq_run = NULL;
		if (sqp->sq_state & SQS_WORKER_THR_CONTROL)
			cv_signal(&sqp->sq_worker_cv);
		mutex_exit(&sqp->sq_lock);
		return;
	} else {
		/*
		 * We let a thread processing a squeue reenter only
		 * once. This helps the case of an incoming connection
		 * where a SYN-ACK-ACK that triggers the conn_ind
		 * doesn't have to queue the packet if the listener and
		 * eager are on the same squeue. It also helps the
		 * loopback connection where the two ends are bound
		 * to the same squeue (which is typical on single
		 * CPU machines).
		 *
		 * We let the thread reenter only once for fear
		 * of the stack getting blown by multiple traversals.
		 */
		connp = (conn_t *)mp->b_prev;
		if (!(sqp->sq_state & SQS_REENTER) &&
		    (process_flag != SQ_FILL) && (sqp->sq_first == NULL) &&
		    (sqp->sq_run == curthread) && (cnt == 1) &&
		    (connp->conn_on_sqp == B_FALSE)) {
			sqp->sq_state |= SQS_REENTER;
			mutex_exit(&sqp->sq_lock);

			ASSERT(mp->b_prev != NULL);
			ASSERT(mp->b_queue != NULL);

			mp->b_prev = NULL;
			proc = (sqproc_t)mp->b_queue;
			mp->b_queue = NULL;

			/*
			 * Handle squeue switching. More details in the
			 * block comment at the top of the file.
			 */
			if (connp->conn_sqp == sqp) {
				connp->conn_on_sqp = B_TRUE;
				DTRACE_PROBE3(squeue__proc__start, squeue_t *,
				    sqp, mblk_t *, mp, conn_t *, connp);
				(*proc)(connp, mp, sqp, ira);
				DTRACE_PROBE2(squeue__proc__end, squeue_t *,
				    sqp, conn_t *, connp);
				connp->conn_on_sqp = B_FALSE;
				CONN_DEC_REF(connp);
			} else {
				SQUEUE_ENTER_ONE(connp->conn_sqp, mp, proc,
				    connp, ira, SQ_FILL, SQTAG_SQUEUE_CHANGE);
			}

			mutex_enter(&sqp->sq_lock);
			sqp->sq_state &= ~SQS_REENTER;
			mutex_exit(&sqp->sq_lock);
			return;
		}

		/*
		 * The queue is already being processed or there are already
		 * one or more packets on the queue. Enqueue the
		 * packet and wake up the squeue worker thread if the
		 * squeue is not being processed.
		 */
#ifdef DEBUG
		mp->b_tag = tag;
#endif
		if (ira != NULL) {
			mblk_t	*attrmp;

			ASSERT(cnt == 1);
			attrmp = ip_recv_attr_to_mblk(ira);
			if (attrmp == NULL) {
				mutex_exit(&sqp->sq_lock);
				ip_drop_input("squeue: ip_recv_attr_to_mblk",
				    mp, NULL);
				/* Caller already set b_prev/b_next */
				mp->b_prev = mp->b_next = NULL;
				freemsg(mp);
				return;
			}
			ASSERT(attrmp->b_cont == NULL);
			attrmp->b_cont = mp;
			/* Move connp and func to the new mblk */
			attrmp->b_queue = mp->b_queue;
			mp->b_queue = NULL;
			attrmp->b_prev = mp->b_prev;
			mp->b_prev = NULL;

			ASSERT(mp == tail);
			tail = mp = attrmp;
		}
		ENQUEUE_CHAIN(sqp, mp, tail, cnt);
		if (!(sqp->sq_state & SQS_PROC)) {
			squeue_worker_wakeup(sqp);
			return;
		}
		/*
		 * In case any control actions are pending, wake
		 * up the worker.
		 */
		if (sqp->sq_state & SQS_WORKER_THR_CONTROL)
			cv_signal(&sqp->sq_worker_cv);
		mutex_exit(&sqp->sq_lock);
		return;
	}
}

/*
 * PRIVATE FUNCTIONS
 */

static void
squeue_fire(void *arg)
{
	squeue_t	*sqp = arg;
	uint_t		state;

	mutex_enter(&sqp->sq_lock);

	state = sqp->sq_state;
	if (sqp->sq_tid == 0 && !(state & SQS_TMO_PROG)) {
		mutex_exit(&sqp->sq_lock);
		return;
	}

	sqp->sq_tid = 0;
	/*
	 * The timeout fired before we got a chance to set it.
	 * Process it anyway but remove the SQS_TMO_PROG so that
	 * the thread trying to set the timeout knows that it has
	 * already been processed.
	 */
	if (state & SQS_TMO_PROG)
		sqp->sq_state &= ~SQS_TMO_PROG;

	if (!(state & SQS_PROC)) {
		sqp->sq_awaken = ddi_get_lbolt();
		cv_signal(&sqp->sq_worker_cv);
	}
	mutex_exit(&sqp->sq_lock);
}

static void
squeue_drain(squeue_t *sqp, uint_t proc_type, hrtime_t expire)
{
	mblk_t		*mp;
	mblk_t		*head;
	sqproc_t	proc;
	conn_t		*connp;
	timeout_id_t	tid;
	ill_rx_ring_t	*sq_rx_ring = sqp->sq_rx_ring;
	hrtime_t	now;
	boolean_t	did_wakeup = B_FALSE;
	boolean_t	sq_poll_capable;
	ip_recv_attr_t	*ira, iras;

	sq_poll_capable = (sqp->sq_state & SQS_POLL_CAPAB) != 0;
again:
	ASSERT(mutex_owned(&sqp->sq_lock));
	ASSERT(!(sqp->sq_state & (SQS_POLL_THR_QUIESCED |
	    SQS_POLL_QUIESCE_DONE)));

	head = sqp->sq_first;
	sqp->sq_first = NULL;
	sqp->sq_last = NULL;
	sqp->sq_count = 0;

	if ((tid = sqp->sq_tid) != 0)
		sqp->sq_tid = 0;

	sqp->sq_state |= SQS_PROC | proc_type;

	/*
	 * We have a backlog built up. Switch to polling mode if the
	 * device underneath allows it. We need to do it so that
	 * more packets don't come in and disturb us (by contending
	 * for sq_lock or a higher priority thread preempting us).
	 *
	 * The worker thread is allowed to do active polling, while we
	 * just disable the interrupts for drain by non-worker (kernel
	 * or userland) threads so they can peacefully process the
	 * packets during the time allocated to them.
	 */
	SQS_POLLING_ON(sqp, sq_poll_capable, sq_rx_ring);
	mutex_exit(&sqp->sq_lock);

	if (tid != 0)
		(void) untimeout(tid);

	while ((mp = head) != NULL) {

		head = mp->b_next;
		mp->b_next = NULL;

		proc = (sqproc_t)mp->b_queue;
		mp->b_queue = NULL;
		connp = (conn_t *)mp->b_prev;
		mp->b_prev = NULL;

		/* Is there an ip_recv_attr_t to handle? */
		if (ip_recv_attr_is_mblk(mp)) {
			mblk_t	*attrmp = mp;

			ASSERT(attrmp->b_cont != NULL);

			mp = attrmp->b_cont;
			attrmp->b_cont = NULL;
			ASSERT(mp->b_queue == NULL);
			ASSERT(mp->b_prev == NULL);

			if (!ip_recv_attr_from_mblk(attrmp, &iras)) {
				/* The ill or ip_stack_t disappeared on us */
				ip_drop_input("ip_recv_attr_from_mblk",
				    mp, NULL);
				ira_cleanup(&iras, B_TRUE);
				CONN_DEC_REF(connp);
				continue;
			}
			ira = &iras;
		} else {
			ira = NULL;
		}

		/*
		 * Handle squeue switching. More details in the
		 * block comment at the top of the file.
		 */
		if (connp->conn_sqp == sqp) {
			SQUEUE_DBG_SET(sqp, mp, proc, connp,
			    mp->b_tag);
			connp->conn_on_sqp = B_TRUE;
			DTRACE_PROBE3(squeue__proc__start, squeue_t *,
			    sqp, mblk_t *, mp, conn_t *, connp);
			(*proc)(connp, mp, sqp, ira);
			DTRACE_PROBE2(squeue__proc__end, squeue_t *,
			    sqp, conn_t *, connp);
			connp->conn_on_sqp = B_FALSE;
			CONN_DEC_REF(connp);
		} else {
			SQUEUE_ENTER_ONE(connp->conn_sqp, mp, proc, connp, ira,
			    SQ_FILL, SQTAG_SQUEUE_CHANGE);
		}
		if (ira != NULL)
			ira_cleanup(ira, B_TRUE);
	}

	SQUEUE_DBG_CLEAR(sqp);

	mutex_enter(&sqp->sq_lock);

	/*
	 * Check if there is still work to do (either more arrived or the
	 * timer expired). If we are the worker thread and we are polling
	 * capable, continue doing the work since no one else is around to
	 * do the work anyway (but signal the poll thread to retrieve some
	 * packets in the meanwhile). If we are not the worker thread, just
	 * signal the worker thread to take up the work if the processing
	 * time has expired.
	 */
	if (sqp->sq_first != NULL) {
		/*
		 * Still more to process. If the time quantum is not expired,
		 * we should let the drain go on. The worker thread is allowed
		 * to drain as long as there is anything left.
		 */
		now = gethrtime();
		if ((now < expire) || (proc_type == SQS_WORKER)) {
			/*
			 * If the time is not expired, or we are the worker
			 * thread and this squeue is polling capable,
			 * continue to do the drain.
			 *
			 * We turn off interrupts for all userland threads
			 * doing drain but we do active polling only for
			 * the worker thread.
			 *
			 * Calling SQS_POLL_RING() even in the case of
			 * SQS_POLLING_ON() not succeeding is ok as
			 * SQS_POLL_RING() will not wake up the poll thread
			 * if the SQS_POLLING bit is not set.
			 */
			if (proc_type == SQS_WORKER)
				SQS_POLL_RING(sqp);
			goto again;
		} else {
			did_wakeup = B_TRUE;
			sqp->sq_awaken = ddi_get_lbolt();
			cv_signal(&sqp->sq_worker_cv);
		}
	}

	/*
	 * If the poll thread is already running, just return. The
	 * poll thread continues to hold the proc and will finish
	 * processing.
	 */
	if (sqp->sq_state & SQS_GET_PKTS) {
		ASSERT(!(sqp->sq_state & (SQS_POLL_THR_QUIESCED |
		    SQS_POLL_QUIESCE_DONE)));
		sqp->sq_state &= ~proc_type;
		return;
	}

	/*
	 * If we are the worker thread and no work is left, send the poll
	 * thread down once more to see if something arrived. Otherwise,
	 * turn the interrupts back on and we are done.
	 */
	if ((proc_type == SQS_WORKER) && (sqp->sq_state & SQS_POLLING)) {
		/*
		 * Do one last check to see if anything arrived
		 * in the NIC. We leave the SQS_PROC set to ensure
		 * that the poll thread keeps the PROC and can decide
		 * if it needs to turn polling off or continue
		 * processing.
		 *
		 * If we drop the SQS_PROC here and the poll thread comes
		 * up empty handed, it cannot safely turn polling off
		 * since someone else could have acquired the PROC
		 * and started draining. The previously running poll
		 * thread and the current thread doing drain would end
		 * up in a race for turning polling on/off and more
		 * complex code would be required to deal with it.
		 *
		 * It is a lot simpler for drain to hand the SQS_PROC to
		 * the poll thread (if running) and let the poll thread
		 * finish without worrying about racing with any other
		 * thread.
		 */
		ASSERT(!(sqp->sq_state & (SQS_POLL_THR_QUIESCED |
		    SQS_POLL_QUIESCE_DONE)));
		SQS_POLL_RING(sqp);
		sqp->sq_state &= ~proc_type;
	} else {
		/*
		 * The squeue is either not capable of polling, or the
		 * attempt to blank (i.e., turn SQS_POLLING_ON()) was
		 * unsuccessful, or the poll thread already finished
		 * processing and didn't find anything. Since there
		 * is nothing queued and we already turned polling on
		 * (for all threads doing drain), we should turn
		 * polling off and relinquish the PROC.
		 */
		ASSERT(!(sqp->sq_state & (SQS_POLL_THR_QUIESCED |
		    SQS_POLL_QUIESCE_DONE)));
		SQS_POLLING_OFF(sqp, sq_poll_capable, sq_rx_ring);
		sqp->sq_state &= ~(SQS_PROC | proc_type);
		if (!did_wakeup && sqp->sq_first != NULL) {
			squeue_worker_wakeup(sqp);
			mutex_enter(&sqp->sq_lock);
		}
		/*
		 * If we are not the worker and there is a pending quiesce
		 * event, wake up the worker.
		 */
		if ((proc_type != SQS_WORKER) &&
		    (sqp->sq_state & SQS_WORKER_THR_CONTROL))
			cv_signal(&sqp->sq_worker_cv);
	}
}

/*
 * Quiesce, Restart, or Cleanup of the squeue poll thread.
 *
 * Quiesce and Restart: After an squeue poll thread has been quiesced, it does
 * not attempt to poll the underlying soft ring any more. The quiesce is
 * triggered by the mac layer when it wants to quiesce a soft ring. Typically
 * control operations such as changing the fanout of a NIC or VNIC (dladm
 * setlinkprop) need to quiesce data flow before changing the wiring.
 * The operation is done by the mac layer, but it calls back into IP to
 * quiesce the soft ring. After completing the operation (say increase or
 * decrease of the fanout) the mac layer then calls back into IP to restart
 * the quiesced soft ring.
 *
 * Cleanup: This is triggered when the squeue binding to a soft ring is
 * removed permanently. Typically interface plumb and unplumb would trigger
 * this. It can also be triggered from the mac layer when a soft ring is
 * being deleted, say as the result of a fanout reduction. Since squeues are
 * never deleted, the cleanup marks the squeue as fit for recycling and
 * moves it to the zeroth squeue set. (A sketch of the handshake with the
 * worker thread follows this function.)
 */
static void
squeue_poll_thr_control(squeue_t *sqp)
{
	if (sqp->sq_state & SQS_POLL_THR_RESTART) {
		/* Restart implies a previous quiesce */
		ASSERT(sqp->sq_state & SQS_POLL_THR_QUIESCED);
		sqp->sq_state &= ~(SQS_POLL_THR_QUIESCED |
		    SQS_POLL_THR_RESTART);
		sqp->sq_state |= SQS_POLL_CAPAB;
		cv_signal(&sqp->sq_worker_cv);
		return;
	}

	if (sqp->sq_state & SQS_POLL_THR_QUIESCE) {
		sqp->sq_state |= SQS_POLL_THR_QUIESCED;
		sqp->sq_state &= ~SQS_POLL_THR_QUIESCE;
		cv_signal(&sqp->sq_worker_cv);
		return;
	}
}
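
/*
 * Illustrative summary of the quiesce/restart handshake between the worker
 * thread (squeue_worker_thr_control(), below) and the poll thread
 * (squeue_poll_thr_control(), above). The flag names are the real sq_state
 * bits; the table is a summary of the existing code, not additional logic.
 *
 *	worker thread				poll thread
 *	-------------				-----------
 *	set SQS_POLL_THR_QUIESCE,
 *	cv_signal(sq_poll_cv)		->	set SQS_POLL_THR_QUIESCED,
 *						clear SQS_POLL_THR_QUIESCE,
 *						cv_signal(sq_worker_cv)
 *	... control operation runs ...
 *	set SQS_POLL_THR_RESTART,
 *	cv_signal(sq_poll_cv)		->	clear QUIESCED and RESTART,
 *						set SQS_POLL_CAPAB,
 *						cv_signal(sq_worker_cv)
 */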

/*
 * POLLING Notes
 *
 * With polling mode, we want to do as much processing as we possibly can
 * in worker thread context. The sweet spot is when the worker thread keeps
 * doing work all the time in polling mode and writers etc. keep dumping
 * packets to the worker thread. Occasionally, we send the poll thread
 * (running at a lower priority) down to the NIC to get a chain of packets
 * to feed to the worker. Sending the poll thread down to the NIC depends
 * on 3 criteria:
 *
 * 1) It is always driven from squeue_drain and only if the worker thread
 *    is doing the drain.
 * 2) We have cleared the backlog once and more packets have arrived in
 *    between. Before starting the drain again, send the poll thread down
 *    if the drain is being done by the worker thread.
 * 3) Before exiting squeue_drain, if the poll thread is not already
 *    working and we are the worker thread, try to poll one more time.
 *
 * For latency's sake, we do allow any thread calling squeue_enter
 * to process its packet provided:
 *
 * 1) Nothing is queued
 * 2) If more packets arrived in between, the non-worker threads are allowed
 *    to do the drain till their time quantum expires, provided SQS_GET_PKTS
 *    wasn't set in between.
 *
 * Avoiding deadlocks with interrupts
 * ==================================
 *
 * One of the big problems is that we can't send the poll thread down while
 * holding the sq_lock since the thread can block. So we drop the sq_lock
 * before calling sq_get_pkts(). We keep holding the SQS_PROC as long as the
 * poll thread is running so that no other thread can acquire the
 * perimeter in between. If the squeue_drain gets done (no more work
 * left), it leaves the SQS_PROC set if the poll thread is running.
 */

/*
 * This is the squeue poll thread. In poll mode, it polls the underlying
 * TCP softring and feeds packets into the squeue. The worker thread then
 * drains the squeue. The poll thread also responds to control signals for
 * quiescing, restarting, or cleanup of an squeue. These are driven by
 * control operations like plumb/unplumb or as a result of dynamic Rx ring
 * related operations that are driven from the mac layer.
 */
static void
squeue_polling_thread(squeue_t *sqp)
{
	kmutex_t *lock = &sqp->sq_lock;
	kcondvar_t *async = &sqp->sq_poll_cv;
	ip_mac_rx_t sq_get_pkts;
	ip_accept_t ip_accept;
	ill_rx_ring_t *sq_rx_ring;
	ill_t *sq_ill;
	mblk_t *head, *tail, *mp;
	uint_t cnt;
	void *sq_mac_handle;
	callb_cpr_t cprinfo;
	size_t bytes_to_pickup;
	uint32_t ctl_state;

	CALLB_CPR_INIT(&cprinfo, lock, callb_generic_cpr, "sq_poll");
	mutex_enter(lock);

	for (;;) {
		CALLB_CPR_SAFE_BEGIN(&cprinfo);
		cv_wait(async, lock);
		CALLB_CPR_SAFE_END(&cprinfo, lock);

		ctl_state = sqp->sq_state & (SQS_POLL_THR_CONTROL |
		    SQS_POLL_THR_QUIESCED);
		if (ctl_state != 0) {
			/*
			 * If the squeue is quiesced, then wait for a control
			 * request. A quiesced squeue must not poll the
			 * underlying soft ring.
			 */
			if (ctl_state == SQS_POLL_THR_QUIESCED)
				continue;
			/*
			 * Act on control requests to quiesce, cleanup or
			 * restart an squeue.
			 */
			squeue_poll_thr_control(sqp);
			continue;
		}

		if (!(sqp->sq_state & SQS_POLL_CAPAB))
			continue;

		ASSERT((sqp->sq_state &
		    (SQS_PROC|SQS_POLLING|SQS_GET_PKTS)) ==
		    (SQS_PROC|SQS_POLLING|SQS_GET_PKTS));

poll_again:
		sq_rx_ring = sqp->sq_rx_ring;
		sq_get_pkts = sq_rx_ring->rr_rx;
		sq_mac_handle = sq_rx_ring->rr_rx_handle;
		ip_accept = sq_rx_ring->rr_ip_accept;
		sq_ill = sq_rx_ring->rr_ill;
		bytes_to_pickup = MAX_BYTES_TO_PICKUP;
		mutex_exit(lock);
		head = sq_get_pkts(sq_mac_handle, bytes_to_pickup);
		mp = NULL;
		if (head != NULL) {
			/*
			 * We got the packet chain from the mac layer. It
			 * would be nice to be able to process it inline
			 * for better performance but we need to give
			 * IP a chance to look at this chain to ensure
			 * that packets are really meant for this squeue
			 * and do the IP processing.
			 */
			mp = ip_accept(sq_ill, sq_rx_ring, sqp, head,
			    &tail, &cnt);
		}
		mutex_enter(lock);
		if (mp != NULL) {
			/*
			 * The ip_accept function has already added an
			 * ip_recv_attr_t mblk if that is needed.
			 */
			ENQUEUE_CHAIN(sqp, mp, tail, cnt);
		}
		ASSERT((sqp->sq_state &
		    (SQS_PROC|SQS_POLLING|SQS_GET_PKTS)) ==
		    (SQS_PROC|SQS_POLLING|SQS_GET_PKTS));

		if (sqp->sq_first != NULL && !(sqp->sq_state & SQS_WORKER)) {
			/*
			 * We have packets to process and the worker thread
			 * is not running. Check to see if the poll thread is
			 * allowed to process. Let it do processing only if it
			 * picked up some packets from the NIC, otherwise
			 * wake up the worker thread.
			 */
			if (mp != NULL) {
				hrtime_t  now;

				now = gethrtime();
				sqp->sq_run = curthread;
				sqp->sq_drain(sqp, SQS_POLL_PROC, now +
				    squeue_drain_ns);
				sqp->sq_run = NULL;

				if (sqp->sq_first == NULL)
					goto poll_again;

				/*
				 * Couldn't do the entire drain because the
				 * time limit expired, let the
				 * worker thread take over.
				 */
			}

			sqp->sq_awaken = ddi_get_lbolt();
			/*
			 * Put the SQS_PROC_HELD on so the worker
			 * thread can distinguish where it is called from. We
			 * can remove the SQS_PROC flag here and turn off the
			 * polling so that it wouldn't matter who gets the
			 * processing, but we get better performance this way
			 * and save the cost of turning polling off and
			 * possibly on again as soon as we start draining
			 * again.
			 *
			 * We can't remove the SQS_PROC flag without turning
			 * polling off until we can guarantee that control
			 * will return to squeue_drain immediately.
			 */
			sqp->sq_state |= SQS_PROC_HELD;
			sqp->sq_state &= ~SQS_GET_PKTS;
			cv_signal(&sqp->sq_worker_cv);
		} else if (sqp->sq_first == NULL &&
		    !(sqp->sq_state & SQS_WORKER)) {
			/*
			 * Nothing queued and the worker thread is not
			 * running. Since we hold the proc, no other thread is
			 * processing the squeue. This means that there
			 * is no work to be done and nothing is queued
			 * in the squeue or in the NIC. Turn polling off and
			 * go back to interrupt mode.
			 */
			sqp->sq_state &= ~(SQS_PROC|SQS_GET_PKTS);
			/* LINTED: constant in conditional context */
			SQS_POLLING_OFF(sqp, B_TRUE, sq_rx_ring);

			/*
			 * If there is a pending control operation,
			 * wake up the worker, since it is currently
			 * not running.
			 */
			if (sqp->sq_state & SQS_WORKER_THR_CONTROL)
				cv_signal(&sqp->sq_worker_cv);
		} else {
			/*
			 * The worker thread is already running. We don't
			 * need to do anything. Indicate that the poll thread
			 * is done.
			 */
			sqp->sq_state &= ~SQS_GET_PKTS;
		}
		if (sqp->sq_state & SQS_POLL_THR_CONTROL) {
			/*
			 * Act on control requests to quiesce, cleanup or
			 * restart an squeue.
			 */
			squeue_poll_thr_control(sqp);
		}
	}
}

/*
 * The squeue worker thread acts on any control requests to quiesce, cleanup
 * or restart an ill_rx_ring_t by calling this function. The worker thread
 * synchronizes with the squeue poll thread to complete the request and
 * finally wakes up the requestor when the request is completed.
 */
static void
squeue_worker_thr_control(squeue_t *sqp)
{
	ill_t		*ill;
	ill_rx_ring_t	*rx_ring;

	ASSERT(MUTEX_HELD(&sqp->sq_lock));

	if (sqp->sq_state & SQS_POLL_RESTART) {
		/* Restart implies a previous quiesce. */
		ASSERT((sqp->sq_state & (SQS_PROC_HELD |
		    SQS_POLL_QUIESCE_DONE | SQS_PROC | SQS_WORKER)) ==
		    (SQS_POLL_QUIESCE_DONE | SQS_PROC | SQS_WORKER));
		/*
		 * Request the squeue poll thread to restart and wait till
		 * it actually restarts.
		 */
		sqp->sq_state &= ~SQS_POLL_QUIESCE_DONE;
		sqp->sq_state |= SQS_POLL_THR_RESTART;
		cv_signal(&sqp->sq_poll_cv);
		while (sqp->sq_state & SQS_POLL_THR_QUIESCED)
			cv_wait(&sqp->sq_worker_cv, &sqp->sq_lock);
		sqp->sq_state &= ~(SQS_POLL_RESTART | SQS_PROC |
		    SQS_WORKER);
		/*
		 * Signal any waiter that is waiting for the restart
		 * to complete.
		 */
		sqp->sq_state |= SQS_POLL_RESTART_DONE;
		cv_signal(&sqp->sq_ctrlop_done_cv);
		return;
	}

	if (sqp->sq_state & SQS_PROC_HELD) {
		/* The squeue poll thread handed control to us */
		ASSERT(sqp->sq_state & SQS_PROC);
	}

	/*
	 * Prevent any other thread from processing the squeue
	 * until we finish the control actions by setting SQS_PROC.
	 * But allow ourselves to reenter by setting SQS_WORKER.
	 */
	sqp->sq_state |= (SQS_PROC | SQS_WORKER);

	/* Signal the squeue poll thread and wait for it to quiesce itself */
	if (!(sqp->sq_state & SQS_POLL_THR_QUIESCED)) {
		sqp->sq_state |= SQS_POLL_THR_QUIESCE;
		cv_signal(&sqp->sq_poll_cv);
		while (!(sqp->sq_state & SQS_POLL_THR_QUIESCED))
			cv_wait(&sqp->sq_worker_cv, &sqp->sq_lock);
	}

	rx_ring = sqp->sq_rx_ring;
	ill = rx_ring->rr_ill;
	/*
	 * The lock hierarchy is as follows.
	 * cpu_lock -> ill_lock -> sqset_lock -> sq_lock
	 */
	mutex_exit(&sqp->sq_lock);
	mutex_enter(&ill->ill_lock);
	mutex_enter(&sqp->sq_lock);

	SQS_POLLING_OFF(sqp, (sqp->sq_state & SQS_POLL_CAPAB) != 0,
	    sqp->sq_rx_ring);
	sqp->sq_state &= ~(SQS_POLL_CAPAB | SQS_GET_PKTS | SQS_PROC_HELD);
	if (sqp->sq_state & SQS_POLL_CLEANUP) {
		/*
		 * Disassociate this squeue from its ill_rx_ring_t.
		 * The rr_sqp and sq_rx_ring fields are protected by the
		 * corresponding squeue, ill_lock and sq_lock.
		 * Holding any of them will ensure that the ring to squeue
		 * mapping does not change.
		 */
		ASSERT(!(sqp->sq_state & SQS_DEFAULT));

		sqp->sq_rx_ring = NULL;
		rx_ring->rr_sqp = NULL;

		sqp->sq_state &= ~(SQS_POLL_CLEANUP | SQS_POLL_THR_QUIESCED |
		    SQS_POLL_QUIESCE_DONE);
		sqp->sq_ill = NULL;

		rx_ring->rr_rx_handle = NULL;
		rx_ring->rr_intr_handle = NULL;
		rx_ring->rr_intr_enable = NULL;
		rx_ring->rr_intr_disable = NULL;
		sqp->sq_state |= SQS_POLL_CLEANUP_DONE;
	} else {
		sqp->sq_state &= ~SQS_POLL_QUIESCE;
		sqp->sq_state |= SQS_POLL_QUIESCE_DONE;
	}
	/*
	 * Signal any waiter that is waiting for the quiesce or cleanup
	 * to complete and also wait for it to actually see and reset the
	 * SQS_POLL_CLEANUP_DONE.
	 */
	cv_signal(&sqp->sq_ctrlop_done_cv);
	mutex_exit(&ill->ill_lock);
	if (sqp->sq_state & SQS_POLL_CLEANUP_DONE) {
		cv_wait(&sqp->sq_worker_cv, &sqp->sq_lock);
		sqp->sq_state &= ~(SQS_PROC | SQS_WORKER);
	}
}

static void
squeue_worker(squeue_t *sqp)
{
	kmutex_t *lock = &sqp->sq_lock;
	kcondvar_t *async = &sqp->sq_worker_cv;
	callb_cpr_t cprinfo;
	hrtime_t now;

	CALLB_CPR_INIT(&cprinfo, lock, callb_generic_cpr, "sq_worker");
	mutex_enter(lock);

	for (;;) {
		for (;;) {
			/*
			 * If the poll thread has handed control to us
			 * we need to break out of the wait.
			 */
			if (sqp->sq_state & SQS_PROC_HELD)
				break;

			/*
			 * If the squeue is not being processed and we either
			 * have messages to drain or some thread has signaled
			 * some control activity, we need to break.
			 */
			if (!(sqp->sq_state & SQS_PROC) &&
			    ((sqp->sq_state & SQS_WORKER_THR_CONTROL) ||
			    (sqp->sq_first != NULL)))
				break;

			/*
			 * If we have started some control action, then check
			 * for the SQS_WORKER flag (since we don't
			 * release the squeue) to make sure we own the squeue
			 * and break out.
			 */
			if ((sqp->sq_state & SQS_WORKER_THR_CONTROL) &&
			    (sqp->sq_state & SQS_WORKER))
				break;

			CALLB_CPR_SAFE_BEGIN(&cprinfo);
			cv_wait(async, lock);
			CALLB_CPR_SAFE_END(&cprinfo, lock);
		}
		if (sqp->sq_state & SQS_WORKER_THR_CONTROL) {
			squeue_worker_thr_control(sqp);
			continue;
		}
		ASSERT(!(sqp->sq_state & (SQS_POLL_THR_QUIESCED |
		    SQS_POLL_CLEANUP_DONE | SQS_POLL_QUIESCE_DONE |
		    SQS_WORKER_THR_CONTROL | SQS_POLL_THR_CONTROL)));

		if (sqp->sq_state & SQS_PROC_HELD)
			sqp->sq_state &= ~SQS_PROC_HELD;

		now = gethrtime();
		sqp->sq_run = curthread;
		sqp->sq_drain(sqp, SQS_WORKER, now + squeue_drain_ns);
		sqp->sq_run = NULL;
	}
}

uintptr_t *
squeue_getprivate(squeue_t *sqp, sqprivate_t p)
{
	ASSERT(p < SQPRIVATE_MAX);

	return (&sqp->sq_private[p]);
}
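
/*
 * Illustrative sketch (hypothetical caller): the synchronous entry points
 * below give a thread that is not running on the squeue (typically a thread
 * in the application path) exclusive access to a conn's squeue without
 * packaging its work as an mblk:
 *
 *	if (squeue_synch_enter(connp->conn_sqp, connp, NULL) == 0) {
 *		... act on connp as if inside the vertical perimeter ...
 *		squeue_synch_exit(connp->conn_sqp, connp);
 *	}
 *
 * squeue_synch_enter() either grabs an idle squeue directly or enqueues a
 * MSGWAITSYNC-flagged mblk and sleeps until squeue_wakeup_conn() runs on
 * its behalf and clears the flag.
 */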

/* ARGSUSED */
void
squeue_wakeup_conn(void *arg, mblk_t *mp, void *arg2, ip_recv_attr_t *dummy)
{
	conn_t *connp = (conn_t *)arg;
	squeue_t *sqp = connp->conn_sqp;

	/*
	 * Mark the squeue as paused before waking up the thread stuck
	 * in squeue_synch_enter().
	 */
	mutex_enter(&sqp->sq_lock);
	sqp->sq_state |= SQS_PAUSE;

	/*
	 * Notify the thread that it's OK to proceed; that is done by
	 * clearing the MSGWAITSYNC flag. The synch thread will free the mblk.
	 */
	ASSERT(mp->b_flag & MSGWAITSYNC);
	mp->b_flag &= ~MSGWAITSYNC;
	cv_broadcast(&connp->conn_sq_cv);

	/*
	 * We are doing something on behalf of another thread, so we have to
	 * pause and wait until it finishes.
	 */
	while (sqp->sq_state & SQS_PAUSE) {
		cv_wait(&sqp->sq_synch_cv, &sqp->sq_lock);
	}
	mutex_exit(&sqp->sq_lock);
}

int
squeue_synch_enter(squeue_t *sqp, conn_t *connp, mblk_t *use_mp)
{
	mutex_enter(&sqp->sq_lock);
	if (sqp->sq_first == NULL && !(sqp->sq_state & SQS_PROC)) {
		/*
		 * We are OK to proceed if the squeue is empty, and
		 * no one owns the squeue.
		 *
		 * The caller won't own the squeue as this is called from the
		 * application.
		 */
		ASSERT(sqp->sq_run == NULL);

		sqp->sq_state |= SQS_PROC;
		sqp->sq_run = curthread;
		mutex_exit(&sqp->sq_lock);

#if SQUEUE_DEBUG
		sqp->sq_curmp = NULL;
		sqp->sq_curproc = NULL;
		sqp->sq_connp = connp;
#endif
		connp->conn_on_sqp = B_TRUE;
		return (0);
	} else {
		mblk_t	*mp;

		mp = (use_mp == NULL) ? allocb(0, BPRI_MED) : use_mp;
		if (mp == NULL) {
			mutex_exit(&sqp->sq_lock);
			return (ENOMEM);
		}

		/*
		 * We mark the mblk as awaiting synchronous squeue access
		 * by setting the MSGWAITSYNC flag. Once squeue_wakeup_conn
		 * fires, MSGWAITSYNC is cleared, at which point we know we
		 * have exclusive access.
		 */
		mp->b_flag |= MSGWAITSYNC;

		CONN_INC_REF(connp);
		SET_SQUEUE(mp, squeue_wakeup_conn, connp);
		ENQUEUE_CHAIN(sqp, mp, mp, 1);

		ASSERT(sqp->sq_run != curthread);

		/* Wait until the enqueued mblk gets processed. */
		while (mp->b_flag & MSGWAITSYNC)
			cv_wait(&connp->conn_sq_cv, &sqp->sq_lock);
		mutex_exit(&sqp->sq_lock);

		if (use_mp == NULL)
			freeb(mp);

		return (0);
	}
}

void
squeue_synch_exit(squeue_t *sqp, conn_t *connp)
{
	mutex_enter(&sqp->sq_lock);
	if (sqp->sq_run == curthread) {
		ASSERT(sqp->sq_state & SQS_PROC);

		sqp->sq_state &= ~SQS_PROC;
		sqp->sq_run = NULL;
		connp->conn_on_sqp = B_FALSE;

		if (sqp->sq_first == NULL) {
			mutex_exit(&sqp->sq_lock);
		} else {
			/*
			 * If this was a normal thread, then it would
			 * (most likely) continue processing the pending
			 * requests. Since the just completed operation
			 * was executed synchronously, the thread should
			 * not be delayed. To compensate, wake up the
			 * worker thread right away when there are outstanding
			 * requests.
			 */
			sqp->sq_awaken = ddi_get_lbolt();
			cv_signal(&sqp->sq_worker_cv);
			mutex_exit(&sqp->sq_lock);
		}
	} else {
		/*
		 * The caller doesn't own the squeue; clear the SQS_PAUSE flag
		 * and wake up the squeue owner, such that the owner can
		 * continue processing.
		 */
		ASSERT(sqp->sq_state & SQS_PAUSE);
		sqp->sq_state &= ~SQS_PAUSE;

		/* There should be only one thread blocking on sq_synch_cv. */
		cv_signal(&sqp->sq_synch_cv);
		mutex_exit(&sqp->sq_lock);
	}
}