/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License (the "License").
 * You may not use this file except in compliance with the License.
 *
 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
 * or http://www.opensolaris.org/os/licensing.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
 * If applicable, add the following below this CDDL HEADER, with the
 * fields enclosed by brackets "[]" replaced with your own identifying
 * information: Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 */
/*
 * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
 */

/*
 * Copyright 2017 Joyent, Inc.
 * Copyright 2026 Oxide Computer Company
 */

/*
 * Squeues: General purpose serialization mechanism
 * ------------------------------------------------
 *
 * Background:
 * -----------
 *
 * This is a general purpose high-performance serialization mechanism
 * currently used by TCP/IP. It is implemented by means of a per-CPU queue,
 * a worker thread and a polling thread which are bound to the CPU
 * associated with the squeue. The squeue is strictly FIFO for both read
 * and write side and only one thread can process it at any given time.
 * The design goal of squeue was to offer a very high degree of
 * parallelization (on a per H/W execution pipeline basis) with at
 * most one queuing.
 *
 * The modules needing protection typically call the SQUEUE_ENTER_ONE() or
 * SQUEUE_ENTER() macro as soon as a thread enters the module
 * from either direction. For each packet, the processing function
 * and argument is stored in the mblk itself.
 * When the packet is ready
 * to be processed, the squeue retrieves the stored function and calls
 * it with the supplied argument and the pointer to the packet itself.
 * The called function can assume that no other thread is processing
 * the squeue when it is executing.
 *
 * Squeue/connection binding:
 * --------------------------
 *
 * TCP/IP uses an IP classifier in conjunction with squeue where specific
 * connections are assigned to specific squeues (based on various policies)
 * at connection creation time. Once assigned, the connection to
 * squeue mapping is never changed and all future packets for that
 * connection are processed on that squeue. The connection ("conn") to
 * squeue mapping is stored in the "conn_t" member "conn_sqp".
 *
 * Since the processing of the connection cuts across multiple layers
 * but still allows packets for different connections to be processed on
 * other CPU/squeues, squeues are also termed a "Vertical Perimeter" or
 * "Per Connection Vertical Perimeter".
 *
 * Processing Model:
 * -----------------
 *
 * An squeue doesn't necessarily process packets with its own worker thread.
 * The callers can pick if they just want to queue the packet, process
 * their packet if nothing is queued, or drain and process. The first two
 * modes are typically employed when the packet was generated while
 * already doing the processing behind the squeue, and the last mode (drain
 * and process) is typically employed when the thread is entering the squeue
 * for the first time. The squeue still imposes a finite time limit
 * for which an external thread can do processing, after which it switches
 * processing to its own worker thread.
 *
 * Once created, squeues are never deleted. Hence squeue pointers are
 * always valid. This means that functions outside the squeue can still
 * refer safely to conn_sqp and there is no need for ref counts.
 *
 * Only a thread executing in the squeue can change the squeue of the
 * connection. It does so by calling a squeue framework function to do this.
 * After changing the squeue, the thread must leave the squeue. It must not
 * continue to execute any code that needs squeue protection.
 *
 * The squeue framework, after entering the squeue, checks if the current
 * squeue matches the conn_sqp. If the check fails, the packet is delivered
 * to the right squeue.
 *
 * Polling Model:
 * --------------
 *
 * Squeues can control the rate of packet arrival into itself from the
 * NIC or specific Rx ring within a NIC. As part of capability negotiation
 * between IP and the MAC layer, squeues are created for each TCP soft ring
 * (or TCP Rx ring - to be implemented in future). As part of this
 * negotiation, squeues get a cookie for the underlying soft ring or Rx
 * ring, a function to turn off incoming packets and a function to call
 * to poll for packets. This helps schedule the receive side packet
 * processing so that queue backlog doesn't build up and packet processing
 * doesn't keep getting disturbed by high priority interrupts. As part
 * of this mode, as soon as a backlog starts building, the squeue turns off
 * the interrupts and switches to poll mode. In poll mode, when the poll
 * thread goes down to retrieve packets, it retrieves them in the form of
 * a chain which improves performance even more. As the squeue/softring
 * system gets more packets, it gets more efficient by switching to
 * polling more often and dealing with larger packet chains.
 *
 */

#include <sys/types.h>
#include <sys/cmn_err.h>
#include <sys/debug.h>
#include <sys/kmem.h>
#include <sys/cpuvar.h>
#include <sys/condvar_impl.h>
#include <sys/systm.h>
#include <sys/callb.h>
#include <sys/sdt.h>
#include <sys/ddi.h>
#include <sys/sunddi.h>
#include <sys/stack.h>
#include <sys/archsystm.h>

#include <inet/ipclassifier.h>
#include <inet/udp_impl.h>

#include <sys/squeue_impl.h>

static void squeue_drain(squeue_t *, uint_t, hrtime_t);
static void squeue_worker(squeue_t *sqp);
static void squeue_polling_thread(squeue_t *sqp);
static void squeue_worker_wakeup(squeue_t *sqp);
static void squeue_try_drain_one(squeue_t *, conn_t *);

/* Cache from which all squeue_t structures are allocated. */
kmem_cache_t *squeue_cache;

#define	SQUEUE_MSEC_TO_NSEC 1000000

/*
 * Maximum time (in milliseconds) a thread may spend draining an squeue
 * before the remaining work is handed off (tunable).
 */
int squeue_drain_ms = 20;

/* The value above converted to nanoseconds (set in squeue_init()). */
static uint_t squeue_drain_ns = 0;

/*
 * Minimum remaining stack (in bytes) required for squeue_drain() to run
 * inline in a non-worker thread; below this the drain is punted to the
 * worker thread and squeue_drain_stack_toodeep is bumped.
 */
uintptr_t squeue_drain_stack_needed = 10240;
uint_t squeue_drain_stack_toodeep;

/*
 * The number of bytes the squeue is allowed to poll from the softring in a
 * single read. The accounting is done on a per-mblk basis, so the squeue may
 * poll one mblk/MTU worth of data over the limit.
 */
size_t squeue_poll_budget_bytes = 150000;

/*
 * Append the mblk chain mp..tail (cnt mblks) to the squeue's pending list.
 * Caller must hold sq_lock.
 */
#define	ENQUEUE_CHAIN(sqp, mp, tail, cnt) {			\
	ASSERT(MUTEX_HELD(&(sqp)->sq_lock));			\
								\
	if ((sqp)->sq_last != NULL)				\
		(sqp)->sq_last->b_next = (mp);			\
	else							\
		(sqp)->sq_first = (mp);				\
	(sqp)->sq_last = (tail);				\
	(sqp)->sq_count += (cnt);				\
	ASSERT((sqp)->sq_count > 0);				\
	DTRACE_PROBE4(squeue__enqueuechain, squeue_t *, sqp,	\
	    mblk_t *, mp, mblk_t *, tail, int, cnt);		\
								\
}

/*
 * Blank the receive ring (in this case it is the soft ring). When
 * blanked, the soft ring will not send any more packets up.
 * Blanking may not succeed when there is a CPU already in the soft
 * ring sending packets up. In that case, SQS_POLLING will not be
 * set.
 */
#define	SQS_POLLING_ON(sqp, sq_poll_capable, rx_ring) {		\
	ASSERT(MUTEX_HELD(&(sqp)->sq_lock));			\
	if (sq_poll_capable) {					\
		ASSERT(rx_ring != NULL);			\
		ASSERT(sqp->sq_state & SQS_POLL_CAPAB);		\
		if (!(sqp->sq_state & SQS_POLLING)) {		\
			if (rx_ring->rr_intr_disable(rx_ring->rr_intr_handle)) \
				sqp->sq_state |= SQS_POLLING;	\
		}						\
	}							\
}

