xref: /titanic_44/usr/src/uts/common/inet/squeue.c (revision c623edd31cf7c582de6a3f26cc42c651d6002f73)
1 /*
2  * CDDL HEADER START
3  *
4  * The contents of this file are subject to the terms of the
5  * Common Development and Distribution License (the "License").
6  * You may not use this file except in compliance with the License.
7  *
8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9  * or http://www.opensolaris.org/os/licensing.
10  * See the License for the specific language governing permissions
11  * and limitations under the License.
12  *
13  * When distributing Covered Code, include this CDDL HEADER in each
14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15  * If applicable, add the following below this CDDL HEADER, with the
16  * fields enclosed by brackets "[]" replaced with your own identifying
17  * information: Portions Copyright [yyyy] [name of copyright owner]
18  *
19  * CDDL HEADER END
20  */
21 /*
22  * Copyright 2008 Sun Microsystems, Inc.  All rights reserved.
23  * Use is subject to license terms.
24  */
25 
26 /*
27  * Squeues: General purpose serialization mechanism
28  * ------------------------------------------------
29  *
30  * Background:
31  * -----------
32  *
33  * This is a general purpose high-performance serialization mechanism
34  * currently used by TCP/IP. It is implement by means of a per CPU queue,
35  * a worker thread and a polling thread with are bound to the CPU
36  * associated with the squeue. The squeue is strictly FIFO for both read
37  * and write side and only one thread can process it at any given time.
38  * The design goal of squeue was to offer a very high degree of
39  * parallelization (on a per H/W execution pipeline basis) with at
40  * most one queuing.
41  *
42  * The modules needing protection typically calls squeue_enter() or
43  * squeue_enter_chain() routine as soon as a thread enter the module
44  * from either direction. For each packet, the processing function
45  * and argument is stored in the mblk itself. When the packet is ready
46  * to be processed, the squeue retrieves the stored function and calls
47  * it with the supplied argument and the pointer to the packet itself.
48  * The called function can assume that no other thread is processing
49  * the squeue when it is executing.
50  *
51  * Squeue/connection binding:
52  * --------------------------
53  *
54  * TCP/IP uses an IP classifier in conjunction with squeue where specific
55  * connections are assigned to specific squeue (based on various policies),
56  * at the connection creation time. Once assigned, the connection to
57  * squeue mapping is never changed and all future packets for that
58  * connection are processed on that squeue. The connection ("conn") to
59  * squeue mapping is stored in "conn_t" member "conn_sqp".
60  *
61  * Since the processing of the connection cuts across multiple layers
62  * but still allows packets for different connnection to be processed on
63  * other CPU/squeues, squeues are also termed as "Vertical Perimeter" or
64  * "Per Connection Vertical Perimeter".
65  *
66  * Processing Model:
67  * -----------------
68  *
69  * Squeue doesn't necessary processes packets with its own worker thread.
70  * The callers can pick if they just want to queue the packet, process
71  * their packet if nothing is queued or drain and process. The first two
72  * modes are typically employed when the packet was generated while
73  * already doing the processing behind the squeue and last mode (drain
74  * and process) is typically employed when the thread is entering squeue
75  * for the first time. The squeue still imposes a finite time limit
76  * for which a external thread can do processing after which it switches
77  * processing to its own worker thread.
78  *
79  * Once created, squeues are never deleted. Hence squeue pointers are
80  * always valid. This means that functions outside the squeue can still
81  * refer safely to conn_sqp and their is no need for ref counts.
82  *
83  * Only a thread executing in the squeue can change the squeue of the
84  * connection. It does so by calling a squeue framework function to do this.
85  * After changing the squeue, the thread must leave the squeue. It must not
86  * continue to execute any code that needs squeue protection.
87  *
88  * The squeue framework, after entering the squeue, checks if the current
89  * squeue matches the conn_sqp. If the check fails, the packet is delivered
90  * to right squeue.
91  *
92  * Polling Model:
93  * --------------
94  *
95  * Squeues can control the rate of packet arrival into itself from the
96  * NIC or specific Rx ring within a NIC. As part of capability negotiation
97  * between IP and MAC layer, squeue are created for each TCP soft ring
98  * (or TCP Rx ring - to be implemented in future). As part of this
99  * negotiation, squeues get a cookie for underlying soft ring or Rx
100  * ring, a function to turn off incoming packets and a function to call
101  * to poll for packets. This helps schedule the receive side packet
102  * processing so that queue backlog doesn't build up and packet processing
103  * doesn't keep getting disturbed by high priority interrupts. As part
104  * of this mode, as soon as a backlog starts building, squeue turns off
105  * the interrupts and switches to poll mode. In poll mode, when poll
106  * thread goes down to retrieve packets, it retrieves them in the form of
107  * a chain which improves performance even more. As the squeue/softring
108  * system gets more packets, it gets more efficient by switching to
109  * polling more often and dealing with larger packet chains.
110  *
111  */
112 
113 #include <sys/types.h>
114 #include <sys/cmn_err.h>
115 #include <sys/debug.h>
116 #include <sys/kmem.h>
117 #include <sys/cpuvar.h>
118 #include <sys/condvar_impl.h>
119 #include <sys/systm.h>
120 #include <sys/callb.h>
121 #include <sys/sdt.h>
122 #include <sys/ddi.h>
123 #include <sys/sunddi.h>
124 
125 #include <inet/ipclassifier.h>
126 #include <inet/udp_impl.h>
127 
128 #include <sys/squeue_impl.h>
129 
130 static void squeue_fire(void *);
131 static void squeue_drain(squeue_t *, uint_t, hrtime_t);
132 static void squeue_worker(squeue_t *sqp);
133 static void squeue_polling_thread(squeue_t *sqp);
134 
135 kmem_cache_t *squeue_cache;
136 
137 #define	SQUEUE_MSEC_TO_NSEC 1000000
138 
139 int squeue_drain_ms = 20;
140 int squeue_workerwait_ms = 0;
141 
142 /* The values above converted to ticks or nano seconds */
143 static int squeue_drain_ns = 0;
144 static int squeue_workerwait_tick = 0;
145 
146 #define	MAX_BYTES_TO_PICKUP	150000
147 
148 #define	ENQUEUE_CHAIN(sqp, mp, tail, cnt) {			\
149 	/*							\
150 	 * Enqueue our mblk chain.				\
151 	 */							\
152 	ASSERT(MUTEX_HELD(&(sqp)->sq_lock));			\
153 								\
154 	if ((sqp)->sq_last != NULL)				\
155 		(sqp)->sq_last->b_next = (mp);			\
156 	else							\
157 		(sqp)->sq_first = (mp);				\
158 	(sqp)->sq_last = (tail);				\
159 	(sqp)->sq_count += (cnt);				\
160 	ASSERT((sqp)->sq_count > 0);				\
161 	DTRACE_PROBE4(squeue__enqueuechain, squeue_t *, sqp,	\
162 		mblk_t *, mp, mblk_t *, tail, int, cnt);	\
163 								\
164 }
165 
166 #define	SQS_POLLING_ON(sqp, sq_poll_capable, rx_ring) {		\
167 	ASSERT(MUTEX_HELD(&(sqp)->sq_lock));			\
168 	if (sq_poll_capable) {					\
169 		ASSERT(rx_ring != NULL);			\
170 		ASSERT(sqp->sq_state & SQS_POLL_CAPAB);		\
171 		if (!(sqp->sq_state & SQS_POLLING)) {		\
172 			sqp->sq_state |= SQS_POLLING;		\
173 			rx_ring->rr_intr_disable(rx_ring->rr_intr_handle); \
174 		}						\
175 	}							\
176 }
177 
178 #define	SQS_POLLING_OFF(sqp, sq_poll_capable, rx_ring) {	\
179 	ASSERT(MUTEX_HELD(&(sqp)->sq_lock));			\
180 	if (sq_poll_capable) {					\
181 		ASSERT(rx_ring != NULL);			\
182 		ASSERT(sqp->sq_state & SQS_POLL_CAPAB);		\
183 		if (sqp->sq_state & SQS_POLLING) {		\
184 			sqp->sq_state &= ~SQS_POLLING;		\
185 			rx_ring->rr_intr_enable(rx_ring->rr_intr_handle); \
186 		}						\
187 	}							\
188 }
189 
190 #define	SQS_POLL_RING(sqp, sq_poll_capable) {			\
191 	ASSERT(MUTEX_HELD(&(sqp)->sq_lock));			\
192 	if (sq_poll_capable) {					\
193 		ASSERT(sqp->sq_state & SQS_POLL_CAPAB);		\
194 		if (!(sqp->sq_state & SQS_GET_PKTS)) {		\
195 			sqp->sq_state |= SQS_GET_PKTS;		\
196 			cv_signal(&sqp->sq_poll_cv);		\
197 		}						\
198 	}							\
199 }
200 
201 #ifdef DEBUG
202 #define	SQUEUE_DBG_SET(sqp, mp, proc, connp, tag) {		\
203 	(sqp)->sq_curmp = (mp);					\
204 	(sqp)->sq_curproc = (proc);				\
205 	(sqp)->sq_connp = (connp);				\
206 	(mp)->b_tag = (sqp)->sq_tag = (tag);			\
207 }
208 
209 #define	SQUEUE_DBG_CLEAR(sqp)	{				\
210 	(sqp)->sq_curmp = NULL;					\
211 	(sqp)->sq_curproc = NULL;				\
212 	(sqp)->sq_connp = NULL;					\
213 }
214 #else
215 #define	SQUEUE_DBG_SET(sqp, mp, proc, connp, tag)
216 #define	SQUEUE_DBG_CLEAR(sqp)
217 #endif
218 
219 void
220 squeue_init(void)
221 {
222 	squeue_cache = kmem_cache_create("squeue_cache",
223 	    sizeof (squeue_t), 64, NULL, NULL, NULL, NULL, NULL, 0);
224 
225 	squeue_drain_ns = squeue_drain_ms * SQUEUE_MSEC_TO_NSEC;
226 	squeue_workerwait_tick = MSEC_TO_TICK_ROUNDUP(squeue_workerwait_ms);
227 }
228 
229 /* ARGSUSED */
230 squeue_t *
231 squeue_create(clock_t wait, pri_t pri)
232 {
233 	squeue_t *sqp = kmem_cache_alloc(squeue_cache, KM_SLEEP);
234 
235 	bzero(sqp, sizeof (squeue_t));
236 	sqp->sq_bind = PBIND_NONE;
237 	sqp->sq_priority = pri;
238 	sqp->sq_wait = MSEC_TO_TICK(wait);
239 	sqp->sq_worker = thread_create(NULL, 0, squeue_worker,
240 	    sqp, 0, &p0, TS_RUN, pri);
241 
242 	sqp->sq_poll_thr = thread_create(NULL, 0, squeue_polling_thread,
243 	    sqp, 0, &p0, TS_RUN, pri);
244 
245 	sqp->sq_enter = squeue_enter;
246 	sqp->sq_drain = squeue_drain;
247 
248 	return (sqp);
249 }
250 
251 /*
252  * Bind squeue worker thread to the specified CPU, given by CPU id.
253  * If the CPU id  value is -1, bind the worker thread to the value
254  * specified in sq_bind field. If a thread is already bound to a
255  * different CPU, unbind it from the old CPU and bind to the new one.
256  */
257 
258 void
259 squeue_bind(squeue_t *sqp, processorid_t bind)
260 {
261 	mutex_enter(&sqp->sq_lock);
262 	ASSERT(sqp->sq_bind != PBIND_NONE || bind != PBIND_NONE);
263 	ASSERT(MUTEX_HELD(&cpu_lock));
264 
265 	if (sqp->sq_state & SQS_BOUND) {
266 		if (sqp->sq_bind == bind) {
267 			mutex_exit(&sqp->sq_lock);
268 			return;
269 		}
270 		thread_affinity_clear(sqp->sq_worker);
271 	} else {
272 		sqp->sq_state |= SQS_BOUND;
273 	}
274 
275 	if (bind != PBIND_NONE)
276 		sqp->sq_bind = bind;
277 
278 	thread_affinity_set(sqp->sq_worker, sqp->sq_bind);
279 	mutex_exit(&sqp->sq_lock);
280 }
281 
282 void
283 squeue_unbind(squeue_t *sqp)
284 {
285 	mutex_enter(&sqp->sq_lock);
286 	if (!(sqp->sq_state & SQS_BOUND)) {
287 		mutex_exit(&sqp->sq_lock);
288 		return;
289 	}
290 
291 	sqp->sq_state &= ~SQS_BOUND;
292 	thread_affinity_clear(sqp->sq_worker);
293 	mutex_exit(&sqp->sq_lock);
294 }
295 
296 void
297 squeue_worker_wakeup(squeue_t *sqp)
298 {
299 	timeout_id_t tid = (sqp)->sq_tid;
300 
301 	ASSERT(MUTEX_HELD(&(sqp)->sq_lock));
302 
303 	if (sqp->sq_wait == 0) {
304 		ASSERT(tid == 0);
305 		ASSERT(!(sqp->sq_state & SQS_TMO_PROG));
306 		sqp->sq_awaken = lbolt;
307 		cv_signal(&sqp->sq_worker_cv);
308 		mutex_exit(&sqp->sq_lock);
309 		return;
310 	}
311 
312 	/*
313 	 * Queue isn't being processed, so take
314 	 * any post enqueue actions needed before leaving.
315 	 */
316 	if (tid != 0) {
317 		/*
318 		 * Waiting for an enter() to process mblk(s).
319 		 */
320 		clock_t	waited = lbolt - sqp->sq_awaken;
321 
322 		if (TICK_TO_MSEC(waited) >= sqp->sq_wait) {
323 			/*
324 			 * Times up and have a worker thread
325 			 * waiting for work, so schedule it.
326 			 */
327 			sqp->sq_tid = 0;
328 			sqp->sq_awaken = lbolt;
329 			cv_signal(&sqp->sq_worker_cv);
330 			mutex_exit(&sqp->sq_lock);
331 			(void) untimeout(tid);
332 			return;
333 		}
334 		mutex_exit(&sqp->sq_lock);
335 		return;
336 	} else if (sqp->sq_state & SQS_TMO_PROG) {
337 		mutex_exit(&sqp->sq_lock);
338 		return;
339 	} else {
340 		clock_t	wait = sqp->sq_wait;
341 		/*
342 		 * Wait up to sqp->sq_wait ms for an
343 		 * enter() to process this queue. We
344 		 * don't want to contend on timeout locks
345 		 * with sq_lock held for performance reasons,
346 		 * so drop the sq_lock before calling timeout
347 		 * but we need to check if timeout is required
348 		 * after re acquiring the sq_lock. Once
349 		 * the sq_lock is dropped, someone else could
350 		 * have processed the packet or the timeout could
351 		 * have already fired.
352 		 */
353 		sqp->sq_state |= SQS_TMO_PROG;
354 		mutex_exit(&sqp->sq_lock);
355 		tid = timeout(squeue_fire, sqp, wait);
356 		mutex_enter(&sqp->sq_lock);
357 		/* Check again if we still need the timeout */
358 		if (((sqp->sq_state & (SQS_PROC|SQS_TMO_PROG)) ==
359 		    SQS_TMO_PROG) && (sqp->sq_tid == 0) &&
360 		    (sqp->sq_first != NULL)) {
361 				sqp->sq_state &= ~SQS_TMO_PROG;
362 				sqp->sq_tid = tid;
363 				mutex_exit(&sqp->sq_lock);
364 				return;
365 		} else {
366 			if (sqp->sq_state & SQS_TMO_PROG) {
367 				sqp->sq_state &= ~SQS_TMO_PROG;
368 				mutex_exit(&sqp->sq_lock);
369 				(void) untimeout(tid);
370 			} else {
371 				/*
372 				 * The timer fired before we could
373 				 * reacquire the sq_lock. squeue_fire
374 				 * removes the SQS_TMO_PROG flag
375 				 * and we don't need to	do anything
376 				 * else.
377 				 */
378 				mutex_exit(&sqp->sq_lock);
379 			}
380 		}
381 	}
382 
383 	ASSERT(MUTEX_NOT_HELD(&sqp->sq_lock));
384 }
385 
386 /*
387  * squeue_enter() - enter squeue sqp with mblk mp (which can be
388  * a chain), while tail points to the end and cnt in number of
389  * mblks in the chain.
390  *
391  * For a chain of single packet (i.e. mp == tail), go through the
392  * fast path if no one is processing the squeue and nothing is queued.
393  *
394  * The proc and arg for each mblk is already stored in the mblk in
395  * appropriate places.
396  *
397  * The process_flag specifies if we are allowed to process the mblk
398  * and drain in the entering thread context. If process_flag is
399  * SQ_FILL, then we just queue the mblk and return (after signaling
400  * the worker thread if no one else is processing the squeue).
401  */
402 /* ARGSUSED */
403 void
404 squeue_enter(squeue_t *sqp, mblk_t *mp, mblk_t *tail, uint32_t cnt,
405     int process_flag, uint8_t tag)
406 {
407 	conn_t		*connp;
408 	sqproc_t	proc;
409 	hrtime_t	now;
410 
411 	ASSERT(sqp != NULL);
412 	ASSERT(mp != NULL);
413 	ASSERT(tail != NULL);
414 	ASSERT(cnt > 0);
415 	ASSERT(MUTEX_NOT_HELD(&sqp->sq_lock));
416 
417 	mutex_enter(&sqp->sq_lock);
418 
419 	/*
420 	 * Try to process the packet if SQ_FILL flag is not set and
421 	 * we are allowed to process the squeue. The SQ_NODRAIN is
422 	 * ignored if the packet chain consists of more than 1 packet.
423 	 */
424 	if (!(sqp->sq_state & SQS_PROC) && ((process_flag == SQ_PROCESS) ||
425 	    (process_flag == SQ_NODRAIN && sqp->sq_first == NULL))) {
426 		/*
427 		 * See if anything is already queued. If we are the
428 		 * first packet, do inline processing else queue the
429 		 * packet and do the drain.
430 		 */
431 		if (sqp->sq_first == NULL && cnt == 1) {
432 			/*
433 			 * Fast-path, ok to process and nothing queued.
434 			 */
435 			sqp->sq_state |= (SQS_PROC|SQS_FAST);
436 			sqp->sq_run = curthread;
437 			mutex_exit(&sqp->sq_lock);
438 
439 			/*
440 			 * We are the chain of 1 packet so
441 			 * go through this fast path.
442 			 */
443 			ASSERT(mp->b_prev != NULL);
444 			ASSERT(mp->b_queue != NULL);
445 			connp = (conn_t *)mp->b_prev;
446 			mp->b_prev = NULL;
447 			proc = (sqproc_t)mp->b_queue;
448 			mp->b_queue = NULL;
449 			ASSERT(proc != NULL && connp != NULL);
450 			ASSERT(mp->b_next == NULL);
451 
452 			/*
453 			 * Handle squeue switching. More details in the
454 			 * block comment at the top of the file
455 			 */
456 			if (connp->conn_sqp == sqp) {
457 				SQUEUE_DBG_SET(sqp, mp, proc, connp,
458 				    tag);
459 				connp->conn_on_sqp = B_TRUE;
460 				DTRACE_PROBE3(squeue__proc__start, squeue_t *,
461 				    sqp, mblk_t *, mp, conn_t *, connp);
462 				(*proc)(connp, mp, sqp);
463 				DTRACE_PROBE2(squeue__proc__end, squeue_t *,
464 				    sqp, conn_t *, connp);
465 				connp->conn_on_sqp = B_FALSE;
466 				SQUEUE_DBG_CLEAR(sqp);
467 				CONN_DEC_REF(connp);
468 			} else {
469 				SQUEUE_ENTER_ONE(connp->conn_sqp, mp, proc,
470 				    connp, SQ_FILL, SQTAG_SQUEUE_CHANGE);
471 			}
472 			ASSERT(MUTEX_NOT_HELD(&sqp->sq_lock));
473 			mutex_enter(&sqp->sq_lock);
474 			sqp->sq_state &= ~(SQS_PROC|SQS_FAST);
475 			sqp->sq_run = NULL;
476 			if (sqp->sq_first == NULL ||
477 			    process_flag == SQ_NODRAIN) {
478 				if (sqp->sq_first != NULL) {
479 					squeue_worker_wakeup(sqp);
480 					return;
481 				}
482 				/*
483 				 * We processed inline our packet and nothing
484 				 * new has arrived. We are done. In case any
485 				 * control actions are pending, wake up the
486 				 * worker.
487 				 */
488 				if (sqp->sq_state & SQS_WORKER_THR_CONTROL)
489 					cv_signal(&sqp->sq_worker_cv);
490 				mutex_exit(&sqp->sq_lock);
491 				return;
492 			}
493 		} else {
494 			ENQUEUE_CHAIN(sqp, mp, tail, cnt);
495 #ifdef DEBUG
496 			mp->b_tag = tag;
497 #endif
498 		}
499 		/*
500 		 * We are here because either we couldn't do inline
501 		 * processing (because something was already queued),
502 		 * or we had a chain of more than one packet,
503 		 * or something else arrived after we were done with
504 		 * inline processing.
505 		 */
506 		ASSERT(MUTEX_HELD(&sqp->sq_lock));
507 		ASSERT(sqp->sq_first != NULL);
508 		now = gethrtime();
509 		sqp->sq_drain(sqp, SQS_ENTER, now + squeue_drain_ns);
510 
511 		/*
512 		 * If we didn't do a complete drain, the worker
513 		 * thread was already signalled by squeue_drain.
514 		 * In case any control actions are pending, wake
515 		 * up the worker.
516 		 */
517 		sqp->sq_run = NULL;
518 		if (sqp->sq_state & SQS_WORKER_THR_CONTROL)
519 			cv_signal(&sqp->sq_worker_cv);
520 		mutex_exit(&sqp->sq_lock);
521 		return;
522 	} else {
523 		/*
524 		 * We let a thread processing a squeue reenter only
525 		 * once. This helps the case of incoming connection
526 		 * where a SYN-ACK-ACK that triggers the conn_ind
527 		 * doesn't have to queue the packet if listener and
528 		 * eager are on the same squeue. Also helps the
529 		 * loopback connection where the two ends are bound
530 		 * to the same squeue (which is typical on single
531 		 * CPU machines).
532 		 *
533 		 * We let the thread reenter only once for the fear
534 		 * of stack getting blown with multiple traversal.
535 		 */
536 		connp = (conn_t *)mp->b_prev;
537 		if (!(sqp->sq_state & SQS_REENTER) &&
538 		    (process_flag != SQ_FILL) && (sqp->sq_first == NULL) &&
539 		    (sqp->sq_run == curthread) && (cnt == 1) &&
540 		    (connp->conn_on_sqp == B_FALSE)) {
541 			sqp->sq_state |= SQS_REENTER;
542 			mutex_exit(&sqp->sq_lock);
543 
544 			ASSERT(mp->b_prev != NULL);
545 			ASSERT(mp->b_queue != NULL);
546 
547 			mp->b_prev = NULL;
548 			proc = (sqproc_t)mp->b_queue;
549 			mp->b_queue = NULL;
550 
551 			/*
552 			 * Handle squeue switching. More details in the
553 			 * block comment at the top of the file
554 			 */
555 			if (connp->conn_sqp == sqp) {
556 				connp->conn_on_sqp = B_TRUE;
557 				DTRACE_PROBE3(squeue__proc__start, squeue_t *,
558 				    sqp, mblk_t *, mp, conn_t *, connp);
559 				(*proc)(connp, mp, sqp);
560 				DTRACE_PROBE2(squeue__proc__end, squeue_t *,
561 				    sqp, conn_t *, connp);
562 				connp->conn_on_sqp = B_FALSE;
563 				CONN_DEC_REF(connp);
564 			} else {
565 				SQUEUE_ENTER_ONE(connp->conn_sqp, mp, proc,
566 				    connp, SQ_FILL, SQTAG_SQUEUE_CHANGE);
567 			}
568 
569 			mutex_enter(&sqp->sq_lock);
570 			sqp->sq_state &= ~SQS_REENTER;
571 			mutex_exit(&sqp->sq_lock);
572 			return;
573 		}
574 
575 		/*
576 		 * Queue is already being processed or there is already
577 		 * one or more paquets on the queue. Enqueue the
578 		 * packet and wakeup the squeue worker thread if the
579 		 * squeue is not being processed.
580 		 */
581 #ifdef DEBUG
582 		mp->b_tag = tag;
583 #endif
584 
585 		ENQUEUE_CHAIN(sqp, mp, tail, cnt);
586 		if (!(sqp->sq_state & SQS_PROC)) {
587 			squeue_worker_wakeup(sqp);
588 			return;
589 		}
590 		/*
591 		 * In case any control actions are pending, wake
592 		 * up the worker.
593 		 */
594 		if (sqp->sq_state & SQS_WORKER_THR_CONTROL)
595 			cv_signal(&sqp->sq_worker_cv);
596 		mutex_exit(&sqp->sq_lock);
597 		return;
598 	}
599 }
600 
601 /*
602  * PRIVATE FUNCTIONS
603  */
604 
605 static void
606 squeue_fire(void *arg)
607 {
608 	squeue_t	*sqp = arg;
609 	uint_t		state;
610 
611 	mutex_enter(&sqp->sq_lock);
612 
613 	state = sqp->sq_state;
614 	if (sqp->sq_tid == 0 && !(state & SQS_TMO_PROG)) {
615 		mutex_exit(&sqp->sq_lock);
616 		return;
617 	}
618 
619 	sqp->sq_tid = 0;
620 	/*
621 	 * The timeout fired before we got a chance to set it.
622 	 * Process it anyway but remove the SQS_TMO_PROG so that
623 	 * the guy trying to set the timeout knows that it has
624 	 * already been processed.
625 	 */
626 	if (state & SQS_TMO_PROG)
627 		sqp->sq_state &= ~SQS_TMO_PROG;
628 
629 	if (!(state & SQS_PROC)) {
630 		sqp->sq_awaken = lbolt;
631 		cv_signal(&sqp->sq_worker_cv);
632 	}
633 	mutex_exit(&sqp->sq_lock);
634 }
635 
636 static void
637 squeue_drain(squeue_t *sqp, uint_t proc_type, hrtime_t expire)
638 {
639 	mblk_t		*mp;
640 	mblk_t 		*head;
641 	sqproc_t 	proc;
642 	conn_t		*connp;
643 	timeout_id_t 	tid;
644 	ill_rx_ring_t	*sq_rx_ring = sqp->sq_rx_ring;
645 	hrtime_t 	now;
646 	boolean_t	did_wakeup = B_FALSE;
647 	boolean_t	sq_poll_capable;
648 
649 	sq_poll_capable = (sqp->sq_state & SQS_POLL_CAPAB) != 0;
650 again:
651 	ASSERT(mutex_owned(&sqp->sq_lock));
652 	ASSERT(!(sqp->sq_state & (SQS_POLL_THR_QUIESCED |
653 	    SQS_POLL_QUIESCE_DONE)));
654 
655 	head = sqp->sq_first;
656 	sqp->sq_first = NULL;
657 	sqp->sq_last = NULL;
658 	sqp->sq_count = 0;
659 
660 	if ((tid = sqp->sq_tid) != 0)
661 		sqp->sq_tid = 0;
662 
663 	sqp->sq_state |= SQS_PROC | proc_type;
664 
665 
666 	/*
667 	 * We have backlog built up. Switch to polling mode if the
668 	 * device underneath allows it. Need to do it so that
669 	 * more packets don't come in and disturb us (by contending
670 	 * for sq_lock or higher priority thread preempting us).
671 	 *
672 	 * The worker thread is allowed to do active polling while we
673 	 * just disable the interrupts for drain by non worker (kernel
674 	 * or userland) threads so they can peacefully process the
675 	 * packets during time allocated to them.
676 	 */
677 	SQS_POLLING_ON(sqp, sq_poll_capable, sq_rx_ring);
678 	mutex_exit(&sqp->sq_lock);
679 
680 	if (tid != 0)
681 		(void) untimeout(tid);
682 
683 	while ((mp = head) != NULL) {
684 
685 		head = mp->b_next;
686 		mp->b_next = NULL;
687 
688 		proc = (sqproc_t)mp->b_queue;
689 		mp->b_queue = NULL;
690 		connp = (conn_t *)mp->b_prev;
691 		mp->b_prev = NULL;
692 
693 		/*
694 		 * Handle squeue switching. More details in the
695 		 * block comment at the top of the file
696 		 */
697 		if (connp->conn_sqp == sqp) {
698 			SQUEUE_DBG_SET(sqp, mp, proc, connp,
699 			    mp->b_tag);
700 			connp->conn_on_sqp = B_TRUE;
701 			DTRACE_PROBE3(squeue__proc__start, squeue_t *,
702 			    sqp, mblk_t *, mp, conn_t *, connp);
703 			(*proc)(connp, mp, sqp);
704 			DTRACE_PROBE2(squeue__proc__end, squeue_t *,
705 			    sqp, conn_t *, connp);
706 			connp->conn_on_sqp = B_FALSE;
707 			CONN_DEC_REF(connp);
708 		} else {
709 			SQUEUE_ENTER_ONE(connp->conn_sqp, mp, proc, connp,
710 			    SQ_FILL, SQTAG_SQUEUE_CHANGE);
711 		}
712 	}
713 
714 	SQUEUE_DBG_CLEAR(sqp);
715 
716 	mutex_enter(&sqp->sq_lock);
717 
718 	/*
719 	 * Check if there is still work to do (either more arrived or timer
720 	 * expired). If we are the worker thread and we are polling capable,
721 	 * continue doing the work since no one else is around to do the
722 	 * work anyway (but signal the poll thread to retrieve some packets
723 	 * in the meanwhile). If we are not the worker thread, just
724 	 * signal the worker thread to take up the work if processing time
725 	 * has expired.
726 	 */
727 	if (sqp->sq_first != NULL) {
728 		/*
729 		 * Still more to process. If time quanta not expired, we
730 		 * should let the drain go on. The worker thread is allowed
731 		 * to drain as long as there is anything left.
732 		 */
733 		now = gethrtime();
734 		if ((now < expire) || (proc_type == SQS_WORKER)) {
735 			/*
736 			 * If time not expired or we are worker thread and
737 			 * this squeue is polling capable, continue to do
738 			 * the drain.
739 			 *
740 			 * We turn off interrupts for all userland threads
741 			 * doing drain but we do active polling only for
742 			 * worker thread.
743 			 */
744 			if (proc_type == SQS_WORKER)
745 				SQS_POLL_RING(sqp, sq_poll_capable);
746 			goto again;
747 		} else {
748 			did_wakeup = B_TRUE;
749 			sqp->sq_awaken = lbolt;
750 			cv_signal(&sqp->sq_worker_cv);
751 		}
752 	}
753 
754 	/*
755 	 * If the poll thread is already running, just return. The
756 	 * poll thread continues to hold the proc and will finish
757 	 * processing.
758 	 */
759 	if (sqp->sq_state & SQS_GET_PKTS) {
760 		ASSERT(!(sqp->sq_state & (SQS_POLL_THR_QUIESCED |
761 		    SQS_POLL_QUIESCE_DONE)));
762 		sqp->sq_state &= ~proc_type;
763 		return;
764 	}
765 
766 	/*
767 	 *
768 	 * If we are the worker thread and no work is left, send the poll
769 	 * thread down once more to see if something arrived. Otherwise,
770 	 * turn the interrupts back on and we are done.
771 	 */
772 	if ((proc_type == SQS_WORKER) &&
773 	    (sqp->sq_state & SQS_POLL_CAPAB)) {
774 		/*
775 		 * Do one last check to see if anything arrived
776 		 * in the NIC. We leave the SQS_PROC set to ensure
777 		 * that poll thread keeps the PROC and can decide
778 		 * if it needs to turn polling off or continue
779 		 * processing.
780 		 *
781 		 * If we drop the SQS_PROC here and poll thread comes
782 		 * up empty handed, it can not safely turn polling off
783 		 * since someone else could have acquired the PROC
784 		 * and started draining. The previously running poll
785 		 * thread and the current thread doing drain would end
786 		 * up in a race for turning polling on/off and more
787 		 * complex code would be required to deal with it.
788 		 *
789 		 * Its lot simpler for drain to hand the SQS_PROC to
790 		 * poll thread (if running) and let poll thread finish
791 		 * without worrying about racing with any other thread.
792 		 */
793 		ASSERT(!(sqp->sq_state & (SQS_POLL_THR_QUIESCED |
794 		    SQS_POLL_QUIESCE_DONE)));
795 		SQS_POLL_RING(sqp, sq_poll_capable);
796 		sqp->sq_state &= ~proc_type;
797 	} else {
798 		/*
799 		 * The squeue is either not capable of polling or
800 		 * poll thread already finished processing and didn't
801 		 * find anything. Since there is nothing queued and
802 		 * we already turn polling on (for all threads doing
803 		 * drain), we should turn polling off and relinquish
804 		 * the PROC.
805 		 */
806 		ASSERT(!(sqp->sq_state & (SQS_POLL_THR_QUIESCED |
807 		    SQS_POLL_QUIESCE_DONE)));
808 		SQS_POLLING_OFF(sqp, sq_poll_capable, sq_rx_ring);
809 		sqp->sq_state &= ~(SQS_PROC | proc_type);
810 		if (!did_wakeup && sqp->sq_first != NULL) {
811 			squeue_worker_wakeup(sqp);
812 			mutex_enter(&sqp->sq_lock);
813 		}
814 		/*
815 		 * If we are not the worker and there is a pending quiesce
816 		 * event, wake up the worker
817 		 */
818 		if ((proc_type != SQS_WORKER) &&
819 		    (sqp->sq_state & SQS_WORKER_THR_CONTROL))
820 			cv_signal(&sqp->sq_worker_cv);
821 	}
822 }
823 
824 /*
825  * Quiesce, Restart, or Cleanup of the squeue poll thread.
826  *
827  * Quiesce and Restart: After an squeue poll thread has been quiesced, it does
828  * not attempt to poll the underlying soft ring any more. The quiesce is
829  * triggered by the mac layer when it wants to quiesce a soft ring. Typically
830  * control operations such as changing the fanout of a NIC or VNIC (dladm
831  * setlinkprop) need to quiesce data flow before changing the wiring.
832  * The operation is done by the mac layer, but it calls back into IP to
833  * quiesce the soft ring. After completing the operation (say increase or
834  * decrease of the fanout) the mac layer then calls back into IP to restart
835  * the quiesced soft ring.
836  *
837  * Cleanup: This is triggered when the squeue binding to a soft ring is
838  * removed permanently. Typically interface plumb and unplumb would trigger
839  * this. It can also be triggered from the mac layer when a soft ring is
840  * being deleted say as the result of a fanout reduction. Since squeues are
841  * never deleted, the cleanup marks the squeue as fit for recycling and
842  * moves it to the zeroth squeue set.
843  */
844 static void
845 squeue_poll_thr_control(squeue_t *sqp)
846 {
847 	if (sqp->sq_state & SQS_POLL_THR_RESTART) {
848 		/* Restart implies a previous quiesce */
849 		ASSERT(sqp->sq_state & SQS_POLL_THR_QUIESCED);
850 		sqp->sq_state &= ~(SQS_POLL_THR_QUIESCED |
851 		    SQS_POLL_THR_RESTART);
852 		sqp->sq_state |= SQS_POLL_CAPAB;
853 		cv_signal(&sqp->sq_worker_cv);
854 		return;
855 	}
856 
857 	if (sqp->sq_state & SQS_POLL_THR_QUIESCE) {
858 		sqp->sq_state |= SQS_POLL_THR_QUIESCED;
859 		sqp->sq_state &= ~SQS_POLL_THR_QUIESCE;
860 		cv_signal(&sqp->sq_worker_cv);
861 		return;
862 	}
863 }
864 
865 /*
866  * POLLING Notes
867  *
868  * With polling mode, we want to do as much processing as we possibly can
869  * in worker thread context. The sweet spot is worker thread keeps doing
870  * work all the time in polling mode and writers etc. keep dumping packets
871  * to worker thread. Occassionally, we send the poll thread (running at
872  * lower priority to NIC to get the chain of packets to feed to worker).
873  * Sending the poll thread down to NIC is dependant on 3 criterions
874  *
875  * 1) Its always driven from squeue_drain and only if worker thread is
876  *	doing the drain.
877  * 2) We clear the backlog once and more packets arrived in between.
878  *	Before starting drain again, send the poll thread down if
879  *	the drain is being done by worker thread.
880  * 3) Before exiting the squeue_drain, if the poll thread is not already
881  *	working and we are the worker thread, try to poll one more time.
882  *
883  * For latency sake, we do allow any thread calling squeue_enter
884  * to process its packet provided:
885  *
886  * 1) Nothing is queued
887  * 2) If more packets arrived in between, the non worker thread are allowed
888  *	to do the drain till their time quanta expired provided SQS_GET_PKTS
889  *	wasn't set in between.
890  *
891  * Avoiding deadlocks with interrupts
892  * ==================================
893  *
894  * One of the big problem is that we can't send poll_thr down while holding
895  * the sq_lock since the thread can block. So we drop the sq_lock before
896  * calling sq_get_pkts(). We keep holding the SQS_PROC as long as the
897  * poll thread is running so that no other thread can acquire the
898  * perimeter in between. If the squeue_drain gets done (no more work
899  * left), it leaves the SQS_PROC set if poll thread is running.
900  */
901 
902 /*
903  * This is the squeue poll thread. In poll mode, it polls the underlying
904  * TCP softring and feeds packets into the squeue. The worker thread then
905  * drains the squeue. The poll thread also responds to control signals for
906  * quiesceing, restarting, or cleanup of an squeue. These are driven by
907  * control operations like plumb/unplumb or as a result of dynamic Rx ring
908  * related operations that are driven from the mac layer.
909  */
910 static void
911 squeue_polling_thread(squeue_t *sqp)
912 {
913 	kmutex_t *lock = &sqp->sq_lock;
914 	kcondvar_t *async = &sqp->sq_poll_cv;
915 	ip_mac_rx_t sq_get_pkts;
916 	ip_accept_t ip_accept;
917 	ill_rx_ring_t *sq_rx_ring;
918 	ill_t *sq_ill;
919 	mblk_t *head, *tail, *mp;
920 	uint_t cnt;
921 	void *sq_mac_handle;
922 	callb_cpr_t cprinfo;
923 	size_t bytes_to_pickup;
924 	uint32_t ctl_state;
925 
926 	CALLB_CPR_INIT(&cprinfo, lock, callb_generic_cpr, "sq_poll");
927 	mutex_enter(lock);
928 
929 	for (;;) {
930 		CALLB_CPR_SAFE_BEGIN(&cprinfo);
931 		cv_wait(async, lock);
932 		CALLB_CPR_SAFE_END(&cprinfo, lock);
933 
934 		ctl_state = sqp->sq_state & (SQS_POLL_THR_CONTROL |
935 		    SQS_POLL_THR_QUIESCED);
936 		if (ctl_state != 0) {
937 			/*
938 			 * If the squeue is quiesced, then wait for a control
939 			 * request. A quiesced squeue must not poll the
940 			 * underlying soft ring.
941 			 */
942 			if (ctl_state == SQS_POLL_THR_QUIESCED)
943 				continue;
944 			/*
945 			 * Act on control requests to quiesce, cleanup or
946 			 * restart an squeue
947 			 */
948 			squeue_poll_thr_control(sqp);
949 			continue;
950 		}
951 
952 		if (!(sqp->sq_state & SQS_POLL_CAPAB))
953 			continue;
954 
955 		ASSERT((sqp->sq_state &
956 		    (SQS_PROC|SQS_POLLING|SQS_GET_PKTS)) ==
957 		    (SQS_PROC|SQS_POLLING|SQS_GET_PKTS));
958 
959 poll_again:
960 		sq_rx_ring = sqp->sq_rx_ring;
961 		sq_get_pkts = sq_rx_ring->rr_rx;
962 		sq_mac_handle = sq_rx_ring->rr_rx_handle;
963 		ip_accept = sq_rx_ring->rr_ip_accept;
964 		sq_ill = sq_rx_ring->rr_ill;
965 		bytes_to_pickup = MAX_BYTES_TO_PICKUP;
966 		mutex_exit(lock);
967 		head = sq_get_pkts(sq_mac_handle, bytes_to_pickup);
968 		mp = NULL;
969 		if (head != NULL) {
970 			/*
971 			 * We got the packet chain from the mac layer. It
972 			 * would be nice to be able to process it inline
973 			 * for better performance but we need to give
974 			 * IP a chance to look at this chain to ensure
975 			 * that packets are really meant for this squeue
976 			 * and do the IP processing.
977 			 */
978 			mp = ip_accept(sq_ill, sq_rx_ring, sqp, head,
979 			    &tail, &cnt);
980 		}
981 		mutex_enter(lock);
982 		if (mp != NULL)
983 			ENQUEUE_CHAIN(sqp, mp, tail, cnt);
984 
985 		ASSERT((sqp->sq_state &
986 		    (SQS_PROC|SQS_POLLING|SQS_GET_PKTS)) ==
987 		    (SQS_PROC|SQS_POLLING|SQS_GET_PKTS));
988 
989 		if (sqp->sq_first != NULL && !(sqp->sq_state & SQS_WORKER)) {
990 			/*
991 			 * We have packets to process and worker thread
992 			 * is not running.  Check to see if poll thread is
993 			 * allowed to process. Let it do processing only if it
994 			 * picked up some packets from the NIC otherwise
995 			 * wakeup the worker thread.
996 			 */
997 			if (mp != NULL) {
998 				hrtime_t  now;
999 
1000 				now = gethrtime();
1001 				sqp->sq_run = curthread;
1002 				sqp->sq_drain(sqp, SQS_POLL_PROC, now +
1003 				    squeue_drain_ns);
1004 				sqp->sq_run = NULL;
1005 
1006 				if (sqp->sq_first == NULL)
1007 					goto poll_again;
1008 
1009 				/*
1010 				 * Couldn't do the entire drain because the
1011 				 * time limit expired, let the
1012 				 * worker thread take over.
1013 				 */
1014 			}
1015 
1016 			sqp->sq_awaken = lbolt;
1017 			/*
1018 			 * Put the SQS_PROC_HELD on so the worker
1019 			 * thread can distinguish where its called from. We
1020 			 * can remove the SQS_PROC flag here and turn off the
1021 			 * polling so that it wouldn't matter who gets the
1022 			 * processing but we get better performance this way
1023 			 * and save the cost of turn polling off and possibly
1024 			 * on again as soon as we start draining again.
1025 			 *
1026 			 * We can't remove the SQS_PROC flag without turning
1027 			 * polling off until we can guarantee that control
1028 			 * will return to squeue_drain immediately.
1029 			 */
1030 			sqp->sq_state |= SQS_PROC_HELD;
1031 			sqp->sq_state &= ~SQS_GET_PKTS;
1032 			cv_signal(&sqp->sq_worker_cv);
1033 		} else if (sqp->sq_first == NULL &&
1034 		    !(sqp->sq_state & SQS_WORKER)) {
1035 			/*
1036 			 * Nothing queued and worker thread not running.
1037 			 * Since we hold the proc, no other thread is
1038 			 * processing the squeue. This means that there
1039 			 * is no work to be done and nothing is queued
1040 			 * in squeue or in NIC. Turn polling off and go
1041 			 * back to interrupt mode.
1042 			 */
1043 			sqp->sq_state &= ~(SQS_PROC|SQS_GET_PKTS);
1044 			/* LINTED: constant in conditional context */
1045 			SQS_POLLING_OFF(sqp, B_TRUE, sq_rx_ring);
1046 		} else {
1047 			/*
1048 			 * Worker thread is already running. We don't need
1049 			 * to do anything. Indicate that poll thread is done.
1050 			 */
1051 			sqp->sq_state &= ~SQS_GET_PKTS;
1052 		}
1053 		if (sqp->sq_state & SQS_POLL_THR_CONTROL) {
1054 			/*
1055 			 * Act on control requests to quiesce, cleanup or
1056 			 * restart an squeue
1057 			 */
1058 			squeue_poll_thr_control(sqp);
1059 		}
1060 	}
1061 }
1062 
1063 /*
1064  * The squeue worker thread acts on any control requests to quiesce, cleanup
1065  * or restart an ill_rx_ring_t by calling this function. The worker thread
1066  * synchronizes with the squeue poll thread to complete the request and finally
1067  * wakes up the requestor when the request is completed.
1068  */
1069 static void
1070 squeue_worker_thr_control(squeue_t *sqp)
1071 {
1072 	ill_t	*ill;
1073 	ill_rx_ring_t	*rx_ring;
1074 
1075 	ASSERT(MUTEX_HELD(&sqp->sq_lock));
1076 
1077 	if (sqp->sq_state & SQS_POLL_RESTART) {
1078 		/* Restart implies a previous quiesce. */
1079 		ASSERT((sqp->sq_state & (SQS_PROC_HELD |
1080 		    SQS_POLL_QUIESCE_DONE | SQS_PROC | SQS_WORKER)) ==
1081 		    (SQS_POLL_QUIESCE_DONE | SQS_PROC | SQS_WORKER));
1082 		/*
1083 		 * Request the squeue poll thread to restart and wait till
1084 		 * it actually restarts.
1085 		 */
1086 		sqp->sq_state &= ~SQS_POLL_QUIESCE_DONE;
1087 		sqp->sq_state |= SQS_POLL_THR_RESTART;
1088 		cv_signal(&sqp->sq_poll_cv);
1089 		while (sqp->sq_state & SQS_POLL_THR_QUIESCED)
1090 			cv_wait(&sqp->sq_worker_cv, &sqp->sq_lock);
1091 		sqp->sq_state &= ~(SQS_POLL_RESTART | SQS_PROC |
1092 		    SQS_WORKER);
1093 		/*
1094 		 * Signal any waiter that is waiting for the restart
1095 		 * to complete
1096 		 */
1097 		sqp->sq_state |= SQS_POLL_RESTART_DONE;
1098 		cv_signal(&sqp->sq_ctrlop_done_cv);
1099 		return;
1100 	}
1101 
1102 	if (sqp->sq_state & SQS_PROC_HELD) {
1103 		/* The squeue poll thread handed control to us */
1104 		ASSERT(sqp->sq_state & SQS_PROC);
1105 	}
1106 
1107 	/*
1108 	 * Prevent any other thread from processing the squeue
1109 	 * until we finish the control actions by setting SQS_PROC.
1110 	 * But allow ourself to reenter by setting SQS_WORKER
1111 	 */
1112 	sqp->sq_state |= (SQS_PROC | SQS_WORKER);
1113 
1114 	/* Signal the squeue poll thread and wait for it to quiesce itself */
1115 	if (!(sqp->sq_state & SQS_POLL_THR_QUIESCED)) {
1116 		sqp->sq_state |= SQS_POLL_THR_QUIESCE;
1117 		cv_signal(&sqp->sq_poll_cv);
1118 		while (!(sqp->sq_state & SQS_POLL_THR_QUIESCED))
1119 			cv_wait(&sqp->sq_worker_cv, &sqp->sq_lock);
1120 	}
1121 
1122 	rx_ring = sqp->sq_rx_ring;
1123 	ill = rx_ring->rr_ill;
1124 	/*
1125 	 * The lock hierarchy is as follows.
1126 	 * cpu_lock -> ill_lock -> sqset_lock -> sq_lock
1127 	 */
1128 	mutex_exit(&sqp->sq_lock);
1129 	mutex_enter(&ill->ill_lock);
1130 	mutex_enter(&sqp->sq_lock);
1131 
1132 	SQS_POLLING_OFF(sqp, (sqp->sq_state & SQS_POLL_CAPAB) != 0,
1133 	    sqp->sq_rx_ring);
1134 	sqp->sq_state &= ~(SQS_POLL_CAPAB | SQS_GET_PKTS | SQS_PROC_HELD);
1135 	if (sqp->sq_state & SQS_POLL_CLEANUP) {
1136 		/*
1137 		 * Disassociate this squeue from its ill_rx_ring_t.
1138 		 * The rr_sqp, sq_rx_ring fields are protected by the
1139 		 * corresponding squeue, ill_lock* and sq_lock. Holding any
1140 		 * of them will ensure that the ring to squeue mapping does
1141 		 * not change.
1142 		 */
1143 		ASSERT(!(sqp->sq_state & SQS_DEFAULT));
1144 
1145 		sqp->sq_rx_ring = NULL;
1146 		rx_ring->rr_sqp = NULL;
1147 
1148 		sqp->sq_state &= ~(SQS_POLL_CLEANUP | SQS_POLL_THR_QUIESCED |
1149 		    SQS_POLL_QUIESCE_DONE);
1150 		sqp->sq_ill = NULL;
1151 
1152 		rx_ring->rr_rx_handle = NULL;
1153 		rx_ring->rr_intr_handle = NULL;
1154 		rx_ring->rr_intr_enable = NULL;
1155 		rx_ring->rr_intr_disable = NULL;
1156 		sqp->sq_state |= SQS_POLL_CLEANUP_DONE;
1157 	} else {
1158 		sqp->sq_state &= ~SQS_POLL_QUIESCE;
1159 		sqp->sq_state |= SQS_POLL_QUIESCE_DONE;
1160 	}
1161 	/*
1162 	 * Signal any waiter that is waiting for the quiesce or cleanup
1163 	 * to complete and also wait for it to actually see and reset the
1164 	 * SQS_POLL_CLEANUP_DONE.
1165 	 */
1166 	cv_signal(&sqp->sq_ctrlop_done_cv);
1167 	mutex_exit(&ill->ill_lock);
1168 	if (sqp->sq_state & SQS_POLL_CLEANUP_DONE) {
1169 		cv_wait(&sqp->sq_worker_cv, &sqp->sq_lock);
1170 		sqp->sq_state &= ~(SQS_PROC | SQS_WORKER);
1171 	}
1172 }
1173 
1174 static void
1175 squeue_worker(squeue_t *sqp)
1176 {
1177 	kmutex_t *lock = &sqp->sq_lock;
1178 	kcondvar_t *async = &sqp->sq_worker_cv;
1179 	callb_cpr_t cprinfo;
1180 	hrtime_t now;
1181 
1182 	CALLB_CPR_INIT(&cprinfo, lock, callb_generic_cpr, "sq_worker");
1183 	mutex_enter(lock);
1184 
1185 	for (;;) {
1186 		for (;;) {
1187 			/*
1188 			 * If the poll thread has handed control to us
1189 			 * we need to break out of the wait.
1190 			 */
1191 			if (sqp->sq_state & SQS_PROC_HELD)
1192 				break;
1193 
1194 			/*
1195 			 * If the squeue is not being processed and we either
1196 			 * have messages to drain or some thread has signaled
1197 			 * some control activity we need to break
1198 			 */
1199 			if (!(sqp->sq_state & SQS_PROC) &&
1200 			    ((sqp->sq_state & SQS_WORKER_THR_CONTROL) ||
1201 			    (sqp->sq_first != NULL)))
1202 				break;
1203 
1204 			/*
1205 			 * If we have started some control action, then check
1206 			 * for the SQS_WORKER flag (since we don't
1207 			 * release the squeue) to make sure we own the squeue
1208 			 * and break out
1209 			 */
1210 			if ((sqp->sq_state & SQS_WORKER_THR_CONTROL) &&
1211 			    (sqp->sq_state & SQS_WORKER))
1212 				break;
1213 
1214 			CALLB_CPR_SAFE_BEGIN(&cprinfo);
1215 			cv_wait(async, lock);
1216 			CALLB_CPR_SAFE_END(&cprinfo, lock);
1217 		}
1218 		if (sqp->sq_state & SQS_WORKER_THR_CONTROL) {
1219 			squeue_worker_thr_control(sqp);
1220 			continue;
1221 		}
1222 		ASSERT(!(sqp->sq_state & (SQS_POLL_THR_QUIESCED |
1223 		    SQS_POLL_CLEANUP_DONE | SQS_POLL_QUIESCE_DONE |
1224 		    SQS_WORKER_THR_CONTROL | SQS_POLL_THR_CONTROL)));
1225 
1226 		if (sqp->sq_state & SQS_PROC_HELD)
1227 			sqp->sq_state &= ~SQS_PROC_HELD;
1228 
1229 		now = gethrtime();
1230 		sqp->sq_run = curthread;
1231 		sqp->sq_drain(sqp, SQS_WORKER, now +  squeue_drain_ns);
1232 		sqp->sq_run = NULL;
1233 	}
1234 }
1235 
1236 uintptr_t *
1237 squeue_getprivate(squeue_t *sqp, sqprivate_t p)
1238 {
1239 	ASSERT(p < SQPRIVATE_MAX);
1240 
1241 	return (&sqp->sq_private[p]);
1242 }
1243 
1244 /* ARGSUSED */
1245 void
1246 squeue_wakeup_conn(void *arg, mblk_t *mp, void *arg2)
1247 {
1248 	conn_t *connp = (conn_t *)arg;
1249 	squeue_t *sqp = connp->conn_sqp;
1250 
1251 	/*
1252 	 * Mark the squeue as paused before waking up the thread stuck
1253 	 * in squeue_synch_enter().
1254 	 */
1255 	mutex_enter(&sqp->sq_lock);
1256 	sqp->sq_state |= SQS_PAUSE;
1257 
1258 	/*
1259 	 * Notify the thread that it's OK to proceed; that is done by
1260 	 * clearing the MSGWAITSYNC flag. The synch thread will free the mblk.
1261 	 */
1262 	ASSERT(mp->b_flag & MSGWAITSYNC);
1263 	mp->b_flag &= ~MSGWAITSYNC;
1264 	cv_broadcast(&connp->conn_sq_cv);
1265 
1266 	/*
1267 	 * We are doing something on behalf of another thread, so we have to
1268 	 * pause and wait until it finishes.
1269 	 */
1270 	while (sqp->sq_state & SQS_PAUSE) {
1271 		cv_wait(&sqp->sq_synch_cv, &sqp->sq_lock);
1272 	}
1273 	mutex_exit(&sqp->sq_lock);
1274 }
1275 
1276 /* ARGSUSED */
1277 int
1278 squeue_synch_enter(squeue_t *sqp, void *arg, uint8_t tag)
1279 {
1280 	conn_t *connp = (conn_t *)arg;
1281 
1282 	mutex_enter(&sqp->sq_lock);
1283 	if (sqp->sq_first == NULL && !(sqp->sq_state & SQS_PROC)) {
1284 		/*
1285 		 * We are OK to proceed if the squeue is empty, and
1286 		 * no one owns the squeue.
1287 		 *
1288 		 * The caller won't own the squeue as this is called from the
1289 		 * application.
1290 		 */
1291 		ASSERT(sqp->sq_run == NULL);
1292 
1293 		sqp->sq_state |= SQS_PROC;
1294 		sqp->sq_run = curthread;
1295 		mutex_exit(&sqp->sq_lock);
1296 
1297 #if SQUEUE_DEBUG
1298 		sqp->sq_curmp = NULL;
1299 		sqp->sq_curproc = NULL;
1300 		sqp->sq_connp = connp;
1301 #endif
1302 		connp->conn_on_sqp = B_TRUE;
1303 		return (0);
1304 	} else {
1305 		mblk_t  *mp;
1306 
1307 		mp = allocb(0, BPRI_MED);
1308 		if (mp == NULL) {
1309 			mutex_exit(&sqp->sq_lock);
1310 			return (ENOMEM);
1311 		}
1312 
1313 		/*
1314 		 * We mark the mblk as awaiting synchronous squeue access
1315 		 * by setting the MSGWAITSYNC flag. Once squeue_wakeup_conn
1316 		 * fires, MSGWAITSYNC is cleared, at which point we know we
1317 		 * have exclusive access.
1318 		 */
1319 		mp->b_flag |= MSGWAITSYNC;
1320 
1321 		CONN_INC_REF(connp);
1322 		SET_SQUEUE(mp, squeue_wakeup_conn, connp);
1323 		ENQUEUE_CHAIN(sqp, mp, mp, 1);
1324 
1325 		ASSERT(sqp->sq_run != curthread);
1326 
1327 		/* Wait until the enqueued mblk get processed. */
1328 		while (mp->b_flag & MSGWAITSYNC)
1329 			cv_wait(&connp->conn_sq_cv, &sqp->sq_lock);
1330 		mutex_exit(&sqp->sq_lock);
1331 
1332 		freeb(mp);
1333 
1334 		return (0);
1335 	}
1336 }
1337 
1338 /* ARGSUSED */
1339 void
1340 squeue_synch_exit(squeue_t *sqp, void *arg)
1341 {
1342 	conn_t	*connp = (conn_t *)arg;
1343 
1344 	mutex_enter(&sqp->sq_lock);
1345 	if (sqp->sq_run == curthread) {
1346 		ASSERT(sqp->sq_state & SQS_PROC);
1347 
1348 		sqp->sq_state &= ~SQS_PROC;
1349 		sqp->sq_run = NULL;
1350 		connp->conn_on_sqp = B_FALSE;
1351 
1352 		if (sqp->sq_first == NULL) {
1353 			mutex_exit(&sqp->sq_lock);
1354 		} else {
1355 			/*
1356 			 * If this was a normal thread, then it would
1357 			 * (most likely) continue processing the pending
1358 			 * requests. Since the just completed operation
1359 			 * was executed synchronously, the thread should
1360 			 * not be delayed. To compensate, wake up the
1361 			 * worker thread right away when there are outstanding
1362 			 * requests.
1363 			 */
1364 			sqp->sq_awaken = lbolt;
1365 			cv_signal(&sqp->sq_worker_cv);
1366 			mutex_exit(&sqp->sq_lock);
1367 		}
1368 	} else {
1369 		/*
1370 		 * The caller doesn't own the squeue, clear the SQS_PAUSE flag,
1371 		 * and wake up the squeue owner, such that owner can continue
1372 		 * processing.
1373 		 */
1374 		ASSERT(sqp->sq_state & SQS_PAUSE);
1375 		sqp->sq_state &= ~SQS_PAUSE;
1376 
1377 		/* There should be only one thread blocking on sq_synch_cv. */
1378 		cv_signal(&sqp->sq_synch_cv);
1379 		mutex_exit(&sqp->sq_lock);
1380 	}
1381 }
1382