xref: /illumos-gate/usr/src/uts/common/io/stream.c (revision 806838751b3ce15414781bffd4adfac166204c62)
1 /*
2  * CDDL HEADER START
3  *
4  * The contents of this file are subject to the terms of the
5  * Common Development and Distribution License (the "License").
6  * You may not use this file except in compliance with the License.
7  *
8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9  * or http://www.opensolaris.org/os/licensing.
10  * See the License for the specific language governing permissions
11  * and limitations under the License.
12  *
13  * When distributing Covered Code, include this CDDL HEADER in each
14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15  * If applicable, add the following below this CDDL HEADER, with the
16  * fields enclosed by brackets "[]" replaced with your own identifying
17  * information: Portions Copyright [yyyy] [name of copyright owner]
18  *
19  * CDDL HEADER END
20  */
21 /*	Copyright (c) 1984, 1986, 1987, 1988, 1989 AT&T	*/
22 /*	All Rights Reserved	*/
23 
24 /*
25  * Copyright 2009 Sun Microsystems, Inc.  All rights reserved.
26  * Use is subject to license terms.
27  *
28  * Copyright 2021 Tintri by DDN, Inc. All rights reserved.
29  * Copyright 2022 Garrett D'Amore
30  */
31 
32 #include <sys/types.h>
33 #include <sys/param.h>
34 #include <sys/thread.h>
35 #include <sys/sysmacros.h>
36 #include <sys/stropts.h>
37 #include <sys/stream.h>
38 #include <sys/strsubr.h>
39 #include <sys/strsun.h>
40 #include <sys/conf.h>
41 #include <sys/debug.h>
42 #include <sys/cmn_err.h>
43 #include <sys/kmem.h>
44 #include <sys/atomic.h>
45 #include <sys/errno.h>
46 #include <sys/vtrace.h>
47 #include <sys/ftrace.h>
48 #include <sys/ontrap.h>
49 #include <sys/sdt.h>
50 #include <sys/strft.h>
51 
52 #ifdef DEBUG
53 #include <sys/kmem_impl.h>
54 #endif
55 
56 /*
57  * This file contains all the STREAMS utility routines that may
58  * be used by modules and drivers.
59  */
60 
61 /*
62  * STREAMS message allocator: principles of operation
63  *
64  * The streams message allocator consists of all the routines that
65  * allocate, dup and free streams messages: allocb(), [d]esballoc[a],
66  * dupb(), freeb() and freemsg().  What follows is a high-level view
67  * of how the allocator works.
68  *
69  * Every streams message consists of one or more mblks, a dblk, and data.
70  * All mblks for all types of messages come from a common mblk_cache.
71  * The dblk and data come in several flavors, depending on how the
72  * message is allocated:
73  *
74  * (1) mblks up to DBLK_MAX_CACHE size are allocated from a collection of
75  *     fixed-size dblk/data caches. For message sizes that are multiples of
76  *     PAGESIZE, dblks are allocated separately from the buffer.
77  *     The associated buffer is allocated by the constructor using kmem_alloc().
78  *     For all other message sizes, dblk and its associated data is allocated
79  *     as a single contiguous chunk of memory.
 *     Either way, every object in these caches is a constructed dblk
 *     that already owns its data buffer.
81  *     allocb() determines the nearest-size cache by table lookup:
82  *     the dblk_cache[] array provides the mapping from size to dblk cache.
83  *
84  * (2) Large messages (size > DBLK_MAX_CACHE) are constructed by
85  *     kmem_alloc()'ing a buffer for the data and supplying that
86  *     buffer to gesballoc(), described below.
87  *
88  * (3) The four flavors of [d]esballoc[a] are all implemented by a
89  *     common routine, gesballoc() ("generic esballoc").  gesballoc()
90  *     allocates a dblk from the global dblk_esb_cache and sets db_base,
91  *     db_lim and db_frtnp to describe the caller-supplied buffer.
92  *
93  * While there are several routines to allocate messages, there is only
94  * one routine to free messages: freeb().  freeb() simply invokes the
95  * dblk's free method, dbp->db_free(), which is set at allocation time.
96  *
97  * dupb() creates a new reference to a message by allocating a new mblk,
98  * incrementing the dblk reference count and setting the dblk's free
99  * method to dblk_decref().  The dblk's original free method is retained
100  * in db_lastfree.  dblk_decref() decrements the reference count on each
101  * freeb().  If this is not the last reference it just frees the mblk;
102  * if this *is* the last reference, it restores db_free to db_lastfree,
103  * sets db_mblk to the current mblk (see below), and invokes db_lastfree.
104  *
105  * The implementation makes aggressive use of kmem object caching for
106  * maximum performance.  This makes the code simple and compact, but
107  * also a bit abstruse in some places.  The invariants that constitute a
108  * message's constructed state, described below, are more subtle than usual.
109  *
110  * Every dblk has an "attached mblk" as part of its constructed state.
111  * The mblk is allocated by the dblk's constructor and remains attached
112  * until the message is either dup'ed or pulled up.  In the dupb() case
113  * the mblk association doesn't matter until the last free, at which time
114  * dblk_decref() attaches the last mblk to the dblk.  pullupmsg() affects
115  * the mblk association because it swaps the leading mblks of two messages,
116  * so it is responsible for swapping their db_mblk pointers accordingly.
117  * From a constructed-state viewpoint it doesn't matter that a dblk's
118  * attached mblk can change while the message is allocated; all that
119  * matters is that the dblk has *some* attached mblk when it's freed.
120  *
121  * The sizes of the allocb() small-message caches are not magical.
122  * They represent a good trade-off between internal and external
123  * fragmentation for current workloads.  They should be reevaluated
124  * periodically, especially if allocations larger than DBLK_MAX_CACHE
125  * become common.  We use 64-byte alignment so that dblks don't
126  * straddle cache lines unnecessarily.
127  */
/*
 * Sizing parameters for the allocb() dblk caches:
 *   DBLK_MAX_CACHE   - largest message size served from dblk_cache[]
 *   DBLK_CACHE_ALIGN - alignment requested for every dblk cache
 *   DBLK_MIN_SIZE    - size granularity of one dblk_cache[] slot
 *   DBLK_SIZE_SHIFT  - log2(DBLK_MIN_SIZE); (size - 1) >> DBLK_SIZE_SHIFT
 *                      is the dblk_cache[] index for a given size
 */
#define	DBLK_MAX_CACHE		73728
#define	DBLK_CACHE_ALIGN	64
#define	DBLK_MIN_SIZE		8
#define	DBLK_SIZE_SHIFT		3

/*
 * DBLK_RTFU_SHIFT(field) yields the bit position of a dblk field inside the
 * 32-bit word that DBLK_RTFU_WORD() reads starting at db_ref.  The position
 * is derived from the byte distance between fields, measured from db_ref on
 * little-endian machines and back from db_struioflag on big-endian ones.
 * NOTE(review): this assumes db_ref, db_type, db_flags and db_struioflag are
 * four adjacent one-byte members of dblk_t -- confirm against <sys/stream.h>.
 */
#ifdef _BIG_ENDIAN
#define	DBLK_RTFU_SHIFT(field)	\
	(8 * (&((dblk_t *)0)->db_struioflag - &((dblk_t *)0)->field))
#else
#define	DBLK_RTFU_SHIFT(field)	\
	(8 * (&((dblk_t *)0)->field - &((dblk_t *)0)->db_ref))
#endif

/*
 * DBLK_RTFU() composes the ref/type/flags/uioflag word in one expression so
 * that all four fields can be initialized with a single 32-bit store.
 * (ref) - 1 is OR-ed into the flags byte: an initial reference count of 2
 * therefore sets the low flag bit, which dblk_decref() masks with
 * DBLK_REFMIN to decide what counts as the "last" reference.
 * Every argument is parenthesized so that expression arguments expand
 * safely.
 */
#define	DBLK_RTFU(ref, type, flags, uioflag)	\
	(((ref) << DBLK_RTFU_SHIFT(db_ref)) | \
	((type) << DBLK_RTFU_SHIFT(db_type)) | \
	(((flags) | ((ref) - 1)) << DBLK_RTFU_SHIFT(db_flags)) | \
	((uioflag) << DBLK_RTFU_SHIFT(db_struioflag)))
/* Mask selecting the db_ref bits of the RTFU word. */
#define	DBLK_RTFU_REF_MASK	(DBLK_REFMAX << DBLK_RTFU_SHIFT(db_ref))
/* The 32-bit word that begins at db_ref, as an lvalue. */
#define	DBLK_RTFU_WORD(dbp)	(*((uint32_t *)&(dbp)->db_ref))
/* The 32-bit word that begins at b_band, as an lvalue. */
#define	MBLK_BAND_FLAG_WORD(mp)	(*((uint32_t *)&(mp)->b_band))
149 
/*
 * Data sizes, in bytes, of the fixed-size dblk caches that
 * streams_msg_init() creates.  The list is in ascending order, has separate
 * variants for _LP64 and non-_LP64 builds, always ends with DBLK_MAX_CACHE,
 * and is terminated by a 0 entry (the init loop stops on it).
 * streams_msg_init() asserts that each size that is not a multiple of the
 * page size is, together with sizeof (dblk_t), a multiple of
 * DBLK_CACHE_ALIGN.
 */
static size_t dblk_sizes[] = {
#ifdef _LP64
	16, 80, 144, 208, 272, 336, 528, 1040, 1488, 1936, 2576, 3856,
	8192, 12048, 16384, 20240, 24576, 28432, 32768, 36624,
	40960, 44816, 49152, 53008, 57344, 61200, 65536, 69392,
#else
	64, 128, 320, 576, 1088, 1536, 1984, 2624, 3904,
	8192, 12096, 16384, 20288, 24576, 28480, 32768, 36672,
	40960, 44864, 49152, 53056, 57344, 61248, 65536, 69440,
#endif
	DBLK_MAX_CACHE, 0
};
162 
/*
 * kmem caches used by the message allocator; all are created in
 * streams_msg_init().  dblk_cache[] is indexed by
 * (size - 1) >> DBLK_SIZE_SHIFT and maps a request size to the cache
 * whose buffers are large enough for it.
 */
static struct kmem_cache *dblk_cache[DBLK_MAX_CACHE / DBLK_MIN_SIZE];
static struct kmem_cache *mblk_cache;
static struct kmem_cache *dblk_esb_cache;
static struct kmem_cache *fthdr_cache;
static struct kmem_cache *ftblk_cache;

/* Forward declarations for routines defined later in this file. */
static void dblk_lastfree(mblk_t *mp, dblk_t *dbp);
static mblk_t *allocb_oversize(size_t size, int flags);
/* NOTE(review): presumably a failure counter for allocb_tryhard() -- confirm */
static int allocb_tryhard_fails;
static void frnop_func(void *arg);
/* Free-routine descriptor whose free function is the no-op frnop_func(). */
frtn_t frnop = { frnop_func };
static void bcache_dblk_lastfree(mblk_t *mp, dblk_t *dbp);

static boolean_t rwnext_enter(queue_t *qp);
static void rwnext_exit(queue_t *qp);

/*
 * Patchable mblk/dblk kmem_cache flags.
 * They are passed as the cflags argument of kmem_cache_create() in
 * streams_msg_init().
 */