/*
 * Leave poll mode: re-enable ring interrupts and clear SQS_POLLING,
 * if we were in fact polling. Caller must hold sq_lock.
 */
#define	SQS_POLLING_OFF(sqp, sq_poll_capable, rx_ring) {	\
	ASSERT(MUTEX_HELD(&(sqp)->sq_lock));			\
	if (sq_poll_capable) {					\
		ASSERT(rx_ring != NULL);			\
		ASSERT(sqp->sq_state & SQS_POLL_CAPAB);		\
		if (sqp->sq_state & SQS_POLLING) {		\
			sqp->sq_state &= ~SQS_POLLING;		\
			rx_ring->rr_intr_enable(rx_ring->rr_intr_handle); \
		}						\
	}							\
}

/* Wakeup poll thread only if SQS_POLLING is set */
#define	SQS_POLL_RING(sqp) {					\
	ASSERT(MUTEX_HELD(&(sqp)->sq_lock));			\
	if (sqp->sq_state & SQS_POLLING) {			\
		ASSERT(sqp->sq_state & SQS_POLL_CAPAB);		\
		if (!(sqp->sq_state & SQS_GET_PKTS)) {		\
			sqp->sq_state |= SQS_GET_PKTS;		\
			cv_signal(&sqp->sq_poll_cv);		\
		}						\
	}							\
}

#ifdef DEBUG
/* Record the mblk/proc/conn currently being processed, for debugging. */
#define	SQUEUE_DBG_SET(sqp, mp, proc, connp, tag) {		\
	(sqp)->sq_curmp = (mp);					\
	(sqp)->sq_curproc = (proc);				\
	(sqp)->sq_connp = (connp);				\
	(mp)->b_tag = (sqp)->sq_tag = (tag);			\
}

#define	SQUEUE_DBG_CLEAR(sqp) {					\
	(sqp)->sq_curmp = NULL;					\
	(sqp)->sq_curproc = NULL;				\
	(sqp)->sq_connp = NULL;					\
}
#else
#define	SQUEUE_DBG_SET(sqp, mp, proc, connp, tag)
#define	SQUEUE_DBG_CLEAR(sqp)
#endif

/*
 * One-time framework initialization: create the kmem cache used to
 * allocate squeue_t structures and precompute the drain time limit
 * in nanoseconds.
 */
void
squeue_init(void)
{
	squeue_cache = kmem_cache_create("squeue_cache",
	    sizeof (squeue_t), 64, NULL, NULL, NULL, NULL, NULL, 0);

	squeue_drain_ns = squeue_drain_ms * SQUEUE_MSEC_TO_NSEC;
}

/*
 * Allocate and initialize a new squeue, creating its worker and polling
 * threads at priority pri. Squeues are never freed once created (see the
 * block comment at the top of the file).
 */
squeue_t *
squeue_create(pri_t pri)
{
	squeue_t *sqp = kmem_cache_alloc(squeue_cache, KM_SLEEP);

	bzero(sqp, sizeof (squeue_t));
	sqp->sq_bind = PBIND_NONE;
	sqp->sq_priority = pri;
	sqp->sq_worker = thread_create(NULL, 0, squeue_worker,
	    sqp, 0, &p0, TS_RUN, pri);

	sqp->sq_poll_thr = thread_create(NULL, 0, squeue_polling_thread,
	    sqp, 0, &p0, TS_RUN, pri);

	sqp->sq_enter = squeue_enter;
	sqp->sq_drain = squeue_drain;

	return (sqp);
}

/*
 * Bind squeue worker thread to the specified CPU, given by CPU id.
 * If the CPU id value is -1, bind the worker thread to the value
 * specified in sq_bind field. If a thread is already bound to a
 * different CPU, unbind it from the old CPU and bind to the new one.
 * Caller must hold cpu_lock.
 */

void
squeue_bind(squeue_t *sqp, processorid_t bind)
{
	mutex_enter(&sqp->sq_lock);
	ASSERT(sqp->sq_bind != PBIND_NONE || bind != PBIND_NONE);
	ASSERT(MUTEX_HELD(&cpu_lock));

	if (sqp->sq_state & SQS_BOUND) {
		if (sqp->sq_bind == bind) {
			/* Already bound to the requested CPU; nothing to do. */
			mutex_exit(&sqp->sq_lock);
			return;
		}
		thread_affinity_clear(sqp->sq_worker);
	} else {
		sqp->sq_state |= SQS_BOUND;
	}

	if (bind != PBIND_NONE)
		sqp->sq_bind = bind;

	thread_affinity_set(sqp->sq_worker, sqp->sq_bind);
	mutex_exit(&sqp->sq_lock);
}

/*
 * Undo squeue_bind(): clear the worker thread's CPU affinity, if it
 * was bound.
 */
void
squeue_unbind(squeue_t *sqp)
{
	mutex_enter(&sqp->sq_lock);
	if (!(sqp->sq_state & SQS_BOUND)) {
		mutex_exit(&sqp->sq_lock);
		return;
	}

	sqp->sq_state &= ~SQS_BOUND;
	thread_affinity_clear(sqp->sq_worker);
	mutex_exit(&sqp->sq_lock);
}

/*
 * squeue_enter() - enter squeue sqp with mblk mp (which can be
 * a chain), while tail points to the end and cnt in number of
 * mblks in the chain.
 *
 * For a chain of single packet (i.e. mp == tail), go through the
 * fast path if no one is processing the squeue and nothing is queued.
 *
 * The proc and arg for each mblk is already stored in the mblk in
 * appropriate places.
 *
 * The process_flag specifies if we are allowed to process the mblk
 * and drain in the entering thread context. If process_flag is
 * SQ_FILL, then we just queue the mblk and return (after signaling
 * the worker thread if no one else is processing the squeue).
 *
 * The ira argument can be used when the count is one.
 * For a chain the caller needs to prepend any needed mblks from
 * ip_recv_attr_to_mblk().
 */
