xref: /illumos-gate/usr/src/uts/common/io/stream.c (revision 8521e5e6630b57b9883c3979cd5589e53f09e044)
1 /*
2  * CDDL HEADER START
3  *
4  * The contents of this file are subject to the terms of the
5  * Common Development and Distribution License (the "License").
6  * You may not use this file except in compliance with the License.
7  *
8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9  * or http://www.opensolaris.org/os/licensing.
10  * See the License for the specific language governing permissions
11  * and limitations under the License.
12  *
13  * When distributing Covered Code, include this CDDL HEADER in each
14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15  * If applicable, add the following below this CDDL HEADER, with the
16  * fields enclosed by brackets "[]" replaced with your own identifying
17  * information: Portions Copyright [yyyy] [name of copyright owner]
18  *
19  * CDDL HEADER END
20  */
21 /*	Copyright (c) 1984, 1986, 1987, 1988, 1989 AT&T	*/
22 /*	  All Rights Reserved  	*/
23 
24 
25 /*
26  * Copyright 2007 Sun Microsystems, Inc.  All rights reserved.
27  * Use is subject to license terms.
28  */
29 
30 #pragma ident	"%Z%%M%	%I%	%E% SMI"
31 
32 #include <sys/types.h>
33 #include <sys/param.h>
34 #include <sys/thread.h>
35 #include <sys/sysmacros.h>
36 #include <sys/stropts.h>
37 #include <sys/stream.h>
38 #include <sys/strsubr.h>
39 #include <sys/strsun.h>
40 #include <sys/conf.h>
41 #include <sys/debug.h>
42 #include <sys/cmn_err.h>
43 #include <sys/kmem.h>
44 #include <sys/atomic.h>
45 #include <sys/errno.h>
46 #include <sys/vtrace.h>
47 #include <sys/ftrace.h>
48 #include <sys/ontrap.h>
49 #include <sys/multidata.h>
50 #include <sys/multidata_impl.h>
51 #include <sys/sdt.h>
52 #include <sys/strft.h>
53 
54 #ifdef DEBUG
55 #include <sys/kmem_impl.h>
56 #endif
57 
58 /*
59  * This file contains all the STREAMS utility routines that may
60  * be used by modules and drivers.
61  */
62 
63 /*
64  * STREAMS message allocator: principles of operation
65  *
66  * The streams message allocator consists of all the routines that
67  * allocate, dup and free streams messages: allocb(), [d]esballoc[a],
68  * dupb(), freeb() and freemsg().  What follows is a high-level view
69  * of how the allocator works.
70  *
71  * Every streams message consists of one or more mblks, a dblk, and data.
72  * All mblks for all types of messages come from a common mblk_cache.
73  * The dblk and data come in several flavors, depending on how the
74  * message is allocated:
75  *
76  * (1) mblks up to DBLK_MAX_CACHE size are allocated from a collection of
77  *     fixed-size dblk/data caches. For message sizes that are multiples of
78  *     PAGESIZE, dblks are allocated separately from the buffer.
79  *     The associated buffer is allocated by the constructor using kmem_alloc().
80  *     For all other message sizes, dblk and its associated data is allocated
81  *     as a single contiguous chunk of memory.
82  *     Objects in these caches consist of a dblk plus its associated data.
83  *     allocb() determines the nearest-size cache by table lookup:
84  *     the dblk_cache[] array provides the mapping from size to dblk cache.
85  *
86  * (2) Large messages (size > DBLK_MAX_CACHE) are constructed by
87  *     kmem_alloc()'ing a buffer for the data and supplying that
88  *     buffer to gesballoc(), described below.
89  *
90  * (3) The four flavors of [d]esballoc[a] are all implemented by a
91  *     common routine, gesballoc() ("generic esballoc").  gesballoc()
92  *     allocates a dblk from the global dblk_esb_cache and sets db_base,
93  *     db_lim and db_frtnp to describe the caller-supplied buffer.
94  *
95  * While there are several routines to allocate messages, there is only
96  * one routine to free messages: freeb().  freeb() simply invokes the
97  * dblk's free method, dbp->db_free(), which is set at allocation time.
98  *
99  * dupb() creates a new reference to a message by allocating a new mblk,
100  * incrementing the dblk reference count and setting the dblk's free
101  * method to dblk_decref().  The dblk's original free method is retained
102  * in db_lastfree.  dblk_decref() decrements the reference count on each
103  * freeb().  If this is not the last reference it just frees the mblk;
104  * if this *is* the last reference, it restores db_free to db_lastfree,
105  * sets db_mblk to the current mblk (see below), and invokes db_lastfree.
106  *
107  * The implementation makes aggressive use of kmem object caching for
108  * maximum performance.  This makes the code simple and compact, but
109  * also a bit abstruse in some places.  The invariants that constitute a
110  * message's constructed state, described below, are more subtle than usual.
111  *
112  * Every dblk has an "attached mblk" as part of its constructed state.
113  * The mblk is allocated by the dblk's constructor and remains attached
114  * until the message is either dup'ed or pulled up.  In the dupb() case
115  * the mblk association doesn't matter until the last free, at which time
116  * dblk_decref() attaches the last mblk to the dblk.  pullupmsg() affects
117  * the mblk association because it swaps the leading mblks of two messages,
118  * so it is responsible for swapping their db_mblk pointers accordingly.
119  * From a constructed-state viewpoint it doesn't matter that a dblk's
120  * attached mblk can change while the message is allocated; all that
121  * matters is that the dblk has *some* attached mblk when it's freed.
122  *
123  * The sizes of the allocb() small-message caches are not magical.
124  * They represent a good trade-off between internal and external
125  * fragmentation for current workloads.  They should be reevaluated
126  * periodically, especially if allocations larger than DBLK_MAX_CACHE
127  * become common.  We use 64-byte alignment so that dblks don't
128  * straddle cache lines unnecessarily.
129  */
130 #define	DBLK_MAX_CACHE		73728
131 #define	DBLK_CACHE_ALIGN	64
132 #define	DBLK_MIN_SIZE		8
133 #define	DBLK_SIZE_SHIFT		3
134 
135 #ifdef _BIG_ENDIAN
136 #define	DBLK_RTFU_SHIFT(field)	\
137 	(8 * (&((dblk_t *)0)->db_struioflag - &((dblk_t *)0)->field))
138 #else
139 #define	DBLK_RTFU_SHIFT(field)	\
140 	(8 * (&((dblk_t *)0)->field - &((dblk_t *)0)->db_ref))
141 #endif
142 
143 #define	DBLK_RTFU(ref, type, flags, uioflag)	\
144 	(((ref) << DBLK_RTFU_SHIFT(db_ref)) | \
145 	((type) << DBLK_RTFU_SHIFT(db_type)) | \
146 	(((flags) | (ref - 1)) << DBLK_RTFU_SHIFT(db_flags)) | \
147 	((uioflag) << DBLK_RTFU_SHIFT(db_struioflag)))
148 #define	DBLK_RTFU_REF_MASK	(DBLK_REFMAX << DBLK_RTFU_SHIFT(db_ref))
149 #define	DBLK_RTFU_WORD(dbp)	(*((uint32_t *)&(dbp)->db_ref))
150 #define	MBLK_BAND_FLAG_WORD(mp)	(*((uint32_t *)&(mp)->b_band))
151 
152 static size_t dblk_sizes[] = {
153 #ifdef _LP64
154 	16, 80, 144, 208, 272, 336, 528, 1040, 1488, 1936, 2576, 3920,
155 	8192, 12112, 16384, 20304, 24576, 28496, 32768, 36688,
156 	40960, 44880, 49152, 53072, 57344, 61264, 65536, 69456,
157 #else
158 	64, 128, 320, 576, 1088, 1536, 1984, 2624, 3968,
159 	8192, 12160, 16384, 20352, 24576, 28544, 32768, 36736,
160 	40960, 44928, 49152, 53120, 57344, 61312, 65536, 69504,
161 #endif
162 	DBLK_MAX_CACHE, 0
163 };
164 
165 static struct kmem_cache *dblk_cache[DBLK_MAX_CACHE / DBLK_MIN_SIZE];
166 static struct kmem_cache *mblk_cache;
167 static struct kmem_cache *dblk_esb_cache;
168 static struct kmem_cache *fthdr_cache;
169 static struct kmem_cache *ftblk_cache;
170 
171 static void dblk_lastfree(mblk_t *mp, dblk_t *dbp);
172 static mblk_t *allocb_oversize(size_t size, int flags);
173 static int allocb_tryhard_fails;
174 static void frnop_func(void *arg);
175 frtn_t frnop = { frnop_func };
176 static void bcache_dblk_lastfree(mblk_t *mp, dblk_t *dbp);
177 
178 static boolean_t rwnext_enter(queue_t *qp);
179 static void rwnext_exit(queue_t *qp);
180 
181 /*
182  * Patchable mblk/dblk kmem_cache flags.
183  */
184 int dblk_kmem_flags = 0;
185 int mblk_kmem_flags = 0;
186 
187 
188 static int
189 dblk_constructor(void *buf, void *cdrarg, int kmflags)
190 {
191 	dblk_t *dbp = buf;
192 	ssize_t msg_size = (ssize_t)cdrarg;
193 	size_t index;
194 
195 	ASSERT(msg_size != 0);
196 
197 	index = (msg_size - 1) >> DBLK_SIZE_SHIFT;
198 
199 	ASSERT(index < (DBLK_MAX_CACHE >> DBLK_SIZE_SHIFT));
200 
201 	if ((dbp->db_mblk = kmem_cache_alloc(mblk_cache, kmflags)) == NULL)
202 		return (-1);
203 	if ((msg_size & PAGEOFFSET) == 0) {
204 		dbp->db_base = kmem_alloc(msg_size, kmflags);
205 		if (dbp->db_base == NULL) {
206 			kmem_cache_free(mblk_cache, dbp->db_mblk);
207 			return (-1);
208 		}
209 	} else {
210 		dbp->db_base = (unsigned char *)&dbp[1];
211 	}
212 
213 	dbp->db_mblk->b_datap = dbp;
214 	dbp->db_cache = dblk_cache[index];
215 	dbp->db_lim = dbp->db_base + msg_size;
216 	dbp->db_free = dbp->db_lastfree = dblk_lastfree;
217 	dbp->db_frtnp = NULL;
218 	dbp->db_fthdr = NULL;
219 	dbp->db_credp = NULL;
220 	dbp->db_cpid = -1;
221 	dbp->db_struioflag = 0;
222 	dbp->db_struioun.cksum.flags = 0;
223 	return (0);
224 }
225 
226 /*ARGSUSED*/
227 static int
228 dblk_esb_constructor(void *buf, void *cdrarg, int kmflags)
229 {
230 	dblk_t *dbp = buf;
231 
232 	if ((dbp->db_mblk = kmem_cache_alloc(mblk_cache, kmflags)) == NULL)
233 		return (-1);
234 	dbp->db_mblk->b_datap = dbp;
235 	dbp->db_cache = dblk_esb_cache;
236 	dbp->db_fthdr = NULL;
237 	dbp->db_credp = NULL;
238 	dbp->db_cpid = -1;
239 	dbp->db_struioflag = 0;
240 	dbp->db_struioun.cksum.flags = 0;
241 	return (0);
242 }
243 
244 static int
245 bcache_dblk_constructor(void *buf, void *cdrarg, int kmflags)
246 {
247 	dblk_t *dbp = buf;
248 	bcache_t *bcp = (bcache_t *)cdrarg;
249 
250 	if ((dbp->db_mblk = kmem_cache_alloc(mblk_cache, kmflags)) == NULL)
251 		return (-1);
252 
253 	if ((dbp->db_base = (unsigned char *)kmem_cache_alloc(bcp->buffer_cache,
254 	    kmflags)) == NULL) {
255 		kmem_cache_free(mblk_cache, dbp->db_mblk);
256 		return (-1);
257 	}
258 
259 	dbp->db_mblk->b_datap = dbp;
260 	dbp->db_cache = (void *)bcp;
261 	dbp->db_lim = dbp->db_base + bcp->size;
262 	dbp->db_free = dbp->db_lastfree = bcache_dblk_lastfree;
263 	dbp->db_frtnp = NULL;
264 	dbp->db_fthdr = NULL;
265 	dbp->db_credp = NULL;
266 	dbp->db_cpid = -1;
267 	dbp->db_struioflag = 0;
268 	dbp->db_struioun.cksum.flags = 0;
269 	return (0);
270 }
271 
272 /*ARGSUSED*/
273 static void
274 dblk_destructor(void *buf, void *cdrarg)
275 {
276 	dblk_t *dbp = buf;
277 	ssize_t msg_size = (ssize_t)cdrarg;
278 
279 	ASSERT(dbp->db_mblk->b_datap == dbp);
280 
281 	ASSERT(msg_size != 0);
282 
283 	ASSERT(dbp->db_struioflag == 0);
284 	ASSERT(dbp->db_struioun.cksum.flags == 0);
285 
286 	if ((msg_size & PAGEOFFSET) == 0) {
287 		kmem_free(dbp->db_base, msg_size);
288 	}
289 
290 	kmem_cache_free(mblk_cache, dbp->db_mblk);
291 }
292 
293 static void
294 bcache_dblk_destructor(void *buf, void *cdrarg)
295 {
296 	dblk_t *dbp = buf;
297 	bcache_t *bcp = (bcache_t *)cdrarg;
298 
299 	kmem_cache_free(bcp->buffer_cache, dbp->db_base);
300 
301 	ASSERT(dbp->db_mblk->b_datap == dbp);
302 
303 	ASSERT(dbp->db_struioflag == 0);
304 	ASSERT(dbp->db_struioun.cksum.flags == 0);
305 
306 	kmem_cache_free(mblk_cache, dbp->db_mblk);
307 }
308 
309 void
310 streams_msg_init(void)
311 {
312 	char name[40];
313 	size_t size;
314 	size_t lastsize = DBLK_MIN_SIZE;
315 	size_t *sizep;
316 	struct kmem_cache *cp;
317 	size_t tot_size;
318 	int offset;
319 
320 	mblk_cache = kmem_cache_create("streams_mblk",
321 		sizeof (mblk_t), 32, NULL, NULL, NULL, NULL, NULL,
322 		mblk_kmem_flags);
323 
324 	for (sizep = dblk_sizes; (size = *sizep) != 0; sizep++) {
325 
326 		if ((offset = (size & PAGEOFFSET)) != 0) {
327 			/*
328 			 * We are in the middle of a page, dblk should
329 			 * be allocated on the same page
330 			 */
331 			tot_size = size + sizeof (dblk_t);
332 			ASSERT((offset + sizeof (dblk_t) + sizeof (kmem_slab_t))
333 								< PAGESIZE);
334 			ASSERT((tot_size & (DBLK_CACHE_ALIGN - 1)) == 0);
335 
336 		} else {
337 
338 			/*
339 			 * buf size is multiple of page size, dblk and
340 			 * buffer are allocated separately.
341 			 */
342 
343 			ASSERT((size & (DBLK_CACHE_ALIGN - 1)) == 0);
344 			tot_size = sizeof (dblk_t);
345 		}
346 
347 		(void) sprintf(name, "streams_dblk_%ld", size);
348 		cp = kmem_cache_create(name, tot_size,
349 			DBLK_CACHE_ALIGN, dblk_constructor,
350 			dblk_destructor, NULL,
351 			(void *)(size), NULL, dblk_kmem_flags);
352 
353 		while (lastsize <= size) {
354 			dblk_cache[(lastsize - 1) >> DBLK_SIZE_SHIFT] = cp;
355 			lastsize += DBLK_MIN_SIZE;
356 		}
357 	}
358 
359 	dblk_esb_cache = kmem_cache_create("streams_dblk_esb",
360 			sizeof (dblk_t), DBLK_CACHE_ALIGN,
361 			dblk_esb_constructor, dblk_destructor, NULL,
362 			(void *) sizeof (dblk_t), NULL, dblk_kmem_flags);
363 	fthdr_cache = kmem_cache_create("streams_fthdr",
364 		sizeof (fthdr_t), 32, NULL, NULL, NULL, NULL, NULL, 0);
365 	ftblk_cache = kmem_cache_create("streams_ftblk",
366 		sizeof (ftblk_t), 32, NULL, NULL, NULL, NULL, NULL, 0);
367 
368 	/* Initialize Multidata caches */
369 	mmd_init();
370 
371 	/* initialize throttling queue for esballoc */
372 	esballoc_queue_init();
373 }
374 
375 /*ARGSUSED*/
376 mblk_t *
377 allocb(size_t size, uint_t pri)
378 {
379 	dblk_t *dbp;
380 	mblk_t *mp;
381 	size_t index;
382 
383 	index =  (size - 1)  >> DBLK_SIZE_SHIFT;
384 
385 	if (index >= (DBLK_MAX_CACHE >> DBLK_SIZE_SHIFT)) {
386 		if (size != 0) {
387 			mp = allocb_oversize(size, KM_NOSLEEP);
388 			goto out;
389 		}
390 		index = 0;
391 	}
392 
393 	if ((dbp = kmem_cache_alloc(dblk_cache[index], KM_NOSLEEP)) == NULL) {
394 		mp = NULL;
395 		goto out;
396 	}
397 
398 	mp = dbp->db_mblk;
399 	DBLK_RTFU_WORD(dbp) = DBLK_RTFU(1, M_DATA, 0, 0);
400 	mp->b_next = mp->b_prev = mp->b_cont = NULL;
401 	mp->b_rptr = mp->b_wptr = dbp->db_base;
402 	mp->b_queue = NULL;
403 	MBLK_BAND_FLAG_WORD(mp) = 0;
404 	STR_FTALLOC(&dbp->db_fthdr, FTEV_ALLOCB, size);
405 out:
406 	FTRACE_1("allocb(): mp=0x%p", (uintptr_t)mp);
407 
408 	return (mp);
409 }
410 
411 mblk_t *
412 allocb_tmpl(size_t size, const mblk_t *tmpl)
413 {
414 	mblk_t *mp = allocb(size, 0);
415 
416 	if (mp != NULL) {
417 		cred_t *cr = DB_CRED(tmpl);
418 		if (cr != NULL)
419 			crhold(mp->b_datap->db_credp = cr);
420 		DB_CPID(mp) = DB_CPID(tmpl);
421 		DB_TYPE(mp) = DB_TYPE(tmpl);
422 	}
423 	return (mp);
424 }
425 
426 mblk_t *
427 allocb_cred(size_t size, cred_t *cr)
428 {
429 	mblk_t *mp = allocb(size, 0);
430 
431 	if (mp != NULL && cr != NULL)
432 		crhold(mp->b_datap->db_credp = cr);
433 
434 	return (mp);
435 }
436 
437 mblk_t *
438 allocb_cred_wait(size_t size, uint_t flags, int *error, cred_t *cr)
439 {
440 	mblk_t *mp = allocb_wait(size, 0, flags, error);
441 
442 	if (mp != NULL && cr != NULL)
443 		crhold(mp->b_datap->db_credp = cr);
444 
445 	return (mp);
446 }
447 
448 void
449 freeb(mblk_t *mp)
450 {
451 	dblk_t *dbp = mp->b_datap;
452 
453 	ASSERT(dbp->db_ref > 0);
454 	ASSERT(mp->b_next == NULL && mp->b_prev == NULL);
455 	FTRACE_1("freeb(): mp=0x%lx", (uintptr_t)mp);
456 
457 	STR_FTEVENT_MBLK(mp, caller(), FTEV_FREEB, dbp->db_ref);
458 
459 	dbp->db_free(mp, dbp);
460 }
461 
462 void
463 freemsg(mblk_t *mp)
464 {
465 	FTRACE_1("freemsg(): mp=0x%lx", (uintptr_t)mp);
466 	while (mp) {
467 		dblk_t *dbp = mp->b_datap;
468 		mblk_t *mp_cont = mp->b_cont;
469 
470 		ASSERT(dbp->db_ref > 0);
471 		ASSERT(mp->b_next == NULL && mp->b_prev == NULL);
472 
473 		STR_FTEVENT_MBLK(mp, caller(), FTEV_FREEB, dbp->db_ref);
474 
475 		dbp->db_free(mp, dbp);
476 		mp = mp_cont;
477 	}
478 }
479 
480 /*
481  * Reallocate a block for another use.  Try hard to use the old block.
482  * If the old data is wanted (copy), leave b_wptr at the end of the data,
483  * otherwise return b_wptr = b_rptr.
484  *
485  * This routine is private and unstable.
486  */
487 mblk_t	*
488 reallocb(mblk_t *mp, size_t size, uint_t copy)
489 {
490 	mblk_t		*mp1;
491 	unsigned char	*old_rptr;
492 	ptrdiff_t	cur_size;
493 
494 	if (mp == NULL)
495 		return (allocb(size, BPRI_HI));
496 
497 	cur_size = mp->b_wptr - mp->b_rptr;
498 	old_rptr = mp->b_rptr;
499 
500 	ASSERT(mp->b_datap->db_ref != 0);
501 
502 	if (mp->b_datap->db_ref == 1 && MBLKSIZE(mp) >= size) {
503 		/*
504 		 * If the data is wanted and it will fit where it is, no
505 		 * work is required.
506 		 */
507 		if (copy && mp->b_datap->db_lim - mp->b_rptr >= size)
508 			return (mp);
509 
510 		mp->b_wptr = mp->b_rptr = mp->b_datap->db_base;
511 		mp1 = mp;
512 	} else if ((mp1 = allocb_tmpl(size, mp)) != NULL) {
513 		/* XXX other mp state could be copied too, db_flags ... ? */
514 		mp1->b_cont = mp->b_cont;
515 	} else {
516 		return (NULL);
517 	}
518 
519 	if (copy) {
520 		bcopy(old_rptr, mp1->b_rptr, cur_size);
521 		mp1->b_wptr = mp1->b_rptr + cur_size;
522 	}
523 
524 	if (mp != mp1)
525 		freeb(mp);
526 
527 	return (mp1);
528 }
529 
530 static void
531 dblk_lastfree(mblk_t *mp, dblk_t *dbp)
532 {
533 	ASSERT(dbp->db_mblk == mp);
534 	if (dbp->db_fthdr != NULL)
535 		str_ftfree(dbp);
536 
537 	/* set credp and projid to be 'unspecified' before returning to cache */
538 	if (dbp->db_credp != NULL) {
539 		crfree(dbp->db_credp);
540 		dbp->db_credp = NULL;
541 	}
542 	dbp->db_cpid = -1;
543 
544 	/* Reset the struioflag and the checksum flag fields */
545 	dbp->db_struioflag = 0;
546 	dbp->db_struioun.cksum.flags = 0;
547 
548 	/* and the COOKED flag */
549 	dbp->db_flags &= ~DBLK_COOKED;
550 
551 	kmem_cache_free(dbp->db_cache, dbp);
552 }
553 
554 static void
555 dblk_decref(mblk_t *mp, dblk_t *dbp)
556 {
557 	if (dbp->db_ref != 1) {
558 		uint32_t rtfu = atomic_add_32_nv(&DBLK_RTFU_WORD(dbp),
559 		    -(1 << DBLK_RTFU_SHIFT(db_ref)));
560 		/*
561 		 * atomic_add_32_nv() just decremented db_ref, so we no longer
562 		 * have a reference to the dblk, which means another thread
563 		 * could free it.  Therefore we cannot examine the dblk to
564 		 * determine whether ours was the last reference.  Instead,
565 		 * we extract the new and minimum reference counts from rtfu.
566 		 * Note that all we're really saying is "if (ref != refmin)".
567 		 */
568 		if (((rtfu >> DBLK_RTFU_SHIFT(db_ref)) & DBLK_REFMAX) !=
569 		    ((rtfu >> DBLK_RTFU_SHIFT(db_flags)) & DBLK_REFMIN)) {
570 			kmem_cache_free(mblk_cache, mp);
571 			return;
572 		}
573 	}
574 	dbp->db_mblk = mp;
575 	dbp->db_free = dbp->db_lastfree;
576 	dbp->db_lastfree(mp, dbp);
577 }
578 
579 mblk_t *
580 dupb(mblk_t *mp)
581 {
582 	dblk_t *dbp = mp->b_datap;
583 	mblk_t *new_mp;
584 	uint32_t oldrtfu, newrtfu;
585 
586 	if ((new_mp = kmem_cache_alloc(mblk_cache, KM_NOSLEEP)) == NULL)
587 		goto out;
588 
589 	new_mp->b_next = new_mp->b_prev = new_mp->b_cont = NULL;
590 	new_mp->b_rptr = mp->b_rptr;
591 	new_mp->b_wptr = mp->b_wptr;
592 	new_mp->b_datap = dbp;
593 	new_mp->b_queue = NULL;
594 	MBLK_BAND_FLAG_WORD(new_mp) = MBLK_BAND_FLAG_WORD(mp);
595 
596 	STR_FTEVENT_MBLK(mp, caller(), FTEV_DUPB, dbp->db_ref);
597 
598 	dbp->db_free = dblk_decref;
599 	do {
600 		ASSERT(dbp->db_ref > 0);
601 		oldrtfu = DBLK_RTFU_WORD(dbp);
602 		newrtfu = oldrtfu + (1 << DBLK_RTFU_SHIFT(db_ref));
603 		/*
604 		 * If db_ref is maxed out we can't dup this message anymore.
605 		 */
606 		if ((oldrtfu & DBLK_RTFU_REF_MASK) == DBLK_RTFU_REF_MASK) {
607 			kmem_cache_free(mblk_cache, new_mp);
608 			new_mp = NULL;
609 			goto out;
610 		}
611 	} while (cas32(&DBLK_RTFU_WORD(dbp), oldrtfu, newrtfu) != oldrtfu);
612 
613 out:
614 	FTRACE_1("dupb(): new_mp=0x%lx", (uintptr_t)new_mp);
615 	return (new_mp);
616 }
617 
618 static void
619 dblk_lastfree_desb(mblk_t *mp, dblk_t *dbp)
620 {
621 	frtn_t *frp = dbp->db_frtnp;
622 
623 	ASSERT(dbp->db_mblk == mp);
624 	frp->free_func(frp->free_arg);
625 	if (dbp->db_fthdr != NULL)
626 		str_ftfree(dbp);
627 
628 	/* set credp and projid to be 'unspecified' before returning to cache */
629 	if (dbp->db_credp != NULL) {
630 		crfree(dbp->db_credp);
631 		dbp->db_credp = NULL;
632 	}
633 	dbp->db_cpid = -1;
634 	dbp->db_struioflag = 0;
635 	dbp->db_struioun.cksum.flags = 0;
636 
637 	kmem_cache_free(dbp->db_cache, dbp);
638 }
639 
640 /*ARGSUSED*/
641 static void
642 frnop_func(void *arg)
643 {
644 }
645 
646 /*
647  * Generic esballoc used to implement the four flavors: [d]esballoc[a].
648  */
649 static mblk_t *
650 gesballoc(unsigned char *base, size_t size, uint32_t db_rtfu, frtn_t *frp,
651 	void (*lastfree)(mblk_t *, dblk_t *), int kmflags)
652 {
653 	dblk_t *dbp;
654 	mblk_t *mp;
655 
656 	ASSERT(base != NULL && frp != NULL);
657 
658 	if ((dbp = kmem_cache_alloc(dblk_esb_cache, kmflags)) == NULL) {
659 		mp = NULL;
660 		goto out;
661 	}
662 
663 	mp = dbp->db_mblk;
664 	dbp->db_base = base;
665 	dbp->db_lim = base + size;
666 	dbp->db_free = dbp->db_lastfree = lastfree;
667 	dbp->db_frtnp = frp;
668 	DBLK_RTFU_WORD(dbp) = db_rtfu;
669 	mp->b_next = mp->b_prev = mp->b_cont = NULL;
670 	mp->b_rptr = mp->b_wptr = base;
671 	mp->b_queue = NULL;
672 	MBLK_BAND_FLAG_WORD(mp) = 0;
673 
674 out:
675 	FTRACE_1("gesballoc(): mp=0x%lx", (uintptr_t)mp);
676 	return (mp);
677 }
678 
679 /*ARGSUSED*/
680 mblk_t *
681 esballoc(unsigned char *base, size_t size, uint_t pri, frtn_t *frp)
682 {
683 	mblk_t *mp;
684 
685 	/*
686 	 * Note that this is structured to allow the common case (i.e.
687 	 * STREAMS flowtracing disabled) to call gesballoc() with tail
688 	 * call optimization.
689 	 */
690 	if (!str_ftnever) {
691 		mp = gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
692 		    frp, freebs_enqueue, KM_NOSLEEP);
693 
694 		if (mp != NULL)
695 			STR_FTALLOC(&DB_FTHDR(mp), FTEV_ESBALLOC, size);
696 		return (mp);
697 	}
698 
699 	return (gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
700 	    frp, freebs_enqueue, KM_NOSLEEP));
701 }
702 
703 /*
704  * Same as esballoc() but sleeps waiting for memory.
705  */
706 /*ARGSUSED*/
707 mblk_t *
708 esballoc_wait(unsigned char *base, size_t size, uint_t pri, frtn_t *frp)
709 {
710 	mblk_t *mp;
711 
712 	/*
713 	 * Note that this is structured to allow the common case (i.e.
714 	 * STREAMS flowtracing disabled) to call gesballoc() with tail
715 	 * call optimization.
716 	 */
717 	if (!str_ftnever) {
718 		mp = gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
719 		    frp, freebs_enqueue, KM_SLEEP);
720 
721 		STR_FTALLOC(&DB_FTHDR(mp), FTEV_ESBALLOC, size);
722 		return (mp);
723 	}
724 
725 	return (gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
726 	    frp, freebs_enqueue, KM_SLEEP));
727 }
728 
729 /*ARGSUSED*/
730 mblk_t *
731 desballoc(unsigned char *base, size_t size, uint_t pri, frtn_t *frp)
732 {
733 	mblk_t *mp;
734 
735 	/*
736 	 * Note that this is structured to allow the common case (i.e.
737 	 * STREAMS flowtracing disabled) to call gesballoc() with tail
738 	 * call optimization.
739 	 */
740 	if (!str_ftnever) {
741 		mp = gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
742 			frp, dblk_lastfree_desb, KM_NOSLEEP);
743 
744 		if (mp != NULL)
745 			STR_FTALLOC(&DB_FTHDR(mp), FTEV_DESBALLOC, size);
746 		return (mp);
747 	}
748 
749 	return (gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
750 	    frp, dblk_lastfree_desb, KM_NOSLEEP));
751 }
752 
753 /*ARGSUSED*/
754 mblk_t *
755 esballoca(unsigned char *base, size_t size, uint_t pri, frtn_t *frp)
756 {
757 	mblk_t *mp;
758 
759 	/*
760 	 * Note that this is structured to allow the common case (i.e.
761 	 * STREAMS flowtracing disabled) to call gesballoc() with tail
762 	 * call optimization.
763 	 */
764 	if (!str_ftnever) {
765 		mp = gesballoc(base, size, DBLK_RTFU(2, M_DATA, 0, 0),
766 		    frp, freebs_enqueue, KM_NOSLEEP);
767 
768 		if (mp != NULL)
769 			STR_FTALLOC(&DB_FTHDR(mp), FTEV_ESBALLOCA, size);
770 		return (mp);
771 	}
772 
773 	return (gesballoc(base, size, DBLK_RTFU(2, M_DATA, 0, 0),
774 	    frp, freebs_enqueue, KM_NOSLEEP));
775 }
776 
777 /*ARGSUSED*/
778 mblk_t *
779 desballoca(unsigned char *base, size_t size, uint_t pri, frtn_t *frp)
780 {
781 	mblk_t *mp;
782 
783 	/*
784 	 * Note that this is structured to allow the common case (i.e.
785 	 * STREAMS flowtracing disabled) to call gesballoc() with tail
786 	 * call optimization.
787 	 */
788 	if (!str_ftnever) {
789 		mp = gesballoc(base, size, DBLK_RTFU(2, M_DATA, 0, 0),
790 		    frp, dblk_lastfree_desb, KM_NOSLEEP);
791 
792 		if (mp != NULL)
793 			STR_FTALLOC(&DB_FTHDR(mp), FTEV_DESBALLOCA, size);
794 		return (mp);
795 	}
796 
797 	return (gesballoc(base, size, DBLK_RTFU(2, M_DATA, 0, 0),
798 	    frp, dblk_lastfree_desb, KM_NOSLEEP));
799 }
800 
801 static void
802 bcache_dblk_lastfree(mblk_t *mp, dblk_t *dbp)
803 {
804 	bcache_t *bcp = dbp->db_cache;
805 
806 	ASSERT(dbp->db_mblk == mp);
807 	if (dbp->db_fthdr != NULL)
808 		str_ftfree(dbp);
809 
810 	/* set credp and projid to be 'unspecified' before returning to cache */
811 	if (dbp->db_credp != NULL) {
812 		crfree(dbp->db_credp);
813 		dbp->db_credp = NULL;
814 	}
815 	dbp->db_cpid = -1;
816 	dbp->db_struioflag = 0;
817 	dbp->db_struioun.cksum.flags = 0;
818 
819 	mutex_enter(&bcp->mutex);
820 	kmem_cache_free(bcp->dblk_cache, dbp);
821 	bcp->alloc--;
822 
823 	if (bcp->alloc == 0 && bcp->destroy != 0) {
824 		kmem_cache_destroy(bcp->dblk_cache);
825 		kmem_cache_destroy(bcp->buffer_cache);
826 		mutex_exit(&bcp->mutex);
827 		mutex_destroy(&bcp->mutex);
828 		kmem_free(bcp, sizeof (bcache_t));
829 	} else {
830 		mutex_exit(&bcp->mutex);
831 	}
832 }
833 
834 bcache_t *
835 bcache_create(char *name, size_t size, uint_t align)
836 {
837 	bcache_t *bcp;
838 	char buffer[255];
839 
840 	ASSERT((align & (align - 1)) == 0);
841 
842 	if ((bcp = (bcache_t *)kmem_alloc(sizeof (bcache_t), KM_NOSLEEP)) ==
843 	    NULL) {
844 		return (NULL);
845 	}
846 
847 	bcp->size = size;
848 	bcp->align = align;
849 	bcp->alloc = 0;
850 	bcp->destroy = 0;
851 
852 	mutex_init(&bcp->mutex, NULL, MUTEX_DRIVER, NULL);
853 
854 	(void) sprintf(buffer, "%s_buffer_cache", name);
855 	bcp->buffer_cache = kmem_cache_create(buffer, size, align, NULL, NULL,
856 	    NULL, NULL, NULL, 0);
857 	(void) sprintf(buffer, "%s_dblk_cache", name);
858 	bcp->dblk_cache = kmem_cache_create(buffer, sizeof (dblk_t),
859 	    DBLK_CACHE_ALIGN, bcache_dblk_constructor, bcache_dblk_destructor,
860 						NULL, (void *)bcp, NULL, 0);
861 
862 	return (bcp);
863 }
864 
865 void
866 bcache_destroy(bcache_t *bcp)
867 {
868 	ASSERT(bcp != NULL);
869 
870 	mutex_enter(&bcp->mutex);
871 	if (bcp->alloc == 0) {
872 		kmem_cache_destroy(bcp->dblk_cache);
873 		kmem_cache_destroy(bcp->buffer_cache);
874 		mutex_exit(&bcp->mutex);
875 		mutex_destroy(&bcp->mutex);
876 		kmem_free(bcp, sizeof (bcache_t));
877 	} else {
878 		bcp->destroy++;
879 		mutex_exit(&bcp->mutex);
880 	}
881 }
882 
883 /*ARGSUSED*/
884 mblk_t *
885 bcache_allocb(bcache_t *bcp, uint_t pri)
886 {
887 	dblk_t *dbp;
888 	mblk_t *mp = NULL;
889 
890 	ASSERT(bcp != NULL);
891 
892 	mutex_enter(&bcp->mutex);
893 	if (bcp->destroy != 0) {
894 		mutex_exit(&bcp->mutex);
895 		goto out;
896 	}
897 
898 	if ((dbp = kmem_cache_alloc(bcp->dblk_cache, KM_NOSLEEP)) == NULL) {
899 		mutex_exit(&bcp->mutex);
900 		goto out;
901 	}
902 	bcp->alloc++;
903 	mutex_exit(&bcp->mutex);
904 
905 	ASSERT(((uintptr_t)(dbp->db_base) & (bcp->align - 1)) == 0);
906 
907 	mp = dbp->db_mblk;
908 	DBLK_RTFU_WORD(dbp) = DBLK_RTFU(1, M_DATA, 0, 0);
909 	mp->b_next = mp->b_prev = mp->b_cont = NULL;
910 	mp->b_rptr = mp->b_wptr = dbp->db_base;
911 	mp->b_queue = NULL;
912 	MBLK_BAND_FLAG_WORD(mp) = 0;
913 	STR_FTALLOC(&dbp->db_fthdr, FTEV_BCALLOCB, bcp->size);
914 out:
915 	FTRACE_1("bcache_allocb(): mp=0x%p", (uintptr_t)mp);
916 
917 	return (mp);
918 }
919 
920 static void
921 dblk_lastfree_oversize(mblk_t *mp, dblk_t *dbp)
922 {
923 	ASSERT(dbp->db_mblk == mp);
924 	if (dbp->db_fthdr != NULL)
925 		str_ftfree(dbp);
926 
927 	/* set credp and projid to be 'unspecified' before returning to cache */
928 	if (dbp->db_credp != NULL) {
929 		crfree(dbp->db_credp);
930 		dbp->db_credp = NULL;
931 	}
932 	dbp->db_cpid = -1;
933 	dbp->db_struioflag = 0;
934 	dbp->db_struioun.cksum.flags = 0;
935 
936 	kmem_free(dbp->db_base, dbp->db_lim - dbp->db_base);
937 	kmem_cache_free(dbp->db_cache, dbp);
938 }
939 
940 static mblk_t *
941 allocb_oversize(size_t size, int kmflags)
942 {
943 	mblk_t *mp;
944 	void *buf;
945 
946 	size = P2ROUNDUP(size, DBLK_CACHE_ALIGN);
947 	if ((buf = kmem_alloc(size, kmflags)) == NULL)
948 		return (NULL);
949 	if ((mp = gesballoc(buf, size, DBLK_RTFU(1, M_DATA, 0, 0),
950 	    &frnop, dblk_lastfree_oversize, kmflags)) == NULL)
951 		kmem_free(buf, size);
952 
953 	if (mp != NULL)
954 		STR_FTALLOC(&DB_FTHDR(mp), FTEV_ALLOCBIG, size);
955 
956 	return (mp);
957 }
958 
959 mblk_t *
960 allocb_tryhard(size_t target_size)
961 {
962 	size_t size;
963 	mblk_t *bp;
964 
965 	for (size = target_size; size < target_size + 512;
966 	    size += DBLK_CACHE_ALIGN)
967 		if ((bp = allocb(size, BPRI_HI)) != NULL)
968 			return (bp);
969 	allocb_tryhard_fails++;
970 	return (NULL);
971 }
972 
973 /*
974  * This routine is consolidation private for STREAMS internal use
975  * This routine may only be called from sync routines (i.e., not
976  * from put or service procedures).  It is located here (rather
977  * than strsubr.c) so that we don't have to expose all of the
978  * allocb() implementation details in header files.
979  */
980 mblk_t *
981 allocb_wait(size_t size, uint_t pri, uint_t flags, int *error)
982 {
983 	dblk_t *dbp;
984 	mblk_t *mp;
985 	size_t index;
986 
987 	index = (size -1) >> DBLK_SIZE_SHIFT;
988 
989 	if (flags & STR_NOSIG) {
990 		if (index >= (DBLK_MAX_CACHE >> DBLK_SIZE_SHIFT)) {
991 			if (size != 0) {
992 				mp = allocb_oversize(size, KM_SLEEP);
993 				FTRACE_1("allocb_wait (NOSIG): mp=0x%lx",
994 				    (uintptr_t)mp);
995 				return (mp);
996 			}
997 			index = 0;
998 		}
999 
1000 		dbp = kmem_cache_alloc(dblk_cache[index], KM_SLEEP);
1001 		mp = dbp->db_mblk;
1002 		DBLK_RTFU_WORD(dbp) = DBLK_RTFU(1, M_DATA, 0, 0);
1003 		mp->b_next = mp->b_prev = mp->b_cont = NULL;
1004 		mp->b_rptr = mp->b_wptr = dbp->db_base;
1005 		mp->b_queue = NULL;
1006 		MBLK_BAND_FLAG_WORD(mp) = 0;
1007 		STR_FTALLOC(&DB_FTHDR(mp), FTEV_ALLOCBW, size);
1008 
1009 		FTRACE_1("allocb_wait (NOSIG): mp=0x%lx", (uintptr_t)mp);
1010 
1011 	} else {
1012 		while ((mp = allocb(size, pri)) == NULL) {
1013 			if ((*error = strwaitbuf(size, BPRI_HI)) != 0)
1014 				return (NULL);
1015 		}
1016 	}
1017 
1018 	return (mp);
1019 }
1020 
1021 /*
1022  * Call function 'func' with 'arg' when a class zero block can
1023  * be allocated with priority 'pri'.
1024  */
1025 bufcall_id_t
1026 esbbcall(uint_t pri, void (*func)(void *), void *arg)
1027 {
1028 	return (bufcall(1, pri, func, arg));
1029 }
1030 
1031 /*
1032  * Allocates an iocblk (M_IOCTL) block. Properly sets the credentials
1033  * ioc_id, rval and error of the struct ioctl to set up an ioctl call.
1034  * This provides consistency for all internal allocators of ioctl.
1035  */
1036 mblk_t *
1037 mkiocb(uint_t cmd)
1038 {
1039 	struct iocblk	*ioc;
1040 	mblk_t		*mp;
1041 
1042 	/*
1043 	 * Allocate enough space for any of the ioctl related messages.
1044 	 */
1045 	if ((mp = allocb(sizeof (union ioctypes), BPRI_MED)) == NULL)
1046 		return (NULL);
1047 
1048 	bzero(mp->b_rptr, sizeof (union ioctypes));
1049 
1050 	/*
1051 	 * Set the mblk_t information and ptrs correctly.
1052 	 */
1053 	mp->b_wptr += sizeof (struct iocblk);
1054 	mp->b_datap->db_type = M_IOCTL;
1055 
1056 	/*
1057 	 * Fill in the fields.
1058 	 */
1059 	ioc		= (struct iocblk *)mp->b_rptr;
1060 	ioc->ioc_cmd	= cmd;
1061 	ioc->ioc_cr	= kcred;
1062 	ioc->ioc_id	= getiocseqno();
1063 	ioc->ioc_flag	= IOC_NATIVE;
1064 	return (mp);
1065 }
1066 
1067 /*
1068  * test if block of given size can be allocated with a request of
1069  * the given priority.
1070  * 'pri' is no longer used, but is retained for compatibility.
1071  */
1072 /* ARGSUSED */
1073 int
1074 testb(size_t size, uint_t pri)
1075 {
1076 	return ((size + sizeof (dblk_t)) <= kmem_avail());
1077 }
1078 
1079 /*
1080  * Call function 'func' with argument 'arg' when there is a reasonably
1081  * good chance that a block of size 'size' can be allocated.
1082  * 'pri' is no longer used, but is retained for compatibility.
1083  */
1084 /* ARGSUSED */
1085 bufcall_id_t
1086 bufcall(size_t size, uint_t pri, void (*func)(void *), void *arg)
1087 {
1088 	static long bid = 1;	/* always odd to save checking for zero */
1089 	bufcall_id_t bc_id;
1090 	struct strbufcall *bcp;
1091 
1092 	if ((bcp = kmem_alloc(sizeof (strbufcall_t), KM_NOSLEEP)) == NULL)
1093 		return (0);
1094 
1095 	bcp->bc_func = func;
1096 	bcp->bc_arg = arg;
1097 	bcp->bc_size = size;
1098 	bcp->bc_next = NULL;
1099 	bcp->bc_executor = NULL;
1100 
1101 	mutex_enter(&strbcall_lock);
1102 	/*
1103 	 * After bcp is linked into strbcalls and strbcall_lock is dropped there
1104 	 * should be no references to bcp since it may be freed by
1105 	 * runbufcalls(). Since bcp_id field is returned, we save its value in
1106 	 * the local var.
1107 	 */
1108 	bc_id = bcp->bc_id = (bufcall_id_t)(bid += 2);	/* keep it odd */
1109 
1110 	/*
1111 	 * add newly allocated stream event to existing
1112 	 * linked list of events.
1113 	 */
1114 	if (strbcalls.bc_head == NULL) {
1115 		strbcalls.bc_head = strbcalls.bc_tail = bcp;
1116 	} else {
1117 		strbcalls.bc_tail->bc_next = bcp;
1118 		strbcalls.bc_tail = bcp;
1119 	}
1120 
1121 	cv_signal(&strbcall_cv);
1122 	mutex_exit(&strbcall_lock);
1123 	return (bc_id);
1124 }
1125 
1126 /*
1127  * Cancel a bufcall request.
1128  */
1129 void
1130 unbufcall(bufcall_id_t id)
1131 {
1132 	strbufcall_t *bcp, *pbcp;
1133 
1134 	mutex_enter(&strbcall_lock);
1135 again:
1136 	pbcp = NULL;
1137 	for (bcp = strbcalls.bc_head; bcp; bcp = bcp->bc_next) {
1138 		if (id == bcp->bc_id)
1139 			break;
1140 		pbcp = bcp;
1141 	}
1142 	if (bcp) {
1143 		if (bcp->bc_executor != NULL) {
1144 			if (bcp->bc_executor != curthread) {
1145 				cv_wait(&bcall_cv, &strbcall_lock);
1146 				goto again;
1147 			}
1148 		} else {
1149 			if (pbcp)
1150 				pbcp->bc_next = bcp->bc_next;
1151 			else
1152 				strbcalls.bc_head = bcp->bc_next;
1153 			if (bcp == strbcalls.bc_tail)
1154 				strbcalls.bc_tail = pbcp;
1155 			kmem_free(bcp, sizeof (strbufcall_t));
1156 		}
1157 	}
1158 	mutex_exit(&strbcall_lock);
1159 }
1160 
1161 /*
1162  * Duplicate a message block by block (uses dupb), returning
1163  * a pointer to the duplicate message.
1164  * Returns a non-NULL value only if the entire message
1165  * was dup'd.
1166  */
1167 mblk_t *
1168 dupmsg(mblk_t *bp)
1169 {
1170 	mblk_t *head, *nbp;
1171 
1172 	if (!bp || !(nbp = head = dupb(bp)))
1173 		return (NULL);
1174 
1175 	while (bp->b_cont) {
1176 		if (!(nbp->b_cont = dupb(bp->b_cont))) {
1177 			freemsg(head);
1178 			return (NULL);
1179 		}
1180 		nbp = nbp->b_cont;
1181 		bp = bp->b_cont;
1182 	}
1183 	return (head);
1184 }
1185 
1186 #define	DUPB_NOLOAN(bp) \
1187 	((((bp)->b_datap->db_struioflag & STRUIO_ZC) != 0) ? \
1188 	copyb((bp)) : dupb((bp)))
1189 
1190 mblk_t *
1191 dupmsg_noloan(mblk_t *bp)
1192 {
1193 	mblk_t *head, *nbp;
1194 
1195 	if (bp == NULL || DB_TYPE(bp) != M_DATA ||
1196 	    ((nbp = head = DUPB_NOLOAN(bp)) == NULL))
1197 		return (NULL);
1198 
1199 	while (bp->b_cont) {
1200 		if ((nbp->b_cont = DUPB_NOLOAN(bp->b_cont)) == NULL) {
1201 			freemsg(head);
1202 			return (NULL);
1203 		}
1204 		nbp = nbp->b_cont;
1205 		bp = bp->b_cont;
1206 	}
1207 	return (head);
1208 }
1209 
1210 /*
1211  * Copy data from message and data block to newly allocated message and
1212  * data block. Returns new message block pointer, or NULL if error.
1213  * The alignment of rptr (w.r.t. word alignment) will be the same in the copy
1214  * as in the original even when db_base is not word aligned. (bug 1052877)
1215  */
1216 mblk_t *
1217 copyb(mblk_t *bp)
1218 {
1219 	mblk_t	*nbp;
1220 	dblk_t	*dp, *ndp;
1221 	uchar_t *base;
1222 	size_t	size;
1223 	size_t	unaligned;
1224 
1225 	ASSERT(bp->b_wptr >= bp->b_rptr);
1226 
1227 	dp = bp->b_datap;
1228 	if (dp->db_fthdr != NULL)
1229 		STR_FTEVENT_MBLK(bp, caller(), FTEV_COPYB, 0);
1230 
1231 	/*
1232 	 * Special handling for Multidata message; this should be
1233 	 * removed once a copy-callback routine is made available.
1234 	 */
1235 	if (dp->db_type == M_MULTIDATA) {
1236 		cred_t *cr;
1237 
1238 		if ((nbp = mmd_copy(bp, KM_NOSLEEP)) == NULL)
1239 			return (NULL);
1240 
1241 		nbp->b_flag = bp->b_flag;
1242 		nbp->b_band = bp->b_band;
1243 		ndp = nbp->b_datap;
1244 
1245 		/* See comments below on potential issues. */
1246 		STR_FTEVENT_MBLK(nbp, caller(), FTEV_COPYB, 1);
1247 
1248 		ASSERT(ndp->db_type == dp->db_type);
1249 		cr = dp->db_credp;
1250 		if (cr != NULL)
1251 			crhold(ndp->db_credp = cr);
1252 		ndp->db_cpid = dp->db_cpid;
1253 		return (nbp);
1254 	}
1255 
1256 	size = dp->db_lim - dp->db_base;
1257 	unaligned = P2PHASE((uintptr_t)dp->db_base, sizeof (uint_t));
1258 	if ((nbp = allocb_tmpl(size + unaligned, bp)) == NULL)
1259 		return (NULL);
1260 	nbp->b_flag = bp->b_flag;
1261 	nbp->b_band = bp->b_band;
1262 	ndp = nbp->b_datap;
1263 
1264 	/*
1265 	 * Well, here is a potential issue.  If we are trying to
1266 	 * trace a flow, and we copy the message, we might lose
1267 	 * information about where this message might have been.
1268 	 * So we should inherit the FT data.  On the other hand,
1269 	 * a user might be interested only in alloc to free data.
1270 	 * So I guess the real answer is to provide a tunable.
1271 	 */
1272 	STR_FTEVENT_MBLK(nbp, caller(), FTEV_COPYB, 1);
1273 
1274 	base = ndp->db_base + unaligned;
1275 	bcopy(dp->db_base, ndp->db_base + unaligned, size);
1276 
1277 	nbp->b_rptr = base + (bp->b_rptr - dp->db_base);
1278 	nbp->b_wptr = nbp->b_rptr + MBLKL(bp);
1279 
1280 	return (nbp);
1281 }
1282 
1283 /*
1284  * Copy data from message to newly allocated message using new
1285  * data blocks.  Returns a pointer to the new message, or NULL if error.
1286  */
1287 mblk_t *
1288 copymsg(mblk_t *bp)
1289 {
1290 	mblk_t *head, *nbp;
1291 
1292 	if (!bp || !(nbp = head = copyb(bp)))
1293 		return (NULL);
1294 
1295 	while (bp->b_cont) {
1296 		if (!(nbp->b_cont = copyb(bp->b_cont))) {
1297 			freemsg(head);
1298 			return (NULL);
1299 		}
1300 		nbp = nbp->b_cont;
1301 		bp = bp->b_cont;
1302 	}
1303 	return (head);
1304 }
1305 
1306 /*
1307  * link a message block to tail of message
1308  */
1309 void
1310 linkb(mblk_t *mp, mblk_t *bp)
1311 {
1312 	ASSERT(mp && bp);
1313 
1314 	for (; mp->b_cont; mp = mp->b_cont)
1315 		;
1316 	mp->b_cont = bp;
1317 }
1318 
1319 /*
1320  * unlink a message block from head of message
1321  * return pointer to new message.
1322  * NULL if message becomes empty.
1323  */
1324 mblk_t *
1325 unlinkb(mblk_t *bp)
1326 {
1327 	mblk_t *bp1;
1328 
1329 	bp1 = bp->b_cont;
1330 	bp->b_cont = NULL;
1331 	return (bp1);
1332 }
1333 
1334 /*
1335  * remove a message block "bp" from message "mp"
1336  *
1337  * Return pointer to new message or NULL if no message remains.
1338  * Return -1 if bp is not found in message.
1339  */
1340 mblk_t *
1341 rmvb(mblk_t *mp, mblk_t *bp)
1342 {
1343 	mblk_t *tmp;
1344 	mblk_t *lastp = NULL;
1345 
1346 	ASSERT(mp && bp);
1347 	for (tmp = mp; tmp; tmp = tmp->b_cont) {
1348 		if (tmp == bp) {
1349 			if (lastp)
1350 				lastp->b_cont = tmp->b_cont;
1351 			else
1352 				mp = tmp->b_cont;
1353 			tmp->b_cont = NULL;
1354 			return (mp);
1355 		}
1356 		lastp = tmp;
1357 	}
1358 	return ((mblk_t *)-1);
1359 }
1360 
1361 /*
1362  * Concatenate and align first len bytes of common
1363  * message type.  Len == -1, means concat everything.
1364  * Returns 1 on success, 0 on failure
1365  * After the pullup, mp points to the pulled up data.
1366  */
1367 int
1368 pullupmsg(mblk_t *mp, ssize_t len)
1369 {
1370 	mblk_t *bp, *b_cont;
1371 	dblk_t *dbp;
1372 	ssize_t n;
1373 
1374 	ASSERT(mp->b_datap->db_ref > 0);
1375 	ASSERT(mp->b_next == NULL && mp->b_prev == NULL);
1376 
1377 	/*
1378 	 * We won't handle Multidata message, since it contains
1379 	 * metadata which this function has no knowledge of; we
1380 	 * assert on DEBUG, and return failure otherwise.
1381 	 */
1382 	ASSERT(mp->b_datap->db_type != M_MULTIDATA);
1383 	if (mp->b_datap->db_type == M_MULTIDATA)
1384 		return (0);
1385 
1386 	if (len == -1) {
1387 		if (mp->b_cont == NULL && str_aligned(mp->b_rptr))
1388 			return (1);
1389 		len = xmsgsize(mp);
1390 	} else {
1391 		ssize_t first_mblk_len = mp->b_wptr - mp->b_rptr;
1392 		ASSERT(first_mblk_len >= 0);
1393 		/*
1394 		 * If the length is less than that of the first mblk,
1395 		 * we want to pull up the message into an aligned mblk.
1396 		 * Though not part of the spec, some callers assume it.
1397 		 */
1398 		if (len <= first_mblk_len) {
1399 			if (str_aligned(mp->b_rptr))
1400 				return (1);
1401 			len = first_mblk_len;
1402 		} else if (xmsgsize(mp) < len)
1403 			return (0);
1404 	}
1405 
1406 	if ((bp = allocb_tmpl(len, mp)) == NULL)
1407 		return (0);
1408 
1409 	dbp = bp->b_datap;
1410 	*bp = *mp;		/* swap mblks so bp heads the old msg... */
1411 	mp->b_datap = dbp;	/* ... and mp heads the new message */
1412 	mp->b_datap->db_mblk = mp;
1413 	bp->b_datap->db_mblk = bp;
1414 	mp->b_rptr = mp->b_wptr = dbp->db_base;
1415 
1416 	do {
1417 		ASSERT(bp->b_datap->db_ref > 0);
1418 		ASSERT(bp->b_wptr >= bp->b_rptr);
1419 		n = MIN(bp->b_wptr - bp->b_rptr, len);
1420 		bcopy(bp->b_rptr, mp->b_wptr, (size_t)n);
1421 		mp->b_wptr += n;
1422 		bp->b_rptr += n;
1423 		len -= n;
1424 		if (bp->b_rptr != bp->b_wptr)
1425 			break;
1426 		b_cont = bp->b_cont;
1427 		freeb(bp);
1428 		bp = b_cont;
1429 	} while (len && bp);
1430 
1431 	mp->b_cont = bp;	/* tack on whatever wasn't pulled up */
1432 
1433 	return (1);
1434 }
1435 
1436 /*
1437  * Concatenate and align at least the first len bytes of common message
1438  * type.  Len == -1 means concatenate everything.  The original message is
1439  * unaltered.  Returns a pointer to a new message on success, otherwise
1440  * returns NULL.
1441  */
1442 mblk_t *
1443 msgpullup(mblk_t *mp, ssize_t len)
1444 {
1445 	mblk_t	*newmp;
1446 	ssize_t	totlen;
1447 	ssize_t	n;
1448 
1449 	/*
1450 	 * We won't handle Multidata message, since it contains
1451 	 * metadata which this function has no knowledge of; we
1452 	 * assert on DEBUG, and return failure otherwise.
1453 	 */
1454 	ASSERT(mp->b_datap->db_type != M_MULTIDATA);
1455 	if (mp->b_datap->db_type == M_MULTIDATA)
1456 		return (NULL);
1457 
1458 	totlen = xmsgsize(mp);
1459 
1460 	if ((len > 0) && (len > totlen))
1461 		return (NULL);
1462 
1463 	/*
1464 	 * Copy all of the first msg type into one new mblk, then dupmsg
1465 	 * and link the rest onto this.
1466 	 */
1467 
1468 	len = totlen;
1469 
1470 	if ((newmp = allocb_tmpl(len, mp)) == NULL)
1471 		return (NULL);
1472 
1473 	newmp->b_flag = mp->b_flag;
1474 	newmp->b_band = mp->b_band;
1475 
1476 	while (len > 0) {
1477 		n = mp->b_wptr - mp->b_rptr;
1478 		ASSERT(n >= 0);		/* allow zero-length mblk_t's */
1479 		if (n > 0)
1480 			bcopy(mp->b_rptr, newmp->b_wptr, n);
1481 		newmp->b_wptr += n;
1482 		len -= n;
1483 		mp = mp->b_cont;
1484 	}
1485 
1486 	if (mp != NULL) {
1487 		newmp->b_cont = dupmsg(mp);
1488 		if (newmp->b_cont == NULL) {
1489 			freemsg(newmp);
1490 			return (NULL);
1491 		}
1492 	}
1493 
1494 	return (newmp);
1495 }
1496 
1497 /*
1498  * Trim bytes from message
1499  *  len > 0, trim from head
1500  *  len < 0, trim from tail
1501  * Returns 1 on success, 0 on failure.
1502  */
1503 int
1504 adjmsg(mblk_t *mp, ssize_t len)
1505 {
1506 	mblk_t *bp;
1507 	mblk_t *save_bp = NULL;
1508 	mblk_t *prev_bp;
1509 	mblk_t *bcont;
1510 	unsigned char type;
1511 	ssize_t n;
1512 	int fromhead;
1513 	int first;
1514 
1515 	ASSERT(mp != NULL);
1516 	/*
1517 	 * We won't handle Multidata message, since it contains
1518 	 * metadata which this function has no knowledge of; we
1519 	 * assert on DEBUG, and return failure otherwise.
1520 	 */
1521 	ASSERT(mp->b_datap->db_type != M_MULTIDATA);
1522 	if (mp->b_datap->db_type == M_MULTIDATA)
1523 		return (0);
1524 
1525 	if (len < 0) {
1526 		fromhead = 0;
1527 		len = -len;
1528 	} else {
1529 		fromhead = 1;
1530 	}
1531 
1532 	if (xmsgsize(mp) < len)
1533 		return (0);
1534 
1535 
1536 	if (fromhead) {
1537 		first = 1;
1538 		while (len) {
1539 			ASSERT(mp->b_wptr >= mp->b_rptr);
1540 			n = MIN(mp->b_wptr - mp->b_rptr, len);
1541 			mp->b_rptr += n;
1542 			len -= n;
1543 
1544 			/*
1545 			 * If this is not the first zero length
1546 			 * message remove it
1547 			 */
1548 			if (!first && (mp->b_wptr == mp->b_rptr)) {
1549 				bcont = mp->b_cont;
1550 				freeb(mp);
1551 				mp = save_bp->b_cont = bcont;
1552 			} else {
1553 				save_bp = mp;
1554 				mp = mp->b_cont;
1555 			}
1556 			first = 0;
1557 		}
1558 	} else {
1559 		type = mp->b_datap->db_type;
1560 		while (len) {
1561 			bp = mp;
1562 			save_bp = NULL;
1563 
1564 			/*
1565 			 * Find the last message of same type
1566 			 */
1567 
1568 			while (bp && bp->b_datap->db_type == type) {
1569 				ASSERT(bp->b_wptr >= bp->b_rptr);
1570 				prev_bp = save_bp;
1571 				save_bp = bp;
1572 				bp = bp->b_cont;
1573 			}
1574 			if (save_bp == NULL)
1575 				break;
1576 			n = MIN(save_bp->b_wptr - save_bp->b_rptr, len);
1577 			save_bp->b_wptr -= n;
1578 			len -= n;
1579 
1580 			/*
1581 			 * If this is not the first message
1582 			 * and we have taken away everything
1583 			 * from this message, remove it
1584 			 */
1585 
1586 			if ((save_bp != mp) &&
1587 				(save_bp->b_wptr == save_bp->b_rptr)) {
1588 				bcont = save_bp->b_cont;
1589 				freeb(save_bp);
1590 				prev_bp->b_cont = bcont;
1591 			}
1592 		}
1593 	}
1594 	return (1);
1595 }
1596 
1597 /*
1598  * get number of data bytes in message
1599  */
1600 size_t
1601 msgdsize(mblk_t *bp)
1602 {
1603 	size_t count = 0;
1604 
1605 	for (; bp; bp = bp->b_cont)
1606 		if (bp->b_datap->db_type == M_DATA) {
1607 			ASSERT(bp->b_wptr >= bp->b_rptr);
1608 			count += bp->b_wptr - bp->b_rptr;
1609 		}
1610 	return (count);
1611 }
1612 
1613 /*
1614  * Get a message off head of queue
1615  *
1616  * If queue has no buffers then mark queue
1617  * with QWANTR. (queue wants to be read by
1618  * someone when data becomes available)
1619  *
1620  * If there is something to take off then do so.
1621  * If queue falls below hi water mark turn off QFULL
1622  * flag.  Decrement weighted count of queue.
1623  * Also turn off QWANTR because queue is being read.
1624  *
1625  * The queue count is maintained on a per-band basis.
1626  * Priority band 0 (normal messages) uses q_count,
1627  * q_lowat, etc.  Non-zero priority bands use the
1628  * fields in their respective qband structures
1629  * (qb_count, qb_lowat, etc.)  All messages appear
1630  * on the same list, linked via their b_next pointers.
1631  * q_first is the head of the list.  q_count does
1632  * not reflect the size of all the messages on the
1633  * queue.  It only reflects those messages in the
1634  * normal band of flow.  The one exception to this
1635  * deals with high priority messages.  They are in
1636  * their own conceptual "band", but are accounted
1637  * against q_count.
1638  *
1639  * If queue count is below the lo water mark and QWANTW
1640  * is set, enable the closest backq which has a service
1641  * procedure and turn off the QWANTW flag.
1642  *
1643  * getq could be built on top of rmvq, but isn't because
1644  * of performance considerations.
1645  *
1646  * A note on the use of q_count and q_mblkcnt:
1647  *   q_count is the traditional byte count for messages that
1648  *   have been put on a queue.  Documentation tells us that
1649  *   we shouldn't rely on that count, but some drivers/modules
1650  *   do.  What was needed, however, is a mechanism to prevent
1651  *   runaway streams from consuming all of the resources,
1652  *   and particularly be able to flow control zero-length
1653  *   messages.  q_mblkcnt is used for this purpose.  It
1654  *   counts the number of mblk's that are being put on
1655  *   the queue.  The intention here, is that each mblk should
1656  *   contain one byte of data and, for the purpose of
1657  *   flow-control, logically does.  A queue will become
1658  *   full when EITHER of these values (q_count and q_mblkcnt)
1659  *   reach the highwater mark.  It will clear when BOTH
1660  *   of them drop below the highwater mark.  And it will
1661  *   backenable when BOTH of them drop below the lowwater
1662  *   mark.
1663  *   With this algorithm, a driver/module might be able
1664  *   to find a reasonably accurate q_count, and the
1665  *   framework can still try and limit resource usage.
1666  */
1667 mblk_t *
1668 getq(queue_t *q)
1669 {
1670 	mblk_t *bp;
1671 	uchar_t band = 0;
1672 
1673 	bp = getq_noenab(q);
1674 	if (bp != NULL)
1675 		band = bp->b_band;
1676 
1677 	/*
1678 	 * Inlined from qbackenable().
1679 	 * Quick check without holding the lock.
1680 	 */
1681 	if (band == 0 && (q->q_flag & (QWANTW|QWANTWSYNC)) == 0)
1682 		return (bp);
1683 
1684 	qbackenable(q, band);
1685 	return (bp);
1686 }
1687 
1688 /*
1689  * Calculate number of data bytes in a single data message block taking
1690  * multidata messages into account.
1691  */
1692 
1693 #define	ADD_MBLK_SIZE(mp, size) 					\
1694 	if (DB_TYPE(mp) != M_MULTIDATA) {				\
1695 		(size) += MBLKL(mp);					\
1696 	} else {							\
1697 		uint_t	pinuse;						\
1698 									\
1699 		mmd_getsize(mmd_getmultidata(mp), NULL, &pinuse);	\
1700 		(size) += pinuse;					\
1701 	}
1702 
1703 /*
1704  * Like getq() but does not backenable.  This is used by the stream
1705  * head when a putback() is likely.  The caller must call qbackenable()
1706  * after it is done with accessing the queue.
1707  */
1708 mblk_t *
1709 getq_noenab(queue_t *q)
1710 {
1711 	mblk_t *bp;
1712 	mblk_t *tmp;
1713 	qband_t *qbp;
1714 	kthread_id_t freezer;
1715 	int	bytecnt = 0, mblkcnt = 0;
1716 
1717 	/* freezestr should allow its caller to call getq/putq */
1718 	freezer = STREAM(q)->sd_freezer;
1719 	if (freezer == curthread) {
1720 		ASSERT(frozenstr(q));
1721 		ASSERT(MUTEX_HELD(QLOCK(q)));
1722 	} else
1723 		mutex_enter(QLOCK(q));
1724 
1725 	if ((bp = q->q_first) == 0) {
1726 		q->q_flag |= QWANTR;
1727 	} else {
1728 		if ((q->q_first = bp->b_next) == NULL)
1729 			q->q_last = NULL;
1730 		else
1731 			q->q_first->b_prev = NULL;
1732 
1733 		/* Get message byte count for q_count accounting */
1734 		for (tmp = bp; tmp; tmp = tmp->b_cont) {
1735 			ADD_MBLK_SIZE(tmp, bytecnt);
1736 			mblkcnt++;
1737 		}
1738 
1739 		if (bp->b_band == 0) {
1740 			q->q_count -= bytecnt;
1741 			q->q_mblkcnt -= mblkcnt;
1742 			if ((q->q_count < q->q_hiwat) &&
1743 			    (q->q_mblkcnt < q->q_hiwat)) {
1744 				q->q_flag &= ~QFULL;
1745 			}
1746 		} else {
1747 			int i;
1748 
1749 			ASSERT(bp->b_band <= q->q_nband);
1750 			ASSERT(q->q_bandp != NULL);
1751 			ASSERT(MUTEX_HELD(QLOCK(q)));
1752 			qbp = q->q_bandp;
1753 			i = bp->b_band;
1754 			while (--i > 0)
1755 				qbp = qbp->qb_next;
1756 			if (qbp->qb_first == qbp->qb_last) {
1757 				qbp->qb_first = NULL;
1758 				qbp->qb_last = NULL;
1759 			} else {
1760 				qbp->qb_first = bp->b_next;
1761 			}
1762 			qbp->qb_count -= bytecnt;
1763 			qbp->qb_mblkcnt -= mblkcnt;
1764 			if ((qbp->qb_count < qbp->qb_hiwat) &&
1765 			    (qbp->qb_mblkcnt < qbp->qb_hiwat)) {
1766 				qbp->qb_flag &= ~QB_FULL;
1767 			}
1768 		}
1769 		q->q_flag &= ~QWANTR;
1770 		bp->b_next = NULL;
1771 		bp->b_prev = NULL;
1772 	}
1773 	if (freezer != curthread)
1774 		mutex_exit(QLOCK(q));
1775 
1776 	STR_FTEVENT_MSG(bp, q, FTEV_GETQ, NULL);
1777 
1778 	return (bp);
1779 }
1780 
1781 /*
1782  * Determine if a backenable is needed after removing a message in the
1783  * specified band.
1784  * NOTE: This routine assumes that something like getq_noenab() has been
1785  * already called.
1786  *
1787  * For the read side it is ok to hold sd_lock across calling this (and the
1788  * stream head often does).
1789  * But for the write side strwakeq might be invoked and it acquires sd_lock.
1790  */
1791 void
1792 qbackenable(queue_t *q, uchar_t band)
1793 {
1794 	int backenab = 0;
1795 	qband_t *qbp;
1796 	kthread_id_t freezer;
1797 
1798 	ASSERT(q);
1799 	ASSERT((q->q_flag & QREADR) || MUTEX_NOT_HELD(&STREAM(q)->sd_lock));
1800 
1801 	/*
1802 	 * Quick check without holding the lock.
1803 	 * OK since after getq() has lowered the q_count these flags
1804 	 * would not change unless either the qbackenable() is done by
1805 	 * another thread (which is ok) or the queue has gotten QFULL
1806 	 * in which case another backenable will take place when the queue
1807 	 * drops below q_lowat.
1808 	 */
1809 	if (band == 0 && (q->q_flag & (QWANTW|QWANTWSYNC)) == 0)
1810 		return;
1811 
1812 	/* freezestr should allow its caller to call getq/putq */
1813 	freezer = STREAM(q)->sd_freezer;
1814 	if (freezer == curthread) {
1815 		ASSERT(frozenstr(q));
1816 		ASSERT(MUTEX_HELD(QLOCK(q)));
1817 	} else
1818 		mutex_enter(QLOCK(q));
1819 
1820 	if (band == 0) {
1821 		if (q->q_lowat == 0 || (q->q_count < q->q_lowat &&
1822 		    q->q_mblkcnt < q->q_lowat)) {
1823 			backenab = q->q_flag & (QWANTW|QWANTWSYNC);
1824 		}
1825 	} else {
1826 		int i;
1827 
1828 		ASSERT((unsigned)band <= q->q_nband);
1829 		ASSERT(q->q_bandp != NULL);
1830 
1831 		qbp = q->q_bandp;
1832 		i = band;
1833 		while (--i > 0)
1834 			qbp = qbp->qb_next;
1835 
1836 		if (qbp->qb_lowat == 0 || (qbp->qb_count < qbp->qb_lowat &&
1837 		    qbp->qb_mblkcnt < qbp->qb_lowat)) {
1838 			backenab = qbp->qb_flag & QB_WANTW;
1839 		}
1840 	}
1841 
1842 	if (backenab == 0) {
1843 		if (freezer != curthread)
1844 			mutex_exit(QLOCK(q));
1845 		return;
1846 	}
1847 
1848 	/* Have to drop the lock across strwakeq and backenable */
1849 	if (backenab & QWANTWSYNC)
1850 		q->q_flag &= ~QWANTWSYNC;
1851 	if (backenab & (QWANTW|QB_WANTW)) {
1852 		if (band != 0)
1853 			qbp->qb_flag &= ~QB_WANTW;
1854 		else {
1855 			q->q_flag &= ~QWANTW;
1856 		}
1857 	}
1858 
1859 	if (freezer != curthread)
1860 		mutex_exit(QLOCK(q));
1861 
1862 	if (backenab & QWANTWSYNC)
1863 		strwakeq(q, QWANTWSYNC);
1864 	if (backenab & (QWANTW|QB_WANTW))
1865 		backenable(q, band);
1866 }
1867 
1868 /*
1869  * Remove a message from a queue.  The queue count and other
1870  * flow control parameters are adjusted and the back queue
1871  * enabled if necessary.
1872  *
1873  * rmvq can be called with the stream frozen, but other utility functions
1874  * holding QLOCK, and by streams modules without any locks/frozen.
1875  */
1876 void
1877 rmvq(queue_t *q, mblk_t *mp)
1878 {
1879 	ASSERT(mp != NULL);
1880 
1881 	rmvq_noenab(q, mp);
1882 	if (curthread != STREAM(q)->sd_freezer && MUTEX_HELD(QLOCK(q))) {
1883 		/*
1884 		 * qbackenable can handle a frozen stream but not a "random"
1885 		 * qlock being held. Drop lock across qbackenable.
1886 		 */
1887 		mutex_exit(QLOCK(q));
1888 		qbackenable(q, mp->b_band);
1889 		mutex_enter(QLOCK(q));
1890 	} else {
1891 		qbackenable(q, mp->b_band);
1892 	}
1893 }
1894 
1895 /*
1896  * Like rmvq() but without any backenabling.
1897  * This exists to handle SR_CONSOL_DATA in strrput().
1898  */
1899 void
1900 rmvq_noenab(queue_t *q, mblk_t *mp)
1901 {
1902 	mblk_t *tmp;
1903 	int i;
1904 	qband_t *qbp = NULL;
1905 	kthread_id_t freezer;
1906 	int	bytecnt = 0, mblkcnt = 0;
1907 
1908 	freezer = STREAM(q)->sd_freezer;
1909 	if (freezer == curthread) {
1910 		ASSERT(frozenstr(q));
1911 		ASSERT(MUTEX_HELD(QLOCK(q)));
1912 	} else if (MUTEX_HELD(QLOCK(q))) {
1913 		/* Don't drop lock on exit */
1914 		freezer = curthread;
1915 	} else
1916 		mutex_enter(QLOCK(q));
1917 
1918 	ASSERT(mp->b_band <= q->q_nband);
1919 	if (mp->b_band != 0) {		/* Adjust band pointers */
1920 		ASSERT(q->q_bandp != NULL);
1921 		qbp = q->q_bandp;
1922 		i = mp->b_band;
1923 		while (--i > 0)
1924 			qbp = qbp->qb_next;
1925 		if (mp == qbp->qb_first) {
1926 			if (mp->b_next && mp->b_band == mp->b_next->b_band)
1927 				qbp->qb_first = mp->b_next;
1928 			else
1929 				qbp->qb_first = NULL;
1930 		}
1931 		if (mp == qbp->qb_last) {
1932 			if (mp->b_prev && mp->b_band == mp->b_prev->b_band)
1933 				qbp->qb_last = mp->b_prev;
1934 			else
1935 				qbp->qb_last = NULL;
1936 		}
1937 	}
1938 
1939 	/*
1940 	 * Remove the message from the list.
1941 	 */
1942 	if (mp->b_prev)
1943 		mp->b_prev->b_next = mp->b_next;
1944 	else
1945 		q->q_first = mp->b_next;
1946 	if (mp->b_next)
1947 		mp->b_next->b_prev = mp->b_prev;
1948 	else
1949 		q->q_last = mp->b_prev;
1950 	mp->b_next = NULL;
1951 	mp->b_prev = NULL;
1952 
1953 	/* Get the size of the message for q_count accounting */
1954 	for (tmp = mp; tmp; tmp = tmp->b_cont) {
1955 		ADD_MBLK_SIZE(tmp, bytecnt);
1956 		mblkcnt++;
1957 	}
1958 
1959 	if (mp->b_band == 0) {		/* Perform q_count accounting */
1960 		q->q_count -= bytecnt;
1961 		q->q_mblkcnt -= mblkcnt;
1962 		if ((q->q_count < q->q_hiwat) &&
1963 		    (q->q_mblkcnt < q->q_hiwat)) {
1964 			q->q_flag &= ~QFULL;
1965 		}
1966 	} else {			/* Perform qb_count accounting */
1967 		qbp->qb_count -= bytecnt;
1968 		qbp->qb_mblkcnt -= mblkcnt;
1969 		if ((qbp->qb_count < qbp->qb_hiwat) &&
1970 		    (qbp->qb_mblkcnt < qbp->qb_hiwat)) {
1971 			qbp->qb_flag &= ~QB_FULL;
1972 		}
1973 	}
1974 	if (freezer != curthread)
1975 		mutex_exit(QLOCK(q));
1976 
1977 	STR_FTEVENT_MSG(mp, q, FTEV_RMVQ, NULL);
1978 }
1979 
1980 /*
1981  * Empty a queue.
1982  * If flag is set, remove all messages.  Otherwise, remove
1983  * only non-control messages.  If queue falls below its low
1984  * water mark, and QWANTW is set, enable the nearest upstream
1985  * service procedure.
1986  *
1987  * Historical note: when merging the M_FLUSH code in strrput with this
1988  * code one difference was discovered. flushq did not have a check
1989  * for q_lowat == 0 in the backenabling test.
1990  *
1991  * pcproto_flag specifies whether or not a M_PCPROTO message should be flushed
1992  * if one exists on the queue.
1993  */
1994 void
1995 flushq_common(queue_t *q, int flag, int pcproto_flag)
1996 {
1997 	mblk_t *mp, *nmp;
1998 	qband_t *qbp;
1999 	int backenab = 0;
2000 	unsigned char bpri;
2001 	unsigned char	qbf[NBAND];	/* band flushing backenable flags */
2002 
2003 	if (q->q_first == NULL)
2004 		return;
2005 
2006 	mutex_enter(QLOCK(q));
2007 	mp = q->q_first;
2008 	q->q_first = NULL;
2009 	q->q_last = NULL;
2010 	q->q_count = 0;
2011 	q->q_mblkcnt = 0;
2012 	for (qbp = q->q_bandp; qbp; qbp = qbp->qb_next) {
2013 		qbp->qb_first = NULL;
2014 		qbp->qb_last = NULL;
2015 		qbp->qb_count = 0;
2016 		qbp->qb_mblkcnt = 0;
2017 		qbp->qb_flag &= ~QB_FULL;
2018 	}
2019 	q->q_flag &= ~QFULL;
2020 	mutex_exit(QLOCK(q));
2021 	while (mp) {
2022 		nmp = mp->b_next;
2023 		mp->b_next = mp->b_prev = NULL;
2024 
2025 		STR_FTEVENT_MBLK(mp, q, FTEV_FLUSHQ, NULL);
2026 
2027 		if (pcproto_flag && (mp->b_datap->db_type == M_PCPROTO))
2028 			(void) putq(q, mp);
2029 		else if (flag || datamsg(mp->b_datap->db_type))
2030 			freemsg(mp);
2031 		else
2032 			(void) putq(q, mp);
2033 		mp = nmp;
2034 	}
2035 	bpri = 1;
2036 	mutex_enter(QLOCK(q));
2037 	for (qbp = q->q_bandp; qbp; qbp = qbp->qb_next) {
2038 		if ((qbp->qb_flag & QB_WANTW) &&
2039 		    (((qbp->qb_count < qbp->qb_lowat) &&
2040 		    (qbp->qb_mblkcnt < qbp->qb_lowat)) ||
2041 		    qbp->qb_lowat == 0)) {
2042 			qbp->qb_flag &= ~QB_WANTW;
2043 			backenab = 1;
2044 			qbf[bpri] = 1;
2045 		} else
2046 			qbf[bpri] = 0;
2047 		bpri++;
2048 	}
2049 	ASSERT(bpri == (unsigned char)(q->q_nband + 1));
2050 	if ((q->q_flag & QWANTW) &&
2051 	    (((q->q_count < q->q_lowat) &&
2052 	    (q->q_mblkcnt < q->q_lowat)) || q->q_lowat == 0)) {
2053 		q->q_flag &= ~QWANTW;
2054 		backenab = 1;
2055 		qbf[0] = 1;
2056 	} else
2057 		qbf[0] = 0;
2058 
2059 	/*
2060 	 * If any band can now be written to, and there is a writer
2061 	 * for that band, then backenable the closest service procedure.
2062 	 */
2063 	if (backenab) {
2064 		mutex_exit(QLOCK(q));
2065 		for (bpri = q->q_nband; bpri != 0; bpri--)
2066 			if (qbf[bpri])
2067 				backenable(q, bpri);
2068 		if (qbf[0])
2069 			backenable(q, 0);
2070 	} else
2071 		mutex_exit(QLOCK(q));
2072 }
2073 
2074 /*
2075  * The real flushing takes place in flushq_common. This is done so that
2076  * a flag which specifies whether or not M_PCPROTO messages should be flushed
2077  * or not. Currently the only place that uses this flag is the stream head.
2078  */
2079 void
2080 flushq(queue_t *q, int flag)
2081 {
2082 	flushq_common(q, flag, 0);
2083 }
2084 
2085 /*
2086  * Flush the queue of messages of the given priority band.
2087  * There is some duplication of code between flushq and flushband.
2088  * This is because we want to optimize the code as much as possible.
2089  * The assumption is that there will be more messages in the normal
2090  * (priority 0) band than in any other.
2091  *
2092  * Historical note: when merging the M_FLUSH code in strrput with this
2093  * code one difference was discovered. flushband had an extra check for
2094  * did not have a check for (mp->b_datap->db_type < QPCTL) in the band 0
2095  * case. That check does not match the man page for flushband and was not
2096  * in the strrput flush code hence it was removed.
2097  */
2098 void
2099 flushband(queue_t *q, unsigned char pri, int flag)
2100 {
2101 	mblk_t *mp;
2102 	mblk_t *nmp;
2103 	mblk_t *last;
2104 	qband_t *qbp;
2105 	int band;
2106 
2107 	ASSERT((flag == FLUSHDATA) || (flag == FLUSHALL));
2108 	if (pri > q->q_nband) {
2109 		return;
2110 	}
2111 	mutex_enter(QLOCK(q));
2112 	if (pri == 0) {
2113 		mp = q->q_first;
2114 		q->q_first = NULL;
2115 		q->q_last = NULL;
2116 		q->q_count = 0;
2117 		q->q_mblkcnt = 0;
2118 		for (qbp = q->q_bandp; qbp; qbp = qbp->qb_next) {
2119 			qbp->qb_first = NULL;
2120 			qbp->qb_last = NULL;
2121 			qbp->qb_count = 0;
2122 			qbp->qb_mblkcnt = 0;
2123 			qbp->qb_flag &= ~QB_FULL;
2124 		}
2125 		q->q_flag &= ~QFULL;
2126 		mutex_exit(QLOCK(q));
2127 		while (mp) {
2128 			nmp = mp->b_next;
2129 			mp->b_next = mp->b_prev = NULL;
2130 			if ((mp->b_band == 0) &&
2131 				((flag == FLUSHALL) ||
2132 				datamsg(mp->b_datap->db_type)))
2133 				freemsg(mp);
2134 			else
2135 				(void) putq(q, mp);
2136 			mp = nmp;
2137 		}
2138 		mutex_enter(QLOCK(q));
2139 		if ((q->q_flag & QWANTW) &&
2140 		    (((q->q_count < q->q_lowat) &&
2141 		    (q->q_mblkcnt < q->q_lowat)) || q->q_lowat == 0)) {
2142 			q->q_flag &= ~QWANTW;
2143 			mutex_exit(QLOCK(q));
2144 
2145 			backenable(q, pri);
2146 		} else
2147 			mutex_exit(QLOCK(q));
2148 	} else {	/* pri != 0 */
2149 		boolean_t flushed = B_FALSE;
2150 		band = pri;
2151 
2152 		ASSERT(MUTEX_HELD(QLOCK(q)));
2153 		qbp = q->q_bandp;
2154 		while (--band > 0)
2155 			qbp = qbp->qb_next;
2156 		mp = qbp->qb_first;
2157 		if (mp == NULL) {
2158 			mutex_exit(QLOCK(q));
2159 			return;
2160 		}
2161 		last = qbp->qb_last->b_next;
2162 		/*
2163 		 * rmvq_noenab() and freemsg() are called for each mblk that
2164 		 * meets the criteria.  The loop is executed until the last
2165 		 * mblk has been processed.
2166 		 */
2167 		while (mp != last) {
2168 			ASSERT(mp->b_band == pri);
2169 			nmp = mp->b_next;
2170 			if (flag == FLUSHALL || datamsg(mp->b_datap->db_type)) {
2171 				rmvq_noenab(q, mp);
2172 				freemsg(mp);
2173 				flushed = B_TRUE;
2174 			}
2175 			mp = nmp;
2176 		}
2177 		mutex_exit(QLOCK(q));
2178 
2179 		/*
2180 		 * If any mblk(s) has been freed, we know that qbackenable()
2181 		 * will need to be called.
2182 		 */
2183 		if (flushed)
2184 			qbackenable(q, pri);
2185 	}
2186 }
2187 
2188 /*
2189  * Return 1 if the queue is not full.  If the queue is full, return
2190  * 0 (may not put message) and set QWANTW flag (caller wants to write
2191  * to the queue).
2192  */
2193 int
2194 canput(queue_t *q)
2195 {
2196 	TRACE_1(TR_FAC_STREAMS_FR, TR_CANPUT_IN, "canput:%p", q);
2197 
2198 	/* this is for loopback transports, they should not do a canput */
2199 	ASSERT(STRMATED(q->q_stream) || STREAM(q) == STREAM(q->q_nfsrv));
2200 
2201 	/* Find next forward module that has a service procedure */
2202 	q = q->q_nfsrv;
2203 
2204 	if (!(q->q_flag & QFULL)) {
2205 		TRACE_2(TR_FAC_STREAMS_FR, TR_CANPUT_OUT, "canput:%p %d", q, 1);
2206 		return (1);
2207 	}
2208 	mutex_enter(QLOCK(q));
2209 	if (q->q_flag & QFULL) {
2210 		q->q_flag |= QWANTW;
2211 		mutex_exit(QLOCK(q));
2212 		TRACE_2(TR_FAC_STREAMS_FR, TR_CANPUT_OUT, "canput:%p %d", q, 0);
2213 		return (0);
2214 	}
2215 	mutex_exit(QLOCK(q));
2216 	TRACE_2(TR_FAC_STREAMS_FR, TR_CANPUT_OUT, "canput:%p %d", q, 1);
2217 	return (1);
2218 }
2219 
2220 /*
2221  * This is the new canput for use with priority bands.  Return 1 if the
2222  * band is not full.  If the band is full, return 0 (may not put message)
2223  * and set QWANTW(QB_WANTW) flag for zero(non-zero) band (caller wants to
2224  * write to the queue).
2225  */
2226 int
2227 bcanput(queue_t *q, unsigned char pri)
2228 {
2229 	qband_t *qbp;
2230 
2231 	TRACE_2(TR_FAC_STREAMS_FR, TR_BCANPUT_IN, "bcanput:%p %p", q, pri);
2232 	if (!q)
2233 		return (0);
2234 
2235 	/* Find next forward module that has a service procedure */
2236 	q = q->q_nfsrv;
2237 
2238 	mutex_enter(QLOCK(q));
2239 	if (pri == 0) {
2240 		if (q->q_flag & QFULL) {
2241 			q->q_flag |= QWANTW;
2242 			mutex_exit(QLOCK(q));
2243 			TRACE_3(TR_FAC_STREAMS_FR, TR_BCANPUT_OUT,
2244 				"bcanput:%p %X %d", q, pri, 0);
2245 			return (0);
2246 		}
2247 	} else {	/* pri != 0 */
2248 		if (pri > q->q_nband) {
2249 			/*
2250 			 * No band exists yet, so return success.
2251 			 */
2252 			mutex_exit(QLOCK(q));
2253 			TRACE_3(TR_FAC_STREAMS_FR, TR_BCANPUT_OUT,
2254 				"bcanput:%p %X %d", q, pri, 1);
2255 			return (1);
2256 		}
2257 		qbp = q->q_bandp;
2258 		while (--pri)
2259 			qbp = qbp->qb_next;
2260 		if (qbp->qb_flag & QB_FULL) {
2261 			qbp->qb_flag |= QB_WANTW;
2262 			mutex_exit(QLOCK(q));
2263 			TRACE_3(TR_FAC_STREAMS_FR, TR_BCANPUT_OUT,
2264 				"bcanput:%p %X %d", q, pri, 0);
2265 			return (0);
2266 		}
2267 	}
2268 	mutex_exit(QLOCK(q));
2269 	TRACE_3(TR_FAC_STREAMS_FR, TR_BCANPUT_OUT,
2270 		"bcanput:%p %X %d", q, pri, 1);
2271 	return (1);
2272 }
2273 
2274 /*
2275  * Put a message on a queue.
2276  *
2277  * Messages are enqueued on a priority basis.  The priority classes
2278  * are HIGH PRIORITY (type >= QPCTL), PRIORITY (type < QPCTL && band > 0),
2279  * and B_NORMAL (type < QPCTL && band == 0).
2280  *
2281  * Add appropriate weighted data block sizes to queue count.
2282  * If queue hits high water mark then set QFULL flag.
2283  *
2284  * If QNOENAB is not set (putq is allowed to enable the queue),
2285  * enable the queue only if the message is PRIORITY,
2286  * or the QWANTR flag is set (indicating that the service procedure
2287  * is ready to read the queue.  This implies that a service
2288  * procedure must NEVER put a high priority message back on its own
2289  * queue, as this would result in an infinite loop (!).
2290  */
2291 int
2292 putq(queue_t *q, mblk_t *bp)
2293 {
2294 	mblk_t *tmp;
2295 	qband_t *qbp = NULL;
2296 	int mcls = (int)queclass(bp);
2297 	kthread_id_t freezer;
2298 	int	bytecnt = 0, mblkcnt = 0;
2299 
2300 	freezer = STREAM(q)->sd_freezer;
2301 	if (freezer == curthread) {
2302 		ASSERT(frozenstr(q));
2303 		ASSERT(MUTEX_HELD(QLOCK(q)));
2304 	} else
2305 		mutex_enter(QLOCK(q));
2306 
2307 	/*
2308 	 * Make sanity checks and if qband structure is not yet
2309 	 * allocated, do so.
2310 	 */
2311 	if (mcls == QPCTL) {
2312 		if (bp->b_band != 0)
2313 			bp->b_band = 0;		/* force to be correct */
2314 	} else if (bp->b_band != 0) {
2315 		int i;
2316 		qband_t **qbpp;
2317 
2318 		if (bp->b_band > q->q_nband) {
2319 
2320 			/*
2321 			 * The qband structure for this priority band is
2322 			 * not on the queue yet, so we have to allocate
2323 			 * one on the fly.  It would be wasteful to
2324 			 * associate the qband structures with every
2325 			 * queue when the queues are allocated.  This is
2326 			 * because most queues will only need the normal
2327 			 * band of flow which can be described entirely
2328 			 * by the queue itself.
2329 			 */
2330 			qbpp = &q->q_bandp;
2331 			while (*qbpp)
2332 				qbpp = &(*qbpp)->qb_next;
2333 			while (bp->b_band > q->q_nband) {
2334 				if ((*qbpp = allocband()) == NULL) {
2335 					if (freezer != curthread)
2336 						mutex_exit(QLOCK(q));
2337 					return (0);
2338 				}
2339 				(*qbpp)->qb_hiwat = q->q_hiwat;
2340 				(*qbpp)->qb_lowat = q->q_lowat;
2341 				q->q_nband++;
2342 				qbpp = &(*qbpp)->qb_next;
2343 			}
2344 		}
2345 		ASSERT(MUTEX_HELD(QLOCK(q)));
2346 		qbp = q->q_bandp;
2347 		i = bp->b_band;
2348 		while (--i)
2349 			qbp = qbp->qb_next;
2350 	}
2351 
2352 	/*
2353 	 * If queue is empty, add the message and initialize the pointers.
2354 	 * Otherwise, adjust message pointers and queue pointers based on
2355 	 * the type of the message and where it belongs on the queue.  Some
2356 	 * code is duplicated to minimize the number of conditionals and
2357 	 * hopefully minimize the amount of time this routine takes.
2358 	 */
2359 	if (!q->q_first) {
2360 		bp->b_next = NULL;
2361 		bp->b_prev = NULL;
2362 		q->q_first = bp;
2363 		q->q_last = bp;
2364 		if (qbp) {
2365 			qbp->qb_first = bp;
2366 			qbp->qb_last = bp;
2367 		}
2368 	} else if (!qbp) {	/* bp->b_band == 0 */
2369 
2370 		/*
2371 		 * If queue class of message is less than or equal to
2372 		 * that of the last one on the queue, tack on to the end.
2373 		 */
2374 		tmp = q->q_last;
2375 		if (mcls <= (int)queclass(tmp)) {
2376 			bp->b_next = NULL;
2377 			bp->b_prev = tmp;
2378 			tmp->b_next = bp;
2379 			q->q_last = bp;
2380 		} else {
2381 			tmp = q->q_first;
2382 			while ((int)queclass(tmp) >= mcls)
2383 				tmp = tmp->b_next;
2384 
2385 			/*
2386 			 * Insert bp before tmp.
2387 			 */
2388 			bp->b_next = tmp;
2389 			bp->b_prev = tmp->b_prev;
2390 			if (tmp->b_prev)
2391 				tmp->b_prev->b_next = bp;
2392 			else
2393 				q->q_first = bp;
2394 			tmp->b_prev = bp;
2395 		}
2396 	} else {		/* bp->b_band != 0 */
2397 		if (qbp->qb_first) {
2398 			tmp = qbp->qb_last;
2399 
2400 			/*
2401 			 * Insert bp after the last message in this band.
2402 			 */
2403 			bp->b_next = tmp->b_next;
2404 			if (tmp->b_next)
2405 				tmp->b_next->b_prev = bp;
2406 			else
2407 				q->q_last = bp;
2408 			bp->b_prev = tmp;
2409 			tmp->b_next = bp;
2410 		} else {
2411 			tmp = q->q_last;
2412 			if ((mcls < (int)queclass(tmp)) ||
2413 			    (bp->b_band <= tmp->b_band)) {
2414 
2415 				/*
2416 				 * Tack bp on end of queue.
2417 				 */
2418 				bp->b_next = NULL;
2419 				bp->b_prev = tmp;
2420 				tmp->b_next = bp;
2421 				q->q_last = bp;
2422 			} else {
2423 				tmp = q->q_first;
2424 				while (tmp->b_datap->db_type >= QPCTL)
2425 					tmp = tmp->b_next;
2426 				while (tmp->b_band >= bp->b_band)
2427 					tmp = tmp->b_next;
2428 
2429 				/*
2430 				 * Insert bp before tmp.
2431 				 */
2432 				bp->b_next = tmp;
2433 				bp->b_prev = tmp->b_prev;
2434 				if (tmp->b_prev)
2435 					tmp->b_prev->b_next = bp;
2436 				else
2437 					q->q_first = bp;
2438 				tmp->b_prev = bp;
2439 			}
2440 			qbp->qb_first = bp;
2441 		}
2442 		qbp->qb_last = bp;
2443 	}
2444 
2445 	/* Get message byte count for q_count accounting */
2446 	for (tmp = bp; tmp; tmp = tmp->b_cont) {
2447 		ADD_MBLK_SIZE(tmp, bytecnt);
2448 		mblkcnt++;
2449 	}
2450 
2451 	if (qbp) {
2452 		qbp->qb_count += bytecnt;
2453 		qbp->qb_mblkcnt += mblkcnt;
2454 		if ((qbp->qb_count >= qbp->qb_hiwat) ||
2455 		    (qbp->qb_mblkcnt >= qbp->qb_hiwat)) {
2456 			qbp->qb_flag |= QB_FULL;
2457 		}
2458 	} else {
2459 		q->q_count += bytecnt;
2460 		q->q_mblkcnt += mblkcnt;
2461 		if ((q->q_count >= q->q_hiwat) ||
2462 		    (q->q_mblkcnt >= q->q_hiwat)) {
2463 			q->q_flag |= QFULL;
2464 		}
2465 	}
2466 
2467 	STR_FTEVENT_MSG(bp, q, FTEV_PUTQ, NULL);
2468 
2469 	if ((mcls > QNORM) ||
2470 	    (canenable(q) && (q->q_flag & QWANTR || bp->b_band)))
2471 		qenable_locked(q);
2472 	ASSERT(MUTEX_HELD(QLOCK(q)));
2473 	if (freezer != curthread)
2474 		mutex_exit(QLOCK(q));
2475 
2476 	return (1);
2477 }
2478 
2479 /*
2480  * Put stuff back at beginning of Q according to priority order.
2481  * See comment on putq above for details.
2482  */
2483 int
2484 putbq(queue_t *q, mblk_t *bp)
2485 {
2486 	mblk_t *tmp;
2487 	qband_t *qbp = NULL;
2488 	int mcls = (int)queclass(bp);
2489 	kthread_id_t freezer;
2490 	int	bytecnt = 0, mblkcnt = 0;
2491 
2492 	ASSERT(q && bp);
2493 	ASSERT(bp->b_next == NULL);
2494 	freezer = STREAM(q)->sd_freezer;
2495 	if (freezer == curthread) {
2496 		ASSERT(frozenstr(q));
2497 		ASSERT(MUTEX_HELD(QLOCK(q)));
2498 	} else
2499 		mutex_enter(QLOCK(q));
2500 
2501 	/*
2502 	 * Make sanity checks and if qband structure is not yet
2503 	 * allocated, do so.
2504 	 */
2505 	if (mcls == QPCTL) {
2506 		if (bp->b_band != 0)
2507 			bp->b_band = 0;		/* force to be correct */
2508 	} else if (bp->b_band != 0) {
2509 		int i;
2510 		qband_t **qbpp;
2511 
2512 		if (bp->b_band > q->q_nband) {
2513 			qbpp = &q->q_bandp;
2514 			while (*qbpp)
2515 				qbpp = &(*qbpp)->qb_next;
2516 			while (bp->b_band > q->q_nband) {
2517 				if ((*qbpp = allocband()) == NULL) {
2518 					if (freezer != curthread)
2519 						mutex_exit(QLOCK(q));
2520 					return (0);
2521 				}
2522 				(*qbpp)->qb_hiwat = q->q_hiwat;
2523 				(*qbpp)->qb_lowat = q->q_lowat;
2524 				q->q_nband++;
2525 				qbpp = &(*qbpp)->qb_next;
2526 			}
2527 		}
2528 		qbp = q->q_bandp;
2529 		i = bp->b_band;
2530 		while (--i)
2531 			qbp = qbp->qb_next;
2532 	}
2533 
2534 	/*
2535 	 * If queue is empty or if message is high priority,
2536 	 * place on the front of the queue.
2537 	 */
2538 	tmp = q->q_first;
2539 	if ((!tmp) || (mcls == QPCTL)) {
2540 		bp->b_next = tmp;
2541 		if (tmp)
2542 			tmp->b_prev = bp;
2543 		else
2544 			q->q_last = bp;
2545 		q->q_first = bp;
2546 		bp->b_prev = NULL;
2547 		if (qbp) {
2548 			qbp->qb_first = bp;
2549 			qbp->qb_last = bp;
2550 		}
2551 	} else if (qbp) {	/* bp->b_band != 0 */
2552 		tmp = qbp->qb_first;
2553 		if (tmp) {
2554 
2555 			/*
2556 			 * Insert bp before the first message in this band.
2557 			 */
2558 			bp->b_next = tmp;
2559 			bp->b_prev = tmp->b_prev;
2560 			if (tmp->b_prev)
2561 				tmp->b_prev->b_next = bp;
2562 			else
2563 				q->q_first = bp;
2564 			tmp->b_prev = bp;
2565 		} else {
2566 			tmp = q->q_last;
2567 			if ((mcls < (int)queclass(tmp)) ||
2568 			    (bp->b_band < tmp->b_band)) {
2569 
2570 				/*
2571 				 * Tack bp on end of queue.
2572 				 */
2573 				bp->b_next = NULL;
2574 				bp->b_prev = tmp;
2575 				tmp->b_next = bp;
2576 				q->q_last = bp;
2577 			} else {
2578 				tmp = q->q_first;
2579 				while (tmp->b_datap->db_type >= QPCTL)
2580 					tmp = tmp->b_next;
2581 				while (tmp->b_band > bp->b_band)
2582 					tmp = tmp->b_next;
2583 
2584 				/*
2585 				 * Insert bp before tmp.
2586 				 */
2587 				bp->b_next = tmp;
2588 				bp->b_prev = tmp->b_prev;
2589 				if (tmp->b_prev)
2590 					tmp->b_prev->b_next = bp;
2591 				else
2592 					q->q_first = bp;
2593 				tmp->b_prev = bp;
2594 			}
2595 			qbp->qb_last = bp;
2596 		}
2597 		qbp->qb_first = bp;
2598 	} else {		/* bp->b_band == 0 && !QPCTL */
2599 
2600 		/*
2601 		 * If the queue class or band is less than that of the last
2602 		 * message on the queue, tack bp on the end of the queue.
2603 		 */
2604 		tmp = q->q_last;
2605 		if ((mcls < (int)queclass(tmp)) || (bp->b_band < tmp->b_band)) {
2606 			bp->b_next = NULL;
2607 			bp->b_prev = tmp;
2608 			tmp->b_next = bp;
2609 			q->q_last = bp;
2610 		} else {
2611 			tmp = q->q_first;
2612 			while (tmp->b_datap->db_type >= QPCTL)
2613 				tmp = tmp->b_next;
2614 			while (tmp->b_band > bp->b_band)
2615 				tmp = tmp->b_next;
2616 
2617 			/*
2618 			 * Insert bp before tmp.
2619 			 */
2620 			bp->b_next = tmp;
2621 			bp->b_prev = tmp->b_prev;
2622 			if (tmp->b_prev)
2623 				tmp->b_prev->b_next = bp;
2624 			else
2625 				q->q_first = bp;
2626 			tmp->b_prev = bp;
2627 		}
2628 	}
2629 
2630 	/* Get message byte count for q_count accounting */
2631 	for (tmp = bp; tmp; tmp = tmp->b_cont) {
2632 		ADD_MBLK_SIZE(tmp, bytecnt);
2633 		mblkcnt++;
2634 	}
2635 	if (qbp) {
2636 		qbp->qb_count += bytecnt;
2637 		qbp->qb_mblkcnt += mblkcnt;
2638 		if ((qbp->qb_count >= qbp->qb_hiwat) ||
2639 		    (qbp->qb_mblkcnt >= qbp->qb_hiwat)) {
2640 			qbp->qb_flag |= QB_FULL;
2641 		}
2642 	} else {
2643 		q->q_count += bytecnt;
2644 		q->q_mblkcnt += mblkcnt;
2645 		if ((q->q_count >= q->q_hiwat) ||
2646 		    (q->q_mblkcnt >= q->q_hiwat)) {
2647 			q->q_flag |= QFULL;
2648 		}
2649 	}
2650 
2651 	STR_FTEVENT_MSG(bp, q, FTEV_PUTBQ, NULL);
2652 
2653 	if ((mcls > QNORM) || (canenable(q) && (q->q_flag & QWANTR)))
2654 		qenable_locked(q);
2655 	ASSERT(MUTEX_HELD(QLOCK(q)));
2656 	if (freezer != curthread)
2657 		mutex_exit(QLOCK(q));
2658 
2659 	return (1);
2660 }
2661 
2662 /*
2663  * Insert a message before an existing message on the queue.  If the
2664  * existing message is NULL, the new messages is placed on the end of
2665  * the queue.  The queue class of the new message is ignored.  However,
2666  * the priority band of the new message must adhere to the following
2667  * ordering:
2668  *
2669  *	emp->b_prev->b_band >= mp->b_band >= emp->b_band.
2670  *
2671  * All flow control parameters are updated.
2672  *
2673  * insq can be called with the stream frozen, but other utility functions
2674  * holding QLOCK, and by streams modules without any locks/frozen.
2675  */
2676 int
2677 insq(queue_t *q, mblk_t *emp, mblk_t *mp)
2678 {
2679 	mblk_t *tmp;
2680 	qband_t *qbp = NULL;
2681 	int mcls = (int)queclass(mp);
2682 	kthread_id_t freezer;
2683 	int	bytecnt = 0, mblkcnt = 0;
2684 
2685 	freezer = STREAM(q)->sd_freezer;
2686 	if (freezer == curthread) {
2687 		ASSERT(frozenstr(q));
2688 		ASSERT(MUTEX_HELD(QLOCK(q)));
2689 	} else if (MUTEX_HELD(QLOCK(q))) {
2690 		/* Don't drop lock on exit */
2691 		freezer = curthread;
2692 	} else
2693 		mutex_enter(QLOCK(q));
2694 
2695 	if (mcls == QPCTL) {
2696 		if (mp->b_band != 0)
2697 			mp->b_band = 0;		/* force to be correct */
2698 		if (emp && emp->b_prev &&
2699 		    (emp->b_prev->b_datap->db_type < QPCTL))
2700 			goto badord;
2701 	}
2702 	if (emp) {
2703 		if (((mcls == QNORM) && (mp->b_band < emp->b_band)) ||
2704 		    (emp->b_prev && (emp->b_prev->b_datap->db_type < QPCTL) &&
2705 		    (emp->b_prev->b_band < mp->b_band))) {
2706 			goto badord;
2707 		}
2708 	} else {
2709 		tmp = q->q_last;
2710 		if (tmp && (mcls == QNORM) && (mp->b_band > tmp->b_band)) {
2711 badord:
2712 			cmn_err(CE_WARN,
2713 			    "insq: attempt to insert message out of order "
2714 			    "on q %p", (void *)q);
2715 			if (freezer != curthread)
2716 				mutex_exit(QLOCK(q));
2717 			return (0);
2718 		}
2719 	}
2720 
2721 	if (mp->b_band != 0) {
2722 		int i;
2723 		qband_t **qbpp;
2724 
2725 		if (mp->b_band > q->q_nband) {
2726 			qbpp = &q->q_bandp;
2727 			while (*qbpp)
2728 				qbpp = &(*qbpp)->qb_next;
2729 			while (mp->b_band > q->q_nband) {
2730 				if ((*qbpp = allocband()) == NULL) {
2731 					if (freezer != curthread)
2732 						mutex_exit(QLOCK(q));
2733 					return (0);
2734 				}
2735 				(*qbpp)->qb_hiwat = q->q_hiwat;
2736 				(*qbpp)->qb_lowat = q->q_lowat;
2737 				q->q_nband++;
2738 				qbpp = &(*qbpp)->qb_next;
2739 			}
2740 		}
2741 		qbp = q->q_bandp;
2742 		i = mp->b_band;
2743 		while (--i)
2744 			qbp = qbp->qb_next;
2745 	}
2746 
2747 	if ((mp->b_next = emp) != NULL) {
2748 		if ((mp->b_prev = emp->b_prev) != NULL)
2749 			emp->b_prev->b_next = mp;
2750 		else
2751 			q->q_first = mp;
2752 		emp->b_prev = mp;
2753 	} else {
2754 		if ((mp->b_prev = q->q_last) != NULL)
2755 			q->q_last->b_next = mp;
2756 		else
2757 			q->q_first = mp;
2758 		q->q_last = mp;
2759 	}
2760 
2761 	/* Get mblk and byte count for q_count accounting */
2762 	for (tmp = mp; tmp; tmp = tmp->b_cont) {
2763 		ADD_MBLK_SIZE(tmp, bytecnt);
2764 		mblkcnt++;
2765 	}
2766 
2767 	if (qbp) {	/* adjust qband pointers and count */
2768 		if (!qbp->qb_first) {
2769 			qbp->qb_first = mp;
2770 			qbp->qb_last = mp;
2771 		} else {
2772 			if (mp->b_prev == NULL || (mp->b_prev != NULL &&
2773 			    (mp->b_prev->b_band != mp->b_band)))
2774 				qbp->qb_first = mp;
2775 			else if (mp->b_next == NULL || (mp->b_next != NULL &&
2776 			    (mp->b_next->b_band != mp->b_band)))
2777 				qbp->qb_last = mp;
2778 		}
2779 		qbp->qb_count += bytecnt;
2780 		qbp->qb_mblkcnt += mblkcnt;
2781 		if ((qbp->qb_count >= qbp->qb_hiwat) ||
2782 		    (qbp->qb_mblkcnt >= qbp->qb_hiwat)) {
2783 			qbp->qb_flag |= QB_FULL;
2784 		}
2785 	} else {
2786 		q->q_count += bytecnt;
2787 		q->q_mblkcnt += mblkcnt;
2788 		if ((q->q_count >= q->q_hiwat) ||
2789 		    (q->q_mblkcnt >= q->q_hiwat)) {
2790 			q->q_flag |= QFULL;
2791 		}
2792 	}
2793 
2794 	STR_FTEVENT_MSG(mp, q, FTEV_INSQ, NULL);
2795 
2796 	if (canenable(q) && (q->q_flag & QWANTR))
2797 		qenable_locked(q);
2798 
2799 	ASSERT(MUTEX_HELD(QLOCK(q)));
2800 	if (freezer != curthread)
2801 		mutex_exit(QLOCK(q));
2802 
2803 	return (1);
2804 }
2805 
2806 /*
2807  * Create and put a control message on queue.
2808  */
2809 int
2810 putctl(queue_t *q, int type)
2811 {
2812 	mblk_t *bp;
2813 
2814 	if ((datamsg(type) && (type != M_DELAY)) ||
2815 	    (bp = allocb_tryhard(0)) == NULL)
2816 		return (0);
2817 	bp->b_datap->db_type = (unsigned char) type;
2818 
2819 	put(q, bp);
2820 
2821 	return (1);
2822 }
2823 
2824 /*
2825  * Control message with a single-byte parameter
2826  */
2827 int
2828 putctl1(queue_t *q, int type, int param)
2829 {
2830 	mblk_t *bp;
2831 
2832 	if ((datamsg(type) && (type != M_DELAY)) ||
2833 	    (bp = allocb_tryhard(1)) == NULL)
2834 		return (0);
2835 	bp->b_datap->db_type = (unsigned char)type;
2836 	*bp->b_wptr++ = (unsigned char)param;
2837 
2838 	put(q, bp);
2839 
2840 	return (1);
2841 }
2842 
2843 int
2844 putnextctl1(queue_t *q, int type, int param)
2845 {
2846 	mblk_t *bp;
2847 
2848 	if ((datamsg(type) && (type != M_DELAY)) ||
2849 		((bp = allocb_tryhard(1)) == NULL))
2850 		return (0);
2851 
2852 	bp->b_datap->db_type = (unsigned char)type;
2853 	*bp->b_wptr++ = (unsigned char)param;
2854 
2855 	putnext(q, bp);
2856 
2857 	return (1);
2858 }
2859 
2860 int
2861 putnextctl(queue_t *q, int type)
2862 {
2863 	mblk_t *bp;
2864 
2865 	if ((datamsg(type) && (type != M_DELAY)) ||
2866 		((bp = allocb_tryhard(0)) == NULL))
2867 		return (0);
2868 	bp->b_datap->db_type = (unsigned char)type;
2869 
2870 	putnext(q, bp);
2871 
2872 	return (1);
2873 }
2874 
2875 /*
2876  * Return the queue upstream from this one
2877  */
2878 queue_t *
2879 backq(queue_t *q)
2880 {
2881 	q = _OTHERQ(q);
2882 	if (q->q_next) {
2883 		q = q->q_next;
2884 		return (_OTHERQ(q));
2885 	}
2886 	return (NULL);
2887 }
2888 
2889 /*
2890  * Send a block back up the queue in reverse from this
2891  * one (e.g. to respond to ioctls)
2892  */
2893 void
2894 qreply(queue_t *q, mblk_t *bp)
2895 {
2896 	ASSERT(q && bp);
2897 
2898 	putnext(_OTHERQ(q), bp);
2899 }
2900 
2901 /*
2902  * Streams Queue Scheduling
2903  *
2904  * Queues are enabled through qenable() when they have messages to
2905  * process.  They are serviced by queuerun(), which runs each enabled
2906  * queue's service procedure.  The call to queuerun() is processor
2907  * dependent - the general principle is that it be run whenever a queue
2908  * is enabled but before returning to user level.  For system calls,
2909  * the function runqueues() is called if their action causes a queue
2910  * to be enabled.  For device interrupts, queuerun() should be
2911  * called before returning from the last level of interrupt.  Beyond
2912  * this, no timing assumptions should be made about queue scheduling.
2913  */
2914 
2915 /*
2916  * Enable a queue: put it on list of those whose service procedures are
2917  * ready to run and set up the scheduling mechanism.
2918  * The broadcast is done outside the mutex -> to avoid the woken thread
2919  * from contending with the mutex. This is OK 'cos the queue has been
2920  * enqueued on the runlist and flagged safely at this point.
2921  */
2922 void
2923 qenable(queue_t *q)
2924 {
2925 	mutex_enter(QLOCK(q));
2926 	qenable_locked(q);
2927 	mutex_exit(QLOCK(q));
2928 }
2929 /*
2930  * Return number of messages on queue
2931  */
2932 int
2933 qsize(queue_t *qp)
2934 {
2935 	int count = 0;
2936 	mblk_t *mp;
2937 
2938 	mutex_enter(QLOCK(qp));
2939 	for (mp = qp->q_first; mp; mp = mp->b_next)
2940 		count++;
2941 	mutex_exit(QLOCK(qp));
2942 	return (count);
2943 }
2944 
2945 /*
2946  * noenable - set queue so that putq() will not enable it.
2947  * enableok - set queue so that putq() can enable it.
2948  */
2949 void
2950 noenable(queue_t *q)
2951 {
2952 	mutex_enter(QLOCK(q));
2953 	q->q_flag |= QNOENB;
2954 	mutex_exit(QLOCK(q));
2955 }
2956 
2957 void
2958 enableok(queue_t *q)
2959 {
2960 	mutex_enter(QLOCK(q));
2961 	q->q_flag &= ~QNOENB;
2962 	mutex_exit(QLOCK(q));
2963 }
2964 
2965 /*
2966  * Set queue fields.
2967  */
2968 int
2969 strqset(queue_t *q, qfields_t what, unsigned char pri, intptr_t val)
2970 {
2971 	qband_t *qbp = NULL;
2972 	queue_t	*wrq;
2973 	int error = 0;
2974 	kthread_id_t freezer;
2975 
2976 	freezer = STREAM(q)->sd_freezer;
2977 	if (freezer == curthread) {
2978 		ASSERT(frozenstr(q));
2979 		ASSERT(MUTEX_HELD(QLOCK(q)));
2980 	} else
2981 		mutex_enter(QLOCK(q));
2982 
2983 	if (what >= QBAD) {
2984 		error = EINVAL;
2985 		goto done;
2986 	}
2987 	if (pri != 0) {
2988 		int i;
2989 		qband_t **qbpp;
2990 
2991 		if (pri > q->q_nband) {
2992 			qbpp = &q->q_bandp;
2993 			while (*qbpp)
2994 				qbpp = &(*qbpp)->qb_next;
2995 			while (pri > q->q_nband) {
2996 				if ((*qbpp = allocband()) == NULL) {
2997 					error = EAGAIN;
2998 					goto done;
2999 				}
3000 				(*qbpp)->qb_hiwat = q->q_hiwat;
3001 				(*qbpp)->qb_lowat = q->q_lowat;
3002 				q->q_nband++;
3003 				qbpp = &(*qbpp)->qb_next;
3004 			}
3005 		}
3006 		qbp = q->q_bandp;
3007 		i = pri;
3008 		while (--i)
3009 			qbp = qbp->qb_next;
3010 	}
3011 	switch (what) {
3012 
3013 	case QHIWAT:
3014 		if (qbp)
3015 			qbp->qb_hiwat = (size_t)val;
3016 		else
3017 			q->q_hiwat = (size_t)val;
3018 		break;
3019 
3020 	case QLOWAT:
3021 		if (qbp)
3022 			qbp->qb_lowat = (size_t)val;
3023 		else
3024 			q->q_lowat = (size_t)val;
3025 		break;
3026 
3027 	case QMAXPSZ:
3028 		if (qbp)
3029 			error = EINVAL;
3030 		else
3031 			q->q_maxpsz = (ssize_t)val;
3032 
3033 		/*
3034 		 * Performance concern, strwrite looks at the module below
3035 		 * the stream head for the maxpsz each time it does a write
3036 		 * we now cache it at the stream head.  Check to see if this
3037 		 * queue is sitting directly below the stream head.
3038 		 */
3039 		wrq = STREAM(q)->sd_wrq;
3040 		if (q != wrq->q_next)
3041 			break;
3042 
3043 		/*
3044 		 * If the stream is not frozen drop the current QLOCK and
3045 		 * acquire the sd_wrq QLOCK which protects sd_qn_*
3046 		 */
3047 		if (freezer != curthread) {
3048 			mutex_exit(QLOCK(q));
3049 			mutex_enter(QLOCK(wrq));
3050 		}
3051 		ASSERT(MUTEX_HELD(QLOCK(wrq)));
3052 
3053 		if (strmsgsz != 0) {
3054 			if (val == INFPSZ)
3055 				val = strmsgsz;
3056 			else  {
3057 				if (STREAM(q)->sd_vnode->v_type == VFIFO)
3058 					val = MIN(PIPE_BUF, val);
3059 				else
3060 					val = MIN(strmsgsz, val);
3061 			}
3062 		}
3063 		STREAM(q)->sd_qn_maxpsz = val;
3064 		if (freezer != curthread) {
3065 			mutex_exit(QLOCK(wrq));
3066 			mutex_enter(QLOCK(q));
3067 		}
3068 		break;
3069 
3070 	case QMINPSZ:
3071 		if (qbp)
3072 			error = EINVAL;
3073 		else
3074 			q->q_minpsz = (ssize_t)val;
3075 
3076 		/*
3077 		 * Performance concern, strwrite looks at the module below
3078 		 * the stream head for the maxpsz each time it does a write
3079 		 * we now cache it at the stream head.  Check to see if this
3080 		 * queue is sitting directly below the stream head.
3081 		 */
3082 		wrq = STREAM(q)->sd_wrq;
3083 		if (q != wrq->q_next)
3084 			break;
3085 
3086 		/*
3087 		 * If the stream is not frozen drop the current QLOCK and
3088 		 * acquire the sd_wrq QLOCK which protects sd_qn_*
3089 		 */
3090 		if (freezer != curthread) {
3091 			mutex_exit(QLOCK(q));
3092 			mutex_enter(QLOCK(wrq));
3093 		}
3094 		STREAM(q)->sd_qn_minpsz = (ssize_t)val;
3095 
3096 		if (freezer != curthread) {
3097 			mutex_exit(QLOCK(wrq));
3098 			mutex_enter(QLOCK(q));
3099 		}
3100 		break;
3101 
3102 	case QSTRUIOT:
3103 		if (qbp)
3104 			error = EINVAL;
3105 		else
3106 			q->q_struiot = (ushort_t)val;
3107 		break;
3108 
3109 	case QCOUNT:
3110 	case QFIRST:
3111 	case QLAST:
3112 	case QFLAG:
3113 		error = EPERM;
3114 		break;
3115 
3116 	default:
3117 		error = EINVAL;
3118 		break;
3119 	}
3120 done:
3121 	if (freezer != curthread)
3122 		mutex_exit(QLOCK(q));
3123 	return (error);
3124 }
3125 
3126 /*
3127  * Get queue fields.
3128  */
3129 int
3130 strqget(queue_t *q, qfields_t what, unsigned char pri, void *valp)
3131 {
3132 	qband_t 	*qbp = NULL;
3133 	int 		error = 0;
3134 	kthread_id_t 	freezer;
3135 
3136 	freezer = STREAM(q)->sd_freezer;
3137 	if (freezer == curthread) {
3138 		ASSERT(frozenstr(q));
3139 		ASSERT(MUTEX_HELD(QLOCK(q)));
3140 	} else
3141 		mutex_enter(QLOCK(q));
3142 	if (what >= QBAD) {
3143 		error = EINVAL;
3144 		goto done;
3145 	}
3146 	if (pri != 0) {
3147 		int i;
3148 		qband_t **qbpp;
3149 
3150 		if (pri > q->q_nband) {
3151 			qbpp = &q->q_bandp;
3152 			while (*qbpp)
3153 				qbpp = &(*qbpp)->qb_next;
3154 			while (pri > q->q_nband) {
3155 				if ((*qbpp = allocband()) == NULL) {
3156 					error = EAGAIN;
3157 					goto done;
3158 				}
3159 				(*qbpp)->qb_hiwat = q->q_hiwat;
3160 				(*qbpp)->qb_lowat = q->q_lowat;
3161 				q->q_nband++;
3162 				qbpp = &(*qbpp)->qb_next;
3163 			}
3164 		}
3165 		qbp = q->q_bandp;
3166 		i = pri;
3167 		while (--i)
3168 			qbp = qbp->qb_next;
3169 	}
3170 	switch (what) {
3171 	case QHIWAT:
3172 		if (qbp)
3173 			*(size_t *)valp = qbp->qb_hiwat;
3174 		else
3175 			*(size_t *)valp = q->q_hiwat;
3176 		break;
3177 
3178 	case QLOWAT:
3179 		if (qbp)
3180 			*(size_t *)valp = qbp->qb_lowat;
3181 		else
3182 			*(size_t *)valp = q->q_lowat;
3183 		break;
3184 
3185 	case QMAXPSZ:
3186 		if (qbp)
3187 			error = EINVAL;
3188 		else
3189 			*(ssize_t *)valp = q->q_maxpsz;
3190 		break;
3191 
3192 	case QMINPSZ:
3193 		if (qbp)
3194 			error = EINVAL;
3195 		else
3196 			*(ssize_t *)valp = q->q_minpsz;
3197 		break;
3198 
3199 	case QCOUNT:
3200 		if (qbp)
3201 			*(size_t *)valp = qbp->qb_count;
3202 		else
3203 			*(size_t *)valp = q->q_count;
3204 		break;
3205 
3206 	case QFIRST:
3207 		if (qbp)
3208 			*(mblk_t **)valp = qbp->qb_first;
3209 		else
3210 			*(mblk_t **)valp = q->q_first;
3211 		break;
3212 
3213 	case QLAST:
3214 		if (qbp)
3215 			*(mblk_t **)valp = qbp->qb_last;
3216 		else
3217 			*(mblk_t **)valp = q->q_last;
3218 		break;
3219 
3220 	case QFLAG:
3221 		if (qbp)
3222 			*(uint_t *)valp = qbp->qb_flag;
3223 		else
3224 			*(uint_t *)valp = q->q_flag;
3225 		break;
3226 
3227 	case QSTRUIOT:
3228 		if (qbp)
3229 			error = EINVAL;
3230 		else
3231 			*(short *)valp = q->q_struiot;
3232 		break;
3233 
3234 	default:
3235 		error = EINVAL;
3236 		break;
3237 	}
3238 done:
3239 	if (freezer != curthread)
3240 		mutex_exit(QLOCK(q));
3241 	return (error);
3242 }
3243 
3244 /*
3245  * Function awakes all in cvwait/sigwait/pollwait, on one of:
3246  *	QWANTWSYNC or QWANTR or QWANTW,
3247  *
3248  * Note: for QWANTWSYNC/QWANTW and QWANTR, if no WSLEEPer or RSLEEPer then a
3249  *	 deferred wakeup will be done. Also if strpoll() in progress then a
3250  *	 deferred pollwakeup will be done.
3251  */
3252 void
3253 strwakeq(queue_t *q, int flag)
3254 {
3255 	stdata_t 	*stp = STREAM(q);
3256 	pollhead_t 	*pl;
3257 
3258 	mutex_enter(&stp->sd_lock);
3259 	pl = &stp->sd_pollist;
3260 	if (flag & QWANTWSYNC) {
3261 		ASSERT(!(q->q_flag & QREADR));
3262 		if (stp->sd_flag & WSLEEP) {
3263 			stp->sd_flag &= ~WSLEEP;
3264 			cv_broadcast(&stp->sd_wrq->q_wait);
3265 		} else {
3266 			stp->sd_wakeq |= WSLEEP;
3267 		}
3268 
3269 		mutex_exit(&stp->sd_lock);
3270 		pollwakeup(pl, POLLWRNORM);
3271 		mutex_enter(&stp->sd_lock);
3272 
3273 		if (stp->sd_sigflags & S_WRNORM)
3274 			strsendsig(stp->sd_siglist, S_WRNORM, 0, 0);
3275 	} else if (flag & QWANTR) {
3276 		if (stp->sd_flag & RSLEEP) {
3277 			stp->sd_flag &= ~RSLEEP;
3278 			cv_broadcast(&_RD(stp->sd_wrq)->q_wait);
3279 		} else {
3280 			stp->sd_wakeq |= RSLEEP;
3281 		}
3282 
3283 		mutex_exit(&stp->sd_lock);
3284 		pollwakeup(pl, POLLIN | POLLRDNORM);
3285 		mutex_enter(&stp->sd_lock);
3286 
3287 		{
3288 			int events = stp->sd_sigflags & (S_INPUT | S_RDNORM);
3289 
3290 			if (events)
3291 				strsendsig(stp->sd_siglist, events, 0, 0);
3292 		}
3293 	} else {
3294 		if (stp->sd_flag & WSLEEP) {
3295 			stp->sd_flag &= ~WSLEEP;
3296 			cv_broadcast(&stp->sd_wrq->q_wait);
3297 		}
3298 
3299 		mutex_exit(&stp->sd_lock);
3300 		pollwakeup(pl, POLLWRNORM);
3301 		mutex_enter(&stp->sd_lock);
3302 
3303 		if (stp->sd_sigflags & S_WRNORM)
3304 			strsendsig(stp->sd_siglist, S_WRNORM, 0, 0);
3305 	}
3306 	mutex_exit(&stp->sd_lock);
3307 }
3308 
3309 int
3310 struioget(queue_t *q, mblk_t *mp, struiod_t *dp, int noblock)
3311 {
3312 	stdata_t *stp = STREAM(q);
3313 	int typ  = STRUIOT_STANDARD;
3314 	uio_t	 *uiop = &dp->d_uio;
3315 	dblk_t	 *dbp;
3316 	ssize_t	 uiocnt;
3317 	ssize_t	 cnt;
3318 	unsigned char *ptr;
3319 	ssize_t	 resid;
3320 	int	 error = 0;
3321 	on_trap_data_t otd;
3322 	queue_t	*stwrq;
3323 
3324 	/*
3325 	 * Plumbing may change while taking the type so store the
3326 	 * queue in a temporary variable. It doesn't matter even
3327 	 * if the we take the type from the previous plumbing,
3328 	 * that's because if the plumbing has changed when we were
3329 	 * holding the queue in a temporary variable, we can continue
3330 	 * processing the message the way it would have been processed
3331 	 * in the old plumbing, without any side effects but a bit
3332 	 * extra processing for partial ip header checksum.
3333 	 *
3334 	 * This has been done to avoid holding the sd_lock which is
3335 	 * very hot.
3336 	 */
3337 
3338 	stwrq = stp->sd_struiowrq;
3339 	if (stwrq)
3340 		typ = stwrq->q_struiot;
3341 
3342 	for (; (resid = uiop->uio_resid) > 0 && mp; mp = mp->b_cont) {
3343 		dbp = mp->b_datap;
3344 		ptr = (uchar_t *)(mp->b_rptr + dbp->db_cksumstuff);
3345 		uiocnt = dbp->db_cksumend - dbp->db_cksumstuff;
3346 		cnt = MIN(uiocnt, uiop->uio_resid);
3347 		if (!(dbp->db_struioflag & STRUIO_SPEC) ||
3348 		    (dbp->db_struioflag & STRUIO_DONE) || cnt == 0) {
3349 			/*
3350 			 * Either this mblk has already been processed
3351 			 * or there is no more room in this mblk (?).
3352 			 */
3353 			continue;
3354 		}
3355 		switch (typ) {
3356 		case STRUIOT_STANDARD:
3357 			if (noblock) {
3358 				if (on_trap(&otd, OT_DATA_ACCESS)) {
3359 					no_trap();
3360 					error = EWOULDBLOCK;
3361 					goto out;
3362 				}
3363 			}
3364 			if (error = uiomove(ptr, cnt, UIO_WRITE, uiop)) {
3365 				if (noblock)
3366 					no_trap();
3367 				goto out;
3368 			}
3369 			if (noblock)
3370 				no_trap();
3371 			break;
3372 
3373 		default:
3374 			error = EIO;
3375 			goto out;
3376 		}
3377 		dbp->db_struioflag |= STRUIO_DONE;
3378 		dbp->db_cksumstuff += cnt;
3379 	}
3380 out:
3381 	if (error == EWOULDBLOCK && (resid -= uiop->uio_resid) > 0) {
3382 		/*
3383 		 * A fault has occured and some bytes were moved to the
3384 		 * current mblk, the uio_t has already been updated by
3385 		 * the appropriate uio routine, so also update the mblk
3386 		 * to reflect this in case this same mblk chain is used
3387 		 * again (after the fault has been handled).
3388 		 */
3389 		uiocnt = dbp->db_cksumend - dbp->db_cksumstuff;
3390 		if (uiocnt >= resid)
3391 			dbp->db_cksumstuff += resid;
3392 	}
3393 	return (error);
3394 }
3395 
3396 /*
3397  * Try to enter queue synchronously. Any attempt to enter a closing queue will
3398  * fails. The qp->q_rwcnt keeps track of the number of successful entries so
3399  * that removeq() will not try to close the queue while a thread is inside the
3400  * queue.
3401  */
3402 static boolean_t
3403 rwnext_enter(queue_t *qp)
3404 {
3405 	mutex_enter(QLOCK(qp));
3406 	if (qp->q_flag & QWCLOSE) {
3407 		mutex_exit(QLOCK(qp));
3408 		return (B_FALSE);
3409 	}
3410 	qp->q_rwcnt++;
3411 	ASSERT(qp->q_rwcnt != 0);
3412 	mutex_exit(QLOCK(qp));
3413 	return (B_TRUE);
3414 }
3415 
3416 /*
3417  * Decrease the count of threads running in sync stream queue and wake up any
3418  * threads blocked in removeq().
3419  */
3420 static void
3421 rwnext_exit(queue_t *qp)
3422 {
3423 	mutex_enter(QLOCK(qp));
3424 	qp->q_rwcnt--;
3425 	if (qp->q_flag & QWANTRMQSYNC) {
3426 		qp->q_flag &= ~QWANTRMQSYNC;
3427 		cv_broadcast(&qp->q_wait);
3428 	}
3429 	mutex_exit(QLOCK(qp));
3430 }
3431 
3432 /*
3433  * The purpose of rwnext() is to call the rw procedure of the next
3434  * (downstream) modules queue.
3435  *
3436  * treated as put entrypoint for perimeter syncronization.
3437  *
3438  * There's no need to grab sq_putlocks here (which only exist for CIPUT
3439  * sync queues). If it is CIPUT sync queue sq_count is incremented and it does
3440  * not matter if any regular put entrypoints have been already entered. We
3441  * can't increment one of the sq_putcounts (instead of sq_count) because
3442  * qwait_rw won't know which counter to decrement.
3443  *
3444  * It would be reasonable to add the lockless FASTPUT logic.
3445  */
3446 int
3447 rwnext(queue_t *qp, struiod_t *dp)
3448 {
3449 	queue_t		*nqp;
3450 	syncq_t		*sq;
3451 	uint16_t	count;
3452 	uint16_t	flags;
3453 	struct qinit	*qi;
3454 	int		(*proc)();
3455 	struct stdata	*stp;
3456 	int		isread;
3457 	int		rval;
3458 
3459 	stp = STREAM(qp);
3460 	/*
3461 	 * Prevent q_next from changing by holding sd_lock until acquiring
3462 	 * SQLOCK. Note that a read-side rwnext from the streamhead will
3463 	 * already have sd_lock acquired. In either case sd_lock is always
3464 	 * released after acquiring SQLOCK.
3465 	 *
3466 	 * The streamhead read-side holding sd_lock when calling rwnext is
3467 	 * required to prevent a race condition were M_DATA mblks flowing
3468 	 * up the read-side of the stream could be bypassed by a rwnext()
3469 	 * down-call. In this case sd_lock acts as the streamhead perimeter.
3470 	 */
3471 	if ((nqp = _WR(qp)) == qp) {
3472 		isread = 0;
3473 		mutex_enter(&stp->sd_lock);
3474 		qp = nqp->q_next;
3475 	} else {
3476 		isread = 1;
3477 		if (nqp != stp->sd_wrq)
3478 			/* Not streamhead */
3479 			mutex_enter(&stp->sd_lock);
3480 		qp = _RD(nqp->q_next);
3481 	}
3482 	qi = qp->q_qinfo;
3483 	if (qp->q_struiot == STRUIOT_NONE || ! (proc = qi->qi_rwp)) {
3484 		/*
3485 		 * Not a synchronous module or no r/w procedure for this
3486 		 * queue, so just return EINVAL and let the caller handle it.
3487 		 */
3488 		mutex_exit(&stp->sd_lock);
3489 		return (EINVAL);
3490 	}
3491 
3492 	if (rwnext_enter(qp) == B_FALSE) {
3493 		mutex_exit(&stp->sd_lock);
3494 		return (EINVAL);
3495 	}
3496 
3497 	sq = qp->q_syncq;
3498 	mutex_enter(SQLOCK(sq));
3499 	mutex_exit(&stp->sd_lock);
3500 	count = sq->sq_count;
3501 	flags = sq->sq_flags;
3502 	ASSERT(sq->sq_ciputctrl == NULL || (flags & SQ_CIPUT));
3503 
3504 	while ((flags & SQ_GOAWAY) || (!(flags & SQ_CIPUT) && count != 0)) {
3505 		/*
3506 		 * if this queue is being closed, return.
3507 		 */
3508 		if (qp->q_flag & QWCLOSE) {
3509 			mutex_exit(SQLOCK(sq));
3510 			rwnext_exit(qp);
3511 			return (EINVAL);
3512 		}
3513 
3514 		/*
3515 		 * Wait until we can enter the inner perimeter.
3516 		 */
3517 		sq->sq_flags = flags | SQ_WANTWAKEUP;
3518 		cv_wait(&sq->sq_wait, SQLOCK(sq));
3519 		count = sq->sq_count;
3520 		flags = sq->sq_flags;
3521 	}
3522 
3523 	if (isread == 0 && stp->sd_struiowrq == NULL ||
3524 	    isread == 1 && stp->sd_struiordq == NULL) {
3525 		/*
3526 		 * Stream plumbing changed while waiting for inner perimeter
3527 		 * so just return EINVAL and let the caller handle it.
3528 		 */
3529 		mutex_exit(SQLOCK(sq));
3530 		rwnext_exit(qp);
3531 		return (EINVAL);
3532 	}
3533 	if (!(flags & SQ_CIPUT))
3534 		sq->sq_flags = flags | SQ_EXCL;
3535 	sq->sq_count = count + 1;
3536 	ASSERT(sq->sq_count != 0);		/* Wraparound */
3537 	/*
3538 	 * Note: The only message ordering guarantee that rwnext() makes is
3539 	 *	 for the write queue flow-control case. All others (r/w queue
3540 	 *	 with q_count > 0 (or q_first != 0)) are the resposibilty of
3541 	 *	 the queue's rw procedure. This could be genralized here buy
3542 	 *	 running the queue's service procedure, but that wouldn't be
3543 	 *	 the most efficent for all cases.
3544 	 */
3545 	mutex_exit(SQLOCK(sq));
3546 	if (! isread && (qp->q_flag & QFULL)) {
3547 		/*
3548 		 * Write queue may be flow controlled. If so,
3549 		 * mark the queue for wakeup when it's not.
3550 		 */
3551 		mutex_enter(QLOCK(qp));
3552 		if (qp->q_flag & QFULL) {
3553 			qp->q_flag |= QWANTWSYNC;
3554 			mutex_exit(QLOCK(qp));
3555 			rval = EWOULDBLOCK;
3556 			goto out;
3557 		}
3558 		mutex_exit(QLOCK(qp));
3559 	}
3560 
3561 	if (! isread && dp->d_mp)
3562 		STR_FTEVENT_MSG(dp->d_mp, nqp, FTEV_RWNEXT, dp->d_mp->b_rptr -
3563 		    dp->d_mp->b_datap->db_base);
3564 
3565 	rval = (*proc)(qp, dp);
3566 
3567 	if (isread && dp->d_mp)
3568 		STR_FTEVENT_MSG(dp->d_mp, _RD(nqp), FTEV_RWNEXT,
3569 		    dp->d_mp->b_rptr - dp->d_mp->b_datap->db_base);
3570 out:
3571 	/*
3572 	 * The queue is protected from being freed by sq_count, so it is
3573 	 * safe to call rwnext_exit and reacquire SQLOCK(sq).
3574 	 */
3575 	rwnext_exit(qp);
3576 
3577 	mutex_enter(SQLOCK(sq));
3578 	flags = sq->sq_flags;
3579 	ASSERT(sq->sq_count != 0);
3580 	sq->sq_count--;
3581 	if (flags & SQ_TAIL) {
3582 		putnext_tail(sq, qp, flags);
3583 		/*
3584 		 * The only purpose of this ASSERT is to preserve calling stack
3585 		 * in DEBUG kernel.
3586 		 */
3587 		ASSERT(flags & SQ_TAIL);
3588 		return (rval);
3589 	}
3590 	ASSERT(flags & (SQ_EXCL|SQ_CIPUT));
3591 	/*
3592 	 * Safe to always drop SQ_EXCL:
3593 	 *	Not SQ_CIPUT means we set SQ_EXCL above
3594 	 *	For SQ_CIPUT SQ_EXCL will only be set if the put procedure
3595 	 *	did a qwriter(INNER) in which case nobody else
3596 	 *	is in the inner perimeter and we are exiting.
3597 	 *
3598 	 * I would like to make the following assertion:
3599 	 *
3600 	 * ASSERT((flags & (SQ_EXCL|SQ_CIPUT)) != (SQ_EXCL|SQ_CIPUT) ||
3601 	 * 	sq->sq_count == 0);
3602 	 *
3603 	 * which indicates that if we are both putshared and exclusive,
3604 	 * we became exclusive while executing the putproc, and the only
3605 	 * claim on the syncq was the one we dropped a few lines above.
3606 	 * But other threads that enter putnext while the syncq is exclusive
3607 	 * need to make a claim as they may need to drop SQLOCK in the
3608 	 * has_writers case to avoid deadlocks.  If these threads are
3609 	 * delayed or preempted, it is possible that the writer thread can
3610 	 * find out that there are other claims making the (sq_count == 0)
3611 	 * test invalid.
3612 	 */
3613 
3614 	sq->sq_flags = flags & ~SQ_EXCL;
3615 	if (sq->sq_flags & SQ_WANTWAKEUP) {
3616 		sq->sq_flags &= ~SQ_WANTWAKEUP;
3617 		cv_broadcast(&sq->sq_wait);
3618 	}
3619 	mutex_exit(SQLOCK(sq));
3620 	return (rval);
3621 }
3622 
3623 /*
3624  * The purpose of infonext() is to call the info procedure of the next
3625  * (downstream) modules queue.
3626  *
3627  * treated as put entrypoint for perimeter syncronization.
3628  *
3629  * There's no need to grab sq_putlocks here (which only exist for CIPUT
3630  * sync queues). If it is CIPUT sync queue regular sq_count is incremented and
3631  * it does not matter if any regular put entrypoints have been already
3632  * entered.
3633  */
3634 int
3635 infonext(queue_t *qp, infod_t *idp)
3636 {
3637 	queue_t		*nqp;
3638 	syncq_t		*sq;
3639 	uint16_t	count;
3640 	uint16_t 	flags;
3641 	struct qinit	*qi;
3642 	int		(*proc)();
3643 	struct stdata	*stp;
3644 	int		rval;
3645 
3646 	stp = STREAM(qp);
3647 	/*
3648 	 * Prevent q_next from changing by holding sd_lock until
3649 	 * acquiring SQLOCK.
3650 	 */
3651 	mutex_enter(&stp->sd_lock);
3652 	if ((nqp = _WR(qp)) == qp) {
3653 		qp = nqp->q_next;
3654 	} else {
3655 		qp = _RD(nqp->q_next);
3656 	}
3657 	qi = qp->q_qinfo;
3658 	if (qp->q_struiot == STRUIOT_NONE || ! (proc = qi->qi_infop)) {
3659 		mutex_exit(&stp->sd_lock);
3660 		return (EINVAL);
3661 	}
3662 	sq = qp->q_syncq;
3663 	mutex_enter(SQLOCK(sq));
3664 	mutex_exit(&stp->sd_lock);
3665 	count = sq->sq_count;
3666 	flags = sq->sq_flags;
3667 	ASSERT(sq->sq_ciputctrl == NULL || (flags & SQ_CIPUT));
3668 
3669 	while ((flags & SQ_GOAWAY) || (!(flags & SQ_CIPUT) && count != 0)) {
3670 		/*
3671 		 * Wait until we can enter the inner perimeter.
3672 		 */
3673 		sq->sq_flags = flags | SQ_WANTWAKEUP;
3674 		cv_wait(&sq->sq_wait, SQLOCK(sq));
3675 		count = sq->sq_count;
3676 		flags = sq->sq_flags;
3677 	}
3678 
3679 	if (! (flags & SQ_CIPUT))
3680 		sq->sq_flags = flags | SQ_EXCL;
3681 	sq->sq_count = count + 1;
3682 	ASSERT(sq->sq_count != 0);		/* Wraparound */
3683 	mutex_exit(SQLOCK(sq));
3684 
3685 	rval = (*proc)(qp, idp);
3686 
3687 	mutex_enter(SQLOCK(sq));
3688 	flags = sq->sq_flags;
3689 	ASSERT(sq->sq_count != 0);
3690 	sq->sq_count--;
3691 	if (flags & SQ_TAIL) {
3692 		putnext_tail(sq, qp, flags);
3693 		/*
3694 		 * The only purpose of this ASSERT is to preserve calling stack
3695 		 * in DEBUG kernel.
3696 		 */
3697 		ASSERT(flags & SQ_TAIL);
3698 		return (rval);
3699 	}
3700 	ASSERT(flags & (SQ_EXCL|SQ_CIPUT));
3701 /*
3702  * XXXX
3703  * I am not certain the next comment is correct here.  I need to consider
3704  * why the infonext is called, and if dropping SQ_EXCL unless non-CIPUT
3705  * might cause other problems.  It just might be safer to drop it if
3706  * !SQ_CIPUT because that is when we set it.
3707  */
3708 	/*
3709 	 * Safe to always drop SQ_EXCL:
3710 	 *	Not SQ_CIPUT means we set SQ_EXCL above
3711 	 *	For SQ_CIPUT SQ_EXCL will only be set if the put procedure
3712 	 *	did a qwriter(INNER) in which case nobody else
3713 	 *	is in the inner perimeter and we are exiting.
3714 	 *
3715 	 * I would like to make the following assertion:
3716 	 *
3717 	 * ASSERT((flags & (SQ_EXCL|SQ_CIPUT)) != (SQ_EXCL|SQ_CIPUT) ||
3718 	 *	sq->sq_count == 0);
3719 	 *
3720 	 * which indicates that if we are both putshared and exclusive,
3721 	 * we became exclusive while executing the putproc, and the only
3722 	 * claim on the syncq was the one we dropped a few lines above.
3723 	 * But other threads that enter putnext while the syncq is exclusive
3724 	 * need to make a claim as they may need to drop SQLOCK in the
3725 	 * has_writers case to avoid deadlocks.  If these threads are
3726 	 * delayed or preempted, it is possible that the writer thread can
3727 	 * find out that there are other claims making the (sq_count == 0)
3728 	 * test invalid.
3729 	 */
3730 
3731 	sq->sq_flags = flags & ~SQ_EXCL;
3732 	mutex_exit(SQLOCK(sq));
3733 	return (rval);
3734 }
3735 
3736 /*
3737  * Return nonzero if the queue is responsible for struio(), else return 0.
3738  */
3739 int
3740 isuioq(queue_t *q)
3741 {
3742 	if (q->q_flag & QREADR)
3743 		return (STREAM(q)->sd_struiordq == q);
3744 	else
3745 		return (STREAM(q)->sd_struiowrq == q);
3746 }
3747 
3748 #if defined(__sparc)
3749 int disable_putlocks = 0;
3750 #else
3751 int disable_putlocks = 1;
3752 #endif
3753 
3754 /*
3755  * called by create_putlock.
3756  */
3757 static void
3758 create_syncq_putlocks(queue_t *q)
3759 {
3760 	syncq_t	*sq = q->q_syncq;
3761 	ciputctrl_t *cip;
3762 	int i;
3763 
3764 	ASSERT(sq != NULL);
3765 
3766 	ASSERT(disable_putlocks == 0);
3767 	ASSERT(n_ciputctrl >= min_n_ciputctrl);
3768 	ASSERT(ciputctrl_cache != NULL);
3769 
3770 	if (!(sq->sq_type & SQ_CIPUT))
3771 		return;
3772 
3773 	for (i = 0; i <= 1; i++) {
3774 		if (sq->sq_ciputctrl == NULL) {
3775 			cip = kmem_cache_alloc(ciputctrl_cache, KM_SLEEP);
3776 			SUMCHECK_CIPUTCTRL_COUNTS(cip, n_ciputctrl - 1, 0);
3777 			mutex_enter(SQLOCK(sq));
3778 			if (sq->sq_ciputctrl != NULL) {
3779 				mutex_exit(SQLOCK(sq));
3780 				kmem_cache_free(ciputctrl_cache, cip);
3781 			} else {
3782 				ASSERT(sq->sq_nciputctrl == 0);
3783 				sq->sq_nciputctrl = n_ciputctrl - 1;
3784 				/*
3785 				 * putnext checks sq_ciputctrl without holding
3786 				 * SQLOCK. if it is not NULL putnext assumes
3787 				 * sq_nciputctrl is initialized. membar below
3788 				 * insures that.
3789 				 */
3790 				membar_producer();
3791 				sq->sq_ciputctrl = cip;
3792 				mutex_exit(SQLOCK(sq));
3793 			}
3794 		}
3795 		ASSERT(sq->sq_nciputctrl == n_ciputctrl - 1);
3796 		if (i == 1)
3797 			break;
3798 		q = _OTHERQ(q);
3799 		if (!(q->q_flag & QPERQ)) {
3800 			ASSERT(sq == q->q_syncq);
3801 			break;
3802 		}
3803 		ASSERT(q->q_syncq != NULL);
3804 		ASSERT(sq != q->q_syncq);
3805 		sq = q->q_syncq;
3806 		ASSERT(sq->sq_type & SQ_CIPUT);
3807 	}
3808 }
3809 
3810 /*
3811  * If stream argument is 0 only create per cpu sq_putlocks/sq_putcounts for
3812  * syncq of q. If stream argument is not 0 create per cpu stream_putlocks for
3813  * the stream of q and per cpu sq_putlocks/sq_putcounts for all syncq's
3814  * starting from q and down to the driver.
3815  *
3816  * This should be called after the affected queues are part of stream
3817  * geometry. It should be called from driver/module open routine after
3818  * qprocson() call. It is also called from nfs syscall where it is known that
3819  * stream is configured and won't change its geometry during create_putlock
3820  * call.
3821  *
3822  * caller normally uses 0 value for the stream argument to speed up MT putnext
3823  * into the perimeter of q for example because its perimeter is per module
3824  * (e.g. IP).
3825  *
3826  * caller normally uses non 0 value for the stream argument to hint the system
3827  * that the stream of q is a very contended global system stream
3828  * (e.g. NFS/UDP) and the part of the stream from q to the driver is
3829  * particularly MT hot.
3830  *
3831  * Caller insures stream plumbing won't happen while we are here and therefore
3832  * q_next can be safely used.
3833  */
3834 
3835 void
3836 create_putlocks(queue_t *q, int stream)
3837 {
3838 	ciputctrl_t	*cip;
3839 	struct stdata	*stp = STREAM(q);
3840 
3841 	q = _WR(q);
3842 	ASSERT(stp != NULL);
3843 
3844 	if (disable_putlocks != 0)
3845 		return;
3846 
3847 	if (n_ciputctrl < min_n_ciputctrl)
3848 		return;
3849 
3850 	ASSERT(ciputctrl_cache != NULL);
3851 
3852 	if (stream != 0 && stp->sd_ciputctrl == NULL) {
3853 		cip = kmem_cache_alloc(ciputctrl_cache, KM_SLEEP);
3854 		SUMCHECK_CIPUTCTRL_COUNTS(cip, n_ciputctrl - 1, 0);
3855 		mutex_enter(&stp->sd_lock);
3856 		if (stp->sd_ciputctrl != NULL) {
3857 			mutex_exit(&stp->sd_lock);
3858 			kmem_cache_free(ciputctrl_cache, cip);
3859 		} else {
3860 			ASSERT(stp->sd_nciputctrl == 0);
3861 			stp->sd_nciputctrl = n_ciputctrl - 1;
3862 			/*
3863 			 * putnext checks sd_ciputctrl without holding
3864 			 * sd_lock. if it is not NULL putnext assumes
3865 			 * sd_nciputctrl is initialized. membar below
3866 			 * insures that.
3867 			 */
3868 			membar_producer();
3869 			stp->sd_ciputctrl = cip;
3870 			mutex_exit(&stp->sd_lock);
3871 		}
3872 	}
3873 
3874 	ASSERT(stream == 0 || stp->sd_nciputctrl == n_ciputctrl - 1);
3875 
3876 	while (_SAMESTR(q)) {
3877 		create_syncq_putlocks(q);
3878 		if (stream == 0)
3879 			return;
3880 		q = q->q_next;
3881 	}
3882 	ASSERT(q != NULL);
3883 	create_syncq_putlocks(q);
3884 }
3885 
3886 /*
3887  * STREAMS Flow Trace - record STREAMS Flow Trace events as an mblk flows
3888  * through a stream.
3889  *
3890  * Data currently record per event is a hrtime stamp, queue address, event
3891  * type, and a per type datum.  Much of the STREAMS framework is instrumented
3892  * for automatic flow tracing (when enabled).  Events can be defined and used
3893  * by STREAMS modules and drivers.
3894  *
3895  * Global objects:
3896  *
3897  *	str_ftevent() - Add a flow-trace event to a dblk.
3898  *	str_ftfree() - Free flow-trace data
3899  *
3900  * Local objects:
3901  *
3902  *	fthdr_cache - pointer to the kmem cache for trace header.
3903  *	ftblk_cache - pointer to the kmem cache for trace data blocks.
3904  */
3905 
3906 int str_ftnever = 1;	/* Don't do STREAMS flow tracing */
3907 
3908 void
3909 str_ftevent(fthdr_t *hp, void *p, ushort_t evnt, ushort_t data)
3910 {
3911 	ftblk_t *bp = hp->tail;
3912 	ftblk_t *nbp;
3913 	ftevnt_t *ep;
3914 	int ix, nix;
3915 
3916 	ASSERT(hp != NULL);
3917 
3918 	for (;;) {
3919 		if ((ix = bp->ix) == FTBLK_EVNTS) {
3920 			/*
3921 			 * Tail doesn't have room, so need a new tail.
3922 			 *
3923 			 * To make this MT safe, first, allocate a new
3924 			 * ftblk, and initialize it.  To make life a
3925 			 * little easier, reserve the first slot (mostly
3926 			 * by making ix = 1).  When we are finished with
3927 			 * the initialization, CAS this pointer to the
3928 			 * tail.  If this succeeds, this is the new
3929 			 * "next" block.  Otherwise, another thread
3930 			 * got here first, so free the block and start
3931 			 * again.
3932 			 */
3933 			if (!(nbp = kmem_cache_alloc(ftblk_cache,
3934 			    KM_NOSLEEP))) {
3935 				/* no mem, so punt */
3936 				str_ftnever++;
3937 				/* free up all flow data? */
3938 				return;
3939 			}
3940 			nbp->nxt = NULL;
3941 			nbp->ix = 1;
3942 			/*
3943 			 * Just in case there is another thread about
3944 			 * to get the next index, we need to make sure
3945 			 * the value is there for it.
3946 			 */
3947 			membar_producer();
3948 			if (casptr(&hp->tail, bp, nbp) == bp) {
3949 				/* CAS was successful */
3950 				bp->nxt = nbp;
3951 				membar_producer();
3952 				bp = nbp;
3953 				ix = 0;
3954 				goto cas_good;
3955 			} else {
3956 				kmem_cache_free(ftblk_cache, nbp);
3957 				bp = hp->tail;
3958 				continue;
3959 			}
3960 		}
3961 		nix = ix + 1;
3962 		if (cas32((uint32_t *)&bp->ix, ix, nix) == ix) {
3963 		cas_good:
3964 			if (curthread != hp->thread) {
3965 				hp->thread = curthread;
3966 				evnt |= FTEV_CS;
3967 			}
3968 			if (CPU->cpu_seqid != hp->cpu_seqid) {
3969 				hp->cpu_seqid = CPU->cpu_seqid;
3970 				evnt |= FTEV_PS;
3971 			}
3972 			ep = &bp->ev[ix];
3973 			break;
3974 		}
3975 	}
3976 
3977 	if (evnt & FTEV_QMASK) {
3978 		queue_t *qp = p;
3979 
3980 		/*
3981 		 * It is possible that the module info is broke
3982 		 * (as is logsubr.c at this comment writing).
3983 		 * Instead of panicing or doing other unmentionables,
3984 		 * we shall put a dummy name as the mid, and continue.
3985 		 */
3986 		if (qp->q_qinfo == NULL)
3987 			ep->mid = "NONAME";
3988 		else
3989 			ep->mid = qp->q_qinfo->qi_minfo->mi_idname;
3990 
3991 		if (!(qp->q_flag & QREADR))
3992 			evnt |= FTEV_ISWR;
3993 	} else {
3994 		ep->mid = (char *)p;
3995 	}
3996 
3997 	ep->ts = gethrtime();
3998 	ep->evnt = evnt;
3999 	ep->data = data;
4000 	hp->hash = (hp->hash << 9) + hp->hash;
4001 	hp->hash += (evnt << 16) | data;
4002 	hp->hash += (uintptr_t)ep->mid;
4003 }
4004 
4005 /*
4006  * Free flow-trace data.
4007  */
4008 void
4009 str_ftfree(dblk_t *dbp)
4010 {
4011 	fthdr_t *hp = dbp->db_fthdr;
4012 	ftblk_t *bp = &hp->first;
4013 	ftblk_t *nbp;
4014 
4015 	if (bp != hp->tail || bp->ix != 0) {
4016 		/*
4017 		 * Clear out the hash, have the tail point to itself, and free
4018 		 * any continuation blocks.
4019 		 */
4020 		bp = hp->first.nxt;
4021 		hp->tail = &hp->first;
4022 		hp->hash = 0;
4023 		hp->first.nxt = NULL;
4024 		hp->first.ix = 0;
4025 		while (bp != NULL) {
4026 			nbp = bp->nxt;
4027 			kmem_cache_free(ftblk_cache, bp);
4028 			bp = nbp;
4029 		}
4030 	}
4031 	kmem_cache_free(fthdr_cache, hp);
4032 	dbp->db_fthdr = NULL;
4033 }
4034