/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License (the "License").
 * You may not use this file except in compliance with the License.
 *
 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
 * or http://www.opensolaris.org/os/licensing.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
 * If applicable, add the following below this CDDL HEADER, with the
 * fields enclosed by brackets "[]" replaced with your own identifying
 * information: Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 */
/*
 * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
 */

/*
 * Squeues: General purpose serialization mechanism
 * ------------------------------------------------
 *
 * Background:
 * -----------
 *
 * This is a general purpose, high-performance serialization mechanism
 * currently used by TCP/IP. It is implemented by means of a per-CPU queue,
 * a worker thread and a polling thread, which are bound to the CPU
 * associated with the squeue. The squeue is strictly FIFO for both the
 * read and write side and only one thread can process it at any given
 * time. The design goal of the squeue was to offer a very high degree of
 * parallelization (on a per H/W execution pipeline basis) with at
 * most one queuing.
 *
 * Modules needing protection typically call the SQUEUE_ENTER_ONE() or
 * SQUEUE_ENTER() macro as soon as a thread enters the module
 * from either direction. For each packet, the processing function
 * and argument are stored in the mblk itself. When the packet is ready
 * to be processed, the squeue retrieves the stored function and calls
 * it with the supplied argument and the pointer to the packet itself.
 * The called function can assume that no other thread is processing
 * the squeue when it is executing.
 *
 * Squeue/connection binding:
 * --------------------------
 *
 * TCP/IP uses an IP classifier in conjunction with squeues, where specific
 * connections are assigned to specific squeues (based on various policies)
 * at connection creation time. Once assigned, the connection-to-squeue
 * mapping is never changed and all future packets for that
 * connection are processed on that squeue. The connection ("conn") to
 * squeue mapping is stored in the "conn_t" member "conn_sqp".
 *
 * Since the processing of a connection cuts across multiple layers
 * but still allows packets for different connections to be processed on
 * other CPUs/squeues, squeues are also termed "Vertical Perimeter" or
 * "Per Connection Vertical Perimeter".
 *
 * Processing Model:
 * -----------------
 *
 * A squeue doesn't necessarily process packets with its own worker thread.
 * Callers can pick whether they just want to queue the packet, process
 * their packet if nothing is queued, or drain and process. The first two
 * modes are typically employed when the packet was generated while
 * already doing the processing behind the squeue, and the last mode (drain
 * and process) is typically employed when the thread is entering the
 * squeue for the first time. The squeue still imposes a finite time limit
 * for which an external thread can do processing, after which it switches
 * processing to its own worker thread.
 *
 * Once created, squeues are never deleted. Hence squeue pointers are
 * always valid. This means that functions outside the squeue can still
 * refer safely to conn_sqp and there is no need for ref counts.
 *
 * Only a thread executing in the squeue can change the squeue of the
 * connection. It does so by calling a squeue framework function to do this.
 * After changing the squeue, the thread must leave the squeue. It must not
 * continue to execute any code that needs squeue protection.
 *
 * The squeue framework, after entering the squeue, checks if the current
 * squeue matches the conn_sqp. If the check fails, the packet is delivered
 * to the right squeue.
 *
 * Polling Model:
 * --------------
 *
 * A squeue can control the rate of packet arrival into itself from the
 * NIC or a specific Rx ring within a NIC. As part of capability negotiation
 * between the IP and MAC layers, squeues are created for each TCP soft ring
 * (or TCP Rx ring - to be implemented in future). As part of this
 * negotiation, squeues get a cookie for the underlying soft ring or Rx
 * ring, a function to turn off incoming packets and a function to call
 * to poll for packets. This helps schedule the receive side packet
 * processing so that queue backlog doesn't build up and packet processing
 * doesn't keep getting disturbed by high priority interrupts. As part
 * of this mode, as soon as a backlog starts building, the squeue turns off
 * the interrupts and switches to poll mode. In poll mode, when the poll
 * thread goes down to retrieve packets, it retrieves them in the form of
 * a chain, which improves performance even more. As the squeue/softring
 * system gets more packets, it gets more efficient by switching to
 * polling more often and dealing with larger packet chains.
 *
 */
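/*
 * Illustrative sketch only (hypothetical caller, not part of the original
 * documentation above): a module typically hands a packet to a connection's
 * squeue roughly as follows, selecting the processing mode with the
 * process_flag (SQ_FILL to just queue, SQ_NODRAIN or SQ_PROCESS to process
 * inline when possible). The receive function and tag shown here are
 * placeholders.
 *
 *	conn_t	*connp;			// from the IP classifier lookup
 *	mblk_t	*mp;			// packet to hand to the squeue
 *
 *	CONN_INC_REF(connp);		// squeue processing drops one ref
 *	SQUEUE_ENTER_ONE(connp->conn_sqp, mp, connp->conn_recv, connp,
 *	    ira, SQ_FILL, SQTAG_IP_TCP_INPUT);
 *
 * The enter macros record the processing function and connp in the mblk
 * (b_queue and b_prev), which squeue_enter() and squeue_drain() below
 * retrieve and invoke while holding the squeue.
 */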
#include <sys/types.h>
#include <sys/cmn_err.h>
#include <sys/debug.h>
#include <sys/kmem.h>
#include <sys/cpuvar.h>
#include <sys/condvar_impl.h>
#include <sys/systm.h>
#include <sys/callb.h>
#include <sys/sdt.h>
#include <sys/ddi.h>
#include <sys/sunddi.h>

#include <inet/ipclassifier.h>
#include <inet/udp_impl.h>

#include <sys/squeue_impl.h>

static void squeue_fire(void *);
static void squeue_drain(squeue_t *, uint_t, hrtime_t);
static void squeue_worker(squeue_t *sqp);
static void squeue_polling_thread(squeue_t *sqp);

kmem_cache_t *squeue_cache;

#define	SQUEUE_MSEC_TO_NSEC	1000000

int squeue_drain_ms = 20;
int squeue_workerwait_ms = 0;

/* The values above converted to ticks or nanoseconds */
static int squeue_drain_ns = 0;
static int squeue_workerwait_tick = 0;

#define	MAX_BYTES_TO_PICKUP	150000

#define	ENQUEUE_CHAIN(sqp, mp, tail, cnt) {	\
	/*	\
	 * Enqueue our mblk chain.	\
	 */	\
	ASSERT(MUTEX_HELD(&(sqp)->sq_lock));	\
	\
	if ((sqp)->sq_last != NULL)	\
		(sqp)->sq_last->b_next = (mp);	\
	else	\
		(sqp)->sq_first = (mp);	\
	(sqp)->sq_last = (tail);	\
	(sqp)->sq_count += (cnt);	\
	ASSERT((sqp)->sq_count > 0);	\
	DTRACE_PROBE4(squeue__enqueuechain, squeue_t *, sqp,	\
		mblk_t *, mp, mblk_t *, tail, int, cnt);	\
	\
}
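/*
 * Illustrative sketch only (not part of the original file): ENQUEUE_CHAIN()
 * must be called with sq_lock held and with a consistent head/tail/count.
 * For example, appending a single mblk whose proc/connp have already been
 * stashed in it looks roughly like this (head == tail and cnt == 1 for a
 * chain of one):
 *
 *	mutex_enter(&sqp->sq_lock);
 *	ENQUEUE_CHAIN(sqp, mp, mp, 1);
 *	mutex_exit(&sqp->sq_lock);
 *
 * squeue_enter(), squeue_synch_enter() and the poll thread below use the
 * same macro to append entire packet chains under sq_lock.
 */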
/*
 * Blank the receive ring (in this case it is the soft ring). When
 * blanked, the soft ring will not send any more packets up.
 * Blanking may not succeed when there is a CPU already in the soft
 * ring sending packets up. In that case, SQS_POLLING will not be
 * set.
 */
#define	SQS_POLLING_ON(sqp, sq_poll_capable, rx_ring) {	\
	ASSERT(MUTEX_HELD(&(sqp)->sq_lock));	\
	if (sq_poll_capable) {	\
		ASSERT(rx_ring != NULL);	\
		ASSERT(sqp->sq_state & SQS_POLL_CAPAB);	\
		if (!(sqp->sq_state & SQS_POLLING)) {	\
			if (rx_ring->rr_intr_disable(rx_ring->rr_intr_handle)) \
				sqp->sq_state |= SQS_POLLING;	\
		}	\
	}	\
}

#define	SQS_POLLING_OFF(sqp, sq_poll_capable, rx_ring) {	\
	ASSERT(MUTEX_HELD(&(sqp)->sq_lock));	\
	if (sq_poll_capable) {	\
		ASSERT(rx_ring != NULL);	\
		ASSERT(sqp->sq_state & SQS_POLL_CAPAB);	\
		if (sqp->sq_state & SQS_POLLING) {	\
			sqp->sq_state &= ~SQS_POLLING;	\
			rx_ring->rr_intr_enable(rx_ring->rr_intr_handle); \
		}	\
	}	\
}

/* Wakeup poll thread only if SQS_POLLING is set */
#define	SQS_POLL_RING(sqp) {	\
	ASSERT(MUTEX_HELD(&(sqp)->sq_lock));	\
	if (sqp->sq_state & SQS_POLLING) {	\
		ASSERT(sqp->sq_state & SQS_POLL_CAPAB);	\
		if (!(sqp->sq_state & SQS_GET_PKTS)) {	\
			sqp->sq_state |= SQS_GET_PKTS;	\
			cv_signal(&sqp->sq_poll_cv);	\
		}	\
	}	\
}

#ifdef DEBUG
#define	SQUEUE_DBG_SET(sqp, mp, proc, connp, tag) {	\
	(sqp)->sq_curmp = (mp);	\
	(sqp)->sq_curproc = (proc);	\
	(sqp)->sq_connp = (connp);	\
	(mp)->b_tag = (sqp)->sq_tag = (tag);	\
}

#define	SQUEUE_DBG_CLEAR(sqp) {	\
	(sqp)->sq_curmp = NULL;	\
	(sqp)->sq_curproc = NULL;	\
	(sqp)->sq_connp = NULL;	\
}
#else
#define	SQUEUE_DBG_SET(sqp, mp, proc, connp, tag)
#define	SQUEUE_DBG_CLEAR(sqp)
#endif

void
squeue_init(void)
{
	squeue_cache = kmem_cache_create("squeue_cache",
	    sizeof (squeue_t), 64, NULL, NULL, NULL, NULL, NULL, 0);

	squeue_drain_ns = squeue_drain_ms * SQUEUE_MSEC_TO_NSEC;
	squeue_workerwait_tick = MSEC_TO_TICK_ROUNDUP(squeue_workerwait_ms);
}

/* ARGSUSED */
squeue_t *
squeue_create(clock_t wait, pri_t pri)
{
	squeue_t *sqp = kmem_cache_alloc(squeue_cache, KM_SLEEP);

	bzero(sqp, sizeof (squeue_t));
	sqp->sq_bind = PBIND_NONE;
	sqp->sq_priority = pri;
	sqp->sq_wait = MSEC_TO_TICK(wait);
	sqp->sq_worker = thread_create(NULL, 0, squeue_worker,
	    sqp, 0, &p0, TS_RUN, pri);

	sqp->sq_poll_thr = thread_create(NULL, 0, squeue_polling_thread,
	    sqp, 0, &p0, TS_RUN, pri);

	sqp->sq_enter = squeue_enter;
	sqp->sq_drain = squeue_drain;

	return (sqp);
}

/*
 * Bind squeue worker thread to the specified CPU, given by CPU id.
 * If the CPU id value is -1, bind the worker thread to the value
 * specified in sq_bind field. If a thread is already bound to a
 * different CPU, unbind it from the old CPU and bind to the new one.
263 */ 264 265 void 266 squeue_bind(squeue_t *sqp, processorid_t bind) 267 { 268 mutex_enter(&sqp->sq_lock); 269 ASSERT(sqp->sq_bind != PBIND_NONE || bind != PBIND_NONE); 270 ASSERT(MUTEX_HELD(&cpu_lock)); 271 272 if (sqp->sq_state & SQS_BOUND) { 273 if (sqp->sq_bind == bind) { 274 mutex_exit(&sqp->sq_lock); 275 return; 276 } 277 thread_affinity_clear(sqp->sq_worker); 278 } else { 279 sqp->sq_state |= SQS_BOUND; 280 } 281 282 if (bind != PBIND_NONE) 283 sqp->sq_bind = bind; 284 285 thread_affinity_set(sqp->sq_worker, sqp->sq_bind); 286 mutex_exit(&sqp->sq_lock); 287 } 288 289 void 290 squeue_unbind(squeue_t *sqp) 291 { 292 mutex_enter(&sqp->sq_lock); 293 if (!(sqp->sq_state & SQS_BOUND)) { 294 mutex_exit(&sqp->sq_lock); 295 return; 296 } 297 298 sqp->sq_state &= ~SQS_BOUND; 299 thread_affinity_clear(sqp->sq_worker); 300 mutex_exit(&sqp->sq_lock); 301 } 302 303 void 304 squeue_worker_wakeup(squeue_t *sqp) 305 { 306 timeout_id_t tid = (sqp)->sq_tid; 307 308 ASSERT(MUTEX_HELD(&(sqp)->sq_lock)); 309 310 if (sqp->sq_wait == 0) { 311 ASSERT(tid == 0); 312 ASSERT(!(sqp->sq_state & SQS_TMO_PROG)); 313 sqp->sq_awaken = ddi_get_lbolt(); 314 cv_signal(&sqp->sq_worker_cv); 315 mutex_exit(&sqp->sq_lock); 316 return; 317 } 318 319 /* 320 * Queue isn't being processed, so take 321 * any post enqueue actions needed before leaving. 322 */ 323 if (tid != 0) { 324 /* 325 * Waiting for an enter() to process mblk(s). 326 */ 327 clock_t now = ddi_get_lbolt(); 328 clock_t waited = now - sqp->sq_awaken; 329 330 if (TICK_TO_MSEC(waited) >= sqp->sq_wait) { 331 /* 332 * Times up and have a worker thread 333 * waiting for work, so schedule it. 334 */ 335 sqp->sq_tid = 0; 336 sqp->sq_awaken = now; 337 cv_signal(&sqp->sq_worker_cv); 338 mutex_exit(&sqp->sq_lock); 339 (void) untimeout(tid); 340 return; 341 } 342 mutex_exit(&sqp->sq_lock); 343 return; 344 } else if (sqp->sq_state & SQS_TMO_PROG) { 345 mutex_exit(&sqp->sq_lock); 346 return; 347 } else { 348 clock_t wait = sqp->sq_wait; 349 /* 350 * Wait up to sqp->sq_wait ms for an 351 * enter() to process this queue. We 352 * don't want to contend on timeout locks 353 * with sq_lock held for performance reasons, 354 * so drop the sq_lock before calling timeout 355 * but we need to check if timeout is required 356 * after re acquiring the sq_lock. Once 357 * the sq_lock is dropped, someone else could 358 * have processed the packet or the timeout could 359 * have already fired. 360 */ 361 sqp->sq_state |= SQS_TMO_PROG; 362 mutex_exit(&sqp->sq_lock); 363 tid = timeout(squeue_fire, sqp, wait); 364 mutex_enter(&sqp->sq_lock); 365 /* Check again if we still need the timeout */ 366 if (((sqp->sq_state & (SQS_PROC|SQS_TMO_PROG)) == 367 SQS_TMO_PROG) && (sqp->sq_tid == 0) && 368 (sqp->sq_first != NULL)) { 369 sqp->sq_state &= ~SQS_TMO_PROG; 370 sqp->sq_tid = tid; 371 mutex_exit(&sqp->sq_lock); 372 return; 373 } else { 374 if (sqp->sq_state & SQS_TMO_PROG) { 375 sqp->sq_state &= ~SQS_TMO_PROG; 376 mutex_exit(&sqp->sq_lock); 377 (void) untimeout(tid); 378 } else { 379 /* 380 * The timer fired before we could 381 * reacquire the sq_lock. squeue_fire 382 * removes the SQS_TMO_PROG flag 383 * and we don't need to do anything 384 * else. 385 */ 386 mutex_exit(&sqp->sq_lock); 387 } 388 } 389 } 390 391 ASSERT(MUTEX_NOT_HELD(&sqp->sq_lock)); 392 } 393 394 /* 395 * squeue_enter() - enter squeue sqp with mblk mp (which can be 396 * a chain), while tail points to the end and cnt in number of 397 * mblks in the chain. 398 * 399 * For a chain of single packet (i.e. 
mp == tail), go through the 400 * fast path if no one is processing the squeue and nothing is queued. 401 * 402 * The proc and arg for each mblk is already stored in the mblk in 403 * appropriate places. 404 * 405 * The process_flag specifies if we are allowed to process the mblk 406 * and drain in the entering thread context. If process_flag is 407 * SQ_FILL, then we just queue the mblk and return (after signaling 408 * the worker thread if no one else is processing the squeue). 409 * 410 * The ira argument can be used when the count is one. 411 * For a chain the caller needs to prepend any needed mblks from 412 * ip_recv_attr_to_mblk(). 413 */ 414 /* ARGSUSED */ 415 void 416 squeue_enter(squeue_t *sqp, mblk_t *mp, mblk_t *tail, uint32_t cnt, 417 ip_recv_attr_t *ira, int process_flag, uint8_t tag) 418 { 419 conn_t *connp; 420 sqproc_t proc; 421 hrtime_t now; 422 423 ASSERT(sqp != NULL); 424 ASSERT(mp != NULL); 425 ASSERT(tail != NULL); 426 ASSERT(cnt > 0); 427 ASSERT(MUTEX_NOT_HELD(&sqp->sq_lock)); 428 ASSERT(ira == NULL || cnt == 1); 429 430 mutex_enter(&sqp->sq_lock); 431 432 /* 433 * Try to process the packet if SQ_FILL flag is not set and 434 * we are allowed to process the squeue. The SQ_NODRAIN is 435 * ignored if the packet chain consists of more than 1 packet. 436 */ 437 if (!(sqp->sq_state & SQS_PROC) && ((process_flag == SQ_PROCESS) || 438 (process_flag == SQ_NODRAIN && sqp->sq_first == NULL))) { 439 /* 440 * See if anything is already queued. If we are the 441 * first packet, do inline processing else queue the 442 * packet and do the drain. 443 */ 444 if (sqp->sq_first == NULL && cnt == 1) { 445 /* 446 * Fast-path, ok to process and nothing queued. 447 */ 448 sqp->sq_state |= (SQS_PROC|SQS_FAST); 449 sqp->sq_run = curthread; 450 mutex_exit(&sqp->sq_lock); 451 452 /* 453 * We are the chain of 1 packet so 454 * go through this fast path. 455 */ 456 ASSERT(mp->b_prev != NULL); 457 ASSERT(mp->b_queue != NULL); 458 connp = (conn_t *)mp->b_prev; 459 mp->b_prev = NULL; 460 proc = (sqproc_t)mp->b_queue; 461 mp->b_queue = NULL; 462 ASSERT(proc != NULL && connp != NULL); 463 ASSERT(mp->b_next == NULL); 464 465 /* 466 * Handle squeue switching. More details in the 467 * block comment at the top of the file 468 */ 469 if (connp->conn_sqp == sqp) { 470 SQUEUE_DBG_SET(sqp, mp, proc, connp, 471 tag); 472 connp->conn_on_sqp = B_TRUE; 473 DTRACE_PROBE3(squeue__proc__start, squeue_t *, 474 sqp, mblk_t *, mp, conn_t *, connp); 475 (*proc)(connp, mp, sqp, ira); 476 DTRACE_PROBE2(squeue__proc__end, squeue_t *, 477 sqp, conn_t *, connp); 478 connp->conn_on_sqp = B_FALSE; 479 SQUEUE_DBG_CLEAR(sqp); 480 CONN_DEC_REF(connp); 481 } else { 482 SQUEUE_ENTER_ONE(connp->conn_sqp, mp, proc, 483 connp, ira, SQ_FILL, SQTAG_SQUEUE_CHANGE); 484 } 485 ASSERT(MUTEX_NOT_HELD(&sqp->sq_lock)); 486 mutex_enter(&sqp->sq_lock); 487 sqp->sq_state &= ~(SQS_PROC|SQS_FAST); 488 sqp->sq_run = NULL; 489 if (sqp->sq_first == NULL || 490 process_flag == SQ_NODRAIN) { 491 if (sqp->sq_first != NULL) { 492 squeue_worker_wakeup(sqp); 493 return; 494 } 495 /* 496 * We processed inline our packet and nothing 497 * new has arrived. We are done. In case any 498 * control actions are pending, wake up the 499 * worker. 
500 */ 501 if (sqp->sq_state & SQS_WORKER_THR_CONTROL) 502 cv_signal(&sqp->sq_worker_cv); 503 mutex_exit(&sqp->sq_lock); 504 return; 505 } 506 } else { 507 if (ira != NULL) { 508 mblk_t *attrmp; 509 510 ASSERT(cnt == 1); 511 attrmp = ip_recv_attr_to_mblk(ira); 512 if (attrmp == NULL) { 513 mutex_exit(&sqp->sq_lock); 514 ip_drop_input("squeue: " 515 "ip_recv_attr_to_mblk", 516 mp, NULL); 517 /* Caller already set b_prev/b_next */ 518 mp->b_prev = mp->b_next = NULL; 519 freemsg(mp); 520 return; 521 } 522 ASSERT(attrmp->b_cont == NULL); 523 attrmp->b_cont = mp; 524 /* Move connp and func to new */ 525 attrmp->b_queue = mp->b_queue; 526 mp->b_queue = NULL; 527 attrmp->b_prev = mp->b_prev; 528 mp->b_prev = NULL; 529 530 ASSERT(mp == tail); 531 tail = mp = attrmp; 532 } 533 534 ENQUEUE_CHAIN(sqp, mp, tail, cnt); 535 #ifdef DEBUG 536 mp->b_tag = tag; 537 #endif 538 } 539 /* 540 * We are here because either we couldn't do inline 541 * processing (because something was already queued), 542 * or we had a chain of more than one packet, 543 * or something else arrived after we were done with 544 * inline processing. 545 */ 546 ASSERT(MUTEX_HELD(&sqp->sq_lock)); 547 ASSERT(sqp->sq_first != NULL); 548 now = gethrtime(); 549 sqp->sq_run = curthread; 550 sqp->sq_drain(sqp, SQS_ENTER, now + squeue_drain_ns); 551 552 /* 553 * If we didn't do a complete drain, the worker 554 * thread was already signalled by squeue_drain. 555 * In case any control actions are pending, wake 556 * up the worker. 557 */ 558 sqp->sq_run = NULL; 559 if (sqp->sq_state & SQS_WORKER_THR_CONTROL) 560 cv_signal(&sqp->sq_worker_cv); 561 mutex_exit(&sqp->sq_lock); 562 return; 563 } else { 564 /* 565 * We let a thread processing a squeue reenter only 566 * once. This helps the case of incoming connection 567 * where a SYN-ACK-ACK that triggers the conn_ind 568 * doesn't have to queue the packet if listener and 569 * eager are on the same squeue. Also helps the 570 * loopback connection where the two ends are bound 571 * to the same squeue (which is typical on single 572 * CPU machines). 573 * 574 * We let the thread reenter only once for the fear 575 * of stack getting blown with multiple traversal. 576 */ 577 connp = (conn_t *)mp->b_prev; 578 if (!(sqp->sq_state & SQS_REENTER) && 579 (process_flag != SQ_FILL) && (sqp->sq_first == NULL) && 580 (sqp->sq_run == curthread) && (cnt == 1) && 581 (connp->conn_on_sqp == B_FALSE)) { 582 sqp->sq_state |= SQS_REENTER; 583 mutex_exit(&sqp->sq_lock); 584 585 ASSERT(mp->b_prev != NULL); 586 ASSERT(mp->b_queue != NULL); 587 588 mp->b_prev = NULL; 589 proc = (sqproc_t)mp->b_queue; 590 mp->b_queue = NULL; 591 592 /* 593 * Handle squeue switching. More details in the 594 * block comment at the top of the file 595 */ 596 if (connp->conn_sqp == sqp) { 597 connp->conn_on_sqp = B_TRUE; 598 DTRACE_PROBE3(squeue__proc__start, squeue_t *, 599 sqp, mblk_t *, mp, conn_t *, connp); 600 (*proc)(connp, mp, sqp, ira); 601 DTRACE_PROBE2(squeue__proc__end, squeue_t *, 602 sqp, conn_t *, connp); 603 connp->conn_on_sqp = B_FALSE; 604 CONN_DEC_REF(connp); 605 } else { 606 SQUEUE_ENTER_ONE(connp->conn_sqp, mp, proc, 607 connp, ira, SQ_FILL, SQTAG_SQUEUE_CHANGE); 608 } 609 610 mutex_enter(&sqp->sq_lock); 611 sqp->sq_state &= ~SQS_REENTER; 612 mutex_exit(&sqp->sq_lock); 613 return; 614 } 615 616 /* 617 * Queue is already being processed or there is already 618 * one or more paquets on the queue. Enqueue the 619 * packet and wakeup the squeue worker thread if the 620 * squeue is not being processed. 
621 */ 622 #ifdef DEBUG 623 mp->b_tag = tag; 624 #endif 625 if (ira != NULL) { 626 mblk_t *attrmp; 627 628 ASSERT(cnt == 1); 629 attrmp = ip_recv_attr_to_mblk(ira); 630 if (attrmp == NULL) { 631 mutex_exit(&sqp->sq_lock); 632 ip_drop_input("squeue: ip_recv_attr_to_mblk", 633 mp, NULL); 634 /* Caller already set b_prev/b_next */ 635 mp->b_prev = mp->b_next = NULL; 636 freemsg(mp); 637 return; 638 } 639 ASSERT(attrmp->b_cont == NULL); 640 attrmp->b_cont = mp; 641 /* Move connp and func to new */ 642 attrmp->b_queue = mp->b_queue; 643 mp->b_queue = NULL; 644 attrmp->b_prev = mp->b_prev; 645 mp->b_prev = NULL; 646 647 ASSERT(mp == tail); 648 tail = mp = attrmp; 649 } 650 ENQUEUE_CHAIN(sqp, mp, tail, cnt); 651 if (!(sqp->sq_state & SQS_PROC)) { 652 squeue_worker_wakeup(sqp); 653 return; 654 } 655 /* 656 * In case any control actions are pending, wake 657 * up the worker. 658 */ 659 if (sqp->sq_state & SQS_WORKER_THR_CONTROL) 660 cv_signal(&sqp->sq_worker_cv); 661 mutex_exit(&sqp->sq_lock); 662 return; 663 } 664 } 665 666 /* 667 * PRIVATE FUNCTIONS 668 */ 669 670 static void 671 squeue_fire(void *arg) 672 { 673 squeue_t *sqp = arg; 674 uint_t state; 675 676 mutex_enter(&sqp->sq_lock); 677 678 state = sqp->sq_state; 679 if (sqp->sq_tid == 0 && !(state & SQS_TMO_PROG)) { 680 mutex_exit(&sqp->sq_lock); 681 return; 682 } 683 684 sqp->sq_tid = 0; 685 /* 686 * The timeout fired before we got a chance to set it. 687 * Process it anyway but remove the SQS_TMO_PROG so that 688 * the guy trying to set the timeout knows that it has 689 * already been processed. 690 */ 691 if (state & SQS_TMO_PROG) 692 sqp->sq_state &= ~SQS_TMO_PROG; 693 694 if (!(state & SQS_PROC)) { 695 sqp->sq_awaken = ddi_get_lbolt(); 696 cv_signal(&sqp->sq_worker_cv); 697 } 698 mutex_exit(&sqp->sq_lock); 699 } 700 701 static void 702 squeue_drain(squeue_t *sqp, uint_t proc_type, hrtime_t expire) 703 { 704 mblk_t *mp; 705 mblk_t *head; 706 sqproc_t proc; 707 conn_t *connp; 708 timeout_id_t tid; 709 ill_rx_ring_t *sq_rx_ring = sqp->sq_rx_ring; 710 hrtime_t now; 711 boolean_t did_wakeup = B_FALSE; 712 boolean_t sq_poll_capable; 713 ip_recv_attr_t *ira, iras; 714 715 sq_poll_capable = (sqp->sq_state & SQS_POLL_CAPAB) != 0; 716 again: 717 ASSERT(mutex_owned(&sqp->sq_lock)); 718 ASSERT(!(sqp->sq_state & (SQS_POLL_THR_QUIESCED | 719 SQS_POLL_QUIESCE_DONE))); 720 721 head = sqp->sq_first; 722 sqp->sq_first = NULL; 723 sqp->sq_last = NULL; 724 sqp->sq_count = 0; 725 726 if ((tid = sqp->sq_tid) != 0) 727 sqp->sq_tid = 0; 728 729 sqp->sq_state |= SQS_PROC | proc_type; 730 731 /* 732 * We have backlog built up. Switch to polling mode if the 733 * device underneath allows it. Need to do it so that 734 * more packets don't come in and disturb us (by contending 735 * for sq_lock or higher priority thread preempting us). 736 * 737 * The worker thread is allowed to do active polling while we 738 * just disable the interrupts for drain by non worker (kernel 739 * or userland) threads so they can peacefully process the 740 * packets during time allocated to them. 741 */ 742 SQS_POLLING_ON(sqp, sq_poll_capable, sq_rx_ring); 743 mutex_exit(&sqp->sq_lock); 744 745 if (tid != 0) 746 (void) untimeout(tid); 747 748 while ((mp = head) != NULL) { 749 750 head = mp->b_next; 751 mp->b_next = NULL; 752 753 proc = (sqproc_t)mp->b_queue; 754 mp->b_queue = NULL; 755 connp = (conn_t *)mp->b_prev; 756 mp->b_prev = NULL; 757 758 /* Is there an ip_recv_attr_t to handle? 
*/ 759 if (ip_recv_attr_is_mblk(mp)) { 760 mblk_t *attrmp = mp; 761 762 ASSERT(attrmp->b_cont != NULL); 763 764 mp = attrmp->b_cont; 765 attrmp->b_cont = NULL; 766 ASSERT(mp->b_queue == NULL); 767 ASSERT(mp->b_prev == NULL); 768 769 if (!ip_recv_attr_from_mblk(attrmp, &iras)) { 770 /* The ill or ip_stack_t disappeared on us */ 771 ip_drop_input("ip_recv_attr_from_mblk", 772 mp, NULL); 773 ira_cleanup(&iras, B_TRUE); 774 CONN_DEC_REF(connp); 775 continue; 776 } 777 ira = &iras; 778 } else { 779 ira = NULL; 780 } 781 782 783 /* 784 * Handle squeue switching. More details in the 785 * block comment at the top of the file 786 */ 787 if (connp->conn_sqp == sqp) { 788 SQUEUE_DBG_SET(sqp, mp, proc, connp, 789 mp->b_tag); 790 connp->conn_on_sqp = B_TRUE; 791 DTRACE_PROBE3(squeue__proc__start, squeue_t *, 792 sqp, mblk_t *, mp, conn_t *, connp); 793 (*proc)(connp, mp, sqp, ira); 794 DTRACE_PROBE2(squeue__proc__end, squeue_t *, 795 sqp, conn_t *, connp); 796 connp->conn_on_sqp = B_FALSE; 797 CONN_DEC_REF(connp); 798 } else { 799 SQUEUE_ENTER_ONE(connp->conn_sqp, mp, proc, connp, ira, 800 SQ_FILL, SQTAG_SQUEUE_CHANGE); 801 } 802 if (ira != NULL) 803 ira_cleanup(ira, B_TRUE); 804 } 805 806 SQUEUE_DBG_CLEAR(sqp); 807 808 mutex_enter(&sqp->sq_lock); 809 810 /* 811 * Check if there is still work to do (either more arrived or timer 812 * expired). If we are the worker thread and we are polling capable, 813 * continue doing the work since no one else is around to do the 814 * work anyway (but signal the poll thread to retrieve some packets 815 * in the meanwhile). If we are not the worker thread, just 816 * signal the worker thread to take up the work if processing time 817 * has expired. 818 */ 819 if (sqp->sq_first != NULL) { 820 /* 821 * Still more to process. If time quanta not expired, we 822 * should let the drain go on. The worker thread is allowed 823 * to drain as long as there is anything left. 824 */ 825 now = gethrtime(); 826 if ((now < expire) || (proc_type == SQS_WORKER)) { 827 /* 828 * If time not expired or we are worker thread and 829 * this squeue is polling capable, continue to do 830 * the drain. 831 * 832 * We turn off interrupts for all userland threads 833 * doing drain but we do active polling only for 834 * worker thread. 835 * 836 * Calling SQS_POLL_RING() even in the case of 837 * SQS_POLLING_ON() not succeeding is ok as 838 * SQS_POLL_RING() will not wake up poll thread 839 * if SQS_POLLING bit is not set. 840 */ 841 if (proc_type == SQS_WORKER) 842 SQS_POLL_RING(sqp); 843 goto again; 844 } else { 845 did_wakeup = B_TRUE; 846 sqp->sq_awaken = ddi_get_lbolt(); 847 cv_signal(&sqp->sq_worker_cv); 848 } 849 } 850 851 /* 852 * If the poll thread is already running, just return. The 853 * poll thread continues to hold the proc and will finish 854 * processing. 855 */ 856 if (sqp->sq_state & SQS_GET_PKTS) { 857 ASSERT(!(sqp->sq_state & (SQS_POLL_THR_QUIESCED | 858 SQS_POLL_QUIESCE_DONE))); 859 sqp->sq_state &= ~proc_type; 860 return; 861 } 862 863 /* 864 * 865 * If we are the worker thread and no work is left, send the poll 866 * thread down once more to see if something arrived. Otherwise, 867 * turn the interrupts back on and we are done. 868 */ 869 if ((proc_type == SQS_WORKER) && (sqp->sq_state & SQS_POLLING)) { 870 /* 871 * Do one last check to see if anything arrived 872 * in the NIC. We leave the SQS_PROC set to ensure 873 * that poll thread keeps the PROC and can decide 874 * if it needs to turn polling off or continue 875 * processing. 
		 *
		 * If we drop the SQS_PROC here and the poll thread comes
		 * up empty handed, it cannot safely turn polling off
		 * since someone else could have acquired the PROC
		 * and started draining. The previously running poll
		 * thread and the current thread doing drain would end
		 * up in a race for turning polling on/off and more
		 * complex code would be required to deal with it.
		 *
		 * It is a lot simpler for drain to hand the SQS_PROC to the
		 * poll thread (if running) and let the poll thread finish
		 * without worrying about racing with any other thread.
		 */
		ASSERT(!(sqp->sq_state & (SQS_POLL_THR_QUIESCED |
		    SQS_POLL_QUIESCE_DONE)));
		SQS_POLL_RING(sqp);
		sqp->sq_state &= ~proc_type;
	} else {
		/*
		 * The squeue is either not capable of polling or the
		 * attempt to blank (i.e., turn SQS_POLLING_ON()) was
		 * unsuccessful or the poll thread already finished
		 * processing and didn't find anything. Since there
		 * is nothing queued and we already turned polling on
		 * (for all threads doing drain), we should turn
		 * polling off and relinquish the PROC.
		 */
		ASSERT(!(sqp->sq_state & (SQS_POLL_THR_QUIESCED |
		    SQS_POLL_QUIESCE_DONE)));
		SQS_POLLING_OFF(sqp, sq_poll_capable, sq_rx_ring);
		sqp->sq_state &= ~(SQS_PROC | proc_type);
		if (!did_wakeup && sqp->sq_first != NULL) {
			squeue_worker_wakeup(sqp);
			mutex_enter(&sqp->sq_lock);
		}
		/*
		 * If we are not the worker and there is a pending quiesce
		 * event, wake up the worker
		 */
		if ((proc_type != SQS_WORKER) &&
		    (sqp->sq_state & SQS_WORKER_THR_CONTROL))
			cv_signal(&sqp->sq_worker_cv);
	}
}

/*
 * Quiesce, Restart, or Cleanup of the squeue poll thread.
 *
 * Quiesce and Restart: After an squeue poll thread has been quiesced, it does
 * not attempt to poll the underlying soft ring any more. The quiesce is
 * triggered by the mac layer when it wants to quiesce a soft ring. Typically
 * control operations such as changing the fanout of a NIC or VNIC (dladm
 * setlinkprop) need to quiesce data flow before changing the wiring.
 * The operation is done by the mac layer, but it calls back into IP to
 * quiesce the soft ring. After completing the operation (say an increase or
 * decrease of the fanout) the mac layer then calls back into IP to restart
 * the quiesced soft ring.
 *
 * Cleanup: This is triggered when the squeue binding to a soft ring is
 * removed permanently. Typically interface plumb and unplumb would trigger
 * this. It can also be triggered from the mac layer when a soft ring is
 * being deleted, say as the result of a fanout reduction. Since squeues are
 * never deleted, the cleanup marks the squeue as fit for recycling and
 * moves it to the zeroth squeue set.
 */
static void
squeue_poll_thr_control(squeue_t *sqp)
{
	if (sqp->sq_state & SQS_POLL_THR_RESTART) {
		/* Restart implies a previous quiesce */
		ASSERT(sqp->sq_state & SQS_POLL_THR_QUIESCED);
		sqp->sq_state &= ~(SQS_POLL_THR_QUIESCED |
		    SQS_POLL_THR_RESTART);
		sqp->sq_state |= SQS_POLL_CAPAB;
		cv_signal(&sqp->sq_worker_cv);
		return;
	}

	if (sqp->sq_state & SQS_POLL_THR_QUIESCE) {
		sqp->sq_state |= SQS_POLL_THR_QUIESCED;
		sqp->sq_state &= ~SQS_POLL_THR_QUIESCE;
		cv_signal(&sqp->sq_worker_cv);
		return;
	}
}

/*
 * POLLING Notes
 *
 * With polling mode, we want to do as much processing as we possibly can
 * in worker thread context. The sweet spot is when the worker thread keeps
 * doing work all the time in polling mode and writers etc. keep dumping
 * packets to the worker thread. Occasionally, we send the poll thread
 * (running at lower priority) down to the NIC to get a chain of packets to
 * feed to the worker. Sending the poll thread down to the NIC depends on
 * three criteria:
 *
 * 1) It is always driven from squeue_drain and only if the worker thread
 *	is doing the drain.
 * 2) We cleared the backlog once and more packets arrived in between.
 *	Before starting the drain again, send the poll thread down if
 *	the drain is being done by the worker thread.
 * 3) Before exiting squeue_drain, if the poll thread is not already
 *	working and we are the worker thread, try to poll one more time.
 *
 * For latency's sake, we do allow any thread calling squeue_enter
 * to process its packet provided:
 *
 * 1) Nothing is queued
 * 2) If more packets arrived in between, the non-worker threads are allowed
 *	to do the drain until their time quantum expires, provided
 *	SQS_GET_PKTS wasn't set in between.
 *
 * Avoiding deadlocks with interrupts
 * ==================================
 *
 * One of the big problems is that we can't send the poll thread down while
 * holding the sq_lock since the thread can block. So we drop the sq_lock
 * before calling sq_get_pkts(). We keep holding the SQS_PROC as long as the
 * poll thread is running so that no other thread can acquire the
 * perimeter in between. If the squeue_drain gets done (no more work
 * left), it leaves the SQS_PROC set if the poll thread is running.
 */
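/*
 * Illustrative sketch only (hypothetical driver glue, not part of this
 * file): the polling handshake described above assumes an ill_rx_ring_t
 * whose callbacks behave roughly as follows. The xx_ names are made up;
 * only the shapes match the way rr_intr_disable, rr_intr_enable and rr_rx
 * are invoked by SQS_POLLING_ON(), SQS_POLLING_OFF() and the poll thread.
 *
 *	static boolean_t
 *	xx_rx_intr_disable(void *intr_handle)
 *	{
 *		// Mask the ring's Rx interrupt; returning B_TRUE means the
 *		// blanking succeeded and SQS_POLLING may be set.
 *		return (B_TRUE);
 *	}
 *
 *	static void
 *	xx_rx_intr_enable(void *intr_handle)
 *	{
 *		// Unmask the Rx interrupt; delivery reverts to interrupts.
 *	}
 *
 *	static mblk_t *
 *	xx_rx_poll(void *rx_handle, size_t bytes_to_pickup)
 *	{
 *		// Return up to bytes_to_pickup bytes of packets as a
 *		// b_next-linked mblk chain, or NULL if the ring is empty.
 *		return (NULL);
 *	}
 */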
/*
 * This is the squeue poll thread. In poll mode, it polls the underlying
 * TCP softring and feeds packets into the squeue. The worker thread then
 * drains the squeue. The poll thread also responds to control signals for
 * quiescing, restarting, or cleanup of an squeue. These are driven by
 * control operations like plumb/unplumb or as a result of dynamic Rx ring
 * related operations that are driven from the mac layer.
 */
static void
squeue_polling_thread(squeue_t *sqp)
{
	kmutex_t *lock = &sqp->sq_lock;
	kcondvar_t *async = &sqp->sq_poll_cv;
	ip_mac_rx_t sq_get_pkts;
	ip_accept_t ip_accept;
	ill_rx_ring_t *sq_rx_ring;
	ill_t *sq_ill;
	mblk_t *head, *tail, *mp;
	uint_t cnt;
	void *sq_mac_handle;
	callb_cpr_t cprinfo;
	size_t bytes_to_pickup;
	uint32_t ctl_state;

	CALLB_CPR_INIT(&cprinfo, lock, callb_generic_cpr, "sq_poll");
	mutex_enter(lock);

	for (;;) {
		CALLB_CPR_SAFE_BEGIN(&cprinfo);
		cv_wait(async, lock);
		CALLB_CPR_SAFE_END(&cprinfo, lock);

		ctl_state = sqp->sq_state & (SQS_POLL_THR_CONTROL |
		    SQS_POLL_THR_QUIESCED);
		if (ctl_state != 0) {
			/*
			 * If the squeue is quiesced, then wait for a control
			 * request. A quiesced squeue must not poll the
			 * underlying soft ring.
1038 */ 1039 if (ctl_state == SQS_POLL_THR_QUIESCED) 1040 continue; 1041 /* 1042 * Act on control requests to quiesce, cleanup or 1043 * restart an squeue 1044 */ 1045 squeue_poll_thr_control(sqp); 1046 continue; 1047 } 1048 1049 if (!(sqp->sq_state & SQS_POLL_CAPAB)) 1050 continue; 1051 1052 ASSERT((sqp->sq_state & 1053 (SQS_PROC|SQS_POLLING|SQS_GET_PKTS)) == 1054 (SQS_PROC|SQS_POLLING|SQS_GET_PKTS)); 1055 1056 poll_again: 1057 sq_rx_ring = sqp->sq_rx_ring; 1058 sq_get_pkts = sq_rx_ring->rr_rx; 1059 sq_mac_handle = sq_rx_ring->rr_rx_handle; 1060 ip_accept = sq_rx_ring->rr_ip_accept; 1061 sq_ill = sq_rx_ring->rr_ill; 1062 bytes_to_pickup = MAX_BYTES_TO_PICKUP; 1063 mutex_exit(lock); 1064 head = sq_get_pkts(sq_mac_handle, bytes_to_pickup); 1065 mp = NULL; 1066 if (head != NULL) { 1067 /* 1068 * We got the packet chain from the mac layer. It 1069 * would be nice to be able to process it inline 1070 * for better performance but we need to give 1071 * IP a chance to look at this chain to ensure 1072 * that packets are really meant for this squeue 1073 * and do the IP processing. 1074 */ 1075 mp = ip_accept(sq_ill, sq_rx_ring, sqp, head, 1076 &tail, &cnt); 1077 } 1078 mutex_enter(lock); 1079 if (mp != NULL) { 1080 /* 1081 * The ip_accept function has already added an 1082 * ip_recv_attr_t mblk if that is needed. 1083 */ 1084 ENQUEUE_CHAIN(sqp, mp, tail, cnt); 1085 } 1086 ASSERT((sqp->sq_state & 1087 (SQS_PROC|SQS_POLLING|SQS_GET_PKTS)) == 1088 (SQS_PROC|SQS_POLLING|SQS_GET_PKTS)); 1089 1090 if (sqp->sq_first != NULL && !(sqp->sq_state & SQS_WORKER)) { 1091 /* 1092 * We have packets to process and worker thread 1093 * is not running. Check to see if poll thread is 1094 * allowed to process. Let it do processing only if it 1095 * picked up some packets from the NIC otherwise 1096 * wakeup the worker thread. 1097 */ 1098 if (mp != NULL) { 1099 hrtime_t now; 1100 1101 now = gethrtime(); 1102 sqp->sq_run = curthread; 1103 sqp->sq_drain(sqp, SQS_POLL_PROC, now + 1104 squeue_drain_ns); 1105 sqp->sq_run = NULL; 1106 1107 if (sqp->sq_first == NULL) 1108 goto poll_again; 1109 1110 /* 1111 * Couldn't do the entire drain because the 1112 * time limit expired, let the 1113 * worker thread take over. 1114 */ 1115 } 1116 1117 sqp->sq_awaken = ddi_get_lbolt(); 1118 /* 1119 * Put the SQS_PROC_HELD on so the worker 1120 * thread can distinguish where its called from. We 1121 * can remove the SQS_PROC flag here and turn off the 1122 * polling so that it wouldn't matter who gets the 1123 * processing but we get better performance this way 1124 * and save the cost of turn polling off and possibly 1125 * on again as soon as we start draining again. 1126 * 1127 * We can't remove the SQS_PROC flag without turning 1128 * polling off until we can guarantee that control 1129 * will return to squeue_drain immediately. 1130 */ 1131 sqp->sq_state |= SQS_PROC_HELD; 1132 sqp->sq_state &= ~SQS_GET_PKTS; 1133 cv_signal(&sqp->sq_worker_cv); 1134 } else if (sqp->sq_first == NULL && 1135 !(sqp->sq_state & SQS_WORKER)) { 1136 /* 1137 * Nothing queued and worker thread not running. 1138 * Since we hold the proc, no other thread is 1139 * processing the squeue. This means that there 1140 * is no work to be done and nothing is queued 1141 * in squeue or in NIC. Turn polling off and go 1142 * back to interrupt mode. 
1143 */ 1144 sqp->sq_state &= ~(SQS_PROC|SQS_GET_PKTS); 1145 /* LINTED: constant in conditional context */ 1146 SQS_POLLING_OFF(sqp, B_TRUE, sq_rx_ring); 1147 1148 /* 1149 * If there is a pending control operation 1150 * wake up the worker, since it is currently 1151 * not running. 1152 */ 1153 if (sqp->sq_state & SQS_WORKER_THR_CONTROL) 1154 cv_signal(&sqp->sq_worker_cv); 1155 } else { 1156 /* 1157 * Worker thread is already running. We don't need 1158 * to do anything. Indicate that poll thread is done. 1159 */ 1160 sqp->sq_state &= ~SQS_GET_PKTS; 1161 } 1162 if (sqp->sq_state & SQS_POLL_THR_CONTROL) { 1163 /* 1164 * Act on control requests to quiesce, cleanup or 1165 * restart an squeue 1166 */ 1167 squeue_poll_thr_control(sqp); 1168 } 1169 } 1170 } 1171 1172 /* 1173 * The squeue worker thread acts on any control requests to quiesce, cleanup 1174 * or restart an ill_rx_ring_t by calling this function. The worker thread 1175 * synchronizes with the squeue poll thread to complete the request and finally 1176 * wakes up the requestor when the request is completed. 1177 */ 1178 static void 1179 squeue_worker_thr_control(squeue_t *sqp) 1180 { 1181 ill_t *ill; 1182 ill_rx_ring_t *rx_ring; 1183 1184 ASSERT(MUTEX_HELD(&sqp->sq_lock)); 1185 1186 if (sqp->sq_state & SQS_POLL_RESTART) { 1187 /* Restart implies a previous quiesce. */ 1188 ASSERT((sqp->sq_state & (SQS_PROC_HELD | 1189 SQS_POLL_QUIESCE_DONE | SQS_PROC | SQS_WORKER)) == 1190 (SQS_POLL_QUIESCE_DONE | SQS_PROC | SQS_WORKER)); 1191 /* 1192 * Request the squeue poll thread to restart and wait till 1193 * it actually restarts. 1194 */ 1195 sqp->sq_state &= ~SQS_POLL_QUIESCE_DONE; 1196 sqp->sq_state |= SQS_POLL_THR_RESTART; 1197 cv_signal(&sqp->sq_poll_cv); 1198 while (sqp->sq_state & SQS_POLL_THR_QUIESCED) 1199 cv_wait(&sqp->sq_worker_cv, &sqp->sq_lock); 1200 sqp->sq_state &= ~(SQS_POLL_RESTART | SQS_PROC | 1201 SQS_WORKER); 1202 /* 1203 * Signal any waiter that is waiting for the restart 1204 * to complete 1205 */ 1206 sqp->sq_state |= SQS_POLL_RESTART_DONE; 1207 cv_signal(&sqp->sq_ctrlop_done_cv); 1208 return; 1209 } 1210 1211 if (sqp->sq_state & SQS_PROC_HELD) { 1212 /* The squeue poll thread handed control to us */ 1213 ASSERT(sqp->sq_state & SQS_PROC); 1214 } 1215 1216 /* 1217 * Prevent any other thread from processing the squeue 1218 * until we finish the control actions by setting SQS_PROC. 1219 * But allow ourself to reenter by setting SQS_WORKER 1220 */ 1221 sqp->sq_state |= (SQS_PROC | SQS_WORKER); 1222 1223 /* Signal the squeue poll thread and wait for it to quiesce itself */ 1224 if (!(sqp->sq_state & SQS_POLL_THR_QUIESCED)) { 1225 sqp->sq_state |= SQS_POLL_THR_QUIESCE; 1226 cv_signal(&sqp->sq_poll_cv); 1227 while (!(sqp->sq_state & SQS_POLL_THR_QUIESCED)) 1228 cv_wait(&sqp->sq_worker_cv, &sqp->sq_lock); 1229 } 1230 1231 rx_ring = sqp->sq_rx_ring; 1232 ill = rx_ring->rr_ill; 1233 /* 1234 * The lock hierarchy is as follows. 1235 * cpu_lock -> ill_lock -> sqset_lock -> sq_lock 1236 */ 1237 mutex_exit(&sqp->sq_lock); 1238 mutex_enter(&ill->ill_lock); 1239 mutex_enter(&sqp->sq_lock); 1240 1241 SQS_POLLING_OFF(sqp, (sqp->sq_state & SQS_POLL_CAPAB) != 0, 1242 sqp->sq_rx_ring); 1243 sqp->sq_state &= ~(SQS_POLL_CAPAB | SQS_GET_PKTS | SQS_PROC_HELD); 1244 if (sqp->sq_state & SQS_POLL_CLEANUP) { 1245 /* 1246 * Disassociate this squeue from its ill_rx_ring_t. 1247 * The rr_sqp, sq_rx_ring fields are protected by the 1248 * corresponding squeue, ill_lock* and sq_lock. 
Holding any 1249 * of them will ensure that the ring to squeue mapping does 1250 * not change. 1251 */ 1252 ASSERT(!(sqp->sq_state & SQS_DEFAULT)); 1253 1254 sqp->sq_rx_ring = NULL; 1255 rx_ring->rr_sqp = NULL; 1256 1257 sqp->sq_state &= ~(SQS_POLL_CLEANUP | SQS_POLL_THR_QUIESCED | 1258 SQS_POLL_QUIESCE_DONE); 1259 sqp->sq_ill = NULL; 1260 1261 rx_ring->rr_rx_handle = NULL; 1262 rx_ring->rr_intr_handle = NULL; 1263 rx_ring->rr_intr_enable = NULL; 1264 rx_ring->rr_intr_disable = NULL; 1265 sqp->sq_state |= SQS_POLL_CLEANUP_DONE; 1266 } else { 1267 sqp->sq_state &= ~SQS_POLL_QUIESCE; 1268 sqp->sq_state |= SQS_POLL_QUIESCE_DONE; 1269 } 1270 /* 1271 * Signal any waiter that is waiting for the quiesce or cleanup 1272 * to complete and also wait for it to actually see and reset the 1273 * SQS_POLL_CLEANUP_DONE. 1274 */ 1275 cv_signal(&sqp->sq_ctrlop_done_cv); 1276 mutex_exit(&ill->ill_lock); 1277 if (sqp->sq_state & SQS_POLL_CLEANUP_DONE) { 1278 cv_wait(&sqp->sq_worker_cv, &sqp->sq_lock); 1279 sqp->sq_state &= ~(SQS_PROC | SQS_WORKER); 1280 } 1281 } 1282 1283 static void 1284 squeue_worker(squeue_t *sqp) 1285 { 1286 kmutex_t *lock = &sqp->sq_lock; 1287 kcondvar_t *async = &sqp->sq_worker_cv; 1288 callb_cpr_t cprinfo; 1289 hrtime_t now; 1290 1291 CALLB_CPR_INIT(&cprinfo, lock, callb_generic_cpr, "sq_worker"); 1292 mutex_enter(lock); 1293 1294 for (;;) { 1295 for (;;) { 1296 /* 1297 * If the poll thread has handed control to us 1298 * we need to break out of the wait. 1299 */ 1300 if (sqp->sq_state & SQS_PROC_HELD) 1301 break; 1302 1303 /* 1304 * If the squeue is not being processed and we either 1305 * have messages to drain or some thread has signaled 1306 * some control activity we need to break 1307 */ 1308 if (!(sqp->sq_state & SQS_PROC) && 1309 ((sqp->sq_state & SQS_WORKER_THR_CONTROL) || 1310 (sqp->sq_first != NULL))) 1311 break; 1312 1313 /* 1314 * If we have started some control action, then check 1315 * for the SQS_WORKER flag (since we don't 1316 * release the squeue) to make sure we own the squeue 1317 * and break out 1318 */ 1319 if ((sqp->sq_state & SQS_WORKER_THR_CONTROL) && 1320 (sqp->sq_state & SQS_WORKER)) 1321 break; 1322 1323 CALLB_CPR_SAFE_BEGIN(&cprinfo); 1324 cv_wait(async, lock); 1325 CALLB_CPR_SAFE_END(&cprinfo, lock); 1326 } 1327 if (sqp->sq_state & SQS_WORKER_THR_CONTROL) { 1328 squeue_worker_thr_control(sqp); 1329 continue; 1330 } 1331 ASSERT(!(sqp->sq_state & (SQS_POLL_THR_QUIESCED | 1332 SQS_POLL_CLEANUP_DONE | SQS_POLL_QUIESCE_DONE | 1333 SQS_WORKER_THR_CONTROL | SQS_POLL_THR_CONTROL))); 1334 1335 if (sqp->sq_state & SQS_PROC_HELD) 1336 sqp->sq_state &= ~SQS_PROC_HELD; 1337 1338 now = gethrtime(); 1339 sqp->sq_run = curthread; 1340 sqp->sq_drain(sqp, SQS_WORKER, now + squeue_drain_ns); 1341 sqp->sq_run = NULL; 1342 } 1343 } 1344 1345 uintptr_t * 1346 squeue_getprivate(squeue_t *sqp, sqprivate_t p) 1347 { 1348 ASSERT(p < SQPRIVATE_MAX); 1349 1350 return (&sqp->sq_private[p]); 1351 } 1352 1353 /* ARGSUSED */ 1354 void 1355 squeue_wakeup_conn(void *arg, mblk_t *mp, void *arg2, ip_recv_attr_t *dummy) 1356 { 1357 conn_t *connp = (conn_t *)arg; 1358 squeue_t *sqp = connp->conn_sqp; 1359 1360 /* 1361 * Mark the squeue as paused before waking up the thread stuck 1362 * in squeue_synch_enter(). 1363 */ 1364 mutex_enter(&sqp->sq_lock); 1365 sqp->sq_state |= SQS_PAUSE; 1366 1367 /* 1368 * Notify the thread that it's OK to proceed; that is done by 1369 * clearing the MSGWAITSYNC flag. The synch thread will free the mblk. 
1370 */ 1371 ASSERT(mp->b_flag & MSGWAITSYNC); 1372 mp->b_flag &= ~MSGWAITSYNC; 1373 cv_broadcast(&connp->conn_sq_cv); 1374 1375 /* 1376 * We are doing something on behalf of another thread, so we have to 1377 * pause and wait until it finishes. 1378 */ 1379 while (sqp->sq_state & SQS_PAUSE) { 1380 cv_wait(&sqp->sq_synch_cv, &sqp->sq_lock); 1381 } 1382 mutex_exit(&sqp->sq_lock); 1383 } 1384 1385 int 1386 squeue_synch_enter(conn_t *connp, mblk_t *use_mp) 1387 { 1388 squeue_t *sqp; 1389 1390 again: 1391 sqp = connp->conn_sqp; 1392 1393 mutex_enter(&sqp->sq_lock); 1394 if (sqp->sq_first == NULL && !(sqp->sq_state & SQS_PROC)) { 1395 /* 1396 * We are OK to proceed if the squeue is empty, and 1397 * no one owns the squeue. 1398 * 1399 * The caller won't own the squeue as this is called from the 1400 * application. 1401 */ 1402 ASSERT(sqp->sq_run == NULL); 1403 1404 sqp->sq_state |= SQS_PROC; 1405 sqp->sq_run = curthread; 1406 mutex_exit(&sqp->sq_lock); 1407 1408 /* 1409 * Handle squeue switching. The conn's squeue can only change 1410 * while there is a thread in the squeue, which is why we do 1411 * the check after entering the squeue. If it has changed, exit 1412 * this squeue and redo everything with the new sqeueue. 1413 */ 1414 if (sqp != connp->conn_sqp) { 1415 mutex_enter(&sqp->sq_lock); 1416 sqp->sq_state &= ~SQS_PROC; 1417 sqp->sq_run = NULL; 1418 mutex_exit(&sqp->sq_lock); 1419 goto again; 1420 } 1421 #if SQUEUE_DEBUG 1422 sqp->sq_curmp = NULL; 1423 sqp->sq_curproc = NULL; 1424 sqp->sq_connp = connp; 1425 #endif 1426 connp->conn_on_sqp = B_TRUE; 1427 return (0); 1428 } else { 1429 mblk_t *mp; 1430 1431 mp = (use_mp == NULL) ? allocb(0, BPRI_MED) : use_mp; 1432 if (mp == NULL) { 1433 mutex_exit(&sqp->sq_lock); 1434 return (ENOMEM); 1435 } 1436 1437 /* 1438 * We mark the mblk as awaiting synchronous squeue access 1439 * by setting the MSGWAITSYNC flag. Once squeue_wakeup_conn 1440 * fires, MSGWAITSYNC is cleared, at which point we know we 1441 * have exclusive access. 1442 */ 1443 mp->b_flag |= MSGWAITSYNC; 1444 1445 CONN_INC_REF(connp); 1446 SET_SQUEUE(mp, squeue_wakeup_conn, connp); 1447 ENQUEUE_CHAIN(sqp, mp, mp, 1); 1448 1449 ASSERT(sqp->sq_run != curthread); 1450 1451 /* Wait until the enqueued mblk get processed. */ 1452 while (mp->b_flag & MSGWAITSYNC) 1453 cv_wait(&connp->conn_sq_cv, &sqp->sq_lock); 1454 mutex_exit(&sqp->sq_lock); 1455 1456 if (use_mp == NULL) 1457 freeb(mp); 1458 1459 return (0); 1460 } 1461 } 1462 1463 void 1464 squeue_synch_exit(conn_t *connp) 1465 { 1466 squeue_t *sqp = connp->conn_sqp; 1467 1468 mutex_enter(&sqp->sq_lock); 1469 if (sqp->sq_run == curthread) { 1470 ASSERT(sqp->sq_state & SQS_PROC); 1471 1472 sqp->sq_state &= ~SQS_PROC; 1473 sqp->sq_run = NULL; 1474 connp->conn_on_sqp = B_FALSE; 1475 1476 if (sqp->sq_first == NULL) { 1477 mutex_exit(&sqp->sq_lock); 1478 } else { 1479 /* 1480 * If this was a normal thread, then it would 1481 * (most likely) continue processing the pending 1482 * requests. Since the just completed operation 1483 * was executed synchronously, the thread should 1484 * not be delayed. To compensate, wake up the 1485 * worker thread right away when there are outstanding 1486 * requests. 1487 */ 1488 sqp->sq_awaken = ddi_get_lbolt(); 1489 cv_signal(&sqp->sq_worker_cv); 1490 mutex_exit(&sqp->sq_lock); 1491 } 1492 } else { 1493 /* 1494 * The caller doesn't own the squeue, clear the SQS_PAUSE flag, 1495 * and wake up the squeue owner, such that owner can continue 1496 * processing. 
1497 */ 1498 ASSERT(sqp->sq_state & SQS_PAUSE); 1499 sqp->sq_state &= ~SQS_PAUSE; 1500 1501 /* There should be only one thread blocking on sq_synch_cv. */ 1502 cv_signal(&sqp->sq_synch_cv); 1503 mutex_exit(&sqp->sq_lock); 1504 } 1505 } 1506