/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License (the "License").
 * You may not use this file except in compliance with the License.
 *
 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
 * or http://www.opensolaris.org/os/licensing.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
 * If applicable, add the following below this CDDL HEADER, with the
 * fields enclosed by brackets "[]" replaced with your own identifying
 * information: Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 */
/*
 * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
 */

/*
 * Copyright 2017 Joyent, Inc.
 */

/*
 * Squeues: General purpose serialization mechanism
 * ------------------------------------------------
 *
 * Background:
 * -----------
 *
 * This is a general purpose, high-performance serialization mechanism
 * currently used by TCP/IP. It is implemented by means of a per-CPU queue,
 * a worker thread and a polling thread, which are bound to the CPU
 * associated with the squeue. The squeue is strictly FIFO for both the
 * read and write side, and only one thread can process it at any given
 * time. The design goal of the squeue was to offer a very high degree of
 * parallelization (on a per H/W execution pipeline basis) with at
 * most one queuing.
 *
 * A module needing protection typically calls the SQUEUE_ENTER_ONE() or
 * SQUEUE_ENTER() macro as soon as a thread enters the module
 * from either direction. For each packet, the processing function
 * and argument are stored in the mblk itself. When the packet is ready
 * to be processed, the squeue retrieves the stored function and calls
 * it with the supplied argument and a pointer to the packet itself.
 * The called function can assume that no other thread is processing
 * the squeue while it is executing.
 *
 * Squeue/connection binding:
 * --------------------------
 *
 * TCP/IP uses an IP classifier in conjunction with squeues: specific
 * connections are assigned to a specific squeue (based on various
 * policies) at connection creation time. Once assigned, the connection
 * to squeue mapping is never changed and all future packets for that
 * connection are processed on that squeue. The connection ("conn") to
 * squeue mapping is stored in the "conn_t" member "conn_sqp".
 *
 * Since the processing of a connection cuts across multiple layers
 * but still allows packets for different connections to be processed on
 * other CPUs/squeues, squeues are also termed a "Vertical Perimeter" or
 * "Per Connection Vertical Perimeter".
 *
 * Processing Model:
 * -----------------
 *
 * An squeue doesn't necessarily process packets with its own worker
 * thread. Callers can pick whether they just want to queue the packet,
 * process their packet if nothing is queued, or drain and process. The
 * first two modes are typically employed when the packet was generated
 * while already doing the processing behind the squeue, and the last
 * mode (drain and process) is typically employed when a thread is
 * entering the squeue for the first time.
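 *
 * As an illustrative sketch only (the handler name below is hypothetical,
 * but the calling convention matches the way this file invokes the stored
 * "proc"), a module hands a packet to a connection's squeue roughly as
 * follows:
 *
 *	// Invoked later by the squeue; no other thread is inside the
 *	// squeue while this runs.
 *	void
 *	my_conn_input(void *arg, mblk_t *mp, void *arg2, ip_recv_attr_t *ira)
 *	{
 *		conn_t *connp = (conn_t *)arg;
 *		squeue_t *sqp = (squeue_t *)arg2;
 *
 *		// single-threaded, per-connection processing of mp
 *	}
 *
 *	// Hand the packet off; SQ_FILL just queues it, while SQ_PROCESS
 *	// or SQ_NODRAIN may run it in the calling thread (see below).
 *	// The final argument is a debug tag recorded on the mblk under
 *	// DEBUG.
 *	SQUEUE_ENTER_ONE(connp->conn_sqp, mp, my_conn_input, connp, ira,
 *	    SQ_FILL, tag);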
 *
 * The squeue still imposes a finite time limit for which an external
 * thread can do processing, after which it switches processing to its
 * own worker thread.
 *
 * Once created, squeues are never deleted. Hence squeue pointers are
 * always valid. This means that functions outside the squeue can still
 * refer safely to conn_sqp and there is no need for ref counts.
 *
 * Only a thread executing in the squeue can change the squeue of the
 * connection. It does so by calling a squeue framework function to do this.
 * After changing the squeue, the thread must leave the squeue. It must not
 * continue to execute any code that needs squeue protection.
 *
 * The squeue framework, after entering the squeue, checks if the current
 * squeue matches the conn_sqp. If the check fails, the packet is delivered
 * to the right squeue.
 *
 * Polling Model:
 * --------------
 *
 * Squeues can control the rate of packet arrival into themselves from the
 * NIC or a specific Rx ring within a NIC. As part of capability negotiation
 * between IP and the MAC layer, an squeue is created for each TCP soft ring
 * (or TCP Rx ring - to be implemented in future). As part of this
 * negotiation, squeues get a cookie for the underlying soft ring or Rx
 * ring, a function to turn off incoming packets and a function to call
 * to poll for packets. This helps schedule the receive side packet
 * processing so that queue backlog doesn't build up and packet processing
 * doesn't keep getting disturbed by high priority interrupts. As part
 * of this mode, as soon as a backlog starts building, the squeue turns off
 * the interrupts and switches to poll mode. In poll mode, when the poll
 * thread goes down to retrieve packets, it retrieves them in the form of
 * a chain, which improves performance even more. As the squeue/softring
 * system gets more packets, it gets more efficient by switching to
 * polling more often and dealing with larger packet chains.
 *
 */

#include <sys/types.h>
#include <sys/cmn_err.h>
#include <sys/debug.h>
#include <sys/kmem.h>
#include <sys/cpuvar.h>
#include <sys/condvar_impl.h>
#include <sys/systm.h>
#include <sys/callb.h>
#include <sys/sdt.h>
#include <sys/ddi.h>
#include <sys/sunddi.h>
#include <sys/stack.h>
#include <sys/archsystm.h>

#include <inet/ipclassifier.h>
#include <inet/udp_impl.h>

#include <sys/squeue_impl.h>

static void squeue_drain(squeue_t *, uint_t, hrtime_t);
static void squeue_worker(squeue_t *sqp);
static void squeue_polling_thread(squeue_t *sqp);
static void squeue_worker_wakeup(squeue_t *sqp);
static void squeue_try_drain_one(squeue_t *, conn_t *);

kmem_cache_t *squeue_cache;

#define	SQUEUE_MSEC_TO_NSEC 1000000

int squeue_drain_ms = 20;

/* The value above converted to nanoseconds */
static uint_t squeue_drain_ns = 0;

uintptr_t squeue_drain_stack_needed = 10240;
uint_t squeue_drain_stack_toodeep;

#define	MAX_BYTES_TO_PICKUP	150000

#define	ENQUEUE_CHAIN(sqp, mp, tail, cnt) {			\
	/*							\
	 * Enqueue our mblk chain.
\ 158 */ \ 159 ASSERT(MUTEX_HELD(&(sqp)->sq_lock)); \ 160 \ 161 if ((sqp)->sq_last != NULL) \ 162 (sqp)->sq_last->b_next = (mp); \ 163 else \ 164 (sqp)->sq_first = (mp); \ 165 (sqp)->sq_last = (tail); \ 166 (sqp)->sq_count += (cnt); \ 167 ASSERT((sqp)->sq_count > 0); \ 168 DTRACE_PROBE4(squeue__enqueuechain, squeue_t *, sqp, \ 169 mblk_t *, mp, mblk_t *, tail, int, cnt); \ 170 \ 171 } 172 173 /* 174 * Blank the receive ring (in this case it is the soft ring). When 175 * blanked, the soft ring will not send any more packets up. 176 * Blanking may not succeed when there is a CPU already in the soft 177 * ring sending packets up. In that case, SQS_POLLING will not be 178 * set. 179 */ 180 #define SQS_POLLING_ON(sqp, sq_poll_capable, rx_ring) { \ 181 ASSERT(MUTEX_HELD(&(sqp)->sq_lock)); \ 182 if (sq_poll_capable) { \ 183 ASSERT(rx_ring != NULL); \ 184 ASSERT(sqp->sq_state & SQS_POLL_CAPAB); \ 185 if (!(sqp->sq_state & SQS_POLLING)) { \ 186 if (rx_ring->rr_intr_disable(rx_ring->rr_intr_handle)) \ 187 sqp->sq_state |= SQS_POLLING; \ 188 } \ 189 } \ 190 } 191 192 #define SQS_POLLING_OFF(sqp, sq_poll_capable, rx_ring) { \ 193 ASSERT(MUTEX_HELD(&(sqp)->sq_lock)); \ 194 if (sq_poll_capable) { \ 195 ASSERT(rx_ring != NULL); \ 196 ASSERT(sqp->sq_state & SQS_POLL_CAPAB); \ 197 if (sqp->sq_state & SQS_POLLING) { \ 198 sqp->sq_state &= ~SQS_POLLING; \ 199 rx_ring->rr_intr_enable(rx_ring->rr_intr_handle); \ 200 } \ 201 } \ 202 } 203 204 /* Wakeup poll thread only if SQS_POLLING is set */ 205 #define SQS_POLL_RING(sqp) { \ 206 ASSERT(MUTEX_HELD(&(sqp)->sq_lock)); \ 207 if (sqp->sq_state & SQS_POLLING) { \ 208 ASSERT(sqp->sq_state & SQS_POLL_CAPAB); \ 209 if (!(sqp->sq_state & SQS_GET_PKTS)) { \ 210 sqp->sq_state |= SQS_GET_PKTS; \ 211 cv_signal(&sqp->sq_poll_cv); \ 212 } \ 213 } \ 214 } 215 216 #ifdef DEBUG 217 #define SQUEUE_DBG_SET(sqp, mp, proc, connp, tag) { \ 218 (sqp)->sq_curmp = (mp); \ 219 (sqp)->sq_curproc = (proc); \ 220 (sqp)->sq_connp = (connp); \ 221 (mp)->b_tag = (sqp)->sq_tag = (tag); \ 222 } 223 224 #define SQUEUE_DBG_CLEAR(sqp) { \ 225 (sqp)->sq_curmp = NULL; \ 226 (sqp)->sq_curproc = NULL; \ 227 (sqp)->sq_connp = NULL; \ 228 } 229 #else 230 #define SQUEUE_DBG_SET(sqp, mp, proc, connp, tag) 231 #define SQUEUE_DBG_CLEAR(sqp) 232 #endif 233 234 void 235 squeue_init(void) 236 { 237 squeue_cache = kmem_cache_create("squeue_cache", 238 sizeof (squeue_t), 64, NULL, NULL, NULL, NULL, NULL, 0); 239 240 squeue_drain_ns = squeue_drain_ms * SQUEUE_MSEC_TO_NSEC; 241 } 242 243 squeue_t * 244 squeue_create(pri_t pri) 245 { 246 squeue_t *sqp = kmem_cache_alloc(squeue_cache, KM_SLEEP); 247 248 bzero(sqp, sizeof (squeue_t)); 249 sqp->sq_bind = PBIND_NONE; 250 sqp->sq_priority = pri; 251 sqp->sq_worker = thread_create(NULL, 0, squeue_worker, 252 sqp, 0, &p0, TS_RUN, pri); 253 254 sqp->sq_poll_thr = thread_create(NULL, 0, squeue_polling_thread, 255 sqp, 0, &p0, TS_RUN, pri); 256 257 sqp->sq_enter = squeue_enter; 258 sqp->sq_drain = squeue_drain; 259 260 return (sqp); 261 } 262 263 /* 264 * Bind squeue worker thread to the specified CPU, given by CPU id. 265 * If the CPU id value is -1, bind the worker thread to the value 266 * specified in sq_bind field. If a thread is already bound to a 267 * different CPU, unbind it from the old CPU and bind to the new one. 
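 *
 * Illustrative only: squeue_bind() asserts that cpu_lock is held, so a
 * caller rebinding an squeue brackets the call like this ("new_cpuid" is
 * a hypothetical variable):
 *
 *	mutex_enter(&cpu_lock);
 *	squeue_bind(sqp, new_cpuid);	// or PBIND_NONE to reuse sq_bind
 *	mutex_exit(&cpu_lock);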
268 */ 269 270 void 271 squeue_bind(squeue_t *sqp, processorid_t bind) 272 { 273 mutex_enter(&sqp->sq_lock); 274 ASSERT(sqp->sq_bind != PBIND_NONE || bind != PBIND_NONE); 275 ASSERT(MUTEX_HELD(&cpu_lock)); 276 277 if (sqp->sq_state & SQS_BOUND) { 278 if (sqp->sq_bind == bind) { 279 mutex_exit(&sqp->sq_lock); 280 return; 281 } 282 thread_affinity_clear(sqp->sq_worker); 283 } else { 284 sqp->sq_state |= SQS_BOUND; 285 } 286 287 if (bind != PBIND_NONE) 288 sqp->sq_bind = bind; 289 290 thread_affinity_set(sqp->sq_worker, sqp->sq_bind); 291 mutex_exit(&sqp->sq_lock); 292 } 293 294 void 295 squeue_unbind(squeue_t *sqp) 296 { 297 mutex_enter(&sqp->sq_lock); 298 if (!(sqp->sq_state & SQS_BOUND)) { 299 mutex_exit(&sqp->sq_lock); 300 return; 301 } 302 303 sqp->sq_state &= ~SQS_BOUND; 304 thread_affinity_clear(sqp->sq_worker); 305 mutex_exit(&sqp->sq_lock); 306 } 307 308 /* 309 * squeue_enter() - enter squeue sqp with mblk mp (which can be 310 * a chain), while tail points to the end and cnt in number of 311 * mblks in the chain. 312 * 313 * For a chain of single packet (i.e. mp == tail), go through the 314 * fast path if no one is processing the squeue and nothing is queued. 315 * 316 * The proc and arg for each mblk is already stored in the mblk in 317 * appropriate places. 318 * 319 * The process_flag specifies if we are allowed to process the mblk 320 * and drain in the entering thread context. If process_flag is 321 * SQ_FILL, then we just queue the mblk and return (after signaling 322 * the worker thread if no one else is processing the squeue). 323 * 324 * The ira argument can be used when the count is one. 325 * For a chain the caller needs to prepend any needed mblks from 326 * ip_recv_attr_to_mblk(). 327 */ 328 /* ARGSUSED */ 329 void 330 squeue_enter(squeue_t *sqp, mblk_t *mp, mblk_t *tail, uint32_t cnt, 331 ip_recv_attr_t *ira, int process_flag, uint8_t tag) 332 { 333 conn_t *connp; 334 sqproc_t proc; 335 hrtime_t now; 336 337 ASSERT(sqp != NULL); 338 ASSERT(mp != NULL); 339 ASSERT(tail != NULL); 340 ASSERT(cnt > 0); 341 ASSERT(MUTEX_NOT_HELD(&sqp->sq_lock)); 342 ASSERT(ira == NULL || cnt == 1); 343 344 mutex_enter(&sqp->sq_lock); 345 346 /* 347 * Try to process the packet if SQ_FILL flag is not set and 348 * we are allowed to process the squeue. The SQ_NODRAIN is 349 * ignored if the packet chain consists of more than 1 packet. 350 */ 351 if (!(sqp->sq_state & SQS_PROC) && ((process_flag == SQ_PROCESS) || 352 (process_flag == SQ_NODRAIN && sqp->sq_first == NULL))) { 353 /* 354 * See if anything is already queued. If we are the 355 * first packet, do inline processing else queue the 356 * packet and do the drain. 357 */ 358 if (sqp->sq_first == NULL && cnt == 1) { 359 /* 360 * Fast-path, ok to process and nothing queued. 361 */ 362 sqp->sq_state |= (SQS_PROC|SQS_FAST); 363 sqp->sq_run = curthread; 364 mutex_exit(&sqp->sq_lock); 365 366 /* 367 * We are the chain of 1 packet so 368 * go through this fast path. 369 */ 370 ASSERT(mp->b_prev != NULL); 371 ASSERT(mp->b_queue != NULL); 372 connp = (conn_t *)mp->b_prev; 373 mp->b_prev = NULL; 374 proc = (sqproc_t)mp->b_queue; 375 mp->b_queue = NULL; 376 ASSERT(proc != NULL && connp != NULL); 377 ASSERT(mp->b_next == NULL); 378 379 /* 380 * Handle squeue switching. 
More details in the 381 * block comment at the top of the file 382 */ 383 if (connp->conn_sqp == sqp) { 384 SQUEUE_DBG_SET(sqp, mp, proc, connp, 385 tag); 386 connp->conn_on_sqp = B_TRUE; 387 DTRACE_PROBE3(squeue__proc__start, squeue_t *, 388 sqp, mblk_t *, mp, conn_t *, connp); 389 (*proc)(connp, mp, sqp, ira); 390 DTRACE_PROBE2(squeue__proc__end, squeue_t *, 391 sqp, conn_t *, connp); 392 connp->conn_on_sqp = B_FALSE; 393 SQUEUE_DBG_CLEAR(sqp); 394 CONN_DEC_REF(connp); 395 } else { 396 SQUEUE_ENTER_ONE(connp->conn_sqp, mp, proc, 397 connp, ira, SQ_FILL, SQTAG_SQUEUE_CHANGE); 398 } 399 ASSERT(MUTEX_NOT_HELD(&sqp->sq_lock)); 400 mutex_enter(&sqp->sq_lock); 401 sqp->sq_state &= ~(SQS_PROC|SQS_FAST); 402 sqp->sq_run = NULL; 403 if (sqp->sq_first == NULL || 404 process_flag == SQ_NODRAIN) { 405 /* 406 * Even if SQ_NODRAIN was specified, it may 407 * still be best to process a single queued 408 * item if it matches the active connection. 409 */ 410 if (sqp->sq_first != NULL) { 411 squeue_try_drain_one(sqp, connp); 412 } 413 414 /* 415 * If work or control actions are pending, wake 416 * up the worker thread. 417 */ 418 if (sqp->sq_first != NULL || 419 sqp->sq_state & SQS_WORKER_THR_CONTROL) { 420 squeue_worker_wakeup(sqp); 421 } 422 mutex_exit(&sqp->sq_lock); 423 return; 424 } 425 } else { 426 if (ira != NULL) { 427 mblk_t *attrmp; 428 429 ASSERT(cnt == 1); 430 attrmp = ip_recv_attr_to_mblk(ira); 431 if (attrmp == NULL) { 432 mutex_exit(&sqp->sq_lock); 433 ip_drop_input("squeue: " 434 "ip_recv_attr_to_mblk", 435 mp, NULL); 436 /* Caller already set b_prev/b_next */ 437 mp->b_prev = mp->b_next = NULL; 438 freemsg(mp); 439 return; 440 } 441 ASSERT(attrmp->b_cont == NULL); 442 attrmp->b_cont = mp; 443 /* Move connp and func to new */ 444 attrmp->b_queue = mp->b_queue; 445 mp->b_queue = NULL; 446 attrmp->b_prev = mp->b_prev; 447 mp->b_prev = NULL; 448 449 ASSERT(mp == tail); 450 tail = mp = attrmp; 451 } 452 453 ENQUEUE_CHAIN(sqp, mp, tail, cnt); 454 #ifdef DEBUG 455 mp->b_tag = tag; 456 #endif 457 } 458 /* 459 * We are here because either we couldn't do inline 460 * processing (because something was already queued), 461 * or we had a chain of more than one packet, 462 * or something else arrived after we were done with 463 * inline processing. 464 */ 465 ASSERT(MUTEX_HELD(&sqp->sq_lock)); 466 ASSERT(sqp->sq_first != NULL); 467 now = gethrtime(); 468 sqp->sq_run = curthread; 469 sqp->sq_drain(sqp, SQS_ENTER, now + squeue_drain_ns); 470 471 /* 472 * If we didn't do a complete drain, the worker 473 * thread was already signalled by squeue_drain. 474 * In case any control actions are pending, wake 475 * up the worker. 476 */ 477 sqp->sq_run = NULL; 478 if (sqp->sq_state & SQS_WORKER_THR_CONTROL) { 479 squeue_worker_wakeup(sqp); 480 } 481 } else { 482 /* 483 * We let a thread processing a squeue reenter only 484 * once. This helps the case of incoming connection 485 * where a SYN-ACK-ACK that triggers the conn_ind 486 * doesn't have to queue the packet if listener and 487 * eager are on the same squeue. Also helps the 488 * loopback connection where the two ends are bound 489 * to the same squeue (which is typical on single 490 * CPU machines). 491 * 492 * We let the thread reenter only once for the fear 493 * of stack getting blown with multiple traversal. 
		 */
		connp = (conn_t *)mp->b_prev;
		if (!(sqp->sq_state & SQS_REENTER) &&
		    (process_flag != SQ_FILL) && (sqp->sq_first == NULL) &&
		    (sqp->sq_run == curthread) && (cnt == 1) &&
		    (connp->conn_on_sqp == B_FALSE)) {
			sqp->sq_state |= SQS_REENTER;
			mutex_exit(&sqp->sq_lock);

			ASSERT(mp->b_prev != NULL);
			ASSERT(mp->b_queue != NULL);

			mp->b_prev = NULL;
			proc = (sqproc_t)mp->b_queue;
			mp->b_queue = NULL;

			/*
			 * Handle squeue switching. More details in the
			 * block comment at the top of the file
			 */
			if (connp->conn_sqp == sqp) {
				connp->conn_on_sqp = B_TRUE;
				DTRACE_PROBE3(squeue__proc__start, squeue_t *,
				    sqp, mblk_t *, mp, conn_t *, connp);
				(*proc)(connp, mp, sqp, ira);
				DTRACE_PROBE2(squeue__proc__end, squeue_t *,
				    sqp, conn_t *, connp);
				connp->conn_on_sqp = B_FALSE;
				CONN_DEC_REF(connp);
			} else {
				SQUEUE_ENTER_ONE(connp->conn_sqp, mp, proc,
				    connp, ira, SQ_FILL, SQTAG_SQUEUE_CHANGE);
			}

			mutex_enter(&sqp->sq_lock);
			sqp->sq_state &= ~SQS_REENTER;
			mutex_exit(&sqp->sq_lock);
			return;
		}

		/*
		 * The queue is already being processed, or there are already
		 * one or more packets on the queue. Enqueue the
		 * packet and wake up the squeue worker thread if the
		 * squeue is not being processed.
		 */
#ifdef DEBUG
		mp->b_tag = tag;
#endif
		if (ira != NULL) {
			mblk_t *attrmp;

			ASSERT(cnt == 1);
			attrmp = ip_recv_attr_to_mblk(ira);
			if (attrmp == NULL) {
				mutex_exit(&sqp->sq_lock);
				ip_drop_input("squeue: ip_recv_attr_to_mblk",
				    mp, NULL);
				/* Caller already set b_prev/b_next */
				mp->b_prev = mp->b_next = NULL;
				freemsg(mp);
				return;
			}
			ASSERT(attrmp->b_cont == NULL);
			attrmp->b_cont = mp;
			/* Move connp and func to new */
			attrmp->b_queue = mp->b_queue;
			mp->b_queue = NULL;
			attrmp->b_prev = mp->b_prev;
			mp->b_prev = NULL;

			ASSERT(mp == tail);
			tail = mp = attrmp;
		}
		ENQUEUE_CHAIN(sqp, mp, tail, cnt);
		/*
		 * If the worker isn't running or control actions are pending,
		 * wake it up now.
		 */
		if ((sqp->sq_state & SQS_PROC) == 0 ||
		    (sqp->sq_state & SQS_WORKER_THR_CONTROL) != 0) {
			squeue_worker_wakeup(sqp);
		}
	}
	mutex_exit(&sqp->sq_lock);
}

/*
 * PRIVATE FUNCTIONS
 */


/*
 * Wake up the worker thread for the squeue to process queued work.
 */
static void
squeue_worker_wakeup(squeue_t *sqp)
{
	ASSERT(MUTEX_HELD(&(sqp)->sq_lock));

	cv_signal(&sqp->sq_worker_cv);
	sqp->sq_awoken = gethrtime();
}

static void
squeue_drain(squeue_t *sqp, uint_t proc_type, hrtime_t expire)
{
	mblk_t		*mp;
	mblk_t		*head;
	sqproc_t	proc;
	conn_t		*connp;
	ill_rx_ring_t	*sq_rx_ring = sqp->sq_rx_ring;
	hrtime_t	now;
	boolean_t	sq_poll_capable;
	ip_recv_attr_t	*ira, iras;

	/*
	 * Before doing any work, check our stack depth; if we're not a
	 * worker thread for this squeue and we're beginning to get tight
	 * on stack, kick the worker, bump a counter and return.
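	 *
	 * For example (numbers purely illustrative): with STACK_BIAS folded
	 * in, getfp() points at the current frame while t_stkbase is the low
	 * end of the downward-growing stack, so the difference computed below
	 * is roughly the stack space still unused. If that drops under
	 * squeue_drain_stack_needed (10240 bytes by default, above), an
	 * interrupt or writer thread wakes the worker and returns rather than
	 * risk running out of stack part way through the drain.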
614 */ 615 if (proc_type != SQS_WORKER && STACK_BIAS + (uintptr_t)getfp() - 616 (uintptr_t)curthread->t_stkbase < squeue_drain_stack_needed) { 617 ASSERT(mutex_owned(&sqp->sq_lock)); 618 squeue_worker_wakeup(sqp); 619 squeue_drain_stack_toodeep++; 620 return; 621 } 622 623 sq_poll_capable = (sqp->sq_state & SQS_POLL_CAPAB) != 0; 624 again: 625 ASSERT(mutex_owned(&sqp->sq_lock)); 626 ASSERT(!(sqp->sq_state & (SQS_POLL_THR_QUIESCED | 627 SQS_POLL_QUIESCE_DONE))); 628 629 head = sqp->sq_first; 630 sqp->sq_first = NULL; 631 sqp->sq_last = NULL; 632 sqp->sq_count = 0; 633 634 sqp->sq_state |= SQS_PROC | proc_type; 635 636 /* 637 * We have backlog built up. Switch to polling mode if the 638 * device underneath allows it. Need to do it so that 639 * more packets don't come in and disturb us (by contending 640 * for sq_lock or higher priority thread preempting us). 641 * 642 * The worker thread is allowed to do active polling while we 643 * just disable the interrupts for drain by non worker (kernel 644 * or userland) threads so they can peacefully process the 645 * packets during time allocated to them. 646 */ 647 SQS_POLLING_ON(sqp, sq_poll_capable, sq_rx_ring); 648 mutex_exit(&sqp->sq_lock); 649 650 while ((mp = head) != NULL) { 651 652 head = mp->b_next; 653 mp->b_next = NULL; 654 655 proc = (sqproc_t)mp->b_queue; 656 mp->b_queue = NULL; 657 connp = (conn_t *)mp->b_prev; 658 mp->b_prev = NULL; 659 660 /* Is there an ip_recv_attr_t to handle? */ 661 if (ip_recv_attr_is_mblk(mp)) { 662 mblk_t *attrmp = mp; 663 664 ASSERT(attrmp->b_cont != NULL); 665 666 mp = attrmp->b_cont; 667 attrmp->b_cont = NULL; 668 ASSERT(mp->b_queue == NULL); 669 ASSERT(mp->b_prev == NULL); 670 671 if (!ip_recv_attr_from_mblk(attrmp, &iras)) { 672 /* The ill or ip_stack_t disappeared on us */ 673 ip_drop_input("ip_recv_attr_from_mblk", 674 mp, NULL); 675 ira_cleanup(&iras, B_TRUE); 676 CONN_DEC_REF(connp); 677 continue; 678 } 679 ira = &iras; 680 } else { 681 ira = NULL; 682 } 683 684 685 /* 686 * Handle squeue switching. More details in the 687 * block comment at the top of the file 688 */ 689 if (connp->conn_sqp == sqp) { 690 SQUEUE_DBG_SET(sqp, mp, proc, connp, 691 mp->b_tag); 692 connp->conn_on_sqp = B_TRUE; 693 DTRACE_PROBE3(squeue__proc__start, squeue_t *, 694 sqp, mblk_t *, mp, conn_t *, connp); 695 (*proc)(connp, mp, sqp, ira); 696 DTRACE_PROBE2(squeue__proc__end, squeue_t *, 697 sqp, conn_t *, connp); 698 connp->conn_on_sqp = B_FALSE; 699 CONN_DEC_REF(connp); 700 } else { 701 SQUEUE_ENTER_ONE(connp->conn_sqp, mp, proc, connp, ira, 702 SQ_FILL, SQTAG_SQUEUE_CHANGE); 703 } 704 if (ira != NULL) 705 ira_cleanup(ira, B_TRUE); 706 } 707 708 SQUEUE_DBG_CLEAR(sqp); 709 710 mutex_enter(&sqp->sq_lock); 711 712 /* 713 * Check if there is still work to do (either more arrived or timer 714 * expired). If we are the worker thread and we are polling capable, 715 * continue doing the work since no one else is around to do the 716 * work anyway (but signal the poll thread to retrieve some packets 717 * in the meanwhile). If we are not the worker thread, just 718 * signal the worker thread to take up the work if processing time 719 * has expired. 720 */ 721 if (sqp->sq_first != NULL) { 722 /* 723 * Still more to process. If time quanta not expired, we 724 * should let the drain go on. The worker thread is allowed 725 * to drain as long as there is anything left. 
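		 *
		 * Note that "expire" was computed by the caller as the entry
		 * hrtime plus squeue_drain_ns, i.e. squeue_drain_ms (20 by
		 * default) converted to nanoseconds, so a non-worker thread
		 * gives up the drain after roughly 20 ms and lets the worker
		 * take over.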
		 */
		now = gethrtime();
		if ((now < expire) || (proc_type == SQS_WORKER)) {
			/*
			 * If the time has not expired, or we are the worker
			 * thread and this squeue is polling capable,
			 * continue to do the drain.
			 *
			 * We turn off interrupts for all userland threads
			 * doing drain but we do active polling only for
			 * the worker thread.
			 *
			 * Calling SQS_POLL_RING() even in the case of
			 * SQS_POLLING_ON() not succeeding is ok as
			 * SQS_POLL_RING() will not wake up the poll thread
			 * if the SQS_POLLING bit is not set.
			 */
			if (proc_type == SQS_WORKER)
				SQS_POLL_RING(sqp);
			goto again;
		}

		squeue_worker_wakeup(sqp);
	}

	/*
	 * If the poll thread is already running, just return. The
	 * poll thread continues to hold the proc and will finish
	 * processing.
	 */
	if (sqp->sq_state & SQS_GET_PKTS) {
		ASSERT(!(sqp->sq_state & (SQS_POLL_THR_QUIESCED |
		    SQS_POLL_QUIESCE_DONE)));
		sqp->sq_state &= ~proc_type;
		return;
	}

	/*
	 * If we are the worker thread and no work is left, send the poll
	 * thread down once more to see if something arrived. Otherwise,
	 * turn the interrupts back on and we are done.
	 */
	if ((proc_type == SQS_WORKER) && (sqp->sq_state & SQS_POLLING)) {
		/*
		 * Do one last check to see if anything arrived
		 * in the NIC. We leave the SQS_PROC set to ensure
		 * that the poll thread keeps the PROC and can decide
		 * if it needs to turn polling off or continue
		 * processing.
		 *
		 * If we drop the SQS_PROC here and the poll thread comes
		 * up empty handed, it cannot safely turn polling off
		 * since someone else could have acquired the PROC
		 * and started draining. The previously running poll
		 * thread and the current thread doing drain would end
		 * up in a race for turning polling on/off and more
		 * complex code would be required to deal with it.
		 *
		 * It's a lot simpler for the drain to hand the SQS_PROC to
		 * the poll thread (if running) and let the poll thread finish
		 * without worrying about racing with any other thread.
		 */
		ASSERT(!(sqp->sq_state & (SQS_POLL_THR_QUIESCED |
		    SQS_POLL_QUIESCE_DONE)));
		SQS_POLL_RING(sqp);
		sqp->sq_state &= ~proc_type;
	} else {
		/*
		 * The squeue is either not capable of polling, or the
		 * attempt to blank (i.e., SQS_POLLING_ON()) was
		 * unsuccessful, or the poll thread already finished
		 * processing and didn't find anything. Since there
		 * is nothing queued and we already turned polling on
		 * (for all threads doing drain), we should turn
		 * polling off and relinquish the PROC.
		 */
		ASSERT(!(sqp->sq_state & (SQS_POLL_THR_QUIESCED |
		    SQS_POLL_QUIESCE_DONE)));
		SQS_POLLING_OFF(sqp, sq_poll_capable, sq_rx_ring);
		sqp->sq_state &= ~(SQS_PROC | proc_type);
		/*
		 * If we are not the worker and there is a pending quiesce
		 * event, wake up the worker.
		 */
		if ((proc_type != SQS_WORKER) &&
		    (sqp->sq_state & SQS_WORKER_THR_CONTROL)) {
			squeue_worker_wakeup(sqp);
		}
	}
}

/*
 * Quiesce, Restart, or Cleanup of the squeue poll thread.
 *
 * Quiesce and Restart: After an squeue poll thread has been quiesced, it does
 * not attempt to poll the underlying soft ring any more. The quiesce is
 * triggered by the mac layer when it wants to quiesce a soft ring. Typically
 * control operations such as changing the fanout of a NIC or VNIC (dladm
 * setlinkprop) need to quiesce data flow before changing the wiring.
 * The operation is done by the mac layer, but it calls back into IP to
 * quiesce the soft ring. After completing the operation (say an increase
 * or decrease of the fanout) the mac layer then calls back into IP to
 * restart the quiesced soft ring.
 *
 * Cleanup: This is triggered when the squeue binding to a soft ring is
 * removed permanently. Typically interface plumb and unplumb would trigger
 * this. It can also be triggered from the mac layer when a soft ring is
 * being deleted, say as the result of a fanout reduction. Since squeues are
 * never deleted, the cleanup marks the squeue as fit for recycling and
 * moves it to the zeroth squeue set.
 */
static void
squeue_poll_thr_control(squeue_t *sqp)
{
	if (sqp->sq_state & SQS_POLL_THR_RESTART) {
		/* Restart implies a previous quiesce */
		ASSERT(sqp->sq_state & SQS_POLL_THR_QUIESCED);
		sqp->sq_state &= ~(SQS_POLL_THR_QUIESCED |
		    SQS_POLL_THR_RESTART);
		sqp->sq_state |= SQS_POLL_CAPAB;
		cv_signal(&sqp->sq_worker_cv);
		return;
	}

	if (sqp->sq_state & SQS_POLL_THR_QUIESCE) {
		sqp->sq_state |= SQS_POLL_THR_QUIESCED;
		sqp->sq_state &= ~SQS_POLL_THR_QUIESCE;
		cv_signal(&sqp->sq_worker_cv);
		return;
	}
}

/*
 * POLLING Notes
 *
 * With polling mode, we want to do as much processing as we possibly can
 * in worker thread context. The sweet spot is when the worker thread keeps
 * doing work all the time in polling mode and writers etc. keep dumping
 * packets to the worker thread. Occasionally, we send the poll thread
 * (running at a lower priority) down to the NIC to get a chain of packets
 * to feed to the worker. Sending the poll thread down to the NIC depends
 * on 3 criteria:
 *
 * 1) It is always driven from squeue_drain and only if the worker thread
 *    is doing the drain.
 * 2) We cleared the backlog once and more packets arrived in between.
 *    Before starting the drain again, send the poll thread down if
 *    the drain is being done by the worker thread.
 * 3) Before exiting squeue_drain, if the poll thread is not already
 *    working and we are the worker thread, try to poll one more time.
 *
 * For latency's sake, we do allow any thread calling squeue_enter
 * to process its packet provided:
 *
 * 1) Nothing is queued
 * 2) If more packets arrived in between, the non-worker threads are allowed
 *    to do the drain till their time quantum expires, provided SQS_GET_PKTS
 *    wasn't set in between.
 *
 * Avoiding deadlocks with interrupts
 * ==================================
 *
 * One of the big problems is that we can't send the poll thread down while
 * holding the sq_lock, since the thread can block. So we drop the sq_lock
 * before calling sq_get_pkts(). We keep holding the SQS_PROC as long as the
 * poll thread is running so that no other thread can acquire the
 * perimeter in between. If the squeue_drain gets done (no more work
 * left), it leaves the SQS_PROC set if the poll thread is running.
 */

/*
 * This is the squeue poll thread. In poll mode, it polls the underlying
 * TCP softring and feeds packets into the squeue. The worker thread then
 * drains the squeue. The poll thread also responds to control signals for
 * quiescing, restarting, or cleanup of an squeue. These are driven by
 * control operations like plumb/unplumb or as a result of dynamic Rx ring
 * related operations that are driven from the mac layer.
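 *
 * In outline (a simplified sketch of the loop below, not a separate
 * interface), one polling cycle looks roughly like:
 *
 *	// with sq_lock dropped, pull a chain of packets from the ring
 *	head = sq_rx_ring->rr_rx(sq_rx_ring->rr_rx_handle,
 *	    MAX_BYTES_TO_PICKUP);
 *	// let IP vet and steer the chain for this squeue
 *	mp = sq_rx_ring->rr_ip_accept(sq_ill, sq_rx_ring, sqp, head,
 *	    &tail, &cnt);
 *	// back under sq_lock: append and drain with the usual time limit
 *	ENQUEUE_CHAIN(sqp, mp, tail, cnt);
 *	sqp->sq_drain(sqp, SQS_POLL_PROC, gethrtime() + squeue_drain_ns);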
903 */ 904 static void 905 squeue_polling_thread(squeue_t *sqp) 906 { 907 kmutex_t *lock = &sqp->sq_lock; 908 kcondvar_t *async = &sqp->sq_poll_cv; 909 ip_mac_rx_t sq_get_pkts; 910 ip_accept_t ip_accept; 911 ill_rx_ring_t *sq_rx_ring; 912 ill_t *sq_ill; 913 mblk_t *head, *tail, *mp; 914 uint_t cnt; 915 void *sq_mac_handle; 916 callb_cpr_t cprinfo; 917 size_t bytes_to_pickup; 918 uint32_t ctl_state; 919 920 CALLB_CPR_INIT(&cprinfo, lock, callb_generic_cpr, "sq_poll"); 921 mutex_enter(lock); 922 923 for (;;) { 924 CALLB_CPR_SAFE_BEGIN(&cprinfo); 925 cv_wait(async, lock); 926 CALLB_CPR_SAFE_END(&cprinfo, lock); 927 928 ctl_state = sqp->sq_state & (SQS_POLL_THR_CONTROL | 929 SQS_POLL_THR_QUIESCED); 930 if (ctl_state != 0) { 931 /* 932 * If the squeue is quiesced, then wait for a control 933 * request. A quiesced squeue must not poll the 934 * underlying soft ring. 935 */ 936 if (ctl_state == SQS_POLL_THR_QUIESCED) 937 continue; 938 /* 939 * Act on control requests to quiesce, cleanup or 940 * restart an squeue 941 */ 942 squeue_poll_thr_control(sqp); 943 continue; 944 } 945 946 if (!(sqp->sq_state & SQS_POLL_CAPAB)) 947 continue; 948 949 ASSERT((sqp->sq_state & 950 (SQS_PROC|SQS_POLLING|SQS_GET_PKTS)) == 951 (SQS_PROC|SQS_POLLING|SQS_GET_PKTS)); 952 953 poll_again: 954 sq_rx_ring = sqp->sq_rx_ring; 955 sq_get_pkts = sq_rx_ring->rr_rx; 956 sq_mac_handle = sq_rx_ring->rr_rx_handle; 957 ip_accept = sq_rx_ring->rr_ip_accept; 958 sq_ill = sq_rx_ring->rr_ill; 959 bytes_to_pickup = MAX_BYTES_TO_PICKUP; 960 mutex_exit(lock); 961 head = sq_get_pkts(sq_mac_handle, bytes_to_pickup); 962 mp = NULL; 963 if (head != NULL) { 964 /* 965 * We got the packet chain from the mac layer. It 966 * would be nice to be able to process it inline 967 * for better performance but we need to give 968 * IP a chance to look at this chain to ensure 969 * that packets are really meant for this squeue 970 * and do the IP processing. 971 */ 972 mp = ip_accept(sq_ill, sq_rx_ring, sqp, head, 973 &tail, &cnt); 974 } 975 mutex_enter(lock); 976 if (mp != NULL) { 977 /* 978 * The ip_accept function has already added an 979 * ip_recv_attr_t mblk if that is needed. 980 */ 981 ENQUEUE_CHAIN(sqp, mp, tail, cnt); 982 } 983 ASSERT((sqp->sq_state & 984 (SQS_PROC|SQS_POLLING|SQS_GET_PKTS)) == 985 (SQS_PROC|SQS_POLLING|SQS_GET_PKTS)); 986 987 if (sqp->sq_first != NULL && !(sqp->sq_state & SQS_WORKER)) { 988 /* 989 * We have packets to process and worker thread 990 * is not running. Check to see if poll thread is 991 * allowed to process. Let it do processing only if it 992 * picked up some packets from the NIC otherwise 993 * wakeup the worker thread. 994 */ 995 if (mp != NULL) { 996 hrtime_t now; 997 998 now = gethrtime(); 999 sqp->sq_run = curthread; 1000 sqp->sq_drain(sqp, SQS_POLL_PROC, now + 1001 squeue_drain_ns); 1002 sqp->sq_run = NULL; 1003 1004 if (sqp->sq_first == NULL) 1005 goto poll_again; 1006 1007 /* 1008 * Couldn't do the entire drain because the 1009 * time limit expired, let the 1010 * worker thread take over. 1011 */ 1012 } 1013 1014 /* 1015 * Put the SQS_PROC_HELD on so the worker 1016 * thread can distinguish where its called from. We 1017 * can remove the SQS_PROC flag here and turn off the 1018 * polling so that it wouldn't matter who gets the 1019 * processing but we get better performance this way 1020 * and save the cost of turn polling off and possibly 1021 * on again as soon as we start draining again. 
1022 * 1023 * We can't remove the SQS_PROC flag without turning 1024 * polling off until we can guarantee that control 1025 * will return to squeue_drain immediately. 1026 */ 1027 sqp->sq_state |= SQS_PROC_HELD; 1028 sqp->sq_state &= ~SQS_GET_PKTS; 1029 squeue_worker_wakeup(sqp); 1030 } else if (sqp->sq_first == NULL && 1031 !(sqp->sq_state & SQS_WORKER)) { 1032 /* 1033 * Nothing queued and worker thread not running. 1034 * Since we hold the proc, no other thread is 1035 * processing the squeue. This means that there 1036 * is no work to be done and nothing is queued 1037 * in squeue or in NIC. Turn polling off and go 1038 * back to interrupt mode. 1039 */ 1040 sqp->sq_state &= ~(SQS_PROC|SQS_GET_PKTS); 1041 /* LINTED: constant in conditional context */ 1042 SQS_POLLING_OFF(sqp, B_TRUE, sq_rx_ring); 1043 1044 /* 1045 * If there is a pending control operation 1046 * wake up the worker, since it is currently 1047 * not running. 1048 */ 1049 if (sqp->sq_state & SQS_WORKER_THR_CONTROL) { 1050 squeue_worker_wakeup(sqp); 1051 } 1052 } else { 1053 /* 1054 * Worker thread is already running. We don't need 1055 * to do anything. Indicate that poll thread is done. 1056 */ 1057 sqp->sq_state &= ~SQS_GET_PKTS; 1058 } 1059 if (sqp->sq_state & SQS_POLL_THR_CONTROL) { 1060 /* 1061 * Act on control requests to quiesce, cleanup or 1062 * restart an squeue 1063 */ 1064 squeue_poll_thr_control(sqp); 1065 } 1066 } 1067 } 1068 1069 /* 1070 * The squeue worker thread acts on any control requests to quiesce, cleanup 1071 * or restart an ill_rx_ring_t by calling this function. The worker thread 1072 * synchronizes with the squeue poll thread to complete the request and finally 1073 * wakes up the requestor when the request is completed. 1074 */ 1075 static void 1076 squeue_worker_thr_control(squeue_t *sqp) 1077 { 1078 ill_t *ill; 1079 ill_rx_ring_t *rx_ring; 1080 1081 ASSERT(MUTEX_HELD(&sqp->sq_lock)); 1082 1083 if (sqp->sq_state & SQS_POLL_RESTART) { 1084 /* Restart implies a previous quiesce. */ 1085 ASSERT((sqp->sq_state & (SQS_PROC_HELD | 1086 SQS_POLL_QUIESCE_DONE | SQS_PROC | SQS_WORKER)) == 1087 (SQS_POLL_QUIESCE_DONE | SQS_PROC | SQS_WORKER)); 1088 /* 1089 * Request the squeue poll thread to restart and wait till 1090 * it actually restarts. 1091 */ 1092 sqp->sq_state &= ~SQS_POLL_QUIESCE_DONE; 1093 sqp->sq_state |= SQS_POLL_THR_RESTART; 1094 cv_signal(&sqp->sq_poll_cv); 1095 while (sqp->sq_state & SQS_POLL_THR_QUIESCED) 1096 cv_wait(&sqp->sq_worker_cv, &sqp->sq_lock); 1097 sqp->sq_state &= ~(SQS_POLL_RESTART | SQS_PROC | 1098 SQS_WORKER); 1099 /* 1100 * Signal any waiter that is waiting for the restart 1101 * to complete 1102 */ 1103 sqp->sq_state |= SQS_POLL_RESTART_DONE; 1104 cv_signal(&sqp->sq_ctrlop_done_cv); 1105 return; 1106 } 1107 1108 if (sqp->sq_state & SQS_PROC_HELD) { 1109 /* The squeue poll thread handed control to us */ 1110 ASSERT(sqp->sq_state & SQS_PROC); 1111 } 1112 1113 /* 1114 * Prevent any other thread from processing the squeue 1115 * until we finish the control actions by setting SQS_PROC. 
1116 * But allow ourself to reenter by setting SQS_WORKER 1117 */ 1118 sqp->sq_state |= (SQS_PROC | SQS_WORKER); 1119 1120 /* Signal the squeue poll thread and wait for it to quiesce itself */ 1121 if (!(sqp->sq_state & SQS_POLL_THR_QUIESCED)) { 1122 sqp->sq_state |= SQS_POLL_THR_QUIESCE; 1123 cv_signal(&sqp->sq_poll_cv); 1124 while (!(sqp->sq_state & SQS_POLL_THR_QUIESCED)) 1125 cv_wait(&sqp->sq_worker_cv, &sqp->sq_lock); 1126 } 1127 1128 rx_ring = sqp->sq_rx_ring; 1129 ill = rx_ring->rr_ill; 1130 /* 1131 * The lock hierarchy is as follows. 1132 * cpu_lock -> ill_lock -> sqset_lock -> sq_lock 1133 */ 1134 mutex_exit(&sqp->sq_lock); 1135 mutex_enter(&ill->ill_lock); 1136 mutex_enter(&sqp->sq_lock); 1137 1138 SQS_POLLING_OFF(sqp, (sqp->sq_state & SQS_POLL_CAPAB) != 0, 1139 sqp->sq_rx_ring); 1140 sqp->sq_state &= ~(SQS_POLL_CAPAB | SQS_GET_PKTS | SQS_PROC_HELD); 1141 if (sqp->sq_state & SQS_POLL_CLEANUP) { 1142 /* 1143 * Disassociate this squeue from its ill_rx_ring_t. 1144 * The rr_sqp, sq_rx_ring fields are protected by the 1145 * corresponding squeue, ill_lock* and sq_lock. Holding any 1146 * of them will ensure that the ring to squeue mapping does 1147 * not change. 1148 */ 1149 ASSERT(!(sqp->sq_state & SQS_DEFAULT)); 1150 1151 sqp->sq_rx_ring = NULL; 1152 rx_ring->rr_sqp = NULL; 1153 1154 sqp->sq_state &= ~(SQS_POLL_CLEANUP | SQS_POLL_THR_QUIESCED | 1155 SQS_POLL_QUIESCE_DONE); 1156 sqp->sq_ill = NULL; 1157 1158 rx_ring->rr_rx_handle = NULL; 1159 rx_ring->rr_intr_handle = NULL; 1160 rx_ring->rr_intr_enable = NULL; 1161 rx_ring->rr_intr_disable = NULL; 1162 sqp->sq_state |= SQS_POLL_CLEANUP_DONE; 1163 } else { 1164 sqp->sq_state &= ~SQS_POLL_QUIESCE; 1165 sqp->sq_state |= SQS_POLL_QUIESCE_DONE; 1166 } 1167 /* 1168 * Signal any waiter that is waiting for the quiesce or cleanup 1169 * to complete and also wait for it to actually see and reset the 1170 * SQS_POLL_CLEANUP_DONE. 1171 */ 1172 cv_signal(&sqp->sq_ctrlop_done_cv); 1173 mutex_exit(&ill->ill_lock); 1174 if (sqp->sq_state & SQS_POLL_CLEANUP_DONE) { 1175 cv_wait(&sqp->sq_worker_cv, &sqp->sq_lock); 1176 sqp->sq_state &= ~(SQS_PROC | SQS_WORKER); 1177 } 1178 } 1179 1180 static void 1181 squeue_worker(squeue_t *sqp) 1182 { 1183 kmutex_t *lock = &sqp->sq_lock; 1184 kcondvar_t *async = &sqp->sq_worker_cv; 1185 callb_cpr_t cprinfo; 1186 hrtime_t now; 1187 1188 CALLB_CPR_INIT(&cprinfo, lock, callb_generic_cpr, "sq_worker"); 1189 mutex_enter(lock); 1190 1191 for (;;) { 1192 for (;;) { 1193 /* 1194 * If the poll thread has handed control to us 1195 * we need to break out of the wait. 
1196 */ 1197 if (sqp->sq_state & SQS_PROC_HELD) 1198 break; 1199 1200 /* 1201 * If the squeue is not being processed and we either 1202 * have messages to drain or some thread has signaled 1203 * some control activity we need to break 1204 */ 1205 if (!(sqp->sq_state & SQS_PROC) && 1206 ((sqp->sq_state & SQS_WORKER_THR_CONTROL) || 1207 (sqp->sq_first != NULL))) 1208 break; 1209 1210 /* 1211 * If we have started some control action, then check 1212 * for the SQS_WORKER flag (since we don't 1213 * release the squeue) to make sure we own the squeue 1214 * and break out 1215 */ 1216 if ((sqp->sq_state & SQS_WORKER_THR_CONTROL) && 1217 (sqp->sq_state & SQS_WORKER)) 1218 break; 1219 1220 CALLB_CPR_SAFE_BEGIN(&cprinfo); 1221 cv_wait(async, lock); 1222 CALLB_CPR_SAFE_END(&cprinfo, lock); 1223 } 1224 if (sqp->sq_state & SQS_WORKER_THR_CONTROL) { 1225 squeue_worker_thr_control(sqp); 1226 continue; 1227 } 1228 ASSERT(!(sqp->sq_state & (SQS_POLL_THR_QUIESCED | 1229 SQS_POLL_CLEANUP_DONE | SQS_POLL_QUIESCE_DONE | 1230 SQS_WORKER_THR_CONTROL | SQS_POLL_THR_CONTROL))); 1231 1232 if (sqp->sq_state & SQS_PROC_HELD) 1233 sqp->sq_state &= ~SQS_PROC_HELD; 1234 1235 now = gethrtime(); 1236 sqp->sq_run = curthread; 1237 sqp->sq_drain(sqp, SQS_WORKER, now + squeue_drain_ns); 1238 sqp->sq_run = NULL; 1239 } 1240 } 1241 1242 uintptr_t * 1243 squeue_getprivate(squeue_t *sqp, sqprivate_t p) 1244 { 1245 ASSERT(p < SQPRIVATE_MAX); 1246 1247 return (&sqp->sq_private[p]); 1248 } 1249 1250 /* ARGSUSED */ 1251 void 1252 squeue_wakeup_conn(void *arg, mblk_t *mp, void *arg2, ip_recv_attr_t *dummy) 1253 { 1254 conn_t *connp = (conn_t *)arg; 1255 squeue_t *sqp = connp->conn_sqp; 1256 1257 /* 1258 * Mark the squeue as paused before waking up the thread stuck 1259 * in squeue_synch_enter(). 1260 */ 1261 mutex_enter(&sqp->sq_lock); 1262 sqp->sq_state |= SQS_PAUSE; 1263 1264 /* 1265 * Notify the thread that it's OK to proceed; that is done by 1266 * clearing the MSGWAITSYNC flag. The synch thread will free the mblk. 1267 */ 1268 ASSERT(mp->b_flag & MSGWAITSYNC); 1269 mp->b_flag &= ~MSGWAITSYNC; 1270 cv_broadcast(&connp->conn_sq_cv); 1271 1272 /* 1273 * We are doing something on behalf of another thread, so we have to 1274 * pause and wait until it finishes. 1275 */ 1276 while (sqp->sq_state & SQS_PAUSE) { 1277 cv_wait(&sqp->sq_synch_cv, &sqp->sq_lock); 1278 } 1279 mutex_exit(&sqp->sq_lock); 1280 } 1281 1282 int 1283 squeue_synch_enter(conn_t *connp, mblk_t *use_mp) 1284 { 1285 squeue_t *sqp; 1286 1287 again: 1288 sqp = connp->conn_sqp; 1289 1290 mutex_enter(&sqp->sq_lock); 1291 if (sqp->sq_first == NULL && !(sqp->sq_state & SQS_PROC)) { 1292 /* 1293 * We are OK to proceed if the squeue is empty, and 1294 * no one owns the squeue. 1295 * 1296 * The caller won't own the squeue as this is called from the 1297 * application. 1298 */ 1299 ASSERT(sqp->sq_run == NULL); 1300 1301 sqp->sq_state |= SQS_PROC; 1302 sqp->sq_run = curthread; 1303 mutex_exit(&sqp->sq_lock); 1304 1305 /* 1306 * Handle squeue switching. The conn's squeue can only change 1307 * while there is a thread in the squeue, which is why we do 1308 * the check after entering the squeue. If it has changed, exit 1309 * this squeue and redo everything with the new sqeueue. 
1310 */ 1311 if (sqp != connp->conn_sqp) { 1312 mutex_enter(&sqp->sq_lock); 1313 sqp->sq_state &= ~SQS_PROC; 1314 sqp->sq_run = NULL; 1315 mutex_exit(&sqp->sq_lock); 1316 goto again; 1317 } 1318 #if SQUEUE_DEBUG 1319 sqp->sq_curmp = NULL; 1320 sqp->sq_curproc = NULL; 1321 sqp->sq_connp = connp; 1322 #endif 1323 connp->conn_on_sqp = B_TRUE; 1324 return (0); 1325 } else { 1326 mblk_t *mp; 1327 1328 mp = (use_mp == NULL) ? allocb(0, BPRI_MED) : use_mp; 1329 if (mp == NULL) { 1330 mutex_exit(&sqp->sq_lock); 1331 return (ENOMEM); 1332 } 1333 1334 /* 1335 * We mark the mblk as awaiting synchronous squeue access 1336 * by setting the MSGWAITSYNC flag. Once squeue_wakeup_conn 1337 * fires, MSGWAITSYNC is cleared, at which point we know we 1338 * have exclusive access. 1339 */ 1340 mp->b_flag |= MSGWAITSYNC; 1341 1342 CONN_INC_REF(connp); 1343 SET_SQUEUE(mp, squeue_wakeup_conn, connp); 1344 ENQUEUE_CHAIN(sqp, mp, mp, 1); 1345 1346 ASSERT(sqp->sq_run != curthread); 1347 1348 /* Wait until the enqueued mblk get processed. */ 1349 while (mp->b_flag & MSGWAITSYNC) 1350 cv_wait(&connp->conn_sq_cv, &sqp->sq_lock); 1351 mutex_exit(&sqp->sq_lock); 1352 1353 if (use_mp == NULL) 1354 freeb(mp); 1355 1356 return (0); 1357 } 1358 } 1359 1360 /* 1361 * If possible, attempt to immediately process a single queued request, should 1362 * it match the supplied conn_t reference. This is primarily intended to elide 1363 * squeue worker thread wake-ups during local TCP connect() or close() 1364 * operations where the response is placed on the squeue during processing. 1365 */ 1366 static void 1367 squeue_try_drain_one(squeue_t *sqp, conn_t *compare_conn) 1368 { 1369 mblk_t *next, *mp = sqp->sq_first; 1370 conn_t *connp; 1371 sqproc_t proc = (sqproc_t)mp->b_queue; 1372 ip_recv_attr_t iras, *ira = NULL; 1373 1374 ASSERT(MUTEX_HELD(&sqp->sq_lock)); 1375 ASSERT((sqp->sq_state & SQS_PROC) == 0); 1376 ASSERT(sqp->sq_run == NULL); 1377 VERIFY(mp != NULL); 1378 1379 /* 1380 * There is no guarantee that compare_conn references a valid object at 1381 * this time, so under no circumstance may it be deferenced unless it 1382 * matches the squeue entry. 
1383 */ 1384 connp = (conn_t *)mp->b_prev; 1385 if (connp != compare_conn) { 1386 return; 1387 } 1388 1389 next = mp->b_next; 1390 proc = (sqproc_t)mp->b_queue; 1391 1392 ASSERT(proc != NULL); 1393 ASSERT(sqp->sq_count > 0); 1394 1395 /* Dequeue item from squeue */ 1396 if (next == NULL) { 1397 sqp->sq_first = NULL; 1398 sqp->sq_last = NULL; 1399 } else { 1400 sqp->sq_first = next; 1401 } 1402 sqp->sq_count--; 1403 1404 sqp->sq_state |= SQS_PROC; 1405 sqp->sq_run = curthread; 1406 mutex_exit(&sqp->sq_lock); 1407 1408 /* Prep mblk_t and retrieve ira if needed */ 1409 mp->b_prev = NULL; 1410 mp->b_queue = NULL; 1411 mp->b_next = NULL; 1412 if (ip_recv_attr_is_mblk(mp)) { 1413 mblk_t *attrmp = mp; 1414 1415 ASSERT(attrmp->b_cont != NULL); 1416 1417 mp = attrmp->b_cont; 1418 attrmp->b_cont = NULL; 1419 1420 ASSERT(mp->b_queue == NULL); 1421 ASSERT(mp->b_prev == NULL); 1422 1423 if (!ip_recv_attr_from_mblk(attrmp, &iras)) { 1424 /* ill_t or ip_stack_t disappeared */ 1425 ip_drop_input("ip_recv_attr_from_mblk", mp, NULL); 1426 ira_cleanup(&iras, B_TRUE); 1427 CONN_DEC_REF(connp); 1428 goto done; 1429 } 1430 ira = &iras; 1431 } 1432 1433 SQUEUE_DBG_SET(sqp, mp, proc, connp, mp->b_tag); 1434 connp->conn_on_sqp = B_TRUE; 1435 DTRACE_PROBE3(squeue__proc__start, squeue_t *, sqp, mblk_t *, mp, 1436 conn_t *, connp); 1437 (*proc)(connp, mp, sqp, ira); 1438 DTRACE_PROBE2(squeue__proc__end, squeue_t *, sqp, conn_t *, connp); 1439 connp->conn_on_sqp = B_FALSE; 1440 CONN_DEC_REF(connp); 1441 SQUEUE_DBG_CLEAR(sqp); 1442 1443 done: 1444 mutex_enter(&sqp->sq_lock); 1445 sqp->sq_state &= ~(SQS_PROC); 1446 sqp->sq_run = NULL; 1447 } 1448 1449 void 1450 squeue_synch_exit(conn_t *connp, int flag) 1451 { 1452 squeue_t *sqp = connp->conn_sqp; 1453 1454 ASSERT(flag == SQ_NODRAIN || flag == SQ_PROCESS); 1455 1456 mutex_enter(&sqp->sq_lock); 1457 if (sqp->sq_run != curthread) { 1458 /* 1459 * The caller doesn't own the squeue, clear the SQS_PAUSE flag, 1460 * and wake up the squeue owner, such that owner can continue 1461 * processing. 1462 */ 1463 ASSERT(sqp->sq_state & SQS_PAUSE); 1464 sqp->sq_state &= ~SQS_PAUSE; 1465 1466 /* There should be only one thread blocking on sq_synch_cv. */ 1467 cv_signal(&sqp->sq_synch_cv); 1468 mutex_exit(&sqp->sq_lock); 1469 return; 1470 } 1471 1472 ASSERT(sqp->sq_state & SQS_PROC); 1473 1474 sqp->sq_state &= ~SQS_PROC; 1475 sqp->sq_run = NULL; 1476 connp->conn_on_sqp = B_FALSE; 1477 1478 /* If the caller opted in, attempt to process the head squeue item. */ 1479 if (flag == SQ_PROCESS && sqp->sq_first != NULL) { 1480 squeue_try_drain_one(sqp, connp); 1481 } 1482 1483 /* Wake up the worker if further requests are pending. */ 1484 if (sqp->sq_first != NULL) { 1485 squeue_worker_wakeup(sqp); 1486 } 1487 mutex_exit(&sqp->sq_lock); 1488 } 1489