xref: /titanic_44/usr/src/uts/common/io/stream.c (revision 208e825d0597a017edee1b095c64040043c0c673)
1 /*
2  * CDDL HEADER START
3  *
4  * The contents of this file are subject to the terms of the
5  * Common Development and Distribution License, Version 1.0 only
6  * (the "License").  You may not use this file except in compliance
7  * with the License.
8  *
9  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
10  * or http://www.opensolaris.org/os/licensing.
11  * See the License for the specific language governing permissions
12  * and limitations under the License.
13  *
14  * When distributing Covered Code, include this CDDL HEADER in each
15  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
16  * If applicable, add the following below this CDDL HEADER, with the
17  * fields enclosed by brackets "[]" replaced with your own identifying
18  * information: Portions Copyright [yyyy] [name of copyright owner]
19  *
20  * CDDL HEADER END
21  */
22 /*	Copyright (c) 1984, 1986, 1987, 1988, 1989 AT&T	*/
23 /*	  All Rights Reserved  	*/
24 
25 
26 /*
27  * Copyright 2005 Sun Microsystems, Inc.  All rights reserved.
28  * Use is subject to license terms.
29  */
30 
31 #pragma ident	"%Z%%M%	%I%	%E% SMI"
32 
33 #include <sys/types.h>
34 #include <sys/param.h>
35 #include <sys/thread.h>
36 #include <sys/sysmacros.h>
37 #include <sys/stropts.h>
38 #include <sys/stream.h>
39 #include <sys/strsubr.h>
40 #include <sys/strsun.h>
41 #include <sys/conf.h>
42 #include <sys/debug.h>
43 #include <sys/cmn_err.h>
44 #include <sys/kmem.h>
45 #include <sys/atomic.h>
46 #include <sys/errno.h>
47 #include <sys/vtrace.h>
48 #include <sys/ftrace.h>
49 #include <sys/ontrap.h>
50 #include <sys/multidata.h>
51 #include <sys/multidata_impl.h>
52 #include <sys/sdt.h>
53 #include <sys/strft.h>
54 
55 #ifdef DEBUG
56 #include <sys/kmem_impl.h>
57 #endif
58 
59 /*
60  * This file contains all the STREAMS utility routines that may
61  * be used by modules and drivers.
62  */
63 
64 /*
65  * STREAMS message allocator: principles of operation
66  *
67  * The streams message allocator consists of all the routines that
68  * allocate, dup and free streams messages: allocb(), [d]esballoc[a],
69  * dupb(), freeb() and freemsg().  What follows is a high-level view
70  * of how the allocator works.
71  *
72  * Every streams message consists of one or more mblks, a dblk, and data.
73  * All mblks for all types of messages come from a common mblk_cache.
74  * The dblk and data come in several flavors, depending on how the
75  * message is allocated:
76  *
77  * (1) mblks up to DBLK_MAX_CACHE size are allocated from a collection of
78  *     fixed-size dblk/data caches. For message sizes that are multiples of
79  *     PAGESIZE, dblks are allocated separately from the buffer.
80  *     The associated buffer is allocated by the constructor using kmem_alloc().
81  *     For all other message sizes, dblk and its associated data is allocated
82  *     as a single contiguous chunk of memory.
83  *     Objects in these caches consist of a dblk plus its associated data.
84  *     allocb() determines the nearest-size cache by table lookup:
85  *     the dblk_cache[] array provides the mapping from size to dblk cache.
86  *
87  * (2) Large messages (size > DBLK_MAX_CACHE) are constructed by
88  *     kmem_alloc()'ing a buffer for the data and supplying that
89  *     buffer to gesballoc(), described below.
90  *
91  * (3) The four flavors of [d]esballoc[a] are all implemented by a
92  *     common routine, gesballoc() ("generic esballoc").  gesballoc()
93  *     allocates a dblk from the global dblk_esb_cache and sets db_base,
94  *     db_lim and db_frtnp to describe the caller-supplied buffer.
95  *
96  * While there are several routines to allocate messages, there is only
97  * one routine to free messages: freeb().  freeb() simply invokes the
98  * dblk's free method, dbp->db_free(), which is set at allocation time.
99  *
100  * dupb() creates a new reference to a message by allocating a new mblk,
101  * incrementing the dblk reference count and setting the dblk's free
102  * method to dblk_decref().  The dblk's original free method is retained
103  * in db_lastfree.  dblk_decref() decrements the reference count on each
104  * freeb().  If this is not the last reference it just frees the mblk;
105  * if this *is* the last reference, it restores db_free to db_lastfree,
106  * sets db_mblk to the current mblk (see below), and invokes db_lastfree.
107  *
108  * The implementation makes aggressive use of kmem object caching for
109  * maximum performance.  This makes the code simple and compact, but
110  * also a bit abstruse in some places.  The invariants that constitute a
111  * message's constructed state, described below, are more subtle than usual.
112  *
113  * Every dblk has an "attached mblk" as part of its constructed state.
114  * The mblk is allocated by the dblk's constructor and remains attached
115  * until the message is either dup'ed or pulled up.  In the dupb() case
116  * the mblk association doesn't matter until the last free, at which time
117  * dblk_decref() attaches the last mblk to the dblk.  pullupmsg() affects
118  * the mblk association because it swaps the leading mblks of two messages,
119  * so it is responsible for swapping their db_mblk pointers accordingly.
120  * From a constructed-state viewpoint it doesn't matter that a dblk's
121  * attached mblk can change while the message is allocated; all that
122  * matters is that the dblk has *some* attached mblk when it's freed.
123  *
124  * The sizes of the allocb() small-message caches are not magical.
125  * They represent a good trade-off between internal and external
126  * fragmentation for current workloads.  They should be reevaluated
127  * periodically, especially if allocations larger than DBLK_MAX_CACHE
128  * become common.  We use 64-byte alignment so that dblks don't
129  * straddle cache lines unnecessarily.
130  */
/*
 * Cache geometry.  DBLK_MAX_CACHE is the largest message size served from
 * the fixed-size dblk caches; DBLK_MIN_SIZE (== 1 << DBLK_SIZE_SHIFT) is the
 * granularity of the dblk_cache[] lookup table, which is indexed by
 * (size - 1) >> DBLK_SIZE_SHIFT.
 */
#define	DBLK_MAX_CACHE		73728
#define	DBLK_CACHE_ALIGN	64
#define	DBLK_MIN_SIZE		8
#define	DBLK_SIZE_SHIFT		3

/*
 * DBLK_RTFU_SHIFT(field) is the bit position of a dblk byte field within
 * the 32-bit word accessed through DBLK_RTFU_WORD(): eight times the byte
 * distance from db_ref (little-endian) or to db_struioflag (big-endian).
 * This relies on db_ref, db_type, db_flags and db_struioflag being
 * byte-sized members that together occupy that one word.
 */
#ifdef _BIG_ENDIAN
#define	DBLK_RTFU_SHIFT(field)	\
	(8 * (&((dblk_t *)0)->db_struioflag - &((dblk_t *)0)->field))
#else
#define	DBLK_RTFU_SHIFT(field)	\
	(8 * (&((dblk_t *)0)->field - &((dblk_t *)0)->db_ref))
#endif

/*
 * DBLK_RTFU() builds the combined ref/type/flags/uioflag word so that all
 * four fields can be initialized with a single 32-bit store.  (ref - 1) is
 * ORed into the flags byte; dblk_decref() reads it back through DBLK_REFMIN
 * as the minimum reference count.  Every argument is parenthesized so that
 * expression arguments expand safely.
 */
#define	DBLK_RTFU(ref, type, flags, uioflag)	\
	(((ref) << DBLK_RTFU_SHIFT(db_ref)) | \
	((type) << DBLK_RTFU_SHIFT(db_type)) | \
	(((flags) | ((ref) - 1)) << DBLK_RTFU_SHIFT(db_flags)) | \
	((uioflag) << DBLK_RTFU_SHIFT(db_struioflag)))
/* The db_ref bits of the RTFU word, used by dupb() to detect saturation. */
#define	DBLK_RTFU_REF_MASK	(DBLK_REFMAX << DBLK_RTFU_SHIFT(db_ref))
/* 32-bit lvalue overlaying the dblk fields starting at db_ref. */
#define	DBLK_RTFU_WORD(dbp)	(*((uint32_t *)&(dbp)->db_ref))
/* 32-bit lvalue overlaying the mblk fields starting at b_band. */
#define	MBLK_BAND_FLAG_WORD(mp)	(*((uint32_t *)&(mp)->b_band))
152 
/*
 * Message sizes for which streams_msg_init() creates a dedicated
 * "streams_dblk_<size>" cache.  The list is in ascending order and is
 * terminated by 0.  For sizes that are not a multiple of the page size,
 * streams_msg_init() asserts that size + sizeof (dblk_t) is a multiple of
 * DBLK_CACHE_ALIGN, which is why the LP64 and ILP32 tables differ.
 */
static size_t dblk_sizes[] = {
#ifdef _LP64
	16, 80, 144, 208, 272, 336, 528, 1040, 1488, 1936, 2576, 3920,
	8192, 12112, 16384, 20304, 24576, 28496, 32768, 36688,
	40960, 44880, 49152, 53072, 57344, 61264, 65536, 69456,
#else
	64, 128, 320, 576, 1088, 1536, 1984, 2624, 3968,
	8192, 12160, 16384, 20352, 24576, 28544, 32768, 36736,
	40960, 44928, 49152, 53120, 57344, 61312, 65536, 69504,
#endif
	DBLK_MAX_CACHE, 0
};
165 
/*
 * dblk_cache[] maps (size - 1) >> DBLK_SIZE_SHIFT to the smallest cache
 * from dblk_sizes[] that can hold 'size' bytes; it is filled in by
 * streams_msg_init().
 */
static struct kmem_cache *dblk_cache[DBLK_MAX_CACHE / DBLK_MIN_SIZE];
/* Cache of bare mblk_t headers, shared by every message flavor. */
static struct kmem_cache *mblk_cache;
/* Cache of dblks whose data buffer is supplied by the caller (gesballoc()). */
static struct kmem_cache *dblk_esb_cache;
/* Caches of fthdr_t and ftblk_t objects, created in streams_msg_init(). */
static struct kmem_cache *fthdr_cache;
static struct kmem_cache *ftblk_cache;

static void dblk_lastfree(mblk_t *mp, dblk_t *dbp);
static mblk_t *allocb_oversize(size_t size, int flags);
/* Number of times allocb_tryhard() gave up without getting a block. */
static int allocb_tryhard_fails;
static void frnop_func(void *arg);
/* Free routine that does nothing; used by allocb_oversize(). */
frtn_t frnop = { frnop_func };
static void bcache_dblk_lastfree(mblk_t *mp, dblk_t *dbp);

static boolean_t rwnext_enter(queue_t *qp);
static void rwnext_exit(queue_t *qp);

/*
 * Patchable mblk/dblk kmem_cache flags.  They are passed as the cflags
 * argument of kmem_cache_create() in streams_msg_init().
 */
int dblk_kmem_flags = 0;
int mblk_kmem_flags = 0;
187 
188 
189 static int
190 dblk_constructor(void *buf, void *cdrarg, int kmflags)
191 {
192 	dblk_t *dbp = buf;
193 	ssize_t msg_size = (ssize_t)cdrarg;
194 	size_t index;
195 
196 	ASSERT(msg_size != 0);
197 
198 	index = (msg_size - 1) >> DBLK_SIZE_SHIFT;
199 
200 	ASSERT(index < (DBLK_MAX_CACHE >> DBLK_SIZE_SHIFT));
201 
202 	if ((dbp->db_mblk = kmem_cache_alloc(mblk_cache, kmflags)) == NULL)
203 		return (-1);
204 	if ((msg_size & PAGEOFFSET) == 0) {
205 		dbp->db_base = kmem_alloc(msg_size, kmflags);
206 		if (dbp->db_base == NULL) {
207 			kmem_cache_free(mblk_cache, dbp->db_mblk);
208 			return (-1);
209 		}
210 	} else {
211 		dbp->db_base = (unsigned char *)&dbp[1];
212 	}
213 
214 	dbp->db_mblk->b_datap = dbp;
215 	dbp->db_cache = dblk_cache[index];
216 	dbp->db_lim = dbp->db_base + msg_size;
217 	dbp->db_free = dbp->db_lastfree = dblk_lastfree;
218 	dbp->db_frtnp = NULL;
219 	dbp->db_fthdr = NULL;
220 	dbp->db_credp = NULL;
221 	dbp->db_cpid = -1;
222 	dbp->db_struioflag = 0;
223 	dbp->db_struioun.cksum.flags = 0;
224 	return (0);
225 }
226 
227 /*ARGSUSED*/
228 static int
229 dblk_esb_constructor(void *buf, void *cdrarg, int kmflags)
230 {
231 	dblk_t *dbp = buf;
232 
233 	if ((dbp->db_mblk = kmem_cache_alloc(mblk_cache, kmflags)) == NULL)
234 		return (-1);
235 	dbp->db_mblk->b_datap = dbp;
236 	dbp->db_cache = dblk_esb_cache;
237 	dbp->db_fthdr = NULL;
238 	dbp->db_credp = NULL;
239 	dbp->db_cpid = -1;
240 	dbp->db_struioflag = 0;
241 	dbp->db_struioun.cksum.flags = 0;
242 	return (0);
243 }
244 
245 static int
246 bcache_dblk_constructor(void *buf, void *cdrarg, int kmflags)
247 {
248 	dblk_t *dbp = buf;
249 	bcache_t *bcp = (bcache_t *)cdrarg;
250 
251 	if ((dbp->db_mblk = kmem_cache_alloc(mblk_cache, kmflags)) == NULL)
252 		return (-1);
253 
254 	if ((dbp->db_base = (unsigned char *)kmem_cache_alloc(bcp->buffer_cache,
255 	    kmflags)) == NULL) {
256 		kmem_cache_free(mblk_cache, dbp->db_mblk);
257 		return (-1);
258 	}
259 
260 	dbp->db_mblk->b_datap = dbp;
261 	dbp->db_cache = (void *)bcp;
262 	dbp->db_lim = dbp->db_base + bcp->size;
263 	dbp->db_free = dbp->db_lastfree = bcache_dblk_lastfree;
264 	dbp->db_frtnp = NULL;
265 	dbp->db_fthdr = NULL;
266 	dbp->db_credp = NULL;
267 	dbp->db_cpid = -1;
268 	dbp->db_struioflag = 0;
269 	dbp->db_struioun.cksum.flags = 0;
270 	return (0);
271 }
272 
273 /*ARGSUSED*/
274 static void
275 dblk_destructor(void *buf, void *cdrarg)
276 {
277 	dblk_t *dbp = buf;
278 	ssize_t msg_size = (ssize_t)cdrarg;
279 
280 	ASSERT(dbp->db_mblk->b_datap == dbp);
281 
282 	ASSERT(msg_size != 0);
283 
284 	ASSERT(dbp->db_struioflag == 0);
285 	ASSERT(dbp->db_struioun.cksum.flags == 0);
286 
287 	if ((msg_size & PAGEOFFSET) == 0) {
288 		kmem_free(dbp->db_base, msg_size);
289 	}
290 
291 	kmem_cache_free(mblk_cache, dbp->db_mblk);
292 }
293 
294 static void
295 bcache_dblk_destructor(void *buf, void *cdrarg)
296 {
297 	dblk_t *dbp = buf;
298 	bcache_t *bcp = (bcache_t *)cdrarg;
299 
300 	kmem_cache_free(bcp->buffer_cache, dbp->db_base);
301 
302 	ASSERT(dbp->db_mblk->b_datap == dbp);
303 
304 	ASSERT(dbp->db_struioflag == 0);
305 	ASSERT(dbp->db_struioun.cksum.flags == 0);
306 
307 	kmem_cache_free(mblk_cache, dbp->db_mblk);
308 }
309 
310 void
311 streams_msg_init(void)
312 {
313 	char name[40];
314 	size_t size;
315 	size_t lastsize = DBLK_MIN_SIZE;
316 	size_t *sizep;
317 	struct kmem_cache *cp;
318 	size_t tot_size;
319 	int offset;
320 
321 	mblk_cache = kmem_cache_create("streams_mblk",
322 		sizeof (mblk_t), 32, NULL, NULL, NULL, NULL, NULL,
323 		mblk_kmem_flags);
324 
325 	for (sizep = dblk_sizes; (size = *sizep) != 0; sizep++) {
326 
327 		if ((offset = (size & PAGEOFFSET)) != 0) {
328 			/*
329 			 * We are in the middle of a page, dblk should
330 			 * be allocated on the same page
331 			 */
332 			tot_size = size + sizeof (dblk_t);
333 			ASSERT((offset + sizeof (dblk_t) + sizeof (kmem_slab_t))
334 								< PAGESIZE);
335 			ASSERT((tot_size & (DBLK_CACHE_ALIGN - 1)) == 0);
336 
337 		} else {
338 
339 			/*
340 			 * buf size is multiple of page size, dblk and
341 			 * buffer are allocated separately.
342 			 */
343 
344 			ASSERT((size & (DBLK_CACHE_ALIGN - 1)) == 0);
345 			tot_size = sizeof (dblk_t);
346 		}
347 
348 		(void) sprintf(name, "streams_dblk_%ld", size);
349 		cp = kmem_cache_create(name, tot_size,
350 			DBLK_CACHE_ALIGN, dblk_constructor,
351 			dblk_destructor, NULL,
352 			(void *)(size), NULL, dblk_kmem_flags);
353 
354 		while (lastsize <= size) {
355 			dblk_cache[(lastsize - 1) >> DBLK_SIZE_SHIFT] = cp;
356 			lastsize += DBLK_MIN_SIZE;
357 		}
358 	}
359 
360 	dblk_esb_cache = kmem_cache_create("streams_dblk_esb",
361 			sizeof (dblk_t), DBLK_CACHE_ALIGN,
362 			dblk_esb_constructor, dblk_destructor, NULL,
363 			(void *) sizeof (dblk_t), NULL, dblk_kmem_flags);
364 	fthdr_cache = kmem_cache_create("streams_fthdr",
365 		sizeof (fthdr_t), 32, NULL, NULL, NULL, NULL, NULL, 0);
366 	ftblk_cache = kmem_cache_create("streams_ftblk",
367 		sizeof (ftblk_t), 32, NULL, NULL, NULL, NULL, NULL, 0);
368 
369 	/* Initialize Multidata caches */
370 	mmd_init();
371 }
372 
373 /*ARGSUSED*/
374 mblk_t *
375 allocb(size_t size, uint_t pri)
376 {
377 	dblk_t *dbp;
378 	mblk_t *mp;
379 	size_t index;
380 
381 	index =  (size - 1)  >> DBLK_SIZE_SHIFT;
382 
383 	if (index >= (DBLK_MAX_CACHE >> DBLK_SIZE_SHIFT)) {
384 		if (size != 0) {
385 			mp = allocb_oversize(size, KM_NOSLEEP);
386 			goto out;
387 		}
388 		index = 0;
389 	}
390 
391 	if ((dbp = kmem_cache_alloc(dblk_cache[index], KM_NOSLEEP)) == NULL) {
392 		mp = NULL;
393 		goto out;
394 	}
395 
396 	mp = dbp->db_mblk;
397 	DBLK_RTFU_WORD(dbp) = DBLK_RTFU(1, M_DATA, 0, 0);
398 	mp->b_next = mp->b_prev = mp->b_cont = NULL;
399 	mp->b_rptr = mp->b_wptr = dbp->db_base;
400 	mp->b_queue = NULL;
401 	MBLK_BAND_FLAG_WORD(mp) = 0;
402 	STR_FTALLOC(&dbp->db_fthdr, FTEV_ALLOCB, size);
403 out:
404 	FTRACE_1("allocb(): mp=0x%p", (uintptr_t)mp);
405 
406 	return (mp);
407 }
408 
409 mblk_t *
410 allocb_tmpl(size_t size, const mblk_t *tmpl)
411 {
412 	mblk_t *mp = allocb(size, 0);
413 
414 	if (mp != NULL) {
415 		cred_t *cr = DB_CRED(tmpl);
416 		if (cr != NULL)
417 			crhold(mp->b_datap->db_credp = cr);
418 		DB_CPID(mp) = DB_CPID(tmpl);
419 		DB_TYPE(mp) = DB_TYPE(tmpl);
420 	}
421 	return (mp);
422 }
423 
424 mblk_t *
425 allocb_cred(size_t size, cred_t *cr)
426 {
427 	mblk_t *mp = allocb(size, 0);
428 
429 	if (mp != NULL && cr != NULL)
430 		crhold(mp->b_datap->db_credp = cr);
431 
432 	return (mp);
433 }
434 
435 mblk_t *
436 allocb_cred_wait(size_t size, uint_t flags, int *error, cred_t *cr)
437 {
438 	mblk_t *mp = allocb_wait(size, 0, flags, error);
439 
440 	if (mp != NULL && cr != NULL)
441 		crhold(mp->b_datap->db_credp = cr);
442 
443 	return (mp);
444 }
445 
446 void
447 freeb(mblk_t *mp)
448 {
449 	dblk_t *dbp = mp->b_datap;
450 
451 	ASSERT(dbp->db_ref > 0);
452 	ASSERT(mp->b_next == NULL && mp->b_prev == NULL);
453 	FTRACE_1("freeb(): mp=0x%lx", (uintptr_t)mp);
454 
455 	STR_FTEVENT_MBLK(mp, caller(), FTEV_FREEB, dbp->db_ref);
456 
457 	dbp->db_free(mp, dbp);
458 }
459 
460 void
461 freemsg(mblk_t *mp)
462 {
463 	FTRACE_1("freemsg(): mp=0x%lx", (uintptr_t)mp);
464 	while (mp) {
465 		dblk_t *dbp = mp->b_datap;
466 		mblk_t *mp_cont = mp->b_cont;
467 
468 		ASSERT(dbp->db_ref > 0);
469 		ASSERT(mp->b_next == NULL && mp->b_prev == NULL);
470 
471 		STR_FTEVENT_MBLK(mp, caller(), FTEV_FREEB, dbp->db_ref);
472 
473 		dbp->db_free(mp, dbp);
474 		mp = mp_cont;
475 	}
476 }
477 
478 /*
479  * Reallocate a block for another use.  Try hard to use the old block.
480  * If the old data is wanted (copy), leave b_wptr at the end of the data,
481  * otherwise return b_wptr = b_rptr.
482  *
483  * This routine is private and unstable.
484  */
485 mblk_t	*
486 reallocb(mblk_t *mp, size_t size, uint_t copy)
487 {
488 	mblk_t		*mp1;
489 	unsigned char	*old_rptr;
490 	ptrdiff_t	cur_size;
491 
492 	if (mp == NULL)
493 		return (allocb(size, BPRI_HI));
494 
495 	cur_size = mp->b_wptr - mp->b_rptr;
496 	old_rptr = mp->b_rptr;
497 
498 	ASSERT(mp->b_datap->db_ref != 0);
499 
500 	if (mp->b_datap->db_ref == 1 && MBLKSIZE(mp) >= size) {
501 		/*
502 		 * If the data is wanted and it will fit where it is, no
503 		 * work is required.
504 		 */
505 		if (copy && mp->b_datap->db_lim - mp->b_rptr >= size)
506 			return (mp);
507 
508 		mp->b_wptr = mp->b_rptr = mp->b_datap->db_base;
509 		mp1 = mp;
510 	} else if ((mp1 = allocb_tmpl(size, mp)) != NULL) {
511 		/* XXX other mp state could be copied too, db_flags ... ? */
512 		mp1->b_cont = mp->b_cont;
513 	} else {
514 		return (NULL);
515 	}
516 
517 	if (copy) {
518 		bcopy(old_rptr, mp1->b_rptr, cur_size);
519 		mp1->b_wptr = mp1->b_rptr + cur_size;
520 	}
521 
522 	if (mp != mp1)
523 		freeb(mp);
524 
525 	return (mp1);
526 }
527 
528 static void
529 dblk_lastfree(mblk_t *mp, dblk_t *dbp)
530 {
531 	ASSERT(dbp->db_mblk == mp);
532 	if (dbp->db_fthdr != NULL)
533 		str_ftfree(dbp);
534 
535 	/* set credp and projid to be 'unspecified' before returning to cache */
536 	if (dbp->db_credp != NULL) {
537 		crfree(dbp->db_credp);
538 		dbp->db_credp = NULL;
539 	}
540 	dbp->db_cpid = -1;
541 
542 	/* Reset the struioflag and the checksum flag fields */
543 	dbp->db_struioflag = 0;
544 	dbp->db_struioun.cksum.flags = 0;
545 
546 	/* and the COOKED flag */
547 	dbp->db_flags &= ~DBLK_COOKED;
548 
549 	kmem_cache_free(dbp->db_cache, dbp);
550 }
551 
/*
 * Free method installed by dupb() on a dblk that has been dup'ed.  Drops
 * one reference.  If other references remain, only the mblk 'mp' is freed.
 * If this was the last reference, 'mp' becomes the dblk's attached mblk,
 * db_free is restored to db_lastfree, and db_lastfree is invoked to release
 * the dblk itself.
 */
