xref: /illumos-gate/usr/src/uts/common/inet/squeue.c (revision 9dd2e6b590a600c51a70c5d4c872d4cdeedc9aab)
1 /*
2  * CDDL HEADER START
3  *
4  * The contents of this file are subject to the terms of the
5  * Common Development and Distribution License (the "License").
6  * You may not use this file except in compliance with the License.
7  *
8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9  * or http://www.opensolaris.org/os/licensing.
10  * See the License for the specific language governing permissions
11  * and limitations under the License.
12  *
13  * When distributing Covered Code, include this CDDL HEADER in each
14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15  * If applicable, add the following below this CDDL HEADER, with the
16  * fields enclosed by brackets "[]" replaced with your own identifying
17  * information: Portions Copyright [yyyy] [name of copyright owner]
18  *
19  * CDDL HEADER END
20  */
21 /*
22  * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
23  */
24 
25 /*
26  * Copyright 2017 Joyent, Inc.
27  */
28 
29 /*
30  * Squeues: General purpose serialization mechanism
31  * ------------------------------------------------
32  *
33  * Background:
34  * -----------
35  *
36  * This is a general purpose high-performance serialization mechanism
37  * currently used by TCP/IP. It is implemented by means of a per CPU queue,
38  * a worker thread and a polling thread which are bound to the CPU
39  * associated with the squeue. The squeue is strictly FIFO for both read
40  * and write side and only one thread can process it at any given time.
41  * The design goal of squeue was to offer a very high degree of
42  * parallelization (on a per H/W execution pipeline basis) with at
43  * most one queuing.
44  *
45  * The modules needing protection typically call SQUEUE_ENTER_ONE() or
46  * SQUEUE_ENTER() macro as soon as a thread enters the module
47  * from either direction. For each packet, the processing function
48  * and argument is stored in the mblk itself. When the packet is ready
49  * to be processed, the squeue retrieves the stored function and calls
50  * it with the supplied argument and the pointer to the packet itself.
51  * The called function can assume that no other thread is processing
52  * the squeue when it is executing.
53  *
54  * Squeue/connection binding:
55  * --------------------------
56  *
57  * TCP/IP uses an IP classifier in conjunction with squeue where specific
58  * connections are assigned to specific squeue (based on various policies),
59  * at the connection creation time. Once assigned, the connection to
60  * squeue mapping is never changed and all future packets for that
61  * connection are processed on that squeue. The connection ("conn") to
62  * squeue mapping is stored in "conn_t" member "conn_sqp".
63  *
64  * Since the processing of the connection cuts across multiple layers
65  * but still allows packets for different connections to be processed on
66  * other CPU/squeues, squeues are also termed as "Vertical Perimeter" or
67  * "Per Connection Vertical Perimeter".
68  *
69  * Processing Model:
70  * -----------------
71  *
72  * Squeue doesn't necessarily process packets with its own worker thread.
73  * The callers can pick if they just want to queue the packet, process
74  * their packet if nothing is queued or drain and process. The first two
75  * modes are typically employed when the packet was generated while
76  * already doing the processing behind the squeue and last mode (drain
77  * and process) is typically employed when the thread is entering squeue
78  * for the first time. The squeue still imposes a finite time limit
79  * for which an external thread can do processing after which it switches
80  * processing to its own worker thread.
81  *
82  * Once created, squeues are never deleted. Hence squeue pointers are
83  * always valid. This means that functions outside the squeue can still
84  * refer safely to conn_sqp and there is no need for ref counts.
85  *
86  * Only a thread executing in the squeue can change the squeue of the
87  * connection. It does so by calling a squeue framework function to do this.
88  * After changing the squeue, the thread must leave the squeue. It must not
89  * continue to execute any code that needs squeue protection.
90  *
91  * The squeue framework, after entering the squeue, checks if the current
92  * squeue matches the conn_sqp. If the check fails, the packet is delivered
93  * to right squeue.
94  *
95  * Polling Model:
96  * --------------
97  *
98  * Squeues can control the rate of packet arrival into itself from the
99  * NIC or specific Rx ring within a NIC. As part of capability negotiation
100  * between IP and MAC layer, squeue are created for each TCP soft ring
101  * (or TCP Rx ring - to be implemented in future). As part of this
102  * negotiation, squeues get a cookie for underlying soft ring or Rx
103  * ring, a function to turn off incoming packets and a function to call
104  * to poll for packets. This helps schedule the receive side packet
105  * processing so that queue backlog doesn't build up and packet processing
106  * doesn't keep getting disturbed by high priority interrupts. As part
107  * of this mode, as soon as a backlog starts building, squeue turns off
108  * the interrupts and switches to poll mode. In poll mode, when poll
109  * thread goes down to retrieve packets, it retrieves them in the form of
110  * a chain which improves performance even more. As the squeue/softring
111  * system gets more packets, it gets more efficient by switching to
112  * polling more often and dealing with larger packet chains.
113  *
114  */
115 
116 #include <sys/types.h>
117 #include <sys/cmn_err.h>
118 #include <sys/debug.h>
119 #include <sys/kmem.h>
120 #include <sys/cpuvar.h>
121 #include <sys/condvar_impl.h>
122 #include <sys/systm.h>
123 #include <sys/callb.h>
124 #include <sys/sdt.h>
125 #include <sys/ddi.h>
126 #include <sys/sunddi.h>
127 #include <sys/stack.h>
128 #include <sys/archsystm.h>
129 
130 #include <inet/ipclassifier.h>
131 #include <inet/udp_impl.h>
132 
133 #include <sys/squeue_impl.h>
134 
/*
 * Forward declarations for the file-private squeue routines.
 */
135 static void squeue_drain(squeue_t *, uint_t, hrtime_t);
136 static void squeue_worker(squeue_t *sqp);
137 static void squeue_polling_thread(squeue_t *sqp);
138 static void squeue_worker_wakeup(squeue_t *sqp);
139 static void squeue_try_drain_one(squeue_t *, conn_t *);
140 
/* kmem cache that squeue_create() allocates every squeue_t from. */
141 kmem_cache_t *squeue_cache;
142 
/* Number of nanoseconds in one millisecond. */
143 #define	SQUEUE_MSEC_TO_NSEC 1000000
144 
/*
 * Drain time budget, in milliseconds.  squeue_init() converts it to
 * nanoseconds, and squeue_enter() uses that value to compute the deadline
 * it hands to the drain routine.
 */
145 int squeue_drain_ms = 20;
146 
147 /* squeue_drain_ms converted to nanoseconds; computed in squeue_init() */
148 static uint_t squeue_drain_ns = 0;
149 
/*
 * squeue_drain() refuses to drain in a non-worker thread when fewer than
 * squeue_drain_stack_needed bytes of stack remain; each such refusal bumps
 * squeue_drain_stack_toodeep.
 */
150 uintptr_t squeue_drain_stack_needed = 10240;
151 uint_t squeue_drain_stack_toodeep;
152 
/* Byte count the poll thread passes to the ring's rr_rx() on each poll. */
153 #define	MAX_BYTES_TO_PICKUP	150000
154 
/*
 * ENQUEUE_CHAIN: append the mblk chain that starts at mp and ends at tail
 * (cnt entries in total) to the squeue's pending list and add cnt to
 * sq_count.  The caller must hold sq_lock.
 */
#define	ENQUEUE_CHAIN(sqp, mp, tail, cnt) {			\
	ASSERT(MUTEX_HELD(&(sqp)->sq_lock));			\
								\
	if ((sqp)->sq_last == NULL)				\
		(sqp)->sq_first = (mp);				\
	else							\
		(sqp)->sq_last->b_next = (mp);			\
	(sqp)->sq_last = (tail);				\
	(sqp)->sq_count += (cnt);				\
	ASSERT((sqp)->sq_count > 0);				\
	DTRACE_PROBE4(squeue__enqueuechain, squeue_t *, sqp,	\
		mblk_t *, mp, mblk_t *, tail, int, cnt);	\
}
172 
/*
 * SQS_POLLING_ON: blank the receive ring (in this case it is the soft
 * ring).  When blanked, the soft ring will not send any more packets up.
 * Blanking may not succeed when there is a CPU already in the soft ring
 * sending packets up; in that case rr_intr_disable() returns zero and
 * SQS_POLLING is left clear.
 *
 * Does nothing unless sq_poll_capable is true.  The caller must hold
 * sq_lock.  Every use of a macro parameter is parenthesized so that an
 * arbitrary expression may be passed for sqp or rx_ring.
 */
#define	SQS_POLLING_ON(sqp, sq_poll_capable, rx_ring) {		\
	ASSERT(MUTEX_HELD(&(sqp)->sq_lock));			\
	if (sq_poll_capable) {					\
		ASSERT((rx_ring) != NULL);			\
		ASSERT((sqp)->sq_state & SQS_POLL_CAPAB);	\
		if (!((sqp)->sq_state & SQS_POLLING)) {		\
			if ((rx_ring)->rr_intr_disable(		\
			    (rx_ring)->rr_intr_handle))		\
				(sqp)->sq_state |= SQS_POLLING;	\
		}						\
	}							\
}
191 
/*
 * SQS_POLLING_OFF: if the squeue is currently in polling mode, clear
 * SQS_POLLING and call the ring's rr_intr_enable() routine.
 *
 * Does nothing unless sq_poll_capable is true.  The caller must hold
 * sq_lock.  Every use of a macro parameter is parenthesized so that an
 * arbitrary expression may be passed for sqp or rx_ring.
 */
