1 /*
2 * CDDL HEADER START
3 *
4 * The contents of this file are subject to the terms of the
5 * Common Development and Distribution License (the "License").
6 * You may not use this file except in compliance with the License.
7 *
8 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9 * or http://www.opensolaris.org/os/licensing.
10 * See the License for the specific language governing permissions
11 * and limitations under the License.
12 *
13 * When distributing Covered Code, include this CDDL HEADER in each
14 * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15 * If applicable, add the following below this CDDL HEADER, with the
16 * fields enclosed by brackets "[]" replaced with your own identifying
17 * information: Portions Copyright [yyyy] [name of copyright owner]
18 *
19 * CDDL HEADER END
20 */
21 /* Copyright (c) 1984, 1986, 1987, 1988, 1989 AT&T */
22 /* All Rights Reserved */
23
24 /*
25 * Copyright 2009 Sun Microsystems, Inc. All rights reserved.
26 * Use is subject to license terms.
27 *
28 * Copyright 2021 Tintri by DDN, Inc. All rights reserved.
29 * Copyright 2022 Garrett D'Amore
30 * Copyright 2026 Oxide Computer Company
31 */
32
33 #include <sys/types.h>
34 #include <sys/param.h>
35 #include <sys/thread.h>
36 #include <sys/sysmacros.h>
37 #include <sys/stropts.h>
38 #include <sys/stream.h>
39 #include <sys/strsubr.h>
40 #include <sys/strsun.h>
41 #include <sys/conf.h>
42 #include <sys/debug.h>
43 #include <sys/cmn_err.h>
44 #include <sys/kmem.h>
45 #include <sys/atomic.h>
46 #include <sys/errno.h>
47 #include <sys/vtrace.h>
48 #include <sys/ftrace.h>
49 #include <sys/ontrap.h>
50 #include <sys/sdt.h>
51 #include <sys/strft.h>
52
53 #ifdef DEBUG
54 #include <sys/kmem_impl.h>
55 #endif
56
57 /*
58 * This file contains all the STREAMS utility routines that may
59 * be used by modules and drivers.
60 */
61
62 /*
63 * STREAMS message allocator: principles of operation
64 *
65 * The streams message allocator consists of all the routines that
66 * allocate, dup and free streams messages: allocb(), [d]esballoc[a],
67 * dupb(), freeb() and freemsg(). What follows is a high-level view
68 * of how the allocator works.
69 *
70 * Every streams message consists of one or more mblks, a dblk, and data.
71 * All mblks for all types of messages come from a common mblk_cache.
72 * The dblk and data come in several flavors, depending on how the
73 * message is allocated:
74 *
75 * (1) mblks up to DBLK_MAX_CACHE size are allocated from a collection of
76 * fixed-size dblk/data caches. For message sizes that are multiples of
77 * PAGESIZE, dblks are allocated separately from the buffer.
78 * The associated buffer is allocated by the constructor using kmem_alloc().
79 * For all other message sizes, dblk and its associated data is allocated
80 * as a single contiguous chunk of memory.
81 * Objects in these caches consist of a dblk plus its associated data.
82 * allocb() determines the nearest-size cache by table lookup:
83 * the dblk_cache[] array provides the mapping from size to dblk cache.
84 *
85 * (2) Large messages (size > DBLK_MAX_CACHE) are constructed by
86 * kmem_alloc()'ing a buffer for the data and supplying that
87 * buffer to gesballoc(), described below.
88 *
89 * (3) The four flavors of [d]esballoc[a] are all implemented by a
90 * common routine, gesballoc() ("generic esballoc"). gesballoc()
91 * allocates a dblk from the global dblk_esb_cache and sets db_base,
92 * db_lim and db_frtnp to describe the caller-supplied buffer.
93 *
94 * While there are several routines to allocate messages, there is only
95 * one routine to free messages: freeb(). freeb() simply invokes the
96 * dblk's free method, dbp->db_free(), which is set at allocation time.
97 *
98 * dupb() creates a new reference to a message by allocating a new mblk,
99 * incrementing the dblk reference count and setting the dblk's free
100 * method to dblk_decref(). The dblk's original free method is retained
101 * in db_lastfree. dblk_decref() decrements the reference count on each
102 * freeb(). If this is not the last reference it just frees the mblk;
103 * if this *is* the last reference, it restores db_free to db_lastfree,
104 * sets db_mblk to the current mblk (see below), and invokes db_lastfree.
105 *
106 * The implementation makes aggressive use of kmem object caching for
107 * maximum performance. This makes the code simple and compact, but
108 * also a bit abstruse in some places. The invariants that constitute a
109 * message's constructed state, described below, are more subtle than usual.
110 *
111 * Every dblk has an "attached mblk" as part of its constructed state.
112 * The mblk is allocated by the dblk's constructor and remains attached
113 * until the message is either dup'ed or pulled up. In the dupb() case
114 * the mblk association doesn't matter until the last free, at which time
115 * dblk_decref() attaches the last mblk to the dblk. pullupmsg() affects
116 * the mblk association because it swaps the leading mblks of two messages,
117 * so it is responsible for swapping their db_mblk pointers accordingly.
118 * From a constructed-state viewpoint it doesn't matter that a dblk's
119 * attached mblk can change while the message is allocated; all that
120 * matters is that the dblk has *some* attached mblk when it's freed.
121 *
122 * The sizes of the allocb() small-message caches are not magical.
123 * They represent a good trade-off between internal and external
124 * fragmentation for current workloads. They should be reevaluated
125 * periodically, especially if allocations larger than DBLK_MAX_CACHE
126 * become common. We use 64-byte alignment so that dblks don't
127 * straddle cache lines unnecessarily.
128 */
/*
 * Geometry of the fixed-size allocb() caches.  Requests are mapped to a
 * cache slot in DBLK_MIN_SIZE (1 << DBLK_SIZE_SHIFT) byte steps, and
 * anything larger than DBLK_MAX_CACHE bypasses these caches.  Objects are
 * aligned to DBLK_CACHE_ALIGN bytes.
 */
#define	DBLK_MAX_CACHE		73728
#define	DBLK_CACHE_ALIGN	64
#define	DBLK_MIN_SIZE		8
#define	DBLK_SIZE_SHIFT		3

/*
 * db_ref, db_type, db_flags and db_struioflag are read and written together
 * as one 32-bit "RTFU" word (see DBLK_RTFU_WORD below).
 * DBLK_RTFU_SHIFT(field) is the bit position of `field' within that word;
 * it is derived from the byte distance between the field and the first
 * (little-endian) or last (big-endian) of the four fields.
 * NOTE(review): this relies on the four fields being adjacent single bytes
 * in dblk_t, which is declared outside this file - confirm before changing
 * that structure.
 */
#ifdef _BIG_ENDIAN
#define	DBLK_RTFU_SHIFT(field)	\
	(8 * (&((dblk_t *)0)->db_struioflag - &((dblk_t *)0)->field))
#else
#define	DBLK_RTFU_SHIFT(field)	\
	(8 * (&((dblk_t *)0)->field - &((dblk_t *)0)->db_ref))
#endif

/*
 * Build a complete RTFU word.  Besides the caller's flags, (ref - 1) is
 * OR-ed into the flags byte: a message created with ref == 2 therefore has
 * its low flag bit set, which dblk_decref() reads back through DBLK_REFMIN
 * as the reference count at which the dblk is considered to be on its last
 * reference.
 *
 * Every macro argument is parenthesized, so that an expression argument
 * (for example a conditional expression for `ref') is evaluated as a unit.
 * Note that `ref' is expanded twice and must be free of side effects.
 */
#define	DBLK_RTFU(ref, type, flags, uioflag)	\
	(((ref) << DBLK_RTFU_SHIFT(db_ref)) | \
	((type) << DBLK_RTFU_SHIFT(db_type)) | \
	(((flags) | ((ref) - 1)) << DBLK_RTFU_SHIFT(db_flags)) | \
	((uioflag) << DBLK_RTFU_SHIFT(db_struioflag)))

/* Mask selecting the reference count bits of an RTFU word. */
#define	DBLK_RTFU_REF_MASK	(DBLK_REFMAX << DBLK_RTFU_SHIFT(db_ref))

/* The RTFU word of a dblk, as a 32-bit lvalue starting at db_ref. */
#define	DBLK_RTFU_WORD(dbp)	(*((uint32_t *)&(dbp)->db_ref))

