/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License (the "License").
 * You may not use this file except in compliance with the License.
 *
 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
 * or http://www.opensolaris.org/os/licensing.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
 * If applicable, add the following below this CDDL HEADER, with the
 * fields enclosed by brackets "[]" replaced with your own identifying
 * information: Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 */
/* Copyright (c) 1984, 1986, 1987, 1988, 1989 AT&T */
/*	  All Rights Reserved	*/

/*
 * Copyright 2009 Sun Microsystems, Inc.  All rights reserved.
 * Use is subject to license terms.
 */

#include <sys/types.h>
#include <sys/param.h>
#include <sys/thread.h>
#include <sys/sysmacros.h>
#include <sys/stropts.h>
#include <sys/stream.h>
#include <sys/strsubr.h>
#include <sys/strsun.h>
#include <sys/conf.h>
#include <sys/debug.h>
#include <sys/cmn_err.h>
#include <sys/kmem.h>
#include <sys/atomic.h>
#include <sys/errno.h>
#include <sys/vtrace.h>
#include <sys/ftrace.h>
#include <sys/ontrap.h>
#include <sys/multidata.h>
#include <sys/multidata_impl.h>
#include <sys/sdt.h>
#include <sys/strft.h>

#ifdef DEBUG
#include <sys/kmem_impl.h>
#endif
/*
 * This file contains all the STREAMS utility routines that may
 * be used by modules and drivers.
 */

/*
 * STREAMS message allocator: principles of operation
 *
 * The streams message allocator consists of all the routines that
 * allocate, dup and free streams messages: allocb(), [d]esballoc[a],
 * dupb(), freeb() and freemsg().  What follows is a high-level view
 * of how the allocator works.
 *
 * Every streams message consists of one or more mblks, a dblk, and data.
 * All mblks for all types of messages come from a common mblk_cache.
 * The dblk and data come in several flavors, depending on how the
 * message is allocated:
 *
 * (1) mblks up to DBLK_MAX_CACHE size are allocated from a collection of
 *     fixed-size dblk/data caches.  Objects in these caches normally
 *     consist of a dblk plus its associated data, allocated as a single
 *     contiguous chunk of memory.  For message sizes that are multiples
 *     of PAGESIZE, however, the dblk is allocated separately from its
 *     buffer, and the constructor allocates the associated buffer with
 *     kmem_alloc().  allocb() determines the nearest-size cache by table
 *     lookup: the dblk_cache[] array provides the mapping from size to
 *     dblk cache.
 *
 * (2) Large messages (size > DBLK_MAX_CACHE) are constructed by
 *     kmem_alloc()'ing a buffer for the data and supplying that
 *     buffer to gesballoc(), described below.
 *
 * (3) The four flavors of [d]esballoc[a] are all implemented by a
 *     common routine, gesballoc() ("generic esballoc").  gesballoc()
 *     allocates a dblk from the global dblk_esb_cache and sets db_base,
 *     db_lim and db_frtnp to describe the caller-supplied buffer.
 *
 * While there are several routines to allocate messages, there is only
 * one routine to free messages: freeb().  freeb() simply invokes the
 * dblk's free method, dbp->db_free(), which is set at allocation time.
 *
 * dupb() creates a new reference to a message by allocating a new mblk,
 * incrementing the dblk reference count and setting the dblk's free
 * method to dblk_decref().  The dblk's original free method is retained
 * in db_lastfree.  dblk_decref() decrements the reference count on each
 * freeb().  If this is not the last reference it just frees the mblk;
 * if this *is* the last reference, it restores db_free to db_lastfree,
 * sets db_mblk to the current mblk (see below), and invokes db_lastfree.
 *
 * The implementation makes aggressive use of kmem object caching for
 * maximum performance.  This makes the code simple and compact, but
 * also a bit abstruse in some places.  The invariants that constitute a
 * message's constructed state, described below, are more subtle than usual.
 *
 * Every dblk has an "attached mblk" as part of its constructed state.
 * The mblk is allocated by the dblk's constructor and remains attached
 * until the message is either dup'ed or pulled up.  In the dupb() case
 * the mblk association doesn't matter until the last free, at which time
 * dblk_decref() attaches the last mblk to the dblk.  pullupmsg() affects
 * the mblk association because it swaps the leading mblks of two messages,
 * so it is responsible for swapping their db_mblk pointers accordingly.
 * From a constructed-state viewpoint it doesn't matter that a dblk's
 * attached mblk can change while the message is allocated; all that
 * matters is that the dblk has *some* attached mblk when it's freed.
 *
 * The sizes of the allocb() small-message caches are not magical.
 * They represent a good trade-off between internal and external
 * fragmentation for current workloads.  They should be reevaluated
 * periodically, especially if allocations larger than DBLK_MAX_CACHE
 * become common.  We use 64-byte alignment so that dblks don't
 * straddle cache lines unnecessarily.
 */
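
/*
 * Illustrative usage sketch (not part of the original file): how the
 * routines described above fit together from a module's point of view.
 * The sizes and error-handling policy here are hypothetical.
 *
 *	mblk_t *mp, *dup;
 *
 *	if ((mp = allocb(64, BPRI_MED)) == NULL)
 *		return (ENOSR);		// dblk + data came from a
 *					// fixed-size dblk/data cache
 *	*mp->b_wptr++ = 0x2a;		// append one byte of data
 *	dup = dupb(mp);			// db_ref goes to 2, db_free is
 *					// now dblk_decref()
 *	freeb(mp);			// not the last ref: frees mblk only
 *	freemsg(dup);			// last ref: db_lastfree runs and
 *					// the dblk returns to its cache
 */
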
#define	DBLK_MAX_CACHE		73728
#define	DBLK_CACHE_ALIGN	64
#define	DBLK_MIN_SIZE		8
#define	DBLK_SIZE_SHIFT		3

#ifdef _BIG_ENDIAN
#define	DBLK_RTFU_SHIFT(field)	\
	(8 * (&((dblk_t *)0)->db_struioflag - &((dblk_t *)0)->field))
#else
#define	DBLK_RTFU_SHIFT(field)	\
	(8 * (&((dblk_t *)0)->field - &((dblk_t *)0)->db_ref))
#endif

#define	DBLK_RTFU(ref, type, flags, uioflag)	\
	(((ref) << DBLK_RTFU_SHIFT(db_ref)) | \
	((type) << DBLK_RTFU_SHIFT(db_type)) | \
	(((flags) | (ref - 1)) << DBLK_RTFU_SHIFT(db_flags)) | \
	((uioflag) << DBLK_RTFU_SHIFT(db_struioflag)))
#define	DBLK_RTFU_REF_MASK	(DBLK_REFMAX << DBLK_RTFU_SHIFT(db_ref))
#define	DBLK_RTFU_WORD(dbp)	(*((uint32_t *)&(dbp)->db_ref))
#define	MBLK_BAND_FLAG_WORD(mp)	(*((uint32_t *)&(mp)->b_band))

static size_t dblk_sizes[] = {
#ifdef _LP64
	16, 80, 144, 208, 272, 336, 528, 1040, 1488, 1936, 2576, 3856,
	8192, 12048, 16384, 20240, 24576, 28432, 32768, 36624,
	40960, 44816, 49152, 53008, 57344, 61200, 65536, 69392,
#else
	64, 128, 320, 576, 1088, 1536, 1984, 2624, 3904,
	8192, 12096, 16384, 20288, 24576, 28480, 32768, 36672,
	40960, 44864, 49152, 53056, 57344, 61248, 65536, 69440,
#endif
	DBLK_MAX_CACHE, 0
};

static struct kmem_cache *dblk_cache[DBLK_MAX_CACHE / DBLK_MIN_SIZE];
static struct kmem_cache *mblk_cache;
static struct kmem_cache *dblk_esb_cache;
static struct kmem_cache *fthdr_cache;
static struct kmem_cache *ftblk_cache;

static void dblk_lastfree(mblk_t *mp, dblk_t *dbp);
static mblk_t *allocb_oversize(size_t size, int flags);
static int allocb_tryhard_fails;
static void frnop_func(void *arg);
frtn_t frnop = { frnop_func };
static void bcache_dblk_lastfree(mblk_t *mp, dblk_t *dbp);

static boolean_t rwnext_enter(queue_t *qp);
static void rwnext_exit(queue_t *qp);

/*
 * Patchable mblk/dblk kmem_cache flags.
 */
int dblk_kmem_flags = 0;
int mblk_kmem_flags = 0;
static int
dblk_constructor(void *buf, void *cdrarg, int kmflags)
{
	dblk_t *dbp = buf;
	ssize_t msg_size = (ssize_t)cdrarg;
	size_t index;

	ASSERT(msg_size != 0);

	index = (msg_size - 1) >> DBLK_SIZE_SHIFT;

	ASSERT(index < (DBLK_MAX_CACHE >> DBLK_SIZE_SHIFT));

	if ((dbp->db_mblk = kmem_cache_alloc(mblk_cache, kmflags)) == NULL)
		return (-1);
	if ((msg_size & PAGEOFFSET) == 0) {
		dbp->db_base = kmem_alloc(msg_size, kmflags);
		if (dbp->db_base == NULL) {
			kmem_cache_free(mblk_cache, dbp->db_mblk);
			return (-1);
		}
	} else {
		dbp->db_base = (unsigned char *)&dbp[1];
	}

	dbp->db_mblk->b_datap = dbp;
	dbp->db_cache = dblk_cache[index];
	dbp->db_lim = dbp->db_base + msg_size;
	dbp->db_free = dbp->db_lastfree = dblk_lastfree;
	dbp->db_frtnp = NULL;
	dbp->db_fthdr = NULL;
	dbp->db_credp = NULL;
	dbp->db_cpid = -1;
	dbp->db_struioflag = 0;
	dbp->db_struioun.cksum.flags = 0;
	return (0);
}

/*ARGSUSED*/
static int
dblk_esb_constructor(void *buf, void *cdrarg, int kmflags)
{
	dblk_t *dbp = buf;

	if ((dbp->db_mblk = kmem_cache_alloc(mblk_cache, kmflags)) == NULL)
		return (-1);
	dbp->db_mblk->b_datap = dbp;
	dbp->db_cache = dblk_esb_cache;
	dbp->db_fthdr = NULL;
	dbp->db_credp = NULL;
	dbp->db_cpid = -1;
	dbp->db_struioflag = 0;
	dbp->db_struioun.cksum.flags = 0;
	return (0);
}

static int
bcache_dblk_constructor(void *buf, void *cdrarg, int kmflags)
{
	dblk_t *dbp = buf;
	bcache_t *bcp = cdrarg;

	if ((dbp->db_mblk = kmem_cache_alloc(mblk_cache, kmflags)) == NULL)
		return (-1);

	dbp->db_base = kmem_cache_alloc(bcp->buffer_cache, kmflags);
	if (dbp->db_base == NULL) {
		kmem_cache_free(mblk_cache, dbp->db_mblk);
		return (-1);
	}

	dbp->db_mblk->b_datap = dbp;
	dbp->db_cache = (void *)bcp;
	dbp->db_lim = dbp->db_base + bcp->size;
	dbp->db_free = dbp->db_lastfree = bcache_dblk_lastfree;
	dbp->db_frtnp = NULL;
	dbp->db_fthdr = NULL;
	dbp->db_credp = NULL;
	dbp->db_cpid = -1;
	dbp->db_struioflag = 0;
	dbp->db_struioun.cksum.flags = 0;
	return (0);
}

/*ARGSUSED*/
static void
dblk_destructor(void *buf, void *cdrarg)
{
	dblk_t *dbp = buf;
	ssize_t msg_size = (ssize_t)cdrarg;

	ASSERT(dbp->db_mblk->b_datap == dbp);
	ASSERT(msg_size != 0);
	ASSERT(dbp->db_struioflag == 0);
	ASSERT(dbp->db_struioun.cksum.flags == 0);

	if ((msg_size & PAGEOFFSET) == 0) {
		kmem_free(dbp->db_base, msg_size);
	}

	kmem_cache_free(mblk_cache, dbp->db_mblk);
}

static void
bcache_dblk_destructor(void *buf, void *cdrarg)
{
	dblk_t *dbp = buf;
	bcache_t *bcp = cdrarg;

	kmem_cache_free(bcp->buffer_cache, dbp->db_base);

	ASSERT(dbp->db_mblk->b_datap == dbp);
	ASSERT(dbp->db_struioflag == 0);
	ASSERT(dbp->db_struioun.cksum.flags == 0);

	kmem_cache_free(mblk_cache, dbp->db_mblk);
}

/* ARGSUSED */
static int
ftblk_constructor(void *buf, void *cdrarg, int kmflags)
{
	ftblk_t *fbp = buf;
	int i;

	bzero(fbp, sizeof (ftblk_t));
	if (str_ftstack != 0) {
		for (i = 0; i < FTBLK_EVNTS; i++)
			fbp->ev[i].stk = kmem_alloc(sizeof (ftstk_t), kmflags);
	}

	return (0);
}

/* ARGSUSED */
static void
ftblk_destructor(void *buf, void *cdrarg)
{
	ftblk_t *fbp = buf;
	int i;

	if (str_ftstack != 0) {
		for (i = 0; i < FTBLK_EVNTS; i++) {
			if (fbp->ev[i].stk != NULL) {
				kmem_free(fbp->ev[i].stk, sizeof (ftstk_t));
				fbp->ev[i].stk = NULL;
			}
		}
	}
}

static int
fthdr_constructor(void *buf, void *cdrarg, int kmflags)
{
	fthdr_t *fhp = buf;

	return (ftblk_constructor(&fhp->first, cdrarg, kmflags));
}

static void
fthdr_destructor(void *buf, void *cdrarg)
{
	fthdr_t *fhp = buf;

	ftblk_destructor(&fhp->first, cdrarg);
}

void
streams_msg_init(void)
{
	char name[40];
	size_t size;
	size_t lastsize = DBLK_MIN_SIZE;
	size_t *sizep;
	struct kmem_cache *cp;
	size_t tot_size;
	int offset;

	mblk_cache = kmem_cache_create("streams_mblk", sizeof (mblk_t), 32,
	    NULL, NULL, NULL, NULL, NULL, mblk_kmem_flags);

	for (sizep = dblk_sizes; (size = *sizep) != 0; sizep++) {

		if ((offset = (size & PAGEOFFSET)) != 0) {
			/*
			 * We are in the middle of a page; the dblk
			 * should be allocated on the same page.
			 */
			tot_size = size + sizeof (dblk_t);
			ASSERT((offset + sizeof (dblk_t) + sizeof (kmem_slab_t))
			    < PAGESIZE);
			ASSERT((tot_size & (DBLK_CACHE_ALIGN - 1)) == 0);

		} else {

			/*
			 * The buffer size is a multiple of the page size;
			 * the dblk and buffer are allocated separately.
			 */

			ASSERT((size & (DBLK_CACHE_ALIGN - 1)) == 0);
			tot_size = sizeof (dblk_t);
		}

		(void) sprintf(name, "streams_dblk_%ld", size);
		cp = kmem_cache_create(name, tot_size, DBLK_CACHE_ALIGN,
		    dblk_constructor, dblk_destructor, NULL, (void *)(size),
		    NULL, dblk_kmem_flags);

		while (lastsize <= size) {
			dblk_cache[(lastsize - 1) >> DBLK_SIZE_SHIFT] = cp;
			lastsize += DBLK_MIN_SIZE;
		}
	}

	dblk_esb_cache = kmem_cache_create("streams_dblk_esb", sizeof (dblk_t),
	    DBLK_CACHE_ALIGN, dblk_esb_constructor, dblk_destructor, NULL,
	    (void *)sizeof (dblk_t), NULL, dblk_kmem_flags);
	fthdr_cache = kmem_cache_create("streams_fthdr", sizeof (fthdr_t), 32,
	    fthdr_constructor, fthdr_destructor, NULL, NULL, NULL, 0);
	ftblk_cache = kmem_cache_create("streams_ftblk", sizeof (ftblk_t), 32,
	    ftblk_constructor, ftblk_destructor, NULL, NULL, NULL, 0);

	/* Initialize Multidata caches */
	mmd_init();

	/* initialize throttling queue for esballoc */
	esballoc_queue_init();
}

/*ARGSUSED*/
mblk_t *
allocb(size_t size, uint_t pri)
{
	dblk_t *dbp;
	mblk_t *mp;
	size_t index;

	index = (size - 1) >> DBLK_SIZE_SHIFT;

	if (index >= (DBLK_MAX_CACHE >> DBLK_SIZE_SHIFT)) {
		if (size != 0) {
			mp = allocb_oversize(size, KM_NOSLEEP);
			goto out;
		}
		index = 0;
	}

	if ((dbp = kmem_cache_alloc(dblk_cache[index], KM_NOSLEEP)) == NULL) {
		mp = NULL;
		goto out;
	}

	mp = dbp->db_mblk;
	DBLK_RTFU_WORD(dbp) = DBLK_RTFU(1, M_DATA, 0, 0);
	mp->b_next = mp->b_prev = mp->b_cont = NULL;
	mp->b_rptr = mp->b_wptr = dbp->db_base;
	mp->b_queue = NULL;
	MBLK_BAND_FLAG_WORD(mp) = 0;
	STR_FTALLOC(&dbp->db_fthdr, FTEV_ALLOCB, size);
out:
	FTRACE_1("allocb(): mp=0x%p", (uintptr_t)mp);

	return (mp);
}

/*
 * Allocate an mblk taking db_credp and db_cpid from the template.
 * Allow the cred to be NULL.
 */
mblk_t *
allocb_tmpl(size_t size, const mblk_t *tmpl)
{
	mblk_t *mp = allocb(size, 0);

	if (mp != NULL) {
		dblk_t *src = tmpl->b_datap;
		dblk_t *dst = mp->b_datap;
		cred_t *cr;
		pid_t cpid;

		cr = msg_getcred(tmpl, &cpid);
		if (cr != NULL)
			crhold(dst->db_credp = cr);
		dst->db_cpid = cpid;
		dst->db_type = src->db_type;
	}
	return (mp);
}

mblk_t *
allocb_cred(size_t size, cred_t *cr, pid_t cpid)
{
	mblk_t *mp = allocb(size, 0);

	ASSERT(cr != NULL);
	if (mp != NULL) {
		dblk_t *dbp = mp->b_datap;

		crhold(dbp->db_credp = cr);
		dbp->db_cpid = cpid;
	}
	return (mp);
}

mblk_t *
allocb_cred_wait(size_t size, uint_t flags, int *error, cred_t *cr, pid_t cpid)
{
	mblk_t *mp = allocb_wait(size, 0, flags, error);

	ASSERT(cr != NULL);
	if (mp != NULL) {
		dblk_t *dbp = mp->b_datap;

		crhold(dbp->db_credp = cr);
		dbp->db_cpid = cpid;
	}

	return (mp);
}

/*
 * Extract the db_cred (and optionally db_cpid) from a message.
 * We find the first mblk which has a non-NULL db_cred and use that.
 * If none is found we return NULL.
 * Does NOT get a hold on the cred.
 */
cred_t *
msg_getcred(const mblk_t *mp, pid_t *cpidp)
{
	cred_t *cr = NULL;
	cred_t *cr2;
	mblk_t *mp2;

	while (mp != NULL) {
		dblk_t *dbp = mp->b_datap;

		cr = dbp->db_credp;
		if (cr == NULL) {
			mp = mp->b_cont;
			continue;
		}
		if (cpidp != NULL)
			*cpidp = dbp->db_cpid;

#ifdef DEBUG
		/*
		 * Normally there should be at most one db_credp in a message.
		 * But if there are multiple (as in the case of some M_IOC*
		 * and some internal messages in TCP/IP bind logic) then
		 * they must be identical in the normal case.
		 * However, a socket can be shared between different uids
		 * in which case data queued in TCP would be from different
		 * creds.  Thus we can only assert that the zoneid is the
		 * same.  Due to Multi-Level Ports for TX, some cred_t can
		 * have a NULL cr_zone, and we skip the comparison in that
		 * case.
		 */
		mp2 = mp->b_cont;
		while (mp2 != NULL) {
			cr2 = DB_CRED(mp2);
			if (cr2 != NULL) {
				DTRACE_PROBE2(msg__getcred,
				    cred_t *, cr, cred_t *, cr2);
				ASSERT(crgetzoneid(cr) == crgetzoneid(cr2) ||
				    crgetzone(cr) == NULL ||
				    crgetzone(cr2) == NULL);
			}
			mp2 = mp2->b_cont;
		}
#endif
		return (cr);
	}
	if (cpidp != NULL)
		*cpidp = NOPID;
	return (NULL);
}

/*
 * Variant of msg_getcred which, when a cred is found
 * 1. Returns with a hold on the cred
 * 2. Clears the first cred in the mblk.
 * This is more efficient to use than a msg_getcred() + crhold() when
 * the message is freed after the cred has been extracted.
 *
 * The caller is responsible for ensuring that there is no other reference
 * on the message since db_credp cannot be cleared when there are other
 * references.
 */
cred_t *
msg_extractcred(mblk_t *mp, pid_t *cpidp)
{
	cred_t *cr = NULL;
	cred_t *cr2;
	mblk_t *mp2;

	while (mp != NULL) {
		dblk_t *dbp = mp->b_datap;

		cr = dbp->db_credp;
		if (cr == NULL) {
			mp = mp->b_cont;
			continue;
		}
		ASSERT(dbp->db_ref == 1);
		dbp->db_credp = NULL;
		if (cpidp != NULL)
			*cpidp = dbp->db_cpid;
#ifdef DEBUG
		/*
		 * Same consistency check as in msg_getcred() above: any
		 * additional db_credp in the chain must match in zoneid
		 * unless a cred has a NULL cr_zone (TX Multi-Level Ports).
		 */
		mp2 = mp->b_cont;
		while (mp2 != NULL) {
			cr2 = DB_CRED(mp2);
			if (cr2 != NULL) {
				DTRACE_PROBE2(msg__extractcred,
				    cred_t *, cr, cred_t *, cr2);
				ASSERT(crgetzoneid(cr) == crgetzoneid(cr2) ||
				    crgetzone(cr) == NULL ||
				    crgetzone(cr2) == NULL);
			}
			mp2 = mp2->b_cont;
		}
#endif
		return (cr);
	}
	return (NULL);
}
/*
 * Get the label for a message.  Uses the first mblk in the message
 * which has a non-NULL db_credp.
 * Returns NULL if there is no credp.
 */
extern struct ts_label_s *
msg_getlabel(const mblk_t *mp)
{
	cred_t *cr = msg_getcred(mp, NULL);

	if (cr == NULL)
		return (NULL);

	return (crgetlabel(cr));
}

void
freeb(mblk_t *mp)
{
	dblk_t *dbp = mp->b_datap;

	ASSERT(dbp->db_ref > 0);
	ASSERT(mp->b_next == NULL && mp->b_prev == NULL);
	FTRACE_1("freeb(): mp=0x%lx", (uintptr_t)mp);

	STR_FTEVENT_MBLK(mp, caller(), FTEV_FREEB, dbp->db_ref);

	dbp->db_free(mp, dbp);
}

void
freemsg(mblk_t *mp)
{
	FTRACE_1("freemsg(): mp=0x%lx", (uintptr_t)mp);
	while (mp) {
		dblk_t *dbp = mp->b_datap;
		mblk_t *mp_cont = mp->b_cont;

		ASSERT(dbp->db_ref > 0);
		ASSERT(mp->b_next == NULL && mp->b_prev == NULL);

		STR_FTEVENT_MBLK(mp, caller(), FTEV_FREEB, dbp->db_ref);

		dbp->db_free(mp, dbp);
		mp = mp_cont;
	}
}

/*
 * Reallocate a block for another use.  Try hard to use the old block.
 * If the old data is wanted (copy), leave b_wptr at the end of the data,
 * otherwise return b_wptr = b_rptr.
 *
 * This routine is private and unstable.
 */
mblk_t *
reallocb(mblk_t *mp, size_t size, uint_t copy)
{
	mblk_t *mp1;
	unsigned char *old_rptr;
	ptrdiff_t cur_size;

	if (mp == NULL)
		return (allocb(size, BPRI_HI));

	cur_size = mp->b_wptr - mp->b_rptr;
	old_rptr = mp->b_rptr;

	ASSERT(mp->b_datap->db_ref != 0);

	if (mp->b_datap->db_ref == 1 && MBLKSIZE(mp) >= size) {
		/*
		 * If the data is wanted and it will fit where it is, no
		 * work is required.
		 */
		if (copy && mp->b_datap->db_lim - mp->b_rptr >= size)
			return (mp);

		mp->b_wptr = mp->b_rptr = mp->b_datap->db_base;
		mp1 = mp;
	} else if ((mp1 = allocb_tmpl(size, mp)) != NULL) {
		/* XXX other mp state could be copied too, db_flags ... ? */
		mp1->b_cont = mp->b_cont;
	} else {
		return (NULL);
	}

	if (copy) {
		bcopy(old_rptr, mp1->b_rptr, cur_size);
		mp1->b_wptr = mp1->b_rptr + cur_size;
	}

	if (mp != mp1)
		freeb(mp);

	return (mp1);
}

static void
dblk_lastfree(mblk_t *mp, dblk_t *dbp)
{
	ASSERT(dbp->db_mblk == mp);
	if (dbp->db_fthdr != NULL)
		str_ftfree(dbp);

	/* set credp and projid to be 'unspecified' before returning to cache */
	if (dbp->db_credp != NULL) {
		crfree(dbp->db_credp);
		dbp->db_credp = NULL;
	}
	dbp->db_cpid = -1;

	/* Reset the struioflag and the checksum flag fields */
	dbp->db_struioflag = 0;
	dbp->db_struioun.cksum.flags = 0;

	/* and the COOKED and/or UIOA flag(s) */
	dbp->db_flags &= ~(DBLK_COOKED | DBLK_UIOA);

	kmem_cache_free(dbp->db_cache, dbp);
}

static void
dblk_decref(mblk_t *mp, dblk_t *dbp)
{
	if (dbp->db_ref != 1) {
		uint32_t rtfu = atomic_add_32_nv(&DBLK_RTFU_WORD(dbp),
		    -(1 << DBLK_RTFU_SHIFT(db_ref)));
		/*
		 * atomic_add_32_nv() just decremented db_ref, so we no longer
		 * have a reference to the dblk, which means another thread
		 * could free it.  Therefore we cannot examine the dblk to
		 * determine whether ours was the last reference.  Instead,
		 * we extract the new and minimum reference counts from rtfu.
		 * Note that all we're really saying is "if (ref != refmin)".
		 */
		if (((rtfu >> DBLK_RTFU_SHIFT(db_ref)) & DBLK_REFMAX) !=
		    ((rtfu >> DBLK_RTFU_SHIFT(db_flags)) & DBLK_REFMIN)) {
			kmem_cache_free(mblk_cache, mp);
			return;
		}
	}
	dbp->db_mblk = mp;
	dbp->db_free = dbp->db_lastfree;
	dbp->db_lastfree(mp, dbp);
}

mblk_t *
dupb(mblk_t *mp)
{
	dblk_t *dbp = mp->b_datap;
	mblk_t *new_mp;
	uint32_t oldrtfu, newrtfu;

	if ((new_mp = kmem_cache_alloc(mblk_cache, KM_NOSLEEP)) == NULL)
		goto out;

	new_mp->b_next = new_mp->b_prev = new_mp->b_cont = NULL;
	new_mp->b_rptr = mp->b_rptr;
	new_mp->b_wptr = mp->b_wptr;
	new_mp->b_datap = dbp;
	new_mp->b_queue = NULL;
	MBLK_BAND_FLAG_WORD(new_mp) = MBLK_BAND_FLAG_WORD(mp);

	STR_FTEVENT_MBLK(mp, caller(), FTEV_DUPB, dbp->db_ref);

	dbp->db_free = dblk_decref;
	do {
		ASSERT(dbp->db_ref > 0);
		oldrtfu = DBLK_RTFU_WORD(dbp);
		newrtfu = oldrtfu + (1 << DBLK_RTFU_SHIFT(db_ref));
		/*
		 * If db_ref is maxed out we can't dup this message anymore.
		 */
		if ((oldrtfu & DBLK_RTFU_REF_MASK) == DBLK_RTFU_REF_MASK) {
			kmem_cache_free(mblk_cache, new_mp);
			new_mp = NULL;
			goto out;
		}
	} while (atomic_cas_32(&DBLK_RTFU_WORD(dbp), oldrtfu, newrtfu) !=
	    oldrtfu);

out:
	FTRACE_1("dupb(): new_mp=0x%lx", (uintptr_t)new_mp);
	return (new_mp);
}

static void
dblk_lastfree_desb(mblk_t *mp, dblk_t *dbp)
{
	frtn_t *frp = dbp->db_frtnp;

	ASSERT(dbp->db_mblk == mp);
	frp->free_func(frp->free_arg);
	if (dbp->db_fthdr != NULL)
		str_ftfree(dbp);

	/* set credp and projid to be 'unspecified' before returning to cache */
	if (dbp->db_credp != NULL) {
		crfree(dbp->db_credp);
		dbp->db_credp = NULL;
	}
	dbp->db_cpid = -1;
	dbp->db_struioflag = 0;
	dbp->db_struioun.cksum.flags = 0;

	kmem_cache_free(dbp->db_cache, dbp);
}

/*ARGSUSED*/
static void
frnop_func(void *arg)
{
}

/*
 * Generic esballoc used to implement the four flavors: [d]esballoc[a].
 */
static mblk_t *
gesballoc(unsigned char *base, size_t size, uint32_t db_rtfu, frtn_t *frp,
    void (*lastfree)(mblk_t *, dblk_t *), int kmflags)
{
	dblk_t *dbp;
	mblk_t *mp;

	ASSERT(base != NULL && frp != NULL);

	if ((dbp = kmem_cache_alloc(dblk_esb_cache, kmflags)) == NULL) {
		mp = NULL;
		goto out;
	}

	mp = dbp->db_mblk;
	dbp->db_base = base;
	dbp->db_lim = base + size;
	dbp->db_free = dbp->db_lastfree = lastfree;
	dbp->db_frtnp = frp;
	DBLK_RTFU_WORD(dbp) = db_rtfu;
	mp->b_next = mp->b_prev = mp->b_cont = NULL;
	mp->b_rptr = mp->b_wptr = base;
	mp->b_queue = NULL;
	MBLK_BAND_FLAG_WORD(mp) = 0;

out:
	FTRACE_1("gesballoc(): mp=0x%lx", (uintptr_t)mp);
	return (mp);
}

/*ARGSUSED*/
mblk_t *
esballoc(unsigned char *base, size_t size, uint_t pri, frtn_t *frp)
{
	mblk_t *mp;

	/*
	 * Note that this is structured to allow the common case (i.e.
	 * STREAMS flowtracing disabled) to call gesballoc() with tail
	 * call optimization.
	 */
	if (!str_ftnever) {
		mp = gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
		    frp, freebs_enqueue, KM_NOSLEEP);

		if (mp != NULL)
			STR_FTALLOC(&DB_FTHDR(mp), FTEV_ESBALLOC, size);
		return (mp);
	}

	return (gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
	    frp, freebs_enqueue, KM_NOSLEEP));
}
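
/*
 * Illustrative usage sketch (not part of the original file): the usual
 * way a driver hands its own buffer to esballoc().  The frtn_t must
 * remain valid until the free routine runs, so it is embedded here in a
 * hypothetical descriptor that the free routine releases as a whole.
 *
 *	struct my_desc {
 *		frtn_t	md_frtn;
 *		uchar_t	md_buf[MY_BUF_SIZE];
 *	};
 *
 *	static void
 *	my_desc_free(void *arg)
 *	{
 *		kmem_free(arg, sizeof (struct my_desc));
 *	}
 *
 *	struct my_desc *dp = kmem_alloc(sizeof (*dp), KM_NOSLEEP);
 *	mblk_t *mp;
 *
 *	if (dp == NULL)
 *		return (ENOSR);
 *	dp->md_frtn.free_func = my_desc_free;
 *	dp->md_frtn.free_arg = (caddr_t)dp;
 *	mp = esballoc(dp->md_buf, MY_BUF_SIZE, BPRI_MED, &dp->md_frtn);
 *	if (mp == NULL)
 *		kmem_free(dp, sizeof (*dp));	// free routine never ran
 */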

/*
 * Same as esballoc() but sleeps waiting for memory.
 */
/*ARGSUSED*/
mblk_t *
esballoc_wait(unsigned char *base, size_t size, uint_t pri, frtn_t *frp)
{
	mblk_t *mp;

	/*
	 * Note that this is structured to allow the common case (i.e.
	 * STREAMS flowtracing disabled) to call gesballoc() with tail
	 * call optimization.
	 */
	if (!str_ftnever) {
		mp = gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
		    frp, freebs_enqueue, KM_SLEEP);

		STR_FTALLOC(&DB_FTHDR(mp), FTEV_ESBALLOC, size);
		return (mp);
	}

	return (gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
	    frp, freebs_enqueue, KM_SLEEP));
}

/*ARGSUSED*/
mblk_t *
desballoc(unsigned char *base, size_t size, uint_t pri, frtn_t *frp)
{
	mblk_t *mp;

	/*
	 * Note that this is structured to allow the common case (i.e.
	 * STREAMS flowtracing disabled) to call gesballoc() with tail
	 * call optimization.
	 */
	if (!str_ftnever) {
		mp = gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
		    frp, dblk_lastfree_desb, KM_NOSLEEP);

		if (mp != NULL)
			STR_FTALLOC(&DB_FTHDR(mp), FTEV_DESBALLOC, size);
		return (mp);
	}

	return (gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
	    frp, dblk_lastfree_desb, KM_NOSLEEP));
}

/*ARGSUSED*/
mblk_t *
esballoca(unsigned char *base, size_t size, uint_t pri, frtn_t *frp)
{
	mblk_t *mp;

	/*
	 * Note that this is structured to allow the common case (i.e.
	 * STREAMS flowtracing disabled) to call gesballoc() with tail
	 * call optimization.
	 */
	if (!str_ftnever) {
		mp = gesballoc(base, size, DBLK_RTFU(2, M_DATA, 0, 0),
		    frp, freebs_enqueue, KM_NOSLEEP);

		if (mp != NULL)
			STR_FTALLOC(&DB_FTHDR(mp), FTEV_ESBALLOCA, size);
		return (mp);
	}

	return (gesballoc(base, size, DBLK_RTFU(2, M_DATA, 0, 0),
	    frp, freebs_enqueue, KM_NOSLEEP));
}

/*ARGSUSED*/
mblk_t *
desballoca(unsigned char *base, size_t size, uint_t pri, frtn_t *frp)
{
	mblk_t *mp;

	/*
	 * Note that this is structured to allow the common case (i.e.
	 * STREAMS flowtracing disabled) to call gesballoc() with tail
	 * call optimization.
	 */
	if (!str_ftnever) {
		mp = gesballoc(base, size, DBLK_RTFU(2, M_DATA, 0, 0),
		    frp, dblk_lastfree_desb, KM_NOSLEEP);

		if (mp != NULL)
			STR_FTALLOC(&DB_FTHDR(mp), FTEV_DESBALLOCA, size);
		return (mp);
	}

	return (gesballoc(base, size, DBLK_RTFU(2, M_DATA, 0, 0),
	    frp, dblk_lastfree_desb, KM_NOSLEEP));
}

static void
bcache_dblk_lastfree(mblk_t *mp, dblk_t *dbp)
{
	bcache_t *bcp = dbp->db_cache;

	ASSERT(dbp->db_mblk == mp);
	if (dbp->db_fthdr != NULL)
		str_ftfree(dbp);

	/* set credp and projid to be 'unspecified' before returning to cache */
	if (dbp->db_credp != NULL) {
		crfree(dbp->db_credp);
		dbp->db_credp = NULL;
	}
	dbp->db_cpid = -1;
	dbp->db_struioflag = 0;
	dbp->db_struioun.cksum.flags = 0;

	mutex_enter(&bcp->mutex);
	kmem_cache_free(bcp->dblk_cache, dbp);
	bcp->alloc--;

	if (bcp->alloc == 0 && bcp->destroy != 0) {
		kmem_cache_destroy(bcp->dblk_cache);
		kmem_cache_destroy(bcp->buffer_cache);
		mutex_exit(&bcp->mutex);
		mutex_destroy(&bcp->mutex);
		kmem_free(bcp, sizeof (bcache_t));
	} else {
		mutex_exit(&bcp->mutex);
	}
}

bcache_t *
bcache_create(char *name, size_t size, uint_t align)
{
	bcache_t *bcp;
	char buffer[255];

	ASSERT((align & (align - 1)) == 0);

	if ((bcp = kmem_alloc(sizeof (bcache_t), KM_NOSLEEP)) == NULL)
		return (NULL);

	bcp->size = size;
	bcp->align = align;
	bcp->alloc = 0;
	bcp->destroy = 0;

	mutex_init(&bcp->mutex, NULL, MUTEX_DRIVER, NULL);

	(void) sprintf(buffer, "%s_buffer_cache", name);
	bcp->buffer_cache = kmem_cache_create(buffer, size, align, NULL, NULL,
	    NULL, NULL, NULL, 0);
	(void) sprintf(buffer, "%s_dblk_cache", name);
	bcp->dblk_cache = kmem_cache_create(buffer, sizeof (dblk_t),
	    DBLK_CACHE_ALIGN, bcache_dblk_constructor, bcache_dblk_destructor,
	    NULL, (void *)bcp, NULL, 0);

	return (bcp);
}

void
bcache_destroy(bcache_t *bcp)
{
	ASSERT(bcp != NULL);

	mutex_enter(&bcp->mutex);
	if (bcp->alloc == 0) {
		kmem_cache_destroy(bcp->dblk_cache);
		kmem_cache_destroy(bcp->buffer_cache);
		mutex_exit(&bcp->mutex);
		mutex_destroy(&bcp->mutex);
		kmem_free(bcp, sizeof (bcache_t));
	} else {
		bcp->destroy++;
		mutex_exit(&bcp->mutex);
	}
}

/*ARGSUSED*/
mblk_t *
bcache_allocb(bcache_t *bcp, uint_t pri)
{
	dblk_t *dbp;
	mblk_t *mp = NULL;

	ASSERT(bcp != NULL);

	mutex_enter(&bcp->mutex);
	if (bcp->destroy != 0) {
		mutex_exit(&bcp->mutex);
		goto out;
	}

	if ((dbp = kmem_cache_alloc(bcp->dblk_cache, KM_NOSLEEP)) == NULL) {
		mutex_exit(&bcp->mutex);
		goto out;
	}
	bcp->alloc++;
	mutex_exit(&bcp->mutex);

	ASSERT(((uintptr_t)(dbp->db_base) & (bcp->align - 1)) == 0);

	mp = dbp->db_mblk;
	DBLK_RTFU_WORD(dbp) = DBLK_RTFU(1, M_DATA, 0, 0);
	mp->b_next = mp->b_prev = mp->b_cont = NULL;
	mp->b_rptr = mp->b_wptr = dbp->db_base;
	mp->b_queue = NULL;
	MBLK_BAND_FLAG_WORD(mp) = 0;
	STR_FTALLOC(&dbp->db_fthdr, FTEV_BCALLOCB, bcp->size);
out:
	FTRACE_1("bcache_allocb(): mp=0x%p", (uintptr_t)mp);

	return (mp);
}
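
/*
 * Illustrative usage sketch (not part of the original file): a
 * driver-private buffer cache built on the bcache routines above.
 * The cache name and sizes are hypothetical.
 *
 *	bcache_t *bcp;
 *	mblk_t *mp;
 *
 *	if ((bcp = bcache_create("mydrv", 2048, 64)) == NULL)
 *		return (ENOMEM);
 *	if ((mp = bcache_allocb(bcp, BPRI_MED)) != NULL) {
 *		// ... fill in and send mp; when it is finally freed,
 *		// bcache_dblk_lastfree() returns the dblk to bcp ...
 *	}
 *	bcache_destroy(bcp);	// actual teardown is deferred until the
 *				// last outstanding dblk has been freed
 */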

static void
dblk_lastfree_oversize(mblk_t *mp, dblk_t *dbp)
{
	ASSERT(dbp->db_mblk == mp);
	if (dbp->db_fthdr != NULL)
		str_ftfree(dbp);

	/* set credp and projid to be 'unspecified' before returning to cache */
	if (dbp->db_credp != NULL) {
		crfree(dbp->db_credp);
		dbp->db_credp = NULL;
	}
	dbp->db_cpid = -1;
	dbp->db_struioflag = 0;
	dbp->db_struioun.cksum.flags = 0;

	kmem_free(dbp->db_base, dbp->db_lim - dbp->db_base);
	kmem_cache_free(dbp->db_cache, dbp);
}

static mblk_t *
allocb_oversize(size_t size, int kmflags)
{
	mblk_t *mp;
	void *buf;

	size = P2ROUNDUP(size, DBLK_CACHE_ALIGN);
	if ((buf = kmem_alloc(size, kmflags)) == NULL)
		return (NULL);
	if ((mp = gesballoc(buf, size, DBLK_RTFU(1, M_DATA, 0, 0),
	    &frnop, dblk_lastfree_oversize, kmflags)) == NULL)
		kmem_free(buf, size);

	if (mp != NULL)
		STR_FTALLOC(&DB_FTHDR(mp), FTEV_ALLOCBIG, size);

	return (mp);
}

mblk_t *
allocb_tryhard(size_t target_size)
{
	size_t size;
	mblk_t *bp;

	for (size = target_size; size < target_size + 512;
	    size += DBLK_CACHE_ALIGN)
		if ((bp = allocb(size, BPRI_HI)) != NULL)
			return (bp);
	allocb_tryhard_fails++;
	return (NULL);
}

/*
 * This routine is consolidation private for STREAMS internal use.  It
 * may only be called from sync routines (i.e., not from put or service
 * procedures).  It is located here (rather than strsubr.c) so that we
 * don't have to expose all of the allocb() implementation details in
 * header files.
 */
mblk_t *
allocb_wait(size_t size, uint_t pri, uint_t flags, int *error)
{
	dblk_t *dbp;
	mblk_t *mp;
	size_t index;

	index = (size - 1) >> DBLK_SIZE_SHIFT;

	if (flags & STR_NOSIG) {
		if (index >= (DBLK_MAX_CACHE >> DBLK_SIZE_SHIFT)) {
			if (size != 0) {
				mp = allocb_oversize(size, KM_SLEEP);
				FTRACE_1("allocb_wait (NOSIG): mp=0x%lx",
				    (uintptr_t)mp);
				return (mp);
			}
			index = 0;
		}

		dbp = kmem_cache_alloc(dblk_cache[index], KM_SLEEP);
		mp = dbp->db_mblk;
		DBLK_RTFU_WORD(dbp) = DBLK_RTFU(1, M_DATA, 0, 0);
		mp->b_next = mp->b_prev = mp->b_cont = NULL;
		mp->b_rptr = mp->b_wptr = dbp->db_base;
		mp->b_queue = NULL;
		MBLK_BAND_FLAG_WORD(mp) = 0;
		STR_FTALLOC(&DB_FTHDR(mp), FTEV_ALLOCBW, size);

		FTRACE_1("allocb_wait (NOSIG): mp=0x%lx", (uintptr_t)mp);

	} else {
		while ((mp = allocb(size, pri)) == NULL) {
			if ((*error = strwaitbuf(size, BPRI_HI)) != 0)
				return (NULL);
		}
	}

	return (mp);
}

/*
 * Call function 'func' with 'arg' when a class zero block can
 * be allocated with priority 'pri'.
 */
bufcall_id_t
esbbcall(uint_t pri, void (*func)(void *), void *arg)
{
	return (bufcall(1, pri, func, arg));
}

/*
 * Allocates an iocblk (M_IOCTL) block.  Properly sets the credentials,
 * ioc_id, rval and error of the struct iocblk to set up an ioctl call.
 * This provides consistency for all internal allocators of ioctl.
 */
mblk_t *
mkiocb(uint_t cmd)
{
	struct iocblk *ioc;
	mblk_t *mp;

	/*
	 * Allocate enough space for any of the ioctl related messages.
	 */
	if ((mp = allocb(sizeof (union ioctypes), BPRI_MED)) == NULL)
		return (NULL);

	bzero(mp->b_rptr, sizeof (union ioctypes));

	/*
	 * Set the mblk_t information and ptrs correctly.
	 */
	mp->b_wptr += sizeof (struct iocblk);
	mp->b_datap->db_type = M_IOCTL;

	/*
	 * Fill in the fields.
	 */
	ioc = (struct iocblk *)mp->b_rptr;
	ioc->ioc_cmd = cmd;
	ioc->ioc_cr = kcred;
	ioc->ioc_id = getiocseqno();
	ioc->ioc_flag = IOC_NATIVE;
	return (mp);
}
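
/*
 * Illustrative usage sketch (not part of the original file): issuing an
 * internally generated ioctl with mkiocb().  MY_CMD and the payload
 * handling are hypothetical.
 *
 *	mblk_t *iocmp;
 *
 *	if ((iocmp = mkiocb(MY_CMD)) == NULL)
 *		return (ENOSR);
 *	// Optionally attach an M_DATA payload:
 *	//	iocmp->b_cont = datamp;
 *	//	((struct iocblk *)iocmp->b_rptr)->ioc_count = MBLKL(datamp);
 *	putnext(wq, iocmp);	// the reply comes back upstream later
 *				// as an M_IOCACK or M_IOCNAK message
 */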

/*
 * Test if a block of the given size can be allocated with a request of
 * the given priority.
 * 'pri' is no longer used, but is retained for compatibility.
 */
/* ARGSUSED */
int
testb(size_t size, uint_t pri)
{
	return ((size + sizeof (dblk_t)) <= kmem_avail());
}

/*
 * Call function 'func' with argument 'arg' when there is a reasonably
 * good chance that a block of size 'size' can be allocated.
 * 'pri' is no longer used, but is retained for compatibility.
 */
/* ARGSUSED */
bufcall_id_t
bufcall(size_t size, uint_t pri, void (*func)(void *), void *arg)
{
	static long bid = 1;	/* always odd to save checking for zero */
	bufcall_id_t bc_id;
	struct strbufcall *bcp;

	if ((bcp = kmem_alloc(sizeof (strbufcall_t), KM_NOSLEEP)) == NULL)
		return (0);

	bcp->bc_func = func;
	bcp->bc_arg = arg;
	bcp->bc_size = size;
	bcp->bc_next = NULL;
	bcp->bc_executor = NULL;

	mutex_enter(&strbcall_lock);
	/*
	 * After bcp is linked into strbcalls and strbcall_lock is dropped there
	 * should be no references to bcp since it may be freed by
	 * runbufcalls().  Since the bcp->bc_id field is returned, we save its
	 * value in a local variable.
	 */
	bc_id = bcp->bc_id = (bufcall_id_t)(bid += 2);	/* keep it odd */

	/*
	 * add newly allocated stream event to existing
	 * linked list of events.
	 */
	if (strbcalls.bc_head == NULL) {
		strbcalls.bc_head = strbcalls.bc_tail = bcp;
	} else {
		strbcalls.bc_tail->bc_next = bcp;
		strbcalls.bc_tail = bcp;
	}

	cv_signal(&strbcall_cv);
	mutex_exit(&strbcall_lock);
	return (bc_id);
}
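
/*
 * Illustrative usage sketch (not part of the original file): the
 * canonical allocb() failure recovery in a service procedure, using
 * bufcall() to re-enable the queue when memory may be available.  The
 * my_bufcall_id variable is hypothetical state kept in the module.
 *
 *	if ((bp = allocb(len, BPRI_MED)) == NULL) {
 *		(void) putbq(q, mp);		// retry this message later
 *		my_bufcall_id = bufcall(len, BPRI_MED,
 *		    (void (*)(void *))qenable, q);
 *		return (0);			// 0 from bufcall() means
 *	}					// the request itself failed
 *	...
 *	// and in close(9E), cancel anything still outstanding:
 *	if (my_bufcall_id != 0)
 *		unbufcall(my_bufcall_id);
 */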

/*
 * Cancel a bufcall request.
 */
void
unbufcall(bufcall_id_t id)
{
	strbufcall_t *bcp, *pbcp;

	mutex_enter(&strbcall_lock);
again:
	pbcp = NULL;
	for (bcp = strbcalls.bc_head; bcp; bcp = bcp->bc_next) {
		if (id == bcp->bc_id)
			break;
		pbcp = bcp;
	}
	if (bcp) {
		if (bcp->bc_executor != NULL) {
			if (bcp->bc_executor != curthread) {
				cv_wait(&bcall_cv, &strbcall_lock);
				goto again;
			}
		} else {
			if (pbcp)
				pbcp->bc_next = bcp->bc_next;
			else
				strbcalls.bc_head = bcp->bc_next;
			if (bcp == strbcalls.bc_tail)
				strbcalls.bc_tail = pbcp;
			kmem_free(bcp, sizeof (strbufcall_t));
		}
	}
	mutex_exit(&strbcall_lock);
}

/*
 * Duplicate a message block by block (uses dupb), returning
 * a pointer to the duplicate message.
 * Returns a non-NULL value only if the entire message
 * was dup'd.
 */
mblk_t *
dupmsg(mblk_t *bp)
{
	mblk_t *head, *nbp;

	if (!bp || !(nbp = head = dupb(bp)))
		return (NULL);

	while (bp->b_cont) {
		if (!(nbp->b_cont = dupb(bp->b_cont))) {
			freemsg(head);
			return (NULL);
		}
		nbp = nbp->b_cont;
		bp = bp->b_cont;
	}
	return (head);
}

#define	DUPB_NOLOAN(bp) \
	((((bp)->b_datap->db_struioflag & STRUIO_ZC) != 0) ? \
	copyb((bp)) : dupb((bp)))

mblk_t *
dupmsg_noloan(mblk_t *bp)
{
	mblk_t *head, *nbp;

	if (bp == NULL || DB_TYPE(bp) != M_DATA ||
	    ((nbp = head = DUPB_NOLOAN(bp)) == NULL))
		return (NULL);

	while (bp->b_cont) {
		if ((nbp->b_cont = DUPB_NOLOAN(bp->b_cont)) == NULL) {
			freemsg(head);
			return (NULL);
		}
		nbp = nbp->b_cont;
		bp = bp->b_cont;
	}
	return (head);
}

/*
 * Copy data from message and data block to newly allocated message and
 * data block.  Returns new message block pointer, or NULL if error.
 * The alignment of rptr (w.r.t. word alignment) will be the same in the copy
 * as in the original even when db_base is not word aligned. (bug 1052877)
 */
mblk_t *
copyb(mblk_t *bp)
{
	mblk_t *nbp;
	dblk_t *dp, *ndp;
	uchar_t *base;
	size_t size;
	size_t unaligned;

	ASSERT(bp->b_wptr >= bp->b_rptr);

	dp = bp->b_datap;
	if (dp->db_fthdr != NULL)
		STR_FTEVENT_MBLK(bp, caller(), FTEV_COPYB, 0);

	/*
	 * Special handling for Multidata message; this should be
	 * removed once a copy-callback routine is made available.
	 */
	if (dp->db_type == M_MULTIDATA) {
		cred_t *cr;

		if ((nbp = mmd_copy(bp, KM_NOSLEEP)) == NULL)
			return (NULL);

		nbp->b_flag = bp->b_flag;
		nbp->b_band = bp->b_band;
		ndp = nbp->b_datap;

		/* See comments below on potential issues. */
		STR_FTEVENT_MBLK(nbp, caller(), FTEV_COPYB, 1);

		ASSERT(ndp->db_type == dp->db_type);
		cr = dp->db_credp;
		if (cr != NULL)
			crhold(ndp->db_credp = cr);
		ndp->db_cpid = dp->db_cpid;
		return (nbp);
	}

	size = dp->db_lim - dp->db_base;
	unaligned = P2PHASE((uintptr_t)dp->db_base, sizeof (uint_t));
	if ((nbp = allocb_tmpl(size + unaligned, bp)) == NULL)
		return (NULL);
	nbp->b_flag = bp->b_flag;
	nbp->b_band = bp->b_band;
	ndp = nbp->b_datap;

	/*
	 * Well, here is a potential issue.  If we are trying to
	 * trace a flow, and we copy the message, we might lose
	 * information about where this message might have been.
	 * So we should inherit the FT data.  On the other hand,
	 * a user might be interested only in alloc to free data.
	 * So I guess the real answer is to provide a tunable.
	 */
	STR_FTEVENT_MBLK(nbp, caller(), FTEV_COPYB, 1);

	base = ndp->db_base + unaligned;
	bcopy(dp->db_base, ndp->db_base + unaligned, size);

	nbp->b_rptr = base + (bp->b_rptr - dp->db_base);
	nbp->b_wptr = nbp->b_rptr + MBLKL(bp);

	return (nbp);
}

/*
 * Copy data from message to newly allocated message using new
 * data blocks.  Returns a pointer to the new message, or NULL if error.
 */
mblk_t *
copymsg(mblk_t *bp)
{
	mblk_t *head, *nbp;

	if (!bp || !(nbp = head = copyb(bp)))
		return (NULL);

	while (bp->b_cont) {
		if (!(nbp->b_cont = copyb(bp->b_cont))) {
			freemsg(head);
			return (NULL);
		}
		nbp = nbp->b_cont;
		bp = bp->b_cont;
	}
	return (head);
}

/*
 * Link a message block to the tail of a message.
 */
void
linkb(mblk_t *mp, mblk_t *bp)
{
	ASSERT(mp && bp);

	for (; mp->b_cont; mp = mp->b_cont)
		;
	mp->b_cont = bp;
}

/*
 * Unlink a message block from the head of a message.
 * Return a pointer to the new message;
 * NULL if the message becomes empty.
 */
mblk_t *
unlinkb(mblk_t *bp)
{
	mblk_t *bp1;

	bp1 = bp->b_cont;
	bp->b_cont = NULL;
	return (bp1);
}

/*
 * Remove a message block "bp" from message "mp".
 *
 * Return a pointer to the new message or NULL if no message remains.
 * Return -1 if bp is not found in the message.
 */
mblk_t *
rmvb(mblk_t *mp, mblk_t *bp)
{
	mblk_t *tmp;
	mblk_t *lastp = NULL;

	ASSERT(mp && bp);
	for (tmp = mp; tmp; tmp = tmp->b_cont) {
		if (tmp == bp) {
			if (lastp)
				lastp->b_cont = tmp->b_cont;
			else
				mp = tmp->b_cont;
			tmp->b_cont = NULL;
			return (mp);
		}
		lastp = tmp;
	}
	return ((mblk_t *)-1);
}

/*
 * Concatenate and align the first len bytes of common
 * message type.  len == -1 means concatenate everything.
 * Returns 1 on success, 0 on failure.
 * After the pullup, mp points to the pulled-up data.
 */
int
pullupmsg(mblk_t *mp, ssize_t len)
{
	mblk_t *bp, *b_cont;
	dblk_t *dbp;
	ssize_t n;

	ASSERT(mp->b_datap->db_ref > 0);
	ASSERT(mp->b_next == NULL && mp->b_prev == NULL);

	/*
	 * We won't handle Multidata messages, since they contain
	 * metadata which this function has no knowledge of; we
	 * assert on DEBUG, and return failure otherwise.
	 */
	ASSERT(mp->b_datap->db_type != M_MULTIDATA);
	if (mp->b_datap->db_type == M_MULTIDATA)
		return (0);

	if (len == -1) {
		if (mp->b_cont == NULL && str_aligned(mp->b_rptr))
			return (1);
		len = xmsgsize(mp);
	} else {
		ssize_t first_mblk_len = mp->b_wptr - mp->b_rptr;
		ASSERT(first_mblk_len >= 0);
		/*
		 * If the length is less than that of the first mblk,
		 * we want to pull up the message into an aligned mblk.
		 * Though not part of the spec, some callers assume it.
		 */
		if (len <= first_mblk_len) {
			if (str_aligned(mp->b_rptr))
				return (1);
			len = first_mblk_len;
		} else if (xmsgsize(mp) < len)
			return (0);
	}

	if ((bp = allocb_tmpl(len, mp)) == NULL)
		return (0);

	dbp = bp->b_datap;
	*bp = *mp;		/* swap mblks so bp heads the old msg... */
	mp->b_datap = dbp;	/* ... and mp heads the new message */
	mp->b_datap->db_mblk = mp;
	bp->b_datap->db_mblk = bp;
	mp->b_rptr = mp->b_wptr = dbp->db_base;

	do {
		ASSERT(bp->b_datap->db_ref > 0);
		ASSERT(bp->b_wptr >= bp->b_rptr);
		n = MIN(bp->b_wptr - bp->b_rptr, len);
		ASSERT(n >= 0);		/* allow zero-length mblk_t's */
		if (n > 0)
			bcopy(bp->b_rptr, mp->b_wptr, (size_t)n);
		mp->b_wptr += n;
		bp->b_rptr += n;
		len -= n;
		if (bp->b_rptr != bp->b_wptr)
			break;
		b_cont = bp->b_cont;
		freeb(bp);
		bp = b_cont;
	} while (len && bp);

	mp->b_cont = bp;	/* tack on whatever wasn't pulled up */

	return (1);
}

/*
 * Concatenate and align at least the first len bytes of common message
 * type.  len == -1 means concatenate everything.  The original message is
 * unaltered.  Returns a pointer to a new message on success, otherwise
 * returns NULL.
 */
mblk_t *
msgpullup(mblk_t *mp, ssize_t len)
{
	mblk_t *newmp;
	ssize_t totlen;
	ssize_t n;

	/*
	 * We won't handle Multidata messages, since they contain
	 * metadata which this function has no knowledge of; we
	 * assert on DEBUG, and return failure otherwise.
	 */
	ASSERT(mp->b_datap->db_type != M_MULTIDATA);
	if (mp->b_datap->db_type == M_MULTIDATA)
		return (NULL);

	totlen = xmsgsize(mp);

	if ((len > 0) && (len > totlen))
		return (NULL);

	/*
	 * Copy all of the first msg type into one new mblk, then dupmsg
	 * and link the rest onto this.
	 */

	len = totlen;

	if ((newmp = allocb_tmpl(len, mp)) == NULL)
		return (NULL);

	newmp->b_flag = mp->b_flag;
	newmp->b_band = mp->b_band;

	while (len > 0) {
		n = mp->b_wptr - mp->b_rptr;
		ASSERT(n >= 0);		/* allow zero-length mblk_t's */
		if (n > 0)
			bcopy(mp->b_rptr, newmp->b_wptr, n);
		newmp->b_wptr += n;
		len -= n;
		mp = mp->b_cont;
	}

	if (mp != NULL) {
		newmp->b_cont = dupmsg(mp);
		if (newmp->b_cont == NULL) {
			freemsg(newmp);
			return (NULL);
		}
	}

	return (newmp);
}

/*
 * Trim bytes from a message:
 *  len > 0, trim from head
 *  len < 0, trim from tail
 * Returns 1 on success, 0 on failure.
 */
int
adjmsg(mblk_t *mp, ssize_t len)
{
	mblk_t *bp;
	mblk_t *save_bp = NULL;
	mblk_t *prev_bp;
	mblk_t *bcont;
	unsigned char type;
	ssize_t n;
	int fromhead;
	int first;

	ASSERT(mp != NULL);
	/*
	 * We won't handle Multidata messages, since they contain
	 * metadata which this function has no knowledge of; we
	 * assert on DEBUG, and return failure otherwise.
	 */
	ASSERT(mp->b_datap->db_type != M_MULTIDATA);
	if (mp->b_datap->db_type == M_MULTIDATA)
		return (0);

	if (len < 0) {
		fromhead = 0;
		len = -len;
	} else {
		fromhead = 1;
	}

	if (xmsgsize(mp) < len)
		return (0);

	if (fromhead) {
		first = 1;
		while (len) {
			ASSERT(mp->b_wptr >= mp->b_rptr);
			n = MIN(mp->b_wptr - mp->b_rptr, len);
			mp->b_rptr += n;
			len -= n;

			/*
			 * If this is not the first message and it is
			 * now zero length, remove it.
			 */
			if (!first && (mp->b_wptr == mp->b_rptr)) {
				bcont = mp->b_cont;
				freeb(mp);
				mp = save_bp->b_cont = bcont;
			} else {
				save_bp = mp;
				mp = mp->b_cont;
			}
			first = 0;
		}
	} else {
		type = mp->b_datap->db_type;
		while (len) {
			bp = mp;
			save_bp = NULL;

			/*
			 * Find the last mblk of the same type.
			 */
			while (bp && bp->b_datap->db_type == type) {
				ASSERT(bp->b_wptr >= bp->b_rptr);
				prev_bp = save_bp;
				save_bp = bp;
				bp = bp->b_cont;
			}
			if (save_bp == NULL)
				break;
			n = MIN(save_bp->b_wptr - save_bp->b_rptr, len);
			save_bp->b_wptr -= n;
			len -= n;

			/*
			 * If this is not the first message
			 * and we have taken away everything
			 * from this message, remove it.
			 */

			if ((save_bp != mp) &&
			    (save_bp->b_wptr == save_bp->b_rptr)) {
				bcont = save_bp->b_cont;
				freeb(save_bp);
				prev_bp->b_cont = bcont;
			}
		}
	}
	return (1);
}

/*
 * Get the number of data bytes in a message.
 */
size_t
msgdsize(mblk_t *bp)
{
	size_t count = 0;

	for (; bp; bp = bp->b_cont)
		if (bp->b_datap->db_type == M_DATA) {
			ASSERT(bp->b_wptr >= bp->b_rptr);
			count += bp->b_wptr - bp->b_rptr;
		}
	return (count);
}

/*
 * Get a message off the head of a queue.
 *
 * If the queue has no buffers then mark the queue
 * with QWANTR.  (The queue wants to be read by
 * someone when data becomes available.)
 *
 * If there is something to take off then do so.
 * If the queue falls below the hi water mark turn off the QFULL
 * flag.  Decrement the weighted count of the queue.
 * Also turn off QWANTR because the queue is being read.
 *
 * The queue count is maintained on a per-band basis.
 * Priority band 0 (normal messages) uses q_count,
 * q_lowat, etc.  Non-zero priority bands use the
 * fields in their respective qband structures
 * (qb_count, qb_lowat, etc.)  All messages appear
 * on the same list, linked via their b_next pointers.
 * q_first is the head of the list.  q_count does
 * not reflect the size of all the messages on the
 * queue.  It only reflects those messages in the
 * normal band of flow.  The one exception to this
 * deals with high priority messages.  They are in
 * their own conceptual "band", but are accounted
 * against q_count.
 *
 * If the queue count is below the lo water mark and QWANTW
 * is set, enable the closest backq which has a service
 * procedure and turn off the QWANTW flag.
 *
 * getq could be built on top of rmvq, but isn't because
 * of performance considerations.
 *
 * A note on the use of q_count and q_mblkcnt:
 * q_count is the traditional byte count for messages that
 * have been put on a queue.  Documentation tells us that
 * we shouldn't rely on that count, but some drivers/modules
 * do.  What was needed, however, was a mechanism to prevent
 * runaway streams from consuming all of the resources, and
 * in particular to be able to flow control zero-length
 * messages.  q_mblkcnt is used for this purpose.  It
 * counts the number of mblk's that are being put on
 * the queue.  The intention here is that each mblk should
 * contain one byte of data and, for the purpose of
 * flow-control, logically does.  A queue will become
 * full when EITHER of these values (q_count and q_mblkcnt)
 * reaches the highwater mark.  It will clear when BOTH
 * of them drop below the highwater mark.  And it will
 * backenable when BOTH of them drop below the lowwater
 * mark.
 * With this algorithm, a driver/module might be able
 * to find a reasonably accurate q_count, and the
 * framework can still try and limit resource usage.
 */
mblk_t *
getq(queue_t *q)
{
	mblk_t *bp;
	uchar_t band = 0;

	bp = getq_noenab(q, 0);
	if (bp != NULL)
		band = bp->b_band;

	/*
	 * Inlined from qbackenable().
	 * Quick check without holding the lock.
	 */
	if (band == 0 && (q->q_flag & (QWANTW|QWANTWSYNC)) == 0)
		return (bp);

	qbackenable(q, band);
	return (bp);
}
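
/*
 * Illustrative usage sketch (not part of the original file): the
 * classic service-procedure drain loop built on getq(), honoring
 * downstream flow control with putbq().
 *
 *	while ((mp = getq(q)) != NULL) {
 *		if (!canputnext(q)) {
 *			(void) putbq(q, mp);	// flow controlled: stop
 *			break;			// and wait to be
 *		}				// backenabled
 *		putnext(q, mp);
 *	}
 */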

/*
 * Calculate the number of data bytes in a single data message block,
 * taking multidata messages into account.
 */

#define	ADD_MBLK_SIZE(mp, size)		\
	if (DB_TYPE(mp) != M_MULTIDATA) {	\
		(size) += MBLKL(mp);		\
	} else {				\
		uint_t	pinuse;			\
						\
		mmd_getsize(mmd_getmultidata(mp), NULL, &pinuse); \
		(size) += pinuse;		\
	}

/*
 * Returns the number of bytes in a message (a message is defined as a
 * chain of mblks linked by b_cont).  If a non-NULL mblkcnt is supplied we
 * also return the number of distinct mblks in the message.
 */
int
mp_cont_len(mblk_t *bp, int *mblkcnt)
{
	mblk_t *mp;
	int mblks = 0;
	int bytes = 0;

	for (mp = bp; mp != NULL; mp = mp->b_cont) {
		ADD_MBLK_SIZE(mp, bytes);
		mblks++;
	}

	if (mblkcnt != NULL)
		*mblkcnt = mblks;

	return (bytes);
}
1914
1915 /*
1916 * Like getq() but does not backenable. This is used by the stream
1917 * head when a putback() is likely. The caller must call qbackenable()
1918 * after it is done with accessing the queue.
1919 * The rbytes arguments to getq_noneab() allows callers to specify a
1920 * the maximum number of bytes to return. If the current amount on the
1921 * queue is less than this then the entire message will be returned.
1922 * A value of 0 returns the entire message and is equivalent to the old
1923 * default behaviour prior to the addition of the rbytes argument.
1924 */
1925 mblk_t *
getq_noenab(queue_t * q,ssize_t rbytes)1926 getq_noenab(queue_t *q, ssize_t rbytes)
1927 {
1928 mblk_t *bp, *mp1;
1929 mblk_t *mp2 = NULL;
1930 qband_t *qbp;
1931 kthread_id_t freezer;
1932 int bytecnt = 0, mblkcnt = 0;
1933
1934 /* freezestr should allow its caller to call getq/putq */
1935 freezer = STREAM(q)->sd_freezer;
1936 if (freezer == curthread) {
1937 ASSERT(frozenstr(q));
1938 ASSERT(MUTEX_HELD(QLOCK(q)));
1939 } else
1940 mutex_enter(QLOCK(q));
1941
1942 if ((bp = q->q_first) == 0) {
1943 q->q_flag |= QWANTR;
1944 } else {
1945 /*
1946 * If the caller supplied a byte threshold and there is
1947 * more than this amount on the queue then break up the
1948 * the message appropriately. We can only safely do
1949 * this for M_DATA messages.
1950 */
1951 if ((DB_TYPE(bp) == M_DATA) && (rbytes > 0) &&
1952 (q->q_count > rbytes)) {
1953 /*
1954 * Inline version of mp_cont_len() which terminates
1955 * when we meet or exceed rbytes.
1956 */
1957 for (mp1 = bp; mp1 != NULL; mp1 = mp1->b_cont) {
1958 mblkcnt++;
1959 ADD_MBLK_SIZE(mp1, bytecnt);
1960 if (bytecnt >= rbytes)
1961 break;
1962 }
1963 /*
1964 * We need to account for the following scenarios:
1965 *
1966 * 1) Too much data in the first message:
1967 * mp1 will be the mblk which puts us over our
1968 * byte limit.
1969 * 2) Not enough data in the first message:
1970 * mp1 will be NULL.
1971 * 3) Exactly the right amount of data contained within
1972 * whole mblks:
1973 * mp1->b_cont will be where we break the message.
1974 */
1975 if (bytecnt > rbytes) {
1976 /*
1977 * Dup/copy mp1 and put what we don't need
1978 * back onto the queue. Adjust the read/write
1979 * and continuation pointers appropriately
1980 * and decrement the current mblk count to
1981 * reflect we are putting an mblk back onto
1982 * the queue.
1983 * When adjusting the message pointers, it's
1984 * OK to use the existing bytecnt and the
1985 * requested amount (rbytes) to calculate the
1986 * new write offset (b_wptr) of what we
1987 * are taking. However, we cannot use these
1988 * values when calculating the read offset of
1989 * the mblk we are putting back on the queue.
1990 * This is because the beginning (b_rptr) of the
1991 * mblk represents some arbitrary point within
1992 * the message.
1993 * It's simplest to do this by advancing b_rptr
1994 * by the new length of mp1 as we don't have to
1995 * remember any intermediate state.
1996 */
1997 ASSERT(mp1 != NULL);
1998 mblkcnt--;
1999 if ((mp2 = dupb(mp1)) == NULL &&
2000 (mp2 = copyb(mp1)) == NULL) {
2001 bytecnt = mblkcnt = 0;
2002 goto dup_failed;
2003 }
2004 mp2->b_cont = mp1->b_cont;
2005 mp1->b_wptr -= bytecnt - rbytes;
2006 mp2->b_rptr += mp1->b_wptr - mp1->b_rptr;
2007 mp1->b_cont = NULL;
2008 bytecnt = rbytes;
2009 } else {
2010 /*
2011 * Either there is not enough data in the first
2012 * message or there is no excess data to deal
2013 * with. If mp1 is NULL, we are taking the
2014 * whole message. No need to do anything.
2015 * Otherwise we assign mp1->b_cont to mp2 as
2016 * we will be putting this back onto the head of
2017 * the queue.
2018 */
2019 if (mp1 != NULL) {
2020 mp2 = mp1->b_cont;
2021 mp1->b_cont = NULL;
2022 }
2023 }
2024 /*
2025 * If mp2 is not NULL then we have part of the message
2026 * to put back onto the queue.
2027 */
2028 if (mp2 != NULL) {
2029 if ((mp2->b_next = bp->b_next) == NULL)
2030 q->q_last = mp2;
2031 else
2032 bp->b_next->b_prev = mp2;
2033 q->q_first = mp2;
2034 } else {
2035 if ((q->q_first = bp->b_next) == NULL)
2036 q->q_last = NULL;
2037 else
2038 q->q_first->b_prev = NULL;
2039 }
2040 } else {
2041 /*
2042 * Either no byte threshold was supplied, there is
2043 * not enough on the queue or we failed to
2044 * duplicate/copy a data block. In these cases we
2045 * just take the entire first message.
2046 */
2047 dup_failed:
2048 bytecnt = mp_cont_len(bp, &mblkcnt);
2049 if ((q->q_first = bp->b_next) == NULL)
2050 q->q_last = NULL;
2051 else
2052 q->q_first->b_prev = NULL;
2053 }
2054 if (bp->b_band == 0) {
2055 q->q_count -= bytecnt;
2056 q->q_mblkcnt -= mblkcnt;
2057 if (q->q_mblkcnt == 0 || ((q->q_count < q->q_hiwat) &&
2058 (q->q_mblkcnt < q->q_hiwat))) {
2059 q->q_flag &= ~QFULL;
2060 }
2061 } else {
2062 int i;
2063
2064 ASSERT(bp->b_band <= q->q_nband);
2065 ASSERT(q->q_bandp != NULL);
2066 ASSERT(MUTEX_HELD(QLOCK(q)));
2067 qbp = q->q_bandp;
2068 i = bp->b_band;
2069 while (--i > 0)
2070 qbp = qbp->qb_next;
2071 if (qbp->qb_first == qbp->qb_last) {
2072 qbp->qb_first = NULL;
2073 qbp->qb_last = NULL;
2074 } else {
2075 qbp->qb_first = bp->b_next;
2076 }
2077 qbp->qb_count -= bytecnt;
2078 qbp->qb_mblkcnt -= mblkcnt;
2079 if (qbp->qb_mblkcnt == 0 ||
2080 ((qbp->qb_count < qbp->qb_hiwat) &&
2081 (qbp->qb_mblkcnt < qbp->qb_hiwat))) {
2082 qbp->qb_flag &= ~QB_FULL;
2083 }
2084 }
2085 q->q_flag &= ~QWANTR;
2086 bp->b_next = NULL;
2087 bp->b_prev = NULL;
2088 }
2089 if (freezer != curthread)
2090 mutex_exit(QLOCK(q));
2091
2092 STR_FTEVENT_MSG(bp, q, FTEV_GETQ, NULL);
2093
2094 return (bp);
2095 }
2096
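/*
 * A minimal sketch (hypothetical) of the stream-head style use of
 * getq_noenab() with a byte threshold: dequeue at most 'len' bytes and
 * perform the deferred backenable only once the caller is done with
 * the queue. The consumer of 'mp' is elided.
 */
#ifdef	STREAMS_EXAMPLES	/* illustrative sketch, not compiled */
static mblk_t *
example_bounded_read(queue_t *q, ssize_t len)
{
	mblk_t *mp;

	mp = getq_noenab(q, len);	/* may split an M_DATA message */
	if (mp != NULL) {
		/* ... consume up to 'len' bytes from mp ... */
		qbackenable(q, mp->b_band);	/* deferred backenable */
	}
	return (mp);
}
#endif
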
2097 /*
2098 * Determine if a backenable is needed after removing a message in the
2099 * specified band.
2100 * NOTE: This routine assumes that something like getq_noenab() has
2101 * already been called.
2102 *
2103 * For the read side it is ok to hold sd_lock across calling this (and the
2104 * stream head often does).
2105 * But for the write side strwakeq might be invoked and it acquires sd_lock.
2106 */
2107 void
2108 qbackenable(queue_t *q, uchar_t band)
2109 {
2110 int backenab = 0;
2111 qband_t *qbp;
2112 kthread_id_t freezer;
2113
2114 ASSERT(q);
2115 ASSERT((q->q_flag & QREADR) || MUTEX_NOT_HELD(&STREAM(q)->sd_lock));
2116
2117 /*
2118 * Quick check without holding the lock.
2119 * OK since after getq() has lowered the q_count these flags
2120 * would not change unless either the qbackenable() is done by
2121 * another thread (which is ok) or the queue has gotten QFULL
2122 * in which case another backenable will take place when the queue
2123 * drops below q_lowat.
2124 */
2125 if (band == 0 && (q->q_flag & (QWANTW|QWANTWSYNC)) == 0)
2126 return;
2127
2128 /* freezestr should allow its caller to call getq/putq */
2129 freezer = STREAM(q)->sd_freezer;
2130 if (freezer == curthread) {
2131 ASSERT(frozenstr(q));
2132 ASSERT(MUTEX_HELD(QLOCK(q)));
2133 } else
2134 mutex_enter(QLOCK(q));
2135
2136 if (band == 0) {
2137 if (q->q_lowat == 0 || (q->q_count < q->q_lowat &&
2138 q->q_mblkcnt < q->q_lowat)) {
2139 backenab = q->q_flag & (QWANTW|QWANTWSYNC);
2140 }
2141 } else {
2142 int i;
2143
2144 ASSERT((unsigned)band <= q->q_nband);
2145 ASSERT(q->q_bandp != NULL);
2146
2147 qbp = q->q_bandp;
2148 i = band;
2149 while (--i > 0)
2150 qbp = qbp->qb_next;
2151
2152 if (qbp->qb_lowat == 0 || (qbp->qb_count < qbp->qb_lowat &&
2153 qbp->qb_mblkcnt < qbp->qb_lowat)) {
2154 backenab = qbp->qb_flag & QB_WANTW;
2155 }
2156 }
2157
2158 if (backenab == 0) {
2159 if (freezer != curthread)
2160 mutex_exit(QLOCK(q));
2161 return;
2162 }
2163
2164 /* Have to drop the lock across strwakeq and backenable */
2165 if (backenab & QWANTWSYNC)
2166 q->q_flag &= ~QWANTWSYNC;
2167 if (backenab & (QWANTW|QB_WANTW)) {
2168 if (band != 0)
2169 qbp->qb_flag &= ~QB_WANTW;
2170 else {
2171 q->q_flag &= ~QWANTW;
2172 }
2173 }
2174
2175 if (freezer != curthread)
2176 mutex_exit(QLOCK(q));
2177
2178 if (backenab & QWANTWSYNC)
2179 strwakeq(q, QWANTWSYNC);
2180 if (backenab & (QWANTW|QB_WANTW))
2181 backenable(q, band);
2182 }
2183
2184 /*
2185 * Remove a message from a queue. The queue count and other
2186 * flow control parameters are adjusted and the back queue
2187 * enabled if necessary.
2188 *
2189 * rmvq can be called with the stream frozen, by other utility functions
2190 * holding QLOCK, and by streams modules without any locks/frozen.
2191 */
2192 void
2193 rmvq(queue_t *q, mblk_t *mp)
2194 {
2195 ASSERT(mp != NULL);
2196
2197 rmvq_noenab(q, mp);
2198 if (curthread != STREAM(q)->sd_freezer && MUTEX_HELD(QLOCK(q))) {
2199 /*
2200 * qbackenable can handle a frozen stream but not a "random"
2201 * qlock being held. Drop lock across qbackenable.
2202 */
2203 mutex_exit(QLOCK(q));
2204 qbackenable(q, mp->b_band);
2205 mutex_enter(QLOCK(q));
2206 } else {
2207 qbackenable(q, mp->b_band);
2208 }
2209 }
2210
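/*
 * A minimal sketch (hypothetical) of typical rmvq() use: walk the
 * queue under QLOCK and surgically remove the first message of a given
 * type. rmvq() itself fixes the flow-control counts and, as noted
 * above, drops and reacquires QLOCK around the backenable.
 */
#ifdef	STREAMS_EXAMPLES	/* illustrative sketch, not compiled */
static mblk_t *
example_remove_type(queue_t *q, unsigned char type)
{
	mblk_t *mp;

	mutex_enter(QLOCK(q));
	for (mp = q->q_first; mp != NULL; mp = mp->b_next) {
		if (DB_TYPE(mp) == type) {
			rmvq(q, mp);
			break;
		}
	}
	mutex_exit(QLOCK(q));
	return (mp);
}
#endif
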
2211 /*
2212 * Like rmvq() but without any backenabling.
2213 * This exists to handle SR_CONSOL_DATA in strrput().
2214 */
2215 void
2216 rmvq_noenab(queue_t *q, mblk_t *mp)
2217 {
2218 int i;
2219 qband_t *qbp = NULL;
2220 kthread_id_t freezer;
2221 int bytecnt = 0, mblkcnt = 0;
2222
2223 freezer = STREAM(q)->sd_freezer;
2224 if (freezer == curthread) {
2225 ASSERT(frozenstr(q));
2226 ASSERT(MUTEX_HELD(QLOCK(q)));
2227 } else if (MUTEX_HELD(QLOCK(q))) {
2228 /* Don't drop lock on exit */
2229 freezer = curthread;
2230 } else
2231 mutex_enter(QLOCK(q));
2232
2233 ASSERT(mp->b_band <= q->q_nband);
2234 if (mp->b_band != 0) { /* Adjust band pointers */
2235 ASSERT(q->q_bandp != NULL);
2236 qbp = q->q_bandp;
2237 i = mp->b_band;
2238 while (--i > 0)
2239 qbp = qbp->qb_next;
2240 if (mp == qbp->qb_first) {
2241 if (mp->b_next && mp->b_band == mp->b_next->b_band)
2242 qbp->qb_first = mp->b_next;
2243 else
2244 qbp->qb_first = NULL;
2245 }
2246 if (mp == qbp->qb_last) {
2247 if (mp->b_prev && mp->b_band == mp->b_prev->b_band)
2248 qbp->qb_last = mp->b_prev;
2249 else
2250 qbp->qb_last = NULL;
2251 }
2252 }
2253
2254 /*
2255 * Remove the message from the list.
2256 */
2257 if (mp->b_prev)
2258 mp->b_prev->b_next = mp->b_next;
2259 else
2260 q->q_first = mp->b_next;
2261 if (mp->b_next)
2262 mp->b_next->b_prev = mp->b_prev;
2263 else
2264 q->q_last = mp->b_prev;
2265 mp->b_next = NULL;
2266 mp->b_prev = NULL;
2267
2268 /* Get the size of the message for q_count accounting */
2269 bytecnt = mp_cont_len(mp, &mblkcnt);
2270
2271 if (mp->b_band == 0) { /* Perform q_count accounting */
2272 q->q_count -= bytecnt;
2273 q->q_mblkcnt -= mblkcnt;
2274 if (q->q_mblkcnt == 0 || ((q->q_count < q->q_hiwat) &&
2275 (q->q_mblkcnt < q->q_hiwat))) {
2276 q->q_flag &= ~QFULL;
2277 }
2278 } else { /* Perform qb_count accounting */
2279 qbp->qb_count -= bytecnt;
2280 qbp->qb_mblkcnt -= mblkcnt;
2281 if (qbp->qb_mblkcnt == 0 || ((qbp->qb_count < qbp->qb_hiwat) &&
2282 (qbp->qb_mblkcnt < qbp->qb_hiwat))) {
2283 qbp->qb_flag &= ~QB_FULL;
2284 }
2285 }
2286 if (freezer != curthread)
2287 mutex_exit(QLOCK(q));
2288
2289 STR_FTEVENT_MSG(mp, q, FTEV_RMVQ, NULL);
2290 }
2291
2292 /*
2293 * Empty a queue.
2294 * If flag is set, remove all messages. Otherwise, remove
2295 * only non-control messages. If queue falls below its low
2296 * water mark, and QWANTW is set, enable the nearest upstream
2297 * service procedure.
2298 *
2299 * Historical note: when merging the M_FLUSH code in strrput with this
2300 * code one difference was discovered. flushq did not have a check
2301 * for q_lowat == 0 in the backenabling test.
2302 *
2303 * pcproto_flag specifies whether or not an M_PCPROTO message should be flushed
2304 * if one exists on the queue.
2305 */
2306 void
2307 flushq_common(queue_t *q, int flag, int pcproto_flag)
2308 {
2309 mblk_t *mp, *nmp;
2310 qband_t *qbp;
2311 int backenab = 0;
2312 unsigned char bpri;
2313 unsigned char qbf[NBAND]; /* band flushing backenable flags */
2314
2315 if (q->q_first == NULL)
2316 return;
2317
2318 mutex_enter(QLOCK(q));
2319 mp = q->q_first;
2320 q->q_first = NULL;
2321 q->q_last = NULL;
2322 q->q_count = 0;
2323 q->q_mblkcnt = 0;
2324 for (qbp = q->q_bandp; qbp; qbp = qbp->qb_next) {
2325 qbp->qb_first = NULL;
2326 qbp->qb_last = NULL;
2327 qbp->qb_count = 0;
2328 qbp->qb_mblkcnt = 0;
2329 qbp->qb_flag &= ~QB_FULL;
2330 }
2331 q->q_flag &= ~QFULL;
2332 mutex_exit(QLOCK(q));
2333 while (mp) {
2334 nmp = mp->b_next;
2335 mp->b_next = mp->b_prev = NULL;
2336
2337 STR_FTEVENT_MBLK(mp, q, FTEV_FLUSHQ, NULL);
2338
2339 if (pcproto_flag && (mp->b_datap->db_type == M_PCPROTO))
2340 (void) putq(q, mp);
2341 else if (flag || datamsg(mp->b_datap->db_type))
2342 freemsg(mp);
2343 else
2344 (void) putq(q, mp);
2345 mp = nmp;
2346 }
2347 bpri = 1;
2348 mutex_enter(QLOCK(q));
2349 for (qbp = q->q_bandp; qbp; qbp = qbp->qb_next) {
2350 if ((qbp->qb_flag & QB_WANTW) &&
2351 (((qbp->qb_count < qbp->qb_lowat) &&
2352 (qbp->qb_mblkcnt < qbp->qb_lowat)) ||
2353 qbp->qb_lowat == 0)) {
2354 qbp->qb_flag &= ~QB_WANTW;
2355 backenab = 1;
2356 qbf[bpri] = 1;
2357 } else
2358 qbf[bpri] = 0;
2359 bpri++;
2360 }
2361 ASSERT(bpri == (unsigned char)(q->q_nband + 1));
2362 if ((q->q_flag & QWANTW) &&
2363 (((q->q_count < q->q_lowat) &&
2364 (q->q_mblkcnt < q->q_lowat)) || q->q_lowat == 0)) {
2365 q->q_flag &= ~QWANTW;
2366 backenab = 1;
2367 qbf[0] = 1;
2368 } else
2369 qbf[0] = 0;
2370
2371 /*
2372 * If any band can now be written to, and there is a writer
2373 * for that band, then backenable the closest service procedure.
2374 */
2375 if (backenab) {
2376 mutex_exit(QLOCK(q));
2377 for (bpri = q->q_nband; bpri != 0; bpri--)
2378 if (qbf[bpri])
2379 backenable(q, bpri);
2380 if (qbf[0])
2381 backenable(q, 0);
2382 } else
2383 mutex_exit(QLOCK(q));
2384 }
2385
2386 /*
2387 * The real flushing takes place in flushq_common. This is done so that
2388 * a flag can specify whether or not M_PCPROTO messages should be
2389 * flushed. Currently the only place that uses this flag is the stream head.
2390 */
2391 void
2392 flushq(queue_t *q, int flag)
2393 {
2394 flushq_common(q, flag, 0);
2395 }
2396
2397 /*
2398 * Flush the queue of messages of the given priority band.
2399 * There is some duplication of code between flushq and flushband.
2400 * This is because we want to optimize the code as much as possible.
2401 * The assumption is that there will be more messages in the normal
2402 * (priority 0) band than in any other.
2403 *
2404 * Historical note: when merging the M_FLUSH code in strrput with this
2405 * code one difference was discovered. flushband had an extra check
2406 * for (mp->b_datap->db_type < QPCTL) in the band 0
2407 * case. That check does not match the man page for flushband and was not
2408 * in the strrput flush code, hence it was removed.
2409 */
2410 void
2411 flushband(queue_t *q, unsigned char pri, int flag)
2412 {
2413 mblk_t *mp;
2414 mblk_t *nmp;
2415 mblk_t *last;
2416 qband_t *qbp;
2417 int band;
2418
2419 ASSERT((flag == FLUSHDATA) || (flag == FLUSHALL));
2420 if (pri > q->q_nband) {
2421 return;
2422 }
2423 mutex_enter(QLOCK(q));
2424 if (pri == 0) {
2425 mp = q->q_first;
2426 q->q_first = NULL;
2427 q->q_last = NULL;
2428 q->q_count = 0;
2429 q->q_mblkcnt = 0;
2430 for (qbp = q->q_bandp; qbp; qbp = qbp->qb_next) {
2431 qbp->qb_first = NULL;
2432 qbp->qb_last = NULL;
2433 qbp->qb_count = 0;
2434 qbp->qb_mblkcnt = 0;
2435 qbp->qb_flag &= ~QB_FULL;
2436 }
2437 q->q_flag &= ~QFULL;
2438 mutex_exit(QLOCK(q));
2439 while (mp) {
2440 nmp = mp->b_next;
2441 mp->b_next = mp->b_prev = NULL;
2442 if ((mp->b_band == 0) &&
2443 ((flag == FLUSHALL) ||
2444 datamsg(mp->b_datap->db_type)))
2445 freemsg(mp);
2446 else
2447 (void) putq(q, mp);
2448 mp = nmp;
2449 }
2450 mutex_enter(QLOCK(q));
2451 if ((q->q_flag & QWANTW) &&
2452 (((q->q_count < q->q_lowat) &&
2453 (q->q_mblkcnt < q->q_lowat)) || q->q_lowat == 0)) {
2454 q->q_flag &= ~QWANTW;
2455 mutex_exit(QLOCK(q));
2456
2457 backenable(q, pri);
2458 } else
2459 mutex_exit(QLOCK(q));
2460 } else { /* pri != 0 */
2461 boolean_t flushed = B_FALSE;
2462 band = pri;
2463
2464 ASSERT(MUTEX_HELD(QLOCK(q)));
2465 qbp = q->q_bandp;
2466 while (--band > 0)
2467 qbp = qbp->qb_next;
2468 mp = qbp->qb_first;
2469 if (mp == NULL) {
2470 mutex_exit(QLOCK(q));
2471 return;
2472 }
2473 last = qbp->qb_last->b_next;
2474 /*
2475 * rmvq_noenab() and freemsg() are called for each mblk that
2476 * meets the criteria. The loop is executed until the last
2477 * mblk has been processed.
2478 */
2479 while (mp != last) {
2480 ASSERT(mp->b_band == pri);
2481 nmp = mp->b_next;
2482 if (flag == FLUSHALL || datamsg(mp->b_datap->db_type)) {
2483 rmvq_noenab(q, mp);
2484 freemsg(mp);
2485 flushed = B_TRUE;
2486 }
2487 mp = nmp;
2488 }
2489 mutex_exit(QLOCK(q));
2490
2491 /*
2492 * If any mblk(s) has been freed, we know that qbackenable()
2493 * will need to be called.
2494 */
2495 if (flushed)
2496 qbackenable(q, pri);
2497 }
2498 }
2499
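/*
 * A minimal sketch (hypothetical) of the canonical M_FLUSH handling in
 * a driver's write put procedure, the usual consumer of flushq() and
 * flushband(): flush the requested side(s), then turn the message
 * around with qreply() so the read side is flushed on the way up.
 */
#ifdef	STREAMS_EXAMPLES	/* illustrative sketch, not compiled */
static void
xdrv_wput_flush(queue_t *q, mblk_t *mp)
{
	if (*mp->b_rptr & FLUSHW) {
		if (*mp->b_rptr & FLUSHBAND)
			flushband(q, *(mp->b_rptr + 1), FLUSHDATA);
		else
			flushq(q, FLUSHDATA);
	}
	if (*mp->b_rptr & FLUSHR) {
		if (*mp->b_rptr & FLUSHBAND)
			flushband(RD(q), *(mp->b_rptr + 1), FLUSHDATA);
		else
			flushq(RD(q), FLUSHDATA);
		*mp->b_rptr &= ~FLUSHW;
		qreply(q, mp);		/* reflect the flush upstream */
	} else {
		freemsg(mp);
	}
}
#endif
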
2500 /*
2501 * Return 1 if the queue is not full. If the queue is full, return
2502 * 0 (may not put message) and set QWANTW flag (caller wants to write
2503 * to the queue).
2504 */
2505 int
2506 canput(queue_t *q)
2507 {
2508 TRACE_1(TR_FAC_STREAMS_FR, TR_CANPUT_IN, "canput:%p", q);
2509
2510 /* This is for loopback transports; they should not do a canput. */
2511 ASSERT(STRMATED(q->q_stream) || STREAM(q) == STREAM(q->q_nfsrv));
2512
2513 /* Find next forward module that has a service procedure */
2514 q = q->q_nfsrv;
2515
2516 if (!(q->q_flag & QFULL)) {
2517 TRACE_2(TR_FAC_STREAMS_FR, TR_CANPUT_OUT, "canput:%p %d", q, 1);
2518 return (1);
2519 }
2520 mutex_enter(QLOCK(q));
2521 if (q->q_flag & QFULL) {
2522 q->q_flag |= QWANTW;
2523 mutex_exit(QLOCK(q));
2524 TRACE_2(TR_FAC_STREAMS_FR, TR_CANPUT_OUT, "canput:%p %d", q, 0);
2525 return (0);
2526 }
2527 mutex_exit(QLOCK(q));
2528 TRACE_2(TR_FAC_STREAMS_FR, TR_CANPUT_OUT, "canput:%p %d", q, 1);
2529 return (1);
2530 }
2531
2532 /*
2533 * This is the new canput for use with priority bands. Return 1 if the
2534 * band is not full. If the band is full, return 0 (may not put message)
2535 * and set QWANTW(QB_WANTW) flag for zero(non-zero) band (caller wants to
2536 * write to the queue).
2537 */
2538 int
2539 bcanput(queue_t *q, unsigned char pri)
2540 {
2541 qband_t *qbp;
2542
2543 TRACE_2(TR_FAC_STREAMS_FR, TR_BCANPUT_IN, "bcanput:%p %p", q, pri);
2544 if (!q)
2545 return (0);
2546
2547 /* Find next forward module that has a service procedure */
2548 q = q->q_nfsrv;
2549
2550 mutex_enter(QLOCK(q));
2551 if (pri == 0) {
2552 if (q->q_flag & QFULL) {
2553 q->q_flag |= QWANTW;
2554 mutex_exit(QLOCK(q));
2555 TRACE_3(TR_FAC_STREAMS_FR, TR_BCANPUT_OUT,
2556 "bcanput:%p %X %d", q, pri, 0);
2557 return (0);
2558 }
2559 } else { /* pri != 0 */
2560 if (pri > q->q_nband) {
2561 /*
2562 * No band exists yet, so return success.
2563 */
2564 mutex_exit(QLOCK(q));
2565 TRACE_3(TR_FAC_STREAMS_FR, TR_BCANPUT_OUT,
2566 "bcanput:%p %X %d", q, pri, 1);
2567 return (1);
2568 }
2569 qbp = q->q_bandp;
2570 while (--pri)
2571 qbp = qbp->qb_next;
2572 if (qbp->qb_flag & QB_FULL) {
2573 qbp->qb_flag |= QB_WANTW;
2574 mutex_exit(QLOCK(q));
2575 TRACE_3(TR_FAC_STREAMS_FR, TR_BCANPUT_OUT,
2576 "bcanput:%p %X %d", q, pri, 0);
2577 return (0);
2578 }
2579 }
2580 mutex_exit(QLOCK(q));
2581 TRACE_3(TR_FAC_STREAMS_FR, TR_BCANPUT_OUT,
2582 "bcanput:%p %X %d", q, pri, 1);
2583 return (1);
2584 }
2585
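/*
 * A minimal sketch (hypothetical) of how a put procedure combines
 * these predicates with putq() (defined below): high priority messages
 * bypass flow control entirely, while ordinary messages take the fast
 * path only when nothing is already queued and the forward band has
 * room.
 */
#ifdef	STREAMS_EXAMPLES	/* illustrative sketch, not compiled */
static int
xmod_wput(queue_t *q, mblk_t *mp)
{
	if (DB_TYPE(mp) >= QPCTL) {
		putnext(q, mp);		/* never flow-control hipri */
	} else if (q->q_first == NULL && bcanputnext(q, mp->b_band)) {
		putnext(q, mp);		/* fast path: room ahead */
	} else {
		(void) putq(q, mp);	/* let the service procedure drain */
	}
	return (0);
}
#endif
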
2586 /*
2587 * Put a message on a queue.
2588 *
2589 * Messages are enqueued on a priority basis. The priority classes
2590 * are HIGH PRIORITY (type >= QPCTL), PRIORITY (type < QPCTL && band > 0),
2591 * and B_NORMAL (type < QPCTL && band == 0).
2592 *
2593 * Add appropriate weighted data block sizes to queue count.
2594 * If queue hits high water mark then set QFULL flag.
2595 *
2596 * If QNOENAB is not set (putq is allowed to enable the queue),
2597 * enable the queue only if the message is PRIORITY,
2598 * or the QWANTR flag is set (indicating that the service procedure
2599 * is ready to read the queue). This implies that a service
2600 * procedure must NEVER put a high priority message back on its own
2601 * queue, as this would result in an infinite loop (!).
2602 */
2603 int
2604 putq(queue_t *q, mblk_t *bp)
2605 {
2606 mblk_t *tmp;
2607 qband_t *qbp = NULL;
2608 int mcls = (int)queclass(bp);
2609 kthread_id_t freezer;
2610 int bytecnt = 0, mblkcnt = 0;
2611
2612 freezer = STREAM(q)->sd_freezer;
2613 if (freezer == curthread) {
2614 ASSERT(frozenstr(q));
2615 ASSERT(MUTEX_HELD(QLOCK(q)));
2616 } else
2617 mutex_enter(QLOCK(q));
2618
2619 /*
2620 * Make sanity checks and if qband structure is not yet
2621 * allocated, do so.
2622 */
2623 if (mcls == QPCTL) {
2624 if (bp->b_band != 0)
2625 bp->b_band = 0; /* force to be correct */
2626 } else if (bp->b_band != 0) {
2627 int i;
2628 qband_t **qbpp;
2629
2630 if (bp->b_band > q->q_nband) {
2631
2632 /*
2633 * The qband structure for this priority band is
2634 * not on the queue yet, so we have to allocate
2635 * one on the fly. It would be wasteful to
2636 * associate the qband structures with every
2637 * queue when the queues are allocated. This is
2638 * because most queues will only need the normal
2639 * band of flow which can be described entirely
2640 * by the queue itself.
2641 */
2642 qbpp = &q->q_bandp;
2643 while (*qbpp)
2644 qbpp = &(*qbpp)->qb_next;
2645 while (bp->b_band > q->q_nband) {
2646 if ((*qbpp = allocband()) == NULL) {
2647 if (freezer != curthread)
2648 mutex_exit(QLOCK(q));
2649 return (0);
2650 }
2651 (*qbpp)->qb_hiwat = q->q_hiwat;
2652 (*qbpp)->qb_lowat = q->q_lowat;
2653 q->q_nband++;
2654 qbpp = &(*qbpp)->qb_next;
2655 }
2656 }
2657 ASSERT(MUTEX_HELD(QLOCK(q)));
2658 qbp = q->q_bandp;
2659 i = bp->b_band;
2660 while (--i)
2661 qbp = qbp->qb_next;
2662 }
2663
2664 /*
2665 * If queue is empty, add the message and initialize the pointers.
2666 * Otherwise, adjust message pointers and queue pointers based on
2667 * the type of the message and where it belongs on the queue. Some
2668 * code is duplicated to minimize the number of conditionals and
2669 * hopefully minimize the amount of time this routine takes.
2670 */
2671 if (!q->q_first) {
2672 bp->b_next = NULL;
2673 bp->b_prev = NULL;
2674 q->q_first = bp;
2675 q->q_last = bp;
2676 if (qbp) {
2677 qbp->qb_first = bp;
2678 qbp->qb_last = bp;
2679 }
2680 } else if (!qbp) { /* bp->b_band == 0 */
2681
2682 /*
2683 * If queue class of message is less than or equal to
2684 * that of the last one on the queue, tack on to the end.
2685 */
2686 tmp = q->q_last;
2687 if (mcls <= (int)queclass(tmp)) {
2688 bp->b_next = NULL;
2689 bp->b_prev = tmp;
2690 tmp->b_next = bp;
2691 q->q_last = bp;
2692 } else {
2693 tmp = q->q_first;
2694 while ((int)queclass(tmp) >= mcls)
2695 tmp = tmp->b_next;
2696
2697 /*
2698 * Insert bp before tmp.
2699 */
2700 bp->b_next = tmp;
2701 bp->b_prev = tmp->b_prev;
2702 if (tmp->b_prev)
2703 tmp->b_prev->b_next = bp;
2704 else
2705 q->q_first = bp;
2706 tmp->b_prev = bp;
2707 }
2708 } else { /* bp->b_band != 0 */
2709 if (qbp->qb_first) {
2710 tmp = qbp->qb_last;
2711
2712 /*
2713 * Insert bp after the last message in this band.
2714 */
2715 bp->b_next = tmp->b_next;
2716 if (tmp->b_next)
2717 tmp->b_next->b_prev = bp;
2718 else
2719 q->q_last = bp;
2720 bp->b_prev = tmp;
2721 tmp->b_next = bp;
2722 } else {
2723 tmp = q->q_last;
2724 if ((mcls < (int)queclass(tmp)) ||
2725 (bp->b_band <= tmp->b_band)) {
2726
2727 /*
2728 * Tack bp on end of queue.
2729 */
2730 bp->b_next = NULL;
2731 bp->b_prev = tmp;
2732 tmp->b_next = bp;
2733 q->q_last = bp;
2734 } else {
2735 tmp = q->q_first;
2736 while (tmp->b_datap->db_type >= QPCTL)
2737 tmp = tmp->b_next;
2738 while (tmp->b_band >= bp->b_band)
2739 tmp = tmp->b_next;
2740
2741 /*
2742 * Insert bp before tmp.
2743 */
2744 bp->b_next = tmp;
2745 bp->b_prev = tmp->b_prev;
2746 if (tmp->b_prev)
2747 tmp->b_prev->b_next = bp;
2748 else
2749 q->q_first = bp;
2750 tmp->b_prev = bp;
2751 }
2752 qbp->qb_first = bp;
2753 }
2754 qbp->qb_last = bp;
2755 }
2756
2757 /* Get message byte count for q_count accounting */
2758 bytecnt = mp_cont_len(bp, &mblkcnt);
2759
2760 if (qbp) {
2761 qbp->qb_count += bytecnt;
2762 qbp->qb_mblkcnt += mblkcnt;
2763 if ((qbp->qb_count >= qbp->qb_hiwat) ||
2764 (qbp->qb_mblkcnt >= qbp->qb_hiwat)) {
2765 qbp->qb_flag |= QB_FULL;
2766 }
2767 } else {
2768 q->q_count += bytecnt;
2769 q->q_mblkcnt += mblkcnt;
2770 if ((q->q_count >= q->q_hiwat) ||
2771 (q->q_mblkcnt >= q->q_hiwat)) {
2772 q->q_flag |= QFULL;
2773 }
2774 }
2775
2776 STR_FTEVENT_MSG(bp, q, FTEV_PUTQ, NULL);
2777
2778 if ((mcls > QNORM) ||
2779 (canenable(q) && (q->q_flag & QWANTR || bp->b_band)))
2780 qenable_locked(q);
2781 ASSERT(MUTEX_HELD(QLOCK(q)));
2782 if (freezer != curthread)
2783 mutex_exit(QLOCK(q));
2784
2785 return (1);
2786 }
2787
2788 /*
2789 * Put stuff back at beginning of Q according to priority order.
2790 * See comment on putq above for details.
2791 */
2792 int
2793 putbq(queue_t *q, mblk_t *bp)
2794 {
2795 mblk_t *tmp;
2796 qband_t *qbp = NULL;
2797 int mcls = (int)queclass(bp);
2798 kthread_id_t freezer;
2799 int bytecnt = 0, mblkcnt = 0;
2800
2801 ASSERT(q && bp);
2802 ASSERT(bp->b_next == NULL);
2803 freezer = STREAM(q)->sd_freezer;
2804 if (freezer == curthread) {
2805 ASSERT(frozenstr(q));
2806 ASSERT(MUTEX_HELD(QLOCK(q)));
2807 } else
2808 mutex_enter(QLOCK(q));
2809
2810 /*
2811 * Make sanity checks and if qband structure is not yet
2812 * allocated, do so.
2813 */
2814 if (mcls == QPCTL) {
2815 if (bp->b_band != 0)
2816 bp->b_band = 0; /* force to be correct */
2817 } else if (bp->b_band != 0) {
2818 int i;
2819 qband_t **qbpp;
2820
2821 if (bp->b_band > q->q_nband) {
2822 qbpp = &q->q_bandp;
2823 while (*qbpp)
2824 qbpp = &(*qbpp)->qb_next;
2825 while (bp->b_band > q->q_nband) {
2826 if ((*qbpp = allocband()) == NULL) {
2827 if (freezer != curthread)
2828 mutex_exit(QLOCK(q));
2829 return (0);
2830 }
2831 (*qbpp)->qb_hiwat = q->q_hiwat;
2832 (*qbpp)->qb_lowat = q->q_lowat;
2833 q->q_nband++;
2834 qbpp = &(*qbpp)->qb_next;
2835 }
2836 }
2837 qbp = q->q_bandp;
2838 i = bp->b_band;
2839 while (--i)
2840 qbp = qbp->qb_next;
2841 }
2842
2843 /*
2844 * If queue is empty or if message is high priority,
2845 * place on the front of the queue.
2846 */
2847 tmp = q->q_first;
2848 if ((!tmp) || (mcls == QPCTL)) {
2849 bp->b_next = tmp;
2850 if (tmp)
2851 tmp->b_prev = bp;
2852 else
2853 q->q_last = bp;
2854 q->q_first = bp;
2855 bp->b_prev = NULL;
2856 if (qbp) {
2857 qbp->qb_first = bp;
2858 qbp->qb_last = bp;
2859 }
2860 } else if (qbp) { /* bp->b_band != 0 */
2861 tmp = qbp->qb_first;
2862 if (tmp) {
2863
2864 /*
2865 * Insert bp before the first message in this band.
2866 */
2867 bp->b_next = tmp;
2868 bp->b_prev = tmp->b_prev;
2869 if (tmp->b_prev)
2870 tmp->b_prev->b_next = bp;
2871 else
2872 q->q_first = bp;
2873 tmp->b_prev = bp;
2874 } else {
2875 tmp = q->q_last;
2876 if ((mcls < (int)queclass(tmp)) ||
2877 (bp->b_band < tmp->b_band)) {
2878
2879 /*
2880 * Tack bp on end of queue.
2881 */
2882 bp->b_next = NULL;
2883 bp->b_prev = tmp;
2884 tmp->b_next = bp;
2885 q->q_last = bp;
2886 } else {
2887 tmp = q->q_first;
2888 while (tmp->b_datap->db_type >= QPCTL)
2889 tmp = tmp->b_next;
2890 while (tmp->b_band > bp->b_band)
2891 tmp = tmp->b_next;
2892
2893 /*
2894 * Insert bp before tmp.
2895 */
2896 bp->b_next = tmp;
2897 bp->b_prev = tmp->b_prev;
2898 if (tmp->b_prev)
2899 tmp->b_prev->b_next = bp;
2900 else
2901 q->q_first = bp;
2902 tmp->b_prev = bp;
2903 }
2904 qbp->qb_last = bp;
2905 }
2906 qbp->qb_first = bp;
2907 } else { /* bp->b_band == 0 && !QPCTL */
2908
2909 /*
2910 * If the queue class or band is less than that of the last
2911 * message on the queue, tack bp on the end of the queue.
2912 */
2913 tmp = q->q_last;
2914 if ((mcls < (int)queclass(tmp)) || (bp->b_band < tmp->b_band)) {
2915 bp->b_next = NULL;
2916 bp->b_prev = tmp;
2917 tmp->b_next = bp;
2918 q->q_last = bp;
2919 } else {
2920 tmp = q->q_first;
2921 while (tmp->b_datap->db_type >= QPCTL)
2922 tmp = tmp->b_next;
2923 while (tmp->b_band > bp->b_band)
2924 tmp = tmp->b_next;
2925
2926 /*
2927 * Insert bp before tmp.
2928 */
2929 bp->b_next = tmp;
2930 bp->b_prev = tmp->b_prev;
2931 if (tmp->b_prev)
2932 tmp->b_prev->b_next = bp;
2933 else
2934 q->q_first = bp;
2935 tmp->b_prev = bp;
2936 }
2937 }
2938
2939 /* Get message byte count for q_count accounting */
2940 bytecnt = mp_cont_len(bp, &mblkcnt);
2941
2942 if (qbp) {
2943 qbp->qb_count += bytecnt;
2944 qbp->qb_mblkcnt += mblkcnt;
2945 if ((qbp->qb_count >= qbp->qb_hiwat) ||
2946 (qbp->qb_mblkcnt >= qbp->qb_hiwat)) {
2947 qbp->qb_flag |= QB_FULL;
2948 }
2949 } else {
2950 q->q_count += bytecnt;
2951 q->q_mblkcnt += mblkcnt;
2952 if ((q->q_count >= q->q_hiwat) ||
2953 (q->q_mblkcnt >= q->q_hiwat)) {
2954 q->q_flag |= QFULL;
2955 }
2956 }
2957
2958 STR_FTEVENT_MSG(bp, q, FTEV_PUTBQ, NULL);
2959
2960 if ((mcls > QNORM) || (canenable(q) && (q->q_flag & QWANTR)))
2961 qenable_locked(q);
2962 ASSERT(MUTEX_HELD(QLOCK(q)));
2963 if (freezer != curthread)
2964 mutex_exit(QLOCK(q));
2965
2966 return (1);
2967 }
2968
2969 /*
2970 * Insert a message before an existing message on the queue. If the
2971 * existing message is NULL, the new message is placed on the end of
2972 * the queue. The queue class of the new message is ignored. However,
2973 * the priority band of the new message must adhere to the following
2974 * ordering:
2975 *
2976 * emp->b_prev->b_band >= mp->b_band >= emp->b_band.
2977 *
2978 * All flow control parameters are updated.
2979 *
2980 * insq can be called with the stream frozen, by other utility functions
2981 * holding QLOCK, and by streams modules without any locks/frozen.
2982 */
2983 int
2984 insq(queue_t *q, mblk_t *emp, mblk_t *mp)
2985 {
2986 mblk_t *tmp;
2987 qband_t *qbp = NULL;
2988 int mcls = (int)queclass(mp);
2989 kthread_id_t freezer;
2990 int bytecnt = 0, mblkcnt = 0;
2991
2992 freezer = STREAM(q)->sd_freezer;
2993 if (freezer == curthread) {
2994 ASSERT(frozenstr(q));
2995 ASSERT(MUTEX_HELD(QLOCK(q)));
2996 } else if (MUTEX_HELD(QLOCK(q))) {
2997 /* Don't drop lock on exit */
2998 freezer = curthread;
2999 } else
3000 mutex_enter(QLOCK(q));
3001
3002 if (mcls == QPCTL) {
3003 if (mp->b_band != 0)
3004 mp->b_band = 0; /* force to be correct */
3005 if (emp && emp->b_prev &&
3006 (emp->b_prev->b_datap->db_type < QPCTL))
3007 goto badord;
3008 }
3009 if (emp) {
3010 if (((mcls == QNORM) && (mp->b_band < emp->b_band)) ||
3011 (emp->b_prev && (emp->b_prev->b_datap->db_type < QPCTL) &&
3012 (emp->b_prev->b_band < mp->b_band))) {
3013 goto badord;
3014 }
3015 } else {
3016 tmp = q->q_last;
3017 if (tmp && (mcls == QNORM) && (mp->b_band > tmp->b_band)) {
3018 badord:
3019 cmn_err(CE_WARN,
3020 "insq: attempt to insert message out of order "
3021 "on q %p", (void *)q);
3022 if (freezer != curthread)
3023 mutex_exit(QLOCK(q));
3024 return (0);
3025 }
3026 }
3027
3028 if (mp->b_band != 0) {
3029 int i;
3030 qband_t **qbpp;
3031
3032 if (mp->b_band > q->q_nband) {
3033 qbpp = &q->q_bandp;
3034 while (*qbpp)
3035 qbpp = &(*qbpp)->qb_next;
3036 while (mp->b_band > q->q_nband) {
3037 if ((*qbpp = allocband()) == NULL) {
3038 if (freezer != curthread)
3039 mutex_exit(QLOCK(q));
3040 return (0);
3041 }
3042 (*qbpp)->qb_hiwat = q->q_hiwat;
3043 (*qbpp)->qb_lowat = q->q_lowat;
3044 q->q_nband++;
3045 qbpp = &(*qbpp)->qb_next;
3046 }
3047 }
3048 qbp = q->q_bandp;
3049 i = mp->b_band;
3050 while (--i)
3051 qbp = qbp->qb_next;
3052 }
3053
3054 if ((mp->b_next = emp) != NULL) {
3055 if ((mp->b_prev = emp->b_prev) != NULL)
3056 emp->b_prev->b_next = mp;
3057 else
3058 q->q_first = mp;
3059 emp->b_prev = mp;
3060 } else {
3061 if ((mp->b_prev = q->q_last) != NULL)
3062 q->q_last->b_next = mp;
3063 else
3064 q->q_first = mp;
3065 q->q_last = mp;
3066 }
3067
3068 /* Get mblk and byte count for q_count accounting */
3069 bytecnt = mp_cont_len(mp, &mblkcnt);
3070
3071 if (qbp) { /* adjust qband pointers and count */
3072 if (!qbp->qb_first) {
3073 qbp->qb_first = mp;
3074 qbp->qb_last = mp;
3075 } else {
3076 if (mp->b_prev == NULL || (mp->b_prev != NULL &&
3077 (mp->b_prev->b_band != mp->b_band)))
3078 qbp->qb_first = mp;
3079 else if (mp->b_next == NULL || (mp->b_next != NULL &&
3080 (mp->b_next->b_band != mp->b_band)))
3081 qbp->qb_last = mp;
3082 }
3083 qbp->qb_count += bytecnt;
3084 qbp->qb_mblkcnt += mblkcnt;
3085 if ((qbp->qb_count >= qbp->qb_hiwat) ||
3086 (qbp->qb_mblkcnt >= qbp->qb_hiwat)) {
3087 qbp->qb_flag |= QB_FULL;
3088 }
3089 } else {
3090 q->q_count += bytecnt;
3091 q->q_mblkcnt += mblkcnt;
3092 if ((q->q_count >= q->q_hiwat) ||
3093 (q->q_mblkcnt >= q->q_hiwat)) {
3094 q->q_flag |= QFULL;
3095 }
3096 }
3097
3098 STR_FTEVENT_MSG(mp, q, FTEV_INSQ, NULL);
3099
3100 if (canenable(q) && (q->q_flag & QWANTR))
3101 qenable_locked(q);
3102
3103 ASSERT(MUTEX_HELD(QLOCK(q)));
3104 if (freezer != curthread)
3105 mutex_exit(QLOCK(q));
3106
3107 return (1);
3108 }
3109
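/*
 * A minimal sketch (hypothetical) of insq() use: insert a message at
 * the head of the queue, ahead of anything already queued. This only
 * succeeds when mp's band respects the ordering rule above relative to
 * the current first message.
 */
#ifdef	STREAMS_EXAMPLES	/* illustrative sketch, not compiled */
static int
example_insert_first(queue_t *q, mblk_t *mp)
{
	int ret;

	mutex_enter(QLOCK(q));
	ret = insq(q, q->q_first, mp);	/* emp == q_first, or NULL if empty */
	mutex_exit(QLOCK(q));
	return (ret);
}
#endif
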
3110 /*
3111 * Create and put a control message on queue.
3112 */
3113 int
3114 putctl(queue_t *q, int type)
3115 {
3116 mblk_t *bp;
3117
3118 if ((datamsg(type) && (type != M_DELAY)) ||
3119 (bp = allocb_tryhard(0)) == NULL)
3120 return (0);
3121 bp->b_datap->db_type = (unsigned char) type;
3122
3123 put(q, bp);
3124
3125 return (1);
3126 }
3127
3128 /*
3129 * Control message with a single-byte parameter
3130 */
3131 int
3132 putctl1(queue_t *q, int type, int param)
3133 {
3134 mblk_t *bp;
3135
3136 if ((datamsg(type) && (type != M_DELAY)) ||
3137 (bp = allocb_tryhard(1)) == NULL)
3138 return (0);
3139 bp->b_datap->db_type = (unsigned char)type;
3140 *bp->b_wptr++ = (unsigned char)param;
3141
3142 put(q, bp);
3143
3144 return (1);
3145 }
3146
3147 int
3148 putnextctl1(queue_t *q, int type, int param)
3149 {
3150 mblk_t *bp;
3151
3152 if ((datamsg(type) && (type != M_DELAY)) ||
3153 ((bp = allocb_tryhard(1)) == NULL))
3154 return (0);
3155
3156 bp->b_datap->db_type = (unsigned char)type;
3157 *bp->b_wptr++ = (unsigned char)param;
3158
3159 putnext(q, bp);
3160
3161 return (1);
3162 }
3163
3164 int
3165 putnextctl(queue_t *q, int type)
3166 {
3167 mblk_t *bp;
3168
3169 if ((datamsg(type) && (type != M_DELAY)) ||
3170 ((bp = allocb_tryhard(0)) == NULL))
3171 return (0);
3172 bp->b_datap->db_type = (unsigned char)type;
3173
3174 putnext(q, bp);
3175
3176 return (1);
3177 }
3178
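/*
 * A minimal sketch (hypothetical) of typical putnextctl1() use: a
 * driver reporting a fatal device error upstream from its read side.
 * A 0 return means even allocb_tryhard() could not get an mblk.
 */
#ifdef	STREAMS_EXAMPLES	/* illustrative sketch, not compiled */
static void
xdrv_fatal_error(queue_t *rq)
{
	if (!putnextctl1(rq, M_ERROR, EIO))
		cmn_err(CE_WARN, "xdrv: unable to send M_ERROR upstream");
}
#endif
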
3179 /*
3180 * Return the queue upstream from this one
3181 */
3182 queue_t *
3183 backq(queue_t *q)
3184 {
3185 q = _OTHERQ(q);
3186 if (q->q_next) {
3187 q = q->q_next;
3188 return (_OTHERQ(q));
3189 }
3190 return (NULL);
3191 }
3192
3193 /*
3194 * Send a block back up the queue in reverse from this
3195 * one (e.g. to respond to ioctls)
3196 */
3197 void
3198 qreply(queue_t *q, mblk_t *bp)
3199 {
3200 ASSERT(q && bp);
3201
3202 putnext(_OTHERQ(q), bp);
3203 }
3204
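/*
 * A minimal sketch (hypothetical) of the usual qreply() idiom: answer
 * an M_IOCTL received on the write side by converting the same mblk to
 * an M_IOCACK and sending it back up the read side.
 */
#ifdef	STREAMS_EXAMPLES	/* illustrative sketch, not compiled */
static void
xmod_ioctl_ack(queue_t *wq, mblk_t *mp)
{
	struct iocblk *iocp = (struct iocblk *)mp->b_rptr;

	mp->b_datap->db_type = M_IOCACK;
	iocp->ioc_count = 0;
	iocp->ioc_error = 0;
	qreply(wq, mp);		/* putnext on the other (read) queue */
}
#endif
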
3205 /*
3206 * Streams Queue Scheduling
3207 *
3208 * Queues are enabled through qenable() when they have messages to
3209 * process. They are serviced by queuerun(), which runs each enabled
3210 * queue's service procedure. The call to queuerun() is processor
3211 * dependent - the general principle is that it be run whenever a queue
3212 * is enabled but before returning to user level. For system calls,
3213 * the function runqueues() is called if their action causes a queue
3214 * to be enabled. For device interrupts, queuerun() should be
3215 * called before returning from the last level of interrupt. Beyond
3216 * this, no timing assumptions should be made about queue scheduling.
3217 */
3218
3219 /*
3220 * Enable a queue: put it on list of those whose service procedures are
3221 * ready to run and set up the scheduling mechanism.
3222 * The broadcast is done outside the mutex to avoid the woken thread
3223 * contending with the mutex. This is OK because the queue has been
3224 * enqueued on the runlist and flagged safely at this point.
3225 */
3226 void
3227 qenable(queue_t *q)
3228 {
3229 mutex_enter(QLOCK(q));
3230 qenable_locked(q);
3231 mutex_exit(QLOCK(q));
3232 }
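
/*
 * A minimal sketch (hypothetical) of a common qenable() caller: a
 * qbufcall() handler that reschedules the service procedure once a
 * previously failed allocation can succeed.
 */
#ifdef	STREAMS_EXAMPLES	/* illustrative sketch, not compiled */
static void
xmod_buf_ready(void *arg)
{
	queue_t *q = arg;

	qenable(q);	/* re-run the service procedure to retry */
}
#endif
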
3233 /*
3234 * Return number of messages on queue
3235 */
3236 int
3237 qsize(queue_t *qp)
3238 {
3239 int count = 0;
3240 mblk_t *mp;
3241
3242 mutex_enter(QLOCK(qp));
3243 for (mp = qp->q_first; mp; mp = mp->b_next)
3244 count++;
3245 mutex_exit(QLOCK(qp));
3246 return (count);
3247 }
3248
3249 /*
3250 * noenable - set queue so that putq() will not enable it.
3251 * enableok - set queue so that putq() can enable it.
3252 */
3253 void
3254 noenable(queue_t *q)
3255 {
3256 mutex_enter(QLOCK(q));
3257 q->q_flag |= QNOENB;
3258 mutex_exit(QLOCK(q));
3259 }
3260
3261 void
3262 enableok(queue_t *q)
3263 {
3264 mutex_enter(QLOCK(q));
3265 q->q_flag &= ~QNOENB;
3266 mutex_exit(QLOCK(q));
3267 }
3268
3269 /*
3270 * Set queue fields.
3271 */
3272 int
3273 strqset(queue_t *q, qfields_t what, unsigned char pri, intptr_t val)
3274 {
3275 qband_t *qbp = NULL;
3276 queue_t *wrq;
3277 int error = 0;
3278 kthread_id_t freezer;
3279
3280 freezer = STREAM(q)->sd_freezer;
3281 if (freezer == curthread) {
3282 ASSERT(frozenstr(q));
3283 ASSERT(MUTEX_HELD(QLOCK(q)));
3284 } else
3285 mutex_enter(QLOCK(q));
3286
3287 if (what >= QBAD) {
3288 error = EINVAL;
3289 goto done;
3290 }
3291 if (pri != 0) {
3292 int i;
3293 qband_t **qbpp;
3294
3295 if (pri > q->q_nband) {
3296 qbpp = &q->q_bandp;
3297 while (*qbpp)
3298 qbpp = &(*qbpp)->qb_next;
3299 while (pri > q->q_nband) {
3300 if ((*qbpp = allocband()) == NULL) {
3301 error = EAGAIN;
3302 goto done;
3303 }
3304 (*qbpp)->qb_hiwat = q->q_hiwat;
3305 (*qbpp)->qb_lowat = q->q_lowat;
3306 q->q_nband++;
3307 qbpp = &(*qbpp)->qb_next;
3308 }
3309 }
3310 qbp = q->q_bandp;
3311 i = pri;
3312 while (--i)
3313 qbp = qbp->qb_next;
3314 }
3315 switch (what) {
3316
3317 case QHIWAT:
3318 if (qbp)
3319 qbp->qb_hiwat = (size_t)val;
3320 else
3321 q->q_hiwat = (size_t)val;
3322 break;
3323
3324 case QLOWAT:
3325 if (qbp)
3326 qbp->qb_lowat = (size_t)val;
3327 else
3328 q->q_lowat = (size_t)val;
3329 break;
3330
3331 case QMAXPSZ:
3332 if (qbp)
3333 error = EINVAL;
3334 else
3335 q->q_maxpsz = (ssize_t)val;
3336
3337 /*
3338 * Performance concern, strwrite looks at the module below
3339 * the stream head for the maxpsz each time it does a write
3340 * we now cache it at the stream head. Check to see if this
3341 * queue is sitting directly below the stream head.
3342 */
3343 wrq = STREAM(q)->sd_wrq;
3344 if (q != wrq->q_next)
3345 break;
3346
3347 /*
3348 * If the stream is not frozen drop the current QLOCK and
3349 * acquire the sd_wrq QLOCK which protects sd_qn_*
3350 */
3351 if (freezer != curthread) {
3352 mutex_exit(QLOCK(q));
3353 mutex_enter(QLOCK(wrq));
3354 }
3355 ASSERT(MUTEX_HELD(QLOCK(wrq)));
3356
3357 if (strmsgsz != 0) {
3358 if (val == INFPSZ)
3359 val = strmsgsz;
3360 else {
3361 if (STREAM(q)->sd_vnode->v_type == VFIFO)
3362 val = MIN(PIPE_BUF, val);
3363 else
3364 val = MIN(strmsgsz, val);
3365 }
3366 }
3367 STREAM(q)->sd_qn_maxpsz = val;
3368 if (freezer != curthread) {
3369 mutex_exit(QLOCK(wrq));
3370 mutex_enter(QLOCK(q));
3371 }
3372 break;
3373
3374 case QMINPSZ:
3375 if (qbp)
3376 error = EINVAL;
3377 else
3378 q->q_minpsz = (ssize_t)val;
3379
3380 /*
3381 * Performance concern, strwrite looks at the module below
3382 * the stream head for the maxpsz each time it does a write
3383 * we now cache it at the stream head. Check to see if this
3384 * queue is sitting directly below the stream head.
3385 */
3386 wrq = STREAM(q)->sd_wrq;
3387 if (q != wrq->q_next)
3388 break;
3389
3390 /*
3391 * If the stream is not frozen drop the current QLOCK and
3392 * acquire the sd_wrq QLOCK which protects sd_qn_*
3393 */
3394 if (freezer != curthread) {
3395 mutex_exit(QLOCK(q));
3396 mutex_enter(QLOCK(wrq));
3397 }
3398 STREAM(q)->sd_qn_minpsz = (ssize_t)val;
3399
3400 if (freezer != curthread) {
3401 mutex_exit(QLOCK(wrq));
3402 mutex_enter(QLOCK(q));
3403 }
3404 break;
3405
3406 case QSTRUIOT:
3407 if (qbp)
3408 error = EINVAL;
3409 else
3410 q->q_struiot = (ushort_t)val;
3411 break;
3412
3413 case QCOUNT:
3414 case QFIRST:
3415 case QLAST:
3416 case QFLAG:
3417 error = EPERM;
3418 break;
3419
3420 default:
3421 error = EINVAL;
3422 break;
3423 }
3424 done:
3425 if (freezer != curthread)
3426 mutex_exit(QLOCK(q));
3427 return (error);
3428 }
3429
3430 /*
3431 * Get queue fields.
3432 */
3433 int
3434 strqget(queue_t *q, qfields_t what, unsigned char pri, void *valp)
3435 {
3436 qband_t *qbp = NULL;
3437 int error = 0;
3438 kthread_id_t freezer;
3439
3440 freezer = STREAM(q)->sd_freezer;
3441 if (freezer == curthread) {
3442 ASSERT(frozenstr(q));
3443 ASSERT(MUTEX_HELD(QLOCK(q)));
3444 } else
3445 mutex_enter(QLOCK(q));
3446 if (what >= QBAD) {
3447 error = EINVAL;
3448 goto done;
3449 }
3450 if (pri != 0) {
3451 int i;
3452 qband_t **qbpp;
3453
3454 if (pri > q->q_nband) {
3455 qbpp = &q->q_bandp;
3456 while (*qbpp)
3457 qbpp = &(*qbpp)->qb_next;
3458 while (pri > q->q_nband) {
3459 if ((*qbpp = allocband()) == NULL) {
3460 error = EAGAIN;
3461 goto done;
3462 }
3463 (*qbpp)->qb_hiwat = q->q_hiwat;
3464 (*qbpp)->qb_lowat = q->q_lowat;
3465 q->q_nband++;
3466 qbpp = &(*qbpp)->qb_next;
3467 }
3468 }
3469 qbp = q->q_bandp;
3470 i = pri;
3471 while (--i)
3472 qbp = qbp->qb_next;
3473 }
3474 switch (what) {
3475 case QHIWAT:
3476 if (qbp)
3477 *(size_t *)valp = qbp->qb_hiwat;
3478 else
3479 *(size_t *)valp = q->q_hiwat;
3480 break;
3481
3482 case QLOWAT:
3483 if (qbp)
3484 *(size_t *)valp = qbp->qb_lowat;
3485 else
3486 *(size_t *)valp = q->q_lowat;
3487 break;
3488
3489 case QMAXPSZ:
3490 if (qbp)
3491 error = EINVAL;
3492 else
3493 *(ssize_t *)valp = q->q_maxpsz;
3494 break;
3495
3496 case QMINPSZ:
3497 if (qbp)
3498 error = EINVAL;
3499 else
3500 *(ssize_t *)valp = q->q_minpsz;
3501 break;
3502
3503 case QCOUNT:
3504 if (qbp)
3505 *(size_t *)valp = qbp->qb_count;
3506 else
3507 *(size_t *)valp = q->q_count;
3508 break;
3509
3510 case QFIRST:
3511 if (qbp)
3512 *(mblk_t **)valp = qbp->qb_first;
3513 else
3514 *(mblk_t **)valp = q->q_first;
3515 break;
3516
3517 case QLAST:
3518 if (qbp)
3519 *(mblk_t **)valp = qbp->qb_last;
3520 else
3521 *(mblk_t **)valp = q->q_last;
3522 break;
3523
3524 case QFLAG:
3525 if (qbp)
3526 *(uint_t *)valp = qbp->qb_flag;
3527 else
3528 *(uint_t *)valp = q->q_flag;
3529 break;
3530
3531 case QSTRUIOT:
3532 if (qbp)
3533 error = EINVAL;
3534 else
3535 *(short *)valp = q->q_struiot;
3536 break;
3537
3538 default:
3539 error = EINVAL;
3540 break;
3541 }
3542 done:
3543 if (freezer != curthread)
3544 mutex_exit(QLOCK(q));
3545 return (error);
3546 }
3547
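/*
 * A minimal sketch (hypothetical) of typical strqset()/strqget() use
 * from a module open routine, after qprocson(): raise the band-0
 * flow-control watermarks on the read queue and read one back.
 */
#ifdef	STREAMS_EXAMPLES	/* illustrative sketch, not compiled */
static void
xmod_tune_queues(queue_t *rq)
{
	size_t hiwat;

	(void) strqset(rq, QHIWAT, 0, 65536);	/* band 0 highwater */
	(void) strqset(rq, QLOWAT, 0, 8192);	/* band 0 lowwater */
	(void) strqget(rq, QHIWAT, 0, &hiwat);	/* hiwat is now 65536 */
}
#endif
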
3548 /*
3549 * This function wakes up everything blocked in cv_wait/sigwait/pollwait
3550 * on one of: QWANTWSYNC, QWANTR or QWANTW.
3551 *
3552 * Note: for QWANTWSYNC/QWANTW and QWANTR, if no WSLEEPer or RSLEEPer then a
3553 * deferred wakeup will be done. Also if strpoll() in progress then a
3554 * deferred pollwakeup will be done.
3555 */
3556 void
3557 strwakeq(queue_t *q, int flag)
3558 {
3559 stdata_t *stp = STREAM(q);
3560 pollhead_t *pl;
3561
3562 mutex_enter(&stp->sd_lock);
3563 pl = &stp->sd_pollist;
3564 if (flag & QWANTWSYNC) {
3565 ASSERT(!(q->q_flag & QREADR));
3566 if (stp->sd_flag & WSLEEP) {
3567 stp->sd_flag &= ~WSLEEP;
3568 cv_broadcast(&stp->sd_wrq->q_wait);
3569 } else {
3570 stp->sd_wakeq |= WSLEEP;
3571 }
3572
3573 mutex_exit(&stp->sd_lock);
3574 pollwakeup(pl, POLLWRNORM);
3575 mutex_enter(&stp->sd_lock);
3576
3577 if (stp->sd_sigflags & S_WRNORM)
3578 strsendsig(stp->sd_siglist, S_WRNORM, 0, 0);
3579 } else if (flag & QWANTR) {
3580 if (stp->sd_flag & RSLEEP) {
3581 stp->sd_flag &= ~RSLEEP;
3582 cv_broadcast(&_RD(stp->sd_wrq)->q_wait);
3583 } else {
3584 stp->sd_wakeq |= RSLEEP;
3585 }
3586
3587 mutex_exit(&stp->sd_lock);
3588 pollwakeup(pl, POLLIN | POLLRDNORM);
3589 mutex_enter(&stp->sd_lock);
3590
3591 {
3592 int events = stp->sd_sigflags & (S_INPUT | S_RDNORM);
3593
3594 if (events)
3595 strsendsig(stp->sd_siglist, events, 0, 0);
3596 }
3597 } else {
3598 if (stp->sd_flag & WSLEEP) {
3599 stp->sd_flag &= ~WSLEEP;
3600 cv_broadcast(&stp->sd_wrq->q_wait);
3601 }
3602
3603 mutex_exit(&stp->sd_lock);
3604 pollwakeup(pl, POLLWRNORM);
3605 mutex_enter(&stp->sd_lock);
3606
3607 if (stp->sd_sigflags & S_WRNORM)
3608 strsendsig(stp->sd_siglist, S_WRNORM, 0, 0);
3609 }
3610 mutex_exit(&stp->sd_lock);
3611 }
3612
3613 int
3614 struioget(queue_t *q, mblk_t *mp, struiod_t *dp, int noblock)
3615 {
3616 stdata_t *stp = STREAM(q);
3617 int typ = STRUIOT_STANDARD;
3618 uio_t *uiop = &dp->d_uio;
3619 dblk_t *dbp;
3620 ssize_t uiocnt;
3621 ssize_t cnt;
3622 unsigned char *ptr;
3623 ssize_t resid;
3624 int error = 0;
3625 on_trap_data_t otd;
3626 queue_t *stwrq;
3627
3628 /*
3629 * Plumbing may change while taking the type so store the
3630 * queue in a temporary variable. It doesn't matter even
3631 * if we take the type from the previous plumbing;
3632 * that's because if the plumbing has changed while we were
3633 * holding the queue in a temporary variable, we can continue
3634 * processing the message the way it would have been processed
3635 * in the old plumbing, without any side effects other than a
3636 * bit of extra processing for the partial ip header checksum.
3637 *
3638 * This has been done to avoid holding the sd_lock which is
3639 * very hot.
3640 */
3641
3642 stwrq = stp->sd_struiowrq;
3643 if (stwrq)
3644 typ = stwrq->q_struiot;
3645
3646 for (; (resid = uiop->uio_resid) > 0 && mp; mp = mp->b_cont) {
3647 dbp = mp->b_datap;
3648 ptr = (uchar_t *)(mp->b_rptr + dbp->db_cksumstuff);
3649 uiocnt = dbp->db_cksumend - dbp->db_cksumstuff;
3650 cnt = MIN(uiocnt, uiop->uio_resid);
3651 if (!(dbp->db_struioflag & STRUIO_SPEC) ||
3652 (dbp->db_struioflag & STRUIO_DONE) || cnt == 0) {
3653 /*
3654 * Either this mblk has already been processed
3655 * or there is no more room in this mblk (?).
3656 */
3657 continue;
3658 }
3659 switch (typ) {
3660 case STRUIOT_STANDARD:
3661 if (noblock) {
3662 if (on_trap(&otd, OT_DATA_ACCESS)) {
3663 no_trap();
3664 error = EWOULDBLOCK;
3665 goto out;
3666 }
3667 }
3668 if (error = uiomove(ptr, cnt, UIO_WRITE, uiop)) {
3669 if (noblock)
3670 no_trap();
3671 goto out;
3672 }
3673 if (noblock)
3674 no_trap();
3675 break;
3676
3677 default:
3678 error = EIO;
3679 goto out;
3680 }
3681 dbp->db_struioflag |= STRUIO_DONE;
3682 dbp->db_cksumstuff += cnt;
3683 }
3684 out:
3685 if (error == EWOULDBLOCK && (resid -= uiop->uio_resid) > 0) {
3686 /*
3687 * A fault has occurred and some bytes were moved to the
3688 * current mblk, the uio_t has already been updated by
3689 * the appropriate uio routine, so also update the mblk
3690 * to reflect this in case this same mblk chain is used
3691 * again (after the fault has been handled).
3692 */
3693 uiocnt = dbp->db_cksumend - dbp->db_cksumstuff;
3694 if (uiocnt >= resid)
3695 dbp->db_cksumstuff += resid;
3696 }
3697 return (error);
3698 }
3699
3700 /*
3701 * Try to enter the queue synchronously. Any attempt to enter a closing queue
3702 * will fail. The qp->q_rwcnt keeps track of the number of successful entries so
3703 * that removeq() will not try to close the queue while a thread is inside the
3704 * queue.
3705 */
3706 static boolean_t
3707 rwnext_enter(queue_t *qp)
3708 {
3709 mutex_enter(QLOCK(qp));
3710 if (qp->q_flag & QWCLOSE) {
3711 mutex_exit(QLOCK(qp));
3712 return (B_FALSE);
3713 }
3714 qp->q_rwcnt++;
3715 ASSERT(qp->q_rwcnt != 0);
3716 mutex_exit(QLOCK(qp));
3717 return (B_TRUE);
3718 }
3719
3720 /*
3721 * Decrease the count of threads running in sync stream queue and wake up any
3722 * threads blocked in removeq().
3723 */
3724 static void
3725 rwnext_exit(queue_t *qp)
3726 {
3727 mutex_enter(QLOCK(qp));
3728 qp->q_rwcnt--;
3729 if (qp->q_flag & QWANTRMQSYNC) {
3730 qp->q_flag &= ~QWANTRMQSYNC;
3731 cv_broadcast(&qp->q_wait);
3732 }
3733 mutex_exit(QLOCK(qp));
3734 }
3735
3736 /*
3737 * The purpose of rwnext() is to call the rw procedure of the next
3738 * (downstream) module's queue.
3739 *
3740 * It is treated as a put entrypoint for perimeter synchronization.
3741 *
3742 * There's no need to grab sq_putlocks here (which only exist for CIPUT
3743 * sync queues). If it is CIPUT sync queue sq_count is incremented and it does
3744 * not matter if any regular put entrypoints have been already entered. We
3745 * can't increment one of the sq_putcounts (instead of sq_count) because
3746 * qwait_rw won't know which counter to decrement.
3747 *
3748 * It would be reasonable to add the lockless FASTPUT logic.
3749 */
3750 int
3751 rwnext(queue_t *qp, struiod_t *dp)
3752 {
3753 queue_t *nqp;
3754 syncq_t *sq;
3755 uint16_t count;
3756 uint16_t flags;
3757 struct qinit *qi;
3758 int (*proc)();
3759 struct stdata *stp;
3760 int isread;
3761 int rval;
3762
3763 stp = STREAM(qp);
3764 /*
3765 * Prevent q_next from changing by holding sd_lock until acquiring
3766 * SQLOCK. Note that a read-side rwnext from the streamhead will
3767 * already have sd_lock acquired. In either case sd_lock is always
3768 * released after acquiring SQLOCK.
3769 *
3770 * The streamhead read-side holding sd_lock when calling rwnext is
3771 * required to prevent a race condition where M_DATA mblks flowing
3772 * up the read-side of the stream could be bypassed by a rwnext()
3773 * down-call. In this case sd_lock acts as the streamhead perimeter.
3774 */
3775 if ((nqp = _WR(qp)) == qp) {
3776 isread = 0;
3777 mutex_enter(&stp->sd_lock);
3778 qp = nqp->q_next;
3779 } else {
3780 isread = 1;
3781 if (nqp != stp->sd_wrq)
3782 /* Not streamhead */
3783 mutex_enter(&stp->sd_lock);
3784 qp = _RD(nqp->q_next);
3785 }
3786 qi = qp->q_qinfo;
3787 if (qp->q_struiot == STRUIOT_NONE || ! (proc = qi->qi_rwp)) {
3788 /*
3789 * Not a synchronous module or no r/w procedure for this
3790 * queue, so just return EINVAL and let the caller handle it.
3791 */
3792 mutex_exit(&stp->sd_lock);
3793 return (EINVAL);
3794 }
3795
3796 if (rwnext_enter(qp) == B_FALSE) {
3797 mutex_exit(&stp->sd_lock);
3798 return (EINVAL);
3799 }
3800
3801 sq = qp->q_syncq;
3802 mutex_enter(SQLOCK(sq));
3803 mutex_exit(&stp->sd_lock);
3804 count = sq->sq_count;
3805 flags = sq->sq_flags;
3806 ASSERT(sq->sq_ciputctrl == NULL || (flags & SQ_CIPUT));
3807
3808 while ((flags & SQ_GOAWAY) || (!(flags & SQ_CIPUT) && count != 0)) {
3809 /*
3810 * if this queue is being closed, return.
3811 */
3812 if (qp->q_flag & QWCLOSE) {
3813 mutex_exit(SQLOCK(sq));
3814 rwnext_exit(qp);
3815 return (EINVAL);
3816 }
3817
3818 /*
3819 * Wait until we can enter the inner perimeter.
3820 */
3821 sq->sq_flags = flags | SQ_WANTWAKEUP;
3822 cv_wait(&sq->sq_wait, SQLOCK(sq));
3823 count = sq->sq_count;
3824 flags = sq->sq_flags;
3825 }
3826
3827 if (isread == 0 && stp->sd_struiowrq == NULL ||
3828 isread == 1 && stp->sd_struiordq == NULL) {
3829 /*
3830 * Stream plumbing changed while waiting for inner perimeter
3831 * so just return EINVAL and let the caller handle it.
3832 */
3833 mutex_exit(SQLOCK(sq));
3834 rwnext_exit(qp);
3835 return (EINVAL);
3836 }
3837 if (!(flags & SQ_CIPUT))
3838 sq->sq_flags = flags | SQ_EXCL;
3839 sq->sq_count = count + 1;
3840 ASSERT(sq->sq_count != 0); /* Wraparound */
3841 /*
3842 * Note: The only message ordering guarantee that rwnext() makes is
3843 * for the write queue flow-control case. All others (r/w queue
3844 * with q_count > 0 (or q_first != 0)) are the responsibility of
3845 * the queue's rw procedure. This could be generalized here by
3846 * running the queue's service procedure, but that wouldn't be
3847 * the most efficient for all cases.
3848 */
3849 mutex_exit(SQLOCK(sq));
3850 if (! isread && (qp->q_flag & QFULL)) {
3851 /*
3852 * Write queue may be flow controlled. If so,
3853 * mark the queue for wakeup when it's not.
3854 */
3855 mutex_enter(QLOCK(qp));
3856 if (qp->q_flag & QFULL) {
3857 qp->q_flag |= QWANTWSYNC;
3858 mutex_exit(QLOCK(qp));
3859 rval = EWOULDBLOCK;
3860 goto out;
3861 }
3862 mutex_exit(QLOCK(qp));
3863 }
3864
3865 if (! isread && dp->d_mp)
3866 STR_FTEVENT_MSG(dp->d_mp, nqp, FTEV_RWNEXT, dp->d_mp->b_rptr -
3867 dp->d_mp->b_datap->db_base);
3868
3869 rval = (*proc)(qp, dp);
3870
3871 if (isread && dp->d_mp)
3872 STR_FTEVENT_MSG(dp->d_mp, _RD(nqp), FTEV_RWNEXT,
3873 dp->d_mp->b_rptr - dp->d_mp->b_datap->db_base);
3874 out:
3875 /*
3876 * The queue is protected from being freed by sq_count, so it is
3877 * safe to call rwnext_exit and reacquire SQLOCK(sq).
3878 */
3879 rwnext_exit(qp);
3880
3881 mutex_enter(SQLOCK(sq));
3882 flags = sq->sq_flags;
3883 ASSERT(sq->sq_count != 0);
3884 sq->sq_count--;
3885 if (flags & SQ_TAIL) {
3886 putnext_tail(sq, qp, flags);
3887 /*
3888 * The only purpose of this ASSERT is to preserve calling stack
3889 * in DEBUG kernel.
3890 */
3891 ASSERT(flags & SQ_TAIL);
3892 return (rval);
3893 }
3894 ASSERT(flags & (SQ_EXCL|SQ_CIPUT));
3895 /*
3896 * Safe to always drop SQ_EXCL:
3897 * Not SQ_CIPUT means we set SQ_EXCL above
3898 * For SQ_CIPUT SQ_EXCL will only be set if the put procedure
3899 * did a qwriter(INNER) in which case nobody else
3900 * is in the inner perimeter and we are exiting.
3901 *
3902 * I would like to make the following assertion:
3903 *
3904 * ASSERT((flags & (SQ_EXCL|SQ_CIPUT)) != (SQ_EXCL|SQ_CIPUT) ||
3905 * sq->sq_count == 0);
3906 *
3907 * which indicates that if we are both putshared and exclusive,
3908 * we became exclusive while executing the putproc, and the only
3909 * claim on the syncq was the one we dropped a few lines above.
3910 * But other threads that enter putnext while the syncq is exclusive
3911 * need to make a claim as they may need to drop SQLOCK in the
3912 * has_writers case to avoid deadlocks. If these threads are
3913 * delayed or preempted, it is possible that the writer thread can
3914 * find out that there are other claims making the (sq_count == 0)
3915 * test invalid.
3916 */
3917
3918 sq->sq_flags = flags & ~SQ_EXCL;
3919 if (sq->sq_flags & SQ_WANTWAKEUP) {
3920 sq->sq_flags &= ~SQ_WANTWAKEUP;
3921 cv_broadcast(&sq->sq_wait);
3922 }
3923 mutex_exit(SQLOCK(sq));
3924 return (rval);
3925 }
3926
3927 /*
3928 * The purpose of infonext() is to call the info procedure of the next
3929 * (downstream) module's queue.
3930 *
3931 * It is treated as a put entrypoint for perimeter synchronization.
3932 *
3933 * There's no need to grab sq_putlocks here (which only exist for CIPUT
3934 * sync queues). If it is CIPUT sync queue regular sq_count is incremented and
3935 * it does not matter if any regular put entrypoints have been already
3936 * entered.
3937 */
3938 int
3939 infonext(queue_t *qp, infod_t *idp)
3940 {
3941 queue_t *nqp;
3942 syncq_t *sq;
3943 uint16_t count;
3944 uint16_t flags;
3945 struct qinit *qi;
3946 int (*proc)();
3947 struct stdata *stp;
3948 int rval;
3949
3950 stp = STREAM(qp);
3951 /*
3952 * Prevent q_next from changing by holding sd_lock until
3953 * acquiring SQLOCK.
3954 */
3955 mutex_enter(&stp->sd_lock);
3956 if ((nqp = _WR(qp)) == qp) {
3957 qp = nqp->q_next;
3958 } else {
3959 qp = _RD(nqp->q_next);
3960 }
3961 qi = qp->q_qinfo;
3962 if (qp->q_struiot == STRUIOT_NONE || ! (proc = qi->qi_infop)) {
3963 mutex_exit(&stp->sd_lock);
3964 return (EINVAL);
3965 }
3966 sq = qp->q_syncq;
3967 mutex_enter(SQLOCK(sq));
3968 mutex_exit(&stp->sd_lock);
3969 count = sq->sq_count;
3970 flags = sq->sq_flags;
3971 ASSERT(sq->sq_ciputctrl == NULL || (flags & SQ_CIPUT));
3972
3973 while ((flags & SQ_GOAWAY) || (!(flags & SQ_CIPUT) && count != 0)) {
3974 /*
3975 * Wait until we can enter the inner perimeter.
3976 */
3977 sq->sq_flags = flags | SQ_WANTWAKEUP;
3978 cv_wait(&sq->sq_wait, SQLOCK(sq));
3979 count = sq->sq_count;
3980 flags = sq->sq_flags;
3981 }
3982
3983 if (! (flags & SQ_CIPUT))
3984 sq->sq_flags = flags | SQ_EXCL;
3985 sq->sq_count = count + 1;
3986 ASSERT(sq->sq_count != 0); /* Wraparound */
3987 mutex_exit(SQLOCK(sq));
3988
3989 rval = (*proc)(qp, idp);
3990
3991 mutex_enter(SQLOCK(sq));
3992 flags = sq->sq_flags;
3993 ASSERT(sq->sq_count != 0);
3994 sq->sq_count--;
3995 if (flags & SQ_TAIL) {
3996 putnext_tail(sq, qp, flags);
3997 /*
3998 * The only purpose of this ASSERT is to preserve calling stack
3999 * in DEBUG kernel.
4000 */
4001 ASSERT(flags & SQ_TAIL);
4002 return (rval);
4003 }
4004 ASSERT(flags & (SQ_EXCL|SQ_CIPUT));
4005 /*
4006 * XXXX
4007 * I am not certain the next comment is correct here. I need to consider
4008 * why the infonext is called, and if dropping SQ_EXCL unless non-CIPUT
4009 * might cause other problems. It just might be safer to drop it if
4010 * !SQ_CIPUT because that is when we set it.
4011 */
	/*
	 * Safe to always drop SQ_EXCL:
	 *	Not SQ_CIPUT means we set SQ_EXCL above
	 *	For SQ_CIPUT SQ_EXCL will only be set if the put procedure
	 *	did a qwriter(INNER) in which case nobody else
	 *	is in the inner perimeter and we are exiting.
	 *
	 * I would like to make the following assertion:
	 *
	 *	ASSERT((flags & (SQ_EXCL|SQ_CIPUT)) != (SQ_EXCL|SQ_CIPUT) ||
	 *	    sq->sq_count == 0);
	 *
	 * which indicates that if we are both putshared and exclusive,
	 * we became exclusive while executing the putproc, and the only
	 * claim on the syncq was the one we dropped a few lines above.
	 * But other threads that enter putnext while the syncq is exclusive
	 * need to make a claim as they may need to drop SQLOCK in the
	 * has_writers case to avoid deadlocks.  If these threads are
	 * delayed or preempted, it is possible that the writer thread can
	 * find out that there are other claims making the (sq_count == 0)
	 * test invalid.
	 */

	sq->sq_flags = flags & ~SQ_EXCL;
	mutex_exit(SQLOCK(sq));
	return (rval);
}
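
/*
 * Illustrative sketch (not part of the original source): a stream-head
 * routine could use infonext() to ask the downstream module how much data
 * it could deliver synchronously.  The infod_t fields and the INFOD_BYTES
 * command shown here are assumptions for illustration; see <sys/stream.h>
 * for the actual definitions:
 *
 *	infod_t id;
 *
 *	bzero(&id, sizeof (id));
 *	id.d_cmd = INFOD_BYTES;
 *	if (infonext(qp, &id) == 0 && (id.d_res & INFOD_BYTES))
 *		bytes = id.d_uiosize;
 */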

/*
 * Return nonzero if the queue is responsible for struio(), else return 0.
 */
int
isuioq(queue_t *q)
{
	if (q->q_flag & QREADR)
		return (STREAM(q)->sd_struiordq == q);
	else
		return (STREAM(q)->sd_struiowrq == q);
}
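
/*
 * Illustrative sketch (not part of the original source): a module taking
 * part in synchronous STREAMS I/O might gate its struio() handling on
 * being the stream's designated uio queue.  xx_struio_copy() is a
 * hypothetical helper named only for illustration:
 *
 *	if (isuioq(q))
 *		error = xx_struio_copy(q, mp);
 */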

#if defined(__sparc)
int disable_putlocks = 0;
#else
int disable_putlocks = 1;
#endif
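
/*
 * Illustrative note: disable_putlocks is an ordinary kernel global, so (as
 * an assumption about typical tuning practice, not something this file
 * mandates) it could be overridden at boot from /etc/system, e.g.:
 *
 *	set disable_putlocks = 0
 *
 * Per-cpu putlocks are only created while disable_putlocks is 0, so the
 * setting must be in effect before the affected streams are plumbed.
 */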

/*
 * Called by create_putlocks().
 */
static void
create_syncq_putlocks(queue_t *q)
{
	syncq_t		*sq = q->q_syncq;
	ciputctrl_t	*cip;
	int		i;

	ASSERT(sq != NULL);

	ASSERT(disable_putlocks == 0);
	ASSERT(n_ciputctrl >= min_n_ciputctrl);
	ASSERT(ciputctrl_cache != NULL);

	if (!(sq->sq_type & SQ_CIPUT))
		return;

	for (i = 0; i <= 1; i++) {
		if (sq->sq_ciputctrl == NULL) {
			cip = kmem_cache_alloc(ciputctrl_cache, KM_SLEEP);
			SUMCHECK_CIPUTCTRL_COUNTS(cip, n_ciputctrl - 1, 0);
			mutex_enter(SQLOCK(sq));
			if (sq->sq_ciputctrl != NULL) {
				mutex_exit(SQLOCK(sq));
				kmem_cache_free(ciputctrl_cache, cip);
			} else {
				ASSERT(sq->sq_nciputctrl == 0);
				sq->sq_nciputctrl = n_ciputctrl - 1;
				/*
				 * putnext checks sq_ciputctrl without
				 * holding SQLOCK.  If it is not NULL,
				 * putnext assumes sq_nciputctrl is
				 * initialized.  The membar below ensures
				 * that.
				 */
				membar_producer();
				sq->sq_ciputctrl = cip;
				mutex_exit(SQLOCK(sq));
			}
		}
		ASSERT(sq->sq_nciputctrl == n_ciputctrl - 1);
		if (i == 1)
			break;
		q = _OTHERQ(q);
		if (!(q->q_flag & QPERQ)) {
			ASSERT(sq == q->q_syncq);
			break;
		}
		ASSERT(q->q_syncq != NULL);
		ASSERT(sq != q->q_syncq);
		sq = q->q_syncq;
		ASSERT(sq->sq_type & SQ_CIPUT);
	}
}

/*
 * If the stream argument is 0, only create per-cpu sq_putlocks/sq_putcounts
 * for the syncq of q.  If the stream argument is non-zero, create per-cpu
 * stream_putlocks for the stream of q and per-cpu sq_putlocks/sq_putcounts
 * for all syncqs starting from q and down to the driver.
 *
 * This should be called after the affected queues are part of the stream
 * geometry.  It should be called from a driver/module open routine after
 * the qprocson() call.  It is also called from the nfs syscall code, where
 * it is known that the stream is fully configured and won't change its
 * geometry during the create_putlocks() call.
 *
 * A caller normally passes 0 for the stream argument to speed up MT putnext
 * into the perimeter of q, for example because its perimeter is per module
 * (e.g. IP).
 *
 * A caller normally passes a non-zero value for the stream argument to hint
 * to the system that the stream of q is a heavily contended, global system
 * stream (e.g. NFS/UDP) and that the part of the stream from q down to the
 * driver is particularly MT hot.
 *
 * The caller ensures that stream plumbing won't happen while we are here,
 * so q_next can be used safely.
 */

void
create_putlocks(queue_t *q, int stream)
{
	ciputctrl_t	*cip;
	struct stdata	*stp = STREAM(q);

	q = _WR(q);
	ASSERT(stp != NULL);

	if (disable_putlocks != 0)
		return;

	if (n_ciputctrl < min_n_ciputctrl)
		return;

	ASSERT(ciputctrl_cache != NULL);

	if (stream != 0 && stp->sd_ciputctrl == NULL) {
		cip = kmem_cache_alloc(ciputctrl_cache, KM_SLEEP);
		SUMCHECK_CIPUTCTRL_COUNTS(cip, n_ciputctrl - 1, 0);
		mutex_enter(&stp->sd_lock);
		if (stp->sd_ciputctrl != NULL) {
			mutex_exit(&stp->sd_lock);
			kmem_cache_free(ciputctrl_cache, cip);
		} else {
			ASSERT(stp->sd_nciputctrl == 0);
			stp->sd_nciputctrl = n_ciputctrl - 1;
			/*
			 * putnext checks sd_ciputctrl without holding
			 * sd_lock.  If it is not NULL, putnext assumes
			 * sd_nciputctrl is initialized.  The membar below
			 * ensures that.
			 */
			membar_producer();
			stp->sd_ciputctrl = cip;
			mutex_exit(&stp->sd_lock);
		}
	}

	ASSERT(stream == 0 || stp->sd_nciputctrl == n_ciputctrl - 1);

	while (_SAMESTR(q)) {
		create_syncq_putlocks(q);
		if (stream == 0)
			return;
		q = q->q_next;
	}
	ASSERT(q != NULL);
	create_syncq_putlocks(q);
}
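
/*
 * Illustrative sketch (not part of the original source): per the comment
 * above, a driver or module open routine would typically call
 * create_putlocks() once its queues are part of the stream.  xx_open is a
 * hypothetical entry point used only for illustration:
 *
 *	static int
 *	xx_open(queue_t *q, dev_t *devp, int oflag, int sflag, cred_t *crp)
 *	{
 *		...
 *		qprocson(q);
 *		create_putlocks(q, 0);	(per-syncq putlocks only)
 *		return (0);
 *	}
 */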

/*
 * STREAMS Flow Trace - record STREAMS Flow Trace events as an mblk flows
 * through a stream.
 *
 * The data currently recorded per event are a timestamp, the module/driver
 * name, the downstream module/driver name, an optional call stack, the
 * event type, and a per-type datum.  Much of the STREAMS framework is
 * instrumented for automatic flow tracing (when enabled).  Events can also
 * be defined and used by STREAMS modules and drivers.
 *
 * Global objects:
 *
 *	str_ftevent() - Add a flow-trace event to a dblk.
 *	str_ftfree() - Free flow-trace data.
 *
 * Local objects:
 *
 *	fthdr_cache - pointer to the kmem cache for trace headers.
 *	ftblk_cache - pointer to the kmem cache for trace data blocks.
 */

int str_ftnever = 1;	/* Don't do STREAMS flow tracing */
int str_ftstack = 0;	/* Don't record event call stacks */

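/*
 * Illustrative note: flow tracing is compiled in but disabled by default.
 * As an assumption about typical tuning practice (not mandated by this
 * file), both tunables could be set at boot from /etc/system:
 *
 *	set str_ftnever = 0		(record flow-trace events)
 *	set str_ftstack = 1		(also record call stacks)
 *
 * str_ftnever gates event recording; note that str_ftevent() below even
 * re-disables tracing by incrementing it when a trace block allocation
 * fails.  Messages allocated while tracing is disabled carry no db_fthdr
 * and are therefore never traced.
 */
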
void
str_ftevent(fthdr_t *hp, void *p, ushort_t evnt, ushort_t data)
{
	ftblk_t	*bp = hp->tail;
	ftblk_t	*nbp;
	ftevnt_t *ep;
	int	ix, nix;

	ASSERT(hp != NULL);

	for (;;) {
		if ((ix = bp->ix) == FTBLK_EVNTS) {
			/*
			 * Tail doesn't have room, so need a new tail.
			 *
			 * To make this MT safe, first, allocate a new
			 * ftblk, and initialize it.  To make life a
			 * little easier, reserve the first slot (mostly
			 * by making ix = 1).  When we are finished with
			 * the initialization, CAS this pointer to the
			 * tail.  If this succeeds, this is the new
			 * "next" block.  Otherwise, another thread
			 * got here first, so free the block and start
			 * again.
			 */
			nbp = kmem_cache_alloc(ftblk_cache, KM_NOSLEEP);
			if (nbp == NULL) {
				/* no mem, so punt */
				str_ftnever++;
				/* free up all flow data? */
				return;
			}
			nbp->nxt = NULL;
			nbp->ix = 1;
			/*
			 * Just in case there is another thread about
			 * to get the next index, we need to make sure
			 * the value is there for it.
			 */
			membar_producer();
			if (atomic_cas_ptr(&hp->tail, bp, nbp) == bp) {
				/* CAS was successful */
				bp->nxt = nbp;
				membar_producer();
				bp = nbp;
				ix = 0;
				goto cas_good;
			} else {
				kmem_cache_free(ftblk_cache, nbp);
				bp = hp->tail;
				continue;
			}
		}
		nix = ix + 1;
		if (atomic_cas_32((uint32_t *)&bp->ix, ix, nix) == ix) {
cas_good:
			if (curthread != hp->thread) {
				hp->thread = curthread;
				evnt |= FTEV_CS;
			}
			if (CPU->cpu_seqid != hp->cpu_seqid) {
				hp->cpu_seqid = CPU->cpu_seqid;
				evnt |= FTEV_PS;
			}
			ep = &bp->ev[ix];
			break;
		}
	}

	if (evnt & FTEV_QMASK) {
		queue_t *qp = p;

		if (!(qp->q_flag & QREADR))
			evnt |= FTEV_ISWR;

		ep->mid = Q2NAME(qp);

		/*
		 * We only record the next queue name for FTEV_PUTNEXT since
		 * that's the only time we *really* need it, and the putnext()
		 * code ensures that qp->q_next won't vanish.  (We could use
		 * claimstr()/releasestr() but at a performance cost.)
		 */
		if ((evnt & FTEV_MASK) == FTEV_PUTNEXT && qp->q_next != NULL)
			ep->midnext = Q2NAME(qp->q_next);
		else
			ep->midnext = NULL;
	} else {
		ep->mid = p;
		ep->midnext = NULL;
	}

	if (ep->stk != NULL)
		ep->stk->fs_depth = getpcstack(ep->stk->fs_stk, FTSTK_DEPTH);

	ep->ts = gethrtime();
	ep->evnt = evnt;
	ep->data = data;
	hp->hash = (hp->hash << 9) + hp->hash;
	hp->hash += (evnt << 16) | data;
	hp->hash += (uintptr_t)ep->mid;
}
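
/*
 * Illustrative sketch (not part of the original source): a STREAMS module
 * could record its own event against a message, guarding for messages that
 * were allocated while tracing was disabled and so carry no header:
 *
 *	fthdr_t *hp = mp->b_datap->db_fthdr;
 *
 *	if (hp != NULL)
 *		str_ftevent(hp, q, FTEV_PUTNEXT, (ushort_t)MBLKL(mp));
 *
 * FTEV_PUTNEXT is reused here purely as an example event type; the datum
 * (here the mblk length) is the caller's per-type payload.
 */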

/*
 * Free flow-trace data.
 */
void
str_ftfree(dblk_t *dbp)
{
	fthdr_t	*hp = dbp->db_fthdr;
	ftblk_t	*bp = &hp->first;
	ftblk_t	*nbp;

	if (bp != hp->tail || bp->ix != 0) {
		/*
		 * Clear out the hash, have the tail point to itself, and free
		 * any continuation blocks.
		 */
		bp = hp->first.nxt;
		hp->tail = &hp->first;
		hp->hash = 0;
		hp->first.nxt = NULL;
		hp->first.ix = 0;
		while (bp != NULL) {
			nbp = bp->nxt;
			kmem_cache_free(ftblk_cache, bp);
			bp = nbp;
		}
	}
	kmem_cache_free(fthdr_cache, hp);
	dbp->db_fthdr = NULL;
}