xref: /illumos-gate/usr/src/uts/common/inet/squeue.c (revision 2576e7a56bb1b296053722f3ebc688cef754350f)
1 /*
2  * CDDL HEADER START
3  *
4  * The contents of this file are subject to the terms of the
5  * Common Development and Distribution License (the "License").
6  * You may not use this file except in compliance with the License.
7  *
8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9  * or http://www.opensolaris.org/os/licensing.
10  * See the License for the specific language governing permissions
11  * and limitations under the License.
12  *
13  * When distributing Covered Code, include this CDDL HEADER in each
14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15  * If applicable, add the following below this CDDL HEADER, with the
16  * fields enclosed by brackets "[]" replaced with your own identifying
17  * information: Portions Copyright [yyyy] [name of copyright owner]
18  *
19  * CDDL HEADER END
20  */
21 /*
22  * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
23  */
24 
25 /*
26  * Copyright 2017 Joyent, Inc.
27  * Copyright 2026 Oxide Computer Company
28  */
29 
30 /*
31  * Squeues: General purpose serialization mechanism
32  * ------------------------------------------------
33  *
34  * Background:
35  * -----------
36  *
37  * This is a general purpose high-performance serialization mechanism
38  * currently used by TCP/IP. It is implement by means of a per CPU queue,
39  * a worker thread and a polling thread with are bound to the CPU
40  * associated with the squeue. The squeue is strictly FIFO for both read
41  * and write side and only one thread can process it at any given time.
42  * The design goal of squeue was to offer a very high degree of
43  * parallelization (on a per H/W execution pipeline basis) with at
44  * most one queuing.
45  *
 * The modules needing protection typically call the SQUEUE_ENTER_ONE() or
 * SQUEUE_ENTER() macro as soon as a thread enters the module
48  * from either direction. For each packet, the processing function
49  * and argument is stored in the mblk itself. When the packet is ready
50  * to be processed, the squeue retrieves the stored function and calls
51  * it with the supplied argument and the pointer to the packet itself.
52  * The called function can assume that no other thread is processing
53  * the squeue when it is executing.
54  *
55  * Squeue/connection binding:
56  * --------------------------
57  *
58  * TCP/IP uses an IP classifier in conjunction with squeue where specific
59  * connections are assigned to specific squeue (based on various policies),
60  * at the connection creation time. Once assigned, the connection to
61  * squeue mapping is never changed and all future packets for that
62  * connection are processed on that squeue. The connection ("conn") to
63  * squeue mapping is stored in "conn_t" member "conn_sqp".
64  *
65  * Since the processing of the connection cuts across multiple layers
 * but still allows packets for different connections to be processed on
67  * other CPU/squeues, squeues are also termed as "Vertical Perimeter" or
68  * "Per Connection Vertical Perimeter".
69  *
70  * Processing Model:
71  * -----------------
72  *
 * Squeue doesn't necessarily process packets with its own worker thread.
74  * The callers can pick if they just want to queue the packet, process
75  * their packet if nothing is queued or drain and process. The first two
76  * modes are typically employed when the packet was generated while
77  * already doing the processing behind the squeue and last mode (drain
78  * and process) is typically employed when the thread is entering squeue
79  * for the first time. The squeue still imposes a finite time limit
 * for which an external thread can do processing after which it switches
81  * processing to its own worker thread.
82  *
83  * Once created, squeues are never deleted. Hence squeue pointers are
84  * always valid. This means that functions outside the squeue can still
 * refer safely to conn_sqp and there is no need for ref counts.
86  *
87  * Only a thread executing in the squeue can change the squeue of the
88  * connection. It does so by calling a squeue framework function to do this.
89  * After changing the squeue, the thread must leave the squeue. It must not
90  * continue to execute any code that needs squeue protection.
91  *
92  * The squeue framework, after entering the squeue, checks if the current
93  * squeue matches the conn_sqp. If the check fails, the packet is delivered
94  * to right squeue.
95  *
96  * Polling Model:
97  * --------------
98  *
99  * Squeues can control the rate of packet arrival into itself from the
100  * NIC or specific Rx ring within a NIC. As part of capability negotiation
101  * between IP and MAC layer, squeue are created for each TCP soft ring
102  * (or TCP Rx ring - to be implemented in future). As part of this
103  * negotiation, squeues get a cookie for underlying soft ring or Rx
104  * ring, a function to turn off incoming packets and a function to call
105  * to poll for packets. This helps schedule the receive side packet
106  * processing so that queue backlog doesn't build up and packet processing
107  * doesn't keep getting disturbed by high priority interrupts. As part
108  * of this mode, as soon as a backlog starts building, squeue turns off
109  * the interrupts and switches to poll mode. In poll mode, when poll
110  * thread goes down to retrieve packets, it retrieves them in the form of
111  * a chain which improves performance even more. As the squeue/softring
112  * system gets more packets, it gets more efficient by switching to
113  * polling more often and dealing with larger packet chains.
114  *
115  */
116 
117 #include <sys/types.h>
118 #include <sys/cmn_err.h>
119 #include <sys/debug.h>
120 #include <sys/kmem.h>
121 #include <sys/cpuvar.h>
122 #include <sys/condvar_impl.h>
123 #include <sys/systm.h>
124 #include <sys/callb.h>
125 #include <sys/sdt.h>
126 #include <sys/ddi.h>
127 #include <sys/sunddi.h>
128 #include <sys/stack.h>
129 #include <sys/archsystm.h>
130 
131 #include <inet/ipclassifier.h>
132 #include <inet/udp_impl.h>
133 
134 #include <sys/squeue_impl.h>
135 
136 static void squeue_drain(squeue_t *, uint_t, hrtime_t);
137 static void squeue_worker(squeue_t *sqp);
138 static void squeue_polling_thread(squeue_t *sqp);
139 static void squeue_worker_wakeup(squeue_t *sqp);
140 static void squeue_try_drain_one(squeue_t *, conn_t *);
141 
142 kmem_cache_t *squeue_cache;
143 
144 #define	SQUEUE_MSEC_TO_NSEC 1000000
145 
146 int squeue_drain_ms = 20;
147 
148 /* The values above converted to ticks or nano seconds */
149 static uint_t squeue_drain_ns = 0;
150 
151 uintptr_t squeue_drain_stack_needed = 10240;
152 uint_t squeue_drain_stack_toodeep;
153 
154 /*
155  * The number of bytes the squeue is allowed to poll from the softring in a
156  * single read. The accounting is done on a per-mblk basis, so the squeue may
157  * poll one mblk/MTU worth of data over the limit.
158  */
159 size_t squeue_poll_budget_bytes = 150000;
160 
/*
 * Append the mblk chain mp..tail (cnt mblks) to the tail of the squeue's
 * pending list and account for it in sq_count. Caller holds sq_lock.
 */
#define	ENQUEUE_CHAIN(sqp, mp, tail, cnt) {			\
	ASSERT(MUTEX_HELD(&(sqp)->sq_lock));			\
								\
	if ((sqp)->sq_last != NULL)				\
		(sqp)->sq_last->b_next = (mp);			\
	else							\
		(sqp)->sq_first = (mp);				\
	(sqp)->sq_last = (tail);				\
	(sqp)->sq_count += (cnt);				\
	ASSERT((sqp)->sq_count > 0);				\
	DTRACE_PROBE4(squeue__enqueuechain, squeue_t *, sqp,	\
		mblk_t *, mp, mblk_t *, tail, int, cnt);	\
								\
}
175 
/*
 * Blank the receive ring (in this case it is the soft ring). When
 * blanked, the soft ring will not send any more packets up.
 * Blanking may not succeed when there is a CPU already in the soft
 * ring sending packets up. In that case, SQS_POLLING will not be
 * set.
 */