static void
dblk_decref(mblk_t *mp, dblk_t *dbp)
{
	/* With db_ref == 1 we hold the only reference: skip the atomic op. */
	if (dbp->db_ref != 1) {
		uint32_t rtfu = atomic_add_32_nv(&DBLK_RTFU_WORD(dbp),
		    -(1 << DBLK_RTFU_SHIFT(db_ref)));
		/*
		 * atomic_add_32_nv() just decremented db_ref, so we no longer
		 * have a reference to the dblk, which means another thread
		 * could free it.  Therefore we cannot examine the dblk to
		 * determine whether ours was the last reference.  Instead,
		 * we extract the new and minimum reference counts from rtfu.
		 * Note that all we're really saying is "if (ref != refmin)".
		 */
		if (((rtfu >> DBLK_RTFU_SHIFT(db_ref)) & DBLK_REFMAX) !=
		    ((rtfu >> DBLK_RTFU_SHIFT(db_flags)) & DBLK_REFMIN)) {
			kmem_cache_free(mblk_cache, mp);
			return;
		}
	}
	/* Last reference: re-attach this mblk and run the original free. */
	dbp->db_mblk = mp;
	dbp->db_free = dbp->db_lastfree;
	dbp->db_lastfree(mp, dbp);
}
576 
/*
 * Create a new mblk that refers to the same dblk (and therefore the same
 * data) as 'mp', bumping the dblk's reference count.  The new mblk copies
 * mp's read/write pointers and band/flag word.  Returns NULL if no mblk
 * can be allocated or if the reference count is already maxed out.
 */
mblk_t *
dupb(mblk_t *mp)
{
	dblk_t *dbp = mp->b_datap;
	mblk_t *new_mp;
	uint32_t oldrtfu, newrtfu;

	if ((new_mp = kmem_cache_alloc(mblk_cache, KM_NOSLEEP)) == NULL)
		goto out;

	new_mp->b_next = new_mp->b_prev = new_mp->b_cont = NULL;
	new_mp->b_rptr = mp->b_rptr;
	new_mp->b_wptr = mp->b_wptr;
	new_mp->b_datap = dbp;
	new_mp->b_queue = NULL;
	MBLK_BAND_FLAG_WORD(new_mp) = MBLK_BAND_FLAG_WORD(mp);

	STR_FTEVENT_MBLK(mp, caller(), FTEV_DUPB, dbp->db_ref);

	/*
	 * First-dup optimization.  The enabling assumption is that there
	 * can never be a race (in correct code) to dup the first copy
	 * of a message.  Therefore we don't need to do it atomically.
	 */
	if (dbp->db_free != dblk_decref) {
		/* From now on every free of this dblk goes via dblk_decref(). */
		dbp->db_free = dblk_decref;
		dbp->db_ref++;
		goto out;
	}

	/*
	 * Already dup'ed at least once: other threads may be changing the
	 * reference count, so increment it with a compare-and-swap loop.
	 */
	do {
		ASSERT(dbp->db_ref > 0);
		oldrtfu = DBLK_RTFU_WORD(dbp);
		newrtfu = oldrtfu + (1 << DBLK_RTFU_SHIFT(db_ref));
		/*
		 * If db_ref is maxed out we can't dup this message anymore.
		 */
		if ((oldrtfu & DBLK_RTFU_REF_MASK) == DBLK_RTFU_REF_MASK) {
			kmem_cache_free(mblk_cache, new_mp);
			new_mp = NULL;
			goto out;
		}
	} while (cas32(&DBLK_RTFU_WORD(dbp), oldrtfu, newrtfu) != oldrtfu);

out:
	FTRACE_1("dupb(): new_mp=0x%lx", (uintptr_t)new_mp);
	return (new_mp);
}
625 
626 static void
627 dblk_lastfree_desb(mblk_t *mp, dblk_t *dbp)
628 {
629 	frtn_t *frp = dbp->db_frtnp;
630 
631 	ASSERT(dbp->db_mblk == mp);
632 	frp->free_func(frp->free_arg);
633 	if (dbp->db_fthdr != NULL)
634 		str_ftfree(dbp);
635 
636 	/* set credp and projid to be 'unspecified' before returning to cache */
637 	if (dbp->db_credp != NULL) {
638 		crfree(dbp->db_credp);
639 		dbp->db_credp = NULL;
640 	}
641 	dbp->db_cpid = -1;
642 	dbp->db_struioflag = 0;
643 	dbp->db_struioun.cksum.flags = 0;
644 
645 	kmem_cache_free(dbp->db_cache, dbp);
646 }
647 
/*
 * Free routine that intentionally does nothing.  It backs the 'frnop'
 * frtn_t, for callers that need a free routine but have nothing to release
 * through it.
 */
