xref: /titanic_44/usr/src/uts/common/io/stream.c (revision 3289e1bdec732dbdf04517907516fd0d00f6a6f8)
1 /*
2  * CDDL HEADER START
3  *
4  * The contents of this file are subject to the terms of the
5  * Common Development and Distribution License (the "License").
6  * You may not use this file except in compliance with the License.
7  *
8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9  * or http://www.opensolaris.org/os/licensing.
10  * See the License for the specific language governing permissions
11  * and limitations under the License.
12  *
13  * When distributing Covered Code, include this CDDL HEADER in each
14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15  * If applicable, add the following below this CDDL HEADER, with the
16  * fields enclosed by brackets "[]" replaced with your own identifying
17  * information: Portions Copyright [yyyy] [name of copyright owner]
18  *
19  * CDDL HEADER END
20  */
21 /*	Copyright (c) 1984, 1986, 1987, 1988, 1989 AT&T	*/
22 /*	  All Rights Reserved  	*/
23 
24 
25 /*
26  * Copyright 2007 Sun Microsystems, Inc.  All rights reserved.
27  * Use is subject to license terms.
28  */
29 
30 #pragma ident	"%Z%%M%	%I%	%E% SMI"
31 
32 #include <sys/types.h>
33 #include <sys/param.h>
34 #include <sys/thread.h>
35 #include <sys/sysmacros.h>
36 #include <sys/stropts.h>
37 #include <sys/stream.h>
38 #include <sys/strsubr.h>
39 #include <sys/strsun.h>
40 #include <sys/conf.h>
41 #include <sys/debug.h>
42 #include <sys/cmn_err.h>
43 #include <sys/kmem.h>
44 #include <sys/atomic.h>
45 #include <sys/errno.h>
46 #include <sys/vtrace.h>
47 #include <sys/ftrace.h>
48 #include <sys/ontrap.h>
49 #include <sys/multidata.h>
50 #include <sys/multidata_impl.h>
51 #include <sys/sdt.h>
52 #include <sys/strft.h>
53 
54 #ifdef DEBUG
55 #include <sys/kmem_impl.h>
56 #endif
57 
58 /*
59  * This file contains all the STREAMS utility routines that may
60  * be used by modules and drivers.
61  */
62 
63 /*
64  * STREAMS message allocator: principles of operation
65  *
66  * The streams message allocator consists of all the routines that
67  * allocate, dup and free streams messages: allocb(), [d]esballoc[a],
68  * dupb(), freeb() and freemsg().  What follows is a high-level view
69  * of how the allocator works.
70  *
71  * Every streams message consists of one or more mblks, a dblk, and data.
72  * All mblks for all types of messages come from a common mblk_cache.
73  * The dblk and data come in several flavors, depending on how the
74  * message is allocated:
75  *
76  * (1) mblks up to DBLK_MAX_CACHE size are allocated from a collection of
77  *     fixed-size dblk/data caches. For message sizes that are multiples of
78  *     PAGESIZE, dblks are allocated separately from the buffer.
79  *     The associated buffer is allocated by the constructor using kmem_alloc().
80  *     For all other message sizes, dblk and its associated data is allocated
81  *     as a single contiguous chunk of memory.
82  *     Objects in these caches consist of a dblk plus its associated data.
83  *     allocb() determines the nearest-size cache by table lookup:
84  *     the dblk_cache[] array provides the mapping from size to dblk cache.
85  *
86  * (2) Large messages (size > DBLK_MAX_CACHE) are constructed by
87  *     kmem_alloc()'ing a buffer for the data and supplying that
88  *     buffer to gesballoc(), described below.
89  *
90  * (3) The four flavors of [d]esballoc[a] are all implemented by a
91  *     common routine, gesballoc() ("generic esballoc").  gesballoc()
92  *     allocates a dblk from the global dblk_esb_cache and sets db_base,
93  *     db_lim and db_frtnp to describe the caller-supplied buffer.
94  *
95  * While there are several routines to allocate messages, there is only
96  * one routine to free messages: freeb().  freeb() simply invokes the
97  * dblk's free method, dbp->db_free(), which is set at allocation time.
98  *
99  * dupb() creates a new reference to a message by allocating a new mblk,
100  * incrementing the dblk reference count and setting the dblk's free
101  * method to dblk_decref().  The dblk's original free method is retained
102  * in db_lastfree.  dblk_decref() decrements the reference count on each
103  * freeb().  If this is not the last reference it just frees the mblk;
104  * if this *is* the last reference, it restores db_free to db_lastfree,
105  * sets db_mblk to the current mblk (see below), and invokes db_lastfree.
106  *
107  * The implementation makes aggressive use of kmem object caching for
108  * maximum performance.  This makes the code simple and compact, but
109  * also a bit abstruse in some places.  The invariants that constitute a
110  * message's constructed state, described below, are more subtle than usual.
111  *
112  * Every dblk has an "attached mblk" as part of its constructed state.
113  * The mblk is allocated by the dblk's constructor and remains attached
114  * until the message is either dup'ed or pulled up.  In the dupb() case
115  * the mblk association doesn't matter until the last free, at which time
116  * dblk_decref() attaches the last mblk to the dblk.  pullupmsg() affects
117  * the mblk association because it swaps the leading mblks of two messages,
118  * so it is responsible for swapping their db_mblk pointers accordingly.
119  * From a constructed-state viewpoint it doesn't matter that a dblk's
120  * attached mblk can change while the message is allocated; all that
121  * matters is that the dblk has *some* attached mblk when it's freed.
122  *
123  * The sizes of the allocb() small-message caches are not magical.
124  * They represent a good trade-off between internal and external
125  * fragmentation for current workloads.  They should be reevaluated
126  * periodically, especially if allocations larger than DBLK_MAX_CACHE
127  * become common.  We use 64-byte alignment so that dblks don't
128  * straddle cache lines unnecessarily.
129  */
/*
 * Sizing parameters for the fixed-size dblk caches.  dblk_cache[] has one
 * slot per DBLK_MIN_SIZE bytes of request size up to DBLK_MAX_CACHE, and a
 * request of 'size' bytes maps to slot (size - 1) >> DBLK_SIZE_SHIFT.
 */
#define	DBLK_MAX_CACHE		73728
#define	DBLK_CACHE_ALIGN	64
#define	DBLK_MIN_SIZE		8
#define	DBLK_SIZE_SHIFT		3

/*
 * The db_ref, db_type, db_flags and db_struioflag fields of a dblk are
 * read and written together as the single 32-bit word that starts at
 * db_ref (see DBLK_RTFU_WORD).  DBLK_RTFU_SHIFT(field) yields the bit
 * position of 'field' inside that word: eight times its distance from
 * db_struioflag on big-endian machines, eight times its distance from
 * db_ref otherwise.
 */
#ifdef _BIG_ENDIAN
#define	DBLK_RTFU_SHIFT(field)	\
	(8 * (&((dblk_t *)0)->db_struioflag - &((dblk_t *)0)->field))
#else
#define	DBLK_RTFU_SHIFT(field)	\
	(8 * (&((dblk_t *)0)->field - &((dblk_t *)0)->db_ref))
#endif

/*
 * Build the packed ref/type/flags/uioflag word.  The initial reference
 * count minus one is OR'ed into the flags; dblk_decref() later extracts
 * it (masked with DBLK_REFMIN) as the minimum reference count.  Every
 * argument is parenthesized so that expressions can be passed safely.
 */
#define	DBLK_RTFU(ref, type, flags, uioflag)	\
	(((ref) << DBLK_RTFU_SHIFT(db_ref)) | \
	((type) << DBLK_RTFU_SHIFT(db_type)) | \
	(((flags) | ((ref) - 1)) << DBLK_RTFU_SHIFT(db_flags)) | \
	((uioflag) << DBLK_RTFU_SHIFT(db_struioflag)))
/* Mask covering the db_ref byte of the packed word. */
#define	DBLK_RTFU_REF_MASK	(DBLK_REFMAX << DBLK_RTFU_SHIFT(db_ref))
/* The packed word itself, addressed through db_ref. */
#define	DBLK_RTFU_WORD(dbp)	(*((uint32_t *)&(dbp)->db_ref))
/* The 32-bit word of an mblk that starts at b_band. */
#define	MBLK_BAND_FLAG_WORD(mp)	(*((uint32_t *)&(mp)->b_band))
151 
/*
 * Buffer sizes for which streams_msg_init() creates a dedicated dblk
 * cache.  The list is ascending and terminated by a zero entry; the last
 * real entry is DBLK_MAX_CACHE.  Separate tables are used for _LP64 and
 * non-_LP64 builds.
 */
static size_t dblk_sizes[] = {
#ifdef _LP64
	16, 80, 144, 208, 272, 336, 528, 1040, 1488, 1936, 2576, 3920,
	8192, 12112, 16384, 20304, 24576, 28496, 32768, 36688,
	40960, 44880, 49152, 53072, 57344, 61264, 65536, 69456,
#else
	64, 128, 320, 576, 1088, 1536, 1984, 2624, 3968,
	8192, 12160, 16384, 20352, 24576, 28544, 32768, 36736,
	40960, 44928, 49152, 53120, 57344, 61312, 65536, 69504,
#endif
	DBLK_MAX_CACHE, 0
};

/*
 * dblk_cache[] maps a request size to its cache via the index
 * (size - 1) >> DBLK_SIZE_SHIFT.  The remaining caches are created in
 * streams_msg_init().
 */
static struct kmem_cache *dblk_cache[DBLK_MAX_CACHE / DBLK_MIN_SIZE];
static struct kmem_cache *mblk_cache;
static struct kmem_cache *dblk_esb_cache;
static struct kmem_cache *fthdr_cache;
static struct kmem_cache *ftblk_cache;

static void dblk_lastfree(mblk_t *mp, dblk_t *dbp);
static mblk_t *allocb_oversize(size_t size, int flags);
/* Incremented each time allocb_tryhard() gives up. */
static int allocb_tryhard_fails;
static void frnop_func(void *arg);
/* Free-routine descriptor whose function does nothing. */
frtn_t frnop = { frnop_func };
static void bcache_dblk_lastfree(mblk_t *mp, dblk_t *dbp);

static boolean_t rwnext_enter(queue_t *qp);
static void rwnext_exit(queue_t *qp);

/*
 * Patchable mblk/dblk kmem_cache flags.
 */
int dblk_kmem_flags = 0;
int mblk_kmem_flags = 0;
186 
187 
188 static int
189 dblk_constructor(void *buf, void *cdrarg, int kmflags)
190 {
191 	dblk_t *dbp = buf;
192 	ssize_t msg_size = (ssize_t)cdrarg;
193 	size_t index;
194 
195 	ASSERT(msg_size != 0);
196 
197 	index = (msg_size - 1) >> DBLK_SIZE_SHIFT;
198 
199 	ASSERT(index < (DBLK_MAX_CACHE >> DBLK_SIZE_SHIFT));
200 
201 	if ((dbp->db_mblk = kmem_cache_alloc(mblk_cache, kmflags)) == NULL)
202 		return (-1);
203 	if ((msg_size & PAGEOFFSET) == 0) {
204 		dbp->db_base = kmem_alloc(msg_size, kmflags);
205 		if (dbp->db_base == NULL) {
206 			kmem_cache_free(mblk_cache, dbp->db_mblk);
207 			return (-1);
208 		}
209 	} else {
210 		dbp->db_base = (unsigned char *)&dbp[1];
211 	}
212 
213 	dbp->db_mblk->b_datap = dbp;
214 	dbp->db_cache = dblk_cache[index];
215 	dbp->db_lim = dbp->db_base + msg_size;
216 	dbp->db_free = dbp->db_lastfree = dblk_lastfree;
217 	dbp->db_frtnp = NULL;
218 	dbp->db_fthdr = NULL;
219 	dbp->db_credp = NULL;
220 	dbp->db_cpid = -1;
221 	dbp->db_struioflag = 0;
222 	dbp->db_struioun.cksum.flags = 0;
223 	return (0);
224 }
225 
226 /*ARGSUSED*/
227 static int
228 dblk_esb_constructor(void *buf, void *cdrarg, int kmflags)
229 {
230 	dblk_t *dbp = buf;
231 
232 	if ((dbp->db_mblk = kmem_cache_alloc(mblk_cache, kmflags)) == NULL)
233 		return (-1);
234 	dbp->db_mblk->b_datap = dbp;
235 	dbp->db_cache = dblk_esb_cache;
236 	dbp->db_fthdr = NULL;
237 	dbp->db_credp = NULL;
238 	dbp->db_cpid = -1;
239 	dbp->db_struioflag = 0;
240 	dbp->db_struioun.cksum.flags = 0;
241 	return (0);
242 }
243 
244 static int
245 bcache_dblk_constructor(void *buf, void *cdrarg, int kmflags)
246 {
247 	dblk_t *dbp = buf;
248 	bcache_t *bcp = (bcache_t *)cdrarg;
249 
250 	if ((dbp->db_mblk = kmem_cache_alloc(mblk_cache, kmflags)) == NULL)
251 		return (-1);
252 
253 	if ((dbp->db_base = (unsigned char *)kmem_cache_alloc(bcp->buffer_cache,
254 	    kmflags)) == NULL) {
255 		kmem_cache_free(mblk_cache, dbp->db_mblk);
256 		return (-1);
257 	}
258 
259 	dbp->db_mblk->b_datap = dbp;
260 	dbp->db_cache = (void *)bcp;
261 	dbp->db_lim = dbp->db_base + bcp->size;
262 	dbp->db_free = dbp->db_lastfree = bcache_dblk_lastfree;
263 	dbp->db_frtnp = NULL;
264 	dbp->db_fthdr = NULL;
265 	dbp->db_credp = NULL;
266 	dbp->db_cpid = -1;
267 	dbp->db_struioflag = 0;
268 	dbp->db_struioun.cksum.flags = 0;
269 	return (0);
270 }
271 
272 /*ARGSUSED*/
273 static void
274 dblk_destructor(void *buf, void *cdrarg)
275 {
276 	dblk_t *dbp = buf;
277 	ssize_t msg_size = (ssize_t)cdrarg;
278 
279 	ASSERT(dbp->db_mblk->b_datap == dbp);
280 
281 	ASSERT(msg_size != 0);
282 
283 	ASSERT(dbp->db_struioflag == 0);
284 	ASSERT(dbp->db_struioun.cksum.flags == 0);
285 
286 	if ((msg_size & PAGEOFFSET) == 0) {
287 		kmem_free(dbp->db_base, msg_size);
288 	}
289 
290 	kmem_cache_free(mblk_cache, dbp->db_mblk);
291 }
292 
293 static void
294 bcache_dblk_destructor(void *buf, void *cdrarg)
295 {
296 	dblk_t *dbp = buf;
297 	bcache_t *bcp = (bcache_t *)cdrarg;
298 
299 	kmem_cache_free(bcp->buffer_cache, dbp->db_base);
300 
301 	ASSERT(dbp->db_mblk->b_datap == dbp);
302 
303 	ASSERT(dbp->db_struioflag == 0);
304 	ASSERT(dbp->db_struioun.cksum.flags == 0);
305 
306 	kmem_cache_free(mblk_cache, dbp->db_mblk);
307 }
308 
309 void
310 streams_msg_init(void)
311 {
312 	char name[40];
313 	size_t size;
314 	size_t lastsize = DBLK_MIN_SIZE;
315 	size_t *sizep;
316 	struct kmem_cache *cp;
317 	size_t tot_size;
318 	int offset;
319 
320 	mblk_cache = kmem_cache_create("streams_mblk",
321 		sizeof (mblk_t), 32, NULL, NULL, NULL, NULL, NULL,
322 		mblk_kmem_flags);
323 
324 	for (sizep = dblk_sizes; (size = *sizep) != 0; sizep++) {
325 
326 		if ((offset = (size & PAGEOFFSET)) != 0) {
327 			/*
328 			 * We are in the middle of a page, dblk should
329 			 * be allocated on the same page
330 			 */
331 			tot_size = size + sizeof (dblk_t);
332 			ASSERT((offset + sizeof (dblk_t) + sizeof (kmem_slab_t))
333 								< PAGESIZE);
334 			ASSERT((tot_size & (DBLK_CACHE_ALIGN - 1)) == 0);
335 
336 		} else {
337 
338 			/*
339 			 * buf size is multiple of page size, dblk and
340 			 * buffer are allocated separately.
341 			 */
342 
343 			ASSERT((size & (DBLK_CACHE_ALIGN - 1)) == 0);
344 			tot_size = sizeof (dblk_t);
345 		}
346 
347 		(void) sprintf(name, "streams_dblk_%ld", size);
348 		cp = kmem_cache_create(name, tot_size,
349 			DBLK_CACHE_ALIGN, dblk_constructor,
350 			dblk_destructor, NULL,
351 			(void *)(size), NULL, dblk_kmem_flags);
352 
353 		while (lastsize <= size) {
354 			dblk_cache[(lastsize - 1) >> DBLK_SIZE_SHIFT] = cp;
355 			lastsize += DBLK_MIN_SIZE;
356 		}
357 	}
358 
359 	dblk_esb_cache = kmem_cache_create("streams_dblk_esb",
360 			sizeof (dblk_t), DBLK_CACHE_ALIGN,
361 			dblk_esb_constructor, dblk_destructor, NULL,
362 			(void *) sizeof (dblk_t), NULL, dblk_kmem_flags);
363 	fthdr_cache = kmem_cache_create("streams_fthdr",
364 		sizeof (fthdr_t), 32, NULL, NULL, NULL, NULL, NULL, 0);
365 	ftblk_cache = kmem_cache_create("streams_ftblk",
366 		sizeof (ftblk_t), 32, NULL, NULL, NULL, NULL, NULL, 0);
367 
368 	/* Initialize Multidata caches */
369 	mmd_init();
370 
371 	/* initialize throttling queue for esballoc */
372 	esballoc_queue_init();
373 }
374 
375 /*ARGSUSED*/
376 mblk_t *
377 allocb(size_t size, uint_t pri)
378 {
379 	dblk_t *dbp;
380 	mblk_t *mp;
381 	size_t index;
382 
383 	index =  (size - 1)  >> DBLK_SIZE_SHIFT;
384 
385 	if (index >= (DBLK_MAX_CACHE >> DBLK_SIZE_SHIFT)) {
386 		if (size != 0) {
387 			mp = allocb_oversize(size, KM_NOSLEEP);
388 			goto out;
389 		}
390 		index = 0;
391 	}
392 
393 	if ((dbp = kmem_cache_alloc(dblk_cache[index], KM_NOSLEEP)) == NULL) {
394 		mp = NULL;
395 		goto out;
396 	}
397 
398 	mp = dbp->db_mblk;
399 	DBLK_RTFU_WORD(dbp) = DBLK_RTFU(1, M_DATA, 0, 0);
400 	mp->b_next = mp->b_prev = mp->b_cont = NULL;
401 	mp->b_rptr = mp->b_wptr = dbp->db_base;
402 	mp->b_queue = NULL;
403 	MBLK_BAND_FLAG_WORD(mp) = 0;
404 	STR_FTALLOC(&dbp->db_fthdr, FTEV_ALLOCB, size);
405 out:
406 	FTRACE_1("allocb(): mp=0x%p", (uintptr_t)mp);
407 
408 	return (mp);
409 }
410 
411 mblk_t *
412 allocb_tmpl(size_t size, const mblk_t *tmpl)
413 {
414 	mblk_t *mp = allocb(size, 0);
415 
416 	if (mp != NULL) {
417 		cred_t *cr = DB_CRED(tmpl);
418 		if (cr != NULL)
419 			crhold(mp->b_datap->db_credp = cr);
420 		DB_CPID(mp) = DB_CPID(tmpl);
421 		DB_TYPE(mp) = DB_TYPE(tmpl);
422 	}
423 	return (mp);
424 }
425 
426 mblk_t *
427 allocb_cred(size_t size, cred_t *cr)
428 {
429 	mblk_t *mp = allocb(size, 0);
430 
431 	if (mp != NULL && cr != NULL)
432 		crhold(mp->b_datap->db_credp = cr);
433 
434 	return (mp);
435 }
436 
437 mblk_t *
438 allocb_cred_wait(size_t size, uint_t flags, int *error, cred_t *cr)
439 {
440 	mblk_t *mp = allocb_wait(size, 0, flags, error);
441 
442 	if (mp != NULL && cr != NULL)
443 		crhold(mp->b_datap->db_credp = cr);
444 
445 	return (mp);
446 }
447 
448 void
449 freeb(mblk_t *mp)
450 {
451 	dblk_t *dbp = mp->b_datap;
452 
453 	ASSERT(dbp->db_ref > 0);
454 	ASSERT(mp->b_next == NULL && mp->b_prev == NULL);
455 	FTRACE_1("freeb(): mp=0x%lx", (uintptr_t)mp);
456 
457 	STR_FTEVENT_MBLK(mp, caller(), FTEV_FREEB, dbp->db_ref);
458 
459 	dbp->db_free(mp, dbp);
460 }
461 
462 void
463 freemsg(mblk_t *mp)
464 {
465 	FTRACE_1("freemsg(): mp=0x%lx", (uintptr_t)mp);
466 	while (mp) {
467 		dblk_t *dbp = mp->b_datap;
468 		mblk_t *mp_cont = mp->b_cont;
469 
470 		ASSERT(dbp->db_ref > 0);
471 		ASSERT(mp->b_next == NULL && mp->b_prev == NULL);
472 
473 		STR_FTEVENT_MBLK(mp, caller(), FTEV_FREEB, dbp->db_ref);
474 
475 		dbp->db_free(mp, dbp);
476 		mp = mp_cont;
477 	}
478 }
479 
480 /*
481  * Reallocate a block for another use.  Try hard to use the old block.
482  * If the old data is wanted (copy), leave b_wptr at the end of the data,
483  * otherwise return b_wptr = b_rptr.
484  *
485  * This routine is private and unstable.
486  */
487 mblk_t	*
488 reallocb(mblk_t *mp, size_t size, uint_t copy)
489 {
490 	mblk_t		*mp1;
491 	unsigned char	*old_rptr;
492 	ptrdiff_t	cur_size;
493 
494 	if (mp == NULL)
495 		return (allocb(size, BPRI_HI));
496 
497 	cur_size = mp->b_wptr - mp->b_rptr;
498 	old_rptr = mp->b_rptr;
499 
500 	ASSERT(mp->b_datap->db_ref != 0);
501 
502 	if (mp->b_datap->db_ref == 1 && MBLKSIZE(mp) >= size) {
503 		/*
504 		 * If the data is wanted and it will fit where it is, no
505 		 * work is required.
506 		 */
507 		if (copy && mp->b_datap->db_lim - mp->b_rptr >= size)
508 			return (mp);
509 
510 		mp->b_wptr = mp->b_rptr = mp->b_datap->db_base;
511 		mp1 = mp;
512 	} else if ((mp1 = allocb_tmpl(size, mp)) != NULL) {
513 		/* XXX other mp state could be copied too, db_flags ... ? */
514 		mp1->b_cont = mp->b_cont;
515 	} else {
516 		return (NULL);
517 	}
518 
519 	if (copy) {
520 		bcopy(old_rptr, mp1->b_rptr, cur_size);
521 		mp1->b_wptr = mp1->b_rptr + cur_size;
522 	}
523 
524 	if (mp != mp1)
525 		freeb(mp);
526 
527 	return (mp1);
528 }
529 
530 static void
531 dblk_lastfree(mblk_t *mp, dblk_t *dbp)
532 {
533 	ASSERT(dbp->db_mblk == mp);
534 	if (dbp->db_fthdr != NULL)
535 		str_ftfree(dbp);
536 
537 	/* set credp and projid to be 'unspecified' before returning to cache */
538 	if (dbp->db_credp != NULL) {
539 		crfree(dbp->db_credp);
540 		dbp->db_credp = NULL;
541 	}
542 	dbp->db_cpid = -1;
543 
544 	/* Reset the struioflag and the checksum flag fields */
545 	dbp->db_struioflag = 0;
546 	dbp->db_struioun.cksum.flags = 0;
547 
548 	/* and the COOKED flag */
549 	dbp->db_flags &= ~DBLK_COOKED;
550 
551 	kmem_cache_free(dbp->db_cache, dbp);
552 }
553 
554 static void
555 dblk_decref(mblk_t *mp, dblk_t *dbp)
556 {
557 	if (dbp->db_ref != 1) {
558 		uint32_t rtfu = atomic_add_32_nv(&DBLK_RTFU_WORD(dbp),
559 		    -(1 << DBLK_RTFU_SHIFT(db_ref)));
560 		/*
561 		 * atomic_add_32_nv() just decremented db_ref, so we no longer
562 		 * have a reference to the dblk, which means another thread
563 		 * could free it.  Therefore we cannot examine the dblk to
564 		 * determine whether ours was the last reference.  Instead,
565 		 * we extract the new and minimum reference counts from rtfu.
566 		 * Note that all we're really saying is "if (ref != refmin)".
567 		 */
568 		if (((rtfu >> DBLK_RTFU_SHIFT(db_ref)) & DBLK_REFMAX) !=
569 		    ((rtfu >> DBLK_RTFU_SHIFT(db_flags)) & DBLK_REFMIN)) {
570 			kmem_cache_free(mblk_cache, mp);
571 			return;
572 		}
573 	}
574 	dbp->db_mblk = mp;
575 	dbp->db_free = dbp->db_lastfree;
576 	dbp->db_lastfree(mp, dbp);
577 }
578 
/*
 * Create a new mblk that references the same dblk as 'mp'.  The new mblk
 * copies b_rptr, b_wptr and the band/flag word from 'mp'; the dblk's
 * reference count is incremented with a compare-and-swap loop.  Returns
 * NULL if no mblk can be allocated or if the reference count is already
 * at its maximum.
 */