/* ARGSUSED */
void
squeue_enter(squeue_t *sqp, mblk_t *mp, mblk_t *tail, uint32_t cnt,
    ip_recv_attr_t *ira, int process_flag, uint8_t tag)
{
	conn_t		*connp;
	sqproc_t	proc;
	hrtime_t	now;

	ASSERT(sqp != NULL);
	ASSERT(mp != NULL);
	ASSERT(tail != NULL);
	ASSERT(cnt > 0);
	ASSERT(MUTEX_NOT_HELD(&sqp->sq_lock));
	ASSERT(ira == NULL || cnt == 1);

	mutex_enter(&sqp->sq_lock);

	/*
	 * Try to process the packet if SQ_FILL flag is not set and
	 * we are allowed to process the squeue. The SQ_NODRAIN is
	 * ignored if the packet chain consists of more than 1 packet.
	 */
	if (!(sqp->sq_state & SQS_PROC) && ((process_flag == SQ_PROCESS) ||
	    (process_flag == SQ_NODRAIN && sqp->sq_first == NULL))) {
		/*
		 * See if anything is already queued. If we are the
		 * first packet, do inline processing else queue the
		 * packet and do the drain.
		 */
		if (sqp->sq_first == NULL && cnt == 1) {
			/*
			 * Fast-path, ok to process and nothing queued.
			 */
			sqp->sq_state |= (SQS_PROC|SQS_FAST);
			sqp->sq_run = curthread;
			mutex_exit(&sqp->sq_lock);

			/*
			 * We are the chain of 1 packet so
			 * go through this fast path.
			 */
			ASSERT(mp->b_prev != NULL);
			ASSERT(mp->b_queue != NULL);
			/* conn and proc were stashed in the mblk by caller */
			connp = (conn_t *)mp->b_prev;
			mp->b_prev = NULL;
			proc = (sqproc_t)mp->b_queue;
			mp->b_queue = NULL;
			ASSERT(proc != NULL && connp != NULL);
			ASSERT(mp->b_next == NULL);

			/*
			 * Handle squeue switching. More details in the
			 * block comment at the top of the file
			 */
			if (connp->conn_sqp == sqp) {
				SQUEUE_DBG_SET(sqp, mp, proc, connp,
				    tag);
				connp->conn_on_sqp = B_TRUE;
				DTRACE_PROBE3(squeue__proc__start, squeue_t *,
				    sqp, mblk_t *, mp, conn_t *, connp);
				(*proc)(connp, mp, sqp, ira);
				DTRACE_PROBE2(squeue__proc__end, squeue_t *,
				    sqp, conn_t *, connp);
				connp->conn_on_sqp = B_FALSE;
				SQUEUE_DBG_CLEAR(sqp);
				CONN_DEC_REF(connp);
			} else {
				SQUEUE_ENTER_ONE(connp->conn_sqp, mp, proc,
				    connp, ira, SQ_FILL, SQTAG_SQUEUE_CHANGE);
			}
			ASSERT(MUTEX_NOT_HELD(&sqp->sq_lock));
			mutex_enter(&sqp->sq_lock);
			sqp->sq_state &= ~(SQS_PROC|SQS_FAST);
			sqp->sq_run = NULL;
			if (sqp->sq_first == NULL ||
			    process_flag == SQ_NODRAIN) {
				/*
				 * Even if SQ_NODRAIN was specified, it may
				 * still be best to process a single queued
				 * item if it matches the active connection.
				 */
				if (sqp->sq_first != NULL) {
					squeue_try_drain_one(sqp, connp);
				}

				/*
				 * If work or control actions are pending, wake
				 * up the worker thread.
				 */
				if (sqp->sq_first != NULL ||
				    sqp->sq_state & SQS_WORKER_THR_CONTROL) {
					squeue_worker_wakeup(sqp);
				}
				mutex_exit(&sqp->sq_lock);
				return;
			}
		} else {
			if (ira != NULL) {
				mblk_t	*attrmp;

				ASSERT(cnt == 1);
				attrmp = ip_recv_attr_to_mblk(ira);
				if (attrmp == NULL) {
					mutex_exit(&sqp->sq_lock);
					ip_drop_input("squeue: "
					    "ip_recv_attr_to_mblk",
					    mp, NULL);
					/* Caller already set b_prev/b_next */
					mp->b_prev = mp->b_next = NULL;
					freemsg(mp);
					return;
				}
				ASSERT(attrmp->b_cont == NULL);
				attrmp->b_cont = mp;
				/* Move connp and func to new */
				attrmp->b_queue = mp->b_queue;
				mp->b_queue = NULL;
				attrmp->b_prev = mp->b_prev;
				mp->b_prev = NULL;

				ASSERT(mp == tail);
				tail = mp = attrmp;
			}

			ENQUEUE_CHAIN(sqp, mp, tail, cnt);
#ifdef DEBUG
			mp->b_tag = tag;
#endif
		}
		/*
		 * We are here because either we couldn't do inline
		 * processing (because something was already queued),
		 * or we had a chain of more than one packet,
		 * or something else arrived after we were done with
		 * inline processing.
		 */
		ASSERT(MUTEX_HELD(&sqp->sq_lock));
		ASSERT(sqp->sq_first != NULL);
		now = gethrtime();
		sqp->sq_run = curthread;
		sqp->sq_drain(sqp, SQS_ENTER, now + squeue_drain_ns);

		/*
		 * If we didn't do a complete drain, the worker
		 * thread was already signalled by squeue_drain.
		 * In case any control actions are pending, wake
		 * up the worker.
		 */
		sqp->sq_run = NULL;
		if (sqp->sq_state & SQS_WORKER_THR_CONTROL) {
			squeue_worker_wakeup(sqp);
		}
	} else {
		/*
		 * We let a thread processing a squeue reenter only
		 * once. This helps the case of incoming connection
		 * where a SYN-ACK-ACK that triggers the conn_ind
		 * doesn't have to queue the packet if listener and
		 * eager are on the same squeue. Also helps the
		 * loopback connection where the two ends are bound
		 * to the same squeue (which is typical on single
		 * CPU machines).
		 *
		 * We let the thread reenter only once for the fear
		 * of stack getting blown with multiple traversal.
		 */
		connp = (conn_t *)mp->b_prev;
		if (!(sqp->sq_state & SQS_REENTER) &&
		    (process_flag != SQ_FILL) && (sqp->sq_first == NULL) &&
		    (sqp->sq_run == curthread) && (cnt == 1) &&
		    (connp->conn_on_sqp == B_FALSE)) {
			sqp->sq_state |= SQS_REENTER;
			mutex_exit(&sqp->sq_lock);

			ASSERT(mp->b_prev != NULL);
			ASSERT(mp->b_queue != NULL);

			mp->b_prev = NULL;
			proc = (sqproc_t)mp->b_queue;
			mp->b_queue = NULL;

			/*
			 * Handle squeue switching. More details in the
			 * block comment at the top of the file
			 */
			if (connp->conn_sqp == sqp) {
				connp->conn_on_sqp = B_TRUE;
				DTRACE_PROBE3(squeue__proc__start, squeue_t *,
				    sqp, mblk_t *, mp, conn_t *, connp);
				(*proc)(connp, mp, sqp, ira);
				DTRACE_PROBE2(squeue__proc__end, squeue_t *,
				    sqp, conn_t *, connp);
				connp->conn_on_sqp = B_FALSE;
				CONN_DEC_REF(connp);
			} else {
				SQUEUE_ENTER_ONE(connp->conn_sqp, mp, proc,
				    connp, ira, SQ_FILL, SQTAG_SQUEUE_CHANGE);
			}

			mutex_enter(&sqp->sq_lock);
			sqp->sq_state &= ~SQS_REENTER;
			mutex_exit(&sqp->sq_lock);
			return;
		}

		/*
		 * Queue is already being processed or there is already
		 * one or more packets on the queue. Enqueue the
		 * packet and wakeup the squeue worker thread if the
		 * squeue is not being processed.
		 */
#ifdef DEBUG
		mp->b_tag = tag;
#endif
		if (ira != NULL) {
			mblk_t	*attrmp;

			ASSERT(cnt == 1);
			attrmp = ip_recv_attr_to_mblk(ira);
			if (attrmp == NULL) {
				mutex_exit(&sqp->sq_lock);
				ip_drop_input("squeue: ip_recv_attr_to_mblk",
				    mp, NULL);
				/* Caller already set b_prev/b_next */
				mp->b_prev = mp->b_next = NULL;
				freemsg(mp);
				return;
			}
			ASSERT(attrmp->b_cont == NULL);
			attrmp->b_cont = mp;
			/* Move connp and func to new */
			attrmp->b_queue = mp->b_queue;
			mp->b_queue = NULL;
			attrmp->b_prev = mp->b_prev;
			mp->b_prev = NULL;

			ASSERT(mp == tail);
			tail = mp = attrmp;
		}
		ENQUEUE_CHAIN(sqp, mp, tail, cnt);
		/*
		 * If the worker isn't running or control actions are pending,
		 * wake it up now.
		 */
		if ((sqp->sq_state & SQS_PROC) == 0 ||
		    (sqp->sq_state & SQS_WORKER_THR_CONTROL) != 0) {
			squeue_worker_wakeup(sqp);
		}
	}
	mutex_exit(&sqp->sq_lock);
}

