1 /*
2 * CDDL HEADER START
3 *
4 * The contents of this file are subject to the terms of the
5 * Common Development and Distribution License (the "License").
6 * You may not use this file except in compliance with the License.
7 *
8 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9 * or http://www.opensolaris.org/os/licensing.
10 * See the License for the specific language governing permissions
11 * and limitations under the License.
12 *
13 * When distributing Covered Code, include this CDDL HEADER in each
14 * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15 * If applicable, add the following below this CDDL HEADER, with the
16 * fields enclosed by brackets "[]" replaced with your own identifying
17 * information: Portions Copyright [yyyy] [name of copyright owner]
18 *
19 * CDDL HEADER END
20 */
21 /* Copyright (c) 1984, 1986, 1987, 1988, 1989 AT&T */
22 /* All Rights Reserved */
23
24 /*
25 * Copyright 2009 Sun Microsystems, Inc. All rights reserved.
26 * Use is subject to license terms.
27 *
28 * Copyright 2021 Tintri by DDN, Inc. All rights reserved.
29 * Copyright 2022 Garrett D'Amore
30 */
31
32 #include <sys/types.h>
33 #include <sys/param.h>
34 #include <sys/thread.h>
35 #include <sys/sysmacros.h>
36 #include <sys/stropts.h>
37 #include <sys/stream.h>
38 #include <sys/strsubr.h>
39 #include <sys/strsun.h>
40 #include <sys/conf.h>
41 #include <sys/debug.h>
42 #include <sys/cmn_err.h>
43 #include <sys/kmem.h>
44 #include <sys/atomic.h>
45 #include <sys/errno.h>
46 #include <sys/vtrace.h>
47 #include <sys/ftrace.h>
48 #include <sys/ontrap.h>
49 #include <sys/sdt.h>
50 #include <sys/strft.h>
51
52 #ifdef DEBUG
53 #include <sys/kmem_impl.h>
54 #endif
55
56 /*
57 * This file contains all the STREAMS utility routines that may
58 * be used by modules and drivers.
59 */
60
61 /*
62 * STREAMS message allocator: principles of operation
63 *
64 * The streams message allocator consists of all the routines that
65 * allocate, dup and free streams messages: allocb(), [d]esballoc[a],
66 * dupb(), freeb() and freemsg(). What follows is a high-level view
67 * of how the allocator works.
68 *
69 * Every streams message consists of one or more mblks, a dblk, and data.
70 * All mblks for all types of messages come from a common mblk_cache.
71 * The dblk and data come in several flavors, depending on how the
72 * message is allocated:
73 *
74 * (1) mblks up to DBLK_MAX_CACHE size are allocated from a collection of
75 * fixed-size dblk/data caches. For message sizes that are multiples of
76 * PAGESIZE, dblks are allocated separately from the buffer.
77 * The associated buffer is allocated by the constructor using kmem_alloc().
78 * For all other message sizes, dblk and its associated data is allocated
79 * as a single contiguous chunk of memory.
80 * Objects in these caches consist of a dblk plus its associated data.
81 * allocb() determines the nearest-size cache by table lookup:
82 * the dblk_cache[] array provides the mapping from size to dblk cache.
83 *
84 * (2) Large messages (size > DBLK_MAX_CACHE) are constructed by
85 * kmem_alloc()'ing a buffer for the data and supplying that
86 * buffer to gesballoc(), described below.
87 *
88 * (3) The four flavors of [d]esballoc[a] are all implemented by a
89 * common routine, gesballoc() ("generic esballoc"). gesballoc()
90 * allocates a dblk from the global dblk_esb_cache and sets db_base,
91 * db_lim and db_frtnp to describe the caller-supplied buffer.
92 *
93 * While there are several routines to allocate messages, there is only
94 * one routine to free messages: freeb(). freeb() simply invokes the
95 * dblk's free method, dbp->db_free(), which is set at allocation time.
96 *
97 * dupb() creates a new reference to a message by allocating a new mblk,
98 * incrementing the dblk reference count and setting the dblk's free
99 * method to dblk_decref(). The dblk's original free method is retained
100 * in db_lastfree. dblk_decref() decrements the reference count on each
101 * freeb(). If this is not the last reference it just frees the mblk;
102 * if this *is* the last reference, it restores db_free to db_lastfree,
103 * sets db_mblk to the current mblk (see below), and invokes db_lastfree.
104 *
105 * The implementation makes aggressive use of kmem object caching for
106 * maximum performance. This makes the code simple and compact, but
107 * also a bit abstruse in some places. The invariants that constitute a
108 * message's constructed state, described below, are more subtle than usual.
109 *
110 * Every dblk has an "attached mblk" as part of its constructed state.
111 * The mblk is allocated by the dblk's constructor and remains attached
112 * until the message is either dup'ed or pulled up. In the dupb() case
113 * the mblk association doesn't matter until the last free, at which time
114 * dblk_decref() attaches the last mblk to the dblk. pullupmsg() affects
115 * the mblk association because it swaps the leading mblks of two messages,
116 * so it is responsible for swapping their db_mblk pointers accordingly.
117 * From a constructed-state viewpoint it doesn't matter that a dblk's
118 * attached mblk can change while the message is allocated; all that
119 * matters is that the dblk has *some* attached mblk when it's freed.
120 *
121 * The sizes of the allocb() small-message caches are not magical.
122 * They represent a good trade-off between internal and external
123 * fragmentation for current workloads. They should be reevaluated
124 * periodically, especially if allocations larger than DBLK_MAX_CACHE
125 * become common. We use 64-byte alignment so that dblks don't
126 * straddle cache lines unnecessarily.
127 */
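/*
 * Illustrative sketch (not part of the build): how the table lookup in (1)
 * above plays out, assuming the _LP64 dblk_sizes[] table defined below.
 *
 *	allocb(1024, BPRI_MED):
 *		index = (1024 - 1) >> DBLK_SIZE_SHIFT = 127
 *		streams_msg_init() populated dblk_cache[127] with the
 *		"streams_dblk_1040" cache, so the request is satisfied from
 *		the nearest-size (1040-byte) dblk/data cache.
 *
 *	allocb(128 * 1024, BPRI_MED):
 *		size > DBLK_MAX_CACHE, so allocb_oversize() kmem_alloc()s
 *		the buffer and hands it to gesballoc() as described in (2).
 */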
128 #define DBLK_MAX_CACHE 73728
129 #define DBLK_CACHE_ALIGN 64
130 #define DBLK_MIN_SIZE 8
131 #define DBLK_SIZE_SHIFT 3
132
133 #ifdef _BIG_ENDIAN
134 #define DBLK_RTFU_SHIFT(field) \
135 (8 * (&((dblk_t *)0)->db_struioflag - &((dblk_t *)0)->field))
136 #else
137 #define DBLK_RTFU_SHIFT(field) \
138 (8 * (&((dblk_t *)0)->field - &((dblk_t *)0)->db_ref))
139 #endif
140
141 #define DBLK_RTFU(ref, type, flags, uioflag) \
142 (((ref) << DBLK_RTFU_SHIFT(db_ref)) | \
143 ((type) << DBLK_RTFU_SHIFT(db_type)) | \
144 (((flags) | (ref - 1)) << DBLK_RTFU_SHIFT(db_flags)) | \
145 ((uioflag) << DBLK_RTFU_SHIFT(db_struioflag)))
146 #define DBLK_RTFU_REF_MASK (DBLK_REFMAX << DBLK_RTFU_SHIFT(db_ref))
147 #define DBLK_RTFU_WORD(dbp) (*((uint32_t *)&(dbp)->db_ref))
148 #define MBLK_BAND_FLAG_WORD(mp) (*((uint32_t *)&(mp)->b_band))
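/*
 * Example (an illustrative reading of the macros above): DBLK_RTFU packs the
 * adjacent db_ref, db_type, db_flags and db_struioflag bytes into one 32-bit
 * word so a dblk can be (re)initialized with a single store, e.g. in allocb():
 *
 *	DBLK_RTFU_WORD(dbp) = DBLK_RTFU(1, M_DATA, 0, 0);
 *		db_ref = 1, db_type = M_DATA, db_flags = 0, db_struioflag = 0
 *
 * Note that (ref - 1) is OR'ed into the flags byte, so DBLK_RTFU(2, ...) (used
 * by the esballoca/desballoca flavors) also sets the DBLK_REFMIN flag; see the
 * gesballoc() comment below.
 */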
149
150 static size_t dblk_sizes[] = {
151 #ifdef _LP64
152 16, 80, 144, 208, 272, 336, 528, 1040, 1488, 1936, 2576, 3856,
153 8192, 12048, 16384, 20240, 24576, 28432, 32768, 36624,
154 40960, 44816, 49152, 53008, 57344, 61200, 65536, 69392,
155 #else
156 64, 128, 320, 576, 1088, 1536, 1984, 2624, 3904,
157 8192, 12096, 16384, 20288, 24576, 28480, 32768, 36672,
158 40960, 44864, 49152, 53056, 57344, 61248, 65536, 69440,
159 #endif
160 DBLK_MAX_CACHE, 0
161 };
162
163 static struct kmem_cache *dblk_cache[DBLK_MAX_CACHE / DBLK_MIN_SIZE];
164 static struct kmem_cache *mblk_cache;
165 static struct kmem_cache *dblk_esb_cache;
166 static struct kmem_cache *fthdr_cache;
167 static struct kmem_cache *ftblk_cache;
168
169 static void dblk_lastfree(mblk_t *mp, dblk_t *dbp);
170 static mblk_t *allocb_oversize(size_t size, int flags);
171 static int allocb_tryhard_fails;
172 static void frnop_func(void *arg);
173 frtn_t frnop = { frnop_func };
174 static void bcache_dblk_lastfree(mblk_t *mp, dblk_t *dbp);
175
176 static boolean_t rwnext_enter(queue_t *qp);
177 static void rwnext_exit(queue_t *qp);
178
179 /*
180 * Patchable mblk/dblk kmem_cache flags.
181 */
182 int dblk_kmem_flags = 0;
183 int mblk_kmem_flags = 0;
184
185 static int
186 dblk_constructor(void *buf, void *cdrarg, int kmflags)
187 {
188 dblk_t *dbp = buf;
189 ssize_t msg_size = (ssize_t)cdrarg;
190 size_t index;
191
192 ASSERT(msg_size != 0);
193
194 index = (msg_size - 1) >> DBLK_SIZE_SHIFT;
195
196 ASSERT(index < (DBLK_MAX_CACHE >> DBLK_SIZE_SHIFT));
197
198 if ((dbp->db_mblk = kmem_cache_alloc(mblk_cache, kmflags)) == NULL)
199 return (-1);
200 if ((msg_size & PAGEOFFSET) == 0) {
201 dbp->db_base = kmem_alloc(msg_size, kmflags);
202 if (dbp->db_base == NULL) {
203 kmem_cache_free(mblk_cache, dbp->db_mblk);
204 return (-1);
205 }
206 } else {
207 dbp->db_base = (unsigned char *)&dbp[1];
208 }
209
210 dbp->db_mblk->b_datap = dbp;
211 dbp->db_cache = dblk_cache[index];
212 dbp->db_lim = dbp->db_base + msg_size;
213 dbp->db_free = dbp->db_lastfree = dblk_lastfree;
214 dbp->db_frtnp = NULL;
215 dbp->db_fthdr = NULL;
216 dbp->db_credp = NULL;
217 dbp->db_cpid = -1;
218 dbp->db_struioflag = 0;
219 dbp->db_struioun.cksum.flags = 0;
220 return (0);
221 }
222
223 /*ARGSUSED*/
224 static int
225 dblk_esb_constructor(void *buf, void *cdrarg, int kmflags)
226 {
227 dblk_t *dbp = buf;
228
229 if ((dbp->db_mblk = kmem_cache_alloc(mblk_cache, kmflags)) == NULL)
230 return (-1);
231 dbp->db_mblk->b_datap = dbp;
232 dbp->db_cache = dblk_esb_cache;
233 dbp->db_fthdr = NULL;
234 dbp->db_credp = NULL;
235 dbp->db_cpid = -1;
236 dbp->db_struioflag = 0;
237 dbp->db_struioun.cksum.flags = 0;
238 return (0);
239 }
240
241 static int
242 bcache_dblk_constructor(void *buf, void *cdrarg, int kmflags)
243 {
244 dblk_t *dbp = buf;
245 bcache_t *bcp = cdrarg;
246
247 if ((dbp->db_mblk = kmem_cache_alloc(mblk_cache, kmflags)) == NULL)
248 return (-1);
249
250 dbp->db_base = kmem_cache_alloc(bcp->buffer_cache, kmflags);
251 if (dbp->db_base == NULL) {
252 kmem_cache_free(mblk_cache, dbp->db_mblk);
253 return (-1);
254 }
255
256 dbp->db_mblk->b_datap = dbp;
257 dbp->db_cache = (void *)bcp;
258 dbp->db_lim = dbp->db_base + bcp->size;
259 dbp->db_free = dbp->db_lastfree = bcache_dblk_lastfree;
260 dbp->db_frtnp = NULL;
261 dbp->db_fthdr = NULL;
262 dbp->db_credp = NULL;
263 dbp->db_cpid = -1;
264 dbp->db_struioflag = 0;
265 dbp->db_struioun.cksum.flags = 0;
266 return (0);
267 }
268
269 /*ARGSUSED*/
270 static void
271 dblk_destructor(void *buf, void *cdrarg)
272 {
273 dblk_t *dbp = buf;
274 ssize_t msg_size = (ssize_t)cdrarg;
275
276 ASSERT(dbp->db_mblk->b_datap == dbp);
277 ASSERT(msg_size != 0);
278 ASSERT(dbp->db_struioflag == 0);
279 ASSERT(dbp->db_struioun.cksum.flags == 0);
280
281 if ((msg_size & PAGEOFFSET) == 0) {
282 kmem_free(dbp->db_base, msg_size);
283 }
284
285 kmem_cache_free(mblk_cache, dbp->db_mblk);
286 }
287
288 static void
289 bcache_dblk_destructor(void *buf, void *cdrarg)
290 {
291 dblk_t *dbp = buf;
292 bcache_t *bcp = cdrarg;
293
294 kmem_cache_free(bcp->buffer_cache, dbp->db_base);
295
296 ASSERT(dbp->db_mblk->b_datap == dbp);
297 ASSERT(dbp->db_struioflag == 0);
298 ASSERT(dbp->db_struioun.cksum.flags == 0);
299
300 kmem_cache_free(mblk_cache, dbp->db_mblk);
301 }
302
303 /* ARGSUSED */
304 static int
305 ftblk_constructor(void *buf, void *cdrarg, int kmflags)
306 {
307 ftblk_t *fbp = buf;
308 int i;
309
310 bzero(fbp, sizeof (ftblk_t));
311 if (str_ftstack != 0) {
312 for (i = 0; i < FTBLK_EVNTS; i++)
313 fbp->ev[i].stk = kmem_alloc(sizeof (ftstk_t), kmflags);
314 }
315
316 return (0);
317 }
318
319 /* ARGSUSED */
320 static void
321 ftblk_destructor(void *buf, void *cdrarg)
322 {
323 ftblk_t *fbp = buf;
324 int i;
325
326 if (str_ftstack != 0) {
327 for (i = 0; i < FTBLK_EVNTS; i++) {
328 if (fbp->ev[i].stk != NULL) {
329 kmem_free(fbp->ev[i].stk, sizeof (ftstk_t));
330 fbp->ev[i].stk = NULL;
331 }
332 }
333 }
334 }
335
336 static int
337 fthdr_constructor(void *buf, void *cdrarg, int kmflags)
338 {
339 fthdr_t *fhp = buf;
340
341 return (ftblk_constructor(&fhp->first, cdrarg, kmflags));
342 }
343
344 static void
345 fthdr_destructor(void *buf, void *cdrarg)
346 {
347 fthdr_t *fhp = buf;
348
349 ftblk_destructor(&fhp->first, cdrarg);
350 }
351
352 void
353 streams_msg_init(void)
354 {
355 char name[40];
356 size_t size;
357 size_t lastsize = DBLK_MIN_SIZE;
358 size_t *sizep;
359 struct kmem_cache *cp;
360 size_t tot_size;
361 int offset;
362
363 mblk_cache = kmem_cache_create("streams_mblk", sizeof (mblk_t), 32,
364 NULL, NULL, NULL, NULL, NULL, mblk_kmem_flags);
365
366 for (sizep = dblk_sizes; (size = *sizep) != 0; sizep++) {
367
368 if ((offset = (size & PAGEOFFSET)) != 0) {
369 /*
370 * We are in the middle of a page, so the dblk should
371 * be allocated on the same page.
372 */
373 tot_size = size + sizeof (dblk_t);
374 ASSERT((offset + sizeof (dblk_t) + sizeof (kmem_slab_t))
375 < PAGESIZE);
376 ASSERT((tot_size & (DBLK_CACHE_ALIGN - 1)) == 0);
377
378 } else {
379
380 /*
381 * buf size is multiple of page size, dblk and
382 * buffer are allocated separately.
383 */
384
385 ASSERT((size & (DBLK_CACHE_ALIGN - 1)) == 0);
386 tot_size = sizeof (dblk_t);
387 }
388
389 (void) sprintf(name, "streams_dblk_%ld", size);
390 cp = kmem_cache_create(name, tot_size, DBLK_CACHE_ALIGN,
391 dblk_constructor, dblk_destructor, NULL, (void *)(size),
392 NULL, dblk_kmem_flags);
393
394 while (lastsize <= size) {
395 dblk_cache[(lastsize - 1) >> DBLK_SIZE_SHIFT] = cp;
396 lastsize += DBLK_MIN_SIZE;
397 }
398 }
399
400 dblk_esb_cache = kmem_cache_create("streams_dblk_esb", sizeof (dblk_t),
401 DBLK_CACHE_ALIGN, dblk_esb_constructor, dblk_destructor, NULL,
402 (void *)sizeof (dblk_t), NULL, dblk_kmem_flags);
403 fthdr_cache = kmem_cache_create("streams_fthdr", sizeof (fthdr_t), 32,
404 fthdr_constructor, fthdr_destructor, NULL, NULL, NULL, 0);
405 ftblk_cache = kmem_cache_create("streams_ftblk", sizeof (ftblk_t), 32,
406 ftblk_constructor, ftblk_destructor, NULL, NULL, NULL, 0);
407
408 /* initialize throttling queue for esballoc */
409 esballoc_queue_init();
410 }
411
412 /*ARGSUSED*/
413 mblk_t *
414 allocb(size_t size, uint_t pri)
415 {
416 dblk_t *dbp;
417 mblk_t *mp;
418 size_t index;
419
420 index = (size - 1) >> DBLK_SIZE_SHIFT;
421
422 if (index >= (DBLK_MAX_CACHE >> DBLK_SIZE_SHIFT)) {
423 if (size != 0) {
424 mp = allocb_oversize(size, KM_NOSLEEP);
425 goto out;
426 }
427 index = 0;
428 }
429
430 if ((dbp = kmem_cache_alloc(dblk_cache[index], KM_NOSLEEP)) == NULL) {
431 mp = NULL;
432 goto out;
433 }
434
435 mp = dbp->db_mblk;
436 DBLK_RTFU_WORD(dbp) = DBLK_RTFU(1, M_DATA, 0, 0);
437 mp->b_next = mp->b_prev = mp->b_cont = NULL;
438 mp->b_rptr = mp->b_wptr = dbp->db_base;
439 mp->b_queue = NULL;
440 MBLK_BAND_FLAG_WORD(mp) = 0;
441 STR_FTALLOC(&dbp->db_fthdr, FTEV_ALLOCB, size);
442 out:
443 FTRACE_1("allocb(): mp=0x%p", (uintptr_t)mp);
444
445 return (mp);
446 }
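/*
 * Illustrative driver-side use of allocb() (a sketch, not part of this file;
 * "wq", "mydata" and "len" are hypothetical):
 *
 *	mblk_t *mp;
 *
 *	if ((mp = allocb(len, BPRI_MED)) == NULL)
 *		return (ENOMEM);	(or schedule a bufcall(); see below)
 *	bcopy(mydata, mp->b_wptr, len);
 *	mp->b_wptr += len;
 *	putnext(wq, mp);
 *
 * The 'pri' argument is accepted for compatibility but is not used by this
 * implementation.
 */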
447
448 /*
449 * Allocate an mblk taking db_credp and db_cpid from the template.
450 * Allow the cred to be NULL.
451 */
452 mblk_t *
453 allocb_tmpl(size_t size, const mblk_t *tmpl)
454 {
455 mblk_t *mp = allocb(size, 0);
456
457 if (mp != NULL) {
458 dblk_t *src = tmpl->b_datap;
459 dblk_t *dst = mp->b_datap;
460 cred_t *cr;
461 pid_t cpid;
462
463 cr = msg_getcred(tmpl, &cpid);
464 if (cr != NULL)
465 crhold(dst->db_credp = cr);
466 dst->db_cpid = cpid;
467 dst->db_type = src->db_type;
468 }
469 return (mp);
470 }
471
472 mblk_t *
473 allocb_cred(size_t size, cred_t *cr, pid_t cpid)
474 {
475 mblk_t *mp = allocb(size, 0);
476
477 ASSERT(cr != NULL);
478 if (mp != NULL) {
479 dblk_t *dbp = mp->b_datap;
480
481 crhold(dbp->db_credp = cr);
482 dbp->db_cpid = cpid;
483 }
484 return (mp);
485 }
486
487 mblk_t *
488 allocb_cred_wait(size_t size, uint_t flags, int *error, cred_t *cr, pid_t cpid)
489 {
490 mblk_t *mp = allocb_wait(size, 0, flags, error);
491
492 ASSERT(cr != NULL);
493 if (mp != NULL) {
494 dblk_t *dbp = mp->b_datap;
495
496 crhold(dbp->db_credp = cr);
497 dbp->db_cpid = cpid;
498 }
499
500 return (mp);
501 }
502
503 /*
504 * Extract the db_cred (and optionally db_cpid) from a message.
505 * We find the first mblk which has a non-NULL db_cred and use that.
506 * If none found we return NULL.
507 * Does NOT get a hold on the cred.
508 */
509 cred_t *
510 msg_getcred(const mblk_t *mp, pid_t *cpidp)
511 {
512 cred_t *cr = NULL;
513 cred_t *cr2;
514 mblk_t *mp2;
515
516 while (mp != NULL) {
517 dblk_t *dbp = mp->b_datap;
518
519 cr = dbp->db_credp;
520 if (cr == NULL) {
521 mp = mp->b_cont;
522 continue;
523 }
524 if (cpidp != NULL)
525 *cpidp = dbp->db_cpid;
526
527 #ifdef DEBUG
528 /*
529 * Normally there should be at most one db_credp in a message.
530 * But if there are multiple (as in the case of some M_IOC*
531 * and some internal messages in TCP/IP bind logic) then
532 * they must be identical in the normal case.
533 * However, a socket can be shared between different uids
534 * in which case data queued in TCP would be from different
535 * creds. Thus we can only assert for the zoneid being the
536 * same. Due to Multilevel Ports for TX, some
537 * cred_t can have a NULL cr_zone, and we skip the comparison
538 * in that case.
539 */
540 mp2 = mp->b_cont;
541 while (mp2 != NULL) {
542 cr2 = DB_CRED(mp2);
543 if (cr2 != NULL) {
544 DTRACE_PROBE2(msg__getcred,
545 cred_t *, cr, cred_t *, cr2);
546 ASSERT(crgetzoneid(cr) == crgetzoneid(cr2) ||
547 crgetzone(cr) == NULL ||
548 crgetzone(cr2) == NULL);
549 }
550 mp2 = mp2->b_cont;
551 }
552 #endif
553 return (cr);
554 }
555 if (cpidp != NULL)
556 *cpidp = NOPID;
557 return (NULL);
558 }
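/*
 * Illustrative use (a sketch; "mp" is a hypothetical received message):
 *
 *	pid_t cpid;
 *	zoneid_t zid;
 *	cred_t *cr = msg_getcred(mp, &cpid);
 *
 *	if (cr != NULL)
 *		zid = crgetzoneid(cr);
 *
 * No hold is taken on the returned cred, so it is only valid for as long as
 * the caller holds the message; use crhold(), or msg_extractcred() below, if
 * the cred must outlive the message.
 */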
559
560 /*
561 * Variant of msg_getcred which, when a cred is found
562 * 1. Returns with a hold on the cred
563 * 2. Clears the first cred in the mblk.
564 * This is more efficient to use than a msg_getcred() + crhold() when
565 * the message is freed after the cred has been extracted.
566 *
567 * The caller is responsible for ensuring that there is no other reference
568 * on the message since db_credp can not be cleared when there are other
569 * references.
570 */
571 cred_t *
572 msg_extractcred(mblk_t *mp, pid_t *cpidp)
573 {
574 cred_t *cr = NULL;
575 cred_t *cr2;
576 mblk_t *mp2;
577
578 while (mp != NULL) {
579 dblk_t *dbp = mp->b_datap;
580
581 cr = dbp->db_credp;
582 if (cr == NULL) {
583 mp = mp->b_cont;
584 continue;
585 }
586 ASSERT(dbp->db_ref == 1);
587 dbp->db_credp = NULL;
588 if (cpidp != NULL)
589 *cpidp = dbp->db_cpid;
590 #ifdef DEBUG
591 /*
592 * Normally there should be at most one db_credp in a message.
593 * But if there are multiple (as in the case of some M_IOC*
594 * and some internal messages in TCP/IP bind logic) then
595 * they must be identical in the normal case.
596 * However, a socket can be shared between different uids
597 * in which case data queued in TCP would be from different
598 * creds. Thus we can only assert for the zoneid being the
599 * same. Due to Multilevel Ports for TX, some
600 * cred_t can have a NULL cr_zone, and we skip the comparison
601 * in that case.
602 */
603 mp2 = mp->b_cont;
604 while (mp2 != NULL) {
605 cr2 = DB_CRED(mp2);
606 if (cr2 != NULL) {
607 DTRACE_PROBE2(msg__extractcred,
608 cred_t *, cr, cred_t *, cr2);
609 ASSERT(crgetzoneid(cr) == crgetzoneid(cr2) ||
610 crgetzone(cr) == NULL ||
611 crgetzone(cr2) == NULL);
612 }
613 mp2 = mp2->b_cont;
614 }
615 #endif
616 return (cr);
617 }
618 return (NULL);
619 }
620 /*
621 * Get the label for a message. Uses the first mblk in the message
622 * which has a non-NULL db_credp.
623 * Returns NULL if there is no credp.
624 */
625 extern struct ts_label_s *
626 msg_getlabel(const mblk_t *mp)
627 {
628 cred_t *cr = msg_getcred(mp, NULL);
629
630 if (cr == NULL)
631 return (NULL);
632
633 return (crgetlabel(cr));
634 }
635
636 void
637 freeb(mblk_t *mp)
638 {
639 dblk_t *dbp = mp->b_datap;
640
641 ASSERT(dbp->db_ref > 0);
642 ASSERT(mp->b_next == NULL && mp->b_prev == NULL);
643 FTRACE_1("freeb(): mp=0x%lx", (uintptr_t)mp);
644
645 STR_FTEVENT_MBLK(mp, caller(), FTEV_FREEB, dbp->db_ref);
646
647 dbp->db_free(mp, dbp);
648 }
649
650 void
651 freemsg(mblk_t *mp)
652 {
653 FTRACE_1("freemsg(): mp=0x%lx", (uintptr_t)mp);
654 while (mp) {
655 dblk_t *dbp = mp->b_datap;
656 mblk_t *mp_cont = mp->b_cont;
657
658 ASSERT(dbp->db_ref > 0);
659 ASSERT(mp->b_next == NULL && mp->b_prev == NULL);
660
661 STR_FTEVENT_MBLK(mp, caller(), FTEV_FREEB, dbp->db_ref);
662
663 dbp->db_free(mp, dbp);
664 mp = mp_cont;
665 }
666 }
667
668 /*
669 * Reallocate a block for another use. Try hard to use the old block.
670 * If the old data is wanted (copy), leave b_wptr at the end of the data,
671 * otherwise return b_wptr = b_rptr.
672 *
673 * This routine is private and unstable.
674 */
675 mblk_t *
676 reallocb(mblk_t *mp, size_t size, uint_t copy)
677 {
678 mblk_t *mp1;
679 unsigned char *old_rptr;
680 ptrdiff_t cur_size;
681
682 if (mp == NULL)
683 return (allocb(size, BPRI_HI));
684
685 cur_size = mp->b_wptr - mp->b_rptr;
686 old_rptr = mp->b_rptr;
687
688 ASSERT(mp->b_datap->db_ref != 0);
689
690 if (mp->b_datap->db_ref == 1 && MBLKSIZE(mp) >= size) {
691 /*
692 * If the data is wanted and it will fit where it is, no
693 * work is required.
694 */
695 if (copy && mp->b_datap->db_lim - mp->b_rptr >= size)
696 return (mp);
697
698 mp->b_wptr = mp->b_rptr = mp->b_datap->db_base;
699 mp1 = mp;
700 } else if ((mp1 = allocb_tmpl(size, mp)) != NULL) {
701 /* XXX other mp state could be copied too, db_flags ... ? */
702 mp1->b_cont = mp->b_cont;
703 } else {
704 return (NULL);
705 }
706
707 if (copy) {
708 bcopy(old_rptr, mp1->b_rptr, cur_size);
709 mp1->b_wptr = mp1->b_rptr + cur_size;
710 }
711
712 if (mp != mp1)
713 freeb(mp);
714
715 return (mp1);
716 }
717
718 static void
719 dblk_lastfree(mblk_t *mp, dblk_t *dbp)
720 {
721 ASSERT(dbp->db_mblk == mp);
722 if (dbp->db_fthdr != NULL)
723 str_ftfree(dbp);
724
725 /* set credp and projid to be 'unspecified' before returning to cache */
726 if (dbp->db_credp != NULL) {
727 crfree(dbp->db_credp);
728 dbp->db_credp = NULL;
729 }
730 dbp->db_cpid = -1;
731
732 /* Reset the struioflag and the checksum flag fields */
733 dbp->db_struioflag = 0;
734 dbp->db_struioun.cksum.flags = 0;
735
736 /* and the COOKED and/or UIOA flag(s) */
737 dbp->db_flags &= ~(DBLK_COOKED | DBLK_UIOA);
738
739 kmem_cache_free(dbp->db_cache, dbp);
740 }
741
742 static void
743 dblk_decref(mblk_t *mp, dblk_t *dbp)
744 {
745 if (dbp->db_ref != 1) {
746 uint32_t rtfu = atomic_add_32_nv(&DBLK_RTFU_WORD(dbp),
747 -(1 << DBLK_RTFU_SHIFT(db_ref)));
748 /*
749 * atomic_add_32_nv() just decremented db_ref, so we no longer
750 * have a reference to the dblk, which means another thread
751 * could free it. Therefore we cannot examine the dblk to
752 * determine whether ours was the last reference. Instead,
753 * we extract the new and minimum reference counts from rtfu.
754 * Note that all we're really saying is "if (ref != refmin)".
755 */
756 if (((rtfu >> DBLK_RTFU_SHIFT(db_ref)) & DBLK_REFMAX) !=
757 ((rtfu >> DBLK_RTFU_SHIFT(db_flags)) & DBLK_REFMIN)) {
758 kmem_cache_free(mblk_cache, mp);
759 return;
760 }
761 }
762 dbp->db_mblk = mp;
763 dbp->db_free = dbp->db_lastfree;
764 dbp->db_lastfree(mp, dbp);
765 }
766
767 mblk_t *
768 dupb(mblk_t *mp)
769 {
770 dblk_t *dbp = mp->b_datap;
771 mblk_t *new_mp;
772 uint32_t oldrtfu, newrtfu;
773
774 if ((new_mp = kmem_cache_alloc(mblk_cache, KM_NOSLEEP)) == NULL)
775 goto out;
776
777 new_mp->b_next = new_mp->b_prev = new_mp->b_cont = NULL;
778 new_mp->b_rptr = mp->b_rptr;
779 new_mp->b_wptr = mp->b_wptr;
780 new_mp->b_datap = dbp;
781 new_mp->b_queue = NULL;
782 MBLK_BAND_FLAG_WORD(new_mp) = MBLK_BAND_FLAG_WORD(mp);
783
784 STR_FTEVENT_MBLK(mp, caller(), FTEV_DUPB, dbp->db_ref);
785
786 dbp->db_free = dblk_decref;
787 do {
788 ASSERT(dbp->db_ref > 0);
789 oldrtfu = DBLK_RTFU_WORD(dbp);
790 newrtfu = oldrtfu + (1 << DBLK_RTFU_SHIFT(db_ref));
791 /*
792 * If db_ref is maxed out we can't dup this message anymore.
793 */
794 if ((oldrtfu & DBLK_RTFU_REF_MASK) == DBLK_RTFU_REF_MASK) {
795 kmem_cache_free(mblk_cache, new_mp);
796 new_mp = NULL;
797 goto out;
798 }
799 } while (atomic_cas_32(&DBLK_RTFU_WORD(dbp), oldrtfu, newrtfu) !=
800 oldrtfu);
801
802 out:
803 FTRACE_1("dupb(): new_mp=0x%lx", (uintptr_t)new_mp);
804 return (new_mp);
805 }
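/*
 * Illustrative note (a sketch): dupb() returns a second mblk that shares the
 * original dblk and data, so it is effectively zero-copy, but the data should
 * be treated as read-only once db_ref > 1 (see the db_ref checks in
 * reallocb() and elsewhere):
 *
 *	mblk_t *clone = dupb(mp);	second reference, same data block
 *	...
 *	freeb(clone);			dblk survives; mp is still valid
 *	freemsg(mp);			last reference; db_lastfree runs
 *
 * A caller that needs a private, writable copy should use copyb()/copymsg().
 */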
806
807 static void
808 dblk_lastfree_desb(mblk_t *mp, dblk_t *dbp)
809 {
810 frtn_t *frp = dbp->db_frtnp;
811
812 ASSERT(dbp->db_mblk == mp);
813 frp->free_func(frp->free_arg);
814 if (dbp->db_fthdr != NULL)
815 str_ftfree(dbp);
816
817 /* set credp and projid to be 'unspecified' before returning to cache */
818 if (dbp->db_credp != NULL) {
819 crfree(dbp->db_credp);
820 dbp->db_credp = NULL;
821 }
822 dbp->db_cpid = -1;
823 dbp->db_struioflag = 0;
824 dbp->db_struioun.cksum.flags = 0;
825
826 kmem_cache_free(dbp->db_cache, dbp);
827 }
828
829 /*ARGSUSED*/
830 static void
831 frnop_func(void *arg)
832 {
833 }
834
835 /*
836 * Generic esballoc used to implement the four flavors: [d]esballoc[a].
837 *
838 * The variants with a 'd' prefix (desballoc, desballoca)
839 * directly free the mblk when it loses its last ref,
840 * whereas the other variants free asynchronously.
841 * The variants with an 'a' suffix (esballoca, desballoca)
842 * add an extra ref, effectively letting the streams subsystem
843 * know that the message data should not be modified.
844 * (e.g., see the db_ref checks in reallocb and elsewhere)
845 *
846 * The method used by the 'a' suffix functions to keep the dblk
847 * db_ref > 1 is non-obvious. The macro DBLK_RTFU(2,...) passed to
848 * gesballoc sets the initial db_ref = 2 and sets the DBLK_REFMIN
849 * bit in db_flags. In dblk_decref() that flag essentially means
850 * the dblk has one extra ref, so the "last ref" is one, not zero.
851 */
852 static mblk_t *
853 gesballoc(unsigned char *base, size_t size, uint32_t db_rtfu, frtn_t *frp,
854 void (*lastfree)(mblk_t *, dblk_t *), int kmflags)
855 {
856 dblk_t *dbp;
857 mblk_t *mp;
858
859 ASSERT(base != NULL && frp != NULL);
860
861 if ((dbp = kmem_cache_alloc(dblk_esb_cache, kmflags)) == NULL) {
862 mp = NULL;
863 goto out;
864 }
865
866 mp = dbp->db_mblk;
867 dbp->db_base = base;
868 dbp->db_lim = base + size;
869 dbp->db_free = dbp->db_lastfree = lastfree;
870 dbp->db_frtnp = frp;
871 DBLK_RTFU_WORD(dbp) = db_rtfu;
872 mp->b_next = mp->b_prev = mp->b_cont = NULL;
873 mp->b_rptr = mp->b_wptr = base;
874 mp->b_queue = NULL;
875 MBLK_BAND_FLAG_WORD(mp) = 0;
876
877 out:
878 FTRACE_1("gesballoc(): mp=0x%lx", (uintptr_t)mp);
879 return (mp);
880 }
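/*
 * Illustrative desballoc()/esballoc() usage (a sketch; the xx_* names and
 * structure layout are hypothetical):
 *
 *	static void
 *	xx_buf_free(void *arg)
 *	{
 *		xx_buf_t *bp = arg;
 *
 *		... recycle or free the loaned (e.g. DMA) buffer ...
 *	}
 *
 *	bp->xx_frtn.free_func = xx_buf_free;
 *	bp->xx_frtn.free_arg = (caddr_t)bp;
 *	mp = desballoc(bp->xx_kaddr, bp->xx_len, BPRI_MED, &bp->xx_frtn);
 *
 * When the last reference to the message is freed, the free routine runs:
 * directly for the desballoc*() flavors, via the esballoc throttling queue
 * (freebs_enqueue) for esballoc()/esballoca().
 */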
881
882 /*ARGSUSED*/
883 mblk_t *
884 esballoc(unsigned char *base, size_t size, uint_t pri, frtn_t *frp)
885 {
886 mblk_t *mp;
887
888 /*
889 * Note that this is structured to allow the common case (i.e.
890 * STREAMS flowtracing disabled) to call gesballoc() with tail
891 * call optimization.
892 */
893 if (!str_ftnever) {
894 mp = gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
895 frp, freebs_enqueue, KM_NOSLEEP);
896
897 if (mp != NULL)
898 STR_FTALLOC(&DB_FTHDR(mp), FTEV_ESBALLOC, size);
899 return (mp);
900 }
901
902 return (gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
903 frp, freebs_enqueue, KM_NOSLEEP));
904 }
905
906 /*
907 * Same as esballoc() but sleeps waiting for memory.
908 */
909 /*ARGSUSED*/
910 mblk_t *
911 esballoc_wait(unsigned char *base, size_t size, uint_t pri, frtn_t *frp)
912 {
913 mblk_t *mp;
914
915 /*
916 * Note that this is structured to allow the common case (i.e.
917 * STREAMS flowtracing disabled) to call gesballoc() with tail
918 * call optimization.
919 */
920 if (!str_ftnever) {
921 mp = gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
922 frp, freebs_enqueue, KM_SLEEP);
923
924 STR_FTALLOC(&DB_FTHDR(mp), FTEV_ESBALLOC, size);
925 return (mp);
926 }
927
928 return (gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
929 frp, freebs_enqueue, KM_SLEEP));
930 }
931
932 /*ARGSUSED*/
933 mblk_t *
934 desballoc(unsigned char *base, size_t size, uint_t pri, frtn_t *frp)
935 {
936 mblk_t *mp;
937
938 /*
939 * Note that this is structured to allow the common case (i.e.
940 * STREAMS flowtracing disabled) to call gesballoc() with tail
941 * call optimization.
942 */
943 if (!str_ftnever) {
944 mp = gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
945 frp, dblk_lastfree_desb, KM_NOSLEEP);
946
947 if (mp != NULL)
948 STR_FTALLOC(&DB_FTHDR(mp), FTEV_DESBALLOC, size);
949 return (mp);
950 }
951
952 return (gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
953 frp, dblk_lastfree_desb, KM_NOSLEEP));
954 }
955
956 /*ARGSUSED*/
957 mblk_t *
958 esballoca(unsigned char *base, size_t size, uint_t pri, frtn_t *frp)
959 {
960 mblk_t *mp;
961
962 /*
963 * Note that this is structured to allow the common case (i.e.
964 * STREAMS flowtracing disabled) to call gesballoc() with tail
965 * call optimization.
966 */
967 if (!str_ftnever) {
968 mp = gesballoc(base, size, DBLK_RTFU(2, M_DATA, 0, 0),
969 frp, freebs_enqueue, KM_NOSLEEP);
970
971 if (mp != NULL)
972 STR_FTALLOC(&DB_FTHDR(mp), FTEV_ESBALLOCA, size);
973 return (mp);
974 }
975
976 return (gesballoc(base, size, DBLK_RTFU(2, M_DATA, 0, 0),
977 frp, freebs_enqueue, KM_NOSLEEP));
978 }
979
980 /*
981 * Same as esballoca() but sleeps waiting for memory.
982 */
983 mblk_t *
984 esballoca_wait(unsigned char *base, size_t size, uint_t pri, frtn_t *frp)
985 {
986 mblk_t *mp;
987
988 /*
989 * Note that this is structured to allow the common case (i.e.
990 * STREAMS flowtracing disabled) to call gesballoc() with tail
991 * call optimization.
992 */
993 if (!str_ftnever) {
994 mp = gesballoc(base, size, DBLK_RTFU(2, M_DATA, 0, 0),
995 frp, freebs_enqueue, KM_SLEEP);
996
997 STR_FTALLOC(&DB_FTHDR(mp), FTEV_ESBALLOCA, size);
998 return (mp);
999 }
1000
1001 return (gesballoc(base, size, DBLK_RTFU(2, M_DATA, 0, 0),
1002 frp, freebs_enqueue, KM_SLEEP));
1003 }
1004
1005 /*ARGSUSED*/
1006 mblk_t *
1007 desballoca(unsigned char *base, size_t size, uint_t pri, frtn_t *frp)
1008 {
1009 mblk_t *mp;
1010
1011 /*
1012 * Note that this is structured to allow the common case (i.e.
1013 * STREAMS flowtracing disabled) to call gesballoc() with tail
1014 * call optimization.
1015 */
1016 if (!str_ftnever) {
1017 mp = gesballoc(base, size, DBLK_RTFU(2, M_DATA, 0, 0),
1018 frp, dblk_lastfree_desb, KM_NOSLEEP);
1019
1020 if (mp != NULL)
1021 STR_FTALLOC(&DB_FTHDR(mp), FTEV_DESBALLOCA, size);
1022 return (mp);
1023 }
1024
1025 return (gesballoc(base, size, DBLK_RTFU(2, M_DATA, 0, 0),
1026 frp, dblk_lastfree_desb, KM_NOSLEEP));
1027 }
1028
1029 static void
1030 bcache_dblk_lastfree(mblk_t *mp, dblk_t *dbp)
1031 {
1032 bcache_t *bcp = dbp->db_cache;
1033
1034 ASSERT(dbp->db_mblk == mp);
1035 if (dbp->db_fthdr != NULL)
1036 str_ftfree(dbp);
1037
1038 /* set credp and projid to be 'unspecified' before returning to cache */
1039 if (dbp->db_credp != NULL) {
1040 crfree(dbp->db_credp);
1041 dbp->db_credp = NULL;
1042 }
1043 dbp->db_cpid = -1;
1044 dbp->db_struioflag = 0;
1045 dbp->db_struioun.cksum.flags = 0;
1046
1047 mutex_enter(&bcp->mutex);
1048 kmem_cache_free(bcp->dblk_cache, dbp);
1049 bcp->alloc--;
1050
1051 if (bcp->alloc == 0 && bcp->destroy != 0) {
1052 kmem_cache_destroy(bcp->dblk_cache);
1053 kmem_cache_destroy(bcp->buffer_cache);
1054 mutex_exit(&bcp->mutex);
1055 mutex_destroy(&bcp->mutex);
1056 kmem_free(bcp, sizeof (bcache_t));
1057 } else {
1058 mutex_exit(&bcp->mutex);
1059 }
1060 }
1061
1062 bcache_t *
1063 bcache_create(char *name, size_t size, uint_t align)
1064 {
1065 bcache_t *bcp;
1066 char buffer[255];
1067
1068 ASSERT((align & (align - 1)) == 0);
1069
1070 if ((bcp = kmem_alloc(sizeof (bcache_t), KM_NOSLEEP)) == NULL)
1071 return (NULL);
1072
1073 bcp->size = size;
1074 bcp->align = align;
1075 bcp->alloc = 0;
1076 bcp->destroy = 0;
1077
1078 mutex_init(&bcp->mutex, NULL, MUTEX_DRIVER, NULL);
1079
1080 (void) sprintf(buffer, "%s_buffer_cache", name);
1081 bcp->buffer_cache = kmem_cache_create(buffer, size, align, NULL, NULL,
1082 NULL, NULL, NULL, 0);
1083 (void) sprintf(buffer, "%s_dblk_cache", name);
1084 bcp->dblk_cache = kmem_cache_create(buffer, sizeof (dblk_t),
1085 DBLK_CACHE_ALIGN, bcache_dblk_constructor, bcache_dblk_destructor,
1086 NULL, (void *)bcp, NULL, 0);
1087
1088 return (bcp);
1089 }
1090
1091 void
1092 bcache_destroy(bcache_t *bcp)
1093 {
1094 ASSERT(bcp != NULL);
1095
1096 mutex_enter(&bcp->mutex);
1097 if (bcp->alloc == 0) {
1098 kmem_cache_destroy(bcp->dblk_cache);
1099 kmem_cache_destroy(bcp->buffer_cache);
1100 mutex_exit(&bcp->mutex);
1101 mutex_destroy(&bcp->mutex);
1102 kmem_free(bcp, sizeof (bcache_t));
1103 } else {
1104 bcp->destroy++;
1105 mutex_exit(&bcp->mutex);
1106 }
1107 }
1108
1109 /*ARGSUSED*/
1110 mblk_t *
1111 bcache_allocb(bcache_t *bcp, uint_t pri)
1112 {
1113 dblk_t *dbp;
1114 mblk_t *mp = NULL;
1115
1116 ASSERT(bcp != NULL);
1117
1118 mutex_enter(&bcp->mutex);
1119 if (bcp->destroy != 0) {
1120 mutex_exit(&bcp->mutex);
1121 goto out;
1122 }
1123
1124 if ((dbp = kmem_cache_alloc(bcp->dblk_cache, KM_NOSLEEP)) == NULL) {
1125 mutex_exit(&bcp->mutex);
1126 goto out;
1127 }
1128 bcp->alloc++;
1129 mutex_exit(&bcp->mutex);
1130
1131 ASSERT(((uintptr_t)(dbp->db_base) & (bcp->align - 1)) == 0);
1132
1133 mp = dbp->db_mblk;
1134 DBLK_RTFU_WORD(dbp) = DBLK_RTFU(1, M_DATA, 0, 0);
1135 mp->b_next = mp->b_prev = mp->b_cont = NULL;
1136 mp->b_rptr = mp->b_wptr = dbp->db_base;
1137 mp->b_queue = NULL;
1138 MBLK_BAND_FLAG_WORD(mp) = 0;
1139 STR_FTALLOC(&dbp->db_fthdr, FTEV_BCALLOCB, bcp->size);
1140 out:
1141 FTRACE_1("bcache_allocb(): mp=0x%p", (uintptr_t)mp);
1142
1143 return (mp);
1144 }
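/*
 * Illustrative bcache usage (a sketch; the "xx_rx" cache name and the sizes
 * are hypothetical):
 *
 *	bcache_t *bcp = bcache_create("xx_rx", 2048, 64);
 *	mblk_t *mp = bcache_allocb(bcp, BPRI_MED);
 *	...
 *	freemsg(mp);		returns the dblk and buffer to bcp's caches
 *	bcache_destroy(bcp);	destruction is deferred while buffers are
 *				still outstanding
 *
 * Both bcache_create() and bcache_allocb() can return NULL under memory
 * pressure.
 */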
1145
1146 static void
1147 dblk_lastfree_oversize(mblk_t *mp, dblk_t *dbp)
1148 {
1149 ASSERT(dbp->db_mblk == mp);
1150 if (dbp->db_fthdr != NULL)
1151 str_ftfree(dbp);
1152
1153 /* set credp and projid to be 'unspecified' before returning to cache */
1154 if (dbp->db_credp != NULL) {
1155 crfree(dbp->db_credp);
1156 dbp->db_credp = NULL;
1157 }
1158 dbp->db_cpid = -1;
1159 dbp->db_struioflag = 0;
1160 dbp->db_struioun.cksum.flags = 0;
1161
1162 kmem_free(dbp->db_base, dbp->db_lim - dbp->db_base);
1163 kmem_cache_free(dbp->db_cache, dbp);
1164 }
1165
1166 static mblk_t *
1167 allocb_oversize(size_t size, int kmflags)
1168 {
1169 mblk_t *mp;
1170 void *buf;
1171
1172 size = P2ROUNDUP(size, DBLK_CACHE_ALIGN);
1173 if ((buf = kmem_alloc(size, kmflags)) == NULL)
1174 return (NULL);
1175 if ((mp = gesballoc(buf, size, DBLK_RTFU(1, M_DATA, 0, 0),
1176 &frnop, dblk_lastfree_oversize, kmflags)) == NULL)
1177 kmem_free(buf, size);
1178
1179 if (mp != NULL)
1180 STR_FTALLOC(&DB_FTHDR(mp), FTEV_ALLOCBIG, size);
1181
1182 return (mp);
1183 }
1184
1185 mblk_t *
1186 allocb_tryhard(size_t target_size)
1187 {
1188 size_t size;
1189 mblk_t *bp;
1190
1191 for (size = target_size; size < target_size + 512;
1192 size += DBLK_CACHE_ALIGN)
1193 if ((bp = allocb(size, BPRI_HI)) != NULL)
1194 return (bp);
1195 allocb_tryhard_fails++;
1196 return (NULL);
1197 }
1198
1199 /*
1200 * This routine is consolidation private for STREAMS internal use
1201 * This routine may only be called from sync routines (i.e., not
1202 * from put or service procedures). It is located here (rather
1203 * than strsubr.c) so that we don't have to expose all of the
1204 * allocb() implementation details in header files.
1205 */
1206 mblk_t *
1207 allocb_wait(size_t size, uint_t pri, uint_t flags, int *error)
1208 {
1209 dblk_t *dbp;
1210 mblk_t *mp;
1211 size_t index;
1212
1213 index = (size - 1) >> DBLK_SIZE_SHIFT;
1214
1215 if (flags & STR_NOSIG) {
1216 if (index >= (DBLK_MAX_CACHE >> DBLK_SIZE_SHIFT)) {
1217 if (size != 0) {
1218 mp = allocb_oversize(size, KM_SLEEP);
1219 FTRACE_1("allocb_wait (NOSIG): mp=0x%lx",
1220 (uintptr_t)mp);
1221 return (mp);
1222 }
1223 index = 0;
1224 }
1225
1226 dbp = kmem_cache_alloc(dblk_cache[index], KM_SLEEP);
1227 mp = dbp->db_mblk;
1228 DBLK_RTFU_WORD(dbp) = DBLK_RTFU(1, M_DATA, 0, 0);
1229 mp->b_next = mp->b_prev = mp->b_cont = NULL;
1230 mp->b_rptr = mp->b_wptr = dbp->db_base;
1231 mp->b_queue = NULL;
1232 MBLK_BAND_FLAG_WORD(mp) = 0;
1233 STR_FTALLOC(&DB_FTHDR(mp), FTEV_ALLOCBW, size);
1234
1235 FTRACE_1("allocb_wait (NOSIG): mp=0x%lx", (uintptr_t)mp);
1236
1237 } else {
1238 while ((mp = allocb(size, pri)) == NULL) {
1239 if ((*error = strwaitbuf(size, BPRI_HI)) != 0)
1240 return (NULL);
1241 }
1242 }
1243
1244 return (mp);
1245 }
1246
1247 /*
1248 * Call function 'func' with 'arg' when a class zero block can
1249 * be allocated with priority 'pri'.
1250 */
1251 bufcall_id_t
1252 esbbcall(uint_t pri, void (*func)(void *), void *arg)
1253 {
1254 return (bufcall(1, pri, func, arg));
1255 }
1256
1257 /*
1258 * Allocates an iocblk (M_IOCTL) block. Properly sets the credentials
1259 * ioc_id, rval and error of the iocblk to set up an ioctl call.
1260 * This provides consistency for all internal allocators of ioctl.
1261 */
1262 mblk_t *
1263 mkiocb(uint_t cmd)
1264 {
1265 struct iocblk *ioc;
1266 mblk_t *mp;
1267
1268 /*
1269 * Allocate enough space for any of the ioctl related messages.
1270 */
1271 if ((mp = allocb(sizeof (union ioctypes), BPRI_MED)) == NULL)
1272 return (NULL);
1273
1274 bzero(mp->b_rptr, sizeof (union ioctypes));
1275
1276 /*
1277 * Set the mblk_t information and ptrs correctly.
1278 */
1279 mp->b_wptr += sizeof (struct iocblk);
1280 mp->b_datap->db_type = M_IOCTL;
1281
1282 /*
1283 * Fill in the fields.
1284 */
1285 ioc = (struct iocblk *)mp->b_rptr;
1286 ioc->ioc_cmd = cmd;
1287 ioc->ioc_cr = kcred;
1288 ioc->ioc_id = getiocseqno();
1289 ioc->ioc_flag = IOC_NATIVE;
1290 return (mp);
1291 }
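/*
 * Illustrative use of mkiocb() (a sketch; XX_CMD and "wq" are hypothetical):
 * a module issuing an internal ioctl downstream might do:
 *
 *	mblk_t *mp;
 *
 *	if ((mp = mkiocb(XX_CMD)) == NULL)
 *		return (ENOMEM);
 *	putnext(wq, mp);
 *
 * The eventual M_IOCACK/M_IOCNAK reply can be matched against the ioc_id
 * that mkiocb() filled in from getiocseqno().
 */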
1292
1293 /*
1294 * test if block of given size can be allocated with a request of
1295 * the given priority.
1296 * 'pri' is no longer used, but is retained for compatibility.
1297 */
1298 /* ARGSUSED */
1299 int
1300 testb(size_t size, uint_t pri)
1301 {
1302 return ((size + sizeof (dblk_t)) <= kmem_avail());
1303 }
1304
1305 /*
1306 * Call function 'func' with argument 'arg' when there is a reasonably
1307 * good chance that a block of size 'size' can be allocated.
1308 * 'pri' is no longer used, but is retained for compatibility.
1309 */
1310 /* ARGSUSED */
1311 bufcall_id_t
1312 bufcall(size_t size, uint_t pri, void (*func)(void *), void *arg)
1313 {
1314 static long bid = 1; /* always odd to save checking for zero */
1315 bufcall_id_t bc_id;
1316 struct strbufcall *bcp;
1317
1318 if ((bcp = kmem_alloc(sizeof (strbufcall_t), KM_NOSLEEP)) == NULL)
1319 return (0);
1320
1321 bcp->bc_func = func;
1322 bcp->bc_arg = arg;
1323 bcp->bc_size = size;
1324 bcp->bc_next = NULL;
1325 bcp->bc_executor = NULL;
1326
1327 mutex_enter(&strbcall_lock);
1328 /*
1329 * After bcp is linked into strbcalls and strbcall_lock is dropped there
1330 * should be no references to bcp since it may be freed by
1331 * runbufcalls(). Since the bc_id field is returned, we save its value in
1332 * a local variable.
1333 */
1334 bc_id = bcp->bc_id = (bufcall_id_t)(bid += 2); /* keep it odd */
1335
1336 /*
1337 * add newly allocated stream event to existing
1338 * linked list of events.
1339 */
1340 if (strbcalls.bc_head == NULL) {
1341 strbcalls.bc_head = strbcalls.bc_tail = bcp;
1342 } else {
1343 strbcalls.bc_tail->bc_next = bcp;
1344 strbcalls.bc_tail = bcp;
1345 }
1346
1347 cv_signal(&strbcall_cv);
1348 mutex_exit(&strbcall_lock);
1349 return (bc_id);
1350 }
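/*
 * Illustrative allocb()/bufcall() recovery pattern (a sketch; xx_reenable()
 * and the xxp softstate are hypothetical):
 *
 *	if ((mp = allocb(len, BPRI_MED)) == NULL) {
 *		xxp->xx_bufcall_id = bufcall(len, BPRI_MED, xx_reenable, xxp);
 *		return;			(an id of 0 means bufcall itself failed)
 *	}
 *
 * xx_reenable() is called when an allocation of 'len' bytes is likely (but
 * not guaranteed) to succeed, so it must be prepared to retry. A pending id
 * must be cancelled with unbufcall() before the module closes or detaches.
 */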
1351
1352 /*
1353 * Cancel a bufcall request.
1354 */
1355 void
1356 unbufcall(bufcall_id_t id)
1357 {
1358 strbufcall_t *bcp, *pbcp;
1359
1360 mutex_enter(&strbcall_lock);
1361 again:
1362 pbcp = NULL;
1363 for (bcp = strbcalls.bc_head; bcp; bcp = bcp->bc_next) {
1364 if (id == bcp->bc_id)
1365 break;
1366 pbcp = bcp;
1367 }
1368 if (bcp) {
1369 if (bcp->bc_executor != NULL) {
1370 if (bcp->bc_executor != curthread) {
1371 cv_wait(&bcall_cv, &strbcall_lock);
1372 goto again;
1373 }
1374 } else {
1375 if (pbcp)
1376 pbcp->bc_next = bcp->bc_next;
1377 else
1378 strbcalls.bc_head = bcp->bc_next;
1379 if (bcp == strbcalls.bc_tail)
1380 strbcalls.bc_tail = pbcp;
1381 kmem_free(bcp, sizeof (strbufcall_t));
1382 }
1383 }
1384 mutex_exit(&strbcall_lock);
1385 }
1386
1387 /*
1388 * Duplicate a message block by block (uses dupb), returning
1389 * a pointer to the duplicate message.
1390 * Returns a non-NULL value only if the entire message
1391 * was dup'd.
1392 */
1393 mblk_t *
1394 dupmsg(mblk_t *bp)
1395 {
1396 mblk_t *head, *nbp;
1397
1398 if (!bp || !(nbp = head = dupb(bp)))
1399 return (NULL);
1400
1401 while (bp->b_cont) {
1402 if (!(nbp->b_cont = dupb(bp->b_cont))) {
1403 freemsg(head);
1404 return (NULL);
1405 }
1406 nbp = nbp->b_cont;
1407 bp = bp->b_cont;
1408 }
1409 return (head);
1410 }
1411
1412 #define DUPB_NOLOAN(bp) \
1413 ((((bp)->b_datap->db_struioflag & STRUIO_ZC) != 0) ? \
1414 copyb((bp)) : dupb((bp)))
1415
1416 mblk_t *
1417 dupmsg_noloan(mblk_t *bp)
1418 {
1419 mblk_t *head, *nbp;
1420
1421 if (bp == NULL || DB_TYPE(bp) != M_DATA ||
1422 ((nbp = head = DUPB_NOLOAN(bp)) == NULL))
1423 return (NULL);
1424
1425 while (bp->b_cont) {
1426 if ((nbp->b_cont = DUPB_NOLOAN(bp->b_cont)) == NULL) {
1427 freemsg(head);
1428 return (NULL);
1429 }
1430 nbp = nbp->b_cont;
1431 bp = bp->b_cont;
1432 }
1433 return (head);
1434 }
1435
1436 /*
1437 * Copy data from message and data block to newly allocated message and
1438 * data block. Returns new message block pointer, or NULL if error.
1439 * The alignment of rptr (w.r.t. word alignment) will be the same in the copy
1440 * as in the original even when db_base is not word aligned. (bug 1052877)
1441 */
1442 mblk_t *
1443 copyb(mblk_t *bp)
1444 {
1445 mblk_t *nbp;
1446 dblk_t *dp, *ndp;
1447 uchar_t *base;
1448 size_t size;
1449 size_t unaligned;
1450
1451 ASSERT(bp->b_wptr >= bp->b_rptr);
1452
1453 dp = bp->b_datap;
1454 if (dp->db_fthdr != NULL)
1455 STR_FTEVENT_MBLK(bp, caller(), FTEV_COPYB, 0);
1456
1457 size = dp->db_lim - dp->db_base;
1458 unaligned = P2PHASE((uintptr_t)dp->db_base, sizeof (uint_t));
1459 if ((nbp = allocb_tmpl(size + unaligned, bp)) == NULL)
1460 return (NULL);
1461 nbp->b_flag = bp->b_flag;
1462 nbp->b_band = bp->b_band;
1463 ndp = nbp->b_datap;
1464
1465 /*
1466 * Copy the various checksum information that came in
1467 * originally.
1468 */
1469 ndp->db_cksumstart = dp->db_cksumstart;
1470 ndp->db_cksumend = dp->db_cksumend;
1471 ndp->db_cksumstuff = dp->db_cksumstuff;
1472 bcopy(dp->db_struioun.data, ndp->db_struioun.data,
1473 sizeof (dp->db_struioun.data));
1474
1475 /*
1476 * Well, here is a potential issue. If we are trying to
1477 * trace a flow, and we copy the message, we might lose
1478 * information about where this message might have been.
1479 * So we should inherit the FT data. On the other hand,
1480 * a user might be interested only in alloc to free data.
1481 * So I guess the real answer is to provide a tunable.
1482 */
1483 STR_FTEVENT_MBLK(nbp, caller(), FTEV_COPYB, 1);
1484
1485 base = ndp->db_base + unaligned;
1486 bcopy(dp->db_base, ndp->db_base + unaligned, size);
1487
1488 nbp->b_rptr = base + (bp->b_rptr - dp->db_base);
1489 nbp->b_wptr = nbp->b_rptr + MBLKL(bp);
1490
1491 return (nbp);
1492 }
1493
1494 /*
1495 * Copy data from message to newly allocated message using new
1496 * data blocks. Returns a pointer to the new message, or NULL if error.
1497 */
1498 mblk_t *
1499 copymsg(mblk_t *bp)
1500 {
1501 mblk_t *head, *nbp;
1502
1503 if (!bp || !(nbp = head = copyb(bp)))
1504 return (NULL);
1505
1506 while (bp->b_cont) {
1507 if (!(nbp->b_cont = copyb(bp->b_cont))) {
1508 freemsg(head);
1509 return (NULL);
1510 }
1511 nbp = nbp->b_cont;
1512 bp = bp->b_cont;
1513 }
1514 return (head);
1515 }
1516
1517 /*
1518 * link a message block to tail of message
1519 */
1520 void
1521 linkb(mblk_t *mp, mblk_t *bp)
1522 {
1523 ASSERT(mp && bp);
1524
1525 for (; mp->b_cont; mp = mp->b_cont)
1526 ;
1527 mp->b_cont = bp;
1528 }
1529
1530 /*
1531 * unlink a message block from head of message
1532 * return pointer to new message.
1533 * NULL if message becomes empty.
1534 */
1535 mblk_t *
1536 unlinkb(mblk_t *bp)
1537 {
1538 mblk_t *bp1;
1539
1540 bp1 = bp->b_cont;
1541 bp->b_cont = NULL;
1542 return (bp1);
1543 }
1544
1545 /*
1546 * remove a message block "bp" from message "mp"
1547 *
1548 * Return pointer to new message or NULL if no message remains.
1549 * Return -1 if bp is not found in message.
1550 */
1551 mblk_t *
1552 rmvb(mblk_t *mp, mblk_t *bp)
1553 {
1554 mblk_t *tmp;
1555 mblk_t *lastp = NULL;
1556
1557 ASSERT(mp && bp);
1558 for (tmp = mp; tmp; tmp = tmp->b_cont) {
1559 if (tmp == bp) {
1560 if (lastp)
1561 lastp->b_cont = tmp->b_cont;
1562 else
1563 mp = tmp->b_cont;
1564 tmp->b_cont = NULL;
1565 return (mp);
1566 }
1567 lastp = tmp;
1568 }
1569 return ((mblk_t *)-1);
1570 }
1571
1572 /*
1573 * Concatenate and align first len bytes of common
1574 * message type. len == -1 means concatenate everything.
1575 * Returns 1 on success, 0 on failure
1576 * After the pullup, mp points to the pulled up data.
1577 */
1578 int
1579 pullupmsg(mblk_t *mp, ssize_t len)
1580 {
1581 mblk_t *bp, *b_cont;
1582 dblk_t *dbp;
1583 ssize_t n;
1584
1585 ASSERT(mp->b_datap->db_ref > 0);
1586 ASSERT(mp->b_next == NULL && mp->b_prev == NULL);
1587
1588 if (len == -1) {
1589 if (mp->b_cont == NULL && str_aligned(mp->b_rptr))
1590 return (1);
1591 len = xmsgsize(mp);
1592 } else {
1593 ssize_t first_mblk_len = mp->b_wptr - mp->b_rptr;
1594 ASSERT(first_mblk_len >= 0);
1595 /*
1596 * If the length is less than that of the first mblk,
1597 * we want to pull up the message into an aligned mblk.
1598 * Though not part of the spec, some callers assume it.
1599 */
1600 if (len <= first_mblk_len) {
1601 if (str_aligned(mp->b_rptr))
1602 return (1);
1603 len = first_mblk_len;
1604 } else if (xmsgsize(mp) < len)
1605 return (0);
1606 }
1607
1608 if ((bp = allocb_tmpl(len, mp)) == NULL)
1609 return (0);
1610
1611 dbp = bp->b_datap;
1612 *bp = *mp; /* swap mblks so bp heads the old msg... */
1613 mp->b_datap = dbp; /* ... and mp heads the new message */
1614 mp->b_datap->db_mblk = mp;
1615 bp->b_datap->db_mblk = bp;
1616 mp->b_rptr = mp->b_wptr = dbp->db_base;
1617
1618 do {
1619 ASSERT(bp->b_datap->db_ref > 0);
1620 ASSERT(bp->b_wptr >= bp->b_rptr);
1621 n = MIN(bp->b_wptr - bp->b_rptr, len);
1622 ASSERT(n >= 0); /* allow zero-length mblk_t's */
1623 if (n > 0)
1624 bcopy(bp->b_rptr, mp->b_wptr, (size_t)n);
1625 mp->b_wptr += n;
1626 bp->b_rptr += n;
1627 len -= n;
1628 if (bp->b_rptr != bp->b_wptr)
1629 break;
1630 b_cont = bp->b_cont;
1631 freeb(bp);
1632 bp = b_cont;
1633 } while (len && bp);
1634
1635 mp->b_cont = bp; /* tack on whatever wasn't pulled up */
1636
1637 return (1);
1638 }
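/*
 * Illustrative pullupmsg() usage (a sketch; struct xx_hdr is hypothetical):
 * a module that needs its header contiguous and aligned might do:
 *
 *	struct xx_hdr *hdr;
 *
 *	if (MBLKL(mp) < sizeof (struct xx_hdr) || !OK_32PTR(mp->b_rptr)) {
 *		if (!pullupmsg(mp, sizeof (struct xx_hdr))) {
 *			freemsg(mp);
 *			return;
 *		}
 *	}
 *	hdr = (struct xx_hdr *)mp->b_rptr;
 *
 * On success mp itself heads the pulled-up data (the leading mblks are
 * swapped), so pointers previously saved into the old first mblk are stale.
 */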
1639
1640 /*
1641 * Concatenate and align at least the first len bytes of common message
1642 * type. Len == -1 means concatenate everything. The original message is
1643 * unaltered. Returns a pointer to a new message on success, otherwise
1644 * returns NULL.
1645 */
1646 mblk_t *
1647 msgpullup(mblk_t *mp, ssize_t len)
1648 {
1649 mblk_t *newmp;
1650 ssize_t totlen;
1651 ssize_t n;
1652
1653 totlen = xmsgsize(mp);
1654
1655 if ((len > 0) && (len > totlen))
1656 return (NULL);
1657
1658 /*
1659 * Copy all of the first msg type into one new mblk, then dupmsg
1660 * and link the rest onto this.
1661 */
1662
1663 len = totlen;
1664
1665 if ((newmp = allocb_tmpl(len, mp)) == NULL)
1666 return (NULL);
1667
1668 newmp->b_flag = mp->b_flag;
1669 newmp->b_band = mp->b_band;
1670
1671 while (len > 0) {
1672 n = mp->b_wptr - mp->b_rptr;
1673 ASSERT(n >= 0); /* allow zero-length mblk_t's */
1674 if (n > 0)
1675 bcopy(mp->b_rptr, newmp->b_wptr, n);
1676 newmp->b_wptr += n;
1677 len -= n;
1678 mp = mp->b_cont;
1679 }
1680
1681 if (mp != NULL) {
1682 newmp->b_cont = dupmsg(mp);
1683 if (newmp->b_cont == NULL) {
1684 freemsg(newmp);
1685 return (NULL);
1686 }
1687 }
1688
1689 return (newmp);
1690 }
1691
1692 /*
1693 * Trim bytes from message
1694 * len > 0, trim from head
1695 * len < 0, trim from tail
1696 * Returns 1 on success, 0 on failure.
1697 */
1698 int
1699 adjmsg(mblk_t *mp, ssize_t len)
1700 {
1701 mblk_t *bp;
1702 mblk_t *save_bp = NULL;
1703 mblk_t *prev_bp;
1704 mblk_t *bcont;
1705 unsigned char type;
1706 ssize_t n;
1707 int fromhead;
1708 int first;
1709
1710 ASSERT(mp != NULL);
1711
1712 if (len < 0) {
1713 fromhead = 0;
1714 len = -len;
1715 } else {
1716 fromhead = 1;
1717 }
1718
1719 if (xmsgsize(mp) < len)
1720 return (0);
1721
1722 if (fromhead) {
1723 first = 1;
1724 while (len) {
1725 ASSERT(mp->b_wptr >= mp->b_rptr);
1726 n = MIN(mp->b_wptr - mp->b_rptr, len);
1727 mp->b_rptr += n;
1728 len -= n;
1729
1730 /*
1731 * If this is not the first zero length
1732 * message remove it
1733 */
1734 if (!first && (mp->b_wptr == mp->b_rptr)) {
1735 bcont = mp->b_cont;
1736 freeb(mp);
1737 mp = save_bp->b_cont = bcont;
1738 } else {
1739 save_bp = mp;
1740 mp = mp->b_cont;
1741 }
1742 first = 0;
1743 }
1744 } else {
1745 type = mp->b_datap->db_type;
1746 while (len) {
1747 bp = mp;
1748 save_bp = NULL;
1749
1750 /*
1751 * Find the last message of same type
1752 */
1753 while (bp && bp->b_datap->db_type == type) {
1754 ASSERT(bp->b_wptr >= bp->b_rptr);
1755 prev_bp = save_bp;
1756 save_bp = bp;
1757 bp = bp->b_cont;
1758 }
1759 if (save_bp == NULL)
1760 break;
1761 n = MIN(save_bp->b_wptr - save_bp->b_rptr, len);
1762 save_bp->b_wptr -= n;
1763 len -= n;
1764
1765 /*
1766 * If this is not the first message
1767 * and we have taken away everything
1768 * from this message, remove it
1769 */
1770
1771 if ((save_bp != mp) &&
1772 (save_bp->b_wptr == save_bp->b_rptr)) {
1773 bcont = save_bp->b_cont;
1774 freeb(save_bp);
1775 prev_bp->b_cont = bcont;
1776 }
1777 }
1778 }
1779 return (1);
1780 }
1781
1782 /*
1783 * get number of data bytes in message
1784 */
1785 size_t
1786 msgdsize(mblk_t *bp)
1787 {
1788 size_t count = 0;
1789
1790 for (; bp; bp = bp->b_cont)
1791 if (bp->b_datap->db_type == M_DATA) {
1792 ASSERT(bp->b_wptr >= bp->b_rptr);
1793 count += bp->b_wptr - bp->b_rptr;
1794 }
1795 return (count);
1796 }
1797
1798 /*
1799 * Get a message off head of queue
1800 *
1801 * If queue has no buffers then mark queue
1802 * with QWANTR. (queue wants to be read by
1803 * someone when data becomes available)
1804 *
1805 * If there is something to take off then do so.
1806 * If queue falls below hi water mark turn off QFULL
1807 * flag. Decrement weighted count of queue.
1808 * Also turn off QWANTR because queue is being read.
1809 *
1810 * The queue count is maintained on a per-band basis.
1811 * Priority band 0 (normal messages) uses q_count,
1812 * q_lowat, etc. Non-zero priority bands use the
1813 * fields in their respective qband structures
1814 * (qb_count, qb_lowat, etc.) All messages appear
1815 * on the same list, linked via their b_next pointers.
1816 * q_first is the head of the list. q_count does
1817 * not reflect the size of all the messages on the
1818 * queue. It only reflects those messages in the
1819 * normal band of flow. The one exception to this
1820 * deals with high priority messages. They are in
1821 * their own conceptual "band", but are accounted
1822 * against q_count.
1823 *
1824 * If queue count is below the lo water mark and QWANTW
1825 * is set, enable the closest backq which has a service
1826 * procedure and turn off the QWANTW flag.
1827 *
1828 * getq could be built on top of rmvq, but isn't because
1829 * of performance considerations.
1830 *
1831 * A note on the use of q_count and q_mblkcnt:
1832 * q_count is the traditional byte count for messages that
1833 * have been put on a queue. Documentation tells us that
1834 * we shouldn't rely on that count, but some drivers/modules
1835 * do. What was needed, however, is a mechanism to prevent
1836 * runaway streams from consuming all of the resources,
1837 * and particularly be able to flow control zero-length
1838 * messages. q_mblkcnt is used for this purpose. It
1839 * counts the number of mblk's that are being put on
1840 * the queue. The intention here, is that each mblk should
1841 * contain one byte of data and, for the purpose of
1842 * flow-control, logically does. A queue will become
1843 * full when EITHER of these values (q_count and q_mblkcnt)
1844 * reach the highwater mark. It will clear when BOTH
1845 * of them drop below the highwater mark. And it will
1846 * backenable when BOTH of them drop below the lowwater
1847 * mark.
1848 * With this algorithm, a driver/module might be able
1849 * to find a reasonably accurate q_count, and the
1850 * framework can still try and limit resource usage.
1851 */
1852 mblk_t *
1853 getq(queue_t *q)
1854 {
1855 mblk_t *bp;
1856 uchar_t band = 0;
1857
1858 bp = getq_noenab(q, 0);
1859 if (bp != NULL)
1860 band = bp->b_band;
1861
1862 /*
1863 * Inlined from qbackenable().
1864 * Quick check without holding the lock.
1865 */
1866 if (band == 0 && (q->q_flag & (QWANTW|QWANTWSYNC)) == 0)
1867 return (bp);
1868
1869 qbackenable(q, band);
1870 return (bp);
1871 }
1872
1873 /*
1874 * Returns the number of bytes in a message (a message is defined as a
1875 * chain of mblks linked by b_cont). If a non-NULL mblkcnt is supplied we
1876 * also return the number of distinct mblks in the message.
1877 */
1878 int
1879 mp_cont_len(mblk_t *bp, int *mblkcnt)
1880 {
1881 mblk_t *mp;
1882 int mblks = 0;
1883 int bytes = 0;
1884
1885 for (mp = bp; mp != NULL; mp = mp->b_cont) {
1886 bytes += MBLKL(mp);
1887 mblks++;
1888 }
1889
1890 if (mblkcnt != NULL)
1891 *mblkcnt = mblks;
1892
1893 return (bytes);
1894 }
1895
1896 /*
1897 * Like getq() but does not backenable. This is used by the stream
1898 * head when a putback() is likely. The caller must call qbackenable()
1899 * after it is done with accessing the queue.
1900 * The rbytes argument to getq_noenab() allows callers to specify
1901 * the maximum number of bytes to return. If the current amount on the
1902 * queue is less than this then the entire message will be returned.
1903 * A value of 0 returns the entire message and is equivalent to the old
1904 * default behaviour prior to the addition of the rbytes argument.
1905 */
1906 mblk_t *
1907 getq_noenab(queue_t *q, ssize_t rbytes)
1908 {
1909 mblk_t *bp, *mp1;
1910 mblk_t *mp2 = NULL;
1911 qband_t *qbp;
1912 kthread_id_t freezer;
1913 int bytecnt = 0, mblkcnt = 0;
1914
1915 /* freezestr should allow its caller to call getq/putq */
1916 freezer = STREAM(q)->sd_freezer;
1917 if (freezer == curthread) {
1918 ASSERT(frozenstr(q));
1919 ASSERT(MUTEX_HELD(QLOCK(q)));
1920 } else
1921 mutex_enter(QLOCK(q));
1922
1923 if ((bp = q->q_first) == 0) {
1924 q->q_flag |= QWANTR;
1925 } else {
1926 /*
1927 * If the caller supplied a byte threshold and there is
1928 * more than this amount on the queue then break up
1929 * the message appropriately. We can only safely do
1930 * this for M_DATA messages.
1931 */
1932 if ((DB_TYPE(bp) == M_DATA) && (rbytes > 0) &&
1933 (q->q_count > rbytes)) {
1934 /*
1935 * Inline version of mp_cont_len() which terminates
1936 * when we meet or exceed rbytes.
1937 */
1938 for (mp1 = bp; mp1 != NULL; mp1 = mp1->b_cont) {
1939 mblkcnt++;
1940 bytecnt += MBLKL(mp1);
1941 if (bytecnt >= rbytes)
1942 break;
1943 }
1944 /*
1945 * We need to account for the following scenarios:
1946 *
1947 * 1) Too much data in the first message:
1948 * mp1 will be the mblk which puts us over our
1949 * byte limit.
1950 * 2) Not enough data in the first message:
1951 * mp1 will be NULL.
1952 * 3) Exactly the right amount of data contained within
1953 * whole mblks:
1954 * mp1->b_cont will be where we break the message.
1955 */
1956 if (bytecnt > rbytes) {
1957 /*
1958 * Dup/copy mp1 and put what we don't need
1959 * back onto the queue. Adjust the read/write
1960 * and continuation pointers appropriately
1961 * and decrement the current mblk count to
1962 * reflect we are putting an mblk back onto
1963 * the queue.
1964 * When adjusting the message pointers, it's
1965 * OK to use the existing bytecnt and the
1966 * requested amount (rbytes) to calculate the
1967 * new write offset (b_wptr) of what we
1968 * are taking. However, we cannot use these
1969 * values when calculating the read offset of
1970 * the mblk we are putting back on the queue.
1971 * This is because the beginning (b_rptr) of the
1972 * mblk represents some arbitrary point within
1973 * the message.
1974 * It's simplest to do this by advancing b_rptr
1975 * by the new length of mp1 as we don't have to
1976 * remember any intermediate state.
1977 */
1978 ASSERT(mp1 != NULL);
1979 mblkcnt--;
1980 if ((mp2 = dupb(mp1)) == NULL &&
1981 (mp2 = copyb(mp1)) == NULL) {
1982 bytecnt = mblkcnt = 0;
1983 goto dup_failed;
1984 }
1985 mp2->b_cont = mp1->b_cont;
1986 mp1->b_wptr -= bytecnt - rbytes;
1987 mp2->b_rptr += mp1->b_wptr - mp1->b_rptr;
1988 mp1->b_cont = NULL;
1989 bytecnt = rbytes;
1990 } else {
1991 /*
1992 * Either there is not enough data in the first
1993 * message or there is no excess data to deal
1994 * with. If mp1 is NULL, we are taking the
1995 * whole message. No need to do anything.
1996 * Otherwise we assign mp1->b_cont to mp2 as
1997 * we will be putting this back onto the head of
1998 * the queue.
1999 */
2000 if (mp1 != NULL) {
2001 mp2 = mp1->b_cont;
2002 mp1->b_cont = NULL;
2003 }
2004 }
2005 /*
2006 * If mp2 is not NULL then we have part of the message
2007 * to put back onto the queue.
2008 */
2009 if (mp2 != NULL) {
2010 if ((mp2->b_next = bp->b_next) == NULL)
2011 q->q_last = mp2;
2012 else
2013 bp->b_next->b_prev = mp2;
2014 q->q_first = mp2;
2015 } else {
2016 if ((q->q_first = bp->b_next) == NULL)
2017 q->q_last = NULL;
2018 else
2019 q->q_first->b_prev = NULL;
2020 }
2021 } else {
2022 /*
2023 * Either no byte threshold was supplied, there is
2024 * not enough on the queue or we failed to
2025 * duplicate/copy a data block. In these cases we
2026 * just take the entire first message.
2027 */
2028 dup_failed:
2029 bytecnt = mp_cont_len(bp, &mblkcnt);
2030 if ((q->q_first = bp->b_next) == NULL)
2031 q->q_last = NULL;
2032 else
2033 q->q_first->b_prev = NULL;
2034 }
2035 if (bp->b_band == 0) {
2036 q->q_count -= bytecnt;
2037 q->q_mblkcnt -= mblkcnt;
2038 if (q->q_mblkcnt == 0 || ((q->q_count < q->q_hiwat) &&
2039 (q->q_mblkcnt < q->q_hiwat))) {
2040 q->q_flag &= ~QFULL;
2041 }
2042 } else {
2043 int i;
2044
2045 ASSERT(bp->b_band <= q->q_nband);
2046 ASSERT(q->q_bandp != NULL);
2047 ASSERT(MUTEX_HELD(QLOCK(q)));
2048 qbp = q->q_bandp;
2049 i = bp->b_band;
2050 while (--i > 0)
2051 qbp = qbp->qb_next;
2052 if (qbp->qb_first == qbp->qb_last) {
2053 qbp->qb_first = NULL;
2054 qbp->qb_last = NULL;
2055 } else {
2056 qbp->qb_first = bp->b_next;
2057 }
2058 qbp->qb_count -= bytecnt;
2059 qbp->qb_mblkcnt -= mblkcnt;
2060 if (qbp->qb_mblkcnt == 0 ||
2061 ((qbp->qb_count < qbp->qb_hiwat) &&
2062 (qbp->qb_mblkcnt < qbp->qb_hiwat))) {
2063 qbp->qb_flag &= ~QB_FULL;
2064 }
2065 }
2066 q->q_flag &= ~QWANTR;
2067 bp->b_next = NULL;
2068 bp->b_prev = NULL;
2069 }
2070 if (freezer != curthread)
2071 mutex_exit(QLOCK(q));
2072
2073 STR_FTEVENT_MSG(bp, q, FTEV_GETQ, 0);
2074
2075 return (bp);
2076 }
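
/*
 * Illustrative sketch (getq_noenab() is internal to the framework, not a
 * DDI interface; "consume" is a hypothetical helper): a caller that only
 * wants up to "count" bytes dequeues with getq_noenab() and performs the
 * deferred back-enable itself once it has finished with the queue.
 *
 *	mblk_t *mp;
 *	uchar_t band;
 *
 *	if ((mp = getq_noenab(q, count)) != NULL) {
 *		band = mp->b_band;
 *		consume(mp);
 *		qbackenable(q, band);
 *	}
 */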
2077
2078 /*
2079 * Determine if a backenable is needed after removing a message in the
2080 * specified band.
2081 * NOTE: This routine assumes that something like getq_noenab() has
2082 * already been called.
2083 *
2084 * For the read side it is ok to hold sd_lock across calling this (and the
2085 * stream head often does).
2086 * But for the write side strwakeq might be invoked and it acquires sd_lock.
2087 */
2088 void
2089 qbackenable(queue_t *q, uchar_t band)
2090 {
2091 int backenab = 0;
2092 qband_t *qbp;
2093 kthread_id_t freezer;
2094
2095 ASSERT(q);
2096 ASSERT((q->q_flag & QREADR) || MUTEX_NOT_HELD(&STREAM(q)->sd_lock));
2097
2098 /*
2099 * Quick check without holding the lock.
2100 * OK since after getq() has lowered the q_count these flags
2101 * would not change unless either the qbackenable() is done by
2102 * another thread (which is ok) or the queue has gotten QFULL
2103 * in which case another backenable will take place when the queue
2104 * drops below q_lowat.
2105 */
2106 if (band == 0 && (q->q_flag & (QWANTW|QWANTWSYNC)) == 0)
2107 return;
2108
2109 /* freezestr should allow its caller to call getq/putq */
2110 freezer = STREAM(q)->sd_freezer;
2111 if (freezer == curthread) {
2112 ASSERT(frozenstr(q));
2113 ASSERT(MUTEX_HELD(QLOCK(q)));
2114 } else
2115 mutex_enter(QLOCK(q));
2116
2117 if (band == 0) {
2118 if (q->q_lowat == 0 || (q->q_count < q->q_lowat &&
2119 q->q_mblkcnt < q->q_lowat)) {
2120 backenab = q->q_flag & (QWANTW|QWANTWSYNC);
2121 }
2122 } else {
2123 int i;
2124
2125 ASSERT((unsigned)band <= q->q_nband);
2126 ASSERT(q->q_bandp != NULL);
2127
2128 qbp = q->q_bandp;
2129 i = band;
2130 while (--i > 0)
2131 qbp = qbp->qb_next;
2132
2133 if (qbp->qb_lowat == 0 || (qbp->qb_count < qbp->qb_lowat &&
2134 qbp->qb_mblkcnt < qbp->qb_lowat)) {
2135 backenab = qbp->qb_flag & QB_WANTW;
2136 }
2137 }
2138
2139 if (backenab == 0) {
2140 if (freezer != curthread)
2141 mutex_exit(QLOCK(q));
2142 return;
2143 }
2144
2145 /* Have to drop the lock across strwakeq and backenable */
2146 if (backenab & QWANTWSYNC)
2147 q->q_flag &= ~QWANTWSYNC;
2148 if (backenab & (QWANTW|QB_WANTW)) {
2149 if (band != 0)
2150 qbp->qb_flag &= ~QB_WANTW;
2151 else {
2152 q->q_flag &= ~QWANTW;
2153 }
2154 }
2155
2156 if (freezer != curthread)
2157 mutex_exit(QLOCK(q));
2158
2159 if (backenab & QWANTWSYNC)
2160 strwakeq(q, QWANTWSYNC);
2161 if (backenab & (QWANTW|QB_WANTW))
2162 backenable(q, band);
2163 }
2164
2165 /*
2166 * Remove a message from a queue. The queue count and other
2167 * flow control parameters are adjusted and the back queue
2168 * enabled if necessary.
2169 *
2170 * rmvq can be called with the stream frozen, by other utility functions
2171 * holding QLOCK, and by streams modules with no locks held and the stream
2171 * unfrozen.
2172 */
2173 void
2174 rmvq(queue_t *q, mblk_t *mp)
2175 {
2176 ASSERT(mp != NULL);
2177
2178 rmvq_noenab(q, mp);
2179 if (curthread != STREAM(q)->sd_freezer && MUTEX_HELD(QLOCK(q))) {
2180 /*
2181 * qbackenable can handle a frozen stream but not a "random"
2182 * qlock being held. Drop lock across qbackenable.
2183 */
2184 mutex_exit(QLOCK(q));
2185 qbackenable(q, mp->b_band);
2186 mutex_enter(QLOCK(q));
2187 } else {
2188 qbackenable(q, mp->b_band);
2189 }
2190 }
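
/*
 * Illustrative sketch ("is_wanted" is a hypothetical predicate): a module
 * that needs to pull a particular message out of the middle of its queue
 * typically freezes the stream, walks q_first, and calls rmvq() on the
 * match. rmvq() clears b_next, so the walk stops at the removed message.
 *
 *	mblk_t *mp;
 *
 *	freezestr(q);
 *	for (mp = q->q_first; mp != NULL; mp = mp->b_next) {
 *		if (is_wanted(mp)) {
 *			rmvq(q, mp);
 *			break;
 *		}
 *	}
 *	unfreezestr(q);
 */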
2191
2192 /*
2193 * Like rmvq() but without any backenabling.
2194 * This exists to handle SR_CONSOL_DATA in strrput().
2195 */
2196 void
2197 rmvq_noenab(queue_t *q, mblk_t *mp)
2198 {
2199 int i;
2200 qband_t *qbp = NULL;
2201 kthread_id_t freezer;
2202 int bytecnt = 0, mblkcnt = 0;
2203
2204 freezer = STREAM(q)->sd_freezer;
2205 if (freezer == curthread) {
2206 ASSERT(frozenstr(q));
2207 ASSERT(MUTEX_HELD(QLOCK(q)));
2208 } else if (MUTEX_HELD(QLOCK(q))) {
2209 /* Don't drop lock on exit */
2210 freezer = curthread;
2211 } else
2212 mutex_enter(QLOCK(q));
2213
2214 ASSERT(mp->b_band <= q->q_nband);
2215 if (mp->b_band != 0) { /* Adjust band pointers */
2216 ASSERT(q->q_bandp != NULL);
2217 qbp = q->q_bandp;
2218 i = mp->b_band;
2219 while (--i > 0)
2220 qbp = qbp->qb_next;
2221 if (mp == qbp->qb_first) {
2222 if (mp->b_next && mp->b_band == mp->b_next->b_band)
2223 qbp->qb_first = mp->b_next;
2224 else
2225 qbp->qb_first = NULL;
2226 }
2227 if (mp == qbp->qb_last) {
2228 if (mp->b_prev && mp->b_band == mp->b_prev->b_band)
2229 qbp->qb_last = mp->b_prev;
2230 else
2231 qbp->qb_last = NULL;
2232 }
2233 }
2234
2235 /*
2236 * Remove the message from the list.
2237 */
2238 if (mp->b_prev)
2239 mp->b_prev->b_next = mp->b_next;
2240 else
2241 q->q_first = mp->b_next;
2242 if (mp->b_next)
2243 mp->b_next->b_prev = mp->b_prev;
2244 else
2245 q->q_last = mp->b_prev;
2246 mp->b_next = NULL;
2247 mp->b_prev = NULL;
2248
2249 /* Get the size of the message for q_count accounting */
2250 bytecnt = mp_cont_len(mp, &mblkcnt);
2251
2252 if (mp->b_band == 0) { /* Perform q_count accounting */
2253 q->q_count -= bytecnt;
2254 q->q_mblkcnt -= mblkcnt;
2255 if (q->q_mblkcnt == 0 || ((q->q_count < q->q_hiwat) &&
2256 (q->q_mblkcnt < q->q_hiwat))) {
2257 q->q_flag &= ~QFULL;
2258 }
2259 } else { /* Perform qb_count accounting */
2260 qbp->qb_count -= bytecnt;
2261 qbp->qb_mblkcnt -= mblkcnt;
2262 if (qbp->qb_mblkcnt == 0 || ((qbp->qb_count < qbp->qb_hiwat) &&
2263 (qbp->qb_mblkcnt < qbp->qb_hiwat))) {
2264 qbp->qb_flag &= ~QB_FULL;
2265 }
2266 }
2267 if (freezer != curthread)
2268 mutex_exit(QLOCK(q));
2269
2270 STR_FTEVENT_MSG(mp, q, FTEV_RMVQ, 0);
2271 }
2272
2273 /*
2274 * Empty a queue.
2275 * If flag is set, remove all messages. Otherwise, remove
2276 * only non-control messages. If queue falls below its low
2277 * water mark, and QWANTW is set, enable the nearest upstream
2278 * service procedure.
2279 *
2280 * Historical note: when merging the M_FLUSH code in strrput with this
2281 * code one difference was discovered. flushq did not have a check
2282 * for q_lowat == 0 in the backenabling test.
2283 *
2284 * pcproto_flag specifies whether or not a M_PCPROTO message should be flushed
2285 * if one exists on the queue.
2286 */
2287 void
2288 flushq_common(queue_t *q, int flag, int pcproto_flag)
2289 {
2290 mblk_t *mp, *nmp;
2291 qband_t *qbp;
2292 int backenab = 0;
2293 unsigned char bpri;
2294 unsigned char qbf[NBAND]; /* band flushing backenable flags */
2295
2296 if (q->q_first == NULL)
2297 return;
2298
2299 mutex_enter(QLOCK(q));
2300 mp = q->q_first;
2301 q->q_first = NULL;
2302 q->q_last = NULL;
2303 q->q_count = 0;
2304 q->q_mblkcnt = 0;
2305 for (qbp = q->q_bandp; qbp; qbp = qbp->qb_next) {
2306 qbp->qb_first = NULL;
2307 qbp->qb_last = NULL;
2308 qbp->qb_count = 0;
2309 qbp->qb_mblkcnt = 0;
2310 qbp->qb_flag &= ~QB_FULL;
2311 }
2312 q->q_flag &= ~QFULL;
2313 mutex_exit(QLOCK(q));
2314 while (mp) {
2315 nmp = mp->b_next;
2316 mp->b_next = mp->b_prev = NULL;
2317
2318 STR_FTEVENT_MBLK(mp, q, FTEV_FLUSHQ, 0);
2319
2320 if (pcproto_flag && (mp->b_datap->db_type == M_PCPROTO))
2321 (void) putq(q, mp);
2322 else if (flag || datamsg(mp->b_datap->db_type))
2323 freemsg(mp);
2324 else
2325 (void) putq(q, mp);
2326 mp = nmp;
2327 }
2328 bpri = 1;
2329 mutex_enter(QLOCK(q));
2330 for (qbp = q->q_bandp; qbp; qbp = qbp->qb_next) {
2331 if ((qbp->qb_flag & QB_WANTW) &&
2332 (((qbp->qb_count < qbp->qb_lowat) &&
2333 (qbp->qb_mblkcnt < qbp->qb_lowat)) ||
2334 qbp->qb_lowat == 0)) {
2335 qbp->qb_flag &= ~QB_WANTW;
2336 backenab = 1;
2337 qbf[bpri] = 1;
2338 } else
2339 qbf[bpri] = 0;
2340 bpri++;
2341 }
2342 ASSERT(bpri == (unsigned char)(q->q_nband + 1));
2343 if ((q->q_flag & QWANTW) &&
2344 (((q->q_count < q->q_lowat) &&
2345 (q->q_mblkcnt < q->q_lowat)) || q->q_lowat == 0)) {
2346 q->q_flag &= ~QWANTW;
2347 backenab = 1;
2348 qbf[0] = 1;
2349 } else
2350 qbf[0] = 0;
2351
2352 /*
2353 * If any band can now be written to, and there is a writer
2354 * for that band, then backenable the closest service procedure.
2355 */
2356 if (backenab) {
2357 mutex_exit(QLOCK(q));
2358 for (bpri = q->q_nband; bpri != 0; bpri--)
2359 if (qbf[bpri])
2360 backenable(q, bpri);
2361 if (qbf[0])
2362 backenable(q, 0);
2363 } else
2364 mutex_exit(QLOCK(q));
2365 }
2366
2367 /*
2368 * The real flushing takes place in flushq_common. This is done so that
2369 * a flag can specify whether or not M_PCPROTO messages should be
2370 * flushed. Currently the only place that uses this flag is the stream head.
2371 */
2372 void
2373 flushq(queue_t *q, int flag)
2374 {
2375 flushq_common(q, flag, 0);
2376 }
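
/*
 * Illustrative sketch: the common M_FLUSH handling in a module's
 * write-side put procedure uses flushq() and then passes the message
 * along so the rest of the stream can flush as well.
 *
 *	case M_FLUSH:
 *		if (*mp->b_rptr & FLUSHW)
 *			flushq(q, FLUSHDATA);
 *		putnext(q, mp);
 *		break;
 */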
2377
2378 /*
2379 * Flush the queue of messages of the given priority band.
2380 * There is some duplication of code between flushq and flushband.
2381 * This is because we want to optimize the code as much as possible.
2382 * The assumption is that there will be more messages in the normal
2383 * (priority 0) band than in any other.
2384 *
2385 * Historical note: when merging the M_FLUSH code in strrput with this
2386 * code one difference was discovered. flushband had an extra check
2387 * for (mp->b_datap->db_type < QPCTL) in the band 0
2388 * case. That check does not match the man page for flushband and was not
2389 * in the strrput flush code, hence it was removed.
2390 */
2391 void
2392 flushband(queue_t *q, unsigned char pri, int flag)
2393 {
2394 mblk_t *mp;
2395 mblk_t *nmp;
2396 mblk_t *last;
2397 qband_t *qbp;
2398 int band;
2399
2400 ASSERT((flag == FLUSHDATA) || (flag == FLUSHALL));
2401 if (pri > q->q_nband) {
2402 return;
2403 }
2404 mutex_enter(QLOCK(q));
2405 if (pri == 0) {
2406 mp = q->q_first;
2407 q->q_first = NULL;
2408 q->q_last = NULL;
2409 q->q_count = 0;
2410 q->q_mblkcnt = 0;
2411 for (qbp = q->q_bandp; qbp; qbp = qbp->qb_next) {
2412 qbp->qb_first = NULL;
2413 qbp->qb_last = NULL;
2414 qbp->qb_count = 0;
2415 qbp->qb_mblkcnt = 0;
2416 qbp->qb_flag &= ~QB_FULL;
2417 }
2418 q->q_flag &= ~QFULL;
2419 mutex_exit(QLOCK(q));
2420 while (mp) {
2421 nmp = mp->b_next;
2422 mp->b_next = mp->b_prev = NULL;
2423 if ((mp->b_band == 0) &&
2424 ((flag == FLUSHALL) ||
2425 datamsg(mp->b_datap->db_type)))
2426 freemsg(mp);
2427 else
2428 (void) putq(q, mp);
2429 mp = nmp;
2430 }
2431 mutex_enter(QLOCK(q));
2432 if ((q->q_flag & QWANTW) &&
2433 (((q->q_count < q->q_lowat) &&
2434 (q->q_mblkcnt < q->q_lowat)) || q->q_lowat == 0)) {
2435 q->q_flag &= ~QWANTW;
2436 mutex_exit(QLOCK(q));
2437
2438 backenable(q, pri);
2439 } else
2440 mutex_exit(QLOCK(q));
2441 } else { /* pri != 0 */
2442 boolean_t flushed = B_FALSE;
2443 band = pri;
2444
2445 ASSERT(MUTEX_HELD(QLOCK(q)));
2446 qbp = q->q_bandp;
2447 while (--band > 0)
2448 qbp = qbp->qb_next;
2449 mp = qbp->qb_first;
2450 if (mp == NULL) {
2451 mutex_exit(QLOCK(q));
2452 return;
2453 }
2454 last = qbp->qb_last->b_next;
2455 /*
2456 * rmvq_noenab() and freemsg() are called for each mblk that
2457 * meets the criteria. The loop is executed until the last
2458 * mblk has been processed.
2459 */
2460 while (mp != last) {
2461 ASSERT(mp->b_band == pri);
2462 nmp = mp->b_next;
2463 if (flag == FLUSHALL || datamsg(mp->b_datap->db_type)) {
2464 rmvq_noenab(q, mp);
2465 freemsg(mp);
2466 flushed = B_TRUE;
2467 }
2468 mp = nmp;
2469 }
2470 mutex_exit(QLOCK(q));
2471
2472 /*
2473 * If any mblk(s) has been freed, we know that qbackenable()
2474 * will need to be called.
2475 */
2476 if (flushed)
2477 qbackenable(q, pri);
2478 }
2479 }
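
/*
 * Illustrative sketch: a module that supports banded data checks
 * FLUSHBAND in the M_FLUSH message and flushes only the indicated band,
 * falling back to flushq() for a whole-queue flush.
 *
 *	case M_FLUSH:
 *		if (*mp->b_rptr & FLUSHW) {
 *			if (*mp->b_rptr & FLUSHBAND)
 *				flushband(q, *(mp->b_rptr + 1), FLUSHDATA);
 *			else
 *				flushq(q, FLUSHDATA);
 *		}
 *		putnext(q, mp);
 *		break;
 */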
2480
2481 /*
2482 * Return 1 if the queue is not full. If the queue is full, return
2483 * 0 (may not put message) and set QWANTW flag (caller wants to write
2484 * to the queue).
2485 */
2486 int
2487 canput(queue_t *q)
2488 {
2489 TRACE_1(TR_FAC_STREAMS_FR, TR_CANPUT_IN, "canput:%p", q);
2490
2491 /* this is for loopback transports, they should not do a canput */
2492 ASSERT(STRMATED(q->q_stream) || STREAM(q) == STREAM(q->q_nfsrv));
2493
2494 /* Find next forward module that has a service procedure */
2495 q = q->q_nfsrv;
2496
2497 if (!(q->q_flag & QFULL)) {
2498 TRACE_2(TR_FAC_STREAMS_FR, TR_CANPUT_OUT, "canput:%p %d", q, 1);
2499 return (1);
2500 }
2501 mutex_enter(QLOCK(q));
2502 if (q->q_flag & QFULL) {
2503 q->q_flag |= QWANTW;
2504 mutex_exit(QLOCK(q));
2505 TRACE_2(TR_FAC_STREAMS_FR, TR_CANPUT_OUT, "canput:%p %d", q, 0);
2506 return (0);
2507 }
2508 mutex_exit(QLOCK(q));
2509 TRACE_2(TR_FAC_STREAMS_FR, TR_CANPUT_OUT, "canput:%p %d", q, 1);
2510 return (1);
2511 }
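
/*
 * Illustrative sketch: a put procedure that honors downstream flow
 * control checks before forwarding; canputnext(q) is the MT-safe
 * equivalent of canput(q->q_next) and is what modules normally call.
 *
 *	if (canputnext(q))
 *		putnext(q, mp);
 *	else
 *		(void) putq(q, mp);
 */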
2512
2513 /*
2514 * This is the new canput for use with priority bands. Return 1 if the
2515 * band is not full. If the band is full, return 0 (may not put message)
2516 * and set QWANTW(QB_WANTW) flag for zero(non-zero) band (caller wants to
2517 * write to the queue).
2518 */
2519 int
2520 bcanput(queue_t *q, unsigned char pri)
2521 {
2522 qband_t *qbp;
2523
2524 TRACE_2(TR_FAC_STREAMS_FR, TR_BCANPUT_IN, "bcanput:%p %p", q, pri);
2525 if (!q)
2526 return (0);
2527
2528 /* Find next forward module that has a service procedure */
2529 q = q->q_nfsrv;
2530
2531 mutex_enter(QLOCK(q));
2532 if (pri == 0) {
2533 if (q->q_flag & QFULL) {
2534 q->q_flag |= QWANTW;
2535 mutex_exit(QLOCK(q));
2536 TRACE_3(TR_FAC_STREAMS_FR, TR_BCANPUT_OUT,
2537 "bcanput:%p %X %d", q, pri, 0);
2538 return (0);
2539 }
2540 } else { /* pri != 0 */
2541 if (pri > q->q_nband) {
2542 /*
2543 * No band exists yet, so return success.
2544 */
2545 mutex_exit(QLOCK(q));
2546 TRACE_3(TR_FAC_STREAMS_FR, TR_BCANPUT_OUT,
2547 "bcanput:%p %X %d", q, pri, 1);
2548 return (1);
2549 }
2550 qbp = q->q_bandp;
2551 while (--pri)
2552 qbp = qbp->qb_next;
2553 if (qbp->qb_flag & QB_FULL) {
2554 qbp->qb_flag |= QB_WANTW;
2555 mutex_exit(QLOCK(q));
2556 TRACE_3(TR_FAC_STREAMS_FR, TR_BCANPUT_OUT,
2557 "bcanput:%p %X %d", q, pri, 0);
2558 return (0);
2559 }
2560 }
2561 mutex_exit(QLOCK(q));
2562 TRACE_3(TR_FAC_STREAMS_FR, TR_BCANPUT_OUT,
2563 "bcanput:%p %X %d", q, pri, 1);
2564 return (1);
2565 }
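
/*
 * Illustrative sketch: for banded messages the same check is made per
 * band, normally via bcanputnext(), before forwarding.
 *
 *	if (bcanputnext(q, mp->b_band))
 *		putnext(q, mp);
 *	else
 *		(void) putq(q, mp);
 */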
2566
2567 /*
2568 * Put a message on a queue.
2569 *
2570 * Messages are enqueued on a priority basis. The priority classes
2571 * are HIGH PRIORITY (type >= QPCTL), PRIORITY (type < QPCTL && band > 0),
2572 * and B_NORMAL (type < QPCTL && band == 0).
2573 *
2574 * Add appropriate weighted data block sizes to queue count.
2575 * If queue hits high water mark then set QFULL flag.
2576 *
2577 * If QNOENAB is not set (putq is allowed to enable the queue),
2578 * enable the queue only if the message is PRIORITY,
2579 * or the QWANTR flag is set (indicating that the service procedure
2580 * is ready to read the queue). This implies that a service
2581 * procedure must NEVER put a high priority message back on its own
2582 * queue, as this would result in an infinite loop (!).
2583 */
2584 int
2585 putq(queue_t *q, mblk_t *bp)
2586 {
2587 mblk_t *tmp;
2588 qband_t *qbp = NULL;
2589 int mcls = (int)queclass(bp);
2590 kthread_id_t freezer;
2591 int bytecnt = 0, mblkcnt = 0;
2592
2593 freezer = STREAM(q)->sd_freezer;
2594 if (freezer == curthread) {
2595 ASSERT(frozenstr(q));
2596 ASSERT(MUTEX_HELD(QLOCK(q)));
2597 } else
2598 mutex_enter(QLOCK(q));
2599
2600 /*
2601 * Make sanity checks and if qband structure is not yet
2602 * allocated, do so.
2603 */
2604 if (mcls == QPCTL) {
2605 if (bp->b_band != 0)
2606 bp->b_band = 0; /* force to be correct */
2607 } else if (bp->b_band != 0) {
2608 int i;
2609 qband_t **qbpp;
2610
2611 if (bp->b_band > q->q_nband) {
2612
2613 /*
2614 * The qband structure for this priority band is
2615 * not on the queue yet, so we have to allocate
2616 * one on the fly. It would be wasteful to
2617 * associate the qband structures with every
2618 * queue when the queues are allocated. This is
2619 * because most queues will only need the normal
2620 * band of flow which can be described entirely
2621 * by the queue itself.
2622 */
2623 qbpp = &q->q_bandp;
2624 while (*qbpp)
2625 qbpp = &(*qbpp)->qb_next;
2626 while (bp->b_band > q->q_nband) {
2627 if ((*qbpp = allocband()) == NULL) {
2628 if (freezer != curthread)
2629 mutex_exit(QLOCK(q));
2630 return (0);
2631 }
2632 (*qbpp)->qb_hiwat = q->q_hiwat;
2633 (*qbpp)->qb_lowat = q->q_lowat;
2634 q->q_nband++;
2635 qbpp = &(*qbpp)->qb_next;
2636 }
2637 }
2638 ASSERT(MUTEX_HELD(QLOCK(q)));
2639 qbp = q->q_bandp;
2640 i = bp->b_band;
2641 while (--i)
2642 qbp = qbp->qb_next;
2643 }
2644
2645 /*
2646 * If queue is empty, add the message and initialize the pointers.
2647 * Otherwise, adjust message pointers and queue pointers based on
2648 * the type of the message and where it belongs on the queue. Some
2649 * code is duplicated to minimize the number of conditionals and
2650 * hopefully minimize the amount of time this routine takes.
2651 */
2652 if (!q->q_first) {
2653 bp->b_next = NULL;
2654 bp->b_prev = NULL;
2655 q->q_first = bp;
2656 q->q_last = bp;
2657 if (qbp) {
2658 qbp->qb_first = bp;
2659 qbp->qb_last = bp;
2660 }
2661 } else if (!qbp) { /* bp->b_band == 0 */
2662
2663 /*
2664 * If queue class of message is less than or equal to
2665 * that of the last one on the queue, tack on to the end.
2666 */
2667 tmp = q->q_last;
2668 if (mcls <= (int)queclass(tmp)) {
2669 bp->b_next = NULL;
2670 bp->b_prev = tmp;
2671 tmp->b_next = bp;
2672 q->q_last = bp;
2673 } else {
2674 tmp = q->q_first;
2675 while ((int)queclass(tmp) >= mcls)
2676 tmp = tmp->b_next;
2677
2678 /*
2679 * Insert bp before tmp.
2680 */
2681 bp->b_next = tmp;
2682 bp->b_prev = tmp->b_prev;
2683 if (tmp->b_prev)
2684 tmp->b_prev->b_next = bp;
2685 else
2686 q->q_first = bp;
2687 tmp->b_prev = bp;
2688 }
2689 } else { /* bp->b_band != 0 */
2690 if (qbp->qb_first) {
2691 tmp = qbp->qb_last;
2692
2693 /*
2694 * Insert bp after the last message in this band.
2695 */
2696 bp->b_next = tmp->b_next;
2697 if (tmp->b_next)
2698 tmp->b_next->b_prev = bp;
2699 else
2700 q->q_last = bp;
2701 bp->b_prev = tmp;
2702 tmp->b_next = bp;
2703 } else {
2704 tmp = q->q_last;
2705 if ((mcls < (int)queclass(tmp)) ||
2706 (bp->b_band <= tmp->b_band)) {
2707
2708 /*
2709 * Tack bp on end of queue.
2710 */
2711 bp->b_next = NULL;
2712 bp->b_prev = tmp;
2713 tmp->b_next = bp;
2714 q->q_last = bp;
2715 } else {
2716 tmp = q->q_first;
2717 while (tmp->b_datap->db_type >= QPCTL)
2718 tmp = tmp->b_next;
2719 while (tmp->b_band >= bp->b_band)
2720 tmp = tmp->b_next;
2721
2722 /*
2723 * Insert bp before tmp.
2724 */
2725 bp->b_next = tmp;
2726 bp->b_prev = tmp->b_prev;
2727 if (tmp->b_prev)
2728 tmp->b_prev->b_next = bp;
2729 else
2730 q->q_first = bp;
2731 tmp->b_prev = bp;
2732 }
2733 qbp->qb_first = bp;
2734 }
2735 qbp->qb_last = bp;
2736 }
2737
2738 /* Get message byte count for q_count accounting */
2739 bytecnt = mp_cont_len(bp, &mblkcnt);
2740
2741 if (qbp) {
2742 qbp->qb_count += bytecnt;
2743 qbp->qb_mblkcnt += mblkcnt;
2744 if ((qbp->qb_count >= qbp->qb_hiwat) ||
2745 (qbp->qb_mblkcnt >= qbp->qb_hiwat)) {
2746 qbp->qb_flag |= QB_FULL;
2747 }
2748 } else {
2749 q->q_count += bytecnt;
2750 q->q_mblkcnt += mblkcnt;
2751 if ((q->q_count >= q->q_hiwat) ||
2752 (q->q_mblkcnt >= q->q_hiwat)) {
2753 q->q_flag |= QFULL;
2754 }
2755 }
2756
2757 STR_FTEVENT_MSG(bp, q, FTEV_PUTQ, 0);
2758
2759 if ((mcls > QNORM) ||
2760 (canenable(q) && (q->q_flag & QWANTR || bp->b_band)))
2761 qenable_locked(q);
2762 ASSERT(MUTEX_HELD(QLOCK(q)));
2763 if (freezer != curthread)
2764 mutex_exit(QLOCK(q));
2765
2766 return (1);
2767 }
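
/*
 * Illustrative sketch ("xx_wput" is hypothetical): a typical write-side
 * put procedure defers M_DATA to the service procedure with putq() when
 * messages are already queued or the downstream queue is full, which
 * preserves ordering and lets the flow-control accounting above operate.
 *
 *	static int
 *	xx_wput(queue_t *q, mblk_t *mp)
 *	{
 *		if (DB_TYPE(mp) == M_DATA &&
 *		    (q->q_first != NULL || !canputnext(q)))
 *			(void) putq(q, mp);
 *		else
 *			putnext(q, mp);
 *		return (0);
 *	}
 */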
2768
2769 /*
2770 * Put stuff back at beginning of Q according to priority order.
2771 * See comment on putq above for details.
2772 */
2773 int
2774 putbq(queue_t *q, mblk_t *bp)
2775 {
2776 mblk_t *tmp;
2777 qband_t *qbp = NULL;
2778 int mcls = (int)queclass(bp);
2779 kthread_id_t freezer;
2780 int bytecnt = 0, mblkcnt = 0;
2781
2782 ASSERT(q && bp);
2783 ASSERT(bp->b_next == NULL);
2784 freezer = STREAM(q)->sd_freezer;
2785 if (freezer == curthread) {
2786 ASSERT(frozenstr(q));
2787 ASSERT(MUTEX_HELD(QLOCK(q)));
2788 } else
2789 mutex_enter(QLOCK(q));
2790
2791 /*
2792 * Make sanity checks and if qband structure is not yet
2793 * allocated, do so.
2794 */
2795 if (mcls == QPCTL) {
2796 if (bp->b_band != 0)
2797 bp->b_band = 0; /* force to be correct */
2798 } else if (bp->b_band != 0) {
2799 int i;
2800 qband_t **qbpp;
2801
2802 if (bp->b_band > q->q_nband) {
2803 qbpp = &q->q_bandp;
2804 while (*qbpp)
2805 qbpp = &(*qbpp)->qb_next;
2806 while (bp->b_band > q->q_nband) {
2807 if ((*qbpp = allocband()) == NULL) {
2808 if (freezer != curthread)
2809 mutex_exit(QLOCK(q));
2810 return (0);
2811 }
2812 (*qbpp)->qb_hiwat = q->q_hiwat;
2813 (*qbpp)->qb_lowat = q->q_lowat;
2814 q->q_nband++;
2815 qbpp = &(*qbpp)->qb_next;
2816 }
2817 }
2818 qbp = q->q_bandp;
2819 i = bp->b_band;
2820 while (--i)
2821 qbp = qbp->qb_next;
2822 }
2823
2824 /*
2825 * If queue is empty or if message is high priority,
2826 * place on the front of the queue.
2827 */
2828 tmp = q->q_first;
2829 if ((!tmp) || (mcls == QPCTL)) {
2830 bp->b_next = tmp;
2831 if (tmp)
2832 tmp->b_prev = bp;
2833 else
2834 q->q_last = bp;
2835 q->q_first = bp;
2836 bp->b_prev = NULL;
2837 if (qbp) {
2838 qbp->qb_first = bp;
2839 qbp->qb_last = bp;
2840 }
2841 } else if (qbp) { /* bp->b_band != 0 */
2842 tmp = qbp->qb_first;
2843 if (tmp) {
2844
2845 /*
2846 * Insert bp before the first message in this band.
2847 */
2848 bp->b_next = tmp;
2849 bp->b_prev = tmp->b_prev;
2850 if (tmp->b_prev)
2851 tmp->b_prev->b_next = bp;
2852 else
2853 q->q_first = bp;
2854 tmp->b_prev = bp;
2855 } else {
2856 tmp = q->q_last;
2857 if ((mcls < (int)queclass(tmp)) ||
2858 (bp->b_band < tmp->b_band)) {
2859
2860 /*
2861 * Tack bp on end of queue.
2862 */
2863 bp->b_next = NULL;
2864 bp->b_prev = tmp;
2865 tmp->b_next = bp;
2866 q->q_last = bp;
2867 } else {
2868 tmp = q->q_first;
2869 while (tmp->b_datap->db_type >= QPCTL)
2870 tmp = tmp->b_next;
2871 while (tmp->b_band > bp->b_band)
2872 tmp = tmp->b_next;
2873
2874 /*
2875 * Insert bp before tmp.
2876 */
2877 bp->b_next = tmp;
2878 bp->b_prev = tmp->b_prev;
2879 if (tmp->b_prev)
2880 tmp->b_prev->b_next = bp;
2881 else
2882 q->q_first = bp;
2883 tmp->b_prev = bp;
2884 }
2885 qbp->qb_last = bp;
2886 }
2887 qbp->qb_first = bp;
2888 } else { /* bp->b_band == 0 && !QPCTL */
2889
2890 /*
2891 * If the queue class or band is less than that of the last
2892 * message on the queue, tack bp on the end of the queue.
2893 */
2894 tmp = q->q_last;
2895 if ((mcls < (int)queclass(tmp)) || (bp->b_band < tmp->b_band)) {
2896 bp->b_next = NULL;
2897 bp->b_prev = tmp;
2898 tmp->b_next = bp;
2899 q->q_last = bp;
2900 } else {
2901 tmp = q->q_first;
2902 while (tmp->b_datap->db_type >= QPCTL)
2903 tmp = tmp->b_next;
2904 while (tmp->b_band > bp->b_band)
2905 tmp = tmp->b_next;
2906
2907 /*
2908 * Insert bp before tmp.
2909 */
2910 bp->b_next = tmp;
2911 bp->b_prev = tmp->b_prev;
2912 if (tmp->b_prev)
2913 tmp->b_prev->b_next = bp;
2914 else
2915 q->q_first = bp;
2916 tmp->b_prev = bp;
2917 }
2918 }
2919
2920 /* Get message byte count for q_count accounting */
2921 bytecnt = mp_cont_len(bp, &mblkcnt);
2922
2923 if (qbp) {
2924 qbp->qb_count += bytecnt;
2925 qbp->qb_mblkcnt += mblkcnt;
2926 if ((qbp->qb_count >= qbp->qb_hiwat) ||
2927 (qbp->qb_mblkcnt >= qbp->qb_hiwat)) {
2928 qbp->qb_flag |= QB_FULL;
2929 }
2930 } else {
2931 q->q_count += bytecnt;
2932 q->q_mblkcnt += mblkcnt;
2933 if ((q->q_count >= q->q_hiwat) ||
2934 (q->q_mblkcnt >= q->q_hiwat)) {
2935 q->q_flag |= QFULL;
2936 }
2937 }
2938
2939 STR_FTEVENT_MSG(bp, q, FTEV_PUTBQ, 0);
2940
2941 if ((mcls > QNORM) || (canenable(q) && (q->q_flag & QWANTR)))
2942 qenable_locked(q);
2943 ASSERT(MUTEX_HELD(QLOCK(q)));
2944 if (freezer != curthread)
2945 mutex_exit(QLOCK(q));
2946
2947 return (1);
2948 }
2949
2950 /*
2951 * Insert a message before an existing message on the queue. If the
2952 * existing message is NULL, the new message is placed on the end of
2953 * the queue. The queue class of the new message is ignored. However,
2954 * the priority band of the new message must adhere to the following
2955 * ordering:
2956 *
2957 * emp->b_prev->b_band >= mp->b_band >= emp->b_band.
2958 *
2959 * All flow control parameters are updated.
2960 *
2961 * insq can be called with the stream frozen, by other utility functions
2962 * holding QLOCK, and by streams modules with no locks held and the stream
2962 * unfrozen.
2963 */
2964 int
2965 insq(queue_t *q, mblk_t *emp, mblk_t *mp)
2966 {
2967 mblk_t *tmp;
2968 qband_t *qbp = NULL;
2969 int mcls = (int)queclass(mp);
2970 kthread_id_t freezer;
2971 int bytecnt = 0, mblkcnt = 0;
2972
2973 freezer = STREAM(q)->sd_freezer;
2974 if (freezer == curthread) {
2975 ASSERT(frozenstr(q));
2976 ASSERT(MUTEX_HELD(QLOCK(q)));
2977 } else if (MUTEX_HELD(QLOCK(q))) {
2978 /* Don't drop lock on exit */
2979 freezer = curthread;
2980 } else
2981 mutex_enter(QLOCK(q));
2982
2983 if (mcls == QPCTL) {
2984 if (mp->b_band != 0)
2985 mp->b_band = 0; /* force to be correct */
2986 if (emp && emp->b_prev &&
2987 (emp->b_prev->b_datap->db_type < QPCTL))
2988 goto badord;
2989 }
2990 if (emp) {
2991 if (((mcls == QNORM) && (mp->b_band < emp->b_band)) ||
2992 (emp->b_prev && (emp->b_prev->b_datap->db_type < QPCTL) &&
2993 (emp->b_prev->b_band < mp->b_band))) {
2994 goto badord;
2995 }
2996 } else {
2997 tmp = q->q_last;
2998 if (tmp && (mcls == QNORM) && (mp->b_band > tmp->b_band)) {
2999 badord:
3000 cmn_err(CE_WARN,
3001 "insq: attempt to insert message out of order "
3002 "on q %p", (void *)q);
3003 if (freezer != curthread)
3004 mutex_exit(QLOCK(q));
3005 return (0);
3006 }
3007 }
3008
3009 if (mp->b_band != 0) {
3010 int i;
3011 qband_t **qbpp;
3012
3013 if (mp->b_band > q->q_nband) {
3014 qbpp = &q->q_bandp;
3015 while (*qbpp)
3016 qbpp = &(*qbpp)->qb_next;
3017 while (mp->b_band > q->q_nband) {
3018 if ((*qbpp = allocband()) == NULL) {
3019 if (freezer != curthread)
3020 mutex_exit(QLOCK(q));
3021 return (0);
3022 }
3023 (*qbpp)->qb_hiwat = q->q_hiwat;
3024 (*qbpp)->qb_lowat = q->q_lowat;
3025 q->q_nband++;
3026 qbpp = &(*qbpp)->qb_next;
3027 }
3028 }
3029 qbp = q->q_bandp;
3030 i = mp->b_band;
3031 while (--i)
3032 qbp = qbp->qb_next;
3033 }
3034
3035 if ((mp->b_next = emp) != NULL) {
3036 if ((mp->b_prev = emp->b_prev) != NULL)
3037 emp->b_prev->b_next = mp;
3038 else
3039 q->q_first = mp;
3040 emp->b_prev = mp;
3041 } else {
3042 if ((mp->b_prev = q->q_last) != NULL)
3043 q->q_last->b_next = mp;
3044 else
3045 q->q_first = mp;
3046 q->q_last = mp;
3047 }
3048
3049 /* Get mblk and byte count for q_count accounting */
3050 bytecnt = mp_cont_len(mp, &mblkcnt);
3051
3052 if (qbp) { /* adjust qband pointers and count */
3053 if (!qbp->qb_first) {
3054 qbp->qb_first = mp;
3055 qbp->qb_last = mp;
3056 } else {
3057 if (mp->b_prev == NULL || (mp->b_prev != NULL &&
3058 (mp->b_prev->b_band != mp->b_band)))
3059 qbp->qb_first = mp;
3060 else if (mp->b_next == NULL || (mp->b_next != NULL &&
3061 (mp->b_next->b_band != mp->b_band)))
3062 qbp->qb_last = mp;
3063 }
3064 qbp->qb_count += bytecnt;
3065 qbp->qb_mblkcnt += mblkcnt;
3066 if ((qbp->qb_count >= qbp->qb_hiwat) ||
3067 (qbp->qb_mblkcnt >= qbp->qb_hiwat)) {
3068 qbp->qb_flag |= QB_FULL;
3069 }
3070 } else {
3071 q->q_count += bytecnt;
3072 q->q_mblkcnt += mblkcnt;
3073 if ((q->q_count >= q->q_hiwat) ||
3074 (q->q_mblkcnt >= q->q_hiwat)) {
3075 q->q_flag |= QFULL;
3076 }
3077 }
3078
3079 STR_FTEVENT_MSG(mp, q, FTEV_INSQ, 0);
3080
3081 if (canenable(q) && (q->q_flag & QWANTR))
3082 qenable_locked(q);
3083
3084 ASSERT(MUTEX_HELD(QLOCK(q)));
3085 if (freezer != curthread)
3086 mutex_exit(QLOCK(q));
3087
3088 return (1);
3089 }
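
/*
 * Illustrative sketch: insq() is normally used with the stream frozen,
 * for example to slip a message in ahead of the first queued message.
 *
 *	freezestr(q);
 *	(void) insq(q, q->q_first, mp);
 *	unfreezestr(q);
 */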
3090
3091 /*
3092 * Create and put a control message on queue.
3093 */
3094 int
3095 putctl(queue_t *q, int type)
3096 {
3097 mblk_t *bp;
3098
3099 if ((datamsg(type) && (type != M_DELAY)) ||
3100 (bp = allocb_tryhard(0)) == NULL)
3101 return (0);
3102 bp->b_datap->db_type = (unsigned char) type;
3103
3104 put(q, bp);
3105
3106 return (1);
3107 }
3108
3109 /*
3110 * Control message with a single-byte parameter
3111 */
3112 int
3113 putctl1(queue_t *q, int type, int param)
3114 {
3115 mblk_t *bp;
3116
3117 if ((datamsg(type) && (type != M_DELAY)) ||
3118 (bp = allocb_tryhard(1)) == NULL)
3119 return (0);
3120 bp->b_datap->db_type = (unsigned char)type;
3121 *bp->b_wptr++ = (unsigned char)param;
3122
3123 put(q, bp);
3124
3125 return (1);
3126 }
3127
3128 int
3129 putnextctl1(queue_t *q, int type, int param)
3130 {
3131 mblk_t *bp;
3132
3133 if ((datamsg(type) && (type != M_DELAY)) ||
3134 ((bp = allocb_tryhard(1)) == NULL))
3135 return (0);
3136
3137 bp->b_datap->db_type = (unsigned char)type;
3138 *bp->b_wptr++ = (unsigned char)param;
3139
3140 putnext(q, bp);
3141
3142 return (1);
3143 }
3144
3145 int
3146 putnextctl(queue_t *q, int type)
3147 {
3148 mblk_t *bp;
3149
3150 if ((datamsg(type) && (type != M_DELAY)) ||
3151 ((bp = allocb_tryhard(0)) == NULL))
3152 return (0);
3153 bp->b_datap->db_type = (unsigned char)type;
3154
3155 putnext(q, bp);
3156
3157 return (1);
3158 }
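
/*
 * Illustrative sketch: a read-side module that wants the stream head to
 * flush read-side data commonly sends a one-byte M_FLUSH upstream with
 * putnextctl1(); a return of 0 means the allocation failed.
 *
 *	if (!putnextctl1(q, M_FLUSH, FLUSHR))
 *		cmn_err(CE_NOTE, "could not allocate M_FLUSH");
 */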
3159
3160 /*
3161 * Return the queue upstream from this one
3162 */
3163 queue_t *
3164 backq(queue_t *q)
3165 {
3166 q = _OTHERQ(q);
3167 if (q->q_next) {
3168 q = q->q_next;
3169 return (_OTHERQ(q));
3170 }
3171 return (NULL);
3172 }
3173
3174 /*
3175 * Send a block back up the queue in reverse from this
3176 * one (e.g. to respond to ioctls)
3177 */
3178 void
3179 qreply(queue_t *q, mblk_t *bp)
3180 {
3181 ASSERT(q && bp);
3182
3183 putnext(_OTHERQ(q), bp);
3184 }
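
/*
 * Illustrative sketch: the classic use of qreply() is turning an M_IOCTL
 * received on the write side into an acknowledgement on the read side.
 *
 *	case M_IOCTL:
 *		mp->b_datap->db_type = M_IOCNAK;
 *		qreply(q, mp);
 *		break;
 */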
3185
3186 /*
3187 * Streams Queue Scheduling
3188 *
3189 * Queues are enabled through qenable() when they have messages to
3190 * process. They are serviced by queuerun(), which runs each enabled
3191 * queue's service procedure. The call to queuerun() is processor
3192 * dependent - the general principle is that it be run whenever a queue
3193 * is enabled but before returning to user level. For system calls,
3194 * the function runqueues() is called if their action causes a queue
3195 * to be enabled. For device interrupts, queuerun() should be
3196 * called before returning from the last level of interrupt. Beyond
3197 * this, no timing assumptions should be made about queue scheduling.
3198 */
3199
3200 /*
3201 * Enable a queue: put it on list of those whose service procedures are
3202 * ready to run and set up the scheduling mechanism.
3203 * The broadcast is done outside the mutex to avoid the woken thread
3204 * contending with the mutex. This is OK because the queue has been
3205 * enqueued on the runlist and flagged safely at this point.
3206 */
3207 void
3208 qenable(queue_t *q)
3209 {
3210 mutex_enter(QLOCK(q));
3211 qenable_locked(q);
3212 mutex_exit(QLOCK(q));
3213 }
3214 /*
3215 * Return number of messages on queue
3216 */
3217 int
3218 qsize(queue_t *qp)
3219 {
3220 int count = 0;
3221 mblk_t *mp;
3222
3223 mutex_enter(QLOCK(qp));
3224 for (mp = qp->q_first; mp; mp = mp->b_next)
3225 count++;
3226 mutex_exit(QLOCK(qp));
3227 return (count);
3228 }
3229
3230 /*
3231 * noenable - set queue so that putq() will not enable it.
3232 * enableok - set queue so that putq() can enable it.
3233 */
3234 void
3235 noenable(queue_t *q)
3236 {
3237 mutex_enter(QLOCK(q));
3238 q->q_flag |= QNOENB;
3239 mutex_exit(QLOCK(q));
3240 }
3241
3242 void
3243 enableok(queue_t *q)
3244 {
3245 mutex_enter(QLOCK(q));
3246 q->q_flag &= ~QNOENB;
3247 mutex_exit(QLOCK(q));
3248 }
3249
3250 /*
3251 * Set queue fields.
3252 */
3253 int
3254 strqset(queue_t *q, qfields_t what, unsigned char pri, intptr_t val)
3255 {
3256 qband_t *qbp = NULL;
3257 queue_t *wrq;
3258 int error = 0;
3259 kthread_id_t freezer;
3260
3261 freezer = STREAM(q)->sd_freezer;
3262 if (freezer == curthread) {
3263 ASSERT(frozenstr(q));
3264 ASSERT(MUTEX_HELD(QLOCK(q)));
3265 } else
3266 mutex_enter(QLOCK(q));
3267
3268 if (what >= QBAD) {
3269 error = EINVAL;
3270 goto done;
3271 }
3272 if (pri != 0) {
3273 int i;
3274 qband_t **qbpp;
3275
3276 if (pri > q->q_nband) {
3277 qbpp = &q->q_bandp;
3278 while (*qbpp)
3279 qbpp = &(*qbpp)->qb_next;
3280 while (pri > q->q_nband) {
3281 if ((*qbpp = allocband()) == NULL) {
3282 error = EAGAIN;
3283 goto done;
3284 }
3285 (*qbpp)->qb_hiwat = q->q_hiwat;
3286 (*qbpp)->qb_lowat = q->q_lowat;
3287 q->q_nband++;
3288 qbpp = &(*qbpp)->qb_next;
3289 }
3290 }
3291 qbp = q->q_bandp;
3292 i = pri;
3293 while (--i)
3294 qbp = qbp->qb_next;
3295 }
3296 switch (what) {
3297
3298 case QHIWAT:
3299 if (qbp)
3300 qbp->qb_hiwat = (size_t)val;
3301 else
3302 q->q_hiwat = (size_t)val;
3303 break;
3304
3305 case QLOWAT:
3306 if (qbp)
3307 qbp->qb_lowat = (size_t)val;
3308 else
3309 q->q_lowat = (size_t)val;
3310 break;
3311
3312 case QMAXPSZ:
3313 if (qbp)
3314 error = EINVAL;
3315 else
3316 q->q_maxpsz = (ssize_t)val;
3317
3318 * Performance concern: strwrite looks at the module below
3319 * the stream head for the maxpsz each time it does a write,
3320 * so we now cache it at the stream head. Check to see if this
3321 * we now cache it at the stream head. Check to see if this
3322 * queue is sitting directly below the stream head.
3323 */
3324 wrq = STREAM(q)->sd_wrq;
3325 if (q != wrq->q_next)
3326 break;
3327
3328 /*
3329 * If the stream is not frozen drop the current QLOCK and
3330 * acquire the sd_wrq QLOCK which protects sd_qn_*
3331 */
3332 if (freezer != curthread) {
3333 mutex_exit(QLOCK(q));
3334 mutex_enter(QLOCK(wrq));
3335 }
3336 ASSERT(MUTEX_HELD(QLOCK(wrq)));
3337
3338 if (strmsgsz != 0) {
3339 if (val == INFPSZ)
3340 val = strmsgsz;
3341 else {
3342 if (STREAM(q)->sd_vnode->v_type == VFIFO)
3343 val = MIN(PIPE_BUF, val);
3344 else
3345 val = MIN(strmsgsz, val);
3346 }
3347 }
3348 STREAM(q)->sd_qn_maxpsz = val;
3349 if (freezer != curthread) {
3350 mutex_exit(QLOCK(wrq));
3351 mutex_enter(QLOCK(q));
3352 }
3353 break;
3354
3355 case QMINPSZ:
3356 if (qbp)
3357 error = EINVAL;
3358 else
3359 q->q_minpsz = (ssize_t)val;
3360
3361 /*
3362 * Performance concern: strwrite looks at the module below
3363 * the stream head for the maxpsz each time it does a write,
3364 * so we now cache it at the stream head. Check to see if this
3365 * queue is sitting directly below the stream head.
3366 */
3367 wrq = STREAM(q)->sd_wrq;
3368 if (q != wrq->q_next)
3369 break;
3370
3371 /*
3372 * If the stream is not frozen drop the current QLOCK and
3373 * acquire the sd_wrq QLOCK which protects sd_qn_*
3374 */
3375 if (freezer != curthread) {
3376 mutex_exit(QLOCK(q));
3377 mutex_enter(QLOCK(wrq));
3378 }
3379 STREAM(q)->sd_qn_minpsz = (ssize_t)val;
3380
3381 if (freezer != curthread) {
3382 mutex_exit(QLOCK(wrq));
3383 mutex_enter(QLOCK(q));
3384 }
3385 break;
3386
3387 case QSTRUIOT:
3388 if (qbp)
3389 error = EINVAL;
3390 else
3391 q->q_struiot = (ushort_t)val;
3392 break;
3393
3394 case QCOUNT:
3395 case QFIRST:
3396 case QLAST:
3397 case QFLAG:
3398 error = EPERM;
3399 break;
3400
3401 default:
3402 error = EINVAL;
3403 break;
3404 }
3405 done:
3406 if (freezer != curthread)
3407 mutex_exit(QLOCK(q));
3408 return (error);
3409 }
3410
3411 /*
3412 * Get queue fields.
3413 */
3414 int
3415 strqget(queue_t *q, qfields_t what, unsigned char pri, void *valp)
3416 {
3417 qband_t *qbp = NULL;
3418 int error = 0;
3419 kthread_id_t freezer;
3420
3421 freezer = STREAM(q)->sd_freezer;
3422 if (freezer == curthread) {
3423 ASSERT(frozenstr(q));
3424 ASSERT(MUTEX_HELD(QLOCK(q)));
3425 } else
3426 mutex_enter(QLOCK(q));
3427 if (what >= QBAD) {
3428 error = EINVAL;
3429 goto done;
3430 }
3431 if (pri != 0) {
3432 int i;
3433 qband_t **qbpp;
3434
3435 if (pri > q->q_nband) {
3436 qbpp = &q->q_bandp;
3437 while (*qbpp)
3438 qbpp = &(*qbpp)->qb_next;
3439 while (pri > q->q_nband) {
3440 if ((*qbpp = allocband()) == NULL) {
3441 error = EAGAIN;
3442 goto done;
3443 }
3444 (*qbpp)->qb_hiwat = q->q_hiwat;
3445 (*qbpp)->qb_lowat = q->q_lowat;
3446 q->q_nband++;
3447 qbpp = &(*qbpp)->qb_next;
3448 }
3449 }
3450 qbp = q->q_bandp;
3451 i = pri;
3452 while (--i)
3453 qbp = qbp->qb_next;
3454 }
3455 switch (what) {
3456 case QHIWAT:
3457 if (qbp)
3458 *(size_t *)valp = qbp->qb_hiwat;
3459 else
3460 *(size_t *)valp = q->q_hiwat;
3461 break;
3462
3463 case QLOWAT:
3464 if (qbp)
3465 *(size_t *)valp = qbp->qb_lowat;
3466 else
3467 *(size_t *)valp = q->q_lowat;
3468 break;
3469
3470 case QMAXPSZ:
3471 if (qbp)
3472 error = EINVAL;
3473 else
3474 *(ssize_t *)valp = q->q_maxpsz;
3475 break;
3476
3477 case QMINPSZ:
3478 if (qbp)
3479 error = EINVAL;
3480 else
3481 *(ssize_t *)valp = q->q_minpsz;
3482 break;
3483
3484 case QCOUNT:
3485 if (qbp)
3486 *(size_t *)valp = qbp->qb_count;
3487 else
3488 *(size_t *)valp = q->q_count;
3489 break;
3490
3491 case QFIRST:
3492 if (qbp)
3493 *(mblk_t **)valp = qbp->qb_first;
3494 else
3495 *(mblk_t **)valp = q->q_first;
3496 break;
3497
3498 case QLAST:
3499 if (qbp)
3500 *(mblk_t **)valp = qbp->qb_last;
3501 else
3502 *(mblk_t **)valp = q->q_last;
3503 break;
3504
3505 case QFLAG:
3506 if (qbp)
3507 *(uint_t *)valp = qbp->qb_flag;
3508 else
3509 *(uint_t *)valp = q->q_flag;
3510 break;
3511
3512 case QSTRUIOT:
3513 if (qbp)
3514 error = EINVAL;
3515 else
3516 *(short *)valp = q->q_struiot;
3517 break;
3518
3519 default:
3520 error = EINVAL;
3521 break;
3522 }
3523 done:
3524 if (freezer != curthread)
3525 mutex_exit(QLOCK(q));
3526 return (error);
3527 }
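
/*
 * Illustrative sketch (q is assumed to be the module's read queue; the
 * 64K value is only an example; see strqget(9F)/strqset(9F) for the exact
 * locking context required): modules use strqget()/strqset() rather than
 * reading or writing queue fields directly, for example to inspect and
 * then raise the high-water mark at open time.
 *
 *	size_t hiwat;
 *
 *	if (strqget(q, QHIWAT, 0, &hiwat) == 0 && hiwat < 65536)
 *		(void) strqset(q, QHIWAT, 0, 65536);
 */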
3528
3529 /*
3530 * Wakes up everything waiting in cv_wait/sigwait/pollwait on one of:
3531 * QWANTWSYNC, QWANTR or QWANTW.
3532 *
3533 * Note: for QWANTWSYNC/QWANTW and QWANTR, if no WSLEEPer or RSLEEPer then a
3534 * deferred wakeup will be done. Also if strpoll() in progress then a
3535 * deferred pollwakeup will be done.
3536 */
3537 void
3538 strwakeq(queue_t *q, int flag)
3539 {
3540 stdata_t *stp = STREAM(q);
3541 pollhead_t *pl;
3542
3543 mutex_enter(&stp->sd_lock);
3544 pl = &stp->sd_pollist;
3545 if (flag & QWANTWSYNC) {
3546 ASSERT(!(q->q_flag & QREADR));
3547 if (stp->sd_flag & WSLEEP) {
3548 stp->sd_flag &= ~WSLEEP;
3549 cv_broadcast(&stp->sd_wrq->q_wait);
3550 } else {
3551 stp->sd_wakeq |= WSLEEP;
3552 }
3553
3554 mutex_exit(&stp->sd_lock);
3555 pollwakeup(pl, POLLWRNORM);
3556 mutex_enter(&stp->sd_lock);
3557
3558 if (stp->sd_sigflags & S_WRNORM)
3559 strsendsig(stp->sd_siglist, S_WRNORM, 0, 0);
3560 } else if (flag & QWANTR) {
3561 if (stp->sd_flag & RSLEEP) {
3562 stp->sd_flag &= ~RSLEEP;
3563 cv_broadcast(&_RD(stp->sd_wrq)->q_wait);
3564 } else {
3565 stp->sd_wakeq |= RSLEEP;
3566 }
3567
3568 mutex_exit(&stp->sd_lock);
3569 pollwakeup(pl, POLLIN | POLLRDNORM);
3570 mutex_enter(&stp->sd_lock);
3571
3572 {
3573 int events = stp->sd_sigflags & (S_INPUT | S_RDNORM);
3574
3575 if (events)
3576 strsendsig(stp->sd_siglist, events, 0, 0);
3577 }
3578 } else {
3579 if (stp->sd_flag & WSLEEP) {
3580 stp->sd_flag &= ~WSLEEP;
3581 cv_broadcast(&stp->sd_wrq->q_wait);
3582 }
3583
3584 mutex_exit(&stp->sd_lock);
3585 pollwakeup(pl, POLLWRNORM);
3586 mutex_enter(&stp->sd_lock);
3587
3588 if (stp->sd_sigflags & S_WRNORM)
3589 strsendsig(stp->sd_siglist, S_WRNORM, 0, 0);
3590 }
3591 mutex_exit(&stp->sd_lock);
3592 }
3593
3594 int
3595 struioget(queue_t *q, mblk_t *mp, struiod_t *dp, int noblock)
3596 {
3597 stdata_t *stp = STREAM(q);
3598 int typ = STRUIOT_STANDARD;
3599 uio_t *uiop = &dp->d_uio;
3600 dblk_t *dbp;
3601 ssize_t uiocnt;
3602 ssize_t cnt;
3603 unsigned char *ptr;
3604 ssize_t resid;
3605 int error = 0;
3606 on_trap_data_t otd;
3607 queue_t *stwrq;
3608
3609 /*
3610 * Plumbing may change while taking the type so store the
3611 * queue in a temporary variable. It doesn't matter even
3612 * if we take the type from the previous plumbing,
3613 * that's because if the plumbing has changed when we were
3614 * holding the queue in a temporary variable, we can continue
3615 * processing the message the way it would have been processed
3616 * in the old plumbing, without any side effects but a bit
3617 * extra processing for partial ip header checksum.
3618 *
3619 * This has been done to avoid holding the sd_lock which is
3620 * very hot.
3621 */
3622
3623 stwrq = stp->sd_struiowrq;
3624 if (stwrq)
3625 typ = stwrq->q_struiot;
3626
3627 for (; (resid = uiop->uio_resid) > 0 && mp; mp = mp->b_cont) {
3628 dbp = mp->b_datap;
3629 ptr = (uchar_t *)(mp->b_rptr + dbp->db_cksumstuff);
3630 uiocnt = dbp->db_cksumend - dbp->db_cksumstuff;
3631 cnt = MIN(uiocnt, uiop->uio_resid);
3632 if (!(dbp->db_struioflag & STRUIO_SPEC) ||
3633 (dbp->db_struioflag & STRUIO_DONE) || cnt == 0) {
3634 /*
3635 * Either this mblk has already been processed
3636 * or there is no more room in this mblk (?).
3637 */
3638 continue;
3639 }
3640 switch (typ) {
3641 case STRUIOT_STANDARD:
3642 if (noblock) {
3643 if (on_trap(&otd, OT_DATA_ACCESS)) {
3644 no_trap();
3645 error = EWOULDBLOCK;
3646 goto out;
3647 }
3648 }
3649 if (error = uiomove(ptr, cnt, UIO_WRITE, uiop)) {
3650 if (noblock)
3651 no_trap();
3652 goto out;
3653 }
3654 if (noblock)
3655 no_trap();
3656 break;
3657
3658 default:
3659 error = EIO;
3660 goto out;
3661 }
3662 dbp->db_struioflag |= STRUIO_DONE;
3663 dbp->db_cksumstuff += cnt;
3664 }
3665 out:
3666 if (error == EWOULDBLOCK && (resid -= uiop->uio_resid) > 0) {
3667 /*
3668 * A fault has occurred and some bytes were moved to the
3669 * current mblk, the uio_t has already been updated by
3670 * the appropriate uio routine, so also update the mblk
3671 * to reflect this in case this same mblk chain is used
3672 * again (after the fault has been handled).
3673 */
3674 uiocnt = dbp->db_cksumend - dbp->db_cksumstuff;
3675 if (uiocnt >= resid)
3676 dbp->db_cksumstuff += resid;
3677 }
3678 return (error);
3679 }
3680
3681 /*
3682 * Try to enter the queue synchronously. Any attempt to enter a closing queue
3683 * will fail. The qp->q_rwcnt keeps track of the number of successful entries
3683 * so
3684 * that removeq() will not try to close the queue while a thread is inside the
3685 * queue.
3686 */
3687 static boolean_t
3688 rwnext_enter(queue_t *qp)
3689 {
3690 mutex_enter(QLOCK(qp));
3691 if (qp->q_flag & QWCLOSE) {
3692 mutex_exit(QLOCK(qp));
3693 return (B_FALSE);
3694 }
3695 qp->q_rwcnt++;
3696 ASSERT(qp->q_rwcnt != 0);
3697 mutex_exit(QLOCK(qp));
3698 return (B_TRUE);
3699 }
3700
3701 /*
3702 * Decrease the count of threads running in sync stream queue and wake up any
3703 * threads blocked in removeq().
3704 */
3705 static void
3706 rwnext_exit(queue_t *qp)
3707 {
3708 mutex_enter(QLOCK(qp));
3709 qp->q_rwcnt--;
3710 if (qp->q_flag & QWANTRMQSYNC) {
3711 qp->q_flag &= ~QWANTRMQSYNC;
3712 cv_broadcast(&qp->q_wait);
3713 }
3714 mutex_exit(QLOCK(qp));
3715 }
3716
3717 /*
3718 * The purpose of rwnext() is to call the rw procedure of the next
3719 * (downstream) modules queue.
3720 *
3721 * Treated as a put entrypoint for perimeter synchronization.
3722 *
3723 * There's no need to grab sq_putlocks here (which only exist for CIPUT
3724 * sync queues). If it is CIPUT sync queue sq_count is incremented and it does
3725 * not matter if any regular put entrypoints have been already entered. We
3726 * can't increment one of the sq_putcounts (instead of sq_count) because
3727 * qwait_rw won't know which counter to decrement.
3728 *
3729 * It would be reasonable to add the lockless FASTPUT logic.
3730 */
3731 int
3732 rwnext(queue_t *qp, struiod_t *dp)
3733 {
3734 queue_t *nqp;
3735 syncq_t *sq;
3736 uint16_t count;
3737 uint16_t flags;
3738 struct qinit *qi;
3739 int (*proc)();
3740 struct stdata *stp;
3741 int isread;
3742 int rval;
3743
3744 stp = STREAM(qp);
3745 /*
3746 * Prevent q_next from changing by holding sd_lock until acquiring
3747 * SQLOCK. Note that a read-side rwnext from the streamhead will
3748 * already have sd_lock acquired. In either case sd_lock is always
3749 * released after acquiring SQLOCK.
3750 *
3751 * The streamhead read-side holding sd_lock when calling rwnext is
3752 * required to prevent a race condition where M_DATA mblks flowing
3753 * up the read-side of the stream could be bypassed by a rwnext()
3754 * down-call. In this case sd_lock acts as the streamhead perimeter.
3755 */
3756 if ((nqp = _WR(qp)) == qp) {
3757 isread = 0;
3758 mutex_enter(&stp->sd_lock);
3759 qp = nqp->q_next;
3760 } else {
3761 isread = 1;
3762 if (nqp != stp->sd_wrq)
3763 /* Not streamhead */
3764 mutex_enter(&stp->sd_lock);
3765 qp = _RD(nqp->q_next);
3766 }
3767 qi = qp->q_qinfo;
3768 if (qp->q_struiot == STRUIOT_NONE || ! (proc = qi->qi_rwp)) {
3769 /*
3770 * Not a synchronous module or no r/w procedure for this
3771 * queue, so just return EINVAL and let the caller handle it.
3772 */
3773 mutex_exit(&stp->sd_lock);
3774 return (EINVAL);
3775 }
3776
3777 if (rwnext_enter(qp) == B_FALSE) {
3778 mutex_exit(&stp->sd_lock);
3779 return (EINVAL);
3780 }
3781
3782 sq = qp->q_syncq;
3783 mutex_enter(SQLOCK(sq));
3784 mutex_exit(&stp->sd_lock);
3785 count = sq->sq_count;
3786 flags = sq->sq_flags;
3787 ASSERT(sq->sq_ciputctrl == NULL || (flags & SQ_CIPUT));
3788
3789 while ((flags & SQ_GOAWAY) || (!(flags & SQ_CIPUT) && count != 0)) {
3790 /*
3791 * if this queue is being closed, return.
3792 */
3793 if (qp->q_flag & QWCLOSE) {
3794 mutex_exit(SQLOCK(sq));
3795 rwnext_exit(qp);
3796 return (EINVAL);
3797 }
3798
3799 /*
3800 * Wait until we can enter the inner perimeter.
3801 */
3802 sq->sq_flags = flags | SQ_WANTWAKEUP;
3803 cv_wait(&sq->sq_wait, SQLOCK(sq));
3804 count = sq->sq_count;
3805 flags = sq->sq_flags;
3806 }
3807
3808 if (isread == 0 && stp->sd_struiowrq == NULL ||
3809 isread == 1 && stp->sd_struiordq == NULL) {
3810 /*
3811 * Stream plumbing changed while waiting for inner perimeter
3812 * so just return EINVAL and let the caller handle it.
3813 */
3814 mutex_exit(SQLOCK(sq));
3815 rwnext_exit(qp);
3816 return (EINVAL);
3817 }
3818 if (!(flags & SQ_CIPUT))
3819 sq->sq_flags = flags | SQ_EXCL;
3820 sq->sq_count = count + 1;
3821 ASSERT(sq->sq_count != 0); /* Wraparound */
3822 /*
3823 * Note: The only message ordering guarantee that rwnext() makes is
3824 * for the write queue flow-control case. All others (r/w queue
3825 * with q_count > 0 (or q_first != 0)) are the responsibility of
3826 * the queue's rw procedure. This could be generalized here by
3827 * running the queue's service procedure, but that wouldn't be
3828 * the most efficient for all cases.
3829 */
3830 mutex_exit(SQLOCK(sq));
3831 if (! isread && (qp->q_flag & QFULL)) {
3832 /*
3833 * Write queue may be flow controlled. If so,
3834 * mark the queue for wakeup when it's not.
3835 */
3836 mutex_enter(QLOCK(qp));
3837 if (qp->q_flag & QFULL) {
3838 qp->q_flag |= QWANTWSYNC;
3839 mutex_exit(QLOCK(qp));
3840 rval = EWOULDBLOCK;
3841 goto out;
3842 }
3843 mutex_exit(QLOCK(qp));
3844 }
3845
3846 if (! isread && dp->d_mp)
3847 STR_FTEVENT_MSG(dp->d_mp, nqp, FTEV_RWNEXT, dp->d_mp->b_rptr -
3848 dp->d_mp->b_datap->db_base);
3849
3850 rval = (*proc)(qp, dp);
3851
3852 if (isread && dp->d_mp)
3853 STR_FTEVENT_MSG(dp->d_mp, _RD(nqp), FTEV_RWNEXT,
3854 dp->d_mp->b_rptr - dp->d_mp->b_datap->db_base);
3855 out:
3856 /*
3857 * The queue is protected from being freed by sq_count, so it is
3858 * safe to call rwnext_exit and reacquire SQLOCK(sq).
3859 */
3860 rwnext_exit(qp);
3861
3862 mutex_enter(SQLOCK(sq));
3863 flags = sq->sq_flags;
3864 ASSERT(sq->sq_count != 0);
3865 sq->sq_count--;
3866 if (flags & SQ_TAIL) {
3867 putnext_tail(sq, qp, flags);
3868 /*
3869 * The only purpose of this ASSERT is to preserve calling stack
3870 * in DEBUG kernel.
3871 */
3872 ASSERT(flags & SQ_TAIL);
3873 return (rval);
3874 }
3875 ASSERT(flags & (SQ_EXCL|SQ_CIPUT));
3876 /*
3877 * Safe to always drop SQ_EXCL:
3878 * Not SQ_CIPUT means we set SQ_EXCL above
3879 * For SQ_CIPUT SQ_EXCL will only be set if the put procedure
3880 * did a qwriter(INNER) in which case nobody else
3881 * is in the inner perimeter and we are exiting.
3882 *
3883 * I would like to make the following assertion:
3884 *
3885 * ASSERT((flags & (SQ_EXCL|SQ_CIPUT)) != (SQ_EXCL|SQ_CIPUT) ||
3886 * sq->sq_count == 0);
3887 *
3888 * which indicates that if we are both putshared and exclusive,
3889 * we became exclusive while executing the putproc, and the only
3890 * claim on the syncq was the one we dropped a few lines above.
3891 * But other threads that enter putnext while the syncq is exclusive
3892 * need to make a claim as they may need to drop SQLOCK in the
3893 * has_writers case to avoid deadlocks. If these threads are
3894 * delayed or preempted, it is possible that the writer thread can
3895 * find out that there are other claims making the (sq_count == 0)
3896 * test invalid.
3897 */
3898
3899 sq->sq_flags = flags & ~SQ_EXCL;
3900 if (sq->sq_flags & SQ_WANTWAKEUP) {
3901 sq->sq_flags &= ~SQ_WANTWAKEUP;
3902 cv_broadcast(&sq->sq_wait);
3903 }
3904 mutex_exit(SQLOCK(sq));
3905 return (rval);
3906 }
3907
3908 /*
3909 * The purpose of infonext() is to call the info procedure of the next
3910 * (downstream) modules queue.
3911 *
3912 * Treated as a put entrypoint for perimeter synchronization.
3913 *
3914 * There's no need to grab sq_putlocks here (which only exist for CIPUT
3915 * sync queues). If it is CIPUT sync queue regular sq_count is incremented and
3916 * it does not matter if any regular put entrypoints have been already
3917 * entered.
3918 */
3919 int
3920 infonext(queue_t *qp, infod_t *idp)
3921 {
3922 queue_t *nqp;
3923 syncq_t *sq;
3924 uint16_t count;
3925 uint16_t flags;
3926 struct qinit *qi;
3927 int (*proc)();
3928 struct stdata *stp;
3929 int rval;
3930
3931 stp = STREAM(qp);
3932 /*
3933 * Prevent q_next from changing by holding sd_lock until
3934 * acquiring SQLOCK.
3935 */
3936 mutex_enter(&stp->sd_lock);
3937 if ((nqp = _WR(qp)) == qp) {
3938 qp = nqp->q_next;
3939 } else {
3940 qp = _RD(nqp->q_next);
3941 }
3942 qi = qp->q_qinfo;
3943 if (qp->q_struiot == STRUIOT_NONE || ! (proc = qi->qi_infop)) {
3944 mutex_exit(&stp->sd_lock);
3945 return (EINVAL);
3946 }
3947 sq = qp->q_syncq;
3948 mutex_enter(SQLOCK(sq));
3949 mutex_exit(&stp->sd_lock);
3950 count = sq->sq_count;
3951 flags = sq->sq_flags;
3952 ASSERT(sq->sq_ciputctrl == NULL || (flags & SQ_CIPUT));
3953
3954 while ((flags & SQ_GOAWAY) || (!(flags & SQ_CIPUT) && count != 0)) {
3955 /*
3956 * Wait until we can enter the inner perimeter.
3957 */
3958 sq->sq_flags = flags | SQ_WANTWAKEUP;
3959 cv_wait(&sq->sq_wait, SQLOCK(sq));
3960 count = sq->sq_count;
3961 flags = sq->sq_flags;
3962 }
3963
3964 if (! (flags & SQ_CIPUT))
3965 sq->sq_flags = flags | SQ_EXCL;
3966 sq->sq_count = count + 1;
3967 ASSERT(sq->sq_count != 0); /* Wraparound */
3968 mutex_exit(SQLOCK(sq));
3969
3970 rval = (*proc)(qp, idp);
3971
3972 mutex_enter(SQLOCK(sq));
3973 flags = sq->sq_flags;
3974 ASSERT(sq->sq_count != 0);
3975 sq->sq_count--;
3976 if (flags & SQ_TAIL) {
3977 putnext_tail(sq, qp, flags);
3978 /*
3979 * The only purpose of this ASSERT is to preserve calling stack
3980 * in DEBUG kernel.
3981 */
3982 ASSERT(flags & SQ_TAIL);
3983 return (rval);
3984 }
3985 ASSERT(flags & (SQ_EXCL|SQ_CIPUT));
3986 /*
3987 * XXXX
3988 * I am not certain the next comment is correct here. I need to consider
3989 * why the infonext is called, and if dropping SQ_EXCL unless non-CIPUT
3990 * might cause other problems. It just might be safer to drop it if
3991 * !SQ_CIPUT because that is when we set it.
3992 */
3993 /*
3994 * Safe to always drop SQ_EXCL:
3995 * Not SQ_CIPUT means we set SQ_EXCL above
3996 * For SQ_CIPUT SQ_EXCL will only be set if the put procedure
3997 * did a qwriter(INNER) in which case nobody else
3998 * is in the inner perimeter and we are exiting.
3999 *
4000 * I would like to make the following assertion:
4001 *
4002 * ASSERT((flags & (SQ_EXCL|SQ_CIPUT)) != (SQ_EXCL|SQ_CIPUT) ||
4003 * sq->sq_count == 0);
4004 *
4005 * which indicates that if we are both putshared and exclusive,
4006 * we became exclusive while executing the putproc, and the only
4007 * claim on the syncq was the one we dropped a few lines above.
4008 * But other threads that enter putnext while the syncq is exclusive
4009 * need to make a claim as they may need to drop SQLOCK in the
4010 * has_writers case to avoid deadlocks. If these threads are
4011 * delayed or preempted, it is possible that the writer thread can
4012 * find out that there are other claims making the (sq_count == 0)
4013 * test invalid.
4014 */
4015
4016 sq->sq_flags = flags & ~SQ_EXCL;
4017 mutex_exit(SQLOCK(sq));
4018 return (rval);
4019 }
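/*
 * Usage sketch (illustrative only, not code from this file): a caller that
 * wants to query the downstream module's qi_infop routine might use
 * infonext() roughly as below, where q is the caller's queue in a fully
 * configured stream.  The INFOD_BYTES command and the d_cmd/d_res/d_bytes
 * fields of infod_t are assumed from the synchronous STREAMS (struio)
 * support headers; a return of EINVAL means the next queue has no info
 * procedure.
 *
 *	infod_t infod;
 *	ssize_t bytes = 0;
 *	int error;
 *
 *	bzero(&infod, sizeof (infod));
 *	infod.d_cmd = INFOD_BYTES;
 *	error = infonext(q, &infod);
 *	if (error == 0 && (infod.d_res & INFOD_BYTES))
 *		bytes = infod.d_bytes;
 */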
4020
4021 /*
4022 * Return nonzero if the queue is responsible for struio(), else return 0.
4023 */
4024 int
4025 isuioq(queue_t *q)
4026 {
4027 if (q->q_flag & QREADR)
4028 return (STREAM(q)->sd_struiordq == q);
4029 else
4030 return (STREAM(q)->sd_struiowrq == q);
4031 }
4032
4033 #if defined(__sparc)
4034 int disable_putlocks = 0;
4035 #else
4036 int disable_putlocks = 1;
4037 #endif
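/*
 * Tuning sketch (illustrative, not from this file): on platforms where the
 * default above leaves putlocks disabled, they can be enabled by clearing
 * the variable before streams are created, e.g. via /etc/system:
 *
 *	set disable_putlocks = 0
 */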
4038
4039 /*
4040 * Called by create_putlocks().
4041 */
4042 static void
4043 create_syncq_putlocks(queue_t *q)
4044 {
4045 syncq_t *sq = q->q_syncq;
4046 ciputctrl_t *cip;
4047 int i;
4048
4049 ASSERT(sq != NULL);
4050
4051 ASSERT(disable_putlocks == 0);
4052 ASSERT(n_ciputctrl >= min_n_ciputctrl);
4053 ASSERT(ciputctrl_cache != NULL);
4054
4055 if (!(sq->sq_type & SQ_CIPUT))
4056 return;
4057
4058 for (i = 0; i <= 1; i++) {
4059 if (sq->sq_ciputctrl == NULL) {
4060 cip = kmem_cache_alloc(ciputctrl_cache, KM_SLEEP);
4061 SUMCHECK_CIPUTCTRL_COUNTS(cip, n_ciputctrl - 1, 0);
4062 mutex_enter(SQLOCK(sq));
4063 if (sq->sq_ciputctrl != NULL) {
4064 mutex_exit(SQLOCK(sq));
4065 kmem_cache_free(ciputctrl_cache, cip);
4066 } else {
4067 ASSERT(sq->sq_nciputctrl == 0);
4068 sq->sq_nciputctrl = n_ciputctrl - 1;
4069 /*
4070 * putnext() checks sq_ciputctrl without holding
4071 * SQLOCK. If it is not NULL, putnext() assumes
4072 * sq_nciputctrl is initialized. The membar below
4073 * ensures that.
4074 */
4075 membar_producer();
4076 sq->sq_ciputctrl = cip;
4077 mutex_exit(SQLOCK(sq));
4078 }
4079 }
4080 ASSERT(sq->sq_nciputctrl == n_ciputctrl - 1);
4081 if (i == 1)
4082 break;
4083 q = _OTHERQ(q);
4084 if (!(q->q_flag & QPERQ)) {
4085 ASSERT(sq == q->q_syncq);
4086 break;
4087 }
4088 ASSERT(q->q_syncq != NULL);
4089 ASSERT(sq != q->q_syncq);
4090 sq = q->q_syncq;
4091 ASSERT(sq->sq_type & SQ_CIPUT);
4092 }
4093 }
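/*
 * For reference, the consumer side of the membar_producer() publication
 * above pairs it with membar_consumer() before trusting sq_nciputctrl.
 * This is a minimal sketch of the pattern only, not the actual putnext()
 * code; it assumes the sq_ciputctrl/sq_nciputctrl fields used in this file
 * and that n_ciputctrl is a power of two:
 *
 *	ciputctrl_t *cip;
 *	int ix;
 *
 *	if ((cip = sq->sq_ciputctrl) != NULL) {
 *		membar_consumer();	(pairs with the producer barrier)
 *		ix = CPU->cpu_seqid & sq->sq_nciputctrl;
 *		... cip[ix] is now safe to use as this CPU's put counter ...
 *	}
 */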
4094
4095 /*
4096 * If the stream argument is 0, only create per-CPU sq_putlocks/sq_putcounts
4097 * for the syncq of q. If the stream argument is not 0, create per-CPU
4098 * stream_putlocks for the stream of q and per-CPU sq_putlocks/sq_putcounts
4099 * for all syncqs starting from q and down to the driver.
4100 *
4101 * This should be called after the affected queues are part of the stream
4102 * geometry. It should be called from a driver/module open routine after the
4103 * qprocson() call. It is also called from the nfs syscall path, where it is
4104 * known that the stream is configured and won't change its geometry during
4105 * the create_putlocks() call.
4106 *
4107 * A caller normally passes 0 for the stream argument to speed up MT putnext
4108 * into the perimeter of q, for example because its perimeter is per module
4109 * (e.g. IP).
4110 *
4111 * A caller normally passes a non-zero value for the stream argument to hint
4112 * to the system that the stream of q is a heavily contended global system
4113 * stream (e.g. NFS/UDP) and that the part of the stream from q down to the
4114 * driver is particularly MT hot.
4115 *
4116 * The caller ensures stream plumbing won't happen while we are here and
4117 * therefore q_next can be safely used.
4118 */
4119
4120 void
4121 create_putlocks(queue_t *q, int stream)
4122 {
4123 ciputctrl_t *cip;
4124 struct stdata *stp = STREAM(q);
4125
4126 q = _WR(q);
4127 ASSERT(stp != NULL);
4128
4129 if (disable_putlocks != 0)
4130 return;
4131
4132 if (n_ciputctrl < min_n_ciputctrl)
4133 return;
4134
4135 ASSERT(ciputctrl_cache != NULL);
4136
4137 if (stream != 0 && stp->sd_ciputctrl == NULL) {
4138 cip = kmem_cache_alloc(ciputctrl_cache, KM_SLEEP);
4139 SUMCHECK_CIPUTCTRL_COUNTS(cip, n_ciputctrl - 1, 0);
4140 mutex_enter(&stp->sd_lock);
4141 if (stp->sd_ciputctrl != NULL) {
4142 mutex_exit(&stp->sd_lock);
4143 kmem_cache_free(ciputctrl_cache, cip);
4144 } else {
4145 ASSERT(stp->sd_nciputctrl == 0);
4146 stp->sd_nciputctrl = n_ciputctrl - 1;
4147 /*
4148 * putnext() checks sd_ciputctrl without holding
4149 * sd_lock. If it is not NULL, putnext() assumes
4150 * sd_nciputctrl is initialized. The membar below
4151 * ensures that.
4152 */
4153 membar_producer();
4154 stp->sd_ciputctrl = cip;
4155 mutex_exit(&stp->sd_lock);
4156 }
4157 }
4158
4159 ASSERT(stream == 0 || stp->sd_nciputctrl == n_ciputctrl - 1);
4160
4161 while (_SAMESTR(q)) {
4162 create_syncq_putlocks(q);
4163 if (stream == 0)
4164 return;
4165 q = q->q_next;
4166 }
4167 ASSERT(q != NULL);
4168 create_syncq_putlocks(q);
4169 }
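/*
 * Usage sketch (illustrative only, not code from this file): a module or
 * driver open routine that wants per-CPU putlocks for its own syncq would
 * make the call right after qprocson(), as described in the block comment
 * above.  The xx_open name is hypothetical.
 *
 *	static int
 *	xx_open(queue_t *q, dev_t *devp, int oflag, int sflag, cred_t *crp)
 *	{
 *		...
 *		qprocson(q);
 *		create_putlocks(q, 0);	(0: only q's syncq gets putlocks)
 *		return (0);
 *	}
 *
 * A heavily contended global stream (the NFS/UDP case above) would instead
 * pass a non-zero stream argument, e.g. create_putlocks(stp->sd_wrq, 1),
 * so that the stream head and every syncq from q down to the driver get
 * per-CPU locks.
 */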
4170
4171 /*
4172 * STREAMS Flow Trace - record STREAMS Flow Trace events as an mblk flows
4173 * through a stream.
4174 *
4175 * Data currently recorded per event are a timestamp, the module/driver
4176 * name, the downstream module/driver name, an optional call stack, the
4177 * event type and a per-type datum. Much of the STREAMS framework is
4178 * instrumented for automatic flow tracing (when enabled). Events can be
4179 * defined and used by STREAMS modules and drivers.
4180 *
4181 * Global objects:
4182 *
4183 * str_ftevent() - Add a flow-trace event to a dblk.
4184 * str_ftfree() - Free flow-trace data
4185 *
4186 * Local objects:
4187 *
4188 * fthdr_cache - pointer to the kmem cache for trace headers.
4189 * ftblk_cache - pointer to the kmem cache for trace data blocks.
4190 */
4191
4192 int str_ftnever = 1; /* Don't do STREAMS flow tracing */
4193 int str_ftstack = 0; /* Don't record event call stacks */
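/*
 * Tuning sketch (illustrative, not from this file): flow tracing is off by
 * default and is enabled by clearing str_ftnever (and optionally setting
 * str_ftstack to record call stacks) before the messages of interest are
 * allocated, e.g. via /etc/system:
 *
 *	set str_ftnever = 0
 *	set str_ftstack = 1
 */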
4194
4195 void
4196 str_ftevent(fthdr_t *hp, void *p, ushort_t evnt, ushort_t data)
4197 {
4198 ftblk_t *bp;
4199 ftblk_t *nbp;
4200 ftevnt_t *ep;
4201 int ix, nix;
4202 
4203 ASSERT(hp != NULL);
4204 bp = hp->tail;
4205 for (;;) {
4206 if ((ix = bp->ix) == FTBLK_EVNTS) {
4207 /*
4208 * Tail doesn't have room, so need a new tail.
4209 *
4210 * To make this MT safe, first, allocate a new
4211 * ftblk, and initialize it. To make life a
4212 * little easier, reserve the first slot (mostly
4213 * by making ix = 1). When we are finished with
4214 * the initialization, CAS this pointer to the
4215 * tail. If this succeeds, this is the new
4216 * "next" block. Otherwise, another thread
4217 * got here first, so free the block and start
4218 * again.
4219 */
4220 nbp = kmem_cache_alloc(ftblk_cache, KM_NOSLEEP);
4221 if (nbp == NULL) {
4222 /* no mem, so punt */
4223 str_ftnever++;
4224 /* free up all flow data? */
4225 return;
4226 }
4227 nbp->nxt = NULL;
4228 nbp->ix = 1;
4229 /*
4230 * Just in case there is another thread about
4231 * to get the next index, we need to make sure
4232 * the value is there for it.
4233 */
4234 membar_producer();
4235 if (atomic_cas_ptr(&hp->tail, bp, nbp) == bp) {
4236 /* CAS was successful */
4237 bp->nxt = nbp;
4238 membar_producer();
4239 bp = nbp;
4240 ix = 0;
4241 goto cas_good;
4242 } else {
4243 kmem_cache_free(ftblk_cache, nbp);
4244 bp = hp->tail;
4245 continue;
4246 }
4247 }
4248 nix = ix + 1;
4249 if (atomic_cas_32((uint32_t *)&bp->ix, ix, nix) == ix) {
4250 cas_good:
4251 if (curthread != hp->thread) {
4252 hp->thread = curthread;
4253 evnt |= FTEV_CS;
4254 }
4255 if (CPU->cpu_seqid != hp->cpu_seqid) {
4256 hp->cpu_seqid = CPU->cpu_seqid;
4257 evnt |= FTEV_PS;
4258 }
4259 ep = &bp->ev[ix];
4260 break;
4261 }
4262 }
4263
4264 if (evnt & FTEV_QMASK) {
4265 queue_t *qp = p;
4266
4267 if (!(qp->q_flag & QREADR))
4268 evnt |= FTEV_ISWR;
4269
4270 ep->mid = Q2NAME(qp);
4271
4272 /*
4273 * We only record the next queue name for FTEV_PUTNEXT since
4274 * that's the only time we *really* need it, and the putnext()
4275 * code ensures that qp->q_next won't vanish. (We could use
4276 * claimstr()/releasestr() but at a performance cost.)
4277 */
4278 if ((evnt & FTEV_MASK) == FTEV_PUTNEXT && qp->q_next != NULL)
4279 ep->midnext = Q2NAME(qp->q_next);
4280 else
4281 ep->midnext = NULL;
4282 } else {
4283 ep->mid = p;
4284 ep->midnext = NULL;
4285 }
4286
4287 if (ep->stk != NULL)
4288 ep->stk->fs_depth = getpcstack(ep->stk->fs_stk, FTSTK_DEPTH);
4289
4290 ep->ts = gethrtime();
4291 ep->evnt = evnt;
4292 ep->data = data;
4293 hp->hash = (hp->hash << 9) + hp->hash;
4294 hp->hash += (evnt << 16) | data;
4295 hp->hash += (uintptr_t)ep->mid;
4296 }
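/*
 * Usage sketch (illustrative only, not code from this file): a module that
 * defines its own flow-trace event can record it with str_ftevent() as long
 * as the mblk was allocated while tracing was enabled, so that db_fthdr is
 * non-NULL.  FTEV_EXAMPLE is a hypothetical event code; for non-queue events
 * the second argument is conventionally the caller's PC:
 *
 *	if (mp->b_datap->db_fthdr != NULL)
 *		str_ftevent(mp->b_datap->db_fthdr, caller(), FTEV_EXAMPLE, 0);
 */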
4297
4298 /*
4299 * Free flow-trace data.
4300 */
4301 void
4302 str_ftfree(dblk_t *dbp)
4303 {
4304 fthdr_t *hp = dbp->db_fthdr;
4305 ftblk_t *bp = &hp->first;
4306 ftblk_t *nbp;
4307
4308 if (bp != hp->tail || bp->ix != 0) {
4309 /*
4310 * Clear out the hash, point the tail back at the embedded first
4311 * block, and free any continuation blocks.
4312 */
4313 bp = hp->first.nxt;
4314 hp->tail = &hp->first;
4315 hp->hash = 0;
4316 hp->first.nxt = NULL;
4317 hp->first.ix = 0;
4318 while (bp != NULL) {
4319 nbp = bp->nxt;
4320 kmem_cache_free(ftblk_cache, bp);
4321 bp = nbp;
4322 }
4323 }
4324 kmem_cache_free(fthdr_cache, hp);
4325 dbp->db_fthdr = NULL;
4326 }
4327