/*ARGSUSED*/
static void
frnop_func(void *arg)
{
	/* nothing to do */
}
653 
654 /*
655  * Generic esballoc used to implement the four flavors: [d]esballoc[a].
656  */
657 static mblk_t *
658 gesballoc(unsigned char *base, size_t size, uint32_t db_rtfu, frtn_t *frp,
659 	void (*lastfree)(mblk_t *, dblk_t *), int kmflags)
660 {
661 	dblk_t *dbp;
662 	mblk_t *mp;
663 
664 	ASSERT(base != NULL && frp != NULL);
665 
666 	if ((dbp = kmem_cache_alloc(dblk_esb_cache, kmflags)) == NULL) {
667 		mp = NULL;
668 		goto out;
669 	}
670 
671 	mp = dbp->db_mblk;
672 	dbp->db_base = base;
673 	dbp->db_lim = base + size;
674 	dbp->db_free = dbp->db_lastfree = lastfree;
675 	dbp->db_frtnp = frp;
676 	DBLK_RTFU_WORD(dbp) = db_rtfu;
677 	mp->b_next = mp->b_prev = mp->b_cont = NULL;
678 	mp->b_rptr = mp->b_wptr = base;
679 	mp->b_queue = NULL;
680 	MBLK_BAND_FLAG_WORD(mp) = 0;
681 
682 out:
683 	FTRACE_1("gesballoc(): mp=0x%lx", (uintptr_t)mp);
684 	return (mp);
685 }
686 
687 /*ARGSUSED*/
688 mblk_t *
689 esballoc(unsigned char *base, size_t size, uint_t pri, frtn_t *frp)
690 {
691 	mblk_t *mp;
692 
693 	/*
694 	 * Note that this is structured to allow the common case (i.e.
695 	 * STREAMS flowtracing disabled) to call gesballoc() with tail
696 	 * call optimization.
697 	 */
698 	if (!str_ftnever) {
699 		mp = gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
700 		    frp, freebs_enqueue, KM_NOSLEEP);
701 
702 		if (mp != NULL)
703 			STR_FTALLOC(&DB_FTHDR(mp), FTEV_ESBALLOC, size);
704 		return (mp);
705 	}
706 
707 	return (gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
708 	    frp, freebs_enqueue, KM_NOSLEEP));
709 }
710 
711 /*
712  * Same as esballoc() but sleeps waiting for memory.
713  */
714 /*ARGSUSED*/
715 mblk_t *
716 esballoc_wait(unsigned char *base, size_t size, uint_t pri, frtn_t *frp)
717 {
718 	mblk_t *mp;
719 
720 	/*
721 	 * Note that this is structured to allow the common case (i.e.
722 	 * STREAMS flowtracing disabled) to call gesballoc() with tail
723 	 * call optimization.
724 	 */
725 	if (!str_ftnever) {
726 		mp = gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
727 		    frp, freebs_enqueue, KM_SLEEP);
728 
729 		STR_FTALLOC(&DB_FTHDR(mp), FTEV_ESBALLOC, size);
730 		return (mp);
731 	}
732 
733 	return (gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
734 	    frp, freebs_enqueue, KM_SLEEP));
735 }
736 
737 /*ARGSUSED*/
738 mblk_t *
739 desballoc(unsigned char *base, size_t size, uint_t pri, frtn_t *frp)
740 {
741 	mblk_t *mp;
742 
743 	/*
744 	 * Note that this is structured to allow the common case (i.e.
745 	 * STREAMS flowtracing disabled) to call gesballoc() with tail
746 	 * call optimization.
747 	 */
748 	if (!str_ftnever) {
749 		mp = gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
750 			frp, dblk_lastfree_desb, KM_NOSLEEP);
751 
752 		if (mp != NULL)
753 			STR_FTALLOC(&DB_FTHDR(mp), FTEV_DESBALLOC, size);
754 		return (mp);
755 	}
756 
757 	return (gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
758 	    frp, dblk_lastfree_desb, KM_NOSLEEP));
759 }
760 
761 /*ARGSUSED*/
762 mblk_t *
763 esballoca(unsigned char *base, size_t size, uint_t pri, frtn_t *frp)
764 {
765 	mblk_t *mp;
766 
767 	/*
768 	 * Note that this is structured to allow the common case (i.e.
769 	 * STREAMS flowtracing disabled) to call gesballoc() with tail
770 	 * call optimization.
771 	 */
772 	if (!str_ftnever) {
773 		mp = gesballoc(base, size, DBLK_RTFU(2, M_DATA, 0, 0),
774 		    frp, freebs_enqueue, KM_NOSLEEP);
775 
776 		if (mp != NULL)
777 			STR_FTALLOC(&DB_FTHDR(mp), FTEV_ESBALLOCA, size);
778 		return (mp);
779 	}
780 
781 	return (gesballoc(base, size, DBLK_RTFU(2, M_DATA, 0, 0),
782 	    frp, freebs_enqueue, KM_NOSLEEP));
783 }
784 
785 /*ARGSUSED*/
786 mblk_t *
787 desballoca(unsigned char *base, size_t size, uint_t pri, frtn_t *frp)
788 {
789 	mblk_t *mp;
790 
791 	/*
792 	 * Note that this is structured to allow the common case (i.e.
793 	 * STREAMS flowtracing disabled) to call gesballoc() with tail
794 	 * call optimization.
795 	 */
796 	if (!str_ftnever) {
797 		mp = gesballoc(base, size, DBLK_RTFU(2, M_DATA, 0, 0),
798 		    frp, dblk_lastfree_desb, KM_NOSLEEP);
799 
800 		if (mp != NULL)
801 			STR_FTALLOC(&DB_FTHDR(mp), FTEV_DESBALLOCA, size);
802 		return (mp);
803 	}
804 
805 	return (gesballoc(base, size, DBLK_RTFU(2, M_DATA, 0, 0),
806 	    frp, dblk_lastfree_desb, KM_NOSLEEP));
807 }
808 
809 static void
810 bcache_dblk_lastfree(mblk_t *mp, dblk_t *dbp)
811 {
812 	bcache_t *bcp = dbp->db_cache;
813 
814 	ASSERT(dbp->db_mblk == mp);
815 	if (dbp->db_fthdr != NULL)
816 		str_ftfree(dbp);
817 
818 	/* set credp and projid to be 'unspecified' before returning to cache */
819 	if (dbp->db_credp != NULL) {
820 		crfree(dbp->db_credp);
821 		dbp->db_credp = NULL;
822 	}
823 	dbp->db_cpid = -1;
824 	dbp->db_struioflag = 0;
825 	dbp->db_struioun.cksum.flags = 0;
826 
827 	mutex_enter(&bcp->mutex);
828 	kmem_cache_free(bcp->dblk_cache, dbp);
829 	bcp->alloc--;
830 
831 	if (bcp->alloc == 0 && bcp->destroy != 0) {
832 		kmem_cache_destroy(bcp->dblk_cache);
833 		kmem_cache_destroy(bcp->buffer_cache);
834 		mutex_exit(&bcp->mutex);
835 		mutex_destroy(&bcp->mutex);
836 		kmem_free(bcp, sizeof (bcache_t));
837 	} else {
838 		mutex_exit(&bcp->mutex);
839 	}
840 }
841 
842 bcache_t *
843 bcache_create(char *name, size_t size, uint_t align)
844 {
845 	bcache_t *bcp;
846 	char buffer[255];
847 
848 	ASSERT((align & (align - 1)) == 0);
849 
850 	if ((bcp = (bcache_t *)kmem_alloc(sizeof (bcache_t), KM_NOSLEEP)) ==
851 	    NULL) {
852 		return (NULL);
853 	}
854 
855 	bcp->size = size;
856 	bcp->align = align;
857 	bcp->alloc = 0;
858 	bcp->destroy = 0;
859 
860 	mutex_init(&bcp->mutex, NULL, MUTEX_DRIVER, NULL);
861 
862 	(void) sprintf(buffer, "%s_buffer_cache", name);
863 	bcp->buffer_cache = kmem_cache_create(buffer, size, align, NULL, NULL,
864 	    NULL, NULL, NULL, 0);
865 	(void) sprintf(buffer, "%s_dblk_cache", name);
866 	bcp->dblk_cache = kmem_cache_create(buffer, sizeof (dblk_t),
867 	    DBLK_CACHE_ALIGN, bcache_dblk_constructor, bcache_dblk_destructor,
868 						NULL, (void *)bcp, NULL, 0);
869 
870 	return (bcp);
871 }
872 
873 void
874 bcache_destroy(bcache_t *bcp)
875 {
876 	ASSERT(bcp != NULL);
877 
878 	mutex_enter(&bcp->mutex);
879 	if (bcp->alloc == 0) {
880 		kmem_cache_destroy(bcp->dblk_cache);
881 		kmem_cache_destroy(bcp->buffer_cache);
882 		mutex_exit(&bcp->mutex);
883 		mutex_destroy(&bcp->mutex);
884 		kmem_free(bcp, sizeof (bcache_t));
885 	} else {
886 		bcp->destroy++;
887 		mutex_exit(&bcp->mutex);
888 	}
889 }
890 
891 /*ARGSUSED*/
892 mblk_t *
893 bcache_allocb(bcache_t *bcp, uint_t pri)
894 {
895 	dblk_t *dbp;
896 	mblk_t *mp = NULL;
897 
898 	ASSERT(bcp != NULL);
899 
900 	mutex_enter(&bcp->mutex);
901 	if (bcp->destroy != 0) {
902 		mutex_exit(&bcp->mutex);
903 		goto out;
904 	}
905 
906 	if ((dbp = kmem_cache_alloc(bcp->dblk_cache, KM_NOSLEEP)) == NULL) {
907 		mutex_exit(&bcp->mutex);
908 		goto out;
909 	}
910 	bcp->alloc++;
911 	mutex_exit(&bcp->mutex);
912 
913 	ASSERT(((uintptr_t)(dbp->db_base) & (bcp->align - 1)) == 0);
914 
915 	mp = dbp->db_mblk;
916 	DBLK_RTFU_WORD(dbp) = DBLK_RTFU(1, M_DATA, 0, 0);
917 	mp->b_next = mp->b_prev = mp->b_cont = NULL;
918 	mp->b_rptr = mp->b_wptr = dbp->db_base;
919 	mp->b_queue = NULL;
920 	MBLK_BAND_FLAG_WORD(mp) = 0;
921 	STR_FTALLOC(&dbp->db_fthdr, FTEV_BCALLOCB, bcp->size);
922 out:
923 	FTRACE_1("bcache_allocb(): mp=0x%p", (uintptr_t)mp);
924 
925 	return (mp);
926 }
927 
928 static void
929 dblk_lastfree_oversize(mblk_t *mp, dblk_t *dbp)
930 {
931 	ASSERT(dbp->db_mblk == mp);
932 	if (dbp->db_fthdr != NULL)
933 		str_ftfree(dbp);
934 
935 	/* set credp and projid to be 'unspecified' before returning to cache */
936 	if (dbp->db_credp != NULL) {
937 		crfree(dbp->db_credp);
938 		dbp->db_credp = NULL;
939 	}
940 	dbp->db_cpid = -1;
941 	dbp->db_struioflag = 0;
942 	dbp->db_struioun.cksum.flags = 0;
943 
944 	kmem_free(dbp->db_base, dbp->db_lim - dbp->db_base);
945 	kmem_cache_free(dbp->db_cache, dbp);
946 }
947 
948 static mblk_t *
949 allocb_oversize(size_t size, int kmflags)
950 {
951 	mblk_t *mp;
952 	void *buf;
953 
954 	size = P2ROUNDUP(size, DBLK_CACHE_ALIGN);
955 	if ((buf = kmem_alloc(size, kmflags)) == NULL)
956 		return (NULL);
957 	if ((mp = gesballoc(buf, size, DBLK_RTFU(1, M_DATA, 0, 0),
958 	    &frnop, dblk_lastfree_oversize, kmflags)) == NULL)
959 		kmem_free(buf, size);
960 
961 	if (mp != NULL)
962 		STR_FTALLOC(&DB_FTHDR(mp), FTEV_ALLOCBIG, size);
963 
964 	return (mp);
965 }
966 
967 mblk_t *
968 allocb_tryhard(size_t target_size)
969 {
970 	size_t size;
971 	mblk_t *bp;
972 
973 	for (size = target_size; size < target_size + 512;
974 	    size += DBLK_CACHE_ALIGN)
975 		if ((bp = allocb(size, BPRI_HI)) != NULL)
976 			return (bp);
977 	allocb_tryhard_fails++;
978 	return (NULL);
979 }
980 
981 /*
982  * This routine is consolidation private for STREAMS internal use
983  * This routine may only be called from sync routines (i.e., not
984  * from put or service procedures).  It is located here (rather
985  * than strsubr.c) so that we don't have to expose all of the
986  * allocb() implementation details in header files.
987  */
988 mblk_t *
989 allocb_wait(size_t size, uint_t pri, uint_t flags, int *error)
990 {
991 	dblk_t *dbp;
992 	mblk_t *mp;
993 	size_t index;
994 
995 	index = (size -1) >> DBLK_SIZE_SHIFT;
996 
997 	if (flags & STR_NOSIG) {
998 		if (index >= (DBLK_MAX_CACHE >> DBLK_SIZE_SHIFT)) {
999 			if (size != 0) {
1000 				mp = allocb_oversize(size, KM_SLEEP);
1001 				FTRACE_1("allocb_wait (NOSIG): mp=0x%lx",
1002 				    (uintptr_t)mp);
1003 				return (mp);
1004 			}
1005 			index = 0;
1006 		}
1007 
1008 		dbp = kmem_cache_alloc(dblk_cache[index], KM_SLEEP);
1009 		mp = dbp->db_mblk;
1010 		DBLK_RTFU_WORD(dbp) = DBLK_RTFU(1, M_DATA, 0, 0);
1011 		mp->b_next = mp->b_prev = mp->b_cont = NULL;
1012 		mp->b_rptr = mp->b_wptr = dbp->db_base;
1013 		mp->b_queue = NULL;
1014 		MBLK_BAND_FLAG_WORD(mp) = 0;
1015 		STR_FTALLOC(&DB_FTHDR(mp), FTEV_ALLOCBW, size);
1016 
1017 		FTRACE_1("allocb_wait (NOSIG): mp=0x%lx", (uintptr_t)mp);
1018 
1019 	} else {
1020 		while ((mp = allocb(size, pri)) == NULL) {
1021 			if ((*error = strwaitbuf(size, BPRI_HI)) != 0)
1022 				return (NULL);
1023 		}
1024 	}
1025 
1026 	return (mp);
1027 }
1028 
1029 /*
1030  * Call function 'func' with 'arg' when a class zero block can
1031  * be allocated with priority 'pri'.
1032  */
1033 bufcall_id_t
1034 esbbcall(uint_t pri, void (*func)(void *), void *arg)
1035 {
1036 	return (bufcall(1, pri, func, arg));
1037 }
1038 
1039 /*
1040  * Allocates an iocblk (M_IOCTL) block. Properly sets the credentials
1041  * ioc_id, rval and error of the struct ioctl to set up an ioctl call.
1042  * This provides consistency for all internal allocators of ioctl.
1043  */
1044 mblk_t *
1045 mkiocb(uint_t cmd)
1046 {
1047 	struct iocblk	*ioc;
1048 	mblk_t		*mp;
1049 
1050 	/*
1051 	 * Allocate enough space for any of the ioctl related messages.
1052 	 */
1053 	if ((mp = allocb(sizeof (union ioctypes), BPRI_MED)) == NULL)
1054 		return (NULL);
1055 
1056 	bzero(mp->b_rptr, sizeof (union ioctypes));
1057 
1058 	/*
1059 	 * Set the mblk_t information and ptrs correctly.
1060 	 */
1061 	mp->b_wptr += sizeof (struct iocblk);
1062 	mp->b_datap->db_type = M_IOCTL;
1063 
1064 	/*
1065 	 * Fill in the fields.
1066 	 */
1067 	ioc		= (struct iocblk *)mp->b_rptr;
1068 	ioc->ioc_cmd	= cmd;
1069 	ioc->ioc_cr	= kcred;
1070 	ioc->ioc_id	= getiocseqno();
1071 	ioc->ioc_flag	= IOC_NATIVE;
1072 	return (mp);
1073 }
1074 
1075 /*
1076  * test if block of given size can be allocated with a request of
1077  * the given priority.
1078  * 'pri' is no longer used, but is retained for compatibility.
1079  */
1080 /* ARGSUSED */
1081 int
1082 testb(size_t size, uint_t pri)
1083 {
1084 	return ((size + sizeof (dblk_t)) <= kmem_avail());
1085 }
1086 
1087 /*
1088  * Call function 'func' with argument 'arg' when there is a reasonably
1089  * good chance that a block of size 'size' can be allocated.
1090  * 'pri' is no longer used, but is retained for compatibility.
1091  */
1092 /* ARGSUSED */
1093 bufcall_id_t
1094 bufcall(size_t size, uint_t pri, void (*func)(void *), void *arg)
1095 {
1096 	static long bid = 1;	/* always odd to save checking for zero */
1097 	bufcall_id_t bc_id;
1098 	struct strbufcall *bcp;
1099 
1100 	if ((bcp = kmem_alloc(sizeof (strbufcall_t), KM_NOSLEEP)) == NULL)
1101 		return (0);
1102 
1103 	bcp->bc_func = func;
1104 	bcp->bc_arg = arg;
1105 	bcp->bc_size = size;
1106 	bcp->bc_next = NULL;
1107 	bcp->bc_executor = NULL;
1108 
1109 	mutex_enter(&strbcall_lock);
1110 	/*
1111 	 * After bcp is linked into strbcalls and strbcall_lock is dropped there
1112 	 * should be no references to bcp since it may be freed by
1113 	 * runbufcalls(). Since bcp_id field is returned, we save its value in
1114 	 * the local var.
1115 	 */
1116 	bc_id = bcp->bc_id = (bufcall_id_t)(bid += 2);	/* keep it odd */
1117 
1118 	/*
1119 	 * add newly allocated stream event to existing
1120 	 * linked list of events.
1121 	 */
1122 	if (strbcalls.bc_head == NULL) {
1123 		strbcalls.bc_head = strbcalls.bc_tail = bcp;
1124 	} else {
1125 		strbcalls.bc_tail->bc_next = bcp;
1126 		strbcalls.bc_tail = bcp;
1127 	}
1128 
1129 	cv_signal(&strbcall_cv);
1130 	mutex_exit(&strbcall_lock);
1131 	return (bc_id);
1132 }
1133 
1134 /*
1135  * Cancel a bufcall request.
1136  */
1137 void
1138 unbufcall(bufcall_id_t id)
1139 {
1140 	strbufcall_t *bcp, *pbcp;
1141 
1142 	mutex_enter(&strbcall_lock);
1143 again:
1144 	pbcp = NULL;
1145 	for (bcp = strbcalls.bc_head; bcp; bcp = bcp->bc_next) {
1146 		if (id == bcp->bc_id)
1147 			break;
1148 		pbcp = bcp;
1149 	}
1150 	if (bcp) {
1151 		if (bcp->bc_executor != NULL) {
1152 			if (bcp->bc_executor != curthread) {
1153 				cv_wait(&bcall_cv, &strbcall_lock);
1154 				goto again;
1155 			}
1156 		} else {
1157 			if (pbcp)
1158 				pbcp->bc_next = bcp->bc_next;
1159 			else
1160 				strbcalls.bc_head = bcp->bc_next;
1161 			if (bcp == strbcalls.bc_tail)
1162 				strbcalls.bc_tail = pbcp;
1163 			kmem_free(bcp, sizeof (strbufcall_t));
1164 		}
1165 	}
1166 	mutex_exit(&strbcall_lock);
1167 }
1168 
1169 /*
1170  * Duplicate a message block by block (uses dupb), returning
1171  * a pointer to the duplicate message.
1172  * Returns a non-NULL value only if the entire message
1173  * was dup'd.
1174  */
1175 mblk_t *
1176 dupmsg(mblk_t *bp)
1177 {
1178 	mblk_t *head, *nbp;
1179 
1180 	if (!bp || !(nbp = head = dupb(bp)))
1181 		return (NULL);
1182 
1183 	while (bp->b_cont) {
1184 		if (!(nbp->b_cont = dupb(bp->b_cont))) {
1185 			freemsg(head);
1186 			return (NULL);
1187 		}
1188 		nbp = nbp->b_cont;
1189 		bp = bp->b_cont;
1190 	}
1191 	return (head);
1192 }
1193 
1194 #define	DUPB_NOLOAN(bp) \
1195 	((((bp)->b_datap->db_struioflag & STRUIO_ZC) != 0) ? \
1196 	copyb((bp)) : dupb((bp)))
1197 
1198 mblk_t *
1199 dupmsg_noloan(mblk_t *bp)
1200 {
1201 	mblk_t *head, *nbp;
1202 
1203 	if (bp == NULL || DB_TYPE(bp) != M_DATA ||
1204 	    ((nbp = head = DUPB_NOLOAN(bp)) == NULL))
1205 		return (NULL);
1206 
1207 	while (bp->b_cont) {
1208 		if ((nbp->b_cont = DUPB_NOLOAN(bp->b_cont)) == NULL) {
1209 			freemsg(head);
1210 			return (NULL);
1211 		}
1212 		nbp = nbp->b_cont;
1213 		bp = bp->b_cont;
1214 	}
1215 	return (head);
1216 }
1217 
1218 /*
1219  * Copy data from message and data block to newly allocated message and
1220  * data block. Returns new message block pointer, or NULL if error.
1221  * The alignment of rptr (w.r.t. word alignment) will be the same in the copy
1222  * as in the original even when db_base is not word aligned. (bug 1052877)
1223  */
1224 mblk_t *
1225 copyb(mblk_t *bp)
1226 {
1227 	mblk_t	*nbp;
1228 	dblk_t	*dp, *ndp;
1229 	uchar_t *base;
1230 	size_t	size;
1231 	size_t	unaligned;
1232 
1233 	ASSERT(bp->b_wptr >= bp->b_rptr);
1234 
1235 	dp = bp->b_datap;
1236 	if (dp->db_fthdr != NULL)
1237 		STR_FTEVENT_MBLK(bp, caller(), FTEV_COPYB, 0);
1238 
1239 	/*
1240 	 * Special handling for Multidata message; this should be
1241 	 * removed once a copy-callback routine is made available.
1242 	 */
1243 	if (dp->db_type == M_MULTIDATA) {
1244 		cred_t *cr;
1245 
1246 		if ((nbp = mmd_copy(bp, KM_NOSLEEP)) == NULL)
1247 			return (NULL);
1248 
1249 		nbp->b_flag = bp->b_flag;
1250 		nbp->b_band = bp->b_band;
1251 		ndp = nbp->b_datap;
1252 
1253 		/* See comments below on potential issues. */
1254 		STR_FTEVENT_MBLK(nbp, caller(), FTEV_COPYB, 1);
1255 
1256 		ASSERT(ndp->db_type == dp->db_type);
1257 		cr = dp->db_credp;
1258 		if (cr != NULL)
1259 			crhold(ndp->db_credp = cr);
1260 		ndp->db_cpid = dp->db_cpid;
1261 		return (nbp);
1262 	}
1263 
1264 	size = dp->db_lim - dp->db_base;
1265 	unaligned = P2PHASE((uintptr_t)dp->db_base, sizeof (uint_t));
1266 	if ((nbp = allocb_tmpl(size + unaligned, bp)) == NULL)
1267 		return (NULL);
1268 	nbp->b_flag = bp->b_flag;
1269 	nbp->b_band = bp->b_band;
1270 	ndp = nbp->b_datap;
1271 
1272 	/*
1273 	 * Well, here is a potential issue.  If we are trying to
1274 	 * trace a flow, and we copy the message, we might lose
1275 	 * information about where this message might have been.
1276 	 * So we should inherit the FT data.  On the other hand,
1277 	 * a user might be interested only in alloc to free data.
1278 	 * So I guess the real answer is to provide a tunable.
1279 	 */
1280 	STR_FTEVENT_MBLK(nbp, caller(), FTEV_COPYB, 1);
1281 
1282 	base = ndp->db_base + unaligned;
1283 	bcopy(dp->db_base, ndp->db_base + unaligned, size);
1284 
1285 	nbp->b_rptr = base + (bp->b_rptr - dp->db_base);
1286 	nbp->b_wptr = nbp->b_rptr + MBLKL(bp);
1287 
1288 	return (nbp);
1289 }
1290 
1291 /*
1292  * Copy data from message to newly allocated message using new
1293  * data blocks.  Returns a pointer to the new message, or NULL if error.
1294  */
1295 mblk_t *
1296 copymsg(mblk_t *bp)
1297 {
1298 	mblk_t *head, *nbp;
1299 
1300 	if (!bp || !(nbp = head = copyb(bp)))
1301 		return (NULL);
1302 
1303 	while (bp->b_cont) {
1304 		if (!(nbp->b_cont = copyb(bp->b_cont))) {
1305 			freemsg(head);
1306 			return (NULL);
1307 		}
1308 		nbp = nbp->b_cont;
1309 		bp = bp->b_cont;
1310 	}
1311 	return (head);
1312 }
1313 
1314 /*
1315  * link a message block to tail of message
1316  */
1317 void
1318 linkb(mblk_t *mp, mblk_t *bp)
1319 {
1320 	ASSERT(mp && bp);
1321 
1322 	for (; mp->b_cont; mp = mp->b_cont)
1323 		;
1324 	mp->b_cont = bp;
1325 }
1326 
1327 /*
1328  * unlink a message block from head of message
1329  * return pointer to new message.
1330  * NULL if message becomes empty.
1331  */
1332 mblk_t *
1333 unlinkb(mblk_t *bp)
1334 {
1335 	mblk_t *bp1;
1336 
1337 	bp1 = bp->b_cont;
1338 	bp->b_cont = NULL;
1339 	return (bp1);
1340 }
1341 
1342 /*
1343  * remove a message block "bp" from message "mp"
1344  *
1345  * Return pointer to new message or NULL if no message remains.
1346  * Return -1 if bp is not found in message.
1347  */
1348 mblk_t *
1349 rmvb(mblk_t *mp, mblk_t *bp)
1350 {
1351 	mblk_t *tmp;
1352 	mblk_t *lastp = NULL;
1353 
1354 	ASSERT(mp && bp);
1355 	for (tmp = mp; tmp; tmp = tmp->b_cont) {
1356 		if (tmp == bp) {
1357 			if (lastp)
1358 				lastp->b_cont = tmp->b_cont;
1359 			else
1360 				mp = tmp->b_cont;
1361 			tmp->b_cont = NULL;
1362 			return (mp);
1363 		}
1364 		lastp = tmp;
1365 	}
1366 	return ((mblk_t *)-1);
1367 }
1368 
1369 /*
1370  * Concatenate and align first len bytes of common
1371  * message type.  Len == -1, means concat everything.
1372  * Returns 1 on success, 0 on failure
1373  * After the pullup, mp points to the pulled up data.
1374  */
1375 int
1376 pullupmsg(mblk_t *mp, ssize_t len)
1377 {
1378 	mblk_t *bp, *b_cont;
1379 	dblk_t *dbp;
1380 	ssize_t n;
1381 
1382 	ASSERT(mp->b_datap->db_ref > 0);
1383 	ASSERT(mp->b_next == NULL && mp->b_prev == NULL);
1384 
1385 	/*
1386 	 * We won't handle Multidata message, since it contains
1387 	 * metadata which this function has no knowledge of; we
1388 	 * assert on DEBUG, and return failure otherwise.
1389 	 */
1390 	ASSERT(mp->b_datap->db_type != M_MULTIDATA);
1391 	if (mp->b_datap->db_type == M_MULTIDATA)
1392 		return (0);
1393 
1394 	if (len == -1) {
1395 		if (mp->b_cont == NULL && str_aligned(mp->b_rptr))
1396 			return (1);
1397 		len = xmsgsize(mp);
1398 	} else {
1399 		ssize_t first_mblk_len = mp->b_wptr - mp->b_rptr;
1400 		ASSERT(first_mblk_len >= 0);
1401 		/*
1402 		 * If the length is less than that of the first mblk,
1403 		 * we want to pull up the message into an aligned mblk.
1404 		 * Though not part of the spec, some callers assume it.
1405 		 */
1406 		if (len <= first_mblk_len) {
1407 			if (str_aligned(mp->b_rptr))
1408 				return (1);
1409 			len = first_mblk_len;
1410 		} else if (xmsgsize(mp) < len)
1411 			return (0);
1412 	}
1413 
1414 	if ((bp = allocb_tmpl(len, mp)) == NULL)
1415 		return (0);
1416 
1417 	dbp = bp->b_datap;
1418 	*bp = *mp;		/* swap mblks so bp heads the old msg... */
1419 	mp->b_datap = dbp;	/* ... and mp heads the new message */
1420 	mp->b_datap->db_mblk = mp;
1421 	bp->b_datap->db_mblk = bp;
1422 	mp->b_rptr = mp->b_wptr = dbp->db_base;
1423 
1424 	do {
1425 		ASSERT(bp->b_datap->db_ref > 0);
1426 		ASSERT(bp->b_wptr >= bp->b_rptr);
1427 		n = MIN(bp->b_wptr - bp->b_rptr, len);
1428 		bcopy(bp->b_rptr, mp->b_wptr, (size_t)n);
1429 		mp->b_wptr += n;
1430 		bp->b_rptr += n;
1431 		len -= n;
1432 		if (bp->b_rptr != bp->b_wptr)
1433 			break;
1434 		b_cont = bp->b_cont;
1435 		freeb(bp);
1436 		bp = b_cont;
1437 	} while (len && bp);
1438 
1439 	mp->b_cont = bp;	/* tack on whatever wasn't pulled up */
1440 
1441 	return (1);
1442 }
1443 
1444 /*
1445  * Concatenate and align at least the first len bytes of common message
1446  * type.  Len == -1 means concatenate everything.  The original message is
1447  * unaltered.  Returns a pointer to a new message on success, otherwise
1448  * returns NULL.
1449  */
1450 mblk_t *
1451 msgpullup(mblk_t *mp, ssize_t len)
1452 {
1453 	mblk_t	*newmp;
1454 	ssize_t	totlen;
1455 	ssize_t	n;
1456 
1457 	/*
1458 	 * We won't handle Multidata message, since it contains
1459 	 * metadata which this function has no knowledge of; we
1460 	 * assert on DEBUG, and return failure otherwise.
1461 	 */
1462 	ASSERT(mp->b_datap->db_type != M_MULTIDATA);
1463 	if (mp->b_datap->db_type == M_MULTIDATA)
1464 		return (NULL);
1465 
1466 	totlen = xmsgsize(mp);
1467 
1468 	if ((len > 0) && (len > totlen))
1469 		return (NULL);
1470 
1471 	/*
1472 	 * Copy all of the first msg type into one new mblk, then dupmsg
1473 	 * and link the rest onto this.
1474 	 */
1475 
1476 	len = totlen;
1477 
1478 	if ((newmp = allocb_tmpl(len, mp)) == NULL)
1479 		return (NULL);
1480 
1481 	newmp->b_flag = mp->b_flag;
1482 	newmp->b_band = mp->b_band;
1483 
1484 	while (len > 0) {
1485 		n = mp->b_wptr - mp->b_rptr;
1486 		ASSERT(n >= 0);		/* allow zero-length mblk_t's */
1487 		if (n > 0)
1488 			bcopy(mp->b_rptr, newmp->b_wptr, n);
1489 		newmp->b_wptr += n;
1490 		len -= n;
1491 		mp = mp->b_cont;
1492 	}
1493 
1494 	if (mp != NULL) {
1495 		newmp->b_cont = dupmsg(mp);
1496 		if (newmp->b_cont == NULL) {
1497 			freemsg(newmp);
1498 			return (NULL);
1499 		}
1500 	}
1501 
1502 	return (newmp);
1503 }
1504 
1505 /*
1506  * Trim bytes from message
1507  *  len > 0, trim from head
1508  *  len < 0, trim from tail
1509  * Returns 1 on success, 0 on failure.
1510  */
1511 int
1512 adjmsg(mblk_t *mp, ssize_t len)
1513 {
1514 	mblk_t *bp;
1515 	mblk_t *save_bp = NULL;
1516 	mblk_t *prev_bp;
1517 	mblk_t *bcont;
1518 	unsigned char type;
1519 	ssize_t n;
1520 	int fromhead;
1521 	int first;
1522 
1523 	ASSERT(mp != NULL);
1524 	/*
1525 	 * We won't handle Multidata message, since it contains
1526 	 * metadata which this function has no knowledge of; we
1527 	 * assert on DEBUG, and return failure otherwise.
1528 	 */
1529 	ASSERT(mp->b_datap->db_type != M_MULTIDATA);
1530 	if (mp->b_datap->db_type == M_MULTIDATA)
1531 		return (0);
1532 
1533 	if (len < 0) {
1534 		fromhead = 0;
1535 		len = -len;
1536 	} else {
1537 		fromhead = 1;
1538 	}
1539 
1540 	if (xmsgsize(mp) < len)
1541 		return (0);
1542 
1543 
1544 	if (fromhead) {
1545 		first = 1;
1546 		while (len) {
1547 			ASSERT(mp->b_wptr >= mp->b_rptr);
1548 			n = MIN(mp->b_wptr - mp->b_rptr, len);
1549 			mp->b_rptr += n;
1550 			len -= n;
1551 
1552 			/*
1553 			 * If this is not the first zero length
1554 			 * message remove it
1555 			 */
1556 			if (!first && (mp->b_wptr == mp->b_rptr)) {
1557 				bcont = mp->b_cont;
1558 				freeb(mp);
1559 				mp = save_bp->b_cont = bcont;
1560 			} else {
1561 				save_bp = mp;
1562 				mp = mp->b_cont;
1563 			}
1564 			first = 0;
1565 		}
1566 	} else {
1567 		type = mp->b_datap->db_type;
1568 		while (len) {
1569 			bp = mp;
1570 			save_bp = NULL;
1571 
1572 			/*
1573 			 * Find the last message of same type
1574 			 */
1575 
1576 			while (bp && bp->b_datap->db_type == type) {
1577 				ASSERT(bp->b_wptr >= bp->b_rptr);
1578 				prev_bp = save_bp;
1579 				save_bp = bp;
1580 				bp = bp->b_cont;
1581 			}
1582 			if (save_bp == NULL)
1583 				break;
1584 			n = MIN(save_bp->b_wptr - save_bp->b_rptr, len);
1585 			save_bp->b_wptr -= n;
1586 			len -= n;
1587 
1588 			/*
1589 			 * If this is not the first message
1590 			 * and we have taken away everything
1591 			 * from this message, remove it
1592 			 */
1593 
1594 			if ((save_bp != mp) &&
1595 				(save_bp->b_wptr == save_bp->b_rptr)) {
1596 				bcont = save_bp->b_cont;
1597 				freeb(save_bp);
1598 				prev_bp->b_cont = bcont;
1599 			}
1600 		}
1601 	}
1602 	return (1);
1603 }
1604 
1605 /*
1606  * get number of data bytes in message
1607  */
1608 size_t
1609 msgdsize(mblk_t *bp)
1610 {
1611 	size_t count = 0;
1612 
1613 	for (; bp; bp = bp->b_cont)
1614 		if (bp->b_datap->db_type == M_DATA) {
1615 			ASSERT(bp->b_wptr >= bp->b_rptr);
1616 			count += bp->b_wptr - bp->b_rptr;
1617 		}
1618 	return (count);
1619 }
1620 
1621 /*
1622  * Get a message off head of queue
1623  *
1624  * If queue has no buffers then mark queue
1625  * with QWANTR. (queue wants to be read by
1626  * someone when data becomes available)
1627  *
1628  * If there is something to take off then do so.
1629  * If queue falls below hi water mark turn off QFULL
1630  * flag.  Decrement weighted count of queue.
1631  * Also turn off QWANTR because queue is being read.
1632  *
1633  * The queue count is maintained on a per-band basis.
1634  * Priority band 0 (normal messages) uses q_count,
1635  * q_lowat, etc.  Non-zero priority bands use the
1636  * fields in their respective qband structures
1637  * (qb_count, qb_lowat, etc.)  All messages appear
1638  * on the same list, linked via their b_next pointers.
1639  * q_first is the head of the list.  q_count does
1640  * not reflect the size of all the messages on the
1641  * queue.  It only reflects those messages in the
1642  * normal band of flow.  The one exception to this
1643  * deals with high priority messages.  They are in
1644  * their own conceptual "band", but are accounted
1645  * against q_count.
1646  *
1647  * If queue count is below the lo water mark and QWANTW
1648  * is set, enable the closest backq which has a service
1649  * procedure and turn off the QWANTW flag.
1650  *
1651  * getq could be built on top of rmvq, but isn't because
1652  * of performance considerations.
1653  *
1654  * A note on the use of q_count and q_mblkcnt:
1655  *   q_count is the traditional byte count for messages that
1656  *   have been put on a queue.  Documentation tells us that
1657  *   we shouldn't rely on that count, but some drivers/modules
1658  *   do.  What was needed, however, is a mechanism to prevent
1659  *   runaway streams from consuming all of the resources,
1660  *   and particularly be able to flow control zero-length
1661  *   messages.  q_mblkcnt is used for this purpose.  It
1662  *   counts the number of mblk's that are being put on
1663  *   the queue.  The intention here, is that each mblk should
1664  *   contain one byte of data and, for the purpose of
1665  *   flow-control, logically does.  A queue will become
1666  *   full when EITHER of these values (q_count and q_mblkcnt)
1667  *   reach the highwater mark.  It will clear when BOTH
1668  *   of them drop below the highwater mark.  And it will
1669  *   backenable when BOTH of them drop below the lowwater
1670  *   mark.
1671  *   With this algorithm, a driver/module might be able
1672  *   to find a reasonably accurate q_count, and the
1673  *   framework can still try and limit resource usage.
1674  */
1675 mblk_t *
1676 getq(queue_t *q)
1677 {
1678 	mblk_t *bp;
1679 	uchar_t band = 0;
1680 
1681 	bp = getq_noenab(q);
1682 	if (bp != NULL)
1683 		band = bp->b_band;
1684 
1685 	/*
1686 	 * Inlined from qbackenable().
1687 	 * Quick check without holding the lock.
1688 	 */
1689 	if (band == 0 && (q->q_flag & (QWANTW|QWANTWSYNC)) == 0)
1690 		return (bp);
1691 
1692 	qbackenable(q, band);
1693 	return (bp);
1694 }
1695 
/*
 * Add the number of data bytes in a single message block to 'size'.
 * For an ordinary block this is MBLKL(mp); for an M_MULTIDATA block it
 * is the 'pinuse' value reported by mmd_getsize().
 *
 * The body is wrapped in do { } while (0) so that the macro expands to
 * exactly one statement and can safely be used as the unbraced body of
 * an if/else or a loop.  Invoke it with a trailing semicolon.  Both
 * arguments are evaluated more than once.
 */