#define	SQS_POLLING_OFF(sqp, sq_poll_capable, rx_ring) {	\
	ASSERT(MUTEX_HELD(&(sqp)->sq_lock));			\
	if (sq_poll_capable) {					\
		ASSERT((rx_ring) != NULL);			\
		ASSERT((sqp)->sq_state & SQS_POLL_CAPAB);	\
		if ((sqp)->sq_state & SQS_POLLING) {		\
			(sqp)->sq_state &= ~SQS_POLLING;	\
			(rx_ring)->rr_intr_enable(		\
			    (rx_ring)->rr_intr_handle);		\
		}						\
	}							\
}
203 
/*
 * SQS_POLL_RING: wake up the poll thread, but only if SQS_POLLING is set
 * and the poll thread has not already been asked to fetch packets
 * (SQS_GET_PKTS clear).  Sets SQS_GET_PKTS before signalling sq_poll_cv.
 *
 * The caller must hold sq_lock.  Every use of the macro parameter is
 * parenthesized so that an arbitrary expression may be passed for sqp.
 */
#define	SQS_POLL_RING(sqp) {					\
	ASSERT(MUTEX_HELD(&(sqp)->sq_lock));			\
	if ((sqp)->sq_state & SQS_POLLING) {			\
		ASSERT((sqp)->sq_state & SQS_POLL_CAPAB);	\
		if (!((sqp)->sq_state & SQS_GET_PKTS)) {	\
			(sqp)->sq_state |= SQS_GET_PKTS;	\
			cv_signal(&(sqp)->sq_poll_cv);		\
		}						\
	}							\
}
215 
/*
 * Debug-only bookkeeping.  On DEBUG builds SQUEUE_DBG_SET() records in the
 * squeue which mblk, processing function, connection and tag are currently
 * being handled (and stamps the tag into the mblk), and SQUEUE_DBG_CLEAR()
 * resets the recorded mblk, function and connection.  On non-DEBUG builds
 * both macros expand to nothing.
 */
#ifndef DEBUG
#define	SQUEUE_DBG_SET(sqp, mp, proc, connp, tag)
#define	SQUEUE_DBG_CLEAR(sqp)
#else
#define	SQUEUE_DBG_SET(sqp, mp, proc, connp, tag) {		\
	(mp)->b_tag = (sqp)->sq_tag = (tag);			\
	(sqp)->sq_connp = (connp);				\
	(sqp)->sq_curproc = (proc);				\
	(sqp)->sq_curmp = (mp);					\
}

