xref: /titanic_52/usr/src/uts/common/inet/squeue.c (revision 9e86db79b7d1bbc5f2f04e99954cbd5eae0e22bb)
1 /*
2  * CDDL HEADER START
3  *
4  * The contents of this file are subject to the terms of the
5  * Common Development and Distribution License (the "License").
6  * You may not use this file except in compliance with the License.
7  *
8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9  * or http://www.opensolaris.org/os/licensing.
10  * See the License for the specific language governing permissions
11  * and limitations under the License.
12  *
13  * When distributing Covered Code, include this CDDL HEADER in each
14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15  * If applicable, add the following below this CDDL HEADER, with the
16  * fields enclosed by brackets "[]" replaced with your own identifying
17  * information: Portions Copyright [yyyy] [name of copyright owner]
18  *
19  * CDDL HEADER END
20  */
21 /*
22  * Copyright 2009 Sun Microsystems, Inc.  All rights reserved.
23  * Use is subject to license terms.
24  */
25 
26 /*
27  * Squeues: General purpose serialization mechanism
28  * ------------------------------------------------
29  *
30  * Background:
31  * -----------
32  *
33  * This is a general purpose high-performance serialization mechanism
34  * currently used by TCP/IP. It is implemented by means of a per CPU queue,
35  * a worker thread and a polling thread which are bound to the CPU
36  * associated with the squeue. The squeue is strictly FIFO for both read
37  * and write side and only one thread can process it at any given time.
38  * The design goal of squeue was to offer a very high degree of
39  * parallelization (on a per H/W execution pipeline basis) with at
40  * most one queuing.
41  *
42  * The modules needing protection typically call the squeue_enter() or
43  * squeue_enter_chain() routine as soon as a thread enters the module
44  * from either direction. For each packet, the processing function
45  * and argument is stored in the mblk itself. When the packet is ready
46  * to be processed, the squeue retrieves the stored function and calls
47  * it with the supplied argument and the pointer to the packet itself.
48  * The called function can assume that no other thread is processing
49  * the squeue when it is executing.
50  *
51  * Squeue/connection binding:
52  * --------------------------
53  *
54  * TCP/IP uses an IP classifier in conjunction with squeue where specific
55  * connections are assigned to specific squeue (based on various policies),
56  * at the connection creation time. Once assigned, the connection to
57  * squeue mapping is never changed and all future packets for that
58  * connection are processed on that squeue. The connection ("conn") to
59  * squeue mapping is stored in "conn_t" member "conn_sqp".
60  *
61  * Since the processing of the connection cuts across multiple layers
62  * but still allows packets for different connections to be processed on
63  * other CPU/squeues, squeues are also termed as "Vertical Perimeter" or
64  * "Per Connection Vertical Perimeter".
65  *
66  * Processing Model:
67  * -----------------
68  *
69  * A squeue doesn't necessarily process packets with its own worker thread.
70  * The callers can pick if they just want to queue the packet, process
71  * their packet if nothing is queued or drain and process. The first two
72  * modes are typically employed when the packet was generated while
73  * already doing the processing behind the squeue and last mode (drain
74  * and process) is typically employed when the thread is entering squeue
75  * for the first time. The squeue still imposes a finite time limit
76  * for which an external thread can do processing after which it switches
77  * processing to its own worker thread.
78  *
79  * Once created, squeues are never deleted. Hence squeue pointers are
80  * always valid. This means that functions outside the squeue can still
81  * refer safely to conn_sqp and there is no need for ref counts.
82  *
83  * Only a thread executing in the squeue can change the squeue of the
84  * connection. It does so by calling a squeue framework function to do this.
85  * After changing the squeue, the thread must leave the squeue. It must not
86  * continue to execute any code that needs squeue protection.
87  *
88  * The squeue framework, after entering the squeue, checks if the current
89  * squeue matches the conn_sqp. If the check fails, the packet is delivered
90  * to right squeue.
91  *
92  * Polling Model:
93  * --------------
94  *
95  * Squeues can control the rate of packet arrival into itself from the
96  * NIC or specific Rx ring within a NIC. As part of capability negotiation
97  * between IP and MAC layer, squeue are created for each TCP soft ring
98  * (or TCP Rx ring - to be implemented in future). As part of this
99  * negotiation, squeues get a cookie for underlying soft ring or Rx
100  * ring, a function to turn off incoming packets and a function to call
101  * to poll for packets. This helps schedule the receive side packet
102  * processing so that queue backlog doesn't build up and packet processing
103  * doesn't keep getting disturbed by high priority interrupts. As part
104  * of this mode, as soon as a backlog starts building, squeue turns off
105  * the interrupts and switches to poll mode. In poll mode, when poll
106  * thread goes down to retrieve packets, it retrieves them in the form of
107  * a chain which improves performance even more. As the squeue/softring
108  * system gets more packets, it gets more efficient by switching to
109  * polling more often and dealing with larger packet chains.
110  *
111  */
112 
113 #include <sys/types.h>
114 #include <sys/cmn_err.h>
115 #include <sys/debug.h>
116 #include <sys/kmem.h>
117 #include <sys/cpuvar.h>
118 #include <sys/condvar_impl.h>
119 #include <sys/systm.h>
120 #include <sys/callb.h>
121 #include <sys/sdt.h>
122 #include <sys/ddi.h>
123 #include <sys/sunddi.h>
124 
125 #include <inet/ipclassifier.h>
126 #include <inet/udp_impl.h>
127 
128 #include <sys/squeue_impl.h>
129 
130 static void squeue_fire(void *);
131 static void squeue_drain(squeue_t *, uint_t, hrtime_t);
132 static void squeue_worker(squeue_t *sqp);
133 static void squeue_polling_thread(squeue_t *sqp);
134 
135 kmem_cache_t *squeue_cache;
136 
/*
 * Nanoseconds per millisecond. Typed long long so that multiplying an int
 * millisecond count by it is carried out in 64 bits.
 */
#define	SQUEUE_MSEC_TO_NSEC 1000000LL

/*
 * Tunables, expressed in milliseconds. squeue_init() converts them into
 * the run-time values below.
 */
int squeue_drain_ms = 20;
int squeue_workerwait_ms = 0;

/*
 * The values above converted to ticks or nano seconds. The nanosecond
 * value is 64 bits wide: any squeue_drain_ms above 2147 no longer fits
 * in an int once it has been scaled to nanoseconds.
 */
static int64_t squeue_drain_ns = 0;
static int squeue_workerwait_tick = 0;

#define	MAX_BYTES_TO_PICKUP	150000
147 
/*
 * Append the mblk chain that starts at mp and ends at tail (cnt mblks in
 * total) to the end of the squeue's pending list. The caller must hold
 * sq_lock.
 */
#define	ENQUEUE_CHAIN(sqp, mp, tail, cnt) {			\
	ASSERT(MUTEX_HELD(&(sqp)->sq_lock));			\
								\
	if ((sqp)->sq_last == NULL)				\
		(sqp)->sq_first = (mp);				\
	else							\
		(sqp)->sq_last->b_next = (mp);			\
	(sqp)->sq_last = (tail);				\
	(sqp)->sq_count += (cnt);				\
	ASSERT((sqp)->sq_count > 0);				\
	DTRACE_PROBE4(squeue__enqueuechain, squeue_t *, sqp,	\
	    mblk_t *, mp, mblk_t *, tail, int, cnt);		\
}
165 
/*
 * Blank the receive ring (in this case it is the soft ring). When
 * blanked, the soft ring will not send any more packets up.
 * Blanking may not succeed when there is a CPU already in the soft
 * ring sending packets up. In that case, SQS_POLLING will not be
 * set.
 *
 * Nothing is done unless sq_poll_capable is true. The caller must hold
 * sq_lock. Every macro parameter is parenthesized so that expression
 * arguments (e.g. "base + 1") expand correctly.
 */