int dblk_kmem_flags = 0;
int mblk_kmem_flags = 0;
184 
185 static int
186 dblk_constructor(void *buf, void *cdrarg, int kmflags)
187 {
188 	dblk_t *dbp = buf;
189 	ssize_t msg_size = (ssize_t)cdrarg;
190 	size_t index;
191 
192 	ASSERT(msg_size != 0);
193 
194 	index = (msg_size - 1) >> DBLK_SIZE_SHIFT;
195 
196 	ASSERT(index < (DBLK_MAX_CACHE >> DBLK_SIZE_SHIFT));
197 
198 	if ((dbp->db_mblk = kmem_cache_alloc(mblk_cache, kmflags)) == NULL)
199 		return (-1);
200 	if ((msg_size & PAGEOFFSET) == 0) {
201 		dbp->db_base = kmem_alloc(msg_size, kmflags);
202 		if (dbp->db_base == NULL) {
203 			kmem_cache_free(mblk_cache, dbp->db_mblk);
204 			return (-1);
205 		}
206 	} else {
207 		dbp->db_base = (unsigned char *)&dbp[1];
208 	}
209 
210 	dbp->db_mblk->b_datap = dbp;
211 	dbp->db_cache = dblk_cache[index];
212 	dbp->db_lim = dbp->db_base + msg_size;
213 	dbp->db_free = dbp->db_lastfree = dblk_lastfree;
214 	dbp->db_frtnp = NULL;
215 	dbp->db_fthdr = NULL;
216 	dbp->db_credp = NULL;
217 	dbp->db_cpid = -1;
218 	dbp->db_struioflag = 0;
219 	dbp->db_struioun.cksum.flags = 0;
220 	return (0);
221 }
222 
223 /*ARGSUSED*/
224 static int
225 dblk_esb_constructor(void *buf, void *cdrarg, int kmflags)
226 {
227 	dblk_t *dbp = buf;
228 
229 	if ((dbp->db_mblk = kmem_cache_alloc(mblk_cache, kmflags)) == NULL)
230 		return (-1);
231 	dbp->db_mblk->b_datap = dbp;
232 	dbp->db_cache = dblk_esb_cache;
233 	dbp->db_fthdr = NULL;
234 	dbp->db_credp = NULL;
235 	dbp->db_cpid = -1;
236 	dbp->db_struioflag = 0;
237 	dbp->db_struioun.cksum.flags = 0;
238 	return (0);
239 }
240 
241 static int
242 bcache_dblk_constructor(void *buf, void *cdrarg, int kmflags)
243 {
244 	dblk_t *dbp = buf;
245 	bcache_t *bcp = cdrarg;
246 
247 	if ((dbp->db_mblk = kmem_cache_alloc(mblk_cache, kmflags)) == NULL)
248 		return (-1);
249 
250 	dbp->db_base = kmem_cache_alloc(bcp->buffer_cache, kmflags);
251 	if (dbp->db_base == NULL) {
252 		kmem_cache_free(mblk_cache, dbp->db_mblk);
253 		return (-1);
254 	}
255 
256 	dbp->db_mblk->b_datap = dbp;
257 	dbp->db_cache = (void *)bcp;
258 	dbp->db_lim = dbp->db_base + bcp->size;
259 	dbp->db_free = dbp->db_lastfree = bcache_dblk_lastfree;
260 	dbp->db_frtnp = NULL;
261 	dbp->db_fthdr = NULL;
262 	dbp->db_credp = NULL;
263 	dbp->db_cpid = -1;
264 	dbp->db_struioflag = 0;
265 	dbp->db_struioun.cksum.flags = 0;
266 	return (0);
267 }
268 
269 /*ARGSUSED*/
270 static void
271 dblk_destructor(void *buf, void *cdrarg)
272 {
273 	dblk_t *dbp = buf;
274 	ssize_t msg_size = (ssize_t)cdrarg;
275 
276 	ASSERT(dbp->db_mblk->b_datap == dbp);
277 	ASSERT(msg_size != 0);
278 	ASSERT(dbp->db_struioflag == 0);
279 	ASSERT(dbp->db_struioun.cksum.flags == 0);
280 
281 	if ((msg_size & PAGEOFFSET) == 0) {
282 		kmem_free(dbp->db_base, msg_size);
283 	}
284 
285 	kmem_cache_free(mblk_cache, dbp->db_mblk);
286 }
287 
288 static void
289 bcache_dblk_destructor(void *buf, void *cdrarg)
290 {
291 	dblk_t *dbp = buf;
292 	bcache_t *bcp = cdrarg;
293 
294 	kmem_cache_free(bcp->buffer_cache, dbp->db_base);
295 
296 	ASSERT(dbp->db_mblk->b_datap == dbp);
297 	ASSERT(dbp->db_struioflag == 0);
298 	ASSERT(dbp->db_struioun.cksum.flags == 0);
299 
300 	kmem_cache_free(mblk_cache, dbp->db_mblk);
301 }
302 
303 /* ARGSUSED */
304 static int
305 ftblk_constructor(void *buf, void *cdrarg, int kmflags)
306 {
307 	ftblk_t *fbp = buf;
308 	int i;
309 
310 	bzero(fbp, sizeof (ftblk_t));
311 	if (str_ftstack != 0) {
312 		for (i = 0; i < FTBLK_EVNTS; i++)
313 			fbp->ev[i].stk = kmem_alloc(sizeof (ftstk_t), kmflags);
314 	}
315 
316 	return (0);
317 }
318 
319 /* ARGSUSED */
320 static void
321 ftblk_destructor(void *buf, void *cdrarg)
322 {
323 	ftblk_t *fbp = buf;
324 	int i;
325 
326 	if (str_ftstack != 0) {
327 		for (i = 0; i < FTBLK_EVNTS; i++) {
328 			if (fbp->ev[i].stk != NULL) {
329 				kmem_free(fbp->ev[i].stk, sizeof (ftstk_t));
330 				fbp->ev[i].stk = NULL;
331 			}
332 		}
333 	}
334 }
335 
336 static int
337 fthdr_constructor(void *buf, void *cdrarg, int kmflags)
338 {
339 	fthdr_t *fhp = buf;
340 
341 	return (ftblk_constructor(&fhp->first, cdrarg, kmflags));
342 }
343 
344 static void
345 fthdr_destructor(void *buf, void *cdrarg)
346 {
347 	fthdr_t *fhp = buf;
348 
349 	ftblk_destructor(&fhp->first, cdrarg);
350 }
351 
352 void
353 streams_msg_init(void)
354 {
355 	char name[40];
356 	size_t size;
357 	size_t lastsize = DBLK_MIN_SIZE;
358 	size_t *sizep;
359 	struct kmem_cache *cp;
360 	size_t tot_size;
361 	int offset;
362 
363 	mblk_cache = kmem_cache_create("streams_mblk", sizeof (mblk_t), 32,
364 	    NULL, NULL, NULL, NULL, NULL, mblk_kmem_flags);
365 
366 	for (sizep = dblk_sizes; (size = *sizep) != 0; sizep++) {
367 
368 		if ((offset = (size & PAGEOFFSET)) != 0) {
369 			/*
370 			 * We are in the middle of a page, dblk should
371 			 * be allocated on the same page
372 			 */
373 			tot_size = size + sizeof (dblk_t);
374 			ASSERT((offset + sizeof (dblk_t) + sizeof (kmem_slab_t))
375 			    < PAGESIZE);
376 			ASSERT((tot_size & (DBLK_CACHE_ALIGN - 1)) == 0);
377 
378 		} else {
379 
380 			/*
381 			 * buf size is multiple of page size, dblk and
382 			 * buffer are allocated separately.
383 			 */
384 
385 			ASSERT((size & (DBLK_CACHE_ALIGN - 1)) == 0);
386 			tot_size = sizeof (dblk_t);
387 		}
388 
389 		(void) sprintf(name, "streams_dblk_%ld", size);
390 		cp = kmem_cache_create(name, tot_size, DBLK_CACHE_ALIGN,
391 		    dblk_constructor, dblk_destructor, NULL, (void *)(size),
392 		    NULL, dblk_kmem_flags);
393 
394 		while (lastsize <= size) {
395 			dblk_cache[(lastsize - 1) >> DBLK_SIZE_SHIFT] = cp;
396 			lastsize += DBLK_MIN_SIZE;
397 		}
398 	}
399 
400 	dblk_esb_cache = kmem_cache_create("streams_dblk_esb", sizeof (dblk_t),
401 	    DBLK_CACHE_ALIGN, dblk_esb_constructor, dblk_destructor, NULL,
402 	    (void *)sizeof (dblk_t), NULL, dblk_kmem_flags);
403 	fthdr_cache = kmem_cache_create("streams_fthdr", sizeof (fthdr_t), 32,
404 	    fthdr_constructor, fthdr_destructor, NULL, NULL, NULL, 0);
405 	ftblk_cache = kmem_cache_create("streams_ftblk", sizeof (ftblk_t), 32,
406 	    ftblk_constructor, ftblk_destructor, NULL, NULL, NULL, 0);
407 
408 	/* initialize throttling queue for esballoc */
409 	esballoc_queue_init();
410 }
411 
412 /*ARGSUSED*/
413 mblk_t *
414 allocb(size_t size, uint_t pri)
415 {
416 	dblk_t *dbp;
417 	mblk_t *mp;
418 	size_t index;
419 
420 	index =  (size - 1)  >> DBLK_SIZE_SHIFT;
421 
422 	if (index >= (DBLK_MAX_CACHE >> DBLK_SIZE_SHIFT)) {
423 		if (size != 0) {
424 			mp = allocb_oversize(size, KM_NOSLEEP);
425 			goto out;
426 		}
427 		index = 0;
428 	}
429 
430 	if ((dbp = kmem_cache_alloc(dblk_cache[index], KM_NOSLEEP)) == NULL) {
431 		mp = NULL;
432 		goto out;
433 	}
434 
435 	mp = dbp->db_mblk;
436 	DBLK_RTFU_WORD(dbp) = DBLK_RTFU(1, M_DATA, 0, 0);
437 	mp->b_next = mp->b_prev = mp->b_cont = NULL;
438 	mp->b_rptr = mp->b_wptr = dbp->db_base;
439 	mp->b_queue = NULL;
440 	MBLK_BAND_FLAG_WORD(mp) = 0;
441 	STR_FTALLOC(&dbp->db_fthdr, FTEV_ALLOCB, size);
442 out:
443 	FTRACE_1("allocb(): mp=0x%p", (uintptr_t)mp);
444 
445 	return (mp);
446 }
447 
448 /*
449  * Allocate an mblk taking db_credp and db_cpid from the template.
450  * Allow the cred to be NULL.
451  */
452 mblk_t *
453 allocb_tmpl(size_t size, const mblk_t *tmpl)
454 {
455 	mblk_t *mp = allocb(size, 0);
456 
457 	if (mp != NULL) {
458 		dblk_t *src = tmpl->b_datap;
459 		dblk_t *dst = mp->b_datap;
460 		cred_t *cr;
461 		pid_t cpid;
462 
463 		cr = msg_getcred(tmpl, &cpid);
464 		if (cr != NULL)
465 			crhold(dst->db_credp = cr);
466 		dst->db_cpid = cpid;
467 		dst->db_type = src->db_type;
468 	}
469 	return (mp);
470 }
471 
472 mblk_t *
473 allocb_cred(size_t size, cred_t *cr, pid_t cpid)
474 {
475 	mblk_t *mp = allocb(size, 0);
476 
477 	ASSERT(cr != NULL);
478 	if (mp != NULL) {
479 		dblk_t *dbp = mp->b_datap;
480 
481 		crhold(dbp->db_credp = cr);
482 		dbp->db_cpid = cpid;
483 	}
484 	return (mp);
485 }
486 
487 mblk_t *
488 allocb_cred_wait(size_t size, uint_t flags, int *error, cred_t *cr, pid_t cpid)
489 {
490 	mblk_t *mp = allocb_wait(size, 0, flags, error);
491 
492 	ASSERT(cr != NULL);
493 	if (mp != NULL) {
494 		dblk_t *dbp = mp->b_datap;
495 
496 		crhold(dbp->db_credp = cr);
497 		dbp->db_cpid = cpid;
498 	}
499 
500 	return (mp);
501 }
502 
503 /*
504  * Extract the db_cred (and optionally db_cpid) from a message.
505  * We find the first mblk which has a non-NULL db_cred and use that.
506  * If none found we return NULL.
507  * Does NOT get a hold on the cred.
508  */
509 cred_t *
510 msg_getcred(const mblk_t *mp, pid_t *cpidp)
511 {
512 	cred_t *cr = NULL;
513 	cred_t *cr2;
514 	mblk_t *mp2;
515 
516 	while (mp != NULL) {
517 		dblk_t *dbp = mp->b_datap;
518 
519 		cr = dbp->db_credp;
520 		if (cr == NULL) {
521 			mp = mp->b_cont;
522 			continue;
523 		}
524 		if (cpidp != NULL)
525 			*cpidp = dbp->db_cpid;
526 
527 #ifdef DEBUG
528 		/*
529 		 * Normally there should at most one db_credp in a message.
530 		 * But if there are multiple (as in the case of some M_IOC*
531 		 * and some internal messages in TCP/IP bind logic) then
532 		 * they must be identical in the normal case.
533 		 * However, a socket can be shared between different uids
534 		 * in which case data queued in TCP would be from different
535 		 * creds. Thus we can only assert for the zoneid being the
536 		 * same. Due to Multi-level Level Ports for TX, some
537 		 * cred_t can have a NULL cr_zone, and we skip the comparison
538 		 * in that case.
539 		 */
540 		mp2 = mp->b_cont;
541 		while (mp2 != NULL) {
542 			cr2 = DB_CRED(mp2);
543 			if (cr2 != NULL) {
544 				DTRACE_PROBE2(msg__getcred,
545 				    cred_t *, cr, cred_t *, cr2);
546 				ASSERT(crgetzoneid(cr) == crgetzoneid(cr2) ||
547 				    crgetzone(cr) == NULL ||
548 				    crgetzone(cr2) == NULL);
549 			}
550 			mp2 = mp2->b_cont;
551 		}
552 #endif
553 		return (cr);
554 	}
555 	if (cpidp != NULL)
556 		*cpidp = NOPID;
557 	return (NULL);
558 }
559 
560 /*
561  * Variant of msg_getcred which, when a cred is found
562  * 1. Returns with a hold on the cred
563  * 2. Clears the first cred in the mblk.
564  * This is more efficient to use than a msg_getcred() + crhold() when
565  * the message is freed after the cred has been extracted.
566  *
567  * The caller is responsible for ensuring that there is no other reference
568  * on the message since db_credp can not be cleared when there are other
569  * references.
570  */
571 cred_t *
572 msg_extractcred(mblk_t *mp, pid_t *cpidp)
573 {
574 	cred_t *cr = NULL;
575 	cred_t *cr2;
576 	mblk_t *mp2;
577 
578 	while (mp != NULL) {
579 		dblk_t *dbp = mp->b_datap;
580 
581 		cr = dbp->db_credp;
582 		if (cr == NULL) {
583 			mp = mp->b_cont;
584 			continue;
585 		}
586 		ASSERT(dbp->db_ref == 1);
587 		dbp->db_credp = NULL;
588 		if (cpidp != NULL)
589 			*cpidp = dbp->db_cpid;
590 #ifdef DEBUG
591 		/*
592 		 * Normally there should at most one db_credp in a message.
593 		 * But if there are multiple (as in the case of some M_IOC*
594 		 * and some internal messages in TCP/IP bind logic) then
595 		 * they must be identical in the normal case.
596 		 * However, a socket can be shared between different uids
597 		 * in which case data queued in TCP would be from different
598 		 * creds. Thus we can only assert for the zoneid being the
599 		 * same. Due to Multi-level Level Ports for TX, some
600 		 * cred_t can have a NULL cr_zone, and we skip the comparison
601 		 * in that case.
602 		 */
603 		mp2 = mp->b_cont;
604 		while (mp2 != NULL) {
605 			cr2 = DB_CRED(mp2);
606 			if (cr2 != NULL) {
607 				DTRACE_PROBE2(msg__extractcred,
608 				    cred_t *, cr, cred_t *, cr2);
609 				ASSERT(crgetzoneid(cr) == crgetzoneid(cr2) ||
610 				    crgetzone(cr) == NULL ||
611 				    crgetzone(cr2) == NULL);
612 			}
613 			mp2 = mp2->b_cont;
614 		}
615 #endif
616 		return (cr);
617 	}
618 	return (NULL);
619 }
620 /*
621  * Get the label for a message. Uses the first mblk in the message
622  * which has a non-NULL db_credp.
623  * Returns NULL if there is no credp.
624  */
625 extern struct ts_label_s *
626 msg_getlabel(const mblk_t *mp)
627 {
628 	cred_t *cr = msg_getcred(mp, NULL);
629 
630 	if (cr == NULL)
631 		return (NULL);
632 
633 	return (crgetlabel(cr));
634 }
635 
636 void
637 freeb(mblk_t *mp)
638 {
639 	dblk_t *dbp = mp->b_datap;
640 
641 	ASSERT(dbp->db_ref > 0);
642 	ASSERT(mp->b_next == NULL && mp->b_prev == NULL);
643 	FTRACE_1("freeb(): mp=0x%lx", (uintptr_t)mp);
644 
645 	STR_FTEVENT_MBLK(mp, caller(), FTEV_FREEB, dbp->db_ref);
646 
647 	dbp->db_free(mp, dbp);
648 }
649 
650 void
651 freemsg(mblk_t *mp)
652 {
653 	FTRACE_1("freemsg(): mp=0x%lx", (uintptr_t)mp);
654 	while (mp) {
655 		dblk_t *dbp = mp->b_datap;
656 		mblk_t *mp_cont = mp->b_cont;
657 
658 		ASSERT(dbp->db_ref > 0);
659 		ASSERT(mp->b_next == NULL && mp->b_prev == NULL);
660 
661 		STR_FTEVENT_MBLK(mp, caller(), FTEV_FREEB, dbp->db_ref);
662 
663 		dbp->db_free(mp, dbp);
664 		mp = mp_cont;
665 	}
666 }
667 
668 /*
669  * Reallocate a block for another use.  Try hard to use the old block.
670  * If the old data is wanted (copy), leave b_wptr at the end of the data,
671  * otherwise return b_wptr = b_rptr.
672  *
673  * This routine is private and unstable.
674  */
675 mblk_t	*
676 reallocb(mblk_t *mp, size_t size, uint_t copy)
677 {
678 	mblk_t		*mp1;
679 	unsigned char	*old_rptr;
680 	ptrdiff_t	cur_size;
681 
682 	if (mp == NULL)
683 		return (allocb(size, BPRI_HI));
684 
685 	cur_size = mp->b_wptr - mp->b_rptr;
686 	old_rptr = mp->b_rptr;
687 
688 	ASSERT(mp->b_datap->db_ref != 0);
689 
690 	if (mp->b_datap->db_ref == 1 && MBLKSIZE(mp) >= size) {
691 		/*
692 		 * If the data is wanted and it will fit where it is, no
693 		 * work is required.
694 		 */
695 		if (copy && mp->b_datap->db_lim - mp->b_rptr >= size)
696 			return (mp);
697 
698 		mp->b_wptr = mp->b_rptr = mp->b_datap->db_base;
699 		mp1 = mp;
700 	} else if ((mp1 = allocb_tmpl(size, mp)) != NULL) {
701 		/* XXX other mp state could be copied too, db_flags ... ? */
702 		mp1->b_cont = mp->b_cont;
703 	} else {
704 		return (NULL);
705 	}
706 
707 	if (copy) {
708 		bcopy(old_rptr, mp1->b_rptr, cur_size);
709 		mp1->b_wptr = mp1->b_rptr + cur_size;
710 	}
711 
712 	if (mp != mp1)
713 		freeb(mp);
714 
715 	return (mp1);
716 }
717 
718 static void
719 dblk_lastfree(mblk_t *mp, dblk_t *dbp)
720 {
721 	ASSERT(dbp->db_mblk == mp);
722 	if (dbp->db_fthdr != NULL)
723 		str_ftfree(dbp);
724 
725 	/* set credp and projid to be 'unspecified' before returning to cache */
726 	if (dbp->db_credp != NULL) {
727 		crfree(dbp->db_credp);
728 		dbp->db_credp = NULL;
729 	}
730 	dbp->db_cpid = -1;
731 
732 	/* Reset the struioflag and the checksum flag fields */
733 	dbp->db_struioflag = 0;
734 	dbp->db_struioun.cksum.flags = 0;
735 
736 	/* and the COOKED and/or UIOA flag(s) */
737 	dbp->db_flags &= ~(DBLK_COOKED | DBLK_UIOA);
738 
739 	kmem_cache_free(dbp->db_cache, dbp);
740 }
741 
/*
 * Free routine that dupb() installs as db_free on a dblk.
 *
 * If db_ref is not 1, the reference count is decremented atomically inside
 * the RTFU word.  When the decremented count differs from the minimum count
 * encoded in the DBLK_REFMIN flag bit, other references remain and only the
 * mblk is returned to mblk_cache.  Otherwise (db_ref was already 1, or the
 * decrement reached the minimum) this is the last free: mp becomes the
 * dblk's attached mblk, db_free is restored from db_lastfree, and
 * db_lastfree is invoked to release the dblk.
 */
static void
dblk_decref(mblk_t *mp, dblk_t *dbp)
{
	if (dbp->db_ref != 1) {
		/* subtract one from the db_ref field of the RTFU word */
		uint32_t rtfu = atomic_add_32_nv(&DBLK_RTFU_WORD(dbp),
		    -(1 << DBLK_RTFU_SHIFT(db_ref)));
		/*
		 * atomic_add_32_nv() just decremented db_ref, so we no longer
		 * have a reference to the dblk, which means another thread
		 * could free it.  Therefore we cannot examine the dblk to
		 * determine whether ours was the last reference.  Instead,
		 * we extract the new and minimum reference counts from rtfu.
		 * Note that all we're really saying is "if (ref != refmin)".
		 */
		if (((rtfu >> DBLK_RTFU_SHIFT(db_ref)) & DBLK_REFMAX) !=
		    ((rtfu >> DBLK_RTFU_SHIFT(db_flags)) & DBLK_REFMIN)) {
			/* not the last reference: only the mblk goes away */
			kmem_cache_free(mblk_cache, mp);
			return;
		}
	}
	/* last reference: re-attach an mblk and run the original free method */
	dbp->db_mblk = mp;
	dbp->db_free = dbp->db_lastfree;
	dbp->db_lastfree(mp, dbp);
}
766 
/*
 * Create a new reference to the data of mp: allocate a new mblk that
 * shares mp's dblk, copy b_rptr, b_wptr and the band/flag word from mp,
 * and atomically increment the dblk's reference count.
 *
 * Returns the new mblk, or NULL if no mblk can be allocated (KM_NOSLEEP)
 * or if db_ref has already reached its maximum.
 */
mblk_t *
dupb(mblk_t *mp)
{
	dblk_t *dbp = mp->b_datap;
	mblk_t *new_mp;
	uint32_t oldrtfu, newrtfu;

	if ((new_mp = kmem_cache_alloc(mblk_cache, KM_NOSLEEP)) == NULL)
		goto out;

	new_mp->b_next = new_mp->b_prev = new_mp->b_cont = NULL;
	new_mp->b_rptr = mp->b_rptr;
	new_mp->b_wptr = mp->b_wptr;
	new_mp->b_datap = dbp;
	new_mp->b_queue = NULL;
	MBLK_BAND_FLAG_WORD(new_mp) = MBLK_BAND_FLAG_WORD(mp);

	STR_FTEVENT_MBLK(mp, caller(), FTEV_DUPB, dbp->db_ref);

	/*
	 * Switch the dblk to reference-counted freeing.  This is done before
	 * the count is raised, and it stays in effect even if the dup is
	 * refused below; dblk_decref() handles db_ref == 1 by running
	 * db_lastfree directly.
	 */
	dbp->db_free = dblk_decref;
	/* compare-and-swap loop: retry until the increment lands unraced */
	do {
		ASSERT(dbp->db_ref > 0);
		oldrtfu = DBLK_RTFU_WORD(dbp);
		newrtfu = oldrtfu + (1 << DBLK_RTFU_SHIFT(db_ref));
		/*
		 * If db_ref is maxed out we can't dup this message anymore.
		 */
		if ((oldrtfu & DBLK_RTFU_REF_MASK) == DBLK_RTFU_REF_MASK) {
			kmem_cache_free(mblk_cache, new_mp);
			new_mp = NULL;
			goto out;
		}
	} while (atomic_cas_32(&DBLK_RTFU_WORD(dbp), oldrtfu, newrtfu) !=
	    oldrtfu);

out:
	FTRACE_1("dupb(): new_mp=0x%lx", (uintptr_t)new_mp);
	return (new_mp);
}
806 
807 static void
808 dblk_lastfree_desb(mblk_t *mp, dblk_t *dbp)
809 {
810 	frtn_t *frp = dbp->db_frtnp;
811 
812 	ASSERT(dbp->db_mblk == mp);
813 	frp->free_func(frp->free_arg);
814 	if (dbp->db_fthdr != NULL)
815 		str_ftfree(dbp);
816 
817 	/* set credp and projid to be 'unspecified' before returning to cache */
818 	if (dbp->db_credp != NULL) {
819 		crfree(dbp->db_credp);
820 		dbp->db_credp = NULL;
821 	}
822 	dbp->db_cpid = -1;
823 	dbp->db_struioflag = 0;
824 	dbp->db_struioun.cksum.flags = 0;
825 
826 	kmem_cache_free(dbp->db_cache, dbp);
827 }
828 
/*
 * Free function of the global `frnop' descriptor: deliberately does
 * nothing with its argument.
 */