#define	ADD_MBLK_SIZE(mp, size) 					\
	do {								\
		if (DB_TYPE(mp) != M_MULTIDATA) {			\
			(size) += MBLKL(mp);				\
		} else {						\
			uint_t	pinuse;					\
									\
			mmd_getsize(mmd_getmultidata(mp), NULL,		\
			    &pinuse);					\
			(size) += pinuse;				\
		}							\
	} while (0)
1710 
1711 /*
1712  * Like getq() but does not backenable.  This is used by the stream
1713  * head when a putback() is likely.  The caller must call qbackenable()
1714  * after it is done with accessing the queue.
1715  */
1716 mblk_t *
1717 getq_noenab(queue_t *q)
1718 {
1719 	mblk_t *bp;
1720 	mblk_t *tmp;
1721 	qband_t *qbp;
1722 	kthread_id_t freezer;
1723 	int	bytecnt = 0, mblkcnt = 0;
1724 
1725 	/* freezestr should allow its caller to call getq/putq */
1726 	freezer = STREAM(q)->sd_freezer;
1727 	if (freezer == curthread) {
1728 		ASSERT(frozenstr(q));
1729 		ASSERT(MUTEX_HELD(QLOCK(q)));
1730 	} else
1731 		mutex_enter(QLOCK(q));
1732 
1733 	if ((bp = q->q_first) == 0) {
1734 		q->q_flag |= QWANTR;
1735 	} else {
1736 		if ((q->q_first = bp->b_next) == NULL)
1737 			q->q_last = NULL;
1738 		else
1739 			q->q_first->b_prev = NULL;
1740 
1741 		/* Get message byte count for q_count accounting */
1742 		for (tmp = bp; tmp; tmp = tmp->b_cont) {
1743 			ADD_MBLK_SIZE(tmp, bytecnt);
1744 			mblkcnt++;
1745 		}
1746 
1747 		if (bp->b_band == 0) {
1748 			q->q_count -= bytecnt;
1749 			q->q_mblkcnt -= mblkcnt;
1750 			if ((q->q_count < q->q_hiwat) &&
1751 			    (q->q_mblkcnt < q->q_hiwat)) {
1752 				q->q_flag &= ~QFULL;
1753 			}
1754 		} else {
1755 			int i;
1756 
1757 			ASSERT(bp->b_band <= q->q_nband);
1758 			ASSERT(q->q_bandp != NULL);
1759 			ASSERT(MUTEX_HELD(QLOCK(q)));
1760 			qbp = q->q_bandp;
1761 			i = bp->b_band;
1762 			while (--i > 0)
1763 				qbp = qbp->qb_next;
1764 			if (qbp->qb_first == qbp->qb_last) {
1765 				qbp->qb_first = NULL;
1766 				qbp->qb_last = NULL;
1767 			} else {
1768 				qbp->qb_first = bp->b_next;
1769 			}
1770 			qbp->qb_count -= bytecnt;
1771 			qbp->qb_mblkcnt -= mblkcnt;
1772 			if ((qbp->qb_count < qbp->qb_hiwat) &&
1773 			    (qbp->qb_mblkcnt < qbp->qb_hiwat)) {
1774 				qbp->qb_flag &= ~QB_FULL;
1775 			}
1776 		}
1777 		q->q_flag &= ~QWANTR;
1778 		bp->b_next = NULL;
1779 		bp->b_prev = NULL;
1780 	}
1781 	if (freezer != curthread)
1782 		mutex_exit(QLOCK(q));
1783 
1784 	STR_FTEVENT_MSG(bp, q, FTEV_GETQ, NULL);
1785 
1786 	return (bp);
1787 }
1788 
1789 /*
1790  * Determine if a backenable is needed after removing a message in the
1791  * specified band.
1792  * NOTE: This routine assumes that something like getq_noenab() has been
1793  * already called.
1794  *
1795  * For the read side it is ok to hold sd_lock across calling this (and the
1796  * stream head often does).
1797  * But for the write side strwakeq might be invoked and it acquires sd_lock.
1798  */
1799 void
1800 qbackenable(queue_t *q, uchar_t band)
1801 {
1802 	int backenab = 0;
1803 	qband_t *qbp;
1804 	kthread_id_t freezer;
1805 
1806 	ASSERT(q);
1807 	ASSERT((q->q_flag & QREADR) || MUTEX_NOT_HELD(&STREAM(q)->sd_lock));
1808 
1809 	/*
1810 	 * Quick check without holding the lock.
1811 	 * OK since after getq() has lowered the q_count these flags
1812 	 * would not change unless either the qbackenable() is done by
1813 	 * another thread (which is ok) or the queue has gotten QFULL
1814 	 * in which case another backenable will take place when the queue
1815 	 * drops below q_lowat.
1816 	 */
1817 	if (band == 0 && (q->q_flag & (QWANTW|QWANTWSYNC)) == 0)
1818 		return;
1819 
1820 	/* freezestr should allow its caller to call getq/putq */
1821 	freezer = STREAM(q)->sd_freezer;
1822 	if (freezer == curthread) {
1823 		ASSERT(frozenstr(q));
1824 		ASSERT(MUTEX_HELD(QLOCK(q)));
1825 	} else
1826 		mutex_enter(QLOCK(q));
1827 
1828 	if (band == 0) {
1829 		if (q->q_lowat == 0 || (q->q_count < q->q_lowat &&
1830 		    q->q_mblkcnt < q->q_lowat)) {
1831 			backenab = q->q_flag & (QWANTW|QWANTWSYNC);
1832 		}
1833 	} else {
1834 		int i;
1835 
1836 		ASSERT((unsigned)band <= q->q_nband);
1837 		ASSERT(q->q_bandp != NULL);
1838 
1839 		qbp = q->q_bandp;
1840 		i = band;
1841 		while (--i > 0)
1842 			qbp = qbp->qb_next;
1843 
1844 		if (qbp->qb_lowat == 0 || (qbp->qb_count < qbp->qb_lowat &&
1845 		    qbp->qb_mblkcnt < qbp->qb_lowat)) {
1846 			backenab = qbp->qb_flag & QB_WANTW;
1847 		}
1848 	}
1849 
1850 	if (backenab == 0) {
1851 		if (freezer != curthread)
1852 			mutex_exit(QLOCK(q));
1853 		return;
1854 	}
1855 
1856 	/* Have to drop the lock across strwakeq and backenable */
1857 	if (backenab & QWANTWSYNC)
1858 		q->q_flag &= ~QWANTWSYNC;
1859 	if (backenab & (QWANTW|QB_WANTW)) {
1860 		if (band != 0)
1861 			qbp->qb_flag &= ~QB_WANTW;
1862 		else {
1863 			q->q_flag &= ~QWANTW;
1864 		}
1865 	}
1866 
1867 	if (freezer != curthread)
1868 		mutex_exit(QLOCK(q));
1869 
1870 	if (backenab & QWANTWSYNC)
1871 		strwakeq(q, QWANTWSYNC);
1872 	if (backenab & (QWANTW|QB_WANTW))
1873 		backenable(q, band);
1874 }
1875 
1876 /*
1877  * Remove a message from a queue.  The queue count and other
1878  * flow control parameters are adjusted and the back queue
1879  * enabled if necessary.
1880  *
1881  * rmvq can be called with the stream frozen, but other utility functions
1882  * holding QLOCK, and by streams modules without any locks/frozen.
1883  */
1884 void
1885 rmvq(queue_t *q, mblk_t *mp)
1886 {
1887 	ASSERT(mp != NULL);
1888 
1889 	rmvq_noenab(q, mp);
1890 	if (curthread != STREAM(q)->sd_freezer && MUTEX_HELD(QLOCK(q))) {
1891 		/*
1892 		 * qbackenable can handle a frozen stream but not a "random"
1893 		 * qlock being held. Drop lock across qbackenable.
1894 		 */
1895 		mutex_exit(QLOCK(q));
1896 		qbackenable(q, mp->b_band);
1897 		mutex_enter(QLOCK(q));
1898 	} else {
1899 		qbackenable(q, mp->b_band);
1900 	}
1901 }
1902 
1903 /*
1904  * Like rmvq() but without any backenabling.
1905  * This exists to handle SR_CONSOL_DATA in strrput().
1906  */
1907 void
1908 rmvq_noenab(queue_t *q, mblk_t *mp)
1909 {
1910 	mblk_t *tmp;
1911 	int i;
1912 	qband_t *qbp = NULL;
1913 	kthread_id_t freezer;
1914 	int	bytecnt = 0, mblkcnt = 0;
1915 
1916 	freezer = STREAM(q)->sd_freezer;
1917 	if (freezer == curthread) {
1918 		ASSERT(frozenstr(q));
1919 		ASSERT(MUTEX_HELD(QLOCK(q)));
1920 	} else if (MUTEX_HELD(QLOCK(q))) {
1921 		/* Don't drop lock on exit */
1922 		freezer = curthread;
1923 	} else
1924 		mutex_enter(QLOCK(q));
1925 
1926 	ASSERT(mp->b_band <= q->q_nband);
1927 	if (mp->b_band != 0) {		/* Adjust band pointers */
1928 		ASSERT(q->q_bandp != NULL);
1929 		qbp = q->q_bandp;
1930 		i = mp->b_band;
1931 		while (--i > 0)
1932 			qbp = qbp->qb_next;
1933 		if (mp == qbp->qb_first) {
1934 			if (mp->b_next && mp->b_band == mp->b_next->b_band)
1935 				qbp->qb_first = mp->b_next;
1936 			else
1937 				qbp->qb_first = NULL;
1938 		}
1939 		if (mp == qbp->qb_last) {
1940 			if (mp->b_prev && mp->b_band == mp->b_prev->b_band)
1941 				qbp->qb_last = mp->b_prev;
1942 			else
1943 				qbp->qb_last = NULL;
1944 		}
1945 	}
1946 
1947 	/*
1948 	 * Remove the message from the list.
1949 	 */
1950 	if (mp->b_prev)
1951 		mp->b_prev->b_next = mp->b_next;
1952 	else
1953 		q->q_first = mp->b_next;
1954 	if (mp->b_next)
1955 		mp->b_next->b_prev = mp->b_prev;
1956 	else
1957 		q->q_last = mp->b_prev;
1958 	mp->b_next = NULL;
1959 	mp->b_prev = NULL;
1960 
1961 	/* Get the size of the message for q_count accounting */
1962 	for (tmp = mp; tmp; tmp = tmp->b_cont) {
1963 		ADD_MBLK_SIZE(tmp, bytecnt);
1964 		mblkcnt++;
1965 	}
1966 
1967 	if (mp->b_band == 0) {		/* Perform q_count accounting */
1968 		q->q_count -= bytecnt;
1969 		q->q_mblkcnt -= mblkcnt;
1970 		if ((q->q_count < q->q_hiwat) &&
1971 		    (q->q_mblkcnt < q->q_hiwat)) {
1972 			q->q_flag &= ~QFULL;
1973 		}
1974 	} else {			/* Perform qb_count accounting */
1975 		qbp->qb_count -= bytecnt;
1976 		qbp->qb_mblkcnt -= mblkcnt;
1977 		if ((qbp->qb_count < qbp->qb_hiwat) &&
1978 		    (qbp->qb_mblkcnt < qbp->qb_hiwat)) {
1979 			qbp->qb_flag &= ~QB_FULL;
1980 		}
1981 	}
1982 	if (freezer != curthread)
1983 		mutex_exit(QLOCK(q));
1984 
1985 	STR_FTEVENT_MSG(mp, q, FTEV_RMVQ, NULL);
1986 }
1987 
1988 /*
1989  * Empty a queue.
1990  * If flag is set, remove all messages.  Otherwise, remove
1991  * only non-control messages.  If queue falls below its low
1992  * water mark, and QWANTW is set, enable the nearest upstream
1993  * service procedure.
1994  *
1995  * Historical note: when merging the M_FLUSH code in strrput with this
1996  * code one difference was discovered. flushq did not have a check
1997  * for q_lowat == 0 in the backenabling test.
1998  *
1999  * pcproto_flag specifies whether or not a M_PCPROTO message should be flushed
2000  * if one exists on the queue.
2001  */
2002 void
2003 flushq_common(queue_t *q, int flag, int pcproto_flag)
2004 {
2005 	mblk_t *mp, *nmp;
2006 	qband_t *qbp;
2007 	int backenab = 0;
2008 	unsigned char bpri;
2009 	unsigned char	qbf[NBAND];	/* band flushing backenable flags */
2010 
2011 	if (q->q_first == NULL)
2012 		return;
2013 
2014 	mutex_enter(QLOCK(q));
2015 	mp = q->q_first;
2016 	q->q_first = NULL;
2017 	q->q_last = NULL;
2018 	q->q_count = 0;
2019 	q->q_mblkcnt = 0;
2020 	for (qbp = q->q_bandp; qbp; qbp = qbp->qb_next) {
2021 		qbp->qb_first = NULL;
2022 		qbp->qb_last = NULL;
2023 		qbp->qb_count = 0;
2024 		qbp->qb_mblkcnt = 0;
2025 		qbp->qb_flag &= ~QB_FULL;
2026 	}
2027 	q->q_flag &= ~QFULL;
2028 	mutex_exit(QLOCK(q));
2029 	while (mp) {
2030 		nmp = mp->b_next;
2031 		mp->b_next = mp->b_prev = NULL;
2032 
2033 		STR_FTEVENT_MBLK(mp, q, FTEV_FLUSHQ, NULL);
2034 
2035 		if (pcproto_flag && (mp->b_datap->db_type == M_PCPROTO))
2036 			(void) putq(q, mp);
2037 		else if (flag || datamsg(mp->b_datap->db_type))
2038 			freemsg(mp);
2039 		else
2040 			(void) putq(q, mp);
2041 		mp = nmp;
2042 	}
2043 	bpri = 1;
2044 	mutex_enter(QLOCK(q));
2045 	for (qbp = q->q_bandp; qbp; qbp = qbp->qb_next) {
2046 		if ((qbp->qb_flag & QB_WANTW) &&
2047 		    (((qbp->qb_count < qbp->qb_lowat) &&
2048 		    (qbp->qb_mblkcnt < qbp->qb_lowat)) ||
2049 		    qbp->qb_lowat == 0)) {
2050 			qbp->qb_flag &= ~QB_WANTW;
2051 			backenab = 1;
2052 			qbf[bpri] = 1;
2053 		} else
2054 			qbf[bpri] = 0;
2055 		bpri++;
2056 	}
2057 	ASSERT(bpri == (unsigned char)(q->q_nband + 1));
2058 	if ((q->q_flag & QWANTW) &&
2059 	    (((q->q_count < q->q_lowat) &&
2060 	    (q->q_mblkcnt < q->q_lowat)) || q->q_lowat == 0)) {
2061 		q->q_flag &= ~QWANTW;
2062 		backenab = 1;
2063 		qbf[0] = 1;
2064 	} else
2065 		qbf[0] = 0;
2066 
2067 	/*
2068 	 * If any band can now be written to, and there is a writer
2069 	 * for that band, then backenable the closest service procedure.
2070 	 */
2071 	if (backenab) {
2072 		mutex_exit(QLOCK(q));
2073 		for (bpri = q->q_nband; bpri != 0; bpri--)
2074 			if (qbf[bpri])
2075 				backenable(q, bpri);
2076 		if (qbf[0])
2077 			backenable(q, 0);
2078 	} else
2079 		mutex_exit(QLOCK(q));
2080 }
2081 
2082 /*
2083  * The real flushing takes place in flushq_common. This is done so that
2084  * a flag which specifies whether or not M_PCPROTO messages should be flushed
2085  * or not. Currently the only place that uses this flag is the stream head.
2086  */
2087 void
2088 flushq(queue_t *q, int flag)
2089 {
2090 	flushq_common(q, flag, 0);
2091 }
2092 
2093 /*
2094  * Flush the queue of messages of the given priority band.
2095  * There is some duplication of code between flushq and flushband.
2096  * This is because we want to optimize the code as much as possible.
2097  * The assumption is that there will be more messages in the normal
2098  * (priority 0) band than in any other.
2099  *
2100  * Historical note: when merging the M_FLUSH code in strrput with this
2101  * code one difference was discovered. flushband had an extra check for
2102  * did not have a check for (mp->b_datap->db_type < QPCTL) in the band 0
2103  * case. That check does not match the man page for flushband and was not
2104  * in the strrput flush code hence it was removed.
2105  */
2106 void
2107 flushband(queue_t *q, unsigned char pri, int flag)
2108 {
2109 	mblk_t *mp;
2110 	mblk_t *nmp;
2111 	mblk_t *last;
2112 	qband_t *qbp;
2113 	int band;
2114 
2115 	ASSERT((flag == FLUSHDATA) || (flag == FLUSHALL));
2116 	if (pri > q->q_nband) {
2117 		return;
2118 	}
2119 	mutex_enter(QLOCK(q));
2120 	if (pri == 0) {
2121 		mp = q->q_first;
2122 		q->q_first = NULL;
2123 		q->q_last = NULL;
2124 		q->q_count = 0;
2125 		q->q_mblkcnt = 0;
2126 		for (qbp = q->q_bandp; qbp; qbp = qbp->qb_next) {
2127 			qbp->qb_first = NULL;
2128 			qbp->qb_last = NULL;
2129 			qbp->qb_count = 0;
2130 			qbp->qb_mblkcnt = 0;
2131 			qbp->qb_flag &= ~QB_FULL;
2132 		}
2133 		q->q_flag &= ~QFULL;
2134 		mutex_exit(QLOCK(q));
2135 		while (mp) {
2136 			nmp = mp->b_next;
2137 			mp->b_next = mp->b_prev = NULL;
2138 			if ((mp->b_band == 0) &&
2139 				((flag == FLUSHALL) ||
2140 				datamsg(mp->b_datap->db_type)))
2141 				freemsg(mp);
2142 			else
2143 				(void) putq(q, mp);
2144 			mp = nmp;
2145 		}
2146 		mutex_enter(QLOCK(q));
2147 		if ((q->q_flag & QWANTW) &&
2148 		    (((q->q_count < q->q_lowat) &&
2149 		    (q->q_mblkcnt < q->q_lowat)) || q->q_lowat == 0)) {
2150 			q->q_flag &= ~QWANTW;
2151 			mutex_exit(QLOCK(q));
2152 
2153 			backenable(q, pri);
2154 		} else
2155 			mutex_exit(QLOCK(q));
2156 	} else {	/* pri != 0 */
2157 		boolean_t flushed = B_FALSE;
2158 		band = pri;
2159 
2160 		ASSERT(MUTEX_HELD(QLOCK(q)));
2161 		qbp = q->q_bandp;
2162 		while (--band > 0)
2163 			qbp = qbp->qb_next;
2164 		mp = qbp->qb_first;
2165 		if (mp == NULL) {
2166 			mutex_exit(QLOCK(q));
2167 			return;
2168 		}
2169 		last = qbp->qb_last->b_next;
2170 		/*
2171 		 * rmvq_noenab() and freemsg() are called for each mblk that
2172 		 * meets the criteria.  The loop is executed until the last
2173 		 * mblk has been processed.
2174 		 */
2175 		while (mp != last) {
2176 			ASSERT(mp->b_band == pri);
2177 			nmp = mp->b_next;
2178 			if (flag == FLUSHALL || datamsg(mp->b_datap->db_type)) {
2179 				rmvq_noenab(q, mp);
2180 				freemsg(mp);
2181 				flushed = B_TRUE;
2182 			}
2183 			mp = nmp;
2184 		}
2185 		mutex_exit(QLOCK(q));
2186 
2187 		/*
2188 		 * If any mblk(s) has been freed, we know that qbackenable()
2189 		 * will need to be called.
2190 		 */
2191 		if (flushed)
2192 			qbackenable(q, pri);
2193 	}
2194 }
2195 
2196 /*
2197  * Return 1 if the queue is not full.  If the queue is full, return
2198  * 0 (may not put message) and set QWANTW flag (caller wants to write
2199  * to the queue).
2200  */
2201 int
2202 canput(queue_t *q)
2203 {
2204 	TRACE_1(TR_FAC_STREAMS_FR, TR_CANPUT_IN, "canput:%p", q);
2205 
2206 	/* this is for loopback transports, they should not do a canput */
2207 	ASSERT(STRMATED(q->q_stream) || STREAM(q) == STREAM(q->q_nfsrv));
2208 
2209 	/* Find next forward module that has a service procedure */
2210 	q = q->q_nfsrv;
2211 
2212 	if (!(q->q_flag & QFULL)) {
2213 		TRACE_2(TR_FAC_STREAMS_FR, TR_CANPUT_OUT, "canput:%p %d", q, 1);
2214 		return (1);
2215 	}
2216 	mutex_enter(QLOCK(q));
2217 	if (q->q_flag & QFULL) {
2218 		q->q_flag |= QWANTW;
2219 		mutex_exit(QLOCK(q));
2220 		TRACE_2(TR_FAC_STREAMS_FR, TR_CANPUT_OUT, "canput:%p %d", q, 0);
2221 		return (0);
2222 	}
2223 	mutex_exit(QLOCK(q));
2224 	TRACE_2(TR_FAC_STREAMS_FR, TR_CANPUT_OUT, "canput:%p %d", q, 1);
2225 	return (1);
2226 }
2227 
2228 /*
2229  * This is the new canput for use with priority bands.  Return 1 if the
2230  * band is not full.  If the band is full, return 0 (may not put message)
2231  * and set QWANTW(QB_WANTW) flag for zero(non-zero) band (caller wants to
2232  * write to the queue).
2233  */
2234 int
2235 bcanput(queue_t *q, unsigned char pri)
2236 {
2237 	qband_t *qbp;
2238 
2239 	TRACE_2(TR_FAC_STREAMS_FR, TR_BCANPUT_IN, "bcanput:%p %p", q, pri);
2240 	if (!q)
2241 		return (0);
2242 
2243 	/* Find next forward module that has a service procedure */
2244 	q = q->q_nfsrv;
2245 
2246 	mutex_enter(QLOCK(q));
2247 	if (pri == 0) {
2248 		if (q->q_flag & QFULL) {
2249 			q->q_flag |= QWANTW;
2250 			mutex_exit(QLOCK(q));
2251 			TRACE_3(TR_FAC_STREAMS_FR, TR_BCANPUT_OUT,
2252 				"bcanput:%p %X %d", q, pri, 0);
2253 			return (0);
2254 		}
2255 	} else {	/* pri != 0 */
2256 		if (pri > q->q_nband) {
2257 			/*
2258 			 * No band exists yet, so return success.
2259 			 */
2260 			mutex_exit(QLOCK(q));
2261 			TRACE_3(TR_FAC_STREAMS_FR, TR_BCANPUT_OUT,
2262 				"bcanput:%p %X %d", q, pri, 1);
2263 			return (1);
2264 		}
2265 		qbp = q->q_bandp;
2266 		while (--pri)
2267 			qbp = qbp->qb_next;
2268 		if (qbp->qb_flag & QB_FULL) {
2269 			qbp->qb_flag |= QB_WANTW;
2270 			mutex_exit(QLOCK(q));
2271 			TRACE_3(TR_FAC_STREAMS_FR, TR_BCANPUT_OUT,
2272 				"bcanput:%p %X %d", q, pri, 0);
2273 			return (0);
2274 		}
2275 	}
2276 	mutex_exit(QLOCK(q));
2277 	TRACE_3(TR_FAC_STREAMS_FR, TR_BCANPUT_OUT,
2278 		"bcanput:%p %X %d", q, pri, 1);
2279 	return (1);
2280 }
2281 
2282 /*
2283  * Put a message on a queue.
2284  *
2285  * Messages are enqueued on a priority basis.  The priority classes
2286  * are HIGH PRIORITY (type >= QPCTL), PRIORITY (type < QPCTL && band > 0),
2287  * and B_NORMAL (type < QPCTL && band == 0).
2288  *
2289  * Add appropriate weighted data block sizes to queue count.
2290  * If queue hits high water mark then set QFULL flag.
2291  *
2292  * If QNOENAB is not set (putq is allowed to enable the queue),
2293  * enable the queue only if the message is PRIORITY,
2294  * or the QWANTR flag is set (indicating that the service procedure
2295  * is ready to read the queue.  This implies that a service
2296  * procedure must NEVER put a high priority message back on its own
2297  * queue, as this would result in an infinite loop (!).
2298  */
2299 int
2300 putq(queue_t *q, mblk_t *bp)
2301 {
2302 	mblk_t *tmp;
2303 	qband_t *qbp = NULL;
2304 	int mcls = (int)queclass(bp);
2305 	kthread_id_t freezer;
2306 	int	bytecnt = 0, mblkcnt = 0;
2307 
2308 	freezer = STREAM(q)->sd_freezer;
2309 	if (freezer == curthread) {
2310 		ASSERT(frozenstr(q));
2311 		ASSERT(MUTEX_HELD(QLOCK(q)));
2312 	} else
2313 		mutex_enter(QLOCK(q));
2314 
2315 	/*
2316 	 * Make sanity checks and if qband structure is not yet
2317 	 * allocated, do so.
2318 	 */
2319 	if (mcls == QPCTL) {
2320 		if (bp->b_band != 0)
2321 			bp->b_band = 0;		/* force to be correct */
2322 	} else if (bp->b_band != 0) {
2323 		int i;
2324 		qband_t **qbpp;
2325 
2326 		if (bp->b_band > q->q_nband) {
2327 
2328 			/*
2329 			 * The qband structure for this priority band is
2330 			 * not on the queue yet, so we have to allocate
2331 			 * one on the fly.  It would be wasteful to
2332 			 * associate the qband structures with every
2333 			 * queue when the queues are allocated.  This is
2334 			 * because most queues will only need the normal
2335 			 * band of flow which can be described entirely
2336 			 * by the queue itself.
2337 			 */
2338 			qbpp = &q->q_bandp;
2339 			while (*qbpp)
2340 				qbpp = &(*qbpp)->qb_next;
2341 			while (bp->b_band > q->q_nband) {
2342 				if ((*qbpp = allocband()) == NULL) {
2343 					if (freezer != curthread)
2344 						mutex_exit(QLOCK(q));
2345 					return (0);
2346 				}
2347 				(*qbpp)->qb_hiwat = q->q_hiwat;
2348 				(*qbpp)->qb_lowat = q->q_lowat;
2349 				q->q_nband++;
2350 				qbpp = &(*qbpp)->qb_next;
2351 			}
2352 		}
2353 		ASSERT(MUTEX_HELD(QLOCK(q)));
2354 		qbp = q->q_bandp;
2355 		i = bp->b_band;
2356 		while (--i)
2357 			qbp = qbp->qb_next;
2358 	}
2359 
2360 	/*
2361 	 * If queue is empty, add the message and initialize the pointers.
2362 	 * Otherwise, adjust message pointers and queue pointers based on
2363 	 * the type of the message and where it belongs on the queue.  Some
2364 	 * code is duplicated to minimize the number of conditionals and
2365 	 * hopefully minimize the amount of time this routine takes.
2366 	 */
2367 	if (!q->q_first) {
2368 		bp->b_next = NULL;
2369 		bp->b_prev = NULL;
2370 		q->q_first = bp;
2371 		q->q_last = bp;
2372 		if (qbp) {
2373 			qbp->qb_first = bp;
2374 			qbp->qb_last = bp;
2375 		}
2376 	} else if (!qbp) {	/* bp->b_band == 0 */
2377 
2378 		/*
2379 		 * If queue class of message is less than or equal to
2380 		 * that of the last one on the queue, tack on to the end.
2381 		 */
2382 		tmp = q->q_last;
2383 		if (mcls <= (int)queclass(tmp)) {
2384 			bp->b_next = NULL;
2385 			bp->b_prev = tmp;
2386 			tmp->b_next = bp;
2387 			q->q_last = bp;
2388 		} else {
2389 			tmp = q->q_first;
2390 			while ((int)queclass(tmp) >= mcls)
2391 				tmp = tmp->b_next;
2392 
2393 			/*
2394 			 * Insert bp before tmp.
2395 			 */
2396 			bp->b_next = tmp;
2397 			bp->b_prev = tmp->b_prev;
2398 			if (tmp->b_prev)
2399 				tmp->b_prev->b_next = bp;
2400 			else
2401 				q->q_first = bp;
2402 			tmp->b_prev = bp;
2403 		}
2404 	} else {		/* bp->b_band != 0 */
2405 		if (qbp->qb_first) {
2406 			tmp = qbp->qb_last;
2407 
2408 			/*
2409 			 * Insert bp after the last message in this band.
2410 			 */
2411 			bp->b_next = tmp->b_next;
2412 			if (tmp->b_next)
2413 				tmp->b_next->b_prev = bp;
2414 			else
2415 				q->q_last = bp;
2416 			bp->b_prev = tmp;
2417 			tmp->b_next = bp;
2418 		} else {
2419 			tmp = q->q_last;
2420 			if ((mcls < (int)queclass(tmp)) ||
2421 			    (bp->b_band <= tmp->b_band)) {
2422 
2423 				/*
2424 				 * Tack bp on end of queue.
2425 				 */
2426 				bp->b_next = NULL;
2427 				bp->b_prev = tmp;
2428 				tmp->b_next = bp;
2429 				q->q_last = bp;
2430 			} else {
2431 				tmp = q->q_first;
2432 				while (tmp->b_datap->db_type >= QPCTL)
2433 					tmp = tmp->b_next;
2434 				while (tmp->b_band >= bp->b_band)
2435 					tmp = tmp->b_next;
2436 
2437 				/*
2438 				 * Insert bp before tmp.
2439 				 */
2440 				bp->b_next = tmp;
2441 				bp->b_prev = tmp->b_prev;
2442 				if (tmp->b_prev)
2443 					tmp->b_prev->b_next = bp;
2444 				else
2445 					q->q_first = bp;
2446 				tmp->b_prev = bp;
2447 			}
2448 			qbp->qb_first = bp;
2449 		}
2450 		qbp->qb_last = bp;
2451 	}
2452 
2453 	/* Get message byte count for q_count accounting */
2454 	for (tmp = bp; tmp; tmp = tmp->b_cont) {
2455 		ADD_MBLK_SIZE(tmp, bytecnt);
2456 		mblkcnt++;
2457 	}
2458 
2459 	if (qbp) {
2460 		qbp->qb_count += bytecnt;
2461 		qbp->qb_mblkcnt += mblkcnt;
2462 		if ((qbp->qb_count >= qbp->qb_hiwat) ||
2463 		    (qbp->qb_mblkcnt >= qbp->qb_hiwat)) {
2464 			qbp->qb_flag |= QB_FULL;
2465 		}
2466 	} else {
2467 		q->q_count += bytecnt;
2468 		q->q_mblkcnt += mblkcnt;
2469 		if ((q->q_count >= q->q_hiwat) ||
2470 		    (q->q_mblkcnt >= q->q_hiwat)) {
2471 			q->q_flag |= QFULL;
2472 		}
2473 	}
2474 
2475 	STR_FTEVENT_MSG(bp, q, FTEV_PUTQ, NULL);
2476 
2477 	if ((mcls > QNORM) ||
2478 	    (canenable(q) && (q->q_flag & QWANTR || bp->b_band)))
2479 		qenable_locked(q);
2480 	ASSERT(MUTEX_HELD(QLOCK(q)));
2481 	if (freezer != curthread)
2482 		mutex_exit(QLOCK(q));
2483 
2484 	return (1);
2485 }
2486 
2487 /*
2488  * Put stuff back at beginning of Q according to priority order.
2489  * See comment on putq above for details.
2490  */
2491 int
2492 putbq(queue_t *q, mblk_t *bp)
2493 {
2494 	mblk_t *tmp;
2495 	qband_t *qbp = NULL;
2496 	int mcls = (int)queclass(bp);
2497 	kthread_id_t freezer;
2498 	int	bytecnt = 0, mblkcnt = 0;
2499 
2500 	ASSERT(q && bp);
2501 	ASSERT(bp->b_next == NULL);
2502 	freezer = STREAM(q)->sd_freezer;
2503 	if (freezer == curthread) {
2504 		ASSERT(frozenstr(q));
2505 		ASSERT(MUTEX_HELD(QLOCK(q)));
2506 	} else
2507 		mutex_enter(QLOCK(q));
2508 
2509 	/*
2510 	 * Make sanity checks and if qband structure is not yet
2511 	 * allocated, do so.
2512 	 */
2513 	if (mcls == QPCTL) {
2514 		if (bp->b_band != 0)
2515 			bp->b_band = 0;		/* force to be correct */
2516 	} else if (bp->b_band != 0) {
2517 		int i;
2518 		qband_t **qbpp;
2519 
2520 		if (bp->b_band > q->q_nband) {
2521 			qbpp = &q->q_bandp;
2522 			while (*qbpp)
2523 				qbpp = &(*qbpp)->qb_next;
2524 			while (bp->b_band > q->q_nband) {
2525 				if ((*qbpp = allocband()) == NULL) {
2526 					if (freezer != curthread)
2527 						mutex_exit(QLOCK(q));
2528 					return (0);
2529 				}
2530 				(*qbpp)->qb_hiwat = q->q_hiwat;
2531 				(*qbpp)->qb_lowat = q->q_lowat;
2532 				q->q_nband++;
2533 				qbpp = &(*qbpp)->qb_next;
2534 			}
2535 		}
2536 		qbp = q->q_bandp;
2537 		i = bp->b_band;
2538 		while (--i)
2539 			qbp = qbp->qb_next;
2540 	}
2541 
2542 	/*
2543 	 * If queue is empty or if message is high priority,
2544 	 * place on the front of the queue.
2545 	 */
2546 	tmp = q->q_first;
2547 	if ((!tmp) || (mcls == QPCTL)) {
2548 		bp->b_next = tmp;
2549 		if (tmp)
2550 			tmp->b_prev = bp;
2551 		else
2552 			q->q_last = bp;
2553 		q->q_first = bp;
2554 		bp->b_prev = NULL;
2555 		if (qbp) {
2556 			qbp->qb_first = bp;
2557 			qbp->qb_last = bp;
2558 		}
2559 	} else if (qbp) {	/* bp->b_band != 0 */
2560 		tmp = qbp->qb_first;
2561 		if (tmp) {
2562 
2563 			/*
2564 			 * Insert bp before the first message in this band.
2565 			 */
2566 			bp->b_next = tmp;
2567 			bp->b_prev = tmp->b_prev;
2568 			if (tmp->b_prev)
2569 				tmp->b_prev->b_next = bp;
2570 			else
2571 				q->q_first = bp;
2572 			tmp->b_prev = bp;
2573 		} else {
2574 			tmp = q->q_last;
2575 			if ((mcls < (int)queclass(tmp)) ||
2576 			    (bp->b_band < tmp->b_band)) {
2577 
2578 				/*
2579 				 * Tack bp on end of queue.
2580 				 */
2581 				bp->b_next = NULL;
2582 				bp->b_prev = tmp;
2583 				tmp->b_next = bp;
2584 				q->q_last = bp;
2585 			} else {
2586 				tmp = q->q_first;
2587 				while (tmp->b_datap->db_type >= QPCTL)
2588 					tmp = tmp->b_next;
2589 				while (tmp->b_band > bp->b_band)
2590 					tmp = tmp->b_next;
2591 
2592 				/*
2593 				 * Insert bp before tmp.
2594 				 */
2595 				bp->b_next = tmp;
2596 				bp->b_prev = tmp->b_prev;
2597 				if (tmp->b_prev)
2598 					tmp->b_prev->b_next = bp;
2599 				else
2600 					q->q_first = bp;
2601 				tmp->b_prev = bp;
2602 			}
2603 			qbp->qb_last = bp;
2604 		}
2605 		qbp->qb_first = bp;
2606 	} else {		/* bp->b_band == 0 && !QPCTL */
2607 
2608 		/*
2609 		 * If the queue class or band is less than that of the last
2610 		 * message on the queue, tack bp on the end of the queue.
2611 		 */
2612 		tmp = q->q_last;
2613 		if ((mcls < (int)queclass(tmp)) || (bp->b_band < tmp->b_band)) {
2614 			bp->b_next = NULL;
2615 			bp->b_prev = tmp;
2616 			tmp->b_next = bp;
2617 			q->q_last = bp;
2618 		} else {
2619 			tmp = q->q_first;
2620 			while (tmp->b_datap->db_type >= QPCTL)
2621 				tmp = tmp->b_next;
2622 			while (tmp->b_band > bp->b_band)
2623 				tmp = tmp->b_next;
2624 
2625 			/*
2626 			 * Insert bp before tmp.
2627 			 */
2628 			bp->b_next = tmp;
2629 			bp->b_prev = tmp->b_prev;
2630 			if (tmp->b_prev)
2631 				tmp->b_prev->b_next = bp;
2632 			else
2633 				q->q_first = bp;
2634 			tmp->b_prev = bp;
2635 		}
2636 	}
2637 
2638 	/* Get message byte count for q_count accounting */
2639 	for (tmp = bp; tmp; tmp = tmp->b_cont) {
2640 		ADD_MBLK_SIZE(tmp, bytecnt);
2641 		mblkcnt++;
2642 	}
2643 	if (qbp) {
2644 		qbp->qb_count += bytecnt;
2645 		qbp->qb_mblkcnt += mblkcnt;
2646 		if ((qbp->qb_count >= qbp->qb_hiwat) ||
2647 		    (qbp->qb_mblkcnt >= qbp->qb_hiwat)) {
2648 			qbp->qb_flag |= QB_FULL;
2649 		}
2650 	} else {
2651 		q->q_count += bytecnt;
2652 		q->q_mblkcnt += mblkcnt;
2653 		if ((q->q_count >= q->q_hiwat) ||
2654 		    (q->q_mblkcnt >= q->q_hiwat)) {
2655 			q->q_flag |= QFULL;
2656 		}
2657 	}
2658 
2659 	STR_FTEVENT_MSG(bp, q, FTEV_PUTBQ, NULL);
2660 
2661 	if ((mcls > QNORM) || (canenable(q) && (q->q_flag & QWANTR)))
2662 		qenable_locked(q);
2663 	ASSERT(MUTEX_HELD(QLOCK(q)));
2664 	if (freezer != curthread)
2665 		mutex_exit(QLOCK(q));
2666 
2667 	return (1);
2668 }
2669 
2670 /*
2671  * Insert a message before an existing message on the queue.  If the
2672  * existing message is NULL, the new messages is placed on the end of
2673  * the queue.  The queue class of the new message is ignored.  However,
2674  * the priority band of the new message must adhere to the following
2675  * ordering:
2676  *
2677  *	emp->b_prev->b_band >= mp->b_band >= emp->b_band.
2678  *
2679  * All flow control parameters are updated.
2680  *
2681  * insq can be called with the stream frozen, but other utility functions
2682  * holding QLOCK, and by streams modules without any locks/frozen.
2683  */
2684 int
2685 insq(queue_t *q, mblk_t *emp, mblk_t *mp)
2686 {
2687 	mblk_t *tmp;
2688 	qband_t *qbp = NULL;
2689 	int mcls = (int)queclass(mp);
2690 	kthread_id_t freezer;
2691 	int	bytecnt = 0, mblkcnt = 0;
2692 
2693 	freezer = STREAM(q)->sd_freezer;
2694 	if (freezer == curthread) {
2695 		ASSERT(frozenstr(q));
2696 		ASSERT(MUTEX_HELD(QLOCK(q)));
2697 	} else if (MUTEX_HELD(QLOCK(q))) {
2698 		/* Don't drop lock on exit */
2699 		freezer = curthread;
2700 	} else
2701 		mutex_enter(QLOCK(q));
2702 
2703 	if (mcls == QPCTL) {
2704 		if (mp->b_band != 0)
2705 			mp->b_band = 0;		/* force to be correct */
2706 		if (emp && emp->b_prev &&
2707 		    (emp->b_prev->b_datap->db_type < QPCTL))
2708 			goto badord;
2709 	}
2710 	if (emp) {
2711 		if (((mcls == QNORM) && (mp->b_band < emp->b_band)) ||
2712 		    (emp->b_prev && (emp->b_prev->b_datap->db_type < QPCTL) &&
2713 		    (emp->b_prev->b_band < mp->b_band))) {
2714 			goto badord;
2715 		}
2716 	} else {
2717 		tmp = q->q_last;
2718 		if (tmp && (mcls == QNORM) && (mp->b_band > tmp->b_band)) {
2719 badord:
2720 			cmn_err(CE_WARN,
2721 			    "insq: attempt to insert message out of order "
2722 			    "on q %p", (void *)q);
2723 			if (freezer != curthread)
2724 				mutex_exit(QLOCK(q));
2725 			return (0);
2726 		}
2727 	}
2728 
2729 	if (mp->b_band != 0) {
2730 		int i;
2731 		qband_t **qbpp;
2732 
2733 		if (mp->b_band > q->q_nband) {
2734 			qbpp = &q->q_bandp;
2735 			while (*qbpp)
2736 				qbpp = &(*qbpp)->qb_next;
2737 			while (mp->b_band > q->q_nband) {
2738 				if ((*qbpp = allocband()) == NULL) {
2739 					if (freezer != curthread)
2740 						mutex_exit(QLOCK(q));
2741 					return (0);
2742 				}
2743 				(*qbpp)->qb_hiwat = q->q_hiwat;
2744 				(*qbpp)->qb_lowat = q->q_lowat;
2745 				q->q_nband++;
2746 				qbpp = &(*qbpp)->qb_next;
2747 			}
2748 		}
2749 		qbp = q->q_bandp;
2750 		i = mp->b_band;
2751 		while (--i)
2752 			qbp = qbp->qb_next;
2753 	}
2754 
2755 	if ((mp->b_next = emp) != NULL) {
2756 		if ((mp->b_prev = emp->b_prev) != NULL)
2757 			emp->b_prev->b_next = mp;
2758 		else
2759 			q->q_first = mp;
2760 		emp->b_prev = mp;
2761 	} else {
2762 		if ((mp->b_prev = q->q_last) != NULL)
2763 			q->q_last->b_next = mp;
2764 		else
2765 			q->q_first = mp;
2766 		q->q_last = mp;
2767 	}
2768 
2769 	/* Get mblk and byte count for q_count accounting */
2770 	for (tmp = mp; tmp; tmp = tmp->b_cont) {
2771 		ADD_MBLK_SIZE(tmp, bytecnt);
2772 		mblkcnt++;
2773 	}
2774 
2775 	if (qbp) {	/* adjust qband pointers and count */
2776 		if (!qbp->qb_first) {
2777 			qbp->qb_first = mp;
2778 			qbp->qb_last = mp;
2779 		} else {
2780 			if (mp->b_prev == NULL || (mp->b_prev != NULL &&
2781 			    (mp->b_prev->b_band != mp->b_band)))
2782 				qbp->qb_first = mp;
2783 			else if (mp->b_next == NULL || (mp->b_next != NULL &&
2784 			    (mp->b_next->b_band != mp->b_band)))
2785 				qbp->qb_last = mp;
2786 		}
2787 		qbp->qb_count += bytecnt;
2788 		qbp->qb_mblkcnt += mblkcnt;
2789 		if ((qbp->qb_count >= qbp->qb_hiwat) ||
2790 		    (qbp->qb_mblkcnt >= qbp->qb_hiwat)) {
2791 			qbp->qb_flag |= QB_FULL;
2792 		}
2793 	} else {
2794 		q->q_count += bytecnt;
2795 		q->q_mblkcnt += mblkcnt;
2796 		if ((q->q_count >= q->q_hiwat) ||
2797 		    (q->q_mblkcnt >= q->q_hiwat)) {
2798 			q->q_flag |= QFULL;
2799 		}
2800 	}
2801 
2802 	STR_FTEVENT_MSG(mp, q, FTEV_INSQ, NULL);
2803 
2804 	if (canenable(q) && (q->q_flag & QWANTR))
2805 		qenable_locked(q);
2806 
2807 	ASSERT(MUTEX_HELD(QLOCK(q)));
2808 	if (freezer != curthread)
2809 		mutex_exit(QLOCK(q));
2810 
2811 	return (1);
2812 }
2813 
2814 /*
2815  * Create and put a control message on queue.
2816  */
2817 int
2818 putctl(queue_t *q, int type)
2819 {
2820 	mblk_t *bp;
2821 
2822 	if ((datamsg(type) && (type != M_DELAY)) ||
2823 	    (bp = allocb_tryhard(0)) == NULL)
2824 		return (0);
2825 	bp->b_datap->db_type = (unsigned char) type;
2826 
2827 	put(q, bp);
2828 
2829 	return (1);
2830 }
2831 
2832 /*
2833  * Control message with a single-byte parameter
2834  */
2835 int
2836 putctl1(queue_t *q, int type, int param)
2837 {
2838 	mblk_t *bp;
2839 
2840 	if ((datamsg(type) && (type != M_DELAY)) ||
2841 	    (bp = allocb_tryhard(1)) == NULL)
2842 		return (0);
2843 	bp->b_datap->db_type = (unsigned char)type;
2844 	*bp->b_wptr++ = (unsigned char)param;
2845 
2846 	put(q, bp);
2847 
2848 	return (1);
2849 }
2850 
2851 int
2852 putnextctl1(queue_t *q, int type, int param)
2853 {
2854 	mblk_t *bp;
2855 
2856 	if ((datamsg(type) && (type != M_DELAY)) ||
2857 		((bp = allocb_tryhard(1)) == NULL))
2858 		return (0);
2859 
2860 	bp->b_datap->db_type = (unsigned char)type;
2861 	*bp->b_wptr++ = (unsigned char)param;
2862 
2863 	putnext(q, bp);
2864 
2865 	return (1);
2866 }
2867 
2868 int
2869 putnextctl(queue_t *q, int type)
2870 {
2871 	mblk_t *bp;
2872 
2873 	if ((datamsg(type) && (type != M_DELAY)) ||
2874 		((bp = allocb_tryhard(0)) == NULL))
2875 		return (0);
2876 	bp->b_datap->db_type = (unsigned char)type;
2877 
2878 	putnext(q, bp);
2879 
2880 	return (1);
2881 }
2882 
2883 /*
2884  * Return the queue upstream from this one
2885  */
2886 queue_t *
2887 backq(queue_t *q)
2888 {
2889 	q = _OTHERQ(q);
2890 	if (q->q_next) {
2891 		q = q->q_next;
2892 		return (_OTHERQ(q));
2893 	}
2894 	return (NULL);
2895 }
2896 
2897 /*
2898  * Send a block back up the queue in reverse from this
2899  * one (e.g. to respond to ioctls)
2900  */
2901 void
2902 qreply(queue_t *q, mblk_t *bp)
2903 {
2904 	ASSERT(q && bp);
2905 
2906 	putnext(_OTHERQ(q), bp);
2907 }
2908 
2909 /*
2910  * Streams Queue Scheduling
2911  *
2912  * Queues are enabled through qenable() when they have messages to
2913  * process.  They are serviced by queuerun(), which runs each enabled
2914  * queue's service procedure.  The call to queuerun() is processor
2915  * dependent - the general principle is that it be run whenever a queue
2916  * is enabled but before returning to user level.  For system calls,
2917  * the function runqueues() is called if their action causes a queue
2918  * to be enabled.  For device interrupts, queuerun() should be
2919  * called before returning from the last level of interrupt.  Beyond
2920  * this, no timing assumptions should be made about queue scheduling.
2921  */
2922 
2923 /*
2924  * Enable a queue: put it on list of those whose service procedures are
2925  * ready to run and set up the scheduling mechanism.
2926  * The broadcast is done outside the mutex -> to avoid the woken thread
2927  * from contending with the mutex. This is OK 'cos the queue has been
2928  * enqueued on the runlist and flagged safely at this point.
2929  */
2930 void
2931 qenable(queue_t *q)
2932 {
2933 	mutex_enter(QLOCK(q));
2934 	qenable_locked(q);
2935 	mutex_exit(QLOCK(q));
2936 }
2937 /*
2938  * Return number of messages on queue
2939  */
2940 int
2941 qsize(queue_t *qp)
2942 {
2943 	int count = 0;
2944 	mblk_t *mp;
2945 
2946 	mutex_enter(QLOCK(qp));
2947 	for (mp = qp->q_first; mp; mp = mp->b_next)
2948 		count++;
2949 	mutex_exit(QLOCK(qp));
2950 	return (count);
2951 }
2952 
2953 /*
2954  * noenable - set queue so that putq() will not enable it.
2955  * enableok - set queue so that putq() can enable it.
2956  */
2957 void
2958 noenable(queue_t *q)
2959 {
2960 	mutex_enter(QLOCK(q));
2961 	q->q_flag |= QNOENB;
2962 	mutex_exit(QLOCK(q));
2963 }
2964 
2965 void
2966 enableok(queue_t *q)
2967 {
2968 	mutex_enter(QLOCK(q));
2969 	q->q_flag &= ~QNOENB;
2970 	mutex_exit(QLOCK(q));
2971 }
2972 
2973 /*
2974  * Set queue fields.
2975  */
2976 int
2977 strqset(queue_t *q, qfields_t what, unsigned char pri, intptr_t val)
2978 {
2979 	qband_t *qbp = NULL;
2980 	queue_t	*wrq;
2981 	int error = 0;
2982 	kthread_id_t freezer;
2983 
2984 	freezer = STREAM(q)->sd_freezer;
2985 	if (freezer == curthread) {
2986 		ASSERT(frozenstr(q));
2987 		ASSERT(MUTEX_HELD(QLOCK(q)));
2988 	} else
2989 		mutex_enter(QLOCK(q));
2990 
2991 	if (what >= QBAD) {
2992 		error = EINVAL;
2993 		goto done;
2994 	}
2995 	if (pri != 0) {
2996 		int i;
2997 		qband_t **qbpp;
2998 
2999 		if (pri > q->q_nband) {
3000 			qbpp = &q->q_bandp;
3001 			while (*qbpp)
3002 				qbpp = &(*qbpp)->qb_next;
3003 			while (pri > q->q_nband) {
3004 				if ((*qbpp = allocband()) == NULL) {
3005 					error = EAGAIN;
3006 					goto done;
3007 				}
3008 				(*qbpp)->qb_hiwat = q->q_hiwat;
3009 				(*qbpp)->qb_lowat = q->q_lowat;
3010 				q->q_nband++;
3011 				qbpp = &(*qbpp)->qb_next;
3012 			}
3013 		}
3014 		qbp = q->q_bandp;
3015 		i = pri;
3016 		while (--i)
3017 			qbp = qbp->qb_next;
3018 	}
3019 	switch (what) {
3020 
3021 	case QHIWAT:
3022 		if (qbp)
3023 			qbp->qb_hiwat = (size_t)val;
3024 		else
3025 			q->q_hiwat = (size_t)val;
3026 		break;
3027 
3028 	case QLOWAT:
3029 		if (qbp)
3030 			qbp->qb_lowat = (size_t)val;
3031 		else
3032 			q->q_lowat = (size_t)val;
3033 		break;
3034 
3035 	case QMAXPSZ:
3036 		if (qbp)
3037 			error = EINVAL;
3038 		else
3039 			q->q_maxpsz = (ssize_t)val;
3040 
3041 		/*
3042 		 * Performance concern, strwrite looks at the module below
3043 		 * the stream head for the maxpsz each time it does a write
3044 		 * we now cache it at the stream head.  Check to see if this
3045 		 * queue is sitting directly below the stream head.
3046 		 */
3047 		wrq = STREAM(q)->sd_wrq;
3048 		if (q != wrq->q_next)
3049 			break;
3050 
3051 		/*
3052 		 * If the stream is not frozen drop the current QLOCK and
3053 		 * acquire the sd_wrq QLOCK which protects sd_qn_*
3054 		 */
3055 		if (freezer != curthread) {
3056 			mutex_exit(QLOCK(q));
3057 			mutex_enter(QLOCK(wrq));
3058 		}
3059 		ASSERT(MUTEX_HELD(QLOCK(wrq)));
3060 
3061 		if (strmsgsz != 0) {
3062 			if (val == INFPSZ)
3063 				val = strmsgsz;
3064 			else  {
3065 				if (STREAM(q)->sd_vnode->v_type == VFIFO)
3066 					val = MIN(PIPE_BUF, val);
3067 				else
3068 					val = MIN(strmsgsz, val);
3069 			}
3070 		}
3071 		STREAM(q)->sd_qn_maxpsz = val;
3072 		if (freezer != curthread) {
3073 			mutex_exit(QLOCK(wrq));
3074 			mutex_enter(QLOCK(q));
3075 		}
3076 		break;
3077 
3078 	case QMINPSZ:
3079 		if (qbp)
3080 			error = EINVAL;
3081 		else
3082 			q->q_minpsz = (ssize_t)val;
3083 
3084 		/*
3085 		 * Performance concern, strwrite looks at the module below
3086 		 * the stream head for the maxpsz each time it does a write
3087 		 * we now cache it at the stream head.  Check to see if this
3088 		 * queue is sitting directly below the stream head.
3089 		 */
3090 		wrq = STREAM(q)->sd_wrq;
3091 		if (q != wrq->q_next)
3092 			break;
3093 
3094 		/*
3095 		 * If the stream is not frozen drop the current QLOCK and
3096 		 * acquire the sd_wrq QLOCK which protects sd_qn_*
3097 		 */
3098 		if (freezer != curthread) {
3099 			mutex_exit(QLOCK(q));
3100 			mutex_enter(QLOCK(wrq));
3101 		}
3102 		STREAM(q)->sd_qn_minpsz = (ssize_t)val;
3103 
3104 		if (freezer != curthread) {
3105 			mutex_exit(QLOCK(wrq));
3106 			mutex_enter(QLOCK(q));
3107 		}
3108 		break;
3109 
3110 	case QSTRUIOT:
3111 		if (qbp)
3112 			error = EINVAL;
3113 		else
3114 			q->q_struiot = (ushort_t)val;
3115 		break;
3116 
3117 	case QCOUNT:
3118 	case QFIRST:
3119 	case QLAST:
3120 	case QFLAG:
3121 		error = EPERM;
3122 		break;
3123 
3124 	default:
3125 		error = EINVAL;
3126 		break;
3127 	}
3128 done:
3129 	if (freezer != curthread)
3130 		mutex_exit(QLOCK(q));
3131 	return (error);
3132 }
3133 
3134 /*
3135  * Get queue fields.
3136  */
3137 int
3138 strqget(queue_t *q, qfields_t what, unsigned char pri, void *valp)
3139 {
3140 	qband_t 	*qbp = NULL;
3141 	int 		error = 0;
3142 	kthread_id_t 	freezer;
3143 
3144 	freezer = STREAM(q)->sd_freezer;
3145 	if (freezer == curthread) {
3146 		ASSERT(frozenstr(q));
3147 		ASSERT(MUTEX_HELD(QLOCK(q)));
3148 	} else
3149 		mutex_enter(QLOCK(q));
3150 	if (what >= QBAD) {
3151 		error = EINVAL;
3152 		goto done;
3153 	}
3154 	if (pri != 0) {
3155 		int i;
3156 		qband_t **qbpp;
3157 
3158 		if (pri > q->q_nband) {
3159 			qbpp = &q->q_bandp;
3160 			while (*qbpp)
3161 				qbpp = &(*qbpp)->qb_next;
3162 			while (pri > q->q_nband) {
3163 				if ((*qbpp = allocband()) == NULL) {
3164 					error = EAGAIN;
3165 					goto done;
3166 				}
3167 				(*qbpp)->qb_hiwat = q->q_hiwat;
3168 				(*qbpp)->qb_lowat = q->q_lowat;
3169 				q->q_nband++;
3170 				qbpp = &(*qbpp)->qb_next;
3171 			}
3172 		}
3173 		qbp = q->q_bandp;
3174 		i = pri;
3175 		while (--i)
3176 			qbp = qbp->qb_next;
3177 	}
3178 	switch (what) {
3179 	case QHIWAT:
3180 		if (qbp)
3181 			*(size_t *)valp = qbp->qb_hiwat;
3182 		else
3183 			*(size_t *)valp = q->q_hiwat;
3184 		break;
3185 
3186 	case QLOWAT:
3187 		if (qbp)
3188 			*(size_t *)valp = qbp->qb_lowat;
3189 		else
3190 			*(size_t *)valp = q->q_lowat;
3191 		break;
3192 
3193 	case QMAXPSZ:
3194 		if (qbp)
3195 			error = EINVAL;
3196 		else
3197 			*(ssize_t *)valp = q->q_maxpsz;
3198 		break;
3199 
3200 	case QMINPSZ:
3201 		if (qbp)
3202 			error = EINVAL;
3203 		else
3204 			*(ssize_t *)valp = q->q_minpsz;
3205 		break;
3206 
3207 	case QCOUNT:
3208 		if (qbp)
3209 			*(size_t *)valp = qbp->qb_count;
3210 		else
3211 			*(size_t *)valp = q->q_count;
3212 		break;
3213 
3214 	case QFIRST:
3215 		if (qbp)
3216 			*(mblk_t **)valp = qbp->qb_first;
3217 		else
3218 			*(mblk_t **)valp = q->q_first;
3219 		break;
3220 
3221 	case QLAST:
3222 		if (qbp)
3223 			*(mblk_t **)valp = qbp->qb_last;
3224 		else
3225 			*(mblk_t **)valp = q->q_last;
3226 		break;
3227 
3228 	case QFLAG:
3229 		if (qbp)
3230 			*(uint_t *)valp = qbp->qb_flag;
3231 		else
3232 			*(uint_t *)valp = q->q_flag;
3233 		break;
3234 
3235 	case QSTRUIOT:
3236 		if (qbp)
3237 			error = EINVAL;
3238 		else
3239 			*(short *)valp = q->q_struiot;
3240 		break;
3241 
3242 	default:
3243 		error = EINVAL;
3244 		break;
3245 	}
3246 done:
3247 	if (freezer != curthread)
3248 		mutex_exit(QLOCK(q));
3249 	return (error);
3250 }
3251 
3252 /*
3253  * Function awakes all in cvwait/sigwait/pollwait, on one of:
3254  *	QWANTWSYNC or QWANTR or QWANTW,
3255  *
3256  * Note: for QWANTWSYNC/QWANTW and QWANTR, if no WSLEEPer or RSLEEPer then a
3257  *	 deferred wakeup will be done. Also if strpoll() in progress then a
3258  *	 deferred pollwakeup will be done.
3259  */
3260 void
3261 strwakeq(queue_t *q, int flag)
3262 {
3263 	stdata_t 	*stp = STREAM(q);
3264 	pollhead_t 	*pl;
3265 
3266 	mutex_enter(&stp->sd_lock);
3267 	pl = &stp->sd_pollist;
3268 	if (flag & QWANTWSYNC) {
3269 		ASSERT(!(q->q_flag & QREADR));
3270 		if (stp->sd_flag & WSLEEP) {
3271 			stp->sd_flag &= ~WSLEEP;
3272 			cv_broadcast(&stp->sd_wrq->q_wait);
3273 		} else {
3274 			stp->sd_wakeq |= WSLEEP;
3275 		}
3276 
3277 		mutex_exit(&stp->sd_lock);
3278 		pollwakeup(pl, POLLWRNORM);
3279 		mutex_enter(&stp->sd_lock);
3280 
3281 		if (stp->sd_sigflags & S_WRNORM)
3282 			strsendsig(stp->sd_siglist, S_WRNORM, 0, 0);
3283 	} else if (flag & QWANTR) {
3284 		if (stp->sd_flag & RSLEEP) {
3285 			stp->sd_flag &= ~RSLEEP;
3286 			cv_broadcast(&_RD(stp->sd_wrq)->q_wait);
3287 		} else {
3288 			stp->sd_wakeq |= RSLEEP;
3289 		}
3290 
3291 		mutex_exit(&stp->sd_lock);
3292 		pollwakeup(pl, POLLIN | POLLRDNORM);
3293 		mutex_enter(&stp->sd_lock);
3294 
3295 		{
3296 			int events = stp->sd_sigflags & (S_INPUT | S_RDNORM);
3297 
3298 			if (events)
3299 				strsendsig(stp->sd_siglist, events, 0, 0);
3300 		}
3301 	} else {
3302 		if (stp->sd_flag & WSLEEP) {
3303 			stp->sd_flag &= ~WSLEEP;
3304 			cv_broadcast(&stp->sd_wrq->q_wait);
3305 		}
3306 
3307 		mutex_exit(&stp->sd_lock);
3308 		pollwakeup(pl, POLLWRNORM);
3309 		mutex_enter(&stp->sd_lock);
3310 
3311 		if (stp->sd_sigflags & S_WRNORM)
3312 			strsendsig(stp->sd_siglist, S_WRNORM, 0, 0);
3313 	}
3314 	mutex_exit(&stp->sd_lock);
3315 }
3316 
3317 int
3318 struioget(queue_t *q, mblk_t *mp, struiod_t *dp, int noblock)
3319 {
3320 	stdata_t *stp = STREAM(q);
3321 	int typ  = STRUIOT_STANDARD;
3322 	uio_t	 *uiop = &dp->d_uio;
3323 	dblk_t	 *dbp;
3324 	ssize_t	 uiocnt;
3325 	ssize_t	 cnt;
3326 	unsigned char *ptr;
3327 	ssize_t	 resid;
3328 	int	 error = 0;
3329 	on_trap_data_t otd;
3330 	queue_t	*stwrq;
3331 
3332 	/*
3333 	 * Plumbing may change while taking the type so store the
3334 	 * queue in a temporary variable. It doesn't matter even
3335 	 * if the we take the type from the previous plumbing,
3336 	 * that's because if the plumbing has changed when we were
3337 	 * holding the queue in a temporary variable, we can continue
3338 	 * processing the message the way it would have been processed
3339 	 * in the old plumbing, without any side effects but a bit
3340 	 * extra processing for partial ip header checksum.
3341 	 *
3342 	 * This has been done to avoid holding the sd_lock which is
3343 	 * very hot.
3344 	 */
3345 
3346 	stwrq = stp->sd_struiowrq;
3347 	if (stwrq)
3348 		typ = stwrq->q_struiot;
3349 
3350 	for (; (resid = uiop->uio_resid) > 0 && mp; mp = mp->b_cont) {
3351 		dbp = mp->b_datap;
3352 		ptr = (uchar_t *)(mp->b_rptr + dbp->db_cksumstuff);
3353 		uiocnt = dbp->db_cksumend - dbp->db_cksumstuff;
3354 		cnt = MIN(uiocnt, uiop->uio_resid);
3355 		if (!(dbp->db_struioflag & STRUIO_SPEC) ||
3356 		    (dbp->db_struioflag & STRUIO_DONE) || cnt == 0) {
3357 			/*
3358 			 * Either this mblk has already been processed
3359 			 * or there is no more room in this mblk (?).
3360 			 */
3361 			continue;
3362 		}
3363 		switch (typ) {
3364 		case STRUIOT_STANDARD:
3365 			if (noblock) {
3366 				if (on_trap(&otd, OT_DATA_ACCESS)) {
3367 					no_trap();
3368 					error = EWOULDBLOCK;
3369 					goto out;
3370 				}
3371 			}
3372 			if (error = uiomove(ptr, cnt, UIO_WRITE, uiop)) {
3373 				if (noblock)
3374 					no_trap();
3375 				goto out;
3376 			}
3377 			if (noblock)
3378 				no_trap();
3379 			break;
3380 
3381 		default:
3382 			error = EIO;
3383 			goto out;
3384 		}
3385 		dbp->db_struioflag |= STRUIO_DONE;
3386 		dbp->db_cksumstuff += cnt;
3387 	}
3388 out:
3389 	if (error == EWOULDBLOCK && (resid -= uiop->uio_resid) > 0) {
3390 		/*
3391 		 * A fault has occured and some bytes were moved to the
3392 		 * current mblk, the uio_t has already been updated by
3393 		 * the appropriate uio routine, so also update the mblk
3394 		 * to reflect this in case this same mblk chain is used
3395 		 * again (after the fault has been handled).
3396 		 */
3397 		uiocnt = dbp->db_cksumend - dbp->db_cksumstuff;
3398 		if (uiocnt >= resid)
3399 			dbp->db_cksumstuff += resid;
3400 	}
3401 	return (error);
3402 }
3403 
3404 /*
3405  * Try to enter queue synchronously. Any attempt to enter a closing queue will
3406  * fails. The qp->q_rwcnt keeps track of the number of successful entries so
3407  * that removeq() will not try to close the queue while a thread is inside the
3408  * queue.
3409  */
3410 static boolean_t
3411 rwnext_enter(queue_t *qp)
3412 {
3413 	mutex_enter(QLOCK(qp));
3414 	if (qp->q_flag & QWCLOSE) {
3415 		mutex_exit(QLOCK(qp));
3416 		return (B_FALSE);
3417 	}
3418 	qp->q_rwcnt++;
3419 	ASSERT(qp->q_rwcnt != 0);
3420 	mutex_exit(QLOCK(qp));
3421 	return (B_TRUE);
3422 }
3423 
3424 /*
3425  * Decrease the count of threads running in sync stream queue and wake up any
3426  * threads blocked in removeq().
3427  */
3428 static void
3429 rwnext_exit(queue_t *qp)
3430 {
3431 	mutex_enter(QLOCK(qp));
3432 	qp->q_rwcnt--;
3433 	if (qp->q_flag & QWANTRMQSYNC) {
3434 		qp->q_flag &= ~QWANTRMQSYNC;
3435 		cv_broadcast(&qp->q_wait);
3436 	}
3437 	mutex_exit(QLOCK(qp));
3438 }
3439 
3440 /*
3441  * The purpose of rwnext() is to call the rw procedure of the next
3442  * (downstream) modules queue.
3443  *
3444  * treated as put entrypoint for perimeter syncronization.
3445  *
3446  * There's no need to grab sq_putlocks here (which only exist for CIPUT
3447  * sync queues). If it is CIPUT sync queue sq_count is incremented and it does
3448  * not matter if any regular put entrypoints have been already entered. We
3449  * can't increment one of the sq_putcounts (instead of sq_count) because
3450  * qwait_rw won't know which counter to decrement.
3451  *
3452  * It would be reasonable to add the lockless FASTPUT logic.
3453  */
3454 int
3455 rwnext(queue_t *qp, struiod_t *dp)
3456 {
3457 	queue_t		*nqp;
3458 	syncq_t		*sq;
3459 	uint16_t	count;
3460 	uint16_t	flags;
3461 	struct qinit	*qi;
3462 	int		(*proc)();
3463 	struct stdata	*stp;
3464 	int		isread;
3465 	int		rval;
3466 
3467 	stp = STREAM(qp);
3468 	/*
3469 	 * Prevent q_next from changing by holding sd_lock until acquiring
3470 	 * SQLOCK. Note that a read-side rwnext from the streamhead will
3471 	 * already have sd_lock acquired. In either case sd_lock is always
3472 	 * released after acquiring SQLOCK.
3473 	 *
3474 	 * The streamhead read-side holding sd_lock when calling rwnext is
3475 	 * required to prevent a race condition were M_DATA mblks flowing
3476 	 * up the read-side of the stream could be bypassed by a rwnext()
3477 	 * down-call. In this case sd_lock acts as the streamhead perimeter.
3478 	 */
3479 	if ((nqp = _WR(qp)) == qp) {
3480 		isread = 0;
3481 		mutex_enter(&stp->sd_lock);
3482 		qp = nqp->q_next;
3483 	} else {
3484 		isread = 1;
3485 		if (nqp != stp->sd_wrq)
3486 			/* Not streamhead */
3487 			mutex_enter(&stp->sd_lock);
3488 		qp = _RD(nqp->q_next);
3489 	}
3490 	qi = qp->q_qinfo;
3491 	if (qp->q_struiot == STRUIOT_NONE || ! (proc = qi->qi_rwp)) {
3492 		/*
3493 		 * Not a synchronous module or no r/w procedure for this
3494 		 * queue, so just return EINVAL and let the caller handle it.
3495 		 */
3496 		mutex_exit(&stp->sd_lock);
3497 		return (EINVAL);
3498 	}
3499 
3500 	if (rwnext_enter(qp) == B_FALSE) {
3501 		mutex_exit(&stp->sd_lock);
3502 		return (EINVAL);
3503 	}
3504 
3505 	sq = qp->q_syncq;
3506 	mutex_enter(SQLOCK(sq));
3507 	mutex_exit(&stp->sd_lock);
3508 	count = sq->sq_count;
3509 	flags = sq->sq_flags;
3510 	ASSERT(sq->sq_ciputctrl == NULL || (flags & SQ_CIPUT));
3511 
3512 	while ((flags & SQ_GOAWAY) || (!(flags & SQ_CIPUT) && count != 0)) {
3513 		/*
3514 		 * if this queue is being closed, return.
3515 		 */
3516 		if (qp->q_flag & QWCLOSE) {
3517 			mutex_exit(SQLOCK(sq));
3518 			rwnext_exit(qp);
3519 			return (EINVAL);
3520 		}
3521 
3522 		/*
3523 		 * Wait until we can enter the inner perimeter.
3524 		 */
3525 		sq->sq_flags = flags | SQ_WANTWAKEUP;
3526 		cv_wait(&sq->sq_wait, SQLOCK(sq));
3527 		count = sq->sq_count;
3528 		flags = sq->sq_flags;
3529 	}
3530 
3531 	if (isread == 0 && stp->sd_struiowrq == NULL ||
3532 	    isread == 1 && stp->sd_struiordq == NULL) {
3533 		/*
3534 		 * Stream plumbing changed while waiting for inner perimeter
3535 		 * so just return EINVAL and let the caller handle it.
3536 		 */
3537 		mutex_exit(SQLOCK(sq));
3538 		rwnext_exit(qp);
3539 		return (EINVAL);
3540 	}
3541 	if (!(flags & SQ_CIPUT))
3542 		sq->sq_flags = flags | SQ_EXCL;
3543 	sq->sq_count = count + 1;
3544 	ASSERT(sq->sq_count != 0);		/* Wraparound */
3545 	/*
3546 	 * Note: The only message ordering guarantee that rwnext() makes is
3547 	 *	 for the write queue flow-control case. All others (r/w queue
3548 	 *	 with q_count > 0 (or q_first != 0)) are the resposibilty of
3549 	 *	 the queue's rw procedure. This could be genralized here buy
3550 	 *	 running the queue's service procedure, but that wouldn't be
3551 	 *	 the most efficent for all cases.
3552 	 */
3553 	mutex_exit(SQLOCK(sq));
3554 	if (! isread && (qp->q_flag & QFULL)) {
3555 		/*
3556 		 * Write queue may be flow controlled. If so,
3557 		 * mark the queue for wakeup when it's not.
3558 		 */
3559 		mutex_enter(QLOCK(qp));
3560 		if (qp->q_flag & QFULL) {
3561 			qp->q_flag |= QWANTWSYNC;
3562 			mutex_exit(QLOCK(qp));
3563 			rval = EWOULDBLOCK;
3564 			goto out;
3565 		}
3566 		mutex_exit(QLOCK(qp));
3567 	}
3568 
3569 	if (! isread && dp->d_mp)
3570 		STR_FTEVENT_MSG(dp->d_mp, nqp, FTEV_RWNEXT, dp->d_mp->b_rptr -
3571 		    dp->d_mp->b_datap->db_base);
3572 
3573 	rval = (*proc)(qp, dp);
3574 
3575 	if (isread && dp->d_mp)
3576 		STR_FTEVENT_MSG(dp->d_mp, _RD(nqp), FTEV_RWNEXT,
3577 		    dp->d_mp->b_rptr - dp->d_mp->b_datap->db_base);
3578 out:
3579 	/*
3580 	 * The queue is protected from being freed by sq_count, so it is
3581 	 * safe to call rwnext_exit and reacquire SQLOCK(sq).
3582 	 */
3583 	rwnext_exit(qp);
3584 
3585 	mutex_enter(SQLOCK(sq));
3586 	flags = sq->sq_flags;
3587 	ASSERT(sq->sq_count != 0);
3588 	sq->sq_count--;
3589 	if (flags & SQ_TAIL) {
3590 		putnext_tail(sq, qp, flags);
3591 		/*
3592 		 * The only purpose of this ASSERT is to preserve calling stack
3593 		 * in DEBUG kernel.
3594 		 */
3595 		ASSERT(flags & SQ_TAIL);
3596 		return (rval);
3597 	}
3598 	ASSERT(flags & (SQ_EXCL|SQ_CIPUT));
3599 	/*
3600 	 * Safe to always drop SQ_EXCL:
3601 	 *	Not SQ_CIPUT means we set SQ_EXCL above
3602 	 *	For SQ_CIPUT SQ_EXCL will only be set if the put procedure
3603 	 *	did a qwriter(INNER) in which case nobody else
3604 	 *	is in the inner perimeter and we are exiting.
3605 	 *
3606 	 * I would like to make the following assertion:
3607 	 *
3608 	 * ASSERT((flags & (SQ_EXCL|SQ_CIPUT)) != (SQ_EXCL|SQ_CIPUT) ||
3609 	 * 	sq->sq_count == 0);
3610 	 *
3611 	 * which indicates that if we are both putshared and exclusive,
3612 	 * we became exclusive while executing the putproc, and the only
3613 	 * claim on the syncq was the one we dropped a few lines above.
3614 	 * But other threads that enter putnext while the syncq is exclusive
3615 	 * need to make a claim as they may need to drop SQLOCK in the
3616 	 * has_writers case to avoid deadlocks.  If these threads are
3617 	 * delayed or preempted, it is possible that the writer thread can
3618 	 * find out that there are other claims making the (sq_count == 0)
3619 	 * test invalid.
3620 	 */
3621 
3622 	sq->sq_flags = flags & ~SQ_EXCL;
3623 	if (sq->sq_flags & SQ_WANTWAKEUP) {
3624 		sq->sq_flags &= ~SQ_WANTWAKEUP;
3625 		cv_broadcast(&sq->sq_wait);
3626 	}
3627 	mutex_exit(SQLOCK(sq));
3628 	return (rval);
3629 }
3630 
3631 /*
3632  * The purpose of infonext() is to call the info procedure of the next
3633  * (downstream) modules queue.
3634  *
3635  * treated as put entrypoint for perimeter syncronization.
3636  *
3637  * There's no need to grab sq_putlocks here (which only exist for CIPUT
3638  * sync queues). If it is CIPUT sync queue regular sq_count is incremented and
3639  * it does not matter if any regular put entrypoints have been already
3640  * entered.
3641  */
3642 int
3643 infonext(queue_t *qp, infod_t *idp)
3644 {
3645 	queue_t		*nqp;
3646 	syncq_t		*sq;
3647 	uint16_t	count;
3648 	uint16_t 	flags;
3649 	struct qinit	*qi;
3650 	int		(*proc)();
3651 	struct stdata	*stp;
3652 	int		rval;
3653 
3654 	stp = STREAM(qp);
3655 	/*
3656 	 * Prevent q_next from changing by holding sd_lock until
3657 	 * acquiring SQLOCK.
3658 	 */
3659 	mutex_enter(&stp->sd_lock);
3660 	if ((nqp = _WR(qp)) == qp) {
3661 		qp = nqp->q_next;
3662 	} else {
3663 		qp = _RD(nqp->q_next);
3664 	}
3665 	qi = qp->q_qinfo;
3666 	if (qp->q_struiot == STRUIOT_NONE || ! (proc = qi->qi_infop)) {
3667 		mutex_exit(&stp->sd_lock);
3668 		return (EINVAL);
3669 	}
3670 	sq = qp->q_syncq;
3671 	mutex_enter(SQLOCK(sq));
3672 	mutex_exit(&stp->sd_lock);
3673 	count = sq->sq_count;
3674 	flags = sq->sq_flags;
3675 	ASSERT(sq->sq_ciputctrl == NULL || (flags & SQ_CIPUT));
3676 
3677 	while ((flags & SQ_GOAWAY) || (!(flags & SQ_CIPUT) && count != 0)) {
3678 		/*
3679 		 * Wait until we can enter the inner perimeter.
3680 		 */
3681 		sq->sq_flags = flags | SQ_WANTWAKEUP;
3682 		cv_wait(&sq->sq_wait, SQLOCK(sq));
3683 		count = sq->sq_count;
3684 		flags = sq->sq_flags;
3685 	}
3686 
3687 	if (! (flags & SQ_CIPUT))
3688 		sq->sq_flags = flags | SQ_EXCL;
3689 	sq->sq_count = count + 1;
3690 	ASSERT(sq->sq_count != 0);		/* Wraparound */
3691 	mutex_exit(SQLOCK(sq));
3692 
3693 	rval = (*proc)(qp, idp);
3694 
3695 	mutex_enter(SQLOCK(sq));
3696 	flags = sq->sq_flags;
3697 	ASSERT(sq->sq_count != 0);
3698 	sq->sq_count--;
3699 	if (flags & SQ_TAIL) {
3700 		putnext_tail(sq, qp, flags);
3701 		/*
3702 		 * The only purpose of this ASSERT is to preserve calling stack
3703 		 * in DEBUG kernel.
3704 		 */
3705 		ASSERT(flags & SQ_TAIL);
3706 		return (rval);
3707 	}
3708 	ASSERT(flags & (SQ_EXCL|SQ_CIPUT));
3709 /*
3710  * XXXX
3711  * I am not certain the next comment is correct here.  I need to consider
3712  * why the infonext is called, and if dropping SQ_EXCL unless non-CIPUT
3713  * might cause other problems.  It just might be safer to drop it if
3714  * !SQ_CIPUT because that is when we set it.
3715  */
3716 	/*
3717 	 * Safe to always drop SQ_EXCL:
3718 	 *	Not SQ_CIPUT means we set SQ_EXCL above
3719 	 *	For SQ_CIPUT SQ_EXCL will only be set if the put procedure
3720 	 *	did a qwriter(INNER) in which case nobody else
3721 	 *	is in the inner perimeter and we are exiting.
3722 	 *
3723 	 * I would like to make the following assertion:
3724 	 *
3725 	 * ASSERT((flags & (SQ_EXCL|SQ_CIPUT)) != (SQ_EXCL|SQ_CIPUT) ||
3726 	 *	sq->sq_count == 0);
3727 	 *
3728 	 * which indicates that if we are both putshared and exclusive,
3729 	 * we became exclusive while executing the putproc, and the only
3730 	 * claim on the syncq was the one we dropped a few lines above.
3731 	 * But other threads that enter putnext while the syncq is exclusive
3732 	 * need to make a claim as they may need to drop SQLOCK in the
3733 	 * has_writers case to avoid deadlocks.  If these threads are
3734 	 * delayed or preempted, it is possible that the writer thread can
3735 	 * find out that there are other claims making the (sq_count == 0)
3736 	 * test invalid.
3737 	 */
3738 
3739 	sq->sq_flags = flags & ~SQ_EXCL;
3740 	mutex_exit(SQLOCK(sq));
3741 	return (rval);
3742 }
3743 
3744 /*
3745  * Return nonzero if the queue is responsible for struio(), else return 0.
3746  */
3747 int
3748 isuioq(queue_t *q)
3749 {
3750 	if (q->q_flag & QREADR)
3751 		return (STREAM(q)->sd_struiordq == q);
3752 	else
3753 		return (STREAM(q)->sd_struiowrq == q);
3754 }
3755 
/*
 * When nonzero, create_putlocks() returns without creating any per cpu
 * putlocks (create_syncq_putlocks() asserts that it is zero).  The default
 * is 0 when __sparc is defined and 1 otherwise.
 */