#define	SQUEUE_DBG_CLEAR(sqp)	{				\
	(sqp)->sq_connp = NULL;					\
	(sqp)->sq_curproc = NULL;				\
	(sqp)->sq_curmp = NULL;					\
}
#endif
233 
234 void
235 squeue_init(void)
236 {
237 	squeue_cache = kmem_cache_create("squeue_cache",
238 	    sizeof (squeue_t), 64, NULL, NULL, NULL, NULL, NULL, 0);
239 
240 	squeue_drain_ns = squeue_drain_ms * SQUEUE_MSEC_TO_NSEC;
241 }
242 
243 squeue_t *
244 squeue_create(pri_t pri)
245 {
246 	squeue_t *sqp = kmem_cache_alloc(squeue_cache, KM_SLEEP);
247 
248 	bzero(sqp, sizeof (squeue_t));
249 	sqp->sq_bind = PBIND_NONE;
250 	sqp->sq_priority = pri;
251 	sqp->sq_worker = thread_create(NULL, 0, squeue_worker,
252 	    sqp, 0, &p0, TS_RUN, pri);
253 
254 	sqp->sq_poll_thr = thread_create(NULL, 0, squeue_polling_thread,
255 	    sqp, 0, &p0, TS_RUN, pri);
256 
257 	sqp->sq_enter = squeue_enter;
258 	sqp->sq_drain = squeue_drain;
259 
260 	return (sqp);
261 }
262 
263 /*
264  * Bind squeue worker thread to the specified CPU, given by CPU id.
265  * If the CPU id  value is -1, bind the worker thread to the value
266  * specified in sq_bind field. If a thread is already bound to a
267  * different CPU, unbind it from the old CPU and bind to the new one.
268  */
269 
270 void
271 squeue_bind(squeue_t *sqp, processorid_t bind)
272 {
273 	mutex_enter(&sqp->sq_lock);
274 	ASSERT(sqp->sq_bind != PBIND_NONE || bind != PBIND_NONE);
275 	ASSERT(MUTEX_HELD(&cpu_lock));
276 
277 	if (sqp->sq_state & SQS_BOUND) {
278 		if (sqp->sq_bind == bind) {
279 			mutex_exit(&sqp->sq_lock);
280 			return;
281 		}
282 		thread_affinity_clear(sqp->sq_worker);
283 	} else {
284 		sqp->sq_state |= SQS_BOUND;
285 	}
286 
287 	if (bind != PBIND_NONE)
288 		sqp->sq_bind = bind;
289 
290 	thread_affinity_set(sqp->sq_worker, sqp->sq_bind);
291 	mutex_exit(&sqp->sq_lock);
292 }
293 
294 void
295 squeue_unbind(squeue_t *sqp)
296 {
297 	mutex_enter(&sqp->sq_lock);
298 	if (!(sqp->sq_state & SQS_BOUND)) {
299 		mutex_exit(&sqp->sq_lock);
300 		return;
301 	}
302 
303 	sqp->sq_state &= ~SQS_BOUND;
304 	thread_affinity_clear(sqp->sq_worker);
305 	mutex_exit(&sqp->sq_lock);
306 }
307 
308 /*
309  * squeue_enter() - enter squeue sqp with mblk mp (which can be
310  * a chain), while tail points to the end and cnt is the number of
311  * mblks in the chain.
312  *
313  * For a chain of single packet (i.e. mp == tail), go through the
314  * fast path if no one is processing the squeue and nothing is queued.
315  *
316  * The proc and arg for each mblk are already stored in the mblk:
317  * the processing function in b_queue and the conn_t in b_prev.
318  *
319  * The process_flag specifies if we are allowed to process the mblk
320  * and drain in the entering thread context. If process_flag is
321  * SQ_FILL, then we just queue the mblk and return (after signaling
322  * the worker thread if no one else is processing the squeue).
323  *
324  * The ira argument can be used when the count is one.
325  * For a chain the caller needs to prepend any needed mblks from
326  * ip_recv_attr_to_mblk().
 *
 * Parameters:
 *	sqp		squeue to enter; sq_lock must not be held on entry.
 *	mp, tail	first and last mblk of the chain (linked via b_next).
 *	cnt		number of mblks in the chain, must be > 0.
 *	ira		receive attributes for a single packet, or NULL.
 *	process_flag	SQ_PROCESS, SQ_NODRAIN or SQ_FILL (see above).
 *	tag		caller tag; recorded in b_tag on DEBUG builds only.
 *
 * If the packet has to be queued and ira cannot be converted into an
 * mblk, the packet is dropped and freed.
327  */
328 /* ARGSUSED */
329 void
330 squeue_enter(squeue_t *sqp, mblk_t *mp, mblk_t *tail, uint32_t cnt,
331     ip_recv_attr_t *ira, int process_flag, uint8_t tag)
332 {
333 	conn_t		*connp;
334 	sqproc_t	proc;
335 	hrtime_t	now;
336 
337 	ASSERT(sqp != NULL);
338 	ASSERT(mp != NULL);
339 	ASSERT(tail != NULL);
340 	ASSERT(cnt > 0);
341 	ASSERT(MUTEX_NOT_HELD(&sqp->sq_lock));
342 	ASSERT(ira == NULL || cnt == 1);
343 
344 	mutex_enter(&sqp->sq_lock);
345 
346 	/*
347 	 * Try to process the packet if SQ_FILL flag is not set and
348 	 * we are allowed to process the squeue. The SQ_NODRAIN is
349 	 * ignored if the packet chain consists of more than 1 packet.
350 	 */
351 	if (!(sqp->sq_state & SQS_PROC) && ((process_flag == SQ_PROCESS) ||
352 	    (process_flag == SQ_NODRAIN && sqp->sq_first == NULL))) {
353 		/*
354 		 * See if anything is already queued. If we are the
355 		 * first packet, do inline processing else queue the
356 		 * packet and do the drain.
357 		 */
358 		if (sqp->sq_first == NULL && cnt == 1) {
359 			/*
360 			 * Fast-path, ok to process and nothing queued.
361 			 */
362 			sqp->sq_state |= (SQS_PROC|SQS_FAST);
363 			sqp->sq_run = curthread;
364 			mutex_exit(&sqp->sq_lock);
365 
366 			/*
367 			 * We are the chain of 1 packet so
368 			 * go through this fast path.
369 			 */
370 			ASSERT(mp->b_prev != NULL);
371 			ASSERT(mp->b_queue != NULL);
372 			connp = (conn_t *)mp->b_prev;
373 			mp->b_prev = NULL;
374 			proc = (sqproc_t)mp->b_queue;
375 			mp->b_queue = NULL;
376 			ASSERT(proc != NULL && connp != NULL);
377 			ASSERT(mp->b_next == NULL);
378 
379 			/*
380 			 * Handle squeue switching. More details in the
381 			 * block comment at the top of the file
382 			 */
383 			if (connp->conn_sqp == sqp) {
384 				SQUEUE_DBG_SET(sqp, mp, proc, connp,
385 				    tag);
386 				connp->conn_on_sqp = B_TRUE;
387 				DTRACE_PROBE3(squeue__proc__start, squeue_t *,
388 				    sqp, mblk_t *, mp, conn_t *, connp);
389 				(*proc)(connp, mp, sqp, ira);
390 				DTRACE_PROBE2(squeue__proc__end, squeue_t *,
391 				    sqp, conn_t *, connp);
392 				connp->conn_on_sqp = B_FALSE;
393 				SQUEUE_DBG_CLEAR(sqp);
394 				CONN_DEC_REF(connp);
395 			} else {
				/*
				 * The connection now belongs to another
				 * squeue; hand the packet over to it
				 * without processing it here.
				 */
396 				SQUEUE_ENTER_ONE(connp->conn_sqp, mp, proc,
397 				    connp, ira, SQ_FILL, SQTAG_SQUEUE_CHANGE);
398 			}
399 			ASSERT(MUTEX_NOT_HELD(&sqp->sq_lock));
400 			mutex_enter(&sqp->sq_lock);
401 			sqp->sq_state &= ~(SQS_PROC|SQS_FAST);
402 			sqp->sq_run = NULL;
403 			if (sqp->sq_first == NULL ||
404 			    process_flag == SQ_NODRAIN) {
405 				/*
406 				 * Even if SQ_NODRAIN was specified, it may
407 				 * still be best to process a single queued
408 				 * item if it matches the active connection.
409 				 */
410 				if (sqp->sq_first != NULL) {
411 					squeue_try_drain_one(sqp, connp);
412 				}
413 
414 				/*
415 				 * If work or control actions are pending, wake
416 				 * up the worker thread.
417 				 */
418 				if (sqp->sq_first != NULL ||
419 				    sqp->sq_state & SQS_WORKER_THR_CONTROL) {
420 					squeue_worker_wakeup(sqp);
421 				}
422 				mutex_exit(&sqp->sq_lock);
423 				return;
424 			}
425 		} else {
			/*
			 * The packet must be queued, so the receive
			 * attributes have to travel with it: convert ira
			 * into an mblk and chain the packet behind it.
			 */
426 			if (ira != NULL) {
427 				mblk_t	*attrmp;
428 
429 				ASSERT(cnt == 1);
430 				attrmp = ip_recv_attr_to_mblk(ira);
431 				if (attrmp == NULL) {
432 					mutex_exit(&sqp->sq_lock);
433 					ip_drop_input("squeue: "
434 					    "ip_recv_attr_to_mblk",
435 					    mp, NULL);
					/*
					 * NOTE(review): b_queue still holds
					 * proc here and no CONN_DEC_REF() is
					 * done on this path, unlike the
					 * failure path in squeue_drain() --
					 * confirm the conn reference is
					 * released elsewhere.
					 */
436 					/* Caller already set b_prev/b_next */
437 					mp->b_prev = mp->b_next = NULL;
438 					freemsg(mp);
439 					return;
440 				}
441 				ASSERT(attrmp->b_cont == NULL);
442 				attrmp->b_cont = mp;
443 				/* Move connp and func to new */
444 				attrmp->b_queue = mp->b_queue;
445 				mp->b_queue = NULL;
446 				attrmp->b_prev = mp->b_prev;
447 				mp->b_prev = NULL;
448 
449 				ASSERT(mp == tail);
450 				tail = mp = attrmp;
451 			}
452 
453 			ENQUEUE_CHAIN(sqp, mp, tail, cnt);
			/*
			 * NOTE(review): when ira != NULL, mp now points at
			 * attrmp, so the tag lands on the attribute mblk
			 * while squeue_drain() reads b_tag from the data
			 * mblk -- confirm this is intended.
			 */
454 #ifdef DEBUG
455 			mp->b_tag = tag;
456 #endif
457 		}
458 		/*
459 		 * We are here because either we couldn't do inline
460 		 * processing (because something was already queued),
461 		 * or we had a chain of more than one packet,
462 		 * or something else arrived after we were done with
463 		 * inline processing.
464 		 */
465 		ASSERT(MUTEX_HELD(&sqp->sq_lock));
466 		ASSERT(sqp->sq_first != NULL);
467 		now = gethrtime();
468 		sqp->sq_run = curthread;
469 		sqp->sq_drain(sqp, SQS_ENTER, now + squeue_drain_ns);
470 
471 		/*
472 		 * If we didn't do a complete drain, the worker
473 		 * thread was already signalled by squeue_drain.
474 		 * In case any control actions are pending, wake
475 		 * up the worker.
476 		 */
477 		sqp->sq_run = NULL;
478 		if (sqp->sq_state & SQS_WORKER_THR_CONTROL) {
479 			squeue_worker_wakeup(sqp);
480 		}
481 	} else {
482 		/*
483 		 * We let a thread processing a squeue reenter only
484 		 * once. This helps the case of incoming connection
485 		 * where a SYN-ACK-ACK that triggers the conn_ind
486 		 * doesn't have to queue the packet if listener and
487 		 * eager are on the same squeue. Also helps the
488 		 * loopback connection where the two ends are bound
489 		 * to the same squeue (which is typical on single
490 		 * CPU machines).
491 		 *
492 		 * We let the thread reenter only once for the fear
493 		 * of stack getting blown with multiple traversal.
494 		 */
495 		connp = (conn_t *)mp->b_prev;
496 		if (!(sqp->sq_state & SQS_REENTER) &&
497 		    (process_flag != SQ_FILL) && (sqp->sq_first == NULL) &&
498 		    (sqp->sq_run == curthread) && (cnt == 1) &&
499 		    (connp->conn_on_sqp == B_FALSE)) {
500 			sqp->sq_state |= SQS_REENTER;
501 			mutex_exit(&sqp->sq_lock);
502 
503 			ASSERT(mp->b_prev != NULL);
504 			ASSERT(mp->b_queue != NULL);
505 
506 			mp->b_prev = NULL;
507 			proc = (sqproc_t)mp->b_queue;
508 			mp->b_queue = NULL;
509 
510 			/*
511 			 * Handle squeue switching. More details in the
512 			 * block comment at the top of the file
513 			 */
514 			if (connp->conn_sqp == sqp) {
515 				connp->conn_on_sqp = B_TRUE;
516 				DTRACE_PROBE3(squeue__proc__start, squeue_t *,
517 				    sqp, mblk_t *, mp, conn_t *, connp);
518 				(*proc)(connp, mp, sqp, ira);
519 				DTRACE_PROBE2(squeue__proc__end, squeue_t *,
520 				    sqp, conn_t *, connp);
521 				connp->conn_on_sqp = B_FALSE;
522 				CONN_DEC_REF(connp);
523 			} else {
524 				SQUEUE_ENTER_ONE(connp->conn_sqp, mp, proc,
525 				    connp, ira, SQ_FILL, SQTAG_SQUEUE_CHANGE);
526 			}
527 
528 			mutex_enter(&sqp->sq_lock);
529 			sqp->sq_state &= ~SQS_REENTER;
530 			mutex_exit(&sqp->sq_lock);
531 			return;
532 		}
533 
534 		/*
535 		 * Queue is already being processed or there is already
536 		 * one or more packets on the queue. Enqueue the
537 		 * packet and wakeup the squeue worker thread if the
538 		 * squeue is not being processed.
539 		 */
540 #ifdef DEBUG
541 		mp->b_tag = tag;
542 #endif
543 		if (ira != NULL) {
544 			mblk_t	*attrmp;
545 
546 			ASSERT(cnt == 1);
547 			attrmp = ip_recv_attr_to_mblk(ira);
548 			if (attrmp == NULL) {
549 				mutex_exit(&sqp->sq_lock);
550 				ip_drop_input("squeue: ip_recv_attr_to_mblk",
551 				    mp, NULL);
				/*
				 * NOTE(review): same as the failure path
				 * above -- b_queue is left set and the conn
				 * reference is not dropped here; confirm.
				 */
552 				/* Caller already set b_prev/b_next */
553 				mp->b_prev = mp->b_next = NULL;
554 				freemsg(mp);
555 				return;
556 			}
557 			ASSERT(attrmp->b_cont == NULL);
558 			attrmp->b_cont = mp;
559 			/* Move connp and func to new */
560 			attrmp->b_queue = mp->b_queue;
561 			mp->b_queue = NULL;
562 			attrmp->b_prev = mp->b_prev;
563 			mp->b_prev = NULL;
564 
565 			ASSERT(mp == tail);
566 			tail = mp = attrmp;
567 		}
568 		ENQUEUE_CHAIN(sqp, mp, tail, cnt);
569 		/*
570 		 * If the worker isn't running or control actions are pending,
571 		 * wake it up now.
572 		 */
573 		if ((sqp->sq_state & SQS_PROC) == 0 ||
574 		    (sqp->sq_state & SQS_WORKER_THR_CONTROL) != 0) {
575 			squeue_worker_wakeup(sqp);
576 		}
577 	}
578 	mutex_exit(&sqp->sq_lock);
579 }
580 
581 /*
582  * PRIVATE FUNCTIONS
583  */
584 
585 
586 /*
587  * Wake up worker thread for squeue to process queued work.
588  */
589 static void
590 squeue_worker_wakeup(squeue_t *sqp)
591 {
592 	ASSERT(MUTEX_HELD(&(sqp)->sq_lock));
593 
594 	cv_signal(&sqp->sq_worker_cv);
595 	sqp->sq_awoken = gethrtime();
596 }
597 
/*
 * squeue_drain() - process everything queued on the squeue.
 *
 * Repeatedly detaches the pending mblk list and, with sq_lock dropped,
 * calls each packet's stored processing function (b_queue) with its
 * conn_t (b_prev).  Packets whose connection has moved to another squeue
 * are forwarded there with SQ_FILL instead.
 *
 *	sqp		squeue to drain.
 *	proc_type	identity of the draining thread (SQS_WORKER for the
 *			worker thread, SQS_ENTER from squeue_enter()); it is
 *			OR-ed into sq_state while draining.
 *	expire		hrtime deadline after which a non-worker thread
 *			stops draining and wakes the worker instead.
 *
 * Called with sq_lock held and returns with sq_lock held.
 */