/*
 * PRIVATE FUNCTIONS
 */


/*
 * Wake up worker thread for squeue to process queued work.
 * Caller must hold sq_lock.
 */
static void
squeue_worker_wakeup(squeue_t *sqp)
{
	ASSERT(MUTEX_HELD(&(sqp)->sq_lock));

	cv_signal(&sqp->sq_worker_cv);
	sqp->sq_awoken = gethrtime();
}

/*
 * Drain the squeue: dequeue and process queued mblks until the queue is
 * empty or the time limit (expire, an absolute hrtime) is reached.
 * proc_type identifies the caller (SQS_ENTER, SQS_WORKER or SQS_POLL_PROC)
 * and controls the polling behavior. Entered and exited with sq_lock held;
 * the lock is dropped while processing packets.
 */
static void
squeue_drain(squeue_t *sqp, uint_t proc_type, hrtime_t expire)
{
	mblk_t		*mp;
	mblk_t		*head;
	sqproc_t	proc;
	conn_t		*connp;
	ill_rx_ring_t	*sq_rx_ring = sqp->sq_rx_ring;
	hrtime_t	now;
	boolean_t	sq_poll_capable;
	ip_recv_attr_t	*ira, iras;

	/*
	 * Before doing any work, check our stack depth; if we're not a
	 * worker thread for this squeue and we're beginning to get tight
	 * on stack, kick the worker, bump a counter and return.
	 */
	if (proc_type != SQS_WORKER && STACK_BIAS + (uintptr_t)getfp() -
	    (uintptr_t)curthread->t_stkbase < squeue_drain_stack_needed) {
		ASSERT(mutex_owned(&sqp->sq_lock));
		squeue_worker_wakeup(sqp);
		squeue_drain_stack_toodeep++;
		return;
	}

	sq_poll_capable = (sqp->sq_state & SQS_POLL_CAPAB) != 0;
again:
	ASSERT(mutex_owned(&sqp->sq_lock));
	ASSERT(!(sqp->sq_state & (SQS_POLL_THR_QUIESCED |
	    SQS_POLL_QUIESCE_DONE)));

	/* Detach the entire pending chain so we can process it unlocked. */
	head = sqp->sq_first;
	sqp->sq_first = NULL;
	sqp->sq_last = NULL;
	sqp->sq_count = 0;

	sqp->sq_state |= SQS_PROC | proc_type;

	/*
	 * We have backlog built up. Switch to polling mode if the
	 * device underneath allows it. Need to do it so that
	 * more packets don't come in and disturb us (by contending
	 * for sq_lock or higher priority thread preempting us).
	 *
	 * The worker thread is allowed to do active polling while we
	 * just disable the interrupts for drain by non worker (kernel
	 * or userland) threads so they can peacefully process the
	 * packets during time allocated to them.
	 */
	SQS_POLLING_ON(sqp, sq_poll_capable, sq_rx_ring);
	mutex_exit(&sqp->sq_lock);

	while ((mp = head) != NULL) {

		head = mp->b_next;
		mp->b_next = NULL;

		/* Recover the proc and conn stashed in the mblk at enqueue. */
		proc = (sqproc_t)mp->b_queue;
		mp->b_queue = NULL;
		connp = (conn_t *)mp->b_prev;
		mp->b_prev = NULL;

		/* Is there an ip_recv_attr_t to handle? */
		if (ip_recv_attr_is_mblk(mp)) {
			mblk_t	*attrmp = mp;

			ASSERT(attrmp->b_cont != NULL);

			mp = attrmp->b_cont;
			attrmp->b_cont = NULL;
			ASSERT(mp->b_queue == NULL);
			ASSERT(mp->b_prev == NULL);

			if (!ip_recv_attr_from_mblk(attrmp, &iras)) {
				/* The ill or ip_stack_t disappeared on us */
				ip_drop_input("ip_recv_attr_from_mblk",
				    mp, NULL);
				ira_cleanup(&iras, B_TRUE);
				CONN_DEC_REF(connp);
				continue;
			}
			ira = &iras;
		} else {
			ira = NULL;
		}


		/*
		 * Handle squeue switching. More details in the
		 * block comment at the top of the file
		 */
		if (connp->conn_sqp == sqp) {
			SQUEUE_DBG_SET(sqp, mp, proc, connp,
			    mp->b_tag);
			connp->conn_on_sqp = B_TRUE;
			DTRACE_PROBE3(squeue__proc__start, squeue_t *,
			    sqp, mblk_t *, mp, conn_t *, connp);
			(*proc)(connp, mp, sqp, ira);
			DTRACE_PROBE2(squeue__proc__end, squeue_t *,
			    sqp, conn_t *, connp);
			connp->conn_on_sqp = B_FALSE;
			CONN_DEC_REF(connp);
		} else {
			SQUEUE_ENTER_ONE(connp->conn_sqp, mp, proc, connp, ira,
			    SQ_FILL, SQTAG_SQUEUE_CHANGE);
		}
		if (ira != NULL)
			ira_cleanup(ira, B_TRUE);
	}

	SQUEUE_DBG_CLEAR(sqp);

	mutex_enter(&sqp->sq_lock);

	/*
	 * Check if there is still work to do (either more arrived or timer
	 * expired). If we are the worker thread and we are polling capable,
	 * continue doing the work since no one else is around to do the
	 * work anyway (but signal the poll thread to retrieve some packets
	 * in the meanwhile). If we are not the worker thread, just
	 * signal the worker thread to take up the work if processing time
	 * has expired.
	 */
	if (sqp->sq_first != NULL) {
		/*
		 * Still more to process. If time quanta not expired, we
		 * should let the drain go on. The worker thread is allowed
		 * to drain as long as there is anything left.
		 */
		now = gethrtime();
		if ((now < expire) || (proc_type == SQS_WORKER)) {
			/*
			 * If time not expired or we are worker thread and
			 * this squeue is polling capable, continue to do
			 * the drain.
			 *
			 * We turn off interrupts for all userland threads
			 * doing drain but we do active polling only for
			 * worker thread.
			 *
			 * Calling SQS_POLL_RING() even in the case of
			 * SQS_POLLING_ON() not succeeding is ok as
			 * SQS_POLL_RING() will not wake up poll thread
			 * if SQS_POLLING bit is not set.
			 */
			if (proc_type == SQS_WORKER)
				SQS_POLL_RING(sqp);
			goto again;
		}

		/* Time limit hit; hand the rest of the work to the worker. */
		squeue_worker_wakeup(sqp);
	}

	/*
	 * If the poll thread is already running, just return. The
	 * poll thread continues to hold the proc and will finish
	 * processing.
	 */
	if (sqp->sq_state & SQS_GET_PKTS) {
		ASSERT(!(sqp->sq_state & (SQS_POLL_THR_QUIESCED |
		    SQS_POLL_QUIESCE_DONE)));
		sqp->sq_state &= ~proc_type;
		return;
	}

	/*
	 *
	 * If we are the worker thread and no work is left, send the poll
	 * thread down once more to see if something arrived. Otherwise,
	 * turn the interrupts back on and we are done.
	 */
	if ((proc_type == SQS_WORKER) && (sqp->sq_state & SQS_POLLING)) {
		/*
		 * Do one last check to see if anything arrived
		 * in the NIC. We leave the SQS_PROC set to ensure
		 * that poll thread keeps the PROC and can decide
		 * if it needs to turn polling off or continue
		 * processing.
		 *
		 * If we drop the SQS_PROC here and poll thread comes
		 * up empty handed, it can not safely turn polling off
		 * since someone else could have acquired the PROC
		 * and started draining. The previously running poll
		 * thread and the current thread doing drain would end
		 * up in a race for turning polling on/off and more
		 * complex code would be required to deal with it.
		 *
		 * Its lot simpler for drain to hand the SQS_PROC to
		 * poll thread (if running) and let poll thread finish
		 * without worrying about racing with any other thread.
		 */
		ASSERT(!(sqp->sq_state & (SQS_POLL_THR_QUIESCED |
		    SQS_POLL_QUIESCE_DONE)));
		SQS_POLL_RING(sqp);
		sqp->sq_state &= ~proc_type;
	} else {
		/*
		 * The squeue is either not capable of polling or the
		 * attempt to blank (i.e., turn SQS_POLLING_ON()) was
		 * unsuccessful or poll thread already finished
		 * processing and didn't find anything. Since there
		 * is nothing queued and we already turn polling on
		 * (for all threads doing drain), we should turn
		 * polling off and relinquish the PROC.
		 */
		ASSERT(!(sqp->sq_state & (SQS_POLL_THR_QUIESCED |
		    SQS_POLL_QUIESCE_DONE)));
		SQS_POLLING_OFF(sqp, sq_poll_capable, sq_rx_ring);
		sqp->sq_state &= ~(SQS_PROC | proc_type);
		/*
		 * If we are not the worker and there is a pending quiesce
		 * event, wake up the worker
		 */
		if ((proc_type != SQS_WORKER) &&
		    (sqp->sq_state & SQS_WORKER_THR_CONTROL)) {
			squeue_worker_wakeup(sqp);
		}
	}
}

