xref: /titanic_50/usr/src/uts/common/io/stream.c (revision 4f85d229295a756a4e6f1759b47df7b97412db7d)
1 /*
2  * CDDL HEADER START
3  *
4  * The contents of this file are subject to the terms of the
5  * Common Development and Distribution License, Version 1.0 only
6  * (the "License").  You may not use this file except in compliance
7  * with the License.
8  *
9  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
10  * or http://www.opensolaris.org/os/licensing.
11  * See the License for the specific language governing permissions
12  * and limitations under the License.
13  *
14  * When distributing Covered Code, include this CDDL HEADER in each
15  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
16  * If applicable, add the following below this CDDL HEADER, with the
17  * fields enclosed by brackets "[]" replaced with your own identifying
18  * information: Portions Copyright [yyyy] [name of copyright owner]
19  *
20  * CDDL HEADER END
21  */
22 /*	Copyright (c) 1984, 1986, 1987, 1988, 1989 AT&T	*/
23 /*	  All Rights Reserved  	*/
24 
25 
26 /*
27  * Copyright 2005 Sun Microsystems, Inc.  All rights reserved.
28  * Use is subject to license terms.
29  */
30 
31 #pragma ident	"%Z%%M%	%I%	%E% SMI"
32 
33 #include <sys/types.h>
34 #include <sys/param.h>
35 #include <sys/thread.h>
36 #include <sys/sysmacros.h>
37 #include <sys/stropts.h>
38 #include <sys/stream.h>
39 #include <sys/strsubr.h>
40 #include <sys/strsun.h>
41 #include <sys/conf.h>
42 #include <sys/debug.h>
43 #include <sys/cmn_err.h>
44 #include <sys/kmem.h>
45 #include <sys/atomic.h>
46 #include <sys/errno.h>
47 #include <sys/vtrace.h>
48 #include <sys/ftrace.h>
49 #include <sys/ontrap.h>
50 #include <sys/multidata.h>
51 #include <sys/multidata_impl.h>
52 #include <sys/sdt.h>
53 
54 #ifdef DEBUG
55 #include <sys/kmem_impl.h>
56 #endif
57 
58 /*
59  * This file contains all the STREAMS utility routines that may
60  * be used by modules and drivers.
61  */
62 
63 /*
64  * STREAMS message allocator: principles of operation
65  *
66  * The streams message allocator consists of all the routines that
67  * allocate, dup and free streams messages: allocb(), [d]esballoc[a],
68  * dupb(), freeb() and freemsg().  What follows is a high-level view
69  * of how the allocator works.
70  *
71  * Every streams message consists of one or more mblks, a dblk, and data.
72  * All mblks for all types of messages come from a common mblk_cache.
73  * The dblk and data come in several flavors, depending on how the
74  * message is allocated:
75  *
76  * (1) mblks up to DBLK_MAX_CACHE size are allocated from a collection of
77  *     fixed-size dblk/data caches. For message sizes that are multiples of
78  *     PAGESIZE, dblks are allocated separately from the buffer.
79  *     The associated buffer is allocated by the constructor using kmem_alloc().
80  *     For all other message sizes, dblk and its associated data is allocated
81  *     as a single contiguous chunk of memory.
82  *     Objects in these caches consist of a dblk plus its associated data.
83  *     allocb() determines the nearest-size cache by table lookup:
84  *     the dblk_cache[] array provides the mapping from size to dblk cache.
85  *
86  * (2) Large messages (size > DBLK_MAX_CACHE) are constructed by
87  *     kmem_alloc()'ing a buffer for the data and supplying that
88  *     buffer to gesballoc(), described below.
89  *
90  * (3) The four flavors of [d]esballoc[a] are all implemented by a
91  *     common routine, gesballoc() ("generic esballoc").  gesballoc()
92  *     allocates a dblk from the global dblk_esb_cache and sets db_base,
93  *     db_lim and db_frtnp to describe the caller-supplied buffer.
94  *
95  * While there are several routines to allocate messages, there is only
96  * one routine to free messages: freeb().  freeb() simply invokes the
97  * dblk's free method, dbp->db_free(), which is set at allocation time.
98  *
99  * dupb() creates a new reference to a message by allocating a new mblk,
100  * incrementing the dblk reference count and setting the dblk's free
101  * method to dblk_decref().  The dblk's original free method is retained
102  * in db_lastfree.  dblk_decref() decrements the reference count on each
103  * freeb().  If this is not the last reference it just frees the mblk;
104  * if this *is* the last reference, it restores db_free to db_lastfree,
105  * sets db_mblk to the current mblk (see below), and invokes db_lastfree.
106  *
107  * The implementation makes aggressive use of kmem object caching for
108  * maximum performance.  This makes the code simple and compact, but
109  * also a bit abstruse in some places.  The invariants that constitute a
110  * message's constructed state, described below, are more subtle than usual.
111  *
112  * Every dblk has an "attached mblk" as part of its constructed state.
113  * The mblk is allocated by the dblk's constructor and remains attached
114  * until the message is either dup'ed or pulled up.  In the dupb() case
115  * the mblk association doesn't matter until the last free, at which time
116  * dblk_decref() attaches the last mblk to the dblk.  pullupmsg() affects
117  * the mblk association because it swaps the leading mblks of two messages,
118  * so it is responsible for swapping their db_mblk pointers accordingly.
119  * From a constructed-state viewpoint it doesn't matter that a dblk's
120  * attached mblk can change while the message is allocated; all that
121  * matters is that the dblk has *some* attached mblk when it's freed.
122  *
123  * The sizes of the allocb() small-message caches are not magical.
124  * They represent a good trade-off between internal and external
125  * fragmentation for current workloads.  They should be reevaluated
126  * periodically, especially if allocations larger than DBLK_MAX_CACHE
127  * become common.  We use 64-byte alignment so that dblks don't
128  * straddle cache lines unnecessarily.
129  */
/*
 * Parameters of the fixed-size dblk caches:
 *
 *	DBLK_MAX_CACHE		largest message size served from dblk_cache[]
 *	DBLK_CACHE_ALIGN	alignment requested for each dblk cache
 *	DBLK_MIN_SIZE		granularity (bytes) of the dblk_cache[] index
 *	DBLK_SIZE_SHIFT		log2(DBLK_MIN_SIZE), the size-to-index shift
 */
#define	DBLK_MAX_CACHE		73728
#define	DBLK_CACHE_ALIGN	64
#define	DBLK_MIN_SIZE		8
#define	DBLK_SIZE_SHIFT		3

/*
 * DBLK_RTFU_SHIFT(field) yields the bit position of a dblk field inside
 * the 32-bit word that DBLK_RTFU_WORD() accesses starting at db_ref.  The
 * position is measured from db_struioflag on big-endian machines and from
 * db_ref on little-endian ones.  (The 8 * offset arithmetic presumes that
 * db_ref, db_type, db_flags and db_struioflag are one-byte fields laid
 * out back to back in that order -- confirm against dblk_t.)
 */
#ifdef _BIG_ENDIAN
#define	DBLK_RTFU_SHIFT(field)	\
	(8 * (&((dblk_t *)0)->db_struioflag - &((dblk_t *)0)->field))
#else
#define	DBLK_RTFU_SHIFT(field)	\
	(8 * (&((dblk_t *)0)->field - &((dblk_t *)0)->db_ref))
#endif

/*
 * DBLK_RTFU() packs a reference count, type, flags and uioflag into one
 * 32-bit value suitable for storing through DBLK_RTFU_WORD().  The value
 * (ref - 1) is OR'ed into the flags byte; dblk_decref() later reads it
 * back (masked with DBLK_REFMIN) as the minimum reference count.  Every
 * argument, including 'ref' in the (ref - 1) term, is parenthesized so
 * that expression arguments bind as the caller intended.
 */
#define	DBLK_RTFU(ref, type, flags, uioflag)	\
	(((ref) << DBLK_RTFU_SHIFT(db_ref)) | \
	((type) << DBLK_RTFU_SHIFT(db_type)) | \
	(((flags) | ((ref) - 1)) << DBLK_RTFU_SHIFT(db_flags)) | \
	((uioflag) << DBLK_RTFU_SHIFT(db_struioflag)))

/* Mask covering the db_ref portion of the packed word. */
#define	DBLK_RTFU_REF_MASK	(DBLK_REFMAX << DBLK_RTFU_SHIFT(db_ref))

/* The 32-bit word beginning at db_ref, as an lvalue. */
#define	DBLK_RTFU_WORD(dbp)	(*((uint32_t *)&(dbp)->db_ref))

/* The 32-bit word beginning at b_band, as an lvalue. */
#define	MBLK_BAND_FLAG_WORD(mp)	(*((uint32_t *)&(mp)->b_band))
151 
/*
 * Zero-terminated table of the message sizes for which a dedicated dblk
 * cache is created.  streams_msg_init() walks this table in order and
 * points every dblk_cache[] slot up to and including each size at that
 * size's cache, so the entries must be ascending.  For sizes that are not
 * a multiple of PAGESIZE, streams_msg_init() asserts that the size plus
 * sizeof (dblk_t) is a multiple of DBLK_CACHE_ALIGN, which is why the
 * LP64 and ILP32 tables differ.
 */
static size_t dblk_sizes[] = {
#ifdef _LP64
	16, 80, 144, 208, 272, 336, 528, 1040, 1488, 1936, 2576, 3920,
	8192, 12112, 16384, 20304, 24576, 28496, 32768, 36688,
	40960, 44880, 49152, 53072, 57344, 61264, 65536, 69456,
#else
	64, 128, 320, 576, 1088, 1536, 1984, 2624, 3968,
	8192, 12160, 16384, 20352, 24576, 28544, 32768, 36736,
	40960, 44928, 49152, 53120, 57344, 61312, 65536, 69504,
#endif
	DBLK_MAX_CACHE, 0
};
164 
/*
 * dblk_cache[] maps (size - 1) >> DBLK_SIZE_SHIFT to the kmem cache that
 * serves messages of that size; the remaining caches hold mblks, dblks
 * for externally supplied buffers, and flow-trace headers and blocks.
 */
static struct kmem_cache *dblk_cache[DBLK_MAX_CACHE / DBLK_MIN_SIZE];
static struct kmem_cache *mblk_cache;
static struct kmem_cache *dblk_esb_cache;
static struct kmem_cache *fthdr_cache;
static struct kmem_cache *ftblk_cache;

static void dblk_lastfree(mblk_t *mp, dblk_t *dbp);
static mblk_t *allocb_oversize(size_t size, int flags);
/* Incremented each time allocb_tryhard() gives up. */
static int allocb_tryhard_fails;
static void frnop_func(void *arg);
/* Free-routine descriptor whose free function does nothing. */
frtn_t frnop = { frnop_func };
static void bcache_dblk_lastfree(mblk_t *mp, dblk_t *dbp);

static boolean_t rwnext_enter(queue_t *qp);
static void rwnext_exit(queue_t *qp);

/*
 * Patchable mblk/dblk kmem_cache flags.
 * These are handed to kmem_cache_create() by streams_msg_init().
 */
int dblk_kmem_flags = 0;
int mblk_kmem_flags = 0;
186 
187 
188 static int
189 dblk_constructor(void *buf, void *cdrarg, int kmflags)
190 {
191 	dblk_t *dbp = buf;
192 	ssize_t msg_size = (ssize_t)cdrarg;
193 	size_t index;
194 
195 	ASSERT(msg_size != 0);
196 
197 	index = (msg_size - 1) >> DBLK_SIZE_SHIFT;
198 
199 	ASSERT(index < (DBLK_MAX_CACHE >> DBLK_SIZE_SHIFT));
200 
201 	if ((dbp->db_mblk = kmem_cache_alloc(mblk_cache, kmflags)) == NULL)
202 		return (-1);
203 	if ((msg_size & PAGEOFFSET) == 0) {
204 		dbp->db_base = kmem_alloc(msg_size, kmflags);
205 		if (dbp->db_base == NULL) {
206 			kmem_cache_free(mblk_cache, dbp->db_mblk);
207 			return (-1);
208 		}
209 	} else {
210 		dbp->db_base = (unsigned char *)&dbp[1];
211 	}
212 
213 	dbp->db_mblk->b_datap = dbp;
214 	dbp->db_cache = dblk_cache[index];
215 	dbp->db_lim = dbp->db_base + msg_size;
216 	dbp->db_free = dbp->db_lastfree = dblk_lastfree;
217 	dbp->db_frtnp = NULL;
218 	dbp->db_fthdr = NULL;
219 	dbp->db_credp = NULL;
220 	dbp->db_cpid = -1;
221 	dbp->db_struioflag = 0;
222 	dbp->db_struioun.cksum.flags = 0;
223 	return (0);
224 }
225 
226 /*ARGSUSED*/
227 static int
228 dblk_esb_constructor(void *buf, void *cdrarg, int kmflags)
229 {
230 	dblk_t *dbp = buf;
231 
232 	if ((dbp->db_mblk = kmem_cache_alloc(mblk_cache, kmflags)) == NULL)
233 		return (-1);
234 	dbp->db_mblk->b_datap = dbp;
235 	dbp->db_cache = dblk_esb_cache;
236 	dbp->db_fthdr = NULL;
237 	dbp->db_credp = NULL;
238 	dbp->db_cpid = -1;
239 	dbp->db_struioflag = 0;
240 	dbp->db_struioun.cksum.flags = 0;
241 	return (0);
242 }
243 
244 static int
245 bcache_dblk_constructor(void *buf, void *cdrarg, int kmflags)
246 {
247 	dblk_t *dbp = buf;
248 	bcache_t *bcp = (bcache_t *)cdrarg;
249 
250 	if ((dbp->db_mblk = kmem_cache_alloc(mblk_cache, kmflags)) == NULL)
251 		return (-1);
252 
253 	if ((dbp->db_base = (unsigned char *)kmem_cache_alloc(bcp->buffer_cache,
254 	    kmflags)) == NULL) {
255 		kmem_cache_free(mblk_cache, dbp->db_mblk);
256 		return (-1);
257 	}
258 
259 	dbp->db_mblk->b_datap = dbp;
260 	dbp->db_cache = (void *)bcp;
261 	dbp->db_lim = dbp->db_base + bcp->size;
262 	dbp->db_free = dbp->db_lastfree = bcache_dblk_lastfree;
263 	dbp->db_frtnp = NULL;
264 	dbp->db_fthdr = NULL;
265 	dbp->db_credp = NULL;
266 	dbp->db_cpid = -1;
267 	dbp->db_struioflag = 0;
268 	dbp->db_struioun.cksum.flags = 0;
269 	return (0);
270 }
271 
272 /*ARGSUSED*/
273 static void
274 dblk_destructor(void *buf, void *cdrarg)
275 {
276 	dblk_t *dbp = buf;
277 	ssize_t msg_size = (ssize_t)cdrarg;
278 
279 	ASSERT(dbp->db_mblk->b_datap == dbp);
280 
281 	ASSERT(msg_size != 0);
282 
283 	ASSERT(dbp->db_struioflag == 0);
284 	ASSERT(dbp->db_struioun.cksum.flags == 0);
285 
286 	if ((msg_size & PAGEOFFSET) == 0) {
287 		kmem_free(dbp->db_base, msg_size);
288 	}
289 
290 	kmem_cache_free(mblk_cache, dbp->db_mblk);
291 }
292 
293 static void
294 bcache_dblk_destructor(void *buf, void *cdrarg)
295 {
296 	dblk_t *dbp = buf;
297 	bcache_t *bcp = (bcache_t *)cdrarg;
298 
299 	kmem_cache_free(bcp->buffer_cache, dbp->db_base);
300 
301 	ASSERT(dbp->db_mblk->b_datap == dbp);
302 
303 	ASSERT(dbp->db_struioflag == 0);
304 	ASSERT(dbp->db_struioun.cksum.flags == 0);
305 
306 	kmem_cache_free(mblk_cache, dbp->db_mblk);
307 }
308 
309 void
310 streams_msg_init(void)
311 {
312 	char name[40];
313 	size_t size;
314 	size_t lastsize = DBLK_MIN_SIZE;
315 	size_t *sizep;
316 	struct kmem_cache *cp;
317 	size_t tot_size;
318 	int offset;
319 
320 	mblk_cache = kmem_cache_create("streams_mblk",
321 		sizeof (mblk_t), 32, NULL, NULL, NULL, NULL, NULL,
322 		mblk_kmem_flags);
323 
324 	for (sizep = dblk_sizes; (size = *sizep) != 0; sizep++) {
325 
326 		if ((offset = (size & PAGEOFFSET)) != 0) {
327 			/*
328 			 * We are in the middle of a page, dblk should
329 			 * be allocated on the same page
330 			 */
331 			tot_size = size + sizeof (dblk_t);
332 			ASSERT((offset + sizeof (dblk_t) + sizeof (kmem_slab_t))
333 								< PAGESIZE);
334 			ASSERT((tot_size & (DBLK_CACHE_ALIGN - 1)) == 0);
335 
336 		} else {
337 
338 			/*
339 			 * buf size is multiple of page size, dblk and
340 			 * buffer are allocated separately.
341 			 */
342 
343 			ASSERT((size & (DBLK_CACHE_ALIGN - 1)) == 0);
344 			tot_size = sizeof (dblk_t);
345 		}
346 
347 		(void) sprintf(name, "streams_dblk_%ld", size);
348 		cp = kmem_cache_create(name, tot_size,
349 			DBLK_CACHE_ALIGN, dblk_constructor,
350 			dblk_destructor, NULL,
351 			(void *)(size), NULL, dblk_kmem_flags);
352 
353 		while (lastsize <= size) {
354 			dblk_cache[(lastsize - 1) >> DBLK_SIZE_SHIFT] = cp;
355 			lastsize += DBLK_MIN_SIZE;
356 		}
357 	}
358 
359 	dblk_esb_cache = kmem_cache_create("streams_dblk_esb",
360 			sizeof (dblk_t), DBLK_CACHE_ALIGN,
361 			dblk_esb_constructor, dblk_destructor, NULL,
362 			(void *) sizeof (dblk_t), NULL, dblk_kmem_flags);
363 	fthdr_cache = kmem_cache_create("streams_fthdr",
364 		sizeof (fthdr_t), 32, NULL, NULL, NULL, NULL, NULL, 0);
365 	ftblk_cache = kmem_cache_create("streams_ftblk",
366 		sizeof (ftblk_t), 32, NULL, NULL, NULL, NULL, NULL, 0);
367 
368 	/* Initialize Multidata caches */
369 	mmd_init();
370 }
371 
372 /*ARGSUSED*/
373 mblk_t *
374 allocb(size_t size, uint_t pri)
375 {
376 	dblk_t *dbp;
377 	mblk_t *mp;
378 	size_t index;
379 
380 	index =  (size - 1)  >> DBLK_SIZE_SHIFT;
381 
382 	if (index >= (DBLK_MAX_CACHE >> DBLK_SIZE_SHIFT)) {
383 		if (size != 0) {
384 			mp = allocb_oversize(size, KM_NOSLEEP);
385 			goto out;
386 		}
387 		index = 0;
388 	}
389 
390 	if ((dbp = kmem_cache_alloc(dblk_cache[index], KM_NOSLEEP)) == NULL) {
391 		mp = NULL;
392 		goto out;
393 	}
394 
395 	mp = dbp->db_mblk;
396 	DBLK_RTFU_WORD(dbp) = DBLK_RTFU(1, M_DATA, 0, 0);
397 	mp->b_next = mp->b_prev = mp->b_cont = NULL;
398 	mp->b_rptr = mp->b_wptr = dbp->db_base;
399 	mp->b_queue = NULL;
400 	MBLK_BAND_FLAG_WORD(mp) = 0;
401 	STR_FTALLOC(&dbp->db_fthdr, FTEV_ALLOCB, size);
402 out:
403 	FTRACE_1("allocb(): mp=0x%p", (uintptr_t)mp);
404 
405 	return (mp);
406 }
407 
408 mblk_t *
409 allocb_tmpl(size_t size, const mblk_t *tmpl)
410 {
411 	mblk_t *mp = allocb(size, 0);
412 
413 	if (mp != NULL) {
414 		cred_t *cr = DB_CRED(tmpl);
415 		if (cr != NULL)
416 			crhold(mp->b_datap->db_credp = cr);
417 		DB_CPID(mp) = DB_CPID(tmpl);
418 		DB_TYPE(mp) = DB_TYPE(tmpl);
419 	}
420 	return (mp);
421 }
422 
423 mblk_t *
424 allocb_cred(size_t size, cred_t *cr)
425 {
426 	mblk_t *mp = allocb(size, 0);
427 
428 	if (mp != NULL && cr != NULL)
429 		crhold(mp->b_datap->db_credp = cr);
430 
431 	return (mp);
432 }
433 
434 mblk_t *
435 allocb_cred_wait(size_t size, uint_t flags, int *error, cred_t *cr)
436 {
437 	mblk_t *mp = allocb_wait(size, 0, flags, error);
438 
439 	if (mp != NULL && cr != NULL)
440 		crhold(mp->b_datap->db_credp = cr);
441 
442 	return (mp);
443 }
444 
445 void
446 freeb(mblk_t *mp)
447 {
448 	dblk_t *dbp = mp->b_datap;
449 
450 	ASSERT(dbp->db_ref > 0);
451 	ASSERT(mp->b_next == NULL && mp->b_prev == NULL);
452 	FTRACE_1("freeb(): mp=0x%lx", (uintptr_t)mp);
453 
454 	STR_FTEVENT_MBLK(mp, caller(), FTEV_FREEB, dbp->db_ref);
455 
456 	dbp->db_free(mp, dbp);
457 }
458 
459 void
460 freemsg(mblk_t *mp)
461 {
462 	FTRACE_1("freemsg(): mp=0x%lx", (uintptr_t)mp);
463 	while (mp) {
464 		dblk_t *dbp = mp->b_datap;
465 		mblk_t *mp_cont = mp->b_cont;
466 
467 		ASSERT(dbp->db_ref > 0);
468 		ASSERT(mp->b_next == NULL && mp->b_prev == NULL);
469 
470 		STR_FTEVENT_MBLK(mp, caller(), FTEV_FREEB, dbp->db_ref);
471 
472 		dbp->db_free(mp, dbp);
473 		mp = mp_cont;
474 	}
475 }
476 
477 /*
478  * Reallocate a block for another use.  Try hard to use the old block.
479  * If the old data is wanted (copy), leave b_wptr at the end of the data,
480  * otherwise return b_wptr = b_rptr.
481  *
482  * This routine is private and unstable.
483  */
484 mblk_t	*
485 reallocb(mblk_t *mp, size_t size, uint_t copy)
486 {
487 	mblk_t		*mp1;
488 	unsigned char	*old_rptr;
489 	ptrdiff_t	cur_size;
490 
491 	if (mp == NULL)
492 		return (allocb(size, BPRI_HI));
493 
494 	cur_size = mp->b_wptr - mp->b_rptr;
495 	old_rptr = mp->b_rptr;
496 
497 	ASSERT(mp->b_datap->db_ref != 0);
498 
499 	if (mp->b_datap->db_ref == 1 && MBLKSIZE(mp) >= size) {
500 		/*
501 		 * If the data is wanted and it will fit where it is, no
502 		 * work is required.
503 		 */
504 		if (copy && mp->b_datap->db_lim - mp->b_rptr >= size)
505 			return (mp);
506 
507 		mp->b_wptr = mp->b_rptr = mp->b_datap->db_base;
508 		mp1 = mp;
509 	} else if ((mp1 = allocb_tmpl(size, mp)) != NULL) {
510 		/* XXX other mp state could be copied too, db_flags ... ? */
511 		mp1->b_cont = mp->b_cont;
512 	} else {
513 		return (NULL);
514 	}
515 
516 	if (copy) {
517 		bcopy(old_rptr, mp1->b_rptr, cur_size);
518 		mp1->b_wptr = mp1->b_rptr + cur_size;
519 	}
520 
521 	if (mp != mp1)
522 		freeb(mp);
523 
524 	return (mp1);
525 }
526 
527 static void
528 dblk_lastfree(mblk_t *mp, dblk_t *dbp)
529 {
530 	ASSERT(dbp->db_mblk == mp);
531 	if (dbp->db_fthdr != NULL)
532 		str_ftfree(dbp);
533 
534 	/* set credp and projid to be 'unspecified' before returning to cache */
535 	if (dbp->db_credp != NULL) {
536 		crfree(dbp->db_credp);
537 		dbp->db_credp = NULL;
538 	}
539 	dbp->db_cpid = -1;
540 
541 	/* Reset the struioflag and the checksum flag fields */
542 	dbp->db_struioflag = 0;
543 	dbp->db_struioun.cksum.flags = 0;
544 
545 	/* and the COOKED flag */
546 	dbp->db_flags &= ~DBLK_COOKED;
547 
548 	kmem_cache_free(dbp->db_cache, dbp);
549 }
550 
551 static void
552 dblk_decref(mblk_t *mp, dblk_t *dbp)
553 {
554 	if (dbp->db_ref != 1) {
555 		uint32_t rtfu = atomic_add_32_nv(&DBLK_RTFU_WORD(dbp),
556 		    -(1 << DBLK_RTFU_SHIFT(db_ref)));
557 		/*
558 		 * atomic_add_32_nv() just decremented db_ref, so we no longer
559 		 * have a reference to the dblk, which means another thread
560 		 * could free it.  Therefore we cannot examine the dblk to
561 		 * determine whether ours was the last reference.  Instead,
562 		 * we extract the new and minimum reference counts from rtfu.
563 		 * Note that all we're really saying is "if (ref != refmin)".
564 		 */
565 		if (((rtfu >> DBLK_RTFU_SHIFT(db_ref)) & DBLK_REFMAX) !=
566 		    ((rtfu >> DBLK_RTFU_SHIFT(db_flags)) & DBLK_REFMIN)) {
567 			kmem_cache_free(mblk_cache, mp);
568 			return;
569 		}
570 	}
571 	dbp->db_mblk = mp;
572 	dbp->db_free = dbp->db_lastfree;
573 	dbp->db_lastfree(mp, dbp);
574 }
575 
576 mblk_t *
577 dupb(mblk_t *mp)
578 {
579 	dblk_t *dbp = mp->b_datap;
580 	mblk_t *new_mp;
581 	uint32_t oldrtfu, newrtfu;
582 
583 	if ((new_mp = kmem_cache_alloc(mblk_cache, KM_NOSLEEP)) == NULL)
584 		goto out;
585 
586 	new_mp->b_next = new_mp->b_prev = new_mp->b_cont = NULL;
587 	new_mp->b_rptr = mp->b_rptr;
588 	new_mp->b_wptr = mp->b_wptr;
589 	new_mp->b_datap = dbp;
590 	new_mp->b_queue = NULL;
591 	MBLK_BAND_FLAG_WORD(new_mp) = MBLK_BAND_FLAG_WORD(mp);
592 
593 	STR_FTEVENT_MBLK(mp, caller(), FTEV_DUPB, dbp->db_ref);
594 
595 	/*
596 	 * First-dup optimization.  The enabling assumption is that there
597 	 * can can never be a race (in correct code) to dup the first copy
598 	 * of a message.  Therefore we don't need to do it atomically.
599 	 */
600 	if (dbp->db_free != dblk_decref) {
601 		dbp->db_free = dblk_decref;
602 		dbp->db_ref++;
603 		goto out;
604 	}
605 
606 	do {
607 		ASSERT(dbp->db_ref > 0);
608 		oldrtfu = DBLK_RTFU_WORD(dbp);
609 		newrtfu = oldrtfu + (1 << DBLK_RTFU_SHIFT(db_ref));
610 		/*
611 		 * If db_ref is maxed out we can't dup this message anymore.
612 		 */
613 		if ((oldrtfu & DBLK_RTFU_REF_MASK) == DBLK_RTFU_REF_MASK) {
614 			kmem_cache_free(mblk_cache, new_mp);
615 			new_mp = NULL;
616 			goto out;
617 		}
618 	} while (cas32(&DBLK_RTFU_WORD(dbp), oldrtfu, newrtfu) != oldrtfu);
619 
620 out:
621 	FTRACE_1("dupb(): new_mp=0x%lx", (uintptr_t)new_mp);
622 	return (new_mp);
623 }
624 
625 static void
626 dblk_lastfree_desb(mblk_t *mp, dblk_t *dbp)
627 {
628 	frtn_t *frp = dbp->db_frtnp;
629 
630 	ASSERT(dbp->db_mblk == mp);
631 	frp->free_func(frp->free_arg);
632 	if (dbp->db_fthdr != NULL)
633 		str_ftfree(dbp);
634 
635 	/* set credp and projid to be 'unspecified' before returning to cache */
636 	if (dbp->db_credp != NULL) {
637 		crfree(dbp->db_credp);
638 		dbp->db_credp = NULL;
639 	}
640 	dbp->db_cpid = -1;
641 	dbp->db_struioflag = 0;
642 	dbp->db_struioun.cksum.flags = 0;
643 
644 	kmem_cache_free(dbp->db_cache, dbp);
645 }
646 
/*
 * Free function of the 'frnop' free-routine descriptor; it intentionally
 * does nothing.
 */
