xref: /illumos-gate/usr/src/uts/common/io/stream.c (revision 0250c53ad267726f2438e3c6556199a0bbf588a2)
1 /*
2  * CDDL HEADER START
3  *
4  * The contents of this file are subject to the terms of the
5  * Common Development and Distribution License (the "License").
6  * You may not use this file except in compliance with the License.
7  *
8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9  * or http://www.opensolaris.org/os/licensing.
10  * See the License for the specific language governing permissions
11  * and limitations under the License.
12  *
13  * When distributing Covered Code, include this CDDL HEADER in each
14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15  * If applicable, add the following below this CDDL HEADER, with the
16  * fields enclosed by brackets "[]" replaced with your own identifying
17  * information: Portions Copyright [yyyy] [name of copyright owner]
18  *
19  * CDDL HEADER END
20  */
21 /*	Copyright (c) 1984, 1986, 1987, 1988, 1989 AT&T	*/
22 /*	All Rights Reserved	*/
23 
24 /*
25  * Copyright 2009 Sun Microsystems, Inc.  All rights reserved.
26  * Use is subject to license terms.
27  *
28  * Copyright 2021 Tintri by DDN, Inc. All rights reserved.
29  * Copyright 2022 Garrett D'Amore
30  */
31 
32 #include <sys/types.h>
33 #include <sys/param.h>
34 #include <sys/thread.h>
35 #include <sys/sysmacros.h>
36 #include <sys/stropts.h>
37 #include <sys/stream.h>
38 #include <sys/strsubr.h>
39 #include <sys/strsun.h>
40 #include <sys/conf.h>
41 #include <sys/debug.h>
42 #include <sys/cmn_err.h>
43 #include <sys/kmem.h>
44 #include <sys/atomic.h>
45 #include <sys/errno.h>
46 #include <sys/vtrace.h>
47 #include <sys/ftrace.h>
48 #include <sys/ontrap.h>
49 #include <sys/sdt.h>
50 #include <sys/strft.h>
51 
52 #ifdef DEBUG
53 #include <sys/kmem_impl.h>
54 #endif
55 
56 /*
57  * This file contains all the STREAMS utility routines that may
58  * be used by modules and drivers.
59  */
60 
61 /*
62  * STREAMS message allocator: principles of operation
63  *
64  * The streams message allocator consists of all the routines that
65  * allocate, dup and free streams messages: allocb(), [d]esballoc[a],
66  * dupb(), freeb() and freemsg().  What follows is a high-level view
67  * of how the allocator works.
68  *
69  * Every streams message consists of one or more mblks, a dblk, and data.
70  * All mblks for all types of messages come from a common mblk_cache.
71  * The dblk and data come in several flavors, depending on how the
72  * message is allocated:
73  *
74  * (1) mblks up to DBLK_MAX_CACHE size are allocated from a collection of
75  *     fixed-size dblk/data caches. For message sizes that are multiples of
76  *     PAGESIZE, dblks are allocated separately from the buffer.
77  *     The associated buffer is allocated by the constructor using kmem_alloc().
78  *     For all other message sizes, dblk and its associated data is allocated
79  *     as a single contiguous chunk of memory.
80  *     Objects in these caches consist of a dblk plus its associated data.
81  *     allocb() determines the nearest-size cache by table lookup:
82  *     the dblk_cache[] array provides the mapping from size to dblk cache.
83  *
84  * (2) Large messages (size > DBLK_MAX_CACHE) are constructed by
85  *     kmem_alloc()'ing a buffer for the data and supplying that
86  *     buffer to gesballoc(), described below.
87  *
88  * (3) The four flavors of [d]esballoc[a] are all implemented by a
89  *     common routine, gesballoc() ("generic esballoc").  gesballoc()
90  *     allocates a dblk from the global dblk_esb_cache and sets db_base,
91  *     db_lim and db_frtnp to describe the caller-supplied buffer.
92  *
93  * While there are several routines to allocate messages, there is only
94  * one routine to free messages: freeb().  freeb() simply invokes the
95  * dblk's free method, dbp->db_free(), which is set at allocation time.
96  *
97  * dupb() creates a new reference to a message by allocating a new mblk,
98  * incrementing the dblk reference count and setting the dblk's free
99  * method to dblk_decref().  The dblk's original free method is retained
100  * in db_lastfree.  dblk_decref() decrements the reference count on each
101  * freeb().  If this is not the last reference it just frees the mblk;
102  * if this *is* the last reference, it restores db_free to db_lastfree,
103  * sets db_mblk to the current mblk (see below), and invokes db_lastfree.
104  *
105  * The implementation makes aggressive use of kmem object caching for
106  * maximum performance.  This makes the code simple and compact, but
107  * also a bit abstruse in some places.  The invariants that constitute a
108  * message's constructed state, described below, are more subtle than usual.
109  *
110  * Every dblk has an "attached mblk" as part of its constructed state.
111  * The mblk is allocated by the dblk's constructor and remains attached
112  * until the message is either dup'ed or pulled up.  In the dupb() case
113  * the mblk association doesn't matter until the last free, at which time
114  * dblk_decref() attaches the last mblk to the dblk.  pullupmsg() affects
115  * the mblk association because it swaps the leading mblks of two messages,
116  * so it is responsible for swapping their db_mblk pointers accordingly.
117  * From a constructed-state viewpoint it doesn't matter that a dblk's
118  * attached mblk can change while the message is allocated; all that
119  * matters is that the dblk has *some* attached mblk when it's freed.
120  *
121  * The sizes of the allocb() small-message caches are not magical.
122  * They represent a good trade-off between internal and external
123  * fragmentation for current workloads.  They should be reevaluated
124  * periodically, especially if allocations larger than DBLK_MAX_CACHE
125  * become common.  We use 64-byte alignment so that dblks don't
126  * straddle cache lines unnecessarily.
127  */
128 #define	DBLK_MAX_CACHE		73728
129 #define	DBLK_CACHE_ALIGN	64
130 #define	DBLK_MIN_SIZE		8
131 #define	DBLK_SIZE_SHIFT		3
132 
133 #ifdef _BIG_ENDIAN
134 #define	DBLK_RTFU_SHIFT(field)	\
135 	(8 * (&((dblk_t *)0)->db_struioflag - &((dblk_t *)0)->field))
136 #else
137 #define	DBLK_RTFU_SHIFT(field)	\
138 	(8 * (&((dblk_t *)0)->field - &((dblk_t *)0)->db_ref))
139 #endif
140 
141 #define	DBLK_RTFU(ref, type, flags, uioflag)	\
142 	(((ref) << DBLK_RTFU_SHIFT(db_ref)) | \
143 	((type) << DBLK_RTFU_SHIFT(db_type)) | \
144 	(((flags) | (ref - 1)) << DBLK_RTFU_SHIFT(db_flags)) | \
145 	((uioflag) << DBLK_RTFU_SHIFT(db_struioflag)))
146 #define	DBLK_RTFU_REF_MASK	(DBLK_REFMAX << DBLK_RTFU_SHIFT(db_ref))
147 #define	DBLK_RTFU_WORD(dbp)	(*((uint32_t *)&(dbp)->db_ref))
148 #define	MBLK_BAND_FLAG_WORD(mp)	(*((uint32_t *)&(mp)->b_band))
149 
150 static size_t dblk_sizes[] = {
151 #ifdef _LP64
152 	16, 80, 144, 208, 272, 336, 528, 1040, 1488, 1936, 2576, 3856,
153 	8192, 12048, 16384, 20240, 24576, 28432, 32768, 36624,
154 	40960, 44816, 49152, 53008, 57344, 61200, 65536, 69392,
155 #else
156 	64, 128, 320, 576, 1088, 1536, 1984, 2624, 3904,
157 	8192, 12096, 16384, 20288, 24576, 28480, 32768, 36672,
158 	40960, 44864, 49152, 53056, 57344, 61248, 65536, 69440,
159 #endif
160 	DBLK_MAX_CACHE, 0
161 };
162 
163 static struct kmem_cache *dblk_cache[DBLK_MAX_CACHE / DBLK_MIN_SIZE];
164 static struct kmem_cache *mblk_cache;
165 static struct kmem_cache *dblk_esb_cache;
166 static struct kmem_cache *fthdr_cache;
167 static struct kmem_cache *ftblk_cache;
168 
169 static void dblk_lastfree(mblk_t *mp, dblk_t *dbp);
170 static mblk_t *allocb_oversize(size_t size, int flags);
171 static int allocb_tryhard_fails;
172 static void frnop_func(void *arg);
173 frtn_t frnop = { frnop_func };
174 static void bcache_dblk_lastfree(mblk_t *mp, dblk_t *dbp);
175 
176 static boolean_t rwnext_enter(queue_t *qp);
177 static void rwnext_exit(queue_t *qp);
178 
179 /*
180  * Patchable mblk/dblk kmem_cache flags.
181  */
182 int dblk_kmem_flags = 0;
183 int mblk_kmem_flags = 0;
184 
185 static int
186 dblk_constructor(void *buf, void *cdrarg, int kmflags)
187 {
188 	dblk_t *dbp = buf;
189 	ssize_t msg_size = (ssize_t)cdrarg;
190 	size_t index;
191 
192 	ASSERT(msg_size != 0);
193 
194 	index = (msg_size - 1) >> DBLK_SIZE_SHIFT;
195 
196 	ASSERT(index < (DBLK_MAX_CACHE >> DBLK_SIZE_SHIFT));
197 
198 	if ((dbp->db_mblk = kmem_cache_alloc(mblk_cache, kmflags)) == NULL)
199 		return (-1);
200 	if ((msg_size & PAGEOFFSET) == 0) {
201 		dbp->db_base = kmem_alloc(msg_size, kmflags);
202 		if (dbp->db_base == NULL) {
203 			kmem_cache_free(mblk_cache, dbp->db_mblk);
204 			return (-1);
205 		}
206 	} else {
207 		dbp->db_base = (unsigned char *)&dbp[1];
208 	}
209 
210 	dbp->db_mblk->b_datap = dbp;
211 	dbp->db_cache = dblk_cache[index];
212 	dbp->db_lim = dbp->db_base + msg_size;
213 	dbp->db_free = dbp->db_lastfree = dblk_lastfree;
214 	dbp->db_frtnp = NULL;
215 	dbp->db_fthdr = NULL;
216 	dbp->db_credp = NULL;
217 	dbp->db_cpid = -1;
218 	dbp->db_struioflag = 0;
219 	dbp->db_struioun.cksum.flags = 0;
220 	return (0);
221 }
222 
223 /*ARGSUSED*/
224 static int
225 dblk_esb_constructor(void *buf, void *cdrarg, int kmflags)
226 {
227 	dblk_t *dbp = buf;
228 
229 	if ((dbp->db_mblk = kmem_cache_alloc(mblk_cache, kmflags)) == NULL)
230 		return (-1);
231 	dbp->db_mblk->b_datap = dbp;
232 	dbp->db_cache = dblk_esb_cache;
233 	dbp->db_fthdr = NULL;
234 	dbp->db_credp = NULL;
235 	dbp->db_cpid = -1;
236 	dbp->db_struioflag = 0;
237 	dbp->db_struioun.cksum.flags = 0;
238 	return (0);
239 }
240 
241 static int
242 bcache_dblk_constructor(void *buf, void *cdrarg, int kmflags)
243 {
244 	dblk_t *dbp = buf;
245 	bcache_t *bcp = cdrarg;
246 
247 	if ((dbp->db_mblk = kmem_cache_alloc(mblk_cache, kmflags)) == NULL)
248 		return (-1);
249 
250 	dbp->db_base = kmem_cache_alloc(bcp->buffer_cache, kmflags);
251 	if (dbp->db_base == NULL) {
252 		kmem_cache_free(mblk_cache, dbp->db_mblk);
253 		return (-1);
254 	}
255 
256 	dbp->db_mblk->b_datap = dbp;
257 	dbp->db_cache = (void *)bcp;
258 	dbp->db_lim = dbp->db_base + bcp->size;
259 	dbp->db_free = dbp->db_lastfree = bcache_dblk_lastfree;
260 	dbp->db_frtnp = NULL;
261 	dbp->db_fthdr = NULL;
262 	dbp->db_credp = NULL;
263 	dbp->db_cpid = -1;
264 	dbp->db_struioflag = 0;
265 	dbp->db_struioun.cksum.flags = 0;
266 	return (0);
267 }
268 
269 /*ARGSUSED*/
270 static void
271 dblk_destructor(void *buf, void *cdrarg)
272 {
273 	dblk_t *dbp = buf;
274 	ssize_t msg_size = (ssize_t)cdrarg;
275 
276 	ASSERT(dbp->db_mblk->b_datap == dbp);
277 	ASSERT(msg_size != 0);
278 	ASSERT(dbp->db_struioflag == 0);
279 	ASSERT(dbp->db_struioun.cksum.flags == 0);
280 
281 	if ((msg_size & PAGEOFFSET) == 0) {
282 		kmem_free(dbp->db_base, msg_size);
283 	}
284 
285 	kmem_cache_free(mblk_cache, dbp->db_mblk);
286 }
287 
288 static void
289 bcache_dblk_destructor(void *buf, void *cdrarg)
290 {
291 	dblk_t *dbp = buf;
292 	bcache_t *bcp = cdrarg;
293 
294 	kmem_cache_free(bcp->buffer_cache, dbp->db_base);
295 
296 	ASSERT(dbp->db_mblk->b_datap == dbp);
297 	ASSERT(dbp->db_struioflag == 0);
298 	ASSERT(dbp->db_struioun.cksum.flags == 0);
299 
300 	kmem_cache_free(mblk_cache, dbp->db_mblk);
301 }
302 
303 /* ARGSUSED */
304 static int
305 ftblk_constructor(void *buf, void *cdrarg, int kmflags)
306 {
307 	ftblk_t *fbp = buf;
308 	int i;
309 
310 	bzero(fbp, sizeof (ftblk_t));
311 	if (str_ftstack != 0) {
312 		for (i = 0; i < FTBLK_EVNTS; i++)
313 			fbp->ev[i].stk = kmem_alloc(sizeof (ftstk_t), kmflags);
314 	}
315 
316 	return (0);
317 }
318 
319 /* ARGSUSED */
320 static void
321 ftblk_destructor(void *buf, void *cdrarg)
322 {
323 	ftblk_t *fbp = buf;
324 	int i;
325 
326 	if (str_ftstack != 0) {
327 		for (i = 0; i < FTBLK_EVNTS; i++) {
328 			if (fbp->ev[i].stk != NULL) {
329 				kmem_free(fbp->ev[i].stk, sizeof (ftstk_t));
330 				fbp->ev[i].stk = NULL;
331 			}
332 		}
333 	}
334 }
335 
336 static int
337 fthdr_constructor(void *buf, void *cdrarg, int kmflags)
338 {
339 	fthdr_t *fhp = buf;
340 
341 	return (ftblk_constructor(&fhp->first, cdrarg, kmflags));
342 }
343 
344 static void
345 fthdr_destructor(void *buf, void *cdrarg)
346 {
347 	fthdr_t *fhp = buf;
348 
349 	ftblk_destructor(&fhp->first, cdrarg);
350 }
351 
352 void
353 streams_msg_init(void)
354 {
355 	char name[40];
356 	size_t size;
357 	size_t lastsize = DBLK_MIN_SIZE;
358 	size_t *sizep;
359 	struct kmem_cache *cp;
360 	size_t tot_size;
361 	int offset;
362 
363 	mblk_cache = kmem_cache_create("streams_mblk", sizeof (mblk_t), 32,
364 	    NULL, NULL, NULL, NULL, NULL, mblk_kmem_flags);
365 
366 	for (sizep = dblk_sizes; (size = *sizep) != 0; sizep++) {
367 
368 		if ((offset = (size & PAGEOFFSET)) != 0) {
369 			/*
370 			 * We are in the middle of a page, dblk should
371 			 * be allocated on the same page
372 			 */
373 			tot_size = size + sizeof (dblk_t);
374 			ASSERT((offset + sizeof (dblk_t) + sizeof (kmem_slab_t))
375 			    < PAGESIZE);
376 			ASSERT((tot_size & (DBLK_CACHE_ALIGN - 1)) == 0);
377 
378 		} else {
379 
380 			/*
381 			 * buf size is multiple of page size, dblk and
382 			 * buffer are allocated separately.
383 			 */
384 
385 			ASSERT((size & (DBLK_CACHE_ALIGN - 1)) == 0);
386 			tot_size = sizeof (dblk_t);
387 		}
388 
389 		(void) sprintf(name, "streams_dblk_%ld", size);
390 		cp = kmem_cache_create(name, tot_size, DBLK_CACHE_ALIGN,
391 		    dblk_constructor, dblk_destructor, NULL, (void *)(size),
392 		    NULL, dblk_kmem_flags);
393 
394 		while (lastsize <= size) {
395 			dblk_cache[(lastsize - 1) >> DBLK_SIZE_SHIFT] = cp;
396 			lastsize += DBLK_MIN_SIZE;
397 		}
398 	}
399 
400 	dblk_esb_cache = kmem_cache_create("streams_dblk_esb", sizeof (dblk_t),
401 	    DBLK_CACHE_ALIGN, dblk_esb_constructor, dblk_destructor, NULL,
402 	    (void *)sizeof (dblk_t), NULL, dblk_kmem_flags);
403 	fthdr_cache = kmem_cache_create("streams_fthdr", sizeof (fthdr_t), 32,
404 	    fthdr_constructor, fthdr_destructor, NULL, NULL, NULL, 0);
405 	ftblk_cache = kmem_cache_create("streams_ftblk", sizeof (ftblk_t), 32,
406 	    ftblk_constructor, ftblk_destructor, NULL, NULL, NULL, 0);
407 
408 	/* initialize throttling queue for esballoc */
409 	esballoc_queue_init();
410 }
411 
412 /*ARGSUSED*/
413 mblk_t *
414 allocb(size_t size, uint_t pri)
415 {
416 	dblk_t *dbp;
417 	mblk_t *mp;
418 	size_t index;
419 
420 	index =  (size - 1)  >> DBLK_SIZE_SHIFT;
421 
422 	if (index >= (DBLK_MAX_CACHE >> DBLK_SIZE_SHIFT)) {
423 		if (size != 0) {
424 			mp = allocb_oversize(size, KM_NOSLEEP);
425 			goto out;
426 		}
427 		index = 0;
428 	}
429 
430 	if ((dbp = kmem_cache_alloc(dblk_cache[index], KM_NOSLEEP)) == NULL) {
431 		mp = NULL;
432 		goto out;
433 	}
434 
435 	mp = dbp->db_mblk;
436 	DBLK_RTFU_WORD(dbp) = DBLK_RTFU(1, M_DATA, 0, 0);
437 	mp->b_next = mp->b_prev = mp->b_cont = NULL;
438 	mp->b_rptr = mp->b_wptr = dbp->db_base;
439 	mp->b_queue = NULL;
440 	MBLK_BAND_FLAG_WORD(mp) = 0;
441 	STR_FTALLOC(&dbp->db_fthdr, FTEV_ALLOCB, size);
442 out:
443 	FTRACE_1("allocb(): mp=0x%p", (uintptr_t)mp);
444 
445 	return (mp);
446 }
447 
448 /*
449  * Allocate an mblk taking db_credp and db_cpid from the template.
450  * Allow the cred to be NULL.
451  */
452 mblk_t *
453 allocb_tmpl(size_t size, const mblk_t *tmpl)
454 {
455 	mblk_t *mp = allocb(size, 0);
456 
457 	if (mp != NULL) {
458 		dblk_t *src = tmpl->b_datap;
459 		dblk_t *dst = mp->b_datap;
460 		cred_t *cr;
461 		pid_t cpid;
462 
463 		cr = msg_getcred(tmpl, &cpid);
464 		if (cr != NULL)
465 			crhold(dst->db_credp = cr);
466 		dst->db_cpid = cpid;
467 		dst->db_type = src->db_type;
468 	}
469 	return (mp);
470 }
471 
472 mblk_t *
473 allocb_cred(size_t size, cred_t *cr, pid_t cpid)
474 {
475 	mblk_t *mp = allocb(size, 0);
476 
477 	ASSERT(cr != NULL);
478 	if (mp != NULL) {
479 		dblk_t *dbp = mp->b_datap;
480 
481 		crhold(dbp->db_credp = cr);
482 		dbp->db_cpid = cpid;
483 	}
484 	return (mp);
485 }
486 
487 mblk_t *
488 allocb_cred_wait(size_t size, uint_t flags, int *error, cred_t *cr, pid_t cpid)
489 {
490 	mblk_t *mp = allocb_wait(size, 0, flags, error);
491 
492 	ASSERT(cr != NULL);
493 	if (mp != NULL) {
494 		dblk_t *dbp = mp->b_datap;
495 
496 		crhold(dbp->db_credp = cr);
497 		dbp->db_cpid = cpid;
498 	}
499 
500 	return (mp);
501 }
502 
503 /*
504  * Extract the db_cred (and optionally db_cpid) from a message.
505  * We find the first mblk which has a non-NULL db_cred and use that.
506  * If none found we return NULL.
507  * Does NOT get a hold on the cred.
508  */
509 cred_t *
510 msg_getcred(const mblk_t *mp, pid_t *cpidp)
511 {
512 	cred_t *cr = NULL;
513 	cred_t *cr2;
514 	mblk_t *mp2;
515 
516 	while (mp != NULL) {
517 		dblk_t *dbp = mp->b_datap;
518 
519 		cr = dbp->db_credp;
520 		if (cr == NULL) {
521 			mp = mp->b_cont;
522 			continue;
523 		}
524 		if (cpidp != NULL)
525 			*cpidp = dbp->db_cpid;
526 
527 #ifdef DEBUG
528 		/*
529 		 * Normally there should at most one db_credp in a message.
530 		 * But if there are multiple (as in the case of some M_IOC*
531 		 * and some internal messages in TCP/IP bind logic) then
532 		 * they must be identical in the normal case.
533 		 * However, a socket can be shared between different uids
534 		 * in which case data queued in TCP would be from different
535 		 * creds. Thus we can only assert for the zoneid being the
536 		 * same. Due to Multi-level Level Ports for TX, some
537 		 * cred_t can have a NULL cr_zone, and we skip the comparison
538 		 * in that case.
539 		 */
540 		mp2 = mp->b_cont;
541 		while (mp2 != NULL) {
542 			cr2 = DB_CRED(mp2);
543 			if (cr2 != NULL) {
544 				DTRACE_PROBE2(msg__getcred,
545 				    cred_t *, cr, cred_t *, cr2);
546 				ASSERT(crgetzoneid(cr) == crgetzoneid(cr2) ||
547 				    crgetzone(cr) == NULL ||
548 				    crgetzone(cr2) == NULL);
549 			}
550 			mp2 = mp2->b_cont;
551 		}
552 #endif
553 		return (cr);
554 	}
555 	if (cpidp != NULL)
556 		*cpidp = NOPID;
557 	return (NULL);
558 }
559 
560 /*
561  * Variant of msg_getcred which, when a cred is found
562  * 1. Returns with a hold on the cred
563  * 2. Clears the first cred in the mblk.
564  * This is more efficient to use than a msg_getcred() + crhold() when
565  * the message is freed after the cred has been extracted.
566  *
567  * The caller is responsible for ensuring that there is no other reference
568  * on the message since db_credp can not be cleared when there are other
569  * references.
570  */
571 cred_t *
572 msg_extractcred(mblk_t *mp, pid_t *cpidp)
573 {
574 	cred_t *cr = NULL;
575 	cred_t *cr2;
576 	mblk_t *mp2;
577 
578 	while (mp != NULL) {
579 		dblk_t *dbp = mp->b_datap;
580 
581 		cr = dbp->db_credp;
582 		if (cr == NULL) {
583 			mp = mp->b_cont;
584 			continue;
585 		}
586 		ASSERT(dbp->db_ref == 1);
587 		dbp->db_credp = NULL;
588 		if (cpidp != NULL)
589 			*cpidp = dbp->db_cpid;
590 #ifdef DEBUG
591 		/*
592 		 * Normally there should at most one db_credp in a message.
593 		 * But if there are multiple (as in the case of some M_IOC*
594 		 * and some internal messages in TCP/IP bind logic) then
595 		 * they must be identical in the normal case.
596 		 * However, a socket can be shared between different uids
597 		 * in which case data queued in TCP would be from different
598 		 * creds. Thus we can only assert for the zoneid being the
599 		 * same. Due to Multi-level Level Ports for TX, some
600 		 * cred_t can have a NULL cr_zone, and we skip the comparison
601 		 * in that case.
602 		 */
603 		mp2 = mp->b_cont;
604 		while (mp2 != NULL) {
605 			cr2 = DB_CRED(mp2);
606 			if (cr2 != NULL) {
607 				DTRACE_PROBE2(msg__extractcred,
608 				    cred_t *, cr, cred_t *, cr2);
609 				ASSERT(crgetzoneid(cr) == crgetzoneid(cr2) ||
610 				    crgetzone(cr) == NULL ||
611 				    crgetzone(cr2) == NULL);
612 			}
613 			mp2 = mp2->b_cont;
614 		}
615 #endif
616 		return (cr);
617 	}
618 	return (NULL);
619 }
620 /*
621  * Get the label for a message. Uses the first mblk in the message
622  * which has a non-NULL db_credp.
623  * Returns NULL if there is no credp.
624  */
625 extern struct ts_label_s *
626 msg_getlabel(const mblk_t *mp)
627 {
628 	cred_t *cr = msg_getcred(mp, NULL);
629 
630 	if (cr == NULL)
631 		return (NULL);
632 
633 	return (crgetlabel(cr));
634 }
635 
636 void
637 freeb(mblk_t *mp)
638 {
639 	dblk_t *dbp = mp->b_datap;
640 
641 	ASSERT(dbp->db_ref > 0);
642 	ASSERT(mp->b_next == NULL && mp->b_prev == NULL);
643 	FTRACE_1("freeb(): mp=0x%lx", (uintptr_t)mp);
644 
645 	STR_FTEVENT_MBLK(mp, caller(), FTEV_FREEB, dbp->db_ref);
646 
647 	dbp->db_free(mp, dbp);
648 }
649 
650 void
651 freemsg(mblk_t *mp)
652 {
653 	FTRACE_1("freemsg(): mp=0x%lx", (uintptr_t)mp);
654 	while (mp) {
655 		dblk_t *dbp = mp->b_datap;
656 		mblk_t *mp_cont = mp->b_cont;
657 
658 		ASSERT(dbp->db_ref > 0);
659 		ASSERT(mp->b_next == NULL && mp->b_prev == NULL);
660 
661 		STR_FTEVENT_MBLK(mp, caller(), FTEV_FREEB, dbp->db_ref);
662 
663 		dbp->db_free(mp, dbp);
664 		mp = mp_cont;
665 	}
666 }
667 
668 /*
669  * Reallocate a block for another use.  Try hard to use the old block.
670  * If the old data is wanted (copy), leave b_wptr at the end of the data,
671  * otherwise return b_wptr = b_rptr.
672  *
673  * This routine is private and unstable.
674  */
675 mblk_t	*
676 reallocb(mblk_t *mp, size_t size, uint_t copy)
677 {
678 	mblk_t		*mp1;
679 	unsigned char	*old_rptr;
680 	ptrdiff_t	cur_size;
681 
682 	if (mp == NULL)
683 		return (allocb(size, BPRI_HI));
684 
685 	cur_size = mp->b_wptr - mp->b_rptr;
686 	old_rptr = mp->b_rptr;
687 
688 	ASSERT(mp->b_datap->db_ref != 0);
689 
690 	if (mp->b_datap->db_ref == 1 && MBLKSIZE(mp) >= size) {
691 		/*
692 		 * If the data is wanted and it will fit where it is, no
693 		 * work is required.
694 		 */
695 		if (copy && mp->b_datap->db_lim - mp->b_rptr >= size)
696 			return (mp);
697 
698 		mp->b_wptr = mp->b_rptr = mp->b_datap->db_base;
699 		mp1 = mp;
700 	} else if ((mp1 = allocb_tmpl(size, mp)) != NULL) {
701 		/* XXX other mp state could be copied too, db_flags ... ? */
702 		mp1->b_cont = mp->b_cont;
703 	} else {
704 		return (NULL);
705 	}
706 
707 	if (copy) {
708 		bcopy(old_rptr, mp1->b_rptr, cur_size);
709 		mp1->b_wptr = mp1->b_rptr + cur_size;
710 	}
711 
712 	if (mp != mp1)
713 		freeb(mp);
714 
715 	return (mp1);
716 }
717 
718 static void
719 dblk_lastfree(mblk_t *mp, dblk_t *dbp)
720 {
721 	ASSERT(dbp->db_mblk == mp);
722 	if (dbp->db_fthdr != NULL)
723 		str_ftfree(dbp);
724 
725 	/* set credp and projid to be 'unspecified' before returning to cache */
726 	if (dbp->db_credp != NULL) {
727 		crfree(dbp->db_credp);
728 		dbp->db_credp = NULL;
729 	}
730 	dbp->db_cpid = -1;
731 
732 	/* Reset the struioflag and the checksum flag fields */
733 	dbp->db_struioflag = 0;
734 	dbp->db_struioun.cksum.flags = 0;
735 
736 	/* and the COOKED and/or UIOA flag(s) */
737 	dbp->db_flags &= ~(DBLK_COOKED | DBLK_UIOA);
738 
739 	kmem_cache_free(dbp->db_cache, dbp);
740 }
741 
742 static void
743 dblk_decref(mblk_t *mp, dblk_t *dbp)
744 {
745 	if (dbp->db_ref != 1) {
746 		uint32_t rtfu = atomic_add_32_nv(&DBLK_RTFU_WORD(dbp),
747 		    -(1 << DBLK_RTFU_SHIFT(db_ref)));
748 		/*
749 		 * atomic_add_32_nv() just decremented db_ref, so we no longer
750 		 * have a reference to the dblk, which means another thread
751 		 * could free it.  Therefore we cannot examine the dblk to
752 		 * determine whether ours was the last reference.  Instead,
753 		 * we extract the new and minimum reference counts from rtfu.
754 		 * Note that all we're really saying is "if (ref != refmin)".
755 		 */
756 		if (((rtfu >> DBLK_RTFU_SHIFT(db_ref)) & DBLK_REFMAX) !=
757 		    ((rtfu >> DBLK_RTFU_SHIFT(db_flags)) & DBLK_REFMIN)) {
758 			kmem_cache_free(mblk_cache, mp);
759 			return;
760 		}
761 	}
762 	dbp->db_mblk = mp;
763 	dbp->db_free = dbp->db_lastfree;
764 	dbp->db_lastfree(mp, dbp);
765 }
766 
767 mblk_t *
768 dupb(mblk_t *mp)
769 {
770 	dblk_t *dbp = mp->b_datap;
771 	mblk_t *new_mp;
772 	uint32_t oldrtfu, newrtfu;
773 
774 	if ((new_mp = kmem_cache_alloc(mblk_cache, KM_NOSLEEP)) == NULL)
775 		goto out;
776 
777 	new_mp->b_next = new_mp->b_prev = new_mp->b_cont = NULL;
778 	new_mp->b_rptr = mp->b_rptr;
779 	new_mp->b_wptr = mp->b_wptr;
780 	new_mp->b_datap = dbp;
781 	new_mp->b_queue = NULL;
782 	MBLK_BAND_FLAG_WORD(new_mp) = MBLK_BAND_FLAG_WORD(mp);
783 
784 	STR_FTEVENT_MBLK(mp, caller(), FTEV_DUPB, dbp->db_ref);
785 
786 	dbp->db_free = dblk_decref;
787 	do {
788 		ASSERT(dbp->db_ref > 0);
789 		oldrtfu = DBLK_RTFU_WORD(dbp);
790 		newrtfu = oldrtfu + (1 << DBLK_RTFU_SHIFT(db_ref));
791 		/*
792 		 * If db_ref is maxed out we can't dup this message anymore.
793 		 */
794 		if ((oldrtfu & DBLK_RTFU_REF_MASK) == DBLK_RTFU_REF_MASK) {
795 			kmem_cache_free(mblk_cache, new_mp);
796 			new_mp = NULL;
797 			goto out;
798 		}
799 	} while (atomic_cas_32(&DBLK_RTFU_WORD(dbp), oldrtfu, newrtfu) !=
800 	    oldrtfu);
801 
802 out:
803 	FTRACE_1("dupb(): new_mp=0x%lx", (uintptr_t)new_mp);
804 	return (new_mp);
805 }
806 
807 static void
808 dblk_lastfree_desb(mblk_t *mp, dblk_t *dbp)
809 {
810 	frtn_t *frp = dbp->db_frtnp;
811 
812 	ASSERT(dbp->db_mblk == mp);
813 	frp->free_func(frp->free_arg);
814 	if (dbp->db_fthdr != NULL)
815 		str_ftfree(dbp);
816 
817 	/* set credp and projid to be 'unspecified' before returning to cache */
818 	if (dbp->db_credp != NULL) {
819 		crfree(dbp->db_credp);
820 		dbp->db_credp = NULL;
821 	}
822 	dbp->db_cpid = -1;
823 	dbp->db_struioflag = 0;
824 	dbp->db_struioun.cksum.flags = 0;
825 
826 	kmem_cache_free(dbp->db_cache, dbp);
827 }
828 
829 /*ARGSUSED*/
830 static void
831 frnop_func(void *arg)
832 {
833 }
834 
835 /*
836  * Generic esballoc used to implement the four flavors: [d]esballoc[a].
837  *
838  * The variants with a 'd' prefix (desballoc, desballoca)
839  *	directly free the mblk when it loses its last ref,
840  *	where the other variants free asynchronously.
841  * The variants with an 'a' suffix (esballoca, desballoca)
842  *	add an extra ref, effectively letting the streams subsystem
843  *	know that the message data should not be modified.
844  *	(eg. see db_ref checks in reallocb and elsewhere)
845  *
846  * The method used by the 'a' suffix functions to keep the dblk
847  * db_ref > 1 is non-obvious.  The macro DBLK_RTFU(2,...) passed to
848  * gesballoc sets the initial db_ref = 2 and sets the DBLK_REFMIN
849  * bit in db_flags.  In dblk_decref() that flag essentially means
850  * the dblk has one extra ref, so the "last ref" is one, not zero.
851  */
852 static mblk_t *
853 gesballoc(unsigned char *base, size_t size, uint32_t db_rtfu, frtn_t *frp,
854     void (*lastfree)(mblk_t *, dblk_t *), int kmflags)
855 {
856 	dblk_t *dbp;
857 	mblk_t *mp;
858 
859 	ASSERT(base != NULL && frp != NULL);
860 
861 	if ((dbp = kmem_cache_alloc(dblk_esb_cache, kmflags)) == NULL) {
862 		mp = NULL;
863 		goto out;
864 	}
865 
866 	mp = dbp->db_mblk;
867 	dbp->db_base = base;
868 	dbp->db_lim = base + size;
869 	dbp->db_free = dbp->db_lastfree = lastfree;
870 	dbp->db_frtnp = frp;
871 	DBLK_RTFU_WORD(dbp) = db_rtfu;
872 	mp->b_next = mp->b_prev = mp->b_cont = NULL;
873 	mp->b_rptr = mp->b_wptr = base;
874 	mp->b_queue = NULL;
875 	MBLK_BAND_FLAG_WORD(mp) = 0;
876 
877 out:
878 	FTRACE_1("gesballoc(): mp=0x%lx", (uintptr_t)mp);
879 	return (mp);
880 }
881 
882 /*ARGSUSED*/
883 mblk_t *
884 esballoc(unsigned char *base, size_t size, uint_t pri, frtn_t *frp)
885 {
886 	mblk_t *mp;
887 
888 	/*
889 	 * Note that this is structured to allow the common case (i.e.
890 	 * STREAMS flowtracing disabled) to call gesballoc() with tail
891 	 * call optimization.
892 	 */
893 	if (!str_ftnever) {
894 		mp = gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
895 		    frp, freebs_enqueue, KM_NOSLEEP);
896 
897 		if (mp != NULL)
898 			STR_FTALLOC(&DB_FTHDR(mp), FTEV_ESBALLOC, size);
899 		return (mp);
900 	}
901 
902 	return (gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
903 	    frp, freebs_enqueue, KM_NOSLEEP));
904 }
905 
906 /*
907  * Same as esballoc() but sleeps waiting for memory.
908  */
909 /*ARGSUSED*/
910 mblk_t *
911 esballoc_wait(unsigned char *base, size_t size, uint_t pri, frtn_t *frp)
912 {
913 	mblk_t *mp;
914 
915 	/*
916 	 * Note that this is structured to allow the common case (i.e.
917 	 * STREAMS flowtracing disabled) to call gesballoc() with tail
918 	 * call optimization.
919 	 */
920 	if (!str_ftnever) {
921 		mp = gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
922 		    frp, freebs_enqueue, KM_SLEEP);
923 
924 		STR_FTALLOC(&DB_FTHDR(mp), FTEV_ESBALLOC, size);
925 		return (mp);
926 	}
927 
928 	return (gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
929 	    frp, freebs_enqueue, KM_SLEEP));
930 }
931 
932 /*ARGSUSED*/
933 mblk_t *
934 desballoc(unsigned char *base, size_t size, uint_t pri, frtn_t *frp)
935 {
936 	mblk_t *mp;
937 
938 	/*
939 	 * Note that this is structured to allow the common case (i.e.
940 	 * STREAMS flowtracing disabled) to call gesballoc() with tail
941 	 * call optimization.
942 	 */
943 	if (!str_ftnever) {
944 		mp = gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
945 		    frp, dblk_lastfree_desb, KM_NOSLEEP);
946 
947 		if (mp != NULL)
948 			STR_FTALLOC(&DB_FTHDR(mp), FTEV_DESBALLOC, size);
949 		return (mp);
950 	}
951 
952 	return (gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
953 	    frp, dblk_lastfree_desb, KM_NOSLEEP));
954 }
955 
956 /*ARGSUSED*/
957 mblk_t *
958 esballoca(unsigned char *base, size_t size, uint_t pri, frtn_t *frp)
959 {
960 	mblk_t *mp;
961 
962 	/*
963 	 * Note that this is structured to allow the common case (i.e.
964 	 * STREAMS flowtracing disabled) to call gesballoc() with tail
965 	 * call optimization.
966 	 */
967 	if (!str_ftnever) {
968 		mp = gesballoc(base, size, DBLK_RTFU(2, M_DATA, 0, 0),
969 		    frp, freebs_enqueue, KM_NOSLEEP);
970 
971 		if (mp != NULL)
972 			STR_FTALLOC(&DB_FTHDR(mp), FTEV_ESBALLOCA, size);
973 		return (mp);
974 	}
975 
976 	return (gesballoc(base, size, DBLK_RTFU(2, M_DATA, 0, 0),
977 	    frp, freebs_enqueue, KM_NOSLEEP));
978 }
979 
980 /*
981  * Same as esballoca() but sleeps waiting for memory.
982  */
983 mblk_t *
984 esballoca_wait(unsigned char *base, size_t size, uint_t pri, frtn_t *frp)
985 {
986 	mblk_t *mp;
987 
988 	/*
989 	 * Note that this is structured to allow the common case (i.e.
990 	 * STREAMS flowtracing disabled) to call gesballoc() with tail
991 	 * call optimization.
992 	 */
993 	if (!str_ftnever) {
994 		mp = gesballoc(base, size, DBLK_RTFU(2, M_DATA, 0, 0),
995 		    frp, freebs_enqueue, KM_SLEEP);
996 
997 		STR_FTALLOC(&DB_FTHDR(mp), FTEV_ESBALLOCA, size);
998 		return (mp);
999 	}
1000 
1001 	return (gesballoc(base, size, DBLK_RTFU(2, M_DATA, 0, 0),
1002 	    frp, freebs_enqueue, KM_SLEEP));
1003 }
1004 
1005 /*ARGSUSED*/
1006 mblk_t *
1007 desballoca(unsigned char *base, size_t size, uint_t pri, frtn_t *frp)
1008 {
1009 	mblk_t *mp;
1010 
1011 	/*
1012 	 * Note that this is structured to allow the common case (i.e.
1013 	 * STREAMS flowtracing disabled) to call gesballoc() with tail
1014 	 * call optimization.
1015 	 */
1016 	if (!str_ftnever) {
1017 		mp = gesballoc(base, size, DBLK_RTFU(2, M_DATA, 0, 0),
1018 		    frp, dblk_lastfree_desb, KM_NOSLEEP);
1019 
1020 		if (mp != NULL)
1021 			STR_FTALLOC(&DB_FTHDR(mp), FTEV_DESBALLOCA, size);
1022 		return (mp);
1023 	}
1024 
1025 	return (gesballoc(base, size, DBLK_RTFU(2, M_DATA, 0, 0),
1026 	    frp, dblk_lastfree_desb, KM_NOSLEEP));
1027 }
1028 
1029 static void
1030 bcache_dblk_lastfree(mblk_t *mp, dblk_t *dbp)
1031 {
1032 	bcache_t *bcp = dbp->db_cache;
1033 
1034 	ASSERT(dbp->db_mblk == mp);
1035 	if (dbp->db_fthdr != NULL)
1036 		str_ftfree(dbp);
1037 
1038 	/* set credp and projid to be 'unspecified' before returning to cache */
1039 	if (dbp->db_credp != NULL) {
1040 		crfree(dbp->db_credp);
1041 		dbp->db_credp = NULL;
1042 	}
1043 	dbp->db_cpid = -1;
1044 	dbp->db_struioflag = 0;
1045 	dbp->db_struioun.cksum.flags = 0;
1046 
1047 	mutex_enter(&bcp->mutex);
1048 	kmem_cache_free(bcp->dblk_cache, dbp);
1049 	bcp->alloc--;
1050 
1051 	if (bcp->alloc == 0 && bcp->destroy != 0) {
1052 		kmem_cache_destroy(bcp->dblk_cache);
1053 		kmem_cache_destroy(bcp->buffer_cache);
1054 		mutex_exit(&bcp->mutex);
1055 		mutex_destroy(&bcp->mutex);
1056 		kmem_free(bcp, sizeof (bcache_t));
1057 	} else {
1058 		mutex_exit(&bcp->mutex);
1059 	}
1060 }
1061 
1062 bcache_t *
1063 bcache_create(char *name, size_t size, uint_t align)
1064 {
1065 	bcache_t *bcp;
1066 	char buffer[255];
1067 
1068 	ASSERT((align & (align - 1)) == 0);
1069 
1070 	if ((bcp = kmem_alloc(sizeof (bcache_t), KM_NOSLEEP)) == NULL)
1071 		return (NULL);
1072 
1073 	bcp->size = size;
1074 	bcp->align = align;
1075 	bcp->alloc = 0;
1076 	bcp->destroy = 0;
1077 
1078 	mutex_init(&bcp->mutex, NULL, MUTEX_DRIVER, NULL);
1079 
1080 	(void) sprintf(buffer, "%s_buffer_cache", name);
1081 	bcp->buffer_cache = kmem_cache_create(buffer, size, align, NULL, NULL,
1082 	    NULL, NULL, NULL, 0);
1083 	(void) sprintf(buffer, "%s_dblk_cache", name);
1084 	bcp->dblk_cache = kmem_cache_create(buffer, sizeof (dblk_t),
1085 	    DBLK_CACHE_ALIGN, bcache_dblk_constructor, bcache_dblk_destructor,
1086 	    NULL, (void *)bcp, NULL, 0);
1087 
1088 	return (bcp);
1089 }
1090 
1091 void
1092 bcache_destroy(bcache_t *bcp)
1093 {
1094 	ASSERT(bcp != NULL);
1095 
1096 	mutex_enter(&bcp->mutex);
1097 	if (bcp->alloc == 0) {
1098 		kmem_cache_destroy(bcp->dblk_cache);
1099 		kmem_cache_destroy(bcp->buffer_cache);
1100 		mutex_exit(&bcp->mutex);
1101 		mutex_destroy(&bcp->mutex);
1102 		kmem_free(bcp, sizeof (bcache_t));
1103 	} else {
1104 		bcp->destroy++;
1105 		mutex_exit(&bcp->mutex);
1106 	}
1107 }
1108 
1109 /*ARGSUSED*/
1110 mblk_t *
1111 bcache_allocb(bcache_t *bcp, uint_t pri)
1112 {
1113 	dblk_t *dbp;
1114 	mblk_t *mp = NULL;
1115 
1116 	ASSERT(bcp != NULL);
1117 
1118 	mutex_enter(&bcp->mutex);
1119 	if (bcp->destroy != 0) {
1120 		mutex_exit(&bcp->mutex);
1121 		goto out;
1122 	}
1123 
1124 	if ((dbp = kmem_cache_alloc(bcp->dblk_cache, KM_NOSLEEP)) == NULL) {
1125 		mutex_exit(&bcp->mutex);
1126 		goto out;
1127 	}
1128 	bcp->alloc++;
1129 	mutex_exit(&bcp->mutex);
1130 
1131 	ASSERT(((uintptr_t)(dbp->db_base) & (bcp->align - 1)) == 0);
1132 
1133 	mp = dbp->db_mblk;
1134 	DBLK_RTFU_WORD(dbp) = DBLK_RTFU(1, M_DATA, 0, 0);
1135 	mp->b_next = mp->b_prev = mp->b_cont = NULL;
1136 	mp->b_rptr = mp->b_wptr = dbp->db_base;
1137 	mp->b_queue = NULL;
1138 	MBLK_BAND_FLAG_WORD(mp) = 0;
1139 	STR_FTALLOC(&dbp->db_fthdr, FTEV_BCALLOCB, bcp->size);
1140 out:
1141 	FTRACE_1("bcache_allocb(): mp=0x%p", (uintptr_t)mp);
1142 
1143 	return (mp);
1144 }
1145 
1146 static void
1147 dblk_lastfree_oversize(mblk_t *mp, dblk_t *dbp)
1148 {
1149 	ASSERT(dbp->db_mblk == mp);
1150 	if (dbp->db_fthdr != NULL)
1151 		str_ftfree(dbp);
1152 
1153 	/* set credp and projid to be 'unspecified' before returning to cache */
1154 	if (dbp->db_credp != NULL) {
1155 		crfree(dbp->db_credp);
1156 		dbp->db_credp = NULL;
1157 	}
1158 	dbp->db_cpid = -1;
1159 	dbp->db_struioflag = 0;
1160 	dbp->db_struioun.cksum.flags = 0;
1161 
1162 	kmem_free(dbp->db_base, dbp->db_lim - dbp->db_base);
1163 	kmem_cache_free(dbp->db_cache, dbp);
1164 }
1165 
1166 static mblk_t *
1167 allocb_oversize(size_t size, int kmflags)
1168 {
1169 	mblk_t *mp;
1170 	void *buf;
1171 
1172 	size = P2ROUNDUP(size, DBLK_CACHE_ALIGN);
1173 	if ((buf = kmem_alloc(size, kmflags)) == NULL)
1174 		return (NULL);
1175 	if ((mp = gesballoc(buf, size, DBLK_RTFU(1, M_DATA, 0, 0),
1176 	    &frnop, dblk_lastfree_oversize, kmflags)) == NULL)
1177 		kmem_free(buf, size);
1178 
1179 	if (mp != NULL)
1180 		STR_FTALLOC(&DB_FTHDR(mp), FTEV_ALLOCBIG, size);
1181 
1182 	return (mp);
1183 }
1184 
1185 mblk_t *
1186 allocb_tryhard(size_t target_size)
1187 {
1188 	size_t size;
1189 	mblk_t *bp;
1190 
1191 	for (size = target_size; size < target_size + 512;
1192 	    size += DBLK_CACHE_ALIGN)
1193 		if ((bp = allocb(size, BPRI_HI)) != NULL)
1194 			return (bp);
1195 	allocb_tryhard_fails++;
1196 	return (NULL);
1197 }
1198 
1199 /*
1200  * This routine is consolidation private for STREAMS internal use
1201  * This routine may only be called from sync routines (i.e., not
1202  * from put or service procedures).  It is located here (rather
1203  * than strsubr.c) so that we don't have to expose all of the
1204  * allocb() implementation details in header files.
1205  */
1206 mblk_t *
1207 allocb_wait(size_t size, uint_t pri, uint_t flags, int *error)
1208 {
1209 	dblk_t *dbp;
1210 	mblk_t *mp;
1211 	size_t index;
1212 
1213 	index = (size -1) >> DBLK_SIZE_SHIFT;
1214 
1215 	if (flags & STR_NOSIG) {
1216 		if (index >= (DBLK_MAX_CACHE >> DBLK_SIZE_SHIFT)) {
1217 			if (size != 0) {
1218 				mp = allocb_oversize(size, KM_SLEEP);
1219 				FTRACE_1("allocb_wait (NOSIG): mp=0x%lx",
1220 				    (uintptr_t)mp);
1221 				return (mp);
1222 			}
1223 			index = 0;
1224 		}
1225 
1226 		dbp = kmem_cache_alloc(dblk_cache[index], KM_SLEEP);
1227 		mp = dbp->db_mblk;
1228 		DBLK_RTFU_WORD(dbp) = DBLK_RTFU(1, M_DATA, 0, 0);
1229 		mp->b_next = mp->b_prev = mp->b_cont = NULL;
1230 		mp->b_rptr = mp->b_wptr = dbp->db_base;
1231 		mp->b_queue = NULL;
1232 		MBLK_BAND_FLAG_WORD(mp) = 0;
1233 		STR_FTALLOC(&DB_FTHDR(mp), FTEV_ALLOCBW, size);
1234 
1235 		FTRACE_1("allocb_wait (NOSIG): mp=0x%lx", (uintptr_t)mp);
1236 
1237 	} else {
1238 		while ((mp = allocb(size, pri)) == NULL) {
1239 			if ((*error = strwaitbuf(size, BPRI_HI)) != 0)
1240 				return (NULL);
1241 		}
1242 	}
1243 
1244 	return (mp);
1245 }
1246 
1247 /*
1248  * Call function 'func' with 'arg' when a class zero block can
1249  * be allocated with priority 'pri'.
1250  */
1251 bufcall_id_t
1252 esbbcall(uint_t pri, void (*func)(void *), void *arg)
1253 {
1254 	return (bufcall(1, pri, func, arg));
1255 }
1256 
1257 /*
1258  * Allocates an iocblk (M_IOCTL) block. Properly sets the credentials
1259  * ioc_id, rval and error of the struct ioctl to set up an ioctl call.
1260  * This provides consistency for all internal allocators of ioctl.
1261  */
1262 mblk_t *
1263 mkiocb(uint_t cmd)
1264 {
1265 	struct iocblk	*ioc;
1266 	mblk_t		*mp;
1267 
1268 	/*
1269 	 * Allocate enough space for any of the ioctl related messages.
1270 	 */
1271 	if ((mp = allocb(sizeof (union ioctypes), BPRI_MED)) == NULL)
1272 		return (NULL);
1273 
1274 	bzero(mp->b_rptr, sizeof (union ioctypes));
1275 
1276 	/*
1277 	 * Set the mblk_t information and ptrs correctly.
1278 	 */
1279 	mp->b_wptr += sizeof (struct iocblk);
1280 	mp->b_datap->db_type = M_IOCTL;
1281 
1282 	/*
1283 	 * Fill in the fields.
1284 	 */
1285 	ioc		= (struct iocblk *)mp->b_rptr;
1286 	ioc->ioc_cmd	= cmd;
1287 	ioc->ioc_cr	= kcred;
1288 	ioc->ioc_id	= getiocseqno();
1289 	ioc->ioc_flag	= IOC_NATIVE;
1290 	return (mp);
1291 }
1292 
1293 /*
1294  * test if block of given size can be allocated with a request of
1295  * the given priority.
1296  * 'pri' is no longer used, but is retained for compatibility.
1297  */
1298 /* ARGSUSED */
1299 int
1300 testb(size_t size, uint_t pri)
1301 {
1302 	return ((size + sizeof (dblk_t)) <= kmem_avail());
1303 }
1304 
1305 /*
1306  * Call function 'func' with argument 'arg' when there is a reasonably
1307  * good chance that a block of size 'size' can be allocated.
1308  * 'pri' is no longer used, but is retained for compatibility.
1309  */
1310 /* ARGSUSED */
1311 bufcall_id_t
1312 bufcall(size_t size, uint_t pri, void (*func)(void *), void *arg)
1313 {
1314 	static long bid = 1;	/* always odd to save checking for zero */
1315 	bufcall_id_t bc_id;
1316 	struct strbufcall *bcp;
1317 
1318 	if ((bcp = kmem_alloc(sizeof (strbufcall_t), KM_NOSLEEP)) == NULL)
1319 		return (0);
1320 
1321 	bcp->bc_func = func;
1322 	bcp->bc_arg = arg;
1323 	bcp->bc_size = size;
1324 	bcp->bc_next = NULL;
1325 	bcp->bc_executor = NULL;
1326 
1327 	mutex_enter(&strbcall_lock);
1328 	/*
1329 	 * After bcp is linked into strbcalls and strbcall_lock is dropped there
1330 	 * should be no references to bcp since it may be freed by
1331 	 * runbufcalls(). Since bcp_id field is returned, we save its value in
1332 	 * the local var.
1333 	 */
1334 	bc_id = bcp->bc_id = (bufcall_id_t)(bid += 2);	/* keep it odd */
1335 
1336 	/*
1337 	 * add newly allocated stream event to existing
1338 	 * linked list of events.
1339 	 */
1340 	if (strbcalls.bc_head == NULL) {
1341 		strbcalls.bc_head = strbcalls.bc_tail = bcp;
1342 	} else {
1343 		strbcalls.bc_tail->bc_next = bcp;
1344 		strbcalls.bc_tail = bcp;
1345 	}
1346 
1347 	cv_signal(&strbcall_cv);
1348 	mutex_exit(&strbcall_lock);
1349 	return (bc_id);
1350 }
1351 
1352 /*
1353  * Cancel a bufcall request.
1354  */
1355 void
1356 unbufcall(bufcall_id_t id)
1357 {
1358 	strbufcall_t *bcp, *pbcp;
1359 
1360 	mutex_enter(&strbcall_lock);
1361 again:
1362 	pbcp = NULL;
1363 	for (bcp = strbcalls.bc_head; bcp; bcp = bcp->bc_next) {
1364 		if (id == bcp->bc_id)
1365 			break;
1366 		pbcp = bcp;
1367 	}
1368 	if (bcp) {
1369 		if (bcp->bc_executor != NULL) {
1370 			if (bcp->bc_executor != curthread) {
1371 				cv_wait(&bcall_cv, &strbcall_lock);
1372 				goto again;
1373 			}
1374 		} else {
1375 			if (pbcp)
1376 				pbcp->bc_next = bcp->bc_next;
1377 			else
1378 				strbcalls.bc_head = bcp->bc_next;
1379 			if (bcp == strbcalls.bc_tail)
1380 				strbcalls.bc_tail = pbcp;
1381 			kmem_free(bcp, sizeof (strbufcall_t));
1382 		}
1383 	}
1384 	mutex_exit(&strbcall_lock);
1385 }
1386 
1387 /*
1388  * Duplicate a message block by block (uses dupb), returning
1389  * a pointer to the duplicate message.
1390  * Returns a non-NULL value only if the entire message
1391  * was dup'd.
1392  */
1393 mblk_t *
1394 dupmsg(mblk_t *bp)
1395 {
1396 	mblk_t *head, *nbp;
1397 
1398 	if (!bp || !(nbp = head = dupb(bp)))
1399 		return (NULL);
1400 
1401 	while (bp->b_cont) {
1402 		if (!(nbp->b_cont = dupb(bp->b_cont))) {
1403 			freemsg(head);
1404 			return (NULL);
1405 		}
1406 		nbp = nbp->b_cont;
1407 		bp = bp->b_cont;
1408 	}
1409 	return (head);
1410 }
1411 
1412 #define	DUPB_NOLOAN(bp) \
1413 	((((bp)->b_datap->db_struioflag & STRUIO_ZC) != 0) ? \
1414 	copyb((bp)) : dupb((bp)))
1415 
1416 mblk_t *
1417 dupmsg_noloan(mblk_t *bp)
1418 {
1419 	mblk_t *head, *nbp;
1420 
1421 	if (bp == NULL || DB_TYPE(bp) != M_DATA ||
1422 	    ((nbp = head = DUPB_NOLOAN(bp)) == NULL))
1423 		return (NULL);
1424 
1425 	while (bp->b_cont) {
1426 		if ((nbp->b_cont = DUPB_NOLOAN(bp->b_cont)) == NULL) {
1427 			freemsg(head);
1428 			return (NULL);
1429 		}
1430 		nbp = nbp->b_cont;
1431 		bp = bp->b_cont;
1432 	}
1433 	return (head);
1434 }
1435 
1436 /*
1437  * Copy data from message and data block to newly allocated message and
1438  * data block. Returns new message block pointer, or NULL if error.
1439  * The alignment of rptr (w.r.t. word alignment) will be the same in the copy
1440  * as in the original even when db_base is not word aligned. (bug 1052877)
1441  */
1442 mblk_t *
1443 copyb(mblk_t *bp)
1444 {
1445 	mblk_t	*nbp;
1446 	dblk_t	*dp, *ndp;
1447 	uchar_t *base;
1448 	size_t	size;
1449 	size_t	unaligned;
1450 
1451 	ASSERT(bp->b_wptr >= bp->b_rptr);
1452 
1453 	dp = bp->b_datap;
1454 	if (dp->db_fthdr != NULL)
1455 		STR_FTEVENT_MBLK(bp, caller(), FTEV_COPYB, 0);
1456 
1457 	size = dp->db_lim - dp->db_base;
1458 	unaligned = P2PHASE((uintptr_t)dp->db_base, sizeof (uint_t));
1459 	if ((nbp = allocb_tmpl(size + unaligned, bp)) == NULL)
1460 		return (NULL);
1461 	nbp->b_flag = bp->b_flag;
1462 	nbp->b_band = bp->b_band;
1463 	ndp = nbp->b_datap;
1464 
1465 	/*
1466 	 * Copy the various checksum information that came in
1467 	 * originally.
1468 	 */
1469 	ndp->db_cksumstart = dp->db_cksumstart;
1470 	ndp->db_cksumend = dp->db_cksumend;
1471 	ndp->db_cksumstuff = dp->db_cksumstuff;
1472 	bcopy(dp->db_struioun.data, ndp->db_struioun.data,
1473 	    sizeof (dp->db_struioun.data));
1474 
1475 	/*
1476 	 * Well, here is a potential issue.  If we are trying to
1477 	 * trace a flow, and we copy the message, we might lose
1478 	 * information about where this message might have been.
1479 	 * So we should inherit the FT data.  On the other hand,
1480 	 * a user might be interested only in alloc to free data.
1481 	 * So I guess the real answer is to provide a tunable.
1482 	 */
1483 	STR_FTEVENT_MBLK(nbp, caller(), FTEV_COPYB, 1);
1484 
1485 	base = ndp->db_base + unaligned;
1486 	bcopy(dp->db_base, ndp->db_base + unaligned, size);
1487 
1488 	nbp->b_rptr = base + (bp->b_rptr - dp->db_base);
1489 	nbp->b_wptr = nbp->b_rptr + MBLKL(bp);
1490 
1491 	return (nbp);
1492 }
1493 
1494 /*
1495  * Copy data from message to newly allocated message using new
1496  * data blocks.  Returns a pointer to the new message, or NULL if error.
1497  */
1498 mblk_t *
1499 copymsg(mblk_t *bp)
1500 {
1501 	mblk_t *head, *nbp;
1502 
1503 	if (!bp || !(nbp = head = copyb(bp)))
1504 		return (NULL);
1505 
1506 	while (bp->b_cont) {
1507 		if (!(nbp->b_cont = copyb(bp->b_cont))) {
1508 			freemsg(head);
1509 			return (NULL);
1510 		}
1511 		nbp = nbp->b_cont;
1512 		bp = bp->b_cont;
1513 	}
1514 	return (head);
1515 }
1516 
1517 /*
1518  * link a message block to tail of message
1519  */
1520 void
1521 linkb(mblk_t *mp, mblk_t *bp)
1522 {
1523 	ASSERT(mp && bp);
1524 
1525 	for (; mp->b_cont; mp = mp->b_cont)
1526 		;
1527 	mp->b_cont = bp;
1528 }
1529 
1530 /*
1531  * unlink a message block from head of message
1532  * return pointer to new message.
1533  * NULL if message becomes empty.
1534  */
1535 mblk_t *
1536 unlinkb(mblk_t *bp)
1537 {
1538 	mblk_t *bp1;
1539 
1540 	bp1 = bp->b_cont;
1541 	bp->b_cont = NULL;
1542 	return (bp1);
1543 }
1544 
1545 /*
1546  * remove a message block "bp" from message "mp"
1547  *
1548  * Return pointer to new message or NULL if no message remains.
1549  * Return -1 if bp is not found in message.
1550  */
1551 mblk_t *
1552 rmvb(mblk_t *mp, mblk_t *bp)
1553 {
1554 	mblk_t *tmp;
1555 	mblk_t *lastp = NULL;
1556 
1557 	ASSERT(mp && bp);
1558 	for (tmp = mp; tmp; tmp = tmp->b_cont) {
1559 		if (tmp == bp) {
1560 			if (lastp)
1561 				lastp->b_cont = tmp->b_cont;
1562 			else
1563 				mp = tmp->b_cont;
1564 			tmp->b_cont = NULL;
1565 			return (mp);
1566 		}
1567 		lastp = tmp;
1568 	}
1569 	return ((mblk_t *)-1);
1570 }
1571 
1572 /*
1573  * Concatenate and align first len bytes of common
1574  * message type.  Len == -1, means concat everything.
1575  * Returns 1 on success, 0 on failure
1576  * After the pullup, mp points to the pulled up data.
1577  */
1578 int
1579 pullupmsg(mblk_t *mp, ssize_t len)
1580 {
1581 	mblk_t *bp, *b_cont;
1582 	dblk_t *dbp;
1583 	ssize_t n;
1584 
1585 	ASSERT(mp->b_datap->db_ref > 0);
1586 	ASSERT(mp->b_next == NULL && mp->b_prev == NULL);
1587 
1588 	if (len == -1) {
1589 		if (mp->b_cont == NULL && str_aligned(mp->b_rptr))
1590 			return (1);
1591 		len = xmsgsize(mp);
1592 	} else {
1593 		ssize_t first_mblk_len = mp->b_wptr - mp->b_rptr;
1594 		ASSERT(first_mblk_len >= 0);
1595 		/*
1596 		 * If the length is less than that of the first mblk,
1597 		 * we want to pull up the message into an aligned mblk.
1598 		 * Though not part of the spec, some callers assume it.
1599 		 */
1600 		if (len <= first_mblk_len) {
1601 			if (str_aligned(mp->b_rptr))
1602 				return (1);
1603 			len = first_mblk_len;
1604 		} else if (xmsgsize(mp) < len)
1605 			return (0);
1606 	}
1607 
1608 	if ((bp = allocb_tmpl(len, mp)) == NULL)
1609 		return (0);
1610 
1611 	dbp = bp->b_datap;
1612 	*bp = *mp;		/* swap mblks so bp heads the old msg... */
1613 	mp->b_datap = dbp;	/* ... and mp heads the new message */
1614 	mp->b_datap->db_mblk = mp;
1615 	bp->b_datap->db_mblk = bp;
1616 	mp->b_rptr = mp->b_wptr = dbp->db_base;
1617 
1618 	do {
1619 		ASSERT(bp->b_datap->db_ref > 0);
1620 		ASSERT(bp->b_wptr >= bp->b_rptr);
1621 		n = MIN(bp->b_wptr - bp->b_rptr, len);
1622 		ASSERT(n >= 0);		/* allow zero-length mblk_t's */
1623 		if (n > 0)
1624 			bcopy(bp->b_rptr, mp->b_wptr, (size_t)n);
1625 		mp->b_wptr += n;
1626 		bp->b_rptr += n;
1627 		len -= n;
1628 		if (bp->b_rptr != bp->b_wptr)
1629 			break;
1630 		b_cont = bp->b_cont;
1631 		freeb(bp);
1632 		bp = b_cont;
1633 	} while (len && bp);
1634 
1635 	mp->b_cont = bp;	/* tack on whatever wasn't pulled up */
1636 
1637 	return (1);
1638 }
1639 
1640 /*
1641  * Concatenate and align at least the first len bytes of common message
1642  * type.  Len == -1 means concatenate everything.  The original message is
1643  * unaltered.  Returns a pointer to a new message on success, otherwise
1644  * returns NULL.
1645  */
1646 mblk_t *
1647 msgpullup(mblk_t *mp, ssize_t len)
1648 {
1649 	mblk_t	*newmp;
1650 	ssize_t	totlen;
1651 	ssize_t	n;
1652 
1653 	totlen = xmsgsize(mp);
1654 
1655 	if ((len > 0) && (len > totlen))
1656 		return (NULL);
1657 
1658 	/*
1659 	 * Copy all of the first msg type into one new mblk, then dupmsg
1660 	 * and link the rest onto this.
1661 	 */
1662 
1663 	len = totlen;
1664 
1665 	if ((newmp = allocb_tmpl(len, mp)) == NULL)
1666 		return (NULL);
1667 
1668 	newmp->b_flag = mp->b_flag;
1669 	newmp->b_band = mp->b_band;
1670 
1671 	while (len > 0) {
1672 		n = mp->b_wptr - mp->b_rptr;
1673 		ASSERT(n >= 0);		/* allow zero-length mblk_t's */
1674 		if (n > 0)
1675 			bcopy(mp->b_rptr, newmp->b_wptr, n);
1676 		newmp->b_wptr += n;
1677 		len -= n;
1678 		mp = mp->b_cont;
1679 	}
1680 
1681 	if (mp != NULL) {
1682 		newmp->b_cont = dupmsg(mp);
1683 		if (newmp->b_cont == NULL) {
1684 			freemsg(newmp);
1685 			return (NULL);
1686 		}
1687 	}
1688 
1689 	return (newmp);
1690 }
1691 
1692 /*
1693  * Trim bytes from message
1694  *  len > 0, trim from head
1695  *  len < 0, trim from tail
1696  * Returns 1 on success, 0 on failure.
1697  */
1698 int
1699 adjmsg(mblk_t *mp, ssize_t len)
1700 {
1701 	mblk_t *bp;
1702 	mblk_t *save_bp = NULL;
1703 	mblk_t *prev_bp;
1704 	mblk_t *bcont;
1705 	unsigned char type;
1706 	ssize_t n;
1707 	int fromhead;
1708 	int first;
1709 
1710 	ASSERT(mp != NULL);
1711 
1712 	if (len < 0) {
1713 		fromhead = 0;
1714 		len = -len;
1715 	} else {
1716 		fromhead = 1;
1717 	}
1718 
1719 	if (xmsgsize(mp) < len)
1720 		return (0);
1721 
1722 	if (fromhead) {
1723 		first = 1;
1724 		while (len) {
1725 			ASSERT(mp->b_wptr >= mp->b_rptr);
1726 			n = MIN(mp->b_wptr - mp->b_rptr, len);
1727 			mp->b_rptr += n;
1728 			len -= n;
1729 
1730 			/*
1731 			 * If this is not the first zero length
1732 			 * message remove it
1733 			 */
1734 			if (!first && (mp->b_wptr == mp->b_rptr)) {
1735 				bcont = mp->b_cont;
1736 				freeb(mp);
1737 				mp = save_bp->b_cont = bcont;
1738 			} else {
1739 				save_bp = mp;
1740 				mp = mp->b_cont;
1741 			}
1742 			first = 0;
1743 		}
1744 	} else {
1745 		type = mp->b_datap->db_type;
1746 		while (len) {
1747 			bp = mp;
1748 			save_bp = NULL;
1749 
1750 			/*
1751 			 * Find the last message of same type
1752 			 */
1753 			while (bp && bp->b_datap->db_type == type) {
1754 				ASSERT(bp->b_wptr >= bp->b_rptr);
1755 				prev_bp = save_bp;
1756 				save_bp = bp;
1757 				bp = bp->b_cont;
1758 			}
1759 			if (save_bp == NULL)
1760 				break;
1761 			n = MIN(save_bp->b_wptr - save_bp->b_rptr, len);
1762 			save_bp->b_wptr -= n;
1763 			len -= n;
1764 
1765 			/*
1766 			 * If this is not the first message
1767 			 * and we have taken away everything
1768 			 * from this message, remove it
1769 			 */
1770 
1771 			if ((save_bp != mp) &&
1772 			    (save_bp->b_wptr == save_bp->b_rptr)) {
1773 				bcont = save_bp->b_cont;
1774 				freeb(save_bp);
1775 				prev_bp->b_cont = bcont;
1776 			}
1777 		}
1778 	}
1779 	return (1);
1780 }
1781 
1782 /*
1783  * get number of data bytes in message
1784  */
1785 size_t
1786 msgdsize(mblk_t *bp)
1787 {
1788 	size_t count = 0;
1789 
1790 	for (; bp; bp = bp->b_cont)
1791 		if (bp->b_datap->db_type == M_DATA) {
1792 			ASSERT(bp->b_wptr >= bp->b_rptr);
1793 			count += bp->b_wptr - bp->b_rptr;
1794 		}
1795 	return (count);
1796 }
1797 
1798 /*
1799  * Get a message off head of queue
1800  *
1801  * If queue has no buffers then mark queue
1802  * with QWANTR. (queue wants to be read by
1803  * someone when data becomes available)
1804  *
1805  * If there is something to take off then do so.
1806  * If queue falls below hi water mark turn off QFULL
1807  * flag.  Decrement weighted count of queue.
1808  * Also turn off QWANTR because queue is being read.
1809  *
1810  * The queue count is maintained on a per-band basis.
1811  * Priority band 0 (normal messages) uses q_count,
1812  * q_lowat, etc.  Non-zero priority bands use the
1813  * fields in their respective qband structures
1814  * (qb_count, qb_lowat, etc.)  All messages appear
1815  * on the same list, linked via their b_next pointers.
1816  * q_first is the head of the list.  q_count does
1817  * not reflect the size of all the messages on the
1818  * queue.  It only reflects those messages in the
1819  * normal band of flow.  The one exception to this
1820  * deals with high priority messages.  They are in
1821  * their own conceptual "band", but are accounted
1822  * against q_count.
1823  *
1824  * If queue count is below the lo water mark and QWANTW
1825  * is set, enable the closest backq which has a service
1826  * procedure and turn off the QWANTW flag.
1827  *
1828  * getq could be built on top of rmvq, but isn't because
1829  * of performance considerations.
1830  *
1831  * A note on the use of q_count and q_mblkcnt:
1832  *   q_count is the traditional byte count for messages that
1833  *   have been put on a queue.  Documentation tells us that
1834  *   we shouldn't rely on that count, but some drivers/modules
1835  *   do.  What was needed, however, is a mechanism to prevent
1836  *   runaway streams from consuming all of the resources,
1837  *   and particularly be able to flow control zero-length
1838  *   messages.  q_mblkcnt is used for this purpose.  It
1839  *   counts the number of mblk's that are being put on
1840  *   the queue.  The intention here, is that each mblk should
1841  *   contain one byte of data and, for the purpose of
1842  *   flow-control, logically does.  A queue will become
1843  *   full when EITHER of these values (q_count and q_mblkcnt)
1844  *   reach the highwater mark.  It will clear when BOTH
1845  *   of them drop below the highwater mark.  And it will
1846  *   backenable when BOTH of them drop below the lowwater
1847  *   mark.
1848  *   With this algorithm, a driver/module might be able
1849  *   to find a reasonably accurate q_count, and the
1850  *   framework can still try and limit resource usage.
1851  */
1852 mblk_t *
1853 getq(queue_t *q)
1854 {
1855 	mblk_t *bp;
1856 	uchar_t band = 0;
1857 
1858 	bp = getq_noenab(q, 0);
1859 	if (bp != NULL)
1860 		band = bp->b_band;
1861 
1862 	/*
1863 	 * Inlined from qbackenable().
1864 	 * Quick check without holding the lock.
1865 	 */
1866 	if (band == 0 && (q->q_flag & (QWANTW|QWANTWSYNC)) == 0)
1867 		return (bp);
1868 
1869 	qbackenable(q, band);
1870 	return (bp);
1871 }
1872 
1873 /*
1874  * Returns the number of bytes in a message (a message is defined as a
1875  * chain of mblks linked by b_cont). If a non-NULL mblkcnt is supplied we
1876  * also return the number of distinct mblks in the message.
1877  */
1878 int
1879 mp_cont_len(mblk_t *bp, int *mblkcnt)
1880 {
1881 	mblk_t	*mp;
1882 	int	mblks = 0;
1883 	int	bytes = 0;
1884 
1885 	for (mp = bp; mp != NULL; mp = mp->b_cont) {
1886 		bytes += MBLKL(mp);
1887 		mblks++;
1888 	}
1889 
1890 	if (mblkcnt != NULL)
1891 		*mblkcnt = mblks;
1892 
1893 	return (bytes);
1894 }
1895 
1896 /*
1897  * Like getq() but does not backenable.  This is used by the stream
1898  * head when a putback() is likely.  The caller must call qbackenable()
1899  * after it is done with accessing the queue.
1900  * The rbytes arguments to getq_noneab() allows callers to specify a
1901  * the maximum number of bytes to return. If the current amount on the
1902  * queue is less than this then the entire message will be returned.
1903  * A value of 0 returns the entire message and is equivalent to the old
1904  * default behaviour prior to the addition of the rbytes argument.
1905  */
1906 mblk_t *
1907 getq_noenab(queue_t *q, ssize_t rbytes)
1908 {
1909 	mblk_t *bp, *mp1;
1910 	mblk_t *mp2 = NULL;
1911 	qband_t *qbp;
1912 	kthread_id_t freezer;
1913 	int	bytecnt = 0, mblkcnt = 0;
1914 
1915 	/* freezestr should allow its caller to call getq/putq */
1916 	freezer = STREAM(q)->sd_freezer;
1917 	if (freezer == curthread) {
1918 		ASSERT(frozenstr(q));
1919 		ASSERT(MUTEX_HELD(QLOCK(q)));
1920 	} else
1921 		mutex_enter(QLOCK(q));
1922 
1923 	if ((bp = q->q_first) == 0) {
1924 		q->q_flag |= QWANTR;
1925 	} else {
1926 		/*
1927 		 * If the caller supplied a byte threshold and there is
1928 		 * more than this amount on the queue then break up the
1929 		 * the message appropriately.  We can only safely do
1930 		 * this for M_DATA messages.
1931 		 */
1932 		if ((DB_TYPE(bp) == M_DATA) && (rbytes > 0) &&
1933 		    (q->q_count > rbytes)) {
1934 			/*
1935 			 * Inline version of mp_cont_len() which terminates
1936 			 * when we meet or exceed rbytes.
1937 			 */
1938 			for (mp1 = bp; mp1 != NULL; mp1 = mp1->b_cont) {
1939 				mblkcnt++;
1940 				bytecnt += MBLKL(mp1);
1941 				if (bytecnt  >= rbytes)
1942 					break;
1943 			}
1944 			/*
1945 			 * We need to account for the following scenarios:
1946 			 *
1947 			 * 1) Too much data in the first message:
1948 			 *	mp1 will be the mblk which puts us over our
1949 			 *	byte limit.
1950 			 * 2) Not enough data in the first message:
1951 			 *	mp1 will be NULL.
1952 			 * 3) Exactly the right amount of data contained within
1953 			 *    whole mblks:
1954 			 *	mp1->b_cont will be where we break the message.
1955 			 */
1956 			if (bytecnt > rbytes) {
1957 				/*
1958 				 * Dup/copy mp1 and put what we don't need
1959 				 * back onto the queue. Adjust the read/write
1960 				 * and continuation pointers appropriately
1961 				 * and decrement the current mblk count to
1962 				 * reflect we are putting an mblk back onto
1963 				 * the queue.
1964 				 * When adjusting the message pointers, it's
1965 				 * OK to use the existing bytecnt and the
1966 				 * requested amount (rbytes) to calculate the
1967 				 * the new write offset (b_wptr) of what we
1968 				 * are taking. However, we  cannot use these
1969 				 * values when calculating the read offset of
1970 				 * the mblk we are putting back on the queue.
1971 				 * This is because the begining (b_rptr) of the
1972 				 * mblk represents some arbitrary point within
1973 				 * the message.
1974 				 * It's simplest to do this by advancing b_rptr
1975 				 * by the new length of mp1 as we don't have to
1976 				 * remember any intermediate state.
1977 				 */
1978 				ASSERT(mp1 != NULL);
1979 				mblkcnt--;
1980 				if ((mp2 = dupb(mp1)) == NULL &&
1981 				    (mp2 = copyb(mp1)) == NULL) {
1982 					bytecnt = mblkcnt = 0;
1983 					goto dup_failed;
1984 				}
1985 				mp2->b_cont = mp1->b_cont;
1986 				mp1->b_wptr -= bytecnt - rbytes;
1987 				mp2->b_rptr += mp1->b_wptr - mp1->b_rptr;
1988 				mp1->b_cont = NULL;
1989 				bytecnt = rbytes;
1990 			} else {
1991 				/*
1992 				 * Either there is not enough data in the first
1993 				 * message or there is no excess data to deal
1994 				 * with. If mp1 is NULL, we are taking the
1995 				 * whole message. No need to do anything.
1996 				 * Otherwise we assign mp1->b_cont to mp2 as
1997 				 * we will be putting this back onto the head of
1998 				 * the queue.
1999 				 */
2000 				if (mp1 != NULL) {
2001 					mp2 = mp1->b_cont;
2002 					mp1->b_cont = NULL;
2003 				}
2004 			}
2005 			/*
2006 			 * If mp2 is not NULL then we have part of the message
2007 			 * to put back onto the queue.
2008 			 */
2009 			if (mp2 != NULL) {
2010 				if ((mp2->b_next = bp->b_next) == NULL)
2011 					q->q_last = mp2;
2012 				else
2013 					bp->b_next->b_prev = mp2;
2014 				q->q_first = mp2;
2015 			} else {
2016 				if ((q->q_first = bp->b_next) == NULL)
2017 					q->q_last = NULL;
2018 				else
2019 					q->q_first->b_prev = NULL;
2020 			}
2021 		} else {
2022 			/*
2023 			 * Either no byte threshold was supplied, there is
2024 			 * not enough on the queue or we failed to
2025 			 * duplicate/copy a data block. In these cases we
2026 			 * just take the entire first message.
2027 			 */
2028 dup_failed:
2029 			bytecnt = mp_cont_len(bp, &mblkcnt);
2030 			if ((q->q_first = bp->b_next) == NULL)
2031 				q->q_last = NULL;
2032 			else
2033 				q->q_first->b_prev = NULL;
2034 		}
2035 		if (bp->b_band == 0) {
2036 			q->q_count -= bytecnt;
2037 			q->q_mblkcnt -= mblkcnt;
2038 			if (q->q_mblkcnt == 0 || ((q->q_count < q->q_hiwat) &&
2039 			    (q->q_mblkcnt < q->q_hiwat))) {
2040 				q->q_flag &= ~QFULL;
2041 			}
2042 		} else {
2043 			int i;
2044 
2045 			ASSERT(bp->b_band <= q->q_nband);
2046 			ASSERT(q->q_bandp != NULL);
2047 			ASSERT(MUTEX_HELD(QLOCK(q)));
2048 			qbp = q->q_bandp;
2049 			i = bp->b_band;
2050 			while (--i > 0)
2051 				qbp = qbp->qb_next;
2052 			if (qbp->qb_first == qbp->qb_last) {
2053 				qbp->qb_first = NULL;
2054 				qbp->qb_last = NULL;
2055 			} else {
2056 				qbp->qb_first = bp->b_next;
2057 			}
2058 			qbp->qb_count -= bytecnt;
2059 			qbp->qb_mblkcnt -= mblkcnt;
2060 			if (qbp->qb_mblkcnt == 0 ||
2061 			    ((qbp->qb_count < qbp->qb_hiwat) &&
2062 			    (qbp->qb_mblkcnt < qbp->qb_hiwat))) {
2063 				qbp->qb_flag &= ~QB_FULL;
2064 			}
2065 		}
2066 		q->q_flag &= ~QWANTR;
2067 		bp->b_next = NULL;
2068 		bp->b_prev = NULL;
2069 	}
2070 	if (freezer != curthread)
2071 		mutex_exit(QLOCK(q));
2072 
2073 	STR_FTEVENT_MSG(bp, q, FTEV_GETQ, 0);
2074 
2075 	return (bp);
2076 }
2077 
2078 /*
2079  * Determine if a backenable is needed after removing a message in the
2080  * specified band.
2081  * NOTE: This routine assumes that something like getq_noenab() has been
2082  * already called.
2083  *
2084  * For the read side it is ok to hold sd_lock across calling this (and the
2085  * stream head often does).
2086  * But for the write side strwakeq might be invoked and it acquires sd_lock.
2087  */
2088 void
2089 qbackenable(queue_t *q, uchar_t band)
2090 {
2091 	int backenab = 0;
2092 	qband_t *qbp;
2093 	kthread_id_t freezer;
2094 
2095 	ASSERT(q);
2096 	ASSERT((q->q_flag & QREADR) || MUTEX_NOT_HELD(&STREAM(q)->sd_lock));
2097 
2098 	/*
2099 	 * Quick check without holding the lock.
2100 	 * OK since after getq() has lowered the q_count these flags
2101 	 * would not change unless either the qbackenable() is done by
2102 	 * another thread (which is ok) or the queue has gotten QFULL
2103 	 * in which case another backenable will take place when the queue
2104 	 * drops below q_lowat.
2105 	 */
2106 	if (band == 0 && (q->q_flag & (QWANTW|QWANTWSYNC)) == 0)
2107 		return;
2108 
2109 	/* freezestr should allow its caller to call getq/putq */
2110 	freezer = STREAM(q)->sd_freezer;
2111 	if (freezer == curthread) {
2112 		ASSERT(frozenstr(q));
2113 		ASSERT(MUTEX_HELD(QLOCK(q)));
2114 	} else
2115 		mutex_enter(QLOCK(q));
2116 
2117 	if (band == 0) {
2118 		if (q->q_lowat == 0 || (q->q_count < q->q_lowat &&
2119 		    q->q_mblkcnt < q->q_lowat)) {
2120 			backenab = q->q_flag & (QWANTW|QWANTWSYNC);
2121 		}
2122 	} else {
2123 		int i;
2124 
2125 		ASSERT((unsigned)band <= q->q_nband);
2126 		ASSERT(q->q_bandp != NULL);
2127 
2128 		qbp = q->q_bandp;
2129 		i = band;
2130 		while (--i > 0)
2131 			qbp = qbp->qb_next;
2132 
2133 		if (qbp->qb_lowat == 0 || (qbp->qb_count < qbp->qb_lowat &&
2134 		    qbp->qb_mblkcnt < qbp->qb_lowat)) {
2135 			backenab = qbp->qb_flag & QB_WANTW;
2136 		}
2137 	}
2138 
2139 	if (backenab == 0) {
2140 		if (freezer != curthread)
2141 			mutex_exit(QLOCK(q));
2142 		return;
2143 	}
2144 
2145 	/* Have to drop the lock across strwakeq and backenable */
2146 	if (backenab & QWANTWSYNC)
2147 		q->q_flag &= ~QWANTWSYNC;
2148 	if (backenab & (QWANTW|QB_WANTW)) {
2149 		if (band != 0)
2150 			qbp->qb_flag &= ~QB_WANTW;
2151 		else {
2152 			q->q_flag &= ~QWANTW;
2153 		}
2154 	}
2155 
2156 	if (freezer != curthread)
2157 		mutex_exit(QLOCK(q));
2158 
2159 	if (backenab & QWANTWSYNC)
2160 		strwakeq(q, QWANTWSYNC);
2161 	if (backenab & (QWANTW|QB_WANTW))
2162 		backenable(q, band);
2163 }
2164 
2165 /*
2166  * Remove a message from a queue.  The queue count and other
2167  * flow control parameters are adjusted and the back queue
2168  * enabled if necessary.
2169  *
2170  * rmvq can be called with the stream frozen, but other utility functions
2171  * holding QLOCK, and by streams modules without any locks/frozen.
2172  */
2173 void
2174 rmvq(queue_t *q, mblk_t *mp)
2175 {
2176 	ASSERT(mp != NULL);
2177 
2178 	rmvq_noenab(q, mp);
2179 	if (curthread != STREAM(q)->sd_freezer && MUTEX_HELD(QLOCK(q))) {
2180 		/*
2181 		 * qbackenable can handle a frozen stream but not a "random"
2182 		 * qlock being held. Drop lock across qbackenable.
2183 		 */
2184 		mutex_exit(QLOCK(q));
2185 		qbackenable(q, mp->b_band);
2186 		mutex_enter(QLOCK(q));
2187 	} else {
2188 		qbackenable(q, mp->b_band);
2189 	}
2190 }
2191 
2192 /*
2193  * Like rmvq() but without any backenabling.
2194  * This exists to handle SR_CONSOL_DATA in strrput().
2195  */
2196 void
2197 rmvq_noenab(queue_t *q, mblk_t *mp)
2198 {
2199 	int i;
2200 	qband_t *qbp = NULL;
2201 	kthread_id_t freezer;
2202 	int	bytecnt = 0, mblkcnt = 0;
2203 
2204 	freezer = STREAM(q)->sd_freezer;
2205 	if (freezer == curthread) {
2206 		ASSERT(frozenstr(q));
2207 		ASSERT(MUTEX_HELD(QLOCK(q)));
2208 	} else if (MUTEX_HELD(QLOCK(q))) {
2209 		/* Don't drop lock on exit */
2210 		freezer = curthread;
2211 	} else
2212 		mutex_enter(QLOCK(q));
2213 
2214 	ASSERT(mp->b_band <= q->q_nband);
2215 	if (mp->b_band != 0) {		/* Adjust band pointers */
2216 		ASSERT(q->q_bandp != NULL);
2217 		qbp = q->q_bandp;
2218 		i = mp->b_band;
2219 		while (--i > 0)
2220 			qbp = qbp->qb_next;
2221 		if (mp == qbp->qb_first) {
2222 			if (mp->b_next && mp->b_band == mp->b_next->b_band)
2223 				qbp->qb_first = mp->b_next;
2224 			else
2225 				qbp->qb_first = NULL;
2226 		}
2227 		if (mp == qbp->qb_last) {
2228 			if (mp->b_prev && mp->b_band == mp->b_prev->b_band)
2229 				qbp->qb_last = mp->b_prev;
2230 			else
2231 				qbp->qb_last = NULL;
2232 		}
2233 	}
2234 
2235 	/*
2236 	 * Remove the message from the list.
2237 	 */
2238 	if (mp->b_prev)
2239 		mp->b_prev->b_next = mp->b_next;
2240 	else
2241 		q->q_first = mp->b_next;
2242 	if (mp->b_next)
2243 		mp->b_next->b_prev = mp->b_prev;
2244 	else
2245 		q->q_last = mp->b_prev;
2246 	mp->b_next = NULL;
2247 	mp->b_prev = NULL;
2248 
2249 	/* Get the size of the message for q_count accounting */
2250 	bytecnt = mp_cont_len(mp, &mblkcnt);
2251 
2252 	if (mp->b_band == 0) {		/* Perform q_count accounting */
2253 		q->q_count -= bytecnt;
2254 		q->q_mblkcnt -= mblkcnt;
2255 		if (q->q_mblkcnt == 0 || ((q->q_count < q->q_hiwat) &&
2256 		    (q->q_mblkcnt < q->q_hiwat))) {
2257 			q->q_flag &= ~QFULL;
2258 		}
2259 	} else {			/* Perform qb_count accounting */
2260 		qbp->qb_count -= bytecnt;
2261 		qbp->qb_mblkcnt -= mblkcnt;
2262 		if (qbp->qb_mblkcnt == 0 || ((qbp->qb_count < qbp->qb_hiwat) &&
2263 		    (qbp->qb_mblkcnt < qbp->qb_hiwat))) {
2264 			qbp->qb_flag &= ~QB_FULL;
2265 		}
2266 	}
2267 	if (freezer != curthread)
2268 		mutex_exit(QLOCK(q));
2269 
2270 	STR_FTEVENT_MSG(mp, q, FTEV_RMVQ, 0);
2271 }
2272 
2273 /*
2274  * Empty a queue.
2275  * If flag is set, remove all messages.  Otherwise, remove
2276  * only non-control messages.  If queue falls below its low
2277  * water mark, and QWANTW is set, enable the nearest upstream
2278  * service procedure.
2279  *
2280  * Historical note: when merging the M_FLUSH code in strrput with this
2281  * code one difference was discovered. flushq did not have a check
2282  * for q_lowat == 0 in the backenabling test.
2283  *
2284  * pcproto_flag specifies whether or not a M_PCPROTO message should be flushed
2285  * if one exists on the queue.
2286  */
2287 void
2288 flushq_common(queue_t *q, int flag, int pcproto_flag)
2289 {
2290 	mblk_t *mp, *nmp;
2291 	qband_t *qbp;
2292 	int backenab = 0;
2293 	unsigned char bpri;
2294 	unsigned char	qbf[NBAND];	/* band flushing backenable flags */
2295 
2296 	if (q->q_first == NULL)
2297 		return;
2298 
2299 	mutex_enter(QLOCK(q));
2300 	mp = q->q_first;
2301 	q->q_first = NULL;
2302 	q->q_last = NULL;
2303 	q->q_count = 0;
2304 	q->q_mblkcnt = 0;
2305 	for (qbp = q->q_bandp; qbp; qbp = qbp->qb_next) {
2306 		qbp->qb_first = NULL;
2307 		qbp->qb_last = NULL;
2308 		qbp->qb_count = 0;
2309 		qbp->qb_mblkcnt = 0;
2310 		qbp->qb_flag &= ~QB_FULL;
2311 	}
2312 	q->q_flag &= ~QFULL;
2313 	mutex_exit(QLOCK(q));
2314 	while (mp) {
2315 		nmp = mp->b_next;
2316 		mp->b_next = mp->b_prev = NULL;
2317 
2318 		STR_FTEVENT_MBLK(mp, q, FTEV_FLUSHQ, 0);
2319 
2320 		if (pcproto_flag && (mp->b_datap->db_type == M_PCPROTO))
2321 			(void) putq(q, mp);
2322 		else if (flag || datamsg(mp->b_datap->db_type))
2323 			freemsg(mp);
2324 		else
2325 			(void) putq(q, mp);
2326 		mp = nmp;
2327 	}
2328 	bpri = 1;
2329 	mutex_enter(QLOCK(q));
2330 	for (qbp = q->q_bandp; qbp; qbp = qbp->qb_next) {
2331 		if ((qbp->qb_flag & QB_WANTW) &&
2332 		    (((qbp->qb_count < qbp->qb_lowat) &&
2333 		    (qbp->qb_mblkcnt < qbp->qb_lowat)) ||
2334 		    qbp->qb_lowat == 0)) {
2335 			qbp->qb_flag &= ~QB_WANTW;
2336 			backenab = 1;
2337 			qbf[bpri] = 1;
2338 		} else
2339 			qbf[bpri] = 0;
2340 		bpri++;
2341 	}
2342 	ASSERT(bpri == (unsigned char)(q->q_nband + 1));
2343 	if ((q->q_flag & QWANTW) &&
2344 	    (((q->q_count < q->q_lowat) &&
2345 	    (q->q_mblkcnt < q->q_lowat)) || q->q_lowat == 0)) {
2346 		q->q_flag &= ~QWANTW;
2347 		backenab = 1;
2348 		qbf[0] = 1;
2349 	} else
2350 		qbf[0] = 0;
2351 
2352 	/*
2353 	 * If any band can now be written to, and there is a writer
2354 	 * for that band, then backenable the closest service procedure.
2355 	 */
2356 	if (backenab) {
2357 		mutex_exit(QLOCK(q));
2358 		for (bpri = q->q_nband; bpri != 0; bpri--)
2359 			if (qbf[bpri])
2360 				backenable(q, bpri);
2361 		if (qbf[0])
2362 			backenable(q, 0);
2363 	} else
2364 		mutex_exit(QLOCK(q));
2365 }
2366 
2367 /*
2368  * The real flushing takes place in flushq_common. This is done so that
2369  * a flag which specifies whether or not M_PCPROTO messages should be flushed
2370  * or not. Currently the only place that uses this flag is the stream head.
2371  */
2372 void
2373 flushq(queue_t *q, int flag)
2374 {
2375 	flushq_common(q, flag, 0);
2376 }
2377 
2378 /*
2379  * Flush the queue of messages of the given priority band.
2380  * There is some duplication of code between flushq and flushband.
2381  * This is because we want to optimize the code as much as possible.
2382  * The assumption is that there will be more messages in the normal
2383  * (priority 0) band than in any other.
2384  *
2385  * Historical note: when merging the M_FLUSH code in strrput with this
2386  * code one difference was discovered. flushband had an extra check for
2387  * did not have a check for (mp->b_datap->db_type < QPCTL) in the band 0
2388  * case. That check does not match the man page for flushband and was not
2389  * in the strrput flush code hence it was removed.
2390  */
2391 void
2392 flushband(queue_t *q, unsigned char pri, int flag)
2393 {
2394 	mblk_t *mp;
2395 	mblk_t *nmp;
2396 	mblk_t *last;
2397 	qband_t *qbp;
2398 	int band;
2399 
2400 	ASSERT((flag == FLUSHDATA) || (flag == FLUSHALL));
2401 	if (pri > q->q_nband) {
2402 		return;
2403 	}
2404 	mutex_enter(QLOCK(q));
2405 	if (pri == 0) {
2406 		mp = q->q_first;
2407 		q->q_first = NULL;
2408 		q->q_last = NULL;
2409 		q->q_count = 0;
2410 		q->q_mblkcnt = 0;
2411 		for (qbp = q->q_bandp; qbp; qbp = qbp->qb_next) {
2412 			qbp->qb_first = NULL;
2413 			qbp->qb_last = NULL;
2414 			qbp->qb_count = 0;
2415 			qbp->qb_mblkcnt = 0;
2416 			qbp->qb_flag &= ~QB_FULL;
2417 		}
2418 		q->q_flag &= ~QFULL;
2419 		mutex_exit(QLOCK(q));
2420 		while (mp) {
2421 			nmp = mp->b_next;
2422 			mp->b_next = mp->b_prev = NULL;
2423 			if ((mp->b_band == 0) &&
2424 			    ((flag == FLUSHALL) ||
2425 			    datamsg(mp->b_datap->db_type)))
2426 				freemsg(mp);
2427 			else
2428 				(void) putq(q, mp);
2429 			mp = nmp;
2430 		}
2431 		mutex_enter(QLOCK(q));
2432 		if ((q->q_flag & QWANTW) &&
2433 		    (((q->q_count < q->q_lowat) &&
2434 		    (q->q_mblkcnt < q->q_lowat)) || q->q_lowat == 0)) {
2435 			q->q_flag &= ~QWANTW;
2436 			mutex_exit(QLOCK(q));
2437 
2438 			backenable(q, pri);
2439 		} else
2440 			mutex_exit(QLOCK(q));
2441 	} else {	/* pri != 0 */
2442 		boolean_t flushed = B_FALSE;
2443 		band = pri;
2444 
2445 		ASSERT(MUTEX_HELD(QLOCK(q)));
2446 		qbp = q->q_bandp;
2447 		while (--band > 0)
2448 			qbp = qbp->qb_next;
2449 		mp = qbp->qb_first;
2450 		if (mp == NULL) {
2451 			mutex_exit(QLOCK(q));
2452 			return;
2453 		}
2454 		last = qbp->qb_last->b_next;
2455 		/*
2456 		 * rmvq_noenab() and freemsg() are called for each mblk that
2457 		 * meets the criteria.  The loop is executed until the last
2458 		 * mblk has been processed.
2459 		 */
2460 		while (mp != last) {
2461 			ASSERT(mp->b_band == pri);
2462 			nmp = mp->b_next;
2463 			if (flag == FLUSHALL || datamsg(mp->b_datap->db_type)) {
2464 				rmvq_noenab(q, mp);
2465 				freemsg(mp);
2466 				flushed = B_TRUE;
2467 			}
2468 			mp = nmp;
2469 		}
2470 		mutex_exit(QLOCK(q));
2471 
2472 		/*
2473 		 * If any mblk(s) has been freed, we know that qbackenable()
2474 		 * will need to be called.
2475 		 */
2476 		if (flushed)
2477 			qbackenable(q, pri);
2478 	}
2479 }
2480 
2481 /*
2482  * Return 1 if the queue is not full.  If the queue is full, return
2483  * 0 (may not put message) and set QWANTW flag (caller wants to write
2484  * to the queue).
2485  */
2486 int
2487 canput(queue_t *q)
2488 {
2489 	TRACE_1(TR_FAC_STREAMS_FR, TR_CANPUT_IN, "canput:%p", q);
2490 
2491 	/* this is for loopback transports, they should not do a canput */
2492 	ASSERT(STRMATED(q->q_stream) || STREAM(q) == STREAM(q->q_nfsrv));
2493 
2494 	/* Find next forward module that has a service procedure */
2495 	q = q->q_nfsrv;
2496 
2497 	if (!(q->q_flag & QFULL)) {
2498 		TRACE_2(TR_FAC_STREAMS_FR, TR_CANPUT_OUT, "canput:%p %d", q, 1);
2499 		return (1);
2500 	}
2501 	mutex_enter(QLOCK(q));
2502 	if (q->q_flag & QFULL) {
2503 		q->q_flag |= QWANTW;
2504 		mutex_exit(QLOCK(q));
2505 		TRACE_2(TR_FAC_STREAMS_FR, TR_CANPUT_OUT, "canput:%p %d", q, 0);
2506 		return (0);
2507 	}
2508 	mutex_exit(QLOCK(q));
2509 	TRACE_2(TR_FAC_STREAMS_FR, TR_CANPUT_OUT, "canput:%p %d", q, 1);
2510 	return (1);
2511 }
2512 
2513 /*
2514  * This is the new canput for use with priority bands.  Return 1 if the
2515  * band is not full.  If the band is full, return 0 (may not put message)
2516  * and set QWANTW(QB_WANTW) flag for zero(non-zero) band (caller wants to
2517  * write to the queue).
2518  */
2519 int
2520 bcanput(queue_t *q, unsigned char pri)
2521 {
2522 	qband_t *qbp;
2523 
2524 	TRACE_2(TR_FAC_STREAMS_FR, TR_BCANPUT_IN, "bcanput:%p %p", q, pri);
2525 	if (!q)
2526 		return (0);
2527 
2528 	/* Find next forward module that has a service procedure */
2529 	q = q->q_nfsrv;
2530 
2531 	mutex_enter(QLOCK(q));
2532 	if (pri == 0) {
2533 		if (q->q_flag & QFULL) {
2534 			q->q_flag |= QWANTW;
2535 			mutex_exit(QLOCK(q));
2536 			TRACE_3(TR_FAC_STREAMS_FR, TR_BCANPUT_OUT,
2537 			    "bcanput:%p %X %d", q, pri, 0);
2538 			return (0);
2539 		}
2540 	} else {	/* pri != 0 */
2541 		if (pri > q->q_nband) {
2542 			/*
2543 			 * No band exists yet, so return success.
2544 			 */
2545 			mutex_exit(QLOCK(q));
2546 			TRACE_3(TR_FAC_STREAMS_FR, TR_BCANPUT_OUT,
2547 			    "bcanput:%p %X %d", q, pri, 1);
2548 			return (1);
2549 		}
2550 		qbp = q->q_bandp;
2551 		while (--pri)
2552 			qbp = qbp->qb_next;
2553 		if (qbp->qb_flag & QB_FULL) {
2554 			qbp->qb_flag |= QB_WANTW;
2555 			mutex_exit(QLOCK(q));
2556 			TRACE_3(TR_FAC_STREAMS_FR, TR_BCANPUT_OUT,
2557 			    "bcanput:%p %X %d", q, pri, 0);
2558 			return (0);
2559 		}
2560 	}
2561 	mutex_exit(QLOCK(q));
2562 	TRACE_3(TR_FAC_STREAMS_FR, TR_BCANPUT_OUT,
2563 	    "bcanput:%p %X %d", q, pri, 1);
2564 	return (1);
2565 }
2566 
2567 /*
2568  * Put a message on a queue.
2569  *
2570  * Messages are enqueued on a priority basis.  The priority classes
2571  * are HIGH PRIORITY (type >= QPCTL), PRIORITY (type < QPCTL && band > 0),
2572  * and B_NORMAL (type < QPCTL && band == 0).
2573  *
2574  * Add appropriate weighted data block sizes to queue count.
2575  * If queue hits high water mark then set QFULL flag.
2576  *
2577  * If QNOENAB is not set (putq is allowed to enable the queue),
2578  * enable the queue only if the message is PRIORITY,
2579  * or the QWANTR flag is set (indicating that the service procedure
2580  * is ready to read the queue.  This implies that a service
2581  * procedure must NEVER put a high priority message back on its own
2582  * queue, as this would result in an infinite loop (!).
2583  */
2584 int
2585 putq(queue_t *q, mblk_t *bp)
2586 {
2587 	mblk_t *tmp;
2588 	qband_t *qbp = NULL;
2589 	int mcls = (int)queclass(bp);
2590 	kthread_id_t freezer;
2591 	int	bytecnt = 0, mblkcnt = 0;
2592 
2593 	freezer = STREAM(q)->sd_freezer;
2594 	if (freezer == curthread) {
2595 		ASSERT(frozenstr(q));
2596 		ASSERT(MUTEX_HELD(QLOCK(q)));
2597 	} else
2598 		mutex_enter(QLOCK(q));
2599 
2600 	/*
2601 	 * Make sanity checks and if qband structure is not yet
2602 	 * allocated, do so.
2603 	 */
2604 	if (mcls == QPCTL) {
2605 		if (bp->b_band != 0)
2606 			bp->b_band = 0;		/* force to be correct */
2607 	} else if (bp->b_band != 0) {
2608 		int i;
2609 		qband_t **qbpp;
2610 
2611 		if (bp->b_band > q->q_nband) {
2612 
2613 			/*
2614 			 * The qband structure for this priority band is
2615 			 * not on the queue yet, so we have to allocate
2616 			 * one on the fly.  It would be wasteful to
2617 			 * associate the qband structures with every
2618 			 * queue when the queues are allocated.  This is
2619 			 * because most queues will only need the normal
2620 			 * band of flow which can be described entirely
2621 			 * by the queue itself.
2622 			 */
2623 			qbpp = &q->q_bandp;
2624 			while (*qbpp)
2625 				qbpp = &(*qbpp)->qb_next;
2626 			while (bp->b_band > q->q_nband) {
2627 				if ((*qbpp = allocband()) == NULL) {
2628 					if (freezer != curthread)
2629 						mutex_exit(QLOCK(q));
2630 					return (0);
2631 				}
2632 				(*qbpp)->qb_hiwat = q->q_hiwat;
2633 				(*qbpp)->qb_lowat = q->q_lowat;
2634 				q->q_nband++;
2635 				qbpp = &(*qbpp)->qb_next;
2636 			}
2637 		}
2638 		ASSERT(MUTEX_HELD(QLOCK(q)));
2639 		qbp = q->q_bandp;
2640 		i = bp->b_band;
2641 		while (--i)
2642 			qbp = qbp->qb_next;
2643 	}
2644 
2645 	/*
2646 	 * If queue is empty, add the message and initialize the pointers.
2647 	 * Otherwise, adjust message pointers and queue pointers based on
2648 	 * the type of the message and where it belongs on the queue.  Some
2649 	 * code is duplicated to minimize the number of conditionals and
2650 	 * hopefully minimize the amount of time this routine takes.
2651 	 */
2652 	if (!q->q_first) {
2653 		bp->b_next = NULL;
2654 		bp->b_prev = NULL;
2655 		q->q_first = bp;
2656 		q->q_last = bp;
2657 		if (qbp) {
2658 			qbp->qb_first = bp;
2659 			qbp->qb_last = bp;
2660 		}
2661 	} else if (!qbp) {	/* bp->b_band == 0 */
2662 
2663 		/*
2664 		 * If queue class of message is less than or equal to
2665 		 * that of the last one on the queue, tack on to the end.
2666 		 */
2667 		tmp = q->q_last;
2668 		if (mcls <= (int)queclass(tmp)) {
2669 			bp->b_next = NULL;
2670 			bp->b_prev = tmp;
2671 			tmp->b_next = bp;
2672 			q->q_last = bp;
2673 		} else {
2674 			tmp = q->q_first;
2675 			while ((int)queclass(tmp) >= mcls)
2676 				tmp = tmp->b_next;
2677 
2678 			/*
2679 			 * Insert bp before tmp.
2680 			 */
2681 			bp->b_next = tmp;
2682 			bp->b_prev = tmp->b_prev;
2683 			if (tmp->b_prev)
2684 				tmp->b_prev->b_next = bp;
2685 			else
2686 				q->q_first = bp;
2687 			tmp->b_prev = bp;
2688 		}
2689 	} else {		/* bp->b_band != 0 */
2690 		if (qbp->qb_first) {
2691 			tmp = qbp->qb_last;
2692 
2693 			/*
2694 			 * Insert bp after the last message in this band.
2695 			 */
2696 			bp->b_next = tmp->b_next;
2697 			if (tmp->b_next)
2698 				tmp->b_next->b_prev = bp;
2699 			else
2700 				q->q_last = bp;
2701 			bp->b_prev = tmp;
2702 			tmp->b_next = bp;
2703 		} else {
2704 			tmp = q->q_last;
2705 			if ((mcls < (int)queclass(tmp)) ||
2706 			    (bp->b_band <= tmp->b_band)) {
2707 
2708 				/*
2709 				 * Tack bp on end of queue.
2710 				 */
2711 				bp->b_next = NULL;
2712 				bp->b_prev = tmp;
2713 				tmp->b_next = bp;
2714 				q->q_last = bp;
2715 			} else {
2716 				tmp = q->q_first;
2717 				while (tmp->b_datap->db_type >= QPCTL)
2718 					tmp = tmp->b_next;
2719 				while (tmp->b_band >= bp->b_band)
2720 					tmp = tmp->b_next;
2721 
2722 				/*
2723 				 * Insert bp before tmp.
2724 				 */
2725 				bp->b_next = tmp;
2726 				bp->b_prev = tmp->b_prev;
2727 				if (tmp->b_prev)
2728 					tmp->b_prev->b_next = bp;
2729 				else
2730 					q->q_first = bp;
2731 				tmp->b_prev = bp;
2732 			}
2733 			qbp->qb_first = bp;
2734 		}
2735 		qbp->qb_last = bp;
2736 	}
2737 
2738 	/* Get message byte count for q_count accounting */
2739 	bytecnt = mp_cont_len(bp, &mblkcnt);
2740 
2741 	if (qbp) {
2742 		qbp->qb_count += bytecnt;
2743 		qbp->qb_mblkcnt += mblkcnt;
2744 		if ((qbp->qb_count >= qbp->qb_hiwat) ||
2745 		    (qbp->qb_mblkcnt >= qbp->qb_hiwat)) {
2746 			qbp->qb_flag |= QB_FULL;
2747 		}
2748 	} else {
2749 		q->q_count += bytecnt;
2750 		q->q_mblkcnt += mblkcnt;
2751 		if ((q->q_count >= q->q_hiwat) ||
2752 		    (q->q_mblkcnt >= q->q_hiwat)) {
2753 			q->q_flag |= QFULL;
2754 		}
2755 	}
2756 
2757 	STR_FTEVENT_MSG(bp, q, FTEV_PUTQ, 0);
2758 
2759 	if ((mcls > QNORM) ||
2760 	    (canenable(q) && (q->q_flag & QWANTR || bp->b_band)))
2761 		qenable_locked(q);
2762 	ASSERT(MUTEX_HELD(QLOCK(q)));
2763 	if (freezer != curthread)
2764 		mutex_exit(QLOCK(q));
2765 
2766 	return (1);
2767 }
2768 
2769 /*
2770  * Put stuff back at beginning of Q according to priority order.
2771  * See comment on putq above for details.
2772  */
2773 int
2774 putbq(queue_t *q, mblk_t *bp)
2775 {
2776 	mblk_t *tmp;
2777 	qband_t *qbp = NULL;
2778 	int mcls = (int)queclass(bp);
2779 	kthread_id_t freezer;
2780 	int	bytecnt = 0, mblkcnt = 0;
2781 
2782 	ASSERT(q && bp);
2783 	ASSERT(bp->b_next == NULL);
2784 	freezer = STREAM(q)->sd_freezer;
2785 	if (freezer == curthread) {
2786 		ASSERT(frozenstr(q));
2787 		ASSERT(MUTEX_HELD(QLOCK(q)));
2788 	} else
2789 		mutex_enter(QLOCK(q));
2790 
2791 	/*
2792 	 * Make sanity checks and if qband structure is not yet
2793 	 * allocated, do so.
2794 	 */
2795 	if (mcls == QPCTL) {
2796 		if (bp->b_band != 0)
2797 			bp->b_band = 0;		/* force to be correct */
2798 	} else if (bp->b_band != 0) {
2799 		int i;
2800 		qband_t **qbpp;
2801 
2802 		if (bp->b_band > q->q_nband) {
2803 			qbpp = &q->q_bandp;
2804 			while (*qbpp)
2805 				qbpp = &(*qbpp)->qb_next;
2806 			while (bp->b_band > q->q_nband) {
2807 				if ((*qbpp = allocband()) == NULL) {
2808 					if (freezer != curthread)
2809 						mutex_exit(QLOCK(q));
2810 					return (0);
2811 				}
2812 				(*qbpp)->qb_hiwat = q->q_hiwat;
2813 				(*qbpp)->qb_lowat = q->q_lowat;
2814 				q->q_nband++;
2815 				qbpp = &(*qbpp)->qb_next;
2816 			}
2817 		}
2818 		qbp = q->q_bandp;
2819 		i = bp->b_band;
2820 		while (--i)
2821 			qbp = qbp->qb_next;
2822 	}
2823 
2824 	/*
2825 	 * If queue is empty or if message is high priority,
2826 	 * place on the front of the queue.
2827 	 */
2828 	tmp = q->q_first;
2829 	if ((!tmp) || (mcls == QPCTL)) {
2830 		bp->b_next = tmp;
2831 		if (tmp)
2832 			tmp->b_prev = bp;
2833 		else
2834 			q->q_last = bp;
2835 		q->q_first = bp;
2836 		bp->b_prev = NULL;
2837 		if (qbp) {
2838 			qbp->qb_first = bp;
2839 			qbp->qb_last = bp;
2840 		}
2841 	} else if (qbp) {	/* bp->b_band != 0 */
2842 		tmp = qbp->qb_first;
2843 		if (tmp) {
2844 
2845 			/*
2846 			 * Insert bp before the first message in this band.
2847 			 */
2848 			bp->b_next = tmp;
2849 			bp->b_prev = tmp->b_prev;
2850 			if (tmp->b_prev)
2851 				tmp->b_prev->b_next = bp;
2852 			else
2853 				q->q_first = bp;
2854 			tmp->b_prev = bp;
2855 		} else {
2856 			tmp = q->q_last;
2857 			if ((mcls < (int)queclass(tmp)) ||
2858 			    (bp->b_band < tmp->b_band)) {
2859 
2860 				/*
2861 				 * Tack bp on end of queue.
2862 				 */
2863 				bp->b_next = NULL;
2864 				bp->b_prev = tmp;
2865 				tmp->b_next = bp;
2866 				q->q_last = bp;
2867 			} else {
2868 				tmp = q->q_first;
2869 				while (tmp->b_datap->db_type >= QPCTL)
2870 					tmp = tmp->b_next;
2871 				while (tmp->b_band > bp->b_band)
2872 					tmp = tmp->b_next;
2873 
2874 				/*
2875 				 * Insert bp before tmp.
2876 				 */
2877 				bp->b_next = tmp;
2878 				bp->b_prev = tmp->b_prev;
2879 				if (tmp->b_prev)
2880 					tmp->b_prev->b_next = bp;
2881 				else
2882 					q->q_first = bp;
2883 				tmp->b_prev = bp;
2884 			}
2885 			qbp->qb_last = bp;
2886 		}
2887 		qbp->qb_first = bp;
2888 	} else {		/* bp->b_band == 0 && !QPCTL */
2889 
2890 		/*
2891 		 * If the queue class or band is less than that of the last
2892 		 * message on the queue, tack bp on the end of the queue.
2893 		 */
2894 		tmp = q->q_last;
2895 		if ((mcls < (int)queclass(tmp)) || (bp->b_band < tmp->b_band)) {
2896 			bp->b_next = NULL;
2897 			bp->b_prev = tmp;
2898 			tmp->b_next = bp;
2899 			q->q_last = bp;
2900 		} else {
2901 			tmp = q->q_first;
2902 			while (tmp->b_datap->db_type >= QPCTL)
2903 				tmp = tmp->b_next;
2904 			while (tmp->b_band > bp->b_band)
2905 				tmp = tmp->b_next;
2906 
2907 			/*
2908 			 * Insert bp before tmp.
2909 			 */
2910 			bp->b_next = tmp;
2911 			bp->b_prev = tmp->b_prev;
2912 			if (tmp->b_prev)
2913 				tmp->b_prev->b_next = bp;
2914 			else
2915 				q->q_first = bp;
2916 			tmp->b_prev = bp;
2917 		}
2918 	}
2919 
2920 	/* Get message byte count for q_count accounting */
2921 	bytecnt = mp_cont_len(bp, &mblkcnt);
2922 
2923 	if (qbp) {
2924 		qbp->qb_count += bytecnt;
2925 		qbp->qb_mblkcnt += mblkcnt;
2926 		if ((qbp->qb_count >= qbp->qb_hiwat) ||
2927 		    (qbp->qb_mblkcnt >= qbp->qb_hiwat)) {
2928 			qbp->qb_flag |= QB_FULL;
2929 		}
2930 	} else {
2931 		q->q_count += bytecnt;
2932 		q->q_mblkcnt += mblkcnt;
2933 		if ((q->q_count >= q->q_hiwat) ||
2934 		    (q->q_mblkcnt >= q->q_hiwat)) {
2935 			q->q_flag |= QFULL;
2936 		}
2937 	}
2938 
2939 	STR_FTEVENT_MSG(bp, q, FTEV_PUTBQ, 0);
2940 
2941 	if ((mcls > QNORM) || (canenable(q) && (q->q_flag & QWANTR)))
2942 		qenable_locked(q);
2943 	ASSERT(MUTEX_HELD(QLOCK(q)));
2944 	if (freezer != curthread)
2945 		mutex_exit(QLOCK(q));
2946 
2947 	return (1);
2948 }
2949 
2950 /*
2951  * Insert a message before an existing message on the queue.  If the
2952  * existing message is NULL, the new messages is placed on the end of
2953  * the queue.  The queue class of the new message is ignored.  However,
2954  * the priority band of the new message must adhere to the following
2955  * ordering:
2956  *
2957  *	emp->b_prev->b_band >= mp->b_band >= emp->b_band.
2958  *
2959  * All flow control parameters are updated.
2960  *
2961  * insq can be called with the stream frozen, but other utility functions
2962  * holding QLOCK, and by streams modules without any locks/frozen.
2963  */
2964 int
2965 insq(queue_t *q, mblk_t *emp, mblk_t *mp)
2966 {
2967 	mblk_t *tmp;
2968 	qband_t *qbp = NULL;
2969 	int mcls = (int)queclass(mp);
2970 	kthread_id_t freezer;
2971 	int	bytecnt = 0, mblkcnt = 0;
2972 
2973 	freezer = STREAM(q)->sd_freezer;
2974 	if (freezer == curthread) {
2975 		ASSERT(frozenstr(q));
2976 		ASSERT(MUTEX_HELD(QLOCK(q)));
2977 	} else if (MUTEX_HELD(QLOCK(q))) {
2978 		/* Don't drop lock on exit */
2979 		freezer = curthread;
2980 	} else
2981 		mutex_enter(QLOCK(q));
2982 
2983 	if (mcls == QPCTL) {
2984 		if (mp->b_band != 0)
2985 			mp->b_band = 0;		/* force to be correct */
2986 		if (emp && emp->b_prev &&
2987 		    (emp->b_prev->b_datap->db_type < QPCTL))
2988 			goto badord;
2989 	}
2990 	if (emp) {
2991 		if (((mcls == QNORM) && (mp->b_band < emp->b_band)) ||
2992 		    (emp->b_prev && (emp->b_prev->b_datap->db_type < QPCTL) &&
2993 		    (emp->b_prev->b_band < mp->b_band))) {
2994 			goto badord;
2995 		}
2996 	} else {
2997 		tmp = q->q_last;
2998 		if (tmp && (mcls == QNORM) && (mp->b_band > tmp->b_band)) {
2999 badord:
3000 			cmn_err(CE_WARN,
3001 			    "insq: attempt to insert message out of order "
3002 			    "on q %p", (void *)q);
3003 			if (freezer != curthread)
3004 				mutex_exit(QLOCK(q));
3005 			return (0);
3006 		}
3007 	}
3008 
3009 	if (mp->b_band != 0) {
3010 		int i;
3011 		qband_t **qbpp;
3012 
3013 		if (mp->b_band > q->q_nband) {
3014 			qbpp = &q->q_bandp;
3015 			while (*qbpp)
3016 				qbpp = &(*qbpp)->qb_next;
3017 			while (mp->b_band > q->q_nband) {
3018 				if ((*qbpp = allocband()) == NULL) {
3019 					if (freezer != curthread)
3020 						mutex_exit(QLOCK(q));
3021 					return (0);
3022 				}
3023 				(*qbpp)->qb_hiwat = q->q_hiwat;
3024 				(*qbpp)->qb_lowat = q->q_lowat;
3025 				q->q_nband++;
3026 				qbpp = &(*qbpp)->qb_next;
3027 			}
3028 		}
3029 		qbp = q->q_bandp;
3030 		i = mp->b_band;
3031 		while (--i)
3032 			qbp = qbp->qb_next;
3033 	}
3034 
3035 	if ((mp->b_next = emp) != NULL) {
3036 		if ((mp->b_prev = emp->b_prev) != NULL)
3037 			emp->b_prev->b_next = mp;
3038 		else
3039 			q->q_first = mp;
3040 		emp->b_prev = mp;
3041 	} else {
3042 		if ((mp->b_prev = q->q_last) != NULL)
3043 			q->q_last->b_next = mp;
3044 		else
3045 			q->q_first = mp;
3046 		q->q_last = mp;
3047 	}
3048 
3049 	/* Get mblk and byte count for q_count accounting */
3050 	bytecnt = mp_cont_len(mp, &mblkcnt);
3051 
3052 	if (qbp) {	/* adjust qband pointers and count */
3053 		if (!qbp->qb_first) {
3054 			qbp->qb_first = mp;
3055 			qbp->qb_last = mp;
3056 		} else {
3057 			if (mp->b_prev == NULL || (mp->b_prev != NULL &&
3058 			    (mp->b_prev->b_band != mp->b_band)))
3059 				qbp->qb_first = mp;
3060 			else if (mp->b_next == NULL || (mp->b_next != NULL &&
3061 			    (mp->b_next->b_band != mp->b_band)))
3062 				qbp->qb_last = mp;
3063 		}
3064 		qbp->qb_count += bytecnt;
3065 		qbp->qb_mblkcnt += mblkcnt;
3066 		if ((qbp->qb_count >= qbp->qb_hiwat) ||
3067 		    (qbp->qb_mblkcnt >= qbp->qb_hiwat)) {
3068 			qbp->qb_flag |= QB_FULL;
3069 		}
3070 	} else {
3071 		q->q_count += bytecnt;
3072 		q->q_mblkcnt += mblkcnt;
3073 		if ((q->q_count >= q->q_hiwat) ||
3074 		    (q->q_mblkcnt >= q->q_hiwat)) {
3075 			q->q_flag |= QFULL;
3076 		}
3077 	}
3078 
3079 	STR_FTEVENT_MSG(mp, q, FTEV_INSQ, 0);
3080 
3081 	if (canenable(q) && (q->q_flag & QWANTR))
3082 		qenable_locked(q);
3083 
3084 	ASSERT(MUTEX_HELD(QLOCK(q)));
3085 	if (freezer != curthread)
3086 		mutex_exit(QLOCK(q));
3087 
3088 	return (1);
3089 }
3090 
3091 /*
3092  * Create and put a control message on queue.
3093  */
3094 int
3095 putctl(queue_t *q, int type)
3096 {
3097 	mblk_t *bp;
3098 
3099 	if ((datamsg(type) && (type != M_DELAY)) ||
3100 	    (bp = allocb_tryhard(0)) == NULL)
3101 		return (0);
3102 	bp->b_datap->db_type = (unsigned char) type;
3103 
3104 	put(q, bp);
3105 
3106 	return (1);
3107 }
3108 
3109 /*
3110  * Control message with a single-byte parameter
3111  */
3112 int
3113 putctl1(queue_t *q, int type, int param)
3114 {
3115 	mblk_t *bp;
3116 
3117 	if ((datamsg(type) && (type != M_DELAY)) ||
3118 	    (bp = allocb_tryhard(1)) == NULL)
3119 		return (0);
3120 	bp->b_datap->db_type = (unsigned char)type;
3121 	*bp->b_wptr++ = (unsigned char)param;
3122 
3123 	put(q, bp);
3124 
3125 	return (1);
3126 }
3127 
3128 int
3129 putnextctl1(queue_t *q, int type, int param)
3130 {
3131 	mblk_t *bp;
3132 
3133 	if ((datamsg(type) && (type != M_DELAY)) ||
3134 	    ((bp = allocb_tryhard(1)) == NULL))
3135 		return (0);
3136 
3137 	bp->b_datap->db_type = (unsigned char)type;
3138 	*bp->b_wptr++ = (unsigned char)param;
3139 
3140 	putnext(q, bp);
3141 
3142 	return (1);
3143 }
3144 
3145 int
3146 putnextctl(queue_t *q, int type)
3147 {
3148 	mblk_t *bp;
3149 
3150 	if ((datamsg(type) && (type != M_DELAY)) ||
3151 	    ((bp = allocb_tryhard(0)) == NULL))
3152 		return (0);
3153 	bp->b_datap->db_type = (unsigned char)type;
3154 
3155 	putnext(q, bp);
3156 
3157 	return (1);
3158 }
3159 
3160 /*
3161  * Return the queue upstream from this one
3162  */
3163 queue_t *
3164 backq(queue_t *q)
3165 {
3166 	q = _OTHERQ(q);
3167 	if (q->q_next) {
3168 		q = q->q_next;
3169 		return (_OTHERQ(q));
3170 	}
3171 	return (NULL);
3172 }
3173 
3174 /*
3175  * Send a block back up the queue in reverse from this
3176  * one (e.g. to respond to ioctls)
3177  */
3178 void
3179 qreply(queue_t *q, mblk_t *bp)
3180 {
3181 	ASSERT(q && bp);
3182 
3183 	putnext(_OTHERQ(q), bp);
3184 }
3185 
3186 /*
3187  * Streams Queue Scheduling
3188  *
3189  * Queues are enabled through qenable() when they have messages to
3190  * process.  They are serviced by queuerun(), which runs each enabled
3191  * queue's service procedure.  The call to queuerun() is processor
3192  * dependent - the general principle is that it be run whenever a queue
3193  * is enabled but before returning to user level.  For system calls,
3194  * the function runqueues() is called if their action causes a queue
3195  * to be enabled.  For device interrupts, queuerun() should be
3196  * called before returning from the last level of interrupt.  Beyond
3197  * this, no timing assumptions should be made about queue scheduling.
3198  */
3199 
3200 /*
3201  * Enable a queue: put it on list of those whose service procedures are
3202  * ready to run and set up the scheduling mechanism.
3203  * The broadcast is done outside the mutex -> to avoid the woken thread
3204  * from contending with the mutex. This is OK 'cos the queue has been
3205  * enqueued on the runlist and flagged safely at this point.
3206  */
3207 void
3208 qenable(queue_t *q)
3209 {
3210 	mutex_enter(QLOCK(q));
3211 	qenable_locked(q);
3212 	mutex_exit(QLOCK(q));
3213 }
3214 /*
3215  * Return number of messages on queue
3216  */
3217 int
3218 qsize(queue_t *qp)
3219 {
3220 	int count = 0;
3221 	mblk_t *mp;
3222 
3223 	mutex_enter(QLOCK(qp));
3224 	for (mp = qp->q_first; mp; mp = mp->b_next)
3225 		count++;
3226 	mutex_exit(QLOCK(qp));
3227 	return (count);
3228 }
3229 
3230 /*
3231  * noenable - set queue so that putq() will not enable it.
3232  * enableok - set queue so that putq() can enable it.
3233  */
3234 void
3235 noenable(queue_t *q)
3236 {
3237 	mutex_enter(QLOCK(q));
3238 	q->q_flag |= QNOENB;
3239 	mutex_exit(QLOCK(q));
3240 }
3241 
3242 void
3243 enableok(queue_t *q)
3244 {
3245 	mutex_enter(QLOCK(q));
3246 	q->q_flag &= ~QNOENB;
3247 	mutex_exit(QLOCK(q));
3248 }
3249 
3250 /*
3251  * Set queue fields.
3252  */
3253 int
3254 strqset(queue_t *q, qfields_t what, unsigned char pri, intptr_t val)
3255 {
3256 	qband_t *qbp = NULL;
3257 	queue_t	*wrq;
3258 	int error = 0;
3259 	kthread_id_t freezer;
3260 
3261 	freezer = STREAM(q)->sd_freezer;
3262 	if (freezer == curthread) {
3263 		ASSERT(frozenstr(q));
3264 		ASSERT(MUTEX_HELD(QLOCK(q)));
3265 	} else
3266 		mutex_enter(QLOCK(q));
3267 
3268 	if (what >= QBAD) {
3269 		error = EINVAL;
3270 		goto done;
3271 	}
3272 	if (pri != 0) {
3273 		int i;
3274 		qband_t **qbpp;
3275 
3276 		if (pri > q->q_nband) {
3277 			qbpp = &q->q_bandp;
3278 			while (*qbpp)
3279 				qbpp = &(*qbpp)->qb_next;
3280 			while (pri > q->q_nband) {
3281 				if ((*qbpp = allocband()) == NULL) {
3282 					error = EAGAIN;
3283 					goto done;
3284 				}
3285 				(*qbpp)->qb_hiwat = q->q_hiwat;
3286 				(*qbpp)->qb_lowat = q->q_lowat;
3287 				q->q_nband++;
3288 				qbpp = &(*qbpp)->qb_next;
3289 			}
3290 		}
3291 		qbp = q->q_bandp;
3292 		i = pri;
3293 		while (--i)
3294 			qbp = qbp->qb_next;
3295 	}
3296 	switch (what) {
3297 
3298 	case QHIWAT:
3299 		if (qbp)
3300 			qbp->qb_hiwat = (size_t)val;
3301 		else
3302 			q->q_hiwat = (size_t)val;
3303 		break;
3304 
3305 	case QLOWAT:
3306 		if (qbp)
3307 			qbp->qb_lowat = (size_t)val;
3308 		else
3309 			q->q_lowat = (size_t)val;
3310 		break;
3311 
3312 	case QMAXPSZ:
3313 		if (qbp)
3314 			error = EINVAL;
3315 		else
3316 			q->q_maxpsz = (ssize_t)val;
3317 
3318 		/*
3319 		 * Performance concern, strwrite looks at the module below
3320 		 * the stream head for the maxpsz each time it does a write
3321 		 * we now cache it at the stream head.  Check to see if this
3322 		 * queue is sitting directly below the stream head.
3323 		 */
3324 		wrq = STREAM(q)->sd_wrq;
3325 		if (q != wrq->q_next)
3326 			break;
3327 
3328 		/*
3329 		 * If the stream is not frozen drop the current QLOCK and
3330 		 * acquire the sd_wrq QLOCK which protects sd_qn_*
3331 		 */
3332 		if (freezer != curthread) {
3333 			mutex_exit(QLOCK(q));
3334 			mutex_enter(QLOCK(wrq));
3335 		}
3336 		ASSERT(MUTEX_HELD(QLOCK(wrq)));
3337 
3338 		if (strmsgsz != 0) {
3339 			if (val == INFPSZ)
3340 				val = strmsgsz;
3341 			else  {
3342 				if (STREAM(q)->sd_vnode->v_type == VFIFO)
3343 					val = MIN(PIPE_BUF, val);
3344 				else
3345 					val = MIN(strmsgsz, val);
3346 			}
3347 		}
3348 		STREAM(q)->sd_qn_maxpsz = val;
3349 		if (freezer != curthread) {
3350 			mutex_exit(QLOCK(wrq));
3351 			mutex_enter(QLOCK(q));
3352 		}
3353 		break;
3354 
3355 	case QMINPSZ:
3356 		if (qbp)
3357 			error = EINVAL;
3358 		else
3359 			q->q_minpsz = (ssize_t)val;
3360 
3361 		/*
3362 		 * Performance concern, strwrite looks at the module below
3363 		 * the stream head for the maxpsz each time it does a write
3364 		 * we now cache it at the stream head.  Check to see if this
3365 		 * queue is sitting directly below the stream head.
3366 		 */
3367 		wrq = STREAM(q)->sd_wrq;
3368 		if (q != wrq->q_next)
3369 			break;
3370 
3371 		/*
3372 		 * If the stream is not frozen drop the current QLOCK and
3373 		 * acquire the sd_wrq QLOCK which protects sd_qn_*
3374 		 */
3375 		if (freezer != curthread) {
3376 			mutex_exit(QLOCK(q));
3377 			mutex_enter(QLOCK(wrq));
3378 		}
3379 		STREAM(q)->sd_qn_minpsz = (ssize_t)val;
3380 
3381 		if (freezer != curthread) {
3382 			mutex_exit(QLOCK(wrq));
3383 			mutex_enter(QLOCK(q));
3384 		}
3385 		break;
3386 
3387 	case QSTRUIOT:
3388 		if (qbp)
3389 			error = EINVAL;
3390 		else
3391 			q->q_struiot = (ushort_t)val;
3392 		break;
3393 
3394 	case QCOUNT:
3395 	case QFIRST:
3396 	case QLAST:
3397 	case QFLAG:
3398 		error = EPERM;
3399 		break;
3400 
3401 	default:
3402 		error = EINVAL;
3403 		break;
3404 	}
3405 done:
3406 	if (freezer != curthread)
3407 		mutex_exit(QLOCK(q));
3408 	return (error);
3409 }
3410 
3411 /*
3412  * Get queue fields.
3413  */
3414 int
3415 strqget(queue_t *q, qfields_t what, unsigned char pri, void *valp)
3416 {
3417 	qband_t		*qbp = NULL;
3418 	int		error = 0;
3419 	kthread_id_t	freezer;
3420 
3421 	freezer = STREAM(q)->sd_freezer;
3422 	if (freezer == curthread) {
3423 		ASSERT(frozenstr(q));
3424 		ASSERT(MUTEX_HELD(QLOCK(q)));
3425 	} else
3426 		mutex_enter(QLOCK(q));
3427 	if (what >= QBAD) {
3428 		error = EINVAL;
3429 		goto done;
3430 	}
3431 	if (pri != 0) {
3432 		int i;
3433 		qband_t **qbpp;
3434 
3435 		if (pri > q->q_nband) {
3436 			qbpp = &q->q_bandp;
3437 			while (*qbpp)
3438 				qbpp = &(*qbpp)->qb_next;
3439 			while (pri > q->q_nband) {
3440 				if ((*qbpp = allocband()) == NULL) {
3441 					error = EAGAIN;
3442 					goto done;
3443 				}
3444 				(*qbpp)->qb_hiwat = q->q_hiwat;
3445 				(*qbpp)->qb_lowat = q->q_lowat;
3446 				q->q_nband++;
3447 				qbpp = &(*qbpp)->qb_next;
3448 			}
3449 		}
3450 		qbp = q->q_bandp;
3451 		i = pri;
3452 		while (--i)
3453 			qbp = qbp->qb_next;
3454 	}
3455 	switch (what) {
3456 	case QHIWAT:
3457 		if (qbp)
3458 			*(size_t *)valp = qbp->qb_hiwat;
3459 		else
3460 			*(size_t *)valp = q->q_hiwat;
3461 		break;
3462 
3463 	case QLOWAT:
3464 		if (qbp)
3465 			*(size_t *)valp = qbp->qb_lowat;
3466 		else
3467 			*(size_t *)valp = q->q_lowat;
3468 		break;
3469 
3470 	case QMAXPSZ:
3471 		if (qbp)
3472 			error = EINVAL;
3473 		else
3474 			*(ssize_t *)valp = q->q_maxpsz;
3475 		break;
3476 
3477 	case QMINPSZ:
3478 		if (qbp)
3479 			error = EINVAL;
3480 		else
3481 			*(ssize_t *)valp = q->q_minpsz;
3482 		break;
3483 
3484 	case QCOUNT:
3485 		if (qbp)
3486 			*(size_t *)valp = qbp->qb_count;
3487 		else
3488 			*(size_t *)valp = q->q_count;
3489 		break;
3490 
3491 	case QFIRST:
3492 		if (qbp)
3493 			*(mblk_t **)valp = qbp->qb_first;
3494 		else
3495 			*(mblk_t **)valp = q->q_first;
3496 		break;
3497 
3498 	case QLAST:
3499 		if (qbp)
3500 			*(mblk_t **)valp = qbp->qb_last;
3501 		else
3502 			*(mblk_t **)valp = q->q_last;
3503 		break;
3504 
3505 	case QFLAG:
3506 		if (qbp)
3507 			*(uint_t *)valp = qbp->qb_flag;
3508 		else
3509 			*(uint_t *)valp = q->q_flag;
3510 		break;
3511 
3512 	case QSTRUIOT:
3513 		if (qbp)
3514 			error = EINVAL;
3515 		else
3516 			*(short *)valp = q->q_struiot;
3517 		break;
3518 
3519 	default:
3520 		error = EINVAL;
3521 		break;
3522 	}
3523 done:
3524 	if (freezer != curthread)
3525 		mutex_exit(QLOCK(q));
3526 	return (error);
3527 }
3528 
3529 /*
3530  * Function awakes all in cvwait/sigwait/pollwait, on one of:
3531  *	QWANTWSYNC or QWANTR or QWANTW,
3532  *
3533  * Note: for QWANTWSYNC/QWANTW and QWANTR, if no WSLEEPer or RSLEEPer then a
3534  *	 deferred wakeup will be done. Also if strpoll() in progress then a
3535  *	 deferred pollwakeup will be done.
3536  */
3537 void
3538 strwakeq(queue_t *q, int flag)
3539 {
3540 	stdata_t	*stp = STREAM(q);
3541 	pollhead_t	*pl;
3542 
3543 	mutex_enter(&stp->sd_lock);
3544 	pl = &stp->sd_pollist;
3545 	if (flag & QWANTWSYNC) {
3546 		ASSERT(!(q->q_flag & QREADR));
3547 		if (stp->sd_flag & WSLEEP) {
3548 			stp->sd_flag &= ~WSLEEP;
3549 			cv_broadcast(&stp->sd_wrq->q_wait);
3550 		} else {
3551 			stp->sd_wakeq |= WSLEEP;
3552 		}
3553 
3554 		mutex_exit(&stp->sd_lock);
3555 		pollwakeup(pl, POLLWRNORM);
3556 		mutex_enter(&stp->sd_lock);
3557 
3558 		if (stp->sd_sigflags & S_WRNORM)
3559 			strsendsig(stp->sd_siglist, S_WRNORM, 0, 0);
3560 	} else if (flag & QWANTR) {
3561 		if (stp->sd_flag & RSLEEP) {
3562 			stp->sd_flag &= ~RSLEEP;
3563 			cv_broadcast(&_RD(stp->sd_wrq)->q_wait);
3564 		} else {
3565 			stp->sd_wakeq |= RSLEEP;
3566 		}
3567 
3568 		mutex_exit(&stp->sd_lock);
3569 		pollwakeup(pl, POLLIN | POLLRDNORM);
3570 		mutex_enter(&stp->sd_lock);
3571 
3572 		{
3573 			int events = stp->sd_sigflags & (S_INPUT | S_RDNORM);
3574 
3575 			if (events)
3576 				strsendsig(stp->sd_siglist, events, 0, 0);
3577 		}
3578 	} else {
3579 		if (stp->sd_flag & WSLEEP) {
3580 			stp->sd_flag &= ~WSLEEP;
3581 			cv_broadcast(&stp->sd_wrq->q_wait);
3582 		}
3583 
3584 		mutex_exit(&stp->sd_lock);
3585 		pollwakeup(pl, POLLWRNORM);
3586 		mutex_enter(&stp->sd_lock);
3587 
3588 		if (stp->sd_sigflags & S_WRNORM)
3589 			strsendsig(stp->sd_siglist, S_WRNORM, 0, 0);
3590 	}
3591 	mutex_exit(&stp->sd_lock);
3592 }
3593 
3594 int
3595 struioget(queue_t *q, mblk_t *mp, struiod_t *dp, int noblock)
3596 {
3597 	stdata_t *stp = STREAM(q);
3598 	int typ  = STRUIOT_STANDARD;
3599 	uio_t	 *uiop = &dp->d_uio;
3600 	dblk_t	 *dbp;
3601 	ssize_t	 uiocnt;
3602 	ssize_t	 cnt;
3603 	unsigned char *ptr;
3604 	ssize_t	 resid;
3605 	int	 error = 0;
3606 	on_trap_data_t otd;
3607 	queue_t	*stwrq;
3608 
3609 	/*
3610 	 * Plumbing may change while taking the type so store the
3611 	 * queue in a temporary variable. It doesn't matter even
3612 	 * if the we take the type from the previous plumbing,
3613 	 * that's because if the plumbing has changed when we were
3614 	 * holding the queue in a temporary variable, we can continue
3615 	 * processing the message the way it would have been processed
3616 	 * in the old plumbing, without any side effects but a bit
3617 	 * extra processing for partial ip header checksum.
3618 	 *
3619 	 * This has been done to avoid holding the sd_lock which is
3620 	 * very hot.
3621 	 */
3622 
3623 	stwrq = stp->sd_struiowrq;
3624 	if (stwrq)
3625 		typ = stwrq->q_struiot;
3626 
3627 	for (; (resid = uiop->uio_resid) > 0 && mp; mp = mp->b_cont) {
3628 		dbp = mp->b_datap;
3629 		ptr = (uchar_t *)(mp->b_rptr + dbp->db_cksumstuff);
3630 		uiocnt = dbp->db_cksumend - dbp->db_cksumstuff;
3631 		cnt = MIN(uiocnt, uiop->uio_resid);
3632 		if (!(dbp->db_struioflag & STRUIO_SPEC) ||
3633 		    (dbp->db_struioflag & STRUIO_DONE) || cnt == 0) {
3634 			/*
3635 			 * Either this mblk has already been processed
3636 			 * or there is no more room in this mblk (?).
3637 			 */
3638 			continue;
3639 		}
3640 		switch (typ) {
3641 		case STRUIOT_STANDARD:
3642 			if (noblock) {
3643 				if (on_trap(&otd, OT_DATA_ACCESS)) {
3644 					no_trap();
3645 					error = EWOULDBLOCK;
3646 					goto out;
3647 				}
3648 			}
3649 			if (error = uiomove(ptr, cnt, UIO_WRITE, uiop)) {
3650 				if (noblock)
3651 					no_trap();
3652 				goto out;
3653 			}
3654 			if (noblock)
3655 				no_trap();
3656 			break;
3657 
3658 		default:
3659 			error = EIO;
3660 			goto out;
3661 		}
3662 		dbp->db_struioflag |= STRUIO_DONE;
3663 		dbp->db_cksumstuff += cnt;
3664 	}
3665 out:
3666 	if (error == EWOULDBLOCK && (resid -= uiop->uio_resid) > 0) {
3667 		/*
3668 		 * A fault has occured and some bytes were moved to the
3669 		 * current mblk, the uio_t has already been updated by
3670 		 * the appropriate uio routine, so also update the mblk
3671 		 * to reflect this in case this same mblk chain is used
3672 		 * again (after the fault has been handled).
3673 		 */
3674 		uiocnt = dbp->db_cksumend - dbp->db_cksumstuff;
3675 		if (uiocnt >= resid)
3676 			dbp->db_cksumstuff += resid;
3677 	}
3678 	return (error);
3679 }
3680 
3681 /*
3682  * Try to enter queue synchronously. Any attempt to enter a closing queue will
3683  * fails. The qp->q_rwcnt keeps track of the number of successful entries so
3684  * that removeq() will not try to close the queue while a thread is inside the
3685  * queue.
3686  */
3687 static boolean_t
3688 rwnext_enter(queue_t *qp)
3689 {
3690 	mutex_enter(QLOCK(qp));
3691 	if (qp->q_flag & QWCLOSE) {
3692 		mutex_exit(QLOCK(qp));
3693 		return (B_FALSE);
3694 	}
3695 	qp->q_rwcnt++;
3696 	ASSERT(qp->q_rwcnt != 0);
3697 	mutex_exit(QLOCK(qp));
3698 	return (B_TRUE);
3699 }
3700 
3701 /*
3702  * Decrease the count of threads running in sync stream queue and wake up any
3703  * threads blocked in removeq().
3704  */
3705 static void
3706 rwnext_exit(queue_t *qp)
3707 {
3708 	mutex_enter(QLOCK(qp));
3709 	qp->q_rwcnt--;
3710 	if (qp->q_flag & QWANTRMQSYNC) {
3711 		qp->q_flag &= ~QWANTRMQSYNC;
3712 		cv_broadcast(&qp->q_wait);
3713 	}
3714 	mutex_exit(QLOCK(qp));
3715 }
3716 
3717 /*
3718  * The purpose of rwnext() is to call the rw procedure of the next
3719  * (downstream) modules queue.
3720  *
3721  * treated as put entrypoint for perimeter syncronization.
3722  *
3723  * There's no need to grab sq_putlocks here (which only exist for CIPUT
3724  * sync queues). If it is CIPUT sync queue sq_count is incremented and it does
3725  * not matter if any regular put entrypoints have been already entered. We
3726  * can't increment one of the sq_putcounts (instead of sq_count) because
3727  * qwait_rw won't know which counter to decrement.
3728  *
3729  * It would be reasonable to add the lockless FASTPUT logic.
3730  */
3731 int
3732 rwnext(queue_t *qp, struiod_t *dp)
3733 {
3734 	queue_t		*nqp;
3735 	syncq_t		*sq;
3736 	uint16_t	count;
3737 	uint16_t	flags;
3738 	struct qinit	*qi;
3739 	int		(*proc)();
3740 	struct stdata	*stp;
3741 	int		isread;
3742 	int		rval;
3743 
3744 	stp = STREAM(qp);
3745 	/*
3746 	 * Prevent q_next from changing by holding sd_lock until acquiring
3747 	 * SQLOCK. Note that a read-side rwnext from the streamhead will
3748 	 * already have sd_lock acquired. In either case sd_lock is always
3749 	 * released after acquiring SQLOCK.
3750 	 *
3751 	 * The streamhead read-side holding sd_lock when calling rwnext is
3752 	 * required to prevent a race condition were M_DATA mblks flowing
3753 	 * up the read-side of the stream could be bypassed by a rwnext()
3754 	 * down-call. In this case sd_lock acts as the streamhead perimeter.
3755 	 */
3756 	if ((nqp = _WR(qp)) == qp) {
3757 		isread = 0;
3758 		mutex_enter(&stp->sd_lock);
3759 		qp = nqp->q_next;
3760 	} else {
3761 		isread = 1;
3762 		if (nqp != stp->sd_wrq)
3763 			/* Not streamhead */
3764 			mutex_enter(&stp->sd_lock);
3765 		qp = _RD(nqp->q_next);
3766 	}
3767 	qi = qp->q_qinfo;
3768 	if (qp->q_struiot == STRUIOT_NONE || ! (proc = qi->qi_rwp)) {
3769 		/*
3770 		 * Not a synchronous module or no r/w procedure for this
3771 		 * queue, so just return EINVAL and let the caller handle it.
3772 		 */
3773 		mutex_exit(&stp->sd_lock);
3774 		return (EINVAL);
3775 	}
3776 
3777 	if (rwnext_enter(qp) == B_FALSE) {
3778 		mutex_exit(&stp->sd_lock);
3779 		return (EINVAL);
3780 	}
3781 
3782 	sq = qp->q_syncq;
3783 	mutex_enter(SQLOCK(sq));
3784 	mutex_exit(&stp->sd_lock);
3785 	count = sq->sq_count;
3786 	flags = sq->sq_flags;
3787 	ASSERT(sq->sq_ciputctrl == NULL || (flags & SQ_CIPUT));
3788 
3789 	while ((flags & SQ_GOAWAY) || (!(flags & SQ_CIPUT) && count != 0)) {
3790 		/*
3791 		 * if this queue is being closed, return.
3792 		 */
3793 		if (qp->q_flag & QWCLOSE) {
3794 			mutex_exit(SQLOCK(sq));
3795 			rwnext_exit(qp);
3796 			return (EINVAL);
3797 		}
3798 
3799 		/*
3800 		 * Wait until we can enter the inner perimeter.
3801 		 */
3802 		sq->sq_flags = flags | SQ_WANTWAKEUP;
3803 		cv_wait(&sq->sq_wait, SQLOCK(sq));
3804 		count = sq->sq_count;
3805 		flags = sq->sq_flags;
3806 	}
3807 
3808 	if (isread == 0 && stp->sd_struiowrq == NULL ||
3809 	    isread == 1 && stp->sd_struiordq == NULL) {
3810 		/*
3811 		 * Stream plumbing changed while waiting for inner perimeter
3812 		 * so just return EINVAL and let the caller handle it.
3813 		 */
3814 		mutex_exit(SQLOCK(sq));
3815 		rwnext_exit(qp);
3816 		return (EINVAL);
3817 	}
3818 	if (!(flags & SQ_CIPUT))
3819 		sq->sq_flags = flags | SQ_EXCL;
3820 	sq->sq_count = count + 1;
3821 	ASSERT(sq->sq_count != 0);		/* Wraparound */
3822 	/*
3823 	 * Note: The only message ordering guarantee that rwnext() makes is
3824 	 *	 for the write queue flow-control case. All others (r/w queue
3825 	 *	 with q_count > 0 (or q_first != 0)) are the resposibilty of
3826 	 *	 the queue's rw procedure. This could be genralized here buy
3827 	 *	 running the queue's service procedure, but that wouldn't be
3828 	 *	 the most efficent for all cases.
3829 	 */
3830 	mutex_exit(SQLOCK(sq));
3831 	if (! isread && (qp->q_flag & QFULL)) {
3832 		/*
3833 		 * Write queue may be flow controlled. If so,
3834 		 * mark the queue for wakeup when it's not.
3835 		 */
3836 		mutex_enter(QLOCK(qp));
3837 		if (qp->q_flag & QFULL) {
3838 			qp->q_flag |= QWANTWSYNC;
3839 			mutex_exit(QLOCK(qp));
3840 			rval = EWOULDBLOCK;
3841 			goto out;
3842 		}
3843 		mutex_exit(QLOCK(qp));
3844 	}
3845 
3846 	if (! isread && dp->d_mp)
3847 		STR_FTEVENT_MSG(dp->d_mp, nqp, FTEV_RWNEXT, dp->d_mp->b_rptr -
3848 		    dp->d_mp->b_datap->db_base);
3849 
3850 	rval = (*proc)(qp, dp);
3851 
3852 	if (isread && dp->d_mp)
3853 		STR_FTEVENT_MSG(dp->d_mp, _RD(nqp), FTEV_RWNEXT,
3854 		    dp->d_mp->b_rptr - dp->d_mp->b_datap->db_base);
3855 out:
3856 	/*
3857 	 * The queue is protected from being freed by sq_count, so it is
3858 	 * safe to call rwnext_exit and reacquire SQLOCK(sq).
3859 	 */
3860 	rwnext_exit(qp);
3861 
3862 	mutex_enter(SQLOCK(sq));
3863 	flags = sq->sq_flags;
3864 	ASSERT(sq->sq_count != 0);
3865 	sq->sq_count--;
3866 	if (flags & SQ_TAIL) {
3867 		putnext_tail(sq, qp, flags);
3868 		/*
3869 		 * The only purpose of this ASSERT is to preserve calling stack
3870 		 * in DEBUG kernel.
3871 		 */
3872 		ASSERT(flags & SQ_TAIL);
3873 		return (rval);
3874 	}
3875 	ASSERT(flags & (SQ_EXCL|SQ_CIPUT));
3876 	/*
3877 	 * Safe to always drop SQ_EXCL:
3878 	 *	Not SQ_CIPUT means we set SQ_EXCL above
3879 	 *	For SQ_CIPUT SQ_EXCL will only be set if the put procedure
3880 	 *	did a qwriter(INNER) in which case nobody else
3881 	 *	is in the inner perimeter and we are exiting.
3882 	 *
3883 	 * I would like to make the following assertion:
3884 	 *
3885 	 * ASSERT((flags & (SQ_EXCL|SQ_CIPUT)) != (SQ_EXCL|SQ_CIPUT) ||
3886 	 *	sq->sq_count == 0);
3887 	 *
3888 	 * which indicates that if we are both putshared and exclusive,
3889 	 * we became exclusive while executing the putproc, and the only
3890 	 * claim on the syncq was the one we dropped a few lines above.
3891 	 * But other threads that enter putnext while the syncq is exclusive
3892 	 * need to make a claim as they may need to drop SQLOCK in the
3893 	 * has_writers case to avoid deadlocks.  If these threads are
3894 	 * delayed or preempted, it is possible that the writer thread can
3895 	 * find out that there are other claims making the (sq_count == 0)
3896 	 * test invalid.
3897 	 */
3898 
3899 	sq->sq_flags = flags & ~SQ_EXCL;
3900 	if (sq->sq_flags & SQ_WANTWAKEUP) {
3901 		sq->sq_flags &= ~SQ_WANTWAKEUP;
3902 		cv_broadcast(&sq->sq_wait);
3903 	}
3904 	mutex_exit(SQLOCK(sq));
3905 	return (rval);
3906 }
3907 
3908 /*
3909  * The purpose of infonext() is to call the info procedure of the next
3910  * (downstream) modules queue.
3911  *
3912  * treated as put entrypoint for perimeter syncronization.
3913  *
3914  * There's no need to grab sq_putlocks here (which only exist for CIPUT
3915  * sync queues). If it is CIPUT sync queue regular sq_count is incremented and
3916  * it does not matter if any regular put entrypoints have been already
3917  * entered.
3918  */
3919 int
3920 infonext(queue_t *qp, infod_t *idp)
3921 {
3922 	queue_t		*nqp;
3923 	syncq_t		*sq;
3924 	uint16_t	count;
3925 	uint16_t	flags;
3926 	struct qinit	*qi;
3927 	int		(*proc)();
3928 	struct stdata	*stp;
3929 	int		rval;
3930 
3931 	stp = STREAM(qp);
3932 	/*
3933 	 * Prevent q_next from changing by holding sd_lock until
3934 	 * acquiring SQLOCK.
3935 	 */
3936 	mutex_enter(&stp->sd_lock);
3937 	if ((nqp = _WR(qp)) == qp) {
3938 		qp = nqp->q_next;
3939 	} else {
3940 		qp = _RD(nqp->q_next);
3941 	}
3942 	qi = qp->q_qinfo;
3943 	if (qp->q_struiot == STRUIOT_NONE || ! (proc = qi->qi_infop)) {
3944 		mutex_exit(&stp->sd_lock);
3945 		return (EINVAL);
3946 	}
3947 	sq = qp->q_syncq;
3948 	mutex_enter(SQLOCK(sq));
3949 	mutex_exit(&stp->sd_lock);
3950 	count = sq->sq_count;
3951 	flags = sq->sq_flags;
3952 	ASSERT(sq->sq_ciputctrl == NULL || (flags & SQ_CIPUT));
3953 
3954 	while ((flags & SQ_GOAWAY) || (!(flags & SQ_CIPUT) && count != 0)) {
3955 		/*
3956 		 * Wait until we can enter the inner perimeter.
3957 		 */
3958 		sq->sq_flags = flags | SQ_WANTWAKEUP;
3959 		cv_wait(&sq->sq_wait, SQLOCK(sq));
3960 		count = sq->sq_count;
3961 		flags = sq->sq_flags;
3962 	}
3963 
3964 	if (! (flags & SQ_CIPUT))
3965 		sq->sq_flags = flags | SQ_EXCL;
3966 	sq->sq_count = count + 1;
3967 	ASSERT(sq->sq_count != 0);		/* Wraparound */
3968 	mutex_exit(SQLOCK(sq));
3969 
3970 	rval = (*proc)(qp, idp);
3971 
3972 	mutex_enter(SQLOCK(sq));
3973 	flags = sq->sq_flags;
3974 	ASSERT(sq->sq_count != 0);
3975 	sq->sq_count--;
3976 	if (flags & SQ_TAIL) {
3977 		putnext_tail(sq, qp, flags);
3978 		/*
3979 		 * The only purpose of this ASSERT is to preserve calling stack
3980 		 * in DEBUG kernel.
3981 		 */
3982 		ASSERT(flags & SQ_TAIL);
3983 		return (rval);
3984 	}
3985 	ASSERT(flags & (SQ_EXCL|SQ_CIPUT));
3986 /*
3987  * XXXX
3988  * I am not certain the next comment is correct here.  I need to consider
3989  * why the infonext is called, and if dropping SQ_EXCL unless non-CIPUT
3990  * might cause other problems.  It just might be safer to drop it if
3991  * !SQ_CIPUT because that is when we set it.
3992  */
3993 	/*
3994 	 * Safe to always drop SQ_EXCL:
3995 	 *	Not SQ_CIPUT means we set SQ_EXCL above
3996 	 *	For SQ_CIPUT SQ_EXCL will only be set if the put procedure
3997 	 *	did a qwriter(INNER) in which case nobody else
3998 	 *	is in the inner perimeter and we are exiting.
3999 	 *
4000 	 * I would like to make the following assertion:
4001 	 *
4002 	 * ASSERT((flags & (SQ_EXCL|SQ_CIPUT)) != (SQ_EXCL|SQ_CIPUT) ||
4003 	 *	sq->sq_count == 0);
4004 	 *
4005 	 * which indicates that if we are both putshared and exclusive,
4006 	 * we became exclusive while executing the putproc, and the only
4007 	 * claim on the syncq was the one we dropped a few lines above.
4008 	 * But other threads that enter putnext while the syncq is exclusive
4009 	 * need to make a claim as they may need to drop SQLOCK in the
4010 	 * has_writers case to avoid deadlocks.  If these threads are
4011 	 * delayed or preempted, it is possible that the writer thread can
4012 	 * find out that there are other claims making the (sq_count == 0)
4013 	 * test invalid.
4014 	 */
4015 
4016 	sq->sq_flags = flags & ~SQ_EXCL;
4017 	mutex_exit(SQLOCK(sq));
4018 	return (rval);
4019 }
4020 
4021 /*
4022  * Return nonzero if the queue is responsible for struio(), else return 0.
4023  */
4024 int
4025 isuioq(queue_t *q)
4026 {
4027 	if (q->q_flag & QREADR)
4028 		return (STREAM(q)->sd_struiordq == q);
4029 	else
4030 		return (STREAM(q)->sd_struiowrq == q);
4031 }
4032 
4033 #if defined(__sparc)
4034 int disable_putlocks = 0;
4035 #else
4036 int disable_putlocks = 1;
4037 #endif
4038 
4039 /*
4040  * called by create_putlock.
4041  */
4042 static void
4043 create_syncq_putlocks(queue_t *q)
4044 {
4045 	syncq_t	*sq = q->q_syncq;
4046 	ciputctrl_t *cip;
4047 	int i;
4048 
4049 	ASSERT(sq != NULL);
4050 
4051 	ASSERT(disable_putlocks == 0);
4052 	ASSERT(n_ciputctrl >= min_n_ciputctrl);
4053 	ASSERT(ciputctrl_cache != NULL);
4054 
4055 	if (!(sq->sq_type & SQ_CIPUT))
4056 		return;
4057 
4058 	for (i = 0; i <= 1; i++) {
4059 		if (sq->sq_ciputctrl == NULL) {
4060 			cip = kmem_cache_alloc(ciputctrl_cache, KM_SLEEP);
4061 			SUMCHECK_CIPUTCTRL_COUNTS(cip, n_ciputctrl - 1, 0);
4062 			mutex_enter(SQLOCK(sq));
4063 			if (sq->sq_ciputctrl != NULL) {
4064 				mutex_exit(SQLOCK(sq));
4065 				kmem_cache_free(ciputctrl_cache, cip);
4066 			} else {
4067 				ASSERT(sq->sq_nciputctrl == 0);
4068 				sq->sq_nciputctrl = n_ciputctrl - 1;
4069 				/*
4070 				 * putnext checks sq_ciputctrl without holding
4071 				 * SQLOCK. if it is not NULL putnext assumes
4072 				 * sq_nciputctrl is initialized. membar below
4073 				 * insures that.
4074 				 */
4075 				membar_producer();
4076 				sq->sq_ciputctrl = cip;
4077 				mutex_exit(SQLOCK(sq));
4078 			}
4079 		}
4080 		ASSERT(sq->sq_nciputctrl == n_ciputctrl - 1);
4081 		if (i == 1)
4082 			break;
4083 		q = _OTHERQ(q);
4084 		if (!(q->q_flag & QPERQ)) {
4085 			ASSERT(sq == q->q_syncq);
4086 			break;
4087 		}
4088 		ASSERT(q->q_syncq != NULL);
4089 		ASSERT(sq != q->q_syncq);
4090 		sq = q->q_syncq;
4091 		ASSERT(sq->sq_type & SQ_CIPUT);
4092 	}
4093 }
4094 
4095 /*
4096  * If stream argument is 0 only create per cpu sq_putlocks/sq_putcounts for
4097  * syncq of q. If stream argument is not 0 create per cpu stream_putlocks for
4098  * the stream of q and per cpu sq_putlocks/sq_putcounts for all syncq's
4099  * starting from q and down to the driver.
4100  *
4101  * This should be called after the affected queues are part of stream
4102  * geometry. It should be called from driver/module open routine after
4103  * qprocson() call. It is also called from nfs syscall where it is known that
4104  * stream is configured and won't change its geometry during create_putlock
4105  * call.
4106  *
4107  * caller normally uses 0 value for the stream argument to speed up MT putnext
4108  * into the perimeter of q for example because its perimeter is per module
4109  * (e.g. IP).
4110  *
4111  * caller normally uses non 0 value for the stream argument to hint the system
4112  * that the stream of q is a very contended global system stream
4113  * (e.g. NFS/UDP) and the part of the stream from q to the driver is
4114  * particularly MT hot.
4115  *
4116  * Caller insures stream plumbing won't happen while we are here and therefore
4117  * q_next can be safely used.
4118  */
4119 
4120 void
4121 create_putlocks(queue_t *q, int stream)
4122 {
4123 	ciputctrl_t	*cip;
4124 	struct stdata	*stp = STREAM(q);
4125 
4126 	q = _WR(q);
4127 	ASSERT(stp != NULL);
4128 
4129 	if (disable_putlocks != 0)
4130 		return;
4131 
4132 	if (n_ciputctrl < min_n_ciputctrl)
4133 		return;
4134 
4135 	ASSERT(ciputctrl_cache != NULL);
4136 
4137 	if (stream != 0 && stp->sd_ciputctrl == NULL) {
4138 		cip = kmem_cache_alloc(ciputctrl_cache, KM_SLEEP);
4139 		SUMCHECK_CIPUTCTRL_COUNTS(cip, n_ciputctrl - 1, 0);
4140 		mutex_enter(&stp->sd_lock);
4141 		if (stp->sd_ciputctrl != NULL) {
4142 			mutex_exit(&stp->sd_lock);
4143 			kmem_cache_free(ciputctrl_cache, cip);
4144 		} else {
4145 			ASSERT(stp->sd_nciputctrl == 0);
4146 			stp->sd_nciputctrl = n_ciputctrl - 1;
4147 			/*
4148 			 * putnext checks sd_ciputctrl without holding
4149 			 * sd_lock. if it is not NULL putnext assumes
4150 			 * sd_nciputctrl is initialized. membar below
4151 			 * insures that.
4152 			 */
4153 			membar_producer();
4154 			stp->sd_ciputctrl = cip;
4155 			mutex_exit(&stp->sd_lock);
4156 		}
4157 	}
4158 
4159 	ASSERT(stream == 0 || stp->sd_nciputctrl == n_ciputctrl - 1);
4160 
4161 	while (_SAMESTR(q)) {
4162 		create_syncq_putlocks(q);
4163 		if (stream == 0)
4164 			return;
4165 		q = q->q_next;
4166 	}
4167 	ASSERT(q != NULL);
4168 	create_syncq_putlocks(q);
4169 }
4170 
4171 /*
4172  * STREAMS Flow Trace - record STREAMS Flow Trace events as an mblk flows
4173  * through a stream.
4174  *
4175  * Data currently record per-event is a timestamp, module/driver name,
4176  * downstream module/driver name, optional callstack, event type and a per
4177  * type datum.  Much of the STREAMS framework is instrumented for automatic
4178  * flow tracing (when enabled).  Events can be defined and used by STREAMS
4179  * modules and drivers.
4180  *
4181  * Global objects:
4182  *
4183  *	str_ftevent() - Add a flow-trace event to a dblk.
4184  *	str_ftfree() - Free flow-trace data
4185  *
4186  * Local objects:
4187  *
4188  *	fthdr_cache - pointer to the kmem cache for trace header.
4189  *	ftblk_cache - pointer to the kmem cache for trace data blocks.
4190  */
4191 
4192 int str_ftnever = 1;	/* Don't do STREAMS flow tracing */
4193 int str_ftstack = 0;	/* Don't record event call stacks */
4194 
4195 void
4196 str_ftevent(fthdr_t *hp, void *p, ushort_t evnt, ushort_t data)
4197 {
4198 	ftblk_t *bp = hp->tail;
4199 	ftblk_t *nbp;
4200 	ftevnt_t *ep;
4201 	int ix, nix;
4202 
4203 	ASSERT(hp != NULL);
4204 
4205 	for (;;) {
4206 		if ((ix = bp->ix) == FTBLK_EVNTS) {
4207 			/*
4208 			 * Tail doesn't have room, so need a new tail.
4209 			 *
4210 			 * To make this MT safe, first, allocate a new
4211 			 * ftblk, and initialize it.  To make life a
4212 			 * little easier, reserve the first slot (mostly
4213 			 * by making ix = 1).  When we are finished with
4214 			 * the initialization, CAS this pointer to the
4215 			 * tail.  If this succeeds, this is the new
4216 			 * "next" block.  Otherwise, another thread
4217 			 * got here first, so free the block and start
4218 			 * again.
4219 			 */
4220 			nbp = kmem_cache_alloc(ftblk_cache, KM_NOSLEEP);
4221 			if (nbp == NULL) {
4222 				/* no mem, so punt */
4223 				str_ftnever++;
4224 				/* free up all flow data? */
4225 				return;
4226 			}
4227 			nbp->nxt = NULL;
4228 			nbp->ix = 1;
4229 			/*
4230 			 * Just in case there is another thread about
4231 			 * to get the next index, we need to make sure
4232 			 * the value is there for it.
4233 			 */
4234 			membar_producer();
4235 			if (atomic_cas_ptr(&hp->tail, bp, nbp) == bp) {
4236 				/* CAS was successful */
4237 				bp->nxt = nbp;
4238 				membar_producer();
4239 				bp = nbp;
4240 				ix = 0;
4241 				goto cas_good;
4242 			} else {
4243 				kmem_cache_free(ftblk_cache, nbp);
4244 				bp = hp->tail;
4245 				continue;
4246 			}
4247 		}
4248 		nix = ix + 1;
4249 		if (atomic_cas_32((uint32_t *)&bp->ix, ix, nix) == ix) {
4250 		cas_good:
4251 			if (curthread != hp->thread) {
4252 				hp->thread = curthread;
4253 				evnt |= FTEV_CS;
4254 			}
4255 			if (CPU->cpu_seqid != hp->cpu_seqid) {
4256 				hp->cpu_seqid = CPU->cpu_seqid;
4257 				evnt |= FTEV_PS;
4258 			}
4259 			ep = &bp->ev[ix];
4260 			break;
4261 		}
4262 	}
4263 
4264 	if (evnt & FTEV_QMASK) {
4265 		queue_t *qp = p;
4266 
4267 		if (!(qp->q_flag & QREADR))
4268 			evnt |= FTEV_ISWR;
4269 
4270 		ep->mid = Q2NAME(qp);
4271 
4272 		/*
4273 		 * We only record the next queue name for FTEV_PUTNEXT since
4274 		 * that's the only time we *really* need it, and the putnext()
4275 		 * code ensures that qp->q_next won't vanish.  (We could use
4276 		 * claimstr()/releasestr() but at a performance cost.)
4277 		 */
4278 		if ((evnt & FTEV_MASK) == FTEV_PUTNEXT && qp->q_next != NULL)
4279 			ep->midnext = Q2NAME(qp->q_next);
4280 		else
4281 			ep->midnext = NULL;
4282 	} else {
4283 		ep->mid = p;
4284 		ep->midnext = NULL;
4285 	}
4286 
4287 	if (ep->stk != NULL)
4288 		ep->stk->fs_depth = getpcstack(ep->stk->fs_stk, FTSTK_DEPTH);
4289 
4290 	ep->ts = gethrtime();
4291 	ep->evnt = evnt;
4292 	ep->data = data;
4293 	hp->hash = (hp->hash << 9) + hp->hash;
4294 	hp->hash += (evnt << 16) | data;
4295 	hp->hash += (uintptr_t)ep->mid;
4296 }
4297 
4298 /*
4299  * Free flow-trace data.
4300  */
4301 void
4302 str_ftfree(dblk_t *dbp)
4303 {
4304 	fthdr_t *hp = dbp->db_fthdr;
4305 	ftblk_t *bp = &hp->first;
4306 	ftblk_t *nbp;
4307 
4308 	if (bp != hp->tail || bp->ix != 0) {
4309 		/*
4310 		 * Clear out the hash, have the tail point to itself, and free
4311 		 * any continuation blocks.
4312 		 */
4313 		bp = hp->first.nxt;
4314 		hp->tail = &hp->first;
4315 		hp->hash = 0;
4316 		hp->first.nxt = NULL;
4317 		hp->first.ix = 0;
4318 		while (bp != NULL) {
4319 			nbp = bp->nxt;
4320 			kmem_cache_free(ftblk_cache, bp);
4321 			bp = nbp;
4322 		}
4323 	}
4324 	kmem_cache_free(fthdr_cache, hp);
4325 	dbp->db_fthdr = NULL;
4326 }
4327