/*ARGSUSED*/
static void
frnop_func(void *arg)
{
	/* intentionally empty */
}
834 
835 /*
836  * Generic esballoc used to implement the four flavors: [d]esballoc[a].
837  *
838  * The variants with a 'd' prefix (desballoc, desballoca)
839  *	directly free the mblk when it loses its last ref,
840  *	where the other variants free asynchronously.
841  * The variants with an 'a' suffix (esballoca, desballoca)
842  *	add an extra ref, effectively letting the streams subsystem
843  *	know that the message data should not be modified.
844  *	(eg. see db_ref checks in reallocb and elsewhere)
845  *
846  * The method used by the 'a' suffix functions to keep the dblk
847  * db_ref > 1 is non-obvious.  The macro DBLK_RTFU(2,...) passed to
848  * gesballoc sets the initial db_ref = 2 and sets the DBLK_REFMIN
849  * bit in db_flags.  In dblk_decref() that flag essentially means
850  * the dblk has one extra ref, so the "last ref" is one, not zero.
851  */
852 static mblk_t *
853 gesballoc(unsigned char *base, size_t size, uint32_t db_rtfu, frtn_t *frp,
854     void (*lastfree)(mblk_t *, dblk_t *), int kmflags)
855 {
856 	dblk_t *dbp;
857 	mblk_t *mp;
858 
859 	ASSERT(base != NULL && frp != NULL);
860 
861 	if ((dbp = kmem_cache_alloc(dblk_esb_cache, kmflags)) == NULL) {
862 		mp = NULL;
863 		goto out;
864 	}
865 
866 	mp = dbp->db_mblk;
867 	dbp->db_base = base;
868 	dbp->db_lim = base + size;
869 	dbp->db_free = dbp->db_lastfree = lastfree;
870 	dbp->db_frtnp = frp;
871 	DBLK_RTFU_WORD(dbp) = db_rtfu;
872 	mp->b_next = mp->b_prev = mp->b_cont = NULL;
873 	mp->b_rptr = mp->b_wptr = base;
874 	mp->b_queue = NULL;
875 	MBLK_BAND_FLAG_WORD(mp) = 0;
876 
877 out:
878 	FTRACE_1("gesballoc(): mp=0x%lx", (uintptr_t)mp);
879 	return (mp);
880 }
881 
882 /*ARGSUSED*/
883 mblk_t *
884 esballoc(unsigned char *base, size_t size, uint_t pri, frtn_t *frp)
885 {
886 	mblk_t *mp;
887 
888 	/*
889 	 * Note that this is structured to allow the common case (i.e.
890 	 * STREAMS flowtracing disabled) to call gesballoc() with tail
891 	 * call optimization.
892 	 */
893 	if (!str_ftnever) {
894 		mp = gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
895 		    frp, freebs_enqueue, KM_NOSLEEP);
896 
897 		if (mp != NULL)
898 			STR_FTALLOC(&DB_FTHDR(mp), FTEV_ESBALLOC, size);
899 		return (mp);
900 	}
901 
902 	return (gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
903 	    frp, freebs_enqueue, KM_NOSLEEP));
904 }
905 
906 /*
907  * Same as esballoc() but sleeps waiting for memory.
908  */
909 /*ARGSUSED*/
910 mblk_t *
911 esballoc_wait(unsigned char *base, size_t size, uint_t pri, frtn_t *frp)
912 {
913 	mblk_t *mp;
914 
915 	/*
916 	 * Note that this is structured to allow the common case (i.e.
917 	 * STREAMS flowtracing disabled) to call gesballoc() with tail
918 	 * call optimization.
919 	 */
920 	if (!str_ftnever) {
921 		mp = gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
922 		    frp, freebs_enqueue, KM_SLEEP);
923 
924 		STR_FTALLOC(&DB_FTHDR(mp), FTEV_ESBALLOC, size);
925 		return (mp);
926 	}
927 
928 	return (gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
929 	    frp, freebs_enqueue, KM_SLEEP));
930 }
931 
932 /*ARGSUSED*/
933 mblk_t *
934 desballoc(unsigned char *base, size_t size, uint_t pri, frtn_t *frp)
935 {
936 	mblk_t *mp;
937 
938 	/*
939 	 * Note that this is structured to allow the common case (i.e.
940 	 * STREAMS flowtracing disabled) to call gesballoc() with tail
941 	 * call optimization.
942 	 */
943 	if (!str_ftnever) {
944 		mp = gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
945 		    frp, dblk_lastfree_desb, KM_NOSLEEP);
946 
947 		if (mp != NULL)
948 			STR_FTALLOC(&DB_FTHDR(mp), FTEV_DESBALLOC, size);
949 		return (mp);
950 	}
951 
952 	return (gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
953 	    frp, dblk_lastfree_desb, KM_NOSLEEP));
954 }
955 
956 /*ARGSUSED*/
957 mblk_t *
958 esballoca(unsigned char *base, size_t size, uint_t pri, frtn_t *frp)
959 {
960 	mblk_t *mp;
961 
962 	/*
963 	 * Note that this is structured to allow the common case (i.e.
964 	 * STREAMS flowtracing disabled) to call gesballoc() with tail
965 	 * call optimization.
966 	 */
967 	if (!str_ftnever) {
968 		mp = gesballoc(base, size, DBLK_RTFU(2, M_DATA, 0, 0),
969 		    frp, freebs_enqueue, KM_NOSLEEP);
970 
971 		if (mp != NULL)
972 			STR_FTALLOC(&DB_FTHDR(mp), FTEV_ESBALLOCA, size);
973 		return (mp);
974 	}
975 
976 	return (gesballoc(base, size, DBLK_RTFU(2, M_DATA, 0, 0),
977 	    frp, freebs_enqueue, KM_NOSLEEP));
978 }
979 
980 /*ARGSUSED*/
981 mblk_t *
982 desballoca(unsigned char *base, size_t size, uint_t pri, frtn_t *frp)
983 {
984 	mblk_t *mp;
985 
986 	/*
987 	 * Note that this is structured to allow the common case (i.e.
988 	 * STREAMS flowtracing disabled) to call gesballoc() with tail
989 	 * call optimization.
990 	 */
991 	if (!str_ftnever) {
992 		mp = gesballoc(base, size, DBLK_RTFU(2, M_DATA, 0, 0),
993 		    frp, dblk_lastfree_desb, KM_NOSLEEP);
994 
995 		if (mp != NULL)
996 			STR_FTALLOC(&DB_FTHDR(mp), FTEV_DESBALLOCA, size);
997 		return (mp);
998 	}
999 
1000 	return (gesballoc(base, size, DBLK_RTFU(2, M_DATA, 0, 0),
1001 	    frp, dblk_lastfree_desb, KM_NOSLEEP));
1002 }
1003 
1004 static void
1005 bcache_dblk_lastfree(mblk_t *mp, dblk_t *dbp)
1006 {
1007 	bcache_t *bcp = dbp->db_cache;
1008 
1009 	ASSERT(dbp->db_mblk == mp);
1010 	if (dbp->db_fthdr != NULL)
1011 		str_ftfree(dbp);
1012 
1013 	/* set credp and projid to be 'unspecified' before returning to cache */
1014 	if (dbp->db_credp != NULL) {
1015 		crfree(dbp->db_credp);
1016 		dbp->db_credp = NULL;
1017 	}
1018 	dbp->db_cpid = -1;
1019 	dbp->db_struioflag = 0;
1020 	dbp->db_struioun.cksum.flags = 0;
1021 
1022 	mutex_enter(&bcp->mutex);
1023 	kmem_cache_free(bcp->dblk_cache, dbp);
1024 	bcp->alloc--;
1025 
1026 	if (bcp->alloc == 0 && bcp->destroy != 0) {
1027 		kmem_cache_destroy(bcp->dblk_cache);
1028 		kmem_cache_destroy(bcp->buffer_cache);
1029 		mutex_exit(&bcp->mutex);
1030 		mutex_destroy(&bcp->mutex);
1031 		kmem_free(bcp, sizeof (bcache_t));
1032 	} else {
1033 		mutex_exit(&bcp->mutex);
1034 	}
1035 }
1036 
1037 bcache_t *
1038 bcache_create(char *name, size_t size, uint_t align)
1039 {
1040 	bcache_t *bcp;
1041 	char buffer[255];
1042 
1043 	ASSERT((align & (align - 1)) == 0);
1044 
1045 	if ((bcp = kmem_alloc(sizeof (bcache_t), KM_NOSLEEP)) == NULL)
1046 		return (NULL);
1047 
1048 	bcp->size = size;
1049 	bcp->align = align;
1050 	bcp->alloc = 0;
1051 	bcp->destroy = 0;
1052 
1053 	mutex_init(&bcp->mutex, NULL, MUTEX_DRIVER, NULL);
1054 
1055 	(void) sprintf(buffer, "%s_buffer_cache", name);
1056 	bcp->buffer_cache = kmem_cache_create(buffer, size, align, NULL, NULL,
1057 	    NULL, NULL, NULL, 0);
1058 	(void) sprintf(buffer, "%s_dblk_cache", name);
1059 	bcp->dblk_cache = kmem_cache_create(buffer, sizeof (dblk_t),
1060 	    DBLK_CACHE_ALIGN, bcache_dblk_constructor, bcache_dblk_destructor,
1061 	    NULL, (void *)bcp, NULL, 0);
1062 
1063 	return (bcp);
1064 }
1065 
1066 void
1067 bcache_destroy(bcache_t *bcp)
1068 {
1069 	ASSERT(bcp != NULL);
1070 
1071 	mutex_enter(&bcp->mutex);
1072 	if (bcp->alloc == 0) {
1073 		kmem_cache_destroy(bcp->dblk_cache);
1074 		kmem_cache_destroy(bcp->buffer_cache);
1075 		mutex_exit(&bcp->mutex);
1076 		mutex_destroy(&bcp->mutex);
1077 		kmem_free(bcp, sizeof (bcache_t));
1078 	} else {
1079 		bcp->destroy++;
1080 		mutex_exit(&bcp->mutex);
1081 	}
1082 }
1083 
1084 /*ARGSUSED*/
1085 mblk_t *
1086 bcache_allocb(bcache_t *bcp, uint_t pri)
1087 {
1088 	dblk_t *dbp;
1089 	mblk_t *mp = NULL;
1090 
1091 	ASSERT(bcp != NULL);
1092 
1093 	mutex_enter(&bcp->mutex);
1094 	if (bcp->destroy != 0) {
1095 		mutex_exit(&bcp->mutex);
1096 		goto out;
1097 	}
1098 
1099 	if ((dbp = kmem_cache_alloc(bcp->dblk_cache, KM_NOSLEEP)) == NULL) {
1100 		mutex_exit(&bcp->mutex);
1101 		goto out;
1102 	}
1103 	bcp->alloc++;
1104 	mutex_exit(&bcp->mutex);
1105 
1106 	ASSERT(((uintptr_t)(dbp->db_base) & (bcp->align - 1)) == 0);
1107 
1108 	mp = dbp->db_mblk;
1109 	DBLK_RTFU_WORD(dbp) = DBLK_RTFU(1, M_DATA, 0, 0);
1110 	mp->b_next = mp->b_prev = mp->b_cont = NULL;
1111 	mp->b_rptr = mp->b_wptr = dbp->db_base;
1112 	mp->b_queue = NULL;
1113 	MBLK_BAND_FLAG_WORD(mp) = 0;
1114 	STR_FTALLOC(&dbp->db_fthdr, FTEV_BCALLOCB, bcp->size);
1115 out:
1116 	FTRACE_1("bcache_allocb(): mp=0x%p", (uintptr_t)mp);
1117 
1118 	return (mp);
1119 }
1120 
1121 static void
1122 dblk_lastfree_oversize(mblk_t *mp, dblk_t *dbp)
1123 {
1124 	ASSERT(dbp->db_mblk == mp);
1125 	if (dbp->db_fthdr != NULL)
1126 		str_ftfree(dbp);
1127 
1128 	/* set credp and projid to be 'unspecified' before returning to cache */
1129 	if (dbp->db_credp != NULL) {
1130 		crfree(dbp->db_credp);
1131 		dbp->db_credp = NULL;
1132 	}
1133 	dbp->db_cpid = -1;
1134 	dbp->db_struioflag = 0;
1135 	dbp->db_struioun.cksum.flags = 0;
1136 
1137 	kmem_free(dbp->db_base, dbp->db_lim - dbp->db_base);
1138 	kmem_cache_free(dbp->db_cache, dbp);
1139 }
1140 
1141 static mblk_t *
1142 allocb_oversize(size_t size, int kmflags)
1143 {
1144 	mblk_t *mp;
1145 	void *buf;
1146 
1147 	size = P2ROUNDUP(size, DBLK_CACHE_ALIGN);
1148 	if ((buf = kmem_alloc(size, kmflags)) == NULL)
1149 		return (NULL);
1150 	if ((mp = gesballoc(buf, size, DBLK_RTFU(1, M_DATA, 0, 0),
1151 	    &frnop, dblk_lastfree_oversize, kmflags)) == NULL)
1152 		kmem_free(buf, size);
1153 
1154 	if (mp != NULL)
1155 		STR_FTALLOC(&DB_FTHDR(mp), FTEV_ALLOCBIG, size);
1156 
1157 	return (mp);
1158 }
1159 
1160 mblk_t *
1161 allocb_tryhard(size_t target_size)
1162 {
1163 	size_t size;
1164 	mblk_t *bp;
1165 
1166 	for (size = target_size; size < target_size + 512;
1167 	    size += DBLK_CACHE_ALIGN)
1168 		if ((bp = allocb(size, BPRI_HI)) != NULL)
1169 			return (bp);
1170 	allocb_tryhard_fails++;
1171 	return (NULL);
1172 }
1173 
1174 /*
1175  * This routine is consolidation private for STREAMS internal use
1176  * This routine may only be called from sync routines (i.e., not
1177  * from put or service procedures).  It is located here (rather
1178  * than strsubr.c) so that we don't have to expose all of the
1179  * allocb() implementation details in header files.
1180  */
1181 mblk_t *
1182 allocb_wait(size_t size, uint_t pri, uint_t flags, int *error)
1183 {
1184 	dblk_t *dbp;
1185 	mblk_t *mp;
1186 	size_t index;
1187 
1188 	index = (size -1) >> DBLK_SIZE_SHIFT;
1189 
1190 	if (flags & STR_NOSIG) {
1191 		if (index >= (DBLK_MAX_CACHE >> DBLK_SIZE_SHIFT)) {
1192 			if (size != 0) {
1193 				mp = allocb_oversize(size, KM_SLEEP);
1194 				FTRACE_1("allocb_wait (NOSIG): mp=0x%lx",
1195 				    (uintptr_t)mp);
1196 				return (mp);
1197 			}
1198 			index = 0;
1199 		}
1200 
1201 		dbp = kmem_cache_alloc(dblk_cache[index], KM_SLEEP);
1202 		mp = dbp->db_mblk;
1203 		DBLK_RTFU_WORD(dbp) = DBLK_RTFU(1, M_DATA, 0, 0);
1204 		mp->b_next = mp->b_prev = mp->b_cont = NULL;
1205 		mp->b_rptr = mp->b_wptr = dbp->db_base;
1206 		mp->b_queue = NULL;
1207 		MBLK_BAND_FLAG_WORD(mp) = 0;
1208 		STR_FTALLOC(&DB_FTHDR(mp), FTEV_ALLOCBW, size);
1209 
1210 		FTRACE_1("allocb_wait (NOSIG): mp=0x%lx", (uintptr_t)mp);
1211 
1212 	} else {
1213 		while ((mp = allocb(size, pri)) == NULL) {
1214 			if ((*error = strwaitbuf(size, BPRI_HI)) != 0)
1215 				return (NULL);
1216 		}
1217 	}
1218 
1219 	return (mp);
1220 }
1221 
1222 /*
1223  * Call function 'func' with 'arg' when a class zero block can
1224  * be allocated with priority 'pri'.
1225  */
1226 bufcall_id_t
1227 esbbcall(uint_t pri, void (*func)(void *), void *arg)
1228 {
1229 	return (bufcall(1, pri, func, arg));
1230 }
1231 
1232 /*
1233  * Allocates an iocblk (M_IOCTL) block. Properly sets the credentials
1234  * ioc_id, rval and error of the struct ioctl to set up an ioctl call.
1235  * This provides consistency for all internal allocators of ioctl.
1236  */
1237 mblk_t *
1238 mkiocb(uint_t cmd)
1239 {
1240 	struct iocblk	*ioc;
1241 	mblk_t		*mp;
1242 
1243 	/*
1244 	 * Allocate enough space for any of the ioctl related messages.
1245 	 */
1246 	if ((mp = allocb(sizeof (union ioctypes), BPRI_MED)) == NULL)
1247 		return (NULL);
1248 
1249 	bzero(mp->b_rptr, sizeof (union ioctypes));
1250 
1251 	/*
1252 	 * Set the mblk_t information and ptrs correctly.
1253 	 */
1254 	mp->b_wptr += sizeof (struct iocblk);
1255 	mp->b_datap->db_type = M_IOCTL;
1256 
1257 	/*
1258 	 * Fill in the fields.
1259 	 */
1260 	ioc		= (struct iocblk *)mp->b_rptr;
1261 	ioc->ioc_cmd	= cmd;
1262 	ioc->ioc_cr	= kcred;
1263 	ioc->ioc_id	= getiocseqno();
1264 	ioc->ioc_flag	= IOC_NATIVE;
1265 	return (mp);
1266 }
1267 
1268 /*
1269  * test if block of given size can be allocated with a request of
1270  * the given priority.
1271  * 'pri' is no longer used, but is retained for compatibility.
1272  */
1273 /* ARGSUSED */
1274 int
1275 testb(size_t size, uint_t pri)
1276 {
1277 	return ((size + sizeof (dblk_t)) <= kmem_avail());
1278 }
1279 
1280 /*
1281  * Call function 'func' with argument 'arg' when there is a reasonably
1282  * good chance that a block of size 'size' can be allocated.
1283  * 'pri' is no longer used, but is retained for compatibility.
1284  */
1285 /* ARGSUSED */
1286 bufcall_id_t
1287 bufcall(size_t size, uint_t pri, void (*func)(void *), void *arg)
1288 {
1289 	static long bid = 1;	/* always odd to save checking for zero */
1290 	bufcall_id_t bc_id;
1291 	struct strbufcall *bcp;
1292 
1293 	if ((bcp = kmem_alloc(sizeof (strbufcall_t), KM_NOSLEEP)) == NULL)
1294 		return (0);
1295 
1296 	bcp->bc_func = func;
1297 	bcp->bc_arg = arg;
1298 	bcp->bc_size = size;
1299 	bcp->bc_next = NULL;
1300 	bcp->bc_executor = NULL;
1301 
1302 	mutex_enter(&strbcall_lock);
1303 	/*
1304 	 * After bcp is linked into strbcalls and strbcall_lock is dropped there
1305 	 * should be no references to bcp since it may be freed by
1306 	 * runbufcalls(). Since bcp_id field is returned, we save its value in
1307 	 * the local var.
1308 	 */
1309 	bc_id = bcp->bc_id = (bufcall_id_t)(bid += 2);	/* keep it odd */
1310 
1311 	/*
1312 	 * add newly allocated stream event to existing
1313 	 * linked list of events.
1314 	 */
1315 	if (strbcalls.bc_head == NULL) {
1316 		strbcalls.bc_head = strbcalls.bc_tail = bcp;
1317 	} else {
1318 		strbcalls.bc_tail->bc_next = bcp;
1319 		strbcalls.bc_tail = bcp;
1320 	}
1321 
1322 	cv_signal(&strbcall_cv);
1323 	mutex_exit(&strbcall_lock);
1324 	return (bc_id);
1325 }
1326 
1327 /*
1328  * Cancel a bufcall request.
1329  */
1330 void
1331 unbufcall(bufcall_id_t id)
1332 {
1333 	strbufcall_t *bcp, *pbcp;
1334 
1335 	mutex_enter(&strbcall_lock);
1336 again:
1337 	pbcp = NULL;
1338 	for (bcp = strbcalls.bc_head; bcp; bcp = bcp->bc_next) {
1339 		if (id == bcp->bc_id)
1340 			break;
1341 		pbcp = bcp;
1342 	}
1343 	if (bcp) {
1344 		if (bcp->bc_executor != NULL) {
1345 			if (bcp->bc_executor != curthread) {
1346 				cv_wait(&bcall_cv, &strbcall_lock);
1347 				goto again;
1348 			}
1349 		} else {
1350 			if (pbcp)
1351 				pbcp->bc_next = bcp->bc_next;
1352 			else
1353 				strbcalls.bc_head = bcp->bc_next;
1354 			if (bcp == strbcalls.bc_tail)
1355 				strbcalls.bc_tail = pbcp;
1356 			kmem_free(bcp, sizeof (strbufcall_t));
1357 		}
1358 	}
1359 	mutex_exit(&strbcall_lock);
1360 }
1361 
1362 /*
1363  * Duplicate a message block by block (uses dupb), returning
1364  * a pointer to the duplicate message.
1365  * Returns a non-NULL value only if the entire message
1366  * was dup'd.
1367  */
1368 mblk_t *
1369 dupmsg(mblk_t *bp)
1370 {
1371 	mblk_t *head, *nbp;
1372 
1373 	if (!bp || !(nbp = head = dupb(bp)))
1374 		return (NULL);
1375 
1376 	while (bp->b_cont) {
1377 		if (!(nbp->b_cont = dupb(bp->b_cont))) {
1378 			freemsg(head);
1379 			return (NULL);
1380 		}
1381 		nbp = nbp->b_cont;
1382 		bp = bp->b_cont;
1383 	}
1384 	return (head);
1385 }
1386 
1387 #define	DUPB_NOLOAN(bp) \
1388 	((((bp)->b_datap->db_struioflag & STRUIO_ZC) != 0) ? \
1389 	copyb((bp)) : dupb((bp)))
1390 
1391 mblk_t *
1392 dupmsg_noloan(mblk_t *bp)
1393 {
1394 	mblk_t *head, *nbp;
1395 
1396 	if (bp == NULL || DB_TYPE(bp) != M_DATA ||
1397 	    ((nbp = head = DUPB_NOLOAN(bp)) == NULL))
1398 		return (NULL);
1399 
1400 	while (bp->b_cont) {
1401 		if ((nbp->b_cont = DUPB_NOLOAN(bp->b_cont)) == NULL) {
1402 			freemsg(head);
1403 			return (NULL);
1404 		}
1405 		nbp = nbp->b_cont;
1406 		bp = bp->b_cont;
1407 	}
1408 	return (head);
1409 }
1410 
1411 /*
1412  * Copy data from message and data block to newly allocated message and
1413  * data block. Returns new message block pointer, or NULL if error.
1414  * The alignment of rptr (w.r.t. word alignment) will be the same in the copy
1415  * as in the original even when db_base is not word aligned. (bug 1052877)
1416  */
1417 mblk_t *
1418 copyb(mblk_t *bp)
1419 {
1420 	mblk_t	*nbp;
1421 	dblk_t	*dp, *ndp;
1422 	uchar_t *base;
1423 	size_t	size;
1424 	size_t	unaligned;
1425 
1426 	ASSERT(bp->b_wptr >= bp->b_rptr);
1427 
1428 	dp = bp->b_datap;
1429 	if (dp->db_fthdr != NULL)
1430 		STR_FTEVENT_MBLK(bp, caller(), FTEV_COPYB, 0);
1431 
1432 	size = dp->db_lim - dp->db_base;
1433 	unaligned = P2PHASE((uintptr_t)dp->db_base, sizeof (uint_t));
1434 	if ((nbp = allocb_tmpl(size + unaligned, bp)) == NULL)
1435 		return (NULL);
1436 	nbp->b_flag = bp->b_flag;
1437 	nbp->b_band = bp->b_band;
1438 	ndp = nbp->b_datap;
1439 
1440 	/*
1441 	 * Copy the various checksum information that came in
1442 	 * originally.
1443 	 */
1444 	ndp->db_cksumstart = dp->db_cksumstart;
1445 	ndp->db_cksumend = dp->db_cksumend;
1446 	ndp->db_cksumstuff = dp->db_cksumstuff;
1447 	bcopy(dp->db_struioun.data, ndp->db_struioun.data,
1448 	    sizeof (dp->db_struioun.data));
1449 
1450 	/*
1451 	 * Well, here is a potential issue.  If we are trying to
1452 	 * trace a flow, and we copy the message, we might lose
1453 	 * information about where this message might have been.
1454 	 * So we should inherit the FT data.  On the other hand,
1455 	 * a user might be interested only in alloc to free data.
1456 	 * So I guess the real answer is to provide a tunable.
1457 	 */
1458 	STR_FTEVENT_MBLK(nbp, caller(), FTEV_COPYB, 1);
1459 
1460 	base = ndp->db_base + unaligned;
1461 	bcopy(dp->db_base, ndp->db_base + unaligned, size);
1462 
1463 	nbp->b_rptr = base + (bp->b_rptr - dp->db_base);
1464 	nbp->b_wptr = nbp->b_rptr + MBLKL(bp);
1465 
1466 	return (nbp);
1467 }
1468 
1469 /*
1470  * Copy data from message to newly allocated message using new
1471  * data blocks.  Returns a pointer to the new message, or NULL if error.
1472  */
1473 mblk_t *
1474 copymsg(mblk_t *bp)
1475 {
1476 	mblk_t *head, *nbp;
1477 
1478 	if (!bp || !(nbp = head = copyb(bp)))
1479 		return (NULL);
1480 
1481 	while (bp->b_cont) {
1482 		if (!(nbp->b_cont = copyb(bp->b_cont))) {
1483 			freemsg(head);
1484 			return (NULL);
1485 		}
1486 		nbp = nbp->b_cont;
1487 		bp = bp->b_cont;
1488 	}
1489 	return (head);
1490 }
1491 
1492 /*
1493  * link a message block to tail of message
1494  */
1495 void
1496 linkb(mblk_t *mp, mblk_t *bp)
1497 {
1498 	ASSERT(mp && bp);
1499 
1500 	for (; mp->b_cont; mp = mp->b_cont)
1501 		;
1502 	mp->b_cont = bp;
1503 }
1504 
1505 /*
1506  * unlink a message block from head of message
1507  * return pointer to new message.
1508  * NULL if message becomes empty.
1509  */
1510 mblk_t *
1511 unlinkb(mblk_t *bp)
1512 {
1513 	mblk_t *bp1;
1514 
1515 	bp1 = bp->b_cont;
1516 	bp->b_cont = NULL;
1517 	return (bp1);
1518 }
1519 
1520 /*
1521  * remove a message block "bp" from message "mp"
1522  *
1523  * Return pointer to new message or NULL if no message remains.
1524  * Return -1 if bp is not found in message.
1525  */
1526 mblk_t *
1527 rmvb(mblk_t *mp, mblk_t *bp)
1528 {
1529 	mblk_t *tmp;
1530 	mblk_t *lastp = NULL;
1531 
1532 	ASSERT(mp && bp);
1533 	for (tmp = mp; tmp; tmp = tmp->b_cont) {
1534 		if (tmp == bp) {
1535 			if (lastp)
1536 				lastp->b_cont = tmp->b_cont;
1537 			else
1538 				mp = tmp->b_cont;
1539 			tmp->b_cont = NULL;
1540 			return (mp);
1541 		}
1542 		lastp = tmp;
1543 	}
1544 	return ((mblk_t *)-1);
1545 }
1546 
/*
 * Concatenate and align the first len bytes of common
 * message type.  Len == -1 means concatenate everything.
 * Returns 1 on success, 0 on failure (fewer than len bytes
 * are available according to xmsgsize(), or the new block
 * could not be allocated).
 * After the pullup, mp points to the pulled up data.
 *
 * The caller's mblk 'mp' is reused as the head of the result, so
 * existing pointers to the message stay valid; mp must not be linked
 * onto a queue (b_next and b_prev are asserted to be NULL).
 */