mblk_t *
dupb(mblk_t *mp)
{
	dblk_t *dbp = mp->b_datap;
	mblk_t *new_mp;
	uint32_t oldrtfu, newrtfu;

	if ((new_mp = kmem_cache_alloc(mblk_cache, KM_NOSLEEP)) == NULL)
		goto out;

	new_mp->b_next = new_mp->b_prev = new_mp->b_cont = NULL;
	new_mp->b_rptr = mp->b_rptr;
	new_mp->b_wptr = mp->b_wptr;
	new_mp->b_datap = dbp;
	new_mp->b_queue = NULL;
	MBLK_BAND_FLAG_WORD(new_mp) = MBLK_BAND_FLAG_WORD(mp);

	STR_FTEVENT_MBLK(mp, caller(), FTEV_DUPB, dbp->db_ref);

	/* From now on frees go through the reference-counting method. */
	dbp->db_free = dblk_decref;
	do {
		ASSERT(dbp->db_ref > 0);
		oldrtfu = DBLK_RTFU_WORD(dbp);
		/* Adding one at the db_ref bit position bumps the count. */
		newrtfu = oldrtfu + (1 << DBLK_RTFU_SHIFT(db_ref));
		/*
		 * If db_ref is maxed out we can't dup this message anymore.
		 */
		if ((oldrtfu & DBLK_RTFU_REF_MASK) == DBLK_RTFU_REF_MASK) {
			kmem_cache_free(mblk_cache, new_mp);
			new_mp = NULL;
			goto out;
		}
		/* Retry if the packed word changed underneath us. */
	} while (cas32(&DBLK_RTFU_WORD(dbp), oldrtfu, newrtfu) != oldrtfu);

out:
	FTRACE_1("dupb(): new_mp=0x%lx", (uintptr_t)new_mp);
	return (new_mp);
}
617 
618 static void
619 dblk_lastfree_desb(mblk_t *mp, dblk_t *dbp)
620 {
621 	frtn_t *frp = dbp->db_frtnp;
622 
623 	ASSERT(dbp->db_mblk == mp);
624 	frp->free_func(frp->free_arg);
625 	if (dbp->db_fthdr != NULL)
626 		str_ftfree(dbp);
627 
628 	/* set credp and projid to be 'unspecified' before returning to cache */
629 	if (dbp->db_credp != NULL) {
630 		crfree(dbp->db_credp);
631 		dbp->db_credp = NULL;
632 	}
633 	dbp->db_cpid = -1;
634 	dbp->db_struioflag = 0;
635 	dbp->db_struioun.cksum.flags = 0;
636 
637 	kmem_cache_free(dbp->db_cache, dbp);
638 }
639 
/*
 * Free routine that intentionally does nothing; it backs the 'frnop'
 * free-routine descriptor.
 */