/*
 * Quiesce, Restart, or Cleanup of the squeue poll thread.
 *
 * Quiesce and Restart: After an squeue poll thread has been quiesced, it does
 * not attempt to poll the underlying soft ring any more. The quiesce is
 * triggered by the mac layer when it wants to quiesce a soft ring. Typically
 * control operations such as changing the fanout of a NIC or VNIC (dladm
 * setlinkprop) need to quiesce data flow before changing the wiring.
 * The operation is done by the mac layer, but it calls back into IP to
 * quiesce the soft ring. After completing the operation (say increase or
 * decrease of the fanout) the mac layer then calls back into IP to restart
 * the quiesced soft ring.
 *
 * Cleanup: This is triggered when the squeue binding to a soft ring is
 * removed permanently.
 * Typically interface plumb and unplumb would trigger
 * this. It can also be triggered from the mac layer when a soft ring is
 * being deleted say as the result of a fanout reduction. Since squeues are
 * never deleted, the cleanup marks the squeue as fit for recycling and
 * moves it to the zeroth squeue set.
 */
static void
squeue_poll_thr_control(squeue_t *sqp)
{
	if (sqp->sq_state & SQS_POLL_THR_RESTART) {
		/* Restart implies a previous quiesce */
		ASSERT(sqp->sq_state & SQS_POLL_THR_QUIESCED);
		sqp->sq_state &= ~(SQS_POLL_THR_QUIESCED |
		    SQS_POLL_THR_RESTART);
		sqp->sq_state |= SQS_POLL_CAPAB;
		/* Notify the worker that the restart has completed. */
		cv_signal(&sqp->sq_worker_cv);
		return;
	}

	if (sqp->sq_state & SQS_POLL_THR_QUIESCE) {
		sqp->sq_state |= SQS_POLL_THR_QUIESCED;
		sqp->sq_state &= ~SQS_POLL_THR_QUIESCE;
		/* Notify the worker that the quiesce has completed. */
		cv_signal(&sqp->sq_worker_cv);
		return;
	}
}

/*
 * POLLING Notes
 *
 * With polling mode, we want to do as much processing as we possibly can
 * in worker thread context. The sweet spot is worker thread keeps doing
 * work all the time in polling mode and writers etc. keep dumping packets
 * to worker thread. Occasionally, we send the poll thread (running at
 * lower priority to NIC to get the chain of packets to feed to worker).
 * Sending the poll thread down to NIC is dependent on 3 criteria:
 *
 * 1) Its always driven from squeue_drain and only if worker thread is
 *	doing the drain.
 * 2) We clear the backlog once and more packets arrived in between.
 *	Before starting drain again, send the poll thread down if
 *	the drain is being done by worker thread.
 * 3) Before exiting the squeue_drain, if the poll thread is not already
 *	working and we are the worker thread, try to poll one more time.
 *
 * For latency sake, we do allow any thread calling squeue_enter
 * to process its packet provided:
 *
 * 1) Nothing is queued
 * 2) If more packets arrived in between, the non worker thread are allowed
 *	to do the drain till their time quanta expired provided SQS_GET_PKTS
 *	wasn't set in between.
 *
 * Avoiding deadlocks with interrupts
 * ==================================
 *
 * One of the big problems is that we can't send poll_thr down while holding
 * the sq_lock since the thread can block. So we drop the sq_lock before
 * calling sq_get_pkts(). We keep holding the SQS_PROC as long as the
 * poll thread is running so that no other thread can acquire the
 * perimeter in between. If the squeue_drain gets done (no more work
 * left), it leaves the SQS_PROC set if poll thread is running.
 */

/*
 * This is the squeue poll thread. In poll mode, it polls the underlying
 * TCP softring and feeds packets into the squeue. The worker thread then
 * drains the squeue. The poll thread also responds to control signals for
 * quiescing, restarting, or cleanup of an squeue. These are driven by
 * control operations like plumb/unplumb or as a result of dynamic Rx ring
 * related operations that are driven from the mac layer.
 */