/* The 32-bit word of an mblk that starts at b_band, as an lvalue. */
#define	MBLK_BAND_FLAG_WORD(mp)	(*((uint32_t *)&(mp)->b_band))
150
151 static size_t dblk_sizes[] = {
152 #ifdef _LP64
153 16, 80, 144, 208, 272, 336, 528, 1040, 1488, 1936, 2576, 3856,
154 8192, 12048, 16384, 20240, 24576, 28432, 32768, 36624,
155 40960, 44816, 49152, 53008, 57344, 61200, 65536, 69392,
156 #else
157 64, 128, 320, 576, 1088, 1536, 1984, 2624, 3904,
158 8192, 12096, 16384, 20288, 24576, 28480, 32768, 36672,
159 40960, 44864, 49152, 53056, 57344, 61248, 65536, 69440,
160 #endif
161 DBLK_MAX_CACHE, 0
162 };
163
164 static struct kmem_cache *dblk_cache[DBLK_MAX_CACHE / DBLK_MIN_SIZE];
165 static struct kmem_cache *mblk_cache;
166 static struct kmem_cache *dblk_esb_cache;
167 static struct kmem_cache *fthdr_cache;
168 static struct kmem_cache *ftblk_cache;
169
170 static void dblk_lastfree(mblk_t *mp, dblk_t *dbp);
171 static mblk_t *allocb_oversize(size_t size, int flags);
172 static int allocb_tryhard_fails;
173 static void frnop_func(void *arg);
174 frtn_t frnop = { frnop_func };
175 static void bcache_dblk_lastfree(mblk_t *mp, dblk_t *dbp);
176
177 static boolean_t rwnext_enter(queue_t *qp);
178 static void rwnext_exit(queue_t *qp);
179
180 /*
181 * Patchable mblk/dblk kmem_cache flags.
182 */
183 int dblk_kmem_flags = 0;
184 int mblk_kmem_flags = 0;
185
186 static int
dblk_constructor(void * buf,void * cdrarg,int kmflags)187 dblk_constructor(void *buf, void *cdrarg, int kmflags)
188 {
189 dblk_t *dbp = buf;
190 ssize_t msg_size = (ssize_t)cdrarg;
191 size_t index;
192
193 ASSERT(msg_size != 0);
194
195 index = (msg_size - 1) >> DBLK_SIZE_SHIFT;
196
197 ASSERT(index < (DBLK_MAX_CACHE >> DBLK_SIZE_SHIFT));
198
199 if ((dbp->db_mblk = kmem_cache_alloc(mblk_cache, kmflags)) == NULL)
200 return (-1);
201 if ((msg_size & PAGEOFFSET) == 0) {
202 dbp->db_base = kmem_alloc(msg_size, kmflags);
203 if (dbp->db_base == NULL) {
204 kmem_cache_free(mblk_cache, dbp->db_mblk);
205 return (-1);
206 }
207 } else {
208 dbp->db_base = (unsigned char *)&dbp[1];
209 }
210
211 dbp->db_mblk->b_datap = dbp;
212 dbp->db_cache = dblk_cache[index];
213 dbp->db_lim = dbp->db_base + msg_size;
214 dbp->db_free = dbp->db_lastfree = dblk_lastfree;
215 dbp->db_frtnp = NULL;
216 dbp->db_fthdr = NULL;
217 dbp->db_credp = NULL;
218 dbp->db_cpid = -1;
219 dbp->db_struioflag = 0;
220 dbp->db_struioun.cksum.flags = 0;
221 return (0);
222 }
223
224 /*ARGSUSED*/
225 static int
dblk_esb_constructor(void * buf,void * cdrarg,int kmflags)226 dblk_esb_constructor(void *buf, void *cdrarg, int kmflags)
227 {
228 dblk_t *dbp = buf;
229
230 if ((dbp->db_mblk = kmem_cache_alloc(mblk_cache, kmflags)) == NULL)
231 return (-1);
232 dbp->db_mblk->b_datap = dbp;
233 dbp->db_cache = dblk_esb_cache;
234 dbp->db_fthdr = NULL;
235 dbp->db_credp = NULL;
236 dbp->db_cpid = -1;
237 dbp->db_struioflag = 0;
238 dbp->db_struioun.cksum.flags = 0;
239 return (0);
240 }
241
242 static int
bcache_dblk_constructor(void * buf,void * cdrarg,int kmflags)243 bcache_dblk_constructor(void *buf, void *cdrarg, int kmflags)
244 {
245 dblk_t *dbp = buf;
246 bcache_t *bcp = cdrarg;
247
248 if ((dbp->db_mblk = kmem_cache_alloc(mblk_cache, kmflags)) == NULL)
249 return (-1);
250
251 dbp->db_base = kmem_cache_alloc(bcp->buffer_cache, kmflags);
252 if (dbp->db_base == NULL) {
253 kmem_cache_free(mblk_cache, dbp->db_mblk);
254 return (-1);
255 }
256
257 dbp->db_mblk->b_datap = dbp;
258 dbp->db_cache = (void *)bcp;
259 dbp->db_lim = dbp->db_base + bcp->size;
260 dbp->db_free = dbp->db_lastfree = bcache_dblk_lastfree;
261 dbp->db_frtnp = NULL;
262 dbp->db_fthdr = NULL;
263 dbp->db_credp = NULL;
264 dbp->db_cpid = -1;
265 dbp->db_struioflag = 0;
266 dbp->db_struioun.cksum.flags = 0;
267 return (0);
268 }
269
270 /*ARGSUSED*/
271 static void
dblk_destructor(void * buf,void * cdrarg)272 dblk_destructor(void *buf, void *cdrarg)
273 {
274 dblk_t *dbp = buf;
275 ssize_t msg_size = (ssize_t)cdrarg;
276
277 ASSERT(dbp->db_mblk->b_datap == dbp);
278 ASSERT(msg_size != 0);
279 ASSERT(dbp->db_struioflag == 0);
280 ASSERT(dbp->db_struioun.cksum.flags == 0);
281
282 if ((msg_size & PAGEOFFSET) == 0) {
283 kmem_free(dbp->db_base, msg_size);
284 }
285
286 kmem_cache_free(mblk_cache, dbp->db_mblk);
287 }
288
289 static void
bcache_dblk_destructor(void * buf,void * cdrarg)290 bcache_dblk_destructor(void *buf, void *cdrarg)
291 {
292 dblk_t *dbp = buf;
293 bcache_t *bcp = cdrarg;
294
295 kmem_cache_free(bcp->buffer_cache, dbp->db_base);
296
297 ASSERT(dbp->db_mblk->b_datap == dbp);
298 ASSERT(dbp->db_struioflag == 0);
299 ASSERT(dbp->db_struioun.cksum.flags == 0);
300
301 kmem_cache_free(mblk_cache, dbp->db_mblk);
302 }
303
304 /* ARGSUSED */
305 static int
ftblk_constructor(void * buf,void * cdrarg,int kmflags)306 ftblk_constructor(void *buf, void *cdrarg, int kmflags)
307 {
308 ftblk_t *fbp = buf;
309 int i;
310
311 bzero(fbp, sizeof (ftblk_t));
312 if (str_ftstack != 0) {
313 for (i = 0; i < FTBLK_EVNTS; i++)
314 fbp->ev[i].stk = kmem_alloc(sizeof (ftstk_t), kmflags);
315 }
316
317 return (0);
318 }
319
320 /* ARGSUSED */
321 static void
ftblk_destructor(void * buf,void * cdrarg)322 ftblk_destructor(void *buf, void *cdrarg)
323 {
324 ftblk_t *fbp = buf;
325 int i;
326
327 if (str_ftstack != 0) {
328 for (i = 0; i < FTBLK_EVNTS; i++) {
329 if (fbp->ev[i].stk != NULL) {
330 kmem_free(fbp->ev[i].stk, sizeof (ftstk_t));
331 fbp->ev[i].stk = NULL;
332 }
333 }
334 }
335 }
336
337 static int
fthdr_constructor(void * buf,void * cdrarg,int kmflags)338 fthdr_constructor(void *buf, void *cdrarg, int kmflags)
339 {
340 fthdr_t *fhp = buf;
341
342 return (ftblk_constructor(&fhp->first, cdrarg, kmflags));
343 }
344
345 static void
fthdr_destructor(void * buf,void * cdrarg)346 fthdr_destructor(void *buf, void *cdrarg)
347 {
348 fthdr_t *fhp = buf;
349
350 ftblk_destructor(&fhp->first, cdrarg);
351 }
352
353 void
streams_msg_init(void)354 streams_msg_init(void)
355 {
356 char name[40];
357 size_t size;
358 size_t lastsize = DBLK_MIN_SIZE;
359 size_t *sizep;
360 struct kmem_cache *cp;
361 size_t tot_size;
362 int offset;
363
364 mblk_cache = kmem_cache_create("streams_mblk", sizeof (mblk_t), 32,
365 NULL, NULL, NULL, NULL, NULL, mblk_kmem_flags);
366
367 for (sizep = dblk_sizes; (size = *sizep) != 0; sizep++) {
368
369 if ((offset = (size & PAGEOFFSET)) != 0) {
370 /*
371 * We are in the middle of a page, dblk should
372 * be allocated on the same page
373 */
374 tot_size = size + sizeof (dblk_t);
375 ASSERT((offset + sizeof (dblk_t) + sizeof (kmem_slab_t))
376 < PAGESIZE);
377 ASSERT((tot_size & (DBLK_CACHE_ALIGN - 1)) == 0);
378
379 } else {
380
381 /*
382 * buf size is multiple of page size, dblk and
383 * buffer are allocated separately.
384 */
385
386 ASSERT((size & (DBLK_CACHE_ALIGN - 1)) == 0);
387 tot_size = sizeof (dblk_t);
388 }
389
390 (void) sprintf(name, "streams_dblk_%ld", size);
391 cp = kmem_cache_create(name, tot_size, DBLK_CACHE_ALIGN,
392 dblk_constructor, dblk_destructor, NULL, (void *)(size),
393 NULL, dblk_kmem_flags);
394
395 while (lastsize <= size) {
396 dblk_cache[(lastsize - 1) >> DBLK_SIZE_SHIFT] = cp;
397 lastsize += DBLK_MIN_SIZE;
398 }
399 }
400
401 dblk_esb_cache = kmem_cache_create("streams_dblk_esb", sizeof (dblk_t),
402 DBLK_CACHE_ALIGN, dblk_esb_constructor, dblk_destructor, NULL,
403 (void *)sizeof (dblk_t), NULL, dblk_kmem_flags);
404 fthdr_cache = kmem_cache_create("streams_fthdr", sizeof (fthdr_t), 32,
405 fthdr_constructor, fthdr_destructor, NULL, NULL, NULL, 0);
406 ftblk_cache = kmem_cache_create("streams_ftblk", sizeof (ftblk_t), 32,
407 ftblk_constructor, ftblk_destructor, NULL, NULL, NULL, 0);
408
409 /* initialize throttling queue for esballoc */
410 esballoc_queue_init();
411 }
412
413 /*ARGSUSED*/
414 mblk_t *
allocb(size_t size,uint_t pri)415 allocb(size_t size, uint_t pri)
416 {
417 dblk_t *dbp;
418 mblk_t *mp;
419 size_t index;
420
421 index = (size - 1) >> DBLK_SIZE_SHIFT;
422
423 if (index >= (DBLK_MAX_CACHE >> DBLK_SIZE_SHIFT)) {
424 if (size != 0) {
425 mp = allocb_oversize(size, KM_NOSLEEP);
426 goto out;
427 }
428 index = 0;
429 }
430
431 if ((dbp = kmem_cache_alloc(dblk_cache[index], KM_NOSLEEP)) == NULL) {
432 mp = NULL;
433 goto out;
434 }
435
436 mp = dbp->db_mblk;
437 DBLK_RTFU_WORD(dbp) = DBLK_RTFU(1, M_DATA, 0, 0);
438 mp->b_next = mp->b_prev = mp->b_cont = NULL;
439 mp->b_rptr = mp->b_wptr = dbp->db_base;
440 mp->b_queue = NULL;
441 MBLK_BAND_FLAG_WORD(mp) = 0;
442 STR_FTALLOC(&dbp->db_fthdr, FTEV_ALLOCB, size);
443 out:
444 FTRACE_1("allocb(): mp=0x%p", (uintptr_t)mp);
445
446 return (mp);
447 }
448
449 /*
450 * Allocate an mblk taking db_credp and db_cpid from the template.
451 * Allow the cred to be NULL.
452 */
453 mblk_t *
allocb_tmpl(size_t size,const mblk_t * tmpl)454 allocb_tmpl(size_t size, const mblk_t *tmpl)
455 {
456 mblk_t *mp = allocb(size, 0);
457
458 if (mp != NULL) {
459 dblk_t *src = tmpl->b_datap;
460 dblk_t *dst = mp->b_datap;
461 cred_t *cr;
462 pid_t cpid;
463
464 cr = msg_getcred(tmpl, &cpid);
465 if (cr != NULL)
466 crhold(dst->db_credp = cr);
467 dst->db_cpid = cpid;
468 dst->db_type = src->db_type;
469 }
470 return (mp);
471 }
472
473 mblk_t *
allocb_cred(size_t size,cred_t * cr,pid_t cpid)474 allocb_cred(size_t size, cred_t *cr, pid_t cpid)
475 {
476 mblk_t *mp = allocb(size, 0);
477
478 ASSERT(cr != NULL);
479 if (mp != NULL) {
480 dblk_t *dbp = mp->b_datap;
481
482 crhold(dbp->db_credp = cr);
483 dbp->db_cpid = cpid;
484 }
485 return (mp);
486 }
487
488 mblk_t *
allocb_cred_wait(size_t size,uint_t flags,int * error,cred_t * cr,pid_t cpid)489 allocb_cred_wait(size_t size, uint_t flags, int *error, cred_t *cr, pid_t cpid)
490 {
491 mblk_t *mp = allocb_wait(size, 0, flags, error);
492
493 ASSERT(cr != NULL);
494 if (mp != NULL) {
495 dblk_t *dbp = mp->b_datap;
496
497 crhold(dbp->db_credp = cr);
498 dbp->db_cpid = cpid;
499 }
500
501 return (mp);
502 }
503
504 /*
505 * Extract the db_cred (and optionally db_cpid) from a message.
506 * We find the first mblk which has a non-NULL db_cred and use that.
507 * If none found we return NULL.
508 * Does NOT get a hold on the cred.
509 */
510 cred_t *
msg_getcred(const mblk_t * mp,pid_t * cpidp)511 msg_getcred(const mblk_t *mp, pid_t *cpidp)
512 {
513 cred_t *cr = NULL;
514 cred_t *cr2;
515 mblk_t *mp2;
516
517 while (mp != NULL) {
518 dblk_t *dbp = mp->b_datap;
519
520 cr = dbp->db_credp;
521 if (cr == NULL) {
522 mp = mp->b_cont;
523 continue;
524 }
525 if (cpidp != NULL)
526 *cpidp = dbp->db_cpid;
527
528 #ifdef DEBUG
529 /*
530 * Normally there should at most one db_credp in a message.
531 * But if there are multiple (as in the case of some M_IOC*
532 * and some internal messages in TCP/IP bind logic) then
533 * they must be identical in the normal case.
534 * However, a socket can be shared between different uids
535 * in which case data queued in TCP would be from different
536 * creds. Thus we can only assert for the zoneid being the
537 * same. Due to Multi-level Level Ports for TX, some
538 * cred_t can have a NULL cr_zone, and we skip the comparison
539 * in that case.
540 */
541 mp2 = mp->b_cont;
542 while (mp2 != NULL) {
543 cr2 = DB_CRED(mp2);
544 if (cr2 != NULL) {
545 DTRACE_PROBE2(msg__getcred,
546 cred_t *, cr, cred_t *, cr2);
547 ASSERT(crgetzoneid(cr) == crgetzoneid(cr2) ||
548 crgetzone(cr) == NULL ||
549 crgetzone(cr2) == NULL);
550 }
551 mp2 = mp2->b_cont;
552 }
553 #endif
554 return (cr);
555 }
556 if (cpidp != NULL)
557 *cpidp = NOPID;
558 return (NULL);
559 }
560
561 /*
562 * Variant of msg_getcred which, when a cred is found
563 * 1. Returns with a hold on the cred
564 * 2. Clears the first cred in the mblk.
565 * This is more efficient to use than a msg_getcred() + crhold() when
566 * the message is freed after the cred has been extracted.
567 *
568 * The caller is responsible for ensuring that there is no other reference
569 * on the message since db_credp can not be cleared when there are other
570 * references.
571 */
572 cred_t *
msg_extractcred(mblk_t * mp,pid_t * cpidp)573 msg_extractcred(mblk_t *mp, pid_t *cpidp)
574 {
575 cred_t *cr = NULL;
576 cred_t *cr2;
577 mblk_t *mp2;
578
579 while (mp != NULL) {
580 dblk_t *dbp = mp->b_datap;
581
582 cr = dbp->db_credp;
583 if (cr == NULL) {
584 mp = mp->b_cont;
585 continue;
586 }
587 ASSERT(dbp->db_ref == 1);
588 dbp->db_credp = NULL;
589 if (cpidp != NULL)
590 *cpidp = dbp->db_cpid;
591 #ifdef DEBUG
592 /*
593 * Normally there should at most one db_credp in a message.
594 * But if there are multiple (as in the case of some M_IOC*
595 * and some internal messages in TCP/IP bind logic) then
596 * they must be identical in the normal case.
597 * However, a socket can be shared between different uids
598 * in which case data queued in TCP would be from different
599 * creds. Thus we can only assert for the zoneid being the
600 * same. Due to Multi-level Level Ports for TX, some
601 * cred_t can have a NULL cr_zone, and we skip the comparison
602 * in that case.
603 */
604 mp2 = mp->b_cont;
605 while (mp2 != NULL) {
606 cr2 = DB_CRED(mp2);
607 if (cr2 != NULL) {
608 DTRACE_PROBE2(msg__extractcred,
609 cred_t *, cr, cred_t *, cr2);
610 ASSERT(crgetzoneid(cr) == crgetzoneid(cr2) ||
611 crgetzone(cr) == NULL ||
612 crgetzone(cr2) == NULL);
613 }
614 mp2 = mp2->b_cont;
615 }
616 #endif
617 return (cr);
618 }
619 return (NULL);
620 }
621 /*
622 * Get the label for a message. Uses the first mblk in the message
623 * which has a non-NULL db_credp.
624 * Returns NULL if there is no credp.
625 */
626 extern struct ts_label_s *
msg_getlabel(const mblk_t * mp)627 msg_getlabel(const mblk_t *mp)
628 {
629 cred_t *cr = msg_getcred(mp, NULL);
630
631 if (cr == NULL)
632 return (NULL);
633
634 return (crgetlabel(cr));
635 }
636
637 void
freeb(mblk_t * mp)638 freeb(mblk_t *mp)
639 {
640 dblk_t *dbp = mp->b_datap;
641
642 ASSERT(dbp->db_ref > 0);
643 ASSERT(mp->b_next == NULL && mp->b_prev == NULL);
644 FTRACE_1("freeb(): mp=0x%lx", (uintptr_t)mp);
645
646 STR_FTEVENT_MBLK(mp, caller(), FTEV_FREEB, dbp->db_ref);
647
648 dbp->db_free(mp, dbp);
649 }
650
651 void
freemsg(mblk_t * mp)652 freemsg(mblk_t *mp)
653 {
654 FTRACE_1("freemsg(): mp=0x%lx", (uintptr_t)mp);
655 while (mp) {
656 dblk_t *dbp = mp->b_datap;
657 mblk_t *mp_cont = mp->b_cont;
658
659 ASSERT(dbp->db_ref > 0);
660 ASSERT(mp->b_next == NULL && mp->b_prev == NULL);
661
662 STR_FTEVENT_MBLK(mp, caller(), FTEV_FREEB, dbp->db_ref);
663
664 dbp->db_free(mp, dbp);
665 mp = mp_cont;
666 }
667 }
668
669 /*
670 * Reallocate a block for another use. Try hard to use the old block.
671 * If the old data is wanted (copy), leave b_wptr at the end of the data,
672 * otherwise return b_wptr = b_rptr.
673 *
674 * This routine is private and unstable.
675 */
676 mblk_t *
reallocb(mblk_t * mp,size_t size,uint_t copy)677 reallocb(mblk_t *mp, size_t size, uint_t copy)
678 {
679 mblk_t *mp1;
680 unsigned char *old_rptr;
681 ptrdiff_t cur_size;
682
683 if (mp == NULL)
684 return (allocb(size, BPRI_HI));
685
686 cur_size = mp->b_wptr - mp->b_rptr;
687 old_rptr = mp->b_rptr;
688
689 ASSERT(mp->b_datap->db_ref != 0);
690
691 if (mp->b_datap->db_ref == 1 && MBLKSIZE(mp) >= size) {
692 /*
693 * If the data is wanted and it will fit where it is, no
694 * work is required.
695 */
696 if (copy && mp->b_datap->db_lim - mp->b_rptr >= size)
697 return (mp);
698
699 mp->b_wptr = mp->b_rptr = mp->b_datap->db_base;
700 mp1 = mp;
701 } else if ((mp1 = allocb_tmpl(size, mp)) != NULL) {
702 /* XXX other mp state could be copied too, db_flags ... ? */
703 mp1->b_cont = mp->b_cont;
704 } else {
705 return (NULL);
706 }
707
708 if (copy) {
709 bcopy(old_rptr, mp1->b_rptr, cur_size);
710 mp1->b_wptr = mp1->b_rptr + cur_size;
711 }
712
713 if (mp != mp1)
714 freeb(mp);
715
716 return (mp1);
717 }
718
719 static void
dblk_lastfree(mblk_t * mp,dblk_t * dbp)720 dblk_lastfree(mblk_t *mp, dblk_t *dbp)
721 {
722 ASSERT(dbp->db_mblk == mp);
723 if (dbp->db_fthdr != NULL)
724 str_ftfree(dbp);
725
726 /* set credp and projid to be 'unspecified' before returning to cache */
727 if (dbp->db_credp != NULL) {
728 crfree(dbp->db_credp);
729 dbp->db_credp = NULL;
730 }
731 dbp->db_cpid = -1;
732
733 /* Reset the struioflag and the checksum flag fields */
734 dbp->db_struioflag = 0;
735 dbp->db_struioun.cksum.flags = 0;
736
737 /* and the COOKED and/or UIOA flag(s) */
738 dbp->db_flags &= ~(DBLK_COOKED | DBLK_UIOA);
739
740 kmem_cache_free(dbp->db_cache, dbp);
741 }
742
743 static void
dblk_decref(mblk_t * mp,dblk_t * dbp)744 dblk_decref(mblk_t *mp, dblk_t *dbp)
745 {
746 if (dbp->db_ref != 1) {
747 uint32_t rtfu = atomic_add_32_nv(&DBLK_RTFU_WORD(dbp),
748 -(1 << DBLK_RTFU_SHIFT(db_ref)));
749 /*
750 * atomic_add_32_nv() just decremented db_ref, so we no longer
751 * have a reference to the dblk, which means another thread
752 * could free it. Therefore we cannot examine the dblk to
753 * determine whether ours was the last reference. Instead,
754 * we extract the new and minimum reference counts from rtfu.
755 * Note that all we're really saying is "if (ref != refmin)".
756 */
757 if (((rtfu >> DBLK_RTFU_SHIFT(db_ref)) & DBLK_REFMAX) !=
758 ((rtfu >> DBLK_RTFU_SHIFT(db_flags)) & DBLK_REFMIN)) {
759 kmem_cache_free(mblk_cache, mp);
760 return;
761 }
762 }
763 dbp->db_mblk = mp;
764 dbp->db_free = dbp->db_lastfree;
765 dbp->db_lastfree(mp, dbp);
766 }
767
768 mblk_t *
dupb(mblk_t * mp)769 dupb(mblk_t *mp)
770 {
771 dblk_t *dbp = mp->b_datap;
772 mblk_t *new_mp;
773 uint32_t oldrtfu, newrtfu;
774
775 if ((new_mp = kmem_cache_alloc(mblk_cache, KM_NOSLEEP)) == NULL)
776 goto out;
777
778 new_mp->b_next = new_mp->b_prev = new_mp->b_cont = NULL;
779 new_mp->b_rptr = mp->b_rptr;
780 new_mp->b_wptr = mp->b_wptr;
781 new_mp->b_datap = dbp;
782 new_mp->b_queue = NULL;
783 MBLK_BAND_FLAG_WORD(new_mp) = MBLK_BAND_FLAG_WORD(mp);
784
785 STR_FTEVENT_MBLK(mp, caller(), FTEV_DUPB, dbp->db_ref);
786
787 dbp->db_free = dblk_decref;
788 do {
789 ASSERT(dbp->db_ref > 0);
790 oldrtfu = DBLK_RTFU_WORD(dbp);
791 newrtfu = oldrtfu + (1 << DBLK_RTFU_SHIFT(db_ref));
792 /*
793 * If db_ref is maxed out we can't dup this message anymore.
794 */
795 if ((oldrtfu & DBLK_RTFU_REF_MASK) == DBLK_RTFU_REF_MASK) {
796 kmem_cache_free(mblk_cache, new_mp);
797 new_mp = NULL;
798 goto out;
799 }
800 } while (atomic_cas_32(&DBLK_RTFU_WORD(dbp), oldrtfu, newrtfu) !=
801 oldrtfu);
802
803 out:
804 FTRACE_1("dupb(): new_mp=0x%lx", (uintptr_t)new_mp);
805 return (new_mp);
806 }
807
808 static void
dblk_lastfree_desb(mblk_t * mp,dblk_t * dbp)809 dblk_lastfree_desb(mblk_t *mp, dblk_t *dbp)
810 {
811 frtn_t *frp = dbp->db_frtnp;
812
813 ASSERT(dbp->db_mblk == mp);
814 frp->free_func(frp->free_arg);
815 if (dbp->db_fthdr != NULL)
816 str_ftfree(dbp);
817
818 /* set credp and projid to be 'unspecified' before returning to cache */
819 if (dbp->db_credp != NULL) {
820 crfree(dbp->db_credp);
821 dbp->db_credp = NULL;
822 }
823 dbp->db_cpid = -1;
824 dbp->db_struioflag = 0;
825 dbp->db_struioun.cksum.flags = 0;
826
827 kmem_cache_free(dbp->db_cache, dbp);
828 }
829
/*
 * Free routine that does nothing; it is the free_func of the global
 * `frnop' descriptor.
 */