/*ARGSUSED*/
static void
frnop_func(void *arg)
{
}
645 
646 /*
647  * Generic esballoc used to implement the four flavors: [d]esballoc[a].
648  */
649 static mblk_t *
650 gesballoc(unsigned char *base, size_t size, uint32_t db_rtfu, frtn_t *frp,
651 	void (*lastfree)(mblk_t *, dblk_t *), int kmflags)
652 {
653 	dblk_t *dbp;
654 	mblk_t *mp;
655 
656 	ASSERT(base != NULL && frp != NULL);
657 
658 	if ((dbp = kmem_cache_alloc(dblk_esb_cache, kmflags)) == NULL) {
659 		mp = NULL;
660 		goto out;
661 	}
662 
663 	mp = dbp->db_mblk;
664 	dbp->db_base = base;
665 	dbp->db_lim = base + size;
666 	dbp->db_free = dbp->db_lastfree = lastfree;
667 	dbp->db_frtnp = frp;
668 	DBLK_RTFU_WORD(dbp) = db_rtfu;
669 	mp->b_next = mp->b_prev = mp->b_cont = NULL;
670 	mp->b_rptr = mp->b_wptr = base;
671 	mp->b_queue = NULL;
672 	MBLK_BAND_FLAG_WORD(mp) = 0;
673 
674 out:
675 	FTRACE_1("gesballoc(): mp=0x%lx", (uintptr_t)mp);
676 	return (mp);
677 }
678 
679 /*ARGSUSED*/
680 mblk_t *
681 esballoc(unsigned char *base, size_t size, uint_t pri, frtn_t *frp)
682 {
683 	mblk_t *mp;
684 
685 	/*
686 	 * Note that this is structured to allow the common case (i.e.
687 	 * STREAMS flowtracing disabled) to call gesballoc() with tail
688 	 * call optimization.
689 	 */
690 	if (!str_ftnever) {
691 		mp = gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
692 		    frp, freebs_enqueue, KM_NOSLEEP);
693 
694 		if (mp != NULL)
695 			STR_FTALLOC(&DB_FTHDR(mp), FTEV_ESBALLOC, size);
696 		return (mp);
697 	}
698 
699 	return (gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
700 	    frp, freebs_enqueue, KM_NOSLEEP));
701 }
702 
703 /*
704  * Same as esballoc() but sleeps waiting for memory.
705  */
706 /*ARGSUSED*/
707 mblk_t *
708 esballoc_wait(unsigned char *base, size_t size, uint_t pri, frtn_t *frp)
709 {
710 	mblk_t *mp;
711 
712 	/*
713 	 * Note that this is structured to allow the common case (i.e.
714 	 * STREAMS flowtracing disabled) to call gesballoc() with tail
715 	 * call optimization.
716 	 */
717 	if (!str_ftnever) {
718 		mp = gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
719 		    frp, freebs_enqueue, KM_SLEEP);
720 
721 		STR_FTALLOC(&DB_FTHDR(mp), FTEV_ESBALLOC, size);
722 		return (mp);
723 	}
724 
725 	return (gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
726 	    frp, freebs_enqueue, KM_SLEEP));
727 }
728 
729 /*ARGSUSED*/
730 mblk_t *
731 desballoc(unsigned char *base, size_t size, uint_t pri, frtn_t *frp)
732 {
733 	mblk_t *mp;
734 
735 	/*
736 	 * Note that this is structured to allow the common case (i.e.
737 	 * STREAMS flowtracing disabled) to call gesballoc() with tail
738 	 * call optimization.
739 	 */
740 	if (!str_ftnever) {
741 		mp = gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
742 			frp, dblk_lastfree_desb, KM_NOSLEEP);
743 
744 		if (mp != NULL)
745 			STR_FTALLOC(&DB_FTHDR(mp), FTEV_DESBALLOC, size);
746 		return (mp);
747 	}
748 
749 	return (gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
750 	    frp, dblk_lastfree_desb, KM_NOSLEEP));
751 }
752 
753 /*ARGSUSED*/
754 mblk_t *
755 esballoca(unsigned char *base, size_t size, uint_t pri, frtn_t *frp)
756 {
757 	mblk_t *mp;
758 
759 	/*
760 	 * Note that this is structured to allow the common case (i.e.
761 	 * STREAMS flowtracing disabled) to call gesballoc() with tail
762 	 * call optimization.
763 	 */
764 	if (!str_ftnever) {
765 		mp = gesballoc(base, size, DBLK_RTFU(2, M_DATA, 0, 0),
766 		    frp, freebs_enqueue, KM_NOSLEEP);
767 
768 		if (mp != NULL)
769 			STR_FTALLOC(&DB_FTHDR(mp), FTEV_ESBALLOCA, size);
770 		return (mp);
771 	}
772 
773 	return (gesballoc(base, size, DBLK_RTFU(2, M_DATA, 0, 0),
774 	    frp, freebs_enqueue, KM_NOSLEEP));
775 }
776 
777 /*ARGSUSED*/
778 mblk_t *
779 desballoca(unsigned char *base, size_t size, uint_t pri, frtn_t *frp)
780 {
781 	mblk_t *mp;
782 
783 	/*
784 	 * Note that this is structured to allow the common case (i.e.
785 	 * STREAMS flowtracing disabled) to call gesballoc() with tail
786 	 * call optimization.
787 	 */
788 	if (!str_ftnever) {
789 		mp = gesballoc(base, size, DBLK_RTFU(2, M_DATA, 0, 0),
790 		    frp, dblk_lastfree_desb, KM_NOSLEEP);
791 
792 		if (mp != NULL)
793 			STR_FTALLOC(&DB_FTHDR(mp), FTEV_DESBALLOCA, size);
794 		return (mp);
795 	}
796 
797 	return (gesballoc(base, size, DBLK_RTFU(2, M_DATA, 0, 0),
798 	    frp, dblk_lastfree_desb, KM_NOSLEEP));
799 }
800 
801 static void
802 bcache_dblk_lastfree(mblk_t *mp, dblk_t *dbp)
803 {
804 	bcache_t *bcp = dbp->db_cache;
805 
806 	ASSERT(dbp->db_mblk == mp);
807 	if (dbp->db_fthdr != NULL)
808 		str_ftfree(dbp);
809 
810 	/* set credp and projid to be 'unspecified' before returning to cache */
811 	if (dbp->db_credp != NULL) {
812 		crfree(dbp->db_credp);
813 		dbp->db_credp = NULL;
814 	}
815 	dbp->db_cpid = -1;
816 	dbp->db_struioflag = 0;
817 	dbp->db_struioun.cksum.flags = 0;
818 
819 	mutex_enter(&bcp->mutex);
820 	kmem_cache_free(bcp->dblk_cache, dbp);
821 	bcp->alloc--;
822 
823 	if (bcp->alloc == 0 && bcp->destroy != 0) {
824 		kmem_cache_destroy(bcp->dblk_cache);
825 		kmem_cache_destroy(bcp->buffer_cache);
826 		mutex_exit(&bcp->mutex);
827 		mutex_destroy(&bcp->mutex);
828 		kmem_free(bcp, sizeof (bcache_t));
829 	} else {
830 		mutex_exit(&bcp->mutex);
831 	}
832 }
833 
834 bcache_t *
835 bcache_create(char *name, size_t size, uint_t align)
836 {
837 	bcache_t *bcp;
838 	char buffer[255];
839 
840 	ASSERT((align & (align - 1)) == 0);
841 
842 	if ((bcp = (bcache_t *)kmem_alloc(sizeof (bcache_t), KM_NOSLEEP)) ==
843 	    NULL) {
844 		return (NULL);
845 	}
846 
847 	bcp->size = size;
848 	bcp->align = align;
849 	bcp->alloc = 0;
850 	bcp->destroy = 0;
851 
852 	mutex_init(&bcp->mutex, NULL, MUTEX_DRIVER, NULL);
853 
854 	(void) sprintf(buffer, "%s_buffer_cache", name);
855 	bcp->buffer_cache = kmem_cache_create(buffer, size, align, NULL, NULL,
856 	    NULL, NULL, NULL, 0);
857 	(void) sprintf(buffer, "%s_dblk_cache", name);
858 	bcp->dblk_cache = kmem_cache_create(buffer, sizeof (dblk_t),
859 	    DBLK_CACHE_ALIGN, bcache_dblk_constructor, bcache_dblk_destructor,
860 						NULL, (void *)bcp, NULL, 0);
861 
862 	return (bcp);
863 }
864 
865 void
866 bcache_destroy(bcache_t *bcp)
867 {
868 	ASSERT(bcp != NULL);
869 
870 	mutex_enter(&bcp->mutex);
871 	if (bcp->alloc == 0) {
872 		kmem_cache_destroy(bcp->dblk_cache);
873 		kmem_cache_destroy(bcp->buffer_cache);
874 		mutex_exit(&bcp->mutex);
875 		mutex_destroy(&bcp->mutex);
876 		kmem_free(bcp, sizeof (bcache_t));
877 	} else {
878 		bcp->destroy++;
879 		mutex_exit(&bcp->mutex);
880 	}
881 }
882 
883 /*ARGSUSED*/
884 mblk_t *
885 bcache_allocb(bcache_t *bcp, uint_t pri)
886 {
887 	dblk_t *dbp;
888 	mblk_t *mp = NULL;
889 
890 	ASSERT(bcp != NULL);
891 
892 	mutex_enter(&bcp->mutex);
893 	if (bcp->destroy != 0) {
894 		mutex_exit(&bcp->mutex);
895 		goto out;
896 	}
897 
898 	if ((dbp = kmem_cache_alloc(bcp->dblk_cache, KM_NOSLEEP)) == NULL) {
899 		mutex_exit(&bcp->mutex);
900 		goto out;
901 	}
902 	bcp->alloc++;
903 	mutex_exit(&bcp->mutex);
904 
905 	ASSERT(((uintptr_t)(dbp->db_base) & (bcp->align - 1)) == 0);
906 
907 	mp = dbp->db_mblk;
908 	DBLK_RTFU_WORD(dbp) = DBLK_RTFU(1, M_DATA, 0, 0);
909 	mp->b_next = mp->b_prev = mp->b_cont = NULL;
910 	mp->b_rptr = mp->b_wptr = dbp->db_base;
911 	mp->b_queue = NULL;
912 	MBLK_BAND_FLAG_WORD(mp) = 0;
913 	STR_FTALLOC(&dbp->db_fthdr, FTEV_BCALLOCB, bcp->size);
914 out:
915 	FTRACE_1("bcache_allocb(): mp=0x%p", (uintptr_t)mp);
916 
917 	return (mp);
918 }
919 
920 static void
921 dblk_lastfree_oversize(mblk_t *mp, dblk_t *dbp)
922 {
923 	ASSERT(dbp->db_mblk == mp);
924 	if (dbp->db_fthdr != NULL)
925 		str_ftfree(dbp);
926 
927 	/* set credp and projid to be 'unspecified' before returning to cache */
928 	if (dbp->db_credp != NULL) {
929 		crfree(dbp->db_credp);
930 		dbp->db_credp = NULL;
931 	}
932 	dbp->db_cpid = -1;
933 	dbp->db_struioflag = 0;
934 	dbp->db_struioun.cksum.flags = 0;
935 
936 	kmem_free(dbp->db_base, dbp->db_lim - dbp->db_base);
937 	kmem_cache_free(dbp->db_cache, dbp);
938 }
939 
940 static mblk_t *
941 allocb_oversize(size_t size, int kmflags)
942 {
943 	mblk_t *mp;
944 	void *buf;
945 
946 	size = P2ROUNDUP(size, DBLK_CACHE_ALIGN);
947 	if ((buf = kmem_alloc(size, kmflags)) == NULL)
948 		return (NULL);
949 	if ((mp = gesballoc(buf, size, DBLK_RTFU(1, M_DATA, 0, 0),
950 	    &frnop, dblk_lastfree_oversize, kmflags)) == NULL)
951 		kmem_free(buf, size);
952 
953 	if (mp != NULL)
954 		STR_FTALLOC(&DB_FTHDR(mp), FTEV_ALLOCBIG, size);
955 
956 	return (mp);
957 }
958 
959 mblk_t *
960 allocb_tryhard(size_t target_size)
961 {
962 	size_t size;
963 	mblk_t *bp;
964 
965 	for (size = target_size; size < target_size + 512;
966 	    size += DBLK_CACHE_ALIGN)
967 		if ((bp = allocb(size, BPRI_HI)) != NULL)
968 			return (bp);
969 	allocb_tryhard_fails++;
970 	return (NULL);
971 }
972 
973 /*
974  * This routine is consolidation private for STREAMS internal use
975  * This routine may only be called from sync routines (i.e., not
976  * from put or service procedures).  It is located here (rather
977  * than strsubr.c) so that we don't have to expose all of the
978  * allocb() implementation details in header files.
979  */
980 mblk_t *
981 allocb_wait(size_t size, uint_t pri, uint_t flags, int *error)
982 {
983 	dblk_t *dbp;
984 	mblk_t *mp;
985 	size_t index;
986 
987 	index = (size -1) >> DBLK_SIZE_SHIFT;
988 
989 	if (flags & STR_NOSIG) {
990 		if (index >= (DBLK_MAX_CACHE >> DBLK_SIZE_SHIFT)) {
991 			if (size != 0) {
992 				mp = allocb_oversize(size, KM_SLEEP);
993 				FTRACE_1("allocb_wait (NOSIG): mp=0x%lx",
994 				    (uintptr_t)mp);
995 				return (mp);
996 			}
997 			index = 0;
998 		}
999 
1000 		dbp = kmem_cache_alloc(dblk_cache[index], KM_SLEEP);
1001 		mp = dbp->db_mblk;
1002 		DBLK_RTFU_WORD(dbp) = DBLK_RTFU(1, M_DATA, 0, 0);
1003 		mp->b_next = mp->b_prev = mp->b_cont = NULL;
1004 		mp->b_rptr = mp->b_wptr = dbp->db_base;
1005 		mp->b_queue = NULL;
1006 		MBLK_BAND_FLAG_WORD(mp) = 0;
1007 		STR_FTALLOC(&DB_FTHDR(mp), FTEV_ALLOCBW, size);
1008 
1009 		FTRACE_1("allocb_wait (NOSIG): mp=0x%lx", (uintptr_t)mp);
1010 
1011 	} else {
1012 		while ((mp = allocb(size, pri)) == NULL) {
1013 			if ((*error = strwaitbuf(size, BPRI_HI)) != 0)
1014 				return (NULL);
1015 		}
1016 	}
1017 
1018 	return (mp);
1019 }
1020 
1021 /*
1022  * Call function 'func' with 'arg' when a class zero block can
1023  * be allocated with priority 'pri'.
1024  */
1025 bufcall_id_t
1026 esbbcall(uint_t pri, void (*func)(void *), void *arg)
1027 {
1028 	return (bufcall(1, pri, func, arg));
1029 }
1030 
1031 /*
1032  * Allocates an iocblk (M_IOCTL) block. Properly sets the credentials
1033  * ioc_id, rval and error of the struct ioctl to set up an ioctl call.
1034  * This provides consistency for all internal allocators of ioctl.
1035  */
1036 mblk_t *
1037 mkiocb(uint_t cmd)
1038 {
1039 	struct iocblk	*ioc;
1040 	mblk_t		*mp;
1041 
1042 	/*
1043 	 * Allocate enough space for any of the ioctl related messages.
1044 	 */
1045 	if ((mp = allocb(sizeof (union ioctypes), BPRI_MED)) == NULL)
1046 		return (NULL);
1047 
1048 	bzero(mp->b_rptr, sizeof (union ioctypes));
1049 
1050 	/*
1051 	 * Set the mblk_t information and ptrs correctly.
1052 	 */
1053 	mp->b_wptr += sizeof (struct iocblk);
1054 	mp->b_datap->db_type = M_IOCTL;
1055 
1056 	/*
1057 	 * Fill in the fields.
1058 	 */
1059 	ioc		= (struct iocblk *)mp->b_rptr;
1060 	ioc->ioc_cmd	= cmd;
1061 	ioc->ioc_cr	= kcred;
1062 	ioc->ioc_id	= getiocseqno();
1063 	ioc->ioc_flag	= IOC_NATIVE;
1064 	return (mp);
1065 }
1066 
1067 /*
1068  * test if block of given size can be allocated with a request of
1069  * the given priority.
1070  * 'pri' is no longer used, but is retained for compatibility.
1071  */
1072 /* ARGSUSED */
1073 int
1074 testb(size_t size, uint_t pri)
1075 {
1076 	return ((size + sizeof (dblk_t)) <= kmem_avail());
1077 }
1078 
1079 /*
1080  * Call function 'func' with argument 'arg' when there is a reasonably
1081  * good chance that a block of size 'size' can be allocated.
1082  * 'pri' is no longer used, but is retained for compatibility.
1083  */
1084 /* ARGSUSED */
1085 bufcall_id_t
1086 bufcall(size_t size, uint_t pri, void (*func)(void *), void *arg)
1087 {
1088 	static long bid = 1;	/* always odd to save checking for zero */
1089 	bufcall_id_t bc_id;
1090 	struct strbufcall *bcp;
1091 
1092 	if ((bcp = kmem_alloc(sizeof (strbufcall_t), KM_NOSLEEP)) == NULL)
1093 		return (0);
1094 
1095 	bcp->bc_func = func;
1096 	bcp->bc_arg = arg;
1097 	bcp->bc_size = size;
1098 	bcp->bc_next = NULL;
1099 	bcp->bc_executor = NULL;
1100 
1101 	mutex_enter(&strbcall_lock);
1102 	/*
1103 	 * After bcp is linked into strbcalls and strbcall_lock is dropped there
1104 	 * should be no references to bcp since it may be freed by
1105 	 * runbufcalls(). Since bcp_id field is returned, we save its value in
1106 	 * the local var.
1107 	 */
1108 	bc_id = bcp->bc_id = (bufcall_id_t)(bid += 2);	/* keep it odd */
1109 
1110 	/*
1111 	 * add newly allocated stream event to existing
1112 	 * linked list of events.
1113 	 */
1114 	if (strbcalls.bc_head == NULL) {
1115 		strbcalls.bc_head = strbcalls.bc_tail = bcp;
1116 	} else {
1117 		strbcalls.bc_tail->bc_next = bcp;
1118 		strbcalls.bc_tail = bcp;
1119 	}
1120 
1121 	cv_signal(&strbcall_cv);
1122 	mutex_exit(&strbcall_lock);
1123 	return (bc_id);
1124 }
1125 
1126 /*
1127  * Cancel a bufcall request.
1128  */
1129 void
1130 unbufcall(bufcall_id_t id)
1131 {
1132 	strbufcall_t *bcp, *pbcp;
1133 
1134 	mutex_enter(&strbcall_lock);
1135 again:
1136 	pbcp = NULL;
1137 	for (bcp = strbcalls.bc_head; bcp; bcp = bcp->bc_next) {
1138 		if (id == bcp->bc_id)
1139 			break;
1140 		pbcp = bcp;
1141 	}
1142 	if (bcp) {
1143 		if (bcp->bc_executor != NULL) {
1144 			if (bcp->bc_executor != curthread) {
1145 				cv_wait(&bcall_cv, &strbcall_lock);
1146 				goto again;
1147 			}
1148 		} else {
1149 			if (pbcp)
1150 				pbcp->bc_next = bcp->bc_next;
1151 			else
1152 				strbcalls.bc_head = bcp->bc_next;
1153 			if (bcp == strbcalls.bc_tail)
1154 				strbcalls.bc_tail = pbcp;
1155 			kmem_free(bcp, sizeof (strbufcall_t));
1156 		}
1157 	}
1158 	mutex_exit(&strbcall_lock);
1159 }
1160 
1161 /*
1162  * Duplicate a message block by block (uses dupb), returning
1163  * a pointer to the duplicate message.
1164  * Returns a non-NULL value only if the entire message
1165  * was dup'd.
1166  */
1167 mblk_t *
1168 dupmsg(mblk_t *bp)
1169 {
1170 	mblk_t *head, *nbp;
1171 
1172 	if (!bp || !(nbp = head = dupb(bp)))
1173 		return (NULL);
1174 
1175 	while (bp->b_cont) {
1176 		if (!(nbp->b_cont = dupb(bp->b_cont))) {
1177 			freemsg(head);
1178 			return (NULL);
1179 		}
1180 		nbp = nbp->b_cont;
1181 		bp = bp->b_cont;
1182 	}
1183 	return (head);
1184 }
1185 
1186 #define	DUPB_NOLOAN(bp) \
1187 	((((bp)->b_datap->db_struioflag & STRUIO_ZC) != 0) ? \
1188 	copyb((bp)) : dupb((bp)))
1189 
1190 mblk_t *
1191 dupmsg_noloan(mblk_t *bp)
1192 {
1193 	mblk_t *head, *nbp;
1194 
1195 	if (bp == NULL || DB_TYPE(bp) != M_DATA ||
1196 	    ((nbp = head = DUPB_NOLOAN(bp)) == NULL))
1197 		return (NULL);
1198 
1199 	while (bp->b_cont) {
1200 		if ((nbp->b_cont = DUPB_NOLOAN(bp->b_cont)) == NULL) {
1201 			freemsg(head);
1202 			return (NULL);
1203 		}
1204 		nbp = nbp->b_cont;
1205 		bp = bp->b_cont;
1206 	}
1207 	return (head);
1208 }
1209 
1210 /*
1211  * Copy data from message and data block to newly allocated message and
1212  * data block. Returns new message block pointer, or NULL if error.
1213  * The alignment of rptr (w.r.t. word alignment) will be the same in the copy
1214  * as in the original even when db_base is not word aligned. (bug 1052877)
1215  */
1216 mblk_t *
1217 copyb(mblk_t *bp)
1218 {
1219 	mblk_t	*nbp;
1220 	dblk_t	*dp, *ndp;
1221 	uchar_t *base;
1222 	size_t	size;
1223 	size_t	unaligned;
1224 
1225 	ASSERT(bp->b_wptr >= bp->b_rptr);
1226 
1227 	dp = bp->b_datap;
1228 	if (dp->db_fthdr != NULL)
1229 		STR_FTEVENT_MBLK(bp, caller(), FTEV_COPYB, 0);
1230 
1231 	/*
1232 	 * Special handling for Multidata message; this should be
1233 	 * removed once a copy-callback routine is made available.
1234 	 */
1235 	if (dp->db_type == M_MULTIDATA) {
1236 		cred_t *cr;
1237 
1238 		if ((nbp = mmd_copy(bp, KM_NOSLEEP)) == NULL)
1239 			return (NULL);
1240 
1241 		nbp->b_flag = bp->b_flag;
1242 		nbp->b_band = bp->b_band;
1243 		ndp = nbp->b_datap;
1244 
1245 		/* See comments below on potential issues. */
1246 		STR_FTEVENT_MBLK(nbp, caller(), FTEV_COPYB, 1);
1247 
1248 		ASSERT(ndp->db_type == dp->db_type);
1249 		cr = dp->db_credp;
1250 		if (cr != NULL)
1251 			crhold(ndp->db_credp = cr);
1252 		ndp->db_cpid = dp->db_cpid;
1253 		return (nbp);
1254 	}
1255 
1256 	size = dp->db_lim - dp->db_base;
1257 	unaligned = P2PHASE((uintptr_t)dp->db_base, sizeof (uint_t));
1258 	if ((nbp = allocb_tmpl(size + unaligned, bp)) == NULL)
1259 		return (NULL);
1260 	nbp->b_flag = bp->b_flag;
1261 	nbp->b_band = bp->b_band;
1262 	ndp = nbp->b_datap;
1263 
1264 	/*
1265 	 * Well, here is a potential issue.  If we are trying to
1266 	 * trace a flow, and we copy the message, we might lose
1267 	 * information about where this message might have been.
1268 	 * So we should inherit the FT data.  On the other hand,
1269 	 * a user might be interested only in alloc to free data.
1270 	 * So I guess the real answer is to provide a tunable.
1271 	 */
1272 	STR_FTEVENT_MBLK(nbp, caller(), FTEV_COPYB, 1);
1273 
1274 	base = ndp->db_base + unaligned;
1275 	bcopy(dp->db_base, ndp->db_base + unaligned, size);
1276 
1277 	nbp->b_rptr = base + (bp->b_rptr - dp->db_base);
1278 	nbp->b_wptr = nbp->b_rptr + MBLKL(bp);
1279 
1280 	return (nbp);
1281 }
1282 
1283 /*
1284  * Copy data from message to newly allocated message using new
1285  * data blocks.  Returns a pointer to the new message, or NULL if error.
1286  */
1287 mblk_t *
1288 copymsg(mblk_t *bp)
1289 {
1290 	mblk_t *head, *nbp;
1291 
1292 	if (!bp || !(nbp = head = copyb(bp)))
1293 		return (NULL);
1294 
1295 	while (bp->b_cont) {
1296 		if (!(nbp->b_cont = copyb(bp->b_cont))) {
1297 			freemsg(head);
1298 			return (NULL);
1299 		}
1300 		nbp = nbp->b_cont;
1301 		bp = bp->b_cont;
1302 	}
1303 	return (head);
1304 }
1305 
1306 /*
1307  * link a message block to tail of message
1308  */
1309 void
1310 linkb(mblk_t *mp, mblk_t *bp)
1311 {
1312 	ASSERT(mp && bp);
1313 
1314 	for (; mp->b_cont; mp = mp->b_cont)
1315 		;
1316 	mp->b_cont = bp;
1317 }
1318 
1319 /*
1320  * unlink a message block from head of message
1321  * return pointer to new message.
1322  * NULL if message becomes empty.
1323  */
1324 mblk_t *
1325 unlinkb(mblk_t *bp)
1326 {
1327 	mblk_t *bp1;
1328 
1329 	bp1 = bp->b_cont;
1330 	bp->b_cont = NULL;
1331 	return (bp1);
1332 }
1333 
1334 /*
1335  * remove a message block "bp" from message "mp"
1336  *
1337  * Return pointer to new message or NULL if no message remains.
1338  * Return -1 if bp is not found in message.
1339  */
1340 mblk_t *
1341 rmvb(mblk_t *mp, mblk_t *bp)
1342 {
1343 	mblk_t *tmp;
1344 	mblk_t *lastp = NULL;
1345 
1346 	ASSERT(mp && bp);
1347 	for (tmp = mp; tmp; tmp = tmp->b_cont) {
1348 		if (tmp == bp) {
1349 			if (lastp)
1350 				lastp->b_cont = tmp->b_cont;
1351 			else
1352 				mp = tmp->b_cont;
1353 			tmp->b_cont = NULL;
1354 			return (mp);
1355 		}
1356 		lastp = tmp;
1357 	}
1358 	return ((mblk_t *)-1);
1359 }
1360 
1361 /*
1362  * Concatenate and align first len bytes of common
1363  * message type.  Len == -1, means concat everything.
1364  * Returns 1 on success, 0 on failure
1365  * After the pullup, mp points to the pulled up data.
1366  */
1367 int
1368 pullupmsg(mblk_t *mp, ssize_t len)
1369 {
1370 	mblk_t *bp, *b_cont;
1371 	dblk_t *dbp;
1372 	ssize_t n;
1373 
1374 	ASSERT(mp->b_datap->db_ref > 0);
1375 	ASSERT(mp->b_next == NULL && mp->b_prev == NULL);
1376 
1377 	/*
1378 	 * We won't handle Multidata message, since it contains
1379 	 * metadata which this function has no knowledge of; we
1380 	 * assert on DEBUG, and return failure otherwise.
1381 	 */
1382 	ASSERT(mp->b_datap->db_type != M_MULTIDATA);
1383 	if (mp->b_datap->db_type == M_MULTIDATA)
1384 		return (0);
1385 
1386 	if (len == -1) {
1387 		if (mp->b_cont == NULL && str_aligned(mp->b_rptr))
1388 			return (1);
1389 		len = xmsgsize(mp);
1390 	} else {
1391 		ssize_t first_mblk_len = mp->b_wptr - mp->b_rptr;
1392 		ASSERT(first_mblk_len >= 0);
1393 		/*
1394 		 * If the length is less than that of the first mblk,
1395 		 * we want to pull up the message into an aligned mblk.
1396 		 * Though not part of the spec, some callers assume it.
1397 		 */
1398 		if (len <= first_mblk_len) {
1399 			if (str_aligned(mp->b_rptr))
1400 				return (1);
1401 			len = first_mblk_len;
1402 		} else if (xmsgsize(mp) < len)
1403 			return (0);
1404 	}
1405 
1406 	if ((bp = allocb_tmpl(len, mp)) == NULL)
1407 		return (0);
1408 
1409 	dbp = bp->b_datap;
1410 	*bp = *mp;		/* swap mblks so bp heads the old msg... */
1411 	mp->b_datap = dbp;	/* ... and mp heads the new message */
1412 	mp->b_datap->db_mblk = mp;
1413 	bp->b_datap->db_mblk = bp;
1414 	mp->b_rptr = mp->b_wptr = dbp->db_base;
1415 
1416 	do {
1417 		ASSERT(bp->b_datap->db_ref > 0);
1418 		ASSERT(bp->b_wptr >= bp->b_rptr);
1419 		n = MIN(bp->b_wptr - bp->b_rptr, len);
1420 		bcopy(bp->b_rptr, mp->b_wptr, (size_t)n);
1421 		mp->b_wptr += n;
1422 		bp->b_rptr += n;
1423 		len -= n;
1424 		if (bp->b_rptr != bp->b_wptr)
1425 			break;
1426 		b_cont = bp->b_cont;
1427 		freeb(bp);
1428 		bp = b_cont;
1429 	} while (len && bp);
1430 
1431 	mp->b_cont = bp;	/* tack on whatever wasn't pulled up */
1432 
1433 	return (1);
1434 }
1435 
1436 /*
1437  * Concatenate and align at least the first len bytes of common message
1438  * type.  Len == -1 means concatenate everything.  The original message is
1439  * unaltered.  Returns a pointer to a new message on success, otherwise
1440  * returns NULL.
1441  */
1442 mblk_t *
1443 msgpullup(mblk_t *mp, ssize_t len)
1444 {
1445 	mblk_t	*newmp;
1446 	ssize_t	totlen;
1447 	ssize_t	n;
1448 
1449 	/*
1450 	 * We won't handle Multidata message, since it contains
1451 	 * metadata which this function has no knowledge of; we
1452 	 * assert on DEBUG, and return failure otherwise.
1453 	 */
1454 	ASSERT(mp->b_datap->db_type != M_MULTIDATA);
1455 	if (mp->b_datap->db_type == M_MULTIDATA)
1456 		return (NULL);
1457 
1458 	totlen = xmsgsize(mp);
1459 
1460 	if ((len > 0) && (len > totlen))
1461 		return (NULL);
1462 
1463 	/*
1464 	 * Copy all of the first msg type into one new mblk, then dupmsg
1465 	 * and link the rest onto this.
1466 	 */
1467 
1468 	len = totlen;
1469 
1470 	if ((newmp = allocb_tmpl(len, mp)) == NULL)
1471 		return (NULL);
1472 
1473 	newmp->b_flag = mp->b_flag;
1474 	newmp->b_band = mp->b_band;
1475 
1476 	while (len > 0) {
1477 		n = mp->b_wptr - mp->b_rptr;
1478 		ASSERT(n >= 0);		/* allow zero-length mblk_t's */
1479 		if (n > 0)
1480 			bcopy(mp->b_rptr, newmp->b_wptr, n);
1481 		newmp->b_wptr += n;
1482 		len -= n;
1483 		mp = mp->b_cont;
1484 	}
1485 
1486 	if (mp != NULL) {
1487 		newmp->b_cont = dupmsg(mp);
1488 		if (newmp->b_cont == NULL) {
1489 			freemsg(newmp);
1490 			return (NULL);
1491 		}
1492 	}
1493 
1494 	return (newmp);
1495 }
1496 
1497 /*
1498  * Trim bytes from message
1499  *  len > 0, trim from head
1500  *  len < 0, trim from tail
1501  * Returns 1 on success, 0 on failure.
1502  */
1503 int
1504 adjmsg(mblk_t *mp, ssize_t len)
1505 {
1506 	mblk_t *bp;
1507 	mblk_t *save_bp = NULL;
1508 	mblk_t *prev_bp;
1509 	mblk_t *bcont;
1510 	unsigned char type;
1511 	ssize_t n;
1512 	int fromhead;
1513 	int first;
1514 
1515 	ASSERT(mp != NULL);
1516 	/*
1517 	 * We won't handle Multidata message, since it contains
1518 	 * metadata which this function has no knowledge of; we
1519 	 * assert on DEBUG, and return failure otherwise.
1520 	 */
1521 	ASSERT(mp->b_datap->db_type != M_MULTIDATA);
1522 	if (mp->b_datap->db_type == M_MULTIDATA)
1523 		return (0);
1524 
1525 	if (len < 0) {
1526 		fromhead = 0;
1527 		len = -len;
1528 	} else {
1529 		fromhead = 1;
1530 	}
1531 
1532 	if (xmsgsize(mp) < len)
1533 		return (0);
1534 
1535 
1536 	if (fromhead) {
1537 		first = 1;
1538 		while (len) {
1539 			ASSERT(mp->b_wptr >= mp->b_rptr);
1540 			n = MIN(mp->b_wptr - mp->b_rptr, len);
1541 			mp->b_rptr += n;
1542 			len -= n;
1543 
1544 			/*
1545 			 * If this is not the first zero length
1546 			 * message remove it
1547 			 */
1548 			if (!first && (mp->b_wptr == mp->b_rptr)) {
1549 				bcont = mp->b_cont;
1550 				freeb(mp);
1551 				mp = save_bp->b_cont = bcont;
1552 			} else {
1553 				save_bp = mp;
1554 				mp = mp->b_cont;
1555 			}
1556 			first = 0;
1557 		}
1558 	} else {
1559 		type = mp->b_datap->db_type;
1560 		while (len) {
1561 			bp = mp;
1562 			save_bp = NULL;
1563 
1564 			/*
1565 			 * Find the last message of same type
1566 			 */
1567 
1568 			while (bp && bp->b_datap->db_type == type) {
1569 				ASSERT(bp->b_wptr >= bp->b_rptr);
1570 				prev_bp = save_bp;
1571 				save_bp = bp;
1572 				bp = bp->b_cont;
1573 			}
1574 			if (save_bp == NULL)
1575 				break;
1576 			n = MIN(save_bp->b_wptr - save_bp->b_rptr, len);
1577 			save_bp->b_wptr -= n;
1578 			len -= n;
1579 
1580 			/*
1581 			 * If this is not the first message
1582 			 * and we have taken away everything
1583 			 * from this message, remove it
1584 			 */
1585 
1586 			if ((save_bp != mp) &&
1587 				(save_bp->b_wptr == save_bp->b_rptr)) {
1588 				bcont = save_bp->b_cont;
1589 				freeb(save_bp);
1590 				prev_bp->b_cont = bcont;
1591 			}
1592 		}
1593 	}
1594 	return (1);
1595 }
1596 
1597 /*
1598  * get number of data bytes in message
1599  */
1600 size_t
1601 msgdsize(mblk_t *bp)
1602 {
1603 	size_t count = 0;
1604 
1605 	for (; bp; bp = bp->b_cont)
1606 		if (bp->b_datap->db_type == M_DATA) {
1607 			ASSERT(bp->b_wptr >= bp->b_rptr);
1608 			count += bp->b_wptr - bp->b_rptr;
1609 		}
1610 	return (count);
1611 }
1612 
1613 /*
1614  * Get a message off head of queue
1615  *
1616  * If queue has no buffers then mark queue
1617  * with QWANTR. (queue wants to be read by
1618  * someone when data becomes available)
1619  *
1620  * If there is something to take off then do so.
1621  * If queue falls below hi water mark turn off QFULL
1622  * flag.  Decrement weighted count of queue.
1623  * Also turn off QWANTR because queue is being read.
1624  *
1625  * The queue count is maintained on a per-band basis.
1626  * Priority band 0 (normal messages) uses q_count,
1627  * q_lowat, etc.  Non-zero priority bands use the
1628  * fields in their respective qband structures
1629  * (qb_count, qb_lowat, etc.)  All messages appear
1630  * on the same list, linked via their b_next pointers.
1631  * q_first is the head of the list.  q_count does
1632  * not reflect the size of all the messages on the
1633  * queue.  It only reflects those messages in the
1634  * normal band of flow.  The one exception to this
1635  * deals with high priority messages.  They are in
1636  * their own conceptual "band", but are accounted
1637  * against q_count.
1638  *
1639  * If queue count is below the lo water mark and QWANTW
1640  * is set, enable the closest backq which has a service
1641  * procedure and turn off the QWANTW flag.
1642  *
1643  * getq could be built on top of rmvq, but isn't because
1644  * of performance considerations.
1645  *
1646  * A note on the use of q_count and q_mblkcnt:
1647  *   q_count is the traditional byte count for messages that
1648  *   have been put on a queue.  Documentation tells us that
1649  *   we shouldn't rely on that count, but some drivers/modules
1650  *   do.  What was needed, however, is a mechanism to prevent
1651  *   runaway streams from consuming all of the resources,
1652  *   and particularly be able to flow control zero-length
1653  *   messages.  q_mblkcnt is used for this purpose.  It
1654  *   counts the number of mblk's that are being put on
1655  *   the queue.  The intention here, is that each mblk should
1656  *   contain one byte of data and, for the purpose of
1657  *   flow-control, logically does.  A queue will become
1658  *   full when EITHER of these values (q_count and q_mblkcnt)
1659  *   reach the highwater mark.  It will clear when BOTH
1660  *   of them drop below the highwater mark.  And it will
1661  *   backenable when BOTH of them drop below the lowwater
1662  *   mark.
1663  *   With this algorithm, a driver/module might be able
1664  *   to find a reasonably accurate q_count, and the
1665  *   framework can still try and limit resource usage.
1666  */
1667 mblk_t *
1668 getq(queue_t *q)
1669 {
1670 	mblk_t *bp;
1671 	uchar_t band = 0;
1672 
1673 	bp = getq_noenab(q);
1674 	if (bp != NULL)
1675 		band = bp->b_band;
1676 
1677 	/*
1678 	 * Inlined from qbackenable().
1679 	 * Quick check without holding the lock.
1680 	 */
1681 	if (band == 0 && (q->q_flag & (QWANTW|QWANTWSYNC)) == 0)
1682 		return (bp);
1683 
1684 	qbackenable(q, band);
1685 	return (bp);
1686 }
1687 
/*
 * Add the number of data bytes in the single message block 'mp' to
 * 'size', taking multidata messages into account: for an M_MULTIDATA
 * block the count is the third value reported by mmd_getsize(),
 * otherwise it is MBLKL(mp).
 *
 * The body is wrapped in do { } while (0) so that the macro behaves as
 * one statement: it must be followed by a semicolon and can safely be
 * used as the body of an unbraced if/else.
 */