#if defined(__sparc)
int disable_putlocks = 0;
#else
int disable_putlocks = 1;
#endif
3761 
3762 /*
3763  * called by create_putlock.
3764  */
3765 static void
3766 create_syncq_putlocks(queue_t *q)
3767 {
3768 	syncq_t	*sq = q->q_syncq;
3769 	ciputctrl_t *cip;
3770 	int i;
3771 
3772 	ASSERT(sq != NULL);
3773 
3774 	ASSERT(disable_putlocks == 0);
3775 	ASSERT(n_ciputctrl >= min_n_ciputctrl);
3776 	ASSERT(ciputctrl_cache != NULL);
3777 
3778 	if (!(sq->sq_type & SQ_CIPUT))
3779 		return;
3780 
3781 	for (i = 0; i <= 1; i++) {
3782 		if (sq->sq_ciputctrl == NULL) {
3783 			cip = kmem_cache_alloc(ciputctrl_cache, KM_SLEEP);
3784 			SUMCHECK_CIPUTCTRL_COUNTS(cip, n_ciputctrl - 1, 0);
3785 			mutex_enter(SQLOCK(sq));
3786 			if (sq->sq_ciputctrl != NULL) {
3787 				mutex_exit(SQLOCK(sq));
3788 				kmem_cache_free(ciputctrl_cache, cip);
3789 			} else {
3790 				ASSERT(sq->sq_nciputctrl == 0);
3791 				sq->sq_nciputctrl = n_ciputctrl - 1;
3792 				/*
3793 				 * putnext checks sq_ciputctrl without holding
3794 				 * SQLOCK. if it is not NULL putnext assumes
3795 				 * sq_nciputctrl is initialized. membar below
3796 				 * insures that.
3797 				 */
3798 				membar_producer();
3799 				sq->sq_ciputctrl = cip;
3800 				mutex_exit(SQLOCK(sq));
3801 			}
3802 		}
3803 		ASSERT(sq->sq_nciputctrl == n_ciputctrl - 1);
3804 		if (i == 1)
3805 			break;
3806 		q = _OTHERQ(q);
3807 		if (!(q->q_flag & QPERQ)) {
3808 			ASSERT(sq == q->q_syncq);
3809 			break;
3810 		}
3811 		ASSERT(q->q_syncq != NULL);
3812 		ASSERT(sq != q->q_syncq);
3813 		sq = q->q_syncq;
3814 		ASSERT(sq->sq_type & SQ_CIPUT);
3815 	}
3816 }
3817 
3818 /*
3819  * If stream argument is 0 only create per cpu sq_putlocks/sq_putcounts for
3820  * syncq of q. If stream argument is not 0 create per cpu stream_putlocks for
3821  * the stream of q and per cpu sq_putlocks/sq_putcounts for all syncq's
3822  * starting from q and down to the driver.
3823  *
3824  * This should be called after the affected queues are part of stream
3825  * geometry. It should be called from driver/module open routine after
3826  * qprocson() call. It is also called from nfs syscall where it is known that
3827  * stream is configured and won't change its geometry during create_putlock
3828  * call.
3829  *
3830  * caller normally uses 0 value for the stream argument to speed up MT putnext
3831  * into the perimeter of q for example because its perimeter is per module
3832  * (e.g. IP).
3833  *
3834  * caller normally uses non 0 value for the stream argument to hint the system
3835  * that the stream of q is a very contended global system stream
3836  * (e.g. NFS/UDP) and the part of the stream from q to the driver is
3837  * particularly MT hot.
3838  *
3839  * Caller insures stream plumbing won't happen while we are here and therefore
3840  * q_next can be safely used.
3841  */
3842 
3843 void
3844 create_putlocks(queue_t *q, int stream)
3845 {
3846 	ciputctrl_t	*cip;
3847 	struct stdata	*stp = STREAM(q);
3848 
3849 	q = _WR(q);
3850 	ASSERT(stp != NULL);
3851 
3852 	if (disable_putlocks != 0)
3853 		return;
3854 
3855 	if (n_ciputctrl < min_n_ciputctrl)
3856 		return;
3857 
3858 	ASSERT(ciputctrl_cache != NULL);
3859 
3860 	if (stream != 0 && stp->sd_ciputctrl == NULL) {
3861 		cip = kmem_cache_alloc(ciputctrl_cache, KM_SLEEP);
3862 		SUMCHECK_CIPUTCTRL_COUNTS(cip, n_ciputctrl - 1, 0);
3863 		mutex_enter(&stp->sd_lock);
3864 		if (stp->sd_ciputctrl != NULL) {
3865 			mutex_exit(&stp->sd_lock);
3866 			kmem_cache_free(ciputctrl_cache, cip);
3867 		} else {
3868 			ASSERT(stp->sd_nciputctrl == 0);
3869 			stp->sd_nciputctrl = n_ciputctrl - 1;
3870 			/*
3871 			 * putnext checks sd_ciputctrl without holding
3872 			 * sd_lock. if it is not NULL putnext assumes
3873 			 * sd_nciputctrl is initialized. membar below
3874 			 * insures that.
3875 			 */
3876 			membar_producer();
3877 			stp->sd_ciputctrl = cip;
3878 			mutex_exit(&stp->sd_lock);
3879 		}
3880 	}
3881 
3882 	ASSERT(stream == 0 || stp->sd_nciputctrl == n_ciputctrl - 1);
3883 
3884 	while (_SAMESTR(q)) {
3885 		create_syncq_putlocks(q);
3886 		if (stream == 0)
3887 			return;
3888 		q = q->q_next;
3889 	}
3890 	ASSERT(q != NULL);
3891 	create_syncq_putlocks(q);
3892 }
3893 
/*
 * STREAMS Flow Trace - record STREAMS Flow Trace events as an mblk flows
 * through a stream.
 *
 * Data currently recorded per event is a hrtime stamp, queue address, event
 * type, and a per type datum.  Much of the STREAMS framework is instrumented
 * for automatic flow tracing (when enabled).  Events can be defined and used
 * by STREAMS modules and drivers.
 *
 * Global objects:
 *
 *	str_ftevent() - Add a flow-trace event to a dblk.
 *	str_ftfree() - Free flow-trace data
 *
 * Local objects:
 *
 *	fthdr_cache - pointer to the kmem cache for trace header.
 *	ftblk_cache - pointer to the kmem cache for trace data blocks.
 */