/*ARGSUSED*/
static void
frnop_func(void *arg)
{
	/* intentionally empty */
}
835
836 /*
837 * Generic esballoc used to implement the four flavors: [d]esballoc[a].
838 *
839 * The variants with a 'd' prefix (desballoc, desballoca)
840 * directly free the mblk when it loses its last ref,
841 * where the other variants free asynchronously.
842 * The variants with an 'a' suffix (esballoca, desballoca)
843 * add an extra ref, effectively letting the streams subsystem
844 * know that the message data should not be modified.
845 * (eg. see db_ref checks in reallocb and elsewhere)
846 *
847 * The method used by the 'a' suffix functions to keep the dblk
848 * db_ref > 1 is non-obvious. The macro DBLK_RTFU(2,...) passed to
849 * gesballoc sets the initial db_ref = 2 and sets the DBLK_REFMIN
850 * bit in db_flags. In dblk_decref() that flag essentially means
851 * the dblk has one extra ref, so the "last ref" is one, not zero.
852 */
853 static mblk_t *
gesballoc(unsigned char * base,size_t size,uint32_t db_rtfu,frtn_t * frp,void (* lastfree)(mblk_t *,dblk_t *),int kmflags)854 gesballoc(unsigned char *base, size_t size, uint32_t db_rtfu, frtn_t *frp,
855 void (*lastfree)(mblk_t *, dblk_t *), int kmflags)
856 {
857 dblk_t *dbp;
858 mblk_t *mp;
859
860 ASSERT(base != NULL && frp != NULL);
861
862 if ((dbp = kmem_cache_alloc(dblk_esb_cache, kmflags)) == NULL) {
863 mp = NULL;
864 goto out;
865 }
866
867 mp = dbp->db_mblk;
868 dbp->db_base = base;
869 dbp->db_lim = base + size;
870 dbp->db_free = dbp->db_lastfree = lastfree;
871 dbp->db_frtnp = frp;
872 DBLK_RTFU_WORD(dbp) = db_rtfu;
873 mp->b_next = mp->b_prev = mp->b_cont = NULL;
874 mp->b_rptr = mp->b_wptr = base;
875 mp->b_queue = NULL;
876 MBLK_BAND_FLAG_WORD(mp) = 0;
877
878 out:
879 FTRACE_1("gesballoc(): mp=0x%lx", (uintptr_t)mp);
880 return (mp);
881 }
882
883 /*ARGSUSED*/
884 mblk_t *
esballoc(unsigned char * base,size_t size,uint_t pri,frtn_t * frp)885 esballoc(unsigned char *base, size_t size, uint_t pri, frtn_t *frp)
886 {
887 mblk_t *mp;
888
889 /*
890 * Note that this is structured to allow the common case (i.e.
891 * STREAMS flowtracing disabled) to call gesballoc() with tail
892 * call optimization.
893 */
894 if (!str_ftnever) {
895 mp = gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
896 frp, freebs_enqueue, KM_NOSLEEP);
897
898 if (mp != NULL)
899 STR_FTALLOC(&DB_FTHDR(mp), FTEV_ESBALLOC, size);
900 return (mp);
901 }
902
903 return (gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
904 frp, freebs_enqueue, KM_NOSLEEP));
905 }
906
907 /*
908 * Same as esballoc() but sleeps waiting for memory.
909 */
910 /*ARGSUSED*/
911 mblk_t *
esballoc_wait(unsigned char * base,size_t size,uint_t pri,frtn_t * frp)912 esballoc_wait(unsigned char *base, size_t size, uint_t pri, frtn_t *frp)
913 {
914 mblk_t *mp;
915
916 /*
917 * Note that this is structured to allow the common case (i.e.
918 * STREAMS flowtracing disabled) to call gesballoc() with tail
919 * call optimization.
920 */
921 if (!str_ftnever) {
922 mp = gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
923 frp, freebs_enqueue, KM_SLEEP);
924
925 STR_FTALLOC(&DB_FTHDR(mp), FTEV_ESBALLOC, size);
926 return (mp);
927 }
928
929 return (gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
930 frp, freebs_enqueue, KM_SLEEP));
931 }
932
933 /*ARGSUSED*/
934 mblk_t *
desballoc(unsigned char * base,size_t size,uint_t pri,frtn_t * frp)935 desballoc(unsigned char *base, size_t size, uint_t pri, frtn_t *frp)
936 {
937 mblk_t *mp;
938
939 /*
940 * Note that this is structured to allow the common case (i.e.
941 * STREAMS flowtracing disabled) to call gesballoc() with tail
942 * call optimization.
943 */
944 if (!str_ftnever) {
945 mp = gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
946 frp, dblk_lastfree_desb, KM_NOSLEEP);
947
948 if (mp != NULL)
949 STR_FTALLOC(&DB_FTHDR(mp), FTEV_DESBALLOC, size);
950 return (mp);
951 }
952
953 return (gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
954 frp, dblk_lastfree_desb, KM_NOSLEEP));
955 }
956
957 /*ARGSUSED*/
958 mblk_t *
esballoca(unsigned char * base,size_t size,uint_t pri,frtn_t * frp)959 esballoca(unsigned char *base, size_t size, uint_t pri, frtn_t *frp)
960 {
961 mblk_t *mp;
962
963 /*
964 * Note that this is structured to allow the common case (i.e.
965 * STREAMS flowtracing disabled) to call gesballoc() with tail
966 * call optimization.
967 */
968 if (!str_ftnever) {
969 mp = gesballoc(base, size, DBLK_RTFU(2, M_DATA, 0, 0),
970 frp, freebs_enqueue, KM_NOSLEEP);
971
972 if (mp != NULL)
973 STR_FTALLOC(&DB_FTHDR(mp), FTEV_ESBALLOCA, size);
974 return (mp);
975 }
976
977 return (gesballoc(base, size, DBLK_RTFU(2, M_DATA, 0, 0),
978 frp, freebs_enqueue, KM_NOSLEEP));
979 }
980
981 /*
982 * Same as esballoca() but sleeps waiting for memory.
983 */
984 mblk_t *
esballoca_wait(unsigned char * base,size_t size,uint_t pri,frtn_t * frp)985 esballoca_wait(unsigned char *base, size_t size, uint_t pri, frtn_t *frp)
986 {
987 mblk_t *mp;
988
989 /*
990 * Note that this is structured to allow the common case (i.e.
991 * STREAMS flowtracing disabled) to call gesballoc() with tail
992 * call optimization.
993 */
994 if (!str_ftnever) {
995 mp = gesballoc(base, size, DBLK_RTFU(2, M_DATA, 0, 0),
996 frp, freebs_enqueue, KM_SLEEP);
997
998 STR_FTALLOC(&DB_FTHDR(mp), FTEV_ESBALLOCA, size);
999 return (mp);
1000 }
1001
1002 return (gesballoc(base, size, DBLK_RTFU(2, M_DATA, 0, 0),
1003 frp, freebs_enqueue, KM_SLEEP));
1004 }
1005
1006 /*ARGSUSED*/
1007 mblk_t *
desballoca(unsigned char * base,size_t size,uint_t pri,frtn_t * frp)1008 desballoca(unsigned char *base, size_t size, uint_t pri, frtn_t *frp)
1009 {
1010 mblk_t *mp;
1011
1012 /*
1013 * Note that this is structured to allow the common case (i.e.
1014 * STREAMS flowtracing disabled) to call gesballoc() with tail
1015 * call optimization.
1016 */
1017 if (!str_ftnever) {
1018 mp = gesballoc(base, size, DBLK_RTFU(2, M_DATA, 0, 0),
1019 frp, dblk_lastfree_desb, KM_NOSLEEP);
1020
1021 if (mp != NULL)
1022 STR_FTALLOC(&DB_FTHDR(mp), FTEV_DESBALLOCA, size);
1023 return (mp);
1024 }
1025
1026 return (gesballoc(base, size, DBLK_RTFU(2, M_DATA, 0, 0),
1027 frp, dblk_lastfree_desb, KM_NOSLEEP));
1028 }
1029
1030 static void
bcache_dblk_lastfree(mblk_t * mp,dblk_t * dbp)1031 bcache_dblk_lastfree(mblk_t *mp, dblk_t *dbp)
1032 {
1033 bcache_t *bcp = dbp->db_cache;
1034
1035 ASSERT(dbp->db_mblk == mp);
1036 if (dbp->db_fthdr != NULL)
1037 str_ftfree(dbp);
1038
1039 /* set credp and projid to be 'unspecified' before returning to cache */
1040 if (dbp->db_credp != NULL) {
1041 crfree(dbp->db_credp);
1042 dbp->db_credp = NULL;
1043 }
1044 dbp->db_cpid = -1;
1045 dbp->db_struioflag = 0;
1046 dbp->db_struioun.cksum.flags = 0;
1047
1048 mutex_enter(&bcp->mutex);
1049 kmem_cache_free(bcp->dblk_cache, dbp);
1050 bcp->alloc--;
1051
1052 if (bcp->alloc == 0 && bcp->destroy != 0) {
1053 kmem_cache_destroy(bcp->dblk_cache);
1054 kmem_cache_destroy(bcp->buffer_cache);
1055 mutex_exit(&bcp->mutex);
1056 mutex_destroy(&bcp->mutex);
1057 kmem_free(bcp, sizeof (bcache_t));
1058 } else {
1059 mutex_exit(&bcp->mutex);
1060 }
1061 }
1062
1063 bcache_t *
bcache_create(char * name,size_t size,uint_t align)1064 bcache_create(char *name, size_t size, uint_t align)
1065 {
1066 bcache_t *bcp;
1067 char buffer[255];
1068
1069 ASSERT((align & (align - 1)) == 0);
1070
1071 if ((bcp = kmem_alloc(sizeof (bcache_t), KM_NOSLEEP)) == NULL)
1072 return (NULL);
1073
1074 bcp->size = size;
1075 bcp->align = align;
1076 bcp->alloc = 0;
1077 bcp->destroy = 0;
1078
1079 mutex_init(&bcp->mutex, NULL, MUTEX_DRIVER, NULL);
1080
1081 (void) sprintf(buffer, "%s_buffer_cache", name);
1082 bcp->buffer_cache = kmem_cache_create(buffer, size, align, NULL, NULL,
1083 NULL, NULL, NULL, 0);
1084 (void) sprintf(buffer, "%s_dblk_cache", name);
1085 bcp->dblk_cache = kmem_cache_create(buffer, sizeof (dblk_t),
1086 DBLK_CACHE_ALIGN, bcache_dblk_constructor, bcache_dblk_destructor,
1087 NULL, (void *)bcp, NULL, 0);
1088
1089 return (bcp);
1090 }
1091
1092 void
bcache_destroy(bcache_t * bcp)1093 bcache_destroy(bcache_t *bcp)
1094 {
1095 ASSERT(bcp != NULL);
1096
1097 mutex_enter(&bcp->mutex);
1098 if (bcp->alloc == 0) {
1099 kmem_cache_destroy(bcp->dblk_cache);
1100 kmem_cache_destroy(bcp->buffer_cache);
1101 mutex_exit(&bcp->mutex);
1102 mutex_destroy(&bcp->mutex);
1103 kmem_free(bcp, sizeof (bcache_t));
1104 } else {
1105 bcp->destroy++;
1106 mutex_exit(&bcp->mutex);
1107 }
1108 }
1109
1110 /*ARGSUSED*/
1111 mblk_t *
bcache_allocb(bcache_t * bcp,uint_t pri)1112 bcache_allocb(bcache_t *bcp, uint_t pri)
1113 {
1114 dblk_t *dbp;
1115 mblk_t *mp = NULL;
1116
1117 ASSERT(bcp != NULL);
1118
1119 mutex_enter(&bcp->mutex);
1120 if (bcp->destroy != 0) {
1121 mutex_exit(&bcp->mutex);
1122 goto out;
1123 }
1124
1125 if ((dbp = kmem_cache_alloc(bcp->dblk_cache, KM_NOSLEEP)) == NULL) {
1126 mutex_exit(&bcp->mutex);
1127 goto out;
1128 }
1129 bcp->alloc++;
1130 mutex_exit(&bcp->mutex);
1131
1132 ASSERT(((uintptr_t)(dbp->db_base) & (bcp->align - 1)) == 0);
1133
1134 mp = dbp->db_mblk;
1135 DBLK_RTFU_WORD(dbp) = DBLK_RTFU(1, M_DATA, 0, 0);
1136 mp->b_next = mp->b_prev = mp->b_cont = NULL;
1137 mp->b_rptr = mp->b_wptr = dbp->db_base;
1138 mp->b_queue = NULL;
1139 MBLK_BAND_FLAG_WORD(mp) = 0;
1140 STR_FTALLOC(&dbp->db_fthdr, FTEV_BCALLOCB, bcp->size);
1141 out:
1142 FTRACE_1("bcache_allocb(): mp=0x%p", (uintptr_t)mp);
1143
1144 return (mp);
1145 }
1146
1147 static void
dblk_lastfree_oversize(mblk_t * mp,dblk_t * dbp)1148 dblk_lastfree_oversize(mblk_t *mp, dblk_t *dbp)
1149 {
1150 ASSERT(dbp->db_mblk == mp);
1151 if (dbp->db_fthdr != NULL)
1152 str_ftfree(dbp);
1153
1154 /* set credp and projid to be 'unspecified' before returning to cache */
1155 if (dbp->db_credp != NULL) {
1156 crfree(dbp->db_credp);
1157 dbp->db_credp = NULL;
1158 }
1159 dbp->db_cpid = -1;
1160 dbp->db_struioflag = 0;
1161 dbp->db_struioun.cksum.flags = 0;
1162
1163 kmem_free(dbp->db_base, dbp->db_lim - dbp->db_base);
1164 kmem_cache_free(dbp->db_cache, dbp);
1165 }
1166
1167 static mblk_t *
allocb_oversize(size_t size,int kmflags)1168 allocb_oversize(size_t size, int kmflags)
1169 {
1170 mblk_t *mp;
1171 void *buf;
1172
1173 size = P2ROUNDUP(size, DBLK_CACHE_ALIGN);
1174 if ((buf = kmem_alloc(size, kmflags)) == NULL)
1175 return (NULL);
1176 if ((mp = gesballoc(buf, size, DBLK_RTFU(1, M_DATA, 0, 0),
1177 &frnop, dblk_lastfree_oversize, kmflags)) == NULL)
1178 kmem_free(buf, size);
1179
1180 if (mp != NULL)
1181 STR_FTALLOC(&DB_FTHDR(mp), FTEV_ALLOCBIG, size);
1182
1183 return (mp);
1184 }
1185
1186 mblk_t *
allocb_tryhard(size_t target_size)1187 allocb_tryhard(size_t target_size)
1188 {
1189 size_t size;
1190 mblk_t *bp;
1191
1192 for (size = target_size; size < target_size + 512;
1193 size += DBLK_CACHE_ALIGN)
1194 if ((bp = allocb(size, BPRI_HI)) != NULL)
1195 return (bp);
1196 allocb_tryhard_fails++;
1197 return (NULL);
1198 }
1199
1200 /*
1201 * This routine is consolidation private for STREAMS internal use
1202 * This routine may only be called from sync routines (i.e., not
1203 * from put or service procedures). It is located here (rather
1204 * than strsubr.c) so that we don't have to expose all of the
1205 * allocb() implementation details in header files.
1206 */
1207 mblk_t *
allocb_wait(size_t size,uint_t pri,uint_t flags,int * error)1208 allocb_wait(size_t size, uint_t pri, uint_t flags, int *error)
1209 {
1210 dblk_t *dbp;
1211 mblk_t *mp;
1212 size_t index;
1213
1214 index = (size -1) >> DBLK_SIZE_SHIFT;
1215
1216 if (flags & STR_NOSIG) {
1217 if (index >= (DBLK_MAX_CACHE >> DBLK_SIZE_SHIFT)) {
1218 if (size != 0) {
1219 mp = allocb_oversize(size, KM_SLEEP);
1220 FTRACE_1("allocb_wait (NOSIG): mp=0x%lx",
1221 (uintptr_t)mp);
1222 return (mp);
1223 }
1224 index = 0;
1225 }
1226
1227 dbp = kmem_cache_alloc(dblk_cache[index], KM_SLEEP);
1228 mp = dbp->db_mblk;
1229 DBLK_RTFU_WORD(dbp) = DBLK_RTFU(1, M_DATA, 0, 0);
1230 mp->b_next = mp->b_prev = mp->b_cont = NULL;
1231 mp->b_rptr = mp->b_wptr = dbp->db_base;
1232 mp->b_queue = NULL;
1233 MBLK_BAND_FLAG_WORD(mp) = 0;
1234 STR_FTALLOC(&DB_FTHDR(mp), FTEV_ALLOCBW, size);
1235
1236 FTRACE_1("allocb_wait (NOSIG): mp=0x%lx", (uintptr_t)mp);
1237
1238 } else {
1239 while ((mp = allocb(size, pri)) == NULL) {
1240 if ((*error = strwaitbuf(size, BPRI_HI)) != 0)
1241 return (NULL);
1242 }
1243 }
1244
1245 return (mp);
1246 }
1247
1248 /*
1249 * Call function 'func' with 'arg' when a class zero block can
1250 * be allocated with priority 'pri'.
1251 */
1252 bufcall_id_t
esbbcall(uint_t pri,void (* func)(void *),void * arg)1253 esbbcall(uint_t pri, void (*func)(void *), void *arg)
1254 {
1255 return (bufcall(1, pri, func, arg));
1256 }
1257
1258 /*
1259 * Allocates an iocblk (M_IOCTL) block. Properly sets the credentials
1260 * ioc_id, rval and error of the struct ioctl to set up an ioctl call.
1261 * This provides consistency for all internal allocators of ioctl.
1262 */
1263 mblk_t *
mkiocb(uint_t cmd)1264 mkiocb(uint_t cmd)
1265 {
1266 struct iocblk *ioc;
1267 mblk_t *mp;
1268
1269 /*
1270 * Allocate enough space for any of the ioctl related messages.
1271 */
1272 if ((mp = allocb(sizeof (union ioctypes), BPRI_MED)) == NULL)
1273 return (NULL);
1274
1275 bzero(mp->b_rptr, sizeof (union ioctypes));
1276
1277 /*
1278 * Set the mblk_t information and ptrs correctly.
1279 */
1280 mp->b_wptr += sizeof (struct iocblk);
1281 mp->b_datap->db_type = M_IOCTL;
1282
1283 /*
1284 * Fill in the fields.
1285 */
1286 ioc = (struct iocblk *)mp->b_rptr;
1287 ioc->ioc_cmd = cmd;
1288 ioc->ioc_cr = kcred;
1289 ioc->ioc_id = getiocseqno();
1290 ioc->ioc_flag = IOC_NATIVE;
1291 return (mp);
1292 }
1293
1294 /*
1295 * test if block of given size can be allocated with a request of
1296 * the given priority.
1297 * 'pri' is no longer used, but is retained for compatibility.
1298 */
1299 /* ARGSUSED */
1300 int
testb(size_t size,uint_t pri)1301 testb(size_t size, uint_t pri)
1302 {
1303 return ((size + sizeof (dblk_t)) <= kmem_avail());
1304 }
1305
1306 /*
1307 * Call function 'func' with argument 'arg' when there is a reasonably
1308 * good chance that a block of size 'size' can be allocated.
1309 * 'pri' is no longer used, but is retained for compatibility.
1310 */
1311 /* ARGSUSED */
1312 bufcall_id_t
bufcall(size_t size,uint_t pri,void (* func)(void *),void * arg)1313 bufcall(size_t size, uint_t pri, void (*func)(void *), void *arg)
1314 {
1315 static long bid = 1; /* always odd to save checking for zero */
1316 bufcall_id_t bc_id;
1317 struct strbufcall *bcp;
1318
1319 if ((bcp = kmem_alloc(sizeof (strbufcall_t), KM_NOSLEEP)) == NULL)
1320 return (0);
1321
1322 bcp->bc_func = func;
1323 bcp->bc_arg = arg;
1324 bcp->bc_size = size;
1325 bcp->bc_next = NULL;
1326 bcp->bc_executor = NULL;
1327
1328 mutex_enter(&strbcall_lock);
1329 /*
1330 * After bcp is linked into strbcalls and strbcall_lock is dropped there
1331 * should be no references to bcp since it may be freed by
1332 * runbufcalls(). Since bcp_id field is returned, we save its value in
1333 * the local var.
1334 */
1335 bc_id = bcp->bc_id = (bufcall_id_t)(bid += 2); /* keep it odd */
1336
1337 /*
1338 * add newly allocated stream event to existing
1339 * linked list of events.
1340 */
1341 if (strbcalls.bc_head == NULL) {
1342 strbcalls.bc_head = strbcalls.bc_tail = bcp;
1343 } else {
1344 strbcalls.bc_tail->bc_next = bcp;
1345 strbcalls.bc_tail = bcp;
1346 }
1347
1348 cv_signal(&strbcall_cv);
1349 mutex_exit(&strbcall_lock);
1350 return (bc_id);
1351 }
1352
1353 /*
1354 * Cancel a bufcall request.
1355 */
1356 void
unbufcall(bufcall_id_t id)1357 unbufcall(bufcall_id_t id)
1358 {
1359 strbufcall_t *bcp, *pbcp;
1360
1361 mutex_enter(&strbcall_lock);
1362 again:
1363 pbcp = NULL;
1364 for (bcp = strbcalls.bc_head; bcp; bcp = bcp->bc_next) {
1365 if (id == bcp->bc_id)
1366 break;
1367 pbcp = bcp;
1368 }
1369 if (bcp) {
1370 if (bcp->bc_executor != NULL) {
1371 if (bcp->bc_executor != curthread) {
1372 cv_wait(&bcall_cv, &strbcall_lock);
1373 goto again;
1374 }
1375 } else {
1376 if (pbcp)
1377 pbcp->bc_next = bcp->bc_next;
1378 else
1379 strbcalls.bc_head = bcp->bc_next;
1380 if (bcp == strbcalls.bc_tail)
1381 strbcalls.bc_tail = pbcp;
1382 kmem_free(bcp, sizeof (strbufcall_t));
1383 }
1384 }
1385 mutex_exit(&strbcall_lock);
1386 }
1387
1388 /*
1389 * Duplicate a message block by block (uses dupb), returning
1390 * a pointer to the duplicate message.
1391 * Returns a non-NULL value only if the entire message
1392 * was dup'd.
1393 */
1394 mblk_t *
dupmsg(mblk_t * bp)1395 dupmsg(mblk_t *bp)
1396 {
1397 mblk_t *head, *nbp;
1398
1399 if (!bp || !(nbp = head = dupb(bp)))
1400 return (NULL);
1401
1402 while (bp->b_cont) {
1403 if (!(nbp->b_cont = dupb(bp->b_cont))) {
1404 freemsg(head);
1405 return (NULL);
1406 }
1407 nbp = nbp->b_cont;
1408 bp = bp->b_cont;
1409 }
1410 return (head);
1411 }
1412
1413 #define DUPB_NOLOAN(bp) \
1414 ((((bp)->b_datap->db_struioflag & STRUIO_ZC) != 0) ? \
1415 copyb((bp)) : dupb((bp)))
1416
1417 mblk_t *
dupmsg_noloan(mblk_t * bp)1418 dupmsg_noloan(mblk_t *bp)
1419 {
1420 mblk_t *head, *nbp;
1421
1422 if (bp == NULL || DB_TYPE(bp) != M_DATA ||
1423 ((nbp = head = DUPB_NOLOAN(bp)) == NULL))
1424 return (NULL);
1425
1426 while (bp->b_cont) {
1427 if ((nbp->b_cont = DUPB_NOLOAN(bp->b_cont)) == NULL) {
1428 freemsg(head);
1429 return (NULL);
1430 }
1431 nbp = nbp->b_cont;
1432 bp = bp->b_cont;
1433 }
1434 return (head);
1435 }
1436
1437 /*
1438 * Copy data from message and data block to newly allocated message and
1439 * data block. Returns new message block pointer, or NULL if error.
1440 * The alignment of rptr (w.r.t. word alignment) will be the same in the copy
1441 * as in the original even when db_base is not word aligned. (bug 1052877)
1442 */
1443 mblk_t *
copyb(mblk_t * bp)1444 copyb(mblk_t *bp)
1445 {
1446 mblk_t *nbp;
1447 dblk_t *dp, *ndp;
1448 uchar_t *base;
1449 size_t size;
1450 size_t unaligned;
1451
1452 ASSERT(bp->b_wptr >= bp->b_rptr);
1453
1454 dp = bp->b_datap;
1455 if (dp->db_fthdr != NULL)
1456 STR_FTEVENT_MBLK(bp, caller(), FTEV_COPYB, 0);
1457
1458 size = dp->db_lim - dp->db_base;
1459 unaligned = P2PHASE((uintptr_t)dp->db_base, sizeof (uint_t));
1460 if ((nbp = allocb_tmpl(size + unaligned, bp)) == NULL)
1461 return (NULL);
1462 nbp->b_flag = bp->b_flag;
1463 nbp->b_band = bp->b_band;
1464 ndp = nbp->b_datap;
1465
1466 /*
1467 * Copy the various checksum information that came in
1468 * originally.
1469 */
1470 ndp->db_cksumstart = dp->db_cksumstart;
1471 ndp->db_cksumend = dp->db_cksumend;
1472 ndp->db_cksumstuff = dp->db_cksumstuff;
1473 bcopy(dp->db_struioun.data, ndp->db_struioun.data,
1474 sizeof (dp->db_struioun.data));
1475
1476 /*
1477 * Well, here is a potential issue. If we are trying to
1478 * trace a flow, and we copy the message, we might lose
1479 * information about where this message might have been.
1480 * So we should inherit the FT data. On the other hand,
1481 * a user might be interested only in alloc to free data.
1482 * So I guess the real answer is to provide a tunable.
1483 */
1484 STR_FTEVENT_MBLK(nbp, caller(), FTEV_COPYB, 1);
1485
1486 base = ndp->db_base + unaligned;
1487 bcopy(dp->db_base, ndp->db_base + unaligned, size);
1488
1489 nbp->b_rptr = base + (bp->b_rptr - dp->db_base);
1490 nbp->b_wptr = nbp->b_rptr + MBLKL(bp);
1491
1492 return (nbp);
1493 }
1494
1495 /*
1496 * Copy data from message to newly allocated message using new
1497 * data blocks. Returns a pointer to the new message, or NULL if error.
1498 */
1499 mblk_t *
copymsg(mblk_t * bp)1500 copymsg(mblk_t *bp)
1501 {
1502 mblk_t *head, *nbp;
1503
1504 if (!bp || !(nbp = head = copyb(bp)))
1505 return (NULL);
1506
1507 while (bp->b_cont) {
1508 if (!(nbp->b_cont = copyb(bp->b_cont))) {
1509 freemsg(head);
1510 return (NULL);
1511 }
1512 nbp = nbp->b_cont;
1513 bp = bp->b_cont;
1514 }
1515 return (head);
1516 }
1517
1518 /*
1519 * link a message block to tail of message
1520 */
1521 void
linkb(mblk_t * mp,mblk_t * bp)1522 linkb(mblk_t *mp, mblk_t *bp)
1523 {
1524 ASSERT(mp && bp);
1525
1526 for (; mp->b_cont; mp = mp->b_cont)
1527 ;
1528 mp->b_cont = bp;
1529 }
1530
1531 /*
1532 * unlink a message block from head of message
1533 * return pointer to new message.
1534 * NULL if message becomes empty.
1535 */
1536 mblk_t *
unlinkb(mblk_t * bp)1537 unlinkb(mblk_t *bp)
1538 {
1539 mblk_t *bp1;
1540
1541 bp1 = bp->b_cont;
1542 bp->b_cont = NULL;
1543 return (bp1);
1544 }
1545
1546 /*
1547 * remove a message block "bp" from message "mp"
1548 *
1549 * Return pointer to new message or NULL if no message remains.
1550 * Return -1 if bp is not found in message.
1551 */
1552 mblk_t *
rmvb(mblk_t * mp,mblk_t * bp)1553 rmvb(mblk_t *mp, mblk_t *bp)
1554 {
1555 mblk_t *tmp;
1556 mblk_t *lastp = NULL;
1557
1558 ASSERT(mp && bp);
1559 for (tmp = mp; tmp; tmp = tmp->b_cont) {
1560 if (tmp == bp) {
1561 if (lastp)
1562 lastp->b_cont = tmp->b_cont;
1563 else
1564 mp = tmp->b_cont;
1565 tmp->b_cont = NULL;
1566 return (mp);
1567 }
1568 lastp = tmp;
1569 }
1570 return ((mblk_t *)-1);
1571 }
1572
1573 /*
1574 * Concatenate and align first len bytes of common
1575 * message type. Len == -1, means concat everything.
1576 * Returns 1 on success, 0 on failure
1577 * After the pullup, mp points to the pulled up data.
1578 */
1579 int
pullupmsg(mblk_t * mp,ssize_t len)1580 pullupmsg(mblk_t *mp, ssize_t len)
1581 {
1582 mblk_t *bp, *b_cont;
1583 dblk_t *dbp;
1584 ssize_t n;
1585
1586 ASSERT(mp->b_datap->db_ref > 0);
1587 ASSERT(mp->b_next == NULL && mp->b_prev == NULL);
1588
1589 if (len == -1) {
1590 if (mp->b_cont == NULL && str_aligned(mp->b_rptr))
1591 return (1);
1592 len = xmsgsize(mp);
1593 } else {
1594 ssize_t first_mblk_len = mp->b_wptr - mp->b_rptr;
1595 ASSERT(first_mblk_len >= 0);
1596 /*
1597 * If the length is less than that of the first mblk,
1598 * we want to pull up the message into an aligned mblk.
1599 * Though not part of the spec, some callers assume it.
1600 */
1601 if (len <= first_mblk_len) {
1602 if (str_aligned(mp->b_rptr))
1603 return (1);
1604 len = first_mblk_len;
1605 } else if (xmsgsize(mp) < len)
1606 return (0);
1607 }
1608
1609 if ((bp = allocb_tmpl(len, mp)) == NULL)
1610 return (0);
1611
1612 dbp = bp->b_datap;
1613 *bp = *mp; /* swap mblks so bp heads the old msg... */
1614 mp->b_datap = dbp; /* ... and mp heads the new message */
1615 mp->b_datap->db_mblk = mp;
1616 bp->b_datap->db_mblk = bp;
1617 mp->b_rptr = mp->b_wptr = dbp->db_base;
1618
1619 do {
1620 ASSERT(bp->b_datap->db_ref > 0);
1621 ASSERT(bp->b_wptr >= bp->b_rptr);
1622 n = MIN(bp->b_wptr - bp->b_rptr, len);
1623 ASSERT(n >= 0); /* allow zero-length mblk_t's */
1624 if (n > 0)
1625 bcopy(bp->b_rptr, mp->b_wptr, (size_t)n);
1626 mp->b_wptr += n;
1627 bp->b_rptr += n;
1628 len -= n;
1629 if (bp->b_rptr != bp->b_wptr)
1630 break;
1631 b_cont = bp->b_cont;
1632 freeb(bp);
1633 bp = b_cont;
1634 } while (len && bp);
1635
1636 mp->b_cont = bp; /* tack on whatever wasn't pulled up */
1637
1638 return (1);
1639 }
1640
1641 /*
1642 * Concatenate and align at least the first len bytes of common message
1643 * type. Len == -1 means concatenate everything. The original message is
1644 * unaltered. Returns a pointer to a new message on success, otherwise
1645 * returns NULL.
1646 */
1647 mblk_t *
msgpullup(mblk_t * mp,ssize_t len)1648 msgpullup(mblk_t *mp, ssize_t len)
1649 {
1650 return (msgpullup_pad(mp, len, 0));
1651 }
1652
1653 /*
1654 * Concatenate and align at least the first len bytes of common message
1655 * type, leaving `pad` bytes of headroom before the message.
1656 *
1657 * `len` == -1 means concatenate everything. The original message is
1658 * unaltered. Returns a pointer to a new message on success, otherwise
1659 * returns NULL.
1660 */
1661 mblk_t *
msgpullup_pad(mblk_t * mp,ssize_t len,size_t pad)1662 msgpullup_pad(mblk_t *mp, ssize_t len, size_t pad)
1663 {
1664 mblk_t *newmp;
1665 ssize_t totlen = xmsgsize(mp);
1666 ssize_t offset = 0;
1667
1668 if (len == -1)
1669 len = totlen;
1670
1671 if (len < 0 || (len > 0 && len > totlen))
1672 return (NULL);
1673
1674 /* Now that len is normalised (>0), check for overflow including pad. */
1675 const size_t alloc_sz = (size_t)len + pad;
1676 if (alloc_sz < (size_t)len ||
1677 (newmp = allocb_tmpl(alloc_sz, mp)) == NULL) {
1678 return (NULL);
1679 }
1680
1681 newmp->b_flag = mp->b_flag;
1682 newmp->b_band = mp->b_band;
1683 newmp->b_wptr += pad;
1684 newmp->b_rptr = newmp->b_wptr;
1685
1686 while (len > 0) {
1687 ssize_t seglen = MBLKL(mp);
1688 ssize_t n = MIN(seglen, len);
1689
1690 ASSERT3P(mp, !=, NULL); /* guaranteed by len <= totlen */
1691 ASSERT3S(n, >=, 0); /* allow zero-length mblk_t's */
1692 if (n > 0)
1693 bcopy(mp->b_rptr, newmp->b_wptr, n);
1694 newmp->b_wptr += n;
1695 len -= n;
1696
1697 if (n == seglen)
1698 mp = mp->b_cont;
1699 else if (len == 0)
1700 offset = n;
1701 }
1702 ASSERT3S(len, ==, 0);
1703
1704 if (mp != NULL) {
1705 newmp->b_cont = dupmsg(mp);
1706 if (newmp->b_cont == NULL) {
1707 freemsg(newmp);
1708 return (NULL);
1709 }
1710 ASSERT3S(offset, >=, 0);
1711 ASSERT3U(MBLKL(newmp->b_cont), >=, offset);
1712 newmp->b_cont->b_rptr += offset;
1713 }
1714
1715 return (newmp);
1716 }
1717
1718 /*
1719 * Trim bytes from message
1720 * len > 0, trim from head
1721 * len < 0, trim from tail
1722 * Returns 1 on success, 0 on failure.
1723 */
1724 int
adjmsg(mblk_t * mp,ssize_t len)1725 adjmsg(mblk_t *mp, ssize_t len)
1726 {
1727 mblk_t *bp;
1728 mblk_t *save_bp = NULL;
1729 mblk_t *prev_bp;
1730 mblk_t *bcont;
1731 unsigned char type;
1732 ssize_t n;
1733 int fromhead;
1734 int first;
1735
1736 ASSERT(mp != NULL);
1737
1738 if (len < 0) {
1739 fromhead = 0;
1740 len = -len;
1741 } else {
1742 fromhead = 1;
1743 }
1744
1745 if (xmsgsize(mp) < len)
1746 return (0);
1747
1748 if (fromhead) {
1749 first = 1;
1750 while (len) {
1751 ASSERT(mp->b_wptr >= mp->b_rptr);
1752 n = MIN(mp->b_wptr - mp->b_rptr, len);
1753 mp->b_rptr += n;
1754 len -= n;
1755
1756 /*
1757 * If this is not the first zero length
1758 * message remove it
1759 */
1760 if (!first && (mp->b_wptr == mp->b_rptr)) {
1761 bcont = mp->b_cont;
1762 freeb(mp);
1763 mp = save_bp->b_cont = bcont;
1764 } else {
1765 save_bp = mp;
1766 mp = mp->b_cont;
1767 }
1768 first = 0;
1769 }
1770 } else {
1771 type = mp->b_datap->db_type;
1772 while (len) {
1773 bp = mp;
1774 save_bp = NULL;
1775
1776 /*
1777 * Find the last message of same type
1778 */
1779 while (bp && bp->b_datap->db_type == type) {
1780 ASSERT(bp->b_wptr >= bp->b_rptr);
1781 prev_bp = save_bp;
1782 save_bp = bp;
1783 bp = bp->b_cont;
1784 }
1785 if (save_bp == NULL)
1786 break;
1787 n = MIN(save_bp->b_wptr - save_bp->b_rptr, len);
1788 save_bp->b_wptr -= n;
1789 len -= n;
1790
1791 /*
1792 * If this is not the first message
1793 * and we have taken away everything
1794 * from this message, remove it
1795 */
1796
1797 if ((save_bp != mp) &&
1798 (save_bp->b_wptr == save_bp->b_rptr)) {
1799 bcont = save_bp->b_cont;
1800 freeb(save_bp);
1801 prev_bp->b_cont = bcont;
1802 }
1803 }
1804 }
1805 return (1);
1806 }
1807
1808 /*
1809 * get number of data bytes in message
1810 */
1811 size_t
msgdsize(mblk_t * bp)1812 msgdsize(mblk_t *bp)
1813 {
1814 size_t count = 0;
1815
1816 for (; bp; bp = bp->b_cont)
1817 if (bp->b_datap->db_type == M_DATA) {
1818 ASSERT(bp->b_wptr >= bp->b_rptr);
1819 count += bp->b_wptr - bp->b_rptr;
1820 }
1821 return (count);
1822 }
1823
1824 /*
1825 * Get a message off head of queue
1826 *
1827 * If queue has no buffers then mark queue
1828 * with QWANTR. (queue wants to be read by
1829 * someone when data becomes available)
1830 *
1831 * If there is something to take off then do so.
1832 * If queue falls below hi water mark turn off QFULL
1833 * flag. Decrement weighted count of queue.
1834 * Also turn off QWANTR because queue is being read.
1835 *
1836 * The queue count is maintained on a per-band basis.
1837 * Priority band 0 (normal messages) uses q_count,
1838 * q_lowat, etc. Non-zero priority bands use the
1839 * fields in their respective qband structures
1840 * (qb_count, qb_lowat, etc.) All messages appear
1841 * on the same list, linked via their b_next pointers.
1842 * q_first is the head of the list. q_count does
1843 * not reflect the size of all the messages on the
1844 * queue. It only reflects those messages in the
1845 * normal band of flow. The one exception to this
1846 * deals with high priority messages. They are in
1847 * their own conceptual "band", but are accounted
1848 * against q_count.
1849 *
1850 * If queue count is below the lo water mark and QWANTW
1851 * is set, enable the closest backq which has a service
1852 * procedure and turn off the QWANTW flag.
1853 *
1854 * getq could be built on top of rmvq, but isn't because
1855 * of performance considerations.
1856 *
1857 * A note on the use of q_count and q_mblkcnt:
1858 * q_count is the traditional byte count for messages that
1859 * have been put on a queue. Documentation tells us that
1860 * we shouldn't rely on that count, but some drivers/modules
1861 * do. What was needed, however, is a mechanism to prevent
1862 * runaway streams from consuming all of the resources,
1863 * and particularly be able to flow control zero-length
1864 * messages. q_mblkcnt is used for this purpose. It
1865 * counts the number of mblk's that are being put on
1866 * the queue. The intention here, is that each mblk should
1867 * contain one byte of data and, for the purpose of
1868 * flow-control, logically does. A queue will become
1869 * full when EITHER of these values (q_count and q_mblkcnt)
1870 * reach the highwater mark. It will clear when BOTH
1871 * of them drop below the highwater mark. And it will
1872 * backenable when BOTH of them drop below the lowwater
1873 * mark.
1874 * With this algorithm, a driver/module might be able
1875 * to find a reasonably accurate q_count, and the
1876 * framework can still try and limit resource usage.
1877 */
1878 mblk_t *
getq(queue_t * q)1879 getq(queue_t *q)
1880 {
1881 mblk_t *bp;
1882 uchar_t band = 0;
1883
1884 bp = getq_noenab(q, 0);
1885 if (bp != NULL)
1886 band = bp->b_band;
1887
1888 /*
1889 * Inlined from qbackenable().
1890 * Quick check without holding the lock.
1891 */
1892 if (band == 0 && (q->q_flag & (QWANTW|QWANTWSYNC)) == 0)
1893 return (bp);
1894
1895 qbackenable(q, band);
1896 return (bp);
1897 }
1898
1899 /*
1900 * Returns the number of bytes in a message (a message is defined as a
1901 * chain of mblks linked by b_cont). If a non-NULL mblkcnt is supplied we
1902 * also return the number of distinct mblks in the message.
1903 */
1904 int
mp_cont_len(mblk_t * bp,int * mblkcnt)1905 mp_cont_len(mblk_t *bp, int *mblkcnt)
1906 {
1907 mblk_t *mp;
1908 int mblks = 0;
1909 int bytes = 0;
1910
1911 for (mp = bp; mp != NULL; mp = mp->b_cont) {
1912 bytes += MBLKL(mp);
1913 mblks++;
1914 }
1915
1916 if (mblkcnt != NULL)
1917 *mblkcnt = mblks;
1918
1919 return (bytes);
1920 }
1921
/*
 * Like getq() but does not backenable.  This is used by the stream
 * head when a putback() is likely.  The caller must call qbackenable()
 * after it is done with accessing the queue.
 * The rbytes argument to getq_noenab() allows callers to specify
 * the maximum number of bytes to return.  If the current amount on the
 * queue is less than this then the entire message will be returned.
 * A value of 0 returns the entire message and is equivalent to the old
 * default behaviour prior to the addition of the rbytes argument.
 *
 * Returns the message taken off the queue, or NULL (after setting
 * QWANTR) if the queue is empty.
 */