598 static void
599 squeue_drain(squeue_t *sqp, uint_t proc_type, hrtime_t expire)
600 {
601 	mblk_t		*mp;
602 	mblk_t		*head;
603 	sqproc_t	proc;
604 	conn_t		*connp;
605 	ill_rx_ring_t	*sq_rx_ring = sqp->sq_rx_ring;
606 	hrtime_t	now;
607 	boolean_t	sq_poll_capable;
608 	ip_recv_attr_t	*ira, iras;
609 
610 	/*
611 	 * Before doing any work, check our stack depth; if we're not a
612 	 * worker thread for this squeue and we're beginning to get tight
613 	 * on stack, kick the worker, bump a counter and return.
614 	 */
615 	if (proc_type != SQS_WORKER && STACK_BIAS + (uintptr_t)getfp() -
616 	    (uintptr_t)curthread->t_stkbase < squeue_drain_stack_needed) {
617 		ASSERT(mutex_owned(&sqp->sq_lock));
618 		squeue_worker_wakeup(sqp);
619 		squeue_drain_stack_toodeep++;
620 		return;
621 	}
622 
623 	sq_poll_capable = (sqp->sq_state & SQS_POLL_CAPAB) != 0;
624 again:
625 	ASSERT(mutex_owned(&sqp->sq_lock));
626 	ASSERT(!(sqp->sq_state & (SQS_POLL_THR_QUIESCED |
627 	    SQS_POLL_QUIESCE_DONE)));
628 
	/* Detach the whole pending list so it can be walked unlocked. */
629 	head = sqp->sq_first;
630 	sqp->sq_first = NULL;
631 	sqp->sq_last = NULL;
632 	sqp->sq_count = 0;
633 
634 	sqp->sq_state |= SQS_PROC | proc_type;
635 
636 	/*
637 	 * We have backlog built up. Switch to polling mode if the
638 	 * device underneath allows it. Need to do it so that
639 	 * more packets don't come in and disturb us (by contending
640 	 * for sq_lock or higher priority thread preempting us).
641 	 *
642 	 * The worker thread is allowed to do active polling while we
643 	 * just disable the interrupts for drain by non worker (kernel
644 	 * or userland) threads so they can peacefully process the
645 	 * packets during time allocated to them.
646 	 */
647 	SQS_POLLING_ON(sqp, sq_poll_capable, sq_rx_ring);
648 	mutex_exit(&sqp->sq_lock);
649 
650 	while ((mp = head) != NULL) {
651 
652 		head = mp->b_next;
653 		mp->b_next = NULL;
654 
655 		proc = (sqproc_t)mp->b_queue;
656 		mp->b_queue = NULL;
657 		connp = (conn_t *)mp->b_prev;
658 		mp->b_prev = NULL;
659 
660 		/* Is there an ip_recv_attr_t to handle? */
661 		if (ip_recv_attr_is_mblk(mp)) {
662 			mblk_t	*attrmp = mp;
663 
664 			ASSERT(attrmp->b_cont != NULL);
665 
			/* The real packet is chained behind the attr mblk. */
666 			mp = attrmp->b_cont;
667 			attrmp->b_cont = NULL;
668 			ASSERT(mp->b_queue == NULL);
669 			ASSERT(mp->b_prev == NULL);
670 
671 			if (!ip_recv_attr_from_mblk(attrmp, &iras)) {
672 				/* The ill or ip_stack_t disappeared on us */
				/*
				 * NOTE(review): mp is not freed on this
				 * path in the visible code -- confirm that
				 * ip_drop_input() or a later step releases
				 * it.
				 */
673 				ip_drop_input("ip_recv_attr_from_mblk",
674 				    mp, NULL);
675 				ira_cleanup(&iras, B_TRUE);
676 				CONN_DEC_REF(connp);
677 				continue;
678 			}
679 			ira = &iras;
680 		} else {
681 			ira = NULL;
682 		}
683 
684 
685 		/*
686 		 * Handle squeue switching. More details in the
687 		 * block comment at the top of the file
688 		 */
689 		if (connp->conn_sqp == sqp) {
690 			SQUEUE_DBG_SET(sqp, mp, proc, connp,
691 			    mp->b_tag);
692 			connp->conn_on_sqp = B_TRUE;
693 			DTRACE_PROBE3(squeue__proc__start, squeue_t *,
694 			    sqp, mblk_t *, mp, conn_t *, connp);
695 			(*proc)(connp, mp, sqp, ira);
696 			DTRACE_PROBE2(squeue__proc__end, squeue_t *,
697 			    sqp, conn_t *, connp);
698 			connp->conn_on_sqp = B_FALSE;
699 			CONN_DEC_REF(connp);
700 		} else {
701 			SQUEUE_ENTER_ONE(connp->conn_sqp, mp, proc, connp, ira,
702 			    SQ_FILL, SQTAG_SQUEUE_CHANGE);
703 		}
704 		if (ira != NULL)
705 			ira_cleanup(ira, B_TRUE);
706 	}
707 
708 	SQUEUE_DBG_CLEAR(sqp);
709 
710 	mutex_enter(&sqp->sq_lock);
711 
712 	/*
713 	 * Check if there is still work to do (either more arrived or timer
714 	 * expired). If we are the worker thread and we are polling capable,
715 	 * continue doing the work since no one else is around to do the
716 	 * work anyway (but signal the poll thread to retrieve some packets
717 	 * in the meanwhile). If we are not the worker thread, just
718 	 * signal the worker thread to take up the work if processing time
719 	 * has expired.
720 	 */
721 	if (sqp->sq_first != NULL) {
722 		/*
723 		 * Still more to process. If time quanta not expired, we
724 		 * should let the drain go on. The worker thread is allowed
725 		 * to drain as long as there is anything left.
726 		 */
727 		now = gethrtime();
728 		if ((now < expire) || (proc_type == SQS_WORKER)) {
729 			/*
730 			 * If time not expired or we are worker thread and
731 			 * this squeue is polling capable, continue to do
732 			 * the drain.
733 			 *
734 			 * We turn off interrupts for all userland threads
735 			 * doing drain but we do active polling only for
736 			 * worker thread.
737 			 *
738 			 * Calling SQS_POLL_RING() even in the case of
739 			 * SQS_POLLING_ON() not succeeding is ok as
740 			 * SQS_POLL_RING() will not wake up poll thread
741 			 * if SQS_POLLING bit is not set.
742 			 */
743 			if (proc_type == SQS_WORKER)
744 				SQS_POLL_RING(sqp);
745 			goto again;
746 		}
747 
		/* Out of time: leave the remaining backlog to the worker. */
748 		squeue_worker_wakeup(sqp);
749 	}
750 
751 	/*
752 	 * If the poll thread is already running, just return. The
753 	 * poll thread continues to hold the proc and will finish
754 	 * processing.
755 	 */
756 	if (sqp->sq_state & SQS_GET_PKTS) {
757 		ASSERT(!(sqp->sq_state & (SQS_POLL_THR_QUIESCED |
758 		    SQS_POLL_QUIESCE_DONE)));
759 		sqp->sq_state &= ~proc_type;
760 		return;
761 	}
762 
763 	/*
764 	 *
765 	 * If we are the worker thread and no work is left, send the poll
766 	 * thread down once more to see if something arrived. Otherwise,
767 	 * turn the interrupts back on and we are done.
768 	 */
769 	if ((proc_type == SQS_WORKER) && (sqp->sq_state & SQS_POLLING)) {
770 		/*
771 		 * Do one last check to see if anything arrived
772 		 * in the NIC. We leave the SQS_PROC set to ensure
773 		 * that poll thread keeps the PROC and can decide
774 		 * if it needs to turn polling off or continue
775 		 * processing.
776 		 *
777 		 * If we drop the SQS_PROC here and poll thread comes
778 		 * up empty handed, it can not safely turn polling off
779 		 * since someone else could have acquired the PROC
780 		 * and started draining. The previously running poll
781 		 * thread and the current thread doing drain would end
782 		 * up in a race for turning polling on/off and more
783 		 * complex code would be required to deal with it.
784 		 *
785 		 * It's a lot simpler for drain to hand the SQS_PROC to
786 		 * poll thread (if running) and let poll thread finish
787 		 * without worrying about racing with any other thread.
788 		 */
789 		ASSERT(!(sqp->sq_state & (SQS_POLL_THR_QUIESCED |
790 		    SQS_POLL_QUIESCE_DONE)));
791 		SQS_POLL_RING(sqp);
792 		sqp->sq_state &= ~proc_type;
793 	} else {
794 		/*
795 		 * The squeue is either not capable of polling or the
796 		 * attempt to blank (i.e., turn SQS_POLLING_ON()) was
797 		 * unsuccessful or poll thread already finished
798 		 * processing and didn't find anything. Since there
799 		 * is nothing queued and we already turn polling on
800 		 * (for all threads doing drain), we should turn
801 		 * polling off and relinquish the PROC.
802 		 */
803 		ASSERT(!(sqp->sq_state & (SQS_POLL_THR_QUIESCED |
804 		    SQS_POLL_QUIESCE_DONE)));
805 		SQS_POLLING_OFF(sqp, sq_poll_capable, sq_rx_ring);
806 		sqp->sq_state &= ~(SQS_PROC | proc_type);
807 		/*
808 		 * If we are not the worker and there is a pending quiesce
809 		 * event, wake up the worker
810 		 */
811 		if ((proc_type != SQS_WORKER) &&
812 		    (sqp->sq_state & SQS_WORKER_THR_CONTROL)) {
813 			squeue_worker_wakeup(sqp);
814 		}
815 	}
816 }
817 
818 /*
819  * Quiesce, Restart, or Cleanup of the squeue poll thread.
820  *
821  * Quiesce and Restart: After an squeue poll thread has been quiesced, it does
822  * not attempt to poll the underlying soft ring any more. The quiesce is
823  * triggered by the mac layer when it wants to quiesce a soft ring. Typically
824  * control operations such as changing the fanout of a NIC or VNIC (dladm
825  * setlinkprop) need to quiesce data flow before changing the wiring.
826  * The operation is done by the mac layer, but it calls back into IP to
827  * quiesce the soft ring. After completing the operation (say increase or
828  * decrease of the fanout) the mac layer then calls back into IP to restart
829  * the quiesced soft ring.
830  *
831  * Cleanup: This is triggered when the squeue binding to a soft ring is
832  * removed permanently. Typically interface plumb and unplumb would trigger
833  * this. It can also be triggered from the mac layer when a soft ring is
834  * being deleted say as the result of a fanout reduction. Since squeues are
835  * never deleted, the cleanup marks the squeue as fit for recycling and
836  * moves it to the zeroth squeue set.
837  */
838 static void
839 squeue_poll_thr_control(squeue_t *sqp)
840 {
841 	if (sqp->sq_state & SQS_POLL_THR_RESTART) {
842 		/* Restart implies a previous quiesce */
843 		ASSERT(sqp->sq_state & SQS_POLL_THR_QUIESCED);
844 		sqp->sq_state &= ~(SQS_POLL_THR_QUIESCED |
845 		    SQS_POLL_THR_RESTART);
846 		sqp->sq_state |= SQS_POLL_CAPAB;
847 		cv_signal(&sqp->sq_worker_cv);
848 		return;
849 	}
850 
851 	if (sqp->sq_state & SQS_POLL_THR_QUIESCE) {
852 		sqp->sq_state |= SQS_POLL_THR_QUIESCED;
853 		sqp->sq_state &= ~SQS_POLL_THR_QUIESCE;
854 		cv_signal(&sqp->sq_worker_cv);
855 		return;
856 	}
857 }
858 
859 /*
860  * POLLING Notes
861  *
862  * With polling mode, we want to do as much processing as we possibly can
863  * in worker thread context. The sweet spot is worker thread keeps doing
864  * work all the time in polling mode and writers etc. keep dumping packets
865  * to worker thread. Occasionally, we send the poll thread (running at
866  * lower priority) to the NIC to get a chain of packets to feed to the worker.
867  * Sending the poll thread down to the NIC is dependent on 3 criteria:
868  *
869  * 1) Its always driven from squeue_drain and only if worker thread is
870  *	doing the drain.
871  * 2) We clear the backlog once and more packets arrived in between.
872  *	Before starting drain again, send the poll thread down if
873  *	the drain is being done by worker thread.
874  * 3) Before exiting the squeue_drain, if the poll thread is not already
875  *	working and we are the worker thread, try to poll one more time.
876  *
877  * For latency sake, we do allow any thread calling squeue_enter
878  * to process its packet provided:
879  *
880  * 1) Nothing is queued
881  * 2) If more packets arrived in between, the non worker thread are allowed
882  *	to do the drain till their time quanta expired provided SQS_GET_PKTS
883  *	wasn't set in between.
884  *
885  * Avoiding deadlocks with interrupts
886  * ==================================
887  *
888  * One of the big problems is that we can't send poll_thr down while holding
889  * the sq_lock since the thread can block. So we drop the sq_lock before
890  * calling sq_get_pkts(). We keep holding the SQS_PROC as long as the
891  * poll thread is running so that no other thread can acquire the
892  * perimeter in between. If the squeue_drain gets done (no more work
893  * left), it leaves the SQS_PROC set if poll thread is running.
894  */
895 
896 /*
897  * This is the squeue poll thread. In poll mode, it polls the underlying
898  * TCP softring and feeds packets into the squeue. The worker thread then
899  * drains the squeue. The poll thread also responds to control signals for
900  * quiescing, restarting, or cleanup of an squeue. These are driven by
901  * control operations like plumb/unplumb or as a result of dynamic Rx ring
902  * related operations that are driven from the mac layer.
 *
 * The thread loops forever and never returns. It sleeps on sq_poll_cv with
 * sq_lock held and, on each wakeup, either services a pending control
 * request or performs one or more polling passes. Within this function
 * sq_lock is explicitly released only around the calls into the ring's
 * rr_rx and rr_ip_accept functions, and it is retaken before any squeue
 * state is examined or modified again.
 *
 * sqp: the squeue served by this thread.
903  */
904 static void
905 squeue_polling_thread(squeue_t *sqp)
906 {
907 	kmutex_t *lock = &sqp->sq_lock;
908 	kcondvar_t *async = &sqp->sq_poll_cv;
909 	ip_mac_rx_t sq_get_pkts;
910 	ip_accept_t ip_accept;
911 	ill_rx_ring_t *sq_rx_ring;
912 	ill_t *sq_ill;
913 	mblk_t *head, *tail, *mp;
914 	uint_t cnt;
915 	void *sq_mac_handle;
916 	callb_cpr_t cprinfo;
917 	size_t bytes_to_pickup;
918 	uint32_t ctl_state;
919 
	/*
	 * cprinfo is tied to sq_lock; the SAFE_BEGIN/SAFE_END pair in the
	 * loop below brackets the only place this thread sleeps.
	 */
920 	CALLB_CPR_INIT(&cprinfo, lock, callb_generic_cpr, "sq_poll");
921 	mutex_enter(lock);
922 
923 	for (;;) {
		/*
		 * Every iteration starts by sleeping; all of the state
		 * checks below are made only after being signalled.
		 */
924 		CALLB_CPR_SAFE_BEGIN(&cprinfo);
925 		cv_wait(async, lock);
926 		CALLB_CPR_SAFE_END(&cprinfo, lock);
927 
928 		ctl_state = sqp->sq_state & (SQS_POLL_THR_CONTROL |
929 		    SQS_POLL_THR_QUIESCED);
930 		if (ctl_state != 0) {
931 			/*
932 			 * If the squeue is quiesced, then wait for a control
933 			 * request. A quiesced squeue must not poll the
934 			 * underlying soft ring.
935 			 */
936 			if (ctl_state == SQS_POLL_THR_QUIESCED)
937 				continue;
938 			/*
939 			 * Act on control requests to quiesce, cleanup or
940 			 * restart an squeue
941 			 */
942 			squeue_poll_thr_control(sqp);
943 			continue;
944 		}
945 
		/* Nothing to poll if the squeue is not polling capable. */
946 		if (!(sqp->sq_state & SQS_POLL_CAPAB))
947 			continue;
948 
		/*
		 * Whoever woke us for polling must have left the squeue
		 * owned (SQS_PROC), in polling mode, and marked as fetching.
		 */
949 		ASSERT((sqp->sq_state &
950 		    (SQS_PROC|SQS_POLLING|SQS_GET_PKTS)) ==
951 		    (SQS_PROC|SQS_POLLING|SQS_GET_PKTS));
952 
953 poll_again:
		/*
		 * Snapshot the ring's entry points and handles while sq_lock
		 * is held; they are used below after the lock is dropped.
		 */
954 		sq_rx_ring = sqp->sq_rx_ring;
955 		sq_get_pkts = sq_rx_ring->rr_rx;
956 		sq_mac_handle = sq_rx_ring->rr_rx_handle;
957 		ip_accept = sq_rx_ring->rr_ip_accept;
958 		sq_ill = sq_rx_ring->rr_ill;
959 		bytes_to_pickup = MAX_BYTES_TO_PICKUP;
960 		mutex_exit(lock);
961 		head = sq_get_pkts(sq_mac_handle, bytes_to_pickup);
		/*
		 * mp stays NULL unless ip_accept hands back a chain; tail
		 * and cnt are only read below when mp is non-NULL.
		 */
962 		mp = NULL;
963 		if (head != NULL) {
964 			/*
965 			 * We got the packet chain from the mac layer. It
966 			 * would be nice to be able to process it inline
967 			 * for better performance but we need to give
968 			 * IP a chance to look at this chain to ensure
969 			 * that packets are really meant for this squeue
970 			 * and do the IP processing.
971 			 */
972 			mp = ip_accept(sq_ill, sq_rx_ring, sqp, head,
973 			    &tail, &cnt);
974 		}
975 		mutex_enter(lock);
976 		if (mp != NULL) {
977 			/*
978 			 * The ip_accept function has already added an
979 			 * ip_recv_attr_t mblk if that is needed.
980 			 */
981 			ENQUEUE_CHAIN(sqp, mp, tail, cnt);
982 		}
		/* The ownership bits must have survived the unlocked window. */
983 		ASSERT((sqp->sq_state &
984 		    (SQS_PROC|SQS_POLLING|SQS_GET_PKTS)) ==
985 		    (SQS_PROC|SQS_POLLING|SQS_GET_PKTS));
986 
987 		if (sqp->sq_first != NULL && !(sqp->sq_state & SQS_WORKER)) {
988 			/*
989 			 * We have packets to process and worker thread
990 			 * is not running.  Check to see if poll thread is
991 			 * allowed to process. Let it do processing only if it
992 			 * picked up some packets from the NIC otherwise
993 			 * wakeup the worker thread.
994 			 */
995 			if (mp != NULL) {
996 				hrtime_t  now;
997 
				/*
				 * Drain in poll-thread context with a
				 * deadline of squeue_drain_ns from now;
				 * sq_run marks this thread as the one
				 * running the squeue for the duration.
				 */
998 				now = gethrtime();
999 				sqp->sq_run = curthread;
1000 				sqp->sq_drain(sqp, SQS_POLL_PROC, now +
1001 				    squeue_drain_ns);
1002 				sqp->sq_run = NULL;
1003 
				/* Fully drained: go fetch more from the ring. */
1004 				if (sqp->sq_first == NULL)
1005 					goto poll_again;
1006 
1007 				/*
1008 				 * Couldn't do the entire drain because the
1009 				 * time limit expired, let the
1010 				 * worker thread take over.
1011 				 */
1012 			}
1013 
1014 			/*
1015 			 * Put the SQS_PROC_HELD on so the worker
1016 			 * thread can distinguish where its called from. We
1017 			 * can remove the SQS_PROC flag here and turn off the
1018 			 * polling so that it wouldn't matter who gets the
1019 			 * processing but we get better performance this way
1020 			 * and save the cost of turn polling off and possibly
1021 			 * on again as soon as we start draining again.
1022 			 *
1023 			 * We can't remove the SQS_PROC flag without turning
1024 			 * polling off until we can guarantee that control
1025 			 * will return to squeue_drain immediately.
1026 			 */
1027 			sqp->sq_state |= SQS_PROC_HELD;
1028 			sqp->sq_state &= ~SQS_GET_PKTS;
1029 			squeue_worker_wakeup(sqp);
1030 		} else if (sqp->sq_first == NULL &&
1031 		    !(sqp->sq_state & SQS_WORKER)) {
1032 			/*
1033 			 * Nothing queued and worker thread not running.
1034 			 * Since we hold the proc, no other thread is
1035 			 * processing the squeue. This means that there
1036 			 * is no work to be done and nothing is queued
1037 			 * in squeue or in NIC. Turn polling off and go
1038 			 * back to interrupt mode.
1039 			 */
1040 			sqp->sq_state &= ~(SQS_PROC|SQS_GET_PKTS);
1041 			/* LINTED: constant in conditional context */
1042 			SQS_POLLING_OFF(sqp, B_TRUE, sq_rx_ring);
1043 
1044 			/*
1045 			 * If there is a pending control operation
1046 			 * wake up the worker, since it is currently
1047 			 * not running.
1048 			 */
1049 			if (sqp->sq_state & SQS_WORKER_THR_CONTROL) {
1050 				squeue_worker_wakeup(sqp);
1051 			}
1052 		} else {
1053 			/*
1054 			 * Worker thread is already running. We don't need
1055 			 * to do anything. Indicate that poll thread is done.
1056 			 */
1057 			sqp->sq_state &= ~SQS_GET_PKTS;
1058 		}
		/*
		 * A control request may have been posted while we were
		 * polling; handle it now rather than after the next wakeup.
		 */
1059 		if (sqp->sq_state & SQS_POLL_THR_CONTROL) {
1060 			/*
1061 			 * Act on control requests to quiesce, cleanup or
1062 			 * restart an squeue
1063 			 */
1064 			squeue_poll_thr_control(sqp);
1065 		}
1066 	}
1067 }
1068 
1069 /*
1070  * The squeue worker thread acts on any control requests to quiesce, cleanup
1071  * or restart an ill_rx_ring_t by calling this function. The worker thread
1072  * synchronizes with the squeue poll thread to complete the request and finally
1073  * wakes up the requestor when the request is completed.
 *
 * Must be called with sq_lock held, and returns with it held. On the
 * quiesce/cleanup path sq_lock is dropped and retaken so that ill_lock can
 * be acquired ahead of it.
 *
 * A restart, and a completed cleanup, clear SQS_PROC and SQS_WORKER before
 * returning. A plain quiesce returns with both still set, which is the state
 * the restart path asserts on entry.
1074  */
1075 static void
1076 squeue_worker_thr_control(squeue_t *sqp)
1077 {
1078 	ill_t	*ill;
1079 	ill_rx_ring_t	*rx_ring;
1080 
1081 	ASSERT(MUTEX_HELD(&sqp->sq_lock));
1082 
1083 	if (sqp->sq_state & SQS_POLL_RESTART) {
1084 		/* Restart implies a previous quiesce. */
1085 		ASSERT((sqp->sq_state & (SQS_PROC_HELD |
1086 		    SQS_POLL_QUIESCE_DONE | SQS_PROC | SQS_WORKER)) ==
1087 		    (SQS_POLL_QUIESCE_DONE | SQS_PROC | SQS_WORKER));
1088 		/*
1089 		 * Request the squeue poll thread to restart and wait till
1090 		 * it actually restarts.
1091 		 */
1092 		sqp->sq_state &= ~SQS_POLL_QUIESCE_DONE;
1093 		sqp->sq_state |= SQS_POLL_THR_RESTART;
1094 		cv_signal(&sqp->sq_poll_cv);
1095 		while (sqp->sq_state & SQS_POLL_THR_QUIESCED)
1096 			cv_wait(&sqp->sq_worker_cv, &sqp->sq_lock);
		/* Restart done: give up ownership of the squeue. */
1097 		sqp->sq_state &= ~(SQS_POLL_RESTART | SQS_PROC |
1098 		    SQS_WORKER);
1099 		/*
1100 		 * Signal any waiter that is waiting for the restart
1101 		 * to complete
1102 		 */
1103 		sqp->sq_state |= SQS_POLL_RESTART_DONE;
1104 		cv_signal(&sqp->sq_ctrlop_done_cv);
1105 		return;
1106 	}
1107 
1108 	if (sqp->sq_state & SQS_PROC_HELD) {
1109 		/* The squeue poll thread handed control to us */
1110 		ASSERT(sqp->sq_state & SQS_PROC);
1111 	}
1112 
1113 	/*
1114 	 * Prevent any other thread from processing the squeue
1115 	 * until we finish the control actions by setting SQS_PROC.
1116 	 * But allow ourself to reenter by setting SQS_WORKER
1117 	 */
1118 	sqp->sq_state |= (SQS_PROC | SQS_WORKER);
1119 
1120 	/* Signal the squeue poll thread and wait for it to quiesce itself */
1121 	if (!(sqp->sq_state & SQS_POLL_THR_QUIESCED)) {
1122 		sqp->sq_state |= SQS_POLL_THR_QUIESCE;
1123 		cv_signal(&sqp->sq_poll_cv);
1124 		while (!(sqp->sq_state & SQS_POLL_THR_QUIESCED))
1125 			cv_wait(&sqp->sq_worker_cv, &sqp->sq_lock);
1126 	}
1127 
	/*
	 * Capture the ring and its ill before sq_lock is dropped; ill is
	 * needed to take ill_lock, and rx_ring is torn down below on cleanup.
	 */
1128 	rx_ring = sqp->sq_rx_ring;
1129 	ill = rx_ring->rr_ill;
1130 	/*
1131 	 * The lock hierarchy is as follows.
1132 	 * cpu_lock -> ill_lock -> sqset_lock -> sq_lock
1133 	 */
1134 	mutex_exit(&sqp->sq_lock);
1135 	mutex_enter(&ill->ill_lock);
1136 	mutex_enter(&sqp->sq_lock);
1137 
1138 	SQS_POLLING_OFF(sqp, (sqp->sq_state & SQS_POLL_CAPAB) != 0,
1139 	    sqp->sq_rx_ring);
1140 	sqp->sq_state &= ~(SQS_POLL_CAPAB | SQS_GET_PKTS | SQS_PROC_HELD);
1141 	if (sqp->sq_state & SQS_POLL_CLEANUP) {
1142 		/*
1143 		 * Disassociate this squeue from its ill_rx_ring_t.
1144 		 * The rr_sqp, sq_rx_ring fields are protected by the
1145 		 * corresponding squeue, ill_lock* and sq_lock. Holding any
1146 		 * of them will ensure that the ring to squeue mapping does
1147 		 * not change.
1148 		 */
1149 		ASSERT(!(sqp->sq_state & SQS_DEFAULT));
1150 
1151 		sqp->sq_rx_ring = NULL;
1152 		rx_ring->rr_sqp = NULL;
1153 
1154 		sqp->sq_state &= ~(SQS_POLL_CLEANUP | SQS_POLL_THR_QUIESCED |
1155 		    SQS_POLL_QUIESCE_DONE);
1156 		sqp->sq_ill = NULL;
1157 
1158 		rx_ring->rr_rx_handle = NULL;
1159 		rx_ring->rr_intr_handle = NULL;
1160 		rx_ring->rr_intr_enable = NULL;
1161 		rx_ring->rr_intr_disable = NULL;
1162 		sqp->sq_state |= SQS_POLL_CLEANUP_DONE;
1163 	} else {
		/* Plain quiesce: record completion, keep the ring attached. */
1164 		sqp->sq_state &= ~SQS_POLL_QUIESCE;
1165 		sqp->sq_state |= SQS_POLL_QUIESCE_DONE;
1166 	}
1167 	/*
1168 	 * Signal any waiter that is waiting for the quiesce or cleanup
1169 	 * to complete and also wait for it to actually see and reset the
1170 	 * SQS_POLL_CLEANUP_DONE.
1171 	 */
1172 	cv_signal(&sqp->sq_ctrlop_done_cv);
1173 	mutex_exit(&ill->ill_lock);
	/*
	 * NOTE(review): this is a single cv_wait rather than a loop that
	 * re-tests SQS_POLL_CLEANUP_DONE; confirm that an early wakeup on
	 * sq_worker_cv is harmless here.
	 */
1174 	if (sqp->sq_state & SQS_POLL_CLEANUP_DONE) {
1175 		cv_wait(&sqp->sq_worker_cv, &sqp->sq_lock);
1176 		sqp->sq_state &= ~(SQS_PROC | SQS_WORKER);
1177 	}
1178 }
1179 
/*
 * Main loop of the squeue worker thread; it never returns.
 *
 * With sq_lock held, the thread sleeps on sq_worker_cv until one of the
 * wake-up conditions tested in the inner loop holds. If a worker control
 * request is pending it is handed to squeue_worker_thr_control() and the
 * conditions are evaluated again. Otherwise the squeue is drained through
 * sq_drain in SQS_WORKER context with a deadline of squeue_drain_ns from
 * the current time.
 *
 * sqp: the squeue served by this thread.
 */
