xref: /titanic_50/usr/src/uts/common/io/stream.c (revision d6fc3580b959bb971ca808c63cba30811e73737c)
1 /*
2  * CDDL HEADER START
3  *
4  * The contents of this file are subject to the terms of the
5  * Common Development and Distribution License (the "License").
6  * You may not use this file except in compliance with the License.
7  *
8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9  * or http://www.opensolaris.org/os/licensing.
10  * See the License for the specific language governing permissions
11  * and limitations under the License.
12  *
13  * When distributing Covered Code, include this CDDL HEADER in each
14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15  * If applicable, add the following below this CDDL HEADER, with the
16  * fields enclosed by brackets "[]" replaced with your own identifying
17  * information: Portions Copyright [yyyy] [name of copyright owner]
18  *
19  * CDDL HEADER END
20  */
21 /*	Copyright (c) 1984, 1986, 1987, 1988, 1989 AT&T	*/
22 /*	  All Rights Reserved  	*/
23 
24 /*
25  * Copyright 2009 Sun Microsystems, Inc.  All rights reserved.
26  * Use is subject to license terms.
27  */
28 
29 #include <sys/types.h>
30 #include <sys/param.h>
31 #include <sys/thread.h>
32 #include <sys/sysmacros.h>
33 #include <sys/stropts.h>
34 #include <sys/stream.h>
35 #include <sys/strsubr.h>
36 #include <sys/strsun.h>
37 #include <sys/conf.h>
38 #include <sys/debug.h>
39 #include <sys/cmn_err.h>
40 #include <sys/kmem.h>
41 #include <sys/atomic.h>
42 #include <sys/errno.h>
43 #include <sys/vtrace.h>
44 #include <sys/ftrace.h>
45 #include <sys/ontrap.h>
46 #include <sys/multidata.h>
47 #include <sys/multidata_impl.h>
48 #include <sys/sdt.h>
49 #include <sys/strft.h>
50 
51 #ifdef DEBUG
52 #include <sys/kmem_impl.h>
53 #endif
54 
55 /*
56  * This file contains all the STREAMS utility routines that may
57  * be used by modules and drivers.
58  */
59 
60 /*
61  * STREAMS message allocator: principles of operation
62  *
63  * The streams message allocator consists of all the routines that
64  * allocate, dup and free streams messages: allocb(), [d]esballoc[a],
65  * dupb(), freeb() and freemsg().  What follows is a high-level view
66  * of how the allocator works.
67  *
68  * Every streams message consists of one or more mblks, a dblk, and data.
69  * All mblks for all types of messages come from a common mblk_cache.
70  * The dblk and data come in several flavors, depending on how the
71  * message is allocated:
72  *
73  * (1) mblks up to DBLK_MAX_CACHE size are allocated from a collection of
74  *     fixed-size dblk/data caches. For message sizes that are multiples of
75  *     PAGESIZE, dblks are allocated separately from the buffer.
76  *     The associated buffer is allocated by the constructor using kmem_alloc().
77  *     For all other message sizes, the dblk and its associated data are
78  *     allocated as a single contiguous chunk of memory.
79  *     Objects in these caches consist of a dblk plus its associated data.
80  *     allocb() determines the nearest-size cache by table lookup:
81  *     the dblk_cache[] array provides the mapping from size to dblk cache.
82  *
83  * (2) Large messages (size > DBLK_MAX_CACHE) are constructed by
84  *     kmem_alloc()'ing a buffer for the data and supplying that
85  *     buffer to gesballoc(), described below.
86  *
87  * (3) The four flavors of [d]esballoc[a] are all implemented by a
88  *     common routine, gesballoc() ("generic esballoc").  gesballoc()
89  *     allocates a dblk from the global dblk_esb_cache and sets db_base,
90  *     db_lim and db_frtnp to describe the caller-supplied buffer.
91  *
92  * While there are several routines to allocate messages, there is only
93  * one routine to free messages: freeb().  freeb() simply invokes the
94  * dblk's free method, dbp->db_free(), which is set at allocation time.
95  *
96  * dupb() creates a new reference to a message by allocating a new mblk,
97  * incrementing the dblk reference count and setting the dblk's free
98  * method to dblk_decref().  The dblk's original free method is retained
99  * in db_lastfree.  dblk_decref() decrements the reference count on each
100  * freeb().  If this is not the last reference it just frees the mblk;
101  * if this *is* the last reference, it restores db_free to db_lastfree,
102  * sets db_mblk to the current mblk (see below), and invokes db_lastfree.
103  *
104  * The implementation makes aggressive use of kmem object caching for
105  * maximum performance.  This makes the code simple and compact, but
106  * also a bit abstruse in some places.  The invariants that constitute a
107  * message's constructed state, described below, are more subtle than usual.
108  *
109  * Every dblk has an "attached mblk" as part of its constructed state.
110  * The mblk is allocated by the dblk's constructor and remains attached
111  * until the message is either dup'ed or pulled up.  In the dupb() case
112  * the mblk association doesn't matter until the last free, at which time
113  * dblk_decref() attaches the last mblk to the dblk.  pullupmsg() affects
114  * the mblk association because it swaps the leading mblks of two messages,
115  * so it is responsible for swapping their db_mblk pointers accordingly.
116  * From a constructed-state viewpoint it doesn't matter that a dblk's
117  * attached mblk can change while the message is allocated; all that
118  * matters is that the dblk has *some* attached mblk when it's freed.
119  *
120  * The sizes of the allocb() small-message caches are not magical.
121  * They represent a good trade-off between internal and external
122  * fragmentation for current workloads.  They should be reevaluated
123  * periodically, especially if allocations larger than DBLK_MAX_CACHE
124  * become common.  We use 64-byte alignment so that dblks don't
125  * straddle cache lines unnecessarily.
126  */
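
/*
 * As a concrete (illustrative-only) sketch of the lifecycle described
 * above, here is how a module might exercise the allocator; 'len' is
 * hypothetical:
 *
 *	mblk_t *mp = allocb(len, BPRI_MED);	// dblk + data from a cache
 *	mblk_t *dup = dupb(mp);			// second ref to the same dblk
 *	freeb(dup);				// dblk_decref(): ref 2 -> 1
 *	freemsg(mp);				// last ref; db_lastfree() runs
 */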
127 #define	DBLK_MAX_CACHE		73728
128 #define	DBLK_CACHE_ALIGN	64
129 #define	DBLK_MIN_SIZE		8
130 #define	DBLK_SIZE_SHIFT		3
131 
132 #ifdef _BIG_ENDIAN
133 #define	DBLK_RTFU_SHIFT(field)	\
134 	(8 * (&((dblk_t *)0)->db_struioflag - &((dblk_t *)0)->field))
135 #else
136 #define	DBLK_RTFU_SHIFT(field)	\
137 	(8 * (&((dblk_t *)0)->field - &((dblk_t *)0)->db_ref))
138 #endif
139 
140 #define	DBLK_RTFU(ref, type, flags, uioflag)	\
141 	(((ref) << DBLK_RTFU_SHIFT(db_ref)) | \
142 	((type) << DBLK_RTFU_SHIFT(db_type)) | \
143 	(((flags) | (ref - 1)) << DBLK_RTFU_SHIFT(db_flags)) | \
144 	((uioflag) << DBLK_RTFU_SHIFT(db_struioflag)))
145 #define	DBLK_RTFU_REF_MASK	(DBLK_REFMAX << DBLK_RTFU_SHIFT(db_ref))
146 #define	DBLK_RTFU_WORD(dbp)	(*((uint32_t *)&(dbp)->db_ref))
147 #define	MBLK_BAND_FLAG_WORD(mp)	(*((uint32_t *)&(mp)->b_band))
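
/*
 * As an illustration, DBLK_RTFU(1, M_DATA, 0, 0) packs db_ref = 1,
 * db_type = M_DATA, db_flags = 0 and db_struioflag = 0 into a single
 * 32-bit value, so the allocation paths can initialize all four
 * adjacent byte-sized dblk fields with one store:
 *
 *	DBLK_RTFU_WORD(dbp) = DBLK_RTFU(1, M_DATA, 0, 0);
 */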
148 
149 static size_t dblk_sizes[] = {
150 #ifdef _LP64
151 	16, 80, 144, 208, 272, 336, 528, 1040, 1488, 1936, 2576, 3856,
152 	8192, 12048, 16384, 20240, 24576, 28432, 32768, 36624,
153 	40960, 44816, 49152, 53008, 57344, 61200, 65536, 69392,
154 #else
155 	64, 128, 320, 576, 1088, 1536, 1984, 2624, 3904,
156 	8192, 12096, 16384, 20288, 24576, 28480, 32768, 36672,
157 	40960, 44864, 49152, 53056, 57344, 61248, 65536, 69440,
158 #endif
159 	DBLK_MAX_CACHE, 0
160 };
161 
162 static struct kmem_cache *dblk_cache[DBLK_MAX_CACHE / DBLK_MIN_SIZE];
163 static struct kmem_cache *mblk_cache;
164 static struct kmem_cache *dblk_esb_cache;
165 static struct kmem_cache *fthdr_cache;
166 static struct kmem_cache *ftblk_cache;
167 
168 static void dblk_lastfree(mblk_t *mp, dblk_t *dbp);
169 static mblk_t *allocb_oversize(size_t size, int flags);
170 static int allocb_tryhard_fails;
171 static void frnop_func(void *arg);
172 frtn_t frnop = { frnop_func };
173 static void bcache_dblk_lastfree(mblk_t *mp, dblk_t *dbp);
174 
175 static boolean_t rwnext_enter(queue_t *qp);
176 static void rwnext_exit(queue_t *qp);
177 
178 /*
179  * Patchable mblk/dblk kmem_cache flags.
180  */
181 int dblk_kmem_flags = 0;
182 int mblk_kmem_flags = 0;
183 
184 static int
185 dblk_constructor(void *buf, void *cdrarg, int kmflags)
186 {
187 	dblk_t *dbp = buf;
188 	ssize_t msg_size = (ssize_t)cdrarg;
189 	size_t index;
190 
191 	ASSERT(msg_size != 0);
192 
193 	index = (msg_size - 1) >> DBLK_SIZE_SHIFT;
194 
195 	ASSERT(index < (DBLK_MAX_CACHE >> DBLK_SIZE_SHIFT));
196 
197 	if ((dbp->db_mblk = kmem_cache_alloc(mblk_cache, kmflags)) == NULL)
198 		return (-1);
199 	if ((msg_size & PAGEOFFSET) == 0) {
200 		dbp->db_base = kmem_alloc(msg_size, kmflags);
201 		if (dbp->db_base == NULL) {
202 			kmem_cache_free(mblk_cache, dbp->db_mblk);
203 			return (-1);
204 		}
205 	} else {
206 		dbp->db_base = (unsigned char *)&dbp[1];
207 	}
208 
209 	dbp->db_mblk->b_datap = dbp;
210 	dbp->db_cache = dblk_cache[index];
211 	dbp->db_lim = dbp->db_base + msg_size;
212 	dbp->db_free = dbp->db_lastfree = dblk_lastfree;
213 	dbp->db_frtnp = NULL;
214 	dbp->db_fthdr = NULL;
215 	dbp->db_credp = NULL;
216 	dbp->db_cpid = -1;
217 	dbp->db_struioflag = 0;
218 	dbp->db_struioun.cksum.flags = 0;
219 	return (0);
220 }
221 
222 /*ARGSUSED*/
223 static int
224 dblk_esb_constructor(void *buf, void *cdrarg, int kmflags)
225 {
226 	dblk_t *dbp = buf;
227 
228 	if ((dbp->db_mblk = kmem_cache_alloc(mblk_cache, kmflags)) == NULL)
229 		return (-1);
230 	dbp->db_mblk->b_datap = dbp;
231 	dbp->db_cache = dblk_esb_cache;
232 	dbp->db_fthdr = NULL;
233 	dbp->db_credp = NULL;
234 	dbp->db_cpid = -1;
235 	dbp->db_struioflag = 0;
236 	dbp->db_struioun.cksum.flags = 0;
237 	return (0);
238 }
239 
240 static int
241 bcache_dblk_constructor(void *buf, void *cdrarg, int kmflags)
242 {
243 	dblk_t *dbp = buf;
244 	bcache_t *bcp = cdrarg;
245 
246 	if ((dbp->db_mblk = kmem_cache_alloc(mblk_cache, kmflags)) == NULL)
247 		return (-1);
248 
249 	dbp->db_base = kmem_cache_alloc(bcp->buffer_cache, kmflags);
250 	if (dbp->db_base == NULL) {
251 		kmem_cache_free(mblk_cache, dbp->db_mblk);
252 		return (-1);
253 	}
254 
255 	dbp->db_mblk->b_datap = dbp;
256 	dbp->db_cache = (void *)bcp;
257 	dbp->db_lim = dbp->db_base + bcp->size;
258 	dbp->db_free = dbp->db_lastfree = bcache_dblk_lastfree;
259 	dbp->db_frtnp = NULL;
260 	dbp->db_fthdr = NULL;
261 	dbp->db_credp = NULL;
262 	dbp->db_cpid = -1;
263 	dbp->db_struioflag = 0;
264 	dbp->db_struioun.cksum.flags = 0;
265 	return (0);
266 }
267 
268 /*ARGSUSED*/
269 static void
270 dblk_destructor(void *buf, void *cdrarg)
271 {
272 	dblk_t *dbp = buf;
273 	ssize_t msg_size = (ssize_t)cdrarg;
274 
275 	ASSERT(dbp->db_mblk->b_datap == dbp);
276 	ASSERT(msg_size != 0);
277 	ASSERT(dbp->db_struioflag == 0);
278 	ASSERT(dbp->db_struioun.cksum.flags == 0);
279 
280 	if ((msg_size & PAGEOFFSET) == 0) {
281 		kmem_free(dbp->db_base, msg_size);
282 	}
283 
284 	kmem_cache_free(mblk_cache, dbp->db_mblk);
285 }
286 
287 static void
288 bcache_dblk_destructor(void *buf, void *cdrarg)
289 {
290 	dblk_t *dbp = buf;
291 	bcache_t *bcp = cdrarg;
292 
293 	kmem_cache_free(bcp->buffer_cache, dbp->db_base);
294 
295 	ASSERT(dbp->db_mblk->b_datap == dbp);
296 	ASSERT(dbp->db_struioflag == 0);
297 	ASSERT(dbp->db_struioun.cksum.flags == 0);
298 
299 	kmem_cache_free(mblk_cache, dbp->db_mblk);
300 }
301 
302 /* ARGSUSED */
303 static int
304 ftblk_constructor(void *buf, void *cdrarg, int kmflags)
305 {
306 	ftblk_t *fbp = buf;
307 	int i;
308 
309 	bzero(fbp, sizeof (ftblk_t));
310 	if (str_ftstack != 0) {
311 		for (i = 0; i < FTBLK_EVNTS; i++)
312 			fbp->ev[i].stk = kmem_alloc(sizeof (ftstk_t), kmflags);
313 	}
314 
315 	return (0);
316 }
317 
318 /* ARGSUSED */
319 static void
320 ftblk_destructor(void *buf, void *cdrarg)
321 {
322 	ftblk_t *fbp = buf;
323 	int i;
324 
325 	if (str_ftstack != 0) {
326 		for (i = 0; i < FTBLK_EVNTS; i++) {
327 			if (fbp->ev[i].stk != NULL) {
328 				kmem_free(fbp->ev[i].stk, sizeof (ftstk_t));
329 				fbp->ev[i].stk = NULL;
330 			}
331 		}
332 	}
333 }
334 
335 static int
336 fthdr_constructor(void *buf, void *cdrarg, int kmflags)
337 {
338 	fthdr_t *fhp = buf;
339 
340 	return (ftblk_constructor(&fhp->first, cdrarg, kmflags));
341 }
342 
343 static void
344 fthdr_destructor(void *buf, void *cdrarg)
345 {
346 	fthdr_t *fhp = buf;
347 
348 	ftblk_destructor(&fhp->first, cdrarg);
349 }
350 
351 void
352 streams_msg_init(void)
353 {
354 	char name[40];
355 	size_t size;
356 	size_t lastsize = DBLK_MIN_SIZE;
357 	size_t *sizep;
358 	struct kmem_cache *cp;
359 	size_t tot_size;
360 	int offset;
361 
362 	mblk_cache = kmem_cache_create("streams_mblk", sizeof (mblk_t), 32,
363 	    NULL, NULL, NULL, NULL, NULL, mblk_kmem_flags);
364 
365 	for (sizep = dblk_sizes; (size = *sizep) != 0; sizep++) {
366 
367 		if ((offset = (size & PAGEOFFSET)) != 0) {
368 			/*
369 			 * We are in the middle of a page; the dblk should
370 			 * be allocated on the same page.
371 			 */
372 			tot_size = size + sizeof (dblk_t);
373 			ASSERT((offset + sizeof (dblk_t) + sizeof (kmem_slab_t))
374 			    < PAGESIZE);
375 			ASSERT((tot_size & (DBLK_CACHE_ALIGN - 1)) == 0);
376 
377 		} else {
378 
379 			/*
380 			 * The buffer size is a multiple of the page size;
381 			 * the dblk and buffer are allocated separately.
382 			 */
383 
384 			ASSERT((size & (DBLK_CACHE_ALIGN - 1)) == 0);
385 			tot_size = sizeof (dblk_t);
386 		}
387 
388 		(void) sprintf(name, "streams_dblk_%ld", size);
389 		cp = kmem_cache_create(name, tot_size, DBLK_CACHE_ALIGN,
390 		    dblk_constructor, dblk_destructor, NULL, (void *)(size),
391 		    NULL, dblk_kmem_flags);
392 
393 		while (lastsize <= size) {
394 			dblk_cache[(lastsize - 1) >> DBLK_SIZE_SHIFT] = cp;
395 			lastsize += DBLK_MIN_SIZE;
396 		}
397 	}
398 
399 	dblk_esb_cache = kmem_cache_create("streams_dblk_esb", sizeof (dblk_t),
400 	    DBLK_CACHE_ALIGN, dblk_esb_constructor, dblk_destructor, NULL,
401 	    (void *)sizeof (dblk_t), NULL, dblk_kmem_flags);
402 	fthdr_cache = kmem_cache_create("streams_fthdr", sizeof (fthdr_t), 32,
403 	    fthdr_constructor, fthdr_destructor, NULL, NULL, NULL, 0);
404 	ftblk_cache = kmem_cache_create("streams_ftblk", sizeof (ftblk_t), 32,
405 	    ftblk_constructor, ftblk_destructor, NULL, NULL, NULL, 0);
406 
407 	/* Initialize Multidata caches */
408 	mmd_init();
409 
410 	/* initialize throttling queue for esballoc */
411 	esballoc_queue_init();
412 }
413 
414 /*ARGSUSED*/
415 mblk_t *
416 allocb(size_t size, uint_t pri)
417 {
418 	dblk_t *dbp;
419 	mblk_t *mp;
420 	size_t index;
421 
422 	index = (size - 1) >> DBLK_SIZE_SHIFT;
423 
424 	if (index >= (DBLK_MAX_CACHE >> DBLK_SIZE_SHIFT)) {
425 		if (size != 0) {
426 			mp = allocb_oversize(size, KM_NOSLEEP);
427 			goto out;
428 		}
429 		index = 0;
430 	}
431 
432 	if ((dbp = kmem_cache_alloc(dblk_cache[index], KM_NOSLEEP)) == NULL) {
433 		mp = NULL;
434 		goto out;
435 	}
436 
437 	mp = dbp->db_mblk;
438 	DBLK_RTFU_WORD(dbp) = DBLK_RTFU(1, M_DATA, 0, 0);
439 	mp->b_next = mp->b_prev = mp->b_cont = NULL;
440 	mp->b_rptr = mp->b_wptr = dbp->db_base;
441 	mp->b_queue = NULL;
442 	MBLK_BAND_FLAG_WORD(mp) = 0;
443 	STR_FTALLOC(&dbp->db_fthdr, FTEV_ALLOCB, size);
444 out:
445 	FTRACE_1("allocb(): mp=0x%p", (uintptr_t)mp);
446 
447 	return (mp);
448 }
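
/*
 * A minimal usage sketch (illustrative only; 'src', 'len' and 'q' are
 * hypothetical): a driver receive path copies data into a fresh
 * message and passes it upstream.  allocb() sets both b_rptr and
 * b_wptr to db_base, so the caller advances b_wptr past the data:
 *
 *	mblk_t *mp;
 *
 *	if ((mp = allocb(len, BPRI_MED)) == NULL)
 *		return;				// drop, or use bufcall()
 *	bcopy(src, mp->b_wptr, len);
 *	mp->b_wptr += len;
 *	putnext(q, mp);
 */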
449 
450 mblk_t *
451 allocb_tmpl(size_t size, const mblk_t *tmpl)
452 {
453 	mblk_t *mp = allocb(size, 0);
454 
455 	if (mp != NULL) {
456 		cred_t *cr = DB_CRED(tmpl);
457 		if (cr != NULL)
458 			crhold(mp->b_datap->db_credp = cr);
459 		DB_CPID(mp) = DB_CPID(tmpl);
460 		DB_TYPE(mp) = DB_TYPE(tmpl);
461 	}
462 	return (mp);
463 }
464 
465 mblk_t *
466 allocb_cred(size_t size, cred_t *cr)
467 {
468 	mblk_t *mp = allocb(size, 0);
469 
470 	if (mp != NULL && cr != NULL)
471 		crhold(mp->b_datap->db_credp = cr);
472 
473 	return (mp);
474 }
475 
476 mblk_t *
477 allocb_cred_wait(size_t size, uint_t flags, int *error, cred_t *cr)
478 {
479 	mblk_t *mp = allocb_wait(size, 0, flags, error);
480 
481 	if (mp != NULL && cr != NULL)
482 		crhold(mp->b_datap->db_credp = cr);
483 
484 	return (mp);
485 }
486 
487 void
488 freeb(mblk_t *mp)
489 {
490 	dblk_t *dbp = mp->b_datap;
491 
492 	ASSERT(dbp->db_ref > 0);
493 	ASSERT(mp->b_next == NULL && mp->b_prev == NULL);
494 	FTRACE_1("freeb(): mp=0x%lx", (uintptr_t)mp);
495 
496 	STR_FTEVENT_MBLK(mp, caller(), FTEV_FREEB, dbp->db_ref);
497 
498 	dbp->db_free(mp, dbp);
499 }
500 
501 void
502 freemsg(mblk_t *mp)
503 {
504 	FTRACE_1("freemsg(): mp=0x%lx", (uintptr_t)mp);
505 	while (mp) {
506 		dblk_t *dbp = mp->b_datap;
507 		mblk_t *mp_cont = mp->b_cont;
508 
509 		ASSERT(dbp->db_ref > 0);
510 		ASSERT(mp->b_next == NULL && mp->b_prev == NULL);
511 
512 		STR_FTEVENT_MBLK(mp, caller(), FTEV_FREEB, dbp->db_ref);
513 
514 		dbp->db_free(mp, dbp);
515 		mp = mp_cont;
516 	}
517 }
518 
519 /*
520  * Reallocate a block for another use.  Try hard to use the old block.
521  * If the old data is wanted (copy), leave b_wptr at the end of the data,
522  * otherwise return b_wptr = b_rptr.
523  *
524  * This routine is private and unstable.
525  */
526 mblk_t	*
527 reallocb(mblk_t *mp, size_t size, uint_t copy)
528 {
529 	mblk_t		*mp1;
530 	unsigned char	*old_rptr;
531 	ptrdiff_t	cur_size;
532 
533 	if (mp == NULL)
534 		return (allocb(size, BPRI_HI));
535 
536 	cur_size = mp->b_wptr - mp->b_rptr;
537 	old_rptr = mp->b_rptr;
538 
539 	ASSERT(mp->b_datap->db_ref != 0);
540 
541 	if (mp->b_datap->db_ref == 1 && MBLKSIZE(mp) >= size) {
542 		/*
543 		 * If the data is wanted and it will fit where it is, no
544 		 * work is required.
545 		 */
546 		if (copy && mp->b_datap->db_lim - mp->b_rptr >= size)
547 			return (mp);
548 
549 		mp->b_wptr = mp->b_rptr = mp->b_datap->db_base;
550 		mp1 = mp;
551 	} else if ((mp1 = allocb_tmpl(size, mp)) != NULL) {
552 		/* XXX other mp state could be copied too, db_flags ... ? */
553 		mp1->b_cont = mp->b_cont;
554 	} else {
555 		return (NULL);
556 	}
557 
558 	if (copy) {
559 		bcopy(old_rptr, mp1->b_rptr, cur_size);
560 		mp1->b_wptr = mp1->b_rptr + cur_size;
561 	}
562 
563 	if (mp != mp1)
564 		freeb(mp);
565 
566 	return (mp1);
567 }
568 
569 static void
570 dblk_lastfree(mblk_t *mp, dblk_t *dbp)
571 {
572 	ASSERT(dbp->db_mblk == mp);
573 	if (dbp->db_fthdr != NULL)
574 		str_ftfree(dbp);
575 
576 	/* set credp and projid to be 'unspecified' before returning to cache */
577 	if (dbp->db_credp != NULL) {
578 		crfree(dbp->db_credp);
579 		dbp->db_credp = NULL;
580 	}
581 	dbp->db_cpid = -1;
582 
583 	/* Reset the struioflag and the checksum flag fields */
584 	dbp->db_struioflag = 0;
585 	dbp->db_struioun.cksum.flags = 0;
586 
587 	/* and the COOKED and/or UIOA flag(s) */
588 	dbp->db_flags &= ~(DBLK_COOKED | DBLK_UIOA);
589 
590 	kmem_cache_free(dbp->db_cache, dbp);
591 }
592 
593 static void
594 dblk_decref(mblk_t *mp, dblk_t *dbp)
595 {
596 	if (dbp->db_ref != 1) {
597 		uint32_t rtfu = atomic_add_32_nv(&DBLK_RTFU_WORD(dbp),
598 		    -(1 << DBLK_RTFU_SHIFT(db_ref)));
599 		/*
600 		 * atomic_add_32_nv() just decremented db_ref, so we no longer
601 		 * have a reference to the dblk, which means another thread
602 		 * could free it.  Therefore we cannot examine the dblk to
603 		 * determine whether ours was the last reference.  Instead,
604 		 * we extract the new and minimum reference counts from rtfu.
605 		 * Note that all we're really saying is "if (ref != refmin)".
606 		 */
607 		if (((rtfu >> DBLK_RTFU_SHIFT(db_ref)) & DBLK_REFMAX) !=
608 		    ((rtfu >> DBLK_RTFU_SHIFT(db_flags)) & DBLK_REFMIN)) {
609 			kmem_cache_free(mblk_cache, mp);
610 			return;
611 		}
612 	}
613 	dbp->db_mblk = mp;
614 	dbp->db_free = dbp->db_lastfree;
615 	dbp->db_lastfree(mp, dbp);
616 }
617 
618 mblk_t *
619 dupb(mblk_t *mp)
620 {
621 	dblk_t *dbp = mp->b_datap;
622 	mblk_t *new_mp;
623 	uint32_t oldrtfu, newrtfu;
624 
625 	if ((new_mp = kmem_cache_alloc(mblk_cache, KM_NOSLEEP)) == NULL)
626 		goto out;
627 
628 	new_mp->b_next = new_mp->b_prev = new_mp->b_cont = NULL;
629 	new_mp->b_rptr = mp->b_rptr;
630 	new_mp->b_wptr = mp->b_wptr;
631 	new_mp->b_datap = dbp;
632 	new_mp->b_queue = NULL;
633 	MBLK_BAND_FLAG_WORD(new_mp) = MBLK_BAND_FLAG_WORD(mp);
634 
635 	STR_FTEVENT_MBLK(mp, caller(), FTEV_DUPB, dbp->db_ref);
636 
637 	dbp->db_free = dblk_decref;
638 	do {
639 		ASSERT(dbp->db_ref > 0);
640 		oldrtfu = DBLK_RTFU_WORD(dbp);
641 		newrtfu = oldrtfu + (1 << DBLK_RTFU_SHIFT(db_ref));
642 		/*
643 		 * If db_ref is maxed out we can't dup this message anymore.
644 		 */
645 		if ((oldrtfu & DBLK_RTFU_REF_MASK) == DBLK_RTFU_REF_MASK) {
646 			kmem_cache_free(mblk_cache, new_mp);
647 			new_mp = NULL;
648 			goto out;
649 		}
650 	} while (cas32(&DBLK_RTFU_WORD(dbp), oldrtfu, newrtfu) != oldrtfu);
651 
652 out:
653 	FTRACE_1("dupb(): new_mp=0x%lx", (uintptr_t)new_mp);
654 	return (new_mp);
655 }
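
/*
 * Because dupb() shares the underlying dblk, writers must honor the
 * reference count before modifying data in place.  A common pattern
 * (sketch only):
 *
 *	if (mp->b_datap->db_ref > 1) {
 *		mblk_t *nmp;
 *
 *		if ((nmp = copymsg(mp)) == NULL)
 *			return (ENOMEM);	// keep mp; caller decides
 *		freemsg(mp);
 *		mp = nmp;			// private copy, safe to write
 *	}
 */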
656 
657 static void
658 dblk_lastfree_desb(mblk_t *mp, dblk_t *dbp)
659 {
660 	frtn_t *frp = dbp->db_frtnp;
661 
662 	ASSERT(dbp->db_mblk == mp);
663 	frp->free_func(frp->free_arg);
664 	if (dbp->db_fthdr != NULL)
665 		str_ftfree(dbp);
666 
667 	/* set credp and projid to be 'unspecified' before returning to cache */
668 	if (dbp->db_credp != NULL) {
669 		crfree(dbp->db_credp);
670 		dbp->db_credp = NULL;
671 	}
672 	dbp->db_cpid = -1;
673 	dbp->db_struioflag = 0;
674 	dbp->db_struioun.cksum.flags = 0;
675 
676 	kmem_cache_free(dbp->db_cache, dbp);
677 }
678 
679 /*ARGSUSED*/
680 static void
681 frnop_func(void *arg)
682 {
683 }
684 
685 /*
686  * Generic esballoc used to implement the four flavors: [d]esballoc[a].
687  */
688 static mblk_t *
689 gesballoc(unsigned char *base, size_t size, uint32_t db_rtfu, frtn_t *frp,
690 	void (*lastfree)(mblk_t *, dblk_t *), int kmflags)
691 {
692 	dblk_t *dbp;
693 	mblk_t *mp;
694 
695 	ASSERT(base != NULL && frp != NULL);
696 
697 	if ((dbp = kmem_cache_alloc(dblk_esb_cache, kmflags)) == NULL) {
698 		mp = NULL;
699 		goto out;
700 	}
701 
702 	mp = dbp->db_mblk;
703 	dbp->db_base = base;
704 	dbp->db_lim = base + size;
705 	dbp->db_free = dbp->db_lastfree = lastfree;
706 	dbp->db_frtnp = frp;
707 	DBLK_RTFU_WORD(dbp) = db_rtfu;
708 	mp->b_next = mp->b_prev = mp->b_cont = NULL;
709 	mp->b_rptr = mp->b_wptr = base;
710 	mp->b_queue = NULL;
711 	MBLK_BAND_FLAG_WORD(mp) = 0;
712 
713 out:
714 	FTRACE_1("gesballoc(): mp=0x%lx", (uintptr_t)mp);
715 	return (mp);
716 }
717 
718 /*ARGSUSED*/
719 mblk_t *
720 esballoc(unsigned char *base, size_t size, uint_t pri, frtn_t *frp)
721 {
722 	mblk_t *mp;
723 
724 	/*
725 	 * Note that this is structured to allow the common case (i.e.
726 	 * STREAMS flowtracing disabled) to call gesballoc() with tail
727 	 * call optimization.
728 	 */
729 	if (!str_ftnever) {
730 		mp = gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
731 		    frp, freebs_enqueue, KM_NOSLEEP);
732 
733 		if (mp != NULL)
734 			STR_FTALLOC(&DB_FTHDR(mp), FTEV_ESBALLOC, size);
735 		return (mp);
736 	}
737 
738 	return (gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
739 	    frp, freebs_enqueue, KM_NOSLEEP));
740 }
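
/*
 * A typical (illustrative) use of esballoc() maps a driver-owned
 * buffer into a message with no copy; the xx_ names are hypothetical.
 * The free routine runs once the last reference to the dblk is freed:
 *
 *	static void
 *	xx_rbuf_free(void *arg)
 *	{
 *		xx_rbuf_t *rbp = arg;
 *
 *		xx_rbuf_recycle(rbp);
 *	}
 *	...
 *	rbp->rb_frtn.free_func = xx_rbuf_free;
 *	rbp->rb_frtn.free_arg = (caddr_t)rbp;
 *	mp = esballoc(rbp->rb_kaddr, rbp->rb_size, BPRI_MED, &rbp->rb_frtn);
 */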
741 
742 /*
743  * Same as esballoc() but sleeps waiting for memory.
744  */
745 /*ARGSUSED*/
746 mblk_t *
747 esballoc_wait(unsigned char *base, size_t size, uint_t pri, frtn_t *frp)
748 {
749 	mblk_t *mp;
750 
751 	/*
752 	 * Note that this is structured to allow the common case (i.e.
753 	 * STREAMS flowtracing disabled) to call gesballoc() with tail
754 	 * call optimization.
755 	 */
756 	if (!str_ftnever) {
757 		mp = gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
758 		    frp, freebs_enqueue, KM_SLEEP);
759 
760 		STR_FTALLOC(&DB_FTHDR(mp), FTEV_ESBALLOC, size);
761 		return (mp);
762 	}
763 
764 	return (gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
765 	    frp, freebs_enqueue, KM_SLEEP));
766 }
767 
768 /*ARGSUSED*/
769 mblk_t *
770 desballoc(unsigned char *base, size_t size, uint_t pri, frtn_t *frp)
771 {
772 	mblk_t *mp;
773 
774 	/*
775 	 * Note that this is structured to allow the common case (i.e.
776 	 * STREAMS flowtracing disabled) to call gesballoc() with tail
777 	 * call optimization.
778 	 */
779 	if (!str_ftnever) {
780 		mp = gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
781 		    frp, dblk_lastfree_desb, KM_NOSLEEP);
782 
783 		if (mp != NULL)
784 			STR_FTALLOC(&DB_FTHDR(mp), FTEV_DESBALLOC, size);
785 		return (mp);
786 	}
787 
788 	return (gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
789 	    frp, dblk_lastfree_desb, KM_NOSLEEP));
790 }
791 
792 /*ARGSUSED*/
793 mblk_t *
794 esballoca(unsigned char *base, size_t size, uint_t pri, frtn_t *frp)
795 {
796 	mblk_t *mp;
797 
798 	/*
799 	 * Note that this is structured to allow the common case (i.e.
800 	 * STREAMS flowtracing disabled) to call gesballoc() with tail
801 	 * call optimization.
802 	 */
803 	if (!str_ftnever) {
804 		mp = gesballoc(base, size, DBLK_RTFU(2, M_DATA, 0, 0),
805 		    frp, freebs_enqueue, KM_NOSLEEP);
806 
807 		if (mp != NULL)
808 			STR_FTALLOC(&DB_FTHDR(mp), FTEV_ESBALLOCA, size);
809 		return (mp);
810 	}
811 
812 	return (gesballoc(base, size, DBLK_RTFU(2, M_DATA, 0, 0),
813 	    frp, freebs_enqueue, KM_NOSLEEP));
814 }
815 
816 /*ARGSUSED*/
817 mblk_t *
818 desballoca(unsigned char *base, size_t size, uint_t pri, frtn_t *frp)
819 {
820 	mblk_t *mp;
821 
822 	/*
823 	 * Note that this is structured to allow the common case (i.e.
824 	 * STREAMS flowtracing disabled) to call gesballoc() with tail
825 	 * call optimization.
826 	 */
827 	if (!str_ftnever) {
828 		mp = gesballoc(base, size, DBLK_RTFU(2, M_DATA, 0, 0),
829 		    frp, dblk_lastfree_desb, KM_NOSLEEP);
830 
831 		if (mp != NULL)
832 			STR_FTALLOC(&DB_FTHDR(mp), FTEV_DESBALLOCA, size);
833 		return (mp);
834 	}
835 
836 	return (gesballoc(base, size, DBLK_RTFU(2, M_DATA, 0, 0),
837 	    frp, dblk_lastfree_desb, KM_NOSLEEP));
838 }
839 
840 static void
841 bcache_dblk_lastfree(mblk_t *mp, dblk_t *dbp)
842 {
843 	bcache_t *bcp = dbp->db_cache;
844 
845 	ASSERT(dbp->db_mblk == mp);
846 	if (dbp->db_fthdr != NULL)
847 		str_ftfree(dbp);
848 
849 	/* set credp and projid to be 'unspecified' before returning to cache */
850 	if (dbp->db_credp != NULL) {
851 		crfree(dbp->db_credp);
852 		dbp->db_credp = NULL;
853 	}
854 	dbp->db_cpid = -1;
855 	dbp->db_struioflag = 0;
856 	dbp->db_struioun.cksum.flags = 0;
857 
858 	mutex_enter(&bcp->mutex);
859 	kmem_cache_free(bcp->dblk_cache, dbp);
860 	bcp->alloc--;
861 
862 	if (bcp->alloc == 0 && bcp->destroy != 0) {
863 		kmem_cache_destroy(bcp->dblk_cache);
864 		kmem_cache_destroy(bcp->buffer_cache);
865 		mutex_exit(&bcp->mutex);
866 		mutex_destroy(&bcp->mutex);
867 		kmem_free(bcp, sizeof (bcache_t));
868 	} else {
869 		mutex_exit(&bcp->mutex);
870 	}
871 }
872 
873 bcache_t *
874 bcache_create(char *name, size_t size, uint_t align)
875 {
876 	bcache_t *bcp;
877 	char buffer[255];
878 
879 	ASSERT((align & (align - 1)) == 0);
880 
881 	if ((bcp = kmem_alloc(sizeof (bcache_t), KM_NOSLEEP)) == NULL)
882 		return (NULL);
883 
884 	bcp->size = size;
885 	bcp->align = align;
886 	bcp->alloc = 0;
887 	bcp->destroy = 0;
888 
889 	mutex_init(&bcp->mutex, NULL, MUTEX_DRIVER, NULL);
890 
891 	(void) sprintf(buffer, "%s_buffer_cache", name);
892 	bcp->buffer_cache = kmem_cache_create(buffer, size, align, NULL, NULL,
893 	    NULL, NULL, NULL, 0);
894 	(void) sprintf(buffer, "%s_dblk_cache", name);
895 	bcp->dblk_cache = kmem_cache_create(buffer, sizeof (dblk_t),
896 	    DBLK_CACHE_ALIGN, bcache_dblk_constructor, bcache_dblk_destructor,
897 	    NULL, (void *)bcp, NULL, 0);
898 
899 	return (bcp);
900 }
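
/*
 * Sketch of the bcache interface (illustrative; the xx_ names are
 * hypothetical): a driver wanting fixed-size, aligned message buffers
 * creates a private cache at attach time and tears it down at detach.
 * bcache_destroy() defers the real teardown until the last
 * outstanding dblk is freed (see bcache_dblk_lastfree()):
 *
 *	bcp = bcache_create("xx_rx", XX_RBUF_SIZE, 64);
 *	...
 *	mp = bcache_allocb(bcp, BPRI_MED);	// NULL once destroy is set
 *	...
 *	bcache_destroy(bcp);
 */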
901 
902 void
903 bcache_destroy(bcache_t *bcp)
904 {
905 	ASSERT(bcp != NULL);
906 
907 	mutex_enter(&bcp->mutex);
908 	if (bcp->alloc == 0) {
909 		kmem_cache_destroy(bcp->dblk_cache);
910 		kmem_cache_destroy(bcp->buffer_cache);
911 		mutex_exit(&bcp->mutex);
912 		mutex_destroy(&bcp->mutex);
913 		kmem_free(bcp, sizeof (bcache_t));
914 	} else {
915 		bcp->destroy++;
916 		mutex_exit(&bcp->mutex);
917 	}
918 }
919 
920 /*ARGSUSED*/
921 mblk_t *
922 bcache_allocb(bcache_t *bcp, uint_t pri)
923 {
924 	dblk_t *dbp;
925 	mblk_t *mp = NULL;
926 
927 	ASSERT(bcp != NULL);
928 
929 	mutex_enter(&bcp->mutex);
930 	if (bcp->destroy != 0) {
931 		mutex_exit(&bcp->mutex);
932 		goto out;
933 	}
934 
935 	if ((dbp = kmem_cache_alloc(bcp->dblk_cache, KM_NOSLEEP)) == NULL) {
936 		mutex_exit(&bcp->mutex);
937 		goto out;
938 	}
939 	bcp->alloc++;
940 	mutex_exit(&bcp->mutex);
941 
942 	ASSERT(((uintptr_t)(dbp->db_base) & (bcp->align - 1)) == 0);
943 
944 	mp = dbp->db_mblk;
945 	DBLK_RTFU_WORD(dbp) = DBLK_RTFU(1, M_DATA, 0, 0);
946 	mp->b_next = mp->b_prev = mp->b_cont = NULL;
947 	mp->b_rptr = mp->b_wptr = dbp->db_base;
948 	mp->b_queue = NULL;
949 	MBLK_BAND_FLAG_WORD(mp) = 0;
950 	STR_FTALLOC(&dbp->db_fthdr, FTEV_BCALLOCB, bcp->size);
951 out:
952 	FTRACE_1("bcache_allocb(): mp=0x%p", (uintptr_t)mp);
953 
954 	return (mp);
955 }
956 
957 static void
958 dblk_lastfree_oversize(mblk_t *mp, dblk_t *dbp)
959 {
960 	ASSERT(dbp->db_mblk == mp);
961 	if (dbp->db_fthdr != NULL)
962 		str_ftfree(dbp);
963 
964 	/* set credp and projid to be 'unspecified' before returning to cache */
965 	if (dbp->db_credp != NULL) {
966 		crfree(dbp->db_credp);
967 		dbp->db_credp = NULL;
968 	}
969 	dbp->db_cpid = -1;
970 	dbp->db_struioflag = 0;
971 	dbp->db_struioun.cksum.flags = 0;
972 
973 	kmem_free(dbp->db_base, dbp->db_lim - dbp->db_base);
974 	kmem_cache_free(dbp->db_cache, dbp);
975 }
976 
977 static mblk_t *
978 allocb_oversize(size_t size, int kmflags)
979 {
980 	mblk_t *mp;
981 	void *buf;
982 
983 	size = P2ROUNDUP(size, DBLK_CACHE_ALIGN);
984 	if ((buf = kmem_alloc(size, kmflags)) == NULL)
985 		return (NULL);
986 	if ((mp = gesballoc(buf, size, DBLK_RTFU(1, M_DATA, 0, 0),
987 	    &frnop, dblk_lastfree_oversize, kmflags)) == NULL)
988 		kmem_free(buf, size);
989 
990 	if (mp != NULL)
991 		STR_FTALLOC(&DB_FTHDR(mp), FTEV_ALLOCBIG, size);
992 
993 	return (mp);
994 }
995 
996 mblk_t *
997 allocb_tryhard(size_t target_size)
998 {
999 	size_t size;
1000 	mblk_t *bp;
1001 
1002 	for (size = target_size; size < target_size + 512;
1003 	    size += DBLK_CACHE_ALIGN)
1004 		if ((bp = allocb(size, BPRI_HI)) != NULL)
1005 			return (bp);
1006 	allocb_tryhard_fails++;
1007 	return (NULL);
1008 }
1009 
1010 /*
1011  * This routine is consolidation private for STREAMS internal use
1012  * This routine may only be called from sync routines (i.e., not
1013  * from put or service procedures).  It is located here (rather
1014  * than strsubr.c) so that we don't have to expose all of the
1015  * allocb() implementation details in header files.
1016  */
1017 mblk_t *
1018 allocb_wait(size_t size, uint_t pri, uint_t flags, int *error)
1019 {
1020 	dblk_t *dbp;
1021 	mblk_t *mp;
1022 	size_t index;
1023 
1024 	index = (size - 1) >> DBLK_SIZE_SHIFT;
1025 
1026 	if (flags & STR_NOSIG) {
1027 		if (index >= (DBLK_MAX_CACHE >> DBLK_SIZE_SHIFT)) {
1028 			if (size != 0) {
1029 				mp = allocb_oversize(size, KM_SLEEP);
1030 				FTRACE_1("allocb_wait (NOSIG): mp=0x%lx",
1031 				    (uintptr_t)mp);
1032 				return (mp);
1033 			}
1034 			index = 0;
1035 		}
1036 
1037 		dbp = kmem_cache_alloc(dblk_cache[index], KM_SLEEP);
1038 		mp = dbp->db_mblk;
1039 		DBLK_RTFU_WORD(dbp) = DBLK_RTFU(1, M_DATA, 0, 0);
1040 		mp->b_next = mp->b_prev = mp->b_cont = NULL;
1041 		mp->b_rptr = mp->b_wptr = dbp->db_base;
1042 		mp->b_queue = NULL;
1043 		MBLK_BAND_FLAG_WORD(mp) = 0;
1044 		STR_FTALLOC(&DB_FTHDR(mp), FTEV_ALLOCBW, size);
1045 
1046 		FTRACE_1("allocb_wait (NOSIG): mp=0x%lx", (uintptr_t)mp);
1047 
1048 	} else {
1049 		while ((mp = allocb(size, pri)) == NULL) {
1050 			if ((*error = strwaitbuf(size, BPRI_HI)) != 0)
1051 				return (NULL);
1052 		}
1053 	}
1054 
1055 	return (mp);
1056 }
1057 
1058 /*
1059  * Call function 'func' with 'arg' when a class zero block can
1060  * be allocated with priority 'pri'.
1061  */
1062 bufcall_id_t
1063 esbbcall(uint_t pri, void (*func)(void *), void *arg)
1064 {
1065 	return (bufcall(1, pri, func, arg));
1066 }
1067 
1068 /*
1069  * Allocates an iocblk (M_IOCTL) block. Properly sets the credentials,
1070  * ioc_id, rval and error of the struct iocblk to set up an ioctl call.
1071  * This provides consistency for all internal allocators of ioctl.
1072  */
1073 mblk_t *
1074 mkiocb(uint_t cmd)
1075 {
1076 	struct iocblk	*ioc;
1077 	mblk_t		*mp;
1078 
1079 	/*
1080 	 * Allocate enough space for any of the ioctl related messages.
1081 	 */
1082 	if ((mp = allocb(sizeof (union ioctypes), BPRI_MED)) == NULL)
1083 		return (NULL);
1084 
1085 	bzero(mp->b_rptr, sizeof (union ioctypes));
1086 
1087 	/*
1088 	 * Set the mblk_t information and ptrs correctly.
1089 	 */
1090 	mp->b_wptr += sizeof (struct iocblk);
1091 	mp->b_datap->db_type = M_IOCTL;
1092 
1093 	/*
1094 	 * Fill in the fields.
1095 	 */
1096 	ioc		= (struct iocblk *)mp->b_rptr;
1097 	ioc->ioc_cmd	= cmd;
1098 	ioc->ioc_cr	= kcred;
1099 	ioc->ioc_id	= getiocseqno();
1100 	ioc->ioc_flag	= IOC_NATIVE;
1101 	return (mp);
1102 }
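
/*
 * Illustrative use of mkiocb() (XX_GETINFO and the xxp fields are
 * hypothetical): a module issues an internal ioctl downstream and
 * later matches the M_IOCACK/M_IOCNAK reply by ioc_id:
 *
 *	if ((mp = mkiocb(XX_GETINFO)) == NULL)
 *		return (ENOSR);
 *	xxp->xx_ioc_id = ((struct iocblk *)mp->b_rptr)->ioc_id;
 *	putnext(WR(q), mp);
 */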
1103 
1104 /*
1105  * Test whether a block of the given size can be allocated with a
1106  * request of the given priority.
1107  * 'pri' is no longer used, but is retained for compatibility.
1108  */
1109 /* ARGSUSED */
1110 int
1111 testb(size_t size, uint_t pri)
1112 {
1113 	return ((size + sizeof (dblk_t)) <= kmem_avail());
1114 }
1115 
1116 /*
1117  * Call function 'func' with argument 'arg' when there is a reasonably
1118  * good chance that a block of size 'size' can be allocated.
1119  * 'pri' is no longer used, but is retained for compatibility.
1120  */
1121 /* ARGSUSED */
1122 bufcall_id_t
1123 bufcall(size_t size, uint_t pri, void (*func)(void *), void *arg)
1124 {
1125 	static long bid = 1;	/* always odd to save checking for zero */
1126 	bufcall_id_t bc_id;
1127 	struct strbufcall *bcp;
1128 
1129 	if ((bcp = kmem_alloc(sizeof (strbufcall_t), KM_NOSLEEP)) == NULL)
1130 		return (0);
1131 
1132 	bcp->bc_func = func;
1133 	bcp->bc_arg = arg;
1134 	bcp->bc_size = size;
1135 	bcp->bc_next = NULL;
1136 	bcp->bc_executor = NULL;
1137 
1138 	mutex_enter(&strbcall_lock);
1139 	/*
1140 	 * After bcp is linked into strbcalls and strbcall_lock is dropped there
1141 	 * should be no references to bcp since it may be freed by
1142 	 * runbufcalls(). Since bcp_id field is returned, we save its value in
1143 	 * the local var.
1144 	 */
1145 	bc_id = bcp->bc_id = (bufcall_id_t)(bid += 2);	/* keep it odd */
1146 
1147 	/*
1148 	 * add newly allocated stream event to existing
1149 	 * linked list of events.
1150 	 */
1151 	if (strbcalls.bc_head == NULL) {
1152 		strbcalls.bc_head = strbcalls.bc_tail = bcp;
1153 	} else {
1154 		strbcalls.bc_tail->bc_next = bcp;
1155 		strbcalls.bc_tail = bcp;
1156 	}
1157 
1158 	cv_signal(&strbcall_cv);
1159 	mutex_exit(&strbcall_lock);
1160 	return (bc_id);
1161 }
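
/*
 * The canonical recovery pattern for a failed allocation (sketch;
 * xxp->xx_bufcall_id is hypothetical) arranges for the queue to be
 * re-enabled, and the allocation retried, once memory is available:
 *
 *	if ((mp = allocb(size, BPRI_MED)) == NULL) {
 *		xxp->xx_bufcall_id = bufcall(size, BPRI_MED,
 *		    (void (*)(void *))qenable, q);
 *		return;		// retry from the service procedure
 *	}
 */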
1162 
1163 /*
1164  * Cancel a bufcall request.
1165  */
1166 void
1167 unbufcall(bufcall_id_t id)
1168 {
1169 	strbufcall_t *bcp, *pbcp;
1170 
1171 	mutex_enter(&strbcall_lock);
1172 again:
1173 	pbcp = NULL;
1174 	for (bcp = strbcalls.bc_head; bcp; bcp = bcp->bc_next) {
1175 		if (id == bcp->bc_id)
1176 			break;
1177 		pbcp = bcp;
1178 	}
1179 	if (bcp) {
1180 		if (bcp->bc_executor != NULL) {
1181 			if (bcp->bc_executor != curthread) {
1182 				cv_wait(&bcall_cv, &strbcall_lock);
1183 				goto again;
1184 			}
1185 		} else {
1186 			if (pbcp)
1187 				pbcp->bc_next = bcp->bc_next;
1188 			else
1189 				strbcalls.bc_head = bcp->bc_next;
1190 			if (bcp == strbcalls.bc_tail)
1191 				strbcalls.bc_tail = pbcp;
1192 			kmem_free(bcp, sizeof (strbufcall_t));
1193 		}
1194 	}
1195 	mutex_exit(&strbcall_lock);
1196 }
1197 
1198 /*
1199  * Duplicate a message block by block (uses dupb), returning
1200  * a pointer to the duplicate message.
1201  * Returns a non-NULL value only if the entire message
1202  * was dup'd.
1203  */
1204 mblk_t *
1205 dupmsg(mblk_t *bp)
1206 {
1207 	mblk_t *head, *nbp;
1208 
1209 	if (!bp || !(nbp = head = dupb(bp)))
1210 		return (NULL);
1211 
1212 	while (bp->b_cont) {
1213 		if (!(nbp->b_cont = dupb(bp->b_cont))) {
1214 			freemsg(head);
1215 			return (NULL);
1216 		}
1217 		nbp = nbp->b_cont;
1218 		bp = bp->b_cont;
1219 	}
1220 	return (head);
1221 }
1222 
1223 #define	DUPB_NOLOAN(bp) \
1224 	((((bp)->b_datap->db_struioflag & STRUIO_ZC) != 0) ? \
1225 	copyb((bp)) : dupb((bp)))
1226 
1227 mblk_t *
1228 dupmsg_noloan(mblk_t *bp)
1229 {
1230 	mblk_t *head, *nbp;
1231 
1232 	if (bp == NULL || DB_TYPE(bp) != M_DATA ||
1233 	    ((nbp = head = DUPB_NOLOAN(bp)) == NULL))
1234 		return (NULL);
1235 
1236 	while (bp->b_cont) {
1237 		if ((nbp->b_cont = DUPB_NOLOAN(bp->b_cont)) == NULL) {
1238 			freemsg(head);
1239 			return (NULL);
1240 		}
1241 		nbp = nbp->b_cont;
1242 		bp = bp->b_cont;
1243 	}
1244 	return (head);
1245 }
1246 
1247 /*
1248  * Copy data from message and data block to newly allocated message and
1249  * data block. Returns new message block pointer, or NULL if error.
1250  * The alignment of rptr (w.r.t. word alignment) will be the same in the copy
1251  * as in the original even when db_base is not word aligned. (bug 1052877)
1252  */
1253 mblk_t *
1254 copyb(mblk_t *bp)
1255 {
1256 	mblk_t	*nbp;
1257 	dblk_t	*dp, *ndp;
1258 	uchar_t *base;
1259 	size_t	size;
1260 	size_t	unaligned;
1261 
1262 	ASSERT(bp->b_wptr >= bp->b_rptr);
1263 
1264 	dp = bp->b_datap;
1265 	if (dp->db_fthdr != NULL)
1266 		STR_FTEVENT_MBLK(bp, caller(), FTEV_COPYB, 0);
1267 
1268 	/*
1269 	 * Special handling for Multidata message; this should be
1270 	 * removed once a copy-callback routine is made available.
1271 	 */
1272 	if (dp->db_type == M_MULTIDATA) {
1273 		cred_t *cr;
1274 
1275 		if ((nbp = mmd_copy(bp, KM_NOSLEEP)) == NULL)
1276 			return (NULL);
1277 
1278 		nbp->b_flag = bp->b_flag;
1279 		nbp->b_band = bp->b_band;
1280 		ndp = nbp->b_datap;
1281 
1282 		/* See comments below on potential issues. */
1283 		STR_FTEVENT_MBLK(nbp, caller(), FTEV_COPYB, 1);
1284 
1285 		ASSERT(ndp->db_type == dp->db_type);
1286 		cr = dp->db_credp;
1287 		if (cr != NULL)
1288 			crhold(ndp->db_credp = cr);
1289 		ndp->db_cpid = dp->db_cpid;
1290 		return (nbp);
1291 	}
1292 
1293 	size = dp->db_lim - dp->db_base;
1294 	unaligned = P2PHASE((uintptr_t)dp->db_base, sizeof (uint_t));
1295 	if ((nbp = allocb_tmpl(size + unaligned, bp)) == NULL)
1296 		return (NULL);
1297 	nbp->b_flag = bp->b_flag;
1298 	nbp->b_band = bp->b_band;
1299 	ndp = nbp->b_datap;
1300 
1301 	/*
1302 	 * Well, here is a potential issue.  If we are trying to
1303 	 * trace a flow, and we copy the message, we might lose
1304 	 * information about where this message might have been.
1305 	 * So we should inherit the FT data.  On the other hand,
1306 	 * a user might be interested only in alloc to free data.
1307 	 * So I guess the real answer is to provide a tunable.
1308 	 */
1309 	STR_FTEVENT_MBLK(nbp, caller(), FTEV_COPYB, 1);
1310 
1311 	base = ndp->db_base + unaligned;
1312 	bcopy(dp->db_base, ndp->db_base + unaligned, size);
1313 
1314 	nbp->b_rptr = base + (bp->b_rptr - dp->db_base);
1315 	nbp->b_wptr = nbp->b_rptr + MBLKL(bp);
1316 
1317 	return (nbp);
1318 }
1319 
1320 /*
1321  * Copy data from message to newly allocated message using new
1322  * data blocks.  Returns a pointer to the new message, or NULL if error.
1323  */
1324 mblk_t *
1325 copymsg(mblk_t *bp)
1326 {
1327 	mblk_t *head, *nbp;
1328 
1329 	if (!bp || !(nbp = head = copyb(bp)))
1330 		return (NULL);
1331 
1332 	while (bp->b_cont) {
1333 		if (!(nbp->b_cont = copyb(bp->b_cont))) {
1334 			freemsg(head);
1335 			return (NULL);
1336 		}
1337 		nbp = nbp->b_cont;
1338 		bp = bp->b_cont;
1339 	}
1340 	return (head);
1341 }
1342 
1343 /*
1344  * link a message block to tail of message
1345  */
1346 void
1347 linkb(mblk_t *mp, mblk_t *bp)
1348 {
1349 	ASSERT(mp && bp);
1350 
1351 	for (; mp->b_cont; mp = mp->b_cont)
1352 		;
1353 	mp->b_cont = bp;
1354 }
1355 
1356 /*
1357  * unlink a message block from head of message
1358  * return pointer to new message.
1359  * NULL if message becomes empty.
1360  */
1361 mblk_t *
1362 unlinkb(mblk_t *bp)
1363 {
1364 	mblk_t *bp1;
1365 
1366 	bp1 = bp->b_cont;
1367 	bp->b_cont = NULL;
1368 	return (bp1);
1369 }
1370 
1371 /*
1372  * remove a message block "bp" from message "mp"
1373  *
1374  * Return pointer to new message or NULL if no message remains.
1375  * Return -1 if bp is not found in message.
1376  */
1377 mblk_t *
1378 rmvb(mblk_t *mp, mblk_t *bp)
1379 {
1380 	mblk_t *tmp;
1381 	mblk_t *lastp = NULL;
1382 
1383 	ASSERT(mp && bp);
1384 	for (tmp = mp; tmp; tmp = tmp->b_cont) {
1385 		if (tmp == bp) {
1386 			if (lastp)
1387 				lastp->b_cont = tmp->b_cont;
1388 			else
1389 				mp = tmp->b_cont;
1390 			tmp->b_cont = NULL;
1391 			return (mp);
1392 		}
1393 		lastp = tmp;
1394 	}
1395 	return ((mblk_t *)-1);
1396 }
1397 
1398 /*
1399  * Concatenate and align the first len bytes of a common
1400  * message type.  Len == -1 means concatenate everything.
1401  * Returns 1 on success, 0 on failure
1402  * After the pullup, mp points to the pulled up data.
1403  */
1404 int
1405 pullupmsg(mblk_t *mp, ssize_t len)
1406 {
1407 	mblk_t *bp, *b_cont;
1408 	dblk_t *dbp;
1409 	ssize_t n;
1410 
1411 	ASSERT(mp->b_datap->db_ref > 0);
1412 	ASSERT(mp->b_next == NULL && mp->b_prev == NULL);
1413 
1414 	/*
1415 	 * We won't handle Multidata messages, since they contain
1416 	 * metadata which this function has no knowledge of; we
1417 	 * assert on DEBUG, and return failure otherwise.
1418 	 */
1419 	ASSERT(mp->b_datap->db_type != M_MULTIDATA);
1420 	if (mp->b_datap->db_type == M_MULTIDATA)
1421 		return (0);
1422 
1423 	if (len == -1) {
1424 		if (mp->b_cont == NULL && str_aligned(mp->b_rptr))
1425 			return (1);
1426 		len = xmsgsize(mp);
1427 	} else {
1428 		ssize_t first_mblk_len = mp->b_wptr - mp->b_rptr;
1429 		ASSERT(first_mblk_len >= 0);
1430 		/*
1431 		 * If the length is less than that of the first mblk,
1432 		 * we want to pull up the message into an aligned mblk.
1433 		 * Though not part of the spec, some callers assume it.
1434 		 */
1435 		if (len <= first_mblk_len) {
1436 			if (str_aligned(mp->b_rptr))
1437 				return (1);
1438 			len = first_mblk_len;
1439 		} else if (xmsgsize(mp) < len)
1440 			return (0);
1441 	}
1442 
1443 	if ((bp = allocb_tmpl(len, mp)) == NULL)
1444 		return (0);
1445 
1446 	dbp = bp->b_datap;
1447 	*bp = *mp;		/* swap mblks so bp heads the old msg... */
1448 	mp->b_datap = dbp;	/* ... and mp heads the new message */
1449 	mp->b_datap->db_mblk = mp;
1450 	bp->b_datap->db_mblk = bp;
1451 	mp->b_rptr = mp->b_wptr = dbp->db_base;
1452 
1453 	do {
1454 		ASSERT(bp->b_datap->db_ref > 0);
1455 		ASSERT(bp->b_wptr >= bp->b_rptr);
1456 		n = MIN(bp->b_wptr - bp->b_rptr, len);
1457 		bcopy(bp->b_rptr, mp->b_wptr, (size_t)n);
1458 		mp->b_wptr += n;
1459 		bp->b_rptr += n;
1460 		len -= n;
1461 		if (bp->b_rptr != bp->b_wptr)
1462 			break;
1463 		b_cont = bp->b_cont;
1464 		freeb(bp);
1465 		bp = b_cont;
1466 	} while (len && bp);
1467 
1468 	mp->b_cont = bp;	/* tack on whatever wasn't pulled up */
1469 
1470 	return (1);
1471 }
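
/*
 * Typical (illustrative) caller of pullupmsg(): guarantee that a
 * protocol header is contiguous and aligned before casting b_rptr;
 * xx_hdr_t is hypothetical:
 *
 *	if ((MBLKL(mp) < sizeof (xx_hdr_t) || !OK_32PTR(mp->b_rptr)) &&
 *	    !pullupmsg(mp, sizeof (xx_hdr_t))) {
 *		freemsg(mp);
 *		return;
 *	}
 *	hdr = (xx_hdr_t *)mp->b_rptr;
 */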
1472 
1473 /*
1474  * Concatenate and align at least the first len bytes of common message
1475  * type.  Len == -1 means concatenate everything.  The original message is
1476  * unaltered.  Returns a pointer to a new message on success, otherwise
1477  * returns NULL.
1478  */
1479 mblk_t *
1480 msgpullup(mblk_t *mp, ssize_t len)
1481 {
1482 	mblk_t	*newmp;
1483 	ssize_t	totlen;
1484 	ssize_t	n;
1485 
1486 	/*
1487 	 * We won't handle Multidata messages, since they contain
1488 	 * metadata which this function has no knowledge of; we
1489 	 * assert on DEBUG, and return failure otherwise.
1490 	 */
1491 	ASSERT(mp->b_datap->db_type != M_MULTIDATA);
1492 	if (mp->b_datap->db_type == M_MULTIDATA)
1493 		return (NULL);
1494 
1495 	totlen = xmsgsize(mp);
1496 
1497 	if ((len > 0) && (len > totlen))
1498 		return (NULL);
1499 
1500 	/*
1501 	 * Copy all of the first msg type into one new mblk, then dupmsg
1502 	 * and link the rest onto this.
1503 	 */
1504 
1505 	len = totlen;
1506 
1507 	if ((newmp = allocb_tmpl(len, mp)) == NULL)
1508 		return (NULL);
1509 
1510 	newmp->b_flag = mp->b_flag;
1511 	newmp->b_band = mp->b_band;
1512 
1513 	while (len > 0) {
1514 		n = mp->b_wptr - mp->b_rptr;
1515 		ASSERT(n >= 0);		/* allow zero-length mblk_t's */
1516 		if (n > 0)
1517 			bcopy(mp->b_rptr, newmp->b_wptr, n);
1518 		newmp->b_wptr += n;
1519 		len -= n;
1520 		mp = mp->b_cont;
1521 	}
1522 
1523 	if (mp != NULL) {
1524 		newmp->b_cont = dupmsg(mp);
1525 		if (newmp->b_cont == NULL) {
1526 			freemsg(newmp);
1527 			return (NULL);
1528 		}
1529 	}
1530 
1531 	return (newmp);
1532 }
1533 
1534 /*
1535  * Trim bytes from message
1536  *  len > 0, trim from head
1537  *  len < 0, trim from tail
1538  * Returns 1 on success, 0 on failure.
1539  */
1540 int
1541 adjmsg(mblk_t *mp, ssize_t len)
1542 {
1543 	mblk_t *bp;
1544 	mblk_t *save_bp = NULL;
1545 	mblk_t *prev_bp;
1546 	mblk_t *bcont;
1547 	unsigned char type;
1548 	ssize_t n;
1549 	int fromhead;
1550 	int first;
1551 
1552 	ASSERT(mp != NULL);
1553 	/*
1554 	 * We won't handle Multidata messages, since they contain
1555 	 * metadata which this function has no knowledge of; we
1556 	 * assert on DEBUG, and return failure otherwise.
1557 	 */
1558 	ASSERT(mp->b_datap->db_type != M_MULTIDATA);
1559 	if (mp->b_datap->db_type == M_MULTIDATA)
1560 		return (0);
1561 
1562 	if (len < 0) {
1563 		fromhead = 0;
1564 		len = -len;
1565 	} else {
1566 		fromhead = 1;
1567 	}
1568 
1569 	if (xmsgsize(mp) < len)
1570 		return (0);
1571 
1572 	if (fromhead) {
1573 		first = 1;
1574 		while (len) {
1575 			ASSERT(mp->b_wptr >= mp->b_rptr);
1576 			n = MIN(mp->b_wptr - mp->b_rptr, len);
1577 			mp->b_rptr += n;
1578 			len -= n;
1579 
1580 			/*
1581 			 * If this mblk is now empty and is not the
1582 			 * first in the chain, remove it.
1583 			 */
1584 			if (!first && (mp->b_wptr == mp->b_rptr)) {
1585 				bcont = mp->b_cont;
1586 				freeb(mp);
1587 				mp = save_bp->b_cont = bcont;
1588 			} else {
1589 				save_bp = mp;
1590 				mp = mp->b_cont;
1591 			}
1592 			first = 0;
1593 		}
1594 	} else {
1595 		type = mp->b_datap->db_type;
1596 		while (len) {
1597 			bp = mp;
1598 			save_bp = NULL;
1599 
1600 			/*
1601 			 * Find the last message of same type
1602 			 */
1603 			while (bp && bp->b_datap->db_type == type) {
1604 				ASSERT(bp->b_wptr >= bp->b_rptr);
1605 				prev_bp = save_bp;
1606 				save_bp = bp;
1607 				bp = bp->b_cont;
1608 			}
1609 			if (save_bp == NULL)
1610 				break;
1611 			n = MIN(save_bp->b_wptr - save_bp->b_rptr, len);
1612 			save_bp->b_wptr -= n;
1613 			len -= n;
1614 
1615 			/*
1616 			 * If this is not the first message
1617 			 * and we have taken away everything
1618 			 * from this message, remove it
1619 			 */
1620 
1621 			if ((save_bp != mp) &&
1622 			    (save_bp->b_wptr == save_bp->b_rptr)) {
1623 				bcont = save_bp->b_cont;
1624 				freeb(save_bp);
1625 				prev_bp->b_cont = bcont;
1626 			}
1627 		}
1628 	}
1629 	return (1);
1630 }
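
/*
 * Illustrative use of adjmsg() (xx_hdr_t and XX_CKSUM_LEN are
 * hypothetical): strip a leading header, or chop a trailing
 * checksum, without copying any data:
 *
 *	if (!adjmsg(mp, sizeof (xx_hdr_t)))	// trim from the head
 *		goto bad;
 *	if (!adjmsg(mp, -XX_CKSUM_LEN))		// trim from the tail
 *		goto bad;
 */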
1631 
1632 /*
1633  * get number of data bytes in message
1634  */
1635 size_t
1636 msgdsize(mblk_t *bp)
1637 {
1638 	size_t count = 0;
1639 
1640 	for (; bp; bp = bp->b_cont)
1641 		if (bp->b_datap->db_type == M_DATA) {
1642 			ASSERT(bp->b_wptr >= bp->b_rptr);
1643 			count += bp->b_wptr - bp->b_rptr;
1644 		}
1645 	return (count);
1646 }
1647 
1648 /*
1649  * Get a message off head of queue
1650  *
1651  * If queue has no buffers then mark queue
1652  * with QWANTR. (queue wants to be read by
1653  * someone when data becomes available)
1654  *
1655  * If there is something to take off then do so.
1656  * If queue falls below hi water mark turn off QFULL
1657  * flag.  Decrement weighted count of queue.
1658  * Also turn off QWANTR because queue is being read.
1659  *
1660  * The queue count is maintained on a per-band basis.
1661  * Priority band 0 (normal messages) uses q_count,
1662  * q_lowat, etc.  Non-zero priority bands use the
1663  * fields in their respective qband structures
1664  * (qb_count, qb_lowat, etc.)  All messages appear
1665  * on the same list, linked via their b_next pointers.
1666  * q_first is the head of the list.  q_count does
1667  * not reflect the size of all the messages on the
1668  * queue.  It only reflects those messages in the
1669  * normal band of flow.  The one exception to this
1670  * deals with high priority messages.  They are in
1671  * their own conceptual "band", but are accounted
1672  * against q_count.
1673  *
1674  * If queue count is below the lo water mark and QWANTW
1675  * is set, enable the closest backq which has a service
1676  * procedure and turn off the QWANTW flag.
1677  *
1678  * getq could be built on top of rmvq, but isn't because
1679  * of performance considerations.
1680  *
1681  * A note on the use of q_count and q_mblkcnt:
1682  *   q_count is the traditional byte count for messages that
1683  *   have been put on a queue.  Documentation tells us that
1684  *   we shouldn't rely on that count, but some drivers/modules
1685  *   do.  What was needed, however, is a mechanism to prevent
1686  *   runaway streams from consuming all of the resources,
1687  *   and particularly be able to flow control zero-length
1688  *   messages.  q_mblkcnt is used for this purpose.  It
1689  *   counts the number of mblk's that are being put on
1690  *   the queue.  The intention here, is that each mblk should
1691  *   contain one byte of data and, for the purpose of
1692  *   flow-control, logically does.  A queue will become
1693  *   full when EITHER of these values (q_count and q_mblkcnt)
1694  *   reach the highwater mark.  It will clear when BOTH
1695  *   of them drop below the highwater mark.  And it will
1696  *   backenable when BOTH of them drop below the lowwater
1697  *   mark.
1698  *   With this algorithm, a driver/module might be able
1699  *   to find a reasonably accurate q_count, and the
1700  *   framework can still try and limit resource usage.
1701  */
1702 mblk_t *
1703 getq(queue_t *q)
1704 {
1705 	mblk_t *bp;
1706 	uchar_t band = 0;
1707 
1708 	bp = getq_noenab(q, 0);
1709 	if (bp != NULL)
1710 		band = bp->b_band;
1711 
1712 	/*
1713 	 * Inlined from qbackenable().
1714 	 * Quick check without holding the lock.
1715 	 */
1716 	if (band == 0 && (q->q_flag & (QWANTW|QWANTWSYNC)) == 0)
1717 		return (bp);
1718 
1719 	qbackenable(q, band);
1720 	return (bp);
1721 }
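
/*
 * getq() is the heart of the canonical service procedure loop
 * (sketch; xxsrv is hypothetical).  putbq() re-queues a message the
 * stream cannot accept yet; the framework backenables this queue
 * when the downstream queue drains:
 *
 *	static int
 *	xxsrv(queue_t *q)
 *	{
 *		mblk_t *mp;
 *
 *		while ((mp = getq(q)) != NULL) {
 *			if (!canputnext(q)) {
 *				(void) putbq(q, mp);
 *				break;
 *			}
 *			putnext(q, mp);
 *		}
 *		return (0);
 *	}
 */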
1722 
1723 /*
1724  * Calculate number of data bytes in a single data message block taking
1725  * multidata messages into account.
1726  */
1727 
1728 #define	ADD_MBLK_SIZE(mp, size) 					\
1729 	if (DB_TYPE(mp) != M_MULTIDATA) {				\
1730 		(size) += MBLKL(mp);					\
1731 	} else {							\
1732 		uint_t	pinuse;						\
1733 									\
1734 		mmd_getsize(mmd_getmultidata(mp), NULL, &pinuse);	\
1735 		(size) += pinuse;					\
1736 	}
1737 
1738 /*
1739  * Returns the number of bytes in a message (a message is defined as a
1740  * chain of mblks linked by b_cont). If a non-NULL mblkcnt is supplied we
1741  * also return the number of distinct mblks in the message.
1742  */
1743 int
1744 mp_cont_len(mblk_t *bp, int *mblkcnt)
1745 {
1746 	mblk_t	*mp;
1747 	int	mblks = 0;
1748 	int	bytes = 0;
1749 
1750 	for (mp = bp; mp != NULL; mp = mp->b_cont) {
1751 		ADD_MBLK_SIZE(mp, bytes);
1752 		mblks++;
1753 	}
1754 
1755 	if (mblkcnt != NULL)
1756 		*mblkcnt = mblks;
1757 
1758 	return (bytes);
1759 }
1760 
1761 /*
1762  * Like getq() but does not backenable.  This is used by the stream
1763  * head when a putback() is likely.  The caller must call qbackenable()
1764  * after it is done with accessing the queue.
1765  * The rbytes argument to getq_noenab() allows callers to specify
1766  * the maximum number of bytes to return. If the current amount on the
1767  * queue is less than this then the entire message will be returned.
1768  * A value of 0 returns the entire message and is equivalent to the old
1769  * default behaviour prior to the addition of the rbytes argument.
1770  */
1771 mblk_t *
1772 getq_noenab(queue_t *q, ssize_t rbytes)
1773 {
1774 	mblk_t *bp, *mp1;
1775 	mblk_t *mp2 = NULL;
1776 	qband_t *qbp;
1777 	kthread_id_t freezer;
1778 	int	bytecnt = 0, mblkcnt = 0;
1779 
1780 	/* freezestr should allow its caller to call getq/putq */
1781 	freezer = STREAM(q)->sd_freezer;
1782 	if (freezer == curthread) {
1783 		ASSERT(frozenstr(q));
1784 		ASSERT(MUTEX_HELD(QLOCK(q)));
1785 	} else
1786 		mutex_enter(QLOCK(q));
1787 
1788 	if ((bp = q->q_first) == NULL) {
1789 		q->q_flag |= QWANTR;
1790 	} else {
1791 		/*
1792 		 * If the caller supplied a byte threshold and there is
1793 		 * more than this amount on the queue, then break up the
1794 		 * message appropriately.  We can only safely do
1795 		 * this for M_DATA messages.
1796 		 */
1797 		if ((DB_TYPE(bp) == M_DATA) && (rbytes > 0) &&
1798 		    (q->q_count > rbytes)) {
1799 			/*
1800 			 * Inline version of mp_cont_len() which terminates
1801 			 * when we meet or exceed rbytes.
1802 			 */
1803 			for (mp1 = bp; mp1 != NULL; mp1 = mp1->b_cont) {
1804 				mblkcnt++;
1805 				ADD_MBLK_SIZE(mp1, bytecnt);
1806 				if (bytecnt >= rbytes)
1807 					break;
1808 			}
1809 			/*
1810 			 * We need to account for the following scenarios:
1811 			 *
1812 			 * 1) Too much data in the first message:
1813 			 *	mp1 will be the mblk which puts us over our
1814 			 *	byte limit.
1815 			 * 2) Not enough data in the first message:
1816 			 *	mp1 will be NULL.
1817 			 * 3) Exactly the right amount of data contained within
1818 			 *    whole mblks:
1819 			 *	mp1->b_cont will be where we break the message.
1820 			 */
1821 			if (bytecnt > rbytes) {
1822 				/*
1823 				 * Dup/copy mp1 and put what we don't need
1824 				 * back onto the queue. Adjust the read/write
1825 				 * and continuation pointers appropriately
1826 				 * and decrement the current mblk count to
1827 				 * reflect we are putting an mblk back onto
1828 				 * the queue.
1829 				 * When adjusting the message pointers, it's
1830 				 * OK to use the existing bytecnt and the
1831 				 * requested amount (rbytes) to calculate the
1832 				 * new write offset (b_wptr) of what we
1833 				 * are taking. However, we cannot use these
1834 				 * values when calculating the read offset of
1835 				 * the mblk we are putting back on the queue.
1836 				 * This is because the beginning (b_rptr) of the
1837 				 * mblk represents some arbitrary point within
1838 				 * the message.
1839 				 * It's simplest to do this by advancing b_rptr
1840 				 * by the new length of mp1 as we don't have to
1841 				 * remember any intermediate state.
1842 				 */
1843 				ASSERT(mp1 != NULL);
1844 				mblkcnt--;
1845 				if ((mp2 = dupb(mp1)) == NULL &&
1846 				    (mp2 = copyb(mp1)) == NULL) {
1847 					bytecnt = mblkcnt = 0;
1848 					goto dup_failed;
1849 				}
1850 				mp2->b_cont = mp1->b_cont;
1851 				mp1->b_wptr -= bytecnt - rbytes;
1852 				mp2->b_rptr += mp1->b_wptr - mp1->b_rptr;
1853 				mp1->b_cont = NULL;
1854 				bytecnt = rbytes;
1855 			} else {
1856 				/*
1857 				 * Either there is not enough data in the first
1858 				 * message or there is no excess data to deal
1859 				 * with. If mp1 is NULL, we are taking the
1860 				 * whole message. No need to do anything.
1861 				 * Otherwise we assign mp1->b_cont to mp2 as
1862 				 * we will be putting this back onto the head of
1863 				 * the queue.
1864 				 */
1865 				if (mp1 != NULL) {
1866 					mp2 = mp1->b_cont;
1867 					mp1->b_cont = NULL;
1868 				}
1869 			}
1870 			/*
1871 			 * If mp2 is not NULL then we have part of the message
1872 			 * to put back onto the queue.
1873 			 */
1874 			if (mp2 != NULL) {
1875 				if ((mp2->b_next = bp->b_next) == NULL)
1876 					q->q_last = mp2;
1877 				else
1878 					bp->b_next->b_prev = mp2;
1879 				q->q_first = mp2;
1880 			} else {
1881 				if ((q->q_first = bp->b_next) == NULL)
1882 					q->q_last = NULL;
1883 				else
1884 					q->q_first->b_prev = NULL;
1885 			}
1886 		} else {
1887 			/*
1888 			 * Either no byte threshold was supplied, there is
1889 			 * not enough on the queue or we failed to
1890 			 * duplicate/copy a data block. In these cases we
1891 			 * just take the entire first message.
1892 			 */
1893 dup_failed:
1894 			bytecnt = mp_cont_len(bp, &mblkcnt);
1895 			if ((q->q_first = bp->b_next) == NULL)
1896 				q->q_last = NULL;
1897 			else
1898 				q->q_first->b_prev = NULL;
1899 		}
1900 		if (bp->b_band == 0) {
1901 			q->q_count -= bytecnt;
1902 			q->q_mblkcnt -= mblkcnt;
1903 			if (q->q_mblkcnt == 0 || ((q->q_count < q->q_hiwat) &&
1904 			    (q->q_mblkcnt < q->q_hiwat))) {
1905 				q->q_flag &= ~QFULL;
1906 			}
1907 		} else {
1908 			int i;
1909 
1910 			ASSERT(bp->b_band <= q->q_nband);
1911 			ASSERT(q->q_bandp != NULL);
1912 			ASSERT(MUTEX_HELD(QLOCK(q)));
1913 			qbp = q->q_bandp;
1914 			i = bp->b_band;
1915 			while (--i > 0)
1916 				qbp = qbp->qb_next;
1917 			if (qbp->qb_first == qbp->qb_last) {
1918 				qbp->qb_first = NULL;
1919 				qbp->qb_last = NULL;
1920 			} else {
1921 				qbp->qb_first = bp->b_next;
1922 			}
1923 			qbp->qb_count -= bytecnt;
1924 			qbp->qb_mblkcnt -= mblkcnt;
1925 			if (qbp->qb_mblkcnt == 0 ||
1926 			    ((qbp->qb_count < qbp->qb_hiwat) &&
1927 			    (qbp->qb_mblkcnt < qbp->qb_hiwat))) {
1928 				qbp->qb_flag &= ~QB_FULL;
1929 			}
1930 		}
1931 		q->q_flag &= ~QWANTR;
1932 		bp->b_next = NULL;
1933 		bp->b_prev = NULL;
1934 	}
1935 	if (freezer != curthread)
1936 		mutex_exit(QLOCK(q));
1937 
1938 	STR_FTEVENT_MSG(bp, q, FTEV_GETQ, NULL);
1939 
1940 	return (bp);
1941 }
1942 
1943 /*
1944  * Determine if a backenable is needed after removing a message in the
1945  * specified band.
1946  * NOTE: This routine assumes that something like getq_noenab() has
1947  * already been called.
1948  *
1949  * For the read side it is ok to hold sd_lock across calling this (and the
1950  * stream head often does).
1951  * But for the write side strwakeq might be invoked and it acquires sd_lock.
1952  */
1953 void
1954 qbackenable(queue_t *q, uchar_t band)
1955 {
1956 	int backenab = 0;
1957 	qband_t *qbp;
1958 	kthread_id_t freezer;
1959 
1960 	ASSERT(q);
1961 	ASSERT((q->q_flag & QREADR) || MUTEX_NOT_HELD(&STREAM(q)->sd_lock));
1962 
1963 	/*
1964 	 * Quick check without holding the lock.
1965 	 * OK since after getq() has lowered the q_count these flags
1966 	 * would not change unless either the qbackenable() is done by
1967 	 * another thread (which is ok) or the queue has gotten QFULL
1968 	 * in which case another backenable will take place when the queue
1969 	 * drops below q_lowat.
1970 	 */
1971 	if (band == 0 && (q->q_flag & (QWANTW|QWANTWSYNC)) == 0)
1972 		return;
1973 
1974 	/* freezestr should allow its caller to call getq/putq */
1975 	freezer = STREAM(q)->sd_freezer;
1976 	if (freezer == curthread) {
1977 		ASSERT(frozenstr(q));
1978 		ASSERT(MUTEX_HELD(QLOCK(q)));
1979 	} else
1980 		mutex_enter(QLOCK(q));
1981 
1982 	if (band == 0) {
1983 		if (q->q_lowat == 0 || (q->q_count < q->q_lowat &&
1984 		    q->q_mblkcnt < q->q_lowat)) {
1985 			backenab = q->q_flag & (QWANTW|QWANTWSYNC);
1986 		}
1987 	} else {
1988 		int i;
1989 
1990 		ASSERT((unsigned)band <= q->q_nband);
1991 		ASSERT(q->q_bandp != NULL);
1992 
1993 		qbp = q->q_bandp;
1994 		i = band;
1995 		while (--i > 0)
1996 			qbp = qbp->qb_next;
1997 
1998 		if (qbp->qb_lowat == 0 || (qbp->qb_count < qbp->qb_lowat &&
1999 		    qbp->qb_mblkcnt < qbp->qb_lowat)) {
2000 			backenab = qbp->qb_flag & QB_WANTW;
2001 		}
2002 	}
2003 
2004 	if (backenab == 0) {
2005 		if (freezer != curthread)
2006 			mutex_exit(QLOCK(q));
2007 		return;
2008 	}
2009 
2010 	/* Have to drop the lock across strwakeq and backenable */
2011 	if (backenab & QWANTWSYNC)
2012 		q->q_flag &= ~QWANTWSYNC;
2013 	if (backenab & (QWANTW|QB_WANTW)) {
2014 		if (band != 0)
2015 			qbp->qb_flag &= ~QB_WANTW;
2016 		else {
2017 			q->q_flag &= ~QWANTW;
2018 		}
2019 	}
2020 
2021 	if (freezer != curthread)
2022 		mutex_exit(QLOCK(q));
2023 
2024 	if (backenab & QWANTWSYNC)
2025 		strwakeq(q, QWANTWSYNC);
2026 	if (backenab & (QWANTW|QB_WANTW))
2027 		backenable(q, band);
2028 }
2029 
2030 /*
2031  * Remove a message from a queue.  The queue count and other
2032  * flow control parameters are adjusted and the back queue
2033  * enabled if necessary.
2034  *
2035  * rmvq can be called with the stream frozen, by other utility functions
2036  * holding QLOCK, or by streams modules without any locks/frozen.
2037  */
2038 void
2039 rmvq(queue_t *q, mblk_t *mp)
2040 {
2041 	ASSERT(mp != NULL);
2042 
2043 	rmvq_noenab(q, mp);
2044 	if (curthread != STREAM(q)->sd_freezer && MUTEX_HELD(QLOCK(q))) {
2045 		/*
2046 		 * qbackenable can handle a frozen stream but not a "random"
2047 		 * qlock being held. Drop lock across qbackenable.
2048 		 */
2049 		mutex_exit(QLOCK(q));
2050 		qbackenable(q, mp->b_band);
2051 		mutex_enter(QLOCK(q));
2052 	} else {
2053 		qbackenable(q, mp->b_band);
2054 	}
2055 }
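
/*
 * A minimal sketch of rmvq() usage, assuming a hypothetical module that
 * wants to drop empty M_DATA messages from its queue.  Freezing the
 * stream keeps q_first stable across the walk; rmvq() itself clears
 * b_next/b_prev and fixes up the flow control accounting.
 */
static void
xx_purge_empty(queue_t *q)
{
	mblk_t *mp, *next;

	freezestr(q);
	for (mp = q->q_first; mp != NULL; mp = next) {
		next = mp->b_next;	/* save before rmvq() clears it */
		if (mp->b_datap->db_type == M_DATA && msgdsize(mp) == 0) {
			rmvq(q, mp);
			freemsg(mp);
		}
	}
	unfreezestr(q);
}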
2056 
2057 /*
2058  * Like rmvq() but without any backenabling.
2059  * This exists to handle SR_CONSOL_DATA in strrput().
2060  */
2061 void
2062 rmvq_noenab(queue_t *q, mblk_t *mp)
2063 {
2064 	int i;
2065 	qband_t *qbp = NULL;
2066 	kthread_id_t freezer;
2067 	int	bytecnt = 0, mblkcnt = 0;
2068 
2069 	freezer = STREAM(q)->sd_freezer;
2070 	if (freezer == curthread) {
2071 		ASSERT(frozenstr(q));
2072 		ASSERT(MUTEX_HELD(QLOCK(q)));
2073 	} else if (MUTEX_HELD(QLOCK(q))) {
2074 		/* Don't drop lock on exit */
2075 		freezer = curthread;
2076 	} else
2077 		mutex_enter(QLOCK(q));
2078 
2079 	ASSERT(mp->b_band <= q->q_nband);
2080 	if (mp->b_band != 0) {		/* Adjust band pointers */
2081 		ASSERT(q->q_bandp != NULL);
2082 		qbp = q->q_bandp;
2083 		i = mp->b_band;
2084 		while (--i > 0)
2085 			qbp = qbp->qb_next;
2086 		if (mp == qbp->qb_first) {
2087 			if (mp->b_next && mp->b_band == mp->b_next->b_band)
2088 				qbp->qb_first = mp->b_next;
2089 			else
2090 				qbp->qb_first = NULL;
2091 		}
2092 		if (mp == qbp->qb_last) {
2093 			if (mp->b_prev && mp->b_band == mp->b_prev->b_band)
2094 				qbp->qb_last = mp->b_prev;
2095 			else
2096 				qbp->qb_last = NULL;
2097 		}
2098 	}
2099 
2100 	/*
2101 	 * Remove the message from the list.
2102 	 */
2103 	if (mp->b_prev)
2104 		mp->b_prev->b_next = mp->b_next;
2105 	else
2106 		q->q_first = mp->b_next;
2107 	if (mp->b_next)
2108 		mp->b_next->b_prev = mp->b_prev;
2109 	else
2110 		q->q_last = mp->b_prev;
2111 	mp->b_next = NULL;
2112 	mp->b_prev = NULL;
2113 
2114 	/* Get the size of the message for q_count accounting */
2115 	bytecnt = mp_cont_len(mp, &mblkcnt);
2116 
2117 	if (mp->b_band == 0) {		/* Perform q_count accounting */
2118 		q->q_count -= bytecnt;
2119 		q->q_mblkcnt -= mblkcnt;
2120 		if (q->q_mblkcnt == 0 || ((q->q_count < q->q_hiwat) &&
2121 		    (q->q_mblkcnt < q->q_hiwat))) {
2122 			q->q_flag &= ~QFULL;
2123 		}
2124 	} else {			/* Perform qb_count accounting */
2125 		qbp->qb_count -= bytecnt;
2126 		qbp->qb_mblkcnt -= mblkcnt;
2127 		if (qbp->qb_mblkcnt == 0 || ((qbp->qb_count < qbp->qb_hiwat) &&
2128 		    (qbp->qb_mblkcnt < qbp->qb_hiwat))) {
2129 			qbp->qb_flag &= ~QB_FULL;
2130 		}
2131 	}
2132 	if (freezer != curthread)
2133 		mutex_exit(QLOCK(q));
2134 
2135 	STR_FTEVENT_MSG(mp, q, FTEV_RMVQ, NULL);
2136 }
2137 
2138 /*
2139  * Empty a queue.
2140  * If flag is set, remove all messages.  Otherwise, remove
2141  * only non-control messages.  If queue falls below its low
2142  * water mark, and QWANTW is set, enable the nearest upstream
2143  * service procedure.
2144  *
2145  * Historical note: when merging the M_FLUSH code in strrput with this
2146  * code one difference was discovered. flushq did not have a check
2147  * for q_lowat == 0 in the backenabling test.
2148  *
2149  * pcproto_flag specifies whether or not an M_PCPROTO message should be flushed
2150  * if one exists on the queue.
2151  */
2152 void
2153 flushq_common(queue_t *q, int flag, int pcproto_flag)
2154 {
2155 	mblk_t *mp, *nmp;
2156 	qband_t *qbp;
2157 	int backenab = 0;
2158 	unsigned char bpri;
2159 	unsigned char	qbf[NBAND];	/* band flushing backenable flags */
2160 
2161 	if (q->q_first == NULL)
2162 		return;
2163 
2164 	mutex_enter(QLOCK(q));
2165 	mp = q->q_first;
2166 	q->q_first = NULL;
2167 	q->q_last = NULL;
2168 	q->q_count = 0;
2169 	q->q_mblkcnt = 0;
2170 	for (qbp = q->q_bandp; qbp; qbp = qbp->qb_next) {
2171 		qbp->qb_first = NULL;
2172 		qbp->qb_last = NULL;
2173 		qbp->qb_count = 0;
2174 		qbp->qb_mblkcnt = 0;
2175 		qbp->qb_flag &= ~QB_FULL;
2176 	}
2177 	q->q_flag &= ~QFULL;
2178 	mutex_exit(QLOCK(q));
2179 	while (mp) {
2180 		nmp = mp->b_next;
2181 		mp->b_next = mp->b_prev = NULL;
2182 
2183 		STR_FTEVENT_MBLK(mp, q, FTEV_FLUSHQ, NULL);
2184 
2185 		if (pcproto_flag && (mp->b_datap->db_type == M_PCPROTO))
2186 			(void) putq(q, mp);
2187 		else if (flag || datamsg(mp->b_datap->db_type))
2188 			freemsg(mp);
2189 		else
2190 			(void) putq(q, mp);
2191 		mp = nmp;
2192 	}
2193 	bpri = 1;
2194 	mutex_enter(QLOCK(q));
2195 	for (qbp = q->q_bandp; qbp; qbp = qbp->qb_next) {
2196 		if ((qbp->qb_flag & QB_WANTW) &&
2197 		    (((qbp->qb_count < qbp->qb_lowat) &&
2198 		    (qbp->qb_mblkcnt < qbp->qb_lowat)) ||
2199 		    qbp->qb_lowat == 0)) {
2200 			qbp->qb_flag &= ~QB_WANTW;
2201 			backenab = 1;
2202 			qbf[bpri] = 1;
2203 		} else
2204 			qbf[bpri] = 0;
2205 		bpri++;
2206 	}
2207 	ASSERT(bpri == (unsigned char)(q->q_nband + 1));
2208 	if ((q->q_flag & QWANTW) &&
2209 	    (((q->q_count < q->q_lowat) &&
2210 	    (q->q_mblkcnt < q->q_lowat)) || q->q_lowat == 0)) {
2211 		q->q_flag &= ~QWANTW;
2212 		backenab = 1;
2213 		qbf[0] = 1;
2214 	} else
2215 		qbf[0] = 0;
2216 
2217 	/*
2218 	 * If any band can now be written to, and there is a writer
2219 	 * for that band, then backenable the closest service procedure.
2220 	 */
2221 	if (backenab) {
2222 		mutex_exit(QLOCK(q));
2223 		for (bpri = q->q_nband; bpri != 0; bpri--)
2224 			if (qbf[bpri])
2225 				backenable(q, bpri);
2226 		if (qbf[0])
2227 			backenable(q, 0);
2228 	} else
2229 		mutex_exit(QLOCK(q));
2230 }
2231 
2232 /*
2233  * The real flushing takes place in flushq_common. This is done so that
2234  * a flag can specify whether or not M_PCPROTO messages should be flushed.
2235  * Currently the only place that uses this flag is the stream head.
2236  */
2237 void
2238 flushq(queue_t *q, int flag)
2239 {
2240 	flushq_common(q, flag, 0);
2241 }
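
/*
 * The canonical flushq() caller, sketched: a hypothetical module's
 * write-side M_FLUSH handling.  FLUSHDATA preserves control messages,
 * matching the "only non-control messages" behavior described above.
 */
static void
xx_wput_mflush(queue_t *q, mblk_t *mp)
{
	ASSERT(mp->b_datap->db_type == M_FLUSH);
	if (*mp->b_rptr & FLUSHW)
		flushq(q, FLUSHDATA);	/* flush our own write queue */
	putnext(q, mp);			/* let downstream flush too */
}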
2242 
2243 /*
2244  * Flush the queue of messages of the given priority band.
2245  * There is some duplication of code between flushq and flushband.
2246  * This is because we want to optimize the code as much as possible.
2247  * The assumption is that there will be more messages in the normal
2248  * (priority 0) band than in any other.
2249  *
2250  * Historical note: when merging the M_FLUSH code in strrput with this
2251  * code one difference was discovered: flushband had an extra check for
2252  * (mp->b_datap->db_type < QPCTL) in the band 0 case. That check does not
2253  * match the man page for flushband and was not in the strrput flush code,
2254  * hence it was removed.
2255  */
2256 void
2257 flushband(queue_t *q, unsigned char pri, int flag)
2258 {
2259 	mblk_t *mp;
2260 	mblk_t *nmp;
2261 	mblk_t *last;
2262 	qband_t *qbp;
2263 	int band;
2264 
2265 	ASSERT((flag == FLUSHDATA) || (flag == FLUSHALL));
2266 	if (pri > q->q_nband) {
2267 		return;
2268 	}
2269 	mutex_enter(QLOCK(q));
2270 	if (pri == 0) {
2271 		mp = q->q_first;
2272 		q->q_first = NULL;
2273 		q->q_last = NULL;
2274 		q->q_count = 0;
2275 		q->q_mblkcnt = 0;
2276 		for (qbp = q->q_bandp; qbp; qbp = qbp->qb_next) {
2277 			qbp->qb_first = NULL;
2278 			qbp->qb_last = NULL;
2279 			qbp->qb_count = 0;
2280 			qbp->qb_mblkcnt = 0;
2281 			qbp->qb_flag &= ~QB_FULL;
2282 		}
2283 		q->q_flag &= ~QFULL;
2284 		mutex_exit(QLOCK(q));
2285 		while (mp) {
2286 			nmp = mp->b_next;
2287 			mp->b_next = mp->b_prev = NULL;
2288 			if ((mp->b_band == 0) &&
2289 			    ((flag == FLUSHALL) ||
2290 			    datamsg(mp->b_datap->db_type)))
2291 				freemsg(mp);
2292 			else
2293 				(void) putq(q, mp);
2294 			mp = nmp;
2295 		}
2296 		mutex_enter(QLOCK(q));
2297 		if ((q->q_flag & QWANTW) &&
2298 		    (((q->q_count < q->q_lowat) &&
2299 		    (q->q_mblkcnt < q->q_lowat)) || q->q_lowat == 0)) {
2300 			q->q_flag &= ~QWANTW;
2301 			mutex_exit(QLOCK(q));
2302 
2303 			backenable(q, pri);
2304 		} else
2305 			mutex_exit(QLOCK(q));
2306 	} else {	/* pri != 0 */
2307 		boolean_t flushed = B_FALSE;
2308 		band = pri;
2309 
2310 		ASSERT(MUTEX_HELD(QLOCK(q)));
2311 		qbp = q->q_bandp;
2312 		while (--band > 0)
2313 			qbp = qbp->qb_next;
2314 		mp = qbp->qb_first;
2315 		if (mp == NULL) {
2316 			mutex_exit(QLOCK(q));
2317 			return;
2318 		}
2319 		last = qbp->qb_last->b_next;
2320 		/*
2321 		 * rmvq_noenab() and freemsg() are called for each mblk that
2322 		 * meets the criteria.  The loop is executed until the last
2323 		 * mblk has been processed.
2324 		 */
2325 		while (mp != last) {
2326 			ASSERT(mp->b_band == pri);
2327 			nmp = mp->b_next;
2328 			if (flag == FLUSHALL || datamsg(mp->b_datap->db_type)) {
2329 				rmvq_noenab(q, mp);
2330 				freemsg(mp);
2331 				flushed = B_TRUE;
2332 			}
2333 			mp = nmp;
2334 		}
2335 		mutex_exit(QLOCK(q));
2336 
2337 		/*
2338 		 * If any mblk(s) have been freed, we know that qbackenable()
2339 		 * will need to be called.
2340 		 */
2341 		if (flushed)
2342 			qbackenable(q, pri);
2343 	}
2344 }
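
/*
 * A band-aware sketch: when an M_FLUSH carries FLUSHBAND, the byte
 * after the flags selects the band and only that band is flushed.
 * The xx_ function is hypothetical.
 */
static void
xx_wput_mflushband(queue_t *q, mblk_t *mp)
{
	if (*mp->b_rptr & FLUSHW) {
		if (*mp->b_rptr & FLUSHBAND)
			flushband(q, *(mp->b_rptr + 1), FLUSHDATA);
		else
			flushq(q, FLUSHDATA);
	}
	putnext(q, mp);
}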
2345 
2346 /*
2347  * Return 1 if the queue is not full.  If the queue is full, return
2348  * 0 (may not put message) and set QWANTW flag (caller wants to write
2349  * to the queue).
2350  */
2351 int
2352 canput(queue_t *q)
2353 {
2354 	TRACE_1(TR_FAC_STREAMS_FR, TR_CANPUT_IN, "canput:%p", q);
2355 
2356 	/* this is for loopback transports, they should not do a canput */
2357 	ASSERT(STRMATED(q->q_stream) || STREAM(q) == STREAM(q->q_nfsrv));
2358 
2359 	/* Find next forward module that has a service procedure */
2360 	q = q->q_nfsrv;
2361 
2362 	if (!(q->q_flag & QFULL)) {
2363 		TRACE_2(TR_FAC_STREAMS_FR, TR_CANPUT_OUT, "canput:%p %d", q, 1);
2364 		return (1);
2365 	}
2366 	mutex_enter(QLOCK(q));
2367 	if (q->q_flag & QFULL) {
2368 		q->q_flag |= QWANTW;
2369 		mutex_exit(QLOCK(q));
2370 		TRACE_2(TR_FAC_STREAMS_FR, TR_CANPUT_OUT, "canput:%p %d", q, 0);
2371 		return (0);
2372 	}
2373 	mutex_exit(QLOCK(q));
2374 	TRACE_2(TR_FAC_STREAMS_FR, TR_CANPUT_OUT, "canput:%p %d", q, 1);
2375 	return (1);
2376 }
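
/*
 * The usual consumer of this check is a service procedure.  A minimal
 * sketch (hypothetical module): drain the queue with getq(), but when
 * the downstream queue is full put normal messages back with putbq()
 * and wait for the backenable.  High priority messages are never put
 * back (see the putq() comment below).
 */
static int
xx_wsrv(queue_t *q)
{
	mblk_t *mp;

	while ((mp = getq(q)) != NULL) {
		if (mp->b_datap->db_type < QPCTL && !canputnext(q)) {
			(void) putbq(q, mp);	/* flow controlled */
			break;
		}
		putnext(q, mp);
	}
	return (0);
}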
2377 
2378 /*
2379  * This is the new canput for use with priority bands.  Return 1 if the
2380  * band is not full.  If the band is full, return 0 (may not put message)
2381  * and set QWANTW(QB_WANTW) flag for zero(non-zero) band (caller wants to
2382  * write to the queue).
2383  */
2384 int
2385 bcanput(queue_t *q, unsigned char pri)
2386 {
2387 	qband_t *qbp;
2388 
2389 	TRACE_2(TR_FAC_STREAMS_FR, TR_BCANPUT_IN, "bcanput:%p %p", q, pri);
2390 	if (!q)
2391 		return (0);
2392 
2393 	/* Find next forward module that has a service procedure */
2394 	q = q->q_nfsrv;
2395 
2396 	mutex_enter(QLOCK(q));
2397 	if (pri == 0) {
2398 		if (q->q_flag & QFULL) {
2399 			q->q_flag |= QWANTW;
2400 			mutex_exit(QLOCK(q));
2401 			TRACE_3(TR_FAC_STREAMS_FR, TR_BCANPUT_OUT,
2402 			    "bcanput:%p %X %d", q, pri, 0);
2403 			return (0);
2404 		}
2405 	} else {	/* pri != 0 */
2406 		if (pri > q->q_nband) {
2407 			/*
2408 			 * No band exists yet, so return success.
2409 			 */
2410 			mutex_exit(QLOCK(q));
2411 			TRACE_3(TR_FAC_STREAMS_FR, TR_BCANPUT_OUT,
2412 			    "bcanput:%p %X %d", q, pri, 1);
2413 			return (1);
2414 		}
2415 		qbp = q->q_bandp;
2416 		while (--pri)
2417 			qbp = qbp->qb_next;
2418 		if (qbp->qb_flag & QB_FULL) {
2419 			qbp->qb_flag |= QB_WANTW;
2420 			mutex_exit(QLOCK(q));
2421 			TRACE_3(TR_FAC_STREAMS_FR, TR_BCANPUT_OUT,
2422 			    "bcanput:%p %X %d", q, pri, 0);
2423 			return (0);
2424 		}
2425 	}
2426 	mutex_exit(QLOCK(q));
2427 	TRACE_3(TR_FAC_STREAMS_FR, TR_BCANPUT_OUT,
2428 	    "bcanput:%p %X %d", q, pri, 1);
2429 	return (1);
2430 }
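
/*
 * A sketch of the banded check, using bcanputnext() (the q_next-safe
 * wrapper around bcanput()): consult the message's own band before
 * forwarding, queueing it otherwise.  xx_forward is hypothetical.
 */
static int
xx_forward(queue_t *q, mblk_t *mp)
{
	if (mp->b_band != 0 && !bcanputnext(q, mp->b_band))
		return (putq(q, mp));	/* band is full downstream */
	putnext(q, mp);
	return (1);
}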
2431 
2432 /*
2433  * Put a message on a queue.
2434  *
2435  * Messages are enqueued on a priority basis.  The priority classes
2436  * are HIGH PRIORITY (type >= QPCTL), PRIORITY (type < QPCTL && band > 0),
2437  * and B_NORMAL (type < QPCTL && band == 0).
2438  *
2439  * Add appropriate weighted data block sizes to queue count.
2440  * If queue hits high water mark then set QFULL flag.
2441  *
2442  * If QNOENAB is not set (putq is allowed to enable the queue),
2443  * enable the queue only if the message is PRIORITY,
2444  * or the QWANTR flag is set (indicating that the service procedure
2445  * is ready to read the queue).  This implies that a service
2446  * procedure must NEVER put a high priority message back on its own
2447  * queue, as this would result in an infinite loop (!).
2448  */
2449 int
2450 putq(queue_t *q, mblk_t *bp)
2451 {
2452 	mblk_t *tmp;
2453 	qband_t *qbp = NULL;
2454 	int mcls = (int)queclass(bp);
2455 	kthread_id_t freezer;
2456 	int	bytecnt = 0, mblkcnt = 0;
2457 
2458 	freezer = STREAM(q)->sd_freezer;
2459 	if (freezer == curthread) {
2460 		ASSERT(frozenstr(q));
2461 		ASSERT(MUTEX_HELD(QLOCK(q)));
2462 	} else
2463 		mutex_enter(QLOCK(q));
2464 
2465 	/*
2466 	 * Perform sanity checks and, if the qband structure is not yet
2467 	 * allocated, allocate it.
2468 	 */
2469 	if (mcls == QPCTL) {
2470 		if (bp->b_band != 0)
2471 			bp->b_band = 0;		/* force to be correct */
2472 	} else if (bp->b_band != 0) {
2473 		int i;
2474 		qband_t **qbpp;
2475 
2476 		if (bp->b_band > q->q_nband) {
2477 
2478 			/*
2479 			 * The qband structure for this priority band is
2480 			 * not on the queue yet, so we have to allocate
2481 			 * one on the fly.  It would be wasteful to
2482 			 * associate the qband structures with every
2483 			 * queue when the queues are allocated.  This is
2484 			 * because most queues will only need the normal
2485 			 * band of flow which can be described entirely
2486 			 * by the queue itself.
2487 			 */
2488 			qbpp = &q->q_bandp;
2489 			while (*qbpp)
2490 				qbpp = &(*qbpp)->qb_next;
2491 			while (bp->b_band > q->q_nband) {
2492 				if ((*qbpp = allocband()) == NULL) {
2493 					if (freezer != curthread)
2494 						mutex_exit(QLOCK(q));
2495 					return (0);
2496 				}
2497 				(*qbpp)->qb_hiwat = q->q_hiwat;
2498 				(*qbpp)->qb_lowat = q->q_lowat;
2499 				q->q_nband++;
2500 				qbpp = &(*qbpp)->qb_next;
2501 			}
2502 		}
2503 		ASSERT(MUTEX_HELD(QLOCK(q)));
2504 		qbp = q->q_bandp;
2505 		i = bp->b_band;
2506 		while (--i)
2507 			qbp = qbp->qb_next;
2508 	}
2509 
2510 	/*
2511 	 * If queue is empty, add the message and initialize the pointers.
2512 	 * Otherwise, adjust message pointers and queue pointers based on
2513 	 * the type of the message and where it belongs on the queue.  Some
2514 	 * code is duplicated to minimize the number of conditionals and
2515 	 * hopefully minimize the amount of time this routine takes.
2516 	 */
2517 	if (!q->q_first) {
2518 		bp->b_next = NULL;
2519 		bp->b_prev = NULL;
2520 		q->q_first = bp;
2521 		q->q_last = bp;
2522 		if (qbp) {
2523 			qbp->qb_first = bp;
2524 			qbp->qb_last = bp;
2525 		}
2526 	} else if (!qbp) {	/* bp->b_band == 0 */
2527 
2528 		/*
2529 		 * If queue class of message is less than or equal to
2530 		 * that of the last one on the queue, tack on to the end.
2531 		 */
2532 		tmp = q->q_last;
2533 		if (mcls <= (int)queclass(tmp)) {
2534 			bp->b_next = NULL;
2535 			bp->b_prev = tmp;
2536 			tmp->b_next = bp;
2537 			q->q_last = bp;
2538 		} else {
2539 			tmp = q->q_first;
2540 			while ((int)queclass(tmp) >= mcls)
2541 				tmp = tmp->b_next;
2542 
2543 			/*
2544 			 * Insert bp before tmp.
2545 			 */
2546 			bp->b_next = tmp;
2547 			bp->b_prev = tmp->b_prev;
2548 			if (tmp->b_prev)
2549 				tmp->b_prev->b_next = bp;
2550 			else
2551 				q->q_first = bp;
2552 			tmp->b_prev = bp;
2553 		}
2554 	} else {		/* bp->b_band != 0 */
2555 		if (qbp->qb_first) {
2556 			tmp = qbp->qb_last;
2557 
2558 			/*
2559 			 * Insert bp after the last message in this band.
2560 			 */
2561 			bp->b_next = tmp->b_next;
2562 			if (tmp->b_next)
2563 				tmp->b_next->b_prev = bp;
2564 			else
2565 				q->q_last = bp;
2566 			bp->b_prev = tmp;
2567 			tmp->b_next = bp;
2568 		} else {
2569 			tmp = q->q_last;
2570 			if ((mcls < (int)queclass(tmp)) ||
2571 			    (bp->b_band <= tmp->b_band)) {
2572 
2573 				/*
2574 				 * Tack bp on end of queue.
2575 				 */
2576 				bp->b_next = NULL;
2577 				bp->b_prev = tmp;
2578 				tmp->b_next = bp;
2579 				q->q_last = bp;
2580 			} else {
2581 				tmp = q->q_first;
2582 				while (tmp->b_datap->db_type >= QPCTL)
2583 					tmp = tmp->b_next;
2584 				while (tmp->b_band >= bp->b_band)
2585 					tmp = tmp->b_next;
2586 
2587 				/*
2588 				 * Insert bp before tmp.
2589 				 */
2590 				bp->b_next = tmp;
2591 				bp->b_prev = tmp->b_prev;
2592 				if (tmp->b_prev)
2593 					tmp->b_prev->b_next = bp;
2594 				else
2595 					q->q_first = bp;
2596 				tmp->b_prev = bp;
2597 			}
2598 			qbp->qb_first = bp;
2599 		}
2600 		qbp->qb_last = bp;
2601 	}
2602 
2603 	/* Get message byte count for q_count accounting */
2604 	bytecnt = mp_cont_len(bp, &mblkcnt);
2605 
2606 	if (qbp) {
2607 		qbp->qb_count += bytecnt;
2608 		qbp->qb_mblkcnt += mblkcnt;
2609 		if ((qbp->qb_count >= qbp->qb_hiwat) ||
2610 		    (qbp->qb_mblkcnt >= qbp->qb_hiwat)) {
2611 			qbp->qb_flag |= QB_FULL;
2612 		}
2613 	} else {
2614 		q->q_count += bytecnt;
2615 		q->q_mblkcnt += mblkcnt;
2616 		if ((q->q_count >= q->q_hiwat) ||
2617 		    (q->q_mblkcnt >= q->q_hiwat)) {
2618 			q->q_flag |= QFULL;
2619 		}
2620 	}
2621 
2622 	STR_FTEVENT_MSG(bp, q, FTEV_PUTQ, NULL);
2623 
2624 	if ((mcls > QNORM) ||
2625 	    (canenable(q) && (q->q_flag & QWANTR || bp->b_band)))
2626 		qenable_locked(q);
2627 	ASSERT(MUTEX_HELD(QLOCK(q)));
2628 	if (freezer != curthread)
2629 		mutex_exit(QLOCK(q));
2630 
2631 	return (1);
2632 }
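
/*
 * A typical putq() caller, sketched: a hypothetical write put procedure
 * that passes data straight through when nothing is queued and
 * downstream has room, and otherwise queues it for a service procedure
 * like the xx_wsrv() sketch above.
 */
static int
xx_wput(queue_t *q, mblk_t *mp)
{
	if (mp->b_datap->db_type == M_DATA &&
	    (q->q_first != NULL || !canputnext(q))) {
		(void) putq(q, mp);	/* the service procedure drains it */
		return (0);
	}
	putnext(q, mp);
	return (0);
}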
2633 
2634 /*
2635  * Put stuff back at beginning of Q according to priority order.
2636  * See comment on putq above for details.
2637  */
2638 int
2639 putbq(queue_t *q, mblk_t *bp)
2640 {
2641 	mblk_t *tmp;
2642 	qband_t *qbp = NULL;
2643 	int mcls = (int)queclass(bp);
2644 	kthread_id_t freezer;
2645 	int	bytecnt = 0, mblkcnt = 0;
2646 
2647 	ASSERT(q && bp);
2648 	ASSERT(bp->b_next == NULL);
2649 	freezer = STREAM(q)->sd_freezer;
2650 	if (freezer == curthread) {
2651 		ASSERT(frozenstr(q));
2652 		ASSERT(MUTEX_HELD(QLOCK(q)));
2653 	} else
2654 		mutex_enter(QLOCK(q));
2655 
2656 	/*
2657 	 * Perform sanity checks and, if the qband structure is not yet
2658 	 * allocated, allocate it.
2659 	 */
2660 	if (mcls == QPCTL) {
2661 		if (bp->b_band != 0)
2662 			bp->b_band = 0;		/* force to be correct */
2663 	} else if (bp->b_band != 0) {
2664 		int i;
2665 		qband_t **qbpp;
2666 
2667 		if (bp->b_band > q->q_nband) {
2668 			qbpp = &q->q_bandp;
2669 			while (*qbpp)
2670 				qbpp = &(*qbpp)->qb_next;
2671 			while (bp->b_band > q->q_nband) {
2672 				if ((*qbpp = allocband()) == NULL) {
2673 					if (freezer != curthread)
2674 						mutex_exit(QLOCK(q));
2675 					return (0);
2676 				}
2677 				(*qbpp)->qb_hiwat = q->q_hiwat;
2678 				(*qbpp)->qb_lowat = q->q_lowat;
2679 				q->q_nband++;
2680 				qbpp = &(*qbpp)->qb_next;
2681 			}
2682 		}
2683 		qbp = q->q_bandp;
2684 		i = bp->b_band;
2685 		while (--i)
2686 			qbp = qbp->qb_next;
2687 	}
2688 
2689 	/*
2690 	 * If queue is empty or if message is high priority,
2691 	 * place on the front of the queue.
2692 	 */
2693 	tmp = q->q_first;
2694 	if ((!tmp) || (mcls == QPCTL)) {
2695 		bp->b_next = tmp;
2696 		if (tmp)
2697 			tmp->b_prev = bp;
2698 		else
2699 			q->q_last = bp;
2700 		q->q_first = bp;
2701 		bp->b_prev = NULL;
2702 		if (qbp) {
2703 			qbp->qb_first = bp;
2704 			qbp->qb_last = bp;
2705 		}
2706 	} else if (qbp) {	/* bp->b_band != 0 */
2707 		tmp = qbp->qb_first;
2708 		if (tmp) {
2709 
2710 			/*
2711 			 * Insert bp before the first message in this band.
2712 			 */
2713 			bp->b_next = tmp;
2714 			bp->b_prev = tmp->b_prev;
2715 			if (tmp->b_prev)
2716 				tmp->b_prev->b_next = bp;
2717 			else
2718 				q->q_first = bp;
2719 			tmp->b_prev = bp;
2720 		} else {
2721 			tmp = q->q_last;
2722 			if ((mcls < (int)queclass(tmp)) ||
2723 			    (bp->b_band < tmp->b_band)) {
2724 
2725 				/*
2726 				 * Tack bp on end of queue.
2727 				 */
2728 				bp->b_next = NULL;
2729 				bp->b_prev = tmp;
2730 				tmp->b_next = bp;
2731 				q->q_last = bp;
2732 			} else {
2733 				tmp = q->q_first;
2734 				while (tmp->b_datap->db_type >= QPCTL)
2735 					tmp = tmp->b_next;
2736 				while (tmp->b_band > bp->b_band)
2737 					tmp = tmp->b_next;
2738 
2739 				/*
2740 				 * Insert bp before tmp.
2741 				 */
2742 				bp->b_next = tmp;
2743 				bp->b_prev = tmp->b_prev;
2744 				if (tmp->b_prev)
2745 					tmp->b_prev->b_next = bp;
2746 				else
2747 					q->q_first = bp;
2748 				tmp->b_prev = bp;
2749 			}
2750 			qbp->qb_last = bp;
2751 		}
2752 		qbp->qb_first = bp;
2753 	} else {		/* bp->b_band == 0 && !QPCTL */
2754 
2755 		/*
2756 		 * If the queue class or band is less than that of the last
2757 		 * message on the queue, tack bp on the end of the queue.
2758 		 */
2759 		tmp = q->q_last;
2760 		if ((mcls < (int)queclass(tmp)) || (bp->b_band < tmp->b_band)) {
2761 			bp->b_next = NULL;
2762 			bp->b_prev = tmp;
2763 			tmp->b_next = bp;
2764 			q->q_last = bp;
2765 		} else {
2766 			tmp = q->q_first;
2767 			while (tmp->b_datap->db_type >= QPCTL)
2768 				tmp = tmp->b_next;
2769 			while (tmp->b_band > bp->b_band)
2770 				tmp = tmp->b_next;
2771 
2772 			/*
2773 			 * Insert bp before tmp.
2774 			 */
2775 			bp->b_next = tmp;
2776 			bp->b_prev = tmp->b_prev;
2777 			if (tmp->b_prev)
2778 				tmp->b_prev->b_next = bp;
2779 			else
2780 				q->q_first = bp;
2781 			tmp->b_prev = bp;
2782 		}
2783 	}
2784 
2785 	/* Get message byte count for q_count accounting */
2786 	bytecnt = mp_cont_len(bp, &mblkcnt);
2787 
2788 	if (qbp) {
2789 		qbp->qb_count += bytecnt;
2790 		qbp->qb_mblkcnt += mblkcnt;
2791 		if ((qbp->qb_count >= qbp->qb_hiwat) ||
2792 		    (qbp->qb_mblkcnt >= qbp->qb_hiwat)) {
2793 			qbp->qb_flag |= QB_FULL;
2794 		}
2795 	} else {
2796 		q->q_count += bytecnt;
2797 		q->q_mblkcnt += mblkcnt;
2798 		if ((q->q_count >= q->q_hiwat) ||
2799 		    (q->q_mblkcnt >= q->q_hiwat)) {
2800 			q->q_flag |= QFULL;
2801 		}
2802 	}
2803 
2804 	STR_FTEVENT_MSG(bp, q, FTEV_PUTBQ, NULL);
2805 
2806 	if ((mcls > QNORM) || (canenable(q) && (q->q_flag & QWANTR)))
2807 		qenable_locked(q);
2808 	ASSERT(MUTEX_HELD(QLOCK(q)));
2809 	if (freezer != curthread)
2810 		mutex_exit(QLOCK(q));
2811 
2812 	return (1);
2813 }
2814 
2815 /*
2816  * Insert a message before an existing message on the queue.  If the
2817  * existing message is NULL, the new message is placed on the end of
2818  * the queue.  The queue class of the new message is ignored.  However,
2819  * the priority band of the new message must adhere to the following
2820  * ordering:
2821  *
2822  *	emp->b_prev->b_band >= mp->b_band >= emp->b_band.
2823  *
2824  * All flow control parameters are updated.
2825  *
2826  * insq can be called with the stream frozen, by other utility functions
2827  * holding QLOCK, or by streams modules without any locks/frozen.
2828  */
2829 int
2830 insq(queue_t *q, mblk_t *emp, mblk_t *mp)
2831 {
2832 	mblk_t *tmp;
2833 	qband_t *qbp = NULL;
2834 	int mcls = (int)queclass(mp);
2835 	kthread_id_t freezer;
2836 	int	bytecnt = 0, mblkcnt = 0;
2837 
2838 	freezer = STREAM(q)->sd_freezer;
2839 	if (freezer == curthread) {
2840 		ASSERT(frozenstr(q));
2841 		ASSERT(MUTEX_HELD(QLOCK(q)));
2842 	} else if (MUTEX_HELD(QLOCK(q))) {
2843 		/* Don't drop lock on exit */
2844 		freezer = curthread;
2845 	} else
2846 		mutex_enter(QLOCK(q));
2847 
2848 	if (mcls == QPCTL) {
2849 		if (mp->b_band != 0)
2850 			mp->b_band = 0;		/* force to be correct */
2851 		if (emp && emp->b_prev &&
2852 		    (emp->b_prev->b_datap->db_type < QPCTL))
2853 			goto badord;
2854 	}
2855 	if (emp) {
2856 		if (((mcls == QNORM) && (mp->b_band < emp->b_band)) ||
2857 		    (emp->b_prev && (emp->b_prev->b_datap->db_type < QPCTL) &&
2858 		    (emp->b_prev->b_band < mp->b_band))) {
2859 			goto badord;
2860 		}
2861 	} else {
2862 		tmp = q->q_last;
2863 		if (tmp && (mcls == QNORM) && (mp->b_band > tmp->b_band)) {
2864 badord:
2865 			cmn_err(CE_WARN,
2866 			    "insq: attempt to insert message out of order "
2867 			    "on q %p", (void *)q);
2868 			if (freezer != curthread)
2869 				mutex_exit(QLOCK(q));
2870 			return (0);
2871 		}
2872 	}
2873 
2874 	if (mp->b_band != 0) {
2875 		int i;
2876 		qband_t **qbpp;
2877 
2878 		if (mp->b_band > q->q_nband) {
2879 			qbpp = &q->q_bandp;
2880 			while (*qbpp)
2881 				qbpp = &(*qbpp)->qb_next;
2882 			while (mp->b_band > q->q_nband) {
2883 				if ((*qbpp = allocband()) == NULL) {
2884 					if (freezer != curthread)
2885 						mutex_exit(QLOCK(q));
2886 					return (0);
2887 				}
2888 				(*qbpp)->qb_hiwat = q->q_hiwat;
2889 				(*qbpp)->qb_lowat = q->q_lowat;
2890 				q->q_nband++;
2891 				qbpp = &(*qbpp)->qb_next;
2892 			}
2893 		}
2894 		qbp = q->q_bandp;
2895 		i = mp->b_band;
2896 		while (--i)
2897 			qbp = qbp->qb_next;
2898 	}
2899 
2900 	if ((mp->b_next = emp) != NULL) {
2901 		if ((mp->b_prev = emp->b_prev) != NULL)
2902 			emp->b_prev->b_next = mp;
2903 		else
2904 			q->q_first = mp;
2905 		emp->b_prev = mp;
2906 	} else {
2907 		if ((mp->b_prev = q->q_last) != NULL)
2908 			q->q_last->b_next = mp;
2909 		else
2910 			q->q_first = mp;
2911 		q->q_last = mp;
2912 	}
2913 
2914 	/* Get mblk and byte count for q_count accounting */
2915 	bytecnt = mp_cont_len(mp, &mblkcnt);
2916 
2917 	if (qbp) {	/* adjust qband pointers and count */
2918 		if (!qbp->qb_first) {
2919 			qbp->qb_first = mp;
2920 			qbp->qb_last = mp;
2921 		} else {
2922 			if (mp->b_prev == NULL || (mp->b_prev != NULL &&
2923 			    (mp->b_prev->b_band != mp->b_band)))
2924 				qbp->qb_first = mp;
2925 			else if (mp->b_next == NULL || (mp->b_next != NULL &&
2926 			    (mp->b_next->b_band != mp->b_band)))
2927 				qbp->qb_last = mp;
2928 		}
2929 		qbp->qb_count += bytecnt;
2930 		qbp->qb_mblkcnt += mblkcnt;
2931 		if ((qbp->qb_count >= qbp->qb_hiwat) ||
2932 		    (qbp->qb_mblkcnt >= qbp->qb_hiwat)) {
2933 			qbp->qb_flag |= QB_FULL;
2934 		}
2935 	} else {
2936 		q->q_count += bytecnt;
2937 		q->q_mblkcnt += mblkcnt;
2938 		if ((q->q_count >= q->q_hiwat) ||
2939 		    (q->q_mblkcnt >= q->q_hiwat)) {
2940 			q->q_flag |= QFULL;
2941 		}
2942 	}
2943 
2944 	STR_FTEVENT_MSG(mp, q, FTEV_INSQ, NULL);
2945 
2946 	if (canenable(q) && (q->q_flag & QWANTR))
2947 		qenable_locked(q);
2948 
2949 	ASSERT(MUTEX_HELD(QLOCK(q)));
2950 	if (freezer != curthread)
2951 		mutex_exit(QLOCK(q));
2952 
2953 	return (1);
2954 }
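
/*
 * A sketch of insq() under freezestr(): with the stream frozen the
 * caller may examine q_first directly and insert ahead of it, subject
 * to the band ordering rule above.  Hypothetical helper.
 */
static int
xx_insert_at_head(queue_t *q, mblk_t *mp)
{
	int ret;

	freezestr(q);
	ret = insq(q, q->q_first, mp);	/* emp == NULL appends instead */
	unfreezestr(q);
	return (ret);
}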
2955 
2956 /*
2957  * Create and put a control message on queue.
2958  */
2959 int
2960 putctl(queue_t *q, int type)
2961 {
2962 	mblk_t *bp;
2963 
2964 	if ((datamsg(type) && (type != M_DELAY)) ||
2965 	    (bp = allocb_tryhard(0)) == NULL)
2966 		return (0);
2967 	bp->b_datap->db_type = (unsigned char) type;
2968 
2969 	put(q, bp);
2970 
2971 	return (1);
2972 }
2973 
2974 /*
2975  * Control message with a single-byte parameter
2976  */
2977 int
2978 putctl1(queue_t *q, int type, int param)
2979 {
2980 	mblk_t *bp;
2981 
2982 	if ((datamsg(type) && (type != M_DELAY)) ||
2983 	    (bp = allocb_tryhard(1)) == NULL)
2984 		return (0);
2985 	bp->b_datap->db_type = (unsigned char)type;
2986 	*bp->b_wptr++ = (unsigned char)param;
2987 
2988 	put(q, bp);
2989 
2990 	return (1);
2991 }
2992 
2993 int
2994 putnextctl1(queue_t *q, int type, int param)
2995 {
2996 	mblk_t *bp;
2997 
2998 	if ((datamsg(type) && (type != M_DELAY)) ||
2999 	    ((bp = allocb_tryhard(1)) == NULL))
3000 		return (0);
3001 
3002 	bp->b_datap->db_type = (unsigned char)type;
3003 	*bp->b_wptr++ = (unsigned char)param;
3004 
3005 	putnext(q, bp);
3006 
3007 	return (1);
3008 }
3009 
3010 int
3011 putnextctl(queue_t *q, int type)
3012 {
3013 	mblk_t *bp;
3014 
3015 	if ((datamsg(type) && (type != M_DELAY)) ||
3016 	    ((bp = allocb_tryhard(0)) == NULL))
3017 		return (0);
3018 	bp->b_datap->db_type = (unsigned char)type;
3019 
3020 	putnext(q, bp);
3021 
3022 	return (1);
3023 }
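
/*
 * Typical use of these helpers, sketched: request that the neighboring
 * queue flush, here from a hypothetical read-side error path.  A zero
 * return means allocb_tryhard() failed.
 */
static void
xx_request_flush(queue_t *rq)
{
	if (putnextctl1(rq, M_FLUSH, FLUSHR) == 0)
		cmn_err(CE_WARN, "xx: could not allocate M_FLUSH");
}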
3024 
3025 /*
3026  * Return the queue upstream from this one
3027  */
3028 queue_t *
3029 backq(queue_t *q)
3030 {
3031 	q = _OTHERQ(q);
3032 	if (q->q_next) {
3033 		q = q->q_next;
3034 		return (_OTHERQ(q));
3035 	}
3036 	return (NULL);
3037 }
3038 
3039 /*
3040  * Send a block back up the queue in reverse from this
3041  * one (e.g. to respond to ioctls)
3042  */
3043 void
3044 qreply(queue_t *q, mblk_t *bp)
3045 {
3046 	ASSERT(q && bp);
3047 
3048 	putnext(_OTHERQ(q), bp);
3049 }
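
/*
 * The classic qreply() pattern, sketched: acknowledge an M_IOCTL that
 * arrived on the write side by converting it in place and turning it
 * around.  The hypothetical module accepts the ioctl unconditionally.
 */
static void
xx_wput_ioctl(queue_t *q, mblk_t *mp)
{
	struct iocblk *iocp = (struct iocblk *)mp->b_rptr;

	mp->b_datap->db_type = M_IOCACK;
	iocp->ioc_count = 0;
	iocp->ioc_error = 0;
	iocp->ioc_rval = 0;
	qreply(q, mp);		/* the ack travels up the read side */
}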
3050 
3051 /*
3052  * Streams Queue Scheduling
3053  *
3054  * Queues are enabled through qenable() when they have messages to
3055  * process.  They are serviced by queuerun(), which runs each enabled
3056  * queue's service procedure.  The call to queuerun() is processor
3057  * dependent - the general principle is that it be run whenever a queue
3058  * is enabled but before returning to user level.  For system calls,
3059  * the function runqueues() is called if their action causes a queue
3060  * to be enabled.  For device interrupts, queuerun() should be
3061  * called before returning from the last level of interrupt.  Beyond
3062  * this, no timing assumptions should be made about queue scheduling.
3063  */
3064 
3065 /*
3066  * Enable a queue: put it on list of those whose service procedures are
3067  * ready to run and set up the scheduling mechanism.
3068  * The broadcast is done outside the mutex to avoid the woken thread
3069  * contending with the mutex. This is OK because the queue has been
3070  * enqueued on the runlist and flagged safely at this point.
3071  */
3072 void
3073 qenable(queue_t *q)
3074 {
3075 	mutex_enter(QLOCK(q));
3076 	qenable_locked(q);
3077 	mutex_exit(QLOCK(q));
3078 }
3079 /*
3080  * Return number of messages on queue
3081  */
3082 int
3083 qsize(queue_t *qp)
3084 {
3085 	int count = 0;
3086 	mblk_t *mp;
3087 
3088 	mutex_enter(QLOCK(qp));
3089 	for (mp = qp->q_first; mp; mp = mp->b_next)
3090 		count++;
3091 	mutex_exit(QLOCK(qp));
3092 	return (count);
3093 }
3094 
3095 /*
3096  * noenable - set queue so that putq() will not enable it.
3097  * enableok - set queue so that putq() can enable it.
3098  */
3099 void
3100 noenable(queue_t *q)
3101 {
3102 	mutex_enter(QLOCK(q));
3103 	q->q_flag |= QNOENB;
3104 	mutex_exit(QLOCK(q));
3105 }
3106 
3107 void
3108 enableok(queue_t *q)
3109 {
3110 	mutex_enter(QLOCK(q));
3111 	q->q_flag &= ~QNOENB;
3112 	mutex_exit(QLOCK(q));
3113 }
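
/*
 * A sketch of the noenable()/enableok() pairing (hypothetical module):
 * queue a chain of messages without scheduling the service procedure
 * for each one, then enable the queue once for the whole batch.
 */
static void
xx_enqueue_batch(queue_t *q, mblk_t *chain)
{
	mblk_t *mp;

	noenable(q);		/* putq() will not schedule q */
	while ((mp = chain) != NULL) {
		chain = mp->b_next;
		mp->b_next = NULL;
		(void) putq(q, mp);
	}
	enableok(q);
	qenable(q);		/* one scheduling pass for the batch */
}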
3114 
3115 /*
3116  * Set queue fields.
3117  */
3118 int
3119 strqset(queue_t *q, qfields_t what, unsigned char pri, intptr_t val)
3120 {
3121 	qband_t *qbp = NULL;
3122 	queue_t	*wrq;
3123 	int error = 0;
3124 	kthread_id_t freezer;
3125 
3126 	freezer = STREAM(q)->sd_freezer;
3127 	if (freezer == curthread) {
3128 		ASSERT(frozenstr(q));
3129 		ASSERT(MUTEX_HELD(QLOCK(q)));
3130 	} else
3131 		mutex_enter(QLOCK(q));
3132 
3133 	if (what >= QBAD) {
3134 		error = EINVAL;
3135 		goto done;
3136 	}
3137 	if (pri != 0) {
3138 		int i;
3139 		qband_t **qbpp;
3140 
3141 		if (pri > q->q_nband) {
3142 			qbpp = &q->q_bandp;
3143 			while (*qbpp)
3144 				qbpp = &(*qbpp)->qb_next;
3145 			while (pri > q->q_nband) {
3146 				if ((*qbpp = allocband()) == NULL) {
3147 					error = EAGAIN;
3148 					goto done;
3149 				}
3150 				(*qbpp)->qb_hiwat = q->q_hiwat;
3151 				(*qbpp)->qb_lowat = q->q_lowat;
3152 				q->q_nband++;
3153 				qbpp = &(*qbpp)->qb_next;
3154 			}
3155 		}
3156 		qbp = q->q_bandp;
3157 		i = pri;
3158 		while (--i)
3159 			qbp = qbp->qb_next;
3160 	}
3161 	switch (what) {
3162 
3163 	case QHIWAT:
3164 		if (qbp)
3165 			qbp->qb_hiwat = (size_t)val;
3166 		else
3167 			q->q_hiwat = (size_t)val;
3168 		break;
3169 
3170 	case QLOWAT:
3171 		if (qbp)
3172 			qbp->qb_lowat = (size_t)val;
3173 		else
3174 			q->q_lowat = (size_t)val;
3175 		break;
3176 
3177 	case QMAXPSZ:
3178 		if (qbp)
3179 			error = EINVAL;
3180 		else
3181 			q->q_maxpsz = (ssize_t)val;
3182 
3183 		 * Performance concern: strwrite looks at the module below
3184 		 * the stream head for the maxpsz each time it does a write,
3185 		 * so we now cache it at the stream head.  Check to see if
3186 		 * this queue is sitting directly below the stream head.
3187 		 * queue is sitting directly below the stream head.
3188 		 */
3189 		wrq = STREAM(q)->sd_wrq;
3190 		if (q != wrq->q_next)
3191 			break;
3192 
3193 		/*
3194 		 * If the stream is not frozen drop the current QLOCK and
3195 		 * acquire the sd_wrq QLOCK which protects sd_qn_*
3196 		 */
3197 		if (freezer != curthread) {
3198 			mutex_exit(QLOCK(q));
3199 			mutex_enter(QLOCK(wrq));
3200 		}
3201 		ASSERT(MUTEX_HELD(QLOCK(wrq)));
3202 
3203 		if (strmsgsz != 0) {
3204 			if (val == INFPSZ)
3205 				val = strmsgsz;
3206 			else  {
3207 				if (STREAM(q)->sd_vnode->v_type == VFIFO)
3208 					val = MIN(PIPE_BUF, val);
3209 				else
3210 					val = MIN(strmsgsz, val);
3211 			}
3212 		}
3213 		STREAM(q)->sd_qn_maxpsz = val;
3214 		if (freezer != curthread) {
3215 			mutex_exit(QLOCK(wrq));
3216 			mutex_enter(QLOCK(q));
3217 		}
3218 		break;
3219 
3220 	case QMINPSZ:
3221 		if (qbp)
3222 			error = EINVAL;
3223 		else
3224 			q->q_minpsz = (ssize_t)val;
3225 
3226 		/*
3227 		 * Performance concern: strwrite looks at the module below
3228 		 * the stream head for the minpsz each time it does a write,
3229 		 * so we now cache it at the stream head.  Check to see if
3230 		 * this queue is sitting directly below the stream head.
3231 		 */
3232 		wrq = STREAM(q)->sd_wrq;
3233 		if (q != wrq->q_next)
3234 			break;
3235 
3236 		/*
3237 		 * If the stream is not frozen drop the current QLOCK and
3238 		 * acquire the sd_wrq QLOCK which protects sd_qn_*
3239 		 */
3240 		if (freezer != curthread) {
3241 			mutex_exit(QLOCK(q));
3242 			mutex_enter(QLOCK(wrq));
3243 		}
3244 		STREAM(q)->sd_qn_minpsz = (ssize_t)val;
3245 
3246 		if (freezer != curthread) {
3247 			mutex_exit(QLOCK(wrq));
3248 			mutex_enter(QLOCK(q));
3249 		}
3250 		break;
3251 
3252 	case QSTRUIOT:
3253 		if (qbp)
3254 			error = EINVAL;
3255 		else
3256 			q->q_struiot = (ushort_t)val;
3257 		break;
3258 
3259 	case QCOUNT:
3260 	case QFIRST:
3261 	case QLAST:
3262 	case QFLAG:
3263 		error = EPERM;
3264 		break;
3265 
3266 	default:
3267 		error = EINVAL;
3268 		break;
3269 	}
3270 done:
3271 	if (freezer != curthread)
3272 		mutex_exit(QLOCK(q));
3273 	return (error);
3274 }
3275 
3276 /*
3277  * Get queue fields.
3278  */
3279 int
3280 strqget(queue_t *q, qfields_t what, unsigned char pri, void *valp)
3281 {
3282 	qband_t 	*qbp = NULL;
3283 	int 		error = 0;
3284 	kthread_id_t 	freezer;
3285 
3286 	freezer = STREAM(q)->sd_freezer;
3287 	if (freezer == curthread) {
3288 		ASSERT(frozenstr(q));
3289 		ASSERT(MUTEX_HELD(QLOCK(q)));
3290 	} else
3291 		mutex_enter(QLOCK(q));
3292 	if (what >= QBAD) {
3293 		error = EINVAL;
3294 		goto done;
3295 	}
3296 	if (pri != 0) {
3297 		int i;
3298 		qband_t **qbpp;
3299 
3300 		if (pri > q->q_nband) {
3301 			qbpp = &q->q_bandp;
3302 			while (*qbpp)
3303 				qbpp = &(*qbpp)->qb_next;
3304 			while (pri > q->q_nband) {
3305 				if ((*qbpp = allocband()) == NULL) {
3306 					error = EAGAIN;
3307 					goto done;
3308 				}
3309 				(*qbpp)->qb_hiwat = q->q_hiwat;
3310 				(*qbpp)->qb_lowat = q->q_lowat;
3311 				q->q_nband++;
3312 				qbpp = &(*qbpp)->qb_next;
3313 			}
3314 		}
3315 		qbp = q->q_bandp;
3316 		i = pri;
3317 		while (--i)
3318 			qbp = qbp->qb_next;
3319 	}
3320 	switch (what) {
3321 	case QHIWAT:
3322 		if (qbp)
3323 			*(size_t *)valp = qbp->qb_hiwat;
3324 		else
3325 			*(size_t *)valp = q->q_hiwat;
3326 		break;
3327 
3328 	case QLOWAT:
3329 		if (qbp)
3330 			*(size_t *)valp = qbp->qb_lowat;
3331 		else
3332 			*(size_t *)valp = q->q_lowat;
3333 		break;
3334 
3335 	case QMAXPSZ:
3336 		if (qbp)
3337 			error = EINVAL;
3338 		else
3339 			*(ssize_t *)valp = q->q_maxpsz;
3340 		break;
3341 
3342 	case QMINPSZ:
3343 		if (qbp)
3344 			error = EINVAL;
3345 		else
3346 			*(ssize_t *)valp = q->q_minpsz;
3347 		break;
3348 
3349 	case QCOUNT:
3350 		if (qbp)
3351 			*(size_t *)valp = qbp->qb_count;
3352 		else
3353 			*(size_t *)valp = q->q_count;
3354 		break;
3355 
3356 	case QFIRST:
3357 		if (qbp)
3358 			*(mblk_t **)valp = qbp->qb_first;
3359 		else
3360 			*(mblk_t **)valp = q->q_first;
3361 		break;
3362 
3363 	case QLAST:
3364 		if (qbp)
3365 			*(mblk_t **)valp = qbp->qb_last;
3366 		else
3367 			*(mblk_t **)valp = q->q_last;
3368 		break;
3369 
3370 	case QFLAG:
3371 		if (qbp)
3372 			*(uint_t *)valp = qbp->qb_flag;
3373 		else
3374 			*(uint_t *)valp = q->q_flag;
3375 		break;
3376 
3377 	case QSTRUIOT:
3378 		if (qbp)
3379 			error = EINVAL;
3380 		else
3381 			*(short *)valp = q->q_struiot;
3382 		break;
3383 
3384 	default:
3385 		error = EINVAL;
3386 		break;
3387 	}
3388 done:
3389 	if (freezer != curthread)
3390 		mutex_exit(QLOCK(q));
3391 	return (error);
3392 }
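
/*
 * A sketch of the strqget()/strqset() pair (hypothetical, and assuming
 * the plumbing below wq is stable): read the downstream module's
 * high-water mark and double it.  Zero means success.
 */
static int
xx_bump_hiwat(queue_t *wq)
{
	size_t hiwat;
	int error;

	if ((error = strqget(wq->q_next, QHIWAT, 0, &hiwat)) != 0)
		return (error);
	return (strqset(wq->q_next, QHIWAT, 0, (intptr_t)(hiwat * 2)));
}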
3393 
3394 /*
3395  * Function wakes all threads sleeping in cvwait/sigwait/pollwait, on one of:
3396  *	QWANTWSYNC or QWANTR or QWANTW,
3397  *
3398  * Note: for QWANTWSYNC/QWANTW and QWANTR, if no WSLEEPer or RSLEEPer then a
3399  *	 deferred wakeup will be done. Also if strpoll() in progress then a
3400  *	 deferred pollwakeup will be done.
3401  */
3402 void
3403 strwakeq(queue_t *q, int flag)
3404 {
3405 	stdata_t 	*stp = STREAM(q);
3406 	pollhead_t 	*pl;
3407 
3408 	mutex_enter(&stp->sd_lock);
3409 	pl = &stp->sd_pollist;
3410 	if (flag & QWANTWSYNC) {
3411 		ASSERT(!(q->q_flag & QREADR));
3412 		if (stp->sd_flag & WSLEEP) {
3413 			stp->sd_flag &= ~WSLEEP;
3414 			cv_broadcast(&stp->sd_wrq->q_wait);
3415 		} else {
3416 			stp->sd_wakeq |= WSLEEP;
3417 		}
3418 
3419 		mutex_exit(&stp->sd_lock);
3420 		pollwakeup(pl, POLLWRNORM);
3421 		mutex_enter(&stp->sd_lock);
3422 
3423 		if (stp->sd_sigflags & S_WRNORM)
3424 			strsendsig(stp->sd_siglist, S_WRNORM, 0, 0);
3425 	} else if (flag & QWANTR) {
3426 		if (stp->sd_flag & RSLEEP) {
3427 			stp->sd_flag &= ~RSLEEP;
3428 			cv_broadcast(&_RD(stp->sd_wrq)->q_wait);
3429 		} else {
3430 			stp->sd_wakeq |= RSLEEP;
3431 		}
3432 
3433 		mutex_exit(&stp->sd_lock);
3434 		pollwakeup(pl, POLLIN | POLLRDNORM);
3435 		mutex_enter(&stp->sd_lock);
3436 
3437 		{
3438 			int events = stp->sd_sigflags & (S_INPUT | S_RDNORM);
3439 
3440 			if (events)
3441 				strsendsig(stp->sd_siglist, events, 0, 0);
3442 		}
3443 	} else {
3444 		if (stp->sd_flag & WSLEEP) {
3445 			stp->sd_flag &= ~WSLEEP;
3446 			cv_broadcast(&stp->sd_wrq->q_wait);
3447 		}
3448 
3449 		mutex_exit(&stp->sd_lock);
3450 		pollwakeup(pl, POLLWRNORM);
3451 		mutex_enter(&stp->sd_lock);
3452 
3453 		if (stp->sd_sigflags & S_WRNORM)
3454 			strsendsig(stp->sd_siglist, S_WRNORM, 0, 0);
3455 	}
3456 	mutex_exit(&stp->sd_lock);
3457 }
3458 
3459 int
3460 struioget(queue_t *q, mblk_t *mp, struiod_t *dp, int noblock)
3461 {
3462 	stdata_t *stp = STREAM(q);
3463 	int typ  = STRUIOT_STANDARD;
3464 	uio_t	 *uiop = &dp->d_uio;
3465 	dblk_t	 *dbp;
3466 	ssize_t	 uiocnt;
3467 	ssize_t	 cnt;
3468 	unsigned char *ptr;
3469 	ssize_t	 resid;
3470 	int	 error = 0;
3471 	on_trap_data_t otd;
3472 	queue_t	*stwrq;
3473 
3474 	/*
3475 	 * Plumbing may change while taking the type so store the
3476 	 * queue in a temporary variable. It doesn't matter even
3477 	 * if we take the type from the previous plumbing;
3478 	 * that's because if the plumbing has changed while we were
3479 	 * holding the queue in a temporary variable, we can continue
3480 	 * processing the message the way it would have been processed
3481 	 * in the old plumbing, without any side effects beyond a bit of
3482 	 * extra processing for the partial ip header checksum.
3483 	 *
3484 	 * This has been done to avoid holding the sd_lock which is
3485 	 * very hot.
3486 	 */
3487 
3488 	stwrq = stp->sd_struiowrq;
3489 	if (stwrq)
3490 		typ = stwrq->q_struiot;
3491 
3492 	for (; (resid = uiop->uio_resid) > 0 && mp; mp = mp->b_cont) {
3493 		dbp = mp->b_datap;
3494 		ptr = (uchar_t *)(mp->b_rptr + dbp->db_cksumstuff);
3495 		uiocnt = dbp->db_cksumend - dbp->db_cksumstuff;
3496 		cnt = MIN(uiocnt, uiop->uio_resid);
3497 		if (!(dbp->db_struioflag & STRUIO_SPEC) ||
3498 		    (dbp->db_struioflag & STRUIO_DONE) || cnt == 0) {
3499 			/*
3500 			 * Either this mblk has already been processed
3501 			 * or there is no more room in this mblk (?).
3502 			 */
3503 			continue;
3504 		}
3505 		switch (typ) {
3506 		case STRUIOT_STANDARD:
3507 			if (noblock) {
3508 				if (on_trap(&otd, OT_DATA_ACCESS)) {
3509 					no_trap();
3510 					error = EWOULDBLOCK;
3511 					goto out;
3512 				}
3513 			}
3514 			if (error = uiomove(ptr, cnt, UIO_WRITE, uiop)) {
3515 				if (noblock)
3516 					no_trap();
3517 				goto out;
3518 			}
3519 			if (noblock)
3520 				no_trap();
3521 			break;
3522 
3523 		default:
3524 			error = EIO;
3525 			goto out;
3526 		}
3527 		dbp->db_struioflag |= STRUIO_DONE;
3528 		dbp->db_cksumstuff += cnt;
3529 	}
3530 out:
3531 	if (error == EWOULDBLOCK && (resid -= uiop->uio_resid) > 0) {
3532 		/*
3533 		 * A fault has occurred and some bytes were moved to the
3534 		 * current mblk, the uio_t has already been updated by
3535 		 * the appropriate uio routine, so also update the mblk
3536 		 * to reflect this in case this same mblk chain is used
3537 		 * again (after the fault has been handled).
3538 		 */
3539 		uiocnt = dbp->db_cksumend - dbp->db_cksumstuff;
3540 		if (uiocnt >= resid)
3541 			dbp->db_cksumstuff += resid;
3542 	}
3543 	return (error);
3544 }
3545 
3546 /*
3547  * Try to enter queue synchronously. Any attempt to enter a closing queue will
3548  * fail. The qp->q_rwcnt keeps track of the number of successful entries so
3549  * that removeq() will not try to close the queue while a thread is inside the
3550  * queue.
3551  */
3552 static boolean_t
3553 rwnext_enter(queue_t *qp)
3554 {
3555 	mutex_enter(QLOCK(qp));
3556 	if (qp->q_flag & QWCLOSE) {
3557 		mutex_exit(QLOCK(qp));
3558 		return (B_FALSE);
3559 	}
3560 	qp->q_rwcnt++;
3561 	ASSERT(qp->q_rwcnt != 0);
3562 	mutex_exit(QLOCK(qp));
3563 	return (B_TRUE);
3564 }
3565 
3566 /*
3567  * Decrease the count of threads running in sync stream queue and wake up any
3568  * threads blocked in removeq().
3569  */
3570 static void
3571 rwnext_exit(queue_t *qp)
3572 {
3573 	mutex_enter(QLOCK(qp));
3574 	qp->q_rwcnt--;
3575 	if (qp->q_flag & QWANTRMQSYNC) {
3576 		qp->q_flag &= ~QWANTRMQSYNC;
3577 		cv_broadcast(&qp->q_wait);
3578 	}
3579 	mutex_exit(QLOCK(qp));
3580 }
3581 
3582 /*
3583  * The purpose of rwnext() is to call the rw procedure of the next
3584  * (downstream) module's queue.
3585  *
3586  * Treated as a put entrypoint for perimeter synchronization.
3587  *
3588  * There's no need to grab sq_putlocks here (which only exist for CIPUT
3589  * sync queues). If it is CIPUT sync queue sq_count is incremented and it does
3590  * not matter if any regular put entrypoints have been already entered. We
3591  * can't increment one of the sq_putcounts (instead of sq_count) because
3592  * qwait_rw won't know which counter to decrement.
3593  *
3594  * It would be reasonable to add the lockless FASTPUT logic.
3595  */
3596 int
3597 rwnext(queue_t *qp, struiod_t *dp)
3598 {
3599 	queue_t		*nqp;
3600 	syncq_t		*sq;
3601 	uint16_t	count;
3602 	uint16_t	flags;
3603 	struct qinit	*qi;
3604 	int		(*proc)();
3605 	struct stdata	*stp;
3606 	int		isread;
3607 	int		rval;
3608 
3609 	stp = STREAM(qp);
3610 	/*
3611 	 * Prevent q_next from changing by holding sd_lock until acquiring
3612 	 * SQLOCK. Note that a read-side rwnext from the streamhead will
3613 	 * already have sd_lock acquired. In either case sd_lock is always
3614 	 * released after acquiring SQLOCK.
3615 	 *
3616 	 * The streamhead read-side holding sd_lock when calling rwnext is
3617  * required to prevent a race condition where M_DATA mblks flowing
3618 	 * up the read-side of the stream could be bypassed by a rwnext()
3619 	 * down-call. In this case sd_lock acts as the streamhead perimeter.
3620 	 */
3621 	if ((nqp = _WR(qp)) == qp) {
3622 		isread = 0;
3623 		mutex_enter(&stp->sd_lock);
3624 		qp = nqp->q_next;
3625 	} else {
3626 		isread = 1;
3627 		if (nqp != stp->sd_wrq)
3628 			/* Not streamhead */
3629 			mutex_enter(&stp->sd_lock);
3630 		qp = _RD(nqp->q_next);
3631 	}
3632 	qi = qp->q_qinfo;
3633 	if (qp->q_struiot == STRUIOT_NONE || ! (proc = qi->qi_rwp)) {
3634 		/*
3635 		 * Not a synchronous module or no r/w procedure for this
3636 		 * queue, so just return EINVAL and let the caller handle it.
3637 		 */
3638 		mutex_exit(&stp->sd_lock);
3639 		return (EINVAL);
3640 	}
3641 
3642 	if (rwnext_enter(qp) == B_FALSE) {
3643 		mutex_exit(&stp->sd_lock);
3644 		return (EINVAL);
3645 	}
3646 
3647 	sq = qp->q_syncq;
3648 	mutex_enter(SQLOCK(sq));
3649 	mutex_exit(&stp->sd_lock);
3650 	count = sq->sq_count;
3651 	flags = sq->sq_flags;
3652 	ASSERT(sq->sq_ciputctrl == NULL || (flags & SQ_CIPUT));
3653 
3654 	while ((flags & SQ_GOAWAY) || (!(flags & SQ_CIPUT) && count != 0)) {
3655 		/*
3656 		 * if this queue is being closed, return.
3657 		 */
3658 		if (qp->q_flag & QWCLOSE) {
3659 			mutex_exit(SQLOCK(sq));
3660 			rwnext_exit(qp);
3661 			return (EINVAL);
3662 		}
3663 
3664 		/*
3665 		 * Wait until we can enter the inner perimeter.
3666 		 */
3667 		sq->sq_flags = flags | SQ_WANTWAKEUP;
3668 		cv_wait(&sq->sq_wait, SQLOCK(sq));
3669 		count = sq->sq_count;
3670 		flags = sq->sq_flags;
3671 	}
3672 
3673 	if (isread == 0 && stp->sd_struiowrq == NULL ||
3674 	    isread == 1 && stp->sd_struiordq == NULL) {
3675 		/*
3676 		 * Stream plumbing changed while waiting for inner perimeter
3677 		 * so just return EINVAL and let the caller handle it.
3678 		 */
3679 		mutex_exit(SQLOCK(sq));
3680 		rwnext_exit(qp);
3681 		return (EINVAL);
3682 	}
3683 	if (!(flags & SQ_CIPUT))
3684 		sq->sq_flags = flags | SQ_EXCL;
3685 	sq->sq_count = count + 1;
3686 	ASSERT(sq->sq_count != 0);		/* Wraparound */
3687 	/*
3688 	 * Note: The only message ordering guarantee that rwnext() makes is
3689 	 *	 for the write queue flow-control case. All others (r/w queue
3690 	 *	 with q_count > 0 (or q_first != 0)) are the responsibility of
3691 	 *	 the queue's rw procedure. This could be generalized here by
3692 	 *	 running the queue's service procedure, but that wouldn't be
3693 	 *	 the most efficient for all cases.
3694 	 */
3695 	mutex_exit(SQLOCK(sq));
3696 	if (! isread && (qp->q_flag & QFULL)) {
3697 		/*
3698 		 * Write queue may be flow controlled. If so,
3699 		 * mark the queue for wakeup when it's not.
3700 		 */
3701 		mutex_enter(QLOCK(qp));
3702 		if (qp->q_flag & QFULL) {
3703 			qp->q_flag |= QWANTWSYNC;
3704 			mutex_exit(QLOCK(qp));
3705 			rval = EWOULDBLOCK;
3706 			goto out;
3707 		}
3708 		mutex_exit(QLOCK(qp));
3709 	}
3710 
3711 	if (! isread && dp->d_mp)
3712 		STR_FTEVENT_MSG(dp->d_mp, nqp, FTEV_RWNEXT, dp->d_mp->b_rptr -
3713 		    dp->d_mp->b_datap->db_base);
3714 
3715 	rval = (*proc)(qp, dp);
3716 
3717 	if (isread && dp->d_mp)
3718 		STR_FTEVENT_MSG(dp->d_mp, _RD(nqp), FTEV_RWNEXT,
3719 		    dp->d_mp->b_rptr - dp->d_mp->b_datap->db_base);
3720 out:
3721 	/*
3722 	 * The queue is protected from being freed by sq_count, so it is
3723 	 * safe to call rwnext_exit and reacquire SQLOCK(sq).
3724 	 */
3725 	rwnext_exit(qp);
3726 
3727 	mutex_enter(SQLOCK(sq));
3728 	flags = sq->sq_flags;
3729 	ASSERT(sq->sq_count != 0);
3730 	sq->sq_count--;
3731 	if (flags & SQ_TAIL) {
3732 		putnext_tail(sq, qp, flags);
3733 		/*
3734 		 * The only purpose of this ASSERT is to preserve calling stack
3735 		 * in DEBUG kernel.
3736 		 */
3737 		ASSERT(flags & SQ_TAIL);
3738 		return (rval);
3739 	}
3740 	ASSERT(flags & (SQ_EXCL|SQ_CIPUT));
3741 	/*
3742 	 * Safe to always drop SQ_EXCL:
3743 	 *	Not SQ_CIPUT means we set SQ_EXCL above
3744 	 *	For SQ_CIPUT SQ_EXCL will only be set if the put procedure
3745 	 *	did a qwriter(INNER) in which case nobody else
3746 	 *	is in the inner perimeter and we are exiting.
3747 	 *
3748 	 * I would like to make the following assertion:
3749 	 *
3750 	 * ASSERT((flags & (SQ_EXCL|SQ_CIPUT)) != (SQ_EXCL|SQ_CIPUT) ||
3751 	 * 	sq->sq_count == 0);
3752 	 *
3753 	 * which indicates that if we are both putshared and exclusive,
3754 	 * we became exclusive while executing the putproc, and the only
3755 	 * claim on the syncq was the one we dropped a few lines above.
3756 	 * But other threads that enter putnext while the syncq is exclusive
3757 	 * need to make a claim as they may need to drop SQLOCK in the
3758 	 * has_writers case to avoid deadlocks.  If these threads are
3759 	 * delayed or preempted, it is possible that the writer thread can
3760 	 * find out that there are other claims making the (sq_count == 0)
3761 	 * test invalid.
3762 	 */
3763 
3764 	sq->sq_flags = flags & ~SQ_EXCL;
3765 	if (sq->sq_flags & SQ_WANTWAKEUP) {
3766 		sq->sq_flags &= ~SQ_WANTWAKEUP;
3767 		cv_broadcast(&sq->sq_wait);
3768 	}
3769 	mutex_exit(SQLOCK(sq));
3770 	return (rval);
3771 }
3772 
3773 /*
3774  * The purpose of infonext() is to call the info procedure of the next
3775  * (downstream) module's queue.
3776  *
3777  * Treated as a put entrypoint for perimeter synchronization.
3778  *
3779  * There's no need to grab sq_putlocks here (which only exist for CIPUT
3780  * sync queues). If it is CIPUT sync queue regular sq_count is incremented and
3781  * it does not matter if any regular put entrypoints have been already
3782  * entered.
3783  */
3784 int
3785 infonext(queue_t *qp, infod_t *idp)
3786 {
3787 	queue_t		*nqp;
3788 	syncq_t		*sq;
3789 	uint16_t	count;
3790 	uint16_t 	flags;
3791 	struct qinit	*qi;
3792 	int		(*proc)();
3793 	struct stdata	*stp;
3794 	int		rval;
3795 
3796 	stp = STREAM(qp);
3797 	/*
3798 	 * Prevent q_next from changing by holding sd_lock until
3799 	 * acquiring SQLOCK.
3800 	 */
3801 	mutex_enter(&stp->sd_lock);
3802 	if ((nqp = _WR(qp)) == qp) {
3803 		qp = nqp->q_next;
3804 	} else {
3805 		qp = _RD(nqp->q_next);
3806 	}
3807 	qi = qp->q_qinfo;
3808 	if (qp->q_struiot == STRUIOT_NONE || ! (proc = qi->qi_infop)) {
3809 		mutex_exit(&stp->sd_lock);
3810 		return (EINVAL);
3811 	}
3812 	sq = qp->q_syncq;
3813 	mutex_enter(SQLOCK(sq));
3814 	mutex_exit(&stp->sd_lock);
3815 	count = sq->sq_count;
3816 	flags = sq->sq_flags;
3817 	ASSERT(sq->sq_ciputctrl == NULL || (flags & SQ_CIPUT));
3818 
3819 	while ((flags & SQ_GOAWAY) || (!(flags & SQ_CIPUT) && count != 0)) {
3820 		/*
3821 		 * Wait until we can enter the inner perimeter.
3822 		 */
3823 		sq->sq_flags = flags | SQ_WANTWAKEUP;
3824 		cv_wait(&sq->sq_wait, SQLOCK(sq));
3825 		count = sq->sq_count;
3826 		flags = sq->sq_flags;
3827 	}
3828 
3829 	if (! (flags & SQ_CIPUT))
3830 		sq->sq_flags = flags | SQ_EXCL;
3831 	sq->sq_count = count + 1;
3832 	ASSERT(sq->sq_count != 0);		/* Wraparound */
3833 	mutex_exit(SQLOCK(sq));
3834 
3835 	rval = (*proc)(qp, idp);
3836 
3837 	mutex_enter(SQLOCK(sq));
3838 	flags = sq->sq_flags;
3839 	ASSERT(sq->sq_count != 0);
3840 	sq->sq_count--;
3841 	if (flags & SQ_TAIL) {
3842 		putnext_tail(sq, qp, flags);
3843 		/*
3844 		 * The only purpose of this ASSERT is to preserve calling stack
3845 		 * in DEBUG kernel.
3846 		 */
3847 		ASSERT(flags & SQ_TAIL);
3848 		return (rval);
3849 	}
3850 	ASSERT(flags & (SQ_EXCL|SQ_CIPUT));
3851 /*
3852  * XXXX
3853  * I am not certain the next comment is correct here.  I need to consider
3854  * why the infonext is called, and if dropping SQ_EXCL unless non-CIPUT
3855  * might cause other problems.  It just might be safer to drop it if
3856  * !SQ_CIPUT because that is when we set it.
3857  */
3858 	/*
3859 	 * Safe to always drop SQ_EXCL:
3860 	 *	Not SQ_CIPUT means we set SQ_EXCL above
3861 	 *	For SQ_CIPUT SQ_EXCL will only be set if the put procedure
3862 	 *	did a qwriter(INNER) in which case nobody else
3863 	 *	is in the inner perimeter and we are exiting.
3864 	 *
3865 	 * I would like to make the following assertion:
3866 	 *
3867 	 * ASSERT((flags & (SQ_EXCL|SQ_CIPUT)) != (SQ_EXCL|SQ_CIPUT) ||
3868 	 *	sq->sq_count == 0);
3869 	 *
3870 	 * which indicates that if we are both putshared and exclusive,
3871 	 * we became exclusive while executing the putproc, and the only
3872 	 * claim on the syncq was the one we dropped a few lines above.
3873 	 * But other threads that enter putnext while the syncq is exclusive
3874 	 * need to make a claim as they may need to drop SQLOCK in the
3875 	 * has_writers case to avoid deadlocks.  If these threads are
3876 	 * delayed or preempted, it is possible that the writer thread can
3877 	 * find out that there are other claims making the (sq_count == 0)
3878 	 * test invalid.
3879 	 */
3880 
3881 	sq->sq_flags = flags & ~SQ_EXCL;
3882 	mutex_exit(SQLOCK(sq));
3883 	return (rval);
3884 }
3885 
3886 /*
3887  * Return nonzero if the queue is responsible for struio(), else return 0.
3888  */
3889 int
3890 isuioq(queue_t *q)
3891 {
3892 	if (q->q_flag & QREADR)
3893 		return (STREAM(q)->sd_struiordq == q);
3894 	else
3895 		return (STREAM(q)->sd_struiowrq == q);
3896 }
3897 
3898 #if defined(__sparc)
3899 int disable_putlocks = 0;
3900 #else
3901 int disable_putlocks = 1;
3902 #endif
3903 
3904 /*
3905  * Called by create_putlocks().
3906  */
3907 static void
3908 create_syncq_putlocks(queue_t *q)
3909 {
3910 	syncq_t	*sq = q->q_syncq;
3911 	ciputctrl_t *cip;
3912 	int i;
3913 
3914 	ASSERT(sq != NULL);
3915 
3916 	ASSERT(disable_putlocks == 0);
3917 	ASSERT(n_ciputctrl >= min_n_ciputctrl);
3918 	ASSERT(ciputctrl_cache != NULL);
3919 
3920 	if (!(sq->sq_type & SQ_CIPUT))
3921 		return;
3922 
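	/*
	 * Pass 0 covers q's own syncq; pass 1 covers the other half of
	 * the queue pair when it has a separate (QPERQ) syncq.
	 */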
3923 	for (i = 0; i <= 1; i++) {
3924 		if (sq->sq_ciputctrl == NULL) {
3925 			cip = kmem_cache_alloc(ciputctrl_cache, KM_SLEEP);
3926 			SUMCHECK_CIPUTCTRL_COUNTS(cip, n_ciputctrl - 1, 0);
3927 			mutex_enter(SQLOCK(sq));
3928 			if (sq->sq_ciputctrl != NULL) {
3929 				mutex_exit(SQLOCK(sq));
3930 				kmem_cache_free(ciputctrl_cache, cip);
3931 			} else {
3932 				ASSERT(sq->sq_nciputctrl == 0);
3933 				sq->sq_nciputctrl = n_ciputctrl - 1;
3934 				/*
3935 				 * putnext() checks sq_ciputctrl without
3936 				 * holding SQLOCK; if it is non-NULL, putnext()
3937 				 * assumes sq_nciputctrl is initialized.  The
3938 				 * membar below ensures that.
3939 				 */
3940 				membar_producer();
3941 				sq->sq_ciputctrl = cip;
3942 				mutex_exit(SQLOCK(sq));
3943 			}
3944 		}
3945 		ASSERT(sq->sq_nciputctrl == n_ciputctrl - 1);
3946 		if (i == 1)
3947 			break;
3948 		q = _OTHERQ(q);
3949 		if (!(q->q_flag & QPERQ)) {
3950 			ASSERT(sq == q->q_syncq);
3951 			break;
3952 		}
3953 		ASSERT(q->q_syncq != NULL);
3954 		ASSERT(sq != q->q_syncq);
3955 		sq = q->q_syncq;
3956 		ASSERT(sq->sq_type & SQ_CIPUT);
3957 	}
3958 }
3959 
3960 /*
3961  * If the stream argument is 0, only create per-CPU sq_putlocks/sq_putcounts
3962  * for the syncq of q.  If the stream argument is nonzero, create per-CPU
3963  * stream_putlocks for the stream of q as well as per-CPU
3964  * sq_putlocks/sq_putcounts for every syncq from q down to the driver.
3965  *
3966  * This should be called after the affected queues are part of the stream
3967  * geometry, i.e. from a driver/module open routine after the qprocson()
3968  * call.  It is also called from the nfs syscall path, where it is known
3969  * that the stream is configured and won't change its geometry during the
3970  * create_putlocks() call.
3971  *
3972  * A caller normally passes 0 for the stream argument to speed up MT putnext
3973  * into the perimeter of q, for example because that perimeter is per module
3974  * (e.g. IP).
3975  *
3976  * A caller normally passes a nonzero stream argument to hint that the
3977  * stream of q is a heavily contended, global system stream (e.g. NFS/UDP)
3978  * and that the part of the stream from q down to the driver is particularly
3979  * MT hot.
3980  *
3981  * The caller ensures that stream plumbing won't happen while we are here,
3982  * so q_next can be used safely.
3983  */
3984 
3985 void
3986 create_putlocks(queue_t *q, int stream)
3987 {
3988 	ciputctrl_t	*cip;
3989 	struct stdata	*stp = STREAM(q);
3990 
3991 	q = _WR(q);
3992 	ASSERT(stp != NULL);
3993 
3994 	if (disable_putlocks != 0)
3995 		return;
3996 
3997 	if (n_ciputctrl < min_n_ciputctrl)
3998 		return;
3999 
4000 	ASSERT(ciputctrl_cache != NULL);
4001 
4002 	if (stream != 0 && stp->sd_ciputctrl == NULL) {
4003 		cip = kmem_cache_alloc(ciputctrl_cache, KM_SLEEP);
4004 		SUMCHECK_CIPUTCTRL_COUNTS(cip, n_ciputctrl - 1, 0);
4005 		mutex_enter(&stp->sd_lock);
4006 		if (stp->sd_ciputctrl != NULL) {
4007 			mutex_exit(&stp->sd_lock);
4008 			kmem_cache_free(ciputctrl_cache, cip);
4009 		} else {
4010 			ASSERT(stp->sd_nciputctrl == 0);
4011 			stp->sd_nciputctrl = n_ciputctrl - 1;
4012 			/*
4013 			 * putnext checks sd_ciputctrl without holding
4014 			 * sd_lock. if it is not NULL putnext assumes
4015 			 * sd_nciputctrl is initialized. membar below
4016 			 * insures that.
4017 			 */
4018 			membar_producer();
4019 			stp->sd_ciputctrl = cip;
4020 			mutex_exit(&stp->sd_lock);
4021 		}
4022 	}
4023 
4024 	ASSERT(stream == 0 || stp->sd_nciputctrl == n_ciputctrl - 1);
4025 
4026 	while (_SAMESTR(q)) {
4027 		create_syncq_putlocks(q);
4028 		if (stream == 0)
4029 			return;
4030 		q = q->q_next;
4031 	}
4032 	ASSERT(q != NULL);
4033 	create_syncq_putlocks(q);
4034 }
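
/*
 * Example (an illustrative sketch; xx_open is hypothetical): a driver or
 * module open routine would call create_putlocks() once its queues are
 * part of the stream geometry, i.e. after qprocson():
 *
 *	static int
 *	xx_open(queue_t *q, dev_t *devp, int oflag, int sflag, cred_t *crp)
 *	{
 *		...
 *		qprocson(q);
 *		create_putlocks(q, 0);	(0: only q's syncq needs putlocks)
 *		return (0);
 *	}
 *
 * A hot, global stream (e.g. NFS/UDP) would instead pass a nonzero stream
 * argument to cover every syncq from q down to the driver.
 */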
4035 
4036 /*
4037  * STREAMS Flow Trace - record STREAMS Flow Trace events as an mblk flows
4038  * through a stream.
4039  *
4040  * The data currently recorded per event are a timestamp, the module/driver
4041  * name, the downstream module/driver name, an optional call stack, the
4042  * event type, and a per-type datum.  Much of the STREAMS framework is
4043  * instrumented for automatic flow tracing (when enabled).  Events can also
4044  * be defined and used by STREAMS modules and drivers themselves.
4045  *
4046  * Global objects:
4047  *
4048  *	str_ftevent() - Add a flow-trace event to a dblk.
4049  *	str_ftfree() - Free flow-trace data
4050  *
4051  * Local objects:
4052  *
4053  *	fthdr_cache - pointer to the kmem cache for trace headers.
4054  *	ftblk_cache - pointer to the kmem cache for trace data blocks.
4055  */
4056 
4057 int str_ftnever = 1;	/* Don't do STREAMS flow tracing */
4058 int str_ftstack = 0;	/* Don't record event call stacks */
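
/*
 * A sketch of how tracing would be used (the specific event shown is
 * only an example): clear str_ftnever, e.g. with "set str_ftnever = 0"
 * in /etc/system, so that newly allocated messages carry a flow-trace
 * header; a module can then record events against such a message:
 *
 *	if (str_ftnever == 0 && mp->b_datap->db_fthdr != NULL)
 *		str_ftevent(mp->b_datap->db_fthdr, q, FTEV_PUTNEXT, 0);
 */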
4059 
4060 void
4061 str_ftevent(fthdr_t *hp, void *p, ushort_t evnt, ushort_t data)
4062 {
4063 	ftblk_t *bp;
4064 	ftblk_t *nbp;
4065 	ftevnt_t *ep;
4066 	int ix, nix;
4067 
4068 	ASSERT(hp != NULL);
4069 	bp = hp->tail;
4070 	for (;;) {
4071 		if ((ix = bp->ix) == FTBLK_EVNTS) {
4072 			/*
4073 			 * The tail doesn't have room, so we need a new
4074 			 * tail.
4075 			 *
4076 			 * To make this MT safe, first allocate a new
4077 			 * ftblk and initialize it.  To make life a
4078 			 * little easier, reserve the first slot (mostly
4079 			 * by making ix = 1).  When the initialization is
4080 			 * finished, CAS the new block into hp->tail.  If
4081 			 * the CAS succeeds, this is the new "next" block.
4082 			 * Otherwise, another thread got here first, so
4083 			 * free the block and start again.
4084 			 */
4085 			nbp = kmem_cache_alloc(ftblk_cache, KM_NOSLEEP);
4086 			if (nbp == NULL) {
4087 				/* No memory: punt and disable tracing. */
4088 				str_ftnever++;
4089 				/* Should we free up all flow data? */
4090 				return;
4091 			}
4092 			nbp->nxt = NULL;
4093 			nbp->ix = 1;
4094 			/*
4095 			 * Another thread may fetch an index into this
4096 			 * block as soon as it is visible, so make sure
4097 			 * its initialization is globally visible first.
4098 			 */
4099 			membar_producer();
4100 			if (casptr(&hp->tail, bp, nbp) == bp) {
4101 				/* CAS was successful */
4102 				bp->nxt = nbp;
4103 				membar_producer();
4104 				bp = nbp;
4105 				ix = 0;
4106 				goto cas_good;
4107 			} else {
4108 				kmem_cache_free(ftblk_cache, nbp);
4109 				bp = hp->tail;
4110 				continue;
4111 			}
4112 		}
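		/*
		 * There is room in the tail block: claim slot ix by
		 * advancing bp->ix with a 32-bit CAS.  If another thread
		 * takes the slot first, reread the index and retry.
		 */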
4113 		nix = ix + 1;
4114 		if (cas32((uint32_t *)&bp->ix, ix, nix) == ix) {
4115 		cas_good:
4116 			if (curthread != hp->thread) {
4117 				hp->thread = curthread;
4118 				evnt |= FTEV_CS;
4119 			}
4120 			if (CPU->cpu_seqid != hp->cpu_seqid) {
4121 				hp->cpu_seqid = CPU->cpu_seqid;
4122 				evnt |= FTEV_PS;
4123 			}
4124 			ep = &bp->ev[ix];
4125 			break;
4126 		}
4127 	}
4128 
4129 	if (evnt & FTEV_QMASK) {
4130 		queue_t *qp = p;
4131 
4132 		if (!(qp->q_flag & QREADR))
4133 			evnt |= FTEV_ISWR;
4134 
4135 		ep->mid = Q2NAME(qp);
4136 
4137 		/*
4138 		 * We only record the next queue name for FTEV_PUTNEXT since
4139 		 * that's the only time we *really* need it, and the putnext()
4140 		 * code ensures that qp->q_next won't vanish.  (We could use
4141 		 * claimstr()/releasestr() but at a performance cost.)
4142 		 */
4143 		if ((evnt & FTEV_MASK) == FTEV_PUTNEXT && qp->q_next != NULL)
4144 			ep->midnext = Q2NAME(qp->q_next);
4145 		else
4146 			ep->midnext = NULL;
4147 	} else {
4148 		ep->mid = p;
4149 		ep->midnext = NULL;
4150 	}
4151 
4152 	if (ep->stk != NULL)
4153 		ep->stk->fs_depth = getpcstack(ep->stk->fs_stk, FTSTK_DEPTH);
4154 
4155 	ep->ts = gethrtime();
4156 	ep->evnt = evnt;
4157 	ep->data = data;
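	/*
	 * Fold the event into the header's running hash; (hash << 9) + hash
	 * is hash * 513, a cheap multiplicative mix.
	 */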
4158 	hp->hash = (hp->hash << 9) + hp->hash;
4159 	hp->hash += (evnt << 16) | data;
4160 	hp->hash += (uintptr_t)ep->mid;
4161 }
4162 
4163 /*
4164  * Free flow-trace data.
4165  */
4166 void
4167 str_ftfree(dblk_t *dbp)
4168 {
4169 	fthdr_t *hp = dbp->db_fthdr;
4170 	ftblk_t *bp = &hp->first;
4171 	ftblk_t *nbp;
4172 
4173 	if (bp != hp->tail || bp->ix != 0) {
4174 		/*
4175 		 * Clear out the hash, point the tail back at the embedded
4176 		 * first block, and free any continuation blocks.
4177 		 */
4178 		bp = hp->first.nxt;
4179 		hp->tail = &hp->first;
4180 		hp->hash = 0;
4181 		hp->first.nxt = NULL;
4182 		hp->first.ix = 0;
4183 		while (bp != NULL) {
4184 			nbp = bp->nxt;
4185 			kmem_cache_free(ftblk_cache, bp);
4186 			bp = nbp;
4187 		}
4188 	}
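	/* The first ftblk is embedded in the fthdr and is freed with it. */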
4189 	kmem_cache_free(fthdr_cache, hp);
4190 	dbp->db_fthdr = NULL;
4191 }
4192