/*ARGSUSED*/
static void
frnop_func(void *arg)
{
	/* nothing to release */
}
652 
653 /*
654  * Generic esballoc used to implement the four flavors: [d]esballoc[a].
655  */
656 static mblk_t *
657 gesballoc(unsigned char *base, size_t size, uint32_t db_rtfu, frtn_t *frp,
658 	void (*lastfree)(mblk_t *, dblk_t *), int kmflags)
659 {
660 	dblk_t *dbp;
661 	mblk_t *mp;
662 
663 	ASSERT(base != NULL && frp != NULL);
664 
665 	if ((dbp = kmem_cache_alloc(dblk_esb_cache, kmflags)) == NULL) {
666 		mp = NULL;
667 		goto out;
668 	}
669 
670 	mp = dbp->db_mblk;
671 	dbp->db_base = base;
672 	dbp->db_lim = base + size;
673 	dbp->db_free = dbp->db_lastfree = lastfree;
674 	dbp->db_frtnp = frp;
675 	DBLK_RTFU_WORD(dbp) = db_rtfu;
676 	mp->b_next = mp->b_prev = mp->b_cont = NULL;
677 	mp->b_rptr = mp->b_wptr = base;
678 	mp->b_queue = NULL;
679 	MBLK_BAND_FLAG_WORD(mp) = 0;
680 
681 out:
682 	FTRACE_1("gesballoc(): mp=0x%lx", (uintptr_t)mp);
683 	return (mp);
684 }
685 
686 /*ARGSUSED*/
687 mblk_t *
688 esballoc(unsigned char *base, size_t size, uint_t pri, frtn_t *frp)
689 {
690 	mblk_t *mp;
691 
692 	/*
693 	 * Note that this is structured to allow the common case (i.e.
694 	 * STREAMS flowtracing disabled) to call gesballoc() with tail
695 	 * call optimization.
696 	 */
697 	if (!str_ftnever) {
698 		mp = gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
699 		    frp, freebs_enqueue, KM_NOSLEEP);
700 
701 		if (mp != NULL)
702 			STR_FTALLOC(&DB_FTHDR(mp), FTEV_ESBALLOC, size);
703 		return (mp);
704 	}
705 
706 	return (gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
707 	    frp, freebs_enqueue, KM_NOSLEEP));
708 }
709 
710 /*
711  * Same as esballoc() but sleeps waiting for memory.
712  */
713 /*ARGSUSED*/
714 mblk_t *
715 esballoc_wait(unsigned char *base, size_t size, uint_t pri, frtn_t *frp)
716 {
717 	mblk_t *mp;
718 
719 	/*
720 	 * Note that this is structured to allow the common case (i.e.
721 	 * STREAMS flowtracing disabled) to call gesballoc() with tail
722 	 * call optimization.
723 	 */
724 	if (!str_ftnever) {
725 		mp = gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
726 		    frp, freebs_enqueue, KM_SLEEP);
727 
728 		STR_FTALLOC(&DB_FTHDR(mp), FTEV_ESBALLOC, size);
729 		return (mp);
730 	}
731 
732 	return (gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
733 	    frp, freebs_enqueue, KM_SLEEP));
734 }
735 
736 /*ARGSUSED*/
737 mblk_t *
738 desballoc(unsigned char *base, size_t size, uint_t pri, frtn_t *frp)
739 {
740 	mblk_t *mp;
741 
742 	/*
743 	 * Note that this is structured to allow the common case (i.e.
744 	 * STREAMS flowtracing disabled) to call gesballoc() with tail
745 	 * call optimization.
746 	 */
747 	if (!str_ftnever) {
748 		mp = gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
749 			frp, dblk_lastfree_desb, KM_NOSLEEP);
750 
751 		if (mp != NULL)
752 			STR_FTALLOC(&DB_FTHDR(mp), FTEV_DESBALLOC, size);
753 		return (mp);
754 	}
755 
756 	return (gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
757 	    frp, dblk_lastfree_desb, KM_NOSLEEP));
758 }
759 
760 /*ARGSUSED*/
761 mblk_t *
762 esballoca(unsigned char *base, size_t size, uint_t pri, frtn_t *frp)
763 {
764 	mblk_t *mp;
765 
766 	/*
767 	 * Note that this is structured to allow the common case (i.e.
768 	 * STREAMS flowtracing disabled) to call gesballoc() with tail
769 	 * call optimization.
770 	 */
771 	if (!str_ftnever) {
772 		mp = gesballoc(base, size, DBLK_RTFU(2, M_DATA, 0, 0),
773 		    frp, freebs_enqueue, KM_NOSLEEP);
774 
775 		if (mp != NULL)
776 			STR_FTALLOC(&DB_FTHDR(mp), FTEV_ESBALLOCA, size);
777 		return (mp);
778 	}
779 
780 	return (gesballoc(base, size, DBLK_RTFU(2, M_DATA, 0, 0),
781 	    frp, freebs_enqueue, KM_NOSLEEP));
782 }
783 
784 /*ARGSUSED*/
785 mblk_t *
786 desballoca(unsigned char *base, size_t size, uint_t pri, frtn_t *frp)
787 {
788 	mblk_t *mp;
789 
790 	/*
791 	 * Note that this is structured to allow the common case (i.e.
792 	 * STREAMS flowtracing disabled) to call gesballoc() with tail
793 	 * call optimization.
794 	 */
795 	if (!str_ftnever) {
796 		mp = gesballoc(base, size, DBLK_RTFU(2, M_DATA, 0, 0),
797 		    frp, dblk_lastfree_desb, KM_NOSLEEP);
798 
799 		if (mp != NULL)
800 			STR_FTALLOC(&DB_FTHDR(mp), FTEV_DESBALLOCA, size);
801 		return (mp);
802 	}
803 
804 	return (gesballoc(base, size, DBLK_RTFU(2, M_DATA, 0, 0),
805 	    frp, dblk_lastfree_desb, KM_NOSLEEP));
806 }
807 
808 static void
809 bcache_dblk_lastfree(mblk_t *mp, dblk_t *dbp)
810 {
811 	bcache_t *bcp = dbp->db_cache;
812 
813 	ASSERT(dbp->db_mblk == mp);
814 	if (dbp->db_fthdr != NULL)
815 		str_ftfree(dbp);
816 
817 	/* set credp and projid to be 'unspecified' before returning to cache */
818 	if (dbp->db_credp != NULL) {
819 		crfree(dbp->db_credp);
820 		dbp->db_credp = NULL;
821 	}
822 	dbp->db_cpid = -1;
823 	dbp->db_struioflag = 0;
824 	dbp->db_struioun.cksum.flags = 0;
825 
826 	mutex_enter(&bcp->mutex);
827 	kmem_cache_free(bcp->dblk_cache, dbp);
828 	bcp->alloc--;
829 
830 	if (bcp->alloc == 0 && bcp->destroy != 0) {
831 		kmem_cache_destroy(bcp->dblk_cache);
832 		kmem_cache_destroy(bcp->buffer_cache);
833 		mutex_exit(&bcp->mutex);
834 		mutex_destroy(&bcp->mutex);
835 		kmem_free(bcp, sizeof (bcache_t));
836 	} else {
837 		mutex_exit(&bcp->mutex);
838 	}
839 }
840 
841 bcache_t *
842 bcache_create(char *name, size_t size, uint_t align)
843 {
844 	bcache_t *bcp;
845 	char buffer[255];
846 
847 	ASSERT((align & (align - 1)) == 0);
848 
849 	if ((bcp = (bcache_t *)kmem_alloc(sizeof (bcache_t), KM_NOSLEEP)) ==
850 	    NULL) {
851 		return (NULL);
852 	}
853 
854 	bcp->size = size;
855 	bcp->align = align;
856 	bcp->alloc = 0;
857 	bcp->destroy = 0;
858 
859 	mutex_init(&bcp->mutex, NULL, MUTEX_DRIVER, NULL);
860 
861 	(void) sprintf(buffer, "%s_buffer_cache", name);
862 	bcp->buffer_cache = kmem_cache_create(buffer, size, align, NULL, NULL,
863 	    NULL, NULL, NULL, 0);
864 	(void) sprintf(buffer, "%s_dblk_cache", name);
865 	bcp->dblk_cache = kmem_cache_create(buffer, sizeof (dblk_t),
866 	    DBLK_CACHE_ALIGN, bcache_dblk_constructor, bcache_dblk_destructor,
867 						NULL, (void *)bcp, NULL, 0);
868 
869 	return (bcp);
870 }
871 
872 void
873 bcache_destroy(bcache_t *bcp)
874 {
875 	ASSERT(bcp != NULL);
876 
877 	mutex_enter(&bcp->mutex);
878 	if (bcp->alloc == 0) {
879 		kmem_cache_destroy(bcp->dblk_cache);
880 		kmem_cache_destroy(bcp->buffer_cache);
881 		mutex_exit(&bcp->mutex);
882 		mutex_destroy(&bcp->mutex);
883 		kmem_free(bcp, sizeof (bcache_t));
884 	} else {
885 		bcp->destroy++;
886 		mutex_exit(&bcp->mutex);
887 	}
888 }
889 
890 /*ARGSUSED*/
891 mblk_t *
892 bcache_allocb(bcache_t *bcp, uint_t pri)
893 {
894 	dblk_t *dbp;
895 	mblk_t *mp = NULL;
896 
897 	ASSERT(bcp != NULL);
898 
899 	mutex_enter(&bcp->mutex);
900 	if (bcp->destroy != 0) {
901 		mutex_exit(&bcp->mutex);
902 		goto out;
903 	}
904 
905 	if ((dbp = kmem_cache_alloc(bcp->dblk_cache, KM_NOSLEEP)) == NULL) {
906 		mutex_exit(&bcp->mutex);
907 		goto out;
908 	}
909 	bcp->alloc++;
910 	mutex_exit(&bcp->mutex);
911 
912 	ASSERT(((uintptr_t)(dbp->db_base) & (bcp->align - 1)) == 0);
913 
914 	mp = dbp->db_mblk;
915 	DBLK_RTFU_WORD(dbp) = DBLK_RTFU(1, M_DATA, 0, 0);
916 	mp->b_next = mp->b_prev = mp->b_cont = NULL;
917 	mp->b_rptr = mp->b_wptr = dbp->db_base;
918 	mp->b_queue = NULL;
919 	MBLK_BAND_FLAG_WORD(mp) = 0;
920 	STR_FTALLOC(&dbp->db_fthdr, FTEV_BCALLOCB, bcp->size);
921 out:
922 	FTRACE_1("bcache_allocb(): mp=0x%p", (uintptr_t)mp);
923 
924 	return (mp);
925 }
926 
927 static void
928 dblk_lastfree_oversize(mblk_t *mp, dblk_t *dbp)
929 {
930 	ASSERT(dbp->db_mblk == mp);
931 	if (dbp->db_fthdr != NULL)
932 		str_ftfree(dbp);
933 
934 	/* set credp and projid to be 'unspecified' before returning to cache */
935 	if (dbp->db_credp != NULL) {
936 		crfree(dbp->db_credp);
937 		dbp->db_credp = NULL;
938 	}
939 	dbp->db_cpid = -1;
940 	dbp->db_struioflag = 0;
941 	dbp->db_struioun.cksum.flags = 0;
942 
943 	kmem_free(dbp->db_base, dbp->db_lim - dbp->db_base);
944 	kmem_cache_free(dbp->db_cache, dbp);
945 }
946 
947 static mblk_t *
948 allocb_oversize(size_t size, int kmflags)
949 {
950 	mblk_t *mp;
951 	void *buf;
952 
953 	size = P2ROUNDUP(size, DBLK_CACHE_ALIGN);
954 	if ((buf = kmem_alloc(size, kmflags)) == NULL)
955 		return (NULL);
956 	if ((mp = gesballoc(buf, size, DBLK_RTFU(1, M_DATA, 0, 0),
957 	    &frnop, dblk_lastfree_oversize, kmflags)) == NULL)
958 		kmem_free(buf, size);
959 
960 	if (mp != NULL)
961 		STR_FTALLOC(&DB_FTHDR(mp), FTEV_ALLOCBIG, size);
962 
963 	return (mp);
964 }
965 
966 mblk_t *
967 allocb_tryhard(size_t target_size)
968 {
969 	size_t size;
970 	mblk_t *bp;
971 
972 	for (size = target_size; size < target_size + 512;
973 	    size += DBLK_CACHE_ALIGN)
974 		if ((bp = allocb(size, BPRI_HI)) != NULL)
975 			return (bp);
976 	allocb_tryhard_fails++;
977 	return (NULL);
978 }
979 
980 /*
981  * This routine is consolidation private for STREAMS internal use
982  * This routine may only be called from sync routines (i.e., not
983  * from put or service procedures).  It is located here (rather
984  * than strsubr.c) so that we don't have to expose all of the
985  * allocb() implementation details in header files.
986  */
987 mblk_t *
988 allocb_wait(size_t size, uint_t pri, uint_t flags, int *error)
989 {
990 	dblk_t *dbp;
991 	mblk_t *mp;
992 	size_t index;
993 
994 	index = (size -1) >> DBLK_SIZE_SHIFT;
995 
996 	if (flags & STR_NOSIG) {
997 		if (index >= (DBLK_MAX_CACHE >> DBLK_SIZE_SHIFT)) {
998 			if (size != 0) {
999 				mp = allocb_oversize(size, KM_SLEEP);
1000 				FTRACE_1("allocb_wait (NOSIG): mp=0x%lx",
1001 				    (uintptr_t)mp);
1002 				return (mp);
1003 			}
1004 			index = 0;
1005 		}
1006 
1007 		dbp = kmem_cache_alloc(dblk_cache[index], KM_SLEEP);
1008 		mp = dbp->db_mblk;
1009 		DBLK_RTFU_WORD(dbp) = DBLK_RTFU(1, M_DATA, 0, 0);
1010 		mp->b_next = mp->b_prev = mp->b_cont = NULL;
1011 		mp->b_rptr = mp->b_wptr = dbp->db_base;
1012 		mp->b_queue = NULL;
1013 		MBLK_BAND_FLAG_WORD(mp) = 0;
1014 		STR_FTALLOC(&DB_FTHDR(mp), FTEV_ALLOCBW, size);
1015 
1016 		FTRACE_1("allocb_wait (NOSIG): mp=0x%lx", (uintptr_t)mp);
1017 
1018 	} else {
1019 		while ((mp = allocb(size, pri)) == NULL) {
1020 			if ((*error = strwaitbuf(size, BPRI_HI)) != 0)
1021 				return (NULL);
1022 		}
1023 	}
1024 
1025 	return (mp);
1026 }
1027 
1028 /*
1029  * Call function 'func' with 'arg' when a class zero block can
1030  * be allocated with priority 'pri'.
1031  */
1032 bufcall_id_t
1033 esbbcall(uint_t pri, void (*func)(void *), void *arg)
1034 {
1035 	return (bufcall(1, pri, func, arg));
1036 }
1037 
1038 /*
1039  * Allocates an iocblk (M_IOCTL) block. Properly sets the credentials
1040  * ioc_id, rval and error of the struct ioctl to set up an ioctl call.
1041  * This provides consistency for all internal allocators of ioctl.
1042  */
1043 mblk_t *
1044 mkiocb(uint_t cmd)
1045 {
1046 	struct iocblk	*ioc;
1047 	mblk_t		*mp;
1048 
1049 	/*
1050 	 * Allocate enough space for any of the ioctl related messages.
1051 	 */
1052 	if ((mp = allocb(sizeof (union ioctypes), BPRI_MED)) == NULL)
1053 		return (NULL);
1054 
1055 	bzero(mp->b_rptr, sizeof (union ioctypes));
1056 
1057 	/*
1058 	 * Set the mblk_t information and ptrs correctly.
1059 	 */
1060 	mp->b_wptr += sizeof (struct iocblk);
1061 	mp->b_datap->db_type = M_IOCTL;
1062 
1063 	/*
1064 	 * Fill in the fields.
1065 	 */
1066 	ioc		= (struct iocblk *)mp->b_rptr;
1067 	ioc->ioc_cmd	= cmd;
1068 	ioc->ioc_cr	= kcred;
1069 	ioc->ioc_id	= getiocseqno();
1070 	ioc->ioc_flag	= IOC_NATIVE;
1071 	return (mp);
1072 }
1073 
1074 /*
1075  * test if block of given size can be allocated with a request of
1076  * the given priority.
1077  * 'pri' is no longer used, but is retained for compatibility.
1078  */
1079 /* ARGSUSED */
1080 int
1081 testb(size_t size, uint_t pri)
1082 {
1083 	return ((size + sizeof (dblk_t)) <= kmem_avail());
1084 }
1085 
1086 /*
1087  * Call function 'func' with argument 'arg' when there is a reasonably
1088  * good chance that a block of size 'size' can be allocated.
1089  * 'pri' is no longer used, but is retained for compatibility.
1090  */
1091 /* ARGSUSED */
1092 bufcall_id_t
1093 bufcall(size_t size, uint_t pri, void (*func)(void *), void *arg)
1094 {
1095 	static long bid = 1;	/* always odd to save checking for zero */
1096 	bufcall_id_t bc_id;
1097 	struct strbufcall *bcp;
1098 
1099 	if ((bcp = kmem_alloc(sizeof (strbufcall_t), KM_NOSLEEP)) == NULL)
1100 		return (0);
1101 
1102 	bcp->bc_func = func;
1103 	bcp->bc_arg = arg;
1104 	bcp->bc_size = size;
1105 	bcp->bc_next = NULL;
1106 	bcp->bc_executor = NULL;
1107 
1108 	mutex_enter(&strbcall_lock);
1109 	/*
1110 	 * After bcp is linked into strbcalls and strbcall_lock is dropped there
1111 	 * should be no references to bcp since it may be freed by
1112 	 * runbufcalls(). Since bcp_id field is returned, we save its value in
1113 	 * the local var.
1114 	 */
1115 	bc_id = bcp->bc_id = (bufcall_id_t)(bid += 2);	/* keep it odd */
1116 
1117 	/*
1118 	 * add newly allocated stream event to existing
1119 	 * linked list of events.
1120 	 */
1121 	if (strbcalls.bc_head == NULL) {
1122 		strbcalls.bc_head = strbcalls.bc_tail = bcp;
1123 	} else {
1124 		strbcalls.bc_tail->bc_next = bcp;
1125 		strbcalls.bc_tail = bcp;
1126 	}
1127 
1128 	cv_signal(&strbcall_cv);
1129 	mutex_exit(&strbcall_lock);
1130 	return (bc_id);
1131 }
1132 
1133 /*
1134  * Cancel a bufcall request.
1135  */
1136 void
1137 unbufcall(bufcall_id_t id)
1138 {
1139 	strbufcall_t *bcp, *pbcp;
1140 
1141 	mutex_enter(&strbcall_lock);
1142 again:
1143 	pbcp = NULL;
1144 	for (bcp = strbcalls.bc_head; bcp; bcp = bcp->bc_next) {
1145 		if (id == bcp->bc_id)
1146 			break;
1147 		pbcp = bcp;
1148 	}
1149 	if (bcp) {
1150 		if (bcp->bc_executor != NULL) {
1151 			if (bcp->bc_executor != curthread) {
1152 				cv_wait(&bcall_cv, &strbcall_lock);
1153 				goto again;
1154 			}
1155 		} else {
1156 			if (pbcp)
1157 				pbcp->bc_next = bcp->bc_next;
1158 			else
1159 				strbcalls.bc_head = bcp->bc_next;
1160 			if (bcp == strbcalls.bc_tail)
1161 				strbcalls.bc_tail = pbcp;
1162 			kmem_free(bcp, sizeof (strbufcall_t));
1163 		}
1164 	}
1165 	mutex_exit(&strbcall_lock);
1166 }
1167 
1168 /*
1169  * Duplicate a message block by block (uses dupb), returning
1170  * a pointer to the duplicate message.
1171  * Returns a non-NULL value only if the entire message
1172  * was dup'd.
1173  */
1174 mblk_t *
1175 dupmsg(mblk_t *bp)
1176 {
1177 	mblk_t *head, *nbp;
1178 
1179 	if (!bp || !(nbp = head = dupb(bp)))
1180 		return (NULL);
1181 
1182 	while (bp->b_cont) {
1183 		if (!(nbp->b_cont = dupb(bp->b_cont))) {
1184 			freemsg(head);
1185 			return (NULL);
1186 		}
1187 		nbp = nbp->b_cont;
1188 		bp = bp->b_cont;
1189 	}
1190 	return (head);
1191 }
1192 
1193 #define	DUPB_NOLOAN(bp) \
1194 	((((bp)->b_datap->db_struioflag & STRUIO_ZC) != 0) ? \
1195 	copyb((bp)) : dupb((bp)))
1196 
1197 mblk_t *
1198 dupmsg_noloan(mblk_t *bp)
1199 {
1200 	mblk_t *head, *nbp;
1201 
1202 	if (bp == NULL || DB_TYPE(bp) != M_DATA ||
1203 	    ((nbp = head = DUPB_NOLOAN(bp)) == NULL))
1204 		return (NULL);
1205 
1206 	while (bp->b_cont) {
1207 		if ((nbp->b_cont = DUPB_NOLOAN(bp->b_cont)) == NULL) {
1208 			freemsg(head);
1209 			return (NULL);
1210 		}
1211 		nbp = nbp->b_cont;
1212 		bp = bp->b_cont;
1213 	}
1214 	return (head);
1215 }
1216 
1217 /*
1218  * Copy data from message and data block to newly allocated message and
1219  * data block. Returns new message block pointer, or NULL if error.
1220  * The alignment of rptr (w.r.t. word alignment) will be the same in the copy
1221  * as in the original even when db_base is not word aligned. (bug 1052877)
1222  */
1223 mblk_t *
1224 copyb(mblk_t *bp)
1225 {
1226 	mblk_t	*nbp;
1227 	dblk_t	*dp, *ndp;
1228 	uchar_t *base;
1229 	size_t	size;
1230 	size_t	unaligned;
1231 
1232 	ASSERT(bp->b_wptr >= bp->b_rptr);
1233 
1234 	dp = bp->b_datap;
1235 	if (dp->db_fthdr != NULL)
1236 		STR_FTEVENT_MBLK(bp, caller(), FTEV_COPYB, 0);
1237 
1238 	/*
1239 	 * Special handling for Multidata message; this should be
1240 	 * removed once a copy-callback routine is made available.
1241 	 */
1242 	if (dp->db_type == M_MULTIDATA) {
1243 		cred_t *cr;
1244 
1245 		if ((nbp = mmd_copy(bp, KM_NOSLEEP)) == NULL)
1246 			return (NULL);
1247 
1248 		nbp->b_flag = bp->b_flag;
1249 		nbp->b_band = bp->b_band;
1250 		ndp = nbp->b_datap;
1251 
1252 		/* See comments below on potential issues. */
1253 		STR_FTEVENT_MBLK(nbp, caller(), FTEV_COPYB, 1);
1254 
1255 		ASSERT(ndp->db_type == dp->db_type);
1256 		cr = dp->db_credp;
1257 		if (cr != NULL)
1258 			crhold(ndp->db_credp = cr);
1259 		ndp->db_cpid = dp->db_cpid;
1260 		return (nbp);
1261 	}
1262 
1263 	size = dp->db_lim - dp->db_base;
1264 	unaligned = P2PHASE((uintptr_t)dp->db_base, sizeof (uint_t));
1265 	if ((nbp = allocb_tmpl(size + unaligned, bp)) == NULL)
1266 		return (NULL);
1267 	nbp->b_flag = bp->b_flag;
1268 	nbp->b_band = bp->b_band;
1269 	ndp = nbp->b_datap;
1270 
1271 	/*
1272 	 * Well, here is a potential issue.  If we are trying to
1273 	 * trace a flow, and we copy the message, we might lose
1274 	 * information about where this message might have been.
1275 	 * So we should inherit the FT data.  On the other hand,
1276 	 * a user might be interested only in alloc to free data.
1277 	 * So I guess the real answer is to provide a tunable.
1278 	 */
1279 	STR_FTEVENT_MBLK(nbp, caller(), FTEV_COPYB, 1);
1280 
1281 	base = ndp->db_base + unaligned;
1282 	bcopy(dp->db_base, ndp->db_base + unaligned, size);
1283 
1284 	nbp->b_rptr = base + (bp->b_rptr - dp->db_base);
1285 	nbp->b_wptr = nbp->b_rptr + MBLKL(bp);
1286 
1287 	return (nbp);
1288 }
1289 
1290 /*
1291  * Copy data from message to newly allocated message using new
1292  * data blocks.  Returns a pointer to the new message, or NULL if error.
1293  */
1294 mblk_t *
1295 copymsg(mblk_t *bp)
1296 {
1297 	mblk_t *head, *nbp;
1298 
1299 	if (!bp || !(nbp = head = copyb(bp)))
1300 		return (NULL);
1301 
1302 	while (bp->b_cont) {
1303 		if (!(nbp->b_cont = copyb(bp->b_cont))) {
1304 			freemsg(head);
1305 			return (NULL);
1306 		}
1307 		nbp = nbp->b_cont;
1308 		bp = bp->b_cont;
1309 	}
1310 	return (head);
1311 }
1312 
1313 /*
1314  * link a message block to tail of message
1315  */
1316 void
1317 linkb(mblk_t *mp, mblk_t *bp)
1318 {
1319 	ASSERT(mp && bp);
1320 
1321 	for (; mp->b_cont; mp = mp->b_cont)
1322 		;
1323 	mp->b_cont = bp;
1324 }
1325 
1326 /*
1327  * unlink a message block from head of message
1328  * return pointer to new message.
1329  * NULL if message becomes empty.
1330  */
1331 mblk_t *
1332 unlinkb(mblk_t *bp)
1333 {
1334 	mblk_t *bp1;
1335 
1336 	bp1 = bp->b_cont;
1337 	bp->b_cont = NULL;
1338 	return (bp1);
1339 }
1340 
1341 /*
1342  * remove a message block "bp" from message "mp"
1343  *
1344  * Return pointer to new message or NULL if no message remains.
1345  * Return -1 if bp is not found in message.
1346  */
1347 mblk_t *
1348 rmvb(mblk_t *mp, mblk_t *bp)
1349 {
1350 	mblk_t *tmp;
1351 	mblk_t *lastp = NULL;
1352 
1353 	ASSERT(mp && bp);
1354 	for (tmp = mp; tmp; tmp = tmp->b_cont) {
1355 		if (tmp == bp) {
1356 			if (lastp)
1357 				lastp->b_cont = tmp->b_cont;
1358 			else
1359 				mp = tmp->b_cont;
1360 			tmp->b_cont = NULL;
1361 			return (mp);
1362 		}
1363 		lastp = tmp;
1364 	}
1365 	return ((mblk_t *)-1);
1366 }
1367 
1368 /*
1369  * Concatenate and align first len bytes of common
1370  * message type.  Len == -1, means concat everything.
1371  * Returns 1 on success, 0 on failure
1372  * After the pullup, mp points to the pulled up data.
1373  */
1374 int
1375 pullupmsg(mblk_t *mp, ssize_t len)
1376 {
1377 	mblk_t *bp, *b_cont;
1378 	dblk_t *dbp;
1379 	ssize_t n;
1380 
1381 	ASSERT(mp->b_datap->db_ref > 0);
1382 	ASSERT(mp->b_next == NULL && mp->b_prev == NULL);
1383 
1384 	/*
1385 	 * We won't handle Multidata message, since it contains
1386 	 * metadata which this function has no knowledge of; we
1387 	 * assert on DEBUG, and return failure otherwise.
1388 	 */
1389 	ASSERT(mp->b_datap->db_type != M_MULTIDATA);
1390 	if (mp->b_datap->db_type == M_MULTIDATA)
1391 		return (0);
1392 
1393 	if (len == -1) {
1394 		if (mp->b_cont == NULL && str_aligned(mp->b_rptr))
1395 			return (1);
1396 		len = xmsgsize(mp);
1397 	} else {
1398 		ssize_t first_mblk_len = mp->b_wptr - mp->b_rptr;
1399 		ASSERT(first_mblk_len >= 0);
1400 		/*
1401 		 * If the length is less than that of the first mblk,
1402 		 * we want to pull up the message into an aligned mblk.
1403 		 * Though not part of the spec, some callers assume it.
1404 		 */
1405 		if (len <= first_mblk_len) {
1406 			if (str_aligned(mp->b_rptr))
1407 				return (1);
1408 			len = first_mblk_len;
1409 		} else if (xmsgsize(mp) < len)
1410 			return (0);
1411 	}
1412 
1413 	if ((bp = allocb_tmpl(len, mp)) == NULL)
1414 		return (0);
1415 
1416 	dbp = bp->b_datap;
1417 	*bp = *mp;		/* swap mblks so bp heads the old msg... */
1418 	mp->b_datap = dbp;	/* ... and mp heads the new message */
1419 	mp->b_datap->db_mblk = mp;
1420 	bp->b_datap->db_mblk = bp;
1421 	mp->b_rptr = mp->b_wptr = dbp->db_base;
1422 
1423 	do {
1424 		ASSERT(bp->b_datap->db_ref > 0);
1425 		ASSERT(bp->b_wptr >= bp->b_rptr);
1426 		n = MIN(bp->b_wptr - bp->b_rptr, len);
1427 		bcopy(bp->b_rptr, mp->b_wptr, (size_t)n);
1428 		mp->b_wptr += n;
1429 		bp->b_rptr += n;
1430 		len -= n;
1431 		if (bp->b_rptr != bp->b_wptr)
1432 			break;
1433 		b_cont = bp->b_cont;
1434 		freeb(bp);
1435 		bp = b_cont;
1436 	} while (len && bp);
1437 
1438 	mp->b_cont = bp;	/* tack on whatever wasn't pulled up */
1439 
1440 	return (1);
1441 }
1442 
1443 /*
1444  * Concatenate and align at least the first len bytes of common message
1445  * type.  Len == -1 means concatenate everything.  The original message is
1446  * unaltered.  Returns a pointer to a new message on success, otherwise
1447  * returns NULL.
1448  */
1449 mblk_t *
1450 msgpullup(mblk_t *mp, ssize_t len)
1451 {
1452 	mblk_t	*newmp;
1453 	ssize_t	totlen;
1454 	ssize_t	n;
1455 
1456 	/*
1457 	 * We won't handle Multidata message, since it contains
1458 	 * metadata which this function has no knowledge of; we
1459 	 * assert on DEBUG, and return failure otherwise.
1460 	 */
1461 	ASSERT(mp->b_datap->db_type != M_MULTIDATA);
1462 	if (mp->b_datap->db_type == M_MULTIDATA)
1463 		return (NULL);
1464 
1465 	totlen = xmsgsize(mp);
1466 
1467 	if ((len > 0) && (len > totlen))
1468 		return (NULL);
1469 
1470 	/*
1471 	 * Copy all of the first msg type into one new mblk, then dupmsg
1472 	 * and link the rest onto this.
1473 	 */
1474 
1475 	len = totlen;
1476 
1477 	if ((newmp = allocb_tmpl(len, mp)) == NULL)
1478 		return (NULL);
1479 
1480 	newmp->b_flag = mp->b_flag;
1481 	newmp->b_band = mp->b_band;
1482 
1483 	while (len > 0) {
1484 		n = mp->b_wptr - mp->b_rptr;
1485 		ASSERT(n >= 0);		/* allow zero-length mblk_t's */
1486 		if (n > 0)
1487 			bcopy(mp->b_rptr, newmp->b_wptr, n);
1488 		newmp->b_wptr += n;
1489 		len -= n;
1490 		mp = mp->b_cont;
1491 	}
1492 
1493 	if (mp != NULL) {
1494 		newmp->b_cont = dupmsg(mp);
1495 		if (newmp->b_cont == NULL) {
1496 			freemsg(newmp);
1497 			return (NULL);
1498 		}
1499 	}
1500 
1501 	return (newmp);
1502 }
1503 
1504 /*
1505  * Trim bytes from message
1506  *  len > 0, trim from head
1507  *  len < 0, trim from tail
1508  * Returns 1 on success, 0 on failure.
1509  */
1510 int
1511 adjmsg(mblk_t *mp, ssize_t len)
1512 {
1513 	mblk_t *bp;
1514 	mblk_t *save_bp = NULL;
1515 	mblk_t *prev_bp;
1516 	mblk_t *bcont;
1517 	unsigned char type;
1518 	ssize_t n;
1519 	int fromhead;
1520 	int first;
1521 
1522 	ASSERT(mp != NULL);
1523 	/*
1524 	 * We won't handle Multidata message, since it contains
1525 	 * metadata which this function has no knowledge of; we
1526 	 * assert on DEBUG, and return failure otherwise.
1527 	 */
1528 	ASSERT(mp->b_datap->db_type != M_MULTIDATA);
1529 	if (mp->b_datap->db_type == M_MULTIDATA)
1530 		return (0);
1531 
1532 	if (len < 0) {
1533 		fromhead = 0;
1534 		len = -len;
1535 	} else {
1536 		fromhead = 1;
1537 	}
1538 
1539 	if (xmsgsize(mp) < len)
1540 		return (0);
1541 
1542 
1543 	if (fromhead) {
1544 		first = 1;
1545 		while (len) {
1546 			ASSERT(mp->b_wptr >= mp->b_rptr);
1547 			n = MIN(mp->b_wptr - mp->b_rptr, len);
1548 			mp->b_rptr += n;
1549 			len -= n;
1550 
1551 			/*
1552 			 * If this is not the first zero length
1553 			 * message remove it
1554 			 */
1555 			if (!first && (mp->b_wptr == mp->b_rptr)) {
1556 				bcont = mp->b_cont;
1557 				freeb(mp);
1558 				mp = save_bp->b_cont = bcont;
1559 			} else {
1560 				save_bp = mp;
1561 				mp = mp->b_cont;
1562 			}
1563 			first = 0;
1564 		}
1565 	} else {
1566 		type = mp->b_datap->db_type;
1567 		while (len) {
1568 			bp = mp;
1569 			save_bp = NULL;
1570 
1571 			/*
1572 			 * Find the last message of same type
1573 			 */
1574 
1575 			while (bp && bp->b_datap->db_type == type) {
1576 				ASSERT(bp->b_wptr >= bp->b_rptr);
1577 				prev_bp = save_bp;
1578 				save_bp = bp;
1579 				bp = bp->b_cont;
1580 			}
1581 			if (save_bp == NULL)
1582 				break;
1583 			n = MIN(save_bp->b_wptr - save_bp->b_rptr, len);
1584 			save_bp->b_wptr -= n;
1585 			len -= n;
1586 
1587 			/*
1588 			 * If this is not the first message
1589 			 * and we have taken away everything
1590 			 * from this message, remove it
1591 			 */
1592 
1593 			if ((save_bp != mp) &&
1594 				(save_bp->b_wptr == save_bp->b_rptr)) {
1595 				bcont = save_bp->b_cont;
1596 				freeb(save_bp);
1597 				prev_bp->b_cont = bcont;
1598 			}
1599 		}
1600 	}
1601 	return (1);
1602 }
1603 
1604 /*
1605  * get number of data bytes in message
1606  */
1607 size_t
1608 msgdsize(mblk_t *bp)
1609 {
1610 	size_t count = 0;
1611 
1612 	for (; bp; bp = bp->b_cont)
1613 		if (bp->b_datap->db_type == M_DATA) {
1614 			ASSERT(bp->b_wptr >= bp->b_rptr);
1615 			count += bp->b_wptr - bp->b_rptr;
1616 		}
1617 	return (count);
1618 }
1619 
1620 /*
1621  * Get a message off head of queue
1622  *
1623  * If queue has no buffers then mark queue
1624  * with QWANTR. (queue wants to be read by
1625  * someone when data becomes available)
1626  *
1627  * If there is something to take off then do so.
1628  * If queue falls below hi water mark turn off QFULL
1629  * flag.  Decrement weighted count of queue.
1630  * Also turn off QWANTR because queue is being read.
1631  *
1632  * The queue count is maintained on a per-band basis.
1633  * Priority band 0 (normal messages) uses q_count,
1634  * q_lowat, etc.  Non-zero priority bands use the
1635  * fields in their respective qband structures
1636  * (qb_count, qb_lowat, etc.)  All messages appear
1637  * on the same list, linked via their b_next pointers.
1638  * q_first is the head of the list.  q_count does
1639  * not reflect the size of all the messages on the
1640  * queue.  It only reflects those messages in the
1641  * normal band of flow.  The one exception to this
1642  * deals with high priority messages.  They are in
1643  * their own conceptual "band", but are accounted
1644  * against q_count.
1645  *
1646  * If queue count is below the lo water mark and QWANTW
1647  * is set, enable the closest backq which has a service
1648  * procedure and turn off the QWANTW flag.
1649  *
1650  * getq could be built on top of rmvq, but isn't because
1651  * of performance considerations.
1652  *
1653  * A note on the use of q_count and q_mblkcnt:
1654  *   q_count is the traditional byte count for messages that
1655  *   have been put on a queue.  Documentation tells us that
1656  *   we shouldn't rely on that count, but some drivers/modules
1657  *   do.  What was needed, however, is a mechanism to prevent
1658  *   runaway streams from consuming all of the resources,
1659  *   and particularly be able to flow control zero-length
1660  *   messages.  q_mblkcnt is used for this purpose.  It
1661  *   counts the number of mblk's that are being put on
1662  *   the queue.  The intention here, is that each mblk should
1663  *   contain one byte of data and, for the purpose of
1664  *   flow-control, logically does.  A queue will become
1665  *   full when EITHER of these values (q_count and q_mblkcnt)
1666  *   reach the highwater mark.  It will clear when BOTH
1667  *   of them drop below the highwater mark.  And it will
1668  *   backenable when BOTH of them drop below the lowwater
1669  *   mark.
1670  *   With this algorithm, a driver/module might be able
1671  *   to find a reasonably accurate q_count, and the
1672  *   framework can still try and limit resource usage.
1673  */
1674 mblk_t *
1675 getq(queue_t *q)
1676 {
1677 	mblk_t *bp;
1678 	uchar_t band = 0;
1679 
1680 	bp = getq_noenab(q);
1681 	if (bp != NULL)
1682 		band = bp->b_band;
1683 
1684 	/*
1685 	 * Inlined from qbackenable().
1686 	 * Quick check without holding the lock.
1687 	 */
1688 	if (band == 0 && (q->q_flag & (QWANTW|QWANTWSYNC)) == 0)
1689 		return (bp);
1690 
1691 	qbackenable(q, band);
1692 	return (bp);
1693 }
1694 
/*
 * Add the number of data bytes in a single message block to 'size',
 * taking multidata messages into account.
 *
 * For an ordinary block this is MBLKL(mp).  For an M_MULTIDATA block it
 * is the "pinuse" value reported by mmd_getsize().
 *
 * 'mp' is evaluated more than once, so it must not have side effects.
 * 'size' must be a modifiable lvalue.
 *
 * The body is wrapped in do { } while (0) so that the macro expands to
 * exactly one statement.  That makes "ADD_MBLK_SIZE(mp, n);" safe as
 * the body of an unbraced if/else or loop; a bare if/else expansion
 * would either capture a following "else" or fail to compile there.
 * Invocations must be terminated with a semicolon.
 */