#define	SQS_POLLING_ON(sqp, sq_poll_capable, rx_ring) {		\
	ASSERT(MUTEX_HELD(&(sqp)->sq_lock));			\
	if (sq_poll_capable) {					\
		ASSERT(rx_ring != NULL);			\
		ASSERT(sqp->sq_state & SQS_POLL_CAPAB);		\
		if (!(sqp->sq_state & SQS_POLLING)) {		\
			if (rx_ring->rr_intr_disable(rx_ring->rr_intr_handle)) \
				sqp->sq_state |= SQS_POLLING;	\
		}						\
	}							\
}

/*
 * Undo SQS_POLLING_ON(): re-enable the ring's interrupts and leave
 * poll mode, but only if we had successfully entered it (SQS_POLLING
 * set). Caller holds sq_lock.
 */
#define	SQS_POLLING_OFF(sqp, sq_poll_capable, rx_ring) {	\
	ASSERT(MUTEX_HELD(&(sqp)->sq_lock));			\
	if (sq_poll_capable) {					\
		ASSERT(rx_ring != NULL);			\
		ASSERT(sqp->sq_state & SQS_POLL_CAPAB);		\
		if (sqp->sq_state & SQS_POLLING) {		\
			sqp->sq_state &= ~SQS_POLLING;		\
			rx_ring->rr_intr_enable(rx_ring->rr_intr_handle); \
		}						\
	}							\
}
206 
/*
 * Wakeup poll thread only if SQS_POLLING is set. SQS_GET_PKTS marks a
 * retrieval as already in flight, so the poll thread is signalled at
 * most once per retrieval. Caller holds sq_lock.
 */
#define	SQS_POLL_RING(sqp) {			\
	ASSERT(MUTEX_HELD(&(sqp)->sq_lock));			\
	if (sqp->sq_state & SQS_POLLING) {			\
		ASSERT(sqp->sq_state & SQS_POLL_CAPAB);		\
		if (!(sqp->sq_state & SQS_GET_PKTS)) {		\
			sqp->sq_state |= SQS_GET_PKTS;		\
			cv_signal(&sqp->sq_poll_cv);		\
		}						\
	}							\
}
218 
/*
 * Under DEBUG, record what the squeue is currently processing (mblk,
 * handler, conn and tag) so the state is visible from a debugger or
 * crash dump. On non-DEBUG builds these compile away to nothing.
 */
#ifdef DEBUG
#define	SQUEUE_DBG_SET(sqp, mp, proc, connp, tag) {		\
	(sqp)->sq_curmp = (mp);					\
	(sqp)->sq_curproc = (proc);				\
	(sqp)->sq_connp = (connp);				\
	(mp)->b_tag = (sqp)->sq_tag = (tag);			\
}

