xref: /titanic_51/usr/src/uts/common/io/stream.c (revision 3c112a2b34403220c06c3e2fcac403358cfba168)
1 /*
2  * CDDL HEADER START
3  *
4  * The contents of this file are subject to the terms of the
5  * Common Development and Distribution License (the "License").
6  * You may not use this file except in compliance with the License.
7  *
8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9  * or http://www.opensolaris.org/os/licensing.
10  * See the License for the specific language governing permissions
11  * and limitations under the License.
12  *
13  * When distributing Covered Code, include this CDDL HEADER in each
14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15  * If applicable, add the following below this CDDL HEADER, with the
16  * fields enclosed by brackets "[]" replaced with your own identifying
17  * information: Portions Copyright [yyyy] [name of copyright owner]
18  *
19  * CDDL HEADER END
20  */
21 /*	Copyright (c) 1984, 1986, 1987, 1988, 1989 AT&T	*/
22 /*	  All Rights Reserved  	*/
23 
24 /*
25  * Copyright 2009 Sun Microsystems, Inc.  All rights reserved.
26  * Use is subject to license terms.
27  */
28 
29 #include <sys/types.h>
30 #include <sys/param.h>
31 #include <sys/thread.h>
32 #include <sys/sysmacros.h>
33 #include <sys/stropts.h>
34 #include <sys/stream.h>
35 #include <sys/strsubr.h>
36 #include <sys/strsun.h>
37 #include <sys/conf.h>
38 #include <sys/debug.h>
39 #include <sys/cmn_err.h>
40 #include <sys/kmem.h>
41 #include <sys/atomic.h>
42 #include <sys/errno.h>
43 #include <sys/vtrace.h>
44 #include <sys/ftrace.h>
45 #include <sys/ontrap.h>
46 #include <sys/multidata.h>
47 #include <sys/multidata_impl.h>
48 #include <sys/sdt.h>
49 #include <sys/strft.h>
50 
51 #ifdef DEBUG
52 #include <sys/kmem_impl.h>
53 #endif
54 
55 /*
56  * This file contains all the STREAMS utility routines that may
57  * be used by modules and drivers.
58  */
59 
60 /*
61  * STREAMS message allocator: principles of operation
62  *
63  * The streams message allocator consists of all the routines that
64  * allocate, dup and free streams messages: allocb(), [d]esballoc[a],
65  * dupb(), freeb() and freemsg().  What follows is a high-level view
66  * of how the allocator works.
67  *
68  * Every streams message consists of one or more mblks, a dblk, and data.
69  * All mblks for all types of messages come from a common mblk_cache.
70  * The dblk and data come in several flavors, depending on how the
71  * message is allocated:
72  *
73  * (1) mblks up to DBLK_MAX_CACHE size are allocated from a collection of
74  *     fixed-size dblk/data caches. For message sizes that are multiples of
75  *     PAGESIZE, dblks are allocated separately from the buffer.
76  *     The associated buffer is allocated by the constructor using kmem_alloc().
77  *     For all other message sizes, dblk and its associated data is allocated
78  *     as a single contiguous chunk of memory.
79  *     Objects in these caches consist of a dblk plus its associated data.
80  *     allocb() determines the nearest-size cache by table lookup:
81  *     the dblk_cache[] array provides the mapping from size to dblk cache.
82  *
83  * (2) Large messages (size > DBLK_MAX_CACHE) are constructed by
84  *     kmem_alloc()'ing a buffer for the data and supplying that
85  *     buffer to gesballoc(), described below.
86  *
87  * (3) The four flavors of [d]esballoc[a] are all implemented by a
88  *     common routine, gesballoc() ("generic esballoc").  gesballoc()
89  *     allocates a dblk from the global dblk_esb_cache and sets db_base,
90  *     db_lim and db_frtnp to describe the caller-supplied buffer.
91  *
92  * While there are several routines to allocate messages, there is only
93  * one routine to free messages: freeb().  freeb() simply invokes the
94  * dblk's free method, dbp->db_free(), which is set at allocation time.
95  *
96  * dupb() creates a new reference to a message by allocating a new mblk,
97  * incrementing the dblk reference count and setting the dblk's free
98  * method to dblk_decref().  The dblk's original free method is retained
99  * in db_lastfree.  dblk_decref() decrements the reference count on each
100  * freeb().  If this is not the last reference it just frees the mblk;
101  * if this *is* the last reference, it restores db_free to db_lastfree,
102  * sets db_mblk to the current mblk (see below), and invokes db_lastfree.
103  *
104  * The implementation makes aggressive use of kmem object caching for
105  * maximum performance.  This makes the code simple and compact, but
106  * also a bit abstruse in some places.  The invariants that constitute a
107  * message's constructed state, described below, are more subtle than usual.
108  *
109  * Every dblk has an "attached mblk" as part of its constructed state.
110  * The mblk is allocated by the dblk's constructor and remains attached
111  * until the message is either dup'ed or pulled up.  In the dupb() case
112  * the mblk association doesn't matter until the last free, at which time
113  * dblk_decref() attaches the last mblk to the dblk.  pullupmsg() affects
114  * the mblk association because it swaps the leading mblks of two messages,
115  * so it is responsible for swapping their db_mblk pointers accordingly.
116  * From a constructed-state viewpoint it doesn't matter that a dblk's
117  * attached mblk can change while the message is allocated; all that
118  * matters is that the dblk has *some* attached mblk when it's freed.
119  *
120  * The sizes of the allocb() small-message caches are not magical.
121  * They represent a good trade-off between internal and external
122  * fragmentation for current workloads.  They should be reevaluated
123  * periodically, especially if allocations larger than DBLK_MAX_CACHE
124  * become common.  We use 64-byte alignment so that dblks don't
125  * straddle cache lines unnecessarily.
126  */
127 #define	DBLK_MAX_CACHE		73728
128 #define	DBLK_CACHE_ALIGN	64
129 #define	DBLK_MIN_SIZE		8
130 #define	DBLK_SIZE_SHIFT		3
131 
132 #ifdef _BIG_ENDIAN
133 #define	DBLK_RTFU_SHIFT(field)	\
134 	(8 * (&((dblk_t *)0)->db_struioflag - &((dblk_t *)0)->field))
135 #else
136 #define	DBLK_RTFU_SHIFT(field)	\
137 	(8 * (&((dblk_t *)0)->field - &((dblk_t *)0)->db_ref))
138 #endif
139 
140 #define	DBLK_RTFU(ref, type, flags, uioflag)	\
141 	(((ref) << DBLK_RTFU_SHIFT(db_ref)) | \
142 	((type) << DBLK_RTFU_SHIFT(db_type)) | \
143 	(((flags) | (ref - 1)) << DBLK_RTFU_SHIFT(db_flags)) | \
144 	((uioflag) << DBLK_RTFU_SHIFT(db_struioflag)))
145 #define	DBLK_RTFU_REF_MASK	(DBLK_REFMAX << DBLK_RTFU_SHIFT(db_ref))
146 #define	DBLK_RTFU_WORD(dbp)	(*((uint32_t *)&(dbp)->db_ref))
147 #define	MBLK_BAND_FLAG_WORD(mp)	(*((uint32_t *)&(mp)->b_band))
148 
149 static size_t dblk_sizes[] = {
150 #ifdef _LP64
151 	16, 80, 144, 208, 272, 336, 528, 1040, 1488, 1936, 2576, 3856,
152 	8192, 12048, 16384, 20240, 24576, 28432, 32768, 36624,
153 	40960, 44816, 49152, 53008, 57344, 61200, 65536, 69392,
154 #else
155 	64, 128, 320, 576, 1088, 1536, 1984, 2624, 3904,
156 	8192, 12096, 16384, 20288, 24576, 28480, 32768, 36672,
157 	40960, 44864, 49152, 53056, 57344, 61248, 65536, 69440,
158 #endif
159 	DBLK_MAX_CACHE, 0
160 };
161 
162 static struct kmem_cache *dblk_cache[DBLK_MAX_CACHE / DBLK_MIN_SIZE];
163 static struct kmem_cache *mblk_cache;
164 static struct kmem_cache *dblk_esb_cache;
165 static struct kmem_cache *fthdr_cache;
166 static struct kmem_cache *ftblk_cache;
167 
168 static void dblk_lastfree(mblk_t *mp, dblk_t *dbp);
169 static mblk_t *allocb_oversize(size_t size, int flags);
170 static int allocb_tryhard_fails;
171 static void frnop_func(void *arg);
172 frtn_t frnop = { frnop_func };
173 static void bcache_dblk_lastfree(mblk_t *mp, dblk_t *dbp);
174 
175 static boolean_t rwnext_enter(queue_t *qp);
176 static void rwnext_exit(queue_t *qp);
177 
178 /*
179  * Patchable mblk/dblk kmem_cache flags.
180  */
181 int dblk_kmem_flags = 0;
182 int mblk_kmem_flags = 0;
183 
184 static int
185 dblk_constructor(void *buf, void *cdrarg, int kmflags)
186 {
187 	dblk_t *dbp = buf;
188 	ssize_t msg_size = (ssize_t)cdrarg;
189 	size_t index;
190 
191 	ASSERT(msg_size != 0);
192 
193 	index = (msg_size - 1) >> DBLK_SIZE_SHIFT;
194 
195 	ASSERT(index < (DBLK_MAX_CACHE >> DBLK_SIZE_SHIFT));
196 
197 	if ((dbp->db_mblk = kmem_cache_alloc(mblk_cache, kmflags)) == NULL)
198 		return (-1);
199 	if ((msg_size & PAGEOFFSET) == 0) {
200 		dbp->db_base = kmem_alloc(msg_size, kmflags);
201 		if (dbp->db_base == NULL) {
202 			kmem_cache_free(mblk_cache, dbp->db_mblk);
203 			return (-1);
204 		}
205 	} else {
206 		dbp->db_base = (unsigned char *)&dbp[1];
207 	}
208 
209 	dbp->db_mblk->b_datap = dbp;
210 	dbp->db_cache = dblk_cache[index];
211 	dbp->db_lim = dbp->db_base + msg_size;
212 	dbp->db_free = dbp->db_lastfree = dblk_lastfree;
213 	dbp->db_frtnp = NULL;
214 	dbp->db_fthdr = NULL;
215 	dbp->db_credp = NULL;
216 	dbp->db_cpid = -1;
217 	dbp->db_struioflag = 0;
218 	dbp->db_struioun.cksum.flags = 0;
219 	return (0);
220 }
221 
222 /*ARGSUSED*/
223 static int
224 dblk_esb_constructor(void *buf, void *cdrarg, int kmflags)
225 {
226 	dblk_t *dbp = buf;
227 
228 	if ((dbp->db_mblk = kmem_cache_alloc(mblk_cache, kmflags)) == NULL)
229 		return (-1);
230 	dbp->db_mblk->b_datap = dbp;
231 	dbp->db_cache = dblk_esb_cache;
232 	dbp->db_fthdr = NULL;
233 	dbp->db_credp = NULL;
234 	dbp->db_cpid = -1;
235 	dbp->db_struioflag = 0;
236 	dbp->db_struioun.cksum.flags = 0;
237 	return (0);
238 }
239 
240 static int
241 bcache_dblk_constructor(void *buf, void *cdrarg, int kmflags)
242 {
243 	dblk_t *dbp = buf;
244 	bcache_t *bcp = cdrarg;
245 
246 	if ((dbp->db_mblk = kmem_cache_alloc(mblk_cache, kmflags)) == NULL)
247 		return (-1);
248 
249 	dbp->db_base = kmem_cache_alloc(bcp->buffer_cache, kmflags);
250 	if (dbp->db_base == NULL) {
251 		kmem_cache_free(mblk_cache, dbp->db_mblk);
252 		return (-1);
253 	}
254 
255 	dbp->db_mblk->b_datap = dbp;
256 	dbp->db_cache = (void *)bcp;
257 	dbp->db_lim = dbp->db_base + bcp->size;
258 	dbp->db_free = dbp->db_lastfree = bcache_dblk_lastfree;
259 	dbp->db_frtnp = NULL;
260 	dbp->db_fthdr = NULL;
261 	dbp->db_credp = NULL;
262 	dbp->db_cpid = -1;
263 	dbp->db_struioflag = 0;
264 	dbp->db_struioun.cksum.flags = 0;
265 	return (0);
266 }
267 
268 /*ARGSUSED*/
269 static void
270 dblk_destructor(void *buf, void *cdrarg)
271 {
272 	dblk_t *dbp = buf;
273 	ssize_t msg_size = (ssize_t)cdrarg;
274 
275 	ASSERT(dbp->db_mblk->b_datap == dbp);
276 	ASSERT(msg_size != 0);
277 	ASSERT(dbp->db_struioflag == 0);
278 	ASSERT(dbp->db_struioun.cksum.flags == 0);
279 
280 	if ((msg_size & PAGEOFFSET) == 0) {
281 		kmem_free(dbp->db_base, msg_size);
282 	}
283 
284 	kmem_cache_free(mblk_cache, dbp->db_mblk);
285 }
286 
287 static void
288 bcache_dblk_destructor(void *buf, void *cdrarg)
289 {
290 	dblk_t *dbp = buf;
291 	bcache_t *bcp = cdrarg;
292 
293 	kmem_cache_free(bcp->buffer_cache, dbp->db_base);
294 
295 	ASSERT(dbp->db_mblk->b_datap == dbp);
296 	ASSERT(dbp->db_struioflag == 0);
297 	ASSERT(dbp->db_struioun.cksum.flags == 0);
298 
299 	kmem_cache_free(mblk_cache, dbp->db_mblk);
300 }
301 
302 /* ARGSUSED */
303 static int
304 ftblk_constructor(void *buf, void *cdrarg, int kmflags)
305 {
306 	ftblk_t *fbp = buf;
307 	int i;
308 
309 	bzero(fbp, sizeof (ftblk_t));
310 	if (str_ftstack != 0) {
311 		for (i = 0; i < FTBLK_EVNTS; i++)
312 			fbp->ev[i].stk = kmem_alloc(sizeof (ftstk_t), kmflags);
313 	}
314 
315 	return (0);
316 }
317 
318 /* ARGSUSED */
319 static void
320 ftblk_destructor(void *buf, void *cdrarg)
321 {
322 	ftblk_t *fbp = buf;
323 	int i;
324 
325 	if (str_ftstack != 0) {
326 		for (i = 0; i < FTBLK_EVNTS; i++) {
327 			if (fbp->ev[i].stk != NULL) {
328 				kmem_free(fbp->ev[i].stk, sizeof (ftstk_t));
329 				fbp->ev[i].stk = NULL;
330 			}
331 		}
332 	}
333 }
334 
335 static int
336 fthdr_constructor(void *buf, void *cdrarg, int kmflags)
337 {
338 	fthdr_t *fhp = buf;
339 
340 	return (ftblk_constructor(&fhp->first, cdrarg, kmflags));
341 }
342 
343 static void
344 fthdr_destructor(void *buf, void *cdrarg)
345 {
346 	fthdr_t *fhp = buf;
347 
348 	ftblk_destructor(&fhp->first, cdrarg);
349 }
350 
351 void
352 streams_msg_init(void)
353 {
354 	char name[40];
355 	size_t size;
356 	size_t lastsize = DBLK_MIN_SIZE;
357 	size_t *sizep;
358 	struct kmem_cache *cp;
359 	size_t tot_size;
360 	int offset;
361 
362 	mblk_cache = kmem_cache_create("streams_mblk", sizeof (mblk_t), 32,
363 	    NULL, NULL, NULL, NULL, NULL, mblk_kmem_flags);
364 
365 	for (sizep = dblk_sizes; (size = *sizep) != 0; sizep++) {
366 
367 		if ((offset = (size & PAGEOFFSET)) != 0) {
368 			/*
369 			 * We are in the middle of a page, dblk should
370 			 * be allocated on the same page
371 			 */
372 			tot_size = size + sizeof (dblk_t);
373 			ASSERT((offset + sizeof (dblk_t) + sizeof (kmem_slab_t))
374 			    < PAGESIZE);
375 			ASSERT((tot_size & (DBLK_CACHE_ALIGN - 1)) == 0);
376 
377 		} else {
378 
379 			/*
380 			 * buf size is multiple of page size, dblk and
381 			 * buffer are allocated separately.
382 			 */
383 
384 			ASSERT((size & (DBLK_CACHE_ALIGN - 1)) == 0);
385 			tot_size = sizeof (dblk_t);
386 		}
387 
388 		(void) sprintf(name, "streams_dblk_%ld", size);
389 		cp = kmem_cache_create(name, tot_size, DBLK_CACHE_ALIGN,
390 		    dblk_constructor, dblk_destructor, NULL, (void *)(size),
391 		    NULL, dblk_kmem_flags);
392 
393 		while (lastsize <= size) {
394 			dblk_cache[(lastsize - 1) >> DBLK_SIZE_SHIFT] = cp;
395 			lastsize += DBLK_MIN_SIZE;
396 		}
397 	}
398 
399 	dblk_esb_cache = kmem_cache_create("streams_dblk_esb", sizeof (dblk_t),
400 	    DBLK_CACHE_ALIGN, dblk_esb_constructor, dblk_destructor, NULL,
401 	    (void *)sizeof (dblk_t), NULL, dblk_kmem_flags);
402 	fthdr_cache = kmem_cache_create("streams_fthdr", sizeof (fthdr_t), 32,
403 	    fthdr_constructor, fthdr_destructor, NULL, NULL, NULL, 0);
404 	ftblk_cache = kmem_cache_create("streams_ftblk", sizeof (ftblk_t), 32,
405 	    ftblk_constructor, ftblk_destructor, NULL, NULL, NULL, 0);
406 
407 	/* Initialize Multidata caches */
408 	mmd_init();
409 
410 	/* initialize throttling queue for esballoc */
411 	esballoc_queue_init();
412 }
413 
414 /*ARGSUSED*/
415 mblk_t *
416 allocb(size_t size, uint_t pri)
417 {
418 	dblk_t *dbp;
419 	mblk_t *mp;
420 	size_t index;
421 
422 	index =  (size - 1)  >> DBLK_SIZE_SHIFT;
423 
424 	if (index >= (DBLK_MAX_CACHE >> DBLK_SIZE_SHIFT)) {
425 		if (size != 0) {
426 			mp = allocb_oversize(size, KM_NOSLEEP);
427 			goto out;
428 		}
429 		index = 0;
430 	}
431 
432 	if ((dbp = kmem_cache_alloc(dblk_cache[index], KM_NOSLEEP)) == NULL) {
433 		mp = NULL;
434 		goto out;
435 	}
436 
437 	mp = dbp->db_mblk;
438 	DBLK_RTFU_WORD(dbp) = DBLK_RTFU(1, M_DATA, 0, 0);
439 	mp->b_next = mp->b_prev = mp->b_cont = NULL;
440 	mp->b_rptr = mp->b_wptr = dbp->db_base;
441 	mp->b_queue = NULL;
442 	MBLK_BAND_FLAG_WORD(mp) = 0;
443 	STR_FTALLOC(&dbp->db_fthdr, FTEV_ALLOCB, size);
444 out:
445 	FTRACE_1("allocb(): mp=0x%p", (uintptr_t)mp);
446 
447 	return (mp);
448 }
449 
450 /*
451  * Allocate an mblk taking db_credp and db_cpid from the template.
452  * Allow the cred to be NULL.
453  */
454 mblk_t *
455 allocb_tmpl(size_t size, const mblk_t *tmpl)
456 {
457 	mblk_t *mp = allocb(size, 0);
458 
459 	if (mp != NULL) {
460 		dblk_t *src = tmpl->b_datap;
461 		dblk_t *dst = mp->b_datap;
462 		cred_t *cr;
463 		pid_t cpid;
464 
465 		cr = msg_getcred(tmpl, &cpid);
466 		if (cr != NULL)
467 			crhold(dst->db_credp = cr);
468 		dst->db_cpid = cpid;
469 		dst->db_type = src->db_type;
470 	}
471 	return (mp);
472 }
473 
474 mblk_t *
475 allocb_cred(size_t size, cred_t *cr, pid_t cpid)
476 {
477 	mblk_t *mp = allocb(size, 0);
478 
479 	ASSERT(cr != NULL);
480 	if (mp != NULL) {
481 		dblk_t *dbp = mp->b_datap;
482 
483 		crhold(dbp->db_credp = cr);
484 		dbp->db_cpid = cpid;
485 	}
486 	return (mp);
487 }
488 
489 mblk_t *
490 allocb_cred_wait(size_t size, uint_t flags, int *error, cred_t *cr, pid_t cpid)
491 {
492 	mblk_t *mp = allocb_wait(size, 0, flags, error);
493 
494 	ASSERT(cr != NULL);
495 	if (mp != NULL) {
496 		dblk_t *dbp = mp->b_datap;
497 
498 		crhold(dbp->db_credp = cr);
499 		dbp->db_cpid = cpid;
500 	}
501 
502 	return (mp);
503 }
504 
505 /*
506  * Extract the db_cred (and optionally db_cpid) from a message.
507  * We find the first mblk which has a non-NULL db_cred and use that.
508  * If none found we return NULL.
509  * Does NOT get a hold on the cred.
510  */
511 cred_t *
512 msg_getcred(const mblk_t *mp, pid_t *cpidp)
513 {
514 	cred_t *cr = NULL;
515 	cred_t *cr2;
516 	mblk_t *mp2;
517 
518 	while (mp != NULL) {
519 		dblk_t *dbp = mp->b_datap;
520 
521 		cr = dbp->db_credp;
522 		if (cr == NULL) {
523 			mp = mp->b_cont;
524 			continue;
525 		}
526 		if (cpidp != NULL)
527 			*cpidp = dbp->db_cpid;
528 
529 #ifdef DEBUG
530 		/*
531 		 * Normally there should at most one db_credp in a message.
532 		 * But if there are multiple (as in the case of some M_IOC*
533 		 * and some internal messages in TCP/IP bind logic) then
534 		 * they must be identical in the normal case.
535 		 * However, a socket can be shared between different uids
536 		 * in which case data queued in TCP would be from different
537 		 * creds. Thus we can only assert for the zoneid being the
538 		 * same. Due to Multi-level Level Ports for TX, some
539 		 * cred_t can have a NULL cr_zone, and we skip the comparison
540 		 * in that case.
541 		 */
542 		mp2 = mp->b_cont;
543 		while (mp2 != NULL) {
544 			cr2 = DB_CRED(mp2);
545 			if (cr2 != NULL) {
546 				DTRACE_PROBE2(msg__getcred,
547 				    cred_t *, cr, cred_t *, cr2);
548 				ASSERT(crgetzoneid(cr) == crgetzoneid(cr2) ||
549 				    crgetzone(cr) == NULL ||
550 				    crgetzone(cr2) == NULL);
551 			}
552 			mp2 = mp2->b_cont;
553 		}
554 #endif
555 		return (cr);
556 	}
557 	if (cpidp != NULL)
558 		*cpidp = NOPID;
559 	return (NULL);
560 }
561 
562 /*
563  * Variant of msg_getcred which, when a cred is found
564  * 1. Returns with a hold on the cred
565  * 2. Clears the first cred in the mblk.
566  * This is more efficient to use than a msg_getcred() + crhold() when
567  * the message is freed after the cred has been extracted.
568  *
569  * The caller is responsible for ensuring that there is no other reference
570  * on the message since db_credp can not be cleared when there are other
571  * references.
572  */
573 cred_t *
574 msg_extractcred(mblk_t *mp, pid_t *cpidp)
575 {
576 	cred_t *cr = NULL;
577 	cred_t *cr2;
578 	mblk_t *mp2;
579 
580 	while (mp != NULL) {
581 		dblk_t *dbp = mp->b_datap;
582 
583 		cr = dbp->db_credp;
584 		if (cr == NULL) {
585 			mp = mp->b_cont;
586 			continue;
587 		}
588 		ASSERT(dbp->db_ref == 1);
589 		dbp->db_credp = NULL;
590 		if (cpidp != NULL)
591 			*cpidp = dbp->db_cpid;
592 #ifdef DEBUG
593 		/*
594 		 * Normally there should at most one db_credp in a message.
595 		 * But if there are multiple (as in the case of some M_IOC*
596 		 * and some internal messages in TCP/IP bind logic) then
597 		 * they must be identical in the normal case.
598 		 * However, a socket can be shared between different uids
599 		 * in which case data queued in TCP would be from different
600 		 * creds. Thus we can only assert for the zoneid being the
601 		 * same. Due to Multi-level Level Ports for TX, some
602 		 * cred_t can have a NULL cr_zone, and we skip the comparison
603 		 * in that case.
604 		 */
605 		mp2 = mp->b_cont;
606 		while (mp2 != NULL) {
607 			cr2 = DB_CRED(mp2);
608 			if (cr2 != NULL) {
609 				DTRACE_PROBE2(msg__extractcred,
610 				    cred_t *, cr, cred_t *, cr2);
611 				ASSERT(crgetzoneid(cr) == crgetzoneid(cr2) ||
612 				    crgetzone(cr) == NULL ||
613 				    crgetzone(cr2) == NULL);
614 			}
615 			mp2 = mp2->b_cont;
616 		}
617 #endif
618 		return (cr);
619 	}
620 	return (NULL);
621 }
622 /*
623  * Get the label for a message. Uses the first mblk in the message
624  * which has a non-NULL db_credp.
625  * Returns NULL if there is no credp.
626  */
627 extern struct ts_label_s *
628 msg_getlabel(const mblk_t *mp)
629 {
630 	cred_t *cr = msg_getcred(mp, NULL);
631 
632 	if (cr == NULL)
633 		return (NULL);
634 
635 	return (crgetlabel(cr));
636 }
637 
638 void
639 freeb(mblk_t *mp)
640 {
641 	dblk_t *dbp = mp->b_datap;
642 
643 	ASSERT(dbp->db_ref > 0);
644 	ASSERT(mp->b_next == NULL && mp->b_prev == NULL);
645 	FTRACE_1("freeb(): mp=0x%lx", (uintptr_t)mp);
646 
647 	STR_FTEVENT_MBLK(mp, caller(), FTEV_FREEB, dbp->db_ref);
648 
649 	dbp->db_free(mp, dbp);
650 }
651 
652 void
653 freemsg(mblk_t *mp)
654 {
655 	FTRACE_1("freemsg(): mp=0x%lx", (uintptr_t)mp);
656 	while (mp) {
657 		dblk_t *dbp = mp->b_datap;
658 		mblk_t *mp_cont = mp->b_cont;
659 
660 		ASSERT(dbp->db_ref > 0);
661 		ASSERT(mp->b_next == NULL && mp->b_prev == NULL);
662 
663 		STR_FTEVENT_MBLK(mp, caller(), FTEV_FREEB, dbp->db_ref);
664 
665 		dbp->db_free(mp, dbp);
666 		mp = mp_cont;
667 	}
668 }
669 
670 /*
671  * Reallocate a block for another use.  Try hard to use the old block.
672  * If the old data is wanted (copy), leave b_wptr at the end of the data,
673  * otherwise return b_wptr = b_rptr.
674  *
675  * This routine is private and unstable.
676  */
677 mblk_t	*
678 reallocb(mblk_t *mp, size_t size, uint_t copy)
679 {
680 	mblk_t		*mp1;
681 	unsigned char	*old_rptr;
682 	ptrdiff_t	cur_size;
683 
684 	if (mp == NULL)
685 		return (allocb(size, BPRI_HI));
686 
687 	cur_size = mp->b_wptr - mp->b_rptr;
688 	old_rptr = mp->b_rptr;
689 
690 	ASSERT(mp->b_datap->db_ref != 0);
691 
692 	if (mp->b_datap->db_ref == 1 && MBLKSIZE(mp) >= size) {
693 		/*
694 		 * If the data is wanted and it will fit where it is, no
695 		 * work is required.
696 		 */
697 		if (copy && mp->b_datap->db_lim - mp->b_rptr >= size)
698 			return (mp);
699 
700 		mp->b_wptr = mp->b_rptr = mp->b_datap->db_base;
701 		mp1 = mp;
702 	} else if ((mp1 = allocb_tmpl(size, mp)) != NULL) {
703 		/* XXX other mp state could be copied too, db_flags ... ? */
704 		mp1->b_cont = mp->b_cont;
705 	} else {
706 		return (NULL);
707 	}
708 
709 	if (copy) {
710 		bcopy(old_rptr, mp1->b_rptr, cur_size);
711 		mp1->b_wptr = mp1->b_rptr + cur_size;
712 	}
713 
714 	if (mp != mp1)
715 		freeb(mp);
716 
717 	return (mp1);
718 }
719 
720 static void
721 dblk_lastfree(mblk_t *mp, dblk_t *dbp)
722 {
723 	ASSERT(dbp->db_mblk == mp);
724 	if (dbp->db_fthdr != NULL)
725 		str_ftfree(dbp);
726 
727 	/* set credp and projid to be 'unspecified' before returning to cache */
728 	if (dbp->db_credp != NULL) {
729 		crfree(dbp->db_credp);
730 		dbp->db_credp = NULL;
731 	}
732 	dbp->db_cpid = -1;
733 
734 	/* Reset the struioflag and the checksum flag fields */
735 	dbp->db_struioflag = 0;
736 	dbp->db_struioun.cksum.flags = 0;
737 
738 	/* and the COOKED and/or UIOA flag(s) */
739 	dbp->db_flags &= ~(DBLK_COOKED | DBLK_UIOA);
740 
741 	kmem_cache_free(dbp->db_cache, dbp);
742 }
743 
744 static void
745 dblk_decref(mblk_t *mp, dblk_t *dbp)
746 {
747 	if (dbp->db_ref != 1) {
748 		uint32_t rtfu = atomic_add_32_nv(&DBLK_RTFU_WORD(dbp),
749 		    -(1 << DBLK_RTFU_SHIFT(db_ref)));
750 		/*
751 		 * atomic_add_32_nv() just decremented db_ref, so we no longer
752 		 * have a reference to the dblk, which means another thread
753 		 * could free it.  Therefore we cannot examine the dblk to
754 		 * determine whether ours was the last reference.  Instead,
755 		 * we extract the new and minimum reference counts from rtfu.
756 		 * Note that all we're really saying is "if (ref != refmin)".
757 		 */
758 		if (((rtfu >> DBLK_RTFU_SHIFT(db_ref)) & DBLK_REFMAX) !=
759 		    ((rtfu >> DBLK_RTFU_SHIFT(db_flags)) & DBLK_REFMIN)) {
760 			kmem_cache_free(mblk_cache, mp);
761 			return;
762 		}
763 	}
764 	dbp->db_mblk = mp;
765 	dbp->db_free = dbp->db_lastfree;
766 	dbp->db_lastfree(mp, dbp);
767 }
768 
769 mblk_t *
770 dupb(mblk_t *mp)
771 {
772 	dblk_t *dbp = mp->b_datap;
773 	mblk_t *new_mp;
774 	uint32_t oldrtfu, newrtfu;
775 
776 	if ((new_mp = kmem_cache_alloc(mblk_cache, KM_NOSLEEP)) == NULL)
777 		goto out;
778 
779 	new_mp->b_next = new_mp->b_prev = new_mp->b_cont = NULL;
780 	new_mp->b_rptr = mp->b_rptr;
781 	new_mp->b_wptr = mp->b_wptr;
782 	new_mp->b_datap = dbp;
783 	new_mp->b_queue = NULL;
784 	MBLK_BAND_FLAG_WORD(new_mp) = MBLK_BAND_FLAG_WORD(mp);
785 
786 	STR_FTEVENT_MBLK(mp, caller(), FTEV_DUPB, dbp->db_ref);
787 
788 	dbp->db_free = dblk_decref;
789 	do {
790 		ASSERT(dbp->db_ref > 0);
791 		oldrtfu = DBLK_RTFU_WORD(dbp);
792 		newrtfu = oldrtfu + (1 << DBLK_RTFU_SHIFT(db_ref));
793 		/*
794 		 * If db_ref is maxed out we can't dup this message anymore.
795 		 */
796 		if ((oldrtfu & DBLK_RTFU_REF_MASK) == DBLK_RTFU_REF_MASK) {
797 			kmem_cache_free(mblk_cache, new_mp);
798 			new_mp = NULL;
799 			goto out;
800 		}
801 	} while (cas32(&DBLK_RTFU_WORD(dbp), oldrtfu, newrtfu) != oldrtfu);
802 
803 out:
804 	FTRACE_1("dupb(): new_mp=0x%lx", (uintptr_t)new_mp);
805 	return (new_mp);
806 }
807 
808 static void
809 dblk_lastfree_desb(mblk_t *mp, dblk_t *dbp)
810 {
811 	frtn_t *frp = dbp->db_frtnp;
812 
813 	ASSERT(dbp->db_mblk == mp);
814 	frp->free_func(frp->free_arg);
815 	if (dbp->db_fthdr != NULL)
816 		str_ftfree(dbp);
817 
818 	/* set credp and projid to be 'unspecified' before returning to cache */
819 	if (dbp->db_credp != NULL) {
820 		crfree(dbp->db_credp);
821 		dbp->db_credp = NULL;
822 	}
823 	dbp->db_cpid = -1;
824 	dbp->db_struioflag = 0;
825 	dbp->db_struioun.cksum.flags = 0;
826 
827 	kmem_cache_free(dbp->db_cache, dbp);
828 }
829 
830 /*ARGSUSED*/
831 static void
832 frnop_func(void *arg)
833 {
834 }
835 
836 /*
837  * Generic esballoc used to implement the four flavors: [d]esballoc[a].
838  */
839 static mblk_t *
840 gesballoc(unsigned char *base, size_t size, uint32_t db_rtfu, frtn_t *frp,
841 	void (*lastfree)(mblk_t *, dblk_t *), int kmflags)
842 {
843 	dblk_t *dbp;
844 	mblk_t *mp;
845 
846 	ASSERT(base != NULL && frp != NULL);
847 
848 	if ((dbp = kmem_cache_alloc(dblk_esb_cache, kmflags)) == NULL) {
849 		mp = NULL;
850 		goto out;
851 	}
852 
853 	mp = dbp->db_mblk;
854 	dbp->db_base = base;
855 	dbp->db_lim = base + size;
856 	dbp->db_free = dbp->db_lastfree = lastfree;
857 	dbp->db_frtnp = frp;
858 	DBLK_RTFU_WORD(dbp) = db_rtfu;
859 	mp->b_next = mp->b_prev = mp->b_cont = NULL;
860 	mp->b_rptr = mp->b_wptr = base;
861 	mp->b_queue = NULL;
862 	MBLK_BAND_FLAG_WORD(mp) = 0;
863 
864 out:
865 	FTRACE_1("gesballoc(): mp=0x%lx", (uintptr_t)mp);
866 	return (mp);
867 }
868 
869 /*ARGSUSED*/
870 mblk_t *
871 esballoc(unsigned char *base, size_t size, uint_t pri, frtn_t *frp)
872 {
873 	mblk_t *mp;
874 
875 	/*
876 	 * Note that this is structured to allow the common case (i.e.
877 	 * STREAMS flowtracing disabled) to call gesballoc() with tail
878 	 * call optimization.
879 	 */
880 	if (!str_ftnever) {
881 		mp = gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
882 		    frp, freebs_enqueue, KM_NOSLEEP);
883 
884 		if (mp != NULL)
885 			STR_FTALLOC(&DB_FTHDR(mp), FTEV_ESBALLOC, size);
886 		return (mp);
887 	}
888 
889 	return (gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
890 	    frp, freebs_enqueue, KM_NOSLEEP));
891 }
892 
893 /*
894  * Same as esballoc() but sleeps waiting for memory.
895  */
896 /*ARGSUSED*/
897 mblk_t *
898 esballoc_wait(unsigned char *base, size_t size, uint_t pri, frtn_t *frp)
899 {
900 	mblk_t *mp;
901 
902 	/*
903 	 * Note that this is structured to allow the common case (i.e.
904 	 * STREAMS flowtracing disabled) to call gesballoc() with tail
905 	 * call optimization.
906 	 */
907 	if (!str_ftnever) {
908 		mp = gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
909 		    frp, freebs_enqueue, KM_SLEEP);
910 
911 		STR_FTALLOC(&DB_FTHDR(mp), FTEV_ESBALLOC, size);
912 		return (mp);
913 	}
914 
915 	return (gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
916 	    frp, freebs_enqueue, KM_SLEEP));
917 }
918 
919 /*ARGSUSED*/
920 mblk_t *
921 desballoc(unsigned char *base, size_t size, uint_t pri, frtn_t *frp)
922 {
923 	mblk_t *mp;
924 
925 	/*
926 	 * Note that this is structured to allow the common case (i.e.
927 	 * STREAMS flowtracing disabled) to call gesballoc() with tail
928 	 * call optimization.
929 	 */
930 	if (!str_ftnever) {
931 		mp = gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
932 		    frp, dblk_lastfree_desb, KM_NOSLEEP);
933 
934 		if (mp != NULL)
935 			STR_FTALLOC(&DB_FTHDR(mp), FTEV_DESBALLOC, size);
936 		return (mp);
937 	}
938 
939 	return (gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
940 	    frp, dblk_lastfree_desb, KM_NOSLEEP));
941 }
942 
943 /*ARGSUSED*/
944 mblk_t *
945 esballoca(unsigned char *base, size_t size, uint_t pri, frtn_t *frp)
946 {
947 	mblk_t *mp;
948 
949 	/*
950 	 * Note that this is structured to allow the common case (i.e.
951 	 * STREAMS flowtracing disabled) to call gesballoc() with tail
952 	 * call optimization.
953 	 */
954 	if (!str_ftnever) {
955 		mp = gesballoc(base, size, DBLK_RTFU(2, M_DATA, 0, 0),
956 		    frp, freebs_enqueue, KM_NOSLEEP);
957 
958 		if (mp != NULL)
959 			STR_FTALLOC(&DB_FTHDR(mp), FTEV_ESBALLOCA, size);
960 		return (mp);
961 	}
962 
963 	return (gesballoc(base, size, DBLK_RTFU(2, M_DATA, 0, 0),
964 	    frp, freebs_enqueue, KM_NOSLEEP));
965 }
966 
967 /*ARGSUSED*/
968 mblk_t *
969 desballoca(unsigned char *base, size_t size, uint_t pri, frtn_t *frp)
970 {
971 	mblk_t *mp;
972 
973 	/*
974 	 * Note that this is structured to allow the common case (i.e.
975 	 * STREAMS flowtracing disabled) to call gesballoc() with tail
976 	 * call optimization.
977 	 */
978 	if (!str_ftnever) {
979 		mp = gesballoc(base, size, DBLK_RTFU(2, M_DATA, 0, 0),
980 		    frp, dblk_lastfree_desb, KM_NOSLEEP);
981 
982 		if (mp != NULL)
983 			STR_FTALLOC(&DB_FTHDR(mp), FTEV_DESBALLOCA, size);
984 		return (mp);
985 	}
986 
987 	return (gesballoc(base, size, DBLK_RTFU(2, M_DATA, 0, 0),
988 	    frp, dblk_lastfree_desb, KM_NOSLEEP));
989 }
990 
991 static void
992 bcache_dblk_lastfree(mblk_t *mp, dblk_t *dbp)
993 {
994 	bcache_t *bcp = dbp->db_cache;
995 
996 	ASSERT(dbp->db_mblk == mp);
997 	if (dbp->db_fthdr != NULL)
998 		str_ftfree(dbp);
999 
1000 	/* set credp and projid to be 'unspecified' before returning to cache */
1001 	if (dbp->db_credp != NULL) {
1002 		crfree(dbp->db_credp);
1003 		dbp->db_credp = NULL;
1004 	}
1005 	dbp->db_cpid = -1;
1006 	dbp->db_struioflag = 0;
1007 	dbp->db_struioun.cksum.flags = 0;
1008 
1009 	mutex_enter(&bcp->mutex);
1010 	kmem_cache_free(bcp->dblk_cache, dbp);
1011 	bcp->alloc--;
1012 
1013 	if (bcp->alloc == 0 && bcp->destroy != 0) {
1014 		kmem_cache_destroy(bcp->dblk_cache);
1015 		kmem_cache_destroy(bcp->buffer_cache);
1016 		mutex_exit(&bcp->mutex);
1017 		mutex_destroy(&bcp->mutex);
1018 		kmem_free(bcp, sizeof (bcache_t));
1019 	} else {
1020 		mutex_exit(&bcp->mutex);
1021 	}
1022 }
1023 
1024 bcache_t *
1025 bcache_create(char *name, size_t size, uint_t align)
1026 {
1027 	bcache_t *bcp;
1028 	char buffer[255];
1029 
1030 	ASSERT((align & (align - 1)) == 0);
1031 
1032 	if ((bcp = kmem_alloc(sizeof (bcache_t), KM_NOSLEEP)) == NULL)
1033 		return (NULL);
1034 
1035 	bcp->size = size;
1036 	bcp->align = align;
1037 	bcp->alloc = 0;
1038 	bcp->destroy = 0;
1039 
1040 	mutex_init(&bcp->mutex, NULL, MUTEX_DRIVER, NULL);
1041 
1042 	(void) sprintf(buffer, "%s_buffer_cache", name);
1043 	bcp->buffer_cache = kmem_cache_create(buffer, size, align, NULL, NULL,
1044 	    NULL, NULL, NULL, 0);
1045 	(void) sprintf(buffer, "%s_dblk_cache", name);
1046 	bcp->dblk_cache = kmem_cache_create(buffer, sizeof (dblk_t),
1047 	    DBLK_CACHE_ALIGN, bcache_dblk_constructor, bcache_dblk_destructor,
1048 	    NULL, (void *)bcp, NULL, 0);
1049 
1050 	return (bcp);
1051 }
1052 
1053 void
1054 bcache_destroy(bcache_t *bcp)
1055 {
1056 	ASSERT(bcp != NULL);
1057 
1058 	mutex_enter(&bcp->mutex);
1059 	if (bcp->alloc == 0) {
1060 		kmem_cache_destroy(bcp->dblk_cache);
1061 		kmem_cache_destroy(bcp->buffer_cache);
1062 		mutex_exit(&bcp->mutex);
1063 		mutex_destroy(&bcp->mutex);
1064 		kmem_free(bcp, sizeof (bcache_t));
1065 	} else {
1066 		bcp->destroy++;
1067 		mutex_exit(&bcp->mutex);
1068 	}
1069 }
1070 
1071 /*ARGSUSED*/
1072 mblk_t *
1073 bcache_allocb(bcache_t *bcp, uint_t pri)
1074 {
1075 	dblk_t *dbp;
1076 	mblk_t *mp = NULL;
1077 
1078 	ASSERT(bcp != NULL);
1079 
1080 	mutex_enter(&bcp->mutex);
1081 	if (bcp->destroy != 0) {
1082 		mutex_exit(&bcp->mutex);
1083 		goto out;
1084 	}
1085 
1086 	if ((dbp = kmem_cache_alloc(bcp->dblk_cache, KM_NOSLEEP)) == NULL) {
1087 		mutex_exit(&bcp->mutex);
1088 		goto out;
1089 	}
1090 	bcp->alloc++;
1091 	mutex_exit(&bcp->mutex);
1092 
1093 	ASSERT(((uintptr_t)(dbp->db_base) & (bcp->align - 1)) == 0);
1094 
1095 	mp = dbp->db_mblk;
1096 	DBLK_RTFU_WORD(dbp) = DBLK_RTFU(1, M_DATA, 0, 0);
1097 	mp->b_next = mp->b_prev = mp->b_cont = NULL;
1098 	mp->b_rptr = mp->b_wptr = dbp->db_base;
1099 	mp->b_queue = NULL;
1100 	MBLK_BAND_FLAG_WORD(mp) = 0;
1101 	STR_FTALLOC(&dbp->db_fthdr, FTEV_BCALLOCB, bcp->size);
1102 out:
1103 	FTRACE_1("bcache_allocb(): mp=0x%p", (uintptr_t)mp);
1104 
1105 	return (mp);
1106 }
1107 
1108 static void
1109 dblk_lastfree_oversize(mblk_t *mp, dblk_t *dbp)
1110 {
1111 	ASSERT(dbp->db_mblk == mp);
1112 	if (dbp->db_fthdr != NULL)
1113 		str_ftfree(dbp);
1114 
1115 	/* set credp and projid to be 'unspecified' before returning to cache */
1116 	if (dbp->db_credp != NULL) {
1117 		crfree(dbp->db_credp);
1118 		dbp->db_credp = NULL;
1119 	}
1120 	dbp->db_cpid = -1;
1121 	dbp->db_struioflag = 0;
1122 	dbp->db_struioun.cksum.flags = 0;
1123 
1124 	kmem_free(dbp->db_base, dbp->db_lim - dbp->db_base);
1125 	kmem_cache_free(dbp->db_cache, dbp);
1126 }
1127 
1128 static mblk_t *
1129 allocb_oversize(size_t size, int kmflags)
1130 {
1131 	mblk_t *mp;
1132 	void *buf;
1133 
1134 	size = P2ROUNDUP(size, DBLK_CACHE_ALIGN);
1135 	if ((buf = kmem_alloc(size, kmflags)) == NULL)
1136 		return (NULL);
1137 	if ((mp = gesballoc(buf, size, DBLK_RTFU(1, M_DATA, 0, 0),
1138 	    &frnop, dblk_lastfree_oversize, kmflags)) == NULL)
1139 		kmem_free(buf, size);
1140 
1141 	if (mp != NULL)
1142 		STR_FTALLOC(&DB_FTHDR(mp), FTEV_ALLOCBIG, size);
1143 
1144 	return (mp);
1145 }
1146 
1147 mblk_t *
1148 allocb_tryhard(size_t target_size)
1149 {
1150 	size_t size;
1151 	mblk_t *bp;
1152 
1153 	for (size = target_size; size < target_size + 512;
1154 	    size += DBLK_CACHE_ALIGN)
1155 		if ((bp = allocb(size, BPRI_HI)) != NULL)
1156 			return (bp);
1157 	allocb_tryhard_fails++;
1158 	return (NULL);
1159 }
1160 
1161 /*
1162  * This routine is consolidation private for STREAMS internal use
1163  * This routine may only be called from sync routines (i.e., not
1164  * from put or service procedures).  It is located here (rather
1165  * than strsubr.c) so that we don't have to expose all of the
1166  * allocb() implementation details in header files.
1167  */
1168 mblk_t *
1169 allocb_wait(size_t size, uint_t pri, uint_t flags, int *error)
1170 {
1171 	dblk_t *dbp;
1172 	mblk_t *mp;
1173 	size_t index;
1174 
1175 	index = (size -1) >> DBLK_SIZE_SHIFT;
1176 
1177 	if (flags & STR_NOSIG) {
1178 		if (index >= (DBLK_MAX_CACHE >> DBLK_SIZE_SHIFT)) {
1179 			if (size != 0) {
1180 				mp = allocb_oversize(size, KM_SLEEP);
1181 				FTRACE_1("allocb_wait (NOSIG): mp=0x%lx",
1182 				    (uintptr_t)mp);
1183 				return (mp);
1184 			}
1185 			index = 0;
1186 		}
1187 
1188 		dbp = kmem_cache_alloc(dblk_cache[index], KM_SLEEP);
1189 		mp = dbp->db_mblk;
1190 		DBLK_RTFU_WORD(dbp) = DBLK_RTFU(1, M_DATA, 0, 0);
1191 		mp->b_next = mp->b_prev = mp->b_cont = NULL;
1192 		mp->b_rptr = mp->b_wptr = dbp->db_base;
1193 		mp->b_queue = NULL;
1194 		MBLK_BAND_FLAG_WORD(mp) = 0;
1195 		STR_FTALLOC(&DB_FTHDR(mp), FTEV_ALLOCBW, size);
1196 
1197 		FTRACE_1("allocb_wait (NOSIG): mp=0x%lx", (uintptr_t)mp);
1198 
1199 	} else {
1200 		while ((mp = allocb(size, pri)) == NULL) {
1201 			if ((*error = strwaitbuf(size, BPRI_HI)) != 0)
1202 				return (NULL);
1203 		}
1204 	}
1205 
1206 	return (mp);
1207 }
1208 
1209 /*
1210  * Call function 'func' with 'arg' when a class zero block can
1211  * be allocated with priority 'pri'.
1212  */
1213 bufcall_id_t
1214 esbbcall(uint_t pri, void (*func)(void *), void *arg)
1215 {
1216 	return (bufcall(1, pri, func, arg));
1217 }
1218 
1219 /*
1220  * Allocates an iocblk (M_IOCTL) block. Properly sets the credentials
1221  * ioc_id, rval and error of the struct ioctl to set up an ioctl call.
1222  * This provides consistency for all internal allocators of ioctl.
1223  */
1224 mblk_t *
1225 mkiocb(uint_t cmd)
1226 {
1227 	struct iocblk	*ioc;
1228 	mblk_t		*mp;
1229 
1230 	/*
1231 	 * Allocate enough space for any of the ioctl related messages.
1232 	 */
1233 	if ((mp = allocb(sizeof (union ioctypes), BPRI_MED)) == NULL)
1234 		return (NULL);
1235 
1236 	bzero(mp->b_rptr, sizeof (union ioctypes));
1237 
1238 	/*
1239 	 * Set the mblk_t information and ptrs correctly.
1240 	 */
1241 	mp->b_wptr += sizeof (struct iocblk);
1242 	mp->b_datap->db_type = M_IOCTL;
1243 
1244 	/*
1245 	 * Fill in the fields.
1246 	 */
1247 	ioc		= (struct iocblk *)mp->b_rptr;
1248 	ioc->ioc_cmd	= cmd;
1249 	ioc->ioc_cr	= kcred;
1250 	ioc->ioc_id	= getiocseqno();
1251 	ioc->ioc_flag	= IOC_NATIVE;
1252 	return (mp);
1253 }
1254 
1255 /*
1256  * test if block of given size can be allocated with a request of
1257  * the given priority.
1258  * 'pri' is no longer used, but is retained for compatibility.
1259  */
1260 /* ARGSUSED */
1261 int
1262 testb(size_t size, uint_t pri)
1263 {
1264 	return ((size + sizeof (dblk_t)) <= kmem_avail());
1265 }
1266 
1267 /*
1268  * Call function 'func' with argument 'arg' when there is a reasonably
1269  * good chance that a block of size 'size' can be allocated.
1270  * 'pri' is no longer used, but is retained for compatibility.
1271  */
1272 /* ARGSUSED */
1273 bufcall_id_t
1274 bufcall(size_t size, uint_t pri, void (*func)(void *), void *arg)
1275 {
1276 	static long bid = 1;	/* always odd to save checking for zero */
1277 	bufcall_id_t bc_id;
1278 	struct strbufcall *bcp;
1279 
1280 	if ((bcp = kmem_alloc(sizeof (strbufcall_t), KM_NOSLEEP)) == NULL)
1281 		return (0);
1282 
1283 	bcp->bc_func = func;
1284 	bcp->bc_arg = arg;
1285 	bcp->bc_size = size;
1286 	bcp->bc_next = NULL;
1287 	bcp->bc_executor = NULL;
1288 
1289 	mutex_enter(&strbcall_lock);
1290 	/*
1291 	 * After bcp is linked into strbcalls and strbcall_lock is dropped there
1292 	 * should be no references to bcp since it may be freed by
1293 	 * runbufcalls(). Since bcp_id field is returned, we save its value in
1294 	 * the local var.
1295 	 */
1296 	bc_id = bcp->bc_id = (bufcall_id_t)(bid += 2);	/* keep it odd */
1297 
1298 	/*
1299 	 * add newly allocated stream event to existing
1300 	 * linked list of events.
1301 	 */
1302 	if (strbcalls.bc_head == NULL) {
1303 		strbcalls.bc_head = strbcalls.bc_tail = bcp;
1304 	} else {
1305 		strbcalls.bc_tail->bc_next = bcp;
1306 		strbcalls.bc_tail = bcp;
1307 	}
1308 
1309 	cv_signal(&strbcall_cv);
1310 	mutex_exit(&strbcall_lock);
1311 	return (bc_id);
1312 }
1313 
1314 /*
1315  * Cancel a bufcall request.
1316  */
1317 void
1318 unbufcall(bufcall_id_t id)
1319 {
1320 	strbufcall_t *bcp, *pbcp;
1321 
1322 	mutex_enter(&strbcall_lock);
1323 again:
1324 	pbcp = NULL;
1325 	for (bcp = strbcalls.bc_head; bcp; bcp = bcp->bc_next) {
1326 		if (id == bcp->bc_id)
1327 			break;
1328 		pbcp = bcp;
1329 	}
1330 	if (bcp) {
1331 		if (bcp->bc_executor != NULL) {
1332 			if (bcp->bc_executor != curthread) {
1333 				cv_wait(&bcall_cv, &strbcall_lock);
1334 				goto again;
1335 			}
1336 		} else {
1337 			if (pbcp)
1338 				pbcp->bc_next = bcp->bc_next;
1339 			else
1340 				strbcalls.bc_head = bcp->bc_next;
1341 			if (bcp == strbcalls.bc_tail)
1342 				strbcalls.bc_tail = pbcp;
1343 			kmem_free(bcp, sizeof (strbufcall_t));
1344 		}
1345 	}
1346 	mutex_exit(&strbcall_lock);
1347 }
1348 
1349 /*
1350  * Duplicate a message block by block (uses dupb), returning
1351  * a pointer to the duplicate message.
1352  * Returns a non-NULL value only if the entire message
1353  * was dup'd.
1354  */
1355 mblk_t *
1356 dupmsg(mblk_t *bp)
1357 {
1358 	mblk_t *head, *nbp;
1359 
1360 	if (!bp || !(nbp = head = dupb(bp)))
1361 		return (NULL);
1362 
1363 	while (bp->b_cont) {
1364 		if (!(nbp->b_cont = dupb(bp->b_cont))) {
1365 			freemsg(head);
1366 			return (NULL);
1367 		}
1368 		nbp = nbp->b_cont;
1369 		bp = bp->b_cont;
1370 	}
1371 	return (head);
1372 }
1373 
1374 #define	DUPB_NOLOAN(bp) \
1375 	((((bp)->b_datap->db_struioflag & STRUIO_ZC) != 0) ? \
1376 	copyb((bp)) : dupb((bp)))
1377 
1378 mblk_t *
1379 dupmsg_noloan(mblk_t *bp)
1380 {
1381 	mblk_t *head, *nbp;
1382 
1383 	if (bp == NULL || DB_TYPE(bp) != M_DATA ||
1384 	    ((nbp = head = DUPB_NOLOAN(bp)) == NULL))
1385 		return (NULL);
1386 
1387 	while (bp->b_cont) {
1388 		if ((nbp->b_cont = DUPB_NOLOAN(bp->b_cont)) == NULL) {
1389 			freemsg(head);
1390 			return (NULL);
1391 		}
1392 		nbp = nbp->b_cont;
1393 		bp = bp->b_cont;
1394 	}
1395 	return (head);
1396 }
1397 
1398 /*
1399  * Copy data from message and data block to newly allocated message and
1400  * data block. Returns new message block pointer, or NULL if error.
1401  * The alignment of rptr (w.r.t. word alignment) will be the same in the copy
1402  * as in the original even when db_base is not word aligned. (bug 1052877)
1403  */
1404 mblk_t *
1405 copyb(mblk_t *bp)
1406 {
1407 	mblk_t	*nbp;
1408 	dblk_t	*dp, *ndp;
1409 	uchar_t *base;
1410 	size_t	size;
1411 	size_t	unaligned;
1412 
1413 	ASSERT(bp->b_wptr >= bp->b_rptr);
1414 
1415 	dp = bp->b_datap;
1416 	if (dp->db_fthdr != NULL)
1417 		STR_FTEVENT_MBLK(bp, caller(), FTEV_COPYB, 0);
1418 
1419 	/*
1420 	 * Special handling for Multidata message; this should be
1421 	 * removed once a copy-callback routine is made available.
1422 	 */
1423 	if (dp->db_type == M_MULTIDATA) {
1424 		cred_t *cr;
1425 
1426 		if ((nbp = mmd_copy(bp, KM_NOSLEEP)) == NULL)
1427 			return (NULL);
1428 
1429 		nbp->b_flag = bp->b_flag;
1430 		nbp->b_band = bp->b_band;
1431 		ndp = nbp->b_datap;
1432 
1433 		/* See comments below on potential issues. */
1434 		STR_FTEVENT_MBLK(nbp, caller(), FTEV_COPYB, 1);
1435 
1436 		ASSERT(ndp->db_type == dp->db_type);
1437 		cr = dp->db_credp;
1438 		if (cr != NULL)
1439 			crhold(ndp->db_credp = cr);
1440 		ndp->db_cpid = dp->db_cpid;
1441 		return (nbp);
1442 	}
1443 
1444 	size = dp->db_lim - dp->db_base;
1445 	unaligned = P2PHASE((uintptr_t)dp->db_base, sizeof (uint_t));
1446 	if ((nbp = allocb_tmpl(size + unaligned, bp)) == NULL)
1447 		return (NULL);
1448 	nbp->b_flag = bp->b_flag;
1449 	nbp->b_band = bp->b_band;
1450 	ndp = nbp->b_datap;
1451 
1452 	/*
1453 	 * Well, here is a potential issue.  If we are trying to
1454 	 * trace a flow, and we copy the message, we might lose
1455 	 * information about where this message might have been.
1456 	 * So we should inherit the FT data.  On the other hand,
1457 	 * a user might be interested only in alloc to free data.
1458 	 * So I guess the real answer is to provide a tunable.
1459 	 */
1460 	STR_FTEVENT_MBLK(nbp, caller(), FTEV_COPYB, 1);
1461 
1462 	base = ndp->db_base + unaligned;
1463 	bcopy(dp->db_base, ndp->db_base + unaligned, size);
1464 
1465 	nbp->b_rptr = base + (bp->b_rptr - dp->db_base);
1466 	nbp->b_wptr = nbp->b_rptr + MBLKL(bp);
1467 
1468 	return (nbp);
1469 }
1470 
1471 /*
1472  * Copy data from message to newly allocated message using new
1473  * data blocks.  Returns a pointer to the new message, or NULL if error.
1474  */
1475 mblk_t *
1476 copymsg(mblk_t *bp)
1477 {
1478 	mblk_t *head, *nbp;
1479 
1480 	if (!bp || !(nbp = head = copyb(bp)))
1481 		return (NULL);
1482 
1483 	while (bp->b_cont) {
1484 		if (!(nbp->b_cont = copyb(bp->b_cont))) {
1485 			freemsg(head);
1486 			return (NULL);
1487 		}
1488 		nbp = nbp->b_cont;
1489 		bp = bp->b_cont;
1490 	}
1491 	return (head);
1492 }
1493 
1494 /*
1495  * link a message block to tail of message
1496  */
1497 void
1498 linkb(mblk_t *mp, mblk_t *bp)
1499 {
1500 	ASSERT(mp && bp);
1501 
1502 	for (; mp->b_cont; mp = mp->b_cont)
1503 		;
1504 	mp->b_cont = bp;
1505 }
1506 
1507 /*
1508  * unlink a message block from head of message
1509  * return pointer to new message.
1510  * NULL if message becomes empty.
1511  */
1512 mblk_t *
1513 unlinkb(mblk_t *bp)
1514 {
1515 	mblk_t *bp1;
1516 
1517 	bp1 = bp->b_cont;
1518 	bp->b_cont = NULL;
1519 	return (bp1);
1520 }
1521 
1522 /*
1523  * remove a message block "bp" from message "mp"
1524  *
1525  * Return pointer to new message or NULL if no message remains.
1526  * Return -1 if bp is not found in message.
1527  */
1528 mblk_t *
1529 rmvb(mblk_t *mp, mblk_t *bp)
1530 {
1531 	mblk_t *tmp;
1532 	mblk_t *lastp = NULL;
1533 
1534 	ASSERT(mp && bp);
1535 	for (tmp = mp; tmp; tmp = tmp->b_cont) {
1536 		if (tmp == bp) {
1537 			if (lastp)
1538 				lastp->b_cont = tmp->b_cont;
1539 			else
1540 				mp = tmp->b_cont;
1541 			tmp->b_cont = NULL;
1542 			return (mp);
1543 		}
1544 		lastp = tmp;
1545 	}
1546 	return ((mblk_t *)-1);
1547 }
1548 
1549 /*
1550  * Concatenate and align first len bytes of common
1551  * message type.  Len == -1, means concat everything.
1552  * Returns 1 on success, 0 on failure
1553  * After the pullup, mp points to the pulled up data.
1554  */
1555 int
1556 pullupmsg(mblk_t *mp, ssize_t len)
1557 {
1558 	mblk_t *bp, *b_cont;
1559 	dblk_t *dbp;
1560 	ssize_t n;
1561 
1562 	ASSERT(mp->b_datap->db_ref > 0);
1563 	ASSERT(mp->b_next == NULL && mp->b_prev == NULL);
1564 
1565 	/*
1566 	 * We won't handle Multidata message, since it contains
1567 	 * metadata which this function has no knowledge of; we
1568 	 * assert on DEBUG, and return failure otherwise.
1569 	 */
1570 	ASSERT(mp->b_datap->db_type != M_MULTIDATA);
1571 	if (mp->b_datap->db_type == M_MULTIDATA)
1572 		return (0);
1573 
1574 	if (len == -1) {
1575 		if (mp->b_cont == NULL && str_aligned(mp->b_rptr))
1576 			return (1);
1577 		len = xmsgsize(mp);
1578 	} else {
1579 		ssize_t first_mblk_len = mp->b_wptr - mp->b_rptr;
1580 		ASSERT(first_mblk_len >= 0);
1581 		/*
1582 		 * If the length is less than that of the first mblk,
1583 		 * we want to pull up the message into an aligned mblk.
1584 		 * Though not part of the spec, some callers assume it.
1585 		 */
1586 		if (len <= first_mblk_len) {
1587 			if (str_aligned(mp->b_rptr))
1588 				return (1);
1589 			len = first_mblk_len;
1590 		} else if (xmsgsize(mp) < len)
1591 			return (0);
1592 	}
1593 
1594 	if ((bp = allocb_tmpl(len, mp)) == NULL)
1595 		return (0);
1596 
1597 	dbp = bp->b_datap;
1598 	*bp = *mp;		/* swap mblks so bp heads the old msg... */
1599 	mp->b_datap = dbp;	/* ... and mp heads the new message */
1600 	mp->b_datap->db_mblk = mp;
1601 	bp->b_datap->db_mblk = bp;
1602 	mp->b_rptr = mp->b_wptr = dbp->db_base;
1603 
1604 	do {
1605 		ASSERT(bp->b_datap->db_ref > 0);
1606 		ASSERT(bp->b_wptr >= bp->b_rptr);
1607 		n = MIN(bp->b_wptr - bp->b_rptr, len);
1608 		ASSERT(n >= 0);		/* allow zero-length mblk_t's */
1609 		if (n > 0)
1610 			bcopy(bp->b_rptr, mp->b_wptr, (size_t)n);
1611 		mp->b_wptr += n;
1612 		bp->b_rptr += n;
1613 		len -= n;
1614 		if (bp->b_rptr != bp->b_wptr)
1615 			break;
1616 		b_cont = bp->b_cont;
1617 		freeb(bp);
1618 		bp = b_cont;
1619 	} while (len && bp);
1620 
1621 	mp->b_cont = bp;	/* tack on whatever wasn't pulled up */
1622 
1623 	return (1);
1624 }
1625 
1626 /*
1627  * Concatenate and align at least the first len bytes of common message
1628  * type.  Len == -1 means concatenate everything.  The original message is
1629  * unaltered.  Returns a pointer to a new message on success, otherwise
1630  * returns NULL.
1631  */
1632 mblk_t *
1633 msgpullup(mblk_t *mp, ssize_t len)
1634 {
1635 	mblk_t	*newmp;
1636 	ssize_t	totlen;
1637 	ssize_t	n;
1638 
1639 	/*
1640 	 * We won't handle Multidata message, since it contains
1641 	 * metadata which this function has no knowledge of; we
1642 	 * assert on DEBUG, and return failure otherwise.
1643 	 */
1644 	ASSERT(mp->b_datap->db_type != M_MULTIDATA);
1645 	if (mp->b_datap->db_type == M_MULTIDATA)
1646 		return (NULL);
1647 
1648 	totlen = xmsgsize(mp);
1649 
1650 	if ((len > 0) && (len > totlen))
1651 		return (NULL);
1652 
1653 	/*
1654 	 * Copy all of the first msg type into one new mblk, then dupmsg
1655 	 * and link the rest onto this.
1656 	 */
1657 
1658 	len = totlen;
1659 
1660 	if ((newmp = allocb_tmpl(len, mp)) == NULL)
1661 		return (NULL);
1662 
1663 	newmp->b_flag = mp->b_flag;
1664 	newmp->b_band = mp->b_band;
1665 
1666 	while (len > 0) {
1667 		n = mp->b_wptr - mp->b_rptr;
1668 		ASSERT(n >= 0);		/* allow zero-length mblk_t's */
1669 		if (n > 0)
1670 			bcopy(mp->b_rptr, newmp->b_wptr, n);
1671 		newmp->b_wptr += n;
1672 		len -= n;
1673 		mp = mp->b_cont;
1674 	}
1675 
1676 	if (mp != NULL) {
1677 		newmp->b_cont = dupmsg(mp);
1678 		if (newmp->b_cont == NULL) {
1679 			freemsg(newmp);
1680 			return (NULL);
1681 		}
1682 	}
1683 
1684 	return (newmp);
1685 }
1686 
1687 /*
1688  * Trim bytes from message
1689  *  len > 0, trim from head
1690  *  len < 0, trim from tail
1691  * Returns 1 on success, 0 on failure.
1692  */
1693 int
1694 adjmsg(mblk_t *mp, ssize_t len)
1695 {
1696 	mblk_t *bp;
1697 	mblk_t *save_bp = NULL;
1698 	mblk_t *prev_bp;
1699 	mblk_t *bcont;
1700 	unsigned char type;
1701 	ssize_t n;
1702 	int fromhead;
1703 	int first;
1704 
1705 	ASSERT(mp != NULL);
1706 	/*
1707 	 * We won't handle Multidata message, since it contains
1708 	 * metadata which this function has no knowledge of; we
1709 	 * assert on DEBUG, and return failure otherwise.
1710 	 */
1711 	ASSERT(mp->b_datap->db_type != M_MULTIDATA);
1712 	if (mp->b_datap->db_type == M_MULTIDATA)
1713 		return (0);
1714 
1715 	if (len < 0) {
1716 		fromhead = 0;
1717 		len = -len;
1718 	} else {
1719 		fromhead = 1;
1720 	}
1721 
1722 	if (xmsgsize(mp) < len)
1723 		return (0);
1724 
1725 	if (fromhead) {
1726 		first = 1;
1727 		while (len) {
1728 			ASSERT(mp->b_wptr >= mp->b_rptr);
1729 			n = MIN(mp->b_wptr - mp->b_rptr, len);
1730 			mp->b_rptr += n;
1731 			len -= n;
1732 
1733 			/*
1734 			 * If this is not the first zero length
1735 			 * message remove it
1736 			 */
1737 			if (!first && (mp->b_wptr == mp->b_rptr)) {
1738 				bcont = mp->b_cont;
1739 				freeb(mp);
1740 				mp = save_bp->b_cont = bcont;
1741 			} else {
1742 				save_bp = mp;
1743 				mp = mp->b_cont;
1744 			}
1745 			first = 0;
1746 		}
1747 	} else {
1748 		type = mp->b_datap->db_type;
1749 		while (len) {
1750 			bp = mp;
1751 			save_bp = NULL;
1752 
1753 			/*
1754 			 * Find the last message of same type
1755 			 */
1756 			while (bp && bp->b_datap->db_type == type) {
1757 				ASSERT(bp->b_wptr >= bp->b_rptr);
1758 				prev_bp = save_bp;
1759 				save_bp = bp;
1760 				bp = bp->b_cont;
1761 			}
1762 			if (save_bp == NULL)
1763 				break;
1764 			n = MIN(save_bp->b_wptr - save_bp->b_rptr, len);
1765 			save_bp->b_wptr -= n;
1766 			len -= n;
1767 
1768 			/*
1769 			 * If this is not the first message
1770 			 * and we have taken away everything
1771 			 * from this message, remove it
1772 			 */
1773 
1774 			if ((save_bp != mp) &&
1775 			    (save_bp->b_wptr == save_bp->b_rptr)) {
1776 				bcont = save_bp->b_cont;
1777 				freeb(save_bp);
1778 				prev_bp->b_cont = bcont;
1779 			}
1780 		}
1781 	}
1782 	return (1);
1783 }
1784 
1785 /*
1786  * get number of data bytes in message
1787  */
1788 size_t
1789 msgdsize(mblk_t *bp)
1790 {
1791 	size_t count = 0;
1792 
1793 	for (; bp; bp = bp->b_cont)
1794 		if (bp->b_datap->db_type == M_DATA) {
1795 			ASSERT(bp->b_wptr >= bp->b_rptr);
1796 			count += bp->b_wptr - bp->b_rptr;
1797 		}
1798 	return (count);
1799 }
1800 
1801 /*
1802  * Get a message off head of queue
1803  *
1804  * If queue has no buffers then mark queue
1805  * with QWANTR. (queue wants to be read by
1806  * someone when data becomes available)
1807  *
1808  * If there is something to take off then do so.
1809  * If queue falls below hi water mark turn off QFULL
1810  * flag.  Decrement weighted count of queue.
1811  * Also turn off QWANTR because queue is being read.
1812  *
1813  * The queue count is maintained on a per-band basis.
1814  * Priority band 0 (normal messages) uses q_count,
1815  * q_lowat, etc.  Non-zero priority bands use the
1816  * fields in their respective qband structures
1817  * (qb_count, qb_lowat, etc.)  All messages appear
1818  * on the same list, linked via their b_next pointers.
1819  * q_first is the head of the list.  q_count does
1820  * not reflect the size of all the messages on the
1821  * queue.  It only reflects those messages in the
1822  * normal band of flow.  The one exception to this
1823  * deals with high priority messages.  They are in
1824  * their own conceptual "band", but are accounted
1825  * against q_count.
1826  *
1827  * If queue count is below the lo water mark and QWANTW
1828  * is set, enable the closest backq which has a service
1829  * procedure and turn off the QWANTW flag.
1830  *
1831  * getq could be built on top of rmvq, but isn't because
1832  * of performance considerations.
1833  *
1834  * A note on the use of q_count and q_mblkcnt:
1835  *   q_count is the traditional byte count for messages that
1836  *   have been put on a queue.  Documentation tells us that
1837  *   we shouldn't rely on that count, but some drivers/modules
1838  *   do.  What was needed, however, is a mechanism to prevent
1839  *   runaway streams from consuming all of the resources,
1840  *   and particularly be able to flow control zero-length
1841  *   messages.  q_mblkcnt is used for this purpose.  It
1842  *   counts the number of mblk's that are being put on
1843  *   the queue.  The intention here, is that each mblk should
1844  *   contain one byte of data and, for the purpose of
1845  *   flow-control, logically does.  A queue will become
1846  *   full when EITHER of these values (q_count and q_mblkcnt)
1847  *   reach the highwater mark.  It will clear when BOTH
1848  *   of them drop below the highwater mark.  And it will
1849  *   backenable when BOTH of them drop below the lowwater
1850  *   mark.
1851  *   With this algorithm, a driver/module might be able
1852  *   to find a reasonably accurate q_count, and the
1853  *   framework can still try and limit resource usage.
1854  */
1855 mblk_t *
1856 getq(queue_t *q)
1857 {
1858 	mblk_t *bp;
1859 	uchar_t band = 0;
1860 
1861 	bp = getq_noenab(q, 0);
1862 	if (bp != NULL)
1863 		band = bp->b_band;
1864 
1865 	/*
1866 	 * Inlined from qbackenable().
1867 	 * Quick check without holding the lock.
1868 	 */
1869 	if (band == 0 && (q->q_flag & (QWANTW|QWANTWSYNC)) == 0)
1870 		return (bp);
1871 
1872 	qbackenable(q, band);
1873 	return (bp);
1874 }
1875 
1876 /*
1877  * Calculate number of data bytes in a single data message block taking
1878  * multidata messages into account.
1879  */
1880 
1881 #define	ADD_MBLK_SIZE(mp, size) 					\
1882 	if (DB_TYPE(mp) != M_MULTIDATA) {				\
1883 		(size) += MBLKL(mp);					\
1884 	} else {							\
1885 		uint_t	pinuse;						\
1886 									\
1887 		mmd_getsize(mmd_getmultidata(mp), NULL, &pinuse);	\
1888 		(size) += pinuse;					\
1889 	}
1890 
1891 /*
1892  * Returns the number of bytes in a message (a message is defined as a
1893  * chain of mblks linked by b_cont). If a non-NULL mblkcnt is supplied we
1894  * also return the number of distinct mblks in the message.
1895  */
1896 int
1897 mp_cont_len(mblk_t *bp, int *mblkcnt)
1898 {
1899 	mblk_t	*mp;
1900 	int	mblks = 0;
1901 	int	bytes = 0;
1902 
1903 	for (mp = bp; mp != NULL; mp = mp->b_cont) {
1904 		ADD_MBLK_SIZE(mp, bytes);
1905 		mblks++;
1906 	}
1907 
1908 	if (mblkcnt != NULL)
1909 		*mblkcnt = mblks;
1910 
1911 	return (bytes);
1912 }
1913 
1914 /*
1915  * Like getq() but does not backenable.  This is used by the stream
1916  * head when a putback() is likely.  The caller must call qbackenable()
1917  * after it is done with accessing the queue.
1918  * The rbytes arguments to getq_noneab() allows callers to specify a
1919  * the maximum number of bytes to return. If the current amount on the
1920  * queue is less than this then the entire message will be returned.
1921  * A value of 0 returns the entire message and is equivalent to the old
1922  * default behaviour prior to the addition of the rbytes argument.
1923  */
1924 mblk_t *
1925 getq_noenab(queue_t *q, ssize_t rbytes)
1926 {
1927 	mblk_t *bp, *mp1;
1928 	mblk_t *mp2 = NULL;
1929 	qband_t *qbp;
1930 	kthread_id_t freezer;
1931 	int	bytecnt = 0, mblkcnt = 0;
1932 
1933 	/* freezestr should allow its caller to call getq/putq */
1934 	freezer = STREAM(q)->sd_freezer;
1935 	if (freezer == curthread) {
1936 		ASSERT(frozenstr(q));
1937 		ASSERT(MUTEX_HELD(QLOCK(q)));
1938 	} else
1939 		mutex_enter(QLOCK(q));
1940 
1941 	if ((bp = q->q_first) == 0) {
1942 		q->q_flag |= QWANTR;
1943 	} else {
1944 		/*
1945 		 * If the caller supplied a byte threshold and there is
1946 		 * more than this amount on the queue then break up the
1947 		 * the message appropriately.  We can only safely do
1948 		 * this for M_DATA messages.
1949 		 */
1950 		if ((DB_TYPE(bp) == M_DATA) && (rbytes > 0) &&
1951 		    (q->q_count > rbytes)) {
1952 			/*
1953 			 * Inline version of mp_cont_len() which terminates
1954 			 * when we meet or exceed rbytes.
1955 			 */
1956 			for (mp1 = bp; mp1 != NULL; mp1 = mp1->b_cont) {
1957 				mblkcnt++;
1958 				ADD_MBLK_SIZE(mp1, bytecnt);
1959 				if (bytecnt  >= rbytes)
1960 					break;
1961 			}
1962 			/*
1963 			 * We need to account for the following scenarios:
1964 			 *
1965 			 * 1) Too much data in the first message:
1966 			 *	mp1 will be the mblk which puts us over our
1967 			 *	byte limit.
1968 			 * 2) Not enough data in the first message:
1969 			 *	mp1 will be NULL.
1970 			 * 3) Exactly the right amount of data contained within
1971 			 *    whole mblks:
1972 			 *	mp1->b_cont will be where we break the message.
1973 			 */
1974 			if (bytecnt > rbytes) {
1975 				/*
1976 				 * Dup/copy mp1 and put what we don't need
1977 				 * back onto the queue. Adjust the read/write
1978 				 * and continuation pointers appropriately
1979 				 * and decrement the current mblk count to
1980 				 * reflect we are putting an mblk back onto
1981 				 * the queue.
1982 				 * When adjusting the message pointers, it's
1983 				 * OK to use the existing bytecnt and the
1984 				 * requested amount (rbytes) to calculate the
1985 				 * the new write offset (b_wptr) of what we
1986 				 * are taking. However, we  cannot use these
1987 				 * values when calculating the read offset of
1988 				 * the mblk we are putting back on the queue.
1989 				 * This is because the begining (b_rptr) of the
1990 				 * mblk represents some arbitrary point within
1991 				 * the message.
1992 				 * It's simplest to do this by advancing b_rptr
1993 				 * by the new length of mp1 as we don't have to
1994 				 * remember any intermediate state.
1995 				 */
1996 				ASSERT(mp1 != NULL);
1997 				mblkcnt--;
1998 				if ((mp2 = dupb(mp1)) == NULL &&
1999 				    (mp2 = copyb(mp1)) == NULL) {
2000 					bytecnt = mblkcnt = 0;
2001 					goto dup_failed;
2002 				}
2003 				mp2->b_cont = mp1->b_cont;
2004 				mp1->b_wptr -= bytecnt - rbytes;
2005 				mp2->b_rptr += mp1->b_wptr - mp1->b_rptr;
2006 				mp1->b_cont = NULL;
2007 				bytecnt = rbytes;
2008 			} else {
2009 				/*
2010 				 * Either there is not enough data in the first
2011 				 * message or there is no excess data to deal
2012 				 * with. If mp1 is NULL, we are taking the
2013 				 * whole message. No need to do anything.
2014 				 * Otherwise we assign mp1->b_cont to mp2 as
2015 				 * we will be putting this back onto the head of
2016 				 * the queue.
2017 				 */
2018 				if (mp1 != NULL) {
2019 					mp2 = mp1->b_cont;
2020 					mp1->b_cont = NULL;
2021 				}
2022 			}
2023 			/*
2024 			 * If mp2 is not NULL then we have part of the message
2025 			 * to put back onto the queue.
2026 			 */
2027 			if (mp2 != NULL) {
2028 				if ((mp2->b_next = bp->b_next) == NULL)
2029 					q->q_last = mp2;
2030 				else
2031 					bp->b_next->b_prev = mp2;
2032 				q->q_first = mp2;
2033 			} else {
2034 				if ((q->q_first = bp->b_next) == NULL)
2035 					q->q_last = NULL;
2036 				else
2037 					q->q_first->b_prev = NULL;
2038 			}
2039 		} else {
2040 			/*
2041 			 * Either no byte threshold was supplied, there is
2042 			 * not enough on the queue or we failed to
2043 			 * duplicate/copy a data block. In these cases we
2044 			 * just take the entire first message.
2045 			 */
2046 dup_failed:
2047 			bytecnt = mp_cont_len(bp, &mblkcnt);
2048 			if ((q->q_first = bp->b_next) == NULL)
2049 				q->q_last = NULL;
2050 			else
2051 				q->q_first->b_prev = NULL;
2052 		}
2053 		if (bp->b_band == 0) {
2054 			q->q_count -= bytecnt;
2055 			q->q_mblkcnt -= mblkcnt;
2056 			if (q->q_mblkcnt == 0 || ((q->q_count < q->q_hiwat) &&
2057 			    (q->q_mblkcnt < q->q_hiwat))) {
2058 				q->q_flag &= ~QFULL;
2059 			}
2060 		} else {
2061 			int i;
2062 
2063 			ASSERT(bp->b_band <= q->q_nband);
2064 			ASSERT(q->q_bandp != NULL);
2065 			ASSERT(MUTEX_HELD(QLOCK(q)));
2066 			qbp = q->q_bandp;
2067 			i = bp->b_band;
2068 			while (--i > 0)
2069 				qbp = qbp->qb_next;
2070 			if (qbp->qb_first == qbp->qb_last) {
2071 				qbp->qb_first = NULL;
2072 				qbp->qb_last = NULL;
2073 			} else {
2074 				qbp->qb_first = bp->b_next;
2075 			}
2076 			qbp->qb_count -= bytecnt;
2077 			qbp->qb_mblkcnt -= mblkcnt;
2078 			if (qbp->qb_mblkcnt == 0 ||
2079 			    ((qbp->qb_count < qbp->qb_hiwat) &&
2080 			    (qbp->qb_mblkcnt < qbp->qb_hiwat))) {
2081 				qbp->qb_flag &= ~QB_FULL;
2082 			}
2083 		}
2084 		q->q_flag &= ~QWANTR;
2085 		bp->b_next = NULL;
2086 		bp->b_prev = NULL;
2087 	}
2088 	if (freezer != curthread)
2089 		mutex_exit(QLOCK(q));
2090 
2091 	STR_FTEVENT_MSG(bp, q, FTEV_GETQ, NULL);
2092 
2093 	return (bp);
2094 }
2095 
2096 /*
2097  * Determine if a backenable is needed after removing a message in the
2098  * specified band.
2099  * NOTE: This routine assumes that something like getq_noenab() has been
2100  * already called.
2101  *
2102  * For the read side it is ok to hold sd_lock across calling this (and the
2103  * stream head often does).
2104  * But for the write side strwakeq might be invoked and it acquires sd_lock.
2105  */
2106 void
2107 qbackenable(queue_t *q, uchar_t band)
2108 {
2109 	int backenab = 0;
2110 	qband_t *qbp;
2111 	kthread_id_t freezer;
2112 
2113 	ASSERT(q);
2114 	ASSERT((q->q_flag & QREADR) || MUTEX_NOT_HELD(&STREAM(q)->sd_lock));
2115 
2116 	/*
2117 	 * Quick check without holding the lock.
2118 	 * OK since after getq() has lowered the q_count these flags
2119 	 * would not change unless either the qbackenable() is done by
2120 	 * another thread (which is ok) or the queue has gotten QFULL
2121 	 * in which case another backenable will take place when the queue
2122 	 * drops below q_lowat.
2123 	 */
2124 	if (band == 0 && (q->q_flag & (QWANTW|QWANTWSYNC)) == 0)
2125 		return;
2126 
2127 	/* freezestr should allow its caller to call getq/putq */
2128 	freezer = STREAM(q)->sd_freezer;
2129 	if (freezer == curthread) {
2130 		ASSERT(frozenstr(q));
2131 		ASSERT(MUTEX_HELD(QLOCK(q)));
2132 	} else
2133 		mutex_enter(QLOCK(q));
2134 
2135 	if (band == 0) {
2136 		if (q->q_lowat == 0 || (q->q_count < q->q_lowat &&
2137 		    q->q_mblkcnt < q->q_lowat)) {
2138 			backenab = q->q_flag & (QWANTW|QWANTWSYNC);
2139 		}
2140 	} else {
2141 		int i;
2142 
2143 		ASSERT((unsigned)band <= q->q_nband);
2144 		ASSERT(q->q_bandp != NULL);
2145 
2146 		qbp = q->q_bandp;
2147 		i = band;
2148 		while (--i > 0)
2149 			qbp = qbp->qb_next;
2150 
2151 		if (qbp->qb_lowat == 0 || (qbp->qb_count < qbp->qb_lowat &&
2152 		    qbp->qb_mblkcnt < qbp->qb_lowat)) {
2153 			backenab = qbp->qb_flag & QB_WANTW;
2154 		}
2155 	}
2156 
2157 	if (backenab == 0) {
2158 		if (freezer != curthread)
2159 			mutex_exit(QLOCK(q));
2160 		return;
2161 	}
2162 
2163 	/* Have to drop the lock across strwakeq and backenable */
2164 	if (backenab & QWANTWSYNC)
2165 		q->q_flag &= ~QWANTWSYNC;
2166 	if (backenab & (QWANTW|QB_WANTW)) {
2167 		if (band != 0)
2168 			qbp->qb_flag &= ~QB_WANTW;
2169 		else {
2170 			q->q_flag &= ~QWANTW;
2171 		}
2172 	}
2173 
2174 	if (freezer != curthread)
2175 		mutex_exit(QLOCK(q));
2176 
2177 	if (backenab & QWANTWSYNC)
2178 		strwakeq(q, QWANTWSYNC);
2179 	if (backenab & (QWANTW|QB_WANTW))
2180 		backenable(q, band);
2181 }
2182 
2183 /*
2184  * Remove a message from a queue.  The queue count and other
2185  * flow control parameters are adjusted and the back queue
2186  * enabled if necessary.
2187  *
2188  * rmvq can be called with the stream frozen, but other utility functions
2189  * holding QLOCK, and by streams modules without any locks/frozen.
2190  */
2191 void
2192 rmvq(queue_t *q, mblk_t *mp)
2193 {
2194 	ASSERT(mp != NULL);
2195 
2196 	rmvq_noenab(q, mp);
2197 	if (curthread != STREAM(q)->sd_freezer && MUTEX_HELD(QLOCK(q))) {
2198 		/*
2199 		 * qbackenable can handle a frozen stream but not a "random"
2200 		 * qlock being held. Drop lock across qbackenable.
2201 		 */
2202 		mutex_exit(QLOCK(q));
2203 		qbackenable(q, mp->b_band);
2204 		mutex_enter(QLOCK(q));
2205 	} else {
2206 		qbackenable(q, mp->b_band);
2207 	}
2208 }
2209 
2210 /*
2211  * Like rmvq() but without any backenabling.
2212  * This exists to handle SR_CONSOL_DATA in strrput().
2213  */
2214 void
2215 rmvq_noenab(queue_t *q, mblk_t *mp)
2216 {
2217 	int i;
2218 	qband_t *qbp = NULL;
2219 	kthread_id_t freezer;
2220 	int	bytecnt = 0, mblkcnt = 0;
2221 
2222 	freezer = STREAM(q)->sd_freezer;
2223 	if (freezer == curthread) {
2224 		ASSERT(frozenstr(q));
2225 		ASSERT(MUTEX_HELD(QLOCK(q)));
2226 	} else if (MUTEX_HELD(QLOCK(q))) {
2227 		/* Don't drop lock on exit */
2228 		freezer = curthread;
2229 	} else
2230 		mutex_enter(QLOCK(q));
2231 
2232 	ASSERT(mp->b_band <= q->q_nband);
2233 	if (mp->b_band != 0) {		/* Adjust band pointers */
2234 		ASSERT(q->q_bandp != NULL);
2235 		qbp = q->q_bandp;
2236 		i = mp->b_band;
2237 		while (--i > 0)
2238 			qbp = qbp->qb_next;
2239 		if (mp == qbp->qb_first) {
2240 			if (mp->b_next && mp->b_band == mp->b_next->b_band)
2241 				qbp->qb_first = mp->b_next;
2242 			else
2243 				qbp->qb_first = NULL;
2244 		}
2245 		if (mp == qbp->qb_last) {
2246 			if (mp->b_prev && mp->b_band == mp->b_prev->b_band)
2247 				qbp->qb_last = mp->b_prev;
2248 			else
2249 				qbp->qb_last = NULL;
2250 		}
2251 	}
2252 
2253 	/*
2254 	 * Remove the message from the list.
2255 	 */
2256 	if (mp->b_prev)
2257 		mp->b_prev->b_next = mp->b_next;
2258 	else
2259 		q->q_first = mp->b_next;
2260 	if (mp->b_next)
2261 		mp->b_next->b_prev = mp->b_prev;
2262 	else
2263 		q->q_last = mp->b_prev;
2264 	mp->b_next = NULL;
2265 	mp->b_prev = NULL;
2266 
2267 	/* Get the size of the message for q_count accounting */
2268 	bytecnt = mp_cont_len(mp, &mblkcnt);
2269 
2270 	if (mp->b_band == 0) {		/* Perform q_count accounting */
2271 		q->q_count -= bytecnt;
2272 		q->q_mblkcnt -= mblkcnt;
2273 		if (q->q_mblkcnt == 0 || ((q->q_count < q->q_hiwat) &&
2274 		    (q->q_mblkcnt < q->q_hiwat))) {
2275 			q->q_flag &= ~QFULL;
2276 		}
2277 	} else {			/* Perform qb_count accounting */
2278 		qbp->qb_count -= bytecnt;
2279 		qbp->qb_mblkcnt -= mblkcnt;
2280 		if (qbp->qb_mblkcnt == 0 || ((qbp->qb_count < qbp->qb_hiwat) &&
2281 		    (qbp->qb_mblkcnt < qbp->qb_hiwat))) {
2282 			qbp->qb_flag &= ~QB_FULL;
2283 		}
2284 	}
2285 	if (freezer != curthread)
2286 		mutex_exit(QLOCK(q));
2287 
2288 	STR_FTEVENT_MSG(mp, q, FTEV_RMVQ, NULL);
2289 }
2290 
2291 /*
2292  * Empty a queue.
2293  * If flag is set, remove all messages.  Otherwise, remove
2294  * only non-control messages.  If queue falls below its low
2295  * water mark, and QWANTW is set, enable the nearest upstream
2296  * service procedure.
2297  *
2298  * Historical note: when merging the M_FLUSH code in strrput with this
2299  * code one difference was discovered. flushq did not have a check
2300  * for q_lowat == 0 in the backenabling test.
2301  *
2302  * pcproto_flag specifies whether or not a M_PCPROTO message should be flushed
2303  * if one exists on the queue.
2304  */
2305 void
2306 flushq_common(queue_t *q, int flag, int pcproto_flag)
2307 {
2308 	mblk_t *mp, *nmp;
2309 	qband_t *qbp;
2310 	int backenab = 0;
2311 	unsigned char bpri;
2312 	unsigned char	qbf[NBAND];	/* band flushing backenable flags */
2313 
2314 	if (q->q_first == NULL)
2315 		return;
2316 
2317 	mutex_enter(QLOCK(q));
2318 	mp = q->q_first;
2319 	q->q_first = NULL;
2320 	q->q_last = NULL;
2321 	q->q_count = 0;
2322 	q->q_mblkcnt = 0;
2323 	for (qbp = q->q_bandp; qbp; qbp = qbp->qb_next) {
2324 		qbp->qb_first = NULL;
2325 		qbp->qb_last = NULL;
2326 		qbp->qb_count = 0;
2327 		qbp->qb_mblkcnt = 0;
2328 		qbp->qb_flag &= ~QB_FULL;
2329 	}
2330 	q->q_flag &= ~QFULL;
2331 	mutex_exit(QLOCK(q));
2332 	while (mp) {
2333 		nmp = mp->b_next;
2334 		mp->b_next = mp->b_prev = NULL;
2335 
2336 		STR_FTEVENT_MBLK(mp, q, FTEV_FLUSHQ, NULL);
2337 
2338 		if (pcproto_flag && (mp->b_datap->db_type == M_PCPROTO))
2339 			(void) putq(q, mp);
2340 		else if (flag || datamsg(mp->b_datap->db_type))
2341 			freemsg(mp);
2342 		else
2343 			(void) putq(q, mp);
2344 		mp = nmp;
2345 	}
2346 	bpri = 1;
2347 	mutex_enter(QLOCK(q));
2348 	for (qbp = q->q_bandp; qbp; qbp = qbp->qb_next) {
2349 		if ((qbp->qb_flag & QB_WANTW) &&
2350 		    (((qbp->qb_count < qbp->qb_lowat) &&
2351 		    (qbp->qb_mblkcnt < qbp->qb_lowat)) ||
2352 		    qbp->qb_lowat == 0)) {
2353 			qbp->qb_flag &= ~QB_WANTW;
2354 			backenab = 1;
2355 			qbf[bpri] = 1;
2356 		} else
2357 			qbf[bpri] = 0;
2358 		bpri++;
2359 	}
2360 	ASSERT(bpri == (unsigned char)(q->q_nband + 1));
2361 	if ((q->q_flag & QWANTW) &&
2362 	    (((q->q_count < q->q_lowat) &&
2363 	    (q->q_mblkcnt < q->q_lowat)) || q->q_lowat == 0)) {
2364 		q->q_flag &= ~QWANTW;
2365 		backenab = 1;
2366 		qbf[0] = 1;
2367 	} else
2368 		qbf[0] = 0;
2369 
2370 	/*
2371 	 * If any band can now be written to, and there is a writer
2372 	 * for that band, then backenable the closest service procedure.
2373 	 */
2374 	if (backenab) {
2375 		mutex_exit(QLOCK(q));
2376 		for (bpri = q->q_nband; bpri != 0; bpri--)
2377 			if (qbf[bpri])
2378 				backenable(q, bpri);
2379 		if (qbf[0])
2380 			backenable(q, 0);
2381 	} else
2382 		mutex_exit(QLOCK(q));
2383 }
2384 
2385 /*
2386  * The real flushing takes place in flushq_common. This is done so that
2387  * a flag which specifies whether or not M_PCPROTO messages should be flushed
2388  * or not. Currently the only place that uses this flag is the stream head.
2389  */
2390 void
2391 flushq(queue_t *q, int flag)
2392 {
2393 	flushq_common(q, flag, 0);
2394 }
2395 
2396 /*
2397  * Flush the queue of messages of the given priority band.
2398  * There is some duplication of code between flushq and flushband.
2399  * This is because we want to optimize the code as much as possible.
2400  * The assumption is that there will be more messages in the normal
2401  * (priority 0) band than in any other.
2402  *
2403  * Historical note: when merging the M_FLUSH code in strrput with this
2404  * code one difference was discovered. flushband had an extra check for
2405  * did not have a check for (mp->b_datap->db_type < QPCTL) in the band 0
2406  * case. That check does not match the man page for flushband and was not
2407  * in the strrput flush code hence it was removed.
2408  */
2409 void
2410 flushband(queue_t *q, unsigned char pri, int flag)
2411 {
2412 	mblk_t *mp;
2413 	mblk_t *nmp;
2414 	mblk_t *last;
2415 	qband_t *qbp;
2416 	int band;
2417 
2418 	ASSERT((flag == FLUSHDATA) || (flag == FLUSHALL));
2419 	if (pri > q->q_nband) {
2420 		return;
2421 	}
2422 	mutex_enter(QLOCK(q));
2423 	if (pri == 0) {
2424 		mp = q->q_first;
2425 		q->q_first = NULL;
2426 		q->q_last = NULL;
2427 		q->q_count = 0;
2428 		q->q_mblkcnt = 0;
2429 		for (qbp = q->q_bandp; qbp; qbp = qbp->qb_next) {
2430 			qbp->qb_first = NULL;
2431 			qbp->qb_last = NULL;
2432 			qbp->qb_count = 0;
2433 			qbp->qb_mblkcnt = 0;
2434 			qbp->qb_flag &= ~QB_FULL;
2435 		}
2436 		q->q_flag &= ~QFULL;
2437 		mutex_exit(QLOCK(q));
2438 		while (mp) {
2439 			nmp = mp->b_next;
2440 			mp->b_next = mp->b_prev = NULL;
2441 			if ((mp->b_band == 0) &&
2442 			    ((flag == FLUSHALL) ||
2443 			    datamsg(mp->b_datap->db_type)))
2444 				freemsg(mp);
2445 			else
2446 				(void) putq(q, mp);
2447 			mp = nmp;
2448 		}
2449 		mutex_enter(QLOCK(q));
2450 		if ((q->q_flag & QWANTW) &&
2451 		    (((q->q_count < q->q_lowat) &&
2452 		    (q->q_mblkcnt < q->q_lowat)) || q->q_lowat == 0)) {
2453 			q->q_flag &= ~QWANTW;
2454 			mutex_exit(QLOCK(q));
2455 
2456 			backenable(q, pri);
2457 		} else
2458 			mutex_exit(QLOCK(q));
2459 	} else {	/* pri != 0 */
2460 		boolean_t flushed = B_FALSE;
2461 		band = pri;
2462 
2463 		ASSERT(MUTEX_HELD(QLOCK(q)));
2464 		qbp = q->q_bandp;
2465 		while (--band > 0)
2466 			qbp = qbp->qb_next;
2467 		mp = qbp->qb_first;
2468 		if (mp == NULL) {
2469 			mutex_exit(QLOCK(q));
2470 			return;
2471 		}
2472 		last = qbp->qb_last->b_next;
2473 		/*
2474 		 * rmvq_noenab() and freemsg() are called for each mblk that
2475 		 * meets the criteria.  The loop is executed until the last
2476 		 * mblk has been processed.
2477 		 */
2478 		while (mp != last) {
2479 			ASSERT(mp->b_band == pri);
2480 			nmp = mp->b_next;
2481 			if (flag == FLUSHALL || datamsg(mp->b_datap->db_type)) {
2482 				rmvq_noenab(q, mp);
2483 				freemsg(mp);
2484 				flushed = B_TRUE;
2485 			}
2486 			mp = nmp;
2487 		}
2488 		mutex_exit(QLOCK(q));
2489 
2490 		/*
2491 		 * If any mblk(s) has been freed, we know that qbackenable()
2492 		 * will need to be called.
2493 		 */
2494 		if (flushed)
2495 			qbackenable(q, pri);
2496 	}
2497 }
2498 
2499 /*
2500  * Return 1 if the queue is not full.  If the queue is full, return
2501  * 0 (may not put message) and set QWANTW flag (caller wants to write
2502  * to the queue).
2503  */
2504 int
2505 canput(queue_t *q)
2506 {
2507 	TRACE_1(TR_FAC_STREAMS_FR, TR_CANPUT_IN, "canput:%p", q);
2508 
2509 	/* this is for loopback transports, they should not do a canput */
2510 	ASSERT(STRMATED(q->q_stream) || STREAM(q) == STREAM(q->q_nfsrv));
2511 
2512 	/* Find next forward module that has a service procedure */
2513 	q = q->q_nfsrv;
2514 
2515 	if (!(q->q_flag & QFULL)) {
2516 		TRACE_2(TR_FAC_STREAMS_FR, TR_CANPUT_OUT, "canput:%p %d", q, 1);
2517 		return (1);
2518 	}
2519 	mutex_enter(QLOCK(q));
2520 	if (q->q_flag & QFULL) {
2521 		q->q_flag |= QWANTW;
2522 		mutex_exit(QLOCK(q));
2523 		TRACE_2(TR_FAC_STREAMS_FR, TR_CANPUT_OUT, "canput:%p %d", q, 0);
2524 		return (0);
2525 	}
2526 	mutex_exit(QLOCK(q));
2527 	TRACE_2(TR_FAC_STREAMS_FR, TR_CANPUT_OUT, "canput:%p %d", q, 1);
2528 	return (1);
2529 }
2530 
2531 /*
2532  * This is the new canput for use with priority bands.  Return 1 if the
2533  * band is not full.  If the band is full, return 0 (may not put message)
2534  * and set QWANTW(QB_WANTW) flag for zero(non-zero) band (caller wants to
2535  * write to the queue).
2536  */
2537 int
2538 bcanput(queue_t *q, unsigned char pri)
2539 {
2540 	qband_t *qbp;
2541 
2542 	TRACE_2(TR_FAC_STREAMS_FR, TR_BCANPUT_IN, "bcanput:%p %p", q, pri);
2543 	if (!q)
2544 		return (0);
2545 
2546 	/* Find next forward module that has a service procedure */
2547 	q = q->q_nfsrv;
2548 
2549 	mutex_enter(QLOCK(q));
2550 	if (pri == 0) {
2551 		if (q->q_flag & QFULL) {
2552 			q->q_flag |= QWANTW;
2553 			mutex_exit(QLOCK(q));
2554 			TRACE_3(TR_FAC_STREAMS_FR, TR_BCANPUT_OUT,
2555 			    "bcanput:%p %X %d", q, pri, 0);
2556 			return (0);
2557 		}
2558 	} else {	/* pri != 0 */
2559 		if (pri > q->q_nband) {
2560 			/*
2561 			 * No band exists yet, so return success.
2562 			 */
2563 			mutex_exit(QLOCK(q));
2564 			TRACE_3(TR_FAC_STREAMS_FR, TR_BCANPUT_OUT,
2565 			    "bcanput:%p %X %d", q, pri, 1);
2566 			return (1);
2567 		}
2568 		qbp = q->q_bandp;
2569 		while (--pri)
2570 			qbp = qbp->qb_next;
2571 		if (qbp->qb_flag & QB_FULL) {
2572 			qbp->qb_flag |= QB_WANTW;
2573 			mutex_exit(QLOCK(q));
2574 			TRACE_3(TR_FAC_STREAMS_FR, TR_BCANPUT_OUT,
2575 			    "bcanput:%p %X %d", q, pri, 0);
2576 			return (0);
2577 		}
2578 	}
2579 	mutex_exit(QLOCK(q));
2580 	TRACE_3(TR_FAC_STREAMS_FR, TR_BCANPUT_OUT,
2581 	    "bcanput:%p %X %d", q, pri, 1);
2582 	return (1);
2583 }
2584 
2585 /*
2586  * Put a message on a queue.
2587  *
2588  * Messages are enqueued on a priority basis.  The priority classes
2589  * are HIGH PRIORITY (type >= QPCTL), PRIORITY (type < QPCTL && band > 0),
2590  * and B_NORMAL (type < QPCTL && band == 0).
2591  *
2592  * Add appropriate weighted data block sizes to queue count.
2593  * If queue hits high water mark then set QFULL flag.
2594  *
2595  * If QNOENAB is not set (putq is allowed to enable the queue),
2596  * enable the queue only if the message is PRIORITY,
2597  * or the QWANTR flag is set (indicating that the service procedure
2598  * is ready to read the queue.  This implies that a service
2599  * procedure must NEVER put a high priority message back on its own
2600  * queue, as this would result in an infinite loop (!).
2601  */
2602 int
2603 putq(queue_t *q, mblk_t *bp)
2604 {
2605 	mblk_t *tmp;
2606 	qband_t *qbp = NULL;
2607 	int mcls = (int)queclass(bp);
2608 	kthread_id_t freezer;
2609 	int	bytecnt = 0, mblkcnt = 0;
2610 
2611 	freezer = STREAM(q)->sd_freezer;
2612 	if (freezer == curthread) {
2613 		ASSERT(frozenstr(q));
2614 		ASSERT(MUTEX_HELD(QLOCK(q)));
2615 	} else
2616 		mutex_enter(QLOCK(q));
2617 
2618 	/*
2619 	 * Make sanity checks and if qband structure is not yet
2620 	 * allocated, do so.
2621 	 */
2622 	if (mcls == QPCTL) {
2623 		if (bp->b_band != 0)
2624 			bp->b_band = 0;		/* force to be correct */
2625 	} else if (bp->b_band != 0) {
2626 		int i;
2627 		qband_t **qbpp;
2628 
2629 		if (bp->b_band > q->q_nband) {
2630 
2631 			/*
2632 			 * The qband structure for this priority band is
2633 			 * not on the queue yet, so we have to allocate
2634 			 * one on the fly.  It would be wasteful to
2635 			 * associate the qband structures with every
2636 			 * queue when the queues are allocated.  This is
2637 			 * because most queues will only need the normal
2638 			 * band of flow which can be described entirely
2639 			 * by the queue itself.
2640 			 */
2641 			qbpp = &q->q_bandp;
2642 			while (*qbpp)
2643 				qbpp = &(*qbpp)->qb_next;
2644 			while (bp->b_band > q->q_nband) {
2645 				if ((*qbpp = allocband()) == NULL) {
2646 					if (freezer != curthread)
2647 						mutex_exit(QLOCK(q));
2648 					return (0);
2649 				}
2650 				(*qbpp)->qb_hiwat = q->q_hiwat;
2651 				(*qbpp)->qb_lowat = q->q_lowat;
2652 				q->q_nband++;
2653 				qbpp = &(*qbpp)->qb_next;
2654 			}
2655 		}
2656 		ASSERT(MUTEX_HELD(QLOCK(q)));
2657 		qbp = q->q_bandp;
2658 		i = bp->b_band;
2659 		while (--i)
2660 			qbp = qbp->qb_next;
2661 	}
2662 
2663 	/*
2664 	 * If queue is empty, add the message and initialize the pointers.
2665 	 * Otherwise, adjust message pointers and queue pointers based on
2666 	 * the type of the message and where it belongs on the queue.  Some
2667 	 * code is duplicated to minimize the number of conditionals and
2668 	 * hopefully minimize the amount of time this routine takes.
2669 	 */
2670 	if (!q->q_first) {
2671 		bp->b_next = NULL;
2672 		bp->b_prev = NULL;
2673 		q->q_first = bp;
2674 		q->q_last = bp;
2675 		if (qbp) {
2676 			qbp->qb_first = bp;
2677 			qbp->qb_last = bp;
2678 		}
2679 	} else if (!qbp) {	/* bp->b_band == 0 */
2680 
2681 		/*
2682 		 * If queue class of message is less than or equal to
2683 		 * that of the last one on the queue, tack on to the end.
2684 		 */
2685 		tmp = q->q_last;
2686 		if (mcls <= (int)queclass(tmp)) {
2687 			bp->b_next = NULL;
2688 			bp->b_prev = tmp;
2689 			tmp->b_next = bp;
2690 			q->q_last = bp;
2691 		} else {
2692 			tmp = q->q_first;
2693 			while ((int)queclass(tmp) >= mcls)
2694 				tmp = tmp->b_next;
2695 
2696 			/*
2697 			 * Insert bp before tmp.
2698 			 */
2699 			bp->b_next = tmp;
2700 			bp->b_prev = tmp->b_prev;
2701 			if (tmp->b_prev)
2702 				tmp->b_prev->b_next = bp;
2703 			else
2704 				q->q_first = bp;
2705 			tmp->b_prev = bp;
2706 		}
2707 	} else {		/* bp->b_band != 0 */
2708 		if (qbp->qb_first) {
2709 			tmp = qbp->qb_last;
2710 
2711 			/*
2712 			 * Insert bp after the last message in this band.
2713 			 */
2714 			bp->b_next = tmp->b_next;
2715 			if (tmp->b_next)
2716 				tmp->b_next->b_prev = bp;
2717 			else
2718 				q->q_last = bp;
2719 			bp->b_prev = tmp;
2720 			tmp->b_next = bp;
2721 		} else {
2722 			tmp = q->q_last;
2723 			if ((mcls < (int)queclass(tmp)) ||
2724 			    (bp->b_band <= tmp->b_band)) {
2725 
2726 				/*
2727 				 * Tack bp on end of queue.
2728 				 */
2729 				bp->b_next = NULL;
2730 				bp->b_prev = tmp;
2731 				tmp->b_next = bp;
2732 				q->q_last = bp;
2733 			} else {
2734 				tmp = q->q_first;
2735 				while (tmp->b_datap->db_type >= QPCTL)
2736 					tmp = tmp->b_next;
2737 				while (tmp->b_band >= bp->b_band)
2738 					tmp = tmp->b_next;
2739 
2740 				/*
2741 				 * Insert bp before tmp.
2742 				 */
2743 				bp->b_next = tmp;
2744 				bp->b_prev = tmp->b_prev;
2745 				if (tmp->b_prev)
2746 					tmp->b_prev->b_next = bp;
2747 				else
2748 					q->q_first = bp;
2749 				tmp->b_prev = bp;
2750 			}
2751 			qbp->qb_first = bp;
2752 		}
2753 		qbp->qb_last = bp;
2754 	}
2755 
2756 	/* Get message byte count for q_count accounting */
2757 	bytecnt = mp_cont_len(bp, &mblkcnt);
2758 
2759 	if (qbp) {
2760 		qbp->qb_count += bytecnt;
2761 		qbp->qb_mblkcnt += mblkcnt;
2762 		if ((qbp->qb_count >= qbp->qb_hiwat) ||
2763 		    (qbp->qb_mblkcnt >= qbp->qb_hiwat)) {
2764 			qbp->qb_flag |= QB_FULL;
2765 		}
2766 	} else {
2767 		q->q_count += bytecnt;
2768 		q->q_mblkcnt += mblkcnt;
2769 		if ((q->q_count >= q->q_hiwat) ||
2770 		    (q->q_mblkcnt >= q->q_hiwat)) {
2771 			q->q_flag |= QFULL;
2772 		}
2773 	}
2774 
2775 	STR_FTEVENT_MSG(bp, q, FTEV_PUTQ, NULL);
2776 
2777 	if ((mcls > QNORM) ||
2778 	    (canenable(q) && (q->q_flag & QWANTR || bp->b_band)))
2779 		qenable_locked(q);
2780 	ASSERT(MUTEX_HELD(QLOCK(q)));
2781 	if (freezer != curthread)
2782 		mutex_exit(QLOCK(q));
2783 
2784 	return (1);
2785 }
2786 
2787 /*
2788  * Put stuff back at beginning of Q according to priority order.
2789  * See comment on putq above for details.
2790  */
2791 int
2792 putbq(queue_t *q, mblk_t *bp)
2793 {
2794 	mblk_t *tmp;
2795 	qband_t *qbp = NULL;
2796 	int mcls = (int)queclass(bp);
2797 	kthread_id_t freezer;
2798 	int	bytecnt = 0, mblkcnt = 0;
2799 
2800 	ASSERT(q && bp);
2801 	ASSERT(bp->b_next == NULL);
2802 	freezer = STREAM(q)->sd_freezer;
2803 	if (freezer == curthread) {
2804 		ASSERT(frozenstr(q));
2805 		ASSERT(MUTEX_HELD(QLOCK(q)));
2806 	} else
2807 		mutex_enter(QLOCK(q));
2808 
2809 	/*
2810 	 * Make sanity checks and if qband structure is not yet
2811 	 * allocated, do so.
2812 	 */
2813 	if (mcls == QPCTL) {
2814 		if (bp->b_band != 0)
2815 			bp->b_band = 0;		/* force to be correct */
2816 	} else if (bp->b_band != 0) {
2817 		int i;
2818 		qband_t **qbpp;
2819 
2820 		if (bp->b_band > q->q_nband) {
2821 			qbpp = &q->q_bandp;
2822 			while (*qbpp)
2823 				qbpp = &(*qbpp)->qb_next;
2824 			while (bp->b_band > q->q_nband) {
2825 				if ((*qbpp = allocband()) == NULL) {
2826 					if (freezer != curthread)
2827 						mutex_exit(QLOCK(q));
2828 					return (0);
2829 				}
2830 				(*qbpp)->qb_hiwat = q->q_hiwat;
2831 				(*qbpp)->qb_lowat = q->q_lowat;
2832 				q->q_nband++;
2833 				qbpp = &(*qbpp)->qb_next;
2834 			}
2835 		}
2836 		qbp = q->q_bandp;
2837 		i = bp->b_band;
2838 		while (--i)
2839 			qbp = qbp->qb_next;
2840 	}
2841 
2842 	/*
2843 	 * If queue is empty or if message is high priority,
2844 	 * place on the front of the queue.
2845 	 */
2846 	tmp = q->q_first;
2847 	if ((!tmp) || (mcls == QPCTL)) {
2848 		bp->b_next = tmp;
2849 		if (tmp)
2850 			tmp->b_prev = bp;
2851 		else
2852 			q->q_last = bp;
2853 		q->q_first = bp;
2854 		bp->b_prev = NULL;
2855 		if (qbp) {
2856 			qbp->qb_first = bp;
2857 			qbp->qb_last = bp;
2858 		}
2859 	} else if (qbp) {	/* bp->b_band != 0 */
2860 		tmp = qbp->qb_first;
2861 		if (tmp) {
2862 
2863 			/*
2864 			 * Insert bp before the first message in this band.
2865 			 */
2866 			bp->b_next = tmp;
2867 			bp->b_prev = tmp->b_prev;
2868 			if (tmp->b_prev)
2869 				tmp->b_prev->b_next = bp;
2870 			else
2871 				q->q_first = bp;
2872 			tmp->b_prev = bp;
2873 		} else {
2874 			tmp = q->q_last;
2875 			if ((mcls < (int)queclass(tmp)) ||
2876 			    (bp->b_band < tmp->b_band)) {
2877 
2878 				/*
2879 				 * Tack bp on end of queue.
2880 				 */
2881 				bp->b_next = NULL;
2882 				bp->b_prev = tmp;
2883 				tmp->b_next = bp;
2884 				q->q_last = bp;
2885 			} else {
2886 				tmp = q->q_first;
2887 				while (tmp->b_datap->db_type >= QPCTL)
2888 					tmp = tmp->b_next;
2889 				while (tmp->b_band > bp->b_band)
2890 					tmp = tmp->b_next;
2891 
2892 				/*
2893 				 * Insert bp before tmp.
2894 				 */
2895 				bp->b_next = tmp;
2896 				bp->b_prev = tmp->b_prev;
2897 				if (tmp->b_prev)
2898 					tmp->b_prev->b_next = bp;
2899 				else
2900 					q->q_first = bp;
2901 				tmp->b_prev = bp;
2902 			}
2903 			qbp->qb_last = bp;
2904 		}
2905 		qbp->qb_first = bp;
2906 	} else {		/* bp->b_band == 0 && !QPCTL */
2907 
2908 		/*
2909 		 * If the queue class or band is less than that of the last
2910 		 * message on the queue, tack bp on the end of the queue.
2911 		 */
2912 		tmp = q->q_last;
2913 		if ((mcls < (int)queclass(tmp)) || (bp->b_band < tmp->b_band)) {
2914 			bp->b_next = NULL;
2915 			bp->b_prev = tmp;
2916 			tmp->b_next = bp;
2917 			q->q_last = bp;
2918 		} else {
2919 			tmp = q->q_first;
2920 			while (tmp->b_datap->db_type >= QPCTL)
2921 				tmp = tmp->b_next;
2922 			while (tmp->b_band > bp->b_band)
2923 				tmp = tmp->b_next;
2924 
2925 			/*
2926 			 * Insert bp before tmp.
2927 			 */
2928 			bp->b_next = tmp;
2929 			bp->b_prev = tmp->b_prev;
2930 			if (tmp->b_prev)
2931 				tmp->b_prev->b_next = bp;
2932 			else
2933 				q->q_first = bp;
2934 			tmp->b_prev = bp;
2935 		}
2936 	}
2937 
2938 	/* Get message byte count for q_count accounting */
2939 	bytecnt = mp_cont_len(bp, &mblkcnt);
2940 
2941 	if (qbp) {
2942 		qbp->qb_count += bytecnt;
2943 		qbp->qb_mblkcnt += mblkcnt;
2944 		if ((qbp->qb_count >= qbp->qb_hiwat) ||
2945 		    (qbp->qb_mblkcnt >= qbp->qb_hiwat)) {
2946 			qbp->qb_flag |= QB_FULL;
2947 		}
2948 	} else {
2949 		q->q_count += bytecnt;
2950 		q->q_mblkcnt += mblkcnt;
2951 		if ((q->q_count >= q->q_hiwat) ||
2952 		    (q->q_mblkcnt >= q->q_hiwat)) {
2953 			q->q_flag |= QFULL;
2954 		}
2955 	}
2956 
2957 	STR_FTEVENT_MSG(bp, q, FTEV_PUTBQ, NULL);
2958 
2959 	if ((mcls > QNORM) || (canenable(q) && (q->q_flag & QWANTR)))
2960 		qenable_locked(q);
2961 	ASSERT(MUTEX_HELD(QLOCK(q)));
2962 	if (freezer != curthread)
2963 		mutex_exit(QLOCK(q));
2964 
2965 	return (1);
2966 }
2967 
2968 /*
2969  * Insert a message before an existing message on the queue.  If the
2970  * existing message is NULL, the new messages is placed on the end of
2971  * the queue.  The queue class of the new message is ignored.  However,
2972  * the priority band of the new message must adhere to the following
2973  * ordering:
2974  *
2975  *	emp->b_prev->b_band >= mp->b_band >= emp->b_band.
2976  *
2977  * All flow control parameters are updated.
2978  *
2979  * insq can be called with the stream frozen, but other utility functions
2980  * holding QLOCK, and by streams modules without any locks/frozen.
2981  */
2982 int
2983 insq(queue_t *q, mblk_t *emp, mblk_t *mp)
2984 {
2985 	mblk_t *tmp;
2986 	qband_t *qbp = NULL;
2987 	int mcls = (int)queclass(mp);
2988 	kthread_id_t freezer;
2989 	int	bytecnt = 0, mblkcnt = 0;
2990 
2991 	freezer = STREAM(q)->sd_freezer;
2992 	if (freezer == curthread) {
2993 		ASSERT(frozenstr(q));
2994 		ASSERT(MUTEX_HELD(QLOCK(q)));
2995 	} else if (MUTEX_HELD(QLOCK(q))) {
2996 		/* Don't drop lock on exit */
2997 		freezer = curthread;
2998 	} else
2999 		mutex_enter(QLOCK(q));
3000 
3001 	if (mcls == QPCTL) {
3002 		if (mp->b_band != 0)
3003 			mp->b_band = 0;		/* force to be correct */
3004 		if (emp && emp->b_prev &&
3005 		    (emp->b_prev->b_datap->db_type < QPCTL))
3006 			goto badord;
3007 	}
3008 	if (emp) {
3009 		if (((mcls == QNORM) && (mp->b_band < emp->b_band)) ||
3010 		    (emp->b_prev && (emp->b_prev->b_datap->db_type < QPCTL) &&
3011 		    (emp->b_prev->b_band < mp->b_band))) {
3012 			goto badord;
3013 		}
3014 	} else {
3015 		tmp = q->q_last;
3016 		if (tmp && (mcls == QNORM) && (mp->b_band > tmp->b_band)) {
3017 badord:
3018 			cmn_err(CE_WARN,
3019 			    "insq: attempt to insert message out of order "
3020 			    "on q %p", (void *)q);
3021 			if (freezer != curthread)
3022 				mutex_exit(QLOCK(q));
3023 			return (0);
3024 		}
3025 	}
3026 
3027 	if (mp->b_band != 0) {
3028 		int i;
3029 		qband_t **qbpp;
3030 
3031 		if (mp->b_band > q->q_nband) {
3032 			qbpp = &q->q_bandp;
3033 			while (*qbpp)
3034 				qbpp = &(*qbpp)->qb_next;
3035 			while (mp->b_band > q->q_nband) {
3036 				if ((*qbpp = allocband()) == NULL) {
3037 					if (freezer != curthread)
3038 						mutex_exit(QLOCK(q));
3039 					return (0);
3040 				}
3041 				(*qbpp)->qb_hiwat = q->q_hiwat;
3042 				(*qbpp)->qb_lowat = q->q_lowat;
3043 				q->q_nband++;
3044 				qbpp = &(*qbpp)->qb_next;
3045 			}
3046 		}
3047 		qbp = q->q_bandp;
3048 		i = mp->b_band;
3049 		while (--i)
3050 			qbp = qbp->qb_next;
3051 	}
3052 
3053 	if ((mp->b_next = emp) != NULL) {
3054 		if ((mp->b_prev = emp->b_prev) != NULL)
3055 			emp->b_prev->b_next = mp;
3056 		else
3057 			q->q_first = mp;
3058 		emp->b_prev = mp;
3059 	} else {
3060 		if ((mp->b_prev = q->q_last) != NULL)
3061 			q->q_last->b_next = mp;
3062 		else
3063 			q->q_first = mp;
3064 		q->q_last = mp;
3065 	}
3066 
3067 	/* Get mblk and byte count for q_count accounting */
3068 	bytecnt = mp_cont_len(mp, &mblkcnt);
3069 
3070 	if (qbp) {	/* adjust qband pointers and count */
3071 		if (!qbp->qb_first) {
3072 			qbp->qb_first = mp;
3073 			qbp->qb_last = mp;
3074 		} else {
3075 			if (mp->b_prev == NULL || (mp->b_prev != NULL &&
3076 			    (mp->b_prev->b_band != mp->b_band)))
3077 				qbp->qb_first = mp;
3078 			else if (mp->b_next == NULL || (mp->b_next != NULL &&
3079 			    (mp->b_next->b_band != mp->b_band)))
3080 				qbp->qb_last = mp;
3081 		}
3082 		qbp->qb_count += bytecnt;
3083 		qbp->qb_mblkcnt += mblkcnt;
3084 		if ((qbp->qb_count >= qbp->qb_hiwat) ||
3085 		    (qbp->qb_mblkcnt >= qbp->qb_hiwat)) {
3086 			qbp->qb_flag |= QB_FULL;
3087 		}
3088 	} else {
3089 		q->q_count += bytecnt;
3090 		q->q_mblkcnt += mblkcnt;
3091 		if ((q->q_count >= q->q_hiwat) ||
3092 		    (q->q_mblkcnt >= q->q_hiwat)) {
3093 			q->q_flag |= QFULL;
3094 		}
3095 	}
3096 
3097 	STR_FTEVENT_MSG(mp, q, FTEV_INSQ, NULL);
3098 
3099 	if (canenable(q) && (q->q_flag & QWANTR))
3100 		qenable_locked(q);
3101 
3102 	ASSERT(MUTEX_HELD(QLOCK(q)));
3103 	if (freezer != curthread)
3104 		mutex_exit(QLOCK(q));
3105 
3106 	return (1);
3107 }
3108 
3109 /*
3110  * Create and put a control message on queue.
3111  */
3112 int
3113 putctl(queue_t *q, int type)
3114 {
3115 	mblk_t *bp;
3116 
3117 	if ((datamsg(type) && (type != M_DELAY)) ||
3118 	    (bp = allocb_tryhard(0)) == NULL)
3119 		return (0);
3120 	bp->b_datap->db_type = (unsigned char) type;
3121 
3122 	put(q, bp);
3123 
3124 	return (1);
3125 }
3126 
3127 /*
3128  * Control message with a single-byte parameter
3129  */
3130 int
3131 putctl1(queue_t *q, int type, int param)
3132 {
3133 	mblk_t *bp;
3134 
3135 	if ((datamsg(type) && (type != M_DELAY)) ||
3136 	    (bp = allocb_tryhard(1)) == NULL)
3137 		return (0);
3138 	bp->b_datap->db_type = (unsigned char)type;
3139 	*bp->b_wptr++ = (unsigned char)param;
3140 
3141 	put(q, bp);
3142 
3143 	return (1);
3144 }
3145 
3146 int
3147 putnextctl1(queue_t *q, int type, int param)
3148 {
3149 	mblk_t *bp;
3150 
3151 	if ((datamsg(type) && (type != M_DELAY)) ||
3152 	    ((bp = allocb_tryhard(1)) == NULL))
3153 		return (0);
3154 
3155 	bp->b_datap->db_type = (unsigned char)type;
3156 	*bp->b_wptr++ = (unsigned char)param;
3157 
3158 	putnext(q, bp);
3159 
3160 	return (1);
3161 }
3162 
3163 int
3164 putnextctl(queue_t *q, int type)
3165 {
3166 	mblk_t *bp;
3167 
3168 	if ((datamsg(type) && (type != M_DELAY)) ||
3169 	    ((bp = allocb_tryhard(0)) == NULL))
3170 		return (0);
3171 	bp->b_datap->db_type = (unsigned char)type;
3172 
3173 	putnext(q, bp);
3174 
3175 	return (1);
3176 }
3177 
3178 /*
3179  * Return the queue upstream from this one
3180  */
3181 queue_t *
3182 backq(queue_t *q)
3183 {
3184 	q = _OTHERQ(q);
3185 	if (q->q_next) {
3186 		q = q->q_next;
3187 		return (_OTHERQ(q));
3188 	}
3189 	return (NULL);
3190 }
3191 
3192 /*
3193  * Send a block back up the queue in reverse from this
3194  * one (e.g. to respond to ioctls)
3195  */
3196 void
3197 qreply(queue_t *q, mblk_t *bp)
3198 {
3199 	ASSERT(q && bp);
3200 
3201 	putnext(_OTHERQ(q), bp);
3202 }
3203 
3204 /*
3205  * Streams Queue Scheduling
3206  *
3207  * Queues are enabled through qenable() when they have messages to
3208  * process.  They are serviced by queuerun(), which runs each enabled
3209  * queue's service procedure.  The call to queuerun() is processor
3210  * dependent - the general principle is that it be run whenever a queue
3211  * is enabled but before returning to user level.  For system calls,
3212  * the function runqueues() is called if their action causes a queue
3213  * to be enabled.  For device interrupts, queuerun() should be
3214  * called before returning from the last level of interrupt.  Beyond
3215  * this, no timing assumptions should be made about queue scheduling.
3216  */
3217 
3218 /*
3219  * Enable a queue: put it on list of those whose service procedures are
3220  * ready to run and set up the scheduling mechanism.
3221  * The broadcast is done outside the mutex -> to avoid the woken thread
3222  * from contending with the mutex. This is OK 'cos the queue has been
3223  * enqueued on the runlist and flagged safely at this point.
3224  */
3225 void
3226 qenable(queue_t *q)
3227 {
3228 	mutex_enter(QLOCK(q));
3229 	qenable_locked(q);
3230 	mutex_exit(QLOCK(q));
3231 }
3232 /*
3233  * Return number of messages on queue
3234  */
3235 int
3236 qsize(queue_t *qp)
3237 {
3238 	int count = 0;
3239 	mblk_t *mp;
3240 
3241 	mutex_enter(QLOCK(qp));
3242 	for (mp = qp->q_first; mp; mp = mp->b_next)
3243 		count++;
3244 	mutex_exit(QLOCK(qp));
3245 	return (count);
3246 }
3247 
3248 /*
3249  * noenable - set queue so that putq() will not enable it.
3250  * enableok - set queue so that putq() can enable it.
3251  */
3252 void
3253 noenable(queue_t *q)
3254 {
3255 	mutex_enter(QLOCK(q));
3256 	q->q_flag |= QNOENB;
3257 	mutex_exit(QLOCK(q));
3258 }
3259 
3260 void
3261 enableok(queue_t *q)
3262 {
3263 	mutex_enter(QLOCK(q));
3264 	q->q_flag &= ~QNOENB;
3265 	mutex_exit(QLOCK(q));
3266 }
3267 
3268 /*
3269  * Set queue fields.
3270  */
3271 int
3272 strqset(queue_t *q, qfields_t what, unsigned char pri, intptr_t val)
3273 {
3274 	qband_t *qbp = NULL;
3275 	queue_t	*wrq;
3276 	int error = 0;
3277 	kthread_id_t freezer;
3278 
3279 	freezer = STREAM(q)->sd_freezer;
3280 	if (freezer == curthread) {
3281 		ASSERT(frozenstr(q));
3282 		ASSERT(MUTEX_HELD(QLOCK(q)));
3283 	} else
3284 		mutex_enter(QLOCK(q));
3285 
3286 	if (what >= QBAD) {
3287 		error = EINVAL;
3288 		goto done;
3289 	}
3290 	if (pri != 0) {
3291 		int i;
3292 		qband_t **qbpp;
3293 
3294 		if (pri > q->q_nband) {
3295 			qbpp = &q->q_bandp;
3296 			while (*qbpp)
3297 				qbpp = &(*qbpp)->qb_next;
3298 			while (pri > q->q_nband) {
3299 				if ((*qbpp = allocband()) == NULL) {
3300 					error = EAGAIN;
3301 					goto done;
3302 				}
3303 				(*qbpp)->qb_hiwat = q->q_hiwat;
3304 				(*qbpp)->qb_lowat = q->q_lowat;
3305 				q->q_nband++;
3306 				qbpp = &(*qbpp)->qb_next;
3307 			}
3308 		}
3309 		qbp = q->q_bandp;
3310 		i = pri;
3311 		while (--i)
3312 			qbp = qbp->qb_next;
3313 	}
3314 	switch (what) {
3315 
3316 	case QHIWAT:
3317 		if (qbp)
3318 			qbp->qb_hiwat = (size_t)val;
3319 		else
3320 			q->q_hiwat = (size_t)val;
3321 		break;
3322 
3323 	case QLOWAT:
3324 		if (qbp)
3325 			qbp->qb_lowat = (size_t)val;
3326 		else
3327 			q->q_lowat = (size_t)val;
3328 		break;
3329 
3330 	case QMAXPSZ:
3331 		if (qbp)
3332 			error = EINVAL;
3333 		else
3334 			q->q_maxpsz = (ssize_t)val;
3335 
3336 		/*
3337 		 * Performance concern, strwrite looks at the module below
3338 		 * the stream head for the maxpsz each time it does a write
3339 		 * we now cache it at the stream head.  Check to see if this
3340 		 * queue is sitting directly below the stream head.
3341 		 */
3342 		wrq = STREAM(q)->sd_wrq;
3343 		if (q != wrq->q_next)
3344 			break;
3345 
3346 		/*
3347 		 * If the stream is not frozen drop the current QLOCK and
3348 		 * acquire the sd_wrq QLOCK which protects sd_qn_*
3349 		 */
3350 		if (freezer != curthread) {
3351 			mutex_exit(QLOCK(q));
3352 			mutex_enter(QLOCK(wrq));
3353 		}
3354 		ASSERT(MUTEX_HELD(QLOCK(wrq)));
3355 
3356 		if (strmsgsz != 0) {
3357 			if (val == INFPSZ)
3358 				val = strmsgsz;
3359 			else  {
3360 				if (STREAM(q)->sd_vnode->v_type == VFIFO)
3361 					val = MIN(PIPE_BUF, val);
3362 				else
3363 					val = MIN(strmsgsz, val);
3364 			}
3365 		}
3366 		STREAM(q)->sd_qn_maxpsz = val;
3367 		if (freezer != curthread) {
3368 			mutex_exit(QLOCK(wrq));
3369 			mutex_enter(QLOCK(q));
3370 		}
3371 		break;
3372 
3373 	case QMINPSZ:
3374 		if (qbp)
3375 			error = EINVAL;
3376 		else
3377 			q->q_minpsz = (ssize_t)val;
3378 
3379 		/*
3380 		 * Performance concern, strwrite looks at the module below
3381 		 * the stream head for the maxpsz each time it does a write
3382 		 * we now cache it at the stream head.  Check to see if this
3383 		 * queue is sitting directly below the stream head.
3384 		 */
3385 		wrq = STREAM(q)->sd_wrq;
3386 		if (q != wrq->q_next)
3387 			break;
3388 
3389 		/*
3390 		 * If the stream is not frozen drop the current QLOCK and
3391 		 * acquire the sd_wrq QLOCK which protects sd_qn_*
3392 		 */
3393 		if (freezer != curthread) {
3394 			mutex_exit(QLOCK(q));
3395 			mutex_enter(QLOCK(wrq));
3396 		}
3397 		STREAM(q)->sd_qn_minpsz = (ssize_t)val;
3398 
3399 		if (freezer != curthread) {
3400 			mutex_exit(QLOCK(wrq));
3401 			mutex_enter(QLOCK(q));
3402 		}
3403 		break;
3404 
3405 	case QSTRUIOT:
3406 		if (qbp)
3407 			error = EINVAL;
3408 		else
3409 			q->q_struiot = (ushort_t)val;
3410 		break;
3411 
3412 	case QCOUNT:
3413 	case QFIRST:
3414 	case QLAST:
3415 	case QFLAG:
3416 		error = EPERM;
3417 		break;
3418 
3419 	default:
3420 		error = EINVAL;
3421 		break;
3422 	}
3423 done:
3424 	if (freezer != curthread)
3425 		mutex_exit(QLOCK(q));
3426 	return (error);
3427 }
3428 
3429 /*
3430  * Get queue fields.
3431  */
3432 int
3433 strqget(queue_t *q, qfields_t what, unsigned char pri, void *valp)
3434 {
3435 	qband_t 	*qbp = NULL;
3436 	int 		error = 0;
3437 	kthread_id_t 	freezer;
3438 
3439 	freezer = STREAM(q)->sd_freezer;
3440 	if (freezer == curthread) {
3441 		ASSERT(frozenstr(q));
3442 		ASSERT(MUTEX_HELD(QLOCK(q)));
3443 	} else
3444 		mutex_enter(QLOCK(q));
3445 	if (what >= QBAD) {
3446 		error = EINVAL;
3447 		goto done;
3448 	}
3449 	if (pri != 0) {
3450 		int i;
3451 		qband_t **qbpp;
3452 
3453 		if (pri > q->q_nband) {
3454 			qbpp = &q->q_bandp;
3455 			while (*qbpp)
3456 				qbpp = &(*qbpp)->qb_next;
3457 			while (pri > q->q_nband) {
3458 				if ((*qbpp = allocband()) == NULL) {
3459 					error = EAGAIN;
3460 					goto done;
3461 				}
3462 				(*qbpp)->qb_hiwat = q->q_hiwat;
3463 				(*qbpp)->qb_lowat = q->q_lowat;
3464 				q->q_nband++;
3465 				qbpp = &(*qbpp)->qb_next;
3466 			}
3467 		}
3468 		qbp = q->q_bandp;
3469 		i = pri;
3470 		while (--i)
3471 			qbp = qbp->qb_next;
3472 	}
3473 	switch (what) {
3474 	case QHIWAT:
3475 		if (qbp)
3476 			*(size_t *)valp = qbp->qb_hiwat;
3477 		else
3478 			*(size_t *)valp = q->q_hiwat;
3479 		break;
3480 
3481 	case QLOWAT:
3482 		if (qbp)
3483 			*(size_t *)valp = qbp->qb_lowat;
3484 		else
3485 			*(size_t *)valp = q->q_lowat;
3486 		break;
3487 
3488 	case QMAXPSZ:
3489 		if (qbp)
3490 			error = EINVAL;
3491 		else
3492 			*(ssize_t *)valp = q->q_maxpsz;
3493 		break;
3494 
3495 	case QMINPSZ:
3496 		if (qbp)
3497 			error = EINVAL;
3498 		else
3499 			*(ssize_t *)valp = q->q_minpsz;
3500 		break;
3501 
3502 	case QCOUNT:
3503 		if (qbp)
3504 			*(size_t *)valp = qbp->qb_count;
3505 		else
3506 			*(size_t *)valp = q->q_count;
3507 		break;
3508 
3509 	case QFIRST:
3510 		if (qbp)
3511 			*(mblk_t **)valp = qbp->qb_first;
3512 		else
3513 			*(mblk_t **)valp = q->q_first;
3514 		break;
3515 
3516 	case QLAST:
3517 		if (qbp)
3518 			*(mblk_t **)valp = qbp->qb_last;
3519 		else
3520 			*(mblk_t **)valp = q->q_last;
3521 		break;
3522 
3523 	case QFLAG:
3524 		if (qbp)
3525 			*(uint_t *)valp = qbp->qb_flag;
3526 		else
3527 			*(uint_t *)valp = q->q_flag;
3528 		break;
3529 
3530 	case QSTRUIOT:
3531 		if (qbp)
3532 			error = EINVAL;
3533 		else
3534 			*(short *)valp = q->q_struiot;
3535 		break;
3536 
3537 	default:
3538 		error = EINVAL;
3539 		break;
3540 	}
3541 done:
3542 	if (freezer != curthread)
3543 		mutex_exit(QLOCK(q));
3544 	return (error);
3545 }
3546 
3547 /*
3548  * Function awakes all in cvwait/sigwait/pollwait, on one of:
3549  *	QWANTWSYNC or QWANTR or QWANTW,
3550  *
3551  * Note: for QWANTWSYNC/QWANTW and QWANTR, if no WSLEEPer or RSLEEPer then a
3552  *	 deferred wakeup will be done. Also if strpoll() in progress then a
3553  *	 deferred pollwakeup will be done.
3554  */
3555 void
3556 strwakeq(queue_t *q, int flag)
3557 {
3558 	stdata_t 	*stp = STREAM(q);
3559 	pollhead_t 	*pl;
3560 
3561 	mutex_enter(&stp->sd_lock);
3562 	pl = &stp->sd_pollist;
3563 	if (flag & QWANTWSYNC) {
3564 		ASSERT(!(q->q_flag & QREADR));
3565 		if (stp->sd_flag & WSLEEP) {
3566 			stp->sd_flag &= ~WSLEEP;
3567 			cv_broadcast(&stp->sd_wrq->q_wait);
3568 		} else {
3569 			stp->sd_wakeq |= WSLEEP;
3570 		}
3571 
3572 		mutex_exit(&stp->sd_lock);
3573 		pollwakeup(pl, POLLWRNORM);
3574 		mutex_enter(&stp->sd_lock);
3575 
3576 		if (stp->sd_sigflags & S_WRNORM)
3577 			strsendsig(stp->sd_siglist, S_WRNORM, 0, 0);
3578 	} else if (flag & QWANTR) {
3579 		if (stp->sd_flag & RSLEEP) {
3580 			stp->sd_flag &= ~RSLEEP;
3581 			cv_broadcast(&_RD(stp->sd_wrq)->q_wait);
3582 		} else {
3583 			stp->sd_wakeq |= RSLEEP;
3584 		}
3585 
3586 		mutex_exit(&stp->sd_lock);
3587 		pollwakeup(pl, POLLIN | POLLRDNORM);
3588 		mutex_enter(&stp->sd_lock);
3589 
3590 		{
3591 			int events = stp->sd_sigflags & (S_INPUT | S_RDNORM);
3592 
3593 			if (events)
3594 				strsendsig(stp->sd_siglist, events, 0, 0);
3595 		}
3596 	} else {
3597 		if (stp->sd_flag & WSLEEP) {
3598 			stp->sd_flag &= ~WSLEEP;
3599 			cv_broadcast(&stp->sd_wrq->q_wait);
3600 		}
3601 
3602 		mutex_exit(&stp->sd_lock);
3603 		pollwakeup(pl, POLLWRNORM);
3604 		mutex_enter(&stp->sd_lock);
3605 
3606 		if (stp->sd_sigflags & S_WRNORM)
3607 			strsendsig(stp->sd_siglist, S_WRNORM, 0, 0);
3608 	}
3609 	mutex_exit(&stp->sd_lock);
3610 }
3611 
3612 int
3613 struioget(queue_t *q, mblk_t *mp, struiod_t *dp, int noblock)
3614 {
3615 	stdata_t *stp = STREAM(q);
3616 	int typ  = STRUIOT_STANDARD;
3617 	uio_t	 *uiop = &dp->d_uio;
3618 	dblk_t	 *dbp;
3619 	ssize_t	 uiocnt;
3620 	ssize_t	 cnt;
3621 	unsigned char *ptr;
3622 	ssize_t	 resid;
3623 	int	 error = 0;
3624 	on_trap_data_t otd;
3625 	queue_t	*stwrq;
3626 
3627 	/*
3628 	 * Plumbing may change while taking the type so store the
3629 	 * queue in a temporary variable. It doesn't matter even
3630 	 * if the we take the type from the previous plumbing,
3631 	 * that's because if the plumbing has changed when we were
3632 	 * holding the queue in a temporary variable, we can continue
3633 	 * processing the message the way it would have been processed
3634 	 * in the old plumbing, without any side effects but a bit
3635 	 * extra processing for partial ip header checksum.
3636 	 *
3637 	 * This has been done to avoid holding the sd_lock which is
3638 	 * very hot.
3639 	 */
3640 
3641 	stwrq = stp->sd_struiowrq;
3642 	if (stwrq)
3643 		typ = stwrq->q_struiot;
3644 
3645 	for (; (resid = uiop->uio_resid) > 0 && mp; mp = mp->b_cont) {
3646 		dbp = mp->b_datap;
3647 		ptr = (uchar_t *)(mp->b_rptr + dbp->db_cksumstuff);
3648 		uiocnt = dbp->db_cksumend - dbp->db_cksumstuff;
3649 		cnt = MIN(uiocnt, uiop->uio_resid);
3650 		if (!(dbp->db_struioflag & STRUIO_SPEC) ||
3651 		    (dbp->db_struioflag & STRUIO_DONE) || cnt == 0) {
3652 			/*
3653 			 * Either this mblk has already been processed
3654 			 * or there is no more room in this mblk (?).
3655 			 */
3656 			continue;
3657 		}
3658 		switch (typ) {
3659 		case STRUIOT_STANDARD:
3660 			if (noblock) {
3661 				if (on_trap(&otd, OT_DATA_ACCESS)) {
3662 					no_trap();
3663 					error = EWOULDBLOCK;
3664 					goto out;
3665 				}
3666 			}
3667 			if (error = uiomove(ptr, cnt, UIO_WRITE, uiop)) {
3668 				if (noblock)
3669 					no_trap();
3670 				goto out;
3671 			}
3672 			if (noblock)
3673 				no_trap();
3674 			break;
3675 
3676 		default:
3677 			error = EIO;
3678 			goto out;
3679 		}
3680 		dbp->db_struioflag |= STRUIO_DONE;
3681 		dbp->db_cksumstuff += cnt;
3682 	}
3683 out:
3684 	if (error == EWOULDBLOCK && (resid -= uiop->uio_resid) > 0) {
3685 		/*
3686 		 * A fault has occured and some bytes were moved to the
3687 		 * current mblk, the uio_t has already been updated by
3688 		 * the appropriate uio routine, so also update the mblk
3689 		 * to reflect this in case this same mblk chain is used
3690 		 * again (after the fault has been handled).
3691 		 */
3692 		uiocnt = dbp->db_cksumend - dbp->db_cksumstuff;
3693 		if (uiocnt >= resid)
3694 			dbp->db_cksumstuff += resid;
3695 	}
3696 	return (error);
3697 }
3698 
3699 /*
3700  * Try to enter queue synchronously. Any attempt to enter a closing queue will
3701  * fails. The qp->q_rwcnt keeps track of the number of successful entries so
3702  * that removeq() will not try to close the queue while a thread is inside the
3703  * queue.
3704  */
3705 static boolean_t
3706 rwnext_enter(queue_t *qp)
3707 {
3708 	mutex_enter(QLOCK(qp));
3709 	if (qp->q_flag & QWCLOSE) {
3710 		mutex_exit(QLOCK(qp));
3711 		return (B_FALSE);
3712 	}
3713 	qp->q_rwcnt++;
3714 	ASSERT(qp->q_rwcnt != 0);
3715 	mutex_exit(QLOCK(qp));
3716 	return (B_TRUE);
3717 }
3718 
3719 /*
3720  * Decrease the count of threads running in sync stream queue and wake up any
3721  * threads blocked in removeq().
3722  */
3723 static void
3724 rwnext_exit(queue_t *qp)
3725 {
3726 	mutex_enter(QLOCK(qp));
3727 	qp->q_rwcnt--;
3728 	if (qp->q_flag & QWANTRMQSYNC) {
3729 		qp->q_flag &= ~QWANTRMQSYNC;
3730 		cv_broadcast(&qp->q_wait);
3731 	}
3732 	mutex_exit(QLOCK(qp));
3733 }
3734 
3735 /*
3736  * The purpose of rwnext() is to call the rw procedure of the next
3737  * (downstream) modules queue.
3738  *
3739  * treated as put entrypoint for perimeter syncronization.
3740  *
3741  * There's no need to grab sq_putlocks here (which only exist for CIPUT
3742  * sync queues). If it is CIPUT sync queue sq_count is incremented and it does
3743  * not matter if any regular put entrypoints have been already entered. We
3744  * can't increment one of the sq_putcounts (instead of sq_count) because
3745  * qwait_rw won't know which counter to decrement.
3746  *
3747  * It would be reasonable to add the lockless FASTPUT logic.
3748  */
3749 int
3750 rwnext(queue_t *qp, struiod_t *dp)
3751 {
3752 	queue_t		*nqp;
3753 	syncq_t		*sq;
3754 	uint16_t	count;
3755 	uint16_t	flags;
3756 	struct qinit	*qi;
3757 	int		(*proc)();
3758 	struct stdata	*stp;
3759 	int		isread;
3760 	int		rval;
3761 
3762 	stp = STREAM(qp);
3763 	/*
3764 	 * Prevent q_next from changing by holding sd_lock until acquiring
3765 	 * SQLOCK. Note that a read-side rwnext from the streamhead will
3766 	 * already have sd_lock acquired. In either case sd_lock is always
3767 	 * released after acquiring SQLOCK.
3768 	 *
3769 	 * The streamhead read-side holding sd_lock when calling rwnext is
3770 	 * required to prevent a race condition were M_DATA mblks flowing
3771 	 * up the read-side of the stream could be bypassed by a rwnext()
3772 	 * down-call. In this case sd_lock acts as the streamhead perimeter.
3773 	 */
3774 	if ((nqp = _WR(qp)) == qp) {
3775 		isread = 0;
3776 		mutex_enter(&stp->sd_lock);
3777 		qp = nqp->q_next;
3778 	} else {
3779 		isread = 1;
3780 		if (nqp != stp->sd_wrq)
3781 			/* Not streamhead */
3782 			mutex_enter(&stp->sd_lock);
3783 		qp = _RD(nqp->q_next);
3784 	}
3785 	qi = qp->q_qinfo;
3786 	if (qp->q_struiot == STRUIOT_NONE || ! (proc = qi->qi_rwp)) {
3787 		/*
3788 		 * Not a synchronous module or no r/w procedure for this
3789 		 * queue, so just return EINVAL and let the caller handle it.
3790 		 */
3791 		mutex_exit(&stp->sd_lock);
3792 		return (EINVAL);
3793 	}
3794 
3795 	if (rwnext_enter(qp) == B_FALSE) {
3796 		mutex_exit(&stp->sd_lock);
3797 		return (EINVAL);
3798 	}
3799 
3800 	sq = qp->q_syncq;
3801 	mutex_enter(SQLOCK(sq));
3802 	mutex_exit(&stp->sd_lock);
3803 	count = sq->sq_count;
3804 	flags = sq->sq_flags;
3805 	ASSERT(sq->sq_ciputctrl == NULL || (flags & SQ_CIPUT));
3806 
3807 	while ((flags & SQ_GOAWAY) || (!(flags & SQ_CIPUT) && count != 0)) {
3808 		/*
3809 		 * if this queue is being closed, return.
3810 		 */
3811 		if (qp->q_flag & QWCLOSE) {
3812 			mutex_exit(SQLOCK(sq));
3813 			rwnext_exit(qp);
3814 			return (EINVAL);
3815 		}
3816 
3817 		/*
3818 		 * Wait until we can enter the inner perimeter.
3819 		 */
3820 		sq->sq_flags = flags | SQ_WANTWAKEUP;
3821 		cv_wait(&sq->sq_wait, SQLOCK(sq));
3822 		count = sq->sq_count;
3823 		flags = sq->sq_flags;
3824 	}
3825 
3826 	if (isread == 0 && stp->sd_struiowrq == NULL ||
3827 	    isread == 1 && stp->sd_struiordq == NULL) {
3828 		/*
3829 		 * Stream plumbing changed while waiting for inner perimeter
3830 		 * so just return EINVAL and let the caller handle it.
3831 		 */
3832 		mutex_exit(SQLOCK(sq));
3833 		rwnext_exit(qp);
3834 		return (EINVAL);
3835 	}
3836 	if (!(flags & SQ_CIPUT))
3837 		sq->sq_flags = flags | SQ_EXCL;
3838 	sq->sq_count = count + 1;
3839 	ASSERT(sq->sq_count != 0);		/* Wraparound */
3840 	/*
3841 	 * Note: The only message ordering guarantee that rwnext() makes is
3842 	 *	 for the write queue flow-control case. All others (r/w queue
3843 	 *	 with q_count > 0 (or q_first != 0)) are the resposibilty of
3844 	 *	 the queue's rw procedure. This could be genralized here buy
3845 	 *	 running the queue's service procedure, but that wouldn't be
3846 	 *	 the most efficent for all cases.
3847 	 */
3848 	mutex_exit(SQLOCK(sq));
3849 	if (! isread && (qp->q_flag & QFULL)) {
3850 		/*
3851 		 * Write queue may be flow controlled. If so,
3852 		 * mark the queue for wakeup when it's not.
3853 		 */
3854 		mutex_enter(QLOCK(qp));
3855 		if (qp->q_flag & QFULL) {
3856 			qp->q_flag |= QWANTWSYNC;
3857 			mutex_exit(QLOCK(qp));
3858 			rval = EWOULDBLOCK;
3859 			goto out;
3860 		}
3861 		mutex_exit(QLOCK(qp));
3862 	}
3863 
3864 	if (! isread && dp->d_mp)
3865 		STR_FTEVENT_MSG(dp->d_mp, nqp, FTEV_RWNEXT, dp->d_mp->b_rptr -
3866 		    dp->d_mp->b_datap->db_base);
3867 
3868 	rval = (*proc)(qp, dp);
3869 
3870 	if (isread && dp->d_mp)
3871 		STR_FTEVENT_MSG(dp->d_mp, _RD(nqp), FTEV_RWNEXT,
3872 		    dp->d_mp->b_rptr - dp->d_mp->b_datap->db_base);
3873 out:
3874 	/*
3875 	 * The queue is protected from being freed by sq_count, so it is
3876 	 * safe to call rwnext_exit and reacquire SQLOCK(sq).
3877 	 */
3878 	rwnext_exit(qp);
3879 
3880 	mutex_enter(SQLOCK(sq));
3881 	flags = sq->sq_flags;
3882 	ASSERT(sq->sq_count != 0);
3883 	sq->sq_count--;
3884 	if (flags & SQ_TAIL) {
3885 		putnext_tail(sq, qp, flags);
3886 		/*
3887 		 * The only purpose of this ASSERT is to preserve calling stack
3888 		 * in DEBUG kernel.
3889 		 */
3890 		ASSERT(flags & SQ_TAIL);
3891 		return (rval);
3892 	}
3893 	ASSERT(flags & (SQ_EXCL|SQ_CIPUT));
3894 	/*
3895 	 * Safe to always drop SQ_EXCL:
3896 	 *	Not SQ_CIPUT means we set SQ_EXCL above
3897 	 *	For SQ_CIPUT SQ_EXCL will only be set if the put procedure
3898 	 *	did a qwriter(INNER) in which case nobody else
3899 	 *	is in the inner perimeter and we are exiting.
3900 	 *
3901 	 * I would like to make the following assertion:
3902 	 *
3903 	 * ASSERT((flags & (SQ_EXCL|SQ_CIPUT)) != (SQ_EXCL|SQ_CIPUT) ||
3904 	 * 	sq->sq_count == 0);
3905 	 *
3906 	 * which indicates that if we are both putshared and exclusive,
3907 	 * we became exclusive while executing the putproc, and the only
3908 	 * claim on the syncq was the one we dropped a few lines above.
3909 	 * But other threads that enter putnext while the syncq is exclusive
3910 	 * need to make a claim as they may need to drop SQLOCK in the
3911 	 * has_writers case to avoid deadlocks.  If these threads are
3912 	 * delayed or preempted, it is possible that the writer thread can
3913 	 * find out that there are other claims making the (sq_count == 0)
3914 	 * test invalid.
3915 	 */
3916 
3917 	sq->sq_flags = flags & ~SQ_EXCL;
3918 	if (sq->sq_flags & SQ_WANTWAKEUP) {
3919 		sq->sq_flags &= ~SQ_WANTWAKEUP;
3920 		cv_broadcast(&sq->sq_wait);
3921 	}
3922 	mutex_exit(SQLOCK(sq));
3923 	return (rval);
3924 }
3925 
3926 /*
3927  * The purpose of infonext() is to call the info procedure of the next
3928  * (downstream) modules queue.
3929  *
3930  * treated as put entrypoint for perimeter syncronization.
3931  *
3932  * There's no need to grab sq_putlocks here (which only exist for CIPUT
3933  * sync queues). If it is CIPUT sync queue regular sq_count is incremented and
3934  * it does not matter if any regular put entrypoints have been already
3935  * entered.
3936  */
3937 int
3938 infonext(queue_t *qp, infod_t *idp)
3939 {
3940 	queue_t		*nqp;
3941 	syncq_t		*sq;
3942 	uint16_t	count;
3943 	uint16_t 	flags;
3944 	struct qinit	*qi;
3945 	int		(*proc)();
3946 	struct stdata	*stp;
3947 	int		rval;
3948 
3949 	stp = STREAM(qp);
3950 	/*
3951 	 * Prevent q_next from changing by holding sd_lock until
3952 	 * acquiring SQLOCK.
3953 	 */
3954 	mutex_enter(&stp->sd_lock);
3955 	if ((nqp = _WR(qp)) == qp) {
3956 		qp = nqp->q_next;
3957 	} else {
3958 		qp = _RD(nqp->q_next);
3959 	}
3960 	qi = qp->q_qinfo;
3961 	if (qp->q_struiot == STRUIOT_NONE || ! (proc = qi->qi_infop)) {
3962 		mutex_exit(&stp->sd_lock);
3963 		return (EINVAL);
3964 	}
3965 	sq = qp->q_syncq;
3966 	mutex_enter(SQLOCK(sq));
3967 	mutex_exit(&stp->sd_lock);
3968 	count = sq->sq_count;
3969 	flags = sq->sq_flags;
3970 	ASSERT(sq->sq_ciputctrl == NULL || (flags & SQ_CIPUT));
3971 
3972 	while ((flags & SQ_GOAWAY) || (!(flags & SQ_CIPUT) && count != 0)) {
3973 		/*
3974 		 * Wait until we can enter the inner perimeter.
3975 		 */
3976 		sq->sq_flags = flags | SQ_WANTWAKEUP;
3977 		cv_wait(&sq->sq_wait, SQLOCK(sq));
3978 		count = sq->sq_count;
3979 		flags = sq->sq_flags;
3980 	}
3981 
3982 	if (! (flags & SQ_CIPUT))
3983 		sq->sq_flags = flags | SQ_EXCL;
3984 	sq->sq_count = count + 1;
3985 	ASSERT(sq->sq_count != 0);		/* Wraparound */
3986 	mutex_exit(SQLOCK(sq));
3987 
3988 	rval = (*proc)(qp, idp);
3989 
3990 	mutex_enter(SQLOCK(sq));
3991 	flags = sq->sq_flags;
3992 	ASSERT(sq->sq_count != 0);
3993 	sq->sq_count--;
3994 	if (flags & SQ_TAIL) {
3995 		putnext_tail(sq, qp, flags);
3996 		/*
3997 		 * The only purpose of this ASSERT is to preserve calling stack
3998 		 * in DEBUG kernel.
3999 		 */
4000 		ASSERT(flags & SQ_TAIL);
4001 		return (rval);
4002 	}
4003 	ASSERT(flags & (SQ_EXCL|SQ_CIPUT));
4004 /*
4005  * XXXX
4006  * I am not certain the next comment is correct here.  I need to consider
4007  * why the infonext is called, and if dropping SQ_EXCL unless non-CIPUT
4008  * might cause other problems.  It just might be safer to drop it if
4009  * !SQ_CIPUT because that is when we set it.
4010  */
4011 	/*
4012 	 * Safe to always drop SQ_EXCL:
4013 	 *	Not SQ_CIPUT means we set SQ_EXCL above
4014 	 *	For SQ_CIPUT SQ_EXCL will only be set if the put procedure
4015 	 *	did a qwriter(INNER) in which case nobody else
4016 	 *	is in the inner perimeter and we are exiting.
4017 	 *
4018 	 * I would like to make the following assertion:
4019 	 *
4020 	 * ASSERT((flags & (SQ_EXCL|SQ_CIPUT)) != (SQ_EXCL|SQ_CIPUT) ||
4021 	 *	sq->sq_count == 0);
4022 	 *
4023 	 * which indicates that if we are both putshared and exclusive,
4024 	 * we became exclusive while executing the putproc, and the only
4025 	 * claim on the syncq was the one we dropped a few lines above.
4026 	 * But other threads that enter putnext while the syncq is exclusive
4027 	 * need to make a claim as they may need to drop SQLOCK in the
4028 	 * has_writers case to avoid deadlocks.  If these threads are
4029 	 * delayed or preempted, it is possible that the writer thread can
4030 	 * find out that there are other claims making the (sq_count == 0)
4031 	 * test invalid.
4032 	 */
4033 
4034 	sq->sq_flags = flags & ~SQ_EXCL;
4035 	mutex_exit(SQLOCK(sq));
4036 	return (rval);
4037 }
4038 
4039 /*
4040  * Return nonzero if the queue is responsible for struio(), else return 0.
4041  */
4042 int
4043 isuioq(queue_t *q)
4044 {
4045 	if (q->q_flag & QREADR)
4046 		return (STREAM(q)->sd_struiordq == q);
4047 	else
4048 		return (STREAM(q)->sd_struiowrq == q);
4049 }
4050 
4051 #if defined(__sparc)
4052 int disable_putlocks = 0;
4053 #else
4054 int disable_putlocks = 1;
4055 #endif
4056 
4057 /*
4058  * called by create_putlock.
4059  */
4060 static void
4061 create_syncq_putlocks(queue_t *q)
4062 {
4063 	syncq_t	*sq = q->q_syncq;
4064 	ciputctrl_t *cip;
4065 	int i;
4066 
4067 	ASSERT(sq != NULL);
4068 
4069 	ASSERT(disable_putlocks == 0);
4070 	ASSERT(n_ciputctrl >= min_n_ciputctrl);
4071 	ASSERT(ciputctrl_cache != NULL);
4072 
4073 	if (!(sq->sq_type & SQ_CIPUT))
4074 		return;
4075 
4076 	for (i = 0; i <= 1; i++) {
4077 		if (sq->sq_ciputctrl == NULL) {
4078 			cip = kmem_cache_alloc(ciputctrl_cache, KM_SLEEP);
4079 			SUMCHECK_CIPUTCTRL_COUNTS(cip, n_ciputctrl - 1, 0);
4080 			mutex_enter(SQLOCK(sq));
4081 			if (sq->sq_ciputctrl != NULL) {
4082 				mutex_exit(SQLOCK(sq));
4083 				kmem_cache_free(ciputctrl_cache, cip);
4084 			} else {
4085 				ASSERT(sq->sq_nciputctrl == 0);
4086 				sq->sq_nciputctrl = n_ciputctrl - 1;
4087 				/*
4088 				 * putnext checks sq_ciputctrl without holding
4089 				 * SQLOCK. if it is not NULL putnext assumes
4090 				 * sq_nciputctrl is initialized. membar below
4091 				 * insures that.
4092 				 */
4093 				membar_producer();
4094 				sq->sq_ciputctrl = cip;
4095 				mutex_exit(SQLOCK(sq));
4096 			}
4097 		}
4098 		ASSERT(sq->sq_nciputctrl == n_ciputctrl - 1);
4099 		if (i == 1)
4100 			break;
4101 		q = _OTHERQ(q);
4102 		if (!(q->q_flag & QPERQ)) {
4103 			ASSERT(sq == q->q_syncq);
4104 			break;
4105 		}
4106 		ASSERT(q->q_syncq != NULL);
4107 		ASSERT(sq != q->q_syncq);
4108 		sq = q->q_syncq;
4109 		ASSERT(sq->sq_type & SQ_CIPUT);
4110 	}
4111 }
4112 
4113 /*
4114  * If stream argument is 0 only create per cpu sq_putlocks/sq_putcounts for
4115  * syncq of q. If stream argument is not 0 create per cpu stream_putlocks for
4116  * the stream of q and per cpu sq_putlocks/sq_putcounts for all syncq's
4117  * starting from q and down to the driver.
4118  *
4119  * This should be called after the affected queues are part of stream
4120  * geometry. It should be called from driver/module open routine after
4121  * qprocson() call. It is also called from nfs syscall where it is known that
4122  * stream is configured and won't change its geometry during create_putlock
4123  * call.
4124  *
4125  * caller normally uses 0 value for the stream argument to speed up MT putnext
4126  * into the perimeter of q for example because its perimeter is per module
4127  * (e.g. IP).
4128  *
4129  * caller normally uses non 0 value for the stream argument to hint the system
4130  * that the stream of q is a very contended global system stream
4131  * (e.g. NFS/UDP) and the part of the stream from q to the driver is
4132  * particularly MT hot.
4133  *
4134  * Caller insures stream plumbing won't happen while we are here and therefore
4135  * q_next can be safely used.
4136  */
4137 
4138 void
4139 create_putlocks(queue_t *q, int stream)
4140 {
4141 	ciputctrl_t	*cip;
4142 	struct stdata	*stp = STREAM(q);
4143 
4144 	q = _WR(q);
4145 	ASSERT(stp != NULL);
4146 
4147 	if (disable_putlocks != 0)
4148 		return;
4149 
4150 	if (n_ciputctrl < min_n_ciputctrl)
4151 		return;
4152 
4153 	ASSERT(ciputctrl_cache != NULL);
4154 
4155 	if (stream != 0 && stp->sd_ciputctrl == NULL) {
4156 		cip = kmem_cache_alloc(ciputctrl_cache, KM_SLEEP);
4157 		SUMCHECK_CIPUTCTRL_COUNTS(cip, n_ciputctrl - 1, 0);
4158 		mutex_enter(&stp->sd_lock);
4159 		if (stp->sd_ciputctrl != NULL) {
4160 			mutex_exit(&stp->sd_lock);
4161 			kmem_cache_free(ciputctrl_cache, cip);
4162 		} else {
4163 			ASSERT(stp->sd_nciputctrl == 0);
4164 			stp->sd_nciputctrl = n_ciputctrl - 1;
4165 			/*
4166 			 * putnext checks sd_ciputctrl without holding
4167 			 * sd_lock. if it is not NULL putnext assumes
4168 			 * sd_nciputctrl is initialized. membar below
4169 			 * insures that.
4170 			 */
4171 			membar_producer();
4172 			stp->sd_ciputctrl = cip;
4173 			mutex_exit(&stp->sd_lock);
4174 		}
4175 	}
4176 
4177 	ASSERT(stream == 0 || stp->sd_nciputctrl == n_ciputctrl - 1);
4178 
4179 	while (_SAMESTR(q)) {
4180 		create_syncq_putlocks(q);
4181 		if (stream == 0)
4182 			return;
4183 		q = q->q_next;
4184 	}
4185 	ASSERT(q != NULL);
4186 	create_syncq_putlocks(q);
4187 }
4188 
4189 /*
4190  * STREAMS Flow Trace - record STREAMS Flow Trace events as an mblk flows
4191  * through a stream.
4192  *
4193  * Data currently record per-event is a timestamp, module/driver name,
4194  * downstream module/driver name, optional callstack, event type and a per
4195  * type datum.  Much of the STREAMS framework is instrumented for automatic
4196  * flow tracing (when enabled).  Events can be defined and used by STREAMS
4197  * modules and drivers.
4198  *
4199  * Global objects:
4200  *
4201  *	str_ftevent() - Add a flow-trace event to a dblk.
4202  *	str_ftfree() - Free flow-trace data
4203  *
4204  * Local objects:
4205  *
4206  *	fthdr_cache - pointer to the kmem cache for trace header.
4207  *	ftblk_cache - pointer to the kmem cache for trace data blocks.
4208  */
4209 
4210 int str_ftnever = 1;	/* Don't do STREAMS flow tracing */
4211 int str_ftstack = 0;	/* Don't record event call stacks */
4212 
4213 void
4214 str_ftevent(fthdr_t *hp, void *p, ushort_t evnt, ushort_t data)
4215 {
4216 	ftblk_t *bp = hp->tail;
4217 	ftblk_t *nbp;
4218 	ftevnt_t *ep;
4219 	int ix, nix;
4220 
4221 	ASSERT(hp != NULL);
4222 
4223 	for (;;) {
4224 		if ((ix = bp->ix) == FTBLK_EVNTS) {
4225 			/*
4226 			 * Tail doesn't have room, so need a new tail.
4227 			 *
4228 			 * To make this MT safe, first, allocate a new
4229 			 * ftblk, and initialize it.  To make life a
4230 			 * little easier, reserve the first slot (mostly
4231 			 * by making ix = 1).  When we are finished with
4232 			 * the initialization, CAS this pointer to the
4233 			 * tail.  If this succeeds, this is the new
4234 			 * "next" block.  Otherwise, another thread
4235 			 * got here first, so free the block and start
4236 			 * again.
4237 			 */
4238 			nbp = kmem_cache_alloc(ftblk_cache, KM_NOSLEEP);
4239 			if (nbp == NULL) {
4240 				/* no mem, so punt */
4241 				str_ftnever++;
4242 				/* free up all flow data? */
4243 				return;
4244 			}
4245 			nbp->nxt = NULL;
4246 			nbp->ix = 1;
4247 			/*
4248 			 * Just in case there is another thread about
4249 			 * to get the next index, we need to make sure
4250 			 * the value is there for it.
4251 			 */
4252 			membar_producer();
4253 			if (casptr(&hp->tail, bp, nbp) == bp) {
4254 				/* CAS was successful */
4255 				bp->nxt = nbp;
4256 				membar_producer();
4257 				bp = nbp;
4258 				ix = 0;
4259 				goto cas_good;
4260 			} else {
4261 				kmem_cache_free(ftblk_cache, nbp);
4262 				bp = hp->tail;
4263 				continue;
4264 			}
4265 		}
4266 		nix = ix + 1;
4267 		if (cas32((uint32_t *)&bp->ix, ix, nix) == ix) {
4268 		cas_good:
4269 			if (curthread != hp->thread) {
4270 				hp->thread = curthread;
4271 				evnt |= FTEV_CS;
4272 			}
4273 			if (CPU->cpu_seqid != hp->cpu_seqid) {
4274 				hp->cpu_seqid = CPU->cpu_seqid;
4275 				evnt |= FTEV_PS;
4276 			}
4277 			ep = &bp->ev[ix];
4278 			break;
4279 		}
4280 	}
4281 
4282 	if (evnt & FTEV_QMASK) {
4283 		queue_t *qp = p;
4284 
4285 		if (!(qp->q_flag & QREADR))
4286 			evnt |= FTEV_ISWR;
4287 
4288 		ep->mid = Q2NAME(qp);
4289 
4290 		/*
4291 		 * We only record the next queue name for FTEV_PUTNEXT since
4292 		 * that's the only time we *really* need it, and the putnext()
4293 		 * code ensures that qp->q_next won't vanish.  (We could use
4294 		 * claimstr()/releasestr() but at a performance cost.)
4295 		 */
4296 		if ((evnt & FTEV_MASK) == FTEV_PUTNEXT && qp->q_next != NULL)
4297 			ep->midnext = Q2NAME(qp->q_next);
4298 		else
4299 			ep->midnext = NULL;
4300 	} else {
4301 		ep->mid = p;
4302 		ep->midnext = NULL;
4303 	}
4304 
4305 	if (ep->stk != NULL)
4306 		ep->stk->fs_depth = getpcstack(ep->stk->fs_stk, FTSTK_DEPTH);
4307 
4308 	ep->ts = gethrtime();
4309 	ep->evnt = evnt;
4310 	ep->data = data;
4311 	hp->hash = (hp->hash << 9) + hp->hash;
4312 	hp->hash += (evnt << 16) | data;
4313 	hp->hash += (uintptr_t)ep->mid;
4314 }
4315 
4316 /*
4317  * Free flow-trace data.
4318  */
4319 void
4320 str_ftfree(dblk_t *dbp)
4321 {
4322 	fthdr_t *hp = dbp->db_fthdr;
4323 	ftblk_t *bp = &hp->first;
4324 	ftblk_t *nbp;
4325 
4326 	if (bp != hp->tail || bp->ix != 0) {
4327 		/*
4328 		 * Clear out the hash, have the tail point to itself, and free
4329 		 * any continuation blocks.
4330 		 */
4331 		bp = hp->first.nxt;
4332 		hp->tail = &hp->first;
4333 		hp->hash = 0;
4334 		hp->first.nxt = NULL;
4335 		hp->first.ix = 0;
4336 		while (bp != NULL) {
4337 			nbp = bp->nxt;
4338 			kmem_cache_free(ftblk_cache, bp);
4339 			bp = nbp;
4340 		}
4341 	}
4342 	kmem_cache_free(fthdr_cache, hp);
4343 	dbp->db_fthdr = NULL;
4344 }
4345