#define	ADD_MBLK_SIZE(mp, size) 					\
	do {								\
		if (DB_TYPE(mp) != M_MULTIDATA) {			\
			(size) += MBLKL(mp);				\
		} else {						\
			uint_t	pinuse;					\
									\
			mmd_getsize(mmd_getmultidata(mp), NULL,		\
			    &pinuse);					\
			(size) += pinuse;				\
		}							\
	} while (0)
1709 
1710 /*
1711  * Like getq() but does not backenable.  This is used by the stream
1712  * head when a putback() is likely.  The caller must call qbackenable()
1713  * after it is done with accessing the queue.
1714  */
1715 mblk_t *
1716 getq_noenab(queue_t *q)
1717 {
1718 	mblk_t *bp;
1719 	mblk_t *tmp;
1720 	qband_t *qbp;
1721 	kthread_id_t freezer;
1722 	int	bytecnt = 0, mblkcnt = 0;
1723 
1724 	/* freezestr should allow its caller to call getq/putq */
1725 	freezer = STREAM(q)->sd_freezer;
1726 	if (freezer == curthread) {
1727 		ASSERT(frozenstr(q));
1728 		ASSERT(MUTEX_HELD(QLOCK(q)));
1729 	} else
1730 		mutex_enter(QLOCK(q));
1731 
1732 	if ((bp = q->q_first) == 0) {
1733 		q->q_flag |= QWANTR;
1734 	} else {
1735 		if ((q->q_first = bp->b_next) == NULL)
1736 			q->q_last = NULL;
1737 		else
1738 			q->q_first->b_prev = NULL;
1739 
1740 		/* Get message byte count for q_count accounting */
1741 		for (tmp = bp; tmp; tmp = tmp->b_cont) {
1742 			ADD_MBLK_SIZE(tmp, bytecnt);
1743 			mblkcnt++;
1744 		}
1745 
1746 		if (bp->b_band == 0) {
1747 			q->q_count -= bytecnt;
1748 			q->q_mblkcnt -= mblkcnt;
1749 			if ((q->q_count < q->q_hiwat) &&
1750 			    (q->q_mblkcnt < q->q_hiwat)) {
1751 				q->q_flag &= ~QFULL;
1752 			}
1753 		} else {
1754 			int i;
1755 
1756 			ASSERT(bp->b_band <= q->q_nband);
1757 			ASSERT(q->q_bandp != NULL);
1758 			ASSERT(MUTEX_HELD(QLOCK(q)));
1759 			qbp = q->q_bandp;
1760 			i = bp->b_band;
1761 			while (--i > 0)
1762 				qbp = qbp->qb_next;
1763 			if (qbp->qb_first == qbp->qb_last) {
1764 				qbp->qb_first = NULL;
1765 				qbp->qb_last = NULL;
1766 			} else {
1767 				qbp->qb_first = bp->b_next;
1768 			}
1769 			qbp->qb_count -= bytecnt;
1770 			qbp->qb_mblkcnt -= mblkcnt;
1771 			if ((qbp->qb_count < qbp->qb_hiwat) &&
1772 			    (qbp->qb_mblkcnt < qbp->qb_hiwat)) {
1773 				qbp->qb_flag &= ~QB_FULL;
1774 			}
1775 		}
1776 		q->q_flag &= ~QWANTR;
1777 		bp->b_next = NULL;
1778 		bp->b_prev = NULL;
1779 	}
1780 	if (freezer != curthread)
1781 		mutex_exit(QLOCK(q));
1782 
1783 	STR_FTEVENT_MSG(bp, q, FTEV_GETQ, NULL);
1784 
1785 	return (bp);
1786 }
1787 
1788 /*
1789  * Determine if a backenable is needed after removing a message in the
1790  * specified band.
1791  * NOTE: This routine assumes that something like getq_noenab() has been
1792  * already called.
1793  *
1794  * For the read side it is ok to hold sd_lock across calling this (and the
1795  * stream head often does).
1796  * But for the write side strwakeq might be invoked and it acquires sd_lock.
1797  */
1798 void
1799 qbackenable(queue_t *q, uchar_t band)
1800 {
1801 	int backenab = 0;
1802 	qband_t *qbp;
1803 	kthread_id_t freezer;
1804 
1805 	ASSERT(q);
1806 	ASSERT((q->q_flag & QREADR) || MUTEX_NOT_HELD(&STREAM(q)->sd_lock));
1807 
1808 	/*
1809 	 * Quick check without holding the lock.
1810 	 * OK since after getq() has lowered the q_count these flags
1811 	 * would not change unless either the qbackenable() is done by
1812 	 * another thread (which is ok) or the queue has gotten QFULL
1813 	 * in which case another backenable will take place when the queue
1814 	 * drops below q_lowat.
1815 	 */
1816 	if (band == 0 && (q->q_flag & (QWANTW|QWANTWSYNC)) == 0)
1817 		return;
1818 
1819 	/* freezestr should allow its caller to call getq/putq */
1820 	freezer = STREAM(q)->sd_freezer;
1821 	if (freezer == curthread) {
1822 		ASSERT(frozenstr(q));
1823 		ASSERT(MUTEX_HELD(QLOCK(q)));
1824 	} else
1825 		mutex_enter(QLOCK(q));
1826 
1827 	if (band == 0) {
1828 		if (q->q_lowat == 0 || (q->q_count < q->q_lowat &&
1829 		    q->q_mblkcnt < q->q_lowat)) {
1830 			backenab = q->q_flag & (QWANTW|QWANTWSYNC);
1831 		}
1832 	} else {
1833 		int i;
1834 
1835 		ASSERT((unsigned)band <= q->q_nband);
1836 		ASSERT(q->q_bandp != NULL);
1837 
1838 		qbp = q->q_bandp;
1839 		i = band;
1840 		while (--i > 0)
1841 			qbp = qbp->qb_next;
1842 
1843 		if (qbp->qb_lowat == 0 || (qbp->qb_count < qbp->qb_lowat &&
1844 		    qbp->qb_mblkcnt < qbp->qb_lowat)) {
1845 			backenab = qbp->qb_flag & QB_WANTW;
1846 		}
1847 	}
1848 
1849 	if (backenab == 0) {
1850 		if (freezer != curthread)
1851 			mutex_exit(QLOCK(q));
1852 		return;
1853 	}
1854 
1855 	/* Have to drop the lock across strwakeq and backenable */
1856 	if (backenab & QWANTWSYNC)
1857 		q->q_flag &= ~QWANTWSYNC;
1858 	if (backenab & (QWANTW|QB_WANTW)) {
1859 		if (band != 0)
1860 			qbp->qb_flag &= ~QB_WANTW;
1861 		else {
1862 			q->q_flag &= ~QWANTW;
1863 		}
1864 	}
1865 
1866 	if (freezer != curthread)
1867 		mutex_exit(QLOCK(q));
1868 
1869 	if (backenab & QWANTWSYNC)
1870 		strwakeq(q, QWANTWSYNC);
1871 	if (backenab & (QWANTW|QB_WANTW))
1872 		backenable(q, band);
1873 }
1874 
1875 /*
1876  * Remove a message from a queue.  The queue count and other
1877  * flow control parameters are adjusted and the back queue
1878  * enabled if necessary.
1879  *
1880  * rmvq can be called with the stream frozen, but other utility functions
1881  * holding QLOCK, and by streams modules without any locks/frozen.
1882  */
1883 void
1884 rmvq(queue_t *q, mblk_t *mp)
1885 {
1886 	ASSERT(mp != NULL);
1887 
1888 	rmvq_noenab(q, mp);
1889 	if (curthread != STREAM(q)->sd_freezer && MUTEX_HELD(QLOCK(q))) {
1890 		/*
1891 		 * qbackenable can handle a frozen stream but not a "random"
1892 		 * qlock being held. Drop lock across qbackenable.
1893 		 */
1894 		mutex_exit(QLOCK(q));
1895 		qbackenable(q, mp->b_band);
1896 		mutex_enter(QLOCK(q));
1897 	} else {
1898 		qbackenable(q, mp->b_band);
1899 	}
1900 }
1901 
1902 /*
1903  * Like rmvq() but without any backenabling.
1904  * This exists to handle SR_CONSOL_DATA in strrput().
1905  */
1906 void
1907 rmvq_noenab(queue_t *q, mblk_t *mp)
1908 {
1909 	mblk_t *tmp;
1910 	int i;
1911 	qband_t *qbp = NULL;
1912 	kthread_id_t freezer;
1913 	int	bytecnt = 0, mblkcnt = 0;
1914 
1915 	freezer = STREAM(q)->sd_freezer;
1916 	if (freezer == curthread) {
1917 		ASSERT(frozenstr(q));
1918 		ASSERT(MUTEX_HELD(QLOCK(q)));
1919 	} else if (MUTEX_HELD(QLOCK(q))) {
1920 		/* Don't drop lock on exit */
1921 		freezer = curthread;
1922 	} else
1923 		mutex_enter(QLOCK(q));
1924 
1925 	ASSERT(mp->b_band <= q->q_nband);
1926 	if (mp->b_band != 0) {		/* Adjust band pointers */
1927 		ASSERT(q->q_bandp != NULL);
1928 		qbp = q->q_bandp;
1929 		i = mp->b_band;
1930 		while (--i > 0)
1931 			qbp = qbp->qb_next;
1932 		if (mp == qbp->qb_first) {
1933 			if (mp->b_next && mp->b_band == mp->b_next->b_band)
1934 				qbp->qb_first = mp->b_next;
1935 			else
1936 				qbp->qb_first = NULL;
1937 		}
1938 		if (mp == qbp->qb_last) {
1939 			if (mp->b_prev && mp->b_band == mp->b_prev->b_band)
1940 				qbp->qb_last = mp->b_prev;
1941 			else
1942 				qbp->qb_last = NULL;
1943 		}
1944 	}
1945 
1946 	/*
1947 	 * Remove the message from the list.
1948 	 */
1949 	if (mp->b_prev)
1950 		mp->b_prev->b_next = mp->b_next;
1951 	else
1952 		q->q_first = mp->b_next;
1953 	if (mp->b_next)
1954 		mp->b_next->b_prev = mp->b_prev;
1955 	else
1956 		q->q_last = mp->b_prev;
1957 	mp->b_next = NULL;
1958 	mp->b_prev = NULL;
1959 
1960 	/* Get the size of the message for q_count accounting */
1961 	for (tmp = mp; tmp; tmp = tmp->b_cont) {
1962 		ADD_MBLK_SIZE(tmp, bytecnt);
1963 		mblkcnt++;
1964 	}
1965 
1966 	if (mp->b_band == 0) {		/* Perform q_count accounting */
1967 		q->q_count -= bytecnt;
1968 		q->q_mblkcnt -= mblkcnt;
1969 		if ((q->q_count < q->q_hiwat) &&
1970 		    (q->q_mblkcnt < q->q_hiwat)) {
1971 			q->q_flag &= ~QFULL;
1972 		}
1973 	} else {			/* Perform qb_count accounting */
1974 		qbp->qb_count -= bytecnt;
1975 		qbp->qb_mblkcnt -= mblkcnt;
1976 		if ((qbp->qb_count < qbp->qb_hiwat) &&
1977 		    (qbp->qb_mblkcnt < qbp->qb_hiwat)) {
1978 			qbp->qb_flag &= ~QB_FULL;
1979 		}
1980 	}
1981 	if (freezer != curthread)
1982 		mutex_exit(QLOCK(q));
1983 
1984 	STR_FTEVENT_MSG(mp, q, FTEV_RMVQ, NULL);
1985 }
1986 
1987 /*
1988  * Empty a queue.
1989  * If flag is set, remove all messages.  Otherwise, remove
1990  * only non-control messages.  If queue falls below its low
1991  * water mark, and QWANTW is set, enable the nearest upstream
1992  * service procedure.
1993  *
1994  * Historical note: when merging the M_FLUSH code in strrput with this
1995  * code one difference was discovered. flushq did not have a check
1996  * for q_lowat == 0 in the backenabling test.
1997  *
1998  * pcproto_flag specifies whether or not a M_PCPROTO message should be flushed
1999  * if one exists on the queue.
2000  */
2001 void
2002 flushq_common(queue_t *q, int flag, int pcproto_flag)
2003 {
2004 	mblk_t *mp, *nmp;
2005 	qband_t *qbp;
2006 	int backenab = 0;
2007 	unsigned char bpri;
2008 	unsigned char	qbf[NBAND];	/* band flushing backenable flags */
2009 
2010 	if (q->q_first == NULL)
2011 		return;
2012 
2013 	mutex_enter(QLOCK(q));
2014 	mp = q->q_first;
2015 	q->q_first = NULL;
2016 	q->q_last = NULL;
2017 	q->q_count = 0;
2018 	q->q_mblkcnt = 0;
2019 	for (qbp = q->q_bandp; qbp; qbp = qbp->qb_next) {
2020 		qbp->qb_first = NULL;
2021 		qbp->qb_last = NULL;
2022 		qbp->qb_count = 0;
2023 		qbp->qb_mblkcnt = 0;
2024 		qbp->qb_flag &= ~QB_FULL;
2025 	}
2026 	q->q_flag &= ~QFULL;
2027 	mutex_exit(QLOCK(q));
2028 	while (mp) {
2029 		nmp = mp->b_next;
2030 		mp->b_next = mp->b_prev = NULL;
2031 
2032 		STR_FTEVENT_MBLK(mp, q, FTEV_FLUSHQ, NULL);
2033 
2034 		if (pcproto_flag && (mp->b_datap->db_type == M_PCPROTO))
2035 			(void) putq(q, mp);
2036 		else if (flag || datamsg(mp->b_datap->db_type))
2037 			freemsg(mp);
2038 		else
2039 			(void) putq(q, mp);
2040 		mp = nmp;
2041 	}
2042 	bpri = 1;
2043 	mutex_enter(QLOCK(q));
2044 	for (qbp = q->q_bandp; qbp; qbp = qbp->qb_next) {
2045 		if ((qbp->qb_flag & QB_WANTW) &&
2046 		    (((qbp->qb_count < qbp->qb_lowat) &&
2047 		    (qbp->qb_mblkcnt < qbp->qb_lowat)) ||
2048 		    qbp->qb_lowat == 0)) {
2049 			qbp->qb_flag &= ~QB_WANTW;
2050 			backenab = 1;
2051 			qbf[bpri] = 1;
2052 		} else
2053 			qbf[bpri] = 0;
2054 		bpri++;
2055 	}
2056 	ASSERT(bpri == (unsigned char)(q->q_nband + 1));
2057 	if ((q->q_flag & QWANTW) &&
2058 	    (((q->q_count < q->q_lowat) &&
2059 	    (q->q_mblkcnt < q->q_lowat)) || q->q_lowat == 0)) {
2060 		q->q_flag &= ~QWANTW;
2061 		backenab = 1;
2062 		qbf[0] = 1;
2063 	} else
2064 		qbf[0] = 0;
2065 
2066 	/*
2067 	 * If any band can now be written to, and there is a writer
2068 	 * for that band, then backenable the closest service procedure.
2069 	 */
2070 	if (backenab) {
2071 		mutex_exit(QLOCK(q));
2072 		for (bpri = q->q_nband; bpri != 0; bpri--)
2073 			if (qbf[bpri])
2074 				backenable(q, bpri);
2075 		if (qbf[0])
2076 			backenable(q, 0);
2077 	} else
2078 		mutex_exit(QLOCK(q));
2079 }
2080 
2081 /*
2082  * The real flushing takes place in flushq_common. This is done so that
2083  * a flag which specifies whether or not M_PCPROTO messages should be flushed
2084  * or not. Currently the only place that uses this flag is the stream head.
2085  */
2086 void
2087 flushq(queue_t *q, int flag)
2088 {
2089 	flushq_common(q, flag, 0);
2090 }
2091 
2092 /*
2093  * Flush the queue of messages of the given priority band.
2094  * There is some duplication of code between flushq and flushband.
2095  * This is because we want to optimize the code as much as possible.
2096  * The assumption is that there will be more messages in the normal
2097  * (priority 0) band than in any other.
2098  *
2099  * Historical note: when merging the M_FLUSH code in strrput with this
2100  * code one difference was discovered. flushband had an extra check for
2101  * did not have a check for (mp->b_datap->db_type < QPCTL) in the band 0
2102  * case. That check does not match the man page for flushband and was not
2103  * in the strrput flush code hence it was removed.
2104  */
2105 void
2106 flushband(queue_t *q, unsigned char pri, int flag)
2107 {
2108 	mblk_t *mp;
2109 	mblk_t *nmp;
2110 	mblk_t *last;
2111 	qband_t *qbp;
2112 	int band;
2113 
2114 	ASSERT((flag == FLUSHDATA) || (flag == FLUSHALL));
2115 	if (pri > q->q_nband) {
2116 		return;
2117 	}
2118 	mutex_enter(QLOCK(q));
2119 	if (pri == 0) {
2120 		mp = q->q_first;
2121 		q->q_first = NULL;
2122 		q->q_last = NULL;
2123 		q->q_count = 0;
2124 		q->q_mblkcnt = 0;
2125 		for (qbp = q->q_bandp; qbp; qbp = qbp->qb_next) {
2126 			qbp->qb_first = NULL;
2127 			qbp->qb_last = NULL;
2128 			qbp->qb_count = 0;
2129 			qbp->qb_mblkcnt = 0;
2130 			qbp->qb_flag &= ~QB_FULL;
2131 		}
2132 		q->q_flag &= ~QFULL;
2133 		mutex_exit(QLOCK(q));
2134 		while (mp) {
2135 			nmp = mp->b_next;
2136 			mp->b_next = mp->b_prev = NULL;
2137 			if ((mp->b_band == 0) &&
2138 				((flag == FLUSHALL) ||
2139 				datamsg(mp->b_datap->db_type)))
2140 				freemsg(mp);
2141 			else
2142 				(void) putq(q, mp);
2143 			mp = nmp;
2144 		}
2145 		mutex_enter(QLOCK(q));
2146 		if ((q->q_flag & QWANTW) &&
2147 		    (((q->q_count < q->q_lowat) &&
2148 		    (q->q_mblkcnt < q->q_lowat)) || q->q_lowat == 0)) {
2149 			q->q_flag &= ~QWANTW;
2150 			mutex_exit(QLOCK(q));
2151 
2152 			backenable(q, pri);
2153 		} else
2154 			mutex_exit(QLOCK(q));
2155 	} else {	/* pri != 0 */
2156 		boolean_t flushed = B_FALSE;
2157 		band = pri;
2158 
2159 		ASSERT(MUTEX_HELD(QLOCK(q)));
2160 		qbp = q->q_bandp;
2161 		while (--band > 0)
2162 			qbp = qbp->qb_next;
2163 		mp = qbp->qb_first;
2164 		if (mp == NULL) {
2165 			mutex_exit(QLOCK(q));
2166 			return;
2167 		}
2168 		last = qbp->qb_last->b_next;
2169 		/*
2170 		 * rmvq_noenab() and freemsg() are called for each mblk that
2171 		 * meets the criteria.  The loop is executed until the last
2172 		 * mblk has been processed.
2173 		 */
2174 		while (mp != last) {
2175 			ASSERT(mp->b_band == pri);
2176 			nmp = mp->b_next;
2177 			if (flag == FLUSHALL || datamsg(mp->b_datap->db_type)) {
2178 				rmvq_noenab(q, mp);
2179 				freemsg(mp);
2180 				flushed = B_TRUE;
2181 			}
2182 			mp = nmp;
2183 		}
2184 		mutex_exit(QLOCK(q));
2185 
2186 		/*
2187 		 * If any mblk(s) has been freed, we know that qbackenable()
2188 		 * will need to be called.
2189 		 */
2190 		if (flushed)
2191 			qbackenable(q, pri);
2192 	}
2193 }
2194 
2195 /*
2196  * Return 1 if the queue is not full.  If the queue is full, return
2197  * 0 (may not put message) and set QWANTW flag (caller wants to write
2198  * to the queue).
2199  */
2200 int
2201 canput(queue_t *q)
2202 {
2203 	TRACE_1(TR_FAC_STREAMS_FR, TR_CANPUT_IN, "canput:%p", q);
2204 
2205 	/* this is for loopback transports, they should not do a canput */
2206 	ASSERT(STRMATED(q->q_stream) || STREAM(q) == STREAM(q->q_nfsrv));
2207 
2208 	/* Find next forward module that has a service procedure */
2209 	q = q->q_nfsrv;
2210 
2211 	if (!(q->q_flag & QFULL)) {
2212 		TRACE_2(TR_FAC_STREAMS_FR, TR_CANPUT_OUT, "canput:%p %d", q, 1);
2213 		return (1);
2214 	}
2215 	mutex_enter(QLOCK(q));
2216 	if (q->q_flag & QFULL) {
2217 		q->q_flag |= QWANTW;
2218 		mutex_exit(QLOCK(q));
2219 		TRACE_2(TR_FAC_STREAMS_FR, TR_CANPUT_OUT, "canput:%p %d", q, 0);
2220 		return (0);
2221 	}
2222 	mutex_exit(QLOCK(q));
2223 	TRACE_2(TR_FAC_STREAMS_FR, TR_CANPUT_OUT, "canput:%p %d", q, 1);
2224 	return (1);
2225 }
2226 
2227 /*
2228  * This is the new canput for use with priority bands.  Return 1 if the
2229  * band is not full.  If the band is full, return 0 (may not put message)
2230  * and set QWANTW(QB_WANTW) flag for zero(non-zero) band (caller wants to
2231  * write to the queue).
2232  */
2233 int
2234 bcanput(queue_t *q, unsigned char pri)
2235 {
2236 	qband_t *qbp;
2237 
2238 	TRACE_2(TR_FAC_STREAMS_FR, TR_BCANPUT_IN, "bcanput:%p %p", q, pri);
2239 	if (!q)
2240 		return (0);
2241 
2242 	/* Find next forward module that has a service procedure */
2243 	q = q->q_nfsrv;
2244 
2245 	mutex_enter(QLOCK(q));
2246 	if (pri == 0) {
2247 		if (q->q_flag & QFULL) {
2248 			q->q_flag |= QWANTW;
2249 			mutex_exit(QLOCK(q));
2250 			TRACE_3(TR_FAC_STREAMS_FR, TR_BCANPUT_OUT,
2251 				"bcanput:%p %X %d", q, pri, 0);
2252 			return (0);
2253 		}
2254 	} else {	/* pri != 0 */
2255 		if (pri > q->q_nband) {
2256 			/*
2257 			 * No band exists yet, so return success.
2258 			 */
2259 			mutex_exit(QLOCK(q));
2260 			TRACE_3(TR_FAC_STREAMS_FR, TR_BCANPUT_OUT,
2261 				"bcanput:%p %X %d", q, pri, 1);
2262 			return (1);
2263 		}
2264 		qbp = q->q_bandp;
2265 		while (--pri)
2266 			qbp = qbp->qb_next;
2267 		if (qbp->qb_flag & QB_FULL) {
2268 			qbp->qb_flag |= QB_WANTW;
2269 			mutex_exit(QLOCK(q));
2270 			TRACE_3(TR_FAC_STREAMS_FR, TR_BCANPUT_OUT,
2271 				"bcanput:%p %X %d", q, pri, 0);
2272 			return (0);
2273 		}
2274 	}
2275 	mutex_exit(QLOCK(q));
2276 	TRACE_3(TR_FAC_STREAMS_FR, TR_BCANPUT_OUT,
2277 		"bcanput:%p %X %d", q, pri, 1);
2278 	return (1);
2279 }
2280 
2281 /*
2282  * Put a message on a queue.
2283  *
2284  * Messages are enqueued on a priority basis.  The priority classes
2285  * are HIGH PRIORITY (type >= QPCTL), PRIORITY (type < QPCTL && band > 0),
2286  * and B_NORMAL (type < QPCTL && band == 0).
2287  *
2288  * Add appropriate weighted data block sizes to queue count.
2289  * If queue hits high water mark then set QFULL flag.
2290  *
2291  * If QNOENB is not set (putq is allowed to enable the queue),
2292  * enable the queue only if the message is PRIORITY,
2293  * or the QWANTR flag is set (indicating that the service procedure
2294  * is ready to read the queue).  This implies that a service
2295  * procedure must NEVER put a high priority message back on its own
2296  * queue, as this would result in an infinite loop (!).
 *
 * A message whose queue class is above QNORM enables the queue without
 * canenable() being consulted.
 *
 * Returns 1 once bp has been linked onto the queue.  Returns 0, with bp
 * not queued, if a qband structure needed for bp->b_band could not be
 * allocated.
 *
 * QLOCK(q) is taken here unless the calling thread is the stream's
 * freezer, in which case the lock is asserted to be held already and is
 * left held on return.
2297  */
2298 int
2299 putq(queue_t *q, mblk_t *bp)
2300 {
2301 	mblk_t *tmp;
2302 	qband_t *qbp = NULL;
2303 	int mcls = (int)queclass(bp);
2304 	kthread_id_t freezer;
2305 	int	bytecnt = 0, mblkcnt = 0;
2306 
2307 	freezer = STREAM(q)->sd_freezer;
2308 	if (freezer == curthread) {
2309 		ASSERT(frozenstr(q));
2310 		ASSERT(MUTEX_HELD(QLOCK(q)));
2311 	} else
2312 		mutex_enter(QLOCK(q));
2313 
2314 	/*
2315 	 * Make sanity checks and if qband structure is not yet
2316 	 * allocated, do so.
2317 	 */
2318 	if (mcls == QPCTL) {
2319 		if (bp->b_band != 0)
2320 			bp->b_band = 0;		/* force to be correct */
2321 	} else if (bp->b_band != 0) {
2322 		int i;
2323 		qband_t **qbpp;
2324 
2325 		if (bp->b_band > q->q_nband) {
2326 
2327 			/*
2328 			 * The qband structure for this priority band is
2329 			 * not on the queue yet, so we have to allocate
2330 			 * one on the fly.  It would be wasteful to
2331 			 * associate the qband structures with every
2332 			 * queue when the queues are allocated.  This is
2333 			 * because most queues will only need the normal
2334 			 * band of flow which can be described entirely
2335 			 * by the queue itself.
2336 			 */
2337 			qbpp = &q->q_bandp;
2338 			while (*qbpp)
2339 				qbpp = &(*qbpp)->qb_next;
			/*
			 * Append one qband per missing band number, each
			 * inheriting the queue's water marks.  Bands added
			 * before a failed allocation stay on the queue.
			 */
2340 			while (bp->b_band > q->q_nband) {
2341 				if ((*qbpp = allocband()) == NULL) {
2342 					if (freezer != curthread)
2343 						mutex_exit(QLOCK(q));
2344 					return (0);
2345 				}
2346 				(*qbpp)->qb_hiwat = q->q_hiwat;
2347 				(*qbpp)->qb_lowat = q->q_lowat;
2348 				q->q_nband++;
2349 				qbpp = &(*qbpp)->qb_next;
2350 			}
2351 		}
2352 		ASSERT(MUTEX_HELD(QLOCK(q)));
		/* Band n is the n'th entry of the q_bandp list. */
2353 		qbp = q->q_bandp;
2354 		i = bp->b_band;
2355 		while (--i)
2356 			qbp = qbp->qb_next;
2357 	}
2358 
2359 	/*
2360 	 * If queue is empty, add the message and initialize the pointers.
2361 	 * Otherwise, adjust message pointers and queue pointers based on
2362 	 * the type of the message and where it belongs on the queue.  Some
2363 	 * code is duplicated to minimize the number of conditionals and
2364 	 * hopefully minimize the amount of time this routine takes.
2365 	 */
2366 	if (!q->q_first) {
2367 		bp->b_next = NULL;
2368 		bp->b_prev = NULL;
2369 		q->q_first = bp;
2370 		q->q_last = bp;
2371 		if (qbp) {
2372 			qbp->qb_first = bp;
2373 			qbp->qb_last = bp;
2374 		}
2375 	} else if (!qbp) {	/* bp->b_band == 0 */
2376 
2377 		/*
2378 		 * If queue class of message is less than or equal to
2379 		 * that of the last one on the queue, tack on to the end.
2380 		 */
2381 		tmp = q->q_last;
2382 		if (mcls <= (int)queclass(tmp)) {
2383 			bp->b_next = NULL;
2384 			bp->b_prev = tmp;
2385 			tmp->b_next = bp;
2386 			q->q_last = bp;
2387 		} else {
			/*
			 * Skip every message whose class is at least
			 * mcls; bp goes in front of the first lower one.
			 */
2388 			tmp = q->q_first;
2389 			while ((int)queclass(tmp) >= mcls)
2390 				tmp = tmp->b_next;
2391 
2392 			/*
2393 			 * Insert bp before tmp.
2394 			 */
2395 			bp->b_next = tmp;
2396 			bp->b_prev = tmp->b_prev;
2397 			if (tmp->b_prev)
2398 				tmp->b_prev->b_next = bp;
2399 			else
2400 				q->q_first = bp;
2401 			tmp->b_prev = bp;
2402 		}
2403 	} else {		/* bp->b_band != 0 */
2404 		if (qbp->qb_first) {
2405 			tmp = qbp->qb_last;
2406 
2407 			/*
2408 			 * Insert bp after the last message in this band.
2409 			 */
2410 			bp->b_next = tmp->b_next;
2411 			if (tmp->b_next)
2412 				tmp->b_next->b_prev = bp;
2413 			else
2414 				q->q_last = bp;
2415 			bp->b_prev = tmp;
2416 			tmp->b_next = bp;
2417 		} else {
			/* First message of this band on the queue. */
2418 			tmp = q->q_last;
2419 			if ((mcls < (int)queclass(tmp)) ||
2420 			    (bp->b_band <= tmp->b_band)) {
2421 
2422 				/*
2423 				 * Tack bp on end of queue.
2424 				 */
2425 				bp->b_next = NULL;
2426 				bp->b_prev = tmp;
2427 				tmp->b_next = bp;
2428 				q->q_last = bp;
2429 			} else {
				/*
				 * Step over the high priority messages,
				 * then over bands >= bp->b_band.
				 */
2430 				tmp = q->q_first;
2431 				while (tmp->b_datap->db_type >= QPCTL)
2432 					tmp = tmp->b_next;
2433 				while (tmp->b_band >= bp->b_band)
2434 					tmp = tmp->b_next;
2435 
2436 				/*
2437 				 * Insert bp before tmp.
2438 				 */
2439 				bp->b_next = tmp;
2440 				bp->b_prev = tmp->b_prev;
2441 				if (tmp->b_prev)
2442 					tmp->b_prev->b_next = bp;
2443 				else
2444 					q->q_first = bp;
2445 				tmp->b_prev = bp;
2446 			}
2447 			qbp->qb_first = bp;
2448 		}
2449 		qbp->qb_last = bp;
2450 	}
2451 
2452 	/* Get message byte count for q_count accounting */
2453 	for (tmp = bp; tmp; tmp = tmp->b_cont) {
2454 		ADD_MBLK_SIZE(tmp, bytecnt);
2455 		mblkcnt++;
2456 	}
2457 
	/*
	 * Charge the message to its band if it has one, otherwise to the
	 * queue; either the byte count or the mblk count reaching the
	 * high water mark marks that band/queue full.
	 */
2458 	if (qbp) {
2459 		qbp->qb_count += bytecnt;
2460 		qbp->qb_mblkcnt += mblkcnt;
2461 		if ((qbp->qb_count >= qbp->qb_hiwat) ||
2462 		    (qbp->qb_mblkcnt >= qbp->qb_hiwat)) {
2463 			qbp->qb_flag |= QB_FULL;
2464 		}
2465 	} else {
2466 		q->q_count += bytecnt;
2467 		q->q_mblkcnt += mblkcnt;
2468 		if ((q->q_count >= q->q_hiwat) ||
2469 		    (q->q_mblkcnt >= q->q_hiwat)) {
2470 			q->q_flag |= QFULL;
2471 		}
2472 	}
2473 
2474 	STR_FTEVENT_MSG(bp, q, FTEV_PUTQ, NULL);
2475 
2476 	if ((mcls > QNORM) ||
2477 	    (canenable(q) && (q->q_flag & QWANTR || bp->b_band)))
2478 		qenable_locked(q);
2479 	ASSERT(MUTEX_HELD(QLOCK(q)));
2480 	if (freezer != curthread)
2481 		mutex_exit(QLOCK(q));
2482 
2483 	return (1);
2484 }
2485 
2486 /*
2487  * Put stuff back at beginning of Q according to priority order.
2488  * See comment on putq above for details.
 *
 * bp must not already be linked to another message (b_next is asserted
 * to be NULL).  Unlike putq(), a non-zero band alone does not cause the
 * queue to be enabled: the queue is enabled for a message whose class is
 * above QNORM, or when canenable() allows it and QWANTR is set.
 *
 * Returns 1 once bp has been linked onto the queue.  Returns 0, with bp
 * not queued, if a qband structure needed for bp->b_band could not be
 * allocated.
 *
 * QLOCK(q) is taken here unless the calling thread is the stream's
 * freezer, in which case the lock is asserted to be held already and is
 * left held on return.
2489  */
2490 int
2491 putbq(queue_t *q, mblk_t *bp)
2492 {
2493 	mblk_t *tmp;
2494 	qband_t *qbp = NULL;
2495 	int mcls = (int)queclass(bp);
2496 	kthread_id_t freezer;
2497 	int	bytecnt = 0, mblkcnt = 0;
2498 
2499 	ASSERT(q && bp);
2500 	ASSERT(bp->b_next == NULL);
2501 	freezer = STREAM(q)->sd_freezer;
2502 	if (freezer == curthread) {
2503 		ASSERT(frozenstr(q));
2504 		ASSERT(MUTEX_HELD(QLOCK(q)));
2505 	} else
2506 		mutex_enter(QLOCK(q));
2507 
2508 	/*
2509 	 * Make sanity checks and if qband structure is not yet
2510 	 * allocated, do so.
2511 	 */
2512 	if (mcls == QPCTL) {
2513 		if (bp->b_band != 0)
2514 			bp->b_band = 0;		/* force to be correct */
2515 	} else if (bp->b_band != 0) {
2516 		int i;
2517 		qband_t **qbpp;
2518 
2519 		if (bp->b_band > q->q_nband) {
			/*
			 * Append one qband per missing band number, each
			 * inheriting the queue's water marks.  Bands added
			 * before a failed allocation stay on the queue.
			 */
2520 			qbpp = &q->q_bandp;
2521 			while (*qbpp)
2522 				qbpp = &(*qbpp)->qb_next;
2523 			while (bp->b_band > q->q_nband) {
2524 				if ((*qbpp = allocband()) == NULL) {
2525 					if (freezer != curthread)
2526 						mutex_exit(QLOCK(q));
2527 					return (0);
2528 				}
2529 				(*qbpp)->qb_hiwat = q->q_hiwat;
2530 				(*qbpp)->qb_lowat = q->q_lowat;
2531 				q->q_nband++;
2532 				qbpp = &(*qbpp)->qb_next;
2533 			}
2534 		}
		/* Band n is the n'th entry of the q_bandp list. */
2535 		qbp = q->q_bandp;
2536 		i = bp->b_band;
2537 		while (--i)
2538 			qbp = qbp->qb_next;
2539 	}
2540 
2541 	/*
2542 	 * If queue is empty or if message is high priority,
2543 	 * place on the front of the queue.
2544 	 */
2545 	tmp = q->q_first;
2546 	if ((!tmp) || (mcls == QPCTL)) {
2547 		bp->b_next = tmp;
2548 		if (tmp)
2549 			tmp->b_prev = bp;
2550 		else
2551 			q->q_last = bp;
2552 		q->q_first = bp;
2553 		bp->b_prev = NULL;
		/* qbp is only non-NULL here when the queue was empty. */
2554 		if (qbp) {
2555 			qbp->qb_first = bp;
2556 			qbp->qb_last = bp;
2557 		}
2558 	} else if (qbp) {	/* bp->b_band != 0 */
2559 		tmp = qbp->qb_first;
2560 		if (tmp) {
2561 
2562 			/*
2563 			 * Insert bp before the first message in this band.
2564 			 */
2565 			bp->b_next = tmp;
2566 			bp->b_prev = tmp->b_prev;
2567 			if (tmp->b_prev)
2568 				tmp->b_prev->b_next = bp;
2569 			else
2570 				q->q_first = bp;
2571 			tmp->b_prev = bp;
2572 		} else {
			/* First message of this band on the queue. */
2573 			tmp = q->q_last;
2574 			if ((mcls < (int)queclass(tmp)) ||
2575 			    (bp->b_band < tmp->b_band)) {
2576 
2577 				/*
2578 				 * Tack bp on end of queue.
2579 				 */
2580 				bp->b_next = NULL;
2581 				bp->b_prev = tmp;
2582 				tmp->b_next = bp;
2583 				q->q_last = bp;
2584 			} else {
				/*
				 * Step over the high priority messages,
				 * then over bands above bp->b_band.
				 */
2585 				tmp = q->q_first;
2586 				while (tmp->b_datap->db_type >= QPCTL)
2587 					tmp = tmp->b_next;
2588 				while (tmp->b_band > bp->b_band)
2589 					tmp = tmp->b_next;
2590 
2591 				/*
2592 				 * Insert bp before tmp.
2593 				 */
2594 				bp->b_next = tmp;
2595 				bp->b_prev = tmp->b_prev;
2596 				if (tmp->b_prev)
2597 					tmp->b_prev->b_next = bp;
2598 				else
2599 					q->q_first = bp;
2600 				tmp->b_prev = bp;
2601 			}
2602 			qbp->qb_last = bp;
2603 		}
2604 		qbp->qb_first = bp;
2605 	} else {		/* bp->b_band == 0 && !QPCTL */
2606 
2607 		/*
2608 		 * If the queue class or band is less than that of the last
2609 		 * message on the queue, tack bp on the end of the queue.
2610 		 */
2611 		tmp = q->q_last;
2612 		if ((mcls < (int)queclass(tmp)) || (bp->b_band < tmp->b_band)) {
2613 			bp->b_next = NULL;
2614 			bp->b_prev = tmp;
2615 			tmp->b_next = bp;
2616 			q->q_last = bp;
2617 		} else {
			/*
			 * Step over the high priority messages, then over
			 * every banded message; bp lands at the head of
			 * the band 0 messages.
			 */
2618 			tmp = q->q_first;
2619 			while (tmp->b_datap->db_type >= QPCTL)
2620 				tmp = tmp->b_next;
2621 			while (tmp->b_band > bp->b_band)
2622 				tmp = tmp->b_next;
2623 
2624 			/*
2625 			 * Insert bp before tmp.
2626 			 */
2627 			bp->b_next = tmp;
2628 			bp->b_prev = tmp->b_prev;
2629 			if (tmp->b_prev)
2630 				tmp->b_prev->b_next = bp;
2631 			else
2632 				q->q_first = bp;
2633 			tmp->b_prev = bp;
2634 		}
2635 	}
2636 
2637 	/* Get message byte count for q_count accounting */
2638 	for (tmp = bp; tmp; tmp = tmp->b_cont) {
2639 		ADD_MBLK_SIZE(tmp, bytecnt);
2640 		mblkcnt++;
2641 	}
	/*
	 * Charge the message to its band if it has one, otherwise to the
	 * queue; either the byte count or the mblk count reaching the
	 * high water mark marks that band/queue full.
	 */
2642 	if (qbp) {
2643 		qbp->qb_count += bytecnt;
2644 		qbp->qb_mblkcnt += mblkcnt;
2645 		if ((qbp->qb_count >= qbp->qb_hiwat) ||
2646 		    (qbp->qb_mblkcnt >= qbp->qb_hiwat)) {
2647 			qbp->qb_flag |= QB_FULL;
2648 		}
2649 	} else {
2650 		q->q_count += bytecnt;
2651 		q->q_mblkcnt += mblkcnt;
2652 		if ((q->q_count >= q->q_hiwat) ||
2653 		    (q->q_mblkcnt >= q->q_hiwat)) {
2654 			q->q_flag |= QFULL;
2655 		}
2656 	}
2657 
2658 	STR_FTEVENT_MSG(bp, q, FTEV_PUTBQ, NULL);
2659 
2660 	if ((mcls > QNORM) || (canenable(q) && (q->q_flag & QWANTR)))
2661 		qenable_locked(q);
2662 	ASSERT(MUTEX_HELD(QLOCK(q)));
2663 	if (freezer != curthread)
2664 		mutex_exit(QLOCK(q));
2665 
2666 	return (1);
2667 }
2668 
2669 /*
2670  * Insert a message before an existing message on the queue.  If the
2671  * existing message is NULL, the new message is placed on the end of
2672  * the queue.  The queue class of the new message is ignored.  However,
2673  * the priority band of the new message must adhere to the following
2674  * ordering:
2675  *
2676  *	emp->b_prev->b_band >= mp->b_band >= emp->b_band.
2677  *
2678  * All flow control parameters are updated.
2679  *
2680  * insq can be called with the stream frozen, but other utility functions
2681  * holding QLOCK, and by streams modules without any locks/frozen.
 *
 * Returns 1 once mp has been linked onto the queue.  Returns 0, with mp
 * not queued, when the requested position would break the ordering (a
 * CE_WARN message is logged) or when a qband structure needed for
 * mp->b_band could not be allocated.
 *
 * If QLOCK(q) is already held on entry, either because this thread froze
 * the stream or for any other reason, it is left held on return;
 * otherwise it is taken and dropped here.
2682  */
2683 int
2684 insq(queue_t *q, mblk_t *emp, mblk_t *mp)
2685 {
2686 	mblk_t *tmp;
2687 	qband_t *qbp = NULL;
2688 	int mcls = (int)queclass(mp);
2689 	kthread_id_t freezer;
2690 	int	bytecnt = 0, mblkcnt = 0;
2691 
2692 	freezer = STREAM(q)->sd_freezer;
2693 	if (freezer == curthread) {
2694 		ASSERT(frozenstr(q));
2695 		ASSERT(MUTEX_HELD(QLOCK(q)));
2696 	} else if (MUTEX_HELD(QLOCK(q))) {
2697 		/* Don't drop lock on exit */
2698 		freezer = curthread;
2699 	} else
2700 		mutex_enter(QLOCK(q));
2701 
2702 	if (mcls == QPCTL) {
2703 		if (mp->b_band != 0)
2704 			mp->b_band = 0;		/* force to be correct */
		/* A high priority message may not follow an ordinary one. */
2705 		if (emp && emp->b_prev &&
2706 		    (emp->b_prev->b_datap->db_type < QPCTL))
2707 			goto badord;
2708 	}
2709 	if (emp) {
		/*
		 * Reject an ordinary mp with a lower band than emp, and
		 * any mp with a higher band than an ordinary predecessor.
		 */
2710 		if (((mcls == QNORM) && (mp->b_band < emp->b_band)) ||
2711 		    (emp->b_prev && (emp->b_prev->b_datap->db_type < QPCTL) &&
2712 		    (emp->b_prev->b_band < mp->b_band))) {
2713 			goto badord;
2714 		}
2715 	} else {
		/* Appending: mp's band may not exceed the current tail's. */
2716 		tmp = q->q_last;
2717 		if (tmp && (mcls == QNORM) && (mp->b_band > tmp->b_band)) {
			/* Shared failure exit; also reached by goto above. */
2718 badord:
2719 			cmn_err(CE_WARN,
2720 			    "insq: attempt to insert message out of order "
2721 			    "on q %p", (void *)q);
2722 			if (freezer != curthread)
2723 				mutex_exit(QLOCK(q));
2724 			return (0);
2725 		}
2726 	}
2727 
2728 	if (mp->b_band != 0) {
2729 		int i;
2730 		qband_t **qbpp;
2731 
2732 		if (mp->b_band > q->q_nband) {
			/*
			 * Append one qband per missing band number, each
			 * inheriting the queue's water marks.  Bands added
			 * before a failed allocation stay on the queue.
			 */
2733 			qbpp = &q->q_bandp;
2734 			while (*qbpp)
2735 				qbpp = &(*qbpp)->qb_next;
2736 			while (mp->b_band > q->q_nband) {
2737 				if ((*qbpp = allocband()) == NULL) {
2738 					if (freezer != curthread)
2739 						mutex_exit(QLOCK(q));
2740 					return (0);
2741 				}
2742 				(*qbpp)->qb_hiwat = q->q_hiwat;
2743 				(*qbpp)->qb_lowat = q->q_lowat;
2744 				q->q_nband++;
2745 				qbpp = &(*qbpp)->qb_next;
2746 			}
2747 		}
		/* Band n is the n'th entry of the q_bandp list. */
2748 		qbp = q->q_bandp;
2749 		i = mp->b_band;
2750 		while (--i)
2751 			qbp = qbp->qb_next;
2752 	}
2753 
	/* Link mp in front of emp, or at the tail when emp is NULL. */
2754 	if ((mp->b_next = emp) != NULL) {
2755 		if ((mp->b_prev = emp->b_prev) != NULL)
2756 			emp->b_prev->b_next = mp;
2757 		else
2758 			q->q_first = mp;
2759 		emp->b_prev = mp;
2760 	} else {
2761 		if ((mp->b_prev = q->q_last) != NULL)
2762 			q->q_last->b_next = mp;
2763 		else
2764 			q->q_first = mp;
2765 		q->q_last = mp;
2766 	}
2767 
2768 	/* Get mblk and byte count for q_count accounting */
2769 	for (tmp = mp; tmp; tmp = tmp->b_cont) {
2770 		ADD_MBLK_SIZE(tmp, bytecnt);
2771 		mblkcnt++;
2772 	}
2773 
2774 	if (qbp) {	/* adjust qband pointers and count */
2775 		if (!qbp->qb_first) {
2776 			qbp->qb_first = mp;
2777 			qbp->qb_last = mp;
2778 		} else {
			/*
			 * mp becomes the band's first message when nothing
			 * of the same band precedes it, else its last
			 * message when nothing of the same band follows.
			 */
2779 			if (mp->b_prev == NULL || (mp->b_prev != NULL &&
2780 			    (mp->b_prev->b_band != mp->b_band)))
2781 				qbp->qb_first = mp;
2782 			else if (mp->b_next == NULL || (mp->b_next != NULL &&
2783 			    (mp->b_next->b_band != mp->b_band)))
2784 				qbp->qb_last = mp;
2785 		}
2786 		qbp->qb_count += bytecnt;
2787 		qbp->qb_mblkcnt += mblkcnt;
2788 		if ((qbp->qb_count >= qbp->qb_hiwat) ||
2789 		    (qbp->qb_mblkcnt >= qbp->qb_hiwat)) {
2790 			qbp->qb_flag |= QB_FULL;
2791 		}
2792 	} else {
2793 		q->q_count += bytecnt;
2794 		q->q_mblkcnt += mblkcnt;
2795 		if ((q->q_count >= q->q_hiwat) ||
2796 		    (q->q_mblkcnt >= q->q_hiwat)) {
2797 			q->q_flag |= QFULL;
2798 		}
2799 	}
2800 
2801 	STR_FTEVENT_MSG(mp, q, FTEV_INSQ, NULL);
2802 
2803 	if (canenable(q) && (q->q_flag & QWANTR))
2804 		qenable_locked(q);
2805 
2806 	ASSERT(MUTEX_HELD(QLOCK(q)));
2807 	if (freezer != curthread)
2808 		mutex_exit(QLOCK(q));
2809 
2810 	return (1);
2811 }
2812 
2813 /*
2814  * Create and put a control message on queue.
2815  */
2816 int
2817 putctl(queue_t *q, int type)
2818 {
2819 	mblk_t *bp;
2820 
2821 	if ((datamsg(type) && (type != M_DELAY)) ||
2822 	    (bp = allocb_tryhard(0)) == NULL)
2823 		return (0);
2824 	bp->b_datap->db_type = (unsigned char) type;
2825 
2826 	put(q, bp);
2827 
2828 	return (1);
2829 }
2830 
2831 /*
2832  * Control message with a single-byte parameter
2833  */
2834 int
2835 putctl1(queue_t *q, int type, int param)
2836 {
2837 	mblk_t *bp;
2838 
2839 	if ((datamsg(type) && (type != M_DELAY)) ||
2840 	    (bp = allocb_tryhard(1)) == NULL)
2841 		return (0);
2842 	bp->b_datap->db_type = (unsigned char)type;
2843 	*bp->b_wptr++ = (unsigned char)param;
2844 
2845 	put(q, bp);
2846 
2847 	return (1);
2848 }
2849 
2850 int
2851 putnextctl1(queue_t *q, int type, int param)
2852 {
2853 	mblk_t *bp;
2854 
2855 	if ((datamsg(type) && (type != M_DELAY)) ||
2856 		((bp = allocb_tryhard(1)) == NULL))
2857 		return (0);
2858 
2859 	bp->b_datap->db_type = (unsigned char)type;
2860 	*bp->b_wptr++ = (unsigned char)param;
2861 
2862 	putnext(q, bp);
2863 
2864 	return (1);
2865 }
2866 
2867 int
2868 putnextctl(queue_t *q, int type)
2869 {
2870 	mblk_t *bp;
2871 
2872 	if ((datamsg(type) && (type != M_DELAY)) ||
2873 		((bp = allocb_tryhard(0)) == NULL))
2874 		return (0);
2875 	bp->b_datap->db_type = (unsigned char)type;
2876 
2877 	putnext(q, bp);
2878 
2879 	return (1);
2880 }
2881 
2882 /*
2883  * Return the queue upstream from this one
2884  */
2885 queue_t *
2886 backq(queue_t *q)
2887 {
2888 	q = _OTHERQ(q);
2889 	if (q->q_next) {
2890 		q = q->q_next;
2891 		return (_OTHERQ(q));
2892 	}
2893 	return (NULL);
2894 }
2895 
2896 /*
2897  * Send a block back up the queue in reverse from this
2898  * one (e.g. to respond to ioctls)
2899  */
2900 void
2901 qreply(queue_t *q, mblk_t *bp)
2902 {
2903 	ASSERT(q && bp);
2904 
2905 	putnext(_OTHERQ(q), bp);
2906 }
2907 
2908 /*
2909  * Streams Queue Scheduling
2910  *
2911  * Queues are enabled through qenable() when they have messages to
2912  * process.  They are serviced by queuerun(), which runs each enabled
2913  * queue's service procedure.  The call to queuerun() is processor
2914  * dependent - the general principle is that it be run whenever a queue
2915  * is enabled but before returning to user level.  For system calls,
2916  * the function runqueues() is called if their action causes a queue
2917  * to be enabled.  For device interrupts, queuerun() should be
2918  * called before returning from the last level of interrupt.  Beyond
2919  * this, no timing assumptions should be made about queue scheduling.
2920  */
2921 
2922 /*
2923  * Enable a queue: put it on list of those whose service procedures are
2924  * ready to run and set up the scheduling mechanism.
2925  * The broadcast is done outside the mutex -> to avoid the woken thread
2926  * from contending with the mutex. This is OK 'cos the queue has been
2927  * enqueued on the runlist and flagged safely at this point.
2928  */
2929 void
2930 qenable(queue_t *q)
2931 {
2932 	mutex_enter(QLOCK(q));
2933 	qenable_locked(q);
2934 	mutex_exit(QLOCK(q));
2935 }
2936 /*
2937  * Return number of messages on queue
2938  */
2939 int
2940 qsize(queue_t *qp)
2941 {
2942 	int count = 0;
2943 	mblk_t *mp;
2944 
2945 	mutex_enter(QLOCK(qp));
2946 	for (mp = qp->q_first; mp; mp = mp->b_next)
2947 		count++;
2948 	mutex_exit(QLOCK(qp));
2949 	return (count);
2950 }
2951 
2952 /*
2953  * noenable - set queue so that putq() will not enable it.
2954  * enableok - set queue so that putq() can enable it.
2955  */
2956 void
2957 noenable(queue_t *q)
2958 {
2959 	mutex_enter(QLOCK(q));
2960 	q->q_flag |= QNOENB;
2961 	mutex_exit(QLOCK(q));
2962 }
2963 
2964 void
2965 enableok(queue_t *q)
2966 {
2967 	mutex_enter(QLOCK(q));
2968 	q->q_flag &= ~QNOENB;
2969 	mutex_exit(QLOCK(q));
2970 }
2971 
2972 /*
2973  * Set queue fields.
2974  */
2975 int
2976 strqset(queue_t *q, qfields_t what, unsigned char pri, intptr_t val)
2977 {
2978 	qband_t *qbp = NULL;
2979 	queue_t	*wrq;
2980 	int error = 0;
2981 	kthread_id_t freezer;
2982 
2983 	freezer = STREAM(q)->sd_freezer;
2984 	if (freezer == curthread) {
2985 		ASSERT(frozenstr(q));
2986 		ASSERT(MUTEX_HELD(QLOCK(q)));
2987 	} else
2988 		mutex_enter(QLOCK(q));
2989 
2990 	if (what >= QBAD) {
2991 		error = EINVAL;
2992 		goto done;
2993 	}
2994 	if (pri != 0) {
2995 		int i;
2996 		qband_t **qbpp;
2997 
2998 		if (pri > q->q_nband) {
2999 			qbpp = &q->q_bandp;
3000 			while (*qbpp)
3001 				qbpp = &(*qbpp)->qb_next;
3002 			while (pri > q->q_nband) {
3003 				if ((*qbpp = allocband()) == NULL) {
3004 					error = EAGAIN;
3005 					goto done;
3006 				}
3007 				(*qbpp)->qb_hiwat = q->q_hiwat;
3008 				(*qbpp)->qb_lowat = q->q_lowat;
3009 				q->q_nband++;
3010 				qbpp = &(*qbpp)->qb_next;
3011 			}
3012 		}
3013 		qbp = q->q_bandp;
3014 		i = pri;
3015 		while (--i)
3016 			qbp = qbp->qb_next;
3017 	}
3018 	switch (what) {
3019 
3020 	case QHIWAT:
3021 		if (qbp)
3022 			qbp->qb_hiwat = (size_t)val;
3023 		else
3024 			q->q_hiwat = (size_t)val;
3025 		break;
3026 
3027 	case QLOWAT:
3028 		if (qbp)
3029 			qbp->qb_lowat = (size_t)val;
3030 		else
3031 			q->q_lowat = (size_t)val;
3032 		break;
3033 
3034 	case QMAXPSZ:
3035 		if (qbp)
3036 			error = EINVAL;
3037 		else
3038 			q->q_maxpsz = (ssize_t)val;
3039 
3040 		/*
3041 		 * Performance concern, strwrite looks at the module below
3042 		 * the stream head for the maxpsz each time it does a write
3043 		 * we now cache it at the stream head.  Check to see if this
3044 		 * queue is sitting directly below the stream head.
3045 		 */
3046 		wrq = STREAM(q)->sd_wrq;
3047 		if (q != wrq->q_next)
3048 			break;
3049 
3050 		/*
3051 		 * If the stream is not frozen drop the current QLOCK and
3052 		 * acquire the sd_wrq QLOCK which protects sd_qn_*
3053 		 */
3054 		if (freezer != curthread) {
3055 			mutex_exit(QLOCK(q));
3056 			mutex_enter(QLOCK(wrq));
3057 		}
3058 		ASSERT(MUTEX_HELD(QLOCK(wrq)));
3059 
3060 		if (strmsgsz != 0) {
3061 			if (val == INFPSZ)
3062 				val = strmsgsz;
3063 			else  {
3064 				if (STREAM(q)->sd_vnode->v_type == VFIFO)
3065 					val = MIN(PIPE_BUF, val);
3066 				else
3067 					val = MIN(strmsgsz, val);
3068 			}
3069 		}
3070 		STREAM(q)->sd_qn_maxpsz = val;
3071 		if (freezer != curthread) {
3072 			mutex_exit(QLOCK(wrq));
3073 			mutex_enter(QLOCK(q));
3074 		}
3075 		break;
3076 
3077 	case QMINPSZ:
3078 		if (qbp)
3079 			error = EINVAL;
3080 		else
3081 			q->q_minpsz = (ssize_t)val;
3082 
3083 		/*
3084 		 * Performance concern, strwrite looks at the module below
3085 		 * the stream head for the maxpsz each time it does a write
3086 		 * we now cache it at the stream head.  Check to see if this
3087 		 * queue is sitting directly below the stream head.
3088 		 */
3089 		wrq = STREAM(q)->sd_wrq;
3090 		if (q != wrq->q_next)
3091 			break;
3092 
3093 		/*
3094 		 * If the stream is not frozen drop the current QLOCK and
3095 		 * acquire the sd_wrq QLOCK which protects sd_qn_*
3096 		 */
3097 		if (freezer != curthread) {
3098 			mutex_exit(QLOCK(q));
3099 			mutex_enter(QLOCK(wrq));
3100 		}
3101 		STREAM(q)->sd_qn_minpsz = (ssize_t)val;
3102 
3103 		if (freezer != curthread) {
3104 			mutex_exit(QLOCK(wrq));
3105 			mutex_enter(QLOCK(q));
3106 		}
3107 		break;
3108 
3109 	case QSTRUIOT:
3110 		if (qbp)
3111 			error = EINVAL;
3112 		else
3113 			q->q_struiot = (ushort_t)val;
3114 		break;
3115 
3116 	case QCOUNT:
3117 	case QFIRST:
3118 	case QLAST:
3119 	case QFLAG:
3120 		error = EPERM;
3121 		break;
3122 
3123 	default:
3124 		error = EINVAL;
3125 		break;
3126 	}
3127 done:
3128 	if (freezer != curthread)
3129 		mutex_exit(QLOCK(q));
3130 	return (error);
3131 }
3132 
3133 /*
3134  * Get queue fields.
3135  */
3136 int
3137 strqget(queue_t *q, qfields_t what, unsigned char pri, void *valp)
3138 {
3139 	qband_t 	*qbp = NULL;
3140 	int 		error = 0;
3141 	kthread_id_t 	freezer;
3142 
3143 	freezer = STREAM(q)->sd_freezer;
3144 	if (freezer == curthread) {
3145 		ASSERT(frozenstr(q));
3146 		ASSERT(MUTEX_HELD(QLOCK(q)));
3147 	} else
3148 		mutex_enter(QLOCK(q));
3149 	if (what >= QBAD) {
3150 		error = EINVAL;
3151 		goto done;
3152 	}
3153 	if (pri != 0) {
3154 		int i;
3155 		qband_t **qbpp;
3156 
3157 		if (pri > q->q_nband) {
3158 			qbpp = &q->q_bandp;
3159 			while (*qbpp)
3160 				qbpp = &(*qbpp)->qb_next;
3161 			while (pri > q->q_nband) {
3162 				if ((*qbpp = allocband()) == NULL) {
3163 					error = EAGAIN;
3164 					goto done;
3165 				}
3166 				(*qbpp)->qb_hiwat = q->q_hiwat;
3167 				(*qbpp)->qb_lowat = q->q_lowat;
3168 				q->q_nband++;
3169 				qbpp = &(*qbpp)->qb_next;
3170 			}
3171 		}
3172 		qbp = q->q_bandp;
3173 		i = pri;
3174 		while (--i)
3175 			qbp = qbp->qb_next;
3176 	}
3177 	switch (what) {
3178 	case QHIWAT:
3179 		if (qbp)
3180 			*(size_t *)valp = qbp->qb_hiwat;
3181 		else
3182 			*(size_t *)valp = q->q_hiwat;
3183 		break;
3184 
3185 	case QLOWAT:
3186 		if (qbp)
3187 			*(size_t *)valp = qbp->qb_lowat;
3188 		else
3189 			*(size_t *)valp = q->q_lowat;
3190 		break;
3191 
3192 	case QMAXPSZ:
3193 		if (qbp)
3194 			error = EINVAL;
3195 		else
3196 			*(ssize_t *)valp = q->q_maxpsz;
3197 		break;
3198 
3199 	case QMINPSZ:
3200 		if (qbp)
3201 			error = EINVAL;
3202 		else
3203 			*(ssize_t *)valp = q->q_minpsz;
3204 		break;
3205 
3206 	case QCOUNT:
3207 		if (qbp)
3208 			*(size_t *)valp = qbp->qb_count;
3209 		else
3210 			*(size_t *)valp = q->q_count;
3211 		break;
3212 
3213 	case QFIRST:
3214 		if (qbp)
3215 			*(mblk_t **)valp = qbp->qb_first;
3216 		else
3217 			*(mblk_t **)valp = q->q_first;
3218 		break;
3219 
3220 	case QLAST:
3221 		if (qbp)
3222 			*(mblk_t **)valp = qbp->qb_last;
3223 		else
3224 			*(mblk_t **)valp = q->q_last;
3225 		break;
3226 
3227 	case QFLAG:
3228 		if (qbp)
3229 			*(uint_t *)valp = qbp->qb_flag;
3230 		else
3231 			*(uint_t *)valp = q->q_flag;
3232 		break;
3233 
3234 	case QSTRUIOT:
3235 		if (qbp)
3236 			error = EINVAL;
3237 		else
3238 			*(short *)valp = q->q_struiot;
3239 		break;
3240 
3241 	default:
3242 		error = EINVAL;
3243 		break;
3244 	}
3245 done:
3246 	if (freezer != curthread)
3247 		mutex_exit(QLOCK(q));
3248 	return (error);
3249 }
3250 
3251 /*
3252  * Function awakes all in cvwait/sigwait/pollwait, on one of:
3253  *	QWANTWSYNC or QWANTR or QWANTW,
3254  *
3255  * Note: for QWANTWSYNC/QWANTW and QWANTR, if no WSLEEPer or RSLEEPer then a
3256  *	 deferred wakeup will be done. Also if strpoll() in progress then a
3257  *	 deferred pollwakeup will be done.
3258  */
3259 void
3260 strwakeq(queue_t *q, int flag)
3261 {
3262 	stdata_t 	*stp = STREAM(q);
3263 	pollhead_t 	*pl;
3264 
3265 	mutex_enter(&stp->sd_lock);
3266 	pl = &stp->sd_pollist;
3267 	if (flag & QWANTWSYNC) {
3268 		ASSERT(!(q->q_flag & QREADR));
3269 		if (stp->sd_flag & WSLEEP) {
3270 			stp->sd_flag &= ~WSLEEP;
3271 			cv_broadcast(&stp->sd_wrq->q_wait);
3272 		} else {
3273 			stp->sd_wakeq |= WSLEEP;
3274 		}
3275 
3276 		mutex_exit(&stp->sd_lock);
3277 		pollwakeup(pl, POLLWRNORM);
3278 		mutex_enter(&stp->sd_lock);
3279 
3280 		if (stp->sd_sigflags & S_WRNORM)
3281 			strsendsig(stp->sd_siglist, S_WRNORM, 0, 0);
3282 	} else if (flag & QWANTR) {
3283 		if (stp->sd_flag & RSLEEP) {
3284 			stp->sd_flag &= ~RSLEEP;
3285 			cv_broadcast(&_RD(stp->sd_wrq)->q_wait);
3286 		} else {
3287 			stp->sd_wakeq |= RSLEEP;
3288 		}
3289 
3290 		mutex_exit(&stp->sd_lock);
3291 		pollwakeup(pl, POLLIN | POLLRDNORM);
3292 		mutex_enter(&stp->sd_lock);
3293 
3294 		{
3295 			int events = stp->sd_sigflags & (S_INPUT | S_RDNORM);
3296 
3297 			if (events)
3298 				strsendsig(stp->sd_siglist, events, 0, 0);
3299 		}
3300 	} else {
3301 		if (stp->sd_flag & WSLEEP) {
3302 			stp->sd_flag &= ~WSLEEP;
3303 			cv_broadcast(&stp->sd_wrq->q_wait);
3304 		}
3305 
3306 		mutex_exit(&stp->sd_lock);
3307 		pollwakeup(pl, POLLWRNORM);
3308 		mutex_enter(&stp->sd_lock);
3309 
3310 		if (stp->sd_sigflags & S_WRNORM)
3311 			strsendsig(stp->sd_siglist, S_WRNORM, 0, 0);
3312 	}
3313 	mutex_exit(&stp->sd_lock);
3314 }
3315 
/*
 * Move data between a uio and the mblks of a message.
 *
 * Walks the b_cont chain starting at mp for as long as the uio in dp
 * has residual count.  For every mblk whose dblk has STRUIO_SPEC set and
 * STRUIO_DONE clear, uiomove() is called with UIO_WRITE on the region
 * starting at b_rptr + db_cksumstuff, for at most
 * db_cksumend - db_cksumstuff bytes; the dblk is then marked STRUIO_DONE
 * and db_cksumstuff is advanced by the amount moved.
 * NOTE(review): UIO_WRITE presumably means the data flows from the uio
 * into the mblk - confirm against uiomove().
 *
 * The struio type is read from the stream's sd_struiowrq queue, if there
 * is one; only STRUIOT_STANDARD is handled here.
 *
 * If noblock is non-zero each move is bracketed by on_trap()/no_trap()
 * for OT_DATA_ACCESS, and a trap ends the walk with EWOULDBLOCK.
 *
 * Returns 0 on success, EWOULDBLOCK as described above, EIO for a struio
 * type other than STRUIOT_STANDARD, or the error returned by uiomove().
 */