#define	SQUEUE_DBG_CLEAR(sqp)	{				\
	(sqp)->sq_curmp = NULL;					\
	(sqp)->sq_curproc = NULL;				\
	(sqp)->sq_connp = NULL;					\
}
#else
#define	SQUEUE_DBG_SET(sqp, mp, proc, connp, tag)
#define	SQUEUE_DBG_CLEAR(sqp)
#endif
236 
237 void
238 squeue_init(void)
239 {
240 	squeue_cache = kmem_cache_create("squeue_cache",
241 	    sizeof (squeue_t), 64, NULL, NULL, NULL, NULL, NULL, 0);
242 
243 	squeue_drain_ns = squeue_drain_ms * SQUEUE_MSEC_TO_NSEC;
244 }
245 
246 squeue_t *
247 squeue_create(pri_t pri)
248 {
249 	squeue_t *sqp = kmem_cache_alloc(squeue_cache, KM_SLEEP);
250 
251 	bzero(sqp, sizeof (squeue_t));
252 	sqp->sq_bind = PBIND_NONE;
253 	sqp->sq_priority = pri;
254 	sqp->sq_worker = thread_create(NULL, 0, squeue_worker,
255 	    sqp, 0, &p0, TS_RUN, pri);
256 
257 	sqp->sq_poll_thr = thread_create(NULL, 0, squeue_polling_thread,
258 	    sqp, 0, &p0, TS_RUN, pri);
259 
260 	sqp->sq_enter = squeue_enter;
261 	sqp->sq_drain = squeue_drain;
262 
263 	return (sqp);
264 }
265 
266 /*
267  * Bind squeue worker thread to the specified CPU, given by CPU id.
268  * If the CPU id  value is -1, bind the worker thread to the value
269  * specified in sq_bind field. If a thread is already bound to a
270  * different CPU, unbind it from the old CPU and bind to the new one.
271  */
272 
273 void
274 squeue_bind(squeue_t *sqp, processorid_t bind)
275 {
276 	mutex_enter(&sqp->sq_lock);
277 	ASSERT(sqp->sq_bind != PBIND_NONE || bind != PBIND_NONE);
278 	ASSERT(MUTEX_HELD(&cpu_lock));
279 
280 	if (sqp->sq_state & SQS_BOUND) {
281 		if (sqp->sq_bind == bind) {
282 			mutex_exit(&sqp->sq_lock);
283 			return;
284 		}
285 		thread_affinity_clear(sqp->sq_worker);
286 	} else {
287 		sqp->sq_state |= SQS_BOUND;
288 	}
289 
290 	if (bind != PBIND_NONE)
291 		sqp->sq_bind = bind;
292 
293 	thread_affinity_set(sqp->sq_worker, sqp->sq_bind);
294 	mutex_exit(&sqp->sq_lock);
295 }
296 
297 void
298 squeue_unbind(squeue_t *sqp)
299 {
300 	mutex_enter(&sqp->sq_lock);
301 	if (!(sqp->sq_state & SQS_BOUND)) {
302 		mutex_exit(&sqp->sq_lock);
303 		return;
304 	}
305 
306 	sqp->sq_state &= ~SQS_BOUND;
307 	thread_affinity_clear(sqp->sq_worker);
308 	mutex_exit(&sqp->sq_lock);
309 }
310 
311 /*
312  * squeue_enter() - enter squeue sqp with mblk mp (which can be
313  * a chain), while tail points to the end and cnt in number of
314  * mblks in the chain.
315  *
316  * For a chain of single packet (i.e. mp == tail), go through the
317  * fast path if no one is processing the squeue and nothing is queued.
318  *
319  * The proc and arg for each mblk is already stored in the mblk in
320  * appropriate places.
321  *
322  * The process_flag specifies if we are allowed to process the mblk
323  * and drain in the entering thread context. If process_flag is
324  * SQ_FILL, then we just queue the mblk and return (after signaling
325  * the worker thread if no one else is processing the squeue).
326  *
327  * The ira argument can be used when the count is one.
328  * For a chain the caller needs to prepend any needed mblks from
329  * ip_recv_attr_to_mblk().
330  */
/* ARGSUSED */
void
squeue_enter(squeue_t *sqp, mblk_t *mp, mblk_t *tail, uint32_t cnt,
    ip_recv_attr_t *ira, int process_flag, uint8_t tag)
{
	conn_t		*connp;
	sqproc_t	proc;
	hrtime_t	now;

	ASSERT(sqp != NULL);
	ASSERT(mp != NULL);
	ASSERT(tail != NULL);
	ASSERT(cnt > 0);
	ASSERT(MUTEX_NOT_HELD(&sqp->sq_lock));
	/* An ip_recv_attr_t may only accompany a single-packet chain. */
	ASSERT(ira == NULL || cnt == 1);

	mutex_enter(&sqp->sq_lock);

	/*
	 * Try to process the packet if SQ_FILL flag is not set and
	 * we are allowed to process the squeue. The SQ_NODRAIN is
	 * ignored if the packet chain consists of more than 1 packet.
	 */
	if (!(sqp->sq_state & SQS_PROC) && ((process_flag == SQ_PROCESS) ||
	    (process_flag == SQ_NODRAIN && sqp->sq_first == NULL))) {
		/*
		 * See if anything is already queued. If we are the
		 * first packet, do inline processing else queue the
		 * packet and do the drain.
		 */
		if (sqp->sq_first == NULL && cnt == 1) {
			/*
			 * Fast-path, ok to process and nothing queued.
			 */
			sqp->sq_state |= (SQS_PROC|SQS_FAST);
			sqp->sq_run = curthread;
			mutex_exit(&sqp->sq_lock);

			/*
			 * We are the chain of 1 packet so
			 * go through this fast path.
			 */
			ASSERT(mp->b_prev != NULL);
			ASSERT(mp->b_queue != NULL);
			connp = (conn_t *)mp->b_prev;
			mp->b_prev = NULL;
			proc = (sqproc_t)mp->b_queue;
			mp->b_queue = NULL;
			ASSERT(proc != NULL && connp != NULL);
			ASSERT(mp->b_next == NULL);

			/*
			 * Handle squeue switching. More details in the
			 * block comment at the top of the file
			 */
			if (connp->conn_sqp == sqp) {
				SQUEUE_DBG_SET(sqp, mp, proc, connp,
				    tag);
				connp->conn_on_sqp = B_TRUE;
				DTRACE_PROBE3(squeue__proc__start, squeue_t *,
				    sqp, mblk_t *, mp, conn_t *, connp);
				(*proc)(connp, mp, sqp, ira);
				DTRACE_PROBE2(squeue__proc__end, squeue_t *,
				    sqp, conn_t *, connp);
				connp->conn_on_sqp = B_FALSE;
				SQUEUE_DBG_CLEAR(sqp);
				CONN_DEC_REF(connp);
			} else {
				SQUEUE_ENTER_ONE(connp->conn_sqp, mp, proc,
				    connp, ira, SQ_FILL, SQTAG_SQUEUE_CHANGE);
			}
			ASSERT(MUTEX_NOT_HELD(&sqp->sq_lock));
			mutex_enter(&sqp->sq_lock);
			sqp->sq_state &= ~(SQS_PROC|SQS_FAST);
			sqp->sq_run = NULL;
			if (sqp->sq_first == NULL ||
			    process_flag == SQ_NODRAIN) {
				/*
				 * Even if SQ_NODRAIN was specified, it may
				 * still be best to process a single queued
				 * item if it matches the active connection.
				 *
				 * NOTE(review): connp's reference was dropped
				 * via CONN_DEC_REF above; presumably
				 * squeue_try_drain_one() uses connp only to
				 * compare against queued packets -- confirm
				 * it never dereferences it without taking a
				 * new reference.
				 */
				if (sqp->sq_first != NULL) {
					squeue_try_drain_one(sqp, connp);
				}

				/*
				 * If work or control actions are pending, wake
				 * up the worker thread.
				 */
				if (sqp->sq_first != NULL ||
				    sqp->sq_state & SQS_WORKER_THR_CONTROL) {
					squeue_worker_wakeup(sqp);
				}
				mutex_exit(&sqp->sq_lock);
				return;
			}
		} else {
			if (ira != NULL) {
				mblk_t	*attrmp;

				ASSERT(cnt == 1);
				/*
				 * Convert the receive attributes to an mblk
				 * prepended to the packet so they survive
				 * queueing.
				 */
				attrmp = ip_recv_attr_to_mblk(ira);
				if (attrmp == NULL) {
					mutex_exit(&sqp->sq_lock);
					ip_drop_input("squeue: "
					    "ip_recv_attr_to_mblk",
					    mp, NULL);
					/* Caller already set b_prev/b_next */
					mp->b_prev = mp->b_next = NULL;
					freemsg(mp);
					return;
				}
				ASSERT(attrmp->b_cont == NULL);
				attrmp->b_cont = mp;
				/* Move connp and func to new */
				attrmp->b_queue = mp->b_queue;
				mp->b_queue = NULL;
				attrmp->b_prev = mp->b_prev;
				mp->b_prev = NULL;

				ASSERT(mp == tail);
				tail = mp = attrmp;
			}

			ENQUEUE_CHAIN(sqp, mp, tail, cnt);
#ifdef DEBUG
			mp->b_tag = tag;
#endif
		}
		/*
		 * We are here because either we couldn't do inline
		 * processing (because something was already queued),
		 * or we had a chain of more than one packet,
		 * or something else arrived after we were done with
		 * inline processing.
		 */
		ASSERT(MUTEX_HELD(&sqp->sq_lock));
		ASSERT(sqp->sq_first != NULL);
		now = gethrtime();
		sqp->sq_run = curthread;
		sqp->sq_drain(sqp, SQS_ENTER, now + squeue_drain_ns);

		/*
		 * If we didn't do a complete drain, the worker
		 * thread was already signalled by squeue_drain.
		 * In case any control actions are pending, wake
		 * up the worker.
		 */
		sqp->sq_run = NULL;
		if (sqp->sq_state & SQS_WORKER_THR_CONTROL) {
			squeue_worker_wakeup(sqp);
		}
	} else {
		/*
		 * We let a thread processing a squeue reenter only
		 * once. This helps the case of incoming connection
		 * where a SYN-ACK-ACK that triggers the conn_ind
		 * doesn't have to queue the packet if listener and
		 * eager are on the same squeue. Also helps the
		 * loopback connection where the two ends are bound
		 * to the same squeue (which is typical on single
		 * CPU machines).
		 *
		 * We let the thread reenter only once for the fear
		 * of stack getting blown with multiple traversal.
		 */
		connp = (conn_t *)mp->b_prev;
		if (!(sqp->sq_state & SQS_REENTER) &&
		    (process_flag != SQ_FILL) && (sqp->sq_first == NULL) &&
		    (sqp->sq_run == curthread) && (cnt == 1) &&
		    (connp->conn_on_sqp == B_FALSE)) {
			sqp->sq_state |= SQS_REENTER;
			mutex_exit(&sqp->sq_lock);

			ASSERT(mp->b_prev != NULL);
			ASSERT(mp->b_queue != NULL);

			mp->b_prev = NULL;
			proc = (sqproc_t)mp->b_queue;
			mp->b_queue = NULL;

			/*
			 * Handle squeue switching. More details in the
			 * block comment at the top of the file
			 */
			if (connp->conn_sqp == sqp) {
				connp->conn_on_sqp = B_TRUE;
				DTRACE_PROBE3(squeue__proc__start, squeue_t *,
				    sqp, mblk_t *, mp, conn_t *, connp);
				(*proc)(connp, mp, sqp, ira);
				DTRACE_PROBE2(squeue__proc__end, squeue_t *,
				    sqp, conn_t *, connp);
				connp->conn_on_sqp = B_FALSE;
				CONN_DEC_REF(connp);
			} else {
				SQUEUE_ENTER_ONE(connp->conn_sqp, mp, proc,
				    connp, ira, SQ_FILL, SQTAG_SQUEUE_CHANGE);
			}

			mutex_enter(&sqp->sq_lock);
			sqp->sq_state &= ~SQS_REENTER;
			mutex_exit(&sqp->sq_lock);
			return;
		}

		/*
		 * Queue is already being processed or there is already
		 * one or more packets on the queue. Enqueue the
		 * packet and wakeup the squeue worker thread if the
		 * squeue is not being processed.
		 */
#ifdef DEBUG
		mp->b_tag = tag;
#endif
		if (ira != NULL) {
			mblk_t	*attrmp;

			ASSERT(cnt == 1);
			attrmp = ip_recv_attr_to_mblk(ira);
			if (attrmp == NULL) {
				mutex_exit(&sqp->sq_lock);
				ip_drop_input("squeue: ip_recv_attr_to_mblk",
				    mp, NULL);
				/* Caller already set b_prev/b_next */
				mp->b_prev = mp->b_next = NULL;
				freemsg(mp);
				return;
			}
			ASSERT(attrmp->b_cont == NULL);
			attrmp->b_cont = mp;
			/* Move connp and func to new */
			attrmp->b_queue = mp->b_queue;
			mp->b_queue = NULL;
			attrmp->b_prev = mp->b_prev;
			mp->b_prev = NULL;

			ASSERT(mp == tail);
			tail = mp = attrmp;
		}
		ENQUEUE_CHAIN(sqp, mp, tail, cnt);
		/*
		 * If the worker isn't running or control actions are pending,
		 * wake it up now.
		 */
		if ((sqp->sq_state & SQS_PROC) == 0 ||
		    (sqp->sq_state & SQS_WORKER_THR_CONTROL) != 0) {
			squeue_worker_wakeup(sqp);
		}
	}
	mutex_exit(&sqp->sq_lock);
}
583 
584 /*
585  * PRIVATE FUNCTIONS
586  */
587 
588 
589 /*
590  * Wake up worker thread for squeue to process queued work.
591  */
592 static void
593 squeue_worker_wakeup(squeue_t *sqp)
594 {
595 	ASSERT(MUTEX_HELD(&(sqp)->sq_lock));
596 
597 	cv_signal(&sqp->sq_worker_cv);
598 	sqp->sq_awoken = gethrtime();
599 }
600 
601 static void
602 squeue_drain(squeue_t *sqp, uint_t proc_type, hrtime_t expire)
603 {
604 	mblk_t		*mp;
605 	mblk_t		*head;
606 	sqproc_t	proc;
607 	conn_t		*connp;
608 	ill_rx_ring_t	*sq_rx_ring = sqp->sq_rx_ring;
609 	hrtime_t	now;
610 	boolean_t	sq_poll_capable;
611 	ip_recv_attr_t	*ira, iras;
612 
613 	/*
614 	 * Before doing any work, check our stack depth; if we're not a
615 	 * worker thread for this squeue and we're beginning to get tight on
616 	 * on stack, kick the worker, bump a counter and return.
617 	 */
618 	if (proc_type != SQS_WORKER && STACK_BIAS + (uintptr_t)getfp() -
619 	    (uintptr_t)curthread->t_stkbase < squeue_drain_stack_needed) {
620 		ASSERT(mutex_owned(&sqp->sq_lock));
621 		squeue_worker_wakeup(sqp);
622 		squeue_drain_stack_toodeep++;
623 		return;
624 	}
625 
626 	sq_poll_capable = (sqp->sq_state & SQS_POLL_CAPAB) != 0;
627 again:
628 	ASSERT(mutex_owned(&sqp->sq_lock));
629 	ASSERT(!(sqp->sq_state & (SQS_POLL_THR_QUIESCED |
630 	    SQS_POLL_QUIESCE_DONE)));
631 
632 	head = sqp->sq_first;
633 	sqp->sq_first = NULL;
634 	sqp->sq_last = NULL;
635 	sqp->sq_count = 0;
636 
637 	sqp->sq_state |= SQS_PROC | proc_type;
638 
639 	/*
640 	 * We have backlog built up. Switch to polling mode if the
641 	 * device underneath allows it. Need to do it so that
642 	 * more packets don't come in and disturb us (by contending
643 	 * for sq_lock or higher priority thread preempting us).
644 	 *
645 	 * The worker thread is allowed to do active polling while we
646 	 * just disable the interrupts for drain by non worker (kernel
647 	 * or userland) threads so they can peacefully process the
648 	 * packets during time allocated to them.
649 	 */
650 	SQS_POLLING_ON(sqp, sq_poll_capable, sq_rx_ring);
651 	mutex_exit(&sqp->sq_lock);
652 
653 	while ((mp = head) != NULL) {
654 
655 		head = mp->b_next;
656 		mp->b_next = NULL;
657 
658 		proc = (sqproc_t)mp->b_queue;
659 		mp->b_queue = NULL;
660 		connp = (conn_t *)mp->b_prev;
661 		mp->b_prev = NULL;
662 
663 		/* Is there an ip_recv_attr_t to handle? */
664 		if (ip_recv_attr_is_mblk(mp)) {
665 			mblk_t	*attrmp = mp;
666 
667 			ASSERT(attrmp->b_cont != NULL);
668 
669 			mp = attrmp->b_cont;
670 			attrmp->b_cont = NULL;
671 			ASSERT(mp->b_queue == NULL);
672 			ASSERT(mp->b_prev == NULL);
673 
674 			if (!ip_recv_attr_from_mblk(attrmp, &iras)) {
675 				/* The ill or ip_stack_t disappeared on us */
676 				ip_drop_input("ip_recv_attr_from_mblk",
677 				    mp, NULL);
678 				ira_cleanup(&iras, B_TRUE);
679 				CONN_DEC_REF(connp);
680 				continue;
681 			}
682 			ira = &iras;
683 		} else {
684 			ira = NULL;
685 		}
686 
687 
688 		/*
689 		 * Handle squeue switching. More details in the
690 		 * block comment at the top of the file
691 		 */
692 		if (connp->conn_sqp == sqp) {
693 			SQUEUE_DBG_SET(sqp, mp, proc, connp,
694 			    mp->b_tag);
695 			connp->conn_on_sqp = B_TRUE;
696 			DTRACE_PROBE3(squeue__proc__start, squeue_t *,
697 			    sqp, mblk_t *, mp, conn_t *, connp);
698 			(*proc)(connp, mp, sqp, ira);
699 			DTRACE_PROBE2(squeue__proc__end, squeue_t *,
700 			    sqp, conn_t *, connp);
701 			connp->conn_on_sqp = B_FALSE;
702 			CONN_DEC_REF(connp);
703 		} else {
704 			SQUEUE_ENTER_ONE(connp->conn_sqp, mp, proc, connp, ira,
705 			    SQ_FILL, SQTAG_SQUEUE_CHANGE);
706 		}
707 		if (ira != NULL)
708 			ira_cleanup(ira, B_TRUE);
709 	}
710 
711 	SQUEUE_DBG_CLEAR(sqp);
712 
713 	mutex_enter(&sqp->sq_lock);
714 
715 	/*
716 	 * Check if there is still work to do (either more arrived or timer
717 	 * expired). If we are the worker thread and we are polling capable,
718 	 * continue doing the work since no one else is around to do the
719 	 * work anyway (but signal the poll thread to retrieve some packets
720 	 * in the meanwhile). If we are not the worker thread, just
721 	 * signal the worker thread to take up the work if processing time
722 	 * has expired.
723 	 */
724 	if (sqp->sq_first != NULL) {
725 		/*
726 		 * Still more to process. If time quanta not expired, we
727 		 * should let the drain go on. The worker thread is allowed
728 		 * to drain as long as there is anything left.
729 		 */
730 		now = gethrtime();
731 		if ((now < expire) || (proc_type == SQS_WORKER)) {
732 			/*
733 			 * If time not expired or we are worker thread and
734 			 * this squeue is polling capable, continue to do
735 			 * the drain.
736 			 *
737 			 * We turn off interrupts for all userland threads
738 			 * doing drain but we do active polling only for
739 			 * worker thread.
740 			 *
741 			 * Calling SQS_POLL_RING() even in the case of
742 			 * SQS_POLLING_ON() not succeeding is ok as
743 			 * SQS_POLL_RING() will not wake up poll thread
744 			 * if SQS_POLLING bit is not set.
745 			 */
746 			if (proc_type == SQS_WORKER)
747 				SQS_POLL_RING(sqp);
748 			goto again;
749 		}
750 
751 		squeue_worker_wakeup(sqp);
752 	}
753 
754 	/*
755 	 * If the poll thread is already running, just return. The
756 	 * poll thread continues to hold the proc and will finish
757 	 * processing.
758 	 */
759 	if (sqp->sq_state & SQS_GET_PKTS) {
760 		ASSERT(!(sqp->sq_state & (SQS_POLL_THR_QUIESCED |
761 		    SQS_POLL_QUIESCE_DONE)));
762 		sqp->sq_state &= ~proc_type;
763 		return;
764 	}
765 
766 	/*
767 	 *
768 	 * If we are the worker thread and no work is left, send the poll
769 	 * thread down once more to see if something arrived. Otherwise,
770 	 * turn the interrupts back on and we are done.
771 	 */
772 	if ((proc_type == SQS_WORKER) && (sqp->sq_state & SQS_POLLING)) {
773 		/*
774 		 * Do one last check to see if anything arrived
775 		 * in the NIC. We leave the SQS_PROC set to ensure
776 		 * that poll thread keeps the PROC and can decide
777 		 * if it needs to turn polling off or continue
778 		 * processing.
779 		 *
780 		 * If we drop the SQS_PROC here and poll thread comes
781 		 * up empty handed, it can not safely turn polling off
782 		 * since someone else could have acquired the PROC
783 		 * and started draining. The previously running poll
784 		 * thread and the current thread doing drain would end
785 		 * up in a race for turning polling on/off and more
786 		 * complex code would be required to deal with it.
787 		 *
788 		 * Its lot simpler for drain to hand the SQS_PROC to
789 		 * poll thread (if running) and let poll thread finish
790 		 * without worrying about racing with any other thread.
791 		 */
792 		ASSERT(!(sqp->sq_state & (SQS_POLL_THR_QUIESCED |
793 		    SQS_POLL_QUIESCE_DONE)));
794 		SQS_POLL_RING(sqp);
795 		sqp->sq_state &= ~proc_type;
796 	} else {
797 		/*
798 		 * The squeue is either not capable of polling or the
799 		 * attempt to blank (i.e., turn SQS_POLLING_ON()) was
800 		 * unsuccessful or poll thread already finished
801 		 * processing and didn't find anything. Since there
802 		 * is nothing queued and we already turn polling on
803 		 * (for all threads doing drain), we should turn
804 		 * polling off and relinquish the PROC.
805 		 */
806 		ASSERT(!(sqp->sq_state & (SQS_POLL_THR_QUIESCED |
807 		    SQS_POLL_QUIESCE_DONE)));
808 		SQS_POLLING_OFF(sqp, sq_poll_capable, sq_rx_ring);
809 		sqp->sq_state &= ~(SQS_PROC | proc_type);
810 		/*
811 		 * If we are not the worker and there is a pending quiesce
812 		 * event, wake up the worker
813 		 */
814 		if ((proc_type != SQS_WORKER) &&
815 		    (sqp->sq_state & SQS_WORKER_THR_CONTROL)) {
816 			squeue_worker_wakeup(sqp);
817 		}
818 	}
819 }
820 
821 /*
822  * Quiesce, Restart, or Cleanup of the squeue poll thread.
823  *
 * Quiesce and Restart: After a squeue poll thread has been quiesced, it does
825  * not attempt to poll the underlying soft ring any more. The quiesce is
826  * triggered by the mac layer when it wants to quiesce a soft ring. Typically
827  * control operations such as changing the fanout of a NIC or VNIC (dladm
828  * setlinkprop) need to quiesce data flow before changing the wiring.
829  * The operation is done by the mac layer, but it calls back into IP to
830  * quiesce the soft ring. After completing the operation (say increase or
831  * decrease of the fanout) the mac layer then calls back into IP to restart
832  * the quiesced soft ring.
833  *
834  * Cleanup: This is triggered when the squeue binding to a soft ring is
835  * removed permanently. Typically interface plumb and unplumb would trigger
836  * this. It can also be triggered from the mac layer when a soft ring is
837  * being deleted say as the result of a fanout reduction. Since squeues are
838  * never deleted, the cleanup marks the squeue as fit for recycling and
839  * moves it to the zeroth squeue set.
840  */
/*
 * Act on a pending poll-thread control request (restart or quiesce).
 * Called by the poll thread with sq_lock held; each path signals the
 * worker so it can observe the state change.
 */