#define	ADD_MBLK_SIZE(mp, size) 					\
	do {								\
		if (DB_TYPE(mp) != M_MULTIDATA) {			\
			(size) += MBLKL(mp);				\
		} else {						\
			uint_t	pinuse;					\
									\
			mmd_getsize(mmd_getmultidata(mp), NULL,		\
			    &pinuse);					\
			(size) += pinuse;				\
		}							\
		/* CONSTCOND */						\
	} while (0)
1702 
1703 /*
1704  * Like getq() but does not backenable.  This is used by the stream
1705  * head when a putback() is likely.  The caller must call qbackenable()
1706  * after it is done with accessing the queue.
1707  */
1708 mblk_t *
1709 getq_noenab(queue_t *q)
1710 {
1711 	mblk_t *bp;
1712 	mblk_t *tmp;
1713 	qband_t *qbp;
1714 	kthread_id_t freezer;
1715 	int	bytecnt = 0, mblkcnt = 0;
1716 
1717 	/* freezestr should allow its caller to call getq/putq */
1718 	freezer = STREAM(q)->sd_freezer;
1719 	if (freezer == curthread) {
1720 		ASSERT(frozenstr(q));
1721 		ASSERT(MUTEX_HELD(QLOCK(q)));
1722 	} else
1723 		mutex_enter(QLOCK(q));
1724 
1725 	if ((bp = q->q_first) == 0) {
1726 		q->q_flag |= QWANTR;
1727 	} else {
1728 		if ((q->q_first = bp->b_next) == NULL)
1729 			q->q_last = NULL;
1730 		else
1731 			q->q_first->b_prev = NULL;
1732 
1733 		/* Get message byte count for q_count accounting */
1734 		for (tmp = bp; tmp; tmp = tmp->b_cont) {
1735 			ADD_MBLK_SIZE(tmp, bytecnt);
1736 			mblkcnt++;
1737 		}
1738 
1739 		if (bp->b_band == 0) {
1740 			q->q_count -= bytecnt;
1741 			q->q_mblkcnt -= mblkcnt;
1742 			if (q->q_mblkcnt == 0 || ((q->q_count < q->q_hiwat) &&
1743 			    (q->q_mblkcnt < q->q_hiwat))) {
1744 				q->q_flag &= ~QFULL;
1745 			}
1746 		} else {
1747 			int i;
1748 
1749 			ASSERT(bp->b_band <= q->q_nband);
1750 			ASSERT(q->q_bandp != NULL);
1751 			ASSERT(MUTEX_HELD(QLOCK(q)));
1752 			qbp = q->q_bandp;
1753 			i = bp->b_band;
1754 			while (--i > 0)
1755 				qbp = qbp->qb_next;
1756 			if (qbp->qb_first == qbp->qb_last) {
1757 				qbp->qb_first = NULL;
1758 				qbp->qb_last = NULL;
1759 			} else {
1760 				qbp->qb_first = bp->b_next;
1761 			}
1762 			qbp->qb_count -= bytecnt;
1763 			qbp->qb_mblkcnt -= mblkcnt;
1764 			if (qbp->qb_mblkcnt == 0 ||
1765 			    ((qbp->qb_count < qbp->qb_hiwat) &&
1766 			    (qbp->qb_mblkcnt < qbp->qb_hiwat))) {
1767 				qbp->qb_flag &= ~QB_FULL;
1768 			}
1769 		}
1770 		q->q_flag &= ~QWANTR;
1771 		bp->b_next = NULL;
1772 		bp->b_prev = NULL;
1773 	}
1774 	if (freezer != curthread)
1775 		mutex_exit(QLOCK(q));
1776 
1777 	STR_FTEVENT_MSG(bp, q, FTEV_GETQ, NULL);
1778 
1779 	return (bp);
1780 }
1781 
1782 /*
1783  * Determine if a backenable is needed after removing a message in the
1784  * specified band.
1785  * NOTE: This routine assumes that something like getq_noenab() has been
1786  * already called.
1787  *
1788  * For the read side it is ok to hold sd_lock across calling this (and the
1789  * stream head often does).
1790  * But for the write side strwakeq might be invoked and it acquires sd_lock.
1791  */
1792 void
1793 qbackenable(queue_t *q, uchar_t band)
1794 {
1795 	int backenab = 0;
1796 	qband_t *qbp;
1797 	kthread_id_t freezer;
1798 
1799 	ASSERT(q);
1800 	ASSERT((q->q_flag & QREADR) || MUTEX_NOT_HELD(&STREAM(q)->sd_lock));
1801 
1802 	/*
1803 	 * Quick check without holding the lock.
1804 	 * OK since after getq() has lowered the q_count these flags
1805 	 * would not change unless either the qbackenable() is done by
1806 	 * another thread (which is ok) or the queue has gotten QFULL
1807 	 * in which case another backenable will take place when the queue
1808 	 * drops below q_lowat.
1809 	 */
1810 	if (band == 0 && (q->q_flag & (QWANTW|QWANTWSYNC)) == 0)
1811 		return;
1812 
1813 	/* freezestr should allow its caller to call getq/putq */
1814 	freezer = STREAM(q)->sd_freezer;
1815 	if (freezer == curthread) {
1816 		ASSERT(frozenstr(q));
1817 		ASSERT(MUTEX_HELD(QLOCK(q)));
1818 	} else
1819 		mutex_enter(QLOCK(q));
1820 
1821 	if (band == 0) {
1822 		if (q->q_lowat == 0 || (q->q_count < q->q_lowat &&
1823 		    q->q_mblkcnt < q->q_lowat)) {
1824 			backenab = q->q_flag & (QWANTW|QWANTWSYNC);
1825 		}
1826 	} else {
1827 		int i;
1828 
1829 		ASSERT((unsigned)band <= q->q_nband);
1830 		ASSERT(q->q_bandp != NULL);
1831 
1832 		qbp = q->q_bandp;
1833 		i = band;
1834 		while (--i > 0)
1835 			qbp = qbp->qb_next;
1836 
1837 		if (qbp->qb_lowat == 0 || (qbp->qb_count < qbp->qb_lowat &&
1838 		    qbp->qb_mblkcnt < qbp->qb_lowat)) {
1839 			backenab = qbp->qb_flag & QB_WANTW;
1840 		}
1841 	}
1842 
1843 	if (backenab == 0) {
1844 		if (freezer != curthread)
1845 			mutex_exit(QLOCK(q));
1846 		return;
1847 	}
1848 
1849 	/* Have to drop the lock across strwakeq and backenable */
1850 	if (backenab & QWANTWSYNC)
1851 		q->q_flag &= ~QWANTWSYNC;
1852 	if (backenab & (QWANTW|QB_WANTW)) {
1853 		if (band != 0)
1854 			qbp->qb_flag &= ~QB_WANTW;
1855 		else {
1856 			q->q_flag &= ~QWANTW;
1857 		}
1858 	}
1859 
1860 	if (freezer != curthread)
1861 		mutex_exit(QLOCK(q));
1862 
1863 	if (backenab & QWANTWSYNC)
1864 		strwakeq(q, QWANTWSYNC);
1865 	if (backenab & (QWANTW|QB_WANTW))
1866 		backenable(q, band);
1867 }
1868 
1869 /*
1870  * Remove a message from a queue.  The queue count and other
1871  * flow control parameters are adjusted and the back queue
1872  * enabled if necessary.
1873  *
1874  * rmvq can be called with the stream frozen, but other utility functions
1875  * holding QLOCK, and by streams modules without any locks/frozen.
1876  */
1877 void
1878 rmvq(queue_t *q, mblk_t *mp)
1879 {
1880 	ASSERT(mp != NULL);
1881 
1882 	rmvq_noenab(q, mp);
1883 	if (curthread != STREAM(q)->sd_freezer && MUTEX_HELD(QLOCK(q))) {
1884 		/*
1885 		 * qbackenable can handle a frozen stream but not a "random"
1886 		 * qlock being held. Drop lock across qbackenable.
1887 		 */
1888 		mutex_exit(QLOCK(q));
1889 		qbackenable(q, mp->b_band);
1890 		mutex_enter(QLOCK(q));
1891 	} else {
1892 		qbackenable(q, mp->b_band);
1893 	}
1894 }
1895 
1896 /*
1897  * Like rmvq() but without any backenabling.
1898  * This exists to handle SR_CONSOL_DATA in strrput().
1899  */
1900 void
1901 rmvq_noenab(queue_t *q, mblk_t *mp)
1902 {
1903 	mblk_t *tmp;
1904 	int i;
1905 	qband_t *qbp = NULL;
1906 	kthread_id_t freezer;
1907 	int	bytecnt = 0, mblkcnt = 0;
1908 
1909 	freezer = STREAM(q)->sd_freezer;
1910 	if (freezer == curthread) {
1911 		ASSERT(frozenstr(q));
1912 		ASSERT(MUTEX_HELD(QLOCK(q)));
1913 	} else if (MUTEX_HELD(QLOCK(q))) {
1914 		/* Don't drop lock on exit */
1915 		freezer = curthread;
1916 	} else
1917 		mutex_enter(QLOCK(q));
1918 
1919 	ASSERT(mp->b_band <= q->q_nband);
1920 	if (mp->b_band != 0) {		/* Adjust band pointers */
1921 		ASSERT(q->q_bandp != NULL);
1922 		qbp = q->q_bandp;
1923 		i = mp->b_band;
1924 		while (--i > 0)
1925 			qbp = qbp->qb_next;
1926 		if (mp == qbp->qb_first) {
1927 			if (mp->b_next && mp->b_band == mp->b_next->b_band)
1928 				qbp->qb_first = mp->b_next;
1929 			else
1930 				qbp->qb_first = NULL;
1931 		}
1932 		if (mp == qbp->qb_last) {
1933 			if (mp->b_prev && mp->b_band == mp->b_prev->b_band)
1934 				qbp->qb_last = mp->b_prev;
1935 			else
1936 				qbp->qb_last = NULL;
1937 		}
1938 	}
1939 
1940 	/*
1941 	 * Remove the message from the list.
1942 	 */
1943 	if (mp->b_prev)
1944 		mp->b_prev->b_next = mp->b_next;
1945 	else
1946 		q->q_first = mp->b_next;
1947 	if (mp->b_next)
1948 		mp->b_next->b_prev = mp->b_prev;
1949 	else
1950 		q->q_last = mp->b_prev;
1951 	mp->b_next = NULL;
1952 	mp->b_prev = NULL;
1953 
1954 	/* Get the size of the message for q_count accounting */
1955 	for (tmp = mp; tmp; tmp = tmp->b_cont) {
1956 		ADD_MBLK_SIZE(tmp, bytecnt);
1957 		mblkcnt++;
1958 	}
1959 
1960 	if (mp->b_band == 0) {		/* Perform q_count accounting */
1961 		q->q_count -= bytecnt;
1962 		q->q_mblkcnt -= mblkcnt;
1963 		if (q->q_mblkcnt == 0 || ((q->q_count < q->q_hiwat) &&
1964 		    (q->q_mblkcnt < q->q_hiwat))) {
1965 			q->q_flag &= ~QFULL;
1966 		}
1967 	} else {			/* Perform qb_count accounting */
1968 		qbp->qb_count -= bytecnt;
1969 		qbp->qb_mblkcnt -= mblkcnt;
1970 		if (qbp->qb_mblkcnt == 0 || ((qbp->qb_count < qbp->qb_hiwat) &&
1971 		    (qbp->qb_mblkcnt < qbp->qb_hiwat))) {
1972 			qbp->qb_flag &= ~QB_FULL;
1973 		}
1974 	}
1975 	if (freezer != curthread)
1976 		mutex_exit(QLOCK(q));
1977 
1978 	STR_FTEVENT_MSG(mp, q, FTEV_RMVQ, NULL);
1979 }
1980 
1981 /*
1982  * Empty a queue.
1983  * If flag is set, remove all messages.  Otherwise, remove
1984  * only non-control messages.  If queue falls below its low
1985  * water mark, and QWANTW is set, enable the nearest upstream
1986  * service procedure.
1987  *
1988  * Historical note: when merging the M_FLUSH code in strrput with this
1989  * code one difference was discovered. flushq did not have a check
1990  * for q_lowat == 0 in the backenabling test.
1991  *
1992  * pcproto_flag specifies whether or not a M_PCPROTO message should be flushed
1993  * if one exists on the queue.
1994  */
1995 void
1996 flushq_common(queue_t *q, int flag, int pcproto_flag)
1997 {
1998 	mblk_t *mp, *nmp;
1999 	qband_t *qbp;
2000 	int backenab = 0;
2001 	unsigned char bpri;
2002 	unsigned char	qbf[NBAND];	/* band flushing backenable flags */
2003 
2004 	if (q->q_first == NULL)
2005 		return;
2006 
2007 	mutex_enter(QLOCK(q));
2008 	mp = q->q_first;
2009 	q->q_first = NULL;
2010 	q->q_last = NULL;
2011 	q->q_count = 0;
2012 	q->q_mblkcnt = 0;
2013 	for (qbp = q->q_bandp; qbp; qbp = qbp->qb_next) {
2014 		qbp->qb_first = NULL;
2015 		qbp->qb_last = NULL;
2016 		qbp->qb_count = 0;
2017 		qbp->qb_mblkcnt = 0;
2018 		qbp->qb_flag &= ~QB_FULL;
2019 	}
2020 	q->q_flag &= ~QFULL;
2021 	mutex_exit(QLOCK(q));
2022 	while (mp) {
2023 		nmp = mp->b_next;
2024 		mp->b_next = mp->b_prev = NULL;
2025 
2026 		STR_FTEVENT_MBLK(mp, q, FTEV_FLUSHQ, NULL);
2027 
2028 		if (pcproto_flag && (mp->b_datap->db_type == M_PCPROTO))
2029 			(void) putq(q, mp);
2030 		else if (flag || datamsg(mp->b_datap->db_type))
2031 			freemsg(mp);
2032 		else
2033 			(void) putq(q, mp);
2034 		mp = nmp;
2035 	}
2036 	bpri = 1;
2037 	mutex_enter(QLOCK(q));
2038 	for (qbp = q->q_bandp; qbp; qbp = qbp->qb_next) {
2039 		if ((qbp->qb_flag & QB_WANTW) &&
2040 		    (((qbp->qb_count < qbp->qb_lowat) &&
2041 		    (qbp->qb_mblkcnt < qbp->qb_lowat)) ||
2042 		    qbp->qb_lowat == 0)) {
2043 			qbp->qb_flag &= ~QB_WANTW;
2044 			backenab = 1;
2045 			qbf[bpri] = 1;
2046 		} else
2047 			qbf[bpri] = 0;
2048 		bpri++;
2049 	}
2050 	ASSERT(bpri == (unsigned char)(q->q_nband + 1));
2051 	if ((q->q_flag & QWANTW) &&
2052 	    (((q->q_count < q->q_lowat) &&
2053 	    (q->q_mblkcnt < q->q_lowat)) || q->q_lowat == 0)) {
2054 		q->q_flag &= ~QWANTW;
2055 		backenab = 1;
2056 		qbf[0] = 1;
2057 	} else
2058 		qbf[0] = 0;
2059 
2060 	/*
2061 	 * If any band can now be written to, and there is a writer
2062 	 * for that band, then backenable the closest service procedure.
2063 	 */
2064 	if (backenab) {
2065 		mutex_exit(QLOCK(q));
2066 		for (bpri = q->q_nband; bpri != 0; bpri--)
2067 			if (qbf[bpri])
2068 				backenable(q, bpri);
2069 		if (qbf[0])
2070 			backenable(q, 0);
2071 	} else
2072 		mutex_exit(QLOCK(q));
2073 }
2074 
2075 /*
2076  * The real flushing takes place in flushq_common. This is done so that
2077  * a flag which specifies whether or not M_PCPROTO messages should be flushed
2078  * or not. Currently the only place that uses this flag is the stream head.
2079  */
2080 void
2081 flushq(queue_t *q, int flag)
2082 {
2083 	flushq_common(q, flag, 0);
2084 }
2085 
2086 /*
2087  * Flush the queue of messages of the given priority band.
2088  * There is some duplication of code between flushq and flushband.
2089  * This is because we want to optimize the code as much as possible.
2090  * The assumption is that there will be more messages in the normal
2091  * (priority 0) band than in any other.
2092  *
2093  * Historical note: when merging the M_FLUSH code in strrput with this
2094  * code one difference was discovered. flushband had an extra check for
2095  * did not have a check for (mp->b_datap->db_type < QPCTL) in the band 0
2096  * case. That check does not match the man page for flushband and was not
2097  * in the strrput flush code hence it was removed.
2098  */
2099 void
2100 flushband(queue_t *q, unsigned char pri, int flag)
2101 {
2102 	mblk_t *mp;
2103 	mblk_t *nmp;
2104 	mblk_t *last;
2105 	qband_t *qbp;
2106 	int band;
2107 
2108 	ASSERT((flag == FLUSHDATA) || (flag == FLUSHALL));
2109 	if (pri > q->q_nband) {
2110 		return;
2111 	}
2112 	mutex_enter(QLOCK(q));
2113 	if (pri == 0) {
2114 		mp = q->q_first;
2115 		q->q_first = NULL;
2116 		q->q_last = NULL;
2117 		q->q_count = 0;
2118 		q->q_mblkcnt = 0;
2119 		for (qbp = q->q_bandp; qbp; qbp = qbp->qb_next) {
2120 			qbp->qb_first = NULL;
2121 			qbp->qb_last = NULL;
2122 			qbp->qb_count = 0;
2123 			qbp->qb_mblkcnt = 0;
2124 			qbp->qb_flag &= ~QB_FULL;
2125 		}
2126 		q->q_flag &= ~QFULL;
2127 		mutex_exit(QLOCK(q));
2128 		while (mp) {
2129 			nmp = mp->b_next;
2130 			mp->b_next = mp->b_prev = NULL;
2131 			if ((mp->b_band == 0) &&
2132 				((flag == FLUSHALL) ||
2133 				datamsg(mp->b_datap->db_type)))
2134 				freemsg(mp);
2135 			else
2136 				(void) putq(q, mp);
2137 			mp = nmp;
2138 		}
2139 		mutex_enter(QLOCK(q));
2140 		if ((q->q_flag & QWANTW) &&
2141 		    (((q->q_count < q->q_lowat) &&
2142 		    (q->q_mblkcnt < q->q_lowat)) || q->q_lowat == 0)) {
2143 			q->q_flag &= ~QWANTW;
2144 			mutex_exit(QLOCK(q));
2145 
2146 			backenable(q, pri);
2147 		} else
2148 			mutex_exit(QLOCK(q));
2149 	} else {	/* pri != 0 */
2150 		boolean_t flushed = B_FALSE;
2151 		band = pri;
2152 
2153 		ASSERT(MUTEX_HELD(QLOCK(q)));
2154 		qbp = q->q_bandp;
2155 		while (--band > 0)
2156 			qbp = qbp->qb_next;
2157 		mp = qbp->qb_first;
2158 		if (mp == NULL) {
2159 			mutex_exit(QLOCK(q));
2160 			return;
2161 		}
2162 		last = qbp->qb_last->b_next;
2163 		/*
2164 		 * rmvq_noenab() and freemsg() are called for each mblk that
2165 		 * meets the criteria.  The loop is executed until the last
2166 		 * mblk has been processed.
2167 		 */
2168 		while (mp != last) {
2169 			ASSERT(mp->b_band == pri);
2170 			nmp = mp->b_next;
2171 			if (flag == FLUSHALL || datamsg(mp->b_datap->db_type)) {
2172 				rmvq_noenab(q, mp);
2173 				freemsg(mp);
2174 				flushed = B_TRUE;
2175 			}
2176 			mp = nmp;
2177 		}
2178 		mutex_exit(QLOCK(q));
2179 
2180 		/*
2181 		 * If any mblk(s) has been freed, we know that qbackenable()
2182 		 * will need to be called.
2183 		 */
2184 		if (flushed)
2185 			qbackenable(q, pri);
2186 	}
2187 }
2188 
2189 /*
2190  * Return 1 if the queue is not full.  If the queue is full, return
2191  * 0 (may not put message) and set QWANTW flag (caller wants to write
2192  * to the queue).
2193  */
2194 int
2195 canput(queue_t *q)
2196 {
2197 	TRACE_1(TR_FAC_STREAMS_FR, TR_CANPUT_IN, "canput:%p", q);
2198 
2199 	/* this is for loopback transports, they should not do a canput */
2200 	ASSERT(STRMATED(q->q_stream) || STREAM(q) == STREAM(q->q_nfsrv));
2201 
2202 	/* Find next forward module that has a service procedure */
2203 	q = q->q_nfsrv;
2204 
2205 	if (!(q->q_flag & QFULL)) {
2206 		TRACE_2(TR_FAC_STREAMS_FR, TR_CANPUT_OUT, "canput:%p %d", q, 1);
2207 		return (1);
2208 	}
2209 	mutex_enter(QLOCK(q));
2210 	if (q->q_flag & QFULL) {
2211 		q->q_flag |= QWANTW;
2212 		mutex_exit(QLOCK(q));
2213 		TRACE_2(TR_FAC_STREAMS_FR, TR_CANPUT_OUT, "canput:%p %d", q, 0);
2214 		return (0);
2215 	}
2216 	mutex_exit(QLOCK(q));
2217 	TRACE_2(TR_FAC_STREAMS_FR, TR_CANPUT_OUT, "canput:%p %d", q, 1);
2218 	return (1);
2219 }
2220 
2221 /*
2222  * This is the new canput for use with priority bands.  Return 1 if the
2223  * band is not full.  If the band is full, return 0 (may not put message)
2224  * and set QWANTW(QB_WANTW) flag for zero(non-zero) band (caller wants to
2225  * write to the queue).
2226  */
2227 int
2228 bcanput(queue_t *q, unsigned char pri)
2229 {
2230 	qband_t *qbp;
2231 
2232 	TRACE_2(TR_FAC_STREAMS_FR, TR_BCANPUT_IN, "bcanput:%p %p", q, pri);
2233 	if (!q)
2234 		return (0);
2235 
2236 	/* Find next forward module that has a service procedure */
2237 	q = q->q_nfsrv;
2238 
2239 	mutex_enter(QLOCK(q));
2240 	if (pri == 0) {
2241 		if (q->q_flag & QFULL) {
2242 			q->q_flag |= QWANTW;
2243 			mutex_exit(QLOCK(q));
2244 			TRACE_3(TR_FAC_STREAMS_FR, TR_BCANPUT_OUT,
2245 				"bcanput:%p %X %d", q, pri, 0);
2246 			return (0);
2247 		}
2248 	} else {	/* pri != 0 */
2249 		if (pri > q->q_nband) {
2250 			/*
2251 			 * No band exists yet, so return success.
2252 			 */
2253 			mutex_exit(QLOCK(q));
2254 			TRACE_3(TR_FAC_STREAMS_FR, TR_BCANPUT_OUT,
2255 				"bcanput:%p %X %d", q, pri, 1);
2256 			return (1);
2257 		}
2258 		qbp = q->q_bandp;
2259 		while (--pri)
2260 			qbp = qbp->qb_next;
2261 		if (qbp->qb_flag & QB_FULL) {
2262 			qbp->qb_flag |= QB_WANTW;
2263 			mutex_exit(QLOCK(q));
2264 			TRACE_3(TR_FAC_STREAMS_FR, TR_BCANPUT_OUT,
2265 				"bcanput:%p %X %d", q, pri, 0);
2266 			return (0);
2267 		}
2268 	}
2269 	mutex_exit(QLOCK(q));
2270 	TRACE_3(TR_FAC_STREAMS_FR, TR_BCANPUT_OUT,
2271 		"bcanput:%p %X %d", q, pri, 1);
2272 	return (1);
2273 }
2274 
/*
 * Put a message on a queue.
 *
 * Messages are enqueued on a priority basis.  The priority classes
 * are HIGH PRIORITY (type >= QPCTL), PRIORITY (type < QPCTL && band > 0),
 * and B_NORMAL (type < QPCTL && band == 0).
 *
 * Add appropriate weighted data block sizes to queue count.
 * If queue hits high water mark then set QFULL flag.
 *
 * The queue is enabled if the message is HIGH PRIORITY, or if
 * canenable(q) holds (presumably: QNOENB is not set, so putq is allowed
 * to enable the queue) and either the message is in a non-zero band or
 * the QWANTR flag is set (indicating that the service procedure is
 * ready to read the queue).  This implies that a service
 * procedure must NEVER put a high priority message back on its own
 * queue, as this would result in an infinite loop (!).
 *
 * QLOCK(q) is taken here unless the calling thread is the one that has
 * frozen the stream, in which case the lock must already be held and is
 * left held on return.
 *
 * Returns 1 on success, or 0 if a qband structure needed for the
 * message's band could not be allocated (the message is not queued).
 */