#define	SQS_POLLING_ON(sqp, sq_poll_capable, rx_ring) {		\
	ASSERT(MUTEX_HELD(&(sqp)->sq_lock));			\
	if (sq_poll_capable) {					\
		ASSERT((rx_ring) != NULL);			\
		ASSERT((sqp)->sq_state & SQS_POLL_CAPAB);	\
		if (!((sqp)->sq_state & SQS_POLLING)) {		\
			if ((rx_ring)->rr_intr_disable(		\
			    (rx_ring)->rr_intr_handle))		\
				(sqp)->sq_state |= SQS_POLLING;	\
		}						\
	}							\
}
184 
/*
 * Leave polling mode: if SQS_POLLING is set, clear it and re-enable the
 * ring's interrupts through rr_intr_enable. Nothing is done unless
 * sq_poll_capable is true. The caller must hold sq_lock. Every macro
 * parameter is parenthesized so that expression arguments expand
 * correctly.
 */
#define	SQS_POLLING_OFF(sqp, sq_poll_capable, rx_ring) {	\
	ASSERT(MUTEX_HELD(&(sqp)->sq_lock));			\
	if (sq_poll_capable) {					\
		ASSERT((rx_ring) != NULL);			\
		ASSERT((sqp)->sq_state & SQS_POLL_CAPAB);	\
		if ((sqp)->sq_state & SQS_POLLING) {		\
			(sqp)->sq_state &= ~SQS_POLLING;	\
			(rx_ring)->rr_intr_enable(		\
			    (rx_ring)->rr_intr_handle);		\
		}						\
	}							\
}
196 
/*
 * Wakeup poll thread only if SQS_POLLING is set. SQS_GET_PKTS is set
 * before signalling sq_poll_cv, and no second signal is sent while it is
 * already set. The caller must hold sq_lock. The macro parameter is
 * parenthesized so that expression arguments expand correctly.
 */
#define	SQS_POLL_RING(sqp) {					\
	ASSERT(MUTEX_HELD(&(sqp)->sq_lock));			\
	if ((sqp)->sq_state & SQS_POLLING) {			\
		ASSERT((sqp)->sq_state & SQS_POLL_CAPAB);	\
		if (!((sqp)->sq_state & SQS_GET_PKTS)) {	\
			(sqp)->sq_state |= SQS_GET_PKTS;	\
			cv_signal(&(sqp)->sq_poll_cv);		\
		}						\
	}							\
}
208 
/*
 * Debug bookkeeping. On DEBUG kernels the squeue remembers the mblk,
 * processing function, conn and tag it is currently working on; on
 * non-DEBUG kernels both macros expand to nothing.
 */
#ifndef DEBUG
#define	SQUEUE_DBG_SET(sqp, mp, proc, connp, tag)
#define	SQUEUE_DBG_CLEAR(sqp)
#else
#define	SQUEUE_DBG_SET(sqp, mp, proc, connp, tag) {		\
	(mp)->b_tag = (sqp)->sq_tag = (tag);			\
	(sqp)->sq_connp = (connp);				\
	(sqp)->sq_curproc = (proc);				\
	(sqp)->sq_curmp = (mp);					\
}

