xref: /illumos-gate/usr/src/uts/common/io/stream.c (revision 4c28a617e3922d92a58e813a5b955eb526b9c386)
1 /*
2  * CDDL HEADER START
3  *
4  * The contents of this file are subject to the terms of the
5  * Common Development and Distribution License (the "License").
6  * You may not use this file except in compliance with the License.
7  *
8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9  * or http://www.opensolaris.org/os/licensing.
10  * See the License for the specific language governing permissions
11  * and limitations under the License.
12  *
13  * When distributing Covered Code, include this CDDL HEADER in each
14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15  * If applicable, add the following below this CDDL HEADER, with the
16  * fields enclosed by brackets "[]" replaced with your own identifying
17  * information: Portions Copyright [yyyy] [name of copyright owner]
18  *
19  * CDDL HEADER END
20  */
21 /*	Copyright (c) 1984, 1986, 1987, 1988, 1989 AT&T	*/
22 /*	  All Rights Reserved  	*/
23 
24 /*
25  * Copyright 2009 Sun Microsystems, Inc.  All rights reserved.
26  * Use is subject to license terms.
27  */
28 
29 #include <sys/types.h>
30 #include <sys/param.h>
31 #include <sys/thread.h>
32 #include <sys/sysmacros.h>
33 #include <sys/stropts.h>
34 #include <sys/stream.h>
35 #include <sys/strsubr.h>
36 #include <sys/strsun.h>
37 #include <sys/conf.h>
38 #include <sys/debug.h>
39 #include <sys/cmn_err.h>
40 #include <sys/kmem.h>
41 #include <sys/atomic.h>
42 #include <sys/errno.h>
43 #include <sys/vtrace.h>
44 #include <sys/ftrace.h>
45 #include <sys/ontrap.h>
46 #include <sys/multidata.h>
47 #include <sys/multidata_impl.h>
48 #include <sys/sdt.h>
49 #include <sys/strft.h>
50 
51 #ifdef DEBUG
52 #include <sys/kmem_impl.h>
53 #endif
54 
55 /*
56  * This file contains all the STREAMS utility routines that may
57  * be used by modules and drivers.
58  */
59 
60 /*
61  * STREAMS message allocator: principles of operation
62  *
63  * The streams message allocator consists of all the routines that
64  * allocate, dup and free streams messages: allocb(), [d]esballoc[a],
65  * dupb(), freeb() and freemsg().  What follows is a high-level view
66  * of how the allocator works.
67  *
68  * Every streams message consists of one or more mblks, a dblk, and data.
69  * All mblks for all types of messages come from a common mblk_cache.
70  * The dblk and data come in several flavors, depending on how the
71  * message is allocated:
72  *
73  * (1) mblks up to DBLK_MAX_CACHE size are allocated from a collection of
74  *     fixed-size dblk/data caches. For message sizes that are multiples of
75  *     PAGESIZE, dblks are allocated separately from the buffer.
76  *     The associated buffer is allocated by the constructor using kmem_alloc().
77  *     For all other message sizes, dblk and its associated data is allocated
78  *     as a single contiguous chunk of memory.
79  *     Objects in these caches consist of a dblk plus its associated data.
80  *     allocb() determines the nearest-size cache by table lookup:
81  *     the dblk_cache[] array provides the mapping from size to dblk cache.
82  *
83  * (2) Large messages (size > DBLK_MAX_CACHE) are constructed by
84  *     kmem_alloc()'ing a buffer for the data and supplying that
85  *     buffer to gesballoc(), described below.
86  *
87  * (3) The four flavors of [d]esballoc[a] are all implemented by a
88  *     common routine, gesballoc() ("generic esballoc").  gesballoc()
89  *     allocates a dblk from the global dblk_esb_cache and sets db_base,
90  *     db_lim and db_frtnp to describe the caller-supplied buffer.
91  *
92  * While there are several routines to allocate messages, there is only
93  * one routine to free messages: freeb().  freeb() simply invokes the
94  * dblk's free method, dbp->db_free(), which is set at allocation time.
95  *
96  * dupb() creates a new reference to a message by allocating a new mblk,
97  * incrementing the dblk reference count and setting the dblk's free
98  * method to dblk_decref().  The dblk's original free method is retained
99  * in db_lastfree.  dblk_decref() decrements the reference count on each
100  * freeb().  If this is not the last reference it just frees the mblk;
101  * if this *is* the last reference, it restores db_free to db_lastfree,
102  * sets db_mblk to the current mblk (see below), and invokes db_lastfree.
103  *
104  * The implementation makes aggressive use of kmem object caching for
105  * maximum performance.  This makes the code simple and compact, but
106  * also a bit abstruse in some places.  The invariants that constitute a
107  * message's constructed state, described below, are more subtle than usual.
108  *
109  * Every dblk has an "attached mblk" as part of its constructed state.
110  * The mblk is allocated by the dblk's constructor and remains attached
111  * until the message is either dup'ed or pulled up.  In the dupb() case
112  * the mblk association doesn't matter until the last free, at which time
113  * dblk_decref() attaches the last mblk to the dblk.  pullupmsg() affects
114  * the mblk association because it swaps the leading mblks of two messages,
115  * so it is responsible for swapping their db_mblk pointers accordingly.
116  * From a constructed-state viewpoint it doesn't matter that a dblk's
117  * attached mblk can change while the message is allocated; all that
118  * matters is that the dblk has *some* attached mblk when it's freed.
119  *
120  * The sizes of the allocb() small-message caches are not magical.
121  * They represent a good trade-off between internal and external
122  * fragmentation for current workloads.  They should be reevaluated
123  * periodically, especially if allocations larger than DBLK_MAX_CACHE
124  * become common.  We use 64-byte alignment so that dblks don't
125  * straddle cache lines unnecessarily.
126  */
127 #define	DBLK_MAX_CACHE		73728
128 #define	DBLK_CACHE_ALIGN	64
129 #define	DBLK_MIN_SIZE		8
130 #define	DBLK_SIZE_SHIFT		3
131 
132 #ifdef _BIG_ENDIAN
133 #define	DBLK_RTFU_SHIFT(field)	\
134 	(8 * (&((dblk_t *)0)->db_struioflag - &((dblk_t *)0)->field))
135 #else
136 #define	DBLK_RTFU_SHIFT(field)	\
137 	(8 * (&((dblk_t *)0)->field - &((dblk_t *)0)->db_ref))
138 #endif
139 
140 #define	DBLK_RTFU(ref, type, flags, uioflag)	\
141 	(((ref) << DBLK_RTFU_SHIFT(db_ref)) | \
142 	((type) << DBLK_RTFU_SHIFT(db_type)) | \
143 	(((flags) | (ref - 1)) << DBLK_RTFU_SHIFT(db_flags)) | \
144 	((uioflag) << DBLK_RTFU_SHIFT(db_struioflag)))
145 #define	DBLK_RTFU_REF_MASK	(DBLK_REFMAX << DBLK_RTFU_SHIFT(db_ref))
146 #define	DBLK_RTFU_WORD(dbp)	(*((uint32_t *)&(dbp)->db_ref))
147 #define	MBLK_BAND_FLAG_WORD(mp)	(*((uint32_t *)&(mp)->b_band))
148 
149 static size_t dblk_sizes[] = {
150 #ifdef _LP64
151 	16, 80, 144, 208, 272, 336, 528, 1040, 1488, 1936, 2576, 3856,
152 	8192, 12048, 16384, 20240, 24576, 28432, 32768, 36624,
153 	40960, 44816, 49152, 53008, 57344, 61200, 65536, 69392,
154 #else
155 	64, 128, 320, 576, 1088, 1536, 1984, 2624, 3904,
156 	8192, 12096, 16384, 20288, 24576, 28480, 32768, 36672,
157 	40960, 44864, 49152, 53056, 57344, 61248, 65536, 69440,
158 #endif
159 	DBLK_MAX_CACHE, 0
160 };
161 
162 static struct kmem_cache *dblk_cache[DBLK_MAX_CACHE / DBLK_MIN_SIZE];
163 static struct kmem_cache *mblk_cache;
164 static struct kmem_cache *dblk_esb_cache;
165 static struct kmem_cache *fthdr_cache;
166 static struct kmem_cache *ftblk_cache;
167 
168 static void dblk_lastfree(mblk_t *mp, dblk_t *dbp);
169 static mblk_t *allocb_oversize(size_t size, int flags);
170 static int allocb_tryhard_fails;
171 static void frnop_func(void *arg);
172 frtn_t frnop = { frnop_func };
173 static void bcache_dblk_lastfree(mblk_t *mp, dblk_t *dbp);
174 
175 static boolean_t rwnext_enter(queue_t *qp);
176 static void rwnext_exit(queue_t *qp);
177 
178 /*
179  * Patchable mblk/dblk kmem_cache flags.
180  */
181 int dblk_kmem_flags = 0;
182 int mblk_kmem_flags = 0;
183 
184 static int
185 dblk_constructor(void *buf, void *cdrarg, int kmflags)
186 {
187 	dblk_t *dbp = buf;
188 	ssize_t msg_size = (ssize_t)cdrarg;
189 	size_t index;
190 
191 	ASSERT(msg_size != 0);
192 
193 	index = (msg_size - 1) >> DBLK_SIZE_SHIFT;
194 
195 	ASSERT(index < (DBLK_MAX_CACHE >> DBLK_SIZE_SHIFT));
196 
197 	if ((dbp->db_mblk = kmem_cache_alloc(mblk_cache, kmflags)) == NULL)
198 		return (-1);
199 	if ((msg_size & PAGEOFFSET) == 0) {
200 		dbp->db_base = kmem_alloc(msg_size, kmflags);
201 		if (dbp->db_base == NULL) {
202 			kmem_cache_free(mblk_cache, dbp->db_mblk);
203 			return (-1);
204 		}
205 	} else {
206 		dbp->db_base = (unsigned char *)&dbp[1];
207 	}
208 
209 	dbp->db_mblk->b_datap = dbp;
210 	dbp->db_cache = dblk_cache[index];
211 	dbp->db_lim = dbp->db_base + msg_size;
212 	dbp->db_free = dbp->db_lastfree = dblk_lastfree;
213 	dbp->db_frtnp = NULL;
214 	dbp->db_fthdr = NULL;
215 	dbp->db_credp = NULL;
216 	dbp->db_cpid = -1;
217 	dbp->db_struioflag = 0;
218 	dbp->db_struioun.cksum.flags = 0;
219 	return (0);
220 }
221 
222 /*ARGSUSED*/
223 static int
224 dblk_esb_constructor(void *buf, void *cdrarg, int kmflags)
225 {
226 	dblk_t *dbp = buf;
227 
228 	if ((dbp->db_mblk = kmem_cache_alloc(mblk_cache, kmflags)) == NULL)
229 		return (-1);
230 	dbp->db_mblk->b_datap = dbp;
231 	dbp->db_cache = dblk_esb_cache;
232 	dbp->db_fthdr = NULL;
233 	dbp->db_credp = NULL;
234 	dbp->db_cpid = -1;
235 	dbp->db_struioflag = 0;
236 	dbp->db_struioun.cksum.flags = 0;
237 	return (0);
238 }
239 
240 static int
241 bcache_dblk_constructor(void *buf, void *cdrarg, int kmflags)
242 {
243 	dblk_t *dbp = buf;
244 	bcache_t *bcp = cdrarg;
245 
246 	if ((dbp->db_mblk = kmem_cache_alloc(mblk_cache, kmflags)) == NULL)
247 		return (-1);
248 
249 	dbp->db_base = kmem_cache_alloc(bcp->buffer_cache, kmflags);
250 	if (dbp->db_base == NULL) {
251 		kmem_cache_free(mblk_cache, dbp->db_mblk);
252 		return (-1);
253 	}
254 
255 	dbp->db_mblk->b_datap = dbp;
256 	dbp->db_cache = (void *)bcp;
257 	dbp->db_lim = dbp->db_base + bcp->size;
258 	dbp->db_free = dbp->db_lastfree = bcache_dblk_lastfree;
259 	dbp->db_frtnp = NULL;
260 	dbp->db_fthdr = NULL;
261 	dbp->db_credp = NULL;
262 	dbp->db_cpid = -1;
263 	dbp->db_struioflag = 0;
264 	dbp->db_struioun.cksum.flags = 0;
265 	return (0);
266 }
267 
268 /*ARGSUSED*/
269 static void
270 dblk_destructor(void *buf, void *cdrarg)
271 {
272 	dblk_t *dbp = buf;
273 	ssize_t msg_size = (ssize_t)cdrarg;
274 
275 	ASSERT(dbp->db_mblk->b_datap == dbp);
276 	ASSERT(msg_size != 0);
277 	ASSERT(dbp->db_struioflag == 0);
278 	ASSERT(dbp->db_struioun.cksum.flags == 0);
279 
280 	if ((msg_size & PAGEOFFSET) == 0) {
281 		kmem_free(dbp->db_base, msg_size);
282 	}
283 
284 	kmem_cache_free(mblk_cache, dbp->db_mblk);
285 }
286 
287 static void
288 bcache_dblk_destructor(void *buf, void *cdrarg)
289 {
290 	dblk_t *dbp = buf;
291 	bcache_t *bcp = cdrarg;
292 
293 	kmem_cache_free(bcp->buffer_cache, dbp->db_base);
294 
295 	ASSERT(dbp->db_mblk->b_datap == dbp);
296 	ASSERT(dbp->db_struioflag == 0);
297 	ASSERT(dbp->db_struioun.cksum.flags == 0);
298 
299 	kmem_cache_free(mblk_cache, dbp->db_mblk);
300 }
301 
302 /* ARGSUSED */
303 static int
304 ftblk_constructor(void *buf, void *cdrarg, int kmflags)
305 {
306 	ftblk_t *fbp = buf;
307 	int i;
308 
309 	bzero(fbp, sizeof (ftblk_t));
310 	if (str_ftstack != 0) {
311 		for (i = 0; i < FTBLK_EVNTS; i++)
312 			fbp->ev[i].stk = kmem_alloc(sizeof (ftstk_t), kmflags);
313 	}
314 
315 	return (0);
316 }
317 
318 /* ARGSUSED */
319 static void
320 ftblk_destructor(void *buf, void *cdrarg)
321 {
322 	ftblk_t *fbp = buf;
323 	int i;
324 
325 	if (str_ftstack != 0) {
326 		for (i = 0; i < FTBLK_EVNTS; i++) {
327 			if (fbp->ev[i].stk != NULL) {
328 				kmem_free(fbp->ev[i].stk, sizeof (ftstk_t));
329 				fbp->ev[i].stk = NULL;
330 			}
331 		}
332 	}
333 }
334 
335 static int
336 fthdr_constructor(void *buf, void *cdrarg, int kmflags)
337 {
338 	fthdr_t *fhp = buf;
339 
340 	return (ftblk_constructor(&fhp->first, cdrarg, kmflags));
341 }
342 
343 static void
344 fthdr_destructor(void *buf, void *cdrarg)
345 {
346 	fthdr_t *fhp = buf;
347 
348 	ftblk_destructor(&fhp->first, cdrarg);
349 }
350 
351 void
352 streams_msg_init(void)
353 {
354 	char name[40];
355 	size_t size;
356 	size_t lastsize = DBLK_MIN_SIZE;
357 	size_t *sizep;
358 	struct kmem_cache *cp;
359 	size_t tot_size;
360 	int offset;
361 
362 	mblk_cache = kmem_cache_create("streams_mblk", sizeof (mblk_t), 32,
363 	    NULL, NULL, NULL, NULL, NULL, mblk_kmem_flags);
364 
365 	for (sizep = dblk_sizes; (size = *sizep) != 0; sizep++) {
366 
367 		if ((offset = (size & PAGEOFFSET)) != 0) {
368 			/*
369 			 * We are in the middle of a page, dblk should
370 			 * be allocated on the same page
371 			 */
372 			tot_size = size + sizeof (dblk_t);
373 			ASSERT((offset + sizeof (dblk_t) + sizeof (kmem_slab_t))
374 			    < PAGESIZE);
375 			ASSERT((tot_size & (DBLK_CACHE_ALIGN - 1)) == 0);
376 
377 		} else {
378 
379 			/*
380 			 * buf size is multiple of page size, dblk and
381 			 * buffer are allocated separately.
382 			 */
383 
384 			ASSERT((size & (DBLK_CACHE_ALIGN - 1)) == 0);
385 			tot_size = sizeof (dblk_t);
386 		}
387 
388 		(void) sprintf(name, "streams_dblk_%ld", size);
389 		cp = kmem_cache_create(name, tot_size, DBLK_CACHE_ALIGN,
390 		    dblk_constructor, dblk_destructor, NULL, (void *)(size),
391 		    NULL, dblk_kmem_flags);
392 
393 		while (lastsize <= size) {
394 			dblk_cache[(lastsize - 1) >> DBLK_SIZE_SHIFT] = cp;
395 			lastsize += DBLK_MIN_SIZE;
396 		}
397 	}
398 
399 	dblk_esb_cache = kmem_cache_create("streams_dblk_esb", sizeof (dblk_t),
400 	    DBLK_CACHE_ALIGN, dblk_esb_constructor, dblk_destructor, NULL,
401 	    (void *)sizeof (dblk_t), NULL, dblk_kmem_flags);
402 	fthdr_cache = kmem_cache_create("streams_fthdr", sizeof (fthdr_t), 32,
403 	    fthdr_constructor, fthdr_destructor, NULL, NULL, NULL, 0);
404 	ftblk_cache = kmem_cache_create("streams_ftblk", sizeof (ftblk_t), 32,
405 	    ftblk_constructor, ftblk_destructor, NULL, NULL, NULL, 0);
406 
407 	/* Initialize Multidata caches */
408 	mmd_init();
409 
410 	/* initialize throttling queue for esballoc */
411 	esballoc_queue_init();
412 }
413 
414 /*ARGSUSED*/
415 mblk_t *
416 allocb(size_t size, uint_t pri)
417 {
418 	dblk_t *dbp;
419 	mblk_t *mp;
420 	size_t index;
421 
422 	index =  (size - 1)  >> DBLK_SIZE_SHIFT;
423 
424 	if (index >= (DBLK_MAX_CACHE >> DBLK_SIZE_SHIFT)) {
425 		if (size != 0) {
426 			mp = allocb_oversize(size, KM_NOSLEEP);
427 			goto out;
428 		}
429 		index = 0;
430 	}
431 
432 	if ((dbp = kmem_cache_alloc(dblk_cache[index], KM_NOSLEEP)) == NULL) {
433 		mp = NULL;
434 		goto out;
435 	}
436 
437 	mp = dbp->db_mblk;
438 	DBLK_RTFU_WORD(dbp) = DBLK_RTFU(1, M_DATA, 0, 0);
439 	mp->b_next = mp->b_prev = mp->b_cont = NULL;
440 	mp->b_rptr = mp->b_wptr = dbp->db_base;
441 	mp->b_queue = NULL;
442 	MBLK_BAND_FLAG_WORD(mp) = 0;
443 	STR_FTALLOC(&dbp->db_fthdr, FTEV_ALLOCB, size);
444 out:
445 	FTRACE_1("allocb(): mp=0x%p", (uintptr_t)mp);
446 
447 	return (mp);
448 }
449 
450 /*
451  * Allocate an mblk taking db_credp and db_cpid from the template.
452  * Allow the cred to be NULL.
453  */
454 mblk_t *
455 allocb_tmpl(size_t size, const mblk_t *tmpl)
456 {
457 	mblk_t *mp = allocb(size, 0);
458 
459 	if (mp != NULL) {
460 		dblk_t *src = tmpl->b_datap;
461 		dblk_t *dst = mp->b_datap;
462 		cred_t *cr;
463 		pid_t cpid;
464 
465 		cr = msg_getcred(tmpl, &cpid);
466 		if (cr != NULL)
467 			crhold(dst->db_credp = cr);
468 		dst->db_cpid = cpid;
469 		dst->db_type = src->db_type;
470 	}
471 	return (mp);
472 }
473 
474 mblk_t *
475 allocb_cred(size_t size, cred_t *cr, pid_t cpid)
476 {
477 	mblk_t *mp = allocb(size, 0);
478 
479 	ASSERT(cr != NULL);
480 	if (mp != NULL) {
481 		dblk_t *dbp = mp->b_datap;
482 
483 		crhold(dbp->db_credp = cr);
484 		dbp->db_cpid = cpid;
485 	}
486 	return (mp);
487 }
488 
489 mblk_t *
490 allocb_cred_wait(size_t size, uint_t flags, int *error, cred_t *cr, pid_t cpid)
491 {
492 	mblk_t *mp = allocb_wait(size, 0, flags, error);
493 
494 	ASSERT(cr != NULL);
495 	if (mp != NULL) {
496 		dblk_t *dbp = mp->b_datap;
497 
498 		crhold(dbp->db_credp = cr);
499 		dbp->db_cpid = cpid;
500 	}
501 
502 	return (mp);
503 }
504 
505 /*
506  * Extract the db_cred (and optionally db_cpid) from a message.
507  * We find the first mblk which has a non-NULL db_cred and use that.
508  * If none found we return NULL.
509  * Does NOT get a hold on the cred.
510  */
511 cred_t *
512 msg_getcred(const mblk_t *mp, pid_t *cpidp)
513 {
514 	cred_t *cr = NULL;
515 	cred_t *cr2;
516 	mblk_t *mp2;
517 
518 	while (mp != NULL) {
519 		dblk_t *dbp = mp->b_datap;
520 
521 		cr = dbp->db_credp;
522 		if (cr == NULL) {
523 			mp = mp->b_cont;
524 			continue;
525 		}
526 		if (cpidp != NULL)
527 			*cpidp = dbp->db_cpid;
528 
529 #ifdef DEBUG
530 		/*
531 		 * Normally there should at most one db_credp in a message.
532 		 * But if there are multiple (as in the case of some M_IOC*
533 		 * and some internal messages in TCP/IP bind logic) then
534 		 * they must be identical in the normal case.
535 		 * However, a socket can be shared between different uids
536 		 * in which case data queued in TCP would be from different
537 		 * creds. Thus we can only assert for the zoneid being the
538 		 * same. Due to Multi-level Level Ports for TX, some
539 		 * cred_t can have a NULL cr_zone, and we skip the comparison
540 		 * in that case.
541 		 */
542 		mp2 = mp->b_cont;
543 		while (mp2 != NULL) {
544 			cr2 = DB_CRED(mp2);
545 			if (cr2 != NULL) {
546 				DTRACE_PROBE2(msg__getcred,
547 				    cred_t *, cr, cred_t *, cr2);
548 				ASSERT(crgetzoneid(cr) == crgetzoneid(cr2) ||
549 				    crgetzone(cr) == NULL ||
550 				    crgetzone(cr2) == NULL);
551 			}
552 			mp2 = mp2->b_cont;
553 		}
554 #endif
555 		return (cr);
556 	}
557 	if (cpidp != NULL)
558 		*cpidp = NOPID;
559 	return (NULL);
560 }
561 
562 /*
563  * Variant of msg_getcred which, when a cred is found
564  * 1. Returns with a hold on the cred
565  * 2. Clears the first cred in the mblk.
566  * This is more efficient to use than a msg_getcred() + crhold() when
567  * the message is freed after the cred has been extracted.
568  *
569  * The caller is responsible for ensuring that there is no other reference
570  * on the message since db_credp can not be cleared when there are other
571  * references.
572  */
573 cred_t *
574 msg_extractcred(mblk_t *mp, pid_t *cpidp)
575 {
576 	cred_t *cr = NULL;
577 	cred_t *cr2;
578 	mblk_t *mp2;
579 
580 	while (mp != NULL) {
581 		dblk_t *dbp = mp->b_datap;
582 
583 		cr = dbp->db_credp;
584 		if (cr == NULL) {
585 			mp = mp->b_cont;
586 			continue;
587 		}
588 		ASSERT(dbp->db_ref == 1);
589 		dbp->db_credp = NULL;
590 		if (cpidp != NULL)
591 			*cpidp = dbp->db_cpid;
592 #ifdef DEBUG
593 		/*
594 		 * Normally there should at most one db_credp in a message.
595 		 * But if there are multiple (as in the case of some M_IOC*
596 		 * and some internal messages in TCP/IP bind logic) then
597 		 * they must be identical in the normal case.
598 		 * However, a socket can be shared between different uids
599 		 * in which case data queued in TCP would be from different
600 		 * creds. Thus we can only assert for the zoneid being the
601 		 * same. Due to Multi-level Level Ports for TX, some
602 		 * cred_t can have a NULL cr_zone, and we skip the comparison
603 		 * in that case.
604 		 */
605 		mp2 = mp->b_cont;
606 		while (mp2 != NULL) {
607 			cr2 = DB_CRED(mp2);
608 			if (cr2 != NULL) {
609 				DTRACE_PROBE2(msg__extractcred,
610 				    cred_t *, cr, cred_t *, cr2);
611 				ASSERT(crgetzoneid(cr) == crgetzoneid(cr2) ||
612 				    crgetzone(cr) == NULL ||
613 				    crgetzone(cr2) == NULL);
614 			}
615 			mp2 = mp2->b_cont;
616 		}
617 #endif
618 		return (cr);
619 	}
620 	return (NULL);
621 }
622 /*
623  * Get the label for a message. Uses the first mblk in the message
624  * which has a non-NULL db_credp.
625  * Returns NULL if there is no credp.
626  */
627 extern struct ts_label_s *
628 msg_getlabel(const mblk_t *mp)
629 {
630 	cred_t *cr = msg_getcred(mp, NULL);
631 
632 	if (cr == NULL)
633 		return (NULL);
634 
635 	return (crgetlabel(cr));
636 }
637 
638 void
639 freeb(mblk_t *mp)
640 {
641 	dblk_t *dbp = mp->b_datap;
642 
643 	ASSERT(dbp->db_ref > 0);
644 	ASSERT(mp->b_next == NULL && mp->b_prev == NULL);
645 	FTRACE_1("freeb(): mp=0x%lx", (uintptr_t)mp);
646 
647 	STR_FTEVENT_MBLK(mp, caller(), FTEV_FREEB, dbp->db_ref);
648 
649 	dbp->db_free(mp, dbp);
650 }
651 
652 void
653 freemsg(mblk_t *mp)
654 {
655 	FTRACE_1("freemsg(): mp=0x%lx", (uintptr_t)mp);
656 	while (mp) {
657 		dblk_t *dbp = mp->b_datap;
658 		mblk_t *mp_cont = mp->b_cont;
659 
660 		ASSERT(dbp->db_ref > 0);
661 		ASSERT(mp->b_next == NULL && mp->b_prev == NULL);
662 
663 		STR_FTEVENT_MBLK(mp, caller(), FTEV_FREEB, dbp->db_ref);
664 
665 		dbp->db_free(mp, dbp);
666 		mp = mp_cont;
667 	}
668 }
669 
670 /*
671  * Reallocate a block for another use.  Try hard to use the old block.
672  * If the old data is wanted (copy), leave b_wptr at the end of the data,
673  * otherwise return b_wptr = b_rptr.
674  *
675  * This routine is private and unstable.
676  */
677 mblk_t	*
678 reallocb(mblk_t *mp, size_t size, uint_t copy)
679 {
680 	mblk_t		*mp1;
681 	unsigned char	*old_rptr;
682 	ptrdiff_t	cur_size;
683 
684 	if (mp == NULL)
685 		return (allocb(size, BPRI_HI));
686 
687 	cur_size = mp->b_wptr - mp->b_rptr;
688 	old_rptr = mp->b_rptr;
689 
690 	ASSERT(mp->b_datap->db_ref != 0);
691 
692 	if (mp->b_datap->db_ref == 1 && MBLKSIZE(mp) >= size) {
693 		/*
694 		 * If the data is wanted and it will fit where it is, no
695 		 * work is required.
696 		 */
697 		if (copy && mp->b_datap->db_lim - mp->b_rptr >= size)
698 			return (mp);
699 
700 		mp->b_wptr = mp->b_rptr = mp->b_datap->db_base;
701 		mp1 = mp;
702 	} else if ((mp1 = allocb_tmpl(size, mp)) != NULL) {
703 		/* XXX other mp state could be copied too, db_flags ... ? */
704 		mp1->b_cont = mp->b_cont;
705 	} else {
706 		return (NULL);
707 	}
708 
709 	if (copy) {
710 		bcopy(old_rptr, mp1->b_rptr, cur_size);
711 		mp1->b_wptr = mp1->b_rptr + cur_size;
712 	}
713 
714 	if (mp != mp1)
715 		freeb(mp);
716 
717 	return (mp1);
718 }
719 
720 static void
721 dblk_lastfree(mblk_t *mp, dblk_t *dbp)
722 {
723 	ASSERT(dbp->db_mblk == mp);
724 	if (dbp->db_fthdr != NULL)
725 		str_ftfree(dbp);
726 
727 	/* set credp and projid to be 'unspecified' before returning to cache */
728 	if (dbp->db_credp != NULL) {
729 		crfree(dbp->db_credp);
730 		dbp->db_credp = NULL;
731 	}
732 	dbp->db_cpid = -1;
733 
734 	/* Reset the struioflag and the checksum flag fields */
735 	dbp->db_struioflag = 0;
736 	dbp->db_struioun.cksum.flags = 0;
737 
738 	/* and the COOKED and/or UIOA flag(s) */
739 	dbp->db_flags &= ~(DBLK_COOKED | DBLK_UIOA);
740 
741 	kmem_cache_free(dbp->db_cache, dbp);
742 }
743 
744 static void
745 dblk_decref(mblk_t *mp, dblk_t *dbp)
746 {
747 	if (dbp->db_ref != 1) {
748 		uint32_t rtfu = atomic_add_32_nv(&DBLK_RTFU_WORD(dbp),
749 		    -(1 << DBLK_RTFU_SHIFT(db_ref)));
750 		/*
751 		 * atomic_add_32_nv() just decremented db_ref, so we no longer
752 		 * have a reference to the dblk, which means another thread
753 		 * could free it.  Therefore we cannot examine the dblk to
754 		 * determine whether ours was the last reference.  Instead,
755 		 * we extract the new and minimum reference counts from rtfu.
756 		 * Note that all we're really saying is "if (ref != refmin)".
757 		 */
758 		if (((rtfu >> DBLK_RTFU_SHIFT(db_ref)) & DBLK_REFMAX) !=
759 		    ((rtfu >> DBLK_RTFU_SHIFT(db_flags)) & DBLK_REFMIN)) {
760 			kmem_cache_free(mblk_cache, mp);
761 			return;
762 		}
763 	}
764 	dbp->db_mblk = mp;
765 	dbp->db_free = dbp->db_lastfree;
766 	dbp->db_lastfree(mp, dbp);
767 }
768 
769 mblk_t *
770 dupb(mblk_t *mp)
771 {
772 	dblk_t *dbp = mp->b_datap;
773 	mblk_t *new_mp;
774 	uint32_t oldrtfu, newrtfu;
775 
776 	if ((new_mp = kmem_cache_alloc(mblk_cache, KM_NOSLEEP)) == NULL)
777 		goto out;
778 
779 	new_mp->b_next = new_mp->b_prev = new_mp->b_cont = NULL;
780 	new_mp->b_rptr = mp->b_rptr;
781 	new_mp->b_wptr = mp->b_wptr;
782 	new_mp->b_datap = dbp;
783 	new_mp->b_queue = NULL;
784 	MBLK_BAND_FLAG_WORD(new_mp) = MBLK_BAND_FLAG_WORD(mp);
785 
786 	STR_FTEVENT_MBLK(mp, caller(), FTEV_DUPB, dbp->db_ref);
787 
788 	dbp->db_free = dblk_decref;
789 	do {
790 		ASSERT(dbp->db_ref > 0);
791 		oldrtfu = DBLK_RTFU_WORD(dbp);
792 		newrtfu = oldrtfu + (1 << DBLK_RTFU_SHIFT(db_ref));
793 		/*
794 		 * If db_ref is maxed out we can't dup this message anymore.
795 		 */
796 		if ((oldrtfu & DBLK_RTFU_REF_MASK) == DBLK_RTFU_REF_MASK) {
797 			kmem_cache_free(mblk_cache, new_mp);
798 			new_mp = NULL;
799 			goto out;
800 		}
801 	} while (atomic_cas_32(&DBLK_RTFU_WORD(dbp), oldrtfu, newrtfu) !=
802 	    oldrtfu);
803 
804 out:
805 	FTRACE_1("dupb(): new_mp=0x%lx", (uintptr_t)new_mp);
806 	return (new_mp);
807 }
808 
809 static void
810 dblk_lastfree_desb(mblk_t *mp, dblk_t *dbp)
811 {
812 	frtn_t *frp = dbp->db_frtnp;
813 
814 	ASSERT(dbp->db_mblk == mp);
815 	frp->free_func(frp->free_arg);
816 	if (dbp->db_fthdr != NULL)
817 		str_ftfree(dbp);
818 
819 	/* set credp and projid to be 'unspecified' before returning to cache */
820 	if (dbp->db_credp != NULL) {
821 		crfree(dbp->db_credp);
822 		dbp->db_credp = NULL;
823 	}
824 	dbp->db_cpid = -1;
825 	dbp->db_struioflag = 0;
826 	dbp->db_struioun.cksum.flags = 0;
827 
828 	kmem_cache_free(dbp->db_cache, dbp);
829 }
830 
831 /*ARGSUSED*/
832 static void
833 frnop_func(void *arg)
834 {
835 }
836 
837 /*
838  * Generic esballoc used to implement the four flavors: [d]esballoc[a].
839  */
840 static mblk_t *
841 gesballoc(unsigned char *base, size_t size, uint32_t db_rtfu, frtn_t *frp,
842 	void (*lastfree)(mblk_t *, dblk_t *), int kmflags)
843 {
844 	dblk_t *dbp;
845 	mblk_t *mp;
846 
847 	ASSERT(base != NULL && frp != NULL);
848 
849 	if ((dbp = kmem_cache_alloc(dblk_esb_cache, kmflags)) == NULL) {
850 		mp = NULL;
851 		goto out;
852 	}
853 
854 	mp = dbp->db_mblk;
855 	dbp->db_base = base;
856 	dbp->db_lim = base + size;
857 	dbp->db_free = dbp->db_lastfree = lastfree;
858 	dbp->db_frtnp = frp;
859 	DBLK_RTFU_WORD(dbp) = db_rtfu;
860 	mp->b_next = mp->b_prev = mp->b_cont = NULL;
861 	mp->b_rptr = mp->b_wptr = base;
862 	mp->b_queue = NULL;
863 	MBLK_BAND_FLAG_WORD(mp) = 0;
864 
865 out:
866 	FTRACE_1("gesballoc(): mp=0x%lx", (uintptr_t)mp);
867 	return (mp);
868 }
869 
870 /*ARGSUSED*/
871 mblk_t *
872 esballoc(unsigned char *base, size_t size, uint_t pri, frtn_t *frp)
873 {
874 	mblk_t *mp;
875 
876 	/*
877 	 * Note that this is structured to allow the common case (i.e.
878 	 * STREAMS flowtracing disabled) to call gesballoc() with tail
879 	 * call optimization.
880 	 */
881 	if (!str_ftnever) {
882 		mp = gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
883 		    frp, freebs_enqueue, KM_NOSLEEP);
884 
885 		if (mp != NULL)
886 			STR_FTALLOC(&DB_FTHDR(mp), FTEV_ESBALLOC, size);
887 		return (mp);
888 	}
889 
890 	return (gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
891 	    frp, freebs_enqueue, KM_NOSLEEP));
892 }
893 
894 /*
895  * Same as esballoc() but sleeps waiting for memory.
896  */
897 /*ARGSUSED*/
898 mblk_t *
899 esballoc_wait(unsigned char *base, size_t size, uint_t pri, frtn_t *frp)
900 {
901 	mblk_t *mp;
902 
903 	/*
904 	 * Note that this is structured to allow the common case (i.e.
905 	 * STREAMS flowtracing disabled) to call gesballoc() with tail
906 	 * call optimization.
907 	 */
908 	if (!str_ftnever) {
909 		mp = gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
910 		    frp, freebs_enqueue, KM_SLEEP);
911 
912 		STR_FTALLOC(&DB_FTHDR(mp), FTEV_ESBALLOC, size);
913 		return (mp);
914 	}
915 
916 	return (gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
917 	    frp, freebs_enqueue, KM_SLEEP));
918 }
919 
920 /*ARGSUSED*/
921 mblk_t *
922 desballoc(unsigned char *base, size_t size, uint_t pri, frtn_t *frp)
923 {
924 	mblk_t *mp;
925 
926 	/*
927 	 * Note that this is structured to allow the common case (i.e.
928 	 * STREAMS flowtracing disabled) to call gesballoc() with tail
929 	 * call optimization.
930 	 */
931 	if (!str_ftnever) {
932 		mp = gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
933 		    frp, dblk_lastfree_desb, KM_NOSLEEP);
934 
935 		if (mp != NULL)
936 			STR_FTALLOC(&DB_FTHDR(mp), FTEV_DESBALLOC, size);
937 		return (mp);
938 	}
939 
940 	return (gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
941 	    frp, dblk_lastfree_desb, KM_NOSLEEP));
942 }
943 
944 /*ARGSUSED*/
945 mblk_t *
946 esballoca(unsigned char *base, size_t size, uint_t pri, frtn_t *frp)
947 {
948 	mblk_t *mp;
949 
950 	/*
951 	 * Note that this is structured to allow the common case (i.e.
952 	 * STREAMS flowtracing disabled) to call gesballoc() with tail
953 	 * call optimization.
954 	 */
955 	if (!str_ftnever) {
956 		mp = gesballoc(base, size, DBLK_RTFU(2, M_DATA, 0, 0),
957 		    frp, freebs_enqueue, KM_NOSLEEP);
958 
959 		if (mp != NULL)
960 			STR_FTALLOC(&DB_FTHDR(mp), FTEV_ESBALLOCA, size);
961 		return (mp);
962 	}
963 
964 	return (gesballoc(base, size, DBLK_RTFU(2, M_DATA, 0, 0),
965 	    frp, freebs_enqueue, KM_NOSLEEP));
966 }
967 
968 /*ARGSUSED*/
969 mblk_t *
970 desballoca(unsigned char *base, size_t size, uint_t pri, frtn_t *frp)
971 {
972 	mblk_t *mp;
973 
974 	/*
975 	 * Note that this is structured to allow the common case (i.e.
976 	 * STREAMS flowtracing disabled) to call gesballoc() with tail
977 	 * call optimization.
978 	 */
979 	if (!str_ftnever) {
980 		mp = gesballoc(base, size, DBLK_RTFU(2, M_DATA, 0, 0),
981 		    frp, dblk_lastfree_desb, KM_NOSLEEP);
982 
983 		if (mp != NULL)
984 			STR_FTALLOC(&DB_FTHDR(mp), FTEV_DESBALLOCA, size);
985 		return (mp);
986 	}
987 
988 	return (gesballoc(base, size, DBLK_RTFU(2, M_DATA, 0, 0),
989 	    frp, dblk_lastfree_desb, KM_NOSLEEP));
990 }
991 
992 static void
993 bcache_dblk_lastfree(mblk_t *mp, dblk_t *dbp)
994 {
995 	bcache_t *bcp = dbp->db_cache;
996 
997 	ASSERT(dbp->db_mblk == mp);
998 	if (dbp->db_fthdr != NULL)
999 		str_ftfree(dbp);
1000 
1001 	/* set credp and projid to be 'unspecified' before returning to cache */
1002 	if (dbp->db_credp != NULL) {
1003 		crfree(dbp->db_credp);
1004 		dbp->db_credp = NULL;
1005 	}
1006 	dbp->db_cpid = -1;
1007 	dbp->db_struioflag = 0;
1008 	dbp->db_struioun.cksum.flags = 0;
1009 
1010 	mutex_enter(&bcp->mutex);
1011 	kmem_cache_free(bcp->dblk_cache, dbp);
1012 	bcp->alloc--;
1013 
1014 	if (bcp->alloc == 0 && bcp->destroy != 0) {
1015 		kmem_cache_destroy(bcp->dblk_cache);
1016 		kmem_cache_destroy(bcp->buffer_cache);
1017 		mutex_exit(&bcp->mutex);
1018 		mutex_destroy(&bcp->mutex);
1019 		kmem_free(bcp, sizeof (bcache_t));
1020 	} else {
1021 		mutex_exit(&bcp->mutex);
1022 	}
1023 }
1024 
1025 bcache_t *
1026 bcache_create(char *name, size_t size, uint_t align)
1027 {
1028 	bcache_t *bcp;
1029 	char buffer[255];
1030 
1031 	ASSERT((align & (align - 1)) == 0);
1032 
1033 	if ((bcp = kmem_alloc(sizeof (bcache_t), KM_NOSLEEP)) == NULL)
1034 		return (NULL);
1035 
1036 	bcp->size = size;
1037 	bcp->align = align;
1038 	bcp->alloc = 0;
1039 	bcp->destroy = 0;
1040 
1041 	mutex_init(&bcp->mutex, NULL, MUTEX_DRIVER, NULL);
1042 
1043 	(void) sprintf(buffer, "%s_buffer_cache", name);
1044 	bcp->buffer_cache = kmem_cache_create(buffer, size, align, NULL, NULL,
1045 	    NULL, NULL, NULL, 0);
1046 	(void) sprintf(buffer, "%s_dblk_cache", name);
1047 	bcp->dblk_cache = kmem_cache_create(buffer, sizeof (dblk_t),
1048 	    DBLK_CACHE_ALIGN, bcache_dblk_constructor, bcache_dblk_destructor,
1049 	    NULL, (void *)bcp, NULL, 0);
1050 
1051 	return (bcp);
1052 }
1053 
1054 void
1055 bcache_destroy(bcache_t *bcp)
1056 {
1057 	ASSERT(bcp != NULL);
1058 
1059 	mutex_enter(&bcp->mutex);
1060 	if (bcp->alloc == 0) {
1061 		kmem_cache_destroy(bcp->dblk_cache);
1062 		kmem_cache_destroy(bcp->buffer_cache);
1063 		mutex_exit(&bcp->mutex);
1064 		mutex_destroy(&bcp->mutex);
1065 		kmem_free(bcp, sizeof (bcache_t));
1066 	} else {
1067 		bcp->destroy++;
1068 		mutex_exit(&bcp->mutex);
1069 	}
1070 }
1071 
1072 /*ARGSUSED*/
1073 mblk_t *
1074 bcache_allocb(bcache_t *bcp, uint_t pri)
1075 {
1076 	dblk_t *dbp;
1077 	mblk_t *mp = NULL;
1078 
1079 	ASSERT(bcp != NULL);
1080 
1081 	mutex_enter(&bcp->mutex);
1082 	if (bcp->destroy != 0) {
1083 		mutex_exit(&bcp->mutex);
1084 		goto out;
1085 	}
1086 
1087 	if ((dbp = kmem_cache_alloc(bcp->dblk_cache, KM_NOSLEEP)) == NULL) {
1088 		mutex_exit(&bcp->mutex);
1089 		goto out;
1090 	}
1091 	bcp->alloc++;
1092 	mutex_exit(&bcp->mutex);
1093 
1094 	ASSERT(((uintptr_t)(dbp->db_base) & (bcp->align - 1)) == 0);
1095 
1096 	mp = dbp->db_mblk;
1097 	DBLK_RTFU_WORD(dbp) = DBLK_RTFU(1, M_DATA, 0, 0);
1098 	mp->b_next = mp->b_prev = mp->b_cont = NULL;
1099 	mp->b_rptr = mp->b_wptr = dbp->db_base;
1100 	mp->b_queue = NULL;
1101 	MBLK_BAND_FLAG_WORD(mp) = 0;
1102 	STR_FTALLOC(&dbp->db_fthdr, FTEV_BCALLOCB, bcp->size);
1103 out:
1104 	FTRACE_1("bcache_allocb(): mp=0x%p", (uintptr_t)mp);
1105 
1106 	return (mp);
1107 }
1108 
1109 static void
1110 dblk_lastfree_oversize(mblk_t *mp, dblk_t *dbp)
1111 {
1112 	ASSERT(dbp->db_mblk == mp);
1113 	if (dbp->db_fthdr != NULL)
1114 		str_ftfree(dbp);
1115 
1116 	/* set credp and projid to be 'unspecified' before returning to cache */
1117 	if (dbp->db_credp != NULL) {
1118 		crfree(dbp->db_credp);
1119 		dbp->db_credp = NULL;
1120 	}
1121 	dbp->db_cpid = -1;
1122 	dbp->db_struioflag = 0;
1123 	dbp->db_struioun.cksum.flags = 0;
1124 
1125 	kmem_free(dbp->db_base, dbp->db_lim - dbp->db_base);
1126 	kmem_cache_free(dbp->db_cache, dbp);
1127 }
1128 
1129 static mblk_t *
1130 allocb_oversize(size_t size, int kmflags)
1131 {
1132 	mblk_t *mp;
1133 	void *buf;
1134 
1135 	size = P2ROUNDUP(size, DBLK_CACHE_ALIGN);
1136 	if ((buf = kmem_alloc(size, kmflags)) == NULL)
1137 		return (NULL);
1138 	if ((mp = gesballoc(buf, size, DBLK_RTFU(1, M_DATA, 0, 0),
1139 	    &frnop, dblk_lastfree_oversize, kmflags)) == NULL)
1140 		kmem_free(buf, size);
1141 
1142 	if (mp != NULL)
1143 		STR_FTALLOC(&DB_FTHDR(mp), FTEV_ALLOCBIG, size);
1144 
1145 	return (mp);
1146 }
1147 
1148 mblk_t *
1149 allocb_tryhard(size_t target_size)
1150 {
1151 	size_t size;
1152 	mblk_t *bp;
1153 
1154 	for (size = target_size; size < target_size + 512;
1155 	    size += DBLK_CACHE_ALIGN)
1156 		if ((bp = allocb(size, BPRI_HI)) != NULL)
1157 			return (bp);
1158 	allocb_tryhard_fails++;
1159 	return (NULL);
1160 }
1161 
1162 /*
1163  * This routine is consolidation private for STREAMS internal use
1164  * This routine may only be called from sync routines (i.e., not
1165  * from put or service procedures).  It is located here (rather
1166  * than strsubr.c) so that we don't have to expose all of the
1167  * allocb() implementation details in header files.
1168  */
1169 mblk_t *
1170 allocb_wait(size_t size, uint_t pri, uint_t flags, int *error)
1171 {
1172 	dblk_t *dbp;
1173 	mblk_t *mp;
1174 	size_t index;
1175 
1176 	index = (size -1) >> DBLK_SIZE_SHIFT;
1177 
1178 	if (flags & STR_NOSIG) {
1179 		if (index >= (DBLK_MAX_CACHE >> DBLK_SIZE_SHIFT)) {
1180 			if (size != 0) {
1181 				mp = allocb_oversize(size, KM_SLEEP);
1182 				FTRACE_1("allocb_wait (NOSIG): mp=0x%lx",
1183 				    (uintptr_t)mp);
1184 				return (mp);
1185 			}
1186 			index = 0;
1187 		}
1188 
1189 		dbp = kmem_cache_alloc(dblk_cache[index], KM_SLEEP);
1190 		mp = dbp->db_mblk;
1191 		DBLK_RTFU_WORD(dbp) = DBLK_RTFU(1, M_DATA, 0, 0);
1192 		mp->b_next = mp->b_prev = mp->b_cont = NULL;
1193 		mp->b_rptr = mp->b_wptr = dbp->db_base;
1194 		mp->b_queue = NULL;
1195 		MBLK_BAND_FLAG_WORD(mp) = 0;
1196 		STR_FTALLOC(&DB_FTHDR(mp), FTEV_ALLOCBW, size);
1197 
1198 		FTRACE_1("allocb_wait (NOSIG): mp=0x%lx", (uintptr_t)mp);
1199 
1200 	} else {
1201 		while ((mp = allocb(size, pri)) == NULL) {
1202 			if ((*error = strwaitbuf(size, BPRI_HI)) != 0)
1203 				return (NULL);
1204 		}
1205 	}
1206 
1207 	return (mp);
1208 }
1209 
1210 /*
1211  * Call function 'func' with 'arg' when a class zero block can
1212  * be allocated with priority 'pri'.
1213  */
1214 bufcall_id_t
1215 esbbcall(uint_t pri, void (*func)(void *), void *arg)
1216 {
1217 	return (bufcall(1, pri, func, arg));
1218 }
1219 
1220 /*
1221  * Allocates an iocblk (M_IOCTL) block. Properly sets the credentials
1222  * ioc_id, rval and error of the struct ioctl to set up an ioctl call.
1223  * This provides consistency for all internal allocators of ioctl.
1224  */
1225 mblk_t *
1226 mkiocb(uint_t cmd)
1227 {
1228 	struct iocblk	*ioc;
1229 	mblk_t		*mp;
1230 
1231 	/*
1232 	 * Allocate enough space for any of the ioctl related messages.
1233 	 */
1234 	if ((mp = allocb(sizeof (union ioctypes), BPRI_MED)) == NULL)
1235 		return (NULL);
1236 
1237 	bzero(mp->b_rptr, sizeof (union ioctypes));
1238 
1239 	/*
1240 	 * Set the mblk_t information and ptrs correctly.
1241 	 */
1242 	mp->b_wptr += sizeof (struct iocblk);
1243 	mp->b_datap->db_type = M_IOCTL;
1244 
1245 	/*
1246 	 * Fill in the fields.
1247 	 */
1248 	ioc		= (struct iocblk *)mp->b_rptr;
1249 	ioc->ioc_cmd	= cmd;
1250 	ioc->ioc_cr	= kcred;
1251 	ioc->ioc_id	= getiocseqno();
1252 	ioc->ioc_flag	= IOC_NATIVE;
1253 	return (mp);
1254 }
1255 
1256 /*
1257  * test if block of given size can be allocated with a request of
1258  * the given priority.
1259  * 'pri' is no longer used, but is retained for compatibility.
1260  */
1261 /* ARGSUSED */
1262 int
1263 testb(size_t size, uint_t pri)
1264 {
1265 	return ((size + sizeof (dblk_t)) <= kmem_avail());
1266 }
1267 
1268 /*
1269  * Call function 'func' with argument 'arg' when there is a reasonably
1270  * good chance that a block of size 'size' can be allocated.
1271  * 'pri' is no longer used, but is retained for compatibility.
1272  */
1273 /* ARGSUSED */
1274 bufcall_id_t
1275 bufcall(size_t size, uint_t pri, void (*func)(void *), void *arg)
1276 {
1277 	static long bid = 1;	/* always odd to save checking for zero */
1278 	bufcall_id_t bc_id;
1279 	struct strbufcall *bcp;
1280 
1281 	if ((bcp = kmem_alloc(sizeof (strbufcall_t), KM_NOSLEEP)) == NULL)
1282 		return (0);
1283 
1284 	bcp->bc_func = func;
1285 	bcp->bc_arg = arg;
1286 	bcp->bc_size = size;
1287 	bcp->bc_next = NULL;
1288 	bcp->bc_executor = NULL;
1289 
1290 	mutex_enter(&strbcall_lock);
1291 	/*
1292 	 * After bcp is linked into strbcalls and strbcall_lock is dropped there
1293 	 * should be no references to bcp since it may be freed by
1294 	 * runbufcalls(). Since bcp_id field is returned, we save its value in
1295 	 * the local var.
1296 	 */
1297 	bc_id = bcp->bc_id = (bufcall_id_t)(bid += 2);	/* keep it odd */
1298 
1299 	/*
1300 	 * add newly allocated stream event to existing
1301 	 * linked list of events.
1302 	 */
1303 	if (strbcalls.bc_head == NULL) {
1304 		strbcalls.bc_head = strbcalls.bc_tail = bcp;
1305 	} else {
1306 		strbcalls.bc_tail->bc_next = bcp;
1307 		strbcalls.bc_tail = bcp;
1308 	}
1309 
1310 	cv_signal(&strbcall_cv);
1311 	mutex_exit(&strbcall_lock);
1312 	return (bc_id);
1313 }
1314 
1315 /*
1316  * Cancel a bufcall request.
1317  */
1318 void
1319 unbufcall(bufcall_id_t id)
1320 {
1321 	strbufcall_t *bcp, *pbcp;
1322 
1323 	mutex_enter(&strbcall_lock);
1324 again:
1325 	pbcp = NULL;
1326 	for (bcp = strbcalls.bc_head; bcp; bcp = bcp->bc_next) {
1327 		if (id == bcp->bc_id)
1328 			break;
1329 		pbcp = bcp;
1330 	}
1331 	if (bcp) {
1332 		if (bcp->bc_executor != NULL) {
1333 			if (bcp->bc_executor != curthread) {
1334 				cv_wait(&bcall_cv, &strbcall_lock);
1335 				goto again;
1336 			}
1337 		} else {
1338 			if (pbcp)
1339 				pbcp->bc_next = bcp->bc_next;
1340 			else
1341 				strbcalls.bc_head = bcp->bc_next;
1342 			if (bcp == strbcalls.bc_tail)
1343 				strbcalls.bc_tail = pbcp;
1344 			kmem_free(bcp, sizeof (strbufcall_t));
1345 		}
1346 	}
1347 	mutex_exit(&strbcall_lock);
1348 }
1349 
1350 /*
1351  * Duplicate a message block by block (uses dupb), returning
1352  * a pointer to the duplicate message.
1353  * Returns a non-NULL value only if the entire message
1354  * was dup'd.
1355  */
1356 mblk_t *
1357 dupmsg(mblk_t *bp)
1358 {
1359 	mblk_t *head, *nbp;
1360 
1361 	if (!bp || !(nbp = head = dupb(bp)))
1362 		return (NULL);
1363 
1364 	while (bp->b_cont) {
1365 		if (!(nbp->b_cont = dupb(bp->b_cont))) {
1366 			freemsg(head);
1367 			return (NULL);
1368 		}
1369 		nbp = nbp->b_cont;
1370 		bp = bp->b_cont;
1371 	}
1372 	return (head);
1373 }
1374 
1375 #define	DUPB_NOLOAN(bp) \
1376 	((((bp)->b_datap->db_struioflag & STRUIO_ZC) != 0) ? \
1377 	copyb((bp)) : dupb((bp)))
1378 
1379 mblk_t *
1380 dupmsg_noloan(mblk_t *bp)
1381 {
1382 	mblk_t *head, *nbp;
1383 
1384 	if (bp == NULL || DB_TYPE(bp) != M_DATA ||
1385 	    ((nbp = head = DUPB_NOLOAN(bp)) == NULL))
1386 		return (NULL);
1387 
1388 	while (bp->b_cont) {
1389 		if ((nbp->b_cont = DUPB_NOLOAN(bp->b_cont)) == NULL) {
1390 			freemsg(head);
1391 			return (NULL);
1392 		}
1393 		nbp = nbp->b_cont;
1394 		bp = bp->b_cont;
1395 	}
1396 	return (head);
1397 }
1398 
1399 /*
1400  * Copy data from message and data block to newly allocated message and
1401  * data block. Returns new message block pointer, or NULL if error.
1402  * The alignment of rptr (w.r.t. word alignment) will be the same in the copy
1403  * as in the original even when db_base is not word aligned. (bug 1052877)
1404  */
1405 mblk_t *
1406 copyb(mblk_t *bp)
1407 {
1408 	mblk_t	*nbp;
1409 	dblk_t	*dp, *ndp;
1410 	uchar_t *base;
1411 	size_t	size;
1412 	size_t	unaligned;
1413 
1414 	ASSERT(bp->b_wptr >= bp->b_rptr);
1415 
1416 	dp = bp->b_datap;
1417 	if (dp->db_fthdr != NULL)
1418 		STR_FTEVENT_MBLK(bp, caller(), FTEV_COPYB, 0);
1419 
1420 	/*
1421 	 * Special handling for Multidata message; this should be
1422 	 * removed once a copy-callback routine is made available.
1423 	 */
1424 	if (dp->db_type == M_MULTIDATA) {
1425 		cred_t *cr;
1426 
1427 		if ((nbp = mmd_copy(bp, KM_NOSLEEP)) == NULL)
1428 			return (NULL);
1429 
1430 		nbp->b_flag = bp->b_flag;
1431 		nbp->b_band = bp->b_band;
1432 		ndp = nbp->b_datap;
1433 
1434 		/* See comments below on potential issues. */
1435 		STR_FTEVENT_MBLK(nbp, caller(), FTEV_COPYB, 1);
1436 
1437 		ASSERT(ndp->db_type == dp->db_type);
1438 		cr = dp->db_credp;
1439 		if (cr != NULL)
1440 			crhold(ndp->db_credp = cr);
1441 		ndp->db_cpid = dp->db_cpid;
1442 		return (nbp);
1443 	}
1444 
1445 	size = dp->db_lim - dp->db_base;
1446 	unaligned = P2PHASE((uintptr_t)dp->db_base, sizeof (uint_t));
1447 	if ((nbp = allocb_tmpl(size + unaligned, bp)) == NULL)
1448 		return (NULL);
1449 	nbp->b_flag = bp->b_flag;
1450 	nbp->b_band = bp->b_band;
1451 	ndp = nbp->b_datap;
1452 
1453 	/*
1454 	 * Well, here is a potential issue.  If we are trying to
1455 	 * trace a flow, and we copy the message, we might lose
1456 	 * information about where this message might have been.
1457 	 * So we should inherit the FT data.  On the other hand,
1458 	 * a user might be interested only in alloc to free data.
1459 	 * So I guess the real answer is to provide a tunable.
1460 	 */
1461 	STR_FTEVENT_MBLK(nbp, caller(), FTEV_COPYB, 1);
1462 
1463 	base = ndp->db_base + unaligned;
1464 	bcopy(dp->db_base, ndp->db_base + unaligned, size);
1465 
1466 	nbp->b_rptr = base + (bp->b_rptr - dp->db_base);
1467 	nbp->b_wptr = nbp->b_rptr + MBLKL(bp);
1468 
1469 	return (nbp);
1470 }
1471 
1472 /*
1473  * Copy data from message to newly allocated message using new
1474  * data blocks.  Returns a pointer to the new message, or NULL if error.
1475  */
1476 mblk_t *
1477 copymsg(mblk_t *bp)
1478 {
1479 	mblk_t *head, *nbp;
1480 
1481 	if (!bp || !(nbp = head = copyb(bp)))
1482 		return (NULL);
1483 
1484 	while (bp->b_cont) {
1485 		if (!(nbp->b_cont = copyb(bp->b_cont))) {
1486 			freemsg(head);
1487 			return (NULL);
1488 		}
1489 		nbp = nbp->b_cont;
1490 		bp = bp->b_cont;
1491 	}
1492 	return (head);
1493 }
1494 
1495 /*
1496  * link a message block to tail of message
1497  */
1498 void
1499 linkb(mblk_t *mp, mblk_t *bp)
1500 {
1501 	ASSERT(mp && bp);
1502 
1503 	for (; mp->b_cont; mp = mp->b_cont)
1504 		;
1505 	mp->b_cont = bp;
1506 }
1507 
1508 /*
1509  * unlink a message block from head of message
1510  * return pointer to new message.
1511  * NULL if message becomes empty.
1512  */
1513 mblk_t *
1514 unlinkb(mblk_t *bp)
1515 {
1516 	mblk_t *bp1;
1517 
1518 	bp1 = bp->b_cont;
1519 	bp->b_cont = NULL;
1520 	return (bp1);
1521 }
1522 
1523 /*
1524  * remove a message block "bp" from message "mp"
1525  *
1526  * Return pointer to new message or NULL if no message remains.
1527  * Return -1 if bp is not found in message.
1528  */
1529 mblk_t *
1530 rmvb(mblk_t *mp, mblk_t *bp)
1531 {
1532 	mblk_t *tmp;
1533 	mblk_t *lastp = NULL;
1534 
1535 	ASSERT(mp && bp);
1536 	for (tmp = mp; tmp; tmp = tmp->b_cont) {
1537 		if (tmp == bp) {
1538 			if (lastp)
1539 				lastp->b_cont = tmp->b_cont;
1540 			else
1541 				mp = tmp->b_cont;
1542 			tmp->b_cont = NULL;
1543 			return (mp);
1544 		}
1545 		lastp = tmp;
1546 	}
1547 	return ((mblk_t *)-1);
1548 }
1549 
1550 /*
1551  * Concatenate and align first len bytes of common
1552  * message type.  Len == -1, means concat everything.
1553  * Returns 1 on success, 0 on failure
1554  * After the pullup, mp points to the pulled up data.
1555  */
1556 int
1557 pullupmsg(mblk_t *mp, ssize_t len)
1558 {
1559 	mblk_t *bp, *b_cont;
1560 	dblk_t *dbp;
1561 	ssize_t n;
1562 
1563 	ASSERT(mp->b_datap->db_ref > 0);
1564 	ASSERT(mp->b_next == NULL && mp->b_prev == NULL);
1565 
1566 	/*
1567 	 * We won't handle Multidata message, since it contains
1568 	 * metadata which this function has no knowledge of; we
1569 	 * assert on DEBUG, and return failure otherwise.
1570 	 */
1571 	ASSERT(mp->b_datap->db_type != M_MULTIDATA);
1572 	if (mp->b_datap->db_type == M_MULTIDATA)
1573 		return (0);
1574 
1575 	if (len == -1) {
1576 		if (mp->b_cont == NULL && str_aligned(mp->b_rptr))
1577 			return (1);
1578 		len = xmsgsize(mp);
1579 	} else {
1580 		ssize_t first_mblk_len = mp->b_wptr - mp->b_rptr;
1581 		ASSERT(first_mblk_len >= 0);
1582 		/*
1583 		 * If the length is less than that of the first mblk,
1584 		 * we want to pull up the message into an aligned mblk.
1585 		 * Though not part of the spec, some callers assume it.
1586 		 */
1587 		if (len <= first_mblk_len) {
1588 			if (str_aligned(mp->b_rptr))
1589 				return (1);
1590 			len = first_mblk_len;
1591 		} else if (xmsgsize(mp) < len)
1592 			return (0);
1593 	}
1594 
1595 	if ((bp = allocb_tmpl(len, mp)) == NULL)
1596 		return (0);
1597 
1598 	dbp = bp->b_datap;
1599 	*bp = *mp;		/* swap mblks so bp heads the old msg... */
1600 	mp->b_datap = dbp;	/* ... and mp heads the new message */
1601 	mp->b_datap->db_mblk = mp;
1602 	bp->b_datap->db_mblk = bp;
1603 	mp->b_rptr = mp->b_wptr = dbp->db_base;
1604 
1605 	do {
1606 		ASSERT(bp->b_datap->db_ref > 0);
1607 		ASSERT(bp->b_wptr >= bp->b_rptr);
1608 		n = MIN(bp->b_wptr - bp->b_rptr, len);
1609 		ASSERT(n >= 0);		/* allow zero-length mblk_t's */
1610 		if (n > 0)
1611 			bcopy(bp->b_rptr, mp->b_wptr, (size_t)n);
1612 		mp->b_wptr += n;
1613 		bp->b_rptr += n;
1614 		len -= n;
1615 		if (bp->b_rptr != bp->b_wptr)
1616 			break;
1617 		b_cont = bp->b_cont;
1618 		freeb(bp);
1619 		bp = b_cont;
1620 	} while (len && bp);
1621 
1622 	mp->b_cont = bp;	/* tack on whatever wasn't pulled up */
1623 
1624 	return (1);
1625 }
1626 
1627 /*
1628  * Concatenate and align at least the first len bytes of common message
1629  * type.  Len == -1 means concatenate everything.  The original message is
1630  * unaltered.  Returns a pointer to a new message on success, otherwise
1631  * returns NULL.
1632  */
1633 mblk_t *
1634 msgpullup(mblk_t *mp, ssize_t len)
1635 {
1636 	mblk_t	*newmp;
1637 	ssize_t	totlen;
1638 	ssize_t	n;
1639 
1640 	/*
1641 	 * We won't handle Multidata message, since it contains
1642 	 * metadata which this function has no knowledge of; we
1643 	 * assert on DEBUG, and return failure otherwise.
1644 	 */
1645 	ASSERT(mp->b_datap->db_type != M_MULTIDATA);
1646 	if (mp->b_datap->db_type == M_MULTIDATA)
1647 		return (NULL);
1648 
1649 	totlen = xmsgsize(mp);
1650 
1651 	if ((len > 0) && (len > totlen))
1652 		return (NULL);
1653 
1654 	/*
1655 	 * Copy all of the first msg type into one new mblk, then dupmsg
1656 	 * and link the rest onto this.
1657 	 */
1658 
1659 	len = totlen;
1660 
1661 	if ((newmp = allocb_tmpl(len, mp)) == NULL)
1662 		return (NULL);
1663 
1664 	newmp->b_flag = mp->b_flag;
1665 	newmp->b_band = mp->b_band;
1666 
1667 	while (len > 0) {
1668 		n = mp->b_wptr - mp->b_rptr;
1669 		ASSERT(n >= 0);		/* allow zero-length mblk_t's */
1670 		if (n > 0)
1671 			bcopy(mp->b_rptr, newmp->b_wptr, n);
1672 		newmp->b_wptr += n;
1673 		len -= n;
1674 		mp = mp->b_cont;
1675 	}
1676 
1677 	if (mp != NULL) {
1678 		newmp->b_cont = dupmsg(mp);
1679 		if (newmp->b_cont == NULL) {
1680 			freemsg(newmp);
1681 			return (NULL);
1682 		}
1683 	}
1684 
1685 	return (newmp);
1686 }
1687 
1688 /*
1689  * Trim bytes from message
1690  *  len > 0, trim from head
1691  *  len < 0, trim from tail
1692  * Returns 1 on success, 0 on failure.
1693  */
1694 int
1695 adjmsg(mblk_t *mp, ssize_t len)
1696 {
1697 	mblk_t *bp;
1698 	mblk_t *save_bp = NULL;
1699 	mblk_t *prev_bp;
1700 	mblk_t *bcont;
1701 	unsigned char type;
1702 	ssize_t n;
1703 	int fromhead;
1704 	int first;
1705 
1706 	ASSERT(mp != NULL);
1707 	/*
1708 	 * We won't handle Multidata message, since it contains
1709 	 * metadata which this function has no knowledge of; we
1710 	 * assert on DEBUG, and return failure otherwise.
1711 	 */
1712 	ASSERT(mp->b_datap->db_type != M_MULTIDATA);
1713 	if (mp->b_datap->db_type == M_MULTIDATA)
1714 		return (0);
1715 
1716 	if (len < 0) {
1717 		fromhead = 0;
1718 		len = -len;
1719 	} else {
1720 		fromhead = 1;
1721 	}
1722 
1723 	if (xmsgsize(mp) < len)
1724 		return (0);
1725 
1726 	if (fromhead) {
1727 		first = 1;
1728 		while (len) {
1729 			ASSERT(mp->b_wptr >= mp->b_rptr);
1730 			n = MIN(mp->b_wptr - mp->b_rptr, len);
1731 			mp->b_rptr += n;
1732 			len -= n;
1733 
1734 			/*
1735 			 * If this is not the first zero length
1736 			 * message remove it
1737 			 */
1738 			if (!first && (mp->b_wptr == mp->b_rptr)) {
1739 				bcont = mp->b_cont;
1740 				freeb(mp);
1741 				mp = save_bp->b_cont = bcont;
1742 			} else {
1743 				save_bp = mp;
1744 				mp = mp->b_cont;
1745 			}
1746 			first = 0;
1747 		}
1748 	} else {
1749 		type = mp->b_datap->db_type;
1750 		while (len) {
1751 			bp = mp;
1752 			save_bp = NULL;
1753 
1754 			/*
1755 			 * Find the last message of same type
1756 			 */
1757 			while (bp && bp->b_datap->db_type == type) {
1758 				ASSERT(bp->b_wptr >= bp->b_rptr);
1759 				prev_bp = save_bp;
1760 				save_bp = bp;
1761 				bp = bp->b_cont;
1762 			}
1763 			if (save_bp == NULL)
1764 				break;
1765 			n = MIN(save_bp->b_wptr - save_bp->b_rptr, len);
1766 			save_bp->b_wptr -= n;
1767 			len -= n;
1768 
1769 			/*
1770 			 * If this is not the first message
1771 			 * and we have taken away everything
1772 			 * from this message, remove it
1773 			 */
1774 
1775 			if ((save_bp != mp) &&
1776 			    (save_bp->b_wptr == save_bp->b_rptr)) {
1777 				bcont = save_bp->b_cont;
1778 				freeb(save_bp);
1779 				prev_bp->b_cont = bcont;
1780 			}
1781 		}
1782 	}
1783 	return (1);
1784 }
1785 
1786 /*
1787  * get number of data bytes in message
1788  */
1789 size_t
1790 msgdsize(mblk_t *bp)
1791 {
1792 	size_t count = 0;
1793 
1794 	for (; bp; bp = bp->b_cont)
1795 		if (bp->b_datap->db_type == M_DATA) {
1796 			ASSERT(bp->b_wptr >= bp->b_rptr);
1797 			count += bp->b_wptr - bp->b_rptr;
1798 		}
1799 	return (count);
1800 }
1801 
1802 /*
1803  * Get a message off head of queue
1804  *
1805  * If queue has no buffers then mark queue
1806  * with QWANTR. (queue wants to be read by
1807  * someone when data becomes available)
1808  *
1809  * If there is something to take off then do so.
1810  * If queue falls below hi water mark turn off QFULL
1811  * flag.  Decrement weighted count of queue.
1812  * Also turn off QWANTR because queue is being read.
1813  *
1814  * The queue count is maintained on a per-band basis.
1815  * Priority band 0 (normal messages) uses q_count,
1816  * q_lowat, etc.  Non-zero priority bands use the
1817  * fields in their respective qband structures
1818  * (qb_count, qb_lowat, etc.)  All messages appear
1819  * on the same list, linked via their b_next pointers.
1820  * q_first is the head of the list.  q_count does
1821  * not reflect the size of all the messages on the
1822  * queue.  It only reflects those messages in the
1823  * normal band of flow.  The one exception to this
1824  * deals with high priority messages.  They are in
1825  * their own conceptual "band", but are accounted
1826  * against q_count.
1827  *
1828  * If queue count is below the lo water mark and QWANTW
1829  * is set, enable the closest backq which has a service
1830  * procedure and turn off the QWANTW flag.
1831  *
1832  * getq could be built on top of rmvq, but isn't because
1833  * of performance considerations.
1834  *
1835  * A note on the use of q_count and q_mblkcnt:
1836  *   q_count is the traditional byte count for messages that
1837  *   have been put on a queue.  Documentation tells us that
1838  *   we shouldn't rely on that count, but some drivers/modules
1839  *   do.  What was needed, however, is a mechanism to prevent
1840  *   runaway streams from consuming all of the resources,
1841  *   and particularly be able to flow control zero-length
1842  *   messages.  q_mblkcnt is used for this purpose.  It
1843  *   counts the number of mblk's that are being put on
1844  *   the queue.  The intention here, is that each mblk should
1845  *   contain one byte of data and, for the purpose of
1846  *   flow-control, logically does.  A queue will become
1847  *   full when EITHER of these values (q_count and q_mblkcnt)
1848  *   reach the highwater mark.  It will clear when BOTH
1849  *   of them drop below the highwater mark.  And it will
1850  *   backenable when BOTH of them drop below the lowwater
1851  *   mark.
1852  *   With this algorithm, a driver/module might be able
1853  *   to find a reasonably accurate q_count, and the
1854  *   framework can still try and limit resource usage.
1855  */
1856 mblk_t *
1857 getq(queue_t *q)
1858 {
1859 	mblk_t *bp;
1860 	uchar_t band = 0;
1861 
1862 	bp = getq_noenab(q, 0);
1863 	if (bp != NULL)
1864 		band = bp->b_band;
1865 
1866 	/*
1867 	 * Inlined from qbackenable().
1868 	 * Quick check without holding the lock.
1869 	 */
1870 	if (band == 0 && (q->q_flag & (QWANTW|QWANTWSYNC)) == 0)
1871 		return (bp);
1872 
1873 	qbackenable(q, band);
1874 	return (bp);
1875 }
1876 
1877 /*
1878  * Calculate number of data bytes in a single data message block taking
1879  * multidata messages into account.
1880  */
1881 
1882 #define	ADD_MBLK_SIZE(mp, size) 					\
1883 	if (DB_TYPE(mp) != M_MULTIDATA) {				\
1884 		(size) += MBLKL(mp);					\
1885 	} else {							\
1886 		uint_t	pinuse;						\
1887 									\
1888 		mmd_getsize(mmd_getmultidata(mp), NULL, &pinuse);	\
1889 		(size) += pinuse;					\
1890 	}
1891 
1892 /*
1893  * Returns the number of bytes in a message (a message is defined as a
1894  * chain of mblks linked by b_cont). If a non-NULL mblkcnt is supplied we
1895  * also return the number of distinct mblks in the message.
1896  */
1897 int
1898 mp_cont_len(mblk_t *bp, int *mblkcnt)
1899 {
1900 	mblk_t	*mp;
1901 	int	mblks = 0;
1902 	int	bytes = 0;
1903 
1904 	for (mp = bp; mp != NULL; mp = mp->b_cont) {
1905 		ADD_MBLK_SIZE(mp, bytes);
1906 		mblks++;
1907 	}
1908 
1909 	if (mblkcnt != NULL)
1910 		*mblkcnt = mblks;
1911 
1912 	return (bytes);
1913 }
1914 
1915 /*
1916  * Like getq() but does not backenable.  This is used by the stream
1917  * head when a putback() is likely.  The caller must call qbackenable()
1918  * after it is done with accessing the queue.
1919  * The rbytes arguments to getq_noneab() allows callers to specify a
1920  * the maximum number of bytes to return. If the current amount on the
1921  * queue is less than this then the entire message will be returned.
1922  * A value of 0 returns the entire message and is equivalent to the old
1923  * default behaviour prior to the addition of the rbytes argument.
1924  */
1925 mblk_t *
1926 getq_noenab(queue_t *q, ssize_t rbytes)
1927 {
1928 	mblk_t *bp, *mp1;
1929 	mblk_t *mp2 = NULL;
1930 	qband_t *qbp;
1931 	kthread_id_t freezer;
1932 	int	bytecnt = 0, mblkcnt = 0;
1933 
1934 	/* freezestr should allow its caller to call getq/putq */
1935 	freezer = STREAM(q)->sd_freezer;
1936 	if (freezer == curthread) {
1937 		ASSERT(frozenstr(q));
1938 		ASSERT(MUTEX_HELD(QLOCK(q)));
1939 	} else
1940 		mutex_enter(QLOCK(q));
1941 
1942 	if ((bp = q->q_first) == 0) {
1943 		q->q_flag |= QWANTR;
1944 	} else {
1945 		/*
1946 		 * If the caller supplied a byte threshold and there is
1947 		 * more than this amount on the queue then break up the
1948 		 * the message appropriately.  We can only safely do
1949 		 * this for M_DATA messages.
1950 		 */
1951 		if ((DB_TYPE(bp) == M_DATA) && (rbytes > 0) &&
1952 		    (q->q_count > rbytes)) {
1953 			/*
1954 			 * Inline version of mp_cont_len() which terminates
1955 			 * when we meet or exceed rbytes.
1956 			 */
1957 			for (mp1 = bp; mp1 != NULL; mp1 = mp1->b_cont) {
1958 				mblkcnt++;
1959 				ADD_MBLK_SIZE(mp1, bytecnt);
1960 				if (bytecnt  >= rbytes)
1961 					break;
1962 			}
1963 			/*
1964 			 * We need to account for the following scenarios:
1965 			 *
1966 			 * 1) Too much data in the first message:
1967 			 *	mp1 will be the mblk which puts us over our
1968 			 *	byte limit.
1969 			 * 2) Not enough data in the first message:
1970 			 *	mp1 will be NULL.
1971 			 * 3) Exactly the right amount of data contained within
1972 			 *    whole mblks:
1973 			 *	mp1->b_cont will be where we break the message.
1974 			 */
1975 			if (bytecnt > rbytes) {
1976 				/*
1977 				 * Dup/copy mp1 and put what we don't need
1978 				 * back onto the queue. Adjust the read/write
1979 				 * and continuation pointers appropriately
1980 				 * and decrement the current mblk count to
1981 				 * reflect we are putting an mblk back onto
1982 				 * the queue.
1983 				 * When adjusting the message pointers, it's
1984 				 * OK to use the existing bytecnt and the
1985 				 * requested amount (rbytes) to calculate the
1986 				 * the new write offset (b_wptr) of what we
1987 				 * are taking. However, we  cannot use these
1988 				 * values when calculating the read offset of
1989 				 * the mblk we are putting back on the queue.
1990 				 * This is because the begining (b_rptr) of the
1991 				 * mblk represents some arbitrary point within
1992 				 * the message.
1993 				 * It's simplest to do this by advancing b_rptr
1994 				 * by the new length of mp1 as we don't have to
1995 				 * remember any intermediate state.
1996 				 */
1997 				ASSERT(mp1 != NULL);
1998 				mblkcnt--;
1999 				if ((mp2 = dupb(mp1)) == NULL &&
2000 				    (mp2 = copyb(mp1)) == NULL) {
2001 					bytecnt = mblkcnt = 0;
2002 					goto dup_failed;
2003 				}
2004 				mp2->b_cont = mp1->b_cont;
2005 				mp1->b_wptr -= bytecnt - rbytes;
2006 				mp2->b_rptr += mp1->b_wptr - mp1->b_rptr;
2007 				mp1->b_cont = NULL;
2008 				bytecnt = rbytes;
2009 			} else {
2010 				/*
2011 				 * Either there is not enough data in the first
2012 				 * message or there is no excess data to deal
2013 				 * with. If mp1 is NULL, we are taking the
2014 				 * whole message. No need to do anything.
2015 				 * Otherwise we assign mp1->b_cont to mp2 as
2016 				 * we will be putting this back onto the head of
2017 				 * the queue.
2018 				 */
2019 				if (mp1 != NULL) {
2020 					mp2 = mp1->b_cont;
2021 					mp1->b_cont = NULL;
2022 				}
2023 			}
2024 			/*
2025 			 * If mp2 is not NULL then we have part of the message
2026 			 * to put back onto the queue.
2027 			 */
2028 			if (mp2 != NULL) {
2029 				if ((mp2->b_next = bp->b_next) == NULL)
2030 					q->q_last = mp2;
2031 				else
2032 					bp->b_next->b_prev = mp2;
2033 				q->q_first = mp2;
2034 			} else {
2035 				if ((q->q_first = bp->b_next) == NULL)
2036 					q->q_last = NULL;
2037 				else
2038 					q->q_first->b_prev = NULL;
2039 			}
2040 		} else {
2041 			/*
2042 			 * Either no byte threshold was supplied, there is
2043 			 * not enough on the queue or we failed to
2044 			 * duplicate/copy a data block. In these cases we
2045 			 * just take the entire first message.
2046 			 */
2047 dup_failed:
2048 			bytecnt = mp_cont_len(bp, &mblkcnt);
2049 			if ((q->q_first = bp->b_next) == NULL)
2050 				q->q_last = NULL;
2051 			else
2052 				q->q_first->b_prev = NULL;
2053 		}
2054 		if (bp->b_band == 0) {
2055 			q->q_count -= bytecnt;
2056 			q->q_mblkcnt -= mblkcnt;
2057 			if (q->q_mblkcnt == 0 || ((q->q_count < q->q_hiwat) &&
2058 			    (q->q_mblkcnt < q->q_hiwat))) {
2059 				q->q_flag &= ~QFULL;
2060 			}
2061 		} else {
2062 			int i;
2063 
2064 			ASSERT(bp->b_band <= q->q_nband);
2065 			ASSERT(q->q_bandp != NULL);
2066 			ASSERT(MUTEX_HELD(QLOCK(q)));
2067 			qbp = q->q_bandp;
2068 			i = bp->b_band;
2069 			while (--i > 0)
2070 				qbp = qbp->qb_next;
2071 			if (qbp->qb_first == qbp->qb_last) {
2072 				qbp->qb_first = NULL;
2073 				qbp->qb_last = NULL;
2074 			} else {
2075 				qbp->qb_first = bp->b_next;
2076 			}
2077 			qbp->qb_count -= bytecnt;
2078 			qbp->qb_mblkcnt -= mblkcnt;
2079 			if (qbp->qb_mblkcnt == 0 ||
2080 			    ((qbp->qb_count < qbp->qb_hiwat) &&
2081 			    (qbp->qb_mblkcnt < qbp->qb_hiwat))) {
2082 				qbp->qb_flag &= ~QB_FULL;
2083 			}
2084 		}
2085 		q->q_flag &= ~QWANTR;
2086 		bp->b_next = NULL;
2087 		bp->b_prev = NULL;
2088 	}
2089 	if (freezer != curthread)
2090 		mutex_exit(QLOCK(q));
2091 
2092 	STR_FTEVENT_MSG(bp, q, FTEV_GETQ, NULL);
2093 
2094 	return (bp);
2095 }
2096 
2097 /*
2098  * Determine if a backenable is needed after removing a message in the
2099  * specified band.
2100  * NOTE: This routine assumes that something like getq_noenab() has been
2101  * already called.
2102  *
2103  * For the read side it is ok to hold sd_lock across calling this (and the
2104  * stream head often does).
2105  * But for the write side strwakeq might be invoked and it acquires sd_lock.
2106  */
2107 void
2108 qbackenable(queue_t *q, uchar_t band)
2109 {
2110 	int backenab = 0;
2111 	qband_t *qbp;
2112 	kthread_id_t freezer;
2113 
2114 	ASSERT(q);
2115 	ASSERT((q->q_flag & QREADR) || MUTEX_NOT_HELD(&STREAM(q)->sd_lock));
2116 
2117 	/*
2118 	 * Quick check without holding the lock.
2119 	 * OK since after getq() has lowered the q_count these flags
2120 	 * would not change unless either the qbackenable() is done by
2121 	 * another thread (which is ok) or the queue has gotten QFULL
2122 	 * in which case another backenable will take place when the queue
2123 	 * drops below q_lowat.
2124 	 */
2125 	if (band == 0 && (q->q_flag & (QWANTW|QWANTWSYNC)) == 0)
2126 		return;
2127 
2128 	/* freezestr should allow its caller to call getq/putq */
2129 	freezer = STREAM(q)->sd_freezer;
2130 	if (freezer == curthread) {
2131 		ASSERT(frozenstr(q));
2132 		ASSERT(MUTEX_HELD(QLOCK(q)));
2133 	} else
2134 		mutex_enter(QLOCK(q));
2135 
2136 	if (band == 0) {
2137 		if (q->q_lowat == 0 || (q->q_count < q->q_lowat &&
2138 		    q->q_mblkcnt < q->q_lowat)) {
2139 			backenab = q->q_flag & (QWANTW|QWANTWSYNC);
2140 		}
2141 	} else {
2142 		int i;
2143 
2144 		ASSERT((unsigned)band <= q->q_nband);
2145 		ASSERT(q->q_bandp != NULL);
2146 
2147 		qbp = q->q_bandp;
2148 		i = band;
2149 		while (--i > 0)
2150 			qbp = qbp->qb_next;
2151 
2152 		if (qbp->qb_lowat == 0 || (qbp->qb_count < qbp->qb_lowat &&
2153 		    qbp->qb_mblkcnt < qbp->qb_lowat)) {
2154 			backenab = qbp->qb_flag & QB_WANTW;
2155 		}
2156 	}
2157 
2158 	if (backenab == 0) {
2159 		if (freezer != curthread)
2160 			mutex_exit(QLOCK(q));
2161 		return;
2162 	}
2163 
2164 	/* Have to drop the lock across strwakeq and backenable */
2165 	if (backenab & QWANTWSYNC)
2166 		q->q_flag &= ~QWANTWSYNC;
2167 	if (backenab & (QWANTW|QB_WANTW)) {
2168 		if (band != 0)
2169 			qbp->qb_flag &= ~QB_WANTW;
2170 		else {
2171 			q->q_flag &= ~QWANTW;
2172 		}
2173 	}
2174 
2175 	if (freezer != curthread)
2176 		mutex_exit(QLOCK(q));
2177 
2178 	if (backenab & QWANTWSYNC)
2179 		strwakeq(q, QWANTWSYNC);
2180 	if (backenab & (QWANTW|QB_WANTW))
2181 		backenable(q, band);
2182 }
2183 
2184 /*
2185  * Remove a message from a queue.  The queue count and other
2186  * flow control parameters are adjusted and the back queue
2187  * enabled if necessary.
2188  *
2189  * rmvq can be called with the stream frozen, but other utility functions
2190  * holding QLOCK, and by streams modules without any locks/frozen.
2191  */
2192 void
2193 rmvq(queue_t *q, mblk_t *mp)
2194 {
2195 	ASSERT(mp != NULL);
2196 
2197 	rmvq_noenab(q, mp);
2198 	if (curthread != STREAM(q)->sd_freezer && MUTEX_HELD(QLOCK(q))) {
2199 		/*
2200 		 * qbackenable can handle a frozen stream but not a "random"
2201 		 * qlock being held. Drop lock across qbackenable.
2202 		 */
2203 		mutex_exit(QLOCK(q));
2204 		qbackenable(q, mp->b_band);
2205 		mutex_enter(QLOCK(q));
2206 	} else {
2207 		qbackenable(q, mp->b_band);
2208 	}
2209 }
2210 
2211 /*
2212  * Like rmvq() but without any backenabling.
2213  * This exists to handle SR_CONSOL_DATA in strrput().
2214  */
2215 void
2216 rmvq_noenab(queue_t *q, mblk_t *mp)
2217 {
2218 	int i;
2219 	qband_t *qbp = NULL;
2220 	kthread_id_t freezer;
2221 	int	bytecnt = 0, mblkcnt = 0;
2222 
2223 	freezer = STREAM(q)->sd_freezer;
2224 	if (freezer == curthread) {
2225 		ASSERT(frozenstr(q));
2226 		ASSERT(MUTEX_HELD(QLOCK(q)));
2227 	} else if (MUTEX_HELD(QLOCK(q))) {
2228 		/* Don't drop lock on exit */
2229 		freezer = curthread;
2230 	} else
2231 		mutex_enter(QLOCK(q));
2232 
2233 	ASSERT(mp->b_band <= q->q_nband);
2234 	if (mp->b_band != 0) {		/* Adjust band pointers */
2235 		ASSERT(q->q_bandp != NULL);
2236 		qbp = q->q_bandp;
2237 		i = mp->b_band;
2238 		while (--i > 0)
2239 			qbp = qbp->qb_next;
2240 		if (mp == qbp->qb_first) {
2241 			if (mp->b_next && mp->b_band == mp->b_next->b_band)
2242 				qbp->qb_first = mp->b_next;
2243 			else
2244 				qbp->qb_first = NULL;
2245 		}
2246 		if (mp == qbp->qb_last) {
2247 			if (mp->b_prev && mp->b_band == mp->b_prev->b_band)
2248 				qbp->qb_last = mp->b_prev;
2249 			else
2250 				qbp->qb_last = NULL;
2251 		}
2252 	}
2253 
2254 	/*
2255 	 * Remove the message from the list.
2256 	 */
2257 	if (mp->b_prev)
2258 		mp->b_prev->b_next = mp->b_next;
2259 	else
2260 		q->q_first = mp->b_next;
2261 	if (mp->b_next)
2262 		mp->b_next->b_prev = mp->b_prev;
2263 	else
2264 		q->q_last = mp->b_prev;
2265 	mp->b_next = NULL;
2266 	mp->b_prev = NULL;
2267 
2268 	/* Get the size of the message for q_count accounting */
2269 	bytecnt = mp_cont_len(mp, &mblkcnt);
2270 
2271 	if (mp->b_band == 0) {		/* Perform q_count accounting */
2272 		q->q_count -= bytecnt;
2273 		q->q_mblkcnt -= mblkcnt;
2274 		if (q->q_mblkcnt == 0 || ((q->q_count < q->q_hiwat) &&
2275 		    (q->q_mblkcnt < q->q_hiwat))) {
2276 			q->q_flag &= ~QFULL;
2277 		}
2278 	} else {			/* Perform qb_count accounting */
2279 		qbp->qb_count -= bytecnt;
2280 		qbp->qb_mblkcnt -= mblkcnt;
2281 		if (qbp->qb_mblkcnt == 0 || ((qbp->qb_count < qbp->qb_hiwat) &&
2282 		    (qbp->qb_mblkcnt < qbp->qb_hiwat))) {
2283 			qbp->qb_flag &= ~QB_FULL;
2284 		}
2285 	}
2286 	if (freezer != curthread)
2287 		mutex_exit(QLOCK(q));
2288 
2289 	STR_FTEVENT_MSG(mp, q, FTEV_RMVQ, NULL);
2290 }
2291 
2292 /*
2293  * Empty a queue.
2294  * If flag is set, remove all messages.  Otherwise, remove
2295  * only non-control messages.  If queue falls below its low
2296  * water mark, and QWANTW is set, enable the nearest upstream
2297  * service procedure.
2298  *
2299  * Historical note: when merging the M_FLUSH code in strrput with this
2300  * code one difference was discovered. flushq did not have a check
2301  * for q_lowat == 0 in the backenabling test.
2302  *
2303  * pcproto_flag specifies whether or not a M_PCPROTO message should be flushed
2304  * if one exists on the queue.
2305  */
2306 void
2307 flushq_common(queue_t *q, int flag, int pcproto_flag)
2308 {
2309 	mblk_t *mp, *nmp;
2310 	qband_t *qbp;
2311 	int backenab = 0;
2312 	unsigned char bpri;
2313 	unsigned char	qbf[NBAND];	/* band flushing backenable flags */
2314 
2315 	if (q->q_first == NULL)
2316 		return;
2317 
2318 	mutex_enter(QLOCK(q));
2319 	mp = q->q_first;
2320 	q->q_first = NULL;
2321 	q->q_last = NULL;
2322 	q->q_count = 0;
2323 	q->q_mblkcnt = 0;
2324 	for (qbp = q->q_bandp; qbp; qbp = qbp->qb_next) {
2325 		qbp->qb_first = NULL;
2326 		qbp->qb_last = NULL;
2327 		qbp->qb_count = 0;
2328 		qbp->qb_mblkcnt = 0;
2329 		qbp->qb_flag &= ~QB_FULL;
2330 	}
2331 	q->q_flag &= ~QFULL;
2332 	mutex_exit(QLOCK(q));
2333 	while (mp) {
2334 		nmp = mp->b_next;
2335 		mp->b_next = mp->b_prev = NULL;
2336 
2337 		STR_FTEVENT_MBLK(mp, q, FTEV_FLUSHQ, NULL);
2338 
2339 		if (pcproto_flag && (mp->b_datap->db_type == M_PCPROTO))
2340 			(void) putq(q, mp);
2341 		else if (flag || datamsg(mp->b_datap->db_type))
2342 			freemsg(mp);
2343 		else
2344 			(void) putq(q, mp);
2345 		mp = nmp;
2346 	}
2347 	bpri = 1;
2348 	mutex_enter(QLOCK(q));
2349 	for (qbp = q->q_bandp; qbp; qbp = qbp->qb_next) {
2350 		if ((qbp->qb_flag & QB_WANTW) &&
2351 		    (((qbp->qb_count < qbp->qb_lowat) &&
2352 		    (qbp->qb_mblkcnt < qbp->qb_lowat)) ||
2353 		    qbp->qb_lowat == 0)) {
2354 			qbp->qb_flag &= ~QB_WANTW;
2355 			backenab = 1;
2356 			qbf[bpri] = 1;
2357 		} else
2358 			qbf[bpri] = 0;
2359 		bpri++;
2360 	}
2361 	ASSERT(bpri == (unsigned char)(q->q_nband + 1));
2362 	if ((q->q_flag & QWANTW) &&
2363 	    (((q->q_count < q->q_lowat) &&
2364 	    (q->q_mblkcnt < q->q_lowat)) || q->q_lowat == 0)) {
2365 		q->q_flag &= ~QWANTW;
2366 		backenab = 1;
2367 		qbf[0] = 1;
2368 	} else
2369 		qbf[0] = 0;
2370 
2371 	/*
2372 	 * If any band can now be written to, and there is a writer
2373 	 * for that band, then backenable the closest service procedure.
2374 	 */
2375 	if (backenab) {
2376 		mutex_exit(QLOCK(q));
2377 		for (bpri = q->q_nband; bpri != 0; bpri--)
2378 			if (qbf[bpri])
2379 				backenable(q, bpri);
2380 		if (qbf[0])
2381 			backenable(q, 0);
2382 	} else
2383 		mutex_exit(QLOCK(q));
2384 }
2385 
2386 /*
2387  * The real flushing takes place in flushq_common. This is done so that
2388  * a flag which specifies whether or not M_PCPROTO messages should be flushed
2389  * or not. Currently the only place that uses this flag is the stream head.
2390  */
2391 void
2392 flushq(queue_t *q, int flag)
2393 {
2394 	flushq_common(q, flag, 0);
2395 }
2396 
2397 /*
2398  * Flush the queue of messages of the given priority band.
2399  * There is some duplication of code between flushq and flushband.
2400  * This is because we want to optimize the code as much as possible.
2401  * The assumption is that there will be more messages in the normal
2402  * (priority 0) band than in any other.
2403  *
2404  * Historical note: when merging the M_FLUSH code in strrput with this
2405  * code one difference was discovered. flushband had an extra check for
2406  * did not have a check for (mp->b_datap->db_type < QPCTL) in the band 0
2407  * case. That check does not match the man page for flushband and was not
2408  * in the strrput flush code hence it was removed.
2409  */
2410 void
2411 flushband(queue_t *q, unsigned char pri, int flag)
2412 {
2413 	mblk_t *mp;
2414 	mblk_t *nmp;
2415 	mblk_t *last;
2416 	qband_t *qbp;
2417 	int band;
2418 
2419 	ASSERT((flag == FLUSHDATA) || (flag == FLUSHALL));
2420 	if (pri > q->q_nband) {
2421 		return;
2422 	}
2423 	mutex_enter(QLOCK(q));
2424 	if (pri == 0) {
2425 		mp = q->q_first;
2426 		q->q_first = NULL;
2427 		q->q_last = NULL;
2428 		q->q_count = 0;
2429 		q->q_mblkcnt = 0;
2430 		for (qbp = q->q_bandp; qbp; qbp = qbp->qb_next) {
2431 			qbp->qb_first = NULL;
2432 			qbp->qb_last = NULL;
2433 			qbp->qb_count = 0;
2434 			qbp->qb_mblkcnt = 0;
2435 			qbp->qb_flag &= ~QB_FULL;
2436 		}
2437 		q->q_flag &= ~QFULL;
2438 		mutex_exit(QLOCK(q));
2439 		while (mp) {
2440 			nmp = mp->b_next;
2441 			mp->b_next = mp->b_prev = NULL;
2442 			if ((mp->b_band == 0) &&
2443 			    ((flag == FLUSHALL) ||
2444 			    datamsg(mp->b_datap->db_type)))
2445 				freemsg(mp);
2446 			else
2447 				(void) putq(q, mp);
2448 			mp = nmp;
2449 		}
2450 		mutex_enter(QLOCK(q));
2451 		if ((q->q_flag & QWANTW) &&
2452 		    (((q->q_count < q->q_lowat) &&
2453 		    (q->q_mblkcnt < q->q_lowat)) || q->q_lowat == 0)) {
2454 			q->q_flag &= ~QWANTW;
2455 			mutex_exit(QLOCK(q));
2456 
2457 			backenable(q, pri);
2458 		} else
2459 			mutex_exit(QLOCK(q));
2460 	} else {	/* pri != 0 */
2461 		boolean_t flushed = B_FALSE;
2462 		band = pri;
2463 
2464 		ASSERT(MUTEX_HELD(QLOCK(q)));
2465 		qbp = q->q_bandp;
2466 		while (--band > 0)
2467 			qbp = qbp->qb_next;
2468 		mp = qbp->qb_first;
2469 		if (mp == NULL) {
2470 			mutex_exit(QLOCK(q));
2471 			return;
2472 		}
2473 		last = qbp->qb_last->b_next;
2474 		/*
2475 		 * rmvq_noenab() and freemsg() are called for each mblk that
2476 		 * meets the criteria.  The loop is executed until the last
2477 		 * mblk has been processed.
2478 		 */
2479 		while (mp != last) {
2480 			ASSERT(mp->b_band == pri);
2481 			nmp = mp->b_next;
2482 			if (flag == FLUSHALL || datamsg(mp->b_datap->db_type)) {
2483 				rmvq_noenab(q, mp);
2484 				freemsg(mp);
2485 				flushed = B_TRUE;
2486 			}
2487 			mp = nmp;
2488 		}
2489 		mutex_exit(QLOCK(q));
2490 
2491 		/*
2492 		 * If any mblk(s) has been freed, we know that qbackenable()
2493 		 * will need to be called.
2494 		 */
2495 		if (flushed)
2496 			qbackenable(q, pri);
2497 	}
2498 }
2499 
2500 /*
2501  * Return 1 if the queue is not full.  If the queue is full, return
2502  * 0 (may not put message) and set QWANTW flag (caller wants to write
2503  * to the queue).
2504  */
2505 int
2506 canput(queue_t *q)
2507 {
2508 	TRACE_1(TR_FAC_STREAMS_FR, TR_CANPUT_IN, "canput:%p", q);
2509 
2510 	/* this is for loopback transports, they should not do a canput */
2511 	ASSERT(STRMATED(q->q_stream) || STREAM(q) == STREAM(q->q_nfsrv));
2512 
2513 	/* Find next forward module that has a service procedure */
2514 	q = q->q_nfsrv;
2515 
2516 	if (!(q->q_flag & QFULL)) {
2517 		TRACE_2(TR_FAC_STREAMS_FR, TR_CANPUT_OUT, "canput:%p %d", q, 1);
2518 		return (1);
2519 	}
2520 	mutex_enter(QLOCK(q));
2521 	if (q->q_flag & QFULL) {
2522 		q->q_flag |= QWANTW;
2523 		mutex_exit(QLOCK(q));
2524 		TRACE_2(TR_FAC_STREAMS_FR, TR_CANPUT_OUT, "canput:%p %d", q, 0);
2525 		return (0);
2526 	}
2527 	mutex_exit(QLOCK(q));
2528 	TRACE_2(TR_FAC_STREAMS_FR, TR_CANPUT_OUT, "canput:%p %d", q, 1);
2529 	return (1);
2530 }
2531 
2532 /*
2533  * This is the new canput for use with priority bands.  Return 1 if the
2534  * band is not full.  If the band is full, return 0 (may not put message)
2535  * and set QWANTW(QB_WANTW) flag for zero(non-zero) band (caller wants to
2536  * write to the queue).
2537  */
2538 int
2539 bcanput(queue_t *q, unsigned char pri)
2540 {
2541 	qband_t *qbp;
2542 
2543 	TRACE_2(TR_FAC_STREAMS_FR, TR_BCANPUT_IN, "bcanput:%p %p", q, pri);
2544 	if (!q)
2545 		return (0);
2546 
2547 	/* Find next forward module that has a service procedure */
2548 	q = q->q_nfsrv;
2549 
2550 	mutex_enter(QLOCK(q));
2551 	if (pri == 0) {
2552 		if (q->q_flag & QFULL) {
2553 			q->q_flag |= QWANTW;
2554 			mutex_exit(QLOCK(q));
2555 			TRACE_3(TR_FAC_STREAMS_FR, TR_BCANPUT_OUT,
2556 			    "bcanput:%p %X %d", q, pri, 0);
2557 			return (0);
2558 		}
2559 	} else {	/* pri != 0 */
2560 		if (pri > q->q_nband) {
2561 			/*
2562 			 * No band exists yet, so return success.
2563 			 */
2564 			mutex_exit(QLOCK(q));
2565 			TRACE_3(TR_FAC_STREAMS_FR, TR_BCANPUT_OUT,
2566 			    "bcanput:%p %X %d", q, pri, 1);
2567 			return (1);
2568 		}
2569 		qbp = q->q_bandp;
2570 		while (--pri)
2571 			qbp = qbp->qb_next;
2572 		if (qbp->qb_flag & QB_FULL) {
2573 			qbp->qb_flag |= QB_WANTW;
2574 			mutex_exit(QLOCK(q));
2575 			TRACE_3(TR_FAC_STREAMS_FR, TR_BCANPUT_OUT,
2576 			    "bcanput:%p %X %d", q, pri, 0);
2577 			return (0);
2578 		}
2579 	}
2580 	mutex_exit(QLOCK(q));
2581 	TRACE_3(TR_FAC_STREAMS_FR, TR_BCANPUT_OUT,
2582 	    "bcanput:%p %X %d", q, pri, 1);
2583 	return (1);
2584 }
2585 
2586 /*
2587  * Put a message on a queue.
2588  *
2589  * Messages are enqueued on a priority basis.  The priority classes
2590  * are HIGH PRIORITY (type >= QPCTL), PRIORITY (type < QPCTL && band > 0),
2591  * and B_NORMAL (type < QPCTL && band == 0).
2592  *
2593  * Add appropriate weighted data block sizes to queue count.
2594  * If queue hits high water mark then set QFULL flag.
2595  *
2596  * If QNOENAB is not set (putq is allowed to enable the queue),
2597  * enable the queue only if the message is PRIORITY,
2598  * or the QWANTR flag is set (indicating that the service procedure
2599  * is ready to read the queue.  This implies that a service
2600  * procedure must NEVER put a high priority message back on its own
2601  * queue, as this would result in an infinite loop (!).
2602  */
2603 int
2604 putq(queue_t *q, mblk_t *bp)
2605 {
2606 	mblk_t *tmp;
2607 	qband_t *qbp = NULL;
2608 	int mcls = (int)queclass(bp);
2609 	kthread_id_t freezer;
2610 	int	bytecnt = 0, mblkcnt = 0;
2611 
2612 	freezer = STREAM(q)->sd_freezer;
2613 	if (freezer == curthread) {
2614 		ASSERT(frozenstr(q));
2615 		ASSERT(MUTEX_HELD(QLOCK(q)));
2616 	} else
2617 		mutex_enter(QLOCK(q));
2618 
2619 	/*
2620 	 * Make sanity checks and if qband structure is not yet
2621 	 * allocated, do so.
2622 	 */
2623 	if (mcls == QPCTL) {
2624 		if (bp->b_band != 0)
2625 			bp->b_band = 0;		/* force to be correct */
2626 	} else if (bp->b_band != 0) {
2627 		int i;
2628 		qband_t **qbpp;
2629 
2630 		if (bp->b_band > q->q_nband) {
2631 
2632 			/*
2633 			 * The qband structure for this priority band is
2634 			 * not on the queue yet, so we have to allocate
2635 			 * one on the fly.  It would be wasteful to
2636 			 * associate the qband structures with every
2637 			 * queue when the queues are allocated.  This is
2638 			 * because most queues will only need the normal
2639 			 * band of flow which can be described entirely
2640 			 * by the queue itself.
2641 			 */
2642 			qbpp = &q->q_bandp;
2643 			while (*qbpp)
2644 				qbpp = &(*qbpp)->qb_next;
2645 			while (bp->b_band > q->q_nband) {
2646 				if ((*qbpp = allocband()) == NULL) {
2647 					if (freezer != curthread)
2648 						mutex_exit(QLOCK(q));
2649 					return (0);
2650 				}
2651 				(*qbpp)->qb_hiwat = q->q_hiwat;
2652 				(*qbpp)->qb_lowat = q->q_lowat;
2653 				q->q_nband++;
2654 				qbpp = &(*qbpp)->qb_next;
2655 			}
2656 		}
2657 		ASSERT(MUTEX_HELD(QLOCK(q)));
2658 		qbp = q->q_bandp;
2659 		i = bp->b_band;
2660 		while (--i)
2661 			qbp = qbp->qb_next;
2662 	}
2663 
2664 	/*
2665 	 * If queue is empty, add the message and initialize the pointers.
2666 	 * Otherwise, adjust message pointers and queue pointers based on
2667 	 * the type of the message and where it belongs on the queue.  Some
2668 	 * code is duplicated to minimize the number of conditionals and
2669 	 * hopefully minimize the amount of time this routine takes.
2670 	 */
2671 	if (!q->q_first) {
2672 		bp->b_next = NULL;
2673 		bp->b_prev = NULL;
2674 		q->q_first = bp;
2675 		q->q_last = bp;
2676 		if (qbp) {
2677 			qbp->qb_first = bp;
2678 			qbp->qb_last = bp;
2679 		}
2680 	} else if (!qbp) {	/* bp->b_band == 0 */
2681 
2682 		/*
2683 		 * If queue class of message is less than or equal to
2684 		 * that of the last one on the queue, tack on to the end.
2685 		 */
2686 		tmp = q->q_last;
2687 		if (mcls <= (int)queclass(tmp)) {
2688 			bp->b_next = NULL;
2689 			bp->b_prev = tmp;
2690 			tmp->b_next = bp;
2691 			q->q_last = bp;
2692 		} else {
2693 			tmp = q->q_first;
2694 			while ((int)queclass(tmp) >= mcls)
2695 				tmp = tmp->b_next;
2696 
2697 			/*
2698 			 * Insert bp before tmp.
2699 			 */
2700 			bp->b_next = tmp;
2701 			bp->b_prev = tmp->b_prev;
2702 			if (tmp->b_prev)
2703 				tmp->b_prev->b_next = bp;
2704 			else
2705 				q->q_first = bp;
2706 			tmp->b_prev = bp;
2707 		}
2708 	} else {		/* bp->b_band != 0 */
2709 		if (qbp->qb_first) {
2710 			tmp = qbp->qb_last;
2711 
2712 			/*
2713 			 * Insert bp after the last message in this band.
2714 			 */
2715 			bp->b_next = tmp->b_next;
2716 			if (tmp->b_next)
2717 				tmp->b_next->b_prev = bp;
2718 			else
2719 				q->q_last = bp;
2720 			bp->b_prev = tmp;
2721 			tmp->b_next = bp;
2722 		} else {
2723 			tmp = q->q_last;
2724 			if ((mcls < (int)queclass(tmp)) ||
2725 			    (bp->b_band <= tmp->b_band)) {
2726 
2727 				/*
2728 				 * Tack bp on end of queue.
2729 				 */
2730 				bp->b_next = NULL;
2731 				bp->b_prev = tmp;
2732 				tmp->b_next = bp;
2733 				q->q_last = bp;
2734 			} else {
2735 				tmp = q->q_first;
2736 				while (tmp->b_datap->db_type >= QPCTL)
2737 					tmp = tmp->b_next;
2738 				while (tmp->b_band >= bp->b_band)
2739 					tmp = tmp->b_next;
2740 
2741 				/*
2742 				 * Insert bp before tmp.
2743 				 */
2744 				bp->b_next = tmp;
2745 				bp->b_prev = tmp->b_prev;
2746 				if (tmp->b_prev)
2747 					tmp->b_prev->b_next = bp;
2748 				else
2749 					q->q_first = bp;
2750 				tmp->b_prev = bp;
2751 			}
2752 			qbp->qb_first = bp;
2753 		}
2754 		qbp->qb_last = bp;
2755 	}
2756 
2757 	/* Get message byte count for q_count accounting */
2758 	bytecnt = mp_cont_len(bp, &mblkcnt);
2759 
2760 	if (qbp) {
2761 		qbp->qb_count += bytecnt;
2762 		qbp->qb_mblkcnt += mblkcnt;
2763 		if ((qbp->qb_count >= qbp->qb_hiwat) ||
2764 		    (qbp->qb_mblkcnt >= qbp->qb_hiwat)) {
2765 			qbp->qb_flag |= QB_FULL;
2766 		}
2767 	} else {
2768 		q->q_count += bytecnt;
2769 		q->q_mblkcnt += mblkcnt;
2770 		if ((q->q_count >= q->q_hiwat) ||
2771 		    (q->q_mblkcnt >= q->q_hiwat)) {
2772 			q->q_flag |= QFULL;
2773 		}
2774 	}
2775 
2776 	STR_FTEVENT_MSG(bp, q, FTEV_PUTQ, NULL);
2777 
2778 	if ((mcls > QNORM) ||
2779 	    (canenable(q) && (q->q_flag & QWANTR || bp->b_band)))
2780 		qenable_locked(q);
2781 	ASSERT(MUTEX_HELD(QLOCK(q)));
2782 	if (freezer != curthread)
2783 		mutex_exit(QLOCK(q));
2784 
2785 	return (1);
2786 }
2787 
2788 /*
2789  * Put stuff back at beginning of Q according to priority order.
2790  * See comment on putq above for details.
2791  */
2792 int
2793 putbq(queue_t *q, mblk_t *bp)
2794 {
2795 	mblk_t *tmp;
2796 	qband_t *qbp = NULL;
2797 	int mcls = (int)queclass(bp);
2798 	kthread_id_t freezer;
2799 	int	bytecnt = 0, mblkcnt = 0;
2800 
2801 	ASSERT(q && bp);
2802 	ASSERT(bp->b_next == NULL);
2803 	freezer = STREAM(q)->sd_freezer;
2804 	if (freezer == curthread) {
2805 		ASSERT(frozenstr(q));
2806 		ASSERT(MUTEX_HELD(QLOCK(q)));
2807 	} else
2808 		mutex_enter(QLOCK(q));
2809 
2810 	/*
2811 	 * Make sanity checks and if qband structure is not yet
2812 	 * allocated, do so.
2813 	 */
2814 	if (mcls == QPCTL) {
2815 		if (bp->b_band != 0)
2816 			bp->b_band = 0;		/* force to be correct */
2817 	} else if (bp->b_band != 0) {
2818 		int i;
2819 		qband_t **qbpp;
2820 
2821 		if (bp->b_band > q->q_nband) {
2822 			qbpp = &q->q_bandp;
2823 			while (*qbpp)
2824 				qbpp = &(*qbpp)->qb_next;
2825 			while (bp->b_band > q->q_nband) {
2826 				if ((*qbpp = allocband()) == NULL) {
2827 					if (freezer != curthread)
2828 						mutex_exit(QLOCK(q));
2829 					return (0);
2830 				}
2831 				(*qbpp)->qb_hiwat = q->q_hiwat;
2832 				(*qbpp)->qb_lowat = q->q_lowat;
2833 				q->q_nband++;
2834 				qbpp = &(*qbpp)->qb_next;
2835 			}
2836 		}
2837 		qbp = q->q_bandp;
2838 		i = bp->b_band;
2839 		while (--i)
2840 			qbp = qbp->qb_next;
2841 	}
2842 
2843 	/*
2844 	 * If queue is empty or if message is high priority,
2845 	 * place on the front of the queue.
2846 	 */
2847 	tmp = q->q_first;
2848 	if ((!tmp) || (mcls == QPCTL)) {
2849 		bp->b_next = tmp;
2850 		if (tmp)
2851 			tmp->b_prev = bp;
2852 		else
2853 			q->q_last = bp;
2854 		q->q_first = bp;
2855 		bp->b_prev = NULL;
2856 		if (qbp) {
2857 			qbp->qb_first = bp;
2858 			qbp->qb_last = bp;
2859 		}
2860 	} else if (qbp) {	/* bp->b_band != 0 */
2861 		tmp = qbp->qb_first;
2862 		if (tmp) {
2863 
2864 			/*
2865 			 * Insert bp before the first message in this band.
2866 			 */
2867 			bp->b_next = tmp;
2868 			bp->b_prev = tmp->b_prev;
2869 			if (tmp->b_prev)
2870 				tmp->b_prev->b_next = bp;
2871 			else
2872 				q->q_first = bp;
2873 			tmp->b_prev = bp;
2874 		} else {
2875 			tmp = q->q_last;
2876 			if ((mcls < (int)queclass(tmp)) ||
2877 			    (bp->b_band < tmp->b_band)) {
2878 
2879 				/*
2880 				 * Tack bp on end of queue.
2881 				 */
2882 				bp->b_next = NULL;
2883 				bp->b_prev = tmp;
2884 				tmp->b_next = bp;
2885 				q->q_last = bp;
2886 			} else {
2887 				tmp = q->q_first;
2888 				while (tmp->b_datap->db_type >= QPCTL)
2889 					tmp = tmp->b_next;
2890 				while (tmp->b_band > bp->b_band)
2891 					tmp = tmp->b_next;
2892 
2893 				/*
2894 				 * Insert bp before tmp.
2895 				 */
2896 				bp->b_next = tmp;
2897 				bp->b_prev = tmp->b_prev;
2898 				if (tmp->b_prev)
2899 					tmp->b_prev->b_next = bp;
2900 				else
2901 					q->q_first = bp;
2902 				tmp->b_prev = bp;
2903 			}
2904 			qbp->qb_last = bp;
2905 		}
2906 		qbp->qb_first = bp;
2907 	} else {		/* bp->b_band == 0 && !QPCTL */
2908 
2909 		/*
2910 		 * If the queue class or band is less than that of the last
2911 		 * message on the queue, tack bp on the end of the queue.
2912 		 */
2913 		tmp = q->q_last;
2914 		if ((mcls < (int)queclass(tmp)) || (bp->b_band < tmp->b_band)) {
2915 			bp->b_next = NULL;
2916 			bp->b_prev = tmp;
2917 			tmp->b_next = bp;
2918 			q->q_last = bp;
2919 		} else {
2920 			tmp = q->q_first;
2921 			while (tmp->b_datap->db_type >= QPCTL)
2922 				tmp = tmp->b_next;
2923 			while (tmp->b_band > bp->b_band)
2924 				tmp = tmp->b_next;
2925 
2926 			/*
2927 			 * Insert bp before tmp.
2928 			 */
2929 			bp->b_next = tmp;
2930 			bp->b_prev = tmp->b_prev;
2931 			if (tmp->b_prev)
2932 				tmp->b_prev->b_next = bp;
2933 			else
2934 				q->q_first = bp;
2935 			tmp->b_prev = bp;
2936 		}
2937 	}
2938 
2939 	/* Get message byte count for q_count accounting */
2940 	bytecnt = mp_cont_len(bp, &mblkcnt);
2941 
2942 	if (qbp) {
2943 		qbp->qb_count += bytecnt;
2944 		qbp->qb_mblkcnt += mblkcnt;
2945 		if ((qbp->qb_count >= qbp->qb_hiwat) ||
2946 		    (qbp->qb_mblkcnt >= qbp->qb_hiwat)) {
2947 			qbp->qb_flag |= QB_FULL;
2948 		}
2949 	} else {
2950 		q->q_count += bytecnt;
2951 		q->q_mblkcnt += mblkcnt;
2952 		if ((q->q_count >= q->q_hiwat) ||
2953 		    (q->q_mblkcnt >= q->q_hiwat)) {
2954 			q->q_flag |= QFULL;
2955 		}
2956 	}
2957 
2958 	STR_FTEVENT_MSG(bp, q, FTEV_PUTBQ, NULL);
2959 
2960 	if ((mcls > QNORM) || (canenable(q) && (q->q_flag & QWANTR)))
2961 		qenable_locked(q);
2962 	ASSERT(MUTEX_HELD(QLOCK(q)));
2963 	if (freezer != curthread)
2964 		mutex_exit(QLOCK(q));
2965 
2966 	return (1);
2967 }
2968 
2969 /*
2970  * Insert a message before an existing message on the queue.  If the
2971  * existing message is NULL, the new messages is placed on the end of
2972  * the queue.  The queue class of the new message is ignored.  However,
2973  * the priority band of the new message must adhere to the following
2974  * ordering:
2975  *
2976  *	emp->b_prev->b_band >= mp->b_band >= emp->b_band.
2977  *
2978  * All flow control parameters are updated.
2979  *
2980  * insq can be called with the stream frozen, but other utility functions
2981  * holding QLOCK, and by streams modules without any locks/frozen.
2982  */
2983 int
2984 insq(queue_t *q, mblk_t *emp, mblk_t *mp)
2985 {
2986 	mblk_t *tmp;
2987 	qband_t *qbp = NULL;
2988 	int mcls = (int)queclass(mp);
2989 	kthread_id_t freezer;
2990 	int	bytecnt = 0, mblkcnt = 0;
2991 
2992 	freezer = STREAM(q)->sd_freezer;
2993 	if (freezer == curthread) {
2994 		ASSERT(frozenstr(q));
2995 		ASSERT(MUTEX_HELD(QLOCK(q)));
2996 	} else if (MUTEX_HELD(QLOCK(q))) {
2997 		/* Don't drop lock on exit */
2998 		freezer = curthread;
2999 	} else
3000 		mutex_enter(QLOCK(q));
3001 
3002 	if (mcls == QPCTL) {
3003 		if (mp->b_band != 0)
3004 			mp->b_band = 0;		/* force to be correct */
3005 		if (emp && emp->b_prev &&
3006 		    (emp->b_prev->b_datap->db_type < QPCTL))
3007 			goto badord;
3008 	}
3009 	if (emp) {
3010 		if (((mcls == QNORM) && (mp->b_band < emp->b_band)) ||
3011 		    (emp->b_prev && (emp->b_prev->b_datap->db_type < QPCTL) &&
3012 		    (emp->b_prev->b_band < mp->b_band))) {
3013 			goto badord;
3014 		}
3015 	} else {
3016 		tmp = q->q_last;
3017 		if (tmp && (mcls == QNORM) && (mp->b_band > tmp->b_band)) {
3018 badord:
3019 			cmn_err(CE_WARN,
3020 			    "insq: attempt to insert message out of order "
3021 			    "on q %p", (void *)q);
3022 			if (freezer != curthread)
3023 				mutex_exit(QLOCK(q));
3024 			return (0);
3025 		}
3026 	}
3027 
3028 	if (mp->b_band != 0) {
3029 		int i;
3030 		qband_t **qbpp;
3031 
3032 		if (mp->b_band > q->q_nband) {
3033 			qbpp = &q->q_bandp;
3034 			while (*qbpp)
3035 				qbpp = &(*qbpp)->qb_next;
3036 			while (mp->b_band > q->q_nband) {
3037 				if ((*qbpp = allocband()) == NULL) {
3038 					if (freezer != curthread)
3039 						mutex_exit(QLOCK(q));
3040 					return (0);
3041 				}
3042 				(*qbpp)->qb_hiwat = q->q_hiwat;
3043 				(*qbpp)->qb_lowat = q->q_lowat;
3044 				q->q_nband++;
3045 				qbpp = &(*qbpp)->qb_next;
3046 			}
3047 		}
3048 		qbp = q->q_bandp;
3049 		i = mp->b_band;
3050 		while (--i)
3051 			qbp = qbp->qb_next;
3052 	}
3053 
3054 	if ((mp->b_next = emp) != NULL) {
3055 		if ((mp->b_prev = emp->b_prev) != NULL)
3056 			emp->b_prev->b_next = mp;
3057 		else
3058 			q->q_first = mp;
3059 		emp->b_prev = mp;
3060 	} else {
3061 		if ((mp->b_prev = q->q_last) != NULL)
3062 			q->q_last->b_next = mp;
3063 		else
3064 			q->q_first = mp;
3065 		q->q_last = mp;
3066 	}
3067 
3068 	/* Get mblk and byte count for q_count accounting */
3069 	bytecnt = mp_cont_len(mp, &mblkcnt);
3070 
3071 	if (qbp) {	/* adjust qband pointers and count */
3072 		if (!qbp->qb_first) {
3073 			qbp->qb_first = mp;
3074 			qbp->qb_last = mp;
3075 		} else {
3076 			if (mp->b_prev == NULL || (mp->b_prev != NULL &&
3077 			    (mp->b_prev->b_band != mp->b_band)))
3078 				qbp->qb_first = mp;
3079 			else if (mp->b_next == NULL || (mp->b_next != NULL &&
3080 			    (mp->b_next->b_band != mp->b_band)))
3081 				qbp->qb_last = mp;
3082 		}
3083 		qbp->qb_count += bytecnt;
3084 		qbp->qb_mblkcnt += mblkcnt;
3085 		if ((qbp->qb_count >= qbp->qb_hiwat) ||
3086 		    (qbp->qb_mblkcnt >= qbp->qb_hiwat)) {
3087 			qbp->qb_flag |= QB_FULL;
3088 		}
3089 	} else {
3090 		q->q_count += bytecnt;
3091 		q->q_mblkcnt += mblkcnt;
3092 		if ((q->q_count >= q->q_hiwat) ||
3093 		    (q->q_mblkcnt >= q->q_hiwat)) {
3094 			q->q_flag |= QFULL;
3095 		}
3096 	}
3097 
3098 	STR_FTEVENT_MSG(mp, q, FTEV_INSQ, NULL);
3099 
3100 	if (canenable(q) && (q->q_flag & QWANTR))
3101 		qenable_locked(q);
3102 
3103 	ASSERT(MUTEX_HELD(QLOCK(q)));
3104 	if (freezer != curthread)
3105 		mutex_exit(QLOCK(q));
3106 
3107 	return (1);
3108 }
3109 
3110 /*
3111  * Create and put a control message on queue.
3112  */
3113 int
3114 putctl(queue_t *q, int type)
3115 {
3116 	mblk_t *bp;
3117 
3118 	if ((datamsg(type) && (type != M_DELAY)) ||
3119 	    (bp = allocb_tryhard(0)) == NULL)
3120 		return (0);
3121 	bp->b_datap->db_type = (unsigned char) type;
3122 
3123 	put(q, bp);
3124 
3125 	return (1);
3126 }
3127 
3128 /*
3129  * Control message with a single-byte parameter
3130  */
3131 int
3132 putctl1(queue_t *q, int type, int param)
3133 {
3134 	mblk_t *bp;
3135 
3136 	if ((datamsg(type) && (type != M_DELAY)) ||
3137 	    (bp = allocb_tryhard(1)) == NULL)
3138 		return (0);
3139 	bp->b_datap->db_type = (unsigned char)type;
3140 	*bp->b_wptr++ = (unsigned char)param;
3141 
3142 	put(q, bp);
3143 
3144 	return (1);
3145 }
3146 
3147 int
3148 putnextctl1(queue_t *q, int type, int param)
3149 {
3150 	mblk_t *bp;
3151 
3152 	if ((datamsg(type) && (type != M_DELAY)) ||
3153 	    ((bp = allocb_tryhard(1)) == NULL))
3154 		return (0);
3155 
3156 	bp->b_datap->db_type = (unsigned char)type;
3157 	*bp->b_wptr++ = (unsigned char)param;
3158 
3159 	putnext(q, bp);
3160 
3161 	return (1);
3162 }
3163 
3164 int
3165 putnextctl(queue_t *q, int type)
3166 {
3167 	mblk_t *bp;
3168 
3169 	if ((datamsg(type) && (type != M_DELAY)) ||
3170 	    ((bp = allocb_tryhard(0)) == NULL))
3171 		return (0);
3172 	bp->b_datap->db_type = (unsigned char)type;
3173 
3174 	putnext(q, bp);
3175 
3176 	return (1);
3177 }
3178 
3179 /*
3180  * Return the queue upstream from this one
3181  */
3182 queue_t *
3183 backq(queue_t *q)
3184 {
3185 	q = _OTHERQ(q);
3186 	if (q->q_next) {
3187 		q = q->q_next;
3188 		return (_OTHERQ(q));
3189 	}
3190 	return (NULL);
3191 }
3192 
3193 /*
3194  * Send a block back up the queue in reverse from this
3195  * one (e.g. to respond to ioctls)
3196  */
3197 void
3198 qreply(queue_t *q, mblk_t *bp)
3199 {
3200 	ASSERT(q && bp);
3201 
3202 	putnext(_OTHERQ(q), bp);
3203 }
3204 
3205 /*
3206  * Streams Queue Scheduling
3207  *
3208  * Queues are enabled through qenable() when they have messages to
3209  * process.  They are serviced by queuerun(), which runs each enabled
3210  * queue's service procedure.  The call to queuerun() is processor
3211  * dependent - the general principle is that it be run whenever a queue
3212  * is enabled but before returning to user level.  For system calls,
3213  * the function runqueues() is called if their action causes a queue
3214  * to be enabled.  For device interrupts, queuerun() should be
3215  * called before returning from the last level of interrupt.  Beyond
3216  * this, no timing assumptions should be made about queue scheduling.
3217  */
3218 
3219 /*
3220  * Enable a queue: put it on list of those whose service procedures are
3221  * ready to run and set up the scheduling mechanism.
3222  * The broadcast is done outside the mutex -> to avoid the woken thread
3223  * from contending with the mutex. This is OK 'cos the queue has been
3224  * enqueued on the runlist and flagged safely at this point.
3225  */
3226 void
3227 qenable(queue_t *q)
3228 {
3229 	mutex_enter(QLOCK(q));
3230 	qenable_locked(q);
3231 	mutex_exit(QLOCK(q));
3232 }
3233 /*
3234  * Return number of messages on queue
3235  */
3236 int
3237 qsize(queue_t *qp)
3238 {
3239 	int count = 0;
3240 	mblk_t *mp;
3241 
3242 	mutex_enter(QLOCK(qp));
3243 	for (mp = qp->q_first; mp; mp = mp->b_next)
3244 		count++;
3245 	mutex_exit(QLOCK(qp));
3246 	return (count);
3247 }
3248 
3249 /*
3250  * noenable - set queue so that putq() will not enable it.
3251  * enableok - set queue so that putq() can enable it.
3252  */
3253 void
3254 noenable(queue_t *q)
3255 {
3256 	mutex_enter(QLOCK(q));
3257 	q->q_flag |= QNOENB;
3258 	mutex_exit(QLOCK(q));
3259 }
3260 
3261 void
3262 enableok(queue_t *q)
3263 {
3264 	mutex_enter(QLOCK(q));
3265 	q->q_flag &= ~QNOENB;
3266 	mutex_exit(QLOCK(q));
3267 }
3268 
3269 /*
3270  * Set queue fields.
3271  */
3272 int
3273 strqset(queue_t *q, qfields_t what, unsigned char pri, intptr_t val)
3274 {
3275 	qband_t *qbp = NULL;
3276 	queue_t	*wrq;
3277 	int error = 0;
3278 	kthread_id_t freezer;
3279 
3280 	freezer = STREAM(q)->sd_freezer;
3281 	if (freezer == curthread) {
3282 		ASSERT(frozenstr(q));
3283 		ASSERT(MUTEX_HELD(QLOCK(q)));
3284 	} else
3285 		mutex_enter(QLOCK(q));
3286 
3287 	if (what >= QBAD) {
3288 		error = EINVAL;
3289 		goto done;
3290 	}
3291 	if (pri != 0) {
3292 		int i;
3293 		qband_t **qbpp;
3294 
3295 		if (pri > q->q_nband) {
3296 			qbpp = &q->q_bandp;
3297 			while (*qbpp)
3298 				qbpp = &(*qbpp)->qb_next;
3299 			while (pri > q->q_nband) {
3300 				if ((*qbpp = allocband()) == NULL) {
3301 					error = EAGAIN;
3302 					goto done;
3303 				}
3304 				(*qbpp)->qb_hiwat = q->q_hiwat;
3305 				(*qbpp)->qb_lowat = q->q_lowat;
3306 				q->q_nband++;
3307 				qbpp = &(*qbpp)->qb_next;
3308 			}
3309 		}
3310 		qbp = q->q_bandp;
3311 		i = pri;
3312 		while (--i)
3313 			qbp = qbp->qb_next;
3314 	}
3315 	switch (what) {
3316 
3317 	case QHIWAT:
3318 		if (qbp)
3319 			qbp->qb_hiwat = (size_t)val;
3320 		else
3321 			q->q_hiwat = (size_t)val;
3322 		break;
3323 
3324 	case QLOWAT:
3325 		if (qbp)
3326 			qbp->qb_lowat = (size_t)val;
3327 		else
3328 			q->q_lowat = (size_t)val;
3329 		break;
3330 
3331 	case QMAXPSZ:
3332 		if (qbp)
3333 			error = EINVAL;
3334 		else
3335 			q->q_maxpsz = (ssize_t)val;
3336 
3337 		/*
3338 		 * Performance concern, strwrite looks at the module below
3339 		 * the stream head for the maxpsz each time it does a write
3340 		 * we now cache it at the stream head.  Check to see if this
3341 		 * queue is sitting directly below the stream head.
3342 		 */
3343 		wrq = STREAM(q)->sd_wrq;
3344 		if (q != wrq->q_next)
3345 			break;
3346 
3347 		/*
3348 		 * If the stream is not frozen drop the current QLOCK and
3349 		 * acquire the sd_wrq QLOCK which protects sd_qn_*
3350 		 */
3351 		if (freezer != curthread) {
3352 			mutex_exit(QLOCK(q));
3353 			mutex_enter(QLOCK(wrq));
3354 		}
3355 		ASSERT(MUTEX_HELD(QLOCK(wrq)));
3356 
3357 		if (strmsgsz != 0) {
3358 			if (val == INFPSZ)
3359 				val = strmsgsz;
3360 			else  {
3361 				if (STREAM(q)->sd_vnode->v_type == VFIFO)
3362 					val = MIN(PIPE_BUF, val);
3363 				else
3364 					val = MIN(strmsgsz, val);
3365 			}
3366 		}
3367 		STREAM(q)->sd_qn_maxpsz = val;
3368 		if (freezer != curthread) {
3369 			mutex_exit(QLOCK(wrq));
3370 			mutex_enter(QLOCK(q));
3371 		}
3372 		break;
3373 
3374 	case QMINPSZ:
3375 		if (qbp)
3376 			error = EINVAL;
3377 		else
3378 			q->q_minpsz = (ssize_t)val;
3379 
3380 		/*
3381 		 * Performance concern, strwrite looks at the module below
3382 		 * the stream head for the maxpsz each time it does a write
3383 		 * we now cache it at the stream head.  Check to see if this
3384 		 * queue is sitting directly below the stream head.
3385 		 */
3386 		wrq = STREAM(q)->sd_wrq;
3387 		if (q != wrq->q_next)
3388 			break;
3389 
3390 		/*
3391 		 * If the stream is not frozen drop the current QLOCK and
3392 		 * acquire the sd_wrq QLOCK which protects sd_qn_*
3393 		 */
3394 		if (freezer != curthread) {
3395 			mutex_exit(QLOCK(q));
3396 			mutex_enter(QLOCK(wrq));
3397 		}
3398 		STREAM(q)->sd_qn_minpsz = (ssize_t)val;
3399 
3400 		if (freezer != curthread) {
3401 			mutex_exit(QLOCK(wrq));
3402 			mutex_enter(QLOCK(q));
3403 		}
3404 		break;
3405 
3406 	case QSTRUIOT:
3407 		if (qbp)
3408 			error = EINVAL;
3409 		else
3410 			q->q_struiot = (ushort_t)val;
3411 		break;
3412 
3413 	case QCOUNT:
3414 	case QFIRST:
3415 	case QLAST:
3416 	case QFLAG:
3417 		error = EPERM;
3418 		break;
3419 
3420 	default:
3421 		error = EINVAL;
3422 		break;
3423 	}
3424 done:
3425 	if (freezer != curthread)
3426 		mutex_exit(QLOCK(q));
3427 	return (error);
3428 }
3429 
3430 /*
3431  * Get queue fields.
3432  */
3433 int
3434 strqget(queue_t *q, qfields_t what, unsigned char pri, void *valp)
3435 {
3436 	qband_t 	*qbp = NULL;
3437 	int 		error = 0;
3438 	kthread_id_t 	freezer;
3439 
3440 	freezer = STREAM(q)->sd_freezer;
3441 	if (freezer == curthread) {
3442 		ASSERT(frozenstr(q));
3443 		ASSERT(MUTEX_HELD(QLOCK(q)));
3444 	} else
3445 		mutex_enter(QLOCK(q));
3446 	if (what >= QBAD) {
3447 		error = EINVAL;
3448 		goto done;
3449 	}
3450 	if (pri != 0) {
3451 		int i;
3452 		qband_t **qbpp;
3453 
3454 		if (pri > q->q_nband) {
3455 			qbpp = &q->q_bandp;
3456 			while (*qbpp)
3457 				qbpp = &(*qbpp)->qb_next;
3458 			while (pri > q->q_nband) {
3459 				if ((*qbpp = allocband()) == NULL) {
3460 					error = EAGAIN;
3461 					goto done;
3462 				}
3463 				(*qbpp)->qb_hiwat = q->q_hiwat;
3464 				(*qbpp)->qb_lowat = q->q_lowat;
3465 				q->q_nband++;
3466 				qbpp = &(*qbpp)->qb_next;
3467 			}
3468 		}
3469 		qbp = q->q_bandp;
3470 		i = pri;
3471 		while (--i)
3472 			qbp = qbp->qb_next;
3473 	}
3474 	switch (what) {
3475 	case QHIWAT:
3476 		if (qbp)
3477 			*(size_t *)valp = qbp->qb_hiwat;
3478 		else
3479 			*(size_t *)valp = q->q_hiwat;
3480 		break;
3481 
3482 	case QLOWAT:
3483 		if (qbp)
3484 			*(size_t *)valp = qbp->qb_lowat;
3485 		else
3486 			*(size_t *)valp = q->q_lowat;
3487 		break;
3488 
3489 	case QMAXPSZ:
3490 		if (qbp)
3491 			error = EINVAL;
3492 		else
3493 			*(ssize_t *)valp = q->q_maxpsz;
3494 		break;
3495 
3496 	case QMINPSZ:
3497 		if (qbp)
3498 			error = EINVAL;
3499 		else
3500 			*(ssize_t *)valp = q->q_minpsz;
3501 		break;
3502 
3503 	case QCOUNT:
3504 		if (qbp)
3505 			*(size_t *)valp = qbp->qb_count;
3506 		else
3507 			*(size_t *)valp = q->q_count;
3508 		break;
3509 
3510 	case QFIRST:
3511 		if (qbp)
3512 			*(mblk_t **)valp = qbp->qb_first;
3513 		else
3514 			*(mblk_t **)valp = q->q_first;
3515 		break;
3516 
3517 	case QLAST:
3518 		if (qbp)
3519 			*(mblk_t **)valp = qbp->qb_last;
3520 		else
3521 			*(mblk_t **)valp = q->q_last;
3522 		break;
3523 
3524 	case QFLAG:
3525 		if (qbp)
3526 			*(uint_t *)valp = qbp->qb_flag;
3527 		else
3528 			*(uint_t *)valp = q->q_flag;
3529 		break;
3530 
3531 	case QSTRUIOT:
3532 		if (qbp)
3533 			error = EINVAL;
3534 		else
3535 			*(short *)valp = q->q_struiot;
3536 		break;
3537 
3538 	default:
3539 		error = EINVAL;
3540 		break;
3541 	}
3542 done:
3543 	if (freezer != curthread)
3544 		mutex_exit(QLOCK(q));
3545 	return (error);
3546 }
3547 
3548 /*
3549  * Function awakes all in cvwait/sigwait/pollwait, on one of:
3550  *	QWANTWSYNC or QWANTR or QWANTW,
3551  *
3552  * Note: for QWANTWSYNC/QWANTW and QWANTR, if no WSLEEPer or RSLEEPer then a
3553  *	 deferred wakeup will be done. Also if strpoll() in progress then a
3554  *	 deferred pollwakeup will be done.
3555  */
3556 void
3557 strwakeq(queue_t *q, int flag)
3558 {
3559 	stdata_t 	*stp = STREAM(q);
3560 	pollhead_t 	*pl;
3561 
3562 	mutex_enter(&stp->sd_lock);
3563 	pl = &stp->sd_pollist;
3564 	if (flag & QWANTWSYNC) {
3565 		ASSERT(!(q->q_flag & QREADR));
3566 		if (stp->sd_flag & WSLEEP) {
3567 			stp->sd_flag &= ~WSLEEP;
3568 			cv_broadcast(&stp->sd_wrq->q_wait);
3569 		} else {
3570 			stp->sd_wakeq |= WSLEEP;
3571 		}
3572 
3573 		mutex_exit(&stp->sd_lock);
3574 		pollwakeup(pl, POLLWRNORM);
3575 		mutex_enter(&stp->sd_lock);
3576 
3577 		if (stp->sd_sigflags & S_WRNORM)
3578 			strsendsig(stp->sd_siglist, S_WRNORM, 0, 0);
3579 	} else if (flag & QWANTR) {
3580 		if (stp->sd_flag & RSLEEP) {
3581 			stp->sd_flag &= ~RSLEEP;
3582 			cv_broadcast(&_RD(stp->sd_wrq)->q_wait);
3583 		} else {
3584 			stp->sd_wakeq |= RSLEEP;
3585 		}
3586 
3587 		mutex_exit(&stp->sd_lock);
3588 		pollwakeup(pl, POLLIN | POLLRDNORM);
3589 		mutex_enter(&stp->sd_lock);
3590 
3591 		{
3592 			int events = stp->sd_sigflags & (S_INPUT | S_RDNORM);
3593 
3594 			if (events)
3595 				strsendsig(stp->sd_siglist, events, 0, 0);
3596 		}
3597 	} else {
3598 		if (stp->sd_flag & WSLEEP) {
3599 			stp->sd_flag &= ~WSLEEP;
3600 			cv_broadcast(&stp->sd_wrq->q_wait);
3601 		}
3602 
3603 		mutex_exit(&stp->sd_lock);
3604 		pollwakeup(pl, POLLWRNORM);
3605 		mutex_enter(&stp->sd_lock);
3606 
3607 		if (stp->sd_sigflags & S_WRNORM)
3608 			strsendsig(stp->sd_siglist, S_WRNORM, 0, 0);
3609 	}
3610 	mutex_exit(&stp->sd_lock);
3611 }
3612 
3613 int
3614 struioget(queue_t *q, mblk_t *mp, struiod_t *dp, int noblock)
3615 {
3616 	stdata_t *stp = STREAM(q);
3617 	int typ  = STRUIOT_STANDARD;
3618 	uio_t	 *uiop = &dp->d_uio;
3619 	dblk_t	 *dbp;
3620 	ssize_t	 uiocnt;
3621 	ssize_t	 cnt;
3622 	unsigned char *ptr;
3623 	ssize_t	 resid;
3624 	int	 error = 0;
3625 	on_trap_data_t otd;
3626 	queue_t	*stwrq;
3627 
3628 	/*
3629 	 * Plumbing may change while taking the type so store the
3630 	 * queue in a temporary variable. It doesn't matter even
3631 	 * if the we take the type from the previous plumbing,
3632 	 * that's because if the plumbing has changed when we were
3633 	 * holding the queue in a temporary variable, we can continue
3634 	 * processing the message the way it would have been processed
3635 	 * in the old plumbing, without any side effects but a bit
3636 	 * extra processing for partial ip header checksum.
3637 	 *
3638 	 * This has been done to avoid holding the sd_lock which is
3639 	 * very hot.
3640 	 */
3641 
3642 	stwrq = stp->sd_struiowrq;
3643 	if (stwrq)
3644 		typ = stwrq->q_struiot;
3645 
3646 	for (; (resid = uiop->uio_resid) > 0 && mp; mp = mp->b_cont) {
3647 		dbp = mp->b_datap;
3648 		ptr = (uchar_t *)(mp->b_rptr + dbp->db_cksumstuff);
3649 		uiocnt = dbp->db_cksumend - dbp->db_cksumstuff;
3650 		cnt = MIN(uiocnt, uiop->uio_resid);
3651 		if (!(dbp->db_struioflag & STRUIO_SPEC) ||
3652 		    (dbp->db_struioflag & STRUIO_DONE) || cnt == 0) {
3653 			/*
3654 			 * Either this mblk has already been processed
3655 			 * or there is no more room in this mblk (?).
3656 			 */
3657 			continue;
3658 		}
3659 		switch (typ) {
3660 		case STRUIOT_STANDARD:
3661 			if (noblock) {
3662 				if (on_trap(&otd, OT_DATA_ACCESS)) {
3663 					no_trap();
3664 					error = EWOULDBLOCK;
3665 					goto out;
3666 				}
3667 			}
3668 			if (error = uiomove(ptr, cnt, UIO_WRITE, uiop)) {
3669 				if (noblock)
3670 					no_trap();
3671 				goto out;
3672 			}
3673 			if (noblock)
3674 				no_trap();
3675 			break;
3676 
3677 		default:
3678 			error = EIO;
3679 			goto out;
3680 		}
3681 		dbp->db_struioflag |= STRUIO_DONE;
3682 		dbp->db_cksumstuff += cnt;
3683 	}
3684 out:
3685 	if (error == EWOULDBLOCK && (resid -= uiop->uio_resid) > 0) {
3686 		/*
3687 		 * A fault has occured and some bytes were moved to the
3688 		 * current mblk, the uio_t has already been updated by
3689 		 * the appropriate uio routine, so also update the mblk
3690 		 * to reflect this in case this same mblk chain is used
3691 		 * again (after the fault has been handled).
3692 		 */
3693 		uiocnt = dbp->db_cksumend - dbp->db_cksumstuff;
3694 		if (uiocnt >= resid)
3695 			dbp->db_cksumstuff += resid;
3696 	}
3697 	return (error);
3698 }
3699 
3700 /*
3701  * Try to enter queue synchronously. Any attempt to enter a closing queue will
3702  * fails. The qp->q_rwcnt keeps track of the number of successful entries so
3703  * that removeq() will not try to close the queue while a thread is inside the
3704  * queue.
3705  */
3706 static boolean_t
3707 rwnext_enter(queue_t *qp)
3708 {
3709 	mutex_enter(QLOCK(qp));
3710 	if (qp->q_flag & QWCLOSE) {
3711 		mutex_exit(QLOCK(qp));
3712 		return (B_FALSE);
3713 	}
3714 	qp->q_rwcnt++;
3715 	ASSERT(qp->q_rwcnt != 0);
3716 	mutex_exit(QLOCK(qp));
3717 	return (B_TRUE);
3718 }
3719 
3720 /*
3721  * Decrease the count of threads running in sync stream queue and wake up any
3722  * threads blocked in removeq().
3723  */
3724 static void
3725 rwnext_exit(queue_t *qp)
3726 {
3727 	mutex_enter(QLOCK(qp));
3728 	qp->q_rwcnt--;
3729 	if (qp->q_flag & QWANTRMQSYNC) {
3730 		qp->q_flag &= ~QWANTRMQSYNC;
3731 		cv_broadcast(&qp->q_wait);
3732 	}
3733 	mutex_exit(QLOCK(qp));
3734 }
3735 
3736 /*
3737  * The purpose of rwnext() is to call the rw procedure of the next
3738  * (downstream) modules queue.
3739  *
3740  * treated as put entrypoint for perimeter syncronization.
3741  *
3742  * There's no need to grab sq_putlocks here (which only exist for CIPUT
3743  * sync queues). If it is CIPUT sync queue sq_count is incremented and it does
3744  * not matter if any regular put entrypoints have been already entered. We
3745  * can't increment one of the sq_putcounts (instead of sq_count) because
3746  * qwait_rw won't know which counter to decrement.
3747  *
3748  * It would be reasonable to add the lockless FASTPUT logic.
3749  */
3750 int
3751 rwnext(queue_t *qp, struiod_t *dp)
3752 {
3753 	queue_t		*nqp;
3754 	syncq_t		*sq;
3755 	uint16_t	count;
3756 	uint16_t	flags;
3757 	struct qinit	*qi;
3758 	int		(*proc)();
3759 	struct stdata	*stp;
3760 	int		isread;
3761 	int		rval;
3762 
3763 	stp = STREAM(qp);
3764 	/*
3765 	 * Prevent q_next from changing by holding sd_lock until acquiring
3766 	 * SQLOCK. Note that a read-side rwnext from the streamhead will
3767 	 * already have sd_lock acquired. In either case sd_lock is always
3768 	 * released after acquiring SQLOCK.
3769 	 *
3770 	 * The streamhead read-side holding sd_lock when calling rwnext is
3771 	 * required to prevent a race condition were M_DATA mblks flowing
3772 	 * up the read-side of the stream could be bypassed by a rwnext()
3773 	 * down-call. In this case sd_lock acts as the streamhead perimeter.
3774 	 */
3775 	if ((nqp = _WR(qp)) == qp) {
3776 		isread = 0;
3777 		mutex_enter(&stp->sd_lock);
3778 		qp = nqp->q_next;
3779 	} else {
3780 		isread = 1;
3781 		if (nqp != stp->sd_wrq)
3782 			/* Not streamhead */
3783 			mutex_enter(&stp->sd_lock);
3784 		qp = _RD(nqp->q_next);
3785 	}
3786 	qi = qp->q_qinfo;
3787 	if (qp->q_struiot == STRUIOT_NONE || ! (proc = qi->qi_rwp)) {
3788 		/*
3789 		 * Not a synchronous module or no r/w procedure for this
3790 		 * queue, so just return EINVAL and let the caller handle it.
3791 		 */
3792 		mutex_exit(&stp->sd_lock);
3793 		return (EINVAL);
3794 	}
3795 
3796 	if (rwnext_enter(qp) == B_FALSE) {
3797 		mutex_exit(&stp->sd_lock);
3798 		return (EINVAL);
3799 	}
3800 
3801 	sq = qp->q_syncq;
3802 	mutex_enter(SQLOCK(sq));
3803 	mutex_exit(&stp->sd_lock);
3804 	count = sq->sq_count;
3805 	flags = sq->sq_flags;
3806 	ASSERT(sq->sq_ciputctrl == NULL || (flags & SQ_CIPUT));
3807 
3808 	while ((flags & SQ_GOAWAY) || (!(flags & SQ_CIPUT) && count != 0)) {
3809 		/*
3810 		 * if this queue is being closed, return.
3811 		 */
3812 		if (qp->q_flag & QWCLOSE) {
3813 			mutex_exit(SQLOCK(sq));
3814 			rwnext_exit(qp);
3815 			return (EINVAL);
3816 		}
3817 
3818 		/*
3819 		 * Wait until we can enter the inner perimeter.
3820 		 */
3821 		sq->sq_flags = flags | SQ_WANTWAKEUP;
3822 		cv_wait(&sq->sq_wait, SQLOCK(sq));
3823 		count = sq->sq_count;
3824 		flags = sq->sq_flags;
3825 	}
3826 
3827 	if (isread == 0 && stp->sd_struiowrq == NULL ||
3828 	    isread == 1 && stp->sd_struiordq == NULL) {
3829 		/*
3830 		 * Stream plumbing changed while waiting for inner perimeter
3831 		 * so just return EINVAL and let the caller handle it.
3832 		 */
3833 		mutex_exit(SQLOCK(sq));
3834 		rwnext_exit(qp);
3835 		return (EINVAL);
3836 	}
3837 	if (!(flags & SQ_CIPUT))
3838 		sq->sq_flags = flags | SQ_EXCL;
3839 	sq->sq_count = count + 1;
3840 	ASSERT(sq->sq_count != 0);		/* Wraparound */
3841 	/*
3842 	 * Note: The only message ordering guarantee that rwnext() makes is
3843 	 *	 for the write queue flow-control case. All others (r/w queue
3844 	 *	 with q_count > 0 (or q_first != 0)) are the resposibilty of
3845 	 *	 the queue's rw procedure. This could be genralized here buy
3846 	 *	 running the queue's service procedure, but that wouldn't be
3847 	 *	 the most efficent for all cases.
3848 	 */
3849 	mutex_exit(SQLOCK(sq));
3850 	if (! isread && (qp->q_flag & QFULL)) {
3851 		/*
3852 		 * Write queue may be flow controlled. If so,
3853 		 * mark the queue for wakeup when it's not.
3854 		 */
3855 		mutex_enter(QLOCK(qp));
3856 		if (qp->q_flag & QFULL) {
3857 			qp->q_flag |= QWANTWSYNC;
3858 			mutex_exit(QLOCK(qp));
3859 			rval = EWOULDBLOCK;
3860 			goto out;
3861 		}
3862 		mutex_exit(QLOCK(qp));
3863 	}
3864 
3865 	if (! isread && dp->d_mp)
3866 		STR_FTEVENT_MSG(dp->d_mp, nqp, FTEV_RWNEXT, dp->d_mp->b_rptr -
3867 		    dp->d_mp->b_datap->db_base);
3868 
3869 	rval = (*proc)(qp, dp);
3870 
3871 	if (isread && dp->d_mp)
3872 		STR_FTEVENT_MSG(dp->d_mp, _RD(nqp), FTEV_RWNEXT,
3873 		    dp->d_mp->b_rptr - dp->d_mp->b_datap->db_base);
3874 out:
3875 	/*
3876 	 * The queue is protected from being freed by sq_count, so it is
3877 	 * safe to call rwnext_exit and reacquire SQLOCK(sq).
3878 	 */
3879 	rwnext_exit(qp);
3880 
3881 	mutex_enter(SQLOCK(sq));
3882 	flags = sq->sq_flags;
3883 	ASSERT(sq->sq_count != 0);
3884 	sq->sq_count--;
3885 	if (flags & SQ_TAIL) {
3886 		putnext_tail(sq, qp, flags);
3887 		/*
3888 		 * The only purpose of this ASSERT is to preserve calling stack
3889 		 * in DEBUG kernel.
3890 		 */
3891 		ASSERT(flags & SQ_TAIL);
3892 		return (rval);
3893 	}
3894 	ASSERT(flags & (SQ_EXCL|SQ_CIPUT));
3895 	/*
3896 	 * Safe to always drop SQ_EXCL:
3897 	 *	Not SQ_CIPUT means we set SQ_EXCL above
3898 	 *	For SQ_CIPUT SQ_EXCL will only be set if the put procedure
3899 	 *	did a qwriter(INNER) in which case nobody else
3900 	 *	is in the inner perimeter and we are exiting.
3901 	 *
3902 	 * I would like to make the following assertion:
3903 	 *
3904 	 * ASSERT((flags & (SQ_EXCL|SQ_CIPUT)) != (SQ_EXCL|SQ_CIPUT) ||
3905 	 * 	sq->sq_count == 0);
3906 	 *
3907 	 * which indicates that if we are both putshared and exclusive,
3908 	 * we became exclusive while executing the putproc, and the only
3909 	 * claim on the syncq was the one we dropped a few lines above.
3910 	 * But other threads that enter putnext while the syncq is exclusive
3911 	 * need to make a claim as they may need to drop SQLOCK in the
3912 	 * has_writers case to avoid deadlocks.  If these threads are
3913 	 * delayed or preempted, it is possible that the writer thread can
3914 	 * find out that there are other claims making the (sq_count == 0)
3915 	 * test invalid.
3916 	 */
3917 
3918 	sq->sq_flags = flags & ~SQ_EXCL;
3919 	if (sq->sq_flags & SQ_WANTWAKEUP) {
3920 		sq->sq_flags &= ~SQ_WANTWAKEUP;
3921 		cv_broadcast(&sq->sq_wait);
3922 	}
3923 	mutex_exit(SQLOCK(sq));
3924 	return (rval);
3925 }
3926 
3927 /*
3928  * The purpose of infonext() is to call the info procedure of the next
3929  * (downstream) modules queue.
3930  *
3931  * treated as put entrypoint for perimeter syncronization.
3932  *
3933  * There's no need to grab sq_putlocks here (which only exist for CIPUT
3934  * sync queues). If it is CIPUT sync queue regular sq_count is incremented and
3935  * it does not matter if any regular put entrypoints have been already
3936  * entered.
3937  */
3938 int
3939 infonext(queue_t *qp, infod_t *idp)
3940 {
3941 	queue_t		*nqp;
3942 	syncq_t		*sq;
3943 	uint16_t	count;
3944 	uint16_t 	flags;
3945 	struct qinit	*qi;
3946 	int		(*proc)();
3947 	struct stdata	*stp;
3948 	int		rval;
3949 
3950 	stp = STREAM(qp);
3951 	/*
3952 	 * Prevent q_next from changing by holding sd_lock until
3953 	 * acquiring SQLOCK.
3954 	 */
3955 	mutex_enter(&stp->sd_lock);
3956 	if ((nqp = _WR(qp)) == qp) {
3957 		qp = nqp->q_next;
3958 	} else {
3959 		qp = _RD(nqp->q_next);
3960 	}
3961 	qi = qp->q_qinfo;
3962 	if (qp->q_struiot == STRUIOT_NONE || ! (proc = qi->qi_infop)) {
3963 		mutex_exit(&stp->sd_lock);
3964 		return (EINVAL);
3965 	}
3966 	sq = qp->q_syncq;
3967 	mutex_enter(SQLOCK(sq));
3968 	mutex_exit(&stp->sd_lock);
3969 	count = sq->sq_count;
3970 	flags = sq->sq_flags;
3971 	ASSERT(sq->sq_ciputctrl == NULL || (flags & SQ_CIPUT));
3972 
3973 	while ((flags & SQ_GOAWAY) || (!(flags & SQ_CIPUT) && count != 0)) {
3974 		/*
3975 		 * Wait until we can enter the inner perimeter.
3976 		 */
3977 		sq->sq_flags = flags | SQ_WANTWAKEUP;
3978 		cv_wait(&sq->sq_wait, SQLOCK(sq));
3979 		count = sq->sq_count;
3980 		flags = sq->sq_flags;
3981 	}
3982 
3983 	if (! (flags & SQ_CIPUT))
3984 		sq->sq_flags = flags | SQ_EXCL;
3985 	sq->sq_count = count + 1;
3986 	ASSERT(sq->sq_count != 0);		/* Wraparound */
3987 	mutex_exit(SQLOCK(sq));
3988 
3989 	rval = (*proc)(qp, idp);
3990 
3991 	mutex_enter(SQLOCK(sq));
3992 	flags = sq->sq_flags;
3993 	ASSERT(sq->sq_count != 0);
3994 	sq->sq_count--;
3995 	if (flags & SQ_TAIL) {
3996 		putnext_tail(sq, qp, flags);
3997 		/*
3998 		 * The only purpose of this ASSERT is to preserve calling stack
3999 		 * in DEBUG kernel.
4000 		 */
4001 		ASSERT(flags & SQ_TAIL);
4002 		return (rval);
4003 	}
4004 	ASSERT(flags & (SQ_EXCL|SQ_CIPUT));
4005 /*
4006  * XXXX
4007  * I am not certain the next comment is correct here.  I need to consider
4008  * why the infonext is called, and if dropping SQ_EXCL unless non-CIPUT
4009  * might cause other problems.  It just might be safer to drop it if
4010  * !SQ_CIPUT because that is when we set it.
4011  */
4012 	/*
4013 	 * Safe to always drop SQ_EXCL:
4014 	 *	Not SQ_CIPUT means we set SQ_EXCL above
4015 	 *	For SQ_CIPUT SQ_EXCL will only be set if the put procedure
4016 	 *	did a qwriter(INNER) in which case nobody else
4017 	 *	is in the inner perimeter and we are exiting.
4018 	 *
4019 	 * I would like to make the following assertion:
4020 	 *
4021 	 * ASSERT((flags & (SQ_EXCL|SQ_CIPUT)) != (SQ_EXCL|SQ_CIPUT) ||
4022 	 *	sq->sq_count == 0);
4023 	 *
4024 	 * which indicates that if we are both putshared and exclusive,
4025 	 * we became exclusive while executing the putproc, and the only
4026 	 * claim on the syncq was the one we dropped a few lines above.
4027 	 * But other threads that enter putnext while the syncq is exclusive
4028 	 * need to make a claim as they may need to drop SQLOCK in the
4029 	 * has_writers case to avoid deadlocks.  If these threads are
4030 	 * delayed or preempted, it is possible that the writer thread can
4031 	 * find out that there are other claims making the (sq_count == 0)
4032 	 * test invalid.
4033 	 */
4034 
4035 	sq->sq_flags = flags & ~SQ_EXCL;
4036 	mutex_exit(SQLOCK(sq));
4037 	return (rval);
4038 }
4039 
4040 /*
4041  * Return nonzero if the queue is responsible for struio(), else return 0.
4042  */
4043 int
4044 isuioq(queue_t *q)
4045 {
4046 	if (q->q_flag & QREADR)
4047 		return (STREAM(q)->sd_struiordq == q);
4048 	else
4049 		return (STREAM(q)->sd_struiowrq == q);
4050 }
4051 
4052 #if defined(__sparc)
4053 int disable_putlocks = 0;
4054 #else
4055 int disable_putlocks = 1;
4056 #endif
4057 
4058 /*
4059  * called by create_putlock.
4060  */
4061 static void
4062 create_syncq_putlocks(queue_t *q)
4063 {
4064 	syncq_t	*sq = q->q_syncq;
4065 	ciputctrl_t *cip;
4066 	int i;
4067 
4068 	ASSERT(sq != NULL);
4069 
4070 	ASSERT(disable_putlocks == 0);
4071 	ASSERT(n_ciputctrl >= min_n_ciputctrl);
4072 	ASSERT(ciputctrl_cache != NULL);
4073 
4074 	if (!(sq->sq_type & SQ_CIPUT))
4075 		return;
4076 
4077 	for (i = 0; i <= 1; i++) {
4078 		if (sq->sq_ciputctrl == NULL) {
4079 			cip = kmem_cache_alloc(ciputctrl_cache, KM_SLEEP);
4080 			SUMCHECK_CIPUTCTRL_COUNTS(cip, n_ciputctrl - 1, 0);
4081 			mutex_enter(SQLOCK(sq));
4082 			if (sq->sq_ciputctrl != NULL) {
4083 				mutex_exit(SQLOCK(sq));
4084 				kmem_cache_free(ciputctrl_cache, cip);
4085 			} else {
4086 				ASSERT(sq->sq_nciputctrl == 0);
4087 				sq->sq_nciputctrl = n_ciputctrl - 1;
4088 				/*
4089 				 * putnext checks sq_ciputctrl without holding
4090 				 * SQLOCK. if it is not NULL putnext assumes
4091 				 * sq_nciputctrl is initialized. membar below
4092 				 * insures that.
4093 				 */
4094 				membar_producer();
4095 				sq->sq_ciputctrl = cip;
4096 				mutex_exit(SQLOCK(sq));
4097 			}
4098 		}
4099 		ASSERT(sq->sq_nciputctrl == n_ciputctrl - 1);
4100 		if (i == 1)
4101 			break;
4102 		q = _OTHERQ(q);
4103 		if (!(q->q_flag & QPERQ)) {
4104 			ASSERT(sq == q->q_syncq);
4105 			break;
4106 		}
4107 		ASSERT(q->q_syncq != NULL);
4108 		ASSERT(sq != q->q_syncq);
4109 		sq = q->q_syncq;
4110 		ASSERT(sq->sq_type & SQ_CIPUT);
4111 	}
4112 }
4113 
4114 /*
4115  * If stream argument is 0 only create per cpu sq_putlocks/sq_putcounts for
4116  * syncq of q. If stream argument is not 0 create per cpu stream_putlocks for
4117  * the stream of q and per cpu sq_putlocks/sq_putcounts for all syncq's
4118  * starting from q and down to the driver.
4119  *
4120  * This should be called after the affected queues are part of stream
4121  * geometry. It should be called from driver/module open routine after
4122  * qprocson() call. It is also called from nfs syscall where it is known that
4123  * stream is configured and won't change its geometry during create_putlock
4124  * call.
4125  *
4126  * caller normally uses 0 value for the stream argument to speed up MT putnext
4127  * into the perimeter of q for example because its perimeter is per module
4128  * (e.g. IP).
4129  *
4130  * caller normally uses non 0 value for the stream argument to hint the system
4131  * that the stream of q is a very contended global system stream
4132  * (e.g. NFS/UDP) and the part of the stream from q to the driver is
4133  * particularly MT hot.
4134  *
4135  * Caller insures stream plumbing won't happen while we are here and therefore
4136  * q_next can be safely used.
4137  */
4138 
4139 void
4140 create_putlocks(queue_t *q, int stream)
4141 {
4142 	ciputctrl_t	*cip;
4143 	struct stdata	*stp = STREAM(q);
4144 
4145 	q = _WR(q);
4146 	ASSERT(stp != NULL);
4147 
4148 	if (disable_putlocks != 0)
4149 		return;
4150 
4151 	if (n_ciputctrl < min_n_ciputctrl)
4152 		return;
4153 
4154 	ASSERT(ciputctrl_cache != NULL);
4155 
4156 	if (stream != 0 && stp->sd_ciputctrl == NULL) {
4157 		cip = kmem_cache_alloc(ciputctrl_cache, KM_SLEEP);
4158 		SUMCHECK_CIPUTCTRL_COUNTS(cip, n_ciputctrl - 1, 0);
4159 		mutex_enter(&stp->sd_lock);
4160 		if (stp->sd_ciputctrl != NULL) {
4161 			mutex_exit(&stp->sd_lock);
4162 			kmem_cache_free(ciputctrl_cache, cip);
4163 		} else {
4164 			ASSERT(stp->sd_nciputctrl == 0);
4165 			stp->sd_nciputctrl = n_ciputctrl - 1;
4166 			/*
4167 			 * putnext checks sd_ciputctrl without holding
4168 			 * sd_lock. if it is not NULL putnext assumes
4169 			 * sd_nciputctrl is initialized. membar below
4170 			 * insures that.
4171 			 */
4172 			membar_producer();
4173 			stp->sd_ciputctrl = cip;
4174 			mutex_exit(&stp->sd_lock);
4175 		}
4176 	}
4177 
4178 	ASSERT(stream == 0 || stp->sd_nciputctrl == n_ciputctrl - 1);
4179 
4180 	while (_SAMESTR(q)) {
4181 		create_syncq_putlocks(q);
4182 		if (stream == 0)
4183 			return;
4184 		q = q->q_next;
4185 	}
4186 	ASSERT(q != NULL);
4187 	create_syncq_putlocks(q);
4188 }
4189 
4190 /*
4191  * STREAMS Flow Trace - record STREAMS Flow Trace events as an mblk flows
4192  * through a stream.
4193  *
4194  * Data currently record per-event is a timestamp, module/driver name,
4195  * downstream module/driver name, optional callstack, event type and a per
4196  * type datum.  Much of the STREAMS framework is instrumented for automatic
4197  * flow tracing (when enabled).  Events can be defined and used by STREAMS
4198  * modules and drivers.
4199  *
4200  * Global objects:
4201  *
4202  *	str_ftevent() - Add a flow-trace event to a dblk.
4203  *	str_ftfree() - Free flow-trace data
4204  *
4205  * Local objects:
4206  *
4207  *	fthdr_cache - pointer to the kmem cache for trace header.
4208  *	ftblk_cache - pointer to the kmem cache for trace data blocks.
4209  */
4210 
4211 int str_ftnever = 1;	/* Don't do STREAMS flow tracing */
4212 int str_ftstack = 0;	/* Don't record event call stacks */
4213 
4214 void
4215 str_ftevent(fthdr_t *hp, void *p, ushort_t evnt, ushort_t data)
4216 {
4217 	ftblk_t *bp = hp->tail;
4218 	ftblk_t *nbp;
4219 	ftevnt_t *ep;
4220 	int ix, nix;
4221 
4222 	ASSERT(hp != NULL);
4223 
4224 	for (;;) {
4225 		if ((ix = bp->ix) == FTBLK_EVNTS) {
4226 			/*
4227 			 * Tail doesn't have room, so need a new tail.
4228 			 *
4229 			 * To make this MT safe, first, allocate a new
4230 			 * ftblk, and initialize it.  To make life a
4231 			 * little easier, reserve the first slot (mostly
4232 			 * by making ix = 1).  When we are finished with
4233 			 * the initialization, CAS this pointer to the
4234 			 * tail.  If this succeeds, this is the new
4235 			 * "next" block.  Otherwise, another thread
4236 			 * got here first, so free the block and start
4237 			 * again.
4238 			 */
4239 			nbp = kmem_cache_alloc(ftblk_cache, KM_NOSLEEP);
4240 			if (nbp == NULL) {
4241 				/* no mem, so punt */
4242 				str_ftnever++;
4243 				/* free up all flow data? */
4244 				return;
4245 			}
4246 			nbp->nxt = NULL;
4247 			nbp->ix = 1;
4248 			/*
4249 			 * Just in case there is another thread about
4250 			 * to get the next index, we need to make sure
4251 			 * the value is there for it.
4252 			 */
4253 			membar_producer();
4254 			if (atomic_cas_ptr(&hp->tail, bp, nbp) == bp) {
4255 				/* CAS was successful */
4256 				bp->nxt = nbp;
4257 				membar_producer();
4258 				bp = nbp;
4259 				ix = 0;
4260 				goto cas_good;
4261 			} else {
4262 				kmem_cache_free(ftblk_cache, nbp);
4263 				bp = hp->tail;
4264 				continue;
4265 			}
4266 		}
4267 		nix = ix + 1;
4268 		if (atomic_cas_32((uint32_t *)&bp->ix, ix, nix) == ix) {
4269 		cas_good:
4270 			if (curthread != hp->thread) {
4271 				hp->thread = curthread;
4272 				evnt |= FTEV_CS;
4273 			}
4274 			if (CPU->cpu_seqid != hp->cpu_seqid) {
4275 				hp->cpu_seqid = CPU->cpu_seqid;
4276 				evnt |= FTEV_PS;
4277 			}
4278 			ep = &bp->ev[ix];
4279 			break;
4280 		}
4281 	}
4282 
4283 	if (evnt & FTEV_QMASK) {
4284 		queue_t *qp = p;
4285 
4286 		if (!(qp->q_flag & QREADR))
4287 			evnt |= FTEV_ISWR;
4288 
4289 		ep->mid = Q2NAME(qp);
4290 
4291 		/*
4292 		 * We only record the next queue name for FTEV_PUTNEXT since
4293 		 * that's the only time we *really* need it, and the putnext()
4294 		 * code ensures that qp->q_next won't vanish.  (We could use
4295 		 * claimstr()/releasestr() but at a performance cost.)
4296 		 */
4297 		if ((evnt & FTEV_MASK) == FTEV_PUTNEXT && qp->q_next != NULL)
4298 			ep->midnext = Q2NAME(qp->q_next);
4299 		else
4300 			ep->midnext = NULL;
4301 	} else {
4302 		ep->mid = p;
4303 		ep->midnext = NULL;
4304 	}
4305 
4306 	if (ep->stk != NULL)
4307 		ep->stk->fs_depth = getpcstack(ep->stk->fs_stk, FTSTK_DEPTH);
4308 
4309 	ep->ts = gethrtime();
4310 	ep->evnt = evnt;
4311 	ep->data = data;
4312 	hp->hash = (hp->hash << 9) + hp->hash;
4313 	hp->hash += (evnt << 16) | data;
4314 	hp->hash += (uintptr_t)ep->mid;
4315 }
4316 
4317 /*
4318  * Free flow-trace data.
4319  */
4320 void
4321 str_ftfree(dblk_t *dbp)
4322 {
4323 	fthdr_t *hp = dbp->db_fthdr;
4324 	ftblk_t *bp = &hp->first;
4325 	ftblk_t *nbp;
4326 
4327 	if (bp != hp->tail || bp->ix != 0) {
4328 		/*
4329 		 * Clear out the hash, have the tail point to itself, and free
4330 		 * any continuation blocks.
4331 		 */
4332 		bp = hp->first.nxt;
4333 		hp->tail = &hp->first;
4334 		hp->hash = 0;
4335 		hp->first.nxt = NULL;
4336 		hp->first.ix = 0;
4337 		while (bp != NULL) {
4338 			nbp = bp->nxt;
4339 			kmem_cache_free(ftblk_cache, bp);
4340 			bp = nbp;
4341 		}
4342 	}
4343 	kmem_cache_free(fthdr_cache, hp);
4344 	dbp->db_fthdr = NULL;
4345 }
4346