1 /*
2 * CDDL HEADER START
3 *
4 * The contents of this file are subject to the terms of the
5 * Common Development and Distribution License (the "License").
6 * You may not use this file except in compliance with the License.
7 *
8 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9 * or http://www.opensolaris.org/os/licensing.
10 * See the License for the specific language governing permissions
11 * and limitations under the License.
12 *
13 * When distributing Covered Code, include this CDDL HEADER in each
14 * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15 * If applicable, add the following below this CDDL HEADER, with the
16 * fields enclosed by brackets "[]" replaced with your own identifying
17 * information: Portions Copyright [yyyy] [name of copyright owner]
18 *
19 * CDDL HEADER END
20 */
21 /* Copyright (c) 1984, 1986, 1987, 1988, 1989 AT&T */
22 /* All Rights Reserved */
23
24 /*
25 * Copyright 2009 Sun Microsystems, Inc. All rights reserved.
26 * Use is subject to license terms.
27 *
28 * Copyright 2021 Tintri by DDN, Inc. All rights reserved.
29 * Copyright 2022 Garrett D'Amore
30 * Copyright 2024 Oxide Computer Company
31 */
32
33 #include <sys/types.h>
34 #include <sys/param.h>
35 #include <sys/thread.h>
36 #include <sys/sysmacros.h>
37 #include <sys/stropts.h>
38 #include <sys/stream.h>
39 #include <sys/strsubr.h>
40 #include <sys/strsun.h>
41 #include <sys/conf.h>
42 #include <sys/debug.h>
43 #include <sys/cmn_err.h>
44 #include <sys/kmem.h>
45 #include <sys/atomic.h>
46 #include <sys/errno.h>
47 #include <sys/vtrace.h>
48 #include <sys/ftrace.h>
49 #include <sys/ontrap.h>
50 #include <sys/sdt.h>
51 #include <sys/strft.h>
52
53 #ifdef DEBUG
54 #include <sys/kmem_impl.h>
55 #endif
56
57 /*
58 * This file contains all the STREAMS utility routines that may
59 * be used by modules and drivers.
60 */
61
62 /*
63 * STREAMS message allocator: principles of operation
64 *
65 * The streams message allocator consists of all the routines that
66 * allocate, dup and free streams messages: allocb(), [d]esballoc[a],
67 * dupb(), freeb() and freemsg(). What follows is a high-level view
68 * of how the allocator works.
69 *
70 * Every streams message consists of one or more mblks, a dblk, and data.
71 * All mblks for all types of messages come from a common mblk_cache.
72 * The dblk and data come in several flavors, depending on how the
73 * message is allocated:
74 *
75 * (1) mblks up to DBLK_MAX_CACHE size are allocated from a collection of
76 * fixed-size dblk/data caches. For message sizes that are multiples of
77 * PAGESIZE, dblks are allocated separately from the buffer.
78 * The associated buffer is allocated by the constructor using kmem_alloc().
79  * For all other message sizes, the dblk and its associated data are
80  * allocated as a single contiguous chunk of memory.
81 * Objects in these caches consist of a dblk plus its associated data.
82 * allocb() determines the nearest-size cache by table lookup:
83 * the dblk_cache[] array provides the mapping from size to dblk cache.
84 *
85 * (2) Large messages (size > DBLK_MAX_CACHE) are constructed by
86 * kmem_alloc()'ing a buffer for the data and supplying that
87 * buffer to gesballoc(), described below.
88 *
89 * (3) The four flavors of [d]esballoc[a] are all implemented by a
90 * common routine, gesballoc() ("generic esballoc"). gesballoc()
91 * allocates a dblk from the global dblk_esb_cache and sets db_base,
92 * db_lim and db_frtnp to describe the caller-supplied buffer.
93 *
94 * While there are several routines to allocate messages, there is only
95 * one routine to free messages: freeb(). freeb() simply invokes the
96 * dblk's free method, dbp->db_free(), which is set at allocation time.
97 *
98 * dupb() creates a new reference to a message by allocating a new mblk,
99 * incrementing the dblk reference count and setting the dblk's free
100 * method to dblk_decref(). The dblk's original free method is retained
101 * in db_lastfree. dblk_decref() decrements the reference count on each
102 * freeb(). If this is not the last reference it just frees the mblk;
103 * if this *is* the last reference, it restores db_free to db_lastfree,
104 * sets db_mblk to the current mblk (see below), and invokes db_lastfree.
105 *
106 * The implementation makes aggressive use of kmem object caching for
107 * maximum performance. This makes the code simple and compact, but
108 * also a bit abstruse in some places. The invariants that constitute a
109 * message's constructed state, described below, are more subtle than usual.
110 *
111 * Every dblk has an "attached mblk" as part of its constructed state.
112 * The mblk is allocated by the dblk's constructor and remains attached
113 * until the message is either dup'ed or pulled up. In the dupb() case
114 * the mblk association doesn't matter until the last free, at which time
115 * dblk_decref() attaches the last mblk to the dblk. pullupmsg() affects
116 * the mblk association because it swaps the leading mblks of two messages,
117 * so it is responsible for swapping their db_mblk pointers accordingly.
118 * From a constructed-state viewpoint it doesn't matter that a dblk's
119 * attached mblk can change while the message is allocated; all that
120 * matters is that the dblk has *some* attached mblk when it's freed.
121 *
122 * The sizes of the allocb() small-message caches are not magical.
123 * They represent a good trade-off between internal and external
124 * fragmentation for current workloads. They should be reevaluated
125 * periodically, especially if allocations larger than DBLK_MAX_CACHE
126 * become common. We use 64-byte alignment so that dblks don't
127 * straddle cache lines unnecessarily.
128 */
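
/*
 * A minimal usage sketch (illustrative, not part of the original code):
 * allocate a message, append data, dup it so the dblk is shared, then
 * free both references.  `len' and `src' are hypothetical.
 *
 *	mblk_t *mp, *dup;
 *
 *	if ((mp = allocb(len, BPRI_MED)) == NULL)
 *		return (ENOSR);
 *	bcopy(src, mp->b_wptr, len);
 *	mp->b_wptr += len;
 *	dup = dupb(mp);		(dup shares mp's dblk; db_ref becomes 2)
 *	freemsg(mp);		(drops one reference)
 *	freemsg(dup);		(last reference; the data is freed)
 */
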
129 #define DBLK_MAX_CACHE 73728
130 #define DBLK_CACHE_ALIGN 64
131 #define DBLK_MIN_SIZE 8
132 #define DBLK_SIZE_SHIFT 3
133
134 #ifdef _BIG_ENDIAN
135 #define DBLK_RTFU_SHIFT(field) \
136 (8 * (&((dblk_t *)0)->db_struioflag - &((dblk_t *)0)->field))
137 #else
138 #define DBLK_RTFU_SHIFT(field) \
139 (8 * (&((dblk_t *)0)->field - &((dblk_t *)0)->db_ref))
140 #endif
141
142 #define DBLK_RTFU(ref, type, flags, uioflag) \
143 (((ref) << DBLK_RTFU_SHIFT(db_ref)) | \
144 ((type) << DBLK_RTFU_SHIFT(db_type)) | \
145 (((flags) | (ref - 1)) << DBLK_RTFU_SHIFT(db_flags)) | \
146 ((uioflag) << DBLK_RTFU_SHIFT(db_struioflag)))
147 #define DBLK_RTFU_REF_MASK (DBLK_REFMAX << DBLK_RTFU_SHIFT(db_ref))
148 #define DBLK_RTFU_WORD(dbp) (*((uint32_t *)&(dbp)->db_ref))
149 #define MBLK_BAND_FLAG_WORD(mp) (*((uint32_t *)&(mp)->b_band))
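
/*
 * Worked example (illustrative): DBLK_RTFU(1, M_DATA, 0, 0) builds a
 * 32-bit constant whose bytes line up with db_ref, db_type, db_flags
 * and db_struioflag, so a single store through DBLK_RTFU_WORD()
 * initializes all four fields at once: db_ref = 1, db_type = M_DATA,
 * db_flags = 0, db_struioflag = 0.  Note the (ref - 1) term ORed into
 * the db_flags byte: it is zero when ref is 1 and sets DBLK_REFMIN
 * when ref is 2, which is how the esballoca/desballoca variants mark
 * their permanent extra reference (see the gesballoc() comment below).
 */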
150
151 static size_t dblk_sizes[] = {
152 #ifdef _LP64
153 16, 80, 144, 208, 272, 336, 528, 1040, 1488, 1936, 2576, 3856,
154 8192, 12048, 16384, 20240, 24576, 28432, 32768, 36624,
155 40960, 44816, 49152, 53008, 57344, 61200, 65536, 69392,
156 #else
157 64, 128, 320, 576, 1088, 1536, 1984, 2624, 3904,
158 8192, 12096, 16384, 20288, 24576, 28480, 32768, 36672,
159 40960, 44864, 49152, 53056, 57344, 61248, 65536, 69440,
160 #endif
161 DBLK_MAX_CACHE, 0
162 };
163
164 static struct kmem_cache *dblk_cache[DBLK_MAX_CACHE / DBLK_MIN_SIZE];
165 static struct kmem_cache *mblk_cache;
166 static struct kmem_cache *dblk_esb_cache;
167 static struct kmem_cache *fthdr_cache;
168 static struct kmem_cache *ftblk_cache;
169
170 static void dblk_lastfree(mblk_t *mp, dblk_t *dbp);
171 static mblk_t *allocb_oversize(size_t size, int flags);
172 static int allocb_tryhard_fails;
173 static void frnop_func(void *arg);
174 frtn_t frnop = { frnop_func };
175 static void bcache_dblk_lastfree(mblk_t *mp, dblk_t *dbp);
176
177 static boolean_t rwnext_enter(queue_t *qp);
178 static void rwnext_exit(queue_t *qp);
179
180 /*
181 * Patchable mblk/dblk kmem_cache flags.
182 */
183 int dblk_kmem_flags = 0;
184 int mblk_kmem_flags = 0;
185
186 static int
187 dblk_constructor(void *buf, void *cdrarg, int kmflags)
188 {
189 dblk_t *dbp = buf;
190 ssize_t msg_size = (ssize_t)cdrarg;
191 size_t index;
192
193 ASSERT(msg_size != 0);
194
195 index = (msg_size - 1) >> DBLK_SIZE_SHIFT;
196
197 ASSERT(index < (DBLK_MAX_CACHE >> DBLK_SIZE_SHIFT));
198
199 if ((dbp->db_mblk = kmem_cache_alloc(mblk_cache, kmflags)) == NULL)
200 return (-1);
201 if ((msg_size & PAGEOFFSET) == 0) {
202 dbp->db_base = kmem_alloc(msg_size, kmflags);
203 if (dbp->db_base == NULL) {
204 kmem_cache_free(mblk_cache, dbp->db_mblk);
205 return (-1);
206 }
207 } else {
208 dbp->db_base = (unsigned char *)&dbp[1];
209 }
210
211 dbp->db_mblk->b_datap = dbp;
212 dbp->db_cache = dblk_cache[index];
213 dbp->db_lim = dbp->db_base + msg_size;
214 dbp->db_free = dbp->db_lastfree = dblk_lastfree;
215 dbp->db_frtnp = NULL;
216 dbp->db_fthdr = NULL;
217 dbp->db_credp = NULL;
218 dbp->db_cpid = -1;
219 dbp->db_struioflag = 0;
220 dbp->db_struioun.cksum.flags = 0;
221 return (0);
222 }
223
224 /*ARGSUSED*/
225 static int
226 dblk_esb_constructor(void *buf, void *cdrarg, int kmflags)
227 {
228 dblk_t *dbp = buf;
229
230 if ((dbp->db_mblk = kmem_cache_alloc(mblk_cache, kmflags)) == NULL)
231 return (-1);
232 dbp->db_mblk->b_datap = dbp;
233 dbp->db_cache = dblk_esb_cache;
234 dbp->db_fthdr = NULL;
235 dbp->db_credp = NULL;
236 dbp->db_cpid = -1;
237 dbp->db_struioflag = 0;
238 dbp->db_struioun.cksum.flags = 0;
239 return (0);
240 }
241
242 static int
243 bcache_dblk_constructor(void *buf, void *cdrarg, int kmflags)
244 {
245 dblk_t *dbp = buf;
246 bcache_t *bcp = cdrarg;
247
248 if ((dbp->db_mblk = kmem_cache_alloc(mblk_cache, kmflags)) == NULL)
249 return (-1);
250
251 dbp->db_base = kmem_cache_alloc(bcp->buffer_cache, kmflags);
252 if (dbp->db_base == NULL) {
253 kmem_cache_free(mblk_cache, dbp->db_mblk);
254 return (-1);
255 }
256
257 dbp->db_mblk->b_datap = dbp;
258 dbp->db_cache = (void *)bcp;
259 dbp->db_lim = dbp->db_base + bcp->size;
260 dbp->db_free = dbp->db_lastfree = bcache_dblk_lastfree;
261 dbp->db_frtnp = NULL;
262 dbp->db_fthdr = NULL;
263 dbp->db_credp = NULL;
264 dbp->db_cpid = -1;
265 dbp->db_struioflag = 0;
266 dbp->db_struioun.cksum.flags = 0;
267 return (0);
268 }
269
270 /*ARGSUSED*/
271 static void
272 dblk_destructor(void *buf, void *cdrarg)
273 {
274 dblk_t *dbp = buf;
275 ssize_t msg_size = (ssize_t)cdrarg;
276
277 ASSERT(dbp->db_mblk->b_datap == dbp);
278 ASSERT(msg_size != 0);
279 ASSERT(dbp->db_struioflag == 0);
280 ASSERT(dbp->db_struioun.cksum.flags == 0);
281
282 if ((msg_size & PAGEOFFSET) == 0) {
283 kmem_free(dbp->db_base, msg_size);
284 }
285
286 kmem_cache_free(mblk_cache, dbp->db_mblk);
287 }
288
289 static void
290 bcache_dblk_destructor(void *buf, void *cdrarg)
291 {
292 dblk_t *dbp = buf;
293 bcache_t *bcp = cdrarg;
294
295 kmem_cache_free(bcp->buffer_cache, dbp->db_base);
296
297 ASSERT(dbp->db_mblk->b_datap == dbp);
298 ASSERT(dbp->db_struioflag == 0);
299 ASSERT(dbp->db_struioun.cksum.flags == 0);
300
301 kmem_cache_free(mblk_cache, dbp->db_mblk);
302 }
303
304 /* ARGSUSED */
305 static int
306 ftblk_constructor(void *buf, void *cdrarg, int kmflags)
307 {
308 ftblk_t *fbp = buf;
309 int i;
310
311 bzero(fbp, sizeof (ftblk_t));
312 if (str_ftstack != 0) {
313 for (i = 0; i < FTBLK_EVNTS; i++)
314 fbp->ev[i].stk = kmem_alloc(sizeof (ftstk_t), kmflags);
315 }
316
317 return (0);
318 }
319
320 /* ARGSUSED */
321 static void
322 ftblk_destructor(void *buf, void *cdrarg)
323 {
324 ftblk_t *fbp = buf;
325 int i;
326
327 if (str_ftstack != 0) {
328 for (i = 0; i < FTBLK_EVNTS; i++) {
329 if (fbp->ev[i].stk != NULL) {
330 kmem_free(fbp->ev[i].stk, sizeof (ftstk_t));
331 fbp->ev[i].stk = NULL;
332 }
333 }
334 }
335 }
336
337 static int
338 fthdr_constructor(void *buf, void *cdrarg, int kmflags)
339 {
340 fthdr_t *fhp = buf;
341
342 return (ftblk_constructor(&fhp->first, cdrarg, kmflags));
343 }
344
345 static void
346 fthdr_destructor(void *buf, void *cdrarg)
347 {
348 fthdr_t *fhp = buf;
349
350 ftblk_destructor(&fhp->first, cdrarg);
351 }
352
353 void
354 streams_msg_init(void)
355 {
356 char name[40];
357 size_t size;
358 size_t lastsize = DBLK_MIN_SIZE;
359 size_t *sizep;
360 struct kmem_cache *cp;
361 size_t tot_size;
362 int offset;
363
364 mblk_cache = kmem_cache_create("streams_mblk", sizeof (mblk_t), 32,
365 NULL, NULL, NULL, NULL, NULL, mblk_kmem_flags);
366
367 for (sizep = dblk_sizes; (size = *sizep) != 0; sizep++) {
368
369 if ((offset = (size & PAGEOFFSET)) != 0) {
370 /*
371  * We are in the middle of a page, so the dblk
372  * should be allocated on the same page.
373 */
374 tot_size = size + sizeof (dblk_t);
375 ASSERT((offset + sizeof (dblk_t) + sizeof (kmem_slab_t))
376 < PAGESIZE);
377 ASSERT((tot_size & (DBLK_CACHE_ALIGN - 1)) == 0);
378
379 } else {
380
381 /*
382  * buf size is a multiple of the page size; the dblk and
383  * buffer are allocated separately.
384 */
385
386 ASSERT((size & (DBLK_CACHE_ALIGN - 1)) == 0);
387 tot_size = sizeof (dblk_t);
388 }
389
390 (void) sprintf(name, "streams_dblk_%ld", size);
391 cp = kmem_cache_create(name, tot_size, DBLK_CACHE_ALIGN,
392 dblk_constructor, dblk_destructor, NULL, (void *)(size),
393 NULL, dblk_kmem_flags);
394
395 while (lastsize <= size) {
396 dblk_cache[(lastsize - 1) >> DBLK_SIZE_SHIFT] = cp;
397 lastsize += DBLK_MIN_SIZE;
398 }
399 }
400
401 dblk_esb_cache = kmem_cache_create("streams_dblk_esb", sizeof (dblk_t),
402 DBLK_CACHE_ALIGN, dblk_esb_constructor, dblk_destructor, NULL,
403 (void *)sizeof (dblk_t), NULL, dblk_kmem_flags);
404 fthdr_cache = kmem_cache_create("streams_fthdr", sizeof (fthdr_t), 32,
405 fthdr_constructor, fthdr_destructor, NULL, NULL, NULL, 0);
406 ftblk_cache = kmem_cache_create("streams_ftblk", sizeof (ftblk_t), 32,
407 ftblk_constructor, ftblk_destructor, NULL, NULL, NULL, 0);
408
409 /* initialize throttling queue for esballoc */
410 esballoc_queue_init();
411 }
412
413 /*ARGSUSED*/
414 mblk_t *
415 allocb(size_t size, uint_t pri)
416 {
417 dblk_t *dbp;
418 mblk_t *mp;
419 size_t index;
420
421 index = (size - 1) >> DBLK_SIZE_SHIFT;
422
423 if (index >= (DBLK_MAX_CACHE >> DBLK_SIZE_SHIFT)) {
424 if (size != 0) {
425 mp = allocb_oversize(size, KM_NOSLEEP);
426 goto out;
427 }
428 index = 0;
429 }
430
431 if ((dbp = kmem_cache_alloc(dblk_cache[index], KM_NOSLEEP)) == NULL) {
432 mp = NULL;
433 goto out;
434 }
435
436 mp = dbp->db_mblk;
437 DBLK_RTFU_WORD(dbp) = DBLK_RTFU(1, M_DATA, 0, 0);
438 mp->b_next = mp->b_prev = mp->b_cont = NULL;
439 mp->b_rptr = mp->b_wptr = dbp->db_base;
440 mp->b_queue = NULL;
441 MBLK_BAND_FLAG_WORD(mp) = 0;
442 STR_FTALLOC(&dbp->db_fthdr, FTEV_ALLOCB, size);
443 out:
444 FTRACE_1("allocb(): mp=0x%p", (uintptr_t)mp);
445
446 return (mp);
447 }
448
449 /*
450 * Allocate an mblk taking db_credp and db_cpid from the template.
451 * Allow the cred to be NULL.
452 */
453 mblk_t *
454 allocb_tmpl(size_t size, const mblk_t *tmpl)
455 {
456 mblk_t *mp = allocb(size, 0);
457
458 if (mp != NULL) {
459 dblk_t *src = tmpl->b_datap;
460 dblk_t *dst = mp->b_datap;
461 cred_t *cr;
462 pid_t cpid;
463
464 cr = msg_getcred(tmpl, &cpid);
465 if (cr != NULL)
466 crhold(dst->db_credp = cr);
467 dst->db_cpid = cpid;
468 dst->db_type = src->db_type;
469 }
470 return (mp);
471 }
472
473 mblk_t *
474 allocb_cred(size_t size, cred_t *cr, pid_t cpid)
475 {
476 mblk_t *mp = allocb(size, 0);
477
478 ASSERT(cr != NULL);
479 if (mp != NULL) {
480 dblk_t *dbp = mp->b_datap;
481
482 crhold(dbp->db_credp = cr);
483 dbp->db_cpid = cpid;
484 }
485 return (mp);
486 }
487
488 mblk_t *
489 allocb_cred_wait(size_t size, uint_t flags, int *error, cred_t *cr, pid_t cpid)
490 {
491 mblk_t *mp = allocb_wait(size, 0, flags, error);
492
493 ASSERT(cr != NULL);
494 if (mp != NULL) {
495 dblk_t *dbp = mp->b_datap;
496
497 crhold(dbp->db_credp = cr);
498 dbp->db_cpid = cpid;
499 }
500
501 return (mp);
502 }
503
504 /*
505 * Extract the db_cred (and optionally db_cpid) from a message.
506 * We find the first mblk which has a non-NULL db_cred and use that.
507  * If none is found we return NULL.
508 * Does NOT get a hold on the cred.
509 */
510 cred_t *
511 msg_getcred(const mblk_t *mp, pid_t *cpidp)
512 {
513 cred_t *cr = NULL;
514 cred_t *cr2;
515 mblk_t *mp2;
516
517 while (mp != NULL) {
518 dblk_t *dbp = mp->b_datap;
519
520 cr = dbp->db_credp;
521 if (cr == NULL) {
522 mp = mp->b_cont;
523 continue;
524 }
525 if (cpidp != NULL)
526 *cpidp = dbp->db_cpid;
527
528 #ifdef DEBUG
529 /*
530 		 * Normally there should be at most one db_credp in a message.
531 		 * But if there are multiple (as in the case of some M_IOC*
532 		 * and some internal messages in TCP/IP bind logic) then
533 		 * they must be identical in the normal case.
534 		 * However, a socket can be shared between different uids
535 		 * in which case data queued in TCP would be from different
536 		 * creds. Thus we can only assert that the zoneids are the
537 		 * same. Due to Multi-Level Ports for TX, some
538 * cred_t can have a NULL cr_zone, and we skip the comparison
539 * in that case.
540 */
541 mp2 = mp->b_cont;
542 while (mp2 != NULL) {
543 cr2 = DB_CRED(mp2);
544 if (cr2 != NULL) {
545 DTRACE_PROBE2(msg__getcred,
546 cred_t *, cr, cred_t *, cr2);
547 ASSERT(crgetzoneid(cr) == crgetzoneid(cr2) ||
548 crgetzone(cr) == NULL ||
549 crgetzone(cr2) == NULL);
550 }
551 mp2 = mp2->b_cont;
552 }
553 #endif
554 return (cr);
555 }
556 if (cpidp != NULL)
557 *cpidp = NOPID;
558 return (NULL);
559 }
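
/*
 * Illustrative usage (not part of the original code): a caller that
 * needs the cred beyond the life of the message must take its own
 * hold, since msg_getcred() does not.
 *
 *	pid_t cpid;
 *	cred_t *cr;
 *
 *	if ((cr = msg_getcred(mp, &cpid)) != NULL)
 *		crhold(cr);
 */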
560
561 /*
562 * Variant of msg_getcred which, when a cred is found
563 * 1. Returns with a hold on the cred
564 * 2. Clears the first cred in the mblk.
565 * This is more efficient to use than a msg_getcred() + crhold() when
566 * the message is freed after the cred has been extracted.
567 *
568 * The caller is responsible for ensuring that there is no other reference
569 * on the message since db_credp can not be cleared when there are other
570 * references.
571 */
572 cred_t *
573 msg_extractcred(mblk_t *mp, pid_t *cpidp)
574 {
575 cred_t *cr = NULL;
576 cred_t *cr2;
577 mblk_t *mp2;
578
579 while (mp != NULL) {
580 dblk_t *dbp = mp->b_datap;
581
582 cr = dbp->db_credp;
583 if (cr == NULL) {
584 mp = mp->b_cont;
585 continue;
586 }
587 ASSERT(dbp->db_ref == 1);
588 dbp->db_credp = NULL;
589 if (cpidp != NULL)
590 *cpidp = dbp->db_cpid;
591 #ifdef DEBUG
592 /*
593 		 * Normally there should be at most one db_credp in a message.
594 		 * But if there are multiple (as in the case of some M_IOC*
595 		 * and some internal messages in TCP/IP bind logic) then
596 		 * they must be identical in the normal case.
597 		 * However, a socket can be shared between different uids
598 		 * in which case data queued in TCP would be from different
599 		 * creds. Thus we can only assert that the zoneids are the
600 		 * same. Due to Multi-Level Ports for TX, some
601 * cred_t can have a NULL cr_zone, and we skip the comparison
602 * in that case.
603 */
604 mp2 = mp->b_cont;
605 while (mp2 != NULL) {
606 cr2 = DB_CRED(mp2);
607 if (cr2 != NULL) {
608 DTRACE_PROBE2(msg__extractcred,
609 cred_t *, cr, cred_t *, cr2);
610 ASSERT(crgetzoneid(cr) == crgetzoneid(cr2) ||
611 crgetzone(cr) == NULL ||
612 crgetzone(cr2) == NULL);
613 }
614 mp2 = mp2->b_cont;
615 }
616 #endif
617 return (cr);
618 }
619 return (NULL);
620 }
621 /*
622 * Get the label for a message. Uses the first mblk in the message
623 * which has a non-NULL db_credp.
624 * Returns NULL if there is no credp.
625 */
626 extern struct ts_label_s *
627 msg_getlabel(const mblk_t *mp)
628 {
629 cred_t *cr = msg_getcred(mp, NULL);
630
631 if (cr == NULL)
632 return (NULL);
633
634 return (crgetlabel(cr));
635 }
636
637 void
638 freeb(mblk_t *mp)
639 {
640 dblk_t *dbp = mp->b_datap;
641
642 ASSERT(dbp->db_ref > 0);
643 ASSERT(mp->b_next == NULL && mp->b_prev == NULL);
644 FTRACE_1("freeb(): mp=0x%lx", (uintptr_t)mp);
645
646 STR_FTEVENT_MBLK(mp, caller(), FTEV_FREEB, dbp->db_ref);
647
648 dbp->db_free(mp, dbp);
649 }
650
651 void
652 freemsg(mblk_t *mp)
653 {
654 FTRACE_1("freemsg(): mp=0x%lx", (uintptr_t)mp);
655 while (mp) {
656 dblk_t *dbp = mp->b_datap;
657 mblk_t *mp_cont = mp->b_cont;
658
659 ASSERT(dbp->db_ref > 0);
660 ASSERT(mp->b_next == NULL && mp->b_prev == NULL);
661
662 STR_FTEVENT_MBLK(mp, caller(), FTEV_FREEB, dbp->db_ref);
663
664 dbp->db_free(mp, dbp);
665 mp = mp_cont;
666 }
667 }
668
669 /*
670 * Reallocate a block for another use. Try hard to use the old block.
671 * If the old data is wanted (copy), leave b_wptr at the end of the data,
672 * otherwise return b_wptr = b_rptr.
673 *
674 * This routine is private and unstable.
675 */
676 mblk_t *
677 reallocb(mblk_t *mp, size_t size, uint_t copy)
678 {
679 mblk_t *mp1;
680 unsigned char *old_rptr;
681 ptrdiff_t cur_size;
682
683 if (mp == NULL)
684 return (allocb(size, BPRI_HI));
685
686 cur_size = mp->b_wptr - mp->b_rptr;
687 old_rptr = mp->b_rptr;
688
689 ASSERT(mp->b_datap->db_ref != 0);
690
691 if (mp->b_datap->db_ref == 1 && MBLKSIZE(mp) >= size) {
692 /*
693 * If the data is wanted and it will fit where it is, no
694 * work is required.
695 */
696 if (copy && mp->b_datap->db_lim - mp->b_rptr >= size)
697 return (mp);
698
699 mp->b_wptr = mp->b_rptr = mp->b_datap->db_base;
700 mp1 = mp;
701 } else if ((mp1 = allocb_tmpl(size, mp)) != NULL) {
702 /* XXX other mp state could be copied too, db_flags ... ? */
703 mp1->b_cont = mp->b_cont;
704 } else {
705 return (NULL);
706 }
707
708 if (copy) {
709 bcopy(old_rptr, mp1->b_rptr, cur_size);
710 mp1->b_wptr = mp1->b_rptr + cur_size;
711 }
712
713 if (mp != mp1)
714 freeb(mp);
715
716 return (mp1);
717 }
718
719 static void
720 dblk_lastfree(mblk_t *mp, dblk_t *dbp)
721 {
722 ASSERT(dbp->db_mblk == mp);
723 if (dbp->db_fthdr != NULL)
724 str_ftfree(dbp);
725
726 /* set credp and projid to be 'unspecified' before returning to cache */
727 if (dbp->db_credp != NULL) {
728 crfree(dbp->db_credp);
729 dbp->db_credp = NULL;
730 }
731 dbp->db_cpid = -1;
732
733 /* Reset the struioflag and the checksum flag fields */
734 dbp->db_struioflag = 0;
735 dbp->db_struioun.cksum.flags = 0;
736
737 /* and the COOKED and/or UIOA flag(s) */
738 dbp->db_flags &= ~(DBLK_COOKED | DBLK_UIOA);
739
740 kmem_cache_free(dbp->db_cache, dbp);
741 }
742
743 static void
744 dblk_decref(mblk_t *mp, dblk_t *dbp)
745 {
746 if (dbp->db_ref != 1) {
747 uint32_t rtfu = atomic_add_32_nv(&DBLK_RTFU_WORD(dbp),
748 -(1 << DBLK_RTFU_SHIFT(db_ref)));
749 /*
750 * atomic_add_32_nv() just decremented db_ref, so we no longer
751 * have a reference to the dblk, which means another thread
752 * could free it. Therefore we cannot examine the dblk to
753 * determine whether ours was the last reference. Instead,
754 * we extract the new and minimum reference counts from rtfu.
755 * Note that all we're really saying is "if (ref != refmin)".
756 */
757 if (((rtfu >> DBLK_RTFU_SHIFT(db_ref)) & DBLK_REFMAX) !=
758 ((rtfu >> DBLK_RTFU_SHIFT(db_flags)) & DBLK_REFMIN)) {
759 kmem_cache_free(mblk_cache, mp);
760 return;
761 }
762 }
763 dbp->db_mblk = mp;
764 dbp->db_free = dbp->db_lastfree;
765 dbp->db_lastfree(mp, dbp);
766 }
767
768 mblk_t *
769 dupb(mblk_t *mp)
770 {
771 dblk_t *dbp = mp->b_datap;
772 mblk_t *new_mp;
773 uint32_t oldrtfu, newrtfu;
774
775 if ((new_mp = kmem_cache_alloc(mblk_cache, KM_NOSLEEP)) == NULL)
776 goto out;
777
778 new_mp->b_next = new_mp->b_prev = new_mp->b_cont = NULL;
779 new_mp->b_rptr = mp->b_rptr;
780 new_mp->b_wptr = mp->b_wptr;
781 new_mp->b_datap = dbp;
782 new_mp->b_queue = NULL;
783 MBLK_BAND_FLAG_WORD(new_mp) = MBLK_BAND_FLAG_WORD(mp);
784
785 STR_FTEVENT_MBLK(mp, caller(), FTEV_DUPB, dbp->db_ref);
786
787 dbp->db_free = dblk_decref;
788 do {
789 ASSERT(dbp->db_ref > 0);
790 oldrtfu = DBLK_RTFU_WORD(dbp);
791 newrtfu = oldrtfu + (1 << DBLK_RTFU_SHIFT(db_ref));
792 /*
793 * If db_ref is maxed out we can't dup this message anymore.
794 */
795 if ((oldrtfu & DBLK_RTFU_REF_MASK) == DBLK_RTFU_REF_MASK) {
796 kmem_cache_free(mblk_cache, new_mp);
797 new_mp = NULL;
798 goto out;
799 }
800 } while (atomic_cas_32(&DBLK_RTFU_WORD(dbp), oldrtfu, newrtfu) !=
801 oldrtfu);
802
803 out:
804 FTRACE_1("dupb(): new_mp=0x%lx", (uintptr_t)new_mp);
805 return (new_mp);
806 }
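
/*
 * Illustrative sketch (not part of the original code): after a
 * successful dupb() the two mblks share one dblk and its data, so the
 * data must be treated as read-only until db_ref drops back to 1.
 *
 *	mblk_t *second;
 *
 *	if ((second = dupb(mp)) != NULL) {
 *		ASSERT(second->b_datap == mp->b_datap);
 *		ASSERT(DB_REF(mp) >= 2);
 *	}
 */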
807
808 static void
809 dblk_lastfree_desb(mblk_t *mp, dblk_t *dbp)
810 {
811 frtn_t *frp = dbp->db_frtnp;
812
813 ASSERT(dbp->db_mblk == mp);
814 frp->free_func(frp->free_arg);
815 if (dbp->db_fthdr != NULL)
816 str_ftfree(dbp);
817
818 /* set credp and projid to be 'unspecified' before returning to cache */
819 if (dbp->db_credp != NULL) {
820 crfree(dbp->db_credp);
821 dbp->db_credp = NULL;
822 }
823 dbp->db_cpid = -1;
824 dbp->db_struioflag = 0;
825 dbp->db_struioun.cksum.flags = 0;
826
827 kmem_cache_free(dbp->db_cache, dbp);
828 }
829
830 /*ARGSUSED*/
831 static void
832 frnop_func(void *arg)
833 {
834 }
835
836 /*
837 * Generic esballoc used to implement the four flavors: [d]esballoc[a].
838 *
839  * The variants with a 'd' prefix (desballoc, desballoca)
840  * free the mblk directly when it loses its last ref,
841  * whereas the other variants free it asynchronously.
842  * The variants with an 'a' suffix (esballoca, desballoca)
843  * add an extra ref, effectively letting the streams subsystem
844  * know that the message data should not be modified.
845  * (e.g., see the db_ref checks in reallocb() and elsewhere)
846 *
847 * The method used by the 'a' suffix functions to keep the dblk
848 * db_ref > 1 is non-obvious. The macro DBLK_RTFU(2,...) passed to
849 * gesballoc sets the initial db_ref = 2 and sets the DBLK_REFMIN
850 * bit in db_flags. In dblk_decref() that flag essentially means
851 * the dblk has one extra ref, so the "last ref" is one, not zero.
852 */
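
/*
 * Illustrative sketch (not part of the original code): a typical
 * desballoc() caller wraps a driver-owned buffer and supplies a free
 * routine to run when the last reference is dropped.  `xd', `xd_buf',
 * `xd_buflen' and xd_recycle() are hypothetical driver constructs.
 *
 *	xd->xd_frtn.free_func = xd_recycle;
 *	xd->xd_frtn.free_arg = (caddr_t)xd;
 *	mp = desballoc(xd->xd_buf, xd->xd_buflen, BPRI_MED, &xd->xd_frtn);
 */
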
853 static mblk_t *
854 gesballoc(unsigned char *base, size_t size, uint32_t db_rtfu, frtn_t *frp,
855 void (*lastfree)(mblk_t *, dblk_t *), int kmflags)
856 {
857 dblk_t *dbp;
858 mblk_t *mp;
859
860 ASSERT(base != NULL && frp != NULL);
861
862 if ((dbp = kmem_cache_alloc(dblk_esb_cache, kmflags)) == NULL) {
863 mp = NULL;
864 goto out;
865 }
866
867 mp = dbp->db_mblk;
868 dbp->db_base = base;
869 dbp->db_lim = base + size;
870 dbp->db_free = dbp->db_lastfree = lastfree;
871 dbp->db_frtnp = frp;
872 DBLK_RTFU_WORD(dbp) = db_rtfu;
873 mp->b_next = mp->b_prev = mp->b_cont = NULL;
874 mp->b_rptr = mp->b_wptr = base;
875 mp->b_queue = NULL;
876 MBLK_BAND_FLAG_WORD(mp) = 0;
877
878 out:
879 FTRACE_1("gesballoc(): mp=0x%lx", (uintptr_t)mp);
880 return (mp);
881 }
882
883 /*ARGSUSED*/
884 mblk_t *
885 esballoc(unsigned char *base, size_t size, uint_t pri, frtn_t *frp)
886 {
887 mblk_t *mp;
888
889 /*
890 * Note that this is structured to allow the common case (i.e.
891 * STREAMS flowtracing disabled) to call gesballoc() with tail
892 * call optimization.
893 */
894 if (!str_ftnever) {
895 mp = gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
896 frp, freebs_enqueue, KM_NOSLEEP);
897
898 if (mp != NULL)
899 STR_FTALLOC(&DB_FTHDR(mp), FTEV_ESBALLOC, size);
900 return (mp);
901 }
902
903 return (gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
904 frp, freebs_enqueue, KM_NOSLEEP));
905 }
906
907 /*
908 * Same as esballoc() but sleeps waiting for memory.
909 */
910 /*ARGSUSED*/
911 mblk_t *
912 esballoc_wait(unsigned char *base, size_t size, uint_t pri, frtn_t *frp)
913 {
914 mblk_t *mp;
915
916 /*
917 * Note that this is structured to allow the common case (i.e.
918 * STREAMS flowtracing disabled) to call gesballoc() with tail
919 * call optimization.
920 */
921 if (!str_ftnever) {
922 mp = gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
923 frp, freebs_enqueue, KM_SLEEP);
924
925 STR_FTALLOC(&DB_FTHDR(mp), FTEV_ESBALLOC, size);
926 return (mp);
927 }
928
929 return (gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
930 frp, freebs_enqueue, KM_SLEEP));
931 }
932
933 /*ARGSUSED*/
934 mblk_t *
935 desballoc(unsigned char *base, size_t size, uint_t pri, frtn_t *frp)
936 {
937 mblk_t *mp;
938
939 /*
940 * Note that this is structured to allow the common case (i.e.
941 * STREAMS flowtracing disabled) to call gesballoc() with tail
942 * call optimization.
943 */
944 if (!str_ftnever) {
945 mp = gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
946 frp, dblk_lastfree_desb, KM_NOSLEEP);
947
948 if (mp != NULL)
949 STR_FTALLOC(&DB_FTHDR(mp), FTEV_DESBALLOC, size);
950 return (mp);
951 }
952
953 return (gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
954 frp, dblk_lastfree_desb, KM_NOSLEEP));
955 }
956
957 /*ARGSUSED*/
958 mblk_t *
959 esballoca(unsigned char *base, size_t size, uint_t pri, frtn_t *frp)
960 {
961 mblk_t *mp;
962
963 /*
964 * Note that this is structured to allow the common case (i.e.
965 * STREAMS flowtracing disabled) to call gesballoc() with tail
966 * call optimization.
967 */
968 if (!str_ftnever) {
969 mp = gesballoc(base, size, DBLK_RTFU(2, M_DATA, 0, 0),
970 frp, freebs_enqueue, KM_NOSLEEP);
971
972 if (mp != NULL)
973 STR_FTALLOC(&DB_FTHDR(mp), FTEV_ESBALLOCA, size);
974 return (mp);
975 }
976
977 return (gesballoc(base, size, DBLK_RTFU(2, M_DATA, 0, 0),
978 frp, freebs_enqueue, KM_NOSLEEP));
979 }
980
981 /*
982 * Same as esballoca() but sleeps waiting for memory.
983 */
984 mblk_t *
985 esballoca_wait(unsigned char *base, size_t size, uint_t pri, frtn_t *frp)
986 {
987 mblk_t *mp;
988
989 /*
990 * Note that this is structured to allow the common case (i.e.
991 * STREAMS flowtracing disabled) to call gesballoc() with tail
992 * call optimization.
993 */
994 if (!str_ftnever) {
995 mp = gesballoc(base, size, DBLK_RTFU(2, M_DATA, 0, 0),
996 frp, freebs_enqueue, KM_SLEEP);
997
998 STR_FTALLOC(&DB_FTHDR(mp), FTEV_ESBALLOCA, size);
999 return (mp);
1000 }
1001
1002 return (gesballoc(base, size, DBLK_RTFU(2, M_DATA, 0, 0),
1003 frp, freebs_enqueue, KM_SLEEP));
1004 }
1005
1006 /*ARGSUSED*/
1007 mblk_t *
1008 desballoca(unsigned char *base, size_t size, uint_t pri, frtn_t *frp)
1009 {
1010 mblk_t *mp;
1011
1012 /*
1013 * Note that this is structured to allow the common case (i.e.
1014 * STREAMS flowtracing disabled) to call gesballoc() with tail
1015 * call optimization.
1016 */
1017 if (!str_ftnever) {
1018 mp = gesballoc(base, size, DBLK_RTFU(2, M_DATA, 0, 0),
1019 frp, dblk_lastfree_desb, KM_NOSLEEP);
1020
1021 if (mp != NULL)
1022 STR_FTALLOC(&DB_FTHDR(mp), FTEV_DESBALLOCA, size);
1023 return (mp);
1024 }
1025
1026 return (gesballoc(base, size, DBLK_RTFU(2, M_DATA, 0, 0),
1027 frp, dblk_lastfree_desb, KM_NOSLEEP));
1028 }
1029
1030 static void
1031 bcache_dblk_lastfree(mblk_t *mp, dblk_t *dbp)
1032 {
1033 bcache_t *bcp = dbp->db_cache;
1034
1035 ASSERT(dbp->db_mblk == mp);
1036 if (dbp->db_fthdr != NULL)
1037 str_ftfree(dbp);
1038
1039 /* set credp and projid to be 'unspecified' before returning to cache */
1040 if (dbp->db_credp != NULL) {
1041 crfree(dbp->db_credp);
1042 dbp->db_credp = NULL;
1043 }
1044 dbp->db_cpid = -1;
1045 dbp->db_struioflag = 0;
1046 dbp->db_struioun.cksum.flags = 0;
1047
1048 mutex_enter(&bcp->mutex);
1049 kmem_cache_free(bcp->dblk_cache, dbp);
1050 bcp->alloc--;
1051
1052 if (bcp->alloc == 0 && bcp->destroy != 0) {
1053 kmem_cache_destroy(bcp->dblk_cache);
1054 kmem_cache_destroy(bcp->buffer_cache);
1055 mutex_exit(&bcp->mutex);
1056 mutex_destroy(&bcp->mutex);
1057 kmem_free(bcp, sizeof (bcache_t));
1058 } else {
1059 mutex_exit(&bcp->mutex);
1060 }
1061 }
1062
1063 bcache_t *
1064 bcache_create(char *name, size_t size, uint_t align)
1065 {
1066 bcache_t *bcp;
1067 char buffer[255];
1068
1069 ASSERT((align & (align - 1)) == 0);
1070
1071 if ((bcp = kmem_alloc(sizeof (bcache_t), KM_NOSLEEP)) == NULL)
1072 return (NULL);
1073
1074 bcp->size = size;
1075 bcp->align = align;
1076 bcp->alloc = 0;
1077 bcp->destroy = 0;
1078
1079 mutex_init(&bcp->mutex, NULL, MUTEX_DRIVER, NULL);
1080
1081 (void) sprintf(buffer, "%s_buffer_cache", name);
1082 bcp->buffer_cache = kmem_cache_create(buffer, size, align, NULL, NULL,
1083 NULL, NULL, NULL, 0);
1084 (void) sprintf(buffer, "%s_dblk_cache", name);
1085 bcp->dblk_cache = kmem_cache_create(buffer, sizeof (dblk_t),
1086 DBLK_CACHE_ALIGN, bcache_dblk_constructor, bcache_dblk_destructor,
1087 NULL, (void *)bcp, NULL, 0);
1088
1089 return (bcp);
1090 }
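
/*
 * Illustrative sketch (not part of the original code): a driver can
 * build a private buffer cache and allocate fixed-size messages from
 * it.  Destruction is deferred until all outstanding dblks are freed
 * (see bcache_dblk_lastfree() above).
 *
 *	bcache_t *bcp;
 *	mblk_t *mp;
 *
 *	if ((bcp = bcache_create("mydrv", 2048, 64)) == NULL)
 *		return (ENOMEM);
 *	if ((mp = bcache_allocb(bcp, BPRI_MED)) != NULL)
 *		freemsg(mp);
 *	bcache_destroy(bcp);
 */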
1091
1092 void
1093 bcache_destroy(bcache_t *bcp)
1094 {
1095 ASSERT(bcp != NULL);
1096
1097 mutex_enter(&bcp->mutex);
1098 if (bcp->alloc == 0) {
1099 kmem_cache_destroy(bcp->dblk_cache);
1100 kmem_cache_destroy(bcp->buffer_cache);
1101 mutex_exit(&bcp->mutex);
1102 mutex_destroy(&bcp->mutex);
1103 kmem_free(bcp, sizeof (bcache_t));
1104 } else {
1105 bcp->destroy++;
1106 mutex_exit(&bcp->mutex);
1107 }
1108 }
1109
1110 /*ARGSUSED*/
1111 mblk_t *
1112 bcache_allocb(bcache_t *bcp, uint_t pri)
1113 {
1114 dblk_t *dbp;
1115 mblk_t *mp = NULL;
1116
1117 ASSERT(bcp != NULL);
1118
1119 mutex_enter(&bcp->mutex);
1120 if (bcp->destroy != 0) {
1121 mutex_exit(&bcp->mutex);
1122 goto out;
1123 }
1124
1125 if ((dbp = kmem_cache_alloc(bcp->dblk_cache, KM_NOSLEEP)) == NULL) {
1126 mutex_exit(&bcp->mutex);
1127 goto out;
1128 }
1129 bcp->alloc++;
1130 mutex_exit(&bcp->mutex);
1131
1132 ASSERT(((uintptr_t)(dbp->db_base) & (bcp->align - 1)) == 0);
1133
1134 mp = dbp->db_mblk;
1135 DBLK_RTFU_WORD(dbp) = DBLK_RTFU(1, M_DATA, 0, 0);
1136 mp->b_next = mp->b_prev = mp->b_cont = NULL;
1137 mp->b_rptr = mp->b_wptr = dbp->db_base;
1138 mp->b_queue = NULL;
1139 MBLK_BAND_FLAG_WORD(mp) = 0;
1140 STR_FTALLOC(&dbp->db_fthdr, FTEV_BCALLOCB, bcp->size);
1141 out:
1142 FTRACE_1("bcache_allocb(): mp=0x%p", (uintptr_t)mp);
1143
1144 return (mp);
1145 }
1146
1147 static void
1148 dblk_lastfree_oversize(mblk_t *mp, dblk_t *dbp)
1149 {
1150 ASSERT(dbp->db_mblk == mp);
1151 if (dbp->db_fthdr != NULL)
1152 str_ftfree(dbp);
1153
1154 /* set credp and projid to be 'unspecified' before returning to cache */
1155 if (dbp->db_credp != NULL) {
1156 crfree(dbp->db_credp);
1157 dbp->db_credp = NULL;
1158 }
1159 dbp->db_cpid = -1;
1160 dbp->db_struioflag = 0;
1161 dbp->db_struioun.cksum.flags = 0;
1162
1163 kmem_free(dbp->db_base, dbp->db_lim - dbp->db_base);
1164 kmem_cache_free(dbp->db_cache, dbp);
1165 }
1166
1167 static mblk_t *
1168 allocb_oversize(size_t size, int kmflags)
1169 {
1170 mblk_t *mp;
1171 void *buf;
1172
1173 size = P2ROUNDUP(size, DBLK_CACHE_ALIGN);
1174 if ((buf = kmem_alloc(size, kmflags)) == NULL)
1175 return (NULL);
1176 if ((mp = gesballoc(buf, size, DBLK_RTFU(1, M_DATA, 0, 0),
1177 &frnop, dblk_lastfree_oversize, kmflags)) == NULL)
1178 kmem_free(buf, size);
1179
1180 if (mp != NULL)
1181 STR_FTALLOC(&DB_FTHDR(mp), FTEV_ALLOCBIG, size);
1182
1183 return (mp);
1184 }
1185
1186 mblk_t *
1187 allocb_tryhard(size_t target_size)
1188 {
1189 size_t size;
1190 mblk_t *bp;
1191
1192 for (size = target_size; size < target_size + 512;
1193 size += DBLK_CACHE_ALIGN)
1194 if ((bp = allocb(size, BPRI_HI)) != NULL)
1195 return (bp);
1196 allocb_tryhard_fails++;
1197 return (NULL);
1198 }
1199
1200 /*
1201  * This routine is consolidation private for STREAMS internal use.
1202  * It may only be called from sync routines (i.e., not
1203 * from put or service procedures). It is located here (rather
1204 * than strsubr.c) so that we don't have to expose all of the
1205 * allocb() implementation details in header files.
1206 */
1207 mblk_t *
1208 allocb_wait(size_t size, uint_t pri, uint_t flags, int *error)
1209 {
1210 dblk_t *dbp;
1211 mblk_t *mp;
1212 size_t index;
1213
1214 	index = (size - 1) >> DBLK_SIZE_SHIFT;
1215
1216 if (flags & STR_NOSIG) {
1217 if (index >= (DBLK_MAX_CACHE >> DBLK_SIZE_SHIFT)) {
1218 if (size != 0) {
1219 mp = allocb_oversize(size, KM_SLEEP);
1220 FTRACE_1("allocb_wait (NOSIG): mp=0x%lx",
1221 (uintptr_t)mp);
1222 return (mp);
1223 }
1224 index = 0;
1225 }
1226
1227 dbp = kmem_cache_alloc(dblk_cache[index], KM_SLEEP);
1228 mp = dbp->db_mblk;
1229 DBLK_RTFU_WORD(dbp) = DBLK_RTFU(1, M_DATA, 0, 0);
1230 mp->b_next = mp->b_prev = mp->b_cont = NULL;
1231 mp->b_rptr = mp->b_wptr = dbp->db_base;
1232 mp->b_queue = NULL;
1233 MBLK_BAND_FLAG_WORD(mp) = 0;
1234 STR_FTALLOC(&DB_FTHDR(mp), FTEV_ALLOCBW, size);
1235
1236 FTRACE_1("allocb_wait (NOSIG): mp=0x%lx", (uintptr_t)mp);
1237
1238 } else {
1239 while ((mp = allocb(size, pri)) == NULL) {
1240 if ((*error = strwaitbuf(size, BPRI_HI)) != 0)
1241 return (NULL);
1242 }
1243 }
1244
1245 return (mp);
1246 }
1247
1248 /*
1249 * Call function 'func' with 'arg' when a class zero block can
1250 * be allocated with priority 'pri'.
1251 */
1252 bufcall_id_t
1253 esbbcall(uint_t pri, void (*func)(void *), void *arg)
1254 {
1255 return (bufcall(1, pri, func, arg));
1256 }
1257
1258 /*
1259  * Allocates an iocblk (M_IOCTL) block. Properly sets the credentials,
1260  * ioc_id, rval and error of the iocblk to set up an ioctl call.
1261  * This provides consistency for all internal allocators of ioctls.
1262 */
1263 mblk_t *
1264 mkiocb(uint_t cmd)
1265 {
1266 struct iocblk *ioc;
1267 mblk_t *mp;
1268
1269 /*
1270 * Allocate enough space for any of the ioctl related messages.
1271 */
1272 if ((mp = allocb(sizeof (union ioctypes), BPRI_MED)) == NULL)
1273 return (NULL);
1274
1275 bzero(mp->b_rptr, sizeof (union ioctypes));
1276
1277 /*
1278 * Set the mblk_t information and ptrs correctly.
1279 */
1280 mp->b_wptr += sizeof (struct iocblk);
1281 mp->b_datap->db_type = M_IOCTL;
1282
1283 /*
1284 * Fill in the fields.
1285 */
1286 ioc = (struct iocblk *)mp->b_rptr;
1287 ioc->ioc_cmd = cmd;
1288 ioc->ioc_cr = kcred;
1289 ioc->ioc_id = getiocseqno();
1290 ioc->ioc_flag = IOC_NATIVE;
1291 return (mp);
1292 }
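
/*
 * Illustrative usage (not part of the original code): an internal
 * caller issuing an ioctl downstream.  `MY_IOC_CMD' and `wq' are
 * hypothetical.
 *
 *	mblk_t *mp;
 *
 *	if ((mp = mkiocb(MY_IOC_CMD)) == NULL)
 *		return (ENOSR);
 *	putnext(wq, mp);
 */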
1293
1294 /*
1295  * Test whether a block of the given size can be allocated with a
1296  * request of the given priority.
1297 * 'pri' is no longer used, but is retained for compatibility.
1298 */
1299 /* ARGSUSED */
1300 int
1301 testb(size_t size, uint_t pri)
1302 {
1303 return ((size + sizeof (dblk_t)) <= kmem_avail());
1304 }
1305
1306 /*
1307 * Call function 'func' with argument 'arg' when there is a reasonably
1308 * good chance that a block of size 'size' can be allocated.
1309 * 'pri' is no longer used, but is retained for compatibility.
1310 */
1311 /* ARGSUSED */
1312 bufcall_id_t
1313 bufcall(size_t size, uint_t pri, void (*func)(void *), void *arg)
1314 {
1315 static long bid = 1; /* always odd to save checking for zero */
1316 bufcall_id_t bc_id;
1317 struct strbufcall *bcp;
1318
1319 if ((bcp = kmem_alloc(sizeof (strbufcall_t), KM_NOSLEEP)) == NULL)
1320 return (0);
1321
1322 bcp->bc_func = func;
1323 bcp->bc_arg = arg;
1324 bcp->bc_size = size;
1325 bcp->bc_next = NULL;
1326 bcp->bc_executor = NULL;
1327
1328 mutex_enter(&strbcall_lock);
1329 /*
1330 * After bcp is linked into strbcalls and strbcall_lock is dropped there
1331 * should be no references to bcp since it may be freed by
1332 	 * runbufcalls(). Since the bc_id field is returned, we save its value
1333 	 * in a local variable.
1334 */
1335 bc_id = bcp->bc_id = (bufcall_id_t)(bid += 2); /* keep it odd */
1336
1337 /*
1338 * add newly allocated stream event to existing
1339 * linked list of events.
1340 */
1341 if (strbcalls.bc_head == NULL) {
1342 strbcalls.bc_head = strbcalls.bc_tail = bcp;
1343 } else {
1344 strbcalls.bc_tail->bc_next = bcp;
1345 strbcalls.bc_tail = bcp;
1346 }
1347
1348 cv_signal(&strbcall_cv);
1349 mutex_exit(&strbcall_lock);
1350 return (bc_id);
1351 }
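
/*
 * Illustrative sketch (not part of the original code): the classic
 * recovery pattern for an allocation failure in a service routine.
 * `myd' and my_retry() are hypothetical; the returned id must be
 * saved so the callback can be cancelled with unbufcall() at close.
 *
 *	if ((mp = allocb(size, BPRI_MED)) == NULL) {
 *		myd->my_bufcall_id = bufcall(size, BPRI_MED, my_retry, q);
 *		if (myd->my_bufcall_id == 0)
 *			... bufcall itself failed; e.g. fall back to
 *			    qtimeout() and retry later ...
 *		return;
 *	}
 */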
1352
1353 /*
1354 * Cancel a bufcall request.
1355 */
1356 void
1357 unbufcall(bufcall_id_t id)
1358 {
1359 strbufcall_t *bcp, *pbcp;
1360
1361 mutex_enter(&strbcall_lock);
1362 again:
1363 pbcp = NULL;
1364 for (bcp = strbcalls.bc_head; bcp; bcp = bcp->bc_next) {
1365 if (id == bcp->bc_id)
1366 break;
1367 pbcp = bcp;
1368 }
1369 if (bcp) {
1370 if (bcp->bc_executor != NULL) {
1371 if (bcp->bc_executor != curthread) {
1372 cv_wait(&bcall_cv, &strbcall_lock);
1373 goto again;
1374 }
1375 } else {
1376 if (pbcp)
1377 pbcp->bc_next = bcp->bc_next;
1378 else
1379 strbcalls.bc_head = bcp->bc_next;
1380 if (bcp == strbcalls.bc_tail)
1381 strbcalls.bc_tail = pbcp;
1382 kmem_free(bcp, sizeof (strbufcall_t));
1383 }
1384 }
1385 mutex_exit(&strbcall_lock);
1386 }
1387
1388 /*
1389 * Duplicate a message block by block (uses dupb), returning
1390 * a pointer to the duplicate message.
1391 * Returns a non-NULL value only if the entire message
1392 * was dup'd.
1393 */
1394 mblk_t *
1395 dupmsg(mblk_t *bp)
1396 {
1397 mblk_t *head, *nbp;
1398
1399 if (!bp || !(nbp = head = dupb(bp)))
1400 return (NULL);
1401
1402 while (bp->b_cont) {
1403 if (!(nbp->b_cont = dupb(bp->b_cont))) {
1404 freemsg(head);
1405 return (NULL);
1406 }
1407 nbp = nbp->b_cont;
1408 bp = bp->b_cont;
1409 }
1410 return (head);
1411 }
1412
1413 #define DUPB_NOLOAN(bp) \
1414 ((((bp)->b_datap->db_struioflag & STRUIO_ZC) != 0) ? \
1415 copyb((bp)) : dupb((bp)))
1416
1417 mblk_t *
1418 dupmsg_noloan(mblk_t *bp)
1419 {
1420 mblk_t *head, *nbp;
1421
1422 if (bp == NULL || DB_TYPE(bp) != M_DATA ||
1423 ((nbp = head = DUPB_NOLOAN(bp)) == NULL))
1424 return (NULL);
1425
1426 while (bp->b_cont) {
1427 if ((nbp->b_cont = DUPB_NOLOAN(bp->b_cont)) == NULL) {
1428 freemsg(head);
1429 return (NULL);
1430 }
1431 nbp = nbp->b_cont;
1432 bp = bp->b_cont;
1433 }
1434 return (head);
1435 }
1436
1437 /*
1438 * Copy data from message and data block to newly allocated message and
1439 * data block. Returns new message block pointer, or NULL if error.
1440 * The alignment of rptr (w.r.t. word alignment) will be the same in the copy
1441 * as in the original even when db_base is not word aligned. (bug 1052877)
1442 */
1443 mblk_t *
1444 copyb(mblk_t *bp)
1445 {
1446 mblk_t *nbp;
1447 dblk_t *dp, *ndp;
1448 uchar_t *base;
1449 size_t size;
1450 size_t unaligned;
1451
1452 ASSERT(bp->b_wptr >= bp->b_rptr);
1453
1454 dp = bp->b_datap;
1455 if (dp->db_fthdr != NULL)
1456 STR_FTEVENT_MBLK(bp, caller(), FTEV_COPYB, 0);
1457
1458 size = dp->db_lim - dp->db_base;
1459 unaligned = P2PHASE((uintptr_t)dp->db_base, sizeof (uint_t));
1460 if ((nbp = allocb_tmpl(size + unaligned, bp)) == NULL)
1461 return (NULL);
1462 nbp->b_flag = bp->b_flag;
1463 nbp->b_band = bp->b_band;
1464 ndp = nbp->b_datap;
1465
1466 /*
1467 * Copy the various checksum information that came in
1468 * originally.
1469 */
1470 ndp->db_cksumstart = dp->db_cksumstart;
1471 ndp->db_cksumend = dp->db_cksumend;
1472 ndp->db_cksumstuff = dp->db_cksumstuff;
1473 bcopy(dp->db_struioun.data, ndp->db_struioun.data,
1474 sizeof (dp->db_struioun.data));
1475
1476 /*
1477 * Well, here is a potential issue. If we are trying to
1478 * trace a flow, and we copy the message, we might lose
1479 * information about where this message might have been.
1480 * So we should inherit the FT data. On the other hand,
1481 * a user might be interested only in alloc to free data.
1482 * So I guess the real answer is to provide a tunable.
1483 */
1484 STR_FTEVENT_MBLK(nbp, caller(), FTEV_COPYB, 1);
1485
1486 base = ndp->db_base + unaligned;
1487 bcopy(dp->db_base, ndp->db_base + unaligned, size);
1488
1489 nbp->b_rptr = base + (bp->b_rptr - dp->db_base);
1490 nbp->b_wptr = nbp->b_rptr + MBLKL(bp);
1491
1492 return (nbp);
1493 }
1494
1495 /*
1496 * Copy data from message to newly allocated message using new
1497 * data blocks. Returns a pointer to the new message, or NULL if error.
1498 */
1499 mblk_t *
1500 copymsg(mblk_t *bp)
1501 {
1502 mblk_t *head, *nbp;
1503
1504 if (!bp || !(nbp = head = copyb(bp)))
1505 return (NULL);
1506
1507 while (bp->b_cont) {
1508 if (!(nbp->b_cont = copyb(bp->b_cont))) {
1509 freemsg(head);
1510 return (NULL);
1511 }
1512 nbp = nbp->b_cont;
1513 bp = bp->b_cont;
1514 }
1515 return (head);
1516 }
1517
1518 /*
1519 * link a message block to tail of message
1520 */
1521 void
1522 linkb(mblk_t *mp, mblk_t *bp)
1523 {
1524 ASSERT(mp && bp);
1525
1526 for (; mp->b_cont; mp = mp->b_cont)
1527 ;
1528 mp->b_cont = bp;
1529 }
1530
1531 /*
1532 * unlink a message block from head of message
1533 * return pointer to new message.
1534 * NULL if message becomes empty.
1535 */
1536 mblk_t *
1537 unlinkb(mblk_t *bp)
1538 {
1539 mblk_t *bp1;
1540
1541 bp1 = bp->b_cont;
1542 bp->b_cont = NULL;
1543 return (bp1);
1544 }
1545
1546 /*
1547 * remove a message block "bp" from message "mp"
1548 *
1549 * Return pointer to new message or NULL if no message remains.
1550 * Return -1 if bp is not found in message.
1551 */
1552 mblk_t *
1553 rmvb(mblk_t *mp, mblk_t *bp)
1554 {
1555 mblk_t *tmp;
1556 mblk_t *lastp = NULL;
1557
1558 ASSERT(mp && bp);
1559 for (tmp = mp; tmp; tmp = tmp->b_cont) {
1560 if (tmp == bp) {
1561 if (lastp)
1562 lastp->b_cont = tmp->b_cont;
1563 else
1564 mp = tmp->b_cont;
1565 tmp->b_cont = NULL;
1566 return (mp);
1567 }
1568 lastp = tmp;
1569 }
1570 return ((mblk_t *)-1);
1571 }
1572
1573 /*
1574  * Concatenate and align the first len bytes of a common
1575  * message type. len == -1 means concatenate everything.
1576  * Returns 1 on success, 0 on failure.
1577 * After the pullup, mp points to the pulled up data.
1578 */
1579 int
1580 pullupmsg(mblk_t *mp, ssize_t len)
1581 {
1582 mblk_t *bp, *b_cont;
1583 dblk_t *dbp;
1584 ssize_t n;
1585
1586 ASSERT(mp->b_datap->db_ref > 0);
1587 ASSERT(mp->b_next == NULL && mp->b_prev == NULL);
1588
1589 if (len == -1) {
1590 if (mp->b_cont == NULL && str_aligned(mp->b_rptr))
1591 return (1);
1592 len = xmsgsize(mp);
1593 } else {
1594 ssize_t first_mblk_len = mp->b_wptr - mp->b_rptr;
1595 ASSERT(first_mblk_len >= 0);
1596 /*
1597 * If the length is less than that of the first mblk,
1598 * we want to pull up the message into an aligned mblk.
1599 * Though not part of the spec, some callers assume it.
1600 */
1601 if (len <= first_mblk_len) {
1602 if (str_aligned(mp->b_rptr))
1603 return (1);
1604 len = first_mblk_len;
1605 } else if (xmsgsize(mp) < len)
1606 return (0);
1607 }
1608
1609 if ((bp = allocb_tmpl(len, mp)) == NULL)
1610 return (0);
1611
1612 dbp = bp->b_datap;
1613 *bp = *mp; /* swap mblks so bp heads the old msg... */
1614 mp->b_datap = dbp; /* ... and mp heads the new message */
1615 mp->b_datap->db_mblk = mp;
1616 bp->b_datap->db_mblk = bp;
1617 mp->b_rptr = mp->b_wptr = dbp->db_base;
1618
1619 do {
1620 ASSERT(bp->b_datap->db_ref > 0);
1621 ASSERT(bp->b_wptr >= bp->b_rptr);
1622 n = MIN(bp->b_wptr - bp->b_rptr, len);
1623 ASSERT(n >= 0); /* allow zero-length mblk_t's */
1624 if (n > 0)
1625 bcopy(bp->b_rptr, mp->b_wptr, (size_t)n);
1626 mp->b_wptr += n;
1627 bp->b_rptr += n;
1628 len -= n;
1629 if (bp->b_rptr != bp->b_wptr)
1630 break;
1631 b_cont = bp->b_cont;
1632 freeb(bp);
1633 bp = b_cont;
1634 } while (len && bp);
1635
1636 mp->b_cont = bp; /* tack on whatever wasn't pulled up */
1637
1638 return (1);
1639 }
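
/*
 * Illustrative usage (not part of the original code): a module that
 * needs a contiguous, aligned header before casting b_rptr.  `struct
 * myhdr' and `hdr' are hypothetical.
 *
 *	if (MBLKL(mp) < sizeof (struct myhdr) &&
 *	    !pullupmsg(mp, sizeof (struct myhdr))) {
 *		freemsg(mp);
 *		return;
 *	}
 *	hdr = (struct myhdr *)mp->b_rptr;
 */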
1640
1641 /*
1642 * Concatenate and align at least the first len bytes of common message
1643 * type. Len == -1 means concatenate everything. The original message is
1644 * unaltered. Returns a pointer to a new message on success, otherwise
1645 * returns NULL.
1646 */
1647 mblk_t *
1648 msgpullup(mblk_t *mp, ssize_t len)
1649 {
1650 mblk_t *newmp;
1651 ssize_t totlen = xmsgsize(mp);
1652 ssize_t offset = 0;
1653
1654 if (len == -1)
1655 len = totlen;
1656
1657 if (len < 0 || (len > 0 && len > totlen))
1658 return (NULL);
1659
1660 if ((newmp = allocb_tmpl(len, mp)) == NULL)
1661 return (NULL);
1662
1663 newmp->b_flag = mp->b_flag;
1664 newmp->b_band = mp->b_band;
1665
1666 while (len > 0) {
1667 		ssize_t seglen, n;
1668 		ASSERT3P(mp, !=, NULL);	/* guaranteed by len <= totlen */
1669 		seglen = MBLKL(mp);
1670 		n = MIN(seglen, len);
1671 		ASSERT3S(n, >=, 0);	/* allow zero-length mblk_t's */
1672 if (n > 0)
1673 bcopy(mp->b_rptr, newmp->b_wptr, n);
1674 newmp->b_wptr += n;
1675 len -= n;
1676
1677 if (n == seglen)
1678 mp = mp->b_cont;
1679 else if (len == 0)
1680 offset = n;
1681 }
1682 ASSERT3S(len, ==, 0);
1683
1684 if (mp != NULL) {
1685 newmp->b_cont = dupmsg(mp);
1686 if (newmp->b_cont == NULL) {
1687 freemsg(newmp);
1688 return (NULL);
1689 }
1690 ASSERT3S(offset, >=, 0);
1691 ASSERT3U(MBLKL(newmp->b_cont), >=, offset);
1692 newmp->b_cont->b_rptr += offset;
1693 }
1694
1695 return (newmp);
1696 }
1697
1698 /*
1699 * Trim bytes from message
1700 * len > 0, trim from head
1701 * len < 0, trim from tail
1702 * Returns 1 on success, 0 on failure.
1703 */
1704 int
1705 adjmsg(mblk_t *mp, ssize_t len)
1706 {
1707 mblk_t *bp;
1708 mblk_t *save_bp = NULL;
1709 mblk_t *prev_bp;
1710 mblk_t *bcont;
1711 unsigned char type;
1712 ssize_t n;
1713 int fromhead;
1714 int first;
1715
1716 ASSERT(mp != NULL);
1717
1718 if (len < 0) {
1719 fromhead = 0;
1720 len = -len;
1721 } else {
1722 fromhead = 1;
1723 }
1724
1725 if (xmsgsize(mp) < len)
1726 return (0);
1727
1728 if (fromhead) {
1729 first = 1;
1730 while (len) {
1731 ASSERT(mp->b_wptr >= mp->b_rptr);
1732 n = MIN(mp->b_wptr - mp->b_rptr, len);
1733 mp->b_rptr += n;
1734 len -= n;
1735
1736 /*
1737 			 * If this is not the first mblk and it
1738 			 * is now zero length, remove it.
1739 */
1740 if (!first && (mp->b_wptr == mp->b_rptr)) {
1741 bcont = mp->b_cont;
1742 freeb(mp);
1743 mp = save_bp->b_cont = bcont;
1744 } else {
1745 save_bp = mp;
1746 mp = mp->b_cont;
1747 }
1748 first = 0;
1749 }
1750 } else {
1751 type = mp->b_datap->db_type;
1752 while (len) {
1753 bp = mp;
1754 save_bp = NULL;
1755
1756 /*
1757 * Find the last message of same type
1758 */
1759 while (bp && bp->b_datap->db_type == type) {
1760 ASSERT(bp->b_wptr >= bp->b_rptr);
1761 prev_bp = save_bp;
1762 save_bp = bp;
1763 bp = bp->b_cont;
1764 }
1765 if (save_bp == NULL)
1766 break;
1767 n = MIN(save_bp->b_wptr - save_bp->b_rptr, len);
1768 save_bp->b_wptr -= n;
1769 len -= n;
1770
1771 /*
1772 * If this is not the first message
1773 * and we have taken away everything
1774 * from this message, remove it
1775 */
1776
1777 if ((save_bp != mp) &&
1778 (save_bp->b_wptr == save_bp->b_rptr)) {
1779 bcont = save_bp->b_cont;
1780 freeb(save_bp);
1781 prev_bp->b_cont = bcont;
1782 }
1783 }
1784 }
1785 return (1);
1786 }
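
/*
 * Illustrative usage (not part of the original code): strip a
 * `hlen'-byte header from the front and a `tlen'-byte trailer from
 * the back of a message (both lengths hypothetical).
 *
 *	if (!adjmsg(mp, hlen) || !adjmsg(mp, -tlen)) {
 *		freemsg(mp);
 *		return;
 *	}
 */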
1787
1788 /*
1789 * get number of data bytes in message
1790 */
1791 size_t
1792 msgdsize(mblk_t *bp)
1793 {
1794 size_t count = 0;
1795
1796 for (; bp; bp = bp->b_cont)
1797 if (bp->b_datap->db_type == M_DATA) {
1798 ASSERT(bp->b_wptr >= bp->b_rptr);
1799 count += bp->b_wptr - bp->b_rptr;
1800 }
1801 return (count);
1802 }
1803
1804 /*
1805 * Get a message off head of queue
1806 *
1807 * If queue has no buffers then mark queue
1808 * with QWANTR. (queue wants to be read by
1809 * someone when data becomes available)
1810 *
1811 * If there is something to take off then do so.
1812 * If queue falls below hi water mark turn off QFULL
1813 * flag. Decrement weighted count of queue.
1814 * Also turn off QWANTR because queue is being read.
1815 *
1816 * The queue count is maintained on a per-band basis.
1817 * Priority band 0 (normal messages) uses q_count,
1818 * q_lowat, etc. Non-zero priority bands use the
1819 * fields in their respective qband structures
1820 * (qb_count, qb_lowat, etc.) All messages appear
1821 * on the same list, linked via their b_next pointers.
1822 * q_first is the head of the list. q_count does
1823 * not reflect the size of all the messages on the
1824 * queue. It only reflects those messages in the
1825 * normal band of flow. The one exception to this
1826 * deals with high priority messages. They are in
1827 * their own conceptual "band", but are accounted
1828 * against q_count.
1829 *
1830 * If queue count is below the lo water mark and QWANTW
1831 * is set, enable the closest backq which has a service
1832 * procedure and turn off the QWANTW flag.
1833 *
1834 * getq could be built on top of rmvq, but isn't because
1835 * of performance considerations.
1836 *
1837 * A note on the use of q_count and q_mblkcnt:
1838 * q_count is the traditional byte count for messages that
1839 * have been put on a queue. Documentation tells us that
1840 * we shouldn't rely on that count, but some drivers/modules
1841 * do. What was needed, however, is a mechanism to prevent
1842 * runaway streams from consuming all of the resources,
1843 * and particularly be able to flow control zero-length
1844 * messages. q_mblkcnt is used for this purpose. It
1845 * counts the number of mblk's that are being put on
1846 * the queue. The intention here, is that each mblk should
1847 * contain one byte of data and, for the purpose of
1848 * flow-control, logically does. A queue will become
1849 * full when EITHER of these values (q_count and q_mblkcnt)
1850 * reach the highwater mark. It will clear when BOTH
1851 * of them drop below the highwater mark. And it will
1852 * backenable when BOTH of them drop below the lowwater
1853 * mark.
1854 * With this algorithm, a driver/module might be able
1855 * to find a reasonably accurate q_count, and the
1856 * framework can still try and limit resource usage.
1857 */
1858 mblk_t *
1859 getq(queue_t *q)
1860 {
1861 mblk_t *bp;
1862 uchar_t band = 0;
1863
1864 bp = getq_noenab(q, 0);
1865 if (bp != NULL)
1866 band = bp->b_band;
1867
1868 /*
1869 * Inlined from qbackenable().
1870 * Quick check without holding the lock.
1871 */
1872 if (band == 0 && (q->q_flag & (QWANTW|QWANTWSYNC)) == 0)
1873 return (bp);
1874
1875 qbackenable(q, band);
1876 return (bp);
1877 }
1878
1879 /*
1880 * Returns the number of bytes in a message (a message is defined as a
1881 * chain of mblks linked by b_cont). If a non-NULL mblkcnt is supplied we
1882 * also return the number of distinct mblks in the message.
1883 */
1884 int
1885 mp_cont_len(mblk_t *bp, int *mblkcnt)
1886 {
1887 mblk_t *mp;
1888 int mblks = 0;
1889 int bytes = 0;
1890
1891 for (mp = bp; mp != NULL; mp = mp->b_cont) {
1892 bytes += MBLKL(mp);
1893 mblks++;
1894 }
1895
1896 if (mblkcnt != NULL)
1897 *mblkcnt = mblks;
1898
1899 return (bytes);
1900 }
1901
1902 /*
1903 * Like getq() but does not backenable. This is used by the stream
1904 * head when a putback() is likely. The caller must call qbackenable()
1905 * after it is done with accessing the queue.
1906  * The rbytes argument to getq_noenab() allows callers to specify
1907  * the maximum number of bytes to return. If the current amount on the
1908 * queue is less than this then the entire message will be returned.
1909 * A value of 0 returns the entire message and is equivalent to the old
1910 * default behaviour prior to the addition of the rbytes argument.
1911 */
1912 mblk_t *
1913 getq_noenab(queue_t *q, ssize_t rbytes)
1914 {
1915 mblk_t *bp, *mp1;
1916 mblk_t *mp2 = NULL;
1917 qband_t *qbp;
1918 kthread_id_t freezer;
1919 int bytecnt = 0, mblkcnt = 0;
1920
1921 /* freezestr should allow its caller to call getq/putq */
1922 freezer = STREAM(q)->sd_freezer;
1923 if (freezer == curthread) {
1924 ASSERT(frozenstr(q));
1925 ASSERT(MUTEX_HELD(QLOCK(q)));
1926 } else
1927 mutex_enter(QLOCK(q));
1928
1929 if ((bp = q->q_first) == 0) {
1930 q->q_flag |= QWANTR;
1931 } else {
1932 /*
1933 * If the caller supplied a byte threshold and there is
1934 * more than this amount on the queue then break up
1935 * the message appropriately. We can only safely do
1936 * this for M_DATA messages.
1937 */
1938 if ((DB_TYPE(bp) == M_DATA) && (rbytes > 0) &&
1939 (q->q_count > rbytes)) {
1940 /*
1941 * Inline version of mp_cont_len() which terminates
1942 * when we meet or exceed rbytes.
1943 */
1944 for (mp1 = bp; mp1 != NULL; mp1 = mp1->b_cont) {
1945 mblkcnt++;
1946 bytecnt += MBLKL(mp1);
1947 if (bytecnt >= rbytes)
1948 break;
1949 }
1950 /*
1951 * We need to account for the following scenarios:
1952 *
1953 * 1) Too much data in the first message:
1954 * mp1 will be the mblk which puts us over our
1955 * byte limit.
1956 * 2) Not enough data in the first message:
1957 * mp1 will be NULL.
1958 * 3) Exactly the right amount of data contained within
1959 * whole mblks:
1960 * mp1->b_cont will be where we break the message.
1961 */
1962 if (bytecnt > rbytes) {
1963 /*
1964 * Dup/copy mp1 and put what we don't need
1965 * back onto the queue. Adjust the read/write
1966 * and continuation pointers appropriately
1967 * and decrement the current mblk count to
1968 * reflect we are putting an mblk back onto
1969 * the queue.
1970 * When adjusting the message pointers, it's
1971 * OK to use the existing bytecnt and the
1972 * requested amount (rbytes) to calculate
1973 * the new write offset (b_wptr) of what we
1974 * are taking. However, we cannot use these
1975 * values when calculating the read offset of
1976 * the mblk we are putting back on the queue.
1977 * This is because the beginning (b_rptr) of the
1978 * mblk represents some arbitrary point within
1979 * the message.
1980 * It's simplest to do this by advancing b_rptr
1981 * by the new length of mp1 as we don't have to
1982 * remember any intermediate state.
1983 */
1984 ASSERT(mp1 != NULL);
1985 mblkcnt--;
1986 if ((mp2 = dupb(mp1)) == NULL &&
1987 (mp2 = copyb(mp1)) == NULL) {
1988 bytecnt = mblkcnt = 0;
1989 goto dup_failed;
1990 }
1991 mp2->b_cont = mp1->b_cont;
1992 mp1->b_wptr -= bytecnt - rbytes;
1993 mp2->b_rptr += mp1->b_wptr - mp1->b_rptr;
1994 mp1->b_cont = NULL;
1995 bytecnt = rbytes;
1996 } else {
1997 /*
1998 * Either there is not enough data in the first
1999 * message or there is no excess data to deal
2000 * with. If mp1 is NULL, we are taking the
2001 * whole message. No need to do anything.
2002 * Otherwise we assign mp1->b_cont to mp2 as
2003 * we will be putting this back onto the head of
2004 * the queue.
2005 */
2006 if (mp1 != NULL) {
2007 mp2 = mp1->b_cont;
2008 mp1->b_cont = NULL;
2009 }
2010 }
2011 /*
2012 * If mp2 is not NULL then we have part of the message
2013 * to put back onto the queue.
2014 */
2015 if (mp2 != NULL) {
2016 if ((mp2->b_next = bp->b_next) == NULL)
2017 q->q_last = mp2;
2018 else
2019 bp->b_next->b_prev = mp2;
2020 q->q_first = mp2;
2021 } else {
2022 if ((q->q_first = bp->b_next) == NULL)
2023 q->q_last = NULL;
2024 else
2025 q->q_first->b_prev = NULL;
2026 }
2027 } else {
2028 /*
2029 * Either no byte threshold was supplied, there is
2030 * not enough on the queue or we failed to
2031 * duplicate/copy a data block. In these cases we
2032 * just take the entire first message.
2033 */
2034 dup_failed:
2035 bytecnt = mp_cont_len(bp, &mblkcnt);
2036 if ((q->q_first = bp->b_next) == NULL)
2037 q->q_last = NULL;
2038 else
2039 q->q_first->b_prev = NULL;
2040 }
2041 if (bp->b_band == 0) {
2042 q->q_count -= bytecnt;
2043 q->q_mblkcnt -= mblkcnt;
2044 if (q->q_mblkcnt == 0 || ((q->q_count < q->q_hiwat) &&
2045 (q->q_mblkcnt < q->q_hiwat))) {
2046 q->q_flag &= ~QFULL;
2047 }
2048 } else {
2049 int i;
2050
2051 ASSERT(bp->b_band <= q->q_nband);
2052 ASSERT(q->q_bandp != NULL);
2053 ASSERT(MUTEX_HELD(QLOCK(q)));
2054 qbp = q->q_bandp;
2055 i = bp->b_band;
2056 while (--i > 0)
2057 qbp = qbp->qb_next;
2058 if (qbp->qb_first == qbp->qb_last) {
2059 qbp->qb_first = NULL;
2060 qbp->qb_last = NULL;
2061 } else {
2062 qbp->qb_first = bp->b_next;
2063 }
2064 qbp->qb_count -= bytecnt;
2065 qbp->qb_mblkcnt -= mblkcnt;
2066 if (qbp->qb_mblkcnt == 0 ||
2067 ((qbp->qb_count < qbp->qb_hiwat) &&
2068 (qbp->qb_mblkcnt < qbp->qb_hiwat))) {
2069 qbp->qb_flag &= ~QB_FULL;
2070 }
2071 }
2072 q->q_flag &= ~QWANTR;
2073 bp->b_next = NULL;
2074 bp->b_prev = NULL;
2075 }
2076 if (freezer != curthread)
2077 mutex_exit(QLOCK(q));
2078
2079 STR_FTEVENT_MSG(bp, q, FTEV_GETQ, 0);
2080
2081 return (bp);
2082 }
2083
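/*
 * Editorial sketch (hypothetical caller, mirroring the stream-head usage
 * described above): take up to "count" bytes without backenabling, then
 * backenable explicitly once done with the queue.  The band must be
 * saved before the message is consumed.
 *
 *      mblk_t *mp;
 *
 *      if ((mp = getq_noenab(q, count)) != NULL) {
 *              uchar_t band = mp->b_band;
 *
 *              (process, or put back, mp here)
 *              qbackenable(q, band);
 *      }
 */
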
2084 /*
2085 * Determine if a backenable is needed after removing a message in the
2086 * specified band.
2087 * NOTE: This routine assumes that something like getq_noenab() has been
2088 * already called.
2089 *
2090 * For the read side it is ok to hold sd_lock across calling this (and the
2091 * stream head often does).
2092 * But for the write side strwakeq might be invoked and it acquires sd_lock.
2093 */
2094 void
2095 qbackenable(queue_t *q, uchar_t band)
2096 {
2097 int backenab = 0;
2098 qband_t *qbp;
2099 kthread_id_t freezer;
2100
2101 ASSERT(q);
2102 ASSERT((q->q_flag & QREADR) || MUTEX_NOT_HELD(&STREAM(q)->sd_lock));
2103
2104 /*
2105 * Quick check without holding the lock.
2106 * OK since after getq() has lowered the q_count these flags
2107 * would not change unless either the qbackenable() is done by
2108 * another thread (which is ok) or the queue has gotten QFULL
2109 * in which case another backenable will take place when the queue
2110 * drops below q_lowat.
2111 */
2112 if (band == 0 && (q->q_flag & (QWANTW|QWANTWSYNC)) == 0)
2113 return;
2114
2115 /* freezestr should allow its caller to call getq/putq */
2116 freezer = STREAM(q)->sd_freezer;
2117 if (freezer == curthread) {
2118 ASSERT(frozenstr(q));
2119 ASSERT(MUTEX_HELD(QLOCK(q)));
2120 } else
2121 mutex_enter(QLOCK(q));
2122
2123 if (band == 0) {
2124 if (q->q_lowat == 0 || (q->q_count < q->q_lowat &&
2125 q->q_mblkcnt < q->q_lowat)) {
2126 backenab = q->q_flag & (QWANTW|QWANTWSYNC);
2127 }
2128 } else {
2129 int i;
2130
2131 ASSERT((unsigned)band <= q->q_nband);
2132 ASSERT(q->q_bandp != NULL);
2133
2134 qbp = q->q_bandp;
2135 i = band;
2136 while (--i > 0)
2137 qbp = qbp->qb_next;
2138
2139 if (qbp->qb_lowat == 0 || (qbp->qb_count < qbp->qb_lowat &&
2140 qbp->qb_mblkcnt < qbp->qb_lowat)) {
2141 backenab = qbp->qb_flag & QB_WANTW;
2142 }
2143 }
2144
2145 if (backenab == 0) {
2146 if (freezer != curthread)
2147 mutex_exit(QLOCK(q));
2148 return;
2149 }
2150
2151 /* Have to drop the lock across strwakeq and backenable */
2152 if (backenab & QWANTWSYNC)
2153 q->q_flag &= ~QWANTWSYNC;
2154 if (backenab & (QWANTW|QB_WANTW)) {
2155 if (band != 0)
2156 qbp->qb_flag &= ~QB_WANTW;
2157 else {
2158 q->q_flag &= ~QWANTW;
2159 }
2160 }
2161
2162 if (freezer != curthread)
2163 mutex_exit(QLOCK(q));
2164
2165 if (backenab & QWANTWSYNC)
2166 strwakeq(q, QWANTWSYNC);
2167 if (backenab & (QWANTW|QB_WANTW))
2168 backenable(q, band);
2169 }
2170
2171 /*
2172 * Remove a message from a queue. The queue count and other
2173 * flow control parameters are adjusted and the back queue
2174 * enabled if necessary.
2175 *
2176 * rmvq can be called with the stream frozen, by other utility functions
2177 * holding QLOCK, and by streams modules without any locks/frozen.
2178 */
2179 void
2180 rmvq(queue_t *q, mblk_t *mp)
2181 {
2182 ASSERT(mp != NULL);
2183
2184 rmvq_noenab(q, mp);
2185 if (curthread != STREAM(q)->sd_freezer && MUTEX_HELD(QLOCK(q))) {
2186 /*
2187 * qbackenable can handle a frozen stream but not a "random"
2188 * qlock being held. Drop lock across qbackenable.
2189 */
2190 mutex_exit(QLOCK(q));
2191 qbackenable(q, mp->b_band);
2192 mutex_enter(QLOCK(q));
2193 } else {
2194 qbackenable(q, mp->b_band);
2195 }
2196 }
2197
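/*
 * Editorial sketch (hypothetical module code): rmvq() is typically used
 * while walking a queue with the stream frozen, so that q_first/b_next
 * remain stable.  Dropping the first M_PROTO message is an illustrative
 * choice.
 *
 *      mblk_t *mp;
 *
 *      freezestr(q);
 *      for (mp = q->q_first; mp != NULL; mp = mp->b_next) {
 *              if (DB_TYPE(mp) == M_PROTO) {
 *                      rmvq(q, mp);
 *                      freemsg(mp);
 *                      break;
 *              }
 *      }
 *      unfreezestr(q);
 */
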
2198 /*
2199 * Like rmvq() but without any backenabling.
2200 * This exists to handle SR_CONSOL_DATA in strrput().
2201 */
2202 void
2203 rmvq_noenab(queue_t *q, mblk_t *mp)
2204 {
2205 int i;
2206 qband_t *qbp = NULL;
2207 kthread_id_t freezer;
2208 int bytecnt = 0, mblkcnt = 0;
2209
2210 freezer = STREAM(q)->sd_freezer;
2211 if (freezer == curthread) {
2212 ASSERT(frozenstr(q));
2213 ASSERT(MUTEX_HELD(QLOCK(q)));
2214 } else if (MUTEX_HELD(QLOCK(q))) {
2215 /* Don't drop lock on exit */
2216 freezer = curthread;
2217 } else
2218 mutex_enter(QLOCK(q));
2219
2220 ASSERT(mp->b_band <= q->q_nband);
2221 if (mp->b_band != 0) { /* Adjust band pointers */
2222 ASSERT(q->q_bandp != NULL);
2223 qbp = q->q_bandp;
2224 i = mp->b_band;
2225 while (--i > 0)
2226 qbp = qbp->qb_next;
2227 if (mp == qbp->qb_first) {
2228 if (mp->b_next && mp->b_band == mp->b_next->b_band)
2229 qbp->qb_first = mp->b_next;
2230 else
2231 qbp->qb_first = NULL;
2232 }
2233 if (mp == qbp->qb_last) {
2234 if (mp->b_prev && mp->b_band == mp->b_prev->b_band)
2235 qbp->qb_last = mp->b_prev;
2236 else
2237 qbp->qb_last = NULL;
2238 }
2239 }
2240
2241 /*
2242 * Remove the message from the list.
2243 */
2244 if (mp->b_prev)
2245 mp->b_prev->b_next = mp->b_next;
2246 else
2247 q->q_first = mp->b_next;
2248 if (mp->b_next)
2249 mp->b_next->b_prev = mp->b_prev;
2250 else
2251 q->q_last = mp->b_prev;
2252 mp->b_next = NULL;
2253 mp->b_prev = NULL;
2254
2255 /* Get the size of the message for q_count accounting */
2256 bytecnt = mp_cont_len(mp, &mblkcnt);
2257
2258 if (mp->b_band == 0) { /* Perform q_count accounting */
2259 q->q_count -= bytecnt;
2260 q->q_mblkcnt -= mblkcnt;
2261 if (q->q_mblkcnt == 0 || ((q->q_count < q->q_hiwat) &&
2262 (q->q_mblkcnt < q->q_hiwat))) {
2263 q->q_flag &= ~QFULL;
2264 }
2265 } else { /* Perform qb_count accounting */
2266 qbp->qb_count -= bytecnt;
2267 qbp->qb_mblkcnt -= mblkcnt;
2268 if (qbp->qb_mblkcnt == 0 || ((qbp->qb_count < qbp->qb_hiwat) &&
2269 (qbp->qb_mblkcnt < qbp->qb_hiwat))) {
2270 qbp->qb_flag &= ~QB_FULL;
2271 }
2272 }
2273 if (freezer != curthread)
2274 mutex_exit(QLOCK(q));
2275
2276 STR_FTEVENT_MSG(mp, q, FTEV_RMVQ, 0);
2277 }
2278
2279 /*
2280 * Empty a queue.
2281 * If flag is set, remove all messages. Otherwise, remove
2282 * only non-control messages. If queue falls below its low
2283 * water mark, and QWANTW is set, enable the nearest upstream
2284 * service procedure.
2285 *
2286 * Historical note: when merging the M_FLUSH code in strrput with this
2287 * code one difference was discovered. flushq did not have a check
2288 * for q_lowat == 0 in the backenabling test.
2289 *
2290 * pcproto_flag specifies whether or not a M_PCPROTO message should be flushed
2291 * if one exists on the queue.
2292 */
2293 void
2294 flushq_common(queue_t *q, int flag, int pcproto_flag)
2295 {
2296 mblk_t *mp, *nmp;
2297 qband_t *qbp;
2298 int backenab = 0;
2299 unsigned char bpri;
2300 unsigned char qbf[NBAND]; /* band flushing backenable flags */
2301
2302 if (q->q_first == NULL)
2303 return;
2304
2305 mutex_enter(QLOCK(q));
2306 mp = q->q_first;
2307 q->q_first = NULL;
2308 q->q_last = NULL;
2309 q->q_count = 0;
2310 q->q_mblkcnt = 0;
2311 for (qbp = q->q_bandp; qbp; qbp = qbp->qb_next) {
2312 qbp->qb_first = NULL;
2313 qbp->qb_last = NULL;
2314 qbp->qb_count = 0;
2315 qbp->qb_mblkcnt = 0;
2316 qbp->qb_flag &= ~QB_FULL;
2317 }
2318 q->q_flag &= ~QFULL;
2319 mutex_exit(QLOCK(q));
2320 while (mp) {
2321 nmp = mp->b_next;
2322 mp->b_next = mp->b_prev = NULL;
2323
2324 STR_FTEVENT_MBLK(mp, q, FTEV_FLUSHQ, 0);
2325
2326 if (pcproto_flag && (mp->b_datap->db_type == M_PCPROTO))
2327 (void) putq(q, mp);
2328 else if (flag || datamsg(mp->b_datap->db_type))
2329 freemsg(mp);
2330 else
2331 (void) putq(q, mp);
2332 mp = nmp;
2333 }
2334 bpri = 1;
2335 mutex_enter(QLOCK(q));
2336 for (qbp = q->q_bandp; qbp; qbp = qbp->qb_next) {
2337 if ((qbp->qb_flag & QB_WANTW) &&
2338 (((qbp->qb_count < qbp->qb_lowat) &&
2339 (qbp->qb_mblkcnt < qbp->qb_lowat)) ||
2340 qbp->qb_lowat == 0)) {
2341 qbp->qb_flag &= ~QB_WANTW;
2342 backenab = 1;
2343 qbf[bpri] = 1;
2344 } else
2345 qbf[bpri] = 0;
2346 bpri++;
2347 }
2348 ASSERT(bpri == (unsigned char)(q->q_nband + 1));
2349 if ((q->q_flag & QWANTW) &&
2350 (((q->q_count < q->q_lowat) &&
2351 (q->q_mblkcnt < q->q_lowat)) || q->q_lowat == 0)) {
2352 q->q_flag &= ~QWANTW;
2353 backenab = 1;
2354 qbf[0] = 1;
2355 } else
2356 qbf[0] = 0;
2357
2358 /*
2359 * If any band can now be written to, and there is a writer
2360 * for that band, then backenable the closest service procedure.
2361 */
2362 if (backenab) {
2363 mutex_exit(QLOCK(q));
2364 for (bpri = q->q_nband; bpri != 0; bpri--)
2365 if (qbf[bpri])
2366 backenable(q, bpri);
2367 if (qbf[0])
2368 backenable(q, 0);
2369 } else
2370 mutex_exit(QLOCK(q));
2371 }
2372
2373 /*
2374 * The real flushing takes place in flushq_common. This is done so
2375 * that a flag can specify whether or not M_PCPROTO messages should be
2376 * flushed. Currently the only place that uses this flag is the stream head.
2377 */
2378 void
2379 flushq(queue_t *q, int flag)
2380 {
2381 flushq_common(q, flag, 0);
2382 }
2383
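/*
 * Editorial sketch: the classic M_FLUSH handling that flushq() exists to
 * support, as a fragment of a hypothetical write-side put procedure
 * (modules vary in how they clear the flush bits and pass the message on).
 *
 *      case M_FLUSH:
 *              if (*mp->b_rptr & FLUSHW)
 *                      flushq(q, FLUSHDATA);
 *              if (*mp->b_rptr & FLUSHR)
 *                      flushq(RD(q), FLUSHDATA);
 *              putnext(q, mp);
 *              break;
 */
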
2384 /*
2385 * Flush the queue of messages of the given priority band.
2386 * There is some duplication of code between flushq and flushband.
2387 * This is because we want to optimize the code as much as possible.
2388 * The assumption is that there will be more messages in the normal
2389 * (priority 0) band than in any other.
2390 *
2391 * Historical note: when merging the M_FLUSH code in strrput with this
2392 * code one difference was discovered. flushband had an extra check
2393 * for (mp->b_datap->db_type < QPCTL) in the band 0
2394 * case. That check does not match the man page for flushband and was not
2395 * in the strrput flush code hence it was removed.
2396 */
2397 void
2398 flushband(queue_t *q, unsigned char pri, int flag)
2399 {
2400 mblk_t *mp;
2401 mblk_t *nmp;
2402 mblk_t *last;
2403 qband_t *qbp;
2404 int band;
2405
2406 ASSERT((flag == FLUSHDATA) || (flag == FLUSHALL));
2407 if (pri > q->q_nband) {
2408 return;
2409 }
2410 mutex_enter(QLOCK(q));
2411 if (pri == 0) {
2412 mp = q->q_first;
2413 q->q_first = NULL;
2414 q->q_last = NULL;
2415 q->q_count = 0;
2416 q->q_mblkcnt = 0;
2417 for (qbp = q->q_bandp; qbp; qbp = qbp->qb_next) {
2418 qbp->qb_first = NULL;
2419 qbp->qb_last = NULL;
2420 qbp->qb_count = 0;
2421 qbp->qb_mblkcnt = 0;
2422 qbp->qb_flag &= ~QB_FULL;
2423 }
2424 q->q_flag &= ~QFULL;
2425 mutex_exit(QLOCK(q));
2426 while (mp) {
2427 nmp = mp->b_next;
2428 mp->b_next = mp->b_prev = NULL;
2429 if ((mp->b_band == 0) &&
2430 ((flag == FLUSHALL) ||
2431 datamsg(mp->b_datap->db_type)))
2432 freemsg(mp);
2433 else
2434 (void) putq(q, mp);
2435 mp = nmp;
2436 }
2437 mutex_enter(QLOCK(q));
2438 if ((q->q_flag & QWANTW) &&
2439 (((q->q_count < q->q_lowat) &&
2440 (q->q_mblkcnt < q->q_lowat)) || q->q_lowat == 0)) {
2441 q->q_flag &= ~QWANTW;
2442 mutex_exit(QLOCK(q));
2443
2444 backenable(q, pri);
2445 } else
2446 mutex_exit(QLOCK(q));
2447 } else { /* pri != 0 */
2448 boolean_t flushed = B_FALSE;
2449 band = pri;
2450
2451 ASSERT(MUTEX_HELD(QLOCK(q)));
2452 qbp = q->q_bandp;
2453 while (--band > 0)
2454 qbp = qbp->qb_next;
2455 mp = qbp->qb_first;
2456 if (mp == NULL) {
2457 mutex_exit(QLOCK(q));
2458 return;
2459 }
2460 last = qbp->qb_last->b_next;
2461 /*
2462 * rmvq_noenab() and freemsg() are called for each mblk that
2463 * meets the criteria. The loop is executed until the last
2464 * mblk has been processed.
2465 */
2466 while (mp != last) {
2467 ASSERT(mp->b_band == pri);
2468 nmp = mp->b_next;
2469 if (flag == FLUSHALL || datamsg(mp->b_datap->db_type)) {
2470 rmvq_noenab(q, mp);
2471 freemsg(mp);
2472 flushed = B_TRUE;
2473 }
2474 mp = nmp;
2475 }
2476 mutex_exit(QLOCK(q));
2477
2478 /*
2479 * If any mblk(s) has been freed, we know that qbackenable()
2480 * will need to be called.
2481 */
2482 if (flushed)
2483 qbackenable(q, pri);
2484 }
2485 }
2486
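/*
 * Editorial sketch: when an M_FLUSH carries FLUSHBAND, only the band
 * named by the second byte should be flushed; otherwise the whole queue
 * is flushed (fragment of a hypothetical put procedure).
 *
 *      if (*mp->b_rptr & FLUSHW) {
 *              if (*mp->b_rptr & FLUSHBAND)
 *                      flushband(q, *(mp->b_rptr + 1), FLUSHDATA);
 *              else
 *                      flushq(q, FLUSHDATA);
 *      }
 */
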
2487 /*
2488 * Return 1 if the queue is not full. If the queue is full, return
2489 * 0 (may not put message) and set QWANTW flag (caller wants to write
2490 * to the queue).
2491 */
2492 int
2493 canput(queue_t *q)
2494 {
2495 TRACE_1(TR_FAC_STREAMS_FR, TR_CANPUT_IN, "canput:%p", q);
2496
2497 /* this is for loopback transports; they should not do a canput */
2498 ASSERT(STRMATED(q->q_stream) || STREAM(q) == STREAM(q->q_nfsrv));
2499
2500 /* Find next forward module that has a service procedure */
2501 q = q->q_nfsrv;
2502
2503 if (!(q->q_flag & QFULL)) {
2504 TRACE_2(TR_FAC_STREAMS_FR, TR_CANPUT_OUT, "canput:%p %d", q, 1);
2505 return (1);
2506 }
2507 mutex_enter(QLOCK(q));
2508 if (q->q_flag & QFULL) {
2509 q->q_flag |= QWANTW;
2510 mutex_exit(QLOCK(q));
2511 TRACE_2(TR_FAC_STREAMS_FR, TR_CANPUT_OUT, "canput:%p %d", q, 0);
2512 return (0);
2513 }
2514 mutex_exit(QLOCK(q));
2515 TRACE_2(TR_FAC_STREAMS_FR, TR_CANPUT_OUT, "canput:%p %d", q, 1);
2516 return (1);
2517 }
2518
2519 /*
2520 * This is the new canput for use with priority bands. Return 1 if the
2521 * band is not full. If the band is full, return 0 (may not put message)
2522 * and set QWANTW(QB_WANTW) flag for zero(non-zero) band (caller wants to
2523 * write to the queue).
2524 */
2525 int
2526 bcanput(queue_t *q, unsigned char pri)
2527 {
2528 qband_t *qbp;
2529
2530 TRACE_2(TR_FAC_STREAMS_FR, TR_BCANPUT_IN, "bcanput:%p %p", q, pri);
2531 if (!q)
2532 return (0);
2533
2534 /* Find next forward module that has a service procedure */
2535 q = q->q_nfsrv;
2536
2537 mutex_enter(QLOCK(q));
2538 if (pri == 0) {
2539 if (q->q_flag & QFULL) {
2540 q->q_flag |= QWANTW;
2541 mutex_exit(QLOCK(q));
2542 TRACE_3(TR_FAC_STREAMS_FR, TR_BCANPUT_OUT,
2543 "bcanput:%p %X %d", q, pri, 0);
2544 return (0);
2545 }
2546 } else { /* pri != 0 */
2547 if (pri > q->q_nband) {
2548 /*
2549 * No band exists yet, so return success.
2550 */
2551 mutex_exit(QLOCK(q));
2552 TRACE_3(TR_FAC_STREAMS_FR, TR_BCANPUT_OUT,
2553 "bcanput:%p %X %d", q, pri, 1);
2554 return (1);
2555 }
2556 qbp = q->q_bandp;
2557 while (--pri)
2558 qbp = qbp->qb_next;
2559 if (qbp->qb_flag & QB_FULL) {
2560 qbp->qb_flag |= QB_WANTW;
2561 mutex_exit(QLOCK(q));
2562 TRACE_3(TR_FAC_STREAMS_FR, TR_BCANPUT_OUT,
2563 "bcanput:%p %X %d", q, pri, 0);
2564 return (0);
2565 }
2566 }
2567 mutex_exit(QLOCK(q));
2568 TRACE_3(TR_FAC_STREAMS_FR, TR_BCANPUT_OUT,
2569 "bcanput:%p %X %d", q, pri, 1);
2570 return (1);
2571 }
2572
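/*
 * Editorial sketch: band-aware flow control pairs bcanput() with the
 * message's own b_band, falling back to putbq() when the target band is
 * full (fragment of a hypothetical service procedure).
 *
 *      if (!bcanput(q->q_next, mp->b_band)) {
 *              (void) putbq(q, mp);
 *              return (0);
 *      }
 *      putnext(q, mp);
 */
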
2573 /*
2574 * Put a message on a queue.
2575 *
2576 * Messages are enqueued on a priority basis. The priority classes
2577 * are HIGH PRIORITY (type >= QPCTL), PRIORITY (type < QPCTL && band > 0),
2578 * and B_NORMAL (type < QPCTL && band == 0).
2579 *
2580 * Add appropriate weighted data block sizes to queue count.
2581 * If queue hits high water mark then set QFULL flag.
2582 *
2583 * If QNOENAB is not set (putq is allowed to enable the queue),
2584 * enable the queue only if the message is PRIORITY,
2585 * or the QWANTR flag is set (indicating that the service procedure
2586 * is ready to read the queue). This implies that a service
2587 * procedure must NEVER put a high priority message back on its own
2588 * queue, as this would result in an infinite loop (!).
2589 */
2590 int
2591 putq(queue_t *q, mblk_t *bp)
2592 {
2593 mblk_t *tmp;
2594 qband_t *qbp = NULL;
2595 int mcls = (int)queclass(bp);
2596 kthread_id_t freezer;
2597 int bytecnt = 0, mblkcnt = 0;
2598
2599 freezer = STREAM(q)->sd_freezer;
2600 if (freezer == curthread) {
2601 ASSERT(frozenstr(q));
2602 ASSERT(MUTEX_HELD(QLOCK(q)));
2603 } else
2604 mutex_enter(QLOCK(q));
2605
2606 /*
2607 * Make sanity checks and if qband structure is not yet
2608 * allocated, do so.
2609 */
2610 if (mcls == QPCTL) {
2611 if (bp->b_band != 0)
2612 bp->b_band = 0; /* force to be correct */
2613 } else if (bp->b_band != 0) {
2614 int i;
2615 qband_t **qbpp;
2616
2617 if (bp->b_band > q->q_nband) {
2618
2619 /*
2620 * The qband structure for this priority band is
2621 * not on the queue yet, so we have to allocate
2622 * one on the fly. It would be wasteful to
2623 * associate the qband structures with every
2624 * queue when the queues are allocated. This is
2625 * because most queues will only need the normal
2626 * band of flow which can be described entirely
2627 * by the queue itself.
2628 */
2629 qbpp = &q->q_bandp;
2630 while (*qbpp)
2631 qbpp = &(*qbpp)->qb_next;
2632 while (bp->b_band > q->q_nband) {
2633 if ((*qbpp = allocband()) == NULL) {
2634 if (freezer != curthread)
2635 mutex_exit(QLOCK(q));
2636 return (0);
2637 }
2638 (*qbpp)->qb_hiwat = q->q_hiwat;
2639 (*qbpp)->qb_lowat = q->q_lowat;
2640 q->q_nband++;
2641 qbpp = &(*qbpp)->qb_next;
2642 }
2643 }
2644 ASSERT(MUTEX_HELD(QLOCK(q)));
2645 qbp = q->q_bandp;
2646 i = bp->b_band;
2647 while (--i)
2648 qbp = qbp->qb_next;
2649 }
2650
2651 /*
2652 * If queue is empty, add the message and initialize the pointers.
2653 * Otherwise, adjust message pointers and queue pointers based on
2654 * the type of the message and where it belongs on the queue. Some
2655 * code is duplicated to minimize the number of conditionals and
2656 * hopefully minimize the amount of time this routine takes.
2657 */
2658 if (!q->q_first) {
2659 bp->b_next = NULL;
2660 bp->b_prev = NULL;
2661 q->q_first = bp;
2662 q->q_last = bp;
2663 if (qbp) {
2664 qbp->qb_first = bp;
2665 qbp->qb_last = bp;
2666 }
2667 } else if (!qbp) { /* bp->b_band == 0 */
2668
2669 /*
2670 * If queue class of message is less than or equal to
2671 * that of the last one on the queue, tack on to the end.
2672 */
2673 tmp = q->q_last;
2674 if (mcls <= (int)queclass(tmp)) {
2675 bp->b_next = NULL;
2676 bp->b_prev = tmp;
2677 tmp->b_next = bp;
2678 q->q_last = bp;
2679 } else {
2680 tmp = q->q_first;
2681 while ((int)queclass(tmp) >= mcls)
2682 tmp = tmp->b_next;
2683
2684 /*
2685 * Insert bp before tmp.
2686 */
2687 bp->b_next = tmp;
2688 bp->b_prev = tmp->b_prev;
2689 if (tmp->b_prev)
2690 tmp->b_prev->b_next = bp;
2691 else
2692 q->q_first = bp;
2693 tmp->b_prev = bp;
2694 }
2695 } else { /* bp->b_band != 0 */
2696 if (qbp->qb_first) {
2697 tmp = qbp->qb_last;
2698
2699 /*
2700 * Insert bp after the last message in this band.
2701 */
2702 bp->b_next = tmp->b_next;
2703 if (tmp->b_next)
2704 tmp->b_next->b_prev = bp;
2705 else
2706 q->q_last = bp;
2707 bp->b_prev = tmp;
2708 tmp->b_next = bp;
2709 } else {
2710 tmp = q->q_last;
2711 if ((mcls < (int)queclass(tmp)) ||
2712 (bp->b_band <= tmp->b_band)) {
2713
2714 /*
2715 * Tack bp on end of queue.
2716 */
2717 bp->b_next = NULL;
2718 bp->b_prev = tmp;
2719 tmp->b_next = bp;
2720 q->q_last = bp;
2721 } else {
2722 tmp = q->q_first;
2723 while (tmp->b_datap->db_type >= QPCTL)
2724 tmp = tmp->b_next;
2725 while (tmp->b_band >= bp->b_band)
2726 tmp = tmp->b_next;
2727
2728 /*
2729 * Insert bp before tmp.
2730 */
2731 bp->b_next = tmp;
2732 bp->b_prev = tmp->b_prev;
2733 if (tmp->b_prev)
2734 tmp->b_prev->b_next = bp;
2735 else
2736 q->q_first = bp;
2737 tmp->b_prev = bp;
2738 }
2739 qbp->qb_first = bp;
2740 }
2741 qbp->qb_last = bp;
2742 }
2743
2744 /* Get message byte count for q_count accounting */
2745 bytecnt = mp_cont_len(bp, &mblkcnt);
2746
2747 if (qbp) {
2748 qbp->qb_count += bytecnt;
2749 qbp->qb_mblkcnt += mblkcnt;
2750 if ((qbp->qb_count >= qbp->qb_hiwat) ||
2751 (qbp->qb_mblkcnt >= qbp->qb_hiwat)) {
2752 qbp->qb_flag |= QB_FULL;
2753 }
2754 } else {
2755 q->q_count += bytecnt;
2756 q->q_mblkcnt += mblkcnt;
2757 if ((q->q_count >= q->q_hiwat) ||
2758 (q->q_mblkcnt >= q->q_hiwat)) {
2759 q->q_flag |= QFULL;
2760 }
2761 }
2762
2763 STR_FTEVENT_MSG(bp, q, FTEV_PUTQ, 0);
2764
2765 if ((mcls > QNORM) ||
2766 (canenable(q) && (q->q_flag & QWANTR || bp->b_band)))
2767 qenable_locked(q);
2768 ASSERT(MUTEX_HELD(QLOCK(q)));
2769 if (freezer != curthread)
2770 mutex_exit(QLOCK(q));
2771
2772 return (1);
2773 }
2774
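/*
 * Editorial sketch (hypothetical put procedure): defer work to the
 * service procedure by queueing with putq().  Since putq() enables the
 * queue when appropriate, xx_wsrv will run later.  Checking the return
 * value matters because putq() can fail if a qband cannot be allocated.
 *
 *      static int
 *      xx_wput(queue_t *q, mblk_t *mp)
 *      {
 *              if (!putq(q, mp))
 *                      freemsg(mp);
 *              return (0);
 *      }
 */
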
2775 /*
2776 * Put stuff back at beginning of Q according to priority order.
2777 * See comment on putq above for details.
2778 */
2779 int
2780 putbq(queue_t *q, mblk_t *bp)
2781 {
2782 mblk_t *tmp;
2783 qband_t *qbp = NULL;
2784 int mcls = (int)queclass(bp);
2785 kthread_id_t freezer;
2786 int bytecnt = 0, mblkcnt = 0;
2787
2788 ASSERT(q && bp);
2789 ASSERT(bp->b_next == NULL);
2790 freezer = STREAM(q)->sd_freezer;
2791 if (freezer == curthread) {
2792 ASSERT(frozenstr(q));
2793 ASSERT(MUTEX_HELD(QLOCK(q)));
2794 } else
2795 mutex_enter(QLOCK(q));
2796
2797 /*
2798 * Make sanity checks and if qband structure is not yet
2799 * allocated, do so.
2800 */
2801 if (mcls == QPCTL) {
2802 if (bp->b_band != 0)
2803 bp->b_band = 0; /* force to be correct */
2804 } else if (bp->b_band != 0) {
2805 int i;
2806 qband_t **qbpp;
2807
2808 if (bp->b_band > q->q_nband) {
2809 qbpp = &q->q_bandp;
2810 while (*qbpp)
2811 qbpp = &(*qbpp)->qb_next;
2812 while (bp->b_band > q->q_nband) {
2813 if ((*qbpp = allocband()) == NULL) {
2814 if (freezer != curthread)
2815 mutex_exit(QLOCK(q));
2816 return (0);
2817 }
2818 (*qbpp)->qb_hiwat = q->q_hiwat;
2819 (*qbpp)->qb_lowat = q->q_lowat;
2820 q->q_nband++;
2821 qbpp = &(*qbpp)->qb_next;
2822 }
2823 }
2824 qbp = q->q_bandp;
2825 i = bp->b_band;
2826 while (--i)
2827 qbp = qbp->qb_next;
2828 }
2829
2830 /*
2831 * If queue is empty or if message is high priority,
2832 * place on the front of the queue.
2833 */
2834 tmp = q->q_first;
2835 if ((!tmp) || (mcls == QPCTL)) {
2836 bp->b_next = tmp;
2837 if (tmp)
2838 tmp->b_prev = bp;
2839 else
2840 q->q_last = bp;
2841 q->q_first = bp;
2842 bp->b_prev = NULL;
2843 if (qbp) {
2844 qbp->qb_first = bp;
2845 qbp->qb_last = bp;
2846 }
2847 } else if (qbp) { /* bp->b_band != 0 */
2848 tmp = qbp->qb_first;
2849 if (tmp) {
2850
2851 /*
2852 * Insert bp before the first message in this band.
2853 */
2854 bp->b_next = tmp;
2855 bp->b_prev = tmp->b_prev;
2856 if (tmp->b_prev)
2857 tmp->b_prev->b_next = bp;
2858 else
2859 q->q_first = bp;
2860 tmp->b_prev = bp;
2861 } else {
2862 tmp = q->q_last;
2863 if ((mcls < (int)queclass(tmp)) ||
2864 (bp->b_band < tmp->b_band)) {
2865
2866 /*
2867 * Tack bp on end of queue.
2868 */
2869 bp->b_next = NULL;
2870 bp->b_prev = tmp;
2871 tmp->b_next = bp;
2872 q->q_last = bp;
2873 } else {
2874 tmp = q->q_first;
2875 while (tmp->b_datap->db_type >= QPCTL)
2876 tmp = tmp->b_next;
2877 while (tmp->b_band > bp->b_band)
2878 tmp = tmp->b_next;
2879
2880 /*
2881 * Insert bp before tmp.
2882 */
2883 bp->b_next = tmp;
2884 bp->b_prev = tmp->b_prev;
2885 if (tmp->b_prev)
2886 tmp->b_prev->b_next = bp;
2887 else
2888 q->q_first = bp;
2889 tmp->b_prev = bp;
2890 }
2891 qbp->qb_last = bp;
2892 }
2893 qbp->qb_first = bp;
2894 } else { /* bp->b_band == 0 && !QPCTL */
2895
2896 /*
2897 * If the queue class or band is less than that of the last
2898 * message on the queue, tack bp on the end of the queue.
2899 */
2900 tmp = q->q_last;
2901 if ((mcls < (int)queclass(tmp)) || (bp->b_band < tmp->b_band)) {
2902 bp->b_next = NULL;
2903 bp->b_prev = tmp;
2904 tmp->b_next = bp;
2905 q->q_last = bp;
2906 } else {
2907 tmp = q->q_first;
2908 while (tmp->b_datap->db_type >= QPCTL)
2909 tmp = tmp->b_next;
2910 while (tmp->b_band > bp->b_band)
2911 tmp = tmp->b_next;
2912
2913 /*
2914 * Insert bp before tmp.
2915 */
2916 bp->b_next = tmp;
2917 bp->b_prev = tmp->b_prev;
2918 if (tmp->b_prev)
2919 tmp->b_prev->b_next = bp;
2920 else
2921 q->q_first = bp;
2922 tmp->b_prev = bp;
2923 }
2924 }
2925
2926 /* Get message byte count for q_count accounting */
2927 bytecnt = mp_cont_len(bp, &mblkcnt);
2928
2929 if (qbp) {
2930 qbp->qb_count += bytecnt;
2931 qbp->qb_mblkcnt += mblkcnt;
2932 if ((qbp->qb_count >= qbp->qb_hiwat) ||
2933 (qbp->qb_mblkcnt >= qbp->qb_hiwat)) {
2934 qbp->qb_flag |= QB_FULL;
2935 }
2936 } else {
2937 q->q_count += bytecnt;
2938 q->q_mblkcnt += mblkcnt;
2939 if ((q->q_count >= q->q_hiwat) ||
2940 (q->q_mblkcnt >= q->q_hiwat)) {
2941 q->q_flag |= QFULL;
2942 }
2943 }
2944
2945 STR_FTEVENT_MSG(bp, q, FTEV_PUTBQ, 0);
2946
2947 if ((mcls > QNORM) || (canenable(q) && (q->q_flag & QWANTR)))
2948 qenable_locked(q);
2949 ASSERT(MUTEX_HELD(QLOCK(q)));
2950 if (freezer != curthread)
2951 mutex_exit(QLOCK(q));
2952
2953 return (1);
2954 }
2955
2956 /*
2957 * Insert a message before an existing message on the queue. If the
2958 * existing message is NULL, the new message is placed on the end of
2959 * the queue. The queue class of the new message is ignored. However,
2960 * the priority band of the new message must adhere to the following
2961 * ordering:
2962 *
2963 * emp->b_prev->b_band >= mp->b_band >= emp->b_band.
2964 *
2965 * All flow control parameters are updated.
2966 *
2967 * insq can be called with the stream frozen, by other utility functions
2968 * holding QLOCK, and by streams modules without any locks/frozen.
2969 */
2970 int
2971 insq(queue_t *q, mblk_t *emp, mblk_t *mp)
2972 {
2973 mblk_t *tmp;
2974 qband_t *qbp = NULL;
2975 int mcls = (int)queclass(mp);
2976 kthread_id_t freezer;
2977 int bytecnt = 0, mblkcnt = 0;
2978
2979 freezer = STREAM(q)->sd_freezer;
2980 if (freezer == curthread) {
2981 ASSERT(frozenstr(q));
2982 ASSERT(MUTEX_HELD(QLOCK(q)));
2983 } else if (MUTEX_HELD(QLOCK(q))) {
2984 /* Don't drop lock on exit */
2985 freezer = curthread;
2986 } else
2987 mutex_enter(QLOCK(q));
2988
2989 if (mcls == QPCTL) {
2990 if (mp->b_band != 0)
2991 mp->b_band = 0; /* force to be correct */
2992 if (emp && emp->b_prev &&
2993 (emp->b_prev->b_datap->db_type < QPCTL))
2994 goto badord;
2995 }
2996 if (emp) {
2997 if (((mcls == QNORM) && (mp->b_band < emp->b_band)) ||
2998 (emp->b_prev && (emp->b_prev->b_datap->db_type < QPCTL) &&
2999 (emp->b_prev->b_band < mp->b_band))) {
3000 goto badord;
3001 }
3002 } else {
3003 tmp = q->q_last;
3004 if (tmp && (mcls == QNORM) && (mp->b_band > tmp->b_band)) {
3005 badord:
3006 cmn_err(CE_WARN,
3007 "insq: attempt to insert message out of order "
3008 "on q %p", (void *)q);
3009 if (freezer != curthread)
3010 mutex_exit(QLOCK(q));
3011 return (0);
3012 }
3013 }
3014
3015 if (mp->b_band != 0) {
3016 int i;
3017 qband_t **qbpp;
3018
3019 if (mp->b_band > q->q_nband) {
3020 qbpp = &q->q_bandp;
3021 while (*qbpp)
3022 qbpp = &(*qbpp)->qb_next;
3023 while (mp->b_band > q->q_nband) {
3024 if ((*qbpp = allocband()) == NULL) {
3025 if (freezer != curthread)
3026 mutex_exit(QLOCK(q));
3027 return (0);
3028 }
3029 (*qbpp)->qb_hiwat = q->q_hiwat;
3030 (*qbpp)->qb_lowat = q->q_lowat;
3031 q->q_nband++;
3032 qbpp = &(*qbpp)->qb_next;
3033 }
3034 }
3035 qbp = q->q_bandp;
3036 i = mp->b_band;
3037 while (--i)
3038 qbp = qbp->qb_next;
3039 }
3040
3041 if ((mp->b_next = emp) != NULL) {
3042 if ((mp->b_prev = emp->b_prev) != NULL)
3043 emp->b_prev->b_next = mp;
3044 else
3045 q->q_first = mp;
3046 emp->b_prev = mp;
3047 } else {
3048 if ((mp->b_prev = q->q_last) != NULL)
3049 q->q_last->b_next = mp;
3050 else
3051 q->q_first = mp;
3052 q->q_last = mp;
3053 }
3054
3055 /* Get mblk and byte count for q_count accounting */
3056 bytecnt = mp_cont_len(mp, &mblkcnt);
3057
3058 if (qbp) { /* adjust qband pointers and count */
3059 if (!qbp->qb_first) {
3060 qbp->qb_first = mp;
3061 qbp->qb_last = mp;
3062 } else {
3063 if (mp->b_prev == NULL || (mp->b_prev != NULL &&
3064 (mp->b_prev->b_band != mp->b_band)))
3065 qbp->qb_first = mp;
3066 else if (mp->b_next == NULL || (mp->b_next != NULL &&
3067 (mp->b_next->b_band != mp->b_band)))
3068 qbp->qb_last = mp;
3069 }
3070 qbp->qb_count += bytecnt;
3071 qbp->qb_mblkcnt += mblkcnt;
3072 if ((qbp->qb_count >= qbp->qb_hiwat) ||
3073 (qbp->qb_mblkcnt >= qbp->qb_hiwat)) {
3074 qbp->qb_flag |= QB_FULL;
3075 }
3076 } else {
3077 q->q_count += bytecnt;
3078 q->q_mblkcnt += mblkcnt;
3079 if ((q->q_count >= q->q_hiwat) ||
3080 (q->q_mblkcnt >= q->q_hiwat)) {
3081 q->q_flag |= QFULL;
3082 }
3083 }
3084
3085 STR_FTEVENT_MSG(mp, q, FTEV_INSQ, 0);
3086
3087 if (canenable(q) && (q->q_flag & QWANTR))
3088 qenable_locked(q);
3089
3090 ASSERT(MUTEX_HELD(QLOCK(q)));
3091 if (freezer != curthread)
3092 mutex_exit(QLOCK(q));
3093
3094 return (1);
3095 }
3096
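/*
 * Editorial sketch: insq() is normally used with the stream frozen so
 * that emp remains a valid queue position, e.g. to slip a message in
 * ahead of the current first one (assuming mp's band respects the
 * ordering rule above).  A NULL emp appends to the tail instead.
 *
 *      freezestr(q);
 *      (void) insq(q, q->q_first, mp);
 *      unfreezestr(q);
 */
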
3097 /*
3098 * Create and put a control message on queue.
3099 */
3100 int
3101 putctl(queue_t *q, int type)
3102 {
3103 mblk_t *bp;
3104
3105 if ((datamsg(type) && (type != M_DELAY)) ||
3106 (bp = allocb_tryhard(0)) == NULL)
3107 return (0);
3108 bp->b_datap->db_type = (unsigned char)type;
3109
3110 put(q, bp);
3111
3112 return (1);
3113 }
3114
3115 /*
3116 * Control message with a single-byte parameter
3117 */
3118 int
3119 putctl1(queue_t *q, int type, int param)
3120 {
3121 mblk_t *bp;
3122
3123 if ((datamsg(type) && (type != M_DELAY)) ||
3124 (bp = allocb_tryhard(1)) == NULL)
3125 return (0);
3126 bp->b_datap->db_type = (unsigned char)type;
3127 *bp->b_wptr++ = (unsigned char)param;
3128
3129 put(q, bp);
3130
3131 return (1);
3132 }
3133
3134 int
3135 putnextctl1(queue_t *q, int type, int param)
3136 {
3137 mblk_t *bp;
3138
3139 if ((datamsg(type) && (type != M_DELAY)) ||
3140 ((bp = allocb_tryhard(1)) == NULL))
3141 return (0);
3142
3143 bp->b_datap->db_type = (unsigned char)type;
3144 *bp->b_wptr++ = (unsigned char)param;
3145
3146 putnext(q, bp);
3147
3148 return (1);
3149 }
3150
3151 int
3152 putnextctl(queue_t *q, int type)
3153 {
3154 mblk_t *bp;
3155
3156 if ((datamsg(type) && (type != M_DELAY)) ||
3157 ((bp = allocb_tryhard(0)) == NULL))
3158 return (0);
3159 bp->b_datap->db_type = (unsigned char)type;
3160
3161 putnext(q, bp);
3162
3163 return (1);
3164 }
3165
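/*
 * Editorial sketch: these helpers let modules send zero- and one-byte
 * control messages without hand-allocating an mblk; a zero return means
 * the allocation failed and no message was sent.
 *
 *      (void) putnextctl(q, M_HANGUP);
 *      (void) putnextctl1(q, M_ERROR, EIO);
 */
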
3166 /*
3167 * Return the queue upstream from this one
3168 */
3169 queue_t *
3170 backq(queue_t *q)
3171 {
3172 q = _OTHERQ(q);
3173 if (q->q_next) {
3174 q = q->q_next;
3175 return (_OTHERQ(q));
3176 }
3177 return (NULL);
3178 }
3179
3180 /*
3181 * Send a block back up the queue in reverse from this
3182 * one (e.g. to respond to ioctls)
3183 */
3184 void
3185 qreply(queue_t *q, mblk_t *bp)
3186 {
3187 ASSERT(q && bp);
3188
3189 putnext(_OTHERQ(q), bp);
3190 }
3191
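/*
 * Editorial sketch (fragment of a hypothetical write-side ioctl
 * handler): qreply() is the standard way to turn an M_IOCTL around onto
 * the read side, here by converting it to an M_IOCNAK in place.
 *
 *      case M_IOCTL:
 *              mp->b_datap->db_type = M_IOCNAK;
 *              qreply(q, mp);
 *              break;
 */
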
3192 /*
3193 * Streams Queue Scheduling
3194 *
3195 * Queues are enabled through qenable() when they have messages to
3196 * process. They are serviced by queuerun(), which runs each enabled
3197 * queue's service procedure. The call to queuerun() is processor
3198 * dependent - the general principle is that it be run whenever a queue
3199 * is enabled but before returning to user level. For system calls,
3200 * the function runqueues() is called if their action causes a queue
3201 * to be enabled. For device interrupts, queuerun() should be
3202 * called before returning from the last level of interrupt. Beyond
3203 * this, no timing assumptions should be made about queue scheduling.
3204 */
3205
3206 /*
3207 * Enable a queue: put it on list of those whose service procedures are
3208 * ready to run and set up the scheduling mechanism.
3209 * The broadcast is done outside the mutex to avoid having the woken
3210 * thread contend for the mutex. This is OK because the queue has been
3211 * enqueued on the runlist and flagged safely at this point.
3212 */
3213 void
3214 qenable(queue_t *q)
3215 {
3216 mutex_enter(QLOCK(q));
3217 qenable_locked(q);
3218 mutex_exit(QLOCK(q));
3219 }
3220 /*
3221 * Return number of messages on queue
3222 */
3223 int
3224 qsize(queue_t *qp)
3225 {
3226 int count = 0;
3227 mblk_t *mp;
3228
3229 mutex_enter(QLOCK(qp));
3230 for (mp = qp->q_first; mp; mp = mp->b_next)
3231 count++;
3232 mutex_exit(QLOCK(qp));
3233 return (count);
3234 }
3235
3236 /*
3237 * noenable - set queue so that putq() will not enable it.
3238 * enableok - set queue so that putq() can enable it.
3239 */
3240 void
3241 noenable(queue_t *q)
3242 {
3243 mutex_enter(QLOCK(q));
3244 q->q_flag |= QNOENB;
3245 mutex_exit(QLOCK(q));
3246 }
3247
3248 void
3249 enableok(queue_t *q)
3250 {
3251 mutex_enter(QLOCK(q));
3252 q->q_flag &= ~QNOENB;
3253 mutex_exit(QLOCK(q));
3254 }
3255
3256 /*
3257 * Set queue fields.
3258 */
3259 int
3260 strqset(queue_t *q, qfields_t what, unsigned char pri, intptr_t val)
3261 {
3262 qband_t *qbp = NULL;
3263 queue_t *wrq;
3264 int error = 0;
3265 kthread_id_t freezer;
3266
3267 freezer = STREAM(q)->sd_freezer;
3268 if (freezer == curthread) {
3269 ASSERT(frozenstr(q));
3270 ASSERT(MUTEX_HELD(QLOCK(q)));
3271 } else
3272 mutex_enter(QLOCK(q));
3273
3274 if (what >= QBAD) {
3275 error = EINVAL;
3276 goto done;
3277 }
3278 if (pri != 0) {
3279 int i;
3280 qband_t **qbpp;
3281
3282 if (pri > q->q_nband) {
3283 qbpp = &q->q_bandp;
3284 while (*qbpp)
3285 qbpp = &(*qbpp)->qb_next;
3286 while (pri > q->q_nband) {
3287 if ((*qbpp = allocband()) == NULL) {
3288 error = EAGAIN;
3289 goto done;
3290 }
3291 (*qbpp)->qb_hiwat = q->q_hiwat;
3292 (*qbpp)->qb_lowat = q->q_lowat;
3293 q->q_nband++;
3294 qbpp = &(*qbpp)->qb_next;
3295 }
3296 }
3297 qbp = q->q_bandp;
3298 i = pri;
3299 while (--i)
3300 qbp = qbp->qb_next;
3301 }
3302 switch (what) {
3303
3304 case QHIWAT:
3305 if (qbp)
3306 qbp->qb_hiwat = (size_t)val;
3307 else
3308 q->q_hiwat = (size_t)val;
3309 break;
3310
3311 case QLOWAT:
3312 if (qbp)
3313 qbp->qb_lowat = (size_t)val;
3314 else
3315 q->q_lowat = (size_t)val;
3316 break;
3317
3318 case QMAXPSZ:
3319 if (qbp)
3320 error = EINVAL;
3321 else
3322 q->q_maxpsz = (ssize_t)val;
3323
3324 /*
3325 * Performance concern: strwrite looks at the module below
3326 * the stream head for the maxpsz each time it does a write,
3327 * so we now cache it at the stream head. Check to see if this
3328 * queue is sitting directly below the stream head.
3329 */
3330 wrq = STREAM(q)->sd_wrq;
3331 if (q != wrq->q_next)
3332 break;
3333
3334 /*
3335 * If the stream is not frozen drop the current QLOCK and
3336 * acquire the sd_wrq QLOCK which protects sd_qn_*
3337 */
3338 if (freezer != curthread) {
3339 mutex_exit(QLOCK(q));
3340 mutex_enter(QLOCK(wrq));
3341 }
3342 ASSERT(MUTEX_HELD(QLOCK(wrq)));
3343
3344 if (strmsgsz != 0) {
3345 if (val == INFPSZ)
3346 val = strmsgsz;
3347 else {
3348 if (STREAM(q)->sd_vnode->v_type == VFIFO)
3349 val = MIN(PIPE_BUF, val);
3350 else
3351 val = MIN(strmsgsz, val);
3352 }
3353 }
3354 STREAM(q)->sd_qn_maxpsz = val;
3355 if (freezer != curthread) {
3356 mutex_exit(QLOCK(wrq));
3357 mutex_enter(QLOCK(q));
3358 }
3359 break;
3360
3361 case QMINPSZ:
3362 if (qbp)
3363 error = EINVAL;
3364 else
3365 q->q_minpsz = (ssize_t)val;
3366
3367 /*
3368 * Performance concern: strwrite looks at the module below
3369 * the stream head for the minpsz each time it does a write,
3370 * so we now cache it at the stream head. Check to see if this
3371 * queue is sitting directly below the stream head.
3372 */
3373 wrq = STREAM(q)->sd_wrq;
3374 if (q != wrq->q_next)
3375 break;
3376
3377 /*
3378 * If the stream is not frozen drop the current QLOCK and
3379 * acquire the sd_wrq QLOCK which protects sd_qn_*
3380 */
3381 if (freezer != curthread) {
3382 mutex_exit(QLOCK(q));
3383 mutex_enter(QLOCK(wrq));
3384 }
3385 STREAM(q)->sd_qn_minpsz = (ssize_t)val;
3386
3387 if (freezer != curthread) {
3388 mutex_exit(QLOCK(wrq));
3389 mutex_enter(QLOCK(q));
3390 }
3391 break;
3392
3393 case QSTRUIOT:
3394 if (qbp)
3395 error = EINVAL;
3396 else
3397 q->q_struiot = (ushort_t)val;
3398 break;
3399
3400 case QCOUNT:
3401 case QFIRST:
3402 case QLAST:
3403 case QFLAG:
3404 error = EPERM;
3405 break;
3406
3407 default:
3408 error = EINVAL;
3409 break;
3410 }
3411 done:
3412 if (freezer != curthread)
3413 mutex_exit(QLOCK(q));
3414 return (error);
3415 }
3416
3417 /*
3418 * Get queue fields.
3419 */
3420 int
3421 strqget(queue_t *q, qfields_t what, unsigned char pri, void *valp)
3422 {
3423 qband_t *qbp = NULL;
3424 int error = 0;
3425 kthread_id_t freezer;
3426
3427 freezer = STREAM(q)->sd_freezer;
3428 if (freezer == curthread) {
3429 ASSERT(frozenstr(q));
3430 ASSERT(MUTEX_HELD(QLOCK(q)));
3431 } else
3432 mutex_enter(QLOCK(q));
3433 if (what >= QBAD) {
3434 error = EINVAL;
3435 goto done;
3436 }
3437 if (pri != 0) {
3438 int i;
3439 qband_t **qbpp;
3440
3441 if (pri > q->q_nband) {
3442 qbpp = &q->q_bandp;
3443 while (*qbpp)
3444 qbpp = &(*qbpp)->qb_next;
3445 while (pri > q->q_nband) {
3446 if ((*qbpp = allocband()) == NULL) {
3447 error = EAGAIN;
3448 goto done;
3449 }
3450 (*qbpp)->qb_hiwat = q->q_hiwat;
3451 (*qbpp)->qb_lowat = q->q_lowat;
3452 q->q_nband++;
3453 qbpp = &(*qbpp)->qb_next;
3454 }
3455 }
3456 qbp = q->q_bandp;
3457 i = pri;
3458 while (--i)
3459 qbp = qbp->qb_next;
3460 }
3461 switch (what) {
3462 case QHIWAT:
3463 if (qbp)
3464 *(size_t *)valp = qbp->qb_hiwat;
3465 else
3466 *(size_t *)valp = q->q_hiwat;
3467 break;
3468
3469 case QLOWAT:
3470 if (qbp)
3471 *(size_t *)valp = qbp->qb_lowat;
3472 else
3473 *(size_t *)valp = q->q_lowat;
3474 break;
3475
3476 case QMAXPSZ:
3477 if (qbp)
3478 error = EINVAL;
3479 else
3480 *(ssize_t *)valp = q->q_maxpsz;
3481 break;
3482
3483 case QMINPSZ:
3484 if (qbp)
3485 error = EINVAL;
3486 else
3487 *(ssize_t *)valp = q->q_minpsz;
3488 break;
3489
3490 case QCOUNT:
3491 if (qbp)
3492 *(size_t *)valp = qbp->qb_count;
3493 else
3494 *(size_t *)valp = q->q_count;
3495 break;
3496
3497 case QFIRST:
3498 if (qbp)
3499 *(mblk_t **)valp = qbp->qb_first;
3500 else
3501 *(mblk_t **)valp = q->q_first;
3502 break;
3503
3504 case QLAST:
3505 if (qbp)
3506 *(mblk_t **)valp = qbp->qb_last;
3507 else
3508 *(mblk_t **)valp = q->q_last;
3509 break;
3510
3511 case QFLAG:
3512 if (qbp)
3513 *(uint_t *)valp = qbp->qb_flag;
3514 else
3515 *(uint_t *)valp = q->q_flag;
3516 break;
3517
3518 case QSTRUIOT:
3519 if (qbp)
3520 error = EINVAL;
3521 else
3522 *(short *)valp = q->q_struiot;
3523 break;
3524
3525 default:
3526 error = EINVAL;
3527 break;
3528 }
3529 done:
3530 if (freezer != curthread)
3531 mutex_exit(QLOCK(q));
3532 return (error);
3533 }
3534
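/*
 * Editorial sketch: typical use of the set/get pair above; the values
 * are illustrative.  Raise the band-1 high-water mark, then read back
 * the byte count of the normal band.
 *
 *      size_t cnt;
 *
 *      (void) strqset(q, QHIWAT, 1, 8192);
 *      (void) strqget(q, QCOUNT, 0, &cnt);
 */
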
3535 /*
3536 * Function wakes all waiters in cvwait/sigwait/pollwait, on one of:
3537 * QWANTWSYNC, QWANTR or QWANTW.
3538 *
3539 * Note: for QWANTWSYNC/QWANTW and QWANTR, if no WSLEEPer or RSLEEPer then a
3540 * deferred wakeup will be done. Also if strpoll() in progress then a
3541 * deferred pollwakeup will be done.
3542 */
3543 void
3544 strwakeq(queue_t *q, int flag)
3545 {
3546 stdata_t *stp = STREAM(q);
3547 pollhead_t *pl;
3548
3549 mutex_enter(&stp->sd_lock);
3550 pl = &stp->sd_pollist;
3551 if (flag & QWANTWSYNC) {
3552 ASSERT(!(q->q_flag & QREADR));
3553 if (stp->sd_flag & WSLEEP) {
3554 stp->sd_flag &= ~WSLEEP;
3555 cv_broadcast(&stp->sd_wrq->q_wait);
3556 } else {
3557 stp->sd_wakeq |= WSLEEP;
3558 }
3559
3560 mutex_exit(&stp->sd_lock);
3561 pollwakeup(pl, POLLWRNORM);
3562 mutex_enter(&stp->sd_lock);
3563
3564 if (stp->sd_sigflags & S_WRNORM)
3565 strsendsig(stp->sd_siglist, S_WRNORM, 0, 0);
3566 } else if (flag & QWANTR) {
3567 if (stp->sd_flag & RSLEEP) {
3568 stp->sd_flag &= ~RSLEEP;
3569 cv_broadcast(&_RD(stp->sd_wrq)->q_wait);
3570 } else {
3571 stp->sd_wakeq |= RSLEEP;
3572 }
3573
3574 mutex_exit(&stp->sd_lock);
3575 pollwakeup(pl, POLLIN | POLLRDNORM);
3576 mutex_enter(&stp->sd_lock);
3577
3578 {
3579 int events = stp->sd_sigflags & (S_INPUT | S_RDNORM);
3580
3581 if (events)
3582 strsendsig(stp->sd_siglist, events, 0, 0);
3583 }
3584 } else {
3585 if (stp->sd_flag & WSLEEP) {
3586 stp->sd_flag &= ~WSLEEP;
3587 cv_broadcast(&stp->sd_wrq->q_wait);
3588 }
3589
3590 mutex_exit(&stp->sd_lock);
3591 pollwakeup(pl, POLLWRNORM);
3592 mutex_enter(&stp->sd_lock);
3593
3594 if (stp->sd_sigflags & S_WRNORM)
3595 strsendsig(stp->sd_siglist, S_WRNORM, 0, 0);
3596 }
3597 mutex_exit(&stp->sd_lock);
3598 }
3599
3600 int
3601 struioget(queue_t *q, mblk_t *mp, struiod_t *dp, int noblock)
3602 {
3603 stdata_t *stp = STREAM(q);
3604 int typ = STRUIOT_STANDARD;
3605 uio_t *uiop = &dp->d_uio;
3606 dblk_t *dbp;
3607 ssize_t uiocnt;
3608 ssize_t cnt;
3609 unsigned char *ptr;
3610 ssize_t resid;
3611 int error = 0;
3612 on_trap_data_t otd;
3613 queue_t *stwrq;
3614
3615 /*
3616 * Plumbing may change while taking the type so store the
3617 * queue in a temporary variable. It doesn't matter even
3618 * if we take the type from the previous plumbing;
3619 * that's because if the plumbing has changed while we were
3620 * holding the queue in a temporary variable, we can continue
3621 * processing the message the way it would have been processed
3622 * in the old plumbing, without any side effects but a bit of
3623 * extra processing for the partial ip header checksum.
3624 *
3625 * This has been done to avoid holding the sd_lock which is
3626 * very hot.
3627 */
3628
3629 stwrq = stp->sd_struiowrq;
3630 if (stwrq)
3631 typ = stwrq->q_struiot;
3632
3633 for (; (resid = uiop->uio_resid) > 0 && mp; mp = mp->b_cont) {
3634 dbp = mp->b_datap;
3635 ptr = (uchar_t *)(mp->b_rptr + dbp->db_cksumstuff);
3636 uiocnt = dbp->db_cksumend - dbp->db_cksumstuff;
3637 cnt = MIN(uiocnt, uiop->uio_resid);
3638 if (!(dbp->db_struioflag & STRUIO_SPEC) ||
3639 (dbp->db_struioflag & STRUIO_DONE) || cnt == 0) {
3640 /*
3641 * Either this mblk has already been processed
3642 * or there is no more room in this mblk (?).
3643 */
3644 continue;
3645 }
3646 switch (typ) {
3647 case STRUIOT_STANDARD:
3648 if (noblock) {
3649 if (on_trap(&otd, OT_DATA_ACCESS)) {
3650 no_trap();
3651 error = EWOULDBLOCK;
3652 goto out;
3653 }
3654 }
3655 if (error = uiomove(ptr, cnt, UIO_WRITE, uiop)) {
3656 if (noblock)
3657 no_trap();
3658 goto out;
3659 }
3660 if (noblock)
3661 no_trap();
3662 break;
3663
3664 default:
3665 error = EIO;
3666 goto out;
3667 }
3668 dbp->db_struioflag |= STRUIO_DONE;
3669 dbp->db_cksumstuff += cnt;
3670 }
3671 out:
3672 if (error == EWOULDBLOCK && (resid -= uiop->uio_resid) > 0) {
3673 /*
3674 * A fault has occurred and some bytes were moved to the
3675 * current mblk, the uio_t has already been updated by
3676 * the appropriate uio routine, so also update the mblk
3677 * to reflect this in case this same mblk chain is used
3678 * again (after the fault has been handled).
3679 */
3680 uiocnt = dbp->db_cksumend - dbp->db_cksumstuff;
3681 if (uiocnt >= resid)
3682 dbp->db_cksumstuff += resid;
3683 }
3684 return (error);
3685 }
3686
3687 /*
3688 * Try to enter the queue synchronously. Any attempt to enter a closing
3689 * queue will fail. The qp->q_rwcnt keeps track of the number of successful entries so
3690 * that removeq() will not try to close the queue while a thread is inside the
3691 * queue.
3692 */
3693 static boolean_t
3694 rwnext_enter(queue_t *qp)
3695 {
3696 mutex_enter(QLOCK(qp));
3697 if (qp->q_flag & QWCLOSE) {
3698 mutex_exit(QLOCK(qp));
3699 return (B_FALSE);
3700 }
3701 qp->q_rwcnt++;
3702 ASSERT(qp->q_rwcnt != 0);
3703 mutex_exit(QLOCK(qp));
3704 return (B_TRUE);
3705 }
3706
3707 /*
3708 * Decrease the count of threads running in sync stream queue and wake up any
3709 * threads blocked in removeq().
3710 */
3711 static void
3712 rwnext_exit(queue_t *qp)
3713 {
3714 mutex_enter(QLOCK(qp));
3715 qp->q_rwcnt--;
3716 if (qp->q_flag & QWANTRMQSYNC) {
3717 qp->q_flag &= ~QWANTRMQSYNC;
3718 cv_broadcast(&qp->q_wait);
3719 }
3720 mutex_exit(QLOCK(qp));
3721 }
3722
3723 /*
3724 * The purpose of rwnext() is to call the rw procedure of the next
3725 * (downstream) modules queue.
3726 *
3727 * It is treated as a put entrypoint for perimeter synchronization.
3728 *
3729 * There's no need to grab sq_putlocks here (which only exist for CIPUT
3730 * sync queues). If it is CIPUT sync queue sq_count is incremented and it does
3731 * not matter if any regular put entrypoints have been already entered. We
3732 * can't increment one of the sq_putcounts (instead of sq_count) because
3733 * qwait_rw won't know which counter to decrement.
3734 *
3735 * It would be reasonable to add the lockless FASTPUT logic.
3736 */
3737 int
3738 rwnext(queue_t *qp, struiod_t *dp)
3739 {
3740 queue_t *nqp;
3741 syncq_t *sq;
3742 uint16_t count;
3743 uint16_t flags;
3744 struct qinit *qi;
3745 int (*proc)();
3746 struct stdata *stp;
3747 int isread;
3748 int rval;
3749
3750 stp = STREAM(qp);
3751 /*
3752 * Prevent q_next from changing by holding sd_lock until acquiring
3753 * SQLOCK. Note that a read-side rwnext from the streamhead will
3754 * already have sd_lock acquired. In either case sd_lock is always
3755 * released after acquiring SQLOCK.
3756 *
3757 * The streamhead read-side holding sd_lock when calling rwnext is
3758 * required to prevent a race condition where M_DATA mblks flowing
3759 * up the read-side of the stream could be bypassed by a rwnext()
3760 * down-call. In this case sd_lock acts as the streamhead perimeter.
3761 */
3762 if ((nqp = _WR(qp)) == qp) {
3763 isread = 0;
3764 mutex_enter(&stp->sd_lock);
3765 qp = nqp->q_next;
3766 } else {
3767 isread = 1;
3768 if (nqp != stp->sd_wrq)
3769 /* Not streamhead */
3770 mutex_enter(&stp->sd_lock);
3771 qp = _RD(nqp->q_next);
3772 }
3773 qi = qp->q_qinfo;
3774 if (qp->q_struiot == STRUIOT_NONE || ! (proc = qi->qi_rwp)) {
3775 /*
3776 * Not a synchronous module or no r/w procedure for this
3777 * queue, so just return EINVAL and let the caller handle it.
3778 */
3779 mutex_exit(&stp->sd_lock);
3780 return (EINVAL);
3781 }
3782
3783 if (rwnext_enter(qp) == B_FALSE) {
3784 mutex_exit(&stp->sd_lock);
3785 return (EINVAL);
3786 }
3787
3788 sq = qp->q_syncq;
3789 mutex_enter(SQLOCK(sq));
3790 mutex_exit(&stp->sd_lock);
3791 count = sq->sq_count;
3792 flags = sq->sq_flags;
3793 ASSERT(sq->sq_ciputctrl == NULL || (flags & SQ_CIPUT));
3794
3795 while ((flags & SQ_GOAWAY) || (!(flags & SQ_CIPUT) && count != 0)) {
3796 /*
3797 * if this queue is being closed, return.
3798 */
3799 if (qp->q_flag & QWCLOSE) {
3800 mutex_exit(SQLOCK(sq));
3801 rwnext_exit(qp);
3802 return (EINVAL);
3803 }
3804
3805 /*
3806 * Wait until we can enter the inner perimeter.
3807 */
3808 sq->sq_flags = flags | SQ_WANTWAKEUP;
3809 cv_wait(&sq->sq_wait, SQLOCK(sq));
3810 count = sq->sq_count;
3811 flags = sq->sq_flags;
3812 }
3813
3814 if (isread == 0 && stp->sd_struiowrq == NULL ||
3815 isread == 1 && stp->sd_struiordq == NULL) {
3816 /*
3817 * Stream plumbing changed while waiting for inner perimeter
3818 * so just return EINVAL and let the caller handle it.
3819 */
3820 mutex_exit(SQLOCK(sq));
3821 rwnext_exit(qp);
3822 return (EINVAL);
3823 }
3824 if (!(flags & SQ_CIPUT))
3825 sq->sq_flags = flags | SQ_EXCL;
3826 sq->sq_count = count + 1;
3827 ASSERT(sq->sq_count != 0); /* Wraparound */
3828 /*
3829 * Note: The only message ordering guarantee that rwnext() makes is
3830 * for the write queue flow-control case. All others (r/w queue
3831 * with q_count > 0 (or q_first != 0)) are the responsibility of
3832 * the queue's rw procedure. This could be generalized here by
3833 * running the queue's service procedure, but that wouldn't be
3834 * the most efficient for all cases.
3835 */
3836 mutex_exit(SQLOCK(sq));
3837 if (! isread && (qp->q_flag & QFULL)) {
3838 /*
3839 * Write queue may be flow controlled. If so,
3840 * mark the queue for wakeup when it's not.
3841 */
3842 mutex_enter(QLOCK(qp));
3843 if (qp->q_flag & QFULL) {
3844 qp->q_flag |= QWANTWSYNC;
3845 mutex_exit(QLOCK(qp));
3846 rval = EWOULDBLOCK;
3847 goto out;
3848 }
3849 mutex_exit(QLOCK(qp));
3850 }
3851
3852 if (! isread && dp->d_mp)
3853 STR_FTEVENT_MSG(dp->d_mp, nqp, FTEV_RWNEXT, dp->d_mp->b_rptr -
3854 dp->d_mp->b_datap->db_base);
3855
3856 rval = (*proc)(qp, dp);
3857
3858 if (isread && dp->d_mp)
3859 STR_FTEVENT_MSG(dp->d_mp, _RD(nqp), FTEV_RWNEXT,
3860 dp->d_mp->b_rptr - dp->d_mp->b_datap->db_base);
3861 out:
3862 /*
3863 * The queue is protected from being freed by sq_count, so it is
3864 * safe to call rwnext_exit and reacquire SQLOCK(sq).
3865 */
3866 rwnext_exit(qp);
3867
3868 mutex_enter(SQLOCK(sq));
3869 flags = sq->sq_flags;
3870 ASSERT(sq->sq_count != 0);
3871 sq->sq_count--;
3872 if (flags & SQ_TAIL) {
3873 putnext_tail(sq, qp, flags);
3874 /*
3875 * The only purpose of this ASSERT is to preserve calling stack
3876 * in DEBUG kernel.
3877 */
3878 ASSERT(flags & SQ_TAIL);
3879 return (rval);
3880 }
3881 ASSERT(flags & (SQ_EXCL|SQ_CIPUT));
3882 /*
3883 * Safe to always drop SQ_EXCL:
3884 * Not SQ_CIPUT means we set SQ_EXCL above
3885 * For SQ_CIPUT SQ_EXCL will only be set if the put procedure
3886 * did a qwriter(INNER) in which case nobody else
3887 * is in the inner perimeter and we are exiting.
3888 *
3889 * I would like to make the following assertion:
3890 *
3891 * ASSERT((flags & (SQ_EXCL|SQ_CIPUT)) != (SQ_EXCL|SQ_CIPUT) ||
3892 * sq->sq_count == 0);
3893 *
3894 * which indicates that if we are both putshared and exclusive,
3895 * we became exclusive while executing the putproc, and the only
3896 * claim on the syncq was the one we dropped a few lines above.
3897 * But other threads that enter putnext while the syncq is exclusive
3898 * need to make a claim as they may need to drop SQLOCK in the
3899 * has_writers case to avoid deadlocks. If these threads are
3900 * delayed or preempted, it is possible that the writer thread can
3901 * find out that there are other claims making the (sq_count == 0)
3902 * test invalid.
3903 */
3904
3905 sq->sq_flags = flags & ~SQ_EXCL;
3906 if (sq->sq_flags & SQ_WANTWAKEUP) {
3907 sq->sq_flags &= ~SQ_WANTWAKEUP;
3908 cv_broadcast(&sq->sq_wait);
3909 }
3910 mutex_exit(SQLOCK(sq));
3911 return (rval);
3912 }
3913
3914 /*
3915 * The purpose of infonext() is to call the info procedure of the next
3916 * (downstream) modules queue.
3917 *
3918 * It is treated as a put entrypoint for perimeter synchronization.
3919 *
3920 * There's no need to grab sq_putlocks here (which only exist for CIPUT
3921 * sync queues). If it is CIPUT sync queue regular sq_count is incremented and
3922 * it does not matter if any regular put entrypoints have been already
3923 * entered.
3924 */
3925 int
3926 infonext(queue_t *qp, infod_t *idp)
3927 {
3928 queue_t *nqp;
3929 syncq_t *sq;
3930 uint16_t count;
3931 uint16_t flags;
3932 struct qinit *qi;
3933 int (*proc)();
3934 struct stdata *stp;
3935 int rval;
3936
3937 stp = STREAM(qp);
3938 /*
3939 * Prevent q_next from changing by holding sd_lock until
3940 * acquiring SQLOCK.
3941 */
3942 mutex_enter(&stp->sd_lock);
3943 if ((nqp = _WR(qp)) == qp) {
3944 qp = nqp->q_next;
3945 } else {
3946 qp = _RD(nqp->q_next);
3947 }
3948 qi = qp->q_qinfo;
3949 if (qp->q_struiot == STRUIOT_NONE || ! (proc = qi->qi_infop)) {
3950 mutex_exit(&stp->sd_lock);
3951 return (EINVAL);
3952 }
3953 sq = qp->q_syncq;
3954 mutex_enter(SQLOCK(sq));
3955 mutex_exit(&stp->sd_lock);
3956 count = sq->sq_count;
3957 flags = sq->sq_flags;
3958 ASSERT(sq->sq_ciputctrl == NULL || (flags & SQ_CIPUT));
3959
3960 while ((flags & SQ_GOAWAY) || (!(flags & SQ_CIPUT) && count != 0)) {
3961 /*
3962 * Wait until we can enter the inner perimeter.
3963 */
3964 sq->sq_flags = flags | SQ_WANTWAKEUP;
3965 cv_wait(&sq->sq_wait, SQLOCK(sq));
3966 count = sq->sq_count;
3967 flags = sq->sq_flags;
3968 }
3969
3970 if (! (flags & SQ_CIPUT))
3971 sq->sq_flags = flags | SQ_EXCL;
3972 sq->sq_count = count + 1;
3973 ASSERT(sq->sq_count != 0); /* Wraparound */
3974 mutex_exit(SQLOCK(sq));
3975
3976 rval = (*proc)(qp, idp);
3977
3978 mutex_enter(SQLOCK(sq));
3979 flags = sq->sq_flags;
3980 ASSERT(sq->sq_count != 0);
3981 sq->sq_count--;
3982 if (flags & SQ_TAIL) {
3983 putnext_tail(sq, qp, flags);
3984 /*
3985 * The only purpose of this ASSERT is to preserve calling stack
3986 * in DEBUG kernel.
3987 */
3988 ASSERT(flags & SQ_TAIL);
3989 return (rval);
3990 }
3991 ASSERT(flags & (SQ_EXCL|SQ_CIPUT));
3992 /*
3993 * XXXX
3994 * I am not certain the next comment is correct here. I need to consider
3995 * why infonext is called, and whether dropping SQ_EXCL when non-CIPUT
3996 * might cause other problems. It just might be safer to drop it only
3997 * if !SQ_CIPUT because that is when we set it.
3998 */
	/*
	 * Safe to always drop SQ_EXCL:
	 *	Not SQ_CIPUT means we set SQ_EXCL above.
	 *	For SQ_CIPUT, SQ_EXCL will only be set if the put procedure
	 *	did a qwriter(INNER), in which case nobody else
	 *	is in the inner perimeter and we are exiting.
	 *
	 * I would like to make the following assertion:
	 *
	 *	ASSERT((flags & (SQ_EXCL|SQ_CIPUT)) != (SQ_EXCL|SQ_CIPUT) ||
	 *	    sq->sq_count == 0);
	 *
	 * which indicates that if we are both putshared and exclusive,
	 * we became exclusive while executing the putproc, and the only
	 * claim on the syncq was the one we dropped a few lines above.
	 * But other threads that enter putnext while the syncq is exclusive
	 * need to make a claim as they may need to drop SQLOCK in the
	 * has_writers case to avoid deadlocks.  If these threads are
	 * delayed or preempted, it is possible that the writer thread can
	 * find out that there are other claims making the (sq_count == 0)
	 * test invalid.
	 */

	sq->sq_flags = flags & ~SQ_EXCL;
	mutex_exit(SQLOCK(sq));
	return (rval);
}
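
/*
 * Illustrative only: a minimal qi_infop entrypoint for a hypothetical
 * module ("xmod", invented for this sketch), showing the contract that
 * infonext() establishes above.  By the time this runs, infonext() has
 * entered the module's inner perimeter as a put entrypoint, so
 * perimeter-protected state may be inspected, but the routine must not
 * block.  The infod_t fields used here (d_cmd, d_res, d_bytes) are
 * assumed from <sys/stream.h>; the event handling is invented.
 */
#ifdef STREAMS_EXAMPLES
static int
xmod_infop(queue_t *q, infod_t *idp)
{
	/* Answer only the commands this module understands. */
	if (idp->d_cmd & INFOD_BYTES) {
		idp->d_bytes += q->q_count;	/* bytes queued locally */
		idp->d_res |= INFOD_BYTES;	/* mark command handled */
	}
	return (0);
}
#endif	/* STREAMS_EXAMPLES */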

/*
 * Return nonzero if the queue is responsible for struio(), else return 0.
 */
int
isuioq(queue_t *q)
{
	if (q->q_flag & QREADR)
		return (STREAM(q)->sd_struiordq == q);
	else
		return (STREAM(q)->sd_struiowrq == q);
}

#if defined(__sparc)
int disable_putlocks = 0;
#else
int disable_putlocks = 1;
#endif

/*
 * Called by create_putlocks().
 */
static void
create_syncq_putlocks(queue_t *q)
{
	syncq_t		*sq = q->q_syncq;
	ciputctrl_t	*cip;
	int		i;

	ASSERT(sq != NULL);

	ASSERT(disable_putlocks == 0);
	ASSERT(n_ciputctrl >= min_n_ciputctrl);
	ASSERT(ciputctrl_cache != NULL);

	if (!(sq->sq_type & SQ_CIPUT))
		return;

	for (i = 0; i <= 1; i++) {
		if (sq->sq_ciputctrl == NULL) {
			cip = kmem_cache_alloc(ciputctrl_cache, KM_SLEEP);
			SUMCHECK_CIPUTCTRL_COUNTS(cip, n_ciputctrl - 1, 0);
			mutex_enter(SQLOCK(sq));
			if (sq->sq_ciputctrl != NULL) {
				mutex_exit(SQLOCK(sq));
				kmem_cache_free(ciputctrl_cache, cip);
			} else {
				ASSERT(sq->sq_nciputctrl == 0);
				sq->sq_nciputctrl = n_ciputctrl - 1;
				/*
				 * putnext() checks sq_ciputctrl without
				 * holding SQLOCK.  If it is not NULL,
				 * putnext() assumes sq_nciputctrl is
				 * initialized.  The membar below ensures
				 * that.
				 */
				membar_producer();
				sq->sq_ciputctrl = cip;
				mutex_exit(SQLOCK(sq));
			}
		}
		ASSERT(sq->sq_nciputctrl == n_ciputctrl - 1);
		if (i == 1)
			break;
		q = _OTHERQ(q);
		if (!(q->q_flag & QPERQ)) {
			ASSERT(sq == q->q_syncq);
			break;
		}
		ASSERT(q->q_syncq != NULL);
		ASSERT(sq != q->q_syncq);
		sq = q->q_syncq;
		ASSERT(sq->sq_type & SQ_CIPUT);
	}
}

/*
 * If the stream argument is 0, only create per-CPU sq_putlocks/sq_putcounts
 * for the syncq of q.  If the stream argument is nonzero, create per-CPU
 * stream_putlocks for the stream of q and per-CPU sq_putlocks/sq_putcounts
 * for all syncqs starting from q and down to the driver.
 *
 * This should be called after the affected queues are part of the stream
 * geometry.  It should be called from a driver/module open routine after
 * the qprocson() call.  It is also called from the nfs syscall, where it
 * is known that the stream is configured and won't change its geometry
 * during the create_putlocks() call.
 *
 * The caller normally passes 0 for the stream argument to speed up MT
 * putnext() into the perimeter of q, for example because its perimeter is
 * per module (e.g. IP).
 *
 * The caller normally passes a nonzero stream argument to hint to the
 * system that the stream of q is a very contended global system stream
 * (e.g. NFS/UDP) and the part of the stream from q to the driver is
 * particularly MT hot.
 *
 * The caller ensures stream plumbing won't happen while we are here and
 * therefore q_next can be safely used.  (See the illustrative open-routine
 * sketch following this function.)
 */

void
create_putlocks(queue_t *q, int stream)
{
	ciputctrl_t	*cip;
	struct stdata	*stp = STREAM(q);

	q = _WR(q);
	ASSERT(stp != NULL);

	if (disable_putlocks != 0)
		return;

	if (n_ciputctrl < min_n_ciputctrl)
		return;

	ASSERT(ciputctrl_cache != NULL);

	if (stream != 0 && stp->sd_ciputctrl == NULL) {
		cip = kmem_cache_alloc(ciputctrl_cache, KM_SLEEP);
		SUMCHECK_CIPUTCTRL_COUNTS(cip, n_ciputctrl - 1, 0);
		mutex_enter(&stp->sd_lock);
		if (stp->sd_ciputctrl != NULL) {
			mutex_exit(&stp->sd_lock);
			kmem_cache_free(ciputctrl_cache, cip);
		} else {
			ASSERT(stp->sd_nciputctrl == 0);
			stp->sd_nciputctrl = n_ciputctrl - 1;
			/*
			 * putnext() checks sd_ciputctrl without holding
			 * sd_lock.  If it is not NULL, putnext() assumes
			 * sd_nciputctrl is initialized.  The membar below
			 * ensures that.
			 */
			membar_producer();
			stp->sd_ciputctrl = cip;
			mutex_exit(&stp->sd_lock);
		}
	}

	ASSERT(stream == 0 || stp->sd_nciputctrl == n_ciputctrl - 1);

	while (_SAMESTR(q)) {
		create_syncq_putlocks(q);
		if (stream == 0)
			return;
		q = q->q_next;
	}
	ASSERT(q != NULL);
	create_syncq_putlocks(q);
}
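
/*
 * Illustrative only: how a hypothetical driver's open routine ("xmod",
 * invented for this sketch) might request per-CPU putlocks once its
 * queue pair has joined the stream geometry, per the rules above.
 */
#ifdef STREAMS_EXAMPLES
/* ARGSUSED */
static int
xmod_open(queue_t *q, dev_t *devp, int oflag, int sflag, cred_t *crp)
{
	qprocson(q);		/* queues become part of stream geometry */

	/*
	 * A per-module perimeter (like IP's) that takes heavy cross-CPU
	 * putnext() traffic: ask for per-CPU counters on this syncq only
	 * by passing 0 for the stream argument.
	 */
	create_putlocks(q, 0);
	return (0);
}
#endif	/* STREAMS_EXAMPLES */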

/*
 * STREAMS Flow Trace - record STREAMS Flow Trace events as an mblk flows
 * through a stream.
 *
 * The data currently recorded per event are a timestamp, the module/driver
 * name, the downstream module/driver name, an optional call stack, the
 * event type, and a per-type datum.  Much of the STREAMS framework is
 * instrumented for automatic flow tracing (when enabled).  Events can also
 * be defined and used by STREAMS modules and drivers.
 *
 * Global objects:
 *
 *	str_ftevent() - Add a flow-trace event to a dblk.
 *	str_ftfree() - Free flow-trace data.
 *
 * Local objects:
 *
 *	fthdr_cache - pointer to the kmem cache for trace headers.
 *	ftblk_cache - pointer to the kmem cache for trace data blocks.
 */

int str_ftnever = 1;	/* Don't do STREAMS flow tracing */
int str_ftstack = 0;	/* Don't record event call stacks */
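
/*
 * Flow tracing is compiled in but off by default.  One illustrative way
 * to enable it (with optional stack capture) is via /etc/system:
 *
 *	set str_ftnever = 0
 *	set str_ftstack = 1
 *
 * or on a live system with mdb -kw:
 *
 *	> str_ftnever/W 0
 */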

void
str_ftevent(fthdr_t *hp, void *p, ushort_t evnt, ushort_t data)
{
	ftblk_t		*bp;
	ftblk_t		*nbp;
	ftevnt_t	*ep;
	int		ix, nix;

	ASSERT(hp != NULL);
	bp = hp->tail;

	for (;;) {
		if ((ix = bp->ix) == FTBLK_EVNTS) {
			/*
			 * Tail doesn't have room, so need a new tail.
			 *
			 * To make this MT safe, first, allocate a new
			 * ftblk, and initialize it.  To make life a
			 * little easier, reserve the first slot (mostly
			 * by making ix = 1).  When we are finished with
			 * the initialization, CAS this pointer to the
			 * tail.  If this succeeds, this is the new
			 * "next" block.  Otherwise, another thread
			 * got here first, so free the block and start
			 * again.
			 */
			nbp = kmem_cache_alloc(ftblk_cache, KM_NOSLEEP);
			if (nbp == NULL) {
				/* no mem, so punt */
				str_ftnever++;
				/* free up all flow data? */
				return;
			}
			nbp->nxt = NULL;
			nbp->ix = 1;
			/*
			 * Just in case there is another thread about
			 * to get the next index, we need to make sure
			 * the value is there for it.
			 */
			membar_producer();
			if (atomic_cas_ptr(&hp->tail, bp, nbp) == bp) {
				/* CAS was successful */
				bp->nxt = nbp;
				membar_producer();
				bp = nbp;
				ix = 0;
				goto cas_good;
			} else {
				kmem_cache_free(ftblk_cache, nbp);
				bp = hp->tail;
				continue;
			}
		}
		nix = ix + 1;
		if (atomic_cas_32((uint32_t *)&bp->ix, ix, nix) == ix) {
cas_good:
			if (curthread != hp->thread) {
				hp->thread = curthread;
				evnt |= FTEV_CS;
			}
			if (CPU->cpu_seqid != hp->cpu_seqid) {
				hp->cpu_seqid = CPU->cpu_seqid;
				evnt |= FTEV_PS;
			}
			ep = &bp->ev[ix];
			break;
		}
	}

	if (evnt & FTEV_QMASK) {
		queue_t *qp = p;

		if (!(qp->q_flag & QREADR))
			evnt |= FTEV_ISWR;

		ep->mid = Q2NAME(qp);

		/*
		 * We only record the next queue name for FTEV_PUTNEXT since
		 * that's the only time we *really* need it, and the putnext()
		 * code ensures that qp->q_next won't vanish.  (We could use
		 * claimstr()/releasestr() but at a performance cost.)
		 */
		if ((evnt & FTEV_MASK) == FTEV_PUTNEXT && qp->q_next != NULL)
			ep->midnext = Q2NAME(qp->q_next);
		else
			ep->midnext = NULL;
	} else {
		ep->mid = p;
		ep->midnext = NULL;
	}

	if (ep->stk != NULL)
		ep->stk->fs_depth = getpcstack(ep->stk->fs_stk, FTSTK_DEPTH);

	ep->ts = gethrtime();
	ep->evnt = evnt;
	ep->data = data;
	hp->hash = (hp->hash << 9) + hp->hash;
	hp->hash += (evnt << 16) | data;
	hp->hash += (uintptr_t)ep->mid;
}
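
/*
 * The lock-free slot claim used above, reduced to its core (illustrative
 * only, not compiled into this file): writers CAS the block's index
 * forward to reserve ev[ix]; a writer that finds the block full must
 * instead race to install a fresh tail block with a pointer CAS, as
 * str_ftevent() does.
 */
#ifdef STREAMS_EXAMPLES
static ftevnt_t *
ft_claim_slot(ftblk_t *bp)
{
	int ix;

	do {
		if ((ix = bp->ix) == FTBLK_EVNTS)
			return (NULL);	/* full: caller must grow chain */
	} while (atomic_cas_32((uint32_t *)&bp->ix, ix, ix + 1) != ix);
	return (&bp->ev[ix]);
}
#endif	/* STREAMS_EXAMPLES */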

/*
 * Free flow-trace data.
 */
void
str_ftfree(dblk_t *dbp)
{
	fthdr_t	*hp = dbp->db_fthdr;
	ftblk_t	*bp = &hp->first;
	ftblk_t	*nbp;

	if (bp != hp->tail || bp->ix != 0) {
		/*
		 * Clear out the hash, have the tail point to itself, and free
		 * any continuation blocks.
		 */
		bp = hp->first.nxt;
		hp->tail = &hp->first;
		hp->hash = 0;
		hp->first.nxt = NULL;
		hp->first.ix = 0;
		while (bp != NULL) {
			nbp = bp->nxt;
			kmem_cache_free(ftblk_cache, bp);
			bp = nbp;
		}
	}
	kmem_cache_free(fthdr_cache, hp);
	dbp->db_fthdr = NULL;
}