int
putq(queue_t *q, mblk_t *bp)
{
	mblk_t *tmp;
	qband_t *qbp = NULL;
	int mcls = (int)queclass(bp);
	kthread_id_t freezer;
	int	bytecnt = 0, mblkcnt = 0;

	/* Only lock if this thread is not the one holding the stream frozen */
	freezer = STREAM(q)->sd_freezer;
	if (freezer == curthread) {
		ASSERT(frozenstr(q));
		ASSERT(MUTEX_HELD(QLOCK(q)));
	} else
		mutex_enter(QLOCK(q));

	/*
	 * Make sanity checks and if qband structure is not yet
	 * allocated, do so.
	 */
	if (mcls == QPCTL) {
		if (bp->b_band != 0)
			bp->b_band = 0;		/* force to be correct */
	} else if (bp->b_band != 0) {
		int i;
		qband_t **qbpp;

		if (bp->b_band > q->q_nband) {

			/*
			 * The qband structure for this priority band is
			 * not on the queue yet, so we have to allocate
			 * one on the fly.  It would be wasteful to
			 * associate the qband structures with every
			 * queue when the queues are allocated.  This is
			 * because most queues will only need the normal
			 * band of flow which can be described entirely
			 * by the queue itself.
			 */
			qbpp = &q->q_bandp;
			/* Find the tail link of the existing band list */
			while (*qbpp)
				qbpp = &(*qbpp)->qb_next;
			/*
			 * Append bands, each inheriting the queue's water
			 * marks, until the message's band exists.
			 */
			while (bp->b_band > q->q_nband) {
				if ((*qbpp = allocband()) == NULL) {
					if (freezer != curthread)
						mutex_exit(QLOCK(q));
					return (0);
				}
				(*qbpp)->qb_hiwat = q->q_hiwat;
				(*qbpp)->qb_lowat = q->q_lowat;
				q->q_nband++;
				qbpp = &(*qbpp)->qb_next;
			}
		}
		ASSERT(MUTEX_HELD(QLOCK(q)));
		/* Bands are 1-based: step b_band - 1 links from q_bandp */
		qbp = q->q_bandp;
		i = bp->b_band;
		while (--i)
			qbp = qbp->qb_next;
	}

	/*
	 * If queue is empty, add the message and initialize the pointers.
	 * Otherwise, adjust message pointers and queue pointers based on
	 * the type of the message and where it belongs on the queue.  Some
	 * code is duplicated to minimize the number of conditionals and
	 * hopefully minimize the amount of time this routine takes.
	 */
	if (!q->q_first) {
		bp->b_next = NULL;
		bp->b_prev = NULL;
		q->q_first = bp;
		q->q_last = bp;
		if (qbp) {
			qbp->qb_first = bp;
			qbp->qb_last = bp;
		}
	} else if (!qbp) {	/* bp->b_band == 0 */

		/*
		 * If queue class of message is less than or equal to
		 * that of the last one on the queue, tack on to the end.
		 */
		tmp = q->q_last;
		if (mcls <= (int)queclass(tmp)) {
			bp->b_next = NULL;
			bp->b_prev = tmp;
			tmp->b_next = bp;
			q->q_last = bp;
		} else {
			/*
			 * Skip every message whose class is at least that
			 * of bp; the walk stops no later than q_last,
			 * whose class was just found to be lower.
			 */
			tmp = q->q_first;
			while ((int)queclass(tmp) >= mcls)
				tmp = tmp->b_next;

			/*
			 * Insert bp before tmp.
			 */
			bp->b_next = tmp;
			bp->b_prev = tmp->b_prev;
			if (tmp->b_prev)
				tmp->b_prev->b_next = bp;
			else
				q->q_first = bp;
			tmp->b_prev = bp;
		}
	} else {		/* bp->b_band != 0 */
		if (qbp->qb_first) {
			tmp = qbp->qb_last;

			/*
			 * Insert bp after the last message in this band.
			 */
			bp->b_next = tmp->b_next;
			if (tmp->b_next)
				tmp->b_next->b_prev = bp;
			else
				q->q_last = bp;
			bp->b_prev = tmp;
			tmp->b_next = bp;
		} else {
			/* First message for this band on a non-empty queue */
			tmp = q->q_last;
			if ((mcls < (int)queclass(tmp)) ||
			    (bp->b_band <= tmp->b_band)) {

				/*
				 * Tack bp on end of queue.
				 */
				bp->b_next = NULL;
				bp->b_prev = tmp;
				tmp->b_next = bp;
				q->q_last = bp;
			} else {
				/*
				 * Skip the high priority messages, then
				 * every message in a band at least as high
				 * as that of bp.
				 */
				tmp = q->q_first;
				while (tmp->b_datap->db_type >= QPCTL)
					tmp = tmp->b_next;
				while (tmp->b_band >= bp->b_band)
					tmp = tmp->b_next;

				/*
				 * Insert bp before tmp.
				 */
				bp->b_next = tmp;
				bp->b_prev = tmp->b_prev;
				if (tmp->b_prev)
					tmp->b_prev->b_next = bp;
				else
					q->q_first = bp;
				tmp->b_prev = bp;
			}
			qbp->qb_first = bp;
		}
		/* In either case bp is now the last message of its band */
		qbp->qb_last = bp;
	}

	/* Get message byte count for q_count accounting */
	for (tmp = bp; tmp; tmp = tmp->b_cont) {
		ADD_MBLK_SIZE(tmp, bytecnt);
		mblkcnt++;
	}

	/*
	 * Charge the message to its band, or to the queue itself for
	 * band 0, and mark it full once either the byte count or the
	 * mblk count reaches the high water mark.
	 */
	if (qbp) {
		qbp->qb_count += bytecnt;
		qbp->qb_mblkcnt += mblkcnt;
		if ((qbp->qb_count >= qbp->qb_hiwat) ||
		    (qbp->qb_mblkcnt >= qbp->qb_hiwat)) {
			qbp->qb_flag |= QB_FULL;
		}
	} else {
		q->q_count += bytecnt;
		q->q_mblkcnt += mblkcnt;
		if ((q->q_count >= q->q_hiwat) ||
		    (q->q_mblkcnt >= q->q_hiwat)) {
			q->q_flag |= QFULL;
		}
	}

	STR_FTEVENT_MSG(bp, q, FTEV_PUTQ, NULL);

	/* See the header comment for the enabling rules */
	if ((mcls > QNORM) ||
	    (canenable(q) && (q->q_flag & QWANTR || bp->b_band)))
		qenable_locked(q);
	ASSERT(MUTEX_HELD(QLOCK(q)));
	if (freezer != curthread)
		mutex_exit(QLOCK(q));

	return (1);
}
2479 
/*
 * Put stuff back at beginning of Q according to priority order.
 * See comment on putq above for details.
 *
 * A high priority message goes to the very front of the queue; any
 * other message goes in front of the messages already queued in its
 * own band.  The queue is enabled if the message is high priority, or
 * if canenable(q) holds and QWANTR is set.
 *
 * QLOCK(q) is taken here unless the calling thread is the one that has
 * frozen the stream, in which case the lock must already be held and is
 * left held on return.
 *
 * Returns 1 on success, or 0 if a qband structure needed for the
 * message's band could not be allocated (the message is not queued).
 */
