xref: /illumos-gate/usr/src/uts/common/io/stream.c (revision c64c5389d6d65f1a8915fd0ff67288100f518172)
1 /*
2  * CDDL HEADER START
3  *
4  * The contents of this file are subject to the terms of the
5  * Common Development and Distribution License (the "License").
6  * You may not use this file except in compliance with the License.
7  *
8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9  * or http://www.opensolaris.org/os/licensing.
10  * See the License for the specific language governing permissions
11  * and limitations under the License.
12  *
13  * When distributing Covered Code, include this CDDL HEADER in each
14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15  * If applicable, add the following below this CDDL HEADER, with the
16  * fields enclosed by brackets "[]" replaced with your own identifying
17  * information: Portions Copyright [yyyy] [name of copyright owner]
18  *
19  * CDDL HEADER END
20  */
21 /*	Copyright (c) 1984, 1986, 1987, 1988, 1989 AT&T	*/
22 /*	  All Rights Reserved  	*/
23 
24 /*
25  * Copyright 2009 Sun Microsystems, Inc.  All rights reserved.
26  * Use is subject to license terms.
27  */
28 
29 #include <sys/types.h>
30 #include <sys/param.h>
31 #include <sys/thread.h>
32 #include <sys/sysmacros.h>
33 #include <sys/stropts.h>
34 #include <sys/stream.h>
35 #include <sys/strsubr.h>
36 #include <sys/strsun.h>
37 #include <sys/conf.h>
38 #include <sys/debug.h>
39 #include <sys/cmn_err.h>
40 #include <sys/kmem.h>
41 #include <sys/atomic.h>
42 #include <sys/errno.h>
43 #include <sys/vtrace.h>
44 #include <sys/ftrace.h>
45 #include <sys/ontrap.h>
46 #include <sys/multidata.h>
47 #include <sys/multidata_impl.h>
48 #include <sys/sdt.h>
49 #include <sys/strft.h>
50 
51 #ifdef DEBUG
52 #include <sys/kmem_impl.h>
53 #endif
54 
55 /*
56  * This file contains all the STREAMS utility routines that may
57  * be used by modules and drivers.
58  */
59 
60 /*
61  * STREAMS message allocator: principles of operation
62  *
63  * The streams message allocator consists of all the routines that
64  * allocate, dup and free streams messages: allocb(), [d]esballoc[a],
65  * dupb(), freeb() and freemsg().  What follows is a high-level view
66  * of how the allocator works.
67  *
68  * Every streams message consists of one or more mblks, a dblk, and data.
69  * All mblks for all types of messages come from a common mblk_cache.
70  * The dblk and data come in several flavors, depending on how the
71  * message is allocated:
72  *
73  * (1) mblks up to DBLK_MAX_CACHE size are allocated from a collection of
74  *     fixed-size dblk/data caches. For message sizes that are multiples of
75  *     PAGESIZE, dblks are allocated separately from the buffer.
76  *     The associated buffer is allocated by the constructor using kmem_alloc().
77  *     For all other message sizes, dblk and its associated data is allocated
78  *     as a single contiguous chunk of memory.
79  *     Objects in these caches consist of a dblk plus its associated data.
80  *     allocb() determines the nearest-size cache by table lookup:
81  *     the dblk_cache[] array provides the mapping from size to dblk cache.
82  *
83  * (2) Large messages (size > DBLK_MAX_CACHE) are constructed by
84  *     kmem_alloc()'ing a buffer for the data and supplying that
85  *     buffer to gesballoc(), described below.
86  *
87  * (3) The four flavors of [d]esballoc[a] are all implemented by a
88  *     common routine, gesballoc() ("generic esballoc").  gesballoc()
89  *     allocates a dblk from the global dblk_esb_cache and sets db_base,
90  *     db_lim and db_frtnp to describe the caller-supplied buffer.
91  *
92  * While there are several routines to allocate messages, there is only
93  * one routine to free messages: freeb().  freeb() simply invokes the
94  * dblk's free method, dbp->db_free(), which is set at allocation time.
95  *
96  * dupb() creates a new reference to a message by allocating a new mblk,
97  * incrementing the dblk reference count and setting the dblk's free
98  * method to dblk_decref().  The dblk's original free method is retained
99  * in db_lastfree.  dblk_decref() decrements the reference count on each
100  * freeb().  If this is not the last reference it just frees the mblk;
101  * if this *is* the last reference, it restores db_free to db_lastfree,
102  * sets db_mblk to the current mblk (see below), and invokes db_lastfree.
103  *
104  * The implementation makes aggressive use of kmem object caching for
105  * maximum performance.  This makes the code simple and compact, but
106  * also a bit abstruse in some places.  The invariants that constitute a
107  * message's constructed state, described below, are more subtle than usual.
108  *
109  * Every dblk has an "attached mblk" as part of its constructed state.
110  * The mblk is allocated by the dblk's constructor and remains attached
111  * until the message is either dup'ed or pulled up.  In the dupb() case
112  * the mblk association doesn't matter until the last free, at which time
113  * dblk_decref() attaches the last mblk to the dblk.  pullupmsg() affects
114  * the mblk association because it swaps the leading mblks of two messages,
115  * so it is responsible for swapping their db_mblk pointers accordingly.
116  * From a constructed-state viewpoint it doesn't matter that a dblk's
117  * attached mblk can change while the message is allocated; all that
118  * matters is that the dblk has *some* attached mblk when it's freed.
119  *
120  * The sizes of the allocb() small-message caches are not magical.
121  * They represent a good trade-off between internal and external
122  * fragmentation for current workloads.  They should be reevaluated
123  * periodically, especially if allocations larger than DBLK_MAX_CACHE
124  * become common.  We use 64-byte alignment so that dblks don't
125  * straddle cache lines unnecessarily.
126  */
127 #define	DBLK_MAX_CACHE		73728
128 #define	DBLK_CACHE_ALIGN	64
129 #define	DBLK_MIN_SIZE		8
130 #define	DBLK_SIZE_SHIFT		3
131 
132 #ifdef _BIG_ENDIAN
133 #define	DBLK_RTFU_SHIFT(field)	\
134 	(8 * (&((dblk_t *)0)->db_struioflag - &((dblk_t *)0)->field))
135 #else
136 #define	DBLK_RTFU_SHIFT(field)	\
137 	(8 * (&((dblk_t *)0)->field - &((dblk_t *)0)->db_ref))
138 #endif
139 
140 #define	DBLK_RTFU(ref, type, flags, uioflag)	\
141 	(((ref) << DBLK_RTFU_SHIFT(db_ref)) | \
142 	((type) << DBLK_RTFU_SHIFT(db_type)) | \
143 	(((flags) | (ref - 1)) << DBLK_RTFU_SHIFT(db_flags)) | \
144 	((uioflag) << DBLK_RTFU_SHIFT(db_struioflag)))
145 #define	DBLK_RTFU_REF_MASK	(DBLK_REFMAX << DBLK_RTFU_SHIFT(db_ref))
146 #define	DBLK_RTFU_WORD(dbp)	(*((uint32_t *)&(dbp)->db_ref))
147 #define	MBLK_BAND_FLAG_WORD(mp)	(*((uint32_t *)&(mp)->b_band))
148 
149 static size_t dblk_sizes[] = {
150 #ifdef _LP64
151 	16, 80, 144, 208, 272, 336, 528, 1040, 1488, 1936, 2576, 3856,
152 	8192, 12048, 16384, 20240, 24576, 28432, 32768, 36624,
153 	40960, 44816, 49152, 53008, 57344, 61200, 65536, 69392,
154 #else
155 	64, 128, 320, 576, 1088, 1536, 1984, 2624, 3904,
156 	8192, 12096, 16384, 20288, 24576, 28480, 32768, 36672,
157 	40960, 44864, 49152, 53056, 57344, 61248, 65536, 69440,
158 #endif
159 	DBLK_MAX_CACHE, 0
160 };
161 
162 static struct kmem_cache *dblk_cache[DBLK_MAX_CACHE / DBLK_MIN_SIZE];
163 static struct kmem_cache *mblk_cache;
164 static struct kmem_cache *dblk_esb_cache;
165 static struct kmem_cache *fthdr_cache;
166 static struct kmem_cache *ftblk_cache;
167 
168 static void dblk_lastfree(mblk_t *mp, dblk_t *dbp);
169 static mblk_t *allocb_oversize(size_t size, int flags);
170 static int allocb_tryhard_fails;
171 static void frnop_func(void *arg);
172 frtn_t frnop = { frnop_func };
173 static void bcache_dblk_lastfree(mblk_t *mp, dblk_t *dbp);
174 
175 static boolean_t rwnext_enter(queue_t *qp);
176 static void rwnext_exit(queue_t *qp);
177 
178 /*
179  * Patchable mblk/dblk kmem_cache flags.
180  */
181 int dblk_kmem_flags = 0;
182 int mblk_kmem_flags = 0;
183 
184 static int
185 dblk_constructor(void *buf, void *cdrarg, int kmflags)
186 {
187 	dblk_t *dbp = buf;
188 	ssize_t msg_size = (ssize_t)cdrarg;
189 	size_t index;
190 
191 	ASSERT(msg_size != 0);
192 
193 	index = (msg_size - 1) >> DBLK_SIZE_SHIFT;
194 
195 	ASSERT(index < (DBLK_MAX_CACHE >> DBLK_SIZE_SHIFT));
196 
197 	if ((dbp->db_mblk = kmem_cache_alloc(mblk_cache, kmflags)) == NULL)
198 		return (-1);
199 	if ((msg_size & PAGEOFFSET) == 0) {
200 		dbp->db_base = kmem_alloc(msg_size, kmflags);
201 		if (dbp->db_base == NULL) {
202 			kmem_cache_free(mblk_cache, dbp->db_mblk);
203 			return (-1);
204 		}
205 	} else {
206 		dbp->db_base = (unsigned char *)&dbp[1];
207 	}
208 
209 	dbp->db_mblk->b_datap = dbp;
210 	dbp->db_cache = dblk_cache[index];
211 	dbp->db_lim = dbp->db_base + msg_size;
212 	dbp->db_free = dbp->db_lastfree = dblk_lastfree;
213 	dbp->db_frtnp = NULL;
214 	dbp->db_fthdr = NULL;
215 	dbp->db_credp = NULL;
216 	dbp->db_cpid = -1;
217 	dbp->db_struioflag = 0;
218 	dbp->db_struioun.cksum.flags = 0;
219 	return (0);
220 }
221 
222 /*ARGSUSED*/
223 static int
224 dblk_esb_constructor(void *buf, void *cdrarg, int kmflags)
225 {
226 	dblk_t *dbp = buf;
227 
228 	if ((dbp->db_mblk = kmem_cache_alloc(mblk_cache, kmflags)) == NULL)
229 		return (-1);
230 	dbp->db_mblk->b_datap = dbp;
231 	dbp->db_cache = dblk_esb_cache;
232 	dbp->db_fthdr = NULL;
233 	dbp->db_credp = NULL;
234 	dbp->db_cpid = -1;
235 	dbp->db_struioflag = 0;
236 	dbp->db_struioun.cksum.flags = 0;
237 	return (0);
238 }
239 
240 static int
241 bcache_dblk_constructor(void *buf, void *cdrarg, int kmflags)
242 {
243 	dblk_t *dbp = buf;
244 	bcache_t *bcp = cdrarg;
245 
246 	if ((dbp->db_mblk = kmem_cache_alloc(mblk_cache, kmflags)) == NULL)
247 		return (-1);
248 
249 	dbp->db_base = kmem_cache_alloc(bcp->buffer_cache, kmflags);
250 	if (dbp->db_base == NULL) {
251 		kmem_cache_free(mblk_cache, dbp->db_mblk);
252 		return (-1);
253 	}
254 
255 	dbp->db_mblk->b_datap = dbp;
256 	dbp->db_cache = (void *)bcp;
257 	dbp->db_lim = dbp->db_base + bcp->size;
258 	dbp->db_free = dbp->db_lastfree = bcache_dblk_lastfree;
259 	dbp->db_frtnp = NULL;
260 	dbp->db_fthdr = NULL;
261 	dbp->db_credp = NULL;
262 	dbp->db_cpid = -1;
263 	dbp->db_struioflag = 0;
264 	dbp->db_struioun.cksum.flags = 0;
265 	return (0);
266 }
267 
268 /*ARGSUSED*/
269 static void
270 dblk_destructor(void *buf, void *cdrarg)
271 {
272 	dblk_t *dbp = buf;
273 	ssize_t msg_size = (ssize_t)cdrarg;
274 
275 	ASSERT(dbp->db_mblk->b_datap == dbp);
276 	ASSERT(msg_size != 0);
277 	ASSERT(dbp->db_struioflag == 0);
278 	ASSERT(dbp->db_struioun.cksum.flags == 0);
279 
280 	if ((msg_size & PAGEOFFSET) == 0) {
281 		kmem_free(dbp->db_base, msg_size);
282 	}
283 
284 	kmem_cache_free(mblk_cache, dbp->db_mblk);
285 }
286 
287 static void
288 bcache_dblk_destructor(void *buf, void *cdrarg)
289 {
290 	dblk_t *dbp = buf;
291 	bcache_t *bcp = cdrarg;
292 
293 	kmem_cache_free(bcp->buffer_cache, dbp->db_base);
294 
295 	ASSERT(dbp->db_mblk->b_datap == dbp);
296 	ASSERT(dbp->db_struioflag == 0);
297 	ASSERT(dbp->db_struioun.cksum.flags == 0);
298 
299 	kmem_cache_free(mblk_cache, dbp->db_mblk);
300 }
301 
302 /* ARGSUSED */
303 static int
304 ftblk_constructor(void *buf, void *cdrarg, int kmflags)
305 {
306 	ftblk_t *fbp = buf;
307 	int i;
308 
309 	bzero(fbp, sizeof (ftblk_t));
310 	if (str_ftstack != 0) {
311 		for (i = 0; i < FTBLK_EVNTS; i++)
312 			fbp->ev[i].stk = kmem_alloc(sizeof (ftstk_t), kmflags);
313 	}
314 
315 	return (0);
316 }
317 
318 /* ARGSUSED */
319 static void
320 ftblk_destructor(void *buf, void *cdrarg)
321 {
322 	ftblk_t *fbp = buf;
323 	int i;
324 
325 	if (str_ftstack != 0) {
326 		for (i = 0; i < FTBLK_EVNTS; i++) {
327 			if (fbp->ev[i].stk != NULL) {
328 				kmem_free(fbp->ev[i].stk, sizeof (ftstk_t));
329 				fbp->ev[i].stk = NULL;
330 			}
331 		}
332 	}
333 }
334 
335 static int
336 fthdr_constructor(void *buf, void *cdrarg, int kmflags)
337 {
338 	fthdr_t *fhp = buf;
339 
340 	return (ftblk_constructor(&fhp->first, cdrarg, kmflags));
341 }
342 
343 static void
344 fthdr_destructor(void *buf, void *cdrarg)
345 {
346 	fthdr_t *fhp = buf;
347 
348 	ftblk_destructor(&fhp->first, cdrarg);
349 }
350 
351 void
352 streams_msg_init(void)
353 {
354 	char name[40];
355 	size_t size;
356 	size_t lastsize = DBLK_MIN_SIZE;
357 	size_t *sizep;
358 	struct kmem_cache *cp;
359 	size_t tot_size;
360 	int offset;
361 
362 	mblk_cache = kmem_cache_create("streams_mblk", sizeof (mblk_t), 32,
363 	    NULL, NULL, NULL, NULL, NULL, mblk_kmem_flags);
364 
365 	for (sizep = dblk_sizes; (size = *sizep) != 0; sizep++) {
366 
367 		if ((offset = (size & PAGEOFFSET)) != 0) {
368 			/*
369 			 * We are in the middle of a page, dblk should
370 			 * be allocated on the same page
371 			 */
372 			tot_size = size + sizeof (dblk_t);
373 			ASSERT((offset + sizeof (dblk_t) + sizeof (kmem_slab_t))
374 			    < PAGESIZE);
375 			ASSERT((tot_size & (DBLK_CACHE_ALIGN - 1)) == 0);
376 
377 		} else {
378 
379 			/*
380 			 * buf size is multiple of page size, dblk and
381 			 * buffer are allocated separately.
382 			 */
383 
384 			ASSERT((size & (DBLK_CACHE_ALIGN - 1)) == 0);
385 			tot_size = sizeof (dblk_t);
386 		}
387 
388 		(void) sprintf(name, "streams_dblk_%ld", size);
389 		cp = kmem_cache_create(name, tot_size, DBLK_CACHE_ALIGN,
390 		    dblk_constructor, dblk_destructor, NULL, (void *)(size),
391 		    NULL, dblk_kmem_flags);
392 
393 		while (lastsize <= size) {
394 			dblk_cache[(lastsize - 1) >> DBLK_SIZE_SHIFT] = cp;
395 			lastsize += DBLK_MIN_SIZE;
396 		}
397 	}
398 
399 	dblk_esb_cache = kmem_cache_create("streams_dblk_esb", sizeof (dblk_t),
400 	    DBLK_CACHE_ALIGN, dblk_esb_constructor, dblk_destructor, NULL,
401 	    (void *)sizeof (dblk_t), NULL, dblk_kmem_flags);
402 	fthdr_cache = kmem_cache_create("streams_fthdr", sizeof (fthdr_t), 32,
403 	    fthdr_constructor, fthdr_destructor, NULL, NULL, NULL, 0);
404 	ftblk_cache = kmem_cache_create("streams_ftblk", sizeof (ftblk_t), 32,
405 	    ftblk_constructor, ftblk_destructor, NULL, NULL, NULL, 0);
406 
407 	/* Initialize Multidata caches */
408 	mmd_init();
409 
410 	/* initialize throttling queue for esballoc */
411 	esballoc_queue_init();
412 }
413 
414 /*ARGSUSED*/
415 mblk_t *
416 allocb(size_t size, uint_t pri)
417 {
418 	dblk_t *dbp;
419 	mblk_t *mp;
420 	size_t index;
421 
422 	index =  (size - 1)  >> DBLK_SIZE_SHIFT;
423 
424 	if (index >= (DBLK_MAX_CACHE >> DBLK_SIZE_SHIFT)) {
425 		if (size != 0) {
426 			mp = allocb_oversize(size, KM_NOSLEEP);
427 			goto out;
428 		}
429 		index = 0;
430 	}
431 
432 	if ((dbp = kmem_cache_alloc(dblk_cache[index], KM_NOSLEEP)) == NULL) {
433 		mp = NULL;
434 		goto out;
435 	}
436 
437 	mp = dbp->db_mblk;
438 	DBLK_RTFU_WORD(dbp) = DBLK_RTFU(1, M_DATA, 0, 0);
439 	mp->b_next = mp->b_prev = mp->b_cont = NULL;
440 	mp->b_rptr = mp->b_wptr = dbp->db_base;
441 	mp->b_queue = NULL;
442 	MBLK_BAND_FLAG_WORD(mp) = 0;
443 	STR_FTALLOC(&dbp->db_fthdr, FTEV_ALLOCB, size);
444 out:
445 	FTRACE_1("allocb(): mp=0x%p", (uintptr_t)mp);
446 
447 	return (mp);
448 }
449 
450 /*
451  * Allocate an mblk taking db_credp and db_cpid from the template.
452  * Allow the cred to be NULL.
453  */
454 mblk_t *
455 allocb_tmpl(size_t size, const mblk_t *tmpl)
456 {
457 	mblk_t *mp = allocb(size, 0);
458 
459 	if (mp != NULL) {
460 		dblk_t *src = tmpl->b_datap;
461 		dblk_t *dst = mp->b_datap;
462 		cred_t *cr;
463 		pid_t cpid;
464 
465 		cr = msg_getcred(tmpl, &cpid);
466 		if (cr != NULL)
467 			crhold(dst->db_credp = cr);
468 		dst->db_cpid = cpid;
469 		dst->db_type = src->db_type;
470 	}
471 	return (mp);
472 }
473 
474 mblk_t *
475 allocb_cred(size_t size, cred_t *cr, pid_t cpid)
476 {
477 	mblk_t *mp = allocb(size, 0);
478 
479 	ASSERT(cr != NULL);
480 	if (mp != NULL) {
481 		dblk_t *dbp = mp->b_datap;
482 
483 		crhold(dbp->db_credp = cr);
484 		dbp->db_cpid = cpid;
485 	}
486 	return (mp);
487 }
488 
489 mblk_t *
490 allocb_cred_wait(size_t size, uint_t flags, int *error, cred_t *cr, pid_t cpid)
491 {
492 	mblk_t *mp = allocb_wait(size, 0, flags, error);
493 
494 	ASSERT(cr != NULL);
495 	if (mp != NULL) {
496 		dblk_t *dbp = mp->b_datap;
497 
498 		crhold(dbp->db_credp = cr);
499 		dbp->db_cpid = cpid;
500 	}
501 
502 	return (mp);
503 }
504 
505 /*
506  * Extract the db_cred (and optionally db_cpid) from a message.
507  * We find the first mblk which has a non-NULL db_cred and use that.
508  * If none found we return NULL.
509  * Does NOT get a hold on the cred.
510  */
511 cred_t *
512 msg_getcred(const mblk_t *mp, pid_t *cpidp)
513 {
514 	cred_t *cr = NULL;
515 	cred_t *cr2;
516 	mblk_t *mp2;
517 
518 	while (mp != NULL) {
519 		dblk_t *dbp = mp->b_datap;
520 
521 		cr = dbp->db_credp;
522 		if (cr == NULL) {
523 			mp = mp->b_cont;
524 			continue;
525 		}
526 		if (cpidp != NULL)
527 			*cpidp = dbp->db_cpid;
528 
529 #ifdef DEBUG
530 		/*
531 		 * Normally there should at most one db_credp in a message.
532 		 * But if there are multiple (as in the case of some M_IOC*
533 		 * and some internal messages in TCP/IP bind logic) then
534 		 * they must be identical in the normal case.
535 		 * However, a socket can be shared between different uids
536 		 * in which case data queued in TCP would be from different
537 		 * creds. Thus we can only assert for the zoneid being the
538 		 * same. Due to Multi-level Level Ports for TX, some
539 		 * cred_t can have a NULL cr_zone, and we skip the comparison
540 		 * in that case.
541 		 */
542 		mp2 = mp->b_cont;
543 		while (mp2 != NULL) {
544 			cr2 = DB_CRED(mp2);
545 			if (cr2 != NULL) {
546 				DTRACE_PROBE2(msg__getcred,
547 				    cred_t *, cr, cred_t *, cr2);
548 				ASSERT(crgetzoneid(cr) == crgetzoneid(cr2) ||
549 				    crgetzone(cr) == NULL ||
550 				    crgetzone(cr2) == NULL);
551 			}
552 			mp2 = mp2->b_cont;
553 		}
554 #endif
555 		return (cr);
556 	}
557 	if (cpidp != NULL)
558 		*cpidp = NOPID;
559 	return (NULL);
560 }
561 
562 /*
563  * Variant of msg_getcred which, when a cred is found
564  * 1. Returns with a hold on the cred
565  * 2. Clears the first cred in the mblk.
566  * This is more efficient to use than a msg_getcred() + crhold() when
567  * the message is freed after the cred has been extracted.
568  *
569  * The caller is responsible for ensuring that there is no other reference
570  * on the message since db_credp can not be cleared when there are other
571  * references.
572  */
573 cred_t *
574 msg_extractcred(mblk_t *mp, pid_t *cpidp)
575 {
576 	cred_t *cr = NULL;
577 	cred_t *cr2;
578 	mblk_t *mp2;
579 
580 	while (mp != NULL) {
581 		dblk_t *dbp = mp->b_datap;
582 
583 		cr = dbp->db_credp;
584 		if (cr == NULL) {
585 			mp = mp->b_cont;
586 			continue;
587 		}
588 		ASSERT(dbp->db_ref == 1);
589 		dbp->db_credp = NULL;
590 		if (cpidp != NULL)
591 			*cpidp = dbp->db_cpid;
592 #ifdef DEBUG
593 		/*
594 		 * Normally there should at most one db_credp in a message.
595 		 * But if there are multiple (as in the case of some M_IOC*
596 		 * and some internal messages in TCP/IP bind logic) then
597 		 * they must be identical in the normal case.
598 		 * However, a socket can be shared between different uids
599 		 * in which case data queued in TCP would be from different
600 		 * creds. Thus we can only assert for the zoneid being the
601 		 * same. Due to Multi-level Level Ports for TX, some
602 		 * cred_t can have a NULL cr_zone, and we skip the comparison
603 		 * in that case.
604 		 */
605 		mp2 = mp->b_cont;
606 		while (mp2 != NULL) {
607 			cr2 = DB_CRED(mp2);
608 			if (cr2 != NULL) {
609 				DTRACE_PROBE2(msg__extractcred,
610 				    cred_t *, cr, cred_t *, cr2);
611 				ASSERT(crgetzoneid(cr) == crgetzoneid(cr2) ||
612 				    crgetzone(cr) == NULL ||
613 				    crgetzone(cr2) == NULL);
614 			}
615 			mp2 = mp2->b_cont;
616 		}
617 #endif
618 		return (cr);
619 	}
620 	return (NULL);
621 }
622 /*
623  * Get the label for a message. Uses the first mblk in the message
624  * which has a non-NULL db_credp.
625  * Returns NULL if there is no credp.
626  */
627 extern struct ts_label_s *
628 msg_getlabel(const mblk_t *mp)
629 {
630 	cred_t *cr = msg_getcred(mp, NULL);
631 
632 	if (cr == NULL)
633 		return (NULL);
634 
635 	return (crgetlabel(cr));
636 }
637 
638 void
639 freeb(mblk_t *mp)
640 {
641 	dblk_t *dbp = mp->b_datap;
642 
643 	ASSERT(dbp->db_ref > 0);
644 	ASSERT(mp->b_next == NULL && mp->b_prev == NULL);
645 	FTRACE_1("freeb(): mp=0x%lx", (uintptr_t)mp);
646 
647 	STR_FTEVENT_MBLK(mp, caller(), FTEV_FREEB, dbp->db_ref);
648 
649 	dbp->db_free(mp, dbp);
650 }
651 
652 void
653 freemsg(mblk_t *mp)
654 {
655 	FTRACE_1("freemsg(): mp=0x%lx", (uintptr_t)mp);
656 	while (mp) {
657 		dblk_t *dbp = mp->b_datap;
658 		mblk_t *mp_cont = mp->b_cont;
659 
660 		ASSERT(dbp->db_ref > 0);
661 		ASSERT(mp->b_next == NULL && mp->b_prev == NULL);
662 
663 		STR_FTEVENT_MBLK(mp, caller(), FTEV_FREEB, dbp->db_ref);
664 
665 		dbp->db_free(mp, dbp);
666 		mp = mp_cont;
667 	}
668 }
669 
670 /*
671  * Reallocate a block for another use.  Try hard to use the old block.
672  * If the old data is wanted (copy), leave b_wptr at the end of the data,
673  * otherwise return b_wptr = b_rptr.
674  *
675  * This routine is private and unstable.
676  */
677 mblk_t	*
678 reallocb(mblk_t *mp, size_t size, uint_t copy)
679 {
680 	mblk_t		*mp1;
681 	unsigned char	*old_rptr;
682 	ptrdiff_t	cur_size;
683 
684 	if (mp == NULL)
685 		return (allocb(size, BPRI_HI));
686 
687 	cur_size = mp->b_wptr - mp->b_rptr;
688 	old_rptr = mp->b_rptr;
689 
690 	ASSERT(mp->b_datap->db_ref != 0);
691 
692 	if (mp->b_datap->db_ref == 1 && MBLKSIZE(mp) >= size) {
693 		/*
694 		 * If the data is wanted and it will fit where it is, no
695 		 * work is required.
696 		 */
697 		if (copy && mp->b_datap->db_lim - mp->b_rptr >= size)
698 			return (mp);
699 
700 		mp->b_wptr = mp->b_rptr = mp->b_datap->db_base;
701 		mp1 = mp;
702 	} else if ((mp1 = allocb_tmpl(size, mp)) != NULL) {
703 		/* XXX other mp state could be copied too, db_flags ... ? */
704 		mp1->b_cont = mp->b_cont;
705 	} else {
706 		return (NULL);
707 	}
708 
709 	if (copy) {
710 		bcopy(old_rptr, mp1->b_rptr, cur_size);
711 		mp1->b_wptr = mp1->b_rptr + cur_size;
712 	}
713 
714 	if (mp != mp1)
715 		freeb(mp);
716 
717 	return (mp1);
718 }
719 
720 static void
721 dblk_lastfree(mblk_t *mp, dblk_t *dbp)
722 {
723 	ASSERT(dbp->db_mblk == mp);
724 	if (dbp->db_fthdr != NULL)
725 		str_ftfree(dbp);
726 
727 	/* set credp and projid to be 'unspecified' before returning to cache */
728 	if (dbp->db_credp != NULL) {
729 		crfree(dbp->db_credp);
730 		dbp->db_credp = NULL;
731 	}
732 	dbp->db_cpid = -1;
733 
734 	/* Reset the struioflag and the checksum flag fields */
735 	dbp->db_struioflag = 0;
736 	dbp->db_struioun.cksum.flags = 0;
737 
738 	/* and the COOKED and/or UIOA flag(s) */
739 	dbp->db_flags &= ~(DBLK_COOKED | DBLK_UIOA);
740 
741 	kmem_cache_free(dbp->db_cache, dbp);
742 }
743 
744 static void
745 dblk_decref(mblk_t *mp, dblk_t *dbp)
746 {
747 	if (dbp->db_ref != 1) {
748 		uint32_t rtfu = atomic_add_32_nv(&DBLK_RTFU_WORD(dbp),
749 		    -(1 << DBLK_RTFU_SHIFT(db_ref)));
750 		/*
751 		 * atomic_add_32_nv() just decremented db_ref, so we no longer
752 		 * have a reference to the dblk, which means another thread
753 		 * could free it.  Therefore we cannot examine the dblk to
754 		 * determine whether ours was the last reference.  Instead,
755 		 * we extract the new and minimum reference counts from rtfu.
756 		 * Note that all we're really saying is "if (ref != refmin)".
757 		 */
758 		if (((rtfu >> DBLK_RTFU_SHIFT(db_ref)) & DBLK_REFMAX) !=
759 		    ((rtfu >> DBLK_RTFU_SHIFT(db_flags)) & DBLK_REFMIN)) {
760 			kmem_cache_free(mblk_cache, mp);
761 			return;
762 		}
763 	}
764 	dbp->db_mblk = mp;
765 	dbp->db_free = dbp->db_lastfree;
766 	dbp->db_lastfree(mp, dbp);
767 }
768 
769 mblk_t *
770 dupb(mblk_t *mp)
771 {
772 	dblk_t *dbp = mp->b_datap;
773 	mblk_t *new_mp;
774 	uint32_t oldrtfu, newrtfu;
775 
776 	if ((new_mp = kmem_cache_alloc(mblk_cache, KM_NOSLEEP)) == NULL)
777 		goto out;
778 
779 	new_mp->b_next = new_mp->b_prev = new_mp->b_cont = NULL;
780 	new_mp->b_rptr = mp->b_rptr;
781 	new_mp->b_wptr = mp->b_wptr;
782 	new_mp->b_datap = dbp;
783 	new_mp->b_queue = NULL;
784 	MBLK_BAND_FLAG_WORD(new_mp) = MBLK_BAND_FLAG_WORD(mp);
785 
786 	STR_FTEVENT_MBLK(mp, caller(), FTEV_DUPB, dbp->db_ref);
787 
788 	dbp->db_free = dblk_decref;
789 	do {
790 		ASSERT(dbp->db_ref > 0);
791 		oldrtfu = DBLK_RTFU_WORD(dbp);
792 		newrtfu = oldrtfu + (1 << DBLK_RTFU_SHIFT(db_ref));
793 		/*
794 		 * If db_ref is maxed out we can't dup this message anymore.
795 		 */
796 		if ((oldrtfu & DBLK_RTFU_REF_MASK) == DBLK_RTFU_REF_MASK) {
797 			kmem_cache_free(mblk_cache, new_mp);
798 			new_mp = NULL;
799 			goto out;
800 		}
801 	} while (atomic_cas_32(&DBLK_RTFU_WORD(dbp), oldrtfu, newrtfu) !=
802 	    oldrtfu);
803 
804 out:
805 	FTRACE_1("dupb(): new_mp=0x%lx", (uintptr_t)new_mp);
806 	return (new_mp);
807 }
808 
809 static void
810 dblk_lastfree_desb(mblk_t *mp, dblk_t *dbp)
811 {
812 	frtn_t *frp = dbp->db_frtnp;
813 
814 	ASSERT(dbp->db_mblk == mp);
815 	frp->free_func(frp->free_arg);
816 	if (dbp->db_fthdr != NULL)
817 		str_ftfree(dbp);
818 
819 	/* set credp and projid to be 'unspecified' before returning to cache */
820 	if (dbp->db_credp != NULL) {
821 		crfree(dbp->db_credp);
822 		dbp->db_credp = NULL;
823 	}
824 	dbp->db_cpid = -1;
825 	dbp->db_struioflag = 0;
826 	dbp->db_struioun.cksum.flags = 0;
827 
828 	kmem_cache_free(dbp->db_cache, dbp);
829 }
830 
831 /*ARGSUSED*/
832 static void
833 frnop_func(void *arg)
834 {
835 }
836 
837 /*
838  * Generic esballoc used to implement the four flavors: [d]esballoc[a].
839  */
840 static mblk_t *
841 gesballoc(unsigned char *base, size_t size, uint32_t db_rtfu, frtn_t *frp,
842     void (*lastfree)(mblk_t *, dblk_t *), int kmflags)
843 {
844 	dblk_t *dbp;
845 	mblk_t *mp;
846 
847 	ASSERT(base != NULL && frp != NULL);
848 
849 	if ((dbp = kmem_cache_alloc(dblk_esb_cache, kmflags)) == NULL) {
850 		mp = NULL;
851 		goto out;
852 	}
853 
854 	mp = dbp->db_mblk;
855 	dbp->db_base = base;
856 	dbp->db_lim = base + size;
857 	dbp->db_free = dbp->db_lastfree = lastfree;
858 	dbp->db_frtnp = frp;
859 	DBLK_RTFU_WORD(dbp) = db_rtfu;
860 	mp->b_next = mp->b_prev = mp->b_cont = NULL;
861 	mp->b_rptr = mp->b_wptr = base;
862 	mp->b_queue = NULL;
863 	MBLK_BAND_FLAG_WORD(mp) = 0;
864 
865 out:
866 	FTRACE_1("gesballoc(): mp=0x%lx", (uintptr_t)mp);
867 	return (mp);
868 }
869 
870 /*ARGSUSED*/
871 mblk_t *
872 esballoc(unsigned char *base, size_t size, uint_t pri, frtn_t *frp)
873 {
874 	mblk_t *mp;
875 
876 	/*
877 	 * Note that this is structured to allow the common case (i.e.
878 	 * STREAMS flowtracing disabled) to call gesballoc() with tail
879 	 * call optimization.
880 	 */
881 	if (!str_ftnever) {
882 		mp = gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
883 		    frp, freebs_enqueue, KM_NOSLEEP);
884 
885 		if (mp != NULL)
886 			STR_FTALLOC(&DB_FTHDR(mp), FTEV_ESBALLOC, size);
887 		return (mp);
888 	}
889 
890 	return (gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
891 	    frp, freebs_enqueue, KM_NOSLEEP));
892 }
893 
894 /*
895  * Same as esballoc() but sleeps waiting for memory.
896  */
897 /*ARGSUSED*/
898 mblk_t *
899 esballoc_wait(unsigned char *base, size_t size, uint_t pri, frtn_t *frp)
900 {
901 	mblk_t *mp;
902 
903 	/*
904 	 * Note that this is structured to allow the common case (i.e.
905 	 * STREAMS flowtracing disabled) to call gesballoc() with tail
906 	 * call optimization.
907 	 */
908 	if (!str_ftnever) {
909 		mp = gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
910 		    frp, freebs_enqueue, KM_SLEEP);
911 
912 		STR_FTALLOC(&DB_FTHDR(mp), FTEV_ESBALLOC, size);
913 		return (mp);
914 	}
915 
916 	return (gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
917 	    frp, freebs_enqueue, KM_SLEEP));
918 }
919 
920 /*ARGSUSED*/
921 mblk_t *
922 desballoc(unsigned char *base, size_t size, uint_t pri, frtn_t *frp)
923 {
924 	mblk_t *mp;
925 
926 	/*
927 	 * Note that this is structured to allow the common case (i.e.
928 	 * STREAMS flowtracing disabled) to call gesballoc() with tail
929 	 * call optimization.
930 	 */
931 	if (!str_ftnever) {
932 		mp = gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
933 		    frp, dblk_lastfree_desb, KM_NOSLEEP);
934 
935 		if (mp != NULL)
936 			STR_FTALLOC(&DB_FTHDR(mp), FTEV_DESBALLOC, size);
937 		return (mp);
938 	}
939 
940 	return (gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
941 	    frp, dblk_lastfree_desb, KM_NOSLEEP));
942 }
943 
944 /*ARGSUSED*/
945 mblk_t *
946 esballoca(unsigned char *base, size_t size, uint_t pri, frtn_t *frp)
947 {
948 	mblk_t *mp;
949 
950 	/*
951 	 * Note that this is structured to allow the common case (i.e.
952 	 * STREAMS flowtracing disabled) to call gesballoc() with tail
953 	 * call optimization.
954 	 */
955 	if (!str_ftnever) {
956 		mp = gesballoc(base, size, DBLK_RTFU(2, M_DATA, 0, 0),
957 		    frp, freebs_enqueue, KM_NOSLEEP);
958 
959 		if (mp != NULL)
960 			STR_FTALLOC(&DB_FTHDR(mp), FTEV_ESBALLOCA, size);
961 		return (mp);
962 	}
963 
964 	return (gesballoc(base, size, DBLK_RTFU(2, M_DATA, 0, 0),
965 	    frp, freebs_enqueue, KM_NOSLEEP));
966 }
967 
968 /*ARGSUSED*/
969 mblk_t *
970 desballoca(unsigned char *base, size_t size, uint_t pri, frtn_t *frp)
971 {
972 	mblk_t *mp;
973 
974 	/*
975 	 * Note that this is structured to allow the common case (i.e.
976 	 * STREAMS flowtracing disabled) to call gesballoc() with tail
977 	 * call optimization.
978 	 */
979 	if (!str_ftnever) {
980 		mp = gesballoc(base, size, DBLK_RTFU(2, M_DATA, 0, 0),
981 		    frp, dblk_lastfree_desb, KM_NOSLEEP);
982 
983 		if (mp != NULL)
984 			STR_FTALLOC(&DB_FTHDR(mp), FTEV_DESBALLOCA, size);
985 		return (mp);
986 	}
987 
988 	return (gesballoc(base, size, DBLK_RTFU(2, M_DATA, 0, 0),
989 	    frp, dblk_lastfree_desb, KM_NOSLEEP));
990 }
991 
992 static void
993 bcache_dblk_lastfree(mblk_t *mp, dblk_t *dbp)
994 {
995 	bcache_t *bcp = dbp->db_cache;
996 
997 	ASSERT(dbp->db_mblk == mp);
998 	if (dbp->db_fthdr != NULL)
999 		str_ftfree(dbp);
1000 
1001 	/* set credp and projid to be 'unspecified' before returning to cache */
1002 	if (dbp->db_credp != NULL) {
1003 		crfree(dbp->db_credp);
1004 		dbp->db_credp = NULL;
1005 	}
1006 	dbp->db_cpid = -1;
1007 	dbp->db_struioflag = 0;
1008 	dbp->db_struioun.cksum.flags = 0;
1009 
1010 	mutex_enter(&bcp->mutex);
1011 	kmem_cache_free(bcp->dblk_cache, dbp);
1012 	bcp->alloc--;
1013 
1014 	if (bcp->alloc == 0 && bcp->destroy != 0) {
1015 		kmem_cache_destroy(bcp->dblk_cache);
1016 		kmem_cache_destroy(bcp->buffer_cache);
1017 		mutex_exit(&bcp->mutex);
1018 		mutex_destroy(&bcp->mutex);
1019 		kmem_free(bcp, sizeof (bcache_t));
1020 	} else {
1021 		mutex_exit(&bcp->mutex);
1022 	}
1023 }
1024 
1025 bcache_t *
1026 bcache_create(char *name, size_t size, uint_t align)
1027 {
1028 	bcache_t *bcp;
1029 	char buffer[255];
1030 
1031 	ASSERT((align & (align - 1)) == 0);
1032 
1033 	if ((bcp = kmem_alloc(sizeof (bcache_t), KM_NOSLEEP)) == NULL)
1034 		return (NULL);
1035 
1036 	bcp->size = size;
1037 	bcp->align = align;
1038 	bcp->alloc = 0;
1039 	bcp->destroy = 0;
1040 
1041 	mutex_init(&bcp->mutex, NULL, MUTEX_DRIVER, NULL);
1042 
1043 	(void) sprintf(buffer, "%s_buffer_cache", name);
1044 	bcp->buffer_cache = kmem_cache_create(buffer, size, align, NULL, NULL,
1045 	    NULL, NULL, NULL, 0);
1046 	(void) sprintf(buffer, "%s_dblk_cache", name);
1047 	bcp->dblk_cache = kmem_cache_create(buffer, sizeof (dblk_t),
1048 	    DBLK_CACHE_ALIGN, bcache_dblk_constructor, bcache_dblk_destructor,
1049 	    NULL, (void *)bcp, NULL, 0);
1050 
1051 	return (bcp);
1052 }
1053 
1054 void
1055 bcache_destroy(bcache_t *bcp)
1056 {
1057 	ASSERT(bcp != NULL);
1058 
1059 	mutex_enter(&bcp->mutex);
1060 	if (bcp->alloc == 0) {
1061 		kmem_cache_destroy(bcp->dblk_cache);
1062 		kmem_cache_destroy(bcp->buffer_cache);
1063 		mutex_exit(&bcp->mutex);
1064 		mutex_destroy(&bcp->mutex);
1065 		kmem_free(bcp, sizeof (bcache_t));
1066 	} else {
1067 		bcp->destroy++;
1068 		mutex_exit(&bcp->mutex);
1069 	}
1070 }
1071 
1072 /*ARGSUSED*/
1073 mblk_t *
1074 bcache_allocb(bcache_t *bcp, uint_t pri)
1075 {
1076 	dblk_t *dbp;
1077 	mblk_t *mp = NULL;
1078 
1079 	ASSERT(bcp != NULL);
1080 
1081 	mutex_enter(&bcp->mutex);
1082 	if (bcp->destroy != 0) {
1083 		mutex_exit(&bcp->mutex);
1084 		goto out;
1085 	}
1086 
1087 	if ((dbp = kmem_cache_alloc(bcp->dblk_cache, KM_NOSLEEP)) == NULL) {
1088 		mutex_exit(&bcp->mutex);
1089 		goto out;
1090 	}
1091 	bcp->alloc++;
1092 	mutex_exit(&bcp->mutex);
1093 
1094 	ASSERT(((uintptr_t)(dbp->db_base) & (bcp->align - 1)) == 0);
1095 
1096 	mp = dbp->db_mblk;
1097 	DBLK_RTFU_WORD(dbp) = DBLK_RTFU(1, M_DATA, 0, 0);
1098 	mp->b_next = mp->b_prev = mp->b_cont = NULL;
1099 	mp->b_rptr = mp->b_wptr = dbp->db_base;
1100 	mp->b_queue = NULL;
1101 	MBLK_BAND_FLAG_WORD(mp) = 0;
1102 	STR_FTALLOC(&dbp->db_fthdr, FTEV_BCALLOCB, bcp->size);
1103 out:
1104 	FTRACE_1("bcache_allocb(): mp=0x%p", (uintptr_t)mp);
1105 
1106 	return (mp);
1107 }
1108 
1109 static void
1110 dblk_lastfree_oversize(mblk_t *mp, dblk_t *dbp)
1111 {
1112 	ASSERT(dbp->db_mblk == mp);
1113 	if (dbp->db_fthdr != NULL)
1114 		str_ftfree(dbp);
1115 
1116 	/* set credp and projid to be 'unspecified' before returning to cache */
1117 	if (dbp->db_credp != NULL) {
1118 		crfree(dbp->db_credp);
1119 		dbp->db_credp = NULL;
1120 	}
1121 	dbp->db_cpid = -1;
1122 	dbp->db_struioflag = 0;
1123 	dbp->db_struioun.cksum.flags = 0;
1124 
1125 	kmem_free(dbp->db_base, dbp->db_lim - dbp->db_base);
1126 	kmem_cache_free(dbp->db_cache, dbp);
1127 }
1128 
1129 static mblk_t *
1130 allocb_oversize(size_t size, int kmflags)
1131 {
1132 	mblk_t *mp;
1133 	void *buf;
1134 
1135 	size = P2ROUNDUP(size, DBLK_CACHE_ALIGN);
1136 	if ((buf = kmem_alloc(size, kmflags)) == NULL)
1137 		return (NULL);
1138 	if ((mp = gesballoc(buf, size, DBLK_RTFU(1, M_DATA, 0, 0),
1139 	    &frnop, dblk_lastfree_oversize, kmflags)) == NULL)
1140 		kmem_free(buf, size);
1141 
1142 	if (mp != NULL)
1143 		STR_FTALLOC(&DB_FTHDR(mp), FTEV_ALLOCBIG, size);
1144 
1145 	return (mp);
1146 }
1147 
1148 mblk_t *
1149 allocb_tryhard(size_t target_size)
1150 {
1151 	size_t size;
1152 	mblk_t *bp;
1153 
1154 	for (size = target_size; size < target_size + 512;
1155 	    size += DBLK_CACHE_ALIGN)
1156 		if ((bp = allocb(size, BPRI_HI)) != NULL)
1157 			return (bp);
1158 	allocb_tryhard_fails++;
1159 	return (NULL);
1160 }
1161 
1162 /*
1163  * This routine is consolidation private for STREAMS internal use
1164  * This routine may only be called from sync routines (i.e., not
1165  * from put or service procedures).  It is located here (rather
1166  * than strsubr.c) so that we don't have to expose all of the
1167  * allocb() implementation details in header files.
1168  */
1169 mblk_t *
1170 allocb_wait(size_t size, uint_t pri, uint_t flags, int *error)
1171 {
1172 	dblk_t *dbp;
1173 	mblk_t *mp;
1174 	size_t index;
1175 
1176 	index = (size -1) >> DBLK_SIZE_SHIFT;
1177 
1178 	if (flags & STR_NOSIG) {
1179 		if (index >= (DBLK_MAX_CACHE >> DBLK_SIZE_SHIFT)) {
1180 			if (size != 0) {
1181 				mp = allocb_oversize(size, KM_SLEEP);
1182 				FTRACE_1("allocb_wait (NOSIG): mp=0x%lx",
1183 				    (uintptr_t)mp);
1184 				return (mp);
1185 			}
1186 			index = 0;
1187 		}
1188 
1189 		dbp = kmem_cache_alloc(dblk_cache[index], KM_SLEEP);
1190 		mp = dbp->db_mblk;
1191 		DBLK_RTFU_WORD(dbp) = DBLK_RTFU(1, M_DATA, 0, 0);
1192 		mp->b_next = mp->b_prev = mp->b_cont = NULL;
1193 		mp->b_rptr = mp->b_wptr = dbp->db_base;
1194 		mp->b_queue = NULL;
1195 		MBLK_BAND_FLAG_WORD(mp) = 0;
1196 		STR_FTALLOC(&DB_FTHDR(mp), FTEV_ALLOCBW, size);
1197 
1198 		FTRACE_1("allocb_wait (NOSIG): mp=0x%lx", (uintptr_t)mp);
1199 
1200 	} else {
1201 		while ((mp = allocb(size, pri)) == NULL) {
1202 			if ((*error = strwaitbuf(size, BPRI_HI)) != 0)
1203 				return (NULL);
1204 		}
1205 	}
1206 
1207 	return (mp);
1208 }
1209 
1210 /*
1211  * Call function 'func' with 'arg' when a class zero block can
1212  * be allocated with priority 'pri'.
1213  */
1214 bufcall_id_t
1215 esbbcall(uint_t pri, void (*func)(void *), void *arg)
1216 {
1217 	return (bufcall(1, pri, func, arg));
1218 }
1219 
1220 /*
1221  * Allocates an iocblk (M_IOCTL) block. Properly sets the credentials
1222  * ioc_id, rval and error of the struct ioctl to set up an ioctl call.
1223  * This provides consistency for all internal allocators of ioctl.
1224  */
1225 mblk_t *
1226 mkiocb(uint_t cmd)
1227 {
1228 	struct iocblk	*ioc;
1229 	mblk_t		*mp;
1230 
1231 	/*
1232 	 * Allocate enough space for any of the ioctl related messages.
1233 	 */
1234 	if ((mp = allocb(sizeof (union ioctypes), BPRI_MED)) == NULL)
1235 		return (NULL);
1236 
1237 	bzero(mp->b_rptr, sizeof (union ioctypes));
1238 
1239 	/*
1240 	 * Set the mblk_t information and ptrs correctly.
1241 	 */
1242 	mp->b_wptr += sizeof (struct iocblk);
1243 	mp->b_datap->db_type = M_IOCTL;
1244 
1245 	/*
1246 	 * Fill in the fields.
1247 	 */
1248 	ioc		= (struct iocblk *)mp->b_rptr;
1249 	ioc->ioc_cmd	= cmd;
1250 	ioc->ioc_cr	= kcred;
1251 	ioc->ioc_id	= getiocseqno();
1252 	ioc->ioc_flag	= IOC_NATIVE;
1253 	return (mp);
1254 }
1255 
1256 /*
1257  * test if block of given size can be allocated with a request of
1258  * the given priority.
1259  * 'pri' is no longer used, but is retained for compatibility.
1260  */
1261 /* ARGSUSED */
1262 int
1263 testb(size_t size, uint_t pri)
1264 {
1265 	return ((size + sizeof (dblk_t)) <= kmem_avail());
1266 }
1267 
1268 /*
1269  * Call function 'func' with argument 'arg' when there is a reasonably
1270  * good chance that a block of size 'size' can be allocated.
1271  * 'pri' is no longer used, but is retained for compatibility.
1272  */
1273 /* ARGSUSED */
1274 bufcall_id_t
1275 bufcall(size_t size, uint_t pri, void (*func)(void *), void *arg)
1276 {
1277 	static long bid = 1;	/* always odd to save checking for zero */
1278 	bufcall_id_t bc_id;
1279 	struct strbufcall *bcp;
1280 
1281 	if ((bcp = kmem_alloc(sizeof (strbufcall_t), KM_NOSLEEP)) == NULL)
1282 		return (0);
1283 
1284 	bcp->bc_func = func;
1285 	bcp->bc_arg = arg;
1286 	bcp->bc_size = size;
1287 	bcp->bc_next = NULL;
1288 	bcp->bc_executor = NULL;
1289 
1290 	mutex_enter(&strbcall_lock);
1291 	/*
1292 	 * After bcp is linked into strbcalls and strbcall_lock is dropped there
1293 	 * should be no references to bcp since it may be freed by
1294 	 * runbufcalls(). Since bcp_id field is returned, we save its value in
1295 	 * the local var.
1296 	 */
1297 	bc_id = bcp->bc_id = (bufcall_id_t)(bid += 2);	/* keep it odd */
1298 
1299 	/*
1300 	 * add newly allocated stream event to existing
1301 	 * linked list of events.
1302 	 */
1303 	if (strbcalls.bc_head == NULL) {
1304 		strbcalls.bc_head = strbcalls.bc_tail = bcp;
1305 	} else {
1306 		strbcalls.bc_tail->bc_next = bcp;
1307 		strbcalls.bc_tail = bcp;
1308 	}
1309 
1310 	cv_signal(&strbcall_cv);
1311 	mutex_exit(&strbcall_lock);
1312 	return (bc_id);
1313 }
1314 
1315 /*
1316  * Cancel a bufcall request.
1317  */
1318 void
1319 unbufcall(bufcall_id_t id)
1320 {
1321 	strbufcall_t *bcp, *pbcp;
1322 
1323 	mutex_enter(&strbcall_lock);
1324 again:
1325 	pbcp = NULL;
1326 	for (bcp = strbcalls.bc_head; bcp; bcp = bcp->bc_next) {
1327 		if (id == bcp->bc_id)
1328 			break;
1329 		pbcp = bcp;
1330 	}
1331 	if (bcp) {
1332 		if (bcp->bc_executor != NULL) {
1333 			if (bcp->bc_executor != curthread) {
1334 				cv_wait(&bcall_cv, &strbcall_lock);
1335 				goto again;
1336 			}
1337 		} else {
1338 			if (pbcp)
1339 				pbcp->bc_next = bcp->bc_next;
1340 			else
1341 				strbcalls.bc_head = bcp->bc_next;
1342 			if (bcp == strbcalls.bc_tail)
1343 				strbcalls.bc_tail = pbcp;
1344 			kmem_free(bcp, sizeof (strbufcall_t));
1345 		}
1346 	}
1347 	mutex_exit(&strbcall_lock);
1348 }
1349 
1350 /*
1351  * Duplicate a message block by block (uses dupb), returning
1352  * a pointer to the duplicate message.
1353  * Returns a non-NULL value only if the entire message
1354  * was dup'd.
1355  */
1356 mblk_t *
1357 dupmsg(mblk_t *bp)
1358 {
1359 	mblk_t *head, *nbp;
1360 
1361 	if (!bp || !(nbp = head = dupb(bp)))
1362 		return (NULL);
1363 
1364 	while (bp->b_cont) {
1365 		if (!(nbp->b_cont = dupb(bp->b_cont))) {
1366 			freemsg(head);
1367 			return (NULL);
1368 		}
1369 		nbp = nbp->b_cont;
1370 		bp = bp->b_cont;
1371 	}
1372 	return (head);
1373 }
1374 
1375 #define	DUPB_NOLOAN(bp) \
1376 	((((bp)->b_datap->db_struioflag & STRUIO_ZC) != 0) ? \
1377 	copyb((bp)) : dupb((bp)))
1378 
1379 mblk_t *
1380 dupmsg_noloan(mblk_t *bp)
1381 {
1382 	mblk_t *head, *nbp;
1383 
1384 	if (bp == NULL || DB_TYPE(bp) != M_DATA ||
1385 	    ((nbp = head = DUPB_NOLOAN(bp)) == NULL))
1386 		return (NULL);
1387 
1388 	while (bp->b_cont) {
1389 		if ((nbp->b_cont = DUPB_NOLOAN(bp->b_cont)) == NULL) {
1390 			freemsg(head);
1391 			return (NULL);
1392 		}
1393 		nbp = nbp->b_cont;
1394 		bp = bp->b_cont;
1395 	}
1396 	return (head);
1397 }
1398 
1399 /*
1400  * Copy data from message and data block to newly allocated message and
1401  * data block. Returns new message block pointer, or NULL if error.
1402  * The alignment of rptr (w.r.t. word alignment) will be the same in the copy
1403  * as in the original even when db_base is not word aligned. (bug 1052877)
1404  */
1405 mblk_t *
1406 copyb(mblk_t *bp)
1407 {
1408 	mblk_t	*nbp;
1409 	dblk_t	*dp, *ndp;
1410 	uchar_t *base;
1411 	size_t	size;
1412 	size_t	unaligned;
1413 
1414 	ASSERT(bp->b_wptr >= bp->b_rptr);
1415 
1416 	dp = bp->b_datap;
1417 	if (dp->db_fthdr != NULL)
1418 		STR_FTEVENT_MBLK(bp, caller(), FTEV_COPYB, 0);
1419 
1420 	/*
1421 	 * Special handling for Multidata message; this should be
1422 	 * removed once a copy-callback routine is made available.
1423 	 */
1424 	if (dp->db_type == M_MULTIDATA) {
1425 		cred_t *cr;
1426 
1427 		if ((nbp = mmd_copy(bp, KM_NOSLEEP)) == NULL)
1428 			return (NULL);
1429 
1430 		nbp->b_flag = bp->b_flag;
1431 		nbp->b_band = bp->b_band;
1432 		ndp = nbp->b_datap;
1433 
1434 		/* See comments below on potential issues. */
1435 		STR_FTEVENT_MBLK(nbp, caller(), FTEV_COPYB, 1);
1436 
1437 		ASSERT(ndp->db_type == dp->db_type);
1438 		cr = dp->db_credp;
1439 		if (cr != NULL)
1440 			crhold(ndp->db_credp = cr);
1441 		ndp->db_cpid = dp->db_cpid;
1442 		return (nbp);
1443 	}
1444 
1445 	size = dp->db_lim - dp->db_base;
1446 	unaligned = P2PHASE((uintptr_t)dp->db_base, sizeof (uint_t));
1447 	if ((nbp = allocb_tmpl(size + unaligned, bp)) == NULL)
1448 		return (NULL);
1449 	nbp->b_flag = bp->b_flag;
1450 	nbp->b_band = bp->b_band;
1451 	ndp = nbp->b_datap;
1452 
1453 	/*
1454 	 * Copy the various checksum information that came in
1455 	 * originally.
1456 	 */
1457 	ndp->db_cksumstart = dp->db_cksumstart;
1458 	ndp->db_cksumend = dp->db_cksumend;
1459 	ndp->db_cksumstuff = dp->db_cksumstuff;
1460 	bcopy(dp->db_struioun.data, ndp->db_struioun.data,
1461 	    sizeof (dp->db_struioun.data));
1462 
1463 	/*
1464 	 * Well, here is a potential issue.  If we are trying to
1465 	 * trace a flow, and we copy the message, we might lose
1466 	 * information about where this message might have been.
1467 	 * So we should inherit the FT data.  On the other hand,
1468 	 * a user might be interested only in alloc to free data.
1469 	 * So I guess the real answer is to provide a tunable.
1470 	 */
1471 	STR_FTEVENT_MBLK(nbp, caller(), FTEV_COPYB, 1);
1472 
1473 	base = ndp->db_base + unaligned;
1474 	bcopy(dp->db_base, ndp->db_base + unaligned, size);
1475 
1476 	nbp->b_rptr = base + (bp->b_rptr - dp->db_base);
1477 	nbp->b_wptr = nbp->b_rptr + MBLKL(bp);
1478 
1479 	return (nbp);
1480 }
1481 
1482 /*
1483  * Copy data from message to newly allocated message using new
1484  * data blocks.  Returns a pointer to the new message, or NULL if error.
1485  */
1486 mblk_t *
1487 copymsg(mblk_t *bp)
1488 {
1489 	mblk_t *head, *nbp;
1490 
1491 	if (!bp || !(nbp = head = copyb(bp)))
1492 		return (NULL);
1493 
1494 	while (bp->b_cont) {
1495 		if (!(nbp->b_cont = copyb(bp->b_cont))) {
1496 			freemsg(head);
1497 			return (NULL);
1498 		}
1499 		nbp = nbp->b_cont;
1500 		bp = bp->b_cont;
1501 	}
1502 	return (head);
1503 }
1504 
1505 /*
1506  * link a message block to tail of message
1507  */
1508 void
1509 linkb(mblk_t *mp, mblk_t *bp)
1510 {
1511 	ASSERT(mp && bp);
1512 
1513 	for (; mp->b_cont; mp = mp->b_cont)
1514 		;
1515 	mp->b_cont = bp;
1516 }
1517 
1518 /*
1519  * unlink a message block from head of message
1520  * return pointer to new message.
1521  * NULL if message becomes empty.
1522  */
1523 mblk_t *
1524 unlinkb(mblk_t *bp)
1525 {
1526 	mblk_t *bp1;
1527 
1528 	bp1 = bp->b_cont;
1529 	bp->b_cont = NULL;
1530 	return (bp1);
1531 }
1532 
1533 /*
1534  * remove a message block "bp" from message "mp"
1535  *
1536  * Return pointer to new message or NULL if no message remains.
1537  * Return -1 if bp is not found in message.
1538  */
1539 mblk_t *
1540 rmvb(mblk_t *mp, mblk_t *bp)
1541 {
1542 	mblk_t *tmp;
1543 	mblk_t *lastp = NULL;
1544 
1545 	ASSERT(mp && bp);
1546 	for (tmp = mp; tmp; tmp = tmp->b_cont) {
1547 		if (tmp == bp) {
1548 			if (lastp)
1549 				lastp->b_cont = tmp->b_cont;
1550 			else
1551 				mp = tmp->b_cont;
1552 			tmp->b_cont = NULL;
1553 			return (mp);
1554 		}
1555 		lastp = tmp;
1556 	}
1557 	return ((mblk_t *)-1);
1558 }
1559 
1560 /*
1561  * Concatenate and align first len bytes of common
1562  * message type.  Len == -1, means concat everything.
1563  * Returns 1 on success, 0 on failure
1564  * After the pullup, mp points to the pulled up data.
1565  */
1566 int
1567 pullupmsg(mblk_t *mp, ssize_t len)
1568 {
1569 	mblk_t *bp, *b_cont;
1570 	dblk_t *dbp;
1571 	ssize_t n;
1572 
1573 	ASSERT(mp->b_datap->db_ref > 0);
1574 	ASSERT(mp->b_next == NULL && mp->b_prev == NULL);
1575 
1576 	/*
1577 	 * We won't handle Multidata message, since it contains
1578 	 * metadata which this function has no knowledge of; we
1579 	 * assert on DEBUG, and return failure otherwise.
1580 	 */
1581 	ASSERT(mp->b_datap->db_type != M_MULTIDATA);
1582 	if (mp->b_datap->db_type == M_MULTIDATA)
1583 		return (0);
1584 
1585 	if (len == -1) {
1586 		if (mp->b_cont == NULL && str_aligned(mp->b_rptr))
1587 			return (1);
1588 		len = xmsgsize(mp);
1589 	} else {
1590 		ssize_t first_mblk_len = mp->b_wptr - mp->b_rptr;
1591 		ASSERT(first_mblk_len >= 0);
1592 		/*
1593 		 * If the length is less than that of the first mblk,
1594 		 * we want to pull up the message into an aligned mblk.
1595 		 * Though not part of the spec, some callers assume it.
1596 		 */
1597 		if (len <= first_mblk_len) {
1598 			if (str_aligned(mp->b_rptr))
1599 				return (1);
1600 			len = first_mblk_len;
1601 		} else if (xmsgsize(mp) < len)
1602 			return (0);
1603 	}
1604 
1605 	if ((bp = allocb_tmpl(len, mp)) == NULL)
1606 		return (0);
1607 
1608 	dbp = bp->b_datap;
1609 	*bp = *mp;		/* swap mblks so bp heads the old msg... */
1610 	mp->b_datap = dbp;	/* ... and mp heads the new message */
1611 	mp->b_datap->db_mblk = mp;
1612 	bp->b_datap->db_mblk = bp;
1613 	mp->b_rptr = mp->b_wptr = dbp->db_base;
1614 
1615 	do {
1616 		ASSERT(bp->b_datap->db_ref > 0);
1617 		ASSERT(bp->b_wptr >= bp->b_rptr);
1618 		n = MIN(bp->b_wptr - bp->b_rptr, len);
1619 		ASSERT(n >= 0);		/* allow zero-length mblk_t's */
1620 		if (n > 0)
1621 			bcopy(bp->b_rptr, mp->b_wptr, (size_t)n);
1622 		mp->b_wptr += n;
1623 		bp->b_rptr += n;
1624 		len -= n;
1625 		if (bp->b_rptr != bp->b_wptr)
1626 			break;
1627 		b_cont = bp->b_cont;
1628 		freeb(bp);
1629 		bp = b_cont;
1630 	} while (len && bp);
1631 
1632 	mp->b_cont = bp;	/* tack on whatever wasn't pulled up */
1633 
1634 	return (1);
1635 }
1636 
1637 /*
1638  * Concatenate and align at least the first len bytes of common message
1639  * type.  Len == -1 means concatenate everything.  The original message is
1640  * unaltered.  Returns a pointer to a new message on success, otherwise
1641  * returns NULL.
1642  */
1643 mblk_t *
1644 msgpullup(mblk_t *mp, ssize_t len)
1645 {
1646 	mblk_t	*newmp;
1647 	ssize_t	totlen;
1648 	ssize_t	n;
1649 
1650 	/*
1651 	 * We won't handle Multidata message, since it contains
1652 	 * metadata which this function has no knowledge of; we
1653 	 * assert on DEBUG, and return failure otherwise.
1654 	 */
1655 	ASSERT(mp->b_datap->db_type != M_MULTIDATA);
1656 	if (mp->b_datap->db_type == M_MULTIDATA)
1657 		return (NULL);
1658 
1659 	totlen = xmsgsize(mp);
1660 
1661 	if ((len > 0) && (len > totlen))
1662 		return (NULL);
1663 
1664 	/*
1665 	 * Copy all of the first msg type into one new mblk, then dupmsg
1666 	 * and link the rest onto this.
1667 	 */
1668 
1669 	len = totlen;
1670 
1671 	if ((newmp = allocb_tmpl(len, mp)) == NULL)
1672 		return (NULL);
1673 
1674 	newmp->b_flag = mp->b_flag;
1675 	newmp->b_band = mp->b_band;
1676 
1677 	while (len > 0) {
1678 		n = mp->b_wptr - mp->b_rptr;
1679 		ASSERT(n >= 0);		/* allow zero-length mblk_t's */
1680 		if (n > 0)
1681 			bcopy(mp->b_rptr, newmp->b_wptr, n);
1682 		newmp->b_wptr += n;
1683 		len -= n;
1684 		mp = mp->b_cont;
1685 	}
1686 
1687 	if (mp != NULL) {
1688 		newmp->b_cont = dupmsg(mp);
1689 		if (newmp->b_cont == NULL) {
1690 			freemsg(newmp);
1691 			return (NULL);
1692 		}
1693 	}
1694 
1695 	return (newmp);
1696 }
1697 
1698 /*
1699  * Trim bytes from message
1700  *  len > 0, trim from head
1701  *  len < 0, trim from tail
1702  * Returns 1 on success, 0 on failure.
1703  */
1704 int
1705 adjmsg(mblk_t *mp, ssize_t len)
1706 {
1707 	mblk_t *bp;
1708 	mblk_t *save_bp = NULL;
1709 	mblk_t *prev_bp;
1710 	mblk_t *bcont;
1711 	unsigned char type;
1712 	ssize_t n;
1713 	int fromhead;
1714 	int first;
1715 
1716 	ASSERT(mp != NULL);
1717 	/*
1718 	 * We won't handle Multidata message, since it contains
1719 	 * metadata which this function has no knowledge of; we
1720 	 * assert on DEBUG, and return failure otherwise.
1721 	 */
1722 	ASSERT(mp->b_datap->db_type != M_MULTIDATA);
1723 	if (mp->b_datap->db_type == M_MULTIDATA)
1724 		return (0);
1725 
1726 	if (len < 0) {
1727 		fromhead = 0;
1728 		len = -len;
1729 	} else {
1730 		fromhead = 1;
1731 	}
1732 
1733 	if (xmsgsize(mp) < len)
1734 		return (0);
1735 
1736 	if (fromhead) {
1737 		first = 1;
1738 		while (len) {
1739 			ASSERT(mp->b_wptr >= mp->b_rptr);
1740 			n = MIN(mp->b_wptr - mp->b_rptr, len);
1741 			mp->b_rptr += n;
1742 			len -= n;
1743 
1744 			/*
1745 			 * If this is not the first zero length
1746 			 * message remove it
1747 			 */
1748 			if (!first && (mp->b_wptr == mp->b_rptr)) {
1749 				bcont = mp->b_cont;
1750 				freeb(mp);
1751 				mp = save_bp->b_cont = bcont;
1752 			} else {
1753 				save_bp = mp;
1754 				mp = mp->b_cont;
1755 			}
1756 			first = 0;
1757 		}
1758 	} else {
1759 		type = mp->b_datap->db_type;
1760 		while (len) {
1761 			bp = mp;
1762 			save_bp = NULL;
1763 
1764 			/*
1765 			 * Find the last message of same type
1766 			 */
1767 			while (bp && bp->b_datap->db_type == type) {
1768 				ASSERT(bp->b_wptr >= bp->b_rptr);
1769 				prev_bp = save_bp;
1770 				save_bp = bp;
1771 				bp = bp->b_cont;
1772 			}
1773 			if (save_bp == NULL)
1774 				break;
1775 			n = MIN(save_bp->b_wptr - save_bp->b_rptr, len);
1776 			save_bp->b_wptr -= n;
1777 			len -= n;
1778 
1779 			/*
1780 			 * If this is not the first message
1781 			 * and we have taken away everything
1782 			 * from this message, remove it
1783 			 */
1784 
1785 			if ((save_bp != mp) &&
1786 			    (save_bp->b_wptr == save_bp->b_rptr)) {
1787 				bcont = save_bp->b_cont;
1788 				freeb(save_bp);
1789 				prev_bp->b_cont = bcont;
1790 			}
1791 		}
1792 	}
1793 	return (1);
1794 }
1795 
1796 /*
1797  * get number of data bytes in message
1798  */
1799 size_t
1800 msgdsize(mblk_t *bp)
1801 {
1802 	size_t count = 0;
1803 
1804 	for (; bp; bp = bp->b_cont)
1805 		if (bp->b_datap->db_type == M_DATA) {
1806 			ASSERT(bp->b_wptr >= bp->b_rptr);
1807 			count += bp->b_wptr - bp->b_rptr;
1808 		}
1809 	return (count);
1810 }
1811 
1812 /*
1813  * Get a message off head of queue
1814  *
1815  * If queue has no buffers then mark queue
1816  * with QWANTR. (queue wants to be read by
1817  * someone when data becomes available)
1818  *
1819  * If there is something to take off then do so.
1820  * If queue falls below hi water mark turn off QFULL
1821  * flag.  Decrement weighted count of queue.
1822  * Also turn off QWANTR because queue is being read.
1823  *
1824  * The queue count is maintained on a per-band basis.
1825  * Priority band 0 (normal messages) uses q_count,
1826  * q_lowat, etc.  Non-zero priority bands use the
1827  * fields in their respective qband structures
1828  * (qb_count, qb_lowat, etc.)  All messages appear
1829  * on the same list, linked via their b_next pointers.
1830  * q_first is the head of the list.  q_count does
1831  * not reflect the size of all the messages on the
1832  * queue.  It only reflects those messages in the
1833  * normal band of flow.  The one exception to this
1834  * deals with high priority messages.  They are in
1835  * their own conceptual "band", but are accounted
1836  * against q_count.
1837  *
1838  * If queue count is below the lo water mark and QWANTW
1839  * is set, enable the closest backq which has a service
1840  * procedure and turn off the QWANTW flag.
1841  *
1842  * getq could be built on top of rmvq, but isn't because
1843  * of performance considerations.
1844  *
1845  * A note on the use of q_count and q_mblkcnt:
1846  *   q_count is the traditional byte count for messages that
1847  *   have been put on a queue.  Documentation tells us that
1848  *   we shouldn't rely on that count, but some drivers/modules
1849  *   do.  What was needed, however, is a mechanism to prevent
1850  *   runaway streams from consuming all of the resources,
1851  *   and particularly be able to flow control zero-length
1852  *   messages.  q_mblkcnt is used for this purpose.  It
1853  *   counts the number of mblk's that are being put on
1854  *   the queue.  The intention here, is that each mblk should
1855  *   contain one byte of data and, for the purpose of
1856  *   flow-control, logically does.  A queue will become
1857  *   full when EITHER of these values (q_count and q_mblkcnt)
1858  *   reach the highwater mark.  It will clear when BOTH
1859  *   of them drop below the highwater mark.  And it will
1860  *   backenable when BOTH of them drop below the lowwater
1861  *   mark.
1862  *   With this algorithm, a driver/module might be able
1863  *   to find a reasonably accurate q_count, and the
1864  *   framework can still try and limit resource usage.
1865  */
1866 mblk_t *
1867 getq(queue_t *q)
1868 {
1869 	mblk_t *bp;
1870 	uchar_t band = 0;
1871 
1872 	bp = getq_noenab(q, 0);
1873 	if (bp != NULL)
1874 		band = bp->b_band;
1875 
1876 	/*
1877 	 * Inlined from qbackenable().
1878 	 * Quick check without holding the lock.
1879 	 */
1880 	if (band == 0 && (q->q_flag & (QWANTW|QWANTWSYNC)) == 0)
1881 		return (bp);
1882 
1883 	qbackenable(q, band);
1884 	return (bp);
1885 }
1886 
1887 /*
1888  * Calculate number of data bytes in a single data message block taking
1889  * multidata messages into account.
1890  */
1891 
1892 #define	ADD_MBLK_SIZE(mp, size) 					\
1893 	if (DB_TYPE(mp) != M_MULTIDATA) {				\
1894 		(size) += MBLKL(mp);					\
1895 	} else {							\
1896 		uint_t	pinuse;						\
1897 									\
1898 		mmd_getsize(mmd_getmultidata(mp), NULL, &pinuse);	\
1899 		(size) += pinuse;					\
1900 	}
1901 
1902 /*
1903  * Returns the number of bytes in a message (a message is defined as a
1904  * chain of mblks linked by b_cont). If a non-NULL mblkcnt is supplied we
1905  * also return the number of distinct mblks in the message.
1906  */
1907 int
1908 mp_cont_len(mblk_t *bp, int *mblkcnt)
1909 {
1910 	mblk_t	*mp;
1911 	int	mblks = 0;
1912 	int	bytes = 0;
1913 
1914 	for (mp = bp; mp != NULL; mp = mp->b_cont) {
1915 		ADD_MBLK_SIZE(mp, bytes);
1916 		mblks++;
1917 	}
1918 
1919 	if (mblkcnt != NULL)
1920 		*mblkcnt = mblks;
1921 
1922 	return (bytes);
1923 }
1924 
1925 /*
1926  * Like getq() but does not backenable.  This is used by the stream
1927  * head when a putback() is likely.  The caller must call qbackenable()
1928  * after it is done with accessing the queue.
1929  * The rbytes arguments to getq_noneab() allows callers to specify a
1930  * the maximum number of bytes to return. If the current amount on the
1931  * queue is less than this then the entire message will be returned.
1932  * A value of 0 returns the entire message and is equivalent to the old
1933  * default behaviour prior to the addition of the rbytes argument.
1934  */
1935 mblk_t *
1936 getq_noenab(queue_t *q, ssize_t rbytes)
1937 {
1938 	mblk_t *bp, *mp1;
1939 	mblk_t *mp2 = NULL;
1940 	qband_t *qbp;
1941 	kthread_id_t freezer;
1942 	int	bytecnt = 0, mblkcnt = 0;
1943 
1944 	/* freezestr should allow its caller to call getq/putq */
1945 	freezer = STREAM(q)->sd_freezer;
1946 	if (freezer == curthread) {
1947 		ASSERT(frozenstr(q));
1948 		ASSERT(MUTEX_HELD(QLOCK(q)));
1949 	} else
1950 		mutex_enter(QLOCK(q));
1951 
1952 	if ((bp = q->q_first) == 0) {
1953 		q->q_flag |= QWANTR;
1954 	} else {
1955 		/*
1956 		 * If the caller supplied a byte threshold and there is
1957 		 * more than this amount on the queue then break up the
1958 		 * the message appropriately.  We can only safely do
1959 		 * this for M_DATA messages.
1960 		 */
1961 		if ((DB_TYPE(bp) == M_DATA) && (rbytes > 0) &&
1962 		    (q->q_count > rbytes)) {
1963 			/*
1964 			 * Inline version of mp_cont_len() which terminates
1965 			 * when we meet or exceed rbytes.
1966 			 */
1967 			for (mp1 = bp; mp1 != NULL; mp1 = mp1->b_cont) {
1968 				mblkcnt++;
1969 				ADD_MBLK_SIZE(mp1, bytecnt);
1970 				if (bytecnt  >= rbytes)
1971 					break;
1972 			}
1973 			/*
1974 			 * We need to account for the following scenarios:
1975 			 *
1976 			 * 1) Too much data in the first message:
1977 			 *	mp1 will be the mblk which puts us over our
1978 			 *	byte limit.
1979 			 * 2) Not enough data in the first message:
1980 			 *	mp1 will be NULL.
1981 			 * 3) Exactly the right amount of data contained within
1982 			 *    whole mblks:
1983 			 *	mp1->b_cont will be where we break the message.
1984 			 */
1985 			if (bytecnt > rbytes) {
1986 				/*
1987 				 * Dup/copy mp1 and put what we don't need
1988 				 * back onto the queue. Adjust the read/write
1989 				 * and continuation pointers appropriately
1990 				 * and decrement the current mblk count to
1991 				 * reflect we are putting an mblk back onto
1992 				 * the queue.
1993 				 * When adjusting the message pointers, it's
1994 				 * OK to use the existing bytecnt and the
1995 				 * requested amount (rbytes) to calculate the
1996 				 * the new write offset (b_wptr) of what we
1997 				 * are taking. However, we  cannot use these
1998 				 * values when calculating the read offset of
1999 				 * the mblk we are putting back on the queue.
2000 				 * This is because the begining (b_rptr) of the
2001 				 * mblk represents some arbitrary point within
2002 				 * the message.
2003 				 * It's simplest to do this by advancing b_rptr
2004 				 * by the new length of mp1 as we don't have to
2005 				 * remember any intermediate state.
2006 				 */
2007 				ASSERT(mp1 != NULL);
2008 				mblkcnt--;
2009 				if ((mp2 = dupb(mp1)) == NULL &&
2010 				    (mp2 = copyb(mp1)) == NULL) {
2011 					bytecnt = mblkcnt = 0;
2012 					goto dup_failed;
2013 				}
2014 				mp2->b_cont = mp1->b_cont;
2015 				mp1->b_wptr -= bytecnt - rbytes;
2016 				mp2->b_rptr += mp1->b_wptr - mp1->b_rptr;
2017 				mp1->b_cont = NULL;
2018 				bytecnt = rbytes;
2019 			} else {
2020 				/*
2021 				 * Either there is not enough data in the first
2022 				 * message or there is no excess data to deal
2023 				 * with. If mp1 is NULL, we are taking the
2024 				 * whole message. No need to do anything.
2025 				 * Otherwise we assign mp1->b_cont to mp2 as
2026 				 * we will be putting this back onto the head of
2027 				 * the queue.
2028 				 */
2029 				if (mp1 != NULL) {
2030 					mp2 = mp1->b_cont;
2031 					mp1->b_cont = NULL;
2032 				}
2033 			}
2034 			/*
2035 			 * If mp2 is not NULL then we have part of the message
2036 			 * to put back onto the queue.
2037 			 */
2038 			if (mp2 != NULL) {
2039 				if ((mp2->b_next = bp->b_next) == NULL)
2040 					q->q_last = mp2;
2041 				else
2042 					bp->b_next->b_prev = mp2;
2043 				q->q_first = mp2;
2044 			} else {
2045 				if ((q->q_first = bp->b_next) == NULL)
2046 					q->q_last = NULL;
2047 				else
2048 					q->q_first->b_prev = NULL;
2049 			}
2050 		} else {
2051 			/*
2052 			 * Either no byte threshold was supplied, there is
2053 			 * not enough on the queue or we failed to
2054 			 * duplicate/copy a data block. In these cases we
2055 			 * just take the entire first message.
2056 			 */
2057 dup_failed:
2058 			bytecnt = mp_cont_len(bp, &mblkcnt);
2059 			if ((q->q_first = bp->b_next) == NULL)
2060 				q->q_last = NULL;
2061 			else
2062 				q->q_first->b_prev = NULL;
2063 		}
2064 		if (bp->b_band == 0) {
2065 			q->q_count -= bytecnt;
2066 			q->q_mblkcnt -= mblkcnt;
2067 			if (q->q_mblkcnt == 0 || ((q->q_count < q->q_hiwat) &&
2068 			    (q->q_mblkcnt < q->q_hiwat))) {
2069 				q->q_flag &= ~QFULL;
2070 			}
2071 		} else {
2072 			int i;
2073 
2074 			ASSERT(bp->b_band <= q->q_nband);
2075 			ASSERT(q->q_bandp != NULL);
2076 			ASSERT(MUTEX_HELD(QLOCK(q)));
2077 			qbp = q->q_bandp;
2078 			i = bp->b_band;
2079 			while (--i > 0)
2080 				qbp = qbp->qb_next;
2081 			if (qbp->qb_first == qbp->qb_last) {
2082 				qbp->qb_first = NULL;
2083 				qbp->qb_last = NULL;
2084 			} else {
2085 				qbp->qb_first = bp->b_next;
2086 			}
2087 			qbp->qb_count -= bytecnt;
2088 			qbp->qb_mblkcnt -= mblkcnt;
2089 			if (qbp->qb_mblkcnt == 0 ||
2090 			    ((qbp->qb_count < qbp->qb_hiwat) &&
2091 			    (qbp->qb_mblkcnt < qbp->qb_hiwat))) {
2092 				qbp->qb_flag &= ~QB_FULL;
2093 			}
2094 		}
2095 		q->q_flag &= ~QWANTR;
2096 		bp->b_next = NULL;
2097 		bp->b_prev = NULL;
2098 	}
2099 	if (freezer != curthread)
2100 		mutex_exit(QLOCK(q));
2101 
2102 	STR_FTEVENT_MSG(bp, q, FTEV_GETQ, 0);
2103 
2104 	return (bp);
2105 }
2106 
2107 /*
2108  * Determine if a backenable is needed after removing a message in the
2109  * specified band.
2110  * NOTE: This routine assumes that something like getq_noenab() has been
2111  * already called.
2112  *
2113  * For the read side it is ok to hold sd_lock across calling this (and the
2114  * stream head often does).
2115  * But for the write side strwakeq might be invoked and it acquires sd_lock.
2116  */
2117 void
2118 qbackenable(queue_t *q, uchar_t band)
2119 {
2120 	int backenab = 0;
2121 	qband_t *qbp;
2122 	kthread_id_t freezer;
2123 
2124 	ASSERT(q);
2125 	ASSERT((q->q_flag & QREADR) || MUTEX_NOT_HELD(&STREAM(q)->sd_lock));
2126 
2127 	/*
2128 	 * Quick check without holding the lock.
2129 	 * OK since after getq() has lowered the q_count these flags
2130 	 * would not change unless either the qbackenable() is done by
2131 	 * another thread (which is ok) or the queue has gotten QFULL
2132 	 * in which case another backenable will take place when the queue
2133 	 * drops below q_lowat.
2134 	 */
2135 	if (band == 0 && (q->q_flag & (QWANTW|QWANTWSYNC)) == 0)
2136 		return;
2137 
2138 	/* freezestr should allow its caller to call getq/putq */
2139 	freezer = STREAM(q)->sd_freezer;
2140 	if (freezer == curthread) {
2141 		ASSERT(frozenstr(q));
2142 		ASSERT(MUTEX_HELD(QLOCK(q)));
2143 	} else
2144 		mutex_enter(QLOCK(q));
2145 
2146 	if (band == 0) {
2147 		if (q->q_lowat == 0 || (q->q_count < q->q_lowat &&
2148 		    q->q_mblkcnt < q->q_lowat)) {
2149 			backenab = q->q_flag & (QWANTW|QWANTWSYNC);
2150 		}
2151 	} else {
2152 		int i;
2153 
2154 		ASSERT((unsigned)band <= q->q_nband);
2155 		ASSERT(q->q_bandp != NULL);
2156 
2157 		qbp = q->q_bandp;
2158 		i = band;
2159 		while (--i > 0)
2160 			qbp = qbp->qb_next;
2161 
2162 		if (qbp->qb_lowat == 0 || (qbp->qb_count < qbp->qb_lowat &&
2163 		    qbp->qb_mblkcnt < qbp->qb_lowat)) {
2164 			backenab = qbp->qb_flag & QB_WANTW;
2165 		}
2166 	}
2167 
2168 	if (backenab == 0) {
2169 		if (freezer != curthread)
2170 			mutex_exit(QLOCK(q));
2171 		return;
2172 	}
2173 
2174 	/* Have to drop the lock across strwakeq and backenable */
2175 	if (backenab & QWANTWSYNC)
2176 		q->q_flag &= ~QWANTWSYNC;
2177 	if (backenab & (QWANTW|QB_WANTW)) {
2178 		if (band != 0)
2179 			qbp->qb_flag &= ~QB_WANTW;
2180 		else {
2181 			q->q_flag &= ~QWANTW;
2182 		}
2183 	}
2184 
2185 	if (freezer != curthread)
2186 		mutex_exit(QLOCK(q));
2187 
2188 	if (backenab & QWANTWSYNC)
2189 		strwakeq(q, QWANTWSYNC);
2190 	if (backenab & (QWANTW|QB_WANTW))
2191 		backenable(q, band);
2192 }
2193 
2194 /*
2195  * Remove a message from a queue.  The queue count and other
2196  * flow control parameters are adjusted and the back queue
2197  * enabled if necessary.
2198  *
2199  * rmvq can be called with the stream frozen, but other utility functions
2200  * holding QLOCK, and by streams modules without any locks/frozen.
2201  */
2202 void
2203 rmvq(queue_t *q, mblk_t *mp)
2204 {
2205 	ASSERT(mp != NULL);
2206 
2207 	rmvq_noenab(q, mp);
2208 	if (curthread != STREAM(q)->sd_freezer && MUTEX_HELD(QLOCK(q))) {
2209 		/*
2210 		 * qbackenable can handle a frozen stream but not a "random"
2211 		 * qlock being held. Drop lock across qbackenable.
2212 		 */
2213 		mutex_exit(QLOCK(q));
2214 		qbackenable(q, mp->b_band);
2215 		mutex_enter(QLOCK(q));
2216 	} else {
2217 		qbackenable(q, mp->b_band);
2218 	}
2219 }
2220 
2221 /*
2222  * Like rmvq() but without any backenabling.
2223  * This exists to handle SR_CONSOL_DATA in strrput().
2224  */
2225 void
2226 rmvq_noenab(queue_t *q, mblk_t *mp)
2227 {
2228 	int i;
2229 	qband_t *qbp = NULL;
2230 	kthread_id_t freezer;
2231 	int	bytecnt = 0, mblkcnt = 0;
2232 
2233 	freezer = STREAM(q)->sd_freezer;
2234 	if (freezer == curthread) {
2235 		ASSERT(frozenstr(q));
2236 		ASSERT(MUTEX_HELD(QLOCK(q)));
2237 	} else if (MUTEX_HELD(QLOCK(q))) {
2238 		/* Don't drop lock on exit */
2239 		freezer = curthread;
2240 	} else
2241 		mutex_enter(QLOCK(q));
2242 
2243 	ASSERT(mp->b_band <= q->q_nband);
2244 	if (mp->b_band != 0) {		/* Adjust band pointers */
2245 		ASSERT(q->q_bandp != NULL);
2246 		qbp = q->q_bandp;
2247 		i = mp->b_band;
2248 		while (--i > 0)
2249 			qbp = qbp->qb_next;
2250 		if (mp == qbp->qb_first) {
2251 			if (mp->b_next && mp->b_band == mp->b_next->b_band)
2252 				qbp->qb_first = mp->b_next;
2253 			else
2254 				qbp->qb_first = NULL;
2255 		}
2256 		if (mp == qbp->qb_last) {
2257 			if (mp->b_prev && mp->b_band == mp->b_prev->b_band)
2258 				qbp->qb_last = mp->b_prev;
2259 			else
2260 				qbp->qb_last = NULL;
2261 		}
2262 	}
2263 
2264 	/*
2265 	 * Remove the message from the list.
2266 	 */
2267 	if (mp->b_prev)
2268 		mp->b_prev->b_next = mp->b_next;
2269 	else
2270 		q->q_first = mp->b_next;
2271 	if (mp->b_next)
2272 		mp->b_next->b_prev = mp->b_prev;
2273 	else
2274 		q->q_last = mp->b_prev;
2275 	mp->b_next = NULL;
2276 	mp->b_prev = NULL;
2277 
2278 	/* Get the size of the message for q_count accounting */
2279 	bytecnt = mp_cont_len(mp, &mblkcnt);
2280 
2281 	if (mp->b_band == 0) {		/* Perform q_count accounting */
2282 		q->q_count -= bytecnt;
2283 		q->q_mblkcnt -= mblkcnt;
2284 		if (q->q_mblkcnt == 0 || ((q->q_count < q->q_hiwat) &&
2285 		    (q->q_mblkcnt < q->q_hiwat))) {
2286 			q->q_flag &= ~QFULL;
2287 		}
2288 	} else {			/* Perform qb_count accounting */
2289 		qbp->qb_count -= bytecnt;
2290 		qbp->qb_mblkcnt -= mblkcnt;
2291 		if (qbp->qb_mblkcnt == 0 || ((qbp->qb_count < qbp->qb_hiwat) &&
2292 		    (qbp->qb_mblkcnt < qbp->qb_hiwat))) {
2293 			qbp->qb_flag &= ~QB_FULL;
2294 		}
2295 	}
2296 	if (freezer != curthread)
2297 		mutex_exit(QLOCK(q));
2298 
2299 	STR_FTEVENT_MSG(mp, q, FTEV_RMVQ, 0);
2300 }
2301 
2302 /*
2303  * Empty a queue.
2304  * If flag is set, remove all messages.  Otherwise, remove
2305  * only non-control messages.  If queue falls below its low
2306  * water mark, and QWANTW is set, enable the nearest upstream
2307  * service procedure.
2308  *
2309  * Historical note: when merging the M_FLUSH code in strrput with this
2310  * code one difference was discovered. flushq did not have a check
2311  * for q_lowat == 0 in the backenabling test.
2312  *
2313  * pcproto_flag specifies whether or not a M_PCPROTO message should be flushed
2314  * if one exists on the queue.
2315  */
2316 void
2317 flushq_common(queue_t *q, int flag, int pcproto_flag)
2318 {
2319 	mblk_t *mp, *nmp;
2320 	qband_t *qbp;
2321 	int backenab = 0;
2322 	unsigned char bpri;
2323 	unsigned char	qbf[NBAND];	/* band flushing backenable flags */
2324 
2325 	if (q->q_first == NULL)
2326 		return;
2327 
2328 	mutex_enter(QLOCK(q));
2329 	mp = q->q_first;
2330 	q->q_first = NULL;
2331 	q->q_last = NULL;
2332 	q->q_count = 0;
2333 	q->q_mblkcnt = 0;
2334 	for (qbp = q->q_bandp; qbp; qbp = qbp->qb_next) {
2335 		qbp->qb_first = NULL;
2336 		qbp->qb_last = NULL;
2337 		qbp->qb_count = 0;
2338 		qbp->qb_mblkcnt = 0;
2339 		qbp->qb_flag &= ~QB_FULL;
2340 	}
2341 	q->q_flag &= ~QFULL;
2342 	mutex_exit(QLOCK(q));
2343 	while (mp) {
2344 		nmp = mp->b_next;
2345 		mp->b_next = mp->b_prev = NULL;
2346 
2347 		STR_FTEVENT_MBLK(mp, q, FTEV_FLUSHQ, 0);
2348 
2349 		if (pcproto_flag && (mp->b_datap->db_type == M_PCPROTO))
2350 			(void) putq(q, mp);
2351 		else if (flag || datamsg(mp->b_datap->db_type))
2352 			freemsg(mp);
2353 		else
2354 			(void) putq(q, mp);
2355 		mp = nmp;
2356 	}
2357 	bpri = 1;
2358 	mutex_enter(QLOCK(q));
2359 	for (qbp = q->q_bandp; qbp; qbp = qbp->qb_next) {
2360 		if ((qbp->qb_flag & QB_WANTW) &&
2361 		    (((qbp->qb_count < qbp->qb_lowat) &&
2362 		    (qbp->qb_mblkcnt < qbp->qb_lowat)) ||
2363 		    qbp->qb_lowat == 0)) {
2364 			qbp->qb_flag &= ~QB_WANTW;
2365 			backenab = 1;
2366 			qbf[bpri] = 1;
2367 		} else
2368 			qbf[bpri] = 0;
2369 		bpri++;
2370 	}
2371 	ASSERT(bpri == (unsigned char)(q->q_nband + 1));
2372 	if ((q->q_flag & QWANTW) &&
2373 	    (((q->q_count < q->q_lowat) &&
2374 	    (q->q_mblkcnt < q->q_lowat)) || q->q_lowat == 0)) {
2375 		q->q_flag &= ~QWANTW;
2376 		backenab = 1;
2377 		qbf[0] = 1;
2378 	} else
2379 		qbf[0] = 0;
2380 
2381 	/*
2382 	 * If any band can now be written to, and there is a writer
2383 	 * for that band, then backenable the closest service procedure.
2384 	 */
2385 	if (backenab) {
2386 		mutex_exit(QLOCK(q));
2387 		for (bpri = q->q_nband; bpri != 0; bpri--)
2388 			if (qbf[bpri])
2389 				backenable(q, bpri);
2390 		if (qbf[0])
2391 			backenable(q, 0);
2392 	} else
2393 		mutex_exit(QLOCK(q));
2394 }
2395 
2396 /*
2397  * The real flushing takes place in flushq_common. This is done so that
2398  * a flag which specifies whether or not M_PCPROTO messages should be flushed
2399  * or not. Currently the only place that uses this flag is the stream head.
2400  */
2401 void
2402 flushq(queue_t *q, int flag)
2403 {
2404 	flushq_common(q, flag, 0);
2405 }
2406 
2407 /*
2408  * Flush the queue of messages of the given priority band.
2409  * There is some duplication of code between flushq and flushband.
2410  * This is because we want to optimize the code as much as possible.
2411  * The assumption is that there will be more messages in the normal
2412  * (priority 0) band than in any other.
2413  *
2414  * Historical note: when merging the M_FLUSH code in strrput with this
2415  * code one difference was discovered. flushband had an extra check for
2416  * did not have a check for (mp->b_datap->db_type < QPCTL) in the band 0
2417  * case. That check does not match the man page for flushband and was not
2418  * in the strrput flush code hence it was removed.
2419  */
2420 void
2421 flushband(queue_t *q, unsigned char pri, int flag)
2422 {
2423 	mblk_t *mp;
2424 	mblk_t *nmp;
2425 	mblk_t *last;
2426 	qband_t *qbp;
2427 	int band;
2428 
2429 	ASSERT((flag == FLUSHDATA) || (flag == FLUSHALL));
2430 	if (pri > q->q_nband) {
2431 		return;
2432 	}
2433 	mutex_enter(QLOCK(q));
2434 	if (pri == 0) {
2435 		mp = q->q_first;
2436 		q->q_first = NULL;
2437 		q->q_last = NULL;
2438 		q->q_count = 0;
2439 		q->q_mblkcnt = 0;
2440 		for (qbp = q->q_bandp; qbp; qbp = qbp->qb_next) {
2441 			qbp->qb_first = NULL;
2442 			qbp->qb_last = NULL;
2443 			qbp->qb_count = 0;
2444 			qbp->qb_mblkcnt = 0;
2445 			qbp->qb_flag &= ~QB_FULL;
2446 		}
2447 		q->q_flag &= ~QFULL;
2448 		mutex_exit(QLOCK(q));
2449 		while (mp) {
2450 			nmp = mp->b_next;
2451 			mp->b_next = mp->b_prev = NULL;
2452 			if ((mp->b_band == 0) &&
2453 			    ((flag == FLUSHALL) ||
2454 			    datamsg(mp->b_datap->db_type)))
2455 				freemsg(mp);
2456 			else
2457 				(void) putq(q, mp);
2458 			mp = nmp;
2459 		}
2460 		mutex_enter(QLOCK(q));
2461 		if ((q->q_flag & QWANTW) &&
2462 		    (((q->q_count < q->q_lowat) &&
2463 		    (q->q_mblkcnt < q->q_lowat)) || q->q_lowat == 0)) {
2464 			q->q_flag &= ~QWANTW;
2465 			mutex_exit(QLOCK(q));
2466 
2467 			backenable(q, pri);
2468 		} else
2469 			mutex_exit(QLOCK(q));
2470 	} else {	/* pri != 0 */
2471 		boolean_t flushed = B_FALSE;
2472 		band = pri;
2473 
2474 		ASSERT(MUTEX_HELD(QLOCK(q)));
2475 		qbp = q->q_bandp;
2476 		while (--band > 0)
2477 			qbp = qbp->qb_next;
2478 		mp = qbp->qb_first;
2479 		if (mp == NULL) {
2480 			mutex_exit(QLOCK(q));
2481 			return;
2482 		}
2483 		last = qbp->qb_last->b_next;
2484 		/*
2485 		 * rmvq_noenab() and freemsg() are called for each mblk that
2486 		 * meets the criteria.  The loop is executed until the last
2487 		 * mblk has been processed.
2488 		 */
2489 		while (mp != last) {
2490 			ASSERT(mp->b_band == pri);
2491 			nmp = mp->b_next;
2492 			if (flag == FLUSHALL || datamsg(mp->b_datap->db_type)) {
2493 				rmvq_noenab(q, mp);
2494 				freemsg(mp);
2495 				flushed = B_TRUE;
2496 			}
2497 			mp = nmp;
2498 		}
2499 		mutex_exit(QLOCK(q));
2500 
2501 		/*
2502 		 * If any mblk(s) has been freed, we know that qbackenable()
2503 		 * will need to be called.
2504 		 */
2505 		if (flushed)
2506 			qbackenable(q, pri);
2507 	}
2508 }
2509 
2510 /*
2511  * Return 1 if the queue is not full.  If the queue is full, return
2512  * 0 (may not put message) and set QWANTW flag (caller wants to write
2513  * to the queue).
2514  */
2515 int
2516 canput(queue_t *q)
2517 {
2518 	TRACE_1(TR_FAC_STREAMS_FR, TR_CANPUT_IN, "canput:%p", q);
2519 
2520 	/* this is for loopback transports, they should not do a canput */
2521 	ASSERT(STRMATED(q->q_stream) || STREAM(q) == STREAM(q->q_nfsrv));
2522 
2523 	/* Find next forward module that has a service procedure */
2524 	q = q->q_nfsrv;
2525 
2526 	if (!(q->q_flag & QFULL)) {
2527 		TRACE_2(TR_FAC_STREAMS_FR, TR_CANPUT_OUT, "canput:%p %d", q, 1);
2528 		return (1);
2529 	}
2530 	mutex_enter(QLOCK(q));
2531 	if (q->q_flag & QFULL) {
2532 		q->q_flag |= QWANTW;
2533 		mutex_exit(QLOCK(q));
2534 		TRACE_2(TR_FAC_STREAMS_FR, TR_CANPUT_OUT, "canput:%p %d", q, 0);
2535 		return (0);
2536 	}
2537 	mutex_exit(QLOCK(q));
2538 	TRACE_2(TR_FAC_STREAMS_FR, TR_CANPUT_OUT, "canput:%p %d", q, 1);
2539 	return (1);
2540 }
2541 
2542 /*
2543  * This is the new canput for use with priority bands.  Return 1 if the
2544  * band is not full.  If the band is full, return 0 (may not put message)
2545  * and set QWANTW(QB_WANTW) flag for zero(non-zero) band (caller wants to
2546  * write to the queue).
2547  */
2548 int
2549 bcanput(queue_t *q, unsigned char pri)
2550 {
2551 	qband_t *qbp;
2552 
2553 	TRACE_2(TR_FAC_STREAMS_FR, TR_BCANPUT_IN, "bcanput:%p %p", q, pri);
2554 	if (!q)
2555 		return (0);
2556 
2557 	/* Find next forward module that has a service procedure */
2558 	q = q->q_nfsrv;
2559 
2560 	mutex_enter(QLOCK(q));
2561 	if (pri == 0) {
2562 		if (q->q_flag & QFULL) {
2563 			q->q_flag |= QWANTW;
2564 			mutex_exit(QLOCK(q));
2565 			TRACE_3(TR_FAC_STREAMS_FR, TR_BCANPUT_OUT,
2566 			    "bcanput:%p %X %d", q, pri, 0);
2567 			return (0);
2568 		}
2569 	} else {	/* pri != 0 */
2570 		if (pri > q->q_nband) {
2571 			/*
2572 			 * No band exists yet, so return success.
2573 			 */
2574 			mutex_exit(QLOCK(q));
2575 			TRACE_3(TR_FAC_STREAMS_FR, TR_BCANPUT_OUT,
2576 			    "bcanput:%p %X %d", q, pri, 1);
2577 			return (1);
2578 		}
2579 		qbp = q->q_bandp;
2580 		while (--pri)
2581 			qbp = qbp->qb_next;
2582 		if (qbp->qb_flag & QB_FULL) {
2583 			qbp->qb_flag |= QB_WANTW;
2584 			mutex_exit(QLOCK(q));
2585 			TRACE_3(TR_FAC_STREAMS_FR, TR_BCANPUT_OUT,
2586 			    "bcanput:%p %X %d", q, pri, 0);
2587 			return (0);
2588 		}
2589 	}
2590 	mutex_exit(QLOCK(q));
2591 	TRACE_3(TR_FAC_STREAMS_FR, TR_BCANPUT_OUT,
2592 	    "bcanput:%p %X %d", q, pri, 1);
2593 	return (1);
2594 }
2595 
2596 /*
2597  * Put a message on a queue.
2598  *
2599  * Messages are enqueued on a priority basis.  The priority classes
2600  * are HIGH PRIORITY (type >= QPCTL), PRIORITY (type < QPCTL && band > 0),
2601  * and B_NORMAL (type < QPCTL && band == 0).
2602  *
2603  * Add appropriate weighted data block sizes to queue count.
2604  * If queue hits high water mark then set QFULL flag.
2605  *
2606  * If QNOENAB is not set (putq is allowed to enable the queue),
2607  * enable the queue only if the message is PRIORITY,
2608  * or the QWANTR flag is set (indicating that the service procedure
2609  * is ready to read the queue.  This implies that a service
2610  * procedure must NEVER put a high priority message back on its own
2611  * queue, as this would result in an infinite loop (!).
2612  */
2613 int
2614 putq(queue_t *q, mblk_t *bp)
2615 {
2616 	mblk_t *tmp;
2617 	qband_t *qbp = NULL;
2618 	int mcls = (int)queclass(bp);
2619 	kthread_id_t freezer;
2620 	int	bytecnt = 0, mblkcnt = 0;
2621 
2622 	freezer = STREAM(q)->sd_freezer;
2623 	if (freezer == curthread) {
2624 		ASSERT(frozenstr(q));
2625 		ASSERT(MUTEX_HELD(QLOCK(q)));
2626 	} else
2627 		mutex_enter(QLOCK(q));
2628 
2629 	/*
2630 	 * Make sanity checks and if qband structure is not yet
2631 	 * allocated, do so.
2632 	 */
2633 	if (mcls == QPCTL) {
2634 		if (bp->b_band != 0)
2635 			bp->b_band = 0;		/* force to be correct */
2636 	} else if (bp->b_band != 0) {
2637 		int i;
2638 		qband_t **qbpp;
2639 
2640 		if (bp->b_band > q->q_nband) {
2641 
2642 			/*
2643 			 * The qband structure for this priority band is
2644 			 * not on the queue yet, so we have to allocate
2645 			 * one on the fly.  It would be wasteful to
2646 			 * associate the qband structures with every
2647 			 * queue when the queues are allocated.  This is
2648 			 * because most queues will only need the normal
2649 			 * band of flow which can be described entirely
2650 			 * by the queue itself.
2651 			 */
2652 			qbpp = &q->q_bandp;
2653 			while (*qbpp)
2654 				qbpp = &(*qbpp)->qb_next;
2655 			while (bp->b_band > q->q_nband) {
2656 				if ((*qbpp = allocband()) == NULL) {
2657 					if (freezer != curthread)
2658 						mutex_exit(QLOCK(q));
2659 					return (0);
2660 				}
2661 				(*qbpp)->qb_hiwat = q->q_hiwat;
2662 				(*qbpp)->qb_lowat = q->q_lowat;
2663 				q->q_nband++;
2664 				qbpp = &(*qbpp)->qb_next;
2665 			}
2666 		}
2667 		ASSERT(MUTEX_HELD(QLOCK(q)));
2668 		qbp = q->q_bandp;
2669 		i = bp->b_band;
2670 		while (--i)
2671 			qbp = qbp->qb_next;
2672 	}
2673 
2674 	/*
2675 	 * If queue is empty, add the message and initialize the pointers.
2676 	 * Otherwise, adjust message pointers and queue pointers based on
2677 	 * the type of the message and where it belongs on the queue.  Some
2678 	 * code is duplicated to minimize the number of conditionals and
2679 	 * hopefully minimize the amount of time this routine takes.
2680 	 */
2681 	if (!q->q_first) {
2682 		bp->b_next = NULL;
2683 		bp->b_prev = NULL;
2684 		q->q_first = bp;
2685 		q->q_last = bp;
2686 		if (qbp) {
2687 			qbp->qb_first = bp;
2688 			qbp->qb_last = bp;
2689 		}
2690 	} else if (!qbp) {	/* bp->b_band == 0 */
2691 
2692 		/*
2693 		 * If queue class of message is less than or equal to
2694 		 * that of the last one on the queue, tack on to the end.
2695 		 */
2696 		tmp = q->q_last;
2697 		if (mcls <= (int)queclass(tmp)) {
2698 			bp->b_next = NULL;
2699 			bp->b_prev = tmp;
2700 			tmp->b_next = bp;
2701 			q->q_last = bp;
2702 		} else {
2703 			tmp = q->q_first;
2704 			while ((int)queclass(tmp) >= mcls)
2705 				tmp = tmp->b_next;
2706 
2707 			/*
2708 			 * Insert bp before tmp.
2709 			 */
2710 			bp->b_next = tmp;
2711 			bp->b_prev = tmp->b_prev;
2712 			if (tmp->b_prev)
2713 				tmp->b_prev->b_next = bp;
2714 			else
2715 				q->q_first = bp;
2716 			tmp->b_prev = bp;
2717 		}
2718 	} else {		/* bp->b_band != 0 */
2719 		if (qbp->qb_first) {
2720 			tmp = qbp->qb_last;
2721 
2722 			/*
2723 			 * Insert bp after the last message in this band.
2724 			 */
2725 			bp->b_next = tmp->b_next;
2726 			if (tmp->b_next)
2727 				tmp->b_next->b_prev = bp;
2728 			else
2729 				q->q_last = bp;
2730 			bp->b_prev = tmp;
2731 			tmp->b_next = bp;
2732 		} else {
2733 			tmp = q->q_last;
2734 			if ((mcls < (int)queclass(tmp)) ||
2735 			    (bp->b_band <= tmp->b_band)) {
2736 
2737 				/*
2738 				 * Tack bp on end of queue.
2739 				 */
2740 				bp->b_next = NULL;
2741 				bp->b_prev = tmp;
2742 				tmp->b_next = bp;
2743 				q->q_last = bp;
2744 			} else {
2745 				tmp = q->q_first;
2746 				while (tmp->b_datap->db_type >= QPCTL)
2747 					tmp = tmp->b_next;
2748 				while (tmp->b_band >= bp->b_band)
2749 					tmp = tmp->b_next;
2750 
2751 				/*
2752 				 * Insert bp before tmp.
2753 				 */
2754 				bp->b_next = tmp;
2755 				bp->b_prev = tmp->b_prev;
2756 				if (tmp->b_prev)
2757 					tmp->b_prev->b_next = bp;
2758 				else
2759 					q->q_first = bp;
2760 				tmp->b_prev = bp;
2761 			}
2762 			qbp->qb_first = bp;
2763 		}
2764 		qbp->qb_last = bp;
2765 	}
2766 
2767 	/* Get message byte count for q_count accounting */
2768 	bytecnt = mp_cont_len(bp, &mblkcnt);
2769 
2770 	if (qbp) {
2771 		qbp->qb_count += bytecnt;
2772 		qbp->qb_mblkcnt += mblkcnt;
2773 		if ((qbp->qb_count >= qbp->qb_hiwat) ||
2774 		    (qbp->qb_mblkcnt >= qbp->qb_hiwat)) {
2775 			qbp->qb_flag |= QB_FULL;
2776 		}
2777 	} else {
2778 		q->q_count += bytecnt;
2779 		q->q_mblkcnt += mblkcnt;
2780 		if ((q->q_count >= q->q_hiwat) ||
2781 		    (q->q_mblkcnt >= q->q_hiwat)) {
2782 			q->q_flag |= QFULL;
2783 		}
2784 	}
2785 
2786 	STR_FTEVENT_MSG(bp, q, FTEV_PUTQ, 0);
2787 
2788 	if ((mcls > QNORM) ||
2789 	    (canenable(q) && (q->q_flag & QWANTR || bp->b_band)))
2790 		qenable_locked(q);
2791 	ASSERT(MUTEX_HELD(QLOCK(q)));
2792 	if (freezer != curthread)
2793 		mutex_exit(QLOCK(q));
2794 
2795 	return (1);
2796 }
2797 
2798 /*
2799  * Put stuff back at beginning of Q according to priority order.
2800  * See comment on putq above for details.
2801  */
2802 int
2803 putbq(queue_t *q, mblk_t *bp)
2804 {
2805 	mblk_t *tmp;
2806 	qband_t *qbp = NULL;
2807 	int mcls = (int)queclass(bp);
2808 	kthread_id_t freezer;
2809 	int	bytecnt = 0, mblkcnt = 0;
2810 
2811 	ASSERT(q && bp);
2812 	ASSERT(bp->b_next == NULL);
2813 	freezer = STREAM(q)->sd_freezer;
2814 	if (freezer == curthread) {
2815 		ASSERT(frozenstr(q));
2816 		ASSERT(MUTEX_HELD(QLOCK(q)));
2817 	} else
2818 		mutex_enter(QLOCK(q));
2819 
2820 	/*
2821 	 * Make sanity checks and if qband structure is not yet
2822 	 * allocated, do so.
2823 	 */
2824 	if (mcls == QPCTL) {
2825 		if (bp->b_band != 0)
2826 			bp->b_band = 0;		/* force to be correct */
2827 	} else if (bp->b_band != 0) {
2828 		int i;
2829 		qband_t **qbpp;
2830 
2831 		if (bp->b_band > q->q_nband) {
2832 			qbpp = &q->q_bandp;
2833 			while (*qbpp)
2834 				qbpp = &(*qbpp)->qb_next;
2835 			while (bp->b_band > q->q_nband) {
2836 				if ((*qbpp = allocband()) == NULL) {
2837 					if (freezer != curthread)
2838 						mutex_exit(QLOCK(q));
2839 					return (0);
2840 				}
2841 				(*qbpp)->qb_hiwat = q->q_hiwat;
2842 				(*qbpp)->qb_lowat = q->q_lowat;
2843 				q->q_nband++;
2844 				qbpp = &(*qbpp)->qb_next;
2845 			}
2846 		}
2847 		qbp = q->q_bandp;
2848 		i = bp->b_band;
2849 		while (--i)
2850 			qbp = qbp->qb_next;
2851 	}
2852 
2853 	/*
2854 	 * If queue is empty or if message is high priority,
2855 	 * place on the front of the queue.
2856 	 */
2857 	tmp = q->q_first;
2858 	if ((!tmp) || (mcls == QPCTL)) {
2859 		bp->b_next = tmp;
2860 		if (tmp)
2861 			tmp->b_prev = bp;
2862 		else
2863 			q->q_last = bp;
2864 		q->q_first = bp;
2865 		bp->b_prev = NULL;
2866 		if (qbp) {
2867 			qbp->qb_first = bp;
2868 			qbp->qb_last = bp;
2869 		}
2870 	} else if (qbp) {	/* bp->b_band != 0 */
2871 		tmp = qbp->qb_first;
2872 		if (tmp) {
2873 
2874 			/*
2875 			 * Insert bp before the first message in this band.
2876 			 */
2877 			bp->b_next = tmp;
2878 			bp->b_prev = tmp->b_prev;
2879 			if (tmp->b_prev)
2880 				tmp->b_prev->b_next = bp;
2881 			else
2882 				q->q_first = bp;
2883 			tmp->b_prev = bp;
2884 		} else {
2885 			tmp = q->q_last;
2886 			if ((mcls < (int)queclass(tmp)) ||
2887 			    (bp->b_band < tmp->b_band)) {
2888 
2889 				/*
2890 				 * Tack bp on end of queue.
2891 				 */
2892 				bp->b_next = NULL;
2893 				bp->b_prev = tmp;
2894 				tmp->b_next = bp;
2895 				q->q_last = bp;
2896 			} else {
2897 				tmp = q->q_first;
2898 				while (tmp->b_datap->db_type >= QPCTL)
2899 					tmp = tmp->b_next;
2900 				while (tmp->b_band > bp->b_band)
2901 					tmp = tmp->b_next;
2902 
2903 				/*
2904 				 * Insert bp before tmp.
2905 				 */
2906 				bp->b_next = tmp;
2907 				bp->b_prev = tmp->b_prev;
2908 				if (tmp->b_prev)
2909 					tmp->b_prev->b_next = bp;
2910 				else
2911 					q->q_first = bp;
2912 				tmp->b_prev = bp;
2913 			}
2914 			qbp->qb_last = bp;
2915 		}
2916 		qbp->qb_first = bp;
2917 	} else {		/* bp->b_band == 0 && !QPCTL */
2918 
2919 		/*
2920 		 * If the queue class or band is less than that of the last
2921 		 * message on the queue, tack bp on the end of the queue.
2922 		 */
2923 		tmp = q->q_last;
2924 		if ((mcls < (int)queclass(tmp)) || (bp->b_band < tmp->b_band)) {
2925 			bp->b_next = NULL;
2926 			bp->b_prev = tmp;
2927 			tmp->b_next = bp;
2928 			q->q_last = bp;
2929 		} else {
2930 			tmp = q->q_first;
2931 			while (tmp->b_datap->db_type >= QPCTL)
2932 				tmp = tmp->b_next;
2933 			while (tmp->b_band > bp->b_band)
2934 				tmp = tmp->b_next;
2935 
2936 			/*
2937 			 * Insert bp before tmp.
2938 			 */
2939 			bp->b_next = tmp;
2940 			bp->b_prev = tmp->b_prev;
2941 			if (tmp->b_prev)
2942 				tmp->b_prev->b_next = bp;
2943 			else
2944 				q->q_first = bp;
2945 			tmp->b_prev = bp;
2946 		}
2947 	}
2948 
2949 	/* Get message byte count for q_count accounting */
2950 	bytecnt = mp_cont_len(bp, &mblkcnt);
2951 
2952 	if (qbp) {
2953 		qbp->qb_count += bytecnt;
2954 		qbp->qb_mblkcnt += mblkcnt;
2955 		if ((qbp->qb_count >= qbp->qb_hiwat) ||
2956 		    (qbp->qb_mblkcnt >= qbp->qb_hiwat)) {
2957 			qbp->qb_flag |= QB_FULL;
2958 		}
2959 	} else {
2960 		q->q_count += bytecnt;
2961 		q->q_mblkcnt += mblkcnt;
2962 		if ((q->q_count >= q->q_hiwat) ||
2963 		    (q->q_mblkcnt >= q->q_hiwat)) {
2964 			q->q_flag |= QFULL;
2965 		}
2966 	}
2967 
2968 	STR_FTEVENT_MSG(bp, q, FTEV_PUTBQ, 0);
2969 
2970 	if ((mcls > QNORM) || (canenable(q) && (q->q_flag & QWANTR)))
2971 		qenable_locked(q);
2972 	ASSERT(MUTEX_HELD(QLOCK(q)));
2973 	if (freezer != curthread)
2974 		mutex_exit(QLOCK(q));
2975 
2976 	return (1);
2977 }
2978 
2979 /*
2980  * Insert a message before an existing message on the queue.  If the
2981  * existing message is NULL, the new messages is placed on the end of
2982  * the queue.  The queue class of the new message is ignored.  However,
2983  * the priority band of the new message must adhere to the following
2984  * ordering:
2985  *
2986  *	emp->b_prev->b_band >= mp->b_band >= emp->b_band.
2987  *
2988  * All flow control parameters are updated.
2989  *
2990  * insq can be called with the stream frozen, but other utility functions
2991  * holding QLOCK, and by streams modules without any locks/frozen.
2992  */
2993 int
2994 insq(queue_t *q, mblk_t *emp, mblk_t *mp)
2995 {
2996 	mblk_t *tmp;
2997 	qband_t *qbp = NULL;
2998 	int mcls = (int)queclass(mp);
2999 	kthread_id_t freezer;
3000 	int	bytecnt = 0, mblkcnt = 0;
3001 
3002 	freezer = STREAM(q)->sd_freezer;
3003 	if (freezer == curthread) {
3004 		ASSERT(frozenstr(q));
3005 		ASSERT(MUTEX_HELD(QLOCK(q)));
3006 	} else if (MUTEX_HELD(QLOCK(q))) {
3007 		/* Don't drop lock on exit */
3008 		freezer = curthread;
3009 	} else
3010 		mutex_enter(QLOCK(q));
3011 
3012 	if (mcls == QPCTL) {
3013 		if (mp->b_band != 0)
3014 			mp->b_band = 0;		/* force to be correct */
3015 		if (emp && emp->b_prev &&
3016 		    (emp->b_prev->b_datap->db_type < QPCTL))
3017 			goto badord;
3018 	}
3019 	if (emp) {
3020 		if (((mcls == QNORM) && (mp->b_band < emp->b_band)) ||
3021 		    (emp->b_prev && (emp->b_prev->b_datap->db_type < QPCTL) &&
3022 		    (emp->b_prev->b_band < mp->b_band))) {
3023 			goto badord;
3024 		}
3025 	} else {
3026 		tmp = q->q_last;
3027 		if (tmp && (mcls == QNORM) && (mp->b_band > tmp->b_band)) {
3028 badord:
3029 			cmn_err(CE_WARN,
3030 			    "insq: attempt to insert message out of order "
3031 			    "on q %p", (void *)q);
3032 			if (freezer != curthread)
3033 				mutex_exit(QLOCK(q));
3034 			return (0);
3035 		}
3036 	}
3037 
3038 	if (mp->b_band != 0) {
3039 		int i;
3040 		qband_t **qbpp;
3041 
3042 		if (mp->b_band > q->q_nband) {
3043 			qbpp = &q->q_bandp;
3044 			while (*qbpp)
3045 				qbpp = &(*qbpp)->qb_next;
3046 			while (mp->b_band > q->q_nband) {
3047 				if ((*qbpp = allocband()) == NULL) {
3048 					if (freezer != curthread)
3049 						mutex_exit(QLOCK(q));
3050 					return (0);
3051 				}
3052 				(*qbpp)->qb_hiwat = q->q_hiwat;
3053 				(*qbpp)->qb_lowat = q->q_lowat;
3054 				q->q_nband++;
3055 				qbpp = &(*qbpp)->qb_next;
3056 			}
3057 		}
3058 		qbp = q->q_bandp;
3059 		i = mp->b_band;
3060 		while (--i)
3061 			qbp = qbp->qb_next;
3062 	}
3063 
3064 	if ((mp->b_next = emp) != NULL) {
3065 		if ((mp->b_prev = emp->b_prev) != NULL)
3066 			emp->b_prev->b_next = mp;
3067 		else
3068 			q->q_first = mp;
3069 		emp->b_prev = mp;
3070 	} else {
3071 		if ((mp->b_prev = q->q_last) != NULL)
3072 			q->q_last->b_next = mp;
3073 		else
3074 			q->q_first = mp;
3075 		q->q_last = mp;
3076 	}
3077 
3078 	/* Get mblk and byte count for q_count accounting */
3079 	bytecnt = mp_cont_len(mp, &mblkcnt);
3080 
3081 	if (qbp) {	/* adjust qband pointers and count */
3082 		if (!qbp->qb_first) {
3083 			qbp->qb_first = mp;
3084 			qbp->qb_last = mp;
3085 		} else {
3086 			if (mp->b_prev == NULL || (mp->b_prev != NULL &&
3087 			    (mp->b_prev->b_band != mp->b_band)))
3088 				qbp->qb_first = mp;
3089 			else if (mp->b_next == NULL || (mp->b_next != NULL &&
3090 			    (mp->b_next->b_band != mp->b_band)))
3091 				qbp->qb_last = mp;
3092 		}
3093 		qbp->qb_count += bytecnt;
3094 		qbp->qb_mblkcnt += mblkcnt;
3095 		if ((qbp->qb_count >= qbp->qb_hiwat) ||
3096 		    (qbp->qb_mblkcnt >= qbp->qb_hiwat)) {
3097 			qbp->qb_flag |= QB_FULL;
3098 		}
3099 	} else {
3100 		q->q_count += bytecnt;
3101 		q->q_mblkcnt += mblkcnt;
3102 		if ((q->q_count >= q->q_hiwat) ||
3103 		    (q->q_mblkcnt >= q->q_hiwat)) {
3104 			q->q_flag |= QFULL;
3105 		}
3106 	}
3107 
3108 	STR_FTEVENT_MSG(mp, q, FTEV_INSQ, 0);
3109 
3110 	if (canenable(q) && (q->q_flag & QWANTR))
3111 		qenable_locked(q);
3112 
3113 	ASSERT(MUTEX_HELD(QLOCK(q)));
3114 	if (freezer != curthread)
3115 		mutex_exit(QLOCK(q));
3116 
3117 	return (1);
3118 }
3119 
3120 /*
3121  * Create and put a control message on queue.
3122  */
3123 int
3124 putctl(queue_t *q, int type)
3125 {
3126 	mblk_t *bp;
3127 
3128 	if ((datamsg(type) && (type != M_DELAY)) ||
3129 	    (bp = allocb_tryhard(0)) == NULL)
3130 		return (0);
3131 	bp->b_datap->db_type = (unsigned char) type;
3132 
3133 	put(q, bp);
3134 
3135 	return (1);
3136 }
3137 
3138 /*
3139  * Control message with a single-byte parameter
3140  */
3141 int
3142 putctl1(queue_t *q, int type, int param)
3143 {
3144 	mblk_t *bp;
3145 
3146 	if ((datamsg(type) && (type != M_DELAY)) ||
3147 	    (bp = allocb_tryhard(1)) == NULL)
3148 		return (0);
3149 	bp->b_datap->db_type = (unsigned char)type;
3150 	*bp->b_wptr++ = (unsigned char)param;
3151 
3152 	put(q, bp);
3153 
3154 	return (1);
3155 }
3156 
3157 int
3158 putnextctl1(queue_t *q, int type, int param)
3159 {
3160 	mblk_t *bp;
3161 
3162 	if ((datamsg(type) && (type != M_DELAY)) ||
3163 	    ((bp = allocb_tryhard(1)) == NULL))
3164 		return (0);
3165 
3166 	bp->b_datap->db_type = (unsigned char)type;
3167 	*bp->b_wptr++ = (unsigned char)param;
3168 
3169 	putnext(q, bp);
3170 
3171 	return (1);
3172 }
3173 
3174 int
3175 putnextctl(queue_t *q, int type)
3176 {
3177 	mblk_t *bp;
3178 
3179 	if ((datamsg(type) && (type != M_DELAY)) ||
3180 	    ((bp = allocb_tryhard(0)) == NULL))
3181 		return (0);
3182 	bp->b_datap->db_type = (unsigned char)type;
3183 
3184 	putnext(q, bp);
3185 
3186 	return (1);
3187 }
3188 
3189 /*
3190  * Return the queue upstream from this one
3191  */
3192 queue_t *
3193 backq(queue_t *q)
3194 {
3195 	q = _OTHERQ(q);
3196 	if (q->q_next) {
3197 		q = q->q_next;
3198 		return (_OTHERQ(q));
3199 	}
3200 	return (NULL);
3201 }
3202 
3203 /*
3204  * Send a block back up the queue in reverse from this
3205  * one (e.g. to respond to ioctls)
3206  */
3207 void
3208 qreply(queue_t *q, mblk_t *bp)
3209 {
3210 	ASSERT(q && bp);
3211 
3212 	putnext(_OTHERQ(q), bp);
3213 }
3214 
3215 /*
3216  * Streams Queue Scheduling
3217  *
3218  * Queues are enabled through qenable() when they have messages to
3219  * process.  They are serviced by queuerun(), which runs each enabled
3220  * queue's service procedure.  The call to queuerun() is processor
3221  * dependent - the general principle is that it be run whenever a queue
3222  * is enabled but before returning to user level.  For system calls,
3223  * the function runqueues() is called if their action causes a queue
3224  * to be enabled.  For device interrupts, queuerun() should be
3225  * called before returning from the last level of interrupt.  Beyond
3226  * this, no timing assumptions should be made about queue scheduling.
3227  */
3228 
3229 /*
3230  * Enable a queue: put it on list of those whose service procedures are
3231  * ready to run and set up the scheduling mechanism.
3232  * The broadcast is done outside the mutex -> to avoid the woken thread
3233  * from contending with the mutex. This is OK 'cos the queue has been
3234  * enqueued on the runlist and flagged safely at this point.
3235  */
3236 void
3237 qenable(queue_t *q)
3238 {
3239 	mutex_enter(QLOCK(q));
3240 	qenable_locked(q);
3241 	mutex_exit(QLOCK(q));
3242 }
3243 /*
3244  * Return number of messages on queue
3245  */
3246 int
3247 qsize(queue_t *qp)
3248 {
3249 	int count = 0;
3250 	mblk_t *mp;
3251 
3252 	mutex_enter(QLOCK(qp));
3253 	for (mp = qp->q_first; mp; mp = mp->b_next)
3254 		count++;
3255 	mutex_exit(QLOCK(qp));
3256 	return (count);
3257 }
3258 
3259 /*
3260  * noenable - set queue so that putq() will not enable it.
3261  * enableok - set queue so that putq() can enable it.
3262  */
3263 void
3264 noenable(queue_t *q)
3265 {
3266 	mutex_enter(QLOCK(q));
3267 	q->q_flag |= QNOENB;
3268 	mutex_exit(QLOCK(q));
3269 }
3270 
3271 void
3272 enableok(queue_t *q)
3273 {
3274 	mutex_enter(QLOCK(q));
3275 	q->q_flag &= ~QNOENB;
3276 	mutex_exit(QLOCK(q));
3277 }
3278 
3279 /*
3280  * Set queue fields.
3281  */
3282 int
3283 strqset(queue_t *q, qfields_t what, unsigned char pri, intptr_t val)
3284 {
3285 	qband_t *qbp = NULL;
3286 	queue_t	*wrq;
3287 	int error = 0;
3288 	kthread_id_t freezer;
3289 
3290 	freezer = STREAM(q)->sd_freezer;
3291 	if (freezer == curthread) {
3292 		ASSERT(frozenstr(q));
3293 		ASSERT(MUTEX_HELD(QLOCK(q)));
3294 	} else
3295 		mutex_enter(QLOCK(q));
3296 
3297 	if (what >= QBAD) {
3298 		error = EINVAL;
3299 		goto done;
3300 	}
3301 	if (pri != 0) {
3302 		int i;
3303 		qband_t **qbpp;
3304 
3305 		if (pri > q->q_nband) {
3306 			qbpp = &q->q_bandp;
3307 			while (*qbpp)
3308 				qbpp = &(*qbpp)->qb_next;
3309 			while (pri > q->q_nband) {
3310 				if ((*qbpp = allocband()) == NULL) {
3311 					error = EAGAIN;
3312 					goto done;
3313 				}
3314 				(*qbpp)->qb_hiwat = q->q_hiwat;
3315 				(*qbpp)->qb_lowat = q->q_lowat;
3316 				q->q_nband++;
3317 				qbpp = &(*qbpp)->qb_next;
3318 			}
3319 		}
3320 		qbp = q->q_bandp;
3321 		i = pri;
3322 		while (--i)
3323 			qbp = qbp->qb_next;
3324 	}
3325 	switch (what) {
3326 
3327 	case QHIWAT:
3328 		if (qbp)
3329 			qbp->qb_hiwat = (size_t)val;
3330 		else
3331 			q->q_hiwat = (size_t)val;
3332 		break;
3333 
3334 	case QLOWAT:
3335 		if (qbp)
3336 			qbp->qb_lowat = (size_t)val;
3337 		else
3338 			q->q_lowat = (size_t)val;
3339 		break;
3340 
3341 	case QMAXPSZ:
3342 		if (qbp)
3343 			error = EINVAL;
3344 		else
3345 			q->q_maxpsz = (ssize_t)val;
3346 
3347 		/*
3348 		 * Performance concern, strwrite looks at the module below
3349 		 * the stream head for the maxpsz each time it does a write
3350 		 * we now cache it at the stream head.  Check to see if this
3351 		 * queue is sitting directly below the stream head.
3352 		 */
3353 		wrq = STREAM(q)->sd_wrq;
3354 		if (q != wrq->q_next)
3355 			break;
3356 
3357 		/*
3358 		 * If the stream is not frozen drop the current QLOCK and
3359 		 * acquire the sd_wrq QLOCK which protects sd_qn_*
3360 		 */
3361 		if (freezer != curthread) {
3362 			mutex_exit(QLOCK(q));
3363 			mutex_enter(QLOCK(wrq));
3364 		}
3365 		ASSERT(MUTEX_HELD(QLOCK(wrq)));
3366 
3367 		if (strmsgsz != 0) {
3368 			if (val == INFPSZ)
3369 				val = strmsgsz;
3370 			else  {
3371 				if (STREAM(q)->sd_vnode->v_type == VFIFO)
3372 					val = MIN(PIPE_BUF, val);
3373 				else
3374 					val = MIN(strmsgsz, val);
3375 			}
3376 		}
3377 		STREAM(q)->sd_qn_maxpsz = val;
3378 		if (freezer != curthread) {
3379 			mutex_exit(QLOCK(wrq));
3380 			mutex_enter(QLOCK(q));
3381 		}
3382 		break;
3383 
3384 	case QMINPSZ:
3385 		if (qbp)
3386 			error = EINVAL;
3387 		else
3388 			q->q_minpsz = (ssize_t)val;
3389 
3390 		/*
3391 		 * Performance concern, strwrite looks at the module below
3392 		 * the stream head for the maxpsz each time it does a write
3393 		 * we now cache it at the stream head.  Check to see if this
3394 		 * queue is sitting directly below the stream head.
3395 		 */
3396 		wrq = STREAM(q)->sd_wrq;
3397 		if (q != wrq->q_next)
3398 			break;
3399 
3400 		/*
3401 		 * If the stream is not frozen drop the current QLOCK and
3402 		 * acquire the sd_wrq QLOCK which protects sd_qn_*
3403 		 */
3404 		if (freezer != curthread) {
3405 			mutex_exit(QLOCK(q));
3406 			mutex_enter(QLOCK(wrq));
3407 		}
3408 		STREAM(q)->sd_qn_minpsz = (ssize_t)val;
3409 
3410 		if (freezer != curthread) {
3411 			mutex_exit(QLOCK(wrq));
3412 			mutex_enter(QLOCK(q));
3413 		}
3414 		break;
3415 
3416 	case QSTRUIOT:
3417 		if (qbp)
3418 			error = EINVAL;
3419 		else
3420 			q->q_struiot = (ushort_t)val;
3421 		break;
3422 
3423 	case QCOUNT:
3424 	case QFIRST:
3425 	case QLAST:
3426 	case QFLAG:
3427 		error = EPERM;
3428 		break;
3429 
3430 	default:
3431 		error = EINVAL;
3432 		break;
3433 	}
3434 done:
3435 	if (freezer != curthread)
3436 		mutex_exit(QLOCK(q));
3437 	return (error);
3438 }
3439 
3440 /*
3441  * Get queue fields.
3442  */
3443 int
3444 strqget(queue_t *q, qfields_t what, unsigned char pri, void *valp)
3445 {
3446 	qband_t 	*qbp = NULL;
3447 	int 		error = 0;
3448 	kthread_id_t 	freezer;
3449 
3450 	freezer = STREAM(q)->sd_freezer;
3451 	if (freezer == curthread) {
3452 		ASSERT(frozenstr(q));
3453 		ASSERT(MUTEX_HELD(QLOCK(q)));
3454 	} else
3455 		mutex_enter(QLOCK(q));
3456 	if (what >= QBAD) {
3457 		error = EINVAL;
3458 		goto done;
3459 	}
3460 	if (pri != 0) {
3461 		int i;
3462 		qband_t **qbpp;
3463 
3464 		if (pri > q->q_nband) {
3465 			qbpp = &q->q_bandp;
3466 			while (*qbpp)
3467 				qbpp = &(*qbpp)->qb_next;
3468 			while (pri > q->q_nband) {
3469 				if ((*qbpp = allocband()) == NULL) {
3470 					error = EAGAIN;
3471 					goto done;
3472 				}
3473 				(*qbpp)->qb_hiwat = q->q_hiwat;
3474 				(*qbpp)->qb_lowat = q->q_lowat;
3475 				q->q_nband++;
3476 				qbpp = &(*qbpp)->qb_next;
3477 			}
3478 		}
3479 		qbp = q->q_bandp;
3480 		i = pri;
3481 		while (--i)
3482 			qbp = qbp->qb_next;
3483 	}
3484 	switch (what) {
3485 	case QHIWAT:
3486 		if (qbp)
3487 			*(size_t *)valp = qbp->qb_hiwat;
3488 		else
3489 			*(size_t *)valp = q->q_hiwat;
3490 		break;
3491 
3492 	case QLOWAT:
3493 		if (qbp)
3494 			*(size_t *)valp = qbp->qb_lowat;
3495 		else
3496 			*(size_t *)valp = q->q_lowat;
3497 		break;
3498 
3499 	case QMAXPSZ:
3500 		if (qbp)
3501 			error = EINVAL;
3502 		else
3503 			*(ssize_t *)valp = q->q_maxpsz;
3504 		break;
3505 
3506 	case QMINPSZ:
3507 		if (qbp)
3508 			error = EINVAL;
3509 		else
3510 			*(ssize_t *)valp = q->q_minpsz;
3511 		break;
3512 
3513 	case QCOUNT:
3514 		if (qbp)
3515 			*(size_t *)valp = qbp->qb_count;
3516 		else
3517 			*(size_t *)valp = q->q_count;
3518 		break;
3519 
3520 	case QFIRST:
3521 		if (qbp)
3522 			*(mblk_t **)valp = qbp->qb_first;
3523 		else
3524 			*(mblk_t **)valp = q->q_first;
3525 		break;
3526 
3527 	case QLAST:
3528 		if (qbp)
3529 			*(mblk_t **)valp = qbp->qb_last;
3530 		else
3531 			*(mblk_t **)valp = q->q_last;
3532 		break;
3533 
3534 	case QFLAG:
3535 		if (qbp)
3536 			*(uint_t *)valp = qbp->qb_flag;
3537 		else
3538 			*(uint_t *)valp = q->q_flag;
3539 		break;
3540 
3541 	case QSTRUIOT:
3542 		if (qbp)
3543 			error = EINVAL;
3544 		else
3545 			*(short *)valp = q->q_struiot;
3546 		break;
3547 
3548 	default:
3549 		error = EINVAL;
3550 		break;
3551 	}
3552 done:
3553 	if (freezer != curthread)
3554 		mutex_exit(QLOCK(q));
3555 	return (error);
3556 }
3557 
3558 /*
3559  * Function awakes all in cvwait/sigwait/pollwait, on one of:
3560  *	QWANTWSYNC or QWANTR or QWANTW,
3561  *
3562  * Note: for QWANTWSYNC/QWANTW and QWANTR, if no WSLEEPer or RSLEEPer then a
3563  *	 deferred wakeup will be done. Also if strpoll() in progress then a
3564  *	 deferred pollwakeup will be done.
3565  */
3566 void
3567 strwakeq(queue_t *q, int flag)
3568 {
3569 	stdata_t 	*stp = STREAM(q);
3570 	pollhead_t 	*pl;
3571 
3572 	mutex_enter(&stp->sd_lock);
3573 	pl = &stp->sd_pollist;
3574 	if (flag & QWANTWSYNC) {
3575 		ASSERT(!(q->q_flag & QREADR));
3576 		if (stp->sd_flag & WSLEEP) {
3577 			stp->sd_flag &= ~WSLEEP;
3578 			cv_broadcast(&stp->sd_wrq->q_wait);
3579 		} else {
3580 			stp->sd_wakeq |= WSLEEP;
3581 		}
3582 
3583 		mutex_exit(&stp->sd_lock);
3584 		pollwakeup(pl, POLLWRNORM);
3585 		mutex_enter(&stp->sd_lock);
3586 
3587 		if (stp->sd_sigflags & S_WRNORM)
3588 			strsendsig(stp->sd_siglist, S_WRNORM, 0, 0);
3589 	} else if (flag & QWANTR) {
3590 		if (stp->sd_flag & RSLEEP) {
3591 			stp->sd_flag &= ~RSLEEP;
3592 			cv_broadcast(&_RD(stp->sd_wrq)->q_wait);
3593 		} else {
3594 			stp->sd_wakeq |= RSLEEP;
3595 		}
3596 
3597 		mutex_exit(&stp->sd_lock);
3598 		pollwakeup(pl, POLLIN | POLLRDNORM);
3599 		mutex_enter(&stp->sd_lock);
3600 
3601 		{
3602 			int events = stp->sd_sigflags & (S_INPUT | S_RDNORM);
3603 
3604 			if (events)
3605 				strsendsig(stp->sd_siglist, events, 0, 0);
3606 		}
3607 	} else {
3608 		if (stp->sd_flag & WSLEEP) {
3609 			stp->sd_flag &= ~WSLEEP;
3610 			cv_broadcast(&stp->sd_wrq->q_wait);
3611 		}
3612 
3613 		mutex_exit(&stp->sd_lock);
3614 		pollwakeup(pl, POLLWRNORM);
3615 		mutex_enter(&stp->sd_lock);
3616 
3617 		if (stp->sd_sigflags & S_WRNORM)
3618 			strsendsig(stp->sd_siglist, S_WRNORM, 0, 0);
3619 	}
3620 	mutex_exit(&stp->sd_lock);
3621 }
3622 
3623 int
3624 struioget(queue_t *q, mblk_t *mp, struiod_t *dp, int noblock)
3625 {
3626 	stdata_t *stp = STREAM(q);
3627 	int typ  = STRUIOT_STANDARD;
3628 	uio_t	 *uiop = &dp->d_uio;
3629 	dblk_t	 *dbp;
3630 	ssize_t	 uiocnt;
3631 	ssize_t	 cnt;
3632 	unsigned char *ptr;
3633 	ssize_t	 resid;
3634 	int	 error = 0;
3635 	on_trap_data_t otd;
3636 	queue_t	*stwrq;
3637 
3638 	/*
3639 	 * Plumbing may change while taking the type so store the
3640 	 * queue in a temporary variable. It doesn't matter even
3641 	 * if the we take the type from the previous plumbing,
3642 	 * that's because if the plumbing has changed when we were
3643 	 * holding the queue in a temporary variable, we can continue
3644 	 * processing the message the way it would have been processed
3645 	 * in the old plumbing, without any side effects but a bit
3646 	 * extra processing for partial ip header checksum.
3647 	 *
3648 	 * This has been done to avoid holding the sd_lock which is
3649 	 * very hot.
3650 	 */
3651 
3652 	stwrq = stp->sd_struiowrq;
3653 	if (stwrq)
3654 		typ = stwrq->q_struiot;
3655 
3656 	for (; (resid = uiop->uio_resid) > 0 && mp; mp = mp->b_cont) {
3657 		dbp = mp->b_datap;
3658 		ptr = (uchar_t *)(mp->b_rptr + dbp->db_cksumstuff);
3659 		uiocnt = dbp->db_cksumend - dbp->db_cksumstuff;
3660 		cnt = MIN(uiocnt, uiop->uio_resid);
3661 		if (!(dbp->db_struioflag & STRUIO_SPEC) ||
3662 		    (dbp->db_struioflag & STRUIO_DONE) || cnt == 0) {
3663 			/*
3664 			 * Either this mblk has already been processed
3665 			 * or there is no more room in this mblk (?).
3666 			 */
3667 			continue;
3668 		}
3669 		switch (typ) {
3670 		case STRUIOT_STANDARD:
3671 			if (noblock) {
3672 				if (on_trap(&otd, OT_DATA_ACCESS)) {
3673 					no_trap();
3674 					error = EWOULDBLOCK;
3675 					goto out;
3676 				}
3677 			}
3678 			if (error = uiomove(ptr, cnt, UIO_WRITE, uiop)) {
3679 				if (noblock)
3680 					no_trap();
3681 				goto out;
3682 			}
3683 			if (noblock)
3684 				no_trap();
3685 			break;
3686 
3687 		default:
3688 			error = EIO;
3689 			goto out;
3690 		}
3691 		dbp->db_struioflag |= STRUIO_DONE;
3692 		dbp->db_cksumstuff += cnt;
3693 	}
3694 out:
3695 	if (error == EWOULDBLOCK && (resid -= uiop->uio_resid) > 0) {
3696 		/*
3697 		 * A fault has occured and some bytes were moved to the
3698 		 * current mblk, the uio_t has already been updated by
3699 		 * the appropriate uio routine, so also update the mblk
3700 		 * to reflect this in case this same mblk chain is used
3701 		 * again (after the fault has been handled).
3702 		 */
3703 		uiocnt = dbp->db_cksumend - dbp->db_cksumstuff;
3704 		if (uiocnt >= resid)
3705 			dbp->db_cksumstuff += resid;
3706 	}
3707 	return (error);
3708 }
3709 
3710 /*
3711  * Try to enter queue synchronously. Any attempt to enter a closing queue will
3712  * fails. The qp->q_rwcnt keeps track of the number of successful entries so
3713  * that removeq() will not try to close the queue while a thread is inside the
3714  * queue.
3715  */
3716 static boolean_t
3717 rwnext_enter(queue_t *qp)
3718 {
3719 	mutex_enter(QLOCK(qp));
3720 	if (qp->q_flag & QWCLOSE) {
3721 		mutex_exit(QLOCK(qp));
3722 		return (B_FALSE);
3723 	}
3724 	qp->q_rwcnt++;
3725 	ASSERT(qp->q_rwcnt != 0);
3726 	mutex_exit(QLOCK(qp));
3727 	return (B_TRUE);
3728 }
3729 
3730 /*
3731  * Decrease the count of threads running in sync stream queue and wake up any
3732  * threads blocked in removeq().
3733  */
3734 static void
3735 rwnext_exit(queue_t *qp)
3736 {
3737 	mutex_enter(QLOCK(qp));
3738 	qp->q_rwcnt--;
3739 	if (qp->q_flag & QWANTRMQSYNC) {
3740 		qp->q_flag &= ~QWANTRMQSYNC;
3741 		cv_broadcast(&qp->q_wait);
3742 	}
3743 	mutex_exit(QLOCK(qp));
3744 }
3745 
3746 /*
3747  * The purpose of rwnext() is to call the rw procedure of the next
3748  * (downstream) modules queue.
3749  *
3750  * treated as put entrypoint for perimeter syncronization.
3751  *
3752  * There's no need to grab sq_putlocks here (which only exist for CIPUT
3753  * sync queues). If it is CIPUT sync queue sq_count is incremented and it does
3754  * not matter if any regular put entrypoints have been already entered. We
3755  * can't increment one of the sq_putcounts (instead of sq_count) because
3756  * qwait_rw won't know which counter to decrement.
3757  *
3758  * It would be reasonable to add the lockless FASTPUT logic.
3759  */
3760 int
3761 rwnext(queue_t *qp, struiod_t *dp)
3762 {
3763 	queue_t		*nqp;
3764 	syncq_t		*sq;
3765 	uint16_t	count;
3766 	uint16_t	flags;
3767 	struct qinit	*qi;
3768 	int		(*proc)();
3769 	struct stdata	*stp;
3770 	int		isread;
3771 	int		rval;
3772 
3773 	stp = STREAM(qp);
3774 	/*
3775 	 * Prevent q_next from changing by holding sd_lock until acquiring
3776 	 * SQLOCK. Note that a read-side rwnext from the streamhead will
3777 	 * already have sd_lock acquired. In either case sd_lock is always
3778 	 * released after acquiring SQLOCK.
3779 	 *
3780 	 * The streamhead read-side holding sd_lock when calling rwnext is
3781 	 * required to prevent a race condition were M_DATA mblks flowing
3782 	 * up the read-side of the stream could be bypassed by a rwnext()
3783 	 * down-call. In this case sd_lock acts as the streamhead perimeter.
3784 	 */
3785 	if ((nqp = _WR(qp)) == qp) {
3786 		isread = 0;
3787 		mutex_enter(&stp->sd_lock);
3788 		qp = nqp->q_next;
3789 	} else {
3790 		isread = 1;
3791 		if (nqp != stp->sd_wrq)
3792 			/* Not streamhead */
3793 			mutex_enter(&stp->sd_lock);
3794 		qp = _RD(nqp->q_next);
3795 	}
3796 	qi = qp->q_qinfo;
3797 	if (qp->q_struiot == STRUIOT_NONE || ! (proc = qi->qi_rwp)) {
3798 		/*
3799 		 * Not a synchronous module or no r/w procedure for this
3800 		 * queue, so just return EINVAL and let the caller handle it.
3801 		 */
3802 		mutex_exit(&stp->sd_lock);
3803 		return (EINVAL);
3804 	}
3805 
3806 	if (rwnext_enter(qp) == B_FALSE) {
3807 		mutex_exit(&stp->sd_lock);
3808 		return (EINVAL);
3809 	}
3810 
3811 	sq = qp->q_syncq;
3812 	mutex_enter(SQLOCK(sq));
3813 	mutex_exit(&stp->sd_lock);
3814 	count = sq->sq_count;
3815 	flags = sq->sq_flags;
3816 	ASSERT(sq->sq_ciputctrl == NULL || (flags & SQ_CIPUT));
3817 
3818 	while ((flags & SQ_GOAWAY) || (!(flags & SQ_CIPUT) && count != 0)) {
3819 		/*
3820 		 * if this queue is being closed, return.
3821 		 */
3822 		if (qp->q_flag & QWCLOSE) {
3823 			mutex_exit(SQLOCK(sq));
3824 			rwnext_exit(qp);
3825 			return (EINVAL);
3826 		}
3827 
3828 		/*
3829 		 * Wait until we can enter the inner perimeter.
3830 		 */
3831 		sq->sq_flags = flags | SQ_WANTWAKEUP;
3832 		cv_wait(&sq->sq_wait, SQLOCK(sq));
3833 		count = sq->sq_count;
3834 		flags = sq->sq_flags;
3835 	}
3836 
3837 	if (isread == 0 && stp->sd_struiowrq == NULL ||
3838 	    isread == 1 && stp->sd_struiordq == NULL) {
3839 		/*
3840 		 * Stream plumbing changed while waiting for inner perimeter
3841 		 * so just return EINVAL and let the caller handle it.
3842 		 */
3843 		mutex_exit(SQLOCK(sq));
3844 		rwnext_exit(qp);
3845 		return (EINVAL);
3846 	}
3847 	if (!(flags & SQ_CIPUT))
3848 		sq->sq_flags = flags | SQ_EXCL;
3849 	sq->sq_count = count + 1;
3850 	ASSERT(sq->sq_count != 0);		/* Wraparound */
3851 	/*
3852 	 * Note: The only message ordering guarantee that rwnext() makes is
3853 	 *	 for the write queue flow-control case. All others (r/w queue
3854 	 *	 with q_count > 0 (or q_first != 0)) are the resposibilty of
3855 	 *	 the queue's rw procedure. This could be genralized here buy
3856 	 *	 running the queue's service procedure, but that wouldn't be
3857 	 *	 the most efficent for all cases.
3858 	 */
3859 	mutex_exit(SQLOCK(sq));
3860 	if (! isread && (qp->q_flag & QFULL)) {
3861 		/*
3862 		 * Write queue may be flow controlled. If so,
3863 		 * mark the queue for wakeup when it's not.
3864 		 */
3865 		mutex_enter(QLOCK(qp));
3866 		if (qp->q_flag & QFULL) {
3867 			qp->q_flag |= QWANTWSYNC;
3868 			mutex_exit(QLOCK(qp));
3869 			rval = EWOULDBLOCK;
3870 			goto out;
3871 		}
3872 		mutex_exit(QLOCK(qp));
3873 	}
3874 
3875 	if (! isread && dp->d_mp)
3876 		STR_FTEVENT_MSG(dp->d_mp, nqp, FTEV_RWNEXT, dp->d_mp->b_rptr -
3877 		    dp->d_mp->b_datap->db_base);
3878 
3879 	rval = (*proc)(qp, dp);
3880 
3881 	if (isread && dp->d_mp)
3882 		STR_FTEVENT_MSG(dp->d_mp, _RD(nqp), FTEV_RWNEXT,
3883 		    dp->d_mp->b_rptr - dp->d_mp->b_datap->db_base);
3884 out:
3885 	/*
3886 	 * The queue is protected from being freed by sq_count, so it is
3887 	 * safe to call rwnext_exit and reacquire SQLOCK(sq).
3888 	 */
3889 	rwnext_exit(qp);
3890 
3891 	mutex_enter(SQLOCK(sq));
3892 	flags = sq->sq_flags;
3893 	ASSERT(sq->sq_count != 0);
3894 	sq->sq_count--;
3895 	if (flags & SQ_TAIL) {
3896 		putnext_tail(sq, qp, flags);
3897 		/*
3898 		 * The only purpose of this ASSERT is to preserve calling stack
3899 		 * in DEBUG kernel.
3900 		 */
3901 		ASSERT(flags & SQ_TAIL);
3902 		return (rval);
3903 	}
3904 	ASSERT(flags & (SQ_EXCL|SQ_CIPUT));
3905 	/*
3906 	 * Safe to always drop SQ_EXCL:
3907 	 *	Not SQ_CIPUT means we set SQ_EXCL above
3908 	 *	For SQ_CIPUT SQ_EXCL will only be set if the put procedure
3909 	 *	did a qwriter(INNER) in which case nobody else
3910 	 *	is in the inner perimeter and we are exiting.
3911 	 *
3912 	 * I would like to make the following assertion:
3913 	 *
3914 	 * ASSERT((flags & (SQ_EXCL|SQ_CIPUT)) != (SQ_EXCL|SQ_CIPUT) ||
3915 	 * 	sq->sq_count == 0);
3916 	 *
3917 	 * which indicates that if we are both putshared and exclusive,
3918 	 * we became exclusive while executing the putproc, and the only
3919 	 * claim on the syncq was the one we dropped a few lines above.
3920 	 * But other threads that enter putnext while the syncq is exclusive
3921 	 * need to make a claim as they may need to drop SQLOCK in the
3922 	 * has_writers case to avoid deadlocks.  If these threads are
3923 	 * delayed or preempted, it is possible that the writer thread can
3924 	 * find out that there are other claims making the (sq_count == 0)
3925 	 * test invalid.
3926 	 */
3927 
3928 	sq->sq_flags = flags & ~SQ_EXCL;
3929 	if (sq->sq_flags & SQ_WANTWAKEUP) {
3930 		sq->sq_flags &= ~SQ_WANTWAKEUP;
3931 		cv_broadcast(&sq->sq_wait);
3932 	}
3933 	mutex_exit(SQLOCK(sq));
3934 	return (rval);
3935 }
3936 
3937 /*
3938  * The purpose of infonext() is to call the info procedure of the next
3939  * (downstream) modules queue.
3940  *
3941  * treated as put entrypoint for perimeter syncronization.
3942  *
3943  * There's no need to grab sq_putlocks here (which only exist for CIPUT
3944  * sync queues). If it is CIPUT sync queue regular sq_count is incremented and
3945  * it does not matter if any regular put entrypoints have been already
3946  * entered.
3947  */
3948 int
3949 infonext(queue_t *qp, infod_t *idp)
3950 {
3951 	queue_t		*nqp;
3952 	syncq_t		*sq;
3953 	uint16_t	count;
3954 	uint16_t 	flags;
3955 	struct qinit	*qi;
3956 	int		(*proc)();
3957 	struct stdata	*stp;
3958 	int		rval;
3959 
3960 	stp = STREAM(qp);
3961 	/*
3962 	 * Prevent q_next from changing by holding sd_lock until
3963 	 * acquiring SQLOCK.
3964 	 */
3965 	mutex_enter(&stp->sd_lock);
3966 	if ((nqp = _WR(qp)) == qp) {
3967 		qp = nqp->q_next;
3968 	} else {
3969 		qp = _RD(nqp->q_next);
3970 	}
3971 	qi = qp->q_qinfo;
3972 	if (qp->q_struiot == STRUIOT_NONE || ! (proc = qi->qi_infop)) {
3973 		mutex_exit(&stp->sd_lock);
3974 		return (EINVAL);
3975 	}
3976 	sq = qp->q_syncq;
3977 	mutex_enter(SQLOCK(sq));
3978 	mutex_exit(&stp->sd_lock);
3979 	count = sq->sq_count;
3980 	flags = sq->sq_flags;
3981 	ASSERT(sq->sq_ciputctrl == NULL || (flags & SQ_CIPUT));
3982 
3983 	while ((flags & SQ_GOAWAY) || (!(flags & SQ_CIPUT) && count != 0)) {
3984 		/*
3985 		 * Wait until we can enter the inner perimeter.
3986 		 */
3987 		sq->sq_flags = flags | SQ_WANTWAKEUP;
3988 		cv_wait(&sq->sq_wait, SQLOCK(sq));
3989 		count = sq->sq_count;
3990 		flags = sq->sq_flags;
3991 	}
3992 
3993 	if (! (flags & SQ_CIPUT))
3994 		sq->sq_flags = flags | SQ_EXCL;
3995 	sq->sq_count = count + 1;
3996 	ASSERT(sq->sq_count != 0);		/* Wraparound */
3997 	mutex_exit(SQLOCK(sq));
3998 
3999 	rval = (*proc)(qp, idp);
4000 
4001 	mutex_enter(SQLOCK(sq));
4002 	flags = sq->sq_flags;
4003 	ASSERT(sq->sq_count != 0);
4004 	sq->sq_count--;
4005 	if (flags & SQ_TAIL) {
4006 		putnext_tail(sq, qp, flags);
4007 		/*
4008 		 * The only purpose of this ASSERT is to preserve calling stack
4009 		 * in DEBUG kernel.
4010 		 */
4011 		ASSERT(flags & SQ_TAIL);
4012 		return (rval);
4013 	}
4014 	ASSERT(flags & (SQ_EXCL|SQ_CIPUT));
4015 /*
4016  * XXXX
4017  * I am not certain the next comment is correct here.  I need to consider
4018  * why the infonext is called, and if dropping SQ_EXCL unless non-CIPUT
4019  * might cause other problems.  It just might be safer to drop it if
4020  * !SQ_CIPUT because that is when we set it.
4021  */
4022 	/*
4023 	 * Safe to always drop SQ_EXCL:
4024 	 *	Not SQ_CIPUT means we set SQ_EXCL above
4025 	 *	For SQ_CIPUT SQ_EXCL will only be set if the put procedure
4026 	 *	did a qwriter(INNER) in which case nobody else
4027 	 *	is in the inner perimeter and we are exiting.
4028 	 *
4029 	 * I would like to make the following assertion:
4030 	 *
4031 	 * ASSERT((flags & (SQ_EXCL|SQ_CIPUT)) != (SQ_EXCL|SQ_CIPUT) ||
4032 	 *	sq->sq_count == 0);
4033 	 *
4034 	 * which indicates that if we are both putshared and exclusive,
4035 	 * we became exclusive while executing the putproc, and the only
4036 	 * claim on the syncq was the one we dropped a few lines above.
4037 	 * But other threads that enter putnext while the syncq is exclusive
4038 	 * need to make a claim as they may need to drop SQLOCK in the
4039 	 * has_writers case to avoid deadlocks.  If these threads are
4040 	 * delayed or preempted, it is possible that the writer thread can
4041 	 * find out that there are other claims making the (sq_count == 0)
4042 	 * test invalid.
4043 	 */
4044 
4045 	sq->sq_flags = flags & ~SQ_EXCL;
4046 	mutex_exit(SQLOCK(sq));
4047 	return (rval);
4048 }
4049 
4050 /*
4051  * Return nonzero if the queue is responsible for struio(), else return 0.
4052  */
4053 int
4054 isuioq(queue_t *q)
4055 {
4056 	if (q->q_flag & QREADR)
4057 		return (STREAM(q)->sd_struiordq == q);
4058 	else
4059 		return (STREAM(q)->sd_struiowrq == q);
4060 }
4061 
4062 #if defined(__sparc)
4063 int disable_putlocks = 0;
4064 #else
4065 int disable_putlocks = 1;
4066 #endif
4067 
4068 /*
4069  * called by create_putlock.
4070  */
4071 static void
4072 create_syncq_putlocks(queue_t *q)
4073 {
4074 	syncq_t	*sq = q->q_syncq;
4075 	ciputctrl_t *cip;
4076 	int i;
4077 
4078 	ASSERT(sq != NULL);
4079 
4080 	ASSERT(disable_putlocks == 0);
4081 	ASSERT(n_ciputctrl >= min_n_ciputctrl);
4082 	ASSERT(ciputctrl_cache != NULL);
4083 
4084 	if (!(sq->sq_type & SQ_CIPUT))
4085 		return;
4086 
4087 	for (i = 0; i <= 1; i++) {
4088 		if (sq->sq_ciputctrl == NULL) {
4089 			cip = kmem_cache_alloc(ciputctrl_cache, KM_SLEEP);
4090 			SUMCHECK_CIPUTCTRL_COUNTS(cip, n_ciputctrl - 1, 0);
4091 			mutex_enter(SQLOCK(sq));
4092 			if (sq->sq_ciputctrl != NULL) {
4093 				mutex_exit(SQLOCK(sq));
4094 				kmem_cache_free(ciputctrl_cache, cip);
4095 			} else {
4096 				ASSERT(sq->sq_nciputctrl == 0);
4097 				sq->sq_nciputctrl = n_ciputctrl - 1;
4098 				/*
4099 				 * putnext checks sq_ciputctrl without holding
4100 				 * SQLOCK. if it is not NULL putnext assumes
4101 				 * sq_nciputctrl is initialized. membar below
4102 				 * insures that.
4103 				 */
4104 				membar_producer();
4105 				sq->sq_ciputctrl = cip;
4106 				mutex_exit(SQLOCK(sq));
4107 			}
4108 		}
4109 		ASSERT(sq->sq_nciputctrl == n_ciputctrl - 1);
4110 		if (i == 1)
4111 			break;
4112 		q = _OTHERQ(q);
4113 		if (!(q->q_flag & QPERQ)) {
4114 			ASSERT(sq == q->q_syncq);
4115 			break;
4116 		}
4117 		ASSERT(q->q_syncq != NULL);
4118 		ASSERT(sq != q->q_syncq);
4119 		sq = q->q_syncq;
4120 		ASSERT(sq->sq_type & SQ_CIPUT);
4121 	}
4122 }
4123 
4124 /*
4125  * If stream argument is 0 only create per cpu sq_putlocks/sq_putcounts for
4126  * syncq of q. If stream argument is not 0 create per cpu stream_putlocks for
4127  * the stream of q and per cpu sq_putlocks/sq_putcounts for all syncq's
4128  * starting from q and down to the driver.
4129  *
4130  * This should be called after the affected queues are part of stream
4131  * geometry. It should be called from driver/module open routine after
4132  * qprocson() call. It is also called from nfs syscall where it is known that
4133  * stream is configured and won't change its geometry during create_putlock
4134  * call.
4135  *
4136  * caller normally uses 0 value for the stream argument to speed up MT putnext
4137  * into the perimeter of q for example because its perimeter is per module
4138  * (e.g. IP).
4139  *
4140  * caller normally uses non 0 value for the stream argument to hint the system
4141  * that the stream of q is a very contended global system stream
4142  * (e.g. NFS/UDP) and the part of the stream from q to the driver is
4143  * particularly MT hot.
4144  *
4145  * Caller insures stream plumbing won't happen while we are here and therefore
4146  * q_next can be safely used.
4147  */
4148 
4149 void
4150 create_putlocks(queue_t *q, int stream)
4151 {
4152 	ciputctrl_t	*cip;
4153 	struct stdata	*stp = STREAM(q);
4154 
4155 	q = _WR(q);
4156 	ASSERT(stp != NULL);
4157 
4158 	if (disable_putlocks != 0)
4159 		return;
4160 
4161 	if (n_ciputctrl < min_n_ciputctrl)
4162 		return;
4163 
4164 	ASSERT(ciputctrl_cache != NULL);
4165 
4166 	if (stream != 0 && stp->sd_ciputctrl == NULL) {
4167 		cip = kmem_cache_alloc(ciputctrl_cache, KM_SLEEP);
4168 		SUMCHECK_CIPUTCTRL_COUNTS(cip, n_ciputctrl - 1, 0);
4169 		mutex_enter(&stp->sd_lock);
4170 		if (stp->sd_ciputctrl != NULL) {
4171 			mutex_exit(&stp->sd_lock);
4172 			kmem_cache_free(ciputctrl_cache, cip);
4173 		} else {
4174 			ASSERT(stp->sd_nciputctrl == 0);
4175 			stp->sd_nciputctrl = n_ciputctrl - 1;
4176 			/*
4177 			 * putnext checks sd_ciputctrl without holding
4178 			 * sd_lock. if it is not NULL putnext assumes
4179 			 * sd_nciputctrl is initialized. membar below
4180 			 * insures that.
4181 			 */
4182 			membar_producer();
4183 			stp->sd_ciputctrl = cip;
4184 			mutex_exit(&stp->sd_lock);
4185 		}
4186 	}
4187 
4188 	ASSERT(stream == 0 || stp->sd_nciputctrl == n_ciputctrl - 1);
4189 
4190 	while (_SAMESTR(q)) {
4191 		create_syncq_putlocks(q);
4192 		if (stream == 0)
4193 			return;
4194 		q = q->q_next;
4195 	}
4196 	ASSERT(q != NULL);
4197 	create_syncq_putlocks(q);
4198 }
4199 
4200 /*
4201  * STREAMS Flow Trace - record STREAMS Flow Trace events as an mblk flows
4202  * through a stream.
4203  *
4204  * Data currently record per-event is a timestamp, module/driver name,
4205  * downstream module/driver name, optional callstack, event type and a per
4206  * type datum.  Much of the STREAMS framework is instrumented for automatic
4207  * flow tracing (when enabled).  Events can be defined and used by STREAMS
4208  * modules and drivers.
4209  *
4210  * Global objects:
4211  *
4212  *	str_ftevent() - Add a flow-trace event to a dblk.
4213  *	str_ftfree() - Free flow-trace data
4214  *
4215  * Local objects:
4216  *
4217  *	fthdr_cache - pointer to the kmem cache for trace header.
4218  *	ftblk_cache - pointer to the kmem cache for trace data blocks.
4219  */
4220 
4221 int str_ftnever = 1;	/* Don't do STREAMS flow tracing */
4222 int str_ftstack = 0;	/* Don't record event call stacks */
4223 
4224 void
4225 str_ftevent(fthdr_t *hp, void *p, ushort_t evnt, ushort_t data)
4226 {
4227 	ftblk_t *bp = hp->tail;
4228 	ftblk_t *nbp;
4229 	ftevnt_t *ep;
4230 	int ix, nix;
4231 
4232 	ASSERT(hp != NULL);
4233 
4234 	for (;;) {
4235 		if ((ix = bp->ix) == FTBLK_EVNTS) {
4236 			/*
4237 			 * Tail doesn't have room, so need a new tail.
4238 			 *
4239 			 * To make this MT safe, first, allocate a new
4240 			 * ftblk, and initialize it.  To make life a
4241 			 * little easier, reserve the first slot (mostly
4242 			 * by making ix = 1).  When we are finished with
4243 			 * the initialization, CAS this pointer to the
4244 			 * tail.  If this succeeds, this is the new
4245 			 * "next" block.  Otherwise, another thread
4246 			 * got here first, so free the block and start
4247 			 * again.
4248 			 */
4249 			nbp = kmem_cache_alloc(ftblk_cache, KM_NOSLEEP);
4250 			if (nbp == NULL) {
4251 				/* no mem, so punt */
4252 				str_ftnever++;
4253 				/* free up all flow data? */
4254 				return;
4255 			}
4256 			nbp->nxt = NULL;
4257 			nbp->ix = 1;
4258 			/*
4259 			 * Just in case there is another thread about
4260 			 * to get the next index, we need to make sure
4261 			 * the value is there for it.
4262 			 */
4263 			membar_producer();
4264 			if (atomic_cas_ptr(&hp->tail, bp, nbp) == bp) {
4265 				/* CAS was successful */
4266 				bp->nxt = nbp;
4267 				membar_producer();
4268 				bp = nbp;
4269 				ix = 0;
4270 				goto cas_good;
4271 			} else {
4272 				kmem_cache_free(ftblk_cache, nbp);
4273 				bp = hp->tail;
4274 				continue;
4275 			}
4276 		}
4277 		nix = ix + 1;
4278 		if (atomic_cas_32((uint32_t *)&bp->ix, ix, nix) == ix) {
4279 		cas_good:
4280 			if (curthread != hp->thread) {
4281 				hp->thread = curthread;
4282 				evnt |= FTEV_CS;
4283 			}
4284 			if (CPU->cpu_seqid != hp->cpu_seqid) {
4285 				hp->cpu_seqid = CPU->cpu_seqid;
4286 				evnt |= FTEV_PS;
4287 			}
4288 			ep = &bp->ev[ix];
4289 			break;
4290 		}
4291 	}
4292 
4293 	if (evnt & FTEV_QMASK) {
4294 		queue_t *qp = p;
4295 
4296 		if (!(qp->q_flag & QREADR))
4297 			evnt |= FTEV_ISWR;
4298 
4299 		ep->mid = Q2NAME(qp);
4300 
4301 		/*
4302 		 * We only record the next queue name for FTEV_PUTNEXT since
4303 		 * that's the only time we *really* need it, and the putnext()
4304 		 * code ensures that qp->q_next won't vanish.  (We could use
4305 		 * claimstr()/releasestr() but at a performance cost.)
4306 		 */
4307 		if ((evnt & FTEV_MASK) == FTEV_PUTNEXT && qp->q_next != NULL)
4308 			ep->midnext = Q2NAME(qp->q_next);
4309 		else
4310 			ep->midnext = NULL;
4311 	} else {
4312 		ep->mid = p;
4313 		ep->midnext = NULL;
4314 	}
4315 
4316 	if (ep->stk != NULL)
4317 		ep->stk->fs_depth = getpcstack(ep->stk->fs_stk, FTSTK_DEPTH);
4318 
4319 	ep->ts = gethrtime();
4320 	ep->evnt = evnt;
4321 	ep->data = data;
4322 	hp->hash = (hp->hash << 9) + hp->hash;
4323 	hp->hash += (evnt << 16) | data;
4324 	hp->hash += (uintptr_t)ep->mid;
4325 }
4326 
4327 /*
4328  * Free flow-trace data.
4329  */
4330 void
4331 str_ftfree(dblk_t *dbp)
4332 {
4333 	fthdr_t *hp = dbp->db_fthdr;
4334 	ftblk_t *bp = &hp->first;
4335 	ftblk_t *nbp;
4336 
4337 	if (bp != hp->tail || bp->ix != 0) {
4338 		/*
4339 		 * Clear out the hash, have the tail point to itself, and free
4340 		 * any continuation blocks.
4341 		 */
4342 		bp = hp->first.nxt;
4343 		hp->tail = &hp->first;
4344 		hp->hash = 0;
4345 		hp->first.nxt = NULL;
4346 		hp->first.ix = 0;
4347 		while (bp != NULL) {
4348 			nbp = bp->nxt;
4349 			kmem_cache_free(ftblk_cache, bp);
4350 			bp = nbp;
4351 		}
4352 	}
4353 	kmem_cache_free(fthdr_cache, hp);
4354 	dbp->db_fthdr = NULL;
4355 }
4356