int
pullupmsg(mblk_t *mp, ssize_t len)
{
	mblk_t *bp, *b_cont;
	dblk_t *dbp;
	ssize_t n;

	ASSERT(mp->b_datap->db_ref > 0);
	ASSERT(mp->b_next == NULL && mp->b_prev == NULL);

	if (len == -1) {
		/* A single, already aligned block needs no work. */
		if (mp->b_cont == NULL && str_aligned(mp->b_rptr))
			return (1);
		len = xmsgsize(mp);
	} else {
		ssize_t first_mblk_len = mp->b_wptr - mp->b_rptr;
		ASSERT(first_mblk_len >= 0);
		/*
		 * If the length is less than that of the first mblk,
		 * we want to pull up the message into an aligned mblk.
		 * Though not part of the spec, some callers assume it.
		 */
		if (len <= first_mblk_len) {
			if (str_aligned(mp->b_rptr))
				return (1);
			len = first_mblk_len;
		} else if (xmsgsize(mp) < len)
			return (0);
	}

	if ((bp = allocb_tmpl(len, mp)) == NULL)
		return (0);

	/*
	 * Exchange the roles of the two mblks: bp takes over the old
	 * contents of mp (including its dblk and b_cont chain) while mp
	 * takes the freshly allocated dblk.  The db_mblk back pointers of
	 * both dblks are fixed up to match.
	 */
	dbp = bp->b_datap;
	*bp = *mp;		/* swap mblks so bp heads the old msg... */
	mp->b_datap = dbp;	/* ... and mp heads the new message */
	mp->b_datap->db_mblk = mp;
	bp->b_datap->db_mblk = bp;
	mp->b_rptr = mp->b_wptr = dbp->db_base;

	/*
	 * Copy up to len bytes out of the old chain into the new block.
	 * Blocks that are completely consumed are freed; the loop stops
	 * early at the first block that still has data left in it.
	 */
	do {
		ASSERT(bp->b_datap->db_ref > 0);
		ASSERT(bp->b_wptr >= bp->b_rptr);
		n = MIN(bp->b_wptr - bp->b_rptr, len);
		ASSERT(n >= 0);		/* allow zero-length mblk_t's */
		if (n > 0)
			bcopy(bp->b_rptr, mp->b_wptr, (size_t)n);
		mp->b_wptr += n;
		bp->b_rptr += n;
		len -= n;
		if (bp->b_rptr != bp->b_wptr)
			break;
		b_cont = bp->b_cont;
		freeb(bp);
		bp = b_cont;
	} while (len && bp);

	mp->b_cont = bp;	/* tack on whatever wasn't pulled up */

	return (1);
}
1614 
1615 /*
1616  * Concatenate and align at least the first len bytes of common message
1617  * type.  Len == -1 means concatenate everything.  The original message is
1618  * unaltered.  Returns a pointer to a new message on success, otherwise
1619  * returns NULL.
1620  */
1621 mblk_t *
1622 msgpullup(mblk_t *mp, ssize_t len)
1623 {
1624 	mblk_t	*newmp;
1625 	ssize_t	totlen;
1626 	ssize_t	n;
1627 
1628 	totlen = xmsgsize(mp);
1629 
1630 	if ((len > 0) && (len > totlen))
1631 		return (NULL);
1632 
1633 	/*
1634 	 * Copy all of the first msg type into one new mblk, then dupmsg
1635 	 * and link the rest onto this.
1636 	 */
1637 
1638 	len = totlen;
1639 
1640 	if ((newmp = allocb_tmpl(len, mp)) == NULL)
1641 		return (NULL);
1642 
1643 	newmp->b_flag = mp->b_flag;
1644 	newmp->b_band = mp->b_band;
1645 
1646 	while (len > 0) {
1647 		n = mp->b_wptr - mp->b_rptr;
1648 		ASSERT(n >= 0);		/* allow zero-length mblk_t's */
1649 		if (n > 0)
1650 			bcopy(mp->b_rptr, newmp->b_wptr, n);
1651 		newmp->b_wptr += n;
1652 		len -= n;
1653 		mp = mp->b_cont;
1654 	}
1655 
1656 	if (mp != NULL) {
1657 		newmp->b_cont = dupmsg(mp);
1658 		if (newmp->b_cont == NULL) {
1659 			freemsg(newmp);
1660 			return (NULL);
1661 		}
1662 	}
1663 
1664 	return (newmp);
1665 }
1666 
/*
 * Trim bytes from message
 *  len > 0, trim from head
 *  len < 0, trim from tail
 * Returns 1 on success, 0 on failure.
 *
 * Failure means that xmsgsize(mp) is smaller than the number of bytes
 * to trim; the message is not modified in that case.  Blocks that
 * become empty are freed, except for the first block of the message,
 * which is always kept so that the caller's 'mp' stays valid.
 */
int
adjmsg(mblk_t *mp, ssize_t len)
{
	mblk_t *bp;
	mblk_t *save_bp = NULL;
	mblk_t *prev_bp;
	mblk_t *bcont;
	unsigned char type;
	ssize_t n;
	int fromhead;
	int first;

	ASSERT(mp != NULL);

	/* From here on len is the (non-negative) byte count to trim. */
	if (len < 0) {
		fromhead = 0;
		len = -len;
	} else {
		fromhead = 1;
	}

	if (xmsgsize(mp) < len)
		return (0);

	if (fromhead) {
		first = 1;
		while (len) {
			ASSERT(mp->b_wptr >= mp->b_rptr);
			n = MIN(mp->b_wptr - mp->b_rptr, len);
			mp->b_rptr += n;
			len -= n;

			/*
			 * If this is not the first zero length
			 * message remove it
			 */
			if (!first && (mp->b_wptr == mp->b_rptr)) {
				/*
				 * save_bp is the last block that was kept;
				 * it was set on an earlier iteration since
				 * this is not the first block.
				 */
				bcont = mp->b_cont;
				freeb(mp);
				mp = save_bp->b_cont = bcont;
			} else {
				save_bp = mp;
				mp = mp->b_cont;
			}
			first = 0;
		}
	} else {
		type = mp->b_datap->db_type;
		/*
		 * Each pass rescans from the head to find the current last
		 * block of the leading type, then trims (and possibly
		 * removes) that block.
		 */
		while (len) {
			bp = mp;
			save_bp = NULL;

			/*
			 * Find the last message of same type
			 */
			while (bp && bp->b_datap->db_type == type) {
				ASSERT(bp->b_wptr >= bp->b_rptr);
				prev_bp = save_bp;
				save_bp = bp;
				bp = bp->b_cont;
			}
			if (save_bp == NULL)
				break;
			n = MIN(save_bp->b_wptr - save_bp->b_rptr, len);
			save_bp->b_wptr -= n;
			len -= n;

			/*
			 * If this is not the first message
			 * and we have taken away everything
			 * from this message, remove it
			 */

			if ((save_bp != mp) &&
			    (save_bp->b_wptr == save_bp->b_rptr)) {
				/*
				 * save_bp != mp means the scan above ran at
				 * least twice, so prev_bp is the block just
				 * before save_bp.
				 */
				bcont = save_bp->b_cont;
				freeb(save_bp);
				prev_bp->b_cont = bcont;
			}
		}
	}
	return (1);
}
1756 
1757 /*
1758  * get number of data bytes in message
1759  */
1760 size_t
1761 msgdsize(mblk_t *bp)
1762 {
1763 	size_t count = 0;
1764 
1765 	for (; bp; bp = bp->b_cont)
1766 		if (bp->b_datap->db_type == M_DATA) {
1767 			ASSERT(bp->b_wptr >= bp->b_rptr);
1768 			count += bp->b_wptr - bp->b_rptr;
1769 		}
1770 	return (count);
1771 }
1772 
1773 /*
1774  * Get a message off head of queue
1775  *
1776  * If queue has no buffers then mark queue
1777  * with QWANTR. (queue wants to be read by
1778  * someone when data becomes available)
1779  *
1780  * If there is something to take off then do so.
1781  * If queue falls below hi water mark turn off QFULL
1782  * flag.  Decrement weighted count of queue.
1783  * Also turn off QWANTR because queue is being read.
1784  *
1785  * The queue count is maintained on a per-band basis.
1786  * Priority band 0 (normal messages) uses q_count,
1787  * q_lowat, etc.  Non-zero priority bands use the
1788  * fields in their respective qband structures
1789  * (qb_count, qb_lowat, etc.)  All messages appear
1790  * on the same list, linked via their b_next pointers.
1791  * q_first is the head of the list.  q_count does
1792  * not reflect the size of all the messages on the
1793  * queue.  It only reflects those messages in the
1794  * normal band of flow.  The one exception to this
1795  * deals with high priority messages.  They are in
1796  * their own conceptual "band", but are accounted
1797  * against q_count.
1798  *
1799  * If queue count is below the lo water mark and QWANTW
1800  * is set, enable the closest backq which has a service
1801  * procedure and turn off the QWANTW flag.
1802  *
1803  * getq could be built on top of rmvq, but isn't because
1804  * of performance considerations.
1805  *
1806  * A note on the use of q_count and q_mblkcnt:
1807  *   q_count is the traditional byte count for messages that
1808  *   have been put on a queue.  Documentation tells us that
1809  *   we shouldn't rely on that count, but some drivers/modules
1810  *   do.  What was needed, however, is a mechanism to prevent
1811  *   runaway streams from consuming all of the resources,
1812  *   and particularly be able to flow control zero-length
1813  *   messages.  q_mblkcnt is used for this purpose.  It
1814  *   counts the number of mblk's that are being put on
1815  *   the queue.  The intention here, is that each mblk should
1816  *   contain one byte of data and, for the purpose of
1817  *   flow-control, logically does.  A queue will become
1818  *   full when EITHER of these values (q_count and q_mblkcnt)
1819  *   reach the highwater mark.  It will clear when BOTH
1820  *   of them drop below the highwater mark.  And it will
1821  *   backenable when BOTH of them drop below the lowwater
1822  *   mark.
1823  *   With this algorithm, a driver/module might be able
1824  *   to find a reasonably accurate q_count, and the
1825  *   framework can still try and limit resource usage.
1826  */
1827 mblk_t *
1828 getq(queue_t *q)
1829 {
1830 	mblk_t *bp;
1831 	uchar_t band = 0;
1832 
1833 	bp = getq_noenab(q, 0);
1834 	if (bp != NULL)
1835 		band = bp->b_band;
1836 
1837 	/*
1838 	 * Inlined from qbackenable().
1839 	 * Quick check without holding the lock.
1840 	 */
1841 	if (band == 0 && (q->q_flag & (QWANTW|QWANTWSYNC)) == 0)
1842 		return (bp);
1843 
1844 	qbackenable(q, band);
1845 	return (bp);
1846 }
1847 
1848 /*
1849  * Returns the number of bytes in a message (a message is defined as a
1850  * chain of mblks linked by b_cont). If a non-NULL mblkcnt is supplied we
1851  * also return the number of distinct mblks in the message.
1852  */
1853 int
1854 mp_cont_len(mblk_t *bp, int *mblkcnt)
1855 {
1856 	mblk_t	*mp;
1857 	int	mblks = 0;
1858 	int	bytes = 0;
1859 
1860 	for (mp = bp; mp != NULL; mp = mp->b_cont) {
1861 		bytes += MBLKL(mp);
1862 		mblks++;
1863 	}
1864 
1865 	if (mblkcnt != NULL)
1866 		*mblkcnt = mblks;
1867 
1868 	return (bytes);
1869 }
1870 
/*
 * Like getq() but does not backenable.  This is used by the stream
 * head when a putback() is likely.  The caller must call qbackenable()
 * after it is done with accessing the queue.
 * The rbytes argument to getq_noenab() allows callers to specify
 * the maximum number of bytes to return. If the current amount on the
 * queue is less than this then the entire message will be returned.
 * A value of 0 returns the entire message and is equivalent to the old
 * default behaviour prior to the addition of the rbytes argument.
 *
 * Returns the message taken off the queue, or NULL if the queue is
 * empty (in which case QWANTR is set on the queue).
 */
mblk_t *
getq_noenab(queue_t *q, ssize_t rbytes)
{
	mblk_t *bp, *mp1;
	mblk_t *mp2 = NULL;
	qband_t *qbp;
	kthread_id_t freezer;
	int	bytecnt = 0, mblkcnt = 0;

	/* freezestr should allow its caller to call getq/putq */
	freezer = STREAM(q)->sd_freezer;
	if (freezer == curthread) {
		ASSERT(frozenstr(q));
		ASSERT(MUTEX_HELD(QLOCK(q)));
	} else
		mutex_enter(QLOCK(q));

	if ((bp = q->q_first) == 0) {
		q->q_flag |= QWANTR;
	} else {
		/*
		 * If the caller supplied a byte threshold and there is
		 * more than this amount on the queue then break up
		 * the message appropriately.  We can only safely do
		 * this for M_DATA messages.
		 */
		if ((DB_TYPE(bp) == M_DATA) && (rbytes > 0) &&
		    (q->q_count > rbytes)) {
			/*
			 * Inline version of mp_cont_len() which terminates
			 * when we meet or exceed rbytes.
			 */
			for (mp1 = bp; mp1 != NULL; mp1 = mp1->b_cont) {
				mblkcnt++;
				bytecnt += MBLKL(mp1);
				if (bytecnt  >= rbytes)
					break;
			}
			/*
			 * We need to account for the following scenarios:
			 *
			 * 1) Too much data in the first message:
			 *	mp1 will be the mblk which puts us over our
			 *	byte limit.
			 * 2) Not enough data in the first message:
			 *	mp1 will be NULL.
			 * 3) Exactly the right amount of data contained within
			 *    whole mblks:
			 *	mp1->b_cont will be where we break the message.
			 */
			if (bytecnt > rbytes) {
				/*
				 * Dup/copy mp1 and put what we don't need
				 * back onto the queue. Adjust the read/write
				 * and continuation pointers appropriately
				 * and decrement the current mblk count to
				 * reflect we are putting an mblk back onto
				 * the queue.
				 * When adjusting the message pointers, it's
				 * OK to use the existing bytecnt and the
				 * requested amount (rbytes) to calculate
				 * the new write offset (b_wptr) of what we
				 * are taking. However, we cannot use these
				 * values when calculating the read offset of
				 * the mblk we are putting back on the queue.
				 * This is because the beginning (b_rptr) of the
				 * mblk represents some arbitrary point within
				 * the message.
				 * It's simplest to do this by advancing b_rptr
				 * by the new length of mp1 as we don't have to
				 * remember any intermediate state.
				 */
				ASSERT(mp1 != NULL);
				mblkcnt--;
				/* Try a cheap dupb() first, then fall back. */
				if ((mp2 = dupb(mp1)) == NULL &&
				    (mp2 = copyb(mp1)) == NULL) {
					/*
					 * Cannot split the message: discard
					 * the partial counts and take the
					 * whole message instead.
					 */
					bytecnt = mblkcnt = 0;
					goto dup_failed;
				}
				mp2->b_cont = mp1->b_cont;
				mp1->b_wptr -= bytecnt - rbytes;
				mp2->b_rptr += mp1->b_wptr - mp1->b_rptr;
				mp1->b_cont = NULL;
				bytecnt = rbytes;
			} else {
				/*
				 * Either there is not enough data in the first
				 * message or there is no excess data to deal
				 * with. If mp1 is NULL, we are taking the
				 * whole message. No need to do anything.
				 * Otherwise we assign mp1->b_cont to mp2 as
				 * we will be putting this back onto the head of
				 * the queue.
				 */
				if (mp1 != NULL) {
					mp2 = mp1->b_cont;
					mp1->b_cont = NULL;
				}
			}
			/*
			 * If mp2 is not NULL then we have part of the message
			 * to put back onto the queue.
			 */
			if (mp2 != NULL) {
				/* mp2 takes bp's place at the head. */
				if ((mp2->b_next = bp->b_next) == NULL)
					q->q_last = mp2;
				else
					bp->b_next->b_prev = mp2;
				q->q_first = mp2;
			} else {
				if ((q->q_first = bp->b_next) == NULL)
					q->q_last = NULL;
				else
					q->q_first->b_prev = NULL;
			}
		} else {
			/*
			 * Either no byte threshold was supplied, there is
			 * not enough on the queue or we failed to
			 * duplicate/copy a data block. In these cases we
			 * just take the entire first message.
			 */
dup_failed:
			bytecnt = mp_cont_len(bp, &mblkcnt);
			if ((q->q_first = bp->b_next) == NULL)
				q->q_last = NULL;
			else
				q->q_first->b_prev = NULL;
		}
		/*
		 * Charge the bytes and mblks just removed against the
		 * normal band (q_count/q_mblkcnt) or against the message's
		 * own band, and clear the corresponding full flag once the
		 * counts are below the high water mark.
		 */
		if (bp->b_band == 0) {
			q->q_count -= bytecnt;
			q->q_mblkcnt -= mblkcnt;
			if (q->q_mblkcnt == 0 || ((q->q_count < q->q_hiwat) &&
			    (q->q_mblkcnt < q->q_hiwat))) {
				q->q_flag &= ~QFULL;
			}
		} else {
			int i;

			ASSERT(bp->b_band <= q->q_nband);
			ASSERT(q->q_bandp != NULL);
			ASSERT(MUTEX_HELD(QLOCK(q)));
			/* Walk the qband list to the entry for b_band. */
			qbp = q->q_bandp;
			i = bp->b_band;
			while (--i > 0)
				qbp = qbp->qb_next;
			/*
			 * NOTE(review): these updates do not look at mp2.
			 * If part of a banded message was put back above,
			 * qb_first/qb_last are not pointed at it here --
			 * confirm whether rbytes is ever used with banded
			 * M_DATA messages.
			 */
			if (qbp->qb_first == qbp->qb_last) {
				qbp->qb_first = NULL;
				qbp->qb_last = NULL;
			} else {
				qbp->qb_first = bp->b_next;
			}
			qbp->qb_count -= bytecnt;
			qbp->qb_mblkcnt -= mblkcnt;
			if (qbp->qb_mblkcnt == 0 ||
			    ((qbp->qb_count < qbp->qb_hiwat) &&
			    (qbp->qb_mblkcnt < qbp->qb_hiwat))) {
				qbp->qb_flag &= ~QB_FULL;
			}
		}
		q->q_flag &= ~QWANTR;
		bp->b_next = NULL;
		bp->b_prev = NULL;
	}
	/* Only drop QLOCK if it was taken above. */
	if (freezer != curthread)
		mutex_exit(QLOCK(q));

	STR_FTEVENT_MSG(bp, q, FTEV_GETQ, 0);

	return (bp);
}
2052 
/*
 * Determine if a backenable is needed after removing a message in the
 * specified band, and if so perform it.
 * NOTE: This routine assumes that something like getq_noenab() has been
 * already called.
 *
 * For the read side it is ok to hold sd_lock across calling this (and the
 * stream head often does).
 * But for the write side strwakeq might be invoked and it acquires sd_lock.
 */
