xref: /titanic_51/usr/src/uts/common/inet/squeue.c (revision 6319b0c72e1681f79a5f33dfa976a63eedd4a2a4)
1 /*
2  * CDDL HEADER START
3  *
4  * The contents of this file are subject to the terms of the
5  * Common Development and Distribution License (the "License").
6  * You may not use this file except in compliance with the License.
7  *
8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9  * or http://www.opensolaris.org/os/licensing.
10  * See the License for the specific language governing permissions
11  * and limitations under the License.
12  *
13  * When distributing Covered Code, include this CDDL HEADER in each
14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15  * If applicable, add the following below this CDDL HEADER, with the
16  * fields enclosed by brackets "[]" replaced with your own identifying
17  * information: Portions Copyright [yyyy] [name of copyright owner]
18  *
19  * CDDL HEADER END
20  */
21 /*
22  * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
23  */
24 
25 /*
26  * Squeues: General purpose serialization mechanism
27  * ------------------------------------------------
28  *
29  * Background:
30  * -----------
31  *
32  * This is a general purpose high-performance serialization mechanism
33  * currently used by TCP/IP. It is implement by means of a per CPU queue,
34  * a worker thread and a polling thread with are bound to the CPU
35  * associated with the squeue. The squeue is strictly FIFO for both read
36  * and write side and only one thread can process it at any given time.
37  * The design goal of squeue was to offer a very high degree of
38  * parallelization (on a per H/W execution pipeline basis) with at
39  * most one queuing.
40  *
41  * The modules needing protection typically calls SQUEUE_ENTER_ONE() or
42  * SQUEUE_ENTER() macro as soon as a thread enter the module
43  * from either direction. For each packet, the processing function
44  * and argument is stored in the mblk itself. When the packet is ready
45  * to be processed, the squeue retrieves the stored function and calls
46  * it with the supplied argument and the pointer to the packet itself.
47  * The called function can assume that no other thread is processing
48  * the squeue when it is executing.
49  *
50  * Squeue/connection binding:
51  * --------------------------
52  *
53  * TCP/IP uses an IP classifier in conjunction with squeue where specific
54  * connections are assigned to specific squeue (based on various policies),
55  * at the connection creation time. Once assigned, the connection to
56  * squeue mapping is never changed and all future packets for that
57  * connection are processed on that squeue. The connection ("conn") to
58  * squeue mapping is stored in "conn_t" member "conn_sqp".
59  *
60  * Since the processing of the connection cuts across multiple layers
61  * but still allows packets for different connnection to be processed on
62  * other CPU/squeues, squeues are also termed as "Vertical Perimeter" or
63  * "Per Connection Vertical Perimeter".
64  *
65  * Processing Model:
66  * -----------------
67  *
68  * Squeue doesn't necessary processes packets with its own worker thread.
69  * The callers can pick if they just want to queue the packet, process
70  * their packet if nothing is queued or drain and process. The first two
71  * modes are typically employed when the packet was generated while
72  * already doing the processing behind the squeue and last mode (drain
73  * and process) is typically employed when the thread is entering squeue
74  * for the first time. The squeue still imposes a finite time limit
75  * for which a external thread can do processing after which it switches
76  * processing to its own worker thread.
77  *
78  * Once created, squeues are never deleted. Hence squeue pointers are
79  * always valid. This means that functions outside the squeue can still
80  * refer safely to conn_sqp and their is no need for ref counts.
81  *
82  * Only a thread executing in the squeue can change the squeue of the
83  * connection. It does so by calling a squeue framework function to do this.
84  * After changing the squeue, the thread must leave the squeue. It must not
85  * continue to execute any code that needs squeue protection.
86  *
87  * The squeue framework, after entering the squeue, checks if the current
88  * squeue matches the conn_sqp. If the check fails, the packet is delivered
89  * to right squeue.
90  *
91  * Polling Model:
92  * --------------
93  *
94  * Squeues can control the rate of packet arrival into itself from the
95  * NIC or specific Rx ring within a NIC. As part of capability negotiation
96  * between IP and MAC layer, squeue are created for each TCP soft ring
97  * (or TCP Rx ring - to be implemented in future). As part of this
98  * negotiation, squeues get a cookie for underlying soft ring or Rx
99  * ring, a function to turn off incoming packets and a function to call
100  * to poll for packets. This helps schedule the receive side packet
101  * processing so that queue backlog doesn't build up and packet processing
102  * doesn't keep getting disturbed by high priority interrupts. As part
103  * of this mode, as soon as a backlog starts building, squeue turns off
104  * the interrupts and switches to poll mode. In poll mode, when poll
105  * thread goes down to retrieve packets, it retrieves them in the form of
106  * a chain which improves performance even more. As the squeue/softring
107  * system gets more packets, it gets more efficient by switching to
108  * polling more often and dealing with larger packet chains.
109  *
110  */
111 
112 #include <sys/types.h>
113 #include <sys/cmn_err.h>
114 #include <sys/debug.h>
115 #include <sys/kmem.h>
116 #include <sys/cpuvar.h>
117 #include <sys/condvar_impl.h>
118 #include <sys/systm.h>
119 #include <sys/callb.h>
120 #include <sys/sdt.h>
121 #include <sys/ddi.h>
122 #include <sys/sunddi.h>
123 
124 #include <inet/ipclassifier.h>
125 #include <inet/udp_impl.h>
126 
127 #include <sys/squeue_impl.h>
128 
129 static void squeue_fire(void *);
130 static void squeue_drain(squeue_t *, uint_t, hrtime_t);
131 static void squeue_worker(squeue_t *sqp);
132 static void squeue_polling_thread(squeue_t *sqp);
133 
134 kmem_cache_t *squeue_cache;
135 
136 #define	SQUEUE_MSEC_TO_NSEC 1000000
137 
138 int squeue_drain_ms = 20;
139 int squeue_workerwait_ms = 0;
140 
141 /* The values above converted to ticks or nano seconds */
142 static int squeue_drain_ns = 0;
143 static int squeue_workerwait_tick = 0;
144 
145 #define	MAX_BYTES_TO_PICKUP	150000
146 
147 #define	ENQUEUE_CHAIN(sqp, mp, tail, cnt) {			\
148 	/*							\
149 	 * Enqueue our mblk chain.				\
150 	 */							\
151 	ASSERT(MUTEX_HELD(&(sqp)->sq_lock));			\
152 								\
153 	if ((sqp)->sq_last != NULL)				\
154 		(sqp)->sq_last->b_next = (mp);			\
155 	else							\
156 		(sqp)->sq_first = (mp);				\
157 	(sqp)->sq_last = (tail);				\
158 	(sqp)->sq_count += (cnt);				\
159 	ASSERT((sqp)->sq_count > 0);				\
160 	DTRACE_PROBE4(squeue__enqueuechain, squeue_t *, sqp,	\
161 		mblk_t *, mp, mblk_t *, tail, int, cnt);	\
162 								\
163 }
164 
165 /*
166  * Blank the receive ring (in this case it is the soft ring). When
167  * blanked, the soft ring will not send any more packets up.
168  * Blanking may not succeed when there is a CPU already in the soft
169  * ring sending packets up. In that case, SQS_POLLING will not be
170  * set.
171  */
172 #define	SQS_POLLING_ON(sqp, sq_poll_capable, rx_ring) {		\
173 	ASSERT(MUTEX_HELD(&(sqp)->sq_lock));			\
174 	if (sq_poll_capable) {					\
175 		ASSERT(rx_ring != NULL);			\
176 		ASSERT(sqp->sq_state & SQS_POLL_CAPAB);		\
177 		if (!(sqp->sq_state & SQS_POLLING)) {		\
178 			if (rx_ring->rr_intr_disable(rx_ring->rr_intr_handle)) \
179 				sqp->sq_state |= SQS_POLLING;	\
180 		}						\
181 	}							\
182 }
183 
184 #define	SQS_POLLING_OFF(sqp, sq_poll_capable, rx_ring) {	\
185 	ASSERT(MUTEX_HELD(&(sqp)->sq_lock));			\
186 	if (sq_poll_capable) {					\
187 		ASSERT(rx_ring != NULL);			\
188 		ASSERT(sqp->sq_state & SQS_POLL_CAPAB);		\
189 		if (sqp->sq_state & SQS_POLLING) {		\
190 			sqp->sq_state &= ~SQS_POLLING;		\
191 			rx_ring->rr_intr_enable(rx_ring->rr_intr_handle); \
192 		}						\
193 	}							\
194 }
195 
196 /* Wakeup poll thread only if SQS_POLLING is set */
197 #define	SQS_POLL_RING(sqp) {			\
198 	ASSERT(MUTEX_HELD(&(sqp)->sq_lock));			\
199 	if (sqp->sq_state & SQS_POLLING) {			\
200 		ASSERT(sqp->sq_state & SQS_POLL_CAPAB);		\
201 		if (!(sqp->sq_state & SQS_GET_PKTS)) {		\
202 			sqp->sq_state |= SQS_GET_PKTS;		\
203 			cv_signal(&sqp->sq_poll_cv);		\
204 		}						\
205 	}							\
206 }
207 
208 #ifdef DEBUG
209 #define	SQUEUE_DBG_SET(sqp, mp, proc, connp, tag) {		\
210 	(sqp)->sq_curmp = (mp);					\
211 	(sqp)->sq_curproc = (proc);				\
212 	(sqp)->sq_connp = (connp);				\
213 	(mp)->b_tag = (sqp)->sq_tag = (tag);			\
214 }
215 
216 #define	SQUEUE_DBG_CLEAR(sqp)	{				\
217 	(sqp)->sq_curmp = NULL;					\
218 	(sqp)->sq_curproc = NULL;				\
219 	(sqp)->sq_connp = NULL;					\
220 }
221 #else
222 #define	SQUEUE_DBG_SET(sqp, mp, proc, connp, tag)
223 #define	SQUEUE_DBG_CLEAR(sqp)
224 #endif
225 
226 void
227 squeue_init(void)
228 {
229 	squeue_cache = kmem_cache_create("squeue_cache",
230 	    sizeof (squeue_t), 64, NULL, NULL, NULL, NULL, NULL, 0);
231 
232 	squeue_drain_ns = squeue_drain_ms * SQUEUE_MSEC_TO_NSEC;
233 	squeue_workerwait_tick = MSEC_TO_TICK_ROUNDUP(squeue_workerwait_ms);
234 }
235 
236 /* ARGSUSED */
237 squeue_t *
238 squeue_create(clock_t wait, pri_t pri)
239 {
240 	squeue_t *sqp = kmem_cache_alloc(squeue_cache, KM_SLEEP);
241 
242 	bzero(sqp, sizeof (squeue_t));
243 	sqp->sq_bind = PBIND_NONE;
244 	sqp->sq_priority = pri;
245 	sqp->sq_wait = MSEC_TO_TICK(wait);
246 	sqp->sq_worker = thread_create(NULL, 0, squeue_worker,
247 	    sqp, 0, &p0, TS_RUN, pri);
248 
249 	sqp->sq_poll_thr = thread_create(NULL, 0, squeue_polling_thread,
250 	    sqp, 0, &p0, TS_RUN, pri);
251 
252 	sqp->sq_enter = squeue_enter;
253 	sqp->sq_drain = squeue_drain;
254 
255 	return (sqp);
256 }
257 
258 /*
259  * Bind squeue worker thread to the specified CPU, given by CPU id.
260  * If the CPU id  value is -1, bind the worker thread to the value
261  * specified in sq_bind field. If a thread is already bound to a
262  * different CPU, unbind it from the old CPU and bind to the new one.
263  */
264 
265 void
266 squeue_bind(squeue_t *sqp, processorid_t bind)
267 {
268 	mutex_enter(&sqp->sq_lock);
269 	ASSERT(sqp->sq_bind != PBIND_NONE || bind != PBIND_NONE);
270 	ASSERT(MUTEX_HELD(&cpu_lock));
271 
272 	if (sqp->sq_state & SQS_BOUND) {
273 		if (sqp->sq_bind == bind) {
274 			mutex_exit(&sqp->sq_lock);
275 			return;
276 		}
277 		thread_affinity_clear(sqp->sq_worker);
278 	} else {
279 		sqp->sq_state |= SQS_BOUND;
280 	}
281 
282 	if (bind != PBIND_NONE)
283 		sqp->sq_bind = bind;
284 
285 	thread_affinity_set(sqp->sq_worker, sqp->sq_bind);
286 	mutex_exit(&sqp->sq_lock);
287 }
288 
289 void
290 squeue_unbind(squeue_t *sqp)
291 {
292 	mutex_enter(&sqp->sq_lock);
293 	if (!(sqp->sq_state & SQS_BOUND)) {
294 		mutex_exit(&sqp->sq_lock);
295 		return;
296 	}
297 
298 	sqp->sq_state &= ~SQS_BOUND;
299 	thread_affinity_clear(sqp->sq_worker);
300 	mutex_exit(&sqp->sq_lock);
301 }
302 
303 void
304 squeue_worker_wakeup(squeue_t *sqp)
305 {
306 	timeout_id_t tid = (sqp)->sq_tid;
307 
308 	ASSERT(MUTEX_HELD(&(sqp)->sq_lock));
309 
310 	if (sqp->sq_wait == 0) {
311 		ASSERT(tid == 0);
312 		ASSERT(!(sqp->sq_state & SQS_TMO_PROG));
313 		sqp->sq_awaken = ddi_get_lbolt();
314 		cv_signal(&sqp->sq_worker_cv);
315 		mutex_exit(&sqp->sq_lock);
316 		return;
317 	}
318 
319 	/*
320 	 * Queue isn't being processed, so take
321 	 * any post enqueue actions needed before leaving.
322 	 */
323 	if (tid != 0) {
324 		/*
325 		 * Waiting for an enter() to process mblk(s).
326 		 */
327 		clock_t now = ddi_get_lbolt();
328 		clock_t	waited = now - sqp->sq_awaken;
329 
330 		if (TICK_TO_MSEC(waited) >= sqp->sq_wait) {
331 			/*
332 			 * Times up and have a worker thread
333 			 * waiting for work, so schedule it.
334 			 */
335 			sqp->sq_tid = 0;
336 			sqp->sq_awaken = now;
337 			cv_signal(&sqp->sq_worker_cv);
338 			mutex_exit(&sqp->sq_lock);
339 			(void) untimeout(tid);
340 			return;
341 		}
342 		mutex_exit(&sqp->sq_lock);
343 		return;
344 	} else if (sqp->sq_state & SQS_TMO_PROG) {
345 		mutex_exit(&sqp->sq_lock);
346 		return;
347 	} else {
348 		clock_t	wait = sqp->sq_wait;
349 		/*
350 		 * Wait up to sqp->sq_wait ms for an
351 		 * enter() to process this queue. We
352 		 * don't want to contend on timeout locks
353 		 * with sq_lock held for performance reasons,
354 		 * so drop the sq_lock before calling timeout
355 		 * but we need to check if timeout is required
356 		 * after re acquiring the sq_lock. Once
357 		 * the sq_lock is dropped, someone else could
358 		 * have processed the packet or the timeout could
359 		 * have already fired.
360 		 */
361 		sqp->sq_state |= SQS_TMO_PROG;
362 		mutex_exit(&sqp->sq_lock);
363 		tid = timeout(squeue_fire, sqp, wait);
364 		mutex_enter(&sqp->sq_lock);
365 		/* Check again if we still need the timeout */
366 		if (((sqp->sq_state & (SQS_PROC|SQS_TMO_PROG)) ==
367 		    SQS_TMO_PROG) && (sqp->sq_tid == 0) &&
368 		    (sqp->sq_first != NULL)) {
369 				sqp->sq_state &= ~SQS_TMO_PROG;
370 				sqp->sq_tid = tid;
371 				mutex_exit(&sqp->sq_lock);
372 				return;
373 		} else {
374 			if (sqp->sq_state & SQS_TMO_PROG) {
375 				sqp->sq_state &= ~SQS_TMO_PROG;
376 				mutex_exit(&sqp->sq_lock);
377 				(void) untimeout(tid);
378 			} else {
379 				/*
380 				 * The timer fired before we could
381 				 * reacquire the sq_lock. squeue_fire
382 				 * removes the SQS_TMO_PROG flag
383 				 * and we don't need to	do anything
384 				 * else.
385 				 */
386 				mutex_exit(&sqp->sq_lock);
387 			}
388 		}
389 	}
390 
391 	ASSERT(MUTEX_NOT_HELD(&sqp->sq_lock));
392 }
393 
394 /*
395  * squeue_enter() - enter squeue sqp with mblk mp (which can be
396  * a chain), while tail points to the end and cnt in number of
397  * mblks in the chain.
398  *
399  * For a chain of single packet (i.e. mp == tail), go through the
400  * fast path if no one is processing the squeue and nothing is queued.
401  *
402  * The proc and arg for each mblk is already stored in the mblk in
403  * appropriate places.
404  *
405  * The process_flag specifies if we are allowed to process the mblk
406  * and drain in the entering thread context. If process_flag is
407  * SQ_FILL, then we just queue the mblk and return (after signaling
408  * the worker thread if no one else is processing the squeue).
409  *
410  * The ira argument can be used when the count is one.
411  * For a chain the caller needs to prepend any needed mblks from
412  * ip_recv_attr_to_mblk().
413  */
414 /* ARGSUSED */
415 void
416 squeue_enter(squeue_t *sqp, mblk_t *mp, mblk_t *tail, uint32_t cnt,
417     ip_recv_attr_t *ira, int process_flag, uint8_t tag)
418 {
419 	conn_t		*connp;
420 	sqproc_t	proc;
421 	hrtime_t	now;
422 
423 	ASSERT(sqp != NULL);
424 	ASSERT(mp != NULL);
425 	ASSERT(tail != NULL);
426 	ASSERT(cnt > 0);
427 	ASSERT(MUTEX_NOT_HELD(&sqp->sq_lock));
428 	ASSERT(ira == NULL || cnt == 1);
429 
430 	mutex_enter(&sqp->sq_lock);
431 
432 	/*
433 	 * Try to process the packet if SQ_FILL flag is not set and
434 	 * we are allowed to process the squeue. The SQ_NODRAIN is
435 	 * ignored if the packet chain consists of more than 1 packet.
436 	 */
437 	if (!(sqp->sq_state & SQS_PROC) && ((process_flag == SQ_PROCESS) ||
438 	    (process_flag == SQ_NODRAIN && sqp->sq_first == NULL))) {
439 		/*
440 		 * See if anything is already queued. If we are the
441 		 * first packet, do inline processing else queue the
442 		 * packet and do the drain.
443 		 */
444 		if (sqp->sq_first == NULL && cnt == 1) {
445 			/*
446 			 * Fast-path, ok to process and nothing queued.
447 			 */
448 			sqp->sq_state |= (SQS_PROC|SQS_FAST);
449 			sqp->sq_run = curthread;
450 			mutex_exit(&sqp->sq_lock);
451 
452 			/*
453 			 * We are the chain of 1 packet so
454 			 * go through this fast path.
455 			 */
456 			ASSERT(mp->b_prev != NULL);
457 			ASSERT(mp->b_queue != NULL);
458 			connp = (conn_t *)mp->b_prev;
459 			mp->b_prev = NULL;
460 			proc = (sqproc_t)mp->b_queue;
461 			mp->b_queue = NULL;
462 			ASSERT(proc != NULL && connp != NULL);
463 			ASSERT(mp->b_next == NULL);
464 
465 			/*
466 			 * Handle squeue switching. More details in the
467 			 * block comment at the top of the file
468 			 */
469 			if (connp->conn_sqp == sqp) {
470 				SQUEUE_DBG_SET(sqp, mp, proc, connp,
471 				    tag);
472 				connp->conn_on_sqp = B_TRUE;
473 				DTRACE_PROBE3(squeue__proc__start, squeue_t *,
474 				    sqp, mblk_t *, mp, conn_t *, connp);
475 				(*proc)(connp, mp, sqp, ira);
476 				DTRACE_PROBE2(squeue__proc__end, squeue_t *,
477 				    sqp, conn_t *, connp);
478 				connp->conn_on_sqp = B_FALSE;
479 				SQUEUE_DBG_CLEAR(sqp);
480 				CONN_DEC_REF(connp);
481 			} else {
482 				SQUEUE_ENTER_ONE(connp->conn_sqp, mp, proc,
483 				    connp, ira, SQ_FILL, SQTAG_SQUEUE_CHANGE);
484 			}
485 			ASSERT(MUTEX_NOT_HELD(&sqp->sq_lock));
486 			mutex_enter(&sqp->sq_lock);
487 			sqp->sq_state &= ~(SQS_PROC|SQS_FAST);
488 			sqp->sq_run = NULL;
489 			if (sqp->sq_first == NULL ||
490 			    process_flag == SQ_NODRAIN) {
491 				if (sqp->sq_first != NULL) {
492 					squeue_worker_wakeup(sqp);
493 					return;
494 				}
495 				/*
496 				 * We processed inline our packet and nothing
497 				 * new has arrived. We are done. In case any
498 				 * control actions are pending, wake up the
499 				 * worker.
500 				 */
501 				if (sqp->sq_state & SQS_WORKER_THR_CONTROL)
502 					cv_signal(&sqp->sq_worker_cv);
503 				mutex_exit(&sqp->sq_lock);
504 				return;
505 			}
506 		} else {
507 			if (ira != NULL) {
508 				mblk_t	*attrmp;
509 
510 				ASSERT(cnt == 1);
511 				attrmp = ip_recv_attr_to_mblk(ira);
512 				if (attrmp == NULL) {
513 					mutex_exit(&sqp->sq_lock);
514 					ip_drop_input("squeue: "
515 					    "ip_recv_attr_to_mblk",
516 					    mp, NULL);
517 					/* Caller already set b_prev/b_next */
518 					mp->b_prev = mp->b_next = NULL;
519 					freemsg(mp);
520 					return;
521 				}
522 				ASSERT(attrmp->b_cont == NULL);
523 				attrmp->b_cont = mp;
524 				/* Move connp and func to new */
525 				attrmp->b_queue = mp->b_queue;
526 				mp->b_queue = NULL;
527 				attrmp->b_prev = mp->b_prev;
528 				mp->b_prev = NULL;
529 
530 				ASSERT(mp == tail);
531 				tail = mp = attrmp;
532 			}
533 
534 			ENQUEUE_CHAIN(sqp, mp, tail, cnt);
535 #ifdef DEBUG
536 			mp->b_tag = tag;
537 #endif
538 		}
539 		/*
540 		 * We are here because either we couldn't do inline
541 		 * processing (because something was already queued),
542 		 * or we had a chain of more than one packet,
543 		 * or something else arrived after we were done with
544 		 * inline processing.
545 		 */
546 		ASSERT(MUTEX_HELD(&sqp->sq_lock));
547 		ASSERT(sqp->sq_first != NULL);
548 		now = gethrtime();
549 		sqp->sq_drain(sqp, SQS_ENTER, now + squeue_drain_ns);
550 
551 		/*
552 		 * If we didn't do a complete drain, the worker
553 		 * thread was already signalled by squeue_drain.
554 		 * In case any control actions are pending, wake
555 		 * up the worker.
556 		 */
557 		sqp->sq_run = NULL;
558 		if (sqp->sq_state & SQS_WORKER_THR_CONTROL)
559 			cv_signal(&sqp->sq_worker_cv);
560 		mutex_exit(&sqp->sq_lock);
561 		return;
562 	} else {
563 		/*
564 		 * We let a thread processing a squeue reenter only
565 		 * once. This helps the case of incoming connection
566 		 * where a SYN-ACK-ACK that triggers the conn_ind
567 		 * doesn't have to queue the packet if listener and
568 		 * eager are on the same squeue. Also helps the
569 		 * loopback connection where the two ends are bound
570 		 * to the same squeue (which is typical on single
571 		 * CPU machines).
572 		 *
573 		 * We let the thread reenter only once for the fear
574 		 * of stack getting blown with multiple traversal.
575 		 */
576 		connp = (conn_t *)mp->b_prev;
577 		if (!(sqp->sq_state & SQS_REENTER) &&
578 		    (process_flag != SQ_FILL) && (sqp->sq_first == NULL) &&
579 		    (sqp->sq_run == curthread) && (cnt == 1) &&
580 		    (connp->conn_on_sqp == B_FALSE)) {
581 			sqp->sq_state |= SQS_REENTER;
582 			mutex_exit(&sqp->sq_lock);
583 
584 			ASSERT(mp->b_prev != NULL);
585 			ASSERT(mp->b_queue != NULL);
586 
587 			mp->b_prev = NULL;
588 			proc = (sqproc_t)mp->b_queue;
589 			mp->b_queue = NULL;
590 
591 			/*
592 			 * Handle squeue switching. More details in the
593 			 * block comment at the top of the file
594 			 */
595 			if (connp->conn_sqp == sqp) {
596 				connp->conn_on_sqp = B_TRUE;
597 				DTRACE_PROBE3(squeue__proc__start, squeue_t *,
598 				    sqp, mblk_t *, mp, conn_t *, connp);
599 				(*proc)(connp, mp, sqp, ira);
600 				DTRACE_PROBE2(squeue__proc__end, squeue_t *,
601 				    sqp, conn_t *, connp);
602 				connp->conn_on_sqp = B_FALSE;
603 				CONN_DEC_REF(connp);
604 			} else {
605 				SQUEUE_ENTER_ONE(connp->conn_sqp, mp, proc,
606 				    connp, ira, SQ_FILL, SQTAG_SQUEUE_CHANGE);
607 			}
608 
609 			mutex_enter(&sqp->sq_lock);
610 			sqp->sq_state &= ~SQS_REENTER;
611 			mutex_exit(&sqp->sq_lock);
612 			return;
613 		}
614 
615 		/*
616 		 * Queue is already being processed or there is already
617 		 * one or more paquets on the queue. Enqueue the
618 		 * packet and wakeup the squeue worker thread if the
619 		 * squeue is not being processed.
620 		 */
621 #ifdef DEBUG
622 		mp->b_tag = tag;
623 #endif
624 		if (ira != NULL) {
625 			mblk_t	*attrmp;
626 
627 			ASSERT(cnt == 1);
628 			attrmp = ip_recv_attr_to_mblk(ira);
629 			if (attrmp == NULL) {
630 				mutex_exit(&sqp->sq_lock);
631 				ip_drop_input("squeue: ip_recv_attr_to_mblk",
632 				    mp, NULL);
633 				/* Caller already set b_prev/b_next */
634 				mp->b_prev = mp->b_next = NULL;
635 				freemsg(mp);
636 				return;
637 			}
638 			ASSERT(attrmp->b_cont == NULL);
639 			attrmp->b_cont = mp;
640 			/* Move connp and func to new */
641 			attrmp->b_queue = mp->b_queue;
642 			mp->b_queue = NULL;
643 			attrmp->b_prev = mp->b_prev;
644 			mp->b_prev = NULL;
645 
646 			ASSERT(mp == tail);
647 			tail = mp = attrmp;
648 		}
649 		ENQUEUE_CHAIN(sqp, mp, tail, cnt);
650 		if (!(sqp->sq_state & SQS_PROC)) {
651 			squeue_worker_wakeup(sqp);
652 			return;
653 		}
654 		/*
655 		 * In case any control actions are pending, wake
656 		 * up the worker.
657 		 */
658 		if (sqp->sq_state & SQS_WORKER_THR_CONTROL)
659 			cv_signal(&sqp->sq_worker_cv);
660 		mutex_exit(&sqp->sq_lock);
661 		return;
662 	}
663 }
664 
665 /*
666  * PRIVATE FUNCTIONS
667  */
668 
669 static void
670 squeue_fire(void *arg)
671 {
672 	squeue_t	*sqp = arg;
673 	uint_t		state;
674 
675 	mutex_enter(&sqp->sq_lock);
676 
677 	state = sqp->sq_state;
678 	if (sqp->sq_tid == 0 && !(state & SQS_TMO_PROG)) {
679 		mutex_exit(&sqp->sq_lock);
680 		return;
681 	}
682 
683 	sqp->sq_tid = 0;
684 	/*
685 	 * The timeout fired before we got a chance to set it.
686 	 * Process it anyway but remove the SQS_TMO_PROG so that
687 	 * the guy trying to set the timeout knows that it has
688 	 * already been processed.
689 	 */
690 	if (state & SQS_TMO_PROG)
691 		sqp->sq_state &= ~SQS_TMO_PROG;
692 
693 	if (!(state & SQS_PROC)) {
694 		sqp->sq_awaken = ddi_get_lbolt();
695 		cv_signal(&sqp->sq_worker_cv);
696 	}
697 	mutex_exit(&sqp->sq_lock);
698 }
699 
700 static void
701 squeue_drain(squeue_t *sqp, uint_t proc_type, hrtime_t expire)
702 {
703 	mblk_t		*mp;
704 	mblk_t 		*head;
705 	sqproc_t 	proc;
706 	conn_t		*connp;
707 	timeout_id_t 	tid;
708 	ill_rx_ring_t	*sq_rx_ring = sqp->sq_rx_ring;
709 	hrtime_t 	now;
710 	boolean_t	did_wakeup = B_FALSE;
711 	boolean_t	sq_poll_capable;
712 	ip_recv_attr_t	*ira, iras;
713 
714 	sq_poll_capable = (sqp->sq_state & SQS_POLL_CAPAB) != 0;
715 again:
716 	ASSERT(mutex_owned(&sqp->sq_lock));
717 	ASSERT(!(sqp->sq_state & (SQS_POLL_THR_QUIESCED |
718 	    SQS_POLL_QUIESCE_DONE)));
719 
720 	head = sqp->sq_first;
721 	sqp->sq_first = NULL;
722 	sqp->sq_last = NULL;
723 	sqp->sq_count = 0;
724 
725 	if ((tid = sqp->sq_tid) != 0)
726 		sqp->sq_tid = 0;
727 
728 	sqp->sq_state |= SQS_PROC | proc_type;
729 
730 	/*
731 	 * We have backlog built up. Switch to polling mode if the
732 	 * device underneath allows it. Need to do it so that
733 	 * more packets don't come in and disturb us (by contending
734 	 * for sq_lock or higher priority thread preempting us).
735 	 *
736 	 * The worker thread is allowed to do active polling while we
737 	 * just disable the interrupts for drain by non worker (kernel
738 	 * or userland) threads so they can peacefully process the
739 	 * packets during time allocated to them.
740 	 */
741 	SQS_POLLING_ON(sqp, sq_poll_capable, sq_rx_ring);
742 	mutex_exit(&sqp->sq_lock);
743 
744 	if (tid != 0)
745 		(void) untimeout(tid);
746 
747 	while ((mp = head) != NULL) {
748 
749 		head = mp->b_next;
750 		mp->b_next = NULL;
751 
752 		proc = (sqproc_t)mp->b_queue;
753 		mp->b_queue = NULL;
754 		connp = (conn_t *)mp->b_prev;
755 		mp->b_prev = NULL;
756 
757 		/* Is there an ip_recv_attr_t to handle? */
758 		if (ip_recv_attr_is_mblk(mp)) {
759 			mblk_t	*attrmp = mp;
760 
761 			ASSERT(attrmp->b_cont != NULL);
762 
763 			mp = attrmp->b_cont;
764 			attrmp->b_cont = NULL;
765 			ASSERT(mp->b_queue == NULL);
766 			ASSERT(mp->b_prev == NULL);
767 
768 			if (!ip_recv_attr_from_mblk(attrmp, &iras)) {
769 				/* The ill or ip_stack_t disappeared on us */
770 				ip_drop_input("ip_recv_attr_from_mblk",
771 				    mp, NULL);
772 				ira_cleanup(&iras, B_TRUE);
773 				CONN_DEC_REF(connp);
774 				continue;
775 			}
776 			ira = &iras;
777 		} else {
778 			ira = NULL;
779 		}
780 
781 
782 		/*
783 		 * Handle squeue switching. More details in the
784 		 * block comment at the top of the file
785 		 */
786 		if (connp->conn_sqp == sqp) {
787 			SQUEUE_DBG_SET(sqp, mp, proc, connp,
788 			    mp->b_tag);
789 			connp->conn_on_sqp = B_TRUE;
790 			DTRACE_PROBE3(squeue__proc__start, squeue_t *,
791 			    sqp, mblk_t *, mp, conn_t *, connp);
792 			(*proc)(connp, mp, sqp, ira);
793 			DTRACE_PROBE2(squeue__proc__end, squeue_t *,
794 			    sqp, conn_t *, connp);
795 			connp->conn_on_sqp = B_FALSE;
796 			CONN_DEC_REF(connp);
797 		} else {
798 			SQUEUE_ENTER_ONE(connp->conn_sqp, mp, proc, connp, ira,
799 			    SQ_FILL, SQTAG_SQUEUE_CHANGE);
800 		}
801 		if (ira != NULL)
802 			ira_cleanup(ira, B_TRUE);
803 	}
804 
805 	SQUEUE_DBG_CLEAR(sqp);
806 
807 	mutex_enter(&sqp->sq_lock);
808 
809 	/*
810 	 * Check if there is still work to do (either more arrived or timer
811 	 * expired). If we are the worker thread and we are polling capable,
812 	 * continue doing the work since no one else is around to do the
813 	 * work anyway (but signal the poll thread to retrieve some packets
814 	 * in the meanwhile). If we are not the worker thread, just
815 	 * signal the worker thread to take up the work if processing time
816 	 * has expired.
817 	 */
818 	if (sqp->sq_first != NULL) {
819 		/*
820 		 * Still more to process. If time quanta not expired, we
821 		 * should let the drain go on. The worker thread is allowed
822 		 * to drain as long as there is anything left.
823 		 */
824 		now = gethrtime();
825 		if ((now < expire) || (proc_type == SQS_WORKER)) {
826 			/*
827 			 * If time not expired or we are worker thread and
828 			 * this squeue is polling capable, continue to do
829 			 * the drain.
830 			 *
831 			 * We turn off interrupts for all userland threads
832 			 * doing drain but we do active polling only for
833 			 * worker thread.
834 			 *
835 			 * Calling SQS_POLL_RING() even in the case of
836 			 * SQS_POLLING_ON() not succeeding is ok as
837 			 * SQS_POLL_RING() will not wake up poll thread
838 			 * if SQS_POLLING bit is not set.
839 			 */
840 			if (proc_type == SQS_WORKER)
841 				SQS_POLL_RING(sqp);
842 			goto again;
843 		} else {
844 			did_wakeup = B_TRUE;
845 			sqp->sq_awaken = ddi_get_lbolt();
846 			cv_signal(&sqp->sq_worker_cv);
847 		}
848 	}
849 
850 	/*
851 	 * If the poll thread is already running, just return. The
852 	 * poll thread continues to hold the proc and will finish
853 	 * processing.
854 	 */
855 	if (sqp->sq_state & SQS_GET_PKTS) {
856 		ASSERT(!(sqp->sq_state & (SQS_POLL_THR_QUIESCED |
857 		    SQS_POLL_QUIESCE_DONE)));
858 		sqp->sq_state &= ~proc_type;
859 		return;
860 	}
861 
862 	/*
863 	 *
864 	 * If we are the worker thread and no work is left, send the poll
865 	 * thread down once more to see if something arrived. Otherwise,
866 	 * turn the interrupts back on and we are done.
867 	 */
868 	if ((proc_type == SQS_WORKER) && (sqp->sq_state & SQS_POLLING)) {
869 		/*
870 		 * Do one last check to see if anything arrived
871 		 * in the NIC. We leave the SQS_PROC set to ensure
872 		 * that poll thread keeps the PROC and can decide
873 		 * if it needs to turn polling off or continue
874 		 * processing.
875 		 *
876 		 * If we drop the SQS_PROC here and poll thread comes
877 		 * up empty handed, it can not safely turn polling off
878 		 * since someone else could have acquired the PROC
879 		 * and started draining. The previously running poll
880 		 * thread and the current thread doing drain would end
881 		 * up in a race for turning polling on/off and more
882 		 * complex code would be required to deal with it.
883 		 *
884 		 * Its lot simpler for drain to hand the SQS_PROC to
885 		 * poll thread (if running) and let poll thread finish
886 		 * without worrying about racing with any other thread.
887 		 */
888 		ASSERT(!(sqp->sq_state & (SQS_POLL_THR_QUIESCED |
889 		    SQS_POLL_QUIESCE_DONE)));
890 		SQS_POLL_RING(sqp);
891 		sqp->sq_state &= ~proc_type;
892 	} else {
893 		/*
894 		 * The squeue is either not capable of polling or the
895 		 * attempt to blank (i.e., turn SQS_POLLING_ON()) was
896 		 * unsuccessful or poll thread already finished
897 		 * processing and didn't find anything. Since there
898 		 * is nothing queued and we already turn polling on
899 		 * (for all threads doing drain), we should turn
900 		 * polling off and relinquish the PROC.
901 		 */
902 		ASSERT(!(sqp->sq_state & (SQS_POLL_THR_QUIESCED |
903 		    SQS_POLL_QUIESCE_DONE)));
904 		SQS_POLLING_OFF(sqp, sq_poll_capable, sq_rx_ring);
905 		sqp->sq_state &= ~(SQS_PROC | proc_type);
906 		if (!did_wakeup && sqp->sq_first != NULL) {
907 			squeue_worker_wakeup(sqp);
908 			mutex_enter(&sqp->sq_lock);
909 		}
910 		/*
911 		 * If we are not the worker and there is a pending quiesce
912 		 * event, wake up the worker
913 		 */
914 		if ((proc_type != SQS_WORKER) &&
915 		    (sqp->sq_state & SQS_WORKER_THR_CONTROL))
916 			cv_signal(&sqp->sq_worker_cv);
917 	}
918 }
919 
920 /*
921  * Quiesce, Restart, or Cleanup of the squeue poll thread.
922  *
923  * Quiesce and Restart: After an squeue poll thread has been quiesced, it does
924  * not attempt to poll the underlying soft ring any more. The quiesce is
925  * triggered by the mac layer when it wants to quiesce a soft ring. Typically
926  * control operations such as changing the fanout of a NIC or VNIC (dladm
927  * setlinkprop) need to quiesce data flow before changing the wiring.
928  * The operation is done by the mac layer, but it calls back into IP to
929  * quiesce the soft ring. After completing the operation (say increase or
930  * decrease of the fanout) the mac layer then calls back into IP to restart
931  * the quiesced soft ring.
932  *
933  * Cleanup: This is triggered when the squeue binding to a soft ring is
934  * removed permanently. Typically interface plumb and unplumb would trigger
935  * this. It can also be triggered from the mac layer when a soft ring is
936  * being deleted say as the result of a fanout reduction. Since squeues are
937  * never deleted, the cleanup marks the squeue as fit for recycling and
938  * moves it to the zeroth squeue set.
939  */
940 static void
941 squeue_poll_thr_control(squeue_t *sqp)
942 {
943 	if (sqp->sq_state & SQS_POLL_THR_RESTART) {
944 		/* Restart implies a previous quiesce */
945 		ASSERT(sqp->sq_state & SQS_POLL_THR_QUIESCED);
946 		sqp->sq_state &= ~(SQS_POLL_THR_QUIESCED |
947 		    SQS_POLL_THR_RESTART);
948 		sqp->sq_state |= SQS_POLL_CAPAB;
949 		cv_signal(&sqp->sq_worker_cv);
950 		return;
951 	}
952 
953 	if (sqp->sq_state & SQS_POLL_THR_QUIESCE) {
954 		sqp->sq_state |= SQS_POLL_THR_QUIESCED;
955 		sqp->sq_state &= ~SQS_POLL_THR_QUIESCE;
956 		cv_signal(&sqp->sq_worker_cv);
957 		return;
958 	}
959 }
960 
961 /*
962  * POLLING Notes
963  *
964  * With polling mode, we want to do as much processing as we possibly can
965  * in worker thread context. The sweet spot is worker thread keeps doing
966  * work all the time in polling mode and writers etc. keep dumping packets
967  * to worker thread. Occassionally, we send the poll thread (running at
968  * lower priority to NIC to get the chain of packets to feed to worker).
969  * Sending the poll thread down to NIC is dependant on 3 criterions
970  *
971  * 1) Its always driven from squeue_drain and only if worker thread is
972  *	doing the drain.
973  * 2) We clear the backlog once and more packets arrived in between.
974  *	Before starting drain again, send the poll thread down if
975  *	the drain is being done by worker thread.
976  * 3) Before exiting the squeue_drain, if the poll thread is not already
977  *	working and we are the worker thread, try to poll one more time.
978  *
979  * For latency sake, we do allow any thread calling squeue_enter
980  * to process its packet provided:
981  *
982  * 1) Nothing is queued
983  * 2) If more packets arrived in between, the non worker thread are allowed
984  *	to do the drain till their time quanta expired provided SQS_GET_PKTS
985  *	wasn't set in between.
986  *
987  * Avoiding deadlocks with interrupts
988  * ==================================
989  *
990  * One of the big problem is that we can't send poll_thr down while holding
991  * the sq_lock since the thread can block. So we drop the sq_lock before
992  * calling sq_get_pkts(). We keep holding the SQS_PROC as long as the
993  * poll thread is running so that no other thread can acquire the
994  * perimeter in between. If the squeue_drain gets done (no more work
995  * left), it leaves the SQS_PROC set if poll thread is running.
996  */
997 
998 /*
999  * This is the squeue poll thread. In poll mode, it polls the underlying
1000  * TCP softring and feeds packets into the squeue. The worker thread then
1001  * drains the squeue. The poll thread also responds to control signals for
1002  * quiesceing, restarting, or cleanup of an squeue. These are driven by
1003  * control operations like plumb/unplumb or as a result of dynamic Rx ring
1004  * related operations that are driven from the mac layer.
1005  */
1006 static void
1007 squeue_polling_thread(squeue_t *sqp)
1008 {
1009 	kmutex_t *lock = &sqp->sq_lock;
1010 	kcondvar_t *async = &sqp->sq_poll_cv;
1011 	ip_mac_rx_t sq_get_pkts;
1012 	ip_accept_t ip_accept;
1013 	ill_rx_ring_t *sq_rx_ring;
1014 	ill_t *sq_ill;
1015 	mblk_t *head, *tail, *mp;
1016 	uint_t cnt;
1017 	void *sq_mac_handle;
1018 	callb_cpr_t cprinfo;
1019 	size_t bytes_to_pickup;
1020 	uint32_t ctl_state;
1021 
1022 	CALLB_CPR_INIT(&cprinfo, lock, callb_generic_cpr, "sq_poll");
1023 	mutex_enter(lock);
1024 
1025 	for (;;) {
1026 		CALLB_CPR_SAFE_BEGIN(&cprinfo);
1027 		cv_wait(async, lock);
1028 		CALLB_CPR_SAFE_END(&cprinfo, lock);
1029 
1030 		ctl_state = sqp->sq_state & (SQS_POLL_THR_CONTROL |
1031 		    SQS_POLL_THR_QUIESCED);
1032 		if (ctl_state != 0) {
1033 			/*
1034 			 * If the squeue is quiesced, then wait for a control
1035 			 * request. A quiesced squeue must not poll the
1036 			 * underlying soft ring.
1037 			 */
1038 			if (ctl_state == SQS_POLL_THR_QUIESCED)
1039 				continue;
1040 			/*
1041 			 * Act on control requests to quiesce, cleanup or
1042 			 * restart an squeue
1043 			 */
1044 			squeue_poll_thr_control(sqp);
1045 			continue;
1046 		}
1047 
1048 		if (!(sqp->sq_state & SQS_POLL_CAPAB))
1049 			continue;
1050 
1051 		ASSERT((sqp->sq_state &
1052 		    (SQS_PROC|SQS_POLLING|SQS_GET_PKTS)) ==
1053 		    (SQS_PROC|SQS_POLLING|SQS_GET_PKTS));
1054 
1055 poll_again:
1056 		sq_rx_ring = sqp->sq_rx_ring;
1057 		sq_get_pkts = sq_rx_ring->rr_rx;
1058 		sq_mac_handle = sq_rx_ring->rr_rx_handle;
1059 		ip_accept = sq_rx_ring->rr_ip_accept;
1060 		sq_ill = sq_rx_ring->rr_ill;
1061 		bytes_to_pickup = MAX_BYTES_TO_PICKUP;
1062 		mutex_exit(lock);
1063 		head = sq_get_pkts(sq_mac_handle, bytes_to_pickup);
1064 		mp = NULL;
1065 		if (head != NULL) {
1066 			/*
1067 			 * We got the packet chain from the mac layer. It
1068 			 * would be nice to be able to process it inline
1069 			 * for better performance but we need to give
1070 			 * IP a chance to look at this chain to ensure
1071 			 * that packets are really meant for this squeue
1072 			 * and do the IP processing.
1073 			 */
1074 			mp = ip_accept(sq_ill, sq_rx_ring, sqp, head,
1075 			    &tail, &cnt);
1076 		}
1077 		mutex_enter(lock);
1078 		if (mp != NULL) {
1079 			/*
1080 			 * The ip_accept function has already added an
1081 			 * ip_recv_attr_t mblk if that is needed.
1082 			 */
1083 			ENQUEUE_CHAIN(sqp, mp, tail, cnt);
1084 		}
1085 		ASSERT((sqp->sq_state &
1086 		    (SQS_PROC|SQS_POLLING|SQS_GET_PKTS)) ==
1087 		    (SQS_PROC|SQS_POLLING|SQS_GET_PKTS));
1088 
1089 		if (sqp->sq_first != NULL && !(sqp->sq_state & SQS_WORKER)) {
1090 			/*
1091 			 * We have packets to process and worker thread
1092 			 * is not running.  Check to see if poll thread is
1093 			 * allowed to process. Let it do processing only if it
1094 			 * picked up some packets from the NIC otherwise
1095 			 * wakeup the worker thread.
1096 			 */
1097 			if (mp != NULL) {
1098 				hrtime_t  now;
1099 
1100 				now = gethrtime();
1101 				sqp->sq_run = curthread;
1102 				sqp->sq_drain(sqp, SQS_POLL_PROC, now +
1103 				    squeue_drain_ns);
1104 				sqp->sq_run = NULL;
1105 
1106 				if (sqp->sq_first == NULL)
1107 					goto poll_again;
1108 
1109 				/*
1110 				 * Couldn't do the entire drain because the
1111 				 * time limit expired, let the
1112 				 * worker thread take over.
1113 				 */
1114 			}
1115 
1116 			sqp->sq_awaken = ddi_get_lbolt();
1117 			/*
1118 			 * Put the SQS_PROC_HELD on so the worker
1119 			 * thread can distinguish where its called from. We
1120 			 * can remove the SQS_PROC flag here and turn off the
1121 			 * polling so that it wouldn't matter who gets the
1122 			 * processing but we get better performance this way
1123 			 * and save the cost of turn polling off and possibly
1124 			 * on again as soon as we start draining again.
1125 			 *
1126 			 * We can't remove the SQS_PROC flag without turning
1127 			 * polling off until we can guarantee that control
1128 			 * will return to squeue_drain immediately.
1129 			 */
1130 			sqp->sq_state |= SQS_PROC_HELD;
1131 			sqp->sq_state &= ~SQS_GET_PKTS;
1132 			cv_signal(&sqp->sq_worker_cv);
1133 		} else if (sqp->sq_first == NULL &&
1134 		    !(sqp->sq_state & SQS_WORKER)) {
1135 			/*
1136 			 * Nothing queued and worker thread not running.
1137 			 * Since we hold the proc, no other thread is
1138 			 * processing the squeue. This means that there
1139 			 * is no work to be done and nothing is queued
1140 			 * in squeue or in NIC. Turn polling off and go
1141 			 * back to interrupt mode.
1142 			 */
1143 			sqp->sq_state &= ~(SQS_PROC|SQS_GET_PKTS);
1144 			/* LINTED: constant in conditional context */
1145 			SQS_POLLING_OFF(sqp, B_TRUE, sq_rx_ring);
1146 
1147 			/*
1148 			 * If there is a pending control operation
1149 			 * wake up the worker, since it is currently
1150 			 * not running.
1151 			 */
1152 			if (sqp->sq_state & SQS_WORKER_THR_CONTROL)
1153 				cv_signal(&sqp->sq_worker_cv);
1154 		} else {
1155 			/*
1156 			 * Worker thread is already running. We don't need
1157 			 * to do anything. Indicate that poll thread is done.
1158 			 */
1159 			sqp->sq_state &= ~SQS_GET_PKTS;
1160 		}
1161 		if (sqp->sq_state & SQS_POLL_THR_CONTROL) {
1162 			/*
1163 			 * Act on control requests to quiesce, cleanup or
1164 			 * restart an squeue
1165 			 */
1166 			squeue_poll_thr_control(sqp);
1167 		}
1168 	}
1169 }
1170 
1171 /*
1172  * The squeue worker thread acts on any control requests to quiesce, cleanup
1173  * or restart an ill_rx_ring_t by calling this function. The worker thread
1174  * synchronizes with the squeue poll thread to complete the request and finally
1175  * wakes up the requestor when the request is completed.
1176  */
1177 static void
1178 squeue_worker_thr_control(squeue_t *sqp)
1179 {
1180 	ill_t	*ill;
1181 	ill_rx_ring_t	*rx_ring;
1182 
1183 	ASSERT(MUTEX_HELD(&sqp->sq_lock));
1184 
1185 	if (sqp->sq_state & SQS_POLL_RESTART) {
1186 		/* Restart implies a previous quiesce. */
1187 		ASSERT((sqp->sq_state & (SQS_PROC_HELD |
1188 		    SQS_POLL_QUIESCE_DONE | SQS_PROC | SQS_WORKER)) ==
1189 		    (SQS_POLL_QUIESCE_DONE | SQS_PROC | SQS_WORKER));
1190 		/*
1191 		 * Request the squeue poll thread to restart and wait till
1192 		 * it actually restarts.
1193 		 */
1194 		sqp->sq_state &= ~SQS_POLL_QUIESCE_DONE;
1195 		sqp->sq_state |= SQS_POLL_THR_RESTART;
1196 		cv_signal(&sqp->sq_poll_cv);
1197 		while (sqp->sq_state & SQS_POLL_THR_QUIESCED)
1198 			cv_wait(&sqp->sq_worker_cv, &sqp->sq_lock);
1199 		sqp->sq_state &= ~(SQS_POLL_RESTART | SQS_PROC |
1200 		    SQS_WORKER);
1201 		/*
1202 		 * Signal any waiter that is waiting for the restart
1203 		 * to complete
1204 		 */
1205 		sqp->sq_state |= SQS_POLL_RESTART_DONE;
1206 		cv_signal(&sqp->sq_ctrlop_done_cv);
1207 		return;
1208 	}
1209 
1210 	if (sqp->sq_state & SQS_PROC_HELD) {
1211 		/* The squeue poll thread handed control to us */
1212 		ASSERT(sqp->sq_state & SQS_PROC);
1213 	}
1214 
1215 	/*
1216 	 * Prevent any other thread from processing the squeue
1217 	 * until we finish the control actions by setting SQS_PROC.
1218 	 * But allow ourself to reenter by setting SQS_WORKER
1219 	 */
1220 	sqp->sq_state |= (SQS_PROC | SQS_WORKER);
1221 
1222 	/* Signal the squeue poll thread and wait for it to quiesce itself */
1223 	if (!(sqp->sq_state & SQS_POLL_THR_QUIESCED)) {
1224 		sqp->sq_state |= SQS_POLL_THR_QUIESCE;
1225 		cv_signal(&sqp->sq_poll_cv);
1226 		while (!(sqp->sq_state & SQS_POLL_THR_QUIESCED))
1227 			cv_wait(&sqp->sq_worker_cv, &sqp->sq_lock);
1228 	}
1229 
1230 	rx_ring = sqp->sq_rx_ring;
1231 	ill = rx_ring->rr_ill;
1232 	/*
1233 	 * The lock hierarchy is as follows.
1234 	 * cpu_lock -> ill_lock -> sqset_lock -> sq_lock
1235 	 */
1236 	mutex_exit(&sqp->sq_lock);
1237 	mutex_enter(&ill->ill_lock);
1238 	mutex_enter(&sqp->sq_lock);
1239 
1240 	SQS_POLLING_OFF(sqp, (sqp->sq_state & SQS_POLL_CAPAB) != 0,
1241 	    sqp->sq_rx_ring);
1242 	sqp->sq_state &= ~(SQS_POLL_CAPAB | SQS_GET_PKTS | SQS_PROC_HELD);
1243 	if (sqp->sq_state & SQS_POLL_CLEANUP) {
1244 		/*
1245 		 * Disassociate this squeue from its ill_rx_ring_t.
1246 		 * The rr_sqp, sq_rx_ring fields are protected by the
1247 		 * corresponding squeue, ill_lock* and sq_lock. Holding any
1248 		 * of them will ensure that the ring to squeue mapping does
1249 		 * not change.
1250 		 */
1251 		ASSERT(!(sqp->sq_state & SQS_DEFAULT));
1252 
1253 		sqp->sq_rx_ring = NULL;
1254 		rx_ring->rr_sqp = NULL;
1255 
1256 		sqp->sq_state &= ~(SQS_POLL_CLEANUP | SQS_POLL_THR_QUIESCED |
1257 		    SQS_POLL_QUIESCE_DONE);
1258 		sqp->sq_ill = NULL;
1259 
1260 		rx_ring->rr_rx_handle = NULL;
1261 		rx_ring->rr_intr_handle = NULL;
1262 		rx_ring->rr_intr_enable = NULL;
1263 		rx_ring->rr_intr_disable = NULL;
1264 		sqp->sq_state |= SQS_POLL_CLEANUP_DONE;
1265 	} else {
1266 		sqp->sq_state &= ~SQS_POLL_QUIESCE;
1267 		sqp->sq_state |= SQS_POLL_QUIESCE_DONE;
1268 	}
1269 	/*
1270 	 * Signal any waiter that is waiting for the quiesce or cleanup
1271 	 * to complete and also wait for it to actually see and reset the
1272 	 * SQS_POLL_CLEANUP_DONE.
1273 	 */
1274 	cv_signal(&sqp->sq_ctrlop_done_cv);
1275 	mutex_exit(&ill->ill_lock);
1276 	if (sqp->sq_state & SQS_POLL_CLEANUP_DONE) {
1277 		cv_wait(&sqp->sq_worker_cv, &sqp->sq_lock);
1278 		sqp->sq_state &= ~(SQS_PROC | SQS_WORKER);
1279 	}
1280 }
1281 
1282 static void
1283 squeue_worker(squeue_t *sqp)
1284 {
1285 	kmutex_t *lock = &sqp->sq_lock;
1286 	kcondvar_t *async = &sqp->sq_worker_cv;
1287 	callb_cpr_t cprinfo;
1288 	hrtime_t now;
1289 
1290 	CALLB_CPR_INIT(&cprinfo, lock, callb_generic_cpr, "sq_worker");
1291 	mutex_enter(lock);
1292 
1293 	for (;;) {
1294 		for (;;) {
1295 			/*
1296 			 * If the poll thread has handed control to us
1297 			 * we need to break out of the wait.
1298 			 */
1299 			if (sqp->sq_state & SQS_PROC_HELD)
1300 				break;
1301 
1302 			/*
1303 			 * If the squeue is not being processed and we either
1304 			 * have messages to drain or some thread has signaled
1305 			 * some control activity we need to break
1306 			 */
1307 			if (!(sqp->sq_state & SQS_PROC) &&
1308 			    ((sqp->sq_state & SQS_WORKER_THR_CONTROL) ||
1309 			    (sqp->sq_first != NULL)))
1310 				break;
1311 
1312 			/*
1313 			 * If we have started some control action, then check
1314 			 * for the SQS_WORKER flag (since we don't
1315 			 * release the squeue) to make sure we own the squeue
1316 			 * and break out
1317 			 */
1318 			if ((sqp->sq_state & SQS_WORKER_THR_CONTROL) &&
1319 			    (sqp->sq_state & SQS_WORKER))
1320 				break;
1321 
1322 			CALLB_CPR_SAFE_BEGIN(&cprinfo);
1323 			cv_wait(async, lock);
1324 			CALLB_CPR_SAFE_END(&cprinfo, lock);
1325 		}
1326 		if (sqp->sq_state & SQS_WORKER_THR_CONTROL) {
1327 			squeue_worker_thr_control(sqp);
1328 			continue;
1329 		}
1330 		ASSERT(!(sqp->sq_state & (SQS_POLL_THR_QUIESCED |
1331 		    SQS_POLL_CLEANUP_DONE | SQS_POLL_QUIESCE_DONE |
1332 		    SQS_WORKER_THR_CONTROL | SQS_POLL_THR_CONTROL)));
1333 
1334 		if (sqp->sq_state & SQS_PROC_HELD)
1335 			sqp->sq_state &= ~SQS_PROC_HELD;
1336 
1337 		now = gethrtime();
1338 		sqp->sq_run = curthread;
1339 		sqp->sq_drain(sqp, SQS_WORKER, now +  squeue_drain_ns);
1340 		sqp->sq_run = NULL;
1341 	}
1342 }
1343 
1344 uintptr_t *
1345 squeue_getprivate(squeue_t *sqp, sqprivate_t p)
1346 {
1347 	ASSERT(p < SQPRIVATE_MAX);
1348 
1349 	return (&sqp->sq_private[p]);
1350 }
1351 
1352 /* ARGSUSED */
1353 void
1354 squeue_wakeup_conn(void *arg, mblk_t *mp, void *arg2, ip_recv_attr_t *dummy)
1355 {
1356 	conn_t *connp = (conn_t *)arg;
1357 	squeue_t *sqp = connp->conn_sqp;
1358 
1359 	/*
1360 	 * Mark the squeue as paused before waking up the thread stuck
1361 	 * in squeue_synch_enter().
1362 	 */
1363 	mutex_enter(&sqp->sq_lock);
1364 	sqp->sq_state |= SQS_PAUSE;
1365 
1366 	/*
1367 	 * Notify the thread that it's OK to proceed; that is done by
1368 	 * clearing the MSGWAITSYNC flag. The synch thread will free the mblk.
1369 	 */
1370 	ASSERT(mp->b_flag & MSGWAITSYNC);
1371 	mp->b_flag &= ~MSGWAITSYNC;
1372 	cv_broadcast(&connp->conn_sq_cv);
1373 
1374 	/*
1375 	 * We are doing something on behalf of another thread, so we have to
1376 	 * pause and wait until it finishes.
1377 	 */
1378 	while (sqp->sq_state & SQS_PAUSE) {
1379 		cv_wait(&sqp->sq_synch_cv, &sqp->sq_lock);
1380 	}
1381 	mutex_exit(&sqp->sq_lock);
1382 }
1383 
1384 int
1385 squeue_synch_enter(conn_t *connp, mblk_t *use_mp)
1386 {
1387 	squeue_t *sqp;
1388 
1389 again:
1390 	sqp = connp->conn_sqp;
1391 
1392 	mutex_enter(&sqp->sq_lock);
1393 	if (sqp->sq_first == NULL && !(sqp->sq_state & SQS_PROC)) {
1394 		/*
1395 		 * We are OK to proceed if the squeue is empty, and
1396 		 * no one owns the squeue.
1397 		 *
1398 		 * The caller won't own the squeue as this is called from the
1399 		 * application.
1400 		 */
1401 		ASSERT(sqp->sq_run == NULL);
1402 
1403 		sqp->sq_state |= SQS_PROC;
1404 		sqp->sq_run = curthread;
1405 		mutex_exit(&sqp->sq_lock);
1406 
1407 		/*
1408 		 * Handle squeue switching. The conn's squeue can only change
1409 		 * while there is a thread in the squeue, which is why we do
1410 		 * the check after entering the squeue. If it has changed, exit
1411 		 * this squeue and redo everything with the new sqeueue.
1412 		 */
1413 		if (sqp != connp->conn_sqp) {
1414 			mutex_enter(&sqp->sq_lock);
1415 			sqp->sq_state &= ~SQS_PROC;
1416 			sqp->sq_run = NULL;
1417 			mutex_exit(&sqp->sq_lock);
1418 			goto again;
1419 		}
1420 #if SQUEUE_DEBUG
1421 		sqp->sq_curmp = NULL;
1422 		sqp->sq_curproc = NULL;
1423 		sqp->sq_connp = connp;
1424 #endif
1425 		connp->conn_on_sqp = B_TRUE;
1426 		return (0);
1427 	} else {
1428 		mblk_t  *mp;
1429 
1430 		mp = (use_mp == NULL) ? allocb(0, BPRI_MED) : use_mp;
1431 		if (mp == NULL) {
1432 			mutex_exit(&sqp->sq_lock);
1433 			return (ENOMEM);
1434 		}
1435 
1436 		/*
1437 		 * We mark the mblk as awaiting synchronous squeue access
1438 		 * by setting the MSGWAITSYNC flag. Once squeue_wakeup_conn
1439 		 * fires, MSGWAITSYNC is cleared, at which point we know we
1440 		 * have exclusive access.
1441 		 */
1442 		mp->b_flag |= MSGWAITSYNC;
1443 
1444 		CONN_INC_REF(connp);
1445 		SET_SQUEUE(mp, squeue_wakeup_conn, connp);
1446 		ENQUEUE_CHAIN(sqp, mp, mp, 1);
1447 
1448 		ASSERT(sqp->sq_run != curthread);
1449 
1450 		/* Wait until the enqueued mblk get processed. */
1451 		while (mp->b_flag & MSGWAITSYNC)
1452 			cv_wait(&connp->conn_sq_cv, &sqp->sq_lock);
1453 		mutex_exit(&sqp->sq_lock);
1454 
1455 		if (use_mp == NULL)
1456 			freeb(mp);
1457 
1458 		return (0);
1459 	}
1460 }
1461 
1462 void
1463 squeue_synch_exit(conn_t *connp)
1464 {
1465 	squeue_t *sqp = connp->conn_sqp;
1466 
1467 	mutex_enter(&sqp->sq_lock);
1468 	if (sqp->sq_run == curthread) {
1469 		ASSERT(sqp->sq_state & SQS_PROC);
1470 
1471 		sqp->sq_state &= ~SQS_PROC;
1472 		sqp->sq_run = NULL;
1473 		connp->conn_on_sqp = B_FALSE;
1474 
1475 		if (sqp->sq_first == NULL) {
1476 			mutex_exit(&sqp->sq_lock);
1477 		} else {
1478 			/*
1479 			 * If this was a normal thread, then it would
1480 			 * (most likely) continue processing the pending
1481 			 * requests. Since the just completed operation
1482 			 * was executed synchronously, the thread should
1483 			 * not be delayed. To compensate, wake up the
1484 			 * worker thread right away when there are outstanding
1485 			 * requests.
1486 			 */
1487 			sqp->sq_awaken = ddi_get_lbolt();
1488 			cv_signal(&sqp->sq_worker_cv);
1489 			mutex_exit(&sqp->sq_lock);
1490 		}
1491 	} else {
1492 		/*
1493 		 * The caller doesn't own the squeue, clear the SQS_PAUSE flag,
1494 		 * and wake up the squeue owner, such that owner can continue
1495 		 * processing.
1496 		 */
1497 		ASSERT(sqp->sq_state & SQS_PAUSE);
1498 		sqp->sq_state &= ~SQS_PAUSE;
1499 
1500 		/* There should be only one thread blocking on sq_synch_cv. */
1501 		cv_signal(&sqp->sq_synch_cv);
1502 		mutex_exit(&sqp->sq_lock);
1503 	}
1504 }
1505