1180 static void
1181 squeue_worker(squeue_t *sqp)
1182 {
1183 	kmutex_t *lock = &sqp->sq_lock;
1184 	kcondvar_t *async = &sqp->sq_worker_cv;
1185 	callb_cpr_t cprinfo;
1186 	hrtime_t now;
1187 
1188 	CALLB_CPR_INIT(&cprinfo, lock, callb_generic_cpr, "sq_worker");
1189 	mutex_enter(lock);
1190 
1191 	for (;;) {
		/*
		 * Unlike the poll thread, the conditions are tested before
		 * sleeping, so work that is already pending is picked up
		 * without waiting for a signal.
		 */
1192 		for (;;) {
1193 			/*
1194 			 * If the poll thread has handed control to us
1195 			 * we need to break out of the wait.
1196 			 */
1197 			if (sqp->sq_state & SQS_PROC_HELD)
1198 				break;
1199 
1200 			/*
1201 			 * If the squeue is not being processed and we either
1202 			 * have messages to drain or some thread has signaled
1203 			 * some control activity we need to break
1204 			 */
1205 			if (!(sqp->sq_state & SQS_PROC) &&
1206 			    ((sqp->sq_state & SQS_WORKER_THR_CONTROL) ||
1207 			    (sqp->sq_first != NULL)))
1208 				break;
1209 
1210 			/*
1211 			 * If we have started some control action, then check
1212 			 * for the SQS_WORKER flag (since we don't
1213 			 * release the squeue) to make sure we own the squeue
1214 			 * and break out
1215 			 */
1216 			if ((sqp->sq_state & SQS_WORKER_THR_CONTROL) &&
1217 			    (sqp->sq_state & SQS_WORKER))
1218 				break;
1219 
1220 			CALLB_CPR_SAFE_BEGIN(&cprinfo);
1221 			cv_wait(async, lock);
1222 			CALLB_CPR_SAFE_END(&cprinfo, lock);
1223 		}
		/* Control requests take precedence over draining. */
1224 		if (sqp->sq_state & SQS_WORKER_THR_CONTROL) {
1225 			squeue_worker_thr_control(sqp);
1226 			continue;
1227 		}
		/* No control operation may be in flight once we drain. */
1228 		ASSERT(!(sqp->sq_state & (SQS_POLL_THR_QUIESCED |
1229 		    SQS_POLL_CLEANUP_DONE | SQS_POLL_QUIESCE_DONE |
1230 		    SQS_WORKER_THR_CONTROL | SQS_POLL_THR_CONTROL)));
1231 
		/* Consume the hand-off marker left by the poll thread. */
1232 		if (sqp->sq_state & SQS_PROC_HELD)
1233 			sqp->sq_state &= ~SQS_PROC_HELD;
1234 
		/* sq_run identifies this thread as the drainer meanwhile. */
1235 		now = gethrtime();
1236 		sqp->sq_run = curthread;
1237 		sqp->sq_drain(sqp, SQS_WORKER, now +  squeue_drain_ns);
1238 		sqp->sq_run = NULL;
1239 	}
1240 }
1241 
/*
 * Return the address of slot p in the squeue's sq_private array, so the
 * caller can read or update that slot in place.
 *
 * sqp: squeue whose private slot is wanted.
 * p:   slot index; must be below SQPRIVATE_MAX. This is checked only by
 *      ASSERT, so callers are responsible for passing a valid index.
 */
