xref: /illumos-gate/usr/src/uts/common/io/stream.c (revision eda50310abb3984bab11856a2aca8936d26881cb)
1 /*
2  * CDDL HEADER START
3  *
4  * The contents of this file are subject to the terms of the
5  * Common Development and Distribution License (the "License").
6  * You may not use this file except in compliance with the License.
7  *
8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9  * or http://www.opensolaris.org/os/licensing.
10  * See the License for the specific language governing permissions
11  * and limitations under the License.
12  *
13  * When distributing Covered Code, include this CDDL HEADER in each
14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15  * If applicable, add the following below this CDDL HEADER, with the
16  * fields enclosed by brackets "[]" replaced with your own identifying
17  * information: Portions Copyright [yyyy] [name of copyright owner]
18  *
19  * CDDL HEADER END
20  */
21 /*	Copyright (c) 1984, 1986, 1987, 1988, 1989 AT&T	*/
22 /*	  All Rights Reserved  	*/
23 
24 
25 /*
26  * Copyright 2006 Sun Microsystems, Inc.  All rights reserved.
27  * Use is subject to license terms.
28  */
29 
30 #pragma ident	"%Z%%M%	%I%	%E% SMI"
31 
32 #include <sys/types.h>
33 #include <sys/param.h>
34 #include <sys/thread.h>
35 #include <sys/sysmacros.h>
36 #include <sys/stropts.h>
37 #include <sys/stream.h>
38 #include <sys/strsubr.h>
39 #include <sys/strsun.h>
40 #include <sys/conf.h>
41 #include <sys/debug.h>
42 #include <sys/cmn_err.h>
43 #include <sys/kmem.h>
44 #include <sys/atomic.h>
45 #include <sys/errno.h>
46 #include <sys/vtrace.h>
47 #include <sys/ftrace.h>
48 #include <sys/ontrap.h>
49 #include <sys/multidata.h>
50 #include <sys/multidata_impl.h>
51 #include <sys/sdt.h>
52 #include <sys/strft.h>
53 
54 #ifdef DEBUG
55 #include <sys/kmem_impl.h>
56 #endif
57 
58 /*
59  * This file contains all the STREAMS utility routines that may
60  * be used by modules and drivers.
61  */
62 
63 /*
64  * STREAMS message allocator: principles of operation
65  *
66  * The streams message allocator consists of all the routines that
67  * allocate, dup and free streams messages: allocb(), [d]esballoc[a],
68  * dupb(), freeb() and freemsg().  What follows is a high-level view
69  * of how the allocator works.
70  *
71  * Every streams message consists of one or more mblks, a dblk, and data.
72  * All mblks for all types of messages come from a common mblk_cache.
73  * The dblk and data come in several flavors, depending on how the
74  * message is allocated:
75  *
76  * (1) mblks up to DBLK_MAX_CACHE size are allocated from a collection of
77  *     fixed-size dblk/data caches. For message sizes that are multiples of
78  *     PAGESIZE, dblks are allocated separately from the buffer.
79  *     The associated buffer is allocated by the constructor using kmem_alloc().
80  *     For all other message sizes, dblk and its associated data is allocated
81  *     as a single contiguous chunk of memory.
82  *     Objects in these caches consist of a dblk plus its associated data.
83  *     allocb() determines the nearest-size cache by table lookup:
84  *     the dblk_cache[] array provides the mapping from size to dblk cache.
85  *
86  * (2) Large messages (size > DBLK_MAX_CACHE) are constructed by
87  *     kmem_alloc()'ing a buffer for the data and supplying that
88  *     buffer to gesballoc(), described below.
89  *
90  * (3) The four flavors of [d]esballoc[a] are all implemented by a
91  *     common routine, gesballoc() ("generic esballoc").  gesballoc()
92  *     allocates a dblk from the global dblk_esb_cache and sets db_base,
93  *     db_lim and db_frtnp to describe the caller-supplied buffer.
94  *
95  * While there are several routines to allocate messages, there is only
96  * one routine to free messages: freeb().  freeb() simply invokes the
97  * dblk's free method, dbp->db_free(), which is set at allocation time.
98  *
99  * dupb() creates a new reference to a message by allocating a new mblk,
100  * incrementing the dblk reference count and setting the dblk's free
101  * method to dblk_decref().  The dblk's original free method is retained
102  * in db_lastfree.  dblk_decref() decrements the reference count on each
103  * freeb().  If this is not the last reference it just frees the mblk;
104  * if this *is* the last reference, it restores db_free to db_lastfree,
105  * sets db_mblk to the current mblk (see below), and invokes db_lastfree.
106  *
107  * The implementation makes aggressive use of kmem object caching for
108  * maximum performance.  This makes the code simple and compact, but
109  * also a bit abstruse in some places.  The invariants that constitute a
110  * message's constructed state, described below, are more subtle than usual.
111  *
112  * Every dblk has an "attached mblk" as part of its constructed state.
113  * The mblk is allocated by the dblk's constructor and remains attached
114  * until the message is either dup'ed or pulled up.  In the dupb() case
115  * the mblk association doesn't matter until the last free, at which time
116  * dblk_decref() attaches the last mblk to the dblk.  pullupmsg() affects
117  * the mblk association because it swaps the leading mblks of two messages,
118  * so it is responsible for swapping their db_mblk pointers accordingly.
119  * From a constructed-state viewpoint it doesn't matter that a dblk's
120  * attached mblk can change while the message is allocated; all that
121  * matters is that the dblk has *some* attached mblk when it's freed.
122  *
123  * The sizes of the allocb() small-message caches are not magical.
124  * They represent a good trade-off between internal and external
125  * fragmentation for current workloads.  They should be reevaluated
126  * periodically, especially if allocations larger than DBLK_MAX_CACHE
127  * become common.  We use 64-byte alignment so that dblks don't
128  * straddle cache lines unnecessarily.
129  */
130 #define	DBLK_MAX_CACHE		73728
131 #define	DBLK_CACHE_ALIGN	64
132 #define	DBLK_MIN_SIZE		8
133 #define	DBLK_SIZE_SHIFT		3
134 
135 #ifdef _BIG_ENDIAN
136 #define	DBLK_RTFU_SHIFT(field)	\
137 	(8 * (&((dblk_t *)0)->db_struioflag - &((dblk_t *)0)->field))
138 #else
139 #define	DBLK_RTFU_SHIFT(field)	\
140 	(8 * (&((dblk_t *)0)->field - &((dblk_t *)0)->db_ref))
141 #endif
142 
143 #define	DBLK_RTFU(ref, type, flags, uioflag)	\
144 	(((ref) << DBLK_RTFU_SHIFT(db_ref)) | \
145 	((type) << DBLK_RTFU_SHIFT(db_type)) | \
146 	(((flags) | (ref - 1)) << DBLK_RTFU_SHIFT(db_flags)) | \
147 	((uioflag) << DBLK_RTFU_SHIFT(db_struioflag)))
148 #define	DBLK_RTFU_REF_MASK	(DBLK_REFMAX << DBLK_RTFU_SHIFT(db_ref))
149 #define	DBLK_RTFU_WORD(dbp)	(*((uint32_t *)&(dbp)->db_ref))
150 #define	MBLK_BAND_FLAG_WORD(mp)	(*((uint32_t *)&(mp)->b_band))
151 
152 static size_t dblk_sizes[] = {
153 #ifdef _LP64
154 	16, 80, 144, 208, 272, 336, 528, 1040, 1488, 1936, 2576, 3920,
155 	8192, 12112, 16384, 20304, 24576, 28496, 32768, 36688,
156 	40960, 44880, 49152, 53072, 57344, 61264, 65536, 69456,
157 #else
158 	64, 128, 320, 576, 1088, 1536, 1984, 2624, 3968,
159 	8192, 12160, 16384, 20352, 24576, 28544, 32768, 36736,
160 	40960, 44928, 49152, 53120, 57344, 61312, 65536, 69504,
161 #endif
162 	DBLK_MAX_CACHE, 0
163 };
164 
165 static struct kmem_cache *dblk_cache[DBLK_MAX_CACHE / DBLK_MIN_SIZE];
166 static struct kmem_cache *mblk_cache;
167 static struct kmem_cache *dblk_esb_cache;
168 static struct kmem_cache *fthdr_cache;
169 static struct kmem_cache *ftblk_cache;
170 
171 static void dblk_lastfree(mblk_t *mp, dblk_t *dbp);
172 static mblk_t *allocb_oversize(size_t size, int flags);
173 static int allocb_tryhard_fails;
174 static void frnop_func(void *arg);
175 frtn_t frnop = { frnop_func };
176 static void bcache_dblk_lastfree(mblk_t *mp, dblk_t *dbp);
177 
178 static boolean_t rwnext_enter(queue_t *qp);
179 static void rwnext_exit(queue_t *qp);
180 
181 /*
182  * Patchable mblk/dblk kmem_cache flags.
183  */
184 int dblk_kmem_flags = 0;
185 int mblk_kmem_flags = 0;
186 
187 
188 static int
189 dblk_constructor(void *buf, void *cdrarg, int kmflags)
190 {
191 	dblk_t *dbp = buf;
192 	ssize_t msg_size = (ssize_t)cdrarg;
193 	size_t index;
194 
195 	ASSERT(msg_size != 0);
196 
197 	index = (msg_size - 1) >> DBLK_SIZE_SHIFT;
198 
199 	ASSERT(index < (DBLK_MAX_CACHE >> DBLK_SIZE_SHIFT));
200 
201 	if ((dbp->db_mblk = kmem_cache_alloc(mblk_cache, kmflags)) == NULL)
202 		return (-1);
203 	if ((msg_size & PAGEOFFSET) == 0) {
204 		dbp->db_base = kmem_alloc(msg_size, kmflags);
205 		if (dbp->db_base == NULL) {
206 			kmem_cache_free(mblk_cache, dbp->db_mblk);
207 			return (-1);
208 		}
209 	} else {
210 		dbp->db_base = (unsigned char *)&dbp[1];
211 	}
212 
213 	dbp->db_mblk->b_datap = dbp;
214 	dbp->db_cache = dblk_cache[index];
215 	dbp->db_lim = dbp->db_base + msg_size;
216 	dbp->db_free = dbp->db_lastfree = dblk_lastfree;
217 	dbp->db_frtnp = NULL;
218 	dbp->db_fthdr = NULL;
219 	dbp->db_credp = NULL;
220 	dbp->db_cpid = -1;
221 	dbp->db_struioflag = 0;
222 	dbp->db_struioun.cksum.flags = 0;
223 	return (0);
224 }
225 
226 /*ARGSUSED*/
227 static int
228 dblk_esb_constructor(void *buf, void *cdrarg, int kmflags)
229 {
230 	dblk_t *dbp = buf;
231 
232 	if ((dbp->db_mblk = kmem_cache_alloc(mblk_cache, kmflags)) == NULL)
233 		return (-1);
234 	dbp->db_mblk->b_datap = dbp;
235 	dbp->db_cache = dblk_esb_cache;
236 	dbp->db_fthdr = NULL;
237 	dbp->db_credp = NULL;
238 	dbp->db_cpid = -1;
239 	dbp->db_struioflag = 0;
240 	dbp->db_struioun.cksum.flags = 0;
241 	return (0);
242 }
243 
244 static int
245 bcache_dblk_constructor(void *buf, void *cdrarg, int kmflags)
246 {
247 	dblk_t *dbp = buf;
248 	bcache_t *bcp = (bcache_t *)cdrarg;
249 
250 	if ((dbp->db_mblk = kmem_cache_alloc(mblk_cache, kmflags)) == NULL)
251 		return (-1);
252 
253 	if ((dbp->db_base = (unsigned char *)kmem_cache_alloc(bcp->buffer_cache,
254 	    kmflags)) == NULL) {
255 		kmem_cache_free(mblk_cache, dbp->db_mblk);
256 		return (-1);
257 	}
258 
259 	dbp->db_mblk->b_datap = dbp;
260 	dbp->db_cache = (void *)bcp;
261 	dbp->db_lim = dbp->db_base + bcp->size;
262 	dbp->db_free = dbp->db_lastfree = bcache_dblk_lastfree;
263 	dbp->db_frtnp = NULL;
264 	dbp->db_fthdr = NULL;
265 	dbp->db_credp = NULL;
266 	dbp->db_cpid = -1;
267 	dbp->db_struioflag = 0;
268 	dbp->db_struioun.cksum.flags = 0;
269 	return (0);
270 }
271 
272 /*ARGSUSED*/
273 static void
274 dblk_destructor(void *buf, void *cdrarg)
275 {
276 	dblk_t *dbp = buf;
277 	ssize_t msg_size = (ssize_t)cdrarg;
278 
279 	ASSERT(dbp->db_mblk->b_datap == dbp);
280 
281 	ASSERT(msg_size != 0);
282 
283 	ASSERT(dbp->db_struioflag == 0);
284 	ASSERT(dbp->db_struioun.cksum.flags == 0);
285 
286 	if ((msg_size & PAGEOFFSET) == 0) {
287 		kmem_free(dbp->db_base, msg_size);
288 	}
289 
290 	kmem_cache_free(mblk_cache, dbp->db_mblk);
291 }
292 
293 static void
294 bcache_dblk_destructor(void *buf, void *cdrarg)
295 {
296 	dblk_t *dbp = buf;
297 	bcache_t *bcp = (bcache_t *)cdrarg;
298 
299 	kmem_cache_free(bcp->buffer_cache, dbp->db_base);
300 
301 	ASSERT(dbp->db_mblk->b_datap == dbp);
302 
303 	ASSERT(dbp->db_struioflag == 0);
304 	ASSERT(dbp->db_struioun.cksum.flags == 0);
305 
306 	kmem_cache_free(mblk_cache, dbp->db_mblk);
307 }
308 
309 void
310 streams_msg_init(void)
311 {
312 	char name[40];
313 	size_t size;
314 	size_t lastsize = DBLK_MIN_SIZE;
315 	size_t *sizep;
316 	struct kmem_cache *cp;
317 	size_t tot_size;
318 	int offset;
319 
320 	mblk_cache = kmem_cache_create("streams_mblk",
321 		sizeof (mblk_t), 32, NULL, NULL, NULL, NULL, NULL,
322 		mblk_kmem_flags);
323 
324 	for (sizep = dblk_sizes; (size = *sizep) != 0; sizep++) {
325 
326 		if ((offset = (size & PAGEOFFSET)) != 0) {
327 			/*
328 			 * We are in the middle of a page, dblk should
329 			 * be allocated on the same page
330 			 */
331 			tot_size = size + sizeof (dblk_t);
332 			ASSERT((offset + sizeof (dblk_t) + sizeof (kmem_slab_t))
333 								< PAGESIZE);
334 			ASSERT((tot_size & (DBLK_CACHE_ALIGN - 1)) == 0);
335 
336 		} else {
337 
338 			/*
339 			 * buf size is multiple of page size, dblk and
340 			 * buffer are allocated separately.
341 			 */
342 
343 			ASSERT((size & (DBLK_CACHE_ALIGN - 1)) == 0);
344 			tot_size = sizeof (dblk_t);
345 		}
346 
347 		(void) sprintf(name, "streams_dblk_%ld", size);
348 		cp = kmem_cache_create(name, tot_size,
349 			DBLK_CACHE_ALIGN, dblk_constructor,
350 			dblk_destructor, NULL,
351 			(void *)(size), NULL, dblk_kmem_flags);
352 
353 		while (lastsize <= size) {
354 			dblk_cache[(lastsize - 1) >> DBLK_SIZE_SHIFT] = cp;
355 			lastsize += DBLK_MIN_SIZE;
356 		}
357 	}
358 
359 	dblk_esb_cache = kmem_cache_create("streams_dblk_esb",
360 			sizeof (dblk_t), DBLK_CACHE_ALIGN,
361 			dblk_esb_constructor, dblk_destructor, NULL,
362 			(void *) sizeof (dblk_t), NULL, dblk_kmem_flags);
363 	fthdr_cache = kmem_cache_create("streams_fthdr",
364 		sizeof (fthdr_t), 32, NULL, NULL, NULL, NULL, NULL, 0);
365 	ftblk_cache = kmem_cache_create("streams_ftblk",
366 		sizeof (ftblk_t), 32, NULL, NULL, NULL, NULL, NULL, 0);
367 
368 	/* Initialize Multidata caches */
369 	mmd_init();
370 }
371 
372 /*ARGSUSED*/
373 mblk_t *
374 allocb(size_t size, uint_t pri)
375 {
376 	dblk_t *dbp;
377 	mblk_t *mp;
378 	size_t index;
379 
380 	index =  (size - 1)  >> DBLK_SIZE_SHIFT;
381 
382 	if (index >= (DBLK_MAX_CACHE >> DBLK_SIZE_SHIFT)) {
383 		if (size != 0) {
384 			mp = allocb_oversize(size, KM_NOSLEEP);
385 			goto out;
386 		}
387 		index = 0;
388 	}
389 
390 	if ((dbp = kmem_cache_alloc(dblk_cache[index], KM_NOSLEEP)) == NULL) {
391 		mp = NULL;
392 		goto out;
393 	}
394 
395 	mp = dbp->db_mblk;
396 	DBLK_RTFU_WORD(dbp) = DBLK_RTFU(1, M_DATA, 0, 0);
397 	mp->b_next = mp->b_prev = mp->b_cont = NULL;
398 	mp->b_rptr = mp->b_wptr = dbp->db_base;
399 	mp->b_queue = NULL;
400 	MBLK_BAND_FLAG_WORD(mp) = 0;
401 	STR_FTALLOC(&dbp->db_fthdr, FTEV_ALLOCB, size);
402 out:
403 	FTRACE_1("allocb(): mp=0x%p", (uintptr_t)mp);
404 
405 	return (mp);
406 }
407 
408 mblk_t *
409 allocb_tmpl(size_t size, const mblk_t *tmpl)
410 {
411 	mblk_t *mp = allocb(size, 0);
412 
413 	if (mp != NULL) {
414 		cred_t *cr = DB_CRED(tmpl);
415 		if (cr != NULL)
416 			crhold(mp->b_datap->db_credp = cr);
417 		DB_CPID(mp) = DB_CPID(tmpl);
418 		DB_TYPE(mp) = DB_TYPE(tmpl);
419 	}
420 	return (mp);
421 }
422 
423 mblk_t *
424 allocb_cred(size_t size, cred_t *cr)
425 {
426 	mblk_t *mp = allocb(size, 0);
427 
428 	if (mp != NULL && cr != NULL)
429 		crhold(mp->b_datap->db_credp = cr);
430 
431 	return (mp);
432 }
433 
434 mblk_t *
435 allocb_cred_wait(size_t size, uint_t flags, int *error, cred_t *cr)
436 {
437 	mblk_t *mp = allocb_wait(size, 0, flags, error);
438 
439 	if (mp != NULL && cr != NULL)
440 		crhold(mp->b_datap->db_credp = cr);
441 
442 	return (mp);
443 }
444 
445 void
446 freeb(mblk_t *mp)
447 {
448 	dblk_t *dbp = mp->b_datap;
449 
450 	ASSERT(dbp->db_ref > 0);
451 	ASSERT(mp->b_next == NULL && mp->b_prev == NULL);
452 	FTRACE_1("freeb(): mp=0x%lx", (uintptr_t)mp);
453 
454 	STR_FTEVENT_MBLK(mp, caller(), FTEV_FREEB, dbp->db_ref);
455 
456 	dbp->db_free(mp, dbp);
457 }
458 
459 void
460 freemsg(mblk_t *mp)
461 {
462 	FTRACE_1("freemsg(): mp=0x%lx", (uintptr_t)mp);
463 	while (mp) {
464 		dblk_t *dbp = mp->b_datap;
465 		mblk_t *mp_cont = mp->b_cont;
466 
467 		ASSERT(dbp->db_ref > 0);
468 		ASSERT(mp->b_next == NULL && mp->b_prev == NULL);
469 
470 		STR_FTEVENT_MBLK(mp, caller(), FTEV_FREEB, dbp->db_ref);
471 
472 		dbp->db_free(mp, dbp);
473 		mp = mp_cont;
474 	}
475 }
476 
477 /*
478  * Reallocate a block for another use.  Try hard to use the old block.
479  * If the old data is wanted (copy), leave b_wptr at the end of the data,
480  * otherwise return b_wptr = b_rptr.
481  *
482  * This routine is private and unstable.
483  */
484 mblk_t	*
485 reallocb(mblk_t *mp, size_t size, uint_t copy)
486 {
487 	mblk_t		*mp1;
488 	unsigned char	*old_rptr;
489 	ptrdiff_t	cur_size;
490 
491 	if (mp == NULL)
492 		return (allocb(size, BPRI_HI));
493 
494 	cur_size = mp->b_wptr - mp->b_rptr;
495 	old_rptr = mp->b_rptr;
496 
497 	ASSERT(mp->b_datap->db_ref != 0);
498 
499 	if (mp->b_datap->db_ref == 1 && MBLKSIZE(mp) >= size) {
500 		/*
501 		 * If the data is wanted and it will fit where it is, no
502 		 * work is required.
503 		 */
504 		if (copy && mp->b_datap->db_lim - mp->b_rptr >= size)
505 			return (mp);
506 
507 		mp->b_wptr = mp->b_rptr = mp->b_datap->db_base;
508 		mp1 = mp;
509 	} else if ((mp1 = allocb_tmpl(size, mp)) != NULL) {
510 		/* XXX other mp state could be copied too, db_flags ... ? */
511 		mp1->b_cont = mp->b_cont;
512 	} else {
513 		return (NULL);
514 	}
515 
516 	if (copy) {
517 		bcopy(old_rptr, mp1->b_rptr, cur_size);
518 		mp1->b_wptr = mp1->b_rptr + cur_size;
519 	}
520 
521 	if (mp != mp1)
522 		freeb(mp);
523 
524 	return (mp1);
525 }
526 
527 static void
528 dblk_lastfree(mblk_t *mp, dblk_t *dbp)
529 {
530 	ASSERT(dbp->db_mblk == mp);
531 	if (dbp->db_fthdr != NULL)
532 		str_ftfree(dbp);
533 
534 	/* set credp and projid to be 'unspecified' before returning to cache */
535 	if (dbp->db_credp != NULL) {
536 		crfree(dbp->db_credp);
537 		dbp->db_credp = NULL;
538 	}
539 	dbp->db_cpid = -1;
540 
541 	/* Reset the struioflag and the checksum flag fields */
542 	dbp->db_struioflag = 0;
543 	dbp->db_struioun.cksum.flags = 0;
544 
545 	/* and the COOKED flag */
546 	dbp->db_flags &= ~DBLK_COOKED;
547 
548 	kmem_cache_free(dbp->db_cache, dbp);
549 }
550 
551 static void
552 dblk_decref(mblk_t *mp, dblk_t *dbp)
553 {
554 	if (dbp->db_ref != 1) {
555 		uint32_t rtfu = atomic_add_32_nv(&DBLK_RTFU_WORD(dbp),
556 		    -(1 << DBLK_RTFU_SHIFT(db_ref)));
557 		/*
558 		 * atomic_add_32_nv() just decremented db_ref, so we no longer
559 		 * have a reference to the dblk, which means another thread
560 		 * could free it.  Therefore we cannot examine the dblk to
561 		 * determine whether ours was the last reference.  Instead,
562 		 * we extract the new and minimum reference counts from rtfu.
563 		 * Note that all we're really saying is "if (ref != refmin)".
564 		 */
565 		if (((rtfu >> DBLK_RTFU_SHIFT(db_ref)) & DBLK_REFMAX) !=
566 		    ((rtfu >> DBLK_RTFU_SHIFT(db_flags)) & DBLK_REFMIN)) {
567 			kmem_cache_free(mblk_cache, mp);
568 			return;
569 		}
570 	}
571 	dbp->db_mblk = mp;
572 	dbp->db_free = dbp->db_lastfree;
573 	dbp->db_lastfree(mp, dbp);
574 }
575 
576 mblk_t *
577 dupb(mblk_t *mp)
578 {
579 	dblk_t *dbp = mp->b_datap;
580 	mblk_t *new_mp;
581 	uint32_t oldrtfu, newrtfu;
582 
583 	if ((new_mp = kmem_cache_alloc(mblk_cache, KM_NOSLEEP)) == NULL)
584 		goto out;
585 
586 	new_mp->b_next = new_mp->b_prev = new_mp->b_cont = NULL;
587 	new_mp->b_rptr = mp->b_rptr;
588 	new_mp->b_wptr = mp->b_wptr;
589 	new_mp->b_datap = dbp;
590 	new_mp->b_queue = NULL;
591 	MBLK_BAND_FLAG_WORD(new_mp) = MBLK_BAND_FLAG_WORD(mp);
592 
593 	STR_FTEVENT_MBLK(mp, caller(), FTEV_DUPB, dbp->db_ref);
594 
595 	/*
596 	 * First-dup optimization.  The enabling assumption is that there
597 	 * can can never be a race (in correct code) to dup the first copy
598 	 * of a message.  Therefore we don't need to do it atomically.
599 	 */
600 	if (dbp->db_free != dblk_decref) {
601 		dbp->db_free = dblk_decref;
602 		dbp->db_ref++;
603 		goto out;
604 	}
605 
606 	do {
607 		ASSERT(dbp->db_ref > 0);
608 		oldrtfu = DBLK_RTFU_WORD(dbp);
609 		newrtfu = oldrtfu + (1 << DBLK_RTFU_SHIFT(db_ref));
610 		/*
611 		 * If db_ref is maxed out we can't dup this message anymore.
612 		 */
613 		if ((oldrtfu & DBLK_RTFU_REF_MASK) == DBLK_RTFU_REF_MASK) {
614 			kmem_cache_free(mblk_cache, new_mp);
615 			new_mp = NULL;
616 			goto out;
617 		}
618 	} while (cas32(&DBLK_RTFU_WORD(dbp), oldrtfu, newrtfu) != oldrtfu);
619 
620 out:
621 	FTRACE_1("dupb(): new_mp=0x%lx", (uintptr_t)new_mp);
622 	return (new_mp);
623 }
624 
625 static void
626 dblk_lastfree_desb(mblk_t *mp, dblk_t *dbp)
627 {
628 	frtn_t *frp = dbp->db_frtnp;
629 
630 	ASSERT(dbp->db_mblk == mp);
631 	frp->free_func(frp->free_arg);
632 	if (dbp->db_fthdr != NULL)
633 		str_ftfree(dbp);
634 
635 	/* set credp and projid to be 'unspecified' before returning to cache */
636 	if (dbp->db_credp != NULL) {
637 		crfree(dbp->db_credp);
638 		dbp->db_credp = NULL;
639 	}
640 	dbp->db_cpid = -1;
641 	dbp->db_struioflag = 0;
642 	dbp->db_struioun.cksum.flags = 0;
643 
644 	kmem_cache_free(dbp->db_cache, dbp);
645 }
646 
647 /*ARGSUSED*/
648 static void
649 frnop_func(void *arg)
650 {
651 }
652 
653 /*
654  * Generic esballoc used to implement the four flavors: [d]esballoc[a].
655  */
656 static mblk_t *
657 gesballoc(unsigned char *base, size_t size, uint32_t db_rtfu, frtn_t *frp,
658 	void (*lastfree)(mblk_t *, dblk_t *), int kmflags)
659 {
660 	dblk_t *dbp;
661 	mblk_t *mp;
662 
663 	ASSERT(base != NULL && frp != NULL);
664 
665 	if ((dbp = kmem_cache_alloc(dblk_esb_cache, kmflags)) == NULL) {
666 		mp = NULL;
667 		goto out;
668 	}
669 
670 	mp = dbp->db_mblk;
671 	dbp->db_base = base;
672 	dbp->db_lim = base + size;
673 	dbp->db_free = dbp->db_lastfree = lastfree;
674 	dbp->db_frtnp = frp;
675 	DBLK_RTFU_WORD(dbp) = db_rtfu;
676 	mp->b_next = mp->b_prev = mp->b_cont = NULL;
677 	mp->b_rptr = mp->b_wptr = base;
678 	mp->b_queue = NULL;
679 	MBLK_BAND_FLAG_WORD(mp) = 0;
680 
681 out:
682 	FTRACE_1("gesballoc(): mp=0x%lx", (uintptr_t)mp);
683 	return (mp);
684 }
685 
686 /*ARGSUSED*/
687 mblk_t *
688 esballoc(unsigned char *base, size_t size, uint_t pri, frtn_t *frp)
689 {
690 	mblk_t *mp;
691 
692 	/*
693 	 * Note that this is structured to allow the common case (i.e.
694 	 * STREAMS flowtracing disabled) to call gesballoc() with tail
695 	 * call optimization.
696 	 */
697 	if (!str_ftnever) {
698 		mp = gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
699 		    frp, freebs_enqueue, KM_NOSLEEP);
700 
701 		if (mp != NULL)
702 			STR_FTALLOC(&DB_FTHDR(mp), FTEV_ESBALLOC, size);
703 		return (mp);
704 	}
705 
706 	return (gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
707 	    frp, freebs_enqueue, KM_NOSLEEP));
708 }
709 
710 /*
711  * Same as esballoc() but sleeps waiting for memory.
712  */
713 /*ARGSUSED*/
714 mblk_t *
715 esballoc_wait(unsigned char *base, size_t size, uint_t pri, frtn_t *frp)
716 {
717 	mblk_t *mp;
718 
719 	/*
720 	 * Note that this is structured to allow the common case (i.e.
721 	 * STREAMS flowtracing disabled) to call gesballoc() with tail
722 	 * call optimization.
723 	 */
724 	if (!str_ftnever) {
725 		mp = gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
726 		    frp, freebs_enqueue, KM_SLEEP);
727 
728 		STR_FTALLOC(&DB_FTHDR(mp), FTEV_ESBALLOC, size);
729 		return (mp);
730 	}
731 
732 	return (gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
733 	    frp, freebs_enqueue, KM_SLEEP));
734 }
735 
736 /*ARGSUSED*/
737 mblk_t *
738 desballoc(unsigned char *base, size_t size, uint_t pri, frtn_t *frp)
739 {
740 	mblk_t *mp;
741 
742 	/*
743 	 * Note that this is structured to allow the common case (i.e.
744 	 * STREAMS flowtracing disabled) to call gesballoc() with tail
745 	 * call optimization.
746 	 */
747 	if (!str_ftnever) {
748 		mp = gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
749 			frp, dblk_lastfree_desb, KM_NOSLEEP);
750 
751 		if (mp != NULL)
752 			STR_FTALLOC(&DB_FTHDR(mp), FTEV_DESBALLOC, size);
753 		return (mp);
754 	}
755 
756 	return (gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
757 	    frp, dblk_lastfree_desb, KM_NOSLEEP));
758 }
759 
760 /*ARGSUSED*/
761 mblk_t *
762 esballoca(unsigned char *base, size_t size, uint_t pri, frtn_t *frp)
763 {
764 	mblk_t *mp;
765 
766 	/*
767 	 * Note that this is structured to allow the common case (i.e.
768 	 * STREAMS flowtracing disabled) to call gesballoc() with tail
769 	 * call optimization.
770 	 */
771 	if (!str_ftnever) {
772 		mp = gesballoc(base, size, DBLK_RTFU(2, M_DATA, 0, 0),
773 		    frp, freebs_enqueue, KM_NOSLEEP);
774 
775 		if (mp != NULL)
776 			STR_FTALLOC(&DB_FTHDR(mp), FTEV_ESBALLOCA, size);
777 		return (mp);
778 	}
779 
780 	return (gesballoc(base, size, DBLK_RTFU(2, M_DATA, 0, 0),
781 	    frp, freebs_enqueue, KM_NOSLEEP));
782 }
783 
784 /*ARGSUSED*/
785 mblk_t *
786 desballoca(unsigned char *base, size_t size, uint_t pri, frtn_t *frp)
787 {
788 	mblk_t *mp;
789 
790 	/*
791 	 * Note that this is structured to allow the common case (i.e.
792 	 * STREAMS flowtracing disabled) to call gesballoc() with tail
793 	 * call optimization.
794 	 */
795 	if (!str_ftnever) {
796 		mp = gesballoc(base, size, DBLK_RTFU(2, M_DATA, 0, 0),
797 		    frp, dblk_lastfree_desb, KM_NOSLEEP);
798 
799 		if (mp != NULL)
800 			STR_FTALLOC(&DB_FTHDR(mp), FTEV_DESBALLOCA, size);
801 		return (mp);
802 	}
803 
804 	return (gesballoc(base, size, DBLK_RTFU(2, M_DATA, 0, 0),
805 	    frp, dblk_lastfree_desb, KM_NOSLEEP));
806 }
807 
808 static void
809 bcache_dblk_lastfree(mblk_t *mp, dblk_t *dbp)
810 {
811 	bcache_t *bcp = dbp->db_cache;
812 
813 	ASSERT(dbp->db_mblk == mp);
814 	if (dbp->db_fthdr != NULL)
815 		str_ftfree(dbp);
816 
817 	/* set credp and projid to be 'unspecified' before returning to cache */
818 	if (dbp->db_credp != NULL) {
819 		crfree(dbp->db_credp);
820 		dbp->db_credp = NULL;
821 	}
822 	dbp->db_cpid = -1;
823 	dbp->db_struioflag = 0;
824 	dbp->db_struioun.cksum.flags = 0;
825 
826 	mutex_enter(&bcp->mutex);
827 	kmem_cache_free(bcp->dblk_cache, dbp);
828 	bcp->alloc--;
829 
830 	if (bcp->alloc == 0 && bcp->destroy != 0) {
831 		kmem_cache_destroy(bcp->dblk_cache);
832 		kmem_cache_destroy(bcp->buffer_cache);
833 		mutex_exit(&bcp->mutex);
834 		mutex_destroy(&bcp->mutex);
835 		kmem_free(bcp, sizeof (bcache_t));
836 	} else {
837 		mutex_exit(&bcp->mutex);
838 	}
839 }
840 
841 bcache_t *
842 bcache_create(char *name, size_t size, uint_t align)
843 {
844 	bcache_t *bcp;
845 	char buffer[255];
846 
847 	ASSERT((align & (align - 1)) == 0);
848 
849 	if ((bcp = (bcache_t *)kmem_alloc(sizeof (bcache_t), KM_NOSLEEP)) ==
850 	    NULL) {
851 		return (NULL);
852 	}
853 
854 	bcp->size = size;
855 	bcp->align = align;
856 	bcp->alloc = 0;
857 	bcp->destroy = 0;
858 
859 	mutex_init(&bcp->mutex, NULL, MUTEX_DRIVER, NULL);
860 
861 	(void) sprintf(buffer, "%s_buffer_cache", name);
862 	bcp->buffer_cache = kmem_cache_create(buffer, size, align, NULL, NULL,
863 	    NULL, NULL, NULL, 0);
864 	(void) sprintf(buffer, "%s_dblk_cache", name);
865 	bcp->dblk_cache = kmem_cache_create(buffer, sizeof (dblk_t),
866 	    DBLK_CACHE_ALIGN, bcache_dblk_constructor, bcache_dblk_destructor,
867 						NULL, (void *)bcp, NULL, 0);
868 
869 	return (bcp);
870 }
871 
872 void
873 bcache_destroy(bcache_t *bcp)
874 {
875 	ASSERT(bcp != NULL);
876 
877 	mutex_enter(&bcp->mutex);
878 	if (bcp->alloc == 0) {
879 		kmem_cache_destroy(bcp->dblk_cache);
880 		kmem_cache_destroy(bcp->buffer_cache);
881 		mutex_exit(&bcp->mutex);
882 		mutex_destroy(&bcp->mutex);
883 		kmem_free(bcp, sizeof (bcache_t));
884 	} else {
885 		bcp->destroy++;
886 		mutex_exit(&bcp->mutex);
887 	}
888 }
889 
890 /*ARGSUSED*/
891 mblk_t *
892 bcache_allocb(bcache_t *bcp, uint_t pri)
893 {
894 	dblk_t *dbp;
895 	mblk_t *mp = NULL;
896 
897 	ASSERT(bcp != NULL);
898 
899 	mutex_enter(&bcp->mutex);
900 	if (bcp->destroy != 0) {
901 		mutex_exit(&bcp->mutex);
902 		goto out;
903 	}
904 
905 	if ((dbp = kmem_cache_alloc(bcp->dblk_cache, KM_NOSLEEP)) == NULL) {
906 		mutex_exit(&bcp->mutex);
907 		goto out;
908 	}
909 	bcp->alloc++;
910 	mutex_exit(&bcp->mutex);
911 
912 	ASSERT(((uintptr_t)(dbp->db_base) & (bcp->align - 1)) == 0);
913 
914 	mp = dbp->db_mblk;
915 	DBLK_RTFU_WORD(dbp) = DBLK_RTFU(1, M_DATA, 0, 0);
916 	mp->b_next = mp->b_prev = mp->b_cont = NULL;
917 	mp->b_rptr = mp->b_wptr = dbp->db_base;
918 	mp->b_queue = NULL;
919 	MBLK_BAND_FLAG_WORD(mp) = 0;
920 	STR_FTALLOC(&dbp->db_fthdr, FTEV_BCALLOCB, bcp->size);
921 out:
922 	FTRACE_1("bcache_allocb(): mp=0x%p", (uintptr_t)mp);
923 
924 	return (mp);
925 }
926 
927 static void
928 dblk_lastfree_oversize(mblk_t *mp, dblk_t *dbp)
929 {
930 	ASSERT(dbp->db_mblk == mp);
931 	if (dbp->db_fthdr != NULL)
932 		str_ftfree(dbp);
933 
934 	/* set credp and projid to be 'unspecified' before returning to cache */
935 	if (dbp->db_credp != NULL) {
936 		crfree(dbp->db_credp);
937 		dbp->db_credp = NULL;
938 	}
939 	dbp->db_cpid = -1;
940 	dbp->db_struioflag = 0;
941 	dbp->db_struioun.cksum.flags = 0;
942 
943 	kmem_free(dbp->db_base, dbp->db_lim - dbp->db_base);
944 	kmem_cache_free(dbp->db_cache, dbp);
945 }
946 
947 static mblk_t *
948 allocb_oversize(size_t size, int kmflags)
949 {
950 	mblk_t *mp;
951 	void *buf;
952 
953 	size = P2ROUNDUP(size, DBLK_CACHE_ALIGN);
954 	if ((buf = kmem_alloc(size, kmflags)) == NULL)
955 		return (NULL);
956 	if ((mp = gesballoc(buf, size, DBLK_RTFU(1, M_DATA, 0, 0),
957 	    &frnop, dblk_lastfree_oversize, kmflags)) == NULL)
958 		kmem_free(buf, size);
959 
960 	if (mp != NULL)
961 		STR_FTALLOC(&DB_FTHDR(mp), FTEV_ALLOCBIG, size);
962 
963 	return (mp);
964 }
965 
966 mblk_t *
967 allocb_tryhard(size_t target_size)
968 {
969 	size_t size;
970 	mblk_t *bp;
971 
972 	for (size = target_size; size < target_size + 512;
973 	    size += DBLK_CACHE_ALIGN)
974 		if ((bp = allocb(size, BPRI_HI)) != NULL)
975 			return (bp);
976 	allocb_tryhard_fails++;
977 	return (NULL);
978 }
979 
980 /*
981  * This routine is consolidation private for STREAMS internal use
982  * This routine may only be called from sync routines (i.e., not
983  * from put or service procedures).  It is located here (rather
984  * than strsubr.c) so that we don't have to expose all of the
985  * allocb() implementation details in header files.
986  */
987 mblk_t *
988 allocb_wait(size_t size, uint_t pri, uint_t flags, int *error)
989 {
990 	dblk_t *dbp;
991 	mblk_t *mp;
992 	size_t index;
993 
994 	index = (size -1) >> DBLK_SIZE_SHIFT;
995 
996 	if (flags & STR_NOSIG) {
997 		if (index >= (DBLK_MAX_CACHE >> DBLK_SIZE_SHIFT)) {
998 			if (size != 0) {
999 				mp = allocb_oversize(size, KM_SLEEP);
1000 				FTRACE_1("allocb_wait (NOSIG): mp=0x%lx",
1001 				    (uintptr_t)mp);
1002 				return (mp);
1003 			}
1004 			index = 0;
1005 		}
1006 
1007 		dbp = kmem_cache_alloc(dblk_cache[index], KM_SLEEP);
1008 		mp = dbp->db_mblk;
1009 		DBLK_RTFU_WORD(dbp) = DBLK_RTFU(1, M_DATA, 0, 0);
1010 		mp->b_next = mp->b_prev = mp->b_cont = NULL;
1011 		mp->b_rptr = mp->b_wptr = dbp->db_base;
1012 		mp->b_queue = NULL;
1013 		MBLK_BAND_FLAG_WORD(mp) = 0;
1014 		STR_FTALLOC(&DB_FTHDR(mp), FTEV_ALLOCBW, size);
1015 
1016 		FTRACE_1("allocb_wait (NOSIG): mp=0x%lx", (uintptr_t)mp);
1017 
1018 	} else {
1019 		while ((mp = allocb(size, pri)) == NULL) {
1020 			if ((*error = strwaitbuf(size, BPRI_HI)) != 0)
1021 				return (NULL);
1022 		}
1023 	}
1024 
1025 	return (mp);
1026 }
1027 
1028 /*
1029  * Call function 'func' with 'arg' when a class zero block can
1030  * be allocated with priority 'pri'.
1031  */
1032 bufcall_id_t
1033 esbbcall(uint_t pri, void (*func)(void *), void *arg)
1034 {
1035 	return (bufcall(1, pri, func, arg));
1036 }
1037 
1038 /*
1039  * Allocates an iocblk (M_IOCTL) block. Properly sets the credentials
1040  * ioc_id, rval and error of the struct ioctl to set up an ioctl call.
1041  * This provides consistency for all internal allocators of ioctl.
1042  */
1043 mblk_t *
1044 mkiocb(uint_t cmd)
1045 {
1046 	struct iocblk	*ioc;
1047 	mblk_t		*mp;
1048 
1049 	/*
1050 	 * Allocate enough space for any of the ioctl related messages.
1051 	 */
1052 	if ((mp = allocb(sizeof (union ioctypes), BPRI_MED)) == NULL)
1053 		return (NULL);
1054 
1055 	bzero(mp->b_rptr, sizeof (union ioctypes));
1056 
1057 	/*
1058 	 * Set the mblk_t information and ptrs correctly.
1059 	 */
1060 	mp->b_wptr += sizeof (struct iocblk);
1061 	mp->b_datap->db_type = M_IOCTL;
1062 
1063 	/*
1064 	 * Fill in the fields.
1065 	 */
1066 	ioc		= (struct iocblk *)mp->b_rptr;
1067 	ioc->ioc_cmd	= cmd;
1068 	ioc->ioc_cr	= kcred;
1069 	ioc->ioc_id	= getiocseqno();
1070 	ioc->ioc_flag	= IOC_NATIVE;
1071 	return (mp);
1072 }
1073 
1074 /*
1075  * test if block of given size can be allocated with a request of
1076  * the given priority.
1077  * 'pri' is no longer used, but is retained for compatibility.
1078  */
1079 /* ARGSUSED */
1080 int
1081 testb(size_t size, uint_t pri)
1082 {
1083 	return ((size + sizeof (dblk_t)) <= kmem_avail());
1084 }
1085 
1086 /*
1087  * Call function 'func' with argument 'arg' when there is a reasonably
1088  * good chance that a block of size 'size' can be allocated.
1089  * 'pri' is no longer used, but is retained for compatibility.
1090  */
1091 /* ARGSUSED */
1092 bufcall_id_t
1093 bufcall(size_t size, uint_t pri, void (*func)(void *), void *arg)
1094 {
1095 	static long bid = 1;	/* always odd to save checking for zero */
1096 	bufcall_id_t bc_id;
1097 	struct strbufcall *bcp;
1098 
1099 	if ((bcp = kmem_alloc(sizeof (strbufcall_t), KM_NOSLEEP)) == NULL)
1100 		return (0);
1101 
1102 	bcp->bc_func = func;
1103 	bcp->bc_arg = arg;
1104 	bcp->bc_size = size;
1105 	bcp->bc_next = NULL;
1106 	bcp->bc_executor = NULL;
1107 
1108 	mutex_enter(&strbcall_lock);
1109 	/*
1110 	 * After bcp is linked into strbcalls and strbcall_lock is dropped there
1111 	 * should be no references to bcp since it may be freed by
1112 	 * runbufcalls(). Since bcp_id field is returned, we save its value in
1113 	 * the local var.
1114 	 */
1115 	bc_id = bcp->bc_id = (bufcall_id_t)(bid += 2);	/* keep it odd */
1116 
1117 	/*
1118 	 * add newly allocated stream event to existing
1119 	 * linked list of events.
1120 	 */
1121 	if (strbcalls.bc_head == NULL) {
1122 		strbcalls.bc_head = strbcalls.bc_tail = bcp;
1123 	} else {
1124 		strbcalls.bc_tail->bc_next = bcp;
1125 		strbcalls.bc_tail = bcp;
1126 	}
1127 
1128 	cv_signal(&strbcall_cv);
1129 	mutex_exit(&strbcall_lock);
1130 	return (bc_id);
1131 }
1132 
1133 /*
1134  * Cancel a bufcall request.
1135  */
1136 void
1137 unbufcall(bufcall_id_t id)
1138 {
1139 	strbufcall_t *bcp, *pbcp;
1140 
1141 	mutex_enter(&strbcall_lock);
1142 again:
1143 	pbcp = NULL;
1144 	for (bcp = strbcalls.bc_head; bcp; bcp = bcp->bc_next) {
1145 		if (id == bcp->bc_id)
1146 			break;
1147 		pbcp = bcp;
1148 	}
1149 	if (bcp) {
1150 		if (bcp->bc_executor != NULL) {
1151 			if (bcp->bc_executor != curthread) {
1152 				cv_wait(&bcall_cv, &strbcall_lock);
1153 				goto again;
1154 			}
1155 		} else {
1156 			if (pbcp)
1157 				pbcp->bc_next = bcp->bc_next;
1158 			else
1159 				strbcalls.bc_head = bcp->bc_next;
1160 			if (bcp == strbcalls.bc_tail)
1161 				strbcalls.bc_tail = pbcp;
1162 			kmem_free(bcp, sizeof (strbufcall_t));
1163 		}
1164 	}
1165 	mutex_exit(&strbcall_lock);
1166 }
1167 
1168 /*
1169  * Duplicate a message block by block (uses dupb), returning
1170  * a pointer to the duplicate message.
1171  * Returns a non-NULL value only if the entire message
1172  * was dup'd.
1173  */
1174 mblk_t *
1175 dupmsg(mblk_t *bp)
1176 {
1177 	mblk_t *head, *nbp;
1178 
1179 	if (!bp || !(nbp = head = dupb(bp)))
1180 		return (NULL);
1181 
1182 	while (bp->b_cont) {
1183 		if (!(nbp->b_cont = dupb(bp->b_cont))) {
1184 			freemsg(head);
1185 			return (NULL);
1186 		}
1187 		nbp = nbp->b_cont;
1188 		bp = bp->b_cont;
1189 	}
1190 	return (head);
1191 }
1192 
1193 #define	DUPB_NOLOAN(bp) \
1194 	((((bp)->b_datap->db_struioflag & STRUIO_ZC) != 0) ? \
1195 	copyb((bp)) : dupb((bp)))
1196 
1197 mblk_t *
1198 dupmsg_noloan(mblk_t *bp)
1199 {
1200 	mblk_t *head, *nbp;
1201 
1202 	if (bp == NULL || DB_TYPE(bp) != M_DATA ||
1203 	    ((nbp = head = DUPB_NOLOAN(bp)) == NULL))
1204 		return (NULL);
1205 
1206 	while (bp->b_cont) {
1207 		if ((nbp->b_cont = DUPB_NOLOAN(bp->b_cont)) == NULL) {
1208 			freemsg(head);
1209 			return (NULL);
1210 		}
1211 		nbp = nbp->b_cont;
1212 		bp = bp->b_cont;
1213 	}
1214 	return (head);
1215 }
1216 
1217 /*
1218  * Copy data from message and data block to newly allocated message and
1219  * data block. Returns new message block pointer, or NULL if error.
1220  * The alignment of rptr (w.r.t. word alignment) will be the same in the copy
1221  * as in the original even when db_base is not word aligned. (bug 1052877)
1222  */
1223 mblk_t *
1224 copyb(mblk_t *bp)
1225 {
1226 	mblk_t	*nbp;
1227 	dblk_t	*dp, *ndp;
1228 	uchar_t *base;
1229 	size_t	size;
1230 	size_t	unaligned;
1231 
1232 	ASSERT(bp->b_wptr >= bp->b_rptr);
1233 
1234 	dp = bp->b_datap;
1235 	if (dp->db_fthdr != NULL)
1236 		STR_FTEVENT_MBLK(bp, caller(), FTEV_COPYB, 0);
1237 
1238 	/*
1239 	 * Special handling for Multidata message; this should be
1240 	 * removed once a copy-callback routine is made available.
1241 	 */
1242 	if (dp->db_type == M_MULTIDATA) {
1243 		cred_t *cr;
1244 
1245 		if ((nbp = mmd_copy(bp, KM_NOSLEEP)) == NULL)
1246 			return (NULL);
1247 
1248 		nbp->b_flag = bp->b_flag;
1249 		nbp->b_band = bp->b_band;
1250 		ndp = nbp->b_datap;
1251 
1252 		/* See comments below on potential issues. */
1253 		STR_FTEVENT_MBLK(nbp, caller(), FTEV_COPYB, 1);
1254 
1255 		ASSERT(ndp->db_type == dp->db_type);
1256 		cr = dp->db_credp;
1257 		if (cr != NULL)
1258 			crhold(ndp->db_credp = cr);
1259 		ndp->db_cpid = dp->db_cpid;
1260 		return (nbp);
1261 	}
1262 
1263 	size = dp->db_lim - dp->db_base;
1264 	unaligned = P2PHASE((uintptr_t)dp->db_base, sizeof (uint_t));
1265 	if ((nbp = allocb_tmpl(size + unaligned, bp)) == NULL)
1266 		return (NULL);
1267 	nbp->b_flag = bp->b_flag;
1268 	nbp->b_band = bp->b_band;
1269 	ndp = nbp->b_datap;
1270 
1271 	/*
1272 	 * Well, here is a potential issue.  If we are trying to
1273 	 * trace a flow, and we copy the message, we might lose
1274 	 * information about where this message might have been.
1275 	 * So we should inherit the FT data.  On the other hand,
1276 	 * a user might be interested only in alloc to free data.
1277 	 * So I guess the real answer is to provide a tunable.
1278 	 */
1279 	STR_FTEVENT_MBLK(nbp, caller(), FTEV_COPYB, 1);
1280 
1281 	base = ndp->db_base + unaligned;
1282 	bcopy(dp->db_base, ndp->db_base + unaligned, size);
1283 
1284 	nbp->b_rptr = base + (bp->b_rptr - dp->db_base);
1285 	nbp->b_wptr = nbp->b_rptr + MBLKL(bp);
1286 
1287 	return (nbp);
1288 }
1289 
1290 /*
1291  * Copy data from message to newly allocated message using new
1292  * data blocks.  Returns a pointer to the new message, or NULL if error.
1293  */
1294 mblk_t *
1295 copymsg(mblk_t *bp)
1296 {
1297 	mblk_t *head, *nbp;
1298 
1299 	if (!bp || !(nbp = head = copyb(bp)))
1300 		return (NULL);
1301 
1302 	while (bp->b_cont) {
1303 		if (!(nbp->b_cont = copyb(bp->b_cont))) {
1304 			freemsg(head);
1305 			return (NULL);
1306 		}
1307 		nbp = nbp->b_cont;
1308 		bp = bp->b_cont;
1309 	}
1310 	return (head);
1311 }
1312 
1313 /*
1314  * link a message block to tail of message
1315  */
1316 void
1317 linkb(mblk_t *mp, mblk_t *bp)
1318 {
1319 	ASSERT(mp && bp);
1320 
1321 	for (; mp->b_cont; mp = mp->b_cont)
1322 		;
1323 	mp->b_cont = bp;
1324 }
1325 
1326 /*
1327  * unlink a message block from head of message
1328  * return pointer to new message.
1329  * NULL if message becomes empty.
1330  */
1331 mblk_t *
1332 unlinkb(mblk_t *bp)
1333 {
1334 	mblk_t *bp1;
1335 
1336 	bp1 = bp->b_cont;
1337 	bp->b_cont = NULL;
1338 	return (bp1);
1339 }
1340 
1341 /*
1342  * remove a message block "bp" from message "mp"
1343  *
1344  * Return pointer to new message or NULL if no message remains.
1345  * Return -1 if bp is not found in message.
1346  */
1347 mblk_t *
1348 rmvb(mblk_t *mp, mblk_t *bp)
1349 {
1350 	mblk_t *tmp;
1351 	mblk_t *lastp = NULL;
1352 
1353 	ASSERT(mp && bp);
1354 	for (tmp = mp; tmp; tmp = tmp->b_cont) {
1355 		if (tmp == bp) {
1356 			if (lastp)
1357 				lastp->b_cont = tmp->b_cont;
1358 			else
1359 				mp = tmp->b_cont;
1360 			tmp->b_cont = NULL;
1361 			return (mp);
1362 		}
1363 		lastp = tmp;
1364 	}
1365 	return ((mblk_t *)-1);
1366 }
1367 
1368 /*
1369  * Concatenate and align first len bytes of common
1370  * message type.  Len == -1, means concat everything.
1371  * Returns 1 on success, 0 on failure
1372  * After the pullup, mp points to the pulled up data.
1373  */
1374 int
1375 pullupmsg(mblk_t *mp, ssize_t len)
1376 {
1377 	mblk_t *bp, *b_cont;
1378 	dblk_t *dbp;
1379 	ssize_t n;
1380 	uint32_t start, stuff, end, value, flags;
1381 
1382 	ASSERT(mp->b_datap->db_ref > 0);
1383 	ASSERT(mp->b_next == NULL && mp->b_prev == NULL);
1384 
1385 	/*
1386 	 * We won't handle Multidata message, since it contains
1387 	 * metadata which this function has no knowledge of; we
1388 	 * assert on DEBUG, and return failure otherwise.
1389 	 */
1390 	ASSERT(mp->b_datap->db_type != M_MULTIDATA);
1391 	if (mp->b_datap->db_type == M_MULTIDATA)
1392 		return (0);
1393 
1394 	if (len == -1) {
1395 		if (mp->b_cont == NULL && str_aligned(mp->b_rptr))
1396 			return (1);
1397 		len = xmsgsize(mp);
1398 	} else {
1399 		ssize_t first_mblk_len = mp->b_wptr - mp->b_rptr;
1400 		ASSERT(first_mblk_len >= 0);
1401 		/*
1402 		 * If the length is less than that of the first mblk,
1403 		 * we want to pull up the message into an aligned mblk.
1404 		 * Though not part of the spec, some callers assume it.
1405 		 */
1406 		if (len <= first_mblk_len) {
1407 			if (str_aligned(mp->b_rptr))
1408 				return (1);
1409 			len = first_mblk_len;
1410 		} else if (xmsgsize(mp) < len)
1411 			return (0);
1412 	}
1413 
1414 	if ((bp = allocb_tmpl(len, mp)) == NULL)
1415 		return (0);
1416 
1417 	dbp = bp->b_datap;
1418 	*bp = *mp;		/* swap mblks so bp heads the old msg... */
1419 	mp->b_datap = dbp;	/* ... and mp heads the new message */
1420 	mp->b_datap->db_mblk = mp;
1421 	bp->b_datap->db_mblk = bp;
1422 	mp->b_rptr = mp->b_wptr = dbp->db_base;
1423 
1424 	/*
1425 	 * Need to preserve checksum information by copying them
1426 	 * to mp which heads the pulluped message.
1427 	 */
1428 	hcksum_retrieve(bp, NULL, NULL, &start, &stuff, &end, &value, &flags);
1429 	(void) hcksum_assoc(mp, NULL, NULL, start, stuff, end, value, flags, 0);
1430 
1431 	do {
1432 		ASSERT(bp->b_datap->db_ref > 0);
1433 		ASSERT(bp->b_wptr >= bp->b_rptr);
1434 		n = MIN(bp->b_wptr - bp->b_rptr, len);
1435 		bcopy(bp->b_rptr, mp->b_wptr, (size_t)n);
1436 		mp->b_wptr += n;
1437 		bp->b_rptr += n;
1438 		len -= n;
1439 		if (bp->b_rptr != bp->b_wptr)
1440 			break;
1441 		b_cont = bp->b_cont;
1442 		freeb(bp);
1443 		bp = b_cont;
1444 	} while (len && bp);
1445 
1446 	mp->b_cont = bp;	/* tack on whatever wasn't pulled up */
1447 
1448 	return (1);
1449 }
1450 
1451 /*
1452  * Concatenate and align at least the first len bytes of common message
1453  * type.  Len == -1 means concatenate everything.  The original message is
1454  * unaltered.  Returns a pointer to a new message on success, otherwise
1455  * returns NULL.
1456  */
1457 mblk_t *
1458 msgpullup(mblk_t *mp, ssize_t len)
1459 {
1460 	mblk_t	*newmp;
1461 	ssize_t	totlen;
1462 	ssize_t	n;
1463 	uint32_t start, stuff, end, value, flags;
1464 
1465 	/*
1466 	 * We won't handle Multidata message, since it contains
1467 	 * metadata which this function has no knowledge of; we
1468 	 * assert on DEBUG, and return failure otherwise.
1469 	 */
1470 	ASSERT(mp->b_datap->db_type != M_MULTIDATA);
1471 	if (mp->b_datap->db_type == M_MULTIDATA)
1472 		return (NULL);
1473 
1474 	totlen = xmsgsize(mp);
1475 
1476 	if ((len > 0) && (len > totlen))
1477 		return (NULL);
1478 
1479 	/*
1480 	 * Copy all of the first msg type into one new mblk, then dupmsg
1481 	 * and link the rest onto this.
1482 	 */
1483 
1484 	len = totlen;
1485 
1486 	if ((newmp = allocb_tmpl(len, mp)) == NULL)
1487 		return (NULL);
1488 
1489 	newmp->b_flag = mp->b_flag;
1490 	newmp->b_band = mp->b_band;
1491 
1492 	/*
1493 	 * Need to preserve checksum information by copying them
1494 	 * to newmp which heads the pulluped message.
1495 	 */
1496 	hcksum_retrieve(mp, NULL, NULL, &start, &stuff, &end, &value, &flags);
1497 	(void) hcksum_assoc(newmp, NULL, NULL, start, stuff, end,
1498 	    value, flags, 0);
1499 
1500 	while (len > 0) {
1501 		n = mp->b_wptr - mp->b_rptr;
1502 		ASSERT(n >= 0);		/* allow zero-length mblk_t's */
1503 		if (n > 0)
1504 			bcopy(mp->b_rptr, newmp->b_wptr, n);
1505 		newmp->b_wptr += n;
1506 		len -= n;
1507 		mp = mp->b_cont;
1508 	}
1509 
1510 	if (mp != NULL) {
1511 		newmp->b_cont = dupmsg(mp);
1512 		if (newmp->b_cont == NULL) {
1513 			freemsg(newmp);
1514 			return (NULL);
1515 		}
1516 	}
1517 
1518 	return (newmp);
1519 }
1520 
1521 /*
1522  * Trim bytes from message
1523  *  len > 0, trim from head
1524  *  len < 0, trim from tail
1525  * Returns 1 on success, 0 on failure.
1526  */
1527 int
1528 adjmsg(mblk_t *mp, ssize_t len)
1529 {
1530 	mblk_t *bp;
1531 	mblk_t *save_bp = NULL;
1532 	mblk_t *prev_bp;
1533 	mblk_t *bcont;
1534 	unsigned char type;
1535 	ssize_t n;
1536 	int fromhead;
1537 	int first;
1538 
1539 	ASSERT(mp != NULL);
1540 	/*
1541 	 * We won't handle Multidata message, since it contains
1542 	 * metadata which this function has no knowledge of; we
1543 	 * assert on DEBUG, and return failure otherwise.
1544 	 */
1545 	ASSERT(mp->b_datap->db_type != M_MULTIDATA);
1546 	if (mp->b_datap->db_type == M_MULTIDATA)
1547 		return (0);
1548 
1549 	if (len < 0) {
1550 		fromhead = 0;
1551 		len = -len;
1552 	} else {
1553 		fromhead = 1;
1554 	}
1555 
1556 	if (xmsgsize(mp) < len)
1557 		return (0);
1558 
1559 
1560 	if (fromhead) {
1561 		first = 1;
1562 		while (len) {
1563 			ASSERT(mp->b_wptr >= mp->b_rptr);
1564 			n = MIN(mp->b_wptr - mp->b_rptr, len);
1565 			mp->b_rptr += n;
1566 			len -= n;
1567 
1568 			/*
1569 			 * If this is not the first zero length
1570 			 * message remove it
1571 			 */
1572 			if (!first && (mp->b_wptr == mp->b_rptr)) {
1573 				bcont = mp->b_cont;
1574 				freeb(mp);
1575 				mp = save_bp->b_cont = bcont;
1576 			} else {
1577 				save_bp = mp;
1578 				mp = mp->b_cont;
1579 			}
1580 			first = 0;
1581 		}
1582 	} else {
1583 		type = mp->b_datap->db_type;
1584 		while (len) {
1585 			bp = mp;
1586 			save_bp = NULL;
1587 
1588 			/*
1589 			 * Find the last message of same type
1590 			 */
1591 
1592 			while (bp && bp->b_datap->db_type == type) {
1593 				ASSERT(bp->b_wptr >= bp->b_rptr);
1594 				prev_bp = save_bp;
1595 				save_bp = bp;
1596 				bp = bp->b_cont;
1597 			}
1598 			if (save_bp == NULL)
1599 				break;
1600 			n = MIN(save_bp->b_wptr - save_bp->b_rptr, len);
1601 			save_bp->b_wptr -= n;
1602 			len -= n;
1603 
1604 			/*
1605 			 * If this is not the first message
1606 			 * and we have taken away everything
1607 			 * from this message, remove it
1608 			 */
1609 
1610 			if ((save_bp != mp) &&
1611 				(save_bp->b_wptr == save_bp->b_rptr)) {
1612 				bcont = save_bp->b_cont;
1613 				freeb(save_bp);
1614 				prev_bp->b_cont = bcont;
1615 			}
1616 		}
1617 	}
1618 	return (1);
1619 }
1620 
1621 /*
1622  * get number of data bytes in message
1623  */
1624 size_t
1625 msgdsize(mblk_t *bp)
1626 {
1627 	size_t count = 0;
1628 
1629 	for (; bp; bp = bp->b_cont)
1630 		if (bp->b_datap->db_type == M_DATA) {
1631 			ASSERT(bp->b_wptr >= bp->b_rptr);
1632 			count += bp->b_wptr - bp->b_rptr;
1633 		}
1634 	return (count);
1635 }
1636 
1637 /*
1638  * Get a message off head of queue
1639  *
1640  * If queue has no buffers then mark queue
1641  * with QWANTR. (queue wants to be read by
1642  * someone when data becomes available)
1643  *
1644  * If there is something to take off then do so.
1645  * If queue falls below hi water mark turn off QFULL
1646  * flag.  Decrement weighted count of queue.
1647  * Also turn off QWANTR because queue is being read.
1648  *
1649  * The queue count is maintained on a per-band basis.
1650  * Priority band 0 (normal messages) uses q_count,
1651  * q_lowat, etc.  Non-zero priority bands use the
1652  * fields in their respective qband structures
1653  * (qb_count, qb_lowat, etc.)  All messages appear
1654  * on the same list, linked via their b_next pointers.
1655  * q_first is the head of the list.  q_count does
1656  * not reflect the size of all the messages on the
1657  * queue.  It only reflects those messages in the
1658  * normal band of flow.  The one exception to this
1659  * deals with high priority messages.  They are in
1660  * their own conceptual "band", but are accounted
1661  * against q_count.
1662  *
1663  * If queue count is below the lo water mark and QWANTW
1664  * is set, enable the closest backq which has a service
1665  * procedure and turn off the QWANTW flag.
1666  *
1667  * getq could be built on top of rmvq, but isn't because
1668  * of performance considerations.
1669  *
1670  * A note on the use of q_count and q_mblkcnt:
1671  *   q_count is the traditional byte count for messages that
1672  *   have been put on a queue.  Documentation tells us that
1673  *   we shouldn't rely on that count, but some drivers/modules
1674  *   do.  What was needed, however, is a mechanism to prevent
1675  *   runaway streams from consuming all of the resources,
1676  *   and particularly be able to flow control zero-length
1677  *   messages.  q_mblkcnt is used for this purpose.  It
1678  *   counts the number of mblk's that are being put on
1679  *   the queue.  The intention here, is that each mblk should
1680  *   contain one byte of data and, for the purpose of
1681  *   flow-control, logically does.  A queue will become
1682  *   full when EITHER of these values (q_count and q_mblkcnt)
1683  *   reach the highwater mark.  It will clear when BOTH
1684  *   of them drop below the highwater mark.  And it will
1685  *   backenable when BOTH of them drop below the lowwater
1686  *   mark.
1687  *   With this algorithm, a driver/module might be able
1688  *   to find a reasonably accurate q_count, and the
1689  *   framework can still try and limit resource usage.
1690  */
1691 mblk_t *
1692 getq(queue_t *q)
1693 {
1694 	mblk_t *bp;
1695 	uchar_t band = 0;
1696 
1697 	bp = getq_noenab(q);
1698 	if (bp != NULL)
1699 		band = bp->b_band;
1700 
1701 	/*
1702 	 * Inlined from qbackenable().
1703 	 * Quick check without holding the lock.
1704 	 */
1705 	if (band == 0 && (q->q_flag & (QWANTW|QWANTWSYNC)) == 0)
1706 		return (bp);
1707 
1708 	qbackenable(q, band);
1709 	return (bp);
1710 }
1711 
1712 /*
1713  * Calculate number of data bytes in a single data message block taking
1714  * multidata messages into account.
1715  */
1716 
1717 #define	ADD_MBLK_SIZE(mp, size) 					\
1718 	if (DB_TYPE(mp) != M_MULTIDATA) {				\
1719 		(size) += MBLKL(mp);					\
1720 	} else {							\
1721 		uint_t	pinuse;						\
1722 									\
1723 		mmd_getsize(mmd_getmultidata(mp), NULL, &pinuse);	\
1724 		(size) += pinuse;					\
1725 	}
1726 
1727 /*
1728  * Like getq() but does not backenable.  This is used by the stream
1729  * head when a putback() is likely.  The caller must call qbackenable()
1730  * after it is done with accessing the queue.
1731  */
1732 mblk_t *
1733 getq_noenab(queue_t *q)
1734 {
1735 	mblk_t *bp;
1736 	mblk_t *tmp;
1737 	qband_t *qbp;
1738 	kthread_id_t freezer;
1739 	int	bytecnt = 0, mblkcnt = 0;
1740 
1741 	/* freezestr should allow its caller to call getq/putq */
1742 	freezer = STREAM(q)->sd_freezer;
1743 	if (freezer == curthread) {
1744 		ASSERT(frozenstr(q));
1745 		ASSERT(MUTEX_HELD(QLOCK(q)));
1746 	} else
1747 		mutex_enter(QLOCK(q));
1748 
1749 	if ((bp = q->q_first) == 0) {
1750 		q->q_flag |= QWANTR;
1751 	} else {
1752 		if ((q->q_first = bp->b_next) == NULL)
1753 			q->q_last = NULL;
1754 		else
1755 			q->q_first->b_prev = NULL;
1756 
1757 		/* Get message byte count for q_count accounting */
1758 		for (tmp = bp; tmp; tmp = tmp->b_cont) {
1759 			ADD_MBLK_SIZE(tmp, bytecnt);
1760 			mblkcnt++;
1761 		}
1762 
1763 		if (bp->b_band == 0) {
1764 			q->q_count -= bytecnt;
1765 			q->q_mblkcnt -= mblkcnt;
1766 			if ((q->q_count < q->q_hiwat) &&
1767 			    (q->q_mblkcnt < q->q_hiwat)) {
1768 				q->q_flag &= ~QFULL;
1769 			}
1770 		} else {
1771 			int i;
1772 
1773 			ASSERT(bp->b_band <= q->q_nband);
1774 			ASSERT(q->q_bandp != NULL);
1775 			ASSERT(MUTEX_HELD(QLOCK(q)));
1776 			qbp = q->q_bandp;
1777 			i = bp->b_band;
1778 			while (--i > 0)
1779 				qbp = qbp->qb_next;
1780 			if (qbp->qb_first == qbp->qb_last) {
1781 				qbp->qb_first = NULL;
1782 				qbp->qb_last = NULL;
1783 			} else {
1784 				qbp->qb_first = bp->b_next;
1785 			}
1786 			qbp->qb_count -= bytecnt;
1787 			qbp->qb_mblkcnt -= mblkcnt;
1788 			if ((qbp->qb_count < qbp->qb_hiwat) &&
1789 			    (qbp->qb_mblkcnt < qbp->qb_hiwat)) {
1790 				qbp->qb_flag &= ~QB_FULL;
1791 			}
1792 		}
1793 		q->q_flag &= ~QWANTR;
1794 		bp->b_next = NULL;
1795 		bp->b_prev = NULL;
1796 	}
1797 	if (freezer != curthread)
1798 		mutex_exit(QLOCK(q));
1799 
1800 	STR_FTEVENT_MSG(bp, q, FTEV_GETQ, NULL);
1801 
1802 	return (bp);
1803 }
1804 
1805 /*
1806  * Determine if a backenable is needed after removing a message in the
1807  * specified band.
1808  * NOTE: This routine assumes that something like getq_noenab() has been
1809  * already called.
1810  *
1811  * For the read side it is ok to hold sd_lock across calling this (and the
1812  * stream head often does).
1813  * But for the write side strwakeq might be invoked and it acquires sd_lock.
1814  */
1815 void
1816 qbackenable(queue_t *q, uchar_t band)
1817 {
1818 	int backenab = 0;
1819 	qband_t *qbp;
1820 	kthread_id_t freezer;
1821 
1822 	ASSERT(q);
1823 	ASSERT((q->q_flag & QREADR) || MUTEX_NOT_HELD(&STREAM(q)->sd_lock));
1824 
1825 	/*
1826 	 * Quick check without holding the lock.
1827 	 * OK since after getq() has lowered the q_count these flags
1828 	 * would not change unless either the qbackenable() is done by
1829 	 * another thread (which is ok) or the queue has gotten QFULL
1830 	 * in which case another backenable will take place when the queue
1831 	 * drops below q_lowat.
1832 	 */
1833 	if (band == 0 && (q->q_flag & (QWANTW|QWANTWSYNC)) == 0)
1834 		return;
1835 
1836 	/* freezestr should allow its caller to call getq/putq */
1837 	freezer = STREAM(q)->sd_freezer;
1838 	if (freezer == curthread) {
1839 		ASSERT(frozenstr(q));
1840 		ASSERT(MUTEX_HELD(QLOCK(q)));
1841 	} else
1842 		mutex_enter(QLOCK(q));
1843 
1844 	if (band == 0) {
1845 		if (q->q_lowat == 0 || (q->q_count < q->q_lowat &&
1846 		    q->q_mblkcnt < q->q_lowat)) {
1847 			backenab = q->q_flag & (QWANTW|QWANTWSYNC);
1848 		}
1849 	} else {
1850 		int i;
1851 
1852 		ASSERT((unsigned)band <= q->q_nband);
1853 		ASSERT(q->q_bandp != NULL);
1854 
1855 		qbp = q->q_bandp;
1856 		i = band;
1857 		while (--i > 0)
1858 			qbp = qbp->qb_next;
1859 
1860 		if (qbp->qb_lowat == 0 || (qbp->qb_count < qbp->qb_lowat &&
1861 		    qbp->qb_mblkcnt < qbp->qb_lowat)) {
1862 			backenab = qbp->qb_flag & QB_WANTW;
1863 		}
1864 	}
1865 
1866 	if (backenab == 0) {
1867 		if (freezer != curthread)
1868 			mutex_exit(QLOCK(q));
1869 		return;
1870 	}
1871 
1872 	/* Have to drop the lock across strwakeq and backenable */
1873 	if (backenab & QWANTWSYNC)
1874 		q->q_flag &= ~QWANTWSYNC;
1875 	if (backenab & (QWANTW|QB_WANTW)) {
1876 		if (band != 0)
1877 			qbp->qb_flag &= ~QB_WANTW;
1878 		else {
1879 			q->q_flag &= ~QWANTW;
1880 		}
1881 	}
1882 
1883 	if (freezer != curthread)
1884 		mutex_exit(QLOCK(q));
1885 
1886 	if (backenab & QWANTWSYNC)
1887 		strwakeq(q, QWANTWSYNC);
1888 	if (backenab & (QWANTW|QB_WANTW))
1889 		backenable(q, band);
1890 }
1891 
1892 /*
1893  * Remove a message from a queue.  The queue count and other
1894  * flow control parameters are adjusted and the back queue
1895  * enabled if necessary.
1896  *
1897  * rmvq can be called with the stream frozen, but other utility functions
1898  * holding QLOCK, and by streams modules without any locks/frozen.
1899  */
1900 void
1901 rmvq(queue_t *q, mblk_t *mp)
1902 {
1903 	ASSERT(mp != NULL);
1904 
1905 	rmvq_noenab(q, mp);
1906 	if (curthread != STREAM(q)->sd_freezer && MUTEX_HELD(QLOCK(q))) {
1907 		/*
1908 		 * qbackenable can handle a frozen stream but not a "random"
1909 		 * qlock being held. Drop lock across qbackenable.
1910 		 */
1911 		mutex_exit(QLOCK(q));
1912 		qbackenable(q, mp->b_band);
1913 		mutex_enter(QLOCK(q));
1914 	} else {
1915 		qbackenable(q, mp->b_band);
1916 	}
1917 }
1918 
1919 /*
1920  * Like rmvq() but without any backenabling.
1921  * This exists to handle SR_CONSOL_DATA in strrput().
1922  */
1923 void
1924 rmvq_noenab(queue_t *q, mblk_t *mp)
1925 {
1926 	mblk_t *tmp;
1927 	int i;
1928 	qband_t *qbp = NULL;
1929 	kthread_id_t freezer;
1930 	int	bytecnt = 0, mblkcnt = 0;
1931 
1932 	freezer = STREAM(q)->sd_freezer;
1933 	if (freezer == curthread) {
1934 		ASSERT(frozenstr(q));
1935 		ASSERT(MUTEX_HELD(QLOCK(q)));
1936 	} else if (MUTEX_HELD(QLOCK(q))) {
1937 		/* Don't drop lock on exit */
1938 		freezer = curthread;
1939 	} else
1940 		mutex_enter(QLOCK(q));
1941 
1942 	ASSERT(mp->b_band <= q->q_nband);
1943 	if (mp->b_band != 0) {		/* Adjust band pointers */
1944 		ASSERT(q->q_bandp != NULL);
1945 		qbp = q->q_bandp;
1946 		i = mp->b_band;
1947 		while (--i > 0)
1948 			qbp = qbp->qb_next;
1949 		if (mp == qbp->qb_first) {
1950 			if (mp->b_next && mp->b_band == mp->b_next->b_band)
1951 				qbp->qb_first = mp->b_next;
1952 			else
1953 				qbp->qb_first = NULL;
1954 		}
1955 		if (mp == qbp->qb_last) {
1956 			if (mp->b_prev && mp->b_band == mp->b_prev->b_band)
1957 				qbp->qb_last = mp->b_prev;
1958 			else
1959 				qbp->qb_last = NULL;
1960 		}
1961 	}
1962 
1963 	/*
1964 	 * Remove the message from the list.
1965 	 */
1966 	if (mp->b_prev)
1967 		mp->b_prev->b_next = mp->b_next;
1968 	else
1969 		q->q_first = mp->b_next;
1970 	if (mp->b_next)
1971 		mp->b_next->b_prev = mp->b_prev;
1972 	else
1973 		q->q_last = mp->b_prev;
1974 	mp->b_next = NULL;
1975 	mp->b_prev = NULL;
1976 
1977 	/* Get the size of the message for q_count accounting */
1978 	for (tmp = mp; tmp; tmp = tmp->b_cont) {
1979 		ADD_MBLK_SIZE(tmp, bytecnt);
1980 		mblkcnt++;
1981 	}
1982 
1983 	if (mp->b_band == 0) {		/* Perform q_count accounting */
1984 		q->q_count -= bytecnt;
1985 		q->q_mblkcnt -= mblkcnt;
1986 		if ((q->q_count < q->q_hiwat) &&
1987 		    (q->q_mblkcnt < q->q_hiwat)) {
1988 			q->q_flag &= ~QFULL;
1989 		}
1990 	} else {			/* Perform qb_count accounting */
1991 		qbp->qb_count -= bytecnt;
1992 		qbp->qb_mblkcnt -= mblkcnt;
1993 		if ((qbp->qb_count < qbp->qb_hiwat) &&
1994 		    (qbp->qb_mblkcnt < qbp->qb_hiwat)) {
1995 			qbp->qb_flag &= ~QB_FULL;
1996 		}
1997 	}
1998 	if (freezer != curthread)
1999 		mutex_exit(QLOCK(q));
2000 
2001 	STR_FTEVENT_MSG(mp, q, FTEV_RMVQ, NULL);
2002 }
2003 
2004 /*
2005  * Empty a queue.
2006  * If flag is set, remove all messages.  Otherwise, remove
2007  * only non-control messages.  If queue falls below its low
2008  * water mark, and QWANTW is set, enable the nearest upstream
2009  * service procedure.
2010  *
2011  * Historical note: when merging the M_FLUSH code in strrput with this
2012  * code one difference was discovered. flushq did not have a check
2013  * for q_lowat == 0 in the backenabling test.
2014  *
2015  * pcproto_flag specifies whether or not a M_PCPROTO message should be flushed
2016  * if one exists on the queue.
2017  */
2018 void
2019 flushq_common(queue_t *q, int flag, int pcproto_flag)
2020 {
2021 	mblk_t *mp, *nmp;
2022 	qband_t *qbp;
2023 	int backenab = 0;
2024 	unsigned char bpri;
2025 	unsigned char	qbf[NBAND];	/* band flushing backenable flags */
2026 
2027 	if (q->q_first == NULL)
2028 		return;
2029 
2030 	mutex_enter(QLOCK(q));
2031 	mp = q->q_first;
2032 	q->q_first = NULL;
2033 	q->q_last = NULL;
2034 	q->q_count = 0;
2035 	q->q_mblkcnt = 0;
2036 	for (qbp = q->q_bandp; qbp; qbp = qbp->qb_next) {
2037 		qbp->qb_first = NULL;
2038 		qbp->qb_last = NULL;
2039 		qbp->qb_count = 0;
2040 		qbp->qb_mblkcnt = 0;
2041 		qbp->qb_flag &= ~QB_FULL;
2042 	}
2043 	q->q_flag &= ~QFULL;
2044 	mutex_exit(QLOCK(q));
2045 	while (mp) {
2046 		nmp = mp->b_next;
2047 		mp->b_next = mp->b_prev = NULL;
2048 
2049 		STR_FTEVENT_MBLK(mp, q, FTEV_FLUSHQ, NULL);
2050 
2051 		if (pcproto_flag && (mp->b_datap->db_type == M_PCPROTO))
2052 			(void) putq(q, mp);
2053 		else if (flag || datamsg(mp->b_datap->db_type))
2054 			freemsg(mp);
2055 		else
2056 			(void) putq(q, mp);
2057 		mp = nmp;
2058 	}
2059 	bpri = 1;
2060 	mutex_enter(QLOCK(q));
2061 	for (qbp = q->q_bandp; qbp; qbp = qbp->qb_next) {
2062 		if ((qbp->qb_flag & QB_WANTW) &&
2063 		    (((qbp->qb_count < qbp->qb_lowat) &&
2064 		    (qbp->qb_mblkcnt < qbp->qb_lowat)) ||
2065 		    qbp->qb_lowat == 0)) {
2066 			qbp->qb_flag &= ~QB_WANTW;
2067 			backenab = 1;
2068 			qbf[bpri] = 1;
2069 		} else
2070 			qbf[bpri] = 0;
2071 		bpri++;
2072 	}
2073 	ASSERT(bpri == (unsigned char)(q->q_nband + 1));
2074 	if ((q->q_flag & QWANTW) &&
2075 	    (((q->q_count < q->q_lowat) &&
2076 	    (q->q_mblkcnt < q->q_lowat)) || q->q_lowat == 0)) {
2077 		q->q_flag &= ~QWANTW;
2078 		backenab = 1;
2079 		qbf[0] = 1;
2080 	} else
2081 		qbf[0] = 0;
2082 
2083 	/*
2084 	 * If any band can now be written to, and there is a writer
2085 	 * for that band, then backenable the closest service procedure.
2086 	 */
2087 	if (backenab) {
2088 		mutex_exit(QLOCK(q));
2089 		for (bpri = q->q_nband; bpri != 0; bpri--)
2090 			if (qbf[bpri])
2091 				backenable(q, bpri);
2092 		if (qbf[0])
2093 			backenable(q, 0);
2094 	} else
2095 		mutex_exit(QLOCK(q));
2096 }
2097 
2098 /*
2099  * The real flushing takes place in flushq_common. This is done so that
2100  * a flag which specifies whether or not M_PCPROTO messages should be flushed
2101  * or not. Currently the only place that uses this flag is the stream head.
2102  */
2103 void
2104 flushq(queue_t *q, int flag)
2105 {
2106 	flushq_common(q, flag, 0);
2107 }
2108 
2109 /*
2110  * Flush the queue of messages of the given priority band.
2111  * There is some duplication of code between flushq and flushband.
2112  * This is because we want to optimize the code as much as possible.
2113  * The assumption is that there will be more messages in the normal
2114  * (priority 0) band than in any other.
2115  *
2116  * Historical note: when merging the M_FLUSH code in strrput with this
2117  * code one difference was discovered. flushband had an extra check for
2118  * did not have a check for (mp->b_datap->db_type < QPCTL) in the band 0
2119  * case. That check does not match the man page for flushband and was not
2120  * in the strrput flush code hence it was removed.
2121  */
2122 void
2123 flushband(queue_t *q, unsigned char pri, int flag)
2124 {
2125 	mblk_t *mp;
2126 	mblk_t *nmp;
2127 	mblk_t *last;
2128 	qband_t *qbp;
2129 	int band;
2130 
2131 	ASSERT((flag == FLUSHDATA) || (flag == FLUSHALL));
2132 	if (pri > q->q_nband) {
2133 		return;
2134 	}
2135 	mutex_enter(QLOCK(q));
2136 	if (pri == 0) {
2137 		mp = q->q_first;
2138 		q->q_first = NULL;
2139 		q->q_last = NULL;
2140 		q->q_count = 0;
2141 		q->q_mblkcnt = 0;
2142 		for (qbp = q->q_bandp; qbp; qbp = qbp->qb_next) {
2143 			qbp->qb_first = NULL;
2144 			qbp->qb_last = NULL;
2145 			qbp->qb_count = 0;
2146 			qbp->qb_mblkcnt = 0;
2147 			qbp->qb_flag &= ~QB_FULL;
2148 		}
2149 		q->q_flag &= ~QFULL;
2150 		mutex_exit(QLOCK(q));
2151 		while (mp) {
2152 			nmp = mp->b_next;
2153 			mp->b_next = mp->b_prev = NULL;
2154 			if ((mp->b_band == 0) &&
2155 				((flag == FLUSHALL) ||
2156 				datamsg(mp->b_datap->db_type)))
2157 				freemsg(mp);
2158 			else
2159 				(void) putq(q, mp);
2160 			mp = nmp;
2161 		}
2162 		mutex_enter(QLOCK(q));
2163 		if ((q->q_flag & QWANTW) &&
2164 		    (((q->q_count < q->q_lowat) &&
2165 		    (q->q_mblkcnt < q->q_lowat)) || q->q_lowat == 0)) {
2166 			q->q_flag &= ~QWANTW;
2167 			mutex_exit(QLOCK(q));
2168 
2169 			backenable(q, pri);
2170 		} else
2171 			mutex_exit(QLOCK(q));
2172 	} else {	/* pri != 0 */
2173 		boolean_t flushed = B_FALSE;
2174 		band = pri;
2175 
2176 		ASSERT(MUTEX_HELD(QLOCK(q)));
2177 		qbp = q->q_bandp;
2178 		while (--band > 0)
2179 			qbp = qbp->qb_next;
2180 		mp = qbp->qb_first;
2181 		if (mp == NULL) {
2182 			mutex_exit(QLOCK(q));
2183 			return;
2184 		}
2185 		last = qbp->qb_last->b_next;
2186 		/*
2187 		 * rmvq_noenab() and freemsg() are called for each mblk that
2188 		 * meets the criteria.  The loop is executed until the last
2189 		 * mblk has been processed.
2190 		 */
2191 		while (mp != last) {
2192 			ASSERT(mp->b_band == pri);
2193 			nmp = mp->b_next;
2194 			if (flag == FLUSHALL || datamsg(mp->b_datap->db_type)) {
2195 				rmvq_noenab(q, mp);
2196 				freemsg(mp);
2197 				flushed = B_TRUE;
2198 			}
2199 			mp = nmp;
2200 		}
2201 		mutex_exit(QLOCK(q));
2202 
2203 		/*
2204 		 * If any mblk(s) has been freed, we know that qbackenable()
2205 		 * will need to be called.
2206 		 */
2207 		if (flushed)
2208 			qbackenable(q, pri);
2209 	}
2210 }
2211 
2212 /*
2213  * Return 1 if the queue is not full.  If the queue is full, return
2214  * 0 (may not put message) and set QWANTW flag (caller wants to write
2215  * to the queue).
2216  */
2217 int
2218 canput(queue_t *q)
2219 {
2220 	TRACE_1(TR_FAC_STREAMS_FR, TR_CANPUT_IN, "canput:%p", q);
2221 
2222 	/* this is for loopback transports, they should not do a canput */
2223 	ASSERT(STRMATED(q->q_stream) || STREAM(q) == STREAM(q->q_nfsrv));
2224 
2225 	/* Find next forward module that has a service procedure */
2226 	q = q->q_nfsrv;
2227 
2228 	if (!(q->q_flag & QFULL)) {
2229 		TRACE_2(TR_FAC_STREAMS_FR, TR_CANPUT_OUT, "canput:%p %d", q, 1);
2230 		return (1);
2231 	}
2232 	mutex_enter(QLOCK(q));
2233 	if (q->q_flag & QFULL) {
2234 		q->q_flag |= QWANTW;
2235 		mutex_exit(QLOCK(q));
2236 		TRACE_2(TR_FAC_STREAMS_FR, TR_CANPUT_OUT, "canput:%p %d", q, 0);
2237 		return (0);
2238 	}
2239 	mutex_exit(QLOCK(q));
2240 	TRACE_2(TR_FAC_STREAMS_FR, TR_CANPUT_OUT, "canput:%p %d", q, 1);
2241 	return (1);
2242 }
2243 
2244 /*
2245  * This is the new canput for use with priority bands.  Return 1 if the
2246  * band is not full.  If the band is full, return 0 (may not put message)
2247  * and set QWANTW(QB_WANTW) flag for zero(non-zero) band (caller wants to
2248  * write to the queue).
2249  */
2250 int
2251 bcanput(queue_t *q, unsigned char pri)
2252 {
2253 	qband_t *qbp;
2254 
2255 	TRACE_2(TR_FAC_STREAMS_FR, TR_BCANPUT_IN, "bcanput:%p %p", q, pri);
2256 	if (!q)
2257 		return (0);
2258 
2259 	/* Find next forward module that has a service procedure */
2260 	q = q->q_nfsrv;
2261 
2262 	mutex_enter(QLOCK(q));
2263 	if (pri == 0) {
2264 		if (q->q_flag & QFULL) {
2265 			q->q_flag |= QWANTW;
2266 			mutex_exit(QLOCK(q));
2267 			TRACE_3(TR_FAC_STREAMS_FR, TR_BCANPUT_OUT,
2268 				"bcanput:%p %X %d", q, pri, 0);
2269 			return (0);
2270 		}
2271 	} else {	/* pri != 0 */
2272 		if (pri > q->q_nband) {
2273 			/*
2274 			 * No band exists yet, so return success.
2275 			 */
2276 			mutex_exit(QLOCK(q));
2277 			TRACE_3(TR_FAC_STREAMS_FR, TR_BCANPUT_OUT,
2278 				"bcanput:%p %X %d", q, pri, 1);
2279 			return (1);
2280 		}
2281 		qbp = q->q_bandp;
2282 		while (--pri)
2283 			qbp = qbp->qb_next;
2284 		if (qbp->qb_flag & QB_FULL) {
2285 			qbp->qb_flag |= QB_WANTW;
2286 			mutex_exit(QLOCK(q));
2287 			TRACE_3(TR_FAC_STREAMS_FR, TR_BCANPUT_OUT,
2288 				"bcanput:%p %X %d", q, pri, 0);
2289 			return (0);
2290 		}
2291 	}
2292 	mutex_exit(QLOCK(q));
2293 	TRACE_3(TR_FAC_STREAMS_FR, TR_BCANPUT_OUT,
2294 		"bcanput:%p %X %d", q, pri, 1);
2295 	return (1);
2296 }
2297 
2298 /*
2299  * Put a message on a queue.
2300  *
2301  * Messages are enqueued on a priority basis.  The priority classes
2302  * are HIGH PRIORITY (type >= QPCTL), PRIORITY (type < QPCTL && band > 0),
2303  * and B_NORMAL (type < QPCTL && band == 0).
2304  *
2305  * Add appropriate weighted data block sizes to queue count.
2306  * If queue hits high water mark then set QFULL flag.
2307  *
2308  * If QNOENAB is not set (putq is allowed to enable the queue),
2309  * enable the queue only if the message is PRIORITY,
2310  * or the QWANTR flag is set (indicating that the service procedure
2311  * is ready to read the queue.  This implies that a service
2312  * procedure must NEVER put a high priority message back on its own
2313  * queue, as this would result in an infinite loop (!).
2314  */
2315 int
2316 putq(queue_t *q, mblk_t *bp)
2317 {
2318 	mblk_t *tmp;
2319 	qband_t *qbp = NULL;
2320 	int mcls = (int)queclass(bp);
2321 	kthread_id_t freezer;
2322 	int	bytecnt = 0, mblkcnt = 0;
2323 
2324 	freezer = STREAM(q)->sd_freezer;
2325 	if (freezer == curthread) {
2326 		ASSERT(frozenstr(q));
2327 		ASSERT(MUTEX_HELD(QLOCK(q)));
2328 	} else
2329 		mutex_enter(QLOCK(q));
2330 
2331 	/*
2332 	 * Make sanity checks and if qband structure is not yet
2333 	 * allocated, do so.
2334 	 */
2335 	if (mcls == QPCTL) {
2336 		if (bp->b_band != 0)
2337 			bp->b_band = 0;		/* force to be correct */
2338 	} else if (bp->b_band != 0) {
2339 		int i;
2340 		qband_t **qbpp;
2341 
2342 		if (bp->b_band > q->q_nband) {
2343 
2344 			/*
2345 			 * The qband structure for this priority band is
2346 			 * not on the queue yet, so we have to allocate
2347 			 * one on the fly.  It would be wasteful to
2348 			 * associate the qband structures with every
2349 			 * queue when the queues are allocated.  This is
2350 			 * because most queues will only need the normal
2351 			 * band of flow which can be described entirely
2352 			 * by the queue itself.
2353 			 */
2354 			qbpp = &q->q_bandp;
2355 			while (*qbpp)
2356 				qbpp = &(*qbpp)->qb_next;
2357 			while (bp->b_band > q->q_nband) {
2358 				if ((*qbpp = allocband()) == NULL) {
2359 					if (freezer != curthread)
2360 						mutex_exit(QLOCK(q));
2361 					return (0);
2362 				}
2363 				(*qbpp)->qb_hiwat = q->q_hiwat;
2364 				(*qbpp)->qb_lowat = q->q_lowat;
2365 				q->q_nband++;
2366 				qbpp = &(*qbpp)->qb_next;
2367 			}
2368 		}
2369 		ASSERT(MUTEX_HELD(QLOCK(q)));
2370 		qbp = q->q_bandp;
2371 		i = bp->b_band;
2372 		while (--i)
2373 			qbp = qbp->qb_next;
2374 	}
2375 
2376 	/*
2377 	 * If queue is empty, add the message and initialize the pointers.
2378 	 * Otherwise, adjust message pointers and queue pointers based on
2379 	 * the type of the message and where it belongs on the queue.  Some
2380 	 * code is duplicated to minimize the number of conditionals and
2381 	 * hopefully minimize the amount of time this routine takes.
2382 	 */
2383 	if (!q->q_first) {
2384 		bp->b_next = NULL;
2385 		bp->b_prev = NULL;
2386 		q->q_first = bp;
2387 		q->q_last = bp;
2388 		if (qbp) {
2389 			qbp->qb_first = bp;
2390 			qbp->qb_last = bp;
2391 		}
2392 	} else if (!qbp) {	/* bp->b_band == 0 */
2393 
2394 		/*
2395 		 * If queue class of message is less than or equal to
2396 		 * that of the last one on the queue, tack on to the end.
2397 		 */
2398 		tmp = q->q_last;
2399 		if (mcls <= (int)queclass(tmp)) {
2400 			bp->b_next = NULL;
2401 			bp->b_prev = tmp;
2402 			tmp->b_next = bp;
2403 			q->q_last = bp;
2404 		} else {
2405 			tmp = q->q_first;
2406 			while ((int)queclass(tmp) >= mcls)
2407 				tmp = tmp->b_next;
2408 
2409 			/*
2410 			 * Insert bp before tmp.
2411 			 */
2412 			bp->b_next = tmp;
2413 			bp->b_prev = tmp->b_prev;
2414 			if (tmp->b_prev)
2415 				tmp->b_prev->b_next = bp;
2416 			else
2417 				q->q_first = bp;
2418 			tmp->b_prev = bp;
2419 		}
2420 	} else {		/* bp->b_band != 0 */
2421 		if (qbp->qb_first) {
2422 			tmp = qbp->qb_last;
2423 
2424 			/*
2425 			 * Insert bp after the last message in this band.
2426 			 */
2427 			bp->b_next = tmp->b_next;
2428 			if (tmp->b_next)
2429 				tmp->b_next->b_prev = bp;
2430 			else
2431 				q->q_last = bp;
2432 			bp->b_prev = tmp;
2433 			tmp->b_next = bp;
2434 		} else {
2435 			tmp = q->q_last;
2436 			if ((mcls < (int)queclass(tmp)) ||
2437 			    (bp->b_band <= tmp->b_band)) {
2438 
2439 				/*
2440 				 * Tack bp on end of queue.
2441 				 */
2442 				bp->b_next = NULL;
2443 				bp->b_prev = tmp;
2444 				tmp->b_next = bp;
2445 				q->q_last = bp;
2446 			} else {
2447 				tmp = q->q_first;
2448 				while (tmp->b_datap->db_type >= QPCTL)
2449 					tmp = tmp->b_next;
2450 				while (tmp->b_band >= bp->b_band)
2451 					tmp = tmp->b_next;
2452 
2453 				/*
2454 				 * Insert bp before tmp.
2455 				 */
2456 				bp->b_next = tmp;
2457 				bp->b_prev = tmp->b_prev;
2458 				if (tmp->b_prev)
2459 					tmp->b_prev->b_next = bp;
2460 				else
2461 					q->q_first = bp;
2462 				tmp->b_prev = bp;
2463 			}
2464 			qbp->qb_first = bp;
2465 		}
2466 		qbp->qb_last = bp;
2467 	}
2468 
2469 	/* Get message byte count for q_count accounting */
2470 	for (tmp = bp; tmp; tmp = tmp->b_cont) {
2471 		ADD_MBLK_SIZE(tmp, bytecnt);
2472 		mblkcnt++;
2473 	}
2474 
2475 	if (qbp) {
2476 		qbp->qb_count += bytecnt;
2477 		qbp->qb_mblkcnt += mblkcnt;
2478 		if ((qbp->qb_count >= qbp->qb_hiwat) ||
2479 		    (qbp->qb_mblkcnt >= qbp->qb_hiwat)) {
2480 			qbp->qb_flag |= QB_FULL;
2481 		}
2482 	} else {
2483 		q->q_count += bytecnt;
2484 		q->q_mblkcnt += mblkcnt;
2485 		if ((q->q_count >= q->q_hiwat) ||
2486 		    (q->q_mblkcnt >= q->q_hiwat)) {
2487 			q->q_flag |= QFULL;
2488 		}
2489 	}
2490 
2491 	STR_FTEVENT_MSG(bp, q, FTEV_PUTQ, NULL);
2492 
2493 	if ((mcls > QNORM) ||
2494 	    (canenable(q) && (q->q_flag & QWANTR || bp->b_band)))
2495 		qenable_locked(q);
2496 	ASSERT(MUTEX_HELD(QLOCK(q)));
2497 	if (freezer != curthread)
2498 		mutex_exit(QLOCK(q));
2499 
2500 	return (1);
2501 }
2502 
2503 /*
2504  * Put stuff back at beginning of Q according to priority order.
2505  * See comment on putq above for details.
2506  */
2507 int
2508 putbq(queue_t *q, mblk_t *bp)
2509 {
2510 	mblk_t *tmp;
2511 	qband_t *qbp = NULL;
2512 	int mcls = (int)queclass(bp);
2513 	kthread_id_t freezer;
2514 	int	bytecnt = 0, mblkcnt = 0;
2515 
2516 	ASSERT(q && bp);
2517 	ASSERT(bp->b_next == NULL);
2518 	freezer = STREAM(q)->sd_freezer;
2519 	if (freezer == curthread) {
2520 		ASSERT(frozenstr(q));
2521 		ASSERT(MUTEX_HELD(QLOCK(q)));
2522 	} else
2523 		mutex_enter(QLOCK(q));
2524 
2525 	/*
2526 	 * Make sanity checks and if qband structure is not yet
2527 	 * allocated, do so.
2528 	 */
2529 	if (mcls == QPCTL) {
2530 		if (bp->b_band != 0)
2531 			bp->b_band = 0;		/* force to be correct */
2532 	} else if (bp->b_band != 0) {
2533 		int i;
2534 		qband_t **qbpp;
2535 
2536 		if (bp->b_band > q->q_nband) {
2537 			qbpp = &q->q_bandp;
2538 			while (*qbpp)
2539 				qbpp = &(*qbpp)->qb_next;
2540 			while (bp->b_band > q->q_nband) {
2541 				if ((*qbpp = allocband()) == NULL) {
2542 					if (freezer != curthread)
2543 						mutex_exit(QLOCK(q));
2544 					return (0);
2545 				}
2546 				(*qbpp)->qb_hiwat = q->q_hiwat;
2547 				(*qbpp)->qb_lowat = q->q_lowat;
2548 				q->q_nband++;
2549 				qbpp = &(*qbpp)->qb_next;
2550 			}
2551 		}
2552 		qbp = q->q_bandp;
2553 		i = bp->b_band;
2554 		while (--i)
2555 			qbp = qbp->qb_next;
2556 	}
2557 
2558 	/*
2559 	 * If queue is empty or if message is high priority,
2560 	 * place on the front of the queue.
2561 	 */
2562 	tmp = q->q_first;
2563 	if ((!tmp) || (mcls == QPCTL)) {
2564 		bp->b_next = tmp;
2565 		if (tmp)
2566 			tmp->b_prev = bp;
2567 		else
2568 			q->q_last = bp;
2569 		q->q_first = bp;
2570 		bp->b_prev = NULL;
2571 		if (qbp) {
2572 			qbp->qb_first = bp;
2573 			qbp->qb_last = bp;
2574 		}
2575 	} else if (qbp) {	/* bp->b_band != 0 */
2576 		tmp = qbp->qb_first;
2577 		if (tmp) {
2578 
2579 			/*
2580 			 * Insert bp before the first message in this band.
2581 			 */
2582 			bp->b_next = tmp;
2583 			bp->b_prev = tmp->b_prev;
2584 			if (tmp->b_prev)
2585 				tmp->b_prev->b_next = bp;
2586 			else
2587 				q->q_first = bp;
2588 			tmp->b_prev = bp;
2589 		} else {
2590 			tmp = q->q_last;
2591 			if ((mcls < (int)queclass(tmp)) ||
2592 			    (bp->b_band < tmp->b_band)) {
2593 
2594 				/*
2595 				 * Tack bp on end of queue.
2596 				 */
2597 				bp->b_next = NULL;
2598 				bp->b_prev = tmp;
2599 				tmp->b_next = bp;
2600 				q->q_last = bp;
2601 			} else {
2602 				tmp = q->q_first;
2603 				while (tmp->b_datap->db_type >= QPCTL)
2604 					tmp = tmp->b_next;
2605 				while (tmp->b_band > bp->b_band)
2606 					tmp = tmp->b_next;
2607 
2608 				/*
2609 				 * Insert bp before tmp.
2610 				 */
2611 				bp->b_next = tmp;
2612 				bp->b_prev = tmp->b_prev;
2613 				if (tmp->b_prev)
2614 					tmp->b_prev->b_next = bp;
2615 				else
2616 					q->q_first = bp;
2617 				tmp->b_prev = bp;
2618 			}
2619 			qbp->qb_last = bp;
2620 		}
2621 		qbp->qb_first = bp;
2622 	} else {		/* bp->b_band == 0 && !QPCTL */
2623 
2624 		/*
2625 		 * If the queue class or band is less than that of the last
2626 		 * message on the queue, tack bp on the end of the queue.
2627 		 */
2628 		tmp = q->q_last;
2629 		if ((mcls < (int)queclass(tmp)) || (bp->b_band < tmp->b_band)) {
2630 			bp->b_next = NULL;
2631 			bp->b_prev = tmp;
2632 			tmp->b_next = bp;
2633 			q->q_last = bp;
2634 		} else {
2635 			tmp = q->q_first;
2636 			while (tmp->b_datap->db_type >= QPCTL)
2637 				tmp = tmp->b_next;
2638 			while (tmp->b_band > bp->b_band)
2639 				tmp = tmp->b_next;
2640 
2641 			/*
2642 			 * Insert bp before tmp.
2643 			 */
2644 			bp->b_next = tmp;
2645 			bp->b_prev = tmp->b_prev;
2646 			if (tmp->b_prev)
2647 				tmp->b_prev->b_next = bp;
2648 			else
2649 				q->q_first = bp;
2650 			tmp->b_prev = bp;
2651 		}
2652 	}
2653 
2654 	/* Get message byte count for q_count accounting */
2655 	for (tmp = bp; tmp; tmp = tmp->b_cont) {
2656 		ADD_MBLK_SIZE(tmp, bytecnt);
2657 		mblkcnt++;
2658 	}
2659 	if (qbp) {
2660 		qbp->qb_count += bytecnt;
2661 		qbp->qb_mblkcnt += mblkcnt;
2662 		if ((qbp->qb_count >= qbp->qb_hiwat) ||
2663 		    (qbp->qb_mblkcnt >= qbp->qb_hiwat)) {
2664 			qbp->qb_flag |= QB_FULL;
2665 		}
2666 	} else {
2667 		q->q_count += bytecnt;
2668 		q->q_mblkcnt += mblkcnt;
2669 		if ((q->q_count >= q->q_hiwat) ||
2670 		    (q->q_mblkcnt >= q->q_hiwat)) {
2671 			q->q_flag |= QFULL;
2672 		}
2673 	}
2674 
2675 	STR_FTEVENT_MSG(bp, q, FTEV_PUTBQ, NULL);
2676 
2677 	if ((mcls > QNORM) || (canenable(q) && (q->q_flag & QWANTR)))
2678 		qenable_locked(q);
2679 	ASSERT(MUTEX_HELD(QLOCK(q)));
2680 	if (freezer != curthread)
2681 		mutex_exit(QLOCK(q));
2682 
2683 	return (1);
2684 }
2685 
2686 /*
2687  * Insert a message before an existing message on the queue.  If the
2688  * existing message is NULL, the new messages is placed on the end of
2689  * the queue.  The queue class of the new message is ignored.  However,
2690  * the priority band of the new message must adhere to the following
2691  * ordering:
2692  *
2693  *	emp->b_prev->b_band >= mp->b_band >= emp->b_band.
2694  *
2695  * All flow control parameters are updated.
2696  *
2697  * insq can be called with the stream frozen, but other utility functions
2698  * holding QLOCK, and by streams modules without any locks/frozen.
2699  */
2700 int
2701 insq(queue_t *q, mblk_t *emp, mblk_t *mp)
2702 {
2703 	mblk_t *tmp;
2704 	qband_t *qbp = NULL;
2705 	int mcls = (int)queclass(mp);
2706 	kthread_id_t freezer;
2707 	int	bytecnt = 0, mblkcnt = 0;
2708 
2709 	freezer = STREAM(q)->sd_freezer;
2710 	if (freezer == curthread) {
2711 		ASSERT(frozenstr(q));
2712 		ASSERT(MUTEX_HELD(QLOCK(q)));
2713 	} else if (MUTEX_HELD(QLOCK(q))) {
2714 		/* Don't drop lock on exit */
2715 		freezer = curthread;
2716 	} else
2717 		mutex_enter(QLOCK(q));
2718 
2719 	if (mcls == QPCTL) {
2720 		if (mp->b_band != 0)
2721 			mp->b_band = 0;		/* force to be correct */
2722 		if (emp && emp->b_prev &&
2723 		    (emp->b_prev->b_datap->db_type < QPCTL))
2724 			goto badord;
2725 	}
2726 	if (emp) {
2727 		if (((mcls == QNORM) && (mp->b_band < emp->b_band)) ||
2728 		    (emp->b_prev && (emp->b_prev->b_datap->db_type < QPCTL) &&
2729 		    (emp->b_prev->b_band < mp->b_band))) {
2730 			goto badord;
2731 		}
2732 	} else {
2733 		tmp = q->q_last;
2734 		if (tmp && (mcls == QNORM) && (mp->b_band > tmp->b_band)) {
2735 badord:
2736 			cmn_err(CE_WARN,
2737 			    "insq: attempt to insert message out of order "
2738 			    "on q %p", (void *)q);
2739 			if (freezer != curthread)
2740 				mutex_exit(QLOCK(q));
2741 			return (0);
2742 		}
2743 	}
2744 
2745 	if (mp->b_band != 0) {
2746 		int i;
2747 		qband_t **qbpp;
2748 
2749 		if (mp->b_band > q->q_nband) {
2750 			qbpp = &q->q_bandp;
2751 			while (*qbpp)
2752 				qbpp = &(*qbpp)->qb_next;
2753 			while (mp->b_band > q->q_nband) {
2754 				if ((*qbpp = allocband()) == NULL) {
2755 					if (freezer != curthread)
2756 						mutex_exit(QLOCK(q));
2757 					return (0);
2758 				}
2759 				(*qbpp)->qb_hiwat = q->q_hiwat;
2760 				(*qbpp)->qb_lowat = q->q_lowat;
2761 				q->q_nband++;
2762 				qbpp = &(*qbpp)->qb_next;
2763 			}
2764 		}
2765 		qbp = q->q_bandp;
2766 		i = mp->b_band;
2767 		while (--i)
2768 			qbp = qbp->qb_next;
2769 	}
2770 
2771 	if ((mp->b_next = emp) != NULL) {
2772 		if ((mp->b_prev = emp->b_prev) != NULL)
2773 			emp->b_prev->b_next = mp;
2774 		else
2775 			q->q_first = mp;
2776 		emp->b_prev = mp;
2777 	} else {
2778 		if ((mp->b_prev = q->q_last) != NULL)
2779 			q->q_last->b_next = mp;
2780 		else
2781 			q->q_first = mp;
2782 		q->q_last = mp;
2783 	}
2784 
2785 	/* Get mblk and byte count for q_count accounting */
2786 	for (tmp = mp; tmp; tmp = tmp->b_cont) {
2787 		ADD_MBLK_SIZE(tmp, bytecnt);
2788 		mblkcnt++;
2789 	}
2790 
2791 	if (qbp) {	/* adjust qband pointers and count */
2792 		if (!qbp->qb_first) {
2793 			qbp->qb_first = mp;
2794 			qbp->qb_last = mp;
2795 		} else {
2796 			if (mp->b_prev == NULL || (mp->b_prev != NULL &&
2797 			    (mp->b_prev->b_band != mp->b_band)))
2798 				qbp->qb_first = mp;
2799 			else if (mp->b_next == NULL || (mp->b_next != NULL &&
2800 			    (mp->b_next->b_band != mp->b_band)))
2801 				qbp->qb_last = mp;
2802 		}
2803 		qbp->qb_count += bytecnt;
2804 		qbp->qb_mblkcnt += mblkcnt;
2805 		if ((qbp->qb_count >= qbp->qb_hiwat) ||
2806 		    (qbp->qb_mblkcnt >= qbp->qb_hiwat)) {
2807 			qbp->qb_flag |= QB_FULL;
2808 		}
2809 	} else {
2810 		q->q_count += bytecnt;
2811 		q->q_mblkcnt += mblkcnt;
2812 		if ((q->q_count >= q->q_hiwat) ||
2813 		    (q->q_mblkcnt >= q->q_hiwat)) {
2814 			q->q_flag |= QFULL;
2815 		}
2816 	}
2817 
2818 	STR_FTEVENT_MSG(mp, q, FTEV_INSQ, NULL);
2819 
2820 	if (canenable(q) && (q->q_flag & QWANTR))
2821 		qenable_locked(q);
2822 
2823 	ASSERT(MUTEX_HELD(QLOCK(q)));
2824 	if (freezer != curthread)
2825 		mutex_exit(QLOCK(q));
2826 
2827 	return (1);
2828 }
2829 
2830 /*
2831  * Create and put a control message on queue.
2832  */
2833 int
2834 putctl(queue_t *q, int type)
2835 {
2836 	mblk_t *bp;
2837 
2838 	if ((datamsg(type) && (type != M_DELAY)) ||
2839 	    (bp = allocb_tryhard(0)) == NULL)
2840 		return (0);
2841 	bp->b_datap->db_type = (unsigned char) type;
2842 
2843 	put(q, bp);
2844 
2845 	return (1);
2846 }
2847 
2848 /*
2849  * Control message with a single-byte parameter
2850  */
2851 int
2852 putctl1(queue_t *q, int type, int param)
2853 {
2854 	mblk_t *bp;
2855 
2856 	if ((datamsg(type) && (type != M_DELAY)) ||
2857 	    (bp = allocb_tryhard(1)) == NULL)
2858 		return (0);
2859 	bp->b_datap->db_type = (unsigned char)type;
2860 	*bp->b_wptr++ = (unsigned char)param;
2861 
2862 	put(q, bp);
2863 
2864 	return (1);
2865 }
2866 
2867 int
2868 putnextctl1(queue_t *q, int type, int param)
2869 {
2870 	mblk_t *bp;
2871 
2872 	if ((datamsg(type) && (type != M_DELAY)) ||
2873 		((bp = allocb_tryhard(1)) == NULL))
2874 		return (0);
2875 
2876 	bp->b_datap->db_type = (unsigned char)type;
2877 	*bp->b_wptr++ = (unsigned char)param;
2878 
2879 	putnext(q, bp);
2880 
2881 	return (1);
2882 }
2883 
2884 int
2885 putnextctl(queue_t *q, int type)
2886 {
2887 	mblk_t *bp;
2888 
2889 	if ((datamsg(type) && (type != M_DELAY)) ||
2890 		((bp = allocb_tryhard(0)) == NULL))
2891 		return (0);
2892 	bp->b_datap->db_type = (unsigned char)type;
2893 
2894 	putnext(q, bp);
2895 
2896 	return (1);
2897 }
2898 
2899 /*
2900  * Return the queue upstream from this one
2901  */
2902 queue_t *
2903 backq(queue_t *q)
2904 {
2905 	q = _OTHERQ(q);
2906 	if (q->q_next) {
2907 		q = q->q_next;
2908 		return (_OTHERQ(q));
2909 	}
2910 	return (NULL);
2911 }
2912 
2913 /*
2914  * Send a block back up the queue in reverse from this
2915  * one (e.g. to respond to ioctls)
2916  */
2917 void
2918 qreply(queue_t *q, mblk_t *bp)
2919 {
2920 	ASSERT(q && bp);
2921 
2922 	putnext(_OTHERQ(q), bp);
2923 }
2924 
2925 /*
2926  * Streams Queue Scheduling
2927  *
2928  * Queues are enabled through qenable() when they have messages to
2929  * process.  They are serviced by queuerun(), which runs each enabled
2930  * queue's service procedure.  The call to queuerun() is processor
2931  * dependent - the general principle is that it be run whenever a queue
2932  * is enabled but before returning to user level.  For system calls,
2933  * the function runqueues() is called if their action causes a queue
2934  * to be enabled.  For device interrupts, queuerun() should be
2935  * called before returning from the last level of interrupt.  Beyond
2936  * this, no timing assumptions should be made about queue scheduling.
2937  */
2938 
2939 /*
2940  * Enable a queue: put it on list of those whose service procedures are
2941  * ready to run and set up the scheduling mechanism.
2942  * The broadcast is done outside the mutex -> to avoid the woken thread
2943  * from contending with the mutex. This is OK 'cos the queue has been
2944  * enqueued on the runlist and flagged safely at this point.
2945  */
2946 void
2947 qenable(queue_t *q)
2948 {
2949 	mutex_enter(QLOCK(q));
2950 	qenable_locked(q);
2951 	mutex_exit(QLOCK(q));
2952 }
2953 /*
2954  * Return number of messages on queue
2955  */
2956 int
2957 qsize(queue_t *qp)
2958 {
2959 	int count = 0;
2960 	mblk_t *mp;
2961 
2962 	mutex_enter(QLOCK(qp));
2963 	for (mp = qp->q_first; mp; mp = mp->b_next)
2964 		count++;
2965 	mutex_exit(QLOCK(qp));
2966 	return (count);
2967 }
2968 
2969 /*
2970  * noenable - set queue so that putq() will not enable it.
2971  * enableok - set queue so that putq() can enable it.
2972  */
2973 void
2974 noenable(queue_t *q)
2975 {
2976 	mutex_enter(QLOCK(q));
2977 	q->q_flag |= QNOENB;
2978 	mutex_exit(QLOCK(q));
2979 }
2980 
2981 void
2982 enableok(queue_t *q)
2983 {
2984 	mutex_enter(QLOCK(q));
2985 	q->q_flag &= ~QNOENB;
2986 	mutex_exit(QLOCK(q));
2987 }
2988 
2989 /*
2990  * Set queue fields.
2991  */
2992 int
2993 strqset(queue_t *q, qfields_t what, unsigned char pri, intptr_t val)
2994 {
2995 	qband_t *qbp = NULL;
2996 	queue_t	*wrq;
2997 	int error = 0;
2998 	kthread_id_t freezer;
2999 
3000 	freezer = STREAM(q)->sd_freezer;
3001 	if (freezer == curthread) {
3002 		ASSERT(frozenstr(q));
3003 		ASSERT(MUTEX_HELD(QLOCK(q)));
3004 	} else
3005 		mutex_enter(QLOCK(q));
3006 
3007 	if (what >= QBAD) {
3008 		error = EINVAL;
3009 		goto done;
3010 	}
3011 	if (pri != 0) {
3012 		int i;
3013 		qband_t **qbpp;
3014 
3015 		if (pri > q->q_nband) {
3016 			qbpp = &q->q_bandp;
3017 			while (*qbpp)
3018 				qbpp = &(*qbpp)->qb_next;
3019 			while (pri > q->q_nband) {
3020 				if ((*qbpp = allocband()) == NULL) {
3021 					error = EAGAIN;
3022 					goto done;
3023 				}
3024 				(*qbpp)->qb_hiwat = q->q_hiwat;
3025 				(*qbpp)->qb_lowat = q->q_lowat;
3026 				q->q_nband++;
3027 				qbpp = &(*qbpp)->qb_next;
3028 			}
3029 		}
3030 		qbp = q->q_bandp;
3031 		i = pri;
3032 		while (--i)
3033 			qbp = qbp->qb_next;
3034 	}
3035 	switch (what) {
3036 
3037 	case QHIWAT:
3038 		if (qbp)
3039 			qbp->qb_hiwat = (size_t)val;
3040 		else
3041 			q->q_hiwat = (size_t)val;
3042 		break;
3043 
3044 	case QLOWAT:
3045 		if (qbp)
3046 			qbp->qb_lowat = (size_t)val;
3047 		else
3048 			q->q_lowat = (size_t)val;
3049 		break;
3050 
3051 	case QMAXPSZ:
3052 		if (qbp)
3053 			error = EINVAL;
3054 		else
3055 			q->q_maxpsz = (ssize_t)val;
3056 
3057 		/*
3058 		 * Performance concern, strwrite looks at the module below
3059 		 * the stream head for the maxpsz each time it does a write
3060 		 * we now cache it at the stream head.  Check to see if this
3061 		 * queue is sitting directly below the stream head.
3062 		 */
3063 		wrq = STREAM(q)->sd_wrq;
3064 		if (q != wrq->q_next)
3065 			break;
3066 
3067 		/*
3068 		 * If the stream is not frozen drop the current QLOCK and
3069 		 * acquire the sd_wrq QLOCK which protects sd_qn_*
3070 		 */
3071 		if (freezer != curthread) {
3072 			mutex_exit(QLOCK(q));
3073 			mutex_enter(QLOCK(wrq));
3074 		}
3075 		ASSERT(MUTEX_HELD(QLOCK(wrq)));
3076 
3077 		if (strmsgsz != 0) {
3078 			if (val == INFPSZ)
3079 				val = strmsgsz;
3080 			else  {
3081 				if (STREAM(q)->sd_vnode->v_type == VFIFO)
3082 					val = MIN(PIPE_BUF, val);
3083 				else
3084 					val = MIN(strmsgsz, val);
3085 			}
3086 		}
3087 		STREAM(q)->sd_qn_maxpsz = val;
3088 		if (freezer != curthread) {
3089 			mutex_exit(QLOCK(wrq));
3090 			mutex_enter(QLOCK(q));
3091 		}
3092 		break;
3093 
3094 	case QMINPSZ:
3095 		if (qbp)
3096 			error = EINVAL;
3097 		else
3098 			q->q_minpsz = (ssize_t)val;
3099 
3100 		/*
3101 		 * Performance concern, strwrite looks at the module below
3102 		 * the stream head for the maxpsz each time it does a write
3103 		 * we now cache it at the stream head.  Check to see if this
3104 		 * queue is sitting directly below the stream head.
3105 		 */
3106 		wrq = STREAM(q)->sd_wrq;
3107 		if (q != wrq->q_next)
3108 			break;
3109 
3110 		/*
3111 		 * If the stream is not frozen drop the current QLOCK and
3112 		 * acquire the sd_wrq QLOCK which protects sd_qn_*
3113 		 */
3114 		if (freezer != curthread) {
3115 			mutex_exit(QLOCK(q));
3116 			mutex_enter(QLOCK(wrq));
3117 		}
3118 		STREAM(q)->sd_qn_minpsz = (ssize_t)val;
3119 
3120 		if (freezer != curthread) {
3121 			mutex_exit(QLOCK(wrq));
3122 			mutex_enter(QLOCK(q));
3123 		}
3124 		break;
3125 
3126 	case QSTRUIOT:
3127 		if (qbp)
3128 			error = EINVAL;
3129 		else
3130 			q->q_struiot = (ushort_t)val;
3131 		break;
3132 
3133 	case QCOUNT:
3134 	case QFIRST:
3135 	case QLAST:
3136 	case QFLAG:
3137 		error = EPERM;
3138 		break;
3139 
3140 	default:
3141 		error = EINVAL;
3142 		break;
3143 	}
3144 done:
3145 	if (freezer != curthread)
3146 		mutex_exit(QLOCK(q));
3147 	return (error);
3148 }
3149 
3150 /*
3151  * Get queue fields.
3152  */
3153 int
3154 strqget(queue_t *q, qfields_t what, unsigned char pri, void *valp)
3155 {
3156 	qband_t 	*qbp = NULL;
3157 	int 		error = 0;
3158 	kthread_id_t 	freezer;
3159 
3160 	freezer = STREAM(q)->sd_freezer;
3161 	if (freezer == curthread) {
3162 		ASSERT(frozenstr(q));
3163 		ASSERT(MUTEX_HELD(QLOCK(q)));
3164 	} else
3165 		mutex_enter(QLOCK(q));
3166 	if (what >= QBAD) {
3167 		error = EINVAL;
3168 		goto done;
3169 	}
3170 	if (pri != 0) {
3171 		int i;
3172 		qband_t **qbpp;
3173 
3174 		if (pri > q->q_nband) {
3175 			qbpp = &q->q_bandp;
3176 			while (*qbpp)
3177 				qbpp = &(*qbpp)->qb_next;
3178 			while (pri > q->q_nband) {
3179 				if ((*qbpp = allocband()) == NULL) {
3180 					error = EAGAIN;
3181 					goto done;
3182 				}
3183 				(*qbpp)->qb_hiwat = q->q_hiwat;
3184 				(*qbpp)->qb_lowat = q->q_lowat;
3185 				q->q_nband++;
3186 				qbpp = &(*qbpp)->qb_next;
3187 			}
3188 		}
3189 		qbp = q->q_bandp;
3190 		i = pri;
3191 		while (--i)
3192 			qbp = qbp->qb_next;
3193 	}
3194 	switch (what) {
3195 	case QHIWAT:
3196 		if (qbp)
3197 			*(size_t *)valp = qbp->qb_hiwat;
3198 		else
3199 			*(size_t *)valp = q->q_hiwat;
3200 		break;
3201 
3202 	case QLOWAT:
3203 		if (qbp)
3204 			*(size_t *)valp = qbp->qb_lowat;
3205 		else
3206 			*(size_t *)valp = q->q_lowat;
3207 		break;
3208 
3209 	case QMAXPSZ:
3210 		if (qbp)
3211 			error = EINVAL;
3212 		else
3213 			*(ssize_t *)valp = q->q_maxpsz;
3214 		break;
3215 
3216 	case QMINPSZ:
3217 		if (qbp)
3218 			error = EINVAL;
3219 		else
3220 			*(ssize_t *)valp = q->q_minpsz;
3221 		break;
3222 
3223 	case QCOUNT:
3224 		if (qbp)
3225 			*(size_t *)valp = qbp->qb_count;
3226 		else
3227 			*(size_t *)valp = q->q_count;
3228 		break;
3229 
3230 	case QFIRST:
3231 		if (qbp)
3232 			*(mblk_t **)valp = qbp->qb_first;
3233 		else
3234 			*(mblk_t **)valp = q->q_first;
3235 		break;
3236 
3237 	case QLAST:
3238 		if (qbp)
3239 			*(mblk_t **)valp = qbp->qb_last;
3240 		else
3241 			*(mblk_t **)valp = q->q_last;
3242 		break;
3243 
3244 	case QFLAG:
3245 		if (qbp)
3246 			*(uint_t *)valp = qbp->qb_flag;
3247 		else
3248 			*(uint_t *)valp = q->q_flag;
3249 		break;
3250 
3251 	case QSTRUIOT:
3252 		if (qbp)
3253 			error = EINVAL;
3254 		else
3255 			*(short *)valp = q->q_struiot;
3256 		break;
3257 
3258 	default:
3259 		error = EINVAL;
3260 		break;
3261 	}
3262 done:
3263 	if (freezer != curthread)
3264 		mutex_exit(QLOCK(q));
3265 	return (error);
3266 }
3267 
3268 /*
3269  * Function awakes all in cvwait/sigwait/pollwait, on one of:
3270  *	QWANTWSYNC or QWANTR or QWANTW,
3271  *
3272  * Note: for QWANTWSYNC/QWANTW and QWANTR, if no WSLEEPer or RSLEEPer then a
3273  *	 deferred wakeup will be done. Also if strpoll() in progress then a
3274  *	 deferred pollwakeup will be done.
3275  */
3276 void
3277 strwakeq(queue_t *q, int flag)
3278 {
3279 	stdata_t 	*stp = STREAM(q);
3280 	pollhead_t 	*pl;
3281 
3282 	mutex_enter(&stp->sd_lock);
3283 	pl = &stp->sd_pollist;
3284 	if (flag & QWANTWSYNC) {
3285 		ASSERT(!(q->q_flag & QREADR));
3286 		if (stp->sd_flag & WSLEEP) {
3287 			stp->sd_flag &= ~WSLEEP;
3288 			cv_broadcast(&stp->sd_wrq->q_wait);
3289 		} else {
3290 			stp->sd_wakeq |= WSLEEP;
3291 		}
3292 
3293 		mutex_exit(&stp->sd_lock);
3294 		pollwakeup(pl, POLLWRNORM);
3295 		mutex_enter(&stp->sd_lock);
3296 
3297 		if (stp->sd_sigflags & S_WRNORM)
3298 			strsendsig(stp->sd_siglist, S_WRNORM, 0, 0);
3299 	} else if (flag & QWANTR) {
3300 		if (stp->sd_flag & RSLEEP) {
3301 			stp->sd_flag &= ~RSLEEP;
3302 			cv_broadcast(&_RD(stp->sd_wrq)->q_wait);
3303 		} else {
3304 			stp->sd_wakeq |= RSLEEP;
3305 		}
3306 
3307 		mutex_exit(&stp->sd_lock);
3308 		pollwakeup(pl, POLLIN | POLLRDNORM);
3309 		mutex_enter(&stp->sd_lock);
3310 
3311 		{
3312 			int events = stp->sd_sigflags & (S_INPUT | S_RDNORM);
3313 
3314 			if (events)
3315 				strsendsig(stp->sd_siglist, events, 0, 0);
3316 		}
3317 	} else {
3318 		if (stp->sd_flag & WSLEEP) {
3319 			stp->sd_flag &= ~WSLEEP;
3320 			cv_broadcast(&stp->sd_wrq->q_wait);
3321 		}
3322 
3323 		mutex_exit(&stp->sd_lock);
3324 		pollwakeup(pl, POLLWRNORM);
3325 		mutex_enter(&stp->sd_lock);
3326 
3327 		if (stp->sd_sigflags & S_WRNORM)
3328 			strsendsig(stp->sd_siglist, S_WRNORM, 0, 0);
3329 	}
3330 	mutex_exit(&stp->sd_lock);
3331 }
3332 
3333 int
3334 struioget(queue_t *q, mblk_t *mp, struiod_t *dp, int noblock)
3335 {
3336 	stdata_t *stp = STREAM(q);
3337 	int typ  = STRUIOT_STANDARD;
3338 	uio_t	 *uiop = &dp->d_uio;
3339 	dblk_t	 *dbp;
3340 	ssize_t	 uiocnt;
3341 	ssize_t	 cnt;
3342 	unsigned char *ptr;
3343 	ssize_t	 resid;
3344 	int	 error = 0;
3345 	on_trap_data_t otd;
3346 	queue_t	*stwrq;
3347 
3348 	/*
3349 	 * Plumbing may change while taking the type so store the
3350 	 * queue in a temporary variable. It doesn't matter even
3351 	 * if the we take the type from the previous plumbing,
3352 	 * that's because if the plumbing has changed when we were
3353 	 * holding the queue in a temporary variable, we can continue
3354 	 * processing the message the way it would have been processed
3355 	 * in the old plumbing, without any side effects but a bit
3356 	 * extra processing for partial ip header checksum.
3357 	 *
3358 	 * This has been done to avoid holding the sd_lock which is
3359 	 * very hot.
3360 	 */
3361 
3362 	stwrq = stp->sd_struiowrq;
3363 	if (stwrq)
3364 		typ = stwrq->q_struiot;
3365 
3366 	for (; (resid = uiop->uio_resid) > 0 && mp; mp = mp->b_cont) {
3367 		dbp = mp->b_datap;
3368 		ptr = (uchar_t *)(mp->b_rptr + dbp->db_cksumstuff);
3369 		uiocnt = dbp->db_cksumend - dbp->db_cksumstuff;
3370 		cnt = MIN(uiocnt, uiop->uio_resid);
3371 		if (!(dbp->db_struioflag & STRUIO_SPEC) ||
3372 		    (dbp->db_struioflag & STRUIO_DONE) || cnt == 0) {
3373 			/*
3374 			 * Either this mblk has already been processed
3375 			 * or there is no more room in this mblk (?).
3376 			 */
3377 			continue;
3378 		}
3379 		switch (typ) {
3380 		case STRUIOT_STANDARD:
3381 			if (noblock) {
3382 				if (on_trap(&otd, OT_DATA_ACCESS)) {
3383 					no_trap();
3384 					error = EWOULDBLOCK;
3385 					goto out;
3386 				}
3387 			}
3388 			if (error = uiomove(ptr, cnt, UIO_WRITE, uiop)) {
3389 				if (noblock)
3390 					no_trap();
3391 				goto out;
3392 			}
3393 			if (noblock)
3394 				no_trap();
3395 			break;
3396 
3397 		default:
3398 			error = EIO;
3399 			goto out;
3400 		}
3401 		dbp->db_struioflag |= STRUIO_DONE;
3402 		dbp->db_cksumstuff += cnt;
3403 	}
3404 out:
3405 	if (error == EWOULDBLOCK && (resid -= uiop->uio_resid) > 0) {
3406 		/*
3407 		 * A fault has occured and some bytes were moved to the
3408 		 * current mblk, the uio_t has already been updated by
3409 		 * the appropriate uio routine, so also update the mblk
3410 		 * to reflect this in case this same mblk chain is used
3411 		 * again (after the fault has been handled).
3412 		 */
3413 		uiocnt = dbp->db_cksumend - dbp->db_cksumstuff;
3414 		if (uiocnt >= resid)
3415 			dbp->db_cksumstuff += resid;
3416 	}
3417 	return (error);
3418 }
3419 
3420 /*
3421  * Try to enter queue synchronously. Any attempt to enter a closing queue will
3422  * fails. The qp->q_rwcnt keeps track of the number of successful entries so
3423  * that removeq() will not try to close the queue while a thread is inside the
3424  * queue.
3425  */
3426 static boolean_t
3427 rwnext_enter(queue_t *qp)
3428 {
3429 	mutex_enter(QLOCK(qp));
3430 	if (qp->q_flag & QWCLOSE) {
3431 		mutex_exit(QLOCK(qp));
3432 		return (B_FALSE);
3433 	}
3434 	qp->q_rwcnt++;
3435 	ASSERT(qp->q_rwcnt != 0);
3436 	mutex_exit(QLOCK(qp));
3437 	return (B_TRUE);
3438 }
3439 
3440 /*
3441  * Decrease the count of threads running in sync stream queue and wake up any
3442  * threads blocked in removeq().
3443  */
3444 static void
3445 rwnext_exit(queue_t *qp)
3446 {
3447 	mutex_enter(QLOCK(qp));
3448 	qp->q_rwcnt--;
3449 	if (qp->q_flag & QWANTRMQSYNC) {
3450 		qp->q_flag &= ~QWANTRMQSYNC;
3451 		cv_broadcast(&qp->q_wait);
3452 	}
3453 	mutex_exit(QLOCK(qp));
3454 }
3455 
3456 /*
3457  * The purpose of rwnext() is to call the rw procedure of the next
3458  * (downstream) modules queue.
3459  *
3460  * treated as put entrypoint for perimeter syncronization.
3461  *
3462  * There's no need to grab sq_putlocks here (which only exist for CIPUT
3463  * sync queues). If it is CIPUT sync queue sq_count is incremented and it does
3464  * not matter if any regular put entrypoints have been already entered. We
3465  * can't increment one of the sq_putcounts (instead of sq_count) because
3466  * qwait_rw won't know which counter to decrement.
3467  *
3468  * It would be reasonable to add the lockless FASTPUT logic.
3469  */
3470 int
3471 rwnext(queue_t *qp, struiod_t *dp)
3472 {
3473 	queue_t		*nqp;
3474 	syncq_t		*sq;
3475 	uint16_t	count;
3476 	uint16_t	flags;
3477 	struct qinit	*qi;
3478 	int		(*proc)();
3479 	struct stdata	*stp;
3480 	int		isread;
3481 	int		rval;
3482 
3483 	stp = STREAM(qp);
3484 	/*
3485 	 * Prevent q_next from changing by holding sd_lock until acquiring
3486 	 * SQLOCK. Note that a read-side rwnext from the streamhead will
3487 	 * already have sd_lock acquired. In either case sd_lock is always
3488 	 * released after acquiring SQLOCK.
3489 	 *
3490 	 * The streamhead read-side holding sd_lock when calling rwnext is
3491 	 * required to prevent a race condition were M_DATA mblks flowing
3492 	 * up the read-side of the stream could be bypassed by a rwnext()
3493 	 * down-call. In this case sd_lock acts as the streamhead perimeter.
3494 	 */
3495 	if ((nqp = _WR(qp)) == qp) {
3496 		isread = 0;
3497 		mutex_enter(&stp->sd_lock);
3498 		qp = nqp->q_next;
3499 	} else {
3500 		isread = 1;
3501 		if (nqp != stp->sd_wrq)
3502 			/* Not streamhead */
3503 			mutex_enter(&stp->sd_lock);
3504 		qp = _RD(nqp->q_next);
3505 	}
3506 	qi = qp->q_qinfo;
3507 	if (qp->q_struiot == STRUIOT_NONE || ! (proc = qi->qi_rwp)) {
3508 		/*
3509 		 * Not a synchronous module or no r/w procedure for this
3510 		 * queue, so just return EINVAL and let the caller handle it.
3511 		 */
3512 		mutex_exit(&stp->sd_lock);
3513 		return (EINVAL);
3514 	}
3515 
3516 	if (rwnext_enter(qp) == B_FALSE) {
3517 		mutex_exit(&stp->sd_lock);
3518 		return (EINVAL);
3519 	}
3520 
3521 	sq = qp->q_syncq;
3522 	mutex_enter(SQLOCK(sq));
3523 	mutex_exit(&stp->sd_lock);
3524 	count = sq->sq_count;
3525 	flags = sq->sq_flags;
3526 	ASSERT(sq->sq_ciputctrl == NULL || (flags & SQ_CIPUT));
3527 
3528 	while ((flags & SQ_GOAWAY) || (!(flags & SQ_CIPUT) && count != 0)) {
3529 		/*
3530 		 * if this queue is being closed, return.
3531 		 */
3532 		if (qp->q_flag & QWCLOSE) {
3533 			mutex_exit(SQLOCK(sq));
3534 			rwnext_exit(qp);
3535 			return (EINVAL);
3536 		}
3537 
3538 		/*
3539 		 * Wait until we can enter the inner perimeter.
3540 		 */
3541 		sq->sq_flags = flags | SQ_WANTWAKEUP;
3542 		cv_wait(&sq->sq_wait, SQLOCK(sq));
3543 		count = sq->sq_count;
3544 		flags = sq->sq_flags;
3545 	}
3546 
3547 	if (isread == 0 && stp->sd_struiowrq == NULL ||
3548 	    isread == 1 && stp->sd_struiordq == NULL) {
3549 		/*
3550 		 * Stream plumbing changed while waiting for inner perimeter
3551 		 * so just return EINVAL and let the caller handle it.
3552 		 */
3553 		mutex_exit(SQLOCK(sq));
3554 		rwnext_exit(qp);
3555 		return (EINVAL);
3556 	}
3557 	if (!(flags & SQ_CIPUT))
3558 		sq->sq_flags = flags | SQ_EXCL;
3559 	sq->sq_count = count + 1;
3560 	ASSERT(sq->sq_count != 0);		/* Wraparound */
3561 	/*
3562 	 * Note: The only message ordering guarantee that rwnext() makes is
3563 	 *	 for the write queue flow-control case. All others (r/w queue
3564 	 *	 with q_count > 0 (or q_first != 0)) are the resposibilty of
3565 	 *	 the queue's rw procedure. This could be genralized here buy
3566 	 *	 running the queue's service procedure, but that wouldn't be
3567 	 *	 the most efficent for all cases.
3568 	 */
3569 	mutex_exit(SQLOCK(sq));
3570 	if (! isread && (qp->q_flag & QFULL)) {
3571 		/*
3572 		 * Write queue may be flow controlled. If so,
3573 		 * mark the queue for wakeup when it's not.
3574 		 */
3575 		mutex_enter(QLOCK(qp));
3576 		if (qp->q_flag & QFULL) {
3577 			qp->q_flag |= QWANTWSYNC;
3578 			mutex_exit(QLOCK(qp));
3579 			rval = EWOULDBLOCK;
3580 			goto out;
3581 		}
3582 		mutex_exit(QLOCK(qp));
3583 	}
3584 
3585 	if (! isread && dp->d_mp)
3586 		STR_FTEVENT_MSG(dp->d_mp, nqp, FTEV_RWNEXT, dp->d_mp->b_rptr -
3587 		    dp->d_mp->b_datap->db_base);
3588 
3589 	rval = (*proc)(qp, dp);
3590 
3591 	if (isread && dp->d_mp)
3592 		STR_FTEVENT_MSG(dp->d_mp, _RD(nqp), FTEV_RWNEXT,
3593 		    dp->d_mp->b_rptr - dp->d_mp->b_datap->db_base);
3594 out:
3595 	/*
3596 	 * The queue is protected from being freed by sq_count, so it is
3597 	 * safe to call rwnext_exit and reacquire SQLOCK(sq).
3598 	 */
3599 	rwnext_exit(qp);
3600 
3601 	mutex_enter(SQLOCK(sq));
3602 	flags = sq->sq_flags;
3603 	ASSERT(sq->sq_count != 0);
3604 	sq->sq_count--;
3605 	if (flags & SQ_TAIL) {
3606 		putnext_tail(sq, qp, flags);
3607 		/*
3608 		 * The only purpose of this ASSERT is to preserve calling stack
3609 		 * in DEBUG kernel.
3610 		 */
3611 		ASSERT(flags & SQ_TAIL);
3612 		return (rval);
3613 	}
3614 	ASSERT(flags & (SQ_EXCL|SQ_CIPUT));
3615 	/*
3616 	 * Safe to always drop SQ_EXCL:
3617 	 *	Not SQ_CIPUT means we set SQ_EXCL above
3618 	 *	For SQ_CIPUT SQ_EXCL will only be set if the put procedure
3619 	 *	did a qwriter(INNER) in which case nobody else
3620 	 *	is in the inner perimeter and we are exiting.
3621 	 *
3622 	 * I would like to make the following assertion:
3623 	 *
3624 	 * ASSERT((flags & (SQ_EXCL|SQ_CIPUT)) != (SQ_EXCL|SQ_CIPUT) ||
3625 	 * 	sq->sq_count == 0);
3626 	 *
3627 	 * which indicates that if we are both putshared and exclusive,
3628 	 * we became exclusive while executing the putproc, and the only
3629 	 * claim on the syncq was the one we dropped a few lines above.
3630 	 * But other threads that enter putnext while the syncq is exclusive
3631 	 * need to make a claim as they may need to drop SQLOCK in the
3632 	 * has_writers case to avoid deadlocks.  If these threads are
3633 	 * delayed or preempted, it is possible that the writer thread can
3634 	 * find out that there are other claims making the (sq_count == 0)
3635 	 * test invalid.
3636 	 */
3637 
3638 	sq->sq_flags = flags & ~SQ_EXCL;
3639 	if (sq->sq_flags & SQ_WANTWAKEUP) {
3640 		sq->sq_flags &= ~SQ_WANTWAKEUP;
3641 		cv_broadcast(&sq->sq_wait);
3642 	}
3643 	mutex_exit(SQLOCK(sq));
3644 	return (rval);
3645 }
3646 
3647 /*
3648  * The purpose of infonext() is to call the info procedure of the next
3649  * (downstream) modules queue.
3650  *
3651  * treated as put entrypoint for perimeter syncronization.
3652  *
3653  * There's no need to grab sq_putlocks here (which only exist for CIPUT
3654  * sync queues). If it is CIPUT sync queue regular sq_count is incremented and
3655  * it does not matter if any regular put entrypoints have been already
3656  * entered.
3657  */
3658 int
3659 infonext(queue_t *qp, infod_t *idp)
3660 {
3661 	queue_t		*nqp;
3662 	syncq_t		*sq;
3663 	uint16_t	count;
3664 	uint16_t 	flags;
3665 	struct qinit	*qi;
3666 	int		(*proc)();
3667 	struct stdata	*stp;
3668 	int		rval;
3669 
3670 	stp = STREAM(qp);
3671 	/*
3672 	 * Prevent q_next from changing by holding sd_lock until
3673 	 * acquiring SQLOCK.
3674 	 */
3675 	mutex_enter(&stp->sd_lock);
3676 	if ((nqp = _WR(qp)) == qp) {
3677 		qp = nqp->q_next;
3678 	} else {
3679 		qp = _RD(nqp->q_next);
3680 	}
3681 	qi = qp->q_qinfo;
3682 	if (qp->q_struiot == STRUIOT_NONE || ! (proc = qi->qi_infop)) {
3683 		mutex_exit(&stp->sd_lock);
3684 		return (EINVAL);
3685 	}
3686 	sq = qp->q_syncq;
3687 	mutex_enter(SQLOCK(sq));
3688 	mutex_exit(&stp->sd_lock);
3689 	count = sq->sq_count;
3690 	flags = sq->sq_flags;
3691 	ASSERT(sq->sq_ciputctrl == NULL || (flags & SQ_CIPUT));
3692 
3693 	while ((flags & SQ_GOAWAY) || (!(flags & SQ_CIPUT) && count != 0)) {
3694 		/*
3695 		 * Wait until we can enter the inner perimeter.
3696 		 */
3697 		sq->sq_flags = flags | SQ_WANTWAKEUP;
3698 		cv_wait(&sq->sq_wait, SQLOCK(sq));
3699 		count = sq->sq_count;
3700 		flags = sq->sq_flags;
3701 	}
3702 
3703 	if (! (flags & SQ_CIPUT))
3704 		sq->sq_flags = flags | SQ_EXCL;
3705 	sq->sq_count = count + 1;
3706 	ASSERT(sq->sq_count != 0);		/* Wraparound */
3707 	mutex_exit(SQLOCK(sq));
3708 
3709 	rval = (*proc)(qp, idp);
3710 
3711 	mutex_enter(SQLOCK(sq));
3712 	flags = sq->sq_flags;
3713 	ASSERT(sq->sq_count != 0);
3714 	sq->sq_count--;
3715 	if (flags & SQ_TAIL) {
3716 		putnext_tail(sq, qp, flags);
3717 		/*
3718 		 * The only purpose of this ASSERT is to preserve calling stack
3719 		 * in DEBUG kernel.
3720 		 */
3721 		ASSERT(flags & SQ_TAIL);
3722 		return (rval);
3723 	}
3724 	ASSERT(flags & (SQ_EXCL|SQ_CIPUT));
3725 /*
3726  * XXXX
3727  * I am not certain the next comment is correct here.  I need to consider
3728  * why the infonext is called, and if dropping SQ_EXCL unless non-CIPUT
3729  * might cause other problems.  It just might be safer to drop it if
3730  * !SQ_CIPUT because that is when we set it.
3731  */
3732 	/*
3733 	 * Safe to always drop SQ_EXCL:
3734 	 *	Not SQ_CIPUT means we set SQ_EXCL above
3735 	 *	For SQ_CIPUT SQ_EXCL will only be set if the put procedure
3736 	 *	did a qwriter(INNER) in which case nobody else
3737 	 *	is in the inner perimeter and we are exiting.
3738 	 *
3739 	 * I would like to make the following assertion:
3740 	 *
3741 	 * ASSERT((flags & (SQ_EXCL|SQ_CIPUT)) != (SQ_EXCL|SQ_CIPUT) ||
3742 	 *	sq->sq_count == 0);
3743 	 *
3744 	 * which indicates that if we are both putshared and exclusive,
3745 	 * we became exclusive while executing the putproc, and the only
3746 	 * claim on the syncq was the one we dropped a few lines above.
3747 	 * But other threads that enter putnext while the syncq is exclusive
3748 	 * need to make a claim as they may need to drop SQLOCK in the
3749 	 * has_writers case to avoid deadlocks.  If these threads are
3750 	 * delayed or preempted, it is possible that the writer thread can
3751 	 * find out that there are other claims making the (sq_count == 0)
3752 	 * test invalid.
3753 	 */
3754 
3755 	sq->sq_flags = flags & ~SQ_EXCL;
3756 	mutex_exit(SQLOCK(sq));
3757 	return (rval);
3758 }
3759 
3760 /*
3761  * Return nonzero if the queue is responsible for struio(), else return 0.
3762  */
3763 int
3764 isuioq(queue_t *q)
3765 {
3766 	if (q->q_flag & QREADR)
3767 		return (STREAM(q)->sd_struiordq == q);
3768 	else
3769 		return (STREAM(q)->sd_struiowrq == q);
3770 }
3771 
3772 #if defined(__sparc)
3773 int disable_putlocks = 0;
3774 #else
3775 int disable_putlocks = 1;
3776 #endif
3777 
3778 /*
3779  * called by create_putlock.
3780  */
3781 static void
3782 create_syncq_putlocks(queue_t *q)
3783 {
3784 	syncq_t	*sq = q->q_syncq;
3785 	ciputctrl_t *cip;
3786 	int i;
3787 
3788 	ASSERT(sq != NULL);
3789 
3790 	ASSERT(disable_putlocks == 0);
3791 	ASSERT(n_ciputctrl >= min_n_ciputctrl);
3792 	ASSERT(ciputctrl_cache != NULL);
3793 
3794 	if (!(sq->sq_type & SQ_CIPUT))
3795 		return;
3796 
3797 	for (i = 0; i <= 1; i++) {
3798 		if (sq->sq_ciputctrl == NULL) {
3799 			cip = kmem_cache_alloc(ciputctrl_cache, KM_SLEEP);
3800 			SUMCHECK_CIPUTCTRL_COUNTS(cip, n_ciputctrl - 1, 0);
3801 			mutex_enter(SQLOCK(sq));
3802 			if (sq->sq_ciputctrl != NULL) {
3803 				mutex_exit(SQLOCK(sq));
3804 				kmem_cache_free(ciputctrl_cache, cip);
3805 			} else {
3806 				ASSERT(sq->sq_nciputctrl == 0);
3807 				sq->sq_nciputctrl = n_ciputctrl - 1;
3808 				/*
3809 				 * putnext checks sq_ciputctrl without holding
3810 				 * SQLOCK. if it is not NULL putnext assumes
3811 				 * sq_nciputctrl is initialized. membar below
3812 				 * insures that.
3813 				 */
3814 				membar_producer();
3815 				sq->sq_ciputctrl = cip;
3816 				mutex_exit(SQLOCK(sq));
3817 			}
3818 		}
3819 		ASSERT(sq->sq_nciputctrl == n_ciputctrl - 1);
3820 		if (i == 1)
3821 			break;
3822 		q = _OTHERQ(q);
3823 		if (!(q->q_flag & QPERQ)) {
3824 			ASSERT(sq == q->q_syncq);
3825 			break;
3826 		}
3827 		ASSERT(q->q_syncq != NULL);
3828 		ASSERT(sq != q->q_syncq);
3829 		sq = q->q_syncq;
3830 		ASSERT(sq->sq_type & SQ_CIPUT);
3831 	}
3832 }
3833 
3834 /*
3835  * If stream argument is 0 only create per cpu sq_putlocks/sq_putcounts for
3836  * syncq of q. If stream argument is not 0 create per cpu stream_putlocks for
3837  * the stream of q and per cpu sq_putlocks/sq_putcounts for all syncq's
3838  * starting from q and down to the driver.
3839  *
3840  * This should be called after the affected queues are part of stream
3841  * geometry. It should be called from driver/module open routine after
3842  * qprocson() call. It is also called from nfs syscall where it is known that
3843  * stream is configured and won't change its geometry during create_putlock
3844  * call.
3845  *
3846  * caller normally uses 0 value for the stream argument to speed up MT putnext
3847  * into the perimeter of q for example because its perimeter is per module
3848  * (e.g. IP).
3849  *
3850  * caller normally uses non 0 value for the stream argument to hint the system
3851  * that the stream of q is a very contended global system stream
3852  * (e.g. NFS/UDP) and the part of the stream from q to the driver is
3853  * particularly MT hot.
3854  *
3855  * Caller insures stream plumbing won't happen while we are here and therefore
3856  * q_next can be safely used.
3857  */
3858 
3859 void
3860 create_putlocks(queue_t *q, int stream)
3861 {
3862 	ciputctrl_t	*cip;
3863 	struct stdata	*stp = STREAM(q);
3864 
3865 	q = _WR(q);
3866 	ASSERT(stp != NULL);
3867 
3868 	if (disable_putlocks != 0)
3869 		return;
3870 
3871 	if (n_ciputctrl < min_n_ciputctrl)
3872 		return;
3873 
3874 	ASSERT(ciputctrl_cache != NULL);
3875 
3876 	if (stream != 0 && stp->sd_ciputctrl == NULL) {
3877 		cip = kmem_cache_alloc(ciputctrl_cache, KM_SLEEP);
3878 		SUMCHECK_CIPUTCTRL_COUNTS(cip, n_ciputctrl - 1, 0);
3879 		mutex_enter(&stp->sd_lock);
3880 		if (stp->sd_ciputctrl != NULL) {
3881 			mutex_exit(&stp->sd_lock);
3882 			kmem_cache_free(ciputctrl_cache, cip);
3883 		} else {
3884 			ASSERT(stp->sd_nciputctrl == 0);
3885 			stp->sd_nciputctrl = n_ciputctrl - 1;
3886 			/*
3887 			 * putnext checks sd_ciputctrl without holding
3888 			 * sd_lock. if it is not NULL putnext assumes
3889 			 * sd_nciputctrl is initialized. membar below
3890 			 * insures that.
3891 			 */
3892 			membar_producer();
3893 			stp->sd_ciputctrl = cip;
3894 			mutex_exit(&stp->sd_lock);
3895 		}
3896 	}
3897 
3898 	ASSERT(stream == 0 || stp->sd_nciputctrl == n_ciputctrl - 1);
3899 
3900 	while (_SAMESTR(q)) {
3901 		create_syncq_putlocks(q);
3902 		if (stream == 0)
3903 			return;
3904 		q = q->q_next;
3905 	}
3906 	ASSERT(q != NULL);
3907 	create_syncq_putlocks(q);
3908 }
3909 
3910 /*
3911  * STREAMS Flow Trace - record STREAMS Flow Trace events as an mblk flows
3912  * through a stream.
3913  *
3914  * Data currently record per event is a hrtime stamp, queue address, event
3915  * type, and a per type datum.  Much of the STREAMS framework is instrumented
3916  * for automatic flow tracing (when enabled).  Events can be defined and used
3917  * by STREAMS modules and drivers.
3918  *
3919  * Global objects:
3920  *
3921  *	str_ftevent() - Add a flow-trace event to a dblk.
3922  *	str_ftfree() - Free flow-trace data
3923  *
3924  * Local objects:
3925  *
3926  *	fthdr_cache - pointer to the kmem cache for trace header.
3927  *	ftblk_cache - pointer to the kmem cache for trace data blocks.
3928  */
3929 
3930 int str_ftnever = 1;	/* Don't do STREAMS flow tracing */
3931 
3932 void
3933 str_ftevent(fthdr_t *hp, void *p, ushort_t evnt, ushort_t data)
3934 {
3935 	ftblk_t *bp = hp->tail;
3936 	ftblk_t *nbp;
3937 	ftevnt_t *ep;
3938 	int ix, nix;
3939 
3940 	ASSERT(hp != NULL);
3941 
3942 	for (;;) {
3943 		if ((ix = bp->ix) == FTBLK_EVNTS) {
3944 			/*
3945 			 * Tail doesn't have room, so need a new tail.
3946 			 *
3947 			 * To make this MT safe, first, allocate a new
3948 			 * ftblk, and initialize it.  To make life a
3949 			 * little easier, reserve the first slot (mostly
3950 			 * by making ix = 1).  When we are finished with
3951 			 * the initialization, CAS this pointer to the
3952 			 * tail.  If this succeeds, this is the new
3953 			 * "next" block.  Otherwise, another thread
3954 			 * got here first, so free the block and start
3955 			 * again.
3956 			 */
3957 			if (!(nbp = kmem_cache_alloc(ftblk_cache,
3958 			    KM_NOSLEEP))) {
3959 				/* no mem, so punt */
3960 				str_ftnever++;
3961 				/* free up all flow data? */
3962 				return;
3963 			}
3964 			nbp->nxt = NULL;
3965 			nbp->ix = 1;
3966 			/*
3967 			 * Just in case there is another thread about
3968 			 * to get the next index, we need to make sure
3969 			 * the value is there for it.
3970 			 */
3971 			membar_producer();
3972 			if (casptr(&hp->tail, bp, nbp) == bp) {
3973 				/* CAS was successful */
3974 				bp->nxt = nbp;
3975 				membar_producer();
3976 				bp = nbp;
3977 				ix = 0;
3978 				goto cas_good;
3979 			} else {
3980 				kmem_cache_free(ftblk_cache, nbp);
3981 				bp = hp->tail;
3982 				continue;
3983 			}
3984 		}
3985 		nix = ix + 1;
3986 		if (cas32((uint32_t *)&bp->ix, ix, nix) == ix) {
3987 		cas_good:
3988 			if (curthread != hp->thread) {
3989 				hp->thread = curthread;
3990 				evnt |= FTEV_CS;
3991 			}
3992 			if (CPU->cpu_seqid != hp->cpu_seqid) {
3993 				hp->cpu_seqid = CPU->cpu_seqid;
3994 				evnt |= FTEV_PS;
3995 			}
3996 			ep = &bp->ev[ix];
3997 			break;
3998 		}
3999 	}
4000 
4001 	if (evnt & FTEV_QMASK) {
4002 		queue_t *qp = p;
4003 
4004 		/*
4005 		 * It is possible that the module info is broke
4006 		 * (as is logsubr.c at this comment writing).
4007 		 * Instead of panicing or doing other unmentionables,
4008 		 * we shall put a dummy name as the mid, and continue.
4009 		 */
4010 		if (qp->q_qinfo == NULL)
4011 			ep->mid = "NONAME";
4012 		else
4013 			ep->mid = qp->q_qinfo->qi_minfo->mi_idname;
4014 
4015 		if (!(qp->q_flag & QREADR))
4016 			evnt |= FTEV_ISWR;
4017 	} else {
4018 		ep->mid = (char *)p;
4019 	}
4020 
4021 	ep->ts = gethrtime();
4022 	ep->evnt = evnt;
4023 	ep->data = data;
4024 	hp->hash = (hp->hash << 9) + hp->hash;
4025 	hp->hash += (evnt << 16) | data;
4026 	hp->hash += (uintptr_t)ep->mid;
4027 }
4028 
4029 /*
4030  * Free flow-trace data.
4031  */
4032 void
4033 str_ftfree(dblk_t *dbp)
4034 {
4035 	fthdr_t *hp = dbp->db_fthdr;
4036 	ftblk_t *bp = &hp->first;
4037 	ftblk_t *nbp;
4038 
4039 	if (bp != hp->tail || bp->ix != 0) {
4040 		/*
4041 		 * Clear out the hash, have the tail point to itself, and free
4042 		 * any continuation blocks.
4043 		 */
4044 		bp = hp->first.nxt;
4045 		hp->tail = &hp->first;
4046 		hp->hash = 0;
4047 		hp->first.nxt = NULL;
4048 		hp->first.ix = 0;
4049 		while (bp != NULL) {
4050 			nbp = bp->nxt;
4051 			kmem_cache_free(ftblk_cache, bp);
4052 			bp = nbp;
4053 		}
4054 	}
4055 	kmem_cache_free(fthdr_cache, hp);
4056 	dbp->db_fthdr = NULL;
4057 }
4058