mblk_t *
getq_noenab(queue_t *q, ssize_t rbytes)
{
	mblk_t *bp, *mp1;
	mblk_t *mp2 = NULL;
	qband_t *qbp;
	kthread_id_t freezer;
	int bytecnt = 0, mblkcnt = 0;

	/* freezestr should allow its caller to call getq/putq */
	freezer = STREAM(q)->sd_freezer;
	if (freezer == curthread) {
		ASSERT(frozenstr(q));
		ASSERT(MUTEX_HELD(QLOCK(q)));
	} else
		mutex_enter(QLOCK(q));

	if ((bp = q->q_first) == 0) {
		/* Nothing queued: note that a reader is waiting. */
		q->q_flag |= QWANTR;
	} else {
		/*
		 * If the caller supplied a byte threshold and there is
		 * more than this amount on the queue then break up the
		 * message appropriately.  We can only safely do
		 * this for M_DATA messages.
		 */
		if ((DB_TYPE(bp) == M_DATA) && (rbytes > 0) &&
		    (q->q_count > rbytes)) {
			/*
			 * Inline version of mp_cont_len() which terminates
			 * when we meet or exceed rbytes.
			 */
			for (mp1 = bp; mp1 != NULL; mp1 = mp1->b_cont) {
				mblkcnt++;
				bytecnt += MBLKL(mp1);
				if (bytecnt >= rbytes)
					break;
			}
			/*
			 * We need to account for the following scenarios:
			 *
			 * 1) Too much data in the first message:
			 *	mp1 will be the mblk which puts us over our
			 *	byte limit.
			 * 2) Not enough data in the first message:
			 *	mp1 will be NULL.
			 * 3) Exactly the right amount of data contained within
			 *    whole mblks:
			 *	mp1->b_cont will be where we break the message.
			 */
			if (bytecnt > rbytes) {
				/*
				 * Dup/copy mp1 and put what we don't need
				 * back onto the queue. Adjust the read/write
				 * and continuation pointers appropriately
				 * and decrement the current mblk count to
				 * reflect we are putting an mblk back onto
				 * the queue.
				 * When adjusting the message pointers, it's
				 * OK to use the existing bytecnt and the
				 * requested amount (rbytes) to calculate the
				 * new write offset (b_wptr) of what we
				 * are taking. However, we cannot use these
				 * values when calculating the read offset of
				 * the mblk we are putting back on the queue.
				 * This is because the beginning (b_rptr) of the
				 * mblk represents some arbitrary point within
				 * the message.
				 * It's simplest to do this by advancing b_rptr
				 * by the new length of mp1 as we don't have to
				 * remember any intermediate state.
				 */
				ASSERT(mp1 != NULL);
				mblkcnt--;
				/* Try dupb() first; fall back to copyb(). */
				if ((mp2 = dupb(mp1)) == NULL &&
				    (mp2 = copyb(mp1)) == NULL) {
					bytecnt = mblkcnt = 0;
					goto dup_failed;
				}
				mp2->b_cont = mp1->b_cont;
				mp1->b_wptr -= bytecnt - rbytes;
				mp2->b_rptr += mp1->b_wptr - mp1->b_rptr;
				mp1->b_cont = NULL;
				bytecnt = rbytes;
			} else {
				/*
				 * Either there is not enough data in the first
				 * message or there is no excess data to deal
				 * with. If mp1 is NULL, we are taking the
				 * whole message. No need to do anything.
				 * Otherwise we assign mp1->b_cont to mp2 as
				 * we will be putting this back onto the head of
				 * the queue.
				 */
				if (mp1 != NULL) {
					mp2 = mp1->b_cont;
					mp1->b_cont = NULL;
				}
			}
			/*
			 * If mp2 is not NULL then we have part of the message
			 * to put back onto the queue.
			 */
			if (mp2 != NULL) {
				/* mp2 takes bp's place at the queue head. */
				if ((mp2->b_next = bp->b_next) == NULL)
					q->q_last = mp2;
				else
					bp->b_next->b_prev = mp2;
				q->q_first = mp2;
			} else {
				/* Nothing left over: unlink bp entirely. */
				if ((q->q_first = bp->b_next) == NULL)
					q->q_last = NULL;
				else
					q->q_first->b_prev = NULL;
			}
		} else {
			/*
			 * Either no byte threshold was supplied, there is
			 * not enough on the queue or we failed to
			 * duplicate/copy a data block. In these cases we
			 * just take the entire first message.
			 */
dup_failed:
			bytecnt = mp_cont_len(bp, &mblkcnt);
			if ((q->q_first = bp->b_next) == NULL)
				q->q_last = NULL;
			else
				q->q_first->b_prev = NULL;
		}
		/*
		 * Flow-control accounting: subtract what was taken from the
		 * counts of the band bp belongs to, and clear the "full"
		 * flag once the band is empty of mblks or both counts are
		 * below the high water mark.
		 */
		if (bp->b_band == 0) {
			q->q_count -= bytecnt;
			q->q_mblkcnt -= mblkcnt;
			if (q->q_mblkcnt == 0 || ((q->q_count < q->q_hiwat) &&
			    (q->q_mblkcnt < q->q_hiwat))) {
				q->q_flag &= ~QFULL;
			}
		} else {
			int i;

			ASSERT(bp->b_band <= q->q_nband);
			ASSERT(q->q_bandp != NULL);
			ASSERT(MUTEX_HELD(QLOCK(q)));
			/* Walk the qband list to the entry for bp's band. */
			qbp = q->q_bandp;
			i = bp->b_band;
			while (--i > 0)
				qbp = qbp->qb_next;
			/*
			 * NOTE(review): this band bookkeeping never refers
			 * to mp2.  When a remainder was left at the head of
			 * the queue above, qb_first is still cleared or
			 * moved on to bp->b_next -- confirm that this is
			 * intended for banded M_DATA messages.
			 */
			if (qbp->qb_first == qbp->qb_last) {
				qbp->qb_first = NULL;
				qbp->qb_last = NULL;
			} else {
				qbp->qb_first = bp->b_next;
			}
			qbp->qb_count -= bytecnt;
			qbp->qb_mblkcnt -= mblkcnt;
			if (qbp->qb_mblkcnt == 0 ||
			    ((qbp->qb_count < qbp->qb_hiwat) &&
			    (qbp->qb_mblkcnt < qbp->qb_hiwat))) {
				qbp->qb_flag &= ~QB_FULL;
			}
		}
		/* The queue is being read; detach bp from the queue list. */
		q->q_flag &= ~QWANTR;
		bp->b_next = NULL;
		bp->b_prev = NULL;
	}
	/* Only drop QLOCK if it was taken here (i.e. stream not frozen). */
	if (freezer != curthread)
		mutex_exit(QLOCK(q));

	STR_FTEVENT_MSG(bp, q, FTEV_GETQ, 0);

	return (bp);
}
2103
2104 /*
2105 * Determine if a backenable is needed after removing a message in the
2106 * specified band.
2107 * NOTE: This routine assumes that something like getq_noenab() has been
2108 * already called.
2109 *
2110 * For the read side it is ok to hold sd_lock across calling this (and the
2111 * stream head often does).
2112 * But for the write side strwakeq might be invoked and it acquires sd_lock.
2113 */
2114 void
qbackenable(queue_t * q,uchar_t band)2115 qbackenable(queue_t *q, uchar_t band)
2116 {
2117 int backenab = 0;
2118 qband_t *qbp;
2119 kthread_id_t freezer;
2120
2121 ASSERT(q);
2122 ASSERT((q->q_flag & QREADR) || MUTEX_NOT_HELD(&STREAM(q)->sd_lock));
2123
2124 /*
2125 * Quick check without holding the lock.
2126 * OK since after getq() has lowered the q_count these flags
2127 * would not change unless either the qbackenable() is done by
2128 * another thread (which is ok) or the queue has gotten QFULL
2129 * in which case another backenable will take place when the queue
2130 * drops below q_lowat.
2131 */
2132 if (band == 0 && (q->q_flag & (QWANTW|QWANTWSYNC)) == 0)
2133 return;
2134
2135 /* freezestr should allow its caller to call getq/putq */
2136 freezer = STREAM(q)->sd_freezer;
2137 if (freezer == curthread) {
2138 ASSERT(frozenstr(q));
2139 ASSERT(MUTEX_HELD(QLOCK(q)));
2140 } else
2141 mutex_enter(QLOCK(q));
2142
2143 if (band == 0) {
2144 if (q->q_lowat == 0 || (q->q_count < q->q_lowat &&
2145 q->q_mblkcnt < q->q_lowat)) {
2146 backenab = q->q_flag & (QWANTW|QWANTWSYNC);
2147 }
2148 } else {
2149 int i;
2150
2151 ASSERT((unsigned)band <= q->q_nband);
2152 ASSERT(q->q_bandp != NULL);
2153
2154 qbp = q->q_bandp;
2155 i = band;
2156 while (--i > 0)
2157 qbp = qbp->qb_next;
2158
2159 if (qbp->qb_lowat == 0 || (qbp->qb_count < qbp->qb_lowat &&
2160 qbp->qb_mblkcnt < qbp->qb_lowat)) {
2161 backenab = qbp->qb_flag & QB_WANTW;
2162 }
2163 }
2164
2165 if (backenab == 0) {
2166 if (freezer != curthread)
2167 mutex_exit(QLOCK(q));
2168 return;
2169 }
2170
2171 /* Have to drop the lock across strwakeq and backenable */
2172 if (backenab & QWANTWSYNC)
2173 q->q_flag &= ~QWANTWSYNC;
2174 if (backenab & (QWANTW|QB_WANTW)) {
2175 if (band != 0)
2176 qbp->qb_flag &= ~QB_WANTW;
2177 else {
2178 q->q_flag &= ~QWANTW;
2179 }
2180 }
2181
2182 if (freezer != curthread)
2183 mutex_exit(QLOCK(q));
2184
2185 if (backenab & QWANTWSYNC)
2186 strwakeq(q, QWANTWSYNC);
2187 if (backenab & (QWANTW|QB_WANTW))
2188 backenable(q, band);
2189 }
2190
2191 /*
2192 * Remove a message from a queue. The queue count and other
2193 * flow control parameters are adjusted and the back queue
2194 * enabled if necessary.
2195 *
2196 * rmvq can be called with the stream frozen, but other utility functions
2197 * holding QLOCK, and by streams modules without any locks/frozen.
2198 */
2199 void
rmvq(queue_t * q,mblk_t * mp)2200 rmvq(queue_t *q, mblk_t *mp)
2201 {
2202 ASSERT(mp != NULL);
2203
2204 rmvq_noenab(q, mp);
2205 if (curthread != STREAM(q)->sd_freezer && MUTEX_HELD(QLOCK(q))) {
2206 /*
2207 * qbackenable can handle a frozen stream but not a "random"
2208 * qlock being held. Drop lock across qbackenable.
2209 */
2210 mutex_exit(QLOCK(q));
2211 qbackenable(q, mp->b_band);
2212 mutex_enter(QLOCK(q));
2213 } else {
2214 qbackenable(q, mp->b_band);
2215 }
2216 }
2217
2218 /*
2219 * Like rmvq() but without any backenabling.
2220 * This exists to handle SR_CONSOL_DATA in strrput().
2221 */
2222 void
rmvq_noenab(queue_t * q,mblk_t * mp)2223 rmvq_noenab(queue_t *q, mblk_t *mp)
2224 {
2225 int i;
2226 qband_t *qbp = NULL;
2227 kthread_id_t freezer;
2228 int bytecnt = 0, mblkcnt = 0;
2229
2230 freezer = STREAM(q)->sd_freezer;
2231 if (freezer == curthread) {
2232 ASSERT(frozenstr(q));
2233 ASSERT(MUTEX_HELD(QLOCK(q)));
2234 } else if (MUTEX_HELD(QLOCK(q))) {
2235 /* Don't drop lock on exit */
2236 freezer = curthread;
2237 } else
2238 mutex_enter(QLOCK(q));
2239
2240 ASSERT(mp->b_band <= q->q_nband);
2241 if (mp->b_band != 0) { /* Adjust band pointers */
2242 ASSERT(q->q_bandp != NULL);
2243 qbp = q->q_bandp;
2244 i = mp->b_band;
2245 while (--i > 0)
2246 qbp = qbp->qb_next;
2247 if (mp == qbp->qb_first) {
2248 if (mp->b_next && mp->b_band == mp->b_next->b_band)
2249 qbp->qb_first = mp->b_next;
2250 else
2251 qbp->qb_first = NULL;
2252 }
2253 if (mp == qbp->qb_last) {
2254 if (mp->b_prev && mp->b_band == mp->b_prev->b_band)
2255 qbp->qb_last = mp->b_prev;
2256 else
2257 qbp->qb_last = NULL;
2258 }
2259 }
2260
2261 /*
2262 * Remove the message from the list.
2263 */
2264 if (mp->b_prev)
2265 mp->b_prev->b_next = mp->b_next;
2266 else
2267 q->q_first = mp->b_next;
2268 if (mp->b_next)
2269 mp->b_next->b_prev = mp->b_prev;
2270 else
2271 q->q_last = mp->b_prev;
2272 mp->b_next = NULL;
2273 mp->b_prev = NULL;
2274
2275 /* Get the size of the message for q_count accounting */
2276 bytecnt = mp_cont_len(mp, &mblkcnt);
2277
2278 if (mp->b_band == 0) { /* Perform q_count accounting */
2279 q->q_count -= bytecnt;
2280 q->q_mblkcnt -= mblkcnt;
2281 if (q->q_mblkcnt == 0 || ((q->q_count < q->q_hiwat) &&
2282 (q->q_mblkcnt < q->q_hiwat))) {
2283 q->q_flag &= ~QFULL;
2284 }
2285 } else { /* Perform qb_count accounting */
2286 qbp->qb_count -= bytecnt;
2287 qbp->qb_mblkcnt -= mblkcnt;
2288 if (qbp->qb_mblkcnt == 0 || ((qbp->qb_count < qbp->qb_hiwat) &&
2289 (qbp->qb_mblkcnt < qbp->qb_hiwat))) {
2290 qbp->qb_flag &= ~QB_FULL;
2291 }
2292 }
2293 if (freezer != curthread)
2294 mutex_exit(QLOCK(q));
2295
2296 STR_FTEVENT_MSG(mp, q, FTEV_RMVQ, 0);
2297 }
2298
2299 /*
2300 * Empty a queue.
2301 * If flag is set, remove all messages. Otherwise, remove
2302 * only non-control messages. If queue falls below its low
2303 * water mark, and QWANTW is set, enable the nearest upstream
2304 * service procedure.
2305 *
2306 * Historical note: when merging the M_FLUSH code in strrput with this
2307 * code one difference was discovered. flushq did not have a check
2308 * for q_lowat == 0 in the backenabling test.
2309 *
2310 * pcproto_flag specifies whether or not a M_PCPROTO message should be flushed
2311 * if one exists on the queue.
2312 */
2313 void
flushq_common(queue_t * q,int flag,int pcproto_flag)2314 flushq_common(queue_t *q, int flag, int pcproto_flag)
2315 {
2316 mblk_t *mp, *nmp;
2317 qband_t *qbp;
2318 int backenab = 0;
2319 unsigned char bpri;
2320 unsigned char qbf[NBAND]; /* band flushing backenable flags */
2321
2322 if (q->q_first == NULL)
2323 return;
2324
2325 mutex_enter(QLOCK(q));
2326 mp = q->q_first;
2327 q->q_first = NULL;
2328 q->q_last = NULL;
2329 q->q_count = 0;
2330 q->q_mblkcnt = 0;
2331 for (qbp = q->q_bandp; qbp; qbp = qbp->qb_next) {
2332 qbp->qb_first = NULL;
2333 qbp->qb_last = NULL;
2334 qbp->qb_count = 0;
2335 qbp->qb_mblkcnt = 0;
2336 qbp->qb_flag &= ~QB_FULL;
2337 }
2338 q->q_flag &= ~QFULL;
2339 mutex_exit(QLOCK(q));
2340 while (mp) {
2341 nmp = mp->b_next;
2342 mp->b_next = mp->b_prev = NULL;
2343
2344 STR_FTEVENT_MBLK(mp, q, FTEV_FLUSHQ, 0);
2345
2346 if (pcproto_flag && (mp->b_datap->db_type == M_PCPROTO))
2347 (void) putq(q, mp);
2348 else if (flag || datamsg(mp->b_datap->db_type))
2349 freemsg(mp);
2350 else
2351 (void) putq(q, mp);
2352 mp = nmp;
2353 }
2354 bpri = 1;
2355 mutex_enter(QLOCK(q));
2356 for (qbp = q->q_bandp; qbp; qbp = qbp->qb_next) {
2357 if ((qbp->qb_flag & QB_WANTW) &&
2358 (((qbp->qb_count < qbp->qb_lowat) &&
2359 (qbp->qb_mblkcnt < qbp->qb_lowat)) ||
2360 qbp->qb_lowat == 0)) {
2361 qbp->qb_flag &= ~QB_WANTW;
2362 backenab = 1;
2363 qbf[bpri] = 1;
2364 } else
2365 qbf[bpri] = 0;
2366 bpri++;
2367 }
2368 ASSERT(bpri == (unsigned char)(q->q_nband + 1));
2369 if ((q->q_flag & QWANTW) &&
2370 (((q->q_count < q->q_lowat) &&
2371 (q->q_mblkcnt < q->q_lowat)) || q->q_lowat == 0)) {
2372 q->q_flag &= ~QWANTW;
2373 backenab = 1;
2374 qbf[0] = 1;
2375 } else
2376 qbf[0] = 0;
2377
2378 /*
2379 * If any band can now be written to, and there is a writer
2380 * for that band, then backenable the closest service procedure.
2381 */
2382 if (backenab) {
2383 mutex_exit(QLOCK(q));
2384 for (bpri = q->q_nband; bpri != 0; bpri--)
2385 if (qbf[bpri])
2386 backenable(q, bpri);
2387 if (qbf[0])
2388 backenable(q, 0);
2389 } else
2390 mutex_exit(QLOCK(q));
2391 }
2392
/*
 * Flush a queue.
 *
 * The real work is done by flushq_common(), which takes an extra flag
 * saying whether an M_PCPROTO message should be preserved on the queue.
 * This entry point always passes 0 for that flag, so M_PCPROTO messages
 * receive no special treatment here.  Currently the only place that uses
 * the flag is the stream head.
 *
 * q	- queue to flush
 * flag	- passed through unchanged to flushq_common()
 */