int
putbq(queue_t *q, mblk_t *bp)
{
	mblk_t *tmp;
	qband_t *qbp = NULL;
	int mcls = (int)queclass(bp);
	kthread_id_t freezer;
	int	bytecnt = 0, mblkcnt = 0;

	ASSERT(q && bp);
	/* The message must not still be linked to another one */
	ASSERT(bp->b_next == NULL);
	/* Only lock if this thread is not the one holding the stream frozen */
	freezer = STREAM(q)->sd_freezer;
	if (freezer == curthread) {
		ASSERT(frozenstr(q));
		ASSERT(MUTEX_HELD(QLOCK(q)));
	} else
		mutex_enter(QLOCK(q));

	/*
	 * Make sanity checks and if qband structure is not yet
	 * allocated, do so.
	 */
	if (mcls == QPCTL) {
		if (bp->b_band != 0)
			bp->b_band = 0;		/* force to be correct */
	} else if (bp->b_band != 0) {
		int i;
		qband_t **qbpp;

		if (bp->b_band > q->q_nband) {
			/* Find the tail link of the existing band list */
			qbpp = &q->q_bandp;
			while (*qbpp)
				qbpp = &(*qbpp)->qb_next;
			/*
			 * Append bands, each inheriting the queue's water
			 * marks, until the message's band exists.
			 */
			while (bp->b_band > q->q_nband) {
				if ((*qbpp = allocband()) == NULL) {
					if (freezer != curthread)
						mutex_exit(QLOCK(q));
					return (0);
				}
				(*qbpp)->qb_hiwat = q->q_hiwat;
				(*qbpp)->qb_lowat = q->q_lowat;
				q->q_nband++;
				qbpp = &(*qbpp)->qb_next;
			}
		}
		/* Bands are 1-based: step b_band - 1 links from q_bandp */
		qbp = q->q_bandp;
		i = bp->b_band;
		while (--i)
			qbp = qbp->qb_next;
	}

	/*
	 * If queue is empty or if message is high priority,
	 * place on the front of the queue.
	 */
	tmp = q->q_first;
	if ((!tmp) || (mcls == QPCTL)) {
		bp->b_next = tmp;
		if (tmp)
			tmp->b_prev = bp;
		else
			q->q_last = bp;
		q->q_first = bp;
		bp->b_prev = NULL;
		/*
		 * qbp is only non-NULL here when the queue was empty; a
		 * high priority message has had its band forced to zero.
		 */
		if (qbp) {
			qbp->qb_first = bp;
			qbp->qb_last = bp;
		}
	} else if (qbp) {	/* bp->b_band != 0 */
		tmp = qbp->qb_first;
		if (tmp) {

			/*
			 * Insert bp before the first message in this band.
			 */
			bp->b_next = tmp;
			bp->b_prev = tmp->b_prev;
			if (tmp->b_prev)
				tmp->b_prev->b_next = bp;
			else
				q->q_first = bp;
			tmp->b_prev = bp;
		} else {
			/* First message for this band on a non-empty queue */
			tmp = q->q_last;
			if ((mcls < (int)queclass(tmp)) ||
			    (bp->b_band < tmp->b_band)) {

				/*
				 * Tack bp on end of queue.
				 */
				bp->b_next = NULL;
				bp->b_prev = tmp;
				tmp->b_next = bp;
				q->q_last = bp;
			} else {
				/*
				 * Skip the high priority messages, then
				 * every message in a strictly higher band.
				 */
				tmp = q->q_first;
				while (tmp->b_datap->db_type >= QPCTL)
					tmp = tmp->b_next;
				while (tmp->b_band > bp->b_band)
					tmp = tmp->b_next;

				/*
				 * Insert bp before tmp.
				 */
				bp->b_next = tmp;
				bp->b_prev = tmp->b_prev;
				if (tmp->b_prev)
					tmp->b_prev->b_next = bp;
				else
					q->q_first = bp;
				tmp->b_prev = bp;
			}
			qbp->qb_last = bp;
		}
		/* In either case bp is now the first message of its band */
		qbp->qb_first = bp;
	} else {		/* bp->b_band == 0 && !QPCTL */

		/*
		 * If the queue class or band is less than that of the last
		 * message on the queue, tack bp on the end of the queue.
		 */
		tmp = q->q_last;
		if ((mcls < (int)queclass(tmp)) || (bp->b_band < tmp->b_band)) {
			bp->b_next = NULL;
			bp->b_prev = tmp;
			tmp->b_next = bp;
			q->q_last = bp;
		} else {
			/*
			 * Skip the high priority messages, then every
			 * message in a non-zero band.
			 */
			tmp = q->q_first;
			while (tmp->b_datap->db_type >= QPCTL)
				tmp = tmp->b_next;
			while (tmp->b_band > bp->b_band)
				tmp = tmp->b_next;

			/*
			 * Insert bp before tmp.
			 */
			bp->b_next = tmp;
			bp->b_prev = tmp->b_prev;
			if (tmp->b_prev)
				tmp->b_prev->b_next = bp;
			else
				q->q_first = bp;
			tmp->b_prev = bp;
		}
	}

	/* Get message byte count for q_count accounting */
	for (tmp = bp; tmp; tmp = tmp->b_cont) {
		ADD_MBLK_SIZE(tmp, bytecnt);
		mblkcnt++;
	}
	/*
	 * Charge the message to its band, or to the queue itself for
	 * band 0, and mark it full once either the byte count or the
	 * mblk count reaches the high water mark.
	 */
	if (qbp) {
		qbp->qb_count += bytecnt;
		qbp->qb_mblkcnt += mblkcnt;
		if ((qbp->qb_count >= qbp->qb_hiwat) ||
		    (qbp->qb_mblkcnt >= qbp->qb_hiwat)) {
			qbp->qb_flag |= QB_FULL;
		}
	} else {
		q->q_count += bytecnt;
		q->q_mblkcnt += mblkcnt;
		if ((q->q_count >= q->q_hiwat) ||
		    (q->q_mblkcnt >= q->q_hiwat)) {
			q->q_flag |= QFULL;
		}
	}

	STR_FTEVENT_MSG(bp, q, FTEV_PUTBQ, NULL);

	/* Unlike putq(), a non-zero band alone does not enable the queue */
	if ((mcls > QNORM) || (canenable(q) && (q->q_flag & QWANTR)))
		qenable_locked(q);
	ASSERT(MUTEX_HELD(QLOCK(q)));
	if (freezer != curthread)
		mutex_exit(QLOCK(q));

	return (1);
}
2662 
/*
 * Insert a message before an existing message on the queue.  If the
 * existing message is NULL, the new message is placed on the end of
 * the queue.  The queue class of the new message is ignored.  However,
 * the priority band of the new message must adhere to the following
 * ordering:
 *
 *	emp->b_prev->b_band >= mp->b_band >= emp->b_band.
 *
 * All flow control parameters are updated.
 *
 * insq can be called with the stream frozen, by other utility functions
 * holding QLOCK, and by streams modules without any locks/frozen.
 *
 * Returns 1 on success.  Returns 0, leaving the queue unchanged, if the
 * insertion would violate the ordering above (a warning is logged) or
 * if a qband structure needed for the message's band could not be
 * allocated.
 */