3316 int
3317 struioget(queue_t *q, mblk_t *mp, struiod_t *dp, int noblock)
3318 {
3319 	stdata_t *stp = STREAM(q);
3320 	int typ  = STRUIOT_STANDARD;
3321 	uio_t	 *uiop = &dp->d_uio;
3322 	dblk_t	 *dbp;
3323 	ssize_t	 uiocnt;
3324 	ssize_t	 cnt;
3325 	unsigned char *ptr;
3326 	ssize_t	 resid;
3327 	int	 error = 0;
3328 	on_trap_data_t otd;
3329 	queue_t	*stwrq;
3330 
3331 	/*
3332 	 * Plumbing may change while taking the type so store the
3333 	 * queue in a temporary variable. It doesn't matter even
3334 	 * if the we take the type from the previous plumbing,
3335 	 * that's because if the plumbing has changed when we were
3336 	 * holding the queue in a temporary variable, we can continue
3337 	 * processing the message the way it would have been processed
3338 	 * in the old plumbing, without any side effects but a bit
3339 	 * extra processing for partial ip header checksum.
3340 	 *
3341 	 * This has been done to avoid holding the sd_lock which is
3342 	 * very hot.
3343 	 */
3344 
3345 	stwrq = stp->sd_struiowrq;
3346 	if (stwrq)
3347 		typ = stwrq->q_struiot;
3348 
	/* resid is refreshed on every pass; it is used again at "out". */
3349 	for (; (resid = uiop->uio_resid) > 0 && mp; mp = mp->b_cont) {
3350 		dbp = mp->b_datap;
3351 		ptr = (uchar_t *)(mp->b_rptr + dbp->db_cksumstuff);
		/* Bytes still to be moved for this mblk. */
3352 		uiocnt = dbp->db_cksumend - dbp->db_cksumstuff;
3353 		cnt = MIN(uiocnt, uiop->uio_resid);
3354 		if (!(dbp->db_struioflag & STRUIO_SPEC) ||
3355 		    (dbp->db_struioflag & STRUIO_DONE) || cnt == 0) {
3356 			/*
3357 			 * Either this mblk has already been processed
3358 			 * or there is no more room in this mblk (?).
3359 			 */
3360 			continue;
3361 		}
3362 		switch (typ) {
3363 		case STRUIOT_STANDARD:
3364 			if (noblock) {
				/*
				 * A non-zero return from on_trap() is taken
				 * to mean that the access below trapped.
				 */
3365 				if (on_trap(&otd, OT_DATA_ACCESS)) {
3366 					no_trap();
3367 					error = EWOULDBLOCK;
3368 					goto out;
3369 				}
3370 			}
3371 			if (error = uiomove(ptr, cnt, UIO_WRITE, uiop)) {
3372 				if (noblock)
3373 					no_trap();
3374 				goto out;
3375 			}
3376 			if (noblock)
3377 				no_trap();
3378 			break;
3379 
3380 		default:
3381 			error = EIO;
3382 			goto out;
3383 		}
3384 		dbp->db_struioflag |= STRUIO_DONE;
3385 		dbp->db_cksumstuff += cnt;
3386 	}
3387 out:
	/*
	 * EWOULDBLOCK is only ever set inside the loop, after dbp has
	 * been assigned, so dbp is valid whenever it is used below.
	 */
3388 	if (error == EWOULDBLOCK && (resid -= uiop->uio_resid) > 0) {
3389 		/*
3390 		 * A fault has occurred and some bytes were moved to the
3391 		 * current mblk, the uio_t has already been updated by
3392 		 * the appropriate uio routine, so also update the mblk
3393 		 * to reflect this in case this same mblk chain is used
3394 		 * again (after the fault has been handled).
3395 		 */
3396 		uiocnt = dbp->db_cksumend - dbp->db_cksumstuff;
3397 		if (uiocnt >= resid)
3398 			dbp->db_cksumstuff += resid;
3399 	}
3400 	return (error);
3401 }
3402 
3403 /*
3404  * Try to enter queue synchronously. Any attempt to enter a closing queue will
3405  * fails. The qp->q_rwcnt keeps track of the number of successful entries so
3406  * that removeq() will not try to close the queue while a thread is inside the
3407  * queue.
3408  */
3409 static boolean_t
3410 rwnext_enter(queue_t *qp)
3411 {
3412 	mutex_enter(QLOCK(qp));
3413 	if (qp->q_flag & QWCLOSE) {
3414 		mutex_exit(QLOCK(qp));
3415 		return (B_FALSE);
3416 	}
3417 	qp->q_rwcnt++;
3418 	ASSERT(qp->q_rwcnt != 0);
3419 	mutex_exit(QLOCK(qp));
3420 	return (B_TRUE);
3421 }
3422 
3423 /*
3424  * Decrease the count of threads running in sync stream queue and wake up any
3425  * threads blocked in removeq().
3426  */
3427 static void
3428 rwnext_exit(queue_t *qp)
3429 {
3430 	mutex_enter(QLOCK(qp));
3431 	qp->q_rwcnt--;
3432 	if (qp->q_flag & QWANTRMQSYNC) {
3433 		qp->q_flag &= ~QWANTRMQSYNC;
3434 		cv_broadcast(&qp->q_wait);
3435 	}
3436 	mutex_exit(QLOCK(qp));
3437 }
3438 
3439 /*
3440  * The purpose of rwnext() is to call the rw procedure of the next
3441  * (downstream) modules queue.
3442  *
3443  * treated as put entrypoint for perimeter syncronization.
3444  *
3445  * There's no need to grab sq_putlocks here (which only exist for CIPUT
3446  * sync queues). If it is CIPUT sync queue sq_count is incremented and it does
3447  * not matter if any regular put entrypoints have been already entered. We
3448  * can't increment one of the sq_putcounts (instead of sq_count) because
3449  * qwait_rw won't know which counter to decrement.
3450  *
3451  * It would be reasonable to add the lockless FASTPUT logic.
3452  */
3453 int
3454 rwnext(queue_t *qp, struiod_t *dp)
3455 {
3456 	queue_t		*nqp;
3457 	syncq_t		*sq;
3458 	uint16_t	count;
3459 	uint16_t	flags;
3460 	struct qinit	*qi;
3461 	int		(*proc)();
3462 	struct stdata	*stp;
3463 	int		isread;
3464 	int		rval;
3465 
3466 	stp = STREAM(qp);
3467 	/*
3468 	 * Prevent q_next from changing by holding sd_lock until acquiring
3469 	 * SQLOCK. Note that a read-side rwnext from the streamhead will
3470 	 * already have sd_lock acquired. In either case sd_lock is always
3471 	 * released after acquiring SQLOCK.
3472 	 *
3473 	 * The streamhead read-side holding sd_lock when calling rwnext is
3474 	 * required to prevent a race condition were M_DATA mblks flowing
3475 	 * up the read-side of the stream could be bypassed by a rwnext()
3476 	 * down-call. In this case sd_lock acts as the streamhead perimeter.
3477 	 */
3478 	if ((nqp = _WR(qp)) == qp) {
3479 		isread = 0;
3480 		mutex_enter(&stp->sd_lock);
3481 		qp = nqp->q_next;
3482 	} else {
3483 		isread = 1;
3484 		if (nqp != stp->sd_wrq)
3485 			/* Not streamhead */
3486 			mutex_enter(&stp->sd_lock);
3487 		qp = _RD(nqp->q_next);
3488 	}
3489 	qi = qp->q_qinfo;
3490 	if (qp->q_struiot == STRUIOT_NONE || ! (proc = qi->qi_rwp)) {
3491 		/*
3492 		 * Not a synchronous module or no r/w procedure for this
3493 		 * queue, so just return EINVAL and let the caller handle it.
3494 		 */
3495 		mutex_exit(&stp->sd_lock);
3496 		return (EINVAL);
3497 	}
3498 
3499 	if (rwnext_enter(qp) == B_FALSE) {
3500 		mutex_exit(&stp->sd_lock);
3501 		return (EINVAL);
3502 	}
3503 
3504 	sq = qp->q_syncq;
3505 	mutex_enter(SQLOCK(sq));
3506 	mutex_exit(&stp->sd_lock);
3507 	count = sq->sq_count;
3508 	flags = sq->sq_flags;
3509 	ASSERT(sq->sq_ciputctrl == NULL || (flags & SQ_CIPUT));
3510 
3511 	while ((flags & SQ_GOAWAY) || (!(flags & SQ_CIPUT) && count != 0)) {
3512 		/*
3513 		 * if this queue is being closed, return.
3514 		 */
3515 		if (qp->q_flag & QWCLOSE) {
3516 			mutex_exit(SQLOCK(sq));
3517 			rwnext_exit(qp);
3518 			return (EINVAL);
3519 		}
3520 
3521 		/*
3522 		 * Wait until we can enter the inner perimeter.
3523 		 */
3524 		sq->sq_flags = flags | SQ_WANTWAKEUP;
3525 		cv_wait(&sq->sq_wait, SQLOCK(sq));
3526 		count = sq->sq_count;
3527 		flags = sq->sq_flags;
3528 	}
3529 
3530 	if (isread == 0 && stp->sd_struiowrq == NULL ||
3531 	    isread == 1 && stp->sd_struiordq == NULL) {
3532 		/*
3533 		 * Stream plumbing changed while waiting for inner perimeter
3534 		 * so just return EINVAL and let the caller handle it.
3535 		 */
3536 		mutex_exit(SQLOCK(sq));
3537 		rwnext_exit(qp);
3538 		return (EINVAL);
3539 	}
3540 	if (!(flags & SQ_CIPUT))
3541 		sq->sq_flags = flags | SQ_EXCL;
3542 	sq->sq_count = count + 1;
3543 	ASSERT(sq->sq_count != 0);		/* Wraparound */
3544 	/*
3545 	 * Note: The only message ordering guarantee that rwnext() makes is
3546 	 *	 for the write queue flow-control case. All others (r/w queue
3547 	 *	 with q_count > 0 (or q_first != 0)) are the resposibilty of
3548 	 *	 the queue's rw procedure. This could be genralized here buy
3549 	 *	 running the queue's service procedure, but that wouldn't be
3550 	 *	 the most efficent for all cases.
3551 	 */
3552 	mutex_exit(SQLOCK(sq));
3553 	if (! isread && (qp->q_flag & QFULL)) {
3554 		/*
3555 		 * Write queue may be flow controlled. If so,
3556 		 * mark the queue for wakeup when it's not.
3557 		 */
3558 		mutex_enter(QLOCK(qp));
3559 		if (qp->q_flag & QFULL) {
3560 			qp->q_flag |= QWANTWSYNC;
3561 			mutex_exit(QLOCK(qp));
3562 			rval = EWOULDBLOCK;
3563 			goto out;
3564 		}
3565 		mutex_exit(QLOCK(qp));
3566 	}
3567 
3568 	if (! isread && dp->d_mp)
3569 		STR_FTEVENT_MSG(dp->d_mp, nqp, FTEV_RWNEXT, dp->d_mp->b_rptr -
3570 		    dp->d_mp->b_datap->db_base);
3571 
3572 	rval = (*proc)(qp, dp);
3573 
3574 	if (isread && dp->d_mp)
3575 		STR_FTEVENT_MSG(dp->d_mp, _RD(nqp), FTEV_RWNEXT,
3576 		    dp->d_mp->b_rptr - dp->d_mp->b_datap->db_base);
3577 out:
3578 	/*
3579 	 * The queue is protected from being freed by sq_count, so it is
3580 	 * safe to call rwnext_exit and reacquire SQLOCK(sq).
3581 	 */
3582 	rwnext_exit(qp);
3583 
3584 	mutex_enter(SQLOCK(sq));
3585 	flags = sq->sq_flags;
3586 	ASSERT(sq->sq_count != 0);
3587 	sq->sq_count--;
3588 	if (flags & SQ_TAIL) {
3589 		putnext_tail(sq, qp, flags);
3590 		/*
3591 		 * The only purpose of this ASSERT is to preserve calling stack
3592 		 * in DEBUG kernel.
3593 		 */
3594 		ASSERT(flags & SQ_TAIL);
3595 		return (rval);
3596 	}
3597 	ASSERT(flags & (SQ_EXCL|SQ_CIPUT));
3598 	/*
3599 	 * Safe to always drop SQ_EXCL:
3600 	 *	Not SQ_CIPUT means we set SQ_EXCL above
3601 	 *	For SQ_CIPUT SQ_EXCL will only be set if the put procedure
3602 	 *	did a qwriter(INNER) in which case nobody else
3603 	 *	is in the inner perimeter and we are exiting.
3604 	 *
3605 	 * I would like to make the following assertion:
3606 	 *
3607 	 * ASSERT((flags & (SQ_EXCL|SQ_CIPUT)) != (SQ_EXCL|SQ_CIPUT) ||
3608 	 * 	sq->sq_count == 0);
3609 	 *
3610 	 * which indicates that if we are both putshared and exclusive,
3611 	 * we became exclusive while executing the putproc, and the only
3612 	 * claim on the syncq was the one we dropped a few lines above.
3613 	 * But other threads that enter putnext while the syncq is exclusive
3614 	 * need to make a claim as they may need to drop SQLOCK in the
3615 	 * has_writers case to avoid deadlocks.  If these threads are
3616 	 * delayed or preempted, it is possible that the writer thread can
3617 	 * find out that there are other claims making the (sq_count == 0)
3618 	 * test invalid.
3619 	 */
3620 
3621 	sq->sq_flags = flags & ~SQ_EXCL;
3622 	if (sq->sq_flags & SQ_WANTWAKEUP) {
3623 		sq->sq_flags &= ~SQ_WANTWAKEUP;
3624 		cv_broadcast(&sq->sq_wait);
3625 	}
3626 	mutex_exit(SQLOCK(sq));
3627 	return (rval);
3628 }
3629 
3630 /*
3631  * The purpose of infonext() is to call the info procedure of the next
3632  * (downstream) modules queue.
3633  *
3634  * treated as put entrypoint for perimeter syncronization.
3635  *
3636  * There's no need to grab sq_putlocks here (which only exist for CIPUT
3637  * sync queues). If it is CIPUT sync queue regular sq_count is incremented and
3638  * it does not matter if any regular put entrypoints have been already
3639  * entered.
3640  */
3641 int
3642 infonext(queue_t *qp, infod_t *idp)
3643 {
3644 	queue_t		*nqp;
3645 	syncq_t		*sq;
3646 	uint16_t	count;
3647 	uint16_t 	flags;
3648 	struct qinit	*qi;
3649 	int		(*proc)();
3650 	struct stdata	*stp;
3651 	int		rval;
3652 
3653 	stp = STREAM(qp);
3654 	/*
3655 	 * Prevent q_next from changing by holding sd_lock until
3656 	 * acquiring SQLOCK.
3657 	 */
3658 	mutex_enter(&stp->sd_lock);
3659 	if ((nqp = _WR(qp)) == qp) {
3660 		qp = nqp->q_next;
3661 	} else {
3662 		qp = _RD(nqp->q_next);
3663 	}
3664 	qi = qp->q_qinfo;
3665 	if (qp->q_struiot == STRUIOT_NONE || ! (proc = qi->qi_infop)) {
3666 		mutex_exit(&stp->sd_lock);
3667 		return (EINVAL);
3668 	}
3669 	sq = qp->q_syncq;
3670 	mutex_enter(SQLOCK(sq));
3671 	mutex_exit(&stp->sd_lock);
3672 	count = sq->sq_count;
3673 	flags = sq->sq_flags;
3674 	ASSERT(sq->sq_ciputctrl == NULL || (flags & SQ_CIPUT));
3675 
3676 	while ((flags & SQ_GOAWAY) || (!(flags & SQ_CIPUT) && count != 0)) {
3677 		/*
3678 		 * Wait until we can enter the inner perimeter.
3679 		 */
3680 		sq->sq_flags = flags | SQ_WANTWAKEUP;
3681 		cv_wait(&sq->sq_wait, SQLOCK(sq));
3682 		count = sq->sq_count;
3683 		flags = sq->sq_flags;
3684 	}
3685 
3686 	if (! (flags & SQ_CIPUT))
3687 		sq->sq_flags = flags | SQ_EXCL;
3688 	sq->sq_count = count + 1;
3689 	ASSERT(sq->sq_count != 0);		/* Wraparound */
3690 	mutex_exit(SQLOCK(sq));
3691 
3692 	rval = (*proc)(qp, idp);
3693 
3694 	mutex_enter(SQLOCK(sq));
3695 	flags = sq->sq_flags;
3696 	ASSERT(sq->sq_count != 0);
3697 	sq->sq_count--;
3698 	if (flags & SQ_TAIL) {
3699 		putnext_tail(sq, qp, flags);
3700 		/*
3701 		 * The only purpose of this ASSERT is to preserve calling stack
3702 		 * in DEBUG kernel.
3703 		 */
3704 		ASSERT(flags & SQ_TAIL);
3705 		return (rval);
3706 	}
3707 	ASSERT(flags & (SQ_EXCL|SQ_CIPUT));
3708 /*
3709  * XXXX
3710  * I am not certain the next comment is correct here.  I need to consider
3711  * why the infonext is called, and if dropping SQ_EXCL unless non-CIPUT
3712  * might cause other problems.  It just might be safer to drop it if
3713  * !SQ_CIPUT because that is when we set it.
3714  */
3715 	/*
3716 	 * Safe to always drop SQ_EXCL:
3717 	 *	Not SQ_CIPUT means we set SQ_EXCL above
3718 	 *	For SQ_CIPUT SQ_EXCL will only be set if the put procedure
3719 	 *	did a qwriter(INNER) in which case nobody else
3720 	 *	is in the inner perimeter and we are exiting.
3721 	 *
3722 	 * I would like to make the following assertion:
3723 	 *
3724 	 * ASSERT((flags & (SQ_EXCL|SQ_CIPUT)) != (SQ_EXCL|SQ_CIPUT) ||
3725 	 *	sq->sq_count == 0);
3726 	 *
3727 	 * which indicates that if we are both putshared and exclusive,
3728 	 * we became exclusive while executing the putproc, and the only
3729 	 * claim on the syncq was the one we dropped a few lines above.
3730 	 * But other threads that enter putnext while the syncq is exclusive
3731 	 * need to make a claim as they may need to drop SQLOCK in the
3732 	 * has_writers case to avoid deadlocks.  If these threads are
3733 	 * delayed or preempted, it is possible that the writer thread can
3734 	 * find out that there are other claims making the (sq_count == 0)
3735 	 * test invalid.
3736 	 */
3737 
3738 	sq->sq_flags = flags & ~SQ_EXCL;
3739 	mutex_exit(SQLOCK(sq));
3740 	return (rval);
3741 }
3742 
3743 /*
3744  * Return nonzero if the queue is responsible for struio(), else return 0.
3745  */
3746 int
3747 isuioq(queue_t *q)
3748 {
3749 	if (q->q_flag & QREADR)
3750 		return (STREAM(q)->sd_struiordq == q);
3751 	else
3752 		return (STREAM(q)->sd_struiowrq == q);
3753 }
3754 
/*
 * When nonzero, create_putlocks() returns without creating any per-cpu
 * putlocks.  The default is 0 (putlocks allowed) on sparc and 1 (putlocks
 * disabled) everywhere else.
 */