static void
squeue_poll_thr_control(squeue_t *sqp)
{
	if (sqp->sq_state & SQS_POLL_THR_RESTART) {
		/* Restart implies a previous quiesce */
		ASSERT(sqp->sq_state & SQS_POLL_THR_QUIESCED);
		sqp->sq_state &= ~(SQS_POLL_THR_QUIESCED |
		    SQS_POLL_THR_RESTART);
		/* Polling may resume now that the soft ring is back. */
		sqp->sq_state |= SQS_POLL_CAPAB;
		cv_signal(&sqp->sq_worker_cv);
		return;
	}

	if (sqp->sq_state & SQS_POLL_THR_QUIESCE) {
		sqp->sq_state |= SQS_POLL_THR_QUIESCED;
		sqp->sq_state &= ~SQS_POLL_THR_QUIESCE;
		cv_signal(&sqp->sq_worker_cv);
		return;
	}
}
861 
862 /*
863  * POLLING Notes
864  *
865  * With polling mode, we want to do as much processing as we possibly can
866  * in worker thread context. The sweet spot is worker thread keeps doing
867  * work all the time in polling mode and writers etc. keep dumping packets
868  * to worker thread. Occassionally, we send the poll thread (running at
869  * lower priority to NIC to get the chain of packets to feed to worker).
870  * Sending the poll thread down to NIC is dependant on 3 criterions
871  *
872  * 1) Its always driven from squeue_drain and only if worker thread is
873  *	doing the drain.
874  * 2) We clear the backlog once and more packets arrived in between.
875  *	Before starting drain again, send the poll thread down if
876  *	the drain is being done by worker thread.
877  * 3) Before exiting the squeue_drain, if the poll thread is not already
878  *	working and we are the worker thread, try to poll one more time.
879  *
880  * For latency sake, we do allow any thread calling squeue_enter
881  * to process its packet provided:
882  *
883  * 1) Nothing is queued
884  * 2) If more packets arrived in between, the non worker thread are allowed
885  *	to do the drain till their time quanta expired provided SQS_GET_PKTS
886  *	wasn't set in between.
887  *
888  * Avoiding deadlocks with interrupts
889  * ==================================
890  *
 * One of the big problems is that we can't send poll_thr down while holding
892  * the sq_lock since the thread can block. So we drop the sq_lock before
893  * calling sq_get_pkts(). We keep holding the SQS_PROC as long as the
894  * poll thread is running so that no other thread can acquire the
895  * perimeter in between. If the squeue_drain gets done (no more work
896  * left), it leaves the SQS_PROC set if poll thread is running.
897  */
898 
899 /*
900  * This is the squeue poll thread. In poll mode, it polls the underlying
901  * TCP softring and feeds packets into the squeue. The worker thread then
902  * drains the squeue. The poll thread also responds to control signals for
 * quiescing, restarting, or cleanup of an squeue. These are driven by
904  * control operations like plumb/unplumb or as a result of dynamic Rx ring
905  * related operations that are driven from the mac layer.
906  */
static void
squeue_polling_thread(squeue_t *sqp)
{
	kmutex_t *lock = &sqp->sq_lock;
	kcondvar_t *async = &sqp->sq_poll_cv;
	ip_mac_rx_t sq_get_pkts;
	ip_accept_t ip_accept;
	ill_rx_ring_t *sq_rx_ring;
	ill_t *sq_ill;
	mblk_t *head, *tail, *mp;
	uint_t cnt;
	void *sq_mac_handle;
	callb_cpr_t cprinfo;
	size_t bytes_to_pickup;
	uint32_t ctl_state;

	CALLB_CPR_INIT(&cprinfo, lock, callb_generic_cpr, "sq_poll");
	mutex_enter(lock);

	for (;;) {
		/* Sleep until signalled via sq_poll_cv to poll or quiesce. */
		CALLB_CPR_SAFE_BEGIN(&cprinfo);
		cv_wait(async, lock);
		CALLB_CPR_SAFE_END(&cprinfo, lock);

		/* Control requests take precedence over polling. */
		ctl_state = sqp->sq_state & (SQS_POLL_THR_CONTROL |
		    SQS_POLL_THR_QUIESCED);
		if (ctl_state != 0) {
			/*
			 * If the squeue is quiesced, then wait for a control
			 * request. A quiesced squeue must not poll the
			 * underlying soft ring.
			 */
			if (ctl_state == SQS_POLL_THR_QUIESCED)
				continue;
			/*
			 * Act on control requests to quiesce, cleanup or
			 * restart an squeue
			 */
			squeue_poll_thr_control(sqp);
			continue;
		}

		/* Ignore the wakeup if polling capability was turned off. */
		if (!(sqp->sq_state & SQS_POLL_CAPAB))
			continue;

		ASSERT((sqp->sq_state &
		    (SQS_PROC|SQS_POLLING|SQS_GET_PKTS)) ==
		    (SQS_PROC|SQS_POLLING|SQS_GET_PKTS));

poll_again:
		/*
		 * Snapshot the ring and its entry points while holding
		 * sq_lock; the lock must be dropped across the call into
		 * the mac layer since sq_get_pkts() may block.
		 */
		sq_rx_ring = sqp->sq_rx_ring;
		sq_get_pkts = sq_rx_ring->rr_rx;
		sq_mac_handle = sq_rx_ring->rr_rx_handle;
		ip_accept = sq_rx_ring->rr_ip_accept;
		sq_ill = sq_rx_ring->rr_ill;
		bytes_to_pickup = squeue_poll_budget_bytes;
		mutex_exit(lock);
		head = sq_get_pkts(sq_mac_handle, bytes_to_pickup);
		mp = NULL;
		if (head != NULL) {
			/*
			 * We got the packet chain from the mac layer. It
			 * would be nice to be able to process it inline
			 * for better performance but we need to give
			 * IP a chance to look at this chain to ensure
			 * that packets are really meant for this squeue
			 * and do the IP processing.
			 */
			mp = ip_accept(sq_ill, sq_rx_ring, sqp, head,
			    &tail, &cnt);
		}
		mutex_enter(lock);
		if (mp != NULL) {
			/*
			 * The ip_accept function has already added an
			 * ip_recv_attr_t mblk if that is needed.
			 */
			ENQUEUE_CHAIN(sqp, mp, tail, cnt);
		}
		/* SQS_PROC/SQS_POLLING/SQS_GET_PKTS were held across poll. */
		ASSERT((sqp->sq_state &
		    (SQS_PROC|SQS_POLLING|SQS_GET_PKTS)) ==
		    (SQS_PROC|SQS_POLLING|SQS_GET_PKTS));

		if (sqp->sq_first != NULL && !(sqp->sq_state & SQS_WORKER)) {
			/*
			 * We have packets to process and worker thread
			 * is not running.  Check to see if poll thread is
			 * allowed to process. Let it do processing only if it
			 * picked up some packets from the NIC otherwise
			 * wakeup the worker thread.
			 */
			if (mp != NULL) {
				hrtime_t  now;

				now = gethrtime();
				sqp->sq_run = curthread;
				sqp->sq_drain(sqp, SQS_POLL_PROC, now +
				    squeue_drain_ns);
				sqp->sq_run = NULL;

				/* Fully drained: go get more packets. */
				if (sqp->sq_first == NULL)
					goto poll_again;

				/*
				 * Couldn't do the entire drain because the
				 * time limit expired, let the
				 * worker thread take over.
				 */
			}

			/*
			 * Put the SQS_PROC_HELD on so the worker
			 * thread can distinguish where its called from. We
			 * can remove the SQS_PROC flag here and turn off the
			 * polling so that it wouldn't matter who gets the
			 * processing but we get better performance this way
			 * and save the cost of turn polling off and possibly
			 * on again as soon as we start draining again.
			 *
			 * We can't remove the SQS_PROC flag without turning
			 * polling off until we can guarantee that control
			 * will return to squeue_drain immediately.
			 */
			sqp->sq_state |= SQS_PROC_HELD;
			sqp->sq_state &= ~SQS_GET_PKTS;
			squeue_worker_wakeup(sqp);
		} else if (sqp->sq_first == NULL &&
		    !(sqp->sq_state & SQS_WORKER)) {
			/*
			 * Nothing queued and worker thread not running.
			 * Since we hold the proc, no other thread is
			 * processing the squeue. This means that there
			 * is no work to be done and nothing is queued
			 * in squeue or in NIC. Turn polling off and go
			 * back to interrupt mode.
			 */
			sqp->sq_state &= ~(SQS_PROC|SQS_GET_PKTS);
			/* LINTED: constant in conditional context */
			SQS_POLLING_OFF(sqp, B_TRUE, sq_rx_ring);

			/*
			 * If there is a pending control operation
			 * wake up the worker, since it is currently
			 * not running.
			 */
			if (sqp->sq_state & SQS_WORKER_THR_CONTROL) {
				squeue_worker_wakeup(sqp);
			}
		} else {
			/*
			 * Worker thread is already running. We don't need
			 * to do anything. Indicate that poll thread is done.
			 */
			sqp->sq_state &= ~SQS_GET_PKTS;
		}
		if (sqp->sq_state & SQS_POLL_THR_CONTROL) {
			/*
			 * Act on control requests to quiesce, cleanup or
			 * restart an squeue
			 */
			squeue_poll_thr_control(sqp);
		}
	}
}
1071 
1072 /*
1073  * The squeue worker thread acts on any control requests to quiesce, cleanup
1074  * or restart an ill_rx_ring_t by calling this function. The worker thread
1075  * synchronizes with the squeue poll thread to complete the request and finally
1076  * wakes up the requestor when the request is completed.
1077  */
static void
squeue_worker_thr_control(squeue_t *sqp)
{
	ill_t	*ill;
	ill_rx_ring_t	*rx_ring;

	ASSERT(MUTEX_HELD(&sqp->sq_lock));

	if (sqp->sq_state & SQS_POLL_RESTART) {
		/* Restart implies a previous quiesce. */
		ASSERT((sqp->sq_state & (SQS_PROC_HELD |
		    SQS_POLL_QUIESCE_DONE | SQS_PROC | SQS_WORKER)) ==
		    (SQS_POLL_QUIESCE_DONE | SQS_PROC | SQS_WORKER));
		/*
		 * Request the squeue poll thread to restart and wait till
		 * it actually restarts.
		 */
		sqp->sq_state &= ~SQS_POLL_QUIESCE_DONE;
		sqp->sq_state |= SQS_POLL_THR_RESTART;
		cv_signal(&sqp->sq_poll_cv);
		while (sqp->sq_state & SQS_POLL_THR_QUIESCED)
			cv_wait(&sqp->sq_worker_cv, &sqp->sq_lock);
		sqp->sq_state &= ~(SQS_POLL_RESTART | SQS_PROC |
		    SQS_WORKER);
		/*
		 * Signal any waiter that is waiting for the restart
		 * to complete
		 */
		sqp->sq_state |= SQS_POLL_RESTART_DONE;
		cv_signal(&sqp->sq_ctrlop_done_cv);
		return;
	}

	if (sqp->sq_state & SQS_PROC_HELD) {
		/* The squeue poll thread handed control to us */
		ASSERT(sqp->sq_state & SQS_PROC);
	}

	/*
	 * Prevent any other thread from processing the squeue
	 * until we finish the control actions by setting SQS_PROC.
	 * But allow ourself to reenter by setting SQS_WORKER
	 */
	sqp->sq_state |= (SQS_PROC | SQS_WORKER);

	/* Signal the squeue poll thread and wait for it to quiesce itself */
	if (!(sqp->sq_state & SQS_POLL_THR_QUIESCED)) {
		sqp->sq_state |= SQS_POLL_THR_QUIESCE;
		cv_signal(&sqp->sq_poll_cv);
		while (!(sqp->sq_state & SQS_POLL_THR_QUIESCED))
			cv_wait(&sqp->sq_worker_cv, &sqp->sq_lock);
	}

	rx_ring = sqp->sq_rx_ring;
	ill = rx_ring->rr_ill;
	/*
	 * The lock hierarchy is as follows.
	 * cpu_lock -> ill_lock -> sqset_lock -> sq_lock
	 * Drop sq_lock so ill_lock can be taken in the right order.
	 */
	mutex_exit(&sqp->sq_lock);
	mutex_enter(&ill->ill_lock);
	mutex_enter(&sqp->sq_lock);

	/* Turn polling off and drop all poll-related state bits. */
	SQS_POLLING_OFF(sqp, (sqp->sq_state & SQS_POLL_CAPAB) != 0,
	    sqp->sq_rx_ring);
	sqp->sq_state &= ~(SQS_POLL_CAPAB | SQS_GET_PKTS | SQS_PROC_HELD);
	if (sqp->sq_state & SQS_POLL_CLEANUP) {
		/*
		 * Disassociate this squeue from its ill_rx_ring_t.
		 * The rr_sqp, sq_rx_ring fields are protected by the
		 * corresponding squeue, ill_lock* and sq_lock. Holding any
		 * of them will ensure that the ring to squeue mapping does
		 * not change.
		 */
		ASSERT(!(sqp->sq_state & SQS_DEFAULT));

		sqp->sq_rx_ring = NULL;
		rx_ring->rr_sqp = NULL;

		sqp->sq_state &= ~(SQS_POLL_CLEANUP | SQS_POLL_THR_QUIESCED |
		    SQS_POLL_QUIESCE_DONE);
		sqp->sq_ill = NULL;

		rx_ring->rr_rx_handle = NULL;
		rx_ring->rr_intr_handle = NULL;
		rx_ring->rr_intr_enable = NULL;
		rx_ring->rr_intr_disable = NULL;
		sqp->sq_state |= SQS_POLL_CLEANUP_DONE;
	} else {
		/* Quiesce (not cleanup): mark the quiesce as completed. */
		sqp->sq_state &= ~SQS_POLL_QUIESCE;
		sqp->sq_state |= SQS_POLL_QUIESCE_DONE;
	}
	/*
	 * Signal any waiter that is waiting for the quiesce or cleanup
	 * to complete and also wait for it to actually see and reset the
	 * SQS_POLL_CLEANUP_DONE.
	 */
	cv_signal(&sqp->sq_ctrlop_done_cv);
	mutex_exit(&ill->ill_lock);
	if (sqp->sq_state & SQS_POLL_CLEANUP_DONE) {
		cv_wait(&sqp->sq_worker_cv, &sqp->sq_lock);
		sqp->sq_state &= ~(SQS_PROC | SQS_WORKER);
	}
}
1182 
/*
 * The squeue worker thread. It sleeps on sq_worker_cv until the squeue
 * has messages to drain (or a control request or a poll-thread handoff
 * is pending) and then drains the squeue, bounded by squeue_drain_ns
 * per pass.
 */
