xref: /illumos-gate/usr/src/uts/common/io/stream.c (revision 9675e051a26ceeb832dfe8df37f88667f42253e0)
1 /*
2  * CDDL HEADER START
3  *
4  * The contents of this file are subject to the terms of the
5  * Common Development and Distribution License (the "License").
6  * You may not use this file except in compliance with the License.
7  *
8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9  * or http://www.opensolaris.org/os/licensing.
10  * See the License for the specific language governing permissions
11  * and limitations under the License.
12  *
13  * When distributing Covered Code, include this CDDL HEADER in each
14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15  * If applicable, add the following below this CDDL HEADER, with the
16  * fields enclosed by brackets "[]" replaced with your own identifying
17  * information: Portions Copyright [yyyy] [name of copyright owner]
18  *
19  * CDDL HEADER END
20  */
21 /*	Copyright (c) 1984, 1986, 1987, 1988, 1989 AT&T	*/
22 /*	All Rights Reserved	*/
23 
24 /*
25  * Copyright 2009 Sun Microsystems, Inc.  All rights reserved.
26  * Use is subject to license terms.
27  *
28  * Copyright 2021 Tintri by DDN, Inc. All rights reserved.
29  */
30 
31 #include <sys/types.h>
32 #include <sys/param.h>
33 #include <sys/thread.h>
34 #include <sys/sysmacros.h>
35 #include <sys/stropts.h>
36 #include <sys/stream.h>
37 #include <sys/strsubr.h>
38 #include <sys/strsun.h>
39 #include <sys/conf.h>
40 #include <sys/debug.h>
41 #include <sys/cmn_err.h>
42 #include <sys/kmem.h>
43 #include <sys/atomic.h>
44 #include <sys/errno.h>
45 #include <sys/vtrace.h>
46 #include <sys/ftrace.h>
47 #include <sys/ontrap.h>
48 #include <sys/multidata.h>
49 #include <sys/multidata_impl.h>
50 #include <sys/sdt.h>
51 #include <sys/strft.h>
52 
53 #ifdef DEBUG
54 #include <sys/kmem_impl.h>
55 #endif
56 
57 /*
58  * This file contains all the STREAMS utility routines that may
59  * be used by modules and drivers.
60  */
61 
62 /*
63  * STREAMS message allocator: principles of operation
64  *
65  * The streams message allocator consists of all the routines that
66  * allocate, dup and free streams messages: allocb(), [d]esballoc[a],
67  * dupb(), freeb() and freemsg().  What follows is a high-level view
68  * of how the allocator works.
69  *
70  * Every streams message consists of one or more mblks, a dblk, and data.
71  * All mblks for all types of messages come from a common mblk_cache.
72  * The dblk and data come in several flavors, depending on how the
73  * message is allocated:
74  *
75  * (1) mblks up to DBLK_MAX_CACHE size are allocated from a collection of
76  *     fixed-size dblk/data caches. For message sizes that are multiples of
77  *     PAGESIZE, dblks are allocated separately from the buffer.
78  *     The associated buffer is allocated by the constructor using kmem_alloc().
79  *     For all other message sizes, dblk and its associated data is allocated
80  *     as a single contiguous chunk of memory.
81  *     Objects in these caches consist of a dblk plus its associated data.
82  *     allocb() determines the nearest-size cache by table lookup:
83  *     the dblk_cache[] array provides the mapping from size to dblk cache.
84  *
85  * (2) Large messages (size > DBLK_MAX_CACHE) are constructed by
86  *     kmem_alloc()'ing a buffer for the data and supplying that
87  *     buffer to gesballoc(), described below.
88  *
89  * (3) The four flavors of [d]esballoc[a] are all implemented by a
90  *     common routine, gesballoc() ("generic esballoc").  gesballoc()
91  *     allocates a dblk from the global dblk_esb_cache and sets db_base,
92  *     db_lim and db_frtnp to describe the caller-supplied buffer.
93  *
94  * While there are several routines to allocate messages, there is only
95  * one routine to free messages: freeb().  freeb() simply invokes the
96  * dblk's free method, dbp->db_free(), which is set at allocation time.
97  *
98  * dupb() creates a new reference to a message by allocating a new mblk,
99  * incrementing the dblk reference count and setting the dblk's free
100  * method to dblk_decref().  The dblk's original free method is retained
101  * in db_lastfree.  dblk_decref() decrements the reference count on each
102  * freeb().  If this is not the last reference it just frees the mblk;
103  * if this *is* the last reference, it restores db_free to db_lastfree,
104  * sets db_mblk to the current mblk (see below), and invokes db_lastfree.
105  *
106  * The implementation makes aggressive use of kmem object caching for
107  * maximum performance.  This makes the code simple and compact, but
108  * also a bit abstruse in some places.  The invariants that constitute a
109  * message's constructed state, described below, are more subtle than usual.
110  *
111  * Every dblk has an "attached mblk" as part of its constructed state.
112  * The mblk is allocated by the dblk's constructor and remains attached
113  * until the message is either dup'ed or pulled up.  In the dupb() case
114  * the mblk association doesn't matter until the last free, at which time
115  * dblk_decref() attaches the last mblk to the dblk.  pullupmsg() affects
116  * the mblk association because it swaps the leading mblks of two messages,
117  * so it is responsible for swapping their db_mblk pointers accordingly.
118  * From a constructed-state viewpoint it doesn't matter that a dblk's
119  * attached mblk can change while the message is allocated; all that
120  * matters is that the dblk has *some* attached mblk when it's freed.
121  *
122  * The sizes of the allocb() small-message caches are not magical.
123  * They represent a good trade-off between internal and external
124  * fragmentation for current workloads.  They should be reevaluated
125  * periodically, especially if allocations larger than DBLK_MAX_CACHE
126  * become common.  We use 64-byte alignment so that dblks don't
127  * straddle cache lines unnecessarily.
128  */
129 #define	DBLK_MAX_CACHE		73728
130 #define	DBLK_CACHE_ALIGN	64
131 #define	DBLK_MIN_SIZE		8
132 #define	DBLK_SIZE_SHIFT		3
133 
134 #ifdef _BIG_ENDIAN
135 #define	DBLK_RTFU_SHIFT(field)	\
136 	(8 * (&((dblk_t *)0)->db_struioflag - &((dblk_t *)0)->field))
137 #else
138 #define	DBLK_RTFU_SHIFT(field)	\
139 	(8 * (&((dblk_t *)0)->field - &((dblk_t *)0)->db_ref))
140 #endif
141 
142 #define	DBLK_RTFU(ref, type, flags, uioflag)	\
143 	(((ref) << DBLK_RTFU_SHIFT(db_ref)) | \
144 	((type) << DBLK_RTFU_SHIFT(db_type)) | \
145 	(((flags) | (ref - 1)) << DBLK_RTFU_SHIFT(db_flags)) | \
146 	((uioflag) << DBLK_RTFU_SHIFT(db_struioflag)))
147 #define	DBLK_RTFU_REF_MASK	(DBLK_REFMAX << DBLK_RTFU_SHIFT(db_ref))
148 #define	DBLK_RTFU_WORD(dbp)	(*((uint32_t *)&(dbp)->db_ref))
149 #define	MBLK_BAND_FLAG_WORD(mp)	(*((uint32_t *)&(mp)->b_band))
150 
151 static size_t dblk_sizes[] = {
152 #ifdef _LP64
153 	16, 80, 144, 208, 272, 336, 528, 1040, 1488, 1936, 2576, 3856,
154 	8192, 12048, 16384, 20240, 24576, 28432, 32768, 36624,
155 	40960, 44816, 49152, 53008, 57344, 61200, 65536, 69392,
156 #else
157 	64, 128, 320, 576, 1088, 1536, 1984, 2624, 3904,
158 	8192, 12096, 16384, 20288, 24576, 28480, 32768, 36672,
159 	40960, 44864, 49152, 53056, 57344, 61248, 65536, 69440,
160 #endif
161 	DBLK_MAX_CACHE, 0
162 };
163 
164 static struct kmem_cache *dblk_cache[DBLK_MAX_CACHE / DBLK_MIN_SIZE];
165 static struct kmem_cache *mblk_cache;
166 static struct kmem_cache *dblk_esb_cache;
167 static struct kmem_cache *fthdr_cache;
168 static struct kmem_cache *ftblk_cache;
169 
170 static void dblk_lastfree(mblk_t *mp, dblk_t *dbp);
171 static mblk_t *allocb_oversize(size_t size, int flags);
172 static int allocb_tryhard_fails;
173 static void frnop_func(void *arg);
174 frtn_t frnop = { frnop_func };
175 static void bcache_dblk_lastfree(mblk_t *mp, dblk_t *dbp);
176 
177 static boolean_t rwnext_enter(queue_t *qp);
178 static void rwnext_exit(queue_t *qp);
179 
180 /*
181  * Patchable mblk/dblk kmem_cache flags.
182  */
183 int dblk_kmem_flags = 0;
184 int mblk_kmem_flags = 0;
185 
186 static int
187 dblk_constructor(void *buf, void *cdrarg, int kmflags)
188 {
189 	dblk_t *dbp = buf;
190 	ssize_t msg_size = (ssize_t)cdrarg;
191 	size_t index;
192 
193 	ASSERT(msg_size != 0);
194 
195 	index = (msg_size - 1) >> DBLK_SIZE_SHIFT;
196 
197 	ASSERT(index < (DBLK_MAX_CACHE >> DBLK_SIZE_SHIFT));
198 
199 	if ((dbp->db_mblk = kmem_cache_alloc(mblk_cache, kmflags)) == NULL)
200 		return (-1);
201 	if ((msg_size & PAGEOFFSET) == 0) {
202 		dbp->db_base = kmem_alloc(msg_size, kmflags);
203 		if (dbp->db_base == NULL) {
204 			kmem_cache_free(mblk_cache, dbp->db_mblk);
205 			return (-1);
206 		}
207 	} else {
208 		dbp->db_base = (unsigned char *)&dbp[1];
209 	}
210 
211 	dbp->db_mblk->b_datap = dbp;
212 	dbp->db_cache = dblk_cache[index];
213 	dbp->db_lim = dbp->db_base + msg_size;
214 	dbp->db_free = dbp->db_lastfree = dblk_lastfree;
215 	dbp->db_frtnp = NULL;
216 	dbp->db_fthdr = NULL;
217 	dbp->db_credp = NULL;
218 	dbp->db_cpid = -1;
219 	dbp->db_struioflag = 0;
220 	dbp->db_struioun.cksum.flags = 0;
221 	return (0);
222 }
223 
224 /*ARGSUSED*/
225 static int
226 dblk_esb_constructor(void *buf, void *cdrarg, int kmflags)
227 {
228 	dblk_t *dbp = buf;
229 
230 	if ((dbp->db_mblk = kmem_cache_alloc(mblk_cache, kmflags)) == NULL)
231 		return (-1);
232 	dbp->db_mblk->b_datap = dbp;
233 	dbp->db_cache = dblk_esb_cache;
234 	dbp->db_fthdr = NULL;
235 	dbp->db_credp = NULL;
236 	dbp->db_cpid = -1;
237 	dbp->db_struioflag = 0;
238 	dbp->db_struioun.cksum.flags = 0;
239 	return (0);
240 }
241 
242 static int
243 bcache_dblk_constructor(void *buf, void *cdrarg, int kmflags)
244 {
245 	dblk_t *dbp = buf;
246 	bcache_t *bcp = cdrarg;
247 
248 	if ((dbp->db_mblk = kmem_cache_alloc(mblk_cache, kmflags)) == NULL)
249 		return (-1);
250 
251 	dbp->db_base = kmem_cache_alloc(bcp->buffer_cache, kmflags);
252 	if (dbp->db_base == NULL) {
253 		kmem_cache_free(mblk_cache, dbp->db_mblk);
254 		return (-1);
255 	}
256 
257 	dbp->db_mblk->b_datap = dbp;
258 	dbp->db_cache = (void *)bcp;
259 	dbp->db_lim = dbp->db_base + bcp->size;
260 	dbp->db_free = dbp->db_lastfree = bcache_dblk_lastfree;
261 	dbp->db_frtnp = NULL;
262 	dbp->db_fthdr = NULL;
263 	dbp->db_credp = NULL;
264 	dbp->db_cpid = -1;
265 	dbp->db_struioflag = 0;
266 	dbp->db_struioun.cksum.flags = 0;
267 	return (0);
268 }
269 
270 /*ARGSUSED*/
271 static void
272 dblk_destructor(void *buf, void *cdrarg)
273 {
274 	dblk_t *dbp = buf;
275 	ssize_t msg_size = (ssize_t)cdrarg;
276 
277 	ASSERT(dbp->db_mblk->b_datap == dbp);
278 	ASSERT(msg_size != 0);
279 	ASSERT(dbp->db_struioflag == 0);
280 	ASSERT(dbp->db_struioun.cksum.flags == 0);
281 
282 	if ((msg_size & PAGEOFFSET) == 0) {
283 		kmem_free(dbp->db_base, msg_size);
284 	}
285 
286 	kmem_cache_free(mblk_cache, dbp->db_mblk);
287 }
288 
289 static void
290 bcache_dblk_destructor(void *buf, void *cdrarg)
291 {
292 	dblk_t *dbp = buf;
293 	bcache_t *bcp = cdrarg;
294 
295 	kmem_cache_free(bcp->buffer_cache, dbp->db_base);
296 
297 	ASSERT(dbp->db_mblk->b_datap == dbp);
298 	ASSERT(dbp->db_struioflag == 0);
299 	ASSERT(dbp->db_struioun.cksum.flags == 0);
300 
301 	kmem_cache_free(mblk_cache, dbp->db_mblk);
302 }
303 
304 /* ARGSUSED */
305 static int
306 ftblk_constructor(void *buf, void *cdrarg, int kmflags)
307 {
308 	ftblk_t *fbp = buf;
309 	int i;
310 
311 	bzero(fbp, sizeof (ftblk_t));
312 	if (str_ftstack != 0) {
313 		for (i = 0; i < FTBLK_EVNTS; i++)
314 			fbp->ev[i].stk = kmem_alloc(sizeof (ftstk_t), kmflags);
315 	}
316 
317 	return (0);
318 }
319 
320 /* ARGSUSED */
321 static void
322 ftblk_destructor(void *buf, void *cdrarg)
323 {
324 	ftblk_t *fbp = buf;
325 	int i;
326 
327 	if (str_ftstack != 0) {
328 		for (i = 0; i < FTBLK_EVNTS; i++) {
329 			if (fbp->ev[i].stk != NULL) {
330 				kmem_free(fbp->ev[i].stk, sizeof (ftstk_t));
331 				fbp->ev[i].stk = NULL;
332 			}
333 		}
334 	}
335 }
336 
337 static int
338 fthdr_constructor(void *buf, void *cdrarg, int kmflags)
339 {
340 	fthdr_t *fhp = buf;
341 
342 	return (ftblk_constructor(&fhp->first, cdrarg, kmflags));
343 }
344 
345 static void
346 fthdr_destructor(void *buf, void *cdrarg)
347 {
348 	fthdr_t *fhp = buf;
349 
350 	ftblk_destructor(&fhp->first, cdrarg);
351 }
352 
353 void
354 streams_msg_init(void)
355 {
356 	char name[40];
357 	size_t size;
358 	size_t lastsize = DBLK_MIN_SIZE;
359 	size_t *sizep;
360 	struct kmem_cache *cp;
361 	size_t tot_size;
362 	int offset;
363 
364 	mblk_cache = kmem_cache_create("streams_mblk", sizeof (mblk_t), 32,
365 	    NULL, NULL, NULL, NULL, NULL, mblk_kmem_flags);
366 
367 	for (sizep = dblk_sizes; (size = *sizep) != 0; sizep++) {
368 
369 		if ((offset = (size & PAGEOFFSET)) != 0) {
370 			/*
371 			 * We are in the middle of a page, dblk should
372 			 * be allocated on the same page
373 			 */
374 			tot_size = size + sizeof (dblk_t);
375 			ASSERT((offset + sizeof (dblk_t) + sizeof (kmem_slab_t))
376 			    < PAGESIZE);
377 			ASSERT((tot_size & (DBLK_CACHE_ALIGN - 1)) == 0);
378 
379 		} else {
380 
381 			/*
382 			 * buf size is multiple of page size, dblk and
383 			 * buffer are allocated separately.
384 			 */
385 
386 			ASSERT((size & (DBLK_CACHE_ALIGN - 1)) == 0);
387 			tot_size = sizeof (dblk_t);
388 		}
389 
390 		(void) sprintf(name, "streams_dblk_%ld", size);
391 		cp = kmem_cache_create(name, tot_size, DBLK_CACHE_ALIGN,
392 		    dblk_constructor, dblk_destructor, NULL, (void *)(size),
393 		    NULL, dblk_kmem_flags);
394 
395 		while (lastsize <= size) {
396 			dblk_cache[(lastsize - 1) >> DBLK_SIZE_SHIFT] = cp;
397 			lastsize += DBLK_MIN_SIZE;
398 		}
399 	}
400 
401 	dblk_esb_cache = kmem_cache_create("streams_dblk_esb", sizeof (dblk_t),
402 	    DBLK_CACHE_ALIGN, dblk_esb_constructor, dblk_destructor, NULL,
403 	    (void *)sizeof (dblk_t), NULL, dblk_kmem_flags);
404 	fthdr_cache = kmem_cache_create("streams_fthdr", sizeof (fthdr_t), 32,
405 	    fthdr_constructor, fthdr_destructor, NULL, NULL, NULL, 0);
406 	ftblk_cache = kmem_cache_create("streams_ftblk", sizeof (ftblk_t), 32,
407 	    ftblk_constructor, ftblk_destructor, NULL, NULL, NULL, 0);
408 
409 	/* Initialize Multidata caches */
410 	mmd_init();
411 
412 	/* initialize throttling queue for esballoc */
413 	esballoc_queue_init();
414 }
415 
416 /*ARGSUSED*/
417 mblk_t *
418 allocb(size_t size, uint_t pri)
419 {
420 	dblk_t *dbp;
421 	mblk_t *mp;
422 	size_t index;
423 
424 	index =  (size - 1)  >> DBLK_SIZE_SHIFT;
425 
426 	if (index >= (DBLK_MAX_CACHE >> DBLK_SIZE_SHIFT)) {
427 		if (size != 0) {
428 			mp = allocb_oversize(size, KM_NOSLEEP);
429 			goto out;
430 		}
431 		index = 0;
432 	}
433 
434 	if ((dbp = kmem_cache_alloc(dblk_cache[index], KM_NOSLEEP)) == NULL) {
435 		mp = NULL;
436 		goto out;
437 	}
438 
439 	mp = dbp->db_mblk;
440 	DBLK_RTFU_WORD(dbp) = DBLK_RTFU(1, M_DATA, 0, 0);
441 	mp->b_next = mp->b_prev = mp->b_cont = NULL;
442 	mp->b_rptr = mp->b_wptr = dbp->db_base;
443 	mp->b_queue = NULL;
444 	MBLK_BAND_FLAG_WORD(mp) = 0;
445 	STR_FTALLOC(&dbp->db_fthdr, FTEV_ALLOCB, size);
446 out:
447 	FTRACE_1("allocb(): mp=0x%p", (uintptr_t)mp);
448 
449 	return (mp);
450 }
451 
452 /*
453  * Allocate an mblk taking db_credp and db_cpid from the template.
454  * Allow the cred to be NULL.
455  */
456 mblk_t *
457 allocb_tmpl(size_t size, const mblk_t *tmpl)
458 {
459 	mblk_t *mp = allocb(size, 0);
460 
461 	if (mp != NULL) {
462 		dblk_t *src = tmpl->b_datap;
463 		dblk_t *dst = mp->b_datap;
464 		cred_t *cr;
465 		pid_t cpid;
466 
467 		cr = msg_getcred(tmpl, &cpid);
468 		if (cr != NULL)
469 			crhold(dst->db_credp = cr);
470 		dst->db_cpid = cpid;
471 		dst->db_type = src->db_type;
472 	}
473 	return (mp);
474 }
475 
476 mblk_t *
477 allocb_cred(size_t size, cred_t *cr, pid_t cpid)
478 {
479 	mblk_t *mp = allocb(size, 0);
480 
481 	ASSERT(cr != NULL);
482 	if (mp != NULL) {
483 		dblk_t *dbp = mp->b_datap;
484 
485 		crhold(dbp->db_credp = cr);
486 		dbp->db_cpid = cpid;
487 	}
488 	return (mp);
489 }
490 
491 mblk_t *
492 allocb_cred_wait(size_t size, uint_t flags, int *error, cred_t *cr, pid_t cpid)
493 {
494 	mblk_t *mp = allocb_wait(size, 0, flags, error);
495 
496 	ASSERT(cr != NULL);
497 	if (mp != NULL) {
498 		dblk_t *dbp = mp->b_datap;
499 
500 		crhold(dbp->db_credp = cr);
501 		dbp->db_cpid = cpid;
502 	}
503 
504 	return (mp);
505 }
506 
507 /*
508  * Extract the db_cred (and optionally db_cpid) from a message.
509  * We find the first mblk which has a non-NULL db_cred and use that.
510  * If none found we return NULL.
511  * Does NOT get a hold on the cred.
512  */
513 cred_t *
514 msg_getcred(const mblk_t *mp, pid_t *cpidp)
515 {
516 	cred_t *cr = NULL;
517 	cred_t *cr2;
518 	mblk_t *mp2;
519 
520 	while (mp != NULL) {
521 		dblk_t *dbp = mp->b_datap;
522 
523 		cr = dbp->db_credp;
524 		if (cr == NULL) {
525 			mp = mp->b_cont;
526 			continue;
527 		}
528 		if (cpidp != NULL)
529 			*cpidp = dbp->db_cpid;
530 
531 #ifdef DEBUG
532 		/*
533 		 * Normally there should at most one db_credp in a message.
534 		 * But if there are multiple (as in the case of some M_IOC*
535 		 * and some internal messages in TCP/IP bind logic) then
536 		 * they must be identical in the normal case.
537 		 * However, a socket can be shared between different uids
538 		 * in which case data queued in TCP would be from different
539 		 * creds. Thus we can only assert for the zoneid being the
540 		 * same. Due to Multi-level Level Ports for TX, some
541 		 * cred_t can have a NULL cr_zone, and we skip the comparison
542 		 * in that case.
543 		 */
544 		mp2 = mp->b_cont;
545 		while (mp2 != NULL) {
546 			cr2 = DB_CRED(mp2);
547 			if (cr2 != NULL) {
548 				DTRACE_PROBE2(msg__getcred,
549 				    cred_t *, cr, cred_t *, cr2);
550 				ASSERT(crgetzoneid(cr) == crgetzoneid(cr2) ||
551 				    crgetzone(cr) == NULL ||
552 				    crgetzone(cr2) == NULL);
553 			}
554 			mp2 = mp2->b_cont;
555 		}
556 #endif
557 		return (cr);
558 	}
559 	if (cpidp != NULL)
560 		*cpidp = NOPID;
561 	return (NULL);
562 }
563 
564 /*
565  * Variant of msg_getcred which, when a cred is found
566  * 1. Returns with a hold on the cred
567  * 2. Clears the first cred in the mblk.
568  * This is more efficient to use than a msg_getcred() + crhold() when
569  * the message is freed after the cred has been extracted.
570  *
571  * The caller is responsible for ensuring that there is no other reference
572  * on the message since db_credp can not be cleared when there are other
573  * references.
574  */
575 cred_t *
576 msg_extractcred(mblk_t *mp, pid_t *cpidp)
577 {
578 	cred_t *cr = NULL;
579 	cred_t *cr2;
580 	mblk_t *mp2;
581 
582 	while (mp != NULL) {
583 		dblk_t *dbp = mp->b_datap;
584 
585 		cr = dbp->db_credp;
586 		if (cr == NULL) {
587 			mp = mp->b_cont;
588 			continue;
589 		}
590 		ASSERT(dbp->db_ref == 1);
591 		dbp->db_credp = NULL;
592 		if (cpidp != NULL)
593 			*cpidp = dbp->db_cpid;
594 #ifdef DEBUG
595 		/*
596 		 * Normally there should at most one db_credp in a message.
597 		 * But if there are multiple (as in the case of some M_IOC*
598 		 * and some internal messages in TCP/IP bind logic) then
599 		 * they must be identical in the normal case.
600 		 * However, a socket can be shared between different uids
601 		 * in which case data queued in TCP would be from different
602 		 * creds. Thus we can only assert for the zoneid being the
603 		 * same. Due to Multi-level Level Ports for TX, some
604 		 * cred_t can have a NULL cr_zone, and we skip the comparison
605 		 * in that case.
606 		 */
607 		mp2 = mp->b_cont;
608 		while (mp2 != NULL) {
609 			cr2 = DB_CRED(mp2);
610 			if (cr2 != NULL) {
611 				DTRACE_PROBE2(msg__extractcred,
612 				    cred_t *, cr, cred_t *, cr2);
613 				ASSERT(crgetzoneid(cr) == crgetzoneid(cr2) ||
614 				    crgetzone(cr) == NULL ||
615 				    crgetzone(cr2) == NULL);
616 			}
617 			mp2 = mp2->b_cont;
618 		}
619 #endif
620 		return (cr);
621 	}
622 	return (NULL);
623 }
624 /*
625  * Get the label for a message. Uses the first mblk in the message
626  * which has a non-NULL db_credp.
627  * Returns NULL if there is no credp.
628  */
629 extern struct ts_label_s *
630 msg_getlabel(const mblk_t *mp)
631 {
632 	cred_t *cr = msg_getcred(mp, NULL);
633 
634 	if (cr == NULL)
635 		return (NULL);
636 
637 	return (crgetlabel(cr));
638 }
639 
640 void
641 freeb(mblk_t *mp)
642 {
643 	dblk_t *dbp = mp->b_datap;
644 
645 	ASSERT(dbp->db_ref > 0);
646 	ASSERT(mp->b_next == NULL && mp->b_prev == NULL);
647 	FTRACE_1("freeb(): mp=0x%lx", (uintptr_t)mp);
648 
649 	STR_FTEVENT_MBLK(mp, caller(), FTEV_FREEB, dbp->db_ref);
650 
651 	dbp->db_free(mp, dbp);
652 }
653 
654 void
655 freemsg(mblk_t *mp)
656 {
657 	FTRACE_1("freemsg(): mp=0x%lx", (uintptr_t)mp);
658 	while (mp) {
659 		dblk_t *dbp = mp->b_datap;
660 		mblk_t *mp_cont = mp->b_cont;
661 
662 		ASSERT(dbp->db_ref > 0);
663 		ASSERT(mp->b_next == NULL && mp->b_prev == NULL);
664 
665 		STR_FTEVENT_MBLK(mp, caller(), FTEV_FREEB, dbp->db_ref);
666 
667 		dbp->db_free(mp, dbp);
668 		mp = mp_cont;
669 	}
670 }
671 
672 /*
673  * Reallocate a block for another use.  Try hard to use the old block.
674  * If the old data is wanted (copy), leave b_wptr at the end of the data,
675  * otherwise return b_wptr = b_rptr.
676  *
677  * This routine is private and unstable.
678  */
679 mblk_t	*
680 reallocb(mblk_t *mp, size_t size, uint_t copy)
681 {
682 	mblk_t		*mp1;
683 	unsigned char	*old_rptr;
684 	ptrdiff_t	cur_size;
685 
686 	if (mp == NULL)
687 		return (allocb(size, BPRI_HI));
688 
689 	cur_size = mp->b_wptr - mp->b_rptr;
690 	old_rptr = mp->b_rptr;
691 
692 	ASSERT(mp->b_datap->db_ref != 0);
693 
694 	if (mp->b_datap->db_ref == 1 && MBLKSIZE(mp) >= size) {
695 		/*
696 		 * If the data is wanted and it will fit where it is, no
697 		 * work is required.
698 		 */
699 		if (copy && mp->b_datap->db_lim - mp->b_rptr >= size)
700 			return (mp);
701 
702 		mp->b_wptr = mp->b_rptr = mp->b_datap->db_base;
703 		mp1 = mp;
704 	} else if ((mp1 = allocb_tmpl(size, mp)) != NULL) {
705 		/* XXX other mp state could be copied too, db_flags ... ? */
706 		mp1->b_cont = mp->b_cont;
707 	} else {
708 		return (NULL);
709 	}
710 
711 	if (copy) {
712 		bcopy(old_rptr, mp1->b_rptr, cur_size);
713 		mp1->b_wptr = mp1->b_rptr + cur_size;
714 	}
715 
716 	if (mp != mp1)
717 		freeb(mp);
718 
719 	return (mp1);
720 }
721 
722 static void
723 dblk_lastfree(mblk_t *mp, dblk_t *dbp)
724 {
725 	ASSERT(dbp->db_mblk == mp);
726 	if (dbp->db_fthdr != NULL)
727 		str_ftfree(dbp);
728 
729 	/* set credp and projid to be 'unspecified' before returning to cache */
730 	if (dbp->db_credp != NULL) {
731 		crfree(dbp->db_credp);
732 		dbp->db_credp = NULL;
733 	}
734 	dbp->db_cpid = -1;
735 
736 	/* Reset the struioflag and the checksum flag fields */
737 	dbp->db_struioflag = 0;
738 	dbp->db_struioun.cksum.flags = 0;
739 
740 	/* and the COOKED and/or UIOA flag(s) */
741 	dbp->db_flags &= ~(DBLK_COOKED | DBLK_UIOA);
742 
743 	kmem_cache_free(dbp->db_cache, dbp);
744 }
745 
746 static void
747 dblk_decref(mblk_t *mp, dblk_t *dbp)
748 {
749 	if (dbp->db_ref != 1) {
750 		uint32_t rtfu = atomic_add_32_nv(&DBLK_RTFU_WORD(dbp),
751 		    -(1 << DBLK_RTFU_SHIFT(db_ref)));
752 		/*
753 		 * atomic_add_32_nv() just decremented db_ref, so we no longer
754 		 * have a reference to the dblk, which means another thread
755 		 * could free it.  Therefore we cannot examine the dblk to
756 		 * determine whether ours was the last reference.  Instead,
757 		 * we extract the new and minimum reference counts from rtfu.
758 		 * Note that all we're really saying is "if (ref != refmin)".
759 		 */
760 		if (((rtfu >> DBLK_RTFU_SHIFT(db_ref)) & DBLK_REFMAX) !=
761 		    ((rtfu >> DBLK_RTFU_SHIFT(db_flags)) & DBLK_REFMIN)) {
762 			kmem_cache_free(mblk_cache, mp);
763 			return;
764 		}
765 	}
766 	dbp->db_mblk = mp;
767 	dbp->db_free = dbp->db_lastfree;
768 	dbp->db_lastfree(mp, dbp);
769 }
770 
771 mblk_t *
772 dupb(mblk_t *mp)
773 {
774 	dblk_t *dbp = mp->b_datap;
775 	mblk_t *new_mp;
776 	uint32_t oldrtfu, newrtfu;
777 
778 	if ((new_mp = kmem_cache_alloc(mblk_cache, KM_NOSLEEP)) == NULL)
779 		goto out;
780 
781 	new_mp->b_next = new_mp->b_prev = new_mp->b_cont = NULL;
782 	new_mp->b_rptr = mp->b_rptr;
783 	new_mp->b_wptr = mp->b_wptr;
784 	new_mp->b_datap = dbp;
785 	new_mp->b_queue = NULL;
786 	MBLK_BAND_FLAG_WORD(new_mp) = MBLK_BAND_FLAG_WORD(mp);
787 
788 	STR_FTEVENT_MBLK(mp, caller(), FTEV_DUPB, dbp->db_ref);
789 
790 	dbp->db_free = dblk_decref;
791 	do {
792 		ASSERT(dbp->db_ref > 0);
793 		oldrtfu = DBLK_RTFU_WORD(dbp);
794 		newrtfu = oldrtfu + (1 << DBLK_RTFU_SHIFT(db_ref));
795 		/*
796 		 * If db_ref is maxed out we can't dup this message anymore.
797 		 */
798 		if ((oldrtfu & DBLK_RTFU_REF_MASK) == DBLK_RTFU_REF_MASK) {
799 			kmem_cache_free(mblk_cache, new_mp);
800 			new_mp = NULL;
801 			goto out;
802 		}
803 	} while (atomic_cas_32(&DBLK_RTFU_WORD(dbp), oldrtfu, newrtfu) !=
804 	    oldrtfu);
805 
806 out:
807 	FTRACE_1("dupb(): new_mp=0x%lx", (uintptr_t)new_mp);
808 	return (new_mp);
809 }
810 
811 static void
812 dblk_lastfree_desb(mblk_t *mp, dblk_t *dbp)
813 {
814 	frtn_t *frp = dbp->db_frtnp;
815 
816 	ASSERT(dbp->db_mblk == mp);
817 	frp->free_func(frp->free_arg);
818 	if (dbp->db_fthdr != NULL)
819 		str_ftfree(dbp);
820 
821 	/* set credp and projid to be 'unspecified' before returning to cache */
822 	if (dbp->db_credp != NULL) {
823 		crfree(dbp->db_credp);
824 		dbp->db_credp = NULL;
825 	}
826 	dbp->db_cpid = -1;
827 	dbp->db_struioflag = 0;
828 	dbp->db_struioun.cksum.flags = 0;
829 
830 	kmem_cache_free(dbp->db_cache, dbp);
831 }
832 
833 /*ARGSUSED*/
834 static void
835 frnop_func(void *arg)
836 {
837 }
838 
839 /*
840  * Generic esballoc used to implement the four flavors: [d]esballoc[a].
841  *
842  * The variants with a 'd' prefix (desballoc, desballoca)
843  *	directly free the mblk when it loses its last ref,
844  *	where the other variants free asynchronously.
845  * The variants with an 'a' suffix (esballoca, desballoca)
846  *	add an extra ref, effectively letting the streams subsystem
847  *	know that the message data should not be modified.
848  *	(eg. see db_ref checks in reallocb and elsewhere)
849  *
850  * The method used by the 'a' suffix functions to keep the dblk
851  * db_ref > 1 is non-obvious.  The macro DBLK_RTFU(2,...) passed to
852  * gesballoc sets the initial db_ref = 2 and sets the DBLK_REFMIN
853  * bit in db_flags.  In dblk_decref() that flag essentially means
854  * the dblk has one extra ref, so the "last ref" is one, not zero.
855  */
856 static mblk_t *
857 gesballoc(unsigned char *base, size_t size, uint32_t db_rtfu, frtn_t *frp,
858     void (*lastfree)(mblk_t *, dblk_t *), int kmflags)
859 {
860 	dblk_t *dbp;
861 	mblk_t *mp;
862 
863 	ASSERT(base != NULL && frp != NULL);
864 
865 	if ((dbp = kmem_cache_alloc(dblk_esb_cache, kmflags)) == NULL) {
866 		mp = NULL;
867 		goto out;
868 	}
869 
870 	mp = dbp->db_mblk;
871 	dbp->db_base = base;
872 	dbp->db_lim = base + size;
873 	dbp->db_free = dbp->db_lastfree = lastfree;
874 	dbp->db_frtnp = frp;
875 	DBLK_RTFU_WORD(dbp) = db_rtfu;
876 	mp->b_next = mp->b_prev = mp->b_cont = NULL;
877 	mp->b_rptr = mp->b_wptr = base;
878 	mp->b_queue = NULL;
879 	MBLK_BAND_FLAG_WORD(mp) = 0;
880 
881 out:
882 	FTRACE_1("gesballoc(): mp=0x%lx", (uintptr_t)mp);
883 	return (mp);
884 }
885 
886 /*ARGSUSED*/
887 mblk_t *
888 esballoc(unsigned char *base, size_t size, uint_t pri, frtn_t *frp)
889 {
890 	mblk_t *mp;
891 
892 	/*
893 	 * Note that this is structured to allow the common case (i.e.
894 	 * STREAMS flowtracing disabled) to call gesballoc() with tail
895 	 * call optimization.
896 	 */
897 	if (!str_ftnever) {
898 		mp = gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
899 		    frp, freebs_enqueue, KM_NOSLEEP);
900 
901 		if (mp != NULL)
902 			STR_FTALLOC(&DB_FTHDR(mp), FTEV_ESBALLOC, size);
903 		return (mp);
904 	}
905 
906 	return (gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
907 	    frp, freebs_enqueue, KM_NOSLEEP));
908 }
909 
910 /*
911  * Same as esballoc() but sleeps waiting for memory.
912  */
913 /*ARGSUSED*/
914 mblk_t *
915 esballoc_wait(unsigned char *base, size_t size, uint_t pri, frtn_t *frp)
916 {
917 	mblk_t *mp;
918 
919 	/*
920 	 * Note that this is structured to allow the common case (i.e.
921 	 * STREAMS flowtracing disabled) to call gesballoc() with tail
922 	 * call optimization.
923 	 */
924 	if (!str_ftnever) {
925 		mp = gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
926 		    frp, freebs_enqueue, KM_SLEEP);
927 
928 		STR_FTALLOC(&DB_FTHDR(mp), FTEV_ESBALLOC, size);
929 		return (mp);
930 	}
931 
932 	return (gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
933 	    frp, freebs_enqueue, KM_SLEEP));
934 }
935 
936 /*ARGSUSED*/
937 mblk_t *
938 desballoc(unsigned char *base, size_t size, uint_t pri, frtn_t *frp)
939 {
940 	mblk_t *mp;
941 
942 	/*
943 	 * Note that this is structured to allow the common case (i.e.
944 	 * STREAMS flowtracing disabled) to call gesballoc() with tail
945 	 * call optimization.
946 	 */
947 	if (!str_ftnever) {
948 		mp = gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
949 		    frp, dblk_lastfree_desb, KM_NOSLEEP);
950 
951 		if (mp != NULL)
952 			STR_FTALLOC(&DB_FTHDR(mp), FTEV_DESBALLOC, size);
953 		return (mp);
954 	}
955 
956 	return (gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
957 	    frp, dblk_lastfree_desb, KM_NOSLEEP));
958 }
959 
960 /*ARGSUSED*/
961 mblk_t *
962 esballoca(unsigned char *base, size_t size, uint_t pri, frtn_t *frp)
963 {
964 	mblk_t *mp;
965 
966 	/*
967 	 * Note that this is structured to allow the common case (i.e.
968 	 * STREAMS flowtracing disabled) to call gesballoc() with tail
969 	 * call optimization.
970 	 */
971 	if (!str_ftnever) {
972 		mp = gesballoc(base, size, DBLK_RTFU(2, M_DATA, 0, 0),
973 		    frp, freebs_enqueue, KM_NOSLEEP);
974 
975 		if (mp != NULL)
976 			STR_FTALLOC(&DB_FTHDR(mp), FTEV_ESBALLOCA, size);
977 		return (mp);
978 	}
979 
980 	return (gesballoc(base, size, DBLK_RTFU(2, M_DATA, 0, 0),
981 	    frp, freebs_enqueue, KM_NOSLEEP));
982 }
983 
984 /*ARGSUSED*/
985 mblk_t *
986 desballoca(unsigned char *base, size_t size, uint_t pri, frtn_t *frp)
987 {
988 	mblk_t *mp;
989 
990 	/*
991 	 * Note that this is structured to allow the common case (i.e.
992 	 * STREAMS flowtracing disabled) to call gesballoc() with tail
993 	 * call optimization.
994 	 */
995 	if (!str_ftnever) {
996 		mp = gesballoc(base, size, DBLK_RTFU(2, M_DATA, 0, 0),
997 		    frp, dblk_lastfree_desb, KM_NOSLEEP);
998 
999 		if (mp != NULL)
1000 			STR_FTALLOC(&DB_FTHDR(mp), FTEV_DESBALLOCA, size);
1001 		return (mp);
1002 	}
1003 
1004 	return (gesballoc(base, size, DBLK_RTFU(2, M_DATA, 0, 0),
1005 	    frp, dblk_lastfree_desb, KM_NOSLEEP));
1006 }
1007 
1008 static void
1009 bcache_dblk_lastfree(mblk_t *mp, dblk_t *dbp)
1010 {
1011 	bcache_t *bcp = dbp->db_cache;
1012 
1013 	ASSERT(dbp->db_mblk == mp);
1014 	if (dbp->db_fthdr != NULL)
1015 		str_ftfree(dbp);
1016 
1017 	/* set credp and projid to be 'unspecified' before returning to cache */
1018 	if (dbp->db_credp != NULL) {
1019 		crfree(dbp->db_credp);
1020 		dbp->db_credp = NULL;
1021 	}
1022 	dbp->db_cpid = -1;
1023 	dbp->db_struioflag = 0;
1024 	dbp->db_struioun.cksum.flags = 0;
1025 
1026 	mutex_enter(&bcp->mutex);
1027 	kmem_cache_free(bcp->dblk_cache, dbp);
1028 	bcp->alloc--;
1029 
1030 	if (bcp->alloc == 0 && bcp->destroy != 0) {
1031 		kmem_cache_destroy(bcp->dblk_cache);
1032 		kmem_cache_destroy(bcp->buffer_cache);
1033 		mutex_exit(&bcp->mutex);
1034 		mutex_destroy(&bcp->mutex);
1035 		kmem_free(bcp, sizeof (bcache_t));
1036 	} else {
1037 		mutex_exit(&bcp->mutex);
1038 	}
1039 }
1040 
1041 bcache_t *
1042 bcache_create(char *name, size_t size, uint_t align)
1043 {
1044 	bcache_t *bcp;
1045 	char buffer[255];
1046 
1047 	ASSERT((align & (align - 1)) == 0);
1048 
1049 	if ((bcp = kmem_alloc(sizeof (bcache_t), KM_NOSLEEP)) == NULL)
1050 		return (NULL);
1051 
1052 	bcp->size = size;
1053 	bcp->align = align;
1054 	bcp->alloc = 0;
1055 	bcp->destroy = 0;
1056 
1057 	mutex_init(&bcp->mutex, NULL, MUTEX_DRIVER, NULL);
1058 
1059 	(void) sprintf(buffer, "%s_buffer_cache", name);
1060 	bcp->buffer_cache = kmem_cache_create(buffer, size, align, NULL, NULL,
1061 	    NULL, NULL, NULL, 0);
1062 	(void) sprintf(buffer, "%s_dblk_cache", name);
1063 	bcp->dblk_cache = kmem_cache_create(buffer, sizeof (dblk_t),
1064 	    DBLK_CACHE_ALIGN, bcache_dblk_constructor, bcache_dblk_destructor,
1065 	    NULL, (void *)bcp, NULL, 0);
1066 
1067 	return (bcp);
1068 }
1069 
1070 void
1071 bcache_destroy(bcache_t *bcp)
1072 {
1073 	ASSERT(bcp != NULL);
1074 
1075 	mutex_enter(&bcp->mutex);
1076 	if (bcp->alloc == 0) {
1077 		kmem_cache_destroy(bcp->dblk_cache);
1078 		kmem_cache_destroy(bcp->buffer_cache);
1079 		mutex_exit(&bcp->mutex);
1080 		mutex_destroy(&bcp->mutex);
1081 		kmem_free(bcp, sizeof (bcache_t));
1082 	} else {
1083 		bcp->destroy++;
1084 		mutex_exit(&bcp->mutex);
1085 	}
1086 }
1087 
1088 /*ARGSUSED*/
1089 mblk_t *
1090 bcache_allocb(bcache_t *bcp, uint_t pri)
1091 {
1092 	dblk_t *dbp;
1093 	mblk_t *mp = NULL;
1094 
1095 	ASSERT(bcp != NULL);
1096 
1097 	mutex_enter(&bcp->mutex);
1098 	if (bcp->destroy != 0) {
1099 		mutex_exit(&bcp->mutex);
1100 		goto out;
1101 	}
1102 
1103 	if ((dbp = kmem_cache_alloc(bcp->dblk_cache, KM_NOSLEEP)) == NULL) {
1104 		mutex_exit(&bcp->mutex);
1105 		goto out;
1106 	}
1107 	bcp->alloc++;
1108 	mutex_exit(&bcp->mutex);
1109 
1110 	ASSERT(((uintptr_t)(dbp->db_base) & (bcp->align - 1)) == 0);
1111 
1112 	mp = dbp->db_mblk;
1113 	DBLK_RTFU_WORD(dbp) = DBLK_RTFU(1, M_DATA, 0, 0);
1114 	mp->b_next = mp->b_prev = mp->b_cont = NULL;
1115 	mp->b_rptr = mp->b_wptr = dbp->db_base;
1116 	mp->b_queue = NULL;
1117 	MBLK_BAND_FLAG_WORD(mp) = 0;
1118 	STR_FTALLOC(&dbp->db_fthdr, FTEV_BCALLOCB, bcp->size);
1119 out:
1120 	FTRACE_1("bcache_allocb(): mp=0x%p", (uintptr_t)mp);
1121 
1122 	return (mp);
1123 }
1124 
1125 static void
1126 dblk_lastfree_oversize(mblk_t *mp, dblk_t *dbp)
1127 {
1128 	ASSERT(dbp->db_mblk == mp);
1129 	if (dbp->db_fthdr != NULL)
1130 		str_ftfree(dbp);
1131 
1132 	/* set credp and projid to be 'unspecified' before returning to cache */
1133 	if (dbp->db_credp != NULL) {
1134 		crfree(dbp->db_credp);
1135 		dbp->db_credp = NULL;
1136 	}
1137 	dbp->db_cpid = -1;
1138 	dbp->db_struioflag = 0;
1139 	dbp->db_struioun.cksum.flags = 0;
1140 
1141 	kmem_free(dbp->db_base, dbp->db_lim - dbp->db_base);
1142 	kmem_cache_free(dbp->db_cache, dbp);
1143 }
1144 
1145 static mblk_t *
1146 allocb_oversize(size_t size, int kmflags)
1147 {
1148 	mblk_t *mp;
1149 	void *buf;
1150 
1151 	size = P2ROUNDUP(size, DBLK_CACHE_ALIGN);
1152 	if ((buf = kmem_alloc(size, kmflags)) == NULL)
1153 		return (NULL);
1154 	if ((mp = gesballoc(buf, size, DBLK_RTFU(1, M_DATA, 0, 0),
1155 	    &frnop, dblk_lastfree_oversize, kmflags)) == NULL)
1156 		kmem_free(buf, size);
1157 
1158 	if (mp != NULL)
1159 		STR_FTALLOC(&DB_FTHDR(mp), FTEV_ALLOCBIG, size);
1160 
1161 	return (mp);
1162 }
1163 
1164 mblk_t *
1165 allocb_tryhard(size_t target_size)
1166 {
1167 	size_t size;
1168 	mblk_t *bp;
1169 
1170 	for (size = target_size; size < target_size + 512;
1171 	    size += DBLK_CACHE_ALIGN)
1172 		if ((bp = allocb(size, BPRI_HI)) != NULL)
1173 			return (bp);
1174 	allocb_tryhard_fails++;
1175 	return (NULL);
1176 }
1177 
1178 /*
1179  * This routine is consolidation private for STREAMS internal use
1180  * This routine may only be called from sync routines (i.e., not
1181  * from put or service procedures).  It is located here (rather
1182  * than strsubr.c) so that we don't have to expose all of the
1183  * allocb() implementation details in header files.
1184  */
1185 mblk_t *
1186 allocb_wait(size_t size, uint_t pri, uint_t flags, int *error)
1187 {
1188 	dblk_t *dbp;
1189 	mblk_t *mp;
1190 	size_t index;
1191 
1192 	index = (size -1) >> DBLK_SIZE_SHIFT;
1193 
1194 	if (flags & STR_NOSIG) {
1195 		if (index >= (DBLK_MAX_CACHE >> DBLK_SIZE_SHIFT)) {
1196 			if (size != 0) {
1197 				mp = allocb_oversize(size, KM_SLEEP);
1198 				FTRACE_1("allocb_wait (NOSIG): mp=0x%lx",
1199 				    (uintptr_t)mp);
1200 				return (mp);
1201 			}
1202 			index = 0;
1203 		}
1204 
1205 		dbp = kmem_cache_alloc(dblk_cache[index], KM_SLEEP);
1206 		mp = dbp->db_mblk;
1207 		DBLK_RTFU_WORD(dbp) = DBLK_RTFU(1, M_DATA, 0, 0);
1208 		mp->b_next = mp->b_prev = mp->b_cont = NULL;
1209 		mp->b_rptr = mp->b_wptr = dbp->db_base;
1210 		mp->b_queue = NULL;
1211 		MBLK_BAND_FLAG_WORD(mp) = 0;
1212 		STR_FTALLOC(&DB_FTHDR(mp), FTEV_ALLOCBW, size);
1213 
1214 		FTRACE_1("allocb_wait (NOSIG): mp=0x%lx", (uintptr_t)mp);
1215 
1216 	} else {
1217 		while ((mp = allocb(size, pri)) == NULL) {
1218 			if ((*error = strwaitbuf(size, BPRI_HI)) != 0)
1219 				return (NULL);
1220 		}
1221 	}
1222 
1223 	return (mp);
1224 }
1225 
1226 /*
1227  * Call function 'func' with 'arg' when a class zero block can
1228  * be allocated with priority 'pri'.
1229  */
1230 bufcall_id_t
1231 esbbcall(uint_t pri, void (*func)(void *), void *arg)
1232 {
1233 	return (bufcall(1, pri, func, arg));
1234 }
1235 
1236 /*
1237  * Allocates an iocblk (M_IOCTL) block. Properly sets the credentials
1238  * ioc_id, rval and error of the struct ioctl to set up an ioctl call.
1239  * This provides consistency for all internal allocators of ioctl.
1240  */
1241 mblk_t *
1242 mkiocb(uint_t cmd)
1243 {
1244 	struct iocblk	*ioc;
1245 	mblk_t		*mp;
1246 
1247 	/*
1248 	 * Allocate enough space for any of the ioctl related messages.
1249 	 */
1250 	if ((mp = allocb(sizeof (union ioctypes), BPRI_MED)) == NULL)
1251 		return (NULL);
1252 
1253 	bzero(mp->b_rptr, sizeof (union ioctypes));
1254 
1255 	/*
1256 	 * Set the mblk_t information and ptrs correctly.
1257 	 */
1258 	mp->b_wptr += sizeof (struct iocblk);
1259 	mp->b_datap->db_type = M_IOCTL;
1260 
1261 	/*
1262 	 * Fill in the fields.
1263 	 */
1264 	ioc		= (struct iocblk *)mp->b_rptr;
1265 	ioc->ioc_cmd	= cmd;
1266 	ioc->ioc_cr	= kcred;
1267 	ioc->ioc_id	= getiocseqno();
1268 	ioc->ioc_flag	= IOC_NATIVE;
1269 	return (mp);
1270 }
1271 
1272 /*
1273  * test if block of given size can be allocated with a request of
1274  * the given priority.
1275  * 'pri' is no longer used, but is retained for compatibility.
1276  */
1277 /* ARGSUSED */
1278 int
1279 testb(size_t size, uint_t pri)
1280 {
1281 	return ((size + sizeof (dblk_t)) <= kmem_avail());
1282 }
1283 
1284 /*
1285  * Call function 'func' with argument 'arg' when there is a reasonably
1286  * good chance that a block of size 'size' can be allocated.
1287  * 'pri' is no longer used, but is retained for compatibility.
1288  */
1289 /* ARGSUSED */
1290 bufcall_id_t
1291 bufcall(size_t size, uint_t pri, void (*func)(void *), void *arg)
1292 {
1293 	static long bid = 1;	/* always odd to save checking for zero */
1294 	bufcall_id_t bc_id;
1295 	struct strbufcall *bcp;
1296 
1297 	if ((bcp = kmem_alloc(sizeof (strbufcall_t), KM_NOSLEEP)) == NULL)
1298 		return (0);
1299 
1300 	bcp->bc_func = func;
1301 	bcp->bc_arg = arg;
1302 	bcp->bc_size = size;
1303 	bcp->bc_next = NULL;
1304 	bcp->bc_executor = NULL;
1305 
1306 	mutex_enter(&strbcall_lock);
1307 	/*
1308 	 * After bcp is linked into strbcalls and strbcall_lock is dropped there
1309 	 * should be no references to bcp since it may be freed by
1310 	 * runbufcalls(). Since bcp_id field is returned, we save its value in
1311 	 * the local var.
1312 	 */
1313 	bc_id = bcp->bc_id = (bufcall_id_t)(bid += 2);	/* keep it odd */
1314 
1315 	/*
1316 	 * add newly allocated stream event to existing
1317 	 * linked list of events.
1318 	 */
1319 	if (strbcalls.bc_head == NULL) {
1320 		strbcalls.bc_head = strbcalls.bc_tail = bcp;
1321 	} else {
1322 		strbcalls.bc_tail->bc_next = bcp;
1323 		strbcalls.bc_tail = bcp;
1324 	}
1325 
1326 	cv_signal(&strbcall_cv);
1327 	mutex_exit(&strbcall_lock);
1328 	return (bc_id);
1329 }
1330 
1331 /*
1332  * Cancel a bufcall request.
1333  */
1334 void
1335 unbufcall(bufcall_id_t id)
1336 {
1337 	strbufcall_t *bcp, *pbcp;
1338 
1339 	mutex_enter(&strbcall_lock);
1340 again:
1341 	pbcp = NULL;
1342 	for (bcp = strbcalls.bc_head; bcp; bcp = bcp->bc_next) {
1343 		if (id == bcp->bc_id)
1344 			break;
1345 		pbcp = bcp;
1346 	}
1347 	if (bcp) {
1348 		if (bcp->bc_executor != NULL) {
1349 			if (bcp->bc_executor != curthread) {
1350 				cv_wait(&bcall_cv, &strbcall_lock);
1351 				goto again;
1352 			}
1353 		} else {
1354 			if (pbcp)
1355 				pbcp->bc_next = bcp->bc_next;
1356 			else
1357 				strbcalls.bc_head = bcp->bc_next;
1358 			if (bcp == strbcalls.bc_tail)
1359 				strbcalls.bc_tail = pbcp;
1360 			kmem_free(bcp, sizeof (strbufcall_t));
1361 		}
1362 	}
1363 	mutex_exit(&strbcall_lock);
1364 }
1365 
1366 /*
1367  * Duplicate a message block by block (uses dupb), returning
1368  * a pointer to the duplicate message.
1369  * Returns a non-NULL value only if the entire message
1370  * was dup'd.
1371  */
1372 mblk_t *
1373 dupmsg(mblk_t *bp)
1374 {
1375 	mblk_t *head, *nbp;
1376 
1377 	if (!bp || !(nbp = head = dupb(bp)))
1378 		return (NULL);
1379 
1380 	while (bp->b_cont) {
1381 		if (!(nbp->b_cont = dupb(bp->b_cont))) {
1382 			freemsg(head);
1383 			return (NULL);
1384 		}
1385 		nbp = nbp->b_cont;
1386 		bp = bp->b_cont;
1387 	}
1388 	return (head);
1389 }
1390 
1391 #define	DUPB_NOLOAN(bp) \
1392 	((((bp)->b_datap->db_struioflag & STRUIO_ZC) != 0) ? \
1393 	copyb((bp)) : dupb((bp)))
1394 
1395 mblk_t *
1396 dupmsg_noloan(mblk_t *bp)
1397 {
1398 	mblk_t *head, *nbp;
1399 
1400 	if (bp == NULL || DB_TYPE(bp) != M_DATA ||
1401 	    ((nbp = head = DUPB_NOLOAN(bp)) == NULL))
1402 		return (NULL);
1403 
1404 	while (bp->b_cont) {
1405 		if ((nbp->b_cont = DUPB_NOLOAN(bp->b_cont)) == NULL) {
1406 			freemsg(head);
1407 			return (NULL);
1408 		}
1409 		nbp = nbp->b_cont;
1410 		bp = bp->b_cont;
1411 	}
1412 	return (head);
1413 }
1414 
1415 /*
1416  * Copy data from message and data block to newly allocated message and
1417  * data block. Returns new message block pointer, or NULL if error.
1418  * The alignment of rptr (w.r.t. word alignment) will be the same in the copy
1419  * as in the original even when db_base is not word aligned. (bug 1052877)
1420  */
1421 mblk_t *
1422 copyb(mblk_t *bp)
1423 {
1424 	mblk_t	*nbp;
1425 	dblk_t	*dp, *ndp;
1426 	uchar_t *base;
1427 	size_t	size;
1428 	size_t	unaligned;
1429 
1430 	ASSERT(bp->b_wptr >= bp->b_rptr);
1431 
1432 	dp = bp->b_datap;
1433 	if (dp->db_fthdr != NULL)
1434 		STR_FTEVENT_MBLK(bp, caller(), FTEV_COPYB, 0);
1435 
1436 	/*
1437 	 * Special handling for Multidata message; this should be
1438 	 * removed once a copy-callback routine is made available.
1439 	 */
1440 	if (dp->db_type == M_MULTIDATA) {
1441 		cred_t *cr;
1442 
1443 		if ((nbp = mmd_copy(bp, KM_NOSLEEP)) == NULL)
1444 			return (NULL);
1445 
1446 		nbp->b_flag = bp->b_flag;
1447 		nbp->b_band = bp->b_band;
1448 		ndp = nbp->b_datap;
1449 
1450 		/* See comments below on potential issues. */
1451 		STR_FTEVENT_MBLK(nbp, caller(), FTEV_COPYB, 1);
1452 
1453 		ASSERT(ndp->db_type == dp->db_type);
1454 		cr = dp->db_credp;
1455 		if (cr != NULL)
1456 			crhold(ndp->db_credp = cr);
1457 		ndp->db_cpid = dp->db_cpid;
1458 		return (nbp);
1459 	}
1460 
1461 	size = dp->db_lim - dp->db_base;
1462 	unaligned = P2PHASE((uintptr_t)dp->db_base, sizeof (uint_t));
1463 	if ((nbp = allocb_tmpl(size + unaligned, bp)) == NULL)
1464 		return (NULL);
1465 	nbp->b_flag = bp->b_flag;
1466 	nbp->b_band = bp->b_band;
1467 	ndp = nbp->b_datap;
1468 
1469 	/*
1470 	 * Copy the various checksum information that came in
1471 	 * originally.
1472 	 */
1473 	ndp->db_cksumstart = dp->db_cksumstart;
1474 	ndp->db_cksumend = dp->db_cksumend;
1475 	ndp->db_cksumstuff = dp->db_cksumstuff;
1476 	bcopy(dp->db_struioun.data, ndp->db_struioun.data,
1477 	    sizeof (dp->db_struioun.data));
1478 
1479 	/*
1480 	 * Well, here is a potential issue.  If we are trying to
1481 	 * trace a flow, and we copy the message, we might lose
1482 	 * information about where this message might have been.
1483 	 * So we should inherit the FT data.  On the other hand,
1484 	 * a user might be interested only in alloc to free data.
1485 	 * So I guess the real answer is to provide a tunable.
1486 	 */
1487 	STR_FTEVENT_MBLK(nbp, caller(), FTEV_COPYB, 1);
1488 
1489 	base = ndp->db_base + unaligned;
1490 	bcopy(dp->db_base, ndp->db_base + unaligned, size);
1491 
1492 	nbp->b_rptr = base + (bp->b_rptr - dp->db_base);
1493 	nbp->b_wptr = nbp->b_rptr + MBLKL(bp);
1494 
1495 	return (nbp);
1496 }
1497 
1498 /*
1499  * Copy data from message to newly allocated message using new
1500  * data blocks.  Returns a pointer to the new message, or NULL if error.
1501  */
1502 mblk_t *
1503 copymsg(mblk_t *bp)
1504 {
1505 	mblk_t *head, *nbp;
1506 
1507 	if (!bp || !(nbp = head = copyb(bp)))
1508 		return (NULL);
1509 
1510 	while (bp->b_cont) {
1511 		if (!(nbp->b_cont = copyb(bp->b_cont))) {
1512 			freemsg(head);
1513 			return (NULL);
1514 		}
1515 		nbp = nbp->b_cont;
1516 		bp = bp->b_cont;
1517 	}
1518 	return (head);
1519 }
1520 
1521 /*
1522  * link a message block to tail of message
1523  */
1524 void
1525 linkb(mblk_t *mp, mblk_t *bp)
1526 {
1527 	ASSERT(mp && bp);
1528 
1529 	for (; mp->b_cont; mp = mp->b_cont)
1530 		;
1531 	mp->b_cont = bp;
1532 }
1533 
1534 /*
1535  * unlink a message block from head of message
1536  * return pointer to new message.
1537  * NULL if message becomes empty.
1538  */
1539 mblk_t *
1540 unlinkb(mblk_t *bp)
1541 {
1542 	mblk_t *bp1;
1543 
1544 	bp1 = bp->b_cont;
1545 	bp->b_cont = NULL;
1546 	return (bp1);
1547 }
1548 
1549 /*
1550  * remove a message block "bp" from message "mp"
1551  *
1552  * Return pointer to new message or NULL if no message remains.
1553  * Return -1 if bp is not found in message.
1554  */
1555 mblk_t *
1556 rmvb(mblk_t *mp, mblk_t *bp)
1557 {
1558 	mblk_t *tmp;
1559 	mblk_t *lastp = NULL;
1560 
1561 	ASSERT(mp && bp);
1562 	for (tmp = mp; tmp; tmp = tmp->b_cont) {
1563 		if (tmp == bp) {
1564 			if (lastp)
1565 				lastp->b_cont = tmp->b_cont;
1566 			else
1567 				mp = tmp->b_cont;
1568 			tmp->b_cont = NULL;
1569 			return (mp);
1570 		}
1571 		lastp = tmp;
1572 	}
1573 	return ((mblk_t *)-1);
1574 }
1575 
1576 /*
1577  * Concatenate and align first len bytes of common
1578  * message type.  Len == -1, means concat everything.
1579  * Returns 1 on success, 0 on failure
1580  * After the pullup, mp points to the pulled up data.
1581  */
1582 int
1583 pullupmsg(mblk_t *mp, ssize_t len)
1584 {
1585 	mblk_t *bp, *b_cont;
1586 	dblk_t *dbp;
1587 	ssize_t n;
1588 
1589 	ASSERT(mp->b_datap->db_ref > 0);
1590 	ASSERT(mp->b_next == NULL && mp->b_prev == NULL);
1591 
1592 	/*
1593 	 * We won't handle Multidata message, since it contains
1594 	 * metadata which this function has no knowledge of; we
1595 	 * assert on DEBUG, and return failure otherwise.
1596 	 */
1597 	ASSERT(mp->b_datap->db_type != M_MULTIDATA);
1598 	if (mp->b_datap->db_type == M_MULTIDATA)
1599 		return (0);
1600 
1601 	if (len == -1) {
1602 		if (mp->b_cont == NULL && str_aligned(mp->b_rptr))
1603 			return (1);
1604 		len = xmsgsize(mp);
1605 	} else {
1606 		ssize_t first_mblk_len = mp->b_wptr - mp->b_rptr;
1607 		ASSERT(first_mblk_len >= 0);
1608 		/*
1609 		 * If the length is less than that of the first mblk,
1610 		 * we want to pull up the message into an aligned mblk.
1611 		 * Though not part of the spec, some callers assume it.
1612 		 */
1613 		if (len <= first_mblk_len) {
1614 			if (str_aligned(mp->b_rptr))
1615 				return (1);
1616 			len = first_mblk_len;
1617 		} else if (xmsgsize(mp) < len)
1618 			return (0);
1619 	}
1620 
1621 	if ((bp = allocb_tmpl(len, mp)) == NULL)
1622 		return (0);
1623 
1624 	dbp = bp->b_datap;
1625 	*bp = *mp;		/* swap mblks so bp heads the old msg... */
1626 	mp->b_datap = dbp;	/* ... and mp heads the new message */
1627 	mp->b_datap->db_mblk = mp;
1628 	bp->b_datap->db_mblk = bp;
1629 	mp->b_rptr = mp->b_wptr = dbp->db_base;
1630 
1631 	do {
1632 		ASSERT(bp->b_datap->db_ref > 0);
1633 		ASSERT(bp->b_wptr >= bp->b_rptr);
1634 		n = MIN(bp->b_wptr - bp->b_rptr, len);
1635 		ASSERT(n >= 0);		/* allow zero-length mblk_t's */
1636 		if (n > 0)
1637 			bcopy(bp->b_rptr, mp->b_wptr, (size_t)n);
1638 		mp->b_wptr += n;
1639 		bp->b_rptr += n;
1640 		len -= n;
1641 		if (bp->b_rptr != bp->b_wptr)
1642 			break;
1643 		b_cont = bp->b_cont;
1644 		freeb(bp);
1645 		bp = b_cont;
1646 	} while (len && bp);
1647 
1648 	mp->b_cont = bp;	/* tack on whatever wasn't pulled up */
1649 
1650 	return (1);
1651 }
1652 
1653 /*
1654  * Concatenate and align at least the first len bytes of common message
1655  * type.  Len == -1 means concatenate everything.  The original message is
1656  * unaltered.  Returns a pointer to a new message on success, otherwise
1657  * returns NULL.
1658  */
1659 mblk_t *
1660 msgpullup(mblk_t *mp, ssize_t len)
1661 {
1662 	mblk_t	*newmp;
1663 	ssize_t	totlen;
1664 	ssize_t	n;
1665 
1666 	/*
1667 	 * We won't handle Multidata message, since it contains
1668 	 * metadata which this function has no knowledge of; we
1669 	 * assert on DEBUG, and return failure otherwise.
1670 	 */
1671 	ASSERT(mp->b_datap->db_type != M_MULTIDATA);
1672 	if (mp->b_datap->db_type == M_MULTIDATA)
1673 		return (NULL);
1674 
1675 	totlen = xmsgsize(mp);
1676 
1677 	if ((len > 0) && (len > totlen))
1678 		return (NULL);
1679 
1680 	/*
1681 	 * Copy all of the first msg type into one new mblk, then dupmsg
1682 	 * and link the rest onto this.
1683 	 */
1684 
1685 	len = totlen;
1686 
1687 	if ((newmp = allocb_tmpl(len, mp)) == NULL)
1688 		return (NULL);
1689 
1690 	newmp->b_flag = mp->b_flag;
1691 	newmp->b_band = mp->b_band;
1692 
1693 	while (len > 0) {
1694 		n = mp->b_wptr - mp->b_rptr;
1695 		ASSERT(n >= 0);		/* allow zero-length mblk_t's */
1696 		if (n > 0)
1697 			bcopy(mp->b_rptr, newmp->b_wptr, n);
1698 		newmp->b_wptr += n;
1699 		len -= n;
1700 		mp = mp->b_cont;
1701 	}
1702 
1703 	if (mp != NULL) {
1704 		newmp->b_cont = dupmsg(mp);
1705 		if (newmp->b_cont == NULL) {
1706 			freemsg(newmp);
1707 			return (NULL);
1708 		}
1709 	}
1710 
1711 	return (newmp);
1712 }
1713 
1714 /*
1715  * Trim bytes from message
1716  *  len > 0, trim from head
1717  *  len < 0, trim from tail
1718  * Returns 1 on success, 0 on failure.
1719  */
1720 int
1721 adjmsg(mblk_t *mp, ssize_t len)
1722 {
1723 	mblk_t *bp;
1724 	mblk_t *save_bp = NULL;
1725 	mblk_t *prev_bp;
1726 	mblk_t *bcont;
1727 	unsigned char type;
1728 	ssize_t n;
1729 	int fromhead;
1730 	int first;
1731 
1732 	ASSERT(mp != NULL);
1733 	/*
1734 	 * We won't handle Multidata message, since it contains
1735 	 * metadata which this function has no knowledge of; we
1736 	 * assert on DEBUG, and return failure otherwise.
1737 	 */
1738 	ASSERT(mp->b_datap->db_type != M_MULTIDATA);
1739 	if (mp->b_datap->db_type == M_MULTIDATA)
1740 		return (0);
1741 
1742 	if (len < 0) {
1743 		fromhead = 0;
1744 		len = -len;
1745 	} else {
1746 		fromhead = 1;
1747 	}
1748 
1749 	if (xmsgsize(mp) < len)
1750 		return (0);
1751 
1752 	if (fromhead) {
1753 		first = 1;
1754 		while (len) {
1755 			ASSERT(mp->b_wptr >= mp->b_rptr);
1756 			n = MIN(mp->b_wptr - mp->b_rptr, len);
1757 			mp->b_rptr += n;
1758 			len -= n;
1759 
1760 			/*
1761 			 * If this is not the first zero length
1762 			 * message remove it
1763 			 */
1764 			if (!first && (mp->b_wptr == mp->b_rptr)) {
1765 				bcont = mp->b_cont;
1766 				freeb(mp);
1767 				mp = save_bp->b_cont = bcont;
1768 			} else {
1769 				save_bp = mp;
1770 				mp = mp->b_cont;
1771 			}
1772 			first = 0;
1773 		}
1774 	} else {
1775 		type = mp->b_datap->db_type;
1776 		while (len) {
1777 			bp = mp;
1778 			save_bp = NULL;
1779 
1780 			/*
1781 			 * Find the last message of same type
1782 			 */
1783 			while (bp && bp->b_datap->db_type == type) {
1784 				ASSERT(bp->b_wptr >= bp->b_rptr);
1785 				prev_bp = save_bp;
1786 				save_bp = bp;
1787 				bp = bp->b_cont;
1788 			}
1789 			if (save_bp == NULL)
1790 				break;
1791 			n = MIN(save_bp->b_wptr - save_bp->b_rptr, len);
1792 			save_bp->b_wptr -= n;
1793 			len -= n;
1794 
1795 			/*
1796 			 * If this is not the first message
1797 			 * and we have taken away everything
1798 			 * from this message, remove it
1799 			 */
1800 
1801 			if ((save_bp != mp) &&
1802 			    (save_bp->b_wptr == save_bp->b_rptr)) {
1803 				bcont = save_bp->b_cont;
1804 				freeb(save_bp);
1805 				prev_bp->b_cont = bcont;
1806 			}
1807 		}
1808 	}
1809 	return (1);
1810 }
1811 
1812 /*
1813  * get number of data bytes in message
1814  */
1815 size_t
1816 msgdsize(mblk_t *bp)
1817 {
1818 	size_t count = 0;
1819 
1820 	for (; bp; bp = bp->b_cont)
1821 		if (bp->b_datap->db_type == M_DATA) {
1822 			ASSERT(bp->b_wptr >= bp->b_rptr);
1823 			count += bp->b_wptr - bp->b_rptr;
1824 		}
1825 	return (count);
1826 }
1827 
1828 /*
1829  * Get a message off head of queue
1830  *
1831  * If queue has no buffers then mark queue
1832  * with QWANTR. (queue wants to be read by
1833  * someone when data becomes available)
1834  *
1835  * If there is something to take off then do so.
1836  * If queue falls below hi water mark turn off QFULL
1837  * flag.  Decrement weighted count of queue.
1838  * Also turn off QWANTR because queue is being read.
1839  *
1840  * The queue count is maintained on a per-band basis.
1841  * Priority band 0 (normal messages) uses q_count,
1842  * q_lowat, etc.  Non-zero priority bands use the
1843  * fields in their respective qband structures
1844  * (qb_count, qb_lowat, etc.)  All messages appear
1845  * on the same list, linked via their b_next pointers.
1846  * q_first is the head of the list.  q_count does
1847  * not reflect the size of all the messages on the
1848  * queue.  It only reflects those messages in the
1849  * normal band of flow.  The one exception to this
1850  * deals with high priority messages.  They are in
1851  * their own conceptual "band", but are accounted
1852  * against q_count.
1853  *
1854  * If queue count is below the lo water mark and QWANTW
1855  * is set, enable the closest backq which has a service
1856  * procedure and turn off the QWANTW flag.
1857  *
1858  * getq could be built on top of rmvq, but isn't because
1859  * of performance considerations.
1860  *
1861  * A note on the use of q_count and q_mblkcnt:
1862  *   q_count is the traditional byte count for messages that
1863  *   have been put on a queue.  Documentation tells us that
1864  *   we shouldn't rely on that count, but some drivers/modules
1865  *   do.  What was needed, however, is a mechanism to prevent
1866  *   runaway streams from consuming all of the resources,
1867  *   and particularly be able to flow control zero-length
1868  *   messages.  q_mblkcnt is used for this purpose.  It
1869  *   counts the number of mblk's that are being put on
1870  *   the queue.  The intention here, is that each mblk should
1871  *   contain one byte of data and, for the purpose of
1872  *   flow-control, logically does.  A queue will become
1873  *   full when EITHER of these values (q_count and q_mblkcnt)
1874  *   reach the highwater mark.  It will clear when BOTH
1875  *   of them drop below the highwater mark.  And it will
1876  *   backenable when BOTH of them drop below the lowwater
1877  *   mark.
1878  *   With this algorithm, a driver/module might be able
1879  *   to find a reasonably accurate q_count, and the
1880  *   framework can still try and limit resource usage.
1881  */
1882 mblk_t *
1883 getq(queue_t *q)
1884 {
1885 	mblk_t *bp;
1886 	uchar_t band = 0;
1887 
1888 	bp = getq_noenab(q, 0);
1889 	if (bp != NULL)
1890 		band = bp->b_band;
1891 
1892 	/*
1893 	 * Inlined from qbackenable().
1894 	 * Quick check without holding the lock.
1895 	 */
1896 	if (band == 0 && (q->q_flag & (QWANTW|QWANTWSYNC)) == 0)
1897 		return (bp);
1898 
1899 	qbackenable(q, band);
1900 	return (bp);
1901 }
1902 
1903 /*
1904  * Calculate number of data bytes in a single data message block taking
1905  * multidata messages into account.
1906  */
1907 
1908 #define	ADD_MBLK_SIZE(mp, size)						\
1909 	if (DB_TYPE(mp) != M_MULTIDATA) {				\
1910 		(size) += MBLKL(mp);					\
1911 	} else {							\
1912 		uint_t	pinuse;						\
1913 									\
1914 		mmd_getsize(mmd_getmultidata(mp), NULL, &pinuse);	\
1915 		(size) += pinuse;					\
1916 	}
1917 
1918 /*
1919  * Returns the number of bytes in a message (a message is defined as a
1920  * chain of mblks linked by b_cont). If a non-NULL mblkcnt is supplied we
1921  * also return the number of distinct mblks in the message.
1922  */
1923 int
1924 mp_cont_len(mblk_t *bp, int *mblkcnt)
1925 {
1926 	mblk_t	*mp;
1927 	int	mblks = 0;
1928 	int	bytes = 0;
1929 
1930 	for (mp = bp; mp != NULL; mp = mp->b_cont) {
1931 		ADD_MBLK_SIZE(mp, bytes);
1932 		mblks++;
1933 	}
1934 
1935 	if (mblkcnt != NULL)
1936 		*mblkcnt = mblks;
1937 
1938 	return (bytes);
1939 }
1940 
1941 /*
1942  * Like getq() but does not backenable.  This is used by the stream
1943  * head when a putback() is likely.  The caller must call qbackenable()
1944  * after it is done with accessing the queue.
1945  * The rbytes arguments to getq_noneab() allows callers to specify a
1946  * the maximum number of bytes to return. If the current amount on the
1947  * queue is less than this then the entire message will be returned.
1948  * A value of 0 returns the entire message and is equivalent to the old
1949  * default behaviour prior to the addition of the rbytes argument.
1950  */
1951 mblk_t *
1952 getq_noenab(queue_t *q, ssize_t rbytes)
1953 {
1954 	mblk_t *bp, *mp1;
1955 	mblk_t *mp2 = NULL;
1956 	qband_t *qbp;
1957 	kthread_id_t freezer;
1958 	int	bytecnt = 0, mblkcnt = 0;
1959 
1960 	/* freezestr should allow its caller to call getq/putq */
1961 	freezer = STREAM(q)->sd_freezer;
1962 	if (freezer == curthread) {
1963 		ASSERT(frozenstr(q));
1964 		ASSERT(MUTEX_HELD(QLOCK(q)));
1965 	} else
1966 		mutex_enter(QLOCK(q));
1967 
1968 	if ((bp = q->q_first) == 0) {
1969 		q->q_flag |= QWANTR;
1970 	} else {
1971 		/*
1972 		 * If the caller supplied a byte threshold and there is
1973 		 * more than this amount on the queue then break up the
1974 		 * the message appropriately.  We can only safely do
1975 		 * this for M_DATA messages.
1976 		 */
1977 		if ((DB_TYPE(bp) == M_DATA) && (rbytes > 0) &&
1978 		    (q->q_count > rbytes)) {
1979 			/*
1980 			 * Inline version of mp_cont_len() which terminates
1981 			 * when we meet or exceed rbytes.
1982 			 */
1983 			for (mp1 = bp; mp1 != NULL; mp1 = mp1->b_cont) {
1984 				mblkcnt++;
1985 				ADD_MBLK_SIZE(mp1, bytecnt);
1986 				if (bytecnt  >= rbytes)
1987 					break;
1988 			}
1989 			/*
1990 			 * We need to account for the following scenarios:
1991 			 *
1992 			 * 1) Too much data in the first message:
1993 			 *	mp1 will be the mblk which puts us over our
1994 			 *	byte limit.
1995 			 * 2) Not enough data in the first message:
1996 			 *	mp1 will be NULL.
1997 			 * 3) Exactly the right amount of data contained within
1998 			 *    whole mblks:
1999 			 *	mp1->b_cont will be where we break the message.
2000 			 */
2001 			if (bytecnt > rbytes) {
2002 				/*
2003 				 * Dup/copy mp1 and put what we don't need
2004 				 * back onto the queue. Adjust the read/write
2005 				 * and continuation pointers appropriately
2006 				 * and decrement the current mblk count to
2007 				 * reflect we are putting an mblk back onto
2008 				 * the queue.
2009 				 * When adjusting the message pointers, it's
2010 				 * OK to use the existing bytecnt and the
2011 				 * requested amount (rbytes) to calculate the
2012 				 * the new write offset (b_wptr) of what we
2013 				 * are taking. However, we  cannot use these
2014 				 * values when calculating the read offset of
2015 				 * the mblk we are putting back on the queue.
2016 				 * This is because the begining (b_rptr) of the
2017 				 * mblk represents some arbitrary point within
2018 				 * the message.
2019 				 * It's simplest to do this by advancing b_rptr
2020 				 * by the new length of mp1 as we don't have to
2021 				 * remember any intermediate state.
2022 				 */
2023 				ASSERT(mp1 != NULL);
2024 				mblkcnt--;
2025 				if ((mp2 = dupb(mp1)) == NULL &&
2026 				    (mp2 = copyb(mp1)) == NULL) {
2027 					bytecnt = mblkcnt = 0;
2028 					goto dup_failed;
2029 				}
2030 				mp2->b_cont = mp1->b_cont;
2031 				mp1->b_wptr -= bytecnt - rbytes;
2032 				mp2->b_rptr += mp1->b_wptr - mp1->b_rptr;
2033 				mp1->b_cont = NULL;
2034 				bytecnt = rbytes;
2035 			} else {
2036 				/*
2037 				 * Either there is not enough data in the first
2038 				 * message or there is no excess data to deal
2039 				 * with. If mp1 is NULL, we are taking the
2040 				 * whole message. No need to do anything.
2041 				 * Otherwise we assign mp1->b_cont to mp2 as
2042 				 * we will be putting this back onto the head of
2043 				 * the queue.
2044 				 */
2045 				if (mp1 != NULL) {
2046 					mp2 = mp1->b_cont;
2047 					mp1->b_cont = NULL;
2048 				}
2049 			}
2050 			/*
2051 			 * If mp2 is not NULL then we have part of the message
2052 			 * to put back onto the queue.
2053 			 */
2054 			if (mp2 != NULL) {
2055 				if ((mp2->b_next = bp->b_next) == NULL)
2056 					q->q_last = mp2;
2057 				else
2058 					bp->b_next->b_prev = mp2;
2059 				q->q_first = mp2;
2060 			} else {
2061 				if ((q->q_first = bp->b_next) == NULL)
2062 					q->q_last = NULL;
2063 				else
2064 					q->q_first->b_prev = NULL;
2065 			}
2066 		} else {
2067 			/*
2068 			 * Either no byte threshold was supplied, there is
2069 			 * not enough on the queue or we failed to
2070 			 * duplicate/copy a data block. In these cases we
2071 			 * just take the entire first message.
2072 			 */
2073 dup_failed:
2074 			bytecnt = mp_cont_len(bp, &mblkcnt);
2075 			if ((q->q_first = bp->b_next) == NULL)
2076 				q->q_last = NULL;
2077 			else
2078 				q->q_first->b_prev = NULL;
2079 		}
2080 		if (bp->b_band == 0) {
2081 			q->q_count -= bytecnt;
2082 			q->q_mblkcnt -= mblkcnt;
2083 			if (q->q_mblkcnt == 0 || ((q->q_count < q->q_hiwat) &&
2084 			    (q->q_mblkcnt < q->q_hiwat))) {
2085 				q->q_flag &= ~QFULL;
2086 			}
2087 		} else {
2088 			int i;
2089 
2090 			ASSERT(bp->b_band <= q->q_nband);
2091 			ASSERT(q->q_bandp != NULL);
2092 			ASSERT(MUTEX_HELD(QLOCK(q)));
2093 			qbp = q->q_bandp;
2094 			i = bp->b_band;
2095 			while (--i > 0)
2096 				qbp = qbp->qb_next;
2097 			if (qbp->qb_first == qbp->qb_last) {
2098 				qbp->qb_first = NULL;
2099 				qbp->qb_last = NULL;
2100 			} else {
2101 				qbp->qb_first = bp->b_next;
2102 			}
2103 			qbp->qb_count -= bytecnt;
2104 			qbp->qb_mblkcnt -= mblkcnt;
2105 			if (qbp->qb_mblkcnt == 0 ||
2106 			    ((qbp->qb_count < qbp->qb_hiwat) &&
2107 			    (qbp->qb_mblkcnt < qbp->qb_hiwat))) {
2108 				qbp->qb_flag &= ~QB_FULL;
2109 			}
2110 		}
2111 		q->q_flag &= ~QWANTR;
2112 		bp->b_next = NULL;
2113 		bp->b_prev = NULL;
2114 	}
2115 	if (freezer != curthread)
2116 		mutex_exit(QLOCK(q));
2117 
2118 	STR_FTEVENT_MSG(bp, q, FTEV_GETQ, 0);
2119 
2120 	return (bp);
2121 }
2122 
2123 /*
2124  * Determine if a backenable is needed after removing a message in the
2125  * specified band.
2126  * NOTE: This routine assumes that something like getq_noenab() has been
2127  * already called.
2128  *
2129  * For the read side it is ok to hold sd_lock across calling this (and the
2130  * stream head often does).
2131  * But for the write side strwakeq might be invoked and it acquires sd_lock.
2132  */
2133 void
2134 qbackenable(queue_t *q, uchar_t band)
2135 {
2136 	int backenab = 0;
2137 	qband_t *qbp;
2138 	kthread_id_t freezer;
2139 
2140 	ASSERT(q);
2141 	ASSERT((q->q_flag & QREADR) || MUTEX_NOT_HELD(&STREAM(q)->sd_lock));
2142 
2143 	/*
2144 	 * Quick check without holding the lock.
2145 	 * OK since after getq() has lowered the q_count these flags
2146 	 * would not change unless either the qbackenable() is done by
2147 	 * another thread (which is ok) or the queue has gotten QFULL
2148 	 * in which case another backenable will take place when the queue
2149 	 * drops below q_lowat.
2150 	 */
2151 	if (band == 0 && (q->q_flag & (QWANTW|QWANTWSYNC)) == 0)
2152 		return;
2153 
2154 	/* freezestr should allow its caller to call getq/putq */
2155 	freezer = STREAM(q)->sd_freezer;
2156 	if (freezer == curthread) {
2157 		ASSERT(frozenstr(q));
2158 		ASSERT(MUTEX_HELD(QLOCK(q)));
2159 	} else
2160 		mutex_enter(QLOCK(q));
2161 
2162 	if (band == 0) {
2163 		if (q->q_lowat == 0 || (q->q_count < q->q_lowat &&
2164 		    q->q_mblkcnt < q->q_lowat)) {
2165 			backenab = q->q_flag & (QWANTW|QWANTWSYNC);
2166 		}
2167 	} else {
2168 		int i;
2169 
2170 		ASSERT((unsigned)band <= q->q_nband);
2171 		ASSERT(q->q_bandp != NULL);
2172 
2173 		qbp = q->q_bandp;
2174 		i = band;
2175 		while (--i > 0)
2176 			qbp = qbp->qb_next;
2177 
2178 		if (qbp->qb_lowat == 0 || (qbp->qb_count < qbp->qb_lowat &&
2179 		    qbp->qb_mblkcnt < qbp->qb_lowat)) {
2180 			backenab = qbp->qb_flag & QB_WANTW;
2181 		}
2182 	}
2183 
2184 	if (backenab == 0) {
2185 		if (freezer != curthread)
2186 			mutex_exit(QLOCK(q));
2187 		return;
2188 	}
2189 
2190 	/* Have to drop the lock across strwakeq and backenable */
2191 	if (backenab & QWANTWSYNC)
2192 		q->q_flag &= ~QWANTWSYNC;
2193 	if (backenab & (QWANTW|QB_WANTW)) {
2194 		if (band != 0)
2195 			qbp->qb_flag &= ~QB_WANTW;
2196 		else {
2197 			q->q_flag &= ~QWANTW;
2198 		}
2199 	}
2200 
2201 	if (freezer != curthread)
2202 		mutex_exit(QLOCK(q));
2203 
2204 	if (backenab & QWANTWSYNC)
2205 		strwakeq(q, QWANTWSYNC);
2206 	if (backenab & (QWANTW|QB_WANTW))
2207 		backenable(q, band);
2208 }
2209 
2210 /*
2211  * Remove a message from a queue.  The queue count and other
2212  * flow control parameters are adjusted and the back queue
2213  * enabled if necessary.
2214  *
2215  * rmvq can be called with the stream frozen, but other utility functions
2216  * holding QLOCK, and by streams modules without any locks/frozen.
2217  */
2218 void
2219 rmvq(queue_t *q, mblk_t *mp)
2220 {
2221 	ASSERT(mp != NULL);
2222 
2223 	rmvq_noenab(q, mp);
2224 	if (curthread != STREAM(q)->sd_freezer && MUTEX_HELD(QLOCK(q))) {
2225 		/*
2226 		 * qbackenable can handle a frozen stream but not a "random"
2227 		 * qlock being held. Drop lock across qbackenable.
2228 		 */
2229 		mutex_exit(QLOCK(q));
2230 		qbackenable(q, mp->b_band);
2231 		mutex_enter(QLOCK(q));
2232 	} else {
2233 		qbackenable(q, mp->b_band);
2234 	}
2235 }
2236 
2237 /*
2238  * Like rmvq() but without any backenabling.
2239  * This exists to handle SR_CONSOL_DATA in strrput().
2240  */
2241 void
2242 rmvq_noenab(queue_t *q, mblk_t *mp)
2243 {
2244 	int i;
2245 	qband_t *qbp = NULL;
2246 	kthread_id_t freezer;
2247 	int	bytecnt = 0, mblkcnt = 0;
2248 
2249 	freezer = STREAM(q)->sd_freezer;
2250 	if (freezer == curthread) {
2251 		ASSERT(frozenstr(q));
2252 		ASSERT(MUTEX_HELD(QLOCK(q)));
2253 	} else if (MUTEX_HELD(QLOCK(q))) {
2254 		/* Don't drop lock on exit */
2255 		freezer = curthread;
2256 	} else
2257 		mutex_enter(QLOCK(q));
2258 
2259 	ASSERT(mp->b_band <= q->q_nband);
2260 	if (mp->b_band != 0) {		/* Adjust band pointers */
2261 		ASSERT(q->q_bandp != NULL);
2262 		qbp = q->q_bandp;
2263 		i = mp->b_band;
2264 		while (--i > 0)
2265 			qbp = qbp->qb_next;
2266 		if (mp == qbp->qb_first) {
2267 			if (mp->b_next && mp->b_band == mp->b_next->b_band)
2268 				qbp->qb_first = mp->b_next;
2269 			else
2270 				qbp->qb_first = NULL;
2271 		}
2272 		if (mp == qbp->qb_last) {
2273 			if (mp->b_prev && mp->b_band == mp->b_prev->b_band)
2274 				qbp->qb_last = mp->b_prev;
2275 			else
2276 				qbp->qb_last = NULL;
2277 		}
2278 	}
2279 
2280 	/*
2281 	 * Remove the message from the list.
2282 	 */
2283 	if (mp->b_prev)
2284 		mp->b_prev->b_next = mp->b_next;
2285 	else
2286 		q->q_first = mp->b_next;
2287 	if (mp->b_next)
2288 		mp->b_next->b_prev = mp->b_prev;
2289 	else
2290 		q->q_last = mp->b_prev;
2291 	mp->b_next = NULL;
2292 	mp->b_prev = NULL;
2293 
2294 	/* Get the size of the message for q_count accounting */
2295 	bytecnt = mp_cont_len(mp, &mblkcnt);
2296 
2297 	if (mp->b_band == 0) {		/* Perform q_count accounting */
2298 		q->q_count -= bytecnt;
2299 		q->q_mblkcnt -= mblkcnt;
2300 		if (q->q_mblkcnt == 0 || ((q->q_count < q->q_hiwat) &&
2301 		    (q->q_mblkcnt < q->q_hiwat))) {
2302 			q->q_flag &= ~QFULL;
2303 		}
2304 	} else {			/* Perform qb_count accounting */
2305 		qbp->qb_count -= bytecnt;
2306 		qbp->qb_mblkcnt -= mblkcnt;
2307 		if (qbp->qb_mblkcnt == 0 || ((qbp->qb_count < qbp->qb_hiwat) &&
2308 		    (qbp->qb_mblkcnt < qbp->qb_hiwat))) {
2309 			qbp->qb_flag &= ~QB_FULL;
2310 		}
2311 	}
2312 	if (freezer != curthread)
2313 		mutex_exit(QLOCK(q));
2314 
2315 	STR_FTEVENT_MSG(mp, q, FTEV_RMVQ, 0);
2316 }
2317 
2318 /*
2319  * Empty a queue.
2320  * If flag is set, remove all messages.  Otherwise, remove
2321  * only non-control messages.  If queue falls below its low
2322  * water mark, and QWANTW is set, enable the nearest upstream
2323  * service procedure.
2324  *
2325  * Historical note: when merging the M_FLUSH code in strrput with this
2326  * code one difference was discovered. flushq did not have a check
2327  * for q_lowat == 0 in the backenabling test.
2328  *
2329  * pcproto_flag specifies whether or not a M_PCPROTO message should be flushed
2330  * if one exists on the queue.
2331  */
2332 void
2333 flushq_common(queue_t *q, int flag, int pcproto_flag)
2334 {
2335 	mblk_t *mp, *nmp;
2336 	qband_t *qbp;
2337 	int backenab = 0;
2338 	unsigned char bpri;
2339 	unsigned char	qbf[NBAND];	/* band flushing backenable flags */
2340 
2341 	if (q->q_first == NULL)
2342 		return;
2343 
2344 	mutex_enter(QLOCK(q));
2345 	mp = q->q_first;
2346 	q->q_first = NULL;
2347 	q->q_last = NULL;
2348 	q->q_count = 0;
2349 	q->q_mblkcnt = 0;
2350 	for (qbp = q->q_bandp; qbp; qbp = qbp->qb_next) {
2351 		qbp->qb_first = NULL;
2352 		qbp->qb_last = NULL;
2353 		qbp->qb_count = 0;
2354 		qbp->qb_mblkcnt = 0;
2355 		qbp->qb_flag &= ~QB_FULL;
2356 	}
2357 	q->q_flag &= ~QFULL;
2358 	mutex_exit(QLOCK(q));
2359 	while (mp) {
2360 		nmp = mp->b_next;
2361 		mp->b_next = mp->b_prev = NULL;
2362 
2363 		STR_FTEVENT_MBLK(mp, q, FTEV_FLUSHQ, 0);
2364 
2365 		if (pcproto_flag && (mp->b_datap->db_type == M_PCPROTO))
2366 			(void) putq(q, mp);
2367 		else if (flag || datamsg(mp->b_datap->db_type))
2368 			freemsg(mp);
2369 		else
2370 			(void) putq(q, mp);
2371 		mp = nmp;
2372 	}
2373 	bpri = 1;
2374 	mutex_enter(QLOCK(q));
2375 	for (qbp = q->q_bandp; qbp; qbp = qbp->qb_next) {
2376 		if ((qbp->qb_flag & QB_WANTW) &&
2377 		    (((qbp->qb_count < qbp->qb_lowat) &&
2378 		    (qbp->qb_mblkcnt < qbp->qb_lowat)) ||
2379 		    qbp->qb_lowat == 0)) {
2380 			qbp->qb_flag &= ~QB_WANTW;
2381 			backenab = 1;
2382 			qbf[bpri] = 1;
2383 		} else
2384 			qbf[bpri] = 0;
2385 		bpri++;
2386 	}
2387 	ASSERT(bpri == (unsigned char)(q->q_nband + 1));
2388 	if ((q->q_flag & QWANTW) &&
2389 	    (((q->q_count < q->q_lowat) &&
2390 	    (q->q_mblkcnt < q->q_lowat)) || q->q_lowat == 0)) {
2391 		q->q_flag &= ~QWANTW;
2392 		backenab = 1;
2393 		qbf[0] = 1;
2394 	} else
2395 		qbf[0] = 0;
2396 
2397 	/*
2398 	 * If any band can now be written to, and there is a writer
2399 	 * for that band, then backenable the closest service procedure.
2400 	 */
2401 	if (backenab) {
2402 		mutex_exit(QLOCK(q));
2403 		for (bpri = q->q_nband; bpri != 0; bpri--)
2404 			if (qbf[bpri])
2405 				backenable(q, bpri);
2406 		if (qbf[0])
2407 			backenable(q, 0);
2408 	} else
2409 		mutex_exit(QLOCK(q));
2410 }
2411 
2412 /*
2413  * The real flushing takes place in flushq_common. This is done so that
2414  * a flag which specifies whether or not M_PCPROTO messages should be flushed
2415  * or not. Currently the only place that uses this flag is the stream head.
2416  */
2417 void
2418 flushq(queue_t *q, int flag)
2419 {
2420 	flushq_common(q, flag, 0);
2421 }
2422 
2423 /*
2424  * Flush the queue of messages of the given priority band.
2425  * There is some duplication of code between flushq and flushband.
2426  * This is because we want to optimize the code as much as possible.
2427  * The assumption is that there will be more messages in the normal
2428  * (priority 0) band than in any other.
2429  *
2430  * Historical note: when merging the M_FLUSH code in strrput with this
2431  * code one difference was discovered. flushband had an extra check for
2432  * did not have a check for (mp->b_datap->db_type < QPCTL) in the band 0
2433  * case. That check does not match the man page for flushband and was not
2434  * in the strrput flush code hence it was removed.
2435  */
2436 void
2437 flushband(queue_t *q, unsigned char pri, int flag)
2438 {
2439 	mblk_t *mp;
2440 	mblk_t *nmp;
2441 	mblk_t *last;
2442 	qband_t *qbp;
2443 	int band;
2444 
2445 	ASSERT((flag == FLUSHDATA) || (flag == FLUSHALL));
2446 	if (pri > q->q_nband) {
2447 		return;
2448 	}
2449 	mutex_enter(QLOCK(q));
2450 	if (pri == 0) {
2451 		mp = q->q_first;
2452 		q->q_first = NULL;
2453 		q->q_last = NULL;
2454 		q->q_count = 0;
2455 		q->q_mblkcnt = 0;
2456 		for (qbp = q->q_bandp; qbp; qbp = qbp->qb_next) {
2457 			qbp->qb_first = NULL;
2458 			qbp->qb_last = NULL;
2459 			qbp->qb_count = 0;
2460 			qbp->qb_mblkcnt = 0;
2461 			qbp->qb_flag &= ~QB_FULL;
2462 		}
2463 		q->q_flag &= ~QFULL;
2464 		mutex_exit(QLOCK(q));
2465 		while (mp) {
2466 			nmp = mp->b_next;
2467 			mp->b_next = mp->b_prev = NULL;
2468 			if ((mp->b_band == 0) &&
2469 			    ((flag == FLUSHALL) ||
2470 			    datamsg(mp->b_datap->db_type)))
2471 				freemsg(mp);
2472 			else
2473 				(void) putq(q, mp);
2474 			mp = nmp;
2475 		}
2476 		mutex_enter(QLOCK(q));
2477 		if ((q->q_flag & QWANTW) &&
2478 		    (((q->q_count < q->q_lowat) &&
2479 		    (q->q_mblkcnt < q->q_lowat)) || q->q_lowat == 0)) {
2480 			q->q_flag &= ~QWANTW;
2481 			mutex_exit(QLOCK(q));
2482 
2483 			backenable(q, pri);
2484 		} else
2485 			mutex_exit(QLOCK(q));
2486 	} else {	/* pri != 0 */
2487 		boolean_t flushed = B_FALSE;
2488 		band = pri;
2489 
2490 		ASSERT(MUTEX_HELD(QLOCK(q)));
2491 		qbp = q->q_bandp;
2492 		while (--band > 0)
2493 			qbp = qbp->qb_next;
2494 		mp = qbp->qb_first;
2495 		if (mp == NULL) {
2496 			mutex_exit(QLOCK(q));
2497 			return;
2498 		}
2499 		last = qbp->qb_last->b_next;
2500 		/*
2501 		 * rmvq_noenab() and freemsg() are called for each mblk that
2502 		 * meets the criteria.  The loop is executed until the last
2503 		 * mblk has been processed.
2504 		 */
2505 		while (mp != last) {
2506 			ASSERT(mp->b_band == pri);
2507 			nmp = mp->b_next;
2508 			if (flag == FLUSHALL || datamsg(mp->b_datap->db_type)) {
2509 				rmvq_noenab(q, mp);
2510 				freemsg(mp);
2511 				flushed = B_TRUE;
2512 			}
2513 			mp = nmp;
2514 		}
2515 		mutex_exit(QLOCK(q));
2516 
2517 		/*
2518 		 * If any mblk(s) has been freed, we know that qbackenable()
2519 		 * will need to be called.
2520 		 */
2521 		if (flushed)
2522 			qbackenable(q, pri);
2523 	}
2524 }
2525 
2526 /*
2527  * Return 1 if the queue is not full.  If the queue is full, return
2528  * 0 (may not put message) and set QWANTW flag (caller wants to write
2529  * to the queue).
2530  */
2531 int
2532 canput(queue_t *q)
2533 {
2534 	TRACE_1(TR_FAC_STREAMS_FR, TR_CANPUT_IN, "canput:%p", q);
2535 
2536 	/* this is for loopback transports, they should not do a canput */
2537 	ASSERT(STRMATED(q->q_stream) || STREAM(q) == STREAM(q->q_nfsrv));
2538 
2539 	/* Find next forward module that has a service procedure */
2540 	q = q->q_nfsrv;
2541 
2542 	if (!(q->q_flag & QFULL)) {
2543 		TRACE_2(TR_FAC_STREAMS_FR, TR_CANPUT_OUT, "canput:%p %d", q, 1);
2544 		return (1);
2545 	}
2546 	mutex_enter(QLOCK(q));
2547 	if (q->q_flag & QFULL) {
2548 		q->q_flag |= QWANTW;
2549 		mutex_exit(QLOCK(q));
2550 		TRACE_2(TR_FAC_STREAMS_FR, TR_CANPUT_OUT, "canput:%p %d", q, 0);
2551 		return (0);
2552 	}
2553 	mutex_exit(QLOCK(q));
2554 	TRACE_2(TR_FAC_STREAMS_FR, TR_CANPUT_OUT, "canput:%p %d", q, 1);
2555 	return (1);
2556 }
2557 
2558 /*
2559  * This is the new canput for use with priority bands.  Return 1 if the
2560  * band is not full.  If the band is full, return 0 (may not put message)
2561  * and set QWANTW(QB_WANTW) flag for zero(non-zero) band (caller wants to
2562  * write to the queue).
2563  */
2564 int
2565 bcanput(queue_t *q, unsigned char pri)
2566 {
2567 	qband_t *qbp;
2568 
2569 	TRACE_2(TR_FAC_STREAMS_FR, TR_BCANPUT_IN, "bcanput:%p %p", q, pri);
2570 	if (!q)
2571 		return (0);
2572 
2573 	/* Find next forward module that has a service procedure */
2574 	q = q->q_nfsrv;
2575 
2576 	mutex_enter(QLOCK(q));
2577 	if (pri == 0) {
2578 		if (q->q_flag & QFULL) {
2579 			q->q_flag |= QWANTW;
2580 			mutex_exit(QLOCK(q));
2581 			TRACE_3(TR_FAC_STREAMS_FR, TR_BCANPUT_OUT,
2582 			    "bcanput:%p %X %d", q, pri, 0);
2583 			return (0);
2584 		}
2585 	} else {	/* pri != 0 */
2586 		if (pri > q->q_nband) {
2587 			/*
2588 			 * No band exists yet, so return success.
2589 			 */
2590 			mutex_exit(QLOCK(q));
2591 			TRACE_3(TR_FAC_STREAMS_FR, TR_BCANPUT_OUT,
2592 			    "bcanput:%p %X %d", q, pri, 1);
2593 			return (1);
2594 		}
2595 		qbp = q->q_bandp;
2596 		while (--pri)
2597 			qbp = qbp->qb_next;
2598 		if (qbp->qb_flag & QB_FULL) {
2599 			qbp->qb_flag |= QB_WANTW;
2600 			mutex_exit(QLOCK(q));
2601 			TRACE_3(TR_FAC_STREAMS_FR, TR_BCANPUT_OUT,
2602 			    "bcanput:%p %X %d", q, pri, 0);
2603 			return (0);
2604 		}
2605 	}
2606 	mutex_exit(QLOCK(q));
2607 	TRACE_3(TR_FAC_STREAMS_FR, TR_BCANPUT_OUT,
2608 	    "bcanput:%p %X %d", q, pri, 1);
2609 	return (1);
2610 }
2611 
2612 /*
2613  * Put a message on a queue.
2614  *
2615  * Messages are enqueued on a priority basis.  The priority classes
2616  * are HIGH PRIORITY (type >= QPCTL), PRIORITY (type < QPCTL && band > 0),
2617  * and B_NORMAL (type < QPCTL && band == 0).
2618  *
2619  * Add appropriate weighted data block sizes to queue count.
2620  * If queue hits high water mark then set QFULL flag.
2621  *
2622  * If QNOENAB is not set (putq is allowed to enable the queue),
2623  * enable the queue only if the message is PRIORITY,
2624  * or the QWANTR flag is set (indicating that the service procedure
2625  * is ready to read the queue.  This implies that a service
2626  * procedure must NEVER put a high priority message back on its own
2627  * queue, as this would result in an infinite loop (!).
2628  */
2629 int
2630 putq(queue_t *q, mblk_t *bp)
2631 {
2632 	mblk_t *tmp;
2633 	qband_t *qbp = NULL;
2634 	int mcls = (int)queclass(bp);
2635 	kthread_id_t freezer;
2636 	int	bytecnt = 0, mblkcnt = 0;
2637 
2638 	freezer = STREAM(q)->sd_freezer;
2639 	if (freezer == curthread) {
2640 		ASSERT(frozenstr(q));
2641 		ASSERT(MUTEX_HELD(QLOCK(q)));
2642 	} else
2643 		mutex_enter(QLOCK(q));
2644 
2645 	/*
2646 	 * Make sanity checks and if qband structure is not yet
2647 	 * allocated, do so.
2648 	 */
2649 	if (mcls == QPCTL) {
2650 		if (bp->b_band != 0)
2651 			bp->b_band = 0;		/* force to be correct */
2652 	} else if (bp->b_band != 0) {
2653 		int i;
2654 		qband_t **qbpp;
2655 
2656 		if (bp->b_band > q->q_nband) {
2657 
2658 			/*
2659 			 * The qband structure for this priority band is
2660 			 * not on the queue yet, so we have to allocate
2661 			 * one on the fly.  It would be wasteful to
2662 			 * associate the qband structures with every
2663 			 * queue when the queues are allocated.  This is
2664 			 * because most queues will only need the normal
2665 			 * band of flow which can be described entirely
2666 			 * by the queue itself.
2667 			 */
2668 			qbpp = &q->q_bandp;
2669 			while (*qbpp)
2670 				qbpp = &(*qbpp)->qb_next;
2671 			while (bp->b_band > q->q_nband) {
2672 				if ((*qbpp = allocband()) == NULL) {
2673 					if (freezer != curthread)
2674 						mutex_exit(QLOCK(q));
2675 					return (0);
2676 				}
2677 				(*qbpp)->qb_hiwat = q->q_hiwat;
2678 				(*qbpp)->qb_lowat = q->q_lowat;
2679 				q->q_nband++;
2680 				qbpp = &(*qbpp)->qb_next;
2681 			}
2682 		}
2683 		ASSERT(MUTEX_HELD(QLOCK(q)));
2684 		qbp = q->q_bandp;
2685 		i = bp->b_band;
2686 		while (--i)
2687 			qbp = qbp->qb_next;
2688 	}
2689 
2690 	/*
2691 	 * If queue is empty, add the message and initialize the pointers.
2692 	 * Otherwise, adjust message pointers and queue pointers based on
2693 	 * the type of the message and where it belongs on the queue.  Some
2694 	 * code is duplicated to minimize the number of conditionals and
2695 	 * hopefully minimize the amount of time this routine takes.
2696 	 */
2697 	if (!q->q_first) {
2698 		bp->b_next = NULL;
2699 		bp->b_prev = NULL;
2700 		q->q_first = bp;
2701 		q->q_last = bp;
2702 		if (qbp) {
2703 			qbp->qb_first = bp;
2704 			qbp->qb_last = bp;
2705 		}
2706 	} else if (!qbp) {	/* bp->b_band == 0 */
2707 
2708 		/*
2709 		 * If queue class of message is less than or equal to
2710 		 * that of the last one on the queue, tack on to the end.
2711 		 */
2712 		tmp = q->q_last;
2713 		if (mcls <= (int)queclass(tmp)) {
2714 			bp->b_next = NULL;
2715 			bp->b_prev = tmp;
2716 			tmp->b_next = bp;
2717 			q->q_last = bp;
2718 		} else {
2719 			tmp = q->q_first;
2720 			while ((int)queclass(tmp) >= mcls)
2721 				tmp = tmp->b_next;
2722 
2723 			/*
2724 			 * Insert bp before tmp.
2725 			 */
2726 			bp->b_next = tmp;
2727 			bp->b_prev = tmp->b_prev;
2728 			if (tmp->b_prev)
2729 				tmp->b_prev->b_next = bp;
2730 			else
2731 				q->q_first = bp;
2732 			tmp->b_prev = bp;
2733 		}
2734 	} else {		/* bp->b_band != 0 */
2735 		if (qbp->qb_first) {
2736 			tmp = qbp->qb_last;
2737 
2738 			/*
2739 			 * Insert bp after the last message in this band.
2740 			 */
2741 			bp->b_next = tmp->b_next;
2742 			if (tmp->b_next)
2743 				tmp->b_next->b_prev = bp;
2744 			else
2745 				q->q_last = bp;
2746 			bp->b_prev = tmp;
2747 			tmp->b_next = bp;
2748 		} else {
2749 			tmp = q->q_last;
2750 			if ((mcls < (int)queclass(tmp)) ||
2751 			    (bp->b_band <= tmp->b_band)) {
2752 
2753 				/*
2754 				 * Tack bp on end of queue.
2755 				 */
2756 				bp->b_next = NULL;
2757 				bp->b_prev = tmp;
2758 				tmp->b_next = bp;
2759 				q->q_last = bp;
2760 			} else {
2761 				tmp = q->q_first;
2762 				while (tmp->b_datap->db_type >= QPCTL)
2763 					tmp = tmp->b_next;
2764 				while (tmp->b_band >= bp->b_band)
2765 					tmp = tmp->b_next;
2766 
2767 				/*
2768 				 * Insert bp before tmp.
2769 				 */
2770 				bp->b_next = tmp;
2771 				bp->b_prev = tmp->b_prev;
2772 				if (tmp->b_prev)
2773 					tmp->b_prev->b_next = bp;
2774 				else
2775 					q->q_first = bp;
2776 				tmp->b_prev = bp;
2777 			}
2778 			qbp->qb_first = bp;
2779 		}
2780 		qbp->qb_last = bp;
2781 	}
2782 
2783 	/* Get message byte count for q_count accounting */
2784 	bytecnt = mp_cont_len(bp, &mblkcnt);
2785 
2786 	if (qbp) {
2787 		qbp->qb_count += bytecnt;
2788 		qbp->qb_mblkcnt += mblkcnt;
2789 		if ((qbp->qb_count >= qbp->qb_hiwat) ||
2790 		    (qbp->qb_mblkcnt >= qbp->qb_hiwat)) {
2791 			qbp->qb_flag |= QB_FULL;
2792 		}
2793 	} else {
2794 		q->q_count += bytecnt;
2795 		q->q_mblkcnt += mblkcnt;
2796 		if ((q->q_count >= q->q_hiwat) ||
2797 		    (q->q_mblkcnt >= q->q_hiwat)) {
2798 			q->q_flag |= QFULL;
2799 		}
2800 	}
2801 
2802 	STR_FTEVENT_MSG(bp, q, FTEV_PUTQ, 0);
2803 
2804 	if ((mcls > QNORM) ||
2805 	    (canenable(q) && (q->q_flag & QWANTR || bp->b_band)))
2806 		qenable_locked(q);
2807 	ASSERT(MUTEX_HELD(QLOCK(q)));
2808 	if (freezer != curthread)
2809 		mutex_exit(QLOCK(q));
2810 
2811 	return (1);
2812 }
2813 
2814 /*
2815  * Put stuff back at beginning of Q according to priority order.
2816  * See comment on putq above for details.
2817  */
2818 int
2819 putbq(queue_t *q, mblk_t *bp)
2820 {
2821 	mblk_t *tmp;
2822 	qband_t *qbp = NULL;
2823 	int mcls = (int)queclass(bp);
2824 	kthread_id_t freezer;
2825 	int	bytecnt = 0, mblkcnt = 0;
2826 
2827 	ASSERT(q && bp);
2828 	ASSERT(bp->b_next == NULL);
2829 	freezer = STREAM(q)->sd_freezer;
2830 	if (freezer == curthread) {
2831 		ASSERT(frozenstr(q));
2832 		ASSERT(MUTEX_HELD(QLOCK(q)));
2833 	} else
2834 		mutex_enter(QLOCK(q));
2835 
2836 	/*
2837 	 * Make sanity checks and if qband structure is not yet
2838 	 * allocated, do so.
2839 	 */
2840 	if (mcls == QPCTL) {
2841 		if (bp->b_band != 0)
2842 			bp->b_band = 0;		/* force to be correct */
2843 	} else if (bp->b_band != 0) {
2844 		int i;
2845 		qband_t **qbpp;
2846 
2847 		if (bp->b_band > q->q_nband) {
2848 			qbpp = &q->q_bandp;
2849 			while (*qbpp)
2850 				qbpp = &(*qbpp)->qb_next;
2851 			while (bp->b_band > q->q_nband) {
2852 				if ((*qbpp = allocband()) == NULL) {
2853 					if (freezer != curthread)
2854 						mutex_exit(QLOCK(q));
2855 					return (0);
2856 				}
2857 				(*qbpp)->qb_hiwat = q->q_hiwat;
2858 				(*qbpp)->qb_lowat = q->q_lowat;
2859 				q->q_nband++;
2860 				qbpp = &(*qbpp)->qb_next;
2861 			}
2862 		}
2863 		qbp = q->q_bandp;
2864 		i = bp->b_band;
2865 		while (--i)
2866 			qbp = qbp->qb_next;
2867 	}
2868 
2869 	/*
2870 	 * If queue is empty or if message is high priority,
2871 	 * place on the front of the queue.
2872 	 */
2873 	tmp = q->q_first;
2874 	if ((!tmp) || (mcls == QPCTL)) {
2875 		bp->b_next = tmp;
2876 		if (tmp)
2877 			tmp->b_prev = bp;
2878 		else
2879 			q->q_last = bp;
2880 		q->q_first = bp;
2881 		bp->b_prev = NULL;
2882 		if (qbp) {
2883 			qbp->qb_first = bp;
2884 			qbp->qb_last = bp;
2885 		}
2886 	} else if (qbp) {	/* bp->b_band != 0 */
2887 		tmp = qbp->qb_first;
2888 		if (tmp) {
2889 
2890 			/*
2891 			 * Insert bp before the first message in this band.
2892 			 */
2893 			bp->b_next = tmp;
2894 			bp->b_prev = tmp->b_prev;
2895 			if (tmp->b_prev)
2896 				tmp->b_prev->b_next = bp;
2897 			else
2898 				q->q_first = bp;
2899 			tmp->b_prev = bp;
2900 		} else {
2901 			tmp = q->q_last;
2902 			if ((mcls < (int)queclass(tmp)) ||
2903 			    (bp->b_band < tmp->b_band)) {
2904 
2905 				/*
2906 				 * Tack bp on end of queue.
2907 				 */
2908 				bp->b_next = NULL;
2909 				bp->b_prev = tmp;
2910 				tmp->b_next = bp;
2911 				q->q_last = bp;
2912 			} else {
2913 				tmp = q->q_first;
2914 				while (tmp->b_datap->db_type >= QPCTL)
2915 					tmp = tmp->b_next;
2916 				while (tmp->b_band > bp->b_band)
2917 					tmp = tmp->b_next;
2918 
2919 				/*
2920 				 * Insert bp before tmp.
2921 				 */
2922 				bp->b_next = tmp;
2923 				bp->b_prev = tmp->b_prev;
2924 				if (tmp->b_prev)
2925 					tmp->b_prev->b_next = bp;
2926 				else
2927 					q->q_first = bp;
2928 				tmp->b_prev = bp;
2929 			}
2930 			qbp->qb_last = bp;
2931 		}
2932 		qbp->qb_first = bp;
2933 	} else {		/* bp->b_band == 0 && !QPCTL */
2934 
2935 		/*
2936 		 * If the queue class or band is less than that of the last
2937 		 * message on the queue, tack bp on the end of the queue.
2938 		 */
2939 		tmp = q->q_last;
2940 		if ((mcls < (int)queclass(tmp)) || (bp->b_band < tmp->b_band)) {
2941 			bp->b_next = NULL;
2942 			bp->b_prev = tmp;
2943 			tmp->b_next = bp;
2944 			q->q_last = bp;
2945 		} else {
2946 			tmp = q->q_first;
2947 			while (tmp->b_datap->db_type >= QPCTL)
2948 				tmp = tmp->b_next;
2949 			while (tmp->b_band > bp->b_band)
2950 				tmp = tmp->b_next;
2951 
2952 			/*
2953 			 * Insert bp before tmp.
2954 			 */
2955 			bp->b_next = tmp;
2956 			bp->b_prev = tmp->b_prev;
2957 			if (tmp->b_prev)
2958 				tmp->b_prev->b_next = bp;
2959 			else
2960 				q->q_first = bp;
2961 			tmp->b_prev = bp;
2962 		}
2963 	}
2964 
2965 	/* Get message byte count for q_count accounting */
2966 	bytecnt = mp_cont_len(bp, &mblkcnt);
2967 
2968 	if (qbp) {
2969 		qbp->qb_count += bytecnt;
2970 		qbp->qb_mblkcnt += mblkcnt;
2971 		if ((qbp->qb_count >= qbp->qb_hiwat) ||
2972 		    (qbp->qb_mblkcnt >= qbp->qb_hiwat)) {
2973 			qbp->qb_flag |= QB_FULL;
2974 		}
2975 	} else {
2976 		q->q_count += bytecnt;
2977 		q->q_mblkcnt += mblkcnt;
2978 		if ((q->q_count >= q->q_hiwat) ||
2979 		    (q->q_mblkcnt >= q->q_hiwat)) {
2980 			q->q_flag |= QFULL;
2981 		}
2982 	}
2983 
2984 	STR_FTEVENT_MSG(bp, q, FTEV_PUTBQ, 0);
2985 
2986 	if ((mcls > QNORM) || (canenable(q) && (q->q_flag & QWANTR)))
2987 		qenable_locked(q);
2988 	ASSERT(MUTEX_HELD(QLOCK(q)));
2989 	if (freezer != curthread)
2990 		mutex_exit(QLOCK(q));
2991 
2992 	return (1);
2993 }
2994 
2995 /*
2996  * Insert a message before an existing message on the queue.  If the
2997  * existing message is NULL, the new messages is placed on the end of
2998  * the queue.  The queue class of the new message is ignored.  However,
2999  * the priority band of the new message must adhere to the following
3000  * ordering:
3001  *
3002  *	emp->b_prev->b_band >= mp->b_band >= emp->b_band.
3003  *
3004  * All flow control parameters are updated.
3005  *
3006  * insq can be called with the stream frozen, but other utility functions
3007  * holding QLOCK, and by streams modules without any locks/frozen.
3008  */
3009 int
3010 insq(queue_t *q, mblk_t *emp, mblk_t *mp)
3011 {
3012 	mblk_t *tmp;
3013 	qband_t *qbp = NULL;
3014 	int mcls = (int)queclass(mp);
3015 	kthread_id_t freezer;
3016 	int	bytecnt = 0, mblkcnt = 0;
3017 
3018 	freezer = STREAM(q)->sd_freezer;
3019 	if (freezer == curthread) {
3020 		ASSERT(frozenstr(q));
3021 		ASSERT(MUTEX_HELD(QLOCK(q)));
3022 	} else if (MUTEX_HELD(QLOCK(q))) {
3023 		/* Don't drop lock on exit */
3024 		freezer = curthread;
3025 	} else
3026 		mutex_enter(QLOCK(q));
3027 
3028 	if (mcls == QPCTL) {
3029 		if (mp->b_band != 0)
3030 			mp->b_band = 0;		/* force to be correct */
3031 		if (emp && emp->b_prev &&
3032 		    (emp->b_prev->b_datap->db_type < QPCTL))
3033 			goto badord;
3034 	}
3035 	if (emp) {
3036 		if (((mcls == QNORM) && (mp->b_band < emp->b_band)) ||
3037 		    (emp->b_prev && (emp->b_prev->b_datap->db_type < QPCTL) &&
3038 		    (emp->b_prev->b_band < mp->b_band))) {
3039 			goto badord;
3040 		}
3041 	} else {
3042 		tmp = q->q_last;
3043 		if (tmp && (mcls == QNORM) && (mp->b_band > tmp->b_band)) {
3044 badord:
3045 			cmn_err(CE_WARN,
3046 			    "insq: attempt to insert message out of order "
3047 			    "on q %p", (void *)q);
3048 			if (freezer != curthread)
3049 				mutex_exit(QLOCK(q));
3050 			return (0);
3051 		}
3052 	}
3053 
3054 	if (mp->b_band != 0) {
3055 		int i;
3056 		qband_t **qbpp;
3057 
3058 		if (mp->b_band > q->q_nband) {
3059 			qbpp = &q->q_bandp;
3060 			while (*qbpp)
3061 				qbpp = &(*qbpp)->qb_next;
3062 			while (mp->b_band > q->q_nband) {
3063 				if ((*qbpp = allocband()) == NULL) {
3064 					if (freezer != curthread)
3065 						mutex_exit(QLOCK(q));
3066 					return (0);
3067 				}
3068 				(*qbpp)->qb_hiwat = q->q_hiwat;
3069 				(*qbpp)->qb_lowat = q->q_lowat;
3070 				q->q_nband++;
3071 				qbpp = &(*qbpp)->qb_next;
3072 			}
3073 		}
3074 		qbp = q->q_bandp;
3075 		i = mp->b_band;
3076 		while (--i)
3077 			qbp = qbp->qb_next;
3078 	}
3079 
3080 	if ((mp->b_next = emp) != NULL) {
3081 		if ((mp->b_prev = emp->b_prev) != NULL)
3082 			emp->b_prev->b_next = mp;
3083 		else
3084 			q->q_first = mp;
3085 		emp->b_prev = mp;
3086 	} else {
3087 		if ((mp->b_prev = q->q_last) != NULL)
3088 			q->q_last->b_next = mp;
3089 		else
3090 			q->q_first = mp;
3091 		q->q_last = mp;
3092 	}
3093 
3094 	/* Get mblk and byte count for q_count accounting */
3095 	bytecnt = mp_cont_len(mp, &mblkcnt);
3096 
3097 	if (qbp) {	/* adjust qband pointers and count */
3098 		if (!qbp->qb_first) {
3099 			qbp->qb_first = mp;
3100 			qbp->qb_last = mp;
3101 		} else {
3102 			if (mp->b_prev == NULL || (mp->b_prev != NULL &&
3103 			    (mp->b_prev->b_band != mp->b_band)))
3104 				qbp->qb_first = mp;
3105 			else if (mp->b_next == NULL || (mp->b_next != NULL &&
3106 			    (mp->b_next->b_band != mp->b_band)))
3107 				qbp->qb_last = mp;
3108 		}
3109 		qbp->qb_count += bytecnt;
3110 		qbp->qb_mblkcnt += mblkcnt;
3111 		if ((qbp->qb_count >= qbp->qb_hiwat) ||
3112 		    (qbp->qb_mblkcnt >= qbp->qb_hiwat)) {
3113 			qbp->qb_flag |= QB_FULL;
3114 		}
3115 	} else {
3116 		q->q_count += bytecnt;
3117 		q->q_mblkcnt += mblkcnt;
3118 		if ((q->q_count >= q->q_hiwat) ||
3119 		    (q->q_mblkcnt >= q->q_hiwat)) {
3120 			q->q_flag |= QFULL;
3121 		}
3122 	}
3123 
3124 	STR_FTEVENT_MSG(mp, q, FTEV_INSQ, 0);
3125 
3126 	if (canenable(q) && (q->q_flag & QWANTR))
3127 		qenable_locked(q);
3128 
3129 	ASSERT(MUTEX_HELD(QLOCK(q)));
3130 	if (freezer != curthread)
3131 		mutex_exit(QLOCK(q));
3132 
3133 	return (1);
3134 }
3135 
3136 /*
3137  * Create and put a control message on queue.
3138  */
3139 int
3140 putctl(queue_t *q, int type)
3141 {
3142 	mblk_t *bp;
3143 
3144 	if ((datamsg(type) && (type != M_DELAY)) ||
3145 	    (bp = allocb_tryhard(0)) == NULL)
3146 		return (0);
3147 	bp->b_datap->db_type = (unsigned char) type;
3148 
3149 	put(q, bp);
3150 
3151 	return (1);
3152 }
3153 
3154 /*
3155  * Control message with a single-byte parameter
3156  */
3157 int
3158 putctl1(queue_t *q, int type, int param)
3159 {
3160 	mblk_t *bp;
3161 
3162 	if ((datamsg(type) && (type != M_DELAY)) ||
3163 	    (bp = allocb_tryhard(1)) == NULL)
3164 		return (0);
3165 	bp->b_datap->db_type = (unsigned char)type;
3166 	*bp->b_wptr++ = (unsigned char)param;
3167 
3168 	put(q, bp);
3169 
3170 	return (1);
3171 }
3172 
3173 int
3174 putnextctl1(queue_t *q, int type, int param)
3175 {
3176 	mblk_t *bp;
3177 
3178 	if ((datamsg(type) && (type != M_DELAY)) ||
3179 	    ((bp = allocb_tryhard(1)) == NULL))
3180 		return (0);
3181 
3182 	bp->b_datap->db_type = (unsigned char)type;
3183 	*bp->b_wptr++ = (unsigned char)param;
3184 
3185 	putnext(q, bp);
3186 
3187 	return (1);
3188 }
3189 
3190 int
3191 putnextctl(queue_t *q, int type)
3192 {
3193 	mblk_t *bp;
3194 
3195 	if ((datamsg(type) && (type != M_DELAY)) ||
3196 	    ((bp = allocb_tryhard(0)) == NULL))
3197 		return (0);
3198 	bp->b_datap->db_type = (unsigned char)type;
3199 
3200 	putnext(q, bp);
3201 
3202 	return (1);
3203 }
3204 
3205 /*
3206  * Return the queue upstream from this one
3207  */
3208 queue_t *
3209 backq(queue_t *q)
3210 {
3211 	q = _OTHERQ(q);
3212 	if (q->q_next) {
3213 		q = q->q_next;
3214 		return (_OTHERQ(q));
3215 	}
3216 	return (NULL);
3217 }
3218 
3219 /*
3220  * Send a block back up the queue in reverse from this
3221  * one (e.g. to respond to ioctls)
3222  */
3223 void
3224 qreply(queue_t *q, mblk_t *bp)
3225 {
3226 	ASSERT(q && bp);
3227 
3228 	putnext(_OTHERQ(q), bp);
3229 }
3230 
3231 /*
3232  * Streams Queue Scheduling
3233  *
3234  * Queues are enabled through qenable() when they have messages to
3235  * process.  They are serviced by queuerun(), which runs each enabled
3236  * queue's service procedure.  The call to queuerun() is processor
3237  * dependent - the general principle is that it be run whenever a queue
3238  * is enabled but before returning to user level.  For system calls,
3239  * the function runqueues() is called if their action causes a queue
3240  * to be enabled.  For device interrupts, queuerun() should be
3241  * called before returning from the last level of interrupt.  Beyond
3242  * this, no timing assumptions should be made about queue scheduling.
3243  */
3244 
3245 /*
3246  * Enable a queue: put it on list of those whose service procedures are
3247  * ready to run and set up the scheduling mechanism.
3248  * The broadcast is done outside the mutex -> to avoid the woken thread
3249  * from contending with the mutex. This is OK 'cos the queue has been
3250  * enqueued on the runlist and flagged safely at this point.
3251  */
3252 void
3253 qenable(queue_t *q)
3254 {
3255 	mutex_enter(QLOCK(q));
3256 	qenable_locked(q);
3257 	mutex_exit(QLOCK(q));
3258 }
3259 /*
3260  * Return number of messages on queue
3261  */
3262 int
3263 qsize(queue_t *qp)
3264 {
3265 	int count = 0;
3266 	mblk_t *mp;
3267 
3268 	mutex_enter(QLOCK(qp));
3269 	for (mp = qp->q_first; mp; mp = mp->b_next)
3270 		count++;
3271 	mutex_exit(QLOCK(qp));
3272 	return (count);
3273 }
3274 
3275 /*
3276  * noenable - set queue so that putq() will not enable it.
3277  * enableok - set queue so that putq() can enable it.
3278  */
3279 void
3280 noenable(queue_t *q)
3281 {
3282 	mutex_enter(QLOCK(q));
3283 	q->q_flag |= QNOENB;
3284 	mutex_exit(QLOCK(q));
3285 }
3286 
3287 void
3288 enableok(queue_t *q)
3289 {
3290 	mutex_enter(QLOCK(q));
3291 	q->q_flag &= ~QNOENB;
3292 	mutex_exit(QLOCK(q));
3293 }
3294 
3295 /*
3296  * Set queue fields.
3297  */
3298 int
3299 strqset(queue_t *q, qfields_t what, unsigned char pri, intptr_t val)
3300 {
3301 	qband_t *qbp = NULL;
3302 	queue_t	*wrq;
3303 	int error = 0;
3304 	kthread_id_t freezer;
3305 
3306 	freezer = STREAM(q)->sd_freezer;
3307 	if (freezer == curthread) {
3308 		ASSERT(frozenstr(q));
3309 		ASSERT(MUTEX_HELD(QLOCK(q)));
3310 	} else
3311 		mutex_enter(QLOCK(q));
3312 
3313 	if (what >= QBAD) {
3314 		error = EINVAL;
3315 		goto done;
3316 	}
3317 	if (pri != 0) {
3318 		int i;
3319 		qband_t **qbpp;
3320 
3321 		if (pri > q->q_nband) {
3322 			qbpp = &q->q_bandp;
3323 			while (*qbpp)
3324 				qbpp = &(*qbpp)->qb_next;
3325 			while (pri > q->q_nband) {
3326 				if ((*qbpp = allocband()) == NULL) {
3327 					error = EAGAIN;
3328 					goto done;
3329 				}
3330 				(*qbpp)->qb_hiwat = q->q_hiwat;
3331 				(*qbpp)->qb_lowat = q->q_lowat;
3332 				q->q_nband++;
3333 				qbpp = &(*qbpp)->qb_next;
3334 			}
3335 		}
3336 		qbp = q->q_bandp;
3337 		i = pri;
3338 		while (--i)
3339 			qbp = qbp->qb_next;
3340 	}
3341 	switch (what) {
3342 
3343 	case QHIWAT:
3344 		if (qbp)
3345 			qbp->qb_hiwat = (size_t)val;
3346 		else
3347 			q->q_hiwat = (size_t)val;
3348 		break;
3349 
3350 	case QLOWAT:
3351 		if (qbp)
3352 			qbp->qb_lowat = (size_t)val;
3353 		else
3354 			q->q_lowat = (size_t)val;
3355 		break;
3356 
3357 	case QMAXPSZ:
3358 		if (qbp)
3359 			error = EINVAL;
3360 		else
3361 			q->q_maxpsz = (ssize_t)val;
3362 
3363 		/*
3364 		 * Performance concern, strwrite looks at the module below
3365 		 * the stream head for the maxpsz each time it does a write
3366 		 * we now cache it at the stream head.  Check to see if this
3367 		 * queue is sitting directly below the stream head.
3368 		 */
3369 		wrq = STREAM(q)->sd_wrq;
3370 		if (q != wrq->q_next)
3371 			break;
3372 
3373 		/*
3374 		 * If the stream is not frozen drop the current QLOCK and
3375 		 * acquire the sd_wrq QLOCK which protects sd_qn_*
3376 		 */
3377 		if (freezer != curthread) {
3378 			mutex_exit(QLOCK(q));
3379 			mutex_enter(QLOCK(wrq));
3380 		}
3381 		ASSERT(MUTEX_HELD(QLOCK(wrq)));
3382 
3383 		if (strmsgsz != 0) {
3384 			if (val == INFPSZ)
3385 				val = strmsgsz;
3386 			else  {
3387 				if (STREAM(q)->sd_vnode->v_type == VFIFO)
3388 					val = MIN(PIPE_BUF, val);
3389 				else
3390 					val = MIN(strmsgsz, val);
3391 			}
3392 		}
3393 		STREAM(q)->sd_qn_maxpsz = val;
3394 		if (freezer != curthread) {
3395 			mutex_exit(QLOCK(wrq));
3396 			mutex_enter(QLOCK(q));
3397 		}
3398 		break;
3399 
3400 	case QMINPSZ:
3401 		if (qbp)
3402 			error = EINVAL;
3403 		else
3404 			q->q_minpsz = (ssize_t)val;
3405 
3406 		/*
3407 		 * Performance concern, strwrite looks at the module below
3408 		 * the stream head for the maxpsz each time it does a write
3409 		 * we now cache it at the stream head.  Check to see if this
3410 		 * queue is sitting directly below the stream head.
3411 		 */
3412 		wrq = STREAM(q)->sd_wrq;
3413 		if (q != wrq->q_next)
3414 			break;
3415 
3416 		/*
3417 		 * If the stream is not frozen drop the current QLOCK and
3418 		 * acquire the sd_wrq QLOCK which protects sd_qn_*
3419 		 */
3420 		if (freezer != curthread) {
3421 			mutex_exit(QLOCK(q));
3422 			mutex_enter(QLOCK(wrq));
3423 		}
3424 		STREAM(q)->sd_qn_minpsz = (ssize_t)val;
3425 
3426 		if (freezer != curthread) {
3427 			mutex_exit(QLOCK(wrq));
3428 			mutex_enter(QLOCK(q));
3429 		}
3430 		break;
3431 
3432 	case QSTRUIOT:
3433 		if (qbp)
3434 			error = EINVAL;
3435 		else
3436 			q->q_struiot = (ushort_t)val;
3437 		break;
3438 
3439 	case QCOUNT:
3440 	case QFIRST:
3441 	case QLAST:
3442 	case QFLAG:
3443 		error = EPERM;
3444 		break;
3445 
3446 	default:
3447 		error = EINVAL;
3448 		break;
3449 	}
3450 done:
3451 	if (freezer != curthread)
3452 		mutex_exit(QLOCK(q));
3453 	return (error);
3454 }
3455 
3456 /*
3457  * Get queue fields.
3458  */
3459 int
3460 strqget(queue_t *q, qfields_t what, unsigned char pri, void *valp)
3461 {
3462 	qband_t		*qbp = NULL;
3463 	int		error = 0;
3464 	kthread_id_t	freezer;
3465 
3466 	freezer = STREAM(q)->sd_freezer;
3467 	if (freezer == curthread) {
3468 		ASSERT(frozenstr(q));
3469 		ASSERT(MUTEX_HELD(QLOCK(q)));
3470 	} else
3471 		mutex_enter(QLOCK(q));
3472 	if (what >= QBAD) {
3473 		error = EINVAL;
3474 		goto done;
3475 	}
3476 	if (pri != 0) {
3477 		int i;
3478 		qband_t **qbpp;
3479 
3480 		if (pri > q->q_nband) {
3481 			qbpp = &q->q_bandp;
3482 			while (*qbpp)
3483 				qbpp = &(*qbpp)->qb_next;
3484 			while (pri > q->q_nband) {
3485 				if ((*qbpp = allocband()) == NULL) {
3486 					error = EAGAIN;
3487 					goto done;
3488 				}
3489 				(*qbpp)->qb_hiwat = q->q_hiwat;
3490 				(*qbpp)->qb_lowat = q->q_lowat;
3491 				q->q_nband++;
3492 				qbpp = &(*qbpp)->qb_next;
3493 			}
3494 		}
3495 		qbp = q->q_bandp;
3496 		i = pri;
3497 		while (--i)
3498 			qbp = qbp->qb_next;
3499 	}
3500 	switch (what) {
3501 	case QHIWAT:
3502 		if (qbp)
3503 			*(size_t *)valp = qbp->qb_hiwat;
3504 		else
3505 			*(size_t *)valp = q->q_hiwat;
3506 		break;
3507 
3508 	case QLOWAT:
3509 		if (qbp)
3510 			*(size_t *)valp = qbp->qb_lowat;
3511 		else
3512 			*(size_t *)valp = q->q_lowat;
3513 		break;
3514 
3515 	case QMAXPSZ:
3516 		if (qbp)
3517 			error = EINVAL;
3518 		else
3519 			*(ssize_t *)valp = q->q_maxpsz;
3520 		break;
3521 
3522 	case QMINPSZ:
3523 		if (qbp)
3524 			error = EINVAL;
3525 		else
3526 			*(ssize_t *)valp = q->q_minpsz;
3527 		break;
3528 
3529 	case QCOUNT:
3530 		if (qbp)
3531 			*(size_t *)valp = qbp->qb_count;
3532 		else
3533 			*(size_t *)valp = q->q_count;
3534 		break;
3535 
3536 	case QFIRST:
3537 		if (qbp)
3538 			*(mblk_t **)valp = qbp->qb_first;
3539 		else
3540 			*(mblk_t **)valp = q->q_first;
3541 		break;
3542 
3543 	case QLAST:
3544 		if (qbp)
3545 			*(mblk_t **)valp = qbp->qb_last;
3546 		else
3547 			*(mblk_t **)valp = q->q_last;
3548 		break;
3549 
3550 	case QFLAG:
3551 		if (qbp)
3552 			*(uint_t *)valp = qbp->qb_flag;
3553 		else
3554 			*(uint_t *)valp = q->q_flag;
3555 		break;
3556 
3557 	case QSTRUIOT:
3558 		if (qbp)
3559 			error = EINVAL;
3560 		else
3561 			*(short *)valp = q->q_struiot;
3562 		break;
3563 
3564 	default:
3565 		error = EINVAL;
3566 		break;
3567 	}
3568 done:
3569 	if (freezer != curthread)
3570 		mutex_exit(QLOCK(q));
3571 	return (error);
3572 }
3573 
3574 /*
3575  * Function awakes all in cvwait/sigwait/pollwait, on one of:
3576  *	QWANTWSYNC or QWANTR or QWANTW,
3577  *
3578  * Note: for QWANTWSYNC/QWANTW and QWANTR, if no WSLEEPer or RSLEEPer then a
3579  *	 deferred wakeup will be done. Also if strpoll() in progress then a
3580  *	 deferred pollwakeup will be done.
3581  */
3582 void
3583 strwakeq(queue_t *q, int flag)
3584 {
3585 	stdata_t	*stp = STREAM(q);
3586 	pollhead_t	*pl;
3587 
3588 	mutex_enter(&stp->sd_lock);
3589 	pl = &stp->sd_pollist;
3590 	if (flag & QWANTWSYNC) {
3591 		ASSERT(!(q->q_flag & QREADR));
3592 		if (stp->sd_flag & WSLEEP) {
3593 			stp->sd_flag &= ~WSLEEP;
3594 			cv_broadcast(&stp->sd_wrq->q_wait);
3595 		} else {
3596 			stp->sd_wakeq |= WSLEEP;
3597 		}
3598 
3599 		mutex_exit(&stp->sd_lock);
3600 		pollwakeup(pl, POLLWRNORM);
3601 		mutex_enter(&stp->sd_lock);
3602 
3603 		if (stp->sd_sigflags & S_WRNORM)
3604 			strsendsig(stp->sd_siglist, S_WRNORM, 0, 0);
3605 	} else if (flag & QWANTR) {
3606 		if (stp->sd_flag & RSLEEP) {
3607 			stp->sd_flag &= ~RSLEEP;
3608 			cv_broadcast(&_RD(stp->sd_wrq)->q_wait);
3609 		} else {
3610 			stp->sd_wakeq |= RSLEEP;
3611 		}
3612 
3613 		mutex_exit(&stp->sd_lock);
3614 		pollwakeup(pl, POLLIN | POLLRDNORM);
3615 		mutex_enter(&stp->sd_lock);
3616 
3617 		{
3618 			int events = stp->sd_sigflags & (S_INPUT | S_RDNORM);
3619 
3620 			if (events)
3621 				strsendsig(stp->sd_siglist, events, 0, 0);
3622 		}
3623 	} else {
3624 		if (stp->sd_flag & WSLEEP) {
3625 			stp->sd_flag &= ~WSLEEP;
3626 			cv_broadcast(&stp->sd_wrq->q_wait);
3627 		}
3628 
3629 		mutex_exit(&stp->sd_lock);
3630 		pollwakeup(pl, POLLWRNORM);
3631 		mutex_enter(&stp->sd_lock);
3632 
3633 		if (stp->sd_sigflags & S_WRNORM)
3634 			strsendsig(stp->sd_siglist, S_WRNORM, 0, 0);
3635 	}
3636 	mutex_exit(&stp->sd_lock);
3637 }
3638 
3639 int
3640 struioget(queue_t *q, mblk_t *mp, struiod_t *dp, int noblock)
3641 {
3642 	stdata_t *stp = STREAM(q);
3643 	int typ  = STRUIOT_STANDARD;
3644 	uio_t	 *uiop = &dp->d_uio;
3645 	dblk_t	 *dbp;
3646 	ssize_t	 uiocnt;
3647 	ssize_t	 cnt;
3648 	unsigned char *ptr;
3649 	ssize_t	 resid;
3650 	int	 error = 0;
3651 	on_trap_data_t otd;
3652 	queue_t	*stwrq;
3653 
3654 	/*
3655 	 * Plumbing may change while taking the type so store the
3656 	 * queue in a temporary variable. It doesn't matter even
3657 	 * if the we take the type from the previous plumbing,
3658 	 * that's because if the plumbing has changed when we were
3659 	 * holding the queue in a temporary variable, we can continue
3660 	 * processing the message the way it would have been processed
3661 	 * in the old plumbing, without any side effects but a bit
3662 	 * extra processing for partial ip header checksum.
3663 	 *
3664 	 * This has been done to avoid holding the sd_lock which is
3665 	 * very hot.
3666 	 */
3667 
3668 	stwrq = stp->sd_struiowrq;
3669 	if (stwrq)
3670 		typ = stwrq->q_struiot;
3671 
3672 	for (; (resid = uiop->uio_resid) > 0 && mp; mp = mp->b_cont) {
3673 		dbp = mp->b_datap;
3674 		ptr = (uchar_t *)(mp->b_rptr + dbp->db_cksumstuff);
3675 		uiocnt = dbp->db_cksumend - dbp->db_cksumstuff;
3676 		cnt = MIN(uiocnt, uiop->uio_resid);
3677 		if (!(dbp->db_struioflag & STRUIO_SPEC) ||
3678 		    (dbp->db_struioflag & STRUIO_DONE) || cnt == 0) {
3679 			/*
3680 			 * Either this mblk has already been processed
3681 			 * or there is no more room in this mblk (?).
3682 			 */
3683 			continue;
3684 		}
3685 		switch (typ) {
3686 		case STRUIOT_STANDARD:
3687 			if (noblock) {
3688 				if (on_trap(&otd, OT_DATA_ACCESS)) {
3689 					no_trap();
3690 					error = EWOULDBLOCK;
3691 					goto out;
3692 				}
3693 			}
3694 			if (error = uiomove(ptr, cnt, UIO_WRITE, uiop)) {
3695 				if (noblock)
3696 					no_trap();
3697 				goto out;
3698 			}
3699 			if (noblock)
3700 				no_trap();
3701 			break;
3702 
3703 		default:
3704 			error = EIO;
3705 			goto out;
3706 		}
3707 		dbp->db_struioflag |= STRUIO_DONE;
3708 		dbp->db_cksumstuff += cnt;
3709 	}
3710 out:
3711 	if (error == EWOULDBLOCK && (resid -= uiop->uio_resid) > 0) {
3712 		/*
3713 		 * A fault has occured and some bytes were moved to the
3714 		 * current mblk, the uio_t has already been updated by
3715 		 * the appropriate uio routine, so also update the mblk
3716 		 * to reflect this in case this same mblk chain is used
3717 		 * again (after the fault has been handled).
3718 		 */
3719 		uiocnt = dbp->db_cksumend - dbp->db_cksumstuff;
3720 		if (uiocnt >= resid)
3721 			dbp->db_cksumstuff += resid;
3722 	}
3723 	return (error);
3724 }
3725 
3726 /*
3727  * Try to enter queue synchronously. Any attempt to enter a closing queue will
3728  * fails. The qp->q_rwcnt keeps track of the number of successful entries so
3729  * that removeq() will not try to close the queue while a thread is inside the
3730  * queue.
3731  */
3732 static boolean_t
3733 rwnext_enter(queue_t *qp)
3734 {
3735 	mutex_enter(QLOCK(qp));
3736 	if (qp->q_flag & QWCLOSE) {
3737 		mutex_exit(QLOCK(qp));
3738 		return (B_FALSE);
3739 	}
3740 	qp->q_rwcnt++;
3741 	ASSERT(qp->q_rwcnt != 0);
3742 	mutex_exit(QLOCK(qp));
3743 	return (B_TRUE);
3744 }
3745 
3746 /*
3747  * Decrease the count of threads running in sync stream queue and wake up any
3748  * threads blocked in removeq().
3749  */
3750 static void
3751 rwnext_exit(queue_t *qp)
3752 {
3753 	mutex_enter(QLOCK(qp));
3754 	qp->q_rwcnt--;
3755 	if (qp->q_flag & QWANTRMQSYNC) {
3756 		qp->q_flag &= ~QWANTRMQSYNC;
3757 		cv_broadcast(&qp->q_wait);
3758 	}
3759 	mutex_exit(QLOCK(qp));
3760 }
3761 
3762 /*
3763  * The purpose of rwnext() is to call the rw procedure of the next
3764  * (downstream) modules queue.
3765  *
3766  * treated as put entrypoint for perimeter syncronization.
3767  *
3768  * There's no need to grab sq_putlocks here (which only exist for CIPUT
3769  * sync queues). If it is CIPUT sync queue sq_count is incremented and it does
3770  * not matter if any regular put entrypoints have been already entered. We
3771  * can't increment one of the sq_putcounts (instead of sq_count) because
3772  * qwait_rw won't know which counter to decrement.
3773  *
3774  * It would be reasonable to add the lockless FASTPUT logic.
3775  */
3776 int
3777 rwnext(queue_t *qp, struiod_t *dp)
3778 {
3779 	queue_t		*nqp;
3780 	syncq_t		*sq;
3781 	uint16_t	count;
3782 	uint16_t	flags;
3783 	struct qinit	*qi;
3784 	int		(*proc)();
3785 	struct stdata	*stp;
3786 	int		isread;
3787 	int		rval;
3788 
3789 	stp = STREAM(qp);
3790 	/*
3791 	 * Prevent q_next from changing by holding sd_lock until acquiring
3792 	 * SQLOCK. Note that a read-side rwnext from the streamhead will
3793 	 * already have sd_lock acquired. In either case sd_lock is always
3794 	 * released after acquiring SQLOCK.
3795 	 *
3796 	 * The streamhead read-side holding sd_lock when calling rwnext is
3797 	 * required to prevent a race condition were M_DATA mblks flowing
3798 	 * up the read-side of the stream could be bypassed by a rwnext()
3799 	 * down-call. In this case sd_lock acts as the streamhead perimeter.
3800 	 */
3801 	if ((nqp = _WR(qp)) == qp) {
3802 		isread = 0;
3803 		mutex_enter(&stp->sd_lock);
3804 		qp = nqp->q_next;
3805 	} else {
3806 		isread = 1;
3807 		if (nqp != stp->sd_wrq)
3808 			/* Not streamhead */
3809 			mutex_enter(&stp->sd_lock);
3810 		qp = _RD(nqp->q_next);
3811 	}
3812 	qi = qp->q_qinfo;
3813 	if (qp->q_struiot == STRUIOT_NONE || ! (proc = qi->qi_rwp)) {
3814 		/*
3815 		 * Not a synchronous module or no r/w procedure for this
3816 		 * queue, so just return EINVAL and let the caller handle it.
3817 		 */
3818 		mutex_exit(&stp->sd_lock);
3819 		return (EINVAL);
3820 	}
3821 
3822 	if (rwnext_enter(qp) == B_FALSE) {
3823 		mutex_exit(&stp->sd_lock);
3824 		return (EINVAL);
3825 	}
3826 
3827 	sq = qp->q_syncq;
3828 	mutex_enter(SQLOCK(sq));
3829 	mutex_exit(&stp->sd_lock);
3830 	count = sq->sq_count;
3831 	flags = sq->sq_flags;
3832 	ASSERT(sq->sq_ciputctrl == NULL || (flags & SQ_CIPUT));
3833 
3834 	while ((flags & SQ_GOAWAY) || (!(flags & SQ_CIPUT) && count != 0)) {
3835 		/*
3836 		 * if this queue is being closed, return.
3837 		 */
3838 		if (qp->q_flag & QWCLOSE) {
3839 			mutex_exit(SQLOCK(sq));
3840 			rwnext_exit(qp);
3841 			return (EINVAL);
3842 		}
3843 
3844 		/*
3845 		 * Wait until we can enter the inner perimeter.
3846 		 */
3847 		sq->sq_flags = flags | SQ_WANTWAKEUP;
3848 		cv_wait(&sq->sq_wait, SQLOCK(sq));
3849 		count = sq->sq_count;
3850 		flags = sq->sq_flags;
3851 	}
3852 
3853 	if (isread == 0 && stp->sd_struiowrq == NULL ||
3854 	    isread == 1 && stp->sd_struiordq == NULL) {
3855 		/*
3856 		 * Stream plumbing changed while waiting for inner perimeter
3857 		 * so just return EINVAL and let the caller handle it.
3858 		 */
3859 		mutex_exit(SQLOCK(sq));
3860 		rwnext_exit(qp);
3861 		return (EINVAL);
3862 	}
3863 	if (!(flags & SQ_CIPUT))
3864 		sq->sq_flags = flags | SQ_EXCL;
3865 	sq->sq_count = count + 1;
3866 	ASSERT(sq->sq_count != 0);		/* Wraparound */
3867 	/*
3868 	 * Note: The only message ordering guarantee that rwnext() makes is
3869 	 *	 for the write queue flow-control case. All others (r/w queue
3870 	 *	 with q_count > 0 (or q_first != 0)) are the resposibilty of
3871 	 *	 the queue's rw procedure. This could be genralized here buy
3872 	 *	 running the queue's service procedure, but that wouldn't be
3873 	 *	 the most efficent for all cases.
3874 	 */
3875 	mutex_exit(SQLOCK(sq));
3876 	if (! isread && (qp->q_flag & QFULL)) {
3877 		/*
3878 		 * Write queue may be flow controlled. If so,
3879 		 * mark the queue for wakeup when it's not.
3880 		 */
3881 		mutex_enter(QLOCK(qp));
3882 		if (qp->q_flag & QFULL) {
3883 			qp->q_flag |= QWANTWSYNC;
3884 			mutex_exit(QLOCK(qp));
3885 			rval = EWOULDBLOCK;
3886 			goto out;
3887 		}
3888 		mutex_exit(QLOCK(qp));
3889 	}
3890 
3891 	if (! isread && dp->d_mp)
3892 		STR_FTEVENT_MSG(dp->d_mp, nqp, FTEV_RWNEXT, dp->d_mp->b_rptr -
3893 		    dp->d_mp->b_datap->db_base);
3894 
3895 	rval = (*proc)(qp, dp);
3896 
3897 	if (isread && dp->d_mp)
3898 		STR_FTEVENT_MSG(dp->d_mp, _RD(nqp), FTEV_RWNEXT,
3899 		    dp->d_mp->b_rptr - dp->d_mp->b_datap->db_base);
3900 out:
3901 	/*
3902 	 * The queue is protected from being freed by sq_count, so it is
3903 	 * safe to call rwnext_exit and reacquire SQLOCK(sq).
3904 	 */
3905 	rwnext_exit(qp);
3906 
3907 	mutex_enter(SQLOCK(sq));
3908 	flags = sq->sq_flags;
3909 	ASSERT(sq->sq_count != 0);
3910 	sq->sq_count--;
3911 	if (flags & SQ_TAIL) {
3912 		putnext_tail(sq, qp, flags);
3913 		/*
3914 		 * The only purpose of this ASSERT is to preserve calling stack
3915 		 * in DEBUG kernel.
3916 		 */
3917 		ASSERT(flags & SQ_TAIL);
3918 		return (rval);
3919 	}
3920 	ASSERT(flags & (SQ_EXCL|SQ_CIPUT));
3921 	/*
3922 	 * Safe to always drop SQ_EXCL:
3923 	 *	Not SQ_CIPUT means we set SQ_EXCL above
3924 	 *	For SQ_CIPUT SQ_EXCL will only be set if the put procedure
3925 	 *	did a qwriter(INNER) in which case nobody else
3926 	 *	is in the inner perimeter and we are exiting.
3927 	 *
3928 	 * I would like to make the following assertion:
3929 	 *
3930 	 * ASSERT((flags & (SQ_EXCL|SQ_CIPUT)) != (SQ_EXCL|SQ_CIPUT) ||
3931 	 *	sq->sq_count == 0);
3932 	 *
3933 	 * which indicates that if we are both putshared and exclusive,
3934 	 * we became exclusive while executing the putproc, and the only
3935 	 * claim on the syncq was the one we dropped a few lines above.
3936 	 * But other threads that enter putnext while the syncq is exclusive
3937 	 * need to make a claim as they may need to drop SQLOCK in the
3938 	 * has_writers case to avoid deadlocks.  If these threads are
3939 	 * delayed or preempted, it is possible that the writer thread can
3940 	 * find out that there are other claims making the (sq_count == 0)
3941 	 * test invalid.
3942 	 */
3943 
3944 	sq->sq_flags = flags & ~SQ_EXCL;
3945 	if (sq->sq_flags & SQ_WANTWAKEUP) {
3946 		sq->sq_flags &= ~SQ_WANTWAKEUP;
3947 		cv_broadcast(&sq->sq_wait);
3948 	}
3949 	mutex_exit(SQLOCK(sq));
3950 	return (rval);
3951 }
3952 
3953 /*
3954  * The purpose of infonext() is to call the info procedure of the next
3955  * (downstream) modules queue.
3956  *
3957  * treated as put entrypoint for perimeter syncronization.
3958  *
3959  * There's no need to grab sq_putlocks here (which only exist for CIPUT
3960  * sync queues). If it is CIPUT sync queue regular sq_count is incremented and
3961  * it does not matter if any regular put entrypoints have been already
3962  * entered.
3963  */
3964 int
3965 infonext(queue_t *qp, infod_t *idp)
3966 {
3967 	queue_t		*nqp;
3968 	syncq_t		*sq;
3969 	uint16_t	count;
3970 	uint16_t	flags;
3971 	struct qinit	*qi;
3972 	int		(*proc)();
3973 	struct stdata	*stp;
3974 	int		rval;
3975 
3976 	stp = STREAM(qp);
3977 	/*
3978 	 * Prevent q_next from changing by holding sd_lock until
3979 	 * acquiring SQLOCK.
3980 	 */
3981 	mutex_enter(&stp->sd_lock);
3982 	if ((nqp = _WR(qp)) == qp) {
3983 		qp = nqp->q_next;
3984 	} else {
3985 		qp = _RD(nqp->q_next);
3986 	}
3987 	qi = qp->q_qinfo;
3988 	if (qp->q_struiot == STRUIOT_NONE || ! (proc = qi->qi_infop)) {
3989 		mutex_exit(&stp->sd_lock);
3990 		return (EINVAL);
3991 	}
3992 	sq = qp->q_syncq;
3993 	mutex_enter(SQLOCK(sq));
3994 	mutex_exit(&stp->sd_lock);
3995 	count = sq->sq_count;
3996 	flags = sq->sq_flags;
3997 	ASSERT(sq->sq_ciputctrl == NULL || (flags & SQ_CIPUT));
3998 
3999 	while ((flags & SQ_GOAWAY) || (!(flags & SQ_CIPUT) && count != 0)) {
4000 		/*
4001 		 * Wait until we can enter the inner perimeter.
4002 		 */
4003 		sq->sq_flags = flags | SQ_WANTWAKEUP;
4004 		cv_wait(&sq->sq_wait, SQLOCK(sq));
4005 		count = sq->sq_count;
4006 		flags = sq->sq_flags;
4007 	}
4008 
4009 	if (! (flags & SQ_CIPUT))
4010 		sq->sq_flags = flags | SQ_EXCL;
4011 	sq->sq_count = count + 1;
4012 	ASSERT(sq->sq_count != 0);		/* Wraparound */
4013 	mutex_exit(SQLOCK(sq));
4014 
4015 	rval = (*proc)(qp, idp);
4016 
4017 	mutex_enter(SQLOCK(sq));
4018 	flags = sq->sq_flags;
4019 	ASSERT(sq->sq_count != 0);
4020 	sq->sq_count--;
4021 	if (flags & SQ_TAIL) {
4022 		putnext_tail(sq, qp, flags);
4023 		/*
4024 		 * The only purpose of this ASSERT is to preserve calling stack
4025 		 * in DEBUG kernel.
4026 		 */
4027 		ASSERT(flags & SQ_TAIL);
4028 		return (rval);
4029 	}
4030 	ASSERT(flags & (SQ_EXCL|SQ_CIPUT));
4031 /*
4032  * XXXX
4033  * I am not certain the next comment is correct here.  I need to consider
4034  * why the infonext is called, and if dropping SQ_EXCL unless non-CIPUT
4035  * might cause other problems.  It just might be safer to drop it if
4036  * !SQ_CIPUT because that is when we set it.
4037  */
4038 	/*
4039 	 * Safe to always drop SQ_EXCL:
4040 	 *	Not SQ_CIPUT means we set SQ_EXCL above
4041 	 *	For SQ_CIPUT SQ_EXCL will only be set if the put procedure
4042 	 *	did a qwriter(INNER) in which case nobody else
4043 	 *	is in the inner perimeter and we are exiting.
4044 	 *
4045 	 * I would like to make the following assertion:
4046 	 *
4047 	 * ASSERT((flags & (SQ_EXCL|SQ_CIPUT)) != (SQ_EXCL|SQ_CIPUT) ||
4048 	 *	sq->sq_count == 0);
4049 	 *
4050 	 * which indicates that if we are both putshared and exclusive,
4051 	 * we became exclusive while executing the putproc, and the only
4052 	 * claim on the syncq was the one we dropped a few lines above.
4053 	 * But other threads that enter putnext while the syncq is exclusive
4054 	 * need to make a claim as they may need to drop SQLOCK in the
4055 	 * has_writers case to avoid deadlocks.  If these threads are
4056 	 * delayed or preempted, it is possible that the writer thread can
4057 	 * find out that there are other claims making the (sq_count == 0)
4058 	 * test invalid.
4059 	 */
4060 
4061 	sq->sq_flags = flags & ~SQ_EXCL;
4062 	mutex_exit(SQLOCK(sq));
4063 	return (rval);
4064 }
4065 
4066 /*
4067  * Return nonzero if the queue is responsible for struio(), else return 0.
4068  */
4069 int
4070 isuioq(queue_t *q)
4071 {
4072 	if (q->q_flag & QREADR)
4073 		return (STREAM(q)->sd_struiordq == q);
4074 	else
4075 		return (STREAM(q)->sd_struiowrq == q);
4076 }
4077 
4078 #if defined(__sparc)
4079 int disable_putlocks = 0;
4080 #else
4081 int disable_putlocks = 1;
4082 #endif
4083 
4084 /*
4085  * called by create_putlock.
4086  */
4087 static void
4088 create_syncq_putlocks(queue_t *q)
4089 {
4090 	syncq_t	*sq = q->q_syncq;
4091 	ciputctrl_t *cip;
4092 	int i;
4093 
4094 	ASSERT(sq != NULL);
4095 
4096 	ASSERT(disable_putlocks == 0);
4097 	ASSERT(n_ciputctrl >= min_n_ciputctrl);
4098 	ASSERT(ciputctrl_cache != NULL);
4099 
4100 	if (!(sq->sq_type & SQ_CIPUT))
4101 		return;
4102 
4103 	for (i = 0; i <= 1; i++) {
4104 		if (sq->sq_ciputctrl == NULL) {
4105 			cip = kmem_cache_alloc(ciputctrl_cache, KM_SLEEP);
4106 			SUMCHECK_CIPUTCTRL_COUNTS(cip, n_ciputctrl - 1, 0);
4107 			mutex_enter(SQLOCK(sq));
4108 			if (sq->sq_ciputctrl != NULL) {
4109 				mutex_exit(SQLOCK(sq));
4110 				kmem_cache_free(ciputctrl_cache, cip);
4111 			} else {
4112 				ASSERT(sq->sq_nciputctrl == 0);
4113 				sq->sq_nciputctrl = n_ciputctrl - 1;
4114 				/*
4115 				 * putnext checks sq_ciputctrl without holding
4116 				 * SQLOCK. if it is not NULL putnext assumes
4117 				 * sq_nciputctrl is initialized. membar below
4118 				 * insures that.
4119 				 */
4120 				membar_producer();
4121 				sq->sq_ciputctrl = cip;
4122 				mutex_exit(SQLOCK(sq));
4123 			}
4124 		}
4125 		ASSERT(sq->sq_nciputctrl == n_ciputctrl - 1);
4126 		if (i == 1)
4127 			break;
4128 		q = _OTHERQ(q);
4129 		if (!(q->q_flag & QPERQ)) {
4130 			ASSERT(sq == q->q_syncq);
4131 			break;
4132 		}
4133 		ASSERT(q->q_syncq != NULL);
4134 		ASSERT(sq != q->q_syncq);
4135 		sq = q->q_syncq;
4136 		ASSERT(sq->sq_type & SQ_CIPUT);
4137 	}
4138 }
4139 
4140 /*
4141  * If stream argument is 0 only create per cpu sq_putlocks/sq_putcounts for
4142  * syncq of q. If stream argument is not 0 create per cpu stream_putlocks for
4143  * the stream of q and per cpu sq_putlocks/sq_putcounts for all syncq's
4144  * starting from q and down to the driver.
4145  *
4146  * This should be called after the affected queues are part of stream
4147  * geometry. It should be called from driver/module open routine after
4148  * qprocson() call. It is also called from nfs syscall where it is known that
4149  * stream is configured and won't change its geometry during create_putlock
4150  * call.
4151  *
4152  * caller normally uses 0 value for the stream argument to speed up MT putnext
4153  * into the perimeter of q for example because its perimeter is per module
4154  * (e.g. IP).
4155  *
4156  * caller normally uses non 0 value for the stream argument to hint the system
4157  * that the stream of q is a very contended global system stream
4158  * (e.g. NFS/UDP) and the part of the stream from q to the driver is
4159  * particularly MT hot.
4160  *
4161  * Caller insures stream plumbing won't happen while we are here and therefore
4162  * q_next can be safely used.
4163  */
4164 
4165 void
4166 create_putlocks(queue_t *q, int stream)
4167 {
4168 	ciputctrl_t	*cip;
4169 	struct stdata	*stp = STREAM(q);
4170 
4171 	q = _WR(q);
4172 	ASSERT(stp != NULL);
4173 
4174 	if (disable_putlocks != 0)
4175 		return;
4176 
4177 	if (n_ciputctrl < min_n_ciputctrl)
4178 		return;
4179 
4180 	ASSERT(ciputctrl_cache != NULL);
4181 
4182 	if (stream != 0 && stp->sd_ciputctrl == NULL) {
4183 		cip = kmem_cache_alloc(ciputctrl_cache, KM_SLEEP);
4184 		SUMCHECK_CIPUTCTRL_COUNTS(cip, n_ciputctrl - 1, 0);
4185 		mutex_enter(&stp->sd_lock);
4186 		if (stp->sd_ciputctrl != NULL) {
4187 			mutex_exit(&stp->sd_lock);
4188 			kmem_cache_free(ciputctrl_cache, cip);
4189 		} else {
4190 			ASSERT(stp->sd_nciputctrl == 0);
4191 			stp->sd_nciputctrl = n_ciputctrl - 1;
4192 			/*
4193 			 * putnext checks sd_ciputctrl without holding
4194 			 * sd_lock. if it is not NULL putnext assumes
4195 			 * sd_nciputctrl is initialized. membar below
4196 			 * insures that.
4197 			 */
4198 			membar_producer();
4199 			stp->sd_ciputctrl = cip;
4200 			mutex_exit(&stp->sd_lock);
4201 		}
4202 	}
4203 
4204 	ASSERT(stream == 0 || stp->sd_nciputctrl == n_ciputctrl - 1);
4205 
4206 	while (_SAMESTR(q)) {
4207 		create_syncq_putlocks(q);
4208 		if (stream == 0)
4209 			return;
4210 		q = q->q_next;
4211 	}
4212 	ASSERT(q != NULL);
4213 	create_syncq_putlocks(q);
4214 }
4215 
4216 /*
4217  * STREAMS Flow Trace - record STREAMS Flow Trace events as an mblk flows
4218  * through a stream.
4219  *
4220  * Data currently record per-event is a timestamp, module/driver name,
4221  * downstream module/driver name, optional callstack, event type and a per
4222  * type datum.  Much of the STREAMS framework is instrumented for automatic
4223  * flow tracing (when enabled).  Events can be defined and used by STREAMS
4224  * modules and drivers.
4225  *
4226  * Global objects:
4227  *
4228  *	str_ftevent() - Add a flow-trace event to a dblk.
4229  *	str_ftfree() - Free flow-trace data
4230  *
4231  * Local objects:
4232  *
4233  *	fthdr_cache - pointer to the kmem cache for trace header.
4234  *	ftblk_cache - pointer to the kmem cache for trace data blocks.
4235  */
4236 
4237 int str_ftnever = 1;	/* Don't do STREAMS flow tracing */
4238 int str_ftstack = 0;	/* Don't record event call stacks */
4239 
4240 void
4241 str_ftevent(fthdr_t *hp, void *p, ushort_t evnt, ushort_t data)
4242 {
4243 	ftblk_t *bp = hp->tail;
4244 	ftblk_t *nbp;
4245 	ftevnt_t *ep;
4246 	int ix, nix;
4247 
4248 	ASSERT(hp != NULL);
4249 
4250 	for (;;) {
4251 		if ((ix = bp->ix) == FTBLK_EVNTS) {
4252 			/*
4253 			 * Tail doesn't have room, so need a new tail.
4254 			 *
4255 			 * To make this MT safe, first, allocate a new
4256 			 * ftblk, and initialize it.  To make life a
4257 			 * little easier, reserve the first slot (mostly
4258 			 * by making ix = 1).  When we are finished with
4259 			 * the initialization, CAS this pointer to the
4260 			 * tail.  If this succeeds, this is the new
4261 			 * "next" block.  Otherwise, another thread
4262 			 * got here first, so free the block and start
4263 			 * again.
4264 			 */
4265 			nbp = kmem_cache_alloc(ftblk_cache, KM_NOSLEEP);
4266 			if (nbp == NULL) {
4267 				/* no mem, so punt */
4268 				str_ftnever++;
4269 				/* free up all flow data? */
4270 				return;
4271 			}
4272 			nbp->nxt = NULL;
4273 			nbp->ix = 1;
4274 			/*
4275 			 * Just in case there is another thread about
4276 			 * to get the next index, we need to make sure
4277 			 * the value is there for it.
4278 			 */
4279 			membar_producer();
4280 			if (atomic_cas_ptr(&hp->tail, bp, nbp) == bp) {
4281 				/* CAS was successful */
4282 				bp->nxt = nbp;
4283 				membar_producer();
4284 				bp = nbp;
4285 				ix = 0;
4286 				goto cas_good;
4287 			} else {
4288 				kmem_cache_free(ftblk_cache, nbp);
4289 				bp = hp->tail;
4290 				continue;
4291 			}
4292 		}
4293 		nix = ix + 1;
4294 		if (atomic_cas_32((uint32_t *)&bp->ix, ix, nix) == ix) {
4295 		cas_good:
4296 			if (curthread != hp->thread) {
4297 				hp->thread = curthread;
4298 				evnt |= FTEV_CS;
4299 			}
4300 			if (CPU->cpu_seqid != hp->cpu_seqid) {
4301 				hp->cpu_seqid = CPU->cpu_seqid;
4302 				evnt |= FTEV_PS;
4303 			}
4304 			ep = &bp->ev[ix];
4305 			break;
4306 		}
4307 	}
4308 
4309 	if (evnt & FTEV_QMASK) {
4310 		queue_t *qp = p;
4311 
4312 		if (!(qp->q_flag & QREADR))
4313 			evnt |= FTEV_ISWR;
4314 
4315 		ep->mid = Q2NAME(qp);
4316 
4317 		/*
4318 		 * We only record the next queue name for FTEV_PUTNEXT since
4319 		 * that's the only time we *really* need it, and the putnext()
4320 		 * code ensures that qp->q_next won't vanish.  (We could use
4321 		 * claimstr()/releasestr() but at a performance cost.)
4322 		 */
4323 		if ((evnt & FTEV_MASK) == FTEV_PUTNEXT && qp->q_next != NULL)
4324 			ep->midnext = Q2NAME(qp->q_next);
4325 		else
4326 			ep->midnext = NULL;
4327 	} else {
4328 		ep->mid = p;
4329 		ep->midnext = NULL;
4330 	}
4331 
4332 	if (ep->stk != NULL)
4333 		ep->stk->fs_depth = getpcstack(ep->stk->fs_stk, FTSTK_DEPTH);
4334 
4335 	ep->ts = gethrtime();
4336 	ep->evnt = evnt;
4337 	ep->data = data;
4338 	hp->hash = (hp->hash << 9) + hp->hash;
4339 	hp->hash += (evnt << 16) | data;
4340 	hp->hash += (uintptr_t)ep->mid;
4341 }
4342 
4343 /*
4344  * Free flow-trace data.
4345  */
4346 void
4347 str_ftfree(dblk_t *dbp)
4348 {
4349 	fthdr_t *hp = dbp->db_fthdr;
4350 	ftblk_t *bp = &hp->first;
4351 	ftblk_t *nbp;
4352 
4353 	if (bp != hp->tail || bp->ix != 0) {
4354 		/*
4355 		 * Clear out the hash, have the tail point to itself, and free
4356 		 * any continuation blocks.
4357 		 */
4358 		bp = hp->first.nxt;
4359 		hp->tail = &hp->first;
4360 		hp->hash = 0;
4361 		hp->first.nxt = NULL;
4362 		hp->first.ix = 0;
4363 		while (bp != NULL) {
4364 			nbp = bp->nxt;
4365 			kmem_cache_free(ftblk_cache, bp);
4366 			bp = nbp;
4367 		}
4368 	}
4369 	kmem_cache_free(fthdr_cache, hp);
4370 	dbp->db_fthdr = NULL;
4371 }
4372