void
qbackenable(queue_t *q, uchar_t band)
{
	int backenab = 0;
	qband_t *qbp;
	kthread_id_t freezer;

	ASSERT(q);
	ASSERT((q->q_flag & QREADR) || MUTEX_NOT_HELD(&STREAM(q)->sd_lock));

	/*
	 * Quick check without holding the lock.
	 * OK since after getq() has lowered the q_count these flags
	 * would not change unless either the qbackenable() is done by
	 * another thread (which is ok) or the queue has gotten QFULL
	 * in which case another backenable will take place when the queue
	 * drops below q_lowat.
	 */
	if (band == 0 && (q->q_flag & (QWANTW|QWANTWSYNC)) == 0)
		return;

	/* freezestr should allow its caller to call getq/putq */
	freezer = STREAM(q)->sd_freezer;
	if (freezer == curthread) {
		ASSERT(frozenstr(q));
		ASSERT(MUTEX_HELD(QLOCK(q)));
	} else
		mutex_enter(QLOCK(q));

	/*
	 * Collect in backenab the "wanted" flags that should be acted on:
	 * only when the low water mark is 0 or both the byte count and the
	 * mblk count are below it.
	 */
	if (band == 0) {
		if (q->q_lowat == 0 || (q->q_count < q->q_lowat &&
		    q->q_mblkcnt < q->q_lowat)) {
			backenab = q->q_flag & (QWANTW|QWANTWSYNC);
		}
	} else {
		int i;

		ASSERT((unsigned)band <= q->q_nband);
		ASSERT(q->q_bandp != NULL);

		/* Walk the qband list to the entry for 'band'. */
		qbp = q->q_bandp;
		i = band;
		while (--i > 0)
			qbp = qbp->qb_next;

		if (qbp->qb_lowat == 0 || (qbp->qb_count < qbp->qb_lowat &&
		    qbp->qb_mblkcnt < qbp->qb_lowat)) {
			backenab = qbp->qb_flag & QB_WANTW;
		}
	}

	if (backenab == 0) {
		if (freezer != curthread)
			mutex_exit(QLOCK(q));
		return;
	}

	/*
	 * Have to drop the lock across strwakeq and backenable.
	 * The wanted flags are cleared while the lock is still held;
	 * the calls themselves are made after it has been released.
	 */
	if (backenab & QWANTWSYNC)
		q->q_flag &= ~QWANTWSYNC;
	if (backenab & (QWANTW|QB_WANTW)) {
		if (band != 0)
			qbp->qb_flag &= ~QB_WANTW;
		else {
			q->q_flag &= ~QWANTW;
		}
	}

	if (freezer != curthread)
		mutex_exit(QLOCK(q));

	if (backenab & QWANTWSYNC)
		strwakeq(q, QWANTWSYNC);
	if (backenab & (QWANTW|QB_WANTW))
		backenable(q, band);
}
2139 
2140 /*
2141  * Remove a message from a queue.  The queue count and other
2142  * flow control parameters are adjusted and the back queue
2143  * enabled if necessary.
2144  *
2145  * rmvq can be called with the stream frozen, but other utility functions
2146  * holding QLOCK, and by streams modules without any locks/frozen.
2147  */
2148 void
2149 rmvq(queue_t *q, mblk_t *mp)
2150 {
2151 	ASSERT(mp != NULL);
2152 
2153 	rmvq_noenab(q, mp);
2154 	if (curthread != STREAM(q)->sd_freezer && MUTEX_HELD(QLOCK(q))) {
2155 		/*
2156 		 * qbackenable can handle a frozen stream but not a "random"
2157 		 * qlock being held. Drop lock across qbackenable.
2158 		 */
2159 		mutex_exit(QLOCK(q));
2160 		qbackenable(q, mp->b_band);
2161 		mutex_enter(QLOCK(q));
2162 	} else {
2163 		qbackenable(q, mp->b_band);
2164 	}
2165 }
2166 
2167 /*
2168  * Like rmvq() but without any backenabling.
2169  * This exists to handle SR_CONSOL_DATA in strrput().
2170  */
2171 void
2172 rmvq_noenab(queue_t *q, mblk_t *mp)
2173 {
2174 	int i;
2175 	qband_t *qbp = NULL;
2176 	kthread_id_t freezer;
2177 	int	bytecnt = 0, mblkcnt = 0;
2178 
2179 	freezer = STREAM(q)->sd_freezer;
2180 	if (freezer == curthread) {
2181 		ASSERT(frozenstr(q));
2182 		ASSERT(MUTEX_HELD(QLOCK(q)));
2183 	} else if (MUTEX_HELD(QLOCK(q))) {
2184 		/* Don't drop lock on exit */
2185 		freezer = curthread;
2186 	} else
2187 		mutex_enter(QLOCK(q));
2188 
2189 	ASSERT(mp->b_band <= q->q_nband);
2190 	if (mp->b_band != 0) {		/* Adjust band pointers */
2191 		ASSERT(q->q_bandp != NULL);
2192 		qbp = q->q_bandp;
2193 		i = mp->b_band;
2194 		while (--i > 0)
2195 			qbp = qbp->qb_next;
2196 		if (mp == qbp->qb_first) {
2197 			if (mp->b_next && mp->b_band == mp->b_next->b_band)
2198 				qbp->qb_first = mp->b_next;
2199 			else
2200 				qbp->qb_first = NULL;
2201 		}
2202 		if (mp == qbp->qb_last) {
2203 			if (mp->b_prev && mp->b_band == mp->b_prev->b_band)
2204 				qbp->qb_last = mp->b_prev;
2205 			else
2206 				qbp->qb_last = NULL;
2207 		}
2208 	}
2209 
2210 	/*
2211 	 * Remove the message from the list.
2212 	 */
2213 	if (mp->b_prev)
2214 		mp->b_prev->b_next = mp->b_next;
2215 	else
2216 		q->q_first = mp->b_next;
2217 	if (mp->b_next)
2218 		mp->b_next->b_prev = mp->b_prev;
2219 	else
2220 		q->q_last = mp->b_prev;
2221 	mp->b_next = NULL;
2222 	mp->b_prev = NULL;
2223 
2224 	/* Get the size of the message for q_count accounting */
2225 	bytecnt = mp_cont_len(mp, &mblkcnt);
2226 
2227 	if (mp->b_band == 0) {		/* Perform q_count accounting */
2228 		q->q_count -= bytecnt;
2229 		q->q_mblkcnt -= mblkcnt;
2230 		if (q->q_mblkcnt == 0 || ((q->q_count < q->q_hiwat) &&
2231 		    (q->q_mblkcnt < q->q_hiwat))) {
2232 			q->q_flag &= ~QFULL;
2233 		}
2234 	} else {			/* Perform qb_count accounting */
2235 		qbp->qb_count -= bytecnt;
2236 		qbp->qb_mblkcnt -= mblkcnt;
2237 		if (qbp->qb_mblkcnt == 0 || ((qbp->qb_count < qbp->qb_hiwat) &&
2238 		    (qbp->qb_mblkcnt < qbp->qb_hiwat))) {
2239 			qbp->qb_flag &= ~QB_FULL;
2240 		}
2241 	}
2242 	if (freezer != curthread)
2243 		mutex_exit(QLOCK(q));
2244 
2245 	STR_FTEVENT_MSG(mp, q, FTEV_RMVQ, 0);
2246 }
2247 
2248 /*
2249  * Empty a queue.
2250  * If flag is set, remove all messages.  Otherwise, remove
2251  * only non-control messages.  If queue falls below its low
2252  * water mark, and QWANTW is set, enable the nearest upstream
2253  * service procedure.
2254  *
2255  * Historical note: when merging the M_FLUSH code in strrput with this
2256  * code one difference was discovered. flushq did not have a check
2257  * for q_lowat == 0 in the backenabling test.
2258  *
2259  * pcproto_flag specifies whether or not a M_PCPROTO message should be flushed
2260  * if one exists on the queue.
2261  */
2262 void
2263 flushq_common(queue_t *q, int flag, int pcproto_flag)
2264 {
2265 	mblk_t *mp, *nmp;
2266 	qband_t *qbp;
2267 	int backenab = 0;
2268 	unsigned char bpri;
2269 	unsigned char	qbf[NBAND];	/* band flushing backenable flags */
2270 
2271 	if (q->q_first == NULL)
2272 		return;
2273 
2274 	mutex_enter(QLOCK(q));
2275 	mp = q->q_first;
2276 	q->q_first = NULL;
2277 	q->q_last = NULL;
2278 	q->q_count = 0;
2279 	q->q_mblkcnt = 0;
2280 	for (qbp = q->q_bandp; qbp; qbp = qbp->qb_next) {
2281 		qbp->qb_first = NULL;
2282 		qbp->qb_last = NULL;
2283 		qbp->qb_count = 0;
2284 		qbp->qb_mblkcnt = 0;
2285 		qbp->qb_flag &= ~QB_FULL;
2286 	}
2287 	q->q_flag &= ~QFULL;
2288 	mutex_exit(QLOCK(q));
2289 	while (mp) {
2290 		nmp = mp->b_next;
2291 		mp->b_next = mp->b_prev = NULL;
2292 
2293 		STR_FTEVENT_MBLK(mp, q, FTEV_FLUSHQ, 0);
2294 
2295 		if (pcproto_flag && (mp->b_datap->db_type == M_PCPROTO))
2296 			(void) putq(q, mp);
2297 		else if (flag || datamsg(mp->b_datap->db_type))
2298 			freemsg(mp);
2299 		else
2300 			(void) putq(q, mp);
2301 		mp = nmp;
2302 	}
2303 	bpri = 1;
2304 	mutex_enter(QLOCK(q));
2305 	for (qbp = q->q_bandp; qbp; qbp = qbp->qb_next) {
2306 		if ((qbp->qb_flag & QB_WANTW) &&
2307 		    (((qbp->qb_count < qbp->qb_lowat) &&
2308 		    (qbp->qb_mblkcnt < qbp->qb_lowat)) ||
2309 		    qbp->qb_lowat == 0)) {
2310 			qbp->qb_flag &= ~QB_WANTW;
2311 			backenab = 1;
2312 			qbf[bpri] = 1;
2313 		} else
2314 			qbf[bpri] = 0;
2315 		bpri++;
2316 	}
2317 	ASSERT(bpri == (unsigned char)(q->q_nband + 1));
2318 	if ((q->q_flag & QWANTW) &&
2319 	    (((q->q_count < q->q_lowat) &&
2320 	    (q->q_mblkcnt < q->q_lowat)) || q->q_lowat == 0)) {
2321 		q->q_flag &= ~QWANTW;
2322 		backenab = 1;
2323 		qbf[0] = 1;
2324 	} else
2325 		qbf[0] = 0;
2326 
2327 	/*
2328 	 * If any band can now be written to, and there is a writer
2329 	 * for that band, then backenable the closest service procedure.
2330 	 */
2331 	if (backenab) {
2332 		mutex_exit(QLOCK(q));
2333 		for (bpri = q->q_nband; bpri != 0; bpri--)
2334 			if (qbf[bpri])
2335 				backenable(q, bpri);
2336 		if (qbf[0])
2337 			backenable(q, 0);
2338 	} else
2339 		mutex_exit(QLOCK(q));
2340 }
2341 
2342 /*
2343  * The real flushing takes place in flushq_common. This is done so that
2344  * a flag which specifies whether or not M_PCPROTO messages should be flushed
2345  * or not. Currently the only place that uses this flag is the stream head.
2346  */
2347 void
2348 flushq(queue_t *q, int flag)
2349 {
2350 	flushq_common(q, flag, 0);
2351 }
2352 
2353 /*
2354  * Flush the queue of messages of the given priority band.
2355  * There is some duplication of code between flushq and flushband.
2356  * This is because we want to optimize the code as much as possible.
2357  * The assumption is that there will be more messages in the normal
2358  * (priority 0) band than in any other.
2359  *
2360  * Historical note: when merging the M_FLUSH code in strrput with this
2361  * code one difference was discovered. flushband had an extra check for
2362  * did not have a check for (mp->b_datap->db_type < QPCTL) in the band 0
2363  * case. That check does not match the man page for flushband and was not
2364  * in the strrput flush code hence it was removed.
2365  */
2366 void
2367 flushband(queue_t *q, unsigned char pri, int flag)
2368 {
2369 	mblk_t *mp;
2370 	mblk_t *nmp;
2371 	mblk_t *last;
2372 	qband_t *qbp;
2373 	int band;
2374 
2375 	ASSERT((flag == FLUSHDATA) || (flag == FLUSHALL));
2376 	if (pri > q->q_nband) {
2377 		return;
2378 	}
2379 	mutex_enter(QLOCK(q));
2380 	if (pri == 0) {
2381 		mp = q->q_first;
2382 		q->q_first = NULL;
2383 		q->q_last = NULL;
2384 		q->q_count = 0;
2385 		q->q_mblkcnt = 0;
2386 		for (qbp = q->q_bandp; qbp; qbp = qbp->qb_next) {
2387 			qbp->qb_first = NULL;
2388 			qbp->qb_last = NULL;
2389 			qbp->qb_count = 0;
2390 			qbp->qb_mblkcnt = 0;
2391 			qbp->qb_flag &= ~QB_FULL;
2392 		}
2393 		q->q_flag &= ~QFULL;
2394 		mutex_exit(QLOCK(q));
2395 		while (mp) {
2396 			nmp = mp->b_next;
2397 			mp->b_next = mp->b_prev = NULL;
2398 			if ((mp->b_band == 0) &&
2399 			    ((flag == FLUSHALL) ||
2400 			    datamsg(mp->b_datap->db_type)))
2401 				freemsg(mp);
2402 			else
2403 				(void) putq(q, mp);
2404 			mp = nmp;
2405 		}
2406 		mutex_enter(QLOCK(q));
2407 		if ((q->q_flag & QWANTW) &&
2408 		    (((q->q_count < q->q_lowat) &&
2409 		    (q->q_mblkcnt < q->q_lowat)) || q->q_lowat == 0)) {
2410 			q->q_flag &= ~QWANTW;
2411 			mutex_exit(QLOCK(q));
2412 
2413 			backenable(q, pri);
2414 		} else
2415 			mutex_exit(QLOCK(q));
2416 	} else {	/* pri != 0 */
2417 		boolean_t flushed = B_FALSE;
2418 		band = pri;
2419 
2420 		ASSERT(MUTEX_HELD(QLOCK(q)));
2421 		qbp = q->q_bandp;
2422 		while (--band > 0)
2423 			qbp = qbp->qb_next;
2424 		mp = qbp->qb_first;
2425 		if (mp == NULL) {
2426 			mutex_exit(QLOCK(q));
2427 			return;
2428 		}
2429 		last = qbp->qb_last->b_next;
2430 		/*
2431 		 * rmvq_noenab() and freemsg() are called for each mblk that
2432 		 * meets the criteria.  The loop is executed until the last
2433 		 * mblk has been processed.
2434 		 */
2435 		while (mp != last) {
2436 			ASSERT(mp->b_band == pri);
2437 			nmp = mp->b_next;
2438 			if (flag == FLUSHALL || datamsg(mp->b_datap->db_type)) {
2439 				rmvq_noenab(q, mp);
2440 				freemsg(mp);
2441 				flushed = B_TRUE;
2442 			}
2443 			mp = nmp;
2444 		}
2445 		mutex_exit(QLOCK(q));
2446 
2447 		/*
2448 		 * If any mblk(s) has been freed, we know that qbackenable()
2449 		 * will need to be called.
2450 		 */
2451 		if (flushed)
2452 			qbackenable(q, pri);
2453 	}
2454 }
2455 
2456 /*
2457  * Return 1 if the queue is not full.  If the queue is full, return
2458  * 0 (may not put message) and set QWANTW flag (caller wants to write
2459  * to the queue).
2460  */
2461 int
2462 canput(queue_t *q)
2463 {
2464 	TRACE_1(TR_FAC_STREAMS_FR, TR_CANPUT_IN, "canput:%p", q);
2465 
2466 	/* this is for loopback transports, they should not do a canput */
2467 	ASSERT(STRMATED(q->q_stream) || STREAM(q) == STREAM(q->q_nfsrv));
2468 
2469 	/* Find next forward module that has a service procedure */
2470 	q = q->q_nfsrv;
2471 
2472 	if (!(q->q_flag & QFULL)) {
2473 		TRACE_2(TR_FAC_STREAMS_FR, TR_CANPUT_OUT, "canput:%p %d", q, 1);
2474 		return (1);
2475 	}
2476 	mutex_enter(QLOCK(q));
2477 	if (q->q_flag & QFULL) {
2478 		q->q_flag |= QWANTW;
2479 		mutex_exit(QLOCK(q));
2480 		TRACE_2(TR_FAC_STREAMS_FR, TR_CANPUT_OUT, "canput:%p %d", q, 0);
2481 		return (0);
2482 	}
2483 	mutex_exit(QLOCK(q));
2484 	TRACE_2(TR_FAC_STREAMS_FR, TR_CANPUT_OUT, "canput:%p %d", q, 1);
2485 	return (1);
2486 }
2487 
2488 /*
2489  * This is the new canput for use with priority bands.  Return 1 if the
2490  * band is not full.  If the band is full, return 0 (may not put message)
2491  * and set QWANTW(QB_WANTW) flag for zero(non-zero) band (caller wants to
2492  * write to the queue).
2493  */
2494 int
2495 bcanput(queue_t *q, unsigned char pri)
2496 {
2497 	qband_t *qbp;
2498 
2499 	TRACE_2(TR_FAC_STREAMS_FR, TR_BCANPUT_IN, "bcanput:%p %p", q, pri);
2500 	if (!q)
2501 		return (0);
2502 
2503 	/* Find next forward module that has a service procedure */
2504 	q = q->q_nfsrv;
2505 
2506 	mutex_enter(QLOCK(q));
2507 	if (pri == 0) {
2508 		if (q->q_flag & QFULL) {
2509 			q->q_flag |= QWANTW;
2510 			mutex_exit(QLOCK(q));
2511 			TRACE_3(TR_FAC_STREAMS_FR, TR_BCANPUT_OUT,
2512 			    "bcanput:%p %X %d", q, pri, 0);
2513 			return (0);
2514 		}
2515 	} else {	/* pri != 0 */
2516 		if (pri > q->q_nband) {
2517 			/*
2518 			 * No band exists yet, so return success.
2519 			 */
2520 			mutex_exit(QLOCK(q));
2521 			TRACE_3(TR_FAC_STREAMS_FR, TR_BCANPUT_OUT,
2522 			    "bcanput:%p %X %d", q, pri, 1);
2523 			return (1);
2524 		}
2525 		qbp = q->q_bandp;
2526 		while (--pri)
2527 			qbp = qbp->qb_next;
2528 		if (qbp->qb_flag & QB_FULL) {
2529 			qbp->qb_flag |= QB_WANTW;
2530 			mutex_exit(QLOCK(q));
2531 			TRACE_3(TR_FAC_STREAMS_FR, TR_BCANPUT_OUT,
2532 			    "bcanput:%p %X %d", q, pri, 0);
2533 			return (0);
2534 		}
2535 	}
2536 	mutex_exit(QLOCK(q));
2537 	TRACE_3(TR_FAC_STREAMS_FR, TR_BCANPUT_OUT,
2538 	    "bcanput:%p %X %d", q, pri, 1);
2539 	return (1);
2540 }
2541 
/*
 * Put a message on a queue.
 *
 * Messages are enqueued on a priority basis.  The priority classes
 * are HIGH PRIORITY (type >= QPCTL), PRIORITY (type < QPCTL && band > 0),
 * and B_NORMAL (type < QPCTL && band == 0).
 *
 * Add appropriate weighted data block sizes to queue count.
 * If queue hits high water mark then set QFULL flag.
 *
 * If QNOENAB is not set (putq is allowed to enable the queue),
 * enable the queue only if the message is PRIORITY,
 * or the QWANTR flag is set (indicating that the service procedure
 * is ready to read the queue).  This implies that a service
 * procedure must NEVER put a high priority message back on its own
 * queue, as this would result in an infinite loop (!).
 *
 * Returns 1 when the message has been queued, 0 when a qband structure
 * needed for the message's band could not be allocated (the message is
 * not queued in that case).
 *
 * QLOCK is taken here unless the calling thread has frozen the stream,
 * in which case it is expected to be held already and is left held.
 */