1242 uintptr_t *
1243 squeue_getprivate(squeue_t *sqp, sqprivate_t p)
1244 {
1245 	ASSERT(p < SQPRIVATE_MAX);
1246 
1247 	return (&sqp->sq_private[p]);
1248 }
1249 
/*
 * Squeue callback that squeue_synch_enter() attaches to its marker mblk.
 * When the marker reaches the head of the squeue and this function runs,
 * it lets the thread blocked in squeue_synch_enter() proceed and then
 * parks the calling thread until squeue_synch_exit() clears SQS_PAUSE.
 *
 * arg:   the conn_t whose thread is waiting on conn_sq_cv.
 * mp:    the marker mblk; it must carry MSGWAITSYNC and is not freed here.
 * arg2:  unused.
 * dummy: unused.
 */
1250 /* ARGSUSED */
1251 void
1252 squeue_wakeup_conn(void *arg, mblk_t *mp, void *arg2, ip_recv_attr_t *dummy)
1253 {
1254 	conn_t *connp = (conn_t *)arg;
1255 	squeue_t *sqp = connp->conn_sqp;
1256 
1257 	/*
1258 	 * Mark the squeue as paused before waking up the thread stuck
1259 	 * in squeue_synch_enter().
1260 	 */
1261 	mutex_enter(&sqp->sq_lock);
1262 	sqp->sq_state |= SQS_PAUSE;
1263 
1264 	/*
1265 	 * Notify the thread that it's OK to proceed; that is done by
1266 	 * clearing the MSGWAITSYNC flag. The synch thread will free the mblk.
1267 	 */
1268 	ASSERT(mp->b_flag & MSGWAITSYNC);
1269 	mp->b_flag &= ~MSGWAITSYNC;
1270 	cv_broadcast(&connp->conn_sq_cv);
1271 
1272 	/*
1273 	 * We are doing something on behalf of another thread, so we have to
1274 	 * pause and wait until it finishes.
1275 	 */
1276 	while (sqp->sq_state & SQS_PAUSE) {
1277 		cv_wait(&sqp->sq_synch_cv, &sqp->sq_lock);
1278 	}
1279 	mutex_exit(&sqp->sq_lock);
1280 }
1281 
/*
 * Synchronously gain exclusive access to the squeue of connp on behalf of
 * the calling thread. Pair with squeue_synch_exit().
 *
 * If the squeue is empty and unowned, the caller claims it directly by
 * setting SQS_PROC and sq_run. Otherwise a marker mblk with
 * squeue_wakeup_conn() as its callback is queued and the caller sleeps on
 * conn_sq_cv until that callback clears MSGWAITSYNC; in that case the
 * thread that ran the callback stays parked and sq_run is not the caller.
 *
 * connp:  connection whose squeue is to be entered.
 * use_mp: caller-supplied mblk to use as the marker, or NULL to have one
 *         allocated here (and freed before returning).
 *
 * Returns 0 on success, or ENOMEM if use_mp is NULL and allocb() fails.
 */