#if !defined(__sparc)
int disable_putlocks = 1;
#else
int disable_putlocks = 0;
#endif
3760 
3761 /*
3762  * called by create_putlock.
3763  */
3764 static void
3765 create_syncq_putlocks(queue_t *q)
3766 {
3767 	syncq_t	*sq = q->q_syncq;
3768 	ciputctrl_t *cip;
3769 	int i;
3770 
3771 	ASSERT(sq != NULL);
3772 
3773 	ASSERT(disable_putlocks == 0);
3774 	ASSERT(n_ciputctrl >= min_n_ciputctrl);
3775 	ASSERT(ciputctrl_cache != NULL);
3776 
3777 	if (!(sq->sq_type & SQ_CIPUT))
3778 		return;
3779 
3780 	for (i = 0; i <= 1; i++) {
3781 		if (sq->sq_ciputctrl == NULL) {
3782 			cip = kmem_cache_alloc(ciputctrl_cache, KM_SLEEP);
3783 			SUMCHECK_CIPUTCTRL_COUNTS(cip, n_ciputctrl - 1, 0);
3784 			mutex_enter(SQLOCK(sq));
3785 			if (sq->sq_ciputctrl != NULL) {
3786 				mutex_exit(SQLOCK(sq));
3787 				kmem_cache_free(ciputctrl_cache, cip);
3788 			} else {
3789 				ASSERT(sq->sq_nciputctrl == 0);
3790 				sq->sq_nciputctrl = n_ciputctrl - 1;
3791 				/*
3792 				 * putnext checks sq_ciputctrl without holding
3793 				 * SQLOCK. if it is not NULL putnext assumes
3794 				 * sq_nciputctrl is initialized. membar below
3795 				 * insures that.
3796 				 */
3797 				membar_producer();
3798 				sq->sq_ciputctrl = cip;
3799 				mutex_exit(SQLOCK(sq));
3800 			}
3801 		}
3802 		ASSERT(sq->sq_nciputctrl == n_ciputctrl - 1);
3803 		if (i == 1)
3804 			break;
3805 		q = _OTHERQ(q);
3806 		if (!(q->q_flag & QPERQ)) {
3807 			ASSERT(sq == q->q_syncq);
3808 			break;
3809 		}
3810 		ASSERT(q->q_syncq != NULL);
3811 		ASSERT(sq != q->q_syncq);
3812 		sq = q->q_syncq;
3813 		ASSERT(sq->sq_type & SQ_CIPUT);
3814 	}
3815 }
3816 
3817 /*
3818  * If stream argument is 0 only create per cpu sq_putlocks/sq_putcounts for
3819  * syncq of q. If stream argument is not 0 create per cpu stream_putlocks for
3820  * the stream of q and per cpu sq_putlocks/sq_putcounts for all syncq's
3821  * starting from q and down to the driver.
3822  *
3823  * This should be called after the affected queues are part of stream
3824  * geometry. It should be called from driver/module open routine after
3825  * qprocson() call. It is also called from nfs syscall where it is known that
3826  * stream is configured and won't change its geometry during create_putlock
3827  * call.
3828  *
3829  * caller normally uses 0 value for the stream argument to speed up MT putnext
3830  * into the perimeter of q for example because its perimeter is per module
3831  * (e.g. IP).
3832  *
3833  * caller normally uses non 0 value for the stream argument to hint the system
3834  * that the stream of q is a very contended global system stream
3835  * (e.g. NFS/UDP) and the part of the stream from q to the driver is
3836  * particularly MT hot.
3837  *
3838  * Caller insures stream plumbing won't happen while we are here and therefore
3839  * q_next can be safely used.
3840  */
3841 
3842 void
3843 create_putlocks(queue_t *q, int stream)
3844 {
3845 	ciputctrl_t	*cip;
3846 	struct stdata	*stp = STREAM(q);
3847 
3848 	q = _WR(q);
3849 	ASSERT(stp != NULL);
3850 
3851 	if (disable_putlocks != 0)
3852 		return;
3853 
3854 	if (n_ciputctrl < min_n_ciputctrl)
3855 		return;
3856 
3857 	ASSERT(ciputctrl_cache != NULL);
3858 
3859 	if (stream != 0 && stp->sd_ciputctrl == NULL) {
3860 		cip = kmem_cache_alloc(ciputctrl_cache, KM_SLEEP);
3861 		SUMCHECK_CIPUTCTRL_COUNTS(cip, n_ciputctrl - 1, 0);
3862 		mutex_enter(&stp->sd_lock);
3863 		if (stp->sd_ciputctrl != NULL) {
3864 			mutex_exit(&stp->sd_lock);
3865 			kmem_cache_free(ciputctrl_cache, cip);
3866 		} else {
3867 			ASSERT(stp->sd_nciputctrl == 0);
3868 			stp->sd_nciputctrl = n_ciputctrl - 1;
3869 			/*
3870 			 * putnext checks sd_ciputctrl without holding
3871 			 * sd_lock. if it is not NULL putnext assumes
3872 			 * sd_nciputctrl is initialized. membar below
3873 			 * insures that.
3874 			 */
3875 			membar_producer();
3876 			stp->sd_ciputctrl = cip;
3877 			mutex_exit(&stp->sd_lock);
3878 		}
3879 	}
3880 
3881 	ASSERT(stream == 0 || stp->sd_nciputctrl == n_ciputctrl - 1);
3882 
3883 	while (_SAMESTR(q)) {
3884 		create_syncq_putlocks(q);
3885 		if (stream == 0)
3886 			return;
3887 		q = q->q_next;
3888 	}
3889 	ASSERT(q != NULL);
3890 	create_syncq_putlocks(q);
3891 }
3892 
/*
 * STREAMS Flow Trace - record STREAMS Flow Trace events as an mblk flows
 * through a stream.
 *
 * The data currently recorded per event is an hrtime stamp, a queue
 * address, an event type, and a per-type datum.  Much of the STREAMS
 * framework is instrumented for automatic flow tracing (when enabled).
 * Events can also be defined and used by STREAMS modules and drivers.
 *
 * Global objects:
 *
 *	str_ftevent() - Add a flow-trace event to a dblk.
 *	str_ftfree() - Free flow-trace data.
 *
 * Local objects:
 *
 *	fthdr_cache - pointer to the kmem cache for trace headers.
 *	ftblk_cache - pointer to the kmem cache for trace data blocks.
 */