int
putq(queue_t *q, mblk_t *bp)
{
	mblk_t *tmp;
	qband_t *qbp = NULL;	/* stays NULL for band 0 and high priority */
	int mcls = (int)queclass(bp);
	kthread_id_t freezer;
	int	bytecnt = 0, mblkcnt = 0;

	/* freezer is sampled once and decides whether we unlock on exit */
	freezer = STREAM(q)->sd_freezer;
	if (freezer == curthread) {
		ASSERT(frozenstr(q));
		ASSERT(MUTEX_HELD(QLOCK(q)));
	} else
		mutex_enter(QLOCK(q));

	/*
	 * Make sanity checks and if qband structure is not yet
	 * allocated, do so.
	 */
	if (mcls == QPCTL) {
		if (bp->b_band != 0)
			bp->b_band = 0;		/* force to be correct */
	} else if (bp->b_band != 0) {
		int i;
		qband_t **qbpp;

		if (bp->b_band > q->q_nband) {

			/*
			 * The qband structure for this priority band is
			 * not on the queue yet, so we have to allocate
			 * one on the fly.  It would be wasteful to
			 * associate the qband structures with every
			 * queue when the queues are allocated.  This is
			 * because most queues will only need the normal
			 * band of flow which can be described entirely
			 * by the queue itself.
			 */
			/* Find the tail of the qband list ... */
			qbpp = &q->q_bandp;
			while (*qbpp)
				qbpp = &(*qbpp)->qb_next;
			/*
			 * ... and append bands, each inheriting the queue's
			 * water marks, until bp's band exists.
			 */
			while (bp->b_band > q->q_nband) {
				if ((*qbpp = allocband()) == NULL) {
					if (freezer != curthread)
						mutex_exit(QLOCK(q));
					return (0);
				}
				(*qbpp)->qb_hiwat = q->q_hiwat;
				(*qbpp)->qb_lowat = q->q_lowat;
				q->q_nband++;
				qbpp = &(*qbpp)->qb_next;
			}
		}
		ASSERT(MUTEX_HELD(QLOCK(q)));
		/* Walk to bp's qband; band 1 is the head of the list */
		qbp = q->q_bandp;
		i = bp->b_band;
		while (--i)
			qbp = qbp->qb_next;
	}

	/*
	 * If queue is empty, add the message and initialize the pointers.
	 * Otherwise, adjust message pointers and queue pointers based on
	 * the type of the message and where it belongs on the queue.  Some
	 * code is duplicated to minimize the number of conditionals and
	 * hopefully minimize the amount of time this routine takes.
	 */
	if (!q->q_first) {
		bp->b_next = NULL;
		bp->b_prev = NULL;
		q->q_first = bp;
		q->q_last = bp;
		if (qbp) {
			qbp->qb_first = bp;
			qbp->qb_last = bp;
		}
	} else if (!qbp) {	/* bp->b_band == 0 */

		/*
		 * If queue class of message is less than or equal to
		 * that of the last one on the queue, tack on to the end.
		 */
		tmp = q->q_last;
		if (mcls <= (int)queclass(tmp)) {
			bp->b_next = NULL;
			bp->b_prev = tmp;
			tmp->b_next = bp;
			q->q_last = bp;
		} else {
			/*
			 * bp outranks the last message: skip the leading
			 * messages whose class is at least bp's.  The last
			 * message has a lower class, so the walk stops
			 * before running off the list.
			 */
			tmp = q->q_first;
			while ((int)queclass(tmp) >= mcls)
				tmp = tmp->b_next;

			/*
			 * Insert bp before tmp.
			 */
			bp->b_next = tmp;
			bp->b_prev = tmp->b_prev;
			if (tmp->b_prev)
				tmp->b_prev->b_next = bp;
			else
				q->q_first = bp;
			tmp->b_prev = bp;
		}
	} else {		/* bp->b_band != 0 */
		if (qbp->qb_first) {
			tmp = qbp->qb_last;

			/*
			 * Insert bp after the last message in this band.
			 */
			bp->b_next = tmp->b_next;
			if (tmp->b_next)
				tmp->b_next->b_prev = bp;
			else
				q->q_last = bp;
			bp->b_prev = tmp;
			tmp->b_next = bp;
		} else {
			/* Band is empty: bp becomes its first message */
			tmp = q->q_last;
			if ((mcls < (int)queclass(tmp)) ||
			    (bp->b_band <= tmp->b_band)) {

				/*
				 * Tack bp on end of queue.
				 */
				bp->b_next = NULL;
				bp->b_prev = tmp;
				tmp->b_next = bp;
				q->q_last = bp;
			} else {
				/*
				 * Skip the high priority messages, then
				 * the messages of bands not lower than
				 * bp's; bp goes in front of what is left.
				 */
				tmp = q->q_first;
				while (tmp->b_datap->db_type >= QPCTL)
					tmp = tmp->b_next;
				while (tmp->b_band >= bp->b_band)
					tmp = tmp->b_next;

				/*
				 * Insert bp before tmp.
				 */
				bp->b_next = tmp;
				bp->b_prev = tmp->b_prev;
				if (tmp->b_prev)
					tmp->b_prev->b_next = bp;
				else
					q->q_first = bp;
				tmp->b_prev = bp;
			}
			qbp->qb_first = bp;
		}
		/* In both cases bp is now the last message of its band */
		qbp->qb_last = bp;
	}

	/* Get message byte count for q_count accounting */
	bytecnt = mp_cont_len(bp, &mblkcnt);

	/*
	 * Charge the message to its band (or to the queue for band 0) and
	 * mark it full once either the byte count or the mblk count has
	 * reached the high water mark.
	 */
	if (qbp) {
		qbp->qb_count += bytecnt;
		qbp->qb_mblkcnt += mblkcnt;
		if ((qbp->qb_count >= qbp->qb_hiwat) ||
		    (qbp->qb_mblkcnt >= qbp->qb_hiwat)) {
			qbp->qb_flag |= QB_FULL;
		}
	} else {
		q->q_count += bytecnt;
		q->q_mblkcnt += mblkcnt;
		if ((q->q_count >= q->q_hiwat) ||
		    (q->q_mblkcnt >= q->q_hiwat)) {
			q->q_flag |= QFULL;
		}
	}

	STR_FTEVENT_MSG(bp, q, FTEV_PUTQ, 0);

	/*
	 * A message of class above QNORM always enables the queue.
	 * Otherwise the queue is enabled only if canenable() allows it and
	 * either QWANTR is set or the message is in a non-zero band.
	 */
	if ((mcls > QNORM) ||
	    (canenable(q) && (q->q_flag & QWANTR || bp->b_band)))
		qenable_locked(q);
	ASSERT(MUTEX_HELD(QLOCK(q)));
	if (freezer != curthread)
		mutex_exit(QLOCK(q));

	return (1);
}
2743 
/*
 * Put stuff back at beginning of Q according to priority order.
 * See comment on putq above for details.
 *
 * The message is placed in front of the other messages of its own class
 * and band rather than behind them.  bp must not be linked to another
 * message (b_next is asserted to be NULL).
 *
 * Returns 1 when the message has been queued, 0 when a qband structure
 * needed for the message's band could not be allocated.
 *
 * QLOCK is taken here unless the calling thread has frozen the stream,
 * in which case it is expected to be held already and is left held.
 */
int
putbq(queue_t *q, mblk_t *bp)
{
	mblk_t *tmp;
	qband_t *qbp = NULL;	/* stays NULL for band 0 and high priority */
	int mcls = (int)queclass(bp);
	kthread_id_t freezer;
	int	bytecnt = 0, mblkcnt = 0;

	ASSERT(q && bp);
	ASSERT(bp->b_next == NULL);
	/* freezer is sampled once and decides whether we unlock on exit */
	freezer = STREAM(q)->sd_freezer;
	if (freezer == curthread) {
		ASSERT(frozenstr(q));
		ASSERT(MUTEX_HELD(QLOCK(q)));
	} else
		mutex_enter(QLOCK(q));

	/*
	 * Make sanity checks and if qband structure is not yet
	 * allocated, do so.
	 */
	if (mcls == QPCTL) {
		if (bp->b_band != 0)
			bp->b_band = 0;		/* force to be correct */
	} else if (bp->b_band != 0) {
		int i;
		qband_t **qbpp;

		if (bp->b_band > q->q_nband) {
			/*
			 * Append qbands, each inheriting the queue's water
			 * marks, until bp's band exists.
			 */
			qbpp = &q->q_bandp;
			while (*qbpp)
				qbpp = &(*qbpp)->qb_next;
			while (bp->b_band > q->q_nband) {
				if ((*qbpp = allocband()) == NULL) {
					if (freezer != curthread)
						mutex_exit(QLOCK(q));
					return (0);
				}
				(*qbpp)->qb_hiwat = q->q_hiwat;
				(*qbpp)->qb_lowat = q->q_lowat;
				q->q_nband++;
				qbpp = &(*qbpp)->qb_next;
			}
		}
		/* Walk to bp's qband; band 1 is the head of the list */
		qbp = q->q_bandp;
		i = bp->b_band;
		while (--i)
			qbp = qbp->qb_next;
	}

	/*
	 * If queue is empty or if message is high priority,
	 * place on the front of the queue.
	 */
	tmp = q->q_first;
	if ((!tmp) || (mcls == QPCTL)) {
		bp->b_next = tmp;
		if (tmp)
			tmp->b_prev = bp;
		else
			q->q_last = bp;
		q->q_first = bp;
		bp->b_prev = NULL;
		/*
		 * qbp is never set for a high priority message, so it can
		 * only be non-NULL here when the queue was empty.
		 */
		if (qbp) {
			qbp->qb_first = bp;
			qbp->qb_last = bp;
		}
	} else if (qbp) {	/* bp->b_band != 0 */
		tmp = qbp->qb_first;
		if (tmp) {

			/*
			 * Insert bp before the first message in this band.
			 */
			bp->b_next = tmp;
			bp->b_prev = tmp->b_prev;
			if (tmp->b_prev)
				tmp->b_prev->b_next = bp;
			else
				q->q_first = bp;
			tmp->b_prev = bp;
		} else {
			/* Band is empty: bp becomes its only message */
			tmp = q->q_last;
			if ((mcls < (int)queclass(tmp)) ||
			    (bp->b_band < tmp->b_band)) {

				/*
				 * Tack bp on end of queue.
				 */
				bp->b_next = NULL;
				bp->b_prev = tmp;
				tmp->b_next = bp;
				q->q_last = bp;
			} else {
				/*
				 * Skip the high priority messages, then
				 * the messages of strictly higher bands;
				 * bp goes in front of what is left.
				 */
				tmp = q->q_first;
				while (tmp->b_datap->db_type >= QPCTL)
					tmp = tmp->b_next;
				while (tmp->b_band > bp->b_band)
					tmp = tmp->b_next;

				/*
				 * Insert bp before tmp.
				 */
				bp->b_next = tmp;
				bp->b_prev = tmp->b_prev;
				if (tmp->b_prev)
					tmp->b_prev->b_next = bp;
				else
					q->q_first = bp;
				tmp->b_prev = bp;
			}
			qbp->qb_last = bp;
		}
		/* In both cases bp is now the first message of its band */
		qbp->qb_first = bp;
	} else {		/* bp->b_band == 0 && !QPCTL */

		/*
		 * If the queue class or band is less than that of the last
		 * message on the queue, tack bp on the end of the queue.
		 */
		tmp = q->q_last;
		if ((mcls < (int)queclass(tmp)) || (bp->b_band < tmp->b_band)) {
			bp->b_next = NULL;
			bp->b_prev = tmp;
			tmp->b_next = bp;
			q->q_last = bp;
		} else {
			/*
			 * Skip the high priority messages and those of
			 * non-zero bands; bp goes in front of the first
			 * band 0 message.
			 */
			tmp = q->q_first;
			while (tmp->b_datap->db_type >= QPCTL)
				tmp = tmp->b_next;
			while (tmp->b_band > bp->b_band)
				tmp = tmp->b_next;

			/*
			 * Insert bp before tmp.
			 */
			bp->b_next = tmp;
			bp->b_prev = tmp->b_prev;
			if (tmp->b_prev)
				tmp->b_prev->b_next = bp;
			else
				q->q_first = bp;
			tmp->b_prev = bp;
		}
	}

	/* Get message byte count for q_count accounting */
	bytecnt = mp_cont_len(bp, &mblkcnt);

	/*
	 * Charge the message to its band (or to the queue for band 0) and
	 * mark it full once either the byte count or the mblk count has
	 * reached the high water mark.
	 */
	if (qbp) {
		qbp->qb_count += bytecnt;
		qbp->qb_mblkcnt += mblkcnt;
		if ((qbp->qb_count >= qbp->qb_hiwat) ||
		    (qbp->qb_mblkcnt >= qbp->qb_hiwat)) {
			qbp->qb_flag |= QB_FULL;
		}
	} else {
		q->q_count += bytecnt;
		q->q_mblkcnt += mblkcnt;
		if ((q->q_count >= q->q_hiwat) ||
		    (q->q_mblkcnt >= q->q_hiwat)) {
			q->q_flag |= QFULL;
		}
	}

	STR_FTEVENT_MSG(bp, q, FTEV_PUTBQ, 0);

	/*
	 * A message of class above QNORM always enables the queue;
	 * otherwise it is enabled only if canenable() allows it and QWANTR
	 * is set.  Unlike putq(), a non-zero band alone does not enable.
	 */
	if ((mcls > QNORM) || (canenable(q) && (q->q_flag & QWANTR)))
		qenable_locked(q);
	ASSERT(MUTEX_HELD(QLOCK(q)));
	if (freezer != curthread)
		mutex_exit(QLOCK(q));

	return (1);
}
2924 
/*
 * Insert a message before an existing message on the queue.  If the
 * existing message is NULL, the new message is placed on the end of
 * the queue.  The queue class of the new message is ignored.  However,
 * the priority band of the new message must adhere to the following
 * ordering:
 *
 *	emp->b_prev->b_band >= mp->b_band >= emp->b_band.
 *
 * All flow control parameters are updated.
 *
 * Returns 1 when mp has been inserted.  Returns 0, leaving the queue
 * unchanged, when the insertion would violate the ordering (a warning is
 * logged) or when a qband structure for mp's band cannot be allocated.
 *
 * insq can be called with the stream frozen, by other utility functions
 * holding QLOCK, and by streams modules without any locks/frozen.
 */