/*
 * Starts out as 1; str_ftevent() increments it further when it cannot
 * allocate a new trace block.
 */
int str_ftnever = 1;	/* Don't do STREAMS flow tracing */
3915 
3916 void
3917 str_ftevent(fthdr_t *hp, void *p, ushort_t evnt, ushort_t data)
3918 {
3919 	ftblk_t *bp = hp->tail;
3920 	ftblk_t *nbp;
3921 	ftevnt_t *ep;
3922 	int ix, nix;
3923 
3924 	ASSERT(hp != NULL);
3925 
3926 	for (;;) {
3927 		if ((ix = bp->ix) == FTBLK_EVNTS) {
3928 			/*
3929 			 * Tail doesn't have room, so need a new tail.
3930 			 *
3931 			 * To make this MT safe, first, allocate a new
3932 			 * ftblk, and initialize it.  To make life a
3933 			 * little easier, reserve the first slot (mostly
3934 			 * by making ix = 1).  When we are finished with
3935 			 * the initialization, CAS this pointer to the
3936 			 * tail.  If this succeeds, this is the new
3937 			 * "next" block.  Otherwise, another thread
3938 			 * got here first, so free the block and start
3939 			 * again.
3940 			 */
3941 			if (!(nbp = kmem_cache_alloc(ftblk_cache,
3942 			    KM_NOSLEEP))) {
3943 				/* no mem, so punt */
3944 				str_ftnever++;
3945 				/* free up all flow data? */
3946 				return;
3947 			}
3948 			nbp->nxt = NULL;
3949 			nbp->ix = 1;
3950 			/*
3951 			 * Just in case there is another thread about
3952 			 * to get the next index, we need to make sure
3953 			 * the value is there for it.
3954 			 */
3955 			membar_producer();
3956 			if (casptr(&hp->tail, bp, nbp) == bp) {
3957 				/* CAS was successful */
3958 				bp->nxt = nbp;
3959 				membar_producer();
3960 				bp = nbp;
3961 				ix = 0;
3962 				goto cas_good;
3963 			} else {
3964 				kmem_cache_free(ftblk_cache, nbp);
3965 				bp = hp->tail;
3966 				continue;
3967 			}
3968 		}
3969 		nix = ix + 1;
3970 		if (cas32((uint32_t *)&bp->ix, ix, nix) == ix) {
3971 		cas_good:
3972 			if (curthread != hp->thread) {
3973 				hp->thread = curthread;
3974 				evnt |= FTEV_CS;
3975 			}
3976 			if (CPU->cpu_seqid != hp->cpu_seqid) {
3977 				hp->cpu_seqid = CPU->cpu_seqid;
3978 				evnt |= FTEV_PS;
3979 			}
3980 			ep = &bp->ev[ix];
3981 			break;
3982 		}
3983 	}
3984 
3985 	if (evnt & FTEV_QMASK) {
3986 		queue_t *qp = p;
3987 
3988 		/*
3989 		 * It is possible that the module info is broke
3990 		 * (as is logsubr.c at this comment writing).
3991 		 * Instead of panicing or doing other unmentionables,
3992 		 * we shall put a dummy name as the mid, and continue.
3993 		 */
3994 		if (qp->q_qinfo == NULL)
3995 			ep->mid = "NONAME";
3996 		else
3997 			ep->mid = qp->q_qinfo->qi_minfo->mi_idname;
3998 
3999 		if (!(qp->q_flag & QREADR))
4000 			evnt |= FTEV_ISWR;
4001 	} else {
4002 		ep->mid = (char *)p;
4003 	}
4004 
4005 	ep->ts = gethrtime();
4006 	ep->evnt = evnt;
4007 	ep->data = data;
4008 	hp->hash = (hp->hash << 9) + hp->hash;
4009 	hp->hash += (evnt << 16) | data;
4010 	hp->hash += (uintptr_t)ep->mid;
4011 }
4012 
4013 /*
4014  * Free flow-trace data.
4015  */
4016 void
4017 str_ftfree(dblk_t *dbp)
4018 {
4019 	fthdr_t *hp = dbp->db_fthdr;
4020 	ftblk_t *bp = &hp->first;
4021 	ftblk_t *nbp;
4022 
4023 	if (bp != hp->tail || bp->ix != 0) {
4024 		/*
4025 		 * Clear out the hash, have the tail point to itself, and free
4026 		 * any continuation blocks.
4027 		 */
4028 		bp = hp->first.nxt;
4029 		hp->tail = &hp->first;
4030 		hp->hash = 0;
4031 		hp->first.nxt = NULL;
4032 		hp->first.ix = 0;
4033 		while (bp != NULL) {
4034 			nbp = bp->nxt;
4035 			kmem_cache_free(ftblk_cache, bp);
4036 			bp = nbp;
4037 		}
4038 	}
4039 	kmem_cache_free(fthdr_cache, hp);
4040 	dbp->db_fthdr = NULL;
4041 }
4042