/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License (the "License").
 * You may not use this file except in compliance with the License.
 *
 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
 * or http://www.opensolaris.org/os/licensing.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
 * If applicable, add the following below this CDDL HEADER, with the
 * fields enclosed by brackets "[]" replaced with your own identifying
 * information: Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 */
/*
 * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
 */

/*
 * Copyright 2017 Joyent, Inc.
 * Copyright 2026 Oxide Computer Company
 */

/*
 * Squeues: General purpose serialization mechanism
 * ------------------------------------------------
 *
 * Background:
 * -----------
 *
 * This is a general purpose high-performance serialization mechanism
 * currently used by TCP/IP. It is implemented by means of a per-CPU
 * queue, a worker thread and a polling thread, which are bound to the
 * CPU associated with the squeue. The squeue is strictly FIFO for both
 * read and write side and only one thread can process it at any given
 * time. The design goal of the squeue was to offer a very high degree
 * of parallelization (on a per H/W execution pipeline basis) with at
 * most one queueing.
 *
 * The modules needing protection typically call the SQUEUE_ENTER_ONE()
 * or SQUEUE_ENTER() macro as soon as a thread enters the module
 * from either direction. For each packet, the processing function
 * and argument are stored in the mblk itself. When the packet is ready
 * to be processed, the squeue retrieves the stored function and calls
 * it with the supplied argument and the pointer to the packet itself.
 * The called function can assume that no other thread is processing
 * the squeue when it is executing.
 *
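 * For instance, the entry macros stash the handler in the mblk's b_queue
 * field and the conn_t in b_prev before handing the packet to the squeue.
 * A minimal sketch (the handler and tag here are illustrative):
 *
 *	SQUEUE_ENTER_ONE(connp->conn_sqp, mp, tcp_input_data, connp,
 *	    ira, SQ_PROCESS, tag);
 *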
 * Squeue/connection binding:
 * --------------------------
 *
 * TCP/IP uses an IP classifier in conjunction with squeues, where specific
 * connections are assigned to specific squeues (based on various policies)
 * at connection creation time. Once assigned, the connection-to-squeue
 * mapping is never changed and all future packets for that
 * connection are processed on that squeue. The connection ("conn") to
 * squeue mapping is stored in the "conn_t" member "conn_sqp".
 *
 * Since the processing of the connection cuts across multiple layers
 * but still allows packets for different connections to be processed on
 * other CPUs/squeues, squeues are also termed "Vertical Perimeter" or
 * "Per Connection Vertical Perimeter".
 *
 * Processing Model:
 * -----------------
 *
 * An squeue doesn't necessarily process packets with its own worker
 * thread. The callers can pick if they just want to queue the packet,
 * process their packet if nothing is queued, or drain and process. The
 * first two modes are typically employed when the packet was generated
 * while already doing the processing behind the squeue, and the last
 * mode (drain and process) is typically employed when the thread is
 * entering the squeue for the first time. The squeue still imposes a
 * finite time limit for which an external thread can do processing,
 * after which it switches processing to its own worker thread.
 *
 * Once created, squeues are never deleted. Hence squeue pointers are
 * always valid. This means that functions outside the squeue can still
 * refer safely to conn_sqp and there is no need for ref counts.
 *
 * Only a thread executing in the squeue can change the squeue of the
 * connection. It does so by calling a squeue framework function to do this.
 * After changing the squeue, the thread must leave the squeue. It must not
 * continue to execute any code that needs squeue protection.
 *
 * The squeue framework, after entering the squeue, checks if the current
 * squeue matches the conn_sqp. If the check fails, the packet is delivered
 * to the right squeue.
 *
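 * In code, that re-dispatch (as performed in squeue_enter() and
 * squeue_drain() below) boils down to:
 *
 *	if (connp->conn_sqp != sqp) {
 *		SQUEUE_ENTER_ONE(connp->conn_sqp, mp, proc, connp,
 *		    ira, SQ_FILL, SQTAG_SQUEUE_CHANGE);
 *	}
 *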
 * Polling Model:
 * --------------
 *
 * Squeues can control the rate of packet arrival into themselves from the
 * NIC or a specific Rx ring within a NIC. As part of capability negotiation
 * between IP and the MAC layer, squeues are created for each TCP soft ring
 * (or TCP Rx ring - to be implemented in future). As part of this
 * negotiation, squeues get a cookie for the underlying soft ring or Rx
 * ring, a function to turn off incoming packets and a function to call
 * to poll for packets. This helps schedule the receive side packet
 * processing so that queue backlog doesn't build up and packet processing
 * doesn't keep getting disturbed by high priority interrupts. As part
 * of this mode, as soon as a backlog starts building, the squeue turns
 * off the interrupts and switches to poll mode. In poll mode, when the
 * poll thread goes down to retrieve packets, it retrieves them in the
 * form of a chain which improves performance even more. As the
 * squeue/softring system gets more packets, it gets more efficient by
 * switching to polling more often and dealing with larger packet chains.
 *
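 * The negotiated state lives in the ill_rx_ring_t: rr_rx (the poll
 * function), rr_rx_handle (the cookie) and rr_intr_enable/rr_intr_disable
 * (interrupt control). A simplified sketch of one poll cycle:
 *
 *	rx_ring->rr_intr_disable(rx_ring->rr_intr_handle);
 *	chain = rx_ring->rr_rx(rx_ring->rr_rx_handle, bytes_to_pickup);
 *	(process the chain; repeat while a backlog remains)
 *	rx_ring->rr_intr_enable(rx_ring->rr_intr_handle);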
 */

#include <sys/types.h>
#include <sys/cmn_err.h>
#include <sys/debug.h>
#include <sys/kmem.h>
#include <sys/cpuvar.h>
#include <sys/condvar_impl.h>
#include <sys/systm.h>
#include <sys/callb.h>
#include <sys/sdt.h>
#include <sys/ddi.h>
#include <sys/sunddi.h>
#include <sys/stack.h>
#include <sys/archsystm.h>

#include <inet/ipclassifier.h>
#include <inet/udp_impl.h>

#include <sys/squeue_impl.h>

static void squeue_drain(squeue_t *, uint_t, hrtime_t);
static void squeue_worker(squeue_t *sqp);
static void squeue_polling_thread(squeue_t *sqp);
static void squeue_worker_wakeup(squeue_t *sqp);
static void squeue_try_drain_one(squeue_t *, conn_t *);

kmem_cache_t *squeue_cache;

#define	SQUEUE_MSEC_TO_NSEC	1000000

int squeue_drain_ms = 20;

/* The value above, converted to nanoseconds at squeue_init() time */
static uint_t squeue_drain_ns = 0;

uintptr_t squeue_drain_stack_needed = 10240;
uint_t squeue_drain_stack_toodeep;

/*
 * The number of bytes the squeue is allowed to poll from the softring in a
 * single read. The accounting is done on a per-mblk basis, so the squeue may
 * poll one mblk/MTU worth of data over the limit.
 */
size_t squeue_poll_budget_bytes = 150000;
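
/*
 * Both squeue_drain_ms and squeue_poll_budget_bytes are plain global
 * variables, so they can be adjusted without a rebuild. A sketch,
 * assuming the usual /etc/system tunable syntax and that this file is
 * linked into the ip module (note squeue_drain_ms is only read at
 * squeue_init() time, so it must be set before boot):
 *
 *	set ip:squeue_drain_ms = 40
 *	set ip:squeue_poll_budget_bytes = 300000
 */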

#define	ENQUEUE_CHAIN(sqp, mp, tail, cnt) {			\
	ASSERT(MUTEX_HELD(&(sqp)->sq_lock));			\
								\
	if ((sqp)->sq_last != NULL)				\
		(sqp)->sq_last->b_next = (mp);			\
	else							\
		(sqp)->sq_first = (mp);				\
	(sqp)->sq_last = (tail);				\
	(sqp)->sq_count += (cnt);				\
	ASSERT((sqp)->sq_count > 0);				\
	DTRACE_PROBE4(squeue__enqueuechain, squeue_t *, sqp,	\
	    mblk_t *, mp, mblk_t *, tail, int, cnt);		\
								\
}

/*
 * Blank the receive ring (in this case it is the soft ring). When
 * blanked, the soft ring will not send any more packets up.
 * Blanking may not succeed when there is a CPU already in the soft
 * ring sending packets up. In that case, SQS_POLLING will not be
 * set.
 */
#define	SQS_POLLING_ON(sqp, sq_poll_capable, rx_ring) {		\
	ASSERT(MUTEX_HELD(&(sqp)->sq_lock));			\
	if (sq_poll_capable) {					\
		ASSERT(rx_ring != NULL);			\
		ASSERT(sqp->sq_state & SQS_POLL_CAPAB);		\
		if (!(sqp->sq_state & SQS_POLLING)) {		\
			if (rx_ring->rr_intr_disable(rx_ring->rr_intr_handle)) \
				sqp->sq_state |= SQS_POLLING;	\
		}						\
	}							\
}

#define	SQS_POLLING_OFF(sqp, sq_poll_capable, rx_ring) {	\
	ASSERT(MUTEX_HELD(&(sqp)->sq_lock));			\
	if (sq_poll_capable) {					\
		ASSERT(rx_ring != NULL);			\
		ASSERT(sqp->sq_state & SQS_POLL_CAPAB);		\
		if (sqp->sq_state & SQS_POLLING) {		\
			sqp->sq_state &= ~SQS_POLLING;		\
			rx_ring->rr_intr_enable(rx_ring->rr_intr_handle); \
		}						\
	}							\
}

/* Wakeup poll thread only if SQS_POLLING is set */
#define	SQS_POLL_RING(sqp) {					\
	ASSERT(MUTEX_HELD(&(sqp)->sq_lock));			\
	if (sqp->sq_state & SQS_POLLING) {			\
		ASSERT(sqp->sq_state & SQS_POLL_CAPAB);		\
		if (!(sqp->sq_state & SQS_GET_PKTS)) {		\
			sqp->sq_state |= SQS_GET_PKTS;		\
			cv_signal(&sqp->sq_poll_cv);		\
		}						\
	}							\
}

#ifdef DEBUG
#define	SQUEUE_DBG_SET(sqp, mp, proc, connp, tag) {		\
	(sqp)->sq_curmp = (mp);					\
	(sqp)->sq_curproc = (proc);				\
	(sqp)->sq_connp = (connp);				\
	(mp)->b_tag = (sqp)->sq_tag = (tag);			\
}

#define	SQUEUE_DBG_CLEAR(sqp) {					\
	(sqp)->sq_curmp = NULL;					\
	(sqp)->sq_curproc = NULL;				\
	(sqp)->sq_connp = NULL;					\
}
#else
#define	SQUEUE_DBG_SET(sqp, mp, proc, connp, tag)
#define	SQUEUE_DBG_CLEAR(sqp)
#endif

void
squeue_init(void)
{
	squeue_cache = kmem_cache_create("squeue_cache",
	    sizeof (squeue_t), 64, NULL, NULL, NULL, NULL, NULL, 0);

	squeue_drain_ns = squeue_drain_ms * SQUEUE_MSEC_TO_NSEC;
}

squeue_t *
squeue_create(pri_t pri)
{
	squeue_t *sqp = kmem_cache_alloc(squeue_cache, KM_SLEEP);

	bzero(sqp, sizeof (squeue_t));
	sqp->sq_bind = PBIND_NONE;
	sqp->sq_priority = pri;
	sqp->sq_worker = thread_create(NULL, 0, squeue_worker,
	    sqp, 0, &p0, TS_RUN, pri);

	sqp->sq_poll_thr = thread_create(NULL, 0, squeue_polling_thread,
	    sqp, 0, &p0, TS_RUN, pri);

	sqp->sq_enter = squeue_enter;
	sqp->sq_drain = squeue_drain;

	return (sqp);
}
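
/*
 * A minimal creation/binding sketch (hypothetical caller; in practice
 * this is driven from the IP squeue-set code, and some_cpu_id stands in
 * for whichever CPU the caller selects):
 *
 *	squeue_t *sqp = squeue_create(maxclsyspri);
 *
 *	mutex_enter(&cpu_lock);
 *	squeue_bind(sqp, some_cpu_id);
 *	mutex_exit(&cpu_lock);
 */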

/*
 * Bind squeue worker thread to the specified CPU, given by CPU id.
 * If the CPU id value is -1, bind the worker thread to the value
 * specified in the sq_bind field. If a thread is already bound to a
 * different CPU, unbind it from the old CPU and bind to the new one.
 */

void
squeue_bind(squeue_t *sqp, processorid_t bind)
{
	mutex_enter(&sqp->sq_lock);
	ASSERT(sqp->sq_bind != PBIND_NONE || bind != PBIND_NONE);
	ASSERT(MUTEX_HELD(&cpu_lock));

	if (sqp->sq_state & SQS_BOUND) {
		if (sqp->sq_bind == bind) {
			mutex_exit(&sqp->sq_lock);
			return;
		}
		thread_affinity_clear(sqp->sq_worker);
	} else {
		sqp->sq_state |= SQS_BOUND;
	}

	if (bind != PBIND_NONE)
		sqp->sq_bind = bind;

	thread_affinity_set(sqp->sq_worker, sqp->sq_bind);
	mutex_exit(&sqp->sq_lock);
}

void
squeue_unbind(squeue_t *sqp)
{
	mutex_enter(&sqp->sq_lock);
	if (!(sqp->sq_state & SQS_BOUND)) {
		mutex_exit(&sqp->sq_lock);
		return;
	}

	sqp->sq_state &= ~SQS_BOUND;
	thread_affinity_clear(sqp->sq_worker);
	mutex_exit(&sqp->sq_lock);
}

/*
 * squeue_enter() - enter squeue sqp with mblk mp (which can be
 * a chain), while tail points to the end and cnt is the number of
 * mblks in the chain.
 *
 * For a single-packet chain (i.e. mp == tail), go through the
 * fast path if no one is processing the squeue and nothing is queued.
 *
 * The proc and arg for each mblk are already stored in the mblk in
 * appropriate places.
 *
 * The process_flag specifies if we are allowed to process the mblk
 * and drain in the entering thread context. If process_flag is
 * SQ_FILL, then we just queue the mblk and return (after signaling
 * the worker thread if no one else is processing the squeue).
 *
 * The ira argument can be used when the count is one.
 * For a chain the caller needs to prepend any needed mblks from
 * ip_recv_attr_to_mblk().
 */
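/*
 * For example, a caller submitting an already-tagged chain might use the
 * convenience macros from squeue_impl.h (a sketch; each mblk must carry
 * its proc/connp in b_queue/b_prev, and the tag is illustrative):
 *
 *	SQUEUE_ENTER(sqp, head, tail, cnt, NULL, SQ_PROCESS, tag);
 *
 * SQ_PROCESS permits inline processing and a drain, SQ_NODRAIN permits
 * inline processing only when nothing is queued, and SQ_FILL only queues.
 */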
/* ARGSUSED */
void
squeue_enter(squeue_t *sqp, mblk_t *mp, mblk_t *tail, uint32_t cnt,
    ip_recv_attr_t *ira, int process_flag, uint8_t tag)
{
	conn_t *connp;
	sqproc_t proc;
	hrtime_t now;

	ASSERT(sqp != NULL);
	ASSERT(mp != NULL);
	ASSERT(tail != NULL);
	ASSERT(cnt > 0);
	ASSERT(MUTEX_NOT_HELD(&sqp->sq_lock));
	ASSERT(ira == NULL || cnt == 1);

	mutex_enter(&sqp->sq_lock);

	/*
	 * Try to process the packet if SQ_FILL flag is not set and
	 * we are allowed to process the squeue. The SQ_NODRAIN is
	 * ignored if the packet chain consists of more than 1 packet.
	 */
	if (!(sqp->sq_state & SQS_PROC) && ((process_flag == SQ_PROCESS) ||
	    (process_flag == SQ_NODRAIN && sqp->sq_first == NULL))) {
		/*
		 * See if anything is already queued. If we are the
		 * first packet, do inline processing, else queue the
		 * packet and do the drain.
		 */
		if (sqp->sq_first == NULL && cnt == 1) {
			/*
			 * Fast-path, ok to process and nothing queued.
			 */
			sqp->sq_state |= (SQS_PROC|SQS_FAST);
			sqp->sq_run = curthread;
			mutex_exit(&sqp->sq_lock);

			/*
			 * We have a chain of 1 packet so
			 * go through this fast path.
			 */
			ASSERT(mp->b_prev != NULL);
			ASSERT(mp->b_queue != NULL);
			connp = (conn_t *)mp->b_prev;
			mp->b_prev = NULL;
			proc = (sqproc_t)mp->b_queue;
			mp->b_queue = NULL;
			ASSERT(proc != NULL && connp != NULL);
			ASSERT(mp->b_next == NULL);

			/*
			 * Handle squeue switching. More details in the
			 * block comment at the top of the file
			 */
			if (connp->conn_sqp == sqp) {
				SQUEUE_DBG_SET(sqp, mp, proc, connp,
				    tag);
				connp->conn_on_sqp = B_TRUE;
				DTRACE_PROBE3(squeue__proc__start, squeue_t *,
				    sqp, mblk_t *, mp, conn_t *, connp);
				(*proc)(connp, mp, sqp, ira);
				DTRACE_PROBE2(squeue__proc__end, squeue_t *,
				    sqp, conn_t *, connp);
				connp->conn_on_sqp = B_FALSE;
				SQUEUE_DBG_CLEAR(sqp);
				CONN_DEC_REF(connp);
			} else {
				SQUEUE_ENTER_ONE(connp->conn_sqp, mp, proc,
				    connp, ira, SQ_FILL, SQTAG_SQUEUE_CHANGE);
			}
			ASSERT(MUTEX_NOT_HELD(&sqp->sq_lock));
			mutex_enter(&sqp->sq_lock);
			sqp->sq_state &= ~(SQS_PROC|SQS_FAST);
			sqp->sq_run = NULL;
			if (sqp->sq_first == NULL ||
			    process_flag == SQ_NODRAIN) {
				/*
				 * Even if SQ_NODRAIN was specified, it may
				 * still be best to process a single queued
				 * item if it matches the active connection.
				 */
				if (sqp->sq_first != NULL) {
					squeue_try_drain_one(sqp, connp);
				}

				/*
				 * If work or control actions are pending, wake
				 * up the worker thread.
				 */
				if (sqp->sq_first != NULL ||
				    sqp->sq_state & SQS_WORKER_THR_CONTROL) {
					squeue_worker_wakeup(sqp);
				}
				mutex_exit(&sqp->sq_lock);
				return;
			}
		} else {
			if (ira != NULL) {
				mblk_t *attrmp;

				ASSERT(cnt == 1);
				attrmp = ip_recv_attr_to_mblk(ira);
				if (attrmp == NULL) {
					mutex_exit(&sqp->sq_lock);
					ip_drop_input("squeue: "
					    "ip_recv_attr_to_mblk",
					    mp, NULL);
					/* Caller already set b_prev/b_next */
					mp->b_prev = mp->b_next = NULL;
					freemsg(mp);
					return;
				}
				ASSERT(attrmp->b_cont == NULL);
				attrmp->b_cont = mp;
				/* Move connp and func to new */
				attrmp->b_queue = mp->b_queue;
				mp->b_queue = NULL;
				attrmp->b_prev = mp->b_prev;
				mp->b_prev = NULL;

				ASSERT(mp == tail);
				tail = mp = attrmp;
			}

			ENQUEUE_CHAIN(sqp, mp, tail, cnt);
#ifdef DEBUG
			mp->b_tag = tag;
#endif
		}
		/*
		 * We are here because either we couldn't do inline
		 * processing (because something was already queued),
		 * or we had a chain of more than one packet,
		 * or something else arrived after we were done with
		 * inline processing.
		 */
		ASSERT(MUTEX_HELD(&sqp->sq_lock));
		ASSERT(sqp->sq_first != NULL);
		now = gethrtime();
		sqp->sq_run = curthread;
		sqp->sq_drain(sqp, SQS_ENTER, now + squeue_drain_ns);

		/*
		 * If we didn't do a complete drain, the worker
		 * thread was already signalled by squeue_drain.
		 * In case any control actions are pending, wake
		 * up the worker.
		 */
		sqp->sq_run = NULL;
		if (sqp->sq_state & SQS_WORKER_THR_CONTROL) {
			squeue_worker_wakeup(sqp);
		}
	} else {
		/*
		 * We let a thread processing a squeue reenter only
		 * once. This helps the case of incoming connection
		 * where a SYN-ACK-ACK that triggers the conn_ind
		 * doesn't have to queue the packet if listener and
		 * eager are on the same squeue. Also helps the
		 * loopback connection where the two ends are bound
		 * to the same squeue (which is typical on single
		 * CPU machines).
		 *
		 * We let the thread reenter only once for fear of
		 * the stack getting blown with multiple traversals.
		 */
		connp = (conn_t *)mp->b_prev;
		if (!(sqp->sq_state & SQS_REENTER) &&
		    (process_flag != SQ_FILL) && (sqp->sq_first == NULL) &&
		    (sqp->sq_run == curthread) && (cnt == 1) &&
		    (connp->conn_on_sqp == B_FALSE)) {
			sqp->sq_state |= SQS_REENTER;
			mutex_exit(&sqp->sq_lock);

			ASSERT(mp->b_prev != NULL);
			ASSERT(mp->b_queue != NULL);

			mp->b_prev = NULL;
			proc = (sqproc_t)mp->b_queue;
			mp->b_queue = NULL;

			/*
			 * Handle squeue switching. More details in the
			 * block comment at the top of the file
			 */
			if (connp->conn_sqp == sqp) {
				connp->conn_on_sqp = B_TRUE;
				DTRACE_PROBE3(squeue__proc__start, squeue_t *,
				    sqp, mblk_t *, mp, conn_t *, connp);
				(*proc)(connp, mp, sqp, ira);
				DTRACE_PROBE2(squeue__proc__end, squeue_t *,
				    sqp, conn_t *, connp);
				connp->conn_on_sqp = B_FALSE;
				CONN_DEC_REF(connp);
			} else {
				SQUEUE_ENTER_ONE(connp->conn_sqp, mp, proc,
				    connp, ira, SQ_FILL, SQTAG_SQUEUE_CHANGE);
			}

			mutex_enter(&sqp->sq_lock);
			sqp->sq_state &= ~SQS_REENTER;
			mutex_exit(&sqp->sq_lock);
			return;
		}

		/*
		 * Queue is already being processed or there is already
		 * one or more packets on the queue. Enqueue the
		 * packet and wakeup the squeue worker thread if the
		 * squeue is not being processed.
		 */
#ifdef DEBUG
		mp->b_tag = tag;
#endif
		if (ira != NULL) {
			mblk_t *attrmp;

			ASSERT(cnt == 1);
			attrmp = ip_recv_attr_to_mblk(ira);
			if (attrmp == NULL) {
				mutex_exit(&sqp->sq_lock);
				ip_drop_input("squeue: ip_recv_attr_to_mblk",
				    mp, NULL);
				/* Caller already set b_prev/b_next */
				mp->b_prev = mp->b_next = NULL;
				freemsg(mp);
				return;
			}
			ASSERT(attrmp->b_cont == NULL);
			attrmp->b_cont = mp;
			/* Move connp and func to new */
			attrmp->b_queue = mp->b_queue;
			mp->b_queue = NULL;
			attrmp->b_prev = mp->b_prev;
			mp->b_prev = NULL;

			ASSERT(mp == tail);
			tail = mp = attrmp;
		}
		ENQUEUE_CHAIN(sqp, mp, tail, cnt);
		/*
		 * If the worker isn't running or control actions are pending,
		 * wake it up now.
		 */
		if ((sqp->sq_state & SQS_PROC) == 0 ||
		    (sqp->sq_state & SQS_WORKER_THR_CONTROL) != 0) {
			squeue_worker_wakeup(sqp);
		}
	}
	mutex_exit(&sqp->sq_lock);
}

/*
 * PRIVATE FUNCTIONS
 */


/*
 * Wake up worker thread for squeue to process queued work.
 */
static void
squeue_worker_wakeup(squeue_t *sqp)
{
	ASSERT(MUTEX_HELD(&(sqp)->sq_lock));

	cv_signal(&sqp->sq_worker_cv);
	sqp->sq_awoken = gethrtime();
}

static void
squeue_drain(squeue_t *sqp, uint_t proc_type, hrtime_t expire)
{
	mblk_t *mp;
	mblk_t *head;
	sqproc_t proc;
	conn_t *connp;
	ill_rx_ring_t *sq_rx_ring = sqp->sq_rx_ring;
	hrtime_t now;
	boolean_t sq_poll_capable;
	ip_recv_attr_t *ira, iras;

	/*
	 * Before doing any work, check our stack depth; if we're not a
	 * worker thread for this squeue and we're beginning to get tight
	 * on stack, kick the worker, bump a counter and return.
	 */
	if (proc_type != SQS_WORKER && STACK_BIAS + (uintptr_t)getfp() -
	    (uintptr_t)curthread->t_stkbase < squeue_drain_stack_needed) {
		ASSERT(mutex_owned(&sqp->sq_lock));
		squeue_worker_wakeup(sqp);
		squeue_drain_stack_toodeep++;
		return;
	}

	sq_poll_capable = (sqp->sq_state & SQS_POLL_CAPAB) != 0;
again:
	ASSERT(mutex_owned(&sqp->sq_lock));
	ASSERT(!(sqp->sq_state & (SQS_POLL_THR_QUIESCED |
	    SQS_POLL_QUIESCE_DONE)));

	head = sqp->sq_first;
	sqp->sq_first = NULL;
	sqp->sq_last = NULL;
	sqp->sq_count = 0;

	sqp->sq_state |= SQS_PROC | proc_type;

	/*
	 * We have backlog built up. Switch to polling mode if the
	 * device underneath allows it. Need to do it so that
	 * more packets don't come in and disturb us (by contending
	 * for sq_lock or a higher priority thread preempting us).
	 *
	 * The worker thread is allowed to do active polling while we
	 * just disable the interrupts for drain by non-worker (kernel
	 * or userland) threads so they can peacefully process the
	 * packets during the time allocated to them.
	 */
	SQS_POLLING_ON(sqp, sq_poll_capable, sq_rx_ring);
	mutex_exit(&sqp->sq_lock);

	while ((mp = head) != NULL) {

		head = mp->b_next;
		mp->b_next = NULL;

		proc = (sqproc_t)mp->b_queue;
		mp->b_queue = NULL;
		connp = (conn_t *)mp->b_prev;
		mp->b_prev = NULL;

		/* Is there an ip_recv_attr_t to handle? */
		if (ip_recv_attr_is_mblk(mp)) {
			mblk_t *attrmp = mp;

			ASSERT(attrmp->b_cont != NULL);

			mp = attrmp->b_cont;
			attrmp->b_cont = NULL;
			ASSERT(mp->b_queue == NULL);
			ASSERT(mp->b_prev == NULL);

			if (!ip_recv_attr_from_mblk(attrmp, &iras)) {
				/* The ill or ip_stack_t disappeared on us */
				ip_drop_input("ip_recv_attr_from_mblk",
				    mp, NULL);
				ira_cleanup(&iras, B_TRUE);
				CONN_DEC_REF(connp);
				continue;
			}
			ira = &iras;
		} else {
			ira = NULL;
		}

		/*
		 * Handle squeue switching. More details in the
		 * block comment at the top of the file
		 */
		if (connp->conn_sqp == sqp) {
			SQUEUE_DBG_SET(sqp, mp, proc, connp,
			    mp->b_tag);
			connp->conn_on_sqp = B_TRUE;
			DTRACE_PROBE3(squeue__proc__start, squeue_t *,
			    sqp, mblk_t *, mp, conn_t *, connp);
			(*proc)(connp, mp, sqp, ira);
			DTRACE_PROBE2(squeue__proc__end, squeue_t *,
			    sqp, conn_t *, connp);
			connp->conn_on_sqp = B_FALSE;
			CONN_DEC_REF(connp);
		} else {
			SQUEUE_ENTER_ONE(connp->conn_sqp, mp, proc, connp, ira,
			    SQ_FILL, SQTAG_SQUEUE_CHANGE);
		}
		if (ira != NULL)
			ira_cleanup(ira, B_TRUE);
	}

	SQUEUE_DBG_CLEAR(sqp);

	mutex_enter(&sqp->sq_lock);

	/*
	 * Check if there is still work to do (either more arrived or the
	 * timer expired). If we are the worker thread and we are polling
	 * capable, continue doing the work since no one else is around to
	 * do the work anyway (but signal the poll thread to retrieve some
	 * packets in the meanwhile). If we are not the worker thread, just
	 * signal the worker thread to take up the work if processing time
	 * has expired.
	 */
	if (sqp->sq_first != NULL) {
		/*
		 * Still more to process. If the time quantum has not
		 * expired, we should let the drain go on. The worker
		 * thread is allowed to drain as long as there is anything
		 * left.
		 */
		now = gethrtime();
		if ((now < expire) || (proc_type == SQS_WORKER)) {
			/*
			 * If time has not expired, or we are the worker
			 * thread and this squeue is polling capable,
			 * continue to do the drain.
			 *
			 * We turn off interrupts for all userland threads
			 * doing drain but we do active polling only for
			 * the worker thread.
			 *
			 * Calling SQS_POLL_RING() even in the case of
			 * SQS_POLLING_ON() not succeeding is ok as
			 * SQS_POLL_RING() will not wake up the poll thread
			 * if the SQS_POLLING bit is not set.
			 */
			if (proc_type == SQS_WORKER)
				SQS_POLL_RING(sqp);
			goto again;
		}

		squeue_worker_wakeup(sqp);
	}

	/*
	 * If the poll thread is already running, just return. The
	 * poll thread continues to hold the proc and will finish
	 * processing.
	 */
	if (sqp->sq_state & SQS_GET_PKTS) {
		ASSERT(!(sqp->sq_state & (SQS_POLL_THR_QUIESCED |
		    SQS_POLL_QUIESCE_DONE)));
		sqp->sq_state &= ~proc_type;
		return;
	}

	/*
	 * If we are the worker thread and no work is left, send the poll
	 * thread down once more to see if something arrived. Otherwise,
	 * turn the interrupts back on and we are done.
	 */
	if ((proc_type == SQS_WORKER) && (sqp->sq_state & SQS_POLLING)) {
		/*
		 * Do one last check to see if anything arrived
		 * in the NIC. We leave the SQS_PROC set to ensure
		 * that the poll thread keeps the PROC and can decide
		 * if it needs to turn polling off or continue
		 * processing.
		 *
		 * If we drop the SQS_PROC here and the poll thread comes
		 * up empty handed, it can not safely turn polling off
		 * since someone else could have acquired the PROC
		 * and started draining. The previously running poll
		 * thread and the current thread doing drain would end
		 * up in a race for turning polling on/off and more
		 * complex code would be required to deal with it.
		 *
		 * It's a lot simpler for the drain to hand the SQS_PROC
		 * to the poll thread (if running) and let the poll thread
		 * finish without worrying about racing with any other
		 * thread.
		 */
		ASSERT(!(sqp->sq_state & (SQS_POLL_THR_QUIESCED |
		    SQS_POLL_QUIESCE_DONE)));
		SQS_POLL_RING(sqp);
		sqp->sq_state &= ~proc_type;
	} else {
		/*
		 * The squeue is either not capable of polling or the
		 * attempt to blank (i.e., turn SQS_POLLING_ON()) was
		 * unsuccessful or the poll thread already finished
		 * processing and didn't find anything. Since there
		 * is nothing queued and we already turned polling on
		 * (for all threads doing drain), we should turn
		 * polling off and relinquish the PROC.
		 */
		ASSERT(!(sqp->sq_state & (SQS_POLL_THR_QUIESCED |
		    SQS_POLL_QUIESCE_DONE)));
		SQS_POLLING_OFF(sqp, sq_poll_capable, sq_rx_ring);
		sqp->sq_state &= ~(SQS_PROC | proc_type);
		/*
		 * If we are not the worker and there is a pending quiesce
		 * event, wake up the worker
		 */
		if ((proc_type != SQS_WORKER) &&
		    (sqp->sq_state & SQS_WORKER_THR_CONTROL)) {
			squeue_worker_wakeup(sqp);
		}
	}
}

/*
 * Quiesce, Restart, or Cleanup of the squeue poll thread.
 *
 * Quiesce and Restart: After an squeue poll thread has been quiesced, it does
 * not attempt to poll the underlying soft ring any more. The quiesce is
 * triggered by the mac layer when it wants to quiesce a soft ring. Typically
 * control operations such as changing the fanout of a NIC or VNIC (dladm
 * setlinkprop) need to quiesce data flow before changing the wiring.
 * The operation is done by the mac layer, but it calls back into IP to
 * quiesce the soft ring. After completing the operation (say increase or
 * decrease of the fanout) the mac layer then calls back into IP to restart
 * the quiesced soft ring.
 *
 * Cleanup: This is triggered when the squeue binding to a soft ring is
 * removed permanently. Typically interface plumb and unplumb would trigger
 * this. It can also be triggered from the mac layer when a soft ring is
 * being deleted say as the result of a fanout reduction. Since squeues are
 * never deleted, the cleanup marks the squeue as fit for recycling and
 * moves it to the zeroth squeue set.
 */
static void
squeue_poll_thr_control(squeue_t *sqp)
{
	if (sqp->sq_state & SQS_POLL_THR_RESTART) {
		/* Restart implies a previous quiesce */
		ASSERT(sqp->sq_state & SQS_POLL_THR_QUIESCED);
		sqp->sq_state &= ~(SQS_POLL_THR_QUIESCED |
		    SQS_POLL_THR_RESTART);
		sqp->sq_state |= SQS_POLL_CAPAB;
		cv_signal(&sqp->sq_worker_cv);
		return;
	}

	if (sqp->sq_state & SQS_POLL_THR_QUIESCE) {
		sqp->sq_state |= SQS_POLL_THR_QUIESCED;
		sqp->sq_state &= ~SQS_POLL_THR_QUIESCE;
		cv_signal(&sqp->sq_worker_cv);
		return;
	}
}

/*
 * POLLING Notes
 *
 * With polling mode, we want to do as much processing as we possibly can
 * in worker thread context. The sweet spot is when the worker thread keeps
 * doing work all the time in polling mode and writers etc. keep dumping
 * packets to the worker thread. Occasionally, we send the poll thread
 * (running at lower priority) down to the NIC to get a chain of packets
 * to feed to the worker. Sending the poll thread down to the NIC is
 * dependent on 3 criteria:
 *
 * 1) It's always driven from squeue_drain and only if the worker thread
 *	is doing the drain.
 * 2) We clear the backlog once and more packets arrived in between.
 *	Before starting drain again, send the poll thread down if
 *	the drain is being done by the worker thread.
 * 3) Before exiting the squeue_drain, if the poll thread is not already
 *	working and we are the worker thread, try to poll one more time.
 *
 * For latency's sake, we do allow any thread calling squeue_enter
 * to process its packet provided:
 *
 * 1) Nothing is queued
 * 2) If more packets arrived in between, the non-worker threads are allowed
 *	to do the drain till their time quantum expires provided SQS_GET_PKTS
 *	wasn't set in between.
 *
 * Avoiding deadlocks with interrupts
 * ==================================
 *
 * One of the big problems is that we can't send the poll thread down while
 * holding the sq_lock since the thread can block. So we drop the sq_lock
 * before calling sq_get_pkts(). We keep holding the SQS_PROC as long as the
 * poll thread is running so that no other thread can acquire the
 * perimeter in between. If the squeue_drain gets done (no more work
 * left), it leaves the SQS_PROC set if the poll thread is running.
 */
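
/*
 * In outline, the handoff in squeue_polling_thread() below looks like
 * this (a sketch; see the function for the full state handling):
 *
 *	mutex_enter(&sqp->sq_lock);
 *	(SQS_PROC | SQS_POLLING | SQS_GET_PKTS are all set here)
 *	mutex_exit(&sqp->sq_lock);	(sq_get_pkts() may block, so
 *					sq_lock cannot be held across it)
 *	head = sq_get_pkts(sq_mac_handle, bytes_to_pickup);
 *	mutex_enter(&sqp->sq_lock);	(SQS_PROC kept the perimeter ours)
 */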

/*
 * This is the squeue poll thread. In poll mode, it polls the underlying
 * TCP softring and feeds packets into the squeue. The worker thread then
 * drains the squeue. The poll thread also responds to control signals for
 * quiescing, restarting, or cleanup of an squeue. These are driven by
 * control operations like plumb/unplumb or as a result of dynamic Rx ring
 * related operations that are driven from the mac layer.
 */
static void
squeue_polling_thread(squeue_t *sqp)
{
	kmutex_t *lock = &sqp->sq_lock;
	kcondvar_t *async = &sqp->sq_poll_cv;
	ip_mac_rx_t sq_get_pkts;
	ip_accept_t ip_accept;
	ill_rx_ring_t *sq_rx_ring;
	ill_t *sq_ill;
	mblk_t *head, *tail, *mp;
	uint_t cnt;
	void *sq_mac_handle;
	callb_cpr_t cprinfo;
	size_t bytes_to_pickup;
	uint32_t ctl_state;

	CALLB_CPR_INIT(&cprinfo, lock, callb_generic_cpr, "sq_poll");
	mutex_enter(lock);

	for (;;) {
		CALLB_CPR_SAFE_BEGIN(&cprinfo);
		cv_wait(async, lock);
		CALLB_CPR_SAFE_END(&cprinfo, lock);

		ctl_state = sqp->sq_state & (SQS_POLL_THR_CONTROL |
		    SQS_POLL_THR_QUIESCED);
		if (ctl_state != 0) {
			/*
			 * If the squeue is quiesced, then wait for a control
			 * request. A quiesced squeue must not poll the
			 * underlying soft ring.
			 */
			if (ctl_state == SQS_POLL_THR_QUIESCED)
				continue;
			/*
			 * Act on control requests to quiesce, cleanup or
			 * restart an squeue
			 */
			squeue_poll_thr_control(sqp);
			continue;
		}

		if (!(sqp->sq_state & SQS_POLL_CAPAB))
			continue;

		ASSERT((sqp->sq_state &
		    (SQS_PROC|SQS_POLLING|SQS_GET_PKTS)) ==
		    (SQS_PROC|SQS_POLLING|SQS_GET_PKTS));

poll_again:
		sq_rx_ring = sqp->sq_rx_ring;
		sq_get_pkts = sq_rx_ring->rr_rx;
		sq_mac_handle = sq_rx_ring->rr_rx_handle;
		ip_accept = sq_rx_ring->rr_ip_accept;
		sq_ill = sq_rx_ring->rr_ill;
		bytes_to_pickup = squeue_poll_budget_bytes;
		mutex_exit(lock);
		head = sq_get_pkts(sq_mac_handle, bytes_to_pickup);
		mp = NULL;
		if (head != NULL) {
			/*
			 * We got the packet chain from the mac layer. It
			 * would be nice to be able to process it inline
			 * for better performance but we need to give
			 * IP a chance to look at this chain to ensure
			 * that packets are really meant for this squeue
			 * and do the IP processing.
			 */
			mp = ip_accept(sq_ill, sq_rx_ring, sqp, head,
			    &tail, &cnt);
		}
		mutex_enter(lock);
		if (mp != NULL) {
			/*
			 * The ip_accept function has already added an
			 * ip_recv_attr_t mblk if that is needed.
			 */
			ENQUEUE_CHAIN(sqp, mp, tail, cnt);
		}
		ASSERT((sqp->sq_state &
		    (SQS_PROC|SQS_POLLING|SQS_GET_PKTS)) ==
		    (SQS_PROC|SQS_POLLING|SQS_GET_PKTS));

		if (sqp->sq_first != NULL && !(sqp->sq_state & SQS_WORKER)) {
			/*
			 * We have packets to process and the worker thread
			 * is not running. Check to see if the poll thread
			 * is allowed to process. Let it do processing only
			 * if it picked up some packets from the NIC,
			 * otherwise wakeup the worker thread.
			 */
			if (mp != NULL) {
				hrtime_t now;

				now = gethrtime();
				sqp->sq_run = curthread;
				sqp->sq_drain(sqp, SQS_POLL_PROC, now +
				    squeue_drain_ns);
				sqp->sq_run = NULL;

				if (sqp->sq_first == NULL)
					goto poll_again;

				/*
				 * Couldn't do the entire drain because the
				 * time limit expired, let the
				 * worker thread take over.
				 */
			}

			/*
			 * Put the SQS_PROC_HELD on so the worker
			 * thread can distinguish where it's called from. We
			 * can remove the SQS_PROC flag here and turn off the
			 * polling so that it wouldn't matter who gets the
			 * processing but we get better performance this way
			 * and save the cost of turning polling off and
			 * possibly on again as soon as we start draining
			 * again.
			 *
			 * We can't remove the SQS_PROC flag without turning
			 * polling off until we can guarantee that control
			 * will return to squeue_drain immediately.
			 */
			sqp->sq_state |= SQS_PROC_HELD;
			sqp->sq_state &= ~SQS_GET_PKTS;
			squeue_worker_wakeup(sqp);
		} else if (sqp->sq_first == NULL &&
		    !(sqp->sq_state & SQS_WORKER)) {
			/*
			 * Nothing queued and the worker thread is not
			 * running. Since we hold the proc, no other thread
			 * is processing the squeue. This means that there
			 * is no work to be done and nothing is queued in
			 * the squeue or in the NIC. Turn polling off and
			 * go back to interrupt mode.
			 */
			sqp->sq_state &= ~(SQS_PROC|SQS_GET_PKTS);
			/* LINTED: constant in conditional context */
			SQS_POLLING_OFF(sqp, B_TRUE, sq_rx_ring);

			/*
			 * If there is a pending control operation
			 * wake up the worker, since it is currently
			 * not running.
			 */
			if (sqp->sq_state & SQS_WORKER_THR_CONTROL) {
				squeue_worker_wakeup(sqp);
			}
		} else {
			/*
			 * The worker thread is already running. We don't
			 * need to do anything. Indicate that the poll
			 * thread is done.
			 */
			sqp->sq_state &= ~SQS_GET_PKTS;
		}
		if (sqp->sq_state & SQS_POLL_THR_CONTROL) {
			/*
			 * Act on control requests to quiesce, cleanup or
			 * restart an squeue
			 */
			squeue_poll_thr_control(sqp);
		}
	}
}

/*
 * The squeue worker thread acts on any control requests to quiesce, cleanup
 * or restart an ill_rx_ring_t by calling this function. The worker thread
 * synchronizes with the squeue poll thread to complete the request and
 * finally wakes up the requestor when the request is completed.
 */
static void
squeue_worker_thr_control(squeue_t *sqp)
{
	ill_t *ill;
	ill_rx_ring_t *rx_ring;

	ASSERT(MUTEX_HELD(&sqp->sq_lock));

	if (sqp->sq_state & SQS_POLL_RESTART) {
		/* Restart implies a previous quiesce. */
		ASSERT((sqp->sq_state & (SQS_PROC_HELD |
		    SQS_POLL_QUIESCE_DONE | SQS_PROC | SQS_WORKER)) ==
		    (SQS_POLL_QUIESCE_DONE | SQS_PROC | SQS_WORKER));
		/*
		 * Request the squeue poll thread to restart and wait till
		 * it actually restarts.
		 */
		sqp->sq_state &= ~SQS_POLL_QUIESCE_DONE;
		sqp->sq_state |= SQS_POLL_THR_RESTART;
		cv_signal(&sqp->sq_poll_cv);
		while (sqp->sq_state & SQS_POLL_THR_QUIESCED)
			cv_wait(&sqp->sq_worker_cv, &sqp->sq_lock);
		sqp->sq_state &= ~(SQS_POLL_RESTART | SQS_PROC |
		    SQS_WORKER);
		/*
		 * Signal any waiter that is waiting for the restart
		 * to complete
		 */
		sqp->sq_state |= SQS_POLL_RESTART_DONE;
		cv_signal(&sqp->sq_ctrlop_done_cv);
		return;
	}

	if (sqp->sq_state & SQS_PROC_HELD) {
		/* The squeue poll thread handed control to us */
		ASSERT(sqp->sq_state & SQS_PROC);
	}

	/*
	 * Prevent any other thread from processing the squeue
	 * until we finish the control actions by setting SQS_PROC.
	 * But allow ourselves to reenter by setting SQS_WORKER
	 */
	sqp->sq_state |= (SQS_PROC | SQS_WORKER);

	/* Signal the squeue poll thread and wait for it to quiesce itself */
	if (!(sqp->sq_state & SQS_POLL_THR_QUIESCED)) {
		sqp->sq_state |= SQS_POLL_THR_QUIESCE;
		cv_signal(&sqp->sq_poll_cv);
		while (!(sqp->sq_state & SQS_POLL_THR_QUIESCED))
			cv_wait(&sqp->sq_worker_cv, &sqp->sq_lock);
	}

	rx_ring = sqp->sq_rx_ring;
	ill = rx_ring->rr_ill;
	/*
	 * The lock hierarchy is as follows.
	 * cpu_lock -> ill_lock -> sqset_lock -> sq_lock
	 */
	mutex_exit(&sqp->sq_lock);
	mutex_enter(&ill->ill_lock);
	mutex_enter(&sqp->sq_lock);

	SQS_POLLING_OFF(sqp, (sqp->sq_state & SQS_POLL_CAPAB) != 0,
	    sqp->sq_rx_ring);
	sqp->sq_state &= ~(SQS_POLL_CAPAB | SQS_GET_PKTS | SQS_PROC_HELD);
	if (sqp->sq_state & SQS_POLL_CLEANUP) {
		/*
		 * Disassociate this squeue from its ill_rx_ring_t.
		 * The rr_sqp, sq_rx_ring fields are protected by the
		 * corresponding squeue, ill_lock and sq_lock. Holding any
		 * of them will ensure that the ring to squeue mapping does
		 * not change.
		 */
		ASSERT(!(sqp->sq_state & SQS_DEFAULT));

		sqp->sq_rx_ring = NULL;
		rx_ring->rr_sqp = NULL;

		sqp->sq_state &= ~(SQS_POLL_CLEANUP | SQS_POLL_THR_QUIESCED |
		    SQS_POLL_QUIESCE_DONE);
		sqp->sq_ill = NULL;

		rx_ring->rr_rx_handle = NULL;
		rx_ring->rr_intr_handle = NULL;
		rx_ring->rr_intr_enable = NULL;
		rx_ring->rr_intr_disable = NULL;
		sqp->sq_state |= SQS_POLL_CLEANUP_DONE;
	} else {
		sqp->sq_state &= ~SQS_POLL_QUIESCE;
		sqp->sq_state |= SQS_POLL_QUIESCE_DONE;
	}
	/*
	 * Signal any waiter that is waiting for the quiesce or cleanup
	 * to complete and also wait for it to actually see and reset the
	 * SQS_POLL_CLEANUP_DONE.
	 */
	cv_signal(&sqp->sq_ctrlop_done_cv);
	mutex_exit(&ill->ill_lock);
	if (sqp->sq_state & SQS_POLL_CLEANUP_DONE) {
		cv_wait(&sqp->sq_worker_cv, &sqp->sq_lock);
		sqp->sq_state &= ~(SQS_PROC | SQS_WORKER);
	}
}

static void
squeue_worker(squeue_t *sqp)
{
	kmutex_t *lock = &sqp->sq_lock;
	kcondvar_t *async = &sqp->sq_worker_cv;
	callb_cpr_t cprinfo;
	hrtime_t now;

	CALLB_CPR_INIT(&cprinfo, lock, callb_generic_cpr, "sq_worker");
	mutex_enter(lock);

	for (;;) {
		for (;;) {
			/*
			 * If the poll thread has handed control to us
			 * we need to break out of the wait.
			 */
			if (sqp->sq_state & SQS_PROC_HELD)
				break;

			/*
			 * If the squeue is not being processed and we either
			 * have messages to drain or some thread has signaled
			 * some control activity we need to break
			 */
			if (!(sqp->sq_state & SQS_PROC) &&
			    ((sqp->sq_state & SQS_WORKER_THR_CONTROL) ||
			    (sqp->sq_first != NULL)))
				break;

			/*
			 * If we have started some control action, then check
			 * for the SQS_WORKER flag (since we don't
			 * release the squeue) to make sure we own the squeue
			 * and break out
			 */
			if ((sqp->sq_state & SQS_WORKER_THR_CONTROL) &&
			    (sqp->sq_state & SQS_WORKER))
				break;

			CALLB_CPR_SAFE_BEGIN(&cprinfo);
			cv_wait(async, lock);
			CALLB_CPR_SAFE_END(&cprinfo, lock);
		}
		if (sqp->sq_state & SQS_WORKER_THR_CONTROL) {
			squeue_worker_thr_control(sqp);
			continue;
		}
		ASSERT(!(sqp->sq_state & (SQS_POLL_THR_QUIESCED |
		    SQS_POLL_CLEANUP_DONE | SQS_POLL_QUIESCE_DONE |
		    SQS_WORKER_THR_CONTROL | SQS_POLL_THR_CONTROL)));

		if (sqp->sq_state & SQS_PROC_HELD)
			sqp->sq_state &= ~SQS_PROC_HELD;

		now = gethrtime();
		sqp->sq_run = curthread;
		sqp->sq_drain(sqp, SQS_WORKER, now + squeue_drain_ns);
		sqp->sq_run = NULL;
	}
}

uintptr_t *
squeue_getprivate(squeue_t *sqp, sqprivate_t p)
{
	ASSERT(p < SQPRIVATE_MAX);

	return (&sqp->sq_private[p]);
}
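
/*
 * A sketch of how a client might stash per-squeue state via the slot
 * returned above (SQPRIVATE_TCP is a real index from squeue.h; "my_state"
 * is purely illustrative):
 *
 *	*squeue_getprivate(sqp, SQPRIVATE_TCP) = (uintptr_t)my_state;
 *	...
 *	my_state = (void *)*squeue_getprivate(sqp, SQPRIVATE_TCP);
 */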

/* ARGSUSED */
void
squeue_wakeup_conn(void *arg, mblk_t *mp, void *arg2, ip_recv_attr_t *dummy)
{
	conn_t *connp = (conn_t *)arg;
	squeue_t *sqp = connp->conn_sqp;

	/*
	 * Mark the squeue as paused before waking up the thread stuck
	 * in squeue_synch_enter().
	 */
	mutex_enter(&sqp->sq_lock);
	sqp->sq_state |= SQS_PAUSE;

	/*
	 * Notify the thread that it's OK to proceed; that is done by
	 * clearing the MSGWAITSYNC flag. The synch thread will free the mblk.
	 */
	ASSERT(mp->b_flag & MSGWAITSYNC);
	mp->b_flag &= ~MSGWAITSYNC;
	cv_broadcast(&connp->conn_sq_cv);

	/*
	 * We are doing something on behalf of another thread, so we have to
	 * pause and wait until it finishes.
	 */
	while (sqp->sq_state & SQS_PAUSE) {
		cv_wait(&sqp->sq_synch_cv, &sqp->sq_lock);
	}
	mutex_exit(&sqp->sq_lock);
}

int
squeue_synch_enter(conn_t *connp, mblk_t *use_mp)
{
	squeue_t *sqp;

again:
	sqp = connp->conn_sqp;

	mutex_enter(&sqp->sq_lock);
	if (sqp->sq_first == NULL && !(sqp->sq_state & SQS_PROC)) {
		/*
		 * We are OK to proceed if the squeue is empty, and
		 * no one owns the squeue.
		 *
		 * The caller won't own the squeue as this is called from the
		 * application.
		 */
		ASSERT(sqp->sq_run == NULL);

		sqp->sq_state |= SQS_PROC;
		sqp->sq_run = curthread;
		mutex_exit(&sqp->sq_lock);

		/*
		 * Handle squeue switching. The conn's squeue can only change
		 * while there is a thread in the squeue, which is why we do
		 * the check after entering the squeue. If it has changed,
		 * exit this squeue and redo everything with the new squeue.
		 */
		if (sqp != connp->conn_sqp) {
			mutex_enter(&sqp->sq_lock);
			sqp->sq_state &= ~SQS_PROC;
			sqp->sq_run = NULL;
			mutex_exit(&sqp->sq_lock);
			goto again;
		}
#if SQUEUE_DEBUG
		sqp->sq_curmp = NULL;
		sqp->sq_curproc = NULL;
		sqp->sq_connp = connp;
#endif
		connp->conn_on_sqp = B_TRUE;
		return (0);
	} else {
		mblk_t *mp;

		mp = (use_mp == NULL) ? allocb(0, BPRI_MED) : use_mp;
		if (mp == NULL) {
			mutex_exit(&sqp->sq_lock);
			return (ENOMEM);
		}

		/*
		 * We mark the mblk as awaiting synchronous squeue access
		 * by setting the MSGWAITSYNC flag. Once squeue_wakeup_conn
		 * fires, MSGWAITSYNC is cleared, at which point we know we
		 * have exclusive access.
		 */
		mp->b_flag |= MSGWAITSYNC;

		CONN_INC_REF(connp);
		SET_SQUEUE(mp, squeue_wakeup_conn, connp);
		ENQUEUE_CHAIN(sqp, mp, mp, 1);

		ASSERT(sqp->sq_run != curthread);

		/* Wait until the enqueued mblk gets processed. */
		while (mp->b_flag & MSGWAITSYNC)
			cv_wait(&connp->conn_sq_cv, &sqp->sq_lock);
		mutex_exit(&sqp->sq_lock);

		if (use_mp == NULL)
			freeb(mp);

		return (0);
	}
}
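
/*
 * A typical pairing, sketched with error handling elided (the body is
 * whatever conn state manipulation needs squeue protection):
 *
 *	if (squeue_synch_enter(connp, NULL) == 0) {
 *		... operate on connp with exclusive squeue access ...
 *		squeue_synch_exit(connp, SQ_NODRAIN);
 *	}
 */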

/*
 * If possible, attempt to immediately process a single queued request,
 * should it match the supplied conn_t reference. This is primarily intended
 * to elide squeue worker thread wake-ups during local TCP connect() or
 * close() operations where the response is placed on the squeue during
 * processing.
 */
static void
squeue_try_drain_one(squeue_t *sqp, conn_t *compare_conn)
{
	mblk_t *next, *mp = sqp->sq_first;
	conn_t *connp;
	sqproc_t proc;
	ip_recv_attr_t iras, *ira = NULL;

	ASSERT(MUTEX_HELD(&sqp->sq_lock));
	ASSERT((sqp->sq_state & SQS_PROC) == 0);
	ASSERT(sqp->sq_run == NULL);
	VERIFY(mp != NULL);

	/*
	 * There is no guarantee that compare_conn references a valid object
	 * at this time, so under no circumstance may it be dereferenced
	 * unless it matches the squeue entry.
	 */
	connp = (conn_t *)mp->b_prev;
	if (connp != compare_conn) {
		return;
	}

	next = mp->b_next;
	proc = (sqproc_t)mp->b_queue;

	ASSERT(proc != NULL);
	ASSERT(sqp->sq_count > 0);

	/* Dequeue item from squeue */
	if (next == NULL) {
		sqp->sq_first = NULL;
		sqp->sq_last = NULL;
	} else {
		sqp->sq_first = next;
	}
	sqp->sq_count--;

	sqp->sq_state |= SQS_PROC;
	sqp->sq_run = curthread;
	mutex_exit(&sqp->sq_lock);

	/* Prep mblk_t and retrieve ira if needed */
	mp->b_prev = NULL;
	mp->b_queue = NULL;
	mp->b_next = NULL;
	if (ip_recv_attr_is_mblk(mp)) {
		mblk_t *attrmp = mp;

		ASSERT(attrmp->b_cont != NULL);

		mp = attrmp->b_cont;
		attrmp->b_cont = NULL;

		ASSERT(mp->b_queue == NULL);
		ASSERT(mp->b_prev == NULL);

		if (!ip_recv_attr_from_mblk(attrmp, &iras)) {
			/* ill_t or ip_stack_t disappeared */
			ip_drop_input("ip_recv_attr_from_mblk", mp, NULL);
			ira_cleanup(&iras, B_TRUE);
			CONN_DEC_REF(connp);
			goto done;
		}
		ira = &iras;
	}

	SQUEUE_DBG_SET(sqp, mp, proc, connp, mp->b_tag);
	connp->conn_on_sqp = B_TRUE;
	DTRACE_PROBE3(squeue__proc__start, squeue_t *, sqp, mblk_t *, mp,
	    conn_t *, connp);
	(*proc)(connp, mp, sqp, ira);
	DTRACE_PROBE2(squeue__proc__end, squeue_t *, sqp, conn_t *, connp);
	connp->conn_on_sqp = B_FALSE;
	CONN_DEC_REF(connp);
	SQUEUE_DBG_CLEAR(sqp);

	if (ira != NULL)
		ira_cleanup(ira, B_TRUE);

done:
	mutex_enter(&sqp->sq_lock);
	sqp->sq_state &= ~(SQS_PROC);
	sqp->sq_run = NULL;
}

void
squeue_synch_exit(conn_t *connp, int flag)
{
	squeue_t *sqp = connp->conn_sqp;

	ASSERT(flag == SQ_NODRAIN || flag == SQ_PROCESS);

	mutex_enter(&sqp->sq_lock);
	if (sqp->sq_run != curthread) {
		/*
		 * The caller doesn't own the squeue; clear the SQS_PAUSE
		 * flag and wake up the squeue owner so that the owner can
		 * continue processing.
		 */
		ASSERT(sqp->sq_state & SQS_PAUSE);
		sqp->sq_state &= ~SQS_PAUSE;

		/* There should be only one thread blocking on sq_synch_cv. */
		cv_signal(&sqp->sq_synch_cv);
		mutex_exit(&sqp->sq_lock);
		return;
	}

	ASSERT(sqp->sq_state & SQS_PROC);

	sqp->sq_state &= ~SQS_PROC;
	sqp->sq_run = NULL;
	connp->conn_on_sqp = B_FALSE;

	/* If the caller opted in, attempt to process the head squeue item. */
	if (flag == SQ_PROCESS && sqp->sq_first != NULL) {
		squeue_try_drain_one(sqp, connp);
	}

	/* Wake up the worker if further requests are pending. */
	if (sqp->sq_first != NULL) {
		squeue_worker_wakeup(sqp);
	}
	mutex_exit(&sqp->sq_lock);
}