static void
squeue_worker(squeue_t *sqp)
{
	kmutex_t *lock = &sqp->sq_lock;
	kcondvar_t *async = &sqp->sq_worker_cv;
	callb_cpr_t cprinfo;
	hrtime_t now;

	CALLB_CPR_INIT(&cprinfo, lock, callb_generic_cpr, "sq_worker");
	mutex_enter(lock);

	for (;;) {
		/* Inner loop: wait until there is something for us to do. */
		for (;;) {
			/*
			 * If the poll thread has handed control to us
			 * we need to break out of the wait.
			 */
			if (sqp->sq_state & SQS_PROC_HELD)
				break;

			/*
			 * If the squeue is not being processed and we either
			 * have messages to drain or some thread has signaled
			 * some control activity we need to break
			 */
			if (!(sqp->sq_state & SQS_PROC) &&
			    ((sqp->sq_state & SQS_WORKER_THR_CONTROL) ||
			    (sqp->sq_first != NULL)))
				break;

			/*
			 * If we have started some control action, then check
			 * for the SQS_WORKER flag (since we don't
			 * release the squeue) to make sure we own the squeue
			 * and break out
			 */
			if ((sqp->sq_state & SQS_WORKER_THR_CONTROL) &&
			    (sqp->sq_state & SQS_WORKER))
				break;

			CALLB_CPR_SAFE_BEGIN(&cprinfo);
			cv_wait(async, lock);
			CALLB_CPR_SAFE_END(&cprinfo, lock);
		}
		/* Control requests are handled before draining. */
		if (sqp->sq_state & SQS_WORKER_THR_CONTROL) {
			squeue_worker_thr_control(sqp);
			continue;
		}
		ASSERT(!(sqp->sq_state & (SQS_POLL_THR_QUIESCED |
		    SQS_POLL_CLEANUP_DONE | SQS_POLL_QUIESCE_DONE |
		    SQS_WORKER_THR_CONTROL | SQS_POLL_THR_CONTROL)));

		/* Accept a handoff from the poll thread, if one is pending. */
		if (sqp->sq_state & SQS_PROC_HELD)
			sqp->sq_state &= ~SQS_PROC_HELD;

		now = gethrtime();
		sqp->sq_run = curthread;
		sqp->sq_drain(sqp, SQS_WORKER, now +  squeue_drain_ns);
		sqp->sq_run = NULL;
	}
}
1244 
1245 uintptr_t *
1246 squeue_getprivate(squeue_t *sqp, sqprivate_t p)
1247 {
1248 	ASSERT(p < SQPRIVATE_MAX);
1249 
1250 	return (&sqp->sq_private[p]);
1251 }
1252 
/* ARGSUSED */
/*
 * Squeue callback queued by squeue_synch_enter(). It pauses squeue
 * processing (SQS_PAUSE), wakes the thread blocked in
 * squeue_synch_enter() by clearing MSGWAITSYNC, and then waits until
 * that thread lifts the pause via squeue_synch_exit().
 */
