xref: /illumos-gate/usr/src/uts/common/io/stream.c (revision 3fe455549728ac525df3be56130ad8e075d645d7)
1 /*
2  * CDDL HEADER START
3  *
4  * The contents of this file are subject to the terms of the
5  * Common Development and Distribution License (the "License").
6  * You may not use this file except in compliance with the License.
7  *
8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9  * or http://www.opensolaris.org/os/licensing.
10  * See the License for the specific language governing permissions
11  * and limitations under the License.
12  *
13  * When distributing Covered Code, include this CDDL HEADER in each
14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15  * If applicable, add the following below this CDDL HEADER, with the
16  * fields enclosed by brackets "[]" replaced with your own identifying
17  * information: Portions Copyright [yyyy] [name of copyright owner]
18  *
19  * CDDL HEADER END
20  */
21 /*	Copyright (c) 1984, 1986, 1987, 1988, 1989 AT&T	*/
22 /*	All Rights Reserved	*/
23 
24 /*
25  * Copyright 2009 Sun Microsystems, Inc.  All rights reserved.
26  * Use is subject to license terms.
27  *
28  * Copyright 2021 Tintri by DDN, Inc. All rights reserved.
29  * Copyright 2022 Garrett D'Amore
30  * Copyright 2024 Oxide Computer Company
31  */
32 
33 #include <sys/types.h>
34 #include <sys/param.h>
35 #include <sys/thread.h>
36 #include <sys/sysmacros.h>
37 #include <sys/stropts.h>
38 #include <sys/stream.h>
39 #include <sys/strsubr.h>
40 #include <sys/strsun.h>
41 #include <sys/conf.h>
42 #include <sys/debug.h>
43 #include <sys/cmn_err.h>
44 #include <sys/kmem.h>
45 #include <sys/atomic.h>
46 #include <sys/errno.h>
47 #include <sys/vtrace.h>
48 #include <sys/ftrace.h>
49 #include <sys/ontrap.h>
50 #include <sys/sdt.h>
51 #include <sys/strft.h>
52 
53 #ifdef DEBUG
54 #include <sys/kmem_impl.h>
55 #endif
56 
57 /*
58  * This file contains all the STREAMS utility routines that may
59  * be used by modules and drivers.
60  */
61 
62 /*
63  * STREAMS message allocator: principles of operation
64  *
65  * The streams message allocator consists of all the routines that
66  * allocate, dup and free streams messages: allocb(), [d]esballoc[a],
67  * dupb(), freeb() and freemsg().  What follows is a high-level view
68  * of how the allocator works.
69  *
70  * Every streams message consists of one or more mblks, a dblk, and data.
71  * All mblks for all types of messages come from a common mblk_cache.
72  * The dblk and data come in several flavors, depending on how the
73  * message is allocated:
74  *
75  * (1) Messages up to DBLK_MAX_CACHE size are allocated from a collection of
76  *     fixed-size dblk/data caches. For message sizes that are multiples of
77  *     PAGESIZE, dblks are allocated separately from the buffer.
78  *     The associated buffer is allocated by the constructor using kmem_alloc().
79  *     For all other message sizes, the dblk and its associated data are
80  *     allocated as a single contiguous chunk of memory.
81  *     Objects in these caches consist of a dblk plus its associated data.
82  *     allocb() determines the nearest-size cache by table lookup:
83  *     the dblk_cache[] array provides the mapping from size to dblk cache.
84  *
85  * (2) Large messages (size > DBLK_MAX_CACHE) are constructed by
86  *     kmem_alloc()'ing a buffer for the data and supplying that
87  *     buffer to gesballoc(), described below.
88  *
89  * (3) The four flavors of [d]esballoc[a] are all implemented by a
90  *     common routine, gesballoc() ("generic esballoc").  gesballoc()
91  *     allocates a dblk from the global dblk_esb_cache and sets db_base,
92  *     db_lim and db_frtnp to describe the caller-supplied buffer.
93  *
94  * While there are several routines to allocate messages, there is only
95  * one routine to free messages: freeb().  freeb() simply invokes the
96  * dblk's free method, dbp->db_free(), which is set at allocation time.
97  *
98  * dupb() creates a new reference to a message by allocating a new mblk,
99  * incrementing the dblk reference count and setting the dblk's free
100  * method to dblk_decref().  The dblk's original free method is retained
101  * in db_lastfree.  dblk_decref() decrements the reference count on each
102  * freeb().  If this is not the last reference it just frees the mblk;
103  * if this *is* the last reference, it restores db_free to db_lastfree,
104  * sets db_mblk to the current mblk (see below), and invokes db_lastfree.
105  *
106  * The implementation makes aggressive use of kmem object caching for
107  * maximum performance.  This makes the code simple and compact, but
108  * also a bit abstruse in some places.  The invariants that constitute a
109  * message's constructed state, described below, are more subtle than usual.
110  *
111  * Every dblk has an "attached mblk" as part of its constructed state.
112  * The mblk is allocated by the dblk's constructor and remains attached
113  * until the message is either dup'ed or pulled up.  In the dupb() case
114  * the mblk association doesn't matter until the last free, at which time
115  * dblk_decref() attaches the last mblk to the dblk.  pullupmsg() affects
116  * the mblk association because it swaps the leading mblks of two messages,
117  * so it is responsible for swapping their db_mblk pointers accordingly.
118  * From a constructed-state viewpoint it doesn't matter that a dblk's
119  * attached mblk can change while the message is allocated; all that
120  * matters is that the dblk has *some* attached mblk when it's freed.
121  *
122  * The sizes of the allocb() small-message caches are not magical.
123  * They represent a good trade-off between internal and external
124  * fragmentation for current workloads.  They should be reevaluated
125  * periodically, especially if allocations larger than DBLK_MAX_CACHE
126  * become common.  We use 64-byte alignment so that dblks don't
127  * straddle cache lines unnecessarily.
128  */
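
/*
 * Editorial usage sketch (not part of this file): the lifecycle the
 * comment above describes, as seen by a caller.  The error handling is
 * illustrative only.
 *
 *	mblk_t *mp, *dup;
 *
 *	if ((mp = allocb(128, BPRI_MED)) == NULL)
 *		return (ENOMEM);		// dblk_cache[] path
 *	mp->b_wptr += 128;			// claim the data
 *	if ((dup = dupb(mp)) == NULL) {		// db_ref -> 2,
 *		freemsg(mp);			// db_free = dblk_decref
 *		return (ENOMEM);
 *	}
 *	freeb(dup);	// not the last ref: only the extra mblk is freed
 *	freeb(mp);	// last ref: db_lastfree returns the dblk to its cache
 */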
129 #define	DBLK_MAX_CACHE		73728
130 #define	DBLK_CACHE_ALIGN	64
131 #define	DBLK_MIN_SIZE		8
132 #define	DBLK_SIZE_SHIFT		3
133 
134 #ifdef _BIG_ENDIAN
135 #define	DBLK_RTFU_SHIFT(field)	\
136 	(8 * (&((dblk_t *)0)->db_struioflag - &((dblk_t *)0)->field))
137 #else
138 #define	DBLK_RTFU_SHIFT(field)	\
139 	(8 * (&((dblk_t *)0)->field - &((dblk_t *)0)->db_ref))
140 #endif
141 
142 #define	DBLK_RTFU(ref, type, flags, uioflag)	\
143 	(((ref) << DBLK_RTFU_SHIFT(db_ref)) | \
144 	((type) << DBLK_RTFU_SHIFT(db_type)) | \
145 	(((flags) | (ref - 1)) << DBLK_RTFU_SHIFT(db_flags)) | \
146 	((uioflag) << DBLK_RTFU_SHIFT(db_struioflag)))
147 #define	DBLK_RTFU_REF_MASK	(DBLK_REFMAX << DBLK_RTFU_SHIFT(db_ref))
148 #define	DBLK_RTFU_WORD(dbp)	(*((uint32_t *)&(dbp)->db_ref))
149 #define	MBLK_BAND_FLAG_WORD(mp)	(*((uint32_t *)&(mp)->b_band))
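
/*
 * Editorial note: because db_ref, db_type, db_flags and db_struioflag
 * are adjacent byte-sized fields, the allocation paths below can
 * initialize all four with a single 32-bit store.  For example,
 *
 *	DBLK_RTFU_WORD(dbp) = DBLK_RTFU(1, M_DATA, 0, 0);
 *
 * is equivalent to:
 *
 *	dbp->db_ref = 1;
 *	dbp->db_type = M_DATA;
 *	dbp->db_flags = 0;	// (ref - 1) == 0: DBLK_REFMIN stays clear
 *	dbp->db_struioflag = 0;
 *
 * With ref == 2 (the esballoca/desballoca case), (ref - 1) sets
 * DBLK_REFMIN in db_flags; see the gesballoc comment further down.
 */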
150 
151 static size_t dblk_sizes[] = {
152 #ifdef _LP64
153 	16, 80, 144, 208, 272, 336, 528, 1040, 1488, 1936, 2576, 3856,
154 	8192, 12048, 16384, 20240, 24576, 28432, 32768, 36624,
155 	40960, 44816, 49152, 53008, 57344, 61200, 65536, 69392,
156 #else
157 	64, 128, 320, 576, 1088, 1536, 1984, 2624, 3904,
158 	8192, 12096, 16384, 20288, 24576, 28480, 32768, 36672,
159 	40960, 44864, 49152, 53056, 57344, 61248, 65536, 69440,
160 #endif
161 	DBLK_MAX_CACHE, 0
162 };
163 
164 static struct kmem_cache *dblk_cache[DBLK_MAX_CACHE / DBLK_MIN_SIZE];
165 static struct kmem_cache *mblk_cache;
166 static struct kmem_cache *dblk_esb_cache;
167 static struct kmem_cache *fthdr_cache;
168 static struct kmem_cache *ftblk_cache;
169 
170 static void dblk_lastfree(mblk_t *mp, dblk_t *dbp);
171 static mblk_t *allocb_oversize(size_t size, int flags);
172 static int allocb_tryhard_fails;
173 static void frnop_func(void *arg);
174 frtn_t frnop = { frnop_func };
175 static void bcache_dblk_lastfree(mblk_t *mp, dblk_t *dbp);
176 
177 static boolean_t rwnext_enter(queue_t *qp);
178 static void rwnext_exit(queue_t *qp);
179 
180 /*
181  * Patchable mblk/dblk kmem_cache flags.
182  */
183 int dblk_kmem_flags = 0;
184 int mblk_kmem_flags = 0;
185 
186 static int
187 dblk_constructor(void *buf, void *cdrarg, int kmflags)
188 {
189 	dblk_t *dbp = buf;
190 	ssize_t msg_size = (ssize_t)cdrarg;
191 	size_t index;
192 
193 	ASSERT(msg_size != 0);
194 
195 	index = (msg_size - 1) >> DBLK_SIZE_SHIFT;
196 
197 	ASSERT(index < (DBLK_MAX_CACHE >> DBLK_SIZE_SHIFT));
198 
199 	if ((dbp->db_mblk = kmem_cache_alloc(mblk_cache, kmflags)) == NULL)
200 		return (-1);
201 	if ((msg_size & PAGEOFFSET) == 0) {
202 		dbp->db_base = kmem_alloc(msg_size, kmflags);
203 		if (dbp->db_base == NULL) {
204 			kmem_cache_free(mblk_cache, dbp->db_mblk);
205 			return (-1);
206 		}
207 	} else {
208 		dbp->db_base = (unsigned char *)&dbp[1];
209 	}
210 
211 	dbp->db_mblk->b_datap = dbp;
212 	dbp->db_cache = dblk_cache[index];
213 	dbp->db_lim = dbp->db_base + msg_size;
214 	dbp->db_free = dbp->db_lastfree = dblk_lastfree;
215 	dbp->db_frtnp = NULL;
216 	dbp->db_fthdr = NULL;
217 	dbp->db_credp = NULL;
218 	dbp->db_cpid = -1;
219 	dbp->db_struioflag = 0;
220 	dbp->db_struioun.cksum.flags = 0;
221 	return (0);
222 }
223 
224 /*ARGSUSED*/
225 static int
226 dblk_esb_constructor(void *buf, void *cdrarg, int kmflags)
227 {
228 	dblk_t *dbp = buf;
229 
230 	if ((dbp->db_mblk = kmem_cache_alloc(mblk_cache, kmflags)) == NULL)
231 		return (-1);
232 	dbp->db_mblk->b_datap = dbp;
233 	dbp->db_cache = dblk_esb_cache;
234 	dbp->db_fthdr = NULL;
235 	dbp->db_credp = NULL;
236 	dbp->db_cpid = -1;
237 	dbp->db_struioflag = 0;
238 	dbp->db_struioun.cksum.flags = 0;
239 	return (0);
240 }
241 
242 static int
243 bcache_dblk_constructor(void *buf, void *cdrarg, int kmflags)
244 {
245 	dblk_t *dbp = buf;
246 	bcache_t *bcp = cdrarg;
247 
248 	if ((dbp->db_mblk = kmem_cache_alloc(mblk_cache, kmflags)) == NULL)
249 		return (-1);
250 
251 	dbp->db_base = kmem_cache_alloc(bcp->buffer_cache, kmflags);
252 	if (dbp->db_base == NULL) {
253 		kmem_cache_free(mblk_cache, dbp->db_mblk);
254 		return (-1);
255 	}
256 
257 	dbp->db_mblk->b_datap = dbp;
258 	dbp->db_cache = (void *)bcp;
259 	dbp->db_lim = dbp->db_base + bcp->size;
260 	dbp->db_free = dbp->db_lastfree = bcache_dblk_lastfree;
261 	dbp->db_frtnp = NULL;
262 	dbp->db_fthdr = NULL;
263 	dbp->db_credp = NULL;
264 	dbp->db_cpid = -1;
265 	dbp->db_struioflag = 0;
266 	dbp->db_struioun.cksum.flags = 0;
267 	return (0);
268 }
269 
270 /*ARGSUSED*/
271 static void
272 dblk_destructor(void *buf, void *cdrarg)
273 {
274 	dblk_t *dbp = buf;
275 	ssize_t msg_size = (ssize_t)cdrarg;
276 
277 	ASSERT(dbp->db_mblk->b_datap == dbp);
278 	ASSERT(msg_size != 0);
279 	ASSERT(dbp->db_struioflag == 0);
280 	ASSERT(dbp->db_struioun.cksum.flags == 0);
281 
282 	if ((msg_size & PAGEOFFSET) == 0) {
283 		kmem_free(dbp->db_base, msg_size);
284 	}
285 
286 	kmem_cache_free(mblk_cache, dbp->db_mblk);
287 }
288 
289 static void
290 bcache_dblk_destructor(void *buf, void *cdrarg)
291 {
292 	dblk_t *dbp = buf;
293 	bcache_t *bcp = cdrarg;
294 
295 	kmem_cache_free(bcp->buffer_cache, dbp->db_base);
296 
297 	ASSERT(dbp->db_mblk->b_datap == dbp);
298 	ASSERT(dbp->db_struioflag == 0);
299 	ASSERT(dbp->db_struioun.cksum.flags == 0);
300 
301 	kmem_cache_free(mblk_cache, dbp->db_mblk);
302 }
303 
304 /* ARGSUSED */
305 static int
306 ftblk_constructor(void *buf, void *cdrarg, int kmflags)
307 {
308 	ftblk_t *fbp = buf;
309 	int i;
310 
311 	bzero(fbp, sizeof (ftblk_t));
312 	if (str_ftstack != 0) {
313 		for (i = 0; i < FTBLK_EVNTS; i++)
314 			fbp->ev[i].stk = kmem_alloc(sizeof (ftstk_t), kmflags);
315 	}
316 
317 	return (0);
318 }
319 
320 /* ARGSUSED */
321 static void
322 ftblk_destructor(void *buf, void *cdrarg)
323 {
324 	ftblk_t *fbp = buf;
325 	int i;
326 
327 	if (str_ftstack != 0) {
328 		for (i = 0; i < FTBLK_EVNTS; i++) {
329 			if (fbp->ev[i].stk != NULL) {
330 				kmem_free(fbp->ev[i].stk, sizeof (ftstk_t));
331 				fbp->ev[i].stk = NULL;
332 			}
333 		}
334 	}
335 }
336 
337 static int
338 fthdr_constructor(void *buf, void *cdrarg, int kmflags)
339 {
340 	fthdr_t *fhp = buf;
341 
342 	return (ftblk_constructor(&fhp->first, cdrarg, kmflags));
343 }
344 
345 static void
346 fthdr_destructor(void *buf, void *cdrarg)
347 {
348 	fthdr_t *fhp = buf;
349 
350 	ftblk_destructor(&fhp->first, cdrarg);
351 }
352 
353 void
354 streams_msg_init(void)
355 {
356 	char name[40];
357 	size_t size;
358 	size_t lastsize = DBLK_MIN_SIZE;
359 	size_t *sizep;
360 	struct kmem_cache *cp;
361 	size_t tot_size;
362 	int offset;
363 
364 	mblk_cache = kmem_cache_create("streams_mblk", sizeof (mblk_t), 32,
365 	    NULL, NULL, NULL, NULL, NULL, mblk_kmem_flags);
366 
367 	for (sizep = dblk_sizes; (size = *sizep) != 0; sizep++) {
368 
369 		if ((offset = (size & PAGEOFFSET)) != 0) {
370 			/*
371 			 * The size ends in the middle of a page, so the
372 			 * dblk should be allocated on the same page.
373 			 */
374 			tot_size = size + sizeof (dblk_t);
375 			ASSERT((offset + sizeof (dblk_t) + sizeof (kmem_slab_t))
376 			    < PAGESIZE);
377 			ASSERT((tot_size & (DBLK_CACHE_ALIGN - 1)) == 0);
378 
379 		} else {
380 
381 			/*
382 			 * The buffer size is a multiple of the page size,
383 			 * so the dblk and buffer are allocated separately.
384 			 */
385 
386 			ASSERT((size & (DBLK_CACHE_ALIGN - 1)) == 0);
387 			tot_size = sizeof (dblk_t);
388 		}
389 
390 		(void) sprintf(name, "streams_dblk_%ld", size);
391 		cp = kmem_cache_create(name, tot_size, DBLK_CACHE_ALIGN,
392 		    dblk_constructor, dblk_destructor, NULL, (void *)(size),
393 		    NULL, dblk_kmem_flags);
394 
395 		while (lastsize <= size) {
396 			dblk_cache[(lastsize - 1) >> DBLK_SIZE_SHIFT] = cp;
397 			lastsize += DBLK_MIN_SIZE;
398 		}
399 	}
400 
401 	dblk_esb_cache = kmem_cache_create("streams_dblk_esb", sizeof (dblk_t),
402 	    DBLK_CACHE_ALIGN, dblk_esb_constructor, dblk_destructor, NULL,
403 	    (void *)sizeof (dblk_t), NULL, dblk_kmem_flags);
404 	fthdr_cache = kmem_cache_create("streams_fthdr", sizeof (fthdr_t), 32,
405 	    fthdr_constructor, fthdr_destructor, NULL, NULL, NULL, 0);
406 	ftblk_cache = kmem_cache_create("streams_ftblk", sizeof (ftblk_t), 32,
407 	    ftblk_constructor, ftblk_destructor, NULL, NULL, NULL, 0);
408 
409 	/* initialize throttling queue for esballoc */
410 	esballoc_queue_init();
411 }
412 
413 /*ARGSUSED*/
414 mblk_t *
415 allocb(size_t size, uint_t pri)
416 {
417 	dblk_t *dbp;
418 	mblk_t *mp;
419 	size_t index;
420 
421 	index = (size - 1) >> DBLK_SIZE_SHIFT;
422 
423 	if (index >= (DBLK_MAX_CACHE >> DBLK_SIZE_SHIFT)) {
424 		if (size != 0) {
425 			mp = allocb_oversize(size, KM_NOSLEEP);
426 			goto out;
427 		}
428 		index = 0;
429 	}
430 
431 	if ((dbp = kmem_cache_alloc(dblk_cache[index], KM_NOSLEEP)) == NULL) {
432 		mp = NULL;
433 		goto out;
434 	}
435 
436 	mp = dbp->db_mblk;
437 	DBLK_RTFU_WORD(dbp) = DBLK_RTFU(1, M_DATA, 0, 0);
438 	mp->b_next = mp->b_prev = mp->b_cont = NULL;
439 	mp->b_rptr = mp->b_wptr = dbp->db_base;
440 	mp->b_queue = NULL;
441 	MBLK_BAND_FLAG_WORD(mp) = 0;
442 	STR_FTALLOC(&dbp->db_fthdr, FTEV_ALLOCB, size);
443 out:
444 	FTRACE_1("allocb(): mp=0x%p", (uintptr_t)mp);
445 
446 	return (mp);
447 }
448 
449 /*
450  * Allocate an mblk taking db_credp and db_cpid from the template.
451  * Allow the cred to be NULL.
452  */
453 mblk_t *
454 allocb_tmpl(size_t size, const mblk_t *tmpl)
455 {
456 	mblk_t *mp = allocb(size, 0);
457 
458 	if (mp != NULL) {
459 		dblk_t *src = tmpl->b_datap;
460 		dblk_t *dst = mp->b_datap;
461 		cred_t *cr;
462 		pid_t cpid;
463 
464 		cr = msg_getcred(tmpl, &cpid);
465 		if (cr != NULL)
466 			crhold(dst->db_credp = cr);
467 		dst->db_cpid = cpid;
468 		dst->db_type = src->db_type;
469 	}
470 	return (mp);
471 }
472 
473 mblk_t *
474 allocb_cred(size_t size, cred_t *cr, pid_t cpid)
475 {
476 	mblk_t *mp = allocb(size, 0);
477 
478 	ASSERT(cr != NULL);
479 	if (mp != NULL) {
480 		dblk_t *dbp = mp->b_datap;
481 
482 		crhold(dbp->db_credp = cr);
483 		dbp->db_cpid = cpid;
484 	}
485 	return (mp);
486 }
487 
488 mblk_t *
489 allocb_cred_wait(size_t size, uint_t flags, int *error, cred_t *cr, pid_t cpid)
490 {
491 	mblk_t *mp = allocb_wait(size, 0, flags, error);
492 
493 	ASSERT(cr != NULL);
494 	if (mp != NULL) {
495 		dblk_t *dbp = mp->b_datap;
496 
497 		crhold(dbp->db_credp = cr);
498 		dbp->db_cpid = cpid;
499 	}
500 
501 	return (mp);
502 }
503 
504 /*
505  * Extract the db_cred (and optionally db_cpid) from a message.
506  * We find the first mblk which has a non-NULL db_cred and use that.
507  * If none found we return NULL.
508  * Does NOT get a hold on the cred.
509  */
510 cred_t *
511 msg_getcred(const mblk_t *mp, pid_t *cpidp)
512 {
513 	cred_t *cr = NULL;
514 	cred_t *cr2;
515 	mblk_t *mp2;
516 
517 	while (mp != NULL) {
518 		dblk_t *dbp = mp->b_datap;
519 
520 		cr = dbp->db_credp;
521 		if (cr == NULL) {
522 			mp = mp->b_cont;
523 			continue;
524 		}
525 		if (cpidp != NULL)
526 			*cpidp = dbp->db_cpid;
527 
528 #ifdef DEBUG
529 		/*
530 		 * Normally there should be at most one db_credp in a message.
531 		 * But if there are multiple (as in the case of some M_IOC*
532 		 * and some internal messages in TCP/IP bind logic) then
533 		 * they must be identical in the normal case.
534 		 * However, a socket can be shared between different uids
535 		 * in which case data queued in TCP would be from different
536 		 * creds. Thus we can only assert for the zoneid being the
537 		 * same. Due to Multi-level Ports for TX, some
538 		 * cred_t can have a NULL cr_zone, and we skip the comparison
539 		 * in that case.
540 		 */
541 		mp2 = mp->b_cont;
542 		while (mp2 != NULL) {
543 			cr2 = DB_CRED(mp2);
544 			if (cr2 != NULL) {
545 				DTRACE_PROBE2(msg__getcred,
546 				    cred_t *, cr, cred_t *, cr2);
547 				ASSERT(crgetzoneid(cr) == crgetzoneid(cr2) ||
548 				    crgetzone(cr) == NULL ||
549 				    crgetzone(cr2) == NULL);
550 			}
551 			mp2 = mp2->b_cont;
552 		}
553 #endif
554 		return (cr);
555 	}
556 	if (cpidp != NULL)
557 		*cpidp = NOPID;
558 	return (NULL);
559 }
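
/*
 * Editorial usage sketch: since msg_getcred() does not take a hold, a
 * caller that keeps the cred beyond the life of the message must take
 * its own reference:
 *
 *	pid_t cpid;
 *	cred_t *cr;
 *
 *	if ((cr = msg_getcred(mp, &cpid)) != NULL)
 *		crhold(cr);		// caller now owns a reference
 *	freemsg(mp);			// the message may go away here...
 *	// ... use cr ...
 *	if (cr != NULL)
 *		crfree(cr);		// ... but cr stays valid until here
 */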
560 
561 /*
562  * Variant of msg_getcred which, when a cred is found
563  * 1. Returns with a hold on the cred
564  * 2. Clears the first cred in the mblk.
565  * This is more efficient to use than a msg_getcred() + crhold() when
566  * the message is freed after the cred has been extracted.
567  *
568  * The caller is responsible for ensuring that there is no other reference
569  * on the message since db_credp can not be cleared when there are other
570  * references.
571  */
572 cred_t *
573 msg_extractcred(mblk_t *mp, pid_t *cpidp)
574 {
575 	cred_t *cr = NULL;
576 	cred_t *cr2;
577 	mblk_t *mp2;
578 
579 	while (mp != NULL) {
580 		dblk_t *dbp = mp->b_datap;
581 
582 		cr = dbp->db_credp;
583 		if (cr == NULL) {
584 			mp = mp->b_cont;
585 			continue;
586 		}
587 		ASSERT(dbp->db_ref == 1);
588 		dbp->db_credp = NULL;
589 		if (cpidp != NULL)
590 			*cpidp = dbp->db_cpid;
591 #ifdef DEBUG
592 		/*
593 		 * Normally there should be at most one db_credp in a message.
594 		 * But if there are multiple (as in the case of some M_IOC*
595 		 * and some internal messages in TCP/IP bind logic) then
596 		 * they must be identical in the normal case.
597 		 * However, a socket can be shared between different uids
598 		 * in which case data queued in TCP would be from different
599 		 * creds. Thus we can only assert for the zoneid being the
600 		 * same. Due to Multi-level Ports for TX, some
601 		 * cred_t can have a NULL cr_zone, and we skip the comparison
602 		 * in that case.
603 		 */
604 		mp2 = mp->b_cont;
605 		while (mp2 != NULL) {
606 			cr2 = DB_CRED(mp2);
607 			if (cr2 != NULL) {
608 				DTRACE_PROBE2(msg__extractcred,
609 				    cred_t *, cr, cred_t *, cr2);
610 				ASSERT(crgetzoneid(cr) == crgetzoneid(cr2) ||
611 				    crgetzone(cr) == NULL ||
612 				    crgetzone(cr2) == NULL);
613 			}
614 			mp2 = mp2->b_cont;
615 		}
616 #endif
617 		return (cr);
618 	}
619 	return (NULL);
620 }
621 /*
622  * Get the label for a message. Uses the first mblk in the message
623  * which has a non-NULL db_credp.
624  * Returns NULL if there is no credp.
625  */
626 extern struct ts_label_s *
627 msg_getlabel(const mblk_t *mp)
628 {
629 	cred_t *cr = msg_getcred(mp, NULL);
630 
631 	if (cr == NULL)
632 		return (NULL);
633 
634 	return (crgetlabel(cr));
635 }
636 
637 void
638 freeb(mblk_t *mp)
639 {
640 	dblk_t *dbp = mp->b_datap;
641 
642 	ASSERT(dbp->db_ref > 0);
643 	ASSERT(mp->b_next == NULL && mp->b_prev == NULL);
644 	FTRACE_1("freeb(): mp=0x%lx", (uintptr_t)mp);
645 
646 	STR_FTEVENT_MBLK(mp, caller(), FTEV_FREEB, dbp->db_ref);
647 
648 	dbp->db_free(mp, dbp);
649 }
650 
651 void
652 freemsg(mblk_t *mp)
653 {
654 	FTRACE_1("freemsg(): mp=0x%lx", (uintptr_t)mp);
655 	while (mp) {
656 		dblk_t *dbp = mp->b_datap;
657 		mblk_t *mp_cont = mp->b_cont;
658 
659 		ASSERT(dbp->db_ref > 0);
660 		ASSERT(mp->b_next == NULL && mp->b_prev == NULL);
661 
662 		STR_FTEVENT_MBLK(mp, caller(), FTEV_FREEB, dbp->db_ref);
663 
664 		dbp->db_free(mp, dbp);
665 		mp = mp_cont;
666 	}
667 }
668 
669 /*
670  * Reallocate a block for another use.  Try hard to use the old block.
671  * If the old data is wanted (copy), leave b_wptr at the end of the data,
672  * otherwise return b_wptr = b_rptr.
673  *
674  * This routine is private and unstable.
675  */
676 mblk_t	*
677 reallocb(mblk_t *mp, size_t size, uint_t copy)
678 {
679 	mblk_t		*mp1;
680 	unsigned char	*old_rptr;
681 	ptrdiff_t	cur_size;
682 
683 	if (mp == NULL)
684 		return (allocb(size, BPRI_HI));
685 
686 	cur_size = mp->b_wptr - mp->b_rptr;
687 	old_rptr = mp->b_rptr;
688 
689 	ASSERT(mp->b_datap->db_ref != 0);
690 
691 	if (mp->b_datap->db_ref == 1 && MBLKSIZE(mp) >= size) {
692 		/*
693 		 * If the data is wanted and it will fit where it is, no
694 		 * work is required.
695 		 */
696 		if (copy && mp->b_datap->db_lim - mp->b_rptr >= size)
697 			return (mp);
698 
699 		mp->b_wptr = mp->b_rptr = mp->b_datap->db_base;
700 		mp1 = mp;
701 	} else if ((mp1 = allocb_tmpl(size, mp)) != NULL) {
702 		/* XXX other mp state could be copied too, db_flags ... ? */
703 		mp1->b_cont = mp->b_cont;
704 	} else {
705 		return (NULL);
706 	}
707 
708 	if (copy) {
709 		bcopy(old_rptr, mp1->b_rptr, cur_size);
710 		mp1->b_wptr = mp1->b_rptr + cur_size;
711 	}
712 
713 	if (mp != mp1)
714 		freeb(mp);
715 
716 	return (mp1);
717 }
718 
719 static void
720 dblk_lastfree(mblk_t *mp, dblk_t *dbp)
721 {
722 	ASSERT(dbp->db_mblk == mp);
723 	if (dbp->db_fthdr != NULL)
724 		str_ftfree(dbp);
725 
726 	/* set credp and cpid to be 'unspecified' before returning to cache */
727 	if (dbp->db_credp != NULL) {
728 		crfree(dbp->db_credp);
729 		dbp->db_credp = NULL;
730 	}
731 	dbp->db_cpid = -1;
732 
733 	/* Reset the struioflag and the checksum flag fields */
734 	dbp->db_struioflag = 0;
735 	dbp->db_struioun.cksum.flags = 0;
736 
737 	/* and the COOKED and/or UIOA flag(s) */
738 	dbp->db_flags &= ~(DBLK_COOKED | DBLK_UIOA);
739 
740 	kmem_cache_free(dbp->db_cache, dbp);
741 }
742 
743 static void
744 dblk_decref(mblk_t *mp, dblk_t *dbp)
745 {
746 	if (dbp->db_ref != 1) {
747 		uint32_t rtfu = atomic_add_32_nv(&DBLK_RTFU_WORD(dbp),
748 		    -(1 << DBLK_RTFU_SHIFT(db_ref)));
749 		/*
750 		 * atomic_add_32_nv() just decremented db_ref, so we no longer
751 		 * have a reference to the dblk, which means another thread
752 		 * could free it.  Therefore we cannot examine the dblk to
753 		 * determine whether ours was the last reference.  Instead,
754 		 * we extract the new and minimum reference counts from rtfu.
755 		 * Note that all we're really saying is "if (ref != refmin)".
756 		 */
757 		if (((rtfu >> DBLK_RTFU_SHIFT(db_ref)) & DBLK_REFMAX) !=
758 		    ((rtfu >> DBLK_RTFU_SHIFT(db_flags)) & DBLK_REFMIN)) {
759 			kmem_cache_free(mblk_cache, mp);
760 			return;
761 		}
762 	}
763 	dbp->db_mblk = mp;
764 	dbp->db_free = dbp->db_lastfree;
765 	dbp->db_lastfree(mp, dbp);
766 }
767 
768 mblk_t *
769 dupb(mblk_t *mp)
770 {
771 	dblk_t *dbp = mp->b_datap;
772 	mblk_t *new_mp;
773 	uint32_t oldrtfu, newrtfu;
774 
775 	if ((new_mp = kmem_cache_alloc(mblk_cache, KM_NOSLEEP)) == NULL)
776 		goto out;
777 
778 	new_mp->b_next = new_mp->b_prev = new_mp->b_cont = NULL;
779 	new_mp->b_rptr = mp->b_rptr;
780 	new_mp->b_wptr = mp->b_wptr;
781 	new_mp->b_datap = dbp;
782 	new_mp->b_queue = NULL;
783 	MBLK_BAND_FLAG_WORD(new_mp) = MBLK_BAND_FLAG_WORD(mp);
784 
785 	STR_FTEVENT_MBLK(mp, caller(), FTEV_DUPB, dbp->db_ref);
786 
787 	dbp->db_free = dblk_decref;
788 	do {
789 		ASSERT(dbp->db_ref > 0);
790 		oldrtfu = DBLK_RTFU_WORD(dbp);
791 		newrtfu = oldrtfu + (1 << DBLK_RTFU_SHIFT(db_ref));
792 		/*
793 		 * If db_ref is maxed out we can't dup this message anymore.
794 		 */
795 		if ((oldrtfu & DBLK_RTFU_REF_MASK) == DBLK_RTFU_REF_MASK) {
796 			kmem_cache_free(mblk_cache, new_mp);
797 			new_mp = NULL;
798 			goto out;
799 		}
800 	} while (atomic_cas_32(&DBLK_RTFU_WORD(dbp), oldrtfu, newrtfu) !=
801 	    oldrtfu);
802 
803 out:
804 	FTRACE_1("dupb(): new_mp=0x%lx", (uintptr_t)new_mp);
805 	return (new_mp);
806 }
807 
808 static void
809 dblk_lastfree_desb(mblk_t *mp, dblk_t *dbp)
810 {
811 	frtn_t *frp = dbp->db_frtnp;
812 
813 	ASSERT(dbp->db_mblk == mp);
814 	frp->free_func(frp->free_arg);
815 	if (dbp->db_fthdr != NULL)
816 		str_ftfree(dbp);
817 
818 	/* set credp and cpid to be 'unspecified' before returning to cache */
819 	if (dbp->db_credp != NULL) {
820 		crfree(dbp->db_credp);
821 		dbp->db_credp = NULL;
822 	}
823 	dbp->db_cpid = -1;
824 	dbp->db_struioflag = 0;
825 	dbp->db_struioun.cksum.flags = 0;
826 
827 	kmem_cache_free(dbp->db_cache, dbp);
828 }
829 
830 /*ARGSUSED*/
831 static void
832 frnop_func(void *arg)
833 {
834 }
835 
836 /*
837  * Generic esballoc used to implement the four flavors: [d]esballoc[a].
838  *
839  * The variants with a 'd' prefix (desballoc, desballoca)
840  *	directly free the mblk when it loses its last ref,
841  *	whereas the other variants free asynchronously.
842  * The variants with an 'a' suffix (esballoca, desballoca)
843  *	add an extra ref, effectively letting the streams subsystem
844  *	know that the message data should not be modified.
845  *	(e.g., see the db_ref checks in reallocb() and elsewhere).
846  *
847  * The method used by the 'a' suffix functions to keep the dblk
848  * db_ref > 1 is non-obvious.  The macro DBLK_RTFU(2,...) passed to
849  * gesballoc sets the initial db_ref = 2 and sets the DBLK_REFMIN
850  * bit in db_flags.  In dblk_decref() that flag essentially means
851  * the dblk has one extra ref, so the "last ref" is one, not zero.
852  */
853 static mblk_t *
854 gesballoc(unsigned char *base, size_t size, uint32_t db_rtfu, frtn_t *frp,
855     void (*lastfree)(mblk_t *, dblk_t *), int kmflags)
856 {
857 	dblk_t *dbp;
858 	mblk_t *mp;
859 
860 	ASSERT(base != NULL && frp != NULL);
861 
862 	if ((dbp = kmem_cache_alloc(dblk_esb_cache, kmflags)) == NULL) {
863 		mp = NULL;
864 		goto out;
865 	}
866 
867 	mp = dbp->db_mblk;
868 	dbp->db_base = base;
869 	dbp->db_lim = base + size;
870 	dbp->db_free = dbp->db_lastfree = lastfree;
871 	dbp->db_frtnp = frp;
872 	DBLK_RTFU_WORD(dbp) = db_rtfu;
873 	mp->b_next = mp->b_prev = mp->b_cont = NULL;
874 	mp->b_rptr = mp->b_wptr = base;
875 	mp->b_queue = NULL;
876 	MBLK_BAND_FLAG_WORD(mp) = 0;
877 
878 out:
879 	FTRACE_1("gesballoc(): mp=0x%lx", (uintptr_t)mp);
880 	return (mp);
881 }
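
/*
 * Editorial usage sketch (the xx_ names are hypothetical): a driver
 * that loans out its own buffer typically pairs [d]esballoc() with a
 * free routine that recycles the buffer once the last reference is
 * dropped:
 *
 *	static void
 *	xx_buf_recycle(void *arg)
 *	{
 *		xx_buf_t *xb = arg;
 *		// return xb to the driver's free list, or kmem_free() it
 *	}
 *
 *	xb->xb_frtn.free_func = xx_buf_recycle;
 *	xb->xb_frtn.free_arg = (caddr_t)xb;
 *	mp = desballoc(xb->xb_kaddr, xb->xb_len, BPRI_MED, &xb->xb_frtn);
 *
 * With the 'd' variants the free routine runs synchronously from the
 * last freeb() (dblk_lastfree_desb below), so it must not block; the
 * non-'d' variants defer it to the esballoc queue via freebs_enqueue.
 */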
882 
883 /*ARGSUSED*/
884 mblk_t *
885 esballoc(unsigned char *base, size_t size, uint_t pri, frtn_t *frp)
886 {
887 	mblk_t *mp;
888 
889 	/*
890 	 * Note that this is structured to allow the common case (i.e.
891 	 * STREAMS flowtracing disabled) to call gesballoc() with tail
892 	 * call optimization.
893 	 */
894 	if (!str_ftnever) {
895 		mp = gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
896 		    frp, freebs_enqueue, KM_NOSLEEP);
897 
898 		if (mp != NULL)
899 			STR_FTALLOC(&DB_FTHDR(mp), FTEV_ESBALLOC, size);
900 		return (mp);
901 	}
902 
903 	return (gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
904 	    frp, freebs_enqueue, KM_NOSLEEP));
905 }
906 
907 /*
908  * Same as esballoc() but sleeps waiting for memory.
909  */
910 /*ARGSUSED*/
911 mblk_t *
912 esballoc_wait(unsigned char *base, size_t size, uint_t pri, frtn_t *frp)
913 {
914 	mblk_t *mp;
915 
916 	/*
917 	 * Note that this is structured to allow the common case (i.e.
918 	 * STREAMS flowtracing disabled) to call gesballoc() with tail
919 	 * call optimization.
920 	 */
921 	if (!str_ftnever) {
922 		mp = gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
923 		    frp, freebs_enqueue, KM_SLEEP);
924 
925 		STR_FTALLOC(&DB_FTHDR(mp), FTEV_ESBALLOC, size);
926 		return (mp);
927 	}
928 
929 	return (gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
930 	    frp, freebs_enqueue, KM_SLEEP));
931 }
932 
933 /*ARGSUSED*/
934 mblk_t *
935 desballoc(unsigned char *base, size_t size, uint_t pri, frtn_t *frp)
936 {
937 	mblk_t *mp;
938 
939 	/*
940 	 * Note that this is structured to allow the common case (i.e.
941 	 * STREAMS flowtracing disabled) to call gesballoc() with tail
942 	 * call optimization.
943 	 */
944 	if (!str_ftnever) {
945 		mp = gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
946 		    frp, dblk_lastfree_desb, KM_NOSLEEP);
947 
948 		if (mp != NULL)
949 			STR_FTALLOC(&DB_FTHDR(mp), FTEV_DESBALLOC, size);
950 		return (mp);
951 	}
952 
953 	return (gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
954 	    frp, dblk_lastfree_desb, KM_NOSLEEP));
955 }
956 
957 /*ARGSUSED*/
958 mblk_t *
959 esballoca(unsigned char *base, size_t size, uint_t pri, frtn_t *frp)
960 {
961 	mblk_t *mp;
962 
963 	/*
964 	 * Note that this is structured to allow the common case (i.e.
965 	 * STREAMS flowtracing disabled) to call gesballoc() with tail
966 	 * call optimization.
967 	 */
968 	if (!str_ftnever) {
969 		mp = gesballoc(base, size, DBLK_RTFU(2, M_DATA, 0, 0),
970 		    frp, freebs_enqueue, KM_NOSLEEP);
971 
972 		if (mp != NULL)
973 			STR_FTALLOC(&DB_FTHDR(mp), FTEV_ESBALLOCA, size);
974 		return (mp);
975 	}
976 
977 	return (gesballoc(base, size, DBLK_RTFU(2, M_DATA, 0, 0),
978 	    frp, freebs_enqueue, KM_NOSLEEP));
979 }
980 
981 /*
982  * Same as esballoca() but sleeps waiting for memory.
983  */
984 mblk_t *
985 esballoca_wait(unsigned char *base, size_t size, uint_t pri, frtn_t *frp)
986 {
987 	mblk_t *mp;
988 
989 	/*
990 	 * Note that this is structured to allow the common case (i.e.
991 	 * STREAMS flowtracing disabled) to call gesballoc() with tail
992 	 * call optimization.
993 	 */
994 	if (!str_ftnever) {
995 		mp = gesballoc(base, size, DBLK_RTFU(2, M_DATA, 0, 0),
996 		    frp, freebs_enqueue, KM_SLEEP);
997 
998 		STR_FTALLOC(&DB_FTHDR(mp), FTEV_ESBALLOCA, size);
999 		return (mp);
1000 	}
1001 
1002 	return (gesballoc(base, size, DBLK_RTFU(2, M_DATA, 0, 0),
1003 	    frp, freebs_enqueue, KM_SLEEP));
1004 }
1005 
1006 /*ARGSUSED*/
1007 mblk_t *
1008 desballoca(unsigned char *base, size_t size, uint_t pri, frtn_t *frp)
1009 {
1010 	mblk_t *mp;
1011 
1012 	/*
1013 	 * Note that this is structured to allow the common case (i.e.
1014 	 * STREAMS flowtracing disabled) to call gesballoc() with tail
1015 	 * call optimization.
1016 	 */
1017 	if (!str_ftnever) {
1018 		mp = gesballoc(base, size, DBLK_RTFU(2, M_DATA, 0, 0),
1019 		    frp, dblk_lastfree_desb, KM_NOSLEEP);
1020 
1021 		if (mp != NULL)
1022 			STR_FTALLOC(&DB_FTHDR(mp), FTEV_DESBALLOCA, size);
1023 		return (mp);
1024 	}
1025 
1026 	return (gesballoc(base, size, DBLK_RTFU(2, M_DATA, 0, 0),
1027 	    frp, dblk_lastfree_desb, KM_NOSLEEP));
1028 }
1029 
1030 static void
1031 bcache_dblk_lastfree(mblk_t *mp, dblk_t *dbp)
1032 {
1033 	bcache_t *bcp = dbp->db_cache;
1034 
1035 	ASSERT(dbp->db_mblk == mp);
1036 	if (dbp->db_fthdr != NULL)
1037 		str_ftfree(dbp);
1038 
1039 	/* set credp and cpid to be 'unspecified' before returning to cache */
1040 	if (dbp->db_credp != NULL) {
1041 		crfree(dbp->db_credp);
1042 		dbp->db_credp = NULL;
1043 	}
1044 	dbp->db_cpid = -1;
1045 	dbp->db_struioflag = 0;
1046 	dbp->db_struioun.cksum.flags = 0;
1047 
1048 	mutex_enter(&bcp->mutex);
1049 	kmem_cache_free(bcp->dblk_cache, dbp);
1050 	bcp->alloc--;
1051 
1052 	if (bcp->alloc == 0 && bcp->destroy != 0) {
1053 		kmem_cache_destroy(bcp->dblk_cache);
1054 		kmem_cache_destroy(bcp->buffer_cache);
1055 		mutex_exit(&bcp->mutex);
1056 		mutex_destroy(&bcp->mutex);
1057 		kmem_free(bcp, sizeof (bcache_t));
1058 	} else {
1059 		mutex_exit(&bcp->mutex);
1060 	}
1061 }
1062 
1063 bcache_t *
1064 bcache_create(char *name, size_t size, uint_t align)
1065 {
1066 	bcache_t *bcp;
1067 	char buffer[255];
1068 
1069 	ASSERT((align & (align - 1)) == 0);
1070 
1071 	if ((bcp = kmem_alloc(sizeof (bcache_t), KM_NOSLEEP)) == NULL)
1072 		return (NULL);
1073 
1074 	bcp->size = size;
1075 	bcp->align = align;
1076 	bcp->alloc = 0;
1077 	bcp->destroy = 0;
1078 
1079 	mutex_init(&bcp->mutex, NULL, MUTEX_DRIVER, NULL);
1080 
1081 	(void) sprintf(buffer, "%s_buffer_cache", name);
1082 	bcp->buffer_cache = kmem_cache_create(buffer, size, align, NULL, NULL,
1083 	    NULL, NULL, NULL, 0);
1084 	(void) sprintf(buffer, "%s_dblk_cache", name);
1085 	bcp->dblk_cache = kmem_cache_create(buffer, sizeof (dblk_t),
1086 	    DBLK_CACHE_ALIGN, bcache_dblk_constructor, bcache_dblk_destructor,
1087 	    NULL, (void *)bcp, NULL, 0);
1088 
1089 	return (bcp);
1090 }
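
/*
 * Editorial usage sketch: a driver with a fixed buffer geometry can
 * keep private dblk and buffer caches:
 *
 *	bcache_t *bcp;
 *
 *	if ((bcp = bcache_create("xx", 2048, 64)) == NULL)
 *		return (ENOMEM);
 *	...
 *	mp = bcache_allocb(bcp, BPRI_MED);	// NULL once destroy is set
 *	...
 *	bcache_destroy(bcp);
 *
 * bcache_destroy() (below) only marks the cache moribund while
 * allocations are outstanding; bcache_dblk_lastfree() above performs
 * the actual teardown when bcp->alloc drains to zero.
 */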
1091 
1092 void
1093 bcache_destroy(bcache_t *bcp)
1094 {
1095 	ASSERT(bcp != NULL);
1096 
1097 	mutex_enter(&bcp->mutex);
1098 	if (bcp->alloc == 0) {
1099 		kmem_cache_destroy(bcp->dblk_cache);
1100 		kmem_cache_destroy(bcp->buffer_cache);
1101 		mutex_exit(&bcp->mutex);
1102 		mutex_destroy(&bcp->mutex);
1103 		kmem_free(bcp, sizeof (bcache_t));
1104 	} else {
1105 		bcp->destroy++;
1106 		mutex_exit(&bcp->mutex);
1107 	}
1108 }
1109 
1110 /*ARGSUSED*/
1111 mblk_t *
1112 bcache_allocb(bcache_t *bcp, uint_t pri)
1113 {
1114 	dblk_t *dbp;
1115 	mblk_t *mp = NULL;
1116 
1117 	ASSERT(bcp != NULL);
1118 
1119 	mutex_enter(&bcp->mutex);
1120 	if (bcp->destroy != 0) {
1121 		mutex_exit(&bcp->mutex);
1122 		goto out;
1123 	}
1124 
1125 	if ((dbp = kmem_cache_alloc(bcp->dblk_cache, KM_NOSLEEP)) == NULL) {
1126 		mutex_exit(&bcp->mutex);
1127 		goto out;
1128 	}
1129 	bcp->alloc++;
1130 	mutex_exit(&bcp->mutex);
1131 
1132 	ASSERT(((uintptr_t)(dbp->db_base) & (bcp->align - 1)) == 0);
1133 
1134 	mp = dbp->db_mblk;
1135 	DBLK_RTFU_WORD(dbp) = DBLK_RTFU(1, M_DATA, 0, 0);
1136 	mp->b_next = mp->b_prev = mp->b_cont = NULL;
1137 	mp->b_rptr = mp->b_wptr = dbp->db_base;
1138 	mp->b_queue = NULL;
1139 	MBLK_BAND_FLAG_WORD(mp) = 0;
1140 	STR_FTALLOC(&dbp->db_fthdr, FTEV_BCALLOCB, bcp->size);
1141 out:
1142 	FTRACE_1("bcache_allocb(): mp=0x%p", (uintptr_t)mp);
1143 
1144 	return (mp);
1145 }
1146 
1147 static void
1148 dblk_lastfree_oversize(mblk_t *mp, dblk_t *dbp)
1149 {
1150 	ASSERT(dbp->db_mblk == mp);
1151 	if (dbp->db_fthdr != NULL)
1152 		str_ftfree(dbp);
1153 
1154 	/* set credp and cpid to be 'unspecified' before returning to cache */
1155 	if (dbp->db_credp != NULL) {
1156 		crfree(dbp->db_credp);
1157 		dbp->db_credp = NULL;
1158 	}
1159 	dbp->db_cpid = -1;
1160 	dbp->db_struioflag = 0;
1161 	dbp->db_struioun.cksum.flags = 0;
1162 
1163 	kmem_free(dbp->db_base, dbp->db_lim - dbp->db_base);
1164 	kmem_cache_free(dbp->db_cache, dbp);
1165 }
1166 
1167 static mblk_t *
1168 allocb_oversize(size_t size, int kmflags)
1169 {
1170 	mblk_t *mp;
1171 	void *buf;
1172 
1173 	size = P2ROUNDUP(size, DBLK_CACHE_ALIGN);
1174 	if ((buf = kmem_alloc(size, kmflags)) == NULL)
1175 		return (NULL);
1176 	if ((mp = gesballoc(buf, size, DBLK_RTFU(1, M_DATA, 0, 0),
1177 	    &frnop, dblk_lastfree_oversize, kmflags)) == NULL)
1178 		kmem_free(buf, size);
1179 
1180 	if (mp != NULL)
1181 		STR_FTALLOC(&DB_FTHDR(mp), FTEV_ALLOCBIG, size);
1182 
1183 	return (mp);
1184 }
1185 
1186 mblk_t *
1187 allocb_tryhard(size_t target_size)
1188 {
1189 	size_t size;
1190 	mblk_t *bp;
1191 
1192 	for (size = target_size; size < target_size + 512;
1193 	    size += DBLK_CACHE_ALIGN)
1194 		if ((bp = allocb(size, BPRI_HI)) != NULL)
1195 			return (bp);
1196 	allocb_tryhard_fails++;
1197 	return (NULL);
1198 }
1199 
1200 /*
1201  * This routine is consolidation private for STREAMS internal use.
1202  * This routine may only be called from sync routines (i.e., not
1203  * from put or service procedures).  It is located here (rather
1204  * than strsubr.c) so that we don't have to expose all of the
1205  * allocb() implementation details in header files.
1206  */
1207 mblk_t *
1208 allocb_wait(size_t size, uint_t pri, uint_t flags, int *error)
1209 {
1210 	dblk_t *dbp;
1211 	mblk_t *mp;
1212 	size_t index;
1213 
1214 	index = (size - 1) >> DBLK_SIZE_SHIFT;
1215 
1216 	if (flags & STR_NOSIG) {
1217 		if (index >= (DBLK_MAX_CACHE >> DBLK_SIZE_SHIFT)) {
1218 			if (size != 0) {
1219 				mp = allocb_oversize(size, KM_SLEEP);
1220 				FTRACE_1("allocb_wait (NOSIG): mp=0x%lx",
1221 				    (uintptr_t)mp);
1222 				return (mp);
1223 			}
1224 			index = 0;
1225 		}
1226 
1227 		dbp = kmem_cache_alloc(dblk_cache[index], KM_SLEEP);
1228 		mp = dbp->db_mblk;
1229 		DBLK_RTFU_WORD(dbp) = DBLK_RTFU(1, M_DATA, 0, 0);
1230 		mp->b_next = mp->b_prev = mp->b_cont = NULL;
1231 		mp->b_rptr = mp->b_wptr = dbp->db_base;
1232 		mp->b_queue = NULL;
1233 		MBLK_BAND_FLAG_WORD(mp) = 0;
1234 		STR_FTALLOC(&DB_FTHDR(mp), FTEV_ALLOCBW, size);
1235 
1236 		FTRACE_1("allocb_wait (NOSIG): mp=0x%lx", (uintptr_t)mp);
1237 
1238 	} else {
1239 		while ((mp = allocb(size, pri)) == NULL) {
1240 			if ((*error = strwaitbuf(size, BPRI_HI)) != 0)
1241 				return (NULL);
1242 		}
1243 	}
1244 
1245 	return (mp);
1246 }
1247 
1248 /*
1249  * Call function 'func' with 'arg' when a class zero block can
1250  * be allocated with priority 'pri'.
1251  */
1252 bufcall_id_t
1253 esbbcall(uint_t pri, void (*func)(void *), void *arg)
1254 {
1255 	return (bufcall(1, pri, func, arg));
1256 }
1257 
1258 /*
1259  * Allocates an iocblk (M_IOCTL) block. Properly sets the credentials,
1260  * ioc_id, rval and error fields of the iocblk to set up an ioctl call.
1261  * This provides consistency for all internal allocators of ioctl.
1262  */
1263 mblk_t *
1264 mkiocb(uint_t cmd)
1265 {
1266 	struct iocblk	*ioc;
1267 	mblk_t		*mp;
1268 
1269 	/*
1270 	 * Allocate enough space for any of the ioctl related messages.
1271 	 */
1272 	if ((mp = allocb(sizeof (union ioctypes), BPRI_MED)) == NULL)
1273 		return (NULL);
1274 
1275 	bzero(mp->b_rptr, sizeof (union ioctypes));
1276 
1277 	/*
1278 	 * Set the mblk_t information and ptrs correctly.
1279 	 */
1280 	mp->b_wptr += sizeof (struct iocblk);
1281 	mp->b_datap->db_type = M_IOCTL;
1282 
1283 	/*
1284 	 * Fill in the fields.
1285 	 */
1286 	ioc		= (struct iocblk *)mp->b_rptr;
1287 	ioc->ioc_cmd	= cmd;
1288 	ioc->ioc_cr	= kcred;
1289 	ioc->ioc_id	= getiocseqno();
1290 	ioc->ioc_flag	= IOC_NATIVE;
1291 	return (mp);
1292 }
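
/*
 * Editorial usage sketch: an internal consumer sends the resulting
 * M_IOCTL downstream and matches the reply by ioc_id (XX_CMD is
 * hypothetical):
 *
 *	if ((mp = mkiocb(XX_CMD)) == NULL)
 *		return (ENOMEM);
 *	ioc_id = ((struct iocblk *)mp->b_rptr)->ioc_id;
 *	putnext(q, mp);
 *	// ... later, match the M_IOCACK/M_IOCNAK carrying that ioc_id
 */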
1293 
1294 /*
1295  * Test whether a block of the given size can be allocated with a
1296  * request of the given priority.
1297  * 'pri' is no longer used, but is retained for compatibility.
1298  */
1299 /* ARGSUSED */
1300 int
1301 testb(size_t size, uint_t pri)
1302 {
1303 	return ((size + sizeof (dblk_t)) <= kmem_avail());
1304 }
1305 
1306 /*
1307  * Call function 'func' with argument 'arg' when there is a reasonably
1308  * good chance that a block of size 'size' can be allocated.
1309  * 'pri' is no longer used, but is retained for compatibility.
1310  */
1311 /* ARGSUSED */
1312 bufcall_id_t
1313 bufcall(size_t size, uint_t pri, void (*func)(void *), void *arg)
1314 {
1315 	static long bid = 1;	/* always odd to save checking for zero */
1316 	bufcall_id_t bc_id;
1317 	struct strbufcall *bcp;
1318 
1319 	if ((bcp = kmem_alloc(sizeof (strbufcall_t), KM_NOSLEEP)) == NULL)
1320 		return (0);
1321 
1322 	bcp->bc_func = func;
1323 	bcp->bc_arg = arg;
1324 	bcp->bc_size = size;
1325 	bcp->bc_next = NULL;
1326 	bcp->bc_executor = NULL;
1327 
1328 	mutex_enter(&strbcall_lock);
1329 	/*
1330 	 * After bcp is linked into strbcalls and strbcall_lock is dropped there
1331 	 * should be no references to bcp since it may be freed by
1332 	 * runbufcalls(). Since bcp_id field is returned, we save its value in
1333 	 * the local var.
1334 	 */
1335 	bc_id = bcp->bc_id = (bufcall_id_t)(bid += 2);	/* keep it odd */
1336 
1337 	/*
1338 	 * add newly allocated stream event to existing
1339 	 * linked list of events.
1340 	 */
1341 	if (strbcalls.bc_head == NULL) {
1342 		strbcalls.bc_head = strbcalls.bc_tail = bcp;
1343 	} else {
1344 		strbcalls.bc_tail->bc_next = bcp;
1345 		strbcalls.bc_tail = bcp;
1346 	}
1347 
1348 	cv_signal(&strbcall_cv);
1349 	mutex_exit(&strbcall_lock);
1350 	return (bc_id);
1351 }
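
/*
 * Editorial usage sketch: the classic pattern is to schedule a bufcall
 * when allocb() fails in a service procedure, so the queue is enabled
 * again once memory is likely to be available (xxp is a hypothetical
 * per-instance structure):
 *
 *	if ((bp = allocb(size, BPRI_MED)) == NULL) {
 *		xxp->xx_bufcall_id = bufcall(size, BPRI_MED,
 *		    (void (*)(void *))qenable, (void *)q);
 *		// if even the bufcall failed, fall back to qtimeout()
 *		(void) putbq(q, mp);
 *		return (0);
 *	}
 *
 * Any pending id must be cancelled with unbufcall() at close time so
 * the callback cannot fire against a dismantled stream.
 */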
1352 
1353 /*
1354  * Cancel a bufcall request.
1355  */
1356 void
1357 unbufcall(bufcall_id_t id)
1358 {
1359 	strbufcall_t *bcp, *pbcp;
1360 
1361 	mutex_enter(&strbcall_lock);
1362 again:
1363 	pbcp = NULL;
1364 	for (bcp = strbcalls.bc_head; bcp; bcp = bcp->bc_next) {
1365 		if (id == bcp->bc_id)
1366 			break;
1367 		pbcp = bcp;
1368 	}
1369 	if (bcp) {
1370 		if (bcp->bc_executor != NULL) {
1371 			if (bcp->bc_executor != curthread) {
1372 				cv_wait(&bcall_cv, &strbcall_lock);
1373 				goto again;
1374 			}
1375 		} else {
1376 			if (pbcp)
1377 				pbcp->bc_next = bcp->bc_next;
1378 			else
1379 				strbcalls.bc_head = bcp->bc_next;
1380 			if (bcp == strbcalls.bc_tail)
1381 				strbcalls.bc_tail = pbcp;
1382 			kmem_free(bcp, sizeof (strbufcall_t));
1383 		}
1384 	}
1385 	mutex_exit(&strbcall_lock);
1386 }
1387 
1388 /*
1389  * Duplicate a message block by block (uses dupb), returning
1390  * a pointer to the duplicate message.
1391  * Returns a non-NULL value only if the entire message
1392  * was dup'd.
1393  */
1394 mblk_t *
1395 dupmsg(mblk_t *bp)
1396 {
1397 	mblk_t *head, *nbp;
1398 
1399 	if (!bp || !(nbp = head = dupb(bp)))
1400 		return (NULL);
1401 
1402 	while (bp->b_cont) {
1403 		if (!(nbp->b_cont = dupb(bp->b_cont))) {
1404 			freemsg(head);
1405 			return (NULL);
1406 		}
1407 		nbp = nbp->b_cont;
1408 		bp = bp->b_cont;
1409 	}
1410 	return (head);
1411 }
1412 
1413 #define	DUPB_NOLOAN(bp) \
1414 	((((bp)->b_datap->db_struioflag & STRUIO_ZC) != 0) ? \
1415 	copyb((bp)) : dupb((bp)))
1416 
1417 mblk_t *
1418 dupmsg_noloan(mblk_t *bp)
1419 {
1420 	mblk_t *head, *nbp;
1421 
1422 	if (bp == NULL || DB_TYPE(bp) != M_DATA ||
1423 	    ((nbp = head = DUPB_NOLOAN(bp)) == NULL))
1424 		return (NULL);
1425 
1426 	while (bp->b_cont) {
1427 		if ((nbp->b_cont = DUPB_NOLOAN(bp->b_cont)) == NULL) {
1428 			freemsg(head);
1429 			return (NULL);
1430 		}
1431 		nbp = nbp->b_cont;
1432 		bp = bp->b_cont;
1433 	}
1434 	return (head);
1435 }
1436 
1437 /*
1438  * Copy data from message and data block to newly allocated message and
1439  * data block. Returns new message block pointer, or NULL if error.
1440  * The alignment of rptr (w.r.t. word alignment) will be the same in the copy
1441  * as in the original even when db_base is not word aligned. (bug 1052877)
1442  */
1443 mblk_t *
1444 copyb(mblk_t *bp)
1445 {
1446 	mblk_t	*nbp;
1447 	dblk_t	*dp, *ndp;
1448 	uchar_t *base;
1449 	size_t	size;
1450 	size_t	unaligned;
1451 
1452 	ASSERT(bp->b_wptr >= bp->b_rptr);
1453 
1454 	dp = bp->b_datap;
1455 	if (dp->db_fthdr != NULL)
1456 		STR_FTEVENT_MBLK(bp, caller(), FTEV_COPYB, 0);
1457 
1458 	size = dp->db_lim - dp->db_base;
1459 	unaligned = P2PHASE((uintptr_t)dp->db_base, sizeof (uint_t));
1460 	if ((nbp = allocb_tmpl(size + unaligned, bp)) == NULL)
1461 		return (NULL);
1462 	nbp->b_flag = bp->b_flag;
1463 	nbp->b_band = bp->b_band;
1464 	ndp = nbp->b_datap;
1465 
1466 	/*
1467 	 * Copy the various checksum information that came in
1468 	 * originally.
1469 	 */
1470 	ndp->db_cksumstart = dp->db_cksumstart;
1471 	ndp->db_cksumend = dp->db_cksumend;
1472 	ndp->db_cksumstuff = dp->db_cksumstuff;
1473 	bcopy(dp->db_struioun.data, ndp->db_struioun.data,
1474 	    sizeof (dp->db_struioun.data));
1475 
1476 	/*
1477 	 * Well, here is a potential issue.  If we are trying to
1478 	 * trace a flow, and we copy the message, we might lose
1479 	 * information about where this message might have been.
1480 	 * So we should inherit the FT data.  On the other hand,
1481 	 * a user might be interested only in alloc to free data.
1482 	 * So I guess the real answer is to provide a tunable.
1483 	 */
1484 	STR_FTEVENT_MBLK(nbp, caller(), FTEV_COPYB, 1);
1485 
1486 	base = ndp->db_base + unaligned;
1487 	bcopy(dp->db_base, ndp->db_base + unaligned, size);
1488 
1489 	nbp->b_rptr = base + (bp->b_rptr - dp->db_base);
1490 	nbp->b_wptr = nbp->b_rptr + MBLKL(bp);
1491 
1492 	return (nbp);
1493 }
1494 
1495 /*
1496  * Copy data from message to newly allocated message using new
1497  * data blocks.  Returns a pointer to the new message, or NULL if error.
1498  */
1499 mblk_t *
1500 copymsg(mblk_t *bp)
1501 {
1502 	mblk_t *head, *nbp;
1503 
1504 	if (!bp || !(nbp = head = copyb(bp)))
1505 		return (NULL);
1506 
1507 	while (bp->b_cont) {
1508 		if (!(nbp->b_cont = copyb(bp->b_cont))) {
1509 			freemsg(head);
1510 			return (NULL);
1511 		}
1512 		nbp = nbp->b_cont;
1513 		bp = bp->b_cont;
1514 	}
1515 	return (head);
1516 }
1517 
1518 /*
1519  * link a message block to tail of message
1520  */
1521 void
1522 linkb(mblk_t *mp, mblk_t *bp)
1523 {
1524 	ASSERT(mp && bp);
1525 
1526 	for (; mp->b_cont; mp = mp->b_cont)
1527 		;
1528 	mp->b_cont = bp;
1529 }
1530 
1531 /*
1532  * unlink a message block from head of message
1533  * return pointer to new message.
1534  * NULL if message becomes empty.
1535  */
1536 mblk_t *
1537 unlinkb(mblk_t *bp)
1538 {
1539 	mblk_t *bp1;
1540 
1541 	bp1 = bp->b_cont;
1542 	bp->b_cont = NULL;
1543 	return (bp1);
1544 }
1545 
1546 /*
1547  * remove a message block "bp" from message "mp"
1548  *
1549  * Return pointer to new message or NULL if no message remains.
1550  * Return -1 if bp is not found in message.
1551  */
1552 mblk_t *
1553 rmvb(mblk_t *mp, mblk_t *bp)
1554 {
1555 	mblk_t *tmp;
1556 	mblk_t *lastp = NULL;
1557 
1558 	ASSERT(mp && bp);
1559 	for (tmp = mp; tmp; tmp = tmp->b_cont) {
1560 		if (tmp == bp) {
1561 			if (lastp)
1562 				lastp->b_cont = tmp->b_cont;
1563 			else
1564 				mp = tmp->b_cont;
1565 			tmp->b_cont = NULL;
1566 			return (mp);
1567 		}
1568 		lastp = tmp;
1569 	}
1570 	return ((mblk_t *)-1);
1571 }
1572 
1573 /*
1574  * Concatenate and align the first len bytes of a common
1575  * message type.  len == -1 means concatenate everything.
1576  * Returns 1 on success, 0 on failure
1577  * After the pullup, mp points to the pulled up data.
1578  */
1579 int
1580 pullupmsg(mblk_t *mp, ssize_t len)
1581 {
1582 	mblk_t *bp, *b_cont;
1583 	dblk_t *dbp;
1584 	ssize_t n;
1585 
1586 	ASSERT(mp->b_datap->db_ref > 0);
1587 	ASSERT(mp->b_next == NULL && mp->b_prev == NULL);
1588 
1589 	if (len == -1) {
1590 		if (mp->b_cont == NULL && str_aligned(mp->b_rptr))
1591 			return (1);
1592 		len = xmsgsize(mp);
1593 	} else {
1594 		ssize_t first_mblk_len = mp->b_wptr - mp->b_rptr;
1595 		ASSERT(first_mblk_len >= 0);
1596 		/*
1597 		 * If the length is less than that of the first mblk,
1598 		 * we want to pull up the message into an aligned mblk.
1599 		 * Though not part of the spec, some callers assume it.
1600 		 */
1601 		if (len <= first_mblk_len) {
1602 			if (str_aligned(mp->b_rptr))
1603 				return (1);
1604 			len = first_mblk_len;
1605 		} else if (xmsgsize(mp) < len)
1606 			return (0);
1607 	}
1608 
1609 	if ((bp = allocb_tmpl(len, mp)) == NULL)
1610 		return (0);
1611 
1612 	dbp = bp->b_datap;
1613 	*bp = *mp;		/* swap mblks so bp heads the old msg... */
1614 	mp->b_datap = dbp;	/* ... and mp heads the new message */
1615 	mp->b_datap->db_mblk = mp;
1616 	bp->b_datap->db_mblk = bp;
1617 	mp->b_rptr = mp->b_wptr = dbp->db_base;
1618 
1619 	do {
1620 		ASSERT(bp->b_datap->db_ref > 0);
1621 		ASSERT(bp->b_wptr >= bp->b_rptr);
1622 		n = MIN(bp->b_wptr - bp->b_rptr, len);
1623 		ASSERT(n >= 0);		/* allow zero-length mblk_t's */
1624 		if (n > 0)
1625 			bcopy(bp->b_rptr, mp->b_wptr, (size_t)n);
1626 		mp->b_wptr += n;
1627 		bp->b_rptr += n;
1628 		len -= n;
1629 		if (bp->b_rptr != bp->b_wptr)
1630 			break;
1631 		b_cont = bp->b_cont;
1632 		freeb(bp);
1633 		bp = b_cont;
1634 	} while (len && bp);
1635 
1636 	mp->b_cont = bp;	/* tack on whatever wasn't pulled up */
1637 
1638 	return (1);
1639 }
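
/*
 * Editorial usage sketch: the usual reason to call pullupmsg() is to
 * make a protocol header contiguous and aligned before casting b_rptr
 * (xx_hdr_t is hypothetical):
 *
 *	if (MBLKL(mp) < sizeof (xx_hdr_t) &&
 *	    !pullupmsg(mp, sizeof (xx_hdr_t))) {
 *		freemsg(mp);
 *		return;
 *	}
 *	hdr = (xx_hdr_t *)mp->b_rptr;	// now contiguous and aligned
 *
 * Note that mp itself remains a valid reference across the call: the
 * swap in pullupmsg() above replaces dblks, not the caller's leading
 * mblk.
 */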
1640 
1641 /*
1642  * Concatenate and align at least the first len bytes of common message
1643  * type.  Len == -1 means concatenate everything.  The original message is
1644  * unaltered.  Returns a pointer to a new message on success, otherwise
1645  * returns NULL.
1646  */
1647 mblk_t *
1648 msgpullup(mblk_t *mp, ssize_t len)
1649 {
1650 	mblk_t *newmp;
1651 	ssize_t totlen = xmsgsize(mp);
1652 	ssize_t offset = 0;
1653 
1654 	if (len == -1)
1655 		len = totlen;
1656 
1657 	if (len < 0 || (len > 0 && len > totlen))
1658 		return (NULL);
1659 
1660 	if ((newmp = allocb_tmpl(len, mp)) == NULL)
1661 		return (NULL);
1662 
1663 	newmp->b_flag = mp->b_flag;
1664 	newmp->b_band = mp->b_band;
1665 
1666 	while (len > 0) {
1667 		ssize_t seglen = MBLKL(mp);
1668 		ssize_t n = MIN(seglen, len);
1669 
1670 		ASSERT3P(mp, !=, NULL);	/* guaranteed by len <= totlen */
1671 		ASSERT3S(n, >=, 0);	/* allow zero-length mblk_t's */
1672 		if (n > 0)
1673 			bcopy(mp->b_rptr, newmp->b_wptr, n);
1674 		newmp->b_wptr += n;
1675 		len -= n;
1676 
1677 		if (n == seglen)
1678 			mp = mp->b_cont;
1679 		else if (len == 0)
1680 			offset = n;
1681 	}
1682 	ASSERT3S(len, ==, 0);
1683 
1684 	if (mp != NULL) {
1685 		newmp->b_cont = dupmsg(mp);
1686 		if (newmp->b_cont == NULL) {
1687 			freemsg(newmp);
1688 			return (NULL);
1689 		}
1690 		ASSERT3S(offset, >=, 0);
1691 		ASSERT3U(MBLKL(newmp->b_cont), >=, offset);
1692 		newmp->b_cont->b_rptr += offset;
1693 	}
1694 
1695 	return (newmp);
1696 }
1697 
1698 /*
1699  * Trim bytes from message
1700  *  len > 0, trim from head
1701  *  len < 0, trim from tail
1702  * Returns 1 on success, 0 on failure.
1703  */
1704 int
1705 adjmsg(mblk_t *mp, ssize_t len)
1706 {
1707 	mblk_t *bp;
1708 	mblk_t *save_bp = NULL;
1709 	mblk_t *prev_bp;
1710 	mblk_t *bcont;
1711 	unsigned char type;
1712 	ssize_t n;
1713 	int fromhead;
1714 	int first;
1715 
1716 	ASSERT(mp != NULL);
1717 
1718 	if (len < 0) {
1719 		fromhead = 0;
1720 		len = -len;
1721 	} else {
1722 		fromhead = 1;
1723 	}
1724 
1725 	if (xmsgsize(mp) < len)
1726 		return (0);
1727 
1728 	if (fromhead) {
1729 		first = 1;
1730 		while (len) {
1731 			ASSERT(mp->b_wptr >= mp->b_rptr);
1732 			n = MIN(mp->b_wptr - mp->b_rptr, len);
1733 			mp->b_rptr += n;
1734 			len -= n;
1735 
1736 			/*
1737 			 * If this is not the first mblk and it has been
1738 			 * drained to zero length, remove it.
1739 			 */
1740 			if (!first && (mp->b_wptr == mp->b_rptr)) {
1741 				bcont = mp->b_cont;
1742 				freeb(mp);
1743 				mp = save_bp->b_cont = bcont;
1744 			} else {
1745 				save_bp = mp;
1746 				mp = mp->b_cont;
1747 			}
1748 			first = 0;
1749 		}
1750 	} else {
1751 		type = mp->b_datap->db_type;
1752 		while (len) {
1753 			bp = mp;
1754 			save_bp = NULL;
1755 
1756 			/*
1757 			 * Find the last mblk of the same type.
1758 			 */
1759 			while (bp && bp->b_datap->db_type == type) {
1760 				ASSERT(bp->b_wptr >= bp->b_rptr);
1761 				prev_bp = save_bp;
1762 				save_bp = bp;
1763 				bp = bp->b_cont;
1764 			}
1765 			if (save_bp == NULL)
1766 				break;
1767 			n = MIN(save_bp->b_wptr - save_bp->b_rptr, len);
1768 			save_bp->b_wptr -= n;
1769 			len -= n;
1770 
1771 			/*
1772 			 * If this is not the first message
1773 			 * and we have taken away everything
1774 			 * from this message, remove it
1775 			 */
1776 
1777 			if ((save_bp != mp) &&
1778 			    (save_bp->b_wptr == save_bp->b_rptr)) {
1779 				bcont = save_bp->b_cont;
1780 				freeb(save_bp);
1781 				prev_bp->b_cont = bcont;
1782 			}
1783 		}
1784 	}
1785 	return (1);
1786 }
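
/*
 * Editorial usage sketch: strip a 14-byte link-level header from the
 * front of a message and a 4-byte trailer from the back:
 *
 *	if (!adjmsg(mp, 14) || !adjmsg(mp, -4)) {
 *		freemsg(mp);	// message was shorter than expected
 *		return;
 *	}
 */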
1787 
1788 /*
1789  * get number of data bytes in message
1790  */
1791 size_t
1792 msgdsize(mblk_t *bp)
1793 {
1794 	size_t count = 0;
1795 
1796 	for (; bp; bp = bp->b_cont)
1797 		if (bp->b_datap->db_type == M_DATA) {
1798 			ASSERT(bp->b_wptr >= bp->b_rptr);
1799 			count += bp->b_wptr - bp->b_rptr;
1800 		}
1801 	return (count);
1802 }
1803 
1804 /*
1805  * Get a message off head of queue
1806  *
1807  * If queue has no buffers then mark queue
1808  * with QWANTR. (queue wants to be read by
1809  * someone when data becomes available)
1810  *
1811  * If there is something to take off then do so.
1812  * If queue falls below hi water mark turn off QFULL
1813  * flag.  Decrement weighted count of queue.
1814  * Also turn off QWANTR because queue is being read.
1815  *
1816  * The queue count is maintained on a per-band basis.
1817  * Priority band 0 (normal messages) uses q_count,
1818  * q_lowat, etc.  Non-zero priority bands use the
1819  * fields in their respective qband structures
1820  * (qb_count, qb_lowat, etc.)  All messages appear
1821  * on the same list, linked via their b_next pointers.
1822  * q_first is the head of the list.  q_count does
1823  * not reflect the size of all the messages on the
1824  * queue.  It only reflects those messages in the
1825  * normal band of flow.  The one exception to this
1826  * deals with high priority messages.  They are in
1827  * their own conceptual "band", but are accounted
1828  * against q_count.
1829  *
1830  * If queue count is below the lo water mark and QWANTW
1831  * is set, enable the closest backq which has a service
1832  * procedure and turn off the QWANTW flag.
1833  *
1834  * getq could be built on top of rmvq, but isn't because
1835  * of performance considerations.
1836  *
1837  * A note on the use of q_count and q_mblkcnt:
1838  *   q_count is the traditional byte count for messages that
1839  *   have been put on a queue.  Documentation tells us that
1840  *   we shouldn't rely on that count, but some drivers/modules
1841  *   do.  What was needed, however, was a mechanism to prevent
1842  *   runaway streams from consuming all of the resources,
1843  *   and particularly to be able to flow-control zero-length
1844  *   messages.  q_mblkcnt is used for this purpose.  It
1845  *   counts the number of mblks that are being put on
1846  *   the queue.  The intention here is that each mblk should
1847  *   contain one byte of data and, for the purpose of
1848  *   flow-control, logically does.  A queue will become
1849  *   full when EITHER of these values (q_count and q_mblkcnt)
1850  *   reach the highwater mark.  It will clear when BOTH
1851  *   of them drop below the highwater mark.  And it will
1852  *   backenable when BOTH of them drop below the lowwater
1853  *   mark.
1854  *   With this algorithm, a driver/module might be able
1855  *   to find a reasonably accurate q_count, and the
1856  *   framework can still try and limit resource usage.
1857  */
1858 mblk_t *
1859 getq(queue_t *q)
1860 {
1861 	mblk_t *bp;
1862 	uchar_t band = 0;
1863 
1864 	bp = getq_noenab(q, 0);
1865 	if (bp != NULL)
1866 		band = bp->b_band;
1867 
1868 	/*
1869 	 * Inlined from qbackenable().
1870 	 * Quick check without holding the lock.
1871 	 */
1872 	if (band == 0 && (q->q_flag & (QWANTW|QWANTWSYNC)) == 0)
1873 		return (bp);
1874 
1875 	qbackenable(q, band);
1876 	return (bp);
1877 }
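
/*
 * Editorial usage sketch: the canonical service procedure drains with
 * getq() and defers with putbq() when the next queue is full, so that
 * the backenabling described above can restart it later:
 *
 *	static int
 *	xxsrv(queue_t *q)	// xxsrv is hypothetical
 *	{
 *		mblk_t *mp;
 *
 *		while ((mp = getq(q)) != NULL) {
 *			if (!canputnext(q)) {
 *				(void) putbq(q, mp);	// wait for backenable
 *				break;
 *			}
 *			putnext(q, mp);
 *		}
 *		return (0);
 *	}
 */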
1878 
1879 /*
1880  * Returns the number of bytes in a message (a message is defined as a
1881  * chain of mblks linked by b_cont). If a non-NULL mblkcnt is supplied we
1882  * also return the number of distinct mblks in the message.
1883  */
1884 int
1885 mp_cont_len(mblk_t *bp, int *mblkcnt)
1886 {
1887 	mblk_t	*mp;
1888 	int	mblks = 0;
1889 	int	bytes = 0;
1890 
1891 	for (mp = bp; mp != NULL; mp = mp->b_cont) {
1892 		bytes += MBLKL(mp);
1893 		mblks++;
1894 	}
1895 
1896 	if (mblkcnt != NULL)
1897 		*mblkcnt = mblks;
1898 
1899 	return (bytes);
1900 }
1901 
1902 /*
1903  * Like getq() but does not backenable.  This is used by the stream
1904  * head when a putback() is likely.  The caller must call qbackenable()
1905  * after it is done with accessing the queue.
1906  * The rbytes arguments to getq_noneab() allows callers to specify a
1907  * the maximum number of bytes to return. If the current amount on the
1908  * queue is less than this then the entire message will be returned.
1909  * A value of 0 returns the entire message and is equivalent to the old
1910  * default behaviour prior to the addition of the rbytes argument.
1911  */
1912 mblk_t *
1913 getq_noenab(queue_t *q, ssize_t rbytes)
1914 {
1915 	mblk_t *bp, *mp1;
1916 	mblk_t *mp2 = NULL;
1917 	qband_t *qbp;
1918 	kthread_id_t freezer;
1919 	int	bytecnt = 0, mblkcnt = 0;
1920 
1921 	/* freezestr should allow its caller to call getq/putq */
1922 	freezer = STREAM(q)->sd_freezer;
1923 	if (freezer == curthread) {
1924 		ASSERT(frozenstr(q));
1925 		ASSERT(MUTEX_HELD(QLOCK(q)));
1926 	} else
1927 		mutex_enter(QLOCK(q));
1928 
1929 	if ((bp = q->q_first) == NULL) {
1930 		q->q_flag |= QWANTR;
1931 	} else {
1932 		/*
1933 		 * If the caller supplied a byte threshold and there is
1934 		 * more than this amount on the queue, then break up
1935 		 * the message appropriately.  We can only safely do
1936 		 * this for M_DATA messages.
1937 		 */
1938 		if ((DB_TYPE(bp) == M_DATA) && (rbytes > 0) &&
1939 		    (q->q_count > rbytes)) {
1940 			/*
1941 			 * Inline version of mp_cont_len() which terminates
1942 			 * when we meet or exceed rbytes.
1943 			 */
1944 			for (mp1 = bp; mp1 != NULL; mp1 = mp1->b_cont) {
1945 				mblkcnt++;
1946 				bytecnt += MBLKL(mp1);
1947 				if (bytecnt >= rbytes)
1948 					break;
1949 			}
1950 			/*
1951 			 * We need to account for the following scenarios:
1952 			 *
1953 			 * 1) Too much data in the first message:
1954 			 *	mp1 will be the mblk which puts us over our
1955 			 *	byte limit.
1956 			 * 2) Not enough data in the first message:
1957 			 *	mp1 will be NULL.
1958 			 * 3) Exactly the right amount of data contained within
1959 			 *    whole mblks:
1960 			 *	mp1->b_cont will be where we break the message.
1961 			 */
1962 			if (bytecnt > rbytes) {
1963 				/*
1964 				 * Dup/copy mp1 and put what we don't need
1965 				 * back onto the queue. Adjust the read/write
1966 				 * and continuation pointers appropriately
1967 				 * and decrement the current mblk count to
1968 				 * reflect we are putting an mblk back onto
1969 				 * the queue.
1970 				 * When adjusting the message pointers, it's
1971 				 * OK to use the existing bytecnt and the
1972 			 * requested amount (rbytes) to calculate
1973 			 * the new write offset (b_wptr) of what we
1974 			 * are taking. However, we cannot use these
1975 			 * values when calculating the read offset of
1976 			 * the mblk we are putting back on the queue.
1977 			 * This is because the beginning (b_rptr) of the
1978 				 * mblk represents some arbitrary point within
1979 				 * the message.
1980 				 * It's simplest to do this by advancing b_rptr
1981 				 * by the new length of mp1 as we don't have to
1982 				 * remember any intermediate state.
1983 				 */
1984 				ASSERT(mp1 != NULL);
1985 				mblkcnt--;
1986 				if ((mp2 = dupb(mp1)) == NULL &&
1987 				    (mp2 = copyb(mp1)) == NULL) {
1988 					bytecnt = mblkcnt = 0;
1989 					goto dup_failed;
1990 				}
1991 				mp2->b_cont = mp1->b_cont;
1992 				mp1->b_wptr -= bytecnt - rbytes;
1993 				mp2->b_rptr += mp1->b_wptr - mp1->b_rptr;
1994 				mp1->b_cont = NULL;
1995 				bytecnt = rbytes;
1996 			} else {
1997 				/*
1998 				 * Either there is not enough data in the first
1999 				 * message or there is no excess data to deal
2000 				 * with. If mp1 is NULL, we are taking the
2001 				 * whole message. No need to do anything.
2002 				 * Otherwise we assign mp1->b_cont to mp2 as
2003 				 * we will be putting this back onto the head of
2004 				 * the queue.
2005 				 */
2006 				if (mp1 != NULL) {
2007 					mp2 = mp1->b_cont;
2008 					mp1->b_cont = NULL;
2009 				}
2010 			}
2011 			/*
2012 			 * If mp2 is not NULL then we have part of the message
2013 			 * to put back onto the queue.
2014 			 */
2015 			if (mp2 != NULL) {
2016 				if ((mp2->b_next = bp->b_next) == NULL)
2017 					q->q_last = mp2;
2018 				else
2019 					bp->b_next->b_prev = mp2;
2020 				q->q_first = mp2;
2021 			} else {
2022 				if ((q->q_first = bp->b_next) == NULL)
2023 					q->q_last = NULL;
2024 				else
2025 					q->q_first->b_prev = NULL;
2026 			}
2027 		} else {
2028 			/*
2029 			 * Either no byte threshold was supplied, there is
2030 			 * not enough on the queue, or we failed to
2031 			 * duplicate/copy a data block. In these cases we
2032 			 * just take the entire first message.
2033 			 */
2034 dup_failed:
2035 			bytecnt = mp_cont_len(bp, &mblkcnt);
2036 			if ((q->q_first = bp->b_next) == NULL)
2037 				q->q_last = NULL;
2038 			else
2039 				q->q_first->b_prev = NULL;
2040 		}
2041 		if (bp->b_band == 0) {
2042 			q->q_count -= bytecnt;
2043 			q->q_mblkcnt -= mblkcnt;
2044 			if (q->q_mblkcnt == 0 || ((q->q_count < q->q_hiwat) &&
2045 			    (q->q_mblkcnt < q->q_hiwat))) {
2046 				q->q_flag &= ~QFULL;
2047 			}
2048 		} else {
2049 			int i;
2050 
2051 			ASSERT(bp->b_band <= q->q_nband);
2052 			ASSERT(q->q_bandp != NULL);
2053 			ASSERT(MUTEX_HELD(QLOCK(q)));
2054 			qbp = q->q_bandp;
2055 			i = bp->b_band;
2056 			while (--i > 0)
2057 				qbp = qbp->qb_next;
2058 			if (qbp->qb_first == qbp->qb_last) {
2059 				qbp->qb_first = NULL;
2060 				qbp->qb_last = NULL;
2061 			} else {
2062 				qbp->qb_first = bp->b_next;
2063 			}
2064 			qbp->qb_count -= bytecnt;
2065 			qbp->qb_mblkcnt -= mblkcnt;
2066 			if (qbp->qb_mblkcnt == 0 ||
2067 			    ((qbp->qb_count < qbp->qb_hiwat) &&
2068 			    (qbp->qb_mblkcnt < qbp->qb_hiwat))) {
2069 				qbp->qb_flag &= ~QB_FULL;
2070 			}
2071 		}
2072 		q->q_flag &= ~QWANTR;
2073 		bp->b_next = NULL;
2074 		bp->b_prev = NULL;
2075 	}
2076 	if (freezer != curthread)
2077 		mutex_exit(QLOCK(q));
2078 
2079 	STR_FTEVENT_MSG(bp, q, FTEV_GETQ, 0);
2080 
2081 	return (bp);
2082 }
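
/*
 * Sketch (editor's illustration; xx_read_upto is hypothetical): a
 * stream-head style consumer that takes at most "size" bytes and then
 * performs the deferred backenable itself, as the block comment above
 * prescribes.
 */
static mblk_t *
xx_read_upto(queue_t *q, ssize_t size)
{
	mblk_t *mp;

	if ((mp = getq_noenab(q, size)) != NULL) {
		/* Done accessing the queue, so backenable now. */
		qbackenable(q, mp->b_band);
	}
	return (mp);
}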
2083 
2084 /*
2085  * Determine if a backenable is needed after removing a message in the
2086  * specified band.
2087  * NOTE: This routine assumes that something like getq_noenab() has
2088  * already been called.
2089  *
2090  * For the read side it is ok to hold sd_lock across calling this (and the
2091  * stream head often does).
2092  * But for the write side, strwakeq() might be invoked and it acquires sd_lock.
2093  */
2094 void
2095 qbackenable(queue_t *q, uchar_t band)
2096 {
2097 	int backenab = 0;
2098 	qband_t *qbp;
2099 	kthread_id_t freezer;
2100 
2101 	ASSERT(q);
2102 	ASSERT((q->q_flag & QREADR) || MUTEX_NOT_HELD(&STREAM(q)->sd_lock));
2103 
2104 	/*
2105 	 * Quick check without holding the lock.
2106 	 * This is OK since, after getq() has lowered q_count, these flags
2107 	 * would not change unless either the qbackenable() is done by
2108 	 * another thread (which is OK) or the queue has become QFULL,
2109 	 * in which case another backenable will take place when the queue
2110 	 * drops below q_lowat.
2111 	 */
2112 	if (band == 0 && (q->q_flag & (QWANTW|QWANTWSYNC)) == 0)
2113 		return;
2114 
2115 	/* freezestr should allow its caller to call getq/putq */
2116 	freezer = STREAM(q)->sd_freezer;
2117 	if (freezer == curthread) {
2118 		ASSERT(frozenstr(q));
2119 		ASSERT(MUTEX_HELD(QLOCK(q)));
2120 	} else
2121 		mutex_enter(QLOCK(q));
2122 
2123 	if (band == 0) {
2124 		if (q->q_lowat == 0 || (q->q_count < q->q_lowat &&
2125 		    q->q_mblkcnt < q->q_lowat)) {
2126 			backenab = q->q_flag & (QWANTW|QWANTWSYNC);
2127 		}
2128 	} else {
2129 		int i;
2130 
2131 		ASSERT((unsigned)band <= q->q_nband);
2132 		ASSERT(q->q_bandp != NULL);
2133 
2134 		qbp = q->q_bandp;
2135 		i = band;
2136 		while (--i > 0)
2137 			qbp = qbp->qb_next;
2138 
2139 		if (qbp->qb_lowat == 0 || (qbp->qb_count < qbp->qb_lowat &&
2140 		    qbp->qb_mblkcnt < qbp->qb_lowat)) {
2141 			backenab = qbp->qb_flag & QB_WANTW;
2142 		}
2143 	}
2144 
2145 	if (backenab == 0) {
2146 		if (freezer != curthread)
2147 			mutex_exit(QLOCK(q));
2148 		return;
2149 	}
2150 
2151 	/* Have to drop the lock across strwakeq and backenable */
2152 	if (backenab & QWANTWSYNC)
2153 		q->q_flag &= ~QWANTWSYNC;
2154 	if (backenab & (QWANTW|QB_WANTW)) {
2155 		if (band != 0)
2156 			qbp->qb_flag &= ~QB_WANTW;
2157 		else {
2158 			q->q_flag &= ~QWANTW;
2159 		}
2160 	}
2161 
2162 	if (freezer != curthread)
2163 		mutex_exit(QLOCK(q));
2164 
2165 	if (backenab & QWANTWSYNC)
2166 		strwakeq(q, QWANTWSYNC);
2167 	if (backenab & (QWANTW|QB_WANTW))
2168 		backenable(q, band);
2169 }
2170 
2171 /*
2172  * Remove a message from a queue.  The queue count and other
2173  * flow control parameters are adjusted and the back queue
2174  * enabled if necessary.
2175  *
2176  * rmvq can be called with the stream frozen, but other utility functions
2177  * holding QLOCK, and by streams modules without any locks/frozen.
2178  */
2179 void
2180 rmvq(queue_t *q, mblk_t *mp)
2181 {
2182 	ASSERT(mp != NULL);
2183 
2184 	rmvq_noenab(q, mp);
2185 	if (curthread != STREAM(q)->sd_freezer && MUTEX_HELD(QLOCK(q))) {
2186 		/*
2187 		 * qbackenable can handle a frozen stream but not a "random"
2188 		 * qlock being held. Drop lock across qbackenable.
2189 		 */
2190 		mutex_exit(QLOCK(q));
2191 		qbackenable(q, mp->b_band);
2192 		mutex_enter(QLOCK(q));
2193 	} else {
2194 		qbackenable(q, mp->b_band);
2195 	}
2196 }
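
/*
 * Sketch (editor's illustration; xx_steal_first_mtype is hypothetical):
 * walk the queue under QLOCK and remove the first message of a given
 * type.  rmvq() copes with the caller holding QLOCK, dropping it only
 * around the backenable, per the comment above.
 */
static mblk_t *
xx_steal_first_mtype(queue_t *q, unsigned char type)
{
	mblk_t *mp;

	mutex_enter(QLOCK(q));
	for (mp = q->q_first; mp != NULL; mp = mp->b_next) {
		if (DB_TYPE(mp) == type) {
			rmvq(q, mp);
			break;
		}
	}
	mutex_exit(QLOCK(q));
	return (mp);
}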
2197 
2198 /*
2199  * Like rmvq() but without any backenabling.
2200  * This exists to handle SR_CONSOL_DATA in strrput().
2201  */
2202 void
2203 rmvq_noenab(queue_t *q, mblk_t *mp)
2204 {
2205 	int i;
2206 	qband_t *qbp = NULL;
2207 	kthread_id_t freezer;
2208 	int	bytecnt = 0, mblkcnt = 0;
2209 
2210 	freezer = STREAM(q)->sd_freezer;
2211 	if (freezer == curthread) {
2212 		ASSERT(frozenstr(q));
2213 		ASSERT(MUTEX_HELD(QLOCK(q)));
2214 	} else if (MUTEX_HELD(QLOCK(q))) {
2215 		/* Don't drop lock on exit */
2216 		freezer = curthread;
2217 	} else
2218 		mutex_enter(QLOCK(q));
2219 
2220 	ASSERT(mp->b_band <= q->q_nband);
2221 	if (mp->b_band != 0) {		/* Adjust band pointers */
2222 		ASSERT(q->q_bandp != NULL);
2223 		qbp = q->q_bandp;
2224 		i = mp->b_band;
2225 		while (--i > 0)
2226 			qbp = qbp->qb_next;
2227 		if (mp == qbp->qb_first) {
2228 			if (mp->b_next && mp->b_band == mp->b_next->b_band)
2229 				qbp->qb_first = mp->b_next;
2230 			else
2231 				qbp->qb_first = NULL;
2232 		}
2233 		if (mp == qbp->qb_last) {
2234 			if (mp->b_prev && mp->b_band == mp->b_prev->b_band)
2235 				qbp->qb_last = mp->b_prev;
2236 			else
2237 				qbp->qb_last = NULL;
2238 		}
2239 	}
2240 
2241 	/*
2242 	 * Remove the message from the list.
2243 	 */
2244 	if (mp->b_prev)
2245 		mp->b_prev->b_next = mp->b_next;
2246 	else
2247 		q->q_first = mp->b_next;
2248 	if (mp->b_next)
2249 		mp->b_next->b_prev = mp->b_prev;
2250 	else
2251 		q->q_last = mp->b_prev;
2252 	mp->b_next = NULL;
2253 	mp->b_prev = NULL;
2254 
2255 	/* Get the size of the message for q_count accounting */
2256 	bytecnt = mp_cont_len(mp, &mblkcnt);
2257 
2258 	if (mp->b_band == 0) {		/* Perform q_count accounting */
2259 		q->q_count -= bytecnt;
2260 		q->q_mblkcnt -= mblkcnt;
2261 		if (q->q_mblkcnt == 0 || ((q->q_count < q->q_hiwat) &&
2262 		    (q->q_mblkcnt < q->q_hiwat))) {
2263 			q->q_flag &= ~QFULL;
2264 		}
2265 	} else {			/* Perform qb_count accounting */
2266 		qbp->qb_count -= bytecnt;
2267 		qbp->qb_mblkcnt -= mblkcnt;
2268 		if (qbp->qb_mblkcnt == 0 || ((qbp->qb_count < qbp->qb_hiwat) &&
2269 		    (qbp->qb_mblkcnt < qbp->qb_hiwat))) {
2270 			qbp->qb_flag &= ~QB_FULL;
2271 		}
2272 	}
2273 	if (freezer != curthread)
2274 		mutex_exit(QLOCK(q));
2275 
2276 	STR_FTEVENT_MSG(mp, q, FTEV_RMVQ, 0);
2277 }
2278 
2279 /*
2280  * Empty a queue.
2281  * If flag is set, remove all messages.  Otherwise, remove
2282  * only non-control messages.  If queue falls below its low
2283  * water mark, and QWANTW is set, enable the nearest upstream
2284  * service procedure.
2285  *
2286  * Historical note: when merging the M_FLUSH code in strrput with this
2287  * code one difference was discovered. flushq did not have a check
2288  * for q_lowat == 0 in the backenabling test.
2289  *
2290  * pcproto_flag specifies whether or not a M_PCPROTO message should be flushed
2291  * if one exists on the queue.
2292  */
2293 void
2294 flushq_common(queue_t *q, int flag, int pcproto_flag)
2295 {
2296 	mblk_t *mp, *nmp;
2297 	qband_t *qbp;
2298 	int backenab = 0;
2299 	unsigned char bpri;
2300 	unsigned char	qbf[NBAND];	/* band flushing backenable flags */
2301 
2302 	if (q->q_first == NULL)
2303 		return;
2304 
2305 	mutex_enter(QLOCK(q));
2306 	mp = q->q_first;
2307 	q->q_first = NULL;
2308 	q->q_last = NULL;
2309 	q->q_count = 0;
2310 	q->q_mblkcnt = 0;
2311 	for (qbp = q->q_bandp; qbp; qbp = qbp->qb_next) {
2312 		qbp->qb_first = NULL;
2313 		qbp->qb_last = NULL;
2314 		qbp->qb_count = 0;
2315 		qbp->qb_mblkcnt = 0;
2316 		qbp->qb_flag &= ~QB_FULL;
2317 	}
2318 	q->q_flag &= ~QFULL;
2319 	mutex_exit(QLOCK(q));
2320 	while (mp) {
2321 		nmp = mp->b_next;
2322 		mp->b_next = mp->b_prev = NULL;
2323 
2324 		STR_FTEVENT_MBLK(mp, q, FTEV_FLUSHQ, 0);
2325 
2326 		if (pcproto_flag && (mp->b_datap->db_type == M_PCPROTO))
2327 			(void) putq(q, mp);
2328 		else if (flag || datamsg(mp->b_datap->db_type))
2329 			freemsg(mp);
2330 		else
2331 			(void) putq(q, mp);
2332 		mp = nmp;
2333 	}
2334 	bpri = 1;
2335 	mutex_enter(QLOCK(q));
2336 	for (qbp = q->q_bandp; qbp; qbp = qbp->qb_next) {
2337 		if ((qbp->qb_flag & QB_WANTW) &&
2338 		    (((qbp->qb_count < qbp->qb_lowat) &&
2339 		    (qbp->qb_mblkcnt < qbp->qb_lowat)) ||
2340 		    qbp->qb_lowat == 0)) {
2341 			qbp->qb_flag &= ~QB_WANTW;
2342 			backenab = 1;
2343 			qbf[bpri] = 1;
2344 		} else
2345 			qbf[bpri] = 0;
2346 		bpri++;
2347 	}
2348 	ASSERT(bpri == (unsigned char)(q->q_nband + 1));
2349 	if ((q->q_flag & QWANTW) &&
2350 	    (((q->q_count < q->q_lowat) &&
2351 	    (q->q_mblkcnt < q->q_lowat)) || q->q_lowat == 0)) {
2352 		q->q_flag &= ~QWANTW;
2353 		backenab = 1;
2354 		qbf[0] = 1;
2355 	} else
2356 		qbf[0] = 0;
2357 
2358 	/*
2359 	 * If any band can now be written to, and there is a writer
2360 	 * for that band, then backenable the closest service procedure.
2361 	 */
2362 	if (backenab) {
2363 		mutex_exit(QLOCK(q));
2364 		for (bpri = q->q_nband; bpri != 0; bpri--)
2365 			if (qbf[bpri])
2366 				backenable(q, bpri);
2367 		if (qbf[0])
2368 			backenable(q, 0);
2369 	} else
2370 		mutex_exit(QLOCK(q));
2371 }
2372 
2373 /*
2374  * The real flushing takes place in flushq_common. This is done so that
2375  * a flag can specify whether or not M_PCPROTO messages should be
2376  * flushed. Currently the only place that uses this flag is the stream head.
2377  */
2378 void
2379 flushq(queue_t *q, int flag)
2380 {
2381 	flushq_common(q, flag, 0);
2382 }
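
/*
 * Sketch (editor's illustration; xx_wput_flush is hypothetical): the
 * canonical M_FLUSH handling in a module's write put procedure is
 * built on flushq(), flushing the write side and then reflecting the
 * message upstream for the read side.
 */
static void
xx_wput_flush(queue_t *q, mblk_t *mp)
{
	if (*mp->b_rptr & FLUSHW)
		flushq(q, FLUSHDATA);
	if (*mp->b_rptr & FLUSHR) {
		flushq(_RD(q), FLUSHDATA);
		*mp->b_rptr &= ~FLUSHW;
		qreply(q, mp);		/* send the M_FLUSH back upstream */
	} else {
		freemsg(mp);
	}
}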
2383 
2384 /*
2385  * Flush the queue of messages of the given priority band.
2386  * There is some duplication of code between flushq and flushband.
2387  * This is because we want to optimize the code as much as possible.
2388  * The assumption is that there will be more messages in the normal
2389  * (priority 0) band than in any other.
2390  *
2391  * Historical note: when merging the M_FLUSH code in strrput with this
2392  * code one difference was discovered. flushband had an extra check
2393  * for (mp->b_datap->db_type < QPCTL) in the band 0
2394  * case. That check does not match the man page for flushband and was not
2395  * in the strrput flush code, hence it was removed.
2396  */
2397 void
2398 flushband(queue_t *q, unsigned char pri, int flag)
2399 {
2400 	mblk_t *mp;
2401 	mblk_t *nmp;
2402 	mblk_t *last;
2403 	qband_t *qbp;
2404 	int band;
2405 
2406 	ASSERT((flag == FLUSHDATA) || (flag == FLUSHALL));
2407 	if (pri > q->q_nband) {
2408 		return;
2409 	}
2410 	mutex_enter(QLOCK(q));
2411 	if (pri == 0) {
2412 		mp = q->q_first;
2413 		q->q_first = NULL;
2414 		q->q_last = NULL;
2415 		q->q_count = 0;
2416 		q->q_mblkcnt = 0;
2417 		for (qbp = q->q_bandp; qbp; qbp = qbp->qb_next) {
2418 			qbp->qb_first = NULL;
2419 			qbp->qb_last = NULL;
2420 			qbp->qb_count = 0;
2421 			qbp->qb_mblkcnt = 0;
2422 			qbp->qb_flag &= ~QB_FULL;
2423 		}
2424 		q->q_flag &= ~QFULL;
2425 		mutex_exit(QLOCK(q));
2426 		while (mp) {
2427 			nmp = mp->b_next;
2428 			mp->b_next = mp->b_prev = NULL;
2429 			if ((mp->b_band == 0) &&
2430 			    ((flag == FLUSHALL) ||
2431 			    datamsg(mp->b_datap->db_type)))
2432 				freemsg(mp);
2433 			else
2434 				(void) putq(q, mp);
2435 			mp = nmp;
2436 		}
2437 		mutex_enter(QLOCK(q));
2438 		if ((q->q_flag & QWANTW) &&
2439 		    (((q->q_count < q->q_lowat) &&
2440 		    (q->q_mblkcnt < q->q_lowat)) || q->q_lowat == 0)) {
2441 			q->q_flag &= ~QWANTW;
2442 			mutex_exit(QLOCK(q));
2443 
2444 			backenable(q, pri);
2445 		} else
2446 			mutex_exit(QLOCK(q));
2447 	} else {	/* pri != 0 */
2448 		boolean_t flushed = B_FALSE;
2449 		band = pri;
2450 
2451 		ASSERT(MUTEX_HELD(QLOCK(q)));
2452 		qbp = q->q_bandp;
2453 		while (--band > 0)
2454 			qbp = qbp->qb_next;
2455 		mp = qbp->qb_first;
2456 		if (mp == NULL) {
2457 			mutex_exit(QLOCK(q));
2458 			return;
2459 		}
2460 		last = qbp->qb_last->b_next;
2461 		/*
2462 		 * rmvq_noenab() and freemsg() are called for each mblk that
2463 		 * meets the criteria.  The loop is executed until the last
2464 		 * mblk has been processed.
2465 		 */
2466 		while (mp != last) {
2467 			ASSERT(mp->b_band == pri);
2468 			nmp = mp->b_next;
2469 			if (flag == FLUSHALL || datamsg(mp->b_datap->db_type)) {
2470 				rmvq_noenab(q, mp);
2471 				freemsg(mp);
2472 				flushed = B_TRUE;
2473 			}
2474 			mp = nmp;
2475 		}
2476 		mutex_exit(QLOCK(q));
2477 
2478 		/*
2479 		 * If any mblk(s) has been freed, we know that qbackenable()
2480 		 * will need to be called.
2481 		 */
2482 		if (flushed)
2483 			qbackenable(q, pri);
2484 	}
2485 }
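
/*
 * Sketch (editor's illustration; xx_flush_one_band is hypothetical):
 * an M_FLUSH with FLUSHBAND set carries the band number in the byte
 * following the flag byte, which is handed straight to flushband().
 */
static void
xx_flush_one_band(queue_t *q, mblk_t *mp)
{
	if (*mp->b_rptr & FLUSHBAND) {
		ASSERT(MBLKL(mp) >= 2);
		flushband(q, *(mp->b_rptr + 1), FLUSHDATA);
	} else {
		flushq(q, FLUSHDATA);
	}
	freemsg(mp);
}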
2486 
2487 /*
2488  * Return 1 if the queue is not full.  If the queue is full, return
2489  * 0 (may not put message) and set QWANTW flag (caller wants to write
2490  * to the queue).
2491  */
2492 int
2493 canput(queue_t *q)
2494 {
2495 	TRACE_1(TR_FAC_STREAMS_FR, TR_CANPUT_IN, "canput:%p", q);
2496 
2497 	/* This is for loopback transports; they should not do a canput. */
2498 	ASSERT(STRMATED(q->q_stream) || STREAM(q) == STREAM(q->q_nfsrv));
2499 
2500 	/* Find next forward module that has a service procedure */
2501 	q = q->q_nfsrv;
2502 
2503 	if (!(q->q_flag & QFULL)) {
2504 		TRACE_2(TR_FAC_STREAMS_FR, TR_CANPUT_OUT, "canput:%p %d", q, 1);
2505 		return (1);
2506 	}
2507 	mutex_enter(QLOCK(q));
2508 	if (q->q_flag & QFULL) {
2509 		q->q_flag |= QWANTW;
2510 		mutex_exit(QLOCK(q));
2511 		TRACE_2(TR_FAC_STREAMS_FR, TR_CANPUT_OUT, "canput:%p %d", q, 0);
2512 		return (0);
2513 	}
2514 	mutex_exit(QLOCK(q));
2515 	TRACE_2(TR_FAC_STREAMS_FR, TR_CANPUT_OUT, "canput:%p %d", q, 1);
2516 	return (1);
2517 }
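
/*
 * Sketch (editor's illustration; xx_wput is hypothetical): a put
 * procedure that forwards messages while the forward queue is
 * writable and queues them for its own service procedure otherwise.
 * High priority messages are never subject to flow control.
 */
static int
xx_wput(queue_t *q, mblk_t *mp)
{
	if (DB_TYPE(mp) >= QPCTL || canput(q->q_next))
		putnext(q, mp);
	else
		(void) putq(q, mp);
	return (0);
}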
2518 
2519 /*
2520  * This is the new canput for use with priority bands.  Return 1 if the
2521  * band is not full.  If the band is full, return 0 (may not put message)
2522  * and set the QWANTW (QB_WANTW) flag for the zero (non-zero) band (the
2523  * caller wants to write to the queue).
2524  */
2525 int
2526 bcanput(queue_t *q, unsigned char pri)
2527 {
2528 	qband_t *qbp;
2529 
2530 	TRACE_2(TR_FAC_STREAMS_FR, TR_BCANPUT_IN, "bcanput:%p %p", q, pri);
2531 	if (!q)
2532 		return (0);
2533 
2534 	/* Find next forward module that has a service procedure */
2535 	q = q->q_nfsrv;
2536 
2537 	mutex_enter(QLOCK(q));
2538 	if (pri == 0) {
2539 		if (q->q_flag & QFULL) {
2540 			q->q_flag |= QWANTW;
2541 			mutex_exit(QLOCK(q));
2542 			TRACE_3(TR_FAC_STREAMS_FR, TR_BCANPUT_OUT,
2543 			    "bcanput:%p %X %d", q, pri, 0);
2544 			return (0);
2545 		}
2546 	} else {	/* pri != 0 */
2547 		if (pri > q->q_nband) {
2548 			/*
2549 			 * No band exists yet, so return success.
2550 			 */
2551 			mutex_exit(QLOCK(q));
2552 			TRACE_3(TR_FAC_STREAMS_FR, TR_BCANPUT_OUT,
2553 			    "bcanput:%p %X %d", q, pri, 1);
2554 			return (1);
2555 		}
2556 		qbp = q->q_bandp;
2557 		while (--pri)
2558 			qbp = qbp->qb_next;
2559 		if (qbp->qb_flag & QB_FULL) {
2560 			qbp->qb_flag |= QB_WANTW;
2561 			mutex_exit(QLOCK(q));
2562 			TRACE_3(TR_FAC_STREAMS_FR, TR_BCANPUT_OUT,
2563 			    "bcanput:%p %X %d", q, pri, 0);
2564 			return (0);
2565 		}
2566 	}
2567 	mutex_exit(QLOCK(q));
2568 	TRACE_3(TR_FAC_STREAMS_FR, TR_BCANPUT_OUT,
2569 	    "bcanput:%p %X %d", q, pri, 1);
2570 	return (1);
2571 }
2572 
2573 /*
2574  * Put a message on a queue.
2575  *
2576  * Messages are enqueued on a priority basis.  The priority classes
2577  * are HIGH PRIORITY (type >= QPCTL), PRIORITY (type < QPCTL && band > 0),
2578  * and B_NORMAL (type < QPCTL && band == 0).
2579  *
2580  * Add appropriate weighted data block sizes to queue count.
2581  * If queue hits high water mark then set QFULL flag.
2582  *
2583  * If QNOENAB is not set (putq is allowed to enable the queue),
2584  * enable the queue only if the message is PRIORITY,
2585  * or the QWANTR flag is set (indicating that the service procedure
2586  * is ready to read the queue).  This implies that a service
2587  * procedure must NEVER put a high priority message back on its own
2588  * queue, as this would result in an infinite loop (!).
2589  */
2590 int
2591 putq(queue_t *q, mblk_t *bp)
2592 {
2593 	mblk_t *tmp;
2594 	qband_t *qbp = NULL;
2595 	int mcls = (int)queclass(bp);
2596 	kthread_id_t freezer;
2597 	int	bytecnt = 0, mblkcnt = 0;
2598 
2599 	freezer = STREAM(q)->sd_freezer;
2600 	if (freezer == curthread) {
2601 		ASSERT(frozenstr(q));
2602 		ASSERT(MUTEX_HELD(QLOCK(q)));
2603 	} else
2604 		mutex_enter(QLOCK(q));
2605 
2606 	/*
2607 	 * Perform sanity checks and, if the qband structure is not yet
2608 	 * allocated, do so.
2609 	 */
2610 	if (mcls == QPCTL) {
2611 		if (bp->b_band != 0)
2612 			bp->b_band = 0;		/* force to be correct */
2613 	} else if (bp->b_band != 0) {
2614 		int i;
2615 		qband_t **qbpp;
2616 
2617 		if (bp->b_band > q->q_nband) {
2618 
2619 			/*
2620 			 * The qband structure for this priority band is
2621 			 * not on the queue yet, so we have to allocate
2622 			 * one on the fly.  It would be wasteful to
2623 			 * associate the qband structures with every
2624 			 * queue when the queues are allocated.  This is
2625 			 * because most queues will only need the normal
2626 			 * band of flow which can be described entirely
2627 			 * by the queue itself.
2628 			 */
2629 			qbpp = &q->q_bandp;
2630 			while (*qbpp)
2631 				qbpp = &(*qbpp)->qb_next;
2632 			while (bp->b_band > q->q_nband) {
2633 				if ((*qbpp = allocband()) == NULL) {
2634 					if (freezer != curthread)
2635 						mutex_exit(QLOCK(q));
2636 					return (0);
2637 				}
2638 				(*qbpp)->qb_hiwat = q->q_hiwat;
2639 				(*qbpp)->qb_lowat = q->q_lowat;
2640 				q->q_nband++;
2641 				qbpp = &(*qbpp)->qb_next;
2642 			}
2643 		}
2644 		ASSERT(MUTEX_HELD(QLOCK(q)));
2645 		qbp = q->q_bandp;
2646 		i = bp->b_band;
2647 		while (--i)
2648 			qbp = qbp->qb_next;
2649 	}
2650 
2651 	/*
2652 	 * If queue is empty, add the message and initialize the pointers.
2653 	 * Otherwise, adjust message pointers and queue pointers based on
2654 	 * the type of the message and where it belongs on the queue.  Some
2655 	 * code is duplicated to minimize the number of conditionals and
2656 	 * hopefully minimize the amount of time this routine takes.
2657 	 */
2658 	if (!q->q_first) {
2659 		bp->b_next = NULL;
2660 		bp->b_prev = NULL;
2661 		q->q_first = bp;
2662 		q->q_last = bp;
2663 		if (qbp) {
2664 			qbp->qb_first = bp;
2665 			qbp->qb_last = bp;
2666 		}
2667 	} else if (!qbp) {	/* bp->b_band == 0 */
2668 
2669 		/*
2670 		 * If queue class of message is less than or equal to
2671 		 * that of the last one on the queue, tack on to the end.
2672 		 */
2673 		tmp = q->q_last;
2674 		if (mcls <= (int)queclass(tmp)) {
2675 			bp->b_next = NULL;
2676 			bp->b_prev = tmp;
2677 			tmp->b_next = bp;
2678 			q->q_last = bp;
2679 		} else {
2680 			tmp = q->q_first;
2681 			while ((int)queclass(tmp) >= mcls)
2682 				tmp = tmp->b_next;
2683 
2684 			/*
2685 			 * Insert bp before tmp.
2686 			 */
2687 			bp->b_next = tmp;
2688 			bp->b_prev = tmp->b_prev;
2689 			if (tmp->b_prev)
2690 				tmp->b_prev->b_next = bp;
2691 			else
2692 				q->q_first = bp;
2693 			tmp->b_prev = bp;
2694 		}
2695 	} else {		/* bp->b_band != 0 */
2696 		if (qbp->qb_first) {
2697 			tmp = qbp->qb_last;
2698 
2699 			/*
2700 			 * Insert bp after the last message in this band.
2701 			 */
2702 			bp->b_next = tmp->b_next;
2703 			if (tmp->b_next)
2704 				tmp->b_next->b_prev = bp;
2705 			else
2706 				q->q_last = bp;
2707 			bp->b_prev = tmp;
2708 			tmp->b_next = bp;
2709 		} else {
2710 			tmp = q->q_last;
2711 			if ((mcls < (int)queclass(tmp)) ||
2712 			    (bp->b_band <= tmp->b_band)) {
2713 
2714 				/*
2715 				 * Tack bp on end of queue.
2716 				 */
2717 				bp->b_next = NULL;
2718 				bp->b_prev = tmp;
2719 				tmp->b_next = bp;
2720 				q->q_last = bp;
2721 			} else {
2722 				tmp = q->q_first;
2723 				while (tmp->b_datap->db_type >= QPCTL)
2724 					tmp = tmp->b_next;
2725 				while (tmp->b_band >= bp->b_band)
2726 					tmp = tmp->b_next;
2727 
2728 				/*
2729 				 * Insert bp before tmp.
2730 				 */
2731 				bp->b_next = tmp;
2732 				bp->b_prev = tmp->b_prev;
2733 				if (tmp->b_prev)
2734 					tmp->b_prev->b_next = bp;
2735 				else
2736 					q->q_first = bp;
2737 				tmp->b_prev = bp;
2738 			}
2739 			qbp->qb_first = bp;
2740 		}
2741 		qbp->qb_last = bp;
2742 	}
2743 
2744 	/* Get message byte count for q_count accounting */
2745 	bytecnt = mp_cont_len(bp, &mblkcnt);
2746 
2747 	if (qbp) {
2748 		qbp->qb_count += bytecnt;
2749 		qbp->qb_mblkcnt += mblkcnt;
2750 		if ((qbp->qb_count >= qbp->qb_hiwat) ||
2751 		    (qbp->qb_mblkcnt >= qbp->qb_hiwat)) {
2752 			qbp->qb_flag |= QB_FULL;
2753 		}
2754 	} else {
2755 		q->q_count += bytecnt;
2756 		q->q_mblkcnt += mblkcnt;
2757 		if ((q->q_count >= q->q_hiwat) ||
2758 		    (q->q_mblkcnt >= q->q_hiwat)) {
2759 			q->q_flag |= QFULL;
2760 		}
2761 	}
2762 
2763 	STR_FTEVENT_MSG(bp, q, FTEV_PUTQ, 0);
2764 
2765 	if ((mcls > QNORM) ||
2766 	    (canenable(q) && (q->q_flag & QWANTR || bp->b_band)))
2767 		qenable_locked(q);
2768 	ASSERT(MUTEX_HELD(QLOCK(q)));
2769 	if (freezer != curthread)
2770 		mutex_exit(QLOCK(q));
2771 
2772 	return (1);
2773 }
2774 
2775 /*
2776  * Put stuff back at beginning of Q according to priority order.
2777  * See comment on putq above for details.
2778  */
2779 int
2780 putbq(queue_t *q, mblk_t *bp)
2781 {
2782 	mblk_t *tmp;
2783 	qband_t *qbp = NULL;
2784 	int mcls = (int)queclass(bp);
2785 	kthread_id_t freezer;
2786 	int	bytecnt = 0, mblkcnt = 0;
2787 
2788 	ASSERT(q && bp);
2789 	ASSERT(bp->b_next == NULL);
2790 	freezer = STREAM(q)->sd_freezer;
2791 	if (freezer == curthread) {
2792 		ASSERT(frozenstr(q));
2793 		ASSERT(MUTEX_HELD(QLOCK(q)));
2794 	} else
2795 		mutex_enter(QLOCK(q));
2796 
2797 	/*
2798 	 * Perform sanity checks and, if the qband structure is not yet
2799 	 * allocated, do so.
2800 	 */
2801 	if (mcls == QPCTL) {
2802 		if (bp->b_band != 0)
2803 			bp->b_band = 0;		/* force to be correct */
2804 	} else if (bp->b_band != 0) {
2805 		int i;
2806 		qband_t **qbpp;
2807 
2808 		if (bp->b_band > q->q_nband) {
2809 			qbpp = &q->q_bandp;
2810 			while (*qbpp)
2811 				qbpp = &(*qbpp)->qb_next;
2812 			while (bp->b_band > q->q_nband) {
2813 				if ((*qbpp = allocband()) == NULL) {
2814 					if (freezer != curthread)
2815 						mutex_exit(QLOCK(q));
2816 					return (0);
2817 				}
2818 				(*qbpp)->qb_hiwat = q->q_hiwat;
2819 				(*qbpp)->qb_lowat = q->q_lowat;
2820 				q->q_nband++;
2821 				qbpp = &(*qbpp)->qb_next;
2822 			}
2823 		}
2824 		qbp = q->q_bandp;
2825 		i = bp->b_band;
2826 		while (--i)
2827 			qbp = qbp->qb_next;
2828 	}
2829 
2830 	/*
2831 	 * If queue is empty or if message is high priority,
2832 	 * place on the front of the queue.
2833 	 */
2834 	tmp = q->q_first;
2835 	if ((!tmp) || (mcls == QPCTL)) {
2836 		bp->b_next = tmp;
2837 		if (tmp)
2838 			tmp->b_prev = bp;
2839 		else
2840 			q->q_last = bp;
2841 		q->q_first = bp;
2842 		bp->b_prev = NULL;
2843 		if (qbp) {
2844 			qbp->qb_first = bp;
2845 			qbp->qb_last = bp;
2846 		}
2847 	} else if (qbp) {	/* bp->b_band != 0 */
2848 		tmp = qbp->qb_first;
2849 		if (tmp) {
2850 
2851 			/*
2852 			 * Insert bp before the first message in this band.
2853 			 */
2854 			bp->b_next = tmp;
2855 			bp->b_prev = tmp->b_prev;
2856 			if (tmp->b_prev)
2857 				tmp->b_prev->b_next = bp;
2858 			else
2859 				q->q_first = bp;
2860 			tmp->b_prev = bp;
2861 		} else {
2862 			tmp = q->q_last;
2863 			if ((mcls < (int)queclass(tmp)) ||
2864 			    (bp->b_band < tmp->b_band)) {
2865 
2866 				/*
2867 				 * Tack bp on end of queue.
2868 				 */
2869 				bp->b_next = NULL;
2870 				bp->b_prev = tmp;
2871 				tmp->b_next = bp;
2872 				q->q_last = bp;
2873 			} else {
2874 				tmp = q->q_first;
2875 				while (tmp->b_datap->db_type >= QPCTL)
2876 					tmp = tmp->b_next;
2877 				while (tmp->b_band > bp->b_band)
2878 					tmp = tmp->b_next;
2879 
2880 				/*
2881 				 * Insert bp before tmp.
2882 				 */
2883 				bp->b_next = tmp;
2884 				bp->b_prev = tmp->b_prev;
2885 				if (tmp->b_prev)
2886 					tmp->b_prev->b_next = bp;
2887 				else
2888 					q->q_first = bp;
2889 				tmp->b_prev = bp;
2890 			}
2891 			qbp->qb_last = bp;
2892 		}
2893 		qbp->qb_first = bp;
2894 	} else {		/* bp->b_band == 0 && !QPCTL */
2895 
2896 		/*
2897 		 * If the queue class or band is less than that of the last
2898 		 * message on the queue, tack bp on the end of the queue.
2899 		 */
2900 		tmp = q->q_last;
2901 		if ((mcls < (int)queclass(tmp)) || (bp->b_band < tmp->b_band)) {
2902 			bp->b_next = NULL;
2903 			bp->b_prev = tmp;
2904 			tmp->b_next = bp;
2905 			q->q_last = bp;
2906 		} else {
2907 			tmp = q->q_first;
2908 			while (tmp->b_datap->db_type >= QPCTL)
2909 				tmp = tmp->b_next;
2910 			while (tmp->b_band > bp->b_band)
2911 				tmp = tmp->b_next;
2912 
2913 			/*
2914 			 * Insert bp before tmp.
2915 			 */
2916 			bp->b_next = tmp;
2917 			bp->b_prev = tmp->b_prev;
2918 			if (tmp->b_prev)
2919 				tmp->b_prev->b_next = bp;
2920 			else
2921 				q->q_first = bp;
2922 			tmp->b_prev = bp;
2923 		}
2924 	}
2925 
2926 	/* Get message byte count for q_count accounting */
2927 	bytecnt = mp_cont_len(bp, &mblkcnt);
2928 
2929 	if (qbp) {
2930 		qbp->qb_count += bytecnt;
2931 		qbp->qb_mblkcnt += mblkcnt;
2932 		if ((qbp->qb_count >= qbp->qb_hiwat) ||
2933 		    (qbp->qb_mblkcnt >= qbp->qb_hiwat)) {
2934 			qbp->qb_flag |= QB_FULL;
2935 		}
2936 	} else {
2937 		q->q_count += bytecnt;
2938 		q->q_mblkcnt += mblkcnt;
2939 		if ((q->q_count >= q->q_hiwat) ||
2940 		    (q->q_mblkcnt >= q->q_hiwat)) {
2941 			q->q_flag |= QFULL;
2942 		}
2943 	}
2944 
2945 	STR_FTEVENT_MSG(bp, q, FTEV_PUTBQ, 0);
2946 
2947 	if ((mcls > QNORM) || (canenable(q) && (q->q_flag & QWANTR)))
2948 		qenable_locked(q);
2949 	ASSERT(MUTEX_HELD(QLOCK(q)));
2950 	if (freezer != curthread)
2951 		mutex_exit(QLOCK(q));
2952 
2953 	return (1);
2954 }
2955 
2956 /*
2957  * Insert a message before an existing message on the queue.  If the
2958  * existing message is NULL, the new message is placed at the end of
2959  * the queue.  The queue class of the new message is ignored.  However,
2960  * the priority band of the new message must adhere to the following
2961  * ordering:
2962  *
2963  *	emp->b_prev->b_band >= mp->b_band >= emp->b_band.
2964  *
2965  * All flow control parameters are updated.
2966  *
2967  * insq can be called with the stream frozen, but other utility functions
2968  * holding QLOCK, and by streams modules without any locks/frozen.
2969  */
2970 int
2971 insq(queue_t *q, mblk_t *emp, mblk_t *mp)
2972 {
2973 	mblk_t *tmp;
2974 	qband_t *qbp = NULL;
2975 	int mcls = (int)queclass(mp);
2976 	kthread_id_t freezer;
2977 	int	bytecnt = 0, mblkcnt = 0;
2978 
2979 	freezer = STREAM(q)->sd_freezer;
2980 	if (freezer == curthread) {
2981 		ASSERT(frozenstr(q));
2982 		ASSERT(MUTEX_HELD(QLOCK(q)));
2983 	} else if (MUTEX_HELD(QLOCK(q))) {
2984 		/* Don't drop lock on exit */
2985 		freezer = curthread;
2986 	} else
2987 		mutex_enter(QLOCK(q));
2988 
2989 	if (mcls == QPCTL) {
2990 		if (mp->b_band != 0)
2991 			mp->b_band = 0;		/* force to be correct */
2992 		if (emp && emp->b_prev &&
2993 		    (emp->b_prev->b_datap->db_type < QPCTL))
2994 			goto badord;
2995 	}
2996 	if (emp) {
2997 		if (((mcls == QNORM) && (mp->b_band < emp->b_band)) ||
2998 		    (emp->b_prev && (emp->b_prev->b_datap->db_type < QPCTL) &&
2999 		    (emp->b_prev->b_band < mp->b_band))) {
3000 			goto badord;
3001 		}
3002 	} else {
3003 		tmp = q->q_last;
3004 		if (tmp && (mcls == QNORM) && (mp->b_band > tmp->b_band)) {
3005 badord:
3006 			cmn_err(CE_WARN,
3007 			    "insq: attempt to insert message out of order "
3008 			    "on q %p", (void *)q);
3009 			if (freezer != curthread)
3010 				mutex_exit(QLOCK(q));
3011 			return (0);
3012 		}
3013 	}
3014 
3015 	if (mp->b_band != 0) {
3016 		int i;
3017 		qband_t **qbpp;
3018 
3019 		if (mp->b_band > q->q_nband) {
3020 			qbpp = &q->q_bandp;
3021 			while (*qbpp)
3022 				qbpp = &(*qbpp)->qb_next;
3023 			while (mp->b_band > q->q_nband) {
3024 				if ((*qbpp = allocband()) == NULL) {
3025 					if (freezer != curthread)
3026 						mutex_exit(QLOCK(q));
3027 					return (0);
3028 				}
3029 				(*qbpp)->qb_hiwat = q->q_hiwat;
3030 				(*qbpp)->qb_lowat = q->q_lowat;
3031 				q->q_nband++;
3032 				qbpp = &(*qbpp)->qb_next;
3033 			}
3034 		}
3035 		qbp = q->q_bandp;
3036 		i = mp->b_band;
3037 		while (--i)
3038 			qbp = qbp->qb_next;
3039 	}
3040 
3041 	if ((mp->b_next = emp) != NULL) {
3042 		if ((mp->b_prev = emp->b_prev) != NULL)
3043 			emp->b_prev->b_next = mp;
3044 		else
3045 			q->q_first = mp;
3046 		emp->b_prev = mp;
3047 	} else {
3048 		if ((mp->b_prev = q->q_last) != NULL)
3049 			q->q_last->b_next = mp;
3050 		else
3051 			q->q_first = mp;
3052 		q->q_last = mp;
3053 	}
3054 
3055 	/* Get mblk and byte count for q_count accounting */
3056 	bytecnt = mp_cont_len(mp, &mblkcnt);
3057 
3058 	if (qbp) {	/* adjust qband pointers and count */
3059 		if (!qbp->qb_first) {
3060 			qbp->qb_first = mp;
3061 			qbp->qb_last = mp;
3062 		} else {
3063 			if (mp->b_prev == NULL || (mp->b_prev != NULL &&
3064 			    (mp->b_prev->b_band != mp->b_band)))
3065 				qbp->qb_first = mp;
3066 			else if (mp->b_next == NULL || (mp->b_next != NULL &&
3067 			    (mp->b_next->b_band != mp->b_band)))
3068 				qbp->qb_last = mp;
3069 		}
3070 		qbp->qb_count += bytecnt;
3071 		qbp->qb_mblkcnt += mblkcnt;
3072 		if ((qbp->qb_count >= qbp->qb_hiwat) ||
3073 		    (qbp->qb_mblkcnt >= qbp->qb_hiwat)) {
3074 			qbp->qb_flag |= QB_FULL;
3075 		}
3076 	} else {
3077 		q->q_count += bytecnt;
3078 		q->q_mblkcnt += mblkcnt;
3079 		if ((q->q_count >= q->q_hiwat) ||
3080 		    (q->q_mblkcnt >= q->q_hiwat)) {
3081 			q->q_flag |= QFULL;
3082 		}
3083 	}
3084 
3085 	STR_FTEVENT_MSG(mp, q, FTEV_INSQ, 0);
3086 
3087 	if (canenable(q) && (q->q_flag & QWANTR))
3088 		qenable_locked(q);
3089 
3090 	ASSERT(MUTEX_HELD(QLOCK(q)));
3091 	if (freezer != curthread)
3092 		mutex_exit(QLOCK(q));
3093 
3094 	return (1);
3095 }
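
/*
 * Sketch (editor's illustration; xx_insert_at_head is hypothetical):
 * insq() is typically used with the stream frozen so that q_first
 * cannot change between the lookup and the insertion.
 */
static void
xx_insert_at_head(queue_t *q, mblk_t *mp)
{
	freezestr(q);
	if (insq(q, q->q_first, mp) == 0)
		freemsg(mp);	/* bad order or band allocation failure */
	unfreezestr(q);
}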
3096 
3097 /*
3098  * Create and put a control message on queue.
3099  */
3100 int
3101 putctl(queue_t *q, int type)
3102 {
3103 	mblk_t *bp;
3104 
3105 	if ((datamsg(type) && (type != M_DELAY)) ||
3106 	    (bp = allocb_tryhard(0)) == NULL)
3107 		return (0);
3108 	bp->b_datap->db_type = (unsigned char) type;
3109 
3110 	put(q, bp);
3111 
3112 	return (1);
3113 }
3114 
3115 /*
3116  * Control message with a single-byte parameter
3117  */
3118 int
3119 putctl1(queue_t *q, int type, int param)
3120 {
3121 	mblk_t *bp;
3122 
3123 	if ((datamsg(type) && (type != M_DELAY)) ||
3124 	    (bp = allocb_tryhard(1)) == NULL)
3125 		return (0);
3126 	bp->b_datap->db_type = (unsigned char)type;
3127 	*bp->b_wptr++ = (unsigned char)param;
3128 
3129 	put(q, bp);
3130 
3131 	return (1);
3132 }
3133 
3134 int
3135 putnextctl1(queue_t *q, int type, int param)
3136 {
3137 	mblk_t *bp;
3138 
3139 	if ((datamsg(type) && (type != M_DELAY)) ||
3140 	    ((bp = allocb_tryhard(1)) == NULL))
3141 		return (0);
3142 
3143 	bp->b_datap->db_type = (unsigned char)type;
3144 	*bp->b_wptr++ = (unsigned char)param;
3145 
3146 	putnext(q, bp);
3147 
3148 	return (1);
3149 }
3150 
3151 int
3152 putnextctl(queue_t *q, int type)
3153 {
3154 	mblk_t *bp;
3155 
3156 	if ((datamsg(type) && (type != M_DELAY)) ||
3157 	    ((bp = allocb_tryhard(0)) == NULL))
3158 		return (0);
3159 	bp->b_datap->db_type = (unsigned char)type;
3160 
3161 	putnext(q, bp);
3162 
3163 	return (1);
3164 }
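
/*
 * Sketch (editor's illustration; xx_post_events is hypothetical):
 * typical uses of the putctl family from a driver's read queue, e.g.
 * requesting a read-side flush and then reporting a hangup.
 */
static void
xx_post_events(queue_t *rq)
{
	(void) putnextctl1(rq, M_FLUSH, FLUSHR);
	(void) putnextctl(rq, M_HANGUP);
}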
3165 
3166 /*
3167  * Return the queue upstream from this one
3168  */
3169 queue_t *
3170 backq(queue_t *q)
3171 {
3172 	q = _OTHERQ(q);
3173 	if (q->q_next) {
3174 		q = q->q_next;
3175 		return (_OTHERQ(q));
3176 	}
3177 	return (NULL);
3178 }
3179 
3180 /*
3181  * Send a block back up the queue in reverse from this
3182  * one (e.g. to respond to ioctls)
3183  */
3184 void
3185 qreply(queue_t *q, mblk_t *bp)
3186 {
3187 	ASSERT(q && bp);
3188 
3189 	putnext(_OTHERQ(q), bp);
3190 }
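
/*
 * Sketch (editor's illustration; xx_ioc_ack is hypothetical, with all
 * error handling elided): a write-side put procedure acknowledges an
 * M_IOCTL by converting the message in place and turning it around
 * with qreply().
 */
static void
xx_ioc_ack(queue_t *q, mblk_t *mp)
{
	struct iocblk *iocp = (struct iocblk *)mp->b_rptr;

	iocp->ioc_count = 0;
	iocp->ioc_error = 0;
	DB_TYPE(mp) = M_IOCACK;
	qreply(q, mp);
}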
3191 
3192 /*
3193  * Streams Queue Scheduling
3194  *
3195  * Queues are enabled through qenable() when they have messages to
3196  * process.  They are serviced by queuerun(), which runs each enabled
3197  * queue's service procedure.  The call to queuerun() is processor
3198  * dependent - the general principle is that it be run whenever a queue
3199  * is enabled but before returning to user level.  For system calls,
3200  * the function runqueues() is called if their action causes a queue
3201  * to be enabled.  For device interrupts, queuerun() should be
3202  * called before returning from the last level of interrupt.  Beyond
3203  * this, no timing assumptions should be made about queue scheduling.
3204  */
3205 
3206 /*
3207  * Enable a queue: put it on list of those whose service procedures are
3208  * ready to run and set up the scheduling mechanism.
3209  * The broadcast is done outside the mutex to prevent the woken thread
3210  * from contending with the mutex. This is OK because the queue has been
3211  * enqueued on the runlist and flagged safely at this point.
3212  */
3213 void
3214 qenable(queue_t *q)
3215 {
3216 	mutex_enter(QLOCK(q));
3217 	qenable_locked(q);
3218 	mutex_exit(QLOCK(q));
3219 }
3220 /*
3221  * Return number of messages on queue
3222  */
3223 int
3224 qsize(queue_t *qp)
3225 {
3226 	int count = 0;
3227 	mblk_t *mp;
3228 
3229 	mutex_enter(QLOCK(qp));
3230 	for (mp = qp->q_first; mp; mp = mp->b_next)
3231 		count++;
3232 	mutex_exit(QLOCK(qp));
3233 	return (count);
3234 }
3235 
3236 /*
3237  * noenable - set queue so that putq() will not enable it.
3238  * enableok - set queue so that putq() can enable it.
3239  */
3240 void
3241 noenable(queue_t *q)
3242 {
3243 	mutex_enter(QLOCK(q));
3244 	q->q_flag |= QNOENB;
3245 	mutex_exit(QLOCK(q));
3246 }
3247 
3248 void
3249 enableok(queue_t *q)
3250 {
3251 	mutex_enter(QLOCK(q));
3252 	q->q_flag &= ~QNOENB;
3253 	mutex_exit(QLOCK(q));
3254 }
3255 
3256 /*
3257  * Set queue fields.
3258  */
3259 int
3260 strqset(queue_t *q, qfields_t what, unsigned char pri, intptr_t val)
3261 {
3262 	qband_t *qbp = NULL;
3263 	queue_t	*wrq;
3264 	int error = 0;
3265 	kthread_id_t freezer;
3266 
3267 	freezer = STREAM(q)->sd_freezer;
3268 	if (freezer == curthread) {
3269 		ASSERT(frozenstr(q));
3270 		ASSERT(MUTEX_HELD(QLOCK(q)));
3271 	} else
3272 		mutex_enter(QLOCK(q));
3273 
3274 	if (what >= QBAD) {
3275 		error = EINVAL;
3276 		goto done;
3277 	}
3278 	if (pri != 0) {
3279 		int i;
3280 		qband_t **qbpp;
3281 
3282 		if (pri > q->q_nband) {
3283 			qbpp = &q->q_bandp;
3284 			while (*qbpp)
3285 				qbpp = &(*qbpp)->qb_next;
3286 			while (pri > q->q_nband) {
3287 				if ((*qbpp = allocband()) == NULL) {
3288 					error = EAGAIN;
3289 					goto done;
3290 				}
3291 				(*qbpp)->qb_hiwat = q->q_hiwat;
3292 				(*qbpp)->qb_lowat = q->q_lowat;
3293 				q->q_nband++;
3294 				qbpp = &(*qbpp)->qb_next;
3295 			}
3296 		}
3297 		qbp = q->q_bandp;
3298 		i = pri;
3299 		while (--i)
3300 			qbp = qbp->qb_next;
3301 	}
3302 	switch (what) {
3303 
3304 	case QHIWAT:
3305 		if (qbp)
3306 			qbp->qb_hiwat = (size_t)val;
3307 		else
3308 			q->q_hiwat = (size_t)val;
3309 		break;
3310 
3311 	case QLOWAT:
3312 		if (qbp)
3313 			qbp->qb_lowat = (size_t)val;
3314 		else
3315 			q->q_lowat = (size_t)val;
3316 		break;
3317 
3318 	case QMAXPSZ:
3319 		if (qbp)
3320 			error = EINVAL;
3321 		else
3322 			q->q_maxpsz = (ssize_t)val;
3323 
3324 		/*
3325 		 * Performance concern: strwrite looks at the module below
3326 		 * the stream head for the maxpsz each time it does a write,
3327 		 * so we now cache it at the stream head.  Check to see if
3328 		 * this queue is sitting directly below the stream head.
3329 		 */
3330 		wrq = STREAM(q)->sd_wrq;
3331 		if (q != wrq->q_next)
3332 			break;
3333 
3334 		/*
3335 		 * If the stream is not frozen drop the current QLOCK and
3336 		 * acquire the sd_wrq QLOCK which protects sd_qn_*
3337 		 */
3338 		if (freezer != curthread) {
3339 			mutex_exit(QLOCK(q));
3340 			mutex_enter(QLOCK(wrq));
3341 		}
3342 		ASSERT(MUTEX_HELD(QLOCK(wrq)));
3343 
3344 		if (strmsgsz != 0) {
3345 			if (val == INFPSZ)
3346 				val = strmsgsz;
3347 			else  {
3348 				if (STREAM(q)->sd_vnode->v_type == VFIFO)
3349 					val = MIN(PIPE_BUF, val);
3350 				else
3351 					val = MIN(strmsgsz, val);
3352 			}
3353 		}
3354 		STREAM(q)->sd_qn_maxpsz = val;
3355 		if (freezer != curthread) {
3356 			mutex_exit(QLOCK(wrq));
3357 			mutex_enter(QLOCK(q));
3358 		}
3359 		break;
3360 
3361 	case QMINPSZ:
3362 		if (qbp)
3363 			error = EINVAL;
3364 		else
3365 			q->q_minpsz = (ssize_t)val;
3366 
3367 		/*
3368 		 * Performance concern: strwrite looks at the module below
3369 		 * the stream head for the minpsz each time it does a write,
3370 		 * so we now cache it at the stream head.  Check to see if
3371 		 * this queue is sitting directly below the stream head.
3372 		 */
3373 		wrq = STREAM(q)->sd_wrq;
3374 		if (q != wrq->q_next)
3375 			break;
3376 
3377 		/*
3378 		 * If the stream is not frozen drop the current QLOCK and
3379 		 * acquire the sd_wrq QLOCK which protects sd_qn_*
3380 		 */
3381 		if (freezer != curthread) {
3382 			mutex_exit(QLOCK(q));
3383 			mutex_enter(QLOCK(wrq));
3384 		}
3385 		STREAM(q)->sd_qn_minpsz = (ssize_t)val;
3386 
3387 		if (freezer != curthread) {
3388 			mutex_exit(QLOCK(wrq));
3389 			mutex_enter(QLOCK(q));
3390 		}
3391 		break;
3392 
3393 	case QSTRUIOT:
3394 		if (qbp)
3395 			error = EINVAL;
3396 		else
3397 			q->q_struiot = (ushort_t)val;
3398 		break;
3399 
3400 	case QCOUNT:
3401 	case QFIRST:
3402 	case QLAST:
3403 	case QFLAG:
3404 		error = EPERM;
3405 		break;
3406 
3407 	default:
3408 		error = EINVAL;
3409 		break;
3410 	}
3411 done:
3412 	if (freezer != curthread)
3413 		mutex_exit(QLOCK(q));
3414 	return (error);
3415 }
3416 
3417 /*
3418  * Get queue fields.
3419  */
3420 int
3421 strqget(queue_t *q, qfields_t what, unsigned char pri, void *valp)
3422 {
3423 	qband_t		*qbp = NULL;
3424 	int		error = 0;
3425 	kthread_id_t	freezer;
3426 
3427 	freezer = STREAM(q)->sd_freezer;
3428 	if (freezer == curthread) {
3429 		ASSERT(frozenstr(q));
3430 		ASSERT(MUTEX_HELD(QLOCK(q)));
3431 	} else
3432 		mutex_enter(QLOCK(q));
3433 	if (what >= QBAD) {
3434 		error = EINVAL;
3435 		goto done;
3436 	}
3437 	if (pri != 0) {
3438 		int i;
3439 		qband_t **qbpp;
3440 
3441 		if (pri > q->q_nband) {
3442 			qbpp = &q->q_bandp;
3443 			while (*qbpp)
3444 				qbpp = &(*qbpp)->qb_next;
3445 			while (pri > q->q_nband) {
3446 				if ((*qbpp = allocband()) == NULL) {
3447 					error = EAGAIN;
3448 					goto done;
3449 				}
3450 				(*qbpp)->qb_hiwat = q->q_hiwat;
3451 				(*qbpp)->qb_lowat = q->q_lowat;
3452 				q->q_nband++;
3453 				qbpp = &(*qbpp)->qb_next;
3454 			}
3455 		}
3456 		qbp = q->q_bandp;
3457 		i = pri;
3458 		while (--i)
3459 			qbp = qbp->qb_next;
3460 	}
3461 	switch (what) {
3462 	case QHIWAT:
3463 		if (qbp)
3464 			*(size_t *)valp = qbp->qb_hiwat;
3465 		else
3466 			*(size_t *)valp = q->q_hiwat;
3467 		break;
3468 
3469 	case QLOWAT:
3470 		if (qbp)
3471 			*(size_t *)valp = qbp->qb_lowat;
3472 		else
3473 			*(size_t *)valp = q->q_lowat;
3474 		break;
3475 
3476 	case QMAXPSZ:
3477 		if (qbp)
3478 			error = EINVAL;
3479 		else
3480 			*(ssize_t *)valp = q->q_maxpsz;
3481 		break;
3482 
3483 	case QMINPSZ:
3484 		if (qbp)
3485 			error = EINVAL;
3486 		else
3487 			*(ssize_t *)valp = q->q_minpsz;
3488 		break;
3489 
3490 	case QCOUNT:
3491 		if (qbp)
3492 			*(size_t *)valp = qbp->qb_count;
3493 		else
3494 			*(size_t *)valp = q->q_count;
3495 		break;
3496 
3497 	case QFIRST:
3498 		if (qbp)
3499 			*(mblk_t **)valp = qbp->qb_first;
3500 		else
3501 			*(mblk_t **)valp = q->q_first;
3502 		break;
3503 
3504 	case QLAST:
3505 		if (qbp)
3506 			*(mblk_t **)valp = qbp->qb_last;
3507 		else
3508 			*(mblk_t **)valp = q->q_last;
3509 		break;
3510 
3511 	case QFLAG:
3512 		if (qbp)
3513 			*(uint_t *)valp = qbp->qb_flag;
3514 		else
3515 			*(uint_t *)valp = q->q_flag;
3516 		break;
3517 
3518 	case QSTRUIOT:
3519 		if (qbp)
3520 			error = EINVAL;
3521 		else
3522 			*(short *)valp = q->q_struiot;
3523 		break;
3524 
3525 	default:
3526 		error = EINVAL;
3527 		break;
3528 	}
3529 done:
3530 	if (freezer != curthread)
3531 		mutex_exit(QLOCK(q));
3532 	return (error);
3533 }
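
/*
 * Sketch (editor's illustration; xx_double_hiwat is hypothetical):
 * strqget() and strqset() pair naturally for a read-modify-write of a
 * queue field, here the band-0 high water mark.
 */
static int
xx_double_hiwat(queue_t *q)
{
	size_t hiwat;
	int error;

	if ((error = strqget(q, QHIWAT, 0, &hiwat)) != 0)
		return (error);
	return (strqset(q, QHIWAT, 0, (intptr_t)(2 * hiwat)));
}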
3534 
3535 /*
3536  * This function wakes up all waiters in cv_wait/sigwait/pollwait, on one of:
3537  *	QWANTWSYNC or QWANTR or QWANTW.
3538  *
3539  * Note: for QWANTWSYNC/QWANTW and QWANTR, if there is no WSLEEPer or
3540  *	 RSLEEPer then a deferred wakeup will be done. Also, if strpoll() is
3541  *	 in progress then a deferred pollwakeup will be done.
3542  */
3543 void
3544 strwakeq(queue_t *q, int flag)
3545 {
3546 	stdata_t	*stp = STREAM(q);
3547 	pollhead_t	*pl;
3548 
3549 	mutex_enter(&stp->sd_lock);
3550 	pl = &stp->sd_pollist;
3551 	if (flag & QWANTWSYNC) {
3552 		ASSERT(!(q->q_flag & QREADR));
3553 		if (stp->sd_flag & WSLEEP) {
3554 			stp->sd_flag &= ~WSLEEP;
3555 			cv_broadcast(&stp->sd_wrq->q_wait);
3556 		} else {
3557 			stp->sd_wakeq |= WSLEEP;
3558 		}
3559 
3560 		mutex_exit(&stp->sd_lock);
3561 		pollwakeup(pl, POLLWRNORM);
3562 		mutex_enter(&stp->sd_lock);
3563 
3564 		if (stp->sd_sigflags & S_WRNORM)
3565 			strsendsig(stp->sd_siglist, S_WRNORM, 0, 0);
3566 	} else if (flag & QWANTR) {
3567 		if (stp->sd_flag & RSLEEP) {
3568 			stp->sd_flag &= ~RSLEEP;
3569 			cv_broadcast(&_RD(stp->sd_wrq)->q_wait);
3570 		} else {
3571 			stp->sd_wakeq |= RSLEEP;
3572 		}
3573 
3574 		mutex_exit(&stp->sd_lock);
3575 		pollwakeup(pl, POLLIN | POLLRDNORM);
3576 		mutex_enter(&stp->sd_lock);
3577 
3578 		{
3579 			int events = stp->sd_sigflags & (S_INPUT | S_RDNORM);
3580 
3581 			if (events)
3582 				strsendsig(stp->sd_siglist, events, 0, 0);
3583 		}
3584 	} else {
3585 		if (stp->sd_flag & WSLEEP) {
3586 			stp->sd_flag &= ~WSLEEP;
3587 			cv_broadcast(&stp->sd_wrq->q_wait);
3588 		}
3589 
3590 		mutex_exit(&stp->sd_lock);
3591 		pollwakeup(pl, POLLWRNORM);
3592 		mutex_enter(&stp->sd_lock);
3593 
3594 		if (stp->sd_sigflags & S_WRNORM)
3595 			strsendsig(stp->sd_siglist, S_WRNORM, 0, 0);
3596 	}
3597 	mutex_exit(&stp->sd_lock);
3598 }
3599 
3600 int
3601 struioget(queue_t *q, mblk_t *mp, struiod_t *dp, int noblock)
3602 {
3603 	stdata_t *stp = STREAM(q);
3604 	int typ  = STRUIOT_STANDARD;
3605 	uio_t	 *uiop = &dp->d_uio;
3606 	dblk_t	 *dbp;
3607 	ssize_t	 uiocnt;
3608 	ssize_t	 cnt;
3609 	unsigned char *ptr;
3610 	ssize_t	 resid;
3611 	int	 error = 0;
3612 	on_trap_data_t otd;
3613 	queue_t	*stwrq;
3614 
3615 	/*
3616 	 * Plumbing may change while taking the type, so store the
3617 	 * queue in a temporary variable. It doesn't matter even
3618 	 * if we take the type from the previous plumbing;
3619 	 * that's because if the plumbing has changed while we were
3620 	 * holding the queue in a temporary variable, we can continue
3621 	 * processing the message the way it would have been processed
3622 	 * in the old plumbing, without any side effects other than a
3623 	 * bit of extra processing for the partial IP header checksum.
3624 	 *
3625 	 * This has been done to avoid holding sd_lock, which is
3626 	 * very hot.
3627 	 */
3628 
3629 	stwrq = stp->sd_struiowrq;
3630 	if (stwrq)
3631 		typ = stwrq->q_struiot;
3632 
3633 	for (; (resid = uiop->uio_resid) > 0 && mp; mp = mp->b_cont) {
3634 		dbp = mp->b_datap;
3635 		ptr = (uchar_t *)(mp->b_rptr + dbp->db_cksumstuff);
3636 		uiocnt = dbp->db_cksumend - dbp->db_cksumstuff;
3637 		cnt = MIN(uiocnt, uiop->uio_resid);
3638 		if (!(dbp->db_struioflag & STRUIO_SPEC) ||
3639 		    (dbp->db_struioflag & STRUIO_DONE) || cnt == 0) {
3640 			/*
3641 			 * Either this mblk has already been processed
3642 			 * or there is no more room in this mblk (?).
3643 			 */
3644 			continue;
3645 		}
3646 		switch (typ) {
3647 		case STRUIOT_STANDARD:
3648 			if (noblock) {
3649 				if (on_trap(&otd, OT_DATA_ACCESS)) {
3650 					no_trap();
3651 					error = EWOULDBLOCK;
3652 					goto out;
3653 				}
3654 			}
3655 			if (error = uiomove(ptr, cnt, UIO_WRITE, uiop)) {
3656 				if (noblock)
3657 					no_trap();
3658 				goto out;
3659 			}
3660 			if (noblock)
3661 				no_trap();
3662 			break;
3663 
3664 		default:
3665 			error = EIO;
3666 			goto out;
3667 		}
3668 		dbp->db_struioflag |= STRUIO_DONE;
3669 		dbp->db_cksumstuff += cnt;
3670 	}
3671 out:
3672 	if (error == EWOULDBLOCK && (resid -= uiop->uio_resid) > 0) {
3673 		/*
3674 		 * A fault has occurred and some bytes were moved to the
3675 		 * current mblk, the uio_t has already been updated by
3676 		 * the appropriate uio routine, so also update the mblk
3677 		 * to reflect this in case this same mblk chain is used
3678 		 * again (after the fault has been handled).
3679 		 */
3680 		uiocnt = dbp->db_cksumend - dbp->db_cksumstuff;
3681 		if (uiocnt >= resid)
3682 			dbp->db_cksumstuff += resid;
3683 	}
3684 	return (error);
3685 }
3686 
3687 /*
3688  * Try to enter the queue synchronously. Any attempt to enter a closing queue
3689  * fails. The qp->q_rwcnt keeps track of the number of successful entries so
3690  * that removeq() will not try to close the queue while a thread is inside the
3691  * queue.
3692  */
3693 static boolean_t
3694 rwnext_enter(queue_t *qp)
3695 {
3696 	mutex_enter(QLOCK(qp));
3697 	if (qp->q_flag & QWCLOSE) {
3698 		mutex_exit(QLOCK(qp));
3699 		return (B_FALSE);
3700 	}
3701 	qp->q_rwcnt++;
3702 	ASSERT(qp->q_rwcnt != 0);
3703 	mutex_exit(QLOCK(qp));
3704 	return (B_TRUE);
3705 }
3706 
3707 /*
3708  * Decrease the count of threads running in sync stream queue and wake up any
3709  * threads blocked in removeq().
3710  */
3711 static void
3712 rwnext_exit(queue_t *qp)
3713 {
3714 	mutex_enter(QLOCK(qp));
3715 	qp->q_rwcnt--;
3716 	if (qp->q_flag & QWANTRMQSYNC) {
3717 		qp->q_flag &= ~QWANTRMQSYNC;
3718 		cv_broadcast(&qp->q_wait);
3719 	}
3720 	mutex_exit(QLOCK(qp));
3721 }
3722 
3723 /*
3724  * The purpose of rwnext() is to call the rw procedure of the next
3725  * (downstream) modules queue.
3726  *
3727  * treated as put entrypoint for perimeter syncronization.
3728  *
3729  * There's no need to grab sq_putlocks here (which only exist for CIPUT
3730  * sync queues). If it is a CIPUT sync queue, sq_count is incremented and it
3731  * does not matter if any regular put entrypoints have already been entered. We
3732  * can't increment one of the sq_putcounts (instead of sq_count) because
3733  * qwait_rw won't know which counter to decrement.
3734  *
3735  * It would be reasonable to add the lockless FASTPUT logic.
3736  */
3737 int
3738 rwnext(queue_t *qp, struiod_t *dp)
3739 {
3740 	queue_t		*nqp;
3741 	syncq_t		*sq;
3742 	uint16_t	count;
3743 	uint16_t	flags;
3744 	struct qinit	*qi;
3745 	int		(*proc)();
3746 	struct stdata	*stp;
3747 	int		isread;
3748 	int		rval;
3749 
3750 	stp = STREAM(qp);
3751 	/*
3752 	 * Prevent q_next from changing by holding sd_lock until acquiring
3753 	 * SQLOCK. Note that a read-side rwnext from the streamhead will
3754 	 * already have sd_lock acquired. In either case sd_lock is always
3755 	 * released after acquiring SQLOCK.
3756 	 *
3757 	 * The streamhead read-side holding sd_lock when calling rwnext is
3758  * required to prevent a race condition where M_DATA mblks flowing
3759 	 * up the read-side of the stream could be bypassed by a rwnext()
3760 	 * down-call. In this case sd_lock acts as the streamhead perimeter.
3761 	 */
3762 	if ((nqp = _WR(qp)) == qp) {
3763 		isread = 0;
3764 		mutex_enter(&stp->sd_lock);
3765 		qp = nqp->q_next;
3766 	} else {
3767 		isread = 1;
3768 		if (nqp != stp->sd_wrq)
3769 			/* Not streamhead */
3770 			mutex_enter(&stp->sd_lock);
3771 		qp = _RD(nqp->q_next);
3772 	}
3773 	qi = qp->q_qinfo;
3774 	if (qp->q_struiot == STRUIOT_NONE || ! (proc = qi->qi_rwp)) {
3775 		/*
3776 		 * Not a synchronous module or no r/w procedure for this
3777 		 * queue, so just return EINVAL and let the caller handle it.
3778 		 */
3779 		mutex_exit(&stp->sd_lock);
3780 		return (EINVAL);
3781 	}
3782 
3783 	if (rwnext_enter(qp) == B_FALSE) {
3784 		mutex_exit(&stp->sd_lock);
3785 		return (EINVAL);
3786 	}
3787 
3788 	sq = qp->q_syncq;
3789 	mutex_enter(SQLOCK(sq));
3790 	mutex_exit(&stp->sd_lock);
3791 	count = sq->sq_count;
3792 	flags = sq->sq_flags;
3793 	ASSERT(sq->sq_ciputctrl == NULL || (flags & SQ_CIPUT));
3794 
3795 	while ((flags & SQ_GOAWAY) || (!(flags & SQ_CIPUT) && count != 0)) {
3796 		/*
3797 		 * if this queue is being closed, return.
3798 		 */
3799 		if (qp->q_flag & QWCLOSE) {
3800 			mutex_exit(SQLOCK(sq));
3801 			rwnext_exit(qp);
3802 			return (EINVAL);
3803 		}
3804 
3805 		/*
3806 		 * Wait until we can enter the inner perimeter.
3807 		 */
3808 		sq->sq_flags = flags | SQ_WANTWAKEUP;
3809 		cv_wait(&sq->sq_wait, SQLOCK(sq));
3810 		count = sq->sq_count;
3811 		flags = sq->sq_flags;
3812 	}
3813 
3814 	if ((isread == 0 && stp->sd_struiowrq == NULL) ||
3815 	    (isread == 1 && stp->sd_struiordq == NULL)) {
3816 		/*
3817 		 * Stream plumbing changed while waiting for inner perimeter
3818 		 * so just return EINVAL and let the caller handle it.
3819 		 */
3820 		mutex_exit(SQLOCK(sq));
3821 		rwnext_exit(qp);
3822 		return (EINVAL);
3823 	}
3824 	if (!(flags & SQ_CIPUT))
3825 		sq->sq_flags = flags | SQ_EXCL;
3826 	sq->sq_count = count + 1;
3827 	ASSERT(sq->sq_count != 0);		/* Wraparound */
3828 	/*
3829 	 * Note: The only message ordering guarantee that rwnext() makes is
3830 	 *	 for the write queue flow-control case. All others (r/w queue
3831 	 *	 with q_count > 0 (or q_first != 0)) are the responsibility of
3832 	 *	 the queue's rw procedure. This could be generalized here by
3833 	 *	 running the queue's service procedure, but that wouldn't be
3834 	 *	 the most efficient for all cases.
3835 	 */
3836 	mutex_exit(SQLOCK(sq));
3837 	if (! isread && (qp->q_flag & QFULL)) {
3838 		/*
3839 		 * Write queue may be flow controlled. If so,
3840 		 * mark the queue for wakeup when it's not.
3841 		 */
3842 		mutex_enter(QLOCK(qp));
3843 		if (qp->q_flag & QFULL) {
3844 			qp->q_flag |= QWANTWSYNC;
3845 			mutex_exit(QLOCK(qp));
3846 			rval = EWOULDBLOCK;
3847 			goto out;
3848 		}
3849 		mutex_exit(QLOCK(qp));
3850 	}
3851 
3852 	if (! isread && dp->d_mp)
3853 		STR_FTEVENT_MSG(dp->d_mp, nqp, FTEV_RWNEXT, dp->d_mp->b_rptr -
3854 		    dp->d_mp->b_datap->db_base);
3855 
3856 	rval = (*proc)(qp, dp);
3857 
3858 	if (isread && dp->d_mp)
3859 		STR_FTEVENT_MSG(dp->d_mp, _RD(nqp), FTEV_RWNEXT,
3860 		    dp->d_mp->b_rptr - dp->d_mp->b_datap->db_base);
3861 out:
3862 	/*
3863 	 * The queue is protected from being freed by sq_count, so it is
3864 	 * safe to call rwnext_exit and reacquire SQLOCK(sq).
3865 	 */
3866 	rwnext_exit(qp);
3867 
3868 	mutex_enter(SQLOCK(sq));
3869 	flags = sq->sq_flags;
3870 	ASSERT(sq->sq_count != 0);
3871 	sq->sq_count--;
3872 	if (flags & SQ_TAIL) {
3873 		putnext_tail(sq, qp, flags);
3874 		/*
3875 		 * The only purpose of this ASSERT is to preserve calling stack
3876 		 * in DEBUG kernel.
3877 		 */
3878 		ASSERT(flags & SQ_TAIL);
3879 		return (rval);
3880 	}
3881 	ASSERT(flags & (SQ_EXCL|SQ_CIPUT));
3882 	/*
3883 	 * Safe to always drop SQ_EXCL:
3884 	 *	Not SQ_CIPUT means we set SQ_EXCL above
3885 	 *	For SQ_CIPUT SQ_EXCL will only be set if the put procedure
3886 	 *	did a qwriter(INNER) in which case nobody else
3887 	 *	is in the inner perimeter and we are exiting.
3888 	 *
3889 	 * I would like to make the following assertion:
3890 	 *
3891 	 * ASSERT((flags & (SQ_EXCL|SQ_CIPUT)) != (SQ_EXCL|SQ_CIPUT) ||
3892 	 *	sq->sq_count == 0);
3893 	 *
3894 	 * which indicates that if we are both putshared and exclusive,
3895 	 * we became exclusive while executing the putproc, and the only
3896 	 * claim on the syncq was the one we dropped a few lines above.
3897 	 * But other threads that enter putnext while the syncq is exclusive
3898 	 * need to make a claim as they may need to drop SQLOCK in the
3899 	 * has_writers case to avoid deadlocks.  If these threads are
3900 	 * delayed or preempted, it is possible that the writer thread can
3901 	 * find out that there are other claims making the (sq_count == 0)
3902 	 * test invalid.
3903 	 */
3904 
3905 	sq->sq_flags = flags & ~SQ_EXCL;
3906 	if (sq->sq_flags & SQ_WANTWAKEUP) {
3907 		sq->sq_flags &= ~SQ_WANTWAKEUP;
3908 		cv_broadcast(&sq->sq_wait);
3909 	}
3910 	mutex_exit(SQLOCK(sq));
3911 	return (rval);
3912 }
3913 
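/*
 * A minimal sketch (hypothetical, not code from this file) of how a caller
 * might drive rwnext() for a synchronous-stream write.  It assumes the
 * struiod_t layout and uiodup() helper from the STREAMS framework; the
 * wrq, mp and uiop names and the elided error handling are illustrative:
 *
 *	struiod_t uiod;
 *	int error;
 *
 *	error = uiodup(uiop, &uiod.d_uio, uiod.d_iov,
 *	    sizeof (uiod.d_iov) / sizeof (*uiod.d_iov));
 *	if (error != 0)
 *		return (error);
 *	uiod.d_mp = mp;			(message being written)
 *	error = rwnext(wrq, &uiod);
 *	if (error == EINVAL) {
 *		(plumbing changed: fall back to the putnext() path)
 *	} else if (error == EWOULDBLOCK) {
 *		(flow controlled: QWANTWSYNC is set, wait and retry)
 *	}
 */
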
3914 /*
3915  * The purpose of infonext() is to call the info procedure of the next
3916  * (downstream) module's queue.
3917  *
3918  * infonext() is treated as a put entrypoint for perimeter synchronization.
3919  *
3920  * There's no need to grab sq_putlocks here (which only exist for CIPUT
3921  * sync queues). If it is a CIPUT sync queue the regular sq_count is
3922  * incremented and it does not matter if any regular put entrypoints have
3923  * already been entered.
3924  */
3925 int
3926 infonext(queue_t *qp, infod_t *idp)
3927 {
3928 	queue_t		*nqp;
3929 	syncq_t		*sq;
3930 	uint16_t	count;
3931 	uint16_t	flags;
3932 	struct qinit	*qi;
3933 	int		(*proc)();
3934 	struct stdata	*stp;
3935 	int		rval;
3936 
3937 	stp = STREAM(qp);
3938 	/*
3939 	 * Prevent q_next from changing by holding sd_lock until
3940 	 * acquiring SQLOCK.
3941 	 */
3942 	mutex_enter(&stp->sd_lock);
3943 	if ((nqp = _WR(qp)) == qp) {
3944 		qp = nqp->q_next;
3945 	} else {
3946 		qp = _RD(nqp->q_next);
3947 	}
3948 	qi = qp->q_qinfo;
3949 	if (qp->q_struiot == STRUIOT_NONE || ! (proc = qi->qi_infop)) {
3950 		mutex_exit(&stp->sd_lock);
3951 		return (EINVAL);
3952 	}
3953 	sq = qp->q_syncq;
3954 	mutex_enter(SQLOCK(sq));
3955 	mutex_exit(&stp->sd_lock);
3956 	count = sq->sq_count;
3957 	flags = sq->sq_flags;
3958 	ASSERT(sq->sq_ciputctrl == NULL || (flags & SQ_CIPUT));
3959 
3960 	while ((flags & SQ_GOAWAY) || (!(flags & SQ_CIPUT) && count != 0)) {
3961 		/*
3962 		 * Wait until we can enter the inner perimeter.
3963 		 */
3964 		sq->sq_flags = flags | SQ_WANTWAKEUP;
3965 		cv_wait(&sq->sq_wait, SQLOCK(sq));
3966 		count = sq->sq_count;
3967 		flags = sq->sq_flags;
3968 	}
3969 
3970 	if (! (flags & SQ_CIPUT))
3971 		sq->sq_flags = flags | SQ_EXCL;
3972 	sq->sq_count = count + 1;
3973 	ASSERT(sq->sq_count != 0);		/* Wraparound */
3974 	mutex_exit(SQLOCK(sq));
3975 
3976 	rval = (*proc)(qp, idp);
3977 
3978 	mutex_enter(SQLOCK(sq));
3979 	flags = sq->sq_flags;
3980 	ASSERT(sq->sq_count != 0);
3981 	sq->sq_count--;
3982 	if (flags & SQ_TAIL) {
3983 		putnext_tail(sq, qp, flags);
3984 		/*
3985 		 * The only purpose of this ASSERT is to preserve the calling
3986 		 * stack in a DEBUG kernel.
3987 		 */
3988 		ASSERT(flags & SQ_TAIL);
3989 		return (rval);
3990 	}
3991 	ASSERT(flags & (SQ_EXCL|SQ_CIPUT));
3992 /*
3993  * XXXX
3994  * I am not certain the next comment is correct here.  I need to consider
3995  * why infonext is called, and whether always dropping SQ_EXCL (rather
3996  * than only in the !SQ_CIPUT case) might cause other problems.  It might
3997  * be safer to drop it only if !SQ_CIPUT, because that is when we set it.
3998  */
3999 	/*
4000 	 * Safe to always drop SQ_EXCL:
4001 	 *	Not SQ_CIPUT means we set SQ_EXCL above
4002 	 *	For SQ_CIPUT SQ_EXCL will only be set if the put procedure
4003 	 *	did a qwriter(INNER) in which case nobody else
4004 	 *	is in the inner perimeter and we are exiting.
4005 	 *
4006 	 * I would like to make the following assertion:
4007 	 *
4008 	 * ASSERT((flags & (SQ_EXCL|SQ_CIPUT)) != (SQ_EXCL|SQ_CIPUT) ||
4009 	 *	sq->sq_count == 0);
4010 	 *
4011 	 * which indicates that if we are both putshared and exclusive,
4012 	 * we became exclusive while executing the putproc, and the only
4013 	 * claim on the syncq was the one we dropped a few lines above.
4014 	 * But other threads that enter putnext while the syncq is exclusive
4015 	 * need to make a claim as they may need to drop SQLOCK in the
4016 	 * has_writers case to avoid deadlocks.  If these threads are
4017 	 * delayed or preempted, it is possible that the writer thread can
4018 	 * find out that there are other claims making the (sq_count == 0)
4019 	 * test invalid.
4020 	 */
4021 
4022 	sq->sq_flags = flags & ~SQ_EXCL;
4023 	mutex_exit(SQLOCK(sq));
4024 	return (rval);
4025 }
4026 
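/*
 * A minimal sketch (hypothetical) of querying the downstream queue through
 * infonext().  It assumes the infod_t layout and the INFOD_BYTES command
 * defined for synchronous streams in <sys/strsubr.h>; error handling is
 * elided:
 *
 *	infod_t infod;
 *
 *	infod.d_cmd = INFOD_BYTES;
 *	infod.d_res = 0;
 *	infod.d_bytes = 0;
 *	if (infonext(q, &infod) == 0 && (infod.d_res & INFOD_BYTES)) {
 *		(infod.d_bytes now holds the available byte count)
 *	}
 */
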
4027 /*
4028  * Return nonzero if the queue is responsible for struio(), else return 0.
4029  */
4030 int
4031 isuioq(queue_t *q)
4032 {
4033 	if (q->q_flag & QREADR)
4034 		return (STREAM(q)->sd_struiordq == q);
4035 	else
4036 		return (STREAM(q)->sd_struiowrq == q);
4037 }
4038 
4039 #if defined(__sparc)
4040 int disable_putlocks = 0;
4041 #else
4042 int disable_putlocks = 1;
4043 #endif
4044 
4045 /*
4046  * Called by create_putlocks().
4047  */
4048 static void
4049 create_syncq_putlocks(queue_t *q)
4050 {
4051 	syncq_t	*sq = q->q_syncq;
4052 	ciputctrl_t *cip;
4053 	int i;
4054 
4055 	ASSERT(sq != NULL);
4056 
4057 	ASSERT(disable_putlocks == 0);
4058 	ASSERT(n_ciputctrl >= min_n_ciputctrl);
4059 	ASSERT(ciputctrl_cache != NULL);
4060 
4061 	if (!(sq->sq_type & SQ_CIPUT))
4062 		return;
4063 
4064 	for (i = 0; i <= 1; i++) {
4065 		if (sq->sq_ciputctrl == NULL) {
4066 			cip = kmem_cache_alloc(ciputctrl_cache, KM_SLEEP);
4067 			SUMCHECK_CIPUTCTRL_COUNTS(cip, n_ciputctrl - 1, 0);
4068 			mutex_enter(SQLOCK(sq));
4069 			if (sq->sq_ciputctrl != NULL) {
4070 				mutex_exit(SQLOCK(sq));
4071 				kmem_cache_free(ciputctrl_cache, cip);
4072 			} else {
4073 				ASSERT(sq->sq_nciputctrl == 0);
4074 				sq->sq_nciputctrl = n_ciputctrl - 1;
4075 				/*
4076 				 * putnext checks sq_ciputctrl without holding
4077 				 * SQLOCK. If it is not NULL, putnext assumes
4078 				 * sq_nciputctrl is initialized. The membar
4079 				 * below ensures that.
4080 				 */
4081 				membar_producer();
4082 				sq->sq_ciputctrl = cip;
4083 				mutex_exit(SQLOCK(sq));
4084 			}
4085 		}
4086 		ASSERT(sq->sq_nciputctrl == n_ciputctrl - 1);
4087 		if (i == 1)
4088 			break;
4089 		q = _OTHERQ(q);
4090 		if (!(q->q_flag & QPERQ)) {
4091 			ASSERT(sq == q->q_syncq);
4092 			break;
4093 		}
4094 		ASSERT(q->q_syncq != NULL);
4095 		ASSERT(sq != q->q_syncq);
4096 		sq = q->q_syncq;
4097 		ASSERT(sq->sq_type & SQ_CIPUT);
4098 	}
4099 }
4100 
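/*
 * The function above uses an allocate-then-publish pattern worth spelling
 * out: the ciputctrl_t is fully constructed before SQLOCK is taken, the
 * pointer is re-checked under the lock in case another thread won the
 * race, and membar_producer() orders the sq_nciputctrl store before the
 * sq_ciputctrl store so that lock-free readers in putnext() never observe
 * a published pointer with an uninitialized count.  In outline (a sketch
 * of the pattern, not additional code in this file):
 *
 *	new = kmem_cache_alloc(cache, KM_SLEEP);	(construct fully)
 *	mutex_enter(lock);
 *	if (shared_ptr != NULL) {
 *		mutex_exit(lock);
 *		kmem_cache_free(cache, new);		(lost the race)
 *	} else {
 *		shared_count = n;
 *		membar_producer();			(count visible first)
 *		shared_ptr = new;
 *		mutex_exit(lock);
 *	}
 */
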
4101 /*
4102  * If the stream argument is 0, only create per-CPU sq_putlocks/sq_putcounts
4103  * for the syncq of q. If the stream argument is non-zero, create per-CPU
4104  * stream_putlocks for the stream of q and per-CPU sq_putlocks/sq_putcounts
4105  * for all syncqs starting from q and down to the driver.
4106  *
4107  * This should be called after the affected queues are part of the stream
4108  * geometry. It should be called from a driver/module open routine after the
4109  * qprocson() call. It is also called from the nfs syscall where it is known
4110  * that the stream is configured and won't change its geometry during the
4111  * create_putlocks() call.
4112  *
4113  * A caller normally uses a 0 value for the stream argument to speed up MT
4114  * putnext into the perimeter of q, for example because its perimeter is per
4115  * module (e.g. IP).
4116  *
4117  * A caller normally uses a non-zero value for the stream argument to hint
4118  * the system that the stream of q is a very contended global system stream
4119  * (e.g. NFS/UDP) and the part of the stream from q to the driver is
4120  * particularly MT hot.
4121  *
4122  * The caller ensures stream plumbing won't happen while we are here and
4123  * therefore q_next can be safely used.
4124  */
4125 
4126 void
4127 create_putlocks(queue_t *q, int stream)
4128 {
4129 	ciputctrl_t	*cip;
4130 	struct stdata	*stp = STREAM(q);
4131 
4132 	q = _WR(q);
4133 	ASSERT(stp != NULL);
4134 
4135 	if (disable_putlocks != 0)
4136 		return;
4137 
4138 	if (n_ciputctrl < min_n_ciputctrl)
4139 		return;
4140 
4141 	ASSERT(ciputctrl_cache != NULL);
4142 
4143 	if (stream != 0 && stp->sd_ciputctrl == NULL) {
4144 		cip = kmem_cache_alloc(ciputctrl_cache, KM_SLEEP);
4145 		SUMCHECK_CIPUTCTRL_COUNTS(cip, n_ciputctrl - 1, 0);
4146 		mutex_enter(&stp->sd_lock);
4147 		if (stp->sd_ciputctrl != NULL) {
4148 			mutex_exit(&stp->sd_lock);
4149 			kmem_cache_free(ciputctrl_cache, cip);
4150 		} else {
4151 			ASSERT(stp->sd_nciputctrl == 0);
4152 			stp->sd_nciputctrl = n_ciputctrl - 1;
4153 			/*
4154 			 * putnext checks sd_ciputctrl without holding
4155 			 * sd_lock. If it is not NULL, putnext assumes
4156 			 * sd_nciputctrl is initialized. The membar below
4157 			 * ensures that.
4158 			 */
4159 			membar_producer();
4160 			stp->sd_ciputctrl = cip;
4161 			mutex_exit(&stp->sd_lock);
4162 		}
4163 	}
4164 
4165 	ASSERT(stream == 0 || stp->sd_nciputctrl == n_ciputctrl - 1);
4166 
4167 	while (_SAMESTR(q)) {
4168 		create_syncq_putlocks(q);
4169 		if (stream == 0)
4170 			return;
4171 		q = q->q_next;
4172 	}
4173 	ASSERT(q != NULL);
4174 	create_syncq_putlocks(q);
4175 }
4176 
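/*
 * A hypothetical sketch of the call site described above: a driver or
 * module open routine enabling per-CPU putlocks once its queue pair has
 * been linked into the stream (the xx_open name is illustrative only):
 *
 *	static int
 *	xx_open(queue_t *q, dev_t *devp, int flag, int sflag, cred_t *crp)
 *	{
 *		...
 *		qprocson(q);
 *		create_putlocks(q, 0);		(0: this perimeter only)
 *		return (0);
 *	}
 */
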
4177 /*
4178  * STREAMS Flow Trace - record STREAMS Flow Trace events as an mblk flows
4179  * through a stream.
4180  *
4181  * The data currently recorded per event are a timestamp, the module/driver
4182  * name, the downstream module/driver name, an optional call stack, the
4183  * event type and a per-type datum.  Much of the STREAMS framework is
4184  * instrumented for automatic flow tracing (when enabled).  Events can be
4185  * defined and used by STREAMS modules and drivers.
4186  *
4187  * Global objects:
4188  *
4189  *	str_ftevent() - Add a flow-trace event to a dblk.
4190  *	str_ftfree() - Free flow-trace data.
4191  *
4192  * Local objects:
4193  *
4194  *	fthdr_cache - pointer to the kmem cache for trace header.
4195  *	ftblk_cache - pointer to the kmem cache for trace data blocks.
4196  */
4197 
4198 int str_ftnever = 1;	/* Don't do STREAMS flow tracing */
4199 int str_ftstack = 0;	/* Don't record event call stacks */
4200 
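/*
 * Flow tracing is compiled in but disabled by default.  Setting str_ftnever
 * to 0 (for example via /etc/system: set str_ftnever = 0) before a message
 * is allocated arranges for that message to carry a flow-trace header, and
 * str_ftstack = 1 additionally records call stacks.  A module can then log
 * its own event against a message with the STR_FTEVENT_MSG() macro used
 * elsewhere in this file, for example (a hypothetical sketch):
 *
 *	STR_FTEVENT_MSG(mp, q, FTEV_PUT, MBLKL(mp));
 */
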
4201 void
4202 str_ftevent(fthdr_t *hp, void *p, ushort_t evnt, ushort_t data)
4203 {
4204 	ftblk_t *bp;
4205 	ftblk_t *nbp;
4206 	ftevnt_t *ep;
4207 	int ix, nix;
4208 
4209 	ASSERT(hp != NULL);
4210 	bp = hp->tail;
4211 	for (;;) {
4212 		if ((ix = bp->ix) == FTBLK_EVNTS) {
4213 			/*
4214 			 * Tail doesn't have room, so need a new tail.
4215 			 *
4216 			 * To make this MT safe, first, allocate a new
4217 			 * ftblk, and initialize it.  To make life a
4218 			 * little easier, reserve the first slot (mostly
4219 			 * by making ix = 1).  When we are finished with
4220 			 * the initialization, CAS this pointer to the
4221 			 * tail.  If this succeeds, this is the new
4222 			 * "next" block.  Otherwise, another thread
4223 			 * got here first, so free the block and start
4224 			 * again.
4225 			 */
4226 			nbp = kmem_cache_alloc(ftblk_cache, KM_NOSLEEP);
4227 			if (nbp == NULL) {
4228 				/* no mem, so punt */
4229 				str_ftnever++;
4230 				/* free up all flow data? */
4231 				return;
4232 			}
4233 			nbp->nxt = NULL;
4234 			nbp->ix = 1;
4235 			/*
4236 			 * Just in case there is another thread about
4237 			 * to get the next index, we need to make sure
4238 			 * the value is there for it.
4239 			 */
4240 			membar_producer();
4241 			if (atomic_cas_ptr(&hp->tail, bp, nbp) == bp) {
4242 				/* CAS was successful */
4243 				bp->nxt = nbp;
4244 				membar_producer();
4245 				bp = nbp;
4246 				ix = 0;
4247 				goto cas_good;
4248 			} else {
4249 				kmem_cache_free(ftblk_cache, nbp);
4250 				bp = hp->tail;
4251 				continue;
4252 			}
4253 		}
4254 		nix = ix + 1;
4255 		if (atomic_cas_32((uint32_t *)&bp->ix, ix, nix) == ix) {
4256 		cas_good:
4257 			if (curthread != hp->thread) {
4258 				hp->thread = curthread;
4259 				evnt |= FTEV_CS;
4260 			}
4261 			if (CPU->cpu_seqid != hp->cpu_seqid) {
4262 				hp->cpu_seqid = CPU->cpu_seqid;
4263 				evnt |= FTEV_PS;
4264 			}
4265 			ep = &bp->ev[ix];
4266 			break;
4267 		}
4268 	}
4269 
4270 	if (evnt & FTEV_QMASK) {
4271 		queue_t *qp = p;
4272 
4273 		if (!(qp->q_flag & QREADR))
4274 			evnt |= FTEV_ISWR;
4275 
4276 		ep->mid = Q2NAME(qp);
4277 
4278 		/*
4279 		 * We only record the next queue name for FTEV_PUTNEXT since
4280 		 * that's the only time we *really* need it, and the putnext()
4281 		 * code ensures that qp->q_next won't vanish.  (We could use
4282 		 * claimstr()/releasestr() but at a performance cost.)
4283 		 */
4284 		if ((evnt & FTEV_MASK) == FTEV_PUTNEXT && qp->q_next != NULL)
4285 			ep->midnext = Q2NAME(qp->q_next);
4286 		else
4287 			ep->midnext = NULL;
4288 	} else {
4289 		ep->mid = p;
4290 		ep->midnext = NULL;
4291 	}
4292 
4293 	if (ep->stk != NULL)
4294 		ep->stk->fs_depth = getpcstack(ep->stk->fs_stk, FTSTK_DEPTH);
4295 
4296 	ep->ts = gethrtime();
4297 	ep->evnt = evnt;
4298 	ep->data = data;
4299 	hp->hash = (hp->hash << 9) + hp->hash;
4300 	hp->hash += (evnt << 16) | data;
4301 	hp->hash += (uintptr_t)ep->mid;
4302 }
4303 
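/*
 * The loop in str_ftevent() is a lock-free multi-producer append: a slot
 * is claimed by CAS-incrementing bp->ix, and a full block is replaced by
 * CAS-swapping hp->tail to a freshly initialized ftblk whose first slot
 * is pre-reserved (ix = 1).  The slot-claim step in miniature (a sketch
 * of the technique, not additional code in this file):
 *
 *	for (;;) {
 *		ix = bp->ix;
 *		if (ix < FTBLK_EVNTS &&
 *		    atomic_cas_32((uint32_t *)&bp->ix, ix, ix + 1) == ix)
 *			break;			(slot ix is now ours)
 *		(otherwise grow or re-read the tail and retry)
 *	}
 */
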
4304 /*
4305  * Free flow-trace data.
4306  */
4307 void
4308 str_ftfree(dblk_t *dbp)
4309 {
4310 	fthdr_t *hp = dbp->db_fthdr;
4311 	ftblk_t *bp = &hp->first;
4312 	ftblk_t *nbp;
4313 
4314 	if (bp != hp->tail || bp->ix != 0) {
4315 		/*
4316 		 * Clear out the hash, have the tail point to itself, and free
4317 		 * any continuation blocks.
4318 		 */
4319 		bp = hp->first.nxt;
4320 		hp->tail = &hp->first;
4321 		hp->hash = 0;
4322 		hp->first.nxt = NULL;
4323 		hp->first.ix = 0;
4324 		while (bp != NULL) {
4325 			nbp = bp->nxt;
4326 			kmem_cache_free(ftblk_cache, bp);
4327 			bp = nbp;
4328 		}
4329 	}
4330 	kmem_cache_free(fthdr_cache, hp);
4331 	dbp->db_fthdr = NULL;
4332 }
4333