static void
squeue_polling_thread(squeue_t *sqp)
{
	kmutex_t *lock = &sqp->sq_lock;
	kcondvar_t *async = &sqp->sq_poll_cv;
	ip_mac_rx_t sq_get_pkts;
	ip_accept_t ip_accept;
	ill_rx_ring_t *sq_rx_ring;
	ill_t *sq_ill;
	mblk_t *head, *tail, *mp;
	uint_t cnt;
	void *sq_mac_handle;
	callb_cpr_t cprinfo;
	size_t bytes_to_pickup;
	uint32_t ctl_state;

	CALLB_CPR_INIT(&cprinfo, lock, callb_generic_cpr, "sq_poll");
	mutex_enter(lock);

	for (;;) {
		/* Sleep until signalled via sq_poll_cv (CPR-safe). */
		CALLB_CPR_SAFE_BEGIN(&cprinfo);
		cv_wait(async, lock);
		CALLB_CPR_SAFE_END(&cprinfo, lock);

		ctl_state = sqp->sq_state & (SQS_POLL_THR_CONTROL |
		    SQS_POLL_THR_QUIESCED);
		if (ctl_state != 0) {
			/*
			 * If the squeue is quiesced, then wait for a control
			 * request. A quiesced squeue must not poll the
			 * underlying soft ring.
			 */
			if (ctl_state == SQS_POLL_THR_QUIESCED)
				continue;
			/*
			 * Act on control requests to quiesce, cleanup or
			 * restart an squeue
			 */
			squeue_poll_thr_control(sqp);
			continue;
		}

		if (!(sqp->sq_state & SQS_POLL_CAPAB))
			continue;

		ASSERT((sqp->sq_state &
		    (SQS_PROC|SQS_POLLING|SQS_GET_PKTS)) ==
		    (SQS_PROC|SQS_POLLING|SQS_GET_PKTS));

poll_again:
		sq_rx_ring = sqp->sq_rx_ring;
		sq_get_pkts = sq_rx_ring->rr_rx;
		sq_mac_handle = sq_rx_ring->rr_rx_handle;
		ip_accept = sq_rx_ring->rr_ip_accept;
		sq_ill = sq_rx_ring->rr_ill;
		bytes_to_pickup = squeue_poll_budget_bytes;
		/* Drop sq_lock: the mac rx call below may block. */
		mutex_exit(lock);
		head = sq_get_pkts(sq_mac_handle, bytes_to_pickup);
		mp = NULL;
		if (head != NULL) {
			/*
			 * We got the packet chain from the mac layer. It
			 * would be nice to be able to process it inline
			 * for better performance but we need to give
			 * IP a chance to look at this chain to ensure
			 * that packets are really meant for this squeue
			 * and do the IP processing.
			 */
			mp = ip_accept(sq_ill, sq_rx_ring, sqp, head,
			    &tail, &cnt);
		}
		mutex_enter(lock);
		if (mp != NULL) {
			/*
			 * The ip_accept function has already added an
			 * ip_recv_attr_t mblk if that is needed.
			 */
			ENQUEUE_CHAIN(sqp, mp, tail, cnt);
		}
		ASSERT((sqp->sq_state &
		    (SQS_PROC|SQS_POLLING|SQS_GET_PKTS)) ==
		    (SQS_PROC|SQS_POLLING|SQS_GET_PKTS));

		if (sqp->sq_first != NULL && !(sqp->sq_state & SQS_WORKER)) {
			/*
			 * We have packets to process and worker thread
			 * is not running. Check to see if poll thread is
			 * allowed to process. Let it do processing only if it
			 * picked up some packets from the NIC otherwise
			 * wakeup the worker thread.
			 */
			if (mp != NULL) {
				hrtime_t now;

				now = gethrtime();
				sqp->sq_run = curthread;
				sqp->sq_drain(sqp, SQS_POLL_PROC, now +
				    squeue_drain_ns);
				sqp->sq_run = NULL;

				if (sqp->sq_first == NULL)
					goto poll_again;

				/*
				 * Couldn't do the entire drain because the
				 * time limit expired, let the
				 * worker thread take over.
				 */
			}

			/*
			 * Put the SQS_PROC_HELD on so the worker
			 * thread can distinguish where its called from. We
			 * can remove the SQS_PROC flag here and turn off the
			 * polling so that it wouldn't matter who gets the
			 * processing but we get better performance this way
			 * and save the cost of turn polling off and possibly
			 * on again as soon as we start draining again.
			 *
			 * We can't remove the SQS_PROC flag without turning
			 * polling off until we can guarantee that control
			 * will return to squeue_drain immediately.
			 */
			sqp->sq_state |= SQS_PROC_HELD;
			sqp->sq_state &= ~SQS_GET_PKTS;
			squeue_worker_wakeup(sqp);
		} else if (sqp->sq_first == NULL &&
		    !(sqp->sq_state & SQS_WORKER)) {
			/*
			 * Nothing queued and worker thread not running.
			 * Since we hold the proc, no other thread is
			 * processing the squeue. This means that there
			 * is no work to be done and nothing is queued
			 * in squeue or in NIC. Turn polling off and go
			 * back to interrupt mode.
			 */
			sqp->sq_state &= ~(SQS_PROC|SQS_GET_PKTS);
			/* LINTED: constant in conditional context */
			SQS_POLLING_OFF(sqp, B_TRUE, sq_rx_ring);

			/*
			 * If there is a pending control operation
			 * wake up the worker, since it is currently
			 * not running.
			 */
			if (sqp->sq_state & SQS_WORKER_THR_CONTROL) {
				squeue_worker_wakeup(sqp);
			}
		} else {
			/*
			 * Worker thread is already running. We don't need
			 * to do anything. Indicate that poll thread is done.
			 */
			sqp->sq_state &= ~SQS_GET_PKTS;
		}
		if (sqp->sq_state & SQS_POLL_THR_CONTROL) {
			/*
			 * Act on control requests to quiesce, cleanup or
			 * restart an squeue
			 */
			squeue_poll_thr_control(sqp);
		}
	}
}

/*
 * The squeue worker thread acts on any control requests to quiesce, cleanup
 * or restart an ill_rx_ring_t by calling this function. The worker thread
 * synchronizes with the squeue poll thread to complete the request and finally
 * wakes up the requestor when the request is completed.
 */