void
squeue_wakeup_conn(void *arg, mblk_t *mp, void *arg2, ip_recv_attr_t *dummy)
{
	conn_t *connp = (conn_t *)arg;
	squeue_t *sqp = connp->conn_sqp;

	/*
	 * Mark the squeue as paused before waking up the thread stuck
	 * in squeue_synch_enter().
	 */
	mutex_enter(&sqp->sq_lock);
	sqp->sq_state |= SQS_PAUSE;

	/*
	 * Notify the thread that it's OK to proceed; that is done by
	 * clearing the MSGWAITSYNC flag. The synch thread will free the mblk.
	 */
	ASSERT(mp->b_flag & MSGWAITSYNC);
	mp->b_flag &= ~MSGWAITSYNC;
	cv_broadcast(&connp->conn_sq_cv);

	/*
	 * We are doing something on behalf of another thread, so we have to
	 * pause and wait until it finishes.
	 */
	while (sqp->sq_state & SQS_PAUSE) {
		cv_wait(&sqp->sq_synch_cv, &sqp->sq_lock);
	}
	mutex_exit(&sqp->sq_lock);
}
1284 
/*
 * Acquire synchronous (exclusive) access to connp's squeue. If the
 * squeue is idle and empty we take ownership directly; otherwise we
 * enqueue a marker mblk flagged MSGWAITSYNC and block until
 * squeue_wakeup_conn() runs it and pauses the squeue for us.
 * Returns 0 on success, or ENOMEM if no marker mblk could be allocated.
 */
