xref: /titanic_52/usr/src/uts/common/inet/squeue.c (revision 505d05c73a6e56769f263d4803b22eddd168ee24)
1 /*
2  * CDDL HEADER START
3  *
4  * The contents of this file are subject to the terms of the
5  * Common Development and Distribution License, Version 1.0 only
6  * (the "License").  You may not use this file except in compliance
7  * with the License.
8  *
9  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
10  * or http://www.opensolaris.org/os/licensing.
11  * See the License for the specific language governing permissions
12  * and limitations under the License.
13  *
14  * When distributing Covered Code, include this CDDL HEADER in each
15  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
16  * If applicable, add the following below this CDDL HEADER, with the
17  * fields enclosed by brackets "[]" replaced with your own identifying
18  * information: Portions Copyright [yyyy] [name of copyright owner]
19  *
20  * CDDL HEADER END
21  */
22 /*
23  * Copyright 2005 Sun Microsystems, Inc.  All rights reserved.
24  * Use is subject to license terms.
25  */
26 
27 #pragma ident	"%Z%%M%	%I%	%E% SMI"
28 
29 /*
30  * Squeues - TCP/IP serialization mechanism.
31  *
32  * This is a general purpose high-performance serialization mechanism. It is
33  * similar to a taskq with a single worker thread, the difference is that it
34  * does not imply a context switch - the thread placing a request may actually
35  * process it. It is also biased for processing requests in interrupt context.
36  *
37  * Each squeue has a worker thread which may optionally be bound to a CPU.
38  *
39  * Only one thread may process requests from a given squeue at any time. This is
40  * called "entering" squeue.
41  *
42  * Each dispatched request is processed either by
43  *
44  *	a) Dispatching thread or
45  *	b) Some other thread that is currently processing squeue at the time of
46  *		request or
47  *	c) worker thread.
48  *
49  * INTERFACES:
50  *
51  * squeue_t *squeue_create(name, bind, wait, pri)
52  *
53  *	name: symbolic name for squeue.
54  *	wait: time to wait before waiking the worker thread after queueing
55  *		request.
56  *	bind: preferred CPU binding for the worker thread.
57  *	pri:  thread priority for the worker thread.
58  *
59  *   This function never fails and may sleep. It returns a transparent pointer
60  *   to the squeue_t structure that is passed to all other squeue operations.
61  *
62  * void squeue_bind(sqp, bind)
63  *
64  *   Bind squeue worker thread to a CPU specified by the 'bind' argument. The
65  *   'bind' value of -1 binds to the preferred thread specified for
66  *   squeue_create.
67  *
68  *   NOTE: Any value of 'bind' other then -1 is not supported currently, but the
69  *	 API is present - in the future it may be useful to specify different
70  *	 binding.
71  *
72  * void squeue_unbind(sqp)
73  *
74  *   Unbind the worker thread from its preferred CPU.
75  *
76  * void squeue_enter(*sqp, *mp, proc, arg, tag)
77  *
78  *   Post a single request for processing. Each request consists of mblock 'mp',
79  *   function 'proc' to execute and an argument 'arg' to pass to this
80  *   function. The function is called as (*proc)(arg, mp, sqp); The tag is an
81  *   arbitrary number from 0 to 255 which will be stored in mp to track exact
82  *   caller of squeue_enter. The combination of function name and the tag should
83  *   provide enough information to identify the caller.
84  *
85  *   If no one is processing the squeue, squeue_enter() will call the function
86  *   immediately. Otherwise it will add the request to the queue for later
87  *   processing. Once the function is executed, the thread may continue
88  *   executing all other requests pending on the queue.
89  *
90  *   NOTE: The tagging information is only used when SQUEUE_DEBUG is set to 1.
91  *   NOTE: The argument can be conn_t only. Ideally we'd like to have generic
92  *	   argument, but we want to drop connection reference count here - this
93  *	   improves tail-call optimizations.
94  *	   XXX: The arg should have type conn_t.
95  *
96  * void squeue_enter_nodrain(*sqp, *mp, proc, arg, tag)
97  *
98  *   Same as squeue_enter(), but the entering thread will only try to execute a
99  *   single request. It will not continue executing any pending requests.
100  *
101  * void squeue_fill(*sqp, *mp, proc, arg, tag)
102  *
103  *   Just place the request on the queue without trying to execute it. Arrange
104  *   for the worker thread to process the request.
105  *
106  * void squeue_profile_enable(sqp)
107  * void squeue_profile_disable(sqp)
108  *
109  *    Enable or disable profiling for specified 'sqp'. Profiling is only
110  *    available when SQUEUE_PROFILE is set.
111  *
112  * void squeue_profile_reset(sqp)
113  *
114  *    Reset all profiling information to zero. Profiling is only
115  *    available when SQUEUE_PROFILE is set.
116  *
117  * void squeue_profile_start()
118  * void squeue_profile_stop()
119  *
120  *    Globally enable or disabled profiling for all squeues.
121  *
122  * uintptr_t *squeue_getprivate(sqp, p)
123  *
124  *    Each squeue keeps small amount of private data space available for various
125  *    consumers. Current consumers include TCP and NCA. Other consumers need to
126  *    add their private tag to the sqprivate_t enum. The private information is
127  *    limited to an uintptr_t value. The squeue has no knowledge of its content
128  *    and does not manage it in any way.
129  *
130  *    The typical use may be a breakdown of data structures per CPU (since
131  *    squeues are usually per CPU). See NCA for examples of use.
132  *    Currently 'p' may have one legal value SQPRIVATE_TCP.
133  *
134  * processorid_t squeue_binding(sqp)
135  *
136  *    Returns the CPU binding for a given squeue.
137  *
138  * TUNABALES:
139  *
140  * squeue_intrdrain_ms: Maximum time in ms interrupts spend draining any
141  *	squeue. Note that this is approximation - squeues have no control on the
142  *	time it takes to process each request. This limit is only checked
143  *	between processing individual messages.
144  *    Default: 20 ms.
145  *
146  * squeue_writerdrain_ms: Maximum time in ms non-interrupts spend draining any
147  *	squeue. Note that this is approximation - squeues have no control on the
148  *	time it takes to process each request. This limit is only checked
149  *	between processing individual messages.
150  *    Default: 10 ms.
151  *
152  * squeue_workerdrain_ms: Maximum time in ms worker thread spends draining any
153  *	squeue. Note that this is approximation - squeues have no control on the
154  *	time it takes to process each request. This limit is only checked
155  *	between processing individual messages.
156  *    Default: 10 ms.
157  *
158  * squeue_workerwait_ms: When worker thread is interrupted because workerdrain
159  *	expired, how much time to wait before waking worker thread again.
160  *    Default: 10 ms.
161  *
162  * DEFINES:
163  *
164  * SQUEUE_DEBUG: If defined as 1, special code is compiled in which records
165  *	additional information aiding debugging is recorded in squeue.
166  *
167  * SQUEUE_PROFILE: If defined as 1, special code is compiled in which collects
168  *	various squeue statistics and exports them as kstats.
169  *
170  * Ideally we would like both SQUEUE_DEBUG and SQUEUE_PROFILE to be always set,
171  * but it affects performance, so they are enabled on DEBUG kernels and disabled
172  * on non-DEBUG by default.
173  */
174 
175 #include <sys/types.h>
176 #include <sys/cmn_err.h>
177 #include <sys/debug.h>
178 #include <sys/kmem.h>
179 #include <sys/cpuvar.h>
180 #include <sys/condvar_impl.h>
181 #include <sys/systm.h>
182 #include <sys/callb.h>
183 #include <sys/sdt.h>
184 #include <sys/ddi.h>
185 
186 #include <inet/ipclassifier.h>
187 
188 /*
189  * State flags.
190  * Note: The MDB IP module depends on the values of these flags.
191  */
192 #define	SQS_PROC	0x0001	/* being processed */
193 #define	SQS_WORKER	0x0002	/* worker thread */
194 #define	SQS_ENTER	0x0004	/* enter thread */
195 #define	SQS_FAST	0x0008	/* enter-fast thread */
196 #define	SQS_USER	0x0010	/* A non interrupt user */
197 #define	SQS_BOUND	0x0020	/* Worker thread is bound */
198 #define	SQS_PROFILE	0x0040	/* Enable profiling */
199 #define	SQS_REENTER	0x0080	/* Re entered thread */
200 #define	SQS_TMO_PROG	0x0100	/* Timeout is being set */
201 
202 #ifdef DEBUG
203 #define	SQUEUE_DEBUG 1
204 #define	SQUEUE_PROFILE 1
205 #else
206 #define	SQUEUE_DEBUG 0
207 #define	SQUEUE_PROFILE 0
208 #endif
209 
210 #include <sys/squeue_impl.h>
211 
212 static void squeue_fire(void *);
213 static void squeue_drain(squeue_t *, uint_t, clock_t);
214 static void squeue_worker(squeue_t *sqp);
215 
216 #if SQUEUE_PROFILE
217 static kmutex_t squeue_kstat_lock;
218 static int  squeue_kstat_update(kstat_t *, int);
219 #endif
220 
221 kmem_cache_t *squeue_cache;
222 
223 int squeue_intrdrain_ms = 20;
224 int squeue_writerdrain_ms = 10;
225 int squeue_workerdrain_ms = 10;
226 int squeue_workerwait_ms = 10;
227 
228 /* The values above converted to ticks */
229 static int squeue_intrdrain_tick = 0;
230 static int squeue_writerdrain_tick = 0;
231 static int squeue_workerdrain_tick = 0;
232 static int squeue_workerwait_tick = 0;
233 
234 /*
235  * The minimum packet queued when worker thread doing the drain triggers
236  * polling (if squeue allows it). The choice of 3 is arbitrary. You
237  * definitely don't want it to be 1 since that will trigger polling
238  * on very low loads as well (ssh seems to do be one such example
239  * where packet flow was very low yet somehow 1 packet ended up getting
240  * queued and worker thread fires every 10ms and blanking also gets
241  * triggered.
242  */
243 int squeue_worker_poll_min = 3;
244 
245 #if SQUEUE_PROFILE
246 /*
247  * Set to B_TRUE to enable profiling.
248  */
249 static int squeue_profile = B_FALSE;
250 #define	SQ_PROFILING(sqp) (squeue_profile && ((sqp)->sq_state & SQS_PROFILE))
251 
252 #define	SQSTAT(sqp, x) ((sqp)->sq_stats.x++)
253 #define	SQDELTA(sqp, x, d) ((sqp)->sq_stats.x += (d))
254 
255 struct squeue_kstat {
256 	kstat_named_t	sq_count;
257 	kstat_named_t	sq_max_qlen;
258 	kstat_named_t	sq_npackets_worker;
259 	kstat_named_t	sq_npackets_intr;
260 	kstat_named_t	sq_npackets_other;
261 	kstat_named_t	sq_nqueued_intr;
262 	kstat_named_t	sq_nqueued_other;
263 	kstat_named_t	sq_ndrains_worker;
264 	kstat_named_t	sq_ndrains_intr;
265 	kstat_named_t	sq_ndrains_other;
266 	kstat_named_t	sq_time_worker;
267 	kstat_named_t	sq_time_intr;
268 	kstat_named_t	sq_time_other;
269 } squeue_kstat = {
270 	{ "count",		KSTAT_DATA_UINT64 },
271 	{ "max_qlen",		KSTAT_DATA_UINT64 },
272 	{ "packets_worker",	KSTAT_DATA_UINT64 },
273 	{ "packets_intr",	KSTAT_DATA_UINT64 },
274 	{ "packets_other",	KSTAT_DATA_UINT64 },
275 	{ "queued_intr",	KSTAT_DATA_UINT64 },
276 	{ "queued_other",	KSTAT_DATA_UINT64 },
277 	{ "ndrains_worker",	KSTAT_DATA_UINT64 },
278 	{ "ndrains_intr",	KSTAT_DATA_UINT64 },
279 	{ "ndrains_other",	KSTAT_DATA_UINT64 },
280 	{ "time_worker",	KSTAT_DATA_UINT64 },
281 	{ "time_intr",		KSTAT_DATA_UINT64 },
282 	{ "time_other",		KSTAT_DATA_UINT64 },
283 };
284 #endif
285 
286 #define	SQUEUE_WORKER_WAKEUP(sqp) {					\
287 	timeout_id_t tid = (sqp)->sq_tid;				\
288 									\
289 	ASSERT(MUTEX_HELD(&(sqp)->sq_lock));				\
290 	/*								\
291 	 * Queue isn't being processed, so take				\
292 	 * any post enqueue actions needed before leaving.		\
293 	 */								\
294 	if (tid != 0) {							\
295 		/*							\
296 		 * Waiting for an enter() to process mblk(s).		\
297 		 */							\
298 		clock_t	waited = lbolt - (sqp)->sq_awaken;		\
299 									\
300 		if (TICK_TO_MSEC(waited) >= (sqp)->sq_wait) {		\
301 			/*						\
302 			 * Times up and have a worker thread		\
303 			 * waiting for work, so schedule it.		\
304 			 */						\
305 			(sqp)->sq_tid = 0;				\
306 			(sqp)->sq_awaken = lbolt;			\
307 			cv_signal(&(sqp)->sq_async);			\
308 			mutex_exit(&(sqp)->sq_lock);			\
309 			(void) untimeout(tid);				\
310 			return;						\
311 		}							\
312 		mutex_exit(&(sqp)->sq_lock);				\
313 		return;							\
314 	} else if ((sqp)->sq_state & SQS_TMO_PROG) {			\
315 		mutex_exit(&(sqp)->sq_lock);				\
316 		return;							\
317 	} else if ((sqp)->sq_wait != 0) {				\
318 		clock_t	wait = (sqp)->sq_wait;				\
319 		/*							\
320 		 * Wait up to sqp->sq_wait ms for an			\
321 		 * enter() to process this queue. We			\
322 		 * don't want to contend on timeout locks		\
323 		 * with sq_lock held for performance reasons,		\
324 		 * so drop the sq_lock before calling timeout		\
325 		 * but we need to check if timeout is required		\
326 		 * after re acquiring the sq_lock. Once			\
327 		 * the sq_lock is dropped, someone else could		\
328 		 * have processed the packet or the timeout could	\
329 		 * have already fired.					\
330 		 */							\
331 		(sqp)->sq_state |= SQS_TMO_PROG;			\
332 		mutex_exit(&(sqp)->sq_lock);				\
333 		tid = timeout(squeue_fire, (sqp), wait);		\
334 		mutex_enter(&(sqp)->sq_lock);				\
335 		/* Check again if we still need the timeout */		\
336 		if ((((sqp)->sq_state & (SQS_PROC|SQS_TMO_PROG)) ==	\
337 			SQS_TMO_PROG) && ((sqp)->sq_tid == 0) &&	\
338 			((sqp)->sq_first != NULL)) {			\
339 				(sqp)->sq_state &= ~SQS_TMO_PROG;	\
340 				(sqp)->sq_awaken = lbolt;		\
341 				(sqp)->sq_tid = tid;			\
342 				mutex_exit(&(sqp)->sq_lock);		\
343 				return;					\
344 		} else {						\
345 			if ((sqp)->sq_state & SQS_TMO_PROG) {		\
346 				(sqp)->sq_state &= ~SQS_TMO_PROG;	\
347 				mutex_exit(&(sqp)->sq_lock);		\
348 				(void) untimeout(tid);			\
349 			} else {					\
350 				/*					\
351 				 * The timer fired before we could 	\
352 				 * reacquire the sq_lock. squeue_fire	\
353 				 * removes the SQS_TMO_PROG flag	\
354 				 * and we don't need to	do anything	\
355 				 * else.				\
356 				 */					\
357 				mutex_exit(&(sqp)->sq_lock);		\
358 			}						\
359 		}							\
360 	} else {							\
361 		/*							\
362 		 * Schedule the worker thread.				\
363 		 */							\
364 		(sqp)->sq_awaken = lbolt;				\
365 		cv_signal(&(sqp)->sq_async);				\
366 		mutex_exit(&(sqp)->sq_lock);				\
367 	}								\
368 	ASSERT(MUTEX_NOT_HELD(&(sqp)->sq_lock)); 			\
369 }
370 
371 #define	ENQUEUE_MP(sqp, mp, proc, arg) {			\
372 	/*							\
373 	 * Enque our mblk.					\
374 	 */							\
375 	(mp)->b_queue = NULL;					\
376 	ASSERT(MUTEX_HELD(&(sqp)->sq_lock));			\
377 	ASSERT((mp)->b_prev == NULL && (mp)->b_next == NULL); 	\
378 	(mp)->b_queue = (queue_t *)(proc);			\
379 	(mp)->b_prev = (mblk_t *)(arg);				\
380 								\
381 	if ((sqp)->sq_last != NULL)				\
382 		(sqp)->sq_last->b_next = (mp);			\
383 	else							\
384 		(sqp)->sq_first = (mp);				\
385 	(sqp)->sq_last = (mp);					\
386 	(sqp)->sq_count++;					\
387 	ASSERT((sqp)->sq_count > 0);				\
388 	DTRACE_PROBE2(squeue__enqueue, squeue_t *, sqp,		\
389 	    mblk_t *, mp);					\
390 }
391 
392 
393 #define	ENQUEUE_CHAIN(sqp, mp, tail, cnt) {			\
394 	/*							\
395 	 * Enqueue our mblk chain.				\
396 	 */							\
397 	ASSERT(MUTEX_HELD(&(sqp)->sq_lock));			\
398 								\
399 	if ((sqp)->sq_last != NULL)				\
400 		(sqp)->sq_last->b_next = (mp);			\
401 	else							\
402 		(sqp)->sq_first = (mp);				\
403 	(sqp)->sq_last = (tail);				\
404 	(sqp)->sq_count += (cnt);				\
405 	ASSERT((sqp)->sq_count > 0);				\
406 	DTRACE_PROBE4(squeue__enqueuechain, squeue_t *, sqp,	\
407 		mblk_t *, mp, mblk_t *, tail, int, cnt);	\
408 								\
409 }
410 
411 #define	SQS_POLLING_ON(sqp, rx_ring) {				\
412 	ASSERT(rx_ring != NULL);				\
413 	ASSERT(MUTEX_HELD(&(sqp)->sq_lock));			\
414 	rx_ring->rr_blank(rx_ring->rr_handle,			\
415 	    MIN((sqp->sq_avg_drain_time * sqp->sq_count),	\
416 		rx_ring->rr_max_blank_time),			\
417 		rx_ring->rr_max_pkt_cnt);			\
418 	rx_ring->rr_poll_state |= ILL_POLLING;			\
419 	rx_ring->rr_poll_time = lbolt;				\
420 }
421 
422 
423 #define	SQS_POLLING_OFF(sqp, rx_ring) {				\
424 	ASSERT(rx_ring != NULL);				\
425 	ASSERT(MUTEX_HELD(&(sqp)->sq_lock));			\
426 	rx_ring->rr_blank(rx_ring->rr_handle,			\
427 	    rx_ring->rr_min_blank_time,				\
428 	    rx_ring->rr_min_pkt_cnt);				\
429 }
430 
431 void
432 squeue_init(void)
433 {
434 	squeue_cache = kmem_cache_create("squeue_cache",
435 	    sizeof (squeue_t), 64, NULL, NULL, NULL, NULL, NULL, 0);
436 
437 	squeue_intrdrain_tick = MSEC_TO_TICK_ROUNDUP(squeue_intrdrain_ms);
438 	squeue_writerdrain_tick = MSEC_TO_TICK_ROUNDUP(squeue_writerdrain_ms);
439 	squeue_workerdrain_tick = MSEC_TO_TICK_ROUNDUP(squeue_workerdrain_ms);
440 	squeue_workerwait_tick = MSEC_TO_TICK_ROUNDUP(squeue_workerwait_ms);
441 }
442 
443 /* ARGSUSED */
444 squeue_t *
445 squeue_create(char *name, processorid_t bind, clock_t wait, pri_t pri)
446 {
447 	squeue_t *sqp = kmem_cache_alloc(squeue_cache, KM_SLEEP);
448 
449 	bzero(sqp, sizeof (squeue_t));
450 	(void) strncpy(sqp->sq_name, name, SQ_NAMELEN + 1);
451 	sqp->sq_name[SQ_NAMELEN] = '\0';
452 
453 	sqp->sq_bind = bind;
454 	sqp->sq_wait = MSEC_TO_TICK(wait);
455 	sqp->sq_avg_drain_time =
456 	    drv_hztousec(squeue_intrdrain_tick)/squeue_intrdrain_tick;
457 
458 #if SQUEUE_PROFILE
459 	if ((sqp->sq_kstat = kstat_create("ip", bind, name,
460 		"net", KSTAT_TYPE_NAMED,
461 		sizeof (squeue_kstat) / sizeof (kstat_named_t),
462 		KSTAT_FLAG_VIRTUAL)) != NULL) {
463 		sqp->sq_kstat->ks_lock = &squeue_kstat_lock;
464 		sqp->sq_kstat->ks_data = &squeue_kstat;
465 		sqp->sq_kstat->ks_update = squeue_kstat_update;
466 		sqp->sq_kstat->ks_private = sqp;
467 		kstat_install(sqp->sq_kstat);
468 	}
469 #endif
470 
471 	sqp->sq_worker = thread_create(NULL, 0, squeue_worker,
472 	    sqp, 0, &p0, TS_RUN, pri);
473 
474 	return (sqp);
475 }
476 
477 /* ARGSUSED */
478 void
479 squeue_bind(squeue_t *sqp, processorid_t bind)
480 {
481 	ASSERT(bind == -1);
482 
483 	mutex_enter(&sqp->sq_lock);
484 	if (sqp->sq_state & SQS_BOUND) {
485 		mutex_exit(&sqp->sq_lock);
486 		return;
487 	}
488 
489 	sqp->sq_state |= SQS_BOUND;
490 	mutex_exit(&sqp->sq_lock);
491 
492 	thread_affinity_set(sqp->sq_worker, sqp->sq_bind);
493 }
494 
495 void
496 squeue_unbind(squeue_t *sqp)
497 {
498 	mutex_enter(&sqp->sq_lock);
499 	if (!(sqp->sq_state & SQS_BOUND)) {
500 		mutex_exit(&sqp->sq_lock);
501 		return;
502 	}
503 
504 	sqp->sq_state &= ~SQS_BOUND;
505 	mutex_exit(&sqp->sq_lock);
506 
507 	thread_affinity_clear(sqp->sq_worker);
508 }
509 
510 /*
511  * squeue_enter() - enter squeue sqp with mblk mp (which can be
512  * a chain), while tail points to the end and cnt in number of
513  * mblks in the chain.
514  *
515  * For a chain of single packet (i.e. mp == tail), go through the
516  * fast path if no one is processing the squeue and nothing is queued.
517  *
518  * The proc and arg for each mblk is already stored in the mblk in
519  * appropriate places.
520  */
521 void
522 squeue_enter_chain(squeue_t *sqp, mblk_t *mp, mblk_t *tail,
523     uint32_t cnt, uint8_t tag)
524 {
525 	int		interrupt = servicing_interrupt();
526 	void 		*arg;
527 	sqproc_t	proc;
528 #if SQUEUE_PROFILE
529 	hrtime_t 	start, delta;
530 #endif
531 
532 	ASSERT(sqp != NULL);
533 	ASSERT(mp != NULL);
534 	ASSERT(tail != NULL);
535 	ASSERT(cnt > 0);
536 	ASSERT(MUTEX_NOT_HELD(&sqp->sq_lock));
537 
538 	mutex_enter(&sqp->sq_lock);
539 	if (!(sqp->sq_state & SQS_PROC)) {
540 		/*
541 		 * See if anything is already queued. If we are the
542 		 * first packet, do inline processing else queue the
543 		 * packet and do the drain.
544 		 */
545 		sqp->sq_run = curthread;
546 		if (sqp->sq_first == NULL && cnt == 1) {
547 			/*
548 			 * Fast-path, ok to process and nothing queued.
549 			 */
550 			sqp->sq_state |= (SQS_PROC|SQS_FAST);
551 			mutex_exit(&sqp->sq_lock);
552 
553 			/*
554 			 * We are the chain of 1 packet so
555 			 * go through this fast path.
556 			 */
557 			arg = mp->b_prev;
558 			mp->b_prev = NULL;
559 			proc = (sqproc_t)mp->b_queue;
560 			mp->b_queue = NULL;
561 
562 			ASSERT(proc != NULL);
563 			ASSERT(arg != NULL);
564 			ASSERT(mp->b_next == NULL);
565 
566 #if SQUEUE_DEBUG
567 			sqp->sq_isintr = interrupt;
568 			sqp->sq_curmp = mp;
569 			sqp->sq_curproc = proc;
570 			sqp->sq_connp = arg;
571 			mp->b_tag = sqp->sq_tag = tag;
572 #endif
573 #if SQUEUE_PROFILE
574 			if (SQ_PROFILING(sqp)) {
575 				if (interrupt)
576 					SQSTAT(sqp, sq_npackets_intr);
577 				else
578 					SQSTAT(sqp, sq_npackets_other);
579 				start = gethrtime();
580 			}
581 #endif
582 			((conn_t *)arg)->conn_on_sqp = B_TRUE;
583 			DTRACE_PROBE3(squeue__proc__start, squeue_t *,
584 			    sqp, mblk_t *, mp, conn_t *, arg);
585 			(*proc)(arg, mp, sqp);
586 			DTRACE_PROBE2(squeue__proc__end, squeue_t *,
587 			    sqp, conn_t *, arg);
588 			((conn_t *)arg)->conn_on_sqp = B_FALSE;
589 
590 #if SQUEUE_PROFILE
591 			if (SQ_PROFILING(sqp)) {
592 				delta = gethrtime() - start;
593 				if (interrupt)
594 					SQDELTA(sqp, sq_time_intr, delta);
595 				else
596 					SQDELTA(sqp, sq_time_other, delta);
597 			}
598 #endif
599 #if SQUEUE_DEBUG
600 			sqp->sq_curmp = NULL;
601 			sqp->sq_curproc = NULL;
602 			sqp->sq_connp = NULL;
603 			sqp->sq_isintr = 0;
604 #endif
605 
606 			CONN_DEC_REF((conn_t *)arg);
607 			ASSERT(MUTEX_NOT_HELD(&sqp->sq_lock));
608 			mutex_enter(&sqp->sq_lock);
609 			sqp->sq_state &= ~(SQS_PROC|SQS_FAST);
610 			if (sqp->sq_first == NULL) {
611 				/*
612 				 * We processed inline our packet and
613 				 * nothing new has arrived. We are done.
614 				 */
615 				sqp->sq_run = NULL;
616 				mutex_exit(&sqp->sq_lock);
617 				return;
618 			} else if (sqp->sq_bind != CPU->cpu_id) {
619 				/*
620 				 * If the current thread is not running
621 				 * on the CPU to which this squeue is bound,
622 				 * then don't allow it to drain.
623 				 */
624 				sqp->sq_run = NULL;
625 				SQUEUE_WORKER_WAKEUP(sqp);
626 				return;
627 			}
628 		} else {
629 			ENQUEUE_CHAIN(sqp, mp, tail, cnt);
630 #if SQUEUE_DEBUG
631 			mp->b_tag = tag;
632 #endif
633 #if SQUEUE_PROFILE
634 			if (SQ_PROFILING(sqp)) {
635 				if (servicing_interrupt())
636 					SQSTAT(sqp, sq_nqueued_intr);
637 				else
638 					SQSTAT(sqp, sq_nqueued_other);
639 				if (sqp->sq_stats.sq_max_qlen < sqp->sq_count)
640 					sqp->sq_stats.sq_max_qlen =
641 					    sqp->sq_count;
642 			}
643 #endif
644 		}
645 
646 		/*
647 		 * We are here because either we couldn't do inline
648 		 * processing (because something was already queued),
649 		 * or we had a chanin of more than one packet,
650 		 * or something else arrived after we were done with
651 		 * inline processing.
652 		 */
653 		ASSERT(MUTEX_HELD(&sqp->sq_lock));
654 		ASSERT(sqp->sq_first != NULL);
655 
656 #if SQUEUE_PROFILE
657 		if (SQ_PROFILING(sqp)) {
658 			start = gethrtime();
659 		}
660 #endif
661 #if SQUEUE_DEBUG
662 		sqp->sq_isintr = interrupt;
663 #endif
664 
665 		if (interrupt) {
666 			squeue_drain(sqp, SQS_ENTER, lbolt +
667 			    squeue_intrdrain_tick);
668 		} else {
669 			squeue_drain(sqp, SQS_USER, lbolt +
670 			    squeue_writerdrain_tick);
671 		}
672 
673 #if SQUEUE_PROFILE
674 		if (SQ_PROFILING(sqp)) {
675 			delta = gethrtime() - start;
676 			if (interrupt)
677 				SQDELTA(sqp, sq_time_intr, delta);
678 			else
679 				SQDELTA(sqp, sq_time_other, delta);
680 		}
681 #endif
682 #if SQUEUE_DEBUG
683 		sqp->sq_isintr = 0;
684 #endif
685 
686 		/*
687 		 * If we didn't do a complete drain, the worker
688 		 * thread was already signalled by squeue_drain.
689 		 */
690 		sqp->sq_run = NULL;
691 		mutex_exit(&sqp->sq_lock);
692 		return;
693 	} else {
694 		ASSERT(sqp->sq_run != NULL);
695 		/*
696 		 * Queue is already being processed. Just enqueue
697 		 * the packet and go away.
698 		 */
699 #if SQUEUE_DEBUG
700 		mp->b_tag = tag;
701 #endif
702 #if SQUEUE_PROFILE
703 		if (SQ_PROFILING(sqp)) {
704 			if (servicing_interrupt())
705 				SQSTAT(sqp, sq_nqueued_intr);
706 			else
707 				SQSTAT(sqp, sq_nqueued_other);
708 			if (sqp->sq_stats.sq_max_qlen < sqp->sq_count)
709 				sqp->sq_stats.sq_max_qlen = sqp->sq_count;
710 		}
711 #endif
712 
713 		ENQUEUE_CHAIN(sqp, mp, tail, cnt);
714 		mutex_exit(&sqp->sq_lock);
715 		return;
716 	}
717 }
718 
719 /*
720  * squeue_enter() - enter squeue *sqp with mblk *mp with argument of *arg.
721  */
722 void
723 squeue_enter(squeue_t *sqp, mblk_t *mp, sqproc_t proc, void *arg,
724     uint8_t tag)
725 {
726 	int	interrupt = servicing_interrupt();
727 #if SQUEUE_PROFILE
728 	hrtime_t start, delta;
729 #endif
730 #if SQUEUE_DEBUG
731 	conn_t 	*connp = (conn_t *)arg;
732 	ASSERT(!IPCL_IS_TCP(connp) || connp->conn_tcp->tcp_connp == connp);
733 	ASSERT(!IPCL_IS_UDP(connp) || connp->conn_udp->udp_connp == connp);
734 #endif
735 
736 	ASSERT(proc != NULL);
737 	ASSERT(sqp != NULL);
738 	ASSERT(mp != NULL);
739 	ASSERT(mp->b_next == NULL);
740 	ASSERT(MUTEX_NOT_HELD(&sqp->sq_lock));
741 
742 	mutex_enter(&sqp->sq_lock);
743 	if (!(sqp->sq_state & SQS_PROC)) {
744 		/*
745 		 * See if anything is already queued. If we are the
746 		 * first packet, do inline processing else queue the
747 		 * packet and do the drain.
748 		 */
749 		sqp->sq_run = curthread;
750 		if (sqp->sq_first == NULL) {
751 			/*
752 			 * Fast-path, ok to process and nothing queued.
753 			 */
754 			sqp->sq_state |= (SQS_PROC|SQS_FAST);
755 			mutex_exit(&sqp->sq_lock);
756 
757 #if SQUEUE_DEBUG
758 			sqp->sq_isintr = interrupt;
759 			sqp->sq_curmp = mp;
760 			sqp->sq_curproc = proc;
761 			sqp->sq_connp = connp;
762 			mp->b_tag = sqp->sq_tag = tag;
763 #endif
764 #if SQUEUE_PROFILE
765 			if (SQ_PROFILING(sqp)) {
766 				if (interrupt)
767 					SQSTAT(sqp, sq_npackets_intr);
768 				else
769 					SQSTAT(sqp, sq_npackets_other);
770 				start = gethrtime();
771 			}
772 #endif
773 			((conn_t *)arg)->conn_on_sqp = B_TRUE;
774 			DTRACE_PROBE3(squeue__proc__start, squeue_t *,
775 			    sqp, mblk_t *, mp, conn_t *, arg);
776 			(*proc)(arg, mp, sqp);
777 			DTRACE_PROBE2(squeue__proc__end, squeue_t *,
778 			    sqp, conn_t *, arg);
779 			((conn_t *)arg)->conn_on_sqp = B_FALSE;
780 
781 #if SQUEUE_PROFILE
782 			if (SQ_PROFILING(sqp)) {
783 				delta = gethrtime() - start;
784 				if (interrupt)
785 					SQDELTA(sqp, sq_time_intr, delta);
786 				else
787 					SQDELTA(sqp, sq_time_other, delta);
788 			}
789 #endif
790 #if SQUEUE_DEBUG
791 			sqp->sq_curmp = NULL;
792 			sqp->sq_curproc = NULL;
793 			sqp->sq_connp = NULL;
794 			sqp->sq_isintr = 0;
795 #endif
796 
797 			CONN_DEC_REF((conn_t *)arg);
798 			ASSERT(MUTEX_NOT_HELD(&sqp->sq_lock));
799 			mutex_enter(&sqp->sq_lock);
800 			sqp->sq_state &= ~(SQS_PROC|SQS_FAST);
801 			if (sqp->sq_first == NULL) {
802 				/*
803 				 * We processed inline our packet and
804 				 * nothing new has arrived. We are done.
805 				 */
806 				sqp->sq_run = NULL;
807 				mutex_exit(&sqp->sq_lock);
808 				return;
809 			} else if (sqp->sq_bind != CPU->cpu_id) {
810 				/*
811 				 * If the current thread is not running
812 				 * on the CPU to which this squeue is bound,
813 				 * then don't allow it to drain.
814 				 */
815 				sqp->sq_run = NULL;
816 				SQUEUE_WORKER_WAKEUP(sqp);
817 				return;
818 			}
819 		} else {
820 			ENQUEUE_MP(sqp, mp, proc, arg);
821 #if SQUEUE_DEBUG
822 			mp->b_tag = tag;
823 #endif
824 #if SQUEUE_PROFILE
825 			if (SQ_PROFILING(sqp)) {
826 				if (servicing_interrupt())
827 					SQSTAT(sqp, sq_nqueued_intr);
828 				else
829 					SQSTAT(sqp, sq_nqueued_other);
830 				if (sqp->sq_stats.sq_max_qlen < sqp->sq_count)
831 					sqp->sq_stats.sq_max_qlen =
832 					    sqp->sq_count;
833 			}
834 #endif
835 		}
836 
837 		/*
838 		 * We are here because either we couldn't do inline
839 		 * processing (because something was already queued)
840 		 * or something else arrived after we were done with
841 		 * inline processing.
842 		 */
843 		ASSERT(MUTEX_HELD(&sqp->sq_lock));
844 		ASSERT(sqp->sq_first != NULL);
845 
846 #if SQUEUE_PROFILE
847 		if (SQ_PROFILING(sqp)) {
848 			start = gethrtime();
849 		}
850 #endif
851 #if SQUEUE_DEBUG
852 		sqp->sq_isintr = interrupt;
853 #endif
854 
855 		if (interrupt) {
856 			squeue_drain(sqp, SQS_ENTER, lbolt +
857 			    squeue_intrdrain_tick);
858 		} else {
859 			squeue_drain(sqp, SQS_USER, lbolt +
860 			    squeue_writerdrain_tick);
861 		}
862 
863 #if SQUEUE_PROFILE
864 		if (SQ_PROFILING(sqp)) {
865 			delta = gethrtime() - start;
866 			if (interrupt)
867 				SQDELTA(sqp, sq_time_intr, delta);
868 			else
869 				SQDELTA(sqp, sq_time_other, delta);
870 		}
871 #endif
872 #if SQUEUE_DEBUG
873 		sqp->sq_isintr = 0;
874 #endif
875 
876 		/*
877 		 * If we didn't do a complete drain, the worker
878 		 * thread was already signalled by squeue_drain.
879 		 */
880 		sqp->sq_run = NULL;
881 		mutex_exit(&sqp->sq_lock);
882 		return;
883 	} else {
884 		ASSERT(sqp->sq_run != NULL);
885 		/*
886 		 * We let a thread processing a squeue reenter only
887 		 * once. This helps the case of incoming connection
888 		 * where a SYN-ACK-ACK that triggers the conn_ind
889 		 * doesn't have to queue the packet if listener and
890 		 * eager are on the same squeue. Also helps the
891 		 * loopback connection where the two ends are bound
892 		 * to the same squeue (which is typical on single
893 		 * CPU machines).
894 		 * We let the thread reenter only once for the fear
895 		 * of stack getting blown with multiple traversal.
896 		 */
897 		if (!(sqp->sq_state & SQS_REENTER) &&
898 		    (sqp->sq_run == curthread) &&
899 		    (((conn_t *)arg)->conn_on_sqp == B_FALSE)) {
900 			sqp->sq_state |= SQS_REENTER;
901 			mutex_exit(&sqp->sq_lock);
902 
903 			((conn_t *)arg)->conn_on_sqp = B_TRUE;
904 			DTRACE_PROBE3(squeue__proc__start, squeue_t *,
905 			    sqp, mblk_t *, mp, conn_t *, arg);
906 			(*proc)(arg, mp, sqp);
907 			DTRACE_PROBE2(squeue__proc__end, squeue_t *,
908 			    sqp, conn_t *, arg);
909 			((conn_t *)arg)->conn_on_sqp = B_FALSE;
910 			CONN_DEC_REF((conn_t *)arg);
911 
912 			mutex_enter(&sqp->sq_lock);
913 			sqp->sq_state &= ~SQS_REENTER;
914 			mutex_exit(&sqp->sq_lock);
915 			return;
916 		}
917 		/*
918 		 * Queue is already being processed. Just enqueue
919 		 * the packet and go away.
920 		 */
921 #if SQUEUE_DEBUG
922 		mp->b_tag = tag;
923 #endif
924 #if SQUEUE_PROFILE
925 		if (SQ_PROFILING(sqp)) {
926 			if (servicing_interrupt())
927 				SQSTAT(sqp, sq_nqueued_intr);
928 			else
929 				SQSTAT(sqp, sq_nqueued_other);
930 			if (sqp->sq_stats.sq_max_qlen < sqp->sq_count)
931 				sqp->sq_stats.sq_max_qlen = sqp->sq_count;
932 		}
933 #endif
934 
935 		ENQUEUE_MP(sqp, mp, proc, arg);
936 		mutex_exit(&sqp->sq_lock);
937 		return;
938 	}
939 }
940 
941 void
942 squeue_enter_nodrain(squeue_t *sqp, mblk_t *mp, sqproc_t proc, void *arg,
943     uint8_t tag)
944 {
945 	int		interrupt = servicing_interrupt();
946 	boolean_t	being_processed;
947 #if SQUEUE_DEBUG
948 	conn_t 		*connp = (conn_t *)arg;
949 #endif
950 #if SQUEUE_PROFILE
951 	hrtime_t 	start, delta;
952 #endif
953 
954 	ASSERT(proc != NULL);
955 	ASSERT(sqp != NULL);
956 	ASSERT(mp != NULL);
957 	ASSERT(mp->b_next == NULL);
958 	ASSERT(!IPCL_IS_TCP(connp) || connp->conn_tcp->tcp_connp == connp);
959 	ASSERT(!IPCL_IS_UDP(connp) || connp->conn_udp->udp_connp == connp);
960 	ASSERT(MUTEX_NOT_HELD(&sqp->sq_lock));
961 
962 	mutex_enter(&sqp->sq_lock);
963 
964 	being_processed = (sqp->sq_state & SQS_PROC);
965 	if (!being_processed && (sqp->sq_first == NULL)) {
966 		/*
967 		 * Fast-path, ok to process and nothing queued.
968 		 */
969 		sqp->sq_state |= (SQS_PROC|SQS_FAST);
970 		sqp->sq_run = curthread;
971 		mutex_exit(&sqp->sq_lock);
972 
973 #if SQUEUE_DEBUG
974 		sqp->sq_isintr = interrupt;
975 		sqp->sq_curmp = mp;
976 		sqp->sq_curproc = proc;
977 		sqp->sq_connp = connp;
978 		mp->b_tag = sqp->sq_tag = tag;
979 #endif
980 
981 #if SQUEUE_PROFILE
982 		if (SQ_PROFILING(sqp)) {
983 			if (interrupt)
984 				SQSTAT(sqp, sq_npackets_intr);
985 			else
986 				SQSTAT(sqp, sq_npackets_other);
987 			start = gethrtime();
988 		}
989 #endif
990 
991 		((conn_t *)arg)->conn_on_sqp = B_TRUE;
992 		DTRACE_PROBE3(squeue__proc__start, squeue_t *,
993 		    sqp, mblk_t *, mp, conn_t *, arg);
994 		(*proc)(arg, mp, sqp);
995 		DTRACE_PROBE2(squeue__proc__end, squeue_t *,
996 		    sqp, conn_t *, arg);
997 		((conn_t *)arg)->conn_on_sqp = B_FALSE;
998 
999 #if SQUEUE_DEBUG
1000 		sqp->sq_curmp = NULL;
1001 		sqp->sq_curproc = NULL;
1002 		sqp->sq_connp = NULL;
1003 		sqp->sq_isintr = 0;
1004 #endif
1005 #if SQUEUE_PROFILE
1006 		if (SQ_PROFILING(sqp)) {
1007 			delta = gethrtime() - start;
1008 			if (interrupt)
1009 				SQDELTA(sqp, sq_time_intr, delta);
1010 			else
1011 				SQDELTA(sqp, sq_time_other, delta);
1012 		}
1013 #endif
1014 
1015 		CONN_DEC_REF((conn_t *)arg);
1016 		mutex_enter(&sqp->sq_lock);
1017 		sqp->sq_state &= ~(SQS_PROC|SQS_FAST);
1018 		sqp->sq_run = NULL;
1019 		if (sqp->sq_first == NULL) {
1020 			/*
1021 			 * We processed inline our packet and
1022 			 * nothing new has arrived. We are done.
1023 			 */
1024 			mutex_exit(&sqp->sq_lock);
1025 		} else {
1026 			SQUEUE_WORKER_WAKEUP(sqp);
1027 		}
1028 		return;
1029 	} else {
1030 		/*
1031 		 * We let a thread processing a squeue reenter only
1032 		 * once. This helps the case of incoming connection
1033 		 * where a SYN-ACK-ACK that triggers the conn_ind
1034 		 * doesn't have to queue the packet if listener and
1035 		 * eager are on the same squeue. Also helps the
1036 		 * loopback connection where the two ends are bound
1037 		 * to the same squeue (which is typical on single
1038 		 * CPU machines).
1039 		 * We let the thread reenter only once for the fear
1040 		 * of stack getting blown with multiple traversal.
1041 		 */
1042 		if (being_processed && !(sqp->sq_state & SQS_REENTER) &&
1043 		    (sqp->sq_run == curthread) &&
1044 		    (((conn_t *)arg)->conn_on_sqp == B_FALSE)) {
1045 			sqp->sq_state |= SQS_REENTER;
1046 			mutex_exit(&sqp->sq_lock);
1047 
1048 			((conn_t *)arg)->conn_on_sqp = B_TRUE;
1049 			DTRACE_PROBE3(squeue__proc__start, squeue_t *,
1050 			    sqp, mblk_t *, mp, conn_t *, arg);
1051 			(*proc)(arg, mp, sqp);
1052 			DTRACE_PROBE2(squeue__proc__end, squeue_t *,
1053 			    sqp, conn_t *, arg);
1054 			((conn_t *)arg)->conn_on_sqp = B_FALSE;
1055 			CONN_DEC_REF((conn_t *)arg);
1056 
1057 			mutex_enter(&sqp->sq_lock);
1058 			sqp->sq_state &= ~SQS_REENTER;
1059 			mutex_exit(&sqp->sq_lock);
1060 			return;
1061 		}
1062 
1063 #if SQUEUE_DEBUG
1064 		mp->b_tag = tag;
1065 #endif
1066 #if SQUEUE_PROFILE
1067 		if (SQ_PROFILING(sqp)) {
1068 			if (servicing_interrupt())
1069 				SQSTAT(sqp, sq_nqueued_intr);
1070 			else
1071 				SQSTAT(sqp, sq_nqueued_other);
1072 			if (sqp->sq_stats.sq_max_qlen < sqp->sq_count)
1073 				sqp->sq_stats.sq_max_qlen = sqp->sq_count;
1074 		}
1075 #endif
1076 		ENQUEUE_MP(sqp, mp, proc, arg);
1077 		if (being_processed) {
1078 			/*
1079 			 * Queue is already being processed.
1080 			 * No need to do anything.
1081 			 */
1082 			mutex_exit(&sqp->sq_lock);
1083 			return;
1084 		}
1085 		SQUEUE_WORKER_WAKEUP(sqp);
1086 	}
1087 }
1088 
1089 /*
1090  * squeue_fill() - fill squeue *sqp with mblk *mp with argument of *arg
1091  * without processing the squeue.
1092  */
1093 /* ARGSUSED */
1094 void
1095 squeue_fill(squeue_t *sqp, mblk_t *mp, sqproc_t proc, void * arg,
1096     uint8_t tag)
1097 {
1098 #if SQUEUE_DEBUG
1099 	conn_t *connp = (conn_t *)arg;
1100 #endif
1101 	ASSERT(proc != NULL);
1102 	ASSERT(sqp != NULL);
1103 	ASSERT(mp != NULL);
1104 	ASSERT(mp->b_next == NULL);
1105 	ASSERT(!IPCL_IS_TCP(connp) || connp->conn_tcp->tcp_connp == connp);
1106 	ASSERT(!IPCL_IS_UDP(connp) || connp->conn_udp->udp_connp == connp);
1107 
1108 	ASSERT(MUTEX_NOT_HELD(&sqp->sq_lock));
1109 	mutex_enter(&sqp->sq_lock);
1110 	ENQUEUE_MP(sqp, mp, proc, arg);
1111 #if SQUEUE_DEBUG
1112 	mp->b_tag = tag;
1113 #endif
1114 #if SQUEUE_PROFILE
1115 	if (SQ_PROFILING(sqp)) {
1116 		if (servicing_interrupt())
1117 			SQSTAT(sqp, sq_nqueued_intr);
1118 		else
1119 			SQSTAT(sqp, sq_nqueued_other);
1120 		if (sqp->sq_stats.sq_max_qlen < sqp->sq_count)
1121 			sqp->sq_stats.sq_max_qlen = sqp->sq_count;
1122 	}
1123 #endif
1124 
1125 	/*
1126 	 * If queue is already being processed. No need to do anything.
1127 	 */
1128 	if (sqp->sq_state & SQS_PROC) {
1129 		mutex_exit(&sqp->sq_lock);
1130 		return;
1131 	}
1132 
1133 	SQUEUE_WORKER_WAKEUP(sqp);
1134 }
1135 
1136 
1137 /*
1138  * PRIVATE FUNCTIONS
1139  */
1140 
1141 static void
1142 squeue_fire(void *arg)
1143 {
1144 	squeue_t	*sqp = arg;
1145 	uint_t		state;
1146 
1147 	mutex_enter(&sqp->sq_lock);
1148 
1149 	state = sqp->sq_state;
1150 	if (sqp->sq_tid == 0 && !(state & SQS_TMO_PROG)) {
1151 		mutex_exit(&sqp->sq_lock);
1152 		return;
1153 	}
1154 
1155 	sqp->sq_tid = 0;
1156 	/*
1157 	 * The timeout fired before we got a chance to set it.
1158 	 * Process it anyway but remove the SQS_TMO_PROG so that
1159 	 * the guy trying to set the timeout knows that it has
1160 	 * already been processed.
1161 	 */
1162 	if (state & SQS_TMO_PROG)
1163 		sqp->sq_state &= ~SQS_TMO_PROG;
1164 
1165 	if (!(state & SQS_PROC)) {
1166 		sqp->sq_awaken = lbolt;
1167 		cv_signal(&sqp->sq_async);
1168 	}
1169 	mutex_exit(&sqp->sq_lock);
1170 }
1171 
1172 static void
1173 squeue_drain(squeue_t *sqp, uint_t proc_type, clock_t expire)
1174 {
1175 	mblk_t	*mp;
1176 	mblk_t 	*head;
1177 	sqproc_t proc;
1178 	conn_t	*connp;
1179 	clock_t	start = lbolt;
1180 	clock_t	drain_time;
1181 	timeout_id_t tid;
1182 	uint_t	cnt;
1183 	uint_t	total_cnt = 0;
1184 	ill_rx_ring_t	*sq_rx_ring = sqp->sq_rx_ring;
1185 	int	interrupt = servicing_interrupt();
1186 	boolean_t poll_on = B_FALSE;
1187 
1188 	ASSERT(mutex_owned(&sqp->sq_lock));
1189 	ASSERT(!(sqp->sq_state & SQS_PROC));
1190 
1191 #if SQUEUE_PROFILE
1192 	if (SQ_PROFILING(sqp)) {
1193 		if (interrupt)
1194 			SQSTAT(sqp, sq_ndrains_intr);
1195 		else if (!(proc_type & SQS_WORKER))
1196 			SQSTAT(sqp, sq_ndrains_other);
1197 		else
1198 			SQSTAT(sqp, sq_ndrains_worker);
1199 	}
1200 #endif
1201 
1202 	if ((tid = sqp->sq_tid) != 0)
1203 		sqp->sq_tid = 0;
1204 
1205 	sqp->sq_state |= SQS_PROC | proc_type;
1206 	head = sqp->sq_first;
1207 	sqp->sq_first = NULL;
1208 	sqp->sq_last = NULL;
1209 	cnt = sqp->sq_count;
1210 
1211 	/*
1212 	 * We have backlog built up. Switch to polling mode if the
1213 	 * device underneath allows it. Need to do it only for
1214 	 * drain by non-interrupt thread so interrupts don't
1215 	 * come and disrupt us in between. If its a interrupt thread,
1216 	 * no need because most devices will not issue another
1217 	 * interrupt till this one returns.
1218 	 */
1219 	if ((sqp->sq_state & SQS_POLL_CAPAB) && !(proc_type & SQS_ENTER) &&
1220 		(sqp->sq_count > squeue_worker_poll_min)) {
1221 		ASSERT(sq_rx_ring != NULL);
1222 		SQS_POLLING_ON(sqp, sq_rx_ring);
1223 		poll_on = B_TRUE;
1224 	}
1225 
1226 	mutex_exit(&sqp->sq_lock);
1227 
1228 	if (tid != 0)
1229 		(void) untimeout(tid);
1230 again:
1231 	while ((mp = head) != NULL) {
1232 		head = mp->b_next;
1233 		mp->b_next = NULL;
1234 
1235 		proc = (sqproc_t)mp->b_queue;
1236 		mp->b_queue = NULL;
1237 		connp = (conn_t *)mp->b_prev;
1238 		mp->b_prev = NULL;
1239 #if SQUEUE_DEBUG
1240 		sqp->sq_curmp = mp;
1241 		sqp->sq_curproc = proc;
1242 		sqp->sq_connp = connp;
1243 		sqp->sq_tag = mp->b_tag;
1244 #endif
1245 
1246 #if SQUEUE_PROFILE
1247 		if (SQ_PROFILING(sqp)) {
1248 			if (interrupt)
1249 				SQSTAT(sqp, sq_npackets_intr);
1250 			else if (!(proc_type & SQS_WORKER))
1251 				SQSTAT(sqp, sq_npackets_other);
1252 			else
1253 				SQSTAT(sqp, sq_npackets_worker);
1254 		}
1255 #endif
1256 
1257 		connp->conn_on_sqp = B_TRUE;
1258 		DTRACE_PROBE3(squeue__proc__start, squeue_t *,
1259 		    sqp, mblk_t *, mp, conn_t *, connp);
1260 		(*proc)(connp, mp, sqp);
1261 		DTRACE_PROBE2(squeue__proc__end, squeue_t *,
1262 		    sqp, conn_t *, connp);
1263 		connp->conn_on_sqp = B_FALSE;
1264 		CONN_DEC_REF(connp);
1265 	}
1266 
1267 
1268 #if SQUEUE_DEBUG
1269 	sqp->sq_curmp = NULL;
1270 	sqp->sq_curproc = NULL;
1271 	sqp->sq_connp = NULL;
1272 #endif
1273 
1274 	mutex_enter(&sqp->sq_lock);
1275 	sqp->sq_count -= cnt;
1276 	total_cnt += cnt;
1277 
1278 	if (sqp->sq_first != NULL) {
1279 		if (!expire || (lbolt < expire)) {
1280 			/* More arrived and time not expired */
1281 			head = sqp->sq_first;
1282 			sqp->sq_first = NULL;
1283 			sqp->sq_last = NULL;
1284 			cnt = sqp->sq_count;
1285 			mutex_exit(&sqp->sq_lock);
1286 			goto again;
1287 		}
1288 
1289 		/*
1290 		 * If we are not worker thread and we
1291 		 * reached our time limit to do drain,
1292 		 * signal the worker thread to pick
1293 		 * up the work.
1294 		 * If we were the worker thread, then
1295 		 * we take a break to allow an interrupt
1296 		 * or writer to pick up the load.
1297 		 */
1298 		if (proc_type != SQS_WORKER) {
1299 			sqp->sq_awaken = lbolt;
1300 			cv_signal(&sqp->sq_async);
1301 		}
1302 	}
1303 
1304 	/*
1305 	 * Try to see if we can get a time estimate to process a packet.
1306 	 * Do it only in interrupt context since less chance of context
1307 	 * switch or pinning etc. to get a better estimate.
1308 	 */
1309 	if (interrupt && ((drain_time = (lbolt - start)) > 0))
1310 		sqp->sq_avg_drain_time = ((80 * sqp->sq_avg_drain_time) +
1311 		    (20 * (drv_hztousec(drain_time)/total_cnt)))/100;
1312 
1313 	sqp->sq_state &= ~(SQS_PROC | proc_type);
1314 
1315 	/*
1316 	 * If polling was turned on, turn it off and reduce the default
1317 	 * interrupt blank interval as well to bring new packets in faster
1318 	 * (reduces the latency when there is no backlog).
1319 	 */
1320 	if (poll_on && (sqp->sq_state & SQS_POLL_CAPAB)) {
1321 		ASSERT(sq_rx_ring != NULL);
1322 		SQS_POLLING_OFF(sqp, sq_rx_ring);
1323 	}
1324 }
1325 
1326 static void
1327 squeue_worker(squeue_t *sqp)
1328 {
1329 	kmutex_t *lock = &sqp->sq_lock;
1330 	kcondvar_t *async = &sqp->sq_async;
1331 	callb_cpr_t cprinfo;
1332 #if SQUEUE_PROFILE
1333 	hrtime_t start;
1334 #endif
1335 
1336 	CALLB_CPR_INIT(&cprinfo, lock, callb_generic_cpr, "nca");
1337 	mutex_enter(lock);
1338 
1339 	for (;;) {
1340 		while (sqp->sq_first == NULL || (sqp->sq_state & SQS_PROC)) {
1341 			CALLB_CPR_SAFE_BEGIN(&cprinfo);
1342 still_wait:
1343 			cv_wait(async, lock);
1344 			if (sqp->sq_state & SQS_PROC) {
1345 				goto still_wait;
1346 			}
1347 			CALLB_CPR_SAFE_END(&cprinfo, lock);
1348 		}
1349 
1350 #if SQUEUE_PROFILE
1351 		if (SQ_PROFILING(sqp)) {
1352 			start = gethrtime();
1353 		}
1354 #endif
1355 
1356 		ASSERT(squeue_workerdrain_tick != 0);
1357 		sqp->sq_run = curthread;
1358 		squeue_drain(sqp, SQS_WORKER, lbolt +  squeue_workerdrain_tick);
1359 		sqp->sq_run = NULL;
1360 
1361 		if (sqp->sq_first != NULL) {
1362 			/*
1363 			 * Doing too much processing by worker thread
1364 			 * in presense of interrupts can be sub optimal.
1365 			 * Instead, once a drain is done by worker thread
1366 			 * for squeue_writerdrain_ms (the reason we are
1367 			 * here), we force wait for squeue_workerwait_tick
1368 			 * before doing more processing even if sq_wait is
1369 			 * set to 0.
1370 			 *
1371 			 * This can be counterproductive for performance
1372 			 * if worker thread is the only means to process
1373 			 * the packets (interrupts or writers are not
1374 			 * allowed inside the squeue).
1375 			 */
1376 			if (sqp->sq_tid == 0 &&
1377 			    !(sqp->sq_state & SQS_TMO_PROG)) {
1378 				timeout_id_t	tid;
1379 
1380 				sqp->sq_state |= SQS_TMO_PROG;
1381 				mutex_exit(&sqp->sq_lock);
1382 				tid = timeout(squeue_fire, sqp,
1383 				    squeue_workerwait_tick);
1384 				mutex_enter(&sqp->sq_lock);
1385 				/*
1386 				 * Check again if we still need
1387 				 * the timeout
1388 				 */
1389 				if (((sqp->sq_state & (SQS_TMO_PROG|SQS_PROC))
1390 				    == SQS_TMO_PROG) && (sqp->sq_tid == 0) &&
1391 				    (sqp->sq_first != NULL)) {
1392 					sqp->sq_state &= ~SQS_TMO_PROG;
1393 					sqp->sq_awaken = lbolt;
1394 					sqp->sq_tid = tid;
1395 				} else if (sqp->sq_state & SQS_TMO_PROG) {
1396 					/* timeout not needed */
1397 					sqp->sq_state &= ~SQS_TMO_PROG;
1398 					mutex_exit(&(sqp)->sq_lock);
1399 					(void) untimeout(tid);
1400 					mutex_enter(&sqp->sq_lock);
1401 				}
1402 			}
1403 			CALLB_CPR_SAFE_BEGIN(&cprinfo);
1404 			cv_wait(async, lock);
1405 			CALLB_CPR_SAFE_END(&cprinfo, lock);
1406 		}
1407 
1408 
1409 #if SQUEUE_PROFILE
1410 		if (SQ_PROFILING(sqp)) {
1411 			SQDELTA(sqp, sq_time_worker, gethrtime() - start);
1412 		}
1413 #endif
1414 	}
1415 }
1416 
1417 #if SQUEUE_PROFILE
1418 static int
1419 squeue_kstat_update(kstat_t *ksp, int rw)
1420 {
1421 	struct squeue_kstat *sqsp = &squeue_kstat;
1422 	squeue_t *sqp = ksp->ks_private;
1423 
1424 	if (rw == KSTAT_WRITE)
1425 		return (EACCES);
1426 
1427 #if SQUEUE_DEBUG
1428 	sqsp->sq_count.value.ui64 = sqp->sq_count;
1429 	sqsp->sq_max_qlen.value.ui64 = sqp->sq_stats.sq_max_qlen;
1430 #endif
1431 	sqsp->sq_npackets_worker.value.ui64 = sqp->sq_stats.sq_npackets_worker;
1432 	sqsp->sq_npackets_intr.value.ui64 = sqp->sq_stats.sq_npackets_intr;
1433 	sqsp->sq_npackets_other.value.ui64 = sqp->sq_stats.sq_npackets_other;
1434 	sqsp->sq_nqueued_intr.value.ui64 = sqp->sq_stats.sq_nqueued_intr;
1435 	sqsp->sq_nqueued_other.value.ui64 = sqp->sq_stats.sq_nqueued_other;
1436 	sqsp->sq_ndrains_worker.value.ui64 = sqp->sq_stats.sq_ndrains_worker;
1437 	sqsp->sq_ndrains_intr.value.ui64 = sqp->sq_stats.sq_ndrains_intr;
1438 	sqsp->sq_ndrains_other.value.ui64 = sqp->sq_stats.sq_ndrains_other;
1439 	sqsp->sq_time_worker.value.ui64 = sqp->sq_stats.sq_time_worker;
1440 	sqsp->sq_time_intr.value.ui64 = sqp->sq_stats.sq_time_intr;
1441 	sqsp->sq_time_other.value.ui64 = sqp->sq_stats.sq_time_other;
1442 	return (0);
1443 }
1444 #endif
1445 
1446 void
1447 squeue_profile_enable(squeue_t *sqp)
1448 {
1449 	mutex_enter(&sqp->sq_lock);
1450 	sqp->sq_state |= SQS_PROFILE;
1451 	mutex_exit(&sqp->sq_lock);
1452 }
1453 
1454 void
1455 squeue_profile_disable(squeue_t *sqp)
1456 {
1457 	mutex_enter(&sqp->sq_lock);
1458 	sqp->sq_state &= ~SQS_PROFILE;
1459 	mutex_exit(&sqp->sq_lock);
1460 }
1461 
1462 void
1463 squeue_profile_reset(squeue_t *sqp)
1464 {
1465 #if SQUEUE_PROFILE
1466 	bzero(&sqp->sq_stats, sizeof (sqstat_t));
1467 #endif
1468 }
1469 
1470 void
1471 squeue_profile_start(void)
1472 {
1473 #if SQUEUE_PROFILE
1474 	squeue_profile = B_TRUE;
1475 #endif
1476 }
1477 
1478 void
1479 squeue_profile_stop(void)
1480 {
1481 #if SQUEUE_PROFILE
1482 	squeue_profile = B_FALSE;
1483 #endif
1484 }
1485 
1486 uintptr_t *
1487 squeue_getprivate(squeue_t *sqp, sqprivate_t p)
1488 {
1489 	ASSERT(p < SQPRIVATE_MAX);
1490 
1491 	return (&sqp->sq_private[p]);
1492 }
1493 
1494 processorid_t
1495 squeue_binding(squeue_t *sqp)
1496 {
1497 	return (sqp->sq_bind);
1498 }
1499