int
insq(queue_t *q, mblk_t *emp, mblk_t *mp)
{
	mblk_t *tmp;
	qband_t *qbp = NULL;	/* stays NULL when mp ends up in band 0 */
	int mcls = (int)queclass(mp);
	kthread_id_t freezer;
	int	bytecnt = 0, mblkcnt = 0;

	/*
	 * After this block "freezer != curthread" means "QLOCK was taken
	 * here and has to be dropped before returning".
	 */
	freezer = STREAM(q)->sd_freezer;
	if (freezer == curthread) {
		ASSERT(frozenstr(q));
		ASSERT(MUTEX_HELD(QLOCK(q)));
	} else if (MUTEX_HELD(QLOCK(q))) {
		/* Don't drop lock on exit */
		freezer = curthread;
	} else
		mutex_enter(QLOCK(q));

	/*
	 * A high priority message always lives in band 0 and may not be
	 * placed behind a message that is not high priority.
	 */
	if (mcls == QPCTL) {
		if (mp->b_band != 0)
			mp->b_band = 0;		/* force to be correct */
		if (emp && emp->b_prev &&
		    (emp->b_prev->b_datap->db_type < QPCTL))
			goto badord;
	}
	if (emp) {
		/*
		 * Reject a normal-class mp whose band is lower than emp's,
		 * and any mp whose band is higher than that of a
		 * non-high-priority predecessor of emp.
		 */
		if (((mcls == QNORM) && (mp->b_band < emp->b_band)) ||
		    (emp->b_prev && (emp->b_prev->b_datap->db_type < QPCTL) &&
		    (emp->b_prev->b_band < mp->b_band))) {
			goto badord;
		}
	} else {
		/*
		 * Appending: a normal-class mp may not have a higher band
		 * than the current last message.  The badord label below is
		 * also the target of the checks above.
		 */
		tmp = q->q_last;
		if (tmp && (mcls == QNORM) && (mp->b_band > tmp->b_band)) {
badord:
			cmn_err(CE_WARN,
			    "insq: attempt to insert message out of order "
			    "on q %p", (void *)q);
			if (freezer != curthread)
				mutex_exit(QLOCK(q));
			return (0);
		}
	}

	if (mp->b_band != 0) {
		int i;
		qband_t **qbpp;

		if (mp->b_band > q->q_nband) {
			/*
			 * Append qbands, each inheriting the queue's water
			 * marks, until mp's band exists.
			 */
			qbpp = &q->q_bandp;
			while (*qbpp)
				qbpp = &(*qbpp)->qb_next;
			while (mp->b_band > q->q_nband) {
				if ((*qbpp = allocband()) == NULL) {
					if (freezer != curthread)
						mutex_exit(QLOCK(q));
					return (0);
				}
				(*qbpp)->qb_hiwat = q->q_hiwat;
				(*qbpp)->qb_lowat = q->q_lowat;
				q->q_nband++;
				qbpp = &(*qbpp)->qb_next;
			}
		}
		/* Walk to mp's qband; band 1 is the head of the list */
		qbp = q->q_bandp;
		i = mp->b_band;
		while (--i)
			qbp = qbp->qb_next;
	}

	/*
	 * Link mp into the message list: in front of emp when one was
	 * given, otherwise at the tail.  q_first/q_last are updated when
	 * mp becomes the new head/tail.
	 */
	if ((mp->b_next = emp) != NULL) {
		if ((mp->b_prev = emp->b_prev) != NULL)
			emp->b_prev->b_next = mp;
		else
			q->q_first = mp;
		emp->b_prev = mp;
	} else {
		if ((mp->b_prev = q->q_last) != NULL)
			q->q_last->b_next = mp;
		else
			q->q_first = mp;
		q->q_last = mp;
	}

	/* Get mblk and byte count for q_count accounting */
	bytecnt = mp_cont_len(mp, &mblkcnt);

	if (qbp) {	/* adjust qband pointers and count */
		if (!qbp->qb_first) {
			/* First message of this band */
			qbp->qb_first = mp;
			qbp->qb_last = mp;
		} else {
			/*
			 * mp is the new first message of the band if there
			 * is no predecessor in the same band, else the new
			 * last one if there is no successor in the same
			 * band; otherwise it sits in the middle and the
			 * band pointers are unaffected.
			 */
			if (mp->b_prev == NULL || (mp->b_prev != NULL &&
			    (mp->b_prev->b_band != mp->b_band)))
				qbp->qb_first = mp;
			else if (mp->b_next == NULL || (mp->b_next != NULL &&
			    (mp->b_next->b_band != mp->b_band)))
				qbp->qb_last = mp;
		}
		qbp->qb_count += bytecnt;
		qbp->qb_mblkcnt += mblkcnt;
		if ((qbp->qb_count >= qbp->qb_hiwat) ||
		    (qbp->qb_mblkcnt >= qbp->qb_hiwat)) {
			qbp->qb_flag |= QB_FULL;
		}
	} else {
		q->q_count += bytecnt;
		q->q_mblkcnt += mblkcnt;
		if ((q->q_count >= q->q_hiwat) ||
		    (q->q_mblkcnt >= q->q_hiwat)) {
			q->q_flag |= QFULL;
		}
	}

	STR_FTEVENT_MSG(mp, q, FTEV_INSQ, 0);

	/* The queue is enabled only if allowed and QWANTR is set */
	if (canenable(q) && (q->q_flag & QWANTR))
		qenable_locked(q);

	ASSERT(MUTEX_HELD(QLOCK(q)));
	if (freezer != curthread)
		mutex_exit(QLOCK(q));

	return (1);
}
3065 
3066 /*
3067  * Create and put a control message on queue.
3068  */
3069 int
3070 putctl(queue_t *q, int type)
3071 {
3072 	mblk_t *bp;
3073 
3074 	if ((datamsg(type) && (type != M_DELAY)) ||
3075 	    (bp = allocb_tryhard(0)) == NULL)
3076 		return (0);
3077 	bp->b_datap->db_type = (unsigned char) type;
3078 
3079 	put(q, bp);
3080 
3081 	return (1);
3082 }
3083 
3084 /*
3085  * Control message with a single-byte parameter
3086  */
3087 int
3088 putctl1(queue_t *q, int type, int param)
3089 {
3090 	mblk_t *bp;
3091 
3092 	if ((datamsg(type) && (type != M_DELAY)) ||
3093 	    (bp = allocb_tryhard(1)) == NULL)
3094 		return (0);
3095 	bp->b_datap->db_type = (unsigned char)type;
3096 	*bp->b_wptr++ = (unsigned char)param;
3097 
3098 	put(q, bp);
3099 
3100 	return (1);
3101 }
3102 
3103 int
3104 putnextctl1(queue_t *q, int type, int param)
3105 {
3106 	mblk_t *bp;
3107 
3108 	if ((datamsg(type) && (type != M_DELAY)) ||
3109 	    ((bp = allocb_tryhard(1)) == NULL))
3110 		return (0);
3111 
3112 	bp->b_datap->db_type = (unsigned char)type;
3113 	*bp->b_wptr++ = (unsigned char)param;
3114 
3115 	putnext(q, bp);
3116 
3117 	return (1);
3118 }
3119 
3120 int
3121 putnextctl(queue_t *q, int type)
3122 {
3123 	mblk_t *bp;
3124 
3125 	if ((datamsg(type) && (type != M_DELAY)) ||
3126 	    ((bp = allocb_tryhard(0)) == NULL))
3127 		return (0);
3128 	bp->b_datap->db_type = (unsigned char)type;
3129 
3130 	putnext(q, bp);
3131 
3132 	return (1);
3133 }
3134 
3135 /*
3136  * Return the queue upstream from this one
3137  */
3138 queue_t *
3139 backq(queue_t *q)
3140 {
3141 	q = _OTHERQ(q);
3142 	if (q->q_next) {
3143 		q = q->q_next;
3144 		return (_OTHERQ(q));
3145 	}
3146 	return (NULL);
3147 }
3148 
3149 /*
3150  * Send a block back up the queue in reverse from this
3151  * one (e.g. to respond to ioctls)
3152  */
3153 void
3154 qreply(queue_t *q, mblk_t *bp)
3155 {
3156 	ASSERT(q && bp);
3157 
3158 	putnext(_OTHERQ(q), bp);
3159 }
3160 
3161 /*
3162  * Streams Queue Scheduling
3163  *
3164  * Queues are enabled through qenable() when they have messages to
3165  * process.  They are serviced by queuerun(), which runs each enabled
3166  * queue's service procedure.  The call to queuerun() is processor
3167  * dependent - the general principle is that it be run whenever a queue
3168  * is enabled but before returning to user level.  For system calls,
3169  * the function runqueues() is called if their action causes a queue
3170  * to be enabled.  For device interrupts, queuerun() should be
3171  * called before returning from the last level of interrupt.  Beyond
3172  * this, no timing assumptions should be made about queue scheduling.
3173  */
3174 
3175 /*
3176  * Enable a queue: put it on list of those whose service procedures are
3177  * ready to run and set up the scheduling mechanism.
3178  * The broadcast is done outside the mutex -> to avoid the woken thread
3179  * from contending with the mutex. This is OK 'cos the queue has been
3180  * enqueued on the runlist and flagged safely at this point.
3181  */
3182 void
3183 qenable(queue_t *q)
3184 {
3185 	mutex_enter(QLOCK(q));
3186 	qenable_locked(q);
3187 	mutex_exit(QLOCK(q));
3188 }
3189 /*
3190  * Return number of messages on queue
3191  */
3192 int
3193 qsize(queue_t *qp)
3194 {
3195 	int count = 0;
3196 	mblk_t *mp;
3197 
3198 	mutex_enter(QLOCK(qp));
3199 	for (mp = qp->q_first; mp; mp = mp->b_next)
3200 		count++;
3201 	mutex_exit(QLOCK(qp));
3202 	return (count);
3203 }
3204 
3205 /*
3206  * noenable - set queue so that putq() will not enable it.
3207  * enableok - set queue so that putq() can enable it.
3208  */
3209 void
3210 noenable(queue_t *q)
3211 {
3212 	mutex_enter(QLOCK(q));
3213 	q->q_flag |= QNOENB;
3214 	mutex_exit(QLOCK(q));
3215 }
3216 
3217 void
3218 enableok(queue_t *q)
3219 {
3220 	mutex_enter(QLOCK(q));
3221 	q->q_flag &= ~QNOENB;
3222 	mutex_exit(QLOCK(q));
3223 }
3224 
3225 /*
3226  * Set queue fields.
3227  */
3228 int
3229 strqset(queue_t *q, qfields_t what, unsigned char pri, intptr_t val)
3230 {
3231 	qband_t *qbp = NULL;
3232 	queue_t	*wrq;
3233 	int error = 0;
3234 	kthread_id_t freezer;
3235 
3236 	freezer = STREAM(q)->sd_freezer;
3237 	if (freezer == curthread) {
3238 		ASSERT(frozenstr(q));
3239 		ASSERT(MUTEX_HELD(QLOCK(q)));
3240 	} else
3241 		mutex_enter(QLOCK(q));
3242 
3243 	if (what >= QBAD) {
3244 		error = EINVAL;
3245 		goto done;
3246 	}
3247 	if (pri != 0) {
3248 		int i;
3249 		qband_t **qbpp;
3250 
3251 		if (pri > q->q_nband) {
3252 			qbpp = &q->q_bandp;
3253 			while (*qbpp)
3254 				qbpp = &(*qbpp)->qb_next;
3255 			while (pri > q->q_nband) {
3256 				if ((*qbpp = allocband()) == NULL) {
3257 					error = EAGAIN;
3258 					goto done;
3259 				}
3260 				(*qbpp)->qb_hiwat = q->q_hiwat;
3261 				(*qbpp)->qb_lowat = q->q_lowat;
3262 				q->q_nband++;
3263 				qbpp = &(*qbpp)->qb_next;
3264 			}
3265 		}
3266 		qbp = q->q_bandp;
3267 		i = pri;
3268 		while (--i)
3269 			qbp = qbp->qb_next;
3270 	}
3271 	switch (what) {
3272 
3273 	case QHIWAT:
3274 		if (qbp)
3275 			qbp->qb_hiwat = (size_t)val;
3276 		else
3277 			q->q_hiwat = (size_t)val;
3278 		break;
3279 
3280 	case QLOWAT:
3281 		if (qbp)
3282 			qbp->qb_lowat = (size_t)val;
3283 		else
3284 			q->q_lowat = (size_t)val;
3285 		break;
3286 
3287 	case QMAXPSZ:
3288 		if (qbp)
3289 			error = EINVAL;
3290 		else
3291 			q->q_maxpsz = (ssize_t)val;
3292 
3293 		/*
3294 		 * Performance concern, strwrite looks at the module below
3295 		 * the stream head for the maxpsz each time it does a write
3296 		 * we now cache it at the stream head.  Check to see if this
3297 		 * queue is sitting directly below the stream head.
3298 		 */
3299 		wrq = STREAM(q)->sd_wrq;
3300 		if (q != wrq->q_next)
3301 			break;
3302 
3303 		/*
3304 		 * If the stream is not frozen drop the current QLOCK and
3305 		 * acquire the sd_wrq QLOCK which protects sd_qn_*
3306 		 */
3307 		if (freezer != curthread) {
3308 			mutex_exit(QLOCK(q));
3309 			mutex_enter(QLOCK(wrq));
3310 		}
3311 		ASSERT(MUTEX_HELD(QLOCK(wrq)));
3312 
3313 		if (strmsgsz != 0) {
3314 			if (val == INFPSZ)
3315 				val = strmsgsz;
3316 			else  {
3317 				if (STREAM(q)->sd_vnode->v_type == VFIFO)
3318 					val = MIN(PIPE_BUF, val);
3319 				else
3320 					val = MIN(strmsgsz, val);
3321 			}
3322 		}
3323 		STREAM(q)->sd_qn_maxpsz = val;
3324 		if (freezer != curthread) {
3325 			mutex_exit(QLOCK(wrq));
3326 			mutex_enter(QLOCK(q));
3327 		}
3328 		break;
3329 
3330 	case QMINPSZ:
3331 		if (qbp)
3332 			error = EINVAL;
3333 		else
3334 			q->q_minpsz = (ssize_t)val;
3335 
3336 		/*
3337 		 * Performance concern, strwrite looks at the module below
3338 		 * the stream head for the maxpsz each time it does a write
3339 		 * we now cache it at the stream head.  Check to see if this
3340 		 * queue is sitting directly below the stream head.
3341 		 */
3342 		wrq = STREAM(q)->sd_wrq;
3343 		if (q != wrq->q_next)
3344 			break;
3345 
3346 		/*
3347 		 * If the stream is not frozen drop the current QLOCK and
3348 		 * acquire the sd_wrq QLOCK which protects sd_qn_*
3349 		 */
3350 		if (freezer != curthread) {
3351 			mutex_exit(QLOCK(q));
3352 			mutex_enter(QLOCK(wrq));
3353 		}
3354 		STREAM(q)->sd_qn_minpsz = (ssize_t)val;
3355 
3356 		if (freezer != curthread) {
3357 			mutex_exit(QLOCK(wrq));
3358 			mutex_enter(QLOCK(q));
3359 		}
3360 		break;
3361 
3362 	case QSTRUIOT:
3363 		if (qbp)
3364 			error = EINVAL;
3365 		else
3366 			q->q_struiot = (ushort_t)val;
3367 		break;
3368 
3369 	case QCOUNT:
3370 	case QFIRST:
3371 	case QLAST:
3372 	case QFLAG:
3373 		error = EPERM;
3374 		break;
3375 
3376 	default:
3377 		error = EINVAL;
3378 		break;
3379 	}
3380 done:
3381 	if (freezer != curthread)
3382 		mutex_exit(QLOCK(q));
3383 	return (error);
3384 }
3385 
3386 /*
3387  * Get queue fields.
3388  */
3389 int
3390 strqget(queue_t *q, qfields_t what, unsigned char pri, void *valp)
3391 {
3392 	qband_t		*qbp = NULL;
3393 	int		error = 0;
3394 	kthread_id_t	freezer;
3395 
3396 	freezer = STREAM(q)->sd_freezer;
3397 	if (freezer == curthread) {
3398 		ASSERT(frozenstr(q));
3399 		ASSERT(MUTEX_HELD(QLOCK(q)));
3400 	} else
3401 		mutex_enter(QLOCK(q));
3402 	if (what >= QBAD) {
3403 		error = EINVAL;
3404 		goto done;
3405 	}
3406 	if (pri != 0) {
3407 		int i;
3408 		qband_t **qbpp;
3409 
3410 		if (pri > q->q_nband) {
3411 			qbpp = &q->q_bandp;
3412 			while (*qbpp)
3413 				qbpp = &(*qbpp)->qb_next;
3414 			while (pri > q->q_nband) {
3415 				if ((*qbpp = allocband()) == NULL) {
3416 					error = EAGAIN;
3417 					goto done;
3418 				}
3419 				(*qbpp)->qb_hiwat = q->q_hiwat;
3420 				(*qbpp)->qb_lowat = q->q_lowat;
3421 				q->q_nband++;
3422 				qbpp = &(*qbpp)->qb_next;
3423 			}
3424 		}
3425 		qbp = q->q_bandp;
3426 		i = pri;
3427 		while (--i)
3428 			qbp = qbp->qb_next;
3429 	}
3430 	switch (what) {
3431 	case QHIWAT:
3432 		if (qbp)
3433 			*(size_t *)valp = qbp->qb_hiwat;
3434 		else
3435 			*(size_t *)valp = q->q_hiwat;
3436 		break;
3437 
3438 	case QLOWAT:
3439 		if (qbp)
3440 			*(size_t *)valp = qbp->qb_lowat;
3441 		else
3442 			*(size_t *)valp = q->q_lowat;
3443 		break;
3444 
3445 	case QMAXPSZ:
3446 		if (qbp)
3447 			error = EINVAL;
3448 		else
3449 			*(ssize_t *)valp = q->q_maxpsz;
3450 		break;
3451 
3452 	case QMINPSZ:
3453 		if (qbp)
3454 			error = EINVAL;
3455 		else
3456 			*(ssize_t *)valp = q->q_minpsz;
3457 		break;
3458 
3459 	case QCOUNT:
3460 		if (qbp)
3461 			*(size_t *)valp = qbp->qb_count;
3462 		else
3463 			*(size_t *)valp = q->q_count;
3464 		break;
3465 
3466 	case QFIRST:
3467 		if (qbp)
3468 			*(mblk_t **)valp = qbp->qb_first;
3469 		else
3470 			*(mblk_t **)valp = q->q_first;
3471 		break;
3472 
3473 	case QLAST:
3474 		if (qbp)
3475 			*(mblk_t **)valp = qbp->qb_last;
3476 		else
3477 			*(mblk_t **)valp = q->q_last;
3478 		break;
3479 
3480 	case QFLAG:
3481 		if (qbp)
3482 			*(uint_t *)valp = qbp->qb_flag;
3483 		else
3484 			*(uint_t *)valp = q->q_flag;
3485 		break;
3486 
3487 	case QSTRUIOT:
3488 		if (qbp)
3489 			error = EINVAL;
3490 		else
3491 			*(short *)valp = q->q_struiot;
3492 		break;
3493 
3494 	default:
3495 		error = EINVAL;
3496 		break;
3497 	}
3498 done:
3499 	if (freezer != curthread)
3500 		mutex_exit(QLOCK(q));
3501 	return (error);
3502 }
3503 
3504 /*
3505  * Function awakes all in cvwait/sigwait/pollwait, on one of:
3506  *	QWANTWSYNC or QWANTR or QWANTW,
3507  *
3508  * Note: for QWANTWSYNC/QWANTW and QWANTR, if no WSLEEPer or RSLEEPer then a
3509  *	 deferred wakeup will be done. Also if strpoll() in progress then a
3510  *	 deferred pollwakeup will be done.
3511  */
3512 void
3513 strwakeq(queue_t *q, int flag)
3514 {
3515 	stdata_t	*stp = STREAM(q);
3516 	pollhead_t	*pl;
3517 
3518 	mutex_enter(&stp->sd_lock);
3519 	pl = &stp->sd_pollist;
3520 	if (flag & QWANTWSYNC) {
3521 		ASSERT(!(q->q_flag & QREADR));
3522 		if (stp->sd_flag & WSLEEP) {
3523 			stp->sd_flag &= ~WSLEEP;
3524 			cv_broadcast(&stp->sd_wrq->q_wait);
3525 		} else {
3526 			stp->sd_wakeq |= WSLEEP;
3527 		}
3528 
3529 		mutex_exit(&stp->sd_lock);
3530 		pollwakeup(pl, POLLWRNORM);
3531 		mutex_enter(&stp->sd_lock);
3532 
3533 		if (stp->sd_sigflags & S_WRNORM)
3534 			strsendsig(stp->sd_siglist, S_WRNORM, 0, 0);
3535 	} else if (flag & QWANTR) {
3536 		if (stp->sd_flag & RSLEEP) {
3537 			stp->sd_flag &= ~RSLEEP;
3538 			cv_broadcast(&_RD(stp->sd_wrq)->q_wait);
3539 		} else {
3540 			stp->sd_wakeq |= RSLEEP;
3541 		}
3542 
3543 		mutex_exit(&stp->sd_lock);
3544 		pollwakeup(pl, POLLIN | POLLRDNORM);
3545 		mutex_enter(&stp->sd_lock);
3546 
3547 		{
3548 			int events = stp->sd_sigflags & (S_INPUT | S_RDNORM);
3549 
3550 			if (events)
3551 				strsendsig(stp->sd_siglist, events, 0, 0);
3552 		}
3553 	} else {
3554 		if (stp->sd_flag & WSLEEP) {
3555 			stp->sd_flag &= ~WSLEEP;
3556 			cv_broadcast(&stp->sd_wrq->q_wait);
3557 		}
3558 
3559 		mutex_exit(&stp->sd_lock);
3560 		pollwakeup(pl, POLLWRNORM);
3561 		mutex_enter(&stp->sd_lock);
3562 
3563 		if (stp->sd_sigflags & S_WRNORM)
3564 			strsendsig(stp->sd_siglist, S_WRNORM, 0, 0);
3565 	}
3566 	mutex_exit(&stp->sd_lock);
3567 }
3568 
3569 int
3570 struioget(queue_t *q, mblk_t *mp, struiod_t *dp, int noblock)
3571 {
3572 	stdata_t *stp = STREAM(q);
3573 	int typ  = STRUIOT_STANDARD;
3574 	uio_t	 *uiop = &dp->d_uio;
3575 	dblk_t	 *dbp;
3576 	ssize_t	 uiocnt;
3577 	ssize_t	 cnt;
3578 	unsigned char *ptr;
3579 	ssize_t	 resid;
3580 	int	 error = 0;
3581 	on_trap_data_t otd;
3582 	queue_t	*stwrq;
3583 
3584 	/*
3585 	 * Plumbing may change while taking the type so store the
3586 	 * queue in a temporary variable. It doesn't matter even
3587 	 * if the we take the type from the previous plumbing,
3588 	 * that's because if the plumbing has changed when we were
3589 	 * holding the queue in a temporary variable, we can continue
3590 	 * processing the message the way it would have been processed
3591 	 * in the old plumbing, without any side effects but a bit
3592 	 * extra processing for partial ip header checksum.
3593 	 *
3594 	 * This has been done to avoid holding the sd_lock which is
3595 	 * very hot.
3596 	 */
3597 
3598 	stwrq = stp->sd_struiowrq;
3599 	if (stwrq)
3600 		typ = stwrq->q_struiot;
3601 
3602 	for (; (resid = uiop->uio_resid) > 0 && mp; mp = mp->b_cont) {
3603 		dbp = mp->b_datap;
3604 		ptr = (uchar_t *)(mp->b_rptr + dbp->db_cksumstuff);
3605 		uiocnt = dbp->db_cksumend - dbp->db_cksumstuff;
3606 		cnt = MIN(uiocnt, uiop->uio_resid);
3607 		if (!(dbp->db_struioflag & STRUIO_SPEC) ||
3608 		    (dbp->db_struioflag & STRUIO_DONE) || cnt == 0) {
3609 			/*
3610 			 * Either this mblk has already been processed
3611 			 * or there is no more room in this mblk (?).
3612 			 */
3613 			continue;
3614 		}
3615 		switch (typ) {
3616 		case STRUIOT_STANDARD:
3617 			if (noblock) {
3618 				if (on_trap(&otd, OT_DATA_ACCESS)) {
3619 					no_trap();
3620 					error = EWOULDBLOCK;
3621 					goto out;
3622 				}
3623 			}
3624 			if (error = uiomove(ptr, cnt, UIO_WRITE, uiop)) {
3625 				if (noblock)
3626 					no_trap();
3627 				goto out;
3628 			}
3629 			if (noblock)
3630 				no_trap();
3631 			break;
3632 
3633 		default:
3634 			error = EIO;
3635 			goto out;
3636 		}
3637 		dbp->db_struioflag |= STRUIO_DONE;
3638 		dbp->db_cksumstuff += cnt;
3639 	}
3640 out:
3641 	if (error == EWOULDBLOCK && (resid -= uiop->uio_resid) > 0) {
3642 		/*
3643 		 * A fault has occured and some bytes were moved to the
3644 		 * current mblk, the uio_t has already been updated by
3645 		 * the appropriate uio routine, so also update the mblk
3646 		 * to reflect this in case this same mblk chain is used
3647 		 * again (after the fault has been handled).
3648 		 */
3649 		uiocnt = dbp->db_cksumend - dbp->db_cksumstuff;
3650 		if (uiocnt >= resid)
3651 			dbp->db_cksumstuff += resid;
3652 	}
3653 	return (error);
3654 }
3655 
3656 /*
3657  * Try to enter queue synchronously. Any attempt to enter a closing queue will
3658  * fails. The qp->q_rwcnt keeps track of the number of successful entries so
3659  * that removeq() will not try to close the queue while a thread is inside the
3660  * queue.
3661  */
3662 static boolean_t
3663 rwnext_enter(queue_t *qp)
3664 {
3665 	mutex_enter(QLOCK(qp));
3666 	if (qp->q_flag & QWCLOSE) {
3667 		mutex_exit(QLOCK(qp));
3668 		return (B_FALSE);
3669 	}
3670 	qp->q_rwcnt++;
3671 	ASSERT(qp->q_rwcnt != 0);
3672 	mutex_exit(QLOCK(qp));
3673 	return (B_TRUE);
3674 }
3675 
3676 /*
3677  * Decrease the count of threads running in sync stream queue and wake up any
3678  * threads blocked in removeq().
3679  */
3680 static void
3681 rwnext_exit(queue_t *qp)
3682 {
3683 	mutex_enter(QLOCK(qp));
3684 	qp->q_rwcnt--;
3685 	if (qp->q_flag & QWANTRMQSYNC) {
3686 		qp->q_flag &= ~QWANTRMQSYNC;
3687 		cv_broadcast(&qp->q_wait);
3688 	}
3689 	mutex_exit(QLOCK(qp));
3690 }
3691 
3692 /*
3693  * The purpose of rwnext() is to call the rw procedure of the next
3694  * (downstream) modules queue.
3695  *
3696  * treated as put entrypoint for perimeter syncronization.
3697  *
3698  * There's no need to grab sq_putlocks here (which only exist for CIPUT
3699  * sync queues). If it is CIPUT sync queue sq_count is incremented and it does
3700  * not matter if any regular put entrypoints have been already entered. We
3701  * can't increment one of the sq_putcounts (instead of sq_count) because
3702  * qwait_rw won't know which counter to decrement.
3703  *
3704  * It would be reasonable to add the lockless FASTPUT logic.
3705  */
3706 int
3707 rwnext(queue_t *qp, struiod_t *dp)
3708 {
3709 	queue_t		*nqp;
3710 	syncq_t		*sq;
3711 	uint16_t	count;
3712 	uint16_t	flags;
3713 	struct qinit	*qi;
3714 	int		(*proc)();
3715 	struct stdata	*stp;
3716 	int		isread;
3717 	int		rval;
3718 
3719 	stp = STREAM(qp);
3720 	/*
3721 	 * Prevent q_next from changing by holding sd_lock until acquiring
3722 	 * SQLOCK. Note that a read-side rwnext from the streamhead will
3723 	 * already have sd_lock acquired. In either case sd_lock is always
3724 	 * released after acquiring SQLOCK.
3725 	 *
3726 	 * The streamhead read-side holding sd_lock when calling rwnext is
3727 	 * required to prevent a race condition were M_DATA mblks flowing
3728 	 * up the read-side of the stream could be bypassed by a rwnext()
3729 	 * down-call. In this case sd_lock acts as the streamhead perimeter.
3730 	 */
3731 	if ((nqp = _WR(qp)) == qp) {
3732 		isread = 0;
3733 		mutex_enter(&stp->sd_lock);
3734 		qp = nqp->q_next;
3735 	} else {
3736 		isread = 1;
3737 		if (nqp != stp->sd_wrq)
3738 			/* Not streamhead */
3739 			mutex_enter(&stp->sd_lock);
3740 		qp = _RD(nqp->q_next);
3741 	}
3742 	qi = qp->q_qinfo;
3743 	if (qp->q_struiot == STRUIOT_NONE || ! (proc = qi->qi_rwp)) {
3744 		/*
3745 		 * Not a synchronous module or no r/w procedure for this
3746 		 * queue, so just return EINVAL and let the caller handle it.
3747 		 */
3748 		mutex_exit(&stp->sd_lock);
3749 		return (EINVAL);
3750 	}
3751 
3752 	if (rwnext_enter(qp) == B_FALSE) {
3753 		mutex_exit(&stp->sd_lock);
3754 		return (EINVAL);
3755 	}
3756 
3757 	sq = qp->q_syncq;
3758 	mutex_enter(SQLOCK(sq));
3759 	mutex_exit(&stp->sd_lock);
3760 	count = sq->sq_count;
3761 	flags = sq->sq_flags;
3762 	ASSERT(sq->sq_ciputctrl == NULL || (flags & SQ_CIPUT));
3763 
3764 	while ((flags & SQ_GOAWAY) || (!(flags & SQ_CIPUT) && count != 0)) {
3765 		/*
3766 		 * if this queue is being closed, return.
3767 		 */
3768 		if (qp->q_flag & QWCLOSE) {
3769 			mutex_exit(SQLOCK(sq));
3770 			rwnext_exit(qp);
3771 			return (EINVAL);
3772 		}
3773 
3774 		/*
3775 		 * Wait until we can enter the inner perimeter.
3776 		 */
3777 		sq->sq_flags = flags | SQ_WANTWAKEUP;
3778 		cv_wait(&sq->sq_wait, SQLOCK(sq));
3779 		count = sq->sq_count;
3780 		flags = sq->sq_flags;
3781 	}
3782 
3783 	if (isread == 0 && stp->sd_struiowrq == NULL ||
3784 	    isread == 1 && stp->sd_struiordq == NULL) {
3785 		/*
3786 		 * Stream plumbing changed while waiting for inner perimeter
3787 		 * so just return EINVAL and let the caller handle it.
3788 		 */
3789 		mutex_exit(SQLOCK(sq));
3790 		rwnext_exit(qp);
3791 		return (EINVAL);
3792 	}
3793 	if (!(flags & SQ_CIPUT))
3794 		sq->sq_flags = flags | SQ_EXCL;
3795 	sq->sq_count = count + 1;
3796 	ASSERT(sq->sq_count != 0);		/* Wraparound */
3797 	/*
3798 	 * Note: The only message ordering guarantee that rwnext() makes is
3799 	 *	 for the write queue flow-control case. All others (r/w queue
3800 	 *	 with q_count > 0 (or q_first != 0)) are the resposibilty of
3801 	 *	 the queue's rw procedure. This could be genralized here buy
3802 	 *	 running the queue's service procedure, but that wouldn't be
3803 	 *	 the most efficent for all cases.
3804 	 */
3805 	mutex_exit(SQLOCK(sq));
3806 	if (! isread && (qp->q_flag & QFULL)) {
3807 		/*
3808 		 * Write queue may be flow controlled. If so,
3809 		 * mark the queue for wakeup when it's not.
3810 		 */
3811 		mutex_enter(QLOCK(qp));
3812 		if (qp->q_flag & QFULL) {
3813 			qp->q_flag |= QWANTWSYNC;
3814 			mutex_exit(QLOCK(qp));
3815 			rval = EWOULDBLOCK;
3816 			goto out;
3817 		}
3818 		mutex_exit(QLOCK(qp));
3819 	}
3820 
3821 	if (! isread && dp->d_mp)
3822 		STR_FTEVENT_MSG(dp->d_mp, nqp, FTEV_RWNEXT, dp->d_mp->b_rptr -
3823 		    dp->d_mp->b_datap->db_base);
3824 
3825 	rval = (*proc)(qp, dp);
3826 
3827 	if (isread && dp->d_mp)
3828 		STR_FTEVENT_MSG(dp->d_mp, _RD(nqp), FTEV_RWNEXT,
3829 		    dp->d_mp->b_rptr - dp->d_mp->b_datap->db_base);
3830 out:
3831 	/*
3832 	 * The queue is protected from being freed by sq_count, so it is
3833 	 * safe to call rwnext_exit and reacquire SQLOCK(sq).
3834 	 */
3835 	rwnext_exit(qp);
3836 
3837 	mutex_enter(SQLOCK(sq));
3838 	flags = sq->sq_flags;
3839 	ASSERT(sq->sq_count != 0);
3840 	sq->sq_count--;
3841 	if (flags & SQ_TAIL) {
3842 		putnext_tail(sq, qp, flags);
3843 		/*
3844 		 * The only purpose of this ASSERT is to preserve calling stack
3845 		 * in DEBUG kernel.
3846 		 */
3847 		ASSERT(flags & SQ_TAIL);
3848 		return (rval);
3849 	}
3850 	ASSERT(flags & (SQ_EXCL|SQ_CIPUT));
3851 	/*
3852 	 * Safe to always drop SQ_EXCL:
3853 	 *	Not SQ_CIPUT means we set SQ_EXCL above
3854 	 *	For SQ_CIPUT SQ_EXCL will only be set if the put procedure
3855 	 *	did a qwriter(INNER) in which case nobody else
3856 	 *	is in the inner perimeter and we are exiting.
3857 	 *
3858 	 * I would like to make the following assertion:
3859 	 *
3860 	 * ASSERT((flags & (SQ_EXCL|SQ_CIPUT)) != (SQ_EXCL|SQ_CIPUT) ||
3861 	 *	sq->sq_count == 0);
3862 	 *
3863 	 * which indicates that if we are both putshared and exclusive,
3864 	 * we became exclusive while executing the putproc, and the only
3865 	 * claim on the syncq was the one we dropped a few lines above.
3866 	 * But other threads that enter putnext while the syncq is exclusive
3867 	 * need to make a claim as they may need to drop SQLOCK in the
3868 	 * has_writers case to avoid deadlocks.  If these threads are
3869 	 * delayed or preempted, it is possible that the writer thread can
3870 	 * find out that there are other claims making the (sq_count == 0)
3871 	 * test invalid.
3872 	 */
3873 
3874 	sq->sq_flags = flags & ~SQ_EXCL;
3875 	if (sq->sq_flags & SQ_WANTWAKEUP) {
3876 		sq->sq_flags &= ~SQ_WANTWAKEUP;
3877 		cv_broadcast(&sq->sq_wait);
3878 	}
3879 	mutex_exit(SQLOCK(sq));
3880 	return (rval);
3881 }
3882 
3883 /*
3884  * The purpose of infonext() is to call the info procedure of the next
3885  * (downstream) modules queue.
3886  *
3887  * treated as put entrypoint for perimeter syncronization.
3888  *
3889  * There's no need to grab sq_putlocks here (which only exist for CIPUT
3890  * sync queues). If it is CIPUT sync queue regular sq_count is incremented and
3891  * it does not matter if any regular put entrypoints have been already
3892  * entered.
3893  */
3894 int
3895 infonext(queue_t *qp, infod_t *idp)
3896 {
3897 	queue_t		*nqp;
3898 	syncq_t		*sq;
3899 	uint16_t	count;
3900 	uint16_t	flags;
3901 	struct qinit	*qi;
3902 	int		(*proc)();
3903 	struct stdata	*stp;
3904 	int		rval;
3905 
3906 	stp = STREAM(qp);
3907 	/*
3908 	 * Prevent q_next from changing by holding sd_lock until
3909 	 * acquiring SQLOCK.
3910 	 */
3911 	mutex_enter(&stp->sd_lock);
3912 	if ((nqp = _WR(qp)) == qp) {
3913 		qp = nqp->q_next;
3914 	} else {
3915 		qp = _RD(nqp->q_next);
3916 	}
3917 	qi = qp->q_qinfo;
3918 	if (qp->q_struiot == STRUIOT_NONE || ! (proc = qi->qi_infop)) {
3919 		mutex_exit(&stp->sd_lock);
3920 		return (EINVAL);
3921 	}
3922 	sq = qp->q_syncq;
3923 	mutex_enter(SQLOCK(sq));
3924 	mutex_exit(&stp->sd_lock);
3925 	count = sq->sq_count;
3926 	flags = sq->sq_flags;
3927 	ASSERT(sq->sq_ciputctrl == NULL || (flags & SQ_CIPUT));
3928 
3929 	while ((flags & SQ_GOAWAY) || (!(flags & SQ_CIPUT) && count != 0)) {
3930 		/*
3931 		 * Wait until we can enter the inner perimeter.
3932 		 */
3933 		sq->sq_flags = flags | SQ_WANTWAKEUP;
3934 		cv_wait(&sq->sq_wait, SQLOCK(sq));
3935 		count = sq->sq_count;
3936 		flags = sq->sq_flags;
3937 	}
3938 
3939 	if (! (flags & SQ_CIPUT))
3940 		sq->sq_flags = flags | SQ_EXCL;
3941 	sq->sq_count = count + 1;
3942 	ASSERT(sq->sq_count != 0);		/* Wraparound */
3943 	mutex_exit(SQLOCK(sq));
3944 
3945 	rval = (*proc)(qp, idp);
3946 
3947 	mutex_enter(SQLOCK(sq));
3948 	flags = sq->sq_flags;
3949 	ASSERT(sq->sq_count != 0);
3950 	sq->sq_count--;
3951 	if (flags & SQ_TAIL) {
3952 		putnext_tail(sq, qp, flags);
3953 		/*
3954 		 * The only purpose of this ASSERT is to preserve calling stack
3955 		 * in DEBUG kernel.
3956 		 */
3957 		ASSERT(flags & SQ_TAIL);
3958 		return (rval);
3959 	}
3960 	ASSERT(flags & (SQ_EXCL|SQ_CIPUT));
3961 /*
3962  * XXXX
3963  * I am not certain the next comment is correct here.  I need to consider
3964  * why the infonext is called, and if dropping SQ_EXCL unless non-CIPUT
3965  * might cause other problems.  It just might be safer to drop it if
3966  * !SQ_CIPUT because that is when we set it.
3967  */
3968 	/*
3969 	 * Safe to always drop SQ_EXCL:
3970 	 *	Not SQ_CIPUT means we set SQ_EXCL above
3971 	 *	For SQ_CIPUT SQ_EXCL will only be set if the put procedure
3972 	 *	did a qwriter(INNER) in which case nobody else
3973 	 *	is in the inner perimeter and we are exiting.
3974 	 *
3975 	 * I would like to make the following assertion:
3976 	 *
3977 	 * ASSERT((flags & (SQ_EXCL|SQ_CIPUT)) != (SQ_EXCL|SQ_CIPUT) ||
3978 	 *	sq->sq_count == 0);
3979 	 *
3980 	 * which indicates that if we are both putshared and exclusive,
3981 	 * we became exclusive while executing the putproc, and the only
3982 	 * claim on the syncq was the one we dropped a few lines above.
3983 	 * But other threads that enter putnext while the syncq is exclusive
3984 	 * need to make a claim as they may need to drop SQLOCK in the
3985 	 * has_writers case to avoid deadlocks.  If these threads are
3986 	 * delayed or preempted, it is possible that the writer thread can
3987 	 * find out that there are other claims making the (sq_count == 0)
3988 	 * test invalid.
3989 	 */
3990 
3991 	sq->sq_flags = flags & ~SQ_EXCL;
3992 	mutex_exit(SQLOCK(sq));
3993 	return (rval);
3994 }
3995 
3996 /*
3997  * Return nonzero if the queue is responsible for struio(), else return 0.
3998  */
3999 int
4000 isuioq(queue_t *q)
4001 {
4002 	if (q->q_flag & QREADR)
4003 		return (STREAM(q)->sd_struiordq == q);
4004 	else
4005 		return (STREAM(q)->sd_struiowrq == q);
4006 }
4007 
/*
 * When non-zero, create_putlocks() does nothing.  Defaults to enabled (0)
 * on sparc and disabled (1) everywhere else.
 */