1282 int
1283 squeue_synch_enter(conn_t *connp, mblk_t *use_mp)
1284 {
1285 	squeue_t *sqp;
1286 
1287 again:
1288 	sqp = connp->conn_sqp;
1289 
1290 	mutex_enter(&sqp->sq_lock);
1291 	if (sqp->sq_first == NULL && !(sqp->sq_state & SQS_PROC)) {
1292 		/*
1293 		 * We are OK to proceed if the squeue is empty, and
1294 		 * no one owns the squeue.
1295 		 *
1296 		 * The caller won't own the squeue as this is called from the
1297 		 * application.
1298 		 */
1299 		ASSERT(sqp->sq_run == NULL);
1300 
1301 		sqp->sq_state |= SQS_PROC;
1302 		sqp->sq_run = curthread;
1303 		mutex_exit(&sqp->sq_lock);
1304 
1305 		/*
1306 		 * Handle squeue switching. The conn's squeue can only change
1307 		 * while there is a thread in the squeue, which is why we do
1308 		 * the check after entering the squeue. If it has changed, exit
1309 		 * this squeue and redo everything with the new sqeueue.
1310 		 */
1311 		if (sqp != connp->conn_sqp) {
1312 			mutex_enter(&sqp->sq_lock);
1313 			sqp->sq_state &= ~SQS_PROC;
1314 			sqp->sq_run = NULL;
1315 			mutex_exit(&sqp->sq_lock);
1316 			goto again;
1317 		}
1318 #if SQUEUE_DEBUG
1319 		sqp->sq_curmp = NULL;
1320 		sqp->sq_curproc = NULL;
1321 		sqp->sq_connp = connp;
1322 #endif
1323 		connp->conn_on_sqp = B_TRUE;
1324 		return (0);
1325 	} else {
1326 		mblk_t  *mp;
1327 
		/* Busy squeue: queue a marker and wait for our turn. */
1328 		mp = (use_mp == NULL) ? allocb(0, BPRI_MED) : use_mp;
1329 		if (mp == NULL) {
1330 			mutex_exit(&sqp->sq_lock);
1331 			return (ENOMEM);
1332 		}
1333 
1334 		/*
1335 		 * We mark the mblk as awaiting synchronous squeue access
1336 		 * by setting the MSGWAITSYNC flag. Once squeue_wakeup_conn
1337 		 * fires, MSGWAITSYNC is cleared, at which point we know we
1338 		 * have exclusive access.
1339 		 */
1340 		mp->b_flag |= MSGWAITSYNC;
1341 
		/*
		 * The queued marker holds a reference on connp; presumably it
		 * is released by whoever dispatches the callback (as
		 * squeue_try_drain_one() does after calling proc).
		 */
1342 		CONN_INC_REF(connp);
1343 		SET_SQUEUE(mp, squeue_wakeup_conn, connp);
1344 		ENQUEUE_CHAIN(sqp, mp, mp, 1);
1345 
1346 		ASSERT(sqp->sq_run != curthread);
1347 
1348 		/* Wait until the enqueued mblk get processed. */
1349 		while (mp->b_flag & MSGWAITSYNC)
1350 			cv_wait(&connp->conn_sq_cv, &sqp->sq_lock);
1351 		mutex_exit(&sqp->sq_lock);
1352 
		/* Only free the marker if it was allocated here. */
1353 		if (use_mp == NULL)
1354 			freeb(mp);
1355 
1356 		return (0);
1357 	}
1358 }
1359 
1360 /*
1361  * If possible, attempt to immediately process a single queued request, should
1362  * it match the supplied conn_t reference.  This is primarily intended to elide
1363  * squeue worker thread wake-ups during local TCP connect() or close()
1364  * operations where the response is placed on the squeue during processing.
 *
 * Must be called with sq_lock held, with the squeue unowned (SQS_PROC clear,
 * sq_run NULL) and with at least one entry queued. sq_lock is dropped while
 * the entry's callback runs and is held again on return. If the head entry
 * does not belong to compare_conn, nothing is dequeued.
 *
 * sqp:          squeue to drain from.
 * compare_conn: conn_t the head entry must match; used only for pointer
 *               comparison unless it matches.
1365  */
1366 static void
1367 squeue_try_drain_one(squeue_t *sqp, conn_t *compare_conn)
1368 {
1369 	mblk_t *next, *mp = sqp->sq_first;
1370 	conn_t *connp;
	/*
	 * proc is deliberately not initialized from mp here: mp must not be
	 * dereferenced until VERIFY(mp != NULL) below has run.
	 */
1371 	sqproc_t proc;
1372 	ip_recv_attr_t iras, *ira = NULL;
1373 
1374 	ASSERT(MUTEX_HELD(&sqp->sq_lock));
1375 	ASSERT((sqp->sq_state & SQS_PROC) == 0);
1376 	ASSERT(sqp->sq_run == NULL);
1377 	VERIFY(mp != NULL);
1378 
1379 	/*
1380 	 * There is no guarantee that compare_conn references a valid object at
1381 	 * this time, so under no circumstance may it be dereferenced unless it
1382 	 * matches the squeue entry.
1383 	 */
1384 	connp = (conn_t *)mp->b_prev;
1385 	if (connp != compare_conn) {
1386 		return;
1387 	}
1388 
1389 	next = mp->b_next;
1390 	proc = (sqproc_t)mp->b_queue;
1391 
1392 	ASSERT(proc != NULL);
1393 	ASSERT(sqp->sq_count > 0);
1394 
1395 	/* Dequeue item from squeue */
1396 	if (next == NULL) {
1397 		sqp->sq_first = NULL;
1398 		sqp->sq_last = NULL;
1399 	} else {
1400 		sqp->sq_first = next;
1401 	}
1402 	sqp->sq_count--;
1403 
	/* Claim the squeue, then run the callback without sq_lock held. */
1404 	sqp->sq_state |= SQS_PROC;
1405 	sqp->sq_run = curthread;
1406 	mutex_exit(&sqp->sq_lock);
1407 
1408 	/* Prep mblk_t and retrieve ira if needed */
1409 	mp->b_prev = NULL;
1410 	mp->b_queue = NULL;
1411 	mp->b_next = NULL;
1412 	if (ip_recv_attr_is_mblk(mp)) {
1413 		mblk_t	*attrmp = mp;
1414 
1415 		ASSERT(attrmp->b_cont != NULL);
1416 
		/* Split the attribute mblk from the data it precedes. */
1417 		mp = attrmp->b_cont;
1418 		attrmp->b_cont = NULL;
1419 
1420 		ASSERT(mp->b_queue == NULL);
1421 		ASSERT(mp->b_prev == NULL);
1422 
1423 		if (!ip_recv_attr_from_mblk(attrmp, &iras)) {
1424 			/* ill_t or ip_stack_t disappeared */
1425 			ip_drop_input("ip_recv_attr_from_mblk", mp, NULL);
1426 			ira_cleanup(&iras, B_TRUE);
1427 			CONN_DEC_REF(connp);
1428 			goto done;
1429 		}
1430 		ira = &iras;
1431 	}
1432 
1433 	SQUEUE_DBG_SET(sqp, mp, proc, connp, mp->b_tag);
1434 	connp->conn_on_sqp = B_TRUE;
1435 	DTRACE_PROBE3(squeue__proc__start, squeue_t *, sqp, mblk_t *, mp,
1436 	    conn_t *, connp);
1437 	(*proc)(connp, mp, sqp, ira);
1438 	DTRACE_PROBE2(squeue__proc__end, squeue_t *, sqp, conn_t *, connp);
1439 	connp->conn_on_sqp = B_FALSE;
	/* Presumably drops the reference taken when the entry was queued. */
1440 	CONN_DEC_REF(connp);
1441 	SQUEUE_DBG_CLEAR(sqp);
1442 
1443 done:
	/* Retake sq_lock for the caller and release ownership. */
1444 	mutex_enter(&sqp->sq_lock);
1445 	sqp->sq_state &= ~(SQS_PROC);
1446 	sqp->sq_run = NULL;
1447 }
1448 
/*
 * Leave a squeue previously entered with squeue_synch_enter().
 *
 * If the caller is not the thread recorded in sq_run, it entered through
 * the queued-marker path: the thread parked in squeue_wakeup_conn() is
 * released by clearing SQS_PAUSE and signalling sq_synch_cv. Otherwise the
 * caller owns the squeue directly and gives up SQS_PROC and sq_run; with
 * SQ_PROCESS it may first handle the head entry via squeue_try_drain_one(),
 * and the worker is woken if anything remains queued.
 *
 * connp: connection whose squeue is being exited.
 * flag:  SQ_NODRAIN or SQ_PROCESS.
 */