int
squeue_synch_enter(conn_t *connp, mblk_t *use_mp)
{
	squeue_t *sqp;

again:
	sqp = connp->conn_sqp;

	mutex_enter(&sqp->sq_lock);
	if (sqp->sq_first == NULL && !(sqp->sq_state & SQS_PROC)) {
		/*
		 * We are OK to proceed if the squeue is empty, and
		 * no one owns the squeue.
		 *
		 * The caller won't own the squeue as this is called from the
		 * application.
		 */
		ASSERT(sqp->sq_run == NULL);

		sqp->sq_state |= SQS_PROC;
		sqp->sq_run = curthread;
		mutex_exit(&sqp->sq_lock);

		/*
		 * Handle squeue switching. The conn's squeue can only change
		 * while there is a thread in the squeue, which is why we do
		 * the check after entering the squeue. If it has changed, exit
		 * this squeue and redo everything with the new squeue.
		 */
		if (sqp != connp->conn_sqp) {
			mutex_enter(&sqp->sq_lock);
			sqp->sq_state &= ~SQS_PROC;
			sqp->sq_run = NULL;
			mutex_exit(&sqp->sq_lock);
			goto again;
		}
#if SQUEUE_DEBUG
		sqp->sq_curmp = NULL;
		sqp->sq_curproc = NULL;
		sqp->sq_connp = connp;
#endif
		connp->conn_on_sqp = B_TRUE;
		return (0);
	} else {
		mblk_t  *mp;

		mp = (use_mp == NULL) ? allocb(0, BPRI_MED) : use_mp;
		if (mp == NULL) {
			mutex_exit(&sqp->sq_lock);
			return (ENOMEM);
		}

		/*
		 * We mark the mblk as awaiting synchronous squeue access
		 * by setting the MSGWAITSYNC flag. Once squeue_wakeup_conn
		 * fires, MSGWAITSYNC is cleared, at which point we know we
		 * have exclusive access.
		 */
		mp->b_flag |= MSGWAITSYNC;

		CONN_INC_REF(connp);
		SET_SQUEUE(mp, squeue_wakeup_conn, connp);
		ENQUEUE_CHAIN(sqp, mp, mp, 1);

		ASSERT(sqp->sq_run != curthread);

		/* Wait until the enqueued mblk get processed. */
		while (mp->b_flag & MSGWAITSYNC)
			cv_wait(&connp->conn_sq_cv, &sqp->sq_lock);
		mutex_exit(&sqp->sq_lock);

		/* The marker was ours; free it unless the caller supplied it. */
		if (use_mp == NULL)
			freeb(mp);

		return (0);
	}
}
1362 
1363 /*
1364  * If possible, attempt to immediately process a single queued request, should
1365  * it match the supplied conn_t reference.  This is primarily intended to elide
1366  * squeue worker thread wake-ups during local TCP connect() or close()
1367  * operations where the response is placed on the squeue during processing.
1368  */
1369 static void
1370 squeue_try_drain_one(squeue_t *sqp, conn_t *compare_conn)
1371 {
1372 	mblk_t *next, *mp = sqp->sq_first;
1373 	conn_t *connp;
1374 	sqproc_t proc = (sqproc_t)mp->b_queue;
1375 	ip_recv_attr_t iras, *ira = NULL;
1376 
1377 	ASSERT(MUTEX_HELD(&sqp->sq_lock));
1378 	ASSERT((sqp->sq_state & SQS_PROC) == 0);
1379 	ASSERT(sqp->sq_run == NULL);
1380 	VERIFY(mp != NULL);
1381 
1382 	/*
1383 	 * There is no guarantee that compare_conn references a valid object at
1384 	 * this time, so under no circumstance may it be deferenced unless it
1385 	 * matches the squeue entry.
1386 	 */
1387 	connp = (conn_t *)mp->b_prev;
1388 	if (connp != compare_conn) {
1389 		return;
1390 	}
1391 
1392 	next = mp->b_next;
1393 	proc = (sqproc_t)mp->b_queue;
1394 
1395 	ASSERT(proc != NULL);
1396 	ASSERT(sqp->sq_count > 0);
1397 
1398 	/* Dequeue item from squeue */
1399 	if (next == NULL) {
1400 		sqp->sq_first = NULL;
1401 		sqp->sq_last = NULL;
1402 	} else {
1403 		sqp->sq_first = next;
1404 	}
1405 	sqp->sq_count--;
1406 
1407 	sqp->sq_state |= SQS_PROC;
1408 	sqp->sq_run = curthread;
1409 	mutex_exit(&sqp->sq_lock);
1410 
1411 	/* Prep mblk_t and retrieve ira if needed */
1412 	mp->b_prev = NULL;
1413 	mp->b_queue = NULL;
1414 	mp->b_next = NULL;
1415 	if (ip_recv_attr_is_mblk(mp)) {
1416 		mblk_t	*attrmp = mp;
1417 
1418 		ASSERT(attrmp->b_cont != NULL);
1419 
1420 		mp = attrmp->b_cont;
1421 		attrmp->b_cont = NULL;
1422 
1423 		ASSERT(mp->b_queue == NULL);
1424 		ASSERT(mp->b_prev == NULL);
1425 
1426 		if (!ip_recv_attr_from_mblk(attrmp, &iras)) {
1427 			/* ill_t or ip_stack_t disappeared */
1428 			ip_drop_input("ip_recv_attr_from_mblk", mp, NULL);
1429 			ira_cleanup(&iras, B_TRUE);
1430 			CONN_DEC_REF(connp);
1431 			goto done;
1432 		}
1433 		ira = &iras;
1434 	}
1435 
1436 	SQUEUE_DBG_SET(sqp, mp, proc, connp, mp->b_tag);
1437 	connp->conn_on_sqp = B_TRUE;
1438 	DTRACE_PROBE3(squeue__proc__start, squeue_t *, sqp, mblk_t *, mp,
1439 	    conn_t *, connp);
1440 	(*proc)(connp, mp, sqp, ira);
1441 	DTRACE_PROBE2(squeue__proc__end, squeue_t *, sqp, conn_t *, connp);
1442 	connp->conn_on_sqp = B_FALSE;
1443 	CONN_DEC_REF(connp);
1444 	SQUEUE_DBG_CLEAR(sqp);
1445 
1446 	if (ira != NULL)
1447 		ira_cleanup(ira, B_TRUE);
1448 
1449 done:
1450 	mutex_enter(&sqp->sq_lock);
1451 	sqp->sq_state &= ~(SQS_PROC);
1452 	sqp->sq_run = NULL;
1453 }
1454 
/*
 * Release the synchronous squeue access obtained via squeue_synch_enter().
 * 'flag' is SQ_NODRAIN or SQ_PROCESS; with SQ_PROCESS we additionally try
 * to process the queued request at the head of the squeue inline if it
 * belongs to 'connp' (see squeue_try_drain_one()).
 */
