xref: /titanic_50/usr/src/uts/common/io/stream.c (revision 9a5d73e03cd3312ddb571a748c40a63c58bd66e5)
1 /*
2  * CDDL HEADER START
3  *
4  * The contents of this file are subject to the terms of the
5  * Common Development and Distribution License (the "License").
6  * You may not use this file except in compliance with the License.
7  *
8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9  * or http://www.opensolaris.org/os/licensing.
10  * See the License for the specific language governing permissions
11  * and limitations under the License.
12  *
13  * When distributing Covered Code, include this CDDL HEADER in each
14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15  * If applicable, add the following below this CDDL HEADER, with the
16  * fields enclosed by brackets "[]" replaced with your own identifying
17  * information: Portions Copyright [yyyy] [name of copyright owner]
18  *
19  * CDDL HEADER END
20  */
21 /*	Copyright (c) 1984, 1986, 1987, 1988, 1989 AT&T	*/
22 /*	  All Rights Reserved  	*/
23 
24 /*
25  * Copyright 2009 Sun Microsystems, Inc.  All rights reserved.
26  * Use is subject to license terms.
27  */
28 
29 #include <sys/types.h>
30 #include <sys/param.h>
31 #include <sys/thread.h>
32 #include <sys/sysmacros.h>
33 #include <sys/stropts.h>
34 #include <sys/stream.h>
35 #include <sys/strsubr.h>
36 #include <sys/strsun.h>
37 #include <sys/conf.h>
38 #include <sys/debug.h>
39 #include <sys/cmn_err.h>
40 #include <sys/kmem.h>
41 #include <sys/atomic.h>
42 #include <sys/errno.h>
43 #include <sys/vtrace.h>
44 #include <sys/ftrace.h>
45 #include <sys/ontrap.h>
46 #include <sys/multidata.h>
47 #include <sys/multidata_impl.h>
48 #include <sys/sdt.h>
49 #include <sys/strft.h>
50 
51 #ifdef DEBUG
52 #include <sys/kmem_impl.h>
53 #endif
54 
55 /*
56  * This file contains all the STREAMS utility routines that may
57  * be used by modules and drivers.
58  */
59 
60 /*
61  * STREAMS message allocator: principles of operation
62  *
63  * The streams message allocator consists of all the routines that
64  * allocate, dup and free streams messages: allocb(), [d]esballoc[a],
65  * dupb(), freeb() and freemsg().  What follows is a high-level view
66  * of how the allocator works.
67  *
68  * Every streams message consists of one or more mblks, a dblk, and data.
69  * All mblks for all types of messages come from a common mblk_cache.
70  * The dblk and data come in several flavors, depending on how the
71  * message is allocated:
72  *
73  * (1) mblks up to DBLK_MAX_CACHE size are allocated from a collection of
74  *     fixed-size dblk/data caches. For message sizes that are multiples of
75  *     PAGESIZE, dblks are allocated separately from the buffer.
76  *     The associated buffer is allocated by the constructor using kmem_alloc().
77  *     For all other message sizes, dblk and its associated data is allocated
78  *     as a single contiguous chunk of memory.
79  *     Objects in these caches consist of a dblk plus its associated data.
80  *     allocb() determines the nearest-size cache by table lookup:
81  *     the dblk_cache[] array provides the mapping from size to dblk cache.
82  *
83  * (2) Large messages (size > DBLK_MAX_CACHE) are constructed by
84  *     kmem_alloc()'ing a buffer for the data and supplying that
85  *     buffer to gesballoc(), described below.
86  *
87  * (3) The four flavors of [d]esballoc[a] are all implemented by a
88  *     common routine, gesballoc() ("generic esballoc").  gesballoc()
89  *     allocates a dblk from the global dblk_esb_cache and sets db_base,
90  *     db_lim and db_frtnp to describe the caller-supplied buffer.
91  *
92  * While there are several routines to allocate messages, there is only
93  * one routine to free messages: freeb().  freeb() simply invokes the
94  * dblk's free method, dbp->db_free(), which is set at allocation time.
95  *
96  * dupb() creates a new reference to a message by allocating a new mblk,
97  * incrementing the dblk reference count and setting the dblk's free
98  * method to dblk_decref().  The dblk's original free method is retained
99  * in db_lastfree.  dblk_decref() decrements the reference count on each
100  * freeb().  If this is not the last reference it just frees the mblk;
101  * if this *is* the last reference, it restores db_free to db_lastfree,
102  * sets db_mblk to the current mblk (see below), and invokes db_lastfree.
103  *
104  * The implementation makes aggressive use of kmem object caching for
105  * maximum performance.  This makes the code simple and compact, but
106  * also a bit abstruse in some places.  The invariants that constitute a
107  * message's constructed state, described below, are more subtle than usual.
108  *
109  * Every dblk has an "attached mblk" as part of its constructed state.
110  * The mblk is allocated by the dblk's constructor and remains attached
111  * until the message is either dup'ed or pulled up.  In the dupb() case
112  * the mblk association doesn't matter until the last free, at which time
113  * dblk_decref() attaches the last mblk to the dblk.  pullupmsg() affects
114  * the mblk association because it swaps the leading mblks of two messages,
115  * so it is responsible for swapping their db_mblk pointers accordingly.
116  * From a constructed-state viewpoint it doesn't matter that a dblk's
117  * attached mblk can change while the message is allocated; all that
118  * matters is that the dblk has *some* attached mblk when it's freed.
119  *
120  * The sizes of the allocb() small-message caches are not magical.
121  * They represent a good trade-off between internal and external
122  * fragmentation for current workloads.  They should be reevaluated
123  * periodically, especially if allocations larger than DBLK_MAX_CACHE
124  * become common.  We use 64-byte alignment so that dblks don't
125  * straddle cache lines unnecessarily.
126  */
127 #define	DBLK_MAX_CACHE		73728
128 #define	DBLK_CACHE_ALIGN	64
129 #define	DBLK_MIN_SIZE		8
130 #define	DBLK_SIZE_SHIFT		3
131 
132 #ifdef _BIG_ENDIAN
133 #define	DBLK_RTFU_SHIFT(field)	\
134 	(8 * (&((dblk_t *)0)->db_struioflag - &((dblk_t *)0)->field))
135 #else
136 #define	DBLK_RTFU_SHIFT(field)	\
137 	(8 * (&((dblk_t *)0)->field - &((dblk_t *)0)->db_ref))
138 #endif
139 
140 #define	DBLK_RTFU(ref, type, flags, uioflag)	\
141 	(((ref) << DBLK_RTFU_SHIFT(db_ref)) | \
142 	((type) << DBLK_RTFU_SHIFT(db_type)) | \
143 	(((flags) | (ref - 1)) << DBLK_RTFU_SHIFT(db_flags)) | \
144 	((uioflag) << DBLK_RTFU_SHIFT(db_struioflag)))
145 #define	DBLK_RTFU_REF_MASK	(DBLK_REFMAX << DBLK_RTFU_SHIFT(db_ref))
146 #define	DBLK_RTFU_WORD(dbp)	(*((uint32_t *)&(dbp)->db_ref))
147 #define	MBLK_BAND_FLAG_WORD(mp)	(*((uint32_t *)&(mp)->b_band))
148 
149 static size_t dblk_sizes[] = {
150 #ifdef _LP64
151 	16, 80, 144, 208, 272, 336, 528, 1040, 1488, 1936, 2576, 3856,
152 	8192, 12048, 16384, 20240, 24576, 28432, 32768, 36624,
153 	40960, 44816, 49152, 53008, 57344, 61200, 65536, 69392,
154 #else
155 	64, 128, 320, 576, 1088, 1536, 1984, 2624, 3904,
156 	8192, 12096, 16384, 20288, 24576, 28480, 32768, 36672,
157 	40960, 44864, 49152, 53056, 57344, 61248, 65536, 69440,
158 #endif
159 	DBLK_MAX_CACHE, 0
160 };
161 
162 static struct kmem_cache *dblk_cache[DBLK_MAX_CACHE / DBLK_MIN_SIZE];
163 static struct kmem_cache *mblk_cache;
164 static struct kmem_cache *dblk_esb_cache;
165 static struct kmem_cache *fthdr_cache;
166 static struct kmem_cache *ftblk_cache;
167 
168 static void dblk_lastfree(mblk_t *mp, dblk_t *dbp);
169 static mblk_t *allocb_oversize(size_t size, int flags);
170 static int allocb_tryhard_fails;
171 static void frnop_func(void *arg);
172 frtn_t frnop = { frnop_func };
173 static void bcache_dblk_lastfree(mblk_t *mp, dblk_t *dbp);
174 
175 static boolean_t rwnext_enter(queue_t *qp);
176 static void rwnext_exit(queue_t *qp);
177 
178 /*
179  * Patchable mblk/dblk kmem_cache flags.
180  */
181 int dblk_kmem_flags = 0;
182 int mblk_kmem_flags = 0;
183 
184 static int
185 dblk_constructor(void *buf, void *cdrarg, int kmflags)
186 {
187 	dblk_t *dbp = buf;
188 	ssize_t msg_size = (ssize_t)cdrarg;
189 	size_t index;
190 
191 	ASSERT(msg_size != 0);
192 
193 	index = (msg_size - 1) >> DBLK_SIZE_SHIFT;
194 
195 	ASSERT(index < (DBLK_MAX_CACHE >> DBLK_SIZE_SHIFT));
196 
197 	if ((dbp->db_mblk = kmem_cache_alloc(mblk_cache, kmflags)) == NULL)
198 		return (-1);
199 	if ((msg_size & PAGEOFFSET) == 0) {
200 		dbp->db_base = kmem_alloc(msg_size, kmflags);
201 		if (dbp->db_base == NULL) {
202 			kmem_cache_free(mblk_cache, dbp->db_mblk);
203 			return (-1);
204 		}
205 	} else {
206 		dbp->db_base = (unsigned char *)&dbp[1];
207 	}
208 
209 	dbp->db_mblk->b_datap = dbp;
210 	dbp->db_cache = dblk_cache[index];
211 	dbp->db_lim = dbp->db_base + msg_size;
212 	dbp->db_free = dbp->db_lastfree = dblk_lastfree;
213 	dbp->db_frtnp = NULL;
214 	dbp->db_fthdr = NULL;
215 	dbp->db_credp = NULL;
216 	dbp->db_cpid = -1;
217 	dbp->db_struioflag = 0;
218 	dbp->db_struioun.cksum.flags = 0;
219 	return (0);
220 }
221 
222 /*ARGSUSED*/
223 static int
224 dblk_esb_constructor(void *buf, void *cdrarg, int kmflags)
225 {
226 	dblk_t *dbp = buf;
227 
228 	if ((dbp->db_mblk = kmem_cache_alloc(mblk_cache, kmflags)) == NULL)
229 		return (-1);
230 	dbp->db_mblk->b_datap = dbp;
231 	dbp->db_cache = dblk_esb_cache;
232 	dbp->db_fthdr = NULL;
233 	dbp->db_credp = NULL;
234 	dbp->db_cpid = -1;
235 	dbp->db_struioflag = 0;
236 	dbp->db_struioun.cksum.flags = 0;
237 	return (0);
238 }
239 
240 static int
241 bcache_dblk_constructor(void *buf, void *cdrarg, int kmflags)
242 {
243 	dblk_t *dbp = buf;
244 	bcache_t *bcp = cdrarg;
245 
246 	if ((dbp->db_mblk = kmem_cache_alloc(mblk_cache, kmflags)) == NULL)
247 		return (-1);
248 
249 	dbp->db_base = kmem_cache_alloc(bcp->buffer_cache, kmflags);
250 	if (dbp->db_base == NULL) {
251 		kmem_cache_free(mblk_cache, dbp->db_mblk);
252 		return (-1);
253 	}
254 
255 	dbp->db_mblk->b_datap = dbp;
256 	dbp->db_cache = (void *)bcp;
257 	dbp->db_lim = dbp->db_base + bcp->size;
258 	dbp->db_free = dbp->db_lastfree = bcache_dblk_lastfree;
259 	dbp->db_frtnp = NULL;
260 	dbp->db_fthdr = NULL;
261 	dbp->db_credp = NULL;
262 	dbp->db_cpid = -1;
263 	dbp->db_struioflag = 0;
264 	dbp->db_struioun.cksum.flags = 0;
265 	return (0);
266 }
267 
268 /*ARGSUSED*/
269 static void
270 dblk_destructor(void *buf, void *cdrarg)
271 {
272 	dblk_t *dbp = buf;
273 	ssize_t msg_size = (ssize_t)cdrarg;
274 
275 	ASSERT(dbp->db_mblk->b_datap == dbp);
276 	ASSERT(msg_size != 0);
277 	ASSERT(dbp->db_struioflag == 0);
278 	ASSERT(dbp->db_struioun.cksum.flags == 0);
279 
280 	if ((msg_size & PAGEOFFSET) == 0) {
281 		kmem_free(dbp->db_base, msg_size);
282 	}
283 
284 	kmem_cache_free(mblk_cache, dbp->db_mblk);
285 }
286 
287 static void
288 bcache_dblk_destructor(void *buf, void *cdrarg)
289 {
290 	dblk_t *dbp = buf;
291 	bcache_t *bcp = cdrarg;
292 
293 	kmem_cache_free(bcp->buffer_cache, dbp->db_base);
294 
295 	ASSERT(dbp->db_mblk->b_datap == dbp);
296 	ASSERT(dbp->db_struioflag == 0);
297 	ASSERT(dbp->db_struioun.cksum.flags == 0);
298 
299 	kmem_cache_free(mblk_cache, dbp->db_mblk);
300 }
301 
302 /* ARGSUSED */
303 static int
304 ftblk_constructor(void *buf, void *cdrarg, int kmflags)
305 {
306 	ftblk_t *fbp = buf;
307 	int i;
308 
309 	bzero(fbp, sizeof (ftblk_t));
310 	if (str_ftstack != 0) {
311 		for (i = 0; i < FTBLK_EVNTS; i++)
312 			fbp->ev[i].stk = kmem_alloc(sizeof (ftstk_t), kmflags);
313 	}
314 
315 	return (0);
316 }
317 
318 /* ARGSUSED */
319 static void
320 ftblk_destructor(void *buf, void *cdrarg)
321 {
322 	ftblk_t *fbp = buf;
323 	int i;
324 
325 	if (str_ftstack != 0) {
326 		for (i = 0; i < FTBLK_EVNTS; i++) {
327 			if (fbp->ev[i].stk != NULL) {
328 				kmem_free(fbp->ev[i].stk, sizeof (ftstk_t));
329 				fbp->ev[i].stk = NULL;
330 			}
331 		}
332 	}
333 }
334 
335 static int
336 fthdr_constructor(void *buf, void *cdrarg, int kmflags)
337 {
338 	fthdr_t *fhp = buf;
339 
340 	return (ftblk_constructor(&fhp->first, cdrarg, kmflags));
341 }
342 
343 static void
344 fthdr_destructor(void *buf, void *cdrarg)
345 {
346 	fthdr_t *fhp = buf;
347 
348 	ftblk_destructor(&fhp->first, cdrarg);
349 }
350 
351 void
352 streams_msg_init(void)
353 {
354 	char name[40];
355 	size_t size;
356 	size_t lastsize = DBLK_MIN_SIZE;
357 	size_t *sizep;
358 	struct kmem_cache *cp;
359 	size_t tot_size;
360 	int offset;
361 
362 	mblk_cache = kmem_cache_create("streams_mblk", sizeof (mblk_t), 32,
363 	    NULL, NULL, NULL, NULL, NULL, mblk_kmem_flags);
364 
365 	for (sizep = dblk_sizes; (size = *sizep) != 0; sizep++) {
366 
367 		if ((offset = (size & PAGEOFFSET)) != 0) {
368 			/*
369 			 * We are in the middle of a page, dblk should
370 			 * be allocated on the same page
371 			 */
372 			tot_size = size + sizeof (dblk_t);
373 			ASSERT((offset + sizeof (dblk_t) + sizeof (kmem_slab_t))
374 			    < PAGESIZE);
375 			ASSERT((tot_size & (DBLK_CACHE_ALIGN - 1)) == 0);
376 
377 		} else {
378 
379 			/*
380 			 * buf size is multiple of page size, dblk and
381 			 * buffer are allocated separately.
382 			 */
383 
384 			ASSERT((size & (DBLK_CACHE_ALIGN - 1)) == 0);
385 			tot_size = sizeof (dblk_t);
386 		}
387 
388 		(void) sprintf(name, "streams_dblk_%ld", size);
389 		cp = kmem_cache_create(name, tot_size, DBLK_CACHE_ALIGN,
390 		    dblk_constructor, dblk_destructor, NULL, (void *)(size),
391 		    NULL, dblk_kmem_flags);
392 
393 		while (lastsize <= size) {
394 			dblk_cache[(lastsize - 1) >> DBLK_SIZE_SHIFT] = cp;
395 			lastsize += DBLK_MIN_SIZE;
396 		}
397 	}
398 
399 	dblk_esb_cache = kmem_cache_create("streams_dblk_esb", sizeof (dblk_t),
400 	    DBLK_CACHE_ALIGN, dblk_esb_constructor, dblk_destructor, NULL,
401 	    (void *)sizeof (dblk_t), NULL, dblk_kmem_flags);
402 	fthdr_cache = kmem_cache_create("streams_fthdr", sizeof (fthdr_t), 32,
403 	    fthdr_constructor, fthdr_destructor, NULL, NULL, NULL, 0);
404 	ftblk_cache = kmem_cache_create("streams_ftblk", sizeof (ftblk_t), 32,
405 	    ftblk_constructor, ftblk_destructor, NULL, NULL, NULL, 0);
406 
407 	/* Initialize Multidata caches */
408 	mmd_init();
409 
410 	/* initialize throttling queue for esballoc */
411 	esballoc_queue_init();
412 }
413 
414 /*ARGSUSED*/
415 mblk_t *
416 allocb(size_t size, uint_t pri)
417 {
418 	dblk_t *dbp;
419 	mblk_t *mp;
420 	size_t index;
421 
422 	index =  (size - 1)  >> DBLK_SIZE_SHIFT;
423 
424 	if (index >= (DBLK_MAX_CACHE >> DBLK_SIZE_SHIFT)) {
425 		if (size != 0) {
426 			mp = allocb_oversize(size, KM_NOSLEEP);
427 			goto out;
428 		}
429 		index = 0;
430 	}
431 
432 	if ((dbp = kmem_cache_alloc(dblk_cache[index], KM_NOSLEEP)) == NULL) {
433 		mp = NULL;
434 		goto out;
435 	}
436 
437 	mp = dbp->db_mblk;
438 	DBLK_RTFU_WORD(dbp) = DBLK_RTFU(1, M_DATA, 0, 0);
439 	mp->b_next = mp->b_prev = mp->b_cont = NULL;
440 	mp->b_rptr = mp->b_wptr = dbp->db_base;
441 	mp->b_queue = NULL;
442 	MBLK_BAND_FLAG_WORD(mp) = 0;
443 	STR_FTALLOC(&dbp->db_fthdr, FTEV_ALLOCB, size);
444 out:
445 	FTRACE_1("allocb(): mp=0x%p", (uintptr_t)mp);
446 
447 	return (mp);
448 }
449 
450 /*
451  * Allocate an mblk taking db_credp and db_cpid from the template.
452  * Allow the cred to be NULL.
453  */
454 mblk_t *
455 allocb_tmpl(size_t size, const mblk_t *tmpl)
456 {
457 	mblk_t *mp = allocb(size, 0);
458 
459 	if (mp != NULL) {
460 		dblk_t *src = tmpl->b_datap;
461 		dblk_t *dst = mp->b_datap;
462 		cred_t *cr = src->db_credp;
463 
464 		if (cr != NULL)
465 			crhold(dst->db_credp = cr);
466 		dst->db_cpid = src->db_cpid;
467 		dst->db_type = src->db_type;
468 	}
469 	return (mp);
470 }
471 
472 mblk_t *
473 allocb_cred(size_t size, cred_t *cr, pid_t cpid)
474 {
475 	mblk_t *mp = allocb(size, 0);
476 
477 	ASSERT(cr != NULL);
478 	if (mp != NULL) {
479 		dblk_t *dbp = mp->b_datap;
480 
481 		crhold(dbp->db_credp = cr);
482 		dbp->db_cpid = cpid;
483 	}
484 	return (mp);
485 }
486 
487 mblk_t *
488 allocb_cred_wait(size_t size, uint_t flags, int *error, cred_t *cr, pid_t cpid)
489 {
490 	mblk_t *mp = allocb_wait(size, 0, flags, error);
491 
492 	ASSERT(cr != NULL);
493 	if (mp != NULL) {
494 		dblk_t *dbp = mp->b_datap;
495 
496 		crhold(dbp->db_credp = cr);
497 		dbp->db_cpid = cpid;
498 	}
499 
500 	return (mp);
501 }
502 
503 /*
504  * Extract the db_cred (and optionally db_cpid) from a message.
505  * We find the first mblk which has a non-NULL db_cred and use that.
506  * If none found we return NULL.
507  * Does NOT get a hold on the cred.
508  */
509 cred_t *
510 msg_getcred(const mblk_t *mp, pid_t *cpidp)
511 {
512 	cred_t *cr = NULL;
513 	cred_t *cr2;
514 
515 	while (mp != NULL) {
516 		dblk_t *dbp = mp->b_datap;
517 
518 		cr = dbp->db_credp;
519 		if (cr == NULL) {
520 			mp = mp->b_cont;
521 			continue;
522 		}
523 		if (cpidp != NULL)
524 			*cpidp = dbp->db_cpid;
525 
526 #ifdef DEBUG
527 		/*
528 		 * Normally there should at most one db_credp in a message.
529 		 * But if there are multiple (as in the case of some M_IOC*
530 		 * and some internal messages in TCP/IP bind logic) then
531 		 * they must be identical in the normal case.
532 		 * However, a socket can be shared between different uids
533 		 * in which case data queued in TCP would be from different
534 		 * creds. Thus we can only assert for the zoneid being the
535 		 * same. Due to Multi-level Level Ports for TX, some
536 		 * cred_t can have a NULL cr_zone, and we skip the comparison
537 		 * in that case.
538 		 */
539 		cr2 = msg_getcred(mp->b_cont, NULL);
540 		if (cr2 != NULL) {
541 			DTRACE_PROBE2(msg__getcred,
542 			    cred_t *, cr, cred_t *, cr2);
543 			ASSERT(crgetzoneid(cr) == crgetzoneid(cr2) ||
544 			    crgetzone(cr) == NULL ||
545 			    crgetzone(cr2) == NULL);
546 		}
547 #endif
548 		return (cr);
549 	}
550 	if (cpidp != NULL)
551 		*cpidp = NOPID;
552 	return (NULL);
553 }
554 
555 /*
556  * Variant of msg_getcred which, when a cred is found
557  * 1. Returns with a hold on the cred
558  * 2. Clears the first cred in the mblk.
559  * This is more efficient to use than a msg_getcred() + crhold() when
560  * the message is freed after the cred has been extracted.
561  *
562  * The caller is responsible for ensuring that there is no other reference
563  * on the message since db_credp can not be cleared when there are other
564  * references.
565  */
566 cred_t *
567 msg_extractcred(mblk_t *mp, pid_t *cpidp)
568 {
569 	cred_t *cr = NULL;
570 	cred_t *cr2;
571 
572 	while (mp != NULL) {
573 		dblk_t *dbp = mp->b_datap;
574 
575 		cr = dbp->db_credp;
576 		if (cr == NULL) {
577 			mp = mp->b_cont;
578 			continue;
579 		}
580 		ASSERT(dbp->db_ref == 1);
581 		dbp->db_credp = NULL;
582 		if (cpidp != NULL)
583 			*cpidp = dbp->db_cpid;
584 #ifdef DEBUG
585 		/*
586 		 * Normally there should at most one db_credp in a message.
587 		 * But if there are multiple (as in the case of some M_IOC*
588 		 * and some internal messages in TCP/IP bind logic) then
589 		 * they must be identical in the normal case.
590 		 * However, a socket can be shared between different uids
591 		 * in which case data queued in TCP would be from different
592 		 * creds. Thus we can only assert for the zoneid being the
593 		 * same. Due to Multi-level Level Ports for TX, some
594 		 * cred_t can have a NULL cr_zone, and we skip the comparison
595 		 * in that case.
596 		 */
597 		cr2 = msg_getcred(mp->b_cont, NULL);
598 		if (cr2 != NULL) {
599 			DTRACE_PROBE2(msg__extractcred,
600 			    cred_t *, cr, cred_t *, cr2);
601 			ASSERT(crgetzoneid(cr) == crgetzoneid(cr2) ||
602 			    crgetzone(cr) == NULL ||
603 			    crgetzone(cr2) == NULL);
604 		}
605 #endif
606 		return (cr);
607 	}
608 	return (NULL);
609 }
610 /*
611  * Get the label for a message. Uses the first mblk in the message
612  * which has a non-NULL db_credp.
613  * Returns NULL if there is no credp.
614  */
615 extern struct ts_label_s *
616 msg_getlabel(const mblk_t *mp)
617 {
618 	cred_t *cr = msg_getcred(mp, NULL);
619 
620 	if (cr == NULL)
621 		return (NULL);
622 
623 	return (crgetlabel(cr));
624 }
625 
626 void
627 freeb(mblk_t *mp)
628 {
629 	dblk_t *dbp = mp->b_datap;
630 
631 	ASSERT(dbp->db_ref > 0);
632 	ASSERT(mp->b_next == NULL && mp->b_prev == NULL);
633 	FTRACE_1("freeb(): mp=0x%lx", (uintptr_t)mp);
634 
635 	STR_FTEVENT_MBLK(mp, caller(), FTEV_FREEB, dbp->db_ref);
636 
637 	dbp->db_free(mp, dbp);
638 }
639 
640 void
641 freemsg(mblk_t *mp)
642 {
643 	FTRACE_1("freemsg(): mp=0x%lx", (uintptr_t)mp);
644 	while (mp) {
645 		dblk_t *dbp = mp->b_datap;
646 		mblk_t *mp_cont = mp->b_cont;
647 
648 		ASSERT(dbp->db_ref > 0);
649 		ASSERT(mp->b_next == NULL && mp->b_prev == NULL);
650 
651 		STR_FTEVENT_MBLK(mp, caller(), FTEV_FREEB, dbp->db_ref);
652 
653 		dbp->db_free(mp, dbp);
654 		mp = mp_cont;
655 	}
656 }
657 
658 /*
659  * Reallocate a block for another use.  Try hard to use the old block.
660  * If the old data is wanted (copy), leave b_wptr at the end of the data,
661  * otherwise return b_wptr = b_rptr.
662  *
663  * This routine is private and unstable.
664  */
665 mblk_t	*
666 reallocb(mblk_t *mp, size_t size, uint_t copy)
667 {
668 	mblk_t		*mp1;
669 	unsigned char	*old_rptr;
670 	ptrdiff_t	cur_size;
671 
672 	if (mp == NULL)
673 		return (allocb(size, BPRI_HI));
674 
675 	cur_size = mp->b_wptr - mp->b_rptr;
676 	old_rptr = mp->b_rptr;
677 
678 	ASSERT(mp->b_datap->db_ref != 0);
679 
680 	if (mp->b_datap->db_ref == 1 && MBLKSIZE(mp) >= size) {
681 		/*
682 		 * If the data is wanted and it will fit where it is, no
683 		 * work is required.
684 		 */
685 		if (copy && mp->b_datap->db_lim - mp->b_rptr >= size)
686 			return (mp);
687 
688 		mp->b_wptr = mp->b_rptr = mp->b_datap->db_base;
689 		mp1 = mp;
690 	} else if ((mp1 = allocb_tmpl(size, mp)) != NULL) {
691 		/* XXX other mp state could be copied too, db_flags ... ? */
692 		mp1->b_cont = mp->b_cont;
693 	} else {
694 		return (NULL);
695 	}
696 
697 	if (copy) {
698 		bcopy(old_rptr, mp1->b_rptr, cur_size);
699 		mp1->b_wptr = mp1->b_rptr + cur_size;
700 	}
701 
702 	if (mp != mp1)
703 		freeb(mp);
704 
705 	return (mp1);
706 }
707 
708 static void
709 dblk_lastfree(mblk_t *mp, dblk_t *dbp)
710 {
711 	ASSERT(dbp->db_mblk == mp);
712 	if (dbp->db_fthdr != NULL)
713 		str_ftfree(dbp);
714 
715 	/* set credp and projid to be 'unspecified' before returning to cache */
716 	if (dbp->db_credp != NULL) {
717 		crfree(dbp->db_credp);
718 		dbp->db_credp = NULL;
719 	}
720 	dbp->db_cpid = -1;
721 
722 	/* Reset the struioflag and the checksum flag fields */
723 	dbp->db_struioflag = 0;
724 	dbp->db_struioun.cksum.flags = 0;
725 
726 	/* and the COOKED and/or UIOA flag(s) */
727 	dbp->db_flags &= ~(DBLK_COOKED | DBLK_UIOA);
728 
729 	kmem_cache_free(dbp->db_cache, dbp);
730 }
731 
732 static void
733 dblk_decref(mblk_t *mp, dblk_t *dbp)
734 {
735 	if (dbp->db_ref != 1) {
736 		uint32_t rtfu = atomic_add_32_nv(&DBLK_RTFU_WORD(dbp),
737 		    -(1 << DBLK_RTFU_SHIFT(db_ref)));
738 		/*
739 		 * atomic_add_32_nv() just decremented db_ref, so we no longer
740 		 * have a reference to the dblk, which means another thread
741 		 * could free it.  Therefore we cannot examine the dblk to
742 		 * determine whether ours was the last reference.  Instead,
743 		 * we extract the new and minimum reference counts from rtfu.
744 		 * Note that all we're really saying is "if (ref != refmin)".
745 		 */
746 		if (((rtfu >> DBLK_RTFU_SHIFT(db_ref)) & DBLK_REFMAX) !=
747 		    ((rtfu >> DBLK_RTFU_SHIFT(db_flags)) & DBLK_REFMIN)) {
748 			kmem_cache_free(mblk_cache, mp);
749 			return;
750 		}
751 	}
752 	dbp->db_mblk = mp;
753 	dbp->db_free = dbp->db_lastfree;
754 	dbp->db_lastfree(mp, dbp);
755 }
756 
757 mblk_t *
758 dupb(mblk_t *mp)
759 {
760 	dblk_t *dbp = mp->b_datap;
761 	mblk_t *new_mp;
762 	uint32_t oldrtfu, newrtfu;
763 
764 	if ((new_mp = kmem_cache_alloc(mblk_cache, KM_NOSLEEP)) == NULL)
765 		goto out;
766 
767 	new_mp->b_next = new_mp->b_prev = new_mp->b_cont = NULL;
768 	new_mp->b_rptr = mp->b_rptr;
769 	new_mp->b_wptr = mp->b_wptr;
770 	new_mp->b_datap = dbp;
771 	new_mp->b_queue = NULL;
772 	MBLK_BAND_FLAG_WORD(new_mp) = MBLK_BAND_FLAG_WORD(mp);
773 
774 	STR_FTEVENT_MBLK(mp, caller(), FTEV_DUPB, dbp->db_ref);
775 
776 	dbp->db_free = dblk_decref;
777 	do {
778 		ASSERT(dbp->db_ref > 0);
779 		oldrtfu = DBLK_RTFU_WORD(dbp);
780 		newrtfu = oldrtfu + (1 << DBLK_RTFU_SHIFT(db_ref));
781 		/*
782 		 * If db_ref is maxed out we can't dup this message anymore.
783 		 */
784 		if ((oldrtfu & DBLK_RTFU_REF_MASK) == DBLK_RTFU_REF_MASK) {
785 			kmem_cache_free(mblk_cache, new_mp);
786 			new_mp = NULL;
787 			goto out;
788 		}
789 	} while (cas32(&DBLK_RTFU_WORD(dbp), oldrtfu, newrtfu) != oldrtfu);
790 
791 out:
792 	FTRACE_1("dupb(): new_mp=0x%lx", (uintptr_t)new_mp);
793 	return (new_mp);
794 }
795 
796 static void
797 dblk_lastfree_desb(mblk_t *mp, dblk_t *dbp)
798 {
799 	frtn_t *frp = dbp->db_frtnp;
800 
801 	ASSERT(dbp->db_mblk == mp);
802 	frp->free_func(frp->free_arg);
803 	if (dbp->db_fthdr != NULL)
804 		str_ftfree(dbp);
805 
806 	/* set credp and projid to be 'unspecified' before returning to cache */
807 	if (dbp->db_credp != NULL) {
808 		crfree(dbp->db_credp);
809 		dbp->db_credp = NULL;
810 	}
811 	dbp->db_cpid = -1;
812 	dbp->db_struioflag = 0;
813 	dbp->db_struioun.cksum.flags = 0;
814 
815 	kmem_cache_free(dbp->db_cache, dbp);
816 }
817 
818 /*ARGSUSED*/
819 static void
820 frnop_func(void *arg)
821 {
822 }
823 
824 /*
825  * Generic esballoc used to implement the four flavors: [d]esballoc[a].
826  */
827 static mblk_t *
828 gesballoc(unsigned char *base, size_t size, uint32_t db_rtfu, frtn_t *frp,
829 	void (*lastfree)(mblk_t *, dblk_t *), int kmflags)
830 {
831 	dblk_t *dbp;
832 	mblk_t *mp;
833 
834 	ASSERT(base != NULL && frp != NULL);
835 
836 	if ((dbp = kmem_cache_alloc(dblk_esb_cache, kmflags)) == NULL) {
837 		mp = NULL;
838 		goto out;
839 	}
840 
841 	mp = dbp->db_mblk;
842 	dbp->db_base = base;
843 	dbp->db_lim = base + size;
844 	dbp->db_free = dbp->db_lastfree = lastfree;
845 	dbp->db_frtnp = frp;
846 	DBLK_RTFU_WORD(dbp) = db_rtfu;
847 	mp->b_next = mp->b_prev = mp->b_cont = NULL;
848 	mp->b_rptr = mp->b_wptr = base;
849 	mp->b_queue = NULL;
850 	MBLK_BAND_FLAG_WORD(mp) = 0;
851 
852 out:
853 	FTRACE_1("gesballoc(): mp=0x%lx", (uintptr_t)mp);
854 	return (mp);
855 }
856 
857 /*ARGSUSED*/
858 mblk_t *
859 esballoc(unsigned char *base, size_t size, uint_t pri, frtn_t *frp)
860 {
861 	mblk_t *mp;
862 
863 	/*
864 	 * Note that this is structured to allow the common case (i.e.
865 	 * STREAMS flowtracing disabled) to call gesballoc() with tail
866 	 * call optimization.
867 	 */
868 	if (!str_ftnever) {
869 		mp = gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
870 		    frp, freebs_enqueue, KM_NOSLEEP);
871 
872 		if (mp != NULL)
873 			STR_FTALLOC(&DB_FTHDR(mp), FTEV_ESBALLOC, size);
874 		return (mp);
875 	}
876 
877 	return (gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
878 	    frp, freebs_enqueue, KM_NOSLEEP));
879 }
880 
881 /*
882  * Same as esballoc() but sleeps waiting for memory.
883  */
884 /*ARGSUSED*/
885 mblk_t *
886 esballoc_wait(unsigned char *base, size_t size, uint_t pri, frtn_t *frp)
887 {
888 	mblk_t *mp;
889 
890 	/*
891 	 * Note that this is structured to allow the common case (i.e.
892 	 * STREAMS flowtracing disabled) to call gesballoc() with tail
893 	 * call optimization.
894 	 */
895 	if (!str_ftnever) {
896 		mp = gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
897 		    frp, freebs_enqueue, KM_SLEEP);
898 
899 		STR_FTALLOC(&DB_FTHDR(mp), FTEV_ESBALLOC, size);
900 		return (mp);
901 	}
902 
903 	return (gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
904 	    frp, freebs_enqueue, KM_SLEEP));
905 }
906 
907 /*ARGSUSED*/
908 mblk_t *
909 desballoc(unsigned char *base, size_t size, uint_t pri, frtn_t *frp)
910 {
911 	mblk_t *mp;
912 
913 	/*
914 	 * Note that this is structured to allow the common case (i.e.
915 	 * STREAMS flowtracing disabled) to call gesballoc() with tail
916 	 * call optimization.
917 	 */
918 	if (!str_ftnever) {
919 		mp = gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
920 		    frp, dblk_lastfree_desb, KM_NOSLEEP);
921 
922 		if (mp != NULL)
923 			STR_FTALLOC(&DB_FTHDR(mp), FTEV_DESBALLOC, size);
924 		return (mp);
925 	}
926 
927 	return (gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
928 	    frp, dblk_lastfree_desb, KM_NOSLEEP));
929 }
930 
931 /*ARGSUSED*/
932 mblk_t *
933 esballoca(unsigned char *base, size_t size, uint_t pri, frtn_t *frp)
934 {
935 	mblk_t *mp;
936 
937 	/*
938 	 * Note that this is structured to allow the common case (i.e.
939 	 * STREAMS flowtracing disabled) to call gesballoc() with tail
940 	 * call optimization.
941 	 */
942 	if (!str_ftnever) {
943 		mp = gesballoc(base, size, DBLK_RTFU(2, M_DATA, 0, 0),
944 		    frp, freebs_enqueue, KM_NOSLEEP);
945 
946 		if (mp != NULL)
947 			STR_FTALLOC(&DB_FTHDR(mp), FTEV_ESBALLOCA, size);
948 		return (mp);
949 	}
950 
951 	return (gesballoc(base, size, DBLK_RTFU(2, M_DATA, 0, 0),
952 	    frp, freebs_enqueue, KM_NOSLEEP));
953 }
954 
955 /*ARGSUSED*/
956 mblk_t *
957 desballoca(unsigned char *base, size_t size, uint_t pri, frtn_t *frp)
958 {
959 	mblk_t *mp;
960 
961 	/*
962 	 * Note that this is structured to allow the common case (i.e.
963 	 * STREAMS flowtracing disabled) to call gesballoc() with tail
964 	 * call optimization.
965 	 */
966 	if (!str_ftnever) {
967 		mp = gesballoc(base, size, DBLK_RTFU(2, M_DATA, 0, 0),
968 		    frp, dblk_lastfree_desb, KM_NOSLEEP);
969 
970 		if (mp != NULL)
971 			STR_FTALLOC(&DB_FTHDR(mp), FTEV_DESBALLOCA, size);
972 		return (mp);
973 	}
974 
975 	return (gesballoc(base, size, DBLK_RTFU(2, M_DATA, 0, 0),
976 	    frp, dblk_lastfree_desb, KM_NOSLEEP));
977 }
978 
979 static void
980 bcache_dblk_lastfree(mblk_t *mp, dblk_t *dbp)
981 {
982 	bcache_t *bcp = dbp->db_cache;
983 
984 	ASSERT(dbp->db_mblk == mp);
985 	if (dbp->db_fthdr != NULL)
986 		str_ftfree(dbp);
987 
988 	/* set credp and projid to be 'unspecified' before returning to cache */
989 	if (dbp->db_credp != NULL) {
990 		crfree(dbp->db_credp);
991 		dbp->db_credp = NULL;
992 	}
993 	dbp->db_cpid = -1;
994 	dbp->db_struioflag = 0;
995 	dbp->db_struioun.cksum.flags = 0;
996 
997 	mutex_enter(&bcp->mutex);
998 	kmem_cache_free(bcp->dblk_cache, dbp);
999 	bcp->alloc--;
1000 
1001 	if (bcp->alloc == 0 && bcp->destroy != 0) {
1002 		kmem_cache_destroy(bcp->dblk_cache);
1003 		kmem_cache_destroy(bcp->buffer_cache);
1004 		mutex_exit(&bcp->mutex);
1005 		mutex_destroy(&bcp->mutex);
1006 		kmem_free(bcp, sizeof (bcache_t));
1007 	} else {
1008 		mutex_exit(&bcp->mutex);
1009 	}
1010 }
1011 
1012 bcache_t *
1013 bcache_create(char *name, size_t size, uint_t align)
1014 {
1015 	bcache_t *bcp;
1016 	char buffer[255];
1017 
1018 	ASSERT((align & (align - 1)) == 0);
1019 
1020 	if ((bcp = kmem_alloc(sizeof (bcache_t), KM_NOSLEEP)) == NULL)
1021 		return (NULL);
1022 
1023 	bcp->size = size;
1024 	bcp->align = align;
1025 	bcp->alloc = 0;
1026 	bcp->destroy = 0;
1027 
1028 	mutex_init(&bcp->mutex, NULL, MUTEX_DRIVER, NULL);
1029 
1030 	(void) sprintf(buffer, "%s_buffer_cache", name);
1031 	bcp->buffer_cache = kmem_cache_create(buffer, size, align, NULL, NULL,
1032 	    NULL, NULL, NULL, 0);
1033 	(void) sprintf(buffer, "%s_dblk_cache", name);
1034 	bcp->dblk_cache = kmem_cache_create(buffer, sizeof (dblk_t),
1035 	    DBLK_CACHE_ALIGN, bcache_dblk_constructor, bcache_dblk_destructor,
1036 	    NULL, (void *)bcp, NULL, 0);
1037 
1038 	return (bcp);
1039 }
1040 
1041 void
1042 bcache_destroy(bcache_t *bcp)
1043 {
1044 	ASSERT(bcp != NULL);
1045 
1046 	mutex_enter(&bcp->mutex);
1047 	if (bcp->alloc == 0) {
1048 		kmem_cache_destroy(bcp->dblk_cache);
1049 		kmem_cache_destroy(bcp->buffer_cache);
1050 		mutex_exit(&bcp->mutex);
1051 		mutex_destroy(&bcp->mutex);
1052 		kmem_free(bcp, sizeof (bcache_t));
1053 	} else {
1054 		bcp->destroy++;
1055 		mutex_exit(&bcp->mutex);
1056 	}
1057 }
1058 
1059 /*ARGSUSED*/
1060 mblk_t *
1061 bcache_allocb(bcache_t *bcp, uint_t pri)
1062 {
1063 	dblk_t *dbp;
1064 	mblk_t *mp = NULL;
1065 
1066 	ASSERT(bcp != NULL);
1067 
1068 	mutex_enter(&bcp->mutex);
1069 	if (bcp->destroy != 0) {
1070 		mutex_exit(&bcp->mutex);
1071 		goto out;
1072 	}
1073 
1074 	if ((dbp = kmem_cache_alloc(bcp->dblk_cache, KM_NOSLEEP)) == NULL) {
1075 		mutex_exit(&bcp->mutex);
1076 		goto out;
1077 	}
1078 	bcp->alloc++;
1079 	mutex_exit(&bcp->mutex);
1080 
1081 	ASSERT(((uintptr_t)(dbp->db_base) & (bcp->align - 1)) == 0);
1082 
1083 	mp = dbp->db_mblk;
1084 	DBLK_RTFU_WORD(dbp) = DBLK_RTFU(1, M_DATA, 0, 0);
1085 	mp->b_next = mp->b_prev = mp->b_cont = NULL;
1086 	mp->b_rptr = mp->b_wptr = dbp->db_base;
1087 	mp->b_queue = NULL;
1088 	MBLK_BAND_FLAG_WORD(mp) = 0;
1089 	STR_FTALLOC(&dbp->db_fthdr, FTEV_BCALLOCB, bcp->size);
1090 out:
1091 	FTRACE_1("bcache_allocb(): mp=0x%p", (uintptr_t)mp);
1092 
1093 	return (mp);
1094 }
1095 
1096 static void
1097 dblk_lastfree_oversize(mblk_t *mp, dblk_t *dbp)
1098 {
1099 	ASSERT(dbp->db_mblk == mp);
1100 	if (dbp->db_fthdr != NULL)
1101 		str_ftfree(dbp);
1102 
1103 	/* set credp and projid to be 'unspecified' before returning to cache */
1104 	if (dbp->db_credp != NULL) {
1105 		crfree(dbp->db_credp);
1106 		dbp->db_credp = NULL;
1107 	}
1108 	dbp->db_cpid = -1;
1109 	dbp->db_struioflag = 0;
1110 	dbp->db_struioun.cksum.flags = 0;
1111 
1112 	kmem_free(dbp->db_base, dbp->db_lim - dbp->db_base);
1113 	kmem_cache_free(dbp->db_cache, dbp);
1114 }
1115 
1116 static mblk_t *
1117 allocb_oversize(size_t size, int kmflags)
1118 {
1119 	mblk_t *mp;
1120 	void *buf;
1121 
1122 	size = P2ROUNDUP(size, DBLK_CACHE_ALIGN);
1123 	if ((buf = kmem_alloc(size, kmflags)) == NULL)
1124 		return (NULL);
1125 	if ((mp = gesballoc(buf, size, DBLK_RTFU(1, M_DATA, 0, 0),
1126 	    &frnop, dblk_lastfree_oversize, kmflags)) == NULL)
1127 		kmem_free(buf, size);
1128 
1129 	if (mp != NULL)
1130 		STR_FTALLOC(&DB_FTHDR(mp), FTEV_ALLOCBIG, size);
1131 
1132 	return (mp);
1133 }
1134 
1135 mblk_t *
1136 allocb_tryhard(size_t target_size)
1137 {
1138 	size_t size;
1139 	mblk_t *bp;
1140 
1141 	for (size = target_size; size < target_size + 512;
1142 	    size += DBLK_CACHE_ALIGN)
1143 		if ((bp = allocb(size, BPRI_HI)) != NULL)
1144 			return (bp);
1145 	allocb_tryhard_fails++;
1146 	return (NULL);
1147 }
1148 
1149 /*
1150  * This routine is consolidation private for STREAMS internal use
1151  * This routine may only be called from sync routines (i.e., not
1152  * from put or service procedures).  It is located here (rather
1153  * than strsubr.c) so that we don't have to expose all of the
1154  * allocb() implementation details in header files.
1155  */
1156 mblk_t *
1157 allocb_wait(size_t size, uint_t pri, uint_t flags, int *error)
1158 {
1159 	dblk_t *dbp;
1160 	mblk_t *mp;
1161 	size_t index;
1162 
1163 	index = (size -1) >> DBLK_SIZE_SHIFT;
1164 
1165 	if (flags & STR_NOSIG) {
1166 		if (index >= (DBLK_MAX_CACHE >> DBLK_SIZE_SHIFT)) {
1167 			if (size != 0) {
1168 				mp = allocb_oversize(size, KM_SLEEP);
1169 				FTRACE_1("allocb_wait (NOSIG): mp=0x%lx",
1170 				    (uintptr_t)mp);
1171 				return (mp);
1172 			}
1173 			index = 0;
1174 		}
1175 
1176 		dbp = kmem_cache_alloc(dblk_cache[index], KM_SLEEP);
1177 		mp = dbp->db_mblk;
1178 		DBLK_RTFU_WORD(dbp) = DBLK_RTFU(1, M_DATA, 0, 0);
1179 		mp->b_next = mp->b_prev = mp->b_cont = NULL;
1180 		mp->b_rptr = mp->b_wptr = dbp->db_base;
1181 		mp->b_queue = NULL;
1182 		MBLK_BAND_FLAG_WORD(mp) = 0;
1183 		STR_FTALLOC(&DB_FTHDR(mp), FTEV_ALLOCBW, size);
1184 
1185 		FTRACE_1("allocb_wait (NOSIG): mp=0x%lx", (uintptr_t)mp);
1186 
1187 	} else {
1188 		while ((mp = allocb(size, pri)) == NULL) {
1189 			if ((*error = strwaitbuf(size, BPRI_HI)) != 0)
1190 				return (NULL);
1191 		}
1192 	}
1193 
1194 	return (mp);
1195 }
1196 
1197 /*
1198  * Call function 'func' with 'arg' when a class zero block can
1199  * be allocated with priority 'pri'.
1200  */
1201 bufcall_id_t
1202 esbbcall(uint_t pri, void (*func)(void *), void *arg)
1203 {
1204 	return (bufcall(1, pri, func, arg));
1205 }
1206 
1207 /*
1208  * Allocates an iocblk (M_IOCTL) block. Properly sets the credentials
1209  * ioc_id, rval and error of the struct ioctl to set up an ioctl call.
1210  * This provides consistency for all internal allocators of ioctl.
1211  */
1212 mblk_t *
1213 mkiocb(uint_t cmd)
1214 {
1215 	struct iocblk	*ioc;
1216 	mblk_t		*mp;
1217 
1218 	/*
1219 	 * Allocate enough space for any of the ioctl related messages.
1220 	 */
1221 	if ((mp = allocb(sizeof (union ioctypes), BPRI_MED)) == NULL)
1222 		return (NULL);
1223 
1224 	bzero(mp->b_rptr, sizeof (union ioctypes));
1225 
1226 	/*
1227 	 * Set the mblk_t information and ptrs correctly.
1228 	 */
1229 	mp->b_wptr += sizeof (struct iocblk);
1230 	mp->b_datap->db_type = M_IOCTL;
1231 
1232 	/*
1233 	 * Fill in the fields.
1234 	 */
1235 	ioc		= (struct iocblk *)mp->b_rptr;
1236 	ioc->ioc_cmd	= cmd;
1237 	ioc->ioc_cr	= kcred;
1238 	ioc->ioc_id	= getiocseqno();
1239 	ioc->ioc_flag	= IOC_NATIVE;
1240 	return (mp);
1241 }
1242 
1243 /*
1244  * test if block of given size can be allocated with a request of
1245  * the given priority.
1246  * 'pri' is no longer used, but is retained for compatibility.
1247  */
1248 /* ARGSUSED */
1249 int
1250 testb(size_t size, uint_t pri)
1251 {
1252 	return ((size + sizeof (dblk_t)) <= kmem_avail());
1253 }
1254 
1255 /*
1256  * Call function 'func' with argument 'arg' when there is a reasonably
1257  * good chance that a block of size 'size' can be allocated.
1258  * 'pri' is no longer used, but is retained for compatibility.
1259  */
1260 /* ARGSUSED */
1261 bufcall_id_t
1262 bufcall(size_t size, uint_t pri, void (*func)(void *), void *arg)
1263 {
1264 	static long bid = 1;	/* always odd to save checking for zero */
1265 	bufcall_id_t bc_id;
1266 	struct strbufcall *bcp;
1267 
1268 	if ((bcp = kmem_alloc(sizeof (strbufcall_t), KM_NOSLEEP)) == NULL)
1269 		return (0);
1270 
1271 	bcp->bc_func = func;
1272 	bcp->bc_arg = arg;
1273 	bcp->bc_size = size;
1274 	bcp->bc_next = NULL;
1275 	bcp->bc_executor = NULL;
1276 
1277 	mutex_enter(&strbcall_lock);
1278 	/*
1279 	 * After bcp is linked into strbcalls and strbcall_lock is dropped there
1280 	 * should be no references to bcp since it may be freed by
1281 	 * runbufcalls(). Since bcp_id field is returned, we save its value in
1282 	 * the local var.
1283 	 */
1284 	bc_id = bcp->bc_id = (bufcall_id_t)(bid += 2);	/* keep it odd */
1285 
1286 	/*
1287 	 * add newly allocated stream event to existing
1288 	 * linked list of events.
1289 	 */
1290 	if (strbcalls.bc_head == NULL) {
1291 		strbcalls.bc_head = strbcalls.bc_tail = bcp;
1292 	} else {
1293 		strbcalls.bc_tail->bc_next = bcp;
1294 		strbcalls.bc_tail = bcp;
1295 	}
1296 
1297 	cv_signal(&strbcall_cv);
1298 	mutex_exit(&strbcall_lock);
1299 	return (bc_id);
1300 }
1301 
1302 /*
1303  * Cancel a bufcall request.
1304  */
1305 void
1306 unbufcall(bufcall_id_t id)
1307 {
1308 	strbufcall_t *bcp, *pbcp;
1309 
1310 	mutex_enter(&strbcall_lock);
1311 again:
1312 	pbcp = NULL;
1313 	for (bcp = strbcalls.bc_head; bcp; bcp = bcp->bc_next) {
1314 		if (id == bcp->bc_id)
1315 			break;
1316 		pbcp = bcp;
1317 	}
1318 	if (bcp) {
1319 		if (bcp->bc_executor != NULL) {
1320 			if (bcp->bc_executor != curthread) {
1321 				cv_wait(&bcall_cv, &strbcall_lock);
1322 				goto again;
1323 			}
1324 		} else {
1325 			if (pbcp)
1326 				pbcp->bc_next = bcp->bc_next;
1327 			else
1328 				strbcalls.bc_head = bcp->bc_next;
1329 			if (bcp == strbcalls.bc_tail)
1330 				strbcalls.bc_tail = pbcp;
1331 			kmem_free(bcp, sizeof (strbufcall_t));
1332 		}
1333 	}
1334 	mutex_exit(&strbcall_lock);
1335 }
1336 
1337 /*
1338  * Duplicate a message block by block (uses dupb), returning
1339  * a pointer to the duplicate message.
1340  * Returns a non-NULL value only if the entire message
1341  * was dup'd.
1342  */
1343 mblk_t *
1344 dupmsg(mblk_t *bp)
1345 {
1346 	mblk_t *head, *nbp;
1347 
1348 	if (!bp || !(nbp = head = dupb(bp)))
1349 		return (NULL);
1350 
1351 	while (bp->b_cont) {
1352 		if (!(nbp->b_cont = dupb(bp->b_cont))) {
1353 			freemsg(head);
1354 			return (NULL);
1355 		}
1356 		nbp = nbp->b_cont;
1357 		bp = bp->b_cont;
1358 	}
1359 	return (head);
1360 }
1361 
1362 #define	DUPB_NOLOAN(bp) \
1363 	((((bp)->b_datap->db_struioflag & STRUIO_ZC) != 0) ? \
1364 	copyb((bp)) : dupb((bp)))
1365 
1366 mblk_t *
1367 dupmsg_noloan(mblk_t *bp)
1368 {
1369 	mblk_t *head, *nbp;
1370 
1371 	if (bp == NULL || DB_TYPE(bp) != M_DATA ||
1372 	    ((nbp = head = DUPB_NOLOAN(bp)) == NULL))
1373 		return (NULL);
1374 
1375 	while (bp->b_cont) {
1376 		if ((nbp->b_cont = DUPB_NOLOAN(bp->b_cont)) == NULL) {
1377 			freemsg(head);
1378 			return (NULL);
1379 		}
1380 		nbp = nbp->b_cont;
1381 		bp = bp->b_cont;
1382 	}
1383 	return (head);
1384 }
1385 
1386 /*
1387  * Copy data from message and data block to newly allocated message and
1388  * data block. Returns new message block pointer, or NULL if error.
1389  * The alignment of rptr (w.r.t. word alignment) will be the same in the copy
1390  * as in the original even when db_base is not word aligned. (bug 1052877)
1391  */
1392 mblk_t *
1393 copyb(mblk_t *bp)
1394 {
1395 	mblk_t	*nbp;
1396 	dblk_t	*dp, *ndp;
1397 	uchar_t *base;
1398 	size_t	size;
1399 	size_t	unaligned;
1400 
1401 	ASSERT(bp->b_wptr >= bp->b_rptr);
1402 
1403 	dp = bp->b_datap;
1404 	if (dp->db_fthdr != NULL)
1405 		STR_FTEVENT_MBLK(bp, caller(), FTEV_COPYB, 0);
1406 
1407 	/*
1408 	 * Special handling for Multidata message; this should be
1409 	 * removed once a copy-callback routine is made available.
1410 	 */
1411 	if (dp->db_type == M_MULTIDATA) {
1412 		cred_t *cr;
1413 
1414 		if ((nbp = mmd_copy(bp, KM_NOSLEEP)) == NULL)
1415 			return (NULL);
1416 
1417 		nbp->b_flag = bp->b_flag;
1418 		nbp->b_band = bp->b_band;
1419 		ndp = nbp->b_datap;
1420 
1421 		/* See comments below on potential issues. */
1422 		STR_FTEVENT_MBLK(nbp, caller(), FTEV_COPYB, 1);
1423 
1424 		ASSERT(ndp->db_type == dp->db_type);
1425 		cr = dp->db_credp;
1426 		if (cr != NULL)
1427 			crhold(ndp->db_credp = cr);
1428 		ndp->db_cpid = dp->db_cpid;
1429 		return (nbp);
1430 	}
1431 
1432 	size = dp->db_lim - dp->db_base;
1433 	unaligned = P2PHASE((uintptr_t)dp->db_base, sizeof (uint_t));
1434 	if ((nbp = allocb_tmpl(size + unaligned, bp)) == NULL)
1435 		return (NULL);
1436 	nbp->b_flag = bp->b_flag;
1437 	nbp->b_band = bp->b_band;
1438 	ndp = nbp->b_datap;
1439 
1440 	/*
1441 	 * Well, here is a potential issue.  If we are trying to
1442 	 * trace a flow, and we copy the message, we might lose
1443 	 * information about where this message might have been.
1444 	 * So we should inherit the FT data.  On the other hand,
1445 	 * a user might be interested only in alloc to free data.
1446 	 * So I guess the real answer is to provide a tunable.
1447 	 */
1448 	STR_FTEVENT_MBLK(nbp, caller(), FTEV_COPYB, 1);
1449 
1450 	base = ndp->db_base + unaligned;
1451 	bcopy(dp->db_base, ndp->db_base + unaligned, size);
1452 
1453 	nbp->b_rptr = base + (bp->b_rptr - dp->db_base);
1454 	nbp->b_wptr = nbp->b_rptr + MBLKL(bp);
1455 
1456 	return (nbp);
1457 }
1458 
1459 /*
1460  * Copy data from message to newly allocated message using new
1461  * data blocks.  Returns a pointer to the new message, or NULL if error.
1462  */
1463 mblk_t *
1464 copymsg(mblk_t *bp)
1465 {
1466 	mblk_t *head, *nbp;
1467 
1468 	if (!bp || !(nbp = head = copyb(bp)))
1469 		return (NULL);
1470 
1471 	while (bp->b_cont) {
1472 		if (!(nbp->b_cont = copyb(bp->b_cont))) {
1473 			freemsg(head);
1474 			return (NULL);
1475 		}
1476 		nbp = nbp->b_cont;
1477 		bp = bp->b_cont;
1478 	}
1479 	return (head);
1480 }
1481 
1482 /*
1483  * link a message block to tail of message
1484  */
1485 void
1486 linkb(mblk_t *mp, mblk_t *bp)
1487 {
1488 	ASSERT(mp && bp);
1489 
1490 	for (; mp->b_cont; mp = mp->b_cont)
1491 		;
1492 	mp->b_cont = bp;
1493 }
1494 
1495 /*
1496  * unlink a message block from head of message
1497  * return pointer to new message.
1498  * NULL if message becomes empty.
1499  */
1500 mblk_t *
1501 unlinkb(mblk_t *bp)
1502 {
1503 	mblk_t *bp1;
1504 
1505 	bp1 = bp->b_cont;
1506 	bp->b_cont = NULL;
1507 	return (bp1);
1508 }
1509 
1510 /*
1511  * remove a message block "bp" from message "mp"
1512  *
1513  * Return pointer to new message or NULL if no message remains.
1514  * Return -1 if bp is not found in message.
1515  */
1516 mblk_t *
1517 rmvb(mblk_t *mp, mblk_t *bp)
1518 {
1519 	mblk_t *tmp;
1520 	mblk_t *lastp = NULL;
1521 
1522 	ASSERT(mp && bp);
1523 	for (tmp = mp; tmp; tmp = tmp->b_cont) {
1524 		if (tmp == bp) {
1525 			if (lastp)
1526 				lastp->b_cont = tmp->b_cont;
1527 			else
1528 				mp = tmp->b_cont;
1529 			tmp->b_cont = NULL;
1530 			return (mp);
1531 		}
1532 		lastp = tmp;
1533 	}
1534 	return ((mblk_t *)-1);
1535 }
1536 
1537 /*
1538  * Concatenate and align first len bytes of common
1539  * message type.  Len == -1, means concat everything.
1540  * Returns 1 on success, 0 on failure
1541  * After the pullup, mp points to the pulled up data.
1542  */
1543 int
1544 pullupmsg(mblk_t *mp, ssize_t len)
1545 {
1546 	mblk_t *bp, *b_cont;
1547 	dblk_t *dbp;
1548 	ssize_t n;
1549 
1550 	ASSERT(mp->b_datap->db_ref > 0);
1551 	ASSERT(mp->b_next == NULL && mp->b_prev == NULL);
1552 
1553 	/*
1554 	 * We won't handle Multidata message, since it contains
1555 	 * metadata which this function has no knowledge of; we
1556 	 * assert on DEBUG, and return failure otherwise.
1557 	 */
1558 	ASSERT(mp->b_datap->db_type != M_MULTIDATA);
1559 	if (mp->b_datap->db_type == M_MULTIDATA)
1560 		return (0);
1561 
1562 	if (len == -1) {
1563 		if (mp->b_cont == NULL && str_aligned(mp->b_rptr))
1564 			return (1);
1565 		len = xmsgsize(mp);
1566 	} else {
1567 		ssize_t first_mblk_len = mp->b_wptr - mp->b_rptr;
1568 		ASSERT(first_mblk_len >= 0);
1569 		/*
1570 		 * If the length is less than that of the first mblk,
1571 		 * we want to pull up the message into an aligned mblk.
1572 		 * Though not part of the spec, some callers assume it.
1573 		 */
1574 		if (len <= first_mblk_len) {
1575 			if (str_aligned(mp->b_rptr))
1576 				return (1);
1577 			len = first_mblk_len;
1578 		} else if (xmsgsize(mp) < len)
1579 			return (0);
1580 	}
1581 
1582 	if ((bp = allocb_tmpl(len, mp)) == NULL)
1583 		return (0);
1584 
1585 	dbp = bp->b_datap;
1586 	*bp = *mp;		/* swap mblks so bp heads the old msg... */
1587 	mp->b_datap = dbp;	/* ... and mp heads the new message */
1588 	mp->b_datap->db_mblk = mp;
1589 	bp->b_datap->db_mblk = bp;
1590 	mp->b_rptr = mp->b_wptr = dbp->db_base;
1591 
1592 	do {
1593 		ASSERT(bp->b_datap->db_ref > 0);
1594 		ASSERT(bp->b_wptr >= bp->b_rptr);
1595 		n = MIN(bp->b_wptr - bp->b_rptr, len);
1596 		bcopy(bp->b_rptr, mp->b_wptr, (size_t)n);
1597 		mp->b_wptr += n;
1598 		bp->b_rptr += n;
1599 		len -= n;
1600 		if (bp->b_rptr != bp->b_wptr)
1601 			break;
1602 		b_cont = bp->b_cont;
1603 		freeb(bp);
1604 		bp = b_cont;
1605 	} while (len && bp);
1606 
1607 	mp->b_cont = bp;	/* tack on whatever wasn't pulled up */
1608 
1609 	return (1);
1610 }
1611 
1612 /*
1613  * Concatenate and align at least the first len bytes of common message
1614  * type.  Len == -1 means concatenate everything.  The original message is
1615  * unaltered.  Returns a pointer to a new message on success, otherwise
1616  * returns NULL.
1617  */
1618 mblk_t *
1619 msgpullup(mblk_t *mp, ssize_t len)
1620 {
1621 	mblk_t	*newmp;
1622 	ssize_t	totlen;
1623 	ssize_t	n;
1624 
1625 	/*
1626 	 * We won't handle Multidata message, since it contains
1627 	 * metadata which this function has no knowledge of; we
1628 	 * assert on DEBUG, and return failure otherwise.
1629 	 */
1630 	ASSERT(mp->b_datap->db_type != M_MULTIDATA);
1631 	if (mp->b_datap->db_type == M_MULTIDATA)
1632 		return (NULL);
1633 
1634 	totlen = xmsgsize(mp);
1635 
1636 	if ((len > 0) && (len > totlen))
1637 		return (NULL);
1638 
1639 	/*
1640 	 * Copy all of the first msg type into one new mblk, then dupmsg
1641 	 * and link the rest onto this.
1642 	 */
1643 
1644 	len = totlen;
1645 
1646 	if ((newmp = allocb_tmpl(len, mp)) == NULL)
1647 		return (NULL);
1648 
1649 	newmp->b_flag = mp->b_flag;
1650 	newmp->b_band = mp->b_band;
1651 
1652 	while (len > 0) {
1653 		n = mp->b_wptr - mp->b_rptr;
1654 		ASSERT(n >= 0);		/* allow zero-length mblk_t's */
1655 		if (n > 0)
1656 			bcopy(mp->b_rptr, newmp->b_wptr, n);
1657 		newmp->b_wptr += n;
1658 		len -= n;
1659 		mp = mp->b_cont;
1660 	}
1661 
1662 	if (mp != NULL) {
1663 		newmp->b_cont = dupmsg(mp);
1664 		if (newmp->b_cont == NULL) {
1665 			freemsg(newmp);
1666 			return (NULL);
1667 		}
1668 	}
1669 
1670 	return (newmp);
1671 }
1672 
1673 /*
1674  * Trim bytes from message
1675  *  len > 0, trim from head
1676  *  len < 0, trim from tail
1677  * Returns 1 on success, 0 on failure.
1678  */
1679 int
1680 adjmsg(mblk_t *mp, ssize_t len)
1681 {
1682 	mblk_t *bp;
1683 	mblk_t *save_bp = NULL;
1684 	mblk_t *prev_bp;
1685 	mblk_t *bcont;
1686 	unsigned char type;
1687 	ssize_t n;
1688 	int fromhead;
1689 	int first;
1690 
1691 	ASSERT(mp != NULL);
1692 	/*
1693 	 * We won't handle Multidata message, since it contains
1694 	 * metadata which this function has no knowledge of; we
1695 	 * assert on DEBUG, and return failure otherwise.
1696 	 */
1697 	ASSERT(mp->b_datap->db_type != M_MULTIDATA);
1698 	if (mp->b_datap->db_type == M_MULTIDATA)
1699 		return (0);
1700 
1701 	if (len < 0) {
1702 		fromhead = 0;
1703 		len = -len;
1704 	} else {
1705 		fromhead = 1;
1706 	}
1707 
1708 	if (xmsgsize(mp) < len)
1709 		return (0);
1710 
1711 	if (fromhead) {
1712 		first = 1;
1713 		while (len) {
1714 			ASSERT(mp->b_wptr >= mp->b_rptr);
1715 			n = MIN(mp->b_wptr - mp->b_rptr, len);
1716 			mp->b_rptr += n;
1717 			len -= n;
1718 
1719 			/*
1720 			 * If this is not the first zero length
1721 			 * message remove it
1722 			 */
1723 			if (!first && (mp->b_wptr == mp->b_rptr)) {
1724 				bcont = mp->b_cont;
1725 				freeb(mp);
1726 				mp = save_bp->b_cont = bcont;
1727 			} else {
1728 				save_bp = mp;
1729 				mp = mp->b_cont;
1730 			}
1731 			first = 0;
1732 		}
1733 	} else {
1734 		type = mp->b_datap->db_type;
1735 		while (len) {
1736 			bp = mp;
1737 			save_bp = NULL;
1738 
1739 			/*
1740 			 * Find the last message of same type
1741 			 */
1742 			while (bp && bp->b_datap->db_type == type) {
1743 				ASSERT(bp->b_wptr >= bp->b_rptr);
1744 				prev_bp = save_bp;
1745 				save_bp = bp;
1746 				bp = bp->b_cont;
1747 			}
1748 			if (save_bp == NULL)
1749 				break;
1750 			n = MIN(save_bp->b_wptr - save_bp->b_rptr, len);
1751 			save_bp->b_wptr -= n;
1752 			len -= n;
1753 
1754 			/*
1755 			 * If this is not the first message
1756 			 * and we have taken away everything
1757 			 * from this message, remove it
1758 			 */
1759 
1760 			if ((save_bp != mp) &&
1761 			    (save_bp->b_wptr == save_bp->b_rptr)) {
1762 				bcont = save_bp->b_cont;
1763 				freeb(save_bp);
1764 				prev_bp->b_cont = bcont;
1765 			}
1766 		}
1767 	}
1768 	return (1);
1769 }
1770 
1771 /*
1772  * get number of data bytes in message
1773  */
1774 size_t
1775 msgdsize(mblk_t *bp)
1776 {
1777 	size_t count = 0;
1778 
1779 	for (; bp; bp = bp->b_cont)
1780 		if (bp->b_datap->db_type == M_DATA) {
1781 			ASSERT(bp->b_wptr >= bp->b_rptr);
1782 			count += bp->b_wptr - bp->b_rptr;
1783 		}
1784 	return (count);
1785 }
1786 
1787 /*
1788  * Get a message off head of queue
1789  *
1790  * If queue has no buffers then mark queue
1791  * with QWANTR. (queue wants to be read by
1792  * someone when data becomes available)
1793  *
1794  * If there is something to take off then do so.
1795  * If queue falls below hi water mark turn off QFULL
1796  * flag.  Decrement weighted count of queue.
1797  * Also turn off QWANTR because queue is being read.
1798  *
1799  * The queue count is maintained on a per-band basis.
1800  * Priority band 0 (normal messages) uses q_count,
1801  * q_lowat, etc.  Non-zero priority bands use the
1802  * fields in their respective qband structures
1803  * (qb_count, qb_lowat, etc.)  All messages appear
1804  * on the same list, linked via their b_next pointers.
1805  * q_first is the head of the list.  q_count does
1806  * not reflect the size of all the messages on the
1807  * queue.  It only reflects those messages in the
1808  * normal band of flow.  The one exception to this
1809  * deals with high priority messages.  They are in
1810  * their own conceptual "band", but are accounted
1811  * against q_count.
1812  *
1813  * If queue count is below the lo water mark and QWANTW
1814  * is set, enable the closest backq which has a service
1815  * procedure and turn off the QWANTW flag.
1816  *
1817  * getq could be built on top of rmvq, but isn't because
1818  * of performance considerations.
1819  *
1820  * A note on the use of q_count and q_mblkcnt:
1821  *   q_count is the traditional byte count for messages that
1822  *   have been put on a queue.  Documentation tells us that
1823  *   we shouldn't rely on that count, but some drivers/modules
1824  *   do.  What was needed, however, is a mechanism to prevent
1825  *   runaway streams from consuming all of the resources,
1826  *   and particularly be able to flow control zero-length
1827  *   messages.  q_mblkcnt is used for this purpose.  It
1828  *   counts the number of mblk's that are being put on
1829  *   the queue.  The intention here, is that each mblk should
1830  *   contain one byte of data and, for the purpose of
1831  *   flow-control, logically does.  A queue will become
1832  *   full when EITHER of these values (q_count and q_mblkcnt)
1833  *   reach the highwater mark.  It will clear when BOTH
1834  *   of them drop below the highwater mark.  And it will
1835  *   backenable when BOTH of them drop below the lowwater
1836  *   mark.
1837  *   With this algorithm, a driver/module might be able
1838  *   to find a reasonably accurate q_count, and the
1839  *   framework can still try and limit resource usage.
1840  */
1841 mblk_t *
1842 getq(queue_t *q)
1843 {
1844 	mblk_t *bp;
1845 	uchar_t band = 0;
1846 
1847 	bp = getq_noenab(q, 0);
1848 	if (bp != NULL)
1849 		band = bp->b_band;
1850 
1851 	/*
1852 	 * Inlined from qbackenable().
1853 	 * Quick check without holding the lock.
1854 	 */
1855 	if (band == 0 && (q->q_flag & (QWANTW|QWANTWSYNC)) == 0)
1856 		return (bp);
1857 
1858 	qbackenable(q, band);
1859 	return (bp);
1860 }
1861 
1862 /*
1863  * Calculate number of data bytes in a single data message block taking
1864  * multidata messages into account.
1865  */
1866 
1867 #define	ADD_MBLK_SIZE(mp, size) 					\
1868 	if (DB_TYPE(mp) != M_MULTIDATA) {				\
1869 		(size) += MBLKL(mp);					\
1870 	} else {							\
1871 		uint_t	pinuse;						\
1872 									\
1873 		mmd_getsize(mmd_getmultidata(mp), NULL, &pinuse);	\
1874 		(size) += pinuse;					\
1875 	}
1876 
1877 /*
1878  * Returns the number of bytes in a message (a message is defined as a
1879  * chain of mblks linked by b_cont). If a non-NULL mblkcnt is supplied we
1880  * also return the number of distinct mblks in the message.
1881  */
1882 int
1883 mp_cont_len(mblk_t *bp, int *mblkcnt)
1884 {
1885 	mblk_t	*mp;
1886 	int	mblks = 0;
1887 	int	bytes = 0;
1888 
1889 	for (mp = bp; mp != NULL; mp = mp->b_cont) {
1890 		ADD_MBLK_SIZE(mp, bytes);
1891 		mblks++;
1892 	}
1893 
1894 	if (mblkcnt != NULL)
1895 		*mblkcnt = mblks;
1896 
1897 	return (bytes);
1898 }
1899 
1900 /*
1901  * Like getq() but does not backenable.  This is used by the stream
1902  * head when a putback() is likely.  The caller must call qbackenable()
1903  * after it is done with accessing the queue.
1904  * The rbytes arguments to getq_noneab() allows callers to specify a
1905  * the maximum number of bytes to return. If the current amount on the
1906  * queue is less than this then the entire message will be returned.
1907  * A value of 0 returns the entire message and is equivalent to the old
1908  * default behaviour prior to the addition of the rbytes argument.
1909  */
1910 mblk_t *
1911 getq_noenab(queue_t *q, ssize_t rbytes)
1912 {
1913 	mblk_t *bp, *mp1;
1914 	mblk_t *mp2 = NULL;
1915 	qband_t *qbp;
1916 	kthread_id_t freezer;
1917 	int	bytecnt = 0, mblkcnt = 0;
1918 
1919 	/* freezestr should allow its caller to call getq/putq */
1920 	freezer = STREAM(q)->sd_freezer;
1921 	if (freezer == curthread) {
1922 		ASSERT(frozenstr(q));
1923 		ASSERT(MUTEX_HELD(QLOCK(q)));
1924 	} else
1925 		mutex_enter(QLOCK(q));
1926 
1927 	if ((bp = q->q_first) == 0) {
1928 		q->q_flag |= QWANTR;
1929 	} else {
1930 		/*
1931 		 * If the caller supplied a byte threshold and there is
1932 		 * more than this amount on the queue then break up the
1933 		 * the message appropriately.  We can only safely do
1934 		 * this for M_DATA messages.
1935 		 */
1936 		if ((DB_TYPE(bp) == M_DATA) && (rbytes > 0) &&
1937 		    (q->q_count > rbytes)) {
1938 			/*
1939 			 * Inline version of mp_cont_len() which terminates
1940 			 * when we meet or exceed rbytes.
1941 			 */
1942 			for (mp1 = bp; mp1 != NULL; mp1 = mp1->b_cont) {
1943 				mblkcnt++;
1944 				ADD_MBLK_SIZE(mp1, bytecnt);
1945 				if (bytecnt  >= rbytes)
1946 					break;
1947 			}
1948 			/*
1949 			 * We need to account for the following scenarios:
1950 			 *
1951 			 * 1) Too much data in the first message:
1952 			 *	mp1 will be the mblk which puts us over our
1953 			 *	byte limit.
1954 			 * 2) Not enough data in the first message:
1955 			 *	mp1 will be NULL.
1956 			 * 3) Exactly the right amount of data contained within
1957 			 *    whole mblks:
1958 			 *	mp1->b_cont will be where we break the message.
1959 			 */
1960 			if (bytecnt > rbytes) {
1961 				/*
1962 				 * Dup/copy mp1 and put what we don't need
1963 				 * back onto the queue. Adjust the read/write
1964 				 * and continuation pointers appropriately
1965 				 * and decrement the current mblk count to
1966 				 * reflect we are putting an mblk back onto
1967 				 * the queue.
1968 				 * When adjusting the message pointers, it's
1969 				 * OK to use the existing bytecnt and the
1970 				 * requested amount (rbytes) to calculate the
1971 				 * the new write offset (b_wptr) of what we
1972 				 * are taking. However, we  cannot use these
1973 				 * values when calculating the read offset of
1974 				 * the mblk we are putting back on the queue.
1975 				 * This is because the begining (b_rptr) of the
1976 				 * mblk represents some arbitrary point within
1977 				 * the message.
1978 				 * It's simplest to do this by advancing b_rptr
1979 				 * by the new length of mp1 as we don't have to
1980 				 * remember any intermediate state.
1981 				 */
1982 				ASSERT(mp1 != NULL);
1983 				mblkcnt--;
1984 				if ((mp2 = dupb(mp1)) == NULL &&
1985 				    (mp2 = copyb(mp1)) == NULL) {
1986 					bytecnt = mblkcnt = 0;
1987 					goto dup_failed;
1988 				}
1989 				mp2->b_cont = mp1->b_cont;
1990 				mp1->b_wptr -= bytecnt - rbytes;
1991 				mp2->b_rptr += mp1->b_wptr - mp1->b_rptr;
1992 				mp1->b_cont = NULL;
1993 				bytecnt = rbytes;
1994 			} else {
1995 				/*
1996 				 * Either there is not enough data in the first
1997 				 * message or there is no excess data to deal
1998 				 * with. If mp1 is NULL, we are taking the
1999 				 * whole message. No need to do anything.
2000 				 * Otherwise we assign mp1->b_cont to mp2 as
2001 				 * we will be putting this back onto the head of
2002 				 * the queue.
2003 				 */
2004 				if (mp1 != NULL) {
2005 					mp2 = mp1->b_cont;
2006 					mp1->b_cont = NULL;
2007 				}
2008 			}
2009 			/*
2010 			 * If mp2 is not NULL then we have part of the message
2011 			 * to put back onto the queue.
2012 			 */
2013 			if (mp2 != NULL) {
2014 				if ((mp2->b_next = bp->b_next) == NULL)
2015 					q->q_last = mp2;
2016 				else
2017 					bp->b_next->b_prev = mp2;
2018 				q->q_first = mp2;
2019 			} else {
2020 				if ((q->q_first = bp->b_next) == NULL)
2021 					q->q_last = NULL;
2022 				else
2023 					q->q_first->b_prev = NULL;
2024 			}
2025 		} else {
2026 			/*
2027 			 * Either no byte threshold was supplied, there is
2028 			 * not enough on the queue or we failed to
2029 			 * duplicate/copy a data block. In these cases we
2030 			 * just take the entire first message.
2031 			 */
2032 dup_failed:
2033 			bytecnt = mp_cont_len(bp, &mblkcnt);
2034 			if ((q->q_first = bp->b_next) == NULL)
2035 				q->q_last = NULL;
2036 			else
2037 				q->q_first->b_prev = NULL;
2038 		}
2039 		if (bp->b_band == 0) {
2040 			q->q_count -= bytecnt;
2041 			q->q_mblkcnt -= mblkcnt;
2042 			if (q->q_mblkcnt == 0 || ((q->q_count < q->q_hiwat) &&
2043 			    (q->q_mblkcnt < q->q_hiwat))) {
2044 				q->q_flag &= ~QFULL;
2045 			}
2046 		} else {
2047 			int i;
2048 
2049 			ASSERT(bp->b_band <= q->q_nband);
2050 			ASSERT(q->q_bandp != NULL);
2051 			ASSERT(MUTEX_HELD(QLOCK(q)));
2052 			qbp = q->q_bandp;
2053 			i = bp->b_band;
2054 			while (--i > 0)
2055 				qbp = qbp->qb_next;
2056 			if (qbp->qb_first == qbp->qb_last) {
2057 				qbp->qb_first = NULL;
2058 				qbp->qb_last = NULL;
2059 			} else {
2060 				qbp->qb_first = bp->b_next;
2061 			}
2062 			qbp->qb_count -= bytecnt;
2063 			qbp->qb_mblkcnt -= mblkcnt;
2064 			if (qbp->qb_mblkcnt == 0 ||
2065 			    ((qbp->qb_count < qbp->qb_hiwat) &&
2066 			    (qbp->qb_mblkcnt < qbp->qb_hiwat))) {
2067 				qbp->qb_flag &= ~QB_FULL;
2068 			}
2069 		}
2070 		q->q_flag &= ~QWANTR;
2071 		bp->b_next = NULL;
2072 		bp->b_prev = NULL;
2073 	}
2074 	if (freezer != curthread)
2075 		mutex_exit(QLOCK(q));
2076 
2077 	STR_FTEVENT_MSG(bp, q, FTEV_GETQ, NULL);
2078 
2079 	return (bp);
2080 }
2081 
2082 /*
2083  * Determine if a backenable is needed after removing a message in the
2084  * specified band.
2085  * NOTE: This routine assumes that something like getq_noenab() has been
2086  * already called.
2087  *
2088  * For the read side it is ok to hold sd_lock across calling this (and the
2089  * stream head often does).
2090  * But for the write side strwakeq might be invoked and it acquires sd_lock.
2091  */
2092 void
2093 qbackenable(queue_t *q, uchar_t band)
2094 {
2095 	int backenab = 0;
2096 	qband_t *qbp;
2097 	kthread_id_t freezer;
2098 
2099 	ASSERT(q);
2100 	ASSERT((q->q_flag & QREADR) || MUTEX_NOT_HELD(&STREAM(q)->sd_lock));
2101 
2102 	/*
2103 	 * Quick check without holding the lock.
2104 	 * OK since after getq() has lowered the q_count these flags
2105 	 * would not change unless either the qbackenable() is done by
2106 	 * another thread (which is ok) or the queue has gotten QFULL
2107 	 * in which case another backenable will take place when the queue
2108 	 * drops below q_lowat.
2109 	 */
2110 	if (band == 0 && (q->q_flag & (QWANTW|QWANTWSYNC)) == 0)
2111 		return;
2112 
2113 	/* freezestr should allow its caller to call getq/putq */
2114 	freezer = STREAM(q)->sd_freezer;
2115 	if (freezer == curthread) {
2116 		ASSERT(frozenstr(q));
2117 		ASSERT(MUTEX_HELD(QLOCK(q)));
2118 	} else
2119 		mutex_enter(QLOCK(q));
2120 
2121 	if (band == 0) {
2122 		if (q->q_lowat == 0 || (q->q_count < q->q_lowat &&
2123 		    q->q_mblkcnt < q->q_lowat)) {
2124 			backenab = q->q_flag & (QWANTW|QWANTWSYNC);
2125 		}
2126 	} else {
2127 		int i;
2128 
2129 		ASSERT((unsigned)band <= q->q_nband);
2130 		ASSERT(q->q_bandp != NULL);
2131 
2132 		qbp = q->q_bandp;
2133 		i = band;
2134 		while (--i > 0)
2135 			qbp = qbp->qb_next;
2136 
2137 		if (qbp->qb_lowat == 0 || (qbp->qb_count < qbp->qb_lowat &&
2138 		    qbp->qb_mblkcnt < qbp->qb_lowat)) {
2139 			backenab = qbp->qb_flag & QB_WANTW;
2140 		}
2141 	}
2142 
2143 	if (backenab == 0) {
2144 		if (freezer != curthread)
2145 			mutex_exit(QLOCK(q));
2146 		return;
2147 	}
2148 
2149 	/* Have to drop the lock across strwakeq and backenable */
2150 	if (backenab & QWANTWSYNC)
2151 		q->q_flag &= ~QWANTWSYNC;
2152 	if (backenab & (QWANTW|QB_WANTW)) {
2153 		if (band != 0)
2154 			qbp->qb_flag &= ~QB_WANTW;
2155 		else {
2156 			q->q_flag &= ~QWANTW;
2157 		}
2158 	}
2159 
2160 	if (freezer != curthread)
2161 		mutex_exit(QLOCK(q));
2162 
2163 	if (backenab & QWANTWSYNC)
2164 		strwakeq(q, QWANTWSYNC);
2165 	if (backenab & (QWANTW|QB_WANTW))
2166 		backenable(q, band);
2167 }
2168 
2169 /*
2170  * Remove a message from a queue.  The queue count and other
2171  * flow control parameters are adjusted and the back queue
2172  * enabled if necessary.
2173  *
2174  * rmvq can be called with the stream frozen, but other utility functions
2175  * holding QLOCK, and by streams modules without any locks/frozen.
2176  */
2177 void
2178 rmvq(queue_t *q, mblk_t *mp)
2179 {
2180 	ASSERT(mp != NULL);
2181 
2182 	rmvq_noenab(q, mp);
2183 	if (curthread != STREAM(q)->sd_freezer && MUTEX_HELD(QLOCK(q))) {
2184 		/*
2185 		 * qbackenable can handle a frozen stream but not a "random"
2186 		 * qlock being held. Drop lock across qbackenable.
2187 		 */
2188 		mutex_exit(QLOCK(q));
2189 		qbackenable(q, mp->b_band);
2190 		mutex_enter(QLOCK(q));
2191 	} else {
2192 		qbackenable(q, mp->b_band);
2193 	}
2194 }
2195 
2196 /*
2197  * Like rmvq() but without any backenabling.
2198  * This exists to handle SR_CONSOL_DATA in strrput().
2199  */
2200 void
2201 rmvq_noenab(queue_t *q, mblk_t *mp)
2202 {
2203 	int i;
2204 	qband_t *qbp = NULL;
2205 	kthread_id_t freezer;
2206 	int	bytecnt = 0, mblkcnt = 0;
2207 
2208 	freezer = STREAM(q)->sd_freezer;
2209 	if (freezer == curthread) {
2210 		ASSERT(frozenstr(q));
2211 		ASSERT(MUTEX_HELD(QLOCK(q)));
2212 	} else if (MUTEX_HELD(QLOCK(q))) {
2213 		/* Don't drop lock on exit */
2214 		freezer = curthread;
2215 	} else
2216 		mutex_enter(QLOCK(q));
2217 
2218 	ASSERT(mp->b_band <= q->q_nband);
2219 	if (mp->b_band != 0) {		/* Adjust band pointers */
2220 		ASSERT(q->q_bandp != NULL);
2221 		qbp = q->q_bandp;
2222 		i = mp->b_band;
2223 		while (--i > 0)
2224 			qbp = qbp->qb_next;
2225 		if (mp == qbp->qb_first) {
2226 			if (mp->b_next && mp->b_band == mp->b_next->b_band)
2227 				qbp->qb_first = mp->b_next;
2228 			else
2229 				qbp->qb_first = NULL;
2230 		}
2231 		if (mp == qbp->qb_last) {
2232 			if (mp->b_prev && mp->b_band == mp->b_prev->b_band)
2233 				qbp->qb_last = mp->b_prev;
2234 			else
2235 				qbp->qb_last = NULL;
2236 		}
2237 	}
2238 
2239 	/*
2240 	 * Remove the message from the list.
2241 	 */
2242 	if (mp->b_prev)
2243 		mp->b_prev->b_next = mp->b_next;
2244 	else
2245 		q->q_first = mp->b_next;
2246 	if (mp->b_next)
2247 		mp->b_next->b_prev = mp->b_prev;
2248 	else
2249 		q->q_last = mp->b_prev;
2250 	mp->b_next = NULL;
2251 	mp->b_prev = NULL;
2252 
2253 	/* Get the size of the message for q_count accounting */
2254 	bytecnt = mp_cont_len(mp, &mblkcnt);
2255 
2256 	if (mp->b_band == 0) {		/* Perform q_count accounting */
2257 		q->q_count -= bytecnt;
2258 		q->q_mblkcnt -= mblkcnt;
2259 		if (q->q_mblkcnt == 0 || ((q->q_count < q->q_hiwat) &&
2260 		    (q->q_mblkcnt < q->q_hiwat))) {
2261 			q->q_flag &= ~QFULL;
2262 		}
2263 	} else {			/* Perform qb_count accounting */
2264 		qbp->qb_count -= bytecnt;
2265 		qbp->qb_mblkcnt -= mblkcnt;
2266 		if (qbp->qb_mblkcnt == 0 || ((qbp->qb_count < qbp->qb_hiwat) &&
2267 		    (qbp->qb_mblkcnt < qbp->qb_hiwat))) {
2268 			qbp->qb_flag &= ~QB_FULL;
2269 		}
2270 	}
2271 	if (freezer != curthread)
2272 		mutex_exit(QLOCK(q));
2273 
2274 	STR_FTEVENT_MSG(mp, q, FTEV_RMVQ, NULL);
2275 }
2276 
2277 /*
2278  * Empty a queue.
2279  * If flag is set, remove all messages.  Otherwise, remove
2280  * only non-control messages.  If queue falls below its low
2281  * water mark, and QWANTW is set, enable the nearest upstream
2282  * service procedure.
2283  *
2284  * Historical note: when merging the M_FLUSH code in strrput with this
2285  * code one difference was discovered. flushq did not have a check
2286  * for q_lowat == 0 in the backenabling test.
2287  *
2288  * pcproto_flag specifies whether or not a M_PCPROTO message should be flushed
2289  * if one exists on the queue.
2290  */
2291 void
2292 flushq_common(queue_t *q, int flag, int pcproto_flag)
2293 {
2294 	mblk_t *mp, *nmp;
2295 	qband_t *qbp;
2296 	int backenab = 0;
2297 	unsigned char bpri;
2298 	unsigned char	qbf[NBAND];	/* band flushing backenable flags */
2299 
2300 	if (q->q_first == NULL)
2301 		return;
2302 
2303 	mutex_enter(QLOCK(q));
2304 	mp = q->q_first;
2305 	q->q_first = NULL;
2306 	q->q_last = NULL;
2307 	q->q_count = 0;
2308 	q->q_mblkcnt = 0;
2309 	for (qbp = q->q_bandp; qbp; qbp = qbp->qb_next) {
2310 		qbp->qb_first = NULL;
2311 		qbp->qb_last = NULL;
2312 		qbp->qb_count = 0;
2313 		qbp->qb_mblkcnt = 0;
2314 		qbp->qb_flag &= ~QB_FULL;
2315 	}
2316 	q->q_flag &= ~QFULL;
2317 	mutex_exit(QLOCK(q));
2318 	while (mp) {
2319 		nmp = mp->b_next;
2320 		mp->b_next = mp->b_prev = NULL;
2321 
2322 		STR_FTEVENT_MBLK(mp, q, FTEV_FLUSHQ, NULL);
2323 
2324 		if (pcproto_flag && (mp->b_datap->db_type == M_PCPROTO))
2325 			(void) putq(q, mp);
2326 		else if (flag || datamsg(mp->b_datap->db_type))
2327 			freemsg(mp);
2328 		else
2329 			(void) putq(q, mp);
2330 		mp = nmp;
2331 	}
2332 	bpri = 1;
2333 	mutex_enter(QLOCK(q));
2334 	for (qbp = q->q_bandp; qbp; qbp = qbp->qb_next) {
2335 		if ((qbp->qb_flag & QB_WANTW) &&
2336 		    (((qbp->qb_count < qbp->qb_lowat) &&
2337 		    (qbp->qb_mblkcnt < qbp->qb_lowat)) ||
2338 		    qbp->qb_lowat == 0)) {
2339 			qbp->qb_flag &= ~QB_WANTW;
2340 			backenab = 1;
2341 			qbf[bpri] = 1;
2342 		} else
2343 			qbf[bpri] = 0;
2344 		bpri++;
2345 	}
2346 	ASSERT(bpri == (unsigned char)(q->q_nband + 1));
2347 	if ((q->q_flag & QWANTW) &&
2348 	    (((q->q_count < q->q_lowat) &&
2349 	    (q->q_mblkcnt < q->q_lowat)) || q->q_lowat == 0)) {
2350 		q->q_flag &= ~QWANTW;
2351 		backenab = 1;
2352 		qbf[0] = 1;
2353 	} else
2354 		qbf[0] = 0;
2355 
2356 	/*
2357 	 * If any band can now be written to, and there is a writer
2358 	 * for that band, then backenable the closest service procedure.
2359 	 */
2360 	if (backenab) {
2361 		mutex_exit(QLOCK(q));
2362 		for (bpri = q->q_nband; bpri != 0; bpri--)
2363 			if (qbf[bpri])
2364 				backenable(q, bpri);
2365 		if (qbf[0])
2366 			backenable(q, 0);
2367 	} else
2368 		mutex_exit(QLOCK(q));
2369 }
2370 
2371 /*
2372  * The real flushing takes place in flushq_common. This is done so that
2373  * a flag which specifies whether or not M_PCPROTO messages should be flushed
2374  * or not. Currently the only place that uses this flag is the stream head.
2375  */
2376 void
2377 flushq(queue_t *q, int flag)
2378 {
2379 	flushq_common(q, flag, 0);
2380 }
2381 
2382 /*
2383  * Flush the queue of messages of the given priority band.
2384  * There is some duplication of code between flushq and flushband.
2385  * This is because we want to optimize the code as much as possible.
2386  * The assumption is that there will be more messages in the normal
2387  * (priority 0) band than in any other.
2388  *
2389  * Historical note: when merging the M_FLUSH code in strrput with this
2390  * code one difference was discovered. flushband had an extra check for
2391  * did not have a check for (mp->b_datap->db_type < QPCTL) in the band 0
2392  * case. That check does not match the man page for flushband and was not
2393  * in the strrput flush code hence it was removed.
2394  */
2395 void
2396 flushband(queue_t *q, unsigned char pri, int flag)
2397 {
2398 	mblk_t *mp;
2399 	mblk_t *nmp;
2400 	mblk_t *last;
2401 	qband_t *qbp;
2402 	int band;
2403 
2404 	ASSERT((flag == FLUSHDATA) || (flag == FLUSHALL));
2405 	if (pri > q->q_nband) {
2406 		return;
2407 	}
2408 	mutex_enter(QLOCK(q));
2409 	if (pri == 0) {
2410 		mp = q->q_first;
2411 		q->q_first = NULL;
2412 		q->q_last = NULL;
2413 		q->q_count = 0;
2414 		q->q_mblkcnt = 0;
2415 		for (qbp = q->q_bandp; qbp; qbp = qbp->qb_next) {
2416 			qbp->qb_first = NULL;
2417 			qbp->qb_last = NULL;
2418 			qbp->qb_count = 0;
2419 			qbp->qb_mblkcnt = 0;
2420 			qbp->qb_flag &= ~QB_FULL;
2421 		}
2422 		q->q_flag &= ~QFULL;
2423 		mutex_exit(QLOCK(q));
2424 		while (mp) {
2425 			nmp = mp->b_next;
2426 			mp->b_next = mp->b_prev = NULL;
2427 			if ((mp->b_band == 0) &&
2428 			    ((flag == FLUSHALL) ||
2429 			    datamsg(mp->b_datap->db_type)))
2430 				freemsg(mp);
2431 			else
2432 				(void) putq(q, mp);
2433 			mp = nmp;
2434 		}
2435 		mutex_enter(QLOCK(q));
2436 		if ((q->q_flag & QWANTW) &&
2437 		    (((q->q_count < q->q_lowat) &&
2438 		    (q->q_mblkcnt < q->q_lowat)) || q->q_lowat == 0)) {
2439 			q->q_flag &= ~QWANTW;
2440 			mutex_exit(QLOCK(q));
2441 
2442 			backenable(q, pri);
2443 		} else
2444 			mutex_exit(QLOCK(q));
2445 	} else {	/* pri != 0 */
2446 		boolean_t flushed = B_FALSE;
2447 		band = pri;
2448 
2449 		ASSERT(MUTEX_HELD(QLOCK(q)));
2450 		qbp = q->q_bandp;
2451 		while (--band > 0)
2452 			qbp = qbp->qb_next;
2453 		mp = qbp->qb_first;
2454 		if (mp == NULL) {
2455 			mutex_exit(QLOCK(q));
2456 			return;
2457 		}
2458 		last = qbp->qb_last->b_next;
2459 		/*
2460 		 * rmvq_noenab() and freemsg() are called for each mblk that
2461 		 * meets the criteria.  The loop is executed until the last
2462 		 * mblk has been processed.
2463 		 */
2464 		while (mp != last) {
2465 			ASSERT(mp->b_band == pri);
2466 			nmp = mp->b_next;
2467 			if (flag == FLUSHALL || datamsg(mp->b_datap->db_type)) {
2468 				rmvq_noenab(q, mp);
2469 				freemsg(mp);
2470 				flushed = B_TRUE;
2471 			}
2472 			mp = nmp;
2473 		}
2474 		mutex_exit(QLOCK(q));
2475 
2476 		/*
2477 		 * If any mblk(s) has been freed, we know that qbackenable()
2478 		 * will need to be called.
2479 		 */
2480 		if (flushed)
2481 			qbackenable(q, pri);
2482 	}
2483 }
2484 
2485 /*
2486  * Return 1 if the queue is not full.  If the queue is full, return
2487  * 0 (may not put message) and set QWANTW flag (caller wants to write
2488  * to the queue).
2489  */
2490 int
2491 canput(queue_t *q)
2492 {
2493 	TRACE_1(TR_FAC_STREAMS_FR, TR_CANPUT_IN, "canput:%p", q);
2494 
2495 	/* this is for loopback transports, they should not do a canput */
2496 	ASSERT(STRMATED(q->q_stream) || STREAM(q) == STREAM(q->q_nfsrv));
2497 
2498 	/* Find next forward module that has a service procedure */
2499 	q = q->q_nfsrv;
2500 
2501 	if (!(q->q_flag & QFULL)) {
2502 		TRACE_2(TR_FAC_STREAMS_FR, TR_CANPUT_OUT, "canput:%p %d", q, 1);
2503 		return (1);
2504 	}
2505 	mutex_enter(QLOCK(q));
2506 	if (q->q_flag & QFULL) {
2507 		q->q_flag |= QWANTW;
2508 		mutex_exit(QLOCK(q));
2509 		TRACE_2(TR_FAC_STREAMS_FR, TR_CANPUT_OUT, "canput:%p %d", q, 0);
2510 		return (0);
2511 	}
2512 	mutex_exit(QLOCK(q));
2513 	TRACE_2(TR_FAC_STREAMS_FR, TR_CANPUT_OUT, "canput:%p %d", q, 1);
2514 	return (1);
2515 }
2516 
2517 /*
2518  * This is the new canput for use with priority bands.  Return 1 if the
2519  * band is not full.  If the band is full, return 0 (may not put message)
2520  * and set QWANTW(QB_WANTW) flag for zero(non-zero) band (caller wants to
2521  * write to the queue).
2522  */
2523 int
2524 bcanput(queue_t *q, unsigned char pri)
2525 {
2526 	qband_t *qbp;
2527 
2528 	TRACE_2(TR_FAC_STREAMS_FR, TR_BCANPUT_IN, "bcanput:%p %p", q, pri);
2529 	if (!q)
2530 		return (0);
2531 
2532 	/* Find next forward module that has a service procedure */
2533 	q = q->q_nfsrv;
2534 
2535 	mutex_enter(QLOCK(q));
2536 	if (pri == 0) {
2537 		if (q->q_flag & QFULL) {
2538 			q->q_flag |= QWANTW;
2539 			mutex_exit(QLOCK(q));
2540 			TRACE_3(TR_FAC_STREAMS_FR, TR_BCANPUT_OUT,
2541 			    "bcanput:%p %X %d", q, pri, 0);
2542 			return (0);
2543 		}
2544 	} else {	/* pri != 0 */
2545 		if (pri > q->q_nband) {
2546 			/*
2547 			 * No band exists yet, so return success.
2548 			 */
2549 			mutex_exit(QLOCK(q));
2550 			TRACE_3(TR_FAC_STREAMS_FR, TR_BCANPUT_OUT,
2551 			    "bcanput:%p %X %d", q, pri, 1);
2552 			return (1);
2553 		}
2554 		qbp = q->q_bandp;
2555 		while (--pri)
2556 			qbp = qbp->qb_next;
2557 		if (qbp->qb_flag & QB_FULL) {
2558 			qbp->qb_flag |= QB_WANTW;
2559 			mutex_exit(QLOCK(q));
2560 			TRACE_3(TR_FAC_STREAMS_FR, TR_BCANPUT_OUT,
2561 			    "bcanput:%p %X %d", q, pri, 0);
2562 			return (0);
2563 		}
2564 	}
2565 	mutex_exit(QLOCK(q));
2566 	TRACE_3(TR_FAC_STREAMS_FR, TR_BCANPUT_OUT,
2567 	    "bcanput:%p %X %d", q, pri, 1);
2568 	return (1);
2569 }
2570 
2571 /*
2572  * Put a message on a queue.
2573  *
2574  * Messages are enqueued on a priority basis.  The priority classes
2575  * are HIGH PRIORITY (type >= QPCTL), PRIORITY (type < QPCTL && band > 0),
2576  * and B_NORMAL (type < QPCTL && band == 0).
2577  *
2578  * Add appropriate weighted data block sizes to queue count.
2579  * If queue hits high water mark then set QFULL flag.
2580  *
2581  * If QNOENAB is not set (putq is allowed to enable the queue),
2582  * enable the queue only if the message is PRIORITY,
2583  * or the QWANTR flag is set (indicating that the service procedure
2584  * is ready to read the queue.  This implies that a service
2585  * procedure must NEVER put a high priority message back on its own
2586  * queue, as this would result in an infinite loop (!).
2587  */
2588 int
2589 putq(queue_t *q, mblk_t *bp)
2590 {
2591 	mblk_t *tmp;
2592 	qband_t *qbp = NULL;
2593 	int mcls = (int)queclass(bp);
2594 	kthread_id_t freezer;
2595 	int	bytecnt = 0, mblkcnt = 0;
2596 
2597 	freezer = STREAM(q)->sd_freezer;
2598 	if (freezer == curthread) {
2599 		ASSERT(frozenstr(q));
2600 		ASSERT(MUTEX_HELD(QLOCK(q)));
2601 	} else
2602 		mutex_enter(QLOCK(q));
2603 
2604 	/*
2605 	 * Make sanity checks and if qband structure is not yet
2606 	 * allocated, do so.
2607 	 */
2608 	if (mcls == QPCTL) {
2609 		if (bp->b_band != 0)
2610 			bp->b_band = 0;		/* force to be correct */
2611 	} else if (bp->b_band != 0) {
2612 		int i;
2613 		qband_t **qbpp;
2614 
2615 		if (bp->b_band > q->q_nband) {
2616 
2617 			/*
2618 			 * The qband structure for this priority band is
2619 			 * not on the queue yet, so we have to allocate
2620 			 * one on the fly.  It would be wasteful to
2621 			 * associate the qband structures with every
2622 			 * queue when the queues are allocated.  This is
2623 			 * because most queues will only need the normal
2624 			 * band of flow which can be described entirely
2625 			 * by the queue itself.
2626 			 */
2627 			qbpp = &q->q_bandp;
2628 			while (*qbpp)
2629 				qbpp = &(*qbpp)->qb_next;
2630 			while (bp->b_band > q->q_nband) {
2631 				if ((*qbpp = allocband()) == NULL) {
2632 					if (freezer != curthread)
2633 						mutex_exit(QLOCK(q));
2634 					return (0);
2635 				}
2636 				(*qbpp)->qb_hiwat = q->q_hiwat;
2637 				(*qbpp)->qb_lowat = q->q_lowat;
2638 				q->q_nband++;
2639 				qbpp = &(*qbpp)->qb_next;
2640 			}
2641 		}
2642 		ASSERT(MUTEX_HELD(QLOCK(q)));
2643 		qbp = q->q_bandp;
2644 		i = bp->b_band;
2645 		while (--i)
2646 			qbp = qbp->qb_next;
2647 	}
2648 
2649 	/*
2650 	 * If queue is empty, add the message and initialize the pointers.
2651 	 * Otherwise, adjust message pointers and queue pointers based on
2652 	 * the type of the message and where it belongs on the queue.  Some
2653 	 * code is duplicated to minimize the number of conditionals and
2654 	 * hopefully minimize the amount of time this routine takes.
2655 	 */
2656 	if (!q->q_first) {
2657 		bp->b_next = NULL;
2658 		bp->b_prev = NULL;
2659 		q->q_first = bp;
2660 		q->q_last = bp;
2661 		if (qbp) {
2662 			qbp->qb_first = bp;
2663 			qbp->qb_last = bp;
2664 		}
2665 	} else if (!qbp) {	/* bp->b_band == 0 */
2666 
2667 		/*
2668 		 * If queue class of message is less than or equal to
2669 		 * that of the last one on the queue, tack on to the end.
2670 		 */
2671 		tmp = q->q_last;
2672 		if (mcls <= (int)queclass(tmp)) {
2673 			bp->b_next = NULL;
2674 			bp->b_prev = tmp;
2675 			tmp->b_next = bp;
2676 			q->q_last = bp;
2677 		} else {
2678 			tmp = q->q_first;
2679 			while ((int)queclass(tmp) >= mcls)
2680 				tmp = tmp->b_next;
2681 
2682 			/*
2683 			 * Insert bp before tmp.
2684 			 */
2685 			bp->b_next = tmp;
2686 			bp->b_prev = tmp->b_prev;
2687 			if (tmp->b_prev)
2688 				tmp->b_prev->b_next = bp;
2689 			else
2690 				q->q_first = bp;
2691 			tmp->b_prev = bp;
2692 		}
2693 	} else {		/* bp->b_band != 0 */
2694 		if (qbp->qb_first) {
2695 			tmp = qbp->qb_last;
2696 
2697 			/*
2698 			 * Insert bp after the last message in this band.
2699 			 */
2700 			bp->b_next = tmp->b_next;
2701 			if (tmp->b_next)
2702 				tmp->b_next->b_prev = bp;
2703 			else
2704 				q->q_last = bp;
2705 			bp->b_prev = tmp;
2706 			tmp->b_next = bp;
2707 		} else {
2708 			tmp = q->q_last;
2709 			if ((mcls < (int)queclass(tmp)) ||
2710 			    (bp->b_band <= tmp->b_band)) {
2711 
2712 				/*
2713 				 * Tack bp on end of queue.
2714 				 */
2715 				bp->b_next = NULL;
2716 				bp->b_prev = tmp;
2717 				tmp->b_next = bp;
2718 				q->q_last = bp;
2719 			} else {
2720 				tmp = q->q_first;
2721 				while (tmp->b_datap->db_type >= QPCTL)
2722 					tmp = tmp->b_next;
2723 				while (tmp->b_band >= bp->b_band)
2724 					tmp = tmp->b_next;
2725 
2726 				/*
2727 				 * Insert bp before tmp.
2728 				 */
2729 				bp->b_next = tmp;
2730 				bp->b_prev = tmp->b_prev;
2731 				if (tmp->b_prev)
2732 					tmp->b_prev->b_next = bp;
2733 				else
2734 					q->q_first = bp;
2735 				tmp->b_prev = bp;
2736 			}
2737 			qbp->qb_first = bp;
2738 		}
2739 		qbp->qb_last = bp;
2740 	}
2741 
2742 	/* Get message byte count for q_count accounting */
2743 	bytecnt = mp_cont_len(bp, &mblkcnt);
2744 
2745 	if (qbp) {
2746 		qbp->qb_count += bytecnt;
2747 		qbp->qb_mblkcnt += mblkcnt;
2748 		if ((qbp->qb_count >= qbp->qb_hiwat) ||
2749 		    (qbp->qb_mblkcnt >= qbp->qb_hiwat)) {
2750 			qbp->qb_flag |= QB_FULL;
2751 		}
2752 	} else {
2753 		q->q_count += bytecnt;
2754 		q->q_mblkcnt += mblkcnt;
2755 		if ((q->q_count >= q->q_hiwat) ||
2756 		    (q->q_mblkcnt >= q->q_hiwat)) {
2757 			q->q_flag |= QFULL;
2758 		}
2759 	}
2760 
2761 	STR_FTEVENT_MSG(bp, q, FTEV_PUTQ, NULL);
2762 
2763 	if ((mcls > QNORM) ||
2764 	    (canenable(q) && (q->q_flag & QWANTR || bp->b_band)))
2765 		qenable_locked(q);
2766 	ASSERT(MUTEX_HELD(QLOCK(q)));
2767 	if (freezer != curthread)
2768 		mutex_exit(QLOCK(q));
2769 
2770 	return (1);
2771 }
2772 
2773 /*
2774  * Put stuff back at beginning of Q according to priority order.
2775  * See comment on putq above for details.
2776  */
2777 int
2778 putbq(queue_t *q, mblk_t *bp)
2779 {
2780 	mblk_t *tmp;
2781 	qband_t *qbp = NULL;
2782 	int mcls = (int)queclass(bp);
2783 	kthread_id_t freezer;
2784 	int	bytecnt = 0, mblkcnt = 0;
2785 
2786 	ASSERT(q && bp);
2787 	ASSERT(bp->b_next == NULL);
2788 	freezer = STREAM(q)->sd_freezer;
2789 	if (freezer == curthread) {
2790 		ASSERT(frozenstr(q));
2791 		ASSERT(MUTEX_HELD(QLOCK(q)));
2792 	} else
2793 		mutex_enter(QLOCK(q));
2794 
2795 	/*
2796 	 * Make sanity checks and if qband structure is not yet
2797 	 * allocated, do so.
2798 	 */
2799 	if (mcls == QPCTL) {
2800 		if (bp->b_band != 0)
2801 			bp->b_band = 0;		/* force to be correct */
2802 	} else if (bp->b_band != 0) {
2803 		int i;
2804 		qband_t **qbpp;
2805 
2806 		if (bp->b_band > q->q_nband) {
2807 			qbpp = &q->q_bandp;
2808 			while (*qbpp)
2809 				qbpp = &(*qbpp)->qb_next;
2810 			while (bp->b_band > q->q_nband) {
2811 				if ((*qbpp = allocband()) == NULL) {
2812 					if (freezer != curthread)
2813 						mutex_exit(QLOCK(q));
2814 					return (0);
2815 				}
2816 				(*qbpp)->qb_hiwat = q->q_hiwat;
2817 				(*qbpp)->qb_lowat = q->q_lowat;
2818 				q->q_nband++;
2819 				qbpp = &(*qbpp)->qb_next;
2820 			}
2821 		}
2822 		qbp = q->q_bandp;
2823 		i = bp->b_band;
2824 		while (--i)
2825 			qbp = qbp->qb_next;
2826 	}
2827 
2828 	/*
2829 	 * If queue is empty or if message is high priority,
2830 	 * place on the front of the queue.
2831 	 */
2832 	tmp = q->q_first;
2833 	if ((!tmp) || (mcls == QPCTL)) {
2834 		bp->b_next = tmp;
2835 		if (tmp)
2836 			tmp->b_prev = bp;
2837 		else
2838 			q->q_last = bp;
2839 		q->q_first = bp;
2840 		bp->b_prev = NULL;
2841 		if (qbp) {
2842 			qbp->qb_first = bp;
2843 			qbp->qb_last = bp;
2844 		}
2845 	} else if (qbp) {	/* bp->b_band != 0 */
2846 		tmp = qbp->qb_first;
2847 		if (tmp) {
2848 
2849 			/*
2850 			 * Insert bp before the first message in this band.
2851 			 */
2852 			bp->b_next = tmp;
2853 			bp->b_prev = tmp->b_prev;
2854 			if (tmp->b_prev)
2855 				tmp->b_prev->b_next = bp;
2856 			else
2857 				q->q_first = bp;
2858 			tmp->b_prev = bp;
2859 		} else {
2860 			tmp = q->q_last;
2861 			if ((mcls < (int)queclass(tmp)) ||
2862 			    (bp->b_band < tmp->b_band)) {
2863 
2864 				/*
2865 				 * Tack bp on end of queue.
2866 				 */
2867 				bp->b_next = NULL;
2868 				bp->b_prev = tmp;
2869 				tmp->b_next = bp;
2870 				q->q_last = bp;
2871 			} else {
2872 				tmp = q->q_first;
2873 				while (tmp->b_datap->db_type >= QPCTL)
2874 					tmp = tmp->b_next;
2875 				while (tmp->b_band > bp->b_band)
2876 					tmp = tmp->b_next;
2877 
2878 				/*
2879 				 * Insert bp before tmp.
2880 				 */
2881 				bp->b_next = tmp;
2882 				bp->b_prev = tmp->b_prev;
2883 				if (tmp->b_prev)
2884 					tmp->b_prev->b_next = bp;
2885 				else
2886 					q->q_first = bp;
2887 				tmp->b_prev = bp;
2888 			}
2889 			qbp->qb_last = bp;
2890 		}
2891 		qbp->qb_first = bp;
2892 	} else {		/* bp->b_band == 0 && !QPCTL */
2893 
2894 		/*
2895 		 * If the queue class or band is less than that of the last
2896 		 * message on the queue, tack bp on the end of the queue.
2897 		 */
2898 		tmp = q->q_last;
2899 		if ((mcls < (int)queclass(tmp)) || (bp->b_band < tmp->b_band)) {
2900 			bp->b_next = NULL;
2901 			bp->b_prev = tmp;
2902 			tmp->b_next = bp;
2903 			q->q_last = bp;
2904 		} else {
2905 			tmp = q->q_first;
2906 			while (tmp->b_datap->db_type >= QPCTL)
2907 				tmp = tmp->b_next;
2908 			while (tmp->b_band > bp->b_band)
2909 				tmp = tmp->b_next;
2910 
2911 			/*
2912 			 * Insert bp before tmp.
2913 			 */
2914 			bp->b_next = tmp;
2915 			bp->b_prev = tmp->b_prev;
2916 			if (tmp->b_prev)
2917 				tmp->b_prev->b_next = bp;
2918 			else
2919 				q->q_first = bp;
2920 			tmp->b_prev = bp;
2921 		}
2922 	}
2923 
2924 	/* Get message byte count for q_count accounting */
2925 	bytecnt = mp_cont_len(bp, &mblkcnt);
2926 
2927 	if (qbp) {
2928 		qbp->qb_count += bytecnt;
2929 		qbp->qb_mblkcnt += mblkcnt;
2930 		if ((qbp->qb_count >= qbp->qb_hiwat) ||
2931 		    (qbp->qb_mblkcnt >= qbp->qb_hiwat)) {
2932 			qbp->qb_flag |= QB_FULL;
2933 		}
2934 	} else {
2935 		q->q_count += bytecnt;
2936 		q->q_mblkcnt += mblkcnt;
2937 		if ((q->q_count >= q->q_hiwat) ||
2938 		    (q->q_mblkcnt >= q->q_hiwat)) {
2939 			q->q_flag |= QFULL;
2940 		}
2941 	}
2942 
2943 	STR_FTEVENT_MSG(bp, q, FTEV_PUTBQ, NULL);
2944 
2945 	if ((mcls > QNORM) || (canenable(q) && (q->q_flag & QWANTR)))
2946 		qenable_locked(q);
2947 	ASSERT(MUTEX_HELD(QLOCK(q)));
2948 	if (freezer != curthread)
2949 		mutex_exit(QLOCK(q));
2950 
2951 	return (1);
2952 }
2953 
2954 /*
2955  * Insert a message before an existing message on the queue.  If the
2956  * existing message is NULL, the new messages is placed on the end of
2957  * the queue.  The queue class of the new message is ignored.  However,
2958  * the priority band of the new message must adhere to the following
2959  * ordering:
2960  *
2961  *	emp->b_prev->b_band >= mp->b_band >= emp->b_band.
2962  *
2963  * All flow control parameters are updated.
2964  *
2965  * insq can be called with the stream frozen, but other utility functions
2966  * holding QLOCK, and by streams modules without any locks/frozen.
2967  */
2968 int
2969 insq(queue_t *q, mblk_t *emp, mblk_t *mp)
2970 {
2971 	mblk_t *tmp;
2972 	qband_t *qbp = NULL;
2973 	int mcls = (int)queclass(mp);
2974 	kthread_id_t freezer;
2975 	int	bytecnt = 0, mblkcnt = 0;
2976 
2977 	freezer = STREAM(q)->sd_freezer;
2978 	if (freezer == curthread) {
2979 		ASSERT(frozenstr(q));
2980 		ASSERT(MUTEX_HELD(QLOCK(q)));
2981 	} else if (MUTEX_HELD(QLOCK(q))) {
2982 		/* Don't drop lock on exit */
2983 		freezer = curthread;
2984 	} else
2985 		mutex_enter(QLOCK(q));
2986 
2987 	if (mcls == QPCTL) {
2988 		if (mp->b_band != 0)
2989 			mp->b_band = 0;		/* force to be correct */
2990 		if (emp && emp->b_prev &&
2991 		    (emp->b_prev->b_datap->db_type < QPCTL))
2992 			goto badord;
2993 	}
2994 	if (emp) {
2995 		if (((mcls == QNORM) && (mp->b_band < emp->b_band)) ||
2996 		    (emp->b_prev && (emp->b_prev->b_datap->db_type < QPCTL) &&
2997 		    (emp->b_prev->b_band < mp->b_band))) {
2998 			goto badord;
2999 		}
3000 	} else {
3001 		tmp = q->q_last;
3002 		if (tmp && (mcls == QNORM) && (mp->b_band > tmp->b_band)) {
3003 badord:
3004 			cmn_err(CE_WARN,
3005 			    "insq: attempt to insert message out of order "
3006 			    "on q %p", (void *)q);
3007 			if (freezer != curthread)
3008 				mutex_exit(QLOCK(q));
3009 			return (0);
3010 		}
3011 	}
3012 
3013 	if (mp->b_band != 0) {
3014 		int i;
3015 		qband_t **qbpp;
3016 
3017 		if (mp->b_band > q->q_nband) {
3018 			qbpp = &q->q_bandp;
3019 			while (*qbpp)
3020 				qbpp = &(*qbpp)->qb_next;
3021 			while (mp->b_band > q->q_nband) {
3022 				if ((*qbpp = allocband()) == NULL) {
3023 					if (freezer != curthread)
3024 						mutex_exit(QLOCK(q));
3025 					return (0);
3026 				}
3027 				(*qbpp)->qb_hiwat = q->q_hiwat;
3028 				(*qbpp)->qb_lowat = q->q_lowat;
3029 				q->q_nband++;
3030 				qbpp = &(*qbpp)->qb_next;
3031 			}
3032 		}
3033 		qbp = q->q_bandp;
3034 		i = mp->b_band;
3035 		while (--i)
3036 			qbp = qbp->qb_next;
3037 	}
3038 
3039 	if ((mp->b_next = emp) != NULL) {
3040 		if ((mp->b_prev = emp->b_prev) != NULL)
3041 			emp->b_prev->b_next = mp;
3042 		else
3043 			q->q_first = mp;
3044 		emp->b_prev = mp;
3045 	} else {
3046 		if ((mp->b_prev = q->q_last) != NULL)
3047 			q->q_last->b_next = mp;
3048 		else
3049 			q->q_first = mp;
3050 		q->q_last = mp;
3051 	}
3052 
3053 	/* Get mblk and byte count for q_count accounting */
3054 	bytecnt = mp_cont_len(mp, &mblkcnt);
3055 
3056 	if (qbp) {	/* adjust qband pointers and count */
3057 		if (!qbp->qb_first) {
3058 			qbp->qb_first = mp;
3059 			qbp->qb_last = mp;
3060 		} else {
3061 			if (mp->b_prev == NULL || (mp->b_prev != NULL &&
3062 			    (mp->b_prev->b_band != mp->b_band)))
3063 				qbp->qb_first = mp;
3064 			else if (mp->b_next == NULL || (mp->b_next != NULL &&
3065 			    (mp->b_next->b_band != mp->b_band)))
3066 				qbp->qb_last = mp;
3067 		}
3068 		qbp->qb_count += bytecnt;
3069 		qbp->qb_mblkcnt += mblkcnt;
3070 		if ((qbp->qb_count >= qbp->qb_hiwat) ||
3071 		    (qbp->qb_mblkcnt >= qbp->qb_hiwat)) {
3072 			qbp->qb_flag |= QB_FULL;
3073 		}
3074 	} else {
3075 		q->q_count += bytecnt;
3076 		q->q_mblkcnt += mblkcnt;
3077 		if ((q->q_count >= q->q_hiwat) ||
3078 		    (q->q_mblkcnt >= q->q_hiwat)) {
3079 			q->q_flag |= QFULL;
3080 		}
3081 	}
3082 
3083 	STR_FTEVENT_MSG(mp, q, FTEV_INSQ, NULL);
3084 
3085 	if (canenable(q) && (q->q_flag & QWANTR))
3086 		qenable_locked(q);
3087 
3088 	ASSERT(MUTEX_HELD(QLOCK(q)));
3089 	if (freezer != curthread)
3090 		mutex_exit(QLOCK(q));
3091 
3092 	return (1);
3093 }
3094 
3095 /*
3096  * Create and put a control message on queue.
3097  */
3098 int
3099 putctl(queue_t *q, int type)
3100 {
3101 	mblk_t *bp;
3102 
3103 	if ((datamsg(type) && (type != M_DELAY)) ||
3104 	    (bp = allocb_tryhard(0)) == NULL)
3105 		return (0);
3106 	bp->b_datap->db_type = (unsigned char) type;
3107 
3108 	put(q, bp);
3109 
3110 	return (1);
3111 }
3112 
3113 /*
3114  * Control message with a single-byte parameter
3115  */
3116 int
3117 putctl1(queue_t *q, int type, int param)
3118 {
3119 	mblk_t *bp;
3120 
3121 	if ((datamsg(type) && (type != M_DELAY)) ||
3122 	    (bp = allocb_tryhard(1)) == NULL)
3123 		return (0);
3124 	bp->b_datap->db_type = (unsigned char)type;
3125 	*bp->b_wptr++ = (unsigned char)param;
3126 
3127 	put(q, bp);
3128 
3129 	return (1);
3130 }
3131 
3132 int
3133 putnextctl1(queue_t *q, int type, int param)
3134 {
3135 	mblk_t *bp;
3136 
3137 	if ((datamsg(type) && (type != M_DELAY)) ||
3138 	    ((bp = allocb_tryhard(1)) == NULL))
3139 		return (0);
3140 
3141 	bp->b_datap->db_type = (unsigned char)type;
3142 	*bp->b_wptr++ = (unsigned char)param;
3143 
3144 	putnext(q, bp);
3145 
3146 	return (1);
3147 }
3148 
3149 int
3150 putnextctl(queue_t *q, int type)
3151 {
3152 	mblk_t *bp;
3153 
3154 	if ((datamsg(type) && (type != M_DELAY)) ||
3155 	    ((bp = allocb_tryhard(0)) == NULL))
3156 		return (0);
3157 	bp->b_datap->db_type = (unsigned char)type;
3158 
3159 	putnext(q, bp);
3160 
3161 	return (1);
3162 }
3163 
3164 /*
3165  * Return the queue upstream from this one
3166  */
3167 queue_t *
3168 backq(queue_t *q)
3169 {
3170 	q = _OTHERQ(q);
3171 	if (q->q_next) {
3172 		q = q->q_next;
3173 		return (_OTHERQ(q));
3174 	}
3175 	return (NULL);
3176 }
3177 
3178 /*
3179  * Send a block back up the queue in reverse from this
3180  * one (e.g. to respond to ioctls)
3181  */
3182 void
3183 qreply(queue_t *q, mblk_t *bp)
3184 {
3185 	ASSERT(q && bp);
3186 
3187 	putnext(_OTHERQ(q), bp);
3188 }
3189 
3190 /*
3191  * Streams Queue Scheduling
3192  *
3193  * Queues are enabled through qenable() when they have messages to
3194  * process.  They are serviced by queuerun(), which runs each enabled
3195  * queue's service procedure.  The call to queuerun() is processor
3196  * dependent - the general principle is that it be run whenever a queue
3197  * is enabled but before returning to user level.  For system calls,
3198  * the function runqueues() is called if their action causes a queue
3199  * to be enabled.  For device interrupts, queuerun() should be
3200  * called before returning from the last level of interrupt.  Beyond
3201  * this, no timing assumptions should be made about queue scheduling.
3202  */
3203 
3204 /*
3205  * Enable a queue: put it on list of those whose service procedures are
3206  * ready to run and set up the scheduling mechanism.
3207  * The broadcast is done outside the mutex -> to avoid the woken thread
3208  * from contending with the mutex. This is OK 'cos the queue has been
3209  * enqueued on the runlist and flagged safely at this point.
3210  */
3211 void
3212 qenable(queue_t *q)
3213 {
3214 	mutex_enter(QLOCK(q));
3215 	qenable_locked(q);
3216 	mutex_exit(QLOCK(q));
3217 }
3218 /*
3219  * Return number of messages on queue
3220  */
3221 int
3222 qsize(queue_t *qp)
3223 {
3224 	int count = 0;
3225 	mblk_t *mp;
3226 
3227 	mutex_enter(QLOCK(qp));
3228 	for (mp = qp->q_first; mp; mp = mp->b_next)
3229 		count++;
3230 	mutex_exit(QLOCK(qp));
3231 	return (count);
3232 }
3233 
3234 /*
3235  * noenable - set queue so that putq() will not enable it.
3236  * enableok - set queue so that putq() can enable it.
3237  */
3238 void
3239 noenable(queue_t *q)
3240 {
3241 	mutex_enter(QLOCK(q));
3242 	q->q_flag |= QNOENB;
3243 	mutex_exit(QLOCK(q));
3244 }
3245 
3246 void
3247 enableok(queue_t *q)
3248 {
3249 	mutex_enter(QLOCK(q));
3250 	q->q_flag &= ~QNOENB;
3251 	mutex_exit(QLOCK(q));
3252 }
3253 
3254 /*
3255  * Set queue fields.
3256  */
3257 int
3258 strqset(queue_t *q, qfields_t what, unsigned char pri, intptr_t val)
3259 {
3260 	qband_t *qbp = NULL;
3261 	queue_t	*wrq;
3262 	int error = 0;
3263 	kthread_id_t freezer;
3264 
3265 	freezer = STREAM(q)->sd_freezer;
3266 	if (freezer == curthread) {
3267 		ASSERT(frozenstr(q));
3268 		ASSERT(MUTEX_HELD(QLOCK(q)));
3269 	} else
3270 		mutex_enter(QLOCK(q));
3271 
3272 	if (what >= QBAD) {
3273 		error = EINVAL;
3274 		goto done;
3275 	}
3276 	if (pri != 0) {
3277 		int i;
3278 		qband_t **qbpp;
3279 
3280 		if (pri > q->q_nband) {
3281 			qbpp = &q->q_bandp;
3282 			while (*qbpp)
3283 				qbpp = &(*qbpp)->qb_next;
3284 			while (pri > q->q_nband) {
3285 				if ((*qbpp = allocband()) == NULL) {
3286 					error = EAGAIN;
3287 					goto done;
3288 				}
3289 				(*qbpp)->qb_hiwat = q->q_hiwat;
3290 				(*qbpp)->qb_lowat = q->q_lowat;
3291 				q->q_nband++;
3292 				qbpp = &(*qbpp)->qb_next;
3293 			}
3294 		}
3295 		qbp = q->q_bandp;
3296 		i = pri;
3297 		while (--i)
3298 			qbp = qbp->qb_next;
3299 	}
3300 	switch (what) {
3301 
3302 	case QHIWAT:
3303 		if (qbp)
3304 			qbp->qb_hiwat = (size_t)val;
3305 		else
3306 			q->q_hiwat = (size_t)val;
3307 		break;
3308 
3309 	case QLOWAT:
3310 		if (qbp)
3311 			qbp->qb_lowat = (size_t)val;
3312 		else
3313 			q->q_lowat = (size_t)val;
3314 		break;
3315 
3316 	case QMAXPSZ:
3317 		if (qbp)
3318 			error = EINVAL;
3319 		else
3320 			q->q_maxpsz = (ssize_t)val;
3321 
3322 		/*
3323 		 * Performance concern, strwrite looks at the module below
3324 		 * the stream head for the maxpsz each time it does a write
3325 		 * we now cache it at the stream head.  Check to see if this
3326 		 * queue is sitting directly below the stream head.
3327 		 */
3328 		wrq = STREAM(q)->sd_wrq;
3329 		if (q != wrq->q_next)
3330 			break;
3331 
3332 		/*
3333 		 * If the stream is not frozen drop the current QLOCK and
3334 		 * acquire the sd_wrq QLOCK which protects sd_qn_*
3335 		 */
3336 		if (freezer != curthread) {
3337 			mutex_exit(QLOCK(q));
3338 			mutex_enter(QLOCK(wrq));
3339 		}
3340 		ASSERT(MUTEX_HELD(QLOCK(wrq)));
3341 
3342 		if (strmsgsz != 0) {
3343 			if (val == INFPSZ)
3344 				val = strmsgsz;
3345 			else  {
3346 				if (STREAM(q)->sd_vnode->v_type == VFIFO)
3347 					val = MIN(PIPE_BUF, val);
3348 				else
3349 					val = MIN(strmsgsz, val);
3350 			}
3351 		}
3352 		STREAM(q)->sd_qn_maxpsz = val;
3353 		if (freezer != curthread) {
3354 			mutex_exit(QLOCK(wrq));
3355 			mutex_enter(QLOCK(q));
3356 		}
3357 		break;
3358 
3359 	case QMINPSZ:
3360 		if (qbp)
3361 			error = EINVAL;
3362 		else
3363 			q->q_minpsz = (ssize_t)val;
3364 
3365 		/*
3366 		 * Performance concern, strwrite looks at the module below
3367 		 * the stream head for the maxpsz each time it does a write
3368 		 * we now cache it at the stream head.  Check to see if this
3369 		 * queue is sitting directly below the stream head.
3370 		 */
3371 		wrq = STREAM(q)->sd_wrq;
3372 		if (q != wrq->q_next)
3373 			break;
3374 
3375 		/*
3376 		 * If the stream is not frozen drop the current QLOCK and
3377 		 * acquire the sd_wrq QLOCK which protects sd_qn_*
3378 		 */
3379 		if (freezer != curthread) {
3380 			mutex_exit(QLOCK(q));
3381 			mutex_enter(QLOCK(wrq));
3382 		}
3383 		STREAM(q)->sd_qn_minpsz = (ssize_t)val;
3384 
3385 		if (freezer != curthread) {
3386 			mutex_exit(QLOCK(wrq));
3387 			mutex_enter(QLOCK(q));
3388 		}
3389 		break;
3390 
3391 	case QSTRUIOT:
3392 		if (qbp)
3393 			error = EINVAL;
3394 		else
3395 			q->q_struiot = (ushort_t)val;
3396 		break;
3397 
3398 	case QCOUNT:
3399 	case QFIRST:
3400 	case QLAST:
3401 	case QFLAG:
3402 		error = EPERM;
3403 		break;
3404 
3405 	default:
3406 		error = EINVAL;
3407 		break;
3408 	}
3409 done:
3410 	if (freezer != curthread)
3411 		mutex_exit(QLOCK(q));
3412 	return (error);
3413 }
3414 
3415 /*
3416  * Get queue fields.
3417  */
3418 int
3419 strqget(queue_t *q, qfields_t what, unsigned char pri, void *valp)
3420 {
3421 	qband_t 	*qbp = NULL;
3422 	int 		error = 0;
3423 	kthread_id_t 	freezer;
3424 
3425 	freezer = STREAM(q)->sd_freezer;
3426 	if (freezer == curthread) {
3427 		ASSERT(frozenstr(q));
3428 		ASSERT(MUTEX_HELD(QLOCK(q)));
3429 	} else
3430 		mutex_enter(QLOCK(q));
3431 	if (what >= QBAD) {
3432 		error = EINVAL;
3433 		goto done;
3434 	}
3435 	if (pri != 0) {
3436 		int i;
3437 		qband_t **qbpp;
3438 
3439 		if (pri > q->q_nband) {
3440 			qbpp = &q->q_bandp;
3441 			while (*qbpp)
3442 				qbpp = &(*qbpp)->qb_next;
3443 			while (pri > q->q_nband) {
3444 				if ((*qbpp = allocband()) == NULL) {
3445 					error = EAGAIN;
3446 					goto done;
3447 				}
3448 				(*qbpp)->qb_hiwat = q->q_hiwat;
3449 				(*qbpp)->qb_lowat = q->q_lowat;
3450 				q->q_nband++;
3451 				qbpp = &(*qbpp)->qb_next;
3452 			}
3453 		}
3454 		qbp = q->q_bandp;
3455 		i = pri;
3456 		while (--i)
3457 			qbp = qbp->qb_next;
3458 	}
3459 	switch (what) {
3460 	case QHIWAT:
3461 		if (qbp)
3462 			*(size_t *)valp = qbp->qb_hiwat;
3463 		else
3464 			*(size_t *)valp = q->q_hiwat;
3465 		break;
3466 
3467 	case QLOWAT:
3468 		if (qbp)
3469 			*(size_t *)valp = qbp->qb_lowat;
3470 		else
3471 			*(size_t *)valp = q->q_lowat;
3472 		break;
3473 
3474 	case QMAXPSZ:
3475 		if (qbp)
3476 			error = EINVAL;
3477 		else
3478 			*(ssize_t *)valp = q->q_maxpsz;
3479 		break;
3480 
3481 	case QMINPSZ:
3482 		if (qbp)
3483 			error = EINVAL;
3484 		else
3485 			*(ssize_t *)valp = q->q_minpsz;
3486 		break;
3487 
3488 	case QCOUNT:
3489 		if (qbp)
3490 			*(size_t *)valp = qbp->qb_count;
3491 		else
3492 			*(size_t *)valp = q->q_count;
3493 		break;
3494 
3495 	case QFIRST:
3496 		if (qbp)
3497 			*(mblk_t **)valp = qbp->qb_first;
3498 		else
3499 			*(mblk_t **)valp = q->q_first;
3500 		break;
3501 
3502 	case QLAST:
3503 		if (qbp)
3504 			*(mblk_t **)valp = qbp->qb_last;
3505 		else
3506 			*(mblk_t **)valp = q->q_last;
3507 		break;
3508 
3509 	case QFLAG:
3510 		if (qbp)
3511 			*(uint_t *)valp = qbp->qb_flag;
3512 		else
3513 			*(uint_t *)valp = q->q_flag;
3514 		break;
3515 
3516 	case QSTRUIOT:
3517 		if (qbp)
3518 			error = EINVAL;
3519 		else
3520 			*(short *)valp = q->q_struiot;
3521 		break;
3522 
3523 	default:
3524 		error = EINVAL;
3525 		break;
3526 	}
3527 done:
3528 	if (freezer != curthread)
3529 		mutex_exit(QLOCK(q));
3530 	return (error);
3531 }
3532 
3533 /*
3534  * Function awakes all in cvwait/sigwait/pollwait, on one of:
3535  *	QWANTWSYNC or QWANTR or QWANTW,
3536  *
3537  * Note: for QWANTWSYNC/QWANTW and QWANTR, if no WSLEEPer or RSLEEPer then a
3538  *	 deferred wakeup will be done. Also if strpoll() in progress then a
3539  *	 deferred pollwakeup will be done.
3540  */
3541 void
3542 strwakeq(queue_t *q, int flag)
3543 {
3544 	stdata_t 	*stp = STREAM(q);
3545 	pollhead_t 	*pl;
3546 
3547 	mutex_enter(&stp->sd_lock);
3548 	pl = &stp->sd_pollist;
3549 	if (flag & QWANTWSYNC) {
3550 		ASSERT(!(q->q_flag & QREADR));
3551 		if (stp->sd_flag & WSLEEP) {
3552 			stp->sd_flag &= ~WSLEEP;
3553 			cv_broadcast(&stp->sd_wrq->q_wait);
3554 		} else {
3555 			stp->sd_wakeq |= WSLEEP;
3556 		}
3557 
3558 		mutex_exit(&stp->sd_lock);
3559 		pollwakeup(pl, POLLWRNORM);
3560 		mutex_enter(&stp->sd_lock);
3561 
3562 		if (stp->sd_sigflags & S_WRNORM)
3563 			strsendsig(stp->sd_siglist, S_WRNORM, 0, 0);
3564 	} else if (flag & QWANTR) {
3565 		if (stp->sd_flag & RSLEEP) {
3566 			stp->sd_flag &= ~RSLEEP;
3567 			cv_broadcast(&_RD(stp->sd_wrq)->q_wait);
3568 		} else {
3569 			stp->sd_wakeq |= RSLEEP;
3570 		}
3571 
3572 		mutex_exit(&stp->sd_lock);
3573 		pollwakeup(pl, POLLIN | POLLRDNORM);
3574 		mutex_enter(&stp->sd_lock);
3575 
3576 		{
3577 			int events = stp->sd_sigflags & (S_INPUT | S_RDNORM);
3578 
3579 			if (events)
3580 				strsendsig(stp->sd_siglist, events, 0, 0);
3581 		}
3582 	} else {
3583 		if (stp->sd_flag & WSLEEP) {
3584 			stp->sd_flag &= ~WSLEEP;
3585 			cv_broadcast(&stp->sd_wrq->q_wait);
3586 		}
3587 
3588 		mutex_exit(&stp->sd_lock);
3589 		pollwakeup(pl, POLLWRNORM);
3590 		mutex_enter(&stp->sd_lock);
3591 
3592 		if (stp->sd_sigflags & S_WRNORM)
3593 			strsendsig(stp->sd_siglist, S_WRNORM, 0, 0);
3594 	}
3595 	mutex_exit(&stp->sd_lock);
3596 }
3597 
3598 int
3599 struioget(queue_t *q, mblk_t *mp, struiod_t *dp, int noblock)
3600 {
3601 	stdata_t *stp = STREAM(q);
3602 	int typ  = STRUIOT_STANDARD;
3603 	uio_t	 *uiop = &dp->d_uio;
3604 	dblk_t	 *dbp;
3605 	ssize_t	 uiocnt;
3606 	ssize_t	 cnt;
3607 	unsigned char *ptr;
3608 	ssize_t	 resid;
3609 	int	 error = 0;
3610 	on_trap_data_t otd;
3611 	queue_t	*stwrq;
3612 
3613 	/*
3614 	 * Plumbing may change while taking the type so store the
3615 	 * queue in a temporary variable. It doesn't matter even
3616 	 * if the we take the type from the previous plumbing,
3617 	 * that's because if the plumbing has changed when we were
3618 	 * holding the queue in a temporary variable, we can continue
3619 	 * processing the message the way it would have been processed
3620 	 * in the old plumbing, without any side effects but a bit
3621 	 * extra processing for partial ip header checksum.
3622 	 *
3623 	 * This has been done to avoid holding the sd_lock which is
3624 	 * very hot.
3625 	 */
3626 
3627 	stwrq = stp->sd_struiowrq;
3628 	if (stwrq)
3629 		typ = stwrq->q_struiot;
3630 
3631 	for (; (resid = uiop->uio_resid) > 0 && mp; mp = mp->b_cont) {
3632 		dbp = mp->b_datap;
3633 		ptr = (uchar_t *)(mp->b_rptr + dbp->db_cksumstuff);
3634 		uiocnt = dbp->db_cksumend - dbp->db_cksumstuff;
3635 		cnt = MIN(uiocnt, uiop->uio_resid);
3636 		if (!(dbp->db_struioflag & STRUIO_SPEC) ||
3637 		    (dbp->db_struioflag & STRUIO_DONE) || cnt == 0) {
3638 			/*
3639 			 * Either this mblk has already been processed
3640 			 * or there is no more room in this mblk (?).
3641 			 */
3642 			continue;
3643 		}
3644 		switch (typ) {
3645 		case STRUIOT_STANDARD:
3646 			if (noblock) {
3647 				if (on_trap(&otd, OT_DATA_ACCESS)) {
3648 					no_trap();
3649 					error = EWOULDBLOCK;
3650 					goto out;
3651 				}
3652 			}
3653 			if (error = uiomove(ptr, cnt, UIO_WRITE, uiop)) {
3654 				if (noblock)
3655 					no_trap();
3656 				goto out;
3657 			}
3658 			if (noblock)
3659 				no_trap();
3660 			break;
3661 
3662 		default:
3663 			error = EIO;
3664 			goto out;
3665 		}
3666 		dbp->db_struioflag |= STRUIO_DONE;
3667 		dbp->db_cksumstuff += cnt;
3668 	}
3669 out:
3670 	if (error == EWOULDBLOCK && (resid -= uiop->uio_resid) > 0) {
3671 		/*
3672 		 * A fault has occured and some bytes were moved to the
3673 		 * current mblk, the uio_t has already been updated by
3674 		 * the appropriate uio routine, so also update the mblk
3675 		 * to reflect this in case this same mblk chain is used
3676 		 * again (after the fault has been handled).
3677 		 */
3678 		uiocnt = dbp->db_cksumend - dbp->db_cksumstuff;
3679 		if (uiocnt >= resid)
3680 			dbp->db_cksumstuff += resid;
3681 	}
3682 	return (error);
3683 }
3684 
3685 /*
3686  * Try to enter queue synchronously. Any attempt to enter a closing queue will
3687  * fails. The qp->q_rwcnt keeps track of the number of successful entries so
3688  * that removeq() will not try to close the queue while a thread is inside the
3689  * queue.
3690  */
3691 static boolean_t
3692 rwnext_enter(queue_t *qp)
3693 {
3694 	mutex_enter(QLOCK(qp));
3695 	if (qp->q_flag & QWCLOSE) {
3696 		mutex_exit(QLOCK(qp));
3697 		return (B_FALSE);
3698 	}
3699 	qp->q_rwcnt++;
3700 	ASSERT(qp->q_rwcnt != 0);
3701 	mutex_exit(QLOCK(qp));
3702 	return (B_TRUE);
3703 }
3704 
3705 /*
3706  * Decrease the count of threads running in sync stream queue and wake up any
3707  * threads blocked in removeq().
3708  */
3709 static void
3710 rwnext_exit(queue_t *qp)
3711 {
3712 	mutex_enter(QLOCK(qp));
3713 	qp->q_rwcnt--;
3714 	if (qp->q_flag & QWANTRMQSYNC) {
3715 		qp->q_flag &= ~QWANTRMQSYNC;
3716 		cv_broadcast(&qp->q_wait);
3717 	}
3718 	mutex_exit(QLOCK(qp));
3719 }
3720 
3721 /*
3722  * The purpose of rwnext() is to call the rw procedure of the next
3723  * (downstream) modules queue.
3724  *
3725  * treated as put entrypoint for perimeter syncronization.
3726  *
3727  * There's no need to grab sq_putlocks here (which only exist for CIPUT
3728  * sync queues). If it is CIPUT sync queue sq_count is incremented and it does
3729  * not matter if any regular put entrypoints have been already entered. We
3730  * can't increment one of the sq_putcounts (instead of sq_count) because
3731  * qwait_rw won't know which counter to decrement.
3732  *
3733  * It would be reasonable to add the lockless FASTPUT logic.
3734  */
3735 int
3736 rwnext(queue_t *qp, struiod_t *dp)
3737 {
3738 	queue_t		*nqp;
3739 	syncq_t		*sq;
3740 	uint16_t	count;
3741 	uint16_t	flags;
3742 	struct qinit	*qi;
3743 	int		(*proc)();
3744 	struct stdata	*stp;
3745 	int		isread;
3746 	int		rval;
3747 
3748 	stp = STREAM(qp);
3749 	/*
3750 	 * Prevent q_next from changing by holding sd_lock until acquiring
3751 	 * SQLOCK. Note that a read-side rwnext from the streamhead will
3752 	 * already have sd_lock acquired. In either case sd_lock is always
3753 	 * released after acquiring SQLOCK.
3754 	 *
3755 	 * The streamhead read-side holding sd_lock when calling rwnext is
3756 	 * required to prevent a race condition were M_DATA mblks flowing
3757 	 * up the read-side of the stream could be bypassed by a rwnext()
3758 	 * down-call. In this case sd_lock acts as the streamhead perimeter.
3759 	 */
3760 	if ((nqp = _WR(qp)) == qp) {
3761 		isread = 0;
3762 		mutex_enter(&stp->sd_lock);
3763 		qp = nqp->q_next;
3764 	} else {
3765 		isread = 1;
3766 		if (nqp != stp->sd_wrq)
3767 			/* Not streamhead */
3768 			mutex_enter(&stp->sd_lock);
3769 		qp = _RD(nqp->q_next);
3770 	}
3771 	qi = qp->q_qinfo;
3772 	if (qp->q_struiot == STRUIOT_NONE || ! (proc = qi->qi_rwp)) {
3773 		/*
3774 		 * Not a synchronous module or no r/w procedure for this
3775 		 * queue, so just return EINVAL and let the caller handle it.
3776 		 */
3777 		mutex_exit(&stp->sd_lock);
3778 		return (EINVAL);
3779 	}
3780 
3781 	if (rwnext_enter(qp) == B_FALSE) {
3782 		mutex_exit(&stp->sd_lock);
3783 		return (EINVAL);
3784 	}
3785 
3786 	sq = qp->q_syncq;
3787 	mutex_enter(SQLOCK(sq));
3788 	mutex_exit(&stp->sd_lock);
3789 	count = sq->sq_count;
3790 	flags = sq->sq_flags;
3791 	ASSERT(sq->sq_ciputctrl == NULL || (flags & SQ_CIPUT));
3792 
3793 	while ((flags & SQ_GOAWAY) || (!(flags & SQ_CIPUT) && count != 0)) {
3794 		/*
3795 		 * if this queue is being closed, return.
3796 		 */
3797 		if (qp->q_flag & QWCLOSE) {
3798 			mutex_exit(SQLOCK(sq));
3799 			rwnext_exit(qp);
3800 			return (EINVAL);
3801 		}
3802 
3803 		/*
3804 		 * Wait until we can enter the inner perimeter.
3805 		 */
3806 		sq->sq_flags = flags | SQ_WANTWAKEUP;
3807 		cv_wait(&sq->sq_wait, SQLOCK(sq));
3808 		count = sq->sq_count;
3809 		flags = sq->sq_flags;
3810 	}
3811 
3812 	if (isread == 0 && stp->sd_struiowrq == NULL ||
3813 	    isread == 1 && stp->sd_struiordq == NULL) {
3814 		/*
3815 		 * Stream plumbing changed while waiting for inner perimeter
3816 		 * so just return EINVAL and let the caller handle it.
3817 		 */
3818 		mutex_exit(SQLOCK(sq));
3819 		rwnext_exit(qp);
3820 		return (EINVAL);
3821 	}
3822 	if (!(flags & SQ_CIPUT))
3823 		sq->sq_flags = flags | SQ_EXCL;
3824 	sq->sq_count = count + 1;
3825 	ASSERT(sq->sq_count != 0);		/* Wraparound */
3826 	/*
3827 	 * Note: The only message ordering guarantee that rwnext() makes is
3828 	 *	 for the write queue flow-control case. All others (r/w queue
3829 	 *	 with q_count > 0 (or q_first != 0)) are the resposibilty of
3830 	 *	 the queue's rw procedure. This could be genralized here buy
3831 	 *	 running the queue's service procedure, but that wouldn't be
3832 	 *	 the most efficent for all cases.
3833 	 */
3834 	mutex_exit(SQLOCK(sq));
3835 	if (! isread && (qp->q_flag & QFULL)) {
3836 		/*
3837 		 * Write queue may be flow controlled. If so,
3838 		 * mark the queue for wakeup when it's not.
3839 		 */
3840 		mutex_enter(QLOCK(qp));
3841 		if (qp->q_flag & QFULL) {
3842 			qp->q_flag |= QWANTWSYNC;
3843 			mutex_exit(QLOCK(qp));
3844 			rval = EWOULDBLOCK;
3845 			goto out;
3846 		}
3847 		mutex_exit(QLOCK(qp));
3848 	}
3849 
3850 	if (! isread && dp->d_mp)
3851 		STR_FTEVENT_MSG(dp->d_mp, nqp, FTEV_RWNEXT, dp->d_mp->b_rptr -
3852 		    dp->d_mp->b_datap->db_base);
3853 
3854 	rval = (*proc)(qp, dp);
3855 
3856 	if (isread && dp->d_mp)
3857 		STR_FTEVENT_MSG(dp->d_mp, _RD(nqp), FTEV_RWNEXT,
3858 		    dp->d_mp->b_rptr - dp->d_mp->b_datap->db_base);
3859 out:
3860 	/*
3861 	 * The queue is protected from being freed by sq_count, so it is
3862 	 * safe to call rwnext_exit and reacquire SQLOCK(sq).
3863 	 */
3864 	rwnext_exit(qp);
3865 
3866 	mutex_enter(SQLOCK(sq));
3867 	flags = sq->sq_flags;
3868 	ASSERT(sq->sq_count != 0);
3869 	sq->sq_count--;
3870 	if (flags & SQ_TAIL) {
3871 		putnext_tail(sq, qp, flags);
3872 		/*
3873 		 * The only purpose of this ASSERT is to preserve calling stack
3874 		 * in DEBUG kernel.
3875 		 */
3876 		ASSERT(flags & SQ_TAIL);
3877 		return (rval);
3878 	}
3879 	ASSERT(flags & (SQ_EXCL|SQ_CIPUT));
3880 	/*
3881 	 * Safe to always drop SQ_EXCL:
3882 	 *	Not SQ_CIPUT means we set SQ_EXCL above
3883 	 *	For SQ_CIPUT SQ_EXCL will only be set if the put procedure
3884 	 *	did a qwriter(INNER) in which case nobody else
3885 	 *	is in the inner perimeter and we are exiting.
3886 	 *
3887 	 * I would like to make the following assertion:
3888 	 *
3889 	 * ASSERT((flags & (SQ_EXCL|SQ_CIPUT)) != (SQ_EXCL|SQ_CIPUT) ||
3890 	 * 	sq->sq_count == 0);
3891 	 *
3892 	 * which indicates that if we are both putshared and exclusive,
3893 	 * we became exclusive while executing the putproc, and the only
3894 	 * claim on the syncq was the one we dropped a few lines above.
3895 	 * But other threads that enter putnext while the syncq is exclusive
3896 	 * need to make a claim as they may need to drop SQLOCK in the
3897 	 * has_writers case to avoid deadlocks.  If these threads are
3898 	 * delayed or preempted, it is possible that the writer thread can
3899 	 * find out that there are other claims making the (sq_count == 0)
3900 	 * test invalid.
3901 	 */
3902 
3903 	sq->sq_flags = flags & ~SQ_EXCL;
3904 	if (sq->sq_flags & SQ_WANTWAKEUP) {
3905 		sq->sq_flags &= ~SQ_WANTWAKEUP;
3906 		cv_broadcast(&sq->sq_wait);
3907 	}
3908 	mutex_exit(SQLOCK(sq));
3909 	return (rval);
3910 }
3911 
3912 /*
3913  * The purpose of infonext() is to call the info procedure of the next
3914  * (downstream) modules queue.
3915  *
3916  * treated as put entrypoint for perimeter syncronization.
3917  *
3918  * There's no need to grab sq_putlocks here (which only exist for CIPUT
3919  * sync queues). If it is CIPUT sync queue regular sq_count is incremented and
3920  * it does not matter if any regular put entrypoints have been already
3921  * entered.
3922  */
3923 int
3924 infonext(queue_t *qp, infod_t *idp)
3925 {
3926 	queue_t		*nqp;
3927 	syncq_t		*sq;
3928 	uint16_t	count;
3929 	uint16_t 	flags;
3930 	struct qinit	*qi;
3931 	int		(*proc)();
3932 	struct stdata	*stp;
3933 	int		rval;
3934 
3935 	stp = STREAM(qp);
3936 	/*
3937 	 * Prevent q_next from changing by holding sd_lock until
3938 	 * acquiring SQLOCK.
3939 	 */
3940 	mutex_enter(&stp->sd_lock);
3941 	if ((nqp = _WR(qp)) == qp) {
3942 		qp = nqp->q_next;
3943 	} else {
3944 		qp = _RD(nqp->q_next);
3945 	}
3946 	qi = qp->q_qinfo;
3947 	if (qp->q_struiot == STRUIOT_NONE || ! (proc = qi->qi_infop)) {
3948 		mutex_exit(&stp->sd_lock);
3949 		return (EINVAL);
3950 	}
3951 	sq = qp->q_syncq;
3952 	mutex_enter(SQLOCK(sq));
3953 	mutex_exit(&stp->sd_lock);
3954 	count = sq->sq_count;
3955 	flags = sq->sq_flags;
3956 	ASSERT(sq->sq_ciputctrl == NULL || (flags & SQ_CIPUT));
3957 
3958 	while ((flags & SQ_GOAWAY) || (!(flags & SQ_CIPUT) && count != 0)) {
3959 		/*
3960 		 * Wait until we can enter the inner perimeter.
3961 		 */
3962 		sq->sq_flags = flags | SQ_WANTWAKEUP;
3963 		cv_wait(&sq->sq_wait, SQLOCK(sq));
3964 		count = sq->sq_count;
3965 		flags = sq->sq_flags;
3966 	}
3967 
3968 	if (! (flags & SQ_CIPUT))
3969 		sq->sq_flags = flags | SQ_EXCL;
3970 	sq->sq_count = count + 1;
3971 	ASSERT(sq->sq_count != 0);		/* Wraparound */
3972 	mutex_exit(SQLOCK(sq));
3973 
3974 	rval = (*proc)(qp, idp);
3975 
3976 	mutex_enter(SQLOCK(sq));
3977 	flags = sq->sq_flags;
3978 	ASSERT(sq->sq_count != 0);
3979 	sq->sq_count--;
3980 	if (flags & SQ_TAIL) {
3981 		putnext_tail(sq, qp, flags);
3982 		/*
3983 		 * The only purpose of this ASSERT is to preserve calling stack
3984 		 * in DEBUG kernel.
3985 		 */
3986 		ASSERT(flags & SQ_TAIL);
3987 		return (rval);
3988 	}
3989 	ASSERT(flags & (SQ_EXCL|SQ_CIPUT));
3990 /*
3991  * XXXX
3992  * I am not certain the next comment is correct here.  I need to consider
3993  * why the infonext is called, and if dropping SQ_EXCL unless non-CIPUT
3994  * might cause other problems.  It just might be safer to drop it if
3995  * !SQ_CIPUT because that is when we set it.
3996  */
3997 	/*
3998 	 * Safe to always drop SQ_EXCL:
3999 	 *	Not SQ_CIPUT means we set SQ_EXCL above
4000 	 *	For SQ_CIPUT SQ_EXCL will only be set if the put procedure
4001 	 *	did a qwriter(INNER) in which case nobody else
4002 	 *	is in the inner perimeter and we are exiting.
4003 	 *
4004 	 * I would like to make the following assertion:
4005 	 *
4006 	 * ASSERT((flags & (SQ_EXCL|SQ_CIPUT)) != (SQ_EXCL|SQ_CIPUT) ||
4007 	 *	sq->sq_count == 0);
4008 	 *
4009 	 * which indicates that if we are both putshared and exclusive,
4010 	 * we became exclusive while executing the putproc, and the only
4011 	 * claim on the syncq was the one we dropped a few lines above.
4012 	 * But other threads that enter putnext while the syncq is exclusive
4013 	 * need to make a claim as they may need to drop SQLOCK in the
4014 	 * has_writers case to avoid deadlocks.  If these threads are
4015 	 * delayed or preempted, it is possible that the writer thread can
4016 	 * find out that there are other claims making the (sq_count == 0)
4017 	 * test invalid.
4018 	 */
4019 
4020 	sq->sq_flags = flags & ~SQ_EXCL;
4021 	mutex_exit(SQLOCK(sq));
4022 	return (rval);
4023 }
4024 
4025 /*
4026  * Return nonzero if the queue is responsible for struio(), else return 0.
4027  */
4028 int
4029 isuioq(queue_t *q)
4030 {
4031 	if (q->q_flag & QREADR)
4032 		return (STREAM(q)->sd_struiordq == q);
4033 	else
4034 		return (STREAM(q)->sd_struiowrq == q);
4035 }
4036 
4037 #if defined(__sparc)
4038 int disable_putlocks = 0;
4039 #else
4040 int disable_putlocks = 1;
4041 #endif
4042 
4043 /*
4044  * called by create_putlock.
4045  */
4046 static void
4047 create_syncq_putlocks(queue_t *q)
4048 {
4049 	syncq_t	*sq = q->q_syncq;
4050 	ciputctrl_t *cip;
4051 	int i;
4052 
4053 	ASSERT(sq != NULL);
4054 
4055 	ASSERT(disable_putlocks == 0);
4056 	ASSERT(n_ciputctrl >= min_n_ciputctrl);
4057 	ASSERT(ciputctrl_cache != NULL);
4058 
4059 	if (!(sq->sq_type & SQ_CIPUT))
4060 		return;
4061 
4062 	for (i = 0; i <= 1; i++) {
4063 		if (sq->sq_ciputctrl == NULL) {
4064 			cip = kmem_cache_alloc(ciputctrl_cache, KM_SLEEP);
4065 			SUMCHECK_CIPUTCTRL_COUNTS(cip, n_ciputctrl - 1, 0);
4066 			mutex_enter(SQLOCK(sq));
4067 			if (sq->sq_ciputctrl != NULL) {
4068 				mutex_exit(SQLOCK(sq));
4069 				kmem_cache_free(ciputctrl_cache, cip);
4070 			} else {
4071 				ASSERT(sq->sq_nciputctrl == 0);
4072 				sq->sq_nciputctrl = n_ciputctrl - 1;
4073 				/*
4074 				 * putnext checks sq_ciputctrl without holding
4075 				 * SQLOCK. if it is not NULL putnext assumes
4076 				 * sq_nciputctrl is initialized. membar below
4077 				 * insures that.
4078 				 */
4079 				membar_producer();
4080 				sq->sq_ciputctrl = cip;
4081 				mutex_exit(SQLOCK(sq));
4082 			}
4083 		}
4084 		ASSERT(sq->sq_nciputctrl == n_ciputctrl - 1);
4085 		if (i == 1)
4086 			break;
4087 		q = _OTHERQ(q);
4088 		if (!(q->q_flag & QPERQ)) {
4089 			ASSERT(sq == q->q_syncq);
4090 			break;
4091 		}
4092 		ASSERT(q->q_syncq != NULL);
4093 		ASSERT(sq != q->q_syncq);
4094 		sq = q->q_syncq;
4095 		ASSERT(sq->sq_type & SQ_CIPUT);
4096 	}
4097 }
4098 
4099 /*
4100  * If stream argument is 0 only create per cpu sq_putlocks/sq_putcounts for
4101  * syncq of q. If stream argument is not 0 create per cpu stream_putlocks for
4102  * the stream of q and per cpu sq_putlocks/sq_putcounts for all syncq's
4103  * starting from q and down to the driver.
4104  *
4105  * This should be called after the affected queues are part of stream
4106  * geometry. It should be called from driver/module open routine after
4107  * qprocson() call. It is also called from nfs syscall where it is known that
4108  * stream is configured and won't change its geometry during create_putlock
4109  * call.
4110  *
4111  * caller normally uses 0 value for the stream argument to speed up MT putnext
4112  * into the perimeter of q for example because its perimeter is per module
4113  * (e.g. IP).
4114  *
4115  * caller normally uses non 0 value for the stream argument to hint the system
4116  * that the stream of q is a very contended global system stream
4117  * (e.g. NFS/UDP) and the part of the stream from q to the driver is
4118  * particularly MT hot.
4119  *
4120  * Caller insures stream plumbing won't happen while we are here and therefore
4121  * q_next can be safely used.
4122  */
4123 
4124 void
4125 create_putlocks(queue_t *q, int stream)
4126 {
4127 	ciputctrl_t	*cip;
4128 	struct stdata	*stp = STREAM(q);
4129 
4130 	q = _WR(q);
4131 	ASSERT(stp != NULL);
4132 
4133 	if (disable_putlocks != 0)
4134 		return;
4135 
4136 	if (n_ciputctrl < min_n_ciputctrl)
4137 		return;
4138 
4139 	ASSERT(ciputctrl_cache != NULL);
4140 
4141 	if (stream != 0 && stp->sd_ciputctrl == NULL) {
4142 		cip = kmem_cache_alloc(ciputctrl_cache, KM_SLEEP);
4143 		SUMCHECK_CIPUTCTRL_COUNTS(cip, n_ciputctrl - 1, 0);
4144 		mutex_enter(&stp->sd_lock);
4145 		if (stp->sd_ciputctrl != NULL) {
4146 			mutex_exit(&stp->sd_lock);
4147 			kmem_cache_free(ciputctrl_cache, cip);
4148 		} else {
4149 			ASSERT(stp->sd_nciputctrl == 0);
4150 			stp->sd_nciputctrl = n_ciputctrl - 1;
4151 			/*
4152 			 * putnext checks sd_ciputctrl without holding
4153 			 * sd_lock. if it is not NULL putnext assumes
4154 			 * sd_nciputctrl is initialized. membar below
4155 			 * insures that.
4156 			 */
4157 			membar_producer();
4158 			stp->sd_ciputctrl = cip;
4159 			mutex_exit(&stp->sd_lock);
4160 		}
4161 	}
4162 
4163 	ASSERT(stream == 0 || stp->sd_nciputctrl == n_ciputctrl - 1);
4164 
4165 	while (_SAMESTR(q)) {
4166 		create_syncq_putlocks(q);
4167 		if (stream == 0)
4168 			return;
4169 		q = q->q_next;
4170 	}
4171 	ASSERT(q != NULL);
4172 	create_syncq_putlocks(q);
4173 }
4174 
4175 /*
4176  * STREAMS Flow Trace - record STREAMS Flow Trace events as an mblk flows
4177  * through a stream.
4178  *
4179  * Data currently record per-event is a timestamp, module/driver name,
4180  * downstream module/driver name, optional callstack, event type and a per
4181  * type datum.  Much of the STREAMS framework is instrumented for automatic
4182  * flow tracing (when enabled).  Events can be defined and used by STREAMS
4183  * modules and drivers.
4184  *
4185  * Global objects:
4186  *
4187  *	str_ftevent() - Add a flow-trace event to a dblk.
4188  *	str_ftfree() - Free flow-trace data
4189  *
4190  * Local objects:
4191  *
4192  *	fthdr_cache - pointer to the kmem cache for trace header.
4193  *	ftblk_cache - pointer to the kmem cache for trace data blocks.
4194  */
4195 
4196 int str_ftnever = 1;	/* Don't do STREAMS flow tracing */
4197 int str_ftstack = 0;	/* Don't record event call stacks */
4198 
4199 void
4200 str_ftevent(fthdr_t *hp, void *p, ushort_t evnt, ushort_t data)
4201 {
4202 	ftblk_t *bp = hp->tail;
4203 	ftblk_t *nbp;
4204 	ftevnt_t *ep;
4205 	int ix, nix;
4206 
4207 	ASSERT(hp != NULL);
4208 
4209 	for (;;) {
4210 		if ((ix = bp->ix) == FTBLK_EVNTS) {
4211 			/*
4212 			 * Tail doesn't have room, so need a new tail.
4213 			 *
4214 			 * To make this MT safe, first, allocate a new
4215 			 * ftblk, and initialize it.  To make life a
4216 			 * little easier, reserve the first slot (mostly
4217 			 * by making ix = 1).  When we are finished with
4218 			 * the initialization, CAS this pointer to the
4219 			 * tail.  If this succeeds, this is the new
4220 			 * "next" block.  Otherwise, another thread
4221 			 * got here first, so free the block and start
4222 			 * again.
4223 			 */
4224 			nbp = kmem_cache_alloc(ftblk_cache, KM_NOSLEEP);
4225 			if (nbp == NULL) {
4226 				/* no mem, so punt */
4227 				str_ftnever++;
4228 				/* free up all flow data? */
4229 				return;
4230 			}
4231 			nbp->nxt = NULL;
4232 			nbp->ix = 1;
4233 			/*
4234 			 * Just in case there is another thread about
4235 			 * to get the next index, we need to make sure
4236 			 * the value is there for it.
4237 			 */
4238 			membar_producer();
4239 			if (casptr(&hp->tail, bp, nbp) == bp) {
4240 				/* CAS was successful */
4241 				bp->nxt = nbp;
4242 				membar_producer();
4243 				bp = nbp;
4244 				ix = 0;
4245 				goto cas_good;
4246 			} else {
4247 				kmem_cache_free(ftblk_cache, nbp);
4248 				bp = hp->tail;
4249 				continue;
4250 			}
4251 		}
4252 		nix = ix + 1;
4253 		if (cas32((uint32_t *)&bp->ix, ix, nix) == ix) {
4254 		cas_good:
4255 			if (curthread != hp->thread) {
4256 				hp->thread = curthread;
4257 				evnt |= FTEV_CS;
4258 			}
4259 			if (CPU->cpu_seqid != hp->cpu_seqid) {
4260 				hp->cpu_seqid = CPU->cpu_seqid;
4261 				evnt |= FTEV_PS;
4262 			}
4263 			ep = &bp->ev[ix];
4264 			break;
4265 		}
4266 	}
4267 
4268 	if (evnt & FTEV_QMASK) {
4269 		queue_t *qp = p;
4270 
4271 		if (!(qp->q_flag & QREADR))
4272 			evnt |= FTEV_ISWR;
4273 
4274 		ep->mid = Q2NAME(qp);
4275 
4276 		/*
4277 		 * We only record the next queue name for FTEV_PUTNEXT since
4278 		 * that's the only time we *really* need it, and the putnext()
4279 		 * code ensures that qp->q_next won't vanish.  (We could use
4280 		 * claimstr()/releasestr() but at a performance cost.)
4281 		 */
4282 		if ((evnt & FTEV_MASK) == FTEV_PUTNEXT && qp->q_next != NULL)
4283 			ep->midnext = Q2NAME(qp->q_next);
4284 		else
4285 			ep->midnext = NULL;
4286 	} else {
4287 		ep->mid = p;
4288 		ep->midnext = NULL;
4289 	}
4290 
4291 	if (ep->stk != NULL)
4292 		ep->stk->fs_depth = getpcstack(ep->stk->fs_stk, FTSTK_DEPTH);
4293 
4294 	ep->ts = gethrtime();
4295 	ep->evnt = evnt;
4296 	ep->data = data;
4297 	hp->hash = (hp->hash << 9) + hp->hash;
4298 	hp->hash += (evnt << 16) | data;
4299 	hp->hash += (uintptr_t)ep->mid;
4300 }
4301 
4302 /*
4303  * Free flow-trace data.
4304  */
4305 void
4306 str_ftfree(dblk_t *dbp)
4307 {
4308 	fthdr_t *hp = dbp->db_fthdr;
4309 	ftblk_t *bp = &hp->first;
4310 	ftblk_t *nbp;
4311 
4312 	if (bp != hp->tail || bp->ix != 0) {
4313 		/*
4314 		 * Clear out the hash, have the tail point to itself, and free
4315 		 * any continuation blocks.
4316 		 */
4317 		bp = hp->first.nxt;
4318 		hp->tail = &hp->first;
4319 		hp->hash = 0;
4320 		hp->first.nxt = NULL;
4321 		hp->first.ix = 0;
4322 		while (bp != NULL) {
4323 			nbp = bp->nxt;
4324 			kmem_cache_free(ftblk_cache, bp);
4325 			bp = nbp;
4326 		}
4327 	}
4328 	kmem_cache_free(fthdr_cache, hp);
4329 	dbp->db_fthdr = NULL;
4330 }
4331