static void
squeue_worker_thr_control(squeue_t *sqp)
{
	ill_t *ill;
	ill_rx_ring_t *rx_ring;

	ASSERT(MUTEX_HELD(&sqp->sq_lock));

	if (sqp->sq_state & SQS_POLL_RESTART) {
		/* Restart implies a previous quiesce. */
		ASSERT((sqp->sq_state & (SQS_PROC_HELD |
		    SQS_POLL_QUIESCE_DONE | SQS_PROC | SQS_WORKER)) ==
		    (SQS_POLL_QUIESCE_DONE | SQS_PROC | SQS_WORKER));
		/*
		 * Request the squeue poll thread to restart and wait till
		 * it actually restarts.
1094 */ 1095 sqp->sq_state &= ~SQS_POLL_QUIESCE_DONE; 1096 sqp->sq_state |= SQS_POLL_THR_RESTART; 1097 cv_signal(&sqp->sq_poll_cv); 1098 while (sqp->sq_state & SQS_POLL_THR_QUIESCED) 1099 cv_wait(&sqp->sq_worker_cv, &sqp->sq_lock); 1100 sqp->sq_state &= ~(SQS_POLL_RESTART | SQS_PROC | 1101 SQS_WORKER); 1102 /* 1103 * Signal any waiter that is waiting for the restart 1104 * to complete 1105 */ 1106 sqp->sq_state |= SQS_POLL_RESTART_DONE; 1107 cv_signal(&sqp->sq_ctrlop_done_cv); 1108 return; 1109 } 1110 1111 if (sqp->sq_state & SQS_PROC_HELD) { 1112 /* The squeue poll thread handed control to us */ 1113 ASSERT(sqp->sq_state & SQS_PROC); 1114 } 1115 1116 /* 1117 * Prevent any other thread from processing the squeue 1118 * until we finish the control actions by setting SQS_PROC. 1119 * But allow ourself to reenter by setting SQS_WORKER 1120 */ 1121 sqp->sq_state |= (SQS_PROC | SQS_WORKER); 1122 1123 /* Signal the squeue poll thread and wait for it to quiesce itself */ 1124 if (!(sqp->sq_state & SQS_POLL_THR_QUIESCED)) { 1125 sqp->sq_state |= SQS_POLL_THR_QUIESCE; 1126 cv_signal(&sqp->sq_poll_cv); 1127 while (!(sqp->sq_state & SQS_POLL_THR_QUIESCED)) 1128 cv_wait(&sqp->sq_worker_cv, &sqp->sq_lock); 1129 } 1130 1131 rx_ring = sqp->sq_rx_ring; 1132 ill = rx_ring->rr_ill; 1133 /* 1134 * The lock hierarchy is as follows. 1135 * cpu_lock -> ill_lock -> sqset_lock -> sq_lock 1136 */ 1137 mutex_exit(&sqp->sq_lock); 1138 mutex_enter(&ill->ill_lock); 1139 mutex_enter(&sqp->sq_lock); 1140 1141 SQS_POLLING_OFF(sqp, (sqp->sq_state & SQS_POLL_CAPAB) != 0, 1142 sqp->sq_rx_ring); 1143 sqp->sq_state &= ~(SQS_POLL_CAPAB | SQS_GET_PKTS | SQS_PROC_HELD); 1144 if (sqp->sq_state & SQS_POLL_CLEANUP) { 1145 /* 1146 * Disassociate this squeue from its ill_rx_ring_t. 1147 * The rr_sqp, sq_rx_ring fields are protected by the 1148 * corresponding squeue, ill_lock* and sq_lock. Holding any 1149 * of them will ensure that the ring to squeue mapping does 1150 * not change. 
1151 */ 1152 ASSERT(!(sqp->sq_state & SQS_DEFAULT)); 1153 1154 sqp->sq_rx_ring = NULL; 1155 rx_ring->rr_sqp = NULL; 1156 1157 sqp->sq_state &= ~(SQS_POLL_CLEANUP | SQS_POLL_THR_QUIESCED | 1158 SQS_POLL_QUIESCE_DONE); 1159 sqp->sq_ill = NULL; 1160 1161 rx_ring->rr_rx_handle = NULL; 1162 rx_ring->rr_intr_handle = NULL; 1163 rx_ring->rr_intr_enable = NULL; 1164 rx_ring->rr_intr_disable = NULL; 1165 sqp->sq_state |= SQS_POLL_CLEANUP_DONE; 1166 } else { 1167 sqp->sq_state &= ~SQS_POLL_QUIESCE; 1168 sqp->sq_state |= SQS_POLL_QUIESCE_DONE; 1169 } 1170 /* 1171 * Signal any waiter that is waiting for the quiesce or cleanup 1172 * to complete and also wait for it to actually see and reset the 1173 * SQS_POLL_CLEANUP_DONE. 1174 */ 1175 cv_signal(&sqp->sq_ctrlop_done_cv); 1176 mutex_exit(&ill->ill_lock); 1177 if (sqp->sq_state & SQS_POLL_CLEANUP_DONE) { 1178 cv_wait(&sqp->sq_worker_cv, &sqp->sq_lock); 1179 sqp->sq_state &= ~(SQS_PROC | SQS_WORKER); 1180 } 1181 } 1182 1183 static void 1184 squeue_worker(squeue_t *sqp) 1185 { 1186 kmutex_t *lock = &sqp->sq_lock; 1187 kcondvar_t *async = &sqp->sq_worker_cv; 1188 callb_cpr_t cprinfo; 1189 hrtime_t now; 1190 1191 CALLB_CPR_INIT(&cprinfo, lock, callb_generic_cpr, "sq_worker"); 1192 mutex_enter(lock); 1193 1194 for (;;) { 1195 for (;;) { 1196 /* 1197 * If the poll thread has handed control to us 1198 * we need to break out of the wait. 
1199 */ 1200 if (sqp->sq_state & SQS_PROC_HELD) 1201 break; 1202 1203 /* 1204 * If the squeue is not being processed and we either 1205 * have messages to drain or some thread has signaled 1206 * some control activity we need to break 1207 */ 1208 if (!(sqp->sq_state & SQS_PROC) && 1209 ((sqp->sq_state & SQS_WORKER_THR_CONTROL) || 1210 (sqp->sq_first != NULL))) 1211 break; 1212 1213 /* 1214 * If we have started some control action, then check 1215 * for the SQS_WORKER flag (since we don't 1216 * release the squeue) to make sure we own the squeue 1217 * and break out 1218 */ 1219 if ((sqp->sq_state & SQS_WORKER_THR_CONTROL) && 1220 (sqp->sq_state & SQS_WORKER)) 1221 break; 1222 1223 CALLB_CPR_SAFE_BEGIN(&cprinfo); 1224 cv_wait(async, lock); 1225 CALLB_CPR_SAFE_END(&cprinfo, lock); 1226 } 1227 if (sqp->sq_state & SQS_WORKER_THR_CONTROL) { 1228 squeue_worker_thr_control(sqp); 1229 continue; 1230 } 1231 ASSERT(!(sqp->sq_state & (SQS_POLL_THR_QUIESCED | 1232 SQS_POLL_CLEANUP_DONE | SQS_POLL_QUIESCE_DONE | 1233 SQS_WORKER_THR_CONTROL | SQS_POLL_THR_CONTROL))); 1234 1235 if (sqp->sq_state & SQS_PROC_HELD) 1236 sqp->sq_state &= ~SQS_PROC_HELD; 1237 1238 now = gethrtime(); 1239 sqp->sq_run = curthread; 1240 sqp->sq_drain(sqp, SQS_WORKER, now + squeue_drain_ns); 1241 sqp->sq_run = NULL; 1242 } 1243 } 1244 1245 uintptr_t * 1246 squeue_getprivate(squeue_t *sqp, sqprivate_t p) 1247 { 1248 ASSERT(p < SQPRIVATE_MAX); 1249 1250 return (&sqp->sq_private[p]); 1251 } 1252 1253 /* ARGSUSED */ 1254 void 1255 squeue_wakeup_conn(void *arg, mblk_t *mp, void *arg2, ip_recv_attr_t *dummy) 1256 { 1257 conn_t *connp = (conn_t *)arg; 1258 squeue_t *sqp = connp->conn_sqp; 1259 1260 /* 1261 * Mark the squeue as paused before waking up the thread stuck 1262 * in squeue_synch_enter(). 1263 */ 1264 mutex_enter(&sqp->sq_lock); 1265 sqp->sq_state |= SQS_PAUSE; 1266 1267 /* 1268 * Notify the thread that it's OK to proceed; that is done by 1269 * clearing the MSGWAITSYNC flag. 
The synch thread will free the mblk. 1270 */ 1271 ASSERT(mp->b_flag & MSGWAITSYNC); 1272 mp->b_flag &= ~MSGWAITSYNC; 1273 cv_broadcast(&connp->conn_sq_cv); 1274 1275 /* 1276 * We are doing something on behalf of another thread, so we have to 1277 * pause and wait until it finishes. 1278 */ 1279 while (sqp->sq_state & SQS_PAUSE) { 1280 cv_wait(&sqp->sq_synch_cv, &sqp->sq_lock); 1281 } 1282 mutex_exit(&sqp->sq_lock); 1283 } 1284 1285 int 1286 squeue_synch_enter(conn_t *connp, mblk_t *use_mp) 1287 { 1288 squeue_t *sqp; 1289 1290 again: 1291 sqp = connp->conn_sqp; 1292 1293 mutex_enter(&sqp->sq_lock); 1294 if (sqp->sq_first == NULL && !(sqp->sq_state & SQS_PROC)) { 1295 /* 1296 * We are OK to proceed if the squeue is empty, and 1297 * no one owns the squeue. 1298 * 1299 * The caller won't own the squeue as this is called from the 1300 * application. 1301 */ 1302 ASSERT(sqp->sq_run == NULL); 1303 1304 sqp->sq_state |= SQS_PROC; 1305 sqp->sq_run = curthread; 1306 mutex_exit(&sqp->sq_lock); 1307 1308 /* 1309 * Handle squeue switching. The conn's squeue can only change 1310 * while there is a thread in the squeue, which is why we do 1311 * the check after entering the squeue. If it has changed, exit 1312 * this squeue and redo everything with the new sqeueue. 1313 */ 1314 if (sqp != connp->conn_sqp) { 1315 mutex_enter(&sqp->sq_lock); 1316 sqp->sq_state &= ~SQS_PROC; 1317 sqp->sq_run = NULL; 1318 mutex_exit(&sqp->sq_lock); 1319 goto again; 1320 } 1321 #if SQUEUE_DEBUG 1322 sqp->sq_curmp = NULL; 1323 sqp->sq_curproc = NULL; 1324 sqp->sq_connp = connp; 1325 #endif 1326 connp->conn_on_sqp = B_TRUE; 1327 return (0); 1328 } else { 1329 mblk_t *mp; 1330 1331 mp = (use_mp == NULL) ? allocb(0, BPRI_MED) : use_mp; 1332 if (mp == NULL) { 1333 mutex_exit(&sqp->sq_lock); 1334 return (ENOMEM); 1335 } 1336 1337 /* 1338 * We mark the mblk as awaiting synchronous squeue access 1339 * by setting the MSGWAITSYNC flag. 
Once squeue_wakeup_conn 1340 * fires, MSGWAITSYNC is cleared, at which point we know we 1341 * have exclusive access. 1342 */ 1343 mp->b_flag |= MSGWAITSYNC; 1344 1345 CONN_INC_REF(connp); 1346 SET_SQUEUE(mp, squeue_wakeup_conn, connp); 1347 ENQUEUE_CHAIN(sqp, mp, mp, 1); 1348 1349 ASSERT(sqp->sq_run != curthread); 1350 1351 /* Wait until the enqueued mblk get processed. */ 1352 while (mp->b_flag & MSGWAITSYNC) 1353 cv_wait(&connp->conn_sq_cv, &sqp->sq_lock); 1354 mutex_exit(&sqp->sq_lock); 1355 1356 if (use_mp == NULL) 1357 freeb(mp); 1358 1359 return (0); 1360 } 1361 } 1362 1363 /* 1364 * If possible, attempt to immediately process a single queued request, should 1365 * it match the supplied conn_t reference. This is primarily intended to elide 1366 * squeue worker thread wake-ups during local TCP connect() or close() 1367 * operations where the response is placed on the squeue during processing. 1368 */ 1369 static void 1370 squeue_try_drain_one(squeue_t *sqp, conn_t *compare_conn) 1371 { 1372 mblk_t *next, *mp = sqp->sq_first; 1373 conn_t *connp; 1374 sqproc_t proc = (sqproc_t)mp->b_queue; 1375 ip_recv_attr_t iras, *ira = NULL; 1376 1377 ASSERT(MUTEX_HELD(&sqp->sq_lock)); 1378 ASSERT((sqp->sq_state & SQS_PROC) == 0); 1379 ASSERT(sqp->sq_run == NULL); 1380 VERIFY(mp != NULL); 1381 1382 /* 1383 * There is no guarantee that compare_conn references a valid object at 1384 * this time, so under no circumstance may it be deferenced unless it 1385 * matches the squeue entry. 
1386 */ 1387 connp = (conn_t *)mp->b_prev; 1388 if (connp != compare_conn) { 1389 return; 1390 } 1391 1392 next = mp->b_next; 1393 proc = (sqproc_t)mp->b_queue; 1394 1395 ASSERT(proc != NULL); 1396 ASSERT(sqp->sq_count > 0); 1397 1398 /* Dequeue item from squeue */ 1399 if (next == NULL) { 1400 sqp->sq_first = NULL; 1401 sqp->sq_last = NULL; 1402 } else { 1403 sqp->sq_first = next; 1404 } 1405 sqp->sq_count--; 1406 1407 sqp->sq_state |= SQS_PROC; 1408 sqp->sq_run = curthread; 1409 mutex_exit(&sqp->sq_lock); 1410 1411 /* Prep mblk_t and retrieve ira if needed */ 1412 mp->b_prev = NULL; 1413 mp->b_queue = NULL; 1414 mp->b_next = NULL; 1415 if (ip_recv_attr_is_mblk(mp)) { 1416 mblk_t *attrmp = mp; 1417 1418 ASSERT(attrmp->b_cont != NULL); 1419 1420 mp = attrmp->b_cont; 1421 attrmp->b_cont = NULL; 1422 1423 ASSERT(mp->b_queue == NULL); 1424 ASSERT(mp->b_prev == NULL); 1425 1426 if (!ip_recv_attr_from_mblk(attrmp, &iras)) { 1427 /* ill_t or ip_stack_t disappeared */ 1428 ip_drop_input("ip_recv_attr_from_mblk", mp, NULL); 1429 ira_cleanup(&iras, B_TRUE); 1430 CONN_DEC_REF(connp); 1431 goto done; 1432 } 1433 ira = &iras; 1434 } 1435 1436 SQUEUE_DBG_SET(sqp, mp, proc, connp, mp->b_tag); 1437 connp->conn_on_sqp = B_TRUE; 1438 DTRACE_PROBE3(squeue__proc__start, squeue_t *, sqp, mblk_t *, mp, 1439 conn_t *, connp); 1440 (*proc)(connp, mp, sqp, ira); 1441 DTRACE_PROBE2(squeue__proc__end, squeue_t *, sqp, conn_t *, connp); 1442 connp->conn_on_sqp = B_FALSE; 1443 CONN_DEC_REF(connp); 1444 SQUEUE_DBG_CLEAR(sqp); 1445 1446 if (ira != NULL) 1447 ira_cleanup(ira, B_TRUE); 1448 1449 done: 1450 mutex_enter(&sqp->sq_lock); 1451 sqp->sq_state &= ~(SQS_PROC); 1452 sqp->sq_run = NULL; 1453 } 1454 1455 void 1456 squeue_synch_exit(conn_t *connp, int flag) 1457 { 1458 squeue_t *sqp = connp->conn_sqp; 1459 1460 ASSERT(flag == SQ_NODRAIN || flag == SQ_PROCESS); 1461 1462 mutex_enter(&sqp->sq_lock); 1463 if (sqp->sq_run != curthread) { 1464 /* 1465 * The caller doesn't own the squeue, clear the 
SQS_PAUSE flag, 1466 * and wake up the squeue owner, such that owner can continue 1467 * processing. 1468 */ 1469 ASSERT(sqp->sq_state & SQS_PAUSE); 1470 sqp->sq_state &= ~SQS_PAUSE; 1471 1472 /* There should be only one thread blocking on sq_synch_cv. */ 1473 cv_signal(&sqp->sq_synch_cv); 1474 mutex_exit(&sqp->sq_lock); 1475 return; 1476 } 1477 1478 ASSERT(sqp->sq_state & SQS_PROC); 1479 1480 sqp->sq_state &= ~SQS_PROC; 1481 sqp->sq_run = NULL; 1482 connp->conn_on_sqp = B_FALSE; 1483 1484 /* If the caller opted in, attempt to process the head squeue item. */ 1485 if (flag == SQ_PROCESS && sqp->sq_first != NULL) { 1486 squeue_try_drain_one(sqp, connp); 1487 } 1488 1489 /* Wake up the worker if further requests are pending. */ 1490 if (sqp->sq_first != NULL) { 1491 squeue_worker_wakeup(sqp); 1492 } 1493 mutex_exit(&sqp->sq_lock); 1494 } 1495