xref: /titanic_52/usr/src/uts/common/io/stream.c (revision 4bc0a2ef2b7ba50a7a717e7ddbf31472ad28e358)
1 /*
2  * CDDL HEADER START
3  *
4  * The contents of this file are subject to the terms of the
5  * Common Development and Distribution License, Version 1.0 only
6  * (the "License").  You may not use this file except in compliance
7  * with the License.
8  *
9  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
10  * or http://www.opensolaris.org/os/licensing.
11  * See the License for the specific language governing permissions
12  * and limitations under the License.
13  *
14  * When distributing Covered Code, include this CDDL HEADER in each
15  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
16  * If applicable, add the following below this CDDL HEADER, with the
17  * fields enclosed by brackets "[]" replaced with your own identifying
18  * information: Portions Copyright [yyyy] [name of copyright owner]
19  *
20  * CDDL HEADER END
21  */
22 /*	Copyright (c) 1984, 1986, 1987, 1988, 1989 AT&T	*/
23 /*	  All Rights Reserved  	*/
24 
25 
26 /*
27  * Copyright 2005 Sun Microsystems, Inc.  All rights reserved.
28  * Use is subject to license terms.
29  */
30 
31 #pragma ident	"%Z%%M%	%I%	%E% SMI"
32 
33 #include <sys/types.h>
34 #include <sys/param.h>
35 #include <sys/thread.h>
36 #include <sys/sysmacros.h>
37 #include <sys/stropts.h>
38 #include <sys/stream.h>
39 #include <sys/strsubr.h>
40 #include <sys/strsun.h>
41 #include <sys/conf.h>
42 #include <sys/debug.h>
43 #include <sys/cmn_err.h>
44 #include <sys/kmem.h>
45 #include <sys/atomic.h>
46 #include <sys/errno.h>
47 #include <sys/vtrace.h>
48 #include <sys/ftrace.h>
49 #include <sys/ontrap.h>
50 #include <sys/multidata.h>
51 #include <sys/multidata_impl.h>
52 #include <sys/sdt.h>
53 
54 #ifdef DEBUG
55 #include <sys/kmem_impl.h>
56 #endif
57 
58 /*
59  * This file contains all the STREAMS utility routines that may
60  * be used by modules and drivers.
61  */
62 
63 /*
64  * STREAMS message allocator: principles of operation
65  *
66  * The streams message allocator consists of all the routines that
67  * allocate, dup and free streams messages: allocb(), [d]esballoc[a],
68  * dupb(), freeb() and freemsg().  What follows is a high-level view
69  * of how the allocator works.
70  *
71  * Every streams message consists of one or more mblks, a dblk, and data.
72  * All mblks for all types of messages come from a common mblk_cache.
73  * The dblk and data come in several flavors, depending on how the
74  * message is allocated:
75  *
76  * (1) mblks up to DBLK_MAX_CACHE size are allocated from a collection of
77  *     fixed-size dblk/data caches. For message sizes that are multiples of
78  *     PAGESIZE, dblks are allocated separately from the buffer.
79  *     The associated buffer is allocated by the constructor using kmem_alloc().
80  *     For all other message sizes, dblk and its associated data is allocated
81  *     as a single contiguous chunk of memory.
82  *     Objects in these caches consist of a dblk plus its associated data.
83  *     allocb() determines the nearest-size cache by table lookup:
84  *     the dblk_cache[] array provides the mapping from size to dblk cache.
85  *
86  * (2) Large messages (size > DBLK_MAX_CACHE) are constructed by
87  *     kmem_alloc()'ing a buffer for the data and supplying that
88  *     buffer to gesballoc(), described below.
89  *
90  * (3) The four flavors of [d]esballoc[a] are all implemented by a
91  *     common routine, gesballoc() ("generic esballoc").  gesballoc()
92  *     allocates a dblk from the global dblk_esb_cache and sets db_base,
93  *     db_lim and db_frtnp to describe the caller-supplied buffer.
94  *
95  * While there are several routines to allocate messages, there is only
96  * one routine to free messages: freeb().  freeb() simply invokes the
97  * dblk's free method, dbp->db_free(), which is set at allocation time.
98  *
99  * dupb() creates a new reference to a message by allocating a new mblk,
100  * incrementing the dblk reference count and setting the dblk's free
101  * method to dblk_decref().  The dblk's original free method is retained
102  * in db_lastfree.  dblk_decref() decrements the reference count on each
103  * freeb().  If this is not the last reference it just frees the mblk;
104  * if this *is* the last reference, it restores db_free to db_lastfree,
105  * sets db_mblk to the current mblk (see below), and invokes db_lastfree.
106  *
107  * The implementation makes aggressive use of kmem object caching for
108  * maximum performance.  This makes the code simple and compact, but
109  * also a bit abstruse in some places.  The invariants that constitute a
110  * message's constructed state, described below, are more subtle than usual.
111  *
112  * Every dblk has an "attached mblk" as part of its constructed state.
113  * The mblk is allocated by the dblk's constructor and remains attached
114  * until the message is either dup'ed or pulled up.  In the dupb() case
115  * the mblk association doesn't matter until the last free, at which time
116  * dblk_decref() attaches the last mblk to the dblk.  pullupmsg() affects
117  * the mblk association because it swaps the leading mblks of two messages,
118  * so it is responsible for swapping their db_mblk pointers accordingly.
119  * From a constructed-state viewpoint it doesn't matter that a dblk's
120  * attached mblk can change while the message is allocated; all that
121  * matters is that the dblk has *some* attached mblk when it's freed.
122  *
123  * The sizes of the allocb() small-message caches are not magical.
124  * They represent a good trade-off between internal and external
125  * fragmentation for current workloads.  They should be reevaluated
126  * periodically, especially if allocations larger than DBLK_MAX_CACHE
127  * become common.  We use 64-byte alignment so that dblks don't
128  * straddle cache lines unnecessarily.
129  */
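
/*
 * Editorial sketch (not part of the implementation): how a module might
 * exercise the allocation paths described above.  The function name and
 * sizes are hypothetical.
 */
#if 0
static void
allocator_example(void)
{
	mblk_t *mp, *dup;

	/* Path (1): small message served from a fixed-size dblk/data cache */
	if ((mp = allocb(128, BPRI_MED)) == NULL)
		return;
	mp->b_wptr += 64;		/* pretend we wrote 64 bytes */

	/* dupb() bumps db_ref and points db_free at dblk_decref() */
	if ((dup = dupb(mp)) != NULL)
		freeb(dup);	/* not the last ref: frees only the mblk */

	freemsg(mp);		/* last ref: db_lastfree runs */
}
#endif
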
130 #define	DBLK_MAX_CACHE		73728
131 #define	DBLK_CACHE_ALIGN	64
132 #define	DBLK_MIN_SIZE		8
133 #define	DBLK_SIZE_SHIFT		3
134 
135 #ifdef _BIG_ENDIAN
136 #define	DBLK_RTFU_SHIFT(field)	\
137 	(8 * (&((dblk_t *)0)->db_struioflag - &((dblk_t *)0)->field))
138 #else
139 #define	DBLK_RTFU_SHIFT(field)	\
140 	(8 * (&((dblk_t *)0)->field - &((dblk_t *)0)->db_ref))
141 #endif
142 
143 #define	DBLK_RTFU(ref, type, flags, uioflag)	\
144 	(((ref) << DBLK_RTFU_SHIFT(db_ref)) | \
145 	((type) << DBLK_RTFU_SHIFT(db_type)) | \
146 	(((flags) | (ref - 1)) << DBLK_RTFU_SHIFT(db_flags)) | \
147 	((uioflag) << DBLK_RTFU_SHIFT(db_struioflag)))
148 #define	DBLK_RTFU_REF_MASK	(DBLK_REFMAX << DBLK_RTFU_SHIFT(db_ref))
149 #define	DBLK_RTFU_WORD(dbp)	(*((uint32_t *)&(dbp)->db_ref))
150 #define	MBLK_BAND_FLAG_WORD(mp)	(*((uint32_t *)&(mp)->b_band))
151 
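/*
 * Example (editorial): DBLK_RTFU(1, M_DATA, 0, 0) packs db_ref = 1,
 * db_type = M_DATA, db_flags = 0 and db_struioflag = 0 into a single
 * 32-bit word, so allocb() can initialize all four adjacent dblk fields
 * with one store through DBLK_RTFU_WORD().
 */
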
152 static size_t dblk_sizes[] = {
153 #ifdef _LP64
154 	16, 80, 144, 208, 272, 336, 528, 1040, 1488, 1936, 2576, 3920,
155 	8192, 12112, 16384, 20304, 24576, 28496, 32768, 36688,
156 	40960, 44880, 49152, 53072, 57344, 61264, 65536, 69456,
157 #else
158 	64, 128, 320, 576, 1088, 1536, 1984, 2624, 3968,
159 	8192, 12160, 16384, 20352, 24576, 28544, 32768, 36736,
160 	40960, 44928, 49152, 53120, 57344, 61312, 65536, 69504,
161 #endif
162 	DBLK_MAX_CACHE, 0
163 };
164 
165 static struct kmem_cache *dblk_cache[DBLK_MAX_CACHE / DBLK_MIN_SIZE];
166 static struct kmem_cache *mblk_cache;
167 static struct kmem_cache *dblk_esb_cache;
168 static struct kmem_cache *fthdr_cache;
169 static struct kmem_cache *ftblk_cache;
170 
171 static void dblk_lastfree(mblk_t *mp, dblk_t *dbp);
172 static mblk_t *allocb_oversize(size_t size, int flags);
173 static int allocb_tryhard_fails;
174 static void frnop_func(void *arg);
175 frtn_t frnop = { frnop_func };
176 static void bcache_dblk_lastfree(mblk_t *mp, dblk_t *dbp);
177 
178 static boolean_t rwnext_enter(queue_t *qp);
179 static void rwnext_exit(queue_t *qp);
180 
181 /*
182  * Patchable mblk/dblk kmem_cache flags.
183  */
184 int dblk_kmem_flags = 0;
185 int mblk_kmem_flags = 0;
186 
187 
188 static int
189 dblk_constructor(void *buf, void *cdrarg, int kmflags)
190 {
191 	dblk_t *dbp = buf;
192 	ssize_t msg_size = (ssize_t)cdrarg;
193 	size_t index;
194 
195 	ASSERT(msg_size != 0);
196 
197 	index = (msg_size - 1) >> DBLK_SIZE_SHIFT;
198 
199 	ASSERT(index < (DBLK_MAX_CACHE >> DBLK_SIZE_SHIFT));
200 
201 	if ((dbp->db_mblk = kmem_cache_alloc(mblk_cache, kmflags)) == NULL)
202 		return (-1);
203 	if ((msg_size & PAGEOFFSET) == 0) {
204 		dbp->db_base = kmem_alloc(msg_size, kmflags);
205 		if (dbp->db_base == NULL) {
206 			kmem_cache_free(mblk_cache, dbp->db_mblk);
207 			return (-1);
208 		}
209 	} else {
210 		dbp->db_base = (unsigned char *)&dbp[1];
211 	}
212 
213 	dbp->db_mblk->b_datap = dbp;
214 	dbp->db_cache = dblk_cache[index];
215 	dbp->db_lim = dbp->db_base + msg_size;
216 	dbp->db_free = dbp->db_lastfree = dblk_lastfree;
217 	dbp->db_frtnp = NULL;
218 	dbp->db_fthdr = NULL;
219 	dbp->db_credp = NULL;
220 	dbp->db_cpid = -1;
221 	dbp->db_struioflag = 0;
222 	dbp->db_struioun.cksum.flags = 0;
223 	return (0);
224 }
225 
226 /*ARGSUSED*/
227 static int
228 dblk_esb_constructor(void *buf, void *cdrarg, int kmflags)
229 {
230 	dblk_t *dbp = buf;
231 
232 	if ((dbp->db_mblk = kmem_cache_alloc(mblk_cache, kmflags)) == NULL)
233 		return (-1);
234 	dbp->db_mblk->b_datap = dbp;
235 	dbp->db_cache = dblk_esb_cache;
236 	dbp->db_fthdr = NULL;
237 	dbp->db_credp = NULL;
238 	dbp->db_cpid = -1;
239 	dbp->db_struioflag = 0;
240 	dbp->db_struioun.cksum.flags = 0;
241 	return (0);
242 }
243 
244 static int
245 bcache_dblk_constructor(void *buf, void *cdrarg, int kmflags)
246 {
247 	dblk_t *dbp = buf;
248 	bcache_t *bcp = (bcache_t *)cdrarg;
249 
250 	if ((dbp->db_mblk = kmem_cache_alloc(mblk_cache, kmflags)) == NULL)
251 		return (-1);
252 
253 	if ((dbp->db_base = (unsigned char *)kmem_cache_alloc(bcp->buffer_cache,
254 	    kmflags)) == NULL) {
255 		kmem_cache_free(mblk_cache, dbp->db_mblk);
256 		return (-1);
257 	}
258 
259 	dbp->db_mblk->b_datap = dbp;
260 	dbp->db_cache = (void *)bcp;
261 	dbp->db_lim = dbp->db_base + bcp->size;
262 	dbp->db_free = dbp->db_lastfree = bcache_dblk_lastfree;
263 	dbp->db_frtnp = NULL;
264 	dbp->db_fthdr = NULL;
265 	dbp->db_credp = NULL;
266 	dbp->db_cpid = -1;
267 	dbp->db_struioflag = 0;
268 	dbp->db_struioun.cksum.flags = 0;
269 	return (0);
270 }
271 
272 /*ARGSUSED*/
273 static void
274 dblk_destructor(void *buf, void *cdrarg)
275 {
276 	dblk_t *dbp = buf;
277 	ssize_t msg_size = (ssize_t)cdrarg;
278 
279 	ASSERT(dbp->db_mblk->b_datap == dbp);
280 
281 	ASSERT(msg_size != 0);
282 
283 	ASSERT(dbp->db_struioflag == 0);
284 	ASSERT(dbp->db_struioun.cksum.flags == 0);
285 
286 	if ((msg_size & PAGEOFFSET) == 0) {
287 		kmem_free(dbp->db_base, msg_size);
288 	}
289 
290 	kmem_cache_free(mblk_cache, dbp->db_mblk);
291 }
292 
293 static void
294 bcache_dblk_destructor(void *buf, void *cdrarg)
295 {
296 	dblk_t *dbp = buf;
297 	bcache_t *bcp = (bcache_t *)cdrarg;
298 
299 	kmem_cache_free(bcp->buffer_cache, dbp->db_base);
300 
301 	ASSERT(dbp->db_mblk->b_datap == dbp);
302 
303 	ASSERT(dbp->db_struioflag == 0);
304 	ASSERT(dbp->db_struioun.cksum.flags == 0);
305 
306 	kmem_cache_free(mblk_cache, dbp->db_mblk);
307 }
308 
309 void
310 streams_msg_init(void)
311 {
312 	char name[40];
313 	size_t size;
314 	size_t lastsize = DBLK_MIN_SIZE;
315 	size_t *sizep;
316 	struct kmem_cache *cp;
317 	size_t tot_size;
318 	int offset;
319 
320 	mblk_cache = kmem_cache_create("streams_mblk",
321 		sizeof (mblk_t), 32, NULL, NULL, NULL, NULL, NULL,
322 		mblk_kmem_flags);
323 
324 	for (sizep = dblk_sizes; (size = *sizep) != 0; sizep++) {
325 
326 		if ((offset = (size & PAGEOFFSET)) != 0) {
327 			/*
328 			 * We are in the middle of a page, so the dblk
329 			 * should be allocated on the same page.
330 			 */
331 			tot_size = size + sizeof (dblk_t);
332 			ASSERT((offset + sizeof (dblk_t) + sizeof (kmem_slab_t))
333 								< PAGESIZE);
334 			ASSERT((tot_size & (DBLK_CACHE_ALIGN - 1)) == 0);
335 
336 		} else {
337 
338 			/*
339 			 * The buffer size is a multiple of the page size,
340 			 * so the dblk and buffer are allocated separately.
341 			 */
342 
343 			ASSERT((size & (DBLK_CACHE_ALIGN - 1)) == 0);
344 			tot_size = sizeof (dblk_t);
345 		}
346 
347 		(void) sprintf(name, "streams_dblk_%ld", size);
348 		cp = kmem_cache_create(name, tot_size,
349 			DBLK_CACHE_ALIGN, dblk_constructor,
350 			dblk_destructor, NULL,
351 			(void *)(size), NULL, dblk_kmem_flags);
352 
353 		while (lastsize <= size) {
354 			dblk_cache[(lastsize - 1) >> DBLK_SIZE_SHIFT] = cp;
355 			lastsize += DBLK_MIN_SIZE;
356 		}
357 	}
358 
359 	dblk_esb_cache = kmem_cache_create("streams_dblk_esb",
360 			sizeof (dblk_t), DBLK_CACHE_ALIGN,
361 			dblk_esb_constructor, dblk_destructor, NULL,
362 			(void *) sizeof (dblk_t), NULL, dblk_kmem_flags);
363 	fthdr_cache = kmem_cache_create("streams_fthdr",
364 		sizeof (fthdr_t), 32, NULL, NULL, NULL, NULL, NULL, 0);
365 	ftblk_cache = kmem_cache_create("streams_ftblk",
366 		sizeof (ftblk_t), 32, NULL, NULL, NULL, NULL, NULL, 0);
367 
368 	/* Initialize Multidata caches */
369 	mmd_init();
370 }
371 
372 /*ARGSUSED*/
373 mblk_t *
374 allocb(size_t size, uint_t pri)
375 {
376 	dblk_t *dbp;
377 	mblk_t *mp;
378 	size_t index;
379 
380 	index = (size - 1) >> DBLK_SIZE_SHIFT;
381 
382 	if (index >= (DBLK_MAX_CACHE >> DBLK_SIZE_SHIFT)) {
383 		if (size != 0) {
384 			mp = allocb_oversize(size, KM_NOSLEEP);
385 			goto out;
386 		}
387 		index = 0;
388 	}
389 
390 	if ((dbp = kmem_cache_alloc(dblk_cache[index], KM_NOSLEEP)) == NULL) {
391 		mp = NULL;
392 		goto out;
393 	}
394 
395 	mp = dbp->db_mblk;
396 	DBLK_RTFU_WORD(dbp) = DBLK_RTFU(1, M_DATA, 0, 0);
397 	mp->b_next = mp->b_prev = mp->b_cont = NULL;
398 	mp->b_rptr = mp->b_wptr = dbp->db_base;
399 	mp->b_queue = NULL;
400 	MBLK_BAND_FLAG_WORD(mp) = 0;
401 	STR_FTALLOC(&dbp->db_fthdr, FTEV_ALLOCB, size);
402 out:
403 	FTRACE_1("allocb(): mp=0x%p", (uintptr_t)mp);
404 
405 	return (mp);
406 }
407 
408 mblk_t *
409 allocb_tmpl(size_t size, const mblk_t *tmpl)
410 {
411 	mblk_t *mp = allocb(size, 0);
412 
413 	if (mp != NULL) {
414 		cred_t *cr = DB_CRED(tmpl);
415 		if (cr != NULL)
416 			crhold(mp->b_datap->db_credp = cr);
417 		DB_CPID(mp) = DB_CPID(tmpl);
418 		DB_TYPE(mp) = DB_TYPE(tmpl);
419 	}
420 	return (mp);
421 }
422 
423 mblk_t *
424 allocb_cred(size_t size, cred_t *cr)
425 {
426 	mblk_t *mp = allocb(size, 0);
427 
428 	if (mp != NULL && cr != NULL)
429 		crhold(mp->b_datap->db_credp = cr);
430 
431 	return (mp);
432 }
433 
434 mblk_t *
435 allocb_cred_wait(size_t size, uint_t flags, int *error, cred_t *cr)
436 {
437 	mblk_t *mp = allocb_wait(size, 0, flags, error);
438 
439 	if (mp != NULL && cr != NULL)
440 		crhold(mp->b_datap->db_credp = cr);
441 
442 	return (mp);
443 }
444 
445 void
446 freeb(mblk_t *mp)
447 {
448 	dblk_t *dbp = mp->b_datap;
449 
450 	ASSERT(dbp->db_ref > 0);
451 	ASSERT(mp->b_next == NULL && mp->b_prev == NULL);
452 	FTRACE_1("freeb(): mp=0x%lx", (uintptr_t)mp);
453 
454 	STR_FTEVENT_MBLK(mp, caller(), FTEV_FREEB, dbp->db_ref);
455 
456 	dbp->db_free(mp, dbp);
457 }
458 
459 void
460 freemsg(mblk_t *mp)
461 {
462 	FTRACE_1("freemsg(): mp=0x%lx", (uintptr_t)mp);
463 	while (mp) {
464 		dblk_t *dbp = mp->b_datap;
465 		mblk_t *mp_cont = mp->b_cont;
466 
467 		ASSERT(dbp->db_ref > 0);
468 		ASSERT(mp->b_next == NULL && mp->b_prev == NULL);
469 
470 		STR_FTEVENT_MBLK(mp, caller(), FTEV_FREEB, dbp->db_ref);
471 
472 		dbp->db_free(mp, dbp);
473 		mp = mp_cont;
474 	}
475 }
476 
477 /*
478  * Reallocate a block for another use.  Try hard to use the old block.
479  * If the old data is wanted (copy), leave b_wptr at the end of the data,
480  * otherwise return b_wptr = b_rptr.
481  *
482  * This routine is private and unstable.
483  */
484 mblk_t	*
485 reallocb(mblk_t *mp, size_t size, uint_t copy)
486 {
487 	mblk_t		*mp1;
488 	unsigned char	*old_rptr;
489 	ptrdiff_t	cur_size;
490 
491 	if (mp == NULL)
492 		return (allocb(size, BPRI_HI));
493 
494 	cur_size = mp->b_wptr - mp->b_rptr;
495 	old_rptr = mp->b_rptr;
496 
497 	ASSERT(mp->b_datap->db_ref != 0);
498 
499 	if (mp->b_datap->db_ref == 1 && MBLKSIZE(mp) >= size) {
500 		/*
501 		 * If the data is wanted and it will fit where it is, no
502 		 * work is required.
503 		 */
504 		if (copy && mp->b_datap->db_lim - mp->b_rptr >= size)
505 			return (mp);
506 
507 		mp->b_wptr = mp->b_rptr = mp->b_datap->db_base;
508 		mp1 = mp;
509 	} else if ((mp1 = allocb_tmpl(size, mp)) != NULL) {
510 		/* XXX other mp state could be copied too, db_flags ... ? */
511 		mp1->b_cont = mp->b_cont;
512 	} else {
513 		return (NULL);
514 	}
515 
516 	if (copy) {
517 		bcopy(old_rptr, mp1->b_rptr, cur_size);
518 		mp1->b_wptr = mp1->b_rptr + cur_size;
519 	}
520 
521 	if (mp != mp1)
522 		freeb(mp);
523 
524 	return (mp1);
525 }
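
/*
 * Editorial sketch of a reallocb() caller.  On failure the original block
 * is left intact, so the result must not overwrite the only pointer to it.
 * The function name and size are hypothetical.
 */
#if 0
static mblk_t *
reallocb_example(mblk_t *mp)
{
	mblk_t *nmp;

	/* copy != 0: preserve the old data; mp is consumed on success */
	if ((nmp = reallocb(mp, 256, 1)) == NULL)
		return (mp);		/* failed; mp is still valid */
	return (nmp);
}
#endif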
526 
527 static void
528 dblk_lastfree(mblk_t *mp, dblk_t *dbp)
529 {
530 	ASSERT(dbp->db_mblk == mp);
531 	if (dbp->db_fthdr != NULL)
532 		str_ftfree(dbp);
533 
534 	/* set credp and projid to be 'unspecified' before returning to cache */
535 	if (dbp->db_credp != NULL) {
536 		crfree(dbp->db_credp);
537 		dbp->db_credp = NULL;
538 	}
539 	dbp->db_cpid = -1;
540 
541 	/* Reset the struioflag and the checksum flag fields */
542 	dbp->db_struioflag = 0;
543 	dbp->db_struioun.cksum.flags = 0;
544 
545 	kmem_cache_free(dbp->db_cache, dbp);
546 }
547 
548 static void
549 dblk_decref(mblk_t *mp, dblk_t *dbp)
550 {
551 	if (dbp->db_ref != 1) {
552 		uint32_t rtfu = atomic_add_32_nv(&DBLK_RTFU_WORD(dbp),
553 		    -(1 << DBLK_RTFU_SHIFT(db_ref)));
554 		/*
555 		 * atomic_add_32_nv() just decremented db_ref, so we no longer
556 		 * have a reference to the dblk, which means another thread
557 		 * could free it.  Therefore we cannot examine the dblk to
558 		 * determine whether ours was the last reference.  Instead,
559 		 * we extract the new and minimum reference counts from rtfu.
560 		 * Note that all we're really saying is "if (ref != refmin)".
561 		 */
562 		if (((rtfu >> DBLK_RTFU_SHIFT(db_ref)) & DBLK_REFMAX) !=
563 		    ((rtfu >> DBLK_RTFU_SHIFT(db_flags)) & DBLK_REFMIN)) {
564 			kmem_cache_free(mblk_cache, mp);
565 			return;
566 		}
567 	}
568 	dbp->db_mblk = mp;
569 	dbp->db_free = dbp->db_lastfree;
570 	dbp->db_lastfree(mp, dbp);
571 }
572 
573 mblk_t *
574 dupb(mblk_t *mp)
575 {
576 	dblk_t *dbp = mp->b_datap;
577 	mblk_t *new_mp;
578 	uint32_t oldrtfu, newrtfu;
579 
580 	if ((new_mp = kmem_cache_alloc(mblk_cache, KM_NOSLEEP)) == NULL)
581 		goto out;
582 
583 	new_mp->b_next = new_mp->b_prev = new_mp->b_cont = NULL;
584 	new_mp->b_rptr = mp->b_rptr;
585 	new_mp->b_wptr = mp->b_wptr;
586 	new_mp->b_datap = dbp;
587 	new_mp->b_queue = NULL;
588 	MBLK_BAND_FLAG_WORD(new_mp) = MBLK_BAND_FLAG_WORD(mp);
589 
590 	STR_FTEVENT_MBLK(mp, caller(), FTEV_DUPB, dbp->db_ref);
591 
592 	/*
593 	 * First-dup optimization.  The enabling assumption is that there
594 	 * can never be a race (in correct code) to dup the first copy
595 	 * of a message.  Therefore we don't need to do it atomically.
596 	 */
597 	if (dbp->db_free != dblk_decref) {
598 		dbp->db_free = dblk_decref;
599 		dbp->db_ref++;
600 		goto out;
601 	}
602 
603 	do {
604 		ASSERT(dbp->db_ref > 0);
605 		oldrtfu = DBLK_RTFU_WORD(dbp);
606 		newrtfu = oldrtfu + (1 << DBLK_RTFU_SHIFT(db_ref));
607 		/*
608 		 * If db_ref is maxed out we can't dup this message anymore.
609 		 */
610 		if ((oldrtfu & DBLK_RTFU_REF_MASK) == DBLK_RTFU_REF_MASK) {
611 			kmem_cache_free(mblk_cache, new_mp);
612 			new_mp = NULL;
613 			goto out;
614 		}
615 	} while (cas32(&DBLK_RTFU_WORD(dbp), oldrtfu, newrtfu) != oldrtfu);
616 
617 out:
618 	FTRACE_1("dupb(): new_mp=0x%lx", (uintptr_t)new_mp);
619 	return (new_mp);
620 }
621 
622 static void
623 dblk_lastfree_desb(mblk_t *mp, dblk_t *dbp)
624 {
625 	frtn_t *frp = dbp->db_frtnp;
626 
627 	ASSERT(dbp->db_mblk == mp);
628 	frp->free_func(frp->free_arg);
629 	if (dbp->db_fthdr != NULL)
630 		str_ftfree(dbp);
631 
632 	/* set credp and projid to be 'unspecified' before returning to cache */
633 	if (dbp->db_credp != NULL) {
634 		crfree(dbp->db_credp);
635 		dbp->db_credp = NULL;
636 	}
637 	dbp->db_cpid = -1;
638 	dbp->db_struioflag = 0;
639 	dbp->db_struioun.cksum.flags = 0;
640 
641 	kmem_cache_free(dbp->db_cache, dbp);
642 }
643 
644 /*ARGSUSED*/
645 static void
646 frnop_func(void *arg)
647 {
648 }
649 
650 /*
651  * Generic esballoc used to implement the four flavors: [d]esballoc[a].
652  */
653 static mblk_t *
654 gesballoc(unsigned char *base, size_t size, uint32_t db_rtfu, frtn_t *frp,
655 	void (*lastfree)(mblk_t *, dblk_t *), int kmflags)
656 {
657 	dblk_t *dbp;
658 	mblk_t *mp;
659 
660 	ASSERT(base != NULL && frp != NULL);
661 
662 	if ((dbp = kmem_cache_alloc(dblk_esb_cache, kmflags)) == NULL) {
663 		mp = NULL;
664 		goto out;
665 	}
666 
667 	mp = dbp->db_mblk;
668 	dbp->db_base = base;
669 	dbp->db_lim = base + size;
670 	dbp->db_free = dbp->db_lastfree = lastfree;
671 	dbp->db_frtnp = frp;
672 	DBLK_RTFU_WORD(dbp) = db_rtfu;
673 	mp->b_next = mp->b_prev = mp->b_cont = NULL;
674 	mp->b_rptr = mp->b_wptr = base;
675 	mp->b_queue = NULL;
676 	MBLK_BAND_FLAG_WORD(mp) = 0;
677 
678 out:
679 	FTRACE_1("gesballoc(): mp=0x%lx", (uintptr_t)mp);
680 	return (mp);
681 }
682 
683 /*ARGSUSED*/
684 mblk_t *
685 esballoc(unsigned char *base, size_t size, uint_t pri, frtn_t *frp)
686 {
687 	mblk_t *mp;
688 
689 	/*
690 	 * Note that this is structured to allow the common case (i.e.
691 	 * STREAMS flowtracing disabled) to call gesballoc() with tail
692 	 * call optimization.
693 	 */
694 	if (!str_ftnever) {
695 		mp = gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
696 		    frp, freebs_enqueue, KM_NOSLEEP);
697 
698 		if (mp != NULL)
699 			STR_FTALLOC(&DB_FTHDR(mp), FTEV_ESBALLOC, size);
700 		return (mp);
701 	}
702 
703 	return (gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
704 	    frp, freebs_enqueue, KM_NOSLEEP));
705 }
706 
707 /*
708  * Same as esballoc() but sleeps waiting for memory.
709  */
710 /*ARGSUSED*/
711 mblk_t *
712 esballoc_wait(unsigned char *base, size_t size, uint_t pri, frtn_t *frp)
713 {
714 	mblk_t *mp;
715 
716 	/*
717 	 * Note that this is structured to allow the common case (i.e.
718 	 * STREAMS flowtracing disabled) to call gesballoc() with tail
719 	 * call optimization.
720 	 */
721 	if (!str_ftnever) {
722 		mp = gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
723 		    frp, freebs_enqueue, KM_SLEEP);
724 
725 		STR_FTALLOC(&DB_FTHDR(mp), FTEV_ESBALLOC, size);
726 		return (mp);
727 	}
728 
729 	return (gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
730 	    frp, freebs_enqueue, KM_SLEEP));
731 }
732 
733 /*ARGSUSED*/
734 mblk_t *
735 desballoc(unsigned char *base, size_t size, uint_t pri, frtn_t *frp)
736 {
737 	mblk_t *mp;
738 
739 	/*
740 	 * Note that this is structured to allow the common case (i.e.
741 	 * STREAMS flowtracing disabled) to call gesballoc() with tail
742 	 * call optimization.
743 	 */
744 	if (!str_ftnever) {
745 		mp = gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
746 			frp, dblk_lastfree_desb, KM_NOSLEEP);
747 
748 		if (mp != NULL)
749 			STR_FTALLOC(&DB_FTHDR(mp), FTEV_DESBALLOC, size);
750 		return (mp);
751 	}
752 
753 	return (gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
754 	    frp, dblk_lastfree_desb, KM_NOSLEEP));
755 }
756 
757 /*ARGSUSED*/
758 mblk_t *
759 esballoca(unsigned char *base, size_t size, uint_t pri, frtn_t *frp)
760 {
761 	mblk_t *mp;
762 
763 	/*
764 	 * Note that this is structured to allow the common case (i.e.
765 	 * STREAMS flowtracing disabled) to call gesballoc() with tail
766 	 * call optimization.
767 	 */
768 	if (!str_ftnever) {
769 		mp = gesballoc(base, size, DBLK_RTFU(2, M_DATA, 0, 0),
770 		    frp, freebs_enqueue, KM_NOSLEEP);
771 
772 		if (mp != NULL)
773 			STR_FTALLOC(&DB_FTHDR(mp), FTEV_ESBALLOCA, size);
774 		return (mp);
775 	}
776 
777 	return (gesballoc(base, size, DBLK_RTFU(2, M_DATA, 0, 0),
778 	    frp, freebs_enqueue, KM_NOSLEEP));
779 }
780 
781 /*ARGSUSED*/
782 mblk_t *
783 desballoca(unsigned char *base, size_t size, uint_t pri, frtn_t *frp)
784 {
785 	mblk_t *mp;
786 
787 	/*
788 	 * Note that this is structured to allow the common case (i.e.
789 	 * STREAMS flowtracing disabled) to call gesballoc() with tail
790 	 * call optimization.
791 	 */
792 	if (!str_ftnever) {
793 		mp = gesballoc(base, size, DBLK_RTFU(2, M_DATA, 0, 0),
794 		    frp, dblk_lastfree_desb, KM_NOSLEEP);
795 
796 		if (mp != NULL)
797 			STR_FTALLOC(&DB_FTHDR(mp), FTEV_DESBALLOCA, size);
798 		return (mp);
799 	}
800 
801 	return (gesballoc(base, size, DBLK_RTFU(2, M_DATA, 0, 0),
802 	    frp, dblk_lastfree_desb, KM_NOSLEEP));
803 }
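
/*
 * Editorial sketch: the classic driver use of desballoc(), loaning a
 * driver-owned receive buffer upstream and reclaiming it from the free
 * routine.  The rxbuf structure and recycle logic are hypothetical.
 */
#if 0
struct rxbuf {
	frtn_t		rb_frtn;	/* must remain valid until freed */
	unsigned char	rb_data[2048];
};

static void
rxbuf_recycle(void *arg)
{
	struct rxbuf *rb = arg;

	/* return rb to the driver's buffer pool here */
}

static mblk_t *
rxbuf_loan(struct rxbuf *rb, size_t len)
{
	mblk_t *mp;

	rb->rb_frtn.free_func = rxbuf_recycle;
	rb->rb_frtn.free_arg = (caddr_t)rb;

	/* desballoc: the free routine runs when the last reference is freed */
	if ((mp = desballoc(rb->rb_data, len, BPRI_MED, &rb->rb_frtn)) != NULL)
		mp->b_wptr += len;
	return (mp);
}
#endif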
804 
805 static void
806 bcache_dblk_lastfree(mblk_t *mp, dblk_t *dbp)
807 {
808 	bcache_t *bcp = dbp->db_cache;
809 
810 	ASSERT(dbp->db_mblk == mp);
811 	if (dbp->db_fthdr != NULL)
812 		str_ftfree(dbp);
813 
814 	/* set credp and projid to be 'unspecified' before returning to cache */
815 	if (dbp->db_credp != NULL) {
816 		crfree(dbp->db_credp);
817 		dbp->db_credp = NULL;
818 	}
819 	dbp->db_cpid = -1;
820 	dbp->db_struioflag = 0;
821 	dbp->db_struioun.cksum.flags = 0;
822 
823 	mutex_enter(&bcp->mutex);
824 	kmem_cache_free(bcp->dblk_cache, dbp);
825 	bcp->alloc--;
826 
827 	if (bcp->alloc == 0 && bcp->destroy != 0) {
828 		kmem_cache_destroy(bcp->dblk_cache);
829 		kmem_cache_destroy(bcp->buffer_cache);
830 		mutex_exit(&bcp->mutex);
831 		mutex_destroy(&bcp->mutex);
832 		kmem_free(bcp, sizeof (bcache_t));
833 	} else {
834 		mutex_exit(&bcp->mutex);
835 	}
836 }
837 
838 bcache_t *
839 bcache_create(char *name, size_t size, uint_t align)
840 {
841 	bcache_t *bcp;
842 	char buffer[255];
843 
844 	ASSERT((align & (align - 1)) == 0);
845 
846 	if ((bcp = (bcache_t *)kmem_alloc(sizeof (bcache_t), KM_NOSLEEP)) ==
847 	    NULL) {
848 		return (NULL);
849 	}
850 
851 	bcp->size = size;
852 	bcp->align = align;
853 	bcp->alloc = 0;
854 	bcp->destroy = 0;
855 
856 	mutex_init(&bcp->mutex, NULL, MUTEX_DRIVER, NULL);
857 
858 	(void) sprintf(buffer, "%s_buffer_cache", name);
859 	bcp->buffer_cache = kmem_cache_create(buffer, size, align, NULL, NULL,
860 	    NULL, NULL, NULL, 0);
861 	(void) sprintf(buffer, "%s_dblk_cache", name);
862 	bcp->dblk_cache = kmem_cache_create(buffer, sizeof (dblk_t),
863 	    DBLK_CACHE_ALIGN, bcache_dblk_constructor, bcache_dblk_destructor,
864 						NULL, (void *)bcp, NULL, 0);
865 
866 	return (bcp);
867 }
868 
869 void
870 bcache_destroy(bcache_t *bcp)
871 {
872 	ASSERT(bcp != NULL);
873 
874 	mutex_enter(&bcp->mutex);
875 	if (bcp->alloc == 0) {
876 		kmem_cache_destroy(bcp->dblk_cache);
877 		kmem_cache_destroy(bcp->buffer_cache);
878 		mutex_exit(&bcp->mutex);
879 		mutex_destroy(&bcp->mutex);
880 		kmem_free(bcp, sizeof (bcache_t));
881 	} else {
882 		bcp->destroy++;
883 		mutex_exit(&bcp->mutex);
884 	}
885 }
886 
887 /*ARGSUSED*/
888 mblk_t *
889 bcache_allocb(bcache_t *bcp, uint_t pri)
890 {
891 	dblk_t *dbp;
892 	mblk_t *mp = NULL;
893 
894 	ASSERT(bcp != NULL);
895 
896 	mutex_enter(&bcp->mutex);
897 	if (bcp->destroy != 0) {
898 		mutex_exit(&bcp->mutex);
899 		goto out;
900 	}
901 
902 	if ((dbp = kmem_cache_alloc(bcp->dblk_cache, KM_NOSLEEP)) == NULL) {
903 		mutex_exit(&bcp->mutex);
904 		goto out;
905 	}
906 	bcp->alloc++;
907 	mutex_exit(&bcp->mutex);
908 
909 	ASSERT(((uintptr_t)(dbp->db_base) & (bcp->align - 1)) == 0);
910 
911 	mp = dbp->db_mblk;
912 	DBLK_RTFU_WORD(dbp) = DBLK_RTFU(1, M_DATA, 0, 0);
913 	mp->b_next = mp->b_prev = mp->b_cont = NULL;
914 	mp->b_rptr = mp->b_wptr = dbp->db_base;
915 	mp->b_queue = NULL;
916 	MBLK_BAND_FLAG_WORD(mp) = 0;
917 	STR_FTALLOC(&dbp->db_fthdr, FTEV_BCALLOCB, bcp->size);
918 out:
919 	FTRACE_1("bcache_allocb(): mp=0x%p", (uintptr_t)mp);
920 
921 	return (mp);
922 }
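
/*
 * Editorial sketch of the bcache lifecycle.  The cache name and sizes
 * are hypothetical.
 */
#if 0
static void
bcache_example(void)
{
	bcache_t *bcp;
	mblk_t *mp;

	if ((bcp = bcache_create("mydrv", 2048, 64)) == NULL)
		return;
	if ((mp = bcache_allocb(bcp, BPRI_MED)) != NULL)
		freemsg(mp);	/* bcache_dblk_lastfree returns the dblk */
	/* safe even with blocks outstanding: teardown is deferred */
	bcache_destroy(bcp);
}
#endif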
923 
924 static void
925 dblk_lastfree_oversize(mblk_t *mp, dblk_t *dbp)
926 {
927 	ASSERT(dbp->db_mblk == mp);
928 	if (dbp->db_fthdr != NULL)
929 		str_ftfree(dbp);
930 
931 	/* set credp and projid to be 'unspecified' before returning to cache */
932 	if (dbp->db_credp != NULL) {
933 		crfree(dbp->db_credp);
934 		dbp->db_credp = NULL;
935 	}
936 	dbp->db_cpid = -1;
937 	dbp->db_struioflag = 0;
938 	dbp->db_struioun.cksum.flags = 0;
939 
940 	kmem_free(dbp->db_base, dbp->db_lim - dbp->db_base);
941 	kmem_cache_free(dbp->db_cache, dbp);
942 }
943 
944 static mblk_t *
945 allocb_oversize(size_t size, int kmflags)
946 {
947 	mblk_t *mp;
948 	void *buf;
949 
950 	size = P2ROUNDUP(size, DBLK_CACHE_ALIGN);
951 	if ((buf = kmem_alloc(size, kmflags)) == NULL)
952 		return (NULL);
953 	if ((mp = gesballoc(buf, size, DBLK_RTFU(1, M_DATA, 0, 0),
954 	    &frnop, dblk_lastfree_oversize, kmflags)) == NULL)
955 		kmem_free(buf, size);
956 
957 	if (mp != NULL)
958 		STR_FTALLOC(&DB_FTHDR(mp), FTEV_ALLOCBIG, size);
959 
960 	return (mp);
961 }
962 
963 mblk_t *
964 allocb_tryhard(size_t target_size)
965 {
966 	size_t size;
967 	mblk_t *bp;
968 
969 	for (size = target_size; size < target_size + 512;
970 	    size += DBLK_CACHE_ALIGN)
971 		if ((bp = allocb(size, BPRI_HI)) != NULL)
972 			return (bp);
973 	allocb_tryhard_fails++;
974 	return (NULL);
975 }
976 
977 /*
978  * This routine is consolidation private for STREAMS internal use.
979  * It may only be called from sync routines (i.e., not
980  * from put or service procedures).  It is located here (rather
981  * than strsubr.c) so that we don't have to expose all of the
982  * allocb() implementation details in header files.
983  */
984 mblk_t *
985 allocb_wait(size_t size, uint_t pri, uint_t flags, int *error)
986 {
987 	dblk_t *dbp;
988 	mblk_t *mp;
989 	size_t index;
990 
991 	index = (size - 1) >> DBLK_SIZE_SHIFT;
992 
993 	if (flags & STR_NOSIG) {
994 		if (index >= (DBLK_MAX_CACHE >> DBLK_SIZE_SHIFT)) {
995 			if (size != 0) {
996 				mp = allocb_oversize(size, KM_SLEEP);
997 				FTRACE_1("allocb_wait (NOSIG): mp=0x%lx",
998 				    (uintptr_t)mp);
999 				return (mp);
1000 			}
1001 			index = 0;
1002 		}
1003 
1004 		dbp = kmem_cache_alloc(dblk_cache[index], KM_SLEEP);
1005 		mp = dbp->db_mblk;
1006 		DBLK_RTFU_WORD(dbp) = DBLK_RTFU(1, M_DATA, 0, 0);
1007 		mp->b_next = mp->b_prev = mp->b_cont = NULL;
1008 		mp->b_rptr = mp->b_wptr = dbp->db_base;
1009 		mp->b_queue = NULL;
1010 		MBLK_BAND_FLAG_WORD(mp) = 0;
1011 		STR_FTALLOC(&DB_FTHDR(mp), FTEV_ALLOCBW, size);
1012 
1013 		FTRACE_1("allocb_wait (NOSIG): mp=0x%lx", (uintptr_t)mp);
1014 
1015 	} else {
1016 		while ((mp = allocb(size, pri)) == NULL) {
1017 			if ((*error = strwaitbuf(size, BPRI_HI)) != 0)
1018 				return (NULL);
1019 		}
1020 	}
1021 
1022 	return (mp);
1023 }
1024 
1025 /*
1026  * Call function 'func' with 'arg' when a class zero block can
1027  * be allocated with priority 'pri'.
1028  */
1029 bufcall_id_t
1030 esbbcall(uint_t pri, void (*func)(void *), void *arg)
1031 {
1032 	return (bufcall(1, pri, func, arg));
1033 }
1034 
1035 /*
1036  * Allocates an iocblk (M_IOCTL) block.  Properly sets the credentials,
1037  * ioc_id, rval and error fields of the struct iocblk to set up an ioctl call.
1038  * This provides consistency for all internal allocators of ioctl.
1039  */
1040 mblk_t *
1041 mkiocb(uint_t cmd)
1042 {
1043 	struct iocblk	*ioc;
1044 	mblk_t		*mp;
1045 
1046 	/*
1047 	 * Allocate enough space for any of the ioctl related messages.
1048 	 */
1049 	if ((mp = allocb(sizeof (union ioctypes), BPRI_MED)) == NULL)
1050 		return (NULL);
1051 
1052 	bzero(mp->b_rptr, sizeof (union ioctypes));
1053 
1054 	/*
1055 	 * Set the mblk_t information and ptrs correctly.
1056 	 */
1057 	mp->b_wptr += sizeof (struct iocblk);
1058 	mp->b_datap->db_type = M_IOCTL;
1059 
1060 	/*
1061 	 * Fill in the fields.
1062 	 */
1063 	ioc		= (struct iocblk *)mp->b_rptr;
1064 	ioc->ioc_cmd	= cmd;
1065 	ioc->ioc_cr	= kcred;
1066 	ioc->ioc_id	= getiocseqno();
1067 	ioc->ioc_flag	= IOC_NATIVE;
1068 	return (mp);
1069 }
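
/*
 * Editorial sketch: issuing an internal ioctl with mkiocb().  MY_IOC_CMD
 * and the write queue are hypothetical.
 */
#if 0
static void
mkiocb_example(queue_t *wq)
{
	mblk_t *mp;

	if ((mp = mkiocb(MY_IOC_CMD)) != NULL)
		putnext(wq, mp);	/* reply arrives as M_IOCACK/M_IOCNAK */
}
#endif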
1070 
1071 /*
1072  * Test whether a block of the given size can be allocated with a
1073  * request of the given priority.
1074  * 'pri' is no longer used, but is retained for compatibility.
1075  */
1076 /* ARGSUSED */
1077 int
1078 testb(size_t size, uint_t pri)
1079 {
1080 	return ((size + sizeof (dblk_t)) <= kmem_avail());
1081 }
1082 
1083 /*
1084  * Call function 'func' with argument 'arg' when there is a reasonably
1085  * good chance that a block of size 'size' can be allocated.
1086  * 'pri' is no longer used, but is retained for compatibility.
1087  */
1088 /* ARGSUSED */
1089 bufcall_id_t
1090 bufcall(size_t size, uint_t pri, void (*func)(void *), void *arg)
1091 {
1092 	static long bid = 1;	/* always odd to save checking for zero */
1093 	bufcall_id_t bc_id;
1094 	struct strbufcall *bcp;
1095 
1096 	if ((bcp = kmem_alloc(sizeof (strbufcall_t), KM_NOSLEEP)) == NULL)
1097 		return (0);
1098 
1099 	bcp->bc_func = func;
1100 	bcp->bc_arg = arg;
1101 	bcp->bc_size = size;
1102 	bcp->bc_next = NULL;
1103 	bcp->bc_executor = NULL;
1104 
1105 	mutex_enter(&strbcall_lock);
1106 	/*
1107 	 * After bcp is linked into strbcalls and strbcall_lock is dropped there
1108 	 * should be no references to bcp since it may be freed by
1109 	 * runbufcalls().  Since the bc_id field is returned, we save its
1110 	 * value in a local variable.
1111 	 */
1112 	bc_id = bcp->bc_id = (bufcall_id_t)(bid += 2);	/* keep it odd */
1113 
1114 	/*
1115 	 * Add the newly allocated stream event to the existing
1116 	 * linked list of events.
1117 	 */
1118 	if (strbcalls.bc_head == NULL) {
1119 		strbcalls.bc_head = strbcalls.bc_tail = bcp;
1120 	} else {
1121 		strbcalls.bc_tail->bc_next = bcp;
1122 		strbcalls.bc_tail = bcp;
1123 	}
1124 
1125 	cv_signal(&strbcall_cv);
1126 	mutex_exit(&strbcall_lock);
1127 	return (bc_id);
1128 }
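
/*
 * Editorial sketch of the classic bufcall() recovery pattern: when an
 * allocation fails in a service procedure, schedule qenable() so the
 * service procedure runs again once memory should be available.  The
 * function name is hypothetical.
 */
#if 0
static int
bufcall_example(queue_t *q, size_t size)
{
	mblk_t *mp;

	if ((mp = allocb(size, BPRI_MED)) == NULL) {
		/* a qtimeout() fallback is common if bufcall() returns 0 */
		(void) bufcall(size, BPRI_MED, (void (*)(void *))qenable, q);
		return (0);
	}
	/* ... fill in and forward mp ... */
	freemsg(mp);
	return (1);
}
#endif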
1129 
1130 /*
1131  * Cancel a bufcall request.
1132  */
1133 void
1134 unbufcall(bufcall_id_t id)
1135 {
1136 	strbufcall_t *bcp, *pbcp;
1137 
1138 	mutex_enter(&strbcall_lock);
1139 again:
1140 	pbcp = NULL;
1141 	for (bcp = strbcalls.bc_head; bcp; bcp = bcp->bc_next) {
1142 		if (id == bcp->bc_id)
1143 			break;
1144 		pbcp = bcp;
1145 	}
1146 	if (bcp) {
1147 		if (bcp->bc_executor != NULL) {
1148 			if (bcp->bc_executor != curthread) {
1149 				cv_wait(&bcall_cv, &strbcall_lock);
1150 				goto again;
1151 			}
1152 		} else {
1153 			if (pbcp)
1154 				pbcp->bc_next = bcp->bc_next;
1155 			else
1156 				strbcalls.bc_head = bcp->bc_next;
1157 			if (bcp == strbcalls.bc_tail)
1158 				strbcalls.bc_tail = pbcp;
1159 			kmem_free(bcp, sizeof (strbufcall_t));
1160 		}
1161 	}
1162 	mutex_exit(&strbcall_lock);
1163 }
1164 
1165 /*
1166  * Duplicate a message block by block (uses dupb), returning
1167  * a pointer to the duplicate message.
1168  * Returns a non-NULL value only if the entire message
1169  * was dup'd.
1170  */
1171 mblk_t *
1172 dupmsg(mblk_t *bp)
1173 {
1174 	mblk_t *head, *nbp;
1175 
1176 	if (!bp || !(nbp = head = dupb(bp)))
1177 		return (NULL);
1178 
1179 	while (bp->b_cont) {
1180 		if (!(nbp->b_cont = dupb(bp->b_cont))) {
1181 			freemsg(head);
1182 			return (NULL);
1183 		}
1184 		nbp = nbp->b_cont;
1185 		bp = bp->b_cont;
1186 	}
1187 	return (head);
1188 }
1189 
1190 #define	DUPB_NOLOAN(bp) \
1191 	((((bp)->b_datap->db_struioflag & STRUIO_ZC) != 0) ? \
1192 	copyb((bp)) : dupb((bp)))
1193 
1194 mblk_t *
1195 dupmsg_noloan(mblk_t *bp)
1196 {
1197 	mblk_t *head, *nbp;
1198 
1199 	if (bp == NULL || DB_TYPE(bp) != M_DATA ||
1200 	    ((nbp = head = DUPB_NOLOAN(bp)) == NULL))
1201 		return (NULL);
1202 
1203 	while (bp->b_cont) {
1204 		if ((nbp->b_cont = DUPB_NOLOAN(bp->b_cont)) == NULL) {
1205 			freemsg(head);
1206 			return (NULL);
1207 		}
1208 		nbp = nbp->b_cont;
1209 		bp = bp->b_cont;
1210 	}
1211 	return (head);
1212 }
1213 
1214 /*
1215  * Copy data from message and data block to newly allocated message and
1216  * data block. Returns new message block pointer, or NULL if error.
1217  * The alignment of rptr (w.r.t. word alignment) will be the same in the copy
1218  * as in the original even when db_base is not word aligned. (bug 1052877)
1219  */
1220 mblk_t *
1221 copyb(mblk_t *bp)
1222 {
1223 	mblk_t	*nbp;
1224 	dblk_t	*dp, *ndp;
1225 	uchar_t *base;
1226 	size_t	size;
1227 	size_t	unaligned;
1228 
1229 	ASSERT(bp->b_wptr >= bp->b_rptr);
1230 
1231 	dp = bp->b_datap;
1232 	if (dp->db_fthdr != NULL)
1233 		STR_FTEVENT_MBLK(bp, caller(), FTEV_COPYB, 0);
1234 
1235 	/*
1236 	 * Special handling for Multidata message; this should be
1237 	 * removed once a copy-callback routine is made available.
1238 	 */
1239 	if (dp->db_type == M_MULTIDATA) {
1240 		cred_t *cr;
1241 
1242 		if ((nbp = mmd_copy(bp, KM_NOSLEEP)) == NULL)
1243 			return (NULL);
1244 
1245 		nbp->b_flag = bp->b_flag;
1246 		nbp->b_band = bp->b_band;
1247 		ndp = nbp->b_datap;
1248 
1249 		/* See comments below on potential issues. */
1250 		STR_FTEVENT_MBLK(nbp, caller(), FTEV_COPYB, 1);
1251 
1252 		ASSERT(ndp->db_type == dp->db_type);
1253 		cr = dp->db_credp;
1254 		if (cr != NULL)
1255 			crhold(ndp->db_credp = cr);
1256 		ndp->db_cpid = dp->db_cpid;
1257 		return (nbp);
1258 	}
1259 
1260 	size = dp->db_lim - dp->db_base;
1261 	unaligned = P2PHASE((uintptr_t)dp->db_base, sizeof (uint_t));
1262 	if ((nbp = allocb_tmpl(size + unaligned, bp)) == NULL)
1263 		return (NULL);
1264 	nbp->b_flag = bp->b_flag;
1265 	nbp->b_band = bp->b_band;
1266 	ndp = nbp->b_datap;
1267 
1268 	/*
1269 	 * Well, here is a potential issue.  If we are trying to
1270 	 * trace a flow, and we copy the message, we might lose
1271 	 * information about where this message might have been.
1272 	 * So we should inherit the FT data.  On the other hand,
1273 	 * a user might be interested only in alloc to free data.
1274 	 * So I guess the real answer is to provide a tunable.
1275 	 */
1276 	STR_FTEVENT_MBLK(nbp, caller(), FTEV_COPYB, 1);
1277 
1278 	base = ndp->db_base + unaligned;
1279 	bcopy(dp->db_base, ndp->db_base + unaligned, size);
1280 
1281 	nbp->b_rptr = base + (bp->b_rptr - dp->db_base);
1282 	nbp->b_wptr = nbp->b_rptr + MBLKL(bp);
1283 
1284 	return (nbp);
1285 }
1286 
1287 /*
1288  * Copy data from message to newly allocated message using new
1289  * data blocks.  Returns a pointer to the new message, or NULL if error.
1290  */
1291 mblk_t *
1292 copymsg(mblk_t *bp)
1293 {
1294 	mblk_t *head, *nbp;
1295 
1296 	if (!bp || !(nbp = head = copyb(bp)))
1297 		return (NULL);
1298 
1299 	while (bp->b_cont) {
1300 		if (!(nbp->b_cont = copyb(bp->b_cont))) {
1301 			freemsg(head);
1302 			return (NULL);
1303 		}
1304 		nbp = nbp->b_cont;
1305 		bp = bp->b_cont;
1306 	}
1307 	return (head);
1308 }
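
/*
 * Editorial note: dupmsg() shares the underlying dblks, so it is cheap
 * but the data must be treated as read-only while db_ref > 1; copymsg()
 * pays for a full bcopy but yields independently writable blocks.
 */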
1309 
1310 /*
1311  * link a message block to tail of message
1312  */
1313 void
1314 linkb(mblk_t *mp, mblk_t *bp)
1315 {
1316 	ASSERT(mp && bp);
1317 
1318 	for (; mp->b_cont; mp = mp->b_cont)
1319 		;
1320 	mp->b_cont = bp;
1321 }
1322 
1323 /*
1324  * Unlink a message block from the head of a message and
1325  * return a pointer to the new message.
1326  * Returns NULL if the message becomes empty.
1327  */
1328 mblk_t *
1329 unlinkb(mblk_t *bp)
1330 {
1331 	mblk_t *bp1;
1332 
1333 	bp1 = bp->b_cont;
1334 	bp->b_cont = NULL;
1335 	return (bp1);
1336 }
1337 
1338 /*
1339  * remove a message block "bp" from message "mp"
1340  *
1341  * Return pointer to new message or NULL if no message remains.
1342  * Return -1 if bp is not found in message.
1343  */
1344 mblk_t *
1345 rmvb(mblk_t *mp, mblk_t *bp)
1346 {
1347 	mblk_t *tmp;
1348 	mblk_t *lastp = NULL;
1349 
1350 	ASSERT(mp && bp);
1351 	for (tmp = mp; tmp; tmp = tmp->b_cont) {
1352 		if (tmp == bp) {
1353 			if (lastp)
1354 				lastp->b_cont = tmp->b_cont;
1355 			else
1356 				mp = tmp->b_cont;
1357 			tmp->b_cont = NULL;
1358 			return (mp);
1359 		}
1360 		lastp = tmp;
1361 	}
1362 	return ((mblk_t *)-1);
1363 }
1364 
1365 /*
1366  * Concatenate and align first len bytes of common
1367  * message type.  Len == -1 means concatenate everything.
1368  * Returns 1 on success, 0 on failure
1369  * After the pullup, mp points to the pulled up data.
1370  */
1371 int
1372 pullupmsg(mblk_t *mp, ssize_t len)
1373 {
1374 	mblk_t *bp, *b_cont;
1375 	dblk_t *dbp;
1376 	ssize_t n;
1377 
1378 	ASSERT(mp->b_datap->db_ref > 0);
1379 	ASSERT(mp->b_next == NULL && mp->b_prev == NULL);
1380 
1381 	/*
1382 	 * We won't handle Multidata messages, since they contain
1383 	 * metadata which this function has no knowledge of; we
1384 	 * assert on DEBUG kernels, and return failure otherwise.
1385 	 */
1386 	ASSERT(mp->b_datap->db_type != M_MULTIDATA);
1387 	if (mp->b_datap->db_type == M_MULTIDATA)
1388 		return (0);
1389 
1390 	if (len == -1) {
1391 		if (mp->b_cont == NULL && str_aligned(mp->b_rptr))
1392 			return (1);
1393 		len = xmsgsize(mp);
1394 	} else {
1395 		ssize_t first_mblk_len = mp->b_wptr - mp->b_rptr;
1396 		ASSERT(first_mblk_len >= 0);
1397 		/*
1398 		 * If the length is less than that of the first mblk,
1399 		 * we want to pull up the message into an aligned mblk.
1400 		 * Though not part of the spec, some callers assume it.
1401 		 */
1402 		if (len <= first_mblk_len) {
1403 			if (str_aligned(mp->b_rptr))
1404 				return (1);
1405 			len = first_mblk_len;
1406 		} else if (xmsgsize(mp) < len)
1407 			return (0);
1408 	}
1409 
1410 	if ((bp = allocb_tmpl(len, mp)) == NULL)
1411 		return (0);
1412 
1413 	dbp = bp->b_datap;
1414 	*bp = *mp;		/* swap mblks so bp heads the old msg... */
1415 	mp->b_datap = dbp;	/* ... and mp heads the new message */
1416 	mp->b_datap->db_mblk = mp;
1417 	bp->b_datap->db_mblk = bp;
1418 	mp->b_rptr = mp->b_wptr = dbp->db_base;
1419 
1420 	do {
1421 		ASSERT(bp->b_datap->db_ref > 0);
1422 		ASSERT(bp->b_wptr >= bp->b_rptr);
1423 		n = MIN(bp->b_wptr - bp->b_rptr, len);
1424 		bcopy(bp->b_rptr, mp->b_wptr, (size_t)n);
1425 		mp->b_wptr += n;
1426 		bp->b_rptr += n;
1427 		len -= n;
1428 		if (bp->b_rptr != bp->b_wptr)
1429 			break;
1430 		b_cont = bp->b_cont;
1431 		freeb(bp);
1432 		bp = b_cont;
1433 	} while (len && bp);
1434 
1435 	mp->b_cont = bp;	/* tack on whatever wasn't pulled up */
1436 
1437 	return (1);
1438 }
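
/*
 * Editorial sketch: the common "make the header contiguous" use of
 * pullupmsg() from a put procedure.  struct myhdr is hypothetical.
 */
#if 0
static void
pullup_example(queue_t *q, mblk_t *mp)
{
	if (MBLKL(mp) < sizeof (struct myhdr) &&
	    !pullupmsg(mp, sizeof (struct myhdr))) {
		freemsg(mp);
		return;
	}
	/* mp->b_rptr now points at sizeof (struct myhdr) contiguous bytes */
	putnext(q, mp);
}
#endif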
1439 
1440 /*
1441  * Concatenate and align at least the first len bytes of common message
1442  * type.  Len == -1 means concatenate everything.  The original message is
1443  * unaltered.  Returns a pointer to a new message on success, otherwise
1444  * returns NULL.
1445  */
1446 mblk_t *
1447 msgpullup(mblk_t *mp, ssize_t len)
1448 {
1449 	mblk_t	*newmp;
1450 	ssize_t	totlen;
1451 	ssize_t	n;
1452 
1453 	/*
1454 	 * We won't handle Multidata messages, since they contain
1455 	 * metadata which this function has no knowledge of; we
1456 	 * assert on DEBUG kernels, and return failure otherwise.
1457 	 */
1458 	ASSERT(mp->b_datap->db_type != M_MULTIDATA);
1459 	if (mp->b_datap->db_type == M_MULTIDATA)
1460 		return (NULL);
1461 
1462 	totlen = xmsgsize(mp);
1463 
1464 	if ((len > 0) && (len > totlen))
1465 		return (NULL);
1466 
1467 	/*
1468 	 * Copy all of the first msg type into one new mblk, then dupmsg
1469 	 * and link the rest onto this.
1470 	 */
1471 
1472 	len = totlen;
1473 
1474 	if ((newmp = allocb_tmpl(len, mp)) == NULL)
1475 		return (NULL);
1476 
1477 	newmp->b_flag = mp->b_flag;
1478 	newmp->b_band = mp->b_band;
1479 
1480 	while (len > 0) {
1481 		n = mp->b_wptr - mp->b_rptr;
1482 		ASSERT(n >= 0);		/* allow zero-length mblk_t's */
1483 		if (n > 0)
1484 			bcopy(mp->b_rptr, newmp->b_wptr, n);
1485 		newmp->b_wptr += n;
1486 		len -= n;
1487 		mp = mp->b_cont;
1488 	}
1489 
1490 	if (mp != NULL) {
1491 		newmp->b_cont = dupmsg(mp);
1492 		if (newmp->b_cont == NULL) {
1493 			freemsg(newmp);
1494 			return (NULL);
1495 		}
1496 	}
1497 
1498 	return (newmp);
1499 }
1500 
1501 /*
1502  * Trim bytes from message
1503  *  len > 0, trim from head
1504  *  len < 0, trim from tail
1505  * Returns 1 on success, 0 on failure.
1506  */
1507 int
1508 adjmsg(mblk_t *mp, ssize_t len)
1509 {
1510 	mblk_t *bp;
1511 	mblk_t *save_bp = NULL;
1512 	mblk_t *prev_bp;
1513 	mblk_t *bcont;
1514 	unsigned char type;
1515 	ssize_t n;
1516 	int fromhead;
1517 	int first;
1518 
1519 	ASSERT(mp != NULL);
1520 	/*
1521 	 * We won't handle Multidata messages, since they contain
1522 	 * metadata which this function has no knowledge of; we
1523 	 * assert on DEBUG kernels, and return failure otherwise.
1524 	 */
1525 	ASSERT(mp->b_datap->db_type != M_MULTIDATA);
1526 	if (mp->b_datap->db_type == M_MULTIDATA)
1527 		return (0);
1528 
1529 	if (len < 0) {
1530 		fromhead = 0;
1531 		len = -len;
1532 	} else {
1533 		fromhead = 1;
1534 	}
1535 
1536 	if (xmsgsize(mp) < len)
1537 		return (0);
1538 
1539 
1540 	if (fromhead) {
1541 		first = 1;
1542 		while (len) {
1543 			ASSERT(mp->b_wptr >= mp->b_rptr);
1544 			n = MIN(mp->b_wptr - mp->b_rptr, len);
1545 			mp->b_rptr += n;
1546 			len -= n;
1547 
1548 			/*
1549 			 * If this mblk is now empty and it is not
1550 			 * the first in the chain, remove it.
1551 			 */
1552 			if (!first && (mp->b_wptr == mp->b_rptr)) {
1553 				bcont = mp->b_cont;
1554 				freeb(mp);
1555 				mp = save_bp->b_cont = bcont;
1556 			} else {
1557 				save_bp = mp;
1558 				mp = mp->b_cont;
1559 			}
1560 			first = 0;
1561 		}
1562 	} else {
1563 		type = mp->b_datap->db_type;
1564 		while (len) {
1565 			bp = mp;
1566 			save_bp = NULL;
1567 
1568 			/*
1569 			 * Find the last message of same type
1570 			 */
1571 
1572 			while (bp && bp->b_datap->db_type == type) {
1573 				ASSERT(bp->b_wptr >= bp->b_rptr);
1574 				prev_bp = save_bp;
1575 				save_bp = bp;
1576 				bp = bp->b_cont;
1577 			}
1578 			if (save_bp == NULL)
1579 				break;
1580 			n = MIN(save_bp->b_wptr - save_bp->b_rptr, len);
1581 			save_bp->b_wptr -= n;
1582 			len -= n;
1583 
1584 			/*
1585 			 * If this is not the first message
1586 			 * and we have taken away everything
1587 			 * from this message, remove it
1588 			 */
1589 
1590 			if ((save_bp != mp) &&
1591 				(save_bp->b_wptr == save_bp->b_rptr)) {
1592 				bcont = save_bp->b_cont;
1593 				freeb(save_bp);
1594 				prev_bp->b_cont = bcont;
1595 			}
1596 		}
1597 	}
1598 	return (1);
1599 }
1600 
1601 /*
1602  * get number of data bytes in message
1603  */
1604 size_t
1605 msgdsize(mblk_t *bp)
1606 {
1607 	size_t count = 0;
1608 
1609 	for (; bp; bp = bp->b_cont)
1610 		if (bp->b_datap->db_type == M_DATA) {
1611 			ASSERT(bp->b_wptr >= bp->b_rptr);
1612 			count += bp->b_wptr - bp->b_rptr;
1613 		}
1614 	return (count);
1615 }
1616 
1617 /*
1618  * Get a message off head of queue
1619  *
1620  * If queue has no buffers then mark queue
1621  * with QWANTR. (queue wants to be read by
1622  * someone when data becomes available)
1623  *
1624  * If there is something to take off then do so.
1625  * If queue falls below hi water mark turn off QFULL
1626  * flag.  Decrement weighted count of queue.
1627  * Also turn off QWANTR because queue is being read.
1628  *
1629  * The queue count is maintained on a per-band basis.
1630  * Priority band 0 (normal messages) uses q_count,
1631  * q_lowat, etc.  Non-zero priority bands use the
1632  * fields in their respective qband structures
1633  * (qb_count, qb_lowat, etc.)  All messages appear
1634  * on the same list, linked via their b_next pointers.
1635  * q_first is the head of the list.  q_count does
1636  * not reflect the size of all the messages on the
1637  * queue.  It only reflects those messages in the
1638  * normal band of flow.  The one exception to this
1639  * deals with high priority messages.  They are in
1640  * their own conceptual "band", but are accounted
1641  * against q_count.
1642  *
1643  * If queue count is below the lo water mark and QWANTW
1644  * is set, enable the closest backq which has a service
1645  * procedure and turn off the QWANTW flag.
1646  *
1647  * getq could be built on top of rmvq, but isn't because
1648  * of performance considerations.
1649  *
1650  * A note on the use of q_count and q_mblkcnt:
1651  *   q_count is the traditional byte count for messages that
1652  *   have been put on a queue.  Documentation tells us that
1653  *   we shouldn't rely on that count, but some drivers/modules
1654  *   do.  What was needed, however, is a mechanism to prevent
1655  *   runaway streams from consuming all of the resources,
1656  *   and particularly be able to flow control zero-length
1657  *   messages.  q_mblkcnt is used for this purpose.  It
1658  *   counts the number of mblk's that are being put on
1659  *   the queue.  The intention here, is that each mblk should
1660  *   contain one byte of data and, for the purpose of
1661  *   flow-control, logically does.  A queue will become
1662  *   full when EITHER of these values (q_count and q_mblkcnt)
1663  *   reach the highwater mark.  It will clear when BOTH
1664  *   of them drop below the highwater mark.  And it will
1665  *   backenable when BOTH of them drop below the lowwater
1666  *   mark.
1667  *   With this algorithm, a driver/module might be able
1668  *   to find a reasonably accurate q_count, and the
1669  *   framework can still try and limit resource usage.
1670  */
1671 mblk_t *
1672 getq(queue_t *q)
1673 {
1674 	mblk_t *bp;
1675 	uchar_t band = 0;
1676 
1677 	bp = getq_noenab(q);
1678 	if (bp != NULL)
1679 		band = bp->b_band;
1680 
1681 	/*
1682 	 * Inlined from qbackenable().
1683 	 * Quick check without holding the lock.
1684 	 */
1685 	if (band == 0 && (q->q_flag & (QWANTW|QWANTWSYNC)) == 0)
1686 		return (bp);
1687 
1688 	qbackenable(q, band);
1689 	return (bp);
1690 }
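
/*
 * Editorial example of the dual-count rule above: with q_hiwat == 100,
 * a single 100-byte message (q_count == 100) or one hundred zero-length
 * mblks (q_mblkcnt == 100) each make the queue full; it clears only when
 * both counts drop back below 100.
 */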
1691 
1692 /*
1693  * Calculate number of data bytes in a single data message block taking
1694  * multidata messages into account.
1695  */
1696 
1697 #define	ADD_MBLK_SIZE(mp, size) 					\
1698 	if (DB_TYPE(mp) != M_MULTIDATA) {				\
1699 		(size) += MBLKL(mp);					\
1700 	} else {							\
1701 		uint_t	pinuse;						\
1702 									\
1703 		mmd_getsize(mmd_getmultidata(mp), NULL, &pinuse);	\
1704 		(size) += pinuse;					\
1705 	}
1706 
1707 /*
1708  * Like getq() but does not backenable.  This is used by the stream
1709  * head when a putback() is likely.  The caller must call qbackenable()
1710  * after it is done accessing the queue.
1711  */
1712 mblk_t *
1713 getq_noenab(queue_t *q)
1714 {
1715 	mblk_t *bp;
1716 	mblk_t *tmp;
1717 	qband_t *qbp;
1718 	kthread_id_t freezer;
1719 	int	bytecnt = 0, mblkcnt = 0;
1720 
1721 	/* freezestr should allow its caller to call getq/putq */
1722 	freezer = STREAM(q)->sd_freezer;
1723 	if (freezer == curthread) {
1724 		ASSERT(frozenstr(q));
1725 		ASSERT(MUTEX_HELD(QLOCK(q)));
1726 	} else
1727 		mutex_enter(QLOCK(q));
1728 
1729 	if ((bp = q->q_first) == 0) {
1730 		q->q_flag |= QWANTR;
1731 	} else {
1732 		if ((q->q_first = bp->b_next) == NULL)
1733 			q->q_last = NULL;
1734 		else
1735 			q->q_first->b_prev = NULL;
1736 
1737 		/* Get message byte count for q_count accounting */
1738 		for (tmp = bp; tmp; tmp = tmp->b_cont) {
1739 			ADD_MBLK_SIZE(tmp, bytecnt);
1740 			mblkcnt++;
1741 		}
1742 
1743 		if (bp->b_band == 0) {
1744 			q->q_count -= bytecnt;
1745 			q->q_mblkcnt -= mblkcnt;
1746 			if ((q->q_count < q->q_hiwat) &&
1747 			    (q->q_mblkcnt < q->q_hiwat)) {
1748 				q->q_flag &= ~QFULL;
1749 			}
1750 		} else {
1751 			int i;
1752 
1753 			ASSERT(bp->b_band <= q->q_nband);
1754 			ASSERT(q->q_bandp != NULL);
1755 			ASSERT(MUTEX_HELD(QLOCK(q)));
1756 			qbp = q->q_bandp;
1757 			i = bp->b_band;
1758 			while (--i > 0)
1759 				qbp = qbp->qb_next;
1760 			if (qbp->qb_first == qbp->qb_last) {
1761 				qbp->qb_first = NULL;
1762 				qbp->qb_last = NULL;
1763 			} else {
1764 				qbp->qb_first = bp->b_next;
1765 			}
1766 			qbp->qb_count -= bytecnt;
1767 			qbp->qb_mblkcnt -= mblkcnt;
1768 			if ((qbp->qb_count < qbp->qb_hiwat) &&
1769 			    (qbp->qb_mblkcnt < qbp->qb_hiwat)) {
1770 				qbp->qb_flag &= ~QB_FULL;
1771 			}
1772 		}
1773 		q->q_flag &= ~QWANTR;
1774 		bp->b_next = NULL;
1775 		bp->b_prev = NULL;
1776 	}
1777 	if (freezer != curthread)
1778 		mutex_exit(QLOCK(q));
1779 
1780 	STR_FTEVENT_MSG(bp, q, FTEV_GETQ, NULL);
1781 
1782 	return (bp);
1783 }
1784 
1785 /*
1786  * Determine if a backenable is needed after removing a message in the
1787  * specified band.
1788  * NOTE: This routine assumes that something like getq_noenab() has
1789  * already been called.
1790  *
1791  * For the read side it is ok to hold sd_lock across calling this (and the
1792  * stream head often does).
1793  * But for the write side strwakeq might be invoked and it acquires sd_lock.
1794  */
1795 void
1796 qbackenable(queue_t *q, uchar_t band)
1797 {
1798 	int backenab = 0;
1799 	qband_t *qbp;
1800 	kthread_id_t freezer;
1801 
1802 	ASSERT(q);
1803 	ASSERT((q->q_flag & QREADR) || MUTEX_NOT_HELD(&STREAM(q)->sd_lock));
1804 
1805 	/*
1806 	 * Quick check without holding the lock.
1807 	 * This is OK since, after getq() has lowered the q_count, these flags
1808 	 * would not change unless either the qbackenable() is done by
1809 	 * another thread (which is ok) or the queue has gotten QFULL,
1810 	 * in which case another backenable will take place when the queue
1811 	 * drops below q_lowat.
1812 	 */
1813 	if (band == 0 && (q->q_flag & (QWANTW|QWANTWSYNC)) == 0)
1814 		return;
1815 
1816 	/* freezestr should allow its caller to call getq/putq */
1817 	freezer = STREAM(q)->sd_freezer;
1818 	if (freezer == curthread) {
1819 		ASSERT(frozenstr(q));
1820 		ASSERT(MUTEX_HELD(QLOCK(q)));
1821 	} else
1822 		mutex_enter(QLOCK(q));
1823 
1824 	if (band == 0) {
1825 		if (q->q_lowat == 0 || (q->q_count < q->q_lowat &&
1826 		    q->q_mblkcnt < q->q_lowat)) {
1827 			backenab = q->q_flag & (QWANTW|QWANTWSYNC);
1828 		}
1829 	} else {
1830 		int i;
1831 
1832 		ASSERT((unsigned)band <= q->q_nband);
1833 		ASSERT(q->q_bandp != NULL);
1834 
1835 		qbp = q->q_bandp;
1836 		i = band;
1837 		while (--i > 0)
1838 			qbp = qbp->qb_next;
1839 
1840 		if (qbp->qb_lowat == 0 || (qbp->qb_count < qbp->qb_lowat &&
1841 		    qbp->qb_mblkcnt < qbp->qb_lowat)) {
1842 			backenab = qbp->qb_flag & QB_WANTW;
1843 		}
1844 	}
1845 
1846 	if (backenab == 0) {
1847 		if (freezer != curthread)
1848 			mutex_exit(QLOCK(q));
1849 		return;
1850 	}
1851 
1852 	/* Have to drop the lock across strwakeq and backenable */
1853 	if (backenab & QWANTWSYNC)
1854 		q->q_flag &= ~QWANTWSYNC;
1855 	if (backenab & (QWANTW|QB_WANTW)) {
1856 		if (band != 0)
1857 			qbp->qb_flag &= ~QB_WANTW;
1858 		else {
1859 			q->q_flag &= ~QWANTW;
1860 		}
1861 	}
1862 
1863 	if (freezer != curthread)
1864 		mutex_exit(QLOCK(q));
1865 
1866 	if (backenab & QWANTWSYNC)
1867 		strwakeq(q, QWANTWSYNC);
1868 	if (backenab & (QWANTW|QB_WANTW))
1869 		backenable(q, band);
1870 }
1871 
1872 /*
1873  * Remove a message from a queue.  The queue count and other
1874  * flow control parameters are adjusted and the back queue
1875  * enabled if necessary.
1876  *
1877  * rmvq can be called with the stream frozen, but other utility functions
1878  * holding QLOCK, and by streams modules without any locks/frozen.
1879  */
1880 void
1881 rmvq(queue_t *q, mblk_t *mp)
1882 {
1883 	ASSERT(mp != NULL);
1884 
1885 	rmvq_noenab(q, mp);
1886 	if (curthread != STREAM(q)->sd_freezer && MUTEX_HELD(QLOCK(q))) {
1887 		/*
1888 		 * qbackenable can handle a frozen stream but not a "random"
1889 		 * qlock being held. Drop lock across qbackenable.
1890 		 */
1891 		mutex_exit(QLOCK(q));
1892 		qbackenable(q, mp->b_band);
1893 		mutex_enter(QLOCK(q));
1894 	} else {
1895 		qbackenable(q, mp->b_band);
1896 	}
1897 }
1898 
1899 /*
1900  * Like rmvq() but without any backenabling.
1901  * This exists to handle SR_CONSOL_DATA in strrput().
1902  */
1903 void
1904 rmvq_noenab(queue_t *q, mblk_t *mp)
1905 {
1906 	mblk_t *tmp;
1907 	int i;
1908 	qband_t *qbp = NULL;
1909 	kthread_id_t freezer;
1910 	int	bytecnt = 0, mblkcnt = 0;
1911 
1912 	freezer = STREAM(q)->sd_freezer;
1913 	if (freezer == curthread) {
1914 		ASSERT(frozenstr(q));
1915 		ASSERT(MUTEX_HELD(QLOCK(q)));
1916 	} else if (MUTEX_HELD(QLOCK(q))) {
1917 		/* Don't drop lock on exit */
1918 		freezer = curthread;
1919 	} else
1920 		mutex_enter(QLOCK(q));
1921 
1922 	ASSERT(mp->b_band <= q->q_nband);
1923 	if (mp->b_band != 0) {		/* Adjust band pointers */
1924 		ASSERT(q->q_bandp != NULL);
1925 		qbp = q->q_bandp;
1926 		i = mp->b_band;
1927 		while (--i > 0)
1928 			qbp = qbp->qb_next;
1929 		if (mp == qbp->qb_first) {
1930 			if (mp->b_next && mp->b_band == mp->b_next->b_band)
1931 				qbp->qb_first = mp->b_next;
1932 			else
1933 				qbp->qb_first = NULL;
1934 		}
1935 		if (mp == qbp->qb_last) {
1936 			if (mp->b_prev && mp->b_band == mp->b_prev->b_band)
1937 				qbp->qb_last = mp->b_prev;
1938 			else
1939 				qbp->qb_last = NULL;
1940 		}
1941 	}
1942 
1943 	/*
1944 	 * Remove the message from the list.
1945 	 */
1946 	if (mp->b_prev)
1947 		mp->b_prev->b_next = mp->b_next;
1948 	else
1949 		q->q_first = mp->b_next;
1950 	if (mp->b_next)
1951 		mp->b_next->b_prev = mp->b_prev;
1952 	else
1953 		q->q_last = mp->b_prev;
1954 	mp->b_next = NULL;
1955 	mp->b_prev = NULL;
1956 
1957 	/* Get the size of the message for q_count accounting */
1958 	for (tmp = mp; tmp; tmp = tmp->b_cont) {
1959 		ADD_MBLK_SIZE(tmp, bytecnt);
1960 		mblkcnt++;
1961 	}
1962 
1963 	if (mp->b_band == 0) {		/* Perform q_count accounting */
1964 		q->q_count -= bytecnt;
1965 		q->q_mblkcnt -= mblkcnt;
1966 		if ((q->q_count < q->q_hiwat) &&
1967 		    (q->q_mblkcnt < q->q_hiwat)) {
1968 			q->q_flag &= ~QFULL;
1969 		}
1970 	} else {			/* Perform qb_count accounting */
1971 		qbp->qb_count -= bytecnt;
1972 		qbp->qb_mblkcnt -= mblkcnt;
1973 		if ((qbp->qb_count < qbp->qb_hiwat) &&
1974 		    (qbp->qb_mblkcnt < qbp->qb_hiwat)) {
1975 			qbp->qb_flag &= ~QB_FULL;
1976 		}
1977 	}
1978 	if (freezer != curthread)
1979 		mutex_exit(QLOCK(q));
1980 
1981 	STR_FTEVENT_MSG(mp, q, FTEV_RMVQ, NULL);
1982 }
1983 
1984 /*
1985  * Empty a queue.
1986  * If flag is set, remove all messages.  Otherwise, remove
1987  * only non-control messages.  If queue falls below its low
1988  * water mark, and QWANTW is set, enable the nearest upstream
1989  * service procedure.
1990  *
1991  * Historical note: when merging the M_FLUSH code in strrput with this
1992  * code, one difference was discovered: flushq did not have a check
1993  * for q_lowat == 0 in the backenabling test.
1994  *
1995  * pcproto_flag specifies whether an M_PCPROTO message should be preserved
1996  * on the queue (requeued rather than flushed) if one exists.
1997  */
1998 void
1999 flushq_common(queue_t *q, int flag, int pcproto_flag)
2000 {
2001 	mblk_t *mp, *nmp;
2002 	qband_t *qbp;
2003 	int backenab = 0;
2004 	unsigned char bpri;
2005 	unsigned char	qbf[NBAND];	/* band flushing backenable flags */
2006 
2007 	if (q->q_first == NULL)
2008 		return;
2009 
2010 	mutex_enter(QLOCK(q));
2011 	mp = q->q_first;
2012 	q->q_first = NULL;
2013 	q->q_last = NULL;
2014 	q->q_count = 0;
2015 	q->q_mblkcnt = 0;
2016 	for (qbp = q->q_bandp; qbp; qbp = qbp->qb_next) {
2017 		qbp->qb_first = NULL;
2018 		qbp->qb_last = NULL;
2019 		qbp->qb_count = 0;
2020 		qbp->qb_mblkcnt = 0;
2021 		qbp->qb_flag &= ~QB_FULL;
2022 	}
2023 	q->q_flag &= ~QFULL;
2024 	mutex_exit(QLOCK(q));
2025 	while (mp) {
2026 		nmp = mp->b_next;
2027 		mp->b_next = mp->b_prev = NULL;
2028 
2029 		STR_FTEVENT_MBLK(mp, q, FTEV_FLUSHQ, NULL);
2030 
2031 		if (pcproto_flag && (mp->b_datap->db_type == M_PCPROTO))
2032 			(void) putq(q, mp);
2033 		else if (flag || datamsg(mp->b_datap->db_type))
2034 			freemsg(mp);
2035 		else
2036 			(void) putq(q, mp);
2037 		mp = nmp;
2038 	}
2039 	bpri = 1;
2040 	mutex_enter(QLOCK(q));
2041 	for (qbp = q->q_bandp; qbp; qbp = qbp->qb_next) {
2042 		if ((qbp->qb_flag & QB_WANTW) &&
2043 		    (((qbp->qb_count < qbp->qb_lowat) &&
2044 		    (qbp->qb_mblkcnt < qbp->qb_lowat)) ||
2045 		    qbp->qb_lowat == 0)) {
2046 			qbp->qb_flag &= ~QB_WANTW;
2047 			backenab = 1;
2048 			qbf[bpri] = 1;
2049 		} else
2050 			qbf[bpri] = 0;
2051 		bpri++;
2052 	}
2053 	ASSERT(bpri == (unsigned char)(q->q_nband + 1));
2054 	if ((q->q_flag & QWANTW) &&
2055 	    (((q->q_count < q->q_lowat) &&
2056 	    (q->q_mblkcnt < q->q_lowat)) || q->q_lowat == 0)) {
2057 		q->q_flag &= ~QWANTW;
2058 		backenab = 1;
2059 		qbf[0] = 1;
2060 	} else
2061 		qbf[0] = 0;
2062 
2063 	/*
2064 	 * If any band can now be written to, and there is a writer
2065 	 * for that band, then backenable the closest service procedure.
2066 	 */
2067 	if (backenab) {
2068 		mutex_exit(QLOCK(q));
2069 		for (bpri = q->q_nband; bpri != 0; bpri--)
2070 			if (qbf[bpri])
2071 				backenable(q, bpri);
2072 		if (qbf[0])
2073 			backenable(q, 0);
2074 	} else
2075 		mutex_exit(QLOCK(q));
2076 }
2077 
2078 /*
2079  * The real flushing takes place in flushq_common. This split exists so
2080  * that a flag can specify whether or not M_PCPROTO messages should be
2081  * flushed. Currently the only place that uses this flag is the stream head.
2082  */
2083 void
2084 flushq(queue_t *q, int flag)
2085 {
2086 	flushq_common(q, flag, 0);
2087 }
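
/*
 * Usage sketch (illustrative, not part of this file): the canonical
 * M_FLUSH handling in a driver's write-side put procedure, built on
 * flushq() and the qreply() routine defined later in this file.
 *
 *	case M_FLUSH:
 *		if (*mp->b_rptr & FLUSHW)
 *			flushq(q, FLUSHDATA);
 *		if (*mp->b_rptr & FLUSHR) {
 *			flushq(RD(q), FLUSHDATA);
 *			*mp->b_rptr &= ~FLUSHW;
 *			qreply(q, mp);
 *		} else {
 *			freemsg(mp);
 *		}
 *		break;
 *
 * A driver turns the flush around with qreply() because it sits at the
 * end of the stream; a module would flush its own queues and pass the
 * message along with putnext(q, mp).
 */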
2088 
2089 /*
2090  * Flush the queue of messages of the given priority band.
2091  * There is some duplication of code between flushq and flushband.
2092  * This is because we want to optimize the code as much as possible.
2093  * The assumption is that there will be more messages in the normal
2094  * (priority 0) band than in any other.
2095  *
2096  * Historical note: when merging the M_FLUSH code in strrput with this
2097  * code, one difference was discovered: flushband had an extra check
2098  * for (mp->b_datap->db_type < QPCTL) in the band 0
2099  * case. That check did not match the man page for flushband and was not
2100  * in the strrput flush code, hence it was removed.
2101  */
2102 void
2103 flushband(queue_t *q, unsigned char pri, int flag)
2104 {
2105 	mblk_t *mp;
2106 	mblk_t *nmp;
2107 	mblk_t *last;
2108 	qband_t *qbp;
2109 	int band;
2110 
2111 	ASSERT((flag == FLUSHDATA) || (flag == FLUSHALL));
2112 	if (pri > q->q_nband) {
2113 		return;
2114 	}
2115 	mutex_enter(QLOCK(q));
2116 	if (pri == 0) {
2117 		mp = q->q_first;
2118 		q->q_first = NULL;
2119 		q->q_last = NULL;
2120 		q->q_count = 0;
2121 		q->q_mblkcnt = 0;
2122 		for (qbp = q->q_bandp; qbp; qbp = qbp->qb_next) {
2123 			qbp->qb_first = NULL;
2124 			qbp->qb_last = NULL;
2125 			qbp->qb_count = 0;
2126 			qbp->qb_mblkcnt = 0;
2127 			qbp->qb_flag &= ~QB_FULL;
2128 		}
2129 		q->q_flag &= ~QFULL;
2130 		mutex_exit(QLOCK(q));
2131 		while (mp) {
2132 			nmp = mp->b_next;
2133 			mp->b_next = mp->b_prev = NULL;
2134 			if ((mp->b_band == 0) &&
2135 			    ((flag == FLUSHALL) ||
2136 			    datamsg(mp->b_datap->db_type)))
2137 				freemsg(mp);
2138 			else
2139 				(void) putq(q, mp);
2140 			mp = nmp;
2141 		}
2142 		mutex_enter(QLOCK(q));
2143 		if ((q->q_flag & QWANTW) &&
2144 		    (((q->q_count < q->q_lowat) &&
2145 		    (q->q_mblkcnt < q->q_lowat)) || q->q_lowat == 0)) {
2146 			q->q_flag &= ~QWANTW;
2147 			mutex_exit(QLOCK(q));
2148 
2149 			backenable(q, pri);
2150 		} else
2151 			mutex_exit(QLOCK(q));
2152 	} else {	/* pri != 0 */
2153 		boolean_t flushed = B_FALSE;
2154 		band = pri;
2155 
2156 		ASSERT(MUTEX_HELD(QLOCK(q)));
2157 		qbp = q->q_bandp;
2158 		while (--band > 0)
2159 			qbp = qbp->qb_next;
2160 		mp = qbp->qb_first;
2161 		if (mp == NULL) {
2162 			mutex_exit(QLOCK(q));
2163 			return;
2164 		}
2165 		last = qbp->qb_last->b_next;
2166 		/*
2167 		 * rmvq_noenab() and freemsg() are called for each mblk that
2168 		 * meets the criteria.  The loop is executed until the last
2169 		 * mblk has been processed.
2170 		 */
2171 		while (mp != last) {
2172 			ASSERT(mp->b_band == pri);
2173 			nmp = mp->b_next;
2174 			if (flag == FLUSHALL || datamsg(mp->b_datap->db_type)) {
2175 				rmvq_noenab(q, mp);
2176 				freemsg(mp);
2177 				flushed = B_TRUE;
2178 			}
2179 			mp = nmp;
2180 		}
2181 		mutex_exit(QLOCK(q));
2182 
2183 		/*
2184 		 * If any mblk(s) has been freed, we know that qbackenable()
2185 		 * will need to be called.
2186 		 */
2187 		if (flushed)
2188 			qbackenable(q, pri);
2189 	}
2190 }
2191 
2192 /*
2193  * Return 1 if the queue is not full.  If the queue is full, return
2194  * 0 (may not put message) and set QWANTW flag (caller wants to write
2195  * to the queue).
2196  */
2197 int
2198 canput(queue_t *q)
2199 {
2200 	TRACE_1(TR_FAC_STREAMS_FR, TR_CANPUT_IN, "canput:%p", q);
2201 
2202 	/* this is for loopback transports; they should not do a canput */
2203 	ASSERT(STRMATED(q->q_stream) || STREAM(q) == STREAM(q->q_nfsrv));
2204 
2205 	/* Find next forward module that has a service procedure */
2206 	q = q->q_nfsrv;
2207 
2208 	if (!(q->q_flag & QFULL)) {
2209 		TRACE_2(TR_FAC_STREAMS_FR, TR_CANPUT_OUT, "canput:%p %d", q, 1);
2210 		return (1);
2211 	}
2212 	mutex_enter(QLOCK(q));
2213 	if (q->q_flag & QFULL) {
2214 		q->q_flag |= QWANTW;
2215 		mutex_exit(QLOCK(q));
2216 		TRACE_2(TR_FAC_STREAMS_FR, TR_CANPUT_OUT, "canput:%p %d", q, 0);
2217 		return (0);
2218 	}
2219 	mutex_exit(QLOCK(q));
2220 	TRACE_2(TR_FAC_STREAMS_FR, TR_CANPUT_OUT, "canput:%p %d", q, 1);
2221 	return (1);
2222 }
2223 
2224 /*
2225  * This is the new canput for use with priority bands.  Return 1 if the
2226  * band is not full.  If the band is full, return 0 (may not put message)
2227  * and set the QWANTW (QB_WANTW) flag for the zero (non-zero) band
2228  * (the caller wants to write to the queue).
2229  */
2230 int
2231 bcanput(queue_t *q, unsigned char pri)
2232 {
2233 	qband_t *qbp;
2234 
2235 	TRACE_2(TR_FAC_STREAMS_FR, TR_BCANPUT_IN, "bcanput:%p %X", q, pri);
2236 	if (!q)
2237 		return (0);
2238 
2239 	/* Find next forward module that has a service procedure */
2240 	q = q->q_nfsrv;
2241 
2242 	mutex_enter(QLOCK(q));
2243 	if (pri == 0) {
2244 		if (q->q_flag & QFULL) {
2245 			q->q_flag |= QWANTW;
2246 			mutex_exit(QLOCK(q));
2247 			TRACE_3(TR_FAC_STREAMS_FR, TR_BCANPUT_OUT,
2248 			    "bcanput:%p %X %d", q, pri, 0);
2249 			return (0);
2250 		}
2251 	} else {	/* pri != 0 */
2252 		if (pri > q->q_nband) {
2253 			/*
2254 			 * No band exists yet, so return success.
2255 			 */
2256 			mutex_exit(QLOCK(q));
2257 			TRACE_3(TR_FAC_STREAMS_FR, TR_BCANPUT_OUT,
2258 			    "bcanput:%p %X %d", q, pri, 1);
2259 			return (1);
2260 		}
2261 		qbp = q->q_bandp;
2262 		while (--pri)
2263 			qbp = qbp->qb_next;
2264 		if (qbp->qb_flag & QB_FULL) {
2265 			qbp->qb_flag |= QB_WANTW;
2266 			mutex_exit(QLOCK(q));
2267 			TRACE_3(TR_FAC_STREAMS_FR, TR_BCANPUT_OUT,
2268 			    "bcanput:%p %X %d", q, pri, 0);
2269 			return (0);
2270 		}
2271 	}
2272 	mutex_exit(QLOCK(q));
2273 	TRACE_3(TR_FAC_STREAMS_FR, TR_BCANPUT_OUT,
2274 	    "bcanput:%p %X %d", q, pri, 1);
2275 	return (1);
2276 }
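
/*
 * Usage sketch (illustrative only): a write-side put procedure honoring
 * per-band flow control by checking the next queue before forwarding.
 * bcanputnext(q, band) is the q_next-safe equivalent of calling bcanput
 * on the downstream queue.
 *
 *	if (mp->b_datap->db_type >= QPCTL || bcanputnext(q, mp->b_band))
 *		putnext(q, mp);
 *	else
 *		(void) putq(q, mp);	(let the service procedure retry)
 */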
2277 
2278 /*
2279  * Put a message on a queue.
2280  *
2281  * Messages are enqueued on a priority basis.  The priority classes
2282  * are HIGH PRIORITY (type >= QPCTL), PRIORITY (type < QPCTL && band > 0),
2283  * and B_NORMAL (type < QPCTL && band == 0).
2284  *
2285  * Add appropriate weighted data block sizes to queue count.
2286  * If queue hits high water mark then set QFULL flag.
2287  *
2288  * If QNOENAB is not set (putq is allowed to enable the queue),
2289  * enable the queue only if the message is PRIORITY,
2290  * or the QWANTR flag is set (indicating that the service procedure
2291  * is ready to read the queue.  This implies that a service
2292  * is ready to read the queue).  This implies that a service
2293  * queue, as this would result in an infinite loop (!).
2294  */
2295 int
2296 putq(queue_t *q, mblk_t *bp)
2297 {
2298 	mblk_t *tmp;
2299 	qband_t *qbp = NULL;
2300 	int mcls = (int)queclass(bp);
2301 	kthread_id_t freezer;
2302 	int	bytecnt = 0, mblkcnt = 0;
2303 
2304 	freezer = STREAM(q)->sd_freezer;
2305 	if (freezer == curthread) {
2306 		ASSERT(frozenstr(q));
2307 		ASSERT(MUTEX_HELD(QLOCK(q)));
2308 	} else
2309 		mutex_enter(QLOCK(q));
2310 
2311 	/*
2312 	 * Perform sanity checks and, if the qband structure is not
2313 	 * yet allocated, allocate it.
2314 	 */
2315 	if (mcls == QPCTL) {
2316 		if (bp->b_band != 0)
2317 			bp->b_band = 0;		/* force to be correct */
2318 	} else if (bp->b_band != 0) {
2319 		int i;
2320 		qband_t **qbpp;
2321 
2322 		if (bp->b_band > q->q_nband) {
2323 
2324 			/*
2325 			 * The qband structure for this priority band is
2326 			 * not on the queue yet, so we have to allocate
2327 			 * one on the fly.  It would be wasteful to
2328 			 * associate the qband structures with every
2329 			 * queue when the queues are allocated.  This is
2330 			 * because most queues will only need the normal
2331 			 * band of flow which can be described entirely
2332 			 * by the queue itself.
2333 			 */
2334 			qbpp = &q->q_bandp;
2335 			while (*qbpp)
2336 				qbpp = &(*qbpp)->qb_next;
2337 			while (bp->b_band > q->q_nband) {
2338 				if ((*qbpp = allocband()) == NULL) {
2339 					if (freezer != curthread)
2340 						mutex_exit(QLOCK(q));
2341 					return (0);
2342 				}
2343 				(*qbpp)->qb_hiwat = q->q_hiwat;
2344 				(*qbpp)->qb_lowat = q->q_lowat;
2345 				q->q_nband++;
2346 				qbpp = &(*qbpp)->qb_next;
2347 			}
2348 		}
2349 		ASSERT(MUTEX_HELD(QLOCK(q)));
2350 		qbp = q->q_bandp;
2351 		i = bp->b_band;
2352 		while (--i)
2353 			qbp = qbp->qb_next;
2354 	}
2355 
2356 	/*
2357 	 * If queue is empty, add the message and initialize the pointers.
2358 	 * Otherwise, adjust message pointers and queue pointers based on
2359 	 * the type of the message and where it belongs on the queue.  Some
2360 	 * code is duplicated to minimize the number of conditionals and
2361 	 * hopefully minimize the amount of time this routine takes.
2362 	 */
2363 	if (!q->q_first) {
2364 		bp->b_next = NULL;
2365 		bp->b_prev = NULL;
2366 		q->q_first = bp;
2367 		q->q_last = bp;
2368 		if (qbp) {
2369 			qbp->qb_first = bp;
2370 			qbp->qb_last = bp;
2371 		}
2372 	} else if (!qbp) {	/* bp->b_band == 0 */
2373 
2374 		/*
2375 		 * If queue class of message is less than or equal to
2376 		 * that of the last one on the queue, tack on to the end.
2377 		 */
2378 		tmp = q->q_last;
2379 		if (mcls <= (int)queclass(tmp)) {
2380 			bp->b_next = NULL;
2381 			bp->b_prev = tmp;
2382 			tmp->b_next = bp;
2383 			q->q_last = bp;
2384 		} else {
2385 			tmp = q->q_first;
2386 			while ((int)queclass(tmp) >= mcls)
2387 				tmp = tmp->b_next;
2388 
2389 			/*
2390 			 * Insert bp before tmp.
2391 			 */
2392 			bp->b_next = tmp;
2393 			bp->b_prev = tmp->b_prev;
2394 			if (tmp->b_prev)
2395 				tmp->b_prev->b_next = bp;
2396 			else
2397 				q->q_first = bp;
2398 			tmp->b_prev = bp;
2399 		}
2400 	} else {		/* bp->b_band != 0 */
2401 		if (qbp->qb_first) {
2402 			tmp = qbp->qb_last;
2403 
2404 			/*
2405 			 * Insert bp after the last message in this band.
2406 			 */
2407 			bp->b_next = tmp->b_next;
2408 			if (tmp->b_next)
2409 				tmp->b_next->b_prev = bp;
2410 			else
2411 				q->q_last = bp;
2412 			bp->b_prev = tmp;
2413 			tmp->b_next = bp;
2414 		} else {
2415 			tmp = q->q_last;
2416 			if ((mcls < (int)queclass(tmp)) ||
2417 			    (bp->b_band <= tmp->b_band)) {
2418 
2419 				/*
2420 				 * Tack bp on end of queue.
2421 				 */
2422 				bp->b_next = NULL;
2423 				bp->b_prev = tmp;
2424 				tmp->b_next = bp;
2425 				q->q_last = bp;
2426 			} else {
2427 				tmp = q->q_first;
2428 				while (tmp->b_datap->db_type >= QPCTL)
2429 					tmp = tmp->b_next;
2430 				while (tmp->b_band >= bp->b_band)
2431 					tmp = tmp->b_next;
2432 
2433 				/*
2434 				 * Insert bp before tmp.
2435 				 */
2436 				bp->b_next = tmp;
2437 				bp->b_prev = tmp->b_prev;
2438 				if (tmp->b_prev)
2439 					tmp->b_prev->b_next = bp;
2440 				else
2441 					q->q_first = bp;
2442 				tmp->b_prev = bp;
2443 			}
2444 			qbp->qb_first = bp;
2445 		}
2446 		qbp->qb_last = bp;
2447 	}
2448 
2449 	/* Get message byte count for q_count accounting */
2450 	for (tmp = bp; tmp; tmp = tmp->b_cont) {
2451 		ADD_MBLK_SIZE(tmp, bytecnt);
2452 		mblkcnt++;
2453 	}
2454 
2455 	if (qbp) {
2456 		qbp->qb_count += bytecnt;
2457 		qbp->qb_mblkcnt += mblkcnt;
2458 		if ((qbp->qb_count >= qbp->qb_hiwat) ||
2459 		    (qbp->qb_mblkcnt >= qbp->qb_hiwat)) {
2460 			qbp->qb_flag |= QB_FULL;
2461 		}
2462 	} else {
2463 		q->q_count += bytecnt;
2464 		q->q_mblkcnt += mblkcnt;
2465 		if ((q->q_count >= q->q_hiwat) ||
2466 		    (q->q_mblkcnt >= q->q_hiwat)) {
2467 			q->q_flag |= QFULL;
2468 		}
2469 	}
2470 
2471 	STR_FTEVENT_MSG(bp, q, FTEV_PUTQ, NULL);
2472 
2473 	if ((mcls > QNORM) ||
2474 	    (canenable(q) && ((q->q_flag & QWANTR) || bp->b_band)))
2475 		qenable_locked(q);
2476 	ASSERT(MUTEX_HELD(QLOCK(q)));
2477 	if (freezer != curthread)
2478 		mutex_exit(QLOCK(q));
2479 
2480 	return (1);
2481 }
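
/*
 * Usage sketch (illustrative only; the module name "xx" is made up):
 * putq() returns 0 when a needed qband structure cannot be allocated,
 * so callers should check the result.  A minimal write-side put
 * procedure that queues data for later service might read:
 *
 *	static int
 *	xxwput(queue_t *q, mblk_t *mp)
 *	{
 *		if (DB_TYPE(mp) == M_DATA) {
 *			if (!putq(q, mp))
 *				freemsg(mp);
 *		} else {
 *			putnext(q, mp);
 *		}
 *		return (0);
 *	}
 */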
2482 
2483 /*
2484  * Put stuff back at beginning of Q according to priority order.
2485  * See comment on putq above for details.
2486  */
2487 int
2488 putbq(queue_t *q, mblk_t *bp)
2489 {
2490 	mblk_t *tmp;
2491 	qband_t *qbp = NULL;
2492 	int mcls = (int)queclass(bp);
2493 	kthread_id_t freezer;
2494 	int	bytecnt = 0, mblkcnt = 0;
2495 
2496 	ASSERT(q && bp);
2497 	ASSERT(bp->b_next == NULL);
2498 	freezer = STREAM(q)->sd_freezer;
2499 	if (freezer == curthread) {
2500 		ASSERT(frozenstr(q));
2501 		ASSERT(MUTEX_HELD(QLOCK(q)));
2502 	} else
2503 		mutex_enter(QLOCK(q));
2504 
2505 	/*
2506 	 * Perform sanity checks and, if the qband structure is not
2507 	 * yet allocated, allocate it.
2508 	 */
2509 	if (mcls == QPCTL) {
2510 		if (bp->b_band != 0)
2511 			bp->b_band = 0;		/* force to be correct */
2512 	} else if (bp->b_band != 0) {
2513 		int i;
2514 		qband_t **qbpp;
2515 
2516 		if (bp->b_band > q->q_nband) {
2517 			qbpp = &q->q_bandp;
2518 			while (*qbpp)
2519 				qbpp = &(*qbpp)->qb_next;
2520 			while (bp->b_band > q->q_nband) {
2521 				if ((*qbpp = allocband()) == NULL) {
2522 					if (freezer != curthread)
2523 						mutex_exit(QLOCK(q));
2524 					return (0);
2525 				}
2526 				(*qbpp)->qb_hiwat = q->q_hiwat;
2527 				(*qbpp)->qb_lowat = q->q_lowat;
2528 				q->q_nband++;
2529 				qbpp = &(*qbpp)->qb_next;
2530 			}
2531 		}
2532 		qbp = q->q_bandp;
2533 		i = bp->b_band;
2534 		while (--i)
2535 			qbp = qbp->qb_next;
2536 	}
2537 
2538 	/*
2539 	 * If queue is empty or if message is high priority,
2540 	 * place on the front of the queue.
2541 	 */
2542 	tmp = q->q_first;
2543 	if ((!tmp) || (mcls == QPCTL)) {
2544 		bp->b_next = tmp;
2545 		if (tmp)
2546 			tmp->b_prev = bp;
2547 		else
2548 			q->q_last = bp;
2549 		q->q_first = bp;
2550 		bp->b_prev = NULL;
2551 		if (qbp) {
2552 			qbp->qb_first = bp;
2553 			qbp->qb_last = bp;
2554 		}
2555 	} else if (qbp) {	/* bp->b_band != 0 */
2556 		tmp = qbp->qb_first;
2557 		if (tmp) {
2558 
2559 			/*
2560 			 * Insert bp before the first message in this band.
2561 			 */
2562 			bp->b_next = tmp;
2563 			bp->b_prev = tmp->b_prev;
2564 			if (tmp->b_prev)
2565 				tmp->b_prev->b_next = bp;
2566 			else
2567 				q->q_first = bp;
2568 			tmp->b_prev = bp;
2569 		} else {
2570 			tmp = q->q_last;
2571 			if ((mcls < (int)queclass(tmp)) ||
2572 			    (bp->b_band < tmp->b_band)) {
2573 
2574 				/*
2575 				 * Tack bp on end of queue.
2576 				 */
2577 				bp->b_next = NULL;
2578 				bp->b_prev = tmp;
2579 				tmp->b_next = bp;
2580 				q->q_last = bp;
2581 			} else {
2582 				tmp = q->q_first;
2583 				while (tmp->b_datap->db_type >= QPCTL)
2584 					tmp = tmp->b_next;
2585 				while (tmp->b_band > bp->b_band)
2586 					tmp = tmp->b_next;
2587 
2588 				/*
2589 				 * Insert bp before tmp.
2590 				 */
2591 				bp->b_next = tmp;
2592 				bp->b_prev = tmp->b_prev;
2593 				if (tmp->b_prev)
2594 					tmp->b_prev->b_next = bp;
2595 				else
2596 					q->q_first = bp;
2597 				tmp->b_prev = bp;
2598 			}
2599 			qbp->qb_last = bp;
2600 		}
2601 		qbp->qb_first = bp;
2602 	} else {		/* bp->b_band == 0 && !QPCTL */
2603 
2604 		/*
2605 		 * If the queue class or band is less than that of the last
2606 		 * message on the queue, tack bp on the end of the queue.
2607 		 */
2608 		tmp = q->q_last;
2609 		if ((mcls < (int)queclass(tmp)) || (bp->b_band < tmp->b_band)) {
2610 			bp->b_next = NULL;
2611 			bp->b_prev = tmp;
2612 			tmp->b_next = bp;
2613 			q->q_last = bp;
2614 		} else {
2615 			tmp = q->q_first;
2616 			while (tmp->b_datap->db_type >= QPCTL)
2617 				tmp = tmp->b_next;
2618 			while (tmp->b_band > bp->b_band)
2619 				tmp = tmp->b_next;
2620 
2621 			/*
2622 			 * Insert bp before tmp.
2623 			 */
2624 			bp->b_next = tmp;
2625 			bp->b_prev = tmp->b_prev;
2626 			if (tmp->b_prev)
2627 				tmp->b_prev->b_next = bp;
2628 			else
2629 				q->q_first = bp;
2630 			tmp->b_prev = bp;
2631 		}
2632 	}
2633 
2634 	/* Get message byte count for q_count accounting */
2635 	for (tmp = bp; tmp; tmp = tmp->b_cont) {
2636 		ADD_MBLK_SIZE(tmp, bytecnt);
2637 		mblkcnt++;
2638 	}
2639 	if (qbp) {
2640 		qbp->qb_count += bytecnt;
2641 		qbp->qb_mblkcnt += mblkcnt;
2642 		if ((qbp->qb_count >= qbp->qb_hiwat) ||
2643 		    (qbp->qb_mblkcnt >= qbp->qb_hiwat)) {
2644 			qbp->qb_flag |= QB_FULL;
2645 		}
2646 	} else {
2647 		q->q_count += bytecnt;
2648 		q->q_mblkcnt += mblkcnt;
2649 		if ((q->q_count >= q->q_hiwat) ||
2650 		    (q->q_mblkcnt >= q->q_hiwat)) {
2651 			q->q_flag |= QFULL;
2652 		}
2653 	}
2654 
2655 	STR_FTEVENT_MSG(bp, q, FTEV_PUTBQ, NULL);
2656 
2657 	if ((mcls > QNORM) || (canenable(q) && (q->q_flag & QWANTR)))
2658 		qenable_locked(q);
2659 	ASSERT(MUTEX_HELD(QLOCK(q)));
2660 	if (freezer != curthread)
2661 		mutex_exit(QLOCK(q));
2662 
2663 	return (1);
2664 }
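
/*
 * Usage sketch (illustrative only; "xx" is a made-up module name): the
 * classic service procedure pattern built on getq()/putbq().  When the
 * next queue is flow controlled, the message goes back on the head of
 * this queue and the loop exits; back-enabling reschedules the service
 * procedure once the downstream queue drains below its low-water mark.
 *
 *	static int
 *	xxwsrv(queue_t *q)
 *	{
 *		mblk_t *mp;
 *
 *		while ((mp = getq(q)) != NULL) {
 *			if (mp->b_datap->db_type >= QPCTL || canputnext(q)) {
 *				putnext(q, mp);
 *			} else {
 *				(void) putbq(q, mp);
 *				break;
 *			}
 *		}
 *		return (0);
 *	}
 */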
2665 
2666 /*
2667  * Insert a message before an existing message on the queue.  If the
2668  * existing message is NULL, the new message is placed on the end of
2669  * the queue.  The queue class of the new message is ignored.  However,
2670  * the priority band of the new message must adhere to the following
2671  * ordering:
2672  *
2673  *	emp->b_prev->b_band >= mp->b_band >= emp->b_band.
2674  *
2675  * All flow control parameters are updated.
2676  *
2677  * insq can be called with the stream frozen, by other utility functions
2678  * holding QLOCK, and by streams modules with no locks held or stream frozen.
2679  */
2680 int
2681 insq(queue_t *q, mblk_t *emp, mblk_t *mp)
2682 {
2683 	mblk_t *tmp;
2684 	qband_t *qbp = NULL;
2685 	int mcls = (int)queclass(mp);
2686 	kthread_id_t freezer;
2687 	int	bytecnt = 0, mblkcnt = 0;
2688 
2689 	freezer = STREAM(q)->sd_freezer;
2690 	if (freezer == curthread) {
2691 		ASSERT(frozenstr(q));
2692 		ASSERT(MUTEX_HELD(QLOCK(q)));
2693 	} else if (MUTEX_HELD(QLOCK(q))) {
2694 		/* Don't drop lock on exit */
2695 		freezer = curthread;
2696 	} else
2697 		mutex_enter(QLOCK(q));
2698 
2699 	if (mcls == QPCTL) {
2700 		if (mp->b_band != 0)
2701 			mp->b_band = 0;		/* force to be correct */
2702 		if (emp && emp->b_prev &&
2703 		    (emp->b_prev->b_datap->db_type < QPCTL))
2704 			goto badord;
2705 	}
2706 	if (emp) {
2707 		if (((mcls == QNORM) && (mp->b_band < emp->b_band)) ||
2708 		    (emp->b_prev && (emp->b_prev->b_datap->db_type < QPCTL) &&
2709 		    (emp->b_prev->b_band < mp->b_band))) {
2710 			goto badord;
2711 		}
2712 	} else {
2713 		tmp = q->q_last;
2714 		if (tmp && (mcls == QNORM) && (mp->b_band > tmp->b_band)) {
2715 badord:
2716 			cmn_err(CE_WARN,
2717 			    "insq: attempt to insert message out of order "
2718 			    "on q %p", (void *)q);
2719 			if (freezer != curthread)
2720 				mutex_exit(QLOCK(q));
2721 			return (0);
2722 		}
2723 	}
2724 
2725 	if (mp->b_band != 0) {
2726 		int i;
2727 		qband_t **qbpp;
2728 
2729 		if (mp->b_band > q->q_nband) {
2730 			qbpp = &q->q_bandp;
2731 			while (*qbpp)
2732 				qbpp = &(*qbpp)->qb_next;
2733 			while (mp->b_band > q->q_nband) {
2734 				if ((*qbpp = allocband()) == NULL) {
2735 					if (freezer != curthread)
2736 						mutex_exit(QLOCK(q));
2737 					return (0);
2738 				}
2739 				(*qbpp)->qb_hiwat = q->q_hiwat;
2740 				(*qbpp)->qb_lowat = q->q_lowat;
2741 				q->q_nband++;
2742 				qbpp = &(*qbpp)->qb_next;
2743 			}
2744 		}
2745 		qbp = q->q_bandp;
2746 		i = mp->b_band;
2747 		while (--i)
2748 			qbp = qbp->qb_next;
2749 	}
2750 
2751 	if ((mp->b_next = emp) != NULL) {
2752 		if ((mp->b_prev = emp->b_prev) != NULL)
2753 			emp->b_prev->b_next = mp;
2754 		else
2755 			q->q_first = mp;
2756 		emp->b_prev = mp;
2757 	} else {
2758 		if ((mp->b_prev = q->q_last) != NULL)
2759 			q->q_last->b_next = mp;
2760 		else
2761 			q->q_first = mp;
2762 		q->q_last = mp;
2763 	}
2764 
2765 	/* Get mblk and byte count for q_count accounting */
2766 	for (tmp = mp; tmp; tmp = tmp->b_cont) {
2767 		ADD_MBLK_SIZE(tmp, bytecnt);
2768 		mblkcnt++;
2769 	}
2770 
2771 	if (qbp) {	/* adjust qband pointers and count */
2772 		if (!qbp->qb_first) {
2773 			qbp->qb_first = mp;
2774 			qbp->qb_last = mp;
2775 		} else {
2776 			if (mp->b_prev == NULL || (mp->b_prev != NULL &&
2777 			    (mp->b_prev->b_band != mp->b_band)))
2778 				qbp->qb_first = mp;
2779 			else if (mp->b_next == NULL || (mp->b_next != NULL &&
2780 			    (mp->b_next->b_band != mp->b_band)))
2781 				qbp->qb_last = mp;
2782 		}
2783 		qbp->qb_count += bytecnt;
2784 		qbp->qb_mblkcnt += mblkcnt;
2785 		if ((qbp->qb_count >= qbp->qb_hiwat) ||
2786 		    (qbp->qb_mblkcnt >= qbp->qb_hiwat)) {
2787 			qbp->qb_flag |= QB_FULL;
2788 		}
2789 	} else {
2790 		q->q_count += bytecnt;
2791 		q->q_mblkcnt += mblkcnt;
2792 		if ((q->q_count >= q->q_hiwat) ||
2793 		    (q->q_mblkcnt >= q->q_hiwat)) {
2794 			q->q_flag |= QFULL;
2795 		}
2796 	}
2797 
2798 	STR_FTEVENT_MSG(mp, q, FTEV_INSQ, NULL);
2799 
2800 	if (canenable(q) && (q->q_flag & QWANTR))
2801 		qenable_locked(q);
2802 
2803 	ASSERT(MUTEX_HELD(QLOCK(q)));
2804 	if (freezer != curthread)
2805 		mutex_exit(QLOCK(q));
2806 
2807 	return (1);
2808 }
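
/*
 * Usage sketch (illustrative only): inserting a message ahead of the
 * first ordinary (non-priority) message from outside the perimeter.
 * One safe way to satisfy the locking rules above is to freeze the
 * stream around the traversal and the insq() call.  insq() returns 0
 * on a bad ordering or a qband allocation failure.
 *
 *	mblk_t *emp;
 *
 *	freezestr(q);
 *	for (emp = q->q_first; emp != NULL; emp = emp->b_next)
 *		if (emp->b_datap->db_type < QPCTL)
 *			break;
 *	if (!insq(q, emp, mp))
 *		freemsg(mp);
 *	unfreezestr(q);
 */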
2809 
2810 /*
2811  * Create and put a control message on queue.
2812  */
2813 int
2814 putctl(queue_t *q, int type)
2815 {
2816 	mblk_t *bp;
2817 
2818 	if ((datamsg(type) && (type != M_DELAY)) ||
2819 	    (bp = allocb_tryhard(0)) == NULL)
2820 		return (0);
2821 	bp->b_datap->db_type = (unsigned char) type;
2822 	bp->b_datap->db_type = (unsigned char)type;
2823 	put(q, bp);
2824 
2825 	return (1);
2826 }
2827 
2828 /*
2829  * Control message with a single-byte parameter
2830  */
2831 int
2832 putctl1(queue_t *q, int type, int param)
2833 {
2834 	mblk_t *bp;
2835 
2836 	if ((datamsg(type) && (type != M_DELAY)) ||
2837 	    (bp = allocb_tryhard(1)) == NULL)
2838 		return (0);
2839 	bp->b_datap->db_type = (unsigned char)type;
2840 	*bp->b_wptr++ = (unsigned char)param;
2841 
2842 	put(q, bp);
2843 
2844 	return (1);
2845 }
2846 
2847 int
2848 putnextctl1(queue_t *q, int type, int param)
2849 {
2850 	mblk_t *bp;
2851 
2852 	if ((datamsg(type) && (type != M_DELAY)) ||
2853 	    ((bp = allocb_tryhard(1)) == NULL))
2854 		return (0);
2855 
2856 	bp->b_datap->db_type = (unsigned char)type;
2857 	*bp->b_wptr++ = (unsigned char)param;
2858 
2859 	putnext(q, bp);
2860 
2861 	return (1);
2862 }
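
/*
 * Usage sketch (illustrative only): the putctl/putctl1 family returns 0
 * when no message can be allocated (or the type is a data type), so
 * callers normally check the result.  For example, sending an M_FLUSH
 * with FLUSHR set to the next queue:
 *
 *	if (!putnextctl1(q, M_FLUSH, FLUSHR))
 *		return (ENOSR);
 *
 * The recovery policy is the caller's; a module might instead retry
 * later from a qbufcall() callback.
 */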
2863 
2864 int
2865 putnextctl(queue_t *q, int type)
2866 {
2867 	mblk_t *bp;
2868 
2869 	if ((datamsg(type) && (type != M_DELAY)) ||
2870 	    ((bp = allocb_tryhard(0)) == NULL))
2871 		return (0);
2872 	bp->b_datap->db_type = (unsigned char)type;
2873 
2874 	putnext(q, bp);
2875 
2876 	return (1);
2877 }
2878 
2879 /*
2880  * Return the queue upstream from this one
2881  */
2882 queue_t *
2883 backq(queue_t *q)
2884 {
2885 	q = _OTHERQ(q);
2886 	if (q->q_next) {
2887 		q = q->q_next;
2888 		return (_OTHERQ(q));
2889 	}
2890 	return (NULL);
2891 }
2892 
2893 /*
2894  * Send a message on the stream in the reverse direction from this
2895  * queue (e.g. to respond to ioctls).
2896  */
2897 void
2898 qreply(queue_t *q, mblk_t *bp)
2899 {
2900 	ASSERT(q && bp);
2901 
2902 	putnext(_OTHERQ(q), bp);
2903 }
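
/*
 * Usage sketch (illustrative only): nak'ing an unrecognized ioctl from
 * a driver's write-side put procedure by reusing the M_IOCTL message
 * and sending it back up with qreply().
 *
 *	case M_IOCTL: {
 *		struct iocblk *iocp = (struct iocblk *)mp->b_rptr;
 *
 *		mp->b_datap->db_type = M_IOCNAK;
 *		iocp->ioc_error = EINVAL;
 *		qreply(q, mp);
 *		break;
 *	}
 */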
2904 
2905 /*
2906  * Streams Queue Scheduling
2907  *
2908  * Queues are enabled through qenable() when they have messages to
2909  * process.  They are serviced by queuerun(), which runs each enabled
2910  * queue's service procedure.  The call to queuerun() is processor
2911  * dependent - the general principle is that it be run whenever a queue
2912  * is enabled but before returning to user level.  For system calls,
2913  * the function runqueues() is called if their action causes a queue
2914  * to be enabled.  For device interrupts, queuerun() should be
2915  * called before returning from the last level of interrupt.  Beyond
2916  * this, no timing assumptions should be made about queue scheduling.
2917  */
2918 
2919 /*
2920  * Enable a queue: put it on list of those whose service procedures are
2921  * Enable a queue: put it on the list of those whose service procedures are
2922  * ready to run and set up the scheduling mechanism.
2923  * The broadcast is done outside the mutex to avoid the woken thread
2924  * contending with the mutex. This is OK because the queue has been
2925  * enqueued on the runlist and flagged safely at this point.
2926 void
2927 qenable(queue_t *q)
2928 {
2929 	mutex_enter(QLOCK(q));
2930 	qenable_locked(q);
2931 	mutex_exit(QLOCK(q));
2932 }
2933 /*
2934  * Return number of messages on queue
2935  */
2936 int
2937 qsize(queue_t *qp)
2938 {
2939 	int count = 0;
2940 	mblk_t *mp;
2941 
2942 	mutex_enter(QLOCK(qp));
2943 	for (mp = qp->q_first; mp; mp = mp->b_next)
2944 		count++;
2945 	mutex_exit(QLOCK(qp));
2946 	return (count);
2947 }
2948 
2949 /*
2950  * noenable - set queue so that putq() will not enable it.
2951  * enableok - set queue so that putq() can enable it.
2952  */
2953 void
2954 noenable(queue_t *q)
2955 {
2956 	mutex_enter(QLOCK(q));
2957 	q->q_flag |= QNOENB;
2958 	mutex_exit(QLOCK(q));
2959 }
2960 
2961 void
2962 enableok(queue_t *q)
2963 {
2964 	mutex_enter(QLOCK(q));
2965 	q->q_flag &= ~QNOENB;
2966 	mutex_exit(QLOCK(q));
2967 }
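
/*
 * Usage sketch (illustrative only): a module that wants to batch work
 * can mark its queue with noenable() so that putq() only enqueues, and
 * later permit scheduling again and enable the queue explicitly:
 *
 *	noenable(q);
 *	... accumulate messages with putq() ...
 *	enableok(q);
 *	qenable(q);
 */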
2968 
2969 /*
2970  * Set queue fields.
2971  */
2972 int
2973 strqset(queue_t *q, qfields_t what, unsigned char pri, intptr_t val)
2974 {
2975 	qband_t *qbp = NULL;
2976 	queue_t	*wrq;
2977 	int error = 0;
2978 	kthread_id_t freezer;
2979 
2980 	freezer = STREAM(q)->sd_freezer;
2981 	if (freezer == curthread) {
2982 		ASSERT(frozenstr(q));
2983 		ASSERT(MUTEX_HELD(QLOCK(q)));
2984 	} else
2985 		mutex_enter(QLOCK(q));
2986 
2987 	if (what >= QBAD) {
2988 		error = EINVAL;
2989 		goto done;
2990 	}
2991 	if (pri != 0) {
2992 		int i;
2993 		qband_t **qbpp;
2994 
2995 		if (pri > q->q_nband) {
2996 			qbpp = &q->q_bandp;
2997 			while (*qbpp)
2998 				qbpp = &(*qbpp)->qb_next;
2999 			while (pri > q->q_nband) {
3000 				if ((*qbpp = allocband()) == NULL) {
3001 					error = EAGAIN;
3002 					goto done;
3003 				}
3004 				(*qbpp)->qb_hiwat = q->q_hiwat;
3005 				(*qbpp)->qb_lowat = q->q_lowat;
3006 				q->q_nband++;
3007 				qbpp = &(*qbpp)->qb_next;
3008 			}
3009 		}
3010 		qbp = q->q_bandp;
3011 		i = pri;
3012 		while (--i)
3013 			qbp = qbp->qb_next;
3014 	}
3015 	switch (what) {
3016 
3017 	case QHIWAT:
3018 		if (qbp)
3019 			qbp->qb_hiwat = (size_t)val;
3020 		else
3021 			q->q_hiwat = (size_t)val;
3022 		break;
3023 
3024 	case QLOWAT:
3025 		if (qbp)
3026 			qbp->qb_lowat = (size_t)val;
3027 		else
3028 			q->q_lowat = (size_t)val;
3029 		break;
3030 
3031 	case QMAXPSZ:
3032 		if (qbp)
3033 			error = EINVAL;
3034 		else
3035 			q->q_maxpsz = (ssize_t)val;
3036 
3037 		/*
3038 		 * Performance concern: strwrite looks at the module below
3039 		 * the stream head for the maxpsz each time it does a write,
3040 		 * so we now cache it at the stream head.  Check to see if this
3041 		 * queue is sitting directly below the stream head.
3042 		 */
3043 		wrq = STREAM(q)->sd_wrq;
3044 		if (q != wrq->q_next)
3045 			break;
3046 
3047 		/*
3048 		 * If the stream is not frozen drop the current QLOCK and
3049 		 * acquire the sd_wrq QLOCK which protects sd_qn_*
3050 		 */
3051 		if (freezer != curthread) {
3052 			mutex_exit(QLOCK(q));
3053 			mutex_enter(QLOCK(wrq));
3054 		}
3055 		ASSERT(MUTEX_HELD(QLOCK(wrq)));
3056 
3057 		if (strmsgsz != 0) {
3058 			if (val == INFPSZ)
3059 				val = strmsgsz;
3060 			else  {
3061 				if (STREAM(q)->sd_vnode->v_type == VFIFO)
3062 					val = MIN(PIPE_BUF, val);
3063 				else
3064 					val = MIN(strmsgsz, val);
3065 			}
3066 		}
3067 		STREAM(q)->sd_qn_maxpsz = val;
3068 		if (freezer != curthread) {
3069 			mutex_exit(QLOCK(wrq));
3070 			mutex_enter(QLOCK(q));
3071 		}
3072 		break;
3073 
3074 	case QMINPSZ:
3075 		if (qbp)
3076 			error = EINVAL;
3077 		else
3078 			q->q_minpsz = (ssize_t)val;
3079 
3080 		/*
3081 		 * Performance concern: strwrite looks at the module below
3082 		 * the stream head for the minpsz each time it does a write,
3083 		 * so we now cache it at the stream head.  Check to see if this
3084 		 * queue is sitting directly below the stream head.
3085 		 */
3086 		wrq = STREAM(q)->sd_wrq;
3087 		if (q != wrq->q_next)
3088 			break;
3089 
3090 		/*
3091 		 * If the stream is not frozen drop the current QLOCK and
3092 		 * acquire the sd_wrq QLOCK which protects sd_qn_*
3093 		 */
3094 		if (freezer != curthread) {
3095 			mutex_exit(QLOCK(q));
3096 			mutex_enter(QLOCK(wrq));
3097 		}
3098 		STREAM(q)->sd_qn_minpsz = (ssize_t)val;
3099 
3100 		if (freezer != curthread) {
3101 			mutex_exit(QLOCK(wrq));
3102 			mutex_enter(QLOCK(q));
3103 		}
3104 		break;
3105 
3106 	case QSTRUIOT:
3107 		if (qbp)
3108 			error = EINVAL;
3109 		else
3110 			q->q_struiot = (ushort_t)val;
3111 		break;
3112 
3113 	case QCOUNT:
3114 	case QFIRST:
3115 	case QLAST:
3116 	case QFLAG:
3117 		error = EPERM;
3118 		break;
3119 
3120 	default:
3121 		error = EINVAL;
3122 		break;
3123 	}
3124 done:
3125 	if (freezer != curthread)
3126 		mutex_exit(QLOCK(q));
3127 	return (error);
3128 }
3129 
3130 /*
3131  * Get queue fields.
3132  */
3133 int
3134 strqget(queue_t *q, qfields_t what, unsigned char pri, void *valp)
3135 {
3136 	qband_t 	*qbp = NULL;
3137 	int 		error = 0;
3138 	kthread_id_t 	freezer;
3139 
3140 	freezer = STREAM(q)->sd_freezer;
3141 	if (freezer == curthread) {
3142 		ASSERT(frozenstr(q));
3143 		ASSERT(MUTEX_HELD(QLOCK(q)));
3144 	} else
3145 		mutex_enter(QLOCK(q));
3146 	if (what >= QBAD) {
3147 		error = EINVAL;
3148 		goto done;
3149 	}
3150 	if (pri != 0) {
3151 		int i;
3152 		qband_t **qbpp;
3153 
3154 		if (pri > q->q_nband) {
3155 			qbpp = &q->q_bandp;
3156 			while (*qbpp)
3157 				qbpp = &(*qbpp)->qb_next;
3158 			while (pri > q->q_nband) {
3159 				if ((*qbpp = allocband()) == NULL) {
3160 					error = EAGAIN;
3161 					goto done;
3162 				}
3163 				(*qbpp)->qb_hiwat = q->q_hiwat;
3164 				(*qbpp)->qb_lowat = q->q_lowat;
3165 				q->q_nband++;
3166 				qbpp = &(*qbpp)->qb_next;
3167 			}
3168 		}
3169 		qbp = q->q_bandp;
3170 		i = pri;
3171 		while (--i)
3172 			qbp = qbp->qb_next;
3173 	}
3174 	switch (what) {
3175 	case QHIWAT:
3176 		if (qbp)
3177 			*(size_t *)valp = qbp->qb_hiwat;
3178 		else
3179 			*(size_t *)valp = q->q_hiwat;
3180 		break;
3181 
3182 	case QLOWAT:
3183 		if (qbp)
3184 			*(size_t *)valp = qbp->qb_lowat;
3185 		else
3186 			*(size_t *)valp = q->q_lowat;
3187 		break;
3188 
3189 	case QMAXPSZ:
3190 		if (qbp)
3191 			error = EINVAL;
3192 		else
3193 			*(ssize_t *)valp = q->q_maxpsz;
3194 		break;
3195 
3196 	case QMINPSZ:
3197 		if (qbp)
3198 			error = EINVAL;
3199 		else
3200 			*(ssize_t *)valp = q->q_minpsz;
3201 		break;
3202 
3203 	case QCOUNT:
3204 		if (qbp)
3205 			*(size_t *)valp = qbp->qb_count;
3206 		else
3207 			*(size_t *)valp = q->q_count;
3208 		break;
3209 
3210 	case QFIRST:
3211 		if (qbp)
3212 			*(mblk_t **)valp = qbp->qb_first;
3213 		else
3214 			*(mblk_t **)valp = q->q_first;
3215 		break;
3216 
3217 	case QLAST:
3218 		if (qbp)
3219 			*(mblk_t **)valp = qbp->qb_last;
3220 		else
3221 			*(mblk_t **)valp = q->q_last;
3222 		break;
3223 
3224 	case QFLAG:
3225 		if (qbp)
3226 			*(uint_t *)valp = qbp->qb_flag;
3227 		else
3228 			*(uint_t *)valp = q->q_flag;
3229 		break;
3230 
3231 	case QSTRUIOT:
3232 		if (qbp)
3233 			error = EINVAL;
3234 		else
3235 			*(short *)valp = q->q_struiot;
3236 		break;
3237 
3238 	default:
3239 		error = EINVAL;
3240 		break;
3241 	}
3242 done:
3243 	if (freezer != curthread)
3244 		mutex_exit(QLOCK(q));
3245 	return (error);
3246 }
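
/*
 * Usage sketch (illustrative only): reading a queue field with
 * strqget() and writing it back with strqset(), here doubling the
 * high-water mark of the normal band (band 0):
 *
 *	size_t hiwat;
 *
 *	if (strqget(q, QHIWAT, 0, &hiwat) == 0)
 *		(void) strqset(q, QHIWAT, 0, (intptr_t)(hiwat * 2));
 *
 * Both routines handle the frozen-stream case themselves, as above.
 */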
3247 
3248 /*
3249  * Wake up everyone blocked in cv_wait/sigwait/pollwait, for one of:
3250  *	QWANTWSYNC, QWANTR, or QWANTW.
3251  *
3252  * Note: for QWANTWSYNC/QWANTW and QWANTR, if there is no WSLEEPer or
3253  *	 RSLEEPer then a deferred wakeup will be done. Also, if strpoll()
3254  *	 is in progress then a deferred pollwakeup will be done.
3255  */
3256 void
3257 strwakeq(queue_t *q, int flag)
3258 {
3259 	stdata_t 	*stp = STREAM(q);
3260 	pollhead_t 	*pl;
3261 
3262 	mutex_enter(&stp->sd_lock);
3263 	pl = &stp->sd_pollist;
3264 	if (flag & QWANTWSYNC) {
3265 		ASSERT(!(q->q_flag & QREADR));
3266 		if (stp->sd_flag & WSLEEP) {
3267 			stp->sd_flag &= ~WSLEEP;
3268 			cv_broadcast(&stp->sd_wrq->q_wait);
3269 		} else {
3270 			stp->sd_wakeq |= WSLEEP;
3271 		}
3272 
3273 		mutex_exit(&stp->sd_lock);
3274 		pollwakeup(pl, POLLWRNORM);
3275 		mutex_enter(&stp->sd_lock);
3276 
3277 		if (stp->sd_sigflags & S_WRNORM)
3278 			strsendsig(stp->sd_siglist, S_WRNORM, 0, 0);
3279 	} else if (flag & QWANTR) {
3280 		if (stp->sd_flag & RSLEEP) {
3281 			stp->sd_flag &= ~RSLEEP;
3282 			cv_broadcast(&_RD(stp->sd_wrq)->q_wait);
3283 		} else {
3284 			stp->sd_wakeq |= RSLEEP;
3285 		}
3286 
3287 		mutex_exit(&stp->sd_lock);
3288 		pollwakeup(pl, POLLIN | POLLRDNORM);
3289 		mutex_enter(&stp->sd_lock);
3290 
3291 		{
3292 			int events = stp->sd_sigflags & (S_INPUT | S_RDNORM);
3293 
3294 			if (events)
3295 				strsendsig(stp->sd_siglist, events, 0, 0);
3296 		}
3297 	} else {
3298 		if (stp->sd_flag & WSLEEP) {
3299 			stp->sd_flag &= ~WSLEEP;
3300 			cv_broadcast(&stp->sd_wrq->q_wait);
3301 		}
3302 
3303 		mutex_exit(&stp->sd_lock);
3304 		pollwakeup(pl, POLLWRNORM);
3305 		mutex_enter(&stp->sd_lock);
3306 
3307 		if (stp->sd_sigflags & S_WRNORM)
3308 			strsendsig(stp->sd_siglist, S_WRNORM, 0, 0);
3309 	}
3310 	mutex_exit(&stp->sd_lock);
3311 }
3312 
3313 int
3314 struioget(queue_t *q, mblk_t *mp, struiod_t *dp, int noblock)
3315 {
3316 	stdata_t *stp = STREAM(q);
3317 	int typ  = STRUIOT_STANDARD;
3318 	uio_t	 *uiop = &dp->d_uio;
3319 	dblk_t	 *dbp;
3320 	ssize_t	 uiocnt;
3321 	ssize_t	 cnt;
3322 	unsigned char *ptr;
3323 	ssize_t	 resid;
3324 	int	 error = 0;
3325 	on_trap_data_t otd;
3326 	queue_t	*stwrq;
3327 
3328 	/*
3329 	 * Plumbing may change while taking the type, so store the
3330 	 * queue in a temporary variable. It doesn't matter even
3331 	 * if we take the type from the previous plumbing; if the
3332 	 * plumbing has changed while we were holding the queue in
3333 	 * a temporary variable, we can continue processing the
3334 	 * message the way it would have been processed in the old
3335 	 * plumbing, with no side effects other than a bit of extra
3336 	 * processing for the partial ip header checksum.
3337 	 *
3338 	 * This has been done to avoid holding the sd_lock, which is
3339 	 * very hot.
3340 	 */
3341 
3342 	stwrq = stp->sd_struiowrq;
3343 	if (stwrq)
3344 		typ = stwrq->q_struiot;
3345 
3346 	for (; (resid = uiop->uio_resid) > 0 && mp; mp = mp->b_cont) {
3347 		dbp = mp->b_datap;
3348 		ptr = (uchar_t *)(mp->b_rptr + dbp->db_cksumstuff);
3349 		uiocnt = dbp->db_cksumend - dbp->db_cksumstuff;
3350 		cnt = MIN(uiocnt, uiop->uio_resid);
3351 		if (!(dbp->db_struioflag & STRUIO_SPEC) ||
3352 		    (dbp->db_struioflag & STRUIO_DONE) || cnt == 0) {
3353 			/*
3354 			 * Either this mblk has already been processed
3355 			 * or there is no more room in this mblk (?).
3356 			 */
3357 			continue;
3358 		}
3359 		switch (typ) {
3360 		case STRUIOT_STANDARD:
3361 			if (noblock) {
3362 				if (on_trap(&otd, OT_DATA_ACCESS)) {
3363 					no_trap();
3364 					error = EWOULDBLOCK;
3365 					goto out;
3366 				}
3367 			}
3368 			if ((error = uiomove(ptr, cnt, UIO_WRITE, uiop)) != 0) {
3369 				if (noblock)
3370 					no_trap();
3371 				goto out;
3372 			}
3373 			if (noblock)
3374 				no_trap();
3375 			break;
3376 
3377 		default:
3378 			error = EIO;
3379 			goto out;
3380 		}
3381 		dbp->db_struioflag |= STRUIO_DONE;
3382 		dbp->db_cksumstuff += cnt;
3383 	}
3384 out:
3385 	if (error == EWOULDBLOCK && (resid -= uiop->uio_resid) > 0) {
3386 		/*
3387 		 * A fault has occurred and some bytes were moved to the
3388 		 * current mblk, the uio_t has already been updated by
3389 		 * the appropriate uio routine, so also update the mblk
3390 		 * to reflect this in case this same mblk chain is used
3391 		 * again (after the fault has been handled).
3392 		 */
3393 		uiocnt = dbp->db_cksumend - dbp->db_cksumstuff;
3394 		if (uiocnt >= resid)
3395 			dbp->db_cksumstuff += resid;
3396 	}
3397 	return (error);
3398 }
3399 
3400 /*
3401  * Try to enter the queue synchronously. An attempt to enter a closing queue
3402  * fails. The qp->q_rwcnt keeps track of the number of successful entries so
3403  * that removeq() will not try to close the queue while a thread is inside the
3404  * queue.
3405  */
3406 static boolean_t
3407 rwnext_enter(queue_t *qp)
3408 {
3409 	mutex_enter(QLOCK(qp));
3410 	if (qp->q_flag & QWCLOSE) {
3411 		mutex_exit(QLOCK(qp));
3412 		return (B_FALSE);
3413 	}
3414 	qp->q_rwcnt++;
3415 	ASSERT(qp->q_rwcnt != 0);
3416 	mutex_exit(QLOCK(qp));
3417 	return (B_TRUE);
3418 }
3419 
3420 /*
3421  * Decrease the count of threads running in sync stream queue and wake up any
3422  * threads blocked in removeq().
3423  */
3424 static void
3425 rwnext_exit(queue_t *qp)
3426 {
3427 	mutex_enter(QLOCK(qp));
3428 	qp->q_rwcnt--;
3429 	if (qp->q_flag & QWANTRMQSYNC) {
3430 		qp->q_flag &= ~QWANTRMQSYNC;
3431 		cv_broadcast(&qp->q_wait);
3432 	}
3433 	mutex_exit(QLOCK(qp));
3434 }
3435 
3436 /*
3437  * The purpose of rwnext() is to call the rw procedure of the next
3438  * (downstream) module's queue.
3439  *
3440  * It is treated as a put entrypoint for perimeter synchronization.
3441  *
3442  * There's no need to grab sq_putlocks here (which only exist for CIPUT
3443  * sync queues). If it is CIPUT sync queue sq_count is incremented and it does
3444  * not matter if any regular put entrypoints have been already entered. We
3445  * can't increment one of the sq_putcounts (instead of sq_count) because
3446  * qwait_rw won't know which counter to decrement.
3447  *
3448  * It would be reasonable to add the lockless FASTPUT logic.
3449  */
3450 int
3451 rwnext(queue_t *qp, struiod_t *dp)
3452 {
3453 	queue_t		*nqp;
3454 	syncq_t		*sq;
3455 	uint16_t	count;
3456 	uint16_t	flags;
3457 	struct qinit	*qi;
3458 	int		(*proc)();
3459 	struct stdata	*stp;
3460 	int		isread;
3461 	int		rval;
3462 
3463 	stp = STREAM(qp);
3464 	/*
3465 	 * Prevent q_next from changing by holding sd_lock until acquiring
3466 	 * SQLOCK. Note that a read-side rwnext from the streamhead will
3467 	 * already have sd_lock acquired. In either case sd_lock is always
3468 	 * released after acquiring SQLOCK.
3469 	 *
3470 	 * The streamhead read-side holding sd_lock when calling rwnext is
3471 	 * required to prevent a race condition where M_DATA mblks flowing
3472 	 * up the read-side of the stream could be bypassed by a rwnext()
3473 	 * down-call. In this case sd_lock acts as the streamhead perimeter.
3474 	 */
3475 	if ((nqp = _WR(qp)) == qp) {
3476 		isread = 0;
3477 		mutex_enter(&stp->sd_lock);
3478 		qp = nqp->q_next;
3479 	} else {
3480 		isread = 1;
3481 		if (nqp != stp->sd_wrq)
3482 			/* Not streamhead */
3483 			mutex_enter(&stp->sd_lock);
3484 		qp = _RD(nqp->q_next);
3485 	}
3486 	qi = qp->q_qinfo;
3487 	if (qp->q_struiot == STRUIOT_NONE || ! (proc = qi->qi_rwp)) {
3488 		/*
3489 		 * Not a synchronous module or no r/w procedure for this
3490 		 * queue, so just return EINVAL and let the caller handle it.
3491 		 */
3492 		mutex_exit(&stp->sd_lock);
3493 		return (EINVAL);
3494 	}
3495 
3496 	if (rwnext_enter(qp) == B_FALSE) {
3497 		mutex_exit(&stp->sd_lock);
3498 		return (EINVAL);
3499 	}
3500 
3501 	sq = qp->q_syncq;
3502 	mutex_enter(SQLOCK(sq));
3503 	mutex_exit(&stp->sd_lock);
3504 	count = sq->sq_count;
3505 	flags = sq->sq_flags;
3506 	ASSERT(sq->sq_ciputctrl == NULL || (flags & SQ_CIPUT));
3507 
3508 	while ((flags & SQ_GOAWAY) || (!(flags & SQ_CIPUT) && count != 0)) {
3509 		/*
3510 		 * if this queue is being closed, return.
3511 		 */
3512 		if (qp->q_flag & QWCLOSE) {
3513 			mutex_exit(SQLOCK(sq));
3514 			rwnext_exit(qp);
3515 			return (EINVAL);
3516 		}
3517 
3518 		/*
3519 		 * Wait until we can enter the inner perimeter.
3520 		 */
3521 		sq->sq_flags = flags | SQ_WANTWAKEUP;
3522 		cv_wait(&sq->sq_wait, SQLOCK(sq));
3523 		count = sq->sq_count;
3524 		flags = sq->sq_flags;
3525 	}
3526 
3527 	if ((isread == 0 && stp->sd_struiowrq == NULL) ||
3528 	    (isread == 1 && stp->sd_struiordq == NULL)) {
3529 		/*
3530 		 * Stream plumbing changed while waiting for inner perimeter
3531 		 * so just return EINVAL and let the caller handle it.
3532 		 */
3533 		mutex_exit(SQLOCK(sq));
3534 		rwnext_exit(qp);
3535 		return (EINVAL);
3536 	}
3537 	if (!(flags & SQ_CIPUT))
3538 		sq->sq_flags = flags | SQ_EXCL;
3539 	sq->sq_count = count + 1;
3540 	ASSERT(sq->sq_count != 0);		/* Wraparound */
3541 	/*
3542 	 * Note: The only message ordering guarantee that rwnext() makes is
3543 	 *	 for the write queue flow-control case. All others (r/w queue
3544 	 *	 with q_count > 0 (or q_first != 0)) are the responsibility of
3545 	 *	 the queue's rw procedure. This could be generalized here by
3546 	 *	 running the queue's service procedure, but that wouldn't be
3547 	 *	 the most efficient for all cases.
3548 	 */
3549 	mutex_exit(SQLOCK(sq));
3550 	if (! isread && (qp->q_flag & QFULL)) {
3551 		/*
3552 		 * Write queue may be flow controlled. If so,
3553 		 * mark the queue for wakeup when it's not.
3554 		 */
3555 		mutex_enter(QLOCK(qp));
3556 		if (qp->q_flag & QFULL) {
3557 			qp->q_flag |= QWANTWSYNC;
3558 			mutex_exit(QLOCK(qp));
3559 			rval = EWOULDBLOCK;
3560 			goto out;
3561 		}
3562 		mutex_exit(QLOCK(qp));
3563 	}
3564 
3565 	if (! isread && dp->d_mp)
3566 		STR_FTEVENT_MSG(dp->d_mp, nqp, FTEV_RWNEXT, dp->d_mp->b_rptr -
3567 		    dp->d_mp->b_datap->db_base);
3568 
3569 	rval = (*proc)(qp, dp);
3570 
3571 	if (isread && dp->d_mp)
3572 		STR_FTEVENT_MSG(dp->d_mp, _RD(nqp), FTEV_RWNEXT,
3573 		    dp->d_mp->b_rptr - dp->d_mp->b_datap->db_base);
3574 out:
3575 	/*
3576 	 * The queue is protected from being freed by sq_count, so it is
3577 	 * safe to call rwnext_exit and reacquire SQLOCK(sq).
3578 	 */
3579 	rwnext_exit(qp);
3580 
3581 	mutex_enter(SQLOCK(sq));
3582 	flags = sq->sq_flags;
3583 	ASSERT(sq->sq_count != 0);
3584 	sq->sq_count--;
3585 	if (flags & SQ_TAIL) {
3586 		putnext_tail(sq, qp, flags);
3587 		/*
3588 		 * The only purpose of this ASSERT is to preserve calling stack
3589 		 * in DEBUG kernel.
3590 		 */
3591 		ASSERT(flags & SQ_TAIL);
3592 		return (rval);
3593 	}
3594 	ASSERT(flags & (SQ_EXCL|SQ_CIPUT));
3595 	/*
3596 	 * Safe to always drop SQ_EXCL:
3597 	 *	Not SQ_CIPUT means we set SQ_EXCL above
3598 	 *	For SQ_CIPUT SQ_EXCL will only be set if the put procedure
3599 	 *	did a qwriter(INNER) in which case nobody else
3600 	 *	is in the inner perimeter and we are exiting.
3601 	 *
3602 	 * I would like to make the following assertion:
3603 	 *
3604 	 * ASSERT((flags & (SQ_EXCL|SQ_CIPUT)) != (SQ_EXCL|SQ_CIPUT) ||
3605 	 * 	sq->sq_count == 0);
3606 	 *
3607 	 * which indicates that if we are both putshared and exclusive,
3608 	 * we became exclusive while executing the putproc, and the only
3609 	 * claim on the syncq was the one we dropped a few lines above.
3610 	 * But other threads that enter putnext while the syncq is exclusive
3611 	 * need to make a claim as they may need to drop SQLOCK in the
3612 	 * has_writers case to avoid deadlocks.  If these threads are
3613 	 * delayed or preempted, it is possible that the writer thread can
3614 	 * find out that there are other claims making the (sq_count == 0)
3615 	 * test invalid.
3616 	 */
3617 
3618 	sq->sq_flags = flags & ~SQ_EXCL;
3619 	if (sq->sq_flags & SQ_WANTWAKEUP) {
3620 		sq->sq_flags &= ~SQ_WANTWAKEUP;
3621 		cv_broadcast(&sq->sq_wait);
3622 	}
3623 	mutex_exit(SQLOCK(sq));
3624 	return (rval);
3625 }
3626 
3627 /*
3628  * The purpose of infonext() is to call the info procedure of the next
3629  * (downstream) modules queue.
3630  * (downstream) module's queue.
3631  *
3632  * It is treated as a put entrypoint for perimeter synchronization.
3633  * There's no need to grab sq_putlocks here (which only exist for CIPUT
3634  * sync queues). If it is CIPUT sync queue regular sq_count is incremented and
3635  * it does not matter if any regular put entrypoints have been already
3636  * entered.
3637  */
3638 int
3639 infonext(queue_t *qp, infod_t *idp)
3640 {
3641 	queue_t		*nqp;
3642 	syncq_t		*sq;
3643 	uint16_t	count;
3644 	uint16_t 	flags;
3645 	struct qinit	*qi;
3646 	int		(*proc)();
3647 	struct stdata	*stp;
3648 	int		rval;
3649 
3650 	stp = STREAM(qp);
3651 	/*
3652 	 * Prevent q_next from changing by holding sd_lock until
3653 	 * acquiring SQLOCK.
3654 	 */
3655 	mutex_enter(&stp->sd_lock);
3656 	if ((nqp = _WR(qp)) == qp) {
3657 		qp = nqp->q_next;
3658 	} else {
3659 		qp = _RD(nqp->q_next);
3660 	}
3661 	qi = qp->q_qinfo;
3662 	if (qp->q_struiot == STRUIOT_NONE || ! (proc = qi->qi_infop)) {
3663 		mutex_exit(&stp->sd_lock);
3664 		return (EINVAL);
3665 	}
3666 	sq = qp->q_syncq;
3667 	mutex_enter(SQLOCK(sq));
3668 	mutex_exit(&stp->sd_lock);
3669 	count = sq->sq_count;
3670 	flags = sq->sq_flags;
3671 	ASSERT(sq->sq_ciputctrl == NULL || (flags & SQ_CIPUT));
3672 
3673 	while ((flags & SQ_GOAWAY) || (!(flags & SQ_CIPUT) && count != 0)) {
3674 		/*
3675 		 * Wait until we can enter the inner perimeter.
3676 		 */
3677 		sq->sq_flags = flags | SQ_WANTWAKEUP;
3678 		cv_wait(&sq->sq_wait, SQLOCK(sq));
3679 		count = sq->sq_count;
3680 		flags = sq->sq_flags;
3681 	}
3682 
3683 	if (! (flags & SQ_CIPUT))
3684 		sq->sq_flags = flags | SQ_EXCL;
3685 	sq->sq_count = count + 1;
3686 	ASSERT(sq->sq_count != 0);		/* Wraparound */
3687 	mutex_exit(SQLOCK(sq));
3688 
3689 	rval = (*proc)(qp, idp);
3690 
3691 	mutex_enter(SQLOCK(sq));
3692 	flags = sq->sq_flags;
3693 	ASSERT(sq->sq_count != 0);
3694 	sq->sq_count--;
3695 	if (flags & SQ_TAIL) {
3696 		putnext_tail(sq, qp, flags);
3697 		/*
3698 		 * The only purpose of this ASSERT is to preserve calling stack
3699 		 * in DEBUG kernel.
3700 		 */
3701 		ASSERT(flags & SQ_TAIL);
3702 		return (rval);
3703 	}
3704 	ASSERT(flags & (SQ_EXCL|SQ_CIPUT));
3705 /*
3706  * XXXX
3707  * I am not certain the next comment is correct here.  I need to consider
3708  * why infonext is called, and whether dropping SQ_EXCL in anything but
3709  * the non-CIPUT case might cause other problems.  It just might be safer
3710  * to drop it only if !SQ_CIPUT because that is when we set it.
3711  */
3712 	/*
3713 	 * Safe to always drop SQ_EXCL:
3714 	 *	Not SQ_CIPUT means we set SQ_EXCL above
3715 	 *	For SQ_CIPUT SQ_EXCL will only be set if the put procedure
3716 	 *	did a qwriter(INNER) in which case nobody else
3717 	 *	is in the inner perimeter and we are exiting.
3718 	 *
3719 	 * I would like to make the following assertion:
3720 	 *
3721 	 * ASSERT((flags & (SQ_EXCL|SQ_CIPUT)) != (SQ_EXCL|SQ_CIPUT) ||
3722 	 *	sq->sq_count == 0);
3723 	 *
3724 	 * which indicates that if we are both putshared and exclusive,
3725 	 * we became exclusive while executing the putproc, and the only
3726 	 * claim on the syncq was the one we dropped a few lines above.
3727 	 * But other threads that enter putnext while the syncq is exclusive
3728 	 * need to make a claim as they may need to drop SQLOCK in the
3729 	 * has_writers case to avoid deadlocks.  If these threads are
3730 	 * delayed or preempted, it is possible that the writer thread can
3731 	 * find out that there are other claims making the (sq_count == 0)
3732 	 * test invalid.
3733 	 */
3734 
3735 	sq->sq_flags = flags & ~SQ_EXCL;
3736 	mutex_exit(SQLOCK(sq));
3737 	return (rval);
3738 }
3739 
3740 /*
3741  * Return nonzero if the queue is responsible for struio(), else return 0.
3742  */
3743 int
3744 isuioq(queue_t *q)
3745 {
3746 	if (q->q_flag & QREADR)
3747 		return (STREAM(q)->sd_struiordq == q);
3748 	else
3749 		return (STREAM(q)->sd_struiowrq == q);
3750 }
3751 
3752 #if defined(__sparc)
3753 int disable_putlocks = 0;
3754 #else
3755 int disable_putlocks = 1;
3756 #endif
3757 
3758 /*
3759  * called by create_putlock.
3760  */
3761 static void
3762 create_syncq_putlocks(queue_t *q)
3763 {
3764 	syncq_t	*sq = q->q_syncq;
3765 	ciputctrl_t *cip;
3766 	int i;
3767 
3768 	ASSERT(sq != NULL);
3769 
3770 	ASSERT(disable_putlocks == 0);
3771 	ASSERT(n_ciputctrl >= min_n_ciputctrl);
3772 	ASSERT(ciputctrl_cache != NULL);
3773 
3774 	if (!(sq->sq_type & SQ_CIPUT))
3775 		return;
3776 
3777 	for (i = 0; i <= 1; i++) {
3778 		if (sq->sq_ciputctrl == NULL) {
3779 			cip = kmem_cache_alloc(ciputctrl_cache, KM_SLEEP);
3780 			SUMCHECK_CIPUTCTRL_COUNTS(cip, n_ciputctrl - 1, 0);
3781 			mutex_enter(SQLOCK(sq));
3782 			if (sq->sq_ciputctrl != NULL) {
3783 				mutex_exit(SQLOCK(sq));
3784 				kmem_cache_free(ciputctrl_cache, cip);
3785 			} else {
3786 				ASSERT(sq->sq_nciputctrl == 0);
3787 				sq->sq_nciputctrl = n_ciputctrl - 1;
3788 				/*
3789 				 * putnext checks sq_ciputctrl without holding
3790 				 * SQLOCK. If it is not NULL, putnext assumes
3791 				 * sq_nciputctrl is initialized. The membar
3792 				 * below ensures that.
3793 				 */
3794 				membar_producer();
3795 				sq->sq_ciputctrl = cip;
3796 				mutex_exit(SQLOCK(sq));
3797 			}
3798 		}
3799 		ASSERT(sq->sq_nciputctrl == n_ciputctrl - 1);
3800 		if (i == 1)
3801 			break;
3802 		q = _OTHERQ(q);
3803 		if (!(q->q_flag & QPERQ)) {
3804 			ASSERT(sq == q->q_syncq);
3805 			break;
3806 		}
3807 		ASSERT(q->q_syncq != NULL);
3808 		ASSERT(sq != q->q_syncq);
3809 		sq = q->q_syncq;
3810 		ASSERT(sq->sq_type & SQ_CIPUT);
3811 	}
3812 }
3813 
3814 /*
3815  * If stream argument is 0 only create per cpu sq_putlocks/sq_putcounts for
3816  * syncq of q. If stream argument is not 0 create per cpu stream_putlocks for
3817  * the stream of q and per cpu sq_putlocks/sq_putcounts for all syncq's
3818  * starting from q and down to the driver.
3819  *
3820  * This should be called after the affected queues are part of stream
3821  * geometry. It should be called from a driver/module open routine after the
3822  * qprocson() call. It is also called from the nfs syscall path, where it is
3823  * known that the stream is configured and won't change its geometry during
3824  * the create_putlocks call.
3825  *
3826  * A caller normally uses a 0 value for the stream argument to speed up MT
3827  * putnext into the perimeter of q, for example because its perimeter is per
3828  * module (e.g. IP).
3829  *
3830  * A caller normally uses a nonzero value for the stream argument to hint
3831  * the system that the stream of q is a very contended global system stream
3832  * (e.g. NFS/UDP) and the part of the stream from q to the driver is
3833  * particularly MT hot.
3834  *
3835  * The caller ensures stream plumbing won't happen while we are here and
3836  * therefore q_next can be safely used.
3837  */
3838 
3839 void
3840 create_putlocks(queue_t *q, int stream)
3841 {
3842 	ciputctrl_t	*cip;
3843 	struct stdata	*stp = STREAM(q);
3844 
3845 	q = _WR(q);
3846 	ASSERT(stp != NULL);
3847 
3848 	if (disable_putlocks != 0)
3849 		return;
3850 
3851 	if (n_ciputctrl < min_n_ciputctrl)
3852 		return;
3853 
3854 	ASSERT(ciputctrl_cache != NULL);
3855 
3856 	if (stream != 0 && stp->sd_ciputctrl == NULL) {
3857 		cip = kmem_cache_alloc(ciputctrl_cache, KM_SLEEP);
3858 		SUMCHECK_CIPUTCTRL_COUNTS(cip, n_ciputctrl - 1, 0);
3859 		mutex_enter(&stp->sd_lock);
3860 		if (stp->sd_ciputctrl != NULL) {
3861 			mutex_exit(&stp->sd_lock);
3862 			kmem_cache_free(ciputctrl_cache, cip);
3863 		} else {
3864 			ASSERT(stp->sd_nciputctrl == 0);
3865 			stp->sd_nciputctrl = n_ciputctrl - 1;
3866 			/*
3867 			 * putnext checks sd_ciputctrl without holding
3868 			 * sd_lock. If it is not NULL, putnext assumes
3869 			 * sd_nciputctrl is initialized. The membar
3870 			 * below ensures that.
3871 			 */
3872 			membar_producer();
3873 			stp->sd_ciputctrl = cip;
3874 			mutex_exit(&stp->sd_lock);
3875 		}
3876 	}
3877 
3878 	ASSERT(stream == 0 || stp->sd_nciputctrl == n_ciputctrl - 1);
3879 
3880 	while (_SAMESTR(q)) {
3881 		create_syncq_putlocks(q);
3882 		if (stream == 0)
3883 			return;
3884 		q = q->q_next;
3885 	}
3886 	ASSERT(q != NULL);
3887 	create_syncq_putlocks(q);
3888 }
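
/*
 * Usage sketch (illustrative only), following the comment above: a
 * driver or module open routine enabling per-cpu putlocks for its own
 * perimeter once the queue pair has been plumbed:
 *
 *	qprocson(q);
 *	create_putlocks(q, 0);
 */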
3889 
3890 /*
3891  * STREAMS Flow Trace - record STREAMS Flow Trace events as an mblk flows
3892  * through a stream.
3893  *
3894  * The data currently recorded per event are a hrtime stamp, queue address,
3895  * event type, and a per-type datum.  Much of the STREAMS framework is
3896  * instrumented for automatic flow tracing (when enabled).  Events can be
3897  * defined and used by STREAMS modules and drivers.
3898  *
3899  * Global objects:
3900  *
3901  *	str_ftevent() - Add a flow-trace event to a dblk.
3902  *	str_ftfree() - Free flow-trace data
3903  *
3904  * Local objects:
3905  *
3906  *	fthdr_cache - pointer to the kmem cache for trace header.
3907  *	ftblk_cache - pointer to the kmem cache for trace data blocks.
3908  */
3909 
3910 int str_ftnever = 1;	/* Don't do STREAMS flow tracing */
3911 
3912 void
3913 str_ftevent(fthdr_t *hp, void *p, ushort_t evnt, ushort_t data)
3914 {
3915 	ftblk_t *bp;
3916 	ftblk_t *nbp;
3917 	ftevnt_t *ep;
3918 	int ix, nix;
3919 
3920 	ASSERT(hp != NULL);
3921 	bp = hp->tail;
3922 	for (;;) {
3923 		if ((ix = bp->ix) == FTBLK_EVNTS) {
3924 			/*
3925 			 * The tail doesn't have room, so we need a new tail.
3926 			 *
3927 			 * To make this MT safe, first, allocate a new
3928 			 * ftblk, and initialize it.  To make life a
3929 			 * little easier, reserve the first slot (mostly
3930 			 * by making ix = 1).  When we are finished with
3931 			 * the initialization, CAS this pointer to the
3932 			 * tail.  If this succeeds, this is the new
3933 			 * "next" block.  Otherwise, another thread
3934 			 * got here first, so free the block and start
3935 			 * again.
3936 			 */
3937 			if (!(nbp = kmem_cache_alloc(ftblk_cache,
3938 			    KM_NOSLEEP))) {
3939 				/* no mem, so punt */
3940 				str_ftnever++;
3941 				/* free up all flow data? */
3942 				return;
3943 			}
3944 			nbp->nxt = NULL;
3945 			nbp->ix = 1;
3946 			/*
3947 			 * Just in case there is another thread about
3948 			 * to get the next index, we need to make sure
3949 			 * the value is there for it.
3950 			 */
3951 			membar_producer();
3952 			if (casptr(&hp->tail, bp, nbp) == bp) {
3953 				/* CAS was successful */
3954 				bp->nxt = nbp;
3955 				membar_producer();
3956 				bp = nbp;
3957 				ix = 0;
3958 				goto cas_good;
3959 			} else {
3960 				kmem_cache_free(ftblk_cache, nbp);
3961 				bp = hp->tail;
3962 				continue;
3963 			}
3964 		}
3965 		nix = ix + 1;
3966 		if (cas32((uint32_t *)&bp->ix, ix, nix) == ix) {
3967 		cas_good:
3968 			if (curthread != hp->thread) {
3969 				hp->thread = curthread;
3970 				evnt |= FTEV_CS;
3971 			}
3972 			if (CPU->cpu_seqid != hp->cpu_seqid) {
3973 				hp->cpu_seqid = CPU->cpu_seqid;
3974 				evnt |= FTEV_PS;
3975 			}
3976 			ep = &bp->ev[ix];
3977 			break;
3978 		}
3979 	}
3980 
3981 	if (evnt & FTEV_QMASK) {
3982 		queue_t *qp = p;
3983 
3984 		/*
3985 		 * It is possible that the module info is broken
3986 		 * (as is logsubr.c as of this writing).
3987 		 * Instead of panicking or doing other unmentionables,
3988 		 * we shall put a dummy name as the mid, and continue.
3989 		 */
3990 		if (qp->q_qinfo == NULL)
3991 			ep->mid = "NONAME";
3992 		else
3993 			ep->mid = qp->q_qinfo->qi_minfo->mi_idname;
3994 
3995 		if (!(qp->q_flag & QREADR))
3996 			evnt |= FTEV_ISWR;
3997 	} else {
3998 		ep->mid = (char *)p;
3999 	}
4000 
4001 	ep->ts = gethrtime();
4002 	ep->evnt = evnt;
4003 	ep->data = data;
4004 	hp->hash = (hp->hash << 9) + hp->hash;
4005 	hp->hash += (evnt << 16) | data;
4006 	hp->hash += (uintptr_t)ep->mid;
4007 }
4008 
4009 /*
4010  * Free flow-trace data.
4011  */
4012 void
4013 str_ftfree(dblk_t *dbp)
4014 {
4015 	fthdr_t *hp = dbp->db_fthdr;
4016 	ftblk_t *bp = &hp->first;
4017 	ftblk_t *nbp;
4018 
4019 	if (bp != hp->tail || bp->ix != 0) {
4020 		/*
4021 		 * Clear out the hash, have the tail point to itself, and free
4022 		 * any continuation blocks.
4023 		 */
4024 		bp = hp->first.nxt;
4025 		hp->tail = &hp->first;
4026 		hp->hash = 0;
4027 		hp->first.nxt = NULL;
4028 		hp->first.ix = 0;
4029 		while (bp != NULL) {
4030 			nbp = bp->nxt;
4031 			kmem_cache_free(ftblk_cache, bp);
4032 			bp = nbp;
4033 		}
4034 	}
4035 	kmem_cache_free(fthdr_cache, hp);
4036 	dbp->db_fthdr = NULL;
4037 }
4038