xref: /titanic_52/usr/src/uts/common/io/stream.c (revision d6c23f6fbecbcca8ddd2b74c6e10f37095f9fd46)
1 /*
2  * CDDL HEADER START
3  *
4  * The contents of this file are subject to the terms of the
5  * Common Development and Distribution License (the "License").
6  * You may not use this file except in compliance with the License.
7  *
8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9  * or http://www.opensolaris.org/os/licensing.
10  * See the License for the specific language governing permissions
11  * and limitations under the License.
12  *
13  * When distributing Covered Code, include this CDDL HEADER in each
14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15  * If applicable, add the following below this CDDL HEADER, with the
16  * fields enclosed by brackets "[]" replaced with your own identifying
17  * information: Portions Copyright [yyyy] [name of copyright owner]
18  *
19  * CDDL HEADER END
20  */
21 /*	Copyright (c) 1984, 1986, 1987, 1988, 1989 AT&T	*/
22 /*	  All Rights Reserved  	*/
23 
24 
25 /*
26  * Copyright 2008 Sun Microsystems, Inc.  All rights reserved.
27  * Use is subject to license terms.
28  */
29 
30 #pragma ident	"%Z%%M%	%I%	%E% SMI"
31 
32 #include <sys/types.h>
33 #include <sys/param.h>
34 #include <sys/thread.h>
35 #include <sys/sysmacros.h>
36 #include <sys/stropts.h>
37 #include <sys/stream.h>
38 #include <sys/strsubr.h>
39 #include <sys/strsun.h>
40 #include <sys/conf.h>
41 #include <sys/debug.h>
42 #include <sys/cmn_err.h>
43 #include <sys/kmem.h>
44 #include <sys/atomic.h>
45 #include <sys/errno.h>
46 #include <sys/vtrace.h>
47 #include <sys/ftrace.h>
48 #include <sys/ontrap.h>
49 #include <sys/multidata.h>
50 #include <sys/multidata_impl.h>
51 #include <sys/sdt.h>
52 #include <sys/strft.h>
53 
54 #ifdef DEBUG
55 #include <sys/kmem_impl.h>
56 #endif
57 
58 /*
59  * This file contains all the STREAMS utility routines that may
60  * be used by modules and drivers.
61  */
62 
63 /*
64  * STREAMS message allocator: principles of operation
65  *
66  * The streams message allocator consists of all the routines that
67  * allocate, dup and free streams messages: allocb(), [d]esballoc[a],
68  * dupb(), freeb() and freemsg().  What follows is a high-level view
69  * of how the allocator works.
70  *
71  * Every streams message consists of one or more mblks, a dblk, and data.
72  * All mblks for all types of messages come from a common mblk_cache.
73  * The dblk and data come in several flavors, depending on how the
74  * message is allocated:
75  *
76  * (1) mblks up to DBLK_MAX_CACHE size are allocated from a collection of
77  *     fixed-size dblk/data caches. For message sizes that are multiples of
78  *     PAGESIZE, dblks are allocated separately from the buffer.
79  *     The associated buffer is allocated by the constructor using kmem_alloc().
80  *     For all other message sizes, dblk and its associated data is allocated
81  *     as a single contiguous chunk of memory.
82  *     Objects in these caches consist of a dblk plus its associated data.
83  *     allocb() determines the nearest-size cache by table lookup:
84  *     the dblk_cache[] array provides the mapping from size to dblk cache.
85  *
86  * (2) Large messages (size > DBLK_MAX_CACHE) are constructed by
87  *     kmem_alloc()'ing a buffer for the data and supplying that
88  *     buffer to gesballoc(), described below.
89  *
90  * (3) The four flavors of [d]esballoc[a] are all implemented by a
91  *     common routine, gesballoc() ("generic esballoc").  gesballoc()
92  *     allocates a dblk from the global dblk_esb_cache and sets db_base,
93  *     db_lim and db_frtnp to describe the caller-supplied buffer.
94  *
95  * While there are several routines to allocate messages, there is only
96  * one routine to free messages: freeb().  freeb() simply invokes the
97  * dblk's free method, dbp->db_free(), which is set at allocation time.
98  *
99  * dupb() creates a new reference to a message by allocating a new mblk,
100  * incrementing the dblk reference count and setting the dblk's free
101  * method to dblk_decref().  The dblk's original free method is retained
102  * in db_lastfree.  dblk_decref() decrements the reference count on each
103  * freeb().  If this is not the last reference it just frees the mblk;
104  * if this *is* the last reference, it restores db_free to db_lastfree,
105  * sets db_mblk to the current mblk (see below), and invokes db_lastfree.
106  *
107  * The implementation makes aggressive use of kmem object caching for
108  * maximum performance.  This makes the code simple and compact, but
109  * also a bit abstruse in some places.  The invariants that constitute a
110  * message's constructed state, described below, are more subtle than usual.
111  *
112  * Every dblk has an "attached mblk" as part of its constructed state.
113  * The mblk is allocated by the dblk's constructor and remains attached
114  * until the message is either dup'ed or pulled up.  In the dupb() case
115  * the mblk association doesn't matter until the last free, at which time
116  * dblk_decref() attaches the last mblk to the dblk.  pullupmsg() affects
117  * the mblk association because it swaps the leading mblks of two messages,
118  * so it is responsible for swapping their db_mblk pointers accordingly.
119  * From a constructed-state viewpoint it doesn't matter that a dblk's
120  * attached mblk can change while the message is allocated; all that
121  * matters is that the dblk has *some* attached mblk when it's freed.
122  *
123  * The sizes of the allocb() small-message caches are not magical.
124  * They represent a good trade-off between internal and external
125  * fragmentation for current workloads.  They should be reevaluated
126  * periodically, especially if allocations larger than DBLK_MAX_CACHE
127  * become common.  We use 64-byte alignment so that dblks don't
128  * straddle cache lines unnecessarily.
129  */
130 #define	DBLK_MAX_CACHE		73728
131 #define	DBLK_CACHE_ALIGN	64
132 #define	DBLK_MIN_SIZE		8
133 #define	DBLK_SIZE_SHIFT		3
134 
135 #ifdef _BIG_ENDIAN
136 #define	DBLK_RTFU_SHIFT(field)	\
137 	(8 * (&((dblk_t *)0)->db_struioflag - &((dblk_t *)0)->field))
138 #else
139 #define	DBLK_RTFU_SHIFT(field)	\
140 	(8 * (&((dblk_t *)0)->field - &((dblk_t *)0)->db_ref))
141 #endif
142 
143 #define	DBLK_RTFU(ref, type, flags, uioflag)	\
144 	(((ref) << DBLK_RTFU_SHIFT(db_ref)) | \
145 	((type) << DBLK_RTFU_SHIFT(db_type)) | \
146 	(((flags) | (ref - 1)) << DBLK_RTFU_SHIFT(db_flags)) | \
147 	((uioflag) << DBLK_RTFU_SHIFT(db_struioflag)))
148 #define	DBLK_RTFU_REF_MASK	(DBLK_REFMAX << DBLK_RTFU_SHIFT(db_ref))
149 #define	DBLK_RTFU_WORD(dbp)	(*((uint32_t *)&(dbp)->db_ref))
150 #define	MBLK_BAND_FLAG_WORD(mp)	(*((uint32_t *)&(mp)->b_band))
151 
152 static size_t dblk_sizes[] = {
153 #ifdef _LP64
154 	16, 80, 144, 208, 272, 336, 528, 1040, 1488, 1936, 2576, 3856,
155 	8192, 12048, 16384, 20240, 24576, 28432, 32768, 36624,
156 	40960, 44816, 49152, 53008, 57344, 61200, 65536, 69392,
157 #else
158 	64, 128, 320, 576, 1088, 1536, 1984, 2624, 3904,
159 	8192, 12096, 16384, 20288, 24576, 28480, 32768, 36672,
160 	40960, 44864, 49152, 53056, 57344, 61248, 65536, 69440,
161 #endif
162 	DBLK_MAX_CACHE, 0
163 };
164 
165 static struct kmem_cache *dblk_cache[DBLK_MAX_CACHE / DBLK_MIN_SIZE];
166 static struct kmem_cache *mblk_cache;
167 static struct kmem_cache *dblk_esb_cache;
168 static struct kmem_cache *fthdr_cache;
169 static struct kmem_cache *ftblk_cache;
170 
171 static void dblk_lastfree(mblk_t *mp, dblk_t *dbp);
172 static mblk_t *allocb_oversize(size_t size, int flags);
173 static int allocb_tryhard_fails;
174 static void frnop_func(void *arg);
175 frtn_t frnop = { frnop_func };
176 static void bcache_dblk_lastfree(mblk_t *mp, dblk_t *dbp);
177 
178 static boolean_t rwnext_enter(queue_t *qp);
179 static void rwnext_exit(queue_t *qp);
180 
181 /*
182  * Patchable mblk/dblk kmem_cache flags.
183  */
184 int dblk_kmem_flags = 0;
185 int mblk_kmem_flags = 0;
186 
187 
188 static int
189 dblk_constructor(void *buf, void *cdrarg, int kmflags)
190 {
191 	dblk_t *dbp = buf;
192 	ssize_t msg_size = (ssize_t)cdrarg;
193 	size_t index;
194 
195 	ASSERT(msg_size != 0);
196 
197 	index = (msg_size - 1) >> DBLK_SIZE_SHIFT;
198 
199 	ASSERT(index < (DBLK_MAX_CACHE >> DBLK_SIZE_SHIFT));
200 
201 	if ((dbp->db_mblk = kmem_cache_alloc(mblk_cache, kmflags)) == NULL)
202 		return (-1);
203 	if ((msg_size & PAGEOFFSET) == 0) {
204 		dbp->db_base = kmem_alloc(msg_size, kmflags);
205 		if (dbp->db_base == NULL) {
206 			kmem_cache_free(mblk_cache, dbp->db_mblk);
207 			return (-1);
208 		}
209 	} else {
210 		dbp->db_base = (unsigned char *)&dbp[1];
211 	}
212 
213 	dbp->db_mblk->b_datap = dbp;
214 	dbp->db_cache = dblk_cache[index];
215 	dbp->db_lim = dbp->db_base + msg_size;
216 	dbp->db_free = dbp->db_lastfree = dblk_lastfree;
217 	dbp->db_frtnp = NULL;
218 	dbp->db_fthdr = NULL;
219 	dbp->db_credp = NULL;
220 	dbp->db_cpid = -1;
221 	dbp->db_struioflag = 0;
222 	dbp->db_struioun.cksum.flags = 0;
223 	return (0);
224 }
225 
226 /*ARGSUSED*/
227 static int
228 dblk_esb_constructor(void *buf, void *cdrarg, int kmflags)
229 {
230 	dblk_t *dbp = buf;
231 
232 	if ((dbp->db_mblk = kmem_cache_alloc(mblk_cache, kmflags)) == NULL)
233 		return (-1);
234 	dbp->db_mblk->b_datap = dbp;
235 	dbp->db_cache = dblk_esb_cache;
236 	dbp->db_fthdr = NULL;
237 	dbp->db_credp = NULL;
238 	dbp->db_cpid = -1;
239 	dbp->db_struioflag = 0;
240 	dbp->db_struioun.cksum.flags = 0;
241 	return (0);
242 }
243 
244 static int
245 bcache_dblk_constructor(void *buf, void *cdrarg, int kmflags)
246 {
247 	dblk_t *dbp = buf;
248 	bcache_t *bcp = (bcache_t *)cdrarg;
249 
250 	if ((dbp->db_mblk = kmem_cache_alloc(mblk_cache, kmflags)) == NULL)
251 		return (-1);
252 
253 	if ((dbp->db_base = (unsigned char *)kmem_cache_alloc(bcp->buffer_cache,
254 	    kmflags)) == NULL) {
255 		kmem_cache_free(mblk_cache, dbp->db_mblk);
256 		return (-1);
257 	}
258 
259 	dbp->db_mblk->b_datap = dbp;
260 	dbp->db_cache = (void *)bcp;
261 	dbp->db_lim = dbp->db_base + bcp->size;
262 	dbp->db_free = dbp->db_lastfree = bcache_dblk_lastfree;
263 	dbp->db_frtnp = NULL;
264 	dbp->db_fthdr = NULL;
265 	dbp->db_credp = NULL;
266 	dbp->db_cpid = -1;
267 	dbp->db_struioflag = 0;
268 	dbp->db_struioun.cksum.flags = 0;
269 	return (0);
270 }
271 
272 /*ARGSUSED*/
273 static void
274 dblk_destructor(void *buf, void *cdrarg)
275 {
276 	dblk_t *dbp = buf;
277 	ssize_t msg_size = (ssize_t)cdrarg;
278 
279 	ASSERT(dbp->db_mblk->b_datap == dbp);
280 
281 	ASSERT(msg_size != 0);
282 
283 	ASSERT(dbp->db_struioflag == 0);
284 	ASSERT(dbp->db_struioun.cksum.flags == 0);
285 
286 	if ((msg_size & PAGEOFFSET) == 0) {
287 		kmem_free(dbp->db_base, msg_size);
288 	}
289 
290 	kmem_cache_free(mblk_cache, dbp->db_mblk);
291 }
292 
293 static void
294 bcache_dblk_destructor(void *buf, void *cdrarg)
295 {
296 	dblk_t *dbp = buf;
297 	bcache_t *bcp = (bcache_t *)cdrarg;
298 
299 	kmem_cache_free(bcp->buffer_cache, dbp->db_base);
300 
301 	ASSERT(dbp->db_mblk->b_datap == dbp);
302 
303 	ASSERT(dbp->db_struioflag == 0);
304 	ASSERT(dbp->db_struioun.cksum.flags == 0);
305 
306 	kmem_cache_free(mblk_cache, dbp->db_mblk);
307 }
308 
309 void
310 streams_msg_init(void)
311 {
312 	char name[40];
313 	size_t size;
314 	size_t lastsize = DBLK_MIN_SIZE;
315 	size_t *sizep;
316 	struct kmem_cache *cp;
317 	size_t tot_size;
318 	int offset;
319 
320 	mblk_cache = kmem_cache_create("streams_mblk",
321 	    sizeof (mblk_t), 32, NULL, NULL, NULL, NULL, NULL,
322 	    mblk_kmem_flags);
323 
324 	for (sizep = dblk_sizes; (size = *sizep) != 0; sizep++) {
325 
326 		if ((offset = (size & PAGEOFFSET)) != 0) {
327 			/*
328 			 * We are in the middle of a page, dblk should
329 			 * be allocated on the same page
330 			 */
331 			tot_size = size + sizeof (dblk_t);
332 			ASSERT((offset + sizeof (dblk_t) + sizeof (kmem_slab_t))
333 			    < PAGESIZE);
334 			ASSERT((tot_size & (DBLK_CACHE_ALIGN - 1)) == 0);
335 
336 		} else {
337 
338 			/*
339 			 * buf size is multiple of page size, dblk and
340 			 * buffer are allocated separately.
341 			 */
342 
343 			ASSERT((size & (DBLK_CACHE_ALIGN - 1)) == 0);
344 			tot_size = sizeof (dblk_t);
345 		}
346 
347 		(void) sprintf(name, "streams_dblk_%ld", size);
348 		cp = kmem_cache_create(name, tot_size,
349 		    DBLK_CACHE_ALIGN, dblk_constructor,
350 		    dblk_destructor, NULL,
351 		    (void *)(size), NULL, dblk_kmem_flags);
352 
353 		while (lastsize <= size) {
354 			dblk_cache[(lastsize - 1) >> DBLK_SIZE_SHIFT] = cp;
355 			lastsize += DBLK_MIN_SIZE;
356 		}
357 	}
358 
359 	dblk_esb_cache = kmem_cache_create("streams_dblk_esb",
360 	    sizeof (dblk_t), DBLK_CACHE_ALIGN,
361 	    dblk_esb_constructor, dblk_destructor, NULL,
362 	    (void *) sizeof (dblk_t), NULL, dblk_kmem_flags);
363 	fthdr_cache = kmem_cache_create("streams_fthdr",
364 	    sizeof (fthdr_t), 32, NULL, NULL, NULL, NULL, NULL, 0);
365 	ftblk_cache = kmem_cache_create("streams_ftblk",
366 	    sizeof (ftblk_t), 32, NULL, NULL, NULL, NULL, NULL, 0);
367 
368 	/* Initialize Multidata caches */
369 	mmd_init();
370 
371 	/* initialize throttling queue for esballoc */
372 	esballoc_queue_init();
373 }
374 
375 /*ARGSUSED*/
376 mblk_t *
377 allocb(size_t size, uint_t pri)
378 {
379 	dblk_t *dbp;
380 	mblk_t *mp;
381 	size_t index;
382 
383 	index =  (size - 1)  >> DBLK_SIZE_SHIFT;
384 
385 	if (index >= (DBLK_MAX_CACHE >> DBLK_SIZE_SHIFT)) {
386 		if (size != 0) {
387 			mp = allocb_oversize(size, KM_NOSLEEP);
388 			goto out;
389 		}
390 		index = 0;
391 	}
392 
393 	if ((dbp = kmem_cache_alloc(dblk_cache[index], KM_NOSLEEP)) == NULL) {
394 		mp = NULL;
395 		goto out;
396 	}
397 
398 	mp = dbp->db_mblk;
399 	DBLK_RTFU_WORD(dbp) = DBLK_RTFU(1, M_DATA, 0, 0);
400 	mp->b_next = mp->b_prev = mp->b_cont = NULL;
401 	mp->b_rptr = mp->b_wptr = dbp->db_base;
402 	mp->b_queue = NULL;
403 	MBLK_BAND_FLAG_WORD(mp) = 0;
404 	STR_FTALLOC(&dbp->db_fthdr, FTEV_ALLOCB, size);
405 out:
406 	FTRACE_1("allocb(): mp=0x%p", (uintptr_t)mp);
407 
408 	return (mp);
409 }
410 
411 mblk_t *
412 allocb_tmpl(size_t size, const mblk_t *tmpl)
413 {
414 	mblk_t *mp = allocb(size, 0);
415 
416 	if (mp != NULL) {
417 		cred_t *cr = DB_CRED(tmpl);
418 		if (cr != NULL)
419 			crhold(mp->b_datap->db_credp = cr);
420 		DB_CPID(mp) = DB_CPID(tmpl);
421 		DB_TYPE(mp) = DB_TYPE(tmpl);
422 	}
423 	return (mp);
424 }
425 
426 mblk_t *
427 allocb_cred(size_t size, cred_t *cr)
428 {
429 	mblk_t *mp = allocb(size, 0);
430 
431 	if (mp != NULL && cr != NULL)
432 		crhold(mp->b_datap->db_credp = cr);
433 
434 	return (mp);
435 }
436 
437 mblk_t *
438 allocb_cred_wait(size_t size, uint_t flags, int *error, cred_t *cr)
439 {
440 	mblk_t *mp = allocb_wait(size, 0, flags, error);
441 
442 	if (mp != NULL && cr != NULL)
443 		crhold(mp->b_datap->db_credp = cr);
444 
445 	return (mp);
446 }
447 
448 void
449 freeb(mblk_t *mp)
450 {
451 	dblk_t *dbp = mp->b_datap;
452 
453 	ASSERT(dbp->db_ref > 0);
454 	ASSERT(mp->b_next == NULL && mp->b_prev == NULL);
455 	FTRACE_1("freeb(): mp=0x%lx", (uintptr_t)mp);
456 
457 	STR_FTEVENT_MBLK(mp, caller(), FTEV_FREEB, dbp->db_ref);
458 
459 	dbp->db_free(mp, dbp);
460 }
461 
462 void
463 freemsg(mblk_t *mp)
464 {
465 	FTRACE_1("freemsg(): mp=0x%lx", (uintptr_t)mp);
466 	while (mp) {
467 		dblk_t *dbp = mp->b_datap;
468 		mblk_t *mp_cont = mp->b_cont;
469 
470 		ASSERT(dbp->db_ref > 0);
471 		ASSERT(mp->b_next == NULL && mp->b_prev == NULL);
472 
473 		STR_FTEVENT_MBLK(mp, caller(), FTEV_FREEB, dbp->db_ref);
474 
475 		dbp->db_free(mp, dbp);
476 		mp = mp_cont;
477 	}
478 }
479 
480 /*
481  * Reallocate a block for another use.  Try hard to use the old block.
482  * If the old data is wanted (copy), leave b_wptr at the end of the data,
483  * otherwise return b_wptr = b_rptr.
484  *
485  * This routine is private and unstable.
486  */
487 mblk_t	*
488 reallocb(mblk_t *mp, size_t size, uint_t copy)
489 {
490 	mblk_t		*mp1;
491 	unsigned char	*old_rptr;
492 	ptrdiff_t	cur_size;
493 
494 	if (mp == NULL)
495 		return (allocb(size, BPRI_HI));
496 
497 	cur_size = mp->b_wptr - mp->b_rptr;
498 	old_rptr = mp->b_rptr;
499 
500 	ASSERT(mp->b_datap->db_ref != 0);
501 
502 	if (mp->b_datap->db_ref == 1 && MBLKSIZE(mp) >= size) {
503 		/*
504 		 * If the data is wanted and it will fit where it is, no
505 		 * work is required.
506 		 */
507 		if (copy && mp->b_datap->db_lim - mp->b_rptr >= size)
508 			return (mp);
509 
510 		mp->b_wptr = mp->b_rptr = mp->b_datap->db_base;
511 		mp1 = mp;
512 	} else if ((mp1 = allocb_tmpl(size, mp)) != NULL) {
513 		/* XXX other mp state could be copied too, db_flags ... ? */
514 		mp1->b_cont = mp->b_cont;
515 	} else {
516 		return (NULL);
517 	}
518 
519 	if (copy) {
520 		bcopy(old_rptr, mp1->b_rptr, cur_size);
521 		mp1->b_wptr = mp1->b_rptr + cur_size;
522 	}
523 
524 	if (mp != mp1)
525 		freeb(mp);
526 
527 	return (mp1);
528 }
529 
530 static void
531 dblk_lastfree(mblk_t *mp, dblk_t *dbp)
532 {
533 	ASSERT(dbp->db_mblk == mp);
534 	if (dbp->db_fthdr != NULL)
535 		str_ftfree(dbp);
536 
537 	/* set credp and projid to be 'unspecified' before returning to cache */
538 	if (dbp->db_credp != NULL) {
539 		crfree(dbp->db_credp);
540 		dbp->db_credp = NULL;
541 	}
542 	dbp->db_cpid = -1;
543 
544 	/* Reset the struioflag and the checksum flag fields */
545 	dbp->db_struioflag = 0;
546 	dbp->db_struioun.cksum.flags = 0;
547 
548 	/* and the COOKED and/or UIOA flag(s) */
549 	dbp->db_flags &= ~(DBLK_COOKED | DBLK_UIOA);
550 
551 	kmem_cache_free(dbp->db_cache, dbp);
552 }
553 
554 static void
555 dblk_decref(mblk_t *mp, dblk_t *dbp)
556 {
557 	if (dbp->db_ref != 1) {
558 		uint32_t rtfu = atomic_add_32_nv(&DBLK_RTFU_WORD(dbp),
559 		    -(1 << DBLK_RTFU_SHIFT(db_ref)));
560 		/*
561 		 * atomic_add_32_nv() just decremented db_ref, so we no longer
562 		 * have a reference to the dblk, which means another thread
563 		 * could free it.  Therefore we cannot examine the dblk to
564 		 * determine whether ours was the last reference.  Instead,
565 		 * we extract the new and minimum reference counts from rtfu.
566 		 * Note that all we're really saying is "if (ref != refmin)".
567 		 */
568 		if (((rtfu >> DBLK_RTFU_SHIFT(db_ref)) & DBLK_REFMAX) !=
569 		    ((rtfu >> DBLK_RTFU_SHIFT(db_flags)) & DBLK_REFMIN)) {
570 			kmem_cache_free(mblk_cache, mp);
571 			return;
572 		}
573 	}
574 	dbp->db_mblk = mp;
575 	dbp->db_free = dbp->db_lastfree;
576 	dbp->db_lastfree(mp, dbp);
577 }
578 
579 mblk_t *
580 dupb(mblk_t *mp)
581 {
582 	dblk_t *dbp = mp->b_datap;
583 	mblk_t *new_mp;
584 	uint32_t oldrtfu, newrtfu;
585 
586 	if ((new_mp = kmem_cache_alloc(mblk_cache, KM_NOSLEEP)) == NULL)
587 		goto out;
588 
589 	new_mp->b_next = new_mp->b_prev = new_mp->b_cont = NULL;
590 	new_mp->b_rptr = mp->b_rptr;
591 	new_mp->b_wptr = mp->b_wptr;
592 	new_mp->b_datap = dbp;
593 	new_mp->b_queue = NULL;
594 	MBLK_BAND_FLAG_WORD(new_mp) = MBLK_BAND_FLAG_WORD(mp);
595 
596 	STR_FTEVENT_MBLK(mp, caller(), FTEV_DUPB, dbp->db_ref);
597 
598 	dbp->db_free = dblk_decref;
599 	do {
600 		ASSERT(dbp->db_ref > 0);
601 		oldrtfu = DBLK_RTFU_WORD(dbp);
602 		newrtfu = oldrtfu + (1 << DBLK_RTFU_SHIFT(db_ref));
603 		/*
604 		 * If db_ref is maxed out we can't dup this message anymore.
605 		 */
606 		if ((oldrtfu & DBLK_RTFU_REF_MASK) == DBLK_RTFU_REF_MASK) {
607 			kmem_cache_free(mblk_cache, new_mp);
608 			new_mp = NULL;
609 			goto out;
610 		}
611 	} while (cas32(&DBLK_RTFU_WORD(dbp), oldrtfu, newrtfu) != oldrtfu);
612 
613 out:
614 	FTRACE_1("dupb(): new_mp=0x%lx", (uintptr_t)new_mp);
615 	return (new_mp);
616 }
617 
618 static void
619 dblk_lastfree_desb(mblk_t *mp, dblk_t *dbp)
620 {
621 	frtn_t *frp = dbp->db_frtnp;
622 
623 	ASSERT(dbp->db_mblk == mp);
624 	frp->free_func(frp->free_arg);
625 	if (dbp->db_fthdr != NULL)
626 		str_ftfree(dbp);
627 
628 	/* set credp and projid to be 'unspecified' before returning to cache */
629 	if (dbp->db_credp != NULL) {
630 		crfree(dbp->db_credp);
631 		dbp->db_credp = NULL;
632 	}
633 	dbp->db_cpid = -1;
634 	dbp->db_struioflag = 0;
635 	dbp->db_struioun.cksum.flags = 0;
636 
637 	kmem_cache_free(dbp->db_cache, dbp);
638 }
639 
640 /*ARGSUSED*/
641 static void
642 frnop_func(void *arg)
643 {
644 }
645 
646 /*
647  * Generic esballoc used to implement the four flavors: [d]esballoc[a].
648  */
649 static mblk_t *
650 gesballoc(unsigned char *base, size_t size, uint32_t db_rtfu, frtn_t *frp,
651 	void (*lastfree)(mblk_t *, dblk_t *), int kmflags)
652 {
653 	dblk_t *dbp;
654 	mblk_t *mp;
655 
656 	ASSERT(base != NULL && frp != NULL);
657 
658 	if ((dbp = kmem_cache_alloc(dblk_esb_cache, kmflags)) == NULL) {
659 		mp = NULL;
660 		goto out;
661 	}
662 
663 	mp = dbp->db_mblk;
664 	dbp->db_base = base;
665 	dbp->db_lim = base + size;
666 	dbp->db_free = dbp->db_lastfree = lastfree;
667 	dbp->db_frtnp = frp;
668 	DBLK_RTFU_WORD(dbp) = db_rtfu;
669 	mp->b_next = mp->b_prev = mp->b_cont = NULL;
670 	mp->b_rptr = mp->b_wptr = base;
671 	mp->b_queue = NULL;
672 	MBLK_BAND_FLAG_WORD(mp) = 0;
673 
674 out:
675 	FTRACE_1("gesballoc(): mp=0x%lx", (uintptr_t)mp);
676 	return (mp);
677 }
678 
679 /*ARGSUSED*/
680 mblk_t *
681 esballoc(unsigned char *base, size_t size, uint_t pri, frtn_t *frp)
682 {
683 	mblk_t *mp;
684 
685 	/*
686 	 * Note that this is structured to allow the common case (i.e.
687 	 * STREAMS flowtracing disabled) to call gesballoc() with tail
688 	 * call optimization.
689 	 */
690 	if (!str_ftnever) {
691 		mp = gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
692 		    frp, freebs_enqueue, KM_NOSLEEP);
693 
694 		if (mp != NULL)
695 			STR_FTALLOC(&DB_FTHDR(mp), FTEV_ESBALLOC, size);
696 		return (mp);
697 	}
698 
699 	return (gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
700 	    frp, freebs_enqueue, KM_NOSLEEP));
701 }
702 
703 /*
704  * Same as esballoc() but sleeps waiting for memory.
705  */
706 /*ARGSUSED*/
707 mblk_t *
708 esballoc_wait(unsigned char *base, size_t size, uint_t pri, frtn_t *frp)
709 {
710 	mblk_t *mp;
711 
712 	/*
713 	 * Note that this is structured to allow the common case (i.e.
714 	 * STREAMS flowtracing disabled) to call gesballoc() with tail
715 	 * call optimization.
716 	 */
717 	if (!str_ftnever) {
718 		mp = gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
719 		    frp, freebs_enqueue, KM_SLEEP);
720 
721 		STR_FTALLOC(&DB_FTHDR(mp), FTEV_ESBALLOC, size);
722 		return (mp);
723 	}
724 
725 	return (gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
726 	    frp, freebs_enqueue, KM_SLEEP));
727 }
728 
729 /*ARGSUSED*/
730 mblk_t *
731 desballoc(unsigned char *base, size_t size, uint_t pri, frtn_t *frp)
732 {
733 	mblk_t *mp;
734 
735 	/*
736 	 * Note that this is structured to allow the common case (i.e.
737 	 * STREAMS flowtracing disabled) to call gesballoc() with tail
738 	 * call optimization.
739 	 */
740 	if (!str_ftnever) {
741 		mp = gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
742 		    frp, dblk_lastfree_desb, KM_NOSLEEP);
743 
744 		if (mp != NULL)
745 			STR_FTALLOC(&DB_FTHDR(mp), FTEV_DESBALLOC, size);
746 		return (mp);
747 	}
748 
749 	return (gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
750 	    frp, dblk_lastfree_desb, KM_NOSLEEP));
751 }
752 
753 /*ARGSUSED*/
754 mblk_t *
755 esballoca(unsigned char *base, size_t size, uint_t pri, frtn_t *frp)
756 {
757 	mblk_t *mp;
758 
759 	/*
760 	 * Note that this is structured to allow the common case (i.e.
761 	 * STREAMS flowtracing disabled) to call gesballoc() with tail
762 	 * call optimization.
763 	 */
764 	if (!str_ftnever) {
765 		mp = gesballoc(base, size, DBLK_RTFU(2, M_DATA, 0, 0),
766 		    frp, freebs_enqueue, KM_NOSLEEP);
767 
768 		if (mp != NULL)
769 			STR_FTALLOC(&DB_FTHDR(mp), FTEV_ESBALLOCA, size);
770 		return (mp);
771 	}
772 
773 	return (gesballoc(base, size, DBLK_RTFU(2, M_DATA, 0, 0),
774 	    frp, freebs_enqueue, KM_NOSLEEP));
775 }
776 
777 /*ARGSUSED*/
778 mblk_t *
779 desballoca(unsigned char *base, size_t size, uint_t pri, frtn_t *frp)
780 {
781 	mblk_t *mp;
782 
783 	/*
784 	 * Note that this is structured to allow the common case (i.e.
785 	 * STREAMS flowtracing disabled) to call gesballoc() with tail
786 	 * call optimization.
787 	 */
788 	if (!str_ftnever) {
789 		mp = gesballoc(base, size, DBLK_RTFU(2, M_DATA, 0, 0),
790 		    frp, dblk_lastfree_desb, KM_NOSLEEP);
791 
792 		if (mp != NULL)
793 			STR_FTALLOC(&DB_FTHDR(mp), FTEV_DESBALLOCA, size);
794 		return (mp);
795 	}
796 
797 	return (gesballoc(base, size, DBLK_RTFU(2, M_DATA, 0, 0),
798 	    frp, dblk_lastfree_desb, KM_NOSLEEP));
799 }
800 
801 static void
802 bcache_dblk_lastfree(mblk_t *mp, dblk_t *dbp)
803 {
804 	bcache_t *bcp = dbp->db_cache;
805 
806 	ASSERT(dbp->db_mblk == mp);
807 	if (dbp->db_fthdr != NULL)
808 		str_ftfree(dbp);
809 
810 	/* set credp and projid to be 'unspecified' before returning to cache */
811 	if (dbp->db_credp != NULL) {
812 		crfree(dbp->db_credp);
813 		dbp->db_credp = NULL;
814 	}
815 	dbp->db_cpid = -1;
816 	dbp->db_struioflag = 0;
817 	dbp->db_struioun.cksum.flags = 0;
818 
819 	mutex_enter(&bcp->mutex);
820 	kmem_cache_free(bcp->dblk_cache, dbp);
821 	bcp->alloc--;
822 
823 	if (bcp->alloc == 0 && bcp->destroy != 0) {
824 		kmem_cache_destroy(bcp->dblk_cache);
825 		kmem_cache_destroy(bcp->buffer_cache);
826 		mutex_exit(&bcp->mutex);
827 		mutex_destroy(&bcp->mutex);
828 		kmem_free(bcp, sizeof (bcache_t));
829 	} else {
830 		mutex_exit(&bcp->mutex);
831 	}
832 }
833 
834 bcache_t *
835 bcache_create(char *name, size_t size, uint_t align)
836 {
837 	bcache_t *bcp;
838 	char buffer[255];
839 
840 	ASSERT((align & (align - 1)) == 0);
841 
842 	if ((bcp = (bcache_t *)kmem_alloc(sizeof (bcache_t), KM_NOSLEEP)) ==
843 	    NULL) {
844 		return (NULL);
845 	}
846 
847 	bcp->size = size;
848 	bcp->align = align;
849 	bcp->alloc = 0;
850 	bcp->destroy = 0;
851 
852 	mutex_init(&bcp->mutex, NULL, MUTEX_DRIVER, NULL);
853 
854 	(void) sprintf(buffer, "%s_buffer_cache", name);
855 	bcp->buffer_cache = kmem_cache_create(buffer, size, align, NULL, NULL,
856 	    NULL, NULL, NULL, 0);
857 	(void) sprintf(buffer, "%s_dblk_cache", name);
858 	bcp->dblk_cache = kmem_cache_create(buffer, sizeof (dblk_t),
859 	    DBLK_CACHE_ALIGN, bcache_dblk_constructor, bcache_dblk_destructor,
860 	    NULL, (void *)bcp, NULL, 0);
861 
862 	return (bcp);
863 }
864 
865 void
866 bcache_destroy(bcache_t *bcp)
867 {
868 	ASSERT(bcp != NULL);
869 
870 	mutex_enter(&bcp->mutex);
871 	if (bcp->alloc == 0) {
872 		kmem_cache_destroy(bcp->dblk_cache);
873 		kmem_cache_destroy(bcp->buffer_cache);
874 		mutex_exit(&bcp->mutex);
875 		mutex_destroy(&bcp->mutex);
876 		kmem_free(bcp, sizeof (bcache_t));
877 	} else {
878 		bcp->destroy++;
879 		mutex_exit(&bcp->mutex);
880 	}
881 }
882 
883 /*ARGSUSED*/
884 mblk_t *
885 bcache_allocb(bcache_t *bcp, uint_t pri)
886 {
887 	dblk_t *dbp;
888 	mblk_t *mp = NULL;
889 
890 	ASSERT(bcp != NULL);
891 
892 	mutex_enter(&bcp->mutex);
893 	if (bcp->destroy != 0) {
894 		mutex_exit(&bcp->mutex);
895 		goto out;
896 	}
897 
898 	if ((dbp = kmem_cache_alloc(bcp->dblk_cache, KM_NOSLEEP)) == NULL) {
899 		mutex_exit(&bcp->mutex);
900 		goto out;
901 	}
902 	bcp->alloc++;
903 	mutex_exit(&bcp->mutex);
904 
905 	ASSERT(((uintptr_t)(dbp->db_base) & (bcp->align - 1)) == 0);
906 
907 	mp = dbp->db_mblk;
908 	DBLK_RTFU_WORD(dbp) = DBLK_RTFU(1, M_DATA, 0, 0);
909 	mp->b_next = mp->b_prev = mp->b_cont = NULL;
910 	mp->b_rptr = mp->b_wptr = dbp->db_base;
911 	mp->b_queue = NULL;
912 	MBLK_BAND_FLAG_WORD(mp) = 0;
913 	STR_FTALLOC(&dbp->db_fthdr, FTEV_BCALLOCB, bcp->size);
914 out:
915 	FTRACE_1("bcache_allocb(): mp=0x%p", (uintptr_t)mp);
916 
917 	return (mp);
918 }
919 
920 static void
921 dblk_lastfree_oversize(mblk_t *mp, dblk_t *dbp)
922 {
923 	ASSERT(dbp->db_mblk == mp);
924 	if (dbp->db_fthdr != NULL)
925 		str_ftfree(dbp);
926 
927 	/* set credp and projid to be 'unspecified' before returning to cache */
928 	if (dbp->db_credp != NULL) {
929 		crfree(dbp->db_credp);
930 		dbp->db_credp = NULL;
931 	}
932 	dbp->db_cpid = -1;
933 	dbp->db_struioflag = 0;
934 	dbp->db_struioun.cksum.flags = 0;
935 
936 	kmem_free(dbp->db_base, dbp->db_lim - dbp->db_base);
937 	kmem_cache_free(dbp->db_cache, dbp);
938 }
939 
940 static mblk_t *
941 allocb_oversize(size_t size, int kmflags)
942 {
943 	mblk_t *mp;
944 	void *buf;
945 
946 	size = P2ROUNDUP(size, DBLK_CACHE_ALIGN);
947 	if ((buf = kmem_alloc(size, kmflags)) == NULL)
948 		return (NULL);
949 	if ((mp = gesballoc(buf, size, DBLK_RTFU(1, M_DATA, 0, 0),
950 	    &frnop, dblk_lastfree_oversize, kmflags)) == NULL)
951 		kmem_free(buf, size);
952 
953 	if (mp != NULL)
954 		STR_FTALLOC(&DB_FTHDR(mp), FTEV_ALLOCBIG, size);
955 
956 	return (mp);
957 }
958 
959 mblk_t *
960 allocb_tryhard(size_t target_size)
961 {
962 	size_t size;
963 	mblk_t *bp;
964 
965 	for (size = target_size; size < target_size + 512;
966 	    size += DBLK_CACHE_ALIGN)
967 		if ((bp = allocb(size, BPRI_HI)) != NULL)
968 			return (bp);
969 	allocb_tryhard_fails++;
970 	return (NULL);
971 }
972 
973 /*
974  * This routine is consolidation private for STREAMS internal use
975  * This routine may only be called from sync routines (i.e., not
976  * from put or service procedures).  It is located here (rather
977  * than strsubr.c) so that we don't have to expose all of the
978  * allocb() implementation details in header files.
979  */
980 mblk_t *
981 allocb_wait(size_t size, uint_t pri, uint_t flags, int *error)
982 {
983 	dblk_t *dbp;
984 	mblk_t *mp;
985 	size_t index;
986 
987 	index = (size -1) >> DBLK_SIZE_SHIFT;
988 
989 	if (flags & STR_NOSIG) {
990 		if (index >= (DBLK_MAX_CACHE >> DBLK_SIZE_SHIFT)) {
991 			if (size != 0) {
992 				mp = allocb_oversize(size, KM_SLEEP);
993 				FTRACE_1("allocb_wait (NOSIG): mp=0x%lx",
994 				    (uintptr_t)mp);
995 				return (mp);
996 			}
997 			index = 0;
998 		}
999 
1000 		dbp = kmem_cache_alloc(dblk_cache[index], KM_SLEEP);
1001 		mp = dbp->db_mblk;
1002 		DBLK_RTFU_WORD(dbp) = DBLK_RTFU(1, M_DATA, 0, 0);
1003 		mp->b_next = mp->b_prev = mp->b_cont = NULL;
1004 		mp->b_rptr = mp->b_wptr = dbp->db_base;
1005 		mp->b_queue = NULL;
1006 		MBLK_BAND_FLAG_WORD(mp) = 0;
1007 		STR_FTALLOC(&DB_FTHDR(mp), FTEV_ALLOCBW, size);
1008 
1009 		FTRACE_1("allocb_wait (NOSIG): mp=0x%lx", (uintptr_t)mp);
1010 
1011 	} else {
1012 		while ((mp = allocb(size, pri)) == NULL) {
1013 			if ((*error = strwaitbuf(size, BPRI_HI)) != 0)
1014 				return (NULL);
1015 		}
1016 	}
1017 
1018 	return (mp);
1019 }
1020 
1021 /*
1022  * Call function 'func' with 'arg' when a class zero block can
1023  * be allocated with priority 'pri'.
1024  */
1025 bufcall_id_t
1026 esbbcall(uint_t pri, void (*func)(void *), void *arg)
1027 {
1028 	return (bufcall(1, pri, func, arg));
1029 }
1030 
1031 /*
1032  * Allocates an iocblk (M_IOCTL) block. Properly sets the credentials
1033  * ioc_id, rval and error of the struct ioctl to set up an ioctl call.
1034  * This provides consistency for all internal allocators of ioctl.
1035  */
1036 mblk_t *
1037 mkiocb(uint_t cmd)
1038 {
1039 	struct iocblk	*ioc;
1040 	mblk_t		*mp;
1041 
1042 	/*
1043 	 * Allocate enough space for any of the ioctl related messages.
1044 	 */
1045 	if ((mp = allocb(sizeof (union ioctypes), BPRI_MED)) == NULL)
1046 		return (NULL);
1047 
1048 	bzero(mp->b_rptr, sizeof (union ioctypes));
1049 
1050 	/*
1051 	 * Set the mblk_t information and ptrs correctly.
1052 	 */
1053 	mp->b_wptr += sizeof (struct iocblk);
1054 	mp->b_datap->db_type = M_IOCTL;
1055 
1056 	/*
1057 	 * Fill in the fields.
1058 	 */
1059 	ioc		= (struct iocblk *)mp->b_rptr;
1060 	ioc->ioc_cmd	= cmd;
1061 	ioc->ioc_cr	= kcred;
1062 	ioc->ioc_id	= getiocseqno();
1063 	ioc->ioc_flag	= IOC_NATIVE;
1064 	return (mp);
1065 }
1066 
1067 /*
1068  * test if block of given size can be allocated with a request of
1069  * the given priority.
1070  * 'pri' is no longer used, but is retained for compatibility.
1071  */
1072 /* ARGSUSED */
1073 int
1074 testb(size_t size, uint_t pri)
1075 {
1076 	return ((size + sizeof (dblk_t)) <= kmem_avail());
1077 }
1078 
1079 /*
1080  * Call function 'func' with argument 'arg' when there is a reasonably
1081  * good chance that a block of size 'size' can be allocated.
1082  * 'pri' is no longer used, but is retained for compatibility.
1083  */
1084 /* ARGSUSED */
1085 bufcall_id_t
1086 bufcall(size_t size, uint_t pri, void (*func)(void *), void *arg)
1087 {
1088 	static long bid = 1;	/* always odd to save checking for zero */
1089 	bufcall_id_t bc_id;
1090 	struct strbufcall *bcp;
1091 
1092 	if ((bcp = kmem_alloc(sizeof (strbufcall_t), KM_NOSLEEP)) == NULL)
1093 		return (0);
1094 
1095 	bcp->bc_func = func;
1096 	bcp->bc_arg = arg;
1097 	bcp->bc_size = size;
1098 	bcp->bc_next = NULL;
1099 	bcp->bc_executor = NULL;
1100 
1101 	mutex_enter(&strbcall_lock);
1102 	/*
1103 	 * After bcp is linked into strbcalls and strbcall_lock is dropped there
1104 	 * should be no references to bcp since it may be freed by
1105 	 * runbufcalls(). Since bcp_id field is returned, we save its value in
1106 	 * the local var.
1107 	 */
1108 	bc_id = bcp->bc_id = (bufcall_id_t)(bid += 2);	/* keep it odd */
1109 
1110 	/*
1111 	 * add newly allocated stream event to existing
1112 	 * linked list of events.
1113 	 */
1114 	if (strbcalls.bc_head == NULL) {
1115 		strbcalls.bc_head = strbcalls.bc_tail = bcp;
1116 	} else {
1117 		strbcalls.bc_tail->bc_next = bcp;
1118 		strbcalls.bc_tail = bcp;
1119 	}
1120 
1121 	cv_signal(&strbcall_cv);
1122 	mutex_exit(&strbcall_lock);
1123 	return (bc_id);
1124 }
1125 
1126 /*
1127  * Cancel a bufcall request.
1128  */
1129 void
1130 unbufcall(bufcall_id_t id)
1131 {
1132 	strbufcall_t *bcp, *pbcp;
1133 
1134 	mutex_enter(&strbcall_lock);
1135 again:
1136 	pbcp = NULL;
1137 	for (bcp = strbcalls.bc_head; bcp; bcp = bcp->bc_next) {
1138 		if (id == bcp->bc_id)
1139 			break;
1140 		pbcp = bcp;
1141 	}
1142 	if (bcp) {
1143 		if (bcp->bc_executor != NULL) {
1144 			if (bcp->bc_executor != curthread) {
1145 				cv_wait(&bcall_cv, &strbcall_lock);
1146 				goto again;
1147 			}
1148 		} else {
1149 			if (pbcp)
1150 				pbcp->bc_next = bcp->bc_next;
1151 			else
1152 				strbcalls.bc_head = bcp->bc_next;
1153 			if (bcp == strbcalls.bc_tail)
1154 				strbcalls.bc_tail = pbcp;
1155 			kmem_free(bcp, sizeof (strbufcall_t));
1156 		}
1157 	}
1158 	mutex_exit(&strbcall_lock);
1159 }
1160 
1161 /*
1162  * Duplicate a message block by block (uses dupb), returning
1163  * a pointer to the duplicate message.
1164  * Returns a non-NULL value only if the entire message
1165  * was dup'd.
1166  */
1167 mblk_t *
1168 dupmsg(mblk_t *bp)
1169 {
1170 	mblk_t *head, *nbp;
1171 
1172 	if (!bp || !(nbp = head = dupb(bp)))
1173 		return (NULL);
1174 
1175 	while (bp->b_cont) {
1176 		if (!(nbp->b_cont = dupb(bp->b_cont))) {
1177 			freemsg(head);
1178 			return (NULL);
1179 		}
1180 		nbp = nbp->b_cont;
1181 		bp = bp->b_cont;
1182 	}
1183 	return (head);
1184 }
1185 
1186 #define	DUPB_NOLOAN(bp) \
1187 	((((bp)->b_datap->db_struioflag & STRUIO_ZC) != 0) ? \
1188 	copyb((bp)) : dupb((bp)))
1189 
1190 mblk_t *
1191 dupmsg_noloan(mblk_t *bp)
1192 {
1193 	mblk_t *head, *nbp;
1194 
1195 	if (bp == NULL || DB_TYPE(bp) != M_DATA ||
1196 	    ((nbp = head = DUPB_NOLOAN(bp)) == NULL))
1197 		return (NULL);
1198 
1199 	while (bp->b_cont) {
1200 		if ((nbp->b_cont = DUPB_NOLOAN(bp->b_cont)) == NULL) {
1201 			freemsg(head);
1202 			return (NULL);
1203 		}
1204 		nbp = nbp->b_cont;
1205 		bp = bp->b_cont;
1206 	}
1207 	return (head);
1208 }
1209 
1210 /*
1211  * Copy data from message and data block to newly allocated message and
1212  * data block. Returns new message block pointer, or NULL if error.
1213  * The alignment of rptr (w.r.t. word alignment) will be the same in the copy
1214  * as in the original even when db_base is not word aligned. (bug 1052877)
1215  */
1216 mblk_t *
1217 copyb(mblk_t *bp)
1218 {
1219 	mblk_t	*nbp;
1220 	dblk_t	*dp, *ndp;
1221 	uchar_t *base;
1222 	size_t	size;
1223 	size_t	unaligned;
1224 
1225 	ASSERT(bp->b_wptr >= bp->b_rptr);
1226 
1227 	dp = bp->b_datap;
1228 	if (dp->db_fthdr != NULL)
1229 		STR_FTEVENT_MBLK(bp, caller(), FTEV_COPYB, 0);
1230 
1231 	/*
1232 	 * Special handling for Multidata message; this should be
1233 	 * removed once a copy-callback routine is made available.
1234 	 */
1235 	if (dp->db_type == M_MULTIDATA) {
1236 		cred_t *cr;
1237 
1238 		if ((nbp = mmd_copy(bp, KM_NOSLEEP)) == NULL)
1239 			return (NULL);
1240 
1241 		nbp->b_flag = bp->b_flag;
1242 		nbp->b_band = bp->b_band;
1243 		ndp = nbp->b_datap;
1244 
1245 		/* See comments below on potential issues. */
1246 		STR_FTEVENT_MBLK(nbp, caller(), FTEV_COPYB, 1);
1247 
1248 		ASSERT(ndp->db_type == dp->db_type);
1249 		cr = dp->db_credp;
1250 		if (cr != NULL)
1251 			crhold(ndp->db_credp = cr);
1252 		ndp->db_cpid = dp->db_cpid;
1253 		return (nbp);
1254 	}
1255 
1256 	size = dp->db_lim - dp->db_base;
1257 	unaligned = P2PHASE((uintptr_t)dp->db_base, sizeof (uint_t));
1258 	if ((nbp = allocb_tmpl(size + unaligned, bp)) == NULL)
1259 		return (NULL);
1260 	nbp->b_flag = bp->b_flag;
1261 	nbp->b_band = bp->b_band;
1262 	ndp = nbp->b_datap;
1263 
1264 	/*
1265 	 * Well, here is a potential issue.  If we are trying to
1266 	 * trace a flow, and we copy the message, we might lose
1267 	 * information about where this message might have been.
1268 	 * So we should inherit the FT data.  On the other hand,
1269 	 * a user might be interested only in alloc to free data.
1270 	 * So I guess the real answer is to provide a tunable.
1271 	 */
1272 	STR_FTEVENT_MBLK(nbp, caller(), FTEV_COPYB, 1);
1273 
1274 	base = ndp->db_base + unaligned;
1275 	bcopy(dp->db_base, ndp->db_base + unaligned, size);
1276 
1277 	nbp->b_rptr = base + (bp->b_rptr - dp->db_base);
1278 	nbp->b_wptr = nbp->b_rptr + MBLKL(bp);
1279 
1280 	return (nbp);
1281 }
1282 
1283 /*
1284  * Copy data from message to newly allocated message using new
1285  * data blocks.  Returns a pointer to the new message, or NULL if error.
1286  */
1287 mblk_t *
1288 copymsg(mblk_t *bp)
1289 {
1290 	mblk_t *head, *nbp;
1291 
1292 	if (!bp || !(nbp = head = copyb(bp)))
1293 		return (NULL);
1294 
1295 	while (bp->b_cont) {
1296 		if (!(nbp->b_cont = copyb(bp->b_cont))) {
1297 			freemsg(head);
1298 			return (NULL);
1299 		}
1300 		nbp = nbp->b_cont;
1301 		bp = bp->b_cont;
1302 	}
1303 	return (head);
1304 }
1305 
1306 /*
1307  * link a message block to tail of message
1308  */
1309 void
1310 linkb(mblk_t *mp, mblk_t *bp)
1311 {
1312 	ASSERT(mp && bp);
1313 
1314 	for (; mp->b_cont; mp = mp->b_cont)
1315 		;
1316 	mp->b_cont = bp;
1317 }
1318 
1319 /*
1320  * unlink a message block from head of message
1321  * return pointer to new message.
1322  * NULL if message becomes empty.
1323  */
1324 mblk_t *
1325 unlinkb(mblk_t *bp)
1326 {
1327 	mblk_t *bp1;
1328 
1329 	bp1 = bp->b_cont;
1330 	bp->b_cont = NULL;
1331 	return (bp1);
1332 }
1333 
1334 /*
1335  * remove a message block "bp" from message "mp"
1336  *
1337  * Return pointer to new message or NULL if no message remains.
1338  * Return -1 if bp is not found in message.
1339  */
1340 mblk_t *
1341 rmvb(mblk_t *mp, mblk_t *bp)
1342 {
1343 	mblk_t *tmp;
1344 	mblk_t *lastp = NULL;
1345 
1346 	ASSERT(mp && bp);
1347 	for (tmp = mp; tmp; tmp = tmp->b_cont) {
1348 		if (tmp == bp) {
1349 			if (lastp)
1350 				lastp->b_cont = tmp->b_cont;
1351 			else
1352 				mp = tmp->b_cont;
1353 			tmp->b_cont = NULL;
1354 			return (mp);
1355 		}
1356 		lastp = tmp;
1357 	}
1358 	return ((mblk_t *)-1);
1359 }
1360 
1361 /*
1362  * Concatenate and align first len bytes of common
1363  * message type.  Len == -1, means concat everything.
1364  * Returns 1 on success, 0 on failure
1365  * After the pullup, mp points to the pulled up data.
1366  */
1367 int
1368 pullupmsg(mblk_t *mp, ssize_t len)
1369 {
1370 	mblk_t *bp, *b_cont;
1371 	dblk_t *dbp;
1372 	ssize_t n;
1373 
1374 	ASSERT(mp->b_datap->db_ref > 0);
1375 	ASSERT(mp->b_next == NULL && mp->b_prev == NULL);
1376 
1377 	/*
1378 	 * We won't handle Multidata message, since it contains
1379 	 * metadata which this function has no knowledge of; we
1380 	 * assert on DEBUG, and return failure otherwise.
1381 	 */
1382 	ASSERT(mp->b_datap->db_type != M_MULTIDATA);
1383 	if (mp->b_datap->db_type == M_MULTIDATA)
1384 		return (0);
1385 
1386 	if (len == -1) {
1387 		if (mp->b_cont == NULL && str_aligned(mp->b_rptr))
1388 			return (1);
1389 		len = xmsgsize(mp);
1390 	} else {
1391 		ssize_t first_mblk_len = mp->b_wptr - mp->b_rptr;
1392 		ASSERT(first_mblk_len >= 0);
1393 		/*
1394 		 * If the length is less than that of the first mblk,
1395 		 * we want to pull up the message into an aligned mblk.
1396 		 * Though not part of the spec, some callers assume it.
1397 		 */
1398 		if (len <= first_mblk_len) {
1399 			if (str_aligned(mp->b_rptr))
1400 				return (1);
1401 			len = first_mblk_len;
1402 		} else if (xmsgsize(mp) < len)
1403 			return (0);
1404 	}
1405 
1406 	if ((bp = allocb_tmpl(len, mp)) == NULL)
1407 		return (0);
1408 
1409 	dbp = bp->b_datap;
1410 	*bp = *mp;		/* swap mblks so bp heads the old msg... */
1411 	mp->b_datap = dbp;	/* ... and mp heads the new message */
1412 	mp->b_datap->db_mblk = mp;
1413 	bp->b_datap->db_mblk = bp;
1414 	mp->b_rptr = mp->b_wptr = dbp->db_base;
1415 
1416 	do {
1417 		ASSERT(bp->b_datap->db_ref > 0);
1418 		ASSERT(bp->b_wptr >= bp->b_rptr);
1419 		n = MIN(bp->b_wptr - bp->b_rptr, len);
1420 		bcopy(bp->b_rptr, mp->b_wptr, (size_t)n);
1421 		mp->b_wptr += n;
1422 		bp->b_rptr += n;
1423 		len -= n;
1424 		if (bp->b_rptr != bp->b_wptr)
1425 			break;
1426 		b_cont = bp->b_cont;
1427 		freeb(bp);
1428 		bp = b_cont;
1429 	} while (len && bp);
1430 
1431 	mp->b_cont = bp;	/* tack on whatever wasn't pulled up */
1432 
1433 	return (1);
1434 }
1435 
1436 /*
1437  * Concatenate and align at least the first len bytes of common message
1438  * type.  Len == -1 means concatenate everything.  The original message is
1439  * unaltered.  Returns a pointer to a new message on success, otherwise
1440  * returns NULL.
1441  */
1442 mblk_t *
1443 msgpullup(mblk_t *mp, ssize_t len)
1444 {
1445 	mblk_t	*newmp;
1446 	ssize_t	totlen;
1447 	ssize_t	n;
1448 
1449 	/*
1450 	 * We won't handle Multidata message, since it contains
1451 	 * metadata which this function has no knowledge of; we
1452 	 * assert on DEBUG, and return failure otherwise.
1453 	 */
1454 	ASSERT(mp->b_datap->db_type != M_MULTIDATA);
1455 	if (mp->b_datap->db_type == M_MULTIDATA)
1456 		return (NULL);
1457 
1458 	totlen = xmsgsize(mp);
1459 
1460 	if ((len > 0) && (len > totlen))
1461 		return (NULL);
1462 
1463 	/*
1464 	 * Copy all of the first msg type into one new mblk, then dupmsg
1465 	 * and link the rest onto this.
1466 	 */
1467 
1468 	len = totlen;
1469 
1470 	if ((newmp = allocb_tmpl(len, mp)) == NULL)
1471 		return (NULL);
1472 
1473 	newmp->b_flag = mp->b_flag;
1474 	newmp->b_band = mp->b_band;
1475 
1476 	while (len > 0) {
1477 		n = mp->b_wptr - mp->b_rptr;
1478 		ASSERT(n >= 0);		/* allow zero-length mblk_t's */
1479 		if (n > 0)
1480 			bcopy(mp->b_rptr, newmp->b_wptr, n);
1481 		newmp->b_wptr += n;
1482 		len -= n;
1483 		mp = mp->b_cont;
1484 	}
1485 
1486 	if (mp != NULL) {
1487 		newmp->b_cont = dupmsg(mp);
1488 		if (newmp->b_cont == NULL) {
1489 			freemsg(newmp);
1490 			return (NULL);
1491 		}
1492 	}
1493 
1494 	return (newmp);
1495 }
1496 
1497 /*
1498  * Trim bytes from message
1499  *  len > 0, trim from head
1500  *  len < 0, trim from tail
1501  * Returns 1 on success, 0 on failure.
1502  */
1503 int
1504 adjmsg(mblk_t *mp, ssize_t len)
1505 {
1506 	mblk_t *bp;
1507 	mblk_t *save_bp = NULL;
1508 	mblk_t *prev_bp;
1509 	mblk_t *bcont;
1510 	unsigned char type;
1511 	ssize_t n;
1512 	int fromhead;
1513 	int first;
1514 
1515 	ASSERT(mp != NULL);
1516 	/*
1517 	 * We won't handle Multidata message, since it contains
1518 	 * metadata which this function has no knowledge of; we
1519 	 * assert on DEBUG, and return failure otherwise.
1520 	 */
1521 	ASSERT(mp->b_datap->db_type != M_MULTIDATA);
1522 	if (mp->b_datap->db_type == M_MULTIDATA)
1523 		return (0);
1524 
1525 	if (len < 0) {
1526 		fromhead = 0;
1527 		len = -len;
1528 	} else {
1529 		fromhead = 1;
1530 	}
1531 
1532 	if (xmsgsize(mp) < len)
1533 		return (0);
1534 
1535 
1536 	if (fromhead) {
1537 		first = 1;
1538 		while (len) {
1539 			ASSERT(mp->b_wptr >= mp->b_rptr);
1540 			n = MIN(mp->b_wptr - mp->b_rptr, len);
1541 			mp->b_rptr += n;
1542 			len -= n;
1543 
1544 			/*
1545 			 * If this is not the first zero length
1546 			 * message remove it
1547 			 */
1548 			if (!first && (mp->b_wptr == mp->b_rptr)) {
1549 				bcont = mp->b_cont;
1550 				freeb(mp);
1551 				mp = save_bp->b_cont = bcont;
1552 			} else {
1553 				save_bp = mp;
1554 				mp = mp->b_cont;
1555 			}
1556 			first = 0;
1557 		}
1558 	} else {
1559 		type = mp->b_datap->db_type;
1560 		while (len) {
1561 			bp = mp;
1562 			save_bp = NULL;
1563 
1564 			/*
1565 			 * Find the last message of same type
1566 			 */
1567 
1568 			while (bp && bp->b_datap->db_type == type) {
1569 				ASSERT(bp->b_wptr >= bp->b_rptr);
1570 				prev_bp = save_bp;
1571 				save_bp = bp;
1572 				bp = bp->b_cont;
1573 			}
1574 			if (save_bp == NULL)
1575 				break;
1576 			n = MIN(save_bp->b_wptr - save_bp->b_rptr, len);
1577 			save_bp->b_wptr -= n;
1578 			len -= n;
1579 
1580 			/*
1581 			 * If this is not the first message
1582 			 * and we have taken away everything
1583 			 * from this message, remove it
1584 			 */
1585 
1586 			if ((save_bp != mp) &&
1587 			    (save_bp->b_wptr == save_bp->b_rptr)) {
1588 				bcont = save_bp->b_cont;
1589 				freeb(save_bp);
1590 				prev_bp->b_cont = bcont;
1591 			}
1592 		}
1593 	}
1594 	return (1);
1595 }
1596 
1597 /*
1598  * get number of data bytes in message
1599  */
1600 size_t
1601 msgdsize(mblk_t *bp)
1602 {
1603 	size_t count = 0;
1604 
1605 	for (; bp; bp = bp->b_cont)
1606 		if (bp->b_datap->db_type == M_DATA) {
1607 			ASSERT(bp->b_wptr >= bp->b_rptr);
1608 			count += bp->b_wptr - bp->b_rptr;
1609 		}
1610 	return (count);
1611 }
1612 
1613 /*
1614  * Get a message off head of queue
1615  *
1616  * If queue has no buffers then mark queue
1617  * with QWANTR. (queue wants to be read by
1618  * someone when data becomes available)
1619  *
1620  * If there is something to take off then do so.
1621  * If queue falls below hi water mark turn off QFULL
1622  * flag.  Decrement weighted count of queue.
1623  * Also turn off QWANTR because queue is being read.
1624  *
1625  * The queue count is maintained on a per-band basis.
1626  * Priority band 0 (normal messages) uses q_count,
1627  * q_lowat, etc.  Non-zero priority bands use the
1628  * fields in their respective qband structures
1629  * (qb_count, qb_lowat, etc.)  All messages appear
1630  * on the same list, linked via their b_next pointers.
1631  * q_first is the head of the list.  q_count does
1632  * not reflect the size of all the messages on the
1633  * queue.  It only reflects those messages in the
1634  * normal band of flow.  The one exception to this
1635  * deals with high priority messages.  They are in
1636  * their own conceptual "band", but are accounted
1637  * against q_count.
1638  *
1639  * If queue count is below the lo water mark and QWANTW
1640  * is set, enable the closest backq which has a service
1641  * procedure and turn off the QWANTW flag.
1642  *
1643  * getq could be built on top of rmvq, but isn't because
1644  * of performance considerations.
1645  *
1646  * A note on the use of q_count and q_mblkcnt:
1647  *   q_count is the traditional byte count for messages that
1648  *   have been put on a queue.  Documentation tells us that
1649  *   we shouldn't rely on that count, but some drivers/modules
1650  *   do.  What was needed, however, is a mechanism to prevent
1651  *   runaway streams from consuming all of the resources,
1652  *   and particularly be able to flow control zero-length
1653  *   messages.  q_mblkcnt is used for this purpose.  It
1654  *   counts the number of mblk's that are being put on
1655  *   the queue.  The intention here, is that each mblk should
1656  *   contain one byte of data and, for the purpose of
1657  *   flow-control, logically does.  A queue will become
1658  *   full when EITHER of these values (q_count and q_mblkcnt)
1659  *   reach the highwater mark.  It will clear when BOTH
1660  *   of them drop below the highwater mark.  And it will
1661  *   backenable when BOTH of them drop below the lowwater
1662  *   mark.
1663  *   With this algorithm, a driver/module might be able
1664  *   to find a reasonably accurate q_count, and the
1665  *   framework can still try and limit resource usage.
1666  */
1667 mblk_t *
1668 getq(queue_t *q)
1669 {
1670 	mblk_t *bp;
1671 	uchar_t band = 0;
1672 
1673 	bp = getq_noenab(q, 0);
1674 	if (bp != NULL)
1675 		band = bp->b_band;
1676 
1677 	/*
1678 	 * Inlined from qbackenable().
1679 	 * Quick check without holding the lock.
1680 	 */
1681 	if (band == 0 && (q->q_flag & (QWANTW|QWANTWSYNC)) == 0)
1682 		return (bp);
1683 
1684 	qbackenable(q, band);
1685 	return (bp);
1686 }
1687 
1688 /*
1689  * Calculate number of data bytes in a single data message block taking
1690  * multidata messages into account.
1691  */
1692 
1693 #define	ADD_MBLK_SIZE(mp, size) 					\
1694 	if (DB_TYPE(mp) != M_MULTIDATA) {				\
1695 		(size) += MBLKL(mp);					\
1696 	} else {							\
1697 		uint_t	pinuse;						\
1698 									\
1699 		mmd_getsize(mmd_getmultidata(mp), NULL, &pinuse);	\
1700 		(size) += pinuse;					\
1701 	}
1702 
1703 /*
1704  * Returns the number of bytes in a message (a message is defined as a
1705  * chain of mblks linked by b_cont). If a non-NULL mblkcnt is supplied we
1706  * also return the number of distinct mblks in the message.
1707  */
1708 int
1709 mp_cont_len(mblk_t *bp, int *mblkcnt)
1710 {
1711 	mblk_t	*mp;
1712 	int	mblks = 0;
1713 	int	bytes = 0;
1714 
1715 	for (mp = bp; mp != NULL; mp = mp->b_cont) {
1716 		ADD_MBLK_SIZE(mp, bytes);
1717 		mblks++;
1718 	}
1719 
1720 	if (mblkcnt != NULL)
1721 		*mblkcnt = mblks;
1722 
1723 	return (bytes);
1724 }
1725 
1726 /*
1727  * Like getq() but does not backenable.  This is used by the stream
1728  * head when a putback() is likely.  The caller must call qbackenable()
1729  * after it is done with accessing the queue.
1730  * The rbytes arguments to getq_noneab() allows callers to specify a
1731  * the maximum number of bytes to return. If the current amount on the
1732  * queue is less than this then the entire message will be returned.
1733  * A value of 0 returns the entire message and is equivalent to the old
1734  * default behaviour prior to the addition of the rbytes argument.
1735  */
1736 mblk_t *
1737 getq_noenab(queue_t *q, ssize_t rbytes)
1738 {
1739 	mblk_t *bp, *mp1;
1740 	mblk_t *mp2 = NULL;
1741 	qband_t *qbp;
1742 	kthread_id_t freezer;
1743 	int	bytecnt = 0, mblkcnt = 0;
1744 
1745 	/* freezestr should allow its caller to call getq/putq */
1746 	freezer = STREAM(q)->sd_freezer;
1747 	if (freezer == curthread) {
1748 		ASSERT(frozenstr(q));
1749 		ASSERT(MUTEX_HELD(QLOCK(q)));
1750 	} else
1751 		mutex_enter(QLOCK(q));
1752 
1753 	if ((bp = q->q_first) == 0) {
1754 		q->q_flag |= QWANTR;
1755 	} else {
1756 		/*
1757 		 * If the caller supplied a byte threshold and there is
1758 		 * more than this amount on the queue then break up the
1759 		 * the message appropriately.  We can only safely do
1760 		 * this for M_DATA messages.
1761 		 */
1762 		if ((DB_TYPE(bp) == M_DATA) && (rbytes > 0) &&
1763 		    (q->q_count > rbytes)) {
1764 			/*
1765 			 * Inline version of mp_cont_len() which terminates
1766 			 * when we meet or exceed rbytes.
1767 			 */
1768 			for (mp1 = bp; mp1 != NULL; mp1 = mp1->b_cont) {
1769 				mblkcnt++;
1770 				ADD_MBLK_SIZE(mp1, bytecnt);
1771 				if (bytecnt  >= rbytes)
1772 					break;
1773 			}
1774 			/*
1775 			 * We need to account for the following scenarios:
1776 			 *
1777 			 * 1) Too much data in the first message:
1778 			 *	mp1 will be the mblk which puts us over our
1779 			 *	byte limit.
1780 			 * 2) Not enough data in the first message:
1781 			 *	mp1 will be NULL.
1782 			 * 3) Exactly the right amount of data contained within
1783 			 *    whole mblks:
1784 			 *	mp1->b_cont will be where we break the message.
1785 			 */
1786 			if (bytecnt > rbytes) {
1787 				/*
1788 				 * Dup/copy mp1 and put what we don't need
1789 				 * back onto the queue. Adjust the read/write
1790 				 * and continuation pointers appropriately
1791 				 * and decrement the current mblk count to
1792 				 * reflect we are putting an mblk back onto
1793 				 * the queue.
1794 				 * When adjusting the message pointers, it's
1795 				 * OK to use the existing bytecnt and the
1796 				 * requested amount (rbytes) to calculate the
1797 				 * the new write offset (b_wptr) of what we
1798 				 * are taking. However, we  cannot use these
1799 				 * values when calculating the read offset of
1800 				 * the mblk we are putting back on the queue.
1801 				 * This is because the begining (b_rptr) of the
1802 				 * mblk represents some arbitrary point within
1803 				 * the message.
1804 				 * It's simplest to do this by advancing b_rptr
1805 				 * by the new length of mp1 as we don't have to
1806 				 * remember any intermediate state.
1807 				 */
1808 				ASSERT(mp1 != NULL);
1809 				mblkcnt--;
1810 				if ((mp2 = dupb(mp1)) == NULL &&
1811 				    (mp2 = copyb(mp1)) == NULL) {
1812 					bytecnt = mblkcnt = 0;
1813 					goto dup_failed;
1814 				}
1815 				mp2->b_cont = mp1->b_cont;
1816 				mp1->b_wptr -= bytecnt - rbytes;
1817 				mp2->b_rptr += mp1->b_wptr - mp1->b_rptr;
1818 				mp1->b_cont = NULL;
1819 				bytecnt = rbytes;
1820 			} else {
1821 				/*
1822 				 * Either there is not enough data in the first
1823 				 * message or there is no excess data to deal
1824 				 * with. If mp1 is NULL, we are taking the
1825 				 * whole message. No need to do anything.
1826 				 * Otherwise we assign mp1->b_cont to mp2 as
1827 				 * we will be putting this back onto the head of
1828 				 * the queue.
1829 				 */
1830 				if (mp1 != NULL) {
1831 					mp2 = mp1->b_cont;
1832 					mp1->b_cont = NULL;
1833 				}
1834 			}
1835 			/*
1836 			 * If mp2 is not NULL then we have part of the message
1837 			 * to put back onto the queue.
1838 			 */
1839 			if (mp2 != NULL) {
1840 				if ((mp2->b_next = bp->b_next) == NULL)
1841 					q->q_last = mp2;
1842 				else
1843 					bp->b_next->b_prev = mp2;
1844 				q->q_first = mp2;
1845 			} else {
1846 				if ((q->q_first = bp->b_next) == NULL)
1847 					q->q_last = NULL;
1848 				else
1849 					q->q_first->b_prev = NULL;
1850 			}
1851 		} else {
1852 			/*
1853 			 * Either no byte threshold was supplied, there is
1854 			 * not enough on the queue or we failed to
1855 			 * duplicate/copy a data block. In these cases we
1856 			 * just take the entire first message.
1857 			 */
1858 dup_failed:
1859 			bytecnt = mp_cont_len(bp, &mblkcnt);
1860 			if ((q->q_first = bp->b_next) == NULL)
1861 				q->q_last = NULL;
1862 			else
1863 				q->q_first->b_prev = NULL;
1864 		}
1865 		if (bp->b_band == 0) {
1866 			q->q_count -= bytecnt;
1867 			q->q_mblkcnt -= mblkcnt;
1868 			if (q->q_mblkcnt == 0 || ((q->q_count < q->q_hiwat) &&
1869 			    (q->q_mblkcnt < q->q_hiwat))) {
1870 				q->q_flag &= ~QFULL;
1871 			}
1872 		} else {
1873 			int i;
1874 
1875 			ASSERT(bp->b_band <= q->q_nband);
1876 			ASSERT(q->q_bandp != NULL);
1877 			ASSERT(MUTEX_HELD(QLOCK(q)));
1878 			qbp = q->q_bandp;
1879 			i = bp->b_band;
1880 			while (--i > 0)
1881 				qbp = qbp->qb_next;
1882 			if (qbp->qb_first == qbp->qb_last) {
1883 				qbp->qb_first = NULL;
1884 				qbp->qb_last = NULL;
1885 			} else {
1886 				qbp->qb_first = bp->b_next;
1887 			}
1888 			qbp->qb_count -= bytecnt;
1889 			qbp->qb_mblkcnt -= mblkcnt;
1890 			if (qbp->qb_mblkcnt == 0 ||
1891 			    ((qbp->qb_count < qbp->qb_hiwat) &&
1892 			    (qbp->qb_mblkcnt < qbp->qb_hiwat))) {
1893 				qbp->qb_flag &= ~QB_FULL;
1894 			}
1895 		}
1896 		q->q_flag &= ~QWANTR;
1897 		bp->b_next = NULL;
1898 		bp->b_prev = NULL;
1899 	}
1900 	if (freezer != curthread)
1901 		mutex_exit(QLOCK(q));
1902 
1903 	STR_FTEVENT_MSG(bp, q, FTEV_GETQ, NULL);
1904 
1905 	return (bp);
1906 }
1907 
1908 /*
1909  * Determine if a backenable is needed after removing a message in the
1910  * specified band.
1911  * NOTE: This routine assumes that something like getq_noenab() has been
1912  * already called.
1913  *
1914  * For the read side it is ok to hold sd_lock across calling this (and the
1915  * stream head often does).
1916  * But for the write side strwakeq might be invoked and it acquires sd_lock.
1917  */
1918 void
1919 qbackenable(queue_t *q, uchar_t band)
1920 {
1921 	int backenab = 0;
1922 	qband_t *qbp;
1923 	kthread_id_t freezer;
1924 
1925 	ASSERT(q);
1926 	ASSERT((q->q_flag & QREADR) || MUTEX_NOT_HELD(&STREAM(q)->sd_lock));
1927 
1928 	/*
1929 	 * Quick check without holding the lock.
1930 	 * OK since after getq() has lowered the q_count these flags
1931 	 * would not change unless either the qbackenable() is done by
1932 	 * another thread (which is ok) or the queue has gotten QFULL
1933 	 * in which case another backenable will take place when the queue
1934 	 * drops below q_lowat.
1935 	 */
1936 	if (band == 0 && (q->q_flag & (QWANTW|QWANTWSYNC)) == 0)
1937 		return;
1938 
1939 	/* freezestr should allow its caller to call getq/putq */
1940 	freezer = STREAM(q)->sd_freezer;
1941 	if (freezer == curthread) {
1942 		ASSERT(frozenstr(q));
1943 		ASSERT(MUTEX_HELD(QLOCK(q)));
1944 	} else
1945 		mutex_enter(QLOCK(q));
1946 
1947 	if (band == 0) {
1948 		if (q->q_lowat == 0 || (q->q_count < q->q_lowat &&
1949 		    q->q_mblkcnt < q->q_lowat)) {
1950 			backenab = q->q_flag & (QWANTW|QWANTWSYNC);
1951 		}
1952 	} else {
1953 		int i;
1954 
1955 		ASSERT((unsigned)band <= q->q_nband);
1956 		ASSERT(q->q_bandp != NULL);
1957 
1958 		qbp = q->q_bandp;
1959 		i = band;
1960 		while (--i > 0)
1961 			qbp = qbp->qb_next;
1962 
1963 		if (qbp->qb_lowat == 0 || (qbp->qb_count < qbp->qb_lowat &&
1964 		    qbp->qb_mblkcnt < qbp->qb_lowat)) {
1965 			backenab = qbp->qb_flag & QB_WANTW;
1966 		}
1967 	}
1968 
1969 	if (backenab == 0) {
1970 		if (freezer != curthread)
1971 			mutex_exit(QLOCK(q));
1972 		return;
1973 	}
1974 
1975 	/* Have to drop the lock across strwakeq and backenable */
1976 	if (backenab & QWANTWSYNC)
1977 		q->q_flag &= ~QWANTWSYNC;
1978 	if (backenab & (QWANTW|QB_WANTW)) {
1979 		if (band != 0)
1980 			qbp->qb_flag &= ~QB_WANTW;
1981 		else {
1982 			q->q_flag &= ~QWANTW;
1983 		}
1984 	}
1985 
1986 	if (freezer != curthread)
1987 		mutex_exit(QLOCK(q));
1988 
1989 	if (backenab & QWANTWSYNC)
1990 		strwakeq(q, QWANTWSYNC);
1991 	if (backenab & (QWANTW|QB_WANTW))
1992 		backenable(q, band);
1993 }
1994 
1995 /*
1996  * Remove a message from a queue.  The queue count and other
1997  * flow control parameters are adjusted and the back queue
1998  * enabled if necessary.
1999  *
2000  * rmvq can be called with the stream frozen, but other utility functions
2001  * holding QLOCK, and by streams modules without any locks/frozen.
2002  */
2003 void
2004 rmvq(queue_t *q, mblk_t *mp)
2005 {
2006 	ASSERT(mp != NULL);
2007 
2008 	rmvq_noenab(q, mp);
2009 	if (curthread != STREAM(q)->sd_freezer && MUTEX_HELD(QLOCK(q))) {
2010 		/*
2011 		 * qbackenable can handle a frozen stream but not a "random"
2012 		 * qlock being held. Drop lock across qbackenable.
2013 		 */
2014 		mutex_exit(QLOCK(q));
2015 		qbackenable(q, mp->b_band);
2016 		mutex_enter(QLOCK(q));
2017 	} else {
2018 		qbackenable(q, mp->b_band);
2019 	}
2020 }
2021 
2022 /*
2023  * Like rmvq() but without any backenabling.
2024  * This exists to handle SR_CONSOL_DATA in strrput().
2025  */
2026 void
2027 rmvq_noenab(queue_t *q, mblk_t *mp)
2028 {
2029 	int i;
2030 	qband_t *qbp = NULL;
2031 	kthread_id_t freezer;
2032 	int	bytecnt = 0, mblkcnt = 0;
2033 
2034 	freezer = STREAM(q)->sd_freezer;
2035 	if (freezer == curthread) {
2036 		ASSERT(frozenstr(q));
2037 		ASSERT(MUTEX_HELD(QLOCK(q)));
2038 	} else if (MUTEX_HELD(QLOCK(q))) {
2039 		/* Don't drop lock on exit */
2040 		freezer = curthread;
2041 	} else
2042 		mutex_enter(QLOCK(q));
2043 
2044 	ASSERT(mp->b_band <= q->q_nband);
2045 	if (mp->b_band != 0) {		/* Adjust band pointers */
2046 		ASSERT(q->q_bandp != NULL);
2047 		qbp = q->q_bandp;
2048 		i = mp->b_band;
2049 		while (--i > 0)
2050 			qbp = qbp->qb_next;
2051 		if (mp == qbp->qb_first) {
2052 			if (mp->b_next && mp->b_band == mp->b_next->b_band)
2053 				qbp->qb_first = mp->b_next;
2054 			else
2055 				qbp->qb_first = NULL;
2056 		}
2057 		if (mp == qbp->qb_last) {
2058 			if (mp->b_prev && mp->b_band == mp->b_prev->b_band)
2059 				qbp->qb_last = mp->b_prev;
2060 			else
2061 				qbp->qb_last = NULL;
2062 		}
2063 	}
2064 
2065 	/*
2066 	 * Remove the message from the list.
2067 	 */
2068 	if (mp->b_prev)
2069 		mp->b_prev->b_next = mp->b_next;
2070 	else
2071 		q->q_first = mp->b_next;
2072 	if (mp->b_next)
2073 		mp->b_next->b_prev = mp->b_prev;
2074 	else
2075 		q->q_last = mp->b_prev;
2076 	mp->b_next = NULL;
2077 	mp->b_prev = NULL;
2078 
2079 	/* Get the size of the message for q_count accounting */
2080 	bytecnt = mp_cont_len(mp, &mblkcnt);
2081 
2082 	if (mp->b_band == 0) {		/* Perform q_count accounting */
2083 		q->q_count -= bytecnt;
2084 		q->q_mblkcnt -= mblkcnt;
2085 		if (q->q_mblkcnt == 0 || ((q->q_count < q->q_hiwat) &&
2086 		    (q->q_mblkcnt < q->q_hiwat))) {
2087 			q->q_flag &= ~QFULL;
2088 		}
2089 	} else {			/* Perform qb_count accounting */
2090 		qbp->qb_count -= bytecnt;
2091 		qbp->qb_mblkcnt -= mblkcnt;
2092 		if (qbp->qb_mblkcnt == 0 || ((qbp->qb_count < qbp->qb_hiwat) &&
2093 		    (qbp->qb_mblkcnt < qbp->qb_hiwat))) {
2094 			qbp->qb_flag &= ~QB_FULL;
2095 		}
2096 	}
2097 	if (freezer != curthread)
2098 		mutex_exit(QLOCK(q));
2099 
2100 	STR_FTEVENT_MSG(mp, q, FTEV_RMVQ, NULL);
2101 }
2102 
2103 /*
2104  * Empty a queue.
2105  * If flag is set, remove all messages.  Otherwise, remove
2106  * only non-control messages.  If queue falls below its low
2107  * water mark, and QWANTW is set, enable the nearest upstream
2108  * service procedure.
2109  *
2110  * Historical note: when merging the M_FLUSH code in strrput with this
2111  * code one difference was discovered. flushq did not have a check
2112  * for q_lowat == 0 in the backenabling test.
2113  *
2114  * pcproto_flag specifies whether or not a M_PCPROTO message should be flushed
2115  * if one exists on the queue.
2116  */
2117 void
2118 flushq_common(queue_t *q, int flag, int pcproto_flag)
2119 {
2120 	mblk_t *mp, *nmp;
2121 	qband_t *qbp;
2122 	int backenab = 0;
2123 	unsigned char bpri;
2124 	unsigned char	qbf[NBAND];	/* band flushing backenable flags */
2125 
2126 	if (q->q_first == NULL)
2127 		return;
2128 
2129 	mutex_enter(QLOCK(q));
2130 	mp = q->q_first;
2131 	q->q_first = NULL;
2132 	q->q_last = NULL;
2133 	q->q_count = 0;
2134 	q->q_mblkcnt = 0;
2135 	for (qbp = q->q_bandp; qbp; qbp = qbp->qb_next) {
2136 		qbp->qb_first = NULL;
2137 		qbp->qb_last = NULL;
2138 		qbp->qb_count = 0;
2139 		qbp->qb_mblkcnt = 0;
2140 		qbp->qb_flag &= ~QB_FULL;
2141 	}
2142 	q->q_flag &= ~QFULL;
2143 	mutex_exit(QLOCK(q));
2144 	while (mp) {
2145 		nmp = mp->b_next;
2146 		mp->b_next = mp->b_prev = NULL;
2147 
2148 		STR_FTEVENT_MBLK(mp, q, FTEV_FLUSHQ, NULL);
2149 
2150 		if (pcproto_flag && (mp->b_datap->db_type == M_PCPROTO))
2151 			(void) putq(q, mp);
2152 		else if (flag || datamsg(mp->b_datap->db_type))
2153 			freemsg(mp);
2154 		else
2155 			(void) putq(q, mp);
2156 		mp = nmp;
2157 	}
2158 	bpri = 1;
2159 	mutex_enter(QLOCK(q));
2160 	for (qbp = q->q_bandp; qbp; qbp = qbp->qb_next) {
2161 		if ((qbp->qb_flag & QB_WANTW) &&
2162 		    (((qbp->qb_count < qbp->qb_lowat) &&
2163 		    (qbp->qb_mblkcnt < qbp->qb_lowat)) ||
2164 		    qbp->qb_lowat == 0)) {
2165 			qbp->qb_flag &= ~QB_WANTW;
2166 			backenab = 1;
2167 			qbf[bpri] = 1;
2168 		} else
2169 			qbf[bpri] = 0;
2170 		bpri++;
2171 	}
2172 	ASSERT(bpri == (unsigned char)(q->q_nband + 1));
2173 	if ((q->q_flag & QWANTW) &&
2174 	    (((q->q_count < q->q_lowat) &&
2175 	    (q->q_mblkcnt < q->q_lowat)) || q->q_lowat == 0)) {
2176 		q->q_flag &= ~QWANTW;
2177 		backenab = 1;
2178 		qbf[0] = 1;
2179 	} else
2180 		qbf[0] = 0;
2181 
2182 	/*
2183 	 * If any band can now be written to, and there is a writer
2184 	 * for that band, then backenable the closest service procedure.
2185 	 */
2186 	if (backenab) {
2187 		mutex_exit(QLOCK(q));
2188 		for (bpri = q->q_nband; bpri != 0; bpri--)
2189 			if (qbf[bpri])
2190 				backenable(q, bpri);
2191 		if (qbf[0])
2192 			backenable(q, 0);
2193 	} else
2194 		mutex_exit(QLOCK(q));
2195 }
2196 
2197 /*
2198  * The real flushing takes place in flushq_common. This is done so that
2199  * a flag which specifies whether or not M_PCPROTO messages should be flushed
2200  * or not. Currently the only place that uses this flag is the stream head.
2201  */
2202 void
2203 flushq(queue_t *q, int flag)
2204 {
2205 	flushq_common(q, flag, 0);
2206 }
2207 
2208 /*
2209  * Flush the queue of messages of the given priority band.
2210  * There is some duplication of code between flushq and flushband.
2211  * This is because we want to optimize the code as much as possible.
2212  * The assumption is that there will be more messages in the normal
2213  * (priority 0) band than in any other.
2214  *
2215  * Historical note: when merging the M_FLUSH code in strrput with this
2216  * code one difference was discovered. flushband had an extra check for
2217  * did not have a check for (mp->b_datap->db_type < QPCTL) in the band 0
2218  * case. That check does not match the man page for flushband and was not
2219  * in the strrput flush code hence it was removed.
2220  */
2221 void
2222 flushband(queue_t *q, unsigned char pri, int flag)
2223 {
2224 	mblk_t *mp;
2225 	mblk_t *nmp;
2226 	mblk_t *last;
2227 	qband_t *qbp;
2228 	int band;
2229 
2230 	ASSERT((flag == FLUSHDATA) || (flag == FLUSHALL));
2231 	if (pri > q->q_nband) {
2232 		return;
2233 	}
2234 	mutex_enter(QLOCK(q));
2235 	if (pri == 0) {
2236 		mp = q->q_first;
2237 		q->q_first = NULL;
2238 		q->q_last = NULL;
2239 		q->q_count = 0;
2240 		q->q_mblkcnt = 0;
2241 		for (qbp = q->q_bandp; qbp; qbp = qbp->qb_next) {
2242 			qbp->qb_first = NULL;
2243 			qbp->qb_last = NULL;
2244 			qbp->qb_count = 0;
2245 			qbp->qb_mblkcnt = 0;
2246 			qbp->qb_flag &= ~QB_FULL;
2247 		}
2248 		q->q_flag &= ~QFULL;
2249 		mutex_exit(QLOCK(q));
2250 		while (mp) {
2251 			nmp = mp->b_next;
2252 			mp->b_next = mp->b_prev = NULL;
2253 			if ((mp->b_band == 0) &&
2254 			    ((flag == FLUSHALL) ||
2255 			    datamsg(mp->b_datap->db_type)))
2256 				freemsg(mp);
2257 			else
2258 				(void) putq(q, mp);
2259 			mp = nmp;
2260 		}
2261 		mutex_enter(QLOCK(q));
2262 		if ((q->q_flag & QWANTW) &&
2263 		    (((q->q_count < q->q_lowat) &&
2264 		    (q->q_mblkcnt < q->q_lowat)) || q->q_lowat == 0)) {
2265 			q->q_flag &= ~QWANTW;
2266 			mutex_exit(QLOCK(q));
2267 
2268 			backenable(q, pri);
2269 		} else
2270 			mutex_exit(QLOCK(q));
2271 	} else {	/* pri != 0 */
2272 		boolean_t flushed = B_FALSE;
2273 		band = pri;
2274 
2275 		ASSERT(MUTEX_HELD(QLOCK(q)));
2276 		qbp = q->q_bandp;
2277 		while (--band > 0)
2278 			qbp = qbp->qb_next;
2279 		mp = qbp->qb_first;
2280 		if (mp == NULL) {
2281 			mutex_exit(QLOCK(q));
2282 			return;
2283 		}
2284 		last = qbp->qb_last->b_next;
2285 		/*
2286 		 * rmvq_noenab() and freemsg() are called for each mblk that
2287 		 * meets the criteria.  The loop is executed until the last
2288 		 * mblk has been processed.
2289 		 */
2290 		while (mp != last) {
2291 			ASSERT(mp->b_band == pri);
2292 			nmp = mp->b_next;
2293 			if (flag == FLUSHALL || datamsg(mp->b_datap->db_type)) {
2294 				rmvq_noenab(q, mp);
2295 				freemsg(mp);
2296 				flushed = B_TRUE;
2297 			}
2298 			mp = nmp;
2299 		}
2300 		mutex_exit(QLOCK(q));
2301 
2302 		/*
2303 		 * If any mblk(s) has been freed, we know that qbackenable()
2304 		 * will need to be called.
2305 		 */
2306 		if (flushed)
2307 			qbackenable(q, pri);
2308 	}
2309 }
2310 
2311 /*
2312  * Return 1 if the queue is not full.  If the queue is full, return
2313  * 0 (may not put message) and set QWANTW flag (caller wants to write
2314  * to the queue).
2315  */
2316 int
2317 canput(queue_t *q)
2318 {
2319 	TRACE_1(TR_FAC_STREAMS_FR, TR_CANPUT_IN, "canput:%p", q);
2320 
2321 	/* this is for loopback transports, they should not do a canput */
2322 	ASSERT(STRMATED(q->q_stream) || STREAM(q) == STREAM(q->q_nfsrv));
2323 
2324 	/* Find next forward module that has a service procedure */
2325 	q = q->q_nfsrv;
2326 
2327 	if (!(q->q_flag & QFULL)) {
2328 		TRACE_2(TR_FAC_STREAMS_FR, TR_CANPUT_OUT, "canput:%p %d", q, 1);
2329 		return (1);
2330 	}
2331 	mutex_enter(QLOCK(q));
2332 	if (q->q_flag & QFULL) {
2333 		q->q_flag |= QWANTW;
2334 		mutex_exit(QLOCK(q));
2335 		TRACE_2(TR_FAC_STREAMS_FR, TR_CANPUT_OUT, "canput:%p %d", q, 0);
2336 		return (0);
2337 	}
2338 	mutex_exit(QLOCK(q));
2339 	TRACE_2(TR_FAC_STREAMS_FR, TR_CANPUT_OUT, "canput:%p %d", q, 1);
2340 	return (1);
2341 }
2342 
2343 /*
2344  * This is the new canput for use with priority bands.  Return 1 if the
2345  * band is not full.  If the band is full, return 0 (may not put message)
2346  * and set QWANTW(QB_WANTW) flag for zero(non-zero) band (caller wants to
2347  * write to the queue).
2348  */
2349 int
2350 bcanput(queue_t *q, unsigned char pri)
2351 {
2352 	qband_t *qbp;
2353 
2354 	TRACE_2(TR_FAC_STREAMS_FR, TR_BCANPUT_IN, "bcanput:%p %p", q, pri);
2355 	if (!q)
2356 		return (0);
2357 
2358 	/* Find next forward module that has a service procedure */
2359 	q = q->q_nfsrv;
2360 
2361 	mutex_enter(QLOCK(q));
2362 	if (pri == 0) {
2363 		if (q->q_flag & QFULL) {
2364 			q->q_flag |= QWANTW;
2365 			mutex_exit(QLOCK(q));
2366 			TRACE_3(TR_FAC_STREAMS_FR, TR_BCANPUT_OUT,
2367 			    "bcanput:%p %X %d", q, pri, 0);
2368 			return (0);
2369 		}
2370 	} else {	/* pri != 0 */
2371 		if (pri > q->q_nband) {
2372 			/*
2373 			 * No band exists yet, so return success.
2374 			 */
2375 			mutex_exit(QLOCK(q));
2376 			TRACE_3(TR_FAC_STREAMS_FR, TR_BCANPUT_OUT,
2377 			    "bcanput:%p %X %d", q, pri, 1);
2378 			return (1);
2379 		}
2380 		qbp = q->q_bandp;
2381 		while (--pri)
2382 			qbp = qbp->qb_next;
2383 		if (qbp->qb_flag & QB_FULL) {
2384 			qbp->qb_flag |= QB_WANTW;
2385 			mutex_exit(QLOCK(q));
2386 			TRACE_3(TR_FAC_STREAMS_FR, TR_BCANPUT_OUT,
2387 			    "bcanput:%p %X %d", q, pri, 0);
2388 			return (0);
2389 		}
2390 	}
2391 	mutex_exit(QLOCK(q));
2392 	TRACE_3(TR_FAC_STREAMS_FR, TR_BCANPUT_OUT,
2393 	    "bcanput:%p %X %d", q, pri, 1);
2394 	return (1);
2395 }
2396 
2397 /*
2398  * Put a message on a queue.
2399  *
2400  * Messages are enqueued on a priority basis.  The priority classes
2401  * are HIGH PRIORITY (type >= QPCTL), PRIORITY (type < QPCTL && band > 0),
2402  * and B_NORMAL (type < QPCTL && band == 0).
2403  *
2404  * Add appropriate weighted data block sizes to queue count.
2405  * If queue hits high water mark then set QFULL flag.
2406  *
2407  * If QNOENAB is not set (putq is allowed to enable the queue),
2408  * enable the queue only if the message is PRIORITY,
2409  * or the QWANTR flag is set (indicating that the service procedure
2410  * is ready to read the queue.  This implies that a service
2411  * procedure must NEVER put a high priority message back on its own
2412  * queue, as this would result in an infinite loop (!).
2413  */
2414 int
2415 putq(queue_t *q, mblk_t *bp)
2416 {
2417 	mblk_t *tmp;
2418 	qband_t *qbp = NULL;
2419 	int mcls = (int)queclass(bp);
2420 	kthread_id_t freezer;
2421 	int	bytecnt = 0, mblkcnt = 0;
2422 
2423 	freezer = STREAM(q)->sd_freezer;
2424 	if (freezer == curthread) {
2425 		ASSERT(frozenstr(q));
2426 		ASSERT(MUTEX_HELD(QLOCK(q)));
2427 	} else
2428 		mutex_enter(QLOCK(q));
2429 
2430 	/*
2431 	 * Make sanity checks and if qband structure is not yet
2432 	 * allocated, do so.
2433 	 */
2434 	if (mcls == QPCTL) {
2435 		if (bp->b_band != 0)
2436 			bp->b_band = 0;		/* force to be correct */
2437 	} else if (bp->b_band != 0) {
2438 		int i;
2439 		qband_t **qbpp;
2440 
2441 		if (bp->b_band > q->q_nband) {
2442 
2443 			/*
2444 			 * The qband structure for this priority band is
2445 			 * not on the queue yet, so we have to allocate
2446 			 * one on the fly.  It would be wasteful to
2447 			 * associate the qband structures with every
2448 			 * queue when the queues are allocated.  This is
2449 			 * because most queues will only need the normal
2450 			 * band of flow which can be described entirely
2451 			 * by the queue itself.
2452 			 */
2453 			qbpp = &q->q_bandp;
2454 			while (*qbpp)
2455 				qbpp = &(*qbpp)->qb_next;
2456 			while (bp->b_band > q->q_nband) {
2457 				if ((*qbpp = allocband()) == NULL) {
2458 					if (freezer != curthread)
2459 						mutex_exit(QLOCK(q));
2460 					return (0);
2461 				}
2462 				(*qbpp)->qb_hiwat = q->q_hiwat;
2463 				(*qbpp)->qb_lowat = q->q_lowat;
2464 				q->q_nband++;
2465 				qbpp = &(*qbpp)->qb_next;
2466 			}
2467 		}
2468 		ASSERT(MUTEX_HELD(QLOCK(q)));
2469 		qbp = q->q_bandp;
2470 		i = bp->b_band;
2471 		while (--i)
2472 			qbp = qbp->qb_next;
2473 	}
2474 
2475 	/*
2476 	 * If queue is empty, add the message and initialize the pointers.
2477 	 * Otherwise, adjust message pointers and queue pointers based on
2478 	 * the type of the message and where it belongs on the queue.  Some
2479 	 * code is duplicated to minimize the number of conditionals and
2480 	 * hopefully minimize the amount of time this routine takes.
2481 	 */
2482 	if (!q->q_first) {
2483 		bp->b_next = NULL;
2484 		bp->b_prev = NULL;
2485 		q->q_first = bp;
2486 		q->q_last = bp;
2487 		if (qbp) {
2488 			qbp->qb_first = bp;
2489 			qbp->qb_last = bp;
2490 		}
2491 	} else if (!qbp) {	/* bp->b_band == 0 */
2492 
2493 		/*
2494 		 * If queue class of message is less than or equal to
2495 		 * that of the last one on the queue, tack on to the end.
2496 		 */
2497 		tmp = q->q_last;
2498 		if (mcls <= (int)queclass(tmp)) {
2499 			bp->b_next = NULL;
2500 			bp->b_prev = tmp;
2501 			tmp->b_next = bp;
2502 			q->q_last = bp;
2503 		} else {
2504 			tmp = q->q_first;
2505 			while ((int)queclass(tmp) >= mcls)
2506 				tmp = tmp->b_next;
2507 
2508 			/*
2509 			 * Insert bp before tmp.
2510 			 */
2511 			bp->b_next = tmp;
2512 			bp->b_prev = tmp->b_prev;
2513 			if (tmp->b_prev)
2514 				tmp->b_prev->b_next = bp;
2515 			else
2516 				q->q_first = bp;
2517 			tmp->b_prev = bp;
2518 		}
2519 	} else {		/* bp->b_band != 0 */
2520 		if (qbp->qb_first) {
2521 			tmp = qbp->qb_last;
2522 
2523 			/*
2524 			 * Insert bp after the last message in this band.
2525 			 */
2526 			bp->b_next = tmp->b_next;
2527 			if (tmp->b_next)
2528 				tmp->b_next->b_prev = bp;
2529 			else
2530 				q->q_last = bp;
2531 			bp->b_prev = tmp;
2532 			tmp->b_next = bp;
2533 		} else {
2534 			tmp = q->q_last;
2535 			if ((mcls < (int)queclass(tmp)) ||
2536 			    (bp->b_band <= tmp->b_band)) {
2537 
2538 				/*
2539 				 * Tack bp on end of queue.
2540 				 */
2541 				bp->b_next = NULL;
2542 				bp->b_prev = tmp;
2543 				tmp->b_next = bp;
2544 				q->q_last = bp;
2545 			} else {
2546 				tmp = q->q_first;
2547 				while (tmp->b_datap->db_type >= QPCTL)
2548 					tmp = tmp->b_next;
2549 				while (tmp->b_band >= bp->b_band)
2550 					tmp = tmp->b_next;
2551 
2552 				/*
2553 				 * Insert bp before tmp.
2554 				 */
2555 				bp->b_next = tmp;
2556 				bp->b_prev = tmp->b_prev;
2557 				if (tmp->b_prev)
2558 					tmp->b_prev->b_next = bp;
2559 				else
2560 					q->q_first = bp;
2561 				tmp->b_prev = bp;
2562 			}
2563 			qbp->qb_first = bp;
2564 		}
2565 		qbp->qb_last = bp;
2566 	}
2567 
2568 	/* Get message byte count for q_count accounting */
2569 	bytecnt = mp_cont_len(bp, &mblkcnt);
2570 
2571 	if (qbp) {
2572 		qbp->qb_count += bytecnt;
2573 		qbp->qb_mblkcnt += mblkcnt;
2574 		if ((qbp->qb_count >= qbp->qb_hiwat) ||
2575 		    (qbp->qb_mblkcnt >= qbp->qb_hiwat)) {
2576 			qbp->qb_flag |= QB_FULL;
2577 		}
2578 	} else {
2579 		q->q_count += bytecnt;
2580 		q->q_mblkcnt += mblkcnt;
2581 		if ((q->q_count >= q->q_hiwat) ||
2582 		    (q->q_mblkcnt >= q->q_hiwat)) {
2583 			q->q_flag |= QFULL;
2584 		}
2585 	}
2586 
2587 	STR_FTEVENT_MSG(bp, q, FTEV_PUTQ, NULL);
2588 
2589 	if ((mcls > QNORM) ||
2590 	    (canenable(q) && (q->q_flag & QWANTR || bp->b_band)))
2591 		qenable_locked(q);
2592 	ASSERT(MUTEX_HELD(QLOCK(q)));
2593 	if (freezer != curthread)
2594 		mutex_exit(QLOCK(q));
2595 
2596 	return (1);
2597 }
2598 
2599 /*
2600  * Put stuff back at beginning of Q according to priority order.
2601  * See comment on putq above for details.
2602  */
2603 int
2604 putbq(queue_t *q, mblk_t *bp)
2605 {
2606 	mblk_t *tmp;
2607 	qband_t *qbp = NULL;
2608 	int mcls = (int)queclass(bp);
2609 	kthread_id_t freezer;
2610 	int	bytecnt = 0, mblkcnt = 0;
2611 
2612 	ASSERT(q && bp);
2613 	ASSERT(bp->b_next == NULL);
2614 	freezer = STREAM(q)->sd_freezer;
2615 	if (freezer == curthread) {
2616 		ASSERT(frozenstr(q));
2617 		ASSERT(MUTEX_HELD(QLOCK(q)));
2618 	} else
2619 		mutex_enter(QLOCK(q));
2620 
2621 	/*
2622 	 * Make sanity checks and if qband structure is not yet
2623 	 * allocated, do so.
2624 	 */
2625 	if (mcls == QPCTL) {
2626 		if (bp->b_band != 0)
2627 			bp->b_band = 0;		/* force to be correct */
2628 	} else if (bp->b_band != 0) {
2629 		int i;
2630 		qband_t **qbpp;
2631 
2632 		if (bp->b_band > q->q_nband) {
2633 			qbpp = &q->q_bandp;
2634 			while (*qbpp)
2635 				qbpp = &(*qbpp)->qb_next;
2636 			while (bp->b_band > q->q_nband) {
2637 				if ((*qbpp = allocband()) == NULL) {
2638 					if (freezer != curthread)
2639 						mutex_exit(QLOCK(q));
2640 					return (0);
2641 				}
2642 				(*qbpp)->qb_hiwat = q->q_hiwat;
2643 				(*qbpp)->qb_lowat = q->q_lowat;
2644 				q->q_nband++;
2645 				qbpp = &(*qbpp)->qb_next;
2646 			}
2647 		}
2648 		qbp = q->q_bandp;
2649 		i = bp->b_band;
2650 		while (--i)
2651 			qbp = qbp->qb_next;
2652 	}
2653 
2654 	/*
2655 	 * If queue is empty or if message is high priority,
2656 	 * place on the front of the queue.
2657 	 */
2658 	tmp = q->q_first;
2659 	if ((!tmp) || (mcls == QPCTL)) {
2660 		bp->b_next = tmp;
2661 		if (tmp)
2662 			tmp->b_prev = bp;
2663 		else
2664 			q->q_last = bp;
2665 		q->q_first = bp;
2666 		bp->b_prev = NULL;
2667 		if (qbp) {
2668 			qbp->qb_first = bp;
2669 			qbp->qb_last = bp;
2670 		}
2671 	} else if (qbp) {	/* bp->b_band != 0 */
2672 		tmp = qbp->qb_first;
2673 		if (tmp) {
2674 
2675 			/*
2676 			 * Insert bp before the first message in this band.
2677 			 */
2678 			bp->b_next = tmp;
2679 			bp->b_prev = tmp->b_prev;
2680 			if (tmp->b_prev)
2681 				tmp->b_prev->b_next = bp;
2682 			else
2683 				q->q_first = bp;
2684 			tmp->b_prev = bp;
2685 		} else {
2686 			tmp = q->q_last;
2687 			if ((mcls < (int)queclass(tmp)) ||
2688 			    (bp->b_band < tmp->b_band)) {
2689 
2690 				/*
2691 				 * Tack bp on end of queue.
2692 				 */
2693 				bp->b_next = NULL;
2694 				bp->b_prev = tmp;
2695 				tmp->b_next = bp;
2696 				q->q_last = bp;
2697 			} else {
2698 				tmp = q->q_first;
2699 				while (tmp->b_datap->db_type >= QPCTL)
2700 					tmp = tmp->b_next;
2701 				while (tmp->b_band > bp->b_band)
2702 					tmp = tmp->b_next;
2703 
2704 				/*
2705 				 * Insert bp before tmp.
2706 				 */
2707 				bp->b_next = tmp;
2708 				bp->b_prev = tmp->b_prev;
2709 				if (tmp->b_prev)
2710 					tmp->b_prev->b_next = bp;
2711 				else
2712 					q->q_first = bp;
2713 				tmp->b_prev = bp;
2714 			}
2715 			qbp->qb_last = bp;
2716 		}
2717 		qbp->qb_first = bp;
2718 	} else {		/* bp->b_band == 0 && !QPCTL */
2719 
2720 		/*
2721 		 * If the queue class or band is less than that of the last
2722 		 * message on the queue, tack bp on the end of the queue.
2723 		 */
2724 		tmp = q->q_last;
2725 		if ((mcls < (int)queclass(tmp)) || (bp->b_band < tmp->b_band)) {
2726 			bp->b_next = NULL;
2727 			bp->b_prev = tmp;
2728 			tmp->b_next = bp;
2729 			q->q_last = bp;
2730 		} else {
2731 			tmp = q->q_first;
2732 			while (tmp->b_datap->db_type >= QPCTL)
2733 				tmp = tmp->b_next;
2734 			while (tmp->b_band > bp->b_band)
2735 				tmp = tmp->b_next;
2736 
2737 			/*
2738 			 * Insert bp before tmp.
2739 			 */
2740 			bp->b_next = tmp;
2741 			bp->b_prev = tmp->b_prev;
2742 			if (tmp->b_prev)
2743 				tmp->b_prev->b_next = bp;
2744 			else
2745 				q->q_first = bp;
2746 			tmp->b_prev = bp;
2747 		}
2748 	}
2749 
2750 	/* Get message byte count for q_count accounting */
2751 	bytecnt = mp_cont_len(bp, &mblkcnt);
2752 
2753 	if (qbp) {
2754 		qbp->qb_count += bytecnt;
2755 		qbp->qb_mblkcnt += mblkcnt;
2756 		if ((qbp->qb_count >= qbp->qb_hiwat) ||
2757 		    (qbp->qb_mblkcnt >= qbp->qb_hiwat)) {
2758 			qbp->qb_flag |= QB_FULL;
2759 		}
2760 	} else {
2761 		q->q_count += bytecnt;
2762 		q->q_mblkcnt += mblkcnt;
2763 		if ((q->q_count >= q->q_hiwat) ||
2764 		    (q->q_mblkcnt >= q->q_hiwat)) {
2765 			q->q_flag |= QFULL;
2766 		}
2767 	}
2768 
2769 	STR_FTEVENT_MSG(bp, q, FTEV_PUTBQ, NULL);
2770 
2771 	if ((mcls > QNORM) || (canenable(q) && (q->q_flag & QWANTR)))
2772 		qenable_locked(q);
2773 	ASSERT(MUTEX_HELD(QLOCK(q)));
2774 	if (freezer != curthread)
2775 		mutex_exit(QLOCK(q));
2776 
2777 	return (1);
2778 }
2779 
2780 /*
2781  * Insert a message before an existing message on the queue.  If the
2782  * existing message is NULL, the new messages is placed on the end of
2783  * the queue.  The queue class of the new message is ignored.  However,
2784  * the priority band of the new message must adhere to the following
2785  * ordering:
2786  *
2787  *	emp->b_prev->b_band >= mp->b_band >= emp->b_band.
2788  *
2789  * All flow control parameters are updated.
2790  *
2791  * insq can be called with the stream frozen, but other utility functions
2792  * holding QLOCK, and by streams modules without any locks/frozen.
2793  */
2794 int
2795 insq(queue_t *q, mblk_t *emp, mblk_t *mp)
2796 {
2797 	mblk_t *tmp;
2798 	qband_t *qbp = NULL;
2799 	int mcls = (int)queclass(mp);
2800 	kthread_id_t freezer;
2801 	int	bytecnt = 0, mblkcnt = 0;
2802 
2803 	freezer = STREAM(q)->sd_freezer;
2804 	if (freezer == curthread) {
2805 		ASSERT(frozenstr(q));
2806 		ASSERT(MUTEX_HELD(QLOCK(q)));
2807 	} else if (MUTEX_HELD(QLOCK(q))) {
2808 		/* Don't drop lock on exit */
2809 		freezer = curthread;
2810 	} else
2811 		mutex_enter(QLOCK(q));
2812 
2813 	if (mcls == QPCTL) {
2814 		if (mp->b_band != 0)
2815 			mp->b_band = 0;		/* force to be correct */
2816 		if (emp && emp->b_prev &&
2817 		    (emp->b_prev->b_datap->db_type < QPCTL))
2818 			goto badord;
2819 	}
2820 	if (emp) {
2821 		if (((mcls == QNORM) && (mp->b_band < emp->b_band)) ||
2822 		    (emp->b_prev && (emp->b_prev->b_datap->db_type < QPCTL) &&
2823 		    (emp->b_prev->b_band < mp->b_band))) {
2824 			goto badord;
2825 		}
2826 	} else {
2827 		tmp = q->q_last;
2828 		if (tmp && (mcls == QNORM) && (mp->b_band > tmp->b_band)) {
2829 badord:
2830 			cmn_err(CE_WARN,
2831 			    "insq: attempt to insert message out of order "
2832 			    "on q %p", (void *)q);
2833 			if (freezer != curthread)
2834 				mutex_exit(QLOCK(q));
2835 			return (0);
2836 		}
2837 	}
2838 
2839 	if (mp->b_band != 0) {
2840 		int i;
2841 		qband_t **qbpp;
2842 
2843 		if (mp->b_band > q->q_nband) {
2844 			qbpp = &q->q_bandp;
2845 			while (*qbpp)
2846 				qbpp = &(*qbpp)->qb_next;
2847 			while (mp->b_band > q->q_nband) {
2848 				if ((*qbpp = allocband()) == NULL) {
2849 					if (freezer != curthread)
2850 						mutex_exit(QLOCK(q));
2851 					return (0);
2852 				}
2853 				(*qbpp)->qb_hiwat = q->q_hiwat;
2854 				(*qbpp)->qb_lowat = q->q_lowat;
2855 				q->q_nband++;
2856 				qbpp = &(*qbpp)->qb_next;
2857 			}
2858 		}
2859 		qbp = q->q_bandp;
2860 		i = mp->b_band;
2861 		while (--i)
2862 			qbp = qbp->qb_next;
2863 	}
2864 
2865 	if ((mp->b_next = emp) != NULL) {
2866 		if ((mp->b_prev = emp->b_prev) != NULL)
2867 			emp->b_prev->b_next = mp;
2868 		else
2869 			q->q_first = mp;
2870 		emp->b_prev = mp;
2871 	} else {
2872 		if ((mp->b_prev = q->q_last) != NULL)
2873 			q->q_last->b_next = mp;
2874 		else
2875 			q->q_first = mp;
2876 		q->q_last = mp;
2877 	}
2878 
2879 	/* Get mblk and byte count for q_count accounting */
2880 	bytecnt = mp_cont_len(mp, &mblkcnt);
2881 
2882 	if (qbp) {	/* adjust qband pointers and count */
2883 		if (!qbp->qb_first) {
2884 			qbp->qb_first = mp;
2885 			qbp->qb_last = mp;
2886 		} else {
2887 			if (mp->b_prev == NULL || (mp->b_prev != NULL &&
2888 			    (mp->b_prev->b_band != mp->b_band)))
2889 				qbp->qb_first = mp;
2890 			else if (mp->b_next == NULL || (mp->b_next != NULL &&
2891 			    (mp->b_next->b_band != mp->b_band)))
2892 				qbp->qb_last = mp;
2893 		}
2894 		qbp->qb_count += bytecnt;
2895 		qbp->qb_mblkcnt += mblkcnt;
2896 		if ((qbp->qb_count >= qbp->qb_hiwat) ||
2897 		    (qbp->qb_mblkcnt >= qbp->qb_hiwat)) {
2898 			qbp->qb_flag |= QB_FULL;
2899 		}
2900 	} else {
2901 		q->q_count += bytecnt;
2902 		q->q_mblkcnt += mblkcnt;
2903 		if ((q->q_count >= q->q_hiwat) ||
2904 		    (q->q_mblkcnt >= q->q_hiwat)) {
2905 			q->q_flag |= QFULL;
2906 		}
2907 	}
2908 
2909 	STR_FTEVENT_MSG(mp, q, FTEV_INSQ, NULL);
2910 
2911 	if (canenable(q) && (q->q_flag & QWANTR))
2912 		qenable_locked(q);
2913 
2914 	ASSERT(MUTEX_HELD(QLOCK(q)));
2915 	if (freezer != curthread)
2916 		mutex_exit(QLOCK(q));
2917 
2918 	return (1);
2919 }
2920 
2921 /*
2922  * Create and put a control message on queue.
2923  */
2924 int
2925 putctl(queue_t *q, int type)
2926 {
2927 	mblk_t *bp;
2928 
2929 	if ((datamsg(type) && (type != M_DELAY)) ||
2930 	    (bp = allocb_tryhard(0)) == NULL)
2931 		return (0);
2932 	bp->b_datap->db_type = (unsigned char) type;
2933 
2934 	put(q, bp);
2935 
2936 	return (1);
2937 }
2938 
2939 /*
2940  * Control message with a single-byte parameter
2941  */
2942 int
2943 putctl1(queue_t *q, int type, int param)
2944 {
2945 	mblk_t *bp;
2946 
2947 	if ((datamsg(type) && (type != M_DELAY)) ||
2948 	    (bp = allocb_tryhard(1)) == NULL)
2949 		return (0);
2950 	bp->b_datap->db_type = (unsigned char)type;
2951 	*bp->b_wptr++ = (unsigned char)param;
2952 
2953 	put(q, bp);
2954 
2955 	return (1);
2956 }
2957 
2958 int
2959 putnextctl1(queue_t *q, int type, int param)
2960 {
2961 	mblk_t *bp;
2962 
2963 	if ((datamsg(type) && (type != M_DELAY)) ||
2964 	    ((bp = allocb_tryhard(1)) == NULL))
2965 		return (0);
2966 
2967 	bp->b_datap->db_type = (unsigned char)type;
2968 	*bp->b_wptr++ = (unsigned char)param;
2969 
2970 	putnext(q, bp);
2971 
2972 	return (1);
2973 }
2974 
2975 int
2976 putnextctl(queue_t *q, int type)
2977 {
2978 	mblk_t *bp;
2979 
2980 	if ((datamsg(type) && (type != M_DELAY)) ||
2981 	    ((bp = allocb_tryhard(0)) == NULL))
2982 		return (0);
2983 	bp->b_datap->db_type = (unsigned char)type;
2984 
2985 	putnext(q, bp);
2986 
2987 	return (1);
2988 }
2989 
2990 /*
2991  * Return the queue upstream from this one
2992  */
2993 queue_t *
2994 backq(queue_t *q)
2995 {
2996 	q = _OTHERQ(q);
2997 	if (q->q_next) {
2998 		q = q->q_next;
2999 		return (_OTHERQ(q));
3000 	}
3001 	return (NULL);
3002 }
3003 
3004 /*
3005  * Send a block back up the queue in reverse from this
3006  * one (e.g. to respond to ioctls)
3007  */
3008 void
3009 qreply(queue_t *q, mblk_t *bp)
3010 {
3011 	ASSERT(q && bp);
3012 
3013 	putnext(_OTHERQ(q), bp);
3014 }
3015 
3016 /*
3017  * Streams Queue Scheduling
3018  *
3019  * Queues are enabled through qenable() when they have messages to
3020  * process.  They are serviced by queuerun(), which runs each enabled
3021  * queue's service procedure.  The call to queuerun() is processor
3022  * dependent - the general principle is that it be run whenever a queue
3023  * is enabled but before returning to user level.  For system calls,
3024  * the function runqueues() is called if their action causes a queue
3025  * to be enabled.  For device interrupts, queuerun() should be
3026  * called before returning from the last level of interrupt.  Beyond
3027  * this, no timing assumptions should be made about queue scheduling.
3028  */
3029 
3030 /*
3031  * Enable a queue: put it on list of those whose service procedures are
3032  * ready to run and set up the scheduling mechanism.
3033  * The broadcast is done outside the mutex -> to avoid the woken thread
3034  * from contending with the mutex. This is OK 'cos the queue has been
3035  * enqueued on the runlist and flagged safely at this point.
3036  */
3037 void
3038 qenable(queue_t *q)
3039 {
3040 	mutex_enter(QLOCK(q));
3041 	qenable_locked(q);
3042 	mutex_exit(QLOCK(q));
3043 }
3044 /*
3045  * Return number of messages on queue
3046  */
3047 int
3048 qsize(queue_t *qp)
3049 {
3050 	int count = 0;
3051 	mblk_t *mp;
3052 
3053 	mutex_enter(QLOCK(qp));
3054 	for (mp = qp->q_first; mp; mp = mp->b_next)
3055 		count++;
3056 	mutex_exit(QLOCK(qp));
3057 	return (count);
3058 }
3059 
3060 /*
3061  * noenable - set queue so that putq() will not enable it.
3062  * enableok - set queue so that putq() can enable it.
3063  */
3064 void
3065 noenable(queue_t *q)
3066 {
3067 	mutex_enter(QLOCK(q));
3068 	q->q_flag |= QNOENB;
3069 	mutex_exit(QLOCK(q));
3070 }
3071 
3072 void
3073 enableok(queue_t *q)
3074 {
3075 	mutex_enter(QLOCK(q));
3076 	q->q_flag &= ~QNOENB;
3077 	mutex_exit(QLOCK(q));
3078 }
3079 
3080 /*
3081  * Set queue fields.
3082  */
3083 int
3084 strqset(queue_t *q, qfields_t what, unsigned char pri, intptr_t val)
3085 {
3086 	qband_t *qbp = NULL;
3087 	queue_t	*wrq;
3088 	int error = 0;
3089 	kthread_id_t freezer;
3090 
3091 	freezer = STREAM(q)->sd_freezer;
3092 	if (freezer == curthread) {
3093 		ASSERT(frozenstr(q));
3094 		ASSERT(MUTEX_HELD(QLOCK(q)));
3095 	} else
3096 		mutex_enter(QLOCK(q));
3097 
3098 	if (what >= QBAD) {
3099 		error = EINVAL;
3100 		goto done;
3101 	}
3102 	if (pri != 0) {
3103 		int i;
3104 		qband_t **qbpp;
3105 
3106 		if (pri > q->q_nband) {
3107 			qbpp = &q->q_bandp;
3108 			while (*qbpp)
3109 				qbpp = &(*qbpp)->qb_next;
3110 			while (pri > q->q_nband) {
3111 				if ((*qbpp = allocband()) == NULL) {
3112 					error = EAGAIN;
3113 					goto done;
3114 				}
3115 				(*qbpp)->qb_hiwat = q->q_hiwat;
3116 				(*qbpp)->qb_lowat = q->q_lowat;
3117 				q->q_nband++;
3118 				qbpp = &(*qbpp)->qb_next;
3119 			}
3120 		}
3121 		qbp = q->q_bandp;
3122 		i = pri;
3123 		while (--i)
3124 			qbp = qbp->qb_next;
3125 	}
3126 	switch (what) {
3127 
3128 	case QHIWAT:
3129 		if (qbp)
3130 			qbp->qb_hiwat = (size_t)val;
3131 		else
3132 			q->q_hiwat = (size_t)val;
3133 		break;
3134 
3135 	case QLOWAT:
3136 		if (qbp)
3137 			qbp->qb_lowat = (size_t)val;
3138 		else
3139 			q->q_lowat = (size_t)val;
3140 		break;
3141 
3142 	case QMAXPSZ:
3143 		if (qbp)
3144 			error = EINVAL;
3145 		else
3146 			q->q_maxpsz = (ssize_t)val;
3147 
3148 		/*
3149 		 * Performance concern, strwrite looks at the module below
3150 		 * the stream head for the maxpsz each time it does a write
3151 		 * we now cache it at the stream head.  Check to see if this
3152 		 * queue is sitting directly below the stream head.
3153 		 */
3154 		wrq = STREAM(q)->sd_wrq;
3155 		if (q != wrq->q_next)
3156 			break;
3157 
3158 		/*
3159 		 * If the stream is not frozen drop the current QLOCK and
3160 		 * acquire the sd_wrq QLOCK which protects sd_qn_*
3161 		 */
3162 		if (freezer != curthread) {
3163 			mutex_exit(QLOCK(q));
3164 			mutex_enter(QLOCK(wrq));
3165 		}
3166 		ASSERT(MUTEX_HELD(QLOCK(wrq)));
3167 
3168 		if (strmsgsz != 0) {
3169 			if (val == INFPSZ)
3170 				val = strmsgsz;
3171 			else  {
3172 				if (STREAM(q)->sd_vnode->v_type == VFIFO)
3173 					val = MIN(PIPE_BUF, val);
3174 				else
3175 					val = MIN(strmsgsz, val);
3176 			}
3177 		}
3178 		STREAM(q)->sd_qn_maxpsz = val;
3179 		if (freezer != curthread) {
3180 			mutex_exit(QLOCK(wrq));
3181 			mutex_enter(QLOCK(q));
3182 		}
3183 		break;
3184 
3185 	case QMINPSZ:
3186 		if (qbp)
3187 			error = EINVAL;
3188 		else
3189 			q->q_minpsz = (ssize_t)val;
3190 
3191 		/*
3192 		 * Performance concern, strwrite looks at the module below
3193 		 * the stream head for the maxpsz each time it does a write
3194 		 * we now cache it at the stream head.  Check to see if this
3195 		 * queue is sitting directly below the stream head.
3196 		 */
3197 		wrq = STREAM(q)->sd_wrq;
3198 		if (q != wrq->q_next)
3199 			break;
3200 
3201 		/*
3202 		 * If the stream is not frozen drop the current QLOCK and
3203 		 * acquire the sd_wrq QLOCK which protects sd_qn_*
3204 		 */
3205 		if (freezer != curthread) {
3206 			mutex_exit(QLOCK(q));
3207 			mutex_enter(QLOCK(wrq));
3208 		}
3209 		STREAM(q)->sd_qn_minpsz = (ssize_t)val;
3210 
3211 		if (freezer != curthread) {
3212 			mutex_exit(QLOCK(wrq));
3213 			mutex_enter(QLOCK(q));
3214 		}
3215 		break;
3216 
3217 	case QSTRUIOT:
3218 		if (qbp)
3219 			error = EINVAL;
3220 		else
3221 			q->q_struiot = (ushort_t)val;
3222 		break;
3223 
3224 	case QCOUNT:
3225 	case QFIRST:
3226 	case QLAST:
3227 	case QFLAG:
3228 		error = EPERM;
3229 		break;
3230 
3231 	default:
3232 		error = EINVAL;
3233 		break;
3234 	}
3235 done:
3236 	if (freezer != curthread)
3237 		mutex_exit(QLOCK(q));
3238 	return (error);
3239 }
3240 
3241 /*
3242  * Get queue fields.
3243  */
3244 int
3245 strqget(queue_t *q, qfields_t what, unsigned char pri, void *valp)
3246 {
3247 	qband_t 	*qbp = NULL;
3248 	int 		error = 0;
3249 	kthread_id_t 	freezer;
3250 
3251 	freezer = STREAM(q)->sd_freezer;
3252 	if (freezer == curthread) {
3253 		ASSERT(frozenstr(q));
3254 		ASSERT(MUTEX_HELD(QLOCK(q)));
3255 	} else
3256 		mutex_enter(QLOCK(q));
3257 	if (what >= QBAD) {
3258 		error = EINVAL;
3259 		goto done;
3260 	}
3261 	if (pri != 0) {
3262 		int i;
3263 		qband_t **qbpp;
3264 
3265 		if (pri > q->q_nband) {
3266 			qbpp = &q->q_bandp;
3267 			while (*qbpp)
3268 				qbpp = &(*qbpp)->qb_next;
3269 			while (pri > q->q_nband) {
3270 				if ((*qbpp = allocband()) == NULL) {
3271 					error = EAGAIN;
3272 					goto done;
3273 				}
3274 				(*qbpp)->qb_hiwat = q->q_hiwat;
3275 				(*qbpp)->qb_lowat = q->q_lowat;
3276 				q->q_nband++;
3277 				qbpp = &(*qbpp)->qb_next;
3278 			}
3279 		}
3280 		qbp = q->q_bandp;
3281 		i = pri;
3282 		while (--i)
3283 			qbp = qbp->qb_next;
3284 	}
3285 	switch (what) {
3286 	case QHIWAT:
3287 		if (qbp)
3288 			*(size_t *)valp = qbp->qb_hiwat;
3289 		else
3290 			*(size_t *)valp = q->q_hiwat;
3291 		break;
3292 
3293 	case QLOWAT:
3294 		if (qbp)
3295 			*(size_t *)valp = qbp->qb_lowat;
3296 		else
3297 			*(size_t *)valp = q->q_lowat;
3298 		break;
3299 
3300 	case QMAXPSZ:
3301 		if (qbp)
3302 			error = EINVAL;
3303 		else
3304 			*(ssize_t *)valp = q->q_maxpsz;
3305 		break;
3306 
3307 	case QMINPSZ:
3308 		if (qbp)
3309 			error = EINVAL;
3310 		else
3311 			*(ssize_t *)valp = q->q_minpsz;
3312 		break;
3313 
3314 	case QCOUNT:
3315 		if (qbp)
3316 			*(size_t *)valp = qbp->qb_count;
3317 		else
3318 			*(size_t *)valp = q->q_count;
3319 		break;
3320 
3321 	case QFIRST:
3322 		if (qbp)
3323 			*(mblk_t **)valp = qbp->qb_first;
3324 		else
3325 			*(mblk_t **)valp = q->q_first;
3326 		break;
3327 
3328 	case QLAST:
3329 		if (qbp)
3330 			*(mblk_t **)valp = qbp->qb_last;
3331 		else
3332 			*(mblk_t **)valp = q->q_last;
3333 		break;
3334 
3335 	case QFLAG:
3336 		if (qbp)
3337 			*(uint_t *)valp = qbp->qb_flag;
3338 		else
3339 			*(uint_t *)valp = q->q_flag;
3340 		break;
3341 
3342 	case QSTRUIOT:
3343 		if (qbp)
3344 			error = EINVAL;
3345 		else
3346 			*(short *)valp = q->q_struiot;
3347 		break;
3348 
3349 	default:
3350 		error = EINVAL;
3351 		break;
3352 	}
3353 done:
3354 	if (freezer != curthread)
3355 		mutex_exit(QLOCK(q));
3356 	return (error);
3357 }
3358 
3359 /*
3360  * Function awakes all in cvwait/sigwait/pollwait, on one of:
3361  *	QWANTWSYNC or QWANTR or QWANTW,
3362  *
3363  * Note: for QWANTWSYNC/QWANTW and QWANTR, if no WSLEEPer or RSLEEPer then a
3364  *	 deferred wakeup will be done. Also if strpoll() in progress then a
3365  *	 deferred pollwakeup will be done.
3366  */
3367 void
3368 strwakeq(queue_t *q, int flag)
3369 {
3370 	stdata_t 	*stp = STREAM(q);
3371 	pollhead_t 	*pl;
3372 
3373 	mutex_enter(&stp->sd_lock);
3374 	pl = &stp->sd_pollist;
3375 	if (flag & QWANTWSYNC) {
3376 		ASSERT(!(q->q_flag & QREADR));
3377 		if (stp->sd_flag & WSLEEP) {
3378 			stp->sd_flag &= ~WSLEEP;
3379 			cv_broadcast(&stp->sd_wrq->q_wait);
3380 		} else {
3381 			stp->sd_wakeq |= WSLEEP;
3382 		}
3383 
3384 		mutex_exit(&stp->sd_lock);
3385 		pollwakeup(pl, POLLWRNORM);
3386 		mutex_enter(&stp->sd_lock);
3387 
3388 		if (stp->sd_sigflags & S_WRNORM)
3389 			strsendsig(stp->sd_siglist, S_WRNORM, 0, 0);
3390 	} else if (flag & QWANTR) {
3391 		if (stp->sd_flag & RSLEEP) {
3392 			stp->sd_flag &= ~RSLEEP;
3393 			cv_broadcast(&_RD(stp->sd_wrq)->q_wait);
3394 		} else {
3395 			stp->sd_wakeq |= RSLEEP;
3396 		}
3397 
3398 		mutex_exit(&stp->sd_lock);
3399 		pollwakeup(pl, POLLIN | POLLRDNORM);
3400 		mutex_enter(&stp->sd_lock);
3401 
3402 		{
3403 			int events = stp->sd_sigflags & (S_INPUT | S_RDNORM);
3404 
3405 			if (events)
3406 				strsendsig(stp->sd_siglist, events, 0, 0);
3407 		}
3408 	} else {
3409 		if (stp->sd_flag & WSLEEP) {
3410 			stp->sd_flag &= ~WSLEEP;
3411 			cv_broadcast(&stp->sd_wrq->q_wait);
3412 		}
3413 
3414 		mutex_exit(&stp->sd_lock);
3415 		pollwakeup(pl, POLLWRNORM);
3416 		mutex_enter(&stp->sd_lock);
3417 
3418 		if (stp->sd_sigflags & S_WRNORM)
3419 			strsendsig(stp->sd_siglist, S_WRNORM, 0, 0);
3420 	}
3421 	mutex_exit(&stp->sd_lock);
3422 }
3423 
3424 int
3425 struioget(queue_t *q, mblk_t *mp, struiod_t *dp, int noblock)
3426 {
3427 	stdata_t *stp = STREAM(q);
3428 	int typ  = STRUIOT_STANDARD;
3429 	uio_t	 *uiop = &dp->d_uio;
3430 	dblk_t	 *dbp;
3431 	ssize_t	 uiocnt;
3432 	ssize_t	 cnt;
3433 	unsigned char *ptr;
3434 	ssize_t	 resid;
3435 	int	 error = 0;
3436 	on_trap_data_t otd;
3437 	queue_t	*stwrq;
3438 
3439 	/*
3440 	 * Plumbing may change while taking the type so store the
3441 	 * queue in a temporary variable. It doesn't matter even
3442 	 * if the we take the type from the previous plumbing,
3443 	 * that's because if the plumbing has changed when we were
3444 	 * holding the queue in a temporary variable, we can continue
3445 	 * processing the message the way it would have been processed
3446 	 * in the old plumbing, without any side effects but a bit
3447 	 * extra processing for partial ip header checksum.
3448 	 *
3449 	 * This has been done to avoid holding the sd_lock which is
3450 	 * very hot.
3451 	 */
3452 
3453 	stwrq = stp->sd_struiowrq;
3454 	if (stwrq)
3455 		typ = stwrq->q_struiot;
3456 
3457 	for (; (resid = uiop->uio_resid) > 0 && mp; mp = mp->b_cont) {
3458 		dbp = mp->b_datap;
3459 		ptr = (uchar_t *)(mp->b_rptr + dbp->db_cksumstuff);
3460 		uiocnt = dbp->db_cksumend - dbp->db_cksumstuff;
3461 		cnt = MIN(uiocnt, uiop->uio_resid);
3462 		if (!(dbp->db_struioflag & STRUIO_SPEC) ||
3463 		    (dbp->db_struioflag & STRUIO_DONE) || cnt == 0) {
3464 			/*
3465 			 * Either this mblk has already been processed
3466 			 * or there is no more room in this mblk (?).
3467 			 */
3468 			continue;
3469 		}
3470 		switch (typ) {
3471 		case STRUIOT_STANDARD:
3472 			if (noblock) {
3473 				if (on_trap(&otd, OT_DATA_ACCESS)) {
3474 					no_trap();
3475 					error = EWOULDBLOCK;
3476 					goto out;
3477 				}
3478 			}
3479 			if (error = uiomove(ptr, cnt, UIO_WRITE, uiop)) {
3480 				if (noblock)
3481 					no_trap();
3482 				goto out;
3483 			}
3484 			if (noblock)
3485 				no_trap();
3486 			break;
3487 
3488 		default:
3489 			error = EIO;
3490 			goto out;
3491 		}
3492 		dbp->db_struioflag |= STRUIO_DONE;
3493 		dbp->db_cksumstuff += cnt;
3494 	}
3495 out:
3496 	if (error == EWOULDBLOCK && (resid -= uiop->uio_resid) > 0) {
3497 		/*
3498 		 * A fault has occured and some bytes were moved to the
3499 		 * current mblk, the uio_t has already been updated by
3500 		 * the appropriate uio routine, so also update the mblk
3501 		 * to reflect this in case this same mblk chain is used
3502 		 * again (after the fault has been handled).
3503 		 */
3504 		uiocnt = dbp->db_cksumend - dbp->db_cksumstuff;
3505 		if (uiocnt >= resid)
3506 			dbp->db_cksumstuff += resid;
3507 	}
3508 	return (error);
3509 }
3510 
3511 /*
3512  * Try to enter queue synchronously. Any attempt to enter a closing queue will
3513  * fails. The qp->q_rwcnt keeps track of the number of successful entries so
3514  * that removeq() will not try to close the queue while a thread is inside the
3515  * queue.
3516  */
3517 static boolean_t
3518 rwnext_enter(queue_t *qp)
3519 {
3520 	mutex_enter(QLOCK(qp));
3521 	if (qp->q_flag & QWCLOSE) {
3522 		mutex_exit(QLOCK(qp));
3523 		return (B_FALSE);
3524 	}
3525 	qp->q_rwcnt++;
3526 	ASSERT(qp->q_rwcnt != 0);
3527 	mutex_exit(QLOCK(qp));
3528 	return (B_TRUE);
3529 }
3530 
3531 /*
3532  * Decrease the count of threads running in sync stream queue and wake up any
3533  * threads blocked in removeq().
3534  */
3535 static void
3536 rwnext_exit(queue_t *qp)
3537 {
3538 	mutex_enter(QLOCK(qp));
3539 	qp->q_rwcnt--;
3540 	if (qp->q_flag & QWANTRMQSYNC) {
3541 		qp->q_flag &= ~QWANTRMQSYNC;
3542 		cv_broadcast(&qp->q_wait);
3543 	}
3544 	mutex_exit(QLOCK(qp));
3545 }
3546 
3547 /*
3548  * The purpose of rwnext() is to call the rw procedure of the next
3549  * (downstream) modules queue.
3550  *
3551  * treated as put entrypoint for perimeter syncronization.
3552  *
3553  * There's no need to grab sq_putlocks here (which only exist for CIPUT
3554  * sync queues). If it is CIPUT sync queue sq_count is incremented and it does
3555  * not matter if any regular put entrypoints have been already entered. We
3556  * can't increment one of the sq_putcounts (instead of sq_count) because
3557  * qwait_rw won't know which counter to decrement.
3558  *
3559  * It would be reasonable to add the lockless FASTPUT logic.
3560  */
3561 int
3562 rwnext(queue_t *qp, struiod_t *dp)
3563 {
3564 	queue_t		*nqp;
3565 	syncq_t		*sq;
3566 	uint16_t	count;
3567 	uint16_t	flags;
3568 	struct qinit	*qi;
3569 	int		(*proc)();
3570 	struct stdata	*stp;
3571 	int		isread;
3572 	int		rval;
3573 
3574 	stp = STREAM(qp);
3575 	/*
3576 	 * Prevent q_next from changing by holding sd_lock until acquiring
3577 	 * SQLOCK. Note that a read-side rwnext from the streamhead will
3578 	 * already have sd_lock acquired. In either case sd_lock is always
3579 	 * released after acquiring SQLOCK.
3580 	 *
3581 	 * The streamhead read-side holding sd_lock when calling rwnext is
3582 	 * required to prevent a race condition were M_DATA mblks flowing
3583 	 * up the read-side of the stream could be bypassed by a rwnext()
3584 	 * down-call. In this case sd_lock acts as the streamhead perimeter.
3585 	 */
3586 	if ((nqp = _WR(qp)) == qp) {
3587 		isread = 0;
3588 		mutex_enter(&stp->sd_lock);
3589 		qp = nqp->q_next;
3590 	} else {
3591 		isread = 1;
3592 		if (nqp != stp->sd_wrq)
3593 			/* Not streamhead */
3594 			mutex_enter(&stp->sd_lock);
3595 		qp = _RD(nqp->q_next);
3596 	}
3597 	qi = qp->q_qinfo;
3598 	if (qp->q_struiot == STRUIOT_NONE || ! (proc = qi->qi_rwp)) {
3599 		/*
3600 		 * Not a synchronous module or no r/w procedure for this
3601 		 * queue, so just return EINVAL and let the caller handle it.
3602 		 */
3603 		mutex_exit(&stp->sd_lock);
3604 		return (EINVAL);
3605 	}
3606 
3607 	if (rwnext_enter(qp) == B_FALSE) {
3608 		mutex_exit(&stp->sd_lock);
3609 		return (EINVAL);
3610 	}
3611 
3612 	sq = qp->q_syncq;
3613 	mutex_enter(SQLOCK(sq));
3614 	mutex_exit(&stp->sd_lock);
3615 	count = sq->sq_count;
3616 	flags = sq->sq_flags;
3617 	ASSERT(sq->sq_ciputctrl == NULL || (flags & SQ_CIPUT));
3618 
3619 	while ((flags & SQ_GOAWAY) || (!(flags & SQ_CIPUT) && count != 0)) {
3620 		/*
3621 		 * if this queue is being closed, return.
3622 		 */
3623 		if (qp->q_flag & QWCLOSE) {
3624 			mutex_exit(SQLOCK(sq));
3625 			rwnext_exit(qp);
3626 			return (EINVAL);
3627 		}
3628 
3629 		/*
3630 		 * Wait until we can enter the inner perimeter.
3631 		 */
3632 		sq->sq_flags = flags | SQ_WANTWAKEUP;
3633 		cv_wait(&sq->sq_wait, SQLOCK(sq));
3634 		count = sq->sq_count;
3635 		flags = sq->sq_flags;
3636 	}
3637 
3638 	if (isread == 0 && stp->sd_struiowrq == NULL ||
3639 	    isread == 1 && stp->sd_struiordq == NULL) {
3640 		/*
3641 		 * Stream plumbing changed while waiting for inner perimeter
3642 		 * so just return EINVAL and let the caller handle it.
3643 		 */
3644 		mutex_exit(SQLOCK(sq));
3645 		rwnext_exit(qp);
3646 		return (EINVAL);
3647 	}
3648 	if (!(flags & SQ_CIPUT))
3649 		sq->sq_flags = flags | SQ_EXCL;
3650 	sq->sq_count = count + 1;
3651 	ASSERT(sq->sq_count != 0);		/* Wraparound */
3652 	/*
3653 	 * Note: The only message ordering guarantee that rwnext() makes is
3654 	 *	 for the write queue flow-control case. All others (r/w queue
3655 	 *	 with q_count > 0 (or q_first != 0)) are the resposibilty of
3656 	 *	 the queue's rw procedure. This could be genralized here buy
3657 	 *	 running the queue's service procedure, but that wouldn't be
3658 	 *	 the most efficent for all cases.
3659 	 */
3660 	mutex_exit(SQLOCK(sq));
3661 	if (! isread && (qp->q_flag & QFULL)) {
3662 		/*
3663 		 * Write queue may be flow controlled. If so,
3664 		 * mark the queue for wakeup when it's not.
3665 		 */
3666 		mutex_enter(QLOCK(qp));
3667 		if (qp->q_flag & QFULL) {
3668 			qp->q_flag |= QWANTWSYNC;
3669 			mutex_exit(QLOCK(qp));
3670 			rval = EWOULDBLOCK;
3671 			goto out;
3672 		}
3673 		mutex_exit(QLOCK(qp));
3674 	}
3675 
3676 	if (! isread && dp->d_mp)
3677 		STR_FTEVENT_MSG(dp->d_mp, nqp, FTEV_RWNEXT, dp->d_mp->b_rptr -
3678 		    dp->d_mp->b_datap->db_base);
3679 
3680 	rval = (*proc)(qp, dp);
3681 
3682 	if (isread && dp->d_mp)
3683 		STR_FTEVENT_MSG(dp->d_mp, _RD(nqp), FTEV_RWNEXT,
3684 		    dp->d_mp->b_rptr - dp->d_mp->b_datap->db_base);
3685 out:
3686 	/*
3687 	 * The queue is protected from being freed by sq_count, so it is
3688 	 * safe to call rwnext_exit and reacquire SQLOCK(sq).
3689 	 */
3690 	rwnext_exit(qp);
3691 
3692 	mutex_enter(SQLOCK(sq));
3693 	flags = sq->sq_flags;
3694 	ASSERT(sq->sq_count != 0);
3695 	sq->sq_count--;
3696 	if (flags & SQ_TAIL) {
3697 		putnext_tail(sq, qp, flags);
3698 		/*
3699 		 * The only purpose of this ASSERT is to preserve calling stack
3700 		 * in DEBUG kernel.
3701 		 */
3702 		ASSERT(flags & SQ_TAIL);
3703 		return (rval);
3704 	}
3705 	ASSERT(flags & (SQ_EXCL|SQ_CIPUT));
3706 	/*
3707 	 * Safe to always drop SQ_EXCL:
3708 	 *	Not SQ_CIPUT means we set SQ_EXCL above
3709 	 *	For SQ_CIPUT SQ_EXCL will only be set if the put procedure
3710 	 *	did a qwriter(INNER) in which case nobody else
3711 	 *	is in the inner perimeter and we are exiting.
3712 	 *
3713 	 * I would like to make the following assertion:
3714 	 *
3715 	 * ASSERT((flags & (SQ_EXCL|SQ_CIPUT)) != (SQ_EXCL|SQ_CIPUT) ||
3716 	 * 	sq->sq_count == 0);
3717 	 *
3718 	 * which indicates that if we are both putshared and exclusive,
3719 	 * we became exclusive while executing the putproc, and the only
3720 	 * claim on the syncq was the one we dropped a few lines above.
3721 	 * But other threads that enter putnext while the syncq is exclusive
3722 	 * need to make a claim as they may need to drop SQLOCK in the
3723 	 * has_writers case to avoid deadlocks.  If these threads are
3724 	 * delayed or preempted, it is possible that the writer thread can
3725 	 * find out that there are other claims making the (sq_count == 0)
3726 	 * test invalid.
3727 	 */
3728 
3729 	sq->sq_flags = flags & ~SQ_EXCL;
3730 	if (sq->sq_flags & SQ_WANTWAKEUP) {
3731 		sq->sq_flags &= ~SQ_WANTWAKEUP;
3732 		cv_broadcast(&sq->sq_wait);
3733 	}
3734 	mutex_exit(SQLOCK(sq));
3735 	return (rval);
3736 }
3737 
3738 /*
3739  * The purpose of infonext() is to call the info procedure of the next
3740  * (downstream) modules queue.
3741  *
3742  * treated as put entrypoint for perimeter syncronization.
3743  *
3744  * There's no need to grab sq_putlocks here (which only exist for CIPUT
3745  * sync queues). If it is CIPUT sync queue regular sq_count is incremented and
3746  * it does not matter if any regular put entrypoints have been already
3747  * entered.
3748  */
3749 int
3750 infonext(queue_t *qp, infod_t *idp)
3751 {
3752 	queue_t		*nqp;
3753 	syncq_t		*sq;
3754 	uint16_t	count;
3755 	uint16_t 	flags;
3756 	struct qinit	*qi;
3757 	int		(*proc)();
3758 	struct stdata	*stp;
3759 	int		rval;
3760 
3761 	stp = STREAM(qp);
3762 	/*
3763 	 * Prevent q_next from changing by holding sd_lock until
3764 	 * acquiring SQLOCK.
3765 	 */
3766 	mutex_enter(&stp->sd_lock);
3767 	if ((nqp = _WR(qp)) == qp) {
3768 		qp = nqp->q_next;
3769 	} else {
3770 		qp = _RD(nqp->q_next);
3771 	}
3772 	qi = qp->q_qinfo;
3773 	if (qp->q_struiot == STRUIOT_NONE || ! (proc = qi->qi_infop)) {
3774 		mutex_exit(&stp->sd_lock);
3775 		return (EINVAL);
3776 	}
3777 	sq = qp->q_syncq;
3778 	mutex_enter(SQLOCK(sq));
3779 	mutex_exit(&stp->sd_lock);
3780 	count = sq->sq_count;
3781 	flags = sq->sq_flags;
3782 	ASSERT(sq->sq_ciputctrl == NULL || (flags & SQ_CIPUT));
3783 
3784 	while ((flags & SQ_GOAWAY) || (!(flags & SQ_CIPUT) && count != 0)) {
3785 		/*
3786 		 * Wait until we can enter the inner perimeter.
3787 		 */
3788 		sq->sq_flags = flags | SQ_WANTWAKEUP;
3789 		cv_wait(&sq->sq_wait, SQLOCK(sq));
3790 		count = sq->sq_count;
3791 		flags = sq->sq_flags;
3792 	}
3793 
3794 	if (! (flags & SQ_CIPUT))
3795 		sq->sq_flags = flags | SQ_EXCL;
3796 	sq->sq_count = count + 1;
3797 	ASSERT(sq->sq_count != 0);		/* Wraparound */
3798 	mutex_exit(SQLOCK(sq));
3799 
3800 	rval = (*proc)(qp, idp);
3801 
3802 	mutex_enter(SQLOCK(sq));
3803 	flags = sq->sq_flags;
3804 	ASSERT(sq->sq_count != 0);
3805 	sq->sq_count--;
3806 	if (flags & SQ_TAIL) {
3807 		putnext_tail(sq, qp, flags);
3808 		/*
3809 		 * The only purpose of this ASSERT is to preserve calling stack
3810 		 * in DEBUG kernel.
3811 		 */
3812 		ASSERT(flags & SQ_TAIL);
3813 		return (rval);
3814 	}
3815 	ASSERT(flags & (SQ_EXCL|SQ_CIPUT));
3816 /*
3817  * XXXX
3818  * I am not certain the next comment is correct here.  I need to consider
3819  * why the infonext is called, and if dropping SQ_EXCL unless non-CIPUT
3820  * might cause other problems.  It just might be safer to drop it if
3821  * !SQ_CIPUT because that is when we set it.
3822  */
3823 	/*
3824 	 * Safe to always drop SQ_EXCL:
3825 	 *	Not SQ_CIPUT means we set SQ_EXCL above
3826 	 *	For SQ_CIPUT SQ_EXCL will only be set if the put procedure
3827 	 *	did a qwriter(INNER) in which case nobody else
3828 	 *	is in the inner perimeter and we are exiting.
3829 	 *
3830 	 * I would like to make the following assertion:
3831 	 *
3832 	 * ASSERT((flags & (SQ_EXCL|SQ_CIPUT)) != (SQ_EXCL|SQ_CIPUT) ||
3833 	 *	sq->sq_count == 0);
3834 	 *
3835 	 * which indicates that if we are both putshared and exclusive,
3836 	 * we became exclusive while executing the putproc, and the only
3837 	 * claim on the syncq was the one we dropped a few lines above.
3838 	 * But other threads that enter putnext while the syncq is exclusive
3839 	 * need to make a claim as they may need to drop SQLOCK in the
3840 	 * has_writers case to avoid deadlocks.  If these threads are
3841 	 * delayed or preempted, it is possible that the writer thread can
3842 	 * find out that there are other claims making the (sq_count == 0)
3843 	 * test invalid.
3844 	 */
3845 
3846 	sq->sq_flags = flags & ~SQ_EXCL;
3847 	mutex_exit(SQLOCK(sq));
3848 	return (rval);
3849 }
3850 
3851 /*
3852  * Return nonzero if the queue is responsible for struio(), else return 0.
3853  */
3854 int
3855 isuioq(queue_t *q)
3856 {
3857 	if (q->q_flag & QREADR)
3858 		return (STREAM(q)->sd_struiordq == q);
3859 	else
3860 		return (STREAM(q)->sd_struiowrq == q);
3861 }
3862 
3863 #if defined(__sparc)
3864 int disable_putlocks = 0;
3865 #else
3866 int disable_putlocks = 1;
3867 #endif
3868 
3869 /*
3870  * called by create_putlock.
3871  */
3872 static void
3873 create_syncq_putlocks(queue_t *q)
3874 {
3875 	syncq_t	*sq = q->q_syncq;
3876 	ciputctrl_t *cip;
3877 	int i;
3878 
3879 	ASSERT(sq != NULL);
3880 
3881 	ASSERT(disable_putlocks == 0);
3882 	ASSERT(n_ciputctrl >= min_n_ciputctrl);
3883 	ASSERT(ciputctrl_cache != NULL);
3884 
3885 	if (!(sq->sq_type & SQ_CIPUT))
3886 		return;
3887 
3888 	for (i = 0; i <= 1; i++) {
3889 		if (sq->sq_ciputctrl == NULL) {
3890 			cip = kmem_cache_alloc(ciputctrl_cache, KM_SLEEP);
3891 			SUMCHECK_CIPUTCTRL_COUNTS(cip, n_ciputctrl - 1, 0);
3892 			mutex_enter(SQLOCK(sq));
3893 			if (sq->sq_ciputctrl != NULL) {
3894 				mutex_exit(SQLOCK(sq));
3895 				kmem_cache_free(ciputctrl_cache, cip);
3896 			} else {
3897 				ASSERT(sq->sq_nciputctrl == 0);
3898 				sq->sq_nciputctrl = n_ciputctrl - 1;
3899 				/*
3900 				 * putnext checks sq_ciputctrl without holding
3901 				 * SQLOCK. if it is not NULL putnext assumes
3902 				 * sq_nciputctrl is initialized. membar below
3903 				 * insures that.
3904 				 */
3905 				membar_producer();
3906 				sq->sq_ciputctrl = cip;
3907 				mutex_exit(SQLOCK(sq));
3908 			}
3909 		}
3910 		ASSERT(sq->sq_nciputctrl == n_ciputctrl - 1);
3911 		if (i == 1)
3912 			break;
3913 		q = _OTHERQ(q);
3914 		if (!(q->q_flag & QPERQ)) {
3915 			ASSERT(sq == q->q_syncq);
3916 			break;
3917 		}
3918 		ASSERT(q->q_syncq != NULL);
3919 		ASSERT(sq != q->q_syncq);
3920 		sq = q->q_syncq;
3921 		ASSERT(sq->sq_type & SQ_CIPUT);
3922 	}
3923 }
3924 
3925 /*
3926  * If stream argument is 0 only create per cpu sq_putlocks/sq_putcounts for
3927  * syncq of q. If stream argument is not 0 create per cpu stream_putlocks for
3928  * the stream of q and per cpu sq_putlocks/sq_putcounts for all syncq's
3929  * starting from q and down to the driver.
3930  *
3931  * This should be called after the affected queues are part of stream
3932  * geometry. It should be called from driver/module open routine after
3933  * qprocson() call. It is also called from nfs syscall where it is known that
3934  * stream is configured and won't change its geometry during create_putlock
3935  * call.
3936  *
3937  * caller normally uses 0 value for the stream argument to speed up MT putnext
3938  * into the perimeter of q for example because its perimeter is per module
3939  * (e.g. IP).
3940  *
3941  * caller normally uses non 0 value for the stream argument to hint the system
3942  * that the stream of q is a very contended global system stream
3943  * (e.g. NFS/UDP) and the part of the stream from q to the driver is
3944  * particularly MT hot.
3945  *
3946  * Caller insures stream plumbing won't happen while we are here and therefore
3947  * q_next can be safely used.
3948  */
3949 
3950 void
3951 create_putlocks(queue_t *q, int stream)
3952 {
3953 	ciputctrl_t	*cip;
3954 	struct stdata	*stp = STREAM(q);
3955 
3956 	q = _WR(q);
3957 	ASSERT(stp != NULL);
3958 
3959 	if (disable_putlocks != 0)
3960 		return;
3961 
3962 	if (n_ciputctrl < min_n_ciputctrl)
3963 		return;
3964 
3965 	ASSERT(ciputctrl_cache != NULL);
3966 
3967 	if (stream != 0 && stp->sd_ciputctrl == NULL) {
3968 		cip = kmem_cache_alloc(ciputctrl_cache, KM_SLEEP);
3969 		SUMCHECK_CIPUTCTRL_COUNTS(cip, n_ciputctrl - 1, 0);
3970 		mutex_enter(&stp->sd_lock);
3971 		if (stp->sd_ciputctrl != NULL) {
3972 			mutex_exit(&stp->sd_lock);
3973 			kmem_cache_free(ciputctrl_cache, cip);
3974 		} else {
3975 			ASSERT(stp->sd_nciputctrl == 0);
3976 			stp->sd_nciputctrl = n_ciputctrl - 1;
3977 			/*
3978 			 * putnext checks sd_ciputctrl without holding
3979 			 * sd_lock. if it is not NULL putnext assumes
3980 			 * sd_nciputctrl is initialized. membar below
3981 			 * insures that.
3982 			 */
3983 			membar_producer();
3984 			stp->sd_ciputctrl = cip;
3985 			mutex_exit(&stp->sd_lock);
3986 		}
3987 	}
3988 
3989 	ASSERT(stream == 0 || stp->sd_nciputctrl == n_ciputctrl - 1);
3990 
3991 	while (_SAMESTR(q)) {
3992 		create_syncq_putlocks(q);
3993 		if (stream == 0)
3994 			return;
3995 		q = q->q_next;
3996 	}
3997 	ASSERT(q != NULL);
3998 	create_syncq_putlocks(q);
3999 }
4000 
4001 /*
4002  * STREAMS Flow Trace - record STREAMS Flow Trace events as an mblk flows
4003  * through a stream.
4004  *
4005  * Data currently record per event is a hrtime stamp, queue address, event
4006  * type, and a per type datum.  Much of the STREAMS framework is instrumented
4007  * for automatic flow tracing (when enabled).  Events can be defined and used
4008  * by STREAMS modules and drivers.
4009  *
4010  * Global objects:
4011  *
4012  *	str_ftevent() - Add a flow-trace event to a dblk.
4013  *	str_ftfree() - Free flow-trace data
4014  *
4015  * Local objects:
4016  *
4017  *	fthdr_cache - pointer to the kmem cache for trace header.
4018  *	ftblk_cache - pointer to the kmem cache for trace data blocks.
4019  */
4020 
4021 int str_ftnever = 1;	/* Don't do STREAMS flow tracing */
4022 
4023 void
4024 str_ftevent(fthdr_t *hp, void *p, ushort_t evnt, ushort_t data)
4025 {
4026 	ftblk_t *bp = hp->tail;
4027 	ftblk_t *nbp;
4028 	ftevnt_t *ep;
4029 	int ix, nix;
4030 
4031 	ASSERT(hp != NULL);
4032 
4033 	for (;;) {
4034 		if ((ix = bp->ix) == FTBLK_EVNTS) {
4035 			/*
4036 			 * Tail doesn't have room, so need a new tail.
4037 			 *
4038 			 * To make this MT safe, first, allocate a new
4039 			 * ftblk, and initialize it.  To make life a
4040 			 * little easier, reserve the first slot (mostly
4041 			 * by making ix = 1).  When we are finished with
4042 			 * the initialization, CAS this pointer to the
4043 			 * tail.  If this succeeds, this is the new
4044 			 * "next" block.  Otherwise, another thread
4045 			 * got here first, so free the block and start
4046 			 * again.
4047 			 */
4048 			if (!(nbp = kmem_cache_alloc(ftblk_cache,
4049 			    KM_NOSLEEP))) {
4050 				/* no mem, so punt */
4051 				str_ftnever++;
4052 				/* free up all flow data? */
4053 				return;
4054 			}
4055 			nbp->nxt = NULL;
4056 			nbp->ix = 1;
4057 			/*
4058 			 * Just in case there is another thread about
4059 			 * to get the next index, we need to make sure
4060 			 * the value is there for it.
4061 			 */
4062 			membar_producer();
4063 			if (casptr(&hp->tail, bp, nbp) == bp) {
4064 				/* CAS was successful */
4065 				bp->nxt = nbp;
4066 				membar_producer();
4067 				bp = nbp;
4068 				ix = 0;
4069 				goto cas_good;
4070 			} else {
4071 				kmem_cache_free(ftblk_cache, nbp);
4072 				bp = hp->tail;
4073 				continue;
4074 			}
4075 		}
4076 		nix = ix + 1;
4077 		if (cas32((uint32_t *)&bp->ix, ix, nix) == ix) {
4078 		cas_good:
4079 			if (curthread != hp->thread) {
4080 				hp->thread = curthread;
4081 				evnt |= FTEV_CS;
4082 			}
4083 			if (CPU->cpu_seqid != hp->cpu_seqid) {
4084 				hp->cpu_seqid = CPU->cpu_seqid;
4085 				evnt |= FTEV_PS;
4086 			}
4087 			ep = &bp->ev[ix];
4088 			break;
4089 		}
4090 	}
4091 
4092 	if (evnt & FTEV_QMASK) {
4093 		queue_t *qp = p;
4094 
4095 		/*
4096 		 * It is possible that the module info is broke
4097 		 * (as is logsubr.c at this comment writing).
4098 		 * Instead of panicing or doing other unmentionables,
4099 		 * we shall put a dummy name as the mid, and continue.
4100 		 */
4101 		if (qp->q_qinfo == NULL)
4102 			ep->mid = "NONAME";
4103 		else
4104 			ep->mid = qp->q_qinfo->qi_minfo->mi_idname;
4105 
4106 		if (!(qp->q_flag & QREADR))
4107 			evnt |= FTEV_ISWR;
4108 	} else {
4109 		ep->mid = (char *)p;
4110 	}
4111 
4112 	ep->ts = gethrtime();
4113 	ep->evnt = evnt;
4114 	ep->data = data;
4115 	hp->hash = (hp->hash << 9) + hp->hash;
4116 	hp->hash += (evnt << 16) | data;
4117 	hp->hash += (uintptr_t)ep->mid;
4118 }
4119 
4120 /*
4121  * Free flow-trace data.
4122  */
4123 void
4124 str_ftfree(dblk_t *dbp)
4125 {
4126 	fthdr_t *hp = dbp->db_fthdr;
4127 	ftblk_t *bp = &hp->first;
4128 	ftblk_t *nbp;
4129 
4130 	if (bp != hp->tail || bp->ix != 0) {
4131 		/*
4132 		 * Clear out the hash, have the tail point to itself, and free
4133 		 * any continuation blocks.
4134 		 */
4135 		bp = hp->first.nxt;
4136 		hp->tail = &hp->first;
4137 		hp->hash = 0;
4138 		hp->first.nxt = NULL;
4139 		hp->first.ix = 0;
4140 		while (bp != NULL) {
4141 			nbp = bp->nxt;
4142 			kmem_cache_free(ftblk_cache, bp);
4143 			bp = nbp;
4144 		}
4145 	}
4146 	kmem_cache_free(fthdr_cache, hp);
4147 	dbp->db_fthdr = NULL;
4148 }
4149