xref: /illumos-gate/usr/src/uts/common/io/stream.c (revision 5d086a78863b0e948d240467a191382719b8d813)
1 /*
2  * CDDL HEADER START
3  *
4  * The contents of this file are subject to the terms of the
5  * Common Development and Distribution License (the "License").
6  * You may not use this file except in compliance with the License.
7  *
8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9  * or http://www.opensolaris.org/os/licensing.
10  * See the License for the specific language governing permissions
11  * and limitations under the License.
12  *
13  * When distributing Covered Code, include this CDDL HEADER in each
14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15  * If applicable, add the following below this CDDL HEADER, with the
16  * fields enclosed by brackets "[]" replaced with your own identifying
17  * information: Portions Copyright [yyyy] [name of copyright owner]
18  *
19  * CDDL HEADER END
20  */
21 /*	Copyright (c) 1984, 1986, 1987, 1988, 1989 AT&T	*/
22 /*	All Rights Reserved	*/
23 
24 /*
25  * Copyright 2009 Sun Microsystems, Inc.  All rights reserved.
26  * Use is subject to license terms.
27  *
28  * Copyright 2021 Tintri by DDN, Inc. All rights reserved.
29  * Copyright 2022 Garrett D'Amore
30  * Copyright 2026 Oxide Computer Company
31  */
32 
33 #include <sys/types.h>
34 #include <sys/param.h>
35 #include <sys/thread.h>
36 #include <sys/sysmacros.h>
37 #include <sys/stropts.h>
38 #include <sys/stream.h>
39 #include <sys/strsubr.h>
40 #include <sys/strsun.h>
41 #include <sys/conf.h>
42 #include <sys/debug.h>
43 #include <sys/cmn_err.h>
44 #include <sys/kmem.h>
45 #include <sys/atomic.h>
46 #include <sys/errno.h>
47 #include <sys/vtrace.h>
48 #include <sys/ftrace.h>
49 #include <sys/ontrap.h>
50 #include <sys/sdt.h>
51 #include <sys/strft.h>
52 
53 #ifdef DEBUG
54 #include <sys/kmem_impl.h>
55 #endif
56 
57 /*
58  * This file contains all the STREAMS utility routines that may
59  * be used by modules and drivers.
60  */
61 
62 /*
63  * STREAMS message allocator: principles of operation
64  *
65  * The streams message allocator consists of all the routines that
66  * allocate, dup and free streams messages: allocb(), [d]esballoc[a],
67  * dupb(), freeb() and freemsg().  What follows is a high-level view
68  * of how the allocator works.
69  *
70  * Every streams message consists of one or more mblks, a dblk, and data.
71  * All mblks for all types of messages come from a common mblk_cache.
72  * The dblk and data come in several flavors, depending on how the
73  * message is allocated:
74  *
75  * (1) mblks up to DBLK_MAX_CACHE size are allocated from a collection of
76  *     fixed-size dblk/data caches. For message sizes that are multiples of
77  *     PAGESIZE, dblks are allocated separately from the buffer.
78  *     The associated buffer is allocated by the constructor using kmem_alloc().
79  *     For all other message sizes, dblk and its associated data is allocated
80  *     as a single contiguous chunk of memory.
81  *     Objects in these caches consist of a dblk plus its associated data.
82  *     allocb() determines the nearest-size cache by table lookup:
83  *     the dblk_cache[] array provides the mapping from size to dblk cache.
84  *
85  * (2) Large messages (size > DBLK_MAX_CACHE) are constructed by
86  *     kmem_alloc()'ing a buffer for the data and supplying that
87  *     buffer to gesballoc(), described below.
88  *
89  * (3) The four flavors of [d]esballoc[a] are all implemented by a
90  *     common routine, gesballoc() ("generic esballoc").  gesballoc()
91  *     allocates a dblk from the global dblk_esb_cache and sets db_base,
92  *     db_lim and db_frtnp to describe the caller-supplied buffer.
93  *
94  * While there are several routines to allocate messages, there is only
95  * one routine to free messages: freeb().  freeb() simply invokes the
96  * dblk's free method, dbp->db_free(), which is set at allocation time.
97  *
98  * dupb() creates a new reference to a message by allocating a new mblk,
99  * incrementing the dblk reference count and setting the dblk's free
100  * method to dblk_decref().  The dblk's original free method is retained
101  * in db_lastfree.  dblk_decref() decrements the reference count on each
102  * freeb().  If this is not the last reference it just frees the mblk;
103  * if this *is* the last reference, it restores db_free to db_lastfree,
104  * sets db_mblk to the current mblk (see below), and invokes db_lastfree.
105  *
106  * The implementation makes aggressive use of kmem object caching for
107  * maximum performance.  This makes the code simple and compact, but
108  * also a bit abstruse in some places.  The invariants that constitute a
109  * message's constructed state, described below, are more subtle than usual.
110  *
111  * Every dblk has an "attached mblk" as part of its constructed state.
112  * The mblk is allocated by the dblk's constructor and remains attached
113  * until the message is either dup'ed or pulled up.  In the dupb() case
114  * the mblk association doesn't matter until the last free, at which time
115  * dblk_decref() attaches the last mblk to the dblk.  pullupmsg() affects
116  * the mblk association because it swaps the leading mblks of two messages,
117  * so it is responsible for swapping their db_mblk pointers accordingly.
118  * From a constructed-state viewpoint it doesn't matter that a dblk's
119  * attached mblk can change while the message is allocated; all that
120  * matters is that the dblk has *some* attached mblk when it's freed.
121  *
122  * The sizes of the allocb() small-message caches are not magical.
123  * They represent a good trade-off between internal and external
124  * fragmentation for current workloads.  They should be reevaluated
125  * periodically, especially if allocations larger than DBLK_MAX_CACHE
126  * become common.  We use 64-byte alignment so that dblks don't
127  * straddle cache lines unnecessarily.
128  */
129 #define	DBLK_MAX_CACHE		73728
130 #define	DBLK_CACHE_ALIGN	64
131 #define	DBLK_MIN_SIZE		8
132 #define	DBLK_SIZE_SHIFT		3
133 
134 #ifdef _BIG_ENDIAN
135 #define	DBLK_RTFU_SHIFT(field)	\
136 	(8 * (&((dblk_t *)0)->db_struioflag - &((dblk_t *)0)->field))
137 #else
138 #define	DBLK_RTFU_SHIFT(field)	\
139 	(8 * (&((dblk_t *)0)->field - &((dblk_t *)0)->db_ref))
140 #endif
141 
142 #define	DBLK_RTFU(ref, type, flags, uioflag)	\
143 	(((ref) << DBLK_RTFU_SHIFT(db_ref)) | \
144 	((type) << DBLK_RTFU_SHIFT(db_type)) | \
145 	(((flags) | (ref - 1)) << DBLK_RTFU_SHIFT(db_flags)) | \
146 	((uioflag) << DBLK_RTFU_SHIFT(db_struioflag)))
147 #define	DBLK_RTFU_REF_MASK	(DBLK_REFMAX << DBLK_RTFU_SHIFT(db_ref))
148 #define	DBLK_RTFU_WORD(dbp)	(*((uint32_t *)&(dbp)->db_ref))
149 #define	MBLK_BAND_FLAG_WORD(mp)	(*((uint32_t *)&(mp)->b_band))
150 
151 static size_t dblk_sizes[] = {
152 #ifdef _LP64
153 	16, 80, 144, 208, 272, 336, 528, 1040, 1488, 1936, 2576, 3856,
154 	8192, 12048, 16384, 20240, 24576, 28432, 32768, 36624,
155 	40960, 44816, 49152, 53008, 57344, 61200, 65536, 69392,
156 #else
157 	64, 128, 320, 576, 1088, 1536, 1984, 2624, 3904,
158 	8192, 12096, 16384, 20288, 24576, 28480, 32768, 36672,
159 	40960, 44864, 49152, 53056, 57344, 61248, 65536, 69440,
160 #endif
161 	DBLK_MAX_CACHE, 0
162 };
163 
164 static struct kmem_cache *dblk_cache[DBLK_MAX_CACHE / DBLK_MIN_SIZE];
165 static struct kmem_cache *mblk_cache;
166 static struct kmem_cache *dblk_esb_cache;
167 static struct kmem_cache *fthdr_cache;
168 static struct kmem_cache *ftblk_cache;
169 
170 static void dblk_lastfree(mblk_t *mp, dblk_t *dbp);
171 static mblk_t *allocb_oversize(size_t size, int flags);
172 static int allocb_tryhard_fails;
173 static void frnop_func(void *arg);
174 frtn_t frnop = { frnop_func };
175 static void bcache_dblk_lastfree(mblk_t *mp, dblk_t *dbp);
176 
177 static boolean_t rwnext_enter(queue_t *qp);
178 static void rwnext_exit(queue_t *qp);
179 
180 /*
181  * Patchable mblk/dblk kmem_cache flags.
182  */
183 int dblk_kmem_flags = 0;
184 int mblk_kmem_flags = 0;
185 
186 static int
dblk_constructor(void * buf,void * cdrarg,int kmflags)187 dblk_constructor(void *buf, void *cdrarg, int kmflags)
188 {
189 	dblk_t *dbp = buf;
190 	ssize_t msg_size = (ssize_t)cdrarg;
191 	size_t index;
192 
193 	ASSERT(msg_size != 0);
194 
195 	index = (msg_size - 1) >> DBLK_SIZE_SHIFT;
196 
197 	ASSERT(index < (DBLK_MAX_CACHE >> DBLK_SIZE_SHIFT));
198 
199 	if ((dbp->db_mblk = kmem_cache_alloc(mblk_cache, kmflags)) == NULL)
200 		return (-1);
201 	if ((msg_size & PAGEOFFSET) == 0) {
202 		dbp->db_base = kmem_alloc(msg_size, kmflags);
203 		if (dbp->db_base == NULL) {
204 			kmem_cache_free(mblk_cache, dbp->db_mblk);
205 			return (-1);
206 		}
207 	} else {
208 		dbp->db_base = (unsigned char *)&dbp[1];
209 	}
210 
211 	dbp->db_mblk->b_datap = dbp;
212 	dbp->db_cache = dblk_cache[index];
213 	dbp->db_lim = dbp->db_base + msg_size;
214 	dbp->db_free = dbp->db_lastfree = dblk_lastfree;
215 	dbp->db_frtnp = NULL;
216 	dbp->db_fthdr = NULL;
217 	dbp->db_credp = NULL;
218 	dbp->db_cpid = -1;
219 	dbp->db_struioflag = 0;
220 	dbp->db_struioun.cksum.flags = 0;
221 	return (0);
222 }
223 
224 /*ARGSUSED*/
225 static int
dblk_esb_constructor(void * buf,void * cdrarg,int kmflags)226 dblk_esb_constructor(void *buf, void *cdrarg, int kmflags)
227 {
228 	dblk_t *dbp = buf;
229 
230 	if ((dbp->db_mblk = kmem_cache_alloc(mblk_cache, kmflags)) == NULL)
231 		return (-1);
232 	dbp->db_mblk->b_datap = dbp;
233 	dbp->db_cache = dblk_esb_cache;
234 	dbp->db_fthdr = NULL;
235 	dbp->db_credp = NULL;
236 	dbp->db_cpid = -1;
237 	dbp->db_struioflag = 0;
238 	dbp->db_struioun.cksum.flags = 0;
239 	return (0);
240 }
241 
242 static int
bcache_dblk_constructor(void * buf,void * cdrarg,int kmflags)243 bcache_dblk_constructor(void *buf, void *cdrarg, int kmflags)
244 {
245 	dblk_t *dbp = buf;
246 	bcache_t *bcp = cdrarg;
247 
248 	if ((dbp->db_mblk = kmem_cache_alloc(mblk_cache, kmflags)) == NULL)
249 		return (-1);
250 
251 	dbp->db_base = kmem_cache_alloc(bcp->buffer_cache, kmflags);
252 	if (dbp->db_base == NULL) {
253 		kmem_cache_free(mblk_cache, dbp->db_mblk);
254 		return (-1);
255 	}
256 
257 	dbp->db_mblk->b_datap = dbp;
258 	dbp->db_cache = (void *)bcp;
259 	dbp->db_lim = dbp->db_base + bcp->size;
260 	dbp->db_free = dbp->db_lastfree = bcache_dblk_lastfree;
261 	dbp->db_frtnp = NULL;
262 	dbp->db_fthdr = NULL;
263 	dbp->db_credp = NULL;
264 	dbp->db_cpid = -1;
265 	dbp->db_struioflag = 0;
266 	dbp->db_struioun.cksum.flags = 0;
267 	return (0);
268 }
269 
270 /*ARGSUSED*/
271 static void
dblk_destructor(void * buf,void * cdrarg)272 dblk_destructor(void *buf, void *cdrarg)
273 {
274 	dblk_t *dbp = buf;
275 	ssize_t msg_size = (ssize_t)cdrarg;
276 
277 	ASSERT(dbp->db_mblk->b_datap == dbp);
278 	ASSERT(msg_size != 0);
279 	ASSERT(dbp->db_struioflag == 0);
280 	ASSERT(dbp->db_struioun.cksum.flags == 0);
281 
282 	if ((msg_size & PAGEOFFSET) == 0) {
283 		kmem_free(dbp->db_base, msg_size);
284 	}
285 
286 	kmem_cache_free(mblk_cache, dbp->db_mblk);
287 }
288 
289 static void
bcache_dblk_destructor(void * buf,void * cdrarg)290 bcache_dblk_destructor(void *buf, void *cdrarg)
291 {
292 	dblk_t *dbp = buf;
293 	bcache_t *bcp = cdrarg;
294 
295 	kmem_cache_free(bcp->buffer_cache, dbp->db_base);
296 
297 	ASSERT(dbp->db_mblk->b_datap == dbp);
298 	ASSERT(dbp->db_struioflag == 0);
299 	ASSERT(dbp->db_struioun.cksum.flags == 0);
300 
301 	kmem_cache_free(mblk_cache, dbp->db_mblk);
302 }
303 
304 /* ARGSUSED */
305 static int
ftblk_constructor(void * buf,void * cdrarg,int kmflags)306 ftblk_constructor(void *buf, void *cdrarg, int kmflags)
307 {
308 	ftblk_t *fbp = buf;
309 	int i;
310 
311 	bzero(fbp, sizeof (ftblk_t));
312 	if (str_ftstack != 0) {
313 		for (i = 0; i < FTBLK_EVNTS; i++)
314 			fbp->ev[i].stk = kmem_alloc(sizeof (ftstk_t), kmflags);
315 	}
316 
317 	return (0);
318 }
319 
320 /* ARGSUSED */
321 static void
ftblk_destructor(void * buf,void * cdrarg)322 ftblk_destructor(void *buf, void *cdrarg)
323 {
324 	ftblk_t *fbp = buf;
325 	int i;
326 
327 	if (str_ftstack != 0) {
328 		for (i = 0; i < FTBLK_EVNTS; i++) {
329 			if (fbp->ev[i].stk != NULL) {
330 				kmem_free(fbp->ev[i].stk, sizeof (ftstk_t));
331 				fbp->ev[i].stk = NULL;
332 			}
333 		}
334 	}
335 }
336 
337 static int
fthdr_constructor(void * buf,void * cdrarg,int kmflags)338 fthdr_constructor(void *buf, void *cdrarg, int kmflags)
339 {
340 	fthdr_t *fhp = buf;
341 
342 	return (ftblk_constructor(&fhp->first, cdrarg, kmflags));
343 }
344 
345 static void
fthdr_destructor(void * buf,void * cdrarg)346 fthdr_destructor(void *buf, void *cdrarg)
347 {
348 	fthdr_t *fhp = buf;
349 
350 	ftblk_destructor(&fhp->first, cdrarg);
351 }
352 
353 void
streams_msg_init(void)354 streams_msg_init(void)
355 {
356 	char name[40];
357 	size_t size;
358 	size_t lastsize = DBLK_MIN_SIZE;
359 	size_t *sizep;
360 	struct kmem_cache *cp;
361 	size_t tot_size;
362 	int offset;
363 
364 	mblk_cache = kmem_cache_create("streams_mblk", sizeof (mblk_t), 32,
365 	    NULL, NULL, NULL, NULL, NULL, mblk_kmem_flags);
366 
367 	for (sizep = dblk_sizes; (size = *sizep) != 0; sizep++) {
368 
369 		if ((offset = (size & PAGEOFFSET)) != 0) {
370 			/*
371 			 * We are in the middle of a page, dblk should
372 			 * be allocated on the same page
373 			 */
374 			tot_size = size + sizeof (dblk_t);
375 			ASSERT((offset + sizeof (dblk_t) + sizeof (kmem_slab_t))
376 			    < PAGESIZE);
377 			ASSERT((tot_size & (DBLK_CACHE_ALIGN - 1)) == 0);
378 
379 		} else {
380 
381 			/*
382 			 * buf size is multiple of page size, dblk and
383 			 * buffer are allocated separately.
384 			 */
385 
386 			ASSERT((size & (DBLK_CACHE_ALIGN - 1)) == 0);
387 			tot_size = sizeof (dblk_t);
388 		}
389 
390 		(void) sprintf(name, "streams_dblk_%ld", size);
391 		cp = kmem_cache_create(name, tot_size, DBLK_CACHE_ALIGN,
392 		    dblk_constructor, dblk_destructor, NULL, (void *)(size),
393 		    NULL, dblk_kmem_flags);
394 
395 		while (lastsize <= size) {
396 			dblk_cache[(lastsize - 1) >> DBLK_SIZE_SHIFT] = cp;
397 			lastsize += DBLK_MIN_SIZE;
398 		}
399 	}
400 
401 	dblk_esb_cache = kmem_cache_create("streams_dblk_esb", sizeof (dblk_t),
402 	    DBLK_CACHE_ALIGN, dblk_esb_constructor, dblk_destructor, NULL,
403 	    (void *)sizeof (dblk_t), NULL, dblk_kmem_flags);
404 	fthdr_cache = kmem_cache_create("streams_fthdr", sizeof (fthdr_t), 32,
405 	    fthdr_constructor, fthdr_destructor, NULL, NULL, NULL, 0);
406 	ftblk_cache = kmem_cache_create("streams_ftblk", sizeof (ftblk_t), 32,
407 	    ftblk_constructor, ftblk_destructor, NULL, NULL, NULL, 0);
408 
409 	/* initialize throttling queue for esballoc */
410 	esballoc_queue_init();
411 }
412 
413 /*ARGSUSED*/
414 mblk_t *
allocb(size_t size,uint_t pri)415 allocb(size_t size, uint_t pri)
416 {
417 	dblk_t *dbp;
418 	mblk_t *mp;
419 	size_t index;
420 
421 	index =  (size - 1)  >> DBLK_SIZE_SHIFT;
422 
423 	if (index >= (DBLK_MAX_CACHE >> DBLK_SIZE_SHIFT)) {
424 		if (size != 0) {
425 			mp = allocb_oversize(size, KM_NOSLEEP);
426 			goto out;
427 		}
428 		index = 0;
429 	}
430 
431 	if ((dbp = kmem_cache_alloc(dblk_cache[index], KM_NOSLEEP)) == NULL) {
432 		mp = NULL;
433 		goto out;
434 	}
435 
436 	mp = dbp->db_mblk;
437 	DBLK_RTFU_WORD(dbp) = DBLK_RTFU(1, M_DATA, 0, 0);
438 	mp->b_next = mp->b_prev = mp->b_cont = NULL;
439 	mp->b_rptr = mp->b_wptr = dbp->db_base;
440 	mp->b_queue = NULL;
441 	MBLK_BAND_FLAG_WORD(mp) = 0;
442 	STR_FTALLOC(&dbp->db_fthdr, FTEV_ALLOCB, size);
443 out:
444 	FTRACE_1("allocb(): mp=0x%p", (uintptr_t)mp);
445 
446 	return (mp);
447 }
448 
449 /*
450  * Allocate an mblk taking db_credp and db_cpid from the template.
451  * Allow the cred to be NULL.
452  */
453 mblk_t *
allocb_tmpl(size_t size,const mblk_t * tmpl)454 allocb_tmpl(size_t size, const mblk_t *tmpl)
455 {
456 	mblk_t *mp = allocb(size, 0);
457 
458 	if (mp != NULL) {
459 		dblk_t *src = tmpl->b_datap;
460 		dblk_t *dst = mp->b_datap;
461 		cred_t *cr;
462 		pid_t cpid;
463 
464 		cr = msg_getcred(tmpl, &cpid);
465 		if (cr != NULL)
466 			crhold(dst->db_credp = cr);
467 		dst->db_cpid = cpid;
468 		dst->db_type = src->db_type;
469 	}
470 	return (mp);
471 }
472 
473 mblk_t *
allocb_cred(size_t size,cred_t * cr,pid_t cpid)474 allocb_cred(size_t size, cred_t *cr, pid_t cpid)
475 {
476 	mblk_t *mp = allocb(size, 0);
477 
478 	ASSERT(cr != NULL);
479 	if (mp != NULL) {
480 		dblk_t *dbp = mp->b_datap;
481 
482 		crhold(dbp->db_credp = cr);
483 		dbp->db_cpid = cpid;
484 	}
485 	return (mp);
486 }
487 
488 mblk_t *
allocb_cred_wait(size_t size,uint_t flags,int * error,cred_t * cr,pid_t cpid)489 allocb_cred_wait(size_t size, uint_t flags, int *error, cred_t *cr, pid_t cpid)
490 {
491 	mblk_t *mp = allocb_wait(size, 0, flags, error);
492 
493 	ASSERT(cr != NULL);
494 	if (mp != NULL) {
495 		dblk_t *dbp = mp->b_datap;
496 
497 		crhold(dbp->db_credp = cr);
498 		dbp->db_cpid = cpid;
499 	}
500 
501 	return (mp);
502 }
503 
504 /*
505  * Extract the db_cred (and optionally db_cpid) from a message.
506  * We find the first mblk which has a non-NULL db_cred and use that.
507  * If none found we return NULL.
508  * Does NOT get a hold on the cred.
509  */
510 cred_t *
msg_getcred(const mblk_t * mp,pid_t * cpidp)511 msg_getcred(const mblk_t *mp, pid_t *cpidp)
512 {
513 	cred_t *cr = NULL;
514 	cred_t *cr2;
515 	mblk_t *mp2;
516 
517 	while (mp != NULL) {
518 		dblk_t *dbp = mp->b_datap;
519 
520 		cr = dbp->db_credp;
521 		if (cr == NULL) {
522 			mp = mp->b_cont;
523 			continue;
524 		}
525 		if (cpidp != NULL)
526 			*cpidp = dbp->db_cpid;
527 
528 #ifdef DEBUG
529 		/*
530 		 * Normally there should at most one db_credp in a message.
531 		 * But if there are multiple (as in the case of some M_IOC*
532 		 * and some internal messages in TCP/IP bind logic) then
533 		 * they must be identical in the normal case.
534 		 * However, a socket can be shared between different uids
535 		 * in which case data queued in TCP would be from different
536 		 * creds. Thus we can only assert for the zoneid being the
537 		 * same. Due to Multi-level Level Ports for TX, some
538 		 * cred_t can have a NULL cr_zone, and we skip the comparison
539 		 * in that case.
540 		 */
541 		mp2 = mp->b_cont;
542 		while (mp2 != NULL) {
543 			cr2 = DB_CRED(mp2);
544 			if (cr2 != NULL) {
545 				DTRACE_PROBE2(msg__getcred,
546 				    cred_t *, cr, cred_t *, cr2);
547 				ASSERT(crgetzoneid(cr) == crgetzoneid(cr2) ||
548 				    crgetzone(cr) == NULL ||
549 				    crgetzone(cr2) == NULL);
550 			}
551 			mp2 = mp2->b_cont;
552 		}
553 #endif
554 		return (cr);
555 	}
556 	if (cpidp != NULL)
557 		*cpidp = NOPID;
558 	return (NULL);
559 }
560 
561 /*
562  * Variant of msg_getcred which, when a cred is found
563  * 1. Returns with a hold on the cred
564  * 2. Clears the first cred in the mblk.
565  * This is more efficient to use than a msg_getcred() + crhold() when
566  * the message is freed after the cred has been extracted.
567  *
568  * The caller is responsible for ensuring that there is no other reference
569  * on the message since db_credp can not be cleared when there are other
570  * references.
571  */
572 cred_t *
msg_extractcred(mblk_t * mp,pid_t * cpidp)573 msg_extractcred(mblk_t *mp, pid_t *cpidp)
574 {
575 	cred_t *cr = NULL;
576 	cred_t *cr2;
577 	mblk_t *mp2;
578 
579 	while (mp != NULL) {
580 		dblk_t *dbp = mp->b_datap;
581 
582 		cr = dbp->db_credp;
583 		if (cr == NULL) {
584 			mp = mp->b_cont;
585 			continue;
586 		}
587 		ASSERT(dbp->db_ref == 1);
588 		dbp->db_credp = NULL;
589 		if (cpidp != NULL)
590 			*cpidp = dbp->db_cpid;
591 #ifdef DEBUG
592 		/*
593 		 * Normally there should at most one db_credp in a message.
594 		 * But if there are multiple (as in the case of some M_IOC*
595 		 * and some internal messages in TCP/IP bind logic) then
596 		 * they must be identical in the normal case.
597 		 * However, a socket can be shared between different uids
598 		 * in which case data queued in TCP would be from different
599 		 * creds. Thus we can only assert for the zoneid being the
600 		 * same. Due to Multi-level Level Ports for TX, some
601 		 * cred_t can have a NULL cr_zone, and we skip the comparison
602 		 * in that case.
603 		 */
604 		mp2 = mp->b_cont;
605 		while (mp2 != NULL) {
606 			cr2 = DB_CRED(mp2);
607 			if (cr2 != NULL) {
608 				DTRACE_PROBE2(msg__extractcred,
609 				    cred_t *, cr, cred_t *, cr2);
610 				ASSERT(crgetzoneid(cr) == crgetzoneid(cr2) ||
611 				    crgetzone(cr) == NULL ||
612 				    crgetzone(cr2) == NULL);
613 			}
614 			mp2 = mp2->b_cont;
615 		}
616 #endif
617 		return (cr);
618 	}
619 	return (NULL);
620 }
621 /*
622  * Get the label for a message. Uses the first mblk in the message
623  * which has a non-NULL db_credp.
624  * Returns NULL if there is no credp.
625  */
626 extern struct ts_label_s *
msg_getlabel(const mblk_t * mp)627 msg_getlabel(const mblk_t *mp)
628 {
629 	cred_t *cr = msg_getcred(mp, NULL);
630 
631 	if (cr == NULL)
632 		return (NULL);
633 
634 	return (crgetlabel(cr));
635 }
636 
637 void
freeb(mblk_t * mp)638 freeb(mblk_t *mp)
639 {
640 	dblk_t *dbp = mp->b_datap;
641 
642 	ASSERT(dbp->db_ref > 0);
643 	ASSERT(mp->b_next == NULL && mp->b_prev == NULL);
644 	FTRACE_1("freeb(): mp=0x%lx", (uintptr_t)mp);
645 
646 	STR_FTEVENT_MBLK(mp, caller(), FTEV_FREEB, dbp->db_ref);
647 
648 	dbp->db_free(mp, dbp);
649 }
650 
651 void
freemsg(mblk_t * mp)652 freemsg(mblk_t *mp)
653 {
654 	FTRACE_1("freemsg(): mp=0x%lx", (uintptr_t)mp);
655 	while (mp) {
656 		dblk_t *dbp = mp->b_datap;
657 		mblk_t *mp_cont = mp->b_cont;
658 
659 		ASSERT(dbp->db_ref > 0);
660 		ASSERT(mp->b_next == NULL && mp->b_prev == NULL);
661 
662 		STR_FTEVENT_MBLK(mp, caller(), FTEV_FREEB, dbp->db_ref);
663 
664 		dbp->db_free(mp, dbp);
665 		mp = mp_cont;
666 	}
667 }
668 
669 /*
670  * Reallocate a block for another use.  Try hard to use the old block.
671  * If the old data is wanted (copy), leave b_wptr at the end of the data,
672  * otherwise return b_wptr = b_rptr.
673  *
674  * This routine is private and unstable.
675  */
676 mblk_t	*
reallocb(mblk_t * mp,size_t size,uint_t copy)677 reallocb(mblk_t *mp, size_t size, uint_t copy)
678 {
679 	mblk_t		*mp1;
680 	unsigned char	*old_rptr;
681 	ptrdiff_t	cur_size;
682 
683 	if (mp == NULL)
684 		return (allocb(size, BPRI_HI));
685 
686 	cur_size = mp->b_wptr - mp->b_rptr;
687 	old_rptr = mp->b_rptr;
688 
689 	ASSERT(mp->b_datap->db_ref != 0);
690 
691 	if (mp->b_datap->db_ref == 1 && MBLKSIZE(mp) >= size) {
692 		/*
693 		 * If the data is wanted and it will fit where it is, no
694 		 * work is required.
695 		 */
696 		if (copy && mp->b_datap->db_lim - mp->b_rptr >= size)
697 			return (mp);
698 
699 		mp->b_wptr = mp->b_rptr = mp->b_datap->db_base;
700 		mp1 = mp;
701 	} else if ((mp1 = allocb_tmpl(size, mp)) != NULL) {
702 		/* XXX other mp state could be copied too, db_flags ... ? */
703 		mp1->b_cont = mp->b_cont;
704 	} else {
705 		return (NULL);
706 	}
707 
708 	if (copy) {
709 		bcopy(old_rptr, mp1->b_rptr, cur_size);
710 		mp1->b_wptr = mp1->b_rptr + cur_size;
711 	}
712 
713 	if (mp != mp1)
714 		freeb(mp);
715 
716 	return (mp1);
717 }
718 
719 static void
dblk_lastfree(mblk_t * mp,dblk_t * dbp)720 dblk_lastfree(mblk_t *mp, dblk_t *dbp)
721 {
722 	ASSERT(dbp->db_mblk == mp);
723 	if (dbp->db_fthdr != NULL)
724 		str_ftfree(dbp);
725 
726 	/* set credp and projid to be 'unspecified' before returning to cache */
727 	if (dbp->db_credp != NULL) {
728 		crfree(dbp->db_credp);
729 		dbp->db_credp = NULL;
730 	}
731 	dbp->db_cpid = -1;
732 
733 	/* Reset the struioflag and the checksum flag fields */
734 	dbp->db_struioflag = 0;
735 	dbp->db_struioun.cksum.flags = 0;
736 
737 	/* and the COOKED and/or UIOA flag(s) */
738 	dbp->db_flags &= ~(DBLK_COOKED | DBLK_UIOA);
739 
740 	kmem_cache_free(dbp->db_cache, dbp);
741 }
742 
743 static void
dblk_decref(mblk_t * mp,dblk_t * dbp)744 dblk_decref(mblk_t *mp, dblk_t *dbp)
745 {
746 	if (dbp->db_ref != 1) {
747 		uint32_t rtfu = atomic_add_32_nv(&DBLK_RTFU_WORD(dbp),
748 		    -(1 << DBLK_RTFU_SHIFT(db_ref)));
749 		/*
750 		 * atomic_add_32_nv() just decremented db_ref, so we no longer
751 		 * have a reference to the dblk, which means another thread
752 		 * could free it.  Therefore we cannot examine the dblk to
753 		 * determine whether ours was the last reference.  Instead,
754 		 * we extract the new and minimum reference counts from rtfu.
755 		 * Note that all we're really saying is "if (ref != refmin)".
756 		 */
757 		if (((rtfu >> DBLK_RTFU_SHIFT(db_ref)) & DBLK_REFMAX) !=
758 		    ((rtfu >> DBLK_RTFU_SHIFT(db_flags)) & DBLK_REFMIN)) {
759 			kmem_cache_free(mblk_cache, mp);
760 			return;
761 		}
762 	}
763 	dbp->db_mblk = mp;
764 	dbp->db_free = dbp->db_lastfree;
765 	dbp->db_lastfree(mp, dbp);
766 }
767 
768 mblk_t *
dupb(mblk_t * mp)769 dupb(mblk_t *mp)
770 {
771 	dblk_t *dbp = mp->b_datap;
772 	mblk_t *new_mp;
773 	uint32_t oldrtfu, newrtfu;
774 
775 	if ((new_mp = kmem_cache_alloc(mblk_cache, KM_NOSLEEP)) == NULL)
776 		goto out;
777 
778 	new_mp->b_next = new_mp->b_prev = new_mp->b_cont = NULL;
779 	new_mp->b_rptr = mp->b_rptr;
780 	new_mp->b_wptr = mp->b_wptr;
781 	new_mp->b_datap = dbp;
782 	new_mp->b_queue = NULL;
783 	MBLK_BAND_FLAG_WORD(new_mp) = MBLK_BAND_FLAG_WORD(mp);
784 
785 	STR_FTEVENT_MBLK(mp, caller(), FTEV_DUPB, dbp->db_ref);
786 
787 	dbp->db_free = dblk_decref;
788 	do {
789 		ASSERT(dbp->db_ref > 0);
790 		oldrtfu = DBLK_RTFU_WORD(dbp);
791 		newrtfu = oldrtfu + (1 << DBLK_RTFU_SHIFT(db_ref));
792 		/*
793 		 * If db_ref is maxed out we can't dup this message anymore.
794 		 */
795 		if ((oldrtfu & DBLK_RTFU_REF_MASK) == DBLK_RTFU_REF_MASK) {
796 			kmem_cache_free(mblk_cache, new_mp);
797 			new_mp = NULL;
798 			goto out;
799 		}
800 	} while (atomic_cas_32(&DBLK_RTFU_WORD(dbp), oldrtfu, newrtfu) !=
801 	    oldrtfu);
802 
803 out:
804 	FTRACE_1("dupb(): new_mp=0x%lx", (uintptr_t)new_mp);
805 	return (new_mp);
806 }
807 
808 static void
dblk_lastfree_desb(mblk_t * mp,dblk_t * dbp)809 dblk_lastfree_desb(mblk_t *mp, dblk_t *dbp)
810 {
811 	frtn_t *frp = dbp->db_frtnp;
812 
813 	ASSERT(dbp->db_mblk == mp);
814 	frp->free_func(frp->free_arg);
815 	if (dbp->db_fthdr != NULL)
816 		str_ftfree(dbp);
817 
818 	/* set credp and projid to be 'unspecified' before returning to cache */
819 	if (dbp->db_credp != NULL) {
820 		crfree(dbp->db_credp);
821 		dbp->db_credp = NULL;
822 	}
823 	dbp->db_cpid = -1;
824 	dbp->db_struioflag = 0;
825 	dbp->db_struioun.cksum.flags = 0;
826 
827 	kmem_cache_free(dbp->db_cache, dbp);
828 }
829 
830 /*ARGSUSED*/
831 static void
frnop_func(void * arg)832 frnop_func(void *arg)
833 {
834 }
835 
836 /*
837  * Generic esballoc used to implement the four flavors: [d]esballoc[a].
838  *
839  * The variants with a 'd' prefix (desballoc, desballoca)
840  *	directly free the mblk when it loses its last ref,
841  *	where the other variants free asynchronously.
842  * The variants with an 'a' suffix (esballoca, desballoca)
843  *	add an extra ref, effectively letting the streams subsystem
844  *	know that the message data should not be modified.
845  *	(eg. see db_ref checks in reallocb and elsewhere)
846  *
847  * The method used by the 'a' suffix functions to keep the dblk
848  * db_ref > 1 is non-obvious.  The macro DBLK_RTFU(2,...) passed to
849  * gesballoc sets the initial db_ref = 2 and sets the DBLK_REFMIN
850  * bit in db_flags.  In dblk_decref() that flag essentially means
851  * the dblk has one extra ref, so the "last ref" is one, not zero.
852  */
853 static mblk_t *
gesballoc(unsigned char * base,size_t size,uint32_t db_rtfu,frtn_t * frp,void (* lastfree)(mblk_t *,dblk_t *),int kmflags)854 gesballoc(unsigned char *base, size_t size, uint32_t db_rtfu, frtn_t *frp,
855     void (*lastfree)(mblk_t *, dblk_t *), int kmflags)
856 {
857 	dblk_t *dbp;
858 	mblk_t *mp;
859 
860 	ASSERT(base != NULL && frp != NULL);
861 
862 	if ((dbp = kmem_cache_alloc(dblk_esb_cache, kmflags)) == NULL) {
863 		mp = NULL;
864 		goto out;
865 	}
866 
867 	mp = dbp->db_mblk;
868 	dbp->db_base = base;
869 	dbp->db_lim = base + size;
870 	dbp->db_free = dbp->db_lastfree = lastfree;
871 	dbp->db_frtnp = frp;
872 	DBLK_RTFU_WORD(dbp) = db_rtfu;
873 	mp->b_next = mp->b_prev = mp->b_cont = NULL;
874 	mp->b_rptr = mp->b_wptr = base;
875 	mp->b_queue = NULL;
876 	MBLK_BAND_FLAG_WORD(mp) = 0;
877 
878 out:
879 	FTRACE_1("gesballoc(): mp=0x%lx", (uintptr_t)mp);
880 	return (mp);
881 }
882 
883 /*ARGSUSED*/
884 mblk_t *
esballoc(unsigned char * base,size_t size,uint_t pri,frtn_t * frp)885 esballoc(unsigned char *base, size_t size, uint_t pri, frtn_t *frp)
886 {
887 	mblk_t *mp;
888 
889 	/*
890 	 * Note that this is structured to allow the common case (i.e.
891 	 * STREAMS flowtracing disabled) to call gesballoc() with tail
892 	 * call optimization.
893 	 */
894 	if (!str_ftnever) {
895 		mp = gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
896 		    frp, freebs_enqueue, KM_NOSLEEP);
897 
898 		if (mp != NULL)
899 			STR_FTALLOC(&DB_FTHDR(mp), FTEV_ESBALLOC, size);
900 		return (mp);
901 	}
902 
903 	return (gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
904 	    frp, freebs_enqueue, KM_NOSLEEP));
905 }
906 
907 /*
908  * Same as esballoc() but sleeps waiting for memory.
909  */
910 /*ARGSUSED*/
911 mblk_t *
esballoc_wait(unsigned char * base,size_t size,uint_t pri,frtn_t * frp)912 esballoc_wait(unsigned char *base, size_t size, uint_t pri, frtn_t *frp)
913 {
914 	mblk_t *mp;
915 
916 	/*
917 	 * Note that this is structured to allow the common case (i.e.
918 	 * STREAMS flowtracing disabled) to call gesballoc() with tail
919 	 * call optimization.
920 	 */
921 	if (!str_ftnever) {
922 		mp = gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
923 		    frp, freebs_enqueue, KM_SLEEP);
924 
925 		STR_FTALLOC(&DB_FTHDR(mp), FTEV_ESBALLOC, size);
926 		return (mp);
927 	}
928 
929 	return (gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
930 	    frp, freebs_enqueue, KM_SLEEP));
931 }
932 
933 /*ARGSUSED*/
934 mblk_t *
desballoc(unsigned char * base,size_t size,uint_t pri,frtn_t * frp)935 desballoc(unsigned char *base, size_t size, uint_t pri, frtn_t *frp)
936 {
937 	mblk_t *mp;
938 
939 	/*
940 	 * Note that this is structured to allow the common case (i.e.
941 	 * STREAMS flowtracing disabled) to call gesballoc() with tail
942 	 * call optimization.
943 	 */
944 	if (!str_ftnever) {
945 		mp = gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
946 		    frp, dblk_lastfree_desb, KM_NOSLEEP);
947 
948 		if (mp != NULL)
949 			STR_FTALLOC(&DB_FTHDR(mp), FTEV_DESBALLOC, size);
950 		return (mp);
951 	}
952 
953 	return (gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
954 	    frp, dblk_lastfree_desb, KM_NOSLEEP));
955 }
956 
957 /*ARGSUSED*/
958 mblk_t *
esballoca(unsigned char * base,size_t size,uint_t pri,frtn_t * frp)959 esballoca(unsigned char *base, size_t size, uint_t pri, frtn_t *frp)
960 {
961 	mblk_t *mp;
962 
963 	/*
964 	 * Note that this is structured to allow the common case (i.e.
965 	 * STREAMS flowtracing disabled) to call gesballoc() with tail
966 	 * call optimization.
967 	 */
968 	if (!str_ftnever) {
969 		mp = gesballoc(base, size, DBLK_RTFU(2, M_DATA, 0, 0),
970 		    frp, freebs_enqueue, KM_NOSLEEP);
971 
972 		if (mp != NULL)
973 			STR_FTALLOC(&DB_FTHDR(mp), FTEV_ESBALLOCA, size);
974 		return (mp);
975 	}
976 
977 	return (gesballoc(base, size, DBLK_RTFU(2, M_DATA, 0, 0),
978 	    frp, freebs_enqueue, KM_NOSLEEP));
979 }
980 
981 /*
982  * Same as esballoca() but sleeps waiting for memory.
983  */
984 mblk_t *
esballoca_wait(unsigned char * base,size_t size,uint_t pri,frtn_t * frp)985 esballoca_wait(unsigned char *base, size_t size, uint_t pri, frtn_t *frp)
986 {
987 	mblk_t *mp;
988 
989 	/*
990 	 * Note that this is structured to allow the common case (i.e.
991 	 * STREAMS flowtracing disabled) to call gesballoc() with tail
992 	 * call optimization.
993 	 */
994 	if (!str_ftnever) {
995 		mp = gesballoc(base, size, DBLK_RTFU(2, M_DATA, 0, 0),
996 		    frp, freebs_enqueue, KM_SLEEP);
997 
998 		STR_FTALLOC(&DB_FTHDR(mp), FTEV_ESBALLOCA, size);
999 		return (mp);
1000 	}
1001 
1002 	return (gesballoc(base, size, DBLK_RTFU(2, M_DATA, 0, 0),
1003 	    frp, freebs_enqueue, KM_SLEEP));
1004 }
1005 
1006 /*ARGSUSED*/
1007 mblk_t *
desballoca(unsigned char * base,size_t size,uint_t pri,frtn_t * frp)1008 desballoca(unsigned char *base, size_t size, uint_t pri, frtn_t *frp)
1009 {
1010 	mblk_t *mp;
1011 
1012 	/*
1013 	 * Note that this is structured to allow the common case (i.e.
1014 	 * STREAMS flowtracing disabled) to call gesballoc() with tail
1015 	 * call optimization.
1016 	 */
1017 	if (!str_ftnever) {
1018 		mp = gesballoc(base, size, DBLK_RTFU(2, M_DATA, 0, 0),
1019 		    frp, dblk_lastfree_desb, KM_NOSLEEP);
1020 
1021 		if (mp != NULL)
1022 			STR_FTALLOC(&DB_FTHDR(mp), FTEV_DESBALLOCA, size);
1023 		return (mp);
1024 	}
1025 
1026 	return (gesballoc(base, size, DBLK_RTFU(2, M_DATA, 0, 0),
1027 	    frp, dblk_lastfree_desb, KM_NOSLEEP));
1028 }
1029 
1030 static void
bcache_dblk_lastfree(mblk_t * mp,dblk_t * dbp)1031 bcache_dblk_lastfree(mblk_t *mp, dblk_t *dbp)
1032 {
1033 	bcache_t *bcp = dbp->db_cache;
1034 
1035 	ASSERT(dbp->db_mblk == mp);
1036 	if (dbp->db_fthdr != NULL)
1037 		str_ftfree(dbp);
1038 
1039 	/* set credp and projid to be 'unspecified' before returning to cache */
1040 	if (dbp->db_credp != NULL) {
1041 		crfree(dbp->db_credp);
1042 		dbp->db_credp = NULL;
1043 	}
1044 	dbp->db_cpid = -1;
1045 	dbp->db_struioflag = 0;
1046 	dbp->db_struioun.cksum.flags = 0;
1047 
1048 	mutex_enter(&bcp->mutex);
1049 	kmem_cache_free(bcp->dblk_cache, dbp);
1050 	bcp->alloc--;
1051 
1052 	if (bcp->alloc == 0 && bcp->destroy != 0) {
1053 		kmem_cache_destroy(bcp->dblk_cache);
1054 		kmem_cache_destroy(bcp->buffer_cache);
1055 		mutex_exit(&bcp->mutex);
1056 		mutex_destroy(&bcp->mutex);
1057 		kmem_free(bcp, sizeof (bcache_t));
1058 	} else {
1059 		mutex_exit(&bcp->mutex);
1060 	}
1061 }
1062 
1063 bcache_t *
bcache_create(char * name,size_t size,uint_t align)1064 bcache_create(char *name, size_t size, uint_t align)
1065 {
1066 	bcache_t *bcp;
1067 	char buffer[255];
1068 
1069 	ASSERT((align & (align - 1)) == 0);
1070 
1071 	if ((bcp = kmem_alloc(sizeof (bcache_t), KM_NOSLEEP)) == NULL)
1072 		return (NULL);
1073 
1074 	bcp->size = size;
1075 	bcp->align = align;
1076 	bcp->alloc = 0;
1077 	bcp->destroy = 0;
1078 
1079 	mutex_init(&bcp->mutex, NULL, MUTEX_DRIVER, NULL);
1080 
1081 	(void) sprintf(buffer, "%s_buffer_cache", name);
1082 	bcp->buffer_cache = kmem_cache_create(buffer, size, align, NULL, NULL,
1083 	    NULL, NULL, NULL, 0);
1084 	(void) sprintf(buffer, "%s_dblk_cache", name);
1085 	bcp->dblk_cache = kmem_cache_create(buffer, sizeof (dblk_t),
1086 	    DBLK_CACHE_ALIGN, bcache_dblk_constructor, bcache_dblk_destructor,
1087 	    NULL, (void *)bcp, NULL, 0);
1088 
1089 	return (bcp);
1090 }
1091 
1092 void
bcache_destroy(bcache_t * bcp)1093 bcache_destroy(bcache_t *bcp)
1094 {
1095 	ASSERT(bcp != NULL);
1096 
1097 	mutex_enter(&bcp->mutex);
1098 	if (bcp->alloc == 0) {
1099 		kmem_cache_destroy(bcp->dblk_cache);
1100 		kmem_cache_destroy(bcp->buffer_cache);
1101 		mutex_exit(&bcp->mutex);
1102 		mutex_destroy(&bcp->mutex);
1103 		kmem_free(bcp, sizeof (bcache_t));
1104 	} else {
1105 		bcp->destroy++;
1106 		mutex_exit(&bcp->mutex);
1107 	}
1108 }
1109 
1110 /*ARGSUSED*/
1111 mblk_t *
bcache_allocb(bcache_t * bcp,uint_t pri)1112 bcache_allocb(bcache_t *bcp, uint_t pri)
1113 {
1114 	dblk_t *dbp;
1115 	mblk_t *mp = NULL;
1116 
1117 	ASSERT(bcp != NULL);
1118 
1119 	mutex_enter(&bcp->mutex);
1120 	if (bcp->destroy != 0) {
1121 		mutex_exit(&bcp->mutex);
1122 		goto out;
1123 	}
1124 
1125 	if ((dbp = kmem_cache_alloc(bcp->dblk_cache, KM_NOSLEEP)) == NULL) {
1126 		mutex_exit(&bcp->mutex);
1127 		goto out;
1128 	}
1129 	bcp->alloc++;
1130 	mutex_exit(&bcp->mutex);
1131 
1132 	ASSERT(((uintptr_t)(dbp->db_base) & (bcp->align - 1)) == 0);
1133 
1134 	mp = dbp->db_mblk;
1135 	DBLK_RTFU_WORD(dbp) = DBLK_RTFU(1, M_DATA, 0, 0);
1136 	mp->b_next = mp->b_prev = mp->b_cont = NULL;
1137 	mp->b_rptr = mp->b_wptr = dbp->db_base;
1138 	mp->b_queue = NULL;
1139 	MBLK_BAND_FLAG_WORD(mp) = 0;
1140 	STR_FTALLOC(&dbp->db_fthdr, FTEV_BCALLOCB, bcp->size);
1141 out:
1142 	FTRACE_1("bcache_allocb(): mp=0x%p", (uintptr_t)mp);
1143 
1144 	return (mp);
1145 }
1146 
1147 static void
dblk_lastfree_oversize(mblk_t * mp,dblk_t * dbp)1148 dblk_lastfree_oversize(mblk_t *mp, dblk_t *dbp)
1149 {
1150 	ASSERT(dbp->db_mblk == mp);
1151 	if (dbp->db_fthdr != NULL)
1152 		str_ftfree(dbp);
1153 
1154 	/* set credp and projid to be 'unspecified' before returning to cache */
1155 	if (dbp->db_credp != NULL) {
1156 		crfree(dbp->db_credp);
1157 		dbp->db_credp = NULL;
1158 	}
1159 	dbp->db_cpid = -1;
1160 	dbp->db_struioflag = 0;
1161 	dbp->db_struioun.cksum.flags = 0;
1162 
1163 	kmem_free(dbp->db_base, dbp->db_lim - dbp->db_base);
1164 	kmem_cache_free(dbp->db_cache, dbp);
1165 }
1166 
1167 static mblk_t *
allocb_oversize(size_t size,int kmflags)1168 allocb_oversize(size_t size, int kmflags)
1169 {
1170 	mblk_t *mp;
1171 	void *buf;
1172 
1173 	size = P2ROUNDUP(size, DBLK_CACHE_ALIGN);
1174 	if ((buf = kmem_alloc(size, kmflags)) == NULL)
1175 		return (NULL);
1176 	if ((mp = gesballoc(buf, size, DBLK_RTFU(1, M_DATA, 0, 0),
1177 	    &frnop, dblk_lastfree_oversize, kmflags)) == NULL)
1178 		kmem_free(buf, size);
1179 
1180 	if (mp != NULL)
1181 		STR_FTALLOC(&DB_FTHDR(mp), FTEV_ALLOCBIG, size);
1182 
1183 	return (mp);
1184 }
1185 
1186 mblk_t *
allocb_tryhard(size_t target_size)1187 allocb_tryhard(size_t target_size)
1188 {
1189 	size_t size;
1190 	mblk_t *bp;
1191 
1192 	for (size = target_size; size < target_size + 512;
1193 	    size += DBLK_CACHE_ALIGN)
1194 		if ((bp = allocb(size, BPRI_HI)) != NULL)
1195 			return (bp);
1196 	allocb_tryhard_fails++;
1197 	return (NULL);
1198 }
1199 
1200 /*
1201  * This routine is consolidation private for STREAMS internal use
1202  * This routine may only be called from sync routines (i.e., not
1203  * from put or service procedures).  It is located here (rather
1204  * than strsubr.c) so that we don't have to expose all of the
1205  * allocb() implementation details in header files.
1206  */
1207 mblk_t *
allocb_wait(size_t size,uint_t pri,uint_t flags,int * error)1208 allocb_wait(size_t size, uint_t pri, uint_t flags, int *error)
1209 {
1210 	dblk_t *dbp;
1211 	mblk_t *mp;
1212 	size_t index;
1213 
1214 	index = (size -1) >> DBLK_SIZE_SHIFT;
1215 
1216 	if (flags & STR_NOSIG) {
1217 		if (index >= (DBLK_MAX_CACHE >> DBLK_SIZE_SHIFT)) {
1218 			if (size != 0) {
1219 				mp = allocb_oversize(size, KM_SLEEP);
1220 				FTRACE_1("allocb_wait (NOSIG): mp=0x%lx",
1221 				    (uintptr_t)mp);
1222 				return (mp);
1223 			}
1224 			index = 0;
1225 		}
1226 
1227 		dbp = kmem_cache_alloc(dblk_cache[index], KM_SLEEP);
1228 		mp = dbp->db_mblk;
1229 		DBLK_RTFU_WORD(dbp) = DBLK_RTFU(1, M_DATA, 0, 0);
1230 		mp->b_next = mp->b_prev = mp->b_cont = NULL;
1231 		mp->b_rptr = mp->b_wptr = dbp->db_base;
1232 		mp->b_queue = NULL;
1233 		MBLK_BAND_FLAG_WORD(mp) = 0;
1234 		STR_FTALLOC(&DB_FTHDR(mp), FTEV_ALLOCBW, size);
1235 
1236 		FTRACE_1("allocb_wait (NOSIG): mp=0x%lx", (uintptr_t)mp);
1237 
1238 	} else {
1239 		while ((mp = allocb(size, pri)) == NULL) {
1240 			if ((*error = strwaitbuf(size, BPRI_HI)) != 0)
1241 				return (NULL);
1242 		}
1243 	}
1244 
1245 	return (mp);
1246 }
1247 
1248 /*
1249  * Call function 'func' with 'arg' when a class zero block can
1250  * be allocated with priority 'pri'.
1251  */
1252 bufcall_id_t
esbbcall(uint_t pri,void (* func)(void *),void * arg)1253 esbbcall(uint_t pri, void (*func)(void *), void *arg)
1254 {
1255 	return (bufcall(1, pri, func, arg));
1256 }
1257 
1258 /*
1259  * Allocates an iocblk (M_IOCTL) block. Properly sets the credentials
1260  * ioc_id, rval and error of the struct ioctl to set up an ioctl call.
1261  * This provides consistency for all internal allocators of ioctl.
1262  */
1263 mblk_t *
mkiocb(uint_t cmd)1264 mkiocb(uint_t cmd)
1265 {
1266 	struct iocblk	*ioc;
1267 	mblk_t		*mp;
1268 
1269 	/*
1270 	 * Allocate enough space for any of the ioctl related messages.
1271 	 */
1272 	if ((mp = allocb(sizeof (union ioctypes), BPRI_MED)) == NULL)
1273 		return (NULL);
1274 
1275 	bzero(mp->b_rptr, sizeof (union ioctypes));
1276 
1277 	/*
1278 	 * Set the mblk_t information and ptrs correctly.
1279 	 */
1280 	mp->b_wptr += sizeof (struct iocblk);
1281 	mp->b_datap->db_type = M_IOCTL;
1282 
1283 	/*
1284 	 * Fill in the fields.
1285 	 */
1286 	ioc		= (struct iocblk *)mp->b_rptr;
1287 	ioc->ioc_cmd	= cmd;
1288 	ioc->ioc_cr	= kcred;
1289 	ioc->ioc_id	= getiocseqno();
1290 	ioc->ioc_flag	= IOC_NATIVE;
1291 	return (mp);
1292 }
1293 
1294 /*
1295  * test if block of given size can be allocated with a request of
1296  * the given priority.
1297  * 'pri' is no longer used, but is retained for compatibility.
1298  */
1299 /* ARGSUSED */
1300 int
testb(size_t size,uint_t pri)1301 testb(size_t size, uint_t pri)
1302 {
1303 	return ((size + sizeof (dblk_t)) <= kmem_avail());
1304 }
1305 
1306 /*
1307  * Call function 'func' with argument 'arg' when there is a reasonably
1308  * good chance that a block of size 'size' can be allocated.
1309  * 'pri' is no longer used, but is retained for compatibility.
1310  */
1311 /* ARGSUSED */
1312 bufcall_id_t
bufcall(size_t size,uint_t pri,void (* func)(void *),void * arg)1313 bufcall(size_t size, uint_t pri, void (*func)(void *), void *arg)
1314 {
1315 	static long bid = 1;	/* always odd to save checking for zero */
1316 	bufcall_id_t bc_id;
1317 	struct strbufcall *bcp;
1318 
1319 	if ((bcp = kmem_alloc(sizeof (strbufcall_t), KM_NOSLEEP)) == NULL)
1320 		return (0);
1321 
1322 	bcp->bc_func = func;
1323 	bcp->bc_arg = arg;
1324 	bcp->bc_size = size;
1325 	bcp->bc_next = NULL;
1326 	bcp->bc_executor = NULL;
1327 
1328 	mutex_enter(&strbcall_lock);
1329 	/*
1330 	 * After bcp is linked into strbcalls and strbcall_lock is dropped there
1331 	 * should be no references to bcp since it may be freed by
1332 	 * runbufcalls(). Since bcp_id field is returned, we save its value in
1333 	 * the local var.
1334 	 */
1335 	bc_id = bcp->bc_id = (bufcall_id_t)(bid += 2);	/* keep it odd */
1336 
1337 	/*
1338 	 * add newly allocated stream event to existing
1339 	 * linked list of events.
1340 	 */
1341 	if (strbcalls.bc_head == NULL) {
1342 		strbcalls.bc_head = strbcalls.bc_tail = bcp;
1343 	} else {
1344 		strbcalls.bc_tail->bc_next = bcp;
1345 		strbcalls.bc_tail = bcp;
1346 	}
1347 
1348 	cv_signal(&strbcall_cv);
1349 	mutex_exit(&strbcall_lock);
1350 	return (bc_id);
1351 }
1352 
1353 /*
1354  * Cancel a bufcall request.
1355  */
1356 void
unbufcall(bufcall_id_t id)1357 unbufcall(bufcall_id_t id)
1358 {
1359 	strbufcall_t *bcp, *pbcp;
1360 
1361 	mutex_enter(&strbcall_lock);
1362 again:
1363 	pbcp = NULL;
1364 	for (bcp = strbcalls.bc_head; bcp; bcp = bcp->bc_next) {
1365 		if (id == bcp->bc_id)
1366 			break;
1367 		pbcp = bcp;
1368 	}
1369 	if (bcp) {
1370 		if (bcp->bc_executor != NULL) {
1371 			if (bcp->bc_executor != curthread) {
1372 				cv_wait(&bcall_cv, &strbcall_lock);
1373 				goto again;
1374 			}
1375 		} else {
1376 			if (pbcp)
1377 				pbcp->bc_next = bcp->bc_next;
1378 			else
1379 				strbcalls.bc_head = bcp->bc_next;
1380 			if (bcp == strbcalls.bc_tail)
1381 				strbcalls.bc_tail = pbcp;
1382 			kmem_free(bcp, sizeof (strbufcall_t));
1383 		}
1384 	}
1385 	mutex_exit(&strbcall_lock);
1386 }
1387 
1388 /*
1389  * Duplicate a message block by block (uses dupb), returning
1390  * a pointer to the duplicate message.
1391  * Returns a non-NULL value only if the entire message
1392  * was dup'd.
1393  */
1394 mblk_t *
dupmsg(mblk_t * bp)1395 dupmsg(mblk_t *bp)
1396 {
1397 	mblk_t *head, *nbp;
1398 
1399 	if (!bp || !(nbp = head = dupb(bp)))
1400 		return (NULL);
1401 
1402 	while (bp->b_cont) {
1403 		if (!(nbp->b_cont = dupb(bp->b_cont))) {
1404 			freemsg(head);
1405 			return (NULL);
1406 		}
1407 		nbp = nbp->b_cont;
1408 		bp = bp->b_cont;
1409 	}
1410 	return (head);
1411 }
1412 
1413 #define	DUPB_NOLOAN(bp) \
1414 	((((bp)->b_datap->db_struioflag & STRUIO_ZC) != 0) ? \
1415 	copyb((bp)) : dupb((bp)))
1416 
1417 mblk_t *
dupmsg_noloan(mblk_t * bp)1418 dupmsg_noloan(mblk_t *bp)
1419 {
1420 	mblk_t *head, *nbp;
1421 
1422 	if (bp == NULL || DB_TYPE(bp) != M_DATA ||
1423 	    ((nbp = head = DUPB_NOLOAN(bp)) == NULL))
1424 		return (NULL);
1425 
1426 	while (bp->b_cont) {
1427 		if ((nbp->b_cont = DUPB_NOLOAN(bp->b_cont)) == NULL) {
1428 			freemsg(head);
1429 			return (NULL);
1430 		}
1431 		nbp = nbp->b_cont;
1432 		bp = bp->b_cont;
1433 	}
1434 	return (head);
1435 }
1436 
1437 /*
1438  * Copy data from message and data block to newly allocated message and
1439  * data block. Returns new message block pointer, or NULL if error.
1440  * The alignment of rptr (w.r.t. word alignment) will be the same in the copy
1441  * as in the original even when db_base is not word aligned. (bug 1052877)
1442  */
1443 mblk_t *
copyb(mblk_t * bp)1444 copyb(mblk_t *bp)
1445 {
1446 	mblk_t	*nbp;
1447 	dblk_t	*dp, *ndp;
1448 	uchar_t *base;
1449 	size_t	size;
1450 	size_t	unaligned;
1451 
1452 	ASSERT(bp->b_wptr >= bp->b_rptr);
1453 
1454 	dp = bp->b_datap;
1455 	if (dp->db_fthdr != NULL)
1456 		STR_FTEVENT_MBLK(bp, caller(), FTEV_COPYB, 0);
1457 
1458 	size = dp->db_lim - dp->db_base;
1459 	unaligned = P2PHASE((uintptr_t)dp->db_base, sizeof (uint_t));
1460 	if ((nbp = allocb_tmpl(size + unaligned, bp)) == NULL)
1461 		return (NULL);
1462 	nbp->b_flag = bp->b_flag;
1463 	nbp->b_band = bp->b_band;
1464 	ndp = nbp->b_datap;
1465 
1466 	/*
1467 	 * Copy the various checksum information that came in
1468 	 * originally.
1469 	 */
1470 	ndp->db_cksumstart = dp->db_cksumstart;
1471 	ndp->db_cksumend = dp->db_cksumend;
1472 	ndp->db_cksumstuff = dp->db_cksumstuff;
1473 	bcopy(dp->db_struioun.data, ndp->db_struioun.data,
1474 	    sizeof (dp->db_struioun.data));
1475 
1476 	/*
1477 	 * Well, here is a potential issue.  If we are trying to
1478 	 * trace a flow, and we copy the message, we might lose
1479 	 * information about where this message might have been.
1480 	 * So we should inherit the FT data.  On the other hand,
1481 	 * a user might be interested only in alloc to free data.
1482 	 * So I guess the real answer is to provide a tunable.
1483 	 */
1484 	STR_FTEVENT_MBLK(nbp, caller(), FTEV_COPYB, 1);
1485 
1486 	base = ndp->db_base + unaligned;
1487 	bcopy(dp->db_base, ndp->db_base + unaligned, size);
1488 
1489 	nbp->b_rptr = base + (bp->b_rptr - dp->db_base);
1490 	nbp->b_wptr = nbp->b_rptr + MBLKL(bp);
1491 
1492 	return (nbp);
1493 }
1494 
1495 /*
1496  * Copy data from message to newly allocated message using new
1497  * data blocks.  Returns a pointer to the new message, or NULL if error.
1498  */
1499 mblk_t *
copymsg(mblk_t * bp)1500 copymsg(mblk_t *bp)
1501 {
1502 	mblk_t *head, *nbp;
1503 
1504 	if (!bp || !(nbp = head = copyb(bp)))
1505 		return (NULL);
1506 
1507 	while (bp->b_cont) {
1508 		if (!(nbp->b_cont = copyb(bp->b_cont))) {
1509 			freemsg(head);
1510 			return (NULL);
1511 		}
1512 		nbp = nbp->b_cont;
1513 		bp = bp->b_cont;
1514 	}
1515 	return (head);
1516 }
1517 
1518 /*
1519  * link a message block to tail of message
1520  */
1521 void
linkb(mblk_t * mp,mblk_t * bp)1522 linkb(mblk_t *mp, mblk_t *bp)
1523 {
1524 	ASSERT(mp && bp);
1525 
1526 	for (; mp->b_cont; mp = mp->b_cont)
1527 		;
1528 	mp->b_cont = bp;
1529 }
1530 
1531 /*
1532  * unlink a message block from head of message
1533  * return pointer to new message.
1534  * NULL if message becomes empty.
1535  */
1536 mblk_t *
unlinkb(mblk_t * bp)1537 unlinkb(mblk_t *bp)
1538 {
1539 	mblk_t *bp1;
1540 
1541 	bp1 = bp->b_cont;
1542 	bp->b_cont = NULL;
1543 	return (bp1);
1544 }
1545 
1546 /*
1547  * remove a message block "bp" from message "mp"
1548  *
1549  * Return pointer to new message or NULL if no message remains.
1550  * Return -1 if bp is not found in message.
1551  */
1552 mblk_t *
rmvb(mblk_t * mp,mblk_t * bp)1553 rmvb(mblk_t *mp, mblk_t *bp)
1554 {
1555 	mblk_t *tmp;
1556 	mblk_t *lastp = NULL;
1557 
1558 	ASSERT(mp && bp);
1559 	for (tmp = mp; tmp; tmp = tmp->b_cont) {
1560 		if (tmp == bp) {
1561 			if (lastp)
1562 				lastp->b_cont = tmp->b_cont;
1563 			else
1564 				mp = tmp->b_cont;
1565 			tmp->b_cont = NULL;
1566 			return (mp);
1567 		}
1568 		lastp = tmp;
1569 	}
1570 	return ((mblk_t *)-1);
1571 }
1572 
1573 /*
1574  * Concatenate and align first len bytes of common
1575  * message type.  Len == -1, means concat everything.
1576  * Returns 1 on success, 0 on failure
1577  * After the pullup, mp points to the pulled up data.
1578  */
1579 int
pullupmsg(mblk_t * mp,ssize_t len)1580 pullupmsg(mblk_t *mp, ssize_t len)
1581 {
1582 	mblk_t *bp, *b_cont;
1583 	dblk_t *dbp;
1584 	ssize_t n;
1585 
1586 	ASSERT(mp->b_datap->db_ref > 0);
1587 	ASSERT(mp->b_next == NULL && mp->b_prev == NULL);
1588 
1589 	if (len == -1) {
1590 		if (mp->b_cont == NULL && str_aligned(mp->b_rptr))
1591 			return (1);
1592 		len = xmsgsize(mp);
1593 	} else {
1594 		ssize_t first_mblk_len = mp->b_wptr - mp->b_rptr;
1595 		ASSERT(first_mblk_len >= 0);
1596 		/*
1597 		 * If the length is less than that of the first mblk,
1598 		 * we want to pull up the message into an aligned mblk.
1599 		 * Though not part of the spec, some callers assume it.
1600 		 */
1601 		if (len <= first_mblk_len) {
1602 			if (str_aligned(mp->b_rptr))
1603 				return (1);
1604 			len = first_mblk_len;
1605 		} else if (xmsgsize(mp) < len)
1606 			return (0);
1607 	}
1608 
1609 	if ((bp = allocb_tmpl(len, mp)) == NULL)
1610 		return (0);
1611 
1612 	dbp = bp->b_datap;
1613 	*bp = *mp;		/* swap mblks so bp heads the old msg... */
1614 	mp->b_datap = dbp;	/* ... and mp heads the new message */
1615 	mp->b_datap->db_mblk = mp;
1616 	bp->b_datap->db_mblk = bp;
1617 	mp->b_rptr = mp->b_wptr = dbp->db_base;
1618 
1619 	do {
1620 		ASSERT(bp->b_datap->db_ref > 0);
1621 		ASSERT(bp->b_wptr >= bp->b_rptr);
1622 		n = MIN(bp->b_wptr - bp->b_rptr, len);
1623 		ASSERT(n >= 0);		/* allow zero-length mblk_t's */
1624 		if (n > 0)
1625 			bcopy(bp->b_rptr, mp->b_wptr, (size_t)n);
1626 		mp->b_wptr += n;
1627 		bp->b_rptr += n;
1628 		len -= n;
1629 		if (bp->b_rptr != bp->b_wptr)
1630 			break;
1631 		b_cont = bp->b_cont;
1632 		freeb(bp);
1633 		bp = b_cont;
1634 	} while (len && bp);
1635 
1636 	mp->b_cont = bp;	/* tack on whatever wasn't pulled up */
1637 
1638 	return (1);
1639 }
1640 
1641 /*
1642  * Concatenate and align at least the first len bytes of common message
1643  * type.  Len == -1 means concatenate everything.  The original message is
1644  * unaltered.  Returns a pointer to a new message on success, otherwise
1645  * returns NULL.
1646  */
1647 mblk_t *
msgpullup(mblk_t * mp,ssize_t len)1648 msgpullup(mblk_t *mp, ssize_t len)
1649 {
1650 	return (msgpullup_pad(mp, len, 0));
1651 }
1652 
1653 /*
1654  * Concatenate and align at least the first len bytes of common message
1655  * type, leaving `pad` bytes of headroom before the message.
1656  *
1657  * `len` == -1 means concatenate everything.  The original message is
1658  * unaltered.  Returns a pointer to a new message on success, otherwise
1659  * returns NULL.
1660  */
1661 mblk_t *
msgpullup_pad(mblk_t * mp,ssize_t len,size_t pad)1662 msgpullup_pad(mblk_t *mp, ssize_t len, size_t pad)
1663 {
1664 	mblk_t *newmp;
1665 	ssize_t totlen = xmsgsize(mp);
1666 	ssize_t offset = 0;
1667 
1668 	if (len == -1)
1669 		len = totlen;
1670 
1671 	if (len < 0 || (len > 0 && len > totlen))
1672 		return (NULL);
1673 
1674 	/* Now that len is normalised (>0), check for overflow including pad. */
1675 	const size_t alloc_sz = (size_t)len + pad;
1676 	if (alloc_sz < (size_t)len ||
1677 	    (newmp = allocb_tmpl(alloc_sz, mp)) == NULL) {
1678 		return (NULL);
1679 	}
1680 
1681 	newmp->b_flag = mp->b_flag;
1682 	newmp->b_band = mp->b_band;
1683 	newmp->b_wptr += pad;
1684 	newmp->b_rptr = newmp->b_wptr;
1685 
1686 	while (len > 0) {
1687 		ssize_t seglen = MBLKL(mp);
1688 		ssize_t n = MIN(seglen, len);
1689 
1690 		ASSERT3P(mp, !=, NULL);	/* guaranteed by len <= totlen */
1691 		ASSERT3S(n, >=, 0);	/* allow zero-length mblk_t's */
1692 		if (n > 0)
1693 			bcopy(mp->b_rptr, newmp->b_wptr, n);
1694 		newmp->b_wptr += n;
1695 		len -= n;
1696 
1697 		if (n == seglen)
1698 			mp = mp->b_cont;
1699 		else if (len == 0)
1700 			offset = n;
1701 	}
1702 	ASSERT3S(len, ==, 0);
1703 
1704 	if (mp != NULL) {
1705 		newmp->b_cont = dupmsg(mp);
1706 		if (newmp->b_cont == NULL) {
1707 			freemsg(newmp);
1708 			return (NULL);
1709 		}
1710 		ASSERT3S(offset, >=, 0);
1711 		ASSERT3U(MBLKL(newmp->b_cont), >=, offset);
1712 		newmp->b_cont->b_rptr += offset;
1713 	}
1714 
1715 	return (newmp);
1716 }
1717 
1718 /*
1719  * Trim bytes from message
1720  *  len > 0, trim from head
1721  *  len < 0, trim from tail
1722  * Returns 1 on success, 0 on failure.
1723  */
1724 int
adjmsg(mblk_t * mp,ssize_t len)1725 adjmsg(mblk_t *mp, ssize_t len)
1726 {
1727 	mblk_t *bp;
1728 	mblk_t *save_bp = NULL;
1729 	mblk_t *prev_bp;
1730 	mblk_t *bcont;
1731 	unsigned char type;
1732 	ssize_t n;
1733 	int fromhead;
1734 	int first;
1735 
1736 	ASSERT(mp != NULL);
1737 
1738 	if (len < 0) {
1739 		fromhead = 0;
1740 		len = -len;
1741 	} else {
1742 		fromhead = 1;
1743 	}
1744 
1745 	if (xmsgsize(mp) < len)
1746 		return (0);
1747 
1748 	if (fromhead) {
1749 		first = 1;
1750 		while (len) {
1751 			ASSERT(mp->b_wptr >= mp->b_rptr);
1752 			n = MIN(mp->b_wptr - mp->b_rptr, len);
1753 			mp->b_rptr += n;
1754 			len -= n;
1755 
1756 			/*
1757 			 * If this is not the first zero length
1758 			 * message remove it
1759 			 */
1760 			if (!first && (mp->b_wptr == mp->b_rptr)) {
1761 				bcont = mp->b_cont;
1762 				freeb(mp);
1763 				mp = save_bp->b_cont = bcont;
1764 			} else {
1765 				save_bp = mp;
1766 				mp = mp->b_cont;
1767 			}
1768 			first = 0;
1769 		}
1770 	} else {
1771 		type = mp->b_datap->db_type;
1772 		while (len) {
1773 			bp = mp;
1774 			save_bp = NULL;
1775 
1776 			/*
1777 			 * Find the last message of same type
1778 			 */
1779 			while (bp && bp->b_datap->db_type == type) {
1780 				ASSERT(bp->b_wptr >= bp->b_rptr);
1781 				prev_bp = save_bp;
1782 				save_bp = bp;
1783 				bp = bp->b_cont;
1784 			}
1785 			if (save_bp == NULL)
1786 				break;
1787 			n = MIN(save_bp->b_wptr - save_bp->b_rptr, len);
1788 			save_bp->b_wptr -= n;
1789 			len -= n;
1790 
1791 			/*
1792 			 * If this is not the first message
1793 			 * and we have taken away everything
1794 			 * from this message, remove it
1795 			 */
1796 
1797 			if ((save_bp != mp) &&
1798 			    (save_bp->b_wptr == save_bp->b_rptr)) {
1799 				bcont = save_bp->b_cont;
1800 				freeb(save_bp);
1801 				prev_bp->b_cont = bcont;
1802 			}
1803 		}
1804 	}
1805 	return (1);
1806 }
1807 
1808 /*
1809  * get number of data bytes in message
1810  */
1811 size_t
msgdsize(mblk_t * bp)1812 msgdsize(mblk_t *bp)
1813 {
1814 	size_t count = 0;
1815 
1816 	for (; bp; bp = bp->b_cont)
1817 		if (bp->b_datap->db_type == M_DATA) {
1818 			ASSERT(bp->b_wptr >= bp->b_rptr);
1819 			count += bp->b_wptr - bp->b_rptr;
1820 		}
1821 	return (count);
1822 }
1823 
1824 /*
1825  * Get a message off head of queue
1826  *
1827  * If queue has no buffers then mark queue
1828  * with QWANTR. (queue wants to be read by
1829  * someone when data becomes available)
1830  *
1831  * If there is something to take off then do so.
1832  * If queue falls below hi water mark turn off QFULL
1833  * flag.  Decrement weighted count of queue.
1834  * Also turn off QWANTR because queue is being read.
1835  *
1836  * The queue count is maintained on a per-band basis.
1837  * Priority band 0 (normal messages) uses q_count,
1838  * q_lowat, etc.  Non-zero priority bands use the
1839  * fields in their respective qband structures
1840  * (qb_count, qb_lowat, etc.)  All messages appear
1841  * on the same list, linked via their b_next pointers.
1842  * q_first is the head of the list.  q_count does
1843  * not reflect the size of all the messages on the
1844  * queue.  It only reflects those messages in the
1845  * normal band of flow.  The one exception to this
1846  * deals with high priority messages.  They are in
1847  * their own conceptual "band", but are accounted
1848  * against q_count.
1849  *
1850  * If queue count is below the lo water mark and QWANTW
1851  * is set, enable the closest backq which has a service
1852  * procedure and turn off the QWANTW flag.
1853  *
1854  * getq could be built on top of rmvq, but isn't because
1855  * of performance considerations.
1856  *
1857  * A note on the use of q_count and q_mblkcnt:
1858  *   q_count is the traditional byte count for messages that
1859  *   have been put on a queue.  Documentation tells us that
1860  *   we shouldn't rely on that count, but some drivers/modules
1861  *   do.  What was needed, however, is a mechanism to prevent
1862  *   runaway streams from consuming all of the resources,
1863  *   and particularly be able to flow control zero-length
1864  *   messages.  q_mblkcnt is used for this purpose.  It
1865  *   counts the number of mblk's that are being put on
1866  *   the queue.  The intention here, is that each mblk should
1867  *   contain one byte of data and, for the purpose of
1868  *   flow-control, logically does.  A queue will become
1869  *   full when EITHER of these values (q_count and q_mblkcnt)
1870  *   reach the highwater mark.  It will clear when BOTH
1871  *   of them drop below the highwater mark.  And it will
1872  *   backenable when BOTH of them drop below the lowwater
1873  *   mark.
1874  *   With this algorithm, a driver/module might be able
1875  *   to find a reasonably accurate q_count, and the
1876  *   framework can still try and limit resource usage.
1877  */
1878 mblk_t *
getq(queue_t * q)1879 getq(queue_t *q)
1880 {
1881 	mblk_t *bp;
1882 	uchar_t band = 0;
1883 
1884 	bp = getq_noenab(q, 0);
1885 	if (bp != NULL)
1886 		band = bp->b_band;
1887 
1888 	/*
1889 	 * Inlined from qbackenable().
1890 	 * Quick check without holding the lock.
1891 	 */
1892 	if (band == 0 && (q->q_flag & (QWANTW|QWANTWSYNC)) == 0)
1893 		return (bp);
1894 
1895 	qbackenable(q, band);
1896 	return (bp);
1897 }
1898 
1899 /*
1900  * Returns the number of bytes in a message (a message is defined as a
1901  * chain of mblks linked by b_cont). If a non-NULL mblkcnt is supplied we
1902  * also return the number of distinct mblks in the message.
1903  */
1904 int
mp_cont_len(mblk_t * bp,int * mblkcnt)1905 mp_cont_len(mblk_t *bp, int *mblkcnt)
1906 {
1907 	mblk_t	*mp;
1908 	int	mblks = 0;
1909 	int	bytes = 0;
1910 
1911 	for (mp = bp; mp != NULL; mp = mp->b_cont) {
1912 		bytes += MBLKL(mp);
1913 		mblks++;
1914 	}
1915 
1916 	if (mblkcnt != NULL)
1917 		*mblkcnt = mblks;
1918 
1919 	return (bytes);
1920 }
1921 
1922 /*
1923  * Like getq() but does not backenable.  This is used by the stream
1924  * head when a putback() is likely.  The caller must call qbackenable()
1925  * after it is done with accessing the queue.
1926  * The rbytes arguments to getq_noneab() allows callers to specify a
1927  * the maximum number of bytes to return. If the current amount on the
1928  * queue is less than this then the entire message will be returned.
1929  * A value of 0 returns the entire message and is equivalent to the old
1930  * default behaviour prior to the addition of the rbytes argument.
1931  */
1932 mblk_t *
getq_noenab(queue_t * q,ssize_t rbytes)1933 getq_noenab(queue_t *q, ssize_t rbytes)
1934 {
1935 	mblk_t *bp, *mp1;
1936 	mblk_t *mp2 = NULL;
1937 	qband_t *qbp;
1938 	kthread_id_t freezer;
1939 	int	bytecnt = 0, mblkcnt = 0;
1940 
1941 	/* freezestr should allow its caller to call getq/putq */
1942 	freezer = STREAM(q)->sd_freezer;
1943 	if (freezer == curthread) {
1944 		ASSERT(frozenstr(q));
1945 		ASSERT(MUTEX_HELD(QLOCK(q)));
1946 	} else
1947 		mutex_enter(QLOCK(q));
1948 
1949 	if ((bp = q->q_first) == 0) {
1950 		q->q_flag |= QWANTR;
1951 	} else {
1952 		/*
1953 		 * If the caller supplied a byte threshold and there is
1954 		 * more than this amount on the queue then break up the
1955 		 * the message appropriately.  We can only safely do
1956 		 * this for M_DATA messages.
1957 		 */
1958 		if ((DB_TYPE(bp) == M_DATA) && (rbytes > 0) &&
1959 		    (q->q_count > rbytes)) {
1960 			/*
1961 			 * Inline version of mp_cont_len() which terminates
1962 			 * when we meet or exceed rbytes.
1963 			 */
1964 			for (mp1 = bp; mp1 != NULL; mp1 = mp1->b_cont) {
1965 				mblkcnt++;
1966 				bytecnt += MBLKL(mp1);
1967 				if (bytecnt  >= rbytes)
1968 					break;
1969 			}
1970 			/*
1971 			 * We need to account for the following scenarios:
1972 			 *
1973 			 * 1) Too much data in the first message:
1974 			 *	mp1 will be the mblk which puts us over our
1975 			 *	byte limit.
1976 			 * 2) Not enough data in the first message:
1977 			 *	mp1 will be NULL.
1978 			 * 3) Exactly the right amount of data contained within
1979 			 *    whole mblks:
1980 			 *	mp1->b_cont will be where we break the message.
1981 			 */
1982 			if (bytecnt > rbytes) {
1983 				/*
1984 				 * Dup/copy mp1 and put what we don't need
1985 				 * back onto the queue. Adjust the read/write
1986 				 * and continuation pointers appropriately
1987 				 * and decrement the current mblk count to
1988 				 * reflect we are putting an mblk back onto
1989 				 * the queue.
1990 				 * When adjusting the message pointers, it's
1991 				 * OK to use the existing bytecnt and the
1992 				 * requested amount (rbytes) to calculate the
1993 				 * the new write offset (b_wptr) of what we
1994 				 * are taking. However, we  cannot use these
1995 				 * values when calculating the read offset of
1996 				 * the mblk we are putting back on the queue.
1997 				 * This is because the begining (b_rptr) of the
1998 				 * mblk represents some arbitrary point within
1999 				 * the message.
2000 				 * It's simplest to do this by advancing b_rptr
2001 				 * by the new length of mp1 as we don't have to
2002 				 * remember any intermediate state.
2003 				 */
2004 				ASSERT(mp1 != NULL);
2005 				mblkcnt--;
2006 				if ((mp2 = dupb(mp1)) == NULL &&
2007 				    (mp2 = copyb(mp1)) == NULL) {
2008 					bytecnt = mblkcnt = 0;
2009 					goto dup_failed;
2010 				}
2011 				mp2->b_cont = mp1->b_cont;
2012 				mp1->b_wptr -= bytecnt - rbytes;
2013 				mp2->b_rptr += mp1->b_wptr - mp1->b_rptr;
2014 				mp1->b_cont = NULL;
2015 				bytecnt = rbytes;
2016 			} else {
2017 				/*
2018 				 * Either there is not enough data in the first
2019 				 * message or there is no excess data to deal
2020 				 * with. If mp1 is NULL, we are taking the
2021 				 * whole message. No need to do anything.
2022 				 * Otherwise we assign mp1->b_cont to mp2 as
2023 				 * we will be putting this back onto the head of
2024 				 * the queue.
2025 				 */
2026 				if (mp1 != NULL) {
2027 					mp2 = mp1->b_cont;
2028 					mp1->b_cont = NULL;
2029 				}
2030 			}
2031 			/*
2032 			 * If mp2 is not NULL then we have part of the message
2033 			 * to put back onto the queue.
2034 			 */
2035 			if (mp2 != NULL) {
2036 				if ((mp2->b_next = bp->b_next) == NULL)
2037 					q->q_last = mp2;
2038 				else
2039 					bp->b_next->b_prev = mp2;
2040 				q->q_first = mp2;
2041 			} else {
2042 				if ((q->q_first = bp->b_next) == NULL)
2043 					q->q_last = NULL;
2044 				else
2045 					q->q_first->b_prev = NULL;
2046 			}
2047 		} else {
2048 			/*
2049 			 * Either no byte threshold was supplied, there is
2050 			 * not enough on the queue or we failed to
2051 			 * duplicate/copy a data block. In these cases we
2052 			 * just take the entire first message.
2053 			 */
2054 dup_failed:
2055 			bytecnt = mp_cont_len(bp, &mblkcnt);
2056 			if ((q->q_first = bp->b_next) == NULL)
2057 				q->q_last = NULL;
2058 			else
2059 				q->q_first->b_prev = NULL;
2060 		}
2061 		if (bp->b_band == 0) {
2062 			q->q_count -= bytecnt;
2063 			q->q_mblkcnt -= mblkcnt;
2064 			if (q->q_mblkcnt == 0 || ((q->q_count < q->q_hiwat) &&
2065 			    (q->q_mblkcnt < q->q_hiwat))) {
2066 				q->q_flag &= ~QFULL;
2067 			}
2068 		} else {
2069 			int i;
2070 
2071 			ASSERT(bp->b_band <= q->q_nband);
2072 			ASSERT(q->q_bandp != NULL);
2073 			ASSERT(MUTEX_HELD(QLOCK(q)));
2074 			qbp = q->q_bandp;
2075 			i = bp->b_band;
2076 			while (--i > 0)
2077 				qbp = qbp->qb_next;
2078 			if (qbp->qb_first == qbp->qb_last) {
2079 				qbp->qb_first = NULL;
2080 				qbp->qb_last = NULL;
2081 			} else {
2082 				qbp->qb_first = bp->b_next;
2083 			}
2084 			qbp->qb_count -= bytecnt;
2085 			qbp->qb_mblkcnt -= mblkcnt;
2086 			if (qbp->qb_mblkcnt == 0 ||
2087 			    ((qbp->qb_count < qbp->qb_hiwat) &&
2088 			    (qbp->qb_mblkcnt < qbp->qb_hiwat))) {
2089 				qbp->qb_flag &= ~QB_FULL;
2090 			}
2091 		}
2092 		q->q_flag &= ~QWANTR;
2093 		bp->b_next = NULL;
2094 		bp->b_prev = NULL;
2095 	}
2096 	if (freezer != curthread)
2097 		mutex_exit(QLOCK(q));
2098 
2099 	STR_FTEVENT_MSG(bp, q, FTEV_GETQ, 0);
2100 
2101 	return (bp);
2102 }
2103 
2104 /*
2105  * Determine if a backenable is needed after removing a message in the
2106  * specified band.
2107  * NOTE: This routine assumes that something like getq_noenab() has been
2108  * already called.
2109  *
2110  * For the read side it is ok to hold sd_lock across calling this (and the
2111  * stream head often does).
2112  * But for the write side strwakeq might be invoked and it acquires sd_lock.
2113  */
2114 void
qbackenable(queue_t * q,uchar_t band)2115 qbackenable(queue_t *q, uchar_t band)
2116 {
2117 	int backenab = 0;
2118 	qband_t *qbp;
2119 	kthread_id_t freezer;
2120 
2121 	ASSERT(q);
2122 	ASSERT((q->q_flag & QREADR) || MUTEX_NOT_HELD(&STREAM(q)->sd_lock));
2123 
2124 	/*
2125 	 * Quick check without holding the lock.
2126 	 * OK since after getq() has lowered the q_count these flags
2127 	 * would not change unless either the qbackenable() is done by
2128 	 * another thread (which is ok) or the queue has gotten QFULL
2129 	 * in which case another backenable will take place when the queue
2130 	 * drops below q_lowat.
2131 	 */
2132 	if (band == 0 && (q->q_flag & (QWANTW|QWANTWSYNC)) == 0)
2133 		return;
2134 
2135 	/* freezestr should allow its caller to call getq/putq */
2136 	freezer = STREAM(q)->sd_freezer;
2137 	if (freezer == curthread) {
2138 		ASSERT(frozenstr(q));
2139 		ASSERT(MUTEX_HELD(QLOCK(q)));
2140 	} else
2141 		mutex_enter(QLOCK(q));
2142 
2143 	if (band == 0) {
2144 		if (q->q_lowat == 0 || (q->q_count < q->q_lowat &&
2145 		    q->q_mblkcnt < q->q_lowat)) {
2146 			backenab = q->q_flag & (QWANTW|QWANTWSYNC);
2147 		}
2148 	} else {
2149 		int i;
2150 
2151 		ASSERT((unsigned)band <= q->q_nband);
2152 		ASSERT(q->q_bandp != NULL);
2153 
2154 		qbp = q->q_bandp;
2155 		i = band;
2156 		while (--i > 0)
2157 			qbp = qbp->qb_next;
2158 
2159 		if (qbp->qb_lowat == 0 || (qbp->qb_count < qbp->qb_lowat &&
2160 		    qbp->qb_mblkcnt < qbp->qb_lowat)) {
2161 			backenab = qbp->qb_flag & QB_WANTW;
2162 		}
2163 	}
2164 
2165 	if (backenab == 0) {
2166 		if (freezer != curthread)
2167 			mutex_exit(QLOCK(q));
2168 		return;
2169 	}
2170 
2171 	/* Have to drop the lock across strwakeq and backenable */
2172 	if (backenab & QWANTWSYNC)
2173 		q->q_flag &= ~QWANTWSYNC;
2174 	if (backenab & (QWANTW|QB_WANTW)) {
2175 		if (band != 0)
2176 			qbp->qb_flag &= ~QB_WANTW;
2177 		else {
2178 			q->q_flag &= ~QWANTW;
2179 		}
2180 	}
2181 
2182 	if (freezer != curthread)
2183 		mutex_exit(QLOCK(q));
2184 
2185 	if (backenab & QWANTWSYNC)
2186 		strwakeq(q, QWANTWSYNC);
2187 	if (backenab & (QWANTW|QB_WANTW))
2188 		backenable(q, band);
2189 }
2190 
2191 /*
2192  * Remove a message from a queue.  The queue count and other
2193  * flow control parameters are adjusted and the back queue
2194  * enabled if necessary.
2195  *
2196  * rmvq can be called with the stream frozen, but other utility functions
2197  * holding QLOCK, and by streams modules without any locks/frozen.
2198  */
2199 void
rmvq(queue_t * q,mblk_t * mp)2200 rmvq(queue_t *q, mblk_t *mp)
2201 {
2202 	ASSERT(mp != NULL);
2203 
2204 	rmvq_noenab(q, mp);
2205 	if (curthread != STREAM(q)->sd_freezer && MUTEX_HELD(QLOCK(q))) {
2206 		/*
2207 		 * qbackenable can handle a frozen stream but not a "random"
2208 		 * qlock being held. Drop lock across qbackenable.
2209 		 */
2210 		mutex_exit(QLOCK(q));
2211 		qbackenable(q, mp->b_band);
2212 		mutex_enter(QLOCK(q));
2213 	} else {
2214 		qbackenable(q, mp->b_band);
2215 	}
2216 }
2217 
2218 /*
2219  * Like rmvq() but without any backenabling.
2220  * This exists to handle SR_CONSOL_DATA in strrput().
2221  */
2222 void
rmvq_noenab(queue_t * q,mblk_t * mp)2223 rmvq_noenab(queue_t *q, mblk_t *mp)
2224 {
2225 	int i;
2226 	qband_t *qbp = NULL;
2227 	kthread_id_t freezer;
2228 	int	bytecnt = 0, mblkcnt = 0;
2229 
2230 	freezer = STREAM(q)->sd_freezer;
2231 	if (freezer == curthread) {
2232 		ASSERT(frozenstr(q));
2233 		ASSERT(MUTEX_HELD(QLOCK(q)));
2234 	} else if (MUTEX_HELD(QLOCK(q))) {
2235 		/* Don't drop lock on exit */
2236 		freezer = curthread;
2237 	} else
2238 		mutex_enter(QLOCK(q));
2239 
2240 	ASSERT(mp->b_band <= q->q_nband);
2241 	if (mp->b_band != 0) {		/* Adjust band pointers */
2242 		ASSERT(q->q_bandp != NULL);
2243 		qbp = q->q_bandp;
2244 		i = mp->b_band;
2245 		while (--i > 0)
2246 			qbp = qbp->qb_next;
2247 		if (mp == qbp->qb_first) {
2248 			if (mp->b_next && mp->b_band == mp->b_next->b_band)
2249 				qbp->qb_first = mp->b_next;
2250 			else
2251 				qbp->qb_first = NULL;
2252 		}
2253 		if (mp == qbp->qb_last) {
2254 			if (mp->b_prev && mp->b_band == mp->b_prev->b_band)
2255 				qbp->qb_last = mp->b_prev;
2256 			else
2257 				qbp->qb_last = NULL;
2258 		}
2259 	}
2260 
2261 	/*
2262 	 * Remove the message from the list.
2263 	 */
2264 	if (mp->b_prev)
2265 		mp->b_prev->b_next = mp->b_next;
2266 	else
2267 		q->q_first = mp->b_next;
2268 	if (mp->b_next)
2269 		mp->b_next->b_prev = mp->b_prev;
2270 	else
2271 		q->q_last = mp->b_prev;
2272 	mp->b_next = NULL;
2273 	mp->b_prev = NULL;
2274 
2275 	/* Get the size of the message for q_count accounting */
2276 	bytecnt = mp_cont_len(mp, &mblkcnt);
2277 
2278 	if (mp->b_band == 0) {		/* Perform q_count accounting */
2279 		q->q_count -= bytecnt;
2280 		q->q_mblkcnt -= mblkcnt;
2281 		if (q->q_mblkcnt == 0 || ((q->q_count < q->q_hiwat) &&
2282 		    (q->q_mblkcnt < q->q_hiwat))) {
2283 			q->q_flag &= ~QFULL;
2284 		}
2285 	} else {			/* Perform qb_count accounting */
2286 		qbp->qb_count -= bytecnt;
2287 		qbp->qb_mblkcnt -= mblkcnt;
2288 		if (qbp->qb_mblkcnt == 0 || ((qbp->qb_count < qbp->qb_hiwat) &&
2289 		    (qbp->qb_mblkcnt < qbp->qb_hiwat))) {
2290 			qbp->qb_flag &= ~QB_FULL;
2291 		}
2292 	}
2293 	if (freezer != curthread)
2294 		mutex_exit(QLOCK(q));
2295 
2296 	STR_FTEVENT_MSG(mp, q, FTEV_RMVQ, 0);
2297 }
2298 
2299 /*
2300  * Empty a queue.
2301  * If flag is set, remove all messages.  Otherwise, remove
2302  * only non-control messages.  If queue falls below its low
2303  * water mark, and QWANTW is set, enable the nearest upstream
2304  * service procedure.
2305  *
2306  * Historical note: when merging the M_FLUSH code in strrput with this
2307  * code one difference was discovered. flushq did not have a check
2308  * for q_lowat == 0 in the backenabling test.
2309  *
2310  * pcproto_flag specifies whether or not a M_PCPROTO message should be flushed
2311  * if one exists on the queue.
2312  */
2313 void
flushq_common(queue_t * q,int flag,int pcproto_flag)2314 flushq_common(queue_t *q, int flag, int pcproto_flag)
2315 {
2316 	mblk_t *mp, *nmp;
2317 	qband_t *qbp;
2318 	int backenab = 0;
2319 	unsigned char bpri;
2320 	unsigned char	qbf[NBAND];	/* band flushing backenable flags */
2321 
2322 	if (q->q_first == NULL)
2323 		return;
2324 
2325 	mutex_enter(QLOCK(q));
2326 	mp = q->q_first;
2327 	q->q_first = NULL;
2328 	q->q_last = NULL;
2329 	q->q_count = 0;
2330 	q->q_mblkcnt = 0;
2331 	for (qbp = q->q_bandp; qbp; qbp = qbp->qb_next) {
2332 		qbp->qb_first = NULL;
2333 		qbp->qb_last = NULL;
2334 		qbp->qb_count = 0;
2335 		qbp->qb_mblkcnt = 0;
2336 		qbp->qb_flag &= ~QB_FULL;
2337 	}
2338 	q->q_flag &= ~QFULL;
2339 	mutex_exit(QLOCK(q));
2340 	while (mp) {
2341 		nmp = mp->b_next;
2342 		mp->b_next = mp->b_prev = NULL;
2343 
2344 		STR_FTEVENT_MBLK(mp, q, FTEV_FLUSHQ, 0);
2345 
2346 		if (pcproto_flag && (mp->b_datap->db_type == M_PCPROTO))
2347 			(void) putq(q, mp);
2348 		else if (flag || datamsg(mp->b_datap->db_type))
2349 			freemsg(mp);
2350 		else
2351 			(void) putq(q, mp);
2352 		mp = nmp;
2353 	}
2354 	bpri = 1;
2355 	mutex_enter(QLOCK(q));
2356 	for (qbp = q->q_bandp; qbp; qbp = qbp->qb_next) {
2357 		if ((qbp->qb_flag & QB_WANTW) &&
2358 		    (((qbp->qb_count < qbp->qb_lowat) &&
2359 		    (qbp->qb_mblkcnt < qbp->qb_lowat)) ||
2360 		    qbp->qb_lowat == 0)) {
2361 			qbp->qb_flag &= ~QB_WANTW;
2362 			backenab = 1;
2363 			qbf[bpri] = 1;
2364 		} else
2365 			qbf[bpri] = 0;
2366 		bpri++;
2367 	}
2368 	ASSERT(bpri == (unsigned char)(q->q_nband + 1));
2369 	if ((q->q_flag & QWANTW) &&
2370 	    (((q->q_count < q->q_lowat) &&
2371 	    (q->q_mblkcnt < q->q_lowat)) || q->q_lowat == 0)) {
2372 		q->q_flag &= ~QWANTW;
2373 		backenab = 1;
2374 		qbf[0] = 1;
2375 	} else
2376 		qbf[0] = 0;
2377 
2378 	/*
2379 	 * If any band can now be written to, and there is a writer
2380 	 * for that band, then backenable the closest service procedure.
2381 	 */
2382 	if (backenab) {
2383 		mutex_exit(QLOCK(q));
2384 		for (bpri = q->q_nband; bpri != 0; bpri--)
2385 			if (qbf[bpri])
2386 				backenable(q, bpri);
2387 		if (qbf[0])
2388 			backenable(q, 0);
2389 	} else
2390 		mutex_exit(QLOCK(q));
2391 }
2392 
2393 /*
2394  * The real flushing takes place in flushq_common. This is done so that
2395  * a flag which specifies whether or not M_PCPROTO messages should be flushed
2396  * or not. Currently the only place that uses this flag is the stream head.
2397  */
2398 void
flushq(queue_t * q,int flag)2399 flushq(queue_t *q, int flag)
2400 {
2401 	flushq_common(q, flag, 0);
2402 }
2403 
2404 /*
2405  * Flush the queue of messages of the given priority band.
2406  * There is some duplication of code between flushq and flushband.
2407  * This is because we want to optimize the code as much as possible.
2408  * The assumption is that there will be more messages in the normal
2409  * (priority 0) band than in any other.
2410  *
2411  * Historical note: when merging the M_FLUSH code in strrput with this
2412  * code one difference was discovered. flushband had an extra check for
2413  * did not have a check for (mp->b_datap->db_type < QPCTL) in the band 0
2414  * case. That check does not match the man page for flushband and was not
2415  * in the strrput flush code hence it was removed.
2416  */
2417 void
flushband(queue_t * q,unsigned char pri,int flag)2418 flushband(queue_t *q, unsigned char pri, int flag)
2419 {
2420 	mblk_t *mp;
2421 	mblk_t *nmp;
2422 	mblk_t *last;
2423 	qband_t *qbp;
2424 	int band;
2425 
2426 	ASSERT((flag == FLUSHDATA) || (flag == FLUSHALL));
2427 	if (pri > q->q_nband) {
2428 		return;
2429 	}
2430 	mutex_enter(QLOCK(q));
2431 	if (pri == 0) {
2432 		mp = q->q_first;
2433 		q->q_first = NULL;
2434 		q->q_last = NULL;
2435 		q->q_count = 0;
2436 		q->q_mblkcnt = 0;
2437 		for (qbp = q->q_bandp; qbp; qbp = qbp->qb_next) {
2438 			qbp->qb_first = NULL;
2439 			qbp->qb_last = NULL;
2440 			qbp->qb_count = 0;
2441 			qbp->qb_mblkcnt = 0;
2442 			qbp->qb_flag &= ~QB_FULL;
2443 		}
2444 		q->q_flag &= ~QFULL;
2445 		mutex_exit(QLOCK(q));
2446 		while (mp) {
2447 			nmp = mp->b_next;
2448 			mp->b_next = mp->b_prev = NULL;
2449 			if ((mp->b_band == 0) &&
2450 			    ((flag == FLUSHALL) ||
2451 			    datamsg(mp->b_datap->db_type)))
2452 				freemsg(mp);
2453 			else
2454 				(void) putq(q, mp);
2455 			mp = nmp;
2456 		}
2457 		mutex_enter(QLOCK(q));
2458 		if ((q->q_flag & QWANTW) &&
2459 		    (((q->q_count < q->q_lowat) &&
2460 		    (q->q_mblkcnt < q->q_lowat)) || q->q_lowat == 0)) {
2461 			q->q_flag &= ~QWANTW;
2462 			mutex_exit(QLOCK(q));
2463 
2464 			backenable(q, pri);
2465 		} else
2466 			mutex_exit(QLOCK(q));
2467 	} else {	/* pri != 0 */
2468 		boolean_t flushed = B_FALSE;
2469 		band = pri;
2470 
2471 		ASSERT(MUTEX_HELD(QLOCK(q)));
2472 		qbp = q->q_bandp;
2473 		while (--band > 0)
2474 			qbp = qbp->qb_next;
2475 		mp = qbp->qb_first;
2476 		if (mp == NULL) {
2477 			mutex_exit(QLOCK(q));
2478 			return;
2479 		}
2480 		last = qbp->qb_last->b_next;
2481 		/*
2482 		 * rmvq_noenab() and freemsg() are called for each mblk that
2483 		 * meets the criteria.  The loop is executed until the last
2484 		 * mblk has been processed.
2485 		 */
2486 		while (mp != last) {
2487 			ASSERT(mp->b_band == pri);
2488 			nmp = mp->b_next;
2489 			if (flag == FLUSHALL || datamsg(mp->b_datap->db_type)) {
2490 				rmvq_noenab(q, mp);
2491 				freemsg(mp);
2492 				flushed = B_TRUE;
2493 			}
2494 			mp = nmp;
2495 		}
2496 		mutex_exit(QLOCK(q));
2497 
2498 		/*
2499 		 * If any mblk(s) has been freed, we know that qbackenable()
2500 		 * will need to be called.
2501 		 */
2502 		if (flushed)
2503 			qbackenable(q, pri);
2504 	}
2505 }
2506 
2507 /*
2508  * Return 1 if the queue is not full.  If the queue is full, return
2509  * 0 (may not put message) and set QWANTW flag (caller wants to write
2510  * to the queue).
2511  */
2512 int
canput(queue_t * q)2513 canput(queue_t *q)
2514 {
2515 	TRACE_1(TR_FAC_STREAMS_FR, TR_CANPUT_IN, "canput:%p", q);
2516 
2517 	/* this is for loopback transports, they should not do a canput */
2518 	ASSERT(STRMATED(q->q_stream) || STREAM(q) == STREAM(q->q_nfsrv));
2519 
2520 	/* Find next forward module that has a service procedure */
2521 	q = q->q_nfsrv;
2522 
2523 	if (!(q->q_flag & QFULL)) {
2524 		TRACE_2(TR_FAC_STREAMS_FR, TR_CANPUT_OUT, "canput:%p %d", q, 1);
2525 		return (1);
2526 	}
2527 	mutex_enter(QLOCK(q));
2528 	if (q->q_flag & QFULL) {
2529 		q->q_flag |= QWANTW;
2530 		mutex_exit(QLOCK(q));
2531 		TRACE_2(TR_FAC_STREAMS_FR, TR_CANPUT_OUT, "canput:%p %d", q, 0);
2532 		return (0);
2533 	}
2534 	mutex_exit(QLOCK(q));
2535 	TRACE_2(TR_FAC_STREAMS_FR, TR_CANPUT_OUT, "canput:%p %d", q, 1);
2536 	return (1);
2537 }
2538 
2539 /*
2540  * This is the new canput for use with priority bands.  Return 1 if the
2541  * band is not full.  If the band is full, return 0 (may not put message)
2542  * and set QWANTW(QB_WANTW) flag for zero(non-zero) band (caller wants to
2543  * write to the queue).
2544  */
2545 int
bcanput(queue_t * q,unsigned char pri)2546 bcanput(queue_t *q, unsigned char pri)
2547 {
2548 	qband_t *qbp;
2549 
2550 	TRACE_2(TR_FAC_STREAMS_FR, TR_BCANPUT_IN, "bcanput:%p %p", q, pri);
2551 	if (!q)
2552 		return (0);
2553 
2554 	/* Find next forward module that has a service procedure */
2555 	q = q->q_nfsrv;
2556 
2557 	mutex_enter(QLOCK(q));
2558 	if (pri == 0) {
2559 		if (q->q_flag & QFULL) {
2560 			q->q_flag |= QWANTW;
2561 			mutex_exit(QLOCK(q));
2562 			TRACE_3(TR_FAC_STREAMS_FR, TR_BCANPUT_OUT,
2563 			    "bcanput:%p %X %d", q, pri, 0);
2564 			return (0);
2565 		}
2566 	} else {	/* pri != 0 */
2567 		if (pri > q->q_nband) {
2568 			/*
2569 			 * No band exists yet, so return success.
2570 			 */
2571 			mutex_exit(QLOCK(q));
2572 			TRACE_3(TR_FAC_STREAMS_FR, TR_BCANPUT_OUT,
2573 			    "bcanput:%p %X %d", q, pri, 1);
2574 			return (1);
2575 		}
2576 		qbp = q->q_bandp;
2577 		while (--pri)
2578 			qbp = qbp->qb_next;
2579 		if (qbp->qb_flag & QB_FULL) {
2580 			qbp->qb_flag |= QB_WANTW;
2581 			mutex_exit(QLOCK(q));
2582 			TRACE_3(TR_FAC_STREAMS_FR, TR_BCANPUT_OUT,
2583 			    "bcanput:%p %X %d", q, pri, 0);
2584 			return (0);
2585 		}
2586 	}
2587 	mutex_exit(QLOCK(q));
2588 	TRACE_3(TR_FAC_STREAMS_FR, TR_BCANPUT_OUT,
2589 	    "bcanput:%p %X %d", q, pri, 1);
2590 	return (1);
2591 }
2592 
2593 /*
2594  * Put a message on a queue.
2595  *
2596  * Messages are enqueued on a priority basis.  The priority classes
2597  * are HIGH PRIORITY (type >= QPCTL), PRIORITY (type < QPCTL && band > 0),
2598  * and B_NORMAL (type < QPCTL && band == 0).
2599  *
2600  * Add appropriate weighted data block sizes to queue count.
2601  * If queue hits high water mark then set QFULL flag.
2602  *
2603  * If QNOENAB is not set (putq is allowed to enable the queue),
2604  * enable the queue only if the message is PRIORITY,
2605  * or the QWANTR flag is set (indicating that the service procedure
2606  * is ready to read the queue.  This implies that a service
2607  * procedure must NEVER put a high priority message back on its own
2608  * queue, as this would result in an infinite loop (!).
2609  */
2610 int
putq(queue_t * q,mblk_t * bp)2611 putq(queue_t *q, mblk_t *bp)
2612 {
2613 	mblk_t *tmp;
2614 	qband_t *qbp = NULL;
2615 	int mcls = (int)queclass(bp);
2616 	kthread_id_t freezer;
2617 	int	bytecnt = 0, mblkcnt = 0;
2618 
2619 	freezer = STREAM(q)->sd_freezer;
2620 	if (freezer == curthread) {
2621 		ASSERT(frozenstr(q));
2622 		ASSERT(MUTEX_HELD(QLOCK(q)));
2623 	} else
2624 		mutex_enter(QLOCK(q));
2625 
2626 	/*
2627 	 * Make sanity checks and if qband structure is not yet
2628 	 * allocated, do so.
2629 	 */
2630 	if (mcls == QPCTL) {
2631 		if (bp->b_band != 0)
2632 			bp->b_band = 0;		/* force to be correct */
2633 	} else if (bp->b_band != 0) {
2634 		int i;
2635 		qband_t **qbpp;
2636 
2637 		if (bp->b_band > q->q_nband) {
2638 
2639 			/*
2640 			 * The qband structure for this priority band is
2641 			 * not on the queue yet, so we have to allocate
2642 			 * one on the fly.  It would be wasteful to
2643 			 * associate the qband structures with every
2644 			 * queue when the queues are allocated.  This is
2645 			 * because most queues will only need the normal
2646 			 * band of flow which can be described entirely
2647 			 * by the queue itself.
2648 			 */
2649 			qbpp = &q->q_bandp;
2650 			while (*qbpp)
2651 				qbpp = &(*qbpp)->qb_next;
2652 			while (bp->b_band > q->q_nband) {
2653 				if ((*qbpp = allocband()) == NULL) {
2654 					if (freezer != curthread)
2655 						mutex_exit(QLOCK(q));
2656 					return (0);
2657 				}
2658 				(*qbpp)->qb_hiwat = q->q_hiwat;
2659 				(*qbpp)->qb_lowat = q->q_lowat;
2660 				q->q_nband++;
2661 				qbpp = &(*qbpp)->qb_next;
2662 			}
2663 		}
2664 		ASSERT(MUTEX_HELD(QLOCK(q)));
2665 		qbp = q->q_bandp;
2666 		i = bp->b_band;
2667 		while (--i)
2668 			qbp = qbp->qb_next;
2669 	}
2670 
2671 	/*
2672 	 * If queue is empty, add the message and initialize the pointers.
2673 	 * Otherwise, adjust message pointers and queue pointers based on
2674 	 * the type of the message and where it belongs on the queue.  Some
2675 	 * code is duplicated to minimize the number of conditionals and
2676 	 * hopefully minimize the amount of time this routine takes.
2677 	 */
2678 	if (!q->q_first) {
2679 		bp->b_next = NULL;
2680 		bp->b_prev = NULL;
2681 		q->q_first = bp;
2682 		q->q_last = bp;
2683 		if (qbp) {
2684 			qbp->qb_first = bp;
2685 			qbp->qb_last = bp;
2686 		}
2687 	} else if (!qbp) {	/* bp->b_band == 0 */
2688 
2689 		/*
2690 		 * If queue class of message is less than or equal to
2691 		 * that of the last one on the queue, tack on to the end.
2692 		 */
2693 		tmp = q->q_last;
2694 		if (mcls <= (int)queclass(tmp)) {
2695 			bp->b_next = NULL;
2696 			bp->b_prev = tmp;
2697 			tmp->b_next = bp;
2698 			q->q_last = bp;
2699 		} else {
2700 			tmp = q->q_first;
2701 			while ((int)queclass(tmp) >= mcls)
2702 				tmp = tmp->b_next;
2703 
2704 			/*
2705 			 * Insert bp before tmp.
2706 			 */
2707 			bp->b_next = tmp;
2708 			bp->b_prev = tmp->b_prev;
2709 			if (tmp->b_prev)
2710 				tmp->b_prev->b_next = bp;
2711 			else
2712 				q->q_first = bp;
2713 			tmp->b_prev = bp;
2714 		}
2715 	} else {		/* bp->b_band != 0 */
2716 		if (qbp->qb_first) {
2717 			tmp = qbp->qb_last;
2718 
2719 			/*
2720 			 * Insert bp after the last message in this band.
2721 			 */
2722 			bp->b_next = tmp->b_next;
2723 			if (tmp->b_next)
2724 				tmp->b_next->b_prev = bp;
2725 			else
2726 				q->q_last = bp;
2727 			bp->b_prev = tmp;
2728 			tmp->b_next = bp;
2729 		} else {
2730 			tmp = q->q_last;
2731 			if ((mcls < (int)queclass(tmp)) ||
2732 			    (bp->b_band <= tmp->b_band)) {
2733 
2734 				/*
2735 				 * Tack bp on end of queue.
2736 				 */
2737 				bp->b_next = NULL;
2738 				bp->b_prev = tmp;
2739 				tmp->b_next = bp;
2740 				q->q_last = bp;
2741 			} else {
2742 				tmp = q->q_first;
2743 				while (tmp->b_datap->db_type >= QPCTL)
2744 					tmp = tmp->b_next;
2745 				while (tmp->b_band >= bp->b_band)
2746 					tmp = tmp->b_next;
2747 
2748 				/*
2749 				 * Insert bp before tmp.
2750 				 */
2751 				bp->b_next = tmp;
2752 				bp->b_prev = tmp->b_prev;
2753 				if (tmp->b_prev)
2754 					tmp->b_prev->b_next = bp;
2755 				else
2756 					q->q_first = bp;
2757 				tmp->b_prev = bp;
2758 			}
2759 			qbp->qb_first = bp;
2760 		}
2761 		qbp->qb_last = bp;
2762 	}
2763 
2764 	/* Get message byte count for q_count accounting */
2765 	bytecnt = mp_cont_len(bp, &mblkcnt);
2766 
2767 	if (qbp) {
2768 		qbp->qb_count += bytecnt;
2769 		qbp->qb_mblkcnt += mblkcnt;
2770 		if ((qbp->qb_count >= qbp->qb_hiwat) ||
2771 		    (qbp->qb_mblkcnt >= qbp->qb_hiwat)) {
2772 			qbp->qb_flag |= QB_FULL;
2773 		}
2774 	} else {
2775 		q->q_count += bytecnt;
2776 		q->q_mblkcnt += mblkcnt;
2777 		if ((q->q_count >= q->q_hiwat) ||
2778 		    (q->q_mblkcnt >= q->q_hiwat)) {
2779 			q->q_flag |= QFULL;
2780 		}
2781 	}
2782 
2783 	STR_FTEVENT_MSG(bp, q, FTEV_PUTQ, 0);
2784 
2785 	if ((mcls > QNORM) ||
2786 	    (canenable(q) && (q->q_flag & QWANTR || bp->b_band)))
2787 		qenable_locked(q);
2788 	ASSERT(MUTEX_HELD(QLOCK(q)));
2789 	if (freezer != curthread)
2790 		mutex_exit(QLOCK(q));
2791 
2792 	return (1);
2793 }
2794 
2795 /*
2796  * Put stuff back at beginning of Q according to priority order.
2797  * See comment on putq above for details.
2798  */
2799 int
putbq(queue_t * q,mblk_t * bp)2800 putbq(queue_t *q, mblk_t *bp)
2801 {
2802 	mblk_t *tmp;
2803 	qband_t *qbp = NULL;
2804 	int mcls = (int)queclass(bp);
2805 	kthread_id_t freezer;
2806 	int	bytecnt = 0, mblkcnt = 0;
2807 
2808 	ASSERT(q && bp);
2809 	ASSERT(bp->b_next == NULL);
2810 	freezer = STREAM(q)->sd_freezer;
2811 	if (freezer == curthread) {
2812 		ASSERT(frozenstr(q));
2813 		ASSERT(MUTEX_HELD(QLOCK(q)));
2814 	} else
2815 		mutex_enter(QLOCK(q));
2816 
2817 	/*
2818 	 * Make sanity checks and if qband structure is not yet
2819 	 * allocated, do so.
2820 	 */
2821 	if (mcls == QPCTL) {
2822 		if (bp->b_band != 0)
2823 			bp->b_band = 0;		/* force to be correct */
2824 	} else if (bp->b_band != 0) {
2825 		int i;
2826 		qband_t **qbpp;
2827 
2828 		if (bp->b_band > q->q_nband) {
2829 			qbpp = &q->q_bandp;
2830 			while (*qbpp)
2831 				qbpp = &(*qbpp)->qb_next;
2832 			while (bp->b_band > q->q_nband) {
2833 				if ((*qbpp = allocband()) == NULL) {
2834 					if (freezer != curthread)
2835 						mutex_exit(QLOCK(q));
2836 					return (0);
2837 				}
2838 				(*qbpp)->qb_hiwat = q->q_hiwat;
2839 				(*qbpp)->qb_lowat = q->q_lowat;
2840 				q->q_nband++;
2841 				qbpp = &(*qbpp)->qb_next;
2842 			}
2843 		}
2844 		qbp = q->q_bandp;
2845 		i = bp->b_band;
2846 		while (--i)
2847 			qbp = qbp->qb_next;
2848 	}
2849 
2850 	/*
2851 	 * If queue is empty or if message is high priority,
2852 	 * place on the front of the queue.
2853 	 */
2854 	tmp = q->q_first;
2855 	if ((!tmp) || (mcls == QPCTL)) {
2856 		bp->b_next = tmp;
2857 		if (tmp)
2858 			tmp->b_prev = bp;
2859 		else
2860 			q->q_last = bp;
2861 		q->q_first = bp;
2862 		bp->b_prev = NULL;
2863 		if (qbp) {
2864 			qbp->qb_first = bp;
2865 			qbp->qb_last = bp;
2866 		}
2867 	} else if (qbp) {	/* bp->b_band != 0 */
2868 		tmp = qbp->qb_first;
2869 		if (tmp) {
2870 
2871 			/*
2872 			 * Insert bp before the first message in this band.
2873 			 */
2874 			bp->b_next = tmp;
2875 			bp->b_prev = tmp->b_prev;
2876 			if (tmp->b_prev)
2877 				tmp->b_prev->b_next = bp;
2878 			else
2879 				q->q_first = bp;
2880 			tmp->b_prev = bp;
2881 		} else {
2882 			tmp = q->q_last;
2883 			if ((mcls < (int)queclass(tmp)) ||
2884 			    (bp->b_band < tmp->b_band)) {
2885 
2886 				/*
2887 				 * Tack bp on end of queue.
2888 				 */
2889 				bp->b_next = NULL;
2890 				bp->b_prev = tmp;
2891 				tmp->b_next = bp;
2892 				q->q_last = bp;
2893 			} else {
2894 				tmp = q->q_first;
2895 				while (tmp->b_datap->db_type >= QPCTL)
2896 					tmp = tmp->b_next;
2897 				while (tmp->b_band > bp->b_band)
2898 					tmp = tmp->b_next;
2899 
2900 				/*
2901 				 * Insert bp before tmp.
2902 				 */
2903 				bp->b_next = tmp;
2904 				bp->b_prev = tmp->b_prev;
2905 				if (tmp->b_prev)
2906 					tmp->b_prev->b_next = bp;
2907 				else
2908 					q->q_first = bp;
2909 				tmp->b_prev = bp;
2910 			}
2911 			qbp->qb_last = bp;
2912 		}
2913 		qbp->qb_first = bp;
2914 	} else {		/* bp->b_band == 0 && !QPCTL */
2915 
2916 		/*
2917 		 * If the queue class or band is less than that of the last
2918 		 * message on the queue, tack bp on the end of the queue.
2919 		 */
2920 		tmp = q->q_last;
2921 		if ((mcls < (int)queclass(tmp)) || (bp->b_band < tmp->b_band)) {
2922 			bp->b_next = NULL;
2923 			bp->b_prev = tmp;
2924 			tmp->b_next = bp;
2925 			q->q_last = bp;
2926 		} else {
2927 			tmp = q->q_first;
2928 			while (tmp->b_datap->db_type >= QPCTL)
2929 				tmp = tmp->b_next;
2930 			while (tmp->b_band > bp->b_band)
2931 				tmp = tmp->b_next;
2932 
2933 			/*
2934 			 * Insert bp before tmp.
2935 			 */
2936 			bp->b_next = tmp;
2937 			bp->b_prev = tmp->b_prev;
2938 			if (tmp->b_prev)
2939 				tmp->b_prev->b_next = bp;
2940 			else
2941 				q->q_first = bp;
2942 			tmp->b_prev = bp;
2943 		}
2944 	}
2945 
2946 	/* Get message byte count for q_count accounting */
2947 	bytecnt = mp_cont_len(bp, &mblkcnt);
2948 
2949 	if (qbp) {
2950 		qbp->qb_count += bytecnt;
2951 		qbp->qb_mblkcnt += mblkcnt;
2952 		if ((qbp->qb_count >= qbp->qb_hiwat) ||
2953 		    (qbp->qb_mblkcnt >= qbp->qb_hiwat)) {
2954 			qbp->qb_flag |= QB_FULL;
2955 		}
2956 	} else {
2957 		q->q_count += bytecnt;
2958 		q->q_mblkcnt += mblkcnt;
2959 		if ((q->q_count >= q->q_hiwat) ||
2960 		    (q->q_mblkcnt >= q->q_hiwat)) {
2961 			q->q_flag |= QFULL;
2962 		}
2963 	}
2964 
2965 	STR_FTEVENT_MSG(bp, q, FTEV_PUTBQ, 0);
2966 
2967 	if ((mcls > QNORM) || (canenable(q) && (q->q_flag & QWANTR)))
2968 		qenable_locked(q);
2969 	ASSERT(MUTEX_HELD(QLOCK(q)));
2970 	if (freezer != curthread)
2971 		mutex_exit(QLOCK(q));
2972 
2973 	return (1);
2974 }
2975 
2976 /*
2977  * Insert a message before an existing message on the queue.  If the
2978  * existing message is NULL, the new messages is placed on the end of
2979  * the queue.  The queue class of the new message is ignored.  However,
2980  * the priority band of the new message must adhere to the following
2981  * ordering:
2982  *
2983  *	emp->b_prev->b_band >= mp->b_band >= emp->b_band.
2984  *
2985  * All flow control parameters are updated.
2986  *
2987  * insq can be called with the stream frozen, but other utility functions
2988  * holding QLOCK, and by streams modules without any locks/frozen.
2989  */
2990 int
insq(queue_t * q,mblk_t * emp,mblk_t * mp)2991 insq(queue_t *q, mblk_t *emp, mblk_t *mp)
2992 {
2993 	mblk_t *tmp;
2994 	qband_t *qbp = NULL;
2995 	int mcls = (int)queclass(mp);
2996 	kthread_id_t freezer;
2997 	int	bytecnt = 0, mblkcnt = 0;
2998 
2999 	freezer = STREAM(q)->sd_freezer;
3000 	if (freezer == curthread) {
3001 		ASSERT(frozenstr(q));
3002 		ASSERT(MUTEX_HELD(QLOCK(q)));
3003 	} else if (MUTEX_HELD(QLOCK(q))) {
3004 		/* Don't drop lock on exit */
3005 		freezer = curthread;
3006 	} else
3007 		mutex_enter(QLOCK(q));
3008 
3009 	if (mcls == QPCTL) {
3010 		if (mp->b_band != 0)
3011 			mp->b_band = 0;		/* force to be correct */
3012 		if (emp && emp->b_prev &&
3013 		    (emp->b_prev->b_datap->db_type < QPCTL))
3014 			goto badord;
3015 	}
3016 	if (emp) {
3017 		if (((mcls == QNORM) && (mp->b_band < emp->b_band)) ||
3018 		    (emp->b_prev && (emp->b_prev->b_datap->db_type < QPCTL) &&
3019 		    (emp->b_prev->b_band < mp->b_band))) {
3020 			goto badord;
3021 		}
3022 	} else {
3023 		tmp = q->q_last;
3024 		if (tmp && (mcls == QNORM) && (mp->b_band > tmp->b_band)) {
3025 badord:
3026 			cmn_err(CE_WARN,
3027 			    "insq: attempt to insert message out of order "
3028 			    "on q %p", (void *)q);
3029 			if (freezer != curthread)
3030 				mutex_exit(QLOCK(q));
3031 			return (0);
3032 		}
3033 	}
3034 
3035 	if (mp->b_band != 0) {
3036 		int i;
3037 		qband_t **qbpp;
3038 
3039 		if (mp->b_band > q->q_nband) {
3040 			qbpp = &q->q_bandp;
3041 			while (*qbpp)
3042 				qbpp = &(*qbpp)->qb_next;
3043 			while (mp->b_band > q->q_nband) {
3044 				if ((*qbpp = allocband()) == NULL) {
3045 					if (freezer != curthread)
3046 						mutex_exit(QLOCK(q));
3047 					return (0);
3048 				}
3049 				(*qbpp)->qb_hiwat = q->q_hiwat;
3050 				(*qbpp)->qb_lowat = q->q_lowat;
3051 				q->q_nband++;
3052 				qbpp = &(*qbpp)->qb_next;
3053 			}
3054 		}
3055 		qbp = q->q_bandp;
3056 		i = mp->b_band;
3057 		while (--i)
3058 			qbp = qbp->qb_next;
3059 	}
3060 
3061 	if ((mp->b_next = emp) != NULL) {
3062 		if ((mp->b_prev = emp->b_prev) != NULL)
3063 			emp->b_prev->b_next = mp;
3064 		else
3065 			q->q_first = mp;
3066 		emp->b_prev = mp;
3067 	} else {
3068 		if ((mp->b_prev = q->q_last) != NULL)
3069 			q->q_last->b_next = mp;
3070 		else
3071 			q->q_first = mp;
3072 		q->q_last = mp;
3073 	}
3074 
3075 	/* Get mblk and byte count for q_count accounting */
3076 	bytecnt = mp_cont_len(mp, &mblkcnt);
3077 
3078 	if (qbp) {	/* adjust qband pointers and count */
3079 		if (!qbp->qb_first) {
3080 			qbp->qb_first = mp;
3081 			qbp->qb_last = mp;
3082 		} else {
3083 			if (mp->b_prev == NULL || (mp->b_prev != NULL &&
3084 			    (mp->b_prev->b_band != mp->b_band)))
3085 				qbp->qb_first = mp;
3086 			else if (mp->b_next == NULL || (mp->b_next != NULL &&
3087 			    (mp->b_next->b_band != mp->b_band)))
3088 				qbp->qb_last = mp;
3089 		}
3090 		qbp->qb_count += bytecnt;
3091 		qbp->qb_mblkcnt += mblkcnt;
3092 		if ((qbp->qb_count >= qbp->qb_hiwat) ||
3093 		    (qbp->qb_mblkcnt >= qbp->qb_hiwat)) {
3094 			qbp->qb_flag |= QB_FULL;
3095 		}
3096 	} else {
3097 		q->q_count += bytecnt;
3098 		q->q_mblkcnt += mblkcnt;
3099 		if ((q->q_count >= q->q_hiwat) ||
3100 		    (q->q_mblkcnt >= q->q_hiwat)) {
3101 			q->q_flag |= QFULL;
3102 		}
3103 	}
3104 
3105 	STR_FTEVENT_MSG(mp, q, FTEV_INSQ, 0);
3106 
3107 	if (canenable(q) && (q->q_flag & QWANTR))
3108 		qenable_locked(q);
3109 
3110 	ASSERT(MUTEX_HELD(QLOCK(q)));
3111 	if (freezer != curthread)
3112 		mutex_exit(QLOCK(q));
3113 
3114 	return (1);
3115 }
3116 
3117 /*
3118  * Create and put a control message on queue.
3119  */
3120 int
putctl(queue_t * q,int type)3121 putctl(queue_t *q, int type)
3122 {
3123 	mblk_t *bp;
3124 
3125 	if ((datamsg(type) && (type != M_DELAY)) ||
3126 	    (bp = allocb_tryhard(0)) == NULL)
3127 		return (0);
3128 	bp->b_datap->db_type = (unsigned char) type;
3129 
3130 	put(q, bp);
3131 
3132 	return (1);
3133 }
3134 
3135 /*
3136  * Control message with a single-byte parameter
3137  */
3138 int
putctl1(queue_t * q,int type,int param)3139 putctl1(queue_t *q, int type, int param)
3140 {
3141 	mblk_t *bp;
3142 
3143 	if ((datamsg(type) && (type != M_DELAY)) ||
3144 	    (bp = allocb_tryhard(1)) == NULL)
3145 		return (0);
3146 	bp->b_datap->db_type = (unsigned char)type;
3147 	*bp->b_wptr++ = (unsigned char)param;
3148 
3149 	put(q, bp);
3150 
3151 	return (1);
3152 }
3153 
3154 int
putnextctl1(queue_t * q,int type,int param)3155 putnextctl1(queue_t *q, int type, int param)
3156 {
3157 	mblk_t *bp;
3158 
3159 	if ((datamsg(type) && (type != M_DELAY)) ||
3160 	    ((bp = allocb_tryhard(1)) == NULL))
3161 		return (0);
3162 
3163 	bp->b_datap->db_type = (unsigned char)type;
3164 	*bp->b_wptr++ = (unsigned char)param;
3165 
3166 	putnext(q, bp);
3167 
3168 	return (1);
3169 }
3170 
3171 int
putnextctl(queue_t * q,int type)3172 putnextctl(queue_t *q, int type)
3173 {
3174 	mblk_t *bp;
3175 
3176 	if ((datamsg(type) && (type != M_DELAY)) ||
3177 	    ((bp = allocb_tryhard(0)) == NULL))
3178 		return (0);
3179 	bp->b_datap->db_type = (unsigned char)type;
3180 
3181 	putnext(q, bp);
3182 
3183 	return (1);
3184 }
3185 
3186 /*
3187  * Return the queue upstream from this one
3188  */
3189 queue_t *
backq(queue_t * q)3190 backq(queue_t *q)
3191 {
3192 	q = _OTHERQ(q);
3193 	if (q->q_next) {
3194 		q = q->q_next;
3195 		return (_OTHERQ(q));
3196 	}
3197 	return (NULL);
3198 }
3199 
3200 /*
3201  * Send a block back up the queue in reverse from this
3202  * one (e.g. to respond to ioctls)
3203  */
3204 void
qreply(queue_t * q,mblk_t * bp)3205 qreply(queue_t *q, mblk_t *bp)
3206 {
3207 	ASSERT(q && bp);
3208 
3209 	putnext(_OTHERQ(q), bp);
3210 }
3211 
3212 /*
3213  * Streams Queue Scheduling
3214  *
3215  * Queues are enabled through qenable() when they have messages to
3216  * process.  They are serviced by queuerun(), which runs each enabled
3217  * queue's service procedure.  The call to queuerun() is processor
3218  * dependent - the general principle is that it be run whenever a queue
3219  * is enabled but before returning to user level.  For system calls,
3220  * the function runqueues() is called if their action causes a queue
3221  * to be enabled.  For device interrupts, queuerun() should be
3222  * called before returning from the last level of interrupt.  Beyond
3223  * this, no timing assumptions should be made about queue scheduling.
3224  */
3225 
3226 /*
3227  * Enable a queue: put it on list of those whose service procedures are
3228  * ready to run and set up the scheduling mechanism.
3229  * The broadcast is done outside the mutex -> to avoid the woken thread
3230  * from contending with the mutex. This is OK 'cos the queue has been
3231  * enqueued on the runlist and flagged safely at this point.
3232  */
3233 void
qenable(queue_t * q)3234 qenable(queue_t *q)
3235 {
3236 	mutex_enter(QLOCK(q));
3237 	qenable_locked(q);
3238 	mutex_exit(QLOCK(q));
3239 }
3240 /*
3241  * Return number of messages on queue
3242  */
3243 int
qsize(queue_t * qp)3244 qsize(queue_t *qp)
3245 {
3246 	int count = 0;
3247 	mblk_t *mp;
3248 
3249 	mutex_enter(QLOCK(qp));
3250 	for (mp = qp->q_first; mp; mp = mp->b_next)
3251 		count++;
3252 	mutex_exit(QLOCK(qp));
3253 	return (count);
3254 }
3255 
3256 /*
3257  * noenable - set queue so that putq() will not enable it.
3258  * enableok - set queue so that putq() can enable it.
3259  */
3260 void
noenable(queue_t * q)3261 noenable(queue_t *q)
3262 {
3263 	mutex_enter(QLOCK(q));
3264 	q->q_flag |= QNOENB;
3265 	mutex_exit(QLOCK(q));
3266 }
3267 
3268 void
enableok(queue_t * q)3269 enableok(queue_t *q)
3270 {
3271 	mutex_enter(QLOCK(q));
3272 	q->q_flag &= ~QNOENB;
3273 	mutex_exit(QLOCK(q));
3274 }
3275 
3276 /*
3277  * Set queue fields.
3278  */
3279 int
strqset(queue_t * q,qfields_t what,unsigned char pri,intptr_t val)3280 strqset(queue_t *q, qfields_t what, unsigned char pri, intptr_t val)
3281 {
3282 	qband_t *qbp = NULL;
3283 	queue_t	*wrq;
3284 	int error = 0;
3285 	kthread_id_t freezer;
3286 
3287 	freezer = STREAM(q)->sd_freezer;
3288 	if (freezer == curthread) {
3289 		ASSERT(frozenstr(q));
3290 		ASSERT(MUTEX_HELD(QLOCK(q)));
3291 	} else
3292 		mutex_enter(QLOCK(q));
3293 
3294 	if (what >= QBAD) {
3295 		error = EINVAL;
3296 		goto done;
3297 	}
3298 	if (pri != 0) {
3299 		int i;
3300 		qband_t **qbpp;
3301 
3302 		if (pri > q->q_nband) {
3303 			qbpp = &q->q_bandp;
3304 			while (*qbpp)
3305 				qbpp = &(*qbpp)->qb_next;
3306 			while (pri > q->q_nband) {
3307 				if ((*qbpp = allocband()) == NULL) {
3308 					error = EAGAIN;
3309 					goto done;
3310 				}
3311 				(*qbpp)->qb_hiwat = q->q_hiwat;
3312 				(*qbpp)->qb_lowat = q->q_lowat;
3313 				q->q_nband++;
3314 				qbpp = &(*qbpp)->qb_next;
3315 			}
3316 		}
3317 		qbp = q->q_bandp;
3318 		i = pri;
3319 		while (--i)
3320 			qbp = qbp->qb_next;
3321 	}
3322 	switch (what) {
3323 
3324 	case QHIWAT:
3325 		if (qbp)
3326 			qbp->qb_hiwat = (size_t)val;
3327 		else
3328 			q->q_hiwat = (size_t)val;
3329 		break;
3330 
3331 	case QLOWAT:
3332 		if (qbp)
3333 			qbp->qb_lowat = (size_t)val;
3334 		else
3335 			q->q_lowat = (size_t)val;
3336 		break;
3337 
3338 	case QMAXPSZ:
3339 		if (qbp)
3340 			error = EINVAL;
3341 		else
3342 			q->q_maxpsz = (ssize_t)val;
3343 
3344 		/*
3345 		 * Performance concern, strwrite looks at the module below
3346 		 * the stream head for the maxpsz each time it does a write
3347 		 * we now cache it at the stream head.  Check to see if this
3348 		 * queue is sitting directly below the stream head.
3349 		 */
3350 		wrq = STREAM(q)->sd_wrq;
3351 		if (q != wrq->q_next)
3352 			break;
3353 
3354 		/*
3355 		 * If the stream is not frozen drop the current QLOCK and
3356 		 * acquire the sd_wrq QLOCK which protects sd_qn_*
3357 		 */
3358 		if (freezer != curthread) {
3359 			mutex_exit(QLOCK(q));
3360 			mutex_enter(QLOCK(wrq));
3361 		}
3362 		ASSERT(MUTEX_HELD(QLOCK(wrq)));
3363 
3364 		if (strmsgsz != 0) {
3365 			if (val == INFPSZ)
3366 				val = strmsgsz;
3367 			else  {
3368 				if (STREAM(q)->sd_vnode->v_type == VFIFO)
3369 					val = MIN(PIPE_BUF, val);
3370 				else
3371 					val = MIN(strmsgsz, val);
3372 			}
3373 		}
3374 		STREAM(q)->sd_qn_maxpsz = val;
3375 		if (freezer != curthread) {
3376 			mutex_exit(QLOCK(wrq));
3377 			mutex_enter(QLOCK(q));
3378 		}
3379 		break;
3380 
3381 	case QMINPSZ:
3382 		if (qbp)
3383 			error = EINVAL;
3384 		else
3385 			q->q_minpsz = (ssize_t)val;
3386 
3387 		/*
3388 		 * Performance concern, strwrite looks at the module below
3389 		 * the stream head for the maxpsz each time it does a write
3390 		 * we now cache it at the stream head.  Check to see if this
3391 		 * queue is sitting directly below the stream head.
3392 		 */
3393 		wrq = STREAM(q)->sd_wrq;
3394 		if (q != wrq->q_next)
3395 			break;
3396 
3397 		/*
3398 		 * If the stream is not frozen drop the current QLOCK and
3399 		 * acquire the sd_wrq QLOCK which protects sd_qn_*
3400 		 */
3401 		if (freezer != curthread) {
3402 			mutex_exit(QLOCK(q));
3403 			mutex_enter(QLOCK(wrq));
3404 		}
3405 		STREAM(q)->sd_qn_minpsz = (ssize_t)val;
3406 
3407 		if (freezer != curthread) {
3408 			mutex_exit(QLOCK(wrq));
3409 			mutex_enter(QLOCK(q));
3410 		}
3411 		break;
3412 
3413 	case QSTRUIOT:
3414 		if (qbp)
3415 			error = EINVAL;
3416 		else
3417 			q->q_struiot = (ushort_t)val;
3418 		break;
3419 
3420 	case QCOUNT:
3421 	case QFIRST:
3422 	case QLAST:
3423 	case QFLAG:
3424 		error = EPERM;
3425 		break;
3426 
3427 	default:
3428 		error = EINVAL;
3429 		break;
3430 	}
3431 done:
3432 	if (freezer != curthread)
3433 		mutex_exit(QLOCK(q));
3434 	return (error);
3435 }
3436 
3437 /*
3438  * Get queue fields.
3439  */
3440 int
strqget(queue_t * q,qfields_t what,unsigned char pri,void * valp)3441 strqget(queue_t *q, qfields_t what, unsigned char pri, void *valp)
3442 {
3443 	qband_t		*qbp = NULL;
3444 	int		error = 0;
3445 	kthread_id_t	freezer;
3446 
3447 	freezer = STREAM(q)->sd_freezer;
3448 	if (freezer == curthread) {
3449 		ASSERT(frozenstr(q));
3450 		ASSERT(MUTEX_HELD(QLOCK(q)));
3451 	} else
3452 		mutex_enter(QLOCK(q));
3453 	if (what >= QBAD) {
3454 		error = EINVAL;
3455 		goto done;
3456 	}
3457 	if (pri != 0) {
3458 		int i;
3459 		qband_t **qbpp;
3460 
3461 		if (pri > q->q_nband) {
3462 			qbpp = &q->q_bandp;
3463 			while (*qbpp)
3464 				qbpp = &(*qbpp)->qb_next;
3465 			while (pri > q->q_nband) {
3466 				if ((*qbpp = allocband()) == NULL) {
3467 					error = EAGAIN;
3468 					goto done;
3469 				}
3470 				(*qbpp)->qb_hiwat = q->q_hiwat;
3471 				(*qbpp)->qb_lowat = q->q_lowat;
3472 				q->q_nband++;
3473 				qbpp = &(*qbpp)->qb_next;
3474 			}
3475 		}
3476 		qbp = q->q_bandp;
3477 		i = pri;
3478 		while (--i)
3479 			qbp = qbp->qb_next;
3480 	}
3481 	switch (what) {
3482 	case QHIWAT:
3483 		if (qbp)
3484 			*(size_t *)valp = qbp->qb_hiwat;
3485 		else
3486 			*(size_t *)valp = q->q_hiwat;
3487 		break;
3488 
3489 	case QLOWAT:
3490 		if (qbp)
3491 			*(size_t *)valp = qbp->qb_lowat;
3492 		else
3493 			*(size_t *)valp = q->q_lowat;
3494 		break;
3495 
3496 	case QMAXPSZ:
3497 		if (qbp)
3498 			error = EINVAL;
3499 		else
3500 			*(ssize_t *)valp = q->q_maxpsz;
3501 		break;
3502 
3503 	case QMINPSZ:
3504 		if (qbp)
3505 			error = EINVAL;
3506 		else
3507 			*(ssize_t *)valp = q->q_minpsz;
3508 		break;
3509 
3510 	case QCOUNT:
3511 		if (qbp)
3512 			*(size_t *)valp = qbp->qb_count;
3513 		else
3514 			*(size_t *)valp = q->q_count;
3515 		break;
3516 
3517 	case QFIRST:
3518 		if (qbp)
3519 			*(mblk_t **)valp = qbp->qb_first;
3520 		else
3521 			*(mblk_t **)valp = q->q_first;
3522 		break;
3523 
3524 	case QLAST:
3525 		if (qbp)
3526 			*(mblk_t **)valp = qbp->qb_last;
3527 		else
3528 			*(mblk_t **)valp = q->q_last;
3529 		break;
3530 
3531 	case QFLAG:
3532 		if (qbp)
3533 			*(uint_t *)valp = qbp->qb_flag;
3534 		else
3535 			*(uint_t *)valp = q->q_flag;
3536 		break;
3537 
3538 	case QSTRUIOT:
3539 		if (qbp)
3540 			error = EINVAL;
3541 		else
3542 			*(short *)valp = q->q_struiot;
3543 		break;
3544 
3545 	default:
3546 		error = EINVAL;
3547 		break;
3548 	}
3549 done:
3550 	if (freezer != curthread)
3551 		mutex_exit(QLOCK(q));
3552 	return (error);
3553 }
3554 
3555 /*
3556  * Function awakes all in cvwait/sigwait/pollwait, on one of:
3557  *	QWANTWSYNC or QWANTR or QWANTW,
3558  *
3559  * Note: for QWANTWSYNC/QWANTW and QWANTR, if no WSLEEPer or RSLEEPer then a
3560  *	 deferred wakeup will be done. Also if strpoll() in progress then a
3561  *	 deferred pollwakeup will be done.
3562  */
3563 void
strwakeq(queue_t * q,int flag)3564 strwakeq(queue_t *q, int flag)
3565 {
3566 	stdata_t	*stp = STREAM(q);
3567 	pollhead_t	*pl;
3568 
3569 	mutex_enter(&stp->sd_lock);
3570 	pl = &stp->sd_pollist;
3571 	if (flag & QWANTWSYNC) {
3572 		ASSERT(!(q->q_flag & QREADR));
3573 		if (stp->sd_flag & WSLEEP) {
3574 			stp->sd_flag &= ~WSLEEP;
3575 			cv_broadcast(&stp->sd_wrq->q_wait);
3576 		} else {
3577 			stp->sd_wakeq |= WSLEEP;
3578 		}
3579 
3580 		mutex_exit(&stp->sd_lock);
3581 		pollwakeup(pl, POLLWRNORM);
3582 		mutex_enter(&stp->sd_lock);
3583 
3584 		if (stp->sd_sigflags & S_WRNORM)
3585 			strsendsig(stp->sd_siglist, S_WRNORM, 0, 0);
3586 	} else if (flag & QWANTR) {
3587 		if (stp->sd_flag & RSLEEP) {
3588 			stp->sd_flag &= ~RSLEEP;
3589 			cv_broadcast(&_RD(stp->sd_wrq)->q_wait);
3590 		} else {
3591 			stp->sd_wakeq |= RSLEEP;
3592 		}
3593 
3594 		mutex_exit(&stp->sd_lock);
3595 		pollwakeup(pl, POLLIN | POLLRDNORM);
3596 		mutex_enter(&stp->sd_lock);
3597 
3598 		{
3599 			int events = stp->sd_sigflags & (S_INPUT | S_RDNORM);
3600 
3601 			if (events)
3602 				strsendsig(stp->sd_siglist, events, 0, 0);
3603 		}
3604 	} else {
3605 		if (stp->sd_flag & WSLEEP) {
3606 			stp->sd_flag &= ~WSLEEP;
3607 			cv_broadcast(&stp->sd_wrq->q_wait);
3608 		}
3609 
3610 		mutex_exit(&stp->sd_lock);
3611 		pollwakeup(pl, POLLWRNORM);
3612 		mutex_enter(&stp->sd_lock);
3613 
3614 		if (stp->sd_sigflags & S_WRNORM)
3615 			strsendsig(stp->sd_siglist, S_WRNORM, 0, 0);
3616 	}
3617 	mutex_exit(&stp->sd_lock);
3618 }
3619 
3620 int
struioget(queue_t * q,mblk_t * mp,struiod_t * dp,int noblock)3621 struioget(queue_t *q, mblk_t *mp, struiod_t *dp, int noblock)
3622 {
3623 	stdata_t *stp = STREAM(q);
3624 	int typ  = STRUIOT_STANDARD;
3625 	uio_t	 *uiop = &dp->d_uio;
3626 	dblk_t	 *dbp;
3627 	ssize_t	 uiocnt;
3628 	ssize_t	 cnt;
3629 	unsigned char *ptr;
3630 	ssize_t	 resid;
3631 	int	 error = 0;
3632 	on_trap_data_t otd;
3633 	queue_t	*stwrq;
3634 
3635 	/*
3636 	 * Plumbing may change while taking the type so store the
3637 	 * queue in a temporary variable. It doesn't matter even
3638 	 * if the we take the type from the previous plumbing,
3639 	 * that's because if the plumbing has changed when we were
3640 	 * holding the queue in a temporary variable, we can continue
3641 	 * processing the message the way it would have been processed
3642 	 * in the old plumbing, without any side effects but a bit
3643 	 * extra processing for partial ip header checksum.
3644 	 *
3645 	 * This has been done to avoid holding the sd_lock which is
3646 	 * very hot.
3647 	 */
3648 
3649 	stwrq = stp->sd_struiowrq;
3650 	if (stwrq)
3651 		typ = stwrq->q_struiot;
3652 
3653 	for (; (resid = uiop->uio_resid) > 0 && mp; mp = mp->b_cont) {
3654 		dbp = mp->b_datap;
3655 		ptr = (uchar_t *)(mp->b_rptr + dbp->db_cksumstuff);
3656 		uiocnt = dbp->db_cksumend - dbp->db_cksumstuff;
3657 		cnt = MIN(uiocnt, uiop->uio_resid);
3658 		if (!(dbp->db_struioflag & STRUIO_SPEC) ||
3659 		    (dbp->db_struioflag & STRUIO_DONE) || cnt == 0) {
3660 			/*
3661 			 * Either this mblk has already been processed
3662 			 * or there is no more room in this mblk (?).
3663 			 */
3664 			continue;
3665 		}
3666 		switch (typ) {
3667 		case STRUIOT_STANDARD:
3668 			if (noblock) {
3669 				if (on_trap(&otd, OT_DATA_ACCESS)) {
3670 					no_trap();
3671 					error = EWOULDBLOCK;
3672 					goto out;
3673 				}
3674 			}
3675 			if (error = uiomove(ptr, cnt, UIO_WRITE, uiop)) {
3676 				if (noblock)
3677 					no_trap();
3678 				goto out;
3679 			}
3680 			if (noblock)
3681 				no_trap();
3682 			break;
3683 
3684 		default:
3685 			error = EIO;
3686 			goto out;
3687 		}
3688 		dbp->db_struioflag |= STRUIO_DONE;
3689 		dbp->db_cksumstuff += cnt;
3690 	}
3691 out:
3692 	if (error == EWOULDBLOCK && (resid -= uiop->uio_resid) > 0) {
3693 		/*
3694 		 * A fault has occured and some bytes were moved to the
3695 		 * current mblk, the uio_t has already been updated by
3696 		 * the appropriate uio routine, so also update the mblk
3697 		 * to reflect this in case this same mblk chain is used
3698 		 * again (after the fault has been handled).
3699 		 */
3700 		uiocnt = dbp->db_cksumend - dbp->db_cksumstuff;
3701 		if (uiocnt >= resid)
3702 			dbp->db_cksumstuff += resid;
3703 	}
3704 	return (error);
3705 }
3706 
3707 /*
3708  * Try to enter queue synchronously. Any attempt to enter a closing queue will
3709  * fails. The qp->q_rwcnt keeps track of the number of successful entries so
3710  * that removeq() will not try to close the queue while a thread is inside the
3711  * queue.
3712  */
3713 static boolean_t
rwnext_enter(queue_t * qp)3714 rwnext_enter(queue_t *qp)
3715 {
3716 	mutex_enter(QLOCK(qp));
3717 	if (qp->q_flag & QWCLOSE) {
3718 		mutex_exit(QLOCK(qp));
3719 		return (B_FALSE);
3720 	}
3721 	qp->q_rwcnt++;
3722 	ASSERT(qp->q_rwcnt != 0);
3723 	mutex_exit(QLOCK(qp));
3724 	return (B_TRUE);
3725 }
3726 
3727 /*
3728  * Decrease the count of threads running in sync stream queue and wake up any
3729  * threads blocked in removeq().
3730  */
3731 static void
rwnext_exit(queue_t * qp)3732 rwnext_exit(queue_t *qp)
3733 {
3734 	mutex_enter(QLOCK(qp));
3735 	qp->q_rwcnt--;
3736 	if (qp->q_flag & QWANTRMQSYNC) {
3737 		qp->q_flag &= ~QWANTRMQSYNC;
3738 		cv_broadcast(&qp->q_wait);
3739 	}
3740 	mutex_exit(QLOCK(qp));
3741 }
3742 
3743 /*
3744  * The purpose of rwnext() is to call the rw procedure of the next
3745  * (downstream) modules queue.
3746  *
3747  * treated as put entrypoint for perimeter syncronization.
3748  *
3749  * There's no need to grab sq_putlocks here (which only exist for CIPUT
3750  * sync queues). If it is CIPUT sync queue sq_count is incremented and it does
3751  * not matter if any regular put entrypoints have been already entered. We
3752  * can't increment one of the sq_putcounts (instead of sq_count) because
3753  * qwait_rw won't know which counter to decrement.
3754  *
3755  * It would be reasonable to add the lockless FASTPUT logic.
3756  */
3757 int
rwnext(queue_t * qp,struiod_t * dp)3758 rwnext(queue_t *qp, struiod_t *dp)
3759 {
3760 	queue_t		*nqp;
3761 	syncq_t		*sq;
3762 	uint16_t	count;
3763 	uint16_t	flags;
3764 	struct qinit	*qi;
3765 	int		(*proc)();
3766 	struct stdata	*stp;
3767 	int		isread;
3768 	int		rval;
3769 
3770 	stp = STREAM(qp);
3771 	/*
3772 	 * Prevent q_next from changing by holding sd_lock until acquiring
3773 	 * SQLOCK. Note that a read-side rwnext from the streamhead will
3774 	 * already have sd_lock acquired. In either case sd_lock is always
3775 	 * released after acquiring SQLOCK.
3776 	 *
3777 	 * The streamhead read-side holding sd_lock when calling rwnext is
3778 	 * required to prevent a race condition were M_DATA mblks flowing
3779 	 * up the read-side of the stream could be bypassed by a rwnext()
3780 	 * down-call. In this case sd_lock acts as the streamhead perimeter.
3781 	 */
3782 	if ((nqp = _WR(qp)) == qp) {
3783 		isread = 0;
3784 		mutex_enter(&stp->sd_lock);
3785 		qp = nqp->q_next;
3786 	} else {
3787 		isread = 1;
3788 		if (nqp != stp->sd_wrq)
3789 			/* Not streamhead */
3790 			mutex_enter(&stp->sd_lock);
3791 		qp = _RD(nqp->q_next);
3792 	}
3793 	qi = qp->q_qinfo;
3794 	if (qp->q_struiot == STRUIOT_NONE || ! (proc = qi->qi_rwp)) {
3795 		/*
3796 		 * Not a synchronous module or no r/w procedure for this
3797 		 * queue, so just return EINVAL and let the caller handle it.
3798 		 */
3799 		mutex_exit(&stp->sd_lock);
3800 		return (EINVAL);
3801 	}
3802 
3803 	if (rwnext_enter(qp) == B_FALSE) {
3804 		mutex_exit(&stp->sd_lock);
3805 		return (EINVAL);
3806 	}
3807 
3808 	sq = qp->q_syncq;
3809 	mutex_enter(SQLOCK(sq));
3810 	mutex_exit(&stp->sd_lock);
3811 	count = sq->sq_count;
3812 	flags = sq->sq_flags;
3813 	ASSERT(sq->sq_ciputctrl == NULL || (flags & SQ_CIPUT));
3814 
3815 	while ((flags & SQ_GOAWAY) || (!(flags & SQ_CIPUT) && count != 0)) {
3816 		/*
3817 		 * if this queue is being closed, return.
3818 		 */
3819 		if (qp->q_flag & QWCLOSE) {
3820 			mutex_exit(SQLOCK(sq));
3821 			rwnext_exit(qp);
3822 			return (EINVAL);
3823 		}
3824 
3825 		/*
3826 		 * Wait until we can enter the inner perimeter.
3827 		 */
3828 		sq->sq_flags = flags | SQ_WANTWAKEUP;
3829 		cv_wait(&sq->sq_wait, SQLOCK(sq));
3830 		count = sq->sq_count;
3831 		flags = sq->sq_flags;
3832 	}
3833 
3834 	if (isread == 0 && stp->sd_struiowrq == NULL ||
3835 	    isread == 1 && stp->sd_struiordq == NULL) {
3836 		/*
3837 		 * Stream plumbing changed while waiting for inner perimeter
3838 		 * so just return EINVAL and let the caller handle it.
3839 		 */
3840 		mutex_exit(SQLOCK(sq));
3841 		rwnext_exit(qp);
3842 		return (EINVAL);
3843 	}
3844 	if (!(flags & SQ_CIPUT))
3845 		sq->sq_flags = flags | SQ_EXCL;
3846 	sq->sq_count = count + 1;
3847 	ASSERT(sq->sq_count != 0);		/* Wraparound */
3848 	/*
3849 	 * Note: The only message ordering guarantee that rwnext() makes is
3850 	 *	 for the write queue flow-control case. All others (r/w queue
3851 	 *	 with q_count > 0 (or q_first != 0)) are the resposibilty of
3852 	 *	 the queue's rw procedure. This could be genralized here buy
3853 	 *	 running the queue's service procedure, but that wouldn't be
3854 	 *	 the most efficent for all cases.
3855 	 */
3856 	mutex_exit(SQLOCK(sq));
3857 	if (! isread && (qp->q_flag & QFULL)) {
3858 		/*
3859 		 * Write queue may be flow controlled. If so,
3860 		 * mark the queue for wakeup when it's not.
3861 		 */
3862 		mutex_enter(QLOCK(qp));
3863 		if (qp->q_flag & QFULL) {
3864 			qp->q_flag |= QWANTWSYNC;
3865 			mutex_exit(QLOCK(qp));
3866 			rval = EWOULDBLOCK;
3867 			goto out;
3868 		}
3869 		mutex_exit(QLOCK(qp));
3870 	}
3871 
3872 	if (! isread && dp->d_mp)
3873 		STR_FTEVENT_MSG(dp->d_mp, nqp, FTEV_RWNEXT, dp->d_mp->b_rptr -
3874 		    dp->d_mp->b_datap->db_base);
3875 
3876 	rval = (*proc)(qp, dp);
3877 
3878 	if (isread && dp->d_mp)
3879 		STR_FTEVENT_MSG(dp->d_mp, _RD(nqp), FTEV_RWNEXT,
3880 		    dp->d_mp->b_rptr - dp->d_mp->b_datap->db_base);
3881 out:
3882 	/*
3883 	 * The queue is protected from being freed by sq_count, so it is
3884 	 * safe to call rwnext_exit and reacquire SQLOCK(sq).
3885 	 */
3886 	rwnext_exit(qp);
3887 
3888 	mutex_enter(SQLOCK(sq));
3889 	flags = sq->sq_flags;
3890 	ASSERT(sq->sq_count != 0);
3891 	sq->sq_count--;
3892 	if (flags & SQ_TAIL) {
3893 		putnext_tail(sq, qp, flags);
3894 		/*
3895 		 * The only purpose of this ASSERT is to preserve calling stack
3896 		 * in DEBUG kernel.
3897 		 */
3898 		ASSERT(flags & SQ_TAIL);
3899 		return (rval);
3900 	}
3901 	ASSERT(flags & (SQ_EXCL|SQ_CIPUT));
3902 	/*
3903 	 * Safe to always drop SQ_EXCL:
3904 	 *	Not SQ_CIPUT means we set SQ_EXCL above
3905 	 *	For SQ_CIPUT SQ_EXCL will only be set if the put procedure
3906 	 *	did a qwriter(INNER) in which case nobody else
3907 	 *	is in the inner perimeter and we are exiting.
3908 	 *
3909 	 * I would like to make the following assertion:
3910 	 *
3911 	 * ASSERT((flags & (SQ_EXCL|SQ_CIPUT)) != (SQ_EXCL|SQ_CIPUT) ||
3912 	 *	sq->sq_count == 0);
3913 	 *
3914 	 * which indicates that if we are both putshared and exclusive,
3915 	 * we became exclusive while executing the putproc, and the only
3916 	 * claim on the syncq was the one we dropped a few lines above.
3917 	 * But other threads that enter putnext while the syncq is exclusive
3918 	 * need to make a claim as they may need to drop SQLOCK in the
3919 	 * has_writers case to avoid deadlocks.  If these threads are
3920 	 * delayed or preempted, it is possible that the writer thread can
3921 	 * find out that there are other claims making the (sq_count == 0)
3922 	 * test invalid.
3923 	 */
3924 
3925 	sq->sq_flags = flags & ~SQ_EXCL;
3926 	if (sq->sq_flags & SQ_WANTWAKEUP) {
3927 		sq->sq_flags &= ~SQ_WANTWAKEUP;
3928 		cv_broadcast(&sq->sq_wait);
3929 	}
3930 	mutex_exit(SQLOCK(sq));
3931 	return (rval);
3932 }
3933 
3934 /*
3935  * The purpose of infonext() is to call the info procedure of the next
3936  * (downstream) modules queue.
3937  *
3938  * treated as put entrypoint for perimeter syncronization.
3939  *
3940  * There's no need to grab sq_putlocks here (which only exist for CIPUT
3941  * sync queues). If it is CIPUT sync queue regular sq_count is incremented and
3942  * it does not matter if any regular put entrypoints have been already
3943  * entered.
3944  */
3945 int
infonext(queue_t * qp,infod_t * idp)3946 infonext(queue_t *qp, infod_t *idp)
3947 {
3948 	queue_t		*nqp;
3949 	syncq_t		*sq;
3950 	uint16_t	count;
3951 	uint16_t	flags;
3952 	struct qinit	*qi;
3953 	int		(*proc)();
3954 	struct stdata	*stp;
3955 	int		rval;
3956 
3957 	stp = STREAM(qp);
3958 	/*
3959 	 * Prevent q_next from changing by holding sd_lock until
3960 	 * acquiring SQLOCK.
3961 	 */
3962 	mutex_enter(&stp->sd_lock);
3963 	if ((nqp = _WR(qp)) == qp) {
3964 		qp = nqp->q_next;
3965 	} else {
3966 		qp = _RD(nqp->q_next);
3967 	}
3968 	qi = qp->q_qinfo;
3969 	if (qp->q_struiot == STRUIOT_NONE || ! (proc = qi->qi_infop)) {
3970 		mutex_exit(&stp->sd_lock);
3971 		return (EINVAL);
3972 	}
3973 	sq = qp->q_syncq;
3974 	mutex_enter(SQLOCK(sq));
3975 	mutex_exit(&stp->sd_lock);
3976 	count = sq->sq_count;
3977 	flags = sq->sq_flags;
3978 	ASSERT(sq->sq_ciputctrl == NULL || (flags & SQ_CIPUT));
3979 
3980 	while ((flags & SQ_GOAWAY) || (!(flags & SQ_CIPUT) && count != 0)) {
3981 		/*
3982 		 * Wait until we can enter the inner perimeter.
3983 		 */
3984 		sq->sq_flags = flags | SQ_WANTWAKEUP;
3985 		cv_wait(&sq->sq_wait, SQLOCK(sq));
3986 		count = sq->sq_count;
3987 		flags = sq->sq_flags;
3988 	}
3989 
3990 	if (! (flags & SQ_CIPUT))
3991 		sq->sq_flags = flags | SQ_EXCL;
3992 	sq->sq_count = count + 1;
3993 	ASSERT(sq->sq_count != 0);		/* Wraparound */
3994 	mutex_exit(SQLOCK(sq));
3995 
3996 	rval = (*proc)(qp, idp);
3997 
3998 	mutex_enter(SQLOCK(sq));
3999 	flags = sq->sq_flags;
4000 	ASSERT(sq->sq_count != 0);
4001 	sq->sq_count--;
4002 	if (flags & SQ_TAIL) {
4003 		putnext_tail(sq, qp, flags);
4004 		/*
4005 		 * The only purpose of this ASSERT is to preserve calling stack
4006 		 * in DEBUG kernel.
4007 		 */
4008 		ASSERT(flags & SQ_TAIL);
4009 		return (rval);
4010 	}
4011 	ASSERT(flags & (SQ_EXCL|SQ_CIPUT));
4012 /*
4013  * XXXX
4014  * I am not certain the next comment is correct here.  I need to consider
4015  * why the infonext is called, and if dropping SQ_EXCL unless non-CIPUT
4016  * might cause other problems.  It just might be safer to drop it if
4017  * !SQ_CIPUT because that is when we set it.
4018  */
4019 	/*
4020 	 * Safe to always drop SQ_EXCL:
4021 	 *	Not SQ_CIPUT means we set SQ_EXCL above
4022 	 *	For SQ_CIPUT SQ_EXCL will only be set if the put procedure
4023 	 *	did a qwriter(INNER) in which case nobody else
4024 	 *	is in the inner perimeter and we are exiting.
4025 	 *
4026 	 * I would like to make the following assertion:
4027 	 *
4028 	 * ASSERT((flags & (SQ_EXCL|SQ_CIPUT)) != (SQ_EXCL|SQ_CIPUT) ||
4029 	 *	sq->sq_count == 0);
4030 	 *
4031 	 * which indicates that if we are both putshared and exclusive,
4032 	 * we became exclusive while executing the putproc, and the only
4033 	 * claim on the syncq was the one we dropped a few lines above.
4034 	 * But other threads that enter putnext while the syncq is exclusive
4035 	 * need to make a claim as they may need to drop SQLOCK in the
4036 	 * has_writers case to avoid deadlocks.  If these threads are
4037 	 * delayed or preempted, it is possible that the writer thread can
4038 	 * find out that there are other claims making the (sq_count == 0)
4039 	 * test invalid.
4040 	 */
4041 
4042 	sq->sq_flags = flags & ~SQ_EXCL;
4043 	mutex_exit(SQLOCK(sq));
4044 	return (rval);
4045 }
4046 
4047 /*
4048  * Return nonzero if the queue is responsible for struio(), else return 0.
4049  */
4050 int
isuioq(queue_t * q)4051 isuioq(queue_t *q)
4052 {
4053 	if (q->q_flag & QREADR)
4054 		return (STREAM(q)->sd_struiordq == q);
4055 	else
4056 		return (STREAM(q)->sd_struiowrq == q);
4057 }
4058 
4059 #if defined(__sparc)
4060 int disable_putlocks = 0;
4061 #else
4062 int disable_putlocks = 1;
4063 #endif
4064 
4065 /*
4066  * called by create_putlock.
4067  */
4068 static void
create_syncq_putlocks(queue_t * q)4069 create_syncq_putlocks(queue_t *q)
4070 {
4071 	syncq_t	*sq = q->q_syncq;
4072 	ciputctrl_t *cip;
4073 	int i;
4074 
4075 	ASSERT(sq != NULL);
4076 
4077 	ASSERT(disable_putlocks == 0);
4078 	ASSERT(n_ciputctrl >= min_n_ciputctrl);
4079 	ASSERT(ciputctrl_cache != NULL);
4080 
4081 	if (!(sq->sq_type & SQ_CIPUT))
4082 		return;
4083 
4084 	for (i = 0; i <= 1; i++) {
4085 		if (sq->sq_ciputctrl == NULL) {
4086 			cip = kmem_cache_alloc(ciputctrl_cache, KM_SLEEP);
4087 			SUMCHECK_CIPUTCTRL_COUNTS(cip, n_ciputctrl - 1, 0);
4088 			mutex_enter(SQLOCK(sq));
4089 			if (sq->sq_ciputctrl != NULL) {
4090 				mutex_exit(SQLOCK(sq));
4091 				kmem_cache_free(ciputctrl_cache, cip);
4092 			} else {
4093 				ASSERT(sq->sq_nciputctrl == 0);
4094 				sq->sq_nciputctrl = n_ciputctrl - 1;
4095 				/*
4096 				 * putnext checks sq_ciputctrl without holding
4097 				 * SQLOCK. if it is not NULL putnext assumes
4098 				 * sq_nciputctrl is initialized. membar below
4099 				 * insures that.
4100 				 */
4101 				membar_producer();
4102 				sq->sq_ciputctrl = cip;
4103 				mutex_exit(SQLOCK(sq));
4104 			}
4105 		}
4106 		ASSERT(sq->sq_nciputctrl == n_ciputctrl - 1);
4107 		if (i == 1)
4108 			break;
4109 		q = _OTHERQ(q);
4110 		if (!(q->q_flag & QPERQ)) {
4111 			ASSERT(sq == q->q_syncq);
4112 			break;
4113 		}
4114 		ASSERT(q->q_syncq != NULL);
4115 		ASSERT(sq != q->q_syncq);
4116 		sq = q->q_syncq;
4117 		ASSERT(sq->sq_type & SQ_CIPUT);
4118 	}
4119 }
4120 
4121 /*
4122  * If stream argument is 0 only create per cpu sq_putlocks/sq_putcounts for
4123  * syncq of q. If stream argument is not 0 create per cpu stream_putlocks for
4124  * the stream of q and per cpu sq_putlocks/sq_putcounts for all syncq's
4125  * starting from q and down to the driver.
4126  *
4127  * This should be called after the affected queues are part of stream
4128  * geometry. It should be called from driver/module open routine after
4129  * qprocson() call. It is also called from nfs syscall where it is known that
4130  * stream is configured and won't change its geometry during create_putlock
4131  * call.
4132  *
4133  * caller normally uses 0 value for the stream argument to speed up MT putnext
4134  * into the perimeter of q for example because its perimeter is per module
4135  * (e.g. IP).
4136  *
4137  * caller normally uses non 0 value for the stream argument to hint the system
4138  * that the stream of q is a very contended global system stream
4139  * (e.g. NFS/UDP) and the part of the stream from q to the driver is
4140  * particularly MT hot.
4141  *
4142  * Caller insures stream plumbing won't happen while we are here and therefore
4143  * q_next can be safely used.
4144  */
4145 
4146 void
create_putlocks(queue_t * q,int stream)4147 create_putlocks(queue_t *q, int stream)
4148 {
4149 	ciputctrl_t	*cip;
4150 	struct stdata	*stp = STREAM(q);
4151 
4152 	q = _WR(q);
4153 	ASSERT(stp != NULL);
4154 
4155 	if (disable_putlocks != 0)
4156 		return;
4157 
4158 	if (n_ciputctrl < min_n_ciputctrl)
4159 		return;
4160 
4161 	ASSERT(ciputctrl_cache != NULL);
4162 
4163 	if (stream != 0 && stp->sd_ciputctrl == NULL) {
4164 		cip = kmem_cache_alloc(ciputctrl_cache, KM_SLEEP);
4165 		SUMCHECK_CIPUTCTRL_COUNTS(cip, n_ciputctrl - 1, 0);
4166 		mutex_enter(&stp->sd_lock);
4167 		if (stp->sd_ciputctrl != NULL) {
4168 			mutex_exit(&stp->sd_lock);
4169 			kmem_cache_free(ciputctrl_cache, cip);
4170 		} else {
4171 			ASSERT(stp->sd_nciputctrl == 0);
4172 			stp->sd_nciputctrl = n_ciputctrl - 1;
4173 			/*
4174 			 * putnext checks sd_ciputctrl without holding
4175 			 * sd_lock. if it is not NULL putnext assumes
4176 			 * sd_nciputctrl is initialized. membar below
4177 			 * insures that.
4178 			 */
4179 			membar_producer();
4180 			stp->sd_ciputctrl = cip;
4181 			mutex_exit(&stp->sd_lock);
4182 		}
4183 	}
4184 
4185 	ASSERT(stream == 0 || stp->sd_nciputctrl == n_ciputctrl - 1);
4186 
4187 	while (_SAMESTR(q)) {
4188 		create_syncq_putlocks(q);
4189 		if (stream == 0)
4190 			return;
4191 		q = q->q_next;
4192 	}
4193 	ASSERT(q != NULL);
4194 	create_syncq_putlocks(q);
4195 }
4196 
4197 /*
4198  * STREAMS Flow Trace - record STREAMS Flow Trace events as an mblk flows
4199  * through a stream.
4200  *
4201  * Data currently record per-event is a timestamp, module/driver name,
4202  * downstream module/driver name, optional callstack, event type and a per
4203  * type datum.  Much of the STREAMS framework is instrumented for automatic
4204  * flow tracing (when enabled).  Events can be defined and used by STREAMS
4205  * modules and drivers.
4206  *
4207  * Global objects:
4208  *
4209  *	str_ftevent() - Add a flow-trace event to a dblk.
4210  *	str_ftfree() - Free flow-trace data
4211  *
4212  * Local objects:
4213  *
4214  *	fthdr_cache - pointer to the kmem cache for trace header.
4215  *	ftblk_cache - pointer to the kmem cache for trace data blocks.
4216  */
4217 
4218 int str_ftnever = 1;	/* Don't do STREAMS flow tracing */
4219 int str_ftstack = 0;	/* Don't record event call stacks */
4220 
4221 void
str_ftevent(fthdr_t * hp,void * p,ushort_t evnt,ushort_t data)4222 str_ftevent(fthdr_t *hp, void *p, ushort_t evnt, ushort_t data)
4223 {
4224 	ftblk_t *bp = hp->tail;
4225 	ftblk_t *nbp;
4226 	ftevnt_t *ep;
4227 	int ix, nix;
4228 
4229 	ASSERT(hp != NULL);
4230 
4231 	for (;;) {
4232 		if ((ix = bp->ix) == FTBLK_EVNTS) {
4233 			/*
4234 			 * Tail doesn't have room, so need a new tail.
4235 			 *
4236 			 * To make this MT safe, first, allocate a new
4237 			 * ftblk, and initialize it.  To make life a
4238 			 * little easier, reserve the first slot (mostly
4239 			 * by making ix = 1).  When we are finished with
4240 			 * the initialization, CAS this pointer to the
4241 			 * tail.  If this succeeds, this is the new
4242 			 * "next" block.  Otherwise, another thread
4243 			 * got here first, so free the block and start
4244 			 * again.
4245 			 */
4246 			nbp = kmem_cache_alloc(ftblk_cache, KM_NOSLEEP);
4247 			if (nbp == NULL) {
4248 				/* no mem, so punt */
4249 				str_ftnever++;
4250 				/* free up all flow data? */
4251 				return;
4252 			}
4253 			nbp->nxt = NULL;
4254 			nbp->ix = 1;
4255 			/*
4256 			 * Just in case there is another thread about
4257 			 * to get the next index, we need to make sure
4258 			 * the value is there for it.
4259 			 */
4260 			membar_producer();
4261 			if (atomic_cas_ptr(&hp->tail, bp, nbp) == bp) {
4262 				/* CAS was successful */
4263 				bp->nxt = nbp;
4264 				membar_producer();
4265 				bp = nbp;
4266 				ix = 0;
4267 				goto cas_good;
4268 			} else {
4269 				kmem_cache_free(ftblk_cache, nbp);
4270 				bp = hp->tail;
4271 				continue;
4272 			}
4273 		}
4274 		nix = ix + 1;
4275 		if (atomic_cas_32((uint32_t *)&bp->ix, ix, nix) == ix) {
4276 		cas_good:
4277 			if (curthread != hp->thread) {
4278 				hp->thread = curthread;
4279 				evnt |= FTEV_CS;
4280 			}
4281 			if (CPU->cpu_seqid != hp->cpu_seqid) {
4282 				hp->cpu_seqid = CPU->cpu_seqid;
4283 				evnt |= FTEV_PS;
4284 			}
4285 			ep = &bp->ev[ix];
4286 			break;
4287 		}
4288 	}
4289 
4290 	if (evnt & FTEV_QMASK) {
4291 		queue_t *qp = p;
4292 
4293 		if (!(qp->q_flag & QREADR))
4294 			evnt |= FTEV_ISWR;
4295 
4296 		ep->mid = Q2NAME(qp);
4297 
4298 		/*
4299 		 * We only record the next queue name for FTEV_PUTNEXT since
4300 		 * that's the only time we *really* need it, and the putnext()
4301 		 * code ensures that qp->q_next won't vanish.  (We could use
4302 		 * claimstr()/releasestr() but at a performance cost.)
4303 		 */
4304 		if ((evnt & FTEV_MASK) == FTEV_PUTNEXT && qp->q_next != NULL)
4305 			ep->midnext = Q2NAME(qp->q_next);
4306 		else
4307 			ep->midnext = NULL;
4308 	} else {
4309 		ep->mid = p;
4310 		ep->midnext = NULL;
4311 	}
4312 
4313 	if (ep->stk != NULL)
4314 		ep->stk->fs_depth = getpcstack(ep->stk->fs_stk, FTSTK_DEPTH);
4315 
4316 	ep->ts = gethrtime();
4317 	ep->evnt = evnt;
4318 	ep->data = data;
4319 	hp->hash = (hp->hash << 9) + hp->hash;
4320 	hp->hash += (evnt << 16) | data;
4321 	hp->hash += (uintptr_t)ep->mid;
4322 }
4323 
4324 /*
4325  * Free flow-trace data.
4326  */
4327 void
str_ftfree(dblk_t * dbp)4328 str_ftfree(dblk_t *dbp)
4329 {
4330 	fthdr_t *hp = dbp->db_fthdr;
4331 	ftblk_t *bp = &hp->first;
4332 	ftblk_t *nbp;
4333 
4334 	if (bp != hp->tail || bp->ix != 0) {
4335 		/*
4336 		 * Clear out the hash, have the tail point to itself, and free
4337 		 * any continuation blocks.
4338 		 */
4339 		bp = hp->first.nxt;
4340 		hp->tail = &hp->first;
4341 		hp->hash = 0;
4342 		hp->first.nxt = NULL;
4343 		hp->first.ix = 0;
4344 		while (bp != NULL) {
4345 			nbp = bp->nxt;
4346 			kmem_cache_free(ftblk_cache, bp);
4347 			bp = nbp;
4348 		}
4349 	}
4350 	kmem_cache_free(fthdr_cache, hp);
4351 	dbp->db_fthdr = NULL;
4352 }
4353