1449 void
1450 squeue_synch_exit(conn_t *connp, int flag)
1451 {
1452 	squeue_t *sqp = connp->conn_sqp;
1453 
1454 	ASSERT(flag == SQ_NODRAIN || flag == SQ_PROCESS);
1455 
1456 	mutex_enter(&sqp->sq_lock);
1457 	if (sqp->sq_run != curthread) {
1458 		/*
1459 		 * The caller doesn't own the squeue, clear the SQS_PAUSE flag,
1460 		 * and wake up the squeue owner, such that owner can continue
1461 		 * processing.
1462 		 */
1463 		ASSERT(sqp->sq_state & SQS_PAUSE);
1464 		sqp->sq_state &= ~SQS_PAUSE;
1465 
1466 		/* There should be only one thread blocking on sq_synch_cv. */
1467 		cv_signal(&sqp->sq_synch_cv);
1468 		mutex_exit(&sqp->sq_lock);
1469 		return;
1470 	}
1471 
1472 	ASSERT(sqp->sq_state & SQS_PROC);
1473 
	/*
	 * Release ownership first: squeue_try_drain_one() asserts that the
	 * squeue is unowned on entry.
	 */
1474 	sqp->sq_state &= ~SQS_PROC;
1475 	sqp->sq_run = NULL;
1476 	connp->conn_on_sqp = B_FALSE;
1477 
1478 	/* If the caller opted in, attempt to process the head squeue item. */
1479 	if (flag == SQ_PROCESS && sqp->sq_first != NULL) {
1480 		squeue_try_drain_one(sqp, connp);
1481 	}
1482 
1483 	/* Wake up the worker if further requests are pending. */
1484 	if (sqp->sq_first != NULL) {
1485 		squeue_worker_wakeup(sqp);
1486 	}
1487 	mutex_exit(&sqp->sq_lock);
1488 }
1489