int str_ftnever = 1;	/* Nonzero: don't do STREAMS flow tracing */
3914 
3915 void
3916 str_ftevent(fthdr_t *hp, void *p, ushort_t evnt, ushort_t data)
3917 {
3918 	ftblk_t *bp = hp->tail;
3919 	ftblk_t *nbp;
3920 	ftevnt_t *ep;
3921 	int ix, nix;
3922 
3923 	ASSERT(hp != NULL);
3924 
3925 	for (;;) {
3926 		if ((ix = bp->ix) == FTBLK_EVNTS) {
3927 			/*
3928 			 * Tail doesn't have room, so need a new tail.
3929 			 *
3930 			 * To make this MT safe, first, allocate a new
3931 			 * ftblk, and initialize it.  To make life a
3932 			 * little easier, reserve the first slot (mostly
3933 			 * by making ix = 1).  When we are finished with
3934 			 * the initialization, CAS this pointer to the
3935 			 * tail.  If this succeeds, this is the new
3936 			 * "next" block.  Otherwise, another thread
3937 			 * got here first, so free the block and start
3938 			 * again.
3939 			 */
3940 			if (!(nbp = kmem_cache_alloc(ftblk_cache,
3941 			    KM_NOSLEEP))) {
3942 				/* no mem, so punt */
3943 				str_ftnever++;
3944 				/* free up all flow data? */
3945 				return;
3946 			}
3947 			nbp->nxt = NULL;
3948 			nbp->ix = 1;
3949 			/*
3950 			 * Just in case there is another thread about
3951 			 * to get the next index, we need to make sure
3952 			 * the value is there for it.
3953 			 */
3954 			membar_producer();
3955 			if (casptr(&hp->tail, bp, nbp) == bp) {
3956 				/* CAS was successful */
3957 				bp->nxt = nbp;
3958 				membar_producer();
3959 				bp = nbp;
3960 				ix = 0;
3961 				goto cas_good;
3962 			} else {
3963 				kmem_cache_free(ftblk_cache, nbp);
3964 				bp = hp->tail;
3965 				continue;
3966 			}
3967 		}
3968 		nix = ix + 1;
3969 		if (cas32((uint32_t *)&bp->ix, ix, nix) == ix) {
3970 		cas_good:
3971 			if (curthread != hp->thread) {
3972 				hp->thread = curthread;
3973 				evnt |= FTEV_CS;
3974 			}
3975 			if (CPU->cpu_seqid != hp->cpu_seqid) {
3976 				hp->cpu_seqid = CPU->cpu_seqid;
3977 				evnt |= FTEV_PS;
3978 			}
3979 			ep = &bp->ev[ix];
3980 			break;
3981 		}
3982 	}
3983 
3984 	if (evnt & FTEV_QMASK) {
3985 		queue_t *qp = p;
3986 
3987 		/*
3988 		 * It is possible that the module info is broke
3989 		 * (as is logsubr.c at this comment writing).
3990 		 * Instead of panicing or doing other unmentionables,
3991 		 * we shall put a dummy name as the mid, and continue.
3992 		 */
3993 		if (qp->q_qinfo == NULL)
3994 			ep->mid = "NONAME";
3995 		else
3996 			ep->mid = qp->q_qinfo->qi_minfo->mi_idname;
3997 
3998 		if (!(qp->q_flag & QREADR))
3999 			evnt |= FTEV_ISWR;
4000 	} else {
4001 		ep->mid = (char *)p;
4002 	}
4003 
4004 	ep->ts = gethrtime();
4005 	ep->evnt = evnt;
4006 	ep->data = data;
4007 	hp->hash = (hp->hash << 9) + hp->hash;
4008 	hp->hash += (evnt << 16) | data;
4009 	hp->hash += (uintptr_t)ep->mid;
4010 }
4011 
4012 /*
4013  * Free flow-trace data.
4014  */
4015 void
4016 str_ftfree(dblk_t *dbp)
4017 {
4018 	fthdr_t *hp = dbp->db_fthdr;
4019 	ftblk_t *bp = &hp->first;
4020 	ftblk_t *nbp;
4021 
4022 	if (bp != hp->tail || bp->ix != 0) {
4023 		/*
4024 		 * Clear out the hash, have the tail point to itself, and free
4025 		 * any continuation blocks.
4026 		 */
4027 		bp = hp->first.nxt;
4028 		hp->tail = &hp->first;
4029 		hp->hash = 0;
4030 		hp->first.nxt = NULL;
4031 		hp->first.ix = 0;
4032 		while (bp != NULL) {
4033 			nbp = bp->nxt;
4034 			kmem_cache_free(ftblk_cache, bp);
4035 			bp = nbp;
4036 		}
4037 	}
4038 	kmem_cache_free(fthdr_cache, hp);
4039 	dbp->db_fthdr = NULL;
4040 }
4041