#if !defined(__sparc)
int disable_putlocks = 1;
#else
int disable_putlocks = 0;
#endif
4013 
4014 /*
4015  * called by create_putlock.
4016  */
4017 static void
4018 create_syncq_putlocks(queue_t *q)
4019 {
4020 	syncq_t	*sq = q->q_syncq;
4021 	ciputctrl_t *cip;
4022 	int i;
4023 
4024 	ASSERT(sq != NULL);
4025 
4026 	ASSERT(disable_putlocks == 0);
4027 	ASSERT(n_ciputctrl >= min_n_ciputctrl);
4028 	ASSERT(ciputctrl_cache != NULL);
4029 
4030 	if (!(sq->sq_type & SQ_CIPUT))
4031 		return;
4032 
4033 	for (i = 0; i <= 1; i++) {
4034 		if (sq->sq_ciputctrl == NULL) {
4035 			cip = kmem_cache_alloc(ciputctrl_cache, KM_SLEEP);
4036 			SUMCHECK_CIPUTCTRL_COUNTS(cip, n_ciputctrl - 1, 0);
4037 			mutex_enter(SQLOCK(sq));
4038 			if (sq->sq_ciputctrl != NULL) {
4039 				mutex_exit(SQLOCK(sq));
4040 				kmem_cache_free(ciputctrl_cache, cip);
4041 			} else {
4042 				ASSERT(sq->sq_nciputctrl == 0);
4043 				sq->sq_nciputctrl = n_ciputctrl - 1;
4044 				/*
4045 				 * putnext checks sq_ciputctrl without holding
4046 				 * SQLOCK. if it is not NULL putnext assumes
4047 				 * sq_nciputctrl is initialized. membar below
4048 				 * insures that.
4049 				 */
4050 				membar_producer();
4051 				sq->sq_ciputctrl = cip;
4052 				mutex_exit(SQLOCK(sq));
4053 			}
4054 		}
4055 		ASSERT(sq->sq_nciputctrl == n_ciputctrl - 1);
4056 		if (i == 1)
4057 			break;
4058 		q = _OTHERQ(q);
4059 		if (!(q->q_flag & QPERQ)) {
4060 			ASSERT(sq == q->q_syncq);
4061 			break;
4062 		}
4063 		ASSERT(q->q_syncq != NULL);
4064 		ASSERT(sq != q->q_syncq);
4065 		sq = q->q_syncq;
4066 		ASSERT(sq->sq_type & SQ_CIPUT);
4067 	}
4068 }
4069 
4070 /*
4071  * If stream argument is 0 only create per cpu sq_putlocks/sq_putcounts for
4072  * syncq of q. If stream argument is not 0 create per cpu stream_putlocks for
4073  * the stream of q and per cpu sq_putlocks/sq_putcounts for all syncq's
4074  * starting from q and down to the driver.
4075  *
4076  * This should be called after the affected queues are part of stream
4077  * geometry. It should be called from driver/module open routine after
4078  * qprocson() call. It is also called from nfs syscall where it is known that
4079  * stream is configured and won't change its geometry during create_putlock
4080  * call.
4081  *
4082  * caller normally uses 0 value for the stream argument to speed up MT putnext
4083  * into the perimeter of q for example because its perimeter is per module
4084  * (e.g. IP).
4085  *
4086  * caller normally uses non 0 value for the stream argument to hint the system
4087  * that the stream of q is a very contended global system stream
4088  * (e.g. NFS/UDP) and the part of the stream from q to the driver is
4089  * particularly MT hot.
4090  *
4091  * Caller insures stream plumbing won't happen while we are here and therefore
4092  * q_next can be safely used.
4093  */
4094 
4095 void
4096 create_putlocks(queue_t *q, int stream)
4097 {
4098 	ciputctrl_t	*cip;
4099 	struct stdata	*stp = STREAM(q);
4100 
4101 	q = _WR(q);
4102 	ASSERT(stp != NULL);
4103 
4104 	if (disable_putlocks != 0)
4105 		return;
4106 
4107 	if (n_ciputctrl < min_n_ciputctrl)
4108 		return;
4109 
4110 	ASSERT(ciputctrl_cache != NULL);
4111 
4112 	if (stream != 0 && stp->sd_ciputctrl == NULL) {
4113 		cip = kmem_cache_alloc(ciputctrl_cache, KM_SLEEP);
4114 		SUMCHECK_CIPUTCTRL_COUNTS(cip, n_ciputctrl - 1, 0);
4115 		mutex_enter(&stp->sd_lock);
4116 		if (stp->sd_ciputctrl != NULL) {
4117 			mutex_exit(&stp->sd_lock);
4118 			kmem_cache_free(ciputctrl_cache, cip);
4119 		} else {
4120 			ASSERT(stp->sd_nciputctrl == 0);
4121 			stp->sd_nciputctrl = n_ciputctrl - 1;
4122 			/*
4123 			 * putnext checks sd_ciputctrl without holding
4124 			 * sd_lock. if it is not NULL putnext assumes
4125 			 * sd_nciputctrl is initialized. membar below
4126 			 * insures that.
4127 			 */
4128 			membar_producer();
4129 			stp->sd_ciputctrl = cip;
4130 			mutex_exit(&stp->sd_lock);
4131 		}
4132 	}
4133 
4134 	ASSERT(stream == 0 || stp->sd_nciputctrl == n_ciputctrl - 1);
4135 
4136 	while (_SAMESTR(q)) {
4137 		create_syncq_putlocks(q);
4138 		if (stream == 0)
4139 			return;
4140 		q = q->q_next;
4141 	}
4142 	ASSERT(q != NULL);
4143 	create_syncq_putlocks(q);
4144 }
4145 
4146 /*
4147  * STREAMS Flow Trace - record STREAMS Flow Trace events as an mblk flows
4148  * through a stream.
4149  *
 * Data currently recorded per event is a timestamp, module/driver name,
 * downstream module/driver name, optional callstack, event type and a
 * per-type datum.  Much of the STREAMS framework is instrumented for
 * automatic flow tracing (when enabled).  Events can be defined and used by
 * STREAMS modules and drivers.
4155  *
4156  * Global objects:
4157  *
4158  *	str_ftevent() - Add a flow-trace event to a dblk.
4159  *	str_ftfree() - Free flow-trace data
4160  *
4161  * Local objects:
4162  *
4163  *	fthdr_cache - pointer to the kmem cache for trace header.
4164  *	ftblk_cache - pointer to the kmem cache for trace data blocks.
4165  */
4166 
/* Default (1): don't do STREAMS flow tracing. */
int str_ftnever = 1;
/* Default (0): don't record event call stacks. */
int str_ftstack = 0;
4169 
4170 void
4171 str_ftevent(fthdr_t *hp, void *p, ushort_t evnt, ushort_t data)
4172 {
4173 	ftblk_t *bp = hp->tail;
4174 	ftblk_t *nbp;
4175 	ftevnt_t *ep;
4176 	int ix, nix;
4177 
4178 	ASSERT(hp != NULL);
4179 
4180 	for (;;) {
4181 		if ((ix = bp->ix) == FTBLK_EVNTS) {
4182 			/*
4183 			 * Tail doesn't have room, so need a new tail.
4184 			 *
4185 			 * To make this MT safe, first, allocate a new
4186 			 * ftblk, and initialize it.  To make life a
4187 			 * little easier, reserve the first slot (mostly
4188 			 * by making ix = 1).  When we are finished with
4189 			 * the initialization, CAS this pointer to the
4190 			 * tail.  If this succeeds, this is the new
4191 			 * "next" block.  Otherwise, another thread
4192 			 * got here first, so free the block and start
4193 			 * again.
4194 			 */
4195 			nbp = kmem_cache_alloc(ftblk_cache, KM_NOSLEEP);
4196 			if (nbp == NULL) {
4197 				/* no mem, so punt */
4198 				str_ftnever++;
4199 				/* free up all flow data? */
4200 				return;
4201 			}
4202 			nbp->nxt = NULL;
4203 			nbp->ix = 1;
4204 			/*
4205 			 * Just in case there is another thread about
4206 			 * to get the next index, we need to make sure
4207 			 * the value is there for it.
4208 			 */
4209 			membar_producer();
4210 			if (atomic_cas_ptr(&hp->tail, bp, nbp) == bp) {
4211 				/* CAS was successful */
4212 				bp->nxt = nbp;
4213 				membar_producer();
4214 				bp = nbp;
4215 				ix = 0;
4216 				goto cas_good;
4217 			} else {
4218 				kmem_cache_free(ftblk_cache, nbp);
4219 				bp = hp->tail;
4220 				continue;
4221 			}
4222 		}
4223 		nix = ix + 1;
4224 		if (atomic_cas_32((uint32_t *)&bp->ix, ix, nix) == ix) {
4225 		cas_good:
4226 			if (curthread != hp->thread) {
4227 				hp->thread = curthread;
4228 				evnt |= FTEV_CS;
4229 			}
4230 			if (CPU->cpu_seqid != hp->cpu_seqid) {
4231 				hp->cpu_seqid = CPU->cpu_seqid;
4232 				evnt |= FTEV_PS;
4233 			}
4234 			ep = &bp->ev[ix];
4235 			break;
4236 		}
4237 	}
4238 
4239 	if (evnt & FTEV_QMASK) {
4240 		queue_t *qp = p;
4241 
4242 		if (!(qp->q_flag & QREADR))
4243 			evnt |= FTEV_ISWR;
4244 
4245 		ep->mid = Q2NAME(qp);
4246 
4247 		/*
4248 		 * We only record the next queue name for FTEV_PUTNEXT since
4249 		 * that's the only time we *really* need it, and the putnext()
4250 		 * code ensures that qp->q_next won't vanish.  (We could use
4251 		 * claimstr()/releasestr() but at a performance cost.)
4252 		 */
4253 		if ((evnt & FTEV_MASK) == FTEV_PUTNEXT && qp->q_next != NULL)
4254 			ep->midnext = Q2NAME(qp->q_next);
4255 		else
4256 			ep->midnext = NULL;
4257 	} else {
4258 		ep->mid = p;
4259 		ep->midnext = NULL;
4260 	}
4261 
4262 	if (ep->stk != NULL)
4263 		ep->stk->fs_depth = getpcstack(ep->stk->fs_stk, FTSTK_DEPTH);
4264 
4265 	ep->ts = gethrtime();
4266 	ep->evnt = evnt;
4267 	ep->data = data;
4268 	hp->hash = (hp->hash << 9) + hp->hash;
4269 	hp->hash += (evnt << 16) | data;
4270 	hp->hash += (uintptr_t)ep->mid;
4271 }
4272 
4273 /*
4274  * Free flow-trace data.
4275  */
4276 void
4277 str_ftfree(dblk_t *dbp)
4278 {
4279 	fthdr_t *hp = dbp->db_fthdr;
4280 	ftblk_t *bp = &hp->first;
4281 	ftblk_t *nbp;
4282 
4283 	if (bp != hp->tail || bp->ix != 0) {
4284 		/*
4285 		 * Clear out the hash, have the tail point to itself, and free
4286 		 * any continuation blocks.
4287 		 */
4288 		bp = hp->first.nxt;
4289 		hp->tail = &hp->first;
4290 		hp->hash = 0;
4291 		hp->first.nxt = NULL;
4292 		hp->first.ix = 0;
4293 		while (bp != NULL) {
4294 			nbp = bp->nxt;
4295 			kmem_cache_free(ftblk_cache, bp);
4296 			bp = nbp;
4297 		}
4298 	}
4299 	kmem_cache_free(fthdr_cache, hp);
4300 	dbp->db_fthdr = NULL;
4301 }
4302