int
insq(queue_t *q, mblk_t *emp, mblk_t *mp)
{
	mblk_t *tmp;
	qband_t *qbp = NULL;
	int mcls = (int)queclass(mp);
	kthread_id_t freezer;
	int	bytecnt = 0, mblkcnt = 0;

	/*
	 * Work out whether QLOCK has to be taken here.  From this point
	 * on, "freezer == curthread" means "the caller already holds
	 * QLOCK, so do not drop it on exit".
	 */
	freezer = STREAM(q)->sd_freezer;
	if (freezer == curthread) {
		ASSERT(frozenstr(q));
		ASSERT(MUTEX_HELD(QLOCK(q)));
	} else if (MUTEX_HELD(QLOCK(q))) {
		/* Don't drop lock on exit */
		freezer = curthread;
	} else
		mutex_enter(QLOCK(q));

	if (mcls == QPCTL) {
		if (mp->b_band != 0)
			mp->b_band = 0;		/* force to be correct */
		/* A high priority message may not follow an ordinary one */
		if (emp && emp->b_prev &&
		    (emp->b_prev->b_datap->db_type < QPCTL))
			goto badord;
	}
	if (emp) {
		/*
		 * Reject an ordinary message in a lower band than emp, or
		 * any message in a higher band than an ordinary message
		 * that would precede it.
		 */
		if (((mcls == QNORM) && (mp->b_band < emp->b_band)) ||
		    (emp->b_prev && (emp->b_prev->b_datap->db_type < QPCTL) &&
		    (emp->b_prev->b_band < mp->b_band))) {
			goto badord;
		}
	} else {
		/*
		 * Appending: reject an ordinary message in a higher band
		 * than the current last message.
		 */
		tmp = q->q_last;
		if (tmp && (mcls == QNORM) && (mp->b_band > tmp->b_band)) {
badord:
			cmn_err(CE_WARN,
			    "insq: attempt to insert message out of order "
			    "on q %p", (void *)q);
			if (freezer != curthread)
				mutex_exit(QLOCK(q));
			return (0);
		}
	}

	if (mp->b_band != 0) {
		int i;
		qband_t **qbpp;

		if (mp->b_band > q->q_nband) {
			/* Find the tail link of the existing band list */
			qbpp = &q->q_bandp;
			while (*qbpp)
				qbpp = &(*qbpp)->qb_next;
			/*
			 * Append bands, each inheriting the queue's water
			 * marks, until the message's band exists.
			 */
			while (mp->b_band > q->q_nband) {
				if ((*qbpp = allocband()) == NULL) {
					if (freezer != curthread)
						mutex_exit(QLOCK(q));
					return (0);
				}
				(*qbpp)->qb_hiwat = q->q_hiwat;
				(*qbpp)->qb_lowat = q->q_lowat;
				q->q_nband++;
				qbpp = &(*qbpp)->qb_next;
			}
		}
		/* Bands are 1-based: step b_band - 1 links from q_bandp */
		qbp = q->q_bandp;
		i = mp->b_band;
		while (--i)
			qbp = qbp->qb_next;
	}

	/* Link mp in front of emp, or at the tail when emp is NULL */
	if ((mp->b_next = emp) != NULL) {
		if ((mp->b_prev = emp->b_prev) != NULL)
			emp->b_prev->b_next = mp;
		else
			q->q_first = mp;
		emp->b_prev = mp;
	} else {
		if ((mp->b_prev = q->q_last) != NULL)
			q->q_last->b_next = mp;
		else
			q->q_first = mp;
		q->q_last = mp;
	}

	/* Get mblk and byte count for q_count accounting */
	for (tmp = mp; tmp; tmp = tmp->b_cont) {
		ADD_MBLK_SIZE(tmp, bytecnt);
		mblkcnt++;
	}

	if (qbp) {	/* adjust qband pointers and count */
		if (!qbp->qb_first) {
			qbp->qb_first = mp;
			qbp->qb_last = mp;
		} else {
			/*
			 * mp became the band's first message if nothing, or
			 * a message of another band, precedes it; otherwise
			 * it became the last one if nothing, or a message
			 * of another band, follows it.
			 */
			if (mp->b_prev == NULL || (mp->b_prev != NULL &&
			    (mp->b_prev->b_band != mp->b_band)))
				qbp->qb_first = mp;
			else if (mp->b_next == NULL || (mp->b_next != NULL &&
			    (mp->b_next->b_band != mp->b_band)))
				qbp->qb_last = mp;
		}
		qbp->qb_count += bytecnt;
		qbp->qb_mblkcnt += mblkcnt;
		if ((qbp->qb_count >= qbp->qb_hiwat) ||
		    (qbp->qb_mblkcnt >= qbp->qb_hiwat)) {
			qbp->qb_flag |= QB_FULL;
		}
	} else {
		q->q_count += bytecnt;
		q->q_mblkcnt += mblkcnt;
		if ((q->q_count >= q->q_hiwat) ||
		    (q->q_mblkcnt >= q->q_hiwat)) {
			q->q_flag |= QFULL;
		}
	}

	STR_FTEVENT_MSG(mp, q, FTEV_INSQ, NULL);

	/* The message class plays no part in the decision to enable */
	if (canenable(q) && (q->q_flag & QWANTR))
		qenable_locked(q);

	ASSERT(MUTEX_HELD(QLOCK(q)));
	if (freezer != curthread)
		mutex_exit(QLOCK(q));

	return (1);
}
2806 
2807 /*
2808  * Create and put a control message on queue.
2809  */
2810 int
2811 putctl(queue_t *q, int type)
2812 {
2813 	mblk_t *bp;
2814 
2815 	if ((datamsg(type) && (type != M_DELAY)) ||
2816 	    (bp = allocb_tryhard(0)) == NULL)
2817 		return (0);
2818 	bp->b_datap->db_type = (unsigned char) type;
2819 
2820 	put(q, bp);
2821 
2822 	return (1);
2823 }
2824 
2825 /*
2826  * Control message with a single-byte parameter
2827  */
2828 int
2829 putctl1(queue_t *q, int type, int param)
2830 {
2831 	mblk_t *bp;
2832 
2833 	if ((datamsg(type) && (type != M_DELAY)) ||
2834 	    (bp = allocb_tryhard(1)) == NULL)
2835 		return (0);
2836 	bp->b_datap->db_type = (unsigned char)type;
2837 	*bp->b_wptr++ = (unsigned char)param;
2838 
2839 	put(q, bp);
2840 
2841 	return (1);
2842 }
2843 
2844 int
2845 putnextctl1(queue_t *q, int type, int param)
2846 {
2847 	mblk_t *bp;
2848 
2849 	if ((datamsg(type) && (type != M_DELAY)) ||
2850 		((bp = allocb_tryhard(1)) == NULL))
2851 		return (0);
2852 
2853 	bp->b_datap->db_type = (unsigned char)type;
2854 	*bp->b_wptr++ = (unsigned char)param;
2855 
2856 	putnext(q, bp);
2857 
2858 	return (1);
2859 }
2860 
2861 int
2862 putnextctl(queue_t *q, int type)
2863 {
2864 	mblk_t *bp;
2865 
2866 	if ((datamsg(type) && (type != M_DELAY)) ||
2867 		((bp = allocb_tryhard(0)) == NULL))
2868 		return (0);
2869 	bp->b_datap->db_type = (unsigned char)type;
2870 
2871 	putnext(q, bp);
2872 
2873 	return (1);
2874 }
2875 
2876 /*
2877  * Return the queue upstream from this one
2878  */
2879 queue_t *
2880 backq(queue_t *q)
2881 {
2882 	q = _OTHERQ(q);
2883 	if (q->q_next) {
2884 		q = q->q_next;
2885 		return (_OTHERQ(q));
2886 	}
2887 	return (NULL);
2888 }
2889 
2890 /*
2891  * Send a block back up the queue in reverse from this
2892  * one (e.g. to respond to ioctls)
2893  */
2894 void
2895 qreply(queue_t *q, mblk_t *bp)
2896 {
2897 	ASSERT(q && bp);
2898 
2899 	putnext(_OTHERQ(q), bp);
2900 }
2901 
2902 /*
2903  * Streams Queue Scheduling
2904  *
2905  * Queues are enabled through qenable() when they have messages to
2906  * process.  They are serviced by queuerun(), which runs each enabled
2907  * queue's service procedure.  The call to queuerun() is processor
2908  * dependent - the general principle is that it be run whenever a queue
2909  * is enabled but before returning to user level.  For system calls,
2910  * the function runqueues() is called if their action causes a queue
2911  * to be enabled.  For device interrupts, queuerun() should be
2912  * called before returning from the last level of interrupt.  Beyond
2913  * this, no timing assumptions should be made about queue scheduling.
2914  */
2915 
2916 /*
2917  * Enable a queue: put it on list of those whose service procedures are
2918  * ready to run and set up the scheduling mechanism.
2919  * The broadcast is done outside the mutex -> to avoid the woken thread
2920  * from contending with the mutex. This is OK 'cos the queue has been
2921  * enqueued on the runlist and flagged safely at this point.
2922  */
2923 void
2924 qenable(queue_t *q)
2925 {
2926 	mutex_enter(QLOCK(q));
2927 	qenable_locked(q);
2928 	mutex_exit(QLOCK(q));
2929 }
2930 /*
2931  * Return number of messages on queue
2932  */
2933 int
2934 qsize(queue_t *qp)
2935 {
2936 	int count = 0;
2937 	mblk_t *mp;
2938 
2939 	mutex_enter(QLOCK(qp));
2940 	for (mp = qp->q_first; mp; mp = mp->b_next)
2941 		count++;
2942 	mutex_exit(QLOCK(qp));
2943 	return (count);
2944 }
2945 
2946 /*
2947  * noenable - set queue so that putq() will not enable it.
2948  * enableok - set queue so that putq() can enable it.
2949  */
2950 void
2951 noenable(queue_t *q)
2952 {
2953 	mutex_enter(QLOCK(q));
2954 	q->q_flag |= QNOENB;
2955 	mutex_exit(QLOCK(q));
2956 }
2957 
2958 void
2959 enableok(queue_t *q)
2960 {
2961 	mutex_enter(QLOCK(q));
2962 	q->q_flag &= ~QNOENB;
2963 	mutex_exit(QLOCK(q));
2964 }
2965 
2966 /*
2967  * Set queue fields.
2968  */
2969 int
2970 strqset(queue_t *q, qfields_t what, unsigned char pri, intptr_t val)
2971 {
2972 	qband_t *qbp = NULL;
2973 	queue_t	*wrq;
2974 	int error = 0;
2975 	kthread_id_t freezer;
2976 
2977 	freezer = STREAM(q)->sd_freezer;
2978 	if (freezer == curthread) {
2979 		ASSERT(frozenstr(q));
2980 		ASSERT(MUTEX_HELD(QLOCK(q)));
2981 	} else
2982 		mutex_enter(QLOCK(q));
2983 
2984 	if (what >= QBAD) {
2985 		error = EINVAL;
2986 		goto done;
2987 	}
2988 	if (pri != 0) {
2989 		int i;
2990 		qband_t **qbpp;
2991 
2992 		if (pri > q->q_nband) {
2993 			qbpp = &q->q_bandp;
2994 			while (*qbpp)
2995 				qbpp = &(*qbpp)->qb_next;
2996 			while (pri > q->q_nband) {
2997 				if ((*qbpp = allocband()) == NULL) {
2998 					error = EAGAIN;
2999 					goto done;
3000 				}
3001 				(*qbpp)->qb_hiwat = q->q_hiwat;
3002 				(*qbpp)->qb_lowat = q->q_lowat;
3003 				q->q_nband++;
3004 				qbpp = &(*qbpp)->qb_next;
3005 			}
3006 		}
3007 		qbp = q->q_bandp;
3008 		i = pri;
3009 		while (--i)
3010 			qbp = qbp->qb_next;
3011 	}
3012 	switch (what) {
3013 
3014 	case QHIWAT:
3015 		if (qbp)
3016 			qbp->qb_hiwat = (size_t)val;
3017 		else
3018 			q->q_hiwat = (size_t)val;
3019 		break;
3020 
3021 	case QLOWAT:
3022 		if (qbp)
3023 			qbp->qb_lowat = (size_t)val;
3024 		else
3025 			q->q_lowat = (size_t)val;
3026 		break;
3027 
3028 	case QMAXPSZ:
3029 		if (qbp)
3030 			error = EINVAL;
3031 		else
3032 			q->q_maxpsz = (ssize_t)val;
3033 
3034 		/*
3035 		 * Performance concern, strwrite looks at the module below
3036 		 * the stream head for the maxpsz each time it does a write
3037 		 * we now cache it at the stream head.  Check to see if this
3038 		 * queue is sitting directly below the stream head.
3039 		 */
3040 		wrq = STREAM(q)->sd_wrq;
3041 		if (q != wrq->q_next)
3042 			break;
3043 
3044 		/*
3045 		 * If the stream is not frozen drop the current QLOCK and
3046 		 * acquire the sd_wrq QLOCK which protects sd_qn_*
3047 		 */
3048 		if (freezer != curthread) {
3049 			mutex_exit(QLOCK(q));
3050 			mutex_enter(QLOCK(wrq));
3051 		}
3052 		ASSERT(MUTEX_HELD(QLOCK(wrq)));
3053 
3054 		if (strmsgsz != 0) {
3055 			if (val == INFPSZ)
3056 				val = strmsgsz;
3057 			else  {
3058 				if (STREAM(q)->sd_vnode->v_type == VFIFO)
3059 					val = MIN(PIPE_BUF, val);
3060 				else
3061 					val = MIN(strmsgsz, val);
3062 			}
3063 		}
3064 		STREAM(q)->sd_qn_maxpsz = val;
3065 		if (freezer != curthread) {
3066 			mutex_exit(QLOCK(wrq));
3067 			mutex_enter(QLOCK(q));
3068 		}
3069 		break;
3070 
3071 	case QMINPSZ:
3072 		if (qbp)
3073 			error = EINVAL;
3074 		else
3075 			q->q_minpsz = (ssize_t)val;
3076 
3077 		/*
3078 		 * Performance concern, strwrite looks at the module below
3079 		 * the stream head for the maxpsz each time it does a write
3080 		 * we now cache it at the stream head.  Check to see if this
3081 		 * queue is sitting directly below the stream head.
3082 		 */
3083 		wrq = STREAM(q)->sd_wrq;
3084 		if (q != wrq->q_next)
3085 			break;
3086 
3087 		/*
3088 		 * If the stream is not frozen drop the current QLOCK and
3089 		 * acquire the sd_wrq QLOCK which protects sd_qn_*
3090 		 */
3091 		if (freezer != curthread) {
3092 			mutex_exit(QLOCK(q));
3093 			mutex_enter(QLOCK(wrq));
3094 		}
3095 		STREAM(q)->sd_qn_minpsz = (ssize_t)val;
3096 
3097 		if (freezer != curthread) {
3098 			mutex_exit(QLOCK(wrq));
3099 			mutex_enter(QLOCK(q));
3100 		}
3101 		break;
3102 
3103 	case QSTRUIOT:
3104 		if (qbp)
3105 			error = EINVAL;
3106 		else
3107 			q->q_struiot = (ushort_t)val;
3108 		break;
3109 
3110 	case QCOUNT:
3111 	case QFIRST:
3112 	case QLAST:
3113 	case QFLAG:
3114 		error = EPERM;
3115 		break;
3116 
3117 	default:
3118 		error = EINVAL;
3119 		break;
3120 	}
3121 done:
3122 	if (freezer != curthread)
3123 		mutex_exit(QLOCK(q));
3124 	return (error);
3125 }
3126 
3127 /*
3128  * Get queue fields.
3129  */
3130 int
3131 strqget(queue_t *q, qfields_t what, unsigned char pri, void *valp)
3132 {
3133 	qband_t 	*qbp = NULL;
3134 	int 		error = 0;
3135 	kthread_id_t 	freezer;
3136 
3137 	freezer = STREAM(q)->sd_freezer;
3138 	if (freezer == curthread) {
3139 		ASSERT(frozenstr(q));
3140 		ASSERT(MUTEX_HELD(QLOCK(q)));
3141 	} else
3142 		mutex_enter(QLOCK(q));
3143 	if (what >= QBAD) {
3144 		error = EINVAL;
3145 		goto done;
3146 	}
3147 	if (pri != 0) {
3148 		int i;
3149 		qband_t **qbpp;
3150 
3151 		if (pri > q->q_nband) {
3152 			qbpp = &q->q_bandp;
3153 			while (*qbpp)
3154 				qbpp = &(*qbpp)->qb_next;
3155 			while (pri > q->q_nband) {
3156 				if ((*qbpp = allocband()) == NULL) {
3157 					error = EAGAIN;
3158 					goto done;
3159 				}
3160 				(*qbpp)->qb_hiwat = q->q_hiwat;
3161 				(*qbpp)->qb_lowat = q->q_lowat;
3162 				q->q_nband++;
3163 				qbpp = &(*qbpp)->qb_next;
3164 			}
3165 		}
3166 		qbp = q->q_bandp;
3167 		i = pri;
3168 		while (--i)
3169 			qbp = qbp->qb_next;
3170 	}
3171 	switch (what) {
3172 	case QHIWAT:
3173 		if (qbp)
3174 			*(size_t *)valp = qbp->qb_hiwat;
3175 		else
3176 			*(size_t *)valp = q->q_hiwat;
3177 		break;
3178 
3179 	case QLOWAT:
3180 		if (qbp)
3181 			*(size_t *)valp = qbp->qb_lowat;
3182 		else
3183 			*(size_t *)valp = q->q_lowat;
3184 		break;
3185 
3186 	case QMAXPSZ:
3187 		if (qbp)
3188 			error = EINVAL;
3189 		else
3190 			*(ssize_t *)valp = q->q_maxpsz;
3191 		break;
3192 
3193 	case QMINPSZ:
3194 		if (qbp)
3195 			error = EINVAL;
3196 		else
3197 			*(ssize_t *)valp = q->q_minpsz;
3198 		break;
3199 
3200 	case QCOUNT:
3201 		if (qbp)
3202 			*(size_t *)valp = qbp->qb_count;
3203 		else
3204 			*(size_t *)valp = q->q_count;
3205 		break;
3206 
3207 	case QFIRST:
3208 		if (qbp)
3209 			*(mblk_t **)valp = qbp->qb_first;
3210 		else
3211 			*(mblk_t **)valp = q->q_first;
3212 		break;
3213 
3214 	case QLAST:
3215 		if (qbp)
3216 			*(mblk_t **)valp = qbp->qb_last;
3217 		else
3218 			*(mblk_t **)valp = q->q_last;
3219 		break;
3220 
3221 	case QFLAG:
3222 		if (qbp)
3223 			*(uint_t *)valp = qbp->qb_flag;
3224 		else
3225 			*(uint_t *)valp = q->q_flag;
3226 		break;
3227 
3228 	case QSTRUIOT:
3229 		if (qbp)
3230 			error = EINVAL;
3231 		else
3232 			*(short *)valp = q->q_struiot;
3233 		break;
3234 
3235 	default:
3236 		error = EINVAL;
3237 		break;
3238 	}
3239 done:
3240 	if (freezer != curthread)
3241 		mutex_exit(QLOCK(q));
3242 	return (error);
3243 }
3244 
3245 /*
3246  * Function awakes all in cvwait/sigwait/pollwait, on one of:
3247  *	QWANTWSYNC or QWANTR or QWANTW,
3248  *
3249  * Note: for QWANTWSYNC/QWANTW and QWANTR, if no WSLEEPer or RSLEEPer then a
3250  *	 deferred wakeup will be done. Also if strpoll() in progress then a
3251  *	 deferred pollwakeup will be done.
3252  */
3253 void
3254 strwakeq(queue_t *q, int flag)
3255 {
3256 	stdata_t 	*stp = STREAM(q);
3257 	pollhead_t 	*pl;
3258 
3259 	mutex_enter(&stp->sd_lock);
3260 	pl = &stp->sd_pollist;
3261 	if (flag & QWANTWSYNC) {
3262 		ASSERT(!(q->q_flag & QREADR));
3263 		if (stp->sd_flag & WSLEEP) {
3264 			stp->sd_flag &= ~WSLEEP;
3265 			cv_broadcast(&stp->sd_wrq->q_wait);
3266 		} else {
3267 			stp->sd_wakeq |= WSLEEP;
3268 		}
3269 
3270 		mutex_exit(&stp->sd_lock);
3271 		pollwakeup(pl, POLLWRNORM);
3272 		mutex_enter(&stp->sd_lock);
3273 
3274 		if (stp->sd_sigflags & S_WRNORM)
3275 			strsendsig(stp->sd_siglist, S_WRNORM, 0, 0);
3276 	} else if (flag & QWANTR) {
3277 		if (stp->sd_flag & RSLEEP) {
3278 			stp->sd_flag &= ~RSLEEP;
3279 			cv_broadcast(&_RD(stp->sd_wrq)->q_wait);
3280 		} else {
3281 			stp->sd_wakeq |= RSLEEP;
3282 		}
3283 
3284 		mutex_exit(&stp->sd_lock);
3285 		pollwakeup(pl, POLLIN | POLLRDNORM);
3286 		mutex_enter(&stp->sd_lock);
3287 
3288 		{
3289 			int events = stp->sd_sigflags & (S_INPUT | S_RDNORM);
3290 
3291 			if (events)
3292 				strsendsig(stp->sd_siglist, events, 0, 0);
3293 		}
3294 	} else {
3295 		if (stp->sd_flag & WSLEEP) {
3296 			stp->sd_flag &= ~WSLEEP;
3297 			cv_broadcast(&stp->sd_wrq->q_wait);
3298 		}
3299 
3300 		mutex_exit(&stp->sd_lock);
3301 		pollwakeup(pl, POLLWRNORM);
3302 		mutex_enter(&stp->sd_lock);
3303 
3304 		if (stp->sd_sigflags & S_WRNORM)
3305 			strsendsig(stp->sd_siglist, S_WRNORM, 0, 0);
3306 	}
3307 	mutex_exit(&stp->sd_lock);
3308 }
3309 
3310 int
3311 struioget(queue_t *q, mblk_t *mp, struiod_t *dp, int noblock)
3312 {
3313 	stdata_t *stp = STREAM(q);
3314 	int typ  = STRUIOT_STANDARD;
3315 	uio_t	 *uiop = &dp->d_uio;
3316 	dblk_t	 *dbp;
3317 	ssize_t	 uiocnt;
3318 	ssize_t	 cnt;
3319 	unsigned char *ptr;
3320 	ssize_t	 resid;
3321 	int	 error = 0;
3322 	on_trap_data_t otd;
3323 	queue_t	*stwrq;
3324 
3325 	/*
3326 	 * Plumbing may change while taking the type so store the
3327 	 * queue in a temporary variable. It doesn't matter even
3328 	 * if the we take the type from the previous plumbing,
3329 	 * that's because if the plumbing has changed when we were
3330 	 * holding the queue in a temporary variable, we can continue
3331 	 * processing the message the way it would have been processed
3332 	 * in the old plumbing, without any side effects but a bit
3333 	 * extra processing for partial ip header checksum.
3334 	 *
3335 	 * This has been done to avoid holding the sd_lock which is
3336 	 * very hot.
3337 	 */
3338 
3339 	stwrq = stp->sd_struiowrq;
3340 	if (stwrq)
3341 		typ = stwrq->q_struiot;
3342 
3343 	for (; (resid = uiop->uio_resid) > 0 && mp; mp = mp->b_cont) {
3344 		dbp = mp->b_datap;
3345 		ptr = (uchar_t *)(mp->b_rptr + dbp->db_cksumstuff);
3346 		uiocnt = dbp->db_cksumend - dbp->db_cksumstuff;
3347 		cnt = MIN(uiocnt, uiop->uio_resid);
3348 		if (!(dbp->db_struioflag & STRUIO_SPEC) ||
3349 		    (dbp->db_struioflag & STRUIO_DONE) || cnt == 0) {
3350 			/*
3351 			 * Either this mblk has already been processed
3352 			 * or there is no more room in this mblk (?).
3353 			 */
3354 			continue;
3355 		}
3356 		switch (typ) {
3357 		case STRUIOT_STANDARD:
3358 			if (noblock) {
3359 				if (on_trap(&otd, OT_DATA_ACCESS)) {
3360 					no_trap();
3361 					error = EWOULDBLOCK;
3362 					goto out;
3363 				}
3364 			}
3365 			if (error = uiomove(ptr, cnt, UIO_WRITE, uiop)) {
3366 				if (noblock)
3367 					no_trap();
3368 				goto out;
3369 			}
3370 			if (noblock)
3371 				no_trap();
3372 			break;
3373 
3374 		default:
3375 			error = EIO;
3376 			goto out;
3377 		}
3378 		dbp->db_struioflag |= STRUIO_DONE;
3379 		dbp->db_cksumstuff += cnt;
3380 	}
3381 out:
3382 	if (error == EWOULDBLOCK && (resid -= uiop->uio_resid) > 0) {
		/*
		 * A fault has occurred and some bytes were moved to the
		 * current mblk.  The uio_t has already been updated by
		 * the appropriate uio routine, so also update the mblk
		 * to reflect this in case this same mblk chain is used
		 * again (after the fault has been handled).
		 */
3390 		uiocnt = dbp->db_cksumend - dbp->db_cksumstuff;
3391 		if (uiocnt >= resid)
3392 			dbp->db_cksumstuff += resid;
3393 	}
3394 	return (error);
3395 }
3396 
3397 /*
3398  * Try to enter queue synchronously. Any attempt to enter a closing queue will
3399  * fails. The qp->q_rwcnt keeps track of the number of successful entries so
3400  * that removeq() will not try to close the queue while a thread is inside the
3401  * queue.
3402  */
3403 static boolean_t
3404 rwnext_enter(queue_t *qp)
3405 {
3406 	mutex_enter(QLOCK(qp));
3407 	if (qp->q_flag & QWCLOSE) {
3408 		mutex_exit(QLOCK(qp));
3409 		return (B_FALSE);
3410 	}
3411 	qp->q_rwcnt++;
3412 	ASSERT(qp->q_rwcnt != 0);
3413 	mutex_exit(QLOCK(qp));
3414 	return (B_TRUE);
3415 }
3416 
3417 /*
3418  * Decrease the count of threads running in sync stream queue and wake up any
3419  * threads blocked in removeq().
3420  */
3421 static void
3422 rwnext_exit(queue_t *qp)
3423 {
3424 	mutex_enter(QLOCK(qp));
3425 	qp->q_rwcnt--;
3426 	if (qp->q_flag & QWANTRMQSYNC) {
3427 		qp->q_flag &= ~QWANTRMQSYNC;
3428 		cv_broadcast(&qp->q_wait);
3429 	}
3430 	mutex_exit(QLOCK(qp));
3431 }
3432 
3433 /*
3434  * The purpose of rwnext() is to call the rw procedure of the next
3435  * (downstream) modules queue.
3436  *
3437  * treated as put entrypoint for perimeter syncronization.
3438  *
3439  * There's no need to grab sq_putlocks here (which only exist for CIPUT
3440  * sync queues). If it is CIPUT sync queue sq_count is incremented and it does
3441  * not matter if any regular put entrypoints have been already entered. We
3442  * can't increment one of the sq_putcounts (instead of sq_count) because
3443  * qwait_rw won't know which counter to decrement.
3444  *
3445  * It would be reasonable to add the lockless FASTPUT logic.
3446  */
3447 int
3448 rwnext(queue_t *qp, struiod_t *dp)
3449 {
3450 	queue_t		*nqp;
3451 	syncq_t		*sq;
3452 	uint16_t	count;
3453 	uint16_t	flags;
3454 	struct qinit	*qi;
3455 	int		(*proc)();
3456 	struct stdata	*stp;
3457 	int		isread;
3458 	int		rval;
3459 
3460 	stp = STREAM(qp);
3461 	/*
3462 	 * Prevent q_next from changing by holding sd_lock until acquiring
3463 	 * SQLOCK. Note that a read-side rwnext from the streamhead will
3464 	 * already have sd_lock acquired. In either case sd_lock is always
3465 	 * released after acquiring SQLOCK.
3466 	 *
3467 	 * The streamhead read-side holding sd_lock when calling rwnext is
3468 	 * required to prevent a race condition were M_DATA mblks flowing
3469 	 * up the read-side of the stream could be bypassed by a rwnext()
3470 	 * down-call. In this case sd_lock acts as the streamhead perimeter.
3471 	 */
3472 	if ((nqp = _WR(qp)) == qp) {
3473 		isread = 0;
3474 		mutex_enter(&stp->sd_lock);
3475 		qp = nqp->q_next;
3476 	} else {
3477 		isread = 1;
3478 		if (nqp != stp->sd_wrq)
3479 			/* Not streamhead */
3480 			mutex_enter(&stp->sd_lock);
3481 		qp = _RD(nqp->q_next);
3482 	}
3483 	qi = qp->q_qinfo;
3484 	if (qp->q_struiot == STRUIOT_NONE || ! (proc = qi->qi_rwp)) {
3485 		/*
3486 		 * Not a synchronous module or no r/w procedure for this
3487 		 * queue, so just return EINVAL and let the caller handle it.
3488 		 */
3489 		mutex_exit(&stp->sd_lock);
3490 		return (EINVAL);
3491 	}
3492 
3493 	if (rwnext_enter(qp) == B_FALSE) {
3494 		mutex_exit(&stp->sd_lock);
3495 		return (EINVAL);
3496 	}
3497 
3498 	sq = qp->q_syncq;
3499 	mutex_enter(SQLOCK(sq));
3500 	mutex_exit(&stp->sd_lock);
3501 	count = sq->sq_count;
3502 	flags = sq->sq_flags;
3503 	ASSERT(sq->sq_ciputctrl == NULL || (flags & SQ_CIPUT));
3504 
3505 	while ((flags & SQ_GOAWAY) || (!(flags & SQ_CIPUT) && count != 0)) {
3506 		/*
3507 		 * if this queue is being closed, return.
3508 		 */
3509 		if (qp->q_flag & QWCLOSE) {
3510 			mutex_exit(SQLOCK(sq));
3511 			rwnext_exit(qp);
3512 			return (EINVAL);
3513 		}
3514 
3515 		/*
3516 		 * Wait until we can enter the inner perimeter.
3517 		 */
3518 		sq->sq_flags = flags | SQ_WANTWAKEUP;
3519 		cv_wait(&sq->sq_wait, SQLOCK(sq));
3520 		count = sq->sq_count;
3521 		flags = sq->sq_flags;
3522 	}
3523 
3524 	if (isread == 0 && stp->sd_struiowrq == NULL ||
3525 	    isread == 1 && stp->sd_struiordq == NULL) {
3526 		/*
3527 		 * Stream plumbing changed while waiting for inner perimeter
3528 		 * so just return EINVAL and let the caller handle it.
3529 		 */
3530 		mutex_exit(SQLOCK(sq));
3531 		rwnext_exit(qp);
3532 		return (EINVAL);
3533 	}
3534 	if (!(flags & SQ_CIPUT))
3535 		sq->sq_flags = flags | SQ_EXCL;
3536 	sq->sq_count = count + 1;
3537 	ASSERT(sq->sq_count != 0);		/* Wraparound */
3538 	/*
3539 	 * Note: The only message ordering guarantee that rwnext() makes is
3540 	 *	 for the write queue flow-control case. All others (r/w queue
3541 	 *	 with q_count > 0 (or q_first != 0)) are the resposibilty of
3542 	 *	 the queue's rw procedure. This could be genralized here buy
3543 	 *	 running the queue's service procedure, but that wouldn't be
3544 	 *	 the most efficent for all cases.
3545 	 */
3546 	mutex_exit(SQLOCK(sq));
3547 	if (! isread && (qp->q_flag & QFULL)) {
3548 		/*
3549 		 * Write queue may be flow controlled. If so,
3550 		 * mark the queue for wakeup when it's not.
3551 		 */
3552 		mutex_enter(QLOCK(qp));
3553 		if (qp->q_flag & QFULL) {
3554 			qp->q_flag |= QWANTWSYNC;
3555 			mutex_exit(QLOCK(qp));
3556 			rval = EWOULDBLOCK;
3557 			goto out;
3558 		}
3559 		mutex_exit(QLOCK(qp));
3560 	}
3561 
3562 	if (! isread && dp->d_mp)
3563 		STR_FTEVENT_MSG(dp->d_mp, nqp, FTEV_RWNEXT, dp->d_mp->b_rptr -
3564 		    dp->d_mp->b_datap->db_base);
3565 
3566 	rval = (*proc)(qp, dp);
3567 
3568 	if (isread && dp->d_mp)
3569 		STR_FTEVENT_MSG(dp->d_mp, _RD(nqp), FTEV_RWNEXT,
3570 		    dp->d_mp->b_rptr - dp->d_mp->b_datap->db_base);
3571 out:
3572 	/*
3573 	 * The queue is protected from being freed by sq_count, so it is
3574 	 * safe to call rwnext_exit and reacquire SQLOCK(sq).
3575 	 */
3576 	rwnext_exit(qp);
3577 
3578 	mutex_enter(SQLOCK(sq));
3579 	flags = sq->sq_flags;
3580 	ASSERT(sq->sq_count != 0);
3581 	sq->sq_count--;
3582 	if (flags & SQ_TAIL) {
3583 		putnext_tail(sq, qp, flags);
3584 		/*
3585 		 * The only purpose of this ASSERT is to preserve calling stack
3586 		 * in DEBUG kernel.
3587 		 */
3588 		ASSERT(flags & SQ_TAIL);
3589 		return (rval);
3590 	}
3591 	ASSERT(flags & (SQ_EXCL|SQ_CIPUT));
3592 	/*
3593 	 * Safe to always drop SQ_EXCL:
3594 	 *	Not SQ_CIPUT means we set SQ_EXCL above
3595 	 *	For SQ_CIPUT SQ_EXCL will only be set if the put procedure
3596 	 *	did a qwriter(INNER) in which case nobody else
3597 	 *	is in the inner perimeter and we are exiting.
3598 	 *
3599 	 * I would like to make the following assertion:
3600 	 *
3601 	 * ASSERT((flags & (SQ_EXCL|SQ_CIPUT)) != (SQ_EXCL|SQ_CIPUT) ||
3602 	 * 	sq->sq_count == 0);
3603 	 *
3604 	 * which indicates that if we are both putshared and exclusive,
3605 	 * we became exclusive while executing the putproc, and the only
3606 	 * claim on the syncq was the one we dropped a few lines above.
3607 	 * But other threads that enter putnext while the syncq is exclusive
3608 	 * need to make a claim as they may need to drop SQLOCK in the
3609 	 * has_writers case to avoid deadlocks.  If these threads are
3610 	 * delayed or preempted, it is possible that the writer thread can
3611 	 * find out that there are other claims making the (sq_count == 0)
3612 	 * test invalid.
3613 	 */
3614 
3615 	sq->sq_flags = flags & ~SQ_EXCL;
3616 	if (sq->sq_flags & SQ_WANTWAKEUP) {
3617 		sq->sq_flags &= ~SQ_WANTWAKEUP;
3618 		cv_broadcast(&sq->sq_wait);
3619 	}
3620 	mutex_exit(SQLOCK(sq));
3621 	return (rval);
3622 }
3623 
3624 /*
3625  * The purpose of infonext() is to call the info procedure of the next
3626  * (downstream) modules queue.
3627  *
3628  * treated as put entrypoint for perimeter syncronization.
3629  *
3630  * There's no need to grab sq_putlocks here (which only exist for CIPUT
3631  * sync queues). If it is CIPUT sync queue regular sq_count is incremented and
3632  * it does not matter if any regular put entrypoints have been already
3633  * entered.
3634  */
3635 int
3636 infonext(queue_t *qp, infod_t *idp)
3637 {
3638 	queue_t		*nqp;
3639 	syncq_t		*sq;
3640 	uint16_t	count;
3641 	uint16_t 	flags;
3642 	struct qinit	*qi;
3643 	int		(*proc)();
3644 	struct stdata	*stp;
3645 	int		rval;
3646 
3647 	stp = STREAM(qp);
3648 	/*
3649 	 * Prevent q_next from changing by holding sd_lock until
3650 	 * acquiring SQLOCK.
3651 	 */
3652 	mutex_enter(&stp->sd_lock);
3653 	if ((nqp = _WR(qp)) == qp) {
3654 		qp = nqp->q_next;
3655 	} else {
3656 		qp = _RD(nqp->q_next);
3657 	}
3658 	qi = qp->q_qinfo;
3659 	if (qp->q_struiot == STRUIOT_NONE || ! (proc = qi->qi_infop)) {
3660 		mutex_exit(&stp->sd_lock);
3661 		return (EINVAL);
3662 	}
3663 	sq = qp->q_syncq;
3664 	mutex_enter(SQLOCK(sq));
3665 	mutex_exit(&stp->sd_lock);
3666 	count = sq->sq_count;
3667 	flags = sq->sq_flags;
3668 	ASSERT(sq->sq_ciputctrl == NULL || (flags & SQ_CIPUT));
3669 
3670 	while ((flags & SQ_GOAWAY) || (!(flags & SQ_CIPUT) && count != 0)) {
3671 		/*
3672 		 * Wait until we can enter the inner perimeter.
3673 		 */
3674 		sq->sq_flags = flags | SQ_WANTWAKEUP;
3675 		cv_wait(&sq->sq_wait, SQLOCK(sq));
3676 		count = sq->sq_count;
3677 		flags = sq->sq_flags;
3678 	}
3679 
3680 	if (! (flags & SQ_CIPUT))
3681 		sq->sq_flags = flags | SQ_EXCL;
3682 	sq->sq_count = count + 1;
3683 	ASSERT(sq->sq_count != 0);		/* Wraparound */
3684 	mutex_exit(SQLOCK(sq));
3685 
3686 	rval = (*proc)(qp, idp);
3687 
3688 	mutex_enter(SQLOCK(sq));
3689 	flags = sq->sq_flags;
3690 	ASSERT(sq->sq_count != 0);
3691 	sq->sq_count--;
3692 	if (flags & SQ_TAIL) {
3693 		putnext_tail(sq, qp, flags);
3694 		/*
3695 		 * The only purpose of this ASSERT is to preserve calling stack
3696 		 * in DEBUG kernel.
3697 		 */
3698 		ASSERT(flags & SQ_TAIL);
3699 		return (rval);
3700 	}
3701 	ASSERT(flags & (SQ_EXCL|SQ_CIPUT));
3702 /*
3703  * XXXX
3704  * I am not certain the next comment is correct here.  I need to consider
3705  * why the infonext is called, and if dropping SQ_EXCL unless non-CIPUT
3706  * might cause other problems.  It just might be safer to drop it if
3707  * !SQ_CIPUT because that is when we set it.
3708  */
3709 	/*
3710 	 * Safe to always drop SQ_EXCL:
3711 	 *	Not SQ_CIPUT means we set SQ_EXCL above
3712 	 *	For SQ_CIPUT SQ_EXCL will only be set if the put procedure
3713 	 *	did a qwriter(INNER) in which case nobody else
3714 	 *	is in the inner perimeter and we are exiting.
3715 	 *
3716 	 * I would like to make the following assertion:
3717 	 *
3718 	 * ASSERT((flags & (SQ_EXCL|SQ_CIPUT)) != (SQ_EXCL|SQ_CIPUT) ||
3719 	 *	sq->sq_count == 0);
3720 	 *
3721 	 * which indicates that if we are both putshared and exclusive,
3722 	 * we became exclusive while executing the putproc, and the only
3723 	 * claim on the syncq was the one we dropped a few lines above.
3724 	 * But other threads that enter putnext while the syncq is exclusive
3725 	 * need to make a claim as they may need to drop SQLOCK in the
3726 	 * has_writers case to avoid deadlocks.  If these threads are
3727 	 * delayed or preempted, it is possible that the writer thread can
3728 	 * find out that there are other claims making the (sq_count == 0)
3729 	 * test invalid.
3730 	 */
3731 
3732 	sq->sq_flags = flags & ~SQ_EXCL;
3733 	mutex_exit(SQLOCK(sq));
3734 	return (rval);
3735 }
3736 
3737 /*
3738  * Return nonzero if the queue is responsible for struio(), else return 0.
3739  */
3740 int
3741 isuioq(queue_t *q)
3742 {
3743 	if (q->q_flag & QREADR)
3744 		return (STREAM(q)->sd_struiordq == q);
3745 	else
3746 		return (STREAM(q)->sd_struiowrq == q);
3747 }
3748 
/*
 * When nonzero, create_putlocks() returns without creating any per cpu
 * putlocks.  Putlocks are enabled by default only on sparc.
 */
