xref: /illumos-gate/usr/src/uts/common/io/stream.c (revision afd7fd7b2e19d8a40c0b602c50de34faa641f387)
1 /*
2  * CDDL HEADER START
3  *
4  * The contents of this file are subject to the terms of the
5  * Common Development and Distribution License, Version 1.0 only
6  * (the "License").  You may not use this file except in compliance
7  * with the License.
8  *
9  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
10  * or http://www.opensolaris.org/os/licensing.
11  * See the License for the specific language governing permissions
12  * and limitations under the License.
13  *
14  * When distributing Covered Code, include this CDDL HEADER in each
15  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
16  * If applicable, add the following below this CDDL HEADER, with the
17  * fields enclosed by brackets "[]" replaced with your own identifying
18  * information: Portions Copyright [yyyy] [name of copyright owner]
19  *
20  * CDDL HEADER END
21  */
22 /*	Copyright (c) 1984, 1986, 1987, 1988, 1989 AT&T	*/
23 /*	  All Rights Reserved  	*/
24 
25 
26 /*
27  * Copyright 2005 Sun Microsystems, Inc.  All rights reserved.
28  * Use is subject to license terms.
29  */
30 
31 #pragma ident	"%Z%%M%	%I%	%E% SMI"
32 
33 #include <sys/types.h>
34 #include <sys/param.h>
35 #include <sys/thread.h>
36 #include <sys/sysmacros.h>
37 #include <sys/stropts.h>
38 #include <sys/stream.h>
39 #include <sys/strsubr.h>
40 #include <sys/strsun.h>
41 #include <sys/conf.h>
42 #include <sys/debug.h>
43 #include <sys/cmn_err.h>
44 #include <sys/kmem.h>
45 #include <sys/atomic.h>
46 #include <sys/errno.h>
47 #include <sys/vtrace.h>
48 #include <sys/ftrace.h>
49 #include <sys/ontrap.h>
50 #include <sys/multidata.h>
51 #include <sys/multidata_impl.h>
52 #include <sys/sdt.h>
53 
54 #ifdef DEBUG
55 #include <sys/kmem_impl.h>
56 #endif
57 
58 /*
59  * This file contains all the STREAMS utility routines that may
60  * be used by modules and drivers.
61  */
62 
63 /*
64  * STREAMS message allocator: principles of operation
65  *
66  * The streams message allocator consists of all the routines that
67  * allocate, dup and free streams messages: allocb(), [d]esballoc[a],
68  * dupb(), freeb() and freemsg().  What follows is a high-level view
69  * of how the allocator works.
70  *
71  * Every streams message consists of one or more mblks, a dblk, and data.
72  * All mblks for all types of messages come from a common mblk_cache.
73  * The dblk and data come in several flavors, depending on how the
74  * message is allocated:
75  *
76  * (1) mblks up to DBLK_MAX_CACHE size are allocated from a collection of
77  *     fixed-size dblk/data caches. For message sizes that are multiples of
78  *     PAGESIZE, dblks are allocated separately from the buffer.
79  *     The associated buffer is allocated by the constructor using kmem_alloc().
80  *     For all other message sizes, dblk and its associated data is allocated
81  *     as a single contiguous chunk of memory.
82  *     Objects in these caches consist of a dblk plus its associated data.
83  *     allocb() determines the nearest-size cache by table lookup:
84  *     the dblk_cache[] array provides the mapping from size to dblk cache.
85  *
86  * (2) Large messages (size > DBLK_MAX_CACHE) are constructed by
87  *     kmem_alloc()'ing a buffer for the data and supplying that
88  *     buffer to gesballoc(), described below.
89  *
90  * (3) The four flavors of [d]esballoc[a] are all implemented by a
91  *     common routine, gesballoc() ("generic esballoc").  gesballoc()
92  *     allocates a dblk from the global dblk_esb_cache and sets db_base,
93  *     db_lim and db_frtnp to describe the caller-supplied buffer.
94  *
95  * While there are several routines to allocate messages, there is only
96  * one routine to free messages: freeb().  freeb() simply invokes the
97  * dblk's free method, dbp->db_free(), which is set at allocation time.
98  *
99  * dupb() creates a new reference to a message by allocating a new mblk,
100  * incrementing the dblk reference count and setting the dblk's free
101  * method to dblk_decref().  The dblk's original free method is retained
102  * in db_lastfree.  dblk_decref() decrements the reference count on each
103  * freeb().  If this is not the last reference it just frees the mblk;
104  * if this *is* the last reference, it restores db_free to db_lastfree,
105  * sets db_mblk to the current mblk (see below), and invokes db_lastfree.
106  *
107  * The implementation makes aggressive use of kmem object caching for
108  * maximum performance.  This makes the code simple and compact, but
109  * also a bit abstruse in some places.  The invariants that constitute a
110  * message's constructed state, described below, are more subtle than usual.
111  *
112  * Every dblk has an "attached mblk" as part of its constructed state.
113  * The mblk is allocated by the dblk's constructor and remains attached
114  * until the message is either dup'ed or pulled up.  In the dupb() case
115  * the mblk association doesn't matter until the last free, at which time
116  * dblk_decref() attaches the last mblk to the dblk.  pullupmsg() affects
117  * the mblk association because it swaps the leading mblks of two messages,
118  * so it is responsible for swapping their db_mblk pointers accordingly.
119  * From a constructed-state viewpoint it doesn't matter that a dblk's
120  * attached mblk can change while the message is allocated; all that
121  * matters is that the dblk has *some* attached mblk when it's freed.
122  *
123  * The sizes of the allocb() small-message caches are not magical.
124  * They represent a good trade-off between internal and external
125  * fragmentation for current workloads.  They should be reevaluated
126  * periodically, especially if allocations larger than DBLK_MAX_CACHE
127  * become common.  We use 64-byte alignment so that dblks don't
128  * straddle cache lines unnecessarily.
129  */
130 #define	DBLK_MAX_CACHE		73728
131 #define	DBLK_CACHE_ALIGN	64
132 #define	DBLK_MIN_SIZE		8
133 #define	DBLK_SIZE_SHIFT		3
134 
135 #ifdef _BIG_ENDIAN
136 #define	DBLK_RTFU_SHIFT(field)	\
137 	(8 * (&((dblk_t *)0)->db_struioflag - &((dblk_t *)0)->field))
138 #else
139 #define	DBLK_RTFU_SHIFT(field)	\
140 	(8 * (&((dblk_t *)0)->field - &((dblk_t *)0)->db_ref))
141 #endif
142 
143 #define	DBLK_RTFU(ref, type, flags, uioflag)	\
144 	(((ref) << DBLK_RTFU_SHIFT(db_ref)) | \
145 	((type) << DBLK_RTFU_SHIFT(db_type)) | \
146 	(((flags) | (ref - 1)) << DBLK_RTFU_SHIFT(db_flags)) | \
147 	((uioflag) << DBLK_RTFU_SHIFT(db_struioflag)))
148 #define	DBLK_RTFU_REF_MASK	(DBLK_REFMAX << DBLK_RTFU_SHIFT(db_ref))
149 #define	DBLK_RTFU_WORD(dbp)	(*((uint32_t *)&(dbp)->db_ref))
150 #define	MBLK_BAND_FLAG_WORD(mp)	(*((uint32_t *)&(mp)->b_band))
151 
152 static size_t dblk_sizes[] = {
153 #ifdef _LP64
154 	16, 80, 144, 208, 272, 336, 528, 1040, 1488, 1936, 2576, 3920,
155 	8192, 12112, 16384, 20304, 24576, 28496, 32768, 36688,
156 	40960, 44880, 49152, 53072, 57344, 61264, 65536, 69456,
157 #else
158 	64, 128, 320, 576, 1088, 1536, 1984, 2624, 3968,
159 	8192, 12160, 16384, 20352, 24576, 28544, 32768, 36736,
160 	40960, 44928, 49152, 53120, 57344, 61312, 65536, 69504,
161 #endif
162 	DBLK_MAX_CACHE, 0
163 };
164 
165 static struct kmem_cache *dblk_cache[DBLK_MAX_CACHE / DBLK_MIN_SIZE];
166 static struct kmem_cache *mblk_cache;
167 static struct kmem_cache *dblk_esb_cache;
168 static struct kmem_cache *fthdr_cache;
169 static struct kmem_cache *ftblk_cache;
170 
171 static void dblk_lastfree(mblk_t *mp, dblk_t *dbp);
172 static mblk_t *allocb_oversize(size_t size, int flags);
173 static int allocb_tryhard_fails;
174 static void frnop_func(void *arg);
175 frtn_t frnop = { frnop_func };
176 static void bcache_dblk_lastfree(mblk_t *mp, dblk_t *dbp);
177 
178 static boolean_t rwnext_enter(queue_t *qp);
179 static void rwnext_exit(queue_t *qp);
180 
181 /*
182  * Patchable mblk/dblk kmem_cache flags.
183  */
184 int dblk_kmem_flags = 0;
185 int mblk_kmem_flags = 0;
186 
187 
188 static int
189 dblk_constructor(void *buf, void *cdrarg, int kmflags)
190 {
191 	dblk_t *dbp = buf;
192 	ssize_t msg_size = (ssize_t)cdrarg;
193 	size_t index;
194 
195 	ASSERT(msg_size != 0);
196 
197 	index = (msg_size - 1) >> DBLK_SIZE_SHIFT;
198 
199 	ASSERT(index <= (DBLK_MAX_CACHE >> DBLK_SIZE_SHIFT));
200 
201 	if ((dbp->db_mblk = kmem_cache_alloc(mblk_cache, kmflags)) == NULL)
202 		return (-1);
203 	if ((msg_size & PAGEOFFSET) == 0) {
204 		dbp->db_base = kmem_alloc(msg_size, kmflags);
205 		if (dbp->db_base == NULL) {
206 			kmem_cache_free(mblk_cache, dbp->db_mblk);
207 			return (-1);
208 		}
209 	} else {
210 		dbp->db_base = (unsigned char *)&dbp[1];
211 	}
212 
213 	dbp->db_mblk->b_datap = dbp;
214 	dbp->db_cache = dblk_cache[index];
215 	dbp->db_lim = dbp->db_base + msg_size;
216 	dbp->db_free = dbp->db_lastfree = dblk_lastfree;
217 	dbp->db_frtnp = NULL;
218 	dbp->db_fthdr = NULL;
219 	dbp->db_credp = NULL;
220 	dbp->db_cpid = -1;
221 	dbp->db_struioflag = 0;
222 	dbp->db_struioun.cksum.flags = 0;
223 	return (0);
224 }
225 
226 /*ARGSUSED*/
227 static int
228 dblk_esb_constructor(void *buf, void *cdrarg, int kmflags)
229 {
230 	dblk_t *dbp = buf;
231 
232 	if ((dbp->db_mblk = kmem_cache_alloc(mblk_cache, kmflags)) == NULL)
233 		return (-1);
234 	dbp->db_mblk->b_datap = dbp;
235 	dbp->db_cache = dblk_esb_cache;
236 	dbp->db_fthdr = NULL;
237 	dbp->db_credp = NULL;
238 	dbp->db_cpid = -1;
239 	dbp->db_struioflag = 0;
240 	dbp->db_struioun.cksum.flags = 0;
241 	return (0);
242 }
243 
244 static int
245 bcache_dblk_constructor(void *buf, void *cdrarg, int kmflags)
246 {
247 	dblk_t *dbp = buf;
248 	bcache_t *bcp = (bcache_t *)cdrarg;
249 
250 	if ((dbp->db_mblk = kmem_cache_alloc(mblk_cache, kmflags)) == NULL)
251 		return (-1);
252 
253 	if ((dbp->db_base = (unsigned char *)kmem_cache_alloc(bcp->buffer_cache,
254 	    kmflags)) == NULL) {
255 		kmem_cache_free(mblk_cache, dbp->db_mblk);
256 		return (-1);
257 	}
258 
259 	dbp->db_mblk->b_datap = dbp;
260 	dbp->db_cache = (void *)bcp;
261 	dbp->db_lim = dbp->db_base + bcp->size;
262 	dbp->db_free = dbp->db_lastfree = bcache_dblk_lastfree;
263 	dbp->db_frtnp = NULL;
264 	dbp->db_fthdr = NULL;
265 	dbp->db_credp = NULL;
266 	dbp->db_cpid = -1;
267 	dbp->db_struioflag = 0;
268 	dbp->db_struioun.cksum.flags = 0;
269 	return (0);
270 }
271 
272 /*ARGSUSED*/
273 static void
274 dblk_destructor(void *buf, void *cdrarg)
275 {
276 	dblk_t *dbp = buf;
277 	ssize_t msg_size = (ssize_t)cdrarg;
278 
279 	ASSERT(dbp->db_mblk->b_datap == dbp);
280 
281 	ASSERT(msg_size != 0);
282 
283 	ASSERT(dbp->db_struioflag == 0);
284 	ASSERT(dbp->db_struioun.cksum.flags == 0);
285 
286 	if ((msg_size & PAGEOFFSET) == 0) {
287 		kmem_free(dbp->db_base, msg_size);
288 	}
289 
290 	kmem_cache_free(mblk_cache, dbp->db_mblk);
291 }
292 
293 static void
294 bcache_dblk_destructor(void *buf, void *cdrarg)
295 {
296 	dblk_t *dbp = buf;
297 	bcache_t *bcp = (bcache_t *)cdrarg;
298 
299 	kmem_cache_free(bcp->buffer_cache, dbp->db_base);
300 
301 	ASSERT(dbp->db_mblk->b_datap == dbp);
302 
303 	ASSERT(dbp->db_struioflag == 0);
304 	ASSERT(dbp->db_struioun.cksum.flags == 0);
305 
306 	kmem_cache_free(mblk_cache, dbp->db_mblk);
307 }
308 
309 void
310 streams_msg_init(void)
311 {
312 	char name[40];
313 	size_t size;
314 	size_t lastsize = DBLK_MIN_SIZE;
315 	size_t *sizep;
316 	struct kmem_cache *cp;
317 	size_t tot_size;
318 	int offset;
319 
320 	mblk_cache = kmem_cache_create("streams_mblk",
321 		sizeof (mblk_t), 32, NULL, NULL, NULL, NULL, NULL,
322 		mblk_kmem_flags);
323 
324 	for (sizep = dblk_sizes; (size = *sizep) != 0; sizep++) {
325 
326 		if ((offset = (size & PAGEOFFSET)) != 0) {
327 			/*
328 			 * We are in the middle of a page, dblk should
329 			 * be allocated on the same page
330 			 */
331 			tot_size = size + sizeof (dblk_t);
332 			ASSERT((offset + sizeof (dblk_t) + sizeof (kmem_slab_t))
333 								< PAGESIZE);
334 			ASSERT((tot_size & (DBLK_CACHE_ALIGN - 1)) == 0);
335 
336 		} else {
337 
338 			/*
339 			 * buf size is multiple of page size, dblk and
340 			 * buffer are allocated separately.
341 			 */
342 
343 			ASSERT((size & (DBLK_CACHE_ALIGN - 1)) == 0);
344 			tot_size = sizeof (dblk_t);
345 		}
346 
347 		(void) sprintf(name, "streams_dblk_%ld", size);
348 		cp = kmem_cache_create(name, tot_size,
349 			DBLK_CACHE_ALIGN, dblk_constructor,
350 			dblk_destructor, NULL,
351 			(void *)(size), NULL, dblk_kmem_flags);
352 
353 		while (lastsize <= size) {
354 			dblk_cache[(lastsize - 1) >> DBLK_SIZE_SHIFT] = cp;
355 			lastsize += DBLK_MIN_SIZE;
356 		}
357 	}
358 
359 	dblk_esb_cache = kmem_cache_create("streams_dblk_esb",
360 			sizeof (dblk_t), DBLK_CACHE_ALIGN,
361 			dblk_esb_constructor, dblk_destructor, NULL,
362 			(void *) sizeof (dblk_t), NULL, dblk_kmem_flags);
363 	fthdr_cache = kmem_cache_create("streams_fthdr",
364 		sizeof (fthdr_t), 32, NULL, NULL, NULL, NULL, NULL, 0);
365 	ftblk_cache = kmem_cache_create("streams_ftblk",
366 		sizeof (ftblk_t), 32, NULL, NULL, NULL, NULL, NULL, 0);
367 
368 	/* Initialize Multidata caches */
369 	mmd_init();
370 }
371 
372 /*ARGSUSED*/
373 mblk_t *
374 allocb(size_t size, uint_t pri)
375 {
376 	dblk_t *dbp;
377 	mblk_t *mp;
378 	size_t index;
379 
380 	index =  (size - 1)  >> DBLK_SIZE_SHIFT;
381 
382 	if (index >= (DBLK_MAX_CACHE >> DBLK_SIZE_SHIFT)) {
383 		if (size != 0) {
384 			mp = allocb_oversize(size, KM_NOSLEEP);
385 			goto out;
386 		}
387 		index = 0;
388 	}
389 
390 	if ((dbp = kmem_cache_alloc(dblk_cache[index], KM_NOSLEEP)) == NULL) {
391 		mp = NULL;
392 		goto out;
393 	}
394 
395 	mp = dbp->db_mblk;
396 	DBLK_RTFU_WORD(dbp) = DBLK_RTFU(1, M_DATA, 0, 0);
397 	mp->b_next = mp->b_prev = mp->b_cont = NULL;
398 	mp->b_rptr = mp->b_wptr = dbp->db_base;
399 	mp->b_queue = NULL;
400 	MBLK_BAND_FLAG_WORD(mp) = 0;
401 	STR_FTALLOC(&dbp->db_fthdr, FTEV_ALLOCB, size);
402 out:
403 	FTRACE_1("allocb(): mp=0x%p", (uintptr_t)mp);
404 
405 	return (mp);
406 }
407 
408 mblk_t *
409 allocb_tmpl(size_t size, const mblk_t *tmpl)
410 {
411 	mblk_t *mp = allocb(size, 0);
412 
413 	if (mp != NULL) {
414 		cred_t *cr = DB_CRED(tmpl);
415 		if (cr != NULL)
416 			crhold(mp->b_datap->db_credp = cr);
417 		DB_CPID(mp) = DB_CPID(tmpl);
418 		DB_TYPE(mp) = DB_TYPE(tmpl);
419 	}
420 	return (mp);
421 }
422 
423 mblk_t *
424 allocb_cred(size_t size, cred_t *cr)
425 {
426 	mblk_t *mp = allocb(size, 0);
427 
428 	if (mp != NULL && cr != NULL)
429 		crhold(mp->b_datap->db_credp = cr);
430 
431 	return (mp);
432 }
433 
434 mblk_t *
435 allocb_cred_wait(size_t size, uint_t flags, int *error, cred_t *cr)
436 {
437 	mblk_t *mp = allocb_wait(size, 0, flags, error);
438 
439 	if (mp != NULL && cr != NULL)
440 		crhold(mp->b_datap->db_credp = cr);
441 
442 	return (mp);
443 }
444 
445 void
446 freeb(mblk_t *mp)
447 {
448 	dblk_t *dbp = mp->b_datap;
449 
450 	ASSERT(dbp->db_ref > 0);
451 	ASSERT(mp->b_next == NULL && mp->b_prev == NULL);
452 	FTRACE_1("freeb(): mp=0x%lx", (uintptr_t)mp);
453 
454 	STR_FTEVENT_MBLK(mp, caller(), FTEV_FREEB, dbp->db_ref);
455 
456 	dbp->db_free(mp, dbp);
457 }
458 
459 void
460 freemsg(mblk_t *mp)
461 {
462 	FTRACE_1("freemsg(): mp=0x%lx", (uintptr_t)mp);
463 	while (mp) {
464 		dblk_t *dbp = mp->b_datap;
465 		mblk_t *mp_cont = mp->b_cont;
466 
467 		ASSERT(dbp->db_ref > 0);
468 		ASSERT(mp->b_next == NULL && mp->b_prev == NULL);
469 
470 		STR_FTEVENT_MBLK(mp, caller(), FTEV_FREEB, dbp->db_ref);
471 
472 		dbp->db_free(mp, dbp);
473 		mp = mp_cont;
474 	}
475 }
476 
477 /*
478  * Reallocate a block for another use.  Try hard to use the old block.
479  * If the old data is wanted (copy), leave b_wptr at the end of the data,
480  * otherwise return b_wptr = b_rptr.
481  *
482  * This routine is private and unstable.
483  */
484 mblk_t	*
485 reallocb(mblk_t *mp, size_t size, uint_t copy)
486 {
487 	mblk_t		*mp1;
488 	unsigned char	*old_rptr;
489 	ptrdiff_t	cur_size;
490 
491 	if (mp == NULL)
492 		return (allocb(size, BPRI_HI));
493 
494 	cur_size = mp->b_wptr - mp->b_rptr;
495 	old_rptr = mp->b_rptr;
496 
497 	ASSERT(mp->b_datap->db_ref != 0);
498 
499 	if (mp->b_datap->db_ref == 1 && MBLKSIZE(mp) >= size) {
500 		/*
501 		 * If the data is wanted and it will fit where it is, no
502 		 * work is required.
503 		 */
504 		if (copy && mp->b_datap->db_lim - mp->b_rptr >= size)
505 			return (mp);
506 
507 		mp->b_wptr = mp->b_rptr = mp->b_datap->db_base;
508 		mp1 = mp;
509 	} else if ((mp1 = allocb_tmpl(size, mp)) != NULL) {
510 		/* XXX other mp state could be copied too, db_flags ... ? */
511 		mp1->b_cont = mp->b_cont;
512 	} else {
513 		return (NULL);
514 	}
515 
516 	if (copy) {
517 		bcopy(old_rptr, mp1->b_rptr, cur_size);
518 		mp1->b_wptr = mp1->b_rptr + cur_size;
519 	}
520 
521 	if (mp != mp1)
522 		freeb(mp);
523 
524 	return (mp1);
525 }
526 
527 static void
528 dblk_lastfree(mblk_t *mp, dblk_t *dbp)
529 {
530 	ASSERT(dbp->db_mblk == mp);
531 	if (dbp->db_fthdr != NULL)
532 		str_ftfree(dbp);
533 
534 	/* set credp and projid to be 'unspecified' before returning to cache */
535 	if (dbp->db_credp != NULL) {
536 		crfree(dbp->db_credp);
537 		dbp->db_credp = NULL;
538 	}
539 	dbp->db_cpid = -1;
540 
541 	/* Reset the struioflag and the checksum flag fields */
542 	dbp->db_struioflag = 0;
543 	dbp->db_struioun.cksum.flags = 0;
544 
545 	kmem_cache_free(dbp->db_cache, dbp);
546 }
547 
548 static void
549 dblk_decref(mblk_t *mp, dblk_t *dbp)
550 {
551 	if (dbp->db_ref != 1) {
552 		uint32_t rtfu = atomic_add_32_nv(&DBLK_RTFU_WORD(dbp),
553 		    -(1 << DBLK_RTFU_SHIFT(db_ref)));
554 		/*
555 		 * atomic_add_32_nv() just decremented db_ref, so we no longer
556 		 * have a reference to the dblk, which means another thread
557 		 * could free it.  Therefore we cannot examine the dblk to
558 		 * determine whether ours was the last reference.  Instead,
559 		 * we extract the new and minimum reference counts from rtfu.
560 		 * Note that all we're really saying is "if (ref != refmin)".
561 		 */
562 		if (((rtfu >> DBLK_RTFU_SHIFT(db_ref)) & DBLK_REFMAX) !=
563 		    ((rtfu >> DBLK_RTFU_SHIFT(db_flags)) & DBLK_REFMIN)) {
564 			kmem_cache_free(mblk_cache, mp);
565 			return;
566 		}
567 	}
568 	dbp->db_mblk = mp;
569 	dbp->db_free = dbp->db_lastfree;
570 	dbp->db_lastfree(mp, dbp);
571 }
572 
573 mblk_t *
574 dupb(mblk_t *mp)
575 {
576 	dblk_t *dbp = mp->b_datap;
577 	mblk_t *new_mp;
578 	uint32_t oldrtfu, newrtfu;
579 
580 	if ((new_mp = kmem_cache_alloc(mblk_cache, KM_NOSLEEP)) == NULL)
581 		goto out;
582 
583 	new_mp->b_next = new_mp->b_prev = new_mp->b_cont = NULL;
584 	new_mp->b_rptr = mp->b_rptr;
585 	new_mp->b_wptr = mp->b_wptr;
586 	new_mp->b_datap = dbp;
587 	new_mp->b_queue = NULL;
588 	MBLK_BAND_FLAG_WORD(new_mp) = MBLK_BAND_FLAG_WORD(mp);
589 
590 	STR_FTEVENT_MBLK(mp, caller(), FTEV_DUPB, dbp->db_ref);
591 
592 	/*
593 	 * First-dup optimization.  The enabling assumption is that there
594 	 * can can never be a race (in correct code) to dup the first copy
595 	 * of a message.  Therefore we don't need to do it atomically.
596 	 */
597 	if (dbp->db_free != dblk_decref) {
598 		dbp->db_free = dblk_decref;
599 		dbp->db_ref++;
600 		goto out;
601 	}
602 
603 	do {
604 		ASSERT(dbp->db_ref > 0);
605 		oldrtfu = DBLK_RTFU_WORD(dbp);
606 		newrtfu = oldrtfu + (1 << DBLK_RTFU_SHIFT(db_ref));
607 		/*
608 		 * If db_ref is maxed out we can't dup this message anymore.
609 		 */
610 		if ((oldrtfu & DBLK_RTFU_REF_MASK) == DBLK_RTFU_REF_MASK) {
611 			kmem_cache_free(mblk_cache, new_mp);
612 			new_mp = NULL;
613 			goto out;
614 		}
615 	} while (cas32(&DBLK_RTFU_WORD(dbp), oldrtfu, newrtfu) != oldrtfu);
616 
617 out:
618 	FTRACE_1("dupb(): new_mp=0x%lx", (uintptr_t)new_mp);
619 	return (new_mp);
620 }
621 
622 static void
623 dblk_lastfree_desb(mblk_t *mp, dblk_t *dbp)
624 {
625 	frtn_t *frp = dbp->db_frtnp;
626 
627 	ASSERT(dbp->db_mblk == mp);
628 	frp->free_func(frp->free_arg);
629 	if (dbp->db_fthdr != NULL)
630 		str_ftfree(dbp);
631 
632 	/* set credp and projid to be 'unspecified' before returning to cache */
633 	if (dbp->db_credp != NULL) {
634 		crfree(dbp->db_credp);
635 		dbp->db_credp = NULL;
636 	}
637 	dbp->db_cpid = -1;
638 	dbp->db_struioflag = 0;
639 	dbp->db_struioun.cksum.flags = 0;
640 
641 	kmem_cache_free(dbp->db_cache, dbp);
642 }
643 
644 /*ARGSUSED*/
645 static void
646 frnop_func(void *arg)
647 {
648 }
649 
650 /*
651  * Generic esballoc used to implement the four flavors: [d]esballoc[a].
652  */
653 static mblk_t *
654 gesballoc(unsigned char *base, size_t size, uint32_t db_rtfu, frtn_t *frp,
655 	void (*lastfree)(mblk_t *, dblk_t *), int kmflags)
656 {
657 	dblk_t *dbp;
658 	mblk_t *mp;
659 
660 	ASSERT(base != NULL && frp != NULL);
661 
662 	if ((dbp = kmem_cache_alloc(dblk_esb_cache, kmflags)) == NULL) {
663 		mp = NULL;
664 		goto out;
665 	}
666 
667 	mp = dbp->db_mblk;
668 	dbp->db_base = base;
669 	dbp->db_lim = base + size;
670 	dbp->db_free = dbp->db_lastfree = lastfree;
671 	dbp->db_frtnp = frp;
672 	DBLK_RTFU_WORD(dbp) = db_rtfu;
673 	mp->b_next = mp->b_prev = mp->b_cont = NULL;
674 	mp->b_rptr = mp->b_wptr = base;
675 	mp->b_queue = NULL;
676 	MBLK_BAND_FLAG_WORD(mp) = 0;
677 
678 out:
679 	FTRACE_1("gesballoc(): mp=0x%lx", (uintptr_t)mp);
680 	return (mp);
681 }
682 
683 /*ARGSUSED*/
684 mblk_t *
685 esballoc(unsigned char *base, size_t size, uint_t pri, frtn_t *frp)
686 {
687 	mblk_t *mp;
688 
689 	/*
690 	 * Note that this is structured to allow the common case (i.e.
691 	 * STREAMS flowtracing disabled) to call gesballoc() with tail
692 	 * call optimization.
693 	 */
694 	if (!str_ftnever) {
695 		mp = gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
696 		    frp, freebs_enqueue, KM_NOSLEEP);
697 
698 		if (mp != NULL)
699 			STR_FTALLOC(&DB_FTHDR(mp), FTEV_ESBALLOC, size);
700 		return (mp);
701 	}
702 
703 	return (gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
704 	    frp, freebs_enqueue, KM_NOSLEEP));
705 }
706 
707 /*
708  * Same as esballoc() but sleeps waiting for memory.
709  */
710 /*ARGSUSED*/
711 mblk_t *
712 esballoc_wait(unsigned char *base, size_t size, uint_t pri, frtn_t *frp)
713 {
714 	mblk_t *mp;
715 
716 	/*
717 	 * Note that this is structured to allow the common case (i.e.
718 	 * STREAMS flowtracing disabled) to call gesballoc() with tail
719 	 * call optimization.
720 	 */
721 	if (!str_ftnever) {
722 		mp = gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
723 		    frp, freebs_enqueue, KM_SLEEP);
724 
725 		STR_FTALLOC(&DB_FTHDR(mp), FTEV_ESBALLOC, size);
726 		return (mp);
727 	}
728 
729 	return (gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
730 	    frp, freebs_enqueue, KM_SLEEP));
731 }
732 
733 /*ARGSUSED*/
734 mblk_t *
735 desballoc(unsigned char *base, size_t size, uint_t pri, frtn_t *frp)
736 {
737 	mblk_t *mp;
738 
739 	/*
740 	 * Note that this is structured to allow the common case (i.e.
741 	 * STREAMS flowtracing disabled) to call gesballoc() with tail
742 	 * call optimization.
743 	 */
744 	if (!str_ftnever) {
745 		mp = gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
746 			frp, dblk_lastfree_desb, KM_NOSLEEP);
747 
748 		if (mp != NULL)
749 			STR_FTALLOC(&DB_FTHDR(mp), FTEV_DESBALLOC, size);
750 		return (mp);
751 	}
752 
753 	return (gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
754 	    frp, dblk_lastfree_desb, KM_NOSLEEP));
755 }
756 
757 /*ARGSUSED*/
758 mblk_t *
759 esballoca(unsigned char *base, size_t size, uint_t pri, frtn_t *frp)
760 {
761 	mblk_t *mp;
762 
763 	/*
764 	 * Note that this is structured to allow the common case (i.e.
765 	 * STREAMS flowtracing disabled) to call gesballoc() with tail
766 	 * call optimization.
767 	 */
768 	if (!str_ftnever) {
769 		mp = gesballoc(base, size, DBLK_RTFU(2, M_DATA, 0, 0),
770 		    frp, freebs_enqueue, KM_NOSLEEP);
771 
772 		if (mp != NULL)
773 			STR_FTALLOC(&DB_FTHDR(mp), FTEV_ESBALLOCA, size);
774 		return (mp);
775 	}
776 
777 	return (gesballoc(base, size, DBLK_RTFU(2, M_DATA, 0, 0),
778 	    frp, freebs_enqueue, KM_NOSLEEP));
779 }
780 
781 /*ARGSUSED*/
782 mblk_t *
783 desballoca(unsigned char *base, size_t size, uint_t pri, frtn_t *frp)
784 {
785 	mblk_t *mp;
786 
787 	/*
788 	 * Note that this is structured to allow the common case (i.e.
789 	 * STREAMS flowtracing disabled) to call gesballoc() with tail
790 	 * call optimization.
791 	 */
792 	if (!str_ftnever) {
793 		mp = gesballoc(base, size, DBLK_RTFU(2, M_DATA, 0, 0),
794 		    frp, dblk_lastfree_desb, KM_NOSLEEP);
795 
796 		if (mp != NULL)
797 			STR_FTALLOC(&DB_FTHDR(mp), FTEV_DESBALLOCA, size);
798 		return (mp);
799 	}
800 
801 	return (gesballoc(base, size, DBLK_RTFU(2, M_DATA, 0, 0),
802 	    frp, dblk_lastfree_desb, KM_NOSLEEP));
803 }
804 
805 static void
806 bcache_dblk_lastfree(mblk_t *mp, dblk_t *dbp)
807 {
808 	bcache_t *bcp = dbp->db_cache;
809 
810 	ASSERT(dbp->db_mblk == mp);
811 	if (dbp->db_fthdr != NULL)
812 		str_ftfree(dbp);
813 
814 	/* set credp and projid to be 'unspecified' before returning to cache */
815 	if (dbp->db_credp != NULL) {
816 		crfree(dbp->db_credp);
817 		dbp->db_credp = NULL;
818 	}
819 	dbp->db_cpid = -1;
820 	dbp->db_struioflag = 0;
821 	dbp->db_struioun.cksum.flags = 0;
822 
823 	mutex_enter(&bcp->mutex);
824 	kmem_cache_free(bcp->dblk_cache, dbp);
825 	bcp->alloc--;
826 
827 	if (bcp->alloc == 0 && bcp->destroy != 0) {
828 		kmem_cache_destroy(bcp->dblk_cache);
829 		kmem_cache_destroy(bcp->buffer_cache);
830 		mutex_exit(&bcp->mutex);
831 		mutex_destroy(&bcp->mutex);
832 		kmem_free(bcp, sizeof (bcache_t));
833 	} else {
834 		mutex_exit(&bcp->mutex);
835 	}
836 }
837 
838 bcache_t *
839 bcache_create(char *name, size_t size, uint_t align)
840 {
841 	bcache_t *bcp;
842 	char buffer[255];
843 
844 	ASSERT((align & (align - 1)) == 0);
845 
846 	if ((bcp = (bcache_t *)kmem_alloc(sizeof (bcache_t), KM_NOSLEEP)) ==
847 	    NULL) {
848 		return (NULL);
849 	}
850 
851 	bcp->size = size;
852 	bcp->align = align;
853 	bcp->alloc = 0;
854 	bcp->destroy = 0;
855 
856 	mutex_init(&bcp->mutex, NULL, MUTEX_DRIVER, NULL);
857 
858 	(void) sprintf(buffer, "%s_buffer_cache", name);
859 	bcp->buffer_cache = kmem_cache_create(buffer, size, align, NULL, NULL,
860 	    NULL, NULL, NULL, 0);
861 	(void) sprintf(buffer, "%s_dblk_cache", name);
862 	bcp->dblk_cache = kmem_cache_create(buffer, sizeof (dblk_t),
863 	    DBLK_CACHE_ALIGN, bcache_dblk_constructor, bcache_dblk_destructor,
864 						NULL, (void *)bcp, NULL, 0);
865 
866 	return (bcp);
867 }
868 
869 void
870 bcache_destroy(bcache_t *bcp)
871 {
872 	ASSERT(bcp != NULL);
873 
874 	mutex_enter(&bcp->mutex);
875 	if (bcp->alloc == 0) {
876 		kmem_cache_destroy(bcp->dblk_cache);
877 		kmem_cache_destroy(bcp->buffer_cache);
878 		mutex_exit(&bcp->mutex);
879 		mutex_destroy(&bcp->mutex);
880 		kmem_free(bcp, sizeof (bcache_t));
881 	} else {
882 		bcp->destroy++;
883 		mutex_exit(&bcp->mutex);
884 	}
885 }
886 
887 /*ARGSUSED*/
888 mblk_t *
889 bcache_allocb(bcache_t *bcp, uint_t pri)
890 {
891 	dblk_t *dbp;
892 	mblk_t *mp = NULL;
893 
894 	ASSERT(bcp != NULL);
895 
896 	mutex_enter(&bcp->mutex);
897 	if (bcp->destroy != 0) {
898 		mutex_exit(&bcp->mutex);
899 		goto out;
900 	}
901 
902 	if ((dbp = kmem_cache_alloc(bcp->dblk_cache, KM_NOSLEEP)) == NULL) {
903 		mutex_exit(&bcp->mutex);
904 		goto out;
905 	}
906 	bcp->alloc++;
907 	mutex_exit(&bcp->mutex);
908 
909 	ASSERT(((uintptr_t)(dbp->db_base) & (bcp->align - 1)) == 0);
910 
911 	mp = dbp->db_mblk;
912 	DBLK_RTFU_WORD(dbp) = DBLK_RTFU(1, M_DATA, 0, 0);
913 	mp->b_next = mp->b_prev = mp->b_cont = NULL;
914 	mp->b_rptr = mp->b_wptr = dbp->db_base;
915 	mp->b_queue = NULL;
916 	MBLK_BAND_FLAG_WORD(mp) = 0;
917 	STR_FTALLOC(&dbp->db_fthdr, FTEV_BCALLOCB, bcp->size);
918 out:
919 	FTRACE_1("bcache_allocb(): mp=0x%p", (uintptr_t)mp);
920 
921 	return (mp);
922 }
923 
924 static void
925 dblk_lastfree_oversize(mblk_t *mp, dblk_t *dbp)
926 {
927 	ASSERT(dbp->db_mblk == mp);
928 	if (dbp->db_fthdr != NULL)
929 		str_ftfree(dbp);
930 
931 	/* set credp and projid to be 'unspecified' before returning to cache */
932 	if (dbp->db_credp != NULL) {
933 		crfree(dbp->db_credp);
934 		dbp->db_credp = NULL;
935 	}
936 	dbp->db_cpid = -1;
937 	dbp->db_struioflag = 0;
938 	dbp->db_struioun.cksum.flags = 0;
939 
940 	kmem_free(dbp->db_base, dbp->db_lim - dbp->db_base);
941 	kmem_cache_free(dbp->db_cache, dbp);
942 }
943 
944 static mblk_t *
945 allocb_oversize(size_t size, int kmflags)
946 {
947 	mblk_t *mp;
948 	void *buf;
949 
950 	size = P2ROUNDUP(size, DBLK_CACHE_ALIGN);
951 	if ((buf = kmem_alloc(size, kmflags)) == NULL)
952 		return (NULL);
953 	if ((mp = gesballoc(buf, size, DBLK_RTFU(1, M_DATA, 0, 0),
954 	    &frnop, dblk_lastfree_oversize, kmflags)) == NULL)
955 		kmem_free(buf, size);
956 
957 	if (mp != NULL)
958 		STR_FTALLOC(&DB_FTHDR(mp), FTEV_ALLOCBIG, size);
959 
960 	return (mp);
961 }
962 
963 mblk_t *
964 allocb_tryhard(size_t target_size)
965 {
966 	size_t size;
967 	mblk_t *bp;
968 
969 	for (size = target_size; size < target_size + 512;
970 	    size += DBLK_CACHE_ALIGN)
971 		if ((bp = allocb(size, BPRI_HI)) != NULL)
972 			return (bp);
973 	allocb_tryhard_fails++;
974 	return (NULL);
975 }
976 
977 /*
978  * This routine is consolidation private for STREAMS internal use
979  * This routine may only be called from sync routines (i.e., not
980  * from put or service procedures).  It is located here (rather
981  * than strsubr.c) so that we don't have to expose all of the
982  * allocb() implementation details in header files.
983  */
984 mblk_t *
985 allocb_wait(size_t size, uint_t pri, uint_t flags, int *error)
986 {
987 	dblk_t *dbp;
988 	mblk_t *mp;
989 	size_t index;
990 
991 	index = (size -1) >> DBLK_SIZE_SHIFT;
992 
993 	if (flags & STR_NOSIG) {
994 		if (index >= (DBLK_MAX_CACHE >> DBLK_SIZE_SHIFT)) {
995 			if (size != 0) {
996 				mp = allocb_oversize(size, KM_SLEEP);
997 				FTRACE_1("allocb_wait (NOSIG): mp=0x%lx",
998 				    (uintptr_t)mp);
999 				return (mp);
1000 			}
1001 			index = 0;
1002 		}
1003 
1004 		dbp = kmem_cache_alloc(dblk_cache[index], KM_SLEEP);
1005 		mp = dbp->db_mblk;
1006 		DBLK_RTFU_WORD(dbp) = DBLK_RTFU(1, M_DATA, 0, 0);
1007 		mp->b_next = mp->b_prev = mp->b_cont = NULL;
1008 		mp->b_rptr = mp->b_wptr = dbp->db_base;
1009 		mp->b_queue = NULL;
1010 		MBLK_BAND_FLAG_WORD(mp) = 0;
1011 		STR_FTALLOC(&DB_FTHDR(mp), FTEV_ALLOCBW, size);
1012 
1013 		FTRACE_1("allocb_wait (NOSIG): mp=0x%lx", (uintptr_t)mp);
1014 
1015 	} else {
1016 		while ((mp = allocb(size, pri)) == NULL) {
1017 			if ((*error = strwaitbuf(size, BPRI_HI)) != 0)
1018 				return (NULL);
1019 		}
1020 	}
1021 
1022 	return (mp);
1023 }
1024 
1025 /*
1026  * Call function 'func' with 'arg' when a class zero block can
1027  * be allocated with priority 'pri'.
1028  */
1029 bufcall_id_t
1030 esbbcall(uint_t pri, void (*func)(void *), void *arg)
1031 {
1032 	return (bufcall(1, pri, func, arg));
1033 }
1034 
1035 /*
1036  * Allocates an iocblk (M_IOCTL) block. Properly sets the credentials
1037  * ioc_id, rval and error of the struct ioctl to set up an ioctl call.
1038  * This provides consistency for all internal allocators of ioctl.
1039  */
1040 mblk_t *
1041 mkiocb(uint_t cmd)
1042 {
1043 	struct iocblk	*ioc;
1044 	mblk_t		*mp;
1045 
1046 	/*
1047 	 * Allocate enough space for any of the ioctl related messages.
1048 	 */
1049 	if ((mp = allocb(sizeof (union ioctypes), BPRI_MED)) == NULL)
1050 		return (NULL);
1051 
1052 	bzero(mp->b_rptr, sizeof (union ioctypes));
1053 
1054 	/*
1055 	 * Set the mblk_t information and ptrs correctly.
1056 	 */
1057 	mp->b_wptr += sizeof (struct iocblk);
1058 	mp->b_datap->db_type = M_IOCTL;
1059 
1060 	/*
1061 	 * Fill in the fields.
1062 	 */
1063 	ioc		= (struct iocblk *)mp->b_rptr;
1064 	ioc->ioc_cmd	= cmd;
1065 	ioc->ioc_cr	= kcred;
1066 	ioc->ioc_id	= getiocseqno();
1067 	ioc->ioc_flag	= IOC_NATIVE;
1068 	return (mp);
1069 }
1070 
1071 /*
1072  * test if block of given size can be allocated with a request of
1073  * the given priority.
1074  * 'pri' is no longer used, but is retained for compatibility.
1075  */
1076 /* ARGSUSED */
1077 int
1078 testb(size_t size, uint_t pri)
1079 {
1080 	return ((size + sizeof (dblk_t)) <= kmem_avail());
1081 }
1082 
1083 /*
1084  * Call function 'func' with argument 'arg' when there is a reasonably
1085  * good chance that a block of size 'size' can be allocated.
1086  * 'pri' is no longer used, but is retained for compatibility.
1087  */
1088 /* ARGSUSED */
1089 bufcall_id_t
1090 bufcall(size_t size, uint_t pri, void (*func)(void *), void *arg)
1091 {
1092 	static long bid = 1;	/* always odd to save checking for zero */
1093 	bufcall_id_t bc_id;
1094 	struct strbufcall *bcp;
1095 
1096 	if ((bcp = kmem_alloc(sizeof (strbufcall_t), KM_NOSLEEP)) == NULL)
1097 		return (0);
1098 
1099 	bcp->bc_func = func;
1100 	bcp->bc_arg = arg;
1101 	bcp->bc_size = size;
1102 	bcp->bc_next = NULL;
1103 	bcp->bc_executor = NULL;
1104 
1105 	mutex_enter(&strbcall_lock);
1106 	/*
1107 	 * After bcp is linked into strbcalls and strbcall_lock is dropped there
1108 	 * should be no references to bcp since it may be freed by
1109 	 * runbufcalls(). Since bcp_id field is returned, we save its value in
1110 	 * the local var.
1111 	 */
1112 	bc_id = bcp->bc_id = (bufcall_id_t)(bid += 2);	/* keep it odd */
1113 
1114 	/*
1115 	 * add newly allocated stream event to existing
1116 	 * linked list of events.
1117 	 */
1118 	if (strbcalls.bc_head == NULL) {
1119 		strbcalls.bc_head = strbcalls.bc_tail = bcp;
1120 	} else {
1121 		strbcalls.bc_tail->bc_next = bcp;
1122 		strbcalls.bc_tail = bcp;
1123 	}
1124 
1125 	cv_signal(&strbcall_cv);
1126 	mutex_exit(&strbcall_lock);
1127 	return (bc_id);
1128 }
1129 
1130 /*
1131  * Cancel a bufcall request.
1132  */
1133 void
1134 unbufcall(bufcall_id_t id)
1135 {
1136 	strbufcall_t *bcp, *pbcp;
1137 
1138 	mutex_enter(&strbcall_lock);
1139 again:
1140 	pbcp = NULL;
1141 	for (bcp = strbcalls.bc_head; bcp; bcp = bcp->bc_next) {
1142 		if (id == bcp->bc_id)
1143 			break;
1144 		pbcp = bcp;
1145 	}
1146 	if (bcp) {
1147 		if (bcp->bc_executor != NULL) {
1148 			if (bcp->bc_executor != curthread) {
1149 				cv_wait(&bcall_cv, &strbcall_lock);
1150 				goto again;
1151 			}
1152 		} else {
1153 			if (pbcp)
1154 				pbcp->bc_next = bcp->bc_next;
1155 			else
1156 				strbcalls.bc_head = bcp->bc_next;
1157 			if (bcp == strbcalls.bc_tail)
1158 				strbcalls.bc_tail = pbcp;
1159 			kmem_free(bcp, sizeof (strbufcall_t));
1160 		}
1161 	}
1162 	mutex_exit(&strbcall_lock);
1163 }
1164 
1165 /*
1166  * Duplicate a message block by block (uses dupb), returning
1167  * a pointer to the duplicate message.
1168  * Returns a non-NULL value only if the entire message
1169  * was dup'd.
1170  */
1171 mblk_t *
1172 dupmsg(mblk_t *bp)
1173 {
1174 	mblk_t *head, *nbp;
1175 
1176 	if (!bp || !(nbp = head = dupb(bp)))
1177 		return (NULL);
1178 
1179 	while (bp->b_cont) {
1180 		if (!(nbp->b_cont = dupb(bp->b_cont))) {
1181 			freemsg(head);
1182 			return (NULL);
1183 		}
1184 		nbp = nbp->b_cont;
1185 		bp = bp->b_cont;
1186 	}
1187 	return (head);
1188 }
1189 
1190 #define	DUPB_NOLOAN(bp) \
1191 	((((bp)->b_datap->db_struioflag & STRUIO_ZC) != 0) ? \
1192 	copyb((bp)) : dupb((bp)))
1193 
1194 mblk_t *
1195 dupmsg_noloan(mblk_t *bp)
1196 {
1197 	mblk_t *head, *nbp;
1198 
1199 	if (bp == NULL || DB_TYPE(bp) != M_DATA ||
1200 	    ((nbp = head = DUPB_NOLOAN(bp)) == NULL))
1201 		return (NULL);
1202 
1203 	while (bp->b_cont) {
1204 		if ((nbp->b_cont = DUPB_NOLOAN(bp->b_cont)) == NULL) {
1205 			freemsg(head);
1206 			return (NULL);
1207 		}
1208 		nbp = nbp->b_cont;
1209 		bp = bp->b_cont;
1210 	}
1211 	return (head);
1212 }
1213 
1214 /*
1215  * Copy data from message and data block to newly allocated message and
1216  * data block. Returns new message block pointer, or NULL if error.
1217  * The alignment of rptr (w.r.t. word alignment) will be the same in the copy
1218  * as in the original even when db_base is not word aligned. (bug 1052877)
1219  */
1220 mblk_t *
1221 copyb(mblk_t *bp)
1222 {
1223 	mblk_t	*nbp;
1224 	dblk_t	*dp, *ndp;
1225 	uchar_t *base;
1226 	size_t	size;
1227 	size_t	unaligned;
1228 
1229 	ASSERT(bp->b_wptr >= bp->b_rptr);
1230 
1231 	dp = bp->b_datap;
1232 	if (dp->db_fthdr != NULL)
1233 		STR_FTEVENT_MBLK(bp, caller(), FTEV_COPYB, 0);
1234 
1235 	/*
1236 	 * Special handling for Multidata message; this should be
1237 	 * removed once a copy-callback routine is made available.
1238 	 */
1239 	if (dp->db_type == M_MULTIDATA) {
1240 		cred_t *cr;
1241 
1242 		if ((nbp = mmd_copy(bp, KM_NOSLEEP)) == NULL)
1243 			return (NULL);
1244 
1245 		nbp->b_flag = bp->b_flag;
1246 		nbp->b_band = bp->b_band;
1247 		ndp = nbp->b_datap;
1248 
1249 		/* See comments below on potential issues. */
1250 		STR_FTEVENT_MBLK(nbp, caller(), FTEV_COPYB, 1);
1251 
1252 		ASSERT(ndp->db_type == dp->db_type);
1253 		cr = dp->db_credp;
1254 		if (cr != NULL)
1255 			crhold(ndp->db_credp = cr);
1256 		ndp->db_cpid = dp->db_cpid;
1257 		return (nbp);
1258 	}
1259 
1260 	size = dp->db_lim - dp->db_base;
1261 	unaligned = P2PHASE((uintptr_t)dp->db_base, sizeof (uint_t));
1262 	if ((nbp = allocb_tmpl(size + unaligned, bp)) == NULL)
1263 		return (NULL);
1264 	nbp->b_flag = bp->b_flag;
1265 	nbp->b_band = bp->b_band;
1266 	ndp = nbp->b_datap;
1267 
1268 	/*
1269 	 * Well, here is a potential issue.  If we are trying to
1270 	 * trace a flow, and we copy the message, we might lose
1271 	 * information about where this message might have been.
1272 	 * So we should inherit the FT data.  On the other hand,
1273 	 * a user might be interested only in alloc to free data.
1274 	 * So I guess the real answer is to provide a tunable.
1275 	 */
1276 	STR_FTEVENT_MBLK(nbp, caller(), FTEV_COPYB, 1);
1277 
1278 	base = ndp->db_base + unaligned;
1279 	bcopy(dp->db_base, ndp->db_base + unaligned, size);
1280 
1281 	nbp->b_rptr = base + (bp->b_rptr - dp->db_base);
1282 	nbp->b_wptr = nbp->b_rptr + MBLKL(bp);
1283 
1284 	return (nbp);
1285 }
1286 
1287 /*
1288  * Copy data from message to newly allocated message using new
1289  * data blocks.  Returns a pointer to the new message, or NULL if error.
1290  */
1291 mblk_t *
1292 copymsg(mblk_t *bp)
1293 {
1294 	mblk_t *head, *nbp;
1295 
1296 	if (!bp || !(nbp = head = copyb(bp)))
1297 		return (NULL);
1298 
1299 	while (bp->b_cont) {
1300 		if (!(nbp->b_cont = copyb(bp->b_cont))) {
1301 			freemsg(head);
1302 			return (NULL);
1303 		}
1304 		nbp = nbp->b_cont;
1305 		bp = bp->b_cont;
1306 	}
1307 	return (head);
1308 }
1309 
1310 /*
1311  * link a message block to tail of message
1312  */
1313 void
1314 linkb(mblk_t *mp, mblk_t *bp)
1315 {
1316 	ASSERT(mp && bp);
1317 
1318 	for (; mp->b_cont; mp = mp->b_cont)
1319 		;
1320 	mp->b_cont = bp;
1321 }
1322 
1323 /*
1324  * unlink a message block from head of message
1325  * return pointer to new message.
1326  * NULL if message becomes empty.
1327  */
1328 mblk_t *
1329 unlinkb(mblk_t *bp)
1330 {
1331 	mblk_t *bp1;
1332 
1333 	bp1 = bp->b_cont;
1334 	bp->b_cont = NULL;
1335 	return (bp1);
1336 }
1337 
1338 /*
1339  * remove a message block "bp" from message "mp"
1340  *
1341  * Return pointer to new message or NULL if no message remains.
1342  * Return -1 if bp is not found in message.
1343  */
1344 mblk_t *
1345 rmvb(mblk_t *mp, mblk_t *bp)
1346 {
1347 	mblk_t *tmp;
1348 	mblk_t *lastp = NULL;
1349 
1350 	ASSERT(mp && bp);
1351 	for (tmp = mp; tmp; tmp = tmp->b_cont) {
1352 		if (tmp == bp) {
1353 			if (lastp)
1354 				lastp->b_cont = tmp->b_cont;
1355 			else
1356 				mp = tmp->b_cont;
1357 			tmp->b_cont = NULL;
1358 			return (mp);
1359 		}
1360 		lastp = tmp;
1361 	}
1362 	return ((mblk_t *)-1);
1363 }
1364 
1365 /*
1366  * Concatenate and align first len bytes of common
1367  * message type.  Len == -1, means concat everything.
1368  * Returns 1 on success, 0 on failure
1369  * After the pullup, mp points to the pulled up data.
1370  */
1371 int
1372 pullupmsg(mblk_t *mp, ssize_t len)
1373 {
1374 	mblk_t *bp, *b_cont;
1375 	dblk_t *dbp;
1376 	ssize_t n;
1377 
1378 	ASSERT(mp->b_datap->db_ref > 0);
1379 	ASSERT(mp->b_next == NULL && mp->b_prev == NULL);
1380 
1381 	/*
1382 	 * We won't handle Multidata message, since it contains
1383 	 * metadata which this function has no knowledge of; we
1384 	 * assert on DEBUG, and return failure otherwise.
1385 	 */
1386 	ASSERT(mp->b_datap->db_type != M_MULTIDATA);
1387 	if (mp->b_datap->db_type == M_MULTIDATA)
1388 		return (0);
1389 
1390 	if (len == -1) {
1391 		if (mp->b_cont == NULL && str_aligned(mp->b_rptr))
1392 			return (1);
1393 		len = xmsgsize(mp);
1394 	} else {
1395 		ssize_t first_mblk_len = mp->b_wptr - mp->b_rptr;
1396 		ASSERT(first_mblk_len >= 0);
1397 		/*
1398 		 * If the length is less than that of the first mblk,
1399 		 * we want to pull up the message into an aligned mblk.
1400 		 * Though not part of the spec, some callers assume it.
1401 		 */
1402 		if (len <= first_mblk_len) {
1403 			if (str_aligned(mp->b_rptr))
1404 				return (1);
1405 			len = first_mblk_len;
1406 		} else if (xmsgsize(mp) < len)
1407 			return (0);
1408 	}
1409 
1410 	if ((bp = allocb_tmpl(len, mp)) == NULL)
1411 		return (0);
1412 
1413 	dbp = bp->b_datap;
1414 	*bp = *mp;		/* swap mblks so bp heads the old msg... */
1415 	mp->b_datap = dbp;	/* ... and mp heads the new message */
1416 	mp->b_datap->db_mblk = mp;
1417 	bp->b_datap->db_mblk = bp;
1418 	mp->b_rptr = mp->b_wptr = dbp->db_base;
1419 
1420 	do {
1421 		ASSERT(bp->b_datap->db_ref > 0);
1422 		ASSERT(bp->b_wptr >= bp->b_rptr);
1423 		n = MIN(bp->b_wptr - bp->b_rptr, len);
1424 		bcopy(bp->b_rptr, mp->b_wptr, (size_t)n);
1425 		mp->b_wptr += n;
1426 		bp->b_rptr += n;
1427 		len -= n;
1428 		if (bp->b_rptr != bp->b_wptr)
1429 			break;
1430 		b_cont = bp->b_cont;
1431 		freeb(bp);
1432 		bp = b_cont;
1433 	} while (len && bp);
1434 
1435 	mp->b_cont = bp;	/* tack on whatever wasn't pulled up */
1436 
1437 	return (1);
1438 }
1439 
1440 /*
1441  * Concatenate and align at least the first len bytes of common message
1442  * type.  Len == -1 means concatenate everything.  The original message is
1443  * unaltered.  Returns a pointer to a new message on success, otherwise
1444  * returns NULL.
1445  */
1446 mblk_t *
1447 msgpullup(mblk_t *mp, ssize_t len)
1448 {
1449 	mblk_t	*newmp;
1450 	ssize_t	totlen;
1451 	ssize_t	n;
1452 
1453 	/*
1454 	 * We won't handle Multidata message, since it contains
1455 	 * metadata which this function has no knowledge of; we
1456 	 * assert on DEBUG, and return failure otherwise.
1457 	 */
1458 	ASSERT(mp->b_datap->db_type != M_MULTIDATA);
1459 	if (mp->b_datap->db_type == M_MULTIDATA)
1460 		return (NULL);
1461 
1462 	totlen = xmsgsize(mp);
1463 
1464 	if ((len > 0) && (len > totlen))
1465 		return (NULL);
1466 
1467 	/*
1468 	 * Copy all of the first msg type into one new mblk, then dupmsg
1469 	 * and link the rest onto this.
1470 	 */
1471 
1472 	len = totlen;
1473 
1474 	if ((newmp = allocb_tmpl(len, mp)) == NULL)
1475 		return (NULL);
1476 
1477 	newmp->b_flag = mp->b_flag;
1478 	newmp->b_band = mp->b_band;
1479 
1480 	while (len > 0) {
1481 		n = mp->b_wptr - mp->b_rptr;
1482 		ASSERT(n >= 0);		/* allow zero-length mblk_t's */
1483 		if (n > 0)
1484 			bcopy(mp->b_rptr, newmp->b_wptr, n);
1485 		newmp->b_wptr += n;
1486 		len -= n;
1487 		mp = mp->b_cont;
1488 	}
1489 
1490 	if (mp != NULL) {
1491 		newmp->b_cont = dupmsg(mp);
1492 		if (newmp->b_cont == NULL) {
1493 			freemsg(newmp);
1494 			return (NULL);
1495 		}
1496 	}
1497 
1498 	return (newmp);
1499 }
1500 
1501 /*
1502  * Trim bytes from message
1503  *  len > 0, trim from head
1504  *  len < 0, trim from tail
1505  * Returns 1 on success, 0 on failure.
1506  */
1507 int
1508 adjmsg(mblk_t *mp, ssize_t len)
1509 {
1510 	mblk_t *bp;
1511 	mblk_t *save_bp = NULL;
1512 	mblk_t *prev_bp;
1513 	mblk_t *bcont;
1514 	unsigned char type;
1515 	ssize_t n;
1516 	int fromhead;
1517 	int first;
1518 
1519 	ASSERT(mp != NULL);
1520 	/*
1521 	 * We won't handle Multidata message, since it contains
1522 	 * metadata which this function has no knowledge of; we
1523 	 * assert on DEBUG, and return failure otherwise.
1524 	 */
1525 	ASSERT(mp->b_datap->db_type != M_MULTIDATA);
1526 	if (mp->b_datap->db_type == M_MULTIDATA)
1527 		return (0);
1528 
1529 	if (len < 0) {
1530 		fromhead = 0;
1531 		len = -len;
1532 	} else {
1533 		fromhead = 1;
1534 	}
1535 
1536 	if (xmsgsize(mp) < len)
1537 		return (0);
1538 
1539 
1540 	if (fromhead) {
1541 		first = 1;
1542 		while (len) {
1543 			ASSERT(mp->b_wptr >= mp->b_rptr);
1544 			n = MIN(mp->b_wptr - mp->b_rptr, len);
1545 			mp->b_rptr += n;
1546 			len -= n;
1547 
1548 			/*
1549 			 * If this is not the first zero length
1550 			 * message remove it
1551 			 */
1552 			if (!first && (mp->b_wptr == mp->b_rptr)) {
1553 				bcont = mp->b_cont;
1554 				freeb(mp);
1555 				mp = save_bp->b_cont = bcont;
1556 			} else {
1557 				save_bp = mp;
1558 				mp = mp->b_cont;
1559 			}
1560 			first = 0;
1561 		}
1562 	} else {
1563 		type = mp->b_datap->db_type;
1564 		while (len) {
1565 			bp = mp;
1566 			save_bp = NULL;
1567 
1568 			/*
1569 			 * Find the last message of same type
1570 			 */
1571 
1572 			while (bp && bp->b_datap->db_type == type) {
1573 				ASSERT(bp->b_wptr >= bp->b_rptr);
1574 				prev_bp = save_bp;
1575 				save_bp = bp;
1576 				bp = bp->b_cont;
1577 			}
1578 			if (save_bp == NULL)
1579 				break;
1580 			n = MIN(save_bp->b_wptr - save_bp->b_rptr, len);
1581 			save_bp->b_wptr -= n;
1582 			len -= n;
1583 
1584 			/*
1585 			 * If this is not the first message
1586 			 * and we have taken away everything
1587 			 * from this message, remove it
1588 			 */
1589 
1590 			if ((save_bp != mp) &&
1591 				(save_bp->b_wptr == save_bp->b_rptr)) {
1592 				bcont = save_bp->b_cont;
1593 				freeb(save_bp);
1594 				prev_bp->b_cont = bcont;
1595 			}
1596 		}
1597 	}
1598 	return (1);
1599 }
1600 
1601 /*
1602  * get number of data bytes in message
1603  */
1604 size_t
1605 msgdsize(mblk_t *bp)
1606 {
1607 	size_t count = 0;
1608 
1609 	for (; bp; bp = bp->b_cont)
1610 		if (bp->b_datap->db_type == M_DATA) {
1611 			ASSERT(bp->b_wptr >= bp->b_rptr);
1612 			count += bp->b_wptr - bp->b_rptr;
1613 		}
1614 	return (count);
1615 }
1616 
1617 /*
1618  * Get a message off head of queue
1619  *
1620  * If queue has no buffers then mark queue
1621  * with QWANTR. (queue wants to be read by
1622  * someone when data becomes available)
1623  *
1624  * If there is something to take off then do so.
1625  * If queue falls below hi water mark turn off QFULL
1626  * flag.  Decrement weighted count of queue.
1627  * Also turn off QWANTR because queue is being read.
1628  *
1629  * The queue count is maintained on a per-band basis.
1630  * Priority band 0 (normal messages) uses q_count,
1631  * q_lowat, etc.  Non-zero priority bands use the
1632  * fields in their respective qband structures
1633  * (qb_count, qb_lowat, etc.)  All messages appear
1634  * on the same list, linked via their b_next pointers.
1635  * q_first is the head of the list.  q_count does
1636  * not reflect the size of all the messages on the
1637  * queue.  It only reflects those messages in the
1638  * normal band of flow.  The one exception to this
1639  * deals with high priority messages.  They are in
1640  * their own conceptual "band", but are accounted
1641  * against q_count.
1642  *
1643  * If queue count is below the lo water mark and QWANTW
1644  * is set, enable the closest backq which has a service
1645  * procedure and turn off the QWANTW flag.
1646  *
1647  * getq could be built on top of rmvq, but isn't because
1648  * of performance considerations.
1649  *
1650  * A note on the use of q_count and q_mblkcnt:
1651  *   q_count is the traditional byte count for messages that
1652  *   have been put on a queue.  Documentation tells us that
1653  *   we shouldn't rely on that count, but some drivers/modules
1654  *   do.  What was needed, however, is a mechanism to prevent
1655  *   runaway streams from consuming all of the resources,
1656  *   and particularly be able to flow control zero-length
1657  *   messages.  q_mblkcnt is used for this purpose.  It
1658  *   counts the number of mblk's that are being put on
1659  *   the queue.  The intention here, is that each mblk should
1660  *   contain one byte of data and, for the purpose of
1661  *   flow-control, logically does.  A queue will become
1662  *   full when EITHER of these values (q_count and q_mblkcnt)
1663  *   reach the highwater mark.  It will clear when BOTH
1664  *   of them drop below the highwater mark.  And it will
1665  *   backenable when BOTH of them drop below the lowwater
1666  *   mark.
1667  *   With this algorithm, a driver/module might be able
1668  *   to find a reasonably accurate q_count, and the
1669  *   framework can still try and limit resource usage.
1670  */
1671 mblk_t *
1672 getq(queue_t *q)
1673 {
1674 	mblk_t *bp;
1675 	uchar_t band = 0;
1676 
1677 	bp = getq_noenab(q);
1678 	if (bp != NULL)
1679 		band = bp->b_band;
1680 
1681 	/*
1682 	 * Inlined from qbackenable().
1683 	 * Quick check without holding the lock.
1684 	 */
1685 	if (band == 0 && (q->q_flag & (QWANTW|QWANTWSYNC)) == 0)
1686 		return (bp);
1687 
1688 	qbackenable(q, band);
1689 	return (bp);
1690 }
1691 
1692 /*
1693  * Like getq() but does not backenable.  This is used by the stream
1694  * head when a putback() is likely.  The caller must call qbackenable()
1695  * after it is done with accessing the queue.
1696  */
1697 mblk_t *
1698 getq_noenab(queue_t *q)
1699 {
1700 	mblk_t *bp;
1701 	mblk_t *tmp;
1702 	qband_t *qbp;
1703 	kthread_id_t freezer;
1704 	int	bytecnt = 0, mblkcnt = 0;
1705 
1706 	/* freezestr should allow its caller to call getq/putq */
1707 	freezer = STREAM(q)->sd_freezer;
1708 	if (freezer == curthread) {
1709 		ASSERT(frozenstr(q));
1710 		ASSERT(MUTEX_HELD(QLOCK(q)));
1711 	} else
1712 		mutex_enter(QLOCK(q));
1713 
1714 	if ((bp = q->q_first) == 0) {
1715 		q->q_flag |= QWANTR;
1716 	} else {
1717 		if ((q->q_first = bp->b_next) == NULL)
1718 			q->q_last = NULL;
1719 		else
1720 			q->q_first->b_prev = NULL;
1721 
1722 		/* Get message byte count for q_count accounting */
1723 		for (tmp = bp; tmp; tmp = tmp->b_cont) {
1724 			bytecnt += (tmp->b_wptr - tmp->b_rptr);
1725 			mblkcnt++;
1726 		}
1727 
1728 		if (bp->b_band == 0) {
1729 			q->q_count -= bytecnt;
1730 			q->q_mblkcnt -= mblkcnt;
1731 			if ((q->q_count < q->q_hiwat) &&
1732 			    (q->q_mblkcnt < q->q_hiwat)) {
1733 				q->q_flag &= ~QFULL;
1734 			}
1735 		} else {
1736 			int i;
1737 
1738 			ASSERT(bp->b_band <= q->q_nband);
1739 			ASSERT(q->q_bandp != NULL);
1740 			ASSERT(MUTEX_HELD(QLOCK(q)));
1741 			qbp = q->q_bandp;
1742 			i = bp->b_band;
1743 			while (--i > 0)
1744 				qbp = qbp->qb_next;
1745 			if (qbp->qb_first == qbp->qb_last) {
1746 				qbp->qb_first = NULL;
1747 				qbp->qb_last = NULL;
1748 			} else {
1749 				qbp->qb_first = bp->b_next;
1750 			}
1751 			qbp->qb_count -= bytecnt;
1752 			qbp->qb_mblkcnt -= mblkcnt;
1753 			if ((qbp->qb_count < qbp->qb_hiwat) &&
1754 			    (qbp->qb_mblkcnt < qbp->qb_hiwat)) {
1755 				qbp->qb_flag &= ~QB_FULL;
1756 			}
1757 		}
1758 		q->q_flag &= ~QWANTR;
1759 		bp->b_next = NULL;
1760 		bp->b_prev = NULL;
1761 	}
1762 	if (freezer != curthread)
1763 		mutex_exit(QLOCK(q));
1764 
1765 	STR_FTEVENT_MSG(bp, q, FTEV_GETQ, NULL);
1766 
1767 	return (bp);
1768 }
1769 
1770 /*
1771  * Determine if a backenable is needed after removing a message in the
1772  * specified band.
1773  * NOTE: This routine assumes that something like getq_noenab() has been
1774  * already called.
1775  *
1776  * For the read side it is ok to hold sd_lock across calling this (and the
1777  * stream head often does).
1778  * But for the write side strwakeq might be invoked and it acquires sd_lock.
1779  */
1780 void
1781 qbackenable(queue_t *q, uchar_t band)
1782 {
1783 	int backenab = 0;
1784 	qband_t *qbp;
1785 	kthread_id_t freezer;
1786 
1787 	ASSERT(q);
1788 	ASSERT((q->q_flag & QREADR) || MUTEX_NOT_HELD(&STREAM(q)->sd_lock));
1789 
1790 	/*
1791 	 * Quick check without holding the lock.
1792 	 * OK since after getq() has lowered the q_count these flags
1793 	 * would not change unless either the qbackenable() is done by
1794 	 * another thread (which is ok) or the queue has gotten QFULL
1795 	 * in which case another backenable will take place when the queue
1796 	 * drops below q_lowat.
1797 	 */
1798 	if (band == 0 && (q->q_flag & (QWANTW|QWANTWSYNC)) == 0)
1799 		return;
1800 
1801 	/* freezestr should allow its caller to call getq/putq */
1802 	freezer = STREAM(q)->sd_freezer;
1803 	if (freezer == curthread) {
1804 		ASSERT(frozenstr(q));
1805 		ASSERT(MUTEX_HELD(QLOCK(q)));
1806 	} else
1807 		mutex_enter(QLOCK(q));
1808 
1809 	if (band == 0) {
1810 		if (q->q_lowat == 0 || (q->q_count < q->q_lowat &&
1811 		    q->q_mblkcnt < q->q_lowat)) {
1812 			backenab = q->q_flag & (QWANTW|QWANTWSYNC);
1813 		}
1814 	} else {
1815 		int i;
1816 
1817 		ASSERT((unsigned)band <= q->q_nband);
1818 		ASSERT(q->q_bandp != NULL);
1819 
1820 		qbp = q->q_bandp;
1821 		i = band;
1822 		while (--i > 0)
1823 			qbp = qbp->qb_next;
1824 
1825 		if (qbp->qb_lowat == 0 || (qbp->qb_count < qbp->qb_lowat &&
1826 		    qbp->qb_mblkcnt < qbp->qb_lowat)) {
1827 			backenab = qbp->qb_flag & QB_WANTW;
1828 		}
1829 	}
1830 
1831 	if (backenab == 0) {
1832 		if (freezer != curthread)
1833 			mutex_exit(QLOCK(q));
1834 		return;
1835 	}
1836 
1837 	/* Have to drop the lock across strwakeq and backenable */
1838 	if (backenab & QWANTWSYNC)
1839 		q->q_flag &= ~QWANTWSYNC;
1840 	if (backenab & (QWANTW|QB_WANTW)) {
1841 		if (band != 0)
1842 			qbp->qb_flag &= ~QB_WANTW;
1843 		else {
1844 			q->q_flag &= ~QWANTW;
1845 		}
1846 	}
1847 
1848 	if (freezer != curthread)
1849 		mutex_exit(QLOCK(q));
1850 
1851 	if (backenab & QWANTWSYNC)
1852 		strwakeq(q, QWANTWSYNC);
1853 	if (backenab & (QWANTW|QB_WANTW))
1854 		backenable(q, band);
1855 }
1856 
1857 /*
1858  * Remove a message from a queue.  The queue count and other
1859  * flow control parameters are adjusted and the back queue
1860  * enabled if necessary.
1861  *
1862  * rmvq can be called with the stream frozen, but other utility functions
1863  * holding QLOCK, and by streams modules without any locks/frozen.
1864  */
1865 void
1866 rmvq(queue_t *q, mblk_t *mp)
1867 {
1868 	ASSERT(mp != NULL);
1869 
1870 	rmvq_noenab(q, mp);
1871 	if (curthread != STREAM(q)->sd_freezer && MUTEX_HELD(QLOCK(q))) {
1872 		/*
1873 		 * qbackenable can handle a frozen stream but not a "random"
1874 		 * qlock being held. Drop lock across qbackenable.
1875 		 */
1876 		mutex_exit(QLOCK(q));
1877 		qbackenable(q, mp->b_band);
1878 		mutex_enter(QLOCK(q));
1879 	} else {
1880 		qbackenable(q, mp->b_band);
1881 	}
1882 }
1883 
1884 /*
1885  * Like rmvq() but without any backenabling.
1886  * This exists to handle SR_CONSOL_DATA in strrput().
1887  */
1888 void
1889 rmvq_noenab(queue_t *q, mblk_t *mp)
1890 {
1891 	mblk_t *tmp;
1892 	int i;
1893 	qband_t *qbp = NULL;
1894 	kthread_id_t freezer;
1895 	int	bytecnt = 0, mblkcnt = 0;
1896 
1897 	freezer = STREAM(q)->sd_freezer;
1898 	if (freezer == curthread) {
1899 		ASSERT(frozenstr(q));
1900 		ASSERT(MUTEX_HELD(QLOCK(q)));
1901 	} else if (MUTEX_HELD(QLOCK(q))) {
1902 		/* Don't drop lock on exit */
1903 		freezer = curthread;
1904 	} else
1905 		mutex_enter(QLOCK(q));
1906 
1907 	ASSERT(mp->b_band <= q->q_nband);
1908 	if (mp->b_band != 0) {		/* Adjust band pointers */
1909 		ASSERT(q->q_bandp != NULL);
1910 		qbp = q->q_bandp;
1911 		i = mp->b_band;
1912 		while (--i > 0)
1913 			qbp = qbp->qb_next;
1914 		if (mp == qbp->qb_first) {
1915 			if (mp->b_next && mp->b_band == mp->b_next->b_band)
1916 				qbp->qb_first = mp->b_next;
1917 			else
1918 				qbp->qb_first = NULL;
1919 		}
1920 		if (mp == qbp->qb_last) {
1921 			if (mp->b_prev && mp->b_band == mp->b_prev->b_band)
1922 				qbp->qb_last = mp->b_prev;
1923 			else
1924 				qbp->qb_last = NULL;
1925 		}
1926 	}
1927 
1928 	/*
1929 	 * Remove the message from the list.
1930 	 */
1931 	if (mp->b_prev)
1932 		mp->b_prev->b_next = mp->b_next;
1933 	else
1934 		q->q_first = mp->b_next;
1935 	if (mp->b_next)
1936 		mp->b_next->b_prev = mp->b_prev;
1937 	else
1938 		q->q_last = mp->b_prev;
1939 	mp->b_next = NULL;
1940 	mp->b_prev = NULL;
1941 
1942 	/* Get the size of the message for q_count accounting */
1943 	for (tmp = mp; tmp; tmp = tmp->b_cont) {
1944 		bytecnt += (tmp->b_wptr - tmp->b_rptr);
1945 		mblkcnt++;
1946 	}
1947 
1948 	if (mp->b_band == 0) {		/* Perform q_count accounting */
1949 		q->q_count -= bytecnt;
1950 		q->q_mblkcnt -= mblkcnt;
1951 		if ((q->q_count < q->q_hiwat) &&
1952 		    (q->q_mblkcnt < q->q_hiwat)) {
1953 			q->q_flag &= ~QFULL;
1954 		}
1955 	} else {			/* Perform qb_count accounting */
1956 		qbp->qb_count -= bytecnt;
1957 		qbp->qb_mblkcnt -= mblkcnt;
1958 		if ((qbp->qb_count < qbp->qb_hiwat) &&
1959 		    (qbp->qb_mblkcnt < qbp->qb_hiwat)) {
1960 			qbp->qb_flag &= ~QB_FULL;
1961 		}
1962 	}
1963 	if (freezer != curthread)
1964 		mutex_exit(QLOCK(q));
1965 
1966 	STR_FTEVENT_MSG(mp, q, FTEV_RMVQ, NULL);
1967 }
1968 
1969 /*
1970  * Empty a queue.
1971  * If flag is set, remove all messages.  Otherwise, remove
1972  * only non-control messages.  If queue falls below its low
1973  * water mark, and QWANTW is set, enable the nearest upstream
1974  * service procedure.
1975  *
1976  * Historical note: when merging the M_FLUSH code in strrput with this
1977  * code one difference was discovered. flushq did not have a check
1978  * for q_lowat == 0 in the backenabling test.
1979  *
1980  * pcproto_flag specifies whether or not a M_PCPROTO message should be flushed
1981  * if one exists on the queue.
1982  */
1983 void
1984 flushq_common(queue_t *q, int flag, int pcproto_flag)
1985 {
1986 	mblk_t *mp, *nmp;
1987 	qband_t *qbp;
1988 	int backenab = 0;
1989 	unsigned char bpri;
1990 	unsigned char	qbf[NBAND];	/* band flushing backenable flags */
1991 
1992 	if (q->q_first == NULL)
1993 		return;
1994 
1995 	mutex_enter(QLOCK(q));
1996 	mp = q->q_first;
1997 	q->q_first = NULL;
1998 	q->q_last = NULL;
1999 	q->q_count = 0;
2000 	q->q_mblkcnt = 0;
2001 	for (qbp = q->q_bandp; qbp; qbp = qbp->qb_next) {
2002 		qbp->qb_first = NULL;
2003 		qbp->qb_last = NULL;
2004 		qbp->qb_count = 0;
2005 		qbp->qb_mblkcnt = 0;
2006 		qbp->qb_flag &= ~QB_FULL;
2007 	}
2008 	q->q_flag &= ~QFULL;
2009 	mutex_exit(QLOCK(q));
2010 	while (mp) {
2011 		nmp = mp->b_next;
2012 		mp->b_next = mp->b_prev = NULL;
2013 
2014 		STR_FTEVENT_MBLK(mp, q, FTEV_FLUSHQ, NULL);
2015 
2016 		if (pcproto_flag && (mp->b_datap->db_type == M_PCPROTO))
2017 			(void) putq(q, mp);
2018 		else if (flag || datamsg(mp->b_datap->db_type))
2019 			freemsg(mp);
2020 		else
2021 			(void) putq(q, mp);
2022 		mp = nmp;
2023 	}
2024 	bpri = 1;
2025 	mutex_enter(QLOCK(q));
2026 	for (qbp = q->q_bandp; qbp; qbp = qbp->qb_next) {
2027 		if ((qbp->qb_flag & QB_WANTW) &&
2028 		    (((qbp->qb_count < qbp->qb_lowat) &&
2029 		    (qbp->qb_mblkcnt < qbp->qb_lowat)) ||
2030 		    qbp->qb_lowat == 0)) {
2031 			qbp->qb_flag &= ~QB_WANTW;
2032 			backenab = 1;
2033 			qbf[bpri] = 1;
2034 		} else
2035 			qbf[bpri] = 0;
2036 		bpri++;
2037 	}
2038 	ASSERT(bpri == (unsigned char)(q->q_nband + 1));
2039 	if ((q->q_flag & QWANTW) &&
2040 	    (((q->q_count < q->q_lowat) &&
2041 	    (q->q_mblkcnt < q->q_lowat)) || q->q_lowat == 0)) {
2042 		q->q_flag &= ~QWANTW;
2043 		backenab = 1;
2044 		qbf[0] = 1;
2045 	} else
2046 		qbf[0] = 0;
2047 
2048 	/*
2049 	 * If any band can now be written to, and there is a writer
2050 	 * for that band, then backenable the closest service procedure.
2051 	 */
2052 	if (backenab) {
2053 		mutex_exit(QLOCK(q));
2054 		for (bpri = q->q_nband; bpri != 0; bpri--)
2055 			if (qbf[bpri])
2056 				backenable(q, bpri);
2057 		if (qbf[0])
2058 			backenable(q, 0);
2059 	} else
2060 		mutex_exit(QLOCK(q));
2061 }
2062 
2063 /*
2064  * The real flushing takes place in flushq_common. This is done so that
2065  * a flag which specifies whether or not M_PCPROTO messages should be flushed
2066  * or not. Currently the only place that uses this flag is the stream head.
2067  */
2068 void
2069 flushq(queue_t *q, int flag)
2070 {
2071 	flushq_common(q, flag, 0);
2072 }
2073 
2074 /*
2075  * Flush the queue of messages of the given priority band.
2076  * There is some duplication of code between flushq and flushband.
2077  * This is because we want to optimize the code as much as possible.
2078  * The assumption is that there will be more messages in the normal
2079  * (priority 0) band than in any other.
2080  *
2081  * Historical note: when merging the M_FLUSH code in strrput with this
2082  * code one difference was discovered. flushband had an extra check for
2083  * did not have a check for (mp->b_datap->db_type < QPCTL) in the band 0
2084  * case. That check does not match the man page for flushband and was not
2085  * in the strrput flush code hence it was removed.
2086  */
2087 void
2088 flushband(queue_t *q, unsigned char pri, int flag)
2089 {
2090 	mblk_t *mp;
2091 	mblk_t *nmp;
2092 	mblk_t *last;
2093 	qband_t *qbp;
2094 	int band;
2095 
2096 	ASSERT((flag == FLUSHDATA) || (flag == FLUSHALL));
2097 	if (pri > q->q_nband) {
2098 		return;
2099 	}
2100 	mutex_enter(QLOCK(q));
2101 	if (pri == 0) {
2102 		mp = q->q_first;
2103 		q->q_first = NULL;
2104 		q->q_last = NULL;
2105 		q->q_count = 0;
2106 		q->q_mblkcnt = 0;
2107 		for (qbp = q->q_bandp; qbp; qbp = qbp->qb_next) {
2108 			qbp->qb_first = NULL;
2109 			qbp->qb_last = NULL;
2110 			qbp->qb_count = 0;
2111 			qbp->qb_mblkcnt = 0;
2112 			qbp->qb_flag &= ~QB_FULL;
2113 		}
2114 		q->q_flag &= ~QFULL;
2115 		mutex_exit(QLOCK(q));
2116 		while (mp) {
2117 			nmp = mp->b_next;
2118 			mp->b_next = mp->b_prev = NULL;
2119 			if ((mp->b_band == 0) &&
2120 				((flag == FLUSHALL) ||
2121 				datamsg(mp->b_datap->db_type)))
2122 				freemsg(mp);
2123 			else
2124 				(void) putq(q, mp);
2125 			mp = nmp;
2126 		}
2127 		mutex_enter(QLOCK(q));
2128 		if ((q->q_flag & QWANTW) &&
2129 		    (((q->q_count < q->q_lowat) &&
2130 		    (q->q_mblkcnt < q->q_lowat)) || q->q_lowat == 0)) {
2131 			q->q_flag &= ~QWANTW;
2132 			mutex_exit(QLOCK(q));
2133 
2134 			backenable(q, pri);
2135 		} else
2136 			mutex_exit(QLOCK(q));
2137 	} else {	/* pri != 0 */
2138 		boolean_t flushed = B_FALSE;
2139 		band = pri;
2140 
2141 		ASSERT(MUTEX_HELD(QLOCK(q)));
2142 		qbp = q->q_bandp;
2143 		while (--band > 0)
2144 			qbp = qbp->qb_next;
2145 		mp = qbp->qb_first;
2146 		if (mp == NULL) {
2147 			mutex_exit(QLOCK(q));
2148 			return;
2149 		}
2150 		last = qbp->qb_last->b_next;
2151 		/*
2152 		 * rmvq_noenab() and freemsg() are called for each mblk that
2153 		 * meets the criteria.  The loop is executed until the last
2154 		 * mblk has been processed.
2155 		 */
2156 		while (mp != last) {
2157 			ASSERT(mp->b_band == pri);
2158 			nmp = mp->b_next;
2159 			if (flag == FLUSHALL || datamsg(mp->b_datap->db_type)) {
2160 				rmvq_noenab(q, mp);
2161 				freemsg(mp);
2162 				flushed = B_TRUE;
2163 			}
2164 			mp = nmp;
2165 		}
2166 		mutex_exit(QLOCK(q));
2167 
2168 		/*
2169 		 * If any mblk(s) has been freed, we know that qbackenable()
2170 		 * will need to be called.
2171 		 */
2172 		if (flushed)
2173 			qbackenable(q, pri);
2174 	}
2175 }
2176 
2177 /*
2178  * Return 1 if the queue is not full.  If the queue is full, return
2179  * 0 (may not put message) and set QWANTW flag (caller wants to write
2180  * to the queue).
2181  */
2182 int
2183 canput(queue_t *q)
2184 {
2185 	TRACE_1(TR_FAC_STREAMS_FR, TR_CANPUT_IN, "canput:%p", q);
2186 
2187 	/* this is for loopback transports, they should not do a canput */
2188 	ASSERT(STRMATED(q->q_stream) || STREAM(q) == STREAM(q->q_nfsrv));
2189 
2190 	/* Find next forward module that has a service procedure */
2191 	q = q->q_nfsrv;
2192 
2193 	if (!(q->q_flag & QFULL)) {
2194 		TRACE_2(TR_FAC_STREAMS_FR, TR_CANPUT_OUT, "canput:%p %d", q, 1);
2195 		return (1);
2196 	}
2197 	mutex_enter(QLOCK(q));
2198 	if (q->q_flag & QFULL) {
2199 		q->q_flag |= QWANTW;
2200 		mutex_exit(QLOCK(q));
2201 		TRACE_2(TR_FAC_STREAMS_FR, TR_CANPUT_OUT, "canput:%p %d", q, 0);
2202 		return (0);
2203 	}
2204 	mutex_exit(QLOCK(q));
2205 	TRACE_2(TR_FAC_STREAMS_FR, TR_CANPUT_OUT, "canput:%p %d", q, 1);
2206 	return (1);
2207 }
2208 
2209 /*
2210  * This is the new canput for use with priority bands.  Return 1 if the
2211  * band is not full.  If the band is full, return 0 (may not put message)
2212  * and set QWANTW(QB_WANTW) flag for zero(non-zero) band (caller wants to
2213  * write to the queue).
2214  */
2215 int
2216 bcanput(queue_t *q, unsigned char pri)
2217 {
2218 	qband_t *qbp;
2219 
2220 	TRACE_2(TR_FAC_STREAMS_FR, TR_BCANPUT_IN, "bcanput:%p %p", q, pri);
2221 	if (!q)
2222 		return (0);
2223 
2224 	/* Find next forward module that has a service procedure */
2225 	q = q->q_nfsrv;
2226 
2227 	mutex_enter(QLOCK(q));
2228 	if (pri == 0) {
2229 		if (q->q_flag & QFULL) {
2230 			q->q_flag |= QWANTW;
2231 			mutex_exit(QLOCK(q));
2232 			TRACE_3(TR_FAC_STREAMS_FR, TR_BCANPUT_OUT,
2233 				"bcanput:%p %X %d", q, pri, 0);
2234 			return (0);
2235 		}
2236 	} else {	/* pri != 0 */
2237 		if (pri > q->q_nband) {
2238 			/*
2239 			 * No band exists yet, so return success.
2240 			 */
2241 			mutex_exit(QLOCK(q));
2242 			TRACE_3(TR_FAC_STREAMS_FR, TR_BCANPUT_OUT,
2243 				"bcanput:%p %X %d", q, pri, 1);
2244 			return (1);
2245 		}
2246 		qbp = q->q_bandp;
2247 		while (--pri)
2248 			qbp = qbp->qb_next;
2249 		if (qbp->qb_flag & QB_FULL) {
2250 			qbp->qb_flag |= QB_WANTW;
2251 			mutex_exit(QLOCK(q));
2252 			TRACE_3(TR_FAC_STREAMS_FR, TR_BCANPUT_OUT,
2253 				"bcanput:%p %X %d", q, pri, 0);
2254 			return (0);
2255 		}
2256 	}
2257 	mutex_exit(QLOCK(q));
2258 	TRACE_3(TR_FAC_STREAMS_FR, TR_BCANPUT_OUT,
2259 		"bcanput:%p %X %d", q, pri, 1);
2260 	return (1);
2261 }
2262 
2263 /*
2264  * Put a message on a queue.
2265  *
2266  * Messages are enqueued on a priority basis.  The priority classes
2267  * are HIGH PRIORITY (type >= QPCTL), PRIORITY (type < QPCTL && band > 0),
2268  * and B_NORMAL (type < QPCTL && band == 0).
2269  *
2270  * Add appropriate weighted data block sizes to queue count.
2271  * If queue hits high water mark then set QFULL flag.
2272  *
2273  * If QNOENAB is not set (putq is allowed to enable the queue),
2274  * enable the queue only if the message is PRIORITY,
2275  * or the QWANTR flag is set (indicating that the service procedure
2276  * is ready to read the queue.  This implies that a service
2277  * procedure must NEVER put a high priority message back on its own
2278  * queue, as this would result in an infinite loop (!).
2279  */
2280 int
2281 putq(queue_t *q, mblk_t *bp)
2282 {
2283 	mblk_t *tmp;
2284 	qband_t *qbp = NULL;
2285 	int mcls = (int)queclass(bp);
2286 	kthread_id_t freezer;
2287 	int	bytecnt = 0, mblkcnt = 0;
2288 
2289 	freezer = STREAM(q)->sd_freezer;
2290 	if (freezer == curthread) {
2291 		ASSERT(frozenstr(q));
2292 		ASSERT(MUTEX_HELD(QLOCK(q)));
2293 	} else
2294 		mutex_enter(QLOCK(q));
2295 
2296 	/*
2297 	 * Make sanity checks and if qband structure is not yet
2298 	 * allocated, do so.
2299 	 */
2300 	if (mcls == QPCTL) {
2301 		if (bp->b_band != 0)
2302 			bp->b_band = 0;		/* force to be correct */
2303 	} else if (bp->b_band != 0) {
2304 		int i;
2305 		qband_t **qbpp;
2306 
2307 		if (bp->b_band > q->q_nband) {
2308 
2309 			/*
2310 			 * The qband structure for this priority band is
2311 			 * not on the queue yet, so we have to allocate
2312 			 * one on the fly.  It would be wasteful to
2313 			 * associate the qband structures with every
2314 			 * queue when the queues are allocated.  This is
2315 			 * because most queues will only need the normal
2316 			 * band of flow which can be described entirely
2317 			 * by the queue itself.
2318 			 */
2319 			qbpp = &q->q_bandp;
2320 			while (*qbpp)
2321 				qbpp = &(*qbpp)->qb_next;
2322 			while (bp->b_band > q->q_nband) {
2323 				if ((*qbpp = allocband()) == NULL) {
2324 					if (freezer != curthread)
2325 						mutex_exit(QLOCK(q));
2326 					return (0);
2327 				}
2328 				(*qbpp)->qb_hiwat = q->q_hiwat;
2329 				(*qbpp)->qb_lowat = q->q_lowat;
2330 				q->q_nband++;
2331 				qbpp = &(*qbpp)->qb_next;
2332 			}
2333 		}
2334 		ASSERT(MUTEX_HELD(QLOCK(q)));
2335 		qbp = q->q_bandp;
2336 		i = bp->b_band;
2337 		while (--i)
2338 			qbp = qbp->qb_next;
2339 	}
2340 
2341 	/*
2342 	 * If queue is empty, add the message and initialize the pointers.
2343 	 * Otherwise, adjust message pointers and queue pointers based on
2344 	 * the type of the message and where it belongs on the queue.  Some
2345 	 * code is duplicated to minimize the number of conditionals and
2346 	 * hopefully minimize the amount of time this routine takes.
2347 	 */
2348 	if (!q->q_first) {
2349 		bp->b_next = NULL;
2350 		bp->b_prev = NULL;
2351 		q->q_first = bp;
2352 		q->q_last = bp;
2353 		if (qbp) {
2354 			qbp->qb_first = bp;
2355 			qbp->qb_last = bp;
2356 		}
2357 	} else if (!qbp) {	/* bp->b_band == 0 */
2358 
2359 		/*
2360 		 * If queue class of message is less than or equal to
2361 		 * that of the last one on the queue, tack on to the end.
2362 		 */
2363 		tmp = q->q_last;
2364 		if (mcls <= (int)queclass(tmp)) {
2365 			bp->b_next = NULL;
2366 			bp->b_prev = tmp;
2367 			tmp->b_next = bp;
2368 			q->q_last = bp;
2369 		} else {
2370 			tmp = q->q_first;
2371 			while ((int)queclass(tmp) >= mcls)
2372 				tmp = tmp->b_next;
2373 
2374 			/*
2375 			 * Insert bp before tmp.
2376 			 */
2377 			bp->b_next = tmp;
2378 			bp->b_prev = tmp->b_prev;
2379 			if (tmp->b_prev)
2380 				tmp->b_prev->b_next = bp;
2381 			else
2382 				q->q_first = bp;
2383 			tmp->b_prev = bp;
2384 		}
2385 	} else {		/* bp->b_band != 0 */
2386 		if (qbp->qb_first) {
2387 			tmp = qbp->qb_last;
2388 
2389 			/*
2390 			 * Insert bp after the last message in this band.
2391 			 */
2392 			bp->b_next = tmp->b_next;
2393 			if (tmp->b_next)
2394 				tmp->b_next->b_prev = bp;
2395 			else
2396 				q->q_last = bp;
2397 			bp->b_prev = tmp;
2398 			tmp->b_next = bp;
2399 		} else {
2400 			tmp = q->q_last;
2401 			if ((mcls < (int)queclass(tmp)) ||
2402 			    (bp->b_band <= tmp->b_band)) {
2403 
2404 				/*
2405 				 * Tack bp on end of queue.
2406 				 */
2407 				bp->b_next = NULL;
2408 				bp->b_prev = tmp;
2409 				tmp->b_next = bp;
2410 				q->q_last = bp;
2411 			} else {
2412 				tmp = q->q_first;
2413 				while (tmp->b_datap->db_type >= QPCTL)
2414 					tmp = tmp->b_next;
2415 				while (tmp->b_band >= bp->b_band)
2416 					tmp = tmp->b_next;
2417 
2418 				/*
2419 				 * Insert bp before tmp.
2420 				 */
2421 				bp->b_next = tmp;
2422 				bp->b_prev = tmp->b_prev;
2423 				if (tmp->b_prev)
2424 					tmp->b_prev->b_next = bp;
2425 				else
2426 					q->q_first = bp;
2427 				tmp->b_prev = bp;
2428 			}
2429 			qbp->qb_first = bp;
2430 		}
2431 		qbp->qb_last = bp;
2432 	}
2433 
2434 	/* Get message byte count for q_count accounting */
2435 	for (tmp = bp; tmp; tmp = tmp->b_cont) {
2436 		bytecnt += (tmp->b_wptr - tmp->b_rptr);
2437 		mblkcnt++;
2438 	}
2439 	if (qbp) {
2440 		qbp->qb_count += bytecnt;
2441 		qbp->qb_mblkcnt += mblkcnt;
2442 		if ((qbp->qb_count >= qbp->qb_hiwat) ||
2443 		    (qbp->qb_mblkcnt >= qbp->qb_hiwat)) {
2444 			qbp->qb_flag |= QB_FULL;
2445 		}
2446 	} else {
2447 		q->q_count += bytecnt;
2448 		q->q_mblkcnt += mblkcnt;
2449 		if ((q->q_count >= q->q_hiwat) ||
2450 		    (q->q_mblkcnt >= q->q_hiwat)) {
2451 			q->q_flag |= QFULL;
2452 		}
2453 	}
2454 
2455 	STR_FTEVENT_MSG(bp, q, FTEV_PUTQ, NULL);
2456 
2457 	if ((mcls > QNORM) ||
2458 	    (canenable(q) && (q->q_flag & QWANTR || bp->b_band)))
2459 		qenable_locked(q);
2460 	ASSERT(MUTEX_HELD(QLOCK(q)));
2461 	if (freezer != curthread)
2462 		mutex_exit(QLOCK(q));
2463 
2464 	return (1);
2465 }
2466 
2467 /*
2468  * Put stuff back at beginning of Q according to priority order.
2469  * See comment on putq above for details.
2470  */
2471 int
2472 putbq(queue_t *q, mblk_t *bp)
2473 {
2474 	mblk_t *tmp;
2475 	qband_t *qbp = NULL;
2476 	int mcls = (int)queclass(bp);
2477 	kthread_id_t freezer;
2478 	int	bytecnt = 0, mblkcnt = 0;
2479 
2480 	ASSERT(q && bp);
2481 	ASSERT(bp->b_next == NULL);
2482 	freezer = STREAM(q)->sd_freezer;
2483 	if (freezer == curthread) {
2484 		ASSERT(frozenstr(q));
2485 		ASSERT(MUTEX_HELD(QLOCK(q)));
2486 	} else
2487 		mutex_enter(QLOCK(q));
2488 
2489 	/*
2490 	 * Make sanity checks and if qband structure is not yet
2491 	 * allocated, do so.
2492 	 */
2493 	if (mcls == QPCTL) {
2494 		if (bp->b_band != 0)
2495 			bp->b_band = 0;		/* force to be correct */
2496 	} else if (bp->b_band != 0) {
2497 		int i;
2498 		qband_t **qbpp;
2499 
2500 		if (bp->b_band > q->q_nband) {
2501 			qbpp = &q->q_bandp;
2502 			while (*qbpp)
2503 				qbpp = &(*qbpp)->qb_next;
2504 			while (bp->b_band > q->q_nband) {
2505 				if ((*qbpp = allocband()) == NULL) {
2506 					if (freezer != curthread)
2507 						mutex_exit(QLOCK(q));
2508 					return (0);
2509 				}
2510 				(*qbpp)->qb_hiwat = q->q_hiwat;
2511 				(*qbpp)->qb_lowat = q->q_lowat;
2512 				q->q_nband++;
2513 				qbpp = &(*qbpp)->qb_next;
2514 			}
2515 		}
2516 		qbp = q->q_bandp;
2517 		i = bp->b_band;
2518 		while (--i)
2519 			qbp = qbp->qb_next;
2520 	}
2521 
2522 	/*
2523 	 * If queue is empty or if message is high priority,
2524 	 * place on the front of the queue.
2525 	 */
2526 	tmp = q->q_first;
2527 	if ((!tmp) || (mcls == QPCTL)) {
2528 		bp->b_next = tmp;
2529 		if (tmp)
2530 			tmp->b_prev = bp;
2531 		else
2532 			q->q_last = bp;
2533 		q->q_first = bp;
2534 		bp->b_prev = NULL;
2535 		if (qbp) {
2536 			qbp->qb_first = bp;
2537 			qbp->qb_last = bp;
2538 		}
2539 	} else if (qbp) {	/* bp->b_band != 0 */
2540 		tmp = qbp->qb_first;
2541 		if (tmp) {
2542 
2543 			/*
2544 			 * Insert bp before the first message in this band.
2545 			 */
2546 			bp->b_next = tmp;
2547 			bp->b_prev = tmp->b_prev;
2548 			if (tmp->b_prev)
2549 				tmp->b_prev->b_next = bp;
2550 			else
2551 				q->q_first = bp;
2552 			tmp->b_prev = bp;
2553 		} else {
2554 			tmp = q->q_last;
2555 			if ((mcls < (int)queclass(tmp)) ||
2556 			    (bp->b_band < tmp->b_band)) {
2557 
2558 				/*
2559 				 * Tack bp on end of queue.
2560 				 */
2561 				bp->b_next = NULL;
2562 				bp->b_prev = tmp;
2563 				tmp->b_next = bp;
2564 				q->q_last = bp;
2565 			} else {
2566 				tmp = q->q_first;
2567 				while (tmp->b_datap->db_type >= QPCTL)
2568 					tmp = tmp->b_next;
2569 				while (tmp->b_band > bp->b_band)
2570 					tmp = tmp->b_next;
2571 
2572 				/*
2573 				 * Insert bp before tmp.
2574 				 */
2575 				bp->b_next = tmp;
2576 				bp->b_prev = tmp->b_prev;
2577 				if (tmp->b_prev)
2578 					tmp->b_prev->b_next = bp;
2579 				else
2580 					q->q_first = bp;
2581 				tmp->b_prev = bp;
2582 			}
2583 			qbp->qb_last = bp;
2584 		}
2585 		qbp->qb_first = bp;
2586 	} else {		/* bp->b_band == 0 && !QPCTL */
2587 
2588 		/*
2589 		 * If the queue class or band is less than that of the last
2590 		 * message on the queue, tack bp on the end of the queue.
2591 		 */
2592 		tmp = q->q_last;
2593 		if ((mcls < (int)queclass(tmp)) || (bp->b_band < tmp->b_band)) {
2594 			bp->b_next = NULL;
2595 			bp->b_prev = tmp;
2596 			tmp->b_next = bp;
2597 			q->q_last = bp;
2598 		} else {
2599 			tmp = q->q_first;
2600 			while (tmp->b_datap->db_type >= QPCTL)
2601 				tmp = tmp->b_next;
2602 			while (tmp->b_band > bp->b_band)
2603 				tmp = tmp->b_next;
2604 
2605 			/*
2606 			 * Insert bp before tmp.
2607 			 */
2608 			bp->b_next = tmp;
2609 			bp->b_prev = tmp->b_prev;
2610 			if (tmp->b_prev)
2611 				tmp->b_prev->b_next = bp;
2612 			else
2613 				q->q_first = bp;
2614 			tmp->b_prev = bp;
2615 		}
2616 	}
2617 
2618 	/* Get message byte count for q_count accounting */
2619 	for (tmp = bp; tmp; tmp = tmp->b_cont) {
2620 		bytecnt += (tmp->b_wptr - tmp->b_rptr);
2621 		mblkcnt++;
2622 	}
2623 	if (qbp) {
2624 		qbp->qb_count += bytecnt;
2625 		qbp->qb_mblkcnt += mblkcnt;
2626 		if ((qbp->qb_count >= qbp->qb_hiwat) ||
2627 		    (qbp->qb_mblkcnt >= qbp->qb_hiwat)) {
2628 			qbp->qb_flag |= QB_FULL;
2629 		}
2630 	} else {
2631 		q->q_count += bytecnt;
2632 		q->q_mblkcnt += mblkcnt;
2633 		if ((q->q_count >= q->q_hiwat) ||
2634 		    (q->q_mblkcnt >= q->q_hiwat)) {
2635 			q->q_flag |= QFULL;
2636 		}
2637 	}
2638 
2639 	STR_FTEVENT_MSG(bp, q, FTEV_PUTBQ, NULL);
2640 
2641 	if ((mcls > QNORM) || (canenable(q) && (q->q_flag & QWANTR)))
2642 		qenable_locked(q);
2643 	ASSERT(MUTEX_HELD(QLOCK(q)));
2644 	if (freezer != curthread)
2645 		mutex_exit(QLOCK(q));
2646 
2647 	return (1);
2648 }
2649 
2650 /*
2651  * Insert a message before an existing message on the queue.  If the
2652  * existing message is NULL, the new messages is placed on the end of
2653  * the queue.  The queue class of the new message is ignored.  However,
2654  * the priority band of the new message must adhere to the following
2655  * ordering:
2656  *
2657  *	emp->b_prev->b_band >= mp->b_band >= emp->b_band.
2658  *
2659  * All flow control parameters are updated.
2660  *
2661  * insq can be called with the stream frozen, but other utility functions
2662  * holding QLOCK, and by streams modules without any locks/frozen.
2663  */
2664 int
2665 insq(queue_t *q, mblk_t *emp, mblk_t *mp)
2666 {
2667 	mblk_t *tmp;
2668 	qband_t *qbp = NULL;
2669 	int mcls = (int)queclass(mp);
2670 	kthread_id_t freezer;
2671 	int	bytecnt = 0, mblkcnt = 0;
2672 
2673 	freezer = STREAM(q)->sd_freezer;
2674 	if (freezer == curthread) {
2675 		ASSERT(frozenstr(q));
2676 		ASSERT(MUTEX_HELD(QLOCK(q)));
2677 	} else if (MUTEX_HELD(QLOCK(q))) {
2678 		/* Don't drop lock on exit */
2679 		freezer = curthread;
2680 	} else
2681 		mutex_enter(QLOCK(q));
2682 
2683 	if (mcls == QPCTL) {
2684 		if (mp->b_band != 0)
2685 			mp->b_band = 0;		/* force to be correct */
2686 		if (emp && emp->b_prev &&
2687 		    (emp->b_prev->b_datap->db_type < QPCTL))
2688 			goto badord;
2689 	}
2690 	if (emp) {
2691 		if (((mcls == QNORM) && (mp->b_band < emp->b_band)) ||
2692 		    (emp->b_prev && (emp->b_prev->b_datap->db_type < QPCTL) &&
2693 		    (emp->b_prev->b_band < mp->b_band))) {
2694 			goto badord;
2695 		}
2696 	} else {
2697 		tmp = q->q_last;
2698 		if (tmp && (mcls == QNORM) && (mp->b_band > tmp->b_band)) {
2699 badord:
2700 			cmn_err(CE_WARN,
2701 			    "insq: attempt to insert message out of order "
2702 			    "on q %p", (void *)q);
2703 			if (freezer != curthread)
2704 				mutex_exit(QLOCK(q));
2705 			return (0);
2706 		}
2707 	}
2708 
2709 	if (mp->b_band != 0) {
2710 		int i;
2711 		qband_t **qbpp;
2712 
2713 		if (mp->b_band > q->q_nband) {
2714 			qbpp = &q->q_bandp;
2715 			while (*qbpp)
2716 				qbpp = &(*qbpp)->qb_next;
2717 			while (mp->b_band > q->q_nband) {
2718 				if ((*qbpp = allocband()) == NULL) {
2719 					if (freezer != curthread)
2720 						mutex_exit(QLOCK(q));
2721 					return (0);
2722 				}
2723 				(*qbpp)->qb_hiwat = q->q_hiwat;
2724 				(*qbpp)->qb_lowat = q->q_lowat;
2725 				q->q_nband++;
2726 				qbpp = &(*qbpp)->qb_next;
2727 			}
2728 		}
2729 		qbp = q->q_bandp;
2730 		i = mp->b_band;
2731 		while (--i)
2732 			qbp = qbp->qb_next;
2733 	}
2734 
2735 	if ((mp->b_next = emp) != NULL) {
2736 		if ((mp->b_prev = emp->b_prev) != NULL)
2737 			emp->b_prev->b_next = mp;
2738 		else
2739 			q->q_first = mp;
2740 		emp->b_prev = mp;
2741 	} else {
2742 		if ((mp->b_prev = q->q_last) != NULL)
2743 			q->q_last->b_next = mp;
2744 		else
2745 			q->q_first = mp;
2746 		q->q_last = mp;
2747 	}
2748 
2749 	/* Get mblk and byte count for q_count accounting */
2750 	for (tmp = mp; tmp; tmp = tmp->b_cont) {
2751 		bytecnt += (tmp->b_wptr - tmp->b_rptr);
2752 		mblkcnt++;
2753 	}
2754 
2755 	if (qbp) {	/* adjust qband pointers and count */
2756 		if (!qbp->qb_first) {
2757 			qbp->qb_first = mp;
2758 			qbp->qb_last = mp;
2759 		} else {
2760 			if (mp->b_prev == NULL || (mp->b_prev != NULL &&
2761 			    (mp->b_prev->b_band != mp->b_band)))
2762 				qbp->qb_first = mp;
2763 			else if (mp->b_next == NULL || (mp->b_next != NULL &&
2764 			    (mp->b_next->b_band != mp->b_band)))
2765 				qbp->qb_last = mp;
2766 		}
2767 		qbp->qb_count += bytecnt;
2768 		qbp->qb_mblkcnt += mblkcnt;
2769 		if ((qbp->qb_count >= qbp->qb_hiwat) ||
2770 		    (qbp->qb_mblkcnt >= qbp->qb_hiwat)) {
2771 			qbp->qb_flag |= QB_FULL;
2772 		}
2773 	} else {
2774 		q->q_count += bytecnt;
2775 		q->q_mblkcnt += mblkcnt;
2776 		if ((q->q_count >= q->q_hiwat) ||
2777 		    (q->q_mblkcnt >= q->q_hiwat)) {
2778 			q->q_flag |= QFULL;
2779 		}
2780 	}
2781 
2782 	STR_FTEVENT_MSG(mp, q, FTEV_INSQ, NULL);
2783 
2784 	if (canenable(q) && (q->q_flag & QWANTR))
2785 		qenable_locked(q);
2786 
2787 	ASSERT(MUTEX_HELD(QLOCK(q)));
2788 	if (freezer != curthread)
2789 		mutex_exit(QLOCK(q));
2790 
2791 	return (1);
2792 }
2793 
2794 /*
2795  * Create and put a control message on queue.
2796  */
2797 int
2798 putctl(queue_t *q, int type)
2799 {
2800 	mblk_t *bp;
2801 
2802 	if ((datamsg(type) && (type != M_DELAY)) ||
2803 	    (bp = allocb_tryhard(0)) == NULL)
2804 		return (0);
2805 	bp->b_datap->db_type = (unsigned char) type;
2806 
2807 	put(q, bp);
2808 
2809 	return (1);
2810 }
2811 
2812 /*
2813  * Control message with a single-byte parameter
2814  */
2815 int
2816 putctl1(queue_t *q, int type, int param)
2817 {
2818 	mblk_t *bp;
2819 
2820 	if ((datamsg(type) && (type != M_DELAY)) ||
2821 	    (bp = allocb_tryhard(1)) == NULL)
2822 		return (0);
2823 	bp->b_datap->db_type = (unsigned char)type;
2824 	*bp->b_wptr++ = (unsigned char)param;
2825 
2826 	put(q, bp);
2827 
2828 	return (1);
2829 }
2830 
2831 int
2832 putnextctl1(queue_t *q, int type, int param)
2833 {
2834 	mblk_t *bp;
2835 
2836 	if ((datamsg(type) && (type != M_DELAY)) ||
2837 		((bp = allocb_tryhard(1)) == NULL))
2838 		return (0);
2839 
2840 	bp->b_datap->db_type = (unsigned char)type;
2841 	*bp->b_wptr++ = (unsigned char)param;
2842 
2843 	putnext(q, bp);
2844 
2845 	return (1);
2846 }
2847 
2848 int
2849 putnextctl(queue_t *q, int type)
2850 {
2851 	mblk_t *bp;
2852 
2853 	if ((datamsg(type) && (type != M_DELAY)) ||
2854 		((bp = allocb_tryhard(0)) == NULL))
2855 		return (0);
2856 	bp->b_datap->db_type = (unsigned char)type;
2857 
2858 	putnext(q, bp);
2859 
2860 	return (1);
2861 }
2862 
2863 /*
2864  * Return the queue upstream from this one
2865  */
2866 queue_t *
2867 backq(queue_t *q)
2868 {
2869 	q = _OTHERQ(q);
2870 	if (q->q_next) {
2871 		q = q->q_next;
2872 		return (_OTHERQ(q));
2873 	}
2874 	return (NULL);
2875 }
2876 
2877 /*
2878  * Send a block back up the queue in reverse from this
2879  * one (e.g. to respond to ioctls)
2880  */
2881 void
2882 qreply(queue_t *q, mblk_t *bp)
2883 {
2884 	ASSERT(q && bp);
2885 
2886 	putnext(_OTHERQ(q), bp);
2887 }
2888 
2889 /*
2890  * Streams Queue Scheduling
2891  *
2892  * Queues are enabled through qenable() when they have messages to
2893  * process.  They are serviced by queuerun(), which runs each enabled
2894  * queue's service procedure.  The call to queuerun() is processor
2895  * dependent - the general principle is that it be run whenever a queue
2896  * is enabled but before returning to user level.  For system calls,
2897  * the function runqueues() is called if their action causes a queue
2898  * to be enabled.  For device interrupts, queuerun() should be
2899  * called before returning from the last level of interrupt.  Beyond
2900  * this, no timing assumptions should be made about queue scheduling.
2901  */
2902 
2903 /*
2904  * Enable a queue: put it on list of those whose service procedures are
2905  * ready to run and set up the scheduling mechanism.
2906  * The broadcast is done outside the mutex -> to avoid the woken thread
2907  * from contending with the mutex. This is OK 'cos the queue has been
2908  * enqueued on the runlist and flagged safely at this point.
2909  */
2910 void
2911 qenable(queue_t *q)
2912 {
2913 	mutex_enter(QLOCK(q));
2914 	qenable_locked(q);
2915 	mutex_exit(QLOCK(q));
2916 }
2917 /*
2918  * Return number of messages on queue
2919  */
2920 int
2921 qsize(queue_t *qp)
2922 {
2923 	int count = 0;
2924 	mblk_t *mp;
2925 
2926 	mutex_enter(QLOCK(qp));
2927 	for (mp = qp->q_first; mp; mp = mp->b_next)
2928 		count++;
2929 	mutex_exit(QLOCK(qp));
2930 	return (count);
2931 }
2932 
2933 /*
2934  * noenable - set queue so that putq() will not enable it.
2935  * enableok - set queue so that putq() can enable it.
2936  */
2937 void
2938 noenable(queue_t *q)
2939 {
2940 	mutex_enter(QLOCK(q));
2941 	q->q_flag |= QNOENB;
2942 	mutex_exit(QLOCK(q));
2943 }
2944 
2945 void
2946 enableok(queue_t *q)
2947 {
2948 	mutex_enter(QLOCK(q));
2949 	q->q_flag &= ~QNOENB;
2950 	mutex_exit(QLOCK(q));
2951 }
2952 
2953 /*
2954  * Set queue fields.
2955  */
2956 int
2957 strqset(queue_t *q, qfields_t what, unsigned char pri, intptr_t val)
2958 {
2959 	qband_t *qbp = NULL;
2960 	queue_t	*wrq;
2961 	int error = 0;
2962 	kthread_id_t freezer;
2963 
2964 	freezer = STREAM(q)->sd_freezer;
2965 	if (freezer == curthread) {
2966 		ASSERT(frozenstr(q));
2967 		ASSERT(MUTEX_HELD(QLOCK(q)));
2968 	} else
2969 		mutex_enter(QLOCK(q));
2970 
2971 	if (what >= QBAD) {
2972 		error = EINVAL;
2973 		goto done;
2974 	}
2975 	if (pri != 0) {
2976 		int i;
2977 		qband_t **qbpp;
2978 
2979 		if (pri > q->q_nband) {
2980 			qbpp = &q->q_bandp;
2981 			while (*qbpp)
2982 				qbpp = &(*qbpp)->qb_next;
2983 			while (pri > q->q_nband) {
2984 				if ((*qbpp = allocband()) == NULL) {
2985 					error = EAGAIN;
2986 					goto done;
2987 				}
2988 				(*qbpp)->qb_hiwat = q->q_hiwat;
2989 				(*qbpp)->qb_lowat = q->q_lowat;
2990 				q->q_nband++;
2991 				qbpp = &(*qbpp)->qb_next;
2992 			}
2993 		}
2994 		qbp = q->q_bandp;
2995 		i = pri;
2996 		while (--i)
2997 			qbp = qbp->qb_next;
2998 	}
2999 	switch (what) {
3000 
3001 	case QHIWAT:
3002 		if (qbp)
3003 			qbp->qb_hiwat = (size_t)val;
3004 		else
3005 			q->q_hiwat = (size_t)val;
3006 		break;
3007 
3008 	case QLOWAT:
3009 		if (qbp)
3010 			qbp->qb_lowat = (size_t)val;
3011 		else
3012 			q->q_lowat = (size_t)val;
3013 		break;
3014 
3015 	case QMAXPSZ:
3016 		if (qbp)
3017 			error = EINVAL;
3018 		else
3019 			q->q_maxpsz = (ssize_t)val;
3020 
3021 		/*
3022 		 * Performance concern, strwrite looks at the module below
3023 		 * the stream head for the maxpsz each time it does a write
3024 		 * we now cache it at the stream head.  Check to see if this
3025 		 * queue is sitting directly below the stream head.
3026 		 */
3027 		wrq = STREAM(q)->sd_wrq;
3028 		if (q != wrq->q_next)
3029 			break;
3030 
3031 		/*
3032 		 * If the stream is not frozen drop the current QLOCK and
3033 		 * acquire the sd_wrq QLOCK which protects sd_qn_*
3034 		 */
3035 		if (freezer != curthread) {
3036 			mutex_exit(QLOCK(q));
3037 			mutex_enter(QLOCK(wrq));
3038 		}
3039 		ASSERT(MUTEX_HELD(QLOCK(wrq)));
3040 
3041 		if (strmsgsz != 0) {
3042 			if (val == INFPSZ)
3043 				val = strmsgsz;
3044 			else  {
3045 				if (STREAM(q)->sd_vnode->v_type == VFIFO)
3046 					val = MIN(PIPE_BUF, val);
3047 				else
3048 					val = MIN(strmsgsz, val);
3049 			}
3050 		}
3051 		STREAM(q)->sd_qn_maxpsz = val;
3052 		if (freezer != curthread) {
3053 			mutex_exit(QLOCK(wrq));
3054 			mutex_enter(QLOCK(q));
3055 		}
3056 		break;
3057 
3058 	case QMINPSZ:
3059 		if (qbp)
3060 			error = EINVAL;
3061 		else
3062 			q->q_minpsz = (ssize_t)val;
3063 
3064 		/*
3065 		 * Performance concern, strwrite looks at the module below
3066 		 * the stream head for the maxpsz each time it does a write
3067 		 * we now cache it at the stream head.  Check to see if this
3068 		 * queue is sitting directly below the stream head.
3069 		 */
3070 		wrq = STREAM(q)->sd_wrq;
3071 		if (q != wrq->q_next)
3072 			break;
3073 
3074 		/*
3075 		 * If the stream is not frozen drop the current QLOCK and
3076 		 * acquire the sd_wrq QLOCK which protects sd_qn_*
3077 		 */
3078 		if (freezer != curthread) {
3079 			mutex_exit(QLOCK(q));
3080 			mutex_enter(QLOCK(wrq));
3081 		}
3082 		STREAM(q)->sd_qn_minpsz = (ssize_t)val;
3083 
3084 		if (freezer != curthread) {
3085 			mutex_exit(QLOCK(wrq));
3086 			mutex_enter(QLOCK(q));
3087 		}
3088 		break;
3089 
3090 	case QSTRUIOT:
3091 		if (qbp)
3092 			error = EINVAL;
3093 		else
3094 			q->q_struiot = (ushort_t)val;
3095 		break;
3096 
3097 	case QCOUNT:
3098 	case QFIRST:
3099 	case QLAST:
3100 	case QFLAG:
3101 		error = EPERM;
3102 		break;
3103 
3104 	default:
3105 		error = EINVAL;
3106 		break;
3107 	}
3108 done:
3109 	if (freezer != curthread)
3110 		mutex_exit(QLOCK(q));
3111 	return (error);
3112 }
3113 
3114 /*
3115  * Get queue fields.
3116  */
3117 int
3118 strqget(queue_t *q, qfields_t what, unsigned char pri, void *valp)
3119 {
3120 	qband_t 	*qbp = NULL;
3121 	int 		error = 0;
3122 	kthread_id_t 	freezer;
3123 
3124 	freezer = STREAM(q)->sd_freezer;
3125 	if (freezer == curthread) {
3126 		ASSERT(frozenstr(q));
3127 		ASSERT(MUTEX_HELD(QLOCK(q)));
3128 	} else
3129 		mutex_enter(QLOCK(q));
3130 	if (what >= QBAD) {
3131 		error = EINVAL;
3132 		goto done;
3133 	}
3134 	if (pri != 0) {
3135 		int i;
3136 		qband_t **qbpp;
3137 
3138 		if (pri > q->q_nband) {
3139 			qbpp = &q->q_bandp;
3140 			while (*qbpp)
3141 				qbpp = &(*qbpp)->qb_next;
3142 			while (pri > q->q_nband) {
3143 				if ((*qbpp = allocband()) == NULL) {
3144 					error = EAGAIN;
3145 					goto done;
3146 				}
3147 				(*qbpp)->qb_hiwat = q->q_hiwat;
3148 				(*qbpp)->qb_lowat = q->q_lowat;
3149 				q->q_nband++;
3150 				qbpp = &(*qbpp)->qb_next;
3151 			}
3152 		}
3153 		qbp = q->q_bandp;
3154 		i = pri;
3155 		while (--i)
3156 			qbp = qbp->qb_next;
3157 	}
3158 	switch (what) {
3159 	case QHIWAT:
3160 		if (qbp)
3161 			*(size_t *)valp = qbp->qb_hiwat;
3162 		else
3163 			*(size_t *)valp = q->q_hiwat;
3164 		break;
3165 
3166 	case QLOWAT:
3167 		if (qbp)
3168 			*(size_t *)valp = qbp->qb_lowat;
3169 		else
3170 			*(size_t *)valp = q->q_lowat;
3171 		break;
3172 
3173 	case QMAXPSZ:
3174 		if (qbp)
3175 			error = EINVAL;
3176 		else
3177 			*(ssize_t *)valp = q->q_maxpsz;
3178 		break;
3179 
3180 	case QMINPSZ:
3181 		if (qbp)
3182 			error = EINVAL;
3183 		else
3184 			*(ssize_t *)valp = q->q_minpsz;
3185 		break;
3186 
3187 	case QCOUNT:
3188 		if (qbp)
3189 			*(size_t *)valp = qbp->qb_count;
3190 		else
3191 			*(size_t *)valp = q->q_count;
3192 		break;
3193 
3194 	case QFIRST:
3195 		if (qbp)
3196 			*(mblk_t **)valp = qbp->qb_first;
3197 		else
3198 			*(mblk_t **)valp = q->q_first;
3199 		break;
3200 
3201 	case QLAST:
3202 		if (qbp)
3203 			*(mblk_t **)valp = qbp->qb_last;
3204 		else
3205 			*(mblk_t **)valp = q->q_last;
3206 		break;
3207 
3208 	case QFLAG:
3209 		if (qbp)
3210 			*(uint_t *)valp = qbp->qb_flag;
3211 		else
3212 			*(uint_t *)valp = q->q_flag;
3213 		break;
3214 
3215 	case QSTRUIOT:
3216 		if (qbp)
3217 			error = EINVAL;
3218 		else
3219 			*(short *)valp = q->q_struiot;
3220 		break;
3221 
3222 	default:
3223 		error = EINVAL;
3224 		break;
3225 	}
3226 done:
3227 	if (freezer != curthread)
3228 		mutex_exit(QLOCK(q));
3229 	return (error);
3230 }
3231 
3232 /*
3233  * Function awakes all in cvwait/sigwait/pollwait, on one of:
3234  *	QWANTWSYNC or QWANTR or QWANTW,
3235  *
3236  * Note: for QWANTWSYNC/QWANTW and QWANTR, if no WSLEEPer or RSLEEPer then a
3237  *	 deferred wakeup will be done. Also if strpoll() in progress then a
3238  *	 deferred pollwakeup will be done.
3239  */
3240 void
3241 strwakeq(queue_t *q, int flag)
3242 {
3243 	stdata_t 	*stp = STREAM(q);
3244 	pollhead_t 	*pl;
3245 
3246 	mutex_enter(&stp->sd_lock);
3247 	pl = &stp->sd_pollist;
3248 	if (flag & QWANTWSYNC) {
3249 		ASSERT(!(q->q_flag & QREADR));
3250 		if (stp->sd_flag & WSLEEP) {
3251 			stp->sd_flag &= ~WSLEEP;
3252 			cv_broadcast(&stp->sd_wrq->q_wait);
3253 		} else {
3254 			stp->sd_wakeq |= WSLEEP;
3255 		}
3256 
3257 		mutex_exit(&stp->sd_lock);
3258 		pollwakeup(pl, POLLWRNORM);
3259 		mutex_enter(&stp->sd_lock);
3260 
3261 		if (stp->sd_sigflags & S_WRNORM)
3262 			strsendsig(stp->sd_siglist, S_WRNORM, 0, 0);
3263 	} else if (flag & QWANTR) {
3264 		if (stp->sd_flag & RSLEEP) {
3265 			stp->sd_flag &= ~RSLEEP;
3266 			cv_broadcast(&_RD(stp->sd_wrq)->q_wait);
3267 		} else {
3268 			stp->sd_wakeq |= RSLEEP;
3269 		}
3270 
3271 		mutex_exit(&stp->sd_lock);
3272 		pollwakeup(pl, POLLIN | POLLRDNORM);
3273 		mutex_enter(&stp->sd_lock);
3274 
3275 		{
3276 			int events = stp->sd_sigflags & (S_INPUT | S_RDNORM);
3277 
3278 			if (events)
3279 				strsendsig(stp->sd_siglist, events, 0, 0);
3280 		}
3281 	} else {
3282 		if (stp->sd_flag & WSLEEP) {
3283 			stp->sd_flag &= ~WSLEEP;
3284 			cv_broadcast(&stp->sd_wrq->q_wait);
3285 		}
3286 
3287 		mutex_exit(&stp->sd_lock);
3288 		pollwakeup(pl, POLLWRNORM);
3289 		mutex_enter(&stp->sd_lock);
3290 
3291 		if (stp->sd_sigflags & S_WRNORM)
3292 			strsendsig(stp->sd_siglist, S_WRNORM, 0, 0);
3293 	}
3294 	mutex_exit(&stp->sd_lock);
3295 }
3296 
3297 int
3298 struioget(queue_t *q, mblk_t *mp, struiod_t *dp, int noblock)
3299 {
3300 	stdata_t *stp = STREAM(q);
3301 	int typ  = STRUIOT_STANDARD;
3302 	uio_t	 *uiop = &dp->d_uio;
3303 	dblk_t	 *dbp;
3304 	ssize_t	 uiocnt;
3305 	ssize_t	 cnt;
3306 	unsigned char *ptr;
3307 	ssize_t	 resid;
3308 	int	 error = 0;
3309 	on_trap_data_t otd;
3310 	queue_t	*stwrq;
3311 
3312 	/*
3313 	 * Plumbing may change while taking the type so store the
3314 	 * queue in a temporary variable. It doesn't matter even
3315 	 * if the we take the type from the previous plumbing,
3316 	 * that's because if the plumbing has changed when we were
3317 	 * holding the queue in a temporary variable, we can continue
3318 	 * processing the message the way it would have been processed
3319 	 * in the old plumbing, without any side effects but a bit
3320 	 * extra processing for partial ip header checksum.
3321 	 *
3322 	 * This has been done to avoid holding the sd_lock which is
3323 	 * very hot.
3324 	 */
3325 
3326 	stwrq = stp->sd_struiowrq;
3327 	if (stwrq)
3328 		typ = stwrq->q_struiot;
3329 
3330 	for (; (resid = uiop->uio_resid) > 0 && mp; mp = mp->b_cont) {
3331 		dbp = mp->b_datap;
3332 		ptr = (uchar_t *)(mp->b_rptr + dbp->db_cksumstuff);
3333 		uiocnt = dbp->db_cksumend - dbp->db_cksumstuff;
3334 		cnt = MIN(uiocnt, uiop->uio_resid);
3335 		if (!(dbp->db_struioflag & STRUIO_SPEC) ||
3336 		    (dbp->db_struioflag & STRUIO_DONE) || cnt == 0) {
3337 			/*
3338 			 * Either this mblk has already been processed
3339 			 * or there is no more room in this mblk (?).
3340 			 */
3341 			continue;
3342 		}
3343 		switch (typ) {
3344 		case STRUIOT_STANDARD:
3345 			if (noblock) {
3346 				if (on_trap(&otd, OT_DATA_ACCESS)) {
3347 					no_trap();
3348 					error = EWOULDBLOCK;
3349 					goto out;
3350 				}
3351 			}
3352 			if (error = uiomove(ptr, cnt, UIO_WRITE, uiop)) {
3353 				if (noblock)
3354 					no_trap();
3355 				goto out;
3356 			}
3357 			if (noblock)
3358 				no_trap();
3359 			break;
3360 
3361 		default:
3362 			error = EIO;
3363 			goto out;
3364 		}
3365 		dbp->db_struioflag |= STRUIO_DONE;
3366 		dbp->db_cksumstuff += cnt;
3367 	}
3368 out:
3369 	if (error == EWOULDBLOCK && (resid -= uiop->uio_resid) > 0) {
3370 		/*
3371 		 * A fault has occured and some bytes were moved to the
3372 		 * current mblk, the uio_t has already been updated by
3373 		 * the appropriate uio routine, so also update the mblk
3374 		 * to reflect this in case this same mblk chain is used
3375 		 * again (after the fault has been handled).
3376 		 */
3377 		uiocnt = dbp->db_cksumend - dbp->db_cksumstuff;
3378 		if (uiocnt >= resid)
3379 			dbp->db_cksumstuff += resid;
3380 	}
3381 	return (error);
3382 }
3383 
3384 /*
3385  * Try to enter queue synchronously. Any attempt to enter a closing queue will
3386  * fails. The qp->q_rwcnt keeps track of the number of successful entries so
3387  * that removeq() will not try to close the queue while a thread is inside the
3388  * queue.
3389  */
3390 static boolean_t
3391 rwnext_enter(queue_t *qp)
3392 {
3393 	mutex_enter(QLOCK(qp));
3394 	if (qp->q_flag & QWCLOSE) {
3395 		mutex_exit(QLOCK(qp));
3396 		return (B_FALSE);
3397 	}
3398 	qp->q_rwcnt++;
3399 	ASSERT(qp->q_rwcnt != 0);
3400 	mutex_exit(QLOCK(qp));
3401 	return (B_TRUE);
3402 }
3403 
3404 /*
3405  * Decrease the count of threads running in sync stream queue and wake up any
3406  * threads blocked in removeq().
3407  */
3408 static void
3409 rwnext_exit(queue_t *qp)
3410 {
3411 	mutex_enter(QLOCK(qp));
3412 	qp->q_rwcnt--;
3413 	if (qp->q_flag & QWANTRMQSYNC) {
3414 		qp->q_flag &= ~QWANTRMQSYNC;
3415 		cv_broadcast(&qp->q_wait);
3416 	}
3417 	mutex_exit(QLOCK(qp));
3418 }
3419 
3420 /*
3421  * The purpose of rwnext() is to call the rw procedure of the next
3422  * (downstream) modules queue.
3423  *
3424  * treated as put entrypoint for perimeter syncronization.
3425  *
3426  * There's no need to grab sq_putlocks here (which only exist for CIPUT
3427  * sync queues). If it is CIPUT sync queue sq_count is incremented and it does
3428  * not matter if any regular put entrypoints have been already entered. We
3429  * can't increment one of the sq_putcounts (instead of sq_count) because
3430  * qwait_rw won't know which counter to decrement.
3431  *
3432  * It would be reasonable to add the lockless FASTPUT logic.
3433  */
3434 int
3435 rwnext(queue_t *qp, struiod_t *dp)
3436 {
3437 	queue_t		*nqp;
3438 	syncq_t		*sq;
3439 	uint16_t	count;
3440 	uint16_t	flags;
3441 	struct qinit	*qi;
3442 	int		(*proc)();
3443 	struct stdata	*stp;
3444 	int		isread;
3445 	int		rval;
3446 
3447 	stp = STREAM(qp);
3448 	/*
3449 	 * Prevent q_next from changing by holding sd_lock until acquiring
3450 	 * SQLOCK. Note that a read-side rwnext from the streamhead will
3451 	 * already have sd_lock acquired. In either case sd_lock is always
3452 	 * released after acquiring SQLOCK.
3453 	 *
3454 	 * The streamhead read-side holding sd_lock when calling rwnext is
3455 	 * required to prevent a race condition were M_DATA mblks flowing
3456 	 * up the read-side of the stream could be bypassed by a rwnext()
3457 	 * down-call. In this case sd_lock acts as the streamhead perimeter.
3458 	 */
3459 	if ((nqp = _WR(qp)) == qp) {
3460 		isread = 0;
3461 		mutex_enter(&stp->sd_lock);
3462 		qp = nqp->q_next;
3463 	} else {
3464 		isread = 1;
3465 		if (nqp != stp->sd_wrq)
3466 			/* Not streamhead */
3467 			mutex_enter(&stp->sd_lock);
3468 		qp = _RD(nqp->q_next);
3469 	}
3470 	qi = qp->q_qinfo;
3471 	if (qp->q_struiot == STRUIOT_NONE || ! (proc = qi->qi_rwp)) {
3472 		/*
3473 		 * Not a synchronous module or no r/w procedure for this
3474 		 * queue, so just return EINVAL and let the caller handle it.
3475 		 */
3476 		mutex_exit(&stp->sd_lock);
3477 		return (EINVAL);
3478 	}
3479 
3480 	if (rwnext_enter(qp) == B_FALSE) {
3481 		mutex_exit(&stp->sd_lock);
3482 		return (EINVAL);
3483 	}
3484 
3485 	sq = qp->q_syncq;
3486 	mutex_enter(SQLOCK(sq));
3487 	mutex_exit(&stp->sd_lock);
3488 	count = sq->sq_count;
3489 	flags = sq->sq_flags;
3490 	ASSERT(sq->sq_ciputctrl == NULL || (flags & SQ_CIPUT));
3491 
3492 	while ((flags & SQ_GOAWAY) || (!(flags & SQ_CIPUT) && count != 0)) {
3493 		/*
3494 		 * if this queue is being closed, return.
3495 		 */
3496 		if (qp->q_flag & QWCLOSE) {
3497 			mutex_exit(SQLOCK(sq));
3498 			rwnext_exit(qp);
3499 			return (EINVAL);
3500 		}
3501 
3502 		/*
3503 		 * Wait until we can enter the inner perimeter.
3504 		 */
3505 		sq->sq_flags = flags | SQ_WANTWAKEUP;
3506 		cv_wait(&sq->sq_wait, SQLOCK(sq));
3507 		count = sq->sq_count;
3508 		flags = sq->sq_flags;
3509 	}
3510 
3511 	if (isread == 0 && stp->sd_struiowrq == NULL ||
3512 	    isread == 1 && stp->sd_struiordq == NULL) {
3513 		/*
3514 		 * Stream plumbing changed while waiting for inner perimeter
3515 		 * so just return EINVAL and let the caller handle it.
3516 		 */
3517 		mutex_exit(SQLOCK(sq));
3518 		rwnext_exit(qp);
3519 		return (EINVAL);
3520 	}
3521 	if (!(flags & SQ_CIPUT))
3522 		sq->sq_flags = flags | SQ_EXCL;
3523 	sq->sq_count = count + 1;
3524 	ASSERT(sq->sq_count != 0);		/* Wraparound */
3525 	/*
3526 	 * Note: The only message ordering guarantee that rwnext() makes is
3527 	 *	 for the write queue flow-control case. All others (r/w queue
3528 	 *	 with q_count > 0 (or q_first != 0)) are the resposibilty of
3529 	 *	 the queue's rw procedure. This could be genralized here buy
3530 	 *	 running the queue's service procedure, but that wouldn't be
3531 	 *	 the most efficent for all cases.
3532 	 */
3533 	mutex_exit(SQLOCK(sq));
3534 	if (! isread && (qp->q_flag & QFULL)) {
3535 		/*
3536 		 * Write queue may be flow controlled. If so,
3537 		 * mark the queue for wakeup when it's not.
3538 		 */
3539 		mutex_enter(QLOCK(qp));
3540 		if (qp->q_flag & QFULL) {
3541 			qp->q_flag |= QWANTWSYNC;
3542 			mutex_exit(QLOCK(qp));
3543 			rval = EWOULDBLOCK;
3544 			goto out;
3545 		}
3546 		mutex_exit(QLOCK(qp));
3547 	}
3548 
3549 	if (! isread && dp->d_mp)
3550 		STR_FTEVENT_MSG(dp->d_mp, nqp, FTEV_RWNEXT, dp->d_mp->b_rptr -
3551 		    dp->d_mp->b_datap->db_base);
3552 
3553 	rval = (*proc)(qp, dp);
3554 
3555 	if (isread && dp->d_mp)
3556 		STR_FTEVENT_MSG(dp->d_mp, _RD(nqp), FTEV_RWNEXT,
3557 		    dp->d_mp->b_rptr - dp->d_mp->b_datap->db_base);
3558 out:
3559 	/*
3560 	 * The queue is protected from being freed by sq_count, so it is
3561 	 * safe to call rwnext_exit and reacquire SQLOCK(sq).
3562 	 */
3563 	rwnext_exit(qp);
3564 
3565 	mutex_enter(SQLOCK(sq));
3566 	flags = sq->sq_flags;
3567 	ASSERT(sq->sq_count != 0);
3568 	sq->sq_count--;
3569 	if (flags & SQ_TAIL) {
3570 		putnext_tail(sq, qp, flags);
3571 		/*
3572 		 * The only purpose of this ASSERT is to preserve calling stack
3573 		 * in DEBUG kernel.
3574 		 */
3575 		ASSERT(flags & SQ_TAIL);
3576 		return (rval);
3577 	}
3578 	ASSERT(flags & (SQ_EXCL|SQ_CIPUT));
3579 	/*
3580 	 * Safe to always drop SQ_EXCL:
3581 	 *	Not SQ_CIPUT means we set SQ_EXCL above
3582 	 *	For SQ_CIPUT SQ_EXCL will only be set if the put procedure
3583 	 *	did a qwriter(INNER) in which case nobody else
3584 	 *	is in the inner perimeter and we are exiting.
3585 	 *
3586 	 * I would like to make the following assertion:
3587 	 *
3588 	 * ASSERT((flags & (SQ_EXCL|SQ_CIPUT)) != (SQ_EXCL|SQ_CIPUT) ||
3589 	 * 	sq->sq_count == 0);
3590 	 *
3591 	 * which indicates that if we are both putshared and exclusive,
3592 	 * we became exclusive while executing the putproc, and the only
3593 	 * claim on the syncq was the one we dropped a few lines above.
3594 	 * But other threads that enter putnext while the syncq is exclusive
3595 	 * need to make a claim as they may need to drop SQLOCK in the
3596 	 * has_writers case to avoid deadlocks.  If these threads are
3597 	 * delayed or preempted, it is possible that the writer thread can
3598 	 * find out that there are other claims making the (sq_count == 0)
3599 	 * test invalid.
3600 	 */
3601 
3602 	sq->sq_flags = flags & ~SQ_EXCL;
3603 	if (sq->sq_flags & SQ_WANTWAKEUP) {
3604 		sq->sq_flags &= ~SQ_WANTWAKEUP;
3605 		cv_broadcast(&sq->sq_wait);
3606 	}
3607 	mutex_exit(SQLOCK(sq));
3608 	return (rval);
3609 }
3610 
3611 /*
3612  * The purpose of infonext() is to call the info procedure of the next
3613  * (downstream) modules queue.
3614  *
3615  * treated as put entrypoint for perimeter syncronization.
3616  *
3617  * There's no need to grab sq_putlocks here (which only exist for CIPUT
3618  * sync queues). If it is CIPUT sync queue regular sq_count is incremented and
3619  * it does not matter if any regular put entrypoints have been already
3620  * entered.
3621  */
3622 int
3623 infonext(queue_t *qp, infod_t *idp)
3624 {
3625 	queue_t		*nqp;
3626 	syncq_t		*sq;
3627 	uint16_t	count;
3628 	uint16_t 	flags;
3629 	struct qinit	*qi;
3630 	int		(*proc)();
3631 	struct stdata	*stp;
3632 	int		rval;
3633 
3634 	stp = STREAM(qp);
3635 	/*
3636 	 * Prevent q_next from changing by holding sd_lock until
3637 	 * acquiring SQLOCK.
3638 	 */
3639 	mutex_enter(&stp->sd_lock);
3640 	if ((nqp = _WR(qp)) == qp) {
3641 		qp = nqp->q_next;
3642 	} else {
3643 		qp = _RD(nqp->q_next);
3644 	}
3645 	qi = qp->q_qinfo;
3646 	if (qp->q_struiot == STRUIOT_NONE || ! (proc = qi->qi_infop)) {
3647 		mutex_exit(&stp->sd_lock);
3648 		return (EINVAL);
3649 	}
3650 	sq = qp->q_syncq;
3651 	mutex_enter(SQLOCK(sq));
3652 	mutex_exit(&stp->sd_lock);
3653 	count = sq->sq_count;
3654 	flags = sq->sq_flags;
3655 	ASSERT(sq->sq_ciputctrl == NULL || (flags & SQ_CIPUT));
3656 
3657 	while ((flags & SQ_GOAWAY) || (!(flags & SQ_CIPUT) && count != 0)) {
3658 		/*
3659 		 * Wait until we can enter the inner perimeter.
3660 		 */
3661 		sq->sq_flags = flags | SQ_WANTWAKEUP;
3662 		cv_wait(&sq->sq_wait, SQLOCK(sq));
3663 		count = sq->sq_count;
3664 		flags = sq->sq_flags;
3665 	}
3666 
3667 	if (! (flags & SQ_CIPUT))
3668 		sq->sq_flags = flags | SQ_EXCL;
3669 	sq->sq_count = count + 1;
3670 	ASSERT(sq->sq_count != 0);		/* Wraparound */
3671 	mutex_exit(SQLOCK(sq));
3672 
3673 	rval = (*proc)(qp, idp);
3674 
3675 	mutex_enter(SQLOCK(sq));
3676 	flags = sq->sq_flags;
3677 	ASSERT(sq->sq_count != 0);
3678 	sq->sq_count--;
3679 	if (flags & SQ_TAIL) {
3680 		putnext_tail(sq, qp, flags);
3681 		/*
3682 		 * The only purpose of this ASSERT is to preserve calling stack
3683 		 * in DEBUG kernel.
3684 		 */
3685 		ASSERT(flags & SQ_TAIL);
3686 		return (rval);
3687 	}
3688 	ASSERT(flags & (SQ_EXCL|SQ_CIPUT));
3689 /*
3690  * XXXX
3691  * I am not certain the next comment is correct here.  I need to consider
3692  * why the infonext is called, and if dropping SQ_EXCL unless non-CIPUT
3693  * might cause other problems.  It just might be safer to drop it if
3694  * !SQ_CIPUT because that is when we set it.
3695  */
3696 	/*
3697 	 * Safe to always drop SQ_EXCL:
3698 	 *	Not SQ_CIPUT means we set SQ_EXCL above
3699 	 *	For SQ_CIPUT SQ_EXCL will only be set if the put procedure
3700 	 *	did a qwriter(INNER) in which case nobody else
3701 	 *	is in the inner perimeter and we are exiting.
3702 	 *
3703 	 * I would like to make the following assertion:
3704 	 *
3705 	 * ASSERT((flags & (SQ_EXCL|SQ_CIPUT)) != (SQ_EXCL|SQ_CIPUT) ||
3706 	 *	sq->sq_count == 0);
3707 	 *
3708 	 * which indicates that if we are both putshared and exclusive,
3709 	 * we became exclusive while executing the putproc, and the only
3710 	 * claim on the syncq was the one we dropped a few lines above.
3711 	 * But other threads that enter putnext while the syncq is exclusive
3712 	 * need to make a claim as they may need to drop SQLOCK in the
3713 	 * has_writers case to avoid deadlocks.  If these threads are
3714 	 * delayed or preempted, it is possible that the writer thread can
3715 	 * find out that there are other claims making the (sq_count == 0)
3716 	 * test invalid.
3717 	 */
3718 
3719 	sq->sq_flags = flags & ~SQ_EXCL;
3720 	mutex_exit(SQLOCK(sq));
3721 	return (rval);
3722 }
3723 
3724 /*
3725  * Return nonzero if the queue is responsible for struio(), else return 0.
3726  */
3727 int
3728 isuioq(queue_t *q)
3729 {
3730 	if (q->q_flag & QREADR)
3731 		return (STREAM(q)->sd_struiordq == q);
3732 	else
3733 		return (STREAM(q)->sd_struiowrq == q);
3734 }
3735 
3736 #if defined(__sparc)
3737 int disable_putlocks = 0;
3738 #else
3739 int disable_putlocks = 1;
3740 #endif
3741 
3742 /*
3743  * called by create_putlock.
3744  */
3745 static void
3746 create_syncq_putlocks(queue_t *q)
3747 {
3748 	syncq_t	*sq = q->q_syncq;
3749 	ciputctrl_t *cip;
3750 	int i;
3751 
3752 	ASSERT(sq != NULL);
3753 
3754 	ASSERT(disable_putlocks == 0);
3755 	ASSERT(n_ciputctrl >= min_n_ciputctrl);
3756 	ASSERT(ciputctrl_cache != NULL);
3757 
3758 	if (!(sq->sq_type & SQ_CIPUT))
3759 		return;
3760 
3761 	for (i = 0; i <= 1; i++) {
3762 		if (sq->sq_ciputctrl == NULL) {
3763 			cip = kmem_cache_alloc(ciputctrl_cache, KM_SLEEP);
3764 			SUMCHECK_CIPUTCTRL_COUNTS(cip, n_ciputctrl - 1, 0);
3765 			mutex_enter(SQLOCK(sq));
3766 			if (sq->sq_ciputctrl != NULL) {
3767 				mutex_exit(SQLOCK(sq));
3768 				kmem_cache_free(ciputctrl_cache, cip);
3769 			} else {
3770 				ASSERT(sq->sq_nciputctrl == 0);
3771 				sq->sq_nciputctrl = n_ciputctrl - 1;
3772 				/*
3773 				 * putnext checks sq_ciputctrl without holding
3774 				 * SQLOCK. if it is not NULL putnext assumes
3775 				 * sq_nciputctrl is initialized. membar below
3776 				 * insures that.
3777 				 */
3778 				membar_producer();
3779 				sq->sq_ciputctrl = cip;
3780 				mutex_exit(SQLOCK(sq));
3781 			}
3782 		}
3783 		ASSERT(sq->sq_nciputctrl == n_ciputctrl - 1);
3784 		if (i == 1)
3785 			break;
3786 		q = _OTHERQ(q);
3787 		if (!(q->q_flag & QPERQ)) {
3788 			ASSERT(sq == q->q_syncq);
3789 			break;
3790 		}
3791 		ASSERT(q->q_syncq != NULL);
3792 		ASSERT(sq != q->q_syncq);
3793 		sq = q->q_syncq;
3794 		ASSERT(sq->sq_type & SQ_CIPUT);
3795 	}
3796 }
3797 
3798 /*
3799  * If stream argument is 0 only create per cpu sq_putlocks/sq_putcounts for
3800  * syncq of q. If stream argument is not 0 create per cpu stream_putlocks for
3801  * the stream of q and per cpu sq_putlocks/sq_putcounts for all syncq's
3802  * starting from q and down to the driver.
3803  *
3804  * This should be called after the affected queues are part of stream
3805  * geometry. It should be called from driver/module open routine after
3806  * qprocson() call. It is also called from nfs syscall where it is known that
3807  * stream is configured and won't change its geometry during create_putlock
3808  * call.
3809  *
3810  * caller normally uses 0 value for the stream argument to speed up MT putnext
3811  * into the perimeter of q for example because its perimeter is per module
3812  * (e.g. IP).
3813  *
3814  * caller normally uses non 0 value for the stream argument to hint the system
3815  * that the stream of q is a very contended global system stream
3816  * (e.g. NFS/UDP) and the part of the stream from q to the driver is
3817  * particularly MT hot.
3818  *
3819  * Caller insures stream plumbing won't happen while we are here and therefore
3820  * q_next can be safely used.
3821  */
3822 
3823 void
3824 create_putlocks(queue_t *q, int stream)
3825 {
3826 	ciputctrl_t	*cip;
3827 	struct stdata	*stp = STREAM(q);
3828 
3829 	q = _WR(q);
3830 	ASSERT(stp != NULL);
3831 
3832 	if (disable_putlocks != 0)
3833 		return;
3834 
3835 	if (n_ciputctrl < min_n_ciputctrl)
3836 		return;
3837 
3838 	ASSERT(ciputctrl_cache != NULL);
3839 
3840 	if (stream != 0 && stp->sd_ciputctrl == NULL) {
3841 		cip = kmem_cache_alloc(ciputctrl_cache, KM_SLEEP);
3842 		SUMCHECK_CIPUTCTRL_COUNTS(cip, n_ciputctrl - 1, 0);
3843 		mutex_enter(&stp->sd_lock);
3844 		if (stp->sd_ciputctrl != NULL) {
3845 			mutex_exit(&stp->sd_lock);
3846 			kmem_cache_free(ciputctrl_cache, cip);
3847 		} else {
3848 			ASSERT(stp->sd_nciputctrl == 0);
3849 			stp->sd_nciputctrl = n_ciputctrl - 1;
3850 			/*
3851 			 * putnext checks sd_ciputctrl without holding
3852 			 * sd_lock. if it is not NULL putnext assumes
3853 			 * sd_nciputctrl is initialized. membar below
3854 			 * insures that.
3855 			 */
3856 			membar_producer();
3857 			stp->sd_ciputctrl = cip;
3858 			mutex_exit(&stp->sd_lock);
3859 		}
3860 	}
3861 
3862 	ASSERT(stream == 0 || stp->sd_nciputctrl == n_ciputctrl - 1);
3863 
3864 	while (_SAMESTR(q)) {
3865 		create_syncq_putlocks(q);
3866 		if (stream == 0)
3867 			return;
3868 		q = q->q_next;
3869 	}
3870 	ASSERT(q != NULL);
3871 	create_syncq_putlocks(q);
3872 }
3873 
3874 /*
3875  * STREAMS Flow Trace - record STREAMS Flow Trace events as an mblk flows
3876  * through a stream.
3877  *
3878  * Data currently record per event is a hrtime stamp, queue address, event
3879  * type, and a per type datum.  Much of the STREAMS framework is instrumented
3880  * for automatic flow tracing (when enabled).  Events can be defined and used
3881  * by STREAMS modules and drivers.
3882  *
3883  * Global objects:
3884  *
3885  *	str_ftevent() - Add a flow-trace event to a dblk.
3886  *	str_ftfree() - Free flow-trace data
3887  *
3888  * Local objects:
3889  *
3890  *	fthdr_cache - pointer to the kmem cache for trace header.
3891  *	ftblk_cache - pointer to the kmem cache for trace data blocks.
3892  */
3893 
3894 int str_ftnever = 1;	/* Don't do STREAMS flow tracing */
3895 
3896 void
3897 str_ftevent(fthdr_t *hp, void *p, ushort_t evnt, ushort_t data)
3898 {
3899 	ftblk_t *bp = hp->tail;
3900 	ftblk_t *nbp;
3901 	ftevnt_t *ep;
3902 	int ix, nix;
3903 
3904 	ASSERT(hp != NULL);
3905 
3906 	for (;;) {
3907 		if ((ix = bp->ix) == FTBLK_EVNTS) {
3908 			/*
3909 			 * Tail doesn't have room, so need a new tail.
3910 			 *
3911 			 * To make this MT safe, first, allocate a new
3912 			 * ftblk, and initialize it.  To make life a
3913 			 * little easier, reserve the first slot (mostly
3914 			 * by making ix = 1).  When we are finished with
3915 			 * the initialization, CAS this pointer to the
3916 			 * tail.  If this succeeds, this is the new
3917 			 * "next" block.  Otherwise, another thread
3918 			 * got here first, so free the block and start
3919 			 * again.
3920 			 */
3921 			if (!(nbp = kmem_cache_alloc(ftblk_cache,
3922 			    KM_NOSLEEP))) {
3923 				/* no mem, so punt */
3924 				str_ftnever++;
3925 				/* free up all flow data? */
3926 				return;
3927 			}
3928 			nbp->nxt = NULL;
3929 			nbp->ix = 1;
3930 			/*
3931 			 * Just in case there is another thread about
3932 			 * to get the next index, we need to make sure
3933 			 * the value is there for it.
3934 			 */
3935 			membar_producer();
3936 			if (casptr(&hp->tail, bp, nbp) == bp) {
3937 				/* CAS was successful */
3938 				bp->nxt = nbp;
3939 				membar_producer();
3940 				bp = nbp;
3941 				ix = 0;
3942 				goto cas_good;
3943 			} else {
3944 				kmem_cache_free(ftblk_cache, nbp);
3945 				bp = hp->tail;
3946 				continue;
3947 			}
3948 		}
3949 		nix = ix + 1;
3950 		if (cas32((uint32_t *)&bp->ix, ix, nix) == ix) {
3951 		cas_good:
3952 			if (curthread != hp->thread) {
3953 				hp->thread = curthread;
3954 				evnt |= FTEV_CS;
3955 			}
3956 			if (CPU->cpu_seqid != hp->cpu_seqid) {
3957 				hp->cpu_seqid = CPU->cpu_seqid;
3958 				evnt |= FTEV_PS;
3959 			}
3960 			ep = &bp->ev[ix];
3961 			break;
3962 		}
3963 	}
3964 
3965 	if (evnt & FTEV_QMASK) {
3966 		queue_t *qp = p;
3967 
3968 		/*
3969 		 * It is possible that the module info is broke
3970 		 * (as is logsubr.c at this comment writing).
3971 		 * Instead of panicing or doing other unmentionables,
3972 		 * we shall put a dummy name as the mid, and continue.
3973 		 */
3974 		if (qp->q_qinfo == NULL)
3975 			ep->mid = "NONAME";
3976 		else
3977 			ep->mid = qp->q_qinfo->qi_minfo->mi_idname;
3978 
3979 		if (!(qp->q_flag & QREADR))
3980 			evnt |= FTEV_ISWR;
3981 	} else {
3982 		ep->mid = (char *)p;
3983 	}
3984 
3985 	ep->ts = gethrtime();
3986 	ep->evnt = evnt;
3987 	ep->data = data;
3988 	hp->hash = (hp->hash << 9) + hp->hash;
3989 	hp->hash += (evnt << 16) | data;
3990 	hp->hash += (uintptr_t)ep->mid;
3991 }
3992 
3993 /*
3994  * Free flow-trace data.
3995  */
3996 void
3997 str_ftfree(dblk_t *dbp)
3998 {
3999 	fthdr_t *hp = dbp->db_fthdr;
4000 	ftblk_t *bp = &hp->first;
4001 	ftblk_t *nbp;
4002 
4003 	if (bp != hp->tail || bp->ix != 0) {
4004 		/*
4005 		 * Clear out the hash, have the tail point to itself, and free
4006 		 * any continuation blocks.
4007 		 */
4008 		bp = hp->first.nxt;
4009 		hp->tail = &hp->first;
4010 		hp->hash = 0;
4011 		hp->first.nxt = NULL;
4012 		hp->first.ix = 0;
4013 		while (bp != NULL) {
4014 			nbp = bp->nxt;
4015 			kmem_cache_free(ftblk_cache, bp);
4016 			bp = nbp;
4017 		}
4018 	}
4019 	kmem_cache_free(fthdr_cache, hp);
4020 	dbp->db_fthdr = NULL;
4021 }
4022