void
squeue_synch_exit(conn_t *connp, int flag)
{
	squeue_t *sqp = connp->conn_sqp;

	ASSERT(flag == SQ_NODRAIN || flag == SQ_PROCESS);

	mutex_enter(&sqp->sq_lock);
	if (sqp->sq_run != curthread) {
		/*
		 * The caller doesn't own the squeue, clear the SQS_PAUSE flag,
		 * and wake up the squeue owner, such that owner can continue
		 * processing.
		 */
		ASSERT(sqp->sq_state & SQS_PAUSE);
		sqp->sq_state &= ~SQS_PAUSE;

		/* There should be only one thread blocking on sq_synch_cv. */
		cv_signal(&sqp->sq_synch_cv);
		mutex_exit(&sqp->sq_lock);
		return;
	}

	/* We own the squeue; drop ownership. */
	ASSERT(sqp->sq_state & SQS_PROC);

	sqp->sq_state &= ~SQS_PROC;
	sqp->sq_run = NULL;
	connp->conn_on_sqp = B_FALSE;

	/* If the caller opted in, attempt to process the head squeue item. */
	if (flag == SQ_PROCESS && sqp->sq_first != NULL) {
		squeue_try_drain_one(sqp, connp);
	}

	/* Wake up the worker if further requests are pending. */
	if (sqp->sq_first != NULL) {
		squeue_worker_wakeup(sqp);
	}
	mutex_exit(&sqp->sq_lock);
}
1495