/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License (the "License").
 * You may not use this file except in compliance with the License.
 *
 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
 * or http://www.opensolaris.org/os/licensing.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
 * If applicable, add the following below this CDDL HEADER, with the
 * fields enclosed by brackets "[]" replaced with your own identifying
 * information: Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 */
/*
 * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
 */

/*
 * Copyright 2017 Joyent, Inc.
 */

/*
 * Squeues: General purpose serialization mechanism
 * ------------------------------------------------
 *
 * Background:
 * -----------
 *
 * This is a general purpose high-performance serialization mechanism
 * currently used by TCP/IP. It is implemented by means of a per-CPU
 * queue, a worker thread and a polling thread which are bound to the
 * CPU associated with the squeue. The squeue is strictly FIFO for both
 * read and write side and only one thread can process it at any given
 * time. The design goal of the squeue was to offer a very high degree
 * of parallelization (on a per H/W execution pipeline basis) with at
 * most one queuing.
 *
 * A module needing protection typically calls the SQUEUE_ENTER_ONE()
 * or SQUEUE_ENTER() macro as soon as a thread enters the module
 * from either direction. For each packet, the processing function
 * and argument are stored in the mblk itself. When the packet is ready
 * to be processed, the squeue retrieves the stored function and calls
 * it with the supplied argument and the pointer to the packet itself.
 * The called function can assume that no other thread is processing
 * the squeue when it is executing.
 *
 * Squeue/connection binding:
 * --------------------------
 *
 * TCP/IP uses an IP classifier in conjunction with squeues, where
 * specific connections are assigned to specific squeues (based on
 * various policies) at connection creation time. Once assigned, the
 * connection-to-squeue mapping normally never changes (see the squeue
 * switching rules below) and all future packets for that connection
 * are processed on that squeue. The connection ("conn") to squeue
 * mapping is stored in the "conn_t" member "conn_sqp".
 *
 * Since the processing of the connection cuts across multiple layers
 * but still allows packets for different connections to be processed
 * on other CPUs/squeues, squeues are also termed "Vertical Perimeter"
 * or "Per Connection Vertical Perimeter".
 *
 * Processing Model:
 * -----------------
 *
 * An squeue doesn't necessarily process packets with its own worker
 * thread. Callers can choose to just queue the packet, process the
 * packet if nothing is queued, or drain and process. The first two
 * modes are typically employed when the packet was generated while
 * already doing the processing behind the squeue, and the last mode
 * (drain and process) is typically employed when the thread is
 * entering the squeue for the first time. The squeue still imposes a
 * finite time limit for which an external thread can do processing,
 * after which it switches processing to its own worker thread.
 *
 * Once created, squeues are never deleted. Hence squeue pointers are
 * always valid. This means that functions outside the squeue can still
 * refer safely to conn_sqp and there is no need for ref counts.
 *
 * Only a thread executing in the squeue can change the squeue of the
 * connection. It does so by calling a squeue framework function to do
 * this. After changing the squeue, the thread must leave the squeue.
 * It must not continue to execute any code that needs squeue
 * protection.
 *
 * The squeue framework, after entering the squeue, checks if the
 * current squeue matches the conn_sqp. If the check fails, the packet
 * is delivered to the right squeue.
 *
 * Polling Model:
 * --------------
 *
 * An squeue can control the rate of packet arrival into itself from
 * the NIC or a specific Rx ring within a NIC. As part of capability
 * negotiation between IP and the MAC layer, an squeue is created for
 * each TCP soft ring (or TCP Rx ring - to be implemented in future).
 * As part of this negotiation, squeues get a cookie for the underlying
 * soft ring or Rx ring, a function to turn off incoming packets and a
 * function to call to poll for packets. This helps schedule the
 * receive side packet processing so that queue backlog doesn't build
 * up and packet processing doesn't keep getting disturbed by high
 * priority interrupts. As part of this mode, as soon as a backlog
 * starts building, the squeue turns off interrupts and switches to
 * poll mode. In poll mode, when the poll thread goes down to retrieve
 * packets, it retrieves them in the form of a chain, which improves
 * performance even more. As the squeue/softring system gets more
 * packets, it gets more efficient by switching to polling more often
 * and dealing with larger packet chains.
 */
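
/*
 * Illustrative sketch (not compiled): a protocol module that has
 * classified an inbound packet to a connection would typically hand
 * it to the connection's squeue roughly as follows. The handler
 * my_conn_input() and the debug tag SQTAG_MY_MODULE are hypothetical
 * stand-ins; SQ_PROCESS asks the squeue to process/drain inline when
 * allowed, while SQ_FILL merely queues the packet for the worker:
 *
 *	SQUEUE_ENTER_ONE(connp->conn_sqp, mp, my_conn_input, connp,
 *	    ira, SQ_PROCESS, SQTAG_MY_MODULE);
 *
 * The entry macros stash the handler and the conn_t in the mblk
 * itself (see squeue_enter() below), so the squeue can invoke the
 * handler later if the packet ends up queued rather than processed
 * inline.
 */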
#include <sys/types.h>
#include <sys/cmn_err.h>
#include <sys/debug.h>
#include <sys/kmem.h>
#include <sys/cpuvar.h>
#include <sys/condvar_impl.h>
#include <sys/systm.h>
#include <sys/callb.h>
#include <sys/sdt.h>
#include <sys/ddi.h>
#include <sys/sunddi.h>
#include <sys/stack.h>
#include <sys/archsystm.h>

#include <inet/ipclassifier.h>
#include <inet/udp_impl.h>

#include <sys/squeue_impl.h>

static void squeue_drain(squeue_t *, uint_t, hrtime_t);
static void squeue_worker(squeue_t *sqp);
static void squeue_polling_thread(squeue_t *sqp);
static void squeue_worker_wakeup(squeue_t *sqp);

kmem_cache_t *squeue_cache;

#define	SQUEUE_MSEC_TO_NSEC	1000000

int squeue_drain_ms = 20;

/* squeue_drain_ms converted to nanoseconds in squeue_init() */
static uint_t squeue_drain_ns = 0;

uintptr_t squeue_drain_stack_needed = 10240;
uint_t squeue_drain_stack_toodeep;

#define	MAX_BYTES_TO_PICKUP	150000

#define	ENQUEUE_CHAIN(sqp, mp, tail, cnt) {			\
	/*							\
	 * Enqueue our mblk chain.				\
	 */							\
	ASSERT(MUTEX_HELD(&(sqp)->sq_lock));			\
								\
	if ((sqp)->sq_last != NULL)				\
		(sqp)->sq_last->b_next = (mp);			\
	else							\
		(sqp)->sq_first = (mp);				\
	(sqp)->sq_last = (tail);				\
	(sqp)->sq_count += (cnt);				\
	ASSERT((sqp)->sq_count > 0);				\
	DTRACE_PROBE4(squeue__enqueuechain, squeue_t *, sqp,	\
	    mblk_t *, mp, mblk_t *, tail, int, cnt);		\
}
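
/*
 * For illustration: a caller enqueueing a chain of three packets
 * links them through b_next and passes the head, tail and count,
 * e.g. with hypothetical mblks mp1..mp3:
 *
 *	mp1->b_next = mp2;
 *	mp2->b_next = mp3;
 *	ENQUEUE_CHAIN(sqp, mp1, mp3, 3);
 *
 * The macro appends the whole chain after sq_last in one step, which
 * keeps the squeue strictly FIFO while still allowing batched arrival
 * from the poll thread.
 */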
/*
 * Blank the receive ring (in this case it is the soft ring). When
 * blanked, the soft ring will not send any more packets up.
 * Blanking may not succeed when there is a CPU already in the soft
 * ring sending packets up. In that case, SQS_POLLING will not be
 * set.
 */
#define	SQS_POLLING_ON(sqp, sq_poll_capable, rx_ring) {		\
	ASSERT(MUTEX_HELD(&(sqp)->sq_lock));			\
	if (sq_poll_capable) {					\
		ASSERT(rx_ring != NULL);			\
		ASSERT(sqp->sq_state & SQS_POLL_CAPAB);		\
		if (!(sqp->sq_state & SQS_POLLING)) {		\
			if (rx_ring->rr_intr_disable(rx_ring->rr_intr_handle)) \
				sqp->sq_state |= SQS_POLLING;	\
		}						\
	}							\
}

#define	SQS_POLLING_OFF(sqp, sq_poll_capable, rx_ring) {	\
	ASSERT(MUTEX_HELD(&(sqp)->sq_lock));			\
	if (sq_poll_capable) {					\
		ASSERT(rx_ring != NULL);			\
		ASSERT(sqp->sq_state & SQS_POLL_CAPAB);		\
		if (sqp->sq_state & SQS_POLLING) {		\
			sqp->sq_state &= ~SQS_POLLING;		\
			rx_ring->rr_intr_enable(rx_ring->rr_intr_handle); \
		}						\
	}							\
}

/* Wake up the poll thread only if SQS_POLLING is set */
#define	SQS_POLL_RING(sqp) {					\
	ASSERT(MUTEX_HELD(&(sqp)->sq_lock));			\
	if (sqp->sq_state & SQS_POLLING) {			\
		ASSERT(sqp->sq_state & SQS_POLL_CAPAB);		\
		if (!(sqp->sq_state & SQS_GET_PKTS)) {		\
			sqp->sq_state |= SQS_GET_PKTS;		\
			cv_signal(&sqp->sq_poll_cv);		\
		}						\
	}							\
}

#ifdef DEBUG
#define	SQUEUE_DBG_SET(sqp, mp, proc, connp, tag) {		\
	(sqp)->sq_curmp = (mp);					\
	(sqp)->sq_curproc = (proc);				\
	(sqp)->sq_connp = (connp);				\
	(mp)->b_tag = (sqp)->sq_tag = (tag);			\
}

#define	SQUEUE_DBG_CLEAR(sqp) {					\
	(sqp)->sq_curmp = NULL;					\
	(sqp)->sq_curproc = NULL;				\
	(sqp)->sq_connp = NULL;					\
}
#else
#define	SQUEUE_DBG_SET(sqp, mp, proc, connp, tag)
#define	SQUEUE_DBG_CLEAR(sqp)
#endif

void
squeue_init(void)
{
	squeue_cache = kmem_cache_create("squeue_cache",
	    sizeof (squeue_t), 64, NULL, NULL, NULL, NULL, NULL, 0);

	squeue_drain_ns = squeue_drain_ms * SQUEUE_MSEC_TO_NSEC;
}

squeue_t *
squeue_create(pri_t pri)
{
	squeue_t *sqp = kmem_cache_alloc(squeue_cache, KM_SLEEP);

	bzero(sqp, sizeof (squeue_t));
	sqp->sq_bind = PBIND_NONE;
	sqp->sq_priority = pri;
	sqp->sq_worker = thread_create(NULL, 0, squeue_worker,
	    sqp, 0, &p0, TS_RUN, pri);

	sqp->sq_poll_thr = thread_create(NULL, 0, squeue_polling_thread,
	    sqp, 0, &p0, TS_RUN, pri);

	sqp->sq_enter = squeue_enter;
	sqp->sq_drain = squeue_drain;

	return (sqp);
}
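
/*
 * Illustrative sketch (not compiled): a consumer such as the squeue
 * set code might create an squeue and bind its worker to a chosen
 * CPU. squeue_bind() asserts that cpu_lock is held, so the caller
 * must take it; the CPU id used here is hypothetical:
 *
 *	squeue_t *sqp = squeue_create(maxclsyspri);
 *
 *	mutex_enter(&cpu_lock);
 *	squeue_bind(sqp, 3);	// bind the worker thread to CPU 3
 *	mutex_exit(&cpu_lock);
 *
 * Passing PBIND_NONE instead of a CPU id rebinds to the previously
 * recorded sq_bind value, as described in the comment below.
 */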
/*
 * Bind the squeue worker thread to the specified CPU, given by CPU id.
 * If the CPU id value is PBIND_NONE, bind the worker thread to the
 * value specified in the sq_bind field. If a thread is already bound
 * to a different CPU, unbind it from the old CPU and bind it to the
 * new one.
 */
void
squeue_bind(squeue_t *sqp, processorid_t bind)
{
	mutex_enter(&sqp->sq_lock);
	ASSERT(sqp->sq_bind != PBIND_NONE || bind != PBIND_NONE);
	ASSERT(MUTEX_HELD(&cpu_lock));

	if (sqp->sq_state & SQS_BOUND) {
		if (sqp->sq_bind == bind) {
			mutex_exit(&sqp->sq_lock);
			return;
		}
		thread_affinity_clear(sqp->sq_worker);
	} else {
		sqp->sq_state |= SQS_BOUND;
	}

	if (bind != PBIND_NONE)
		sqp->sq_bind = bind;

	thread_affinity_set(sqp->sq_worker, sqp->sq_bind);
	mutex_exit(&sqp->sq_lock);
}

void
squeue_unbind(squeue_t *sqp)
{
	mutex_enter(&sqp->sq_lock);
	if (!(sqp->sq_state & SQS_BOUND)) {
		mutex_exit(&sqp->sq_lock);
		return;
	}

	sqp->sq_state &= ~SQS_BOUND;
	thread_affinity_clear(sqp->sq_worker);
	mutex_exit(&sqp->sq_lock);
}

/*
 * squeue_enter() - enter squeue sqp with mblk mp (which can be
 * a chain), while tail points to the end and cnt is the number of
 * mblks in the chain.
 *
 * For a chain of a single packet (i.e. mp == tail), go through the
 * fast path if no one is processing the squeue and nothing is queued.
 *
 * The proc and arg for each mblk are already stored in the mblk in
 * appropriate places.
 *
 * The process_flag specifies if we are allowed to process the mblk
 * and drain in the entering thread context. If process_flag is
 * SQ_FILL, then we just queue the mblk and return (after signaling
 * the worker thread if no one else is processing the squeue).
 *
 * The ira argument can be used when the count is one.
 * For a chain the caller needs to prepend any needed mblks from
 * ip_recv_attr_to_mblk().
 */
/* ARGSUSED */
void
squeue_enter(squeue_t *sqp, mblk_t *mp, mblk_t *tail, uint32_t cnt,
    ip_recv_attr_t *ira, int process_flag, uint8_t tag)
{
	conn_t		*connp;
	sqproc_t	proc;
	hrtime_t	now;

	ASSERT(sqp != NULL);
	ASSERT(mp != NULL);
	ASSERT(tail != NULL);
	ASSERT(cnt > 0);
	ASSERT(MUTEX_NOT_HELD(&sqp->sq_lock));
	ASSERT(ira == NULL || cnt == 1);

	mutex_enter(&sqp->sq_lock);

	/*
	 * Try to process the packet if the SQ_FILL flag is not set and
	 * we are allowed to process the squeue. SQ_NODRAIN is
	 * ignored if the packet chain consists of more than 1 packet.
	 */
	if (!(sqp->sq_state & SQS_PROC) && ((process_flag == SQ_PROCESS) ||
	    (process_flag == SQ_NODRAIN && sqp->sq_first == NULL))) {
		/*
		 * See if anything is already queued. If we are the
		 * first packet, do inline processing, else queue the
		 * packet and do the drain.
		 */
		if (sqp->sq_first == NULL && cnt == 1) {
			/*
			 * Fast-path, ok to process and nothing queued.
			 */
			sqp->sq_state |= (SQS_PROC|SQS_FAST);
			sqp->sq_run = curthread;
			mutex_exit(&sqp->sq_lock);

			/*
			 * We have a chain of one packet, so
			 * go through this fast path.
			 */
			ASSERT(mp->b_prev != NULL);
			ASSERT(mp->b_queue != NULL);
			connp = (conn_t *)mp->b_prev;
			mp->b_prev = NULL;
			proc = (sqproc_t)mp->b_queue;
			mp->b_queue = NULL;
			ASSERT(proc != NULL && connp != NULL);
			ASSERT(mp->b_next == NULL);
			/*
			 * Handle squeue switching. More details in the
			 * block comment at the top of the file.
			 */
			if (connp->conn_sqp == sqp) {
				SQUEUE_DBG_SET(sqp, mp, proc, connp,
				    tag);
				connp->conn_on_sqp = B_TRUE;
				DTRACE_PROBE3(squeue__proc__start, squeue_t *,
				    sqp, mblk_t *, mp, conn_t *, connp);
				(*proc)(connp, mp, sqp, ira);
				DTRACE_PROBE2(squeue__proc__end, squeue_t *,
				    sqp, conn_t *, connp);
				connp->conn_on_sqp = B_FALSE;
				SQUEUE_DBG_CLEAR(sqp);
				CONN_DEC_REF(connp);
			} else {
				SQUEUE_ENTER_ONE(connp->conn_sqp, mp, proc,
				    connp, ira, SQ_FILL, SQTAG_SQUEUE_CHANGE);
			}
			ASSERT(MUTEX_NOT_HELD(&sqp->sq_lock));
			mutex_enter(&sqp->sq_lock);
			sqp->sq_state &= ~(SQS_PROC|SQS_FAST);
			sqp->sq_run = NULL;
			if (sqp->sq_first == NULL ||
			    process_flag == SQ_NODRAIN) {
				/*
				 * If work or control actions are pending, wake
				 * up the worker thread.
				 */
				if (sqp->sq_first != NULL ||
				    sqp->sq_state & SQS_WORKER_THR_CONTROL) {
					squeue_worker_wakeup(sqp);
				}
				mutex_exit(&sqp->sq_lock);
				return;
			}
		} else {
			if (ira != NULL) {
				mblk_t	*attrmp;

				ASSERT(cnt == 1);
				attrmp = ip_recv_attr_to_mblk(ira);
				if (attrmp == NULL) {
					mutex_exit(&sqp->sq_lock);
					ip_drop_input("squeue: "
					    "ip_recv_attr_to_mblk",
					    mp, NULL);
					/* Caller already set b_prev/b_next */
					mp->b_prev = mp->b_next = NULL;
					freemsg(mp);
					return;
				}
				ASSERT(attrmp->b_cont == NULL);
				attrmp->b_cont = mp;
				/* Move connp and func to new */
				attrmp->b_queue = mp->b_queue;
				mp->b_queue = NULL;
				attrmp->b_prev = mp->b_prev;
				mp->b_prev = NULL;

				ASSERT(mp == tail);
				tail = mp = attrmp;
			}

			ENQUEUE_CHAIN(sqp, mp, tail, cnt);
#ifdef DEBUG
			mp->b_tag = tag;
#endif
		}
		/*
		 * We are here because either we couldn't do inline
		 * processing (because something was already queued),
		 * or we had a chain of more than one packet,
		 * or something else arrived after we were done with
		 * inline processing.
		 */
		ASSERT(MUTEX_HELD(&sqp->sq_lock));
		ASSERT(sqp->sq_first != NULL);
		now = gethrtime();
		sqp->sq_run = curthread;
		sqp->sq_drain(sqp, SQS_ENTER, now + squeue_drain_ns);

		/*
		 * If we didn't do a complete drain, the worker
		 * thread was already signalled by squeue_drain.
		 * In case any control actions are pending, wake
		 * up the worker.
		 */
		sqp->sq_run = NULL;
		if (sqp->sq_state & SQS_WORKER_THR_CONTROL) {
			squeue_worker_wakeup(sqp);
		}
	} else {
		/*
		 * We let a thread processing an squeue reenter only
		 * once. This helps the case of an incoming connection
		 * where a SYN-ACK-ACK that triggers the conn_ind
		 * doesn't have to queue the packet if the listener and
		 * eager are on the same squeue. It also helps the
		 * loopback connection where the two ends are bound
		 * to the same squeue (which is typical on single
		 * CPU machines).
		 *
		 * We let the thread reenter only once for fear of
		 * blowing the stack with multiple traversals.
		 */
		connp = (conn_t *)mp->b_prev;
		if (!(sqp->sq_state & SQS_REENTER) &&
		    (process_flag != SQ_FILL) && (sqp->sq_first == NULL) &&
		    (sqp->sq_run == curthread) && (cnt == 1) &&
		    (connp->conn_on_sqp == B_FALSE)) {
			sqp->sq_state |= SQS_REENTER;
			mutex_exit(&sqp->sq_lock);

			ASSERT(mp->b_prev != NULL);
			ASSERT(mp->b_queue != NULL);

			mp->b_prev = NULL;
			proc = (sqproc_t)mp->b_queue;
			mp->b_queue = NULL;

			/*
			 * Handle squeue switching. More details in the
			 * block comment at the top of the file.
			 */
			if (connp->conn_sqp == sqp) {
				connp->conn_on_sqp = B_TRUE;
				DTRACE_PROBE3(squeue__proc__start, squeue_t *,
				    sqp, mblk_t *, mp, conn_t *, connp);
				(*proc)(connp, mp, sqp, ira);
				DTRACE_PROBE2(squeue__proc__end, squeue_t *,
				    sqp, conn_t *, connp);
				connp->conn_on_sqp = B_FALSE;
				CONN_DEC_REF(connp);
			} else {
				SQUEUE_ENTER_ONE(connp->conn_sqp, mp, proc,
				    connp, ira, SQ_FILL, SQTAG_SQUEUE_CHANGE);
			}

			mutex_enter(&sqp->sq_lock);
			sqp->sq_state &= ~SQS_REENTER;
			mutex_exit(&sqp->sq_lock);
			return;
		}

		/*
		 * The queue is already being processed or there are
		 * already one or more packets on the queue. Enqueue the
		 * packet and wake up the squeue worker thread if the
		 * squeue is not being processed.
		 */
#ifdef DEBUG
		mp->b_tag = tag;
#endif
		if (ira != NULL) {
			mblk_t	*attrmp;

			ASSERT(cnt == 1);
			attrmp = ip_recv_attr_to_mblk(ira);
			if (attrmp == NULL) {
				mutex_exit(&sqp->sq_lock);
				ip_drop_input("squeue: ip_recv_attr_to_mblk",
				    mp, NULL);
				/* Caller already set b_prev/b_next */
				mp->b_prev = mp->b_next = NULL;
				freemsg(mp);
				return;
			}
			ASSERT(attrmp->b_cont == NULL);
			attrmp->b_cont = mp;
			/* Move connp and func to new */
			attrmp->b_queue = mp->b_queue;
			mp->b_queue = NULL;
			attrmp->b_prev = mp->b_prev;
			mp->b_prev = NULL;

			ASSERT(mp == tail);
			tail = mp = attrmp;
		}
		ENQUEUE_CHAIN(sqp, mp, tail, cnt);
		/*
		 * If the worker isn't running or control actions are
		 * pending, wake it up now.
		 */
		if ((sqp->sq_state & SQS_PROC) == 0 ||
		    (sqp->sq_state & SQS_WORKER_THR_CONTROL) != 0) {
			squeue_worker_wakeup(sqp);
		}
	}
	mutex_exit(&sqp->sq_lock);
}
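
/*
 * For reference, the per-mblk convention consumed above: before a
 * packet is handed to squeue_enter(), the caller (normally via the
 * SQUEUE_ENTER*() macros) stashes the processing function and the
 * conn_t pointer in the mblk with SET_SQUEUE(), which this file also
 * uses in squeue_synch_enter(). With a hypothetical handler:
 *
 *	SET_SQUEUE(mp, my_conn_input, connp);
 *
 * This stores the handler in mp->b_queue and the conn in mp->b_prev;
 * squeue_enter() and squeue_drain() pull them back out and NULL the
 * fields before invoking the handler.
 */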
/*
 * PRIVATE FUNCTIONS
 */

/*
 * Wake up the worker thread for the squeue to process queued work.
 */
static void
squeue_worker_wakeup(squeue_t *sqp)
{
	ASSERT(MUTEX_HELD(&(sqp)->sq_lock));

	cv_signal(&sqp->sq_worker_cv);
	sqp->sq_awoken = gethrtime();
}

static void
squeue_drain(squeue_t *sqp, uint_t proc_type, hrtime_t expire)
{
	mblk_t		*mp;
	mblk_t		*head;
	sqproc_t	proc;
	conn_t		*connp;
	ill_rx_ring_t	*sq_rx_ring = sqp->sq_rx_ring;
	hrtime_t	now;
	boolean_t	sq_poll_capable;
	ip_recv_attr_t	*ira, iras;

	/*
	 * Before doing any work, check our stack depth; if we're not a
	 * worker thread for this squeue and we're beginning to get tight
	 * on stack, kick the worker, bump a counter and return.
	 */
	if (proc_type != SQS_WORKER && STACK_BIAS + (uintptr_t)getfp() -
	    (uintptr_t)curthread->t_stkbase < squeue_drain_stack_needed) {
		ASSERT(mutex_owned(&sqp->sq_lock));
		squeue_worker_wakeup(sqp);
		squeue_drain_stack_toodeep++;
		return;
	}

	sq_poll_capable = (sqp->sq_state & SQS_POLL_CAPAB) != 0;
again:
	ASSERT(mutex_owned(&sqp->sq_lock));
	ASSERT(!(sqp->sq_state & (SQS_POLL_THR_QUIESCED |
	    SQS_POLL_QUIESCE_DONE)));

	head = sqp->sq_first;
	sqp->sq_first = NULL;
	sqp->sq_last = NULL;
	sqp->sq_count = 0;

	sqp->sq_state |= SQS_PROC | proc_type;

	/*
	 * We have a backlog built up. Switch to polling mode if the
	 * device underneath allows it. We need to do this so that
	 * more packets don't come in and disturb us (by contending
	 * for sq_lock or a higher priority thread preempting us).
	 *
	 * The worker thread is allowed to do active polling, while we
	 * just disable the interrupts for drain by non-worker (kernel
	 * or userland) threads so they can peacefully process the
	 * packets during the time allocated to them.
	 */
	SQS_POLLING_ON(sqp, sq_poll_capable, sq_rx_ring);
	mutex_exit(&sqp->sq_lock);

	while ((mp = head) != NULL) {

		head = mp->b_next;
		mp->b_next = NULL;

		proc = (sqproc_t)mp->b_queue;
		mp->b_queue = NULL;
		connp = (conn_t *)mp->b_prev;
		mp->b_prev = NULL;

		/* Is there an ip_recv_attr_t to handle? */
		if (ip_recv_attr_is_mblk(mp)) {
			mblk_t	*attrmp = mp;

			ASSERT(attrmp->b_cont != NULL);

			mp = attrmp->b_cont;
			attrmp->b_cont = NULL;
			ASSERT(mp->b_queue == NULL);
			ASSERT(mp->b_prev == NULL);

			if (!ip_recv_attr_from_mblk(attrmp, &iras)) {
				/* The ill or ip_stack_t disappeared on us */
				ip_drop_input("ip_recv_attr_from_mblk",
				    mp, NULL);
				ira_cleanup(&iras, B_TRUE);
				CONN_DEC_REF(connp);
				continue;
			}
			ira = &iras;
		} else {
			ira = NULL;
		}

		/*
		 * Handle squeue switching. More details in the
		 * block comment at the top of the file.
		 */
		if (connp->conn_sqp == sqp) {
			SQUEUE_DBG_SET(sqp, mp, proc, connp,
			    mp->b_tag);
			connp->conn_on_sqp = B_TRUE;
			DTRACE_PROBE3(squeue__proc__start, squeue_t *,
			    sqp, mblk_t *, mp, conn_t *, connp);
			(*proc)(connp, mp, sqp, ira);
			DTRACE_PROBE2(squeue__proc__end, squeue_t *,
			    sqp, conn_t *, connp);
			connp->conn_on_sqp = B_FALSE;
			CONN_DEC_REF(connp);
		} else {
			SQUEUE_ENTER_ONE(connp->conn_sqp, mp, proc, connp, ira,
			    SQ_FILL, SQTAG_SQUEUE_CHANGE);
		}
		if (ira != NULL)
			ira_cleanup(ira, B_TRUE);
	}

	SQUEUE_DBG_CLEAR(sqp);

	mutex_enter(&sqp->sq_lock);

	/*
	 * Check if there is still work to do (either more arrived or the
	 * timer expired). If we are the worker thread and we are polling
	 * capable, continue doing the work since no one else is around to
	 * do the work anyway (but signal the poll thread to retrieve some
	 * packets in the meanwhile). If we are not the worker thread,
	 * just signal the worker thread to take up the work if the
	 * processing time has expired.
	 */
	if (sqp->sq_first != NULL) {
		/*
		 * Still more to process. If the time quantum has not
		 * expired, we should let the drain go on. The worker
		 * thread is allowed to drain as long as there is anything
		 * left.
		 */
		now = gethrtime();
		if ((now < expire) || (proc_type == SQS_WORKER)) {
			/*
			 * If the time has not expired, or if we are the
			 * worker thread and this squeue is polling capable,
			 * continue to do the drain.
			 *
			 * We turn off interrupts for all userland threads
			 * doing drain but we do active polling only for
			 * the worker thread.
			 *
			 * Calling SQS_POLL_RING() even in the case of
			 * SQS_POLLING_ON() not succeeding is ok as
			 * SQS_POLL_RING() will not wake up the poll thread
			 * if the SQS_POLLING bit is not set.
			 */
			if (proc_type == SQS_WORKER)
				SQS_POLL_RING(sqp);
			goto again;
		}

		squeue_worker_wakeup(sqp);
	}

	/*
	 * If the poll thread is already running, just return. The
	 * poll thread continues to hold the proc and will finish
	 * processing.
	 */
	if (sqp->sq_state & SQS_GET_PKTS) {
		ASSERT(!(sqp->sq_state & (SQS_POLL_THR_QUIESCED |
		    SQS_POLL_QUIESCE_DONE)));
		sqp->sq_state &= ~proc_type;
		return;
	}

	/*
	 * If we are the worker thread and no work is left, send the poll
	 * thread down once more to see if something arrived. Otherwise,
	 * turn the interrupts back on and we are done.
	 */
	if ((proc_type == SQS_WORKER) && (sqp->sq_state & SQS_POLLING)) {
		/*
		 * Do one last check to see if anything arrived
		 * in the NIC. We leave SQS_PROC set to ensure
		 * that the poll thread keeps the PROC and can decide
		 * if it needs to turn polling off or continue
		 * processing.
		 *
		 * If we drop SQS_PROC here and the poll thread comes
		 * up empty handed, it cannot safely turn polling off
		 * since someone else could have acquired the PROC
		 * and started draining. The previously running poll
		 * thread and the current thread doing drain would end
		 * up in a race for turning polling on/off and more
		 * complex code would be required to deal with it.
		 *
		 * It's a lot simpler for the drain to hand SQS_PROC to
		 * the poll thread (if running) and let the poll thread
		 * finish without worrying about racing with any other
		 * thread.
		 */
		ASSERT(!(sqp->sq_state & (SQS_POLL_THR_QUIESCED |
		    SQS_POLL_QUIESCE_DONE)));
		SQS_POLL_RING(sqp);
		sqp->sq_state &= ~proc_type;
	} else {
		/*
		 * The squeue is either not capable of polling, or the
		 * attempt to blank (i.e., turn SQS_POLLING_ON()) was
		 * unsuccessful, or the poll thread already finished
		 * processing and didn't find anything. Since there
		 * is nothing queued and we already turned polling on
		 * (for all threads doing drain), we should turn
		 * polling off and relinquish the PROC.
		 */
		ASSERT(!(sqp->sq_state & (SQS_POLL_THR_QUIESCED |
		    SQS_POLL_QUIESCE_DONE)));
		SQS_POLLING_OFF(sqp, sq_poll_capable, sq_rx_ring);
		sqp->sq_state &= ~(SQS_PROC | proc_type);
		/*
		 * If we are not the worker and there is a pending quiesce
		 * event, wake up the worker.
		 */
		if ((proc_type != SQS_WORKER) &&
		    (sqp->sq_state & SQS_WORKER_THR_CONTROL)) {
			squeue_worker_wakeup(sqp);
		}
	}
}
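
/*
 * A note on tuning: the drain time limit above comes from
 * squeue_drain_ms, which squeue_init() converts to squeue_drain_ns
 * exactly once at startup. Because the conversion happens only in
 * squeue_init(), the value must be in place before the module
 * initializes; assuming this file is built into the ip module, a
 * hypothetical /etc/system setting would look like:
 *
 *	set ip:squeue_drain_ms = 40
 *
 * Patching squeue_drain_ms on a live system would have no effect
 * unless squeue_drain_ns were updated to match.
 */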
/*
 * Quiesce, Restart, or Cleanup of the squeue poll thread.
 *
 * Quiesce and Restart: After an squeue poll thread has been quiesced, it does
 * not attempt to poll the underlying soft ring any more. The quiesce is
 * triggered by the mac layer when it wants to quiesce a soft ring. Typically
 * control operations such as changing the fanout of a NIC or VNIC (dladm
 * setlinkprop) need to quiesce data flow before changing the wiring.
 * The operation is done by the mac layer, but it calls back into IP to
 * quiesce the soft ring. After completing the operation (say increase or
 * decrease of the fanout) the mac layer then calls back into IP to restart
 * the quiesced soft ring.
 *
 * Cleanup: This is triggered when the squeue binding to a soft ring is
 * removed permanently. Typically interface plumb and unplumb would trigger
 * this. It can also be triggered from the mac layer when a soft ring is
 * being deleted, say as the result of a fanout reduction. Since squeues are
 * never deleted, the cleanup marks the squeue as fit for recycling and
 * moves it to the zeroth squeue set.
 */
static void
squeue_poll_thr_control(squeue_t *sqp)
{
	if (sqp->sq_state & SQS_POLL_THR_RESTART) {
		/* Restart implies a previous quiesce */
		ASSERT(sqp->sq_state & SQS_POLL_THR_QUIESCED);
		sqp->sq_state &= ~(SQS_POLL_THR_QUIESCED |
		    SQS_POLL_THR_RESTART);
		sqp->sq_state |= SQS_POLL_CAPAB;
		cv_signal(&sqp->sq_worker_cv);
		return;
	}

	if (sqp->sq_state & SQS_POLL_THR_QUIESCE) {
		sqp->sq_state |= SQS_POLL_THR_QUIESCED;
		sqp->sq_state &= ~SQS_POLL_THR_QUIESCE;
		cv_signal(&sqp->sq_worker_cv);
		return;
	}
}

/*
 * POLLING Notes
 *
 * With polling mode, we want to do as much processing as we possibly can
 * in worker thread context. The sweet spot is when the worker thread keeps
 * doing work all the time in polling mode and writers etc. keep dumping
 * packets to the worker thread. Occasionally, we send the poll thread
 * (running at a lower priority) down to the NIC to get a chain of packets
 * to feed to the worker. Sending the poll thread down to the NIC is
 * dependent on three criteria:
 *
 * 1) It's always driven from squeue_drain and only if the worker thread
 *	is doing the drain.
 * 2) If we cleared the backlog once but more packets arrived in between,
 *	send the poll thread down before starting the drain again, provided
 *	the drain is being done by the worker thread.
 * 3) Before exiting squeue_drain, if the poll thread is not already
 *	working and we are the worker thread, try to poll one more time.
 *
 * For latency's sake, we do allow any thread calling squeue_enter
 * to process its packet provided:
 *
 * 1) Nothing is queued
 * 2) If more packets arrived in between, non-worker threads are allowed
 *	to do the drain till their time quantum expires, provided
 *	SQS_GET_PKTS wasn't set in between.
 *
 * Avoiding deadlocks with interrupts
 * ==================================
 *
 * One of the big problems is that we can't send the poll thread down
 * while holding the sq_lock since the thread can block. So we drop the
 * sq_lock before calling sq_get_pkts(). We keep holding SQS_PROC as long
 * as the poll thread is running so that no other thread can acquire the
 * perimeter in between. If the squeue_drain gets done (no more work
 * left), it leaves SQS_PROC set if the poll thread is running.
 */
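
/*
 * A rough sketch of the resulting mode transitions, summarizing the
 * code rather than adding to it:
 *
 *	interrupt mode
 *	    -- backlog builds in squeue_drain() --> SQS_POLLING_ON()
 *	       (rr_intr_disable() succeeds, SQS_POLLING set)
 *	poll mode
 *	    -- worker drains; SQS_POLL_RING() wakes the poll thread,
 *	       which pulls packet chains via sq_get_pkts() --
 *	    -- squeue and NIC both found empty --> SQS_POLLING_OFF()
 *	       (rr_intr_enable(), back to interrupt mode)
 */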
/*
 * This is the squeue poll thread. In poll mode, it polls the underlying
 * TCP softring and feeds packets into the squeue. The worker thread then
 * drains the squeue. The poll thread also responds to control signals for
 * quiescing, restarting, or cleanup of an squeue. These are driven by
 * control operations like plumb/unplumb or as a result of dynamic Rx ring
 * related operations that are driven from the mac layer.
 */
static void
squeue_polling_thread(squeue_t *sqp)
{
	kmutex_t *lock = &sqp->sq_lock;
	kcondvar_t *async = &sqp->sq_poll_cv;
	ip_mac_rx_t sq_get_pkts;
	ip_accept_t ip_accept;
	ill_rx_ring_t *sq_rx_ring;
	ill_t *sq_ill;
	mblk_t *head, *tail, *mp;
	uint_t cnt;
	void *sq_mac_handle;
	callb_cpr_t cprinfo;
	size_t bytes_to_pickup;
	uint32_t ctl_state;

	CALLB_CPR_INIT(&cprinfo, lock, callb_generic_cpr, "sq_poll");
	mutex_enter(lock);

	for (;;) {
		CALLB_CPR_SAFE_BEGIN(&cprinfo);
		cv_wait(async, lock);
		CALLB_CPR_SAFE_END(&cprinfo, lock);

		ctl_state = sqp->sq_state & (SQS_POLL_THR_CONTROL |
		    SQS_POLL_THR_QUIESCED);
		if (ctl_state != 0) {
			/*
			 * If the squeue is quiesced, then wait for a control
			 * request. A quiesced squeue must not poll the
			 * underlying soft ring.
			 */
			if (ctl_state == SQS_POLL_THR_QUIESCED)
				continue;
			/*
			 * Act on control requests to quiesce, cleanup or
			 * restart an squeue
			 */
			squeue_poll_thr_control(sqp);
			continue;
		}

		if (!(sqp->sq_state & SQS_POLL_CAPAB))
			continue;

		ASSERT((sqp->sq_state &
		    (SQS_PROC|SQS_POLLING|SQS_GET_PKTS)) ==
		    (SQS_PROC|SQS_POLLING|SQS_GET_PKTS));

poll_again:
		sq_rx_ring = sqp->sq_rx_ring;
		sq_get_pkts = sq_rx_ring->rr_rx;
		sq_mac_handle = sq_rx_ring->rr_rx_handle;
		ip_accept = sq_rx_ring->rr_ip_accept;
		sq_ill = sq_rx_ring->rr_ill;
		bytes_to_pickup = MAX_BYTES_TO_PICKUP;
		mutex_exit(lock);
		head = sq_get_pkts(sq_mac_handle, bytes_to_pickup);
		mp = NULL;
		if (head != NULL) {
			/*
			 * We got a packet chain from the mac layer. It
			 * would be nice to be able to process it inline
			 * for better performance, but we need to give
			 * IP a chance to look at this chain to ensure
			 * that packets are really meant for this squeue
			 * and to do the IP processing.
			 */
			mp = ip_accept(sq_ill, sq_rx_ring, sqp, head,
			    &tail, &cnt);
		}
		mutex_enter(lock);
		if (mp != NULL) {
			/*
			 * The ip_accept function has already added an
			 * ip_recv_attr_t mblk if that is needed.
			 */
			ENQUEUE_CHAIN(sqp, mp, tail, cnt);
		}
		ASSERT((sqp->sq_state &
		    (SQS_PROC|SQS_POLLING|SQS_GET_PKTS)) ==
		    (SQS_PROC|SQS_POLLING|SQS_GET_PKTS));

		if (sqp->sq_first != NULL && !(sqp->sq_state & SQS_WORKER)) {
			/*
			 * We have packets to process and the worker thread
			 * is not running. Check to see if the poll thread
			 * is allowed to process. Let it do processing only
			 * if it picked up some packets from the NIC;
			 * otherwise wake up the worker thread.
			 */
			if (mp != NULL) {
				hrtime_t	now;

				now = gethrtime();
				sqp->sq_run = curthread;
				sqp->sq_drain(sqp, SQS_POLL_PROC, now +
				    squeue_drain_ns);
				sqp->sq_run = NULL;

				if (sqp->sq_first == NULL)
					goto poll_again;

				/*
				 * Couldn't do the entire drain because the
				 * time limit expired, let the
				 * worker thread take over.
				 */
			}

			/*
			 * Set SQS_PROC_HELD so the worker thread can
			 * distinguish where it was called from. We could
			 * remove the SQS_PROC flag here and turn off
			 * polling so that it wouldn't matter who gets the
			 * processing, but we get better performance this
			 * way and save the cost of turning polling off and
			 * possibly on again as soon as we start draining
			 * again.
			 *
			 * We can't remove the SQS_PROC flag without turning
			 * polling off until we can guarantee that control
			 * will return to squeue_drain immediately.
			 */
			sqp->sq_state |= SQS_PROC_HELD;
			sqp->sq_state &= ~SQS_GET_PKTS;
			squeue_worker_wakeup(sqp);
		} else if (sqp->sq_first == NULL &&
		    !(sqp->sq_state & SQS_WORKER)) {
			/*
			 * Nothing queued and the worker thread is not
			 * running. Since we hold the proc, no other thread
			 * is processing the squeue. This means that there
			 * is no work to be done and nothing is queued in
			 * the squeue or in the NIC. Turn polling off and
			 * go back to interrupt mode.
			 */
			sqp->sq_state &= ~(SQS_PROC|SQS_GET_PKTS);
			/* LINTED: constant in conditional context */
			SQS_POLLING_OFF(sqp, B_TRUE, sq_rx_ring);

			/*
			 * If there is a pending control operation
			 * wake up the worker, since it is currently
			 * not running.
			 */
			if (sqp->sq_state & SQS_WORKER_THR_CONTROL) {
				squeue_worker_wakeup(sqp);
			}
		} else {
			/*
			 * The worker thread is already running. We don't
			 * need to do anything. Indicate that the poll
			 * thread is done.
			 */
			sqp->sq_state &= ~SQS_GET_PKTS;
		}
		if (sqp->sq_state & SQS_POLL_THR_CONTROL) {
			/*
			 * Act on control requests to quiesce, cleanup or
			 * restart an squeue
			 */
			squeue_poll_thr_control(sqp);
		}
	}
}
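
/*
 * The control handshake between the threads, in rough order (a summary
 * of the code below, not an addition to it): the requestor posts a
 * request such as SQS_POLL_QUIESCE and wakes the worker; the worker
 * sees SQS_WORKER_THR_CONTROL, takes ownership by setting
 * SQS_PROC|SQS_WORKER, and signals the poll thread with
 * SQS_POLL_THR_QUIESCE; the poll thread acknowledges by setting
 * SQS_POLL_THR_QUIESCED; the worker then completes the operation and
 * signals sq_ctrlop_done_cv to wake the original requestor.
 */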
/*
 * The squeue worker thread acts on any control requests to quiesce,
 * cleanup or restart an ill_rx_ring_t by calling this function. The worker
 * thread synchronizes with the squeue poll thread to complete the request
 * and finally wakes up the requestor when the request is completed.
 */
static void
squeue_worker_thr_control(squeue_t *sqp)
{
	ill_t		*ill;
	ill_rx_ring_t	*rx_ring;

	ASSERT(MUTEX_HELD(&sqp->sq_lock));

	if (sqp->sq_state & SQS_POLL_RESTART) {
		/* Restart implies a previous quiesce. */
		ASSERT((sqp->sq_state & (SQS_PROC_HELD |
		    SQS_POLL_QUIESCE_DONE | SQS_PROC | SQS_WORKER)) ==
		    (SQS_POLL_QUIESCE_DONE | SQS_PROC | SQS_WORKER));
		/*
		 * Request the squeue poll thread to restart and wait till
		 * it actually restarts.
		 */
		sqp->sq_state &= ~SQS_POLL_QUIESCE_DONE;
		sqp->sq_state |= SQS_POLL_THR_RESTART;
		cv_signal(&sqp->sq_poll_cv);
		while (sqp->sq_state & SQS_POLL_THR_QUIESCED)
			cv_wait(&sqp->sq_worker_cv, &sqp->sq_lock);
		sqp->sq_state &= ~(SQS_POLL_RESTART | SQS_PROC |
		    SQS_WORKER);
		/*
		 * Signal any waiter that is waiting for the restart
		 * to complete.
		 */
		sqp->sq_state |= SQS_POLL_RESTART_DONE;
		cv_signal(&sqp->sq_ctrlop_done_cv);
		return;
	}

	if (sqp->sq_state & SQS_PROC_HELD) {
		/* The squeue poll thread handed control to us */
		ASSERT(sqp->sq_state & SQS_PROC);
	}

	/*
	 * Prevent any other thread from processing the squeue
	 * until we finish the control actions by setting SQS_PROC,
	 * but allow ourselves to reenter by setting SQS_WORKER.
	 */
	sqp->sq_state |= (SQS_PROC | SQS_WORKER);

	/* Signal the squeue poll thread and wait for it to quiesce itself */
	if (!(sqp->sq_state & SQS_POLL_THR_QUIESCED)) {
		sqp->sq_state |= SQS_POLL_THR_QUIESCE;
		cv_signal(&sqp->sq_poll_cv);
		while (!(sqp->sq_state & SQS_POLL_THR_QUIESCED))
			cv_wait(&sqp->sq_worker_cv, &sqp->sq_lock);
	}

	rx_ring = sqp->sq_rx_ring;
	ill = rx_ring->rr_ill;
	/*
	 * The lock hierarchy is as follows.
	 * cpu_lock -> ill_lock -> sqset_lock -> sq_lock
	 */
	mutex_exit(&sqp->sq_lock);
	mutex_enter(&ill->ill_lock);
	mutex_enter(&sqp->sq_lock);

	SQS_POLLING_OFF(sqp, (sqp->sq_state & SQS_POLL_CAPAB) != 0,
	    sqp->sq_rx_ring);
	sqp->sq_state &= ~(SQS_POLL_CAPAB | SQS_GET_PKTS | SQS_PROC_HELD);
	if (sqp->sq_state & SQS_POLL_CLEANUP) {
		/*
		 * Disassociate this squeue from its ill_rx_ring_t.
		 * The rr_sqp and sq_rx_ring fields are protected by the
		 * corresponding squeue, ill_lock and sq_lock. Holding any
		 * of them will ensure that the ring to squeue mapping does
		 * not change.
		 */
		ASSERT(!(sqp->sq_state & SQS_DEFAULT));

		sqp->sq_rx_ring = NULL;
		rx_ring->rr_sqp = NULL;

		sqp->sq_state &= ~(SQS_POLL_CLEANUP | SQS_POLL_THR_QUIESCED |
		    SQS_POLL_QUIESCE_DONE);
		sqp->sq_ill = NULL;

		rx_ring->rr_rx_handle = NULL;
		rx_ring->rr_intr_handle = NULL;
		rx_ring->rr_intr_enable = NULL;
		rx_ring->rr_intr_disable = NULL;
		sqp->sq_state |= SQS_POLL_CLEANUP_DONE;
	} else {
		sqp->sq_state &= ~SQS_POLL_QUIESCE;
		sqp->sq_state |= SQS_POLL_QUIESCE_DONE;
	}
	/*
	 * Signal any waiter that is waiting for the quiesce or cleanup
	 * to complete and also wait for it to actually see and reset the
	 * SQS_POLL_CLEANUP_DONE.
	 */
	cv_signal(&sqp->sq_ctrlop_done_cv);
	mutex_exit(&ill->ill_lock);
	if (sqp->sq_state & SQS_POLL_CLEANUP_DONE) {
		cv_wait(&sqp->sq_worker_cv, &sqp->sq_lock);
		sqp->sq_state &= ~(SQS_PROC | SQS_WORKER);
	}
}

static void
squeue_worker(squeue_t *sqp)
{
	kmutex_t *lock = &sqp->sq_lock;
	kcondvar_t *async = &sqp->sq_worker_cv;
	callb_cpr_t cprinfo;
	hrtime_t now;

	CALLB_CPR_INIT(&cprinfo, lock, callb_generic_cpr, "sq_worker");
	mutex_enter(lock);

	for (;;) {
		for (;;) {
			/*
			 * If the poll thread has handed control to us
			 * we need to break out of the wait.
			 */
			if (sqp->sq_state & SQS_PROC_HELD)
				break;

			/*
			 * If the squeue is not being processed and we
			 * either have messages to drain or some thread has
			 * signaled some control activity, we need to break.
			 */
			if (!(sqp->sq_state & SQS_PROC) &&
			    ((sqp->sq_state & SQS_WORKER_THR_CONTROL) ||
			    (sqp->sq_first != NULL)))
				break;

			/*
			 * If we have started some control action, then
			 * check for the SQS_WORKER flag (since we don't
			 * release the squeue) to make sure we own the
			 * squeue, and break out.
			 */
			if ((sqp->sq_state & SQS_WORKER_THR_CONTROL) &&
			    (sqp->sq_state & SQS_WORKER))
				break;

			CALLB_CPR_SAFE_BEGIN(&cprinfo);
			cv_wait(async, lock);
			CALLB_CPR_SAFE_END(&cprinfo, lock);
		}
		if (sqp->sq_state & SQS_WORKER_THR_CONTROL) {
			squeue_worker_thr_control(sqp);
			continue;
		}
		ASSERT(!(sqp->sq_state & (SQS_POLL_THR_QUIESCED |
		    SQS_POLL_CLEANUP_DONE | SQS_POLL_QUIESCE_DONE |
		    SQS_WORKER_THR_CONTROL | SQS_POLL_THR_CONTROL)));

		if (sqp->sq_state & SQS_PROC_HELD)
			sqp->sq_state &= ~SQS_PROC_HELD;

		now = gethrtime();
		sqp->sq_run = curthread;
		sqp->sq_drain(sqp, SQS_WORKER, now + squeue_drain_ns);
		sqp->sq_run = NULL;
	}
}

uintptr_t *
squeue_getprivate(squeue_t *sqp, sqprivate_t p)
{
	ASSERT(p < SQPRIVATE_MAX);

	return (&sqp->sq_private[p]);
}

/* ARGSUSED */
void
squeue_wakeup_conn(void *arg, mblk_t *mp, void *arg2, ip_recv_attr_t *dummy)
{
	conn_t *connp = (conn_t *)arg;
	squeue_t *sqp = connp->conn_sqp;

	/*
	 * Mark the squeue as paused before waking up the thread stuck
	 * in squeue_synch_enter().
	 */
	mutex_enter(&sqp->sq_lock);
	sqp->sq_state |= SQS_PAUSE;

	/*
	 * Notify the thread that it's OK to proceed; that is done by
	 * clearing the MSGWAITSYNC flag. The synch thread will free the
	 * mblk.
	 */
	ASSERT(mp->b_flag & MSGWAITSYNC);
	mp->b_flag &= ~MSGWAITSYNC;
	cv_broadcast(&connp->conn_sq_cv);

	/*
	 * We are doing something on behalf of another thread, so we have to
	 * pause and wait until it finishes.
	 */
	while (sqp->sq_state & SQS_PAUSE) {
		cv_wait(&sqp->sq_synch_cv, &sqp->sq_lock);
	}
	mutex_exit(&sqp->sq_lock);
}

int
squeue_synch_enter(conn_t *connp, mblk_t *use_mp)
{
	squeue_t *sqp;

again:
	sqp = connp->conn_sqp;

	mutex_enter(&sqp->sq_lock);
	if (sqp->sq_first == NULL && !(sqp->sq_state & SQS_PROC)) {
		/*
		 * We are OK to proceed if the squeue is empty, and
		 * no one owns the squeue.
		 *
		 * The caller won't own the squeue as this is called from
		 * the application.
		 */
		ASSERT(sqp->sq_run == NULL);

		sqp->sq_state |= SQS_PROC;
		sqp->sq_run = curthread;
		mutex_exit(&sqp->sq_lock);

		/*
		 * Handle squeue switching. The conn's squeue can only
		 * change while there is a thread in the squeue, which is
		 * why we do the check after entering the squeue. If it has
		 * changed, exit this squeue and redo everything with the
		 * new squeue.
		 */
		if (sqp != connp->conn_sqp) {
			mutex_enter(&sqp->sq_lock);
			sqp->sq_state &= ~SQS_PROC;
			sqp->sq_run = NULL;
			mutex_exit(&sqp->sq_lock);
			goto again;
		}
#if SQUEUE_DEBUG
		sqp->sq_curmp = NULL;
		sqp->sq_curproc = NULL;
		sqp->sq_connp = connp;
#endif
		connp->conn_on_sqp = B_TRUE;
		return (0);
	} else {
		mblk_t	*mp;

		mp = (use_mp == NULL) ? allocb(0, BPRI_MED) : use_mp;
		if (mp == NULL) {
			mutex_exit(&sqp->sq_lock);
			return (ENOMEM);
		}

		/*
		 * We mark the mblk as awaiting synchronous squeue access
		 * by setting the MSGWAITSYNC flag. Once squeue_wakeup_conn
		 * fires, MSGWAITSYNC is cleared, at which point we know we
		 * have exclusive access.
		 */
		mp->b_flag |= MSGWAITSYNC;

		CONN_INC_REF(connp);
		SET_SQUEUE(mp, squeue_wakeup_conn, connp);
		ENQUEUE_CHAIN(sqp, mp, mp, 1);

		ASSERT(sqp->sq_run != curthread);

		/* Wait until the enqueued mblk gets processed. */
		while (mp->b_flag & MSGWAITSYNC)
			cv_wait(&connp->conn_sq_cv, &sqp->sq_lock);
		mutex_exit(&sqp->sq_lock);

		if (use_mp == NULL)
			freeb(mp);

		return (0);
	}
}

void
squeue_synch_exit(conn_t *connp)
{
	squeue_t *sqp = connp->conn_sqp;

	mutex_enter(&sqp->sq_lock);
	if (sqp->sq_run == curthread) {
		ASSERT(sqp->sq_state & SQS_PROC);

		sqp->sq_state &= ~SQS_PROC;
		sqp->sq_run = NULL;
		connp->conn_on_sqp = B_FALSE;

		if (sqp->sq_first != NULL) {
			/*
			 * If this was a normal thread, then it would
			 * (most likely) continue processing the pending
			 * requests. Since the just completed operation
			 * was executed synchronously, the thread should
			 * not be delayed. To compensate, wake up the
			 * worker thread right away when there are
			 * outstanding requests.
			 */
			squeue_worker_wakeup(sqp);
		}
	} else {
		/*
		 * The caller doesn't own the squeue; clear the SQS_PAUSE
		 * flag and wake up the squeue owner, so that the owner can
		 * continue processing.
		 */
		ASSERT(sqp->sq_state & SQS_PAUSE);
		sqp->sq_state &= ~SQS_PAUSE;

		/* There should be only one thread blocking on sq_synch_cv. */
		cv_signal(&sqp->sq_synch_cv);
	}
	mutex_exit(&sqp->sq_lock);
}
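
/*
 * Illustrative sketch (not compiled): a thread in application context,
 * e.g. a socket option handler, that needs exclusive access to a
 * connection's state would bracket its work with the pair above:
 *
 *	if (squeue_synch_enter(connp, NULL) == 0) {
 *		// examine or update conn state under squeue protection
 *		squeue_synch_exit(connp);
 *	} else {
 *		// ENOMEM: could not allocate the wait mblk
 *	}
 *
 * If the squeue is busy, squeue_synch_enter() enqueues a MSGWAITSYNC
 * mblk and sleeps until squeue_wakeup_conn() grants it the squeue.
 */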