xref: /titanic_41/usr/src/uts/common/inet/squeue.c (revision 0b6016e6ff70af39f99c9cc28e0c2207c8f5413c)
1 /*
2  * CDDL HEADER START
3  *
4  * The contents of this file are subject to the terms of the
5  * Common Development and Distribution License, Version 1.0 only
6  * (the "License").  You may not use this file except in compliance
7  * with the License.
8  *
9  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
10  * or http://www.opensolaris.org/os/licensing.
11  * See the License for the specific language governing permissions
12  * and limitations under the License.
13  *
14  * When distributing Covered Code, include this CDDL HEADER in each
15  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
16  * If applicable, add the following below this CDDL HEADER, with the
17  * fields enclosed by brackets "[]" replaced with your own identifying
18  * information: Portions Copyright [yyyy] [name of copyright owner]
19  *
20  * CDDL HEADER END
21  */
22 /*
23  * Copyright 2006 Sun Microsystems, Inc.  All rights reserved.
24  * Use is subject to license terms.
25  */
26 
27 #pragma ident	"%Z%%M%	%I%	%E% SMI"
28 
29 /*
30  * Squeues - TCP/IP serialization mechanism.
31  *
32  * This is a general purpose high-performance serialization mechanism. It is
33  * similar to a taskq with a single worker thread, the difference is that it
34  * does not imply a context switch - the thread placing a request may actually
35  * process it. It is also biased for processing requests in interrupt context.
36  *
37  * Each squeue has a worker thread which may optionally be bound to a CPU.
38  *
39  * Only one thread may process requests from a given squeue at any time. This is
40  * called "entering" squeue.
41  *
42  * Each dispatched request is processed either by
43  *
44  *	a) Dispatching thread or
45  *	b) Some other thread that is currently processing squeue at the time of
46  *		request or
47  *	c) worker thread.
48  *
49  * INTERFACES:
50  *
51  * squeue_t *squeue_create(name, bind, wait, pri)
52  *
53  *	name: symbolic name for squeue.
54  *	wait: time to wait before waiking the worker thread after queueing
55  *		request.
56  *	bind: preferred CPU binding for the worker thread.
57  *	pri:  thread priority for the worker thread.
58  *
59  *   This function never fails and may sleep. It returns a transparent pointer
60  *   to the squeue_t structure that is passed to all other squeue operations.
61  *
62  * void squeue_bind(sqp, bind)
63  *
64  *   Bind squeue worker thread to a CPU specified by the 'bind' argument. The
65  *   'bind' value of -1 binds to the preferred thread specified for
66  *   squeue_create.
67  *
68  *   NOTE: Any value of 'bind' other then -1 is not supported currently, but the
69  *	 API is present - in the future it may be useful to specify different
70  *	 binding.
71  *
72  * void squeue_unbind(sqp)
73  *
74  *   Unbind the worker thread from its preferred CPU.
75  *
76  * void squeue_enter(*sqp, *mp, proc, arg, tag)
77  *
78  *   Post a single request for processing. Each request consists of mblock 'mp',
79  *   function 'proc' to execute and an argument 'arg' to pass to this
80  *   function. The function is called as (*proc)(arg, mp, sqp); The tag is an
81  *   arbitrary number from 0 to 255 which will be stored in mp to track exact
82  *   caller of squeue_enter. The combination of function name and the tag should
83  *   provide enough information to identify the caller.
84  *
85  *   If no one is processing the squeue, squeue_enter() will call the function
86  *   immediately. Otherwise it will add the request to the queue for later
87  *   processing. Once the function is executed, the thread may continue
88  *   executing all other requests pending on the queue.
89  *
90  *   NOTE: The tagging information is only used when SQUEUE_DEBUG is set to 1.
91  *   NOTE: The argument can be conn_t only. Ideally we'd like to have generic
92  *	   argument, but we want to drop connection reference count here - this
93  *	   improves tail-call optimizations.
94  *	   XXX: The arg should have type conn_t.
95  *
96  * void squeue_enter_nodrain(*sqp, *mp, proc, arg, tag)
97  *
98  *   Same as squeue_enter(), but the entering thread will only try to execute a
99  *   single request. It will not continue executing any pending requests.
100  *
101  * void squeue_fill(*sqp, *mp, proc, arg, tag)
102  *
103  *   Just place the request on the queue without trying to execute it. Arrange
104  *   for the worker thread to process the request.
105  *
106  * void squeue_profile_enable(sqp)
107  * void squeue_profile_disable(sqp)
108  *
109  *    Enable or disable profiling for specified 'sqp'. Profiling is only
110  *    available when SQUEUE_PROFILE is set.
111  *
112  * void squeue_profile_reset(sqp)
113  *
114  *    Reset all profiling information to zero. Profiling is only
115  *    available when SQUEUE_PROFILE is set.
116  *
117  * void squeue_profile_start()
118  * void squeue_profile_stop()
119  *
120  *    Globally enable or disabled profiling for all squeues.
121  *
122  * uintptr_t *squeue_getprivate(sqp, p)
123  *
124  *    Each squeue keeps small amount of private data space available for various
125  *    consumers. Current consumers include TCP and NCA. Other consumers need to
126  *    add their private tag to the sqprivate_t enum. The private information is
127  *    limited to an uintptr_t value. The squeue has no knowledge of its content
128  *    and does not manage it in any way.
129  *
130  *    The typical use may be a breakdown of data structures per CPU (since
131  *    squeues are usually per CPU). See NCA for examples of use.
132  *    Currently 'p' may have one legal value SQPRIVATE_TCP.
133  *
134  * processorid_t squeue_binding(sqp)
135  *
136  *    Returns the CPU binding for a given squeue.
137  *
138  * TUNABALES:
139  *
140  * squeue_intrdrain_ms: Maximum time in ms interrupts spend draining any
141  *	squeue. Note that this is approximation - squeues have no control on the
142  *	time it takes to process each request. This limit is only checked
143  *	between processing individual messages.
144  *    Default: 20 ms.
145  *
146  * squeue_writerdrain_ms: Maximum time in ms non-interrupts spend draining any
147  *	squeue. Note that this is approximation - squeues have no control on the
148  *	time it takes to process each request. This limit is only checked
149  *	between processing individual messages.
150  *    Default: 10 ms.
151  *
152  * squeue_workerdrain_ms: Maximum time in ms worker thread spends draining any
153  *	squeue. Note that this is approximation - squeues have no control on the
154  *	time it takes to process each request. This limit is only checked
155  *	between processing individual messages.
156  *    Default: 10 ms.
157  *
158  * squeue_workerwait_ms: When worker thread is interrupted because workerdrain
159  *	expired, how much time to wait before waking worker thread again.
160  *    Default: 10 ms.
161  *
162  * DEFINES:
163  *
164  * SQUEUE_DEBUG: If defined as 1, special code is compiled in which records
165  *	additional information aiding debugging is recorded in squeue.
166  *
167  * SQUEUE_PROFILE: If defined as 1, special code is compiled in which collects
168  *	various squeue statistics and exports them as kstats.
169  *
170  * Ideally we would like both SQUEUE_DEBUG and SQUEUE_PROFILE to be always set,
171  * but it affects performance, so they are enabled on DEBUG kernels and disabled
172  * on non-DEBUG by default.
173  */
174 
175 #include <sys/types.h>
176 #include <sys/cmn_err.h>
177 #include <sys/debug.h>
178 #include <sys/kmem.h>
179 #include <sys/cpuvar.h>
180 #include <sys/condvar_impl.h>
181 #include <sys/systm.h>
182 #include <sys/callb.h>
183 #include <sys/sdt.h>
184 #include <sys/ddi.h>
185 
186 #include <inet/ipclassifier.h>
187 #include <inet/udp_impl.h>
188 
189 /*
190  * State flags.
191  * Note: The MDB IP module depends on the values of these flags.
192  */
193 #define	SQS_PROC	0x0001	/* being processed */
194 #define	SQS_WORKER	0x0002	/* worker thread */
195 #define	SQS_ENTER	0x0004	/* enter thread */
196 #define	SQS_FAST	0x0008	/* enter-fast thread */
197 #define	SQS_USER	0x0010	/* A non interrupt user */
198 #define	SQS_BOUND	0x0020	/* Worker thread is bound */
199 #define	SQS_PROFILE	0x0040	/* Enable profiling */
200 #define	SQS_REENTER	0x0080	/* Re entered thread */
201 #define	SQS_TMO_PROG	0x0100	/* Timeout is being set */
202 
203 #ifdef DEBUG
204 #define	SQUEUE_DEBUG 1
205 #define	SQUEUE_PROFILE 1
206 #else
207 #define	SQUEUE_DEBUG 0
208 #define	SQUEUE_PROFILE 0
209 #endif
210 
211 #include <sys/squeue_impl.h>
212 
213 static void squeue_fire(void *);
214 static void squeue_drain(squeue_t *, uint_t, hrtime_t);
215 static void squeue_worker(squeue_t *sqp);
216 
217 #if SQUEUE_PROFILE
218 static kmutex_t squeue_kstat_lock;
219 static int  squeue_kstat_update(kstat_t *, int);
220 #endif
221 
222 kmem_cache_t *squeue_cache;
223 
224 #define	SQUEUE_MSEC_TO_NSEC 1000000
225 
226 int squeue_intrdrain_ms = 20;
227 int squeue_writerdrain_ms = 10;
228 int squeue_workerdrain_ms = 10;
229 int squeue_workerwait_ms = 10;
230 
231 /* The values above converted to ticks or nano seconds */
232 static int squeue_intrdrain_ns = 0;
233 static int squeue_writerdrain_ns = 0;
234 static int squeue_workerdrain_ns = 0;
235 static int squeue_workerwait_tick = 0;
236 
237 /*
238  * The minimum packet queued when worker thread doing the drain triggers
239  * polling (if squeue allows it). The choice of 3 is arbitrary. You
240  * definitely don't want it to be 1 since that will trigger polling
241  * on very low loads as well (ssh seems to do be one such example
242  * where packet flow was very low yet somehow 1 packet ended up getting
243  * queued and worker thread fires every 10ms and blanking also gets
244  * triggered.
245  */
246 int squeue_worker_poll_min = 3;
247 
248 #if SQUEUE_PROFILE
249 /*
250  * Set to B_TRUE to enable profiling.
251  */
252 static int squeue_profile = B_FALSE;
253 #define	SQ_PROFILING(sqp) (squeue_profile && ((sqp)->sq_state & SQS_PROFILE))
254 
255 #define	SQSTAT(sqp, x) ((sqp)->sq_stats.x++)
256 #define	SQDELTA(sqp, x, d) ((sqp)->sq_stats.x += (d))
257 
258 struct squeue_kstat {
259 	kstat_named_t	sq_count;
260 	kstat_named_t	sq_max_qlen;
261 	kstat_named_t	sq_npackets_worker;
262 	kstat_named_t	sq_npackets_intr;
263 	kstat_named_t	sq_npackets_other;
264 	kstat_named_t	sq_nqueued_intr;
265 	kstat_named_t	sq_nqueued_other;
266 	kstat_named_t	sq_ndrains_worker;
267 	kstat_named_t	sq_ndrains_intr;
268 	kstat_named_t	sq_ndrains_other;
269 	kstat_named_t	sq_time_worker;
270 	kstat_named_t	sq_time_intr;
271 	kstat_named_t	sq_time_other;
272 } squeue_kstat = {
273 	{ "count",		KSTAT_DATA_UINT64 },
274 	{ "max_qlen",		KSTAT_DATA_UINT64 },
275 	{ "packets_worker",	KSTAT_DATA_UINT64 },
276 	{ "packets_intr",	KSTAT_DATA_UINT64 },
277 	{ "packets_other",	KSTAT_DATA_UINT64 },
278 	{ "queued_intr",	KSTAT_DATA_UINT64 },
279 	{ "queued_other",	KSTAT_DATA_UINT64 },
280 	{ "ndrains_worker",	KSTAT_DATA_UINT64 },
281 	{ "ndrains_intr",	KSTAT_DATA_UINT64 },
282 	{ "ndrains_other",	KSTAT_DATA_UINT64 },
283 	{ "time_worker",	KSTAT_DATA_UINT64 },
284 	{ "time_intr",		KSTAT_DATA_UINT64 },
285 	{ "time_other",		KSTAT_DATA_UINT64 },
286 };
287 #endif
288 
289 #define	SQUEUE_WORKER_WAKEUP(sqp) {					\
290 	timeout_id_t tid = (sqp)->sq_tid;				\
291 									\
292 	ASSERT(MUTEX_HELD(&(sqp)->sq_lock));				\
293 	/*								\
294 	 * Queue isn't being processed, so take				\
295 	 * any post enqueue actions needed before leaving.		\
296 	 */								\
297 	if (tid != 0) {							\
298 		/*							\
299 		 * Waiting for an enter() to process mblk(s).		\
300 		 */							\
301 		clock_t	waited = lbolt - (sqp)->sq_awaken;		\
302 									\
303 		if (TICK_TO_MSEC(waited) >= (sqp)->sq_wait) {		\
304 			/*						\
305 			 * Times up and have a worker thread		\
306 			 * waiting for work, so schedule it.		\
307 			 */						\
308 			(sqp)->sq_tid = 0;				\
309 			(sqp)->sq_awaken = lbolt;			\
310 			cv_signal(&(sqp)->sq_async);			\
311 			mutex_exit(&(sqp)->sq_lock);			\
312 			(void) untimeout(tid);				\
313 			return;						\
314 		}							\
315 		mutex_exit(&(sqp)->sq_lock);				\
316 		return;							\
317 	} else if ((sqp)->sq_state & SQS_TMO_PROG) {			\
318 		mutex_exit(&(sqp)->sq_lock);				\
319 		return;							\
320 	} else if ((sqp)->sq_wait != 0) {				\
321 		clock_t	wait = (sqp)->sq_wait;				\
322 		/*							\
323 		 * Wait up to sqp->sq_wait ms for an			\
324 		 * enter() to process this queue. We			\
325 		 * don't want to contend on timeout locks		\
326 		 * with sq_lock held for performance reasons,		\
327 		 * so drop the sq_lock before calling timeout		\
328 		 * but we need to check if timeout is required		\
329 		 * after re acquiring the sq_lock. Once			\
330 		 * the sq_lock is dropped, someone else could		\
331 		 * have processed the packet or the timeout could	\
332 		 * have already fired.					\
333 		 */							\
334 		(sqp)->sq_state |= SQS_TMO_PROG;			\
335 		mutex_exit(&(sqp)->sq_lock);				\
336 		tid = timeout(squeue_fire, (sqp), wait);		\
337 		mutex_enter(&(sqp)->sq_lock);				\
338 		/* Check again if we still need the timeout */		\
339 		if ((((sqp)->sq_state & (SQS_PROC|SQS_TMO_PROG)) ==	\
340 			SQS_TMO_PROG) && ((sqp)->sq_tid == 0) &&	\
341 			((sqp)->sq_first != NULL)) {			\
342 				(sqp)->sq_state &= ~SQS_TMO_PROG;	\
343 				(sqp)->sq_awaken = lbolt;		\
344 				(sqp)->sq_tid = tid;			\
345 				mutex_exit(&(sqp)->sq_lock);		\
346 				return;					\
347 		} else {						\
348 			if ((sqp)->sq_state & SQS_TMO_PROG) {		\
349 				(sqp)->sq_state &= ~SQS_TMO_PROG;	\
350 				mutex_exit(&(sqp)->sq_lock);		\
351 				(void) untimeout(tid);			\
352 			} else {					\
353 				/*					\
354 				 * The timer fired before we could 	\
355 				 * reacquire the sq_lock. squeue_fire	\
356 				 * removes the SQS_TMO_PROG flag	\
357 				 * and we don't need to	do anything	\
358 				 * else.				\
359 				 */					\
360 				mutex_exit(&(sqp)->sq_lock);		\
361 			}						\
362 		}							\
363 	} else {							\
364 		/*							\
365 		 * Schedule the worker thread.				\
366 		 */							\
367 		(sqp)->sq_awaken = lbolt;				\
368 		cv_signal(&(sqp)->sq_async);				\
369 		mutex_exit(&(sqp)->sq_lock);				\
370 	}								\
371 	ASSERT(MUTEX_NOT_HELD(&(sqp)->sq_lock)); 			\
372 }
373 
374 #define	ENQUEUE_MP(sqp, mp, proc, arg) {			\
375 	/*							\
376 	 * Enque our mblk.					\
377 	 */							\
378 	(mp)->b_queue = NULL;					\
379 	ASSERT(MUTEX_HELD(&(sqp)->sq_lock));			\
380 	ASSERT((mp)->b_prev == NULL && (mp)->b_next == NULL); 	\
381 	(mp)->b_queue = (queue_t *)(proc);			\
382 	(mp)->b_prev = (mblk_t *)(arg);				\
383 								\
384 	if ((sqp)->sq_last != NULL)				\
385 		(sqp)->sq_last->b_next = (mp);			\
386 	else							\
387 		(sqp)->sq_first = (mp);				\
388 	(sqp)->sq_last = (mp);					\
389 	(sqp)->sq_count++;					\
390 	ASSERT((sqp)->sq_count > 0);				\
391 	DTRACE_PROBE2(squeue__enqueue, squeue_t *, sqp,		\
392 	    mblk_t *, mp);					\
393 }
394 
395 
396 #define	ENQUEUE_CHAIN(sqp, mp, tail, cnt) {			\
397 	/*							\
398 	 * Enqueue our mblk chain.				\
399 	 */							\
400 	ASSERT(MUTEX_HELD(&(sqp)->sq_lock));			\
401 								\
402 	if ((sqp)->sq_last != NULL)				\
403 		(sqp)->sq_last->b_next = (mp);			\
404 	else							\
405 		(sqp)->sq_first = (mp);				\
406 	(sqp)->sq_last = (tail);				\
407 	(sqp)->sq_count += (cnt);				\
408 	ASSERT((sqp)->sq_count > 0);				\
409 	DTRACE_PROBE4(squeue__enqueuechain, squeue_t *, sqp,	\
410 		mblk_t *, mp, mblk_t *, tail, int, cnt);	\
411 								\
412 }
413 
414 #define	SQS_POLLING_ON(sqp, rx_ring) {				\
415 	ASSERT(rx_ring != NULL);				\
416 	ASSERT(MUTEX_HELD(&(sqp)->sq_lock));			\
417 	rx_ring->rr_blank(rx_ring->rr_handle,			\
418 	    MIN((sqp->sq_avg_drain_time * sqp->sq_count),	\
419 		rx_ring->rr_max_blank_time),			\
420 		rx_ring->rr_max_pkt_cnt);			\
421 	rx_ring->rr_poll_state |= ILL_POLLING;			\
422 	rx_ring->rr_poll_time = lbolt;				\
423 }
424 
425 
426 #define	SQS_POLLING_OFF(sqp, rx_ring) {				\
427 	ASSERT(rx_ring != NULL);				\
428 	ASSERT(MUTEX_HELD(&(sqp)->sq_lock));			\
429 	rx_ring->rr_blank(rx_ring->rr_handle,			\
430 	    rx_ring->rr_min_blank_time,				\
431 	    rx_ring->rr_min_pkt_cnt);				\
432 }
433 
434 void
435 squeue_init(void)
436 {
437 	squeue_cache = kmem_cache_create("squeue_cache",
438 	    sizeof (squeue_t), 64, NULL, NULL, NULL, NULL, NULL, 0);
439 
440 	squeue_intrdrain_ns = squeue_intrdrain_ms * SQUEUE_MSEC_TO_NSEC;
441 	squeue_writerdrain_ns = squeue_writerdrain_ms * SQUEUE_MSEC_TO_NSEC;
442 	squeue_workerdrain_ns = squeue_workerdrain_ms * SQUEUE_MSEC_TO_NSEC;
443 	squeue_workerwait_tick = MSEC_TO_TICK_ROUNDUP(squeue_workerwait_ms);
444 }
445 
446 /* ARGSUSED */
447 squeue_t *
448 squeue_create(char *name, processorid_t bind, clock_t wait, pri_t pri)
449 {
450 	squeue_t *sqp = kmem_cache_alloc(squeue_cache, KM_SLEEP);
451 
452 	bzero(sqp, sizeof (squeue_t));
453 	(void) strncpy(sqp->sq_name, name, SQ_NAMELEN + 1);
454 	sqp->sq_name[SQ_NAMELEN] = '\0';
455 
456 	sqp->sq_bind = bind;
457 	sqp->sq_wait = MSEC_TO_TICK(wait);
458 	sqp->sq_avg_drain_time =
459 	    drv_hztousec(NSEC_TO_TICK_ROUNDUP(squeue_intrdrain_ns)) /
460 	    NSEC_TO_TICK_ROUNDUP(squeue_intrdrain_ns);
461 
462 #if SQUEUE_PROFILE
463 	if ((sqp->sq_kstat = kstat_create("ip", bind, name,
464 		"net", KSTAT_TYPE_NAMED,
465 		sizeof (squeue_kstat) / sizeof (kstat_named_t),
466 		KSTAT_FLAG_VIRTUAL)) != NULL) {
467 		sqp->sq_kstat->ks_lock = &squeue_kstat_lock;
468 		sqp->sq_kstat->ks_data = &squeue_kstat;
469 		sqp->sq_kstat->ks_update = squeue_kstat_update;
470 		sqp->sq_kstat->ks_private = sqp;
471 		kstat_install(sqp->sq_kstat);
472 	}
473 #endif
474 
475 	sqp->sq_worker = thread_create(NULL, 0, squeue_worker,
476 	    sqp, 0, &p0, TS_RUN, pri);
477 
478 	return (sqp);
479 }
480 
481 /* ARGSUSED */
482 void
483 squeue_bind(squeue_t *sqp, processorid_t bind)
484 {
485 	ASSERT(bind == -1);
486 
487 	mutex_enter(&sqp->sq_lock);
488 	if (sqp->sq_state & SQS_BOUND) {
489 		mutex_exit(&sqp->sq_lock);
490 		return;
491 	}
492 
493 	sqp->sq_state |= SQS_BOUND;
494 	mutex_exit(&sqp->sq_lock);
495 
496 	thread_affinity_set(sqp->sq_worker, sqp->sq_bind);
497 }
498 
499 void
500 squeue_unbind(squeue_t *sqp)
501 {
502 	mutex_enter(&sqp->sq_lock);
503 	if (!(sqp->sq_state & SQS_BOUND)) {
504 		mutex_exit(&sqp->sq_lock);
505 		return;
506 	}
507 
508 	sqp->sq_state &= ~SQS_BOUND;
509 	mutex_exit(&sqp->sq_lock);
510 
511 	thread_affinity_clear(sqp->sq_worker);
512 }
513 
514 /*
515  * squeue_enter() - enter squeue sqp with mblk mp (which can be
516  * a chain), while tail points to the end and cnt in number of
517  * mblks in the chain.
518  *
519  * For a chain of single packet (i.e. mp == tail), go through the
520  * fast path if no one is processing the squeue and nothing is queued.
521  *
522  * The proc and arg for each mblk is already stored in the mblk in
523  * appropriate places.
524  */
525 void
526 squeue_enter_chain(squeue_t *sqp, mblk_t *mp, mblk_t *tail,
527     uint32_t cnt, uint8_t tag)
528 {
529 	int		interrupt = servicing_interrupt();
530 	void 		*arg;
531 	sqproc_t	proc;
532 	hrtime_t	now;
533 #if SQUEUE_PROFILE
534 	hrtime_t 	start, delta;
535 #endif
536 
537 	ASSERT(sqp != NULL);
538 	ASSERT(mp != NULL);
539 	ASSERT(tail != NULL);
540 	ASSERT(cnt > 0);
541 	ASSERT(MUTEX_NOT_HELD(&sqp->sq_lock));
542 
543 	mutex_enter(&sqp->sq_lock);
544 	if (!(sqp->sq_state & SQS_PROC)) {
545 		/*
546 		 * See if anything is already queued. If we are the
547 		 * first packet, do inline processing else queue the
548 		 * packet and do the drain.
549 		 */
550 		sqp->sq_run = curthread;
551 		if (sqp->sq_first == NULL && cnt == 1) {
552 			/*
553 			 * Fast-path, ok to process and nothing queued.
554 			 */
555 			sqp->sq_state |= (SQS_PROC|SQS_FAST);
556 			mutex_exit(&sqp->sq_lock);
557 
558 			/*
559 			 * We are the chain of 1 packet so
560 			 * go through this fast path.
561 			 */
562 			arg = mp->b_prev;
563 			mp->b_prev = NULL;
564 			proc = (sqproc_t)mp->b_queue;
565 			mp->b_queue = NULL;
566 
567 			ASSERT(proc != NULL);
568 			ASSERT(arg != NULL);
569 			ASSERT(mp->b_next == NULL);
570 
571 #if SQUEUE_DEBUG
572 			sqp->sq_isintr = interrupt;
573 			sqp->sq_curmp = mp;
574 			sqp->sq_curproc = proc;
575 			sqp->sq_connp = arg;
576 			mp->b_tag = sqp->sq_tag = tag;
577 #endif
578 #if SQUEUE_PROFILE
579 			if (SQ_PROFILING(sqp)) {
580 				if (interrupt)
581 					SQSTAT(sqp, sq_npackets_intr);
582 				else
583 					SQSTAT(sqp, sq_npackets_other);
584 				start = gethrtime();
585 			}
586 #endif
587 			((conn_t *)arg)->conn_on_sqp = B_TRUE;
588 			DTRACE_PROBE3(squeue__proc__start, squeue_t *,
589 			    sqp, mblk_t *, mp, conn_t *, arg);
590 			(*proc)(arg, mp, sqp);
591 			DTRACE_PROBE2(squeue__proc__end, squeue_t *,
592 			    sqp, conn_t *, arg);
593 			((conn_t *)arg)->conn_on_sqp = B_FALSE;
594 
595 #if SQUEUE_PROFILE
596 			if (SQ_PROFILING(sqp)) {
597 				delta = gethrtime() - start;
598 				if (interrupt)
599 					SQDELTA(sqp, sq_time_intr, delta);
600 				else
601 					SQDELTA(sqp, sq_time_other, delta);
602 			}
603 #endif
604 #if SQUEUE_DEBUG
605 			sqp->sq_curmp = NULL;
606 			sqp->sq_curproc = NULL;
607 			sqp->sq_connp = NULL;
608 			sqp->sq_isintr = 0;
609 #endif
610 
611 			CONN_DEC_REF((conn_t *)arg);
612 			ASSERT(MUTEX_NOT_HELD(&sqp->sq_lock));
613 			mutex_enter(&sqp->sq_lock);
614 			sqp->sq_state &= ~(SQS_PROC|SQS_FAST);
615 			if (sqp->sq_first == NULL) {
616 				/*
617 				 * We processed inline our packet and
618 				 * nothing new has arrived. We are done.
619 				 */
620 				sqp->sq_run = NULL;
621 				mutex_exit(&sqp->sq_lock);
622 				return;
623 			} else if (sqp->sq_bind != CPU->cpu_id) {
624 				/*
625 				 * If the current thread is not running
626 				 * on the CPU to which this squeue is bound,
627 				 * then don't allow it to drain.
628 				 */
629 				sqp->sq_run = NULL;
630 				SQUEUE_WORKER_WAKEUP(sqp);
631 				return;
632 			}
633 		} else {
634 			ENQUEUE_CHAIN(sqp, mp, tail, cnt);
635 #if SQUEUE_DEBUG
636 			mp->b_tag = tag;
637 #endif
638 #if SQUEUE_PROFILE
639 			if (SQ_PROFILING(sqp)) {
640 				if (servicing_interrupt())
641 					SQSTAT(sqp, sq_nqueued_intr);
642 				else
643 					SQSTAT(sqp, sq_nqueued_other);
644 				if (sqp->sq_stats.sq_max_qlen < sqp->sq_count)
645 					sqp->sq_stats.sq_max_qlen =
646 					    sqp->sq_count;
647 			}
648 #endif
649 		}
650 
651 		/*
652 		 * We are here because either we couldn't do inline
653 		 * processing (because something was already queued),
654 		 * or we had a chanin of more than one packet,
655 		 * or something else arrived after we were done with
656 		 * inline processing.
657 		 */
658 		ASSERT(MUTEX_HELD(&sqp->sq_lock));
659 		ASSERT(sqp->sq_first != NULL);
660 
661 #if SQUEUE_PROFILE
662 		if (SQ_PROFILING(sqp)) {
663 			start = gethrtime();
664 		}
665 #endif
666 #if SQUEUE_DEBUG
667 		sqp->sq_isintr = interrupt;
668 #endif
669 
670 		now = gethrtime();
671 		if (interrupt) {
672 			squeue_drain(sqp, SQS_ENTER, now +
673 			    squeue_intrdrain_ns);
674 		} else {
675 			squeue_drain(sqp, SQS_USER, now +
676 			    squeue_writerdrain_ns);
677 		}
678 
679 #if SQUEUE_PROFILE
680 		if (SQ_PROFILING(sqp)) {
681 			delta = gethrtime() - start;
682 			if (interrupt)
683 				SQDELTA(sqp, sq_time_intr, delta);
684 			else
685 				SQDELTA(sqp, sq_time_other, delta);
686 		}
687 #endif
688 #if SQUEUE_DEBUG
689 		sqp->sq_isintr = 0;
690 #endif
691 
692 		/*
693 		 * If we didn't do a complete drain, the worker
694 		 * thread was already signalled by squeue_drain.
695 		 */
696 		sqp->sq_run = NULL;
697 		mutex_exit(&sqp->sq_lock);
698 		return;
699 	} else {
700 		ASSERT(sqp->sq_run != NULL);
701 		/*
702 		 * Queue is already being processed. Just enqueue
703 		 * the packet and go away.
704 		 */
705 #if SQUEUE_DEBUG
706 		mp->b_tag = tag;
707 #endif
708 #if SQUEUE_PROFILE
709 		if (SQ_PROFILING(sqp)) {
710 			if (servicing_interrupt())
711 				SQSTAT(sqp, sq_nqueued_intr);
712 			else
713 				SQSTAT(sqp, sq_nqueued_other);
714 			if (sqp->sq_stats.sq_max_qlen < sqp->sq_count)
715 				sqp->sq_stats.sq_max_qlen = sqp->sq_count;
716 		}
717 #endif
718 
719 		ENQUEUE_CHAIN(sqp, mp, tail, cnt);
720 		mutex_exit(&sqp->sq_lock);
721 		return;
722 	}
723 }
724 
725 /*
726  * squeue_enter() - enter squeue *sqp with mblk *mp with argument of *arg.
727  */
728 void
729 squeue_enter(squeue_t *sqp, mblk_t *mp, sqproc_t proc, void *arg,
730     uint8_t tag)
731 {
732 	int	interrupt = servicing_interrupt();
733 	hrtime_t now;
734 #if SQUEUE_PROFILE
735 	hrtime_t start, delta;
736 #endif
737 #if SQUEUE_DEBUG
738 	conn_t 	*connp = (conn_t *)arg;
739 	ASSERT(!IPCL_IS_TCP(connp) || connp->conn_tcp->tcp_connp == connp);
740 	ASSERT(!IPCL_IS_UDP(connp) || connp->conn_udp->udp_connp == connp);
741 #endif
742 
743 	ASSERT(proc != NULL);
744 	ASSERT(sqp != NULL);
745 	ASSERT(mp != NULL);
746 	ASSERT(mp->b_next == NULL);
747 	ASSERT(MUTEX_NOT_HELD(&sqp->sq_lock));
748 
749 	mutex_enter(&sqp->sq_lock);
750 	if (!(sqp->sq_state & SQS_PROC)) {
751 		/*
752 		 * See if anything is already queued. If we are the
753 		 * first packet, do inline processing else queue the
754 		 * packet and do the drain.
755 		 */
756 		sqp->sq_run = curthread;
757 		if (sqp->sq_first == NULL) {
758 			/*
759 			 * Fast-path, ok to process and nothing queued.
760 			 */
761 			sqp->sq_state |= (SQS_PROC|SQS_FAST);
762 			mutex_exit(&sqp->sq_lock);
763 
764 #if SQUEUE_DEBUG
765 			sqp->sq_isintr = interrupt;
766 			sqp->sq_curmp = mp;
767 			sqp->sq_curproc = proc;
768 			sqp->sq_connp = connp;
769 			mp->b_tag = sqp->sq_tag = tag;
770 #endif
771 #if SQUEUE_PROFILE
772 			if (SQ_PROFILING(sqp)) {
773 				if (interrupt)
774 					SQSTAT(sqp, sq_npackets_intr);
775 				else
776 					SQSTAT(sqp, sq_npackets_other);
777 				start = gethrtime();
778 			}
779 #endif
780 			((conn_t *)arg)->conn_on_sqp = B_TRUE;
781 			DTRACE_PROBE3(squeue__proc__start, squeue_t *,
782 			    sqp, mblk_t *, mp, conn_t *, arg);
783 			(*proc)(arg, mp, sqp);
784 			DTRACE_PROBE2(squeue__proc__end, squeue_t *,
785 			    sqp, conn_t *, arg);
786 			((conn_t *)arg)->conn_on_sqp = B_FALSE;
787 
788 #if SQUEUE_PROFILE
789 			if (SQ_PROFILING(sqp)) {
790 				delta = gethrtime() - start;
791 				if (interrupt)
792 					SQDELTA(sqp, sq_time_intr, delta);
793 				else
794 					SQDELTA(sqp, sq_time_other, delta);
795 			}
796 #endif
797 #if SQUEUE_DEBUG
798 			sqp->sq_curmp = NULL;
799 			sqp->sq_curproc = NULL;
800 			sqp->sq_connp = NULL;
801 			sqp->sq_isintr = 0;
802 #endif
803 
804 			CONN_DEC_REF((conn_t *)arg);
805 			ASSERT(MUTEX_NOT_HELD(&sqp->sq_lock));
806 			mutex_enter(&sqp->sq_lock);
807 			sqp->sq_state &= ~(SQS_PROC|SQS_FAST);
808 			if (sqp->sq_first == NULL) {
809 				/*
810 				 * We processed inline our packet and
811 				 * nothing new has arrived. We are done.
812 				 */
813 				sqp->sq_run = NULL;
814 				mutex_exit(&sqp->sq_lock);
815 				return;
816 			} else if (sqp->sq_bind != CPU->cpu_id) {
817 				/*
818 				 * If the current thread is not running
819 				 * on the CPU to which this squeue is bound,
820 				 * then don't allow it to drain.
821 				 */
822 				sqp->sq_run = NULL;
823 				SQUEUE_WORKER_WAKEUP(sqp);
824 				return;
825 			}
826 		} else {
827 			ENQUEUE_MP(sqp, mp, proc, arg);
828 #if SQUEUE_DEBUG
829 			mp->b_tag = tag;
830 #endif
831 #if SQUEUE_PROFILE
832 			if (SQ_PROFILING(sqp)) {
833 				if (servicing_interrupt())
834 					SQSTAT(sqp, sq_nqueued_intr);
835 				else
836 					SQSTAT(sqp, sq_nqueued_other);
837 				if (sqp->sq_stats.sq_max_qlen < sqp->sq_count)
838 					sqp->sq_stats.sq_max_qlen =
839 					    sqp->sq_count;
840 			}
841 #endif
842 		}
843 
844 		/*
845 		 * We are here because either we couldn't do inline
846 		 * processing (because something was already queued)
847 		 * or something else arrived after we were done with
848 		 * inline processing.
849 		 */
850 		ASSERT(MUTEX_HELD(&sqp->sq_lock));
851 		ASSERT(sqp->sq_first != NULL);
852 
853 #if SQUEUE_PROFILE
854 		if (SQ_PROFILING(sqp)) {
855 			start = gethrtime();
856 		}
857 #endif
858 #if SQUEUE_DEBUG
859 		sqp->sq_isintr = interrupt;
860 #endif
861 
862 		now = gethrtime();
863 		if (interrupt) {
864 			squeue_drain(sqp, SQS_ENTER, now +
865 			    squeue_intrdrain_ns);
866 		} else {
867 			squeue_drain(sqp, SQS_USER, now +
868 			    squeue_writerdrain_ns);
869 		}
870 
871 #if SQUEUE_PROFILE
872 		if (SQ_PROFILING(sqp)) {
873 			delta = gethrtime() - start;
874 			if (interrupt)
875 				SQDELTA(sqp, sq_time_intr, delta);
876 			else
877 				SQDELTA(sqp, sq_time_other, delta);
878 		}
879 #endif
880 #if SQUEUE_DEBUG
881 		sqp->sq_isintr = 0;
882 #endif
883 
884 		/*
885 		 * If we didn't do a complete drain, the worker
886 		 * thread was already signalled by squeue_drain.
887 		 */
888 		sqp->sq_run = NULL;
889 		mutex_exit(&sqp->sq_lock);
890 		return;
891 	} else {
892 		ASSERT(sqp->sq_run != NULL);
893 		/*
894 		 * We let a thread processing a squeue reenter only
895 		 * once. This helps the case of incoming connection
896 		 * where a SYN-ACK-ACK that triggers the conn_ind
897 		 * doesn't have to queue the packet if listener and
898 		 * eager are on the same squeue. Also helps the
899 		 * loopback connection where the two ends are bound
900 		 * to the same squeue (which is typical on single
901 		 * CPU machines).
902 		 * We let the thread reenter only once for the fear
903 		 * of stack getting blown with multiple traversal.
904 		 */
905 		if (!(sqp->sq_state & SQS_REENTER) &&
906 		    (sqp->sq_run == curthread) &&
907 		    (((conn_t *)arg)->conn_on_sqp == B_FALSE)) {
908 			sqp->sq_state |= SQS_REENTER;
909 			mutex_exit(&sqp->sq_lock);
910 
911 			((conn_t *)arg)->conn_on_sqp = B_TRUE;
912 			DTRACE_PROBE3(squeue__proc__start, squeue_t *,
913 			    sqp, mblk_t *, mp, conn_t *, arg);
914 			(*proc)(arg, mp, sqp);
915 			DTRACE_PROBE2(squeue__proc__end, squeue_t *,
916 			    sqp, conn_t *, arg);
917 			((conn_t *)arg)->conn_on_sqp = B_FALSE;
918 			CONN_DEC_REF((conn_t *)arg);
919 
920 			mutex_enter(&sqp->sq_lock);
921 			sqp->sq_state &= ~SQS_REENTER;
922 			mutex_exit(&sqp->sq_lock);
923 			return;
924 		}
925 		/*
926 		 * Queue is already being processed. Just enqueue
927 		 * the packet and go away.
928 		 */
929 #if SQUEUE_DEBUG
930 		mp->b_tag = tag;
931 #endif
932 #if SQUEUE_PROFILE
933 		if (SQ_PROFILING(sqp)) {
934 			if (servicing_interrupt())
935 				SQSTAT(sqp, sq_nqueued_intr);
936 			else
937 				SQSTAT(sqp, sq_nqueued_other);
938 			if (sqp->sq_stats.sq_max_qlen < sqp->sq_count)
939 				sqp->sq_stats.sq_max_qlen = sqp->sq_count;
940 		}
941 #endif
942 
943 		ENQUEUE_MP(sqp, mp, proc, arg);
944 		mutex_exit(&sqp->sq_lock);
945 		return;
946 	}
947 }
948 
949 void
950 squeue_enter_nodrain(squeue_t *sqp, mblk_t *mp, sqproc_t proc, void *arg,
951     uint8_t tag)
952 {
953 	int		interrupt = servicing_interrupt();
954 	boolean_t	being_processed;
955 #if SQUEUE_DEBUG
956 	conn_t 		*connp = (conn_t *)arg;
957 #endif
958 #if SQUEUE_PROFILE
959 	hrtime_t 	start, delta;
960 #endif
961 
962 	ASSERT(proc != NULL);
963 	ASSERT(sqp != NULL);
964 	ASSERT(mp != NULL);
965 	ASSERT(mp->b_next == NULL);
966 	ASSERT(!IPCL_IS_TCP(connp) || connp->conn_tcp->tcp_connp == connp);
967 	ASSERT(!IPCL_IS_UDP(connp) || connp->conn_udp->udp_connp == connp);
968 	ASSERT(MUTEX_NOT_HELD(&sqp->sq_lock));
969 
970 	mutex_enter(&sqp->sq_lock);
971 
972 	being_processed = (sqp->sq_state & SQS_PROC);
973 	if (!being_processed && (sqp->sq_first == NULL)) {
974 		/*
975 		 * Fast-path, ok to process and nothing queued.
976 		 */
977 		sqp->sq_state |= (SQS_PROC|SQS_FAST);
978 		sqp->sq_run = curthread;
979 		mutex_exit(&sqp->sq_lock);
980 
981 #if SQUEUE_DEBUG
982 		sqp->sq_isintr = interrupt;
983 		sqp->sq_curmp = mp;
984 		sqp->sq_curproc = proc;
985 		sqp->sq_connp = connp;
986 		mp->b_tag = sqp->sq_tag = tag;
987 #endif
988 
989 #if SQUEUE_PROFILE
990 		if (SQ_PROFILING(sqp)) {
991 			if (interrupt)
992 				SQSTAT(sqp, sq_npackets_intr);
993 			else
994 				SQSTAT(sqp, sq_npackets_other);
995 			start = gethrtime();
996 		}
997 #endif
998 
999 		((conn_t *)arg)->conn_on_sqp = B_TRUE;
1000 		DTRACE_PROBE3(squeue__proc__start, squeue_t *,
1001 		    sqp, mblk_t *, mp, conn_t *, arg);
1002 		(*proc)(arg, mp, sqp);
1003 		DTRACE_PROBE2(squeue__proc__end, squeue_t *,
1004 		    sqp, conn_t *, arg);
1005 		((conn_t *)arg)->conn_on_sqp = B_FALSE;
1006 
1007 #if SQUEUE_DEBUG
1008 		sqp->sq_curmp = NULL;
1009 		sqp->sq_curproc = NULL;
1010 		sqp->sq_connp = NULL;
1011 		sqp->sq_isintr = 0;
1012 #endif
1013 #if SQUEUE_PROFILE
1014 		if (SQ_PROFILING(sqp)) {
1015 			delta = gethrtime() - start;
1016 			if (interrupt)
1017 				SQDELTA(sqp, sq_time_intr, delta);
1018 			else
1019 				SQDELTA(sqp, sq_time_other, delta);
1020 		}
1021 #endif
1022 
1023 		CONN_DEC_REF((conn_t *)arg);
1024 		mutex_enter(&sqp->sq_lock);
1025 		sqp->sq_state &= ~(SQS_PROC|SQS_FAST);
1026 		sqp->sq_run = NULL;
1027 		if (sqp->sq_first == NULL) {
1028 			/*
1029 			 * We processed inline our packet and
1030 			 * nothing new has arrived. We are done.
1031 			 */
1032 			mutex_exit(&sqp->sq_lock);
1033 		} else {
1034 			SQUEUE_WORKER_WAKEUP(sqp);
1035 		}
1036 		return;
1037 	} else {
1038 		/*
1039 		 * We let a thread processing a squeue reenter only
1040 		 * once. This helps the case of incoming connection
1041 		 * where a SYN-ACK-ACK that triggers the conn_ind
1042 		 * doesn't have to queue the packet if listener and
1043 		 * eager are on the same squeue. Also helps the
1044 		 * loopback connection where the two ends are bound
1045 		 * to the same squeue (which is typical on single
1046 		 * CPU machines).
1047 		 * We let the thread reenter only once for the fear
1048 		 * of stack getting blown with multiple traversal.
1049 		 */
1050 		if (being_processed && !(sqp->sq_state & SQS_REENTER) &&
1051 		    (sqp->sq_run == curthread) &&
1052 		    (((conn_t *)arg)->conn_on_sqp == B_FALSE)) {
1053 			sqp->sq_state |= SQS_REENTER;
1054 			mutex_exit(&sqp->sq_lock);
1055 
1056 			((conn_t *)arg)->conn_on_sqp = B_TRUE;
1057 			DTRACE_PROBE3(squeue__proc__start, squeue_t *,
1058 			    sqp, mblk_t *, mp, conn_t *, arg);
1059 			(*proc)(arg, mp, sqp);
1060 			DTRACE_PROBE2(squeue__proc__end, squeue_t *,
1061 			    sqp, conn_t *, arg);
1062 			((conn_t *)arg)->conn_on_sqp = B_FALSE;
1063 			CONN_DEC_REF((conn_t *)arg);
1064 
1065 			mutex_enter(&sqp->sq_lock);
1066 			sqp->sq_state &= ~SQS_REENTER;
1067 			mutex_exit(&sqp->sq_lock);
1068 			return;
1069 		}
1070 
1071 #if SQUEUE_DEBUG
1072 		mp->b_tag = tag;
1073 #endif
1074 #if SQUEUE_PROFILE
1075 		if (SQ_PROFILING(sqp)) {
1076 			if (servicing_interrupt())
1077 				SQSTAT(sqp, sq_nqueued_intr);
1078 			else
1079 				SQSTAT(sqp, sq_nqueued_other);
1080 			if (sqp->sq_stats.sq_max_qlen < sqp->sq_count)
1081 				sqp->sq_stats.sq_max_qlen = sqp->sq_count;
1082 		}
1083 #endif
1084 		ENQUEUE_MP(sqp, mp, proc, arg);
1085 		if (being_processed) {
1086 			/*
1087 			 * Queue is already being processed.
1088 			 * No need to do anything.
1089 			 */
1090 			mutex_exit(&sqp->sq_lock);
1091 			return;
1092 		}
1093 		SQUEUE_WORKER_WAKEUP(sqp);
1094 	}
1095 }
1096 
1097 /*
1098  * squeue_fill() - fill squeue *sqp with mblk *mp with argument of *arg
1099  * without processing the squeue.
1100  */
1101 /* ARGSUSED */
1102 void
1103 squeue_fill(squeue_t *sqp, mblk_t *mp, sqproc_t proc, void * arg,
1104     uint8_t tag)
1105 {
1106 #if SQUEUE_DEBUG
1107 	conn_t *connp = (conn_t *)arg;
1108 #endif
1109 	ASSERT(proc != NULL);
1110 	ASSERT(sqp != NULL);
1111 	ASSERT(mp != NULL);
1112 	ASSERT(mp->b_next == NULL);
1113 	ASSERT(!IPCL_IS_TCP(connp) || connp->conn_tcp->tcp_connp == connp);
1114 	ASSERT(!IPCL_IS_UDP(connp) || connp->conn_udp->udp_connp == connp);
1115 
1116 	ASSERT(MUTEX_NOT_HELD(&sqp->sq_lock));
1117 	mutex_enter(&sqp->sq_lock);
1118 	ENQUEUE_MP(sqp, mp, proc, arg);
1119 #if SQUEUE_DEBUG
1120 	mp->b_tag = tag;
1121 #endif
1122 #if SQUEUE_PROFILE
1123 	if (SQ_PROFILING(sqp)) {
1124 		if (servicing_interrupt())
1125 			SQSTAT(sqp, sq_nqueued_intr);
1126 		else
1127 			SQSTAT(sqp, sq_nqueued_other);
1128 		if (sqp->sq_stats.sq_max_qlen < sqp->sq_count)
1129 			sqp->sq_stats.sq_max_qlen = sqp->sq_count;
1130 	}
1131 #endif
1132 
1133 	/*
1134 	 * If queue is already being processed. No need to do anything.
1135 	 */
1136 	if (sqp->sq_state & SQS_PROC) {
1137 		mutex_exit(&sqp->sq_lock);
1138 		return;
1139 	}
1140 
1141 	SQUEUE_WORKER_WAKEUP(sqp);
1142 }
1143 
1144 
1145 /*
1146  * PRIVATE FUNCTIONS
1147  */
1148 
1149 static void
1150 squeue_fire(void *arg)
1151 {
1152 	squeue_t	*sqp = arg;
1153 	uint_t		state;
1154 
1155 	mutex_enter(&sqp->sq_lock);
1156 
1157 	state = sqp->sq_state;
1158 	if (sqp->sq_tid == 0 && !(state & SQS_TMO_PROG)) {
1159 		mutex_exit(&sqp->sq_lock);
1160 		return;
1161 	}
1162 
1163 	sqp->sq_tid = 0;
1164 	/*
1165 	 * The timeout fired before we got a chance to set it.
1166 	 * Process it anyway but remove the SQS_TMO_PROG so that
1167 	 * the guy trying to set the timeout knows that it has
1168 	 * already been processed.
1169 	 */
1170 	if (state & SQS_TMO_PROG)
1171 		sqp->sq_state &= ~SQS_TMO_PROG;
1172 
1173 	if (!(state & SQS_PROC)) {
1174 		sqp->sq_awaken = lbolt;
1175 		cv_signal(&sqp->sq_async);
1176 	}
1177 	mutex_exit(&sqp->sq_lock);
1178 }
1179 
1180 static void
1181 squeue_drain(squeue_t *sqp, uint_t proc_type, hrtime_t expire)
1182 {
1183 	mblk_t	*mp;
1184 	mblk_t 	*head;
1185 	sqproc_t proc;
1186 	conn_t	*connp;
1187 	clock_t	start = lbolt;
1188 	clock_t	drain_time;
1189 	timeout_id_t tid;
1190 	uint_t	cnt;
1191 	uint_t	total_cnt = 0;
1192 	ill_rx_ring_t	*sq_rx_ring = sqp->sq_rx_ring;
1193 	int	interrupt = servicing_interrupt();
1194 	boolean_t poll_on = B_FALSE;
1195 	hrtime_t now;
1196 
1197 	ASSERT(mutex_owned(&sqp->sq_lock));
1198 	ASSERT(!(sqp->sq_state & SQS_PROC));
1199 
1200 #if SQUEUE_PROFILE
1201 	if (SQ_PROFILING(sqp)) {
1202 		if (interrupt)
1203 			SQSTAT(sqp, sq_ndrains_intr);
1204 		else if (!(proc_type & SQS_WORKER))
1205 			SQSTAT(sqp, sq_ndrains_other);
1206 		else
1207 			SQSTAT(sqp, sq_ndrains_worker);
1208 	}
1209 #endif
1210 
1211 	if ((tid = sqp->sq_tid) != 0)
1212 		sqp->sq_tid = 0;
1213 
1214 	sqp->sq_state |= SQS_PROC | proc_type;
1215 	head = sqp->sq_first;
1216 	sqp->sq_first = NULL;
1217 	sqp->sq_last = NULL;
1218 	cnt = sqp->sq_count;
1219 
1220 	/*
1221 	 * We have backlog built up. Switch to polling mode if the
1222 	 * device underneath allows it. Need to do it only for
1223 	 * drain by non-interrupt thread so interrupts don't
1224 	 * come and disrupt us in between. If its a interrupt thread,
1225 	 * no need because most devices will not issue another
1226 	 * interrupt till this one returns.
1227 	 */
1228 	if ((sqp->sq_state & SQS_POLL_CAPAB) && !(proc_type & SQS_ENTER) &&
1229 		(sqp->sq_count > squeue_worker_poll_min)) {
1230 		ASSERT(sq_rx_ring != NULL);
1231 		SQS_POLLING_ON(sqp, sq_rx_ring);
1232 		poll_on = B_TRUE;
1233 	}
1234 
1235 	mutex_exit(&sqp->sq_lock);
1236 
1237 	if (tid != 0)
1238 		(void) untimeout(tid);
1239 again:
1240 	while ((mp = head) != NULL) {
1241 		head = mp->b_next;
1242 		mp->b_next = NULL;
1243 
1244 		proc = (sqproc_t)mp->b_queue;
1245 		mp->b_queue = NULL;
1246 		connp = (conn_t *)mp->b_prev;
1247 		mp->b_prev = NULL;
1248 #if SQUEUE_DEBUG
1249 		sqp->sq_curmp = mp;
1250 		sqp->sq_curproc = proc;
1251 		sqp->sq_connp = connp;
1252 		sqp->sq_tag = mp->b_tag;
1253 #endif
1254 
1255 #if SQUEUE_PROFILE
1256 		if (SQ_PROFILING(sqp)) {
1257 			if (interrupt)
1258 				SQSTAT(sqp, sq_npackets_intr);
1259 			else if (!(proc_type & SQS_WORKER))
1260 				SQSTAT(sqp, sq_npackets_other);
1261 			else
1262 				SQSTAT(sqp, sq_npackets_worker);
1263 		}
1264 #endif
1265 
1266 		connp->conn_on_sqp = B_TRUE;
1267 		DTRACE_PROBE3(squeue__proc__start, squeue_t *,
1268 		    sqp, mblk_t *, mp, conn_t *, connp);
1269 		(*proc)(connp, mp, sqp);
1270 		DTRACE_PROBE2(squeue__proc__end, squeue_t *,
1271 		    sqp, conn_t *, connp);
1272 		connp->conn_on_sqp = B_FALSE;
1273 		CONN_DEC_REF(connp);
1274 	}
1275 
1276 
1277 #if SQUEUE_DEBUG
1278 	sqp->sq_curmp = NULL;
1279 	sqp->sq_curproc = NULL;
1280 	sqp->sq_connp = NULL;
1281 #endif
1282 
1283 	mutex_enter(&sqp->sq_lock);
1284 	sqp->sq_count -= cnt;
1285 	total_cnt += cnt;
1286 
1287 	if (sqp->sq_first != NULL) {
1288 
1289 		now = gethrtime();
1290 		if (!expire || (now < expire)) {
1291 			/* More arrived and time not expired */
1292 			head = sqp->sq_first;
1293 			sqp->sq_first = NULL;
1294 			sqp->sq_last = NULL;
1295 			cnt = sqp->sq_count;
1296 			mutex_exit(&sqp->sq_lock);
1297 			goto again;
1298 		}
1299 
1300 		/*
1301 		 * If we are not worker thread and we
1302 		 * reached our time limit to do drain,
1303 		 * signal the worker thread to pick
1304 		 * up the work.
1305 		 * If we were the worker thread, then
1306 		 * we take a break to allow an interrupt
1307 		 * or writer to pick up the load.
1308 		 */
1309 		if (proc_type != SQS_WORKER) {
1310 			sqp->sq_awaken = lbolt;
1311 			cv_signal(&sqp->sq_async);
1312 		}
1313 	}
1314 
1315 	/*
1316 	 * Try to see if we can get a time estimate to process a packet.
1317 	 * Do it only in interrupt context since less chance of context
1318 	 * switch or pinning etc. to get a better estimate.
1319 	 */
1320 	if (interrupt && ((drain_time = (lbolt - start)) > 0))
1321 		sqp->sq_avg_drain_time = ((80 * sqp->sq_avg_drain_time) +
1322 		    (20 * (drv_hztousec(drain_time)/total_cnt)))/100;
1323 
1324 	sqp->sq_state &= ~(SQS_PROC | proc_type);
1325 
1326 	/*
1327 	 * If polling was turned on, turn it off and reduce the default
1328 	 * interrupt blank interval as well to bring new packets in faster
1329 	 * (reduces the latency when there is no backlog).
1330 	 */
1331 	if (poll_on && (sqp->sq_state & SQS_POLL_CAPAB)) {
1332 		ASSERT(sq_rx_ring != NULL);
1333 		SQS_POLLING_OFF(sqp, sq_rx_ring);
1334 	}
1335 }
1336 
1337 static void
1338 squeue_worker(squeue_t *sqp)
1339 {
1340 	kmutex_t *lock = &sqp->sq_lock;
1341 	kcondvar_t *async = &sqp->sq_async;
1342 	callb_cpr_t cprinfo;
1343 	hrtime_t now;
1344 #if SQUEUE_PROFILE
1345 	hrtime_t start;
1346 #endif
1347 
1348 	CALLB_CPR_INIT(&cprinfo, lock, callb_generic_cpr, "nca");
1349 	mutex_enter(lock);
1350 
1351 	for (;;) {
1352 		while (sqp->sq_first == NULL || (sqp->sq_state & SQS_PROC)) {
1353 			CALLB_CPR_SAFE_BEGIN(&cprinfo);
1354 still_wait:
1355 			cv_wait(async, lock);
1356 			if (sqp->sq_state & SQS_PROC) {
1357 				goto still_wait;
1358 			}
1359 			CALLB_CPR_SAFE_END(&cprinfo, lock);
1360 		}
1361 
1362 #if SQUEUE_PROFILE
1363 		if (SQ_PROFILING(sqp)) {
1364 			start = gethrtime();
1365 		}
1366 #endif
1367 
1368 		ASSERT(squeue_workerdrain_ns != 0);
1369 		now = gethrtime();
1370 		sqp->sq_run = curthread;
1371 		squeue_drain(sqp, SQS_WORKER, now +  squeue_workerdrain_ns);
1372 		sqp->sq_run = NULL;
1373 
1374 		if (sqp->sq_first != NULL) {
1375 			/*
1376 			 * Doing too much processing by worker thread
1377 			 * in presense of interrupts can be sub optimal.
1378 			 * Instead, once a drain is done by worker thread
1379 			 * for squeue_writerdrain_ns (the reason we are
1380 			 * here), we force wait for squeue_workerwait_tick
1381 			 * before doing more processing even if sq_wait is
1382 			 * set to 0.
1383 			 *
1384 			 * This can be counterproductive for performance
1385 			 * if worker thread is the only means to process
1386 			 * the packets (interrupts or writers are not
1387 			 * allowed inside the squeue).
1388 			 */
1389 			if (sqp->sq_tid == 0 &&
1390 			    !(sqp->sq_state & SQS_TMO_PROG)) {
1391 				timeout_id_t	tid;
1392 
1393 				sqp->sq_state |= SQS_TMO_PROG;
1394 				mutex_exit(&sqp->sq_lock);
1395 				tid = timeout(squeue_fire, sqp,
1396 				    squeue_workerwait_tick);
1397 				mutex_enter(&sqp->sq_lock);
1398 				/*
1399 				 * Check again if we still need
1400 				 * the timeout
1401 				 */
1402 				if (((sqp->sq_state & (SQS_TMO_PROG|SQS_PROC))
1403 				    == SQS_TMO_PROG) && (sqp->sq_tid == 0) &&
1404 				    (sqp->sq_first != NULL)) {
1405 					sqp->sq_state &= ~SQS_TMO_PROG;
1406 					sqp->sq_awaken = lbolt;
1407 					sqp->sq_tid = tid;
1408 				} else if (sqp->sq_state & SQS_TMO_PROG) {
1409 					/* timeout not needed */
1410 					sqp->sq_state &= ~SQS_TMO_PROG;
1411 					mutex_exit(&(sqp)->sq_lock);
1412 					(void) untimeout(tid);
1413 					mutex_enter(&sqp->sq_lock);
1414 				}
1415 			}
1416 			CALLB_CPR_SAFE_BEGIN(&cprinfo);
1417 			cv_wait(async, lock);
1418 			CALLB_CPR_SAFE_END(&cprinfo, lock);
1419 		}
1420 
1421 
1422 #if SQUEUE_PROFILE
1423 		if (SQ_PROFILING(sqp)) {
1424 			SQDELTA(sqp, sq_time_worker, gethrtime() - start);
1425 		}
1426 #endif
1427 	}
1428 }
1429 
1430 #if SQUEUE_PROFILE
1431 static int
1432 squeue_kstat_update(kstat_t *ksp, int rw)
1433 {
1434 	struct squeue_kstat *sqsp = &squeue_kstat;
1435 	squeue_t *sqp = ksp->ks_private;
1436 
1437 	if (rw == KSTAT_WRITE)
1438 		return (EACCES);
1439 
1440 #if SQUEUE_DEBUG
1441 	sqsp->sq_count.value.ui64 = sqp->sq_count;
1442 	sqsp->sq_max_qlen.value.ui64 = sqp->sq_stats.sq_max_qlen;
1443 #endif
1444 	sqsp->sq_npackets_worker.value.ui64 = sqp->sq_stats.sq_npackets_worker;
1445 	sqsp->sq_npackets_intr.value.ui64 = sqp->sq_stats.sq_npackets_intr;
1446 	sqsp->sq_npackets_other.value.ui64 = sqp->sq_stats.sq_npackets_other;
1447 	sqsp->sq_nqueued_intr.value.ui64 = sqp->sq_stats.sq_nqueued_intr;
1448 	sqsp->sq_nqueued_other.value.ui64 = sqp->sq_stats.sq_nqueued_other;
1449 	sqsp->sq_ndrains_worker.value.ui64 = sqp->sq_stats.sq_ndrains_worker;
1450 	sqsp->sq_ndrains_intr.value.ui64 = sqp->sq_stats.sq_ndrains_intr;
1451 	sqsp->sq_ndrains_other.value.ui64 = sqp->sq_stats.sq_ndrains_other;
1452 	sqsp->sq_time_worker.value.ui64 = sqp->sq_stats.sq_time_worker;
1453 	sqsp->sq_time_intr.value.ui64 = sqp->sq_stats.sq_time_intr;
1454 	sqsp->sq_time_other.value.ui64 = sqp->sq_stats.sq_time_other;
1455 	return (0);
1456 }
1457 #endif
1458 
1459 void
1460 squeue_profile_enable(squeue_t *sqp)
1461 {
1462 	mutex_enter(&sqp->sq_lock);
1463 	sqp->sq_state |= SQS_PROFILE;
1464 	mutex_exit(&sqp->sq_lock);
1465 }
1466 
1467 void
1468 squeue_profile_disable(squeue_t *sqp)
1469 {
1470 	mutex_enter(&sqp->sq_lock);
1471 	sqp->sq_state &= ~SQS_PROFILE;
1472 	mutex_exit(&sqp->sq_lock);
1473 }
1474 
1475 void
1476 squeue_profile_reset(squeue_t *sqp)
1477 {
1478 #if SQUEUE_PROFILE
1479 	bzero(&sqp->sq_stats, sizeof (sqstat_t));
1480 #endif
1481 }
1482 
1483 void
1484 squeue_profile_start(void)
1485 {
1486 #if SQUEUE_PROFILE
1487 	squeue_profile = B_TRUE;
1488 #endif
1489 }
1490 
1491 void
1492 squeue_profile_stop(void)
1493 {
1494 #if SQUEUE_PROFILE
1495 	squeue_profile = B_FALSE;
1496 #endif
1497 }
1498 
1499 uintptr_t *
1500 squeue_getprivate(squeue_t *sqp, sqprivate_t p)
1501 {
1502 	ASSERT(p < SQPRIVATE_MAX);
1503 
1504 	return (&sqp->sq_private[p]);
1505 }
1506 
1507 processorid_t
1508 squeue_binding(squeue_t *sqp)
1509 {
1510 	return (sqp->sq_bind);
1511 }
1512