xref: /illumos-gate/usr/src/uts/common/io/stream.c (revision 150d2c5288c645a1c1a7d2bee61199a3729406c7)
1 /*
2  * CDDL HEADER START
3  *
4  * The contents of this file are subject to the terms of the
5  * Common Development and Distribution License (the "License").
6  * You may not use this file except in compliance with the License.
7  *
8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9  * or http://www.opensolaris.org/os/licensing.
10  * See the License for the specific language governing permissions
11  * and limitations under the License.
12  *
13  * When distributing Covered Code, include this CDDL HEADER in each
14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15  * If applicable, add the following below this CDDL HEADER, with the
16  * fields enclosed by brackets "[]" replaced with your own identifying
17  * information: Portions Copyright [yyyy] [name of copyright owner]
18  *
19  * CDDL HEADER END
20  */
21 /*	Copyright (c) 1984, 1986, 1987, 1988, 1989 AT&T	*/
22 /*	  All Rights Reserved  	*/
23 
24 
25 /*
26  * Copyright 2006 Sun Microsystems, Inc.  All rights reserved.
27  * Use is subject to license terms.
28  */
29 
30 #pragma ident	"%Z%%M%	%I%	%E% SMI"
31 
32 #include <sys/types.h>
33 #include <sys/param.h>
34 #include <sys/thread.h>
35 #include <sys/sysmacros.h>
36 #include <sys/stropts.h>
37 #include <sys/stream.h>
38 #include <sys/strsubr.h>
39 #include <sys/strsun.h>
40 #include <sys/conf.h>
41 #include <sys/debug.h>
42 #include <sys/cmn_err.h>
43 #include <sys/kmem.h>
44 #include <sys/atomic.h>
45 #include <sys/errno.h>
46 #include <sys/vtrace.h>
47 #include <sys/ftrace.h>
48 #include <sys/ontrap.h>
49 #include <sys/multidata.h>
50 #include <sys/multidata_impl.h>
51 #include <sys/sdt.h>
52 #include <sys/strft.h>
53 
54 #ifdef DEBUG
55 #include <sys/kmem_impl.h>
56 #endif
57 
58 /*
59  * This file contains all the STREAMS utility routines that may
60  * be used by modules and drivers.
61  */
62 
63 /*
64  * STREAMS message allocator: principles of operation
65  *
66  * The streams message allocator consists of all the routines that
67  * allocate, dup and free streams messages: allocb(), [d]esballoc[a],
68  * dupb(), freeb() and freemsg().  What follows is a high-level view
69  * of how the allocator works.
70  *
71  * Every streams message consists of one or more mblks, a dblk, and data.
72  * All mblks for all types of messages come from a common mblk_cache.
73  * The dblk and data come in several flavors, depending on how the
74  * message is allocated:
75  *
76  * (1) mblks up to DBLK_MAX_CACHE size are allocated from a collection of
77  *     fixed-size dblk/data caches. For message sizes that are multiples of
78  *     PAGESIZE, dblks are allocated separately from the buffer.
79  *     The associated buffer is allocated by the constructor using kmem_alloc().
80  *     For all other message sizes, dblk and its associated data is allocated
81  *     as a single contiguous chunk of memory.
82  *     Objects in these caches consist of a dblk plus its associated data.
83  *     allocb() determines the nearest-size cache by table lookup:
84  *     the dblk_cache[] array provides the mapping from size to dblk cache.
85  *
86  * (2) Large messages (size > DBLK_MAX_CACHE) are constructed by
87  *     kmem_alloc()'ing a buffer for the data and supplying that
88  *     buffer to gesballoc(), described below.
89  *
90  * (3) The four flavors of [d]esballoc[a] are all implemented by a
91  *     common routine, gesballoc() ("generic esballoc").  gesballoc()
92  *     allocates a dblk from the global dblk_esb_cache and sets db_base,
93  *     db_lim and db_frtnp to describe the caller-supplied buffer.
94  *
95  * While there are several routines to allocate messages, there is only
96  * one routine to free messages: freeb().  freeb() simply invokes the
97  * dblk's free method, dbp->db_free(), which is set at allocation time.
98  *
99  * dupb() creates a new reference to a message by allocating a new mblk,
100  * incrementing the dblk reference count and setting the dblk's free
101  * method to dblk_decref().  The dblk's original free method is retained
102  * in db_lastfree.  dblk_decref() decrements the reference count on each
103  * freeb().  If this is not the last reference it just frees the mblk;
104  * if this *is* the last reference, it restores db_free to db_lastfree,
105  * sets db_mblk to the current mblk (see below), and invokes db_lastfree.
106  *
107  * The implementation makes aggressive use of kmem object caching for
108  * maximum performance.  This makes the code simple and compact, but
109  * also a bit abstruse in some places.  The invariants that constitute a
110  * message's constructed state, described below, are more subtle than usual.
111  *
112  * Every dblk has an "attached mblk" as part of its constructed state.
113  * The mblk is allocated by the dblk's constructor and remains attached
114  * until the message is either dup'ed or pulled up.  In the dupb() case
115  * the mblk association doesn't matter until the last free, at which time
116  * dblk_decref() attaches the last mblk to the dblk.  pullupmsg() affects
117  * the mblk association because it swaps the leading mblks of two messages,
118  * so it is responsible for swapping their db_mblk pointers accordingly.
119  * From a constructed-state viewpoint it doesn't matter that a dblk's
120  * attached mblk can change while the message is allocated; all that
121  * matters is that the dblk has *some* attached mblk when it's freed.
122  *
123  * The sizes of the allocb() small-message caches are not magical.
124  * They represent a good trade-off between internal and external
125  * fragmentation for current workloads.  They should be reevaluated
126  * periodically, especially if allocations larger than DBLK_MAX_CACHE
127  * become common.  We use 64-byte alignment so that dblks don't
128  * straddle cache lines unnecessarily.
129  */
130 #define	DBLK_MAX_CACHE		73728
131 #define	DBLK_CACHE_ALIGN	64
132 #define	DBLK_MIN_SIZE		8
133 #define	DBLK_SIZE_SHIFT		3
134 
135 #ifdef _BIG_ENDIAN
136 #define	DBLK_RTFU_SHIFT(field)	\
137 	(8 * (&((dblk_t *)0)->db_struioflag - &((dblk_t *)0)->field))
138 #else
139 #define	DBLK_RTFU_SHIFT(field)	\
140 	(8 * (&((dblk_t *)0)->field - &((dblk_t *)0)->db_ref))
141 #endif
142 
143 #define	DBLK_RTFU(ref, type, flags, uioflag)	\
144 	(((ref) << DBLK_RTFU_SHIFT(db_ref)) | \
145 	((type) << DBLK_RTFU_SHIFT(db_type)) | \
146 	(((flags) | (ref - 1)) << DBLK_RTFU_SHIFT(db_flags)) | \
147 	((uioflag) << DBLK_RTFU_SHIFT(db_struioflag)))
148 #define	DBLK_RTFU_REF_MASK	(DBLK_REFMAX << DBLK_RTFU_SHIFT(db_ref))
149 #define	DBLK_RTFU_WORD(dbp)	(*((uint32_t *)&(dbp)->db_ref))
150 #define	MBLK_BAND_FLAG_WORD(mp)	(*((uint32_t *)&(mp)->b_band))
151 
152 static size_t dblk_sizes[] = {
153 #ifdef _LP64
154 	16, 80, 144, 208, 272, 336, 528, 1040, 1488, 1936, 2576, 3920,
155 	8192, 12112, 16384, 20304, 24576, 28496, 32768, 36688,
156 	40960, 44880, 49152, 53072, 57344, 61264, 65536, 69456,
157 #else
158 	64, 128, 320, 576, 1088, 1536, 1984, 2624, 3968,
159 	8192, 12160, 16384, 20352, 24576, 28544, 32768, 36736,
160 	40960, 44928, 49152, 53120, 57344, 61312, 65536, 69504,
161 #endif
162 	DBLK_MAX_CACHE, 0
163 };
164 
165 static struct kmem_cache *dblk_cache[DBLK_MAX_CACHE / DBLK_MIN_SIZE];
166 static struct kmem_cache *mblk_cache;
167 static struct kmem_cache *dblk_esb_cache;
168 static struct kmem_cache *fthdr_cache;
169 static struct kmem_cache *ftblk_cache;
170 
171 static void dblk_lastfree(mblk_t *mp, dblk_t *dbp);
172 static mblk_t *allocb_oversize(size_t size, int flags);
173 static int allocb_tryhard_fails;
174 static void frnop_func(void *arg);
175 frtn_t frnop = { frnop_func };
176 static void bcache_dblk_lastfree(mblk_t *mp, dblk_t *dbp);
177 
178 static boolean_t rwnext_enter(queue_t *qp);
179 static void rwnext_exit(queue_t *qp);
180 
181 /*
182  * Patchable mblk/dblk kmem_cache flags.
183  */
184 int dblk_kmem_flags = 0;
185 int mblk_kmem_flags = 0;
186 
187 
188 static int
189 dblk_constructor(void *buf, void *cdrarg, int kmflags)
190 {
191 	dblk_t *dbp = buf;
192 	ssize_t msg_size = (ssize_t)cdrarg;
193 	size_t index;
194 
195 	ASSERT(msg_size != 0);
196 
197 	index = (msg_size - 1) >> DBLK_SIZE_SHIFT;
198 
199 	ASSERT(index < (DBLK_MAX_CACHE >> DBLK_SIZE_SHIFT));
200 
201 	if ((dbp->db_mblk = kmem_cache_alloc(mblk_cache, kmflags)) == NULL)
202 		return (-1);
203 	if ((msg_size & PAGEOFFSET) == 0) {
204 		dbp->db_base = kmem_alloc(msg_size, kmflags);
205 		if (dbp->db_base == NULL) {
206 			kmem_cache_free(mblk_cache, dbp->db_mblk);
207 			return (-1);
208 		}
209 	} else {
210 		dbp->db_base = (unsigned char *)&dbp[1];
211 	}
212 
213 	dbp->db_mblk->b_datap = dbp;
214 	dbp->db_cache = dblk_cache[index];
215 	dbp->db_lim = dbp->db_base + msg_size;
216 	dbp->db_free = dbp->db_lastfree = dblk_lastfree;
217 	dbp->db_frtnp = NULL;
218 	dbp->db_fthdr = NULL;
219 	dbp->db_credp = NULL;
220 	dbp->db_cpid = -1;
221 	dbp->db_struioflag = 0;
222 	dbp->db_struioun.cksum.flags = 0;
223 	return (0);
224 }
225 
226 /*ARGSUSED*/
227 static int
228 dblk_esb_constructor(void *buf, void *cdrarg, int kmflags)
229 {
230 	dblk_t *dbp = buf;
231 
232 	if ((dbp->db_mblk = kmem_cache_alloc(mblk_cache, kmflags)) == NULL)
233 		return (-1);
234 	dbp->db_mblk->b_datap = dbp;
235 	dbp->db_cache = dblk_esb_cache;
236 	dbp->db_fthdr = NULL;
237 	dbp->db_credp = NULL;
238 	dbp->db_cpid = -1;
239 	dbp->db_struioflag = 0;
240 	dbp->db_struioun.cksum.flags = 0;
241 	return (0);
242 }
243 
244 static int
245 bcache_dblk_constructor(void *buf, void *cdrarg, int kmflags)
246 {
247 	dblk_t *dbp = buf;
248 	bcache_t *bcp = (bcache_t *)cdrarg;
249 
250 	if ((dbp->db_mblk = kmem_cache_alloc(mblk_cache, kmflags)) == NULL)
251 		return (-1);
252 
253 	if ((dbp->db_base = (unsigned char *)kmem_cache_alloc(bcp->buffer_cache,
254 	    kmflags)) == NULL) {
255 		kmem_cache_free(mblk_cache, dbp->db_mblk);
256 		return (-1);
257 	}
258 
259 	dbp->db_mblk->b_datap = dbp;
260 	dbp->db_cache = (void *)bcp;
261 	dbp->db_lim = dbp->db_base + bcp->size;
262 	dbp->db_free = dbp->db_lastfree = bcache_dblk_lastfree;
263 	dbp->db_frtnp = NULL;
264 	dbp->db_fthdr = NULL;
265 	dbp->db_credp = NULL;
266 	dbp->db_cpid = -1;
267 	dbp->db_struioflag = 0;
268 	dbp->db_struioun.cksum.flags = 0;
269 	return (0);
270 }
271 
272 /*ARGSUSED*/
273 static void
274 dblk_destructor(void *buf, void *cdrarg)
275 {
276 	dblk_t *dbp = buf;
277 	ssize_t msg_size = (ssize_t)cdrarg;
278 
279 	ASSERT(dbp->db_mblk->b_datap == dbp);
280 
281 	ASSERT(msg_size != 0);
282 
283 	ASSERT(dbp->db_struioflag == 0);
284 	ASSERT(dbp->db_struioun.cksum.flags == 0);
285 
286 	if ((msg_size & PAGEOFFSET) == 0) {
287 		kmem_free(dbp->db_base, msg_size);
288 	}
289 
290 	kmem_cache_free(mblk_cache, dbp->db_mblk);
291 }
292 
293 static void
294 bcache_dblk_destructor(void *buf, void *cdrarg)
295 {
296 	dblk_t *dbp = buf;
297 	bcache_t *bcp = (bcache_t *)cdrarg;
298 
299 	kmem_cache_free(bcp->buffer_cache, dbp->db_base);
300 
301 	ASSERT(dbp->db_mblk->b_datap == dbp);
302 
303 	ASSERT(dbp->db_struioflag == 0);
304 	ASSERT(dbp->db_struioun.cksum.flags == 0);
305 
306 	kmem_cache_free(mblk_cache, dbp->db_mblk);
307 }
308 
309 void
310 streams_msg_init(void)
311 {
312 	char name[40];
313 	size_t size;
314 	size_t lastsize = DBLK_MIN_SIZE;
315 	size_t *sizep;
316 	struct kmem_cache *cp;
317 	size_t tot_size;
318 	int offset;
319 
320 	mblk_cache = kmem_cache_create("streams_mblk",
321 		sizeof (mblk_t), 32, NULL, NULL, NULL, NULL, NULL,
322 		mblk_kmem_flags);
323 
324 	for (sizep = dblk_sizes; (size = *sizep) != 0; sizep++) {
325 
326 		if ((offset = (size & PAGEOFFSET)) != 0) {
327 			/*
328 			 * We are in the middle of a page, dblk should
329 			 * be allocated on the same page
330 			 */
331 			tot_size = size + sizeof (dblk_t);
332 			ASSERT((offset + sizeof (dblk_t) + sizeof (kmem_slab_t))
333 								< PAGESIZE);
334 			ASSERT((tot_size & (DBLK_CACHE_ALIGN - 1)) == 0);
335 
336 		} else {
337 
338 			/*
339 			 * buf size is multiple of page size, dblk and
340 			 * buffer are allocated separately.
341 			 */
342 
343 			ASSERT((size & (DBLK_CACHE_ALIGN - 1)) == 0);
344 			tot_size = sizeof (dblk_t);
345 		}
346 
347 		(void) sprintf(name, "streams_dblk_%ld", size);
348 		cp = kmem_cache_create(name, tot_size,
349 			DBLK_CACHE_ALIGN, dblk_constructor,
350 			dblk_destructor, NULL,
351 			(void *)(size), NULL, dblk_kmem_flags);
352 
353 		while (lastsize <= size) {
354 			dblk_cache[(lastsize - 1) >> DBLK_SIZE_SHIFT] = cp;
355 			lastsize += DBLK_MIN_SIZE;
356 		}
357 	}
358 
359 	dblk_esb_cache = kmem_cache_create("streams_dblk_esb",
360 			sizeof (dblk_t), DBLK_CACHE_ALIGN,
361 			dblk_esb_constructor, dblk_destructor, NULL,
362 			(void *) sizeof (dblk_t), NULL, dblk_kmem_flags);
363 	fthdr_cache = kmem_cache_create("streams_fthdr",
364 		sizeof (fthdr_t), 32, NULL, NULL, NULL, NULL, NULL, 0);
365 	ftblk_cache = kmem_cache_create("streams_ftblk",
366 		sizeof (ftblk_t), 32, NULL, NULL, NULL, NULL, NULL, 0);
367 
368 	/* Initialize Multidata caches */
369 	mmd_init();
370 }
371 
372 /*ARGSUSED*/
373 mblk_t *
374 allocb(size_t size, uint_t pri)
375 {
376 	dblk_t *dbp;
377 	mblk_t *mp;
378 	size_t index;
379 
380 	index =  (size - 1)  >> DBLK_SIZE_SHIFT;
381 
382 	if (index >= (DBLK_MAX_CACHE >> DBLK_SIZE_SHIFT)) {
383 		if (size != 0) {
384 			mp = allocb_oversize(size, KM_NOSLEEP);
385 			goto out;
386 		}
387 		index = 0;
388 	}
389 
390 	if ((dbp = kmem_cache_alloc(dblk_cache[index], KM_NOSLEEP)) == NULL) {
391 		mp = NULL;
392 		goto out;
393 	}
394 
395 	mp = dbp->db_mblk;
396 	DBLK_RTFU_WORD(dbp) = DBLK_RTFU(1, M_DATA, 0, 0);
397 	mp->b_next = mp->b_prev = mp->b_cont = NULL;
398 	mp->b_rptr = mp->b_wptr = dbp->db_base;
399 	mp->b_queue = NULL;
400 	MBLK_BAND_FLAG_WORD(mp) = 0;
401 	STR_FTALLOC(&dbp->db_fthdr, FTEV_ALLOCB, size);
402 out:
403 	FTRACE_1("allocb(): mp=0x%p", (uintptr_t)mp);
404 
405 	return (mp);
406 }
407 
408 mblk_t *
409 allocb_tmpl(size_t size, const mblk_t *tmpl)
410 {
411 	mblk_t *mp = allocb(size, 0);
412 
413 	if (mp != NULL) {
414 		cred_t *cr = DB_CRED(tmpl);
415 		if (cr != NULL)
416 			crhold(mp->b_datap->db_credp = cr);
417 		DB_CPID(mp) = DB_CPID(tmpl);
418 		DB_TYPE(mp) = DB_TYPE(tmpl);
419 	}
420 	return (mp);
421 }
422 
423 mblk_t *
424 allocb_cred(size_t size, cred_t *cr)
425 {
426 	mblk_t *mp = allocb(size, 0);
427 
428 	if (mp != NULL && cr != NULL)
429 		crhold(mp->b_datap->db_credp = cr);
430 
431 	return (mp);
432 }
433 
434 mblk_t *
435 allocb_cred_wait(size_t size, uint_t flags, int *error, cred_t *cr)
436 {
437 	mblk_t *mp = allocb_wait(size, 0, flags, error);
438 
439 	if (mp != NULL && cr != NULL)
440 		crhold(mp->b_datap->db_credp = cr);
441 
442 	return (mp);
443 }
444 
445 void
446 freeb(mblk_t *mp)
447 {
448 	dblk_t *dbp = mp->b_datap;
449 
450 	ASSERT(dbp->db_ref > 0);
451 	ASSERT(mp->b_next == NULL && mp->b_prev == NULL);
452 	FTRACE_1("freeb(): mp=0x%lx", (uintptr_t)mp);
453 
454 	STR_FTEVENT_MBLK(mp, caller(), FTEV_FREEB, dbp->db_ref);
455 
456 	dbp->db_free(mp, dbp);
457 }
458 
459 void
460 freemsg(mblk_t *mp)
461 {
462 	FTRACE_1("freemsg(): mp=0x%lx", (uintptr_t)mp);
463 	while (mp) {
464 		dblk_t *dbp = mp->b_datap;
465 		mblk_t *mp_cont = mp->b_cont;
466 
467 		ASSERT(dbp->db_ref > 0);
468 		ASSERT(mp->b_next == NULL && mp->b_prev == NULL);
469 
470 		STR_FTEVENT_MBLK(mp, caller(), FTEV_FREEB, dbp->db_ref);
471 
472 		dbp->db_free(mp, dbp);
473 		mp = mp_cont;
474 	}
475 }
476 
477 /*
478  * Reallocate a block for another use.  Try hard to use the old block.
479  * If the old data is wanted (copy), leave b_wptr at the end of the data,
480  * otherwise return b_wptr = b_rptr.
481  *
482  * This routine is private and unstable.
483  */
484 mblk_t	*
485 reallocb(mblk_t *mp, size_t size, uint_t copy)
486 {
487 	mblk_t		*mp1;
488 	unsigned char	*old_rptr;
489 	ptrdiff_t	cur_size;
490 
491 	if (mp == NULL)
492 		return (allocb(size, BPRI_HI));
493 
494 	cur_size = mp->b_wptr - mp->b_rptr;
495 	old_rptr = mp->b_rptr;
496 
497 	ASSERT(mp->b_datap->db_ref != 0);
498 
499 	if (mp->b_datap->db_ref == 1 && MBLKSIZE(mp) >= size) {
500 		/*
501 		 * If the data is wanted and it will fit where it is, no
502 		 * work is required.
503 		 */
504 		if (copy && mp->b_datap->db_lim - mp->b_rptr >= size)
505 			return (mp);
506 
507 		mp->b_wptr = mp->b_rptr = mp->b_datap->db_base;
508 		mp1 = mp;
509 	} else if ((mp1 = allocb_tmpl(size, mp)) != NULL) {
510 		/* XXX other mp state could be copied too, db_flags ... ? */
511 		mp1->b_cont = mp->b_cont;
512 	} else {
513 		return (NULL);
514 	}
515 
516 	if (copy) {
517 		bcopy(old_rptr, mp1->b_rptr, cur_size);
518 		mp1->b_wptr = mp1->b_rptr + cur_size;
519 	}
520 
521 	if (mp != mp1)
522 		freeb(mp);
523 
524 	return (mp1);
525 }
526 
527 static void
528 dblk_lastfree(mblk_t *mp, dblk_t *dbp)
529 {
530 	ASSERT(dbp->db_mblk == mp);
531 	if (dbp->db_fthdr != NULL)
532 		str_ftfree(dbp);
533 
534 	/* set credp and projid to be 'unspecified' before returning to cache */
535 	if (dbp->db_credp != NULL) {
536 		crfree(dbp->db_credp);
537 		dbp->db_credp = NULL;
538 	}
539 	dbp->db_cpid = -1;
540 
541 	/* Reset the struioflag and the checksum flag fields */
542 	dbp->db_struioflag = 0;
543 	dbp->db_struioun.cksum.flags = 0;
544 
545 	/* and the COOKED flag */
546 	dbp->db_flags &= ~DBLK_COOKED;
547 
548 	kmem_cache_free(dbp->db_cache, dbp);
549 }
550 
551 static void
552 dblk_decref(mblk_t *mp, dblk_t *dbp)
553 {
554 	if (dbp->db_ref != 1) {
555 		uint32_t rtfu = atomic_add_32_nv(&DBLK_RTFU_WORD(dbp),
556 		    -(1 << DBLK_RTFU_SHIFT(db_ref)));
557 		/*
558 		 * atomic_add_32_nv() just decremented db_ref, so we no longer
559 		 * have a reference to the dblk, which means another thread
560 		 * could free it.  Therefore we cannot examine the dblk to
561 		 * determine whether ours was the last reference.  Instead,
562 		 * we extract the new and minimum reference counts from rtfu.
563 		 * Note that all we're really saying is "if (ref != refmin)".
564 		 */
565 		if (((rtfu >> DBLK_RTFU_SHIFT(db_ref)) & DBLK_REFMAX) !=
566 		    ((rtfu >> DBLK_RTFU_SHIFT(db_flags)) & DBLK_REFMIN)) {
567 			kmem_cache_free(mblk_cache, mp);
568 			return;
569 		}
570 	}
571 	dbp->db_mblk = mp;
572 	dbp->db_free = dbp->db_lastfree;
573 	dbp->db_lastfree(mp, dbp);
574 }
575 
576 mblk_t *
577 dupb(mblk_t *mp)
578 {
579 	dblk_t *dbp = mp->b_datap;
580 	mblk_t *new_mp;
581 	uint32_t oldrtfu, newrtfu;
582 
583 	if ((new_mp = kmem_cache_alloc(mblk_cache, KM_NOSLEEP)) == NULL)
584 		goto out;
585 
586 	new_mp->b_next = new_mp->b_prev = new_mp->b_cont = NULL;
587 	new_mp->b_rptr = mp->b_rptr;
588 	new_mp->b_wptr = mp->b_wptr;
589 	new_mp->b_datap = dbp;
590 	new_mp->b_queue = NULL;
591 	MBLK_BAND_FLAG_WORD(new_mp) = MBLK_BAND_FLAG_WORD(mp);
592 
593 	STR_FTEVENT_MBLK(mp, caller(), FTEV_DUPB, dbp->db_ref);
594 
595 	dbp->db_free = dblk_decref;
596 	do {
597 		ASSERT(dbp->db_ref > 0);
598 		oldrtfu = DBLK_RTFU_WORD(dbp);
599 		newrtfu = oldrtfu + (1 << DBLK_RTFU_SHIFT(db_ref));
600 		/*
601 		 * If db_ref is maxed out we can't dup this message anymore.
602 		 */
603 		if ((oldrtfu & DBLK_RTFU_REF_MASK) == DBLK_RTFU_REF_MASK) {
604 			kmem_cache_free(mblk_cache, new_mp);
605 			new_mp = NULL;
606 			goto out;
607 		}
608 	} while (cas32(&DBLK_RTFU_WORD(dbp), oldrtfu, newrtfu) != oldrtfu);
609 
610 out:
611 	FTRACE_1("dupb(): new_mp=0x%lx", (uintptr_t)new_mp);
612 	return (new_mp);
613 }
614 
615 static void
616 dblk_lastfree_desb(mblk_t *mp, dblk_t *dbp)
617 {
618 	frtn_t *frp = dbp->db_frtnp;
619 
620 	ASSERT(dbp->db_mblk == mp);
621 	frp->free_func(frp->free_arg);
622 	if (dbp->db_fthdr != NULL)
623 		str_ftfree(dbp);
624 
625 	/* set credp and projid to be 'unspecified' before returning to cache */
626 	if (dbp->db_credp != NULL) {
627 		crfree(dbp->db_credp);
628 		dbp->db_credp = NULL;
629 	}
630 	dbp->db_cpid = -1;
631 	dbp->db_struioflag = 0;
632 	dbp->db_struioun.cksum.flags = 0;
633 
634 	kmem_cache_free(dbp->db_cache, dbp);
635 }
636 
637 /*ARGSUSED*/
638 static void
639 frnop_func(void *arg)
640 {
641 }
642 
643 /*
644  * Generic esballoc used to implement the four flavors: [d]esballoc[a].
645  */
646 static mblk_t *
647 gesballoc(unsigned char *base, size_t size, uint32_t db_rtfu, frtn_t *frp,
648 	void (*lastfree)(mblk_t *, dblk_t *), int kmflags)
649 {
650 	dblk_t *dbp;
651 	mblk_t *mp;
652 
653 	ASSERT(base != NULL && frp != NULL);
654 
655 	if ((dbp = kmem_cache_alloc(dblk_esb_cache, kmflags)) == NULL) {
656 		mp = NULL;
657 		goto out;
658 	}
659 
660 	mp = dbp->db_mblk;
661 	dbp->db_base = base;
662 	dbp->db_lim = base + size;
663 	dbp->db_free = dbp->db_lastfree = lastfree;
664 	dbp->db_frtnp = frp;
665 	DBLK_RTFU_WORD(dbp) = db_rtfu;
666 	mp->b_next = mp->b_prev = mp->b_cont = NULL;
667 	mp->b_rptr = mp->b_wptr = base;
668 	mp->b_queue = NULL;
669 	MBLK_BAND_FLAG_WORD(mp) = 0;
670 
671 out:
672 	FTRACE_1("gesballoc(): mp=0x%lx", (uintptr_t)mp);
673 	return (mp);
674 }
675 
676 /*ARGSUSED*/
677 mblk_t *
678 esballoc(unsigned char *base, size_t size, uint_t pri, frtn_t *frp)
679 {
680 	mblk_t *mp;
681 
682 	/*
683 	 * Note that this is structured to allow the common case (i.e.
684 	 * STREAMS flowtracing disabled) to call gesballoc() with tail
685 	 * call optimization.
686 	 */
687 	if (!str_ftnever) {
688 		mp = gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
689 		    frp, freebs_enqueue, KM_NOSLEEP);
690 
691 		if (mp != NULL)
692 			STR_FTALLOC(&DB_FTHDR(mp), FTEV_ESBALLOC, size);
693 		return (mp);
694 	}
695 
696 	return (gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
697 	    frp, freebs_enqueue, KM_NOSLEEP));
698 }
699 
700 /*
701  * Same as esballoc() but sleeps waiting for memory.
702  */
703 /*ARGSUSED*/
704 mblk_t *
705 esballoc_wait(unsigned char *base, size_t size, uint_t pri, frtn_t *frp)
706 {
707 	mblk_t *mp;
708 
709 	/*
710 	 * Note that this is structured to allow the common case (i.e.
711 	 * STREAMS flowtracing disabled) to call gesballoc() with tail
712 	 * call optimization.
713 	 */
714 	if (!str_ftnever) {
715 		mp = gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
716 		    frp, freebs_enqueue, KM_SLEEP);
717 
718 		STR_FTALLOC(&DB_FTHDR(mp), FTEV_ESBALLOC, size);
719 		return (mp);
720 	}
721 
722 	return (gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
723 	    frp, freebs_enqueue, KM_SLEEP));
724 }
725 
726 /*ARGSUSED*/
727 mblk_t *
728 desballoc(unsigned char *base, size_t size, uint_t pri, frtn_t *frp)
729 {
730 	mblk_t *mp;
731 
732 	/*
733 	 * Note that this is structured to allow the common case (i.e.
734 	 * STREAMS flowtracing disabled) to call gesballoc() with tail
735 	 * call optimization.
736 	 */
737 	if (!str_ftnever) {
738 		mp = gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
739 			frp, dblk_lastfree_desb, KM_NOSLEEP);
740 
741 		if (mp != NULL)
742 			STR_FTALLOC(&DB_FTHDR(mp), FTEV_DESBALLOC, size);
743 		return (mp);
744 	}
745 
746 	return (gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
747 	    frp, dblk_lastfree_desb, KM_NOSLEEP));
748 }
749 
750 /*ARGSUSED*/
751 mblk_t *
752 esballoca(unsigned char *base, size_t size, uint_t pri, frtn_t *frp)
753 {
754 	mblk_t *mp;
755 
756 	/*
757 	 * Note that this is structured to allow the common case (i.e.
758 	 * STREAMS flowtracing disabled) to call gesballoc() with tail
759 	 * call optimization.
760 	 */
761 	if (!str_ftnever) {
762 		mp = gesballoc(base, size, DBLK_RTFU(2, M_DATA, 0, 0),
763 		    frp, freebs_enqueue, KM_NOSLEEP);
764 
765 		if (mp != NULL)
766 			STR_FTALLOC(&DB_FTHDR(mp), FTEV_ESBALLOCA, size);
767 		return (mp);
768 	}
769 
770 	return (gesballoc(base, size, DBLK_RTFU(2, M_DATA, 0, 0),
771 	    frp, freebs_enqueue, KM_NOSLEEP));
772 }
773 
774 /*ARGSUSED*/
775 mblk_t *
776 desballoca(unsigned char *base, size_t size, uint_t pri, frtn_t *frp)
777 {
778 	mblk_t *mp;
779 
780 	/*
781 	 * Note that this is structured to allow the common case (i.e.
782 	 * STREAMS flowtracing disabled) to call gesballoc() with tail
783 	 * call optimization.
784 	 */
785 	if (!str_ftnever) {
786 		mp = gesballoc(base, size, DBLK_RTFU(2, M_DATA, 0, 0),
787 		    frp, dblk_lastfree_desb, KM_NOSLEEP);
788 
789 		if (mp != NULL)
790 			STR_FTALLOC(&DB_FTHDR(mp), FTEV_DESBALLOCA, size);
791 		return (mp);
792 	}
793 
794 	return (gesballoc(base, size, DBLK_RTFU(2, M_DATA, 0, 0),
795 	    frp, dblk_lastfree_desb, KM_NOSLEEP));
796 }
797 
798 static void
799 bcache_dblk_lastfree(mblk_t *mp, dblk_t *dbp)
800 {
801 	bcache_t *bcp = dbp->db_cache;
802 
803 	ASSERT(dbp->db_mblk == mp);
804 	if (dbp->db_fthdr != NULL)
805 		str_ftfree(dbp);
806 
807 	/* set credp and projid to be 'unspecified' before returning to cache */
808 	if (dbp->db_credp != NULL) {
809 		crfree(dbp->db_credp);
810 		dbp->db_credp = NULL;
811 	}
812 	dbp->db_cpid = -1;
813 	dbp->db_struioflag = 0;
814 	dbp->db_struioun.cksum.flags = 0;
815 
816 	mutex_enter(&bcp->mutex);
817 	kmem_cache_free(bcp->dblk_cache, dbp);
818 	bcp->alloc--;
819 
820 	if (bcp->alloc == 0 && bcp->destroy != 0) {
821 		kmem_cache_destroy(bcp->dblk_cache);
822 		kmem_cache_destroy(bcp->buffer_cache);
823 		mutex_exit(&bcp->mutex);
824 		mutex_destroy(&bcp->mutex);
825 		kmem_free(bcp, sizeof (bcache_t));
826 	} else {
827 		mutex_exit(&bcp->mutex);
828 	}
829 }
830 
831 bcache_t *
832 bcache_create(char *name, size_t size, uint_t align)
833 {
834 	bcache_t *bcp;
835 	char buffer[255];
836 
837 	ASSERT((align & (align - 1)) == 0);
838 
839 	if ((bcp = (bcache_t *)kmem_alloc(sizeof (bcache_t), KM_NOSLEEP)) ==
840 	    NULL) {
841 		return (NULL);
842 	}
843 
844 	bcp->size = size;
845 	bcp->align = align;
846 	bcp->alloc = 0;
847 	bcp->destroy = 0;
848 
849 	mutex_init(&bcp->mutex, NULL, MUTEX_DRIVER, NULL);
850 
851 	(void) sprintf(buffer, "%s_buffer_cache", name);
852 	bcp->buffer_cache = kmem_cache_create(buffer, size, align, NULL, NULL,
853 	    NULL, NULL, NULL, 0);
854 	(void) sprintf(buffer, "%s_dblk_cache", name);
855 	bcp->dblk_cache = kmem_cache_create(buffer, sizeof (dblk_t),
856 	    DBLK_CACHE_ALIGN, bcache_dblk_constructor, bcache_dblk_destructor,
857 						NULL, (void *)bcp, NULL, 0);
858 
859 	return (bcp);
860 }
861 
862 void
863 bcache_destroy(bcache_t *bcp)
864 {
865 	ASSERT(bcp != NULL);
866 
867 	mutex_enter(&bcp->mutex);
868 	if (bcp->alloc == 0) {
869 		kmem_cache_destroy(bcp->dblk_cache);
870 		kmem_cache_destroy(bcp->buffer_cache);
871 		mutex_exit(&bcp->mutex);
872 		mutex_destroy(&bcp->mutex);
873 		kmem_free(bcp, sizeof (bcache_t));
874 	} else {
875 		bcp->destroy++;
876 		mutex_exit(&bcp->mutex);
877 	}
878 }
879 
880 /*ARGSUSED*/
881 mblk_t *
882 bcache_allocb(bcache_t *bcp, uint_t pri)
883 {
884 	dblk_t *dbp;
885 	mblk_t *mp = NULL;
886 
887 	ASSERT(bcp != NULL);
888 
889 	mutex_enter(&bcp->mutex);
890 	if (bcp->destroy != 0) {
891 		mutex_exit(&bcp->mutex);
892 		goto out;
893 	}
894 
895 	if ((dbp = kmem_cache_alloc(bcp->dblk_cache, KM_NOSLEEP)) == NULL) {
896 		mutex_exit(&bcp->mutex);
897 		goto out;
898 	}
899 	bcp->alloc++;
900 	mutex_exit(&bcp->mutex);
901 
902 	ASSERT(((uintptr_t)(dbp->db_base) & (bcp->align - 1)) == 0);
903 
904 	mp = dbp->db_mblk;
905 	DBLK_RTFU_WORD(dbp) = DBLK_RTFU(1, M_DATA, 0, 0);
906 	mp->b_next = mp->b_prev = mp->b_cont = NULL;
907 	mp->b_rptr = mp->b_wptr = dbp->db_base;
908 	mp->b_queue = NULL;
909 	MBLK_BAND_FLAG_WORD(mp) = 0;
910 	STR_FTALLOC(&dbp->db_fthdr, FTEV_BCALLOCB, bcp->size);
911 out:
912 	FTRACE_1("bcache_allocb(): mp=0x%p", (uintptr_t)mp);
913 
914 	return (mp);
915 }
916 
917 static void
918 dblk_lastfree_oversize(mblk_t *mp, dblk_t *dbp)
919 {
920 	ASSERT(dbp->db_mblk == mp);
921 	if (dbp->db_fthdr != NULL)
922 		str_ftfree(dbp);
923 
924 	/* set credp and projid to be 'unspecified' before returning to cache */
925 	if (dbp->db_credp != NULL) {
926 		crfree(dbp->db_credp);
927 		dbp->db_credp = NULL;
928 	}
929 	dbp->db_cpid = -1;
930 	dbp->db_struioflag = 0;
931 	dbp->db_struioun.cksum.flags = 0;
932 
933 	kmem_free(dbp->db_base, dbp->db_lim - dbp->db_base);
934 	kmem_cache_free(dbp->db_cache, dbp);
935 }
936 
937 static mblk_t *
938 allocb_oversize(size_t size, int kmflags)
939 {
940 	mblk_t *mp;
941 	void *buf;
942 
943 	size = P2ROUNDUP(size, DBLK_CACHE_ALIGN);
944 	if ((buf = kmem_alloc(size, kmflags)) == NULL)
945 		return (NULL);
946 	if ((mp = gesballoc(buf, size, DBLK_RTFU(1, M_DATA, 0, 0),
947 	    &frnop, dblk_lastfree_oversize, kmflags)) == NULL)
948 		kmem_free(buf, size);
949 
950 	if (mp != NULL)
951 		STR_FTALLOC(&DB_FTHDR(mp), FTEV_ALLOCBIG, size);
952 
953 	return (mp);
954 }
955 
956 mblk_t *
957 allocb_tryhard(size_t target_size)
958 {
959 	size_t size;
960 	mblk_t *bp;
961 
962 	for (size = target_size; size < target_size + 512;
963 	    size += DBLK_CACHE_ALIGN)
964 		if ((bp = allocb(size, BPRI_HI)) != NULL)
965 			return (bp);
966 	allocb_tryhard_fails++;
967 	return (NULL);
968 }
969 
970 /*
971  * This routine is consolidation private for STREAMS internal use
972  * This routine may only be called from sync routines (i.e., not
973  * from put or service procedures).  It is located here (rather
974  * than strsubr.c) so that we don't have to expose all of the
975  * allocb() implementation details in header files.
976  */
977 mblk_t *
978 allocb_wait(size_t size, uint_t pri, uint_t flags, int *error)
979 {
980 	dblk_t *dbp;
981 	mblk_t *mp;
982 	size_t index;
983 
984 	index = (size -1) >> DBLK_SIZE_SHIFT;
985 
986 	if (flags & STR_NOSIG) {
987 		if (index >= (DBLK_MAX_CACHE >> DBLK_SIZE_SHIFT)) {
988 			if (size != 0) {
989 				mp = allocb_oversize(size, KM_SLEEP);
990 				FTRACE_1("allocb_wait (NOSIG): mp=0x%lx",
991 				    (uintptr_t)mp);
992 				return (mp);
993 			}
994 			index = 0;
995 		}
996 
997 		dbp = kmem_cache_alloc(dblk_cache[index], KM_SLEEP);
998 		mp = dbp->db_mblk;
999 		DBLK_RTFU_WORD(dbp) = DBLK_RTFU(1, M_DATA, 0, 0);
1000 		mp->b_next = mp->b_prev = mp->b_cont = NULL;
1001 		mp->b_rptr = mp->b_wptr = dbp->db_base;
1002 		mp->b_queue = NULL;
1003 		MBLK_BAND_FLAG_WORD(mp) = 0;
1004 		STR_FTALLOC(&DB_FTHDR(mp), FTEV_ALLOCBW, size);
1005 
1006 		FTRACE_1("allocb_wait (NOSIG): mp=0x%lx", (uintptr_t)mp);
1007 
1008 	} else {
1009 		while ((mp = allocb(size, pri)) == NULL) {
1010 			if ((*error = strwaitbuf(size, BPRI_HI)) != 0)
1011 				return (NULL);
1012 		}
1013 	}
1014 
1015 	return (mp);
1016 }
1017 
1018 /*
1019  * Call function 'func' with 'arg' when a class zero block can
1020  * be allocated with priority 'pri'.
1021  */
1022 bufcall_id_t
1023 esbbcall(uint_t pri, void (*func)(void *), void *arg)
1024 {
1025 	return (bufcall(1, pri, func, arg));
1026 }
1027 
1028 /*
1029  * Allocates an iocblk (M_IOCTL) block. Properly sets the credentials
1030  * ioc_id, rval and error of the struct ioctl to set up an ioctl call.
1031  * This provides consistency for all internal allocators of ioctl.
1032  */
1033 mblk_t *
1034 mkiocb(uint_t cmd)
1035 {
1036 	struct iocblk	*ioc;
1037 	mblk_t		*mp;
1038 
1039 	/*
1040 	 * Allocate enough space for any of the ioctl related messages.
1041 	 */
1042 	if ((mp = allocb(sizeof (union ioctypes), BPRI_MED)) == NULL)
1043 		return (NULL);
1044 
1045 	bzero(mp->b_rptr, sizeof (union ioctypes));
1046 
1047 	/*
1048 	 * Set the mblk_t information and ptrs correctly.
1049 	 */
1050 	mp->b_wptr += sizeof (struct iocblk);
1051 	mp->b_datap->db_type = M_IOCTL;
1052 
1053 	/*
1054 	 * Fill in the fields.
1055 	 */
1056 	ioc		= (struct iocblk *)mp->b_rptr;
1057 	ioc->ioc_cmd	= cmd;
1058 	ioc->ioc_cr	= kcred;
1059 	ioc->ioc_id	= getiocseqno();
1060 	ioc->ioc_flag	= IOC_NATIVE;
1061 	return (mp);
1062 }
1063 
1064 /*
1065  * test if block of given size can be allocated with a request of
1066  * the given priority.
1067  * 'pri' is no longer used, but is retained for compatibility.
1068  */
1069 /* ARGSUSED */
1070 int
1071 testb(size_t size, uint_t pri)
1072 {
1073 	return ((size + sizeof (dblk_t)) <= kmem_avail());
1074 }
1075 
1076 /*
1077  * Call function 'func' with argument 'arg' when there is a reasonably
1078  * good chance that a block of size 'size' can be allocated.
1079  * 'pri' is no longer used, but is retained for compatibility.
1080  */
1081 /* ARGSUSED */
1082 bufcall_id_t
1083 bufcall(size_t size, uint_t pri, void (*func)(void *), void *arg)
1084 {
1085 	static long bid = 1;	/* always odd to save checking for zero */
1086 	bufcall_id_t bc_id;
1087 	struct strbufcall *bcp;
1088 
1089 	if ((bcp = kmem_alloc(sizeof (strbufcall_t), KM_NOSLEEP)) == NULL)
1090 		return (0);
1091 
1092 	bcp->bc_func = func;
1093 	bcp->bc_arg = arg;
1094 	bcp->bc_size = size;
1095 	bcp->bc_next = NULL;
1096 	bcp->bc_executor = NULL;
1097 
1098 	mutex_enter(&strbcall_lock);
1099 	/*
1100 	 * After bcp is linked into strbcalls and strbcall_lock is dropped there
1101 	 * should be no references to bcp since it may be freed by
1102 	 * runbufcalls(). Since bcp_id field is returned, we save its value in
1103 	 * the local var.
1104 	 */
1105 	bc_id = bcp->bc_id = (bufcall_id_t)(bid += 2);	/* keep it odd */
1106 
1107 	/*
1108 	 * add newly allocated stream event to existing
1109 	 * linked list of events.
1110 	 */
1111 	if (strbcalls.bc_head == NULL) {
1112 		strbcalls.bc_head = strbcalls.bc_tail = bcp;
1113 	} else {
1114 		strbcalls.bc_tail->bc_next = bcp;
1115 		strbcalls.bc_tail = bcp;
1116 	}
1117 
1118 	cv_signal(&strbcall_cv);
1119 	mutex_exit(&strbcall_lock);
1120 	return (bc_id);
1121 }
1122 
1123 /*
1124  * Cancel a bufcall request.
1125  */
1126 void
1127 unbufcall(bufcall_id_t id)
1128 {
1129 	strbufcall_t *bcp, *pbcp;
1130 
1131 	mutex_enter(&strbcall_lock);
1132 again:
1133 	pbcp = NULL;
1134 	for (bcp = strbcalls.bc_head; bcp; bcp = bcp->bc_next) {
1135 		if (id == bcp->bc_id)
1136 			break;
1137 		pbcp = bcp;
1138 	}
1139 	if (bcp) {
1140 		if (bcp->bc_executor != NULL) {
1141 			if (bcp->bc_executor != curthread) {
1142 				cv_wait(&bcall_cv, &strbcall_lock);
1143 				goto again;
1144 			}
1145 		} else {
1146 			if (pbcp)
1147 				pbcp->bc_next = bcp->bc_next;
1148 			else
1149 				strbcalls.bc_head = bcp->bc_next;
1150 			if (bcp == strbcalls.bc_tail)
1151 				strbcalls.bc_tail = pbcp;
1152 			kmem_free(bcp, sizeof (strbufcall_t));
1153 		}
1154 	}
1155 	mutex_exit(&strbcall_lock);
1156 }
1157 
1158 /*
1159  * Duplicate a message block by block (uses dupb), returning
1160  * a pointer to the duplicate message.
1161  * Returns a non-NULL value only if the entire message
1162  * was dup'd.
1163  */
1164 mblk_t *
1165 dupmsg(mblk_t *bp)
1166 {
1167 	mblk_t *head, *nbp;
1168 
1169 	if (!bp || !(nbp = head = dupb(bp)))
1170 		return (NULL);
1171 
1172 	while (bp->b_cont) {
1173 		if (!(nbp->b_cont = dupb(bp->b_cont))) {
1174 			freemsg(head);
1175 			return (NULL);
1176 		}
1177 		nbp = nbp->b_cont;
1178 		bp = bp->b_cont;
1179 	}
1180 	return (head);
1181 }
1182 
1183 #define	DUPB_NOLOAN(bp) \
1184 	((((bp)->b_datap->db_struioflag & STRUIO_ZC) != 0) ? \
1185 	copyb((bp)) : dupb((bp)))
1186 
1187 mblk_t *
1188 dupmsg_noloan(mblk_t *bp)
1189 {
1190 	mblk_t *head, *nbp;
1191 
1192 	if (bp == NULL || DB_TYPE(bp) != M_DATA ||
1193 	    ((nbp = head = DUPB_NOLOAN(bp)) == NULL))
1194 		return (NULL);
1195 
1196 	while (bp->b_cont) {
1197 		if ((nbp->b_cont = DUPB_NOLOAN(bp->b_cont)) == NULL) {
1198 			freemsg(head);
1199 			return (NULL);
1200 		}
1201 		nbp = nbp->b_cont;
1202 		bp = bp->b_cont;
1203 	}
1204 	return (head);
1205 }
1206 
1207 /*
1208  * Copy data from message and data block to newly allocated message and
1209  * data block. Returns new message block pointer, or NULL if error.
1210  * The alignment of rptr (w.r.t. word alignment) will be the same in the copy
1211  * as in the original even when db_base is not word aligned. (bug 1052877)
1212  */
1213 mblk_t *
1214 copyb(mblk_t *bp)
1215 {
1216 	mblk_t	*nbp;
1217 	dblk_t	*dp, *ndp;
1218 	uchar_t *base;
1219 	size_t	size;
1220 	size_t	unaligned;
1221 
1222 	ASSERT(bp->b_wptr >= bp->b_rptr);
1223 
1224 	dp = bp->b_datap;
1225 	if (dp->db_fthdr != NULL)
1226 		STR_FTEVENT_MBLK(bp, caller(), FTEV_COPYB, 0);
1227 
1228 	/*
1229 	 * Special handling for Multidata message; this should be
1230 	 * removed once a copy-callback routine is made available.
1231 	 */
1232 	if (dp->db_type == M_MULTIDATA) {
1233 		cred_t *cr;
1234 
1235 		if ((nbp = mmd_copy(bp, KM_NOSLEEP)) == NULL)
1236 			return (NULL);
1237 
1238 		nbp->b_flag = bp->b_flag;
1239 		nbp->b_band = bp->b_band;
1240 		ndp = nbp->b_datap;
1241 
1242 		/* See comments below on potential issues. */
1243 		STR_FTEVENT_MBLK(nbp, caller(), FTEV_COPYB, 1);
1244 
1245 		ASSERT(ndp->db_type == dp->db_type);
1246 		cr = dp->db_credp;
1247 		if (cr != NULL)
1248 			crhold(ndp->db_credp = cr);
1249 		ndp->db_cpid = dp->db_cpid;
1250 		return (nbp);
1251 	}
1252 
1253 	size = dp->db_lim - dp->db_base;
1254 	unaligned = P2PHASE((uintptr_t)dp->db_base, sizeof (uint_t));
1255 	if ((nbp = allocb_tmpl(size + unaligned, bp)) == NULL)
1256 		return (NULL);
1257 	nbp->b_flag = bp->b_flag;
1258 	nbp->b_band = bp->b_band;
1259 	ndp = nbp->b_datap;
1260 
1261 	/*
1262 	 * Well, here is a potential issue.  If we are trying to
1263 	 * trace a flow, and we copy the message, we might lose
1264 	 * information about where this message might have been.
1265 	 * So we should inherit the FT data.  On the other hand,
1266 	 * a user might be interested only in alloc to free data.
1267 	 * So I guess the real answer is to provide a tunable.
1268 	 */
1269 	STR_FTEVENT_MBLK(nbp, caller(), FTEV_COPYB, 1);
1270 
1271 	base = ndp->db_base + unaligned;
1272 	bcopy(dp->db_base, ndp->db_base + unaligned, size);
1273 
1274 	nbp->b_rptr = base + (bp->b_rptr - dp->db_base);
1275 	nbp->b_wptr = nbp->b_rptr + MBLKL(bp);
1276 
1277 	return (nbp);
1278 }
1279 
1280 /*
1281  * Copy data from message to newly allocated message using new
1282  * data blocks.  Returns a pointer to the new message, or NULL if error.
1283  */
1284 mblk_t *
1285 copymsg(mblk_t *bp)
1286 {
1287 	mblk_t *head, *nbp;
1288 
1289 	if (!bp || !(nbp = head = copyb(bp)))
1290 		return (NULL);
1291 
1292 	while (bp->b_cont) {
1293 		if (!(nbp->b_cont = copyb(bp->b_cont))) {
1294 			freemsg(head);
1295 			return (NULL);
1296 		}
1297 		nbp = nbp->b_cont;
1298 		bp = bp->b_cont;
1299 	}
1300 	return (head);
1301 }
1302 
1303 /*
1304  * link a message block to tail of message
1305  */
1306 void
1307 linkb(mblk_t *mp, mblk_t *bp)
1308 {
1309 	ASSERT(mp && bp);
1310 
1311 	for (; mp->b_cont; mp = mp->b_cont)
1312 		;
1313 	mp->b_cont = bp;
1314 }
1315 
1316 /*
1317  * unlink a message block from head of message
1318  * return pointer to new message.
1319  * NULL if message becomes empty.
1320  */
1321 mblk_t *
1322 unlinkb(mblk_t *bp)
1323 {
1324 	mblk_t *bp1;
1325 
1326 	bp1 = bp->b_cont;
1327 	bp->b_cont = NULL;
1328 	return (bp1);
1329 }
1330 
1331 /*
1332  * remove a message block "bp" from message "mp"
1333  *
1334  * Return pointer to new message or NULL if no message remains.
1335  * Return -1 if bp is not found in message.
1336  */
1337 mblk_t *
1338 rmvb(mblk_t *mp, mblk_t *bp)
1339 {
1340 	mblk_t *tmp;
1341 	mblk_t *lastp = NULL;
1342 
1343 	ASSERT(mp && bp);
1344 	for (tmp = mp; tmp; tmp = tmp->b_cont) {
1345 		if (tmp == bp) {
1346 			if (lastp)
1347 				lastp->b_cont = tmp->b_cont;
1348 			else
1349 				mp = tmp->b_cont;
1350 			tmp->b_cont = NULL;
1351 			return (mp);
1352 		}
1353 		lastp = tmp;
1354 	}
1355 	return ((mblk_t *)-1);
1356 }
1357 
1358 /*
1359  * Concatenate and align first len bytes of common
1360  * message type.  Len == -1, means concat everything.
1361  * Returns 1 on success, 0 on failure
1362  * After the pullup, mp points to the pulled up data.
1363  */
1364 int
1365 pullupmsg(mblk_t *mp, ssize_t len)
1366 {
1367 	mblk_t *bp, *b_cont;
1368 	dblk_t *dbp;
1369 	ssize_t n;
1370 
1371 	ASSERT(mp->b_datap->db_ref > 0);
1372 	ASSERT(mp->b_next == NULL && mp->b_prev == NULL);
1373 
1374 	/*
1375 	 * We won't handle Multidata message, since it contains
1376 	 * metadata which this function has no knowledge of; we
1377 	 * assert on DEBUG, and return failure otherwise.
1378 	 */
1379 	ASSERT(mp->b_datap->db_type != M_MULTIDATA);
1380 	if (mp->b_datap->db_type == M_MULTIDATA)
1381 		return (0);
1382 
1383 	if (len == -1) {
1384 		if (mp->b_cont == NULL && str_aligned(mp->b_rptr))
1385 			return (1);
1386 		len = xmsgsize(mp);
1387 	} else {
1388 		ssize_t first_mblk_len = mp->b_wptr - mp->b_rptr;
1389 		ASSERT(first_mblk_len >= 0);
1390 		/*
1391 		 * If the length is less than that of the first mblk,
1392 		 * we want to pull up the message into an aligned mblk.
1393 		 * Though not part of the spec, some callers assume it.
1394 		 */
1395 		if (len <= first_mblk_len) {
1396 			if (str_aligned(mp->b_rptr))
1397 				return (1);
1398 			len = first_mblk_len;
1399 		} else if (xmsgsize(mp) < len)
1400 			return (0);
1401 	}
1402 
1403 	if ((bp = allocb_tmpl(len, mp)) == NULL)
1404 		return (0);
1405 
1406 	dbp = bp->b_datap;
1407 	*bp = *mp;		/* swap mblks so bp heads the old msg... */
1408 	mp->b_datap = dbp;	/* ... and mp heads the new message */
1409 	mp->b_datap->db_mblk = mp;
1410 	bp->b_datap->db_mblk = bp;
1411 	mp->b_rptr = mp->b_wptr = dbp->db_base;
1412 
1413 	do {
1414 		ASSERT(bp->b_datap->db_ref > 0);
1415 		ASSERT(bp->b_wptr >= bp->b_rptr);
1416 		n = MIN(bp->b_wptr - bp->b_rptr, len);
1417 		bcopy(bp->b_rptr, mp->b_wptr, (size_t)n);
1418 		mp->b_wptr += n;
1419 		bp->b_rptr += n;
1420 		len -= n;
1421 		if (bp->b_rptr != bp->b_wptr)
1422 			break;
1423 		b_cont = bp->b_cont;
1424 		freeb(bp);
1425 		bp = b_cont;
1426 	} while (len && bp);
1427 
1428 	mp->b_cont = bp;	/* tack on whatever wasn't pulled up */
1429 
1430 	return (1);
1431 }
1432 
1433 /*
1434  * Concatenate and align at least the first len bytes of common message
1435  * type.  Len == -1 means concatenate everything.  The original message is
1436  * unaltered.  Returns a pointer to a new message on success, otherwise
1437  * returns NULL.
1438  */
1439 mblk_t *
1440 msgpullup(mblk_t *mp, ssize_t len)
1441 {
1442 	mblk_t	*newmp;
1443 	ssize_t	totlen;
1444 	ssize_t	n;
1445 
1446 	/*
1447 	 * We won't handle Multidata message, since it contains
1448 	 * metadata which this function has no knowledge of; we
1449 	 * assert on DEBUG, and return failure otherwise.
1450 	 */
1451 	ASSERT(mp->b_datap->db_type != M_MULTIDATA);
1452 	if (mp->b_datap->db_type == M_MULTIDATA)
1453 		return (NULL);
1454 
1455 	totlen = xmsgsize(mp);
1456 
1457 	if ((len > 0) && (len > totlen))
1458 		return (NULL);
1459 
1460 	/*
1461 	 * Copy all of the first msg type into one new mblk, then dupmsg
1462 	 * and link the rest onto this.
1463 	 */
1464 
1465 	len = totlen;
1466 
1467 	if ((newmp = allocb_tmpl(len, mp)) == NULL)
1468 		return (NULL);
1469 
1470 	newmp->b_flag = mp->b_flag;
1471 	newmp->b_band = mp->b_band;
1472 
1473 	while (len > 0) {
1474 		n = mp->b_wptr - mp->b_rptr;
1475 		ASSERT(n >= 0);		/* allow zero-length mblk_t's */
1476 		if (n > 0)
1477 			bcopy(mp->b_rptr, newmp->b_wptr, n);
1478 		newmp->b_wptr += n;
1479 		len -= n;
1480 		mp = mp->b_cont;
1481 	}
1482 
1483 	if (mp != NULL) {
1484 		newmp->b_cont = dupmsg(mp);
1485 		if (newmp->b_cont == NULL) {
1486 			freemsg(newmp);
1487 			return (NULL);
1488 		}
1489 	}
1490 
1491 	return (newmp);
1492 }
1493 
1494 /*
1495  * Trim bytes from message
1496  *  len > 0, trim from head
1497  *  len < 0, trim from tail
1498  * Returns 1 on success, 0 on failure.
1499  */
1500 int
1501 adjmsg(mblk_t *mp, ssize_t len)
1502 {
1503 	mblk_t *bp;
1504 	mblk_t *save_bp = NULL;
1505 	mblk_t *prev_bp;
1506 	mblk_t *bcont;
1507 	unsigned char type;
1508 	ssize_t n;
1509 	int fromhead;
1510 	int first;
1511 
1512 	ASSERT(mp != NULL);
1513 	/*
1514 	 * We won't handle Multidata message, since it contains
1515 	 * metadata which this function has no knowledge of; we
1516 	 * assert on DEBUG, and return failure otherwise.
1517 	 */
1518 	ASSERT(mp->b_datap->db_type != M_MULTIDATA);
1519 	if (mp->b_datap->db_type == M_MULTIDATA)
1520 		return (0);
1521 
1522 	if (len < 0) {
1523 		fromhead = 0;
1524 		len = -len;
1525 	} else {
1526 		fromhead = 1;
1527 	}
1528 
1529 	if (xmsgsize(mp) < len)
1530 		return (0);
1531 
1532 
1533 	if (fromhead) {
1534 		first = 1;
1535 		while (len) {
1536 			ASSERT(mp->b_wptr >= mp->b_rptr);
1537 			n = MIN(mp->b_wptr - mp->b_rptr, len);
1538 			mp->b_rptr += n;
1539 			len -= n;
1540 
1541 			/*
1542 			 * If this is not the first zero length
1543 			 * message remove it
1544 			 */
1545 			if (!first && (mp->b_wptr == mp->b_rptr)) {
1546 				bcont = mp->b_cont;
1547 				freeb(mp);
1548 				mp = save_bp->b_cont = bcont;
1549 			} else {
1550 				save_bp = mp;
1551 				mp = mp->b_cont;
1552 			}
1553 			first = 0;
1554 		}
1555 	} else {
1556 		type = mp->b_datap->db_type;
1557 		while (len) {
1558 			bp = mp;
1559 			save_bp = NULL;
1560 
1561 			/*
1562 			 * Find the last message of same type
1563 			 */
1564 
1565 			while (bp && bp->b_datap->db_type == type) {
1566 				ASSERT(bp->b_wptr >= bp->b_rptr);
1567 				prev_bp = save_bp;
1568 				save_bp = bp;
1569 				bp = bp->b_cont;
1570 			}
1571 			if (save_bp == NULL)
1572 				break;
1573 			n = MIN(save_bp->b_wptr - save_bp->b_rptr, len);
1574 			save_bp->b_wptr -= n;
1575 			len -= n;
1576 
1577 			/*
1578 			 * If this is not the first message
1579 			 * and we have taken away everything
1580 			 * from this message, remove it
1581 			 */
1582 
1583 			if ((save_bp != mp) &&
1584 				(save_bp->b_wptr == save_bp->b_rptr)) {
1585 				bcont = save_bp->b_cont;
1586 				freeb(save_bp);
1587 				prev_bp->b_cont = bcont;
1588 			}
1589 		}
1590 	}
1591 	return (1);
1592 }
1593 
1594 /*
1595  * get number of data bytes in message
1596  */
1597 size_t
1598 msgdsize(mblk_t *bp)
1599 {
1600 	size_t count = 0;
1601 
1602 	for (; bp; bp = bp->b_cont)
1603 		if (bp->b_datap->db_type == M_DATA) {
1604 			ASSERT(bp->b_wptr >= bp->b_rptr);
1605 			count += bp->b_wptr - bp->b_rptr;
1606 		}
1607 	return (count);
1608 }
1609 
1610 /*
1611  * Get a message off head of queue
1612  *
1613  * If queue has no buffers then mark queue
1614  * with QWANTR. (queue wants to be read by
1615  * someone when data becomes available)
1616  *
1617  * If there is something to take off then do so.
1618  * If queue falls below hi water mark turn off QFULL
1619  * flag.  Decrement weighted count of queue.
1620  * Also turn off QWANTR because queue is being read.
1621  *
1622  * The queue count is maintained on a per-band basis.
1623  * Priority band 0 (normal messages) uses q_count,
1624  * q_lowat, etc.  Non-zero priority bands use the
1625  * fields in their respective qband structures
1626  * (qb_count, qb_lowat, etc.)  All messages appear
1627  * on the same list, linked via their b_next pointers.
1628  * q_first is the head of the list.  q_count does
1629  * not reflect the size of all the messages on the
1630  * queue.  It only reflects those messages in the
1631  * normal band of flow.  The one exception to this
1632  * deals with high priority messages.  They are in
1633  * their own conceptual "band", but are accounted
1634  * against q_count.
1635  *
1636  * If queue count is below the lo water mark and QWANTW
1637  * is set, enable the closest backq which has a service
1638  * procedure and turn off the QWANTW flag.
1639  *
1640  * getq could be built on top of rmvq, but isn't because
1641  * of performance considerations.
1642  *
1643  * A note on the use of q_count and q_mblkcnt:
1644  *   q_count is the traditional byte count for messages that
1645  *   have been put on a queue.  Documentation tells us that
1646  *   we shouldn't rely on that count, but some drivers/modules
1647  *   do.  What was needed, however, is a mechanism to prevent
1648  *   runaway streams from consuming all of the resources,
1649  *   and particularly be able to flow control zero-length
1650  *   messages.  q_mblkcnt is used for this purpose.  It
1651  *   counts the number of mblk's that are being put on
1652  *   the queue.  The intention here, is that each mblk should
1653  *   contain one byte of data and, for the purpose of
1654  *   flow-control, logically does.  A queue will become
1655  *   full when EITHER of these values (q_count and q_mblkcnt)
1656  *   reach the highwater mark.  It will clear when BOTH
1657  *   of them drop below the highwater mark.  And it will
1658  *   backenable when BOTH of them drop below the lowwater
1659  *   mark.
1660  *   With this algorithm, a driver/module might be able
1661  *   to find a reasonably accurate q_count, and the
1662  *   framework can still try and limit resource usage.
1663  */
1664 mblk_t *
1665 getq(queue_t *q)
1666 {
1667 	mblk_t *bp;
1668 	uchar_t band = 0;
1669 
1670 	bp = getq_noenab(q);
1671 	if (bp != NULL)
1672 		band = bp->b_band;
1673 
1674 	/*
1675 	 * Inlined from qbackenable().
1676 	 * Quick check without holding the lock.
1677 	 */
1678 	if (band == 0 && (q->q_flag & (QWANTW|QWANTWSYNC)) == 0)
1679 		return (bp);
1680 
1681 	qbackenable(q, band);
1682 	return (bp);
1683 }
1684 
1685 /*
1686  * Calculate number of data bytes in a single data message block taking
1687  * multidata messages into account.
1688  */
1689 
1690 #define	ADD_MBLK_SIZE(mp, size) 					\
1691 	if (DB_TYPE(mp) != M_MULTIDATA) {				\
1692 		(size) += MBLKL(mp);					\
1693 	} else {							\
1694 		uint_t	pinuse;						\
1695 									\
1696 		mmd_getsize(mmd_getmultidata(mp), NULL, &pinuse);	\
1697 		(size) += pinuse;					\
1698 	}
1699 
1700 /*
1701  * Like getq() but does not backenable.  This is used by the stream
1702  * head when a putback() is likely.  The caller must call qbackenable()
1703  * after it is done with accessing the queue.
1704  */
1705 mblk_t *
1706 getq_noenab(queue_t *q)
1707 {
1708 	mblk_t *bp;
1709 	mblk_t *tmp;
1710 	qband_t *qbp;
1711 	kthread_id_t freezer;
1712 	int	bytecnt = 0, mblkcnt = 0;
1713 
1714 	/* freezestr should allow its caller to call getq/putq */
1715 	freezer = STREAM(q)->sd_freezer;
1716 	if (freezer == curthread) {
1717 		ASSERT(frozenstr(q));
1718 		ASSERT(MUTEX_HELD(QLOCK(q)));
1719 	} else
1720 		mutex_enter(QLOCK(q));
1721 
1722 	if ((bp = q->q_first) == 0) {
1723 		q->q_flag |= QWANTR;
1724 	} else {
1725 		if ((q->q_first = bp->b_next) == NULL)
1726 			q->q_last = NULL;
1727 		else
1728 			q->q_first->b_prev = NULL;
1729 
1730 		/* Get message byte count for q_count accounting */
1731 		for (tmp = bp; tmp; tmp = tmp->b_cont) {
1732 			ADD_MBLK_SIZE(tmp, bytecnt);
1733 			mblkcnt++;
1734 		}
1735 
1736 		if (bp->b_band == 0) {
1737 			q->q_count -= bytecnt;
1738 			q->q_mblkcnt -= mblkcnt;
1739 			if ((q->q_count < q->q_hiwat) &&
1740 			    (q->q_mblkcnt < q->q_hiwat)) {
1741 				q->q_flag &= ~QFULL;
1742 			}
1743 		} else {
1744 			int i;
1745 
1746 			ASSERT(bp->b_band <= q->q_nband);
1747 			ASSERT(q->q_bandp != NULL);
1748 			ASSERT(MUTEX_HELD(QLOCK(q)));
1749 			qbp = q->q_bandp;
1750 			i = bp->b_band;
1751 			while (--i > 0)
1752 				qbp = qbp->qb_next;
1753 			if (qbp->qb_first == qbp->qb_last) {
1754 				qbp->qb_first = NULL;
1755 				qbp->qb_last = NULL;
1756 			} else {
1757 				qbp->qb_first = bp->b_next;
1758 			}
1759 			qbp->qb_count -= bytecnt;
1760 			qbp->qb_mblkcnt -= mblkcnt;
1761 			if ((qbp->qb_count < qbp->qb_hiwat) &&
1762 			    (qbp->qb_mblkcnt < qbp->qb_hiwat)) {
1763 				qbp->qb_flag &= ~QB_FULL;
1764 			}
1765 		}
1766 		q->q_flag &= ~QWANTR;
1767 		bp->b_next = NULL;
1768 		bp->b_prev = NULL;
1769 	}
1770 	if (freezer != curthread)
1771 		mutex_exit(QLOCK(q));
1772 
1773 	STR_FTEVENT_MSG(bp, q, FTEV_GETQ, NULL);
1774 
1775 	return (bp);
1776 }
1777 
1778 /*
1779  * Determine if a backenable is needed after removing a message in the
1780  * specified band.
1781  * NOTE: This routine assumes that something like getq_noenab() has been
1782  * already called.
1783  *
1784  * For the read side it is ok to hold sd_lock across calling this (and the
1785  * stream head often does).
1786  * But for the write side strwakeq might be invoked and it acquires sd_lock.
1787  */
1788 void
1789 qbackenable(queue_t *q, uchar_t band)
1790 {
1791 	int backenab = 0;
1792 	qband_t *qbp;
1793 	kthread_id_t freezer;
1794 
1795 	ASSERT(q);
1796 	ASSERT((q->q_flag & QREADR) || MUTEX_NOT_HELD(&STREAM(q)->sd_lock));
1797 
1798 	/*
1799 	 * Quick check without holding the lock.
1800 	 * OK since after getq() has lowered the q_count these flags
1801 	 * would not change unless either the qbackenable() is done by
1802 	 * another thread (which is ok) or the queue has gotten QFULL
1803 	 * in which case another backenable will take place when the queue
1804 	 * drops below q_lowat.
1805 	 */
1806 	if (band == 0 && (q->q_flag & (QWANTW|QWANTWSYNC)) == 0)
1807 		return;
1808 
1809 	/* freezestr should allow its caller to call getq/putq */
1810 	freezer = STREAM(q)->sd_freezer;
1811 	if (freezer == curthread) {
1812 		ASSERT(frozenstr(q));
1813 		ASSERT(MUTEX_HELD(QLOCK(q)));
1814 	} else
1815 		mutex_enter(QLOCK(q));
1816 
1817 	if (band == 0) {
1818 		if (q->q_lowat == 0 || (q->q_count < q->q_lowat &&
1819 		    q->q_mblkcnt < q->q_lowat)) {
1820 			backenab = q->q_flag & (QWANTW|QWANTWSYNC);
1821 		}
1822 	} else {
1823 		int i;
1824 
1825 		ASSERT((unsigned)band <= q->q_nband);
1826 		ASSERT(q->q_bandp != NULL);
1827 
1828 		qbp = q->q_bandp;
1829 		i = band;
1830 		while (--i > 0)
1831 			qbp = qbp->qb_next;
1832 
1833 		if (qbp->qb_lowat == 0 || (qbp->qb_count < qbp->qb_lowat &&
1834 		    qbp->qb_mblkcnt < qbp->qb_lowat)) {
1835 			backenab = qbp->qb_flag & QB_WANTW;
1836 		}
1837 	}
1838 
1839 	if (backenab == 0) {
1840 		if (freezer != curthread)
1841 			mutex_exit(QLOCK(q));
1842 		return;
1843 	}
1844 
1845 	/* Have to drop the lock across strwakeq and backenable */
1846 	if (backenab & QWANTWSYNC)
1847 		q->q_flag &= ~QWANTWSYNC;
1848 	if (backenab & (QWANTW|QB_WANTW)) {
1849 		if (band != 0)
1850 			qbp->qb_flag &= ~QB_WANTW;
1851 		else {
1852 			q->q_flag &= ~QWANTW;
1853 		}
1854 	}
1855 
1856 	if (freezer != curthread)
1857 		mutex_exit(QLOCK(q));
1858 
1859 	if (backenab & QWANTWSYNC)
1860 		strwakeq(q, QWANTWSYNC);
1861 	if (backenab & (QWANTW|QB_WANTW))
1862 		backenable(q, band);
1863 }
1864 
1865 /*
1866  * Remove a message from a queue.  The queue count and other
1867  * flow control parameters are adjusted and the back queue
1868  * enabled if necessary.
1869  *
1870  * rmvq can be called with the stream frozen, but other utility functions
1871  * holding QLOCK, and by streams modules without any locks/frozen.
1872  */
1873 void
1874 rmvq(queue_t *q, mblk_t *mp)
1875 {
1876 	ASSERT(mp != NULL);
1877 
1878 	rmvq_noenab(q, mp);
1879 	if (curthread != STREAM(q)->sd_freezer && MUTEX_HELD(QLOCK(q))) {
1880 		/*
1881 		 * qbackenable can handle a frozen stream but not a "random"
1882 		 * qlock being held. Drop lock across qbackenable.
1883 		 */
1884 		mutex_exit(QLOCK(q));
1885 		qbackenable(q, mp->b_band);
1886 		mutex_enter(QLOCK(q));
1887 	} else {
1888 		qbackenable(q, mp->b_band);
1889 	}
1890 }
1891 
1892 /*
1893  * Like rmvq() but without any backenabling.
1894  * This exists to handle SR_CONSOL_DATA in strrput().
1895  */
1896 void
1897 rmvq_noenab(queue_t *q, mblk_t *mp)
1898 {
1899 	mblk_t *tmp;
1900 	int i;
1901 	qband_t *qbp = NULL;
1902 	kthread_id_t freezer;
1903 	int	bytecnt = 0, mblkcnt = 0;
1904 
1905 	freezer = STREAM(q)->sd_freezer;
1906 	if (freezer == curthread) {
1907 		ASSERT(frozenstr(q));
1908 		ASSERT(MUTEX_HELD(QLOCK(q)));
1909 	} else if (MUTEX_HELD(QLOCK(q))) {
1910 		/* Don't drop lock on exit */
1911 		freezer = curthread;
1912 	} else
1913 		mutex_enter(QLOCK(q));
1914 
1915 	ASSERT(mp->b_band <= q->q_nband);
1916 	if (mp->b_band != 0) {		/* Adjust band pointers */
1917 		ASSERT(q->q_bandp != NULL);
1918 		qbp = q->q_bandp;
1919 		i = mp->b_band;
1920 		while (--i > 0)
1921 			qbp = qbp->qb_next;
1922 		if (mp == qbp->qb_first) {
1923 			if (mp->b_next && mp->b_band == mp->b_next->b_band)
1924 				qbp->qb_first = mp->b_next;
1925 			else
1926 				qbp->qb_first = NULL;
1927 		}
1928 		if (mp == qbp->qb_last) {
1929 			if (mp->b_prev && mp->b_band == mp->b_prev->b_band)
1930 				qbp->qb_last = mp->b_prev;
1931 			else
1932 				qbp->qb_last = NULL;
1933 		}
1934 	}
1935 
1936 	/*
1937 	 * Remove the message from the list.
1938 	 */
1939 	if (mp->b_prev)
1940 		mp->b_prev->b_next = mp->b_next;
1941 	else
1942 		q->q_first = mp->b_next;
1943 	if (mp->b_next)
1944 		mp->b_next->b_prev = mp->b_prev;
1945 	else
1946 		q->q_last = mp->b_prev;
1947 	mp->b_next = NULL;
1948 	mp->b_prev = NULL;
1949 
1950 	/* Get the size of the message for q_count accounting */
1951 	for (tmp = mp; tmp; tmp = tmp->b_cont) {
1952 		ADD_MBLK_SIZE(tmp, bytecnt);
1953 		mblkcnt++;
1954 	}
1955 
1956 	if (mp->b_band == 0) {		/* Perform q_count accounting */
1957 		q->q_count -= bytecnt;
1958 		q->q_mblkcnt -= mblkcnt;
1959 		if ((q->q_count < q->q_hiwat) &&
1960 		    (q->q_mblkcnt < q->q_hiwat)) {
1961 			q->q_flag &= ~QFULL;
1962 		}
1963 	} else {			/* Perform qb_count accounting */
1964 		qbp->qb_count -= bytecnt;
1965 		qbp->qb_mblkcnt -= mblkcnt;
1966 		if ((qbp->qb_count < qbp->qb_hiwat) &&
1967 		    (qbp->qb_mblkcnt < qbp->qb_hiwat)) {
1968 			qbp->qb_flag &= ~QB_FULL;
1969 		}
1970 	}
1971 	if (freezer != curthread)
1972 		mutex_exit(QLOCK(q));
1973 
1974 	STR_FTEVENT_MSG(mp, q, FTEV_RMVQ, NULL);
1975 }
1976 
1977 /*
1978  * Empty a queue.
1979  * If flag is set, remove all messages.  Otherwise, remove
1980  * only non-control messages.  If queue falls below its low
1981  * water mark, and QWANTW is set, enable the nearest upstream
1982  * service procedure.
1983  *
1984  * Historical note: when merging the M_FLUSH code in strrput with this
1985  * code one difference was discovered. flushq did not have a check
1986  * for q_lowat == 0 in the backenabling test.
1987  *
1988  * pcproto_flag specifies whether or not a M_PCPROTO message should be flushed
1989  * if one exists on the queue.
1990  */
1991 void
1992 flushq_common(queue_t *q, int flag, int pcproto_flag)
1993 {
1994 	mblk_t *mp, *nmp;
1995 	qband_t *qbp;
1996 	int backenab = 0;
1997 	unsigned char bpri;
1998 	unsigned char	qbf[NBAND];	/* band flushing backenable flags */
1999 
2000 	if (q->q_first == NULL)
2001 		return;
2002 
2003 	mutex_enter(QLOCK(q));
2004 	mp = q->q_first;
2005 	q->q_first = NULL;
2006 	q->q_last = NULL;
2007 	q->q_count = 0;
2008 	q->q_mblkcnt = 0;
2009 	for (qbp = q->q_bandp; qbp; qbp = qbp->qb_next) {
2010 		qbp->qb_first = NULL;
2011 		qbp->qb_last = NULL;
2012 		qbp->qb_count = 0;
2013 		qbp->qb_mblkcnt = 0;
2014 		qbp->qb_flag &= ~QB_FULL;
2015 	}
2016 	q->q_flag &= ~QFULL;
2017 	mutex_exit(QLOCK(q));
2018 	while (mp) {
2019 		nmp = mp->b_next;
2020 		mp->b_next = mp->b_prev = NULL;
2021 
2022 		STR_FTEVENT_MBLK(mp, q, FTEV_FLUSHQ, NULL);
2023 
2024 		if (pcproto_flag && (mp->b_datap->db_type == M_PCPROTO))
2025 			(void) putq(q, mp);
2026 		else if (flag || datamsg(mp->b_datap->db_type))
2027 			freemsg(mp);
2028 		else
2029 			(void) putq(q, mp);
2030 		mp = nmp;
2031 	}
2032 	bpri = 1;
2033 	mutex_enter(QLOCK(q));
2034 	for (qbp = q->q_bandp; qbp; qbp = qbp->qb_next) {
2035 		if ((qbp->qb_flag & QB_WANTW) &&
2036 		    (((qbp->qb_count < qbp->qb_lowat) &&
2037 		    (qbp->qb_mblkcnt < qbp->qb_lowat)) ||
2038 		    qbp->qb_lowat == 0)) {
2039 			qbp->qb_flag &= ~QB_WANTW;
2040 			backenab = 1;
2041 			qbf[bpri] = 1;
2042 		} else
2043 			qbf[bpri] = 0;
2044 		bpri++;
2045 	}
2046 	ASSERT(bpri == (unsigned char)(q->q_nband + 1));
2047 	if ((q->q_flag & QWANTW) &&
2048 	    (((q->q_count < q->q_lowat) &&
2049 	    (q->q_mblkcnt < q->q_lowat)) || q->q_lowat == 0)) {
2050 		q->q_flag &= ~QWANTW;
2051 		backenab = 1;
2052 		qbf[0] = 1;
2053 	} else
2054 		qbf[0] = 0;
2055 
2056 	/*
2057 	 * If any band can now be written to, and there is a writer
2058 	 * for that band, then backenable the closest service procedure.
2059 	 */
2060 	if (backenab) {
2061 		mutex_exit(QLOCK(q));
2062 		for (bpri = q->q_nband; bpri != 0; bpri--)
2063 			if (qbf[bpri])
2064 				backenable(q, bpri);
2065 		if (qbf[0])
2066 			backenable(q, 0);
2067 	} else
2068 		mutex_exit(QLOCK(q));
2069 }
2070 
2071 /*
2072  * The real flushing takes place in flushq_common. This is done so that
2073  * a flag which specifies whether or not M_PCPROTO messages should be flushed
2074  * or not. Currently the only place that uses this flag is the stream head.
2075  */
2076 void
2077 flushq(queue_t *q, int flag)
2078 {
2079 	flushq_common(q, flag, 0);
2080 }
2081 
2082 /*
2083  * Flush the queue of messages of the given priority band.
2084  * There is some duplication of code between flushq and flushband.
2085  * This is because we want to optimize the code as much as possible.
2086  * The assumption is that there will be more messages in the normal
2087  * (priority 0) band than in any other.
2088  *
2089  * Historical note: when merging the M_FLUSH code in strrput with this
2090  * code one difference was discovered. flushband had an extra check for
2091  * did not have a check for (mp->b_datap->db_type < QPCTL) in the band 0
2092  * case. That check does not match the man page for flushband and was not
2093  * in the strrput flush code hence it was removed.
2094  */
2095 void
2096 flushband(queue_t *q, unsigned char pri, int flag)
2097 {
2098 	mblk_t *mp;
2099 	mblk_t *nmp;
2100 	mblk_t *last;
2101 	qband_t *qbp;
2102 	int band;
2103 
2104 	ASSERT((flag == FLUSHDATA) || (flag == FLUSHALL));
2105 	if (pri > q->q_nband) {
2106 		return;
2107 	}
2108 	mutex_enter(QLOCK(q));
2109 	if (pri == 0) {
2110 		mp = q->q_first;
2111 		q->q_first = NULL;
2112 		q->q_last = NULL;
2113 		q->q_count = 0;
2114 		q->q_mblkcnt = 0;
2115 		for (qbp = q->q_bandp; qbp; qbp = qbp->qb_next) {
2116 			qbp->qb_first = NULL;
2117 			qbp->qb_last = NULL;
2118 			qbp->qb_count = 0;
2119 			qbp->qb_mblkcnt = 0;
2120 			qbp->qb_flag &= ~QB_FULL;
2121 		}
2122 		q->q_flag &= ~QFULL;
2123 		mutex_exit(QLOCK(q));
2124 		while (mp) {
2125 			nmp = mp->b_next;
2126 			mp->b_next = mp->b_prev = NULL;
2127 			if ((mp->b_band == 0) &&
2128 				((flag == FLUSHALL) ||
2129 				datamsg(mp->b_datap->db_type)))
2130 				freemsg(mp);
2131 			else
2132 				(void) putq(q, mp);
2133 			mp = nmp;
2134 		}
2135 		mutex_enter(QLOCK(q));
2136 		if ((q->q_flag & QWANTW) &&
2137 		    (((q->q_count < q->q_lowat) &&
2138 		    (q->q_mblkcnt < q->q_lowat)) || q->q_lowat == 0)) {
2139 			q->q_flag &= ~QWANTW;
2140 			mutex_exit(QLOCK(q));
2141 
2142 			backenable(q, pri);
2143 		} else
2144 			mutex_exit(QLOCK(q));
2145 	} else {	/* pri != 0 */
2146 		boolean_t flushed = B_FALSE;
2147 		band = pri;
2148 
2149 		ASSERT(MUTEX_HELD(QLOCK(q)));
2150 		qbp = q->q_bandp;
2151 		while (--band > 0)
2152 			qbp = qbp->qb_next;
2153 		mp = qbp->qb_first;
2154 		if (mp == NULL) {
2155 			mutex_exit(QLOCK(q));
2156 			return;
2157 		}
2158 		last = qbp->qb_last->b_next;
2159 		/*
2160 		 * rmvq_noenab() and freemsg() are called for each mblk that
2161 		 * meets the criteria.  The loop is executed until the last
2162 		 * mblk has been processed.
2163 		 */
2164 		while (mp != last) {
2165 			ASSERT(mp->b_band == pri);
2166 			nmp = mp->b_next;
2167 			if (flag == FLUSHALL || datamsg(mp->b_datap->db_type)) {
2168 				rmvq_noenab(q, mp);
2169 				freemsg(mp);
2170 				flushed = B_TRUE;
2171 			}
2172 			mp = nmp;
2173 		}
2174 		mutex_exit(QLOCK(q));
2175 
2176 		/*
2177 		 * If any mblk(s) has been freed, we know that qbackenable()
2178 		 * will need to be called.
2179 		 */
2180 		if (flushed)
2181 			qbackenable(q, pri);
2182 	}
2183 }
2184 
2185 /*
2186  * Return 1 if the queue is not full.  If the queue is full, return
2187  * 0 (may not put message) and set QWANTW flag (caller wants to write
2188  * to the queue).
2189  */
2190 int
2191 canput(queue_t *q)
2192 {
2193 	TRACE_1(TR_FAC_STREAMS_FR, TR_CANPUT_IN, "canput:%p", q);
2194 
2195 	/* this is for loopback transports, they should not do a canput */
2196 	ASSERT(STRMATED(q->q_stream) || STREAM(q) == STREAM(q->q_nfsrv));
2197 
2198 	/* Find next forward module that has a service procedure */
2199 	q = q->q_nfsrv;
2200 
2201 	if (!(q->q_flag & QFULL)) {
2202 		TRACE_2(TR_FAC_STREAMS_FR, TR_CANPUT_OUT, "canput:%p %d", q, 1);
2203 		return (1);
2204 	}
2205 	mutex_enter(QLOCK(q));
2206 	if (q->q_flag & QFULL) {
2207 		q->q_flag |= QWANTW;
2208 		mutex_exit(QLOCK(q));
2209 		TRACE_2(TR_FAC_STREAMS_FR, TR_CANPUT_OUT, "canput:%p %d", q, 0);
2210 		return (0);
2211 	}
2212 	mutex_exit(QLOCK(q));
2213 	TRACE_2(TR_FAC_STREAMS_FR, TR_CANPUT_OUT, "canput:%p %d", q, 1);
2214 	return (1);
2215 }
2216 
2217 /*
2218  * This is the new canput for use with priority bands.  Return 1 if the
2219  * band is not full.  If the band is full, return 0 (may not put message)
2220  * and set QWANTW(QB_WANTW) flag for zero(non-zero) band (caller wants to
2221  * write to the queue).
2222  */
2223 int
2224 bcanput(queue_t *q, unsigned char pri)
2225 {
2226 	qband_t *qbp;
2227 
2228 	TRACE_2(TR_FAC_STREAMS_FR, TR_BCANPUT_IN, "bcanput:%p %p", q, pri);
2229 	if (!q)
2230 		return (0);
2231 
2232 	/* Find next forward module that has a service procedure */
2233 	q = q->q_nfsrv;
2234 
2235 	mutex_enter(QLOCK(q));
2236 	if (pri == 0) {
2237 		if (q->q_flag & QFULL) {
2238 			q->q_flag |= QWANTW;
2239 			mutex_exit(QLOCK(q));
2240 			TRACE_3(TR_FAC_STREAMS_FR, TR_BCANPUT_OUT,
2241 				"bcanput:%p %X %d", q, pri, 0);
2242 			return (0);
2243 		}
2244 	} else {	/* pri != 0 */
2245 		if (pri > q->q_nband) {
2246 			/*
2247 			 * No band exists yet, so return success.
2248 			 */
2249 			mutex_exit(QLOCK(q));
2250 			TRACE_3(TR_FAC_STREAMS_FR, TR_BCANPUT_OUT,
2251 				"bcanput:%p %X %d", q, pri, 1);
2252 			return (1);
2253 		}
2254 		qbp = q->q_bandp;
2255 		while (--pri)
2256 			qbp = qbp->qb_next;
2257 		if (qbp->qb_flag & QB_FULL) {
2258 			qbp->qb_flag |= QB_WANTW;
2259 			mutex_exit(QLOCK(q));
2260 			TRACE_3(TR_FAC_STREAMS_FR, TR_BCANPUT_OUT,
2261 				"bcanput:%p %X %d", q, pri, 0);
2262 			return (0);
2263 		}
2264 	}
2265 	mutex_exit(QLOCK(q));
2266 	TRACE_3(TR_FAC_STREAMS_FR, TR_BCANPUT_OUT,
2267 		"bcanput:%p %X %d", q, pri, 1);
2268 	return (1);
2269 }
2270 
2271 /*
2272  * Put a message on a queue.
2273  *
2274  * Messages are enqueued on a priority basis.  The priority classes
2275  * are HIGH PRIORITY (type >= QPCTL), PRIORITY (type < QPCTL && band > 0),
2276  * and B_NORMAL (type < QPCTL && band == 0).
2277  *
2278  * Add appropriate weighted data block sizes to queue count.
2279  * If queue hits high water mark then set QFULL flag.
2280  *
2281  * If QNOENAB is not set (putq is allowed to enable the queue),
2282  * enable the queue only if the message is PRIORITY,
2283  * or the QWANTR flag is set (indicating that the service procedure
2284  * is ready to read the queue.  This implies that a service
2285  * procedure must NEVER put a high priority message back on its own
2286  * queue, as this would result in an infinite loop (!).
2287  */
2288 int
2289 putq(queue_t *q, mblk_t *bp)
2290 {
2291 	mblk_t *tmp;
2292 	qband_t *qbp = NULL;
2293 	int mcls = (int)queclass(bp);
2294 	kthread_id_t freezer;
2295 	int	bytecnt = 0, mblkcnt = 0;
2296 
2297 	freezer = STREAM(q)->sd_freezer;
2298 	if (freezer == curthread) {
2299 		ASSERT(frozenstr(q));
2300 		ASSERT(MUTEX_HELD(QLOCK(q)));
2301 	} else
2302 		mutex_enter(QLOCK(q));
2303 
2304 	/*
2305 	 * Make sanity checks and if qband structure is not yet
2306 	 * allocated, do so.
2307 	 */
2308 	if (mcls == QPCTL) {
2309 		if (bp->b_band != 0)
2310 			bp->b_band = 0;		/* force to be correct */
2311 	} else if (bp->b_band != 0) {
2312 		int i;
2313 		qband_t **qbpp;
2314 
2315 		if (bp->b_band > q->q_nband) {
2316 
2317 			/*
2318 			 * The qband structure for this priority band is
2319 			 * not on the queue yet, so we have to allocate
2320 			 * one on the fly.  It would be wasteful to
2321 			 * associate the qband structures with every
2322 			 * queue when the queues are allocated.  This is
2323 			 * because most queues will only need the normal
2324 			 * band of flow which can be described entirely
2325 			 * by the queue itself.
2326 			 */
2327 			qbpp = &q->q_bandp;
2328 			while (*qbpp)
2329 				qbpp = &(*qbpp)->qb_next;
2330 			while (bp->b_band > q->q_nband) {
2331 				if ((*qbpp = allocband()) == NULL) {
2332 					if (freezer != curthread)
2333 						mutex_exit(QLOCK(q));
2334 					return (0);
2335 				}
2336 				(*qbpp)->qb_hiwat = q->q_hiwat;
2337 				(*qbpp)->qb_lowat = q->q_lowat;
2338 				q->q_nband++;
2339 				qbpp = &(*qbpp)->qb_next;
2340 			}
2341 		}
2342 		ASSERT(MUTEX_HELD(QLOCK(q)));
2343 		qbp = q->q_bandp;
2344 		i = bp->b_band;
2345 		while (--i)
2346 			qbp = qbp->qb_next;
2347 	}
2348 
2349 	/*
2350 	 * If queue is empty, add the message and initialize the pointers.
2351 	 * Otherwise, adjust message pointers and queue pointers based on
2352 	 * the type of the message and where it belongs on the queue.  Some
2353 	 * code is duplicated to minimize the number of conditionals and
2354 	 * hopefully minimize the amount of time this routine takes.
2355 	 */
2356 	if (!q->q_first) {
2357 		bp->b_next = NULL;
2358 		bp->b_prev = NULL;
2359 		q->q_first = bp;
2360 		q->q_last = bp;
2361 		if (qbp) {
2362 			qbp->qb_first = bp;
2363 			qbp->qb_last = bp;
2364 		}
2365 	} else if (!qbp) {	/* bp->b_band == 0 */
2366 
2367 		/*
2368 		 * If queue class of message is less than or equal to
2369 		 * that of the last one on the queue, tack on to the end.
2370 		 */
2371 		tmp = q->q_last;
2372 		if (mcls <= (int)queclass(tmp)) {
2373 			bp->b_next = NULL;
2374 			bp->b_prev = tmp;
2375 			tmp->b_next = bp;
2376 			q->q_last = bp;
2377 		} else {
2378 			tmp = q->q_first;
2379 			while ((int)queclass(tmp) >= mcls)
2380 				tmp = tmp->b_next;
2381 
2382 			/*
2383 			 * Insert bp before tmp.
2384 			 */
2385 			bp->b_next = tmp;
2386 			bp->b_prev = tmp->b_prev;
2387 			if (tmp->b_prev)
2388 				tmp->b_prev->b_next = bp;
2389 			else
2390 				q->q_first = bp;
2391 			tmp->b_prev = bp;
2392 		}
2393 	} else {		/* bp->b_band != 0 */
2394 		if (qbp->qb_first) {
2395 			tmp = qbp->qb_last;
2396 
2397 			/*
2398 			 * Insert bp after the last message in this band.
2399 			 */
2400 			bp->b_next = tmp->b_next;
2401 			if (tmp->b_next)
2402 				tmp->b_next->b_prev = bp;
2403 			else
2404 				q->q_last = bp;
2405 			bp->b_prev = tmp;
2406 			tmp->b_next = bp;
2407 		} else {
2408 			tmp = q->q_last;
2409 			if ((mcls < (int)queclass(tmp)) ||
2410 			    (bp->b_band <= tmp->b_band)) {
2411 
2412 				/*
2413 				 * Tack bp on end of queue.
2414 				 */
2415 				bp->b_next = NULL;
2416 				bp->b_prev = tmp;
2417 				tmp->b_next = bp;
2418 				q->q_last = bp;
2419 			} else {
2420 				tmp = q->q_first;
2421 				while (tmp->b_datap->db_type >= QPCTL)
2422 					tmp = tmp->b_next;
2423 				while (tmp->b_band >= bp->b_band)
2424 					tmp = tmp->b_next;
2425 
2426 				/*
2427 				 * Insert bp before tmp.
2428 				 */
2429 				bp->b_next = tmp;
2430 				bp->b_prev = tmp->b_prev;
2431 				if (tmp->b_prev)
2432 					tmp->b_prev->b_next = bp;
2433 				else
2434 					q->q_first = bp;
2435 				tmp->b_prev = bp;
2436 			}
2437 			qbp->qb_first = bp;
2438 		}
2439 		qbp->qb_last = bp;
2440 	}
2441 
2442 	/* Get message byte count for q_count accounting */
2443 	for (tmp = bp; tmp; tmp = tmp->b_cont) {
2444 		ADD_MBLK_SIZE(tmp, bytecnt);
2445 		mblkcnt++;
2446 	}
2447 
2448 	if (qbp) {
2449 		qbp->qb_count += bytecnt;
2450 		qbp->qb_mblkcnt += mblkcnt;
2451 		if ((qbp->qb_count >= qbp->qb_hiwat) ||
2452 		    (qbp->qb_mblkcnt >= qbp->qb_hiwat)) {
2453 			qbp->qb_flag |= QB_FULL;
2454 		}
2455 	} else {
2456 		q->q_count += bytecnt;
2457 		q->q_mblkcnt += mblkcnt;
2458 		if ((q->q_count >= q->q_hiwat) ||
2459 		    (q->q_mblkcnt >= q->q_hiwat)) {
2460 			q->q_flag |= QFULL;
2461 		}
2462 	}
2463 
2464 	STR_FTEVENT_MSG(bp, q, FTEV_PUTQ, NULL);
2465 
2466 	if ((mcls > QNORM) ||
2467 	    (canenable(q) && (q->q_flag & QWANTR || bp->b_band)))
2468 		qenable_locked(q);
2469 	ASSERT(MUTEX_HELD(QLOCK(q)));
2470 	if (freezer != curthread)
2471 		mutex_exit(QLOCK(q));
2472 
2473 	return (1);
2474 }
2475 
2476 /*
2477  * Put stuff back at beginning of Q according to priority order.
2478  * See comment on putq above for details.
2479  */
2480 int
2481 putbq(queue_t *q, mblk_t *bp)
2482 {
2483 	mblk_t *tmp;
2484 	qband_t *qbp = NULL;
2485 	int mcls = (int)queclass(bp);
2486 	kthread_id_t freezer;
2487 	int	bytecnt = 0, mblkcnt = 0;
2488 
2489 	ASSERT(q && bp);
2490 	ASSERT(bp->b_next == NULL);
2491 	freezer = STREAM(q)->sd_freezer;
2492 	if (freezer == curthread) {
2493 		ASSERT(frozenstr(q));
2494 		ASSERT(MUTEX_HELD(QLOCK(q)));
2495 	} else
2496 		mutex_enter(QLOCK(q));
2497 
2498 	/*
2499 	 * Make sanity checks and if qband structure is not yet
2500 	 * allocated, do so.
2501 	 */
2502 	if (mcls == QPCTL) {
2503 		if (bp->b_band != 0)
2504 			bp->b_band = 0;		/* force to be correct */
2505 	} else if (bp->b_band != 0) {
2506 		int i;
2507 		qband_t **qbpp;
2508 
2509 		if (bp->b_band > q->q_nband) {
2510 			qbpp = &q->q_bandp;
2511 			while (*qbpp)
2512 				qbpp = &(*qbpp)->qb_next;
2513 			while (bp->b_band > q->q_nband) {
2514 				if ((*qbpp = allocband()) == NULL) {
2515 					if (freezer != curthread)
2516 						mutex_exit(QLOCK(q));
2517 					return (0);
2518 				}
2519 				(*qbpp)->qb_hiwat = q->q_hiwat;
2520 				(*qbpp)->qb_lowat = q->q_lowat;
2521 				q->q_nband++;
2522 				qbpp = &(*qbpp)->qb_next;
2523 			}
2524 		}
2525 		qbp = q->q_bandp;
2526 		i = bp->b_band;
2527 		while (--i)
2528 			qbp = qbp->qb_next;
2529 	}
2530 
2531 	/*
2532 	 * If queue is empty or if message is high priority,
2533 	 * place on the front of the queue.
2534 	 */
2535 	tmp = q->q_first;
2536 	if ((!tmp) || (mcls == QPCTL)) {
2537 		bp->b_next = tmp;
2538 		if (tmp)
2539 			tmp->b_prev = bp;
2540 		else
2541 			q->q_last = bp;
2542 		q->q_first = bp;
2543 		bp->b_prev = NULL;
2544 		if (qbp) {
2545 			qbp->qb_first = bp;
2546 			qbp->qb_last = bp;
2547 		}
2548 	} else if (qbp) {	/* bp->b_band != 0 */
2549 		tmp = qbp->qb_first;
2550 		if (tmp) {
2551 
2552 			/*
2553 			 * Insert bp before the first message in this band.
2554 			 */
2555 			bp->b_next = tmp;
2556 			bp->b_prev = tmp->b_prev;
2557 			if (tmp->b_prev)
2558 				tmp->b_prev->b_next = bp;
2559 			else
2560 				q->q_first = bp;
2561 			tmp->b_prev = bp;
2562 		} else {
2563 			tmp = q->q_last;
2564 			if ((mcls < (int)queclass(tmp)) ||
2565 			    (bp->b_band < tmp->b_band)) {
2566 
2567 				/*
2568 				 * Tack bp on end of queue.
2569 				 */
2570 				bp->b_next = NULL;
2571 				bp->b_prev = tmp;
2572 				tmp->b_next = bp;
2573 				q->q_last = bp;
2574 			} else {
2575 				tmp = q->q_first;
2576 				while (tmp->b_datap->db_type >= QPCTL)
2577 					tmp = tmp->b_next;
2578 				while (tmp->b_band > bp->b_band)
2579 					tmp = tmp->b_next;
2580 
2581 				/*
2582 				 * Insert bp before tmp.
2583 				 */
2584 				bp->b_next = tmp;
2585 				bp->b_prev = tmp->b_prev;
2586 				if (tmp->b_prev)
2587 					tmp->b_prev->b_next = bp;
2588 				else
2589 					q->q_first = bp;
2590 				tmp->b_prev = bp;
2591 			}
2592 			qbp->qb_last = bp;
2593 		}
2594 		qbp->qb_first = bp;
2595 	} else {		/* bp->b_band == 0 && !QPCTL */
2596 
2597 		/*
2598 		 * If the queue class or band is less than that of the last
2599 		 * message on the queue, tack bp on the end of the queue.
2600 		 */
2601 		tmp = q->q_last;
2602 		if ((mcls < (int)queclass(tmp)) || (bp->b_band < tmp->b_band)) {
2603 			bp->b_next = NULL;
2604 			bp->b_prev = tmp;
2605 			tmp->b_next = bp;
2606 			q->q_last = bp;
2607 		} else {
2608 			tmp = q->q_first;
2609 			while (tmp->b_datap->db_type >= QPCTL)
2610 				tmp = tmp->b_next;
2611 			while (tmp->b_band > bp->b_band)
2612 				tmp = tmp->b_next;
2613 
2614 			/*
2615 			 * Insert bp before tmp.
2616 			 */
2617 			bp->b_next = tmp;
2618 			bp->b_prev = tmp->b_prev;
2619 			if (tmp->b_prev)
2620 				tmp->b_prev->b_next = bp;
2621 			else
2622 				q->q_first = bp;
2623 			tmp->b_prev = bp;
2624 		}
2625 	}
2626 
2627 	/* Get message byte count for q_count accounting */
2628 	for (tmp = bp; tmp; tmp = tmp->b_cont) {
2629 		ADD_MBLK_SIZE(tmp, bytecnt);
2630 		mblkcnt++;
2631 	}
2632 	if (qbp) {
2633 		qbp->qb_count += bytecnt;
2634 		qbp->qb_mblkcnt += mblkcnt;
2635 		if ((qbp->qb_count >= qbp->qb_hiwat) ||
2636 		    (qbp->qb_mblkcnt >= qbp->qb_hiwat)) {
2637 			qbp->qb_flag |= QB_FULL;
2638 		}
2639 	} else {
2640 		q->q_count += bytecnt;
2641 		q->q_mblkcnt += mblkcnt;
2642 		if ((q->q_count >= q->q_hiwat) ||
2643 		    (q->q_mblkcnt >= q->q_hiwat)) {
2644 			q->q_flag |= QFULL;
2645 		}
2646 	}
2647 
2648 	STR_FTEVENT_MSG(bp, q, FTEV_PUTBQ, NULL);
2649 
2650 	if ((mcls > QNORM) || (canenable(q) && (q->q_flag & QWANTR)))
2651 		qenable_locked(q);
2652 	ASSERT(MUTEX_HELD(QLOCK(q)));
2653 	if (freezer != curthread)
2654 		mutex_exit(QLOCK(q));
2655 
2656 	return (1);
2657 }
2658 
2659 /*
2660  * Insert a message before an existing message on the queue.  If the
2661  * existing message is NULL, the new messages is placed on the end of
2662  * the queue.  The queue class of the new message is ignored.  However,
2663  * the priority band of the new message must adhere to the following
2664  * ordering:
2665  *
2666  *	emp->b_prev->b_band >= mp->b_band >= emp->b_band.
2667  *
2668  * All flow control parameters are updated.
2669  *
2670  * insq can be called with the stream frozen, but other utility functions
2671  * holding QLOCK, and by streams modules without any locks/frozen.
2672  */
2673 int
2674 insq(queue_t *q, mblk_t *emp, mblk_t *mp)
2675 {
2676 	mblk_t *tmp;
2677 	qband_t *qbp = NULL;
2678 	int mcls = (int)queclass(mp);
2679 	kthread_id_t freezer;
2680 	int	bytecnt = 0, mblkcnt = 0;
2681 
2682 	freezer = STREAM(q)->sd_freezer;
2683 	if (freezer == curthread) {
2684 		ASSERT(frozenstr(q));
2685 		ASSERT(MUTEX_HELD(QLOCK(q)));
2686 	} else if (MUTEX_HELD(QLOCK(q))) {
2687 		/* Don't drop lock on exit */
2688 		freezer = curthread;
2689 	} else
2690 		mutex_enter(QLOCK(q));
2691 
2692 	if (mcls == QPCTL) {
2693 		if (mp->b_band != 0)
2694 			mp->b_band = 0;		/* force to be correct */
2695 		if (emp && emp->b_prev &&
2696 		    (emp->b_prev->b_datap->db_type < QPCTL))
2697 			goto badord;
2698 	}
2699 	if (emp) {
2700 		if (((mcls == QNORM) && (mp->b_band < emp->b_band)) ||
2701 		    (emp->b_prev && (emp->b_prev->b_datap->db_type < QPCTL) &&
2702 		    (emp->b_prev->b_band < mp->b_band))) {
2703 			goto badord;
2704 		}
2705 	} else {
2706 		tmp = q->q_last;
2707 		if (tmp && (mcls == QNORM) && (mp->b_band > tmp->b_band)) {
2708 badord:
2709 			cmn_err(CE_WARN,
2710 			    "insq: attempt to insert message out of order "
2711 			    "on q %p", (void *)q);
2712 			if (freezer != curthread)
2713 				mutex_exit(QLOCK(q));
2714 			return (0);
2715 		}
2716 	}
2717 
2718 	if (mp->b_band != 0) {
2719 		int i;
2720 		qband_t **qbpp;
2721 
2722 		if (mp->b_band > q->q_nband) {
2723 			qbpp = &q->q_bandp;
2724 			while (*qbpp)
2725 				qbpp = &(*qbpp)->qb_next;
2726 			while (mp->b_band > q->q_nband) {
2727 				if ((*qbpp = allocband()) == NULL) {
2728 					if (freezer != curthread)
2729 						mutex_exit(QLOCK(q));
2730 					return (0);
2731 				}
2732 				(*qbpp)->qb_hiwat = q->q_hiwat;
2733 				(*qbpp)->qb_lowat = q->q_lowat;
2734 				q->q_nband++;
2735 				qbpp = &(*qbpp)->qb_next;
2736 			}
2737 		}
2738 		qbp = q->q_bandp;
2739 		i = mp->b_band;
2740 		while (--i)
2741 			qbp = qbp->qb_next;
2742 	}
2743 
2744 	if ((mp->b_next = emp) != NULL) {
2745 		if ((mp->b_prev = emp->b_prev) != NULL)
2746 			emp->b_prev->b_next = mp;
2747 		else
2748 			q->q_first = mp;
2749 		emp->b_prev = mp;
2750 	} else {
2751 		if ((mp->b_prev = q->q_last) != NULL)
2752 			q->q_last->b_next = mp;
2753 		else
2754 			q->q_first = mp;
2755 		q->q_last = mp;
2756 	}
2757 
2758 	/* Get mblk and byte count for q_count accounting */
2759 	for (tmp = mp; tmp; tmp = tmp->b_cont) {
2760 		ADD_MBLK_SIZE(tmp, bytecnt);
2761 		mblkcnt++;
2762 	}
2763 
2764 	if (qbp) {	/* adjust qband pointers and count */
2765 		if (!qbp->qb_first) {
2766 			qbp->qb_first = mp;
2767 			qbp->qb_last = mp;
2768 		} else {
2769 			if (mp->b_prev == NULL || (mp->b_prev != NULL &&
2770 			    (mp->b_prev->b_band != mp->b_band)))
2771 				qbp->qb_first = mp;
2772 			else if (mp->b_next == NULL || (mp->b_next != NULL &&
2773 			    (mp->b_next->b_band != mp->b_band)))
2774 				qbp->qb_last = mp;
2775 		}
2776 		qbp->qb_count += bytecnt;
2777 		qbp->qb_mblkcnt += mblkcnt;
2778 		if ((qbp->qb_count >= qbp->qb_hiwat) ||
2779 		    (qbp->qb_mblkcnt >= qbp->qb_hiwat)) {
2780 			qbp->qb_flag |= QB_FULL;
2781 		}
2782 	} else {
2783 		q->q_count += bytecnt;
2784 		q->q_mblkcnt += mblkcnt;
2785 		if ((q->q_count >= q->q_hiwat) ||
2786 		    (q->q_mblkcnt >= q->q_hiwat)) {
2787 			q->q_flag |= QFULL;
2788 		}
2789 	}
2790 
2791 	STR_FTEVENT_MSG(mp, q, FTEV_INSQ, NULL);
2792 
2793 	if (canenable(q) && (q->q_flag & QWANTR))
2794 		qenable_locked(q);
2795 
2796 	ASSERT(MUTEX_HELD(QLOCK(q)));
2797 	if (freezer != curthread)
2798 		mutex_exit(QLOCK(q));
2799 
2800 	return (1);
2801 }
2802 
2803 /*
2804  * Create and put a control message on queue.
2805  */
2806 int
2807 putctl(queue_t *q, int type)
2808 {
2809 	mblk_t *bp;
2810 
2811 	if ((datamsg(type) && (type != M_DELAY)) ||
2812 	    (bp = allocb_tryhard(0)) == NULL)
2813 		return (0);
2814 	bp->b_datap->db_type = (unsigned char) type;
2815 
2816 	put(q, bp);
2817 
2818 	return (1);
2819 }
2820 
2821 /*
2822  * Control message with a single-byte parameter
2823  */
2824 int
2825 putctl1(queue_t *q, int type, int param)
2826 {
2827 	mblk_t *bp;
2828 
2829 	if ((datamsg(type) && (type != M_DELAY)) ||
2830 	    (bp = allocb_tryhard(1)) == NULL)
2831 		return (0);
2832 	bp->b_datap->db_type = (unsigned char)type;
2833 	*bp->b_wptr++ = (unsigned char)param;
2834 
2835 	put(q, bp);
2836 
2837 	return (1);
2838 }
2839 
2840 int
2841 putnextctl1(queue_t *q, int type, int param)
2842 {
2843 	mblk_t *bp;
2844 
2845 	if ((datamsg(type) && (type != M_DELAY)) ||
2846 		((bp = allocb_tryhard(1)) == NULL))
2847 		return (0);
2848 
2849 	bp->b_datap->db_type = (unsigned char)type;
2850 	*bp->b_wptr++ = (unsigned char)param;
2851 
2852 	putnext(q, bp);
2853 
2854 	return (1);
2855 }
2856 
2857 int
2858 putnextctl(queue_t *q, int type)
2859 {
2860 	mblk_t *bp;
2861 
2862 	if ((datamsg(type) && (type != M_DELAY)) ||
2863 		((bp = allocb_tryhard(0)) == NULL))
2864 		return (0);
2865 	bp->b_datap->db_type = (unsigned char)type;
2866 
2867 	putnext(q, bp);
2868 
2869 	return (1);
2870 }
2871 
2872 /*
2873  * Return the queue upstream from this one
2874  */
2875 queue_t *
2876 backq(queue_t *q)
2877 {
2878 	q = _OTHERQ(q);
2879 	if (q->q_next) {
2880 		q = q->q_next;
2881 		return (_OTHERQ(q));
2882 	}
2883 	return (NULL);
2884 }
2885 
2886 /*
2887  * Send a block back up the queue in reverse from this
2888  * one (e.g. to respond to ioctls)
2889  */
2890 void
2891 qreply(queue_t *q, mblk_t *bp)
2892 {
2893 	ASSERT(q && bp);
2894 
2895 	putnext(_OTHERQ(q), bp);
2896 }
2897 
2898 /*
2899  * Streams Queue Scheduling
2900  *
2901  * Queues are enabled through qenable() when they have messages to
2902  * process.  They are serviced by queuerun(), which runs each enabled
2903  * queue's service procedure.  The call to queuerun() is processor
2904  * dependent - the general principle is that it be run whenever a queue
2905  * is enabled but before returning to user level.  For system calls,
2906  * the function runqueues() is called if their action causes a queue
2907  * to be enabled.  For device interrupts, queuerun() should be
2908  * called before returning from the last level of interrupt.  Beyond
2909  * this, no timing assumptions should be made about queue scheduling.
2910  */
2911 
2912 /*
2913  * Enable a queue: put it on list of those whose service procedures are
2914  * ready to run and set up the scheduling mechanism.
2915  * The broadcast is done outside the mutex -> to avoid the woken thread
2916  * from contending with the mutex. This is OK 'cos the queue has been
2917  * enqueued on the runlist and flagged safely at this point.
2918  */
2919 void
2920 qenable(queue_t *q)
2921 {
2922 	mutex_enter(QLOCK(q));
2923 	qenable_locked(q);
2924 	mutex_exit(QLOCK(q));
2925 }
2926 /*
2927  * Return number of messages on queue
2928  */
2929 int
2930 qsize(queue_t *qp)
2931 {
2932 	int count = 0;
2933 	mblk_t *mp;
2934 
2935 	mutex_enter(QLOCK(qp));
2936 	for (mp = qp->q_first; mp; mp = mp->b_next)
2937 		count++;
2938 	mutex_exit(QLOCK(qp));
2939 	return (count);
2940 }
2941 
2942 /*
2943  * noenable - set queue so that putq() will not enable it.
2944  * enableok - set queue so that putq() can enable it.
2945  */
2946 void
2947 noenable(queue_t *q)
2948 {
2949 	mutex_enter(QLOCK(q));
2950 	q->q_flag |= QNOENB;
2951 	mutex_exit(QLOCK(q));
2952 }
2953 
2954 void
2955 enableok(queue_t *q)
2956 {
2957 	mutex_enter(QLOCK(q));
2958 	q->q_flag &= ~QNOENB;
2959 	mutex_exit(QLOCK(q));
2960 }
2961 
2962 /*
2963  * Set queue fields.
2964  */
2965 int
2966 strqset(queue_t *q, qfields_t what, unsigned char pri, intptr_t val)
2967 {
2968 	qband_t *qbp = NULL;
2969 	queue_t	*wrq;
2970 	int error = 0;
2971 	kthread_id_t freezer;
2972 
2973 	freezer = STREAM(q)->sd_freezer;
2974 	if (freezer == curthread) {
2975 		ASSERT(frozenstr(q));
2976 		ASSERT(MUTEX_HELD(QLOCK(q)));
2977 	} else
2978 		mutex_enter(QLOCK(q));
2979 
2980 	if (what >= QBAD) {
2981 		error = EINVAL;
2982 		goto done;
2983 	}
2984 	if (pri != 0) {
2985 		int i;
2986 		qband_t **qbpp;
2987 
2988 		if (pri > q->q_nband) {
2989 			qbpp = &q->q_bandp;
2990 			while (*qbpp)
2991 				qbpp = &(*qbpp)->qb_next;
2992 			while (pri > q->q_nband) {
2993 				if ((*qbpp = allocband()) == NULL) {
2994 					error = EAGAIN;
2995 					goto done;
2996 				}
2997 				(*qbpp)->qb_hiwat = q->q_hiwat;
2998 				(*qbpp)->qb_lowat = q->q_lowat;
2999 				q->q_nband++;
3000 				qbpp = &(*qbpp)->qb_next;
3001 			}
3002 		}
3003 		qbp = q->q_bandp;
3004 		i = pri;
3005 		while (--i)
3006 			qbp = qbp->qb_next;
3007 	}
3008 	switch (what) {
3009 
3010 	case QHIWAT:
3011 		if (qbp)
3012 			qbp->qb_hiwat = (size_t)val;
3013 		else
3014 			q->q_hiwat = (size_t)val;
3015 		break;
3016 
3017 	case QLOWAT:
3018 		if (qbp)
3019 			qbp->qb_lowat = (size_t)val;
3020 		else
3021 			q->q_lowat = (size_t)val;
3022 		break;
3023 
3024 	case QMAXPSZ:
3025 		if (qbp)
3026 			error = EINVAL;
3027 		else
3028 			q->q_maxpsz = (ssize_t)val;
3029 
3030 		/*
3031 		 * Performance concern, strwrite looks at the module below
3032 		 * the stream head for the maxpsz each time it does a write
3033 		 * we now cache it at the stream head.  Check to see if this
3034 		 * queue is sitting directly below the stream head.
3035 		 */
3036 		wrq = STREAM(q)->sd_wrq;
3037 		if (q != wrq->q_next)
3038 			break;
3039 
3040 		/*
3041 		 * If the stream is not frozen drop the current QLOCK and
3042 		 * acquire the sd_wrq QLOCK which protects sd_qn_*
3043 		 */
3044 		if (freezer != curthread) {
3045 			mutex_exit(QLOCK(q));
3046 			mutex_enter(QLOCK(wrq));
3047 		}
3048 		ASSERT(MUTEX_HELD(QLOCK(wrq)));
3049 
3050 		if (strmsgsz != 0) {
3051 			if (val == INFPSZ)
3052 				val = strmsgsz;
3053 			else  {
3054 				if (STREAM(q)->sd_vnode->v_type == VFIFO)
3055 					val = MIN(PIPE_BUF, val);
3056 				else
3057 					val = MIN(strmsgsz, val);
3058 			}
3059 		}
3060 		STREAM(q)->sd_qn_maxpsz = val;
3061 		if (freezer != curthread) {
3062 			mutex_exit(QLOCK(wrq));
3063 			mutex_enter(QLOCK(q));
3064 		}
3065 		break;
3066 
3067 	case QMINPSZ:
3068 		if (qbp)
3069 			error = EINVAL;
3070 		else
3071 			q->q_minpsz = (ssize_t)val;
3072 
3073 		/*
3074 		 * Performance concern, strwrite looks at the module below
3075 		 * the stream head for the maxpsz each time it does a write
3076 		 * we now cache it at the stream head.  Check to see if this
3077 		 * queue is sitting directly below the stream head.
3078 		 */
3079 		wrq = STREAM(q)->sd_wrq;
3080 		if (q != wrq->q_next)
3081 			break;
3082 
3083 		/*
3084 		 * If the stream is not frozen drop the current QLOCK and
3085 		 * acquire the sd_wrq QLOCK which protects sd_qn_*
3086 		 */
3087 		if (freezer != curthread) {
3088 			mutex_exit(QLOCK(q));
3089 			mutex_enter(QLOCK(wrq));
3090 		}
3091 		STREAM(q)->sd_qn_minpsz = (ssize_t)val;
3092 
3093 		if (freezer != curthread) {
3094 			mutex_exit(QLOCK(wrq));
3095 			mutex_enter(QLOCK(q));
3096 		}
3097 		break;
3098 
3099 	case QSTRUIOT:
3100 		if (qbp)
3101 			error = EINVAL;
3102 		else
3103 			q->q_struiot = (ushort_t)val;
3104 		break;
3105 
3106 	case QCOUNT:
3107 	case QFIRST:
3108 	case QLAST:
3109 	case QFLAG:
3110 		error = EPERM;
3111 		break;
3112 
3113 	default:
3114 		error = EINVAL;
3115 		break;
3116 	}
3117 done:
3118 	if (freezer != curthread)
3119 		mutex_exit(QLOCK(q));
3120 	return (error);
3121 }
3122 
3123 /*
3124  * Get queue fields.
3125  */
3126 int
3127 strqget(queue_t *q, qfields_t what, unsigned char pri, void *valp)
3128 {
3129 	qband_t 	*qbp = NULL;
3130 	int 		error = 0;
3131 	kthread_id_t 	freezer;
3132 
3133 	freezer = STREAM(q)->sd_freezer;
3134 	if (freezer == curthread) {
3135 		ASSERT(frozenstr(q));
3136 		ASSERT(MUTEX_HELD(QLOCK(q)));
3137 	} else
3138 		mutex_enter(QLOCK(q));
3139 	if (what >= QBAD) {
3140 		error = EINVAL;
3141 		goto done;
3142 	}
3143 	if (pri != 0) {
3144 		int i;
3145 		qband_t **qbpp;
3146 
3147 		if (pri > q->q_nband) {
3148 			qbpp = &q->q_bandp;
3149 			while (*qbpp)
3150 				qbpp = &(*qbpp)->qb_next;
3151 			while (pri > q->q_nband) {
3152 				if ((*qbpp = allocband()) == NULL) {
3153 					error = EAGAIN;
3154 					goto done;
3155 				}
3156 				(*qbpp)->qb_hiwat = q->q_hiwat;
3157 				(*qbpp)->qb_lowat = q->q_lowat;
3158 				q->q_nband++;
3159 				qbpp = &(*qbpp)->qb_next;
3160 			}
3161 		}
3162 		qbp = q->q_bandp;
3163 		i = pri;
3164 		while (--i)
3165 			qbp = qbp->qb_next;
3166 	}
3167 	switch (what) {
3168 	case QHIWAT:
3169 		if (qbp)
3170 			*(size_t *)valp = qbp->qb_hiwat;
3171 		else
3172 			*(size_t *)valp = q->q_hiwat;
3173 		break;
3174 
3175 	case QLOWAT:
3176 		if (qbp)
3177 			*(size_t *)valp = qbp->qb_lowat;
3178 		else
3179 			*(size_t *)valp = q->q_lowat;
3180 		break;
3181 
3182 	case QMAXPSZ:
3183 		if (qbp)
3184 			error = EINVAL;
3185 		else
3186 			*(ssize_t *)valp = q->q_maxpsz;
3187 		break;
3188 
3189 	case QMINPSZ:
3190 		if (qbp)
3191 			error = EINVAL;
3192 		else
3193 			*(ssize_t *)valp = q->q_minpsz;
3194 		break;
3195 
3196 	case QCOUNT:
3197 		if (qbp)
3198 			*(size_t *)valp = qbp->qb_count;
3199 		else
3200 			*(size_t *)valp = q->q_count;
3201 		break;
3202 
3203 	case QFIRST:
3204 		if (qbp)
3205 			*(mblk_t **)valp = qbp->qb_first;
3206 		else
3207 			*(mblk_t **)valp = q->q_first;
3208 		break;
3209 
3210 	case QLAST:
3211 		if (qbp)
3212 			*(mblk_t **)valp = qbp->qb_last;
3213 		else
3214 			*(mblk_t **)valp = q->q_last;
3215 		break;
3216 
3217 	case QFLAG:
3218 		if (qbp)
3219 			*(uint_t *)valp = qbp->qb_flag;
3220 		else
3221 			*(uint_t *)valp = q->q_flag;
3222 		break;
3223 
3224 	case QSTRUIOT:
3225 		if (qbp)
3226 			error = EINVAL;
3227 		else
3228 			*(short *)valp = q->q_struiot;
3229 		break;
3230 
3231 	default:
3232 		error = EINVAL;
3233 		break;
3234 	}
3235 done:
3236 	if (freezer != curthread)
3237 		mutex_exit(QLOCK(q));
3238 	return (error);
3239 }
3240 
3241 /*
3242  * Function awakes all in cvwait/sigwait/pollwait, on one of:
3243  *	QWANTWSYNC or QWANTR or QWANTW,
3244  *
3245  * Note: for QWANTWSYNC/QWANTW and QWANTR, if no WSLEEPer or RSLEEPer then a
3246  *	 deferred wakeup will be done. Also if strpoll() in progress then a
3247  *	 deferred pollwakeup will be done.
3248  */
3249 void
3250 strwakeq(queue_t *q, int flag)
3251 {
3252 	stdata_t 	*stp = STREAM(q);
3253 	pollhead_t 	*pl;
3254 
3255 	mutex_enter(&stp->sd_lock);
3256 	pl = &stp->sd_pollist;
3257 	if (flag & QWANTWSYNC) {
3258 		ASSERT(!(q->q_flag & QREADR));
3259 		if (stp->sd_flag & WSLEEP) {
3260 			stp->sd_flag &= ~WSLEEP;
3261 			cv_broadcast(&stp->sd_wrq->q_wait);
3262 		} else {
3263 			stp->sd_wakeq |= WSLEEP;
3264 		}
3265 
3266 		mutex_exit(&stp->sd_lock);
3267 		pollwakeup(pl, POLLWRNORM);
3268 		mutex_enter(&stp->sd_lock);
3269 
3270 		if (stp->sd_sigflags & S_WRNORM)
3271 			strsendsig(stp->sd_siglist, S_WRNORM, 0, 0);
3272 	} else if (flag & QWANTR) {
3273 		if (stp->sd_flag & RSLEEP) {
3274 			stp->sd_flag &= ~RSLEEP;
3275 			cv_broadcast(&_RD(stp->sd_wrq)->q_wait);
3276 		} else {
3277 			stp->sd_wakeq |= RSLEEP;
3278 		}
3279 
3280 		mutex_exit(&stp->sd_lock);
3281 		pollwakeup(pl, POLLIN | POLLRDNORM);
3282 		mutex_enter(&stp->sd_lock);
3283 
3284 		{
3285 			int events = stp->sd_sigflags & (S_INPUT | S_RDNORM);
3286 
3287 			if (events)
3288 				strsendsig(stp->sd_siglist, events, 0, 0);
3289 		}
3290 	} else {
3291 		if (stp->sd_flag & WSLEEP) {
3292 			stp->sd_flag &= ~WSLEEP;
3293 			cv_broadcast(&stp->sd_wrq->q_wait);
3294 		}
3295 
3296 		mutex_exit(&stp->sd_lock);
3297 		pollwakeup(pl, POLLWRNORM);
3298 		mutex_enter(&stp->sd_lock);
3299 
3300 		if (stp->sd_sigflags & S_WRNORM)
3301 			strsendsig(stp->sd_siglist, S_WRNORM, 0, 0);
3302 	}
3303 	mutex_exit(&stp->sd_lock);
3304 }
3305 
3306 int
3307 struioget(queue_t *q, mblk_t *mp, struiod_t *dp, int noblock)
3308 {
3309 	stdata_t *stp = STREAM(q);
3310 	int typ  = STRUIOT_STANDARD;
3311 	uio_t	 *uiop = &dp->d_uio;
3312 	dblk_t	 *dbp;
3313 	ssize_t	 uiocnt;
3314 	ssize_t	 cnt;
3315 	unsigned char *ptr;
3316 	ssize_t	 resid;
3317 	int	 error = 0;
3318 	on_trap_data_t otd;
3319 	queue_t	*stwrq;
3320 
3321 	/*
3322 	 * Plumbing may change while taking the type so store the
3323 	 * queue in a temporary variable. It doesn't matter even
3324 	 * if the we take the type from the previous plumbing,
3325 	 * that's because if the plumbing has changed when we were
3326 	 * holding the queue in a temporary variable, we can continue
3327 	 * processing the message the way it would have been processed
3328 	 * in the old plumbing, without any side effects but a bit
3329 	 * extra processing for partial ip header checksum.
3330 	 *
3331 	 * This has been done to avoid holding the sd_lock which is
3332 	 * very hot.
3333 	 */
3334 
3335 	stwrq = stp->sd_struiowrq;
3336 	if (stwrq)
3337 		typ = stwrq->q_struiot;
3338 
3339 	for (; (resid = uiop->uio_resid) > 0 && mp; mp = mp->b_cont) {
3340 		dbp = mp->b_datap;
3341 		ptr = (uchar_t *)(mp->b_rptr + dbp->db_cksumstuff);
3342 		uiocnt = dbp->db_cksumend - dbp->db_cksumstuff;
3343 		cnt = MIN(uiocnt, uiop->uio_resid);
3344 		if (!(dbp->db_struioflag & STRUIO_SPEC) ||
3345 		    (dbp->db_struioflag & STRUIO_DONE) || cnt == 0) {
3346 			/*
3347 			 * Either this mblk has already been processed
3348 			 * or there is no more room in this mblk (?).
3349 			 */
3350 			continue;
3351 		}
3352 		switch (typ) {
3353 		case STRUIOT_STANDARD:
3354 			if (noblock) {
3355 				if (on_trap(&otd, OT_DATA_ACCESS)) {
3356 					no_trap();
3357 					error = EWOULDBLOCK;
3358 					goto out;
3359 				}
3360 			}
3361 			if (error = uiomove(ptr, cnt, UIO_WRITE, uiop)) {
3362 				if (noblock)
3363 					no_trap();
3364 				goto out;
3365 			}
3366 			if (noblock)
3367 				no_trap();
3368 			break;
3369 
3370 		default:
3371 			error = EIO;
3372 			goto out;
3373 		}
3374 		dbp->db_struioflag |= STRUIO_DONE;
3375 		dbp->db_cksumstuff += cnt;
3376 	}
3377 out:
3378 	if (error == EWOULDBLOCK && (resid -= uiop->uio_resid) > 0) {
3379 		/*
3380 		 * A fault has occured and some bytes were moved to the
3381 		 * current mblk, the uio_t has already been updated by
3382 		 * the appropriate uio routine, so also update the mblk
3383 		 * to reflect this in case this same mblk chain is used
3384 		 * again (after the fault has been handled).
3385 		 */
3386 		uiocnt = dbp->db_cksumend - dbp->db_cksumstuff;
3387 		if (uiocnt >= resid)
3388 			dbp->db_cksumstuff += resid;
3389 	}
3390 	return (error);
3391 }
3392 
3393 /*
3394  * Try to enter queue synchronously. Any attempt to enter a closing queue will
3395  * fails. The qp->q_rwcnt keeps track of the number of successful entries so
3396  * that removeq() will not try to close the queue while a thread is inside the
3397  * queue.
3398  */
3399 static boolean_t
3400 rwnext_enter(queue_t *qp)
3401 {
3402 	mutex_enter(QLOCK(qp));
3403 	if (qp->q_flag & QWCLOSE) {
3404 		mutex_exit(QLOCK(qp));
3405 		return (B_FALSE);
3406 	}
3407 	qp->q_rwcnt++;
3408 	ASSERT(qp->q_rwcnt != 0);
3409 	mutex_exit(QLOCK(qp));
3410 	return (B_TRUE);
3411 }
3412 
3413 /*
3414  * Decrease the count of threads running in sync stream queue and wake up any
3415  * threads blocked in removeq().
3416  */
3417 static void
3418 rwnext_exit(queue_t *qp)
3419 {
3420 	mutex_enter(QLOCK(qp));
3421 	qp->q_rwcnt--;
3422 	if (qp->q_flag & QWANTRMQSYNC) {
3423 		qp->q_flag &= ~QWANTRMQSYNC;
3424 		cv_broadcast(&qp->q_wait);
3425 	}
3426 	mutex_exit(QLOCK(qp));
3427 }
3428 
3429 /*
3430  * The purpose of rwnext() is to call the rw procedure of the next
3431  * (downstream) modules queue.
3432  *
3433  * treated as put entrypoint for perimeter syncronization.
3434  *
3435  * There's no need to grab sq_putlocks here (which only exist for CIPUT
3436  * sync queues). If it is CIPUT sync queue sq_count is incremented and it does
3437  * not matter if any regular put entrypoints have been already entered. We
3438  * can't increment one of the sq_putcounts (instead of sq_count) because
3439  * qwait_rw won't know which counter to decrement.
3440  *
3441  * It would be reasonable to add the lockless FASTPUT logic.
3442  */
3443 int
3444 rwnext(queue_t *qp, struiod_t *dp)
3445 {
3446 	queue_t		*nqp;
3447 	syncq_t		*sq;
3448 	uint16_t	count;
3449 	uint16_t	flags;
3450 	struct qinit	*qi;
3451 	int		(*proc)();
3452 	struct stdata	*stp;
3453 	int		isread;
3454 	int		rval;
3455 
3456 	stp = STREAM(qp);
3457 	/*
3458 	 * Prevent q_next from changing by holding sd_lock until acquiring
3459 	 * SQLOCK. Note that a read-side rwnext from the streamhead will
3460 	 * already have sd_lock acquired. In either case sd_lock is always
3461 	 * released after acquiring SQLOCK.
3462 	 *
3463 	 * The streamhead read-side holding sd_lock when calling rwnext is
3464 	 * required to prevent a race condition were M_DATA mblks flowing
3465 	 * up the read-side of the stream could be bypassed by a rwnext()
3466 	 * down-call. In this case sd_lock acts as the streamhead perimeter.
3467 	 */
3468 	if ((nqp = _WR(qp)) == qp) {
3469 		isread = 0;
3470 		mutex_enter(&stp->sd_lock);
3471 		qp = nqp->q_next;
3472 	} else {
3473 		isread = 1;
3474 		if (nqp != stp->sd_wrq)
3475 			/* Not streamhead */
3476 			mutex_enter(&stp->sd_lock);
3477 		qp = _RD(nqp->q_next);
3478 	}
3479 	qi = qp->q_qinfo;
3480 	if (qp->q_struiot == STRUIOT_NONE || ! (proc = qi->qi_rwp)) {
3481 		/*
3482 		 * Not a synchronous module or no r/w procedure for this
3483 		 * queue, so just return EINVAL and let the caller handle it.
3484 		 */
3485 		mutex_exit(&stp->sd_lock);
3486 		return (EINVAL);
3487 	}
3488 
3489 	if (rwnext_enter(qp) == B_FALSE) {
3490 		mutex_exit(&stp->sd_lock);
3491 		return (EINVAL);
3492 	}
3493 
3494 	sq = qp->q_syncq;
3495 	mutex_enter(SQLOCK(sq));
3496 	mutex_exit(&stp->sd_lock);
3497 	count = sq->sq_count;
3498 	flags = sq->sq_flags;
3499 	ASSERT(sq->sq_ciputctrl == NULL || (flags & SQ_CIPUT));
3500 
3501 	while ((flags & SQ_GOAWAY) || (!(flags & SQ_CIPUT) && count != 0)) {
3502 		/*
3503 		 * if this queue is being closed, return.
3504 		 */
3505 		if (qp->q_flag & QWCLOSE) {
3506 			mutex_exit(SQLOCK(sq));
3507 			rwnext_exit(qp);
3508 			return (EINVAL);
3509 		}
3510 
3511 		/*
3512 		 * Wait until we can enter the inner perimeter.
3513 		 */
3514 		sq->sq_flags = flags | SQ_WANTWAKEUP;
3515 		cv_wait(&sq->sq_wait, SQLOCK(sq));
3516 		count = sq->sq_count;
3517 		flags = sq->sq_flags;
3518 	}
3519 
3520 	if (isread == 0 && stp->sd_struiowrq == NULL ||
3521 	    isread == 1 && stp->sd_struiordq == NULL) {
3522 		/*
3523 		 * Stream plumbing changed while waiting for inner perimeter
3524 		 * so just return EINVAL and let the caller handle it.
3525 		 */
3526 		mutex_exit(SQLOCK(sq));
3527 		rwnext_exit(qp);
3528 		return (EINVAL);
3529 	}
3530 	if (!(flags & SQ_CIPUT))
3531 		sq->sq_flags = flags | SQ_EXCL;
3532 	sq->sq_count = count + 1;
3533 	ASSERT(sq->sq_count != 0);		/* Wraparound */
3534 	/*
3535 	 * Note: The only message ordering guarantee that rwnext() makes is
3536 	 *	 for the write queue flow-control case. All others (r/w queue
3537 	 *	 with q_count > 0 (or q_first != 0)) are the resposibilty of
3538 	 *	 the queue's rw procedure. This could be genralized here buy
3539 	 *	 running the queue's service procedure, but that wouldn't be
3540 	 *	 the most efficent for all cases.
3541 	 */
3542 	mutex_exit(SQLOCK(sq));
3543 	if (! isread && (qp->q_flag & QFULL)) {
3544 		/*
3545 		 * Write queue may be flow controlled. If so,
3546 		 * mark the queue for wakeup when it's not.
3547 		 */
3548 		mutex_enter(QLOCK(qp));
3549 		if (qp->q_flag & QFULL) {
3550 			qp->q_flag |= QWANTWSYNC;
3551 			mutex_exit(QLOCK(qp));
3552 			rval = EWOULDBLOCK;
3553 			goto out;
3554 		}
3555 		mutex_exit(QLOCK(qp));
3556 	}
3557 
3558 	if (! isread && dp->d_mp)
3559 		STR_FTEVENT_MSG(dp->d_mp, nqp, FTEV_RWNEXT, dp->d_mp->b_rptr -
3560 		    dp->d_mp->b_datap->db_base);
3561 
3562 	rval = (*proc)(qp, dp);
3563 
3564 	if (isread && dp->d_mp)
3565 		STR_FTEVENT_MSG(dp->d_mp, _RD(nqp), FTEV_RWNEXT,
3566 		    dp->d_mp->b_rptr - dp->d_mp->b_datap->db_base);
3567 out:
3568 	/*
3569 	 * The queue is protected from being freed by sq_count, so it is
3570 	 * safe to call rwnext_exit and reacquire SQLOCK(sq).
3571 	 */
3572 	rwnext_exit(qp);
3573 
3574 	mutex_enter(SQLOCK(sq));
3575 	flags = sq->sq_flags;
3576 	ASSERT(sq->sq_count != 0);
3577 	sq->sq_count--;
3578 	if (flags & SQ_TAIL) {
3579 		putnext_tail(sq, qp, flags);
3580 		/*
3581 		 * The only purpose of this ASSERT is to preserve calling stack
3582 		 * in DEBUG kernel.
3583 		 */
3584 		ASSERT(flags & SQ_TAIL);
3585 		return (rval);
3586 	}
3587 	ASSERT(flags & (SQ_EXCL|SQ_CIPUT));
3588 	/*
3589 	 * Safe to always drop SQ_EXCL:
3590 	 *	Not SQ_CIPUT means we set SQ_EXCL above
3591 	 *	For SQ_CIPUT SQ_EXCL will only be set if the put procedure
3592 	 *	did a qwriter(INNER) in which case nobody else
3593 	 *	is in the inner perimeter and we are exiting.
3594 	 *
3595 	 * I would like to make the following assertion:
3596 	 *
3597 	 * ASSERT((flags & (SQ_EXCL|SQ_CIPUT)) != (SQ_EXCL|SQ_CIPUT) ||
3598 	 * 	sq->sq_count == 0);
3599 	 *
3600 	 * which indicates that if we are both putshared and exclusive,
3601 	 * we became exclusive while executing the putproc, and the only
3602 	 * claim on the syncq was the one we dropped a few lines above.
3603 	 * But other threads that enter putnext while the syncq is exclusive
3604 	 * need to make a claim as they may need to drop SQLOCK in the
3605 	 * has_writers case to avoid deadlocks.  If these threads are
3606 	 * delayed or preempted, it is possible that the writer thread can
3607 	 * find out that there are other claims making the (sq_count == 0)
3608 	 * test invalid.
3609 	 */
3610 
3611 	sq->sq_flags = flags & ~SQ_EXCL;
3612 	if (sq->sq_flags & SQ_WANTWAKEUP) {
3613 		sq->sq_flags &= ~SQ_WANTWAKEUP;
3614 		cv_broadcast(&sq->sq_wait);
3615 	}
3616 	mutex_exit(SQLOCK(sq));
3617 	return (rval);
3618 }
3619 
3620 /*
3621  * The purpose of infonext() is to call the info procedure of the next
3622  * (downstream) modules queue.
3623  *
3624  * treated as put entrypoint for perimeter syncronization.
3625  *
3626  * There's no need to grab sq_putlocks here (which only exist for CIPUT
3627  * sync queues). If it is CIPUT sync queue regular sq_count is incremented and
3628  * it does not matter if any regular put entrypoints have been already
3629  * entered.
3630  */
3631 int
3632 infonext(queue_t *qp, infod_t *idp)
3633 {
3634 	queue_t		*nqp;
3635 	syncq_t		*sq;
3636 	uint16_t	count;
3637 	uint16_t 	flags;
3638 	struct qinit	*qi;
3639 	int		(*proc)();
3640 	struct stdata	*stp;
3641 	int		rval;
3642 
3643 	stp = STREAM(qp);
3644 	/*
3645 	 * Prevent q_next from changing by holding sd_lock until
3646 	 * acquiring SQLOCK.
3647 	 */
3648 	mutex_enter(&stp->sd_lock);
3649 	if ((nqp = _WR(qp)) == qp) {
3650 		qp = nqp->q_next;
3651 	} else {
3652 		qp = _RD(nqp->q_next);
3653 	}
3654 	qi = qp->q_qinfo;
3655 	if (qp->q_struiot == STRUIOT_NONE || ! (proc = qi->qi_infop)) {
3656 		mutex_exit(&stp->sd_lock);
3657 		return (EINVAL);
3658 	}
3659 	sq = qp->q_syncq;
3660 	mutex_enter(SQLOCK(sq));
3661 	mutex_exit(&stp->sd_lock);
3662 	count = sq->sq_count;
3663 	flags = sq->sq_flags;
3664 	ASSERT(sq->sq_ciputctrl == NULL || (flags & SQ_CIPUT));
3665 
3666 	while ((flags & SQ_GOAWAY) || (!(flags & SQ_CIPUT) && count != 0)) {
3667 		/*
3668 		 * Wait until we can enter the inner perimeter.
3669 		 */
3670 		sq->sq_flags = flags | SQ_WANTWAKEUP;
3671 		cv_wait(&sq->sq_wait, SQLOCK(sq));
3672 		count = sq->sq_count;
3673 		flags = sq->sq_flags;
3674 	}
3675 
3676 	if (! (flags & SQ_CIPUT))
3677 		sq->sq_flags = flags | SQ_EXCL;
3678 	sq->sq_count = count + 1;
3679 	ASSERT(sq->sq_count != 0);		/* Wraparound */
3680 	mutex_exit(SQLOCK(sq));
3681 
3682 	rval = (*proc)(qp, idp);
3683 
3684 	mutex_enter(SQLOCK(sq));
3685 	flags = sq->sq_flags;
3686 	ASSERT(sq->sq_count != 0);
3687 	sq->sq_count--;
3688 	if (flags & SQ_TAIL) {
3689 		putnext_tail(sq, qp, flags);
3690 		/*
3691 		 * The only purpose of this ASSERT is to preserve calling stack
3692 		 * in DEBUG kernel.
3693 		 */
3694 		ASSERT(flags & SQ_TAIL);
3695 		return (rval);
3696 	}
3697 	ASSERT(flags & (SQ_EXCL|SQ_CIPUT));
3698 /*
3699  * XXXX
3700  * I am not certain the next comment is correct here.  I need to consider
3701  * why the infonext is called, and if dropping SQ_EXCL unless non-CIPUT
3702  * might cause other problems.  It just might be safer to drop it if
3703  * !SQ_CIPUT because that is when we set it.
3704  */
3705 	/*
3706 	 * Safe to always drop SQ_EXCL:
3707 	 *	Not SQ_CIPUT means we set SQ_EXCL above
3708 	 *	For SQ_CIPUT SQ_EXCL will only be set if the put procedure
3709 	 *	did a qwriter(INNER) in which case nobody else
3710 	 *	is in the inner perimeter and we are exiting.
3711 	 *
3712 	 * I would like to make the following assertion:
3713 	 *
3714 	 * ASSERT((flags & (SQ_EXCL|SQ_CIPUT)) != (SQ_EXCL|SQ_CIPUT) ||
3715 	 *	sq->sq_count == 0);
3716 	 *
3717 	 * which indicates that if we are both putshared and exclusive,
3718 	 * we became exclusive while executing the putproc, and the only
3719 	 * claim on the syncq was the one we dropped a few lines above.
3720 	 * But other threads that enter putnext while the syncq is exclusive
3721 	 * need to make a claim as they may need to drop SQLOCK in the
3722 	 * has_writers case to avoid deadlocks.  If these threads are
3723 	 * delayed or preempted, it is possible that the writer thread can
3724 	 * find out that there are other claims making the (sq_count == 0)
3725 	 * test invalid.
3726 	 */
3727 
3728 	sq->sq_flags = flags & ~SQ_EXCL;
3729 	mutex_exit(SQLOCK(sq));
3730 	return (rval);
3731 }
3732 
3733 /*
3734  * Return nonzero if the queue is responsible for struio(), else return 0.
3735  */
3736 int
3737 isuioq(queue_t *q)
3738 {
3739 	if (q->q_flag & QREADR)
3740 		return (STREAM(q)->sd_struiordq == q);
3741 	else
3742 		return (STREAM(q)->sd_struiowrq == q);
3743 }
3744 
3745 #if defined(__sparc)
3746 int disable_putlocks = 0;
3747 #else
3748 int disable_putlocks = 1;
3749 #endif
3750 
3751 /*
3752  * called by create_putlock.
3753  */
3754 static void
3755 create_syncq_putlocks(queue_t *q)
3756 {
3757 	syncq_t	*sq = q->q_syncq;
3758 	ciputctrl_t *cip;
3759 	int i;
3760 
3761 	ASSERT(sq != NULL);
3762 
3763 	ASSERT(disable_putlocks == 0);
3764 	ASSERT(n_ciputctrl >= min_n_ciputctrl);
3765 	ASSERT(ciputctrl_cache != NULL);
3766 
3767 	if (!(sq->sq_type & SQ_CIPUT))
3768 		return;
3769 
3770 	for (i = 0; i <= 1; i++) {
3771 		if (sq->sq_ciputctrl == NULL) {
3772 			cip = kmem_cache_alloc(ciputctrl_cache, KM_SLEEP);
3773 			SUMCHECK_CIPUTCTRL_COUNTS(cip, n_ciputctrl - 1, 0);
3774 			mutex_enter(SQLOCK(sq));
3775 			if (sq->sq_ciputctrl != NULL) {
3776 				mutex_exit(SQLOCK(sq));
3777 				kmem_cache_free(ciputctrl_cache, cip);
3778 			} else {
3779 				ASSERT(sq->sq_nciputctrl == 0);
3780 				sq->sq_nciputctrl = n_ciputctrl - 1;
3781 				/*
3782 				 * putnext checks sq_ciputctrl without holding
3783 				 * SQLOCK. if it is not NULL putnext assumes
3784 				 * sq_nciputctrl is initialized. membar below
3785 				 * insures that.
3786 				 */
3787 				membar_producer();
3788 				sq->sq_ciputctrl = cip;
3789 				mutex_exit(SQLOCK(sq));
3790 			}
3791 		}
3792 		ASSERT(sq->sq_nciputctrl == n_ciputctrl - 1);
3793 		if (i == 1)
3794 			break;
3795 		q = _OTHERQ(q);
3796 		if (!(q->q_flag & QPERQ)) {
3797 			ASSERT(sq == q->q_syncq);
3798 			break;
3799 		}
3800 		ASSERT(q->q_syncq != NULL);
3801 		ASSERT(sq != q->q_syncq);
3802 		sq = q->q_syncq;
3803 		ASSERT(sq->sq_type & SQ_CIPUT);
3804 	}
3805 }
3806 
3807 /*
3808  * If stream argument is 0 only create per cpu sq_putlocks/sq_putcounts for
3809  * syncq of q. If stream argument is not 0 create per cpu stream_putlocks for
3810  * the stream of q and per cpu sq_putlocks/sq_putcounts for all syncq's
3811  * starting from q and down to the driver.
3812  *
3813  * This should be called after the affected queues are part of stream
3814  * geometry. It should be called from driver/module open routine after
3815  * qprocson() call. It is also called from nfs syscall where it is known that
3816  * stream is configured and won't change its geometry during create_putlock
3817  * call.
3818  *
3819  * caller normally uses 0 value for the stream argument to speed up MT putnext
3820  * into the perimeter of q for example because its perimeter is per module
3821  * (e.g. IP).
3822  *
3823  * caller normally uses non 0 value for the stream argument to hint the system
3824  * that the stream of q is a very contended global system stream
3825  * (e.g. NFS/UDP) and the part of the stream from q to the driver is
3826  * particularly MT hot.
3827  *
3828  * Caller insures stream plumbing won't happen while we are here and therefore
3829  * q_next can be safely used.
3830  */
3831 
3832 void
3833 create_putlocks(queue_t *q, int stream)
3834 {
3835 	ciputctrl_t	*cip;
3836 	struct stdata	*stp = STREAM(q);
3837 
3838 	q = _WR(q);
3839 	ASSERT(stp != NULL);
3840 
3841 	if (disable_putlocks != 0)
3842 		return;
3843 
3844 	if (n_ciputctrl < min_n_ciputctrl)
3845 		return;
3846 
3847 	ASSERT(ciputctrl_cache != NULL);
3848 
3849 	if (stream != 0 && stp->sd_ciputctrl == NULL) {
3850 		cip = kmem_cache_alloc(ciputctrl_cache, KM_SLEEP);
3851 		SUMCHECK_CIPUTCTRL_COUNTS(cip, n_ciputctrl - 1, 0);
3852 		mutex_enter(&stp->sd_lock);
3853 		if (stp->sd_ciputctrl != NULL) {
3854 			mutex_exit(&stp->sd_lock);
3855 			kmem_cache_free(ciputctrl_cache, cip);
3856 		} else {
3857 			ASSERT(stp->sd_nciputctrl == 0);
3858 			stp->sd_nciputctrl = n_ciputctrl - 1;
3859 			/*
3860 			 * putnext checks sd_ciputctrl without holding
3861 			 * sd_lock. if it is not NULL putnext assumes
3862 			 * sd_nciputctrl is initialized. membar below
3863 			 * insures that.
3864 			 */
3865 			membar_producer();
3866 			stp->sd_ciputctrl = cip;
3867 			mutex_exit(&stp->sd_lock);
3868 		}
3869 	}
3870 
3871 	ASSERT(stream == 0 || stp->sd_nciputctrl == n_ciputctrl - 1);
3872 
3873 	while (_SAMESTR(q)) {
3874 		create_syncq_putlocks(q);
3875 		if (stream == 0)
3876 			return;
3877 		q = q->q_next;
3878 	}
3879 	ASSERT(q != NULL);
3880 	create_syncq_putlocks(q);
3881 }
3882 
3883 /*
3884  * STREAMS Flow Trace - record STREAMS Flow Trace events as an mblk flows
3885  * through a stream.
3886  *
3887  * Data currently record per event is a hrtime stamp, queue address, event
3888  * type, and a per type datum.  Much of the STREAMS framework is instrumented
3889  * for automatic flow tracing (when enabled).  Events can be defined and used
3890  * by STREAMS modules and drivers.
3891  *
3892  * Global objects:
3893  *
3894  *	str_ftevent() - Add a flow-trace event to a dblk.
3895  *	str_ftfree() - Free flow-trace data
3896  *
3897  * Local objects:
3898  *
3899  *	fthdr_cache - pointer to the kmem cache for trace header.
3900  *	ftblk_cache - pointer to the kmem cache for trace data blocks.
3901  */
3902 
3903 int str_ftnever = 1;	/* Don't do STREAMS flow tracing */
3904 
3905 void
3906 str_ftevent(fthdr_t *hp, void *p, ushort_t evnt, ushort_t data)
3907 {
3908 	ftblk_t *bp = hp->tail;
3909 	ftblk_t *nbp;
3910 	ftevnt_t *ep;
3911 	int ix, nix;
3912 
3913 	ASSERT(hp != NULL);
3914 
3915 	for (;;) {
3916 		if ((ix = bp->ix) == FTBLK_EVNTS) {
3917 			/*
3918 			 * Tail doesn't have room, so need a new tail.
3919 			 *
3920 			 * To make this MT safe, first, allocate a new
3921 			 * ftblk, and initialize it.  To make life a
3922 			 * little easier, reserve the first slot (mostly
3923 			 * by making ix = 1).  When we are finished with
3924 			 * the initialization, CAS this pointer to the
3925 			 * tail.  If this succeeds, this is the new
3926 			 * "next" block.  Otherwise, another thread
3927 			 * got here first, so free the block and start
3928 			 * again.
3929 			 */
3930 			if (!(nbp = kmem_cache_alloc(ftblk_cache,
3931 			    KM_NOSLEEP))) {
3932 				/* no mem, so punt */
3933 				str_ftnever++;
3934 				/* free up all flow data? */
3935 				return;
3936 			}
3937 			nbp->nxt = NULL;
3938 			nbp->ix = 1;
3939 			/*
3940 			 * Just in case there is another thread about
3941 			 * to get the next index, we need to make sure
3942 			 * the value is there for it.
3943 			 */
3944 			membar_producer();
3945 			if (casptr(&hp->tail, bp, nbp) == bp) {
3946 				/* CAS was successful */
3947 				bp->nxt = nbp;
3948 				membar_producer();
3949 				bp = nbp;
3950 				ix = 0;
3951 				goto cas_good;
3952 			} else {
3953 				kmem_cache_free(ftblk_cache, nbp);
3954 				bp = hp->tail;
3955 				continue;
3956 			}
3957 		}
3958 		nix = ix + 1;
3959 		if (cas32((uint32_t *)&bp->ix, ix, nix) == ix) {
3960 		cas_good:
3961 			if (curthread != hp->thread) {
3962 				hp->thread = curthread;
3963 				evnt |= FTEV_CS;
3964 			}
3965 			if (CPU->cpu_seqid != hp->cpu_seqid) {
3966 				hp->cpu_seqid = CPU->cpu_seqid;
3967 				evnt |= FTEV_PS;
3968 			}
3969 			ep = &bp->ev[ix];
3970 			break;
3971 		}
3972 	}
3973 
3974 	if (evnt & FTEV_QMASK) {
3975 		queue_t *qp = p;
3976 
3977 		/*
3978 		 * It is possible that the module info is broke
3979 		 * (as is logsubr.c at this comment writing).
3980 		 * Instead of panicing or doing other unmentionables,
3981 		 * we shall put a dummy name as the mid, and continue.
3982 		 */
3983 		if (qp->q_qinfo == NULL)
3984 			ep->mid = "NONAME";
3985 		else
3986 			ep->mid = qp->q_qinfo->qi_minfo->mi_idname;
3987 
3988 		if (!(qp->q_flag & QREADR))
3989 			evnt |= FTEV_ISWR;
3990 	} else {
3991 		ep->mid = (char *)p;
3992 	}
3993 
3994 	ep->ts = gethrtime();
3995 	ep->evnt = evnt;
3996 	ep->data = data;
3997 	hp->hash = (hp->hash << 9) + hp->hash;
3998 	hp->hash += (evnt << 16) | data;
3999 	hp->hash += (uintptr_t)ep->mid;
4000 }
4001 
4002 /*
4003  * Free flow-trace data.
4004  */
4005 void
4006 str_ftfree(dblk_t *dbp)
4007 {
4008 	fthdr_t *hp = dbp->db_fthdr;
4009 	ftblk_t *bp = &hp->first;
4010 	ftblk_t *nbp;
4011 
4012 	if (bp != hp->tail || bp->ix != 0) {
4013 		/*
4014 		 * Clear out the hash, have the tail point to itself, and free
4015 		 * any continuation blocks.
4016 		 */
4017 		bp = hp->first.nxt;
4018 		hp->tail = &hp->first;
4019 		hp->hash = 0;
4020 		hp->first.nxt = NULL;
4021 		hp->first.ix = 0;
4022 		while (bp != NULL) {
4023 			nbp = bp->nxt;
4024 			kmem_cache_free(ftblk_cache, bp);
4025 			bp = nbp;
4026 		}
4027 	}
4028 	kmem_cache_free(fthdr_cache, hp);
4029 	dbp->db_fthdr = NULL;
4030 }
4031