#if !defined(__sparc)
int disable_putlocks = 1;
#else
int disable_putlocks = 0;
#endif
3754 
3755 /*
3756  * called by create_putlock.
3757  */
3758 static void
3759 create_syncq_putlocks(queue_t *q)
3760 {
3761 	syncq_t	*sq = q->q_syncq;
3762 	ciputctrl_t *cip;
3763 	int i;
3764 
3765 	ASSERT(sq != NULL);
3766 
3767 	ASSERT(disable_putlocks == 0);
3768 	ASSERT(n_ciputctrl >= min_n_ciputctrl);
3769 	ASSERT(ciputctrl_cache != NULL);
3770 
3771 	if (!(sq->sq_type & SQ_CIPUT))
3772 		return;
3773 
3774 	for (i = 0; i <= 1; i++) {
3775 		if (sq->sq_ciputctrl == NULL) {
3776 			cip = kmem_cache_alloc(ciputctrl_cache, KM_SLEEP);
3777 			SUMCHECK_CIPUTCTRL_COUNTS(cip, n_ciputctrl - 1, 0);
3778 			mutex_enter(SQLOCK(sq));
3779 			if (sq->sq_ciputctrl != NULL) {
3780 				mutex_exit(SQLOCK(sq));
3781 				kmem_cache_free(ciputctrl_cache, cip);
3782 			} else {
3783 				ASSERT(sq->sq_nciputctrl == 0);
3784 				sq->sq_nciputctrl = n_ciputctrl - 1;
3785 				/*
3786 				 * putnext checks sq_ciputctrl without holding
3787 				 * SQLOCK. if it is not NULL putnext assumes
3788 				 * sq_nciputctrl is initialized. membar below
3789 				 * insures that.
3790 				 */
3791 				membar_producer();
3792 				sq->sq_ciputctrl = cip;
3793 				mutex_exit(SQLOCK(sq));
3794 			}
3795 		}
3796 		ASSERT(sq->sq_nciputctrl == n_ciputctrl - 1);
3797 		if (i == 1)
3798 			break;
3799 		q = _OTHERQ(q);
3800 		if (!(q->q_flag & QPERQ)) {
3801 			ASSERT(sq == q->q_syncq);
3802 			break;
3803 		}
3804 		ASSERT(q->q_syncq != NULL);
3805 		ASSERT(sq != q->q_syncq);
3806 		sq = q->q_syncq;
3807 		ASSERT(sq->sq_type & SQ_CIPUT);
3808 	}
3809 }
3810 
3811 /*
3812  * If stream argument is 0 only create per cpu sq_putlocks/sq_putcounts for
3813  * syncq of q. If stream argument is not 0 create per cpu stream_putlocks for
3814  * the stream of q and per cpu sq_putlocks/sq_putcounts for all syncq's
3815  * starting from q and down to the driver.
3816  *
3817  * This should be called after the affected queues are part of stream
3818  * geometry. It should be called from driver/module open routine after
3819  * qprocson() call. It is also called from nfs syscall where it is known that
3820  * stream is configured and won't change its geometry during create_putlock
3821  * call.
3822  *
3823  * caller normally uses 0 value for the stream argument to speed up MT putnext
3824  * into the perimeter of q for example because its perimeter is per module
3825  * (e.g. IP).
3826  *
3827  * caller normally uses non 0 value for the stream argument to hint the system
3828  * that the stream of q is a very contended global system stream
3829  * (e.g. NFS/UDP) and the part of the stream from q to the driver is
3830  * particularly MT hot.
3831  *
3832  * Caller insures stream plumbing won't happen while we are here and therefore
3833  * q_next can be safely used.
3834  */
3835 
3836 void
3837 create_putlocks(queue_t *q, int stream)
3838 {
3839 	ciputctrl_t	*cip;
3840 	struct stdata	*stp = STREAM(q);
3841 
3842 	q = _WR(q);
3843 	ASSERT(stp != NULL);
3844 
3845 	if (disable_putlocks != 0)
3846 		return;
3847 
3848 	if (n_ciputctrl < min_n_ciputctrl)
3849 		return;
3850 
3851 	ASSERT(ciputctrl_cache != NULL);
3852 
3853 	if (stream != 0 && stp->sd_ciputctrl == NULL) {
3854 		cip = kmem_cache_alloc(ciputctrl_cache, KM_SLEEP);
3855 		SUMCHECK_CIPUTCTRL_COUNTS(cip, n_ciputctrl - 1, 0);
3856 		mutex_enter(&stp->sd_lock);
3857 		if (stp->sd_ciputctrl != NULL) {
3858 			mutex_exit(&stp->sd_lock);
3859 			kmem_cache_free(ciputctrl_cache, cip);
3860 		} else {
3861 			ASSERT(stp->sd_nciputctrl == 0);
3862 			stp->sd_nciputctrl = n_ciputctrl - 1;
3863 			/*
3864 			 * putnext checks sd_ciputctrl without holding
3865 			 * sd_lock. if it is not NULL putnext assumes
3866 			 * sd_nciputctrl is initialized. membar below
3867 			 * insures that.
3868 			 */
3869 			membar_producer();
3870 			stp->sd_ciputctrl = cip;
3871 			mutex_exit(&stp->sd_lock);
3872 		}
3873 	}
3874 
3875 	ASSERT(stream == 0 || stp->sd_nciputctrl == n_ciputctrl - 1);
3876 
3877 	while (_SAMESTR(q)) {
3878 		create_syncq_putlocks(q);
3879 		if (stream == 0)
3880 			return;
3881 		q = q->q_next;
3882 	}
3883 	ASSERT(q != NULL);
3884 	create_syncq_putlocks(q);
3885 }
3886 
/*
 * STREAMS Flow Trace - record STREAMS Flow Trace events as an mblk flows
 * through a stream.
 *
 * The data currently recorded per event is a hrtime stamp, the queue
 * address, the event type, and a per-type datum.  Much of the STREAMS
 * framework is instrumented for automatic flow tracing (when enabled).
 * Events can also be defined and used by STREAMS modules and drivers.
 *
 * Global objects:
 *
 *	str_ftevent() - Add a flow-trace event to a dblk.
 *	str_ftfree() - Free flow-trace data.
 *
 * Local objects:
 *
 *	fthdr_cache - pointer to the kmem cache for trace headers.
 *	ftblk_cache - pointer to the kmem cache for trace data blocks.
 */

/* Nonzero: don't do STREAMS flow tracing. */
int str_ftnever = 1;
3908 
3909 void
3910 str_ftevent(fthdr_t *hp, void *p, ushort_t evnt, ushort_t data)
3911 {
3912 	ftblk_t *bp = hp->tail;
3913 	ftblk_t *nbp;
3914 	ftevnt_t *ep;
3915 	int ix, nix;
3916 
3917 	ASSERT(hp != NULL);
3918 
3919 	for (;;) {
3920 		if ((ix = bp->ix) == FTBLK_EVNTS) {
3921 			/*
3922 			 * Tail doesn't have room, so need a new tail.
3923 			 *
3924 			 * To make this MT safe, first, allocate a new
3925 			 * ftblk, and initialize it.  To make life a
3926 			 * little easier, reserve the first slot (mostly
3927 			 * by making ix = 1).  When we are finished with
3928 			 * the initialization, CAS this pointer to the
3929 			 * tail.  If this succeeds, this is the new
3930 			 * "next" block.  Otherwise, another thread
3931 			 * got here first, so free the block and start
3932 			 * again.
3933 			 */
3934 			if (!(nbp = kmem_cache_alloc(ftblk_cache,
3935 			    KM_NOSLEEP))) {
3936 				/* no mem, so punt */
3937 				str_ftnever++;
3938 				/* free up all flow data? */
3939 				return;
3940 			}
3941 			nbp->nxt = NULL;
3942 			nbp->ix = 1;
3943 			/*
3944 			 * Just in case there is another thread about
3945 			 * to get the next index, we need to make sure
3946 			 * the value is there for it.
3947 			 */
3948 			membar_producer();
3949 			if (casptr(&hp->tail, bp, nbp) == bp) {
3950 				/* CAS was successful */
3951 				bp->nxt = nbp;
3952 				membar_producer();
3953 				bp = nbp;
3954 				ix = 0;
3955 				goto cas_good;
3956 			} else {
3957 				kmem_cache_free(ftblk_cache, nbp);
3958 				bp = hp->tail;
3959 				continue;
3960 			}
3961 		}
3962 		nix = ix + 1;
3963 		if (cas32((uint32_t *)&bp->ix, ix, nix) == ix) {
3964 		cas_good:
3965 			if (curthread != hp->thread) {
3966 				hp->thread = curthread;
3967 				evnt |= FTEV_CS;
3968 			}
3969 			if (CPU->cpu_seqid != hp->cpu_seqid) {
3970 				hp->cpu_seqid = CPU->cpu_seqid;
3971 				evnt |= FTEV_PS;
3972 			}
3973 			ep = &bp->ev[ix];
3974 			break;
3975 		}
3976 	}
3977 
3978 	if (evnt & FTEV_QMASK) {
3979 		queue_t *qp = p;
3980 
3981 		/*
3982 		 * It is possible that the module info is broke
3983 		 * (as is logsubr.c at this comment writing).
3984 		 * Instead of panicing or doing other unmentionables,
3985 		 * we shall put a dummy name as the mid, and continue.
3986 		 */
3987 		if (qp->q_qinfo == NULL)
3988 			ep->mid = "NONAME";
3989 		else
3990 			ep->mid = qp->q_qinfo->qi_minfo->mi_idname;
3991 
3992 		if (!(qp->q_flag & QREADR))
3993 			evnt |= FTEV_ISWR;
3994 	} else {
3995 		ep->mid = (char *)p;
3996 	}
3997 
3998 	ep->ts = gethrtime();
3999 	ep->evnt = evnt;
4000 	ep->data = data;
4001 	hp->hash = (hp->hash << 9) + hp->hash;
4002 	hp->hash += (evnt << 16) | data;
4003 	hp->hash += (uintptr_t)ep->mid;
4004 }
4005 
4006 /*
4007  * Free flow-trace data.
4008  */
4009 void
4010 str_ftfree(dblk_t *dbp)
4011 {
4012 	fthdr_t *hp = dbp->db_fthdr;
4013 	ftblk_t *bp = &hp->first;
4014 	ftblk_t *nbp;
4015 
4016 	if (bp != hp->tail || bp->ix != 0) {
4017 		/*
4018 		 * Clear out the hash, have the tail point to itself, and free
4019 		 * any continuation blocks.
4020 		 */
4021 		bp = hp->first.nxt;
4022 		hp->tail = &hp->first;
4023 		hp->hash = 0;
4024 		hp->first.nxt = NULL;
4025 		hp->first.ix = 0;
4026 		while (bp != NULL) {
4027 			nbp = bp->nxt;
4028 			kmem_cache_free(ftblk_cache, bp);
4029 			bp = nbp;
4030 		}
4031 	}
4032 	kmem_cache_free(fthdr_cache, hp);
4033 	dbp->db_fthdr = NULL;
4034 }
4035