#define	SQUEUE_DBG_CLEAR(sqp)	{				\
	(sqp)->sq_connp = NULL;					\
	(sqp)->sq_curproc = NULL;				\
	(sqp)->sq_curmp = NULL;					\
}
#endif
226 
227 void
228 squeue_init(void)
229 {
230 	squeue_cache = kmem_cache_create("squeue_cache",
231 	    sizeof (squeue_t), 64, NULL, NULL, NULL, NULL, NULL, 0);
232 
233 	squeue_drain_ns = squeue_drain_ms * SQUEUE_MSEC_TO_NSEC;
234 	squeue_workerwait_tick = MSEC_TO_TICK_ROUNDUP(squeue_workerwait_ms);
235 }
236 
237 /* ARGSUSED */
238 squeue_t *
239 squeue_create(clock_t wait, pri_t pri)
240 {
241 	squeue_t *sqp = kmem_cache_alloc(squeue_cache, KM_SLEEP);
242 
243 	bzero(sqp, sizeof (squeue_t));
244 	sqp->sq_bind = PBIND_NONE;
245 	sqp->sq_priority = pri;
246 	sqp->sq_wait = MSEC_TO_TICK(wait);
247 	sqp->sq_worker = thread_create(NULL, 0, squeue_worker,
248 	    sqp, 0, &p0, TS_RUN, pri);
249 
250 	sqp->sq_poll_thr = thread_create(NULL, 0, squeue_polling_thread,
251 	    sqp, 0, &p0, TS_RUN, pri);
252 
253 	sqp->sq_enter = squeue_enter;
254 	sqp->sq_drain = squeue_drain;
255 
256 	return (sqp);
257 }
258 
259 /*
260  * Bind squeue worker thread to the specified CPU, given by CPU id.
261  * If the CPU id  value is -1, bind the worker thread to the value
262  * specified in sq_bind field. If a thread is already bound to a
263  * different CPU, unbind it from the old CPU and bind to the new one.
264  */
265 
266 void
267 squeue_bind(squeue_t *sqp, processorid_t bind)
268 {
269 	mutex_enter(&sqp->sq_lock);
270 	ASSERT(sqp->sq_bind != PBIND_NONE || bind != PBIND_NONE);
271 	ASSERT(MUTEX_HELD(&cpu_lock));
272 
273 	if (sqp->sq_state & SQS_BOUND) {
274 		if (sqp->sq_bind == bind) {
275 			mutex_exit(&sqp->sq_lock);
276 			return;
277 		}
278 		thread_affinity_clear(sqp->sq_worker);
279 	} else {
280 		sqp->sq_state |= SQS_BOUND;
281 	}
282 
283 	if (bind != PBIND_NONE)
284 		sqp->sq_bind = bind;
285 
286 	thread_affinity_set(sqp->sq_worker, sqp->sq_bind);
287 	mutex_exit(&sqp->sq_lock);
288 }
289 
290 void
291 squeue_unbind(squeue_t *sqp)
292 {
293 	mutex_enter(&sqp->sq_lock);
294 	if (!(sqp->sq_state & SQS_BOUND)) {
295 		mutex_exit(&sqp->sq_lock);
296 		return;
297 	}
298 
299 	sqp->sq_state &= ~SQS_BOUND;
300 	thread_affinity_clear(sqp->sq_worker);
301 	mutex_exit(&sqp->sq_lock);
302 }
303 
304 void
305 squeue_worker_wakeup(squeue_t *sqp)
306 {
307 	timeout_id_t tid = (sqp)->sq_tid;
308 
309 	ASSERT(MUTEX_HELD(&(sqp)->sq_lock));
310 
311 	if (sqp->sq_wait == 0) {
312 		ASSERT(tid == 0);
313 		ASSERT(!(sqp->sq_state & SQS_TMO_PROG));
314 		sqp->sq_awaken = lbolt;
315 		cv_signal(&sqp->sq_worker_cv);
316 		mutex_exit(&sqp->sq_lock);
317 		return;
318 	}
319 
320 	/*
321 	 * Queue isn't being processed, so take
322 	 * any post enqueue actions needed before leaving.
323 	 */
324 	if (tid != 0) {
325 		/*
326 		 * Waiting for an enter() to process mblk(s).
327 		 */
328 		clock_t	waited = lbolt - sqp->sq_awaken;
329 
330 		if (TICK_TO_MSEC(waited) >= sqp->sq_wait) {
331 			/*
332 			 * Times up and have a worker thread
333 			 * waiting for work, so schedule it.
334 			 */
335 			sqp->sq_tid = 0;
336 			sqp->sq_awaken = lbolt;
337 			cv_signal(&sqp->sq_worker_cv);
338 			mutex_exit(&sqp->sq_lock);
339 			(void) untimeout(tid);
340 			return;
341 		}
342 		mutex_exit(&sqp->sq_lock);
343 		return;
344 	} else if (sqp->sq_state & SQS_TMO_PROG) {
345 		mutex_exit(&sqp->sq_lock);
346 		return;
347 	} else {
348 		clock_t	wait = sqp->sq_wait;
349 		/*
350 		 * Wait up to sqp->sq_wait ms for an
351 		 * enter() to process this queue. We
352 		 * don't want to contend on timeout locks
353 		 * with sq_lock held for performance reasons,
354 		 * so drop the sq_lock before calling timeout
355 		 * but we need to check if timeout is required
356 		 * after re acquiring the sq_lock. Once
357 		 * the sq_lock is dropped, someone else could
358 		 * have processed the packet or the timeout could
359 		 * have already fired.
360 		 */
361 		sqp->sq_state |= SQS_TMO_PROG;
362 		mutex_exit(&sqp->sq_lock);
363 		tid = timeout(squeue_fire, sqp, wait);
364 		mutex_enter(&sqp->sq_lock);
365 		/* Check again if we still need the timeout */
366 		if (((sqp->sq_state & (SQS_PROC|SQS_TMO_PROG)) ==
367 		    SQS_TMO_PROG) && (sqp->sq_tid == 0) &&
368 		    (sqp->sq_first != NULL)) {
369 				sqp->sq_state &= ~SQS_TMO_PROG;
370 				sqp->sq_tid = tid;
371 				mutex_exit(&sqp->sq_lock);
372 				return;
373 		} else {
374 			if (sqp->sq_state & SQS_TMO_PROG) {
375 				sqp->sq_state &= ~SQS_TMO_PROG;
376 				mutex_exit(&sqp->sq_lock);
377 				(void) untimeout(tid);
378 			} else {
379 				/*
380 				 * The timer fired before we could
381 				 * reacquire the sq_lock. squeue_fire
382 				 * removes the SQS_TMO_PROG flag
383 				 * and we don't need to	do anything
384 				 * else.
385 				 */
386 				mutex_exit(&sqp->sq_lock);
387 			}
388 		}
389 	}
390 
391 	ASSERT(MUTEX_NOT_HELD(&sqp->sq_lock));
392 }
393 
394 /*
395  * squeue_enter() - enter squeue sqp with mblk mp (which can be
396  * a chain), while tail points to the end and cnt in number of
397  * mblks in the chain.
398  *
399  * For a chain of single packet (i.e. mp == tail), go through the
400  * fast path if no one is processing the squeue and nothing is queued.
401  *
402  * The proc and arg for each mblk is already stored in the mblk in
403  * appropriate places.
404  *
405  * The process_flag specifies if we are allowed to process the mblk
406  * and drain in the entering thread context. If process_flag is
407  * SQ_FILL, then we just queue the mblk and return (after signaling
408  * the worker thread if no one else is processing the squeue).
409  */
410 /* ARGSUSED */
411 void
412 squeue_enter(squeue_t *sqp, mblk_t *mp, mblk_t *tail, uint32_t cnt,
413     int process_flag, uint8_t tag)
414 {
415 	conn_t		*connp;
416 	sqproc_t	proc;
417 	hrtime_t	now;
418 
419 	ASSERT(sqp != NULL);
420 	ASSERT(mp != NULL);
421 	ASSERT(tail != NULL);
422 	ASSERT(cnt > 0);
423 	ASSERT(MUTEX_NOT_HELD(&sqp->sq_lock));
424 
425 	mutex_enter(&sqp->sq_lock);
426 
427 	/*
428 	 * Try to process the packet if SQ_FILL flag is not set and
429 	 * we are allowed to process the squeue. The SQ_NODRAIN is
430 	 * ignored if the packet chain consists of more than 1 packet.
431 	 */
432 	if (!(sqp->sq_state & SQS_PROC) && ((process_flag == SQ_PROCESS) ||
433 	    (process_flag == SQ_NODRAIN && sqp->sq_first == NULL))) {
434 		/*
435 		 * See if anything is already queued. If we are the
436 		 * first packet, do inline processing else queue the
437 		 * packet and do the drain.
438 		 */
439 		if (sqp->sq_first == NULL && cnt == 1) {
440 			/*
441 			 * Fast-path, ok to process and nothing queued.
442 			 */
443 			sqp->sq_state |= (SQS_PROC|SQS_FAST);
444 			sqp->sq_run = curthread;
445 			mutex_exit(&sqp->sq_lock);
446 
447 			/*
448 			 * We are the chain of 1 packet so
449 			 * go through this fast path.
450 			 */
451 			ASSERT(mp->b_prev != NULL);
452 			ASSERT(mp->b_queue != NULL);
453 			connp = (conn_t *)mp->b_prev;
454 			mp->b_prev = NULL;
455 			proc = (sqproc_t)mp->b_queue;
456 			mp->b_queue = NULL;
457 			ASSERT(proc != NULL && connp != NULL);
458 			ASSERT(mp->b_next == NULL);
459 
460 			/*
461 			 * Handle squeue switching. More details in the
462 			 * block comment at the top of the file
463 			 */
464 			if (connp->conn_sqp == sqp) {
465 				SQUEUE_DBG_SET(sqp, mp, proc, connp,
466 				    tag);
467 				connp->conn_on_sqp = B_TRUE;
468 				DTRACE_PROBE3(squeue__proc__start, squeue_t *,
469 				    sqp, mblk_t *, mp, conn_t *, connp);
470 				(*proc)(connp, mp, sqp);
471 				DTRACE_PROBE2(squeue__proc__end, squeue_t *,
472 				    sqp, conn_t *, connp);
473 				connp->conn_on_sqp = B_FALSE;
474 				SQUEUE_DBG_CLEAR(sqp);
475 				CONN_DEC_REF(connp);
476 			} else {
477 				SQUEUE_ENTER_ONE(connp->conn_sqp, mp, proc,
478 				    connp, SQ_FILL, SQTAG_SQUEUE_CHANGE);
479 			}
480 			ASSERT(MUTEX_NOT_HELD(&sqp->sq_lock));
481 			mutex_enter(&sqp->sq_lock);
482 			sqp->sq_state &= ~(SQS_PROC|SQS_FAST);
483 			sqp->sq_run = NULL;
484 			if (sqp->sq_first == NULL ||
485 			    process_flag == SQ_NODRAIN) {
486 				if (sqp->sq_first != NULL) {
487 					squeue_worker_wakeup(sqp);
488 					return;
489 				}
490 				/*
491 				 * We processed inline our packet and nothing
492 				 * new has arrived. We are done. In case any
493 				 * control actions are pending, wake up the
494 				 * worker.
495 				 */
496 				if (sqp->sq_state & SQS_WORKER_THR_CONTROL)
497 					cv_signal(&sqp->sq_worker_cv);
498 				mutex_exit(&sqp->sq_lock);
499 				return;
500 			}
501 		} else {
502 			ENQUEUE_CHAIN(sqp, mp, tail, cnt);
503 #ifdef DEBUG
504 			mp->b_tag = tag;
505 #endif
506 		}
507 		/*
508 		 * We are here because either we couldn't do inline
509 		 * processing (because something was already queued),
510 		 * or we had a chain of more than one packet,
511 		 * or something else arrived after we were done with
512 		 * inline processing.
513 		 */
514 		ASSERT(MUTEX_HELD(&sqp->sq_lock));
515 		ASSERT(sqp->sq_first != NULL);
516 		now = gethrtime();
517 		sqp->sq_drain(sqp, SQS_ENTER, now + squeue_drain_ns);
518 
519 		/*
520 		 * If we didn't do a complete drain, the worker
521 		 * thread was already signalled by squeue_drain.
522 		 * In case any control actions are pending, wake
523 		 * up the worker.
524 		 */
525 		sqp->sq_run = NULL;
526 		if (sqp->sq_state & SQS_WORKER_THR_CONTROL)
527 			cv_signal(&sqp->sq_worker_cv);
528 		mutex_exit(&sqp->sq_lock);
529 		return;
530 	} else {
531 		/*
532 		 * We let a thread processing a squeue reenter only
533 		 * once. This helps the case of incoming connection
534 		 * where a SYN-ACK-ACK that triggers the conn_ind
535 		 * doesn't have to queue the packet if listener and
536 		 * eager are on the same squeue. Also helps the
537 		 * loopback connection where the two ends are bound
538 		 * to the same squeue (which is typical on single
539 		 * CPU machines).
540 		 *
541 		 * We let the thread reenter only once for the fear
542 		 * of stack getting blown with multiple traversal.
543 		 */
544 		connp = (conn_t *)mp->b_prev;
545 		if (!(sqp->sq_state & SQS_REENTER) &&
546 		    (process_flag != SQ_FILL) && (sqp->sq_first == NULL) &&
547 		    (sqp->sq_run == curthread) && (cnt == 1) &&
548 		    (connp->conn_on_sqp == B_FALSE)) {
549 			sqp->sq_state |= SQS_REENTER;
550 			mutex_exit(&sqp->sq_lock);
551 
552 			ASSERT(mp->b_prev != NULL);
553 			ASSERT(mp->b_queue != NULL);
554 
555 			mp->b_prev = NULL;
556 			proc = (sqproc_t)mp->b_queue;
557 			mp->b_queue = NULL;
558 
559 			/*
560 			 * Handle squeue switching. More details in the
561 			 * block comment at the top of the file
562 			 */
563 			if (connp->conn_sqp == sqp) {
564 				connp->conn_on_sqp = B_TRUE;
565 				DTRACE_PROBE3(squeue__proc__start, squeue_t *,
566 				    sqp, mblk_t *, mp, conn_t *, connp);
567 				(*proc)(connp, mp, sqp);
568 				DTRACE_PROBE2(squeue__proc__end, squeue_t *,
569 				    sqp, conn_t *, connp);
570 				connp->conn_on_sqp = B_FALSE;
571 				CONN_DEC_REF(connp);
572 			} else {
573 				SQUEUE_ENTER_ONE(connp->conn_sqp, mp, proc,
574 				    connp, SQ_FILL, SQTAG_SQUEUE_CHANGE);
575 			}
576 
577 			mutex_enter(&sqp->sq_lock);
578 			sqp->sq_state &= ~SQS_REENTER;
579 			mutex_exit(&sqp->sq_lock);
580 			return;
581 		}
582 
583 		/*
584 		 * Queue is already being processed or there is already
585 		 * one or more paquets on the queue. Enqueue the
586 		 * packet and wakeup the squeue worker thread if the
587 		 * squeue is not being processed.
588 		 */
589 #ifdef DEBUG
590 		mp->b_tag = tag;
591 #endif
592 
593 		ENQUEUE_CHAIN(sqp, mp, tail, cnt);
594 		if (!(sqp->sq_state & SQS_PROC)) {
595 			squeue_worker_wakeup(sqp);
596 			return;
597 		}
598 		/*
599 		 * In case any control actions are pending, wake
600 		 * up the worker.
601 		 */
602 		if (sqp->sq_state & SQS_WORKER_THR_CONTROL)
603 			cv_signal(&sqp->sq_worker_cv);
604 		mutex_exit(&sqp->sq_lock);
605 		return;
606 	}
607 }
608 
609 /*
610  * PRIVATE FUNCTIONS
611  */
612 
613 static void
614 squeue_fire(void *arg)
615 {
616 	squeue_t	*sqp = arg;
617 	uint_t		state;
618 
619 	mutex_enter(&sqp->sq_lock);
620 
621 	state = sqp->sq_state;
622 	if (sqp->sq_tid == 0 && !(state & SQS_TMO_PROG)) {
623 		mutex_exit(&sqp->sq_lock);
624 		return;
625 	}
626 
627 	sqp->sq_tid = 0;
628 	/*
629 	 * The timeout fired before we got a chance to set it.
630 	 * Process it anyway but remove the SQS_TMO_PROG so that
631 	 * the guy trying to set the timeout knows that it has
632 	 * already been processed.
633 	 */
634 	if (state & SQS_TMO_PROG)
635 		sqp->sq_state &= ~SQS_TMO_PROG;
636 
637 	if (!(state & SQS_PROC)) {
638 		sqp->sq_awaken = lbolt;
639 		cv_signal(&sqp->sq_worker_cv);
640 	}
641 	mutex_exit(&sqp->sq_lock);
642 }
643 
644 static void
645 squeue_drain(squeue_t *sqp, uint_t proc_type, hrtime_t expire)
646 {
647 	mblk_t		*mp;
648 	mblk_t 		*head;
649 	sqproc_t 	proc;
650 	conn_t		*connp;
651 	timeout_id_t 	tid;
652 	ill_rx_ring_t	*sq_rx_ring = sqp->sq_rx_ring;
653 	hrtime_t 	now;
654 	boolean_t	did_wakeup = B_FALSE;
655 	boolean_t	sq_poll_capable;
656 
657 	sq_poll_capable = (sqp->sq_state & SQS_POLL_CAPAB) != 0;
658 again:
659 	ASSERT(mutex_owned(&sqp->sq_lock));
660 	ASSERT(!(sqp->sq_state & (SQS_POLL_THR_QUIESCED |
661 	    SQS_POLL_QUIESCE_DONE)));
662 
663 	head = sqp->sq_first;
664 	sqp->sq_first = NULL;
665 	sqp->sq_last = NULL;
666 	sqp->sq_count = 0;
667 
668 	if ((tid = sqp->sq_tid) != 0)
669 		sqp->sq_tid = 0;
670 
671 	sqp->sq_state |= SQS_PROC | proc_type;
672 
673 	/*
674 	 * We have backlog built up. Switch to polling mode if the
675 	 * device underneath allows it. Need to do it so that
676 	 * more packets don't come in and disturb us (by contending
677 	 * for sq_lock or higher priority thread preempting us).
678 	 *
679 	 * The worker thread is allowed to do active polling while we
680 	 * just disable the interrupts for drain by non worker (kernel
681 	 * or userland) threads so they can peacefully process the
682 	 * packets during time allocated to them.
683 	 */
684 	SQS_POLLING_ON(sqp, sq_poll_capable, sq_rx_ring);
685 	mutex_exit(&sqp->sq_lock);
686 
687 	if (tid != 0)
688 		(void) untimeout(tid);
689 
690 	while ((mp = head) != NULL) {
691 
692 		head = mp->b_next;
693 		mp->b_next = NULL;
694 
695 		proc = (sqproc_t)mp->b_queue;
696 		mp->b_queue = NULL;
697 		connp = (conn_t *)mp->b_prev;
698 		mp->b_prev = NULL;
699 
700 		/*
701 		 * Handle squeue switching. More details in the
702 		 * block comment at the top of the file
703 		 */
704 		if (connp->conn_sqp == sqp) {
705 			SQUEUE_DBG_SET(sqp, mp, proc, connp,
706 			    mp->b_tag);
707 			connp->conn_on_sqp = B_TRUE;
708 			DTRACE_PROBE3(squeue__proc__start, squeue_t *,
709 			    sqp, mblk_t *, mp, conn_t *, connp);
710 			(*proc)(connp, mp, sqp);
711 			DTRACE_PROBE2(squeue__proc__end, squeue_t *,
712 			    sqp, conn_t *, connp);
713 			connp->conn_on_sqp = B_FALSE;
714 			CONN_DEC_REF(connp);
715 		} else {
716 			SQUEUE_ENTER_ONE(connp->conn_sqp, mp, proc, connp,
717 			    SQ_FILL, SQTAG_SQUEUE_CHANGE);
718 		}
719 	}
720 
721 	SQUEUE_DBG_CLEAR(sqp);
722 
723 	mutex_enter(&sqp->sq_lock);
724 
725 	/*
726 	 * Check if there is still work to do (either more arrived or timer
727 	 * expired). If we are the worker thread and we are polling capable,
728 	 * continue doing the work since no one else is around to do the
729 	 * work anyway (but signal the poll thread to retrieve some packets
730 	 * in the meanwhile). If we are not the worker thread, just
731 	 * signal the worker thread to take up the work if processing time
732 	 * has expired.
733 	 */
734 	if (sqp->sq_first != NULL) {
735 		/*
736 		 * Still more to process. If time quanta not expired, we
737 		 * should let the drain go on. The worker thread is allowed
738 		 * to drain as long as there is anything left.
739 		 */
740 		now = gethrtime();
741 		if ((now < expire) || (proc_type == SQS_WORKER)) {
742 			/*
743 			 * If time not expired or we are worker thread and
744 			 * this squeue is polling capable, continue to do
745 			 * the drain.
746 			 *
747 			 * We turn off interrupts for all userland threads
748 			 * doing drain but we do active polling only for
749 			 * worker thread.
750 			 *
751 			 * Calling SQS_POLL_RING() even in the case of
752 			 * SQS_POLLING_ON() not succeeding is ok as
753 			 * SQS_POLL_RING() will not wake up poll thread
754 			 * if SQS_POLLING bit is not set.
755 			 */
756 			if (proc_type == SQS_WORKER)
757 				SQS_POLL_RING(sqp);
758 			goto again;
759 		} else {
760 			did_wakeup = B_TRUE;
761 			sqp->sq_awaken = lbolt;
762 			cv_signal(&sqp->sq_worker_cv);
763 		}
764 	}
765 
766 	/*
767 	 * If the poll thread is already running, just return. The
768 	 * poll thread continues to hold the proc and will finish
769 	 * processing.
770 	 */
771 	if (sqp->sq_state & SQS_GET_PKTS) {
772 		ASSERT(!(sqp->sq_state & (SQS_POLL_THR_QUIESCED |
773 		    SQS_POLL_QUIESCE_DONE)));
774 		sqp->sq_state &= ~proc_type;
775 		return;
776 	}
777 
778 	/*
779 	 *
780 	 * If we are the worker thread and no work is left, send the poll
781 	 * thread down once more to see if something arrived. Otherwise,
782 	 * turn the interrupts back on and we are done.
783 	 */
784 	if ((proc_type == SQS_WORKER) && (sqp->sq_state & SQS_POLLING)) {
785 		/*
786 		 * Do one last check to see if anything arrived
787 		 * in the NIC. We leave the SQS_PROC set to ensure
788 		 * that poll thread keeps the PROC and can decide
789 		 * if it needs to turn polling off or continue
790 		 * processing.
791 		 *
792 		 * If we drop the SQS_PROC here and poll thread comes
793 		 * up empty handed, it can not safely turn polling off
794 		 * since someone else could have acquired the PROC
795 		 * and started draining. The previously running poll
796 		 * thread and the current thread doing drain would end
797 		 * up in a race for turning polling on/off and more
798 		 * complex code would be required to deal with it.
799 		 *
800 		 * Its lot simpler for drain to hand the SQS_PROC to
801 		 * poll thread (if running) and let poll thread finish
802 		 * without worrying about racing with any other thread.
803 		 */
804 		ASSERT(!(sqp->sq_state & (SQS_POLL_THR_QUIESCED |
805 		    SQS_POLL_QUIESCE_DONE)));
806 		SQS_POLL_RING(sqp);
807 		sqp->sq_state &= ~proc_type;
808 	} else {
809 		/*
810 		 * The squeue is either not capable of polling or the
811 		 * attempt to blank (i.e., turn SQS_POLLING_ON()) was
812 		 * unsuccessful or poll thread already finished
813 		 * processing and didn't find anything. Since there
814 		 * is nothing queued and we already turn polling on
815 		 * (for all threads doing drain), we should turn
816 		 * polling off and relinquish the PROC.
817 		 */
818 		ASSERT(!(sqp->sq_state & (SQS_POLL_THR_QUIESCED |
819 		    SQS_POLL_QUIESCE_DONE)));
820 		SQS_POLLING_OFF(sqp, sq_poll_capable, sq_rx_ring);
821 		sqp->sq_state &= ~(SQS_PROC | proc_type);
822 		if (!did_wakeup && sqp->sq_first != NULL) {
823 			squeue_worker_wakeup(sqp);
824 			mutex_enter(&sqp->sq_lock);
825 		}
826 		/*
827 		 * If we are not the worker and there is a pending quiesce
828 		 * event, wake up the worker
829 		 */
830 		if ((proc_type != SQS_WORKER) &&
831 		    (sqp->sq_state & SQS_WORKER_THR_CONTROL))
832 			cv_signal(&sqp->sq_worker_cv);
833 	}
834 }
835 
836 /*
837  * Quiesce, Restart, or Cleanup of the squeue poll thread.
838  *
839  * Quiesce and Restart: After an squeue poll thread has been quiesced, it does
840  * not attempt to poll the underlying soft ring any more. The quiesce is
841  * triggered by the mac layer when it wants to quiesce a soft ring. Typically
842  * control operations such as changing the fanout of a NIC or VNIC (dladm
843  * setlinkprop) need to quiesce data flow before changing the wiring.
844  * The operation is done by the mac layer, but it calls back into IP to
845  * quiesce the soft ring. After completing the operation (say increase or
846  * decrease of the fanout) the mac layer then calls back into IP to restart
847  * the quiesced soft ring.
848  *
849  * Cleanup: This is triggered when the squeue binding to a soft ring is
850  * removed permanently. Typically interface plumb and unplumb would trigger
851  * this. It can also be triggered from the mac layer when a soft ring is
852  * being deleted say as the result of a fanout reduction. Since squeues are
853  * never deleted, the cleanup marks the squeue as fit for recycling and
854  * moves it to the zeroth squeue set.
855  */
856 static void
857 squeue_poll_thr_control(squeue_t *sqp)
858 {
859 	if (sqp->sq_state & SQS_POLL_THR_RESTART) {
860 		/* Restart implies a previous quiesce */
861 		ASSERT(sqp->sq_state & SQS_POLL_THR_QUIESCED);
862 		sqp->sq_state &= ~(SQS_POLL_THR_QUIESCED |
863 		    SQS_POLL_THR_RESTART);
864 		sqp->sq_state |= SQS_POLL_CAPAB;
865 		cv_signal(&sqp->sq_worker_cv);
866 		return;
867 	}
868 
869 	if (sqp->sq_state & SQS_POLL_THR_QUIESCE) {
870 		sqp->sq_state |= SQS_POLL_THR_QUIESCED;
871 		sqp->sq_state &= ~SQS_POLL_THR_QUIESCE;
872 		cv_signal(&sqp->sq_worker_cv);
873 		return;
874 	}
875 }
876 
877 /*
878  * POLLING Notes
879  *
880  * With polling mode, we want to do as much processing as we possibly can
881  * in worker thread context. The sweet spot is worker thread keeps doing
882  * work all the time in polling mode and writers etc. keep dumping packets
883  * to worker thread. Occasionally, we send the poll thread (running at
884  * lower priority) to the NIC to get the chain of packets to feed to the
885  * worker. Sending the poll thread down to the NIC depends on 3 criteria:
886  *
887  * 1) Its always driven from squeue_drain and only if worker thread is
888  *	doing the drain.
889  * 2) We clear the backlog once and more packets arrived in between.
890  *	Before starting drain again, send the poll thread down if
891  *	the drain is being done by worker thread.
892  * 3) Before exiting the squeue_drain, if the poll thread is not already
893  *	working and we are the worker thread, try to poll one more time.
894  *
895  * For latency sake, we do allow any thread calling squeue_enter
896  * to process its packet provided:
897  *
898  * 1) Nothing is queued
899  * 2) If more packets arrived in between, the non worker thread are allowed
900  *	to do the drain till their time quanta expired provided SQS_GET_PKTS
901  *	wasn't set in between.
902  *
903  * Avoiding deadlocks with interrupts
904  * ==================================
905  *
906  * One of the big problem is that we can't send poll_thr down while holding
907  * the sq_lock since the thread can block. So we drop the sq_lock before
908  * calling sq_get_pkts(). We keep holding the SQS_PROC as long as the
909  * poll thread is running so that no other thread can acquire the
910  * perimeter in between. If the squeue_drain gets done (no more work
911  * left), it leaves the SQS_PROC set if poll thread is running.
912  */
913 
914 /*
915  * This is the squeue poll thread. In poll mode, it polls the underlying
916  * TCP softring and feeds packets into the squeue. The worker thread then
917  * drains the squeue. The poll thread also responds to control signals for
918  * quiesceing, restarting, or cleanup of an squeue. These are driven by
919  * control operations like plumb/unplumb or as a result of dynamic Rx ring
920  * related operations that are driven from the mac layer.
921  */
922 static void
923 squeue_polling_thread(squeue_t *sqp)
924 {
925 	kmutex_t *lock = &sqp->sq_lock;
926 	kcondvar_t *async = &sqp->sq_poll_cv;
927 	ip_mac_rx_t sq_get_pkts;
928 	ip_accept_t ip_accept;
929 	ill_rx_ring_t *sq_rx_ring;
930 	ill_t *sq_ill;
931 	mblk_t *head, *tail, *mp;
932 	uint_t cnt;
933 	void *sq_mac_handle;
934 	callb_cpr_t cprinfo;
935 	size_t bytes_to_pickup;
936 	uint32_t ctl_state;
937 
938 	CALLB_CPR_INIT(&cprinfo, lock, callb_generic_cpr, "sq_poll");
939 	mutex_enter(lock);
940 
941 	for (;;) {
942 		CALLB_CPR_SAFE_BEGIN(&cprinfo);
943 		cv_wait(async, lock);
944 		CALLB_CPR_SAFE_END(&cprinfo, lock);
945 
946 		ctl_state = sqp->sq_state & (SQS_POLL_THR_CONTROL |
947 		    SQS_POLL_THR_QUIESCED);
948 		if (ctl_state != 0) {
949 			/*
950 			 * If the squeue is quiesced, then wait for a control
951 			 * request. A quiesced squeue must not poll the
952 			 * underlying soft ring.
953 			 */
954 			if (ctl_state == SQS_POLL_THR_QUIESCED)
955 				continue;
956 			/*
957 			 * Act on control requests to quiesce, cleanup or
958 			 * restart an squeue
959 			 */
960 			squeue_poll_thr_control(sqp);
961 			continue;
962 		}
963 
964 		if (!(sqp->sq_state & SQS_POLL_CAPAB))
965 			continue;
966 
967 		ASSERT((sqp->sq_state &
968 		    (SQS_PROC|SQS_POLLING|SQS_GET_PKTS)) ==
969 		    (SQS_PROC|SQS_POLLING|SQS_GET_PKTS));
970 
971 poll_again:
972 		sq_rx_ring = sqp->sq_rx_ring;
973 		sq_get_pkts = sq_rx_ring->rr_rx;
974 		sq_mac_handle = sq_rx_ring->rr_rx_handle;
975 		ip_accept = sq_rx_ring->rr_ip_accept;
976 		sq_ill = sq_rx_ring->rr_ill;
977 		bytes_to_pickup = MAX_BYTES_TO_PICKUP;
978 		mutex_exit(lock);
979 		head = sq_get_pkts(sq_mac_handle, bytes_to_pickup);
980 		mp = NULL;
981 		if (head != NULL) {
982 			/*
983 			 * We got the packet chain from the mac layer. It
984 			 * would be nice to be able to process it inline
985 			 * for better performance but we need to give
986 			 * IP a chance to look at this chain to ensure
987 			 * that packets are really meant for this squeue
988 			 * and do the IP processing.
989 			 */
990 			mp = ip_accept(sq_ill, sq_rx_ring, sqp, head,
991 			    &tail, &cnt);
992 		}
993 		mutex_enter(lock);
994 		if (mp != NULL)
995 			ENQUEUE_CHAIN(sqp, mp, tail, cnt);
996 
997 		ASSERT((sqp->sq_state &
998 		    (SQS_PROC|SQS_POLLING|SQS_GET_PKTS)) ==
999 		    (SQS_PROC|SQS_POLLING|SQS_GET_PKTS));
1000 
1001 		if (sqp->sq_first != NULL && !(sqp->sq_state & SQS_WORKER)) {
1002 			/*
1003 			 * We have packets to process and worker thread
1004 			 * is not running.  Check to see if poll thread is
1005 			 * allowed to process. Let it do processing only if it
1006 			 * picked up some packets from the NIC otherwise
1007 			 * wakeup the worker thread.
1008 			 */
1009 			if (mp != NULL) {
1010 				hrtime_t  now;
1011 
1012 				now = gethrtime();
1013 				sqp->sq_run = curthread;
1014 				sqp->sq_drain(sqp, SQS_POLL_PROC, now +
1015 				    squeue_drain_ns);
1016 				sqp->sq_run = NULL;
1017 
1018 				if (sqp->sq_first == NULL)
1019 					goto poll_again;
1020 
1021 				/*
1022 				 * Couldn't do the entire drain because the
1023 				 * time limit expired, let the
1024 				 * worker thread take over.
1025 				 */
1026 			}
1027 
1028 			sqp->sq_awaken = lbolt;
1029 			/*
1030 			 * Put the SQS_PROC_HELD on so the worker
1031 			 * thread can distinguish where its called from. We
1032 			 * can remove the SQS_PROC flag here and turn off the
1033 			 * polling so that it wouldn't matter who gets the
1034 			 * processing but we get better performance this way
1035 			 * and save the cost of turn polling off and possibly
1036 			 * on again as soon as we start draining again.
1037 			 *
1038 			 * We can't remove the SQS_PROC flag without turning
1039 			 * polling off until we can guarantee that control
1040 			 * will return to squeue_drain immediately.
1041 			 */
1042 			sqp->sq_state |= SQS_PROC_HELD;
1043 			sqp->sq_state &= ~SQS_GET_PKTS;
1044 			cv_signal(&sqp->sq_worker_cv);
1045 		} else if (sqp->sq_first == NULL &&
1046 		    !(sqp->sq_state & SQS_WORKER)) {
1047 			/*
1048 			 * Nothing queued and worker thread not running.
1049 			 * Since we hold the proc, no other thread is
1050 			 * processing the squeue. This means that there
1051 			 * is no work to be done and nothing is queued
1052 			 * in squeue or in NIC. Turn polling off and go
1053 			 * back to interrupt mode.
1054 			 */
1055 			sqp->sq_state &= ~(SQS_PROC|SQS_GET_PKTS);
1056 			/* LINTED: constant in conditional context */
1057 			SQS_POLLING_OFF(sqp, B_TRUE, sq_rx_ring);
1058 
1059 			/*
1060 			 * If there is a pending control operation
1061 			 * wake up the worker, since it is currently
1062 			 * not running.
1063 			 */
1064 			if (sqp->sq_state & SQS_WORKER_THR_CONTROL)
1065 				cv_signal(&sqp->sq_worker_cv);
1066 		} else {
1067 			/*
1068 			 * Worker thread is already running. We don't need
1069 			 * to do anything. Indicate that poll thread is done.
1070 			 */
1071 			sqp->sq_state &= ~SQS_GET_PKTS;
1072 		}
1073 		if (sqp->sq_state & SQS_POLL_THR_CONTROL) {
1074 			/*
1075 			 * Act on control requests to quiesce, cleanup or
1076 			 * restart an squeue
1077 			 */
1078 			squeue_poll_thr_control(sqp);
1079 		}
1080 	}
1081 }
1082 
1083 /*
1084  * The squeue worker thread acts on any control requests to quiesce, cleanup
1085  * or restart an ill_rx_ring_t by calling this function. The worker thread
1086  * synchronizes with the squeue poll thread to complete the request and finally
1087  * wakes up the requestor when the request is completed.
1088  */
1089 static void
1090 squeue_worker_thr_control(squeue_t *sqp)
1091 {
1092 	ill_t	*ill;
1093 	ill_rx_ring_t	*rx_ring;
1094 
1095 	ASSERT(MUTEX_HELD(&sqp->sq_lock));
1096 
1097 	if (sqp->sq_state & SQS_POLL_RESTART) {
1098 		/* Restart implies a previous quiesce. */
1099 		ASSERT((sqp->sq_state & (SQS_PROC_HELD |
1100 		    SQS_POLL_QUIESCE_DONE | SQS_PROC | SQS_WORKER)) ==
1101 		    (SQS_POLL_QUIESCE_DONE | SQS_PROC | SQS_WORKER));
1102 		/*
1103 		 * Request the squeue poll thread to restart and wait till
1104 		 * it actually restarts.
1105 		 */
1106 		sqp->sq_state &= ~SQS_POLL_QUIESCE_DONE;
1107 		sqp->sq_state |= SQS_POLL_THR_RESTART;
1108 		cv_signal(&sqp->sq_poll_cv);
1109 		while (sqp->sq_state & SQS_POLL_THR_QUIESCED)
1110 			cv_wait(&sqp->sq_worker_cv, &sqp->sq_lock);
1111 		sqp->sq_state &= ~(SQS_POLL_RESTART | SQS_PROC |
1112 		    SQS_WORKER);
1113 		/*
1114 		 * Signal any waiter that is waiting for the restart
1115 		 * to complete
1116 		 */
1117 		sqp->sq_state |= SQS_POLL_RESTART_DONE;
1118 		cv_signal(&sqp->sq_ctrlop_done_cv);
1119 		return;
1120 	}
1121 
1122 	if (sqp->sq_state & SQS_PROC_HELD) {
1123 		/* The squeue poll thread handed control to us */
1124 		ASSERT(sqp->sq_state & SQS_PROC);
1125 	}
1126 
1127 	/*
1128 	 * Prevent any other thread from processing the squeue
1129 	 * until we finish the control actions by setting SQS_PROC.
1130 	 * But allow ourself to reenter by setting SQS_WORKER
1131 	 */
1132 	sqp->sq_state |= (SQS_PROC | SQS_WORKER);
1133 
1134 	/* Signal the squeue poll thread and wait for it to quiesce itself */
1135 	if (!(sqp->sq_state & SQS_POLL_THR_QUIESCED)) {
1136 		sqp->sq_state |= SQS_POLL_THR_QUIESCE;
1137 		cv_signal(&sqp->sq_poll_cv);
1138 		while (!(sqp->sq_state & SQS_POLL_THR_QUIESCED))
1139 			cv_wait(&sqp->sq_worker_cv, &sqp->sq_lock);
1140 	}
1141 
1142 	rx_ring = sqp->sq_rx_ring;
1143 	ill = rx_ring->rr_ill;
1144 	/*
1145 	 * The lock hierarchy is as follows.
1146 	 * cpu_lock -> ill_lock -> sqset_lock -> sq_lock
1147 	 */
1148 	mutex_exit(&sqp->sq_lock);
1149 	mutex_enter(&ill->ill_lock);
1150 	mutex_enter(&sqp->sq_lock);
1151 
1152 	SQS_POLLING_OFF(sqp, (sqp->sq_state & SQS_POLL_CAPAB) != 0,
1153 	    sqp->sq_rx_ring);
1154 	sqp->sq_state &= ~(SQS_POLL_CAPAB | SQS_GET_PKTS | SQS_PROC_HELD);
1155 	if (sqp->sq_state & SQS_POLL_CLEANUP) {
1156 		/*
1157 		 * Disassociate this squeue from its ill_rx_ring_t.
1158 		 * The rr_sqp, sq_rx_ring fields are protected by the
1159 		 * corresponding squeue, ill_lock* and sq_lock. Holding any
1160 		 * of them will ensure that the ring to squeue mapping does
1161 		 * not change.
1162 		 */
1163 		ASSERT(!(sqp->sq_state & SQS_DEFAULT));
1164 
1165 		sqp->sq_rx_ring = NULL;
1166 		rx_ring->rr_sqp = NULL;
1167 
1168 		sqp->sq_state &= ~(SQS_POLL_CLEANUP | SQS_POLL_THR_QUIESCED |
1169 		    SQS_POLL_QUIESCE_DONE);
1170 		sqp->sq_ill = NULL;
1171 
1172 		rx_ring->rr_rx_handle = NULL;
1173 		rx_ring->rr_intr_handle = NULL;
1174 		rx_ring->rr_intr_enable = NULL;
1175 		rx_ring->rr_intr_disable = NULL;
1176 		sqp->sq_state |= SQS_POLL_CLEANUP_DONE;
1177 	} else {
1178 		sqp->sq_state &= ~SQS_POLL_QUIESCE;
1179 		sqp->sq_state |= SQS_POLL_QUIESCE_DONE;
1180 	}
1181 	/*
1182 	 * Signal any waiter that is waiting for the quiesce or cleanup
1183 	 * to complete and also wait for it to actually see and reset the
1184 	 * SQS_POLL_CLEANUP_DONE.
1185 	 */
1186 	cv_signal(&sqp->sq_ctrlop_done_cv);
1187 	mutex_exit(&ill->ill_lock);
1188 	if (sqp->sq_state & SQS_POLL_CLEANUP_DONE) {
1189 		cv_wait(&sqp->sq_worker_cv, &sqp->sq_lock);
1190 		sqp->sq_state &= ~(SQS_PROC | SQS_WORKER);
1191 	}
1192 }
1193 
1194 static void
1195 squeue_worker(squeue_t *sqp)
1196 {
1197 	kmutex_t *lock = &sqp->sq_lock;
1198 	kcondvar_t *async = &sqp->sq_worker_cv;
1199 	callb_cpr_t cprinfo;
1200 	hrtime_t now;
1201 
1202 	CALLB_CPR_INIT(&cprinfo, lock, callb_generic_cpr, "sq_worker");
1203 	mutex_enter(lock);
1204 
1205 	for (;;) {
1206 		for (;;) {
1207 			/*
1208 			 * If the poll thread has handed control to us
1209 			 * we need to break out of the wait.
1210 			 */
1211 			if (sqp->sq_state & SQS_PROC_HELD)
1212 				break;
1213 
1214 			/*
1215 			 * If the squeue is not being processed and we either
1216 			 * have messages to drain or some thread has signaled
1217 			 * some control activity we need to break
1218 			 */
1219 			if (!(sqp->sq_state & SQS_PROC) &&
1220 			    ((sqp->sq_state & SQS_WORKER_THR_CONTROL) ||
1221 			    (sqp->sq_first != NULL)))
1222 				break;
1223 
1224 			/*
1225 			 * If we have started some control action, then check
1226 			 * for the SQS_WORKER flag (since we don't
1227 			 * release the squeue) to make sure we own the squeue
1228 			 * and break out
1229 			 */
1230 			if ((sqp->sq_state & SQS_WORKER_THR_CONTROL) &&
1231 			    (sqp->sq_state & SQS_WORKER))
1232 				break;
1233 
1234 			CALLB_CPR_SAFE_BEGIN(&cprinfo);
1235 			cv_wait(async, lock);
1236 			CALLB_CPR_SAFE_END(&cprinfo, lock);
1237 		}
1238 		if (sqp->sq_state & SQS_WORKER_THR_CONTROL) {
1239 			squeue_worker_thr_control(sqp);
1240 			continue;
1241 		}
1242 		ASSERT(!(sqp->sq_state & (SQS_POLL_THR_QUIESCED |
1243 		    SQS_POLL_CLEANUP_DONE | SQS_POLL_QUIESCE_DONE |
1244 		    SQS_WORKER_THR_CONTROL | SQS_POLL_THR_CONTROL)));
1245 
1246 		if (sqp->sq_state & SQS_PROC_HELD)
1247 			sqp->sq_state &= ~SQS_PROC_HELD;
1248 
1249 		now = gethrtime();
1250 		sqp->sq_run = curthread;
1251 		sqp->sq_drain(sqp, SQS_WORKER, now +  squeue_drain_ns);
1252 		sqp->sq_run = NULL;
1253 	}
1254 }
1255 
1256 uintptr_t *
1257 squeue_getprivate(squeue_t *sqp, sqprivate_t p)
1258 {
1259 	ASSERT(p < SQPRIVATE_MAX);
1260 
1261 	return (&sqp->sq_private[p]);
1262 }
1263 
1264 /* ARGSUSED */
1265 void
1266 squeue_wakeup_conn(void *arg, mblk_t *mp, void *arg2)
1267 {
1268 	conn_t *connp = (conn_t *)arg;
1269 	squeue_t *sqp = connp->conn_sqp;
1270 
1271 	/*
1272 	 * Mark the squeue as paused before waking up the thread stuck
1273 	 * in squeue_synch_enter().
1274 	 */
1275 	mutex_enter(&sqp->sq_lock);
1276 	sqp->sq_state |= SQS_PAUSE;
1277 
1278 	/*
1279 	 * Notify the thread that it's OK to proceed; that is done by
1280 	 * clearing the MSGWAITSYNC flag. The synch thread will free the mblk.
1281 	 */
1282 	ASSERT(mp->b_flag & MSGWAITSYNC);
1283 	mp->b_flag &= ~MSGWAITSYNC;
1284 	cv_broadcast(&connp->conn_sq_cv);
1285 
1286 	/*
1287 	 * We are doing something on behalf of another thread, so we have to
1288 	 * pause and wait until it finishes.
1289 	 */
1290 	while (sqp->sq_state & SQS_PAUSE) {
1291 		cv_wait(&sqp->sq_synch_cv, &sqp->sq_lock);
1292 	}
1293 	mutex_exit(&sqp->sq_lock);
1294 }
1295 
1296 int
1297 squeue_synch_enter(squeue_t *sqp, conn_t *connp, mblk_t *use_mp)
1298 {
1299 	mutex_enter(&sqp->sq_lock);
1300 	if (sqp->sq_first == NULL && !(sqp->sq_state & SQS_PROC)) {
1301 		/*
1302 		 * We are OK to proceed if the squeue is empty, and
1303 		 * no one owns the squeue.
1304 		 *
1305 		 * The caller won't own the squeue as this is called from the
1306 		 * application.
1307 		 */
1308 		ASSERT(sqp->sq_run == NULL);
1309 
1310 		sqp->sq_state |= SQS_PROC;
1311 		sqp->sq_run = curthread;
1312 		mutex_exit(&sqp->sq_lock);
1313 
1314 #if SQUEUE_DEBUG
1315 		sqp->sq_curmp = NULL;
1316 		sqp->sq_curproc = NULL;
1317 		sqp->sq_connp = connp;
1318 #endif
1319 		connp->conn_on_sqp = B_TRUE;
1320 		return (0);
1321 	} else {
1322 		mblk_t  *mp;
1323 
1324 		mp = (use_mp == NULL) ? allocb(0, BPRI_MED) : use_mp;
1325 		if (mp == NULL) {
1326 			mutex_exit(&sqp->sq_lock);
1327 			return (ENOMEM);
1328 		}
1329 
1330 		/*
1331 		 * We mark the mblk as awaiting synchronous squeue access
1332 		 * by setting the MSGWAITSYNC flag. Once squeue_wakeup_conn
1333 		 * fires, MSGWAITSYNC is cleared, at which point we know we
1334 		 * have exclusive access.
1335 		 */
1336 		mp->b_flag |= MSGWAITSYNC;
1337 
1338 		CONN_INC_REF(connp);
1339 		SET_SQUEUE(mp, squeue_wakeup_conn, connp);
1340 		ENQUEUE_CHAIN(sqp, mp, mp, 1);
1341 
1342 		ASSERT(sqp->sq_run != curthread);
1343 
1344 		/* Wait until the enqueued mblk get processed. */
1345 		while (mp->b_flag & MSGWAITSYNC)
1346 			cv_wait(&connp->conn_sq_cv, &sqp->sq_lock);
1347 		mutex_exit(&sqp->sq_lock);
1348 
1349 		if (use_mp == NULL)
1350 			freeb(mp);
1351 
1352 		return (0);
1353 	}
1354 }
1355 
1356 void
1357 squeue_synch_exit(squeue_t *sqp, conn_t *connp)
1358 {
1359 	mutex_enter(&sqp->sq_lock);
1360 	if (sqp->sq_run == curthread) {
1361 		ASSERT(sqp->sq_state & SQS_PROC);
1362 
1363 		sqp->sq_state &= ~SQS_PROC;
1364 		sqp->sq_run = NULL;
1365 		connp->conn_on_sqp = B_FALSE;
1366 
1367 		if (sqp->sq_first == NULL) {
1368 			mutex_exit(&sqp->sq_lock);
1369 		} else {
1370 			/*
1371 			 * If this was a normal thread, then it would
1372 			 * (most likely) continue processing the pending
1373 			 * requests. Since the just completed operation
1374 			 * was executed synchronously, the thread should
1375 			 * not be delayed. To compensate, wake up the
1376 			 * worker thread right away when there are outstanding
1377 			 * requests.
1378 			 */
1379 			sqp->sq_awaken = lbolt;
1380 			cv_signal(&sqp->sq_worker_cv);
1381 			mutex_exit(&sqp->sq_lock);
1382 		}
1383 	} else {
1384 		/*
1385 		 * The caller doesn't own the squeue, clear the SQS_PAUSE flag,
1386 		 * and wake up the squeue owner, such that owner can continue
1387 		 * processing.
1388 		 */
1389 		ASSERT(sqp->sq_state & SQS_PAUSE);
1390 		sqp->sq_state &= ~SQS_PAUSE;
1391 
1392 		/* There should be only one thread blocking on sq_synch_cv. */
1393 		cv_signal(&sqp->sq_synch_cv);
1394 		mutex_exit(&sqp->sq_lock);
1395 	}
1396 }
1397