void
flushq(queue_t *q, int flag)
{
	flushq_common(q, flag, 0);
}
2403
2404 /*
2405 * Flush the queue of messages of the given priority band.
2406 * There is some duplication of code between flushq and flushband.
2407 * This is because we want to optimize the code as much as possible.
2408 * The assumption is that there will be more messages in the normal
2409 * (priority 0) band than in any other.
2410 *
2411 * Historical note: when merging the M_FLUSH code in strrput with this
2412 * code one difference was discovered. flushband had an extra check for
2413 * did not have a check for (mp->b_datap->db_type < QPCTL) in the band 0
2414 * case. That check does not match the man page for flushband and was not
2415 * in the strrput flush code hence it was removed.
2416 */
2417 void
flushband(queue_t * q,unsigned char pri,int flag)2418 flushband(queue_t *q, unsigned char pri, int flag)
2419 {
2420 mblk_t *mp;
2421 mblk_t *nmp;
2422 mblk_t *last;
2423 qband_t *qbp;
2424 int band;
2425
2426 ASSERT((flag == FLUSHDATA) || (flag == FLUSHALL));
2427 if (pri > q->q_nband) {
2428 return;
2429 }
2430 mutex_enter(QLOCK(q));
2431 if (pri == 0) {
2432 mp = q->q_first;
2433 q->q_first = NULL;
2434 q->q_last = NULL;
2435 q->q_count = 0;
2436 q->q_mblkcnt = 0;
2437 for (qbp = q->q_bandp; qbp; qbp = qbp->qb_next) {
2438 qbp->qb_first = NULL;
2439 qbp->qb_last = NULL;
2440 qbp->qb_count = 0;
2441 qbp->qb_mblkcnt = 0;
2442 qbp->qb_flag &= ~QB_FULL;
2443 }
2444 q->q_flag &= ~QFULL;
2445 mutex_exit(QLOCK(q));
2446 while (mp) {
2447 nmp = mp->b_next;
2448 mp->b_next = mp->b_prev = NULL;
2449 if ((mp->b_band == 0) &&
2450 ((flag == FLUSHALL) ||
2451 datamsg(mp->b_datap->db_type)))
2452 freemsg(mp);
2453 else
2454 (void) putq(q, mp);
2455 mp = nmp;
2456 }
2457 mutex_enter(QLOCK(q));
2458 if ((q->q_flag & QWANTW) &&
2459 (((q->q_count < q->q_lowat) &&
2460 (q->q_mblkcnt < q->q_lowat)) || q->q_lowat == 0)) {
2461 q->q_flag &= ~QWANTW;
2462 mutex_exit(QLOCK(q));
2463
2464 backenable(q, pri);
2465 } else
2466 mutex_exit(QLOCK(q));
2467 } else { /* pri != 0 */
2468 boolean_t flushed = B_FALSE;
2469 band = pri;
2470
2471 ASSERT(MUTEX_HELD(QLOCK(q)));
2472 qbp = q->q_bandp;
2473 while (--band > 0)
2474 qbp = qbp->qb_next;
2475 mp = qbp->qb_first;
2476 if (mp == NULL) {
2477 mutex_exit(QLOCK(q));
2478 return;
2479 }
2480 last = qbp->qb_last->b_next;
2481 /*
2482 * rmvq_noenab() and freemsg() are called for each mblk that
2483 * meets the criteria. The loop is executed until the last
2484 * mblk has been processed.
2485 */
2486 while (mp != last) {
2487 ASSERT(mp->b_band == pri);
2488 nmp = mp->b_next;
2489 if (flag == FLUSHALL || datamsg(mp->b_datap->db_type)) {
2490 rmvq_noenab(q, mp);
2491 freemsg(mp);
2492 flushed = B_TRUE;
2493 }
2494 mp = nmp;
2495 }
2496 mutex_exit(QLOCK(q));
2497
2498 /*
2499 * If any mblk(s) has been freed, we know that qbackenable()
2500 * will need to be called.
2501 */
2502 if (flushed)
2503 qbackenable(q, pri);
2504 }
2505 }
2506
2507 /*
2508 * Return 1 if the queue is not full. If the queue is full, return
2509 * 0 (may not put message) and set QWANTW flag (caller wants to write
2510 * to the queue).
2511 */
2512 int
canput(queue_t * q)2513 canput(queue_t *q)
2514 {
2515 TRACE_1(TR_FAC_STREAMS_FR, TR_CANPUT_IN, "canput:%p", q);
2516
2517 /* this is for loopback transports, they should not do a canput */
2518 ASSERT(STRMATED(q->q_stream) || STREAM(q) == STREAM(q->q_nfsrv));
2519
2520 /* Find next forward module that has a service procedure */
2521 q = q->q_nfsrv;
2522
2523 if (!(q->q_flag & QFULL)) {
2524 TRACE_2(TR_FAC_STREAMS_FR, TR_CANPUT_OUT, "canput:%p %d", q, 1);
2525 return (1);
2526 }
2527 mutex_enter(QLOCK(q));
2528 if (q->q_flag & QFULL) {
2529 q->q_flag |= QWANTW;
2530 mutex_exit(QLOCK(q));
2531 TRACE_2(TR_FAC_STREAMS_FR, TR_CANPUT_OUT, "canput:%p %d", q, 0);
2532 return (0);
2533 }
2534 mutex_exit(QLOCK(q));
2535 TRACE_2(TR_FAC_STREAMS_FR, TR_CANPUT_OUT, "canput:%p %d", q, 1);
2536 return (1);
2537 }
2538
2539 /*
2540 * This is the new canput for use with priority bands. Return 1 if the
2541 * band is not full. If the band is full, return 0 (may not put message)
2542 * and set QWANTW(QB_WANTW) flag for zero(non-zero) band (caller wants to
2543 * write to the queue).
2544 */
2545 int
bcanput(queue_t * q,unsigned char pri)2546 bcanput(queue_t *q, unsigned char pri)
2547 {
2548 qband_t *qbp;
2549
2550 TRACE_2(TR_FAC_STREAMS_FR, TR_BCANPUT_IN, "bcanput:%p %p", q, pri);
2551 if (!q)
2552 return (0);
2553
2554 /* Find next forward module that has a service procedure */
2555 q = q->q_nfsrv;
2556
2557 mutex_enter(QLOCK(q));
2558 if (pri == 0) {
2559 if (q->q_flag & QFULL) {
2560 q->q_flag |= QWANTW;
2561 mutex_exit(QLOCK(q));
2562 TRACE_3(TR_FAC_STREAMS_FR, TR_BCANPUT_OUT,
2563 "bcanput:%p %X %d", q, pri, 0);
2564 return (0);
2565 }
2566 } else { /* pri != 0 */
2567 if (pri > q->q_nband) {
2568 /*
2569 * No band exists yet, so return success.
2570 */
2571 mutex_exit(QLOCK(q));
2572 TRACE_3(TR_FAC_STREAMS_FR, TR_BCANPUT_OUT,
2573 "bcanput:%p %X %d", q, pri, 1);
2574 return (1);
2575 }
2576 qbp = q->q_bandp;
2577 while (--pri)
2578 qbp = qbp->qb_next;
2579 if (qbp->qb_flag & QB_FULL) {
2580 qbp->qb_flag |= QB_WANTW;
2581 mutex_exit(QLOCK(q));
2582 TRACE_3(TR_FAC_STREAMS_FR, TR_BCANPUT_OUT,
2583 "bcanput:%p %X %d", q, pri, 0);
2584 return (0);
2585 }
2586 }
2587 mutex_exit(QLOCK(q));
2588 TRACE_3(TR_FAC_STREAMS_FR, TR_BCANPUT_OUT,
2589 "bcanput:%p %X %d", q, pri, 1);
2590 return (1);
2591 }
2592
/*
 * Put a message on a queue.
 *
 * Messages are enqueued on a priority basis.  The priority classes
 * are HIGH PRIORITY (type >= QPCTL), PRIORITY (type < QPCTL && band > 0),
 * and B_NORMAL (type < QPCTL && band == 0).
 *
 * Add appropriate weighted data block sizes to queue count.
 * If queue hits high water mark then set QFULL flag (QB_FULL for a
 * non-zero band).
 *
 * The queue is enabled when the message's queue class is above QNORM, or
 * when canenable(q) holds and either the QWANTR flag is set (indicating
 * that the service procedure is ready to read the queue) or the message
 * is in a non-zero band.  canenable() is presumably the test of the QNOENB
 * flag that noenable() sets -- confirm against its definition.  This
 * implies that a service procedure must NEVER put a high priority message
 * back on its own queue, as this would result in an infinite loop (!).
 *
 * QLOCK is taken here unless the calling thread has frozen the stream, in
 * which case it is already held and is left held on return.
 *
 * Returns 1 on success and 0 if a qband structure could not be allocated
 * (the message is then NOT placed on the queue).
 */
int
putq(queue_t *q, mblk_t *bp)
{
	mblk_t *tmp;
	qband_t *qbp = NULL;
	int mcls = (int)queclass(bp);
	kthread_id_t freezer;
	int bytecnt = 0, mblkcnt = 0;

	/* freezer == curthread means: lock already held, do not drop it */
	freezer = STREAM(q)->sd_freezer;
	if (freezer == curthread) {
		ASSERT(frozenstr(q));
		ASSERT(MUTEX_HELD(QLOCK(q)));
	} else
		mutex_enter(QLOCK(q));

	/*
	 * Make sanity checks and if qband structure is not yet
	 * allocated, do so.
	 */
	if (mcls == QPCTL) {
		if (bp->b_band != 0)
			bp->b_band = 0;		/* force to be correct */
	} else if (bp->b_band != 0) {
		int i;
		qband_t **qbpp;

		if (bp->b_band > q->q_nband) {

			/*
			 * The qband structure for this priority band is
			 * not on the queue yet, so we have to allocate
			 * one on the fly.  It would be wasteful to
			 * associate the qband structures with every
			 * queue when the queues are allocated.  This is
			 * because most queues will only need the normal
			 * band of flow which can be described entirely
			 * by the queue itself.
			 */
			/* Find the tail link of the existing band list. */
			qbpp = &q->q_bandp;
			while (*qbpp)
				qbpp = &(*qbpp)->qb_next;
			/*
			 * Append bands until the requested one exists; each
			 * new band inherits the queue's water marks.
			 */
			while (bp->b_band > q->q_nband) {
				if ((*qbpp = allocband()) == NULL) {
					if (freezer != curthread)
						mutex_exit(QLOCK(q));
					return (0);
				}
				(*qbpp)->qb_hiwat = q->q_hiwat;
				(*qbpp)->qb_lowat = q->q_lowat;
				q->q_nband++;
				qbpp = &(*qbpp)->qb_next;
			}
		}
		ASSERT(MUTEX_HELD(QLOCK(q)));
		/* Band N is the Nth element of the band list. */
		qbp = q->q_bandp;
		i = bp->b_band;
		while (--i)
			qbp = qbp->qb_next;
	}

	/*
	 * If queue is empty, add the message and initialize the pointers.
	 * Otherwise, adjust message pointers and queue pointers based on
	 * the type of the message and where it belongs on the queue.  Some
	 * code is duplicated to minimize the number of conditionals and
	 * hopefully minimize the amount of time this routine takes.
	 */
	if (!q->q_first) {
		bp->b_next = NULL;
		bp->b_prev = NULL;
		q->q_first = bp;
		q->q_last = bp;
		if (qbp) {
			qbp->qb_first = bp;
			qbp->qb_last = bp;
		}
	} else if (!qbp) {	/* bp->b_band == 0 */

		/*
		 * If queue class of message is less than or equal to
		 * that of the last one on the queue, tack on to the end.
		 */
		tmp = q->q_last;
		if (mcls <= (int)queclass(tmp)) {
			bp->b_next = NULL;
			bp->b_prev = tmp;
			tmp->b_next = bp;
			q->q_last = bp;
		} else {
			/*
			 * Skip every message whose class is at least that
			 * of bp; the branch above guarantees the last
			 * message has a lower class, so the scan stops
			 * before running off the list.
			 */
			tmp = q->q_first;
			while ((int)queclass(tmp) >= mcls)
				tmp = tmp->b_next;

			/*
			 * Insert bp before tmp.
			 */
			bp->b_next = tmp;
			bp->b_prev = tmp->b_prev;
			if (tmp->b_prev)
				tmp->b_prev->b_next = bp;
			else
				q->q_first = bp;
			tmp->b_prev = bp;
		}
	} else {		/* bp->b_band != 0 */
		if (qbp->qb_first) {
			tmp = qbp->qb_last;

			/*
			 * Insert bp after the last message in this band.
			 */
			bp->b_next = tmp->b_next;
			if (tmp->b_next)
				tmp->b_next->b_prev = bp;
			else
				q->q_last = bp;
			bp->b_prev = tmp;
			tmp->b_next = bp;
		} else {
			/* Band is empty: bp becomes its first message. */
			tmp = q->q_last;
			if ((mcls < (int)queclass(tmp)) ||
			    (bp->b_band <= tmp->b_band)) {

				/*
				 * Tack bp on end of queue.
				 */
				bp->b_next = NULL;
				bp->b_prev = tmp;
				tmp->b_next = bp;
				q->q_last = bp;
			} else {
				/*
				 * Skip the high priority messages, then
				 * every message in a band at least as high
				 * as bp's.
				 */
				tmp = q->q_first;
				while (tmp->b_datap->db_type >= QPCTL)
					tmp = tmp->b_next;
				while (tmp->b_band >= bp->b_band)
					tmp = tmp->b_next;

				/*
				 * Insert bp before tmp.
				 */
				bp->b_next = tmp;
				bp->b_prev = tmp->b_prev;
				if (tmp->b_prev)
					tmp->b_prev->b_next = bp;
				else
					q->q_first = bp;
				tmp->b_prev = bp;
			}
			qbp->qb_first = bp;
		}
		/* In both sub-cases bp is now the last message of its band. */
		qbp->qb_last = bp;
	}

	/* Get message byte count for q_count accounting */
	bytecnt = mp_cont_len(bp, &mblkcnt);

	/* Charge the band's counters, or the queue's own for band 0. */
	if (qbp) {
		qbp->qb_count += bytecnt;
		qbp->qb_mblkcnt += mblkcnt;
		if ((qbp->qb_count >= qbp->qb_hiwat) ||
		    (qbp->qb_mblkcnt >= qbp->qb_hiwat)) {
			qbp->qb_flag |= QB_FULL;
		}
	} else {
		q->q_count += bytecnt;
		q->q_mblkcnt += mblkcnt;
		if ((q->q_count >= q->q_hiwat) ||
		    (q->q_mblkcnt >= q->q_hiwat)) {
			q->q_flag |= QFULL;
		}
	}

	STR_FTEVENT_MSG(bp, q, FTEV_PUTQ, 0);

	/*
	 * Messages above QNORM class enable the queue unconditionally;
	 * everything else is subject to canenable().
	 */
	if ((mcls > QNORM) ||
	    (canenable(q) && (q->q_flag & QWANTR || bp->b_band)))
		qenable_locked(q);
	ASSERT(MUTEX_HELD(QLOCK(q)));
	if (freezer != curthread)
		mutex_exit(QLOCK(q));

	return (1);
}
2794
/*
 * Put a message back at the beginning of a queue, according to priority
 * order: a high priority (QPCTL class) message goes to the very front,
 * a banded message goes in front of the other messages of its band, and
 * a normal message goes in front of the other band 0 messages.
 * See the comment on putq above for the priority classes.
 *
 * Unlike putq, a non-zero band on its own does not cause the queue to be
 * enabled here: the queue is enabled when the message's queue class is
 * above QNORM, or when canenable(q) holds and QWANTR is set.
 *
 * QLOCK is taken here unless the calling thread has frozen the stream, in
 * which case it is already held and is left held on return.
 *
 * Returns 1 on success and 0 if a qband structure could not be allocated
 * (the message is then NOT placed on the queue).
 */
int
putbq(queue_t *q, mblk_t *bp)
{
	mblk_t *tmp;
	qband_t *qbp = NULL;
	int mcls = (int)queclass(bp);
	kthread_id_t freezer;
	int bytecnt = 0, mblkcnt = 0;

	ASSERT(q && bp);
	ASSERT(bp->b_next == NULL);
	/* freezer == curthread means: lock already held, do not drop it */
	freezer = STREAM(q)->sd_freezer;
	if (freezer == curthread) {
		ASSERT(frozenstr(q));
		ASSERT(MUTEX_HELD(QLOCK(q)));
	} else
		mutex_enter(QLOCK(q));

	/*
	 * Make sanity checks and if qband structure is not yet
	 * allocated, do so.
	 */
	if (mcls == QPCTL) {
		if (bp->b_band != 0)
			bp->b_band = 0;		/* force to be correct */
	} else if (bp->b_band != 0) {
		int i;
		qband_t **qbpp;

		if (bp->b_band > q->q_nband) {
			/* Find the tail link of the existing band list. */
			qbpp = &q->q_bandp;
			while (*qbpp)
				qbpp = &(*qbpp)->qb_next;
			/*
			 * Append bands until the requested one exists; each
			 * new band inherits the queue's water marks.
			 */
			while (bp->b_band > q->q_nband) {
				if ((*qbpp = allocband()) == NULL) {
					if (freezer != curthread)
						mutex_exit(QLOCK(q));
					return (0);
				}
				(*qbpp)->qb_hiwat = q->q_hiwat;
				(*qbpp)->qb_lowat = q->q_lowat;
				q->q_nband++;
				qbpp = &(*qbpp)->qb_next;
			}
		}
		/* Band N is the Nth element of the band list. */
		qbp = q->q_bandp;
		i = bp->b_band;
		while (--i)
			qbp = qbp->qb_next;
	}

	/*
	 * If queue is empty or if message is high priority,
	 * place on the front of the queue.
	 */
	tmp = q->q_first;
	if ((!tmp) || (mcls == QPCTL)) {
		bp->b_next = tmp;
		if (tmp)
			tmp->b_prev = bp;
		else
			q->q_last = bp;
		q->q_first = bp;
		bp->b_prev = NULL;
		/*
		 * qbp can only be non-NULL here when the queue was empty
		 * (QPCTL messages never have a band), so bp is the band's
		 * sole message.
		 */
		if (qbp) {
			qbp->qb_first = bp;
			qbp->qb_last = bp;
		}
	} else if (qbp) {	/* bp->b_band != 0 */
		tmp = qbp->qb_first;
		if (tmp) {

			/*
			 * Insert bp before the first message in this band.
			 */
			bp->b_next = tmp;
			bp->b_prev = tmp->b_prev;
			if (tmp->b_prev)
				tmp->b_prev->b_next = bp;
			else
				q->q_first = bp;
			tmp->b_prev = bp;
		} else {
			/* Band is empty: bp becomes its only message. */
			tmp = q->q_last;
			if ((mcls < (int)queclass(tmp)) ||
			    (bp->b_band < tmp->b_band)) {

				/*
				 * Tack bp on end of queue.
				 */
				bp->b_next = NULL;
				bp->b_prev = tmp;
				tmp->b_next = bp;
				q->q_last = bp;
			} else {
				/*
				 * Skip the high priority messages, then
				 * every message in a strictly higher band.
				 */
				tmp = q->q_first;
				while (tmp->b_datap->db_type >= QPCTL)
					tmp = tmp->b_next;
				while (tmp->b_band > bp->b_band)
					tmp = tmp->b_next;

				/*
				 * Insert bp before tmp.
				 */
				bp->b_next = tmp;
				bp->b_prev = tmp->b_prev;
				if (tmp->b_prev)
					tmp->b_prev->b_next = bp;
				else
					q->q_first = bp;
				tmp->b_prev = bp;
			}
			qbp->qb_last = bp;
		}
		/* In both sub-cases bp is now the first message of its band. */
		qbp->qb_first = bp;
	} else {		/* bp->b_band == 0 && !QPCTL */

		/*
		 * If the queue class or band is less than that of the last
		 * message on the queue, tack bp on the end of the queue.
		 */
		tmp = q->q_last;
		if ((mcls < (int)queclass(tmp)) || (bp->b_band < tmp->b_band)) {
			bp->b_next = NULL;
			bp->b_prev = tmp;
			tmp->b_next = bp;
			q->q_last = bp;
		} else {
			/*
			 * Skip the high priority messages, then every
			 * banded message, stopping at the first band 0
			 * message.
			 */
			tmp = q->q_first;
			while (tmp->b_datap->db_type >= QPCTL)
				tmp = tmp->b_next;
			while (tmp->b_band > bp->b_band)
				tmp = tmp->b_next;

			/*
			 * Insert bp before tmp.
			 */
			bp->b_next = tmp;
			bp->b_prev = tmp->b_prev;
			if (tmp->b_prev)
				tmp->b_prev->b_next = bp;
			else
				q->q_first = bp;
			tmp->b_prev = bp;
		}
	}

	/* Get message byte count for q_count accounting */
	bytecnt = mp_cont_len(bp, &mblkcnt);

	/* Charge the band's counters, or the queue's own for band 0. */
	if (qbp) {
		qbp->qb_count += bytecnt;
		qbp->qb_mblkcnt += mblkcnt;
		if ((qbp->qb_count >= qbp->qb_hiwat) ||
		    (qbp->qb_mblkcnt >= qbp->qb_hiwat)) {
			qbp->qb_flag |= QB_FULL;
		}
	} else {
		q->q_count += bytecnt;
		q->q_mblkcnt += mblkcnt;
		if ((q->q_count >= q->q_hiwat) ||
		    (q->q_mblkcnt >= q->q_hiwat)) {
			q->q_flag |= QFULL;
		}
	}

	STR_FTEVENT_MSG(bp, q, FTEV_PUTBQ, 0);

	/*
	 * Messages above QNORM class enable the queue unconditionally;
	 * everything else needs canenable() and QWANTR.
	 */
	if ((mcls > QNORM) || (canenable(q) && (q->q_flag & QWANTR)))
		qenable_locked(q);
	ASSERT(MUTEX_HELD(QLOCK(q)));
	if (freezer != curthread)
		mutex_exit(QLOCK(q));

	return (1);
}
2975
/*
 * Insert a message before an existing message on the queue.  If the
 * existing message is NULL, the new message is placed on the end of
 * the queue.  The queue class of the new message is ignored.  However,
 * the priority band of the new message must adhere to the following
 * ordering:
 *
 *	emp->b_prev->b_band >= mp->b_band >= emp->b_band.
 *
 * All flow control parameters are updated.
 *
 * insq can be called with the stream frozen, by other utility functions
 * holding QLOCK, and by streams modules without any locks/frozen.  QLOCK
 * is only acquired (and released again) here in the last of those cases.
 *
 * Returns 1 on success.  Returns 0, leaving the queue untouched, when the
 * requested position would violate the ordering above (a CE_WARN message
 * is logged) or when a qband structure could not be allocated.
 */
int
insq(queue_t *q, mblk_t *emp, mblk_t *mp)
{
	mblk_t *tmp;
	qband_t *qbp = NULL;
	int mcls = (int)queclass(mp);
	kthread_id_t freezer;
	int bytecnt = 0, mblkcnt = 0;

	freezer = STREAM(q)->sd_freezer;
	if (freezer == curthread) {
		ASSERT(frozenstr(q));
		ASSERT(MUTEX_HELD(QLOCK(q)));
	} else if (MUTEX_HELD(QLOCK(q))) {
		/* Don't drop lock on exit */
		freezer = curthread;
	} else
		mutex_enter(QLOCK(q));

	if (mcls == QPCTL) {
		if (mp->b_band != 0)
			mp->b_band = 0;		/* force to be correct */
		/*
		 * A high priority message may not be placed behind a
		 * message that is not itself high priority.
		 */
		if (emp && emp->b_prev &&
		    (emp->b_prev->b_datap->db_type < QPCTL))
			goto badord;
	}
	if (emp) {
		/*
		 * Reject a normal-class message in a lower band than emp,
		 * and any message in a higher band than a non-high-priority
		 * predecessor of emp.
		 */
		if (((mcls == QNORM) && (mp->b_band < emp->b_band)) ||
		    (emp->b_prev && (emp->b_prev->b_datap->db_type < QPCTL) &&
		    (emp->b_prev->b_band < mp->b_band))) {
			goto badord;
		}
	} else {
		/*
		 * Appending: a normal-class message may not be in a higher
		 * band than the current last message.
		 */
		tmp = q->q_last;
		if (tmp && (mcls == QNORM) && (mp->b_band > tmp->b_band)) {
			/* Shared failure exit for all ordering violations. */
badord:
			cmn_err(CE_WARN,
			    "insq: attempt to insert message out of order "
			    "on q %p", (void *)q);
			if (freezer != curthread)
				mutex_exit(QLOCK(q));
			return (0);
		}
	}

	if (mp->b_band != 0) {
		int i;
		qband_t **qbpp;

		if (mp->b_band > q->q_nband) {
			/* Find the tail link of the existing band list. */
			qbpp = &q->q_bandp;
			while (*qbpp)
				qbpp = &(*qbpp)->qb_next;
			/*
			 * Append bands until the requested one exists; each
			 * new band inherits the queue's water marks.
			 */
			while (mp->b_band > q->q_nband) {
				if ((*qbpp = allocband()) == NULL) {
					if (freezer != curthread)
						mutex_exit(QLOCK(q));
					return (0);
				}
				(*qbpp)->qb_hiwat = q->q_hiwat;
				(*qbpp)->qb_lowat = q->q_lowat;
				q->q_nband++;
				qbpp = &(*qbpp)->qb_next;
			}
		}
		/* Band N is the Nth element of the band list. */
		qbp = q->q_bandp;
		i = mp->b_band;
		while (--i)
			qbp = qbp->qb_next;
	}

	/* Link mp in before emp, or at the tail when emp is NULL. */
	if ((mp->b_next = emp) != NULL) {
		if ((mp->b_prev = emp->b_prev) != NULL)
			emp->b_prev->b_next = mp;
		else
			q->q_first = mp;
		emp->b_prev = mp;
	} else {
		if ((mp->b_prev = q->q_last) != NULL)
			q->q_last->b_next = mp;
		else
			q->q_first = mp;
		q->q_last = mp;
	}

	/* Get mblk and byte count for q_count accounting */
	bytecnt = mp_cont_len(mp, &mblkcnt);

	if (qbp) {	/* adjust qband pointers and count */
		if (!qbp->qb_first) {
			qbp->qb_first = mp;
			qbp->qb_last = mp;
		} else {
			/*
			 * mp is the band's new first message when nothing,
			 * or a message of another band, precedes it; it is
			 * the new last message when nothing, or a message
			 * of another band, follows it.
			 */
			if (mp->b_prev == NULL || (mp->b_prev != NULL &&
			    (mp->b_prev->b_band != mp->b_band)))
				qbp->qb_first = mp;
			else if (mp->b_next == NULL || (mp->b_next != NULL &&
			    (mp->b_next->b_band != mp->b_band)))
				qbp->qb_last = mp;
		}
		qbp->qb_count += bytecnt;
		qbp->qb_mblkcnt += mblkcnt;
		if ((qbp->qb_count >= qbp->qb_hiwat) ||
		    (qbp->qb_mblkcnt >= qbp->qb_hiwat)) {
			qbp->qb_flag |= QB_FULL;
		}
	} else {
		q->q_count += bytecnt;
		q->q_mblkcnt += mblkcnt;
		if ((q->q_count >= q->q_hiwat) ||
		    (q->q_mblkcnt >= q->q_hiwat)) {
			q->q_flag |= QFULL;
		}
	}

	STR_FTEVENT_MSG(mp, q, FTEV_INSQ, 0);

	/* Unlike putq/putbq, the message class plays no part here. */
	if (canenable(q) && (q->q_flag & QWANTR))
		qenable_locked(q);

	ASSERT(MUTEX_HELD(QLOCK(q)));
	if (freezer != curthread)
		mutex_exit(QLOCK(q));

	return (1);
}
3116
3117 /*
3118 * Create and put a control message on queue.
3119 */
3120 int
putctl(queue_t * q,int type)3121 putctl(queue_t *q, int type)
3122 {
3123 mblk_t *bp;
3124
3125 if ((datamsg(type) && (type != M_DELAY)) ||
3126 (bp = allocb_tryhard(0)) == NULL)
3127 return (0);
3128 bp->b_datap->db_type = (unsigned char) type;
3129
3130 put(q, bp);
3131
3132 return (1);
3133 }
3134
3135 /*
3136 * Control message with a single-byte parameter
3137 */
3138 int
putctl1(queue_t * q,int type,int param)3139 putctl1(queue_t *q, int type, int param)
3140 {
3141 mblk_t *bp;
3142
3143 if ((datamsg(type) && (type != M_DELAY)) ||
3144 (bp = allocb_tryhard(1)) == NULL)
3145 return (0);
3146 bp->b_datap->db_type = (unsigned char)type;
3147 *bp->b_wptr++ = (unsigned char)param;
3148
3149 put(q, bp);
3150
3151 return (1);
3152 }
3153
3154 int
putnextctl1(queue_t * q,int type,int param)3155 putnextctl1(queue_t *q, int type, int param)
3156 {
3157 mblk_t *bp;
3158
3159 if ((datamsg(type) && (type != M_DELAY)) ||
3160 ((bp = allocb_tryhard(1)) == NULL))
3161 return (0);
3162
3163 bp->b_datap->db_type = (unsigned char)type;
3164 *bp->b_wptr++ = (unsigned char)param;
3165
3166 putnext(q, bp);
3167
3168 return (1);
3169 }
3170
3171 int
putnextctl(queue_t * q,int type)3172 putnextctl(queue_t *q, int type)
3173 {
3174 mblk_t *bp;
3175
3176 if ((datamsg(type) && (type != M_DELAY)) ||
3177 ((bp = allocb_tryhard(0)) == NULL))
3178 return (0);
3179 bp->b_datap->db_type = (unsigned char)type;
3180
3181 putnext(q, bp);
3182
3183 return (1);
3184 }
3185
3186 /*
3187 * Return the queue upstream from this one
3188 */
3189 queue_t *
backq(queue_t * q)3190 backq(queue_t *q)
3191 {
3192 q = _OTHERQ(q);
3193 if (q->q_next) {
3194 q = q->q_next;
3195 return (_OTHERQ(q));
3196 }
3197 return (NULL);
3198 }
3199
3200 /*
3201 * Send a block back up the queue in reverse from this
3202 * one (e.g. to respond to ioctls)
3203 */
3204 void
qreply(queue_t * q,mblk_t * bp)3205 qreply(queue_t *q, mblk_t *bp)
3206 {
3207 ASSERT(q && bp);
3208
3209 putnext(_OTHERQ(q), bp);
3210 }
3211
3212 /*
3213 * Streams Queue Scheduling
3214 *
3215 * Queues are enabled through qenable() when they have messages to
3216 * process. They are serviced by queuerun(), which runs each enabled
3217 * queue's service procedure. The call to queuerun() is processor
3218 * dependent - the general principle is that it be run whenever a queue
3219 * is enabled but before returning to user level. For system calls,
3220 * the function runqueues() is called if their action causes a queue
3221 * to be enabled. For device interrupts, queuerun() should be
3222 * called before returning from the last level of interrupt. Beyond
3223 * this, no timing assumptions should be made about queue scheduling.
3224 */
3225
/*
 * Enable a queue: put it on list of those whose service procedures are
 * ready to run and set up the scheduling mechanism.
 *
 * This is the self-locking entry point: it takes QLOCK, lets
 * qenable_locked() do the actual work, and drops QLOCK again.
 *
 * NOTE(review): the historical comment here stated that "the broadcast is
 * done outside the mutex to avoid the woken thread from contending with
 * the mutex; this is OK because the queue has been enqueued on the runlist
 * and flagged safely at this point".  No broadcast is issued in this
 * function itself; if one happens it is inside qenable_locked(), which is
 * not visible here -- confirm before relying on that statement.
 */
void
qenable(queue_t *q)
{
	mutex_enter(QLOCK(q));
	qenable_locked(q);
	mutex_exit(QLOCK(q));
}
3240 /*
3241 * Return number of messages on queue
3242 */
3243 int
qsize(queue_t * qp)3244 qsize(queue_t *qp)
3245 {
3246 int count = 0;
3247 mblk_t *mp;
3248
3249 mutex_enter(QLOCK(qp));
3250 for (mp = qp->q_first; mp; mp = mp->b_next)
3251 count++;
3252 mutex_exit(QLOCK(qp));
3253 return (count);
3254 }
3255
/*
 * noenable - set queue so that putq() will not enable it.
 * enableok - set queue so that putq() can enable it.
 *
 * noenable() records the request by setting QNOENB in q_flag, under QLOCK.
 */
void
noenable(queue_t *q)
{
	mutex_enter(QLOCK(q));
	q->q_flag |= QNOENB;
	mutex_exit(QLOCK(q));
}
3267
/*
 * Clear QNOENB in q_flag, under QLOCK, undoing the effect of noenable().
 */
void
enableok(queue_t *q)
{
	mutex_enter(QLOCK(q));
	q->q_flag &= ~QNOENB;
	mutex_exit(QLOCK(q));
}
3275
3276 /*
3277 * Set queue fields.
3278 */
3279 int
strqset(queue_t * q,qfields_t what,unsigned char pri,intptr_t val)3280 strqset(queue_t *q, qfields_t what, unsigned char pri, intptr_t val)
3281 {
3282 qband_t *qbp = NULL;
3283 queue_t *wrq;
3284 int error = 0;
3285 kthread_id_t freezer;
3286
3287 freezer = STREAM(q)->sd_freezer;
3288 if (freezer == curthread) {
3289 ASSERT(frozenstr(q));
3290 ASSERT(MUTEX_HELD(QLOCK(q)));
3291 } else
3292 mutex_enter(QLOCK(q));
3293
3294 if (what >= QBAD) {
3295 error = EINVAL;
3296 goto done;
3297 }
3298 if (pri != 0) {
3299 int i;
3300 qband_t **qbpp;
3301
3302 if (pri > q->q_nband) {
3303 qbpp = &q->q_bandp;
3304 while (*qbpp)
3305 qbpp = &(*qbpp)->qb_next;
3306 while (pri > q->q_nband) {
3307 if ((*qbpp = allocband()) == NULL) {
3308 error = EAGAIN;
3309 goto done;
3310 }
3311 (*qbpp)->qb_hiwat = q->q_hiwat;
3312 (*qbpp)->qb_lowat = q->q_lowat;
3313 q->q_nband++;
3314 qbpp = &(*qbpp)->qb_next;
3315 }
3316 }
3317 qbp = q->q_bandp;
3318 i = pri;
3319 while (--i)
3320 qbp = qbp->qb_next;
3321 }
3322 switch (what) {
3323
3324 case QHIWAT:
3325 if (qbp)
3326 qbp->qb_hiwat = (size_t)val;
3327 else
3328 q->q_hiwat = (size_t)val;
3329 break;
3330
3331 case QLOWAT:
3332 if (qbp)
3333 qbp->qb_lowat = (size_t)val;
3334 else
3335 q->q_lowat = (size_t)val;
3336 break;
3337
3338 case QMAXPSZ:
3339 if (qbp)
3340 error = EINVAL;
3341 else
3342 q->q_maxpsz = (ssize_t)val;
3343
3344 /*
3345 * Performance concern, strwrite looks at the module below
3346 * the stream head for the maxpsz each time it does a write
3347 * we now cache it at the stream head. Check to see if this
3348 * queue is sitting directly below the stream head.
3349 */
3350 wrq = STREAM(q)->sd_wrq;
3351 if (q != wrq->q_next)
3352 break;
3353
3354 /*
3355 * If the stream is not frozen drop the current QLOCK and
3356 * acquire the sd_wrq QLOCK which protects sd_qn_*
3357 */
3358 if (freezer != curthread) {
3359 mutex_exit(QLOCK(q));
3360 mutex_enter(QLOCK(wrq));
3361 }
3362 ASSERT(MUTEX_HELD(QLOCK(wrq)));
3363
3364 if (strmsgsz != 0) {
3365 if (val == INFPSZ)
3366 val = strmsgsz;
3367 else {
3368 if (STREAM(q)->sd_vnode->v_type == VFIFO)
3369 val = MIN(PIPE_BUF, val);
3370 else
3371 val = MIN(strmsgsz, val);
3372 }
3373 }
3374 STREAM(q)->sd_qn_maxpsz = val;
3375 if (freezer != curthread) {
3376 mutex_exit(QLOCK(wrq));
3377 mutex_enter(QLOCK(q));
3378 }
3379 break;
3380
3381 case QMINPSZ:
3382 if (qbp)
3383 error = EINVAL;
3384 else
3385 q->q_minpsz = (ssize_t)val;
3386
3387 /*
3388 * Performance concern, strwrite looks at the module below
3389 * the stream head for the maxpsz each time it does a write
3390 * we now cache it at the stream head. Check to see if this
3391 * queue is sitting directly below the stream head.
3392 */
3393 wrq = STREAM(q)->sd_wrq;
3394 if (q != wrq->q_next)
3395 break;
3396
3397 /*
3398 * If the stream is not frozen drop the current QLOCK and
3399 * acquire the sd_wrq QLOCK which protects sd_qn_*
3400 */
3401 if (freezer != curthread) {
3402 mutex_exit(QLOCK(q));
3403 mutex_enter(QLOCK(wrq));
3404 }
3405 STREAM(q)->sd_qn_minpsz = (ssize_t)val;
3406
3407 if (freezer != curthread) {
3408 mutex_exit(QLOCK(wrq));
3409 mutex_enter(QLOCK(q));
3410 }
3411 break;
3412
3413 case QSTRUIOT:
3414 if (qbp)
3415 error = EINVAL;
3416 else
3417 q->q_struiot = (ushort_t)val;
3418 break;
3419
3420 case QCOUNT:
3421 case QFIRST:
3422 case QLAST:
3423 case QFLAG:
3424 error = EPERM;
3425 break;
3426
3427 default:
3428 error = EINVAL;
3429 break;
3430 }
3431 done:
3432 if (freezer != curthread)
3433 mutex_exit(QLOCK(q));
3434 return (error);
3435 }
3436
3437 /*
3438 * Get queue fields.
3439 */
3440 int
strqget(queue_t * q,qfields_t what,unsigned char pri,void * valp)3441 strqget(queue_t *q, qfields_t what, unsigned char pri, void *valp)
3442 {
3443 qband_t *qbp = NULL;
3444 int error = 0;
3445 kthread_id_t freezer;
3446
3447 freezer = STREAM(q)->sd_freezer;
3448 if (freezer == curthread) {
3449 ASSERT(frozenstr(q));
3450 ASSERT(MUTEX_HELD(QLOCK(q)));
3451 } else
3452 mutex_enter(QLOCK(q));
3453 if (what >= QBAD) {
3454 error = EINVAL;
3455 goto done;
3456 }
3457 if (pri != 0) {
3458 int i;
3459 qband_t **qbpp;
3460
3461 if (pri > q->q_nband) {
3462 qbpp = &q->q_bandp;
3463 while (*qbpp)
3464 qbpp = &(*qbpp)->qb_next;
3465 while (pri > q->q_nband) {
3466 if ((*qbpp = allocband()) == NULL) {
3467 error = EAGAIN;
3468 goto done;
3469 }
3470 (*qbpp)->qb_hiwat = q->q_hiwat;
3471 (*qbpp)->qb_lowat = q->q_lowat;
3472 q->q_nband++;
3473 qbpp = &(*qbpp)->qb_next;
3474 }
3475 }
3476 qbp = q->q_bandp;
3477 i = pri;
3478 while (--i)
3479 qbp = qbp->qb_next;
3480 }
3481 switch (what) {
3482 case QHIWAT:
3483 if (qbp)
3484 *(size_t *)valp = qbp->qb_hiwat;
3485 else
3486 *(size_t *)valp = q->q_hiwat;
3487 break;
3488
3489 case QLOWAT:
3490 if (qbp)
3491 *(size_t *)valp = qbp->qb_lowat;
3492 else
3493 *(size_t *)valp = q->q_lowat;
3494 break;
3495
3496 case QMAXPSZ:
3497 if (qbp)
3498 error = EINVAL;
3499 else
3500 *(ssize_t *)valp = q->q_maxpsz;
3501 break;
3502
3503 case QMINPSZ:
3504 if (qbp)
3505 error = EINVAL;
3506 else
3507 *(ssize_t *)valp = q->q_minpsz;
3508 break;
3509
3510 case QCOUNT:
3511 if (qbp)
3512 *(size_t *)valp = qbp->qb_count;
3513 else
3514 *(size_t *)valp = q->q_count;
3515 break;
3516
3517 case QFIRST:
3518 if (qbp)
3519 *(mblk_t **)valp = qbp->qb_first;
3520 else
3521 *(mblk_t **)valp = q->q_first;
3522 break;
3523
3524 case QLAST:
3525 if (qbp)
3526 *(mblk_t **)valp = qbp->qb_last;
3527 else
3528 *(mblk_t **)valp = q->q_last;
3529 break;
3530
3531 case QFLAG:
3532 if (qbp)
3533 *(uint_t *)valp = qbp->qb_flag;
3534 else
3535 *(uint_t *)valp = q->q_flag;
3536 break;
3537
3538 case QSTRUIOT:
3539 if (qbp)
3540 error = EINVAL;
3541 else
3542 *(short *)valp = q->q_struiot;
3543 break;
3544
3545 default:
3546 error = EINVAL;
3547 break;
3548 }
3549 done:
3550 if (freezer != curthread)
3551 mutex_exit(QLOCK(q));
3552 return (error);
3553 }
3554
3555 /*
3556 * Function awakes all in cvwait/sigwait/pollwait, on one of:
3557 * QWANTWSYNC or QWANTR or QWANTW,
3558 *
3559 * Note: for QWANTWSYNC/QWANTW and QWANTR, if no WSLEEPer or RSLEEPer then a
3560 * deferred wakeup will be done. Also if strpoll() in progress then a
3561 * deferred pollwakeup will be done.
3562 */
3563 void
strwakeq(queue_t * q,int flag)3564 strwakeq(queue_t *q, int flag)
3565 {
3566 stdata_t *stp = STREAM(q);
3567 pollhead_t *pl;
3568
3569 mutex_enter(&stp->sd_lock);
3570 pl = &stp->sd_pollist;
3571 if (flag & QWANTWSYNC) {
3572 ASSERT(!(q->q_flag & QREADR));
3573 if (stp->sd_flag & WSLEEP) {
3574 stp->sd_flag &= ~WSLEEP;
3575 cv_broadcast(&stp->sd_wrq->q_wait);
3576 } else {
3577 stp->sd_wakeq |= WSLEEP;
3578 }
3579
3580 mutex_exit(&stp->sd_lock);
3581 pollwakeup(pl, POLLWRNORM);
3582 mutex_enter(&stp->sd_lock);
3583
3584 if (stp->sd_sigflags & S_WRNORM)
3585 strsendsig(stp->sd_siglist, S_WRNORM, 0, 0);
3586 } else if (flag & QWANTR) {
3587 if (stp->sd_flag & RSLEEP) {
3588 stp->sd_flag &= ~RSLEEP;
3589 cv_broadcast(&_RD(stp->sd_wrq)->q_wait);
3590 } else {
3591 stp->sd_wakeq |= RSLEEP;
3592 }
3593
3594 mutex_exit(&stp->sd_lock);
3595 pollwakeup(pl, POLLIN | POLLRDNORM);
3596 mutex_enter(&stp->sd_lock);
3597
3598 {
3599 int events = stp->sd_sigflags & (S_INPUT | S_RDNORM);
3600
3601 if (events)
3602 strsendsig(stp->sd_siglist, events, 0, 0);
3603 }
3604 } else {
3605 if (stp->sd_flag & WSLEEP) {
3606 stp->sd_flag &= ~WSLEEP;
3607 cv_broadcast(&stp->sd_wrq->q_wait);
3608 }
3609
3610 mutex_exit(&stp->sd_lock);
3611 pollwakeup(pl, POLLWRNORM);
3612 mutex_enter(&stp->sd_lock);
3613
3614 if (stp->sd_sigflags & S_WRNORM)
3615 strsendsig(stp->sd_siglist, S_WRNORM, 0, 0);
3616 }
3617 mutex_exit(&stp->sd_lock);
3618 }
3619
3620 int
struioget(queue_t * q,mblk_t * mp,struiod_t * dp,int noblock)3621 struioget(queue_t *q, mblk_t *mp, struiod_t *dp, int noblock)
3622 {
3623 stdata_t *stp = STREAM(q);
3624 int typ = STRUIOT_STANDARD;
3625 uio_t *uiop = &dp->d_uio;
3626 dblk_t *dbp;
3627 ssize_t uiocnt;
3628 ssize_t cnt;
3629 unsigned char *ptr;
3630 ssize_t resid;
3631 int error = 0;
3632 on_trap_data_t otd;
3633 queue_t *stwrq;
3634
3635 /*
3636 * Plumbing may change while taking the type so store the
3637 * queue in a temporary variable. It doesn't matter even
3638 * if the we take the type from the previous plumbing,
3639 * that's because if the plumbing has changed when we were
3640 * holding the queue in a temporary variable, we can continue
3641 * processing the message the way it would have been processed
3642 * in the old plumbing, without any side effects but a bit
3643 * extra processing for partial ip header checksum.
3644 *
3645 * This has been done to avoid holding the sd_lock which is
3646 * very hot.
3647 */
3648
3649 stwrq = stp->sd_struiowrq;
3650 if (stwrq)
3651 typ = stwrq->q_struiot;
3652
3653 for (; (resid = uiop->uio_resid) > 0 && mp; mp = mp->b_cont) {
3654 dbp = mp->b_datap;
3655 ptr = (uchar_t *)(mp->b_rptr + dbp->db_cksumstuff);
3656 uiocnt = dbp->db_cksumend - dbp->db_cksumstuff;
3657 cnt = MIN(uiocnt, uiop->uio_resid);
3658 if (!(dbp->db_struioflag & STRUIO_SPEC) ||
3659 (dbp->db_struioflag & STRUIO_DONE) || cnt == 0) {
3660 /*
3661 * Either this mblk has already been processed
3662 * or there is no more room in this mblk (?).
3663 */
3664 continue;
3665 }
3666 switch (typ) {
3667 case STRUIOT_STANDARD:
3668 if (noblock) {
3669 if (on_trap(&otd, OT_DATA_ACCESS)) {
3670 no_trap();
3671 error = EWOULDBLOCK;
3672 goto out;
3673 }
3674 }
3675 if (error = uiomove(ptr, cnt, UIO_WRITE, uiop)) {
3676 if (noblock)
3677 no_trap();
3678 goto out;
3679 }
3680 if (noblock)
3681 no_trap();
3682 break;
3683
3684 default:
3685 error = EIO;
3686 goto out;
3687 }
3688 dbp->db_struioflag |= STRUIO_DONE;
3689 dbp->db_cksumstuff += cnt;
3690 }
3691 out:
3692 if (error == EWOULDBLOCK && (resid -= uiop->uio_resid) > 0) {
3693 /*
3694 * A fault has occured and some bytes were moved to the
3695 * current mblk, the uio_t has already been updated by
3696 * the appropriate uio routine, so also update the mblk
3697 * to reflect this in case this same mblk chain is used
3698 * again (after the fault has been handled).
3699 */
3700 uiocnt = dbp->db_cksumend - dbp->db_cksumstuff;
3701 if (uiocnt >= resid)
3702 dbp->db_cksumstuff += resid;
3703 }
3704 return (error);
3705 }
3706
3707 /*
3708 * Try to enter queue synchronously. Any attempt to enter a closing queue will
3709 * fails. The qp->q_rwcnt keeps track of the number of successful entries so
3710 * that removeq() will not try to close the queue while a thread is inside the
3711 * queue.
3712 */
3713 static boolean_t
rwnext_enter(queue_t * qp)3714 rwnext_enter(queue_t *qp)
3715 {
3716 mutex_enter(QLOCK(qp));
3717 if (qp->q_flag & QWCLOSE) {
3718 mutex_exit(QLOCK(qp));
3719 return (B_FALSE);
3720 }
3721 qp->q_rwcnt++;
3722 ASSERT(qp->q_rwcnt != 0);
3723 mutex_exit(QLOCK(qp));
3724 return (B_TRUE);
3725 }
3726
3727 /*
3728 * Decrease the count of threads running in sync stream queue and wake up any
3729 * threads blocked in removeq().
3730 */
3731 static void
rwnext_exit(queue_t * qp)3732 rwnext_exit(queue_t *qp)
3733 {
3734 mutex_enter(QLOCK(qp));
3735 qp->q_rwcnt--;
3736 if (qp->q_flag & QWANTRMQSYNC) {
3737 qp->q_flag &= ~QWANTRMQSYNC;
3738 cv_broadcast(&qp->q_wait);
3739 }
3740 mutex_exit(QLOCK(qp));
3741 }
3742
3743 /*
3744 * The purpose of rwnext() is to call the rw procedure of the next
3745 * (downstream) modules queue.
3746 *
3747 * treated as put entrypoint for perimeter syncronization.
3748 *
3749 * There's no need to grab sq_putlocks here (which only exist for CIPUT
3750 * sync queues). If it is CIPUT sync queue sq_count is incremented and it does
3751 * not matter if any regular put entrypoints have been already entered. We
3752 * can't increment one of the sq_putcounts (instead of sq_count) because
3753 * qwait_rw won't know which counter to decrement.
3754 *
3755 * It would be reasonable to add the lockless FASTPUT logic.
3756 */
3757 int
rwnext(queue_t * qp,struiod_t * dp)3758 rwnext(queue_t *qp, struiod_t *dp)
3759 {
3760 queue_t *nqp;
3761 syncq_t *sq;
3762 uint16_t count;
3763 uint16_t flags;
3764 struct qinit *qi;
3765 int (*proc)();
3766 struct stdata *stp;
3767 int isread;
3768 int rval;
3769
3770 stp = STREAM(qp);
3771 /*
3772 * Prevent q_next from changing by holding sd_lock until acquiring
3773 * SQLOCK. Note that a read-side rwnext from the streamhead will
3774 * already have sd_lock acquired. In either case sd_lock is always
3775 * released after acquiring SQLOCK.
3776 *
3777 * The streamhead read-side holding sd_lock when calling rwnext is
3778 * required to prevent a race condition were M_DATA mblks flowing
3779 * up the read-side of the stream could be bypassed by a rwnext()
3780 * down-call. In this case sd_lock acts as the streamhead perimeter.
3781 */
3782 if ((nqp = _WR(qp)) == qp) {
3783 isread = 0;
3784 mutex_enter(&stp->sd_lock);
3785 qp = nqp->q_next;
3786 } else {
3787 isread = 1;
3788 if (nqp != stp->sd_wrq)
3789 /* Not streamhead */
3790 mutex_enter(&stp->sd_lock);
3791 qp = _RD(nqp->q_next);
3792 }
3793 qi = qp->q_qinfo;
3794 if (qp->q_struiot == STRUIOT_NONE || ! (proc = qi->qi_rwp)) {
3795 /*
3796 * Not a synchronous module or no r/w procedure for this
3797 * queue, so just return EINVAL and let the caller handle it.
3798 */
3799 mutex_exit(&stp->sd_lock);
3800 return (EINVAL);
3801 }
3802
3803 if (rwnext_enter(qp) == B_FALSE) {
3804 mutex_exit(&stp->sd_lock);
3805 return (EINVAL);
3806 }
3807
3808 sq = qp->q_syncq;
3809 mutex_enter(SQLOCK(sq));
3810 mutex_exit(&stp->sd_lock);
3811 count = sq->sq_count;
3812 flags = sq->sq_flags;
3813 ASSERT(sq->sq_ciputctrl == NULL || (flags & SQ_CIPUT));
3814
3815 while ((flags & SQ_GOAWAY) || (!(flags & SQ_CIPUT) && count != 0)) {
3816 /*
3817 * if this queue is being closed, return.
3818 */
3819 if (qp->q_flag & QWCLOSE) {
3820 mutex_exit(SQLOCK(sq));
3821 rwnext_exit(qp);
3822 return (EINVAL);
3823 }
3824
3825 /*
3826 * Wait until we can enter the inner perimeter.
3827 */
3828 sq->sq_flags = flags | SQ_WANTWAKEUP;
3829 cv_wait(&sq->sq_wait, SQLOCK(sq));
3830 count = sq->sq_count;
3831 flags = sq->sq_flags;
3832 }
3833
3834 if (isread == 0 && stp->sd_struiowrq == NULL ||
3835 isread == 1 && stp->sd_struiordq == NULL) {
3836 /*
3837 * Stream plumbing changed while waiting for inner perimeter
3838 * so just return EINVAL and let the caller handle it.
3839 */
3840 mutex_exit(SQLOCK(sq));
3841 rwnext_exit(qp);
3842 return (EINVAL);
3843 }
3844 if (!(flags & SQ_CIPUT))
3845 sq->sq_flags = flags | SQ_EXCL;
3846 sq->sq_count = count + 1;
3847 ASSERT(sq->sq_count != 0); /* Wraparound */
3848 /*
3849 * Note: The only message ordering guarantee that rwnext() makes is
3850 * for the write queue flow-control case. All others (r/w queue
3851 * with q_count > 0 (or q_first != 0)) are the resposibilty of
3852 * the queue's rw procedure. This could be genralized here buy
3853 * running the queue's service procedure, but that wouldn't be
3854 * the most efficent for all cases.
3855 */
3856 mutex_exit(SQLOCK(sq));
3857 if (! isread && (qp->q_flag & QFULL)) {
3858 /*
3859 * Write queue may be flow controlled. If so,
3860 * mark the queue for wakeup when it's not.
3861 */
3862 mutex_enter(QLOCK(qp));
3863 if (qp->q_flag & QFULL) {
3864 qp->q_flag |= QWANTWSYNC;
3865 mutex_exit(QLOCK(qp));
3866 rval = EWOULDBLOCK;
3867 goto out;
3868 }
3869 mutex_exit(QLOCK(qp));
3870 }
3871
3872 if (! isread && dp->d_mp)
3873 STR_FTEVENT_MSG(dp->d_mp, nqp, FTEV_RWNEXT, dp->d_mp->b_rptr -
3874 dp->d_mp->b_datap->db_base);
3875
3876 rval = (*proc)(qp, dp);
3877
3878 if (isread && dp->d_mp)
3879 STR_FTEVENT_MSG(dp->d_mp, _RD(nqp), FTEV_RWNEXT,
3880 dp->d_mp->b_rptr - dp->d_mp->b_datap->db_base);
3881 out:
3882 /*
3883 * The queue is protected from being freed by sq_count, so it is
3884 * safe to call rwnext_exit and reacquire SQLOCK(sq).
3885 */
3886 rwnext_exit(qp);
3887
3888 mutex_enter(SQLOCK(sq));
3889 flags = sq->sq_flags;
3890 ASSERT(sq->sq_count != 0);
3891 sq->sq_count--;
3892 if (flags & SQ_TAIL) {
3893 putnext_tail(sq, qp, flags);
3894 /*
3895 * The only purpose of this ASSERT is to preserve calling stack
3896 * in DEBUG kernel.
3897 */
3898 ASSERT(flags & SQ_TAIL);
3899 return (rval);
3900 }
3901 ASSERT(flags & (SQ_EXCL|SQ_CIPUT));
3902 /*
3903 * Safe to always drop SQ_EXCL:
3904 * Not SQ_CIPUT means we set SQ_EXCL above
3905 * For SQ_CIPUT SQ_EXCL will only be set if the put procedure
3906 * did a qwriter(INNER) in which case nobody else
3907 * is in the inner perimeter and we are exiting.
3908 *
3909 * I would like to make the following assertion:
3910 *
3911 * ASSERT((flags & (SQ_EXCL|SQ_CIPUT)) != (SQ_EXCL|SQ_CIPUT) ||
3912 * sq->sq_count == 0);
3913 *
3914 * which indicates that if we are both putshared and exclusive,
3915 * we became exclusive while executing the putproc, and the only
3916 * claim on the syncq was the one we dropped a few lines above.
3917 * But other threads that enter putnext while the syncq is exclusive
3918 * need to make a claim as they may need to drop SQLOCK in the
3919 * has_writers case to avoid deadlocks. If these threads are
3920 * delayed or preempted, it is possible that the writer thread can
3921 * find out that there are other claims making the (sq_count == 0)
3922 * test invalid.
3923 */
3924
3925 sq->sq_flags = flags & ~SQ_EXCL;
3926 if (sq->sq_flags & SQ_WANTWAKEUP) {
3927 sq->sq_flags &= ~SQ_WANTWAKEUP;
3928 cv_broadcast(&sq->sq_wait);
3929 }
3930 mutex_exit(SQLOCK(sq));
3931 return (rval);
3932 }
3933
3934 /*
3935 * The purpose of infonext() is to call the info procedure of the next
3936 * (downstream) modules queue.
3937 *
3938 * treated as put entrypoint for perimeter syncronization.
3939 *
3940 * There's no need to grab sq_putlocks here (which only exist for CIPUT
3941 * sync queues). If it is CIPUT sync queue regular sq_count is incremented and
3942 * it does not matter if any regular put entrypoints have been already
3943 * entered.
3944 */
3945 int
infonext(queue_t * qp,infod_t * idp)3946 infonext(queue_t *qp, infod_t *idp)
3947 {
3948 queue_t *nqp;
3949 syncq_t *sq;
3950 uint16_t count;
3951 uint16_t flags;
3952 struct qinit *qi;
3953 int (*proc)();
3954 struct stdata *stp;
3955 int rval;
3956
3957 stp = STREAM(qp);
3958 /*
3959 * Prevent q_next from changing by holding sd_lock until
3960 * acquiring SQLOCK.
3961 */
3962 mutex_enter(&stp->sd_lock);
3963 if ((nqp = _WR(qp)) == qp) {
3964 qp = nqp->q_next;
3965 } else {
3966 qp = _RD(nqp->q_next);
3967 }
3968 qi = qp->q_qinfo;
3969 if (qp->q_struiot == STRUIOT_NONE || ! (proc = qi->qi_infop)) {
3970 mutex_exit(&stp->sd_lock);
3971 return (EINVAL);
3972 }
3973 sq = qp->q_syncq;
3974 mutex_enter(SQLOCK(sq));
3975 mutex_exit(&stp->sd_lock);
3976 count = sq->sq_count;
3977 flags = sq->sq_flags;
3978 ASSERT(sq->sq_ciputctrl == NULL || (flags & SQ_CIPUT));
3979
3980 while ((flags & SQ_GOAWAY) || (!(flags & SQ_CIPUT) && count != 0)) {
3981 /*
3982 * Wait until we can enter the inner perimeter.
3983 */
3984 sq->sq_flags = flags | SQ_WANTWAKEUP;
3985 cv_wait(&sq->sq_wait, SQLOCK(sq));
3986 count = sq->sq_count;
3987 flags = sq->sq_flags;
3988 }
3989
3990 if (! (flags & SQ_CIPUT))
3991 sq->sq_flags = flags | SQ_EXCL;
3992 sq->sq_count = count + 1;
3993 ASSERT(sq->sq_count != 0); /* Wraparound */
3994 mutex_exit(SQLOCK(sq));
3995
3996 rval = (*proc)(qp, idp);
3997
3998 mutex_enter(SQLOCK(sq));
3999 flags = sq->sq_flags;
4000 ASSERT(sq->sq_count != 0);
4001 sq->sq_count--;
4002 if (flags & SQ_TAIL) {
4003 putnext_tail(sq, qp, flags);
4004 /*
4005 * The only purpose of this ASSERT is to preserve calling stack
4006 * in DEBUG kernel.
4007 */
4008 ASSERT(flags & SQ_TAIL);
4009 return (rval);
4010 }
4011 ASSERT(flags & (SQ_EXCL|SQ_CIPUT));
4012 /*
4013 * XXXX
4014 * I am not certain the next comment is correct here. I need to consider
4015 * why the infonext is called, and if dropping SQ_EXCL unless non-CIPUT
4016 * might cause other problems. It just might be safer to drop it if
4017 * !SQ_CIPUT because that is when we set it.
4018 */
4019 /*
4020 * Safe to always drop SQ_EXCL:
4021 * Not SQ_CIPUT means we set SQ_EXCL above
4022 * For SQ_CIPUT SQ_EXCL will only be set if the put procedure
4023 * did a qwriter(INNER) in which case nobody else
4024 * is in the inner perimeter and we are exiting.
4025 *
4026 * I would like to make the following assertion:
4027 *
4028 * ASSERT((flags & (SQ_EXCL|SQ_CIPUT)) != (SQ_EXCL|SQ_CIPUT) ||
4029 * sq->sq_count == 0);
4030 *
4031 * which indicates that if we are both putshared and exclusive,
4032 * we became exclusive while executing the putproc, and the only
4033 * claim on the syncq was the one we dropped a few lines above.
4034 * But other threads that enter putnext while the syncq is exclusive
4035 * need to make a claim as they may need to drop SQLOCK in the
4036 * has_writers case to avoid deadlocks. If these threads are
4037 * delayed or preempted, it is possible that the writer thread can
4038 * find out that there are other claims making the (sq_count == 0)
4039 * test invalid.
4040 */
4041
4042 sq->sq_flags = flags & ~SQ_EXCL;
4043 mutex_exit(SQLOCK(sq));
4044 return (rval);
4045 }
4046
4047 /*
4048 * Return nonzero if the queue is responsible for struio(), else return 0.
4049 */
4050 int
isuioq(queue_t * q)4051 isuioq(queue_t *q)
4052 {
4053 if (q->q_flag & QREADR)
4054 return (STREAM(q)->sd_struiordq == q);
4055 else
4056 return (STREAM(q)->sd_struiowrq == q);
4057 }
4058
/*
 * When non-zero, create_putlocks() returns without creating anything.
 * The default is zero only when compiling for sparc.
 */
#if !defined(__sparc)
int disable_putlocks = 1;
#else
int disable_putlocks = 0;
#endif
4064
4065 /*
4066 * called by create_putlock.
4067 */
4068 static void
create_syncq_putlocks(queue_t * q)4069 create_syncq_putlocks(queue_t *q)
4070 {
4071 syncq_t *sq = q->q_syncq;
4072 ciputctrl_t *cip;
4073 int i;
4074
4075 ASSERT(sq != NULL);
4076
4077 ASSERT(disable_putlocks == 0);
4078 ASSERT(n_ciputctrl >= min_n_ciputctrl);
4079 ASSERT(ciputctrl_cache != NULL);
4080
4081 if (!(sq->sq_type & SQ_CIPUT))
4082 return;
4083
4084 for (i = 0; i <= 1; i++) {
4085 if (sq->sq_ciputctrl == NULL) {
4086 cip = kmem_cache_alloc(ciputctrl_cache, KM_SLEEP);
4087 SUMCHECK_CIPUTCTRL_COUNTS(cip, n_ciputctrl - 1, 0);
4088 mutex_enter(SQLOCK(sq));
4089 if (sq->sq_ciputctrl != NULL) {
4090 mutex_exit(SQLOCK(sq));
4091 kmem_cache_free(ciputctrl_cache, cip);
4092 } else {
4093 ASSERT(sq->sq_nciputctrl == 0);
4094 sq->sq_nciputctrl = n_ciputctrl - 1;
4095 /*
4096 * putnext checks sq_ciputctrl without holding
4097 * SQLOCK. if it is not NULL putnext assumes
4098 * sq_nciputctrl is initialized. membar below
4099 * insures that.
4100 */
4101 membar_producer();
4102 sq->sq_ciputctrl = cip;
4103 mutex_exit(SQLOCK(sq));
4104 }
4105 }
4106 ASSERT(sq->sq_nciputctrl == n_ciputctrl - 1);
4107 if (i == 1)
4108 break;
4109 q = _OTHERQ(q);
4110 if (!(q->q_flag & QPERQ)) {
4111 ASSERT(sq == q->q_syncq);
4112 break;
4113 }
4114 ASSERT(q->q_syncq != NULL);
4115 ASSERT(sq != q->q_syncq);
4116 sq = q->q_syncq;
4117 ASSERT(sq->sq_type & SQ_CIPUT);
4118 }
4119 }
4120
4121 /*
4122 * If stream argument is 0 only create per cpu sq_putlocks/sq_putcounts for
4123 * syncq of q. If stream argument is not 0 create per cpu stream_putlocks for
4124 * the stream of q and per cpu sq_putlocks/sq_putcounts for all syncq's
4125 * starting from q and down to the driver.
4126 *
4127 * This should be called after the affected queues are part of stream
4128 * geometry. It should be called from driver/module open routine after
4129 * qprocson() call. It is also called from nfs syscall where it is known that
4130 * stream is configured and won't change its geometry during create_putlock
4131 * call.
4132 *
4133 * caller normally uses 0 value for the stream argument to speed up MT putnext
4134 * into the perimeter of q for example because its perimeter is per module
4135 * (e.g. IP).
4136 *
4137 * caller normally uses non 0 value for the stream argument to hint the system
4138 * that the stream of q is a very contended global system stream
4139 * (e.g. NFS/UDP) and the part of the stream from q to the driver is
4140 * particularly MT hot.
4141 *
4142 * Caller insures stream plumbing won't happen while we are here and therefore
4143 * q_next can be safely used.
4144 */
4145
4146 void
create_putlocks(queue_t * q,int stream)4147 create_putlocks(queue_t *q, int stream)
4148 {
4149 ciputctrl_t *cip;
4150 struct stdata *stp = STREAM(q);
4151
4152 q = _WR(q);
4153 ASSERT(stp != NULL);
4154
4155 if (disable_putlocks != 0)
4156 return;
4157
4158 if (n_ciputctrl < min_n_ciputctrl)
4159 return;
4160
4161 ASSERT(ciputctrl_cache != NULL);
4162
4163 if (stream != 0 && stp->sd_ciputctrl == NULL) {
4164 cip = kmem_cache_alloc(ciputctrl_cache, KM_SLEEP);
4165 SUMCHECK_CIPUTCTRL_COUNTS(cip, n_ciputctrl - 1, 0);
4166 mutex_enter(&stp->sd_lock);
4167 if (stp->sd_ciputctrl != NULL) {
4168 mutex_exit(&stp->sd_lock);
4169 kmem_cache_free(ciputctrl_cache, cip);
4170 } else {
4171 ASSERT(stp->sd_nciputctrl == 0);
4172 stp->sd_nciputctrl = n_ciputctrl - 1;
4173 /*
4174 * putnext checks sd_ciputctrl without holding
4175 * sd_lock. if it is not NULL putnext assumes
4176 * sd_nciputctrl is initialized. membar below
4177 * insures that.
4178 */
4179 membar_producer();
4180 stp->sd_ciputctrl = cip;
4181 mutex_exit(&stp->sd_lock);
4182 }
4183 }
4184
4185 ASSERT(stream == 0 || stp->sd_nciputctrl == n_ciputctrl - 1);
4186
4187 while (_SAMESTR(q)) {
4188 create_syncq_putlocks(q);
4189 if (stream == 0)
4190 return;
4191 q = q->q_next;
4192 }
4193 ASSERT(q != NULL);
4194 create_syncq_putlocks(q);
4195 }
4196
4197 /*
4198 * STREAMS Flow Trace - record STREAMS Flow Trace events as an mblk flows
4199 * through a stream.
4200 *
 * The data currently recorded per event are a timestamp, module/driver name,
 * downstream module/driver name, optional call stack, event type and a
 * per-type datum.  Much of the STREAMS framework is instrumented for
 * automatic flow tracing (when enabled).  Events can be defined and used by
 * STREAMS modules and drivers.
4206 *
4207 * Global objects:
4208 *
4209 * str_ftevent() - Add a flow-trace event to a dblk.
4210 * str_ftfree() - Free flow-trace data
4211 *
4212 * Local objects:
4213 *
4214 * fthdr_cache - pointer to the kmem cache for trace header.
4215 * ftblk_cache - pointer to the kmem cache for trace data blocks.
4216 */
4217
/* Non-zero (the default): don't do STREAMS flow tracing. */
int str_ftnever = 1;
/* Zero (the default): don't record event call stacks. */
int str_ftstack = 0;
4220
4221 void
str_ftevent(fthdr_t * hp,void * p,ushort_t evnt,ushort_t data)4222 str_ftevent(fthdr_t *hp, void *p, ushort_t evnt, ushort_t data)
4223 {
4224 ftblk_t *bp = hp->tail;
4225 ftblk_t *nbp;
4226 ftevnt_t *ep;
4227 int ix, nix;
4228
4229 ASSERT(hp != NULL);
4230
4231 for (;;) {
4232 if ((ix = bp->ix) == FTBLK_EVNTS) {
4233 /*
4234 * Tail doesn't have room, so need a new tail.
4235 *
4236 * To make this MT safe, first, allocate a new
4237 * ftblk, and initialize it. To make life a
4238 * little easier, reserve the first slot (mostly
4239 * by making ix = 1). When we are finished with
4240 * the initialization, CAS this pointer to the
4241 * tail. If this succeeds, this is the new
4242 * "next" block. Otherwise, another thread
4243 * got here first, so free the block and start
4244 * again.
4245 */
4246 nbp = kmem_cache_alloc(ftblk_cache, KM_NOSLEEP);
4247 if (nbp == NULL) {
4248 /* no mem, so punt */
4249 str_ftnever++;
4250 /* free up all flow data? */
4251 return;
4252 }
4253 nbp->nxt = NULL;
4254 nbp->ix = 1;
4255 /*
4256 * Just in case there is another thread about
4257 * to get the next index, we need to make sure
4258 * the value is there for it.
4259 */
4260 membar_producer();
4261 if (atomic_cas_ptr(&hp->tail, bp, nbp) == bp) {
4262 /* CAS was successful */
4263 bp->nxt = nbp;
4264 membar_producer();
4265 bp = nbp;
4266 ix = 0;
4267 goto cas_good;
4268 } else {
4269 kmem_cache_free(ftblk_cache, nbp);
4270 bp = hp->tail;
4271 continue;
4272 }
4273 }
4274 nix = ix + 1;
4275 if (atomic_cas_32((uint32_t *)&bp->ix, ix, nix) == ix) {
4276 cas_good:
4277 if (curthread != hp->thread) {
4278 hp->thread = curthread;
4279 evnt |= FTEV_CS;
4280 }
4281 if (CPU->cpu_seqid != hp->cpu_seqid) {
4282 hp->cpu_seqid = CPU->cpu_seqid;
4283 evnt |= FTEV_PS;
4284 }
4285 ep = &bp->ev[ix];
4286 break;
4287 }
4288 }
4289
4290 if (evnt & FTEV_QMASK) {
4291 queue_t *qp = p;
4292
4293 if (!(qp->q_flag & QREADR))
4294 evnt |= FTEV_ISWR;
4295
4296 ep->mid = Q2NAME(qp);
4297
4298 /*
4299 * We only record the next queue name for FTEV_PUTNEXT since
4300 * that's the only time we *really* need it, and the putnext()
4301 * code ensures that qp->q_next won't vanish. (We could use
4302 * claimstr()/releasestr() but at a performance cost.)
4303 */
4304 if ((evnt & FTEV_MASK) == FTEV_PUTNEXT && qp->q_next != NULL)
4305 ep->midnext = Q2NAME(qp->q_next);
4306 else
4307 ep->midnext = NULL;
4308 } else {
4309 ep->mid = p;
4310 ep->midnext = NULL;
4311 }
4312
4313 if (ep->stk != NULL)
4314 ep->stk->fs_depth = getpcstack(ep->stk->fs_stk, FTSTK_DEPTH);
4315
4316 ep->ts = gethrtime();
4317 ep->evnt = evnt;
4318 ep->data = data;
4319 hp->hash = (hp->hash << 9) + hp->hash;
4320 hp->hash += (evnt << 16) | data;
4321 hp->hash += (uintptr_t)ep->mid;
4322 }
4323
4324 /*
4325 * Free flow-trace data.
4326 */
4327 void
str_ftfree(dblk_t * dbp)4328 str_ftfree(dblk_t *dbp)
4329 {
4330 fthdr_t *hp = dbp->db_fthdr;
4331 ftblk_t *bp = &hp->first;
4332 ftblk_t *nbp;
4333
4334 if (bp != hp->tail || bp->ix != 0) {
4335 /*
4336 * Clear out the hash, have the tail point to itself, and free
4337 * any continuation blocks.
4338 */
4339 bp = hp->first.nxt;
4340 hp->tail = &hp->first;
4341 hp->hash = 0;
4342 hp->first.nxt = NULL;
4343 hp->first.ix = 0;
4344 while (bp != NULL) {
4345 nbp = bp->nxt;
4346 kmem_cache_free(ftblk_cache, bp);
4347 bp = nbp;
4348 }
4349 }
4350 kmem_cache_free(fthdr_cache, hp);
4351 dbp